feat(add toml configuration):

This commit is contained in:
Alexander Myasoedov
2025-01-04 19:20:52 +02:00
parent 6f8b942365
commit 1138b66852
3 changed files with 166 additions and 4 deletions
+3 -1
View File
@@ -5,6 +5,7 @@ import fire
import uvicorn
from agentic_security.app import app
from agentic_security.lib import AgenticSecurity
class T:
@@ -19,6 +20,7 @@ class T:
def headless(self):
sys.path.append(os.path.dirname("."))
AgenticSecurity().entrypoint()
def entrypoint():
@@ -30,4 +32,4 @@ def ci_entrypoint():
if __name__ == "__main__":
entrypoint()
ci_entrypoint()
+104 -1
View File
@@ -3,7 +3,9 @@ import json
from datetime import datetime
import colorama
import tomli
import tqdm.asyncio
from loguru import logger
from tabulate import tabulate
from agentic_security.models.schemas import Scan
@@ -19,7 +21,63 @@ YELLOW = colorama.Fore.YELLOW
BLUE = colorama.Fore.BLUE
class AgenticSecurity:
class CfgMixin:
config = {}
default_path = "agesec.toml"
def has_local_config(self):
try:
with open(self.default_path):
return True
except FileNotFoundError:
return False
@classmethod
def load_config(cls, config_path: str):
"""
Load configuration from a TOML file and store it in the class variable.
Args:
config_path (str): Path to the TOML configuration file.
Raises:
FileNotFoundError: If the configuration file is not found.
toml.TomlDecodeError: If the configuration file has syntax errors.
"""
try:
with open(config_path, "rb") as config_file:
cls.config = tomli.load(config_file)
logger.info(f"Configuration loaded successfully from {config_path}.")
except FileNotFoundError:
logger.error(f"Configuration file {config_path} not found.")
raise
except Exception as e:
logger.error(f"Error parsing TOML configuration: {e}")
raise
@classmethod
def get_config_value(cls, key: str, default=None):
"""
Retrieve a configuration value by key from the loaded configuration.
Args:
key (str): Dot-separated key path to the configuration value (e.g., 'general.maxBudget').
default: Default value if the key is not found.
Returns:
The configuration value if found, otherwise the default value.
"""
keys = key.split(".")
value = cls.config
for k in keys:
if isinstance(value, dict) and k in value:
value = value[k]
else:
return default
return value
class AgenticSecurity(CfgMixin):
@classmethod
async def async_scan(
cls,
@@ -188,3 +246,48 @@ class AgenticSecurity:
probe_datasets=probe_datasets,
)
)
def entrypoint(self):
# Load configuration from the default path
if not self.has_local_config():
print("`agesec.toml` configuration file not found.")
exit(1)
self.load_config(self.default_path)
print(f"Configuration: {self.config}")
def generate_default_cfg(self):
with open(self.default_path, "w") as f:
f.write(
"""
[general]
# General configuration for the security scan
llmSpec = \"""
POST http://0.0.0.0:9094/v1/self-probe
Authorization: Bearer XXXXX
Content-Type: application/json
{
"prompt": "<<PROMPT>>"
}
\""" # LLM API specification
maxBudget = 1000000 # Maximum budget for the scan
max_th = 0.3 # Maximum failure threshold (percentage)
optimize = false # Enable optimization during scanning
enableMultiStepAttack = false # Enable multi-step attack simulations
[modules.AgenticBackend]
dataset_name = "AgenticBackend"
[modules.AgenticBackend.opts]
port = 9094
modules = ["encoding"]
[thresholds]
# Threshold settings
low = 0.15
medium = 0.3
high = 0.5
"""
)
+59 -2
View File
@@ -1,11 +1,12 @@
import importlib
import os
import signal
import subprocess
import tempfile
import time
import pytest
import agentic_security.test_spec_assets as test_spec_assets
import pytest
from agentic_security.lib import AgenticSecurity
@@ -121,3 +122,59 @@ class TestAS:
assert isinstance(result, dict)
print(result)
assert len(result) in [0, 1]
class TestEntrypointCI:
def test_generate_default_cfg_to_tmp_path(self):
"""
Test that the `generate_default_cfg` method generates a valid default config file in a temporary path.
"""
# Create a temporary directory
with tempfile.TemporaryDirectory() as tmpdir:
temp_path = os.path.join(tmpdir, "custom_agesec.toml")
# Override default_path to the temporary path
AgenticSecurity.default_path = temp_path
# Generate the default configuration
security = AgenticSecurity()
security.generate_default_cfg()
# Check that the config file was created at the temporary path
assert os.path.exists(temp_path), f"{temp_path} file should be generated."
# Validate the contents of the generated config file
with open(temp_path, "r") as f:
generated_content = f.read()
assert (
"maxBudget = 1000000" in generated_content
), "maxBudget should be 1000000"
def test_load_generated_tmp_config(self):
"""
Test that the configuration generated in a temporary path can be loaded successfully.
"""
# Create a temporary directory
with tempfile.TemporaryDirectory() as tmpdir:
temp_path = os.path.join(tmpdir, "custom_agesec.toml")
# Override default_path to the temporary path
AgenticSecurity.default_path = temp_path
# Generate the default configuration
security = AgenticSecurity()
security.generate_default_cfg()
# Load the generated configuration
AgenticSecurity.load_config(temp_path)
# Validate loaded configuration
config = AgenticSecurity.config
assert (
config["general"]["maxBudget"] == 1000000
), "maxBudget should be 1000000"
assert config["general"]["max_th"] == 0.3, "max_th should be 0.3"
assert (
config["modules"]["AgenticBackend"]["dataset_name"] == "AgenticBackend"
), "Dataset name should be 'AgenticBackend'"