Compare commits

...

61 Commits

Author SHA1 Message Date
Alexander Myasoedov 85d5c67047 feat(bump version): 2025-03-12 13:42:59 +02:00
Alexander Myasoedov b4e0aef33f Merge branch 'main' of github.com:msoedov/agentic_security 2025-03-12 13:41:56 +02:00
Alexander Myasoedov 3a3ac4421c fix(clean up): 2025-03-12 13:41:24 +02:00
Alexander Myasoedov d6ccd8f114 Merge pull request #180 from nemanjaASE/issue-168-missing-documentation
Add missing documentation in report_chart.py
2025-03-12 13:39:56 +02:00
nemanjaASE 41ecc3c8e2 Add missing documentation in report_chart.py 2025-03-12 12:30:30 +01:00
Alexander Myasoedov a5f8da4139 Merge pull request #178 from nemanjaASE/issue-169-incomplete-error-handling
Add error handling in report_chart.py
2025-03-12 12:53:07 +02:00
Alexander Myasoedov 9b47b72bb8 fix(link): 2025-03-12 12:39:54 +02:00
Alexander Myasoedov 91ff3aa11f feat(add demo gif): 2025-03-12 12:38:32 +02:00
nemanjaASE 87264b793b Fix error handling 2025-03-12 11:38:23 +01:00
Alexander Myasoedov 2bc093cfc4 feat(add gif to repo): 2025-03-12 12:37:36 +02:00
Alexander Myasoedov 182b67de57 Merge branch 'main' of github.com:msoedov/agentic_security 2025-03-12 12:36:22 +02:00
Alexander Myasoedov 35540d0299 fix(link): 2025-03-12 12:34:52 +02:00
Alexander Myasoedov 50ce80b149 Merge pull request #179 from msoedov/dependabot/pip/datasets-3.3.2
build(deps): bump datasets from 3.3.0 to 3.3.2
2025-03-12 11:55:37 +02:00
dependabot[bot] 07772e9779 build(deps): bump datasets from 3.3.0 to 3.3.2
Bumps [datasets](https://github.com/huggingface/datasets) from 3.3.0 to 3.3.2.
- [Release notes](https://github.com/huggingface/datasets/releases)
- [Commits](https://github.com/huggingface/datasets/compare/3.3.0...3.3.2)

---
updated-dependencies:
- dependency-name: datasets
  dependency-type: direct:production
  update-type: version-update:semver-patch
...

Signed-off-by: dependabot[bot] <support@github.com>
2025-03-11 17:34:30 +00:00
Alexander Myasoedov c7a6d16e00 Merge branch 'main' of github.com:msoedov/agentic_security 2025-03-11 14:35:33 +02:00
Alexander Myasoedov fe71ab0b70 fix(AttributeError: 'NoneType' object has no attribute 'values'): 2025-03-11 14:34:41 +02:00
Alexander Myasoedov b6643d3289 Merge pull request #176 from msoedov/dependabot/pip/mkdocstrings-0.29.0
build(deps-dev): bump mkdocstrings from 0.28.2 to 0.29.0
2025-03-11 12:03:49 +02:00
nemanjaASE 3d6018302e Add error handling in report_chart.py 2025-03-11 00:01:33 +01:00
dependabot[bot] a54b11318a build(deps-dev): bump mkdocstrings from 0.28.2 to 0.29.0
Bumps [mkdocstrings](https://github.com/mkdocstrings/mkdocstrings) from 0.28.2 to 0.29.0.
- [Release notes](https://github.com/mkdocstrings/mkdocstrings/releases)
- [Changelog](https://github.com/mkdocstrings/mkdocstrings/blob/main/CHANGELOG.md)
- [Commits](https://github.com/mkdocstrings/mkdocstrings/compare/0.28.2...0.29.0)

---
updated-dependencies:
- dependency-name: mkdocstrings
  dependency-type: direct:development
  update-type: version-update:semver-minor
...

Signed-off-by: dependabot[bot] <support@github.com>
2025-03-10 18:09:28 +00:00
Alexander Myasoedov 91c99e642f fix(pc): 2025-03-10 13:25:17 +02:00
Alexander Myasoedov 1226e2059d Merge branch 'main' of github.com:msoedov/agentic_security 2025-03-10 13:24:42 +02:00
Alexander Myasoedov c94aa54e41 Merge pull request #172 from nemanjaASE/issue-157-error-handling
Added error handling for subprocess.run calls by logging errors and raising AudioGenerationError.
2025-03-10 13:23:20 +02:00
Alexander Myasoedov 431bf84956 Merge pull request #171 from sjay8/main
Added url formatting check
2025-03-10 13:21:10 +02:00
sjay8 9213b64de0 spacing 2025-03-09 14:44:42 -07:00
sjay8 9d2885f7a6 spacing 2025-03-09 14:40:35 -07:00
sjay8 4c0a57dbae fixed spacing issues-4 2025-03-09 14:28:47 -07:00
nemanjaASE 6e6abae680 Added error handling for subprocess.run calls by logging errors and raising AudioGenerationError.
Ensured cleanup of temporary files even if an error occurs.
2025-03-09 22:26:07 +01:00
sjay8 fa7bd104a0 fixed spacing issues-2 2025-03-09 14:25:45 -07:00
sjay8 321c3dafc0 fixed spacing issues 2025-03-09 14:22:13 -07:00
sjay8 eb14fe0f61 typo fixed 2025-03-09 13:55:11 -07:00
sjay8 b5985bf810 formatting in comment 2025-03-09 13:53:53 -07:00
sjay8 29362aed30 Fixed issue 137: ensuring http/https with :// is present 2025-03-09 13:48:32 -07:00
Alexander Myasoedov c2671fd6f1 fix(minor docs update): 2025-03-09 21:23:13 +02:00
Alexander Myasoedov 56984c7078 fix(mcp server): 2025-03-09 21:23:02 +02:00
Alexander Myasoedov f9dce3f915 fix(typo in base.js): 2025-03-09 20:43:08 +02:00
Alexander Myasoedov 6e99dfd54f feat(add toast): 2025-03-09 17:37:02 +02:00
Alexander Myasoedov 67f7381ace feat(add new theme): 2025-03-09 17:14:00 +02:00
Alexander Myasoedov e2cb909329 refactor(data module): 2025-03-09 17:10:14 +02:00
Alexander Myasoedov dbef9fcc03 fix(pytest workers): 2025-03-09 17:07:37 +02:00
Alexander Myasoedov 4a07411fbd fix(pc): 2025-03-09 16:46:09 +02:00
Alexander Myasoedov 11951c0ca0 Merge branch 'main' of github.com:msoedov/agentic_security 2025-03-09 16:45:39 +02:00
Alexander Myasoedov 02222b75f8 Merge pull request #162 from nemanjaASE/issue-158-missing-type-annotations
Added Missing Type Annotations
2025-03-09 16:43:40 +02:00
Alexander Myasoedov aa27817f94 Merge pull request #164 from Praveenk8051/feat/parallel-tests-execution-pydist
chore: update pytest configuration to enable parallel test execution
2025-03-09 16:41:43 +02:00
nemanjaASE 5a4b5e11b2 Fix: Remove unused imports with pycln 2025-03-09 15:21:50 +01:00
Praveenk8051 8bd76b9f05 chore: update pytest configuration to enable parallel test execution 2025-03-09 14:15:44 +01:00
Praveenk8051 6f3c522d59 chore: update poetry.lock and pyproject.toml to include pytest-xdist 2025-03-09 14:01:50 +01:00
nemanjaASE 71787c6ec9 Add type annotations to functions and methods for improved clarity and maintainabiliy 2025-03-09 13:50:18 +01:00
Alexander Myasoedov f911912c0f Merge branch 'main' of github.com:msoedov/agentic_security 2025-03-09 12:33:03 +02:00
Alexander Myasoedov 896ca95ae2 Merge pull request #161 from dogakantarci/move-tests-from-routes-to-tests-div
fix: move routes/test_* to /tests/routes
2025-03-09 12:32:44 +02:00
doa f85c77d622 fix: move routes/test_* to /tests/routes 2025-03-08 23:46:09 +03:00
Alexander Myasoedov 684ba0b70d Merge pull request #160 from dogakantarci/move-test-files
Moved test files from agentic_security.probe_actor to tests.probe_actor
2025-03-08 20:00:18 +02:00
doa 21b43b18e7 Moved test files from agentic_security.probe_actor to tests.probe_actor 2025-03-08 20:18:54 +03:00
Alexander Myasoedov 9340fa7223 Merge branch 'main' of github.com:msoedov/agentic_security 2025-03-08 18:14:06 +02:00
Alexander Myasoedov d20c1a3d0d Merge pull request #159 from dogakantarci/move-test-model
fix: issue #156 - move test_model.py file to /tests directory
2025-03-08 18:10:06 +02:00
Alexander Myasoedov 812f9e7619 fix(SecurityScanner): 2025-03-08 18:02:02 +02:00
doa ebac62e21a fix: issue #156 - move test file to tests dir 2025-03-08 19:01:57 +03:00
Alexander Myasoedov ce2a791663 fix(conflicting logger conf): 2025-03-08 17:54:21 +02:00
Alexander Myasoedov db994fd483 fix(logging): 2025-03-08 17:51:44 +02:00
Alexander Myasoedov 7fe93e8a14 feat(re org tests): 2025-03-08 12:38:54 +02:00
Alexander Myasoedov a934ee243e fix(logger name): 2025-03-08 12:36:38 +02:00
Alexander Myasoedov d646ecd61b feat(add logutils): 2025-03-08 12:35:16 +02:00
50 changed files with 938 additions and 595 deletions
+1
View File
@@ -17,3 +17,4 @@ inv/
scripts/
docx/
agentic_security.toml
/venv
+1 -2
View File
@@ -83,8 +83,7 @@ agentic_security --port=PORT --host=HOST
## UI 🧙
<img width="100%" alt="booking-screen" src="https://res.cloudinary.com/dq0w2rtm9/image/upload/v1736433557/z0bsyzhsqlgcr3w4ovwp.gif">
<img width="100%" alt="booking-screen" src="https://res.cloudinary.com/dq0w2rtm9/image/upload/v1741192668/final_aa9jhb.gif">
<img width="100%" alt="booking-screen" src="https://raw.githubusercontent.com/msoedov/agentic_security/refs/heads/main/docs/images/demo.gif">
## LLM kwargs
+2 -2
View File
@@ -1,3 +1,3 @@
from .lib import AgenticSecurity
from .lib import SecurityScanner
__all__ = ["AgenticSecurity"]
__all__ = ["SecurityScanner"]
+4 -4
View File
@@ -5,7 +5,7 @@ import fire
import uvicorn
from agentic_security.app import app
from agentic_security.lib import AgenticSecurity
from agentic_security.lib import SecurityScanner
from agentic_security.misc.banner import init_banner
@@ -32,14 +32,14 @@ class CLI:
Run Agentic Security in CI mode.
"""
sys.path.append(os.path.dirname("."))
AgenticSecurity().entrypoint()
SecurityScanner().entrypoint()
def init(self, host: str = "0.0.0.0", port: int = 8718):
"""
Generate the default CI configuration file.
"""
sys.path.append(os.path.dirname("."))
AgenticSecurity().generate_default_settings(host, port)
SecurityScanner().generate_default_settings(host, port)
i = init
@@ -48,7 +48,7 @@ class CLI:
List all available security checks.
"""
sys.path.append(os.path.dirname("."))
AgenticSecurity().list_checks()
SecurityScanner().list_checks()
def main():
+1 -3
View File
@@ -1,5 +1,4 @@
import asyncio
import logging
import os
from typing import Any
@@ -10,12 +9,11 @@ from pydantic import BaseModel, ConfigDict, Field
# Assuming LLMSpec is defined elsewhere; placeholder import
from agentic_security.http_spec import LLMSpec
from agentic_security.logutils import logger
LLM_SPECS = [] # Populate with LLM spec strings if needed
# Configure logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
# Define AgentSpecification model
+1 -5
View File
@@ -1,5 +1,4 @@
import asyncio
import logging
from typing import Any
import httpx
@@ -8,13 +7,10 @@ from pydantic_ai import Agent, RunContext, Tool
# Assuming LLMSpec is defined elsewhere; placeholder import
from agentic_security.http_spec import LLMSpec
from agentic_security.logutils import logger
LLM_SPECS = [] # Populate this list with LLM spec strings if needed
# Configure logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
# Define AgentSpecification model
class AgentSpecification(BaseModel):
+2 -1
View File
@@ -1,7 +1,8 @@
from functools import lru_cache
import tomli
from loguru import logger
from agentic_security.logutils import logger
SETTINGS_VERSION = 1
+10 -6
View File
@@ -4,10 +4,14 @@ from asyncio import Event, Queue
from fastapi import FastAPI
from fastapi.responses import ORJSONResponse
from agentic_security.http_spec import LLMSpec
tools_inbox: Queue = Queue()
stop_event: Event = Event()
current_run: str = {"spec": "", "id": ""}
_secrets = {}
_secrets: dict[str, str] = {}
current_run: dict[str, int | LLMSpec] = {"spec": "", "id": ""}
def create_app() -> FastAPI:
@@ -26,29 +30,29 @@ def get_stop_event() -> Event:
return stop_event
def get_current_run() -> str:
def get_current_run() -> dict[str, int | LLMSpec]:
"""Get the current run id."""
return current_run
def set_current_run(spec):
def set_current_run(spec: LLMSpec) -> dict[str, int | LLMSpec]:
"""Set the current run id."""
current_run["id"] = hash(id(spec))
current_run["spec"] = spec
return current_run
def get_secrets():
def get_secrets() -> dict[str, str]:
return _secrets
def set_secrets(secrets):
def set_secrets(secrets: dict[str, str]) -> dict[str, str]:
_secrets.update(secrets)
expand_secrets(_secrets)
return _secrets
def expand_secrets(secrets):
def expand_secrets(secrets: dict[str, str]) -> None:
for key in secrets:
val = secrets[key]
if val.startswith("$"):
+2 -23
View File
@@ -1,26 +1,5 @@
from logging import config
from agentic_security.logutils import set_log_level_to_info
def setup_logging():
config.dictConfig(
{
"version": 1,
"disable_existing_loggers": True,
"handlers": {
"console": {
"class": "logging.StreamHandler",
},
},
"root": {
"handlers": ["console"],
"level": "INFO",
},
"loggers": {
"uvicorn.access": {
"level": "ERROR", # Set higher log level to suppress info logs globally
"handlers": ["console"],
"propagate": False,
}
},
}
)
return set_log_level_to_info()
+9
View File
@@ -1,5 +1,6 @@
import base64
from enum import Enum
from urllib.parse import urlparse
import httpx
from pydantic import BaseModel
@@ -159,6 +160,14 @@ def parse_http_spec(http_spec: str) -> LLMSpec:
# Extract the method and URL from the first line
method, url = lines[0].split(" ")[0:2]
# Check url validity
valid_url = urlparse(url)
# if missing the correct formatting ://, urlparse.netloc will be empty
if valid_url.scheme not in ("http", "https") or not valid_url.netloc:
raise InvalidHTTPSpecError(
f"Invalid URL: {url}. Ensure it starts with 'http://' or 'https://'"
)
# Initialize headers and body
headers = {}
body = ""
+3 -3
View File
@@ -4,12 +4,12 @@ from datetime import datetime
import colorama
import tqdm.asyncio
from loguru import logger
from rich.console import Console
from rich.table import Table
from tabulate import tabulate
from agentic_security.config import SettingsMixin # Importing the configuration mixin
from agentic_security.logutils import logger
from agentic_security.primitives import Scan
from agentic_security.probe_data import REGISTRY
from agentic_security.routes.scan import streaming_response_generator
@@ -23,7 +23,7 @@ YELLOW = colorama.Fore.YELLOW
BLUE = colorama.Fore.BLUE
class AgenticSecurity(SettingsMixin):
class SecurityScanner(SettingsMixin):
@classmethod
async def async_scan(
cls,
@@ -202,7 +202,7 @@ class AgenticSecurity(SettingsMixin):
self.load_config(self.default_path)
logger.info("Configuration loaded successfully.")
print(self.config)
datasets = list(self.get_config_value("modules").values())
datasets = list(self.get_config_value("modules", {}).values())
for d in datasets:
d["selected"] = True
self.scan(
+85
View File
@@ -0,0 +1,85 @@
# import sys
# from loguru import logger
# # Define custom colors
# BLUE = "#89CFF0"
# BROWN = "#8B4513" # Brown for DEBUG
# # Define custom log level colors
# logger.level("DEBUG", color=f"<fg {BROWN}>")
# logger.level("INFO", color=f"<fg {BLUE}>")
# # Define custom log format with aligned messages and colored levels
# LOG_FORMAT = (
# "<level>{level:<8}</level> " # Properly formatted and colored log level
# "<level>{message:<100}</level> " # Left-aligned message for readability
# "<cyan>{file.name}</cyan>:<cyan>{line}</cyan>" # File name and line number in cyan
# )
# # Remove default handlers and add a new one with custom formatting
# logger.remove()
# logger.add(sys.stdout, format=LOG_FORMAT, level="DEBUG", colorize=True)
import logging
import logging.config
from os import getenv
LOGGER_NAME = None
LOGGING_CONFIG = {
"version": 1,
"disable_existing_loggers": False,
"formatters": {
"rich": {"format": "%(message)s", "datefmt": "[%X]"},
},
"handlers": {
"rich": {
"class": "rich.logging.RichHandler",
"level": "INFO",
"formatter": "rich",
"show_time": False,
"rich_tracebacks": False,
"show_path": lambda: True if getenv("API_RUNTIME") == "dev" else False,
"tracebacks_show_locals": False,
},
},
"loggers": {
"": { # Root logger configuration
"level": "INFO",
"handlers": ["rich"],
"propagate": True,
},
},
}
def configure_logging():
# Apply the dictionary configuration
logging.config.dictConfig(LOGGING_CONFIG)
# Get and return the logger
logger = logging.getLogger(LOGGER_NAME)
return logger
logger: logging.Logger = configure_logging()
def set_log_level_to_debug():
logger = logging.getLogger(LOGGER_NAME)
logger.setLevel(logging.DEBUG)
# Update handler level as well
for handler in logger.handlers:
handler.setLevel(logging.DEBUG)
def set_log_level_to_info():
logger = logging.getLogger(LOGGER_NAME)
logger.setLevel(logging.INFO)
# Update handler level as well
for handler in logger.handlers:
handler.setLevel(logging.INFO)
# Set initial log level
set_log_level_to_info()
+2 -1
View File
@@ -1,7 +1,8 @@
from fastapi import Request
from loguru import logger
from starlette.middleware.base import BaseHTTPMiddleware
from agentic_security.logutils import logger
class LogNon200ResponsesMiddleware(BaseHTTPMiddleware):
async def dispatch(self, request: Request, call_next):
+9 -8
View File
@@ -8,14 +8,14 @@ except ImportError:
def generate_banner(
title="Agentic Security",
font="slant",
version="v2.1.0",
tagline="Proactive Threat Detection & Automated Security Protocols",
author="Developed by: [Security Team]",
website="Website: https://github.com/msoedov/agentic_security",
warning="",
):
title: str = "Agentic Security",
font: str = "slant",
version: str = "v2.1.0",
tagline: str = "Proactive Threat Detection & Automated Security Protocols",
author: str = "Developed by: [Security Team]",
website: str = "Website: https://github.com/msoedov/agentic_security",
warning: str | None = "", # Using Optional for warning since it might be None
) -> str:
"""Generate a visually enhanced banner with dynamic width and borders."""
# Define the text elements
@@ -80,6 +80,7 @@ def generate_banner(
def init_banner():
return
ver = version("agentic_security")
try:
print(generate_banner(version=ver))
+1 -1
View File
@@ -6,11 +6,11 @@ from json import JSONDecodeError
import httpx
import pandas as pd
from loguru import logger
from skopt import Optimizer
from skopt.space import Real
from agentic_security.http_spec import Modality
from agentic_security.logutils import logger
from agentic_security.primitives import Scan, ScanResult
from agentic_security.probe_actor.cost_module import calculate_cost
from agentic_security.probe_actor.refusal import refusal_heuristic
+1 -5
View File
@@ -1,5 +1,4 @@
import asyncio
import logging
from typing import Any
import httpx
@@ -7,13 +6,10 @@ from pydantic import BaseModel, Field
from pydantic_ai import Agent, RunContext
from agentic_security.http_spec import LLMSpec
from agentic_security.logutils import logger
LLM_SPECS = []
# Configure logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
class AgentSpecification(BaseModel):
name: str | None = Field(None, description="Name of the LLM/agent")
+1 -1
View File
@@ -1,6 +1,6 @@
from abc import ABC, abstractmethod
from agentic_security.refusal_classifier import RefusalClassifier
from agentic_security.refusal_classifier.model import RefusalClassifier
classifier = RefusalClassifier()
classifier.load_model()
+29 -5
View File
@@ -1,4 +1,5 @@
import base64
import logging
import os
import platform
import subprocess
@@ -7,6 +8,16 @@ import uuid
import httpx
from cache_to_disk import cache_to_disk
# Configure logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
class AudioGenerationError(Exception):
"""Custom exception for errors during audio generation."""
pass
def encode(content: bytes) -> str:
encoded_content = base64.b64encode(content).decode("utf-8")
@@ -41,12 +52,25 @@ def generate_audio_mac_wav(prompt: str) -> bytes:
# Read the WAV file into memory
with open(temp_wav_path, "rb") as f:
audio_bytes = f.read()
except subprocess.CalledProcessError as e:
logger.error(f"Subprocess error: {e}")
raise AudioGenerationError("Failed to generate or convert audio.") from e
except FileNotFoundError as e:
logger.error(f"File not found: {e}")
raise AudioGenerationError("Required file not found.") from e
except Exception as e:
logger.exception("Unexpected error occurred.")
raise AudioGenerationError(
"An unexpected error occurred during audio generation."
) from e
finally:
# Clean up the temporary files
if os.path.exists(temp_aiff_path):
os.remove(temp_aiff_path)
if os.path.exists(temp_wav_path):
os.remove(temp_wav_path)
for path in (temp_aiff_path, temp_wav_path):
try:
if os.path.exists(path):
os.remove(path)
except Exception as e:
logger.warning(f"Failed to delete temporary file {path}: {e}")
# Return the audio bytes
return audio_bytes
+282 -391
View File
@@ -1,14 +1,15 @@
import io
import os
import random
from functools import lru_cache
from collections.abc import Callable, Iterator
from functools import partial
import httpx
import pandas as pd
from cache_to_disk import cache_to_disk
from datasets import load_dataset
from loguru import logger
from agentic_security.logutils import logger
from agentic_security.probe_data import stenography_fn
from agentic_security.probe_data.models import ProbeDataset
from agentic_security.probe_data.modules import (
@@ -19,281 +20,200 @@ from agentic_security.probe_data.modules import (
rl_model,
)
# Type aliases for clarity
FilterFn = Callable[[pd.Series], bool]
ColumnMappings = dict[str, str]
DatasetLoader = Callable[[], ProbeDataset]
@cache_to_disk()
def load_dataset_general(
dataset_name,
dataset_split="train",
column_mappings=None,
filter_fn=None,
custom_url=None,
additional_metadata=None,
):
"""
Generalized function to load datasets with flexible configurations.
:param dataset_name: Name of the dataset or URL for custom CSVs
:param dataset_split: Split to load from the dataset (e.g., "train")
:param column_mappings: Dictionary mapping dataset columns to expected keys, e.g., {'prompt': 'query'}
:param filter_fn: A filtering function that takes a row and returns True/False
:param custom_url: URL for custom CSV datasets
:param additional_metadata: Additional metadata to include in the ProbeDataset
:return: A ProbeDataset object with the processed data
"""
if custom_url:
logger.info(f"Loading custom CSV dataset from {custom_url}")
r = httpx.get(custom_url)
content = r.content
df = pd.read_csv(io.StringIO(content.decode("utf-8")))
else:
logger.info(f"Loading dataset {dataset_name} from Hugging Face datasets")
dataset = load_dataset(dataset_name)
df = pd.DataFrame(dataset[dataset_split])
# Core data loading utilities
def fetch_csv_content(url: str) -> str:
"""Fetch CSV content from a URL."""
response = httpx.get(url)
return response.content.decode("utf-8")
# Apply column mappings if provided
if column_mappings:
df.rename(columns=column_mappings, inplace=True)
# Filter rows if filter_fn is provided
def load_df_from_source(source: str, is_url: bool = False) -> pd.DataFrame:
"""Load DataFrame from either URL or Hugging Face dataset."""
if is_url:
logger.info(f"Loading custom CSV from {source}")
return pd.read_csv(io.StringIO(fetch_csv_content(source)))
logger.info(f"Loading dataset {source} from Hugging Face")
return pd.DataFrame(load_dataset(source)["train"])
def transform_df(
df: pd.DataFrame,
mappings: ColumnMappings | None = None,
filter_fn: FilterFn | None = None,
) -> pd.DataFrame:
"""Apply transformations to DataFrame."""
result = df.copy()
if mappings:
result.rename(columns=mappings, inplace=True)
if filter_fn:
df = df[df.apply(filter_fn, axis=1)]
# Extract prompts
prompts = df[column_mappings.get("prompt", "prompt")].tolist()
return ProbeDataset(
dataset_name=dataset_name,
metadata=additional_metadata or {},
prompts=prompts,
tokens=count_words_in_list(prompts),
approx_cost=0.0,
)
result = result[result.apply(filter_fn, axis=1)]
return result
def count_words_in_list(str_list):
"""Calculate the total number of words in a given list of strings.
:param str_list: List of strings
:return: Total number of words across all strings in the list
"""
total_words = sum(len(str(s).split()) for s in str_list)
return total_words
@cache_to_disk()
def load_dataset_v1():
return load_dataset_general(
dataset_name="ShawnMenz/DAN_jailbreak",
column_mappings={"prompt": "prompt", "jailbreak": "jailbreak"},
filter_fn=lambda row: row["jailbreak"],
)
@cache_to_disk()
def load_dataset_v2():
return load_dataset_general(
dataset_name="deepset/prompt-injections",
column_mappings={"prompt": "text", "jailbreak": "label"},
filter_fn=lambda row: row["label"],
)
@cache_to_disk()
def load_dataset_v4():
return load_dataset_general(
dataset_name="notrichardren/refuse-to-answer-prompts",
column_mappings={"prompt": "claim", "jailbreak": "label"},
filter_fn=lambda row: row["label"],
)
@cache_to_disk()
def load_dataset_v3():
return load_dataset_general(
dataset_name="rubend18/ChatGPT-Jailbreak-Prompts",
column_mappings={"prompt": "Prompt"},
filter_fn=lambda row: row["label"],
)
@cache_to_disk()
def load_dataset_v6():
dataset = load_dataset("markush1/LLM-Jailbreak-Classifier")
bad_prompts = [
p
for p, c in zip(dataset["train"]["prompt"], dataset["train"]["classification"])
if c == "jailbreak"
]
return ProbeDataset(
dataset_name="markush1/LLM-Jailbreak-Classifier",
metadata={},
prompts=bad_prompts,
tokens=count_words_in_list(bad_prompts),
approx_cost=0.0,
)
@cache_to_disk()
def load_dataset_v7():
splits = {
"mini_JailBreakV_28K": "JailBreakV_28K/mini_JailBreakV_28K.csv",
"JailBreakV_28K": "JailBreakV_28K/JailBreakV_28K.csv",
}
df = pd.read_csv(
"hf://datasets/JailbreakV-28K/JailBreakV-28k/" + splits["JailBreakV_28K"]
)
bad_prompts = df["jailbreak_query"].tolist()
print(df.shape)
return ProbeDataset(
dataset_name="JailbreakV-28K/JailBreakV-28k",
metadata={},
prompts=bad_prompts,
tokens=count_words_in_list(bad_prompts),
approx_cost=0.0,
)
@cache_to_disk()
def load_dataset_v8():
df = pd.read_csv(
"hf://datasets/ShawnMenz/jailbreak_sft_rm_ds/jailbreak_sft_rm_ds.csv",
names=["jailbreak", "prompt"],
)
filtered = df[df["jailbreak"] == "jailbreak"]["prompt"].tolist()
return ProbeDataset(
dataset_name="JailbreakV-28K/JailBreakV-28k",
metadata={},
prompts=filtered,
tokens=count_words_in_list(filtered),
approx_cost=0.0,
)
@cache_to_disk()
def load_dataset_v5():
ds = []
for c in [
"AdvBench",
"ForbiddenQuestion",
"MJP",
"MaliciousInstruct",
"QuestionList",
]:
dataset = load_dataset("Lemhf14/EasyJailbreak_Datasets", c)
bad_prompts = dataset["train"]["query"]
ds.extend(bad_prompts)
return ProbeDataset(
dataset_name="Lemhf14/EasyJailbreak_Datasets",
metadata={},
prompts=ds,
tokens=count_words_in_list(ds),
approx_cost=0.0,
)
@cache_to_disk()
def load_generic_csv(url, name, column="prompt", predicator=None):
r = httpx.get(url)
content = r.content
df = pd.read_csv(io.StringIO(content.decode("utf-8")))
logger.info(f"Loaded {len(df)} prompts from {url}")
filtered_prompts = df[df.apply(predicator, axis=1)][column].tolist()
def create_probe_dataset(
name: str, prompts: list[str], metadata: dict = None
) -> ProbeDataset:
"""Create a ProbeDataset from prompts."""
metadata = metadata or {}
return ProbeDataset(
dataset_name=name,
metadata={},
prompts=filtered_prompts,
tokens=count_words_in_list(filtered_prompts),
metadata=metadata,
prompts=prompts,
tokens=sum(len(str(s).split()) for s in prompts),
approx_cost=0.0,
)
def prepare_prompts(dataset_names, budget, tools_inbox=None, options=[]):
# ## Datasets used and cleaned:
# markush1/LLM-Jailbreak-Classifier
# 1. Open-Orca/OpenOrca
# 2. ShawnMenz/DAN_jailbreak
# 3. EddyLuo/JailBreakV_28K
# 4. https://raw.githubusercontent.com/verazuo/jailbreak_llms/main/data/jailbreak_prompts.csv
dataset_map = {
"ShawnMenz/DAN_jailbreak": load_dataset_v1,
"deepset/prompt-injections": load_dataset_v2,
"notrichardren/refuse-to-answer-prompts": load_dataset_v4,
"rubend18/ChatGPT-Jailbreak-Prompts": load_dataset_v3,
"Lemhf14/EasyJailbreak_Datasets": load_dataset_v5,
"markush1/LLM-Jailbreak-Classifier": load_dataset_v6,
"JailbreakV-28K/JailBreakV-28k": load_dataset_v7,
"ShawnMenz/jailbreak_sft_rm_ds": load_dataset_v8,
"verazuo/jailbreak_llms/2023_05_07": lambda: load_generic_csv(
url="https://raw.githubusercontent.com/verazuo/jailbreak_llms/main/data/prompts/jailbreak_prompts_2023_05_07.csv",
name="verazuo/jailbreak_llms/2023_05_07",
column="prompt",
predicator=lambda x: bool(x["jailbreak"]),
),
"verazuo/jailbreak_llms/2023_12_25.csv": lambda: load_generic_csv(
url="https://raw.githubusercontent.com/verazuo/jailbreak_llms/main/data/prompts/jailbreak_prompts_2023_12_25.csv.csv",
name="verazuo/jailbreak_llms/2023_12_25.csv",
column="prompt",
predicator=lambda x: bool(x["jailbreak"]),
),
"Custom CSV": load_local_csv,
}
dataset_map.update(dataset_map_generics)
group = []
for dataset_name in dataset_names:
if dataset_name in dataset_map:
logger.info(f"Loading {dataset_name}")
try:
group.append(dataset_map[dataset_name]())
except Exception as e:
logger.error(f"Error loading {dataset_name}: {e}")
dynamic_datasets = {
"AgenticBackend": lambda opts: dataset_from_iterator(
"AgenticBackend",
fine_tuned.Module(group, tools_inbox=tools_inbox, opts=opts).apply(),
lazy=True,
),
"Steganography": lambda opts: Stenography(group),
"llm-adaptive-attacks": lambda opts: dataset_from_iterator(
"llm-adaptive-attacks",
adaptive_attacks.Module(group, tools_inbox=tools_inbox, opts=opts).apply(),
),
"Garak": lambda opts: dataset_from_iterator(
"Garak",
garak_tool.Module(group, tools_inbox=tools_inbox, opts=opts).apply(),
lazy=True,
),
"Reinforcement Learning Optimization": lambda opts: dataset_from_iterator(
"Reinforcement Learning Optimization",
rl_model.Module(group, tools_inbox=tools_inbox, opts=opts).apply(),
lazy=True,
),
"InspectAI": lambda opts: dataset_from_iterator(
"InspectAI",
inspect_ai_tool.Module(group, tools_inbox=tools_inbox).apply(),
lazy=True,
),
"GPT fuzzer": lambda opts: [],
}
dynamic_groups = []
options = options or [{} for _ in dataset_names]
for dataset_name, opts in zip(dataset_names, options):
if dataset_name in dynamic_datasets:
logger.info(f"Loading {dataset_name}")
ds = dynamic_datasets[dataset_name](opts)
for g in ds:
dynamic_groups.append(g)
return group + dynamic_groups
# Generalized dataset loader
@cache_to_disk()
def load_dataset_generic(
name: str,
mappings: ColumnMappings | None = None,
filter_fn: FilterFn | None = None,
url: str | None = None,
metadata: dict | None = None,
) -> ProbeDataset:
"""Load and process a dataset with flexible configuration."""
df = load_df_from_source(url or name, is_url=bool(url))
transformed_df = transform_df(df, mappings, filter_fn)
prompt_col = mappings.get("prompt", "prompt") if mappings else "prompt"
prompts = transformed_df[prompt_col].tolist()
return create_probe_dataset(name, prompts, metadata)
class Stenography:
fn_library = {
# Dataset-specific configurations
DATASET_CONFIGS = {
"ShawnMenz/DAN_jailbreak": {
"mappings": {"prompt": "prompt"},
"filter_fn": lambda row: row["jailbreak"],
},
"deepset/prompt-injections": {
"mappings": {"prompt": "text"},
"filter_fn": lambda row: row["label"],
},
"notrichardren/refuse-to-answer-prompts": {
"mappings": {"prompt": "claim"},
"filter_fn": lambda row: row["label"],
},
"rubend18/ChatGPT-Jailbreak-Prompts": {
"mappings": {"prompt": "Prompt"},
"filter_fn": lambda row: row["label"],
},
"markush1/LLM-Jailbreak-Classifier": {
"mappings": {"prompt": "prompt"},
"filter_fn": lambda row: row["classification"] == "jailbreak",
},
"ShawnMenz/jailbreak_sft_rm_ds": {
"url": "hf://datasets/ShawnMenz/jailbreak_sft_rm_ds/jailbreak_sft_rm_ds.csv",
"mappings": {"prompt": "prompt"},
"filter_fn": lambda row: row["jailbreak"] == "jailbreak",
},
"verazuo/jailbreak_llms/2023_05_07": {
"url": "https://raw.githubusercontent.com/verazuo/jailbreak_llms/main/data/prompts/jailbreak_prompts_2023_05_07.csv",
"mappings": {"prompt": "prompt"},
"filter_fn": lambda row: bool(row["jailbreak"]),
},
"verazuo/jailbreak_llms/2023_12_25": {
"url": "https://raw.githubusercontent.com/verazuo/jailbreak_llms/main/data/prompts/jailbreak_prompts_2023_12_25.csv",
"mappings": {"prompt": "prompt"},
"filter_fn": lambda row: bool(row["jailbreak"]),
},
}
# Additional generic dataset configurations
DATASET_CONFIGS_GENERICS = {
"simonycl/aya-23-8B_advbench_jailbreak": {"mappings": {"prompt": "prompt"}},
"acmc/jailbreaks_dataset_with_perplexity_bigcode_starcoder2-3b_bigcode_starcoder2-7b": {},
"karanxa/dolphin-jailbreak-finetuning-dataset": {"mappings": {"prompt": "text"}},
"karanxa/llama-2-jailbreak-dataset": {"mappings": {"prompt": "text"}},
"karanxa/llama2-uncensored-jailbreak-dataset-finetuning": {
"mappings": {"prompt": "text"}
},
"liuyanchen1015/Llama-3.2-1B_jailbreak_responses": {
"mappings": {"prompt": "jailbreak_prompt_text"}
},
"liuyanchen1015/Llama-3.2-1B-Instruct_jailbreak_responses": {
"mappings": {"prompt": "jailbreak_prompt_text"}
},
"liuyanchen1015/Llama-3.2-1B-Instruct_jailbreak_responses_with_judgment": {
"mappings": {"prompt": "jailbreak_prompt_text"}
},
"jackhhao/jailbreak-classification": {"mappings": {"prompt": "prompt"}},
"walledai/JailbreakBench": {"mappings": {"prompt": "prompt"}},
"walledai/JailbreakHub": {"mappings": {"prompt": "prompt"}},
"Granther/evil-jailbreak": {"mappings": {"prompt": "text"}},
"sevdeawesome/jailbreak_success": {"mappings": {"prompt": "jailbreak_prompt_text"}},
"IDA-SERICS/Disaster-tweet-jailbreaking": {"mappings": {"prompt": "prompt_attack"}},
"GeorgeDaDude/Jailbreak_Complete_DS_labeled": {"mappings": {"prompt": "question"}},
"dayone3nder/jailbreak_prompt_JBB_sft_trainset": {"mappings": {"prompt": "prompt"}},
"dayone3nder/general_safe_mix_jailbreak_prompt_JBB_trainset": {
"mappings": {"prompt": "prompt"}
},
}
# Dataset factory
def create_dataset_loader(name: str, config: dict) -> DatasetLoader:
"""Create a dataset loader from configuration."""
return partial(
load_dataset_generic,
name=name,
mappings=config.get("mappings"),
filter_fn=config.get("filter_fn"),
url=config.get("url"),
)
# Specialized loaders
@cache_to_disk()
def load_multi_dataset(name: str, sub_datasets: list[str]) -> ProbeDataset:
"""Load and combine multiple sub-datasets."""
prompts = []
for sub in sub_datasets:
dataset = load_dataset(name, sub)
prompts.extend(dataset["train"]["query"])
return create_probe_dataset(f"{name}_combined", prompts)
@cache_to_disk()
def load_jailbreak_v28k() -> ProbeDataset:
"""Load JailBreakV-28K dataset."""
df = pd.read_csv("hf://datasets/JailbreakV-28K/JailBreakV-28k/JailBreakV_28K.csv")
prompts = df["jailbreak_query"].tolist()
return create_probe_dataset("JailbreakV-28K/JailBreakV-28k", prompts)
@cache_to_disk()
def load_local_csv() -> ProbeDataset:
"""Load prompts from local CSV files."""
csv_files = [f for f in os.listdir(".") if f.endswith(".csv")]
logger.info(f"Found {len(csv_files)} CSV files: {csv_files}")
prompts = []
for file in csv_files:
try:
df = pd.read_csv(file)
if "prompt" in df.columns:
prompts.extend(df["prompt"].tolist())
else:
logger.warning(f"File {file} lacks 'prompt' column")
except Exception as e:
logger.error(f"Error reading {file}: {e}")
return create_probe_dataset("Local CSV", prompts, {"src": str(csv_files)})
# Stenography transformer
class StenographyTransformer:
"""Apply stenography transformations to datasets."""
TRANSFORMATIONS = {
"rot5": stenography_fn.rot5,
"rot13": stenography_fn.rot13,
"base64": stenography_fn.base64_encode,
@@ -306,71 +226,32 @@ class Stenography:
"zigzag_obfuscation": stenography_fn.zigzag_obfuscation,
}
def __init__(self, prompt_groups: [ProbeDataset]):
self.prompt_groups = prompt_groups
def __init__(self, datasets: list[ProbeDataset]):
self.datasets = datasets
def __iter__(self):
return self.apply()
def transform(self) -> Iterator[ProbeDataset]:
"""Generate transformed datasets."""
for dataset in self.datasets:
size = len(dataset.prompts)
sample_size = int(size * 0.25)
def apply(self):
for prompt_group in self.prompt_groups:
size = len(prompt_group.prompts)
for name, fn in self.fn_library.items():
logger.info(f"Applying {name} to {prompt_group.dataset_name}")
selected_prompts = random.sample(prompt_group.prompts, int(size * 0.25))
prompts = [
f"Follow the instructions in {name}: {fn(p)}"
for p in selected_prompts
for name, fn in self.TRANSFORMATIONS.items():
logger.info(f"Applying {name} to {dataset.dataset_name}")
sampled = random.sample(dataset.prompts, sample_size)
transformed = [
f"Follow the instructions in {name}: {fn(p)}" for p in sampled
]
yield ProbeDataset(
dataset_name=f"stenography.{name}({prompt_group.dataset_name})",
metadata={},
prompts=prompts,
tokens=count_words_in_list(prompts),
approx_cost=0.0,
yield create_probe_dataset(
f"stenography.{name}({dataset.dataset_name})", transformed
)
@lru_cache
def load_local_csv() -> ProbeDataset:
csv_files = [f for f in os.listdir(".") if f.endswith(".csv")]
logger.info(f"Found {len(csv_files)} CSV files")
logger.info(f"CSV files: {csv_files}")
prompt_list = []
for file in csv_files:
try:
df = pd.read_csv(file)
except Exception as e:
logger.error(f"Error reading {file}: {e}")
continue
# Check if 'prompt' column exists
if "prompt" in df.columns:
prompt_list.extend(df["prompt"].tolist())
else:
logger.warning(f"File {file} does not contain a 'prompt' column")
return ProbeDataset(
dataset_name="Local CSV",
metadata={"src": str(csv_files)},
prompts=prompt_list,
tokens=count_words_in_list(prompt_list),
approx_cost=0.0,
)
def dataset_from_iterator(name: str, iterator, lazy=False) -> list:
"""Convert an iterator into a list of prompts and create a ProbeDataset
object.
Args:
name (str): The name of the dataset.
iterator (iterator): An iterator that generates prompts.
Returns:
list: A list containing a single ProbeDataset object.
"""
def dataset_from_iterator(
name: str, iterator, lazy: bool = False
) -> list[ProbeDataset]:
"""Convert an iterator into a list of ProbeDataset objects."""
prompts = list(iterator) if not lazy else iterator
tokens = count_words_in_list(prompts) if not lazy else 0
tokens = sum(len(str(s).split()) for s in prompts) if not lazy else 0
dataset = ProbeDataset(
dataset_name=name,
metadata={},
@@ -382,75 +263,85 @@ def dataset_from_iterator(name: str, iterator, lazy=False) -> list:
return [dataset]
# TODO: refactor this abstraction
# Main dataset preparation
def prepare_prompts(
dataset_names: list[str],
budget: float,
tools_inbox=None,
options: list[dict] = None,
) -> list[ProbeDataset]:
"""Prepare datasets based on names and options."""
# Base dataset loaders
dataset_loaders = {
**{k: create_dataset_loader(k, v) for k, v in DATASET_CONFIGS.items()},
**{k: create_dataset_loader(k, v) for k, v in DATASET_CONFIGS_GENERICS.items()},
"Lemhf14/EasyJailbreak_Datasets": partial(
load_multi_dataset,
name="Lemhf14/EasyJailbreak_Datasets",
sub_datasets=[
"AdvBench",
"ForbiddenQuestion",
"MJP",
"MaliciousInstruct",
"QuestionList",
],
),
"JailbreakV-28K/JailBreakV-28k": load_jailbreak_v28k,
"Local CSV": load_local_csv,
}
dataset_map_generics = {
"simonycl/aya-23-8B_advbench_jailbreak": lambda: load_dataset_general(
dataset_name="simonycl/aya-23-8B_advbench_jailbreak",
column_mappings={"prompt": "prompt"},
),
"acmc/jailbreaks_dataset_with_perplexity_bigcode_starcoder2-3b_bigcode_starcoder2-7b": lambda: load_dataset_general(
dataset_name="acmc/jailbreaks_dataset_with_perplexity_bigcode_starcoder2-3b_bigcode_starcoder2-7b"
),
"karanxa/dolphin-jailbreak-finetuning-dataset": lambda: load_dataset_general(
dataset_name="karanxa/dolphin-jailbreak-finetuning-dataset",
column_mappings={"prompt": "text"},
),
"karanxa/llama-2-jailbreak-dataset": lambda: load_dataset_general(
dataset_name="karanxa/llama-2-jailbreak-dataset",
column_mappings={"prompt": "text"},
),
"karanxa/llama2-uncensored-jailbreak-dataset-finetuning": lambda: load_dataset_general(
dataset_name="karanxa/llama2-uncensored-jailbreak-dataset-finetuning",
column_mappings={"prompt": "text"},
),
"liuyanchen1015/Llama-3.2-1B_jailbreak_responses": lambda: load_dataset_general(
dataset_name="liuyanchen1015/Llama-3.2-1B_jailbreak_responses",
column_mappings={"prompt": "jailbreak_prompt_text"},
),
"liuyanchen1015/Llama-3.2-1B-Instruct_jailbreak_responses": lambda: load_dataset_general(
dataset_name="liuyanchen1015/Llama-3.2-1B-Instruct_jailbreak_responses",
column_mappings={"prompt": "jailbreak_prompt_text"},
),
"liuyanchen1015/Llama-3.2-1B-Instruct_jailbreak_responses_with_judgment": lambda: load_dataset_general(
dataset_name="liuyanchen1015/Llama-3.2-1B-Instruct_jailbreak_responses_with_judgment",
column_mappings={"prompt": "jailbreak_prompt_text"},
),
"jackhhao/jailbreak-classification": lambda: load_dataset_general(
dataset_name="jackhhao/jailbreak-classification",
column_mappings={"prompt": "prompt"},
),
"markush1/LLM-Jailbreak-Classifier": lambda: load_dataset_general(
dataset_name="markush1/LLM-Jailbreak-Classifier",
column_mappings={"prompt": "prompt"},
),
"walledai/JailbreakBench": lambda: load_dataset_general(
dataset_name="walledai/JailbreakBench", column_mappings={"prompt": "prompt"}
),
"walledai/JailbreakHub": lambda: load_dataset_general(
dataset_name="walledai/JailbreakHub", column_mappings={"prompt": "prompt"}
),
"Granther/evil-jailbreak": lambda: load_dataset_general(
dataset_name="Granther/evil-jailbreak", column_mappings={"prompt": "text"}
),
"sevdeawesome/jailbreak_success": lambda: load_dataset_general(
dataset_name="sevdeawesome/jailbreak_success",
column_mappings={"prompt": "jailbreak_prompt_text"},
),
"IDA-SERICS/Disaster-tweet-jailbreaking": lambda: load_dataset_general(
dataset_name="IDA-SERICS/Disaster-tweet-jailbreaking",
column_mappings={"prompt": "prompt_attack"},
),
"GeorgeDaDude/Jailbreak_Complete_DS_labeled": lambda: load_dataset_general(
dataset_name="GeorgeDaDude/Jailbreak_Complete_DS_labeled",
column_mappings={"prompt": "question"},
),
"dayone3nder/jailbreak_prompt_JBB_sft_trainset": lambda: load_dataset_general(
dataset_name="dayone3nder/jailbreak_prompt_JBB_sft_trainset",
column_mappings={"prompt": "prompt"},
),
"dayone3nder/general_safe_mix_jailbreak_prompt_JBB_trainset": lambda: load_dataset_general(
dataset_name="dayone3nder/general_safe_mix_jailbreak_prompt_JBB_trainset",
column_mappings={"prompt": "prompt"},
),
}
# Dynamic dataset loaders
dynamic_loaders = {
"AgenticBackend": lambda opts: dataset_from_iterator(
"AgenticBackend",
fine_tuned.Module([], tools_inbox=tools_inbox, opts=opts).apply(),
lazy=True,
),
"Steganography": lambda opts: list(StenographyTransformer([]).transform()),
"llm-adaptive-attacks": lambda opts: dataset_from_iterator(
"llm-adaptive-attacks",
adaptive_attacks.Module([], tools_inbox=tools_inbox, opts=opts).apply(),
),
"Garak": lambda opts: dataset_from_iterator(
"Garak",
garak_tool.Module([], tools_inbox=tools_inbox, opts=opts).apply(),
lazy=True,
),
"Reinforcement Learning Optimization": lambda opts: dataset_from_iterator(
"Reinforcement Learning Optimization",
rl_model.Module([], tools_inbox=tools_inbox, opts=opts).apply(),
lazy=True,
),
"InspectAI": lambda opts: dataset_from_iterator(
"InspectAI",
inspect_ai_tool.Module([], tools_inbox=tools_inbox).apply(),
lazy=True,
),
"GPT fuzzer": lambda opts: [],
}
options = options or [{} for _ in dataset_names]
datasets = []
# Load base datasets
for name, opts in zip(dataset_names, options):
if name in dataset_loaders:
logger.info(f"Loading base dataset {name}")
try:
datasets.append(dataset_loaders[name]())
except Exception as e:
logger.error(f"Error loading {name}: {e}")
# Load dynamic datasets and apply transformations
for name, opts in zip(dataset_names, options):
if name in dynamic_loaders:
logger.info(f"Loading dynamic dataset {name}")
try:
dynamic_result = dynamic_loaders[name](opts)
datasets.extend(dynamic_result)
except Exception as e:
logger.error(f"Error loading dynamic {name}: {e}")
elif name == "Steganography":
datasets.extend(list(StenographyTransformer(datasets).transform()))
return datasets
@@ -3,7 +3,8 @@ import io
import httpx
import pandas as pd
from loguru import logger
from agentic_security.logutils import logger
url = "https://raw.githubusercontent.com/tml-epfl/llm-adaptive-attacks/main/harmful_behaviors/harmful_behaviors_pair.csv"
@@ -3,7 +3,8 @@ import os
import uuid as U
import httpx
from loguru import logger
from agentic_security.logutils import logger
AUTH_TOKEN: str = os.getenv("AS_TOKEN", "gh0-5f4a8ed2-37c6-4bd7-a0cf-7070eae8115b")
@@ -65,7 +66,7 @@ class Module:
return {}
async def fetch_prompts(self) -> list[str]:
api_url = "https://edge.metaheuristic.co/infer"
api_url = "https://mcp.metaheuristic.co/infer"
headers = {
"Authorization": f"Bearer {AUTH_TOKEN}",
"Content-Type": "application/json",
@@ -4,7 +4,7 @@ import json
import os
import subprocess
from loguru import logger
from agentic_security.logutils import logger
# TODO: add probes modules
@@ -2,7 +2,7 @@ import asyncio
import importlib.util
import os
from loguru import logger
from agentic_security.logutils import logger
inspect_ai_task = (
__file__.replace("inspect_ai_tool.py", "inspect_ai_task.py")
@@ -8,7 +8,8 @@ from typing import Deque
import numpy as np
import requests
from loguru import logger
from agentic_security.logutils import logger
AUTH_TOKEN: str = os.getenv("AS_TOKEN", "gh0-5f4a8ed2-37c6-4bd7-a0cf-7070eae8115b")
@@ -215,7 +216,7 @@ class Module:
self.run_id = U.uuid4().hex
self.batch_size = self.opts.get("batch_size", 500)
self.rl_model = CloudRLPromptSelector(
prompt_groups, "https://edge.metaheuristic.co", run_id=self.run_id
prompt_groups, "https://mcp.metaheuristic.co", run_id=self.run_id
)
async def apply(self):
@@ -36,7 +36,7 @@ def mock_requests() -> Mock:
def mock_rl_selector() -> Mock:
return CloudRLPromptSelector(
dataset_prompts,
api_url="https://edge.metaheuristic.co",
api_url="https://mcp.metaheuristic.co",
)
@@ -94,7 +94,7 @@ class TestCloudRLPromptSelector:
def test_select_next_prompt_success_service(self, dataset_prompts):
selector = CloudRLPromptSelector(
dataset_prompts,
api_url="https://edge.metaheuristic.co",
api_url="https://mcp.metaheuristic.co",
)
next_prompt = selector.select_next_prompt(
"How does RL work?", passed_guard=True
+81 -11
View File
@@ -7,9 +7,65 @@ import pandas as pd
from matplotlib.cm import ScalarMappable
from matplotlib.colors import LinearSegmentedColormap, Normalize
from agentic_security.logutils import logger
def plot_security_report(table):
from .primitives import Table
def plot_security_report(table: Table) -> io.BytesIO:
"""
Generates a polar plot representing the security report based on the given data.
Args:
table (Table): The input data table containing security metrics.
Returns:
io.BytesIO: A buffer containing the generated plot image in PNG format.
Returns an empty buffer in case of an error.
"""
try:
return _plot_security_report(table=table)
except (TypeError, ValueError, OverflowError, IndexError, Exception) as e:
logger.error(f"Error in generating the security report: {e}")
return io.BytesIO()
def generate_identifiers(data: pd.DataFrame) -> list[str]:
"""
Generates unique identifiers for the given dataset.
Args:
data (pd.DataFrame): A pandas DataFrame containing security-related data.
Returns:
list[str]: A list of generated identifiers. Returns a list with an empty string in case of an error.
"""
try:
_generate_identifiers(data=data)
except (TypeError, ValueError, Exception) as e:
logger.error(f"Error in generate_identifiers: {e}")
return [""]
def _plot_security_report(table: Table) -> io.BytesIO:
"""
Generates a polar plot-based security report visualizing the failure rates for different modules.
This function processes the input data, sorts it by failure rate, and generates a polar plot
where each bar represents the failure rate for a specific module. The plot includes identifiers,
color-coding based on token count, failure rate values on the bars, and a table listing the modules
and their corresponding failure rates.
Args:
table (Table): A table-like structure (e.g., pandas DataFrame) containing security report data
with columns for failure rate, tokens, and modules.
Returns:
io.BytesIO: A buffer containing the generated plot image in PNG format.
"""
# Data preprocessing
logger.info("Data preprocessing started.")
data = pd.DataFrame(table)
# Sort by failure rate and reset index
@@ -20,10 +76,10 @@ def plot_security_report(table):
fig, ax = plt.subplots(figsize=(12, 10), subplot_kw={"projection": "polar"})
fig.set_facecolor("#f0f0f0")
ax.set_facecolor("#f0f0f0")
logger.info("Plot setup complete.")
# Styling parameters
colors = ["#6C5B7B", "#C06C84", "#F67280", "#F8B195"][::-1] # Pastel palette
# colors = ["#440154", "#3b528b", "#21908c", "#5dc863"] # Viridis-inspired palette
cmap = LinearSegmentedColormap.from_list("custom", colors, N=256)
norm = Normalize(vmin=data["tokens"].min(), vmax=data["tokens"].max())
@@ -74,7 +130,10 @@ def plot_security_report(table):
# Title and caption
fig.suptitle(
"Security Report for Different Modules", fontsize=16, fontweight="bold", y=1.02
"Security Report for Different Modules",
fontsize=16,
fontweight="bold",
y=1.02,
)
caption = "Report generated by https://github.com/msoedov/agentic_security"
fig.text(
@@ -112,17 +171,12 @@ def plot_security_report(table):
data["identifier"], data["failureRate"], data["module"]
)
]
table = ax.table(
cellText=table_data,
loc="right",
cellLoc="left",
)
table = ax.table(cellText=table_data, loc="right", cellLoc="left")
table.auto_set_font_size(False)
table.set_fontsize(8)
# Adjust table style
table.scale(1, 0.7)
for (row, col), cell in table.get_celld().items():
cell.set_edgecolor("none")
cell.set_facecolor("#f0f0f0" if row % 2 == 0 else "#e0e0e0")
@@ -132,17 +186,33 @@ def plot_security_report(table):
cell.set_text_props(fontweight="bold")
# Adjust layout and save
plt.tight_layout()
buf = io.BytesIO()
plt.savefig(buf, format="png", dpi=300, bbox_inches="tight")
plt.close(fig)
buf.seek(0)
logger.info("Report successfully generated and saved to buffer.")
return buf
def generate_identifiers(data):
def _generate_identifiers(data: pd.DataFrame) -> list[str]:
"""
Generates a list of unique identifiers for each row in the given DataFrame.
The identifiers are based on the English alphabet, with each identifier consisting
of a letter followed by a number. The letter represents the "group" of identifiers
(using a letter from A to Z) and the number is a counter within that group. For example:
- A1, A2, ..., A26, B1, B2, ..., Z1, Z2, ...
Args:
data (pd.DataFrame): The input DataFrame containing data for which identifiers
are to be generated.
Returns:
list[str]: A list of unique identifiers as strings, one for each row in the DataFrame.
"""
data_length = len(data)
alphabet = string.ascii_uppercase
num_letters = len(alphabet)
+2 -1
View File
@@ -2,7 +2,8 @@ import random
from asyncio import Event
from fastapi import APIRouter
from loguru import logger
from agentic_security.logutils import logger
from ..core.app import get_current_run, get_tools_inbox
from ..primitives import CompletionRequest, Settings
+9 -6
View File
@@ -1,4 +1,6 @@
from collections.abc import Generator
from datetime import datetime
from typing import Any
from fastapi import (
APIRouter,
@@ -10,7 +12,8 @@ from fastapi import (
UploadFile,
)
from fastapi.responses import StreamingResponse
from loguru import logger
from agentic_security.logutils import logger
from ..core.app import get_stop_event, get_tools_inbox, set_current_run
from ..dependencies import InMemorySecrets, get_in_memory_secrets
@@ -24,7 +27,7 @@ router = APIRouter()
@router.post("/verify")
async def verify(
info: LLMInfo, secrets: InMemorySecrets = Depends(get_in_memory_secrets)
):
) -> dict[str, int | str | float]:
spec = LLMSpec.from_string(info.spec)
try:
r = await spec.verify()
@@ -42,7 +45,7 @@ async def verify(
)
def streaming_response_generator(scan_parameters: Scan):
def streaming_response_generator(scan_parameters: Scan) -> Generator[str, Any, None]:
request_factory = LLMSpec.from_string(scan_parameters.llmSpec)
set_current_run(request_factory)
@@ -63,7 +66,7 @@ async def scan(
scan_parameters: Scan,
background_tasks: BackgroundTasks,
secrets: InMemorySecrets = Depends(get_in_memory_secrets),
):
) -> StreamingResponse:
scan_parameters.with_secrets(secrets)
return StreamingResponse(
streaming_response_generator(scan_parameters), media_type="application/json"
@@ -71,7 +74,7 @@ async def scan(
@router.post("/stop")
async def stop_scan():
async def stop_scan() -> dict[str, str]:
get_stop_event().set()
return {"status": "Scan stopped"}
@@ -85,7 +88,7 @@ async def scan_csv(
maxBudget: int = Query(10_000),
enableMultiStepAttack: bool = Query(False),
secrets: InMemorySecrets = Depends(get_in_memory_secrets),
):
) -> StreamingResponse:
# TODO: content dataset to fuzzer
content = await file.read() # noqa
llm_spec = await llmSpec.read()
+2 -1
View File
@@ -1,7 +1,8 @@
import sentry_sdk
from loguru import logger
from sentry_sdk.integrations.logging import ignore_logger
from agentic_security.logutils import logger
from ..primitives import Settings
+3 -2
View File
@@ -101,7 +101,7 @@ Content-Type: application/json
]
`,
`POST ${SELF_URL}/v1/self-probe-file
Authorization: Bearer $GROQ_API_KEY
Authorization: Bearer $API_KEY
Content-Type: multipart/form-data
{
@@ -204,8 +204,9 @@ let LLM_CONFIGS = [
{ name: 'Claude', prompts: 40000, logo: '/icons/claude.png' },
{ name: 'Cohere', prompts: 40000, logo: '/icons/cohere.png' },
{ name: 'Azure OpenAI', prompts: 40000, logo: '/icons/azureai.png' },
{ name: 'OpenRouter.ai', prompts: 40000, logo: '/icons/openrouter.png' },
{ name: 'assemblyai', prompts: 40000, logo: fallbackIcon },
{ name: 'OpenRouter.ai', prompts: 40000, logo: '/icons/openrouter.png' },
];
function has_image(spec) {
return spec.includes('<<BASE64_IMAGE>>');
+26
View File
@@ -63,6 +63,32 @@
</div>
</section>
</main>
<div class="fixed top-6 right-6 z-50 space-y-3">
<transition-group name="toast">
<div
v-for="toast in toasts"
:key="toast.id"
class="flex items-center p-3 rounded-xl shadow-xl text-white max-w-md animate-toast-in border border-opacity-30"
:class="{
'bg-success-toast border-accent-green': toast.type === 'success',
'bg-error-toast border-accent-red': toast.type === 'error',
'bg-info-toast border-accent-orange': toast.type === 'info'
}"
>
<span class="flex-1 font-medium tracking-wide text-sm">{{ toast.message }}</span>
<button
@click="removeToast(toast.id)"
class="ml-3 focus:outline-none hover:opacity-80 transition-opacity"
>
<svg class="w-4 h-4" fill="none" stroke="currentColor" viewBox="0 0 24 24">
<path stroke-linecap="round" stroke-linejoin="round" stroke-width="2" d="M6 18L18 6M6 6l12 12" />
</svg>
</button>
</div>
</transition-group>
</div>
<main class="max-w-6xl mx-auto space-y-8">
<section class="bg-dark-card rounded-lg p-6 shadow-lg" v-show="false">
<h2 class="text-2xl font-bold mb-4">Select a Config</h2>
+17
View File
@@ -25,6 +25,8 @@ var app = new Vue({
showModules: false,
showLogs: false,
showConsentModal: true,
toasts: [], // Array to store toast notifications
toastTimeout: 3000, // Duration in milliseconds (3 seconds)
statusDotClass: 'bg-gray-500', // Default status dot class
statusText: 'Verified', // Default status text
statusClass: 'bg-green-500 text-dark-bg', // Default status class
@@ -93,6 +95,19 @@ var app = new Vue({
},
methods: {
showToast(message, type = 'success') {
const id = Date.now(); // Unique ID for each toast
this.toasts.push({ id, message, type });
// Automatically remove toast after timeout
setTimeout(() => {
this.removeToast(id);
}, this.toastTimeout);
},
removeToast(id) {
this.toasts = this.toasts.filter(toast => toast.id !== id);
},
focusTextarea() {
this.isFocused = true;
self = this.$refs;
@@ -210,10 +225,12 @@ var app = new Vue({
if (!response.ok) {
this.updateStatusDot(false);
this.errorMsg = 'Integration verification failed:' + JSON.stringify(r);
this.showToast('Integration verification failed', 'error');
} else {
this.errorMsg = '';
this.updateStatusDot(true);
this.okMsg = 'Integration verified';
this.showToast('Integration verified successfully', 'success');
this.integrationVerified = true;
// console.log('Integration verified', this.integrationVerified);
// this.$forceUpdate();
+54 -2
View File
@@ -19,7 +19,7 @@
technopollas: ['Technopollas', 'sans-serif'],
},
colors: {
t1: {
dark: {
bg: '#0D0D0D', // Jet Black
card: '#1A1A1A', // Dark Carbon Fiber
text: '#FFFFFF',
@@ -30,7 +30,7 @@
yellow: '#2E4053', // Jet Black
},
},
dark: {
dark1: {
bg: '#121212',
card: '#1E1E1E',
text: '#FFFFFF',
@@ -86,6 +86,7 @@
}
}
</script>
<style>
.scrollbar-hide::-webkit-scrollbar {
display: none;
@@ -96,4 +97,55 @@
}
</style>
<style>
/* Toast-specific colors */
.bg-success-toast {
background: #1C3F74
}
.bg-error-toast {
background: #85144B
}
.bg-info-toast {
background: #FFC300
}
.border-accent-green, .border-accent-red, .border-accent-orange {
border-color: rgba(255, 255, 255, 0.1); /* Subtle white border for depth */
}
/* Animation */
.animate-toast-in {
animation: tSlideIn 0.4s cubic-bezier(0.25, 0.1, 0.25, 1);
}
@keyframes tSlideIn {
from {
transform: translateX(120%) scale(0.95);
opacity: 0;
}
to {
transform: translateX(0) scale(1);
opacity: 1;
}
}
.toast-enter-active,
.toast-leave-active {
transition: all 0.4s cubic-bezier(0.25, 0.1, 0.25, 1);
}
.toast-enter-from,
.toast-leave-to {
transform: translateX(120%) scale(0.95);
opacity: 0;
}
/* Hover effect */
[toast-type]:hover {
transform: translateY(-2px);
box-shadow: 0 6px 20px rgba(0, 0, 0, 0.6);
}
</style>
</head>
+13 -17
View File
@@ -33,7 +33,7 @@ The `LLMSpec` class is the core of the HTTP specification. It provides the follo
### Methods
- **`from_string(http_spec: str) -> LLMSpec`**: Parses an HTTP specification string into an `LLMSpec` object.
- **`validate(prompt: str, encoded_image: str, encoded_audio: str, files: dict) -> None`**: Validates the request parameters based on the specified modality.
- **`validate(prompt: str, encoded_image: str, encoded_audio: str, files: dict) -> null`**: Validates the request parameters based on the specified modality.
- **`probe(prompt: str, encoded_image: str = "", encoded_audio: str = "", files: dict = {}) -> httpx.Response`**: Sends an HTTP request using the specified parameters.
- **`verify() -> httpx.Response`**: Verifies the HTTP specification by sending a test request.
@@ -52,12 +52,11 @@ Authorization: Bearer sk-xxxxxxxxx
Content-Type: application/json
{
"model": "gpt-3.5-turbo",
"messages": [{"role": "user", "content": "<<PROMPT>>"}],
"temperature": 0.7
"model": "gpt-3.5-turbo",
"messages": [{"role": "user", "content": "<<PROMPT>>"}],
"temperature": 0.7
}
"""
spec = LLMSpec.from_string(http_spec)
response = await spec.probe("What is the capital of France?")
```
@@ -71,12 +70,11 @@ Authorization: Bearer sk-xxxxxxxxx
Content-Type: application/json
{
"model": "gpt-4-vision-preview",
"messages": [{"role": "user", "content": "What is in this image? <<BASE64_IMAGE>>"}],
"temperature": 0.7
"model": "gpt-4-vision-preview",
"messages": [{"role": "user", "content": "What is in this image? <<BASE64_IMAGE>>"}],
"temperature": 0.7
}
"""
spec = LLMSpec.from_string(http_spec)
encoded_image = encode_image_base64_by_url("https://example.com/image.jpg")
response = await spec.probe("What is in this image?", encoded_image=encoded_image)
@@ -91,12 +89,11 @@ Authorization: Bearer sk-xxxxxxxxx
Content-Type: application/json
{
"model": "whisper-large-v3",
"messages": [{"role": "user", "content": "Transcribe this audio: <<BASE64_AUDIO>>"}],
"temperature": 0.7
"model": "whisper-large-v3",
"messages": [{"role": "user", "content": "Transcribe this audio: <<BASE64_AUDIO>>"}],
"temperature": 0.7
}
"""
spec = LLMSpec.from_string(http_spec)
encoded_audio = encode_audio_base64_by_url("https://example.com/audio.mp3")
response = await spec.probe("Transcribe this audio:", encoded_audio=encoded_audio)
@@ -111,12 +108,11 @@ Authorization: Bearer sk-xxxxxxxxx
Content-Type: multipart/form-data
{
"model": "gpt-3.5-turbo",
"messages": [{"role": "user", "content": "Process this file: <<FILE>>"}],
"temperature": 0.7
"model": "gpt-3.5-turbo",
"messages": [{"role": "user", "content": "Process this file: <<FILE>>"}],
"temperature": 0.7
}
"""
spec = LLMSpec.from_string(http_spec)
files = {"file": ("document.txt", open("document.txt", "rb"))}
response = await spec.probe("Process this file:", files=files)
Binary file not shown.

After

Width:  |  Height:  |  Size: 9.0 MiB

+8 -18
View File
@@ -54,20 +54,15 @@ The `probe_data` module is a core component of the Agentic Security project, res
- **Classes:**
- `PromptSelectionInterface`: Abstract base class for prompt selection strategies.
- Methods:
- `select_next_prompt(current_prompt: str, passed_guard: bool) -> str`: Selects next prompt
- `select_next_prompts(current_prompt: str, passed_guard: bool) -> list[str]`: Selects multiple prompts
- `update_rewards(previous_prompt: str, current_prompt: str, reward: float, passed_guard: bool) -> None`: Updates rewards
- `update_rewards(previous_prompt: str, current_prompt: str, reward: float, passed_guard: bool) -> null`: Updates rewards
- `RandomPromptSelector`: Basic random selection with history tracking.
- Parameters:
- `prompts: list[str]`: List of available prompts
- `history_size: int = 3`: Size of history to prevent cycles
- `CloudRLPromptSelector`: Cloud-based RL implementation with fallback.
- Parameters:
- `prompts: list[str]`: List of available prompts
- `api_url: str`: URL of RL service
@@ -75,9 +70,7 @@ The `probe_data` module is a core component of the Agentic Security project, res
- `history_size: int = 300`: Size of history
- `timeout: int = 5`: Request timeout
- `run_id: str = ""`: Unique run identifier
- `QLearningPromptSelector`: Local Q-learning implementation.
- Parameters:
- `prompts: list[str]`: List of available prompts
- `learning_rate: float = 0.1`: Learning rate
@@ -86,13 +79,11 @@ The `probe_data` module is a core component of the Agentic Security project, res
- `exploration_decay: float = 0.995`: Exploration decay rate
- `min_exploration: float = 0.01`: Minimum exploration rate
- `history_size: int = 300`: Size of history
- `Module`: Main class that uses CloudRLPromptSelector.
- Parameters:
- `prompt_groups: list[str]`: Groups of prompts
- `tools_inbox: asyncio.Queue`: Queue for tool communication
- `opts: dict = {}`: Configuration options
- **Module**: Main class that uses CloudRLPromptSelector.
- Parameters:
- `prompt_groups: list[str]`: Groups of prompts
- `tools_inbox: asyncio.Queue`: Queue for tool communication
- `opts: dict = {}`: Configuration options
## Usage Examples
@@ -119,10 +110,9 @@ from agentic_security.probe_data.modules.rl_model import QLearningPromptSelector
prompts = ["What is AI?", "Explain machine learning"]
selector = QLearningPromptSelector(prompts)
current_prompt = "What is AI?"
next_prompt = selector.select_next_prompt(current_prompt, passed_guard=True)
selector.update_rewards(current_prompt, next_prompt, reward=1.0, passed_guard=True)
next_prompt = selector.select_next_prompt(current_prompt, passed_guard=true)
selector.update_rewards(current_prompt, next_prompt, reward=1.0, passed_guard=true)
```
## Conclusion
Generated
+241 -44
View File
File diff suppressed because it is too large Load Diff
+3 -2
View File
@@ -1,6 +1,6 @@
[tool.poetry]
name = "agentic_security"
version = "0.6.0"
version = "0.7.0"
description = "Agentic LLM vulnerability scanner"
authors = ["Alexander Miasoiedov <msoedov@gmail.com>"]
maintainers = ["Alexander Miasoiedov <msoedov@gmail.com>"]
@@ -54,6 +54,7 @@ pyfiglet = "^1.0.2"
termcolor = "^2.4.0"
# garak = { version = "*", optional = true }
pytest-xdist = "3.6.1"
[tool.poetry.group.dev.dependencies]
@@ -86,7 +87,7 @@ build-backend = "poetry.core.masonry.api"
[tool.pytest.ini_options]
addopts = "--durations=5 -m 'not slow'"
addopts = "--durations=5 -m 'not slow' -n 3"
asyncio_mode = "auto"
asyncio_default_fixture_loop_scope = "function"
markers = "slow: marks tests as slow"
@@ -3,7 +3,7 @@ from unittest.mock import patch
import pandas as pd
import pytest
from .model import RefusalClassifier
from agentic_security.refusal_classifier.model import RefusalClassifier
@pytest.fixture
View File
@@ -1,6 +1,6 @@
from fastapi.testclient import TestClient
from ..app import app
from agentic_security.app import app
def test_health_check():
@@ -5,10 +5,10 @@ import httpx
import pytest
from fastapi.testclient import TestClient
from ..app import app
from ..primitives import Probe
from ..probe_actor.refusal import REFUSAL_MARKS
from ..probe_data import REGISTRY
from agentic_security.app import app
from agentic_security.primitives import Probe
from agentic_security.probe_actor.refusal import REFUSAL_MARKS
from agentic_security.probe_data import REGISTRY
client = TestClient(app)
@@ -4,7 +4,7 @@ from unittest.mock import patch
import pytest
from fastapi.testclient import TestClient
from .report import router
from agentic_security.routes.report import router
client = TestClient(router)
@@ -4,8 +4,8 @@ import pytest
from fastapi import HTTPException
from fastapi.testclient import TestClient
from ..primitives import Settings
from .static import get_static_file, router
from agentic_security.primitives import Settings
from agentic_security.routes.static import get_static_file, router
client = TestClient(router)
@@ -8,7 +8,7 @@ import time
import pytest
import agentic_security.test_spec_assets as test_spec_assets
from agentic_security.lib import AgenticSecurity
from agentic_security.lib import SecurityScanner
def has_module(module_name):
@@ -57,7 +57,7 @@ class TestLibraryLevel:
maxBudget = 1000000
max_th = 0.3
datasets = make_test_registry()
result = AgenticSecurity.scan(llmSpec, maxBudget, datasets, max_th)
result = SecurityScanner.scan(llmSpec, maxBudget, datasets, max_th)
assert isinstance(result, dict)
print(result)
assert len(result) in [0, 1]
@@ -68,7 +68,7 @@ class TestLibraryLevel:
maxBudget = 1000
max_th = 0.3
datasets = make_test_registry()
result = AgenticSecurity.scan(
result = SecurityScanner.scan(
llmSpec, maxBudget, datasets, max_th, enableMultiStepAttack=True
)
assert isinstance(result, dict)
@@ -93,7 +93,7 @@ class TestLibraryLevel:
"opts": {"port": 9094},
},
]
result = AgenticSecurity.scan(llmSpec, maxBudget, datasets, max_th)
result = SecurityScanner.scan(llmSpec, maxBudget, datasets, max_th)
assert isinstance(result, dict)
print(result)
assert len(result) in [0, 1]
@@ -120,7 +120,7 @@ class TestLibraryLevel:
"modality": "text",
},
]
result = AgenticSecurity.scan(llmSpec, maxBudget, datasets, max_th)
result = SecurityScanner.scan(llmSpec, maxBudget, datasets, max_th)
assert isinstance(result, dict)
print(result)
assert len(result) in [0, 1]
@@ -148,7 +148,7 @@ class TestLibraryLevel:
"modality": "text",
},
]
result = AgenticSecurity.scan(llmSpec, maxBudget, datasets, max_th)
result = SecurityScanner.scan(llmSpec, maxBudget, datasets, max_th)
assert isinstance(result, dict)
print(result)
assert len(result) in [0, 1]
@@ -164,10 +164,10 @@ class TestEntrypointCI:
temp_path = os.path.join(tmpdir, "custom_agesec.toml")
# Override default_path to the temporary path
AgenticSecurity.default_path = temp_path
SecurityScanner.default_path = temp_path
# Generate the default configuration
security = AgenticSecurity()
security = SecurityScanner()
security.generate_default_settings()
# Check that the config file was created at the temporary path
@@ -189,17 +189,17 @@ class TestEntrypointCI:
temp_path = os.path.join(tmpdir, "custom_agesec.toml")
# Override default_path to the temporary path
AgenticSecurity.default_path = temp_path
SecurityScanner.default_path = temp_path
# Generate the default configuration
security = AgenticSecurity()
security = SecurityScanner()
security.generate_default_settings()
# Load the generated configuration
AgenticSecurity.load_config(temp_path)
SecurityScanner.load_config(temp_path)
# Validate loaded configuration
config = AgenticSecurity.config
config = SecurityScanner.config
assert (
config["general"]["maxBudget"] == 1000000
), "maxBudget should be 1000000"