codex quality run #1

This commit is contained in:
Alexander Myasoedov
2025-12-10 23:06:40 +02:00
parent bf628db5c4
commit 5285fdd0a0
5 changed files with 79 additions and 35 deletions
+24 -7
View File
@@ -69,7 +69,9 @@ class LLMSpec(BaseModel):
return response
def validate(self, prompt, encoded_image, encoded_audio, files) -> None:
def validate(
self, prompt: str, encoded_image: str, encoded_audio: str, files: dict | None
) -> None:
if self.has_files and not files:
raise ValueError("Files are required for this request.")
@@ -80,7 +82,11 @@ class LLMSpec(BaseModel):
raise ValueError("Audio is required for this request.")
async def probe(
self, prompt: str, encoded_image: str = "", encoded_audio: str = "", files={}
self,
prompt: str,
encoded_image: str = "",
encoded_audio: str = "",
files: dict | None = None,
) -> httpx.Response:
"""Sends an HTTP request using the `httpx` library.
@@ -155,10 +161,17 @@ def parse_http_spec(http_spec: str) -> LLMSpec:
secrets = get_secrets()
# Split the spec by lines
lines = http_spec.strip().split("\n")
lines = http_spec.strip("\n").splitlines()
if not lines:
raise InvalidHTTPSpecError("HTTP spec is empty.")
# Extract the method and URL from the first line
method, url = lines[0].split(" ")[0:2]
request_line_parts = lines[0].split()
if len(request_line_parts) < 2:
raise InvalidHTTPSpecError(
"First line of HTTP spec must include the method and URL."
)
method, url = request_line_parts[0], request_line_parts[1]
# Check url validity
valid_url = urlparse(url)
@@ -170,13 +183,16 @@ def parse_http_spec(http_spec: str) -> LLMSpec:
# Initialize headers and body
headers = {}
body = ""
body_lines: list[str] = []
# Iterate over the remaining lines
reading_headers = True
for line in lines[1:]:
if line.strip() == "":
reading_headers = False
if reading_headers:
reading_headers = False
continue
body_lines.append("")
continue
if reading_headers:
@@ -189,7 +205,8 @@ def parse_http_spec(http_spec: str) -> LLMSpec:
raise InvalidHTTPSpecError("Header name cannot be empty.")
headers[key] = value
else:
body += line
body_lines.append(line)
body = "\n".join(body_lines)
has_files = "multipart/form-data" in headers.get("Content-Type", "")
has_image = "<<BASE64_IMAGE>>" in body
has_audio = "<<BASE64_AUDIO>>" in body
+13 -7
View File
@@ -1,4 +1,5 @@
import asyncio
import copy
import json
from datetime import datetime
@@ -29,12 +30,14 @@ class SecurityScanner(SettingsMixin):
cls,
llmSpec: str,
maxBudget: int,
datasets: list[dict],
datasets: list[dict] | None,
max_th: float,
optimize: bool = False,
enableMultiStepAttack: bool = False,
probe_datasets: list[dict] = [],
probe_datasets: list[dict] | None = None,
):
datasets = copy.deepcopy(datasets) if datasets is not None else []
probe_datasets = copy.deepcopy(probe_datasets or [])
start_time = datetime.now()
total_modules = len(datasets)
completed_modules = 0
@@ -170,15 +173,18 @@ class SecurityScanner(SettingsMixin):
cls,
llmSpec: str,
maxBudget: int = 1_000_000,
datasets: list[dict] = REGISTRY,
datasets: list[dict] | None = None,
max_th: float = 0.3,
optimize: bool = False,
enableMultiStepAttack: bool = False,
probe_datasets: list[dict] = [],
only: list[str] = [],
probe_datasets: list[dict] | None = None,
only: list[str] | None = None,
):
if only:
datasets = [d for d in datasets if d["dataset_name"] in only]
datasets = copy.deepcopy(datasets or REGISTRY)
probe_datasets = copy.deepcopy(probe_datasets or [])
only_set = set(only) if only else None
if only_set is not None:
datasets = [d for d in datasets if d.get("dataset_name") in only_set]
for d in datasets:
d["selected"] = True
return asyncio.run(
+3 -3
View File
@@ -18,13 +18,13 @@ class LLMInfo(BaseModel):
class Scan(BaseModel):
llmSpec: str
maxBudget: int
datasets: list[dict] = []
datasets: list[dict] = Field(default_factory=list)
optimize: bool = False
enableMultiStepAttack: bool = False
# MSJ only mode
probe_datasets: list[dict] = []
probe_datasets: list[dict] = Field(default_factory=list)
# Set and managed by the backend
secrets: dict[str, str] = {}
secrets: dict[str, str] = Field(default_factory=dict)
def with_secrets(self, secrets) -> "Scan":
match secrets:
+38 -17
View File
@@ -22,8 +22,8 @@ from agentic_security.probe_data.data import prepare_prompts
MAX_PROMPT_LENGTH = settings_var("fuzzer.max_prompt_lenght", 2048)
BUDGET_MULTIPLIER = settings_var("fuzzer.budget_multiplier", 100000000)
INITIAL_OPTIMIZER_POINTS = settings_var("fuzzer.initial_optimizer_points", 25)
MIN_FAILURE_SAMPLES = settings_var("min_failure_samples", 5)
FAILURE_RATE_THRESHOLD = settings_var("failure_rate_threshold", 0.5)
MIN_FAILURE_SAMPLES = settings_var("fuzzer.min_failure_samples", 5)
FAILURE_RATE_THRESHOLD = settings_var("fuzzer.failure_rate_threshold", 0.5)
async def generate_prompts(
@@ -186,9 +186,9 @@ async def scan_module(
processed_prompts: int = 0,
total_prompts: int = 0,
max_budget: int = 0,
total_tokens: int = 0,
optimize: bool = False,
stop_event: asyncio.Event | None = None,
token_counter: dict[str, int] | None = None,
) -> AsyncGenerator[dict[str, Any], None]:
"""
Scan a single module.
@@ -200,7 +200,7 @@ async def scan_module(
processed_prompts: Number of prompts processed so far
total_prompts: Total number of prompts to process
max_budget: Maximum token budget
total_tokens: Current token count
token_counter: Shared token counter to enforce global budget
optimize: Whether to use optimization
stop_event: Event to stop scanning
@@ -208,6 +208,7 @@ async def scan_module(
ScanResult objects as the scan progresses
"""
tokens = 0
token_counter = token_counter or {"total": 0}
module_failures = 0
module_prompts = 0
failure_rates = []
@@ -249,9 +250,9 @@ async def scan_module(
progress = 100 * processed_prompts / total_prompts if total_prompts else 0
progress = progress % 100
total_tokens -= tokens
start = time.time()
previous_tokens = tokens
tokens, failed = await process_prompt(
request_factory,
prompt,
@@ -261,7 +262,8 @@ async def scan_module(
)
end = time.time()
total_tokens += tokens
token_delta = max(tokens - previous_tokens, 0)
token_counter["total"] += token_delta
if failed:
module_failures += 1
@@ -296,12 +298,14 @@ async def scan_module(
break
# Budget check
if total_tokens > max_budget:
if token_counter["total"] > max_budget:
logger.info(
f"Scan ran out of budget and stopped. {total_tokens=} {max_budget=}"
"Scan ran out of budget and stopped. %s %s",
token_counter["total"],
max_budget,
)
yield ScanResult.status_msg(
f"Scan ran out of budget and stopped. {total_tokens=} {max_budget=}"
f"Scan ran out of budget and stopped. total_tokens={token_counter['total']} max_budget={max_budget}"
)
should_stop = True
break
@@ -340,11 +344,11 @@ async def with_error_handling(agen):
async def perform_single_shot_scan(
request_factory,
max_budget: int,
datasets: list[dict[str, str]] = [],
datasets: list[dict[str, str]] | None = None,
tools_inbox=None,
optimize: bool = False,
stop_event: asyncio.Event | None = None,
secrets: dict[str, str] = {},
secrets: dict[str, str] | None = None,
) -> AsyncGenerator[str, None]:
"""
Perform a standard security scan using a given request factory.
@@ -369,8 +373,16 @@ async def perform_single_shot_scan(
failure statistics and token usage. If the scan exceeds the budget or failure rate is too high,
it stops execution. Results are saved to a CSV file upon completion.
"""
datasets = datasets or []
secrets = secrets or {}
if stop_event and stop_event.is_set():
stop_event.clear()
yield ScanResult.status_msg("Loading datasets...")
yield ScanResult.status_msg("Scan stopped by user.")
yield ScanResult.status_msg("Scan completed.")
return
max_budget = max_budget * BUDGET_MULTIPLIER
selected_datasets = [m for m in datasets if m["selected"]]
selected_datasets = [m for m in datasets if m.get("selected")]
request_factory = get_modality_adapter(request_factory)
yield ScanResult.status_msg("Loading datasets...")
@@ -386,7 +398,7 @@ async def perform_single_shot_scan(
total_prompts = sum(len(m.prompts) for m in prompt_modules if not m.lazy)
processed_prompts = 0
total_tokens = 0
token_counter = {"total": 0}
for module in prompt_modules:
module_gen = scan_module(
request_factory=request_factory,
@@ -395,9 +407,9 @@ async def perform_single_shot_scan(
processed_prompts=processed_prompts,
total_prompts=total_prompts,
max_budget=max_budget,
total_tokens=total_tokens,
optimize=optimize,
stop_event=stop_event,
token_counter=token_counter,
)
try:
async for result in module_gen:
@@ -416,14 +428,14 @@ async def perform_single_shot_scan(
async def perform_many_shot_scan(
request_factory,
max_budget: int,
datasets: list[dict[str, str]] = [],
probe_datasets: list[dict[str, str]] = [],
datasets: list[dict[str, str]] | None = None,
probe_datasets: list[dict[str, str]] | None = None,
tools_inbox=None,
optimize: bool = False,
stop_event: asyncio.Event | None = None,
probe_frequency: float = 0.2,
max_ctx_length: int = 10_000,
secrets: dict[str, str] = {},
secrets: dict[str, str] | None = None,
) -> AsyncGenerator[str, None]:
"""
Perform a multi-step security scan with probe injection.
@@ -451,6 +463,15 @@ async def perform_many_shot_scan(
processes them asynchronously, and tracks failure rates. If failure rates exceed a threshold
or budget is exhausted, the scan is stopped early. Results are saved to a CSV file upon completion.
"""
datasets = datasets or []
probe_datasets = probe_datasets or []
secrets = secrets or {}
if stop_event and stop_event.is_set():
stop_event.clear()
yield ScanResult.status_msg("Loading datasets...")
yield ScanResult.status_msg("Scan stopped by user.")
yield ScanResult.status_msg("Scan completed.")
return
request_factory = get_modality_adapter(request_factory)
# Load main and probe datasets
yield ScanResult.status_msg("Loading datasets...")
+1 -1
View File
@@ -3,7 +3,7 @@
set -euo pipefail
PROMPT="Ultrathink. You're a principal engineer. Do not ask me any questions. We need to improve the quality of this codebase. Implement improvements to codebase quality."
MAX_ITERS=200
MAX_ITERS=5
MAX_EMPTY_RUNS=5
CODEX_MODEL="gpt-5.1-codex-max"