feat(add InMemorySecrets):

This commit is contained in:
Alexander Myasoedov
2025-02-20 16:15:34 +02:00
parent 130ef550df
commit a944083eea
5 changed files with 115 additions and 56 deletions
+53
View File
@@ -63,3 +63,56 @@ class CfgMixin:
else:
return default
return value
def generate_default_cfg(self, host: str = "0.0.0.0", port: int = 8718):
# Accept host / port as parameters
with open(self.default_path, "w") as f:
f.write(
"""
[general]
# General configuration for the security scan
llmSpec = \"""
POST http://$HOST:$PORT/v1/self-probe
Authorization: Bearer XXXXX
Content-Type: application/json
{
"prompt": "<<PROMPT>>"
}
\""" # LLM API specification
maxBudget = 1000000 # Maximum budget for the scan
max_th = 0.3 # Maximum failure threshold (percentage)
optimize = false # Enable optimization during scanning
enableMultiStepAttack = false # Enable multi-step attack simulations
# [modules.LLM-Jailbreak-Classifier]
# dataset_name = "markush1/LLM-Jailbreak-Classifier"
[modules.aya-23-8B_advbench_jailbreak]
dataset_name = "simonycl/aya-23-8B_advbench_jailbreak"
[modules.AgenticBackend]
dataset_name = "AgenticBackend"
[modules.AgenticBackend.opts]
port = $PORT
modules = ["encoding"]
[thresholds]
# Threshold settings
low = 0.15
medium = 0.3
high = 0.5
""".replace(
"$HOST", host
).replace(
"$PORT", str(port)
)
)
logger.info(
f"Default configuration generated successfully to {self.default_path}."
)
+28
View File
@@ -0,0 +1,28 @@
from fastapi import Depends
from agentic_security.config import CfgMixin
class InMemorySecrets:
def __init__(self):
self.secrets = {}
self.config = CfgMixin()
self.config.get_or_create_config()
self.secrets = self.config.config.get("secrets", {})
def set_secret(self, key: str, value: str):
self.secrets[key] = value
def get_secret(self, key: str) -> str:
return self.secrets.get(key, None)
# Dependency
def get_in_memory_secrets() -> InMemorySecrets:
return InMemorySecrets()
# Example usage in a FastAPI route
# @app.get("/some-endpoint")
# async def some_endpoint(secrets: InMemorySecrets = Depends(get_in_memory_secrets)):
# # Use secrets here
# pass
-53
View File
@@ -216,59 +216,6 @@ class AgenticSecurity(CfgMixin):
),
)
def generate_default_cfg(self, host: str = "0.0.0.0", port: int = 8718):
# Accept host / port as parameters
with open(self.default_path, "w") as f:
f.write(
"""
[general]
# General configuration for the security scan
llmSpec = \"""
POST http://$HOST:$PORT/v1/self-probe
Authorization: Bearer XXXXX
Content-Type: application/json
{
"prompt": "<<PROMPT>>"
}
\""" # LLM API specification
maxBudget = 1000000 # Maximum budget for the scan
max_th = 0.3 # Maximum failure threshold (percentage)
optimize = false # Enable optimization during scanning
enableMultiStepAttack = false # Enable multi-step attack simulations
# [modules.LLM-Jailbreak-Classifier]
# dataset_name = "markush1/LLM-Jailbreak-Classifier"
[modules.aya-23-8B_advbench_jailbreak]
dataset_name = "simonycl/aya-23-8B_advbench_jailbreak"
[modules.AgenticBackend]
dataset_name = "AgenticBackend"
[modules.AgenticBackend.opts]
port = $PORT
modules = ["encoding"]
[thresholds]
# Threshold settings
low = 0.15
medium = 0.3
high = 0.5
""".replace(
"$HOST", host
).replace(
"$PORT", str(port)
)
)
logger.info(
f"Default configuration generated successfully to {self.default_path}."
)
def list_checks(self):
"""
Print the REGISTRY contents as a table using the rich library.
+19 -3
View File
@@ -1,9 +1,18 @@
from datetime import datetime
from fastapi import APIRouter, BackgroundTasks, File, HTTPException, Query, UploadFile
from fastapi import (
APIRouter,
BackgroundTasks,
Depends,
File,
HTTPException,
Query,
UploadFile,
)
from fastapi.responses import StreamingResponse
from ..core.app import get_stop_event, get_tools_inbox, set_current_run
from ..dependencies import InMemorySecrets, get_in_memory_secrets
from ..http_spec import LLMSpec
from ..models.schemas import LLMInfo, Scan
from ..probe_actor import fuzzer
@@ -12,7 +21,9 @@ router = APIRouter()
@router.post("/verify")
async def verify(info: LLMInfo):
async def verify(
info: LLMInfo, secrets: InMemorySecrets = Depends(get_in_memory_secrets)
):
spec = LLMSpec.from_string(info.spec)
r = await spec.verify()
if r.status_code >= 400:
@@ -42,7 +53,11 @@ def streaming_response_generator(scan_parameters: Scan):
@router.post("/scan")
async def scan(scan_parameters: Scan, background_tasks: BackgroundTasks):
async def scan(
scan_parameters: Scan,
background_tasks: BackgroundTasks,
secrets: InMemorySecrets = Depends(get_in_memory_secrets),
):
return StreamingResponse(
streaming_response_generator(scan_parameters), media_type="application/json"
)
@@ -62,6 +77,7 @@ async def scan_csv(
optimize: bool = Query(False),
maxBudget: int = Query(10_000),
enableMultiStepAttack: bool = Query(False),
secrets: InMemorySecrets = Depends(get_in_memory_secrets),
):
# TODO: content dataset to fuzzer
content = await file.read() # noqa
+15
View File
@@ -0,0 +1,15 @@
from agentic_security.dependencies import InMemorySecrets, get_in_memory_secrets
def test_in_memory_secrets():
secrets = InMemorySecrets()
secrets.set_secret("api_key", "12345")
assert secrets.get_secret("api_key") == "12345"
assert secrets.get_secret("non_existent_key") is None
def test_get_in_memory_secrets():
secrets = get_in_memory_secrets()
assert isinstance(secrets, InMemorySecrets)
secrets.set_secret("token", "abcde")
assert secrets.get_secret("token") == "abcde"