refactor: remove module system, migrate to MCP hub tools architecture

This commit is contained in:
AFredefon
2026-03-08 17:53:29 +01:00
parent 075b678e9d
commit 1d495cedce
141 changed files with 1182 additions and 8992 deletions

View File

@@ -15,7 +15,7 @@ Supported transport types:
"""
from fuzzforge_common.hub.client import HubClient, HubClientError
from fuzzforge_common.hub.client import HubClient, HubClientError, PersistentSession
from fuzzforge_common.hub.executor import HubExecutionResult, HubExecutor
from fuzzforge_common.hub.models import (
HubConfig,
@@ -39,4 +39,5 @@ __all__ = [
"HubServerType",
"HubTool",
"HubToolParameter",
"PersistentSession",
]

View File

@@ -6,6 +6,7 @@ via stdio (docker/command) or SSE transport. It handles:
- Connecting to SSE endpoints
- Discovering tools via list_tools()
- Executing tools via call_tool()
- Persistent container sessions for stateful interactions
"""
@@ -16,6 +17,8 @@ import json
import os
import subprocess
from contextlib import asynccontextmanager
from dataclasses import dataclass, field
from datetime import datetime, timezone
from typing import TYPE_CHECKING, Any, cast
from fuzzforge_common.hub.models import (
@@ -47,6 +50,48 @@ class HubClientError(Exception):
"""Error in hub client operations."""
@dataclass
class PersistentSession:
    """Handle on a long-lived Docker container and its open MCP connection.

    The container is kept running between tool calls so that stateful
    interactions (e.g., radare2 analysis, long-running fuzzing) can span
    several requests.

    """

    #: Name of the hub server that owns this session.
    server_name: str

    #: Name given to the Docker container.
    container_name: str

    #: The ``docker run`` subprocess backing the session.
    process: Process

    #: Reader side of the connection (process stdout).
    reader: asyncio.StreamReader

    #: Writer side of the connection (process stdin).
    writer: asyncio.StreamWriter

    #: Set to True once the MCP session has been initialized.
    initialized: bool = False

    #: Lock used to serialise concurrent requests on this session.
    lock: asyncio.Lock = field(default_factory=asyncio.Lock)

    #: Timezone-aware (UTC) timestamp taken when the session object is created.
    started_at: datetime = field(default_factory=lambda: datetime.now(tz=timezone.utc))

    #: Counter for JSON-RPC request IDs.
    request_id: int = 0

    @property
    def alive(self) -> bool:
        """Report whether the underlying process has not exited yet."""
        exit_code = self.process.returncode
        return exit_code is None
class HubClient:
"""Client for communicating with MCP hub servers.
@@ -65,6 +110,8 @@ class HubClient:
"""
self._timeout = timeout
self._persistent_sessions: dict[str, PersistentSession] = {}
self._request_id: int = 0
async def discover_tools(self, server: HubServer) -> list[HubTool]:
"""Discover tools from a hub server.
@@ -84,8 +131,9 @@ class HubClient:
try:
async with self._connect(config) as (reader, writer):
# Initialize MCP session
await self._initialize_session(reader, writer, config.name)
# Initialise MCP session (skip for persistent — already done)
if not self._persistent_sessions.get(config.name):
await self._initialize_session(reader, writer, config.name)
# List tools
tools_data = await self._call_method(
@@ -141,7 +189,7 @@ class HubClient:
"""
logger = get_logger()
config = server.config
exec_timeout = timeout or self._timeout
exec_timeout = timeout or config.timeout or self._timeout
logger.info(
"Executing hub tool",
@@ -152,8 +200,9 @@ class HubClient:
try:
async with self._connect(config) as (reader, writer):
# Initialize MCP session
await self._initialize_session(reader, writer, config.name)
# Initialise MCP session (skip for persistent — already done)
if not self._persistent_sessions.get(config.name):
await self._initialize_session(reader, writer, config.name)
# Call tool
result = await asyncio.wait_for(
@@ -202,10 +251,22 @@ class HubClient:
) -> AsyncGenerator[tuple[asyncio.StreamReader, asyncio.StreamWriter], None]:
"""Connect to an MCP server.
If a persistent session exists for this server, reuse it (with a lock
to serialise concurrent requests). Otherwise, fall through to the
ephemeral per-call connection logic.
:param config: Server configuration.
:yields: Tuple of (reader, writer) for communication.
"""
# Check for active persistent session
session = self._persistent_sessions.get(config.name)
if session and session.initialized and session.alive:
async with session.lock:
yield session.reader, session.writer # type: ignore[misc]
return
# Ephemeral connection (original behaviour)
if config.type == HubServerType.DOCKER:
async with self._connect_docker(config) as streams:
yield streams
@@ -251,11 +312,15 @@ class HubClient:
cmd.append(config.image)
# Use 4 MB buffer to handle large tool responses (YARA rulesets, trivy output, etc.)
_STREAM_LIMIT = 4 * 1024 * 1024
process: Process = await asyncio.create_subprocess_exec(
*cmd,
stdin=subprocess.PIPE,
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
limit=_STREAM_LIMIT,
)
try:
@@ -294,12 +359,16 @@ class HubClient:
# Set up environment
env = dict(config.environment) if config.environment else None
# Use 4 MB buffer to handle large tool responses
_STREAM_LIMIT = 4 * 1024 * 1024
process: Process = await asyncio.create_subprocess_exec(
*config.command,
stdin=subprocess.PIPE,
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
env=env,
limit=_STREAM_LIMIT,
)
try:
@@ -385,10 +454,11 @@ class HubClient:
:returns: Method result.
"""
# Create JSON-RPC request
# Create JSON-RPC request with unique ID
self._request_id += 1
request = {
"jsonrpc": "2.0",
"id": 1,
"id": self._request_id,
"method": method,
"params": params,
}
@@ -415,7 +485,16 @@ class HubClient:
msg = f"MCP error: {error.get('message', 'Unknown error')}"
raise HubClientError(msg)
return response.get("result", {})
result = response.get("result", {})
# Check for tool-level errors in content items
for item in result.get("content", []):
if item.get("isError", False):
error_text = item.get("text", "unknown error")
msg = f"Tool returned error: {error_text}"
raise HubClientError(msg)
return result
async def _send_notification(
self,
@@ -442,3 +521,218 @@ class HubClient:
notification_line = json.dumps(notification) + "\n"
writer.write(notification_line.encode())
await writer.drain()
# ------------------------------------------------------------------
# Persistent session management
# ------------------------------------------------------------------
async def start_persistent_session(
    self,
    config: HubServerConfig,
) -> PersistentSession:
    """Start a persistent Docker container and initialise its MCP session.

    The container is started without ``--rm`` under the fixed name
    ``fuzzforge-<server name>`` and stays running until
    :meth:`stop_persistent_session` is called, allowing multiple tool calls
    on the same session. If a live session is already registered for the
    server it is returned unchanged; a dead one is cleaned up and replaced.

    When start-up fails after the container was launched, the ``docker run``
    process is stopped and the named container is removed again, so a failed
    start leaves nothing behind.

    :param config: Server configuration (must be Docker type).
    :returns: The created (or already running) persistent session.
    :raises HubClientError: If the server is not a Docker server, has no
        image configured, or the container / MCP session cannot be started.

    """
    logger = get_logger()

    existing = self._persistent_sessions.get(config.name)
    if existing is not None:
        if existing.alive:
            logger.info("Persistent session already running", server=config.name)
            return existing
        # Dead session — clean up and restart
        await self._cleanup_session(config.name)

    if config.type != HubServerType.DOCKER:
        msg = f"Persistent mode only supports Docker servers (got {config.type.value})"
        raise HubClientError(msg)

    if not config.image:
        msg = f"Docker image not specified for server '{config.name}'"
        raise HubClientError(msg)

    container_name = f"fuzzforge-{config.name}"

    async def _remove_container() -> None:
        # Best-effort ``docker rm -f``: a failure is logged, never raised.
        # Output is discarded (DEVNULL) so wait() cannot block on a full pipe.
        try:
            rm_proc = await asyncio.create_subprocess_exec(
                "docker", "rm", "-f", container_name,
                stdout=subprocess.DEVNULL,
                stderr=subprocess.DEVNULL,
            )
            await rm_proc.wait()
        except Exception as exc:  # noqa: BLE001 - cleanup must not mask the caller's outcome
            logger.warning(
                "Failed to remove container",
                server=config.name,
                container=container_name,
                error=str(exc),
            )

    async def _abort(proc: Process) -> None:
        # Stop the ``docker run`` client, reap it, then drop the container.
        if proc.returncode is None:
            try:
                proc.terminate()
                try:
                    await asyncio.wait_for(proc.wait(), timeout=5)
                except asyncio.TimeoutError:
                    proc.kill()
                    await proc.wait()
            except ProcessLookupError:
                # The process exited on its own in the meantime.
                pass
        await _remove_container()

    # Remove stale container with same name if it exists
    await _remove_container()

    # Build docker run command (no --rm, with --name)
    cmd = ["docker", "run", "-i", "--name", container_name]
    for cap in config.capabilities:
        cmd.extend(["--cap-add", cap])
    for volume in config.volumes:
        cmd.extend(["-v", os.path.expanduser(volume)])
    for key, value in config.environment.items():
        cmd.extend(["-e", f"{key}={value}"])
    cmd.append(config.image)

    # 4 MB stream buffer limit, passed to the subprocess as ``limit``.
    _STREAM_LIMIT = 4 * 1024 * 1024

    logger.info(
        "Starting persistent container",
        server=config.name,
        container=container_name,
        image=config.image,
    )

    # NOTE(review): stderr is piped but nothing in this method reads it; a
    # chatty long-lived container could fill the pipe — confirm whether it
    # is drained elsewhere.
    try:
        process: Process = await asyncio.create_subprocess_exec(
            *cmd,
            stdin=subprocess.PIPE,
            stdout=subprocess.PIPE,
            stderr=subprocess.PIPE,
            limit=_STREAM_LIMIT,
        )
    except OSError as e:
        # e.g. the docker executable is missing or not executable.
        msg = f"Failed to start container for {config.name}: {e}"
        raise HubClientError(msg) from e

    if process.stdin is None or process.stdout is None:
        await _abort(process)
        msg = "Failed to get process streams"
        raise HubClientError(msg)

    session = PersistentSession(
        server_name=config.name,
        container_name=container_name,
        process=process,
        reader=process.stdout,
        writer=process.stdin,
    )

    # Initialise MCP session
    try:
        await self._initialize_session(
            session.reader,  # type: ignore[arg-type]
            session.writer,  # type: ignore[arg-type]
            config.name,
        )
        session.initialized = True
    except Exception as e:
        await _abort(process)
        msg = f"Failed to initialise MCP session for {config.name}: {e}"
        raise HubClientError(msg) from e

    # Register only after a successful handshake.
    self._persistent_sessions[config.name] = session

    logger.info(
        "Persistent session started",
        server=config.name,
        container=container_name,
    )
    return session
async def stop_persistent_session(self, server_name: str) -> bool:
    """Tear down the persistent session registered for a server.

    Delegates to :meth:`_cleanup_session`.

    :param server_name: Name of the server whose session to stop.
    :returns: True if a session was stopped, False if none found.

    """
    was_stopped = await self._cleanup_session(server_name)
    return was_stopped
def get_persistent_session(self, server_name: str) -> PersistentSession | None:
    """Look up the running persistent session for a server.

    A session whose process has exited is reported as ``None`` but is left
    registered; this synchronous accessor does no cleanup.

    :param server_name: Server name.
    :returns: The session if running, None otherwise.

    """
    candidate = self._persistent_sessions.get(server_name)
    if not candidate:
        return candidate
    # Dead entries are only hidden here, not removed.
    return candidate if candidate.alive else None
def list_persistent_sessions(self) -> list[dict[str, Any]]:
    """Describe every registered persistent session, dead ones included.

    :returns: List of session info dictionaries.

    """

    def _describe(name: str, session: PersistentSession) -> dict[str, Any]:
        # Uptime is whole seconds elapsed since ``started_at`` (UTC now).
        return {
            "server_name": name,
            "container_name": session.container_name,
            "alive": session.alive,
            "initialized": session.initialized,
            "started_at": session.started_at.isoformat(),
            "uptime_seconds": int(
                (datetime.now(tz=timezone.utc) - session.started_at).total_seconds()
            ),
        }

    return [
        _describe(name, session)
        for name, session in self._persistent_sessions.items()
    ]
async def stop_all_persistent_sessions(self) -> int:
    """Stop every registered persistent session, one after another.

    :returns: Number of sessions stopped.

    """
    # Snapshot the names first: cleanup removes entries from the registry.
    pending = tuple(self._persistent_sessions)
    outcomes = [await self._cleanup_session(server_name) for server_name in pending]
    return sum(1 for stopped in outcomes if stopped)
async def _cleanup_session(self, server_name: str) -> bool:
    """Clean up a persistent session (terminate process, remove container).

    The session is removed from the registry first. Its process is then
    terminated (killed if it has not exited after 10 seconds), and the
    container is force-removed with ``docker rm -f``. Container removal is
    best-effort: a failure is logged as a warning and never raised.

    :param server_name: Server name.
    :returns: True if cleaned up, False if not found.

    """
    logger = get_logger()
    session = self._persistent_sessions.pop(server_name, None)
    if session is None:
        return False

    logger.info("Stopping persistent session", server=server_name)

    # Terminate process
    if session.alive:
        try:
            session.process.terminate()
            try:
                await asyncio.wait_for(session.process.wait(), timeout=10)
            except asyncio.TimeoutError:
                session.process.kill()
                await session.process.wait()
        except ProcessLookupError:
            # The process exited between the liveness check and the signal;
            # carry on so the container is still removed below.
            pass

    # Remove Docker container. Output is discarded (DEVNULL) so that wait()
    # cannot block on an unread pipe.
    try:
        rm_proc = await asyncio.create_subprocess_exec(
            "docker", "rm", "-f", session.container_name,
            stdout=subprocess.DEVNULL,
            stderr=subprocess.DEVNULL,
        )
        await rm_proc.wait()
    except Exception as exc:  # noqa: BLE001 - best-effort cleanup, but not silent
        logger.warning(
            "Failed to remove persistent container",
            server=server_name,
            container=session.container_name,
            error=str(exc),
        )

    logger.info(
        "Persistent session stopped",
        server=server_name,
        container=session.container_name,
    )
    return True

View File

@@ -12,7 +12,7 @@ from __future__ import annotations
from pathlib import Path
from typing import TYPE_CHECKING, Any, cast
from fuzzforge_common.hub.client import HubClient, HubClientError
from fuzzforge_common.hub.client import HubClient, HubClientError, PersistentSession
from fuzzforge_common.hub.models import HubServer, HubServerConfig, HubTool
from fuzzforge_common.hub.registry import HubRegistry
@@ -106,6 +106,7 @@ class HubExecutor:
"""
self._registry = HubRegistry(config_path)
self._client = HubClient(timeout=timeout)
self._continuous_sessions: dict[str, dict[str, Any]] = {}
@property
def registry(self) -> HubRegistry:
@@ -291,6 +292,7 @@ class HubExecutor:
"""
servers = []
for server in self._registry.servers:
session = self._client.get_persistent_session(server.name)
servers.append({
"name": server.name,
"identifier": server.identifier,
@@ -298,6 +300,8 @@ class HubExecutor:
"enabled": server.config.enabled,
"category": server.config.category,
"description": server.config.description,
"persistent": server.config.persistent,
"persistent_session_active": session is not None and session.alive,
"discovered": server.discovered,
"tool_count": len(server.tools),
"error": server.discovery_error,
@@ -332,3 +336,287 @@ class HubExecutor:
if tool:
return tool.input_schema
return None
# ------------------------------------------------------------------
# Persistent session management
# ------------------------------------------------------------------
async def start_persistent_server(self, server_name: str) -> dict[str, Any]:
    """Bring up a persistent container session for a hub server.

    After the session is started, tool discovery is attempted on it. A
    discovery failure is logged as a warning and does not abort the call.
    The result lists whatever tools the registry holds for the server
    afterwards.

    :param server_name: Name of the hub server to start.
    :returns: Session status dictionary.
    :raises ValueError: If server not found.

    """
    logger = get_logger()

    server = self._registry.get_server(server_name)
    if not server:
        msg = f"Server '{server_name}' not found"
        raise ValueError(msg)

    session = await self._client.start_persistent_session(server.config)

    # Refresh the registry's tool list from the new session.
    try:
        tools = await self._client.discover_tools(server)
        self._registry.update_server_tools(server_name, tools)
    except HubClientError as e:
        logger.warning(
            "Tool discovery failed on persistent session",
            server=server_name,
            error=str(e),
        )

    # Re-read the server so the summary reflects the registry's current tools.
    refreshed = self._registry.get_server(server_name)
    tool_summaries = (
        [
            {
                "identifier": tool.identifier,
                "name": tool.name,
                "description": tool.description,
            }
            for tool in refreshed.tools
        ]
        if refreshed
        else []
    )

    return {
        "server_name": session.server_name,
        "container_name": session.container_name,
        "alive": session.alive,
        "initialized": session.initialized,
        "started_at": session.started_at.isoformat(),
        "tools": tool_summaries,
        "tool_count": len(tool_summaries),
    }
async def stop_persistent_server(self, server_name: str) -> bool:
    """Ask the hub client to stop a server's persistent session.

    :param server_name: Server name.
    :returns: True if a session was stopped.

    """
    was_stopped = await self._client.stop_persistent_session(server_name)
    return was_stopped
def get_persistent_status(self, server_name: str) -> dict[str, Any] | None:
    """Summarise the persistent session of a server, if the client has one.

    :param server_name: Server name.
    :returns: Status dict or None if no session.

    """
    session = self._client.get_persistent_session(server_name)
    if not session:
        return None

    from datetime import datetime, timezone  # noqa: PLC0415

    status: dict[str, Any] = {
        "server_name": session.server_name,
        "container_name": session.container_name,
        "alive": session.alive,
        "initialized": session.initialized,
        "started_at": session.started_at.isoformat(),
    }
    # Whole seconds elapsed since the session's start timestamp.
    elapsed = datetime.now(tz=timezone.utc) - session.started_at
    status["uptime_seconds"] = int(elapsed.total_seconds())
    return status
def list_persistent_sessions(self) -> list[dict[str, Any]]:
    """Return the hub client's listing of persistent sessions.

    :returns: List of session status dicts.

    """
    listing = self._client.list_persistent_sessions()
    return listing
async def stop_all_persistent_servers(self) -> int:
    """Ask the hub client to stop every persistent session.

    :returns: Number of sessions stopped.

    """
    stopped_count = await self._client.stop_all_persistent_sessions()
    return stopped_count
# ------------------------------------------------------------------
# Continuous session management
# ------------------------------------------------------------------
async def start_continuous_tool(
    self,
    server_name: str,
    start_tool: str,
    arguments: dict[str, Any],
) -> dict[str, Any]:
    """Start a continuous hub tool session.

    Ensures a persistent container is running (auto-starting one and
    re-discovering tools if needed), then calls the start tool
    (e.g., ``cargo_fuzz_start``). The first text content item of the reply
    is parsed as JSON; when it is a JSON object carrying a ``session_id``,
    the session is tracked for subsequent status/stop calls.

    The companion tool names are derived from ``start_tool`` by swapping a
    trailing ``_start`` for ``_status`` / ``_stop``.

    :param server_name: Hub server name.
    :param start_tool: Name of the start tool on the server.
    :param arguments: Arguments for the start tool.
    :returns: The parsed JSON object from the tool's text reply, or the raw
        tool result when the reply has no text, is not valid JSON, or is
        not a JSON object.
    :raises ValueError: If server not found.

    """
    import json  # noqa: PLC0415
    from datetime import datetime, timezone  # noqa: PLC0415

    logger = get_logger()

    server = self._registry.get_server(server_name)
    if not server:
        msg = f"Server '{server_name}' not found"
        raise ValueError(msg)

    # Ensure persistent session is running
    persistent = self._client.get_persistent_session(server_name)
    if not persistent or not persistent.alive:
        logger.info(
            "Auto-starting persistent session for continuous tool",
            server=server_name,
        )
        await self._client.start_persistent_session(server.config)
        # Discover tools on the new session
        try:
            tools = await self._client.discover_tools(server)
            self._registry.update_server_tools(server_name, tools)
        except HubClientError as e:
            logger.warning(
                "Tool discovery failed on persistent session",
                server=server_name,
                error=str(e),
            )

    # Call the start tool
    result = await self._client.execute_tool(
        server, start_tool, arguments,
    )

    # Only the first text content item is considered.
    content_text = next(
        (
            item.get("text", "")
            for item in result.get("content", [])
            if item.get("type") == "text"
        ),
        "",
    )

    start_result = result
    if content_text:
        try:
            parsed = json.loads(content_text)
        except json.JSONDecodeError:
            parsed = None
        # Valid JSON need not be an object (e.g. "[1, 2]" or "42"); only a
        # dict satisfies the return type and supports the lookup below.
        if isinstance(parsed, dict):
            start_result = parsed

    def _sibling_tool(suffix: str) -> str:
        # Swap only the trailing "_start"; otherwise keep the previous
        # replace-anywhere behaviour for names that do not end with it.
        marker = "_start"
        if start_tool.endswith(marker):
            return start_tool[: -len(marker)] + suffix
        return start_tool.replace(marker, suffix)

    session_id = start_result.get("session_id", "")
    if session_id:
        self._continuous_sessions[session_id] = {
            "session_id": session_id,
            "server_name": server_name,
            "start_tool": start_tool,
            "status_tool": _sibling_tool("_status"),
            "stop_tool": _sibling_tool("_stop"),
            "started_at": datetime.now(tz=timezone.utc).isoformat(),
            "status": "running",
        }

    return start_result
async def get_continuous_tool_status(
    self,
    session_id: str,
) -> dict[str, Any]:
    """Get status of a continuous hub tool session.

    Calls the session's recorded status tool with ``{"session_id": ...}``
    and parses the first text content item of the reply as JSON.

    :param session_id: Session ID from start_continuous_tool.
    :returns: The parsed JSON object from the status tool's text reply, or
        the raw tool result when the reply has no text, is not valid JSON,
        or is not a JSON object.
    :raises ValueError: If the session is unknown or its server is not found.

    """
    import json  # noqa: PLC0415

    session_info = self._continuous_sessions.get(session_id)
    if not session_info:
        msg = f"Unknown continuous session: {session_id}"
        raise ValueError(msg)

    server = self._registry.get_server(session_info["server_name"])
    if not server:
        msg = f"Server '{session_info['server_name']}' not found"
        raise ValueError(msg)

    result = await self._client.execute_tool(
        server,
        session_info["status_tool"],
        {"session_id": session_id},
    )

    # Only the first text content item is considered.
    content_text = next(
        (
            item.get("text", "")
            for item in result.get("content", [])
            if item.get("type") == "text"
        ),
        "",
    )
    if not content_text:
        return result

    try:
        parsed = json.loads(content_text)
    except json.JSONDecodeError:
        return result

    # Valid JSON need not be an object; honour the dict return type by
    # falling back to the raw result for lists, numbers, strings, etc.
    return parsed if isinstance(parsed, dict) else result
async def stop_continuous_tool(
    self,
    session_id: str,
) -> dict[str, Any]:
    """Stop a continuous hub tool session.

    Calls the session's recorded stop tool with ``{"session_id": ...}``,
    parses the first text content item of the reply as JSON, and marks the
    tracked session as ``"stopped"``. The tracking entry itself is kept.

    :param session_id: Session ID to stop.
    :returns: The parsed JSON object from the stop tool's text reply, or
        the raw tool result when the reply has no text, is not valid JSON,
        or is not a JSON object.
    :raises ValueError: If the session is unknown or its server is not found.

    """
    import json  # noqa: PLC0415

    session_info = self._continuous_sessions.get(session_id)
    if not session_info:
        msg = f"Unknown continuous session: {session_id}"
        raise ValueError(msg)

    server = self._registry.get_server(session_info["server_name"])
    if not server:
        msg = f"Server '{session_info['server_name']}' not found"
        raise ValueError(msg)

    result = await self._client.execute_tool(
        server,
        session_info["stop_tool"],
        {"session_id": session_id},
    )

    # Only the first text content item is considered.
    content_text = next(
        (
            item.get("text", "")
            for item in result.get("content", [])
            if item.get("type") == "text"
        ),
        "",
    )

    stop_result = result
    if content_text:
        try:
            parsed = json.loads(content_text)
        except json.JSONDecodeError:
            parsed = None
        # Valid JSON need not be an object; keep the raw result unless the
        # reply really is a dict, as the return type promises.
        if isinstance(parsed, dict):
            stop_result = parsed

    # Update session tracking
    session_info["status"] = "stopped"

    return stop_result
def list_continuous_sessions(self) -> list[dict[str, Any]]:
    """Return a new list holding every tracked continuous session record.

    :returns: List of continuous session info dicts.

    """
    tracked = self._continuous_sessions
    return [*tracked.values()]

View File

@@ -92,6 +92,18 @@ class HubServerConfig(BaseModel):
description="Category for grouping servers",
)
#: Per-server timeout override in seconds (None = use default_timeout).
timeout: int | None = Field(
default=None,
description="Per-server execution timeout override in seconds",
)
#: Whether to use persistent container mode (keep container running between calls).
persistent: bool = Field(
default=False,
description="Keep container running between tool calls for stateful interactions",
)
class HubToolParameter(BaseModel):
"""A parameter for an MCP tool.