mirror of
https://github.com/msoedov/agentic_security.git
synced 2026-06-23 21:59:57 +02:00
Merge pull request #304 from zhanz5/fix/scan-csv-route-implementation
fix: implement scan-csv route to actually use uploaded CSV data
This commit is contained in:
@@ -23,6 +23,8 @@ class Scan(BaseModel):
|
||||
enableMultiStepAttack: bool = False
|
||||
# MSJ only mode
|
||||
probe_datasets: list[dict] = Field(default_factory=list)
|
||||
# Inline prompts uploaded via CSV (not stored in registry)
|
||||
inline_datasets: list[dict] = Field(default_factory=list)
|
||||
# Set and managed by the backend
|
||||
secrets: dict[str, str] = Field(default_factory=dict)
|
||||
|
||||
|
||||
@@ -17,7 +17,7 @@ from agentic_security.probe_actor.cost_module import calculate_cost
|
||||
from agentic_security.probe_actor.refusal import refusal_heuristic
|
||||
from agentic_security.probe_actor.state import FuzzerState
|
||||
from agentic_security.probe_data import audio_generator, image_generator, msj_data
|
||||
from agentic_security.probe_data.data import prepare_prompts
|
||||
from agentic_security.probe_data.data import prepare_prompts, create_probe_dataset
|
||||
|
||||
MAX_PROMPT_LENGTH = settings_var("fuzzer.max_prompt_lenght", 2048)
|
||||
BUDGET_MULTIPLIER = settings_var("fuzzer.budget_multiplier", 100000000)
|
||||
@@ -352,6 +352,7 @@ async def perform_single_shot_scan(
|
||||
optimize: bool = False,
|
||||
stop_event: asyncio.Event | None = None,
|
||||
secrets: dict[str, str] | None = None,
|
||||
inline_datasets: list[dict[str, Any]] | None = None,
|
||||
) -> AsyncGenerator[str, None]:
|
||||
"""
|
||||
Perform a standard security scan using a given request factory.
|
||||
@@ -378,6 +379,7 @@ async def perform_single_shot_scan(
|
||||
"""
|
||||
datasets = datasets or []
|
||||
secrets = secrets or {}
|
||||
inline_datasets = inline_datasets or []
|
||||
if stop_event and stop_event.is_set():
|
||||
stop_event.clear()
|
||||
yield ScanResult.status_msg("Loading datasets...")
|
||||
@@ -395,6 +397,18 @@ async def perform_single_shot_scan(
|
||||
tools_inbox=tools_inbox,
|
||||
options=[m.get("opts", {}) for m in selected_datasets],
|
||||
)
|
||||
|
||||
# Append inline (uploaded CSV) datasets
|
||||
for inline_ds in inline_datasets:
|
||||
prompts = inline_ds.get("prompts", [])
|
||||
if prompts:
|
||||
ds = create_probe_dataset(
|
||||
inline_ds.get("name", "Uploaded CSV"),
|
||||
prompts,
|
||||
{"src": "upload"},
|
||||
)
|
||||
prompt_modules.append(ds)
|
||||
|
||||
yield ScanResult.status_msg("Datasets loaded. Starting scan...")
|
||||
|
||||
fuzzer_state = FuzzerState()
|
||||
@@ -620,5 +634,6 @@ def scan_router(
|
||||
optimize=scan_parameters.optimize,
|
||||
stop_event=stop_event,
|
||||
secrets=scan_parameters.secrets,
|
||||
inline_datasets=scan_parameters.inline_datasets,
|
||||
)
|
||||
)
|
||||
|
||||
@@ -297,6 +297,37 @@ def file_dataset(file) -> list[str]:
|
||||
return prompts
|
||||
|
||||
|
||||
def parse_csv_content(content: bytes) -> ProbeDataset:
    """Parse uploaded CSV bytes into a ProbeDataset.

    The prompt column is chosen in this order of preference:

    1. A column named exactly ``prompt``.
    2. A column whose header equals ``prompt`` once surrounding whitespace
       is trimmed and case is ignored (e.g. ``Prompt``).
    3. The first text column (``object`` or pandas string dtype).

    Missing cells in the chosen column are dropped and the remaining values
    are converted to ``str``.

    Args:
        content: Raw bytes of the uploaded CSV file. Bytes that cannot be
            decoded are ignored while reading.

    Returns:
        The dataset produced by ``create_probe_dataset`` for the extracted
        prompts, named "Uploaded CSV" and tagged with ``{"src": "upload"}``.

    Raises:
        ValueError: If no suitable column exists, or the chosen column has
            no non-missing values. Errors raised by ``pd.read_csv`` itself
            propagate unchanged.
    """
    df = pd.read_csv(io.BytesIO(content), encoding_errors="ignore")

    def _is_text_dtype(dtype) -> bool:
        # Text columns are ``object`` on older pandas but may be inferred as
        # the dedicated string dtype on newer versions; accept both.
        return dtype == object or isinstance(dtype, pd.StringDtype)

    prompt_col = None
    if "prompt" in df.columns:
        # Prefer an explicit 'prompt' column
        prompt_col = "prompt"
    else:
        # Tolerate headers such as 'Prompt' or ' prompt' before guessing
        prompt_col = next(
            (col for col in df.columns if str(col).strip().lower() == "prompt"),
            None,
        )
        if prompt_col is None:
            # Fall back to the first text column
            prompt_col = next(
                (col for col in df.columns if _is_text_dtype(df[col].dtype)),
                None,
            )

    # Drop missing cells once and reuse the result for both the emptiness
    # check and the final prompt list.
    values = df[prompt_col].dropna() if prompt_col is not None else None
    if values is None or values.empty:
        raise ValueError(
            "Uploaded CSV has no suitable prompt column. "
            "Please include a column named 'prompt'."
        )

    prompts = values.astype(str).tolist()
    logger.info(
        f"Parsed {len(prompts)} prompts from uploaded CSV (column='{prompt_col}')"
    )
    return create_probe_dataset("Uploaded CSV", prompts, {"src": "upload"})
|
||||
|
||||
|
||||
def load_local_csv() -> ProbeDataset:
|
||||
"""Load prompts from local CSV files."""
|
||||
os.makedirs("./datasets", exist_ok=True)
|
||||
|
||||
@@ -20,6 +20,7 @@ from ..dependencies import InMemorySecrets, get_in_memory_secrets
|
||||
from ..http_spec import InvalidHTTPSpecError, LLMSpec
|
||||
from ..primitives import LLMInfo, Scan
|
||||
from ..probe_actor import fuzzer
|
||||
from ..probe_data.data import parse_csv_content
|
||||
|
||||
router = APIRouter()
|
||||
|
||||
@@ -91,15 +92,25 @@ async def scan_csv(
|
||||
enableMultiStepAttack: bool = Query(False),
|
||||
secrets: InMemorySecrets = Depends(get_in_memory_secrets),
|
||||
) -> StreamingResponse:
|
||||
# TODO: content dataset to fuzzer
|
||||
content = await file.read() # noqa
|
||||
content = await file.read()
|
||||
llm_spec = await llmSpec.read()
|
||||
|
||||
# Parse the uploaded CSV into an inline dataset
|
||||
inline_datasets = []
|
||||
try:
|
||||
dataset = parse_csv_content(content)
|
||||
inline_datasets.append(
|
||||
{"name": dataset.dataset_name, "prompts": dataset.prompts}
|
||||
)
|
||||
except ValueError as e:
|
||||
raise HTTPException(status_code=400, detail=str(e)) from e
|
||||
|
||||
scan_parameters = Scan(
|
||||
llmSpec=llm_spec,
|
||||
optimize=optimize,
|
||||
maxBudget=1000,
|
||||
maxBudget=maxBudget,
|
||||
enableMultiStepAttack=enableMultiStepAttack,
|
||||
inline_datasets=inline_datasets,
|
||||
)
|
||||
scan_parameters.with_secrets(secrets)
|
||||
return StreamingResponse(
|
||||
|
||||
Reference in New Issue
Block a user