diff --git a/agentic_security/primitives/models.py b/agentic_security/primitives/models.py index b9bdbc9..e0548b9 100644 --- a/agentic_security/primitives/models.py +++ b/agentic_security/primitives/models.py @@ -23,6 +23,8 @@ class Scan(BaseModel): enableMultiStepAttack: bool = False # MSJ only mode probe_datasets: list[dict] = Field(default_factory=list) + # Inline prompts uploaded via CSV (not stored in registry) + inline_datasets: list[dict] = Field(default_factory=list) # Set and managed by the backend secrets: dict[str, str] = Field(default_factory=dict) diff --git a/agentic_security/probe_actor/fuzzer.py b/agentic_security/probe_actor/fuzzer.py index ff7b3e0..f030cf4 100644 --- a/agentic_security/probe_actor/fuzzer.py +++ b/agentic_security/probe_actor/fuzzer.py @@ -17,7 +17,7 @@ from agentic_security.probe_actor.cost_module import calculate_cost from agentic_security.probe_actor.refusal import refusal_heuristic from agentic_security.probe_actor.state import FuzzerState from agentic_security.probe_data import audio_generator, image_generator, msj_data -from agentic_security.probe_data.data import prepare_prompts +from agentic_security.probe_data.data import prepare_prompts, create_probe_dataset MAX_PROMPT_LENGTH = settings_var("fuzzer.max_prompt_lenght", 2048) BUDGET_MULTIPLIER = settings_var("fuzzer.budget_multiplier", 100000000) @@ -352,6 +352,7 @@ async def perform_single_shot_scan( optimize: bool = False, stop_event: asyncio.Event | None = None, secrets: dict[str, str] | None = None, + inline_datasets: list[dict[str, Any]] | None = None, ) -> AsyncGenerator[str, None]: """ Perform a standard security scan using a given request factory. @@ -378,6 +379,7 @@ async def perform_single_shot_scan( """ datasets = datasets or [] secrets = secrets or {} + inline_datasets = inline_datasets or [] if stop_event and stop_event.is_set(): stop_event.clear() yield ScanResult.status_msg("Loading datasets...") @@ -395,6 +397,18 @@ async def perform_single_shot_scan( tools_inbox=tools_inbox, options=[m.get("opts", {}) for m in selected_datasets], ) + + # Append inline (uploaded CSV) datasets + for inline_ds in inline_datasets: + prompts = inline_ds.get("prompts", []) + if prompts: + ds = create_probe_dataset( + inline_ds.get("name", "Uploaded CSV"), + prompts, + {"src": "upload"}, + ) + prompt_modules.append(ds) + yield ScanResult.status_msg("Datasets loaded. Starting scan...") fuzzer_state = FuzzerState() @@ -620,5 +634,6 @@ def scan_router( optimize=scan_parameters.optimize, stop_event=stop_event, secrets=scan_parameters.secrets, + inline_datasets=scan_parameters.inline_datasets, ) ) diff --git a/agentic_security/probe_data/data.py b/agentic_security/probe_data/data.py index 352d4fb..47f7a06 100644 --- a/agentic_security/probe_data/data.py +++ b/agentic_security/probe_data/data.py @@ -297,6 +297,37 @@ def file_dataset(file) -> list[str]: return prompts +def parse_csv_content(content: bytes) -> ProbeDataset: + """Parse uploaded CSV bytes into a ProbeDataset. + + Looks for a 'prompt' column first; falls back to the first text-like column. + """ + df = pd.read_csv(io.BytesIO(content), encoding_errors="ignore") + + prompt_col = None + # Prefer an explicit 'prompt' column + if "prompt" in df.columns: + prompt_col = "prompt" + else: + # Fall back to the first string/object column + for col in df.columns: + if df[col].dtype == object: + prompt_col = col + break + + if prompt_col is None or df[prompt_col].dropna().empty: + raise ValueError( + "Uploaded CSV has no suitable prompt column. " + "Please include a column named 'prompt'." + ) + + prompts = df[prompt_col].dropna().astype(str).tolist() + logger.info( + f"Parsed {len(prompts)} prompts from uploaded CSV (column='{prompt_col}')" + ) + return create_probe_dataset("Uploaded CSV", prompts, {"src": "upload"}) + + def load_local_csv() -> ProbeDataset: """Load prompts from local CSV files.""" os.makedirs("./datasets", exist_ok=True) diff --git a/agentic_security/routes/scan.py b/agentic_security/routes/scan.py index cb4d293..6c4a0bc 100644 --- a/agentic_security/routes/scan.py +++ b/agentic_security/routes/scan.py @@ -20,6 +20,7 @@ from ..dependencies import InMemorySecrets, get_in_memory_secrets from ..http_spec import InvalidHTTPSpecError, LLMSpec from ..primitives import LLMInfo, Scan from ..probe_actor import fuzzer +from ..probe_data.data import parse_csv_content router = APIRouter() @@ -91,15 +92,25 @@ async def scan_csv( enableMultiStepAttack: bool = Query(False), secrets: InMemorySecrets = Depends(get_in_memory_secrets), ) -> StreamingResponse: - # TODO: content dataset to fuzzer - content = await file.read() # noqa + content = await file.read() llm_spec = await llmSpec.read() + # Parse the uploaded CSV into an inline dataset + inline_datasets = [] + try: + dataset = parse_csv_content(content) + inline_datasets.append( + {"name": dataset.dataset_name, "prompts": dataset.prompts} + ) + except ValueError as e: + raise HTTPException(status_code=400, detail=str(e)) from e + scan_parameters = Scan( llmSpec=llm_spec, optimize=optimize, - maxBudget=1000, + maxBudget=maxBudget, enableMultiStepAttack=enableMultiStepAttack, + inline_datasets=inline_datasets, ) scan_parameters.with_secrets(secrets) return StreamingResponse(