feat(add InMemorySecrets to fuzzer):

This commit is contained in:
Alexander Myasoedov
2025-02-20 16:24:52 +02:00
parent da50a48061
commit 1ce59151f3
4 changed files with 18 additions and 3 deletions
-2
View File
@@ -1,5 +1,3 @@
from fastapi import Depends
from agentic_security.config import CfgMixin
+12
View File
@@ -23,6 +23,18 @@ class Scan(BaseModel):
enableMultiStepAttack: bool = False
# MSJ only mode
probe_datasets: list[dict] = []
# Set and managed by the backend
secrets: dict[str, str] = {}
def with_secrets(self, secrets) -> "Scan":
match secrets:
case dict():
self.secrets.update(secrets)
case obj if hasattr(obj, "secrets"):
self.secrets.update(obj.secrets)
case _:
raise ValueError("Invalid secrets type")
return self
class ScanResult(BaseModel):
+4
View File
@@ -84,6 +84,7 @@ async def perform_single_shot_scan(
tools_inbox=None,
optimize=False,
stop_event: asyncio.Event = None,
secrets: dict[str, str] = {},
) -> AsyncGenerator[str, None]:
"""Perform a standard security scan."""
max_budget = max_budget * 100_000_000
@@ -218,6 +219,7 @@ async def perform_many_shot_scan(
stop_event: asyncio.Event = None,
probe_frequency: float = 0.2,
max_ctx_length: int = 10_000,
secrets: dict[str, str] = {},
) -> AsyncGenerator[str, None]:
"""Perform a multi-step security scan with probe injection."""
request_factory = multi_modality_spec(request_factory)
@@ -346,6 +348,7 @@ def scan_router(
tools_inbox=tools_inbox,
optimize=scan_parameters.optimize,
stop_event=stop_event,
secrets=scan_parameters.secrets,
)
else:
return perform_single_shot_scan(
@@ -355,4 +358,5 @@ def scan_router(
tools_inbox=tools_inbox,
optimize=scan_parameters.optimize,
stop_event=stop_event,
secrets=scan_parameters.secrets,
)
+2 -1
View File
@@ -58,6 +58,7 @@ async def scan(
background_tasks: BackgroundTasks,
secrets: InMemorySecrets = Depends(get_in_memory_secrets),
):
scan_parameters.with_secrets(secrets)
return StreamingResponse(
streaming_response_generator(scan_parameters), media_type="application/json"
)
@@ -89,7 +90,7 @@ async def scan_csv(
maxBudget=1000,
enableMultiStepAttack=enableMultiStepAttack,
)
scan_parameters.with_secrets(secrets)
return StreamingResponse(
streaming_response_generator(scan_parameters), media_type="application/json"
)