mirror of
https://github.com/msoedov/agentic_security.git
synced 2026-06-23 21:59:57 +02:00
fix(fmt):
This commit is contained in:
@@ -23,3 +23,5 @@ agentic_security/agents/operator_agno.py
|
||||
.claude/
|
||||
plan.md
|
||||
auto_loop.sh
|
||||
agentic_security/static/elm-stuff/
|
||||
agentic_security/static/node_modules/
|
||||
|
||||
@@ -69,7 +69,9 @@ class LLMSpec(BaseModel):
|
||||
|
||||
return response
|
||||
|
||||
def validate(self, prompt: str, encoded_image: str, encoded_audio: str, files: dict | None) -> None:
|
||||
def validate(
|
||||
self, prompt: str, encoded_image: str, encoded_audio: str, files: dict | None
|
||||
) -> None:
|
||||
if self.has_files and not files:
|
||||
raise ValueError("Files are required for this request.")
|
||||
|
||||
@@ -108,7 +110,9 @@ class LLMSpec(BaseModel):
|
||||
# Remove Content-Length from headers to avoid mismatch when
|
||||
# placeholder replacement changes body size. httpx will set
|
||||
# the correct Content-Length based on the actual content.
|
||||
clean_headers = {k: v for k, v in self.headers.items() if k.lower() != "content-length"}
|
||||
clean_headers = {
|
||||
k: v for k, v in self.headers.items() if k.lower() != "content-length"
|
||||
}
|
||||
|
||||
transport = httpx.AsyncHTTPTransport(retries=settings_var("network.retry", 3))
|
||||
async with httpx.AsyncClient(transport=transport) as client:
|
||||
@@ -130,7 +134,9 @@ class LLMSpec(BaseModel):
|
||||
return await self.probe(
|
||||
"test",
|
||||
# TODO: fix url for mp3
|
||||
encoded_audio=encode_audio_base64_by_url("https://www.example.com/audio.mp3"),
|
||||
encoded_audio=encode_audio_base64_by_url(
|
||||
"https://www.example.com/audio.mp3"
|
||||
),
|
||||
)
|
||||
case LLMSpec(has_files=True):
|
||||
return await self._probe_with_files({})
|
||||
@@ -169,14 +175,18 @@ def parse_http_spec(http_spec: str) -> LLMSpec:
|
||||
# Extract the method and URL from the first line
|
||||
request_line_parts = lines[0].split()
|
||||
if len(request_line_parts) < 2:
|
||||
raise InvalidHTTPSpecError("First line of HTTP spec must include the method and URL.")
|
||||
raise InvalidHTTPSpecError(
|
||||
"First line of HTTP spec must include the method and URL."
|
||||
)
|
||||
method, url = request_line_parts[0], request_line_parts[1]
|
||||
|
||||
# Check url validity
|
||||
valid_url = urlparse(url)
|
||||
# if missing the correct formatting ://, urlparse.netloc will be empty
|
||||
if valid_url.scheme not in ("http", "https") or not valid_url.netloc:
|
||||
raise InvalidHTTPSpecError(f"Invalid URL: {url}. Ensure it starts with 'http://' or 'https://'")
|
||||
raise InvalidHTTPSpecError(
|
||||
f"Invalid URL: {url}. Ensure it starts with 'http://' or 'https://'"
|
||||
)
|
||||
|
||||
# Initialize headers and body
|
||||
headers = {}
|
||||
|
||||
@@ -114,7 +114,9 @@ async def process_prompt(
|
||||
|
||||
if response.status_code >= 400:
|
||||
logger.error(f"HTTP {response.status_code} {response.content=}")
|
||||
fuzzer_state.add_error(module_name, prompt, response.status_code, response.text)
|
||||
fuzzer_state.add_error(
|
||||
module_name, prompt, response.status_code, response.text
|
||||
)
|
||||
return tokens, True
|
||||
|
||||
# Process successful response
|
||||
@@ -124,7 +126,9 @@ async def process_prompt(
|
||||
# Check if the response indicates a refusal
|
||||
refused = refusal_heuristic(response.json())
|
||||
if refused:
|
||||
fuzzer_state.add_refusal(module_name, prompt, response.status_code, response_text)
|
||||
fuzzer_state.add_refusal(
|
||||
module_name, prompt, response.status_code, response_text
|
||||
)
|
||||
|
||||
fuzzer_state.add_output(module_name, prompt, response_text, refused)
|
||||
return tokens, refused
|
||||
@@ -168,7 +172,10 @@ async def process_prompt_batch(
|
||||
- Total number of tokens processed.
|
||||
- Number of failed prompts.
|
||||
"""
|
||||
tasks = [process_prompt(request_factory, p, tokens, module_name, fuzzer_state) for p in prompts]
|
||||
tasks = [
|
||||
process_prompt(request_factory, p, tokens, module_name, fuzzer_state)
|
||||
for p in prompts
|
||||
]
|
||||
results = await asyncio.gather(*tasks)
|
||||
total_tokens = sum(r[0] for r in results)
|
||||
failures = sum(1 for r in results if r[1])
|
||||
@@ -212,7 +219,11 @@ async def scan_module(
|
||||
|
||||
# Initialize optimizer if optimization is enabled
|
||||
optimizer = (
|
||||
Optimizer([Real(0, 1)], base_estimator="GP", n_initial_points=INITIAL_OPTIMIZER_POINTS) if optimize else None
|
||||
Optimizer(
|
||||
[Real(0, 1)], base_estimator="GP", n_initial_points=INITIAL_OPTIMIZER_POINTS
|
||||
)
|
||||
if optimize
|
||||
else None
|
||||
)
|
||||
|
||||
module_size = 0 if module.lazy else len(module.prompts)
|
||||
@@ -544,7 +555,9 @@ async def perform_many_shot_scan(
|
||||
).model_dump_json()
|
||||
|
||||
if optimize and len(failure_rates) >= MIN_FAILURE_SAMPLES:
|
||||
yield ScanResult.status_msg(f"High failure rate detected ({failure_rate:.2%}). Stopping this module...")
|
||||
yield ScanResult.status_msg(
|
||||
f"High failure rate detected ({failure_rate:.2%}). Stopping this module..."
|
||||
)
|
||||
break
|
||||
|
||||
yield ScanResult.status_msg("Scan completed.")
|
||||
|
||||
Reference in New Issue
Block a user