feat(add $VAR expansion from config):

This commit is contained in:
Alexander Myasoedov
2025-02-20 23:26:49 +02:00
parent ba36dcd02f
commit 701c175469
6 changed files with 61 additions and 1 deletions
+4
View File
@@ -105,6 +105,10 @@ low = 0.15
medium = 0.3
high = 0.5
[secrets]
# Secrets for the security scan from environment variables
OPENAI_API_KEY = "$OPENAI_API_KEY"
DEEPSEEK_API_KEY = "$DEEPSEEK_API_KEY"
""".replace(
"$HOST", host
+19
View File
@@ -1,3 +1,4 @@
import os
from asyncio import Event, Queue
from fastapi import FastAPI
@@ -5,6 +6,7 @@ from fastapi import FastAPI
tools_inbox: Queue = Queue()
stop_event: Event = Event()
current_run: str = {"spec": "", "id": ""}
_secrets = {}
def create_app() -> FastAPI:
@@ -33,3 +35,20 @@ def set_current_run(spec):
current_run["id"] = hash(id(spec))
current_run["spec"] = spec
return current_run
def get_secrets():
return _secrets
def set_secrets(secrets):
_secrets.update(secrets)
expand_secrets(_secrets)
return _secrets
def expand_secrets(secrets):
for key in secrets:
val = secrets[key]
if val.startswith("$"):
secrets[key] = os.getenv(val.strip("$"))
+27
View File
@@ -0,0 +1,27 @@
import os
import pytest
from agentic_security.core.app import expand_secrets
@pytest.fixture(autouse=True)
def setup_env_vars():
# Set up environment variables for testing
os.environ["TEST_ENV_VAR"] = "test_value"
def test_expand_secrets_with_env_var():
secrets = {"secret_key": "$TEST_ENV_VAR"}
expand_secrets(secrets)
assert secrets["secret_key"] == "test_value"
def test_expand_secrets_without_env_var():
secrets = {"secret_key": "$NON_EXISTENT_VAR"}
expand_secrets(secrets)
assert secrets["secret_key"] is None
def test_expand_secrets_without_dollar_sign():
secrets = {"secret_key": "plain_value"}
expand_secrets(secrets)
assert secrets["secret_key"] == "plain_value"
+2
View File
@@ -1,4 +1,5 @@
from agentic_security.config import CfgMixin
from agentic_security.core.app import set_secrets
class InMemorySecrets:
@@ -7,6 +8,7 @@ class InMemorySecrets:
self.config = CfgMixin()
self.config.get_or_create_config()
self.secrets = self.config.config.get("secrets", {})
set_secrets(self.secrets)
def set_secret(self, key: str, value: str):
self.secrets[key] = value
+8
View File
@@ -138,6 +138,9 @@ def parse_http_spec(http_spec: str) -> LLMSpec:
Returns:
LLMSpec: An object representing the parsed HTTP specification, with attributes for the method, URL, headers, and body.
"""
from agentic_security.core.app import get_secrets
secrets = get_secrets()
# Split the spec by lines
lines = http_spec.strip().split("\n")
@@ -164,6 +167,11 @@ def parse_http_spec(http_spec: str) -> LLMSpec:
has_files = "multipart/form-data" in headers.get("Content-Type", "")
has_image = "<<BASE64_IMAGE>>" in body
has_audio = "<<BASE64_AUDIO>>" in body
for key, value in secrets.items():
key = key.strip("$")
body = body.replace(f"${key}", value)
return LLMSpec(
method=method,
url=url,
+1 -1
View File
@@ -1,8 +1,8 @@
import sentry_sdk
from loguru import logger
from sentry_sdk.integrations.logging import ignore_logger
from ..models.schemas import Settings
from loguru import logger
def setup(app):