diff --git a/Readme.md b/Readme.md index 1069f32..20f355e 100644 --- a/Readme.md +++ b/Readme.md @@ -102,6 +102,7 @@ To add your own dataset you can place one or multiples csv files with `prompt` c ## Run as CI check Init config + ```shell agentic_security init @@ -110,6 +111,7 @@ agentic_security init ``` default config sample + ```toml [general] @@ -151,6 +153,7 @@ high = 0.5 ``` List module + ```shell agentic_security ls @@ -196,6 +199,7 @@ Threshold: 30.0% Summary: Total Passing: 2/2 (100.0%) ``` + ## Extending dataset collections 1. Add new metadata to agentic_security.probe_data.REGISTRY diff --git a/agentic_security/http_spec.py b/agentic_security/http_spec.py index d8e6f9e..5ac75b2 100644 --- a/agentic_security/http_spec.py +++ b/agentic_security/http_spec.py @@ -1,9 +1,17 @@ import base64 +from enum import Enum import httpx from pydantic import BaseModel +class Modality(Enum): + TEXT = 0 + IMAGE = 1 + AUDIO = 2 + FILES = 3 + + def encode_image_base64_by_url(url: str = "https://github.com/fluidicon.png") -> str: """Encode image data to base64 from a URL""" response = httpx.get(url) @@ -110,6 +118,14 @@ class LLMSpec(BaseModel): fn = probe + @property + def modality(self) -> Modality: + if self.has_image: + return Modality.IMAGE + if self.has_audio: + return Modality.AUDIO + return Modality.TEXT + def parse_http_spec(http_spec: str) -> LLMSpec: """Parses an HTTP specification string into a LLMSpec object. diff --git a/agentic_security/probe_actor/fuzzer.py b/agentic_security/probe_actor/fuzzer.py index 2b77e95..a461759 100644 --- a/agentic_security/probe_actor/fuzzer.py +++ b/agentic_security/probe_actor/fuzzer.py @@ -8,16 +8,17 @@ from loguru import logger from skopt import Optimizer from skopt.space import Real +from agentic_security.http_spec import Modality from agentic_security.models.schemas import Scan, ScanResult from agentic_security.probe_actor.refusal import refusal_heuristic -from agentic_security.probe_data import msj_data +from agentic_security.probe_data import audio_generator, image_generator, msj_data from agentic_security.probe_data.data import prepare_prompts # TODO: full log file async def generate_prompts( - prompts: list[str] | AsyncGenerator, + prompts: list[str] | AsyncGenerator, modality: Modality = Modality.TEXT ) -> AsyncGenerator[str, None]: if isinstance(prompts, list): for prompt in prompts: @@ -101,7 +102,9 @@ async def perform_single_shot_scan( module_size = 0 if module.lazy else len(module.prompts) logger.info(f"Scanning {module.dataset_name} {module_size}") - async for prompt in generate_prompts(module.prompts): + async for prompt in generate_prompts( + module.prompts, modality=request_factory.modality + ): if stop_event and stop_event.is_set(): stop_event.clear() logger.info("Scan stopped by user.") @@ -212,7 +215,9 @@ async def perform_many_shot_scan( module_size = 0 if module.lazy else len(module.prompts) logger.info(f"Scanning {module.dataset_name} {module_size}") - async for prompt in generate_prompts(module.prompts): + async for prompt in generate_prompts( + module.prompts, modality=request_factory.modality + ): if stop_event and stop_event.is_set(): stop_event.clear() logger.info("Scan stopped by user.") diff --git a/agentic_security/test_lib.py b/agentic_security/test_lib.py index a8afddb..946ac57 100644 --- a/agentic_security/test_lib.py +++ b/agentic_security/test_lib.py @@ -124,6 +124,32 @@ class TestAS: print(result) assert len(result) in [0, 1] + def _test_image_modality(self): + llmSpec = test_spec_assets.IMAGE_SPEC + maxBudget = 2 + max_th = 0.3 + datasets = [ + { + "dataset_name": "AgenticBackend", + "num_prompts": 0, + "tokens": 0, + "approx_cost": 0.0, + "source": "Fine-tuned cloud hosted model", + "selected": True, + "url": "", + "dynamic": True, + "opts": { + "port": 9094, + "modules": ["encoding"], + }, + "modality": "text", + }, + ] + result = AgenticSecurity.scan(llmSpec, maxBudget, datasets, max_th) + assert isinstance(result, dict) + print(result) + assert len(result) in [0, 1] + class TestEntrypointCI: def test_generate_default_cfg_to_tmp_path(self):