mirror of
https://github.com/msoedov/agentic_security.git
synced 2026-06-24 06:09:55 +02:00
feat(add scan-csv api route):
This commit is contained in:
@@ -5,6 +5,8 @@ from typing import Protocol
|
||||
class IntegrationProto(Protocol):
|
||||
def __init__(
|
||||
self, prompt_groups: list, tools_inbox: asyncio.Queue, opts: dict = {}
|
||||
): ...
|
||||
):
|
||||
...
|
||||
|
||||
async def apply(self) -> list: ...
|
||||
async def apply(self) -> list:
|
||||
...
|
||||
|
||||
@@ -1,6 +1,6 @@
|
||||
from datetime import datetime
|
||||
|
||||
from fastapi import APIRouter, BackgroundTasks, HTTPException
|
||||
from fastapi import APIRouter, BackgroundTasks, File, HTTPException, Query, UploadFile
|
||||
from fastapi.responses import StreamingResponse
|
||||
|
||||
from ..core.app import get_stop_event, get_tools_inbox, set_current_run
|
||||
@@ -52,3 +52,28 @@ async def scan(scan_parameters: Scan, background_tasks: BackgroundTasks):
|
||||
async def stop_scan():
|
||||
get_stop_event().set()
|
||||
return {"status": "Scan stopped"}
|
||||
|
||||
|
||||
@router.post("/scan-csv")
|
||||
async def scan_csv(
|
||||
background_tasks: BackgroundTasks,
|
||||
file: UploadFile = File(...),
|
||||
llmSpec: UploadFile = File(...),
|
||||
optimize: bool = Query(False),
|
||||
maxBudget: int = Query(10_000),
|
||||
enableMultiStepAttack: bool = Query(False),
|
||||
):
|
||||
# TODO: content dataset to fuzzer
|
||||
content = await file.read()
|
||||
llm_spec = await llmSpec.read()
|
||||
|
||||
scan_parameters = Scan(
|
||||
llmSpec=llm_spec,
|
||||
optimize=optimize,
|
||||
maxBudget=1000,
|
||||
enableMultiStepAttack=enableMultiStepAttack,
|
||||
)
|
||||
|
||||
return StreamingResponse(
|
||||
streaming_response_generator(scan_parameters), media_type="application/json"
|
||||
)
|
||||
|
||||
@@ -0,0 +1,21 @@
|
||||
import agentic_security.test_spec_assets as test_spec_assets
|
||||
from agentic_security.routes.scan import router
|
||||
from fastapi.testclient import TestClient
|
||||
|
||||
client = TestClient(router)
|
||||
|
||||
|
||||
def test_upload_csv_and_run():
|
||||
# Create a sample CSV content
|
||||
csv_content = "id,prompt\nspec1,value1\nspec2,value3"
|
||||
# Send a POST request to the /upload-csv endpoint
|
||||
response = client.post(
|
||||
"/scan-csv?optimize=false&enableMultiStepAttack=false&maxBudget=1000",
|
||||
files={
|
||||
"file": ("test.csv", csv_content, "text/csv"),
|
||||
"llmSpec": ("spec.txt", test_spec_assets.SAMPLE_SPEC, "text/plain"),
|
||||
},
|
||||
)
|
||||
|
||||
assert response.status_code == 200
|
||||
assert "Scan completed." in response.text
|
||||
Reference in New Issue
Block a user