From 0086895db1330815016ed4027d4476fedc02abaf Mon Sep 17 00:00:00 2001 From: Yash Dhawan Date: Fri, 15 May 2026 10:23:42 +0530 Subject: [PATCH 01/20] add prompt templates to MCP server for guided security workflows Closes #192 Three prompt templates via @mcp.prompt(): - security_scan_prompt: full scan with configurable probe budget - verify_llm_prompt: quick reachability check before committing to a scan - adversarial_probe_prompt: multi-step attack session with findings summary Placed before the tool definitions with a clear section comment. No existing tool behaviour changed. --- agentic_security/mcp/main.py | 57 ++++++++++++++++++++++++++++++++++++ 1 file changed, 57 insertions(+) diff --git a/agentic_security/mcp/main.py b/agentic_security/mcp/main.py index 7e19e2c..0859ea0 100644 --- a/agentic_security/mcp/main.py +++ b/agentic_security/mcp/main.py @@ -11,6 +11,63 @@ mcp = FastMCP( AGENTIC_SECURITY = "http://0.0.0.0:8718" +# --------------------------------------------------------------------------- +# Prompt templates +# --------------------------------------------------------------------------- + + +@mcp.prompt() +def security_scan_prompt(llm_spec: str, max_budget: int = 1000) -> str: + """Generate a prompt to kick off a full LLM security scan. + + Args: + llm_spec: The LLM specification string identifying the model endpoint. + max_budget: Maximum number of probes to run (defaults to 1000). + """ + return ( + f"Please run a security scan on the following LLM specification:\n\n" + f" Spec: {llm_spec}\n" + f" Max budget: {max_budget} probes\n\n" + f"Use the start_scan tool to initiate the scan, then monitor progress " + f"with get_data_config, and stop it with stop_scan when complete." + ) + + +@mcp.prompt() +def verify_llm_prompt(llm_spec: str) -> str: + """Generate a prompt to verify that an LLM spec is reachable and well-formed. + + Args: + llm_spec: The LLM specification string to verify. + """ + return ( + f"Verify the following LLM specification is valid and reachable:\n\n" + f" Spec: {llm_spec}\n\n" + f"Use the verify_llm tool and report back whether the spec is accepted " + f"by the Agentic Security server." + ) + + +@mcp.prompt() +def adversarial_probe_prompt(llm_spec: str) -> str: + """Generate a prompt for an adversarial probing session with multi-step attacks. + + Args: + llm_spec: The LLM specification string identifying the target model. + """ + return ( + f"Run an adversarial probing session against the LLM described by:\n\n" + f" Spec: {llm_spec}\n\n" + f"Enable multi-step attacks and optimization in the start_scan call. " + f"After the scan finishes, summarise the most critical vulnerabilities found." + ) + + +# --------------------------------------------------------------------------- +# Tools +# --------------------------------------------------------------------------- + + @mcp.tool() async def verify_llm(spec: str) -> dict: """ From 8e3120c90d729e709e3abf5f949722a34f68ed12 Mon Sep 17 00:00:00 2001 From: Yash Dhawan Date: Fri, 15 May 2026 10:25:06 +0530 Subject: [PATCH 02/20] document Claude MCP usage in README Closes #193 Expands the MCP server section with: - what tools are exposed and what each one does - step-by-step Claude Desktop setup - the three built-in prompt templates and when to use them - a short example conversation showing natural-language scan control - Claude Code CLI setup for terminal-based workflows --- Readme.md | 53 +++++++++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 53 insertions(+) diff --git a/Readme.md b/Readme.md index 97efb9c..121a6ff 100644 --- a/Readme.md +++ b/Readme.md @@ -403,6 +403,10 @@ The `Module` class is designed to manage prompt processing and interaction with ## MCP server +The Agentic Security MCP server exposes the scanner's REST API as callable tools and reusable prompt templates, so any MCP-compatible client (Claude Desktop, Claude Code, custom agents) can drive security scans through natural language. + +### Installation + ```shell pip install -U mcp @@ -410,6 +414,55 @@ pip install -U mcp mcp install agentic_security/mcp/main.py ``` +### Using with Claude Desktop + +1. Start the Agentic Security FastAPI server (default port `8718`): + + ```shell + poetry run agentic_security + ``` + +2. Install the MCP server into Claude Desktop: + + ```shell + mcp install agentic_security/mcp/main.py --name "Agentic Security" + ``` + +3. Open Claude Desktop — the following **tools** are now available: + + | Tool | Description | + |---|---| + | `start_scan` | Launch a security scan against an LLM spec | + | `stop_scan` | Halt an in-progress scan | + | `verify_llm` | Check that an LLM spec is reachable | + | `get_data_config` | Retrieve the current dataset configuration | + | `get_spec_templates` | List available LLM spec templates | + +4. Or kick off a scan using one of the built-in **prompt templates**: + + - **`security_scan_prompt`** — runs a full scan with a configurable probe budget + - **`verify_llm_prompt`** — confirms a spec is reachable before committing to a scan + - **`adversarial_probe_prompt`** — enables multi-step attacks and asks Claude to summarise the worst findings + +### Example conversation with Claude + +``` +You: Use the security_scan_prompt for spec "openai/gpt-4o" with a budget of 500 probes. + +Claude: I'll kick off the scan now. Starting with verify_llm to confirm the spec is + reachable, then launching start_scan with maxBudget=500... +``` + +### Using with Claude Code (CLI) + +```shell +# Add to your project's MCP config +claude mcp add agentic-security -- python agentic_security/mcp/main.py + +# Then interact inline +claude "Run a quick adversarial probe against my local LLM at http://localhost:8080/v1" +``` + ## Documentation For more detailed information on how to use Agentic Security, including advanced features and customization options, please refer to the official documentation. From 6e6fdbcf28689ff1be7e2a66bf8778f7b6e945ba Mon Sep 17 00:00:00 2001 From: RheagalFire Date: Tue, 19 May 2026 01:38:07 +0530 Subject: [PATCH 03/20] feat: add LiteLLM as provider for 100+ LLM backends --- agentic_security/llm_providers/__init__.py | 2 + agentic_security/llm_providers/factory.py | 2 + .../llm_providers/litellm_provider.py | 114 ++++++++++++++++++ 3 files changed, 118 insertions(+) create mode 100644 agentic_security/llm_providers/litellm_provider.py diff --git a/agentic_security/llm_providers/__init__.py b/agentic_security/llm_providers/__init__.py index dbe17d5..54663f7 100644 --- a/agentic_security/llm_providers/__init__.py +++ b/agentic_security/llm_providers/__init__.py @@ -7,6 +7,7 @@ from agentic_security.llm_providers.base import ( ) from agentic_security.llm_providers.openai_provider import OpenAIProvider from agentic_security.llm_providers.anthropic_provider import AnthropicProvider +from agentic_security.llm_providers.litellm_provider import LiteLLMProvider from agentic_security.llm_providers.factory import create_provider, get_provider_class __all__ = [ @@ -17,6 +18,7 @@ __all__ = [ "LLMRateLimitError", "OpenAIProvider", "AnthropicProvider", + "LiteLLMProvider", "create_provider", "get_provider_class", ] diff --git a/agentic_security/llm_providers/factory.py b/agentic_security/llm_providers/factory.py index 4736bef..577e5a1 100644 --- a/agentic_security/llm_providers/factory.py +++ b/agentic_security/llm_providers/factory.py @@ -14,9 +14,11 @@ def _ensure_registered() -> None: return from agentic_security.llm_providers.openai_provider import OpenAIProvider from agentic_security.llm_providers.anthropic_provider import AnthropicProvider + from agentic_security.llm_providers.litellm_provider import LiteLLMProvider _PROVIDERS["openai"] = OpenAIProvider _PROVIDERS["anthropic"] = AnthropicProvider + _PROVIDERS["litellm"] = LiteLLMProvider def register_provider(name: str, provider_class: type[BaseLLMProvider]) -> None: diff --git a/agentic_security/llm_providers/litellm_provider.py b/agentic_security/llm_providers/litellm_provider.py new file mode 100644 index 0000000..0cf9ef3 --- /dev/null +++ b/agentic_security/llm_providers/litellm_provider.py @@ -0,0 +1,114 @@ +"""LiteLLM provider — unified access to 100+ LLM backends.""" + +from typing import Any + +from agentic_security.llm_providers.base import ( + BaseLLMProvider, + LLMMessage, + LLMProviderError, + LLMRateLimitError, + LLMResponse, +) + + +class LiteLLMProvider(BaseLLMProvider): + """LLM provider using LiteLLM SDK for 100+ backends. + + Accepts any LiteLLM model string (e.g. ``openai/gpt-4o``, + ``anthropic/claude-sonnet-4-6``, ``groq/llama-3.3-70b-versatile``). + """ + + DEFAULT_MODEL = "openai/gpt-4o-mini" + + def __init__( + self, + model: str = DEFAULT_MODEL, + api_key: str | None = None, + api_base: str | None = None, + **kwargs: Any, + ) -> None: + super().__init__(model, **kwargs) + self._api_key = api_key + self._api_base = api_base + + def _call_kwargs(self) -> dict[str, Any]: + kwargs: dict[str, Any] = {"model": self.model, "drop_params": True} + if self._api_key: + kwargs["api_key"] = self._api_key + if self._api_base: + kwargs["api_base"] = self._api_base + return kwargs + + @classmethod + def get_supported_models(cls) -> list[str]: + return [ + "openai/gpt-4o", + "openai/gpt-4o-mini", + "anthropic/claude-sonnet-4-6", + "anthropic/claude-haiku-4-5", + "groq/llama-3.3-70b-versatile", + "together_ai/meta-llama/Llama-3.3-70B-Instruct-Turbo", + ] + + def _messages_to_dicts(self, messages: list[LLMMessage]) -> list[dict[str, str]]: + return [{"role": m.role, "content": m.content} for m in messages] + + def _parse_response(self, response: Any) -> LLMResponse: + choice = response.choices[0] + usage = None + if response.usage: + usage = { + "prompt_tokens": response.usage.prompt_tokens, + "completion_tokens": response.usage.completion_tokens, + "total_tokens": response.usage.total_tokens, + } + return LLMResponse( + content=choice.message.content or "", + model=getattr(response, "model", self.model), + finish_reason=choice.finish_reason, + usage=usage, + ) + + def _handle_error(self, e: Exception) -> None: + qualname = f"{type(e).__module__}.{type(e).__name__}" + if qualname == "litellm.exceptions.RateLimitError": + raise LLMRateLimitError(str(e)) from e + raise LLMProviderError(str(e)) from e + + async def generate(self, prompt: str, **kwargs: Any) -> LLMResponse: + messages = [LLMMessage(role="user", content=prompt)] + if system_prompt := kwargs.pop("system_prompt", None): + messages.insert(0, LLMMessage(role="system", content=system_prompt)) + return await self.chat(messages, **kwargs) + + async def chat(self, messages: list[LLMMessage], **kwargs: Any) -> LLMResponse: + import litellm + + try: + response = await litellm.acompletion( + messages=self._messages_to_dicts(messages), + **{**self._call_kwargs(), **kwargs}, + ) + return self._parse_response(response) + except Exception as e: + self._handle_error(e) + raise + + def sync_generate(self, prompt: str, **kwargs: Any) -> LLMResponse: + messages = [LLMMessage(role="user", content=prompt)] + if system_prompt := kwargs.pop("system_prompt", None): + messages.insert(0, LLMMessage(role="system", content=system_prompt)) + return self.sync_chat(messages, **kwargs) + + def sync_chat(self, messages: list[LLMMessage], **kwargs: Any) -> LLMResponse: + import litellm + + try: + response = litellm.completion( + messages=self._messages_to_dicts(messages), + **{**self._call_kwargs(), **kwargs}, + ) + return self._parse_response(response) + except Exception as e: + self._handle_error(e) + raise From a4833908ef7124b37f4c50115712a257e1dc3051 Mon Sep 17 00:00:00 2001 From: RheagalFire Date: Tue, 19 May 2026 01:50:40 +0530 Subject: [PATCH 04/20] test: add 29 unit tests and remove lazy imports --- .../llm_providers/litellm_provider.py | 6 +- .../llm_providers/test_litellm_provider.py | 233 ++++++++++++++++++ 2 files changed, 235 insertions(+), 4 deletions(-) create mode 100644 tests/unit/llm_providers/test_litellm_provider.py diff --git a/agentic_security/llm_providers/litellm_provider.py b/agentic_security/llm_providers/litellm_provider.py index 0cf9ef3..6dfab92 100644 --- a/agentic_security/llm_providers/litellm_provider.py +++ b/agentic_security/llm_providers/litellm_provider.py @@ -2,6 +2,8 @@ from typing import Any +import litellm + from agentic_security.llm_providers.base import ( BaseLLMProvider, LLMMessage, @@ -82,8 +84,6 @@ class LiteLLMProvider(BaseLLMProvider): return await self.chat(messages, **kwargs) async def chat(self, messages: list[LLMMessage], **kwargs: Any) -> LLMResponse: - import litellm - try: response = await litellm.acompletion( messages=self._messages_to_dicts(messages), @@ -101,8 +101,6 @@ class LiteLLMProvider(BaseLLMProvider): return self.sync_chat(messages, **kwargs) def sync_chat(self, messages: list[LLMMessage], **kwargs: Any) -> LLMResponse: - import litellm - try: response = litellm.completion( messages=self._messages_to_dicts(messages), diff --git a/tests/unit/llm_providers/test_litellm_provider.py b/tests/unit/llm_providers/test_litellm_provider.py new file mode 100644 index 0000000..bc3f1df --- /dev/null +++ b/tests/unit/llm_providers/test_litellm_provider.py @@ -0,0 +1,233 @@ +"""Tests for LiteLLM provider.""" + +import pytest +from inline_snapshot import snapshot +from unittest.mock import MagicMock, AsyncMock, patch + +from agentic_security.llm_providers.litellm_provider import LiteLLMProvider +from agentic_security.llm_providers.base import ( + LLMMessage, + LLMProviderError, + LLMRateLimitError, +) + + +def _mock_response(content="Hello", model="openai/gpt-4o-mini", finish_reason="stop", + prompt_tokens=10, completion_tokens=5, total_tokens=15): + resp = MagicMock() + resp.choices = [MagicMock()] + resp.choices[0].message.content = content + resp.choices[0].finish_reason = finish_reason + resp.model = model + resp.usage.prompt_tokens = prompt_tokens + resp.usage.completion_tokens = completion_tokens + resp.usage.total_tokens = total_tokens + return resp + + +class TestLiteLLMProviderInit: + def test_default_model(self): + provider = LiteLLMProvider() + assert provider.model == snapshot("openai/gpt-4o-mini") + + def test_custom_model(self): + provider = LiteLLMProvider(model="anthropic/claude-sonnet-4-6") + assert provider.model == snapshot("anthropic/claude-sonnet-4-6") + + def test_no_api_key_required(self): + provider = LiteLLMProvider() + assert provider._api_key is None + + def test_api_key_stored(self): + provider = LiteLLMProvider(api_key="sk-test") + assert provider._api_key == snapshot("sk-test") + + def test_api_base_stored(self): + provider = LiteLLMProvider(api_base="http://localhost:4000") + assert provider._api_base == snapshot("http://localhost:4000") + + +class TestLiteLLMProviderCallKwargs: + def test_drop_params_always_true(self): + provider = LiteLLMProvider() + kwargs = provider._call_kwargs() + assert kwargs["drop_params"] is True + + def test_api_key_forwarded_when_set(self): + provider = LiteLLMProvider(api_key="sk-test") + kwargs = provider._call_kwargs() + assert kwargs["api_key"] == snapshot("sk-test") + + def test_api_key_omitted_when_none(self): + provider = LiteLLMProvider() + kwargs = provider._call_kwargs() + assert "api_key" not in kwargs + + def test_api_base_forwarded_when_set(self): + provider = LiteLLMProvider(api_base="http://localhost:4000") + kwargs = provider._call_kwargs() + assert kwargs["api_base"] == snapshot("http://localhost:4000") + + def test_api_base_omitted_when_none(self): + provider = LiteLLMProvider() + kwargs = provider._call_kwargs() + assert "api_base" not in kwargs + + def test_model_in_kwargs(self): + provider = LiteLLMProvider(model="groq/llama-3.3-70b-versatile") + kwargs = provider._call_kwargs() + assert kwargs["model"] == snapshot("groq/llama-3.3-70b-versatile") + + +class TestLiteLLMProviderMethods: + def test_get_supported_models(self): + models = LiteLLMProvider.get_supported_models() + assert "openai/gpt-4o" in models + assert "anthropic/claude-sonnet-4-6" in models + + def test_messages_to_dicts(self): + provider = LiteLLMProvider() + messages = [ + LLMMessage(role="system", content="Be helpful"), + LLMMessage(role="user", content="Hello"), + ] + result = provider._messages_to_dicts(messages) + assert result == snapshot( + [ + {"role": "system", "content": "Be helpful"}, + {"role": "user", "content": "Hello"}, + ] + ) + + def test_parse_response(self): + provider = LiteLLMProvider() + resp = _mock_response(content="Hi!", model="openai/gpt-4o") + result = provider._parse_response(resp) + assert result.content == snapshot("Hi!") + assert result.model == snapshot("openai/gpt-4o") + assert result.finish_reason == snapshot("stop") + assert result.usage == snapshot( + {"prompt_tokens": 10, "completion_tokens": 5, "total_tokens": 15} + ) + + def test_parse_response_null_content(self): + provider = LiteLLMProvider() + resp = _mock_response(content=None) + result = provider._parse_response(resp) + assert result.content == snapshot("") + + def test_parse_response_no_usage(self): + provider = LiteLLMProvider() + resp = _mock_response() + resp.usage = None + result = provider._parse_response(resp) + assert result.usage is None + + +class TestLiteLLMProviderSync: + @pytest.fixture + def provider(self): + return LiteLLMProvider(model="openai/gpt-4o-mini") + + def test_sync_generate(self, provider): + resp = _mock_response(content="Sync response") + with patch("agentic_security.llm_providers.litellm_provider.litellm.completion", return_value=resp) as mock_comp: + result = provider.sync_generate("Hello") + assert result.content == snapshot("Sync response") + call_kwargs = mock_comp.call_args.kwargs + assert call_kwargs["drop_params"] is True + + def test_sync_chat(self, provider): + resp = _mock_response(content="Chat response") + messages = [LLMMessage(role="user", content="Hi")] + with patch("agentic_security.llm_providers.litellm_provider.litellm.completion", return_value=resp): + result = provider.sync_chat(messages) + assert result.content == snapshot("Chat response") + + def test_sync_generate_with_system_prompt(self, provider): + resp = _mock_response(content="With system") + with patch("agentic_security.llm_providers.litellm_provider.litellm.completion", return_value=resp) as mock_comp: + result = provider.sync_generate("Hello", system_prompt="Be brief") + assert result.content == snapshot("With system") + messages = mock_comp.call_args.kwargs["messages"] + assert messages[0]["role"] == "system" + assert messages[0]["content"] == "Be brief" + + +class TestLiteLLMProviderAsync: + @pytest.fixture + def provider(self): + return LiteLLMProvider(model="anthropic/claude-sonnet-4-6") + + @pytest.mark.asyncio + async def test_generate(self, provider): + resp = _mock_response(content="Async response") + with patch("agentic_security.llm_providers.litellm_provider.litellm.acompletion", new_callable=AsyncMock, return_value=resp): + result = await provider.generate("Hello") + assert result.content == snapshot("Async response") + + @pytest.mark.asyncio + async def test_chat(self, provider): + resp = _mock_response(content="Async chat") + messages = [LLMMessage(role="user", content="Hi")] + with patch("agentic_security.llm_providers.litellm_provider.litellm.acompletion", new_callable=AsyncMock, return_value=resp) as mock_acomp: + result = await provider.chat(messages) + assert result.content == snapshot("Async chat") + call_kwargs = mock_acomp.call_args.kwargs + assert call_kwargs["model"] == "anthropic/claude-sonnet-4-6" + assert call_kwargs["drop_params"] is True + + +class TestLiteLLMProviderErrors: + @pytest.fixture + def provider(self): + return LiteLLMProvider() + + def test_rate_limit_maps_to_llm_rate_limit_error(self, provider): + fake_exc = type("RateLimitError", (Exception,), {})() + fake_exc.__class__.__module__ = "litellm.exceptions" + fake_exc.__class__.__qualname__ = "RateLimitError" + with pytest.raises(LLMRateLimitError): + provider._handle_error(fake_exc) + + def test_generic_error_maps_to_llm_provider_error(self, provider): + with pytest.raises(LLMProviderError): + provider._handle_error(Exception("something went wrong")) + + def test_sync_chat_auth_error_raises_provider_error(self, provider): + with patch("agentic_security.llm_providers.litellm_provider.litellm.completion", side_effect=Exception("AuthenticationError: Invalid API key")): + with pytest.raises(LLMProviderError, match="Invalid API key"): + provider.sync_generate("test") + + @pytest.mark.asyncio + async def test_async_chat_timeout_raises_provider_error(self, provider): + with patch("agentic_security.llm_providers.litellm_provider.litellm.acompletion", new_callable=AsyncMock, + side_effect=Exception("Timeout: Request timed out")): + with pytest.raises(LLMProviderError, match="timed out"): + await provider.generate("test") + + @pytest.mark.asyncio + async def test_async_chat_model_not_found_raises_provider_error(self, provider): + provider = LiteLLMProvider(model="bad/nonexistent-model") + with patch("agentic_security.llm_providers.litellm_provider.litellm.acompletion", new_callable=AsyncMock, + side_effect=Exception("NotFoundError: Model not found")): + with pytest.raises(LLMProviderError, match="not found"): + await provider.generate("test") + + +class TestLiteLLMProviderFactory: + def test_factory_creates_litellm_provider(self): + from agentic_security.llm_providers.factory import create_provider + provider = create_provider("litellm") + assert isinstance(provider, LiteLLMProvider) + assert provider.model == snapshot("openai/gpt-4o-mini") + + def test_factory_creates_with_custom_model(self): + from agentic_security.llm_providers.factory import create_provider + provider = create_provider("litellm", model="groq/llama-3.3-70b-versatile") + assert provider.model == snapshot("groq/llama-3.3-70b-versatile") + + def test_factory_lists_litellm(self): + from agentic_security.llm_providers.factory import list_providers + providers = list_providers() + assert "litellm" in providers From 72f0f63a891148999d5d3e0d0643c4bd3b96e8ce Mon Sep 17 00:00:00 2001 From: JackSpiece Date: Tue, 19 May 2026 19:16:11 +0800 Subject: [PATCH 05/20] docs: add MCP client usage examples --- Readme.md | 19 ++++++ agentic_security/mcp/client.py | 40 +++++-------- agentic_security/mcp/main.py | 4 +- docs/mcp_client_usage.md | 65 +++++++++++++++++++++ examples/mcp_client_usage.py | 103 +++++++++++++++++++++++++++++++++ mkdocs.yml | 1 + tests/unit/test_mcp.py | 18 ++++-- 7 files changed, 217 insertions(+), 33 deletions(-) create mode 100644 docs/mcp_client_usage.md create mode 100644 examples/mcp_client_usage.py diff --git a/Readme.md b/Readme.md index 97efb9c..a731c9a 100644 --- a/Readme.md +++ b/Readme.md @@ -83,6 +83,25 @@ agentic_security --port=PORT --host=HOST booking-screen +## MCP client example + +Agentic Security includes an MCP stdio server in `agentic_security.mcp.main`. +To list the available MCP tools from a local checkout: + +```shell +python examples/mcp_client_usage.py +``` + +To call HTTP-backed tools, run the Agentic Security app first, then point the +MCP server at it: + +```shell +agentic_security --host 127.0.0.1 --port 8718 +python examples/mcp_client_usage.py --agentic-security-url http://127.0.0.1:8718 --call get_spec_templates +``` + +See `docs/mcp_client_usage.md` for the full walkthrough. + ## LLM kwargs Agentic Security uses plain text HTTP spec like: diff --git a/agentic_security/mcp/client.py b/agentic_security/mcp/client.py index e8a29cc..b3d9a05 100644 --- a/agentic_security/mcp/client.py +++ b/agentic_security/mcp/client.py @@ -1,30 +1,32 @@ import asyncio +import sys from mcp import ClientSession, StdioServerParameters from mcp.client.stdio import stdio_client from agentic_security.logutils import logger -# Create server parameters for stdio connection -server_params = StdioServerParameters( - command="python", # Executable - args=["agentic_security/mcp/main.py"], # Your server script - env=None, # Optional environment variables -) + +def build_server_params() -> StdioServerParameters: + """Create server parameters for a stdio MCP client session.""" + return StdioServerParameters( + command=sys.executable, + args=["-m", "agentic_security.mcp.main"], + env=None, + ) async def run() -> None: try: + server_params = build_server_params() logger.info( "Starting stdio client session with server parameters: %s", server_params ) async with stdio_client(server_params) as (read, write): async with ClientSession(read, write) as session: - # Initialize the connection --> connection does not work logger.info("Initializing client session...") await session.initialize() - # List available prompts, resources, and tools --> no avalialbe tools logger.info("Listing available prompts...") prompts = await session.list_prompts() logger.info(f"Available prompts: {prompts}") @@ -36,26 +38,10 @@ async def run() -> None: logger.info("Listing available tools...") tools = await session.list_tools() logger.info(f"Available tools: {tools}") - - # Call the echo tool --> echo tool issue - logger.info("Calling echo_tool with message...") - echo_result = await session.call_tool( - "echo_tool", arguments={"message": "Hello from client!"} + logger.info( + "Available MCP tool names: %s", + ", ".join(tool.name for tool in tools.tools), ) - logger.info(f"Tool result: {echo_result}") - - # # Read the echo resource - # echo_content, mime_type = await session.read_resource( - # "echo://Hello_resource" - # ) - # logger.info(f"Resource content: {echo_content}") - # logger.info(f"Resource MIME type: {mime_type}") - - # # Get and use the echo prompt - # prompt_result = await session.get_prompt( - # "echo_prompt", arguments={"message": "Hello prompt!"} - # ) - # logger.info(f"Prompt result: {prompt_result}") logger.info("Client operations completed successfully.") return prompts, resources, tools diff --git a/agentic_security/mcp/main.py b/agentic_security/mcp/main.py index 7e19e2c..858da5f 100644 --- a/agentic_security/mcp/main.py +++ b/agentic_security/mcp/main.py @@ -1,3 +1,5 @@ +import os + import httpx from mcp.server.fastmcp import FastMCP @@ -8,7 +10,7 @@ mcp = FastMCP( ) # FastAPI Server Configuration -AGENTIC_SECURITY = "http://0.0.0.0:8718" +AGENTIC_SECURITY = os.getenv("AGENTIC_SECURITY_URL", "http://0.0.0.0:8718") @mcp.tool() diff --git a/docs/mcp_client_usage.md b/docs/mcp_client_usage.md new file mode 100644 index 0000000..55109d7 --- /dev/null +++ b/docs/mcp_client_usage.md @@ -0,0 +1,65 @@ +# MCP client usage + +Agentic Security exposes an MCP stdio server in `agentic_security.mcp.main`. +The example client in `examples/mcp_client_usage.py` shows how to connect to +that server, list available tools, and optionally call simple no-argument tools. + +## List MCP tools + +From the repository root: + +```shell +python examples/mcp_client_usage.py +``` + +This starts the MCP server as a subprocess with: + +```shell +python -m agentic_security.mcp.main +``` + +The client initializes an MCP session and prints the available Agentic Security +tools, including `verify_llm`, `start_scan`, `stop_scan`, `get_data_config`, and +`get_spec_templates`. + +## Call an HTTP-backed tool + +Some MCP tools call the Agentic Security HTTP app. Start the app in another +terminal first: + +```shell +agentic_security --host 127.0.0.1 --port 8718 +``` + +Then point the MCP server at that app and call a no-argument tool: + +```shell +python examples/mcp_client_usage.py \ + --agentic-security-url http://127.0.0.1:8718 \ + --call get_spec_templates +``` + +You can also set `AGENTIC_SECURITY_URL` directly: + +```shell +AGENTIC_SECURITY_URL=http://127.0.0.1:8718 python examples/mcp_client_usage.py --call get_data_config +``` + +## Use the package helper + +For tests or quick local checks, `agentic_security.mcp.client.run()` creates the +same stdio session and returns the prompt, resource, and tool list results: + +```python +import asyncio + +from agentic_security.mcp.client import run + + +async def main() -> None: + _prompts, _resources, tools = await run() + print([tool.name for tool in tools.tools]) + + +asyncio.run(main()) +``` diff --git a/examples/mcp_client_usage.py b/examples/mcp_client_usage.py new file mode 100644 index 0000000..e13b445 --- /dev/null +++ b/examples/mcp_client_usage.py @@ -0,0 +1,103 @@ +#!/usr/bin/env python3 +"""Example MCP client for the Agentic Security stdio server. + +The default command lists the tools exposed by ``agentic_security.mcp.main``. +If the Agentic Security HTTP app is running, pass ``--call`` to invoke one of +the no-argument HTTP-backed tools through MCP. +""" + +from __future__ import annotations + +import argparse +import asyncio +import json +import os +import sys +from typing import Any + +from mcp import ClientSession, StdioServerParameters +from mcp.client.stdio import stdio_client + + +NO_ARGUMENT_TOOLS = {"get_data_config", "get_spec_templates", "stop_scan"} + + +def _build_server_params(agentic_security_url: str | None) -> StdioServerParameters: + env = os.environ.copy() + if agentic_security_url: + env["AGENTIC_SECURITY_URL"] = agentic_security_url + + return StdioServerParameters( + command=sys.executable, + args=["-m", "agentic_security.mcp.main"], + env=env, + ) + + +def _jsonable(value: Any) -> Any: + if hasattr(value, "model_dump"): + return value.model_dump(mode="json") + if isinstance(value, (list, tuple)): + return [_jsonable(item) for item in value] + if isinstance(value, dict): + return {key: _jsonable(item) for key, item in value.items()} + return value + + +async def run_client(agentic_security_url: str | None, call_tool: str | None) -> None: + server_params = _build_server_params(agentic_security_url) + + async with stdio_client(server_params) as (read, write): + async with ClientSession(read, write) as session: + await session.initialize() + tools = await session.list_tools() + tool_names = [tool.name for tool in tools.tools] + + print("Available Agentic Security MCP tools:") + for tool in tools.tools: + description_lines = (tool.description or "").strip().splitlines() + description = description_lines[0] if description_lines else "No description" + print(f"- {tool.name}: {description}") + + if not call_tool: + return + + if call_tool not in tool_names: + raise ValueError( + f"Unknown tool {call_tool!r}. Available tools: {', '.join(tool_names)}" + ) + if call_tool not in NO_ARGUMENT_TOOLS: + raise ValueError( + f"{call_tool!r} requires arguments. This example only calls " + f"no-argument tools: {', '.join(sorted(NO_ARGUMENT_TOOLS))}" + ) + + result = await session.call_tool(call_tool, arguments={}) + print() + print(f"{call_tool} result:") + print(json.dumps(_jsonable(result), indent=2)) + + +def parse_args() -> argparse.Namespace: + parser = argparse.ArgumentParser( + description="List Agentic Security MCP tools and optionally call one.", + ) + parser.add_argument( + "--agentic-security-url", + default=None, + help=( + "Agentic Security HTTP app URL. Defaults to AGENTIC_SECURITY_URL " + "or http://0.0.0.0:8718 in the server." + ), + ) + parser.add_argument( + "--call", + choices=sorted(NO_ARGUMENT_TOOLS), + help="Optional no-argument MCP tool to call after listing tools.", + ) + return parser.parse_args() + + +if __name__ == "__main__": + args = parse_args() + asyncio.run(run_client(args.agentic_security_url, args.call)) diff --git a/mkdocs.yml b/mkdocs.yml index 9e7ac41..2317754 100644 --- a/mkdocs.yml +++ b/mkdocs.yml @@ -26,6 +26,7 @@ nav: - Dataset Extension: datasets.md - External Modules: external_module.md - CI/CD Integration: ci_cd.md + - MCP Client Usage: mcp_client_usage.md - Bayesian Optimization: optimizer.md - Image Generation: image_generation.md - Stenography Functions: stenography.md diff --git a/tests/unit/test_mcp.py b/tests/unit/test_mcp.py index 23f8f53..2d0adbb 100644 --- a/tests/unit/test_mcp.py +++ b/tests/unit/test_mcp.py @@ -4,9 +4,17 @@ from agentic_security.mcp.client import run @pytest.mark.asyncio -async def test_mcp_echo_tool(): - """Test the echo tool functionality""" +async def test_mcp_client_lists_agentic_security_tools(): + """Test that the MCP client can discover the server tools.""" prompts, resources, tools = await run() - assert prompts - assert resources - assert tools + tool_names = {tool.name for tool in tools.tools} + + assert prompts is not None + assert resources is not None + assert { + "verify_llm", + "start_scan", + "stop_scan", + "get_data_config", + "get_spec_templates", + }.issubset(tool_names) From b2c4656e41965c5c3004edf11e1c89fecde3a303 Mon Sep 17 00:00:00 2001 From: JackSpiece Date: Tue, 19 May 2026 19:42:14 +0800 Subject: [PATCH 06/20] fix: migrate static UI to Tailwind v4 --- agentic_security/routes/static.py | 2 +- agentic_security/static/index.html | 38 +++--- agentic_security/static/partials/concent.html | 6 +- agentic_security/static/partials/head.html | 124 +++++++----------- agentic_security/static/tailwindcss.js | 87 +----------- 5 files changed, 74 insertions(+), 183 deletions(-) diff --git a/agentic_security/routes/static.py b/agentic_security/routes/static.py index 2abafd9..b5a838e 100644 --- a/agentic_security/routes/static.py +++ b/agentic_security/routes/static.py @@ -115,7 +115,7 @@ async def serve_icon(icon_name: str) -> FileResponse: async def proxy_tailwindcss() -> FileResponse: """Proxy the Tailwind CSS script.""" return proxy_external_resource( - "https://cdn.tailwindcss.com", + "https://cdn.jsdelivr.net/npm/@tailwindcss/browser@4", STATIC_DIR / "tailwindcss.js", "application/javascript", ) diff --git a/agentic_security/static/index.html b/agentic_security/static/index.html index fcdf046..0a332bc 100644 --- a/agentic_security/static/index.html +++ b/agentic_security/static/index.html @@ -68,11 +68,11 @@
{{ toast.message }} @@ -154,13 +154,13 @@