mirror of
https://github.com/msoedov/agentic_security.git
synced 2026-06-24 06:09:55 +02:00
Merge branch 'main' of github.com:msoedov/agentic_security
This commit is contained in:
@@ -4,10 +4,14 @@ from asyncio import Event, Queue
|
||||
from fastapi import FastAPI
|
||||
from fastapi.responses import ORJSONResponse
|
||||
|
||||
from agentic_security.http_spec import LLMSpec
|
||||
|
||||
tools_inbox: Queue = Queue()
|
||||
stop_event: Event = Event()
|
||||
current_run: str = {"spec": "", "id": ""}
|
||||
_secrets = {}
|
||||
_secrets: dict[str, str] = {}
|
||||
|
||||
current_run: dict[str, int | LLMSpec] = {"spec": "", "id": ""}
|
||||
|
||||
|
||||
def create_app() -> FastAPI:
|
||||
@@ -26,29 +30,29 @@ def get_stop_event() -> Event:
|
||||
return stop_event
|
||||
|
||||
|
||||
def get_current_run() -> str:
|
||||
def get_current_run() -> dict[str, int | LLMSpec]:
|
||||
"""Get the current run id."""
|
||||
return current_run
|
||||
|
||||
|
||||
def set_current_run(spec):
|
||||
def set_current_run(spec: LLMSpec) -> dict[str, int | LLMSpec]:
|
||||
"""Set the current run id."""
|
||||
current_run["id"] = hash(id(spec))
|
||||
current_run["spec"] = spec
|
||||
return current_run
|
||||
|
||||
|
||||
def get_secrets():
|
||||
def get_secrets() -> dict[str, str]:
|
||||
return _secrets
|
||||
|
||||
|
||||
def set_secrets(secrets):
|
||||
def set_secrets(secrets: dict[str, str]) -> dict[str, str]:
|
||||
_secrets.update(secrets)
|
||||
expand_secrets(_secrets)
|
||||
return _secrets
|
||||
|
||||
|
||||
def expand_secrets(secrets):
|
||||
def expand_secrets(secrets: dict[str, str]) -> None:
|
||||
for key in secrets:
|
||||
val = secrets[key]
|
||||
if val.startswith("$"):
|
||||
|
||||
@@ -1,3 +1,4 @@
|
||||
|
||||
from pyfiglet import Figlet, FontNotFound
|
||||
from termcolor import colored
|
||||
|
||||
@@ -8,14 +9,14 @@ except ImportError:
|
||||
|
||||
|
||||
def generate_banner(
|
||||
title="Agentic Security",
|
||||
font="slant",
|
||||
version="v2.1.0",
|
||||
tagline="Proactive Threat Detection & Automated Security Protocols",
|
||||
author="Developed by: [Security Team]",
|
||||
website="Website: https://github.com/msoedov/agentic_security",
|
||||
warning="",
|
||||
):
|
||||
title: str = "Agentic Security",
|
||||
font: str = "slant",
|
||||
version: str = "v2.1.0",
|
||||
tagline: str = "Proactive Threat Detection & Automated Security Protocols",
|
||||
author: str = "Developed by: [Security Team]",
|
||||
website: str = "Website: https://github.com/msoedov/agentic_security",
|
||||
warning: str | None = "", # Using Optional for warning since it might be None
|
||||
) -> str:
|
||||
"""Generate a visually enhanced banner with dynamic width and borders."""
|
||||
# Define the text elements
|
||||
|
||||
|
||||
@@ -7,8 +7,10 @@ import pandas as pd
|
||||
from matplotlib.cm import ScalarMappable
|
||||
from matplotlib.colors import LinearSegmentedColormap, Normalize
|
||||
|
||||
from .primitives import Table
|
||||
|
||||
def plot_security_report(table):
|
||||
|
||||
def plot_security_report(table: Table) -> io.BytesIO:
|
||||
# Data preprocessing
|
||||
data = pd.DataFrame(table)
|
||||
|
||||
@@ -141,7 +143,7 @@ def plot_security_report(table):
|
||||
return buf
|
||||
|
||||
|
||||
def generate_identifiers(data):
|
||||
def generate_identifiers(data: pd.DataFrame) -> list[str]:
|
||||
data_length = len(data)
|
||||
alphabet = string.ascii_uppercase
|
||||
num_letters = len(alphabet)
|
||||
|
||||
@@ -1,4 +1,6 @@
|
||||
from collections.abc import Generator
|
||||
from datetime import datetime
|
||||
from typing import Any
|
||||
|
||||
from fastapi import (
|
||||
APIRouter,
|
||||
@@ -25,7 +27,7 @@ router = APIRouter()
|
||||
@router.post("/verify")
|
||||
async def verify(
|
||||
info: LLMInfo, secrets: InMemorySecrets = Depends(get_in_memory_secrets)
|
||||
):
|
||||
) -> dict[str, int | str | float]:
|
||||
spec = LLMSpec.from_string(info.spec)
|
||||
try:
|
||||
r = await spec.verify()
|
||||
@@ -43,7 +45,7 @@ async def verify(
|
||||
)
|
||||
|
||||
|
||||
def streaming_response_generator(scan_parameters: Scan):
|
||||
def streaming_response_generator(scan_parameters: Scan) -> Generator[str, Any, None]:
|
||||
request_factory = LLMSpec.from_string(scan_parameters.llmSpec)
|
||||
set_current_run(request_factory)
|
||||
|
||||
@@ -64,7 +66,7 @@ async def scan(
|
||||
scan_parameters: Scan,
|
||||
background_tasks: BackgroundTasks,
|
||||
secrets: InMemorySecrets = Depends(get_in_memory_secrets),
|
||||
):
|
||||
) -> StreamingResponse:
|
||||
scan_parameters.with_secrets(secrets)
|
||||
return StreamingResponse(
|
||||
streaming_response_generator(scan_parameters), media_type="application/json"
|
||||
@@ -72,7 +74,7 @@ async def scan(
|
||||
|
||||
|
||||
@router.post("/stop")
|
||||
async def stop_scan():
|
||||
async def stop_scan() -> dict[str, str]:
|
||||
get_stop_event().set()
|
||||
return {"status": "Scan stopped"}
|
||||
|
||||
@@ -86,7 +88,7 @@ async def scan_csv(
|
||||
maxBudget: int = Query(10_000),
|
||||
enableMultiStepAttack: bool = Query(False),
|
||||
secrets: InMemorySecrets = Depends(get_in_memory_secrets),
|
||||
):
|
||||
) -> StreamingResponse:
|
||||
# TODO: content dataset to fuzzer
|
||||
content = await file.read() # noqa
|
||||
llm_spec = await llmSpec.read()
|
||||
|
||||
Generated
+36
-2
@@ -1,4 +1,4 @@
|
||||
# This file is automatically @generated by Poetry 1.7.1 and should not be changed by hand.
|
||||
# This file is automatically @generated by Poetry 1.8.2 and should not be changed by hand.
|
||||
|
||||
[[package]]
|
||||
name = "aiohappyeyeballs"
|
||||
@@ -786,6 +786,20 @@ files = [
|
||||
{file = "distlib-0.3.8.tar.gz", hash = "sha256:1530ea13e350031b6312d8580ddb6b27a104275a31106523b8f123787f494f64"},
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "execnet"
|
||||
version = "2.1.1"
|
||||
description = "execnet: rapid multi-Python deployment"
|
||||
optional = false
|
||||
python-versions = ">=3.8"
|
||||
files = [
|
||||
{file = "execnet-2.1.1-py3-none-any.whl", hash = "sha256:26dee51f1b80cebd6d0ca8e74dd8745419761d3bef34163928cbebbdc4749fdc"},
|
||||
{file = "execnet-2.1.1.tar.gz", hash = "sha256:5189b52c6121c24feae288166ab41b32549c7e2348652736540b9e6e7d4e72e3"},
|
||||
]
|
||||
|
||||
[package.extras]
|
||||
testing = ["hatch", "pre-commit", "pytest", "tox"]
|
||||
|
||||
[[package]]
|
||||
name = "executing"
|
||||
version = "2.2.0"
|
||||
@@ -3244,6 +3258,26 @@ pytest = ">=6.2.5"
|
||||
[package.extras]
|
||||
dev = ["pre-commit", "pytest-asyncio", "tox"]
|
||||
|
||||
[[package]]
|
||||
name = "pytest-xdist"
|
||||
version = "3.6.1"
|
||||
description = "pytest xdist plugin for distributed testing, most importantly across multiple CPUs"
|
||||
optional = false
|
||||
python-versions = ">=3.8"
|
||||
files = [
|
||||
{file = "pytest_xdist-3.6.1-py3-none-any.whl", hash = "sha256:9ed4adfb68a016610848639bb7e02c9352d5d9f03d04809919e2dafc3be4cca7"},
|
||||
{file = "pytest_xdist-3.6.1.tar.gz", hash = "sha256:ead156a4db231eec769737f57668ef58a2084a34b2e55c4a8fa20d861107300d"},
|
||||
]
|
||||
|
||||
[package.dependencies]
|
||||
execnet = ">=2.1"
|
||||
pytest = ">=7.0.0"
|
||||
|
||||
[package.extras]
|
||||
psutil = ["psutil (>=3.0)"]
|
||||
setproctitle = ["setproctitle"]
|
||||
testing = ["filelock"]
|
||||
|
||||
[[package]]
|
||||
name = "python-dateutil"
|
||||
version = "2.9.0.post0"
|
||||
@@ -4447,4 +4481,4 @@ propcache = ">=0.2.0"
|
||||
[metadata]
|
||||
lock-version = "2.0"
|
||||
python-versions = "^3.11"
|
||||
content-hash = "28a2b74bfafa9f93d14d2f8d1fcaffa340db212acce6469d6714d342203ad77f"
|
||||
content-hash = "35e03dba41d30cf6129a4a4f3107eca560f779205b21d3ffb2871eeffc5d5a64"
|
||||
|
||||
+2
-1
@@ -54,6 +54,7 @@ pyfiglet = "^1.0.2"
|
||||
termcolor = "^2.4.0"
|
||||
|
||||
# garak = { version = "*", optional = true }
|
||||
pytest-xdist = "3.6.1"
|
||||
|
||||
|
||||
[tool.poetry.group.dev.dependencies]
|
||||
@@ -86,7 +87,7 @@ build-backend = "poetry.core.masonry.api"
|
||||
|
||||
|
||||
[tool.pytest.ini_options]
|
||||
addopts = "--durations=5 -m 'not slow'"
|
||||
addopts = "--durations=5 -m 'not slow' -n auto"
|
||||
asyncio_mode = "auto"
|
||||
asyncio_default_fixture_loop_scope = "function"
|
||||
markers = "slow: marks tests as slow"
|
||||
|
||||
Reference in New Issue
Block a user