diff --git a/agentic_security/http_spec.py b/agentic_security/http_spec.py index 84f7f93..d59a398 100644 --- a/agentic_security/http_spec.py +++ b/agentic_security/http_spec.py @@ -69,7 +69,9 @@ class LLMSpec(BaseModel): return response - def validate(self, prompt, encoded_image, encoded_audio, files) -> None: + def validate( + self, prompt: str, encoded_image: str, encoded_audio: str, files: dict | None + ) -> None: if self.has_files and not files: raise ValueError("Files are required for this request.") @@ -80,7 +82,11 @@ class LLMSpec(BaseModel): raise ValueError("Audio is required for this request.") async def probe( - self, prompt: str, encoded_image: str = "", encoded_audio: str = "", files={} + self, + prompt: str, + encoded_image: str = "", + encoded_audio: str = "", + files: dict | None = None, ) -> httpx.Response: """Sends an HTTP request using the `httpx` library. @@ -155,10 +161,17 @@ def parse_http_spec(http_spec: str) -> LLMSpec: secrets = get_secrets() # Split the spec by lines - lines = http_spec.strip().split("\n") + lines = http_spec.strip("\n").splitlines() + if not lines: + raise InvalidHTTPSpecError("HTTP spec is empty.") # Extract the method and URL from the first line - method, url = lines[0].split(" ")[0:2] + request_line_parts = lines[0].split() + if len(request_line_parts) < 2: + raise InvalidHTTPSpecError( + "First line of HTTP spec must include the method and URL." + ) + method, url = request_line_parts[0], request_line_parts[1] # Check url validity valid_url = urlparse(url) @@ -170,13 +183,16 @@ def parse_http_spec(http_spec: str) -> LLMSpec: # Initialize headers and body headers = {} - body = "" + body_lines: list[str] = [] # Iterate over the remaining lines reading_headers = True for line in lines[1:]: if line.strip() == "": - reading_headers = False + if reading_headers: + reading_headers = False + continue + body_lines.append("") continue if reading_headers: @@ -189,7 +205,8 @@ def parse_http_spec(http_spec: str) -> LLMSpec: raise InvalidHTTPSpecError("Header name cannot be empty.") headers[key] = value else: - body += line + body_lines.append(line) + body = "\n".join(body_lines) has_files = "multipart/form-data" in headers.get("Content-Type", "") has_image = "<>" in body has_audio = "<>" in body diff --git a/agentic_security/lib.py b/agentic_security/lib.py index e94a394..6113389 100644 --- a/agentic_security/lib.py +++ b/agentic_security/lib.py @@ -1,4 +1,5 @@ import asyncio +import copy import json from datetime import datetime @@ -29,12 +30,14 @@ class SecurityScanner(SettingsMixin): cls, llmSpec: str, maxBudget: int, - datasets: list[dict], + datasets: list[dict] | None, max_th: float, optimize: bool = False, enableMultiStepAttack: bool = False, - probe_datasets: list[dict] = [], + probe_datasets: list[dict] | None = None, ): + datasets = copy.deepcopy(datasets) if datasets is not None else [] + probe_datasets = copy.deepcopy(probe_datasets or []) start_time = datetime.now() total_modules = len(datasets) completed_modules = 0 @@ -170,15 +173,18 @@ class SecurityScanner(SettingsMixin): cls, llmSpec: str, maxBudget: int = 1_000_000, - datasets: list[dict] = REGISTRY, + datasets: list[dict] | None = None, max_th: float = 0.3, optimize: bool = False, enableMultiStepAttack: bool = False, - probe_datasets: list[dict] = [], - only: list[str] = [], + probe_datasets: list[dict] | None = None, + only: list[str] | None = None, ): - if only: - datasets = [d for d in datasets if d["dataset_name"] in only] + datasets = copy.deepcopy(datasets or REGISTRY) + probe_datasets = copy.deepcopy(probe_datasets or []) + only_set = set(only) if only else None + if only_set is not None: + datasets = [d for d in datasets if d.get("dataset_name") in only_set] for d in datasets: d["selected"] = True return asyncio.run( diff --git a/agentic_security/primitives/models.py b/agentic_security/primitives/models.py index c3eb16f..b9bdbc9 100644 --- a/agentic_security/primitives/models.py +++ b/agentic_security/primitives/models.py @@ -18,13 +18,13 @@ class LLMInfo(BaseModel): class Scan(BaseModel): llmSpec: str maxBudget: int - datasets: list[dict] = [] + datasets: list[dict] = Field(default_factory=list) optimize: bool = False enableMultiStepAttack: bool = False # MSJ only mode - probe_datasets: list[dict] = [] + probe_datasets: list[dict] = Field(default_factory=list) # Set and managed by the backend - secrets: dict[str, str] = {} + secrets: dict[str, str] = Field(default_factory=dict) def with_secrets(self, secrets) -> "Scan": match secrets: diff --git a/agentic_security/probe_actor/fuzzer.py b/agentic_security/probe_actor/fuzzer.py index 2d71785..60b3a39 100644 --- a/agentic_security/probe_actor/fuzzer.py +++ b/agentic_security/probe_actor/fuzzer.py @@ -22,8 +22,8 @@ from agentic_security.probe_data.data import prepare_prompts MAX_PROMPT_LENGTH = settings_var("fuzzer.max_prompt_lenght", 2048) BUDGET_MULTIPLIER = settings_var("fuzzer.budget_multiplier", 100000000) INITIAL_OPTIMIZER_POINTS = settings_var("fuzzer.initial_optimizer_points", 25) -MIN_FAILURE_SAMPLES = settings_var("min_failure_samples", 5) -FAILURE_RATE_THRESHOLD = settings_var("failure_rate_threshold", 0.5) +MIN_FAILURE_SAMPLES = settings_var("fuzzer.min_failure_samples", 5) +FAILURE_RATE_THRESHOLD = settings_var("fuzzer.failure_rate_threshold", 0.5) async def generate_prompts( @@ -186,9 +186,9 @@ async def scan_module( processed_prompts: int = 0, total_prompts: int = 0, max_budget: int = 0, - total_tokens: int = 0, optimize: bool = False, stop_event: asyncio.Event | None = None, + token_counter: dict[str, int] | None = None, ) -> AsyncGenerator[dict[str, Any], None]: """ Scan a single module. @@ -200,7 +200,7 @@ async def scan_module( processed_prompts: Number of prompts processed so far total_prompts: Total number of prompts to process max_budget: Maximum token budget - total_tokens: Current token count + token_counter: Shared token counter to enforce global budget optimize: Whether to use optimization stop_event: Event to stop scanning @@ -208,6 +208,7 @@ async def scan_module( ScanResult objects as the scan progresses """ tokens = 0 + token_counter = token_counter or {"total": 0} module_failures = 0 module_prompts = 0 failure_rates = [] @@ -249,9 +250,9 @@ async def scan_module( progress = 100 * processed_prompts / total_prompts if total_prompts else 0 progress = progress % 100 - total_tokens -= tokens start = time.time() + previous_tokens = tokens tokens, failed = await process_prompt( request_factory, prompt, @@ -261,7 +262,8 @@ async def scan_module( ) end = time.time() - total_tokens += tokens + token_delta = max(tokens - previous_tokens, 0) + token_counter["total"] += token_delta if failed: module_failures += 1 @@ -296,12 +298,14 @@ async def scan_module( break # Budget check - if total_tokens > max_budget: + if token_counter["total"] > max_budget: logger.info( - f"Scan ran out of budget and stopped. {total_tokens=} {max_budget=}" + "Scan ran out of budget and stopped. %s %s", + token_counter["total"], + max_budget, ) yield ScanResult.status_msg( - f"Scan ran out of budget and stopped. {total_tokens=} {max_budget=}" + f"Scan ran out of budget and stopped. total_tokens={token_counter['total']} max_budget={max_budget}" ) should_stop = True break @@ -340,11 +344,11 @@ async def with_error_handling(agen): async def perform_single_shot_scan( request_factory, max_budget: int, - datasets: list[dict[str, str]] = [], + datasets: list[dict[str, str]] | None = None, tools_inbox=None, optimize: bool = False, stop_event: asyncio.Event | None = None, - secrets: dict[str, str] = {}, + secrets: dict[str, str] | None = None, ) -> AsyncGenerator[str, None]: """ Perform a standard security scan using a given request factory. @@ -369,8 +373,16 @@ async def perform_single_shot_scan( failure statistics and token usage. If the scan exceeds the budget or failure rate is too high, it stops execution. Results are saved to a CSV file upon completion. """ + datasets = datasets or [] + secrets = secrets or {} + if stop_event and stop_event.is_set(): + stop_event.clear() + yield ScanResult.status_msg("Loading datasets...") + yield ScanResult.status_msg("Scan stopped by user.") + yield ScanResult.status_msg("Scan completed.") + return max_budget = max_budget * BUDGET_MULTIPLIER - selected_datasets = [m for m in datasets if m["selected"]] + selected_datasets = [m for m in datasets if m.get("selected")] request_factory = get_modality_adapter(request_factory) yield ScanResult.status_msg("Loading datasets...") @@ -386,7 +398,7 @@ async def perform_single_shot_scan( total_prompts = sum(len(m.prompts) for m in prompt_modules if not m.lazy) processed_prompts = 0 - total_tokens = 0 + token_counter = {"total": 0} for module in prompt_modules: module_gen = scan_module( request_factory=request_factory, @@ -395,9 +407,9 @@ async def perform_single_shot_scan( processed_prompts=processed_prompts, total_prompts=total_prompts, max_budget=max_budget, - total_tokens=total_tokens, optimize=optimize, stop_event=stop_event, + token_counter=token_counter, ) try: async for result in module_gen: @@ -416,14 +428,14 @@ async def perform_single_shot_scan( async def perform_many_shot_scan( request_factory, max_budget: int, - datasets: list[dict[str, str]] = [], - probe_datasets: list[dict[str, str]] = [], + datasets: list[dict[str, str]] | None = None, + probe_datasets: list[dict[str, str]] | None = None, tools_inbox=None, optimize: bool = False, stop_event: asyncio.Event | None = None, probe_frequency: float = 0.2, max_ctx_length: int = 10_000, - secrets: dict[str, str] = {}, + secrets: dict[str, str] | None = None, ) -> AsyncGenerator[str, None]: """ Perform a multi-step security scan with probe injection. @@ -451,6 +463,15 @@ async def perform_many_shot_scan( processes them asynchronously, and tracks failure rates. If failure rates exceed a threshold or budget is exhausted, the scan is stopped early. Results are saved to a CSV file upon completion. """ + datasets = datasets or [] + probe_datasets = probe_datasets or [] + secrets = secrets or {} + if stop_event and stop_event.is_set(): + stop_event.clear() + yield ScanResult.status_msg("Loading datasets...") + yield ScanResult.status_msg("Scan stopped by user.") + yield ScanResult.status_msg("Scan completed.") + return request_factory = get_modality_adapter(request_factory) # Load main and probe datasets yield ScanResult.status_msg("Loading datasets...") diff --git a/auto_loop.sh b/auto_loop.sh index eace117..353a13d 100644 --- a/auto_loop.sh +++ b/auto_loop.sh @@ -3,7 +3,7 @@ set -euo pipefail PROMPT="Ultrathink. You're a principal engineer. Do not ask me any questions. We need to improve the quality of this codebase. Implement improvements to codebase quality." -MAX_ITERS=200 +MAX_ITERS=5 MAX_EMPTY_RUNS=5 CODEX_MODEL="gpt-5.1-codex-max"