Compare commits

..

1 Commits

Author SHA1 Message Date
AFredefon
9e7c56ec09 fix: rename to FuzzForge AI and fix new-user onboarding issues 2026-02-26 11:14:08 +01:00
31 changed files with 172 additions and 2217 deletions

View File

@@ -360,8 +360,8 @@ Beyond modules, you can contribute to FuzzForge's core components.
1. **Clone and Install**
```bash
git clone https://github.com/FuzzingLabs/fuzzforge_ai.git
cd fuzzforge_ai
git clone https://github.com/FuzzingLabs/fuzzforge-ai.git
cd fuzzforge-ai
uv sync --all-extras
```

View File

@@ -1,4 +1,4 @@
.PHONY: help install sync format lint typecheck test build-modules build-hub-images clean
.PHONY: help install sync format lint typecheck test build-modules clean
SHELL := /bin/bash
@@ -12,9 +12,8 @@ help:
@echo " make lint - Lint code with ruff"
@echo " make typecheck - Type check with mypy"
@echo " make test - Run all tests"
@echo " make build-modules - Build all module container images"
@echo " make build-hub-images - Build all mcp-security-hub images"
@echo " make clean - Clean build artifacts"
@echo " make build-modules - Build all module container images"
@echo " make clean - Clean build artifacts"
@echo ""
# Install all dependencies
@@ -31,7 +30,7 @@ sync:
# Format all packages
format:
@for pkg in packages/fuzzforge-*/; do \
@for pkg in fuzzforge-*/; do \
if [ -f "$$pkg/pyproject.toml" ]; then \
echo "Formatting $$pkg..."; \
cd "$$pkg" && uv run ruff format . && cd -; \
@@ -40,7 +39,7 @@ format:
# Lint all packages
lint:
@for pkg in packages/fuzzforge-*/; do \
@for pkg in fuzzforge-*/; do \
if [ -f "$$pkg/pyproject.toml" ]; then \
echo "Linting $$pkg..."; \
cd "$$pkg" && uv run ruff check . && cd -; \
@@ -49,7 +48,7 @@ lint:
# Type check all packages
typecheck:
@for pkg in packages/fuzzforge-*/; do \
@for pkg in fuzzforge-*/; do \
if [ -f "$$pkg/pyproject.toml" ] && [ -f "$$pkg/mypy.ini" ]; then \
echo "Type checking $$pkg..."; \
cd "$$pkg" && uv run mypy . && cd -; \
@@ -58,7 +57,7 @@ typecheck:
# Run all tests
test:
@for pkg in packages/fuzzforge-*/; do \
@for pkg in fuzzforge-*/; do \
if [ -f "$$pkg/pytest.ini" ]; then \
echo "Testing $$pkg..."; \
cd "$$pkg" && uv run pytest && cd -; \
@@ -81,12 +80,18 @@ build-modules:
echo "Using Docker"; \
CONTAINER_CMD="docker"; \
fi; \
sdk_version=$$(grep 'version' "fuzzforge-modules/fuzzforge-modules-sdk/pyproject.toml" 2>/dev/null | head -1 | sed 's/.*"\(.*\)".*/\1/' || echo "0.1.0"); \
echo "Building fuzzforge-modules-sdk:$$sdk_version (base image)..."; \
$$CONTAINER_CMD build \
-t "fuzzforge-modules-sdk:$$sdk_version" \
-t "localhost/fuzzforge-modules-sdk:$$sdk_version" \
"fuzzforge-modules/fuzzforge-modules-sdk/" || exit 1; \
for module in fuzzforge-modules/*/; do \
if [ -f "$$module/Dockerfile" ] && \
[ "$$module" != "fuzzforge-modules/fuzzforge-modules-sdk/" ] && \
[ "$$module" != "fuzzforge-modules/fuzzforge-module-template/" ]; then \
name=$$(basename $$module); \
version=$$(grep 'version' "$$module/pyproject.toml" 2>/dev/null | head -1 | sed 's/.*"\(.*\\)".*/\\1/' || echo "0.1.0"); \
version=$$(grep 'version' "$$module/pyproject.toml" 2>/dev/null | head -1 | sed 's/.*"\(.*\)".*/\1/' || echo "0.1.0"); \
echo "Building $$name:$$version..."; \
$$CONTAINER_CMD build -t "fuzzforge-$$name:$$version" "$$module" || exit 1; \
fi \
@@ -94,10 +99,6 @@ build-modules:
@echo ""
@echo "✓ All modules built successfully!"
# Build all mcp-security-hub images for the firmware analysis pipeline
build-hub-images:
@bash scripts/build-hub-images.sh
# Clean build artifacts
clean:
find . -type d -name "__pycache__" -exec rm -rf {} + 2>/dev/null || true

View File

@@ -43,7 +43,7 @@ At the heart of FuzzForge are **modules** - containerized security tools that AI
- **🔗 Composable**: Chain modules together into automated workflows
- **📦 Extensible**: Build custom modules with the Python SDK
FuzzForge AI handles module discovery, execution, and result collection. Security modules (developed separately) provide the actual security tooling - from static analyzers to fuzzers to crash triagers.
The OSS runtime handles module discovery, execution, and result collection. Security modules (developed separately) provide the actual security tooling - from static analyzers to fuzzers to crash triagers.
Instead of manually running security tools, describe what you want and let your AI assistant handle it.
@@ -175,7 +175,7 @@ FuzzForge modules are containerized security tools that AI agents can orchestrat
|---|---|---|
| **What** | Runtime & MCP server | Security research modules |
| **License** | Apache 2.0 | BSL 1.1 (Business Source License) |
| **Compatibility** | ✅ Runs any compatible module | ✅ Works with FuzzForge AI |
| **Compatibility** | ✅ Runs any compatible module | ✅ Works with OSS runtime |
**Enterprise modules** are developed separately and provide production-ready security tooling:
@@ -187,7 +187,7 @@ FuzzForge modules are containerized security tools that AI agents can orchestrat
| 🔐 **Vulnerability Detection** | Pattern Matcher, Taint Analyzer | Security vulnerability scanning |
| 📝 **Reporting** | Report Generator, SARIF Exporter | Automated security report generation |
> 💡 **Build your own modules!** The FuzzForge SDK allows you to create custom modules that integrate seamlessly with FuzzForge AI. See [Creating Custom Modules](#-creating-custom-modules).
> 💡 **Build your own modules!** The FuzzForge SDK allows you to create custom modules that integrate seamlessly with the OSS runtime. See [Creating Custom Modules](#-creating-custom-modules).
### Execution Modes

View File

@@ -31,8 +31,8 @@ This guide covers everything you need to know to get started with FuzzForge AI -
```bash
# 1. Clone and install
git clone https://github.com/FuzzingLabs/fuzzforge_ai.git
cd fuzzforge_ai
git clone https://github.com/FuzzingLabs/fuzzforge-ai.git
cd fuzzforge-ai
uv sync
# 2. Build the module images (one-time setup)
@@ -59,7 +59,7 @@ uv run fuzzforge mcp install claude-code # For Claude Code CLI
Before installing FuzzForge AI, ensure you have:
- **Python 3.12+** - [Download Python](https://www.python.org/downloads/)
- **Python 3.14+** - [Download Python](https://www.python.org/downloads/)
- **uv** package manager - [Install uv](https://docs.astral.sh/uv/)
- **Docker** - Container runtime ([Install Docker](https://docs.docker.com/get-docker/))
@@ -95,8 +95,8 @@ sudo usermod -aG docker $USER
### 1. Clone the Repository
```bash
git clone https://github.com/FuzzingLabs/fuzzforge_ai.git
cd fuzzforge_ai
git clone https://github.com/FuzzingLabs/fuzzforge-ai.git
cd fuzzforge-ai
```
### 2. Install Dependencies
@@ -122,14 +122,14 @@ FuzzForge modules are containerized security tools. After cloning, you need to b
### Build All Modules
```bash
# From the fuzzforge_ai directory
# From the fuzzforge-ai directory
make build-modules
```
This builds all available modules:
- `fuzzforge-rust-analyzer` - Analyzes Rust code for fuzzable functions
- `fuzzforge-cargo-fuzzer` - Runs cargo-fuzz on Rust crates
- `fuzzforge-harness-validator` - Validates generated fuzzing harnesses
- `fuzzforge-harness-tester` - Tests and validates generated fuzzing harnesses
- `fuzzforge-crash-analyzer` - Analyzes crash inputs
### Build a Single Module
@@ -169,7 +169,7 @@ uv run fuzzforge mcp install copilot
The command auto-detects everything:
- **FuzzForge root** - Where FuzzForge is installed
- **Modules path** - Defaults to `fuzzforge_ai/fuzzforge-modules`
- **Modules path** - Defaults to `fuzzforge-ai/fuzzforge-modules`
- **Docker socket** - Auto-detects `/var/run/docker.sock`
**Optional overrides** (usually not needed):
@@ -428,14 +428,14 @@ If you prefer Podman:
uv run fuzzforge mcp install copilot --engine podman
# Or set environment variable
export FUZZFORGE_ENGINE=podman
export FUZZFORGE_ENGINE__TYPE=podman
```
### Check Logs
FuzzForge stores execution logs in the storage directory:
FuzzForge stores execution results inside your project directory:
```bash
ls -la ~/.fuzzforge/storage/<project-id>/<execution-id>/
ls -la <your-project>/.fuzzforge/runs/<execution-id>/
```
---

View File

@@ -129,13 +129,13 @@ def _detect_docker_socket() -> str:
def _find_fuzzforge_root() -> Path:
"""Find the FuzzForge installation root.
:returns: Path to fuzzforge-oss directory.
:returns: Path to fuzzforge-ai directory.
"""
# Try to find from current file location
current = Path(__file__).resolve()
# Walk up to find fuzzforge-oss root
# Walk up to find fuzzforge-ai root
for parent in current.parents:
if (parent / "fuzzforge-mcp").is_dir() and (parent / "fuzzforge-runner").is_dir():
return parent
@@ -152,7 +152,7 @@ def _generate_mcp_config(
) -> dict:
"""Generate MCP server configuration.
:param fuzzforge_root: Path to fuzzforge-oss installation.
:param fuzzforge_root: Path to fuzzforge-ai installation.
:param modules_path: Path to the modules directory.
:param engine_type: Container engine type (podman or docker).
:param engine_socket: Container engine socket path.
@@ -185,8 +185,6 @@ def _generate_mcp_config(
"FUZZFORGE_ENGINE__TYPE": engine_type,
"FUZZFORGE_ENGINE__GRAPHROOT": str(graphroot),
"FUZZFORGE_ENGINE__RUNROOT": str(runroot),
"FUZZFORGE_HUB__ENABLED": "true",
"FUZZFORGE_HUB__CONFIG_PATH": str(fuzzforge_root / "hub-config.json"),
},
}
@@ -328,7 +326,7 @@ def generate(
if agent == AIAgent.COPILOT:
config_path = _get_copilot_mcp_path()
elif agent == AIAgent.CLAUDE_CODE:
config_path = _get_claude_code_mcp_path(fuzzforge_root)
config_path = _get_claude_code_user_mcp_path()
else: # Claude Desktop
config_path = _get_claude_desktop_mcp_path()
@@ -456,7 +454,6 @@ def install(
console.print(f" Modules Path: {resolved_modules}")
console.print(f" Engine: {engine}")
console.print(f" Socket: {socket}")
console.print(f" Hub Config: {fuzzforge_root / 'hub-config.json'}")
console.print()
console.print("[bold]Next steps:[/bold]")

View File

@@ -1,42 +0,0 @@
"""FuzzForge Hub - Generic MCP server bridge.
This module provides a generic bridge to connect FuzzForge with any MCP server.
It allows AI agents to discover and execute tools from external MCP servers
(like mcp-security-hub) through the same interface as native FuzzForge modules.
The hub is server-agnostic: it doesn't hardcode any specific tools or servers.
Instead, it dynamically discovers tools by connecting to configured MCP servers
and calling their `list_tools()` method.
Supported transport types:
- docker: Run MCP server as a Docker container with stdio transport
- command: Run MCP server as a local process with stdio transport
- sse: Connect to a remote MCP server via Server-Sent Events
"""
from fuzzforge_common.hub.client import HubClient, HubClientError
from fuzzforge_common.hub.executor import HubExecutionResult, HubExecutor
from fuzzforge_common.hub.models import (
HubConfig,
HubServer,
HubServerConfig,
HubServerType,
HubTool,
HubToolParameter,
)
from fuzzforge_common.hub.registry import HubRegistry
__all__ = [
"HubClient",
"HubClientError",
"HubConfig",
"HubExecutionResult",
"HubExecutor",
"HubRegistry",
"HubServer",
"HubServerConfig",
"HubServerType",
"HubTool",
"HubToolParameter",
]

View File

@@ -1,444 +0,0 @@
"""MCP client for communicating with hub servers.
This module provides a generic MCP client that can connect to any MCP server
via stdio (docker/command) or SSE transport. It handles:
- Starting containers/processes for stdio transport
- Connecting to SSE endpoints
- Discovering tools via list_tools()
- Executing tools via call_tool()
"""
from __future__ import annotations
import asyncio
import json
import os
import subprocess
from contextlib import asynccontextmanager
from typing import TYPE_CHECKING, Any, cast
from fuzzforge_common.hub.models import (
HubServer,
HubServerConfig,
HubServerType,
HubTool,
)
if TYPE_CHECKING:
from asyncio.subprocess import Process
from collections.abc import AsyncGenerator
from structlog.stdlib import BoundLogger
def get_logger() -> BoundLogger:
"""Get structlog logger instance.
:returns: Configured structlog logger.
"""
from structlog import get_logger # noqa: PLC0415
return cast("BoundLogger", get_logger())
class HubClientError(Exception):
"""Error in hub client operations."""
class HubClient:
"""Client for communicating with MCP hub servers.
Supports stdio (via docker/command) and SSE transports.
Uses the MCP protocol for tool discovery and execution.
"""
#: Default timeout for operations.
DEFAULT_TIMEOUT: int = 30
def __init__(self, timeout: int = DEFAULT_TIMEOUT) -> None:
"""Initialize the hub client.
:param timeout: Default timeout for operations in seconds.
"""
self._timeout = timeout
async def discover_tools(self, server: HubServer) -> list[HubTool]:
"""Discover tools from a hub server.
Connects to the server, calls list_tools(), and returns
parsed HubTool instances.
:param server: Hub server to discover tools from.
:returns: List of discovered tools.
:raises HubClientError: If discovery fails.
"""
logger = get_logger()
config = server.config
logger.info("Discovering tools", server=config.name, type=config.type.value)
try:
async with self._connect(config) as (reader, writer):
# Initialize MCP session
await self._initialize_session(reader, writer, config.name)
# List tools
tools_data = await self._call_method(
reader,
writer,
"tools/list",
{},
)
# Parse tools
tools = []
for tool_data in tools_data.get("tools", []):
tool = HubTool.from_mcp_tool(
server_name=config.name,
name=tool_data["name"],
description=tool_data.get("description"),
input_schema=tool_data.get("inputSchema", {}),
)
tools.append(tool)
logger.info(
"Discovered tools",
server=config.name,
count=len(tools),
)
return tools
except Exception as e:
logger.error(
"Tool discovery failed",
server=config.name,
error=str(e),
)
raise HubClientError(f"Discovery failed for {config.name}: {e}") from e
async def execute_tool(
self,
server: HubServer,
tool_name: str,
arguments: dict[str, Any],
*,
timeout: int | None = None,
) -> dict[str, Any]:
"""Execute a tool on a hub server.
:param server: Hub server to execute on.
:param tool_name: Name of the tool to execute.
:param arguments: Tool arguments.
:param timeout: Execution timeout (uses default if None).
:returns: Tool execution result.
:raises HubClientError: If execution fails.
"""
logger = get_logger()
config = server.config
exec_timeout = timeout or self._timeout
logger.info(
"Executing hub tool",
server=config.name,
tool=tool_name,
timeout=exec_timeout,
)
try:
async with self._connect(config) as (reader, writer):
# Initialize MCP session
await self._initialize_session(reader, writer, config.name)
# Call tool
result = await asyncio.wait_for(
self._call_method(
reader,
writer,
"tools/call",
{"name": tool_name, "arguments": arguments},
),
timeout=exec_timeout,
)
logger.info(
"Tool execution completed",
server=config.name,
tool=tool_name,
)
return result
except asyncio.TimeoutError as e:
logger.error(
"Tool execution timed out",
server=config.name,
tool=tool_name,
timeout=exec_timeout,
)
raise HubClientError(
f"Execution timed out for {config.name}:{tool_name}"
) from e
except Exception as e:
logger.error(
"Tool execution failed",
server=config.name,
tool=tool_name,
error=str(e),
)
raise HubClientError(
f"Execution failed for {config.name}:{tool_name}: {e}"
) from e
@asynccontextmanager
async def _connect(
self,
config: HubServerConfig,
) -> AsyncGenerator[tuple[asyncio.StreamReader, asyncio.StreamWriter], None]:
"""Connect to an MCP server.
:param config: Server configuration.
:yields: Tuple of (reader, writer) for communication.
"""
if config.type == HubServerType.DOCKER:
async with self._connect_docker(config) as streams:
yield streams
elif config.type == HubServerType.COMMAND:
async with self._connect_command(config) as streams:
yield streams
elif config.type == HubServerType.SSE:
async with self._connect_sse(config) as streams:
yield streams
else:
msg = f"Unsupported server type: {config.type}"
raise HubClientError(msg)
@asynccontextmanager
async def _connect_docker(
self,
config: HubServerConfig,
) -> AsyncGenerator[tuple[asyncio.StreamReader, asyncio.StreamWriter], None]:
"""Connect to a Docker-based MCP server.
:param config: Server configuration with image name.
:yields: Tuple of (reader, writer) for stdio communication.
"""
if not config.image:
msg = f"Docker image not specified for server '{config.name}'"
raise HubClientError(msg)
# Build docker command
cmd = ["docker", "run", "-i", "--rm"]
# Add capabilities
for cap in config.capabilities:
cmd.extend(["--cap-add", cap])
# Add volumes
for volume in config.volumes:
cmd.extend(["-v", os.path.expanduser(volume)])
# Add environment variables
for key, value in config.environment.items():
cmd.extend(["-e", f"{key}={value}"])
cmd.append(config.image)
process: Process = await asyncio.create_subprocess_exec(
*cmd,
stdin=subprocess.PIPE,
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
)
try:
if process.stdin is None or process.stdout is None:
msg = "Failed to get process streams"
raise HubClientError(msg)
# Create asyncio streams from process pipes
reader = process.stdout
writer = process.stdin
yield reader, writer # type: ignore[misc]
finally:
process.terminate()
try:
await asyncio.wait_for(process.wait(), timeout=5)
except asyncio.TimeoutError:
process.kill()
@asynccontextmanager
async def _connect_command(
self,
config: HubServerConfig,
) -> AsyncGenerator[tuple[asyncio.StreamReader, asyncio.StreamWriter], None]:
"""Connect to a command-based MCP server.
:param config: Server configuration with command.
:yields: Tuple of (reader, writer) for stdio communication.
"""
if not config.command:
msg = f"Command not specified for server '{config.name}'"
raise HubClientError(msg)
# Set up environment
env = dict(config.environment) if config.environment else None
process: Process = await asyncio.create_subprocess_exec(
*config.command,
stdin=subprocess.PIPE,
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
env=env,
)
try:
if process.stdin is None or process.stdout is None:
msg = "Failed to get process streams"
raise HubClientError(msg)
reader = process.stdout
writer = process.stdin
yield reader, writer # type: ignore[misc]
finally:
process.terminate()
try:
await asyncio.wait_for(process.wait(), timeout=5)
except asyncio.TimeoutError:
process.kill()
@asynccontextmanager
async def _connect_sse(
self,
config: HubServerConfig,
) -> AsyncGenerator[tuple[asyncio.StreamReader, asyncio.StreamWriter], None]:
"""Connect to an SSE-based MCP server.
:param config: Server configuration with URL.
:yields: Tuple of (reader, writer) for SSE communication.
"""
# SSE support requires additional dependencies
# For now, raise not implemented
msg = "SSE transport not yet implemented"
raise NotImplementedError(msg)
async def _initialize_session(
self,
reader: asyncio.StreamReader,
writer: asyncio.StreamWriter,
server_name: str,
) -> dict[str, Any]:
"""Initialize MCP session with the server.
:param reader: Stream reader.
:param writer: Stream writer.
:param server_name: Server name for logging.
:returns: Server capabilities.
"""
# Send initialize request
result = await self._call_method(
reader,
writer,
"initialize",
{
"protocolVersion": "2024-11-05",
"capabilities": {},
"clientInfo": {
"name": "fuzzforge-hub",
"version": "0.1.0",
},
},
)
# Send initialized notification
await self._send_notification(reader, writer, "notifications/initialized", {})
return result
async def _call_method(
self,
reader: asyncio.StreamReader,
writer: asyncio.StreamWriter,
method: str,
params: dict[str, Any],
) -> dict[str, Any]:
"""Call an MCP method.
:param reader: Stream reader.
:param writer: Stream writer.
:param method: Method name.
:param params: Method parameters.
:returns: Method result.
"""
# Create JSON-RPC request
request = {
"jsonrpc": "2.0",
"id": 1,
"method": method,
"params": params,
}
# Send request
request_line = json.dumps(request) + "\n"
writer.write(request_line.encode())
await writer.drain()
# Read response
response_line = await asyncio.wait_for(
reader.readline(),
timeout=self._timeout,
)
if not response_line:
msg = "Empty response from server"
raise HubClientError(msg)
response = json.loads(response_line.decode())
if "error" in response:
error = response["error"]
msg = f"MCP error: {error.get('message', 'Unknown error')}"
raise HubClientError(msg)
return response.get("result", {})
async def _send_notification(
self,
reader: asyncio.StreamReader,
writer: asyncio.StreamWriter,
method: str,
params: dict[str, Any],
) -> None:
"""Send an MCP notification (no response expected).
:param reader: Stream reader (unused but kept for consistency).
:param writer: Stream writer.
:param method: Notification method name.
:param params: Notification parameters.
"""
# Create JSON-RPC notification (no id)
notification = {
"jsonrpc": "2.0",
"method": method,
"params": params,
}
notification_line = json.dumps(notification) + "\n"
writer.write(notification_line.encode())
await writer.drain()

View File

@@ -1,334 +0,0 @@
"""Hub executor for managing MCP server lifecycle and tool execution.
This module provides a high-level interface for:
- Discovering tools from all registered hub servers
- Executing tools with proper error handling
- Managing the lifecycle of hub operations
"""
from __future__ import annotations
from pathlib import Path
from typing import TYPE_CHECKING, Any, cast
from fuzzforge_common.hub.client import HubClient, HubClientError
from fuzzforge_common.hub.models import HubServer, HubServerConfig, HubTool
from fuzzforge_common.hub.registry import HubRegistry
if TYPE_CHECKING:
from structlog.stdlib import BoundLogger
def get_logger() -> BoundLogger:
"""Get structlog logger instance.
:returns: Configured structlog logger.
"""
from structlog import get_logger # noqa: PLC0415
return cast("BoundLogger", get_logger())
class HubExecutionResult:
"""Result of a hub tool execution."""
def __init__(
self,
*,
success: bool,
server_name: str,
tool_name: str,
result: dict[str, Any] | None = None,
error: str | None = None,
) -> None:
"""Initialize execution result.
:param success: Whether execution succeeded.
:param server_name: Name of the hub server.
:param tool_name: Name of the executed tool.
:param result: Tool execution result data.
:param error: Error message if execution failed.
"""
self.success = success
self.server_name = server_name
self.tool_name = tool_name
self.result = result or {}
self.error = error
@property
def identifier(self) -> str:
"""Get full tool identifier."""
return f"hub:{self.server_name}:{self.tool_name}"
def to_dict(self) -> dict[str, Any]:
"""Convert to dictionary.
:returns: Dictionary representation.
"""
return {
"success": self.success,
"identifier": self.identifier,
"server": self.server_name,
"tool": self.tool_name,
"result": self.result,
"error": self.error,
}
class HubExecutor:
"""Executor for hub server operations.
Provides high-level methods for discovering and executing
tools from hub servers.
"""
#: Hub registry instance.
_registry: HubRegistry
#: MCP client instance.
_client: HubClient
def __init__(
self,
config_path: Path | None = None,
timeout: int = 300,
) -> None:
"""Initialize the hub executor.
:param config_path: Path to hub-servers.json config file.
:param timeout: Default timeout for tool execution.
"""
self._registry = HubRegistry(config_path)
self._client = HubClient(timeout=timeout)
@property
def registry(self) -> HubRegistry:
"""Get the hub registry.
:returns: Hub registry instance.
"""
return self._registry
def add_server(self, config: HubServerConfig) -> HubServer:
"""Add a server to the registry.
:param config: Server configuration.
:returns: Created HubServer instance.
"""
return self._registry.add_server(config)
async def discover_all_tools(self) -> dict[str, list[HubTool]]:
"""Discover tools from all enabled servers.
:returns: Dict mapping server names to lists of discovered tools.
"""
logger = get_logger()
results: dict[str, list[HubTool]] = {}
for server in self._registry.enabled_servers:
try:
tools = await self._client.discover_tools(server)
self._registry.update_server_tools(server.name, tools)
results[server.name] = tools
except HubClientError as e:
logger.warning(
"Failed to discover tools",
server=server.name,
error=str(e),
)
self._registry.update_server_tools(server.name, [], error=str(e))
results[server.name] = []
return results
async def discover_server_tools(self, server_name: str) -> list[HubTool]:
"""Discover tools from a specific server.
:param server_name: Name of the server.
:returns: List of discovered tools.
:raises ValueError: If server not found.
"""
server = self._registry.get_server(server_name)
if not server:
msg = f"Server '{server_name}' not found"
raise ValueError(msg)
try:
tools = await self._client.discover_tools(server)
self._registry.update_server_tools(server_name, tools)
return tools
except HubClientError as e:
self._registry.update_server_tools(server_name, [], error=str(e))
raise
async def execute_tool(
self,
identifier: str,
arguments: dict[str, Any] | None = None,
*,
timeout: int | None = None,
) -> HubExecutionResult:
"""Execute a hub tool.
:param identifier: Tool identifier (hub:server:tool or server:tool).
:param arguments: Tool arguments.
:param timeout: Execution timeout.
:returns: Execution result.
"""
logger = get_logger()
arguments = arguments or {}
# Parse identifier and find tool
server, tool = self._registry.find_tool(identifier)
if not server or not tool:
# Try to parse as server:tool and discover
parts = identifier.replace("hub:", "").split(":")
if len(parts) == 2: # noqa: PLR2004
server_name, tool_name = parts
server = self._registry.get_server(server_name)
if server and not server.discovered:
# Try to discover tools first
try:
await self.discover_server_tools(server_name)
tool = server.get_tool(tool_name)
except HubClientError:
pass
if server and not tool:
# Tool not found, but server exists - try to execute anyway
# The server might have the tool even if discovery failed
tool_name_to_use = tool_name
else:
tool_name_to_use = tool.name if tool else ""
if not server:
return HubExecutionResult(
success=False,
server_name=server_name,
tool_name=tool_name,
error=f"Server '{server_name}' not found",
)
# Execute even if tool wasn't discovered (server might still have it)
try:
result = await self._client.execute_tool(
server,
tool_name_to_use or tool_name,
arguments,
timeout=timeout,
)
return HubExecutionResult(
success=True,
server_name=server.name,
tool_name=tool_name_to_use or tool_name,
result=result,
)
except HubClientError as e:
return HubExecutionResult(
success=False,
server_name=server.name,
tool_name=tool_name_to_use or tool_name,
error=str(e),
)
else:
return HubExecutionResult(
success=False,
server_name="unknown",
tool_name=identifier,
error=f"Invalid tool identifier: {identifier}",
)
# Execute the tool
logger.info(
"Executing hub tool",
server=server.name,
tool=tool.name,
arguments=arguments,
)
try:
result = await self._client.execute_tool(
server,
tool.name,
arguments,
timeout=timeout,
)
return HubExecutionResult(
success=True,
server_name=server.name,
tool_name=tool.name,
result=result,
)
except HubClientError as e:
return HubExecutionResult(
success=False,
server_name=server.name,
tool_name=tool.name,
error=str(e),
)
def list_servers(self) -> list[dict[str, Any]]:
"""List all registered servers with their status.
:returns: List of server info dicts.
"""
servers = []
for server in self._registry.servers:
servers.append({
"name": server.name,
"identifier": server.identifier,
"type": server.config.type.value,
"enabled": server.config.enabled,
"category": server.config.category,
"description": server.config.description,
"discovered": server.discovered,
"tool_count": len(server.tools),
"error": server.discovery_error,
})
return servers
def list_tools(self) -> list[dict[str, Any]]:
"""List all discovered tools.
:returns: List of tool info dicts.
"""
tools = []
for tool in self._registry.get_all_tools():
tools.append({
"identifier": tool.identifier,
"name": tool.name,
"server": tool.server_name,
"description": tool.description,
"parameters": [p.model_dump() for p in tool.parameters],
})
return tools
def get_tool_schema(self, identifier: str) -> dict[str, Any] | None:
"""Get the JSON Schema for a tool's input.
:param identifier: Tool identifier.
:returns: JSON Schema dict or None if not found.
"""
_, tool = self._registry.find_tool(identifier)
if tool:
return tool.input_schema
return None

View File

@@ -1,284 +0,0 @@
"""Data models for FuzzForge Hub.
This module defines the Pydantic models used to represent MCP servers
and their tools in the hub registry.
"""
from __future__ import annotations
from enum import Enum
from typing import Any
from pydantic import BaseModel, Field
class HubServerType(str, Enum):
"""Type of MCP server connection."""
#: Run as Docker container with stdio transport.
DOCKER = "docker"
#: Run as local command/process with stdio transport.
COMMAND = "command"
#: Connect via Server-Sent Events (HTTP).
SSE = "sse"
class HubServerConfig(BaseModel):
"""Configuration for an MCP server in the hub.
This defines how to connect to an MCP server, not what tools it provides.
Tools are discovered dynamically at runtime.
"""
#: Unique identifier for this server (e.g., "nmap", "nuclei").
name: str = Field(description="Unique server identifier")
#: Human-readable description of the server.
description: str | None = Field(
default=None,
description="Human-readable description",
)
#: Type of connection to use.
type: HubServerType = Field(description="Connection type")
#: Docker image name (for type=docker).
image: str | None = Field(
default=None,
description="Docker image name (for docker type)",
)
#: Command to run (for type=command).
command: list[str] | None = Field(
default=None,
description="Command and args (for command type)",
)
#: URL endpoint (for type=sse).
url: str | None = Field(
default=None,
description="SSE endpoint URL (for sse type)",
)
#: Environment variables to pass to the server.
environment: dict[str, str] = Field(
default_factory=dict,
description="Environment variables",
)
#: Docker capabilities to add (e.g., ["NET_RAW"] for nmap).
capabilities: list[str] = Field(
default_factory=list,
description="Docker capabilities to add",
)
#: Volume mounts for Docker (e.g., ["/host/path:/container/path:ro"]).
volumes: list[str] = Field(
default_factory=list,
description="Docker volume mounts",
)
#: Whether this server is enabled.
enabled: bool = Field(
default=True,
description="Whether server is enabled",
)
#: Category for grouping (e.g., "reconnaissance", "web-security").
category: str | None = Field(
default=None,
description="Category for grouping servers",
)
class HubToolParameter(BaseModel):
"""A parameter for an MCP tool.
Parsed from the tool's JSON Schema inputSchema.
"""
#: Parameter name.
name: str
#: Parameter type (string, integer, boolean, array, object).
type: str
#: Human-readable description.
description: str | None = None
#: Whether this parameter is required.
required: bool = False
#: Default value if any.
default: Any = None
#: Enum values if constrained.
enum: list[Any] | None = None
class HubTool(BaseModel):
"""An MCP tool discovered from a hub server.
This is populated by calling `list_tools()` on the MCP server.
"""
#: Tool name as defined by the MCP server.
name: str = Field(description="Tool name from MCP server")
#: Human-readable description.
description: str | None = Field(
default=None,
description="Tool description",
)
#: Name of the hub server this tool belongs to.
server_name: str = Field(description="Parent server name")
#: Parsed parameters from inputSchema.
parameters: list[HubToolParameter] = Field(
default_factory=list,
description="Tool parameters",
)
#: Raw JSON Schema for the tool input.
input_schema: dict[str, Any] = Field(
default_factory=dict,
description="Raw JSON Schema from MCP",
)
@property
def identifier(self) -> str:
"""Get the full tool identifier (hub:server:tool)."""
return f"hub:{self.server_name}:{self.name}"
@classmethod
def from_mcp_tool(
cls,
server_name: str,
name: str,
description: str | None,
input_schema: dict[str, Any],
) -> HubTool:
"""Create a HubTool from MCP tool metadata.
:param server_name: Name of the parent hub server.
:param name: Tool name.
:param description: Tool description.
:param input_schema: JSON Schema for tool input.
:returns: HubTool instance.
"""
parameters = cls._parse_parameters(input_schema)
return cls(
name=name,
description=description,
server_name=server_name,
parameters=parameters,
input_schema=input_schema,
)
@staticmethod
def _parse_parameters(schema: dict[str, Any]) -> list[HubToolParameter]:
"""Parse parameters from JSON Schema.
:param schema: JSON Schema dict.
:returns: List of parsed parameters.
"""
parameters: list[HubToolParameter] = []
properties = schema.get("properties", {})
required_params = set(schema.get("required", []))
for name, prop in properties.items():
param = HubToolParameter(
name=name,
type=prop.get("type", "string"),
description=prop.get("description"),
required=name in required_params,
default=prop.get("default"),
enum=prop.get("enum"),
)
parameters.append(param)
return parameters
class HubServer(BaseModel):
"""A hub server with its discovered tools.
Combines configuration with dynamically discovered tools.
"""
#: Server configuration.
config: HubServerConfig
#: Tools discovered from the server (populated at runtime).
tools: list[HubTool] = Field(
default_factory=list,
description="Discovered tools",
)
#: Whether tools have been discovered.
discovered: bool = Field(
default=False,
description="Whether tools have been discovered",
)
#: Error message if discovery failed.
discovery_error: str | None = Field(
default=None,
description="Error message if discovery failed",
)
@property
def name(self) -> str:
"""Get server name."""
return self.config.name
@property
def identifier(self) -> str:
"""Get server identifier for module listing."""
return f"hub:{self.config.name}"
def get_tool(self, tool_name: str) -> HubTool | None:
"""Get a tool by name.
:param tool_name: Name of the tool.
:returns: HubTool if found, None otherwise.
"""
for tool in self.tools:
if tool.name == tool_name:
return tool
return None
class HubConfig(BaseModel):
"""Configuration for the entire hub.
Loaded from hub-servers.json or similar config file.
"""
#: List of configured servers.
servers: list[HubServerConfig] = Field(
default_factory=list,
description="Configured MCP servers",
)
#: Default timeout for tool execution (seconds).
default_timeout: int = Field(
default=300,
description="Default execution timeout",
)
#: Whether to cache discovered tools.
cache_tools: bool = Field(
default=True,
description="Cache discovered tools",
)

View File

@@ -1,258 +0,0 @@
"""Hub registry for managing MCP server configurations.
The registry loads server configurations from a JSON file and provides
methods to access and manage them. It does not hardcode any specific
servers or tools - everything is configured by the user.
"""
from __future__ import annotations
import json
from pathlib import Path
from typing import TYPE_CHECKING, cast
from fuzzforge_common.hub.models import (
HubConfig,
HubServer,
HubServerConfig,
)
if TYPE_CHECKING:
from structlog.stdlib import BoundLogger
def get_logger() -> BoundLogger:
"""Get structlog logger instance.
:returns: Configured structlog logger.
"""
from structlog import get_logger # noqa: PLC0415
return cast("BoundLogger", get_logger())
class HubRegistry:
"""Registry for MCP hub servers.
Manages the configuration and state of hub servers.
Configurations are loaded from a JSON file.
"""
#: Loaded hub configuration.
_config: HubConfig
#: Server instances with discovered tools.
_servers: dict[str, HubServer]
#: Path to the configuration file.
_config_path: Path | None
def __init__(self, config_path: Path | str | None = None) -> None:
"""Initialize the hub registry.
:param config_path: Path to hub-servers.json config file.
If None, starts with empty configuration.
"""
if config_path is not None:
self._config_path = Path(config_path)
else:
self._config_path = None
self._servers = {}
self._config = HubConfig()
if self._config_path and self._config_path.exists():
self._load_config(self._config_path)
def _load_config(self, config_path: Path) -> None:
"""Load configuration from JSON file.
:param config_path: Path to config file.
"""
logger = get_logger()
try:
with config_path.open() as f:
data = json.load(f)
self._config = HubConfig.model_validate(data)
# Create server instances from config
for server_config in self._config.servers:
if server_config.enabled:
self._servers[server_config.name] = HubServer(
config=server_config,
)
logger.info(
"Loaded hub configuration",
path=str(config_path),
servers=len(self._servers),
)
except Exception as e:
logger.error(
"Failed to load hub configuration",
path=str(config_path),
error=str(e),
)
raise
def reload(self) -> None:
"""Reload configuration from file."""
if self._config_path and self._config_path.exists():
self._servers.clear()
self._load_config(self._config_path)
@property
def servers(self) -> list[HubServer]:
"""Get all registered servers.
:returns: List of hub servers.
"""
return list(self._servers.values())
@property
def enabled_servers(self) -> list[HubServer]:
"""Get all enabled servers.
:returns: List of enabled hub servers.
"""
return [s for s in self._servers.values() if s.config.enabled]
def get_server(self, name: str) -> HubServer | None:
"""Get a server by name.
:param name: Server name.
:returns: HubServer if found, None otherwise.
"""
return self._servers.get(name)
def add_server(self, config: HubServerConfig) -> HubServer:
"""Add a server to the registry.
:param config: Server configuration.
:returns: Created HubServer instance.
:raises ValueError: If server with same name exists.
"""
if config.name in self._servers:
msg = f"Server '{config.name}' already exists"
raise ValueError(msg)
server = HubServer(config=config)
self._servers[config.name] = server
self._config.servers.append(config)
get_logger().info("Added hub server", name=config.name, type=config.type)
return server
def remove_server(self, name: str) -> bool:
"""Remove a server from the registry.
:param name: Server name.
:returns: True if removed, False if not found.
"""
if name not in self._servers:
return False
del self._servers[name]
self._config.servers = [s for s in self._config.servers if s.name != name]
get_logger().info("Removed hub server", name=name)
return True
def save_config(self, path: Path | None = None) -> None:
"""Save current configuration to file.
:param path: Path to save to. Uses original path if None.
"""
save_path = path or self._config_path
if not save_path:
msg = "No config path specified"
raise ValueError(msg)
with save_path.open("w") as f:
json.dump(
self._config.model_dump(mode="json"),
f,
indent=2,
)
get_logger().info("Saved hub configuration", path=str(save_path))
def update_server_tools(
self,
server_name: str,
tools: list,
*,
error: str | None = None,
) -> None:
"""Update discovered tools for a server.
Called by the hub client after tool discovery.
:param server_name: Server name.
:param tools: List of HubTool instances.
:param error: Error message if discovery failed.
"""
server = self._servers.get(server_name)
if not server:
return
if error:
server.discovered = False
server.discovery_error = error
server.tools = []
else:
server.discovered = True
server.discovery_error = None
server.tools = tools
def get_all_tools(self) -> list:
"""Get all discovered tools from all servers.
:returns: Flat list of all HubTool instances.
"""
tools = []
for server in self._servers.values():
if server.discovered:
tools.extend(server.tools)
return tools
def find_tool(self, identifier: str):
"""Find a tool by its full identifier.
:param identifier: Full identifier (hub:server:tool or server:tool).
:returns: Tuple of (HubServer, HubTool) if found, (None, None) otherwise.
"""
# Parse identifier
parts = identifier.split(":")
if len(parts) == 3 and parts[0] == "hub": # noqa: PLR2004
# hub:server:tool format
server_name = parts[1]
tool_name = parts[2]
elif len(parts) == 2: # noqa: PLR2004
# server:tool format
server_name = parts[0]
tool_name = parts[1]
else:
return None, None
server = self._servers.get(server_name)
if not server:
return None, None
tool = server.get_tool(tool_name)
return server, tool

View File

@@ -272,23 +272,6 @@ class AbstractFuzzForgeSandboxEngine(ABC):
message: str = f"method 'read_file_from_container' is not implemented for class '{self.__class__.__name__}'"
raise NotImplementedError(message)
@abstractmethod
def tail_file_from_container(self, identifier: str, path: str, start_line: int = 1) -> str:
"""Read a file from a running container starting at a given line number.
Uses ``tail -n +{start_line}`` to avoid re-reading the entire file on
every poll. This is the preferred method for incremental reads of
append-only files such as ``stream.jsonl``.
:param identifier: Container identifier.
:param path: Path to file inside container.
:param start_line: 1-based line number to start reading from.
:returns: File contents from *start_line* onwards (may be empty).
"""
message: str = f"method 'tail_file_from_container' is not implemented for class '{self.__class__.__name__}'"
raise NotImplementedError(message)
@abstractmethod
def list_containers(self, all_containers: bool = True) -> list[dict]:
"""List containers.

View File

@@ -389,24 +389,6 @@ class DockerCLI(AbstractFuzzForgeSandboxEngine):
return ""
return result.stdout
def tail_file_from_container(self, identifier: str, path: str, start_line: int = 1) -> str:
"""Read a file from a container starting at a given line number.
:param identifier: Container identifier.
:param path: Path to file in container.
:param start_line: 1-based line number to start reading from.
:returns: File contents from *start_line* onwards.
"""
result = self._run(
["exec", identifier, "tail", "-n", f"+{start_line}", path],
check=False,
)
if result.returncode != 0:
get_logger().debug("failed to tail file from container", path=path, start_line=start_line)
return ""
return result.stdout
def list_containers(self, all_containers: bool = True) -> list[dict]:
"""List containers.

View File

@@ -168,11 +168,6 @@ class Docker(AbstractFuzzForgeSandboxEngine):
message: str = "Docker engine read_file_from_container is not yet implemented"
raise NotImplementedError(message)
def tail_file_from_container(self, identifier: str, path: str, start_line: int = 1) -> str:
"""Read a file from a container starting at a given line number."""
message: str = "Docker engine tail_file_from_container is not yet implemented"
raise NotImplementedError(message)
def list_containers(self, all_containers: bool = True) -> list[dict]:
"""List containers."""
message: str = "Docker engine list_containers is not yet implemented"

View File

@@ -449,24 +449,6 @@ class PodmanCLI(AbstractFuzzForgeSandboxEngine):
return ""
return result.stdout
def tail_file_from_container(self, identifier: str, path: str, start_line: int = 1) -> str:
"""Read a file from a container starting at a given line number.
:param identifier: Container identifier.
:param path: Path to file in container.
:param start_line: 1-based line number to start reading from.
:returns: File contents from *start_line* onwards.
"""
result = self._run(
["exec", identifier, "tail", "-n", f"+{start_line}", path],
check=False,
)
if result.returncode != 0:
get_logger().debug("failed to tail file from container", path=path, start_line=start_line)
return ""
return result.stdout
def list_containers(self, all_containers: bool = True) -> list[dict]:
"""List containers.

View File

@@ -475,30 +475,6 @@ class Podman(AbstractFuzzForgeSandboxEngine):
return ""
return stdout.decode("utf-8", errors="replace") if stdout else ""
def tail_file_from_container(self, identifier: str, path: str, start_line: int = 1) -> str:
"""Read a file from a container starting at a given line number.
:param identifier: Container identifier.
:param path: Path to file inside container.
:param start_line: 1-based line number to start reading from.
:returns: File contents from *start_line* onwards.
"""
client: PodmanClient = self.get_client()
with client:
container: Container = client.containers.get(key=identifier)
(status, (stdout, stderr)) = container.exec_run(
cmd=["tail", "-n", f"+{start_line}", path],
demux=True,
)
if status != 0:
error_msg = stderr.decode("utf-8", errors="replace") if stderr else "File not found"
get_logger().debug(
"failed to tail file from container", path=path, start_line=start_line, error=error_msg,
)
return ""
return stdout.decode("utf-8", errors="replace") if stdout else ""
def list_containers(self, all_containers: bool = True) -> list[dict]:
"""List containers.

View File

@@ -45,11 +45,11 @@ For custom setups, you can manually configure the MCP server.
{
"mcpServers": {
"fuzzforge": {
"command": "/path/to/fuzzforge_ai/.venv/bin/python",
"command": "/path/to/fuzzforge-ai/.venv/bin/python",
"args": ["-m", "fuzzforge_mcp"],
"cwd": "/path/to/fuzzforge_ai",
"cwd": "/path/to/fuzzforge-ai",
"env": {
"FUZZFORGE_MODULES_PATH": "/path/to/fuzzforge_ai/fuzzforge-modules",
"FUZZFORGE_MODULES_PATH": "/path/to/fuzzforge-ai/fuzzforge-modules",
"FUZZFORGE_ENGINE__TYPE": "docker"
}
}
@@ -64,11 +64,11 @@ For custom setups, you can manually configure the MCP server.
"servers": {
"fuzzforge": {
"type": "stdio",
"command": "/path/to/fuzzforge_ai/.venv/bin/python",
"command": "/path/to/fuzzforge-ai/.venv/bin/python",
"args": ["-m", "fuzzforge_mcp"],
"cwd": "/path/to/fuzzforge_ai",
"cwd": "/path/to/fuzzforge-ai",
"env": {
"FUZZFORGE_MODULES_PATH": "/path/to/fuzzforge_ai/fuzzforge-modules",
"FUZZFORGE_MODULES_PATH": "/path/to/fuzzforge-ai/fuzzforge-modules",
"FUZZFORGE_ENGINE__TYPE": "docker"
}
}
@@ -83,11 +83,11 @@ For custom setups, you can manually configure the MCP server.
"mcpServers": {
"fuzzforge": {
"type": "stdio",
"command": "/path/to/fuzzforge_ai/.venv/bin/python",
"command": "/path/to/fuzzforge-ai/.venv/bin/python",
"args": ["-m", "fuzzforge_mcp"],
"cwd": "/path/to/fuzzforge_ai",
"cwd": "/path/to/fuzzforge-ai",
"env": {
"FUZZFORGE_MODULES_PATH": "/path/to/fuzzforge_ai/fuzzforge-modules",
"FUZZFORGE_MODULES_PATH": "/path/to/fuzzforge-ai/fuzzforge-modules",
"FUZZFORGE_ENGINE__TYPE": "docker"
}
}

View File

@@ -43,7 +43,6 @@ FuzzForge is a security research orchestration platform. Use these tools to:
3. **Execute workflows**: Chain multiple modules together
4. **Manage projects**: Initialize and configure projects
5. **Get results**: Retrieve execution results
6. **Hub tools**: Discover and execute tools from external MCP servers
Typical workflow:
1. Initialize a project with `init_project`
@@ -51,11 +50,6 @@ Typical workflow:
3. List available modules with `list_modules`
4. Execute a module with `execute_module` — use `assets_path` param to pass different inputs per module
5. Read outputs from `results_path` returned by `execute_module` — check module's `output_artifacts` metadata for filenames
Hub workflow:
1. List available hub servers with `list_hub_servers`
2. Discover tools from servers with `discover_hub_tools`
3. Execute hub tools with `execute_hub_tool`
""",
lifespan=lifespan,
)

View File

@@ -2,14 +2,13 @@
from fastmcp import FastMCP
from fuzzforge_mcp.tools import hub, modules, projects, workflows
from fuzzforge_mcp.tools import modules, projects, workflows
mcp: FastMCP = FastMCP()
mcp.mount(modules.mcp)
mcp.mount(projects.mcp)
mcp.mount(workflows.mcp)
mcp.mount(hub.mcp)
__all__ = [
"mcp",

View File

@@ -1,315 +0,0 @@
"""MCP Hub tools for FuzzForge MCP server.
This module provides tools for interacting with external MCP servers
through the FuzzForge hub. AI agents can:
- List available hub servers and their tools
- Discover tools from hub servers
- Execute hub tools
"""
from __future__ import annotations
from pathlib import Path
from typing import Any
from fastmcp import FastMCP
from fastmcp.exceptions import ToolError
from fuzzforge_common.hub import HubExecutor, HubServerConfig, HubServerType
from fuzzforge_mcp.dependencies import get_settings
mcp: FastMCP = FastMCP()
# Global hub executor instance (lazy initialization)
_hub_executor: HubExecutor | None = None
def _get_hub_executor() -> HubExecutor:
"""Get or create the hub executor instance.
:returns: Hub executor instance.
:raises ToolError: If hub is disabled.
"""
global _hub_executor
settings = get_settings()
if not settings.hub.enabled:
msg = "MCP Hub is disabled. Enable it via FUZZFORGE_HUB__ENABLED=true"
raise ToolError(msg)
if _hub_executor is None:
config_path = settings.hub.config_path
_hub_executor = HubExecutor(
config_path=config_path,
timeout=settings.hub.timeout,
)
return _hub_executor
@mcp.tool
async def list_hub_servers() -> dict[str, Any]:
"""List all registered MCP hub servers.
Returns information about configured hub servers, including
their connection type, status, and discovered tool count.
:return: Dictionary with list of hub servers.
"""
try:
executor = _get_hub_executor()
servers = executor.list_servers()
return {
"servers": servers,
"count": len(servers),
"enabled_count": len([s for s in servers if s["enabled"]]),
}
except Exception as e:
if isinstance(e, ToolError):
raise
msg = f"Failed to list hub servers: {e}"
raise ToolError(msg) from e
@mcp.tool
async def discover_hub_tools(server_name: str | None = None) -> dict[str, Any]:
"""Discover tools from hub servers.
Connects to hub servers and retrieves their available tools.
If server_name is provided, only discovers from that server.
Otherwise discovers from all enabled servers.
:param server_name: Optional specific server to discover from.
:return: Dictionary with discovered tools.
"""
try:
executor = _get_hub_executor()
if server_name:
tools = await executor.discover_server_tools(server_name)
return {
"server": server_name,
"tools": [
{
"identifier": t.identifier,
"name": t.name,
"description": t.description,
"parameters": [p.model_dump() for p in t.parameters],
}
for t in tools
],
"count": len(tools),
}
else:
results = await executor.discover_all_tools()
all_tools = []
for server, tools in results.items():
for tool in tools:
all_tools.append({
"identifier": tool.identifier,
"name": tool.name,
"server": server,
"description": tool.description,
"parameters": [p.model_dump() for p in tool.parameters],
})
return {
"servers_discovered": len(results),
"tools": all_tools,
"count": len(all_tools),
}
except Exception as e:
if isinstance(e, ToolError):
raise
msg = f"Failed to discover hub tools: {e}"
raise ToolError(msg) from e
@mcp.tool
async def list_hub_tools() -> dict[str, Any]:
"""List all discovered hub tools.
Returns tools that have been previously discovered from hub servers.
Run discover_hub_tools first if no tools are listed.
:return: Dictionary with list of discovered tools.
"""
try:
executor = _get_hub_executor()
tools = executor.list_tools()
return {
"tools": tools,
"count": len(tools),
}
except Exception as e:
if isinstance(e, ToolError):
raise
msg = f"Failed to list hub tools: {e}"
raise ToolError(msg) from e
@mcp.tool
async def execute_hub_tool(
identifier: str,
arguments: dict[str, Any] | None = None,
timeout: int | None = None,
) -> dict[str, Any]:
"""Execute a tool from a hub server.
:param identifier: Tool identifier (format: hub:server:tool or server:tool).
:param arguments: Tool arguments matching the tool's input schema.
:param timeout: Optional execution timeout in seconds.
:return: Tool execution result.
Example identifiers:
- "hub:nmap:nmap_scan"
- "nmap:nmap_scan"
- "hub:nuclei:nuclei_scan"
"""
try:
executor = _get_hub_executor()
result = await executor.execute_tool(
identifier=identifier,
arguments=arguments or {},
timeout=timeout,
)
return result.to_dict()
except Exception as e:
if isinstance(e, ToolError):
raise
msg = f"Hub tool execution failed: {e}"
raise ToolError(msg) from e
@mcp.tool
async def get_hub_tool_schema(identifier: str) -> dict[str, Any]:
"""Get the input schema for a hub tool.
Returns the JSON Schema that describes the tool's expected arguments.
:param identifier: Tool identifier (format: hub:server:tool or server:tool).
:return: JSON Schema for the tool's input.
"""
try:
executor = _get_hub_executor()
schema = executor.get_tool_schema(identifier)
if schema is None:
msg = f"Tool '{identifier}' not found. Run discover_hub_tools first."
raise ToolError(msg)
return {
"identifier": identifier,
"schema": schema,
}
except Exception as e:
if isinstance(e, ToolError):
raise
msg = f"Failed to get tool schema: {e}"
raise ToolError(msg) from e
@mcp.tool
async def add_hub_server(
name: str,
server_type: str,
image: str | None = None,
command: list[str] | None = None,
url: str | None = None,
category: str | None = None,
description: str | None = None,
capabilities: list[str] | None = None,
environment: dict[str, str] | None = None,
) -> dict[str, Any]:
"""Add a new MCP server to the hub.
Register a new external MCP server that can be used for tool discovery
and execution. Servers can be Docker images, local commands, or SSE endpoints.
:param name: Unique name for the server (e.g., "nmap", "nuclei").
:param server_type: Connection type ("docker", "command", or "sse").
:param image: Docker image name (for docker type).
:param command: Command and args (for command type).
:param url: SSE endpoint URL (for sse type).
:param category: Category for grouping (e.g., "reconnaissance").
:param description: Human-readable description.
:param capabilities: Docker capabilities to add (e.g., ["NET_RAW"]).
:param environment: Environment variables to pass.
:return: Information about the added server.
Examples:
- Docker: add_hub_server("nmap", "docker", image="nmap-mcp:latest", capabilities=["NET_RAW"])
- Command: add_hub_server("custom", "command", command=["python", "server.py"])
"""
try:
executor = _get_hub_executor()
# Parse server type
try:
stype = HubServerType(server_type)
except ValueError:
msg = f"Invalid server type: {server_type}. Use 'docker', 'command', or 'sse'."
raise ToolError(msg) from None
# Validate required fields based on type
if stype == HubServerType.DOCKER and not image:
msg = "Docker image required for docker type"
raise ToolError(msg)
if stype == HubServerType.COMMAND and not command:
msg = "Command required for command type"
raise ToolError(msg)
if stype == HubServerType.SSE and not url:
msg = "URL required for sse type"
raise ToolError(msg)
config = HubServerConfig(
name=name,
type=stype,
image=image,
command=command,
url=url,
category=category,
description=description,
capabilities=capabilities or [],
environment=environment or {},
)
server = executor.add_server(config)
return {
"success": True,
"server": {
"name": server.name,
"identifier": server.identifier,
"type": server.config.type.value,
"enabled": server.config.enabled,
},
"message": f"Server '{name}' added. Use discover_hub_tools('{name}') to discover its tools.",
}
except ValueError as e:
msg = f"Failed to add server: {e}"
raise ToolError(msg) from e
except Exception as e:
if isinstance(e, ToolError):
raise
msg = f"Failed to add hub server: {e}"
raise ToolError(msg) from e

View File

@@ -49,24 +49,19 @@ async def list_modules() -> dict[str, Any]:
"image": f"{module.identifier}:{module.version or 'latest'}",
"available": module.available,
"description": module.description,
# New metadata fields from pyproject.toml
"category": module.category,
"language": module.language,
"pipeline_stage": module.pipeline_stage,
"pipeline_order": module.pipeline_order,
"dependencies": module.dependencies,
"continuous_mode": module.continuous_mode,
"typical_duration": module.typical_duration,
"suggested_predecessors": module.suggested_predecessors,
# AI-discoverable metadata
"use_cases": module.use_cases,
"input_requirements": module.input_requirements,
"common_inputs": module.common_inputs,
"output_artifacts": module.output_artifacts,
"output_treatment": module.output_treatment,
}
for module in modules
]
# Sort by pipeline_order if available
available_modules.sort(key=lambda m: (m.get("pipeline_order") or 999, m["identifier"]))
# Sort alphabetically by identifier
available_modules.sort(key=lambda m: m["identifier"])
return {
"modules": available_modules,
@@ -187,9 +182,6 @@ async def start_continuous_module(
"container_id": result["container_id"],
"input_dir": result["input_dir"],
"project_path": str(project_path),
# Incremental stream.jsonl tracking
"stream_lines_read": 0,
"total_crashes": 0,
}
return {
@@ -207,29 +199,24 @@ async def start_continuous_module(
def _get_continuous_status_impl(session_id: str) -> dict[str, Any]:
"""Internal helper to get continuous session status (non-tool version).
Uses incremental reads of ``stream.jsonl`` via ``tail -n +offset`` so that
only new lines appended since the last poll are fetched and parsed. Crash
counts and latest metrics are accumulated across polls.
"""
"""Internal helper to get continuous session status (non-tool version)."""
if session_id not in _background_executions:
raise ToolError(f"Unknown session: {session_id}. Use list_continuous_sessions() to see active sessions.")
execution = _background_executions[session_id]
container_id = execution.get("container_id")
# Carry forward accumulated state
# Initialize metrics
metrics: dict[str, Any] = {
"total_executions": 0,
"total_crashes": execution.get("total_crashes", 0),
"total_crashes": 0,
"exec_per_sec": 0,
"coverage": 0,
"current_target": "",
"new_events": [],
"latest_events": [],
}
# Read stream.jsonl from inside the running container
if container_id:
try:
runner: Runner = get_runner()
@@ -240,45 +227,34 @@ def _get_continuous_status_impl(session_id: str) -> dict[str, Any]:
if container_status != "running":
execution["status"] = "stopped" if container_status == "exited" else container_status
# Incremental read: only fetch lines we haven't seen yet
lines_read: int = execution.get("stream_lines_read", 0)
stream_content = executor.read_module_output_incremental(
container_id,
start_line=lines_read + 1,
output_file="/data/output/stream.jsonl",
)
# Read stream.jsonl from container
stream_content = executor.read_module_output(container_id, "/data/output/stream.jsonl")
if stream_content:
new_lines = stream_content.strip().split("\n")
new_line_count = 0
lines = stream_content.strip().split("\n")
# Get last 20 events
recent_lines = lines[-20:] if len(lines) > 20 else lines
crash_count = 0
for line in new_lines:
if not line.strip():
continue
for line in recent_lines:
try:
event = json.loads(line)
metrics["latest_events"].append(event)
# Extract metrics from events
if event.get("event") == "metrics":
metrics["total_executions"] = event.get("executions", 0)
metrics["current_target"] = event.get("target", "")
metrics["exec_per_sec"] = event.get("exec_per_sec", 0)
metrics["coverage"] = event.get("coverage", 0)
if event.get("event") == "crash_detected":
crash_count += 1
except json.JSONDecodeError:
# Possible torn read on the very last line — skip it
# and do NOT advance the offset so it is re-read next
# poll when the write is complete.
continue
new_line_count += 1
metrics["new_events"].append(event)
# Extract latest metrics snapshot
if event.get("event") == "metrics":
metrics["total_executions"] = event.get("executions", 0)
metrics["current_target"] = event.get("target", "")
metrics["exec_per_sec"] = event.get("exec_per_sec", 0)
metrics["coverage"] = event.get("coverage", 0)
if event.get("event") == "crash_detected":
metrics["total_crashes"] += 1
# Advance offset by successfully parsed lines only
execution["stream_lines_read"] = lines_read + new_line_count
execution["total_crashes"] = metrics["total_crashes"]
metrics["total_crashes"] = crash_count
except Exception as e:
metrics["error"] = str(e)

View File

@@ -1,6 +1,6 @@
"""MCP tool tests for FuzzForge AI.
Tests the MCP tools that are available in FuzzForge AI.
Tests the MCP tools that are available in the OSS version.
"""
import pytest

View File

@@ -20,7 +20,7 @@ from typing import TYPE_CHECKING
import structlog
from fuzzforge_modules_sdk.api.constants import PATH_TO_INPUTS, PATH_TO_OUTPUTS
from fuzzforge_modules_sdk.api.models import FuzzForgeModuleResults, FuzzForgeModuleStatus
from fuzzforge_modules_sdk.api.models import FuzzForgeModuleResults
from fuzzforge_modules_sdk.api.modules.base import FuzzForgeModule
from module.models import Input, Output, CrashInfo, FuzzingStats, TargetResult
@@ -79,19 +79,19 @@ class Module(FuzzForgeModule):
logger.info("cargo-fuzzer starting", resource_count=len(resources))
# Emit initial progress
self.emit_progress(0, status=FuzzForgeModuleStatus.INITIALIZING, message="Setting up fuzzing environment")
self.emit_progress(0, status="initializing", message="Setting up fuzzing environment")
self.emit_event("module_started", resource_count=len(resources))
# Setup the fuzzing environment
if not self._setup_environment(resources):
self.emit_progress(100, status=FuzzForgeModuleStatus.FAILED, message="Failed to setup environment")
self.emit_progress(100, status="failed", message="Failed to setup environment")
return FuzzForgeModuleResults.FAILURE
# Get list of fuzz targets
targets = self._get_fuzz_targets()
if not targets:
logger.error("no fuzz targets found")
self.emit_progress(100, status=FuzzForgeModuleStatus.FAILED, message="No fuzz targets found")
self.emit_progress(100, status="failed", message="No fuzz targets found")
return FuzzForgeModuleResults.FAILURE
# Filter targets if specific ones were requested
@@ -100,7 +100,7 @@ class Module(FuzzForgeModule):
targets = [t for t in targets if t in requested]
if not targets:
logger.error("none of the requested targets found", requested=list(requested))
self.emit_progress(100, status=FuzzForgeModuleStatus.FAILED, message="Requested targets not found")
self.emit_progress(100, status="failed", message="Requested targets not found")
return FuzzForgeModuleResults.FAILURE
logger.info("found fuzz targets", targets=targets)
@@ -137,7 +137,7 @@ class Module(FuzzForgeModule):
progress = int((i / len(targets)) * 100) if not is_continuous else 50
self.emit_progress(
progress,
status=FuzzForgeModuleStatus.RUNNING,
status="running",
message=progress_msg,
current_task=target,
metrics={
@@ -177,7 +177,7 @@ class Module(FuzzForgeModule):
# Emit final progress
self.emit_progress(
100,
status=FuzzForgeModuleStatus.COMPLETED,
status="completed",
message=f"Fuzzing completed. Found {total_crashes} crashes.",
metrics={
"targets_fuzzed": len(self._target_results),

View File

@@ -18,8 +18,8 @@ WORKDIR /app
# Create FuzzForge standard directories
RUN mkdir -p /fuzzforge/input /fuzzforge/output
# Copy wheels directory (built by parent Makefile)
COPY .wheels /wheels
# Create wheels directory (populated with pre-built wheels when available)
RUN mkdir -p /wheels
# Set up uv for the container
ENV UV_SYSTEM_PYTHON=1

View File

@@ -1,9 +1,13 @@
from pathlib import Path
PATH_TO_DATA: Path = Path("/fuzzforge")
PATH_TO_DATA: Path = Path("/data")
PATH_TO_INPUTS: Path = PATH_TO_DATA.joinpath("input")
PATH_TO_INPUT: Path = PATH_TO_INPUTS.joinpath("input.json")
PATH_TO_OUTPUTS: Path = PATH_TO_DATA.joinpath("output")
PATH_TO_ARTIFACTS: Path = PATH_TO_OUTPUTS.joinpath("artifacts")
PATH_TO_RESULTS: Path = PATH_TO_OUTPUTS.joinpath("results.json")
PATH_TO_LOGS: Path = PATH_TO_OUTPUTS.joinpath("logs.jsonl")
# Streaming output paths for real-time progress
PATH_TO_PROGRESS: Path = PATH_TO_OUTPUTS.joinpath("progress.json")
PATH_TO_STREAM: Path = PATH_TO_OUTPUTS.joinpath("stream.jsonl")

View File

@@ -1,9 +1,3 @@
"""Core module models for FuzzForge modules SDK.
This module contains the base classes for module settings, inputs, and resources.
These are compatible with the fuzzforge-core SDK structure.
"""
from enum import StrEnum
from pathlib import Path # noqa: TC003 (required by pydantic at runtime)
from typing import TypeVar
@@ -12,27 +6,27 @@ from pydantic import BaseModel, ConfigDict
class Base(BaseModel):
"""Base model for all FuzzForge module types."""
"""TODO."""
model_config = ConfigDict(extra="forbid")
class FuzzForgeModulesSettingsBase(Base):
"""Base class for module settings."""
"""TODO."""
FuzzForgeModulesSettingsType = TypeVar("FuzzForgeModulesSettingsType", bound=FuzzForgeModulesSettingsBase)
class FuzzForgeModuleResources(StrEnum):
"""Enumeration of resource types."""
"""Enumeration of artifact types."""
#: The type of the resource is unknown or irrelevant.
UNKNOWN = "unknown"
class FuzzForgeModuleResource(Base):
"""A resource provided to a module as input."""
"""TODO."""
#: The description of the resource.
description: str
@@ -51,3 +45,41 @@ class FuzzForgeModuleInputBase[FuzzForgeModulesSettingsType: FuzzForgeModulesSet
resources: list[FuzzForgeModuleResource]
#: The settings of the module.
settings: FuzzForgeModulesSettingsType
class FuzzForgeModuleArtifacts(StrEnum):
"""Enumeration of artifact types."""
#: The artifact is an asset.
ASSET = "asset"
class FuzzForgeModuleArtifact(Base):
"""An artifact generated by the module during its run."""
#: The description of the artifact.
description: str
#: The type of the artifact.
kind: FuzzForgeModuleArtifacts
#: The name of the artifact.
name: str
#: The path to the artifact on disk.
path: Path
class FuzzForgeModuleResults(StrEnum):
"""TODO."""
SUCCESS = "success"
FAILURE = "failure"
class FuzzForgeModuleOutputBase(Base):
"""The (standardized) output of a FuzzForge module."""
#: The collection of artifacts generated by the module during its run.
artifacts: list[FuzzForgeModuleArtifacts]
#: The path to the logs.
logs: Path
#: The result of the module's run.
result: FuzzForgeModuleResults

View File

@@ -1,90 +0,0 @@
"""FuzzForge modules SDK models.
This module provides backward-compatible exports for all model types.
For Core SDK compatibility, use imports from `fuzzforge_modules_sdk.api.models.mod`.
"""
from enum import StrEnum
from pathlib import Path # noqa: TC003 (required by pydantic at runtime)
from pydantic import ConfigDict
# Re-export from mod.py for Core SDK compatibility
from fuzzforge_modules_sdk.api.models.mod import (
Base,
FuzzForgeModuleInputBase,
FuzzForgeModuleResource,
FuzzForgeModuleResources,
FuzzForgeModulesSettingsBase,
FuzzForgeModulesSettingsType,
)
class FuzzForgeModuleArtifacts(StrEnum):
"""Enumeration of artifact types."""
#: The artifact is an asset.
ASSET = "asset"
class FuzzForgeModuleArtifact(Base):
"""An artifact generated by the module during its run."""
#: The description of the artifact.
description: str
#: The type of the artifact.
kind: FuzzForgeModuleArtifacts
#: The name of the artifact.
name: str
#: The path to the artifact on disk.
path: Path
class FuzzForgeModuleResults(StrEnum):
"""Module execution result enumeration."""
SUCCESS = "success"
FAILURE = "failure"
class FuzzForgeModuleStatus(StrEnum):
"""Possible statuses emitted by a running module."""
#: Module is setting up its environment.
INITIALIZING = "initializing"
#: Module is actively running.
RUNNING = "running"
#: Module finished successfully.
COMPLETED = "completed"
#: Module encountered an error.
FAILED = "failed"
#: Module was stopped by the orchestrator (SIGTERM).
STOPPED = "stopped"
class FuzzForgeModuleOutputBase(Base):
"""The (standardized) output of a FuzzForge module."""
#: The collection of artifacts generated by the module during its run.
artifacts: list[FuzzForgeModuleArtifact]
#: The path to the logs.
logs: Path
#: The result of the module's run.
result: FuzzForgeModuleResults
__all__ = [
# Core SDK compatible exports
"Base",
"FuzzForgeModuleInputBase",
"FuzzForgeModuleResource",
"FuzzForgeModuleResources",
"FuzzForgeModulesSettingsBase",
"FuzzForgeModulesSettingsType",
# OSS-specific exports (also used in OSS modules)
"FuzzForgeModuleArtifact",
"FuzzForgeModuleArtifacts",
"FuzzForgeModuleOutputBase",
"FuzzForgeModuleResults",
"FuzzForgeModuleStatus",
]

View File

@@ -1,7 +1,5 @@
from abc import ABC, abstractmethod
import json
import signal
import threading
import time
from datetime import datetime, timezone
from shutil import rmtree
@@ -13,7 +11,9 @@ from fuzzforge_modules_sdk.api.constants import (
PATH_TO_ARTIFACTS,
PATH_TO_INPUT,
PATH_TO_LOGS,
PATH_TO_PROGRESS,
PATH_TO_RESULTS,
PATH_TO_STREAM,
)
from fuzzforge_modules_sdk.api.exceptions import FuzzForgeModuleError
from fuzzforge_modules_sdk.api.models import (
@@ -23,7 +23,6 @@ from fuzzforge_modules_sdk.api.models import (
FuzzForgeModuleOutputBase,
FuzzForgeModuleResource,
FuzzForgeModuleResults,
FuzzForgeModuleStatus,
FuzzForgeModulesSettingsType,
)
@@ -53,11 +52,6 @@ class FuzzForgeModule(ABC):
#: Custom output data set by the module.
__output_data: dict[str, Any]
#: Event set when stop is requested (SIGTERM received).
#: Using :class:`threading.Event` so multi-threaded modules can
#: efficiently wait on it via :pymethod:`threading.Event.wait`.
__stop_requested: threading.Event
def __init__(self, name: str, version: str) -> None:
"""Initialize an instance of the class.
@@ -71,10 +65,10 @@ class FuzzForgeModule(ABC):
self.__version = version
self.__start_time = time.time()
self.__output_data = {}
self.__stop_requested = threading.Event()
# Register SIGTERM handler for graceful shutdown
signal.signal(signal.SIGTERM, self._handle_sigterm)
# Initialize streaming output files
PATH_TO_PROGRESS.parent.mkdir(exist_ok=True, parents=True)
PATH_TO_STREAM.parent.mkdir(exist_ok=True, parents=True)
@final
def get_logger(self) -> BoundLogger:
@@ -92,58 +86,6 @@ class FuzzForgeModule(ABC):
return self.__version
@final
def is_stop_requested(self) -> bool:
"""Check if stop was requested (SIGTERM received).
Long-running modules should check this periodically and exit gracefully
when True. Results will be written automatically on SIGTERM.
The underlying :class:`threading.Event` can be obtained via
:meth:`stop_event` for modules that need to *wait* on it.
:returns: True if SIGTERM was received.
"""
return self.__stop_requested.is_set()
@final
def stop_event(self) -> threading.Event:
"""Return the stop :class:`threading.Event`.
Multi-threaded modules can use ``self.stop_event().wait(timeout)``
instead of polling :meth:`is_stop_requested` in a busy-loop.
:returns: The threading event that is set on SIGTERM.
"""
return self.__stop_requested
@final
def _handle_sigterm(self, signum: int, frame: Any) -> None:
"""Handle SIGTERM signal for graceful shutdown.
Sets the stop event and emits a final progress update, then returns.
The normal :meth:`main` lifecycle (run → cleanup → write results) will
complete as usual once :meth:`_run` observes :meth:`is_stop_requested`
and returns, giving the module a chance to do any last-minute work
before the process exits.
:param signum: Signal number.
:param frame: Current stack frame.
"""
self.__stop_requested.set()
self.get_logger().info("received SIGTERM, stopping after current operation")
# Emit final progress update
self.emit_progress(
progress=100,
status=FuzzForgeModuleStatus.STOPPED,
message="Module stopped by orchestrator (SIGTERM)",
)
@final
def set_output(self, **kwargs: Any) -> None:
"""Set custom output data to be included in results.json.
@@ -165,53 +107,63 @@ class FuzzForgeModule(ABC):
def emit_progress(
self,
progress: int,
status: FuzzForgeModuleStatus = FuzzForgeModuleStatus.RUNNING,
status: str = "running",
message: str = "",
metrics: dict[str, Any] | None = None,
current_task: str = "",
) -> None:
"""Emit a structured progress event to stdout (JSONL).
"""Emit a progress update to the progress file.
Progress is written as a single JSON line to stdout so that the
orchestrator can capture it via ``kubectl logs`` without requiring
any file-system access inside the container.
This method writes to /data/output/progress.json which can be polled
by the orchestrator or UI to show real-time progress.
:param progress: Progress percentage (0-100).
:param status: Current module status.
:param status: Current status ("initializing", "running", "completed", "failed").
:param message: Human-readable status message.
:param metrics: Dictionary of metrics (e.g., {"executions": 1000, "coverage": 50}).
:param current_task: Name of the current task being performed.
"""
self.emit_event(
"progress",
status=status.value,
progress=max(0, min(100, progress)),
message=message,
current_task=current_task,
metrics=metrics or {},
)
elapsed = time.time() - self.__start_time
progress_data = {
"module": self.__name,
"version": self.__version,
"status": status,
"progress": max(0, min(100, progress)),
"message": message,
"current_task": current_task,
"elapsed_seconds": round(elapsed, 2),
"timestamp": datetime.now(timezone.utc).isoformat(),
"metrics": metrics or {},
}
PATH_TO_PROGRESS.write_text(json.dumps(progress_data, indent=2))
@final
def emit_event(self, event: str, **data: Any) -> None:
"""Emit a structured event to stdout as a single JSONL line.
"""Emit a streaming event to the stream file.
All module events (including progress updates) are written to stdout
so the orchestrator can stream them in real time via ``kubectl logs``.
This method appends to /data/output/stream.jsonl which can be tailed
by the orchestrator or UI for real-time event streaming.
:param event: Event type (e.g., ``"crash_found"``, ``"target_started"``,
``"progress"``, ``"metrics"``).
:param event: Event type (e.g., "crash_found", "target_started", "metrics").
:param data: Additional event data as keyword arguments.
"""
elapsed = time.time() - self.__start_time
event_data = {
"timestamp": datetime.now(timezone.utc).isoformat(),
"elapsed_seconds": round(self.get_elapsed_seconds(), 2),
"elapsed_seconds": round(elapsed, 2),
"module": self.__name,
"event": event,
**data,
}
print(json.dumps(event_data), flush=True)
# Append to stream file (create if doesn't exist)
with PATH_TO_STREAM.open("a") as f:
f.write(json.dumps(event_data) + "\n")
@final
def get_elapsed_seconds(self) -> float:
@@ -256,7 +208,7 @@ class FuzzForgeModule(ABC):
@final
def main(self) -> None:
"""Execute the module lifecycle: prepare → run → cleanup → write results."""
"""TODO."""
result = FuzzForgeModuleResults.SUCCESS
try:
@@ -286,8 +238,9 @@ class FuzzForgeModule(ABC):
result=result,
**self.__output_data,
)
buffer = output.model_dump_json().encode("utf-8")
PATH_TO_RESULTS.parent.mkdir(exist_ok=True, parents=True)
PATH_TO_RESULTS.write_bytes(output.model_dump_json().encode("utf-8"))
PATH_TO_RESULTS.write_bytes(buffer)
@classmethod
@abstractmethod

View File

@@ -741,27 +741,6 @@ class ModuleExecutor:
engine = self._get_engine()
return engine.read_file_from_container(container_id, output_file)
def read_module_output_incremental(
self,
container_id: str,
start_line: int = 1,
output_file: str = f"{SANDBOX_OUTPUT_DIRECTORY}/stream.jsonl",
) -> str:
"""Read new lines from an output file inside a running module container.
Uses ``tail -n +{start_line}`` so only lines appended since the last
read are returned. Callers should track the number of lines already
consumed and pass ``start_line = previous_count + 1`` on the next call.
:param container_id: The container identifier.
:param start_line: 1-based line number to start reading from.
:param output_file: Path to output file inside container.
:returns: New file contents from *start_line* onwards (may be empty).
"""
engine = self._get_engine()
return engine.tail_file_from_container(container_id, output_file, start_line=start_line)
def get_module_status(self, container_id: str) -> str:
"""Get the status of a running module container.

View File

@@ -71,29 +71,6 @@ class RegistrySettings(BaseModel):
password: str | None = None
class HubSettings(BaseModel):
"""MCP Hub configuration for external tool servers.
Controls the hub that bridges FuzzForge with external MCP servers
(e.g., mcp-security-hub). When enabled, AI agents can discover
and execute tools from registered MCP servers.
Configure via environment variables:
``FUZZFORGE_HUB__ENABLED=true``
``FUZZFORGE_HUB__CONFIG_PATH=/path/to/hub-config.json``
``FUZZFORGE_HUB__TIMEOUT=300``
"""
#: Whether the MCP hub is enabled.
enabled: bool = Field(default=True)
#: Path to the hub configuration JSON file.
config_path: Path = Field(default=Path.home() / ".fuzzforge" / "hub-config.json")
#: Default timeout in seconds for hub tool execution.
timeout: int = Field(default=300)
class Settings(BaseSettings):
"""FuzzForge Runner settings.
@@ -125,9 +102,6 @@ class Settings(BaseSettings):
#: Container registry settings.
registry: RegistrySettings = Field(default_factory=RegistrySettings)
#: MCP Hub settings.
hub: HubSettings = Field(default_factory=HubSettings)
#: Path to modules directory (for development/local builds).
modules_path: Path = Field(default=Path.home() / ".fuzzforge" / "modules")

View File

@@ -1,105 +0,0 @@
{
"servers": [
{
"name": "nmap-mcp",
"description": "Network reconnaissance using Nmap - port scanning, service detection, OS fingerprinting",
"type": "docker",
"image": "nmap-mcp:latest",
"category": "reconnaissance",
"capabilities": ["NET_RAW"],
"enabled": true
},
{
"name": "binwalk-mcp",
"description": "Firmware extraction and analysis using Binwalk - file signatures, entropy analysis, embedded file extraction",
"type": "docker",
"image": "binwalk-mcp:latest",
"category": "binary-analysis",
"capabilities": [],
"volumes": ["~/.fuzzforge/hub/workspace:/data"],
"enabled": true
},
{
"name": "yara-mcp",
"description": "Pattern matching and malware classification using YARA rules",
"type": "docker",
"image": "yara-mcp:latest",
"category": "binary-analysis",
"capabilities": [],
"volumes": ["~/.fuzzforge/hub/workspace:/data"],
"enabled": true
},
{
"name": "capa-mcp",
"description": "Static capability detection using capa - identifies malware capabilities in binaries",
"type": "docker",
"image": "capa-mcp:latest",
"category": "binary-analysis",
"capabilities": [],
"volumes": ["~/.fuzzforge/hub/workspace:/data"],
"enabled": true
},
{
"name": "radare2-mcp",
"description": "Binary analysis and reverse engineering using radare2",
"type": "docker",
"image": "radare2-mcp:latest",
"category": "binary-analysis",
"capabilities": [],
"volumes": ["~/.fuzzforge/hub/workspace:/data"],
"enabled": true
},
{
"name": "ghidra-mcp",
"description": "Advanced binary decompilation and reverse engineering using Ghidra",
"type": "docker",
"image": "ghcr.io/clearbluejar/pyghidra-mcp:latest",
"category": "binary-analysis",
"capabilities": [],
"volumes": ["~/.fuzzforge/hub/workspace:/data"],
"enabled": true
},
{
"name": "searchsploit-mcp",
"description": "CVE and exploit search using SearchSploit / Exploit-DB",
"type": "docker",
"image": "searchsploit-mcp:latest",
"category": "exploitation",
"capabilities": [],
"volumes": ["~/.fuzzforge/hub/workspace:/data"],
"enabled": true
},
{
"name": "nuclei-mcp",
"description": "Vulnerability scanning using Nuclei templates",
"type": "docker",
"image": "nuclei-mcp:latest",
"category": "web-security",
"capabilities": ["NET_RAW"],
"volumes": ["~/.fuzzforge/hub/workspace:/data"],
"enabled": true
},
{
"name": "trivy-mcp",
"description": "Container and filesystem vulnerability scanning using Trivy",
"type": "docker",
"image": "trivy-mcp:latest",
"category": "cloud-security",
"capabilities": [],
"volumes": ["~/.fuzzforge/hub/workspace:/data"],
"enabled": true
},
{
"name": "gitleaks-mcp",
"description": "Secret and credential detection in code and firmware using Gitleaks",
"type": "docker",
"image": "gitleaks-mcp:latest",
"category": "secrets",
"capabilities": [],
"volumes": ["~/.fuzzforge/hub/workspace:/data"],
"enabled": true
}
],
"default_timeout": 300,
"cache_tools": true
}

View File

@@ -1,5 +1,5 @@
[project]
name = "fuzzforge-oss"
name = "fuzzforge-ai"
version = "1.0.0"
description = "FuzzForge AI - AI-driven security research platform for local execution"
readme = "README.md"