diff --git a/agentic_security/config.py b/agentic_security/config.py index f5d8341..c9421ab 100644 --- a/agentic_security/config.py +++ b/agentic_security/config.py @@ -63,3 +63,56 @@ class CfgMixin: else: return default return value + + def generate_default_cfg(self, host: str = "0.0.0.0", port: int = 8718): + # Accept host / port as parameters + with open(self.default_path, "w") as f: + f.write( + """ +[general] +# General configuration for the security scan +llmSpec = \""" +POST http://$HOST:$PORT/v1/self-probe +Authorization: Bearer XXXXX +Content-Type: application/json + +{ + "prompt": "<>" +} +\""" # LLM API specification +maxBudget = 1000000 # Maximum budget for the scan +max_th = 0.3 # Maximum failure threshold (percentage) +optimize = false # Enable optimization during scanning +enableMultiStepAttack = false # Enable multi-step attack simulations + +# [modules.LLM-Jailbreak-Classifier] +# dataset_name = "markush1/LLM-Jailbreak-Classifier" + +[modules.aya-23-8B_advbench_jailbreak] +dataset_name = "simonycl/aya-23-8B_advbench_jailbreak" + + +[modules.AgenticBackend] +dataset_name = "AgenticBackend" +[modules.AgenticBackend.opts] +port = $PORT +modules = ["encoding"] + + +[thresholds] +# Threshold settings +low = 0.15 +medium = 0.3 +high = 0.5 + + +""".replace( + "$HOST", host + ).replace( + "$PORT", str(port) + ) + ) + + logger.info( + f"Default configuration generated successfully to {self.default_path}." + ) diff --git a/agentic_security/dependencies.py b/agentic_security/dependencies.py new file mode 100644 index 0000000..65cd86e --- /dev/null +++ b/agentic_security/dependencies.py @@ -0,0 +1,28 @@ +from fastapi import Depends +from agentic_security.config import CfgMixin + + +class InMemorySecrets: + def __init__(self): + self.secrets = {} + self.config = CfgMixin() + self.config.get_or_create_config() + self.secrets = self.config.config.get("secrets", {}) + + def set_secret(self, key: str, value: str): + self.secrets[key] = value + + def get_secret(self, key: str) -> str: + return self.secrets.get(key, None) + + +# Dependency +def get_in_memory_secrets() -> InMemorySecrets: + return InMemorySecrets() + + +# Example usage in a FastAPI route +# @app.get("/some-endpoint") +# async def some_endpoint(secrets: InMemorySecrets = Depends(get_in_memory_secrets)): +# # Use secrets here +# pass diff --git a/agentic_security/lib.py b/agentic_security/lib.py index bb7ebbf..555fe9f 100644 --- a/agentic_security/lib.py +++ b/agentic_security/lib.py @@ -216,59 +216,6 @@ class AgenticSecurity(CfgMixin): ), ) - def generate_default_cfg(self, host: str = "0.0.0.0", port: int = 8718): - # Accept host / port as parameters - with open(self.default_path, "w") as f: - f.write( - """ -[general] -# General configuration for the security scan -llmSpec = \""" -POST http://$HOST:$PORT/v1/self-probe -Authorization: Bearer XXXXX -Content-Type: application/json - -{ - "prompt": "<>" -} -\""" # LLM API specification -maxBudget = 1000000 # Maximum budget for the scan -max_th = 0.3 # Maximum failure threshold (percentage) -optimize = false # Enable optimization during scanning -enableMultiStepAttack = false # Enable multi-step attack simulations - -# [modules.LLM-Jailbreak-Classifier] -# dataset_name = "markush1/LLM-Jailbreak-Classifier" - -[modules.aya-23-8B_advbench_jailbreak] -dataset_name = "simonycl/aya-23-8B_advbench_jailbreak" - - -[modules.AgenticBackend] -dataset_name = "AgenticBackend" -[modules.AgenticBackend.opts] -port = $PORT -modules = ["encoding"] - - -[thresholds] -# Threshold settings -low = 0.15 -medium = 0.3 -high = 0.5 - - -""".replace( - "$HOST", host - ).replace( - "$PORT", str(port) - ) - ) - - logger.info( - f"Default configuration generated successfully to {self.default_path}." - ) - def list_checks(self): """ Print the REGISTRY contents as a table using the rich library. diff --git a/agentic_security/routes/scan.py b/agentic_security/routes/scan.py index 7d460b7..800e56f 100644 --- a/agentic_security/routes/scan.py +++ b/agentic_security/routes/scan.py @@ -1,9 +1,18 @@ from datetime import datetime -from fastapi import APIRouter, BackgroundTasks, File, HTTPException, Query, UploadFile +from fastapi import ( + APIRouter, + BackgroundTasks, + Depends, + File, + HTTPException, + Query, + UploadFile, +) from fastapi.responses import StreamingResponse from ..core.app import get_stop_event, get_tools_inbox, set_current_run +from ..dependencies import InMemorySecrets, get_in_memory_secrets from ..http_spec import LLMSpec from ..models.schemas import LLMInfo, Scan from ..probe_actor import fuzzer @@ -12,7 +21,9 @@ router = APIRouter() @router.post("/verify") -async def verify(info: LLMInfo): +async def verify( + info: LLMInfo, secrets: InMemorySecrets = Depends(get_in_memory_secrets) +): spec = LLMSpec.from_string(info.spec) r = await spec.verify() if r.status_code >= 400: @@ -42,7 +53,11 @@ def streaming_response_generator(scan_parameters: Scan): @router.post("/scan") -async def scan(scan_parameters: Scan, background_tasks: BackgroundTasks): +async def scan( + scan_parameters: Scan, + background_tasks: BackgroundTasks, + secrets: InMemorySecrets = Depends(get_in_memory_secrets), +): return StreamingResponse( streaming_response_generator(scan_parameters), media_type="application/json" ) @@ -62,6 +77,7 @@ async def scan_csv( optimize: bool = Query(False), maxBudget: int = Query(10_000), enableMultiStepAttack: bool = Query(False), + secrets: InMemorySecrets = Depends(get_in_memory_secrets), ): # TODO: content dataset to fuzzer content = await file.read() # noqa diff --git a/agentic_security/test_dependencies.py b/agentic_security/test_dependencies.py new file mode 100644 index 0000000..0ebe78f --- /dev/null +++ b/agentic_security/test_dependencies.py @@ -0,0 +1,15 @@ +from agentic_security.dependencies import InMemorySecrets, get_in_memory_secrets + + +def test_in_memory_secrets(): + secrets = InMemorySecrets() + secrets.set_secret("api_key", "12345") + assert secrets.get_secret("api_key") == "12345" + assert secrets.get_secret("non_existent_key") is None + + +def test_get_in_memory_secrets(): + secrets = get_in_memory_secrets() + assert isinstance(secrets, InMemorySecrets) + secrets.set_secret("token", "abcde") + assert secrets.get_secret("token") == "abcde"