From bd6d2f3db1215769842feb1b8679bf53ef4d1c71 Mon Sep 17 00:00:00 2001 From: Alexander Myasoedov Date: Wed, 12 Mar 2025 19:38:13 +0200 Subject: [PATCH] feat(add state module): --- agentic_security/probe_actor/fuzzer.py | 48 +------------------------- agentic_security/probe_actor/state.py | 47 +++++++++++++++++++++++++ 2 files changed, 48 insertions(+), 47 deletions(-) create mode 100644 agentic_security/probe_actor/state.py diff --git a/agentic_security/probe_actor/fuzzer.py b/agentic_security/probe_actor/fuzzer.py index 088bb70..585b8fa 100644 --- a/agentic_security/probe_actor/fuzzer.py +++ b/agentic_security/probe_actor/fuzzer.py @@ -6,7 +6,6 @@ from json import JSONDecodeError from typing import Any import httpx -import pandas as pd from skopt import Optimizer from skopt.space import Real @@ -15,6 +14,7 @@ from agentic_security.logutils import logger from agentic_security.primitives import Scan, ScanResult from agentic_security.probe_actor.cost_module import calculate_cost from agentic_security.probe_actor.refusal import refusal_heuristic +from agentic_security.probe_actor.state import FuzzerState from agentic_security.probe_data import audio_generator, image_generator, msj_data from agentic_security.probe_data.data import prepare_prompts @@ -26,52 +26,6 @@ MIN_FAILURE_SAMPLES = 5 FAILURE_RATE_THRESHOLD = 0.5 -class FuzzerState: - """Container for tracking scan results""" - - def __init__(self): - self.errors = [] - self.refusals = [] - self.outputs = [] - - def add_error( - self, - module_name: str, - prompt: str, - status_code: int | str, - error_msg: str, - ): - """Add an error to the state""" - self.errors.append((module_name, prompt, status_code, error_msg)) - - def add_refusal( - self, module_name: str, prompt: str, status_code: int, response_text: str - ): - """Add a refusal to the state""" - self.refusals.append((module_name, prompt, status_code, response_text)) - - def add_output( - self, module_name: str, prompt: str, response_text: str, refused: bool - ): - """Add an output to the state""" - self.outputs.append((module_name, prompt, response_text, refused)) - - def get_last_output(self, prompt: str) -> str | None: - """Get the last output for a given prompt""" - for output in reversed(self.outputs): - if output[1] == prompt: - return output[2] - return None - - def export_failures(self, filename: str = "failures.csv"): - """Export failures to a CSV file""" - failure_data = self.errors + self.refusals - df = pd.DataFrame( - failure_data, columns=["module", "prompt", "status_code", "content"] - ) - df.to_csv(filename, index=False) - - async def generate_prompts( prompts: list[str] | AsyncGenerator, ) -> AsyncGenerator[str, None]: diff --git a/agentic_security/probe_actor/state.py b/agentic_security/probe_actor/state.py new file mode 100644 index 0000000..8fba317 --- /dev/null +++ b/agentic_security/probe_actor/state.py @@ -0,0 +1,47 @@ +import pandas as pd + + +class FuzzerState: + """Container for tracking scan results""" + + def __init__(self): + self.errors = [] + self.refusals = [] + self.outputs = [] + + def add_error( + self, + module_name: str, + prompt: str, + status_code: int | str, + error_msg: str, + ): + """Add an error to the state""" + self.errors.append((module_name, prompt, status_code, error_msg)) + + def add_refusal( + self, module_name: str, prompt: str, status_code: int, response_text: str + ): + """Add a refusal to the state""" + self.refusals.append((module_name, prompt, status_code, response_text)) + + def add_output( + self, module_name: str, prompt: str, response_text: str, refused: bool + ): + """Add an output to the state""" + self.outputs.append((module_name, prompt, response_text, refused)) + + def get_last_output(self, prompt: str) -> str | None: + """Get the last output for a given prompt""" + for output in reversed(self.outputs): + if output[1] == prompt: + return output[2] + return None + + def export_failures(self, filename: str = "failures.csv"): + """Export failures to a CSV file""" + failure_data = self.errors + self.refusals + df = pd.DataFrame( + failure_data, columns=["module", "prompt", "status_code", "content"] + ) + df.to_csv(filename, index=False)