feat(add state module):

This commit is contained in:
Alexander Myasoedov
2025-03-12 19:38:13 +02:00
parent dda8d13b72
commit bd6d2f3db1
2 changed files with 48 additions and 47 deletions
+1 -47
View File
@@ -6,7 +6,6 @@ from json import JSONDecodeError
from typing import Any
import httpx
import pandas as pd
from skopt import Optimizer
from skopt.space import Real
@@ -15,6 +14,7 @@ from agentic_security.logutils import logger
from agentic_security.primitives import Scan, ScanResult
from agentic_security.probe_actor.cost_module import calculate_cost
from agentic_security.probe_actor.refusal import refusal_heuristic
from agentic_security.probe_actor.state import FuzzerState
from agentic_security.probe_data import audio_generator, image_generator, msj_data
from agentic_security.probe_data.data import prepare_prompts
@@ -26,52 +26,6 @@ MIN_FAILURE_SAMPLES = 5
FAILURE_RATE_THRESHOLD = 0.5
class FuzzerState:
"""Container for tracking scan results"""
def __init__(self):
self.errors = []
self.refusals = []
self.outputs = []
def add_error(
self,
module_name: str,
prompt: str,
status_code: int | str,
error_msg: str,
):
"""Add an error to the state"""
self.errors.append((module_name, prompt, status_code, error_msg))
def add_refusal(
self, module_name: str, prompt: str, status_code: int, response_text: str
):
"""Add a refusal to the state"""
self.refusals.append((module_name, prompt, status_code, response_text))
def add_output(
self, module_name: str, prompt: str, response_text: str, refused: bool
):
"""Add an output to the state"""
self.outputs.append((module_name, prompt, response_text, refused))
def get_last_output(self, prompt: str) -> str | None:
"""Get the last output for a given prompt"""
for output in reversed(self.outputs):
if output[1] == prompt:
return output[2]
return None
def export_failures(self, filename: str = "failures.csv"):
"""Export failures to a CSV file"""
failure_data = self.errors + self.refusals
df = pd.DataFrame(
failure_data, columns=["module", "prompt", "status_code", "content"]
)
df.to_csv(filename, index=False)
async def generate_prompts(
prompts: list[str] | AsyncGenerator,
) -> AsyncGenerator[str, None]:
+47
View File
@@ -0,0 +1,47 @@
import pandas as pd
class FuzzerState:
"""Container for tracking scan results"""
def __init__(self):
self.errors = []
self.refusals = []
self.outputs = []
def add_error(
self,
module_name: str,
prompt: str,
status_code: int | str,
error_msg: str,
):
"""Add an error to the state"""
self.errors.append((module_name, prompt, status_code, error_msg))
def add_refusal(
self, module_name: str, prompt: str, status_code: int, response_text: str
):
"""Add a refusal to the state"""
self.refusals.append((module_name, prompt, status_code, response_text))
def add_output(
self, module_name: str, prompt: str, response_text: str, refused: bool
):
"""Add an output to the state"""
self.outputs.append((module_name, prompt, response_text, refused))
def get_last_output(self, prompt: str) -> str | None:
"""Get the last output for a given prompt"""
for output in reversed(self.outputs):
if output[1] == prompt:
return output[2]
return None
def export_failures(self, filename: str = "failures.csv"):
"""Export failures to a CSV file"""
failure_data = self.errors + self.refusals
df = pd.DataFrame(
failure_data, columns=["module", "prompt", "status_code", "content"]
)
df.to_csv(filename, index=False)