mirror of
https://github.com/msoedov/agentic_security.git
synced 2026-06-23 21:59:57 +02:00
Merge pull request #321 from DevamShah/config-pluggable-detectors
feat: config-pluggable refusal classifiers and leak detectors
This commit is contained in:
@@ -123,6 +123,23 @@ port = $PORT
|
||||
modules = ["encoding"]
|
||||
|
||||
|
||||
[detectors]
|
||||
# Refusal classifiers and leak detectors applied to each model response.
|
||||
# Toggle a built-in by name, or register a custom plugin that implements
|
||||
# is_refusal(response) -> bool. Built-ins: default, ml_classifier, pii,
|
||||
# sandbox_escape.
|
||||
default = true # phrase-based refusal classifier
|
||||
ml_classifier = true # ML one-class SVM refusal classifier
|
||||
pii = false # PII / credential leak detector
|
||||
sandbox_escape = false # Docker/K8s sandbox-escape probe detector
|
||||
|
||||
# Register a custom detector from an importable class:
|
||||
# [detectors.infra_fingerprint]
|
||||
# class = "my_package.detectors:InfraFingerprintDetector"
|
||||
# enabled = true
|
||||
# [detectors.infra_fingerprint.options]
|
||||
# threshold = 3
|
||||
|
||||
[thresholds]
|
||||
# Threshold settings
|
||||
low = 0.15
|
||||
|
||||
@@ -1,7 +1,9 @@
|
||||
from abc import ABC, abstractmethod
|
||||
|
||||
from agentic_security.config import settings_var
|
||||
from agentic_security.refusal_classifier.model import RefusalClassifier
|
||||
from agentic_security.refusal_classifier.pii_detector import PIIDetector
|
||||
from agentic_security.refusal_classifier.registry import registry
|
||||
from agentic_security.refusal_classifier.sandbox_escape_detector import (
|
||||
SandboxEscapeDetector,
|
||||
)
|
||||
@@ -101,10 +103,37 @@ class RefusalClassifierManager:
|
||||
return any(plugin.is_refusal(response) for plugin in self.plugins.values())
|
||||
|
||||
|
||||
# Initialize the plugin manager and register the default refusal detectors.
|
||||
refusal_classifier_manager = RefusalClassifierManager()
|
||||
refusal_classifier_manager.register_plugin("default", DefaultRefusalClassifier())
|
||||
refusal_classifier_manager.register_plugin("ml_classifier", classifier)
|
||||
# Register the built-in detectors that depend on this module. ``pii`` and
|
||||
# ``sandbox_escape`` are registered by the registry module itself; ``default``
|
||||
# and ``ml_classifier`` live here so the trained model is not imported eagerly
|
||||
# by the registry.
|
||||
registry.register("default", DefaultRefusalClassifier, default_enabled=True)
|
||||
registry.register("ml_classifier", lambda: classifier, default_enabled=True)
|
||||
|
||||
|
||||
def build_refusal_manager(config=None) -> RefusalClassifierManager:
    """Assemble a refusal manager holding every detector the config enables.

    Args:
        config: Parsed ``[detectors]`` table. If ``None``, the table is looked
            up with ``settings_var("detectors", None)`` and whatever that
            returns is handed to the registry unchanged.

    Returns:
        RefusalClassifierManager: A new manager with one plugin registered per
        detector produced by ``registry.build_from_config``.
    """
    detector_table = settings_var("detectors", None) if config is None else config
    result = RefusalClassifierManager()
    built = registry.build_from_config(detector_table)
    for plugin_name in built:
        result.register_plugin(plugin_name, built[plugin_name])
    return result
|
||||
|
||||
|
||||
# Initialize the plugin manager from configuration (defaults to the built-in
|
||||
# ``default`` and ``ml_classifier`` detectors when ``[detectors]`` is absent).
|
||||
refusal_classifier_manager = build_refusal_manager()
|
||||
pii_detector = PIIDetector()
|
||||
sandbox_escape_detector = SandboxEscapeDetector()
|
||||
|
||||
|
||||
@@ -0,0 +1,233 @@
|
||||
"""Config-driven registry for refusal classifiers and leak detectors.
|
||||
|
||||
The registry maps a plugin *name* to a zero-argument *factory* that builds a
|
||||
detector. A detector is any object exposing ``is_refusal(response) -> bool``
|
||||
(the :class:`~agentic_security.probe_actor.refusal.RefusalClassifierPlugin`
|
||||
contract). This lets users enable, disable, or add custom detectors through the
|
||||
``[detectors]`` section of ``agentic_security.toml`` instead of editing source.
|
||||
|
||||
Built-in names registered here: ``pii`` and ``sandbox_escape``. The phrase-based
|
||||
``default`` classifier and the ML ``ml_classifier`` are registered by
|
||||
:mod:`agentic_security.probe_actor.refusal` to avoid importing the trained model
|
||||
eagerly.
|
||||
|
||||
Example configuration::
|
||||
|
||||
[detectors]
|
||||
default = true # phrase-based refusal classifier
|
||||
ml_classifier = true # ML one-class SVM refusal classifier
|
||||
pii = true # enable the PII / credential leak detector
|
||||
sandbox_escape = false # keep the sandbox-escape detector off
|
||||
|
||||
[detectors.infra_fingerprint]
|
||||
class = "my_package.detectors:InfraFingerprintDetector"
|
||||
enabled = true
|
||||
|
||||
[detectors.infra_fingerprint.options]
|
||||
threshold = 3
|
||||
"""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import importlib
|
||||
from collections import OrderedDict
|
||||
from collections.abc import Callable, Mapping
|
||||
from typing import Protocol, runtime_checkable
|
||||
|
||||
from agentic_security.logutils import logger
|
||||
|
||||
__all__ = [
|
||||
"Detector",
|
||||
"DetectorFactory",
|
||||
"DetectorRegistry",
|
||||
"load_plugin_class",
|
||||
"registry",
|
||||
]
|
||||
|
||||
|
||||
@runtime_checkable
class Detector(Protocol):
    """Protocol satisfied by any object that offers an ``is_refusal`` method."""

    def is_refusal(self, response: str) -> bool:
        """Return the detector's boolean verdict for ``response``."""
        ...
|
||||
|
||||
|
||||
DetectorFactory = Callable[[], Detector]
|
||||
|
||||
|
||||
def load_plugin_class(path: str) -> Callable[..., Detector]:
    """Import a detector class (or factory callable) from an import path.

    Args:
        path: Import path in either ``"package.module:ClassName"`` or
            ``"package.module.ClassName"`` form. In the colon form the
            attribute part may itself be dotted (``"pkg.mod:Outer.Inner"``);
            it is then resolved one attribute at a time.

    Returns:
        The referenced class (or any callable that builds a detector).

    Raises:
        ValueError: If ``path`` is not a valid ``module``/``attribute`` pair.
        ImportError: If the module or attribute cannot be imported.
        TypeError: If ``path`` is not a string, or the resolved attribute is
            not callable.
    """
    # Reject non-strings up front; otherwise the ``in`` test below fails with
    # an error that says nothing about detector configuration.
    if not isinstance(path, str):
        raise TypeError(
            f"Detector class path must be a string, got {type(path).__name__}."
        )

    if ":" in path:
        module_name, _, attribute = path.partition(":")
    else:
        # Dotted form: everything before the last dot is the module.
        module_name, _, attribute = path.rpartition(".")

    if not module_name or not attribute:
        raise ValueError(
            f"Invalid detector class path {path!r}; "
            "expected 'package.module:ClassName'."
        )

    module = importlib.import_module(module_name)
    obj = module
    try:
        # Walk dotted attributes so nested classes work in the colon form.
        # In the dotted form ``attribute`` never contains a dot.
        for part in attribute.split("."):
            obj = getattr(obj, part)
    except AttributeError as exc:
        raise ImportError(
            f"Detector class path {path!r} is invalid: "
            f"module {module_name!r} has no attribute {attribute!r}."
        ) from exc

    if not callable(obj):
        raise TypeError(f"Detector class path {path!r} does not resolve to a callable.")
    return obj
|
||||
|
||||
|
||||
class DetectorRegistry:
    """Registry of named detector factories with config-driven assembly.

    Args:
        default_enabled: Mapping of plugin names to whether they are active
            when the ``[detectors]`` config does not mention them. Names
            absent from this mapping are only built when the config enables
            them.
    """

    def __init__(self, default_enabled: Mapping[str, bool] | None = None):
        self._factories: OrderedDict[str, DetectorFactory] = OrderedDict()
        # Copied so later changes to the caller's mapping do not leak in.
        self._default_enabled: dict[str, bool] = dict(default_enabled or {})

    def register(
        self,
        name: str,
        factory: DetectorFactory,
        *,
        default_enabled: bool | None = None,
    ) -> None:
        """Register (or override) a detector factory.

        Args:
            name: Unique plugin name used as the ``[detectors]`` config key.
            factory: Zero-argument callable returning a detector instance.
            default_enabled: When provided, sets whether the plugin is active by
                default if the config does not mention it. ``None`` leaves any
                existing default for ``name`` untouched.

        Raises:
            TypeError: If ``factory`` is not callable.
        """
        if not callable(factory):
            raise TypeError(f"Detector factory for {name!r} must be callable.")
        self._factories[name] = factory
        if default_enabled is not None:
            self._default_enabled[name] = default_enabled

    def unregister(self, name: str) -> None:
        """Remove a registered plugin and its default flag, if present."""
        self._factories.pop(name, None)
        self._default_enabled.pop(name, None)

    def is_registered(self, name: str) -> bool:
        """Return True if ``name`` is registered."""
        return name in self._factories

    def available(self) -> list[str]:
        """Return the names of all registered plugins, in registration order."""
        return list(self._factories)

    def build_from_config(
        self, config: Mapping[str, object] | None = None
    ) -> OrderedDict[str, Detector]:
        """Build the enabled detectors described by a ``[detectors]`` config.

        A table entry that carries a ``class`` import path is registered on
        this registry (overriding any factory already stored under that name)
        and stays registered after the call returns.

        Args:
            config: The parsed ``[detectors]`` table. ``None`` or an empty
                mapping yields the built-in defaults.

        Returns:
            Ordered mapping of plugin name to detector instance. Names that
            have a default-enabled entry come first, in the order those
            defaults were recorded; names introduced only by ``config``
            follow in config order.

        Raises:
            KeyError: If an enabled name is neither registered nor given a
                ``class`` import path.
            TypeError: If a config value has an unsupported type (including a
                non-bool ``enabled``, a non-string ``class`` or malformed
                ``options``) or a built detector does not implement
                ``is_refusal``.
        """
        config = config or {}
        enabled: OrderedDict[str, bool] = OrderedDict(self._default_enabled)

        for name, spec in config.items():
            if isinstance(spec, bool):
                if not self.is_registered(name):
                    raise KeyError(
                        f"Unknown detector {name!r}; register it or provide a "
                        "'class' import path."
                    )
                enabled[name] = spec
            elif isinstance(spec, Mapping):
                enabled[name] = self._apply_table(name, spec)
            else:
                raise TypeError(
                    f"Detector config for {name!r} must be a bool or a table, "
                    f"got {type(spec).__name__}."
                )

        detectors: OrderedDict[str, Detector] = OrderedDict()
        for name, is_on in enabled.items():
            if not is_on:
                continue
            try:
                factory = self._factories[name]
            except KeyError:
                # A default-enabled name was never given a factory; say so
                # instead of surfacing a bare KeyError('name').
                raise KeyError(
                    f"Detector {name!r} is enabled but has no registered factory."
                ) from None
            detector = factory()
            if not callable(getattr(detector, "is_refusal", None)):
                raise TypeError(
                    f"Detector {name!r} does not implement is_refusal(response)."
                )
            detectors[name] = detector
            logger.debug(f"Detector plugin enabled: {name}")
        return detectors

    def _apply_table(self, name: str, spec: Mapping[str, object]) -> bool:
        """Validate one ``[detectors.<name>]`` table and return its enabled flag.

        All values are validated before the registry is touched, so an invalid
        table never leaves a half-applied registration behind.
        """
        class_path = spec.get("class")
        is_on = spec.get("enabled", True)

        # bool("false") is True, so coercing would silently enable a detector
        # the user tried to switch off. Require a real boolean, as the
        # top-level toggles already do.
        if not isinstance(is_on, bool):
            raise TypeError(
                f"'enabled' for detector {name!r} must be a bool, "
                f"got {type(is_on).__name__}."
            )

        if class_path is None:
            if not self.is_registered(name):
                raise KeyError(
                    f"Unknown detector {name!r}; provide a 'class' import path."
                )
            return is_on

        if not isinstance(class_path, str):
            raise TypeError(
                f"'class' for detector {name!r} must be a string import path, "
                f"got {type(class_path).__name__}."
            )
        try:
            options = dict(spec.get("options") or {})
        except (TypeError, ValueError) as exc:
            raise TypeError(
                f"'options' for detector {name!r} must be a table of keyword "
                "arguments."
            ) from exc

        self.register(name, self._factory_from_path(class_path, options))
        return is_on

    @staticmethod
    def _factory_from_path(class_path: str, options: dict) -> DetectorFactory:
        """Import ``class_path`` now and return a factory applying ``options``."""
        cls = load_plugin_class(class_path)
        return lambda: cls(**options)
|
||||
|
||||
|
||||
def _build_pii_detector() -> Detector:
    """Create a ``PIIDetector``, importing its module only when called."""
    from agentic_security.refusal_classifier import pii_detector as _pii_module

    return _pii_module.PIIDetector()
|
||||
|
||||
|
||||
def _build_sandbox_escape_detector() -> Detector:
    """Create a ``SandboxEscapeDetector``, importing its module only when called."""
    from agentic_security.refusal_classifier import (
        sandbox_escape_detector as _sandbox_module,
    )

    return _sandbox_module.SandboxEscapeDetector()
|
||||
|
||||
|
||||
# Global registry. ``default`` and ``ml_classifier`` are registered by
|
||||
# agentic_security.probe_actor.refusal so the trained model is not imported here.
|
||||
# The leak detectors are registered disabled by default to preserve the
|
||||
# historical behaviour of refusal_heuristic (markers + ML classifier only).
|
||||
registry = DetectorRegistry()
# Both leak detectors are known to the registry but stay off unless the
# ``[detectors]`` config turns them on.
registry.register(
    name="pii",
    factory=_build_pii_detector,
    default_enabled=False,
)
registry.register(
    name="sandbox_escape",
    factory=_build_sandbox_escape_detector,
    default_enabled=False,
)
|
||||
@@ -0,0 +1,48 @@
|
||||
from agentic_security.probe_actor.refusal import (
|
||||
build_refusal_manager,
|
||||
refusal_classifier_manager,
|
||||
)
|
||||
|
||||
|
||||
class TestBuildRefusalManager:
    """Integration tests for ``build_refusal_manager`` on the shared registry."""

    def test_default_config_preserves_legacy_plugins(self):
        manager = build_refusal_manager({})

        assert set(manager.plugins) == {"default", "ml_classifier"}

    def test_module_manager_matches_default(self):
        assert set(refusal_classifier_manager.plugins) == {"default", "ml_classifier"}

    def test_pii_can_be_enabled_via_config(self):
        manager = build_refusal_manager(
            {"default": True, "ml_classifier": False, "pii": True}
        )

        assert set(manager.plugins) == {"default", "pii"}
        assert manager.is_refusal("my ssn is 123-45-6789")

    def test_sandbox_escape_can_be_enabled_via_config(self):
        manager = build_refusal_manager(
            {"default": False, "ml_classifier": False, "sandbox_escape": True}
        )

        assert set(manager.plugins) == {"sandbox_escape"}
        assert manager.is_refusal("ls -la /var/run/docker.sock")
        assert not manager.is_refusal("how do I bake bread?")

    def test_custom_detector_via_class_path(self):
        # Imported locally: only this test needs direct access to the registry.
        from agentic_security.refusal_classifier.registry import registry

        try:
            manager = build_refusal_manager(
                {
                    "default": False,
                    "ml_classifier": False,
                    "infra_fingerprint": {
                        "class": (
                            "agentic_security.refusal_classifier."
                            "sandbox_escape_detector:SandboxEscapeDetector"
                        ),
                    },
                }
            )

            assert set(manager.plugins) == {"infra_fingerprint"}
            assert manager.is_refusal("kubectl get pods")
        finally:
            # Building from a ``class`` entry registers the plugin on the
            # process-wide registry; remove it so later tests only see the
            # built-in names.
            registry.unregister("infra_fingerprint")
|
||||
@@ -0,0 +1,160 @@
|
||||
import pytest
|
||||
|
||||
from agentic_security.refusal_classifier.registry import (
|
||||
DetectorRegistry,
|
||||
load_plugin_class,
|
||||
registry,
|
||||
)
|
||||
|
||||
|
||||
class StubDetector:
    """Test double whose ``is_refusal`` always answers with a preset verdict."""

    def __init__(self, verdict: bool = True):
        # The value handed back by every ``is_refusal`` call.
        self.verdict = verdict

    def is_refusal(self, response: str) -> bool:
        """Ignore ``response`` and return the configured verdict."""
        return self.verdict
|
||||
|
||||
|
||||
class NotADetector:
    """Plain object that deliberately lacks an ``is_refusal`` method."""
|
||||
|
||||
|
||||
def _fresh_registry() -> DetectorRegistry:
    """Return an isolated registry with ``refuser`` on and ``allower`` off by default."""
    fresh = DetectorRegistry(default_enabled={"refuser": True, "allower": False})
    for stub_name, verdict in (("refuser", True), ("allower", False)):
        # Bind ``verdict`` as a default so each factory keeps its own value.
        fresh.register(stub_name, lambda verdict=verdict: StubDetector(verdict))
    return fresh
|
||||
|
||||
|
||||
class TestLoadPluginClass:
    """Import-path parsing and error reporting of ``load_plugin_class``."""

    def test_loads_with_colon_form(self):
        """``module:Class`` resolves to the named class."""
        loaded = load_plugin_class(
            "agentic_security.refusal_classifier.pii_detector:PIIDetector"
        )
        assert loaded.__name__ == "PIIDetector"

    def test_loads_with_dotted_form(self):
        """``module.Class`` resolves to the same class."""
        loaded = load_plugin_class(
            "agentic_security.refusal_classifier.pii_detector.PIIDetector"
        )
        assert loaded.__name__ == "PIIDetector"

    def test_invalid_path_raises_value_error(self):
        """A bare name has no module part and is rejected."""
        bare_name = "PIIDetector"
        with pytest.raises(ValueError):
            load_plugin_class(bare_name)

    def test_missing_attribute_raises_import_error(self):
        """An importable module without the attribute yields ImportError."""
        missing = "agentic_security.refusal_classifier.pii_detector:DoesNotExist"
        with pytest.raises(ImportError):
            load_plugin_class(missing)
|
||||
|
||||
|
||||
class TestDetectorRegistry:
    """Behaviour of an isolated ``DetectorRegistry`` instance."""

    def test_register_and_introspection(self):
        """Registered names are reported by the lookup helpers."""
        fresh = _fresh_registry()

        assert fresh.is_registered("refuser")
        assert not fresh.is_registered("missing")
        assert set(fresh.available()) == {"refuser", "allower"}

    def test_unregister(self):
        """Unregistering drops the plugin from lookups and from builds."""
        fresh = _fresh_registry()
        fresh.unregister("allower")

        assert not fresh.is_registered("allower")
        assert fresh.build_from_config({}).keys() == {"refuser"}

    def test_register_rejects_non_callable(self):
        """A factory must be callable."""
        empty = DetectorRegistry()
        with pytest.raises(TypeError):
            empty.register("bad", object())

    def test_default_enabled_applied_without_config(self):
        """With no config only default-enabled plugins are built."""
        built = _fresh_registry().build_from_config(None)

        assert list(built) == ["refuser"]  # allower defaults off

    def test_bool_toggles_enable_and_disable(self):
        """Boolean config values override the defaults in both directions."""
        toggles = {"refuser": False, "allower": True}

        built = _fresh_registry().build_from_config(toggles)

        assert list(built) == ["allower"]

    def test_unknown_bool_name_raises(self):
        """Toggling a name nobody registered is an error."""
        fresh = _fresh_registry()
        with pytest.raises(KeyError):
            fresh.build_from_config({"ghost": True})

    def test_invalid_spec_type_raises(self):
        """Only bools and tables are accepted as config values."""
        fresh = _fresh_registry()
        with pytest.raises(TypeError):
            fresh.build_from_config({"refuser": 1})

    def test_custom_plugin_registered_from_class_path(self):
        """A table with ``class`` builds the imported detector with its options."""
        custom_table = {
            "class": (
                "agentic_security.refusal_classifier."
                "pii_detector:PIIDetector"
            ),
            "options": {"detect_credit_cards": False},
        }

        built = _fresh_registry().build_from_config(
            {"refuser": False, "pii_leak": custom_table}
        )

        assert list(built) == ["pii_leak"]
        pii_leak = built["pii_leak"]
        assert pii_leak.is_refusal("email me at a@b.com")
        # options propagated: credit-card detection disabled
        assert not pii_leak.detect_credit_cards

    def test_custom_plugin_can_be_disabled(self):
        """``enabled = false`` keeps a class-path plugin out of the build."""
        custom_table = {
            "class": (
                "agentic_security.refusal_classifier."
                "pii_detector:PIIDetector"
            ),
            "enabled": False,
        }

        built = _fresh_registry().build_from_config({"pii_leak": custom_table})

        assert "pii_leak" not in built

    def test_table_without_class_for_unknown_name_raises(self):
        """A table for an unregistered name must supply ``class``."""
        fresh = _fresh_registry()
        with pytest.raises(KeyError):
            fresh.build_from_config({"ghost": {"enabled": True}})

    def test_detector_missing_is_refusal_raises(self):
        """Factories whose product lacks ``is_refusal`` are rejected at build."""
        broken = DetectorRegistry(default_enabled={"broken": True})
        broken.register("broken", NotADetector)
        with pytest.raises(TypeError):
            broken.build_from_config({})
|
||||
|
||||
|
||||
class TestGlobalRegistry:
    """Checks against the module-level ``registry`` singleton."""

    def test_builtin_leak_detectors_registered(self):
        """Both leak detectors are known to the shared registry."""
        assert registry.is_registered("pii")
        assert registry.is_registered("sandbox_escape")

    def test_pii_builtin_builds_and_detects(self):
        """Enabling ``pii`` yields a detector that flags an e-mail address."""
        built = registry.build_from_config({"pii": True})
        pii = built["pii"]
        assert pii.is_refusal("contact me at jane@example.com")

    def test_sandbox_escape_builtin_builds_and_detects(self):
        """Enabling ``sandbox_escape`` yields a detector that flags a docker.sock probe."""
        built = registry.build_from_config({"sandbox_escape": True})
        sandbox = built["sandbox_escape"]
        assert sandbox.is_refusal("ls -la /var/run/docker.sock")
|
||||
Reference in New Issue
Block a user