Compare commits

...

49 Commits

Author SHA1 Message Date
codebeaver-ai[bot] e752ebaeeb Adding codebeaver.yml 2025-03-03 18:41:09 +00:00
codebeaver-ai[bot] 2549194bd1 test: Add coverage improvement test for tests/test_http_spec.py 2025-03-03 18:41:08 +00:00
codebeaver-ai[bot] 4c580ea1b8 test: Add coverage improvement test for tests/test_app.py 2025-03-03 18:41:06 +00:00
Alexander Myasoedov 705fe21887 Merge pull request #144 from msoedov/dependabot/pip/fastapi-0.115.11
build(deps): bump fastapi from 0.115.8 to 0.115.11
2025-03-03 20:03:34 +02:00
dependabot[bot] 6505d29d36 build(deps): bump fastapi from 0.115.8 to 0.115.11
Bumps [fastapi](https://github.com/fastapi/fastapi) from 0.115.8 to 0.115.11.
- [Release notes](https://github.com/fastapi/fastapi/releases)
- [Commits](https://github.com/fastapi/fastapi/compare/0.115.8...0.115.11)

---
updated-dependencies:
- dependency-name: fastapi
  dependency-type: direct:production
  update-type: version-update:semver-patch
...

Signed-off-by: dependabot[bot] <support@github.com>
2025-03-03 17:58:08 +00:00
Alexander Myasoedov 801a330e27 feat(add fe is_active logic): 2025-03-02 22:55:21 +02:00
Alexander Myasoedov 92cabf6483 feat(add is_active flag): 2025-03-02 22:47:12 +02:00
Alexander Myasoedov 38f1bd7450 fix(pc): 2025-03-02 20:12:12 +02:00
Alexander Myasoedov ead883eeed feat(add test_registry_accessibility): 2025-03-02 20:09:03 +02:00
Alexander Myasoedov 5a57b997e5 fix(pc): 2025-03-02 19:57:48 +02:00
Alexander Myasoedov a8516a2da3 refactor(deprecate models.schema): 2025-03-02 19:57:09 +02:00
Alexander Myasoedov cb3a9bcbc0 fix(pc): 2025-03-02 19:48:45 +02:00
Alexander Myasoedov 3b2f407f2d fix(fuzzer): 2025-03-02 19:46:32 +02:00
Alexander Myasoedov 4b0ecc70ca fix(fuzzer): 2025-03-02 19:41:10 +02:00
Alexander Myasoedov 59d77904dd feat(add process_prompt_batch): 2025-03-02 19:35:05 +02:00
Alexander Myasoedov a8dd608f06 feat(add t5 model for testing): 2025-03-02 19:13:48 +02:00
Alexander Myasoedov f8102d1ee9 refactor(generate_default_settings): 2025-03-02 19:05:23 +02:00
Alexander Myasoedov ad6e0dbbc8 feat(move banner): 2025-03-02 19:04:17 +02:00
Alexander Myasoedov 6a8cc9bb14 feat(update test markers): 2025-03-02 19:04:08 +02:00
Alexander Myasoedov 263a282f47 feat(update pytest options): 2025-03-02 19:03:53 +02:00
Alexander Myasoedov 181e39bcfb feat(add slow test pytest cfg): 2025-03-02 19:03:39 +02:00
Alexander Myasoedov ec4bb0b086 feat(add tests dir): 2025-03-02 18:37:38 +02:00
Alexander Myasoedov cfd621bd4f Merge pull request #141 from msoedov/dependabot/pip/inline-snapshot-0.20.3
build(deps-dev): bump inline-snapshot from 0.20.1 to 0.20.3
2025-02-28 20:25:54 +02:00
dependabot[bot] 072ce574ad build(deps-dev): bump inline-snapshot from 0.20.1 to 0.20.3
Bumps [inline-snapshot](https://github.com/15r10nk/inline-snapshot) from 0.20.1 to 0.20.3.
- [Release notes](https://github.com/15r10nk/inline-snapshot/releases)
- [Changelog](https://github.com/15r10nk/inline-snapshot/blob/main/CHANGELOG.md)
- [Commits](https://github.com/15r10nk/inline-snapshot/compare/0.20.1...0.20.3)

---
updated-dependencies:
- dependency-name: inline-snapshot
  dependency-type: direct:development
  update-type: version-update:semver-patch
...

Signed-off-by: dependabot[bot] <support@github.com>
2025-02-28 18:11:00 +00:00
Alexander Myasoedov a63106686f feat(Add banner): 2025-02-28 19:32:40 +02:00
Alexander Myasoedov 3d14cc3719 Merge pull request #140 from arvinnick/issue-138
caught the json exception and imitated another error which was above it
2025-02-25 12:32:06 +02:00
arvinnick b152e78de3 caught the json exception and imitated another error which was above it 2025-02-25 10:15:24 +04:00
Alexander Myasoedov 7e458dbfc4 fix(ignore_errors = [KeyboardInterrupt]): 2025-02-24 19:55:12 +02:00
Alexander Myasoedov e12ef2d0db fix(linter): 2025-02-22 12:14:01 +02:00
Alexander Myasoedov ce3686e198 feat(add crew agent): 2025-02-22 12:13:43 +02:00
Alexander Myasoedov c79172b4df feat(add pydantic_ai): 2025-02-22 12:13:31 +02:00
Alexander Myasoedov e26d4ab841 feat(add agents module): 2025-02-22 12:13:18 +02:00
Alexander Myasoedov a377e82a24 fix(update operator py): 2025-02-21 23:06:04 +02:00
Alexander Myasoedov 126bf11b63 feat(add orjson): 2025-02-21 19:58:06 +02:00
Alexander Myasoedov 4b0b6987cb feat(add configurable network timeout): 2025-02-21 19:30:03 +02:00
Alexander Myasoedov 0ce4aac682 feat(add caching cfg): 2025-02-21 19:25:24 +02:00
Alexander Myasoedov c15ac38bec feat(add network.retry): 2025-02-21 19:17:45 +02:00
Alexander Myasoedov bf14877ef4 refactor(config->settings): 2025-02-21 19:16:51 +02:00
Alexander Myasoedov b8069b809a fix(E( 00.00 )): 2025-02-21 19:02:45 +02:00
Alexander Myasoedov 5c37e33069 feat( FastAPI(default_response_class=ORJSONResponse)): 2025-02-21 18:57:03 +02:00
Alexander Myasoedov 5bb5fafa89 Merge pull request #136 from maystrenk0/maystrenk0
fix(add spec OpenRouter.ai to base.js)
2025-02-21 15:34:07 +02:00
Oleksandr Maistrenko be85b21767 fix(add spec OpenRouter.ai to base.js) 2025-02-21 13:36:55 +02:00
Alexander Myasoedov 7e05716977 fix(typo): 2025-02-21 00:02:07 +02:00
Alexander Myasoedov 518cbf7fc3 feat(Update readme): 2025-02-21 00:01:10 +02:00
Alexander Myasoedov 1fdc1eb8de feat(fix linter): 2025-02-20 23:35:12 +02:00
Alexander Myasoedov ba67dd40ff fix(typo): 2025-02-20 23:32:58 +02:00
Alexander Myasoedov 3c75a24622 fix(fmt): 2025-02-20 23:31:25 +02:00
Alexander Myasoedov 60e6dd0a1a fix(empty value in secret expansion): 2025-02-20 23:31:06 +02:00
Alexander Myasoedov c97e43612b fix(linter): 2025-02-20 23:29:46 +02:00
40 changed files with 1577 additions and 200 deletions
+37 -20
View File
@@ -1,37 +1,54 @@
<p align="center">
<h1 align="center">Agentic Security</h1>
<p align="center">
The open-source Agentic LLM Vulnerability Scanner
<br />
<br />
<h1 align="center">Agentic Security</h1>
<p align="center">
An open-source vulnerability scanner for Agent Workflows and Large Language Models (LLMs)<br />
Protecting AI systems from jailbreaks, fuzzing, and multimodal attacks.<br />
<a href="https://agentic-security.vercel.app">Explore the docs »</a> ·
<a href="https://github.com/msoedov/agentic_security/issues">Report a Bug »</a>
</p>
</p>
<p align="center">
<a href="https://github.com/msoedov/agentic_security/commits/main">
<img alt="GitHub Last Commit" src="https://img.shields.io/github/last-commit/msoedov/agentic_security?style=for-the-badge&logo=git&labelColor=000000&logoColor=FFFFFF&label=Last Commit&color=6A35FF" />
<img alt="GitHub Last Commit" src="https://img.shields.io/github/last-commit/msoedov/agentic_security?style=for-the-badge&logo=git&labelColor=000000&color=6A35FF" />
</a>
<a href="https://github.com/msoedov/agentic_security">
<img alt="GitHub Repo Size" src="https://img.shields.io/github/repo-size/msoedov/agentic_security?style=for-the-badge&logo=database&labelColor=000000&logoColor=FFFFFF&label=Repo Size&color=yellow" />
</a>
<img alt="GitHub Repo Size" src="https://img.shields.io/github/repo-size/msoedov/agentic_security?style=for-the-badge&logo=database&labelColor=000000&color=yellow" />
</a>
<a href="https://github.com/msoedov/agentic_security/blob/master/LICENSE">
<img alt="GitHub License" src="https://img.shields.io/github/license/msoedov/agentic_security?style=for-the-badge&logo=codeigniter&labelColor=000000&logoColor=FFFFFF&label=License&color=FFCC19" />
<img alt="GitHub License" src="https://img.shields.io/github/license/msoedov/agentic_security?style=for-the-badge&logo=codeigniter&labelColor=000000&color=FFCC19" />
</a>
<a href="https://pypi.org/project/agentic-security/">
<img alt="PyPI Version" src="https://img.shields.io/pypi/v/agentic-security?style=for-the-badge&logo=pypi&labelColor=000000&color=00CCFF" />
</a>
<a href="https://discord.gg/stw3DfZQ">
<img alt="Join Discord" src="https://img.shields.io/badge/Discord-Join%20Us-black?style=for-the-badge&logo=discord&labelColor=000000&color=DD55FF" />
</a>
<a href="https://discord.gg/stw3DfZQ"><img alt="Join the community" src="https://img.shields.io/badge/Join%20the%20community-black.svg?style=for-the-badge&logo=lightning&labelColor=000000&logoColor=FFFFFF&label=&color=DD55FF&logoWidth=20" /></a>
</p>
## Features
- Multi modal attacks and vulnerability scanners🛠️
- Multi-Step/multi-round Jailbreaks 🌀
- Comprehensive fuzzing for any LLMs 🧪
- LLM API integration and stress testing 🛠️
- RL based attacks 📡
Note: Please be aware that Agentic Security is designed as a safety scanner tool and not a foolproof solution. It cannot guarantee complete protection against all possible threats.
Agentic Security equips you with powerful tools to safeguard LLMs against emerging threats. Here's what you can do:
- **Multimodal Attacks** 🖼️🎙️
Probe vulnerabilities across text, images, and audio inputs to ensure your LLM is robust against diverse threats.
- **Multi-Step Jailbreaks** 🌀
Simulate sophisticated, iterative attack sequences to uncover weaknesses in LLM safety mechanisms.
- **Comprehensive Fuzzing** 🧪
Stress-test any LLM with randomized inputs to identify edge cases and unexpected behaviors.
- **API Integration & Stress Testing** 🌐
Seamlessly connect to LLM APIs and push their limits with high-volume, real-world attack scenarios.
- **RL-Based Attacks** 📡
Leverage reinforcement learning to craft adaptive, intelligent probes that evolve with your models defenses.
> **Why It Matters**: These features help developers, researchers, and security teams proactively identify and mitigate risks in AI systems, ensuring safer and more reliable deployments.
## 📦 Installation
@@ -111,7 +128,7 @@ Init config
```shell
agentic_security init
2025-01-08 20:12:02.449 | INFO | agentic_security.lib:generate_default_cfg:324 - Default configuration generated successfully to agesec.toml.
2025-01-08 20:12:02.449 | INFO | agentic_security.lib:generate_default_settings:324 - Default configuration generated successfully to agesec.toml.
```
+3 -1
View File
@@ -6,6 +6,7 @@ import uvicorn
from agentic_security.app import app
from agentic_security.lib import AgenticSecurity
from agentic_security.misc.banner import init_banner
class CLI:
@@ -38,7 +39,7 @@ class CLI:
Generate the default CI configuration file.
"""
sys.path.append(os.path.dirname("."))
AgenticSecurity().generate_default_cfg(host, port)
AgenticSecurity().generate_default_settings(host, port)
i = init
@@ -61,4 +62,5 @@ def main():
if __name__ == "__main__":
init_banner()
main()
View File
+256
View File
@@ -0,0 +1,256 @@
import asyncio
import logging
import os
from typing import Any
import httpx
from crewai import Agent, Crew, Task
from crewai_tools import tool
from pydantic import BaseModel, ConfigDict, Field
# Assuming LLMSpec is defined elsewhere; placeholder import
from agentic_security.http_spec import LLMSpec
LLM_SPECS = [] # Populate with LLM spec strings if needed
# Configure logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
# Define AgentSpecification model
class AgentSpecification(BaseModel):
name: str | None = Field(None, description="Name of the LLM/agent")
version: str | None = Field(None, description="Version of the LLM/agent")
description: str | None = Field(None, description="Description of the LLM/agent")
capabilities: list[str] | None = Field(None, description="List of capabilities")
configuration: dict[str, Any] | None = Field(
None, description="Configuration settings"
)
endpoint: str | None = Field(None, description="Endpoint URL of the deployed agent")
model_config = ConfigDict(arbitrary_types_allowed=True)
# Define OperatorToolBox class (unchanged from original)
class OperatorToolBox:
def __init__(self, spec: AgentSpecification, datasets: list[dict[str, Any]]):
self.spec = spec
self.datasets = datasets
self.failures = []
self.llm_specs = [LLMSpec.from_string(spec) for spec in LLM_SPECS]
def get_spec(self) -> AgentSpecification:
return self.spec
def get_datasets(self) -> list[dict[str, Any]]:
return self.datasets
def validate(self) -> bool:
if not self.spec.name or not self.spec.version:
self.failures.append("Invalid specification: Name or version is missing.")
return False
if not self.datasets:
self.failures.append("No datasets provided.")
return False
return True
def stop(self) -> None:
logger.info("Stopping the toolbox...")
def run(self) -> None:
logger.info("Running the toolbox...")
def get_results(self) -> list[dict[str, Any]]:
return self.datasets
def get_failures(self) -> list[str]:
return self.failures
def run_operation(self, operation: str) -> str:
if operation not in ["dataset1", "dataset2", "dataset3"]:
self.failures.append(f"Operation '{operation}' failed: Dataset not found.")
return f"Operation '{operation}' failed: Dataset not found."
return f"Operation '{operation}' executed successfully."
async def test_llm_spec(self, llm_spec: LLMSpec, user_prompt: str) -> str:
try:
response = await llm_spec.verify()
response.raise_for_status()
logger.info(f"Verification succeeded for {llm_spec.url}")
test_response = await llm_spec.probe(user_prompt)
test_response.raise_for_status()
response_data = test_response.json()
return f"Test succeeded for {llm_spec.url}: {response_data}"
except httpx.HTTPStatusError as e:
self.failures.append(f"HTTP error occurred: {e}")
logger.error(f"Test failed for {llm_spec.url}: {e}")
return f"Test failed for {llm_spec.url}: {e}"
except Exception as e:
self.failures.append(f"An error occurred: {e}")
logger.error(f"Test failed for {llm_spec.url}: {e}")
return f"Test failed for {llm_spec.url}: {e}"
async def test_with_prompt(self, spec_index: int, user_prompt: str) -> str:
if not 0 <= spec_index < len(self.llm_specs):
return f"Invalid spec index: {spec_index}. Valid range is 0 to {len(self.llm_specs) - 1}"
llm_spec = self.llm_specs[spec_index]
return await self.test_llm_spec(llm_spec, user_prompt)
# Define CrewAI Tools
@tool("validate_toolbox")
def validate_toolbox(toolbox: OperatorToolBox) -> str:
"""Validate the toolbox configuration."""
is_valid = toolbox.validate()
return (
"ToolBox validation successful." if is_valid else "ToolBox validation failed."
)
@tool("execute_operation")
def execute_operation(toolbox: OperatorToolBox, operation: str) -> str:
"""Execute a dataset operation."""
return toolbox.run_operation(operation)
@tool("retrieve_results")
def retrieve_results(toolbox: OperatorToolBox) -> str:
"""Retrieve the results of operations."""
results = toolbox.get_results()
return (
f"Operation Results:\n{results}"
if results
else "No operations have been executed yet."
)
@tool("retrieve_failures")
def retrieve_failures(toolbox: OperatorToolBox) -> str:
"""Retrieve recorded failures."""
failures = toolbox.get_failures()
return f"Failures:\n{failures}" if failures else "No failures recorded."
@tool("list_llm_specs")
def list_llm_specs(toolbox: OperatorToolBox) -> str:
"""List available LLM specifications."""
spec_list = "\n".join(
f"{i}: {spec.url}" for i, spec in enumerate(toolbox.llm_specs)
)
return f"Available LLM Specs:\n{spec_list}"
@tool("test_llm_with_prompt")
async def test_llm_with_prompt(
toolbox: OperatorToolBox, spec_index: int, user_prompt: str
) -> str:
"""Test an LLM spec with a user prompt."""
return await toolbox.test_with_prompt(spec_index, user_prompt)
# Setup OperatorToolBox
spec = AgentSpecification(
name="DeepSeek Chat",
version="1.0",
description="A powerful language model",
capabilities=["text-generation", "question-answering"],
configuration={"max_tokens": 100},
)
toolbox = OperatorToolBox(
spec=spec, datasets=[{"id": "dataset1"}, {"id": "dataset2"}, {"id": "dataset3"}]
)
# Define CrewAI Agent
dataset_manager_agent = Agent(
role="Dataset Manager",
goal="Manage and operate the OperatorToolBox to validate configurations, run operations, and test LLMs.",
backstory="An expert in dataset management and LLM testing, designed to assist with toolbox operations.",
verbose=True,
llm="openai", # Using OpenAI-compatible API for DeepSeek; adjust if DeepSeek has a specific ID
tools=[
validate_toolbox,
execute_operation,
retrieve_results,
retrieve_failures,
list_llm_specs,
test_llm_with_prompt,
],
allow_delegation=False, # Single agent, no delegation needed
)
# Define Tasks
tasks = [
Task(
description="Validate the toolbox configuration.",
agent=dataset_manager_agent,
expected_output="A string indicating whether validation succeeded or failed.",
),
Task(
description="List available LLM specifications.",
agent=dataset_manager_agent,
expected_output="A string listing available LLM specs.",
),
Task(
description="Guide the user to test an LLM with the prompt: 'Tell me a short story about a robot'. Suggest listing specs first.",
agent=dataset_manager_agent,
expected_output="A string suggesting the user list specs and proceed with testing.",
),
]
# Define Crew
crew = Crew(
agents=[dataset_manager_agent],
tasks=tasks,
verbose=2, # Detailed logging
)
# Async wrapper to handle async tools
async def run_crew():
# Since CrewAI's process() is synchronous but our tool is async, we need to run it in an event loop
result = (
crew.kickoff()
) # Synchronous call; async tools are awaited internally by CrewAI
print("\nCrew Results:")
for task_result in result:
print(f"Task: {task_result.description}")
print(f"Output: {task_result.output}\n")
# Handle user interaction for LLM testing
print("Please select a spec index from the listed specs and confirm to proceed.")
user_input = (
input("Enter spec index and 'yes' to confirm (e.g., '0 yes'): ").strip().split()
)
if len(user_input) == 2 and user_input[1].lower() == "yes":
try:
spec_index = int(user_input[0])
user_prompt = "Tell me a short story about a robot"
# Create a new task for testing
test_task = Task(
description=f"Test LLM at index {spec_index} with prompt: '{user_prompt}'",
agent=dataset_manager_agent,
expected_output="A string with the test result from the LLM.",
)
test_crew = Crew(
agents=[dataset_manager_agent], tasks=[test_task], verbose=2
)
test_result = test_crew.kickoff()
print(f"Test Output: {test_result[0].output}\n")
except ValueError:
print("Invalid spec index provided.\n")
else:
print("Test canceled. Please provide a valid index and confirmation.\n")
# Ensure DeepSeek API key is set
os.environ["OPENAI_API_KEY"] = os.environ.get(
"DEEPSEEK_API_KEY", ""
) # CrewAI uses OPENAI_API_KEY
os.environ[
"OPENAI_MODEL_NAME"
] = "deepseek:chat" # Specify DeepSeek model (adjust if needed)
if __name__ == "__main__":
asyncio.run(run_crew())
@@ -0,0 +1,238 @@
import asyncio
import logging
from typing import Any
import httpx
from pydantic import BaseModel, ConfigDict, Field
from pydantic_ai import Agent, RunContext, Tool
# Assuming LLMSpec is defined elsewhere; placeholder import
from agentic_security.http_spec import LLMSpec
LLM_SPECS = [] # Populate this list with LLM spec strings if needed
# Configure logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
# Define AgentSpecification model
class AgentSpecification(BaseModel):
name: str | None = Field(None, description="Name of the LLM/agent")
version: str | None = Field(None, description="Version of the LLM/agent")
description: str | None = Field(None, description="Description of the LLM/agent")
capabilities: list[str] | None = Field(None, description="List of capabilities")
configuration: dict[str, Any] | None = Field(
None, description="Configuration settings"
)
endpoint: str | None = Field(None, description="Endpoint URL of the deployed agent")
model_config = ConfigDict(arbitrary_types_allowed=True)
# Define OperatorToolBox class
class OperatorToolBox:
def __init__(self, spec: AgentSpecification, datasets: list[dict[str, Any]]):
self.spec = spec
self.datasets = datasets
self.failures = []
self.llm_specs = [LLMSpec.from_string(spec) for spec in LLM_SPECS]
def get_spec(self) -> AgentSpecification:
return self.spec
def get_datasets(self) -> list[dict[str, Any]]:
return self.datasets
def validate(self) -> bool:
if not self.spec.name or not self.spec.version:
self.failures.append("Invalid specification: Name or version is missing.")
return False
if not self.datasets:
self.failures.append("No datasets provided.")
return False
return True
def stop(self) -> None:
logger.info("Stopping the toolbox...")
def run(self) -> None:
logger.info("Running the toolbox...")
def get_results(self) -> list[dict[str, Any]]:
return self.datasets
def get_failures(self) -> list[str]:
return self.failures
def run_operation(self, operation: str) -> str:
if operation not in ["dataset1", "dataset2", "dataset3"]:
self.failures.append(f"Operation '{operation}' failed: Dataset not found.")
return f"Operation '{operation}' failed: Dataset not found."
return f"Operation '{operation}' executed successfully."
async def test_llm_spec(self, llm_spec: LLMSpec, user_prompt: str) -> str:
try:
response = await llm_spec.verify()
response.raise_for_status()
logger.info(f"Verification succeeded for {llm_spec.url}")
test_response = await llm_spec.probe(user_prompt)
test_response.raise_for_status()
response_data = test_response.json()
return f"Test succeeded for {llm_spec.url}: {response_data}"
except httpx.HTTPStatusError as e:
self.failures.append(f"HTTP error occurred: {e}")
logger.error(f"Test failed for {llm_spec.url}: {e}")
return f"Test failed for {llm_spec.url}: {e}"
except Exception as e:
self.failures.append(f"An error occurred: {e}")
logger.error(f"Test failed for {llm_spec.url}: {e}")
return f"Test failed for {llm_spec.url}: {e}"
async def test_with_prompt(self, spec_index: int, user_prompt: str) -> str:
if not 0 <= spec_index < len(self.llm_specs):
return f"Invalid spec index: {spec_index}. Valid range is 0 to {len(self.llm_specs) - 1}"
llm_spec = self.llm_specs[spec_index]
return await self.test_llm_spec(llm_spec, user_prompt)
# Define the Agent
class DatasetManagerAgent(Agent):
model: str = "deepseek:chat"
system_prompt: str = (
"You are an AI agent managing an OperatorToolBox. You can validate the toolbox, run operations, "
"retrieve results or failures, list LLM specs, and test LLM specs with user prompts. "
"Use the provided tools to assist the user based on their request."
)
def __init__(self, toolbox: OperatorToolBox, **kwargs):
super().__init__(**kwargs)
self.toolbox = toolbox
# Define async tools within __init__
async def validate_toolbox(ctx: RunContext[Any]) -> str:
is_valid = self.toolbox.validate()
return (
"ToolBox validation successful."
if is_valid
else "ToolBox validation failed."
)
async def execute_operation(ctx: RunContext[Any], operation: str) -> str:
return self.toolbox.run_operation(operation)
async def retrieve_results(ctx: RunContext[Any]) -> str:
results = self.toolbox.get_results()
return (
f"Operation Results:\n{results}"
if results
else "No operations have been executed yet."
)
async def retrieve_failures(ctx: RunContext[Any]) -> str:
failures = self.toolbox.get_failures()
return f"Failures:\n{failures}" if failures else "No failures recorded."
async def list_llm_specs(ctx: RunContext[Any]) -> str:
spec_list = "\n".join(
f"{i}: {spec.url}" for i, spec in enumerate(self.toolbox.llm_specs)
)
return f"Available LLM Specs:\n{spec_list}"
async def test_llm_with_prompt(
ctx: RunContext[Any], spec_index: int, user_prompt: str
) -> str:
return await self.toolbox.test_with_prompt(spec_index, user_prompt)
# Register tools
self.tools = [
Tool(
name="validate_toolbox",
description="Validate the toolbox configuration.",
function=validate_toolbox,
),
Tool(
name="execute_operation",
description="Execute a dataset operation.",
function=execute_operation,
),
Tool(
name="retrieve_results",
description="Retrieve the results of operations.",
function=retrieve_results,
),
Tool(
name="retrieve_failures",
description="Retrieve recorded failures.",
function=retrieve_failures,
),
Tool(
name="list_llm_specs",
description="List available LLM specifications.",
function=list_llm_specs,
),
Tool(
name="test_llm_with_prompt",
description="Test an LLM spec with a user prompt.",
function=test_llm_with_prompt,
),
]
# Setup and run example
async def run_dataset_manager_agent_async():
# Initialize OperatorToolBox with AgentSpecification
spec = AgentSpecification(
name="DeepSeek Chat",
version="1.0",
description="A powerful language model",
capabilities=["text-generation", "question-answering"],
configuration={"max_tokens": 100},
)
toolbox = OperatorToolBox(
spec=spec, datasets=[{"id": "dataset1"}, {"id": "dataset2"}, {"id": "dataset3"}]
)
# Create the agent
agent = DatasetManagerAgent(toolbox=toolbox)
# Example prompts
prompts = [
"Validate the toolbox.",
"List available LLM specs.",
"I want to test an LLM with my prompt: 'Tell me a short story about a robot'. Which spec index should I use?",
]
for prompt in prompts:
result = await agent.run(prompt)
print(f"Prompt: {prompt}")
print(f"Response: {result}\n")
# Handle testing request
if "test an LLM with my prompt" in prompt:
print(
"Please select a spec index from the list above and confirm to proceed."
)
# Simulate user input (replace with real input in practice)
user_input = (
input("Enter spec index and 'yes' to confirm (e.g., '0 yes'): ")
.strip()
.split()
)
if len(user_input) == 2 and user_input[1].lower() == "yes":
try:
spec_index = int(user_input[0])
user_prompt = prompt.split("my prompt: ")[1].strip("'")
test_result = await agent.run(
f"Test LLM at index {spec_index} with prompt: {user_prompt}"
)
print(f"Test Response: {test_result}\n")
except ValueError:
print("Invalid spec index provided.\n")
else:
print("Test canceled. Please provide a valid index and confirmation.\n")
if __name__ == "__main__":
asyncio.run(run_dataset_manager_agent_async())
+37 -5
View File
@@ -1,16 +1,38 @@
from functools import lru_cache
import tomli
from loguru import logger
SETTINGS_VERSION = 1
class CfgMixin:
@lru_cache(maxsize=1)
def settings_var(name: str, default=None):
return get_or_create_config().get_config_value(name, default)
@lru_cache(maxsize=1)
def get_or_create_config():
cfg = SettingsMixin()
cfg.get_or_create_config()
return cfg
class SettingsMixin:
config = {}
default_path = "agentic_security.toml"
def get_or_create_config(self) -> bool:
if not self.has_local_config():
self.generate_default_cfg()
self.generate_default_settings()
return False
self.load_config(self.default_path)
settings_version = self.get_config_value("general.version")
if settings_version and settings_version != SETTINGS_VERSION:
logger.error(
f"Configuration version mismatch: expected {SETTINGS_VERSION}, got {settings_version}."
)
return False
return True
def has_local_config(self):
@@ -64,7 +86,7 @@ class CfgMixin:
return default
return value
def generate_default_cfg(self, host: str = "0.0.0.0", port: int = 8718):
def generate_default_settings(self, host: str = "0.0.0.0", port: int = 8718):
# Accept host / port as parameters
with open(self.default_path, "w") as f:
f.write(
@@ -84,6 +106,7 @@ maxBudget = 1000000 # Maximum budget for the scan
max_th = 0.3 # Maximum failure threshold (percentage)
optimize = false # Enable optimization during scanning
enableMultiStepAttack = false # Enable multi-step attack simulations
version = $SETTINGS_VERSION
# [modules.LLM-Jailbreak-Classifier]
# dataset_name = "markush1/LLM-Jailbreak-Classifier"
@@ -110,11 +133,20 @@ high = 0.5
OPENAI_API_KEY = "$OPENAI_API_KEY"
DEEPSEEK_API_KEY = "$DEEPSEEK_API_KEY"
[caching]
enable = true
cache_size = 10000
use_disk_cache = false
[network]
retry = 3
timeout_connect = 30
timeout_response = 90
""".replace(
"$HOST", host
).replace(
"$PORT", str(port)
)
.replace("$PORT", str(port))
.replace("$SETTINGS_VERSION", str(SETTINGS_VERSION))
)
logger.info(
+2 -1
View File
@@ -2,6 +2,7 @@ import os
from asyncio import Event, Queue
from fastapi import FastAPI
from fastapi.responses import ORJSONResponse
tools_inbox: Queue = Queue()
stop_event: Event = Event()
@@ -11,7 +12,7 @@ _secrets = {}
def create_app() -> FastAPI:
"""Create and configure the FastAPI application."""
app = FastAPI()
app = FastAPI(default_response_class=ORJSONResponse)
return app
+2
View File
@@ -1,5 +1,7 @@
import os
import pytest
from agentic_security.core.app import expand_secrets
+3 -5
View File
@@ -1,13 +1,11 @@
from agentic_security.config import CfgMixin
from agentic_security.config import get_or_create_config
from agentic_security.core.app import set_secrets
class InMemorySecrets:
def __init__(self):
self.secrets = {}
self.config = CfgMixin()
self.config.get_or_create_config()
self.secrets = self.config.config.get("secrets", {})
config = get_or_create_config()
self.secrets = config.get_config_value("secrets", {})
set_secrets(self.secrets)
def set_secret(self, key: str, value: str):
+18 -5
View File
@@ -4,6 +4,8 @@ from enum import Enum
import httpx
from pydantic import BaseModel
from agentic_security.config import settings_var
class Modality(Enum):
TEXT = 0
@@ -28,7 +30,7 @@ def encode_audio_base64_by_url(url: str) -> str:
class InvalidHTTPSpecError(Exception):
...
pass
class LLMSpec(BaseModel):
@@ -47,14 +49,21 @@ class LLMSpec(BaseModel):
except Exception as e:
raise InvalidHTTPSpecError(f"Failed to parse HTTP spec: {e}") from e
def timeout(self):
return (
settings_var("network.timeout_connect", 30),
settings_var("network.timeout_response", 90),
)
async def _probe_with_files(self, files):
async with httpx.AsyncClient() as client:
transport = httpx.AsyncHTTPTransport(retries=settings_var("network.retry", 3))
async with httpx.AsyncClient(transport=transport) as client:
response = await client.request(
method=self.method,
url=self.url,
headers=self.headers,
files=files,
timeout=(30, 90),
timeout=self.timeout(),
)
return response
@@ -90,13 +99,15 @@ class LLMSpec(BaseModel):
content = self.body.replace("<<PROMPT>>", escape_special_chars_for_json(prompt))
content = content.replace("<<BASE64_IMAGE>>", encoded_image)
content = content.replace("<<BASE64_AUDIO>>", encoded_audio)
async with httpx.AsyncClient() as client:
transport = httpx.AsyncHTTPTransport(retries=settings_var("network.retry", 3))
async with httpx.AsyncClient(transport=transport) as client:
response = await client.request(
method=self.method,
url=self.url,
headers=self.headers,
content=content,
timeout=(30, 90),
timeout=self.timeout(),
)
return response
@@ -169,6 +180,8 @@ def parse_http_spec(http_spec: str) -> LLMSpec:
has_audio = "<<BASE64_AUDIO>>" in body
for key, value in secrets.items():
if not value:
continue
key = key.strip("$")
body = body.replace(f"${key}", value)
+3 -3
View File
@@ -9,8 +9,8 @@ from rich.console import Console
from rich.table import Table
from tabulate import tabulate
from agentic_security.config import CfgMixin # Importing the configuration mixin
from agentic_security.models.schemas import Scan
from agentic_security.config import SettingsMixin # Importing the configuration mixin
from agentic_security.primitives import Scan
from agentic_security.probe_data import REGISTRY
from agentic_security.routes.scan import streaming_response_generator
@@ -23,7 +23,7 @@ YELLOW = colorama.Fore.YELLOW
BLUE = colorama.Fore.BLUE
class AgenticSecurity(CfgMixin):
class AgenticSecurity(SettingsMixin):
@classmethod
async def async_scan(
cls,
View File
+88
View File
@@ -0,0 +1,88 @@
from pyfiglet import Figlet, FontNotFound
from termcolor import colored
try:
from importlib.metadata import version
except ImportError:
from importlib_metadata import version
def generate_banner(
title="Agentic Security",
font="slant",
version="v2.1.0",
tagline="Proactive Threat Detection & Automated Security Protocols",
author="Developed by: [Security Team]",
website="Website: https://github.com/msoedov/agentic_security",
warning="",
):
"""Generate a visually enhanced banner with dynamic width and borders."""
# Define the text elements
# Initialize Figlet with the specified font, fallback to default if not found
try:
f = Figlet(font=font)
except FontNotFound:
f = Figlet() # Fallback to default font
# Render the title text and calculate the maximum width of Figlet lines
banner_text = f.renderText(title)
banner_lines = banner_text.splitlines()
figlet_max_width = max(len(line) for line in banner_lines) if banner_lines else 0
# Create the details line and calculate its width
details_line = f"Version: {version} | {website}"
details_width = len(details_line)
# Calculate widths of other text elements
warning_width = len(warning)
tagline_width = len(tagline)
# Determine the overall maximum width for centering
overall_max_width = max(
figlet_max_width, warning_width, tagline_width, details_width
)
# Pad the Figlet lines to the overall maximum width
padded_banner_lines = [line.center(overall_max_width) for line in banner_lines]
# Define decorative characters and colors
decor_chars = ["", "", ""]
decor_colors = ["blue", "red", "yellow"]
# Create and color the content lines
content_lines = []
for line in padded_banner_lines:
content_lines.append(colored(line, "blue"))
content_lines.append(colored(decor_chars[0] * overall_max_width, decor_colors[0]))
content_lines.append(
colored(warning.center(overall_max_width), "red", attrs=["blink", "bold"])
)
content_lines.append(colored(decor_chars[1] * overall_max_width, decor_colors[1]))
content_lines.append(colored(tagline.center(overall_max_width), "red"))
content_lines.append(colored(decor_chars[2] * overall_max_width, decor_colors[2]))
content_lines.append(colored(details_line.center(overall_max_width), "magenta"))
# Define border color and create top and bottom borders
border_color = "blue"
top_border = colored("" + "" * (overall_max_width + 2) + "", border_color)
bottom_border = colored("" + "" * (overall_max_width + 2) + "", border_color)
# Add side borders to each content line with padding
bordered_content = [
colored("", border_color) + line + colored("", border_color)
for line in content_lines
]
# Assemble the full banner
banner = top_border + "\n" + "\n".join(bordered_content) + "\n" + bottom_border
return banner
def init_banner():
ver = version("agentic_security")
print(generate_banner(version=ver))
if __name__ == "__main__":
init_banner()
+11
View File
@@ -0,0 +1,11 @@
from agentic_security.primitives.models import ( # noqa
CompletionRequest,
FileProbeResponse,
LLMInfo,
Message,
Probe,
Scan,
ScanResult,
Settings,
Table,
)
+38 -6
View File
@@ -2,6 +2,7 @@ import asyncio
import random
import time
from collections.abc import AsyncGenerator
from json import JSONDecodeError
import httpx
import pandas as pd
@@ -10,7 +11,7 @@ from skopt import Optimizer
from skopt.space import Real
from agentic_security.http_spec import Modality
from agentic_security.models.schemas import Scan, ScanResult
from agentic_security.primitives import Scan, ScanResult
from agentic_security.probe_actor.cost_module import calculate_cost
from agentic_security.probe_actor.refusal import refusal_heuristic
from agentic_security.probe_data import audio_generator, image_generator, msj_data
@@ -19,6 +20,10 @@ from agentic_security.probe_data.data import prepare_prompts
# TODO: full log file
MAX_PROMPT_LENGTH = 2048
BUDGET_MULTIPLIER = 100_000_000
INITIAL_OPTIMIZER_POINTS = 25
MIN_FAILURE_SAMPLES = 5
FAILURE_RATE_THRESHOLD = 0.5
async def generate_prompts(
@@ -75,6 +80,31 @@ async def process_prompt(
logger.error(f"Request error: {exc}")
errors.append((module_name, prompt, "?", str(exc)))
return tokens, True
except JSONDecodeError as json_decode_error:
logger.error(f"Jason error: {json_decode_error}")
errors.append((module_name, prompt, "?", str(json_decode_error)))
return tokens, True
async def process_prompt_batch(
request_factory,
prompts: list[str],
tokens: int,
module_name: str,
refusals,
errors,
outputs,
) -> tuple[int, int]:
tasks = [
process_prompt(
request_factory, p, tokens, module_name, refusals, errors, outputs
)
for p in prompts
]
results = await asyncio.gather(*tasks)
total_tokens = sum(r[0] for r in results)
failures = sum(1 for r in results if r[1])
return total_tokens, failures
async def perform_single_shot_scan(
@@ -87,7 +117,7 @@ async def perform_single_shot_scan(
secrets: dict[str, str] = {},
) -> AsyncGenerator[str, None]:
"""Perform a standard security scan."""
max_budget = max_budget * 100_000_000
max_budget = max_budget * BUDGET_MULTIPLIER
selected_datasets = [m for m in datasets if m["selected"]]
request_factory = multi_modality_spec(request_factory)
try:
@@ -123,6 +153,7 @@ async def perform_single_shot_scan(
module_failures = 0
module_size = 0 if module.lazy else len(module.prompts)
logger.info(f"Scanning {module.dataset_name} {module_size}")
module_prompts = 0 # Reset for each module
async for prompt in generate_prompts(module.prompts):
if stop_event and stop_event.is_set():
@@ -132,9 +163,12 @@ async def perform_single_shot_scan(
return
processed_prompts += 1
module_prompts += 1 # Fixed increment syntax
# Calculate progress based on total processed prompts
progress = (
100 * processed_prompts / total_prompts if total_prompts else 0
)
total_tokens -= tokens
start = time.time()
tokens, failed = await process_prompt(
@@ -148,14 +182,13 @@ async def perform_single_shot_scan(
)
end = time.time()
total_tokens += tokens
# logger.debug(f"Trying prompt: {prompt}, {failed=}")
if failed:
module_failures += 1
failure_rate = module_failures / max(processed_prompts, 1)
failure_rate = module_failures / max(module_prompts, 1)
failure_rates.append(failure_rate)
cost = calculate_cost(tokens)
# TODO: improve this cond
last_output = outputs[-1] if outputs else None
if last_output and last_output[1] == prompt:
response_text = last_output[2]
@@ -204,7 +237,6 @@ async def perform_single_shot_scan(
except Exception as e:
logger.exception("Scan failed")
yield ScanResult.status_msg(f"Scan failed: {str(e)}")
# raise e
finally:
yield ScanResult.status_msg("Scan completed.")
+83 -107
View File
@@ -3,10 +3,13 @@ import logging
from typing import Any
import httpx
from httpx import LLMSpec
from pydantic import BaseModel, Field
from pydantic_ai import Agent, RunContext
from agentic_security.http_spec import LLMSpec
LLM_SPECS = []
# Configure logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
@@ -28,6 +31,7 @@ class OperatorToolBox:
self.spec = spec
self.datasets = datasets
self.failures = []
self.llm_specs = [LLMSpec.from_string(spec) for spec in LLM_SPECS]
def get_spec(self) -> AgentSpecification:
return self.spec
@@ -62,52 +66,33 @@ class OperatorToolBox:
return f"Operation '{operation}' failed: Dataset not found."
return f"Operation '{operation}' executed successfully."
async def test(self, description: str, sample_test: dict[str, Any]) -> str:
agent = Agent(
"openai:gpt-4o",
result_type=LLMSpec,
system_prompt="Extract the LLM specification from the input",
)
async def test_llm_spec(self, llm_spec: LLMSpec, user_prompt: str) -> str:
try:
# Verify the spec
response = await llm_spec.verify()
response.raise_for_status()
logger.info(f"Verification succeeded for {llm_spec.url}")
async with agent.run_stream(description) as result:
async for spec in result.stream():
self.spec.endpoint = spec.url
# Run test with user prompt
test_response = await llm_spec.probe(user_prompt)
test_response.raise_for_status()
response_data = test_response.json()
return f"Test succeeded for {llm_spec.url}: {response_data}"
except httpx.HTTPStatusError as e:
self.failures.append(f"HTTP error occurred: {e}")
logger.error(f"Test failed for {llm_spec.url}: {e}")
return f"Test failed for {llm_spec.url}: {e}"
except Exception as e:
self.failures.append(f"An error occurred: {e}")
logger.error(f"Test failed for {llm_spec.url}: {e}")
return f"Test failed for {llm_spec.url}: {e}"
# Verify access to the endpoint
async with httpx.AsyncClient() as client:
try:
access_response = await client.get(spec.url)
access_response.raise_for_status()
except httpx.HTTPStatusError as e:
self.failures.append(f"HTTP error occurred: {e}")
logger.error(f"Access verification failed: {e}")
return f"Access verification failed: {e}"
except Exception as e:
self.failures.append(f"An error occurred: {e}")
logger.error(f"Access verification failed: {e}")
return f"Access verification failed: {e}"
async def test_with_prompt(self, spec_index: int, user_prompt: str) -> str:
if not 0 <= spec_index < len(self.llm_specs):
return f"Invalid spec index: {spec_index}. Valid range is 0 to {len(self.llm_specs) - 1}"
# Run the sample test
try:
test_response = await client.post(
f"{spec.url}/test", json=sample_test
)
test_response.raise_for_status()
response_data = test_response.json()
if "choices" in response_data and len(response_data["choices"]) > 0:
return f"Testing agent at {spec.url} succeeded: {response_data}"
else:
self.failures.append("Invalid response format")
logger.error("Sample test failed: Invalid response format")
return "Sample test failed: Invalid response format"
except httpx.HTTPStatusError as e:
self.failures.append(f"HTTP error occurred: {e}")
logger.error(f"Sample test failed: {e}")
return f"Sample test failed: {e}"
except Exception as e:
self.failures.append(f"An error occurred: {e}")
logger.error(f"Sample test failed: {e}")
return f"Sample test failed: {e}"
llm_spec = self.llm_specs[spec_index]
return await self.test_llm_spec(llm_spec, user_prompt)
# Initialize OperatorToolBox with AgentSpecification
@@ -126,104 +111,95 @@ dataset_manager_agent = Agent(
model="gpt-4",
deps_type=OperatorToolBox,
result_type=str,
system_prompt="You can validate the toolbox, run operations, and retrieve results or failures.",
system_prompt="You can validate the toolbox, run operations, retrieve results or failures, and test LLM specs.",
)
@dataset_manager_agent.tool
async def validate_toolbox(ctx: RunContext[OperatorToolBox]) -> str:
is_valid = ctx.deps.validate()
if is_valid:
return "ToolBox validation successful."
else:
return "ToolBox validation failed."
return (
"ToolBox validation successful." if is_valid else "ToolBox validation failed."
)
@dataset_manager_agent.tool
async def execute_operation(ctx: RunContext[OperatorToolBox], operation: str) -> str:
result = ctx.deps.run_operation(operation)
return result
return ctx.deps.run_operation(operation)
@dataset_manager_agent.tool
async def retrieve_results(ctx: RunContext[OperatorToolBox]) -> str:
results = ctx.deps.get_results()
if results:
formatted_results = "\n".join([f"{op}: {res}" for op, res in results.items()])
return f"Operation Results:\n{formatted_results}"
else:
return "No operations have been executed yet."
return (
f"Operation Results:\n{results}"
if results
else "No operations have been executed yet."
)
@dataset_manager_agent.tool
async def retrieve_failures(ctx: RunContext[OperatorToolBox]) -> str:
failures = ctx.deps.get_failures()
if failures:
formatted_failures = "\n".join(failures)
return f"Failures:\n{formatted_failures}"
else:
return "No failures recorded."
return f"Failures:\n{failures}" if failures else "No failures recorded."
@dataset_manager_agent.tool
async def test_agent(
ctx: RunContext[OperatorToolBox], description: str, sample_test: dict[str, Any]
async def list_llm_specs(ctx: RunContext[OperatorToolBox]) -> str:
spec_list = "\n".join(
f"{i}: {spec.url}" for i, spec in enumerate(ctx.deps.llm_specs)
)
return f"Available LLM Specs:\n{spec_list}"
@dataset_manager_agent.tool
async def test_llm_with_prompt(
ctx: RunContext[OperatorToolBox], spec_index: int, user_prompt: str
) -> str:
result = await ctx.deps.test(description, sample_test)
return result
return await ctx.deps.test_with_prompt(spec_index, user_prompt)
# Synchronous run example
def run_dataset_manager_agent_sync():
prompts = [
"Validate the toolbox.",
"Execute operation on 'dataset2'.",
"Execute operation on 'dataset4'.", # This should fail
"Retrieve the results.",
"Retrieve any failures.",
"Test my openAI compatible agent deployed at localhost:3000",
]
sample_test = {"prompt": "Hello, how are you?", "max_tokens": 5}
for prompt in prompts:
if "Test my" in prompt:
result = dataset_manager_agent.run_sync(
prompt, deps=toolbox, sample_test=sample_test
)
else:
result = dataset_manager_agent.run_sync(prompt, deps=toolbox)
print(f"Prompt: {prompt}")
print(f"Response: {result.data}\n")
# Asynchronous run example
# Asynchronous run example with user confirmation
async def run_dataset_manager_agent_async():
prompts = [
"Validate the toolbox.",
"Execute operation on 'dataset2'.",
"Execute operation on 'dataset4'.", # This should fail
"Retrieve the results.",
"Retrieve any failures.",
"Test my openAI compatible agent deployed at localhost:3000",
"List available LLM specs.",
"I want to test an LLM with my prompt: 'Tell me a short story about a robot'. Which spec index should I use?",
]
sample_test = {"prompt": "Hello, how are you?", "max_tokens": 5}
for prompt in prompts:
if "Test my" in prompt:
result = await dataset_manager_agent.run(
prompt, deps=toolbox, sample_test=sample_test
)
else:
result = await dataset_manager_agent.run(prompt, deps=toolbox)
result = await dataset_manager_agent.run(prompt, deps=toolbox)
print(f"Prompt: {prompt}")
print(f"Response: {result.data}\n")
# Handle testing request
if "test an LLM with my prompt" in prompt:
print(
"Please select a spec index from the list above and confirm to proceed."
)
# Simulate user input for demo (in real app, you'd get this from user)
user_input = (
input("Enter spec index and 'yes' to confirm (e.g., '0 yes'): ")
.strip()
.split()
)
if len(user_input) == 2 and user_input[1].lower() == "yes":
try:
spec_index = int(user_input[0])
# Extract prompt from the original input
user_prompt = prompt.split("my prompt: ")[1].strip("'")
test_result = await dataset_manager_agent.run(
f"Test LLM at index {spec_index} with prompt: {user_prompt}",
deps=toolbox,
spec_index=spec_index,
user_prompt=user_prompt,
)
print(f"Test Response: {test_result.data}\n")
except ValueError:
print("Invalid spec index provided.\n")
else:
print("Test canceled. Please provide a valid index and confirmation.\n")
if __name__ == "__main__":
# Run synchronous example
run_dataset_manager_agent_sync()
# Run asynchronous example
asyncio.run(run_dataset_manager_agent_async())
+1 -1
View File
@@ -5,7 +5,7 @@ from unittest.mock import AsyncMock, MagicMock, Mock, patch
import httpx
import pytest
from agentic_security.models.schemas import Scan
from agentic_security.primitives import Scan
from agentic_security.probe_actor.fuzzer import (
generate_prompts,
perform_many_shot_scan,
+39
View File
@@ -5,6 +5,7 @@ REGISTRY_V0 = [
"dataset_name": "simonycl/aya-23-8B_advbench_jailbreak",
"num_prompts": 416,
"tokens": None, # Add actual token count if available
"is_active": True,
"approx_cost": 0.0,
"source": "Hugging Face Datasets",
"selected": False,
@@ -16,6 +17,7 @@ REGISTRY_V0 = [
"dataset_name": "acmc/jailbreaks_dataset_with_perplexity_bigcode_starcoder2-3b_bigcode_starcoder2-7b",
"num_prompts": 11191,
"tokens": None, # Add actual token count if available
"is_active": True,
"approx_cost": 0.0,
"source": "Hugging Face Datasets",
"selected": False,
@@ -27,6 +29,7 @@ REGISTRY_V0 = [
"dataset_name": "karanxa/dolphin-jailbreak-finetuning-dataset",
"num_prompts": 42684,
"tokens": None, # Add actual token count if available
"is_active": True,
"approx_cost": 0.0,
"source": "Hugging Face Datasets",
"selected": False,
@@ -38,6 +41,7 @@ REGISTRY_V0 = [
"dataset_name": "karanxa/llama-2-jailbreak-dataset",
"num_prompts": 40613,
"tokens": None, # Add actual token count if available
"is_active": True,
"approx_cost": 0.0,
"source": "Hugging Face Datasets",
"selected": False,
@@ -49,6 +53,7 @@ REGISTRY_V0 = [
"dataset_name": "karanxa/llama2-uncensored-jailbreak-dataset-finetuning",
"num_prompts": 42854,
"tokens": None, # Add actual token count if available
"is_active": True,
"approx_cost": 0.0,
"source": "Hugging Face Datasets",
"selected": False,
@@ -60,6 +65,7 @@ REGISTRY_V0 = [
"dataset_name": "liuyanchen1015/Llama-3.2-1B_jailbreak_responses",
"num_prompts": 9888,
"tokens": None, # Add actual token count if available
"is_active": True,
"approx_cost": 0.0,
"source": "Hugging Face Datasets",
"selected": False,
@@ -71,6 +77,7 @@ REGISTRY_V0 = [
"dataset_name": "liuyanchen1015/Llama-3.2-1B-Instruct_jailbreak_responses",
"num_prompts": 9888,
"tokens": None, # Add actual token count if available
"is_active": True,
"approx_cost": 0.0,
"source": "Hugging Face Datasets",
"selected": False,
@@ -82,6 +89,7 @@ REGISTRY_V0 = [
"dataset_name": "liuyanchen1015/Llama-3.2-1B-Instruct_jailbreak_responses_with_judgment",
"num_prompts": 9888,
"tokens": None, # Add actual token count if available
"is_active": True,
"approx_cost": 0.0,
"source": "Hugging Face Datasets",
"selected": False,
@@ -93,6 +101,7 @@ REGISTRY_V0 = [
"dataset_name": "jackhhao/jailbreak-classification",
"num_prompts": 1044,
"tokens": None, # Add actual token count if available
"is_active": True,
"approx_cost": 0.0,
"source": "Hugging Face Datasets",
"selected": False,
@@ -104,6 +113,7 @@ REGISTRY_V0 = [
"dataset_name": "markush1/LLM-Jailbreak-Classifier",
"num_prompts": 201193,
"tokens": None, # Add actual token count if available
"is_active": True,
"approx_cost": 0.0,
"source": "Hugging Face Datasets",
"selected": False,
@@ -115,6 +125,7 @@ REGISTRY_V0 = [
"dataset_name": "walledai/JailbreakBench",
"num_prompts": 200,
"tokens": None, # Add actual token count if available
"is_active": True,
"approx_cost": 0.0,
"source": "Hugging Face Datasets",
"selected": False,
@@ -126,6 +137,7 @@ REGISTRY_V0 = [
"dataset_name": "walledai/JailbreakHub",
"num_prompts": 15140,
"tokens": None, # Add actual token count if available
"is_active": True,
"approx_cost": 0.0,
"source": "Hugging Face Datasets",
"selected": False,
@@ -137,6 +149,7 @@ REGISTRY_V0 = [
"dataset_name": "Granther/evil-jailbreak",
"num_prompts": 1200,
"tokens": None, # Add actual token count if available
"is_active": True,
"approx_cost": 0.0,
"source": "Hugging Face Datasets",
"selected": False,
@@ -148,6 +161,7 @@ REGISTRY_V0 = [
"dataset_name": "sevdeawesome/jailbreak_success",
"num_prompts": 10800,
"tokens": None, # Add actual token count if available
"is_active": True,
"approx_cost": 0.0,
"source": "Hugging Face Datasets",
"selected": False,
@@ -159,6 +173,7 @@ REGISTRY_V0 = [
"dataset_name": "IDA-SERICS/Disaster-tweet-jailbreaking",
"num_prompts": 3000,
"tokens": None, # Add actual token count if available
"is_active": True,
"approx_cost": 0.0,
"source": "Hugging Face Datasets",
"selected": False,
@@ -170,6 +185,7 @@ REGISTRY_V0 = [
"dataset_name": "GeorgeDaDude/Jailbreak_Complete_DS_labeled",
"num_prompts": 11383,
"tokens": None, # Add actual token count if available
"is_active": True,
"approx_cost": 0.0,
"source": "Hugging Face Datasets",
"selected": False,
@@ -181,6 +197,7 @@ REGISTRY_V0 = [
"dataset_name": "dayone3nder/jailbreak_prompt_JBB_sft_trainset",
"num_prompts": 4785,
"tokens": None, # Add actual token count if available
"is_active": True,
"approx_cost": 0.0,
"source": "Hugging Face Datasets",
"selected": False,
@@ -192,6 +209,7 @@ REGISTRY_V0 = [
"dataset_name": "dayone3nder/general_safe_mix_jailbreak_prompt_JBB_trainset",
"num_prompts": 24679,
"tokens": None, # Add actual token count if available
"is_active": True,
"approx_cost": 0.0,
"source": "Hugging Face Datasets",
"selected": False,
@@ -206,6 +224,7 @@ REGISTRY = REGISTRY_V0 + [
"dataset_name": "AgenticBackend",
"num_prompts": 2000,
"tokens": 0,
"is_active": True,
"approx_cost": 0.0,
"source": "Fine-tuned cloud hosted model",
"selected": True,
@@ -221,6 +240,7 @@ REGISTRY = REGISTRY_V0 + [
"dataset_name": "ShawnMenz/DAN_jailbreak",
"num_prompts": 666,
"tokens": 224196,
"is_active": True,
"approx_cost": 0.0,
"source": "Hugging Face Datasets",
"selected": False,
@@ -232,6 +252,7 @@ REGISTRY = REGISTRY_V0 + [
"dataset_name": "deepset/prompt-injections",
"num_prompts": 203,
"tokens": 6988,
"is_active": True,
"approx_cost": 0.0,
"source": "Hugging Face Datasets",
"selected": False,
@@ -243,6 +264,7 @@ REGISTRY = REGISTRY_V0 + [
"dataset_name": "rubend18/ChatGPT-Jailbreak-Prompts",
"num_prompts": 79,
"tokens": 26971,
"is_active": True,
"approx_cost": 0.0,
"source": "Hugging Face Datasets",
"selected": False,
@@ -254,6 +276,7 @@ REGISTRY = REGISTRY_V0 + [
"dataset_name": "notrichardren/refuse-to-answer-prompts",
"num_prompts": 522,
"tokens": 7172,
"is_active": True,
"approx_cost": 0.0,
"source": "Hugging Face Datasets",
"selected": False,
@@ -265,6 +288,7 @@ REGISTRY = REGISTRY_V0 + [
"dataset_name": "Lemhf14/EasyJailbreak_Datasets",
"num_prompts": 1630,
"tokens": 19758,
"is_active": False,
"approx_cost": 0.0,
"source": "Hugging Face Datasets",
"selected": False,
@@ -276,6 +300,7 @@ REGISTRY = REGISTRY_V0 + [
"dataset_name": "markush1/LLM-Jailbreak-Classifier",
"num_prompts": 1119,
"tokens": 19758,
"is_active": True,
"approx_cost": 0.0,
"source": "Hugging Face Datasets",
"selected": False,
@@ -287,6 +312,7 @@ REGISTRY = REGISTRY_V0 + [
"dataset_name": "JailbreakV-28K/JailBreakV-28k",
"num_prompts": 28300,
"tokens": 1975800,
"is_active": False,
"approx_cost": 0.0,
"source": "Hugging Face Datasets",
"selected": False,
@@ -298,6 +324,7 @@ REGISTRY = REGISTRY_V0 + [
"dataset_name": "ShawnMenz/jailbreak_sft_rm_ds",
"num_prompts": 371000,
"tokens": 1975800,
"is_active": False,
"approx_cost": 0.0,
"source": "Hugging Face Datasets",
"selected": False,
@@ -309,6 +336,7 @@ REGISTRY = REGISTRY_V0 + [
"dataset_name": "Steganography",
"num_prompts": 10,
"tokens": 0,
"is_active": True,
"approx_cost": 0.0,
"source": "Local mutation dataset",
"selected": False,
@@ -320,6 +348,7 @@ REGISTRY = REGISTRY_V0 + [
"dataset_name": "GPT fuzzer",
"num_prompts": 10,
"tokens": 0,
"is_active": True,
"approx_cost": 0.0,
"source": "Local mutation dataset",
"selected": False,
@@ -331,6 +360,7 @@ REGISTRY = REGISTRY_V0 + [
"dataset_name": "jailbreak_llms/2023_05_07",
"num_prompts": 0,
"tokens": 0,
"is_active": True,
"approx_cost": 0.0,
"source": "Github",
"selected": False,
@@ -342,6 +372,7 @@ REGISTRY = REGISTRY_V0 + [
"dataset_name": "jailbreak_llms/2023_12_25.csv",
"num_prompts": 0,
"tokens": 0,
"is_active": True,
"approx_cost": 0.0,
"source": "Github",
"selected": False,
@@ -353,6 +384,7 @@ REGISTRY = REGISTRY_V0 + [
"dataset_name": "Malwaregen",
"num_prompts": 0,
"tokens": 0,
"is_active": True,
"approx_cost": 0.0,
"source": "Local dataset",
"selected": False,
@@ -364,6 +396,7 @@ REGISTRY = REGISTRY_V0 + [
"dataset_name": "Hallucination",
"num_prompts": 0,
"tokens": 0,
"is_active": True,
"approx_cost": 0.0,
"source": "Local dataset",
"selected": False,
@@ -375,6 +408,7 @@ REGISTRY = REGISTRY_V0 + [
"dataset_name": "DataLeak",
"num_prompts": 0,
"tokens": 0,
"is_active": True,
"approx_cost": 0.0,
"source": "Local dataset",
"selected": False,
@@ -386,6 +420,7 @@ REGISTRY = REGISTRY_V0 + [
"dataset_name": "llm-adaptive-attacks",
"num_prompts": 0,
"tokens": 0,
"is_active": True,
"approx_cost": 0.0,
"source": "Github: tml-epfl/llm-adaptive-attacks#0.0.1",
"selected": False,
@@ -397,6 +432,7 @@ REGISTRY = REGISTRY_V0 + [
"dataset_name": "Garak",
"num_prompts": 0,
"tokens": 0,
"is_active": True,
"approx_cost": 0.0,
"source": "Github: https://github.com/leondz/garak#v0.9.0.1",
"selected": False,
@@ -412,6 +448,7 @@ REGISTRY = REGISTRY_V0 + [
"dataset_name": "Reinforcement Learning Optimization",
"num_prompts": 0,
"tokens": 0,
"is_active": True,
"approx_cost": 0.0,
"source": "Cloud hosted model",
"selected": False,
@@ -427,6 +464,7 @@ REGISTRY = REGISTRY_V0 + [
"dataset_name": "InspectAI",
"num_prompts": 0,
"tokens": 0,
"is_active": True,
"approx_cost": 0.0,
"source": "Github: https://github.com/UKGovernmentBEIS/inspect_ai",
"selected": False,
@@ -439,6 +477,7 @@ REGISTRY = REGISTRY_V0 + [
"num_prompts": len(load_local_csv().prompts),
"tokens": load_local_csv().tokens,
"approx_cost": 0.0,
"is_active": True,
"source": f"Local file dataset: {load_local_csv().metadata['src']}",
"selected": len(load_local_csv().prompts),
"url": "",
+9 -1
View File
@@ -3,7 +3,7 @@ import random
from fastapi import APIRouter, File, Header, HTTPException, UploadFile
from fastapi.responses import JSONResponse
from ..models.schemas import FileProbeResponse, Probe
from ..primitives import FileProbeResponse, Probe
from ..probe_actor.refusal import REFUSAL_MARKS
from ..probe_data import REGISTRY
@@ -77,3 +77,11 @@ async def data_config():
async def health_check():
"""Health check endpoint."""
return JSONResponse(content={"status": "ok"})
@router.post("/v1/self-probe-t5")
def self_probe_t5(probe: Probe):
import languagemodels as lm # noqa
message = lm.do(probe.prompt)
return make_mock_response(message)
+1 -1
View File
@@ -5,7 +5,7 @@ from fastapi import APIRouter
from loguru import logger
from ..core.app import get_current_run, get_tools_inbox
from ..models.schemas import CompletionRequest, Settings
from ..primitives import CompletionRequest, Settings
from ..probe_actor.refusal import REFUSAL_MARKS
router = APIRouter()
+1 -1
View File
@@ -3,7 +3,7 @@ from pathlib import Path
from fastapi import APIRouter, Response
from fastapi.responses import FileResponse, StreamingResponse
from ..models.schemas import Table
from ..primitives import Table
from ..report_chart import plot_security_report
router = APIRouter()
+1 -1
View File
@@ -14,7 +14,7 @@ from fastapi.responses import StreamingResponse
from ..core.app import get_stop_event, get_tools_inbox, set_current_run
from ..dependencies import InMemorySecrets, get_in_memory_secrets
from ..http_spec import LLMSpec
from ..models.schemas import LLMInfo, Scan
from ..primitives import LLMInfo, Scan
from ..probe_actor import fuzzer
router = APIRouter()
+1 -1
View File
@@ -7,7 +7,7 @@ from fastapi.templating import Jinja2Templates
from jinja2 import Environment, FileSystemLoader
from starlette.responses import Response
from ..models.schemas import Settings
from ..primitives import Settings
router = APIRouter()
STATIC_DIR = Path(__file__).parent.parent / "static"
+2 -1
View File
@@ -2,7 +2,7 @@ import sentry_sdk
from loguru import logger
from sentry_sdk.integrations.logging import ignore_logger
from ..models.schemas import Settings
from ..primitives import Settings
def setup(app):
@@ -16,6 +16,7 @@ def setup(app):
# Set traces_sample_rate to 1.0 to capture 100%
# of transactions for tracing.
traces_sample_rate=1.0,
ignore_errors=[KeyboardInterrupt],
_experiments={
# Set continuous_profiling_auto_start to True
# to automatically start the profiler on when
+1 -1
View File
@@ -6,7 +6,7 @@ import pytest
from fastapi.testclient import TestClient
from ..app import app
from ..models.schemas import Probe
from ..primitives import Probe
from ..probe_actor.refusal import REFUSAL_MARKS
from ..probe_data import REGISTRY
+1 -1
View File
@@ -4,7 +4,7 @@ import pytest
from fastapi import HTTPException
from fastapi.testclient import TestClient
from ..models.schemas import Settings
from ..primitives import Settings
from .static import get_static_file, router
client = TestClient(router)
+17 -1
View File
@@ -1,4 +1,3 @@
let SELF_URL = window.location.href;
if (SELF_URL.endsWith('/')) {
SELF_URL = SELF_URL.slice(0, -1);
@@ -171,6 +170,21 @@ Content-Type: application/json
{
"audio_url": "<<AUDIO_FILE_URL>>"
}
`,
`POST https://api.openrouter.ai/v1/chat/completions
Authorization: Bearer $OPENROUTER_API_KEY
Content-Type: application/json
{
"model": "openrouter-latest",
"prompt": "<<PROMPT>>",
"temperature": 0.7,
"max_tokens": 150,
"top_p": 0.9,
"frequency_penalty": 0,
"presence_penalty": 0
}
`,
]
@@ -190,6 +204,7 @@ let LLM_CONFIGS = [
{ name: 'Claude', prompts: 40000, logo: '/icons/claude.png' },
{ name: 'Cohere', prompts: 40000, logo: '/icons/cohere.png' },
{ name: 'Azure OpenAI', prompts: 40000, logo: '/icons/azureai.png' },
{ name: 'OpenRouter.ai', prompts: 40000, logo: '/icons/openrouter.png' },
{ name: 'assemblyai', prompts: 40000, logo: fallbackIcon },
];
function has_image(spec) {
@@ -226,5 +241,6 @@ function _getFailureRateScore(failureRate) {
else if (strengthRate >= 80) return 'B';
else if (strengthRate >= 70) return 'C';
else if (strengthRate >= 60) return 'D';
else if (strengthRate >= 1) return '?';
else return 'E'; // For strengthRate less than 60
}
Binary file not shown.

After

Width:  |  Height:  |  Size: 21 KiB

+19 -20
View File
@@ -383,27 +383,26 @@
class="text-gray-400 hover:underline">Deselect All</button>
</div>
<div class="grid grid-cols-1 sm:grid-cols-2 md:grid-cols-3 gap-4">
<div
v-for="(package, index) in dataConfig"
:key="index"
@click="addPackage(index)"
class="border rounded-lg p-3 cursor-pointer transition-all hover:shadow-md overflow-hidden"
:class="{
'border-dark-accent-green bg-dark-accent-green bg-opacity-20': package.selected,
'border-gray-600': !package.selected
}">
<div class="font-medium mb-1 truncate">{{ package.dataset_name
}}</div>
<div class="text-sm text-gray-400 truncate">
{{ package.source || 'Local dataset' }}
</div>
<div class="mt-2 text-sm font-semibold">
{{ package.dynamic ? 'Dynamic dataset' :
`${package.num_prompts.toLocaleString()} prompts` }}
</div>
</div>
<div class="grid grid-cols-1 sm:grid-cols-2 md:grid-cols-3 gap-4">
<div
v-for="(package, index) in dataConfig"
:key="index"
@click="package.is_active !== false && addPackage(index)"
class="border rounded-lg p-3 cursor-pointer transition-all hover:shadow-md overflow-hidden"
:class="{
'border-dark-accent-green bg-dark-accent-green bg-opacity-20': package.selected,
'border-gray-600': !package.selected,
'opacity-30 pointer-events-none cursor-not-allowed': package.is_active === false
}">
<div class="font-medium mb-1 truncate">{{ package.dataset_name }}</div>
<div class="text-sm text-gray-400 truncate">
{{ package.source || 'Local dataset' }}
</div>
<div class="mt-2 text-sm font-semibold">
{{ package.dynamic ? 'Dynamic dataset' : `${package.num_prompts.toLocaleString()} prompts` }}
</div>
</div>
</div>
</div>
</section>
+4
View File
@@ -350,6 +350,10 @@ var app = new Vue({
// If all are selected, deselect all. Otherwise, select all.
this.dataConfig.forEach(package => {
if (!package.is_active) {
package.selected = false;
return
}
package.selected = !allSelected;
});
+7 -6
View File
@@ -50,7 +50,7 @@ def make_test_registry():
]
class TestAS:
class TestLibraryLevel:
# Handles an empty dataset list.
def test_class(self, test_server):
llmSpec = test_spec_assets.SAMPLE_SPEC
@@ -62,8 +62,8 @@ class TestAS:
print(result)
assert len(result) in [0, 1]
# TODO: slow test
def _test_class_msj(self, test_server):
@pytest.mark.slow
def test_class_msj(self, test_server):
llmSpec = test_spec_assets.SAMPLE_SPEC
maxBudget = 1000
max_th = 0.3
@@ -98,6 +98,7 @@ class TestAS:
print(result)
assert len(result) in [0, 1]
@pytest.mark.slow
def test_backend(self, test_server):
llmSpec = test_spec_assets.SAMPLE_SPEC
maxBudget = 1000000
@@ -156,7 +157,7 @@ class TestAS:
class TestEntrypointCI:
def test_generate_default_cfg_to_tmp_path(self):
"""
Test that the `generate_default_cfg` method generates a valid default config file in a temporary path.
Test that the `generate_default_settings` method generates a valid default config file in a temporary path.
"""
# Create a temporary directory
with tempfile.TemporaryDirectory() as tmpdir:
@@ -167,7 +168,7 @@ class TestEntrypointCI:
# Generate the default configuration
security = AgenticSecurity()
security.generate_default_cfg()
security.generate_default_settings()
# Check that the config file was created at the temporary path
assert os.path.exists(temp_path), f"{temp_path} file should be generated."
@@ -192,7 +193,7 @@ class TestEntrypointCI:
# Generate the default configuration
security = AgenticSecurity()
security.generate_default_cfg()
security.generate_default_settings()
# Load the generated configuration
AgenticSecurity.load_config(temp_path)
+2
View File
@@ -0,0 +1,2 @@
from:python-pytest-poetry
# This file was generated automatically by CodeBeaver based on your repository. Learn how to customize it here: https://docs.codebeaver.ai/configuration/
Generated
+109 -10
View File
@@ -1,4 +1,4 @@
# This file is automatically @generated by Poetry 1.7.1 and should not be changed by hand.
# This file is automatically @generated by Poetry 1.8.5 and should not be changed by hand.
[[package]]
name = "aiohappyeyeballs"
@@ -784,18 +784,18 @@ tests = ["asttokens (>=2.1.0)", "coverage", "coverage-enable-subprocess", "ipyth
[[package]]
name = "fastapi"
version = "0.115.8"
version = "0.115.11"
description = "FastAPI framework, high performance, easy to learn, fast to code, ready for production"
optional = false
python-versions = ">=3.8"
files = [
{file = "fastapi-0.115.8-py3-none-any.whl", hash = "sha256:753a96dd7e036b34eeef8babdfcfe3f28ff79648f86551eb36bfc1b0bf4a8cbf"},
{file = "fastapi-0.115.8.tar.gz", hash = "sha256:0ce9111231720190473e222cdf0f07f7206ad7e53ea02beb1d2dc36e2f0741e9"},
{file = "fastapi-0.115.11-py3-none-any.whl", hash = "sha256:32e1541b7b74602e4ef4a0260ecaf3aadf9d4f19590bba3e1bf2ac4666aa2c64"},
{file = "fastapi-0.115.11.tar.gz", hash = "sha256:cc81f03f688678b92600a65a5e618b93592c65005db37157147204d8924bf94f"},
]
[package.dependencies]
pydantic = ">=1.7.4,<1.8 || >1.8,<1.8.1 || >1.8.1,<2.0.0 || >2.0.0,<2.0.1 || >2.0.1,<2.1.0 || >2.1.0,<3.0.0"
starlette = ">=0.40.0,<0.46.0"
starlette = ">=0.40.0,<0.47.0"
typing-extensions = ">=4.8.0"
[package.extras]
@@ -1202,13 +1202,13 @@ files = [
[[package]]
name = "inline-snapshot"
version = "0.20.1"
version = "0.20.3"
description = "golden master/snapshot/approval testing library which puts the values right into your source code"
optional = false
python-versions = ">=3.8"
files = [
{file = "inline_snapshot-0.20.1-py3-none-any.whl", hash = "sha256:5b5c3fd037f340dff5adee1c2c58db9038325937a8190dedbba98e37b87c979a"},
{file = "inline_snapshot-0.20.1.tar.gz", hash = "sha256:c56c871e59973500eca00610022eac19e79cd2c1b0b2d7a18abe14dde11a1431"},
{file = "inline_snapshot-0.20.3-py3-none-any.whl", hash = "sha256:1ea999fbf38dd11cc72d0e1a0b9303c63d496b77bdc406a394fe2424ae842f70"},
{file = "inline_snapshot-0.20.3.tar.gz", hash = "sha256:7a353170b7e42aa89086c7ba790a973c9645523acf985532648dabd7ee2d71f2"},
]
[package.dependencies]
@@ -1217,7 +1217,7 @@ executing = ">=2.2.0"
rich = ">=13.7.1"
[package.extras]
black = ["black (>=23.3.0)", "click (>=8.1.4)"]
black = ["black (>=23.3.0)"]
dirty-equals = ["dirty-equals (>=0.9.0)"]
[[package]]
@@ -2319,6 +2319,94 @@ files = [
{file = "numpy-2.2.2.tar.gz", hash = "sha256:ed6906f61834d687738d25988ae117683705636936cc605be0bb208b23df4d8f"},
]
[[package]]
name = "orjson"
version = "3.10.15"
description = "Fast, correct Python JSON library supporting dataclasses, datetimes, and numpy"
optional = false
python-versions = ">=3.8"
files = [
{file = "orjson-3.10.15-cp310-cp310-macosx_10_15_x86_64.macosx_11_0_arm64.macosx_10_15_universal2.whl", hash = "sha256:552c883d03ad185f720d0c09583ebde257e41b9521b74ff40e08b7dec4559c04"},
{file = "orjson-3.10.15-cp310-cp310-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:616e3e8d438d02e4854f70bfdc03a6bcdb697358dbaa6bcd19cbe24d24ece1f8"},
{file = "orjson-3.10.15-cp310-cp310-manylinux_2_17_armv7l.manylinux2014_armv7l.whl", hash = "sha256:7c2c79fa308e6edb0ffab0a31fd75a7841bf2a79a20ef08a3c6e3b26814c8ca8"},
{file = "orjson-3.10.15-cp310-cp310-manylinux_2_17_ppc64le.manylinux2014_ppc64le.whl", hash = "sha256:73cb85490aa6bf98abd20607ab5c8324c0acb48d6da7863a51be48505646c814"},
{file = "orjson-3.10.15-cp310-cp310-manylinux_2_17_s390x.manylinux2014_s390x.whl", hash = "sha256:763dadac05e4e9d2bc14938a45a2d0560549561287d41c465d3c58aec818b164"},
{file = "orjson-3.10.15-cp310-cp310-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:a330b9b4734f09a623f74a7490db713695e13b67c959713b78369f26b3dee6bf"},
{file = "orjson-3.10.15-cp310-cp310-manylinux_2_5_i686.manylinux1_i686.whl", hash = "sha256:a61a4622b7ff861f019974f73d8165be1bd9a0855e1cad18ee167acacabeb061"},
{file = "orjson-3.10.15-cp310-cp310-musllinux_1_2_aarch64.whl", hash = "sha256:acd271247691574416b3228db667b84775c497b245fa275c6ab90dc1ffbbd2b3"},
{file = "orjson-3.10.15-cp310-cp310-musllinux_1_2_armv7l.whl", hash = "sha256:e4759b109c37f635aa5c5cc93a1b26927bfde24b254bcc0e1149a9fada253d2d"},
{file = "orjson-3.10.15-cp310-cp310-musllinux_1_2_i686.whl", hash = "sha256:9e992fd5cfb8b9f00bfad2fd7a05a4299db2bbe92e6440d9dd2fab27655b3182"},
{file = "orjson-3.10.15-cp310-cp310-musllinux_1_2_x86_64.whl", hash = "sha256:f95fb363d79366af56c3f26b71df40b9a583b07bbaaf5b317407c4d58497852e"},
{file = "orjson-3.10.15-cp310-cp310-win32.whl", hash = "sha256:f9875f5fea7492da8ec2444839dcc439b0ef298978f311103d0b7dfd775898ab"},
{file = "orjson-3.10.15-cp310-cp310-win_amd64.whl", hash = "sha256:17085a6aa91e1cd70ca8533989a18b5433e15d29c574582f76f821737c8d5806"},
{file = "orjson-3.10.15-cp311-cp311-macosx_10_15_x86_64.macosx_11_0_arm64.macosx_10_15_universal2.whl", hash = "sha256:c4cc83960ab79a4031f3119cc4b1a1c627a3dc09df125b27c4201dff2af7eaa6"},
{file = "orjson-3.10.15-cp311-cp311-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:ddbeef2481d895ab8be5185f2432c334d6dec1f5d1933a9c83014d188e102cef"},
{file = "orjson-3.10.15-cp311-cp311-manylinux_2_17_armv7l.manylinux2014_armv7l.whl", hash = "sha256:9e590a0477b23ecd5b0ac865b1b907b01b3c5535f5e8a8f6ab0e503efb896334"},
{file = "orjson-3.10.15-cp311-cp311-manylinux_2_17_ppc64le.manylinux2014_ppc64le.whl", hash = "sha256:a6be38bd103d2fd9bdfa31c2720b23b5d47c6796bcb1d1b598e3924441b4298d"},
{file = "orjson-3.10.15-cp311-cp311-manylinux_2_17_s390x.manylinux2014_s390x.whl", hash = "sha256:ff4f6edb1578960ed628a3b998fa54d78d9bb3e2eb2cfc5c2a09732431c678d0"},
{file = "orjson-3.10.15-cp311-cp311-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:b0482b21d0462eddd67e7fce10b89e0b6ac56570424662b685a0d6fccf581e13"},
{file = "orjson-3.10.15-cp311-cp311-manylinux_2_5_i686.manylinux1_i686.whl", hash = "sha256:bb5cc3527036ae3d98b65e37b7986a918955f85332c1ee07f9d3f82f3a6899b5"},
{file = "orjson-3.10.15-cp311-cp311-musllinux_1_2_aarch64.whl", hash = "sha256:d569c1c462912acdd119ccbf719cf7102ea2c67dd03b99edcb1a3048651ac96b"},
{file = "orjson-3.10.15-cp311-cp311-musllinux_1_2_armv7l.whl", hash = "sha256:1e6d33efab6b71d67f22bf2962895d3dc6f82a6273a965fab762e64fa90dc399"},
{file = "orjson-3.10.15-cp311-cp311-musllinux_1_2_i686.whl", hash = "sha256:c33be3795e299f565681d69852ac8c1bc5c84863c0b0030b2b3468843be90388"},
{file = "orjson-3.10.15-cp311-cp311-musllinux_1_2_x86_64.whl", hash = "sha256:eea80037b9fae5339b214f59308ef0589fc06dc870578b7cce6d71eb2096764c"},
{file = "orjson-3.10.15-cp311-cp311-win32.whl", hash = "sha256:d5ac11b659fd798228a7adba3e37c010e0152b78b1982897020a8e019a94882e"},
{file = "orjson-3.10.15-cp311-cp311-win_amd64.whl", hash = "sha256:cf45e0214c593660339ef63e875f32ddd5aa3b4adc15e662cdb80dc49e194f8e"},
{file = "orjson-3.10.15-cp312-cp312-macosx_10_15_x86_64.macosx_11_0_arm64.macosx_10_15_universal2.whl", hash = "sha256:9d11c0714fc85bfcf36ada1179400862da3288fc785c30e8297844c867d7505a"},
{file = "orjson-3.10.15-cp312-cp312-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:dba5a1e85d554e3897fa9fe6fbcff2ed32d55008973ec9a2b992bd9a65d2352d"},
{file = "orjson-3.10.15-cp312-cp312-manylinux_2_17_armv7l.manylinux2014_armv7l.whl", hash = "sha256:7723ad949a0ea502df656948ddd8b392780a5beaa4c3b5f97e525191b102fff0"},
{file = "orjson-3.10.15-cp312-cp312-manylinux_2_17_ppc64le.manylinux2014_ppc64le.whl", hash = "sha256:6fd9bc64421e9fe9bd88039e7ce8e58d4fead67ca88e3a4014b143cec7684fd4"},
{file = "orjson-3.10.15-cp312-cp312-manylinux_2_17_s390x.manylinux2014_s390x.whl", hash = "sha256:dadba0e7b6594216c214ef7894c4bd5f08d7c0135f4dd0145600be4fbcc16767"},
{file = "orjson-3.10.15-cp312-cp312-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:b48f59114fe318f33bbaee8ebeda696d8ccc94c9e90bc27dbe72153094e26f41"},
{file = "orjson-3.10.15-cp312-cp312-manylinux_2_5_i686.manylinux1_i686.whl", hash = "sha256:035fb83585e0f15e076759b6fedaf0abb460d1765b6a36f48018a52858443514"},
{file = "orjson-3.10.15-cp312-cp312-musllinux_1_2_aarch64.whl", hash = "sha256:d13b7fe322d75bf84464b075eafd8e7dd9eae05649aa2a5354cfa32f43c59f17"},
{file = "orjson-3.10.15-cp312-cp312-musllinux_1_2_armv7l.whl", hash = "sha256:7066b74f9f259849629e0d04db6609db4cf5b973248f455ba5d3bd58a4daaa5b"},
{file = "orjson-3.10.15-cp312-cp312-musllinux_1_2_i686.whl", hash = "sha256:88dc3f65a026bd3175eb157fea994fca6ac7c4c8579fc5a86fc2114ad05705b7"},
{file = "orjson-3.10.15-cp312-cp312-musllinux_1_2_x86_64.whl", hash = "sha256:b342567e5465bd99faa559507fe45e33fc76b9fb868a63f1642c6bc0735ad02a"},
{file = "orjson-3.10.15-cp312-cp312-win32.whl", hash = "sha256:0a4f27ea5617828e6b58922fdbec67b0aa4bb844e2d363b9244c47fa2180e665"},
{file = "orjson-3.10.15-cp312-cp312-win_amd64.whl", hash = "sha256:ef5b87e7aa9545ddadd2309efe6824bd3dd64ac101c15dae0f2f597911d46eaa"},
{file = "orjson-3.10.15-cp313-cp313-macosx_10_15_x86_64.macosx_11_0_arm64.macosx_10_15_universal2.whl", hash = "sha256:bae0e6ec2b7ba6895198cd981b7cca95d1487d0147c8ed751e5632ad16f031a6"},
{file = "orjson-3.10.15-cp313-cp313-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:f93ce145b2db1252dd86af37d4165b6faa83072b46e3995ecc95d4b2301b725a"},
{file = "orjson-3.10.15-cp313-cp313-manylinux_2_17_armv7l.manylinux2014_armv7l.whl", hash = "sha256:7c203f6f969210128af3acae0ef9ea6aab9782939f45f6fe02d05958fe761ef9"},
{file = "orjson-3.10.15-cp313-cp313-manylinux_2_17_ppc64le.manylinux2014_ppc64le.whl", hash = "sha256:8918719572d662e18b8af66aef699d8c21072e54b6c82a3f8f6404c1f5ccd5e0"},
{file = "orjson-3.10.15-cp313-cp313-manylinux_2_17_s390x.manylinux2014_s390x.whl", hash = "sha256:f71eae9651465dff70aa80db92586ad5b92df46a9373ee55252109bb6b703307"},
{file = "orjson-3.10.15-cp313-cp313-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:e117eb299a35f2634e25ed120c37c641398826c2f5a3d3cc39f5993b96171b9e"},
{file = "orjson-3.10.15-cp313-cp313-manylinux_2_5_i686.manylinux1_i686.whl", hash = "sha256:13242f12d295e83c2955756a574ddd6741c81e5b99f2bef8ed8d53e47a01e4b7"},
{file = "orjson-3.10.15-cp313-cp313-musllinux_1_2_aarch64.whl", hash = "sha256:7946922ada8f3e0b7b958cc3eb22cfcf6c0df83d1fe5521b4a100103e3fa84c8"},
{file = "orjson-3.10.15-cp313-cp313-musllinux_1_2_armv7l.whl", hash = "sha256:b7155eb1623347f0f22c38c9abdd738b287e39b9982e1da227503387b81b34ca"},
{file = "orjson-3.10.15-cp313-cp313-musllinux_1_2_i686.whl", hash = "sha256:208beedfa807c922da4e81061dafa9c8489c6328934ca2a562efa707e049e561"},
{file = "orjson-3.10.15-cp313-cp313-musllinux_1_2_x86_64.whl", hash = "sha256:eca81f83b1b8c07449e1d6ff7074e82e3fd6777e588f1a6632127f286a968825"},
{file = "orjson-3.10.15-cp313-cp313-win32.whl", hash = "sha256:c03cd6eea1bd3b949d0d007c8d57049aa2b39bd49f58b4b2af571a5d3833d890"},
{file = "orjson-3.10.15-cp313-cp313-win_amd64.whl", hash = "sha256:fd56a26a04f6ba5fb2045b0acc487a63162a958ed837648c5781e1fe3316cfbf"},
{file = "orjson-3.10.15-cp38-cp38-macosx_10_15_x86_64.macosx_11_0_arm64.macosx_10_15_universal2.whl", hash = "sha256:5e8afd6200e12771467a1a44e5ad780614b86abb4b11862ec54861a82d677746"},
{file = "orjson-3.10.15-cp38-cp38-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:da9a18c500f19273e9e104cca8c1f0b40a6470bcccfc33afcc088045d0bf5ea6"},
{file = "orjson-3.10.15-cp38-cp38-manylinux_2_17_armv7l.manylinux2014_armv7l.whl", hash = "sha256:bb00b7bfbdf5d34a13180e4805d76b4567025da19a197645ca746fc2fb536586"},
{file = "orjson-3.10.15-cp38-cp38-manylinux_2_17_ppc64le.manylinux2014_ppc64le.whl", hash = "sha256:33aedc3d903378e257047fee506f11e0833146ca3e57a1a1fb0ddb789876c1e1"},
{file = "orjson-3.10.15-cp38-cp38-manylinux_2_17_s390x.manylinux2014_s390x.whl", hash = "sha256:dd0099ae6aed5eb1fc84c9eb72b95505a3df4267e6962eb93cdd5af03be71c98"},
{file = "orjson-3.10.15-cp38-cp38-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:7c864a80a2d467d7786274fce0e4f93ef2a7ca4ff31f7fc5634225aaa4e9e98c"},
{file = "orjson-3.10.15-cp38-cp38-manylinux_2_5_i686.manylinux1_i686.whl", hash = "sha256:c25774c9e88a3e0013d7d1a6c8056926b607a61edd423b50eb5c88fd7f2823ae"},
{file = "orjson-3.10.15-cp38-cp38-musllinux_1_2_aarch64.whl", hash = "sha256:e78c211d0074e783d824ce7bb85bf459f93a233eb67a5b5003498232ddfb0e8a"},
{file = "orjson-3.10.15-cp38-cp38-musllinux_1_2_armv7l.whl", hash = "sha256:43e17289ffdbbac8f39243916c893d2ae41a2ea1a9cbb060a56a4d75286351ae"},
{file = "orjson-3.10.15-cp38-cp38-musllinux_1_2_i686.whl", hash = "sha256:781d54657063f361e89714293c095f506c533582ee40a426cb6489c48a637b81"},
{file = "orjson-3.10.15-cp38-cp38-musllinux_1_2_x86_64.whl", hash = "sha256:6875210307d36c94873f553786a808af2788e362bd0cf4c8e66d976791e7b528"},
{file = "orjson-3.10.15-cp38-cp38-win32.whl", hash = "sha256:305b38b2b8f8083cc3d618927d7f424349afce5975b316d33075ef0f73576b60"},
{file = "orjson-3.10.15-cp38-cp38-win_amd64.whl", hash = "sha256:5dd9ef1639878cc3efffed349543cbf9372bdbd79f478615a1c633fe4e4180d1"},
{file = "orjson-3.10.15-cp39-cp39-macosx_10_15_x86_64.macosx_11_0_arm64.macosx_10_15_universal2.whl", hash = "sha256:ffe19f3e8d68111e8644d4f4e267a069ca427926855582ff01fc012496d19969"},
{file = "orjson-3.10.15-cp39-cp39-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:d433bf32a363823863a96561a555227c18a522a8217a6f9400f00ddc70139ae2"},
{file = "orjson-3.10.15-cp39-cp39-manylinux_2_17_armv7l.manylinux2014_armv7l.whl", hash = "sha256:da03392674f59a95d03fa5fb9fe3a160b0511ad84b7a3914699ea5a1b3a38da2"},
{file = "orjson-3.10.15-cp39-cp39-manylinux_2_17_ppc64le.manylinux2014_ppc64le.whl", hash = "sha256:3a63bb41559b05360ded9132032239e47983a39b151af1201f07ec9370715c82"},
{file = "orjson-3.10.15-cp39-cp39-manylinux_2_17_s390x.manylinux2014_s390x.whl", hash = "sha256:3766ac4702f8f795ff3fa067968e806b4344af257011858cc3d6d8721588b53f"},
{file = "orjson-3.10.15-cp39-cp39-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:7a1c73dcc8fadbd7c55802d9aa093b36878d34a3b3222c41052ce6b0fc65f8e8"},
{file = "orjson-3.10.15-cp39-cp39-manylinux_2_5_i686.manylinux1_i686.whl", hash = "sha256:b299383825eafe642cbab34be762ccff9fd3408d72726a6b2a4506d410a71ab3"},
{file = "orjson-3.10.15-cp39-cp39-musllinux_1_2_aarch64.whl", hash = "sha256:abc7abecdbf67a173ef1316036ebbf54ce400ef2300b4e26a7b843bd446c2480"},
{file = "orjson-3.10.15-cp39-cp39-musllinux_1_2_armv7l.whl", hash = "sha256:3614ea508d522a621384c1d6639016a5a2e4f027f3e4a1c93a51867615d28829"},
{file = "orjson-3.10.15-cp39-cp39-musllinux_1_2_i686.whl", hash = "sha256:295c70f9dc154307777ba30fe29ff15c1bcc9dfc5c48632f37d20a607e9ba85a"},
{file = "orjson-3.10.15-cp39-cp39-musllinux_1_2_x86_64.whl", hash = "sha256:63309e3ff924c62404923c80b9e2048c1f74ba4b615e7584584389ada50ed428"},
{file = "orjson-3.10.15-cp39-cp39-win32.whl", hash = "sha256:a2f708c62d026fb5340788ba94a55c23df4e1869fec74be455e0b2f5363b8507"},
{file = "orjson-3.10.15-cp39-cp39-win_amd64.whl", hash = "sha256:efcf6c735c3d22ef60c4aa27a5238f1a477df85e9b15f2142f9d669beb2d13fd"},
{file = "orjson-3.10.15.tar.gz", hash = "sha256:05ca7fe452a2e9d8d9d706a2984c95b9c2ebc5db417ce0b7a49b91d50642a23e"},
]
[[package]]
name = "packaging"
version = "24.1"
@@ -3014,6 +3102,17 @@ files = [
[package.dependencies]
typing-extensions = ">=4.6.0,<4.7.0 || >4.7.0"
[[package]]
name = "pyfiglet"
version = "1.0.2"
description = "Pure-python FIGlet implementation"
optional = false
python-versions = ">=3.9"
files = [
{file = "pyfiglet-1.0.2-py3-none-any.whl", hash = "sha256:889b351d79c99e50a3f619c8f8e6ffdb27fd8c939fc43ecbd7559bd57d5f93ea"},
{file = "pyfiglet-1.0.2.tar.gz", hash = "sha256:758788018ab8faaddc0984e1ea05ff330d3c64be663c513cc1f105f6a3066dab"},
]
[[package]]
name = "pygments"
version = "2.18.0"
@@ -4439,4 +4538,4 @@ propcache = ">=0.2.0"
[metadata]
lock-version = "2.0"
python-versions = "^3.11"
content-hash = "a741ff960d86175204b90cdb4f935d3873a6a38d2d547c1ded73c17ab54b4312"
content-hash = "28a2b74bfafa9f93d14d2f8d1fcaffa340db212acce6469d6714d342203ad77f"
+7 -1
View File
@@ -1,6 +1,6 @@
[tool.poetry]
name = "agentic_security"
version = "0.5.0"
version = "0.5.1"
description = "Agentic LLM vulnerability scanner"
authors = ["Alexander Miasoiedov <msoedov@gmail.com>"]
maintainers = ["Alexander Miasoiedov <msoedov@gmail.com>"]
@@ -49,6 +49,10 @@ tomli = "^2.2.1"
rich = "13.9.4"
gTTS = "^2.5.4"
sentry_sdk = "^2.22.0"
orjson = "^3.10"
pyfiglet = "^1.0.2"
termcolor = "^2.4.0"
# garak = { version = "*", optional = true }
@@ -82,5 +86,7 @@ build-backend = "poetry.core.masonry.api"
[tool.pytest.ini_options]
addopts = "--durations=5 -m 'not slow'"
asyncio_mode = "auto"
asyncio_default_fixture_loop_scope = "function"
markers = "slow: marks tests as slow"
View File
+8
View File
@@ -0,0 +1,8 @@
import os
import pytest
def pytest_runtest_setup(item):
if "slow" in item.keywords and not os.getenv("RUN_SLOW_TESTS"):
pytest.skip("Skipping slow test")
+161
View File
@@ -0,0 +1,161 @@
import pytest
import asyncio
from fastapi import FastAPI
from asyncio import Queue, Event
from agentic_security.core.app import create_app, get_tools_inbox, get_stop_event, get_current_run, set_current_run
class TestApp:
"""Test suite for agentic_security.core.app module."""
def test_create_app(self):
"""Test that create_app returns a FastAPI instance."""
app = create_app()
assert isinstance(app, FastAPI)
@pytest.mark.asyncio
async def test_get_tools_inbox(self):
"""Test that get_tools_inbox returns the global Queue instance."""
queue1 = get_tools_inbox()
await queue1.put("test item")
queue2 = get_tools_inbox()
result = queue2.get_nowait()
assert result == "test item"
def test_get_stop_event(self):
"""Test that get_stop_event returns the global Event instance and is not set initially."""
event = get_stop_event()
assert isinstance(event, Event)
assert not event.is_set()
def test_current_run_initial(self):
"""Test that get_current_run returns the global current_run with default values initially."""
run = get_current_run()
# Default values should be empty strings
assert run["spec"] == ""
assert run["id"] == ""
def test_set_current_run(self):
"""Test that set_current_run correctly updates current_run."""
spec = "test run"
result = set_current_run(spec)
expected_id = hash(id(spec))
# Verify that spec is set correctly
assert result["spec"] == spec
assert result["id"] == expected_id
def test_current_run_after_set(self):
"""Test that get_current_run returns the updated current_run after set_current_run is called."""
spec = "another test run"
set_current_run(spec)
current = get_current_run()
assert current["spec"] == spec
assert current["id"] == hash(id(spec))
def test_tools_inbox_same_instance(self):
"""Test that get_tools_inbox returns the same Queue instance by default."""
queue1 = get_tools_inbox()
queue2 = get_tools_inbox()
assert queue1 is queue2
def test_stop_event_set(self):
"""Test that setting the stop event is reflected in subsequent calls."""
event = get_stop_event()
event.set() # set the global event
# Now, subsequent calls should return the same event which is set.
event2 = get_stop_event()
assert event2.is_set()
def test_set_current_run_with_none(self):
"""Test that set_current_run handles None as a valid input and updates current_run accordingly."""
result = set_current_run(None)
expected_id = hash(id(None))
assert result["spec"] is None
assert result["id"] == expected_id
def test_multiple_current_run_assignments(self):
"""Test multiple assignments to current_run to ensure it always updates correctly."""
first_spec = "first run"
result1 = set_current_run(first_spec)
expected_id1 = hash(id(first_spec))
assert result1["spec"] == first_spec
assert result1["id"] == expected_id1
second_spec = "second run"
result2 = set_current_run(second_spec)
expected_id2 = hash(id(second_spec))
assert result2["spec"] == second_spec
assert result2["id"] == expected_id2
current = get_current_run()
# The current_run should reflect the latest assignment.
assert current["spec"] == second_spec
assert current["id"] == expected_id2
@pytest.mark.asyncio
async def test_empty_tools_inbox_exception(self):
"""Test that calling get_nowait on an empty tools_inbox raises QueueEmpty."""
from asyncio import QueueEmpty
queue = get_tools_inbox()
# Clear any existing items in the queue
while True:
try:
queue.get_nowait()
except QueueEmpty:
break
with pytest.raises(QueueEmpty):
queue.get_nowait()
def test_set_current_run_with_dict(self):
"""Test that set_current_run correctly handles a dictionary input as spec."""
spec = {"key": "value"}
result = set_current_run(spec)
expected_id = hash(id(spec))
assert result["spec"] == spec
assert result["id"] == expected_id
@pytest.mark.asyncio
async def test_stop_event_wait(self):
"""Test that waiting on the stop event returns once the event is set."""
event = get_stop_event()
event.clear() # ensure event is not set
async def waiter():
await event.wait()
return True
waiter_task = asyncio.create_task(waiter())
# Wait a moment to ensure the waiter is pending
await asyncio.sleep(0.1)
assert not waiter_task.done()
event.set()
result = await waiter_task
assert result is True
def test_set_current_run_with_int(self):
"""Test that set_current_run handles an integer input as spec."""
spec = 12345
result = set_current_run(spec)
expected_id = hash(id(spec))
assert result["spec"] == spec
assert result["id"] == expected_id
def test_create_app_routes(self):
"""Test that create_app returns a FastAPI instance with default routes available."""
app = create_app()
paths = [route.path for route in app.routes]
# Check that the default OpenAPI route exists
assert "/openapi.json" in paths
@pytest.mark.asyncio
async def test_tools_inbox_async_put_get_order(self):
"""Test that tools_inbox preserves order when items are added and retrieved asynchronously."""
queue = get_tools_inbox()
# Clear any existing items in the queue
from asyncio import QueueEmpty
while True:
try:
queue.get_nowait()
except QueueEmpty:
break
items = ["first", "second", "third"]
for item in items:
await queue.put(item)
result_items = []
for _ in items:
result_items.append(await queue.get())
assert result_items == items
+341
View File
@@ -0,0 +1,341 @@
import pytest
import base64
import httpx
import asyncio
from agentic_security.http_spec import (
LLMSpec,
parse_http_spec,
escape_special_chars_for_json,
encode_image_base64_by_url,
encode_audio_base64_by_url,
InvalidHTTPSpecError,
Modality
)
################################################################################
# Tests for agentic_security/http_spec.py
################################################################################
def test_escape_special_chars_for_json():
"""Test escaping special characters in a prompt for JSON safety."""
prompt = 'Line1\nLine2\t"Quote"\\Backslash'
escaped = escape_special_chars_for_json(prompt)
assert '\\n' in escaped
assert '\\t' in escaped
assert '\\"' in escaped
assert '\\\\' in escaped
def test_parse_http_spec_text():
"""Test parsing a text HTTP spec without image/audio/files requirements."""
spec = "POST http://example.com/api\nContent-Type: application/json\n\nThis is a prompt: <<PROMPT>>"
llm_spec = parse_http_spec(spec)
assert llm_spec.method == "POST"
assert llm_spec.url == "http://example.com/api"
assert llm_spec.headers["Content-Type"] == "application/json"
assert "<<PROMPT>>" in llm_spec.body
assert not llm_spec.has_files
assert not llm_spec.has_image
assert not llm_spec.has_audio
def test_parse_http_spec_files():
"""Test parsing a HTTP spec with multipart/form-data header indicating files."""
spec = "PUT http://example.com/upload\nContent-Type: multipart/form-data\n\nFile upload test"
llm_spec = parse_http_spec(spec)
assert llm_spec.has_files
def test_parse_http_spec_image_audio():
"""Test parsing a HTTP spec that requires image and audio via placeholders."""
spec = "GET http://example.com/api\nContent-Type: application/json\n\nImage: <<BASE64_IMAGE>> and Audio: <<BASE64_AUDIO>>"
llm_spec = parse_http_spec(spec)
assert llm_spec.has_image
assert llm_spec.has_audio
def test_encode_image_base64_by_url(monkeypatch):
"""Test that image encoding returns the correct base64 string with prefix."""
dummy_content = b'test_image'
class DummyResponse:
def __init__(self, content):
self.content = content
def dummy_get(url):
return DummyResponse(dummy_content)
monkeypatch.setattr(httpx, "get", dummy_get)
result = encode_image_base64_by_url("http://dummyurl.com/image.jpg")
expected = "data:image/jpeg;base64," + base64.b64encode(dummy_content).decode("utf-8")
assert result == expected
def test_encode_audio_base64_by_url(monkeypatch):
"""Test that audio encoding returns the correct base64 string with prefix."""
dummy_content = b'test_audio'
class DummyResponse:
def __init__(self, content):
self.content = content
def dummy_get(url):
return DummyResponse(dummy_content)
monkeypatch.setattr(httpx, "get", dummy_get)
result = encode_audio_base64_by_url("http://dummyurl.com/audio.mp3")
expected = "data:audio/mpeg;base64," + base64.b64encode(dummy_content).decode("utf-8")
assert result == expected
@pytest.mark.asyncio
async def test_probe_text(monkeypatch):
"""Test the probe function for text modality by replacing <<PROMPT>>."""
spec = "POST http://example.com/api\nContent-Type: application/json\n\n{\"prompt\": \"<<PROMPT>>\"}"
llm_spec = parse_http_spec(spec)
async def dummy_request(self, method, url, headers, content, timeout):
return httpx.Response(200, text="ok")
monkeypatch.setattr(httpx.AsyncClient, "request", dummy_request)
response = await llm_spec.probe("Hello")
assert response.status_code == 200
assert "ok" in response.text
@pytest.mark.asyncio
async def test_probe_with_files(monkeypatch):
"""Test that probe correctly branches to _probe_with_files when files are provided."""
spec = "POST http://example.com/api\nContent-Type: multipart/form-data\n\nFile data"
llm_spec = parse_http_spec(spec)
files = {"file": ("dummy.txt", b"data")}
async def dummy_request(self, method, url, headers, files, timeout):
return httpx.Response(200, text="file upload ok")
monkeypatch.setattr(httpx.AsyncClient, "request", dummy_request)
response = await llm_spec.probe("Unused", files=files)
assert response.status_code == 200
assert "file upload ok" in response.text
@pytest.mark.asyncio
async def test_verify_image(monkeypatch):
"""Test verify method branch for image modality by monkeypatching image encoder."""
spec = "POST http://example.com/api\nContent-Type: application/json\n\n{\"image\": \"<<BASE64_IMAGE>>\"}"
llm_spec = parse_http_spec(spec)
# Replace the image encoder to return a dummy string
monkeypatch.setattr("agentic_security.http_spec.encode_image_base64_by_url", lambda url="": "dummy_image")
async def dummy_request(self, method, url, headers, content, timeout):
# Check that the dummy image is injected in the content
assert "dummy_image" in content
return httpx.Response(200, text="image ok")
monkeypatch.setattr(httpx.AsyncClient, "request", dummy_request)
response = await llm_spec.verify()
assert response.status_code == 200
assert "image ok" in response.text
@pytest.mark.asyncio
async def test_verify_audio(monkeypatch):
"""Test verify method branch for audio modality by monkeypatching audio encoder."""
spec = "POST http://example.com/api\nContent-Type: application/json\n\n{\"audio\": \"<<BASE64_AUDIO>>\"}"
llm_spec = parse_http_spec(spec)
monkeypatch.setattr("agentic_security.http_spec.encode_audio_base64_by_url", lambda url: "dummy_audio")
async def dummy_request(self, method, url, headers, content, timeout):
# Ensure that the dummy audio string is present in the request content
assert "dummy_audio" in content
return httpx.Response(200, text="audio ok")
monkeypatch.setattr(httpx.AsyncClient, "request", dummy_request)
response = await llm_spec.verify()
assert response.status_code == 200
assert "audio ok" in response.text
@pytest.mark.asyncio
async def test_verify_files(monkeypatch):
"""Test verify method branch for files modality where _probe_with_files is invoked."""
spec = "POST http://example.com/api\nContent-Type: multipart/form-data\n\nFile data"
llm_spec = parse_http_spec(spec)
async def dummy_request(self, method, url, headers, files, timeout):
return httpx.Response(200, text="files ok")
monkeypatch.setattr(httpx.AsyncClient, "request", dummy_request)
response = await llm_spec.verify()
assert response.status_code == 200
assert "files ok" in response.text
def test_llm_spec_modality_property():
"""Test that the modality property reflects the correct modality."""
spec_text = "POST http://example.com/api\nContent-Type: application/json\n\nPrompt: <<PROMPT>>"
llm_spec_text = parse_http_spec(spec_text)
assert llm_spec_text.modality == Modality.TEXT
spec_image = "POST http://example.com/api\nContent-Type: application/json\n\nImage: <<BASE64_IMAGE>>"
llm_spec_image = parse_http_spec(spec_image)
assert llm_spec_image.modality == Modality.IMAGE
spec_audio = "POST http://example.com/api\nContent-Type: application/json\n\nAudio: <<BASE64_AUDIO>>"
llm_spec_audio = parse_http_spec(spec_audio)
assert llm_spec_audio.modality == Modality.AUDIO
def test_from_string_invalid():
"""Test that LLMSpec.from_string raises an error for an invalid spec."""
invalid_spec = "INVALID_SPEC"
with pytest.raises(InvalidHTTPSpecError):
LLMSpec.from_string(invalid_spec)
@pytest.mark.asyncio
async def test_validate_missing_files():
"""Test that LLMSpec.validate raises a ValueError when files are required but missing."""
spec = "POST http://example.com/api\nContent-Type: multipart/form-data\n\nFile upload test"
llm_spec = parse_http_spec(spec)
with pytest.raises(ValueError, match="Files are required"):
llm_spec.validate("test prompt", "", "", {})
@pytest.mark.asyncio
async def test_validate_missing_image():
"""Test that LLMSpec.validate raises a ValueError when an image is required but missing."""
spec = "POST http://example.com/api\nContent-Type: application/json\n\nImage: <<BASE64_IMAGE>>"
llm_spec = parse_http_spec(spec)
with pytest.raises(ValueError, match="An image is required"):
llm_spec.validate("test prompt", "", "dummy_audio", {})
@pytest.mark.asyncio
async def test_validate_missing_audio():
"""Test that LLMSpec.validate raises a ValueError when audio is required but missing."""
spec = "POST http://example.com/api\nContent-Type: application/json\n\nAudio: <<BASE64_AUDIO>>"
llm_spec = parse_http_spec(spec)
with pytest.raises(ValueError, match="Audio is required"):
llm_spec.validate("test prompt", "dummy_image", "", {})
def test_fn_alias(monkeypatch):
"""Test that LLMSpec.fn is a functional alias for LLMSpec.probe."""
spec = "POST http://example.com/api\nContent-Type: application/json\n\n{\"prompt\": \"<<PROMPT>>\"}"
llm_spec = parse_http_spec(spec)
# Instead of overriding the instance method, verify the alias at the class level.
assert LLMSpec.fn is LLMSpec.probe
def test_escape_special_chars_no_special():
"""Test that the escape function returns the original string if no special characters are present."""
prompt = "Simple text without specials"
escaped = escape_special_chars_for_json(prompt)
assert escaped == "Simple text without specials"
@pytest.mark.asyncio
async def test_probe_text_with_special_chars(monkeypatch):
"""Test probe for text modality with special characters in prompt ensuring escaped content."""
spec = "POST http://example.com/api\nContent-Type: application/json\n\n{\"prompt\": \"<<PROMPT>>\"}"
llm_spec = parse_http_spec(spec)
captured = {}
async def dummy_request(self, method, url, headers, content, timeout):
captured['content'] = content
return httpx.Response(200, text="ok")
monkeypatch.setattr(httpx.AsyncClient, "request", dummy_request)
test_prompt = 'Hello\nWorld\t"Test"'
response = await llm_spec.probe(test_prompt)
expected_escaped = escape_special_chars_for_json(test_prompt)
assert expected_escaped in captured['content']
assert response.status_code == 200
@pytest.mark.asyncio
async def test_verify_both_image_audio(monkeypatch):
"""Test verify method when both image and audio placeholders are present.
Expect a ValueError because only the image branch is triggered by pattern matching and the missing audio causes validation to fail."""
spec = ("POST http://example.com/api\nContent-Type: application/json\n\n"
"{\"audio\": \"<<BASE64_AUDIO>>\", \"image\":\"<<BASE64_IMAGE>>\"}")
llm_spec = parse_http_spec(spec)
# Monkey patch the image encoder to return a dummy value
monkeypatch.setattr("agentic_security.http_spec.encode_image_base64_by_url", lambda url="": "dummy_image")
with pytest.raises(ValueError, match="Audio is required"):
await llm_spec.verify()
def test_parse_http_spec_invalid_header_format():
"""Test that parse_http_spec raises an error when a header line doesn't have the expected 'key: value' format."""
invalid_spec = "GET http://example.com/api\nInvalidHeaderWithoutColon\n\nBody with <<PROMPT>>"
with pytest.raises(ValueError):
parse_http_spec(invalid_spec)
def test_from_string_valid():
"""Test that LLMSpec.from_string returns a valid LLMSpec object when given a proper spec string."""
spec = "GET http://example.com/api\nContent-Type: application/json\n\n{ \"prompt\": \"<<PROMPT>>\" }"
llm_spec = LLMSpec.from_string(spec)
assert llm_spec.method == "GET"
assert llm_spec.url == "http://example.com/api"
@pytest.mark.asyncio
async def test_parse_http_spec_multiline_body():
"""Test parsing an HTTP spec with a multiline body to ensure body concatenation works."""
spec = (
"PATCH http://example.com/api\n"
"Content-Type: application/json\n"
"\n"
"Line one of body\n"
"Line two of body\n"
"Line three"
)
llm_spec = parse_http_spec(spec)
# As implemented, the parser concatenates lines without newline delimiters
expected_body = "Line one of bodyLine two of bodyLine three"
assert llm_spec.body == expected_body
@pytest.mark.asyncio
async def test_encode_image_default_argument(monkeypatch):
"""Test that encode_image_base64_by_url works with its default URL argument."""
dummy_content = b'default_image'
class DummyResponse:
def __init__(self, content):
self.content = content
def dummy_get(url):
# check that the default URL (which includes 'fluidicon.png') is used
assert "fluidicon.png" in url
return DummyResponse(dummy_content)
monkeypatch.setattr(httpx, "get", dummy_get)
result = encode_image_base64_by_url()
expected = "data:image/jpeg;base64," + base64.b64encode(dummy_content).decode("utf-8")
assert result == expected
@pytest.mark.asyncio
async def test_probe_without_prompt_placeholder(monkeypatch):
"""Test the probe function when the request body does not include the <<PROMPT>> placeholder."""
spec = "POST http://example.com/api\nContent-Type: application/json\n\n{\"message\": \"No placeholder here\"}"
llm_spec = parse_http_spec(spec)
captured = {}
async def dummy_request(self, method, url, headers, content, timeout):
captured['content'] = content
return httpx.Response(200, text="ok without placeholder")
monkeypatch.setattr(httpx.AsyncClient, "request", dummy_request)
response = await llm_spec.probe("Ignored prompt")
assert "No placeholder here" in captured['content']
assert response.status_code == 200
def test_validate_success():
"""Test that LLMSpec.validate does not raise an error when all required data is provided."""
# Test case for files: files are provided as required
spec_files = "POST http://example.com/api\nContent-Type: multipart/form-data\n\nFile upload"
llm_spec_files = parse_http_spec(spec_files)
llm_spec_files.validate("some prompt", "dummy_image", "dummy_audio", {"file": ("dummy.txt", b"data")})
# Test case for image: image is provided as required
spec_image = "POST http://example.com/api\nContent-Type: application/json\n\nImage: <<BASE64_IMAGE>>"
llm_spec_image = parse_http_spec(spec_image)
llm_spec_image.validate("some prompt", "dummy_image", "dummy_audio", {})
# Test case for audio: audio is provided as required
spec_audio = "POST http://example.com/api\nContent-Type: application/json\n\nAudio: <<BASE64_AUDIO>>"
llm_spec_audio = parse_http_spec(spec_audio)
llm_spec_audio.validate("some prompt", "dummy_image", "dummy_audio", {})
@pytest.mark.asyncio
async def test_probe_invalid_url(monkeypatch):
"""Test that probe raises an exception when the HTTP client fails due to an invalid URL."""
spec = "GET http://nonexistent_url/api\nContent-Type: application/json\n\n{\"prompt\": \"<<PROMPT>>\"}"
llm_spec = parse_http_spec(spec)
async def dummy_request(self, method, url, headers, content, timeout):
raise httpx.RequestError("Invalid URL")
monkeypatch.setattr(httpx.AsyncClient, "request", dummy_request)
with pytest.raises(httpx.RequestError):
await llm_spec.probe("Test")
+26
View File
@@ -0,0 +1,26 @@
import pytest
from datasets import load_dataset
from agentic_security.probe_data import REGISTRY
@pytest.mark.slow
@pytest.mark.parametrize("dataset", REGISTRY)
def test_registry_accessibility(dataset):
source = dataset.get("source", "")
if "hugging" not in source.lower():
return pytest.skip("skipped dataset")
if not dataset.get("is_active"):
return pytest.skip("skipped dataset")
dataset_name = dataset.get("dataset_name")
if not dataset_name:
pytest.fail(f"No dataset_name found in {dataset}")
# Load only metadata (no data download)
try:
ds = load_dataset(dataset_name, split=None)
# Check if metadata is accessible without loading full data
assert ds is not None, f"Failed to load metadata for {dataset_name}"
except Exception as e:
pytest.fail(f"Error loading metadata for {dataset_name}: {str(e)}")