diff --git a/agentic_security/config.py b/agentic_security/config.py index 5ab2dc0..a26f952 100644 --- a/agentic_security/config.py +++ b/agentic_security/config.py @@ -123,6 +123,23 @@ port = $PORT modules = ["encoding"] +[detectors] +# Refusal classifiers and leak detectors applied to each model response. +# Toggle a built-in by name, or register a custom plugin that implements +# is_refusal(response) -> bool. Built-ins: default, ml_classifier, pii, +# sandbox_escape. +default = true # phrase-based refusal classifier +ml_classifier = true # ML one-class SVM refusal classifier +pii = false # PII / credential leak detector +sandbox_escape = false # Docker/K8s sandbox-escape probe detector + +# Register a custom detector from an importable class: +# [detectors.infra_fingerprint] +# class = "my_package.detectors:InfraFingerprintDetector" +# enabled = true +# [detectors.infra_fingerprint.options] +# threshold = 3 + [thresholds] # Threshold settings low = 0.15 diff --git a/agentic_security/probe_actor/refusal.py b/agentic_security/probe_actor/refusal.py index d60c8ba..a40f562 100644 --- a/agentic_security/probe_actor/refusal.py +++ b/agentic_security/probe_actor/refusal.py @@ -1,7 +1,9 @@ from abc import ABC, abstractmethod +from agentic_security.config import settings_var from agentic_security.refusal_classifier.model import RefusalClassifier from agentic_security.refusal_classifier.pii_detector import PIIDetector +from agentic_security.refusal_classifier.registry import registry from agentic_security.refusal_classifier.sandbox_escape_detector import ( SandboxEscapeDetector, ) @@ -101,10 +103,37 @@ class RefusalClassifierManager: return any(plugin.is_refusal(response) for plugin in self.plugins.values()) -# Initialize the plugin manager and register the default refusal detectors. 
def build_refusal_manager(config=None) -> RefusalClassifierManager:
    """Create a refusal manager holding every detector enabled by configuration.

    Args:
        config: Parsed ``[detectors]`` table. If it is ``None`` the table is
            looked up with ``settings_var("detectors", None)``; when that also
            yields nothing, the registry falls back to its per-plugin defaults
            (historically the ``default`` and ``ml_classifier`` plugins).

    Returns:
        RefusalClassifierManager: A new manager with one plugin registered for
        each detector the registry built.
    """
    detectors_config = settings_var("detectors", None) if config is None else config
    refusal_manager = RefusalClassifierManager()
    built_detectors = registry.build_from_config(detectors_config)
    # Register in the order the registry produced them.
    for plugin_name in built_detectors:
        refusal_manager.register_plugin(plugin_name, built_detectors[plugin_name])
    return refusal_manager
def load_plugin_class(path: str) -> "Callable[..., Detector]":
    """Import a detector class (or any detector-building callable) by path.

    Two spellings are accepted:

    * ``"package.module:ClassName"`` -- the text before the colon is the
      module and the text after it is the attribute. The attribute part may be
      dotted (``"package.module:Outer.Inner"``) to reach a nested object.
    * ``"package.module.ClassName"`` -- the last dotted component is the
      attribute and everything before it is the module.

    Whitespace around the path and around either component is ignored, so a
    config value carrying stray spaces or a trailing newline still resolves.

    Args:
        path: Import path in one of the two forms above.

    Returns:
        The referenced class (or any callable that builds a detector).

    Raises:
        TypeError: If ``path`` is not a string, or if the resolved object is
            not callable.
        ValueError: If ``path`` does not contain both a module and an
            attribute component.
        ImportError: If the module cannot be imported or the attribute chain
            cannot be resolved on it.
    """
    if not isinstance(path, str):
        # Fail with a message that names the problem instead of the opaque
        # "argument of type ... is not iterable" raised by ``":" in path``.
        raise TypeError(
            f"Detector class path must be a string, got {type(path).__name__}."
        )

    cleaned = path.strip()
    if ":" in cleaned:
        module_name, _, attribute = cleaned.partition(":")
    else:
        module_name, _, attribute = cleaned.rpartition(".")
    module_name = module_name.strip()
    attribute = attribute.strip()

    if not module_name or not attribute:
        raise ValueError(
            f"Invalid detector class path {path!r}; "
            "expected 'package.module:ClassName'."
        )

    module = importlib.import_module(module_name)

    # Walk the attribute chain so nested objects ("Outer.Inner") resolve too.
    obj = module
    try:
        for part in attribute.split("."):
            obj = getattr(obj, part)
    except AttributeError as exc:
        raise ImportError(
            f"Detector class path {path!r} is invalid: "
            f"module {module_name!r} has no attribute {attribute!r}."
        ) from exc

    if not callable(obj):
        raise TypeError(f"Detector class path {path!r} does not resolve to a callable.")
    return obj
def build_from_config(
    self, config: "Mapping[str, object] | None" = None
) -> "OrderedDict[str, Detector]":
    """Build the enabled detectors described by a ``[detectors]`` config.

    Each config entry is either a bool (toggle an already registered plugin)
    or a table. A table may carry ``class`` (import path of a custom
    detector), ``options`` (keyword arguments for that class) and ``enabled``
    (bool, defaults to ``True``). A table with a ``class`` key registers the
    resulting factory on this registry under the entry's name, overriding any
    factory previously registered with that name.

    Args:
        config: The parsed ``[detectors]`` table. ``None`` or an empty
            mapping yields the registry's per-plugin defaults.

    Returns:
        Ordered mapping of plugin name to detector instance: names that have
        a default come first (in the order their defaults were recorded),
        followed by names introduced only by ``config``.

    Raises:
        KeyError: If a name is neither registered nor given a ``class``
            import path, or if an enabled name has no registered factory.
        TypeError: If a config value (including a table's ``enabled`` flag)
            has an unsupported type, or a built detector does not implement
            ``is_refusal``.
    """
    config = config or {}
    # Start from the defaults; config entries override or extend them.
    enabled: OrderedDict[str, bool] = OrderedDict(self._default_enabled)

    for name, spec in config.items():
        if isinstance(spec, bool):
            if not self.is_registered(name):
                raise KeyError(
                    f"Unknown detector {name!r}; register it or provide a "
                    "'class' import path."
                )
            enabled[name] = spec
        elif isinstance(spec, Mapping):
            # Validate the flag before touching the registry so a bad table
            # does not leave a half-applied registration behind. A plain
            # ``bool(...)`` would turn a string such as "false" into True.
            flag = spec.get("enabled", True)
            if not isinstance(flag, bool):
                raise TypeError(
                    f"Detector config for {name!r} has a non-boolean "
                    f"'enabled' value of type {type(flag).__name__}."
                )
            class_path = spec.get("class")
            if class_path is not None:
                options = dict(spec.get("options") or {})
                self.register(name, self._factory_from_path(class_path, options))
            elif not self.is_registered(name):
                raise KeyError(
                    f"Unknown detector {name!r}; provide a 'class' import path."
                )
            enabled[name] = flag
        else:
            raise TypeError(
                f"Detector config for {name!r} must be a bool or a table, "
                f"got {type(spec).__name__}."
            )

    detectors: OrderedDict[str, Detector] = OrderedDict()
    for name, is_on in enabled.items():
        if not is_on:
            continue
        try:
            factory = self._factories[name]
        except KeyError:
            # Reachable when a default was declared for a name that was never
            # registered; say so instead of raising a bare KeyError(name).
            raise KeyError(
                f"Detector {name!r} is enabled but has no registered factory."
            ) from None
        detector = factory()
        if not callable(getattr(detector, "is_refusal", None)):
            raise TypeError(
                f"Detector {name!r} does not implement is_refusal(response)."
            )
        detectors[name] = detector
        logger.debug(f"Detector plugin enabled: {name}")
    return detectors
class TestBuildRefusalManager:
    """Checks that ``build_refusal_manager`` honours ``[detectors]`` config."""

    def test_default_config_preserves_legacy_plugins(self):
        # An empty table must fall back to the historical plugin pair.
        manager = build_refusal_manager({})

        assert set(manager.plugins) == {"default", "ml_classifier"}

    def test_module_manager_matches_default(self):
        assert set(refusal_classifier_manager.plugins) == {"default", "ml_classifier"}

    def test_pii_can_be_enabled_via_config(self):
        manager = build_refusal_manager(
            {"default": True, "ml_classifier": False, "pii": True}
        )

        assert set(manager.plugins) == {"default", "pii"}
        assert manager.is_refusal("my ssn is 123-45-6789")

    def test_sandbox_escape_can_be_enabled_via_config(self):
        manager = build_refusal_manager(
            {"default": False, "ml_classifier": False, "sandbox_escape": True}
        )

        assert set(manager.plugins) == {"sandbox_escape"}
        assert manager.is_refusal("ls -la /var/run/docker.sock")
        assert not manager.is_refusal("how do I bake bread?")

    def test_custom_detector_via_class_path(self):
        # Imported here because only this test needs the shared registry.
        from agentic_security.refusal_classifier.registry import registry

        try:
            manager = build_refusal_manager(
                {
                    "default": False,
                    "ml_classifier": False,
                    "infra_fingerprint": {
                        "class": (
                            "agentic_security.refusal_classifier."
                            "sandbox_escape_detector:SandboxEscapeDetector"
                        ),
                    },
                }
            )

            assert set(manager.plugins) == {"infra_fingerprint"}
            assert manager.is_refusal("kubectl get pods")
        finally:
            # A ``class`` entry registers its factory on the process-wide
            # registry; remove it so later tests see only the built-ins.
            registry.unregister("infra_fingerprint")
set(reg.available()) == {"refuser", "allower"} + + def test_unregister(self): + reg = _fresh_registry() + reg.unregister("allower") + + assert not reg.is_registered("allower") + assert reg.build_from_config({}).keys() == {"refuser"} + + def test_register_rejects_non_callable(self): + reg = DetectorRegistry() + with pytest.raises(TypeError): + reg.register("bad", object()) + + def test_default_enabled_applied_without_config(self): + reg = _fresh_registry() + + detectors = reg.build_from_config(None) + + assert list(detectors) == ["refuser"] # allower defaults off + + def test_bool_toggles_enable_and_disable(self): + reg = _fresh_registry() + + detectors = reg.build_from_config({"refuser": False, "allower": True}) + + assert list(detectors) == ["allower"] + + def test_unknown_bool_name_raises(self): + reg = _fresh_registry() + with pytest.raises(KeyError): + reg.build_from_config({"ghost": True}) + + def test_invalid_spec_type_raises(self): + reg = _fresh_registry() + with pytest.raises(TypeError): + reg.build_from_config({"refuser": 1}) + + def test_custom_plugin_registered_from_class_path(self): + reg = _fresh_registry() + + detectors = reg.build_from_config( + { + "refuser": False, + "pii_leak": { + "class": ( + "agentic_security.refusal_classifier." + "pii_detector:PIIDetector" + ), + "options": {"detect_credit_cards": False}, + }, + } + ) + + assert list(detectors) == ["pii_leak"] + assert detectors["pii_leak"].is_refusal("email me at a@b.com") + # options propagated: credit-card detection disabled + assert not detectors["pii_leak"].detect_credit_cards + + def test_custom_plugin_can_be_disabled(self): + reg = _fresh_registry() + + detectors = reg.build_from_config( + { + "pii_leak": { + "class": ( + "agentic_security.refusal_classifier." 
class TestGlobalRegistry:
    """Smoke tests for the built-in leak detectors on the shared registry."""

    def test_builtin_leak_detectors_registered(self):
        # Both leak detectors are registered when the registry module loads.
        assert registry.is_registered("pii")
        assert registry.is_registered("sandbox_escape")

    def test_pii_builtin_builds_and_detects(self):
        built = registry.build_from_config({"pii": True})
        pii = built["pii"]
        assert pii.is_refusal("contact me at jane@example.com")

    def test_sandbox_escape_builtin_builds_and_detects(self):
        built = registry.build_from_config({"sandbox_escape": True})
        sandbox = built["sandbox_escape"]
        assert sandbox.is_refusal("ls -la /var/run/docker.sock")