Files
invariant-gateway/gateway/mcp/mcp_context.py
Luca Beurer-Kellner e18c6b5bdb Add an option to add extra metadata that is pushed and passed to Guardrails during an MCP session (#47)
* use select() before readline

* support for setting static metadata for MCP sessions

* nest extra mcp metadata in metadata object

* unify session metadata

* extra metadata tests

* use empty object as parameters, if None

* list_tools as tool call

* offset indices in tests

* test: adjust addresses

* mcp: make error reporting configurable

* line logging

* log version

* verbose logging + loud exception failure

* add server and client name to policy get

* append trace even if not pushing

* port tools/list message support to SSE

* use python -m build

* adjust guardrail failure address

* support for blocking tools/list in SSE

* use error-based failure response format by default

* tools/list test

* don't list_tools in stdio connect

* flaky test: handle second possible result in anthropic streaming case

---------

Co-authored-by: knielsen404 <kristian@invariantlabs.ai>
2025-05-19 13:44:37 +02:00

112 lines
3.9 KiB
Python

"""Context manager for MCP (Model Context Protocol) gateway."""
import argparse
import os
import random
import uuid
from typing import Dict
from gateway.integrations.explorer import (
fetch_guardrails_from_explorer,
)
from gateway.common.guardrails import GuardrailRuleSet
class McpContext:
    """Singleton holding the configuration and state of one MCP gateway session.

    The first construction parses the CLI arguments and initialises the session
    state. Every later ``McpContext(...)`` call returns that same instance and
    leaves its already-initialised state untouched.
    """

    _instance = None

    # Prefix identifying CLI arguments that carry extra session metadata.
    _METADATA_PREFIX = "--metadata-"

    def __new__(cls, *args, **kwargs):
        """Create the shared instance on first use; return it on every call."""
        if cls._instance is None:
            cls._instance = super(McpContext, cls).__new__(cls)
            cls._instance._initialized = False
        return cls._instance

    def __init__(self, cli_args: list):
        """Initialise the session state from ``cli_args`` (first call only).

        Args:
            cli_args: Command line arguments. Options known to the parser built
                in ``_parse_cli_args`` configure the gateway; every remaining
                argument must have the form ``--metadata-<key>=<value>``.

        Raises:
            AssertionError: If a remaining argument has no ``=`` or does not
                start with ``--metadata-``.
        """
        if not hasattr(self, "_initialized"):
            self._initialized = False
        # __init__ runs on every McpContext(...) call; only the first one may
        # set up state, otherwise the shared session would be reset.
        if self._initialized:
            return

        config, extra_args = self._parse_cli_args(cli_args)

        # The project name is used to identify the dataset in Invariant Explorer.
        self.explorer_dataset = config.project_name
        # whether to push traces to Invariant Explorer
        self.push_explorer = config.push_explorer
        # the format to use to communicate guardrail failures to the client
        self.failure_response_format = config.failure_response_format
        # verbose logging of in/out
        self.verbose = config.verbose
        # trace of this MCP session
        self.trace = []
        # tools available to the MCP server
        self.tools = []
        # configured guardrails (empty until load_guardrails() is awaited)
        self.guardrails = GuardrailRuleSet(
            blocking_guardrails=[], logging_guardrails=[]
        )
        # parsed from CLI (all --metadata-* args)
        self.extra_metadata: Dict[str, str] = self._parse_extra_metadata(extra_args)
        # captured from MCP calls/responses
        self.mcp_client_name = ""
        self.mcp_server_name = ""
        # We send the same trace messages for guardrails analysis multiple times.
        # We need to deduplicate them before sending to the explorer.
        self.annotations = []
        self.trace_id = None
        self.local_session_id = str(uuid.uuid4())
        self.last_trace_length = 0
        self.id_to_method_mapping = {}
        self._initialized = True

    @staticmethod
    def _parse_extra_metadata(extra_args: list) -> Dict[str, str]:
        """Turn ``--metadata-<key>=<value>`` arguments into a ``{key: value}`` dict.

        Only the first ``=`` separates key from value, so values may contain
        ``=`` themselves (``--metadata-token=a=b`` yields ``{"token": "a=b"}``).

        Raises:
            AssertionError: If an argument has no ``=`` or does not start with
                ``--metadata-``.
        """
        # NOTE: the checks stay as ``assert`` so callers keep getting
        # AssertionError; be aware they are stripped when Python runs with -O.
        metadata: Dict[str, str] = {}
        for arg in extra_args:
            assert "=" in arg, f"Invalid extra metadata argument: {arg}"
            key, _, value = arg.partition("=")
            assert key.startswith(
                McpContext._METADATA_PREFIX
            ), f"Invalid extra metadata argument: {arg}, must start with --metadata-"
            metadata[key[len(McpContext._METADATA_PREFIX) :]] = value
        return metadata

    def _parse_cli_args(self, cli_args: list) -> "tuple[argparse.Namespace, list[str]]":
        """Parse command line arguments.

        Returns:
            A ``(config, extra_args)`` pair as produced by
            ``ArgumentParser.parse_known_args``: the namespace of recognised
            options and the list of arguments the parser did not recognise.
        """
        parser = argparse.ArgumentParser(description="MCP Gateway")
        parser.add_argument(
            "--project-name",
            help="Name of the Project from Invariant Explorer where we want to push the MCP traces. The guardrails are pulled from this project.",
            type=str,
            # A random suffix is used when no project name is given.
            default=f"mcp-capture-{random.randint(1, 100)}",
        )
        parser.add_argument(
            "--push-explorer",
            help="Enable pushing traces to Invariant Explorer",
            action="store_true",
        )
        parser.add_argument(
            "--verbose",
            help="Enable verbose logging",
            action="store_true",
        )
        parser.add_argument(
            "--failure-response-format",
            help="The response format to use to communicate guardrail failures to the client (error: JSON-RPC error response; potentially invisible to the agent, content: JSON-RPC content response, visible to the agent)",
            type=str,
            default="error",
        )
        return parser.parse_known_args(cli_args)

    async def load_guardrails(self):
        """Run async setup logic (e.g. fetching guardrails).

        Replaces ``self.guardrails`` with the result of
        ``fetch_guardrails_from_explorer`` for the configured dataset and the
        currently captured client/server names.
        """
        # NOTE(review): os.getenv returns None when INVARIANT_API_KEY is unset,
        # so the concatenation below raises TypeError in that case. Confirm
        # whether a missing key should abort here or be handled upstream.
        self.guardrails = await fetch_guardrails_from_explorer(
            self.explorer_dataset,
            "Bearer " + os.getenv("INVARIANT_API_KEY"),
            self.mcp_client_name,
            self.mcp_server_name,
        )