mirror of
https://github.com/msoedov/agentic_security.git
synced 2026-06-23 21:59:57 +02:00
feat(multi modaility):
This commit is contained in:
@@ -102,6 +102,7 @@ To add your own dataset you can place one or multiples csv files with `prompt` c
|
||||
## Run as CI check
|
||||
|
||||
Init config
|
||||
|
||||
```shell
|
||||
agentic_security init
|
||||
|
||||
@@ -110,6 +111,7 @@ agentic_security init
|
||||
```
|
||||
|
||||
default config sample
|
||||
|
||||
```toml
|
||||
|
||||
[general]
|
||||
@@ -151,6 +153,7 @@ high = 0.5
|
||||
```
|
||||
|
||||
List module
|
||||
|
||||
```shell
|
||||
agentic_security ls
|
||||
|
||||
@@ -196,6 +199,7 @@ Threshold: 30.0%
|
||||
Summary:
|
||||
Total Passing: 2/2 (100.0%)
|
||||
```
|
||||
|
||||
## Extending dataset collections
|
||||
|
||||
1. Add new metadata to agentic_security.probe_data.REGISTRY
|
||||
|
||||
@@ -1,9 +1,17 @@
|
||||
import base64
|
||||
from enum import Enum
|
||||
|
||||
import httpx
|
||||
from pydantic import BaseModel
|
||||
|
||||
|
||||
class Modality(Enum):
|
||||
TEXT = 0
|
||||
IMAGE = 1
|
||||
AUDIO = 2
|
||||
FILES = 3
|
||||
|
||||
|
||||
def encode_image_base64_by_url(url: str = "https://github.com/fluidicon.png") -> str:
|
||||
"""Encode image data to base64 from a URL"""
|
||||
response = httpx.get(url)
|
||||
@@ -110,6 +118,14 @@ class LLMSpec(BaseModel):
|
||||
|
||||
fn = probe
|
||||
|
||||
@property
|
||||
def modality(self) -> Modality:
|
||||
if self.has_image:
|
||||
return Modality.IMAGE
|
||||
if self.has_audio:
|
||||
return Modality.AUDIO
|
||||
return Modality.TEXT
|
||||
|
||||
|
||||
def parse_http_spec(http_spec: str) -> LLMSpec:
|
||||
"""Parses an HTTP specification string into a LLMSpec object.
|
||||
|
||||
@@ -8,16 +8,17 @@ from loguru import logger
|
||||
from skopt import Optimizer
|
||||
from skopt.space import Real
|
||||
|
||||
from agentic_security.http_spec import Modality
|
||||
from agentic_security.models.schemas import Scan, ScanResult
|
||||
from agentic_security.probe_actor.refusal import refusal_heuristic
|
||||
from agentic_security.probe_data import msj_data
|
||||
from agentic_security.probe_data import audio_generator, image_generator, msj_data
|
||||
from agentic_security.probe_data.data import prepare_prompts
|
||||
|
||||
# TODO: full log file
|
||||
|
||||
|
||||
async def generate_prompts(
|
||||
prompts: list[str] | AsyncGenerator,
|
||||
prompts: list[str] | AsyncGenerator, modality: Modality = Modality.TEXT
|
||||
) -> AsyncGenerator[str, None]:
|
||||
if isinstance(prompts, list):
|
||||
for prompt in prompts:
|
||||
@@ -101,7 +102,9 @@ async def perform_single_shot_scan(
|
||||
module_size = 0 if module.lazy else len(module.prompts)
|
||||
logger.info(f"Scanning {module.dataset_name} {module_size}")
|
||||
|
||||
async for prompt in generate_prompts(module.prompts):
|
||||
async for prompt in generate_prompts(
|
||||
module.prompts, modality=request_factory.modality
|
||||
):
|
||||
if stop_event and stop_event.is_set():
|
||||
stop_event.clear()
|
||||
logger.info("Scan stopped by user.")
|
||||
@@ -212,7 +215,9 @@ async def perform_many_shot_scan(
|
||||
module_size = 0 if module.lazy else len(module.prompts)
|
||||
logger.info(f"Scanning {module.dataset_name} {module_size}")
|
||||
|
||||
async for prompt in generate_prompts(module.prompts):
|
||||
async for prompt in generate_prompts(
|
||||
module.prompts, modality=request_factory.modality
|
||||
):
|
||||
if stop_event and stop_event.is_set():
|
||||
stop_event.clear()
|
||||
logger.info("Scan stopped by user.")
|
||||
|
||||
@@ -124,6 +124,32 @@ class TestAS:
|
||||
print(result)
|
||||
assert len(result) in [0, 1]
|
||||
|
||||
def _test_image_modality(self):
|
||||
llmSpec = test_spec_assets.IMAGE_SPEC
|
||||
maxBudget = 2
|
||||
max_th = 0.3
|
||||
datasets = [
|
||||
{
|
||||
"dataset_name": "AgenticBackend",
|
||||
"num_prompts": 0,
|
||||
"tokens": 0,
|
||||
"approx_cost": 0.0,
|
||||
"source": "Fine-tuned cloud hosted model",
|
||||
"selected": True,
|
||||
"url": "",
|
||||
"dynamic": True,
|
||||
"opts": {
|
||||
"port": 9094,
|
||||
"modules": ["encoding"],
|
||||
},
|
||||
"modality": "text",
|
||||
},
|
||||
]
|
||||
result = AgenticSecurity.scan(llmSpec, maxBudget, datasets, max_th)
|
||||
assert isinstance(result, dict)
|
||||
print(result)
|
||||
assert len(result) in [0, 1]
|
||||
|
||||
|
||||
class TestEntrypointCI:
|
||||
def test_generate_default_cfg_to_tmp_path(self):
|
||||
|
||||
Reference in New Issue
Block a user