diff --git a/.gitignore b/.gitignore index 4dfe1a2..bbe6c45 100644 --- a/.gitignore +++ b/.gitignore @@ -23,3 +23,5 @@ agentic_security/agents/operator_agno.py .claude/ plan.md auto_loop.sh +agentic_security/static/elm-stuff/ +agentic_security/static/node_modules/ diff --git a/agentic_security/http_spec.py b/agentic_security/http_spec.py index 7aa5645..09b6a0b 100644 --- a/agentic_security/http_spec.py +++ b/agentic_security/http_spec.py @@ -69,7 +69,9 @@ class LLMSpec(BaseModel): return response - def validate(self, prompt: str, encoded_image: str, encoded_audio: str, files: dict | None) -> None: + def validate( + self, prompt: str, encoded_image: str, encoded_audio: str, files: dict | None + ) -> None: if self.has_files and not files: raise ValueError("Files are required for this request.") @@ -108,7 +110,9 @@ class LLMSpec(BaseModel): # Remove Content-Length from headers to avoid mismatch when # placeholder replacement changes body size. httpx will set # the correct Content-Length based on the actual content. - clean_headers = {k: v for k, v in self.headers.items() if k.lower() != "content-length"} + clean_headers = { + k: v for k, v in self.headers.items() if k.lower() != "content-length" + } transport = httpx.AsyncHTTPTransport(retries=settings_var("network.retry", 3)) async with httpx.AsyncClient(transport=transport) as client: @@ -130,7 +134,9 @@ class LLMSpec(BaseModel): return await self.probe( "test", # TODO: fix url for mp3 - encoded_audio=encode_audio_base64_by_url("https://www.example.com/audio.mp3"), + encoded_audio=encode_audio_base64_by_url( + "https://www.example.com/audio.mp3" + ), ) case LLMSpec(has_files=True): return await self._probe_with_files({}) @@ -169,14 +175,18 @@ def parse_http_spec(http_spec: str) -> LLMSpec: # Extract the method and URL from the first line request_line_parts = lines[0].split() if len(request_line_parts) < 2: - raise InvalidHTTPSpecError("First line of HTTP spec must include the method and URL.") + raise InvalidHTTPSpecError( + "First line of HTTP spec must include the method and URL." + ) method, url = request_line_parts[0], request_line_parts[1] # Check url validity valid_url = urlparse(url) # if missing the correct formatting ://, urlparse.netloc will be empty if valid_url.scheme not in ("http", "https") or not valid_url.netloc: - raise InvalidHTTPSpecError(f"Invalid URL: {url}. Ensure it starts with 'http://' or 'https://'") + raise InvalidHTTPSpecError( + f"Invalid URL: {url}. Ensure it starts with 'http://' or 'https://'" + ) # Initialize headers and body headers = {} diff --git a/agentic_security/probe_actor/fuzzer.py b/agentic_security/probe_actor/fuzzer.py index 5056efa..ff7b3e0 100644 --- a/agentic_security/probe_actor/fuzzer.py +++ b/agentic_security/probe_actor/fuzzer.py @@ -114,7 +114,9 @@ async def process_prompt( if response.status_code >= 400: logger.error(f"HTTP {response.status_code} {response.content=}") - fuzzer_state.add_error(module_name, prompt, response.status_code, response.text) + fuzzer_state.add_error( + module_name, prompt, response.status_code, response.text + ) return tokens, True # Process successful response @@ -124,7 +126,9 @@ async def process_prompt( # Check if the response indicates a refusal refused = refusal_heuristic(response.json()) if refused: - fuzzer_state.add_refusal(module_name, prompt, response.status_code, response_text) + fuzzer_state.add_refusal( + module_name, prompt, response.status_code, response_text + ) fuzzer_state.add_output(module_name, prompt, response_text, refused) return tokens, refused @@ -168,7 +172,10 @@ async def process_prompt_batch( - Total number of tokens processed. - Number of failed prompts. """ - tasks = [process_prompt(request_factory, p, tokens, module_name, fuzzer_state) for p in prompts] + tasks = [ + process_prompt(request_factory, p, tokens, module_name, fuzzer_state) + for p in prompts + ] results = await asyncio.gather(*tasks) total_tokens = sum(r[0] for r in results) failures = sum(1 for r in results if r[1]) @@ -212,7 +219,11 @@ async def scan_module( # Initialize optimizer if optimization is enabled optimizer = ( - Optimizer([Real(0, 1)], base_estimator="GP", n_initial_points=INITIAL_OPTIMIZER_POINTS) if optimize else None + Optimizer( + [Real(0, 1)], base_estimator="GP", n_initial_points=INITIAL_OPTIMIZER_POINTS + ) + if optimize + else None ) module_size = 0 if module.lazy else len(module.prompts) @@ -544,7 +555,9 @@ async def perform_many_shot_scan( ).model_dump_json() if optimize and len(failure_rates) >= MIN_FAILURE_SAMPLES: - yield ScanResult.status_msg(f"High failure rate detected ({failure_rate:.2%}). Stopping this module...") + yield ScanResult.status_msg( + f"High failure rate detected ({failure_rate:.2%}). Stopping this module..." + ) break yield ScanResult.status_msg("Scan completed.")