Compare commits

...

2 Commits

Author SHA1 Message Date
AFredefon
1d495cedce refactor: remove module system, migrate to MCP hub tools architecture 2026-03-08 17:53:29 +01:00
AFredefon
075b678e9d Merge pull request #42 from FuzzingLabs/features/hub-integration
Features/hub integration
2026-03-04 14:13:20 +01:00
141 changed files with 1182 additions and 8992 deletions

View File

@@ -1,4 +1,4 @@
.PHONY: help install sync format lint typecheck test build-modules build-hub-images clean
.PHONY: help install sync format lint typecheck test build-hub-images clean
SHELL := /bin/bash
@@ -12,7 +12,6 @@ help:
@echo " make lint - Lint code with ruff"
@echo " make typecheck - Type check with mypy"
@echo " make test - Run all tests"
@echo " make build-modules - Build all module container images"
@echo " make build-hub-images - Build all mcp-security-hub images"
@echo " make clean - Clean build artifacts"
@echo ""
@@ -65,35 +64,6 @@ test:
fi \
done
# Build all module container images
# Uses Docker by default, or Podman if FUZZFORGE_ENGINE=podman
build-modules:
@echo "Building FuzzForge module images..."
@if [ "$$FUZZFORGE_ENGINE" = "podman" ]; then \
if [ -n "$$SNAP" ]; then \
echo "Using Podman with isolated storage (Snap detected)"; \
CONTAINER_CMD="podman --root ~/.fuzzforge/containers/storage --runroot ~/.fuzzforge/containers/run"; \
else \
echo "Using Podman"; \
CONTAINER_CMD="podman"; \
fi; \
else \
echo "Using Docker"; \
CONTAINER_CMD="docker"; \
fi; \
for module in fuzzforge-modules/*/; do \
if [ -f "$$module/Dockerfile" ] && \
[ "$$module" != "fuzzforge-modules/fuzzforge-modules-sdk/" ] && \
[ "$$module" != "fuzzforge-modules/fuzzforge-module-template/" ]; then \
name=$$(basename $$module); \
version=$$(grep 'version' "$$module/pyproject.toml" 2>/dev/null | head -1 | sed 's/.*"\(.*\)".*/\1/' || echo "0.1.0"); \
echo "Building $$name:$$version..."; \
$$CONTAINER_CMD build -t "fuzzforge-$$name:$$version" "$$module" || exit 1; \
fi \
done
@echo ""
@echo "✓ All modules built successfully!"
# Build all mcp-security-hub images for the firmware analysis pipeline
build-hub-images:
@bash scripts/build-hub-images.sh

View File

@@ -6,7 +6,7 @@ authors = []
readme = "README.md"
requires-python = ">=3.14"
dependencies = [
"fuzzforge-runner==0.0.1",
"fuzzforge-mcp==0.0.1",
"rich>=14.0.0",
"typer==0.20.1",
]
@@ -25,4 +25,4 @@ tests = [
fuzzforge = "fuzzforge_cli.__main__:main"
[tool.uv.sources]
fuzzforge-runner = { workspace = true }
fuzzforge-mcp = { workspace = true }

View File

@@ -3,12 +3,12 @@
from pathlib import Path
from typing import Annotated
from fuzzforge_runner import Runner, Settings
from typer import Context as TyperContext
from typer import Option, Typer
from fuzzforge_cli.commands import mcp, modules, projects
from fuzzforge_cli.commands import mcp, projects
from fuzzforge_cli.context import Context
from fuzzforge_mcp.storage import LocalStorage
application: Typer = Typer(
name="fuzzforge",
@@ -27,15 +27,6 @@ def main(
help="Path to the FuzzForge project directory.",
),
] = Path.cwd(),
modules_path: Annotated[
Path,
Option(
"--modules",
"-m",
envvar="FUZZFORGE_MODULES_PATH",
help="Path to the modules directory.",
),
] = Path.home() / ".fuzzforge" / "modules",
storage_path: Annotated[
Path,
Option(
@@ -44,53 +35,20 @@ def main(
help="Path to the storage directory.",
),
] = Path.home() / ".fuzzforge" / "storage",
engine_type: Annotated[
str,
Option(
"--engine",
envvar="FUZZFORGE_ENGINE__TYPE",
help="Container engine type (docker or podman).",
),
] = "docker",
engine_socket: Annotated[
str,
Option(
"--socket",
envvar="FUZZFORGE_ENGINE__SOCKET",
help="Container engine socket path.",
),
] = "",
context: TyperContext = None, # type: ignore[assignment]
) -> None:
"""FuzzForge AI - Security research orchestration platform.
Execute security research modules in isolated containers.
Discover and execute MCP hub tools for security research.
"""
from fuzzforge_runner.settings import EngineSettings, ProjectSettings, StorageSettings
settings = Settings(
engine=EngineSettings(
type=engine_type, # type: ignore[arg-type]
socket=engine_socket,
),
storage=StorageSettings(
path=storage_path,
),
project=ProjectSettings(
default_path=project_path,
modules_path=modules_path,
),
)
runner = Runner(settings)
storage = LocalStorage(storage_path=storage_path)
context.obj = Context(
runner=runner,
storage=storage,
project_path=project_path,
)
application.add_typer(mcp.application)
application.add_typer(modules.application)
application.add_typer(projects.application)

View File

@@ -137,7 +137,7 @@ def _find_fuzzforge_root() -> Path:
# Walk up to find fuzzforge-oss root
for parent in current.parents:
if (parent / "fuzzforge-mcp").is_dir() and (parent / "fuzzforge-runner").is_dir():
if (parent / "fuzzforge-mcp").is_dir():
return parent
# Fall back to cwd
@@ -146,14 +146,12 @@ def _find_fuzzforge_root() -> Path:
def _generate_mcp_config(
fuzzforge_root: Path,
modules_path: Path,
engine_type: str,
engine_socket: str,
) -> dict:
"""Generate MCP server configuration.
:param fuzzforge_root: Path to fuzzforge-oss installation.
:param modules_path: Path to the modules directory.
:param engine_type: Container engine type (podman or docker).
:param engine_socket: Container engine socket path.
:returns: MCP configuration dictionary.
@@ -181,7 +179,6 @@ def _generate_mcp_config(
"args": args,
"cwd": str(fuzzforge_root),
"env": {
"FUZZFORGE_MODULES_PATH": str(modules_path),
"FUZZFORGE_ENGINE__TYPE": engine_type,
"FUZZFORGE_ENGINE__GRAPHROOT": str(graphroot),
"FUZZFORGE_ENGINE__RUNROOT": str(runroot),
@@ -266,14 +263,6 @@ def generate(
help="AI agent to generate config for (copilot, claude-desktop, or claude-code).",
),
],
modules_path: Annotated[
Path | None,
Option(
"--modules",
"-m",
help="Path to the modules directory.",
),
] = None,
engine: Annotated[
str,
Option(
@@ -287,16 +276,12 @@ def generate(
:param context: Typer context.
:param agent: Target AI agent.
:param modules_path: Override modules path.
:param engine: Container engine type.
"""
console = Console()
fuzzforge_root = _find_fuzzforge_root()
# Use defaults if not specified
resolved_modules = modules_path or (fuzzforge_root / "fuzzforge-modules")
# Detect socket
if engine == "podman":
socket = _detect_podman_socket()
@@ -306,7 +291,6 @@ def generate(
# Generate config
server_config = _generate_mcp_config(
fuzzforge_root=fuzzforge_root,
modules_path=resolved_modules,
engine_type=engine,
engine_socket=socket,
)
@@ -350,14 +334,6 @@ def install(
help="AI agent to install config for (copilot, claude-desktop, or claude-code).",
),
],
modules_path: Annotated[
Path | None,
Option(
"--modules",
"-m",
help="Path to the modules directory.",
),
] = None,
engine: Annotated[
str,
Option(
@@ -382,7 +358,6 @@ def install(
:param context: Typer context.
:param agent: Target AI agent.
:param modules_path: Override modules path.
:param engine: Container engine type.
:param force: Overwrite existing configuration.
@@ -401,9 +376,6 @@ def install(
config_path = _get_claude_desktop_mcp_path()
servers_key = "mcpServers"
# Use defaults if not specified
resolved_modules = modules_path or (fuzzforge_root / "fuzzforge-modules")
# Detect socket
if engine == "podman":
socket = _detect_podman_socket()
@@ -413,7 +385,6 @@ def install(
# Generate server config
server_config = _generate_mcp_config(
fuzzforge_root=fuzzforge_root,
modules_path=resolved_modules,
engine_type=engine,
engine_socket=socket,
)
@@ -453,7 +424,6 @@ def install(
console.print(f"[bold]Configuration file:[/bold] {config_path}")
console.print()
console.print("[bold]Settings:[/bold]")
console.print(f" Modules Path: {resolved_modules}")
console.print(f" Engine: {engine}")
console.print(f" Socket: {socket}")
console.print(f" Hub Config: {fuzzforge_root / 'hub-config.json'}")

View File

@@ -1,166 +0,0 @@
"""Module management commands for FuzzForge CLI."""
import asyncio
from pathlib import Path
from typing import Annotated, Any
from rich.console import Console
from rich.table import Table
from typer import Argument, Context, Option, Typer
from fuzzforge_cli.context import get_project_path, get_runner
application: Typer = Typer(
name="modules",
help="Module management commands.",
)
@application.command(
    help="List available modules.",
    name="list",
)
def list_modules(
    context: Context,
) -> None:
    """Render a table of every module the runner knows about.

    :param context: Typer context.
    """
    console = Console()
    runner = get_runner(context)
    discovered = runner.list_modules()
    if not discovered:
        console.print("[yellow]No modules found.[/yellow]")
        console.print(f" Modules directory: {runner.settings.modules_path}")
        return
    table = Table(title="Available Modules")
    table.add_column("Identifier", style="cyan")
    table.add_column("Available")
    table.add_column("Description")
    for entry in discovered:
        # NOTE(review): both availability branches render an empty string —
        # likely status glyphs stripped by encoding; confirm against upstream.
        table.add_row(
            entry.identifier,
            "" if entry.available else "",
            entry.description or "-",
        )
    console.print(table)
@application.command(
    help="Execute a module.",
    name="run",
)
def run_module(
    context: Context,
    module_identifier: Annotated[
        str,
        Argument(
            help="Identifier of the module to execute.",
        ),
    ],
    assets_path: Annotated[
        Path | None,
        Option(
            "--assets",
            "-a",
            help="Path to input assets.",
        ),
    ] = None,
    config: Annotated[
        str | None,
        Option(
            "--config",
            "-c",
            help="Module configuration as JSON string.",
        ),
    ] = None,
) -> None:
    """Execute a module in the runner and report the outcome.

    :param context: Typer context.
    :param module_identifier: Module to execute.
    :param assets_path: Optional path to input assets.
    :param config: Optional JSON configuration.
    """
    import json

    # Single console for the whole command; the original created a second,
    # shadowing instance inside the except branch.
    console = Console()
    runner = get_runner(context)
    project_path = get_project_path(context)
    configuration: dict[str, Any] | None = None
    if config:
        try:
            configuration = json.loads(config)
        except json.JSONDecodeError as e:
            console.print(f"[red]✗[/red] Invalid JSON configuration: {e}")
            return
    console.print(f"[blue]→[/blue] Executing module: {module_identifier}")

    async def execute() -> None:
        # Delegates the actual container run to the runner; this wrapper only
        # formats the result for the terminal.
        result = await runner.execute_module(
            module_identifier=module_identifier,
            project_path=project_path,
            configuration=configuration,
            assets_path=assets_path,
        )
        if result.success:
            # Plain strings (no placeholders) — f-prefix removed (F541).
            console.print("[green]✓[/green] Module execution completed")
            console.print(f" Execution ID: {result.execution_id}")
            console.print(f" Results: {result.results_path}")
        else:
            console.print("[red]✗[/red] Module execution failed")
            console.print(f" Error: {result.error}")

    asyncio.run(execute())
@application.command(
    help="Show module information.",
    name="info",
)
def module_info(
    context: Context,
    module_identifier: Annotated[
        str,
        Argument(
            help="Identifier of the module.",
        ),
    ],
) -> None:
    """Print a property table for one module, or an error if unknown.

    :param context: Typer context.
    :param module_identifier: Module to get info for.
    """
    console = Console()
    module = get_runner(context).get_module_info(module_identifier)
    if module is None:
        console.print(f"[red]✗[/red] Module not found: {module_identifier}")
        return
    details = Table(title=f"Module: {module.identifier}")
    details.add_column("Property", style="cyan")
    details.add_column("Value")
    rows = (
        ("Identifier", module.identifier),
        ("Available", "Yes" if module.available else "No"),
        ("Description", module.description or "-"),
        ("Version", module.version or "-"),
    )
    for prop, value in rows:
        details.add_row(prop, value)
    console.print(details)

View File

@@ -7,7 +7,7 @@ from rich.console import Console
from rich.table import Table
from typer import Argument, Context, Option, Typer
from fuzzforge_cli.context import get_project_path, get_runner
from fuzzforge_cli.context import get_project_path, get_storage
application: Typer = Typer(
name="project",
@@ -36,10 +36,10 @@ def init_project(
:param path: Path to initialize (defaults to current directory).
"""
runner = get_runner(context)
storage = get_storage(context)
project_path = path or get_project_path(context)
storage_path = runner.init_project(project_path)
storage_path = storage.init_project(project_path)
console = Console()
console.print(f"[green]✓[/green] Project initialized at {project_path}")
@@ -65,10 +65,10 @@ def set_assets(
:param assets_path: Path to assets.
"""
runner = get_runner(context)
storage = get_storage(context)
project_path = get_project_path(context)
stored_path = runner.set_project_assets(project_path, assets_path)
stored_path = storage.set_project_assets(project_path, assets_path)
console = Console()
console.print(f"[green]✓[/green] Assets stored from {assets_path}")
@@ -87,11 +87,11 @@ def show_info(
:param context: Typer context.
"""
runner = get_runner(context)
storage = get_storage(context)
project_path = get_project_path(context)
executions = runner.list_executions(project_path)
assets_path = runner.storage.get_project_assets_path(project_path)
executions = storage.list_executions(project_path)
assets_path = storage.get_project_assets_path(project_path)
console = Console()
table = Table(title=f"Project: {project_path.name}")
@@ -118,10 +118,10 @@ def list_executions(
:param context: Typer context.
"""
runner = get_runner(context)
storage = get_storage(context)
project_path = get_project_path(context)
executions = runner.list_executions(project_path)
executions = storage.list_executions(project_path)
console = Console()
@@ -134,7 +134,7 @@ def list_executions(
table.add_column("Has Results")
for exec_id in executions:
has_results = runner.get_execution_results(project_path, exec_id) is not None
has_results = storage.get_execution_results(project_path, exec_id) is not None
table.add_row(exec_id, "" if has_results else "-")
console.print(table)
@@ -168,10 +168,10 @@ def get_results(
:param extract_to: Optional directory to extract to.
"""
runner = get_runner(context)
storage = get_storage(context)
project_path = get_project_path(context)
results_path = runner.get_execution_results(project_path, execution_id)
results_path = storage.get_execution_results(project_path, execution_id)
console = Console()
@@ -182,5 +182,5 @@ def get_results(
console.print(f"[green]✓[/green] Results: {results_path}")
if extract_to:
extracted = runner.extract_results(results_path, extract_to)
extracted = storage.extract_results(results_path, extract_to)
console.print(f" Extracted to: {extracted}")

View File

@@ -5,35 +5,35 @@ from __future__ import annotations
from pathlib import Path
from typing import TYPE_CHECKING, cast
from fuzzforge_runner import Runner, Settings
from fuzzforge_mcp.storage import LocalStorage
if TYPE_CHECKING:
from typer import Context as TyperContext
class Context:
"""CLI context holding the runner instance and settings."""
"""CLI context holding the storage instance and settings."""
_runner: Runner
_storage: LocalStorage
_project_path: Path
def __init__(self, runner: Runner, project_path: Path) -> None:
def __init__(self, storage: LocalStorage, project_path: Path) -> None:
"""Initialize an instance of the class.
:param runner: FuzzForge runner instance.
:param storage: FuzzForge local storage instance.
:param project_path: Path to the current project.
"""
self._runner = runner
self._storage = storage
self._project_path = project_path
def get_runner(self) -> Runner:
"""Get the runner instance.
def get_storage(self) -> LocalStorage:
"""Get the storage instance.
:return: Runner instance.
:return: LocalStorage instance.
"""
return self._runner
return self._storage
def get_project_path(self) -> Path:
"""Get the current project path.
@@ -44,14 +44,14 @@ class Context:
return self._project_path
def get_runner(context: TyperContext) -> Runner:
"""Get runner from Typer context.
def get_storage(context: TyperContext) -> LocalStorage:
"""Get storage from Typer context.
:param context: Typer context.
:return: Runner instance.
:return: LocalStorage instance.
"""
return cast("Context", context.obj).get_runner()
return cast("Context", context.obj).get_storage()
def get_project_path(context: TyperContext) -> Path:

View File

@@ -15,7 +15,7 @@ Supported transport types:
"""
from fuzzforge_common.hub.client import HubClient, HubClientError
from fuzzforge_common.hub.client import HubClient, HubClientError, PersistentSession
from fuzzforge_common.hub.executor import HubExecutionResult, HubExecutor
from fuzzforge_common.hub.models import (
HubConfig,
@@ -39,4 +39,5 @@ __all__ = [
"HubServerType",
"HubTool",
"HubToolParameter",
"PersistentSession",
]

View File

@@ -6,6 +6,7 @@ via stdio (docker/command) or SSE transport. It handles:
- Connecting to SSE endpoints
- Discovering tools via list_tools()
- Executing tools via call_tool()
- Persistent container sessions for stateful interactions
"""
@@ -16,6 +17,8 @@ import json
import os
import subprocess
from contextlib import asynccontextmanager
from dataclasses import dataclass, field
from datetime import datetime, timezone
from typing import TYPE_CHECKING, Any, cast
from fuzzforge_common.hub.models import (
@@ -47,6 +50,48 @@ class HubClientError(Exception):
"""Error in hub client operations."""
@dataclass
class PersistentSession:
    """A persistent container session with an active MCP connection.

    Keeps a Docker container running between tool calls to allow
    stateful interactions (e.g., radare2 analysis, long-running fuzzing).
    """

    #: Server name this session belongs to.
    server_name: str
    #: Docker container name.
    container_name: str
    #: Underlying process (docker run).
    process: Process
    #: Stream reader (process stdout).
    reader: asyncio.StreamReader
    #: Stream writer (process stdin).
    writer: asyncio.StreamWriter
    #: Whether the MCP session has been initialized.
    initialized: bool = False
    #: Lock to serialise concurrent requests on the same session.
    lock: asyncio.Lock = field(default_factory=asyncio.Lock)
    #: When the session was started (timezone-aware UTC).
    started_at: datetime = field(default_factory=lambda: datetime.now(tz=timezone.utc))
    #: Monotonic counter for JSON-RPC request IDs.
    request_id: int = 0

    @property
    def alive(self) -> bool:
        """Check if the underlying process is still running.

        :returns: True while ``process.returncode`` is None (not yet exited).
        """
        return self.process.returncode is None
class HubClient:
"""Client for communicating with MCP hub servers.
@@ -65,6 +110,8 @@ class HubClient:
"""
self._timeout = timeout
self._persistent_sessions: dict[str, PersistentSession] = {}
self._request_id: int = 0
async def discover_tools(self, server: HubServer) -> list[HubTool]:
"""Discover tools from a hub server.
@@ -84,8 +131,9 @@ class HubClient:
try:
async with self._connect(config) as (reader, writer):
# Initialize MCP session
await self._initialize_session(reader, writer, config.name)
# Initialise MCP session (skip for persistent — already done)
if not self._persistent_sessions.get(config.name):
await self._initialize_session(reader, writer, config.name)
# List tools
tools_data = await self._call_method(
@@ -141,7 +189,7 @@ class HubClient:
"""
logger = get_logger()
config = server.config
exec_timeout = timeout or self._timeout
exec_timeout = timeout or config.timeout or self._timeout
logger.info(
"Executing hub tool",
@@ -152,8 +200,9 @@ class HubClient:
try:
async with self._connect(config) as (reader, writer):
# Initialize MCP session
await self._initialize_session(reader, writer, config.name)
# Initialise MCP session (skip for persistent — already done)
if not self._persistent_sessions.get(config.name):
await self._initialize_session(reader, writer, config.name)
# Call tool
result = await asyncio.wait_for(
@@ -202,10 +251,22 @@ class HubClient:
) -> AsyncGenerator[tuple[asyncio.StreamReader, asyncio.StreamWriter], None]:
"""Connect to an MCP server.
If a persistent session exists for this server, reuse it (with a lock
to serialise concurrent requests). Otherwise, fall through to the
ephemeral per-call connection logic.
:param config: Server configuration.
:yields: Tuple of (reader, writer) for communication.
"""
# Check for active persistent session
session = self._persistent_sessions.get(config.name)
if session and session.initialized and session.alive:
async with session.lock:
yield session.reader, session.writer # type: ignore[misc]
return
# Ephemeral connection (original behaviour)
if config.type == HubServerType.DOCKER:
async with self._connect_docker(config) as streams:
yield streams
@@ -251,11 +312,15 @@ class HubClient:
cmd.append(config.image)
# Use 4 MB buffer to handle large tool responses (YARA rulesets, trivy output, etc.)
_STREAM_LIMIT = 4 * 1024 * 1024
process: Process = await asyncio.create_subprocess_exec(
*cmd,
stdin=subprocess.PIPE,
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
limit=_STREAM_LIMIT,
)
try:
@@ -294,12 +359,16 @@ class HubClient:
# Set up environment
env = dict(config.environment) if config.environment else None
# Use 4 MB buffer to handle large tool responses
_STREAM_LIMIT = 4 * 1024 * 1024
process: Process = await asyncio.create_subprocess_exec(
*config.command,
stdin=subprocess.PIPE,
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
env=env,
limit=_STREAM_LIMIT,
)
try:
@@ -385,10 +454,11 @@ class HubClient:
:returns: Method result.
"""
# Create JSON-RPC request
# Create JSON-RPC request with unique ID
self._request_id += 1
request = {
"jsonrpc": "2.0",
"id": 1,
"id": self._request_id,
"method": method,
"params": params,
}
@@ -415,7 +485,16 @@ class HubClient:
msg = f"MCP error: {error.get('message', 'Unknown error')}"
raise HubClientError(msg)
return response.get("result", {})
result = response.get("result", {})
# Check for tool-level errors in content items
for item in result.get("content", []):
if item.get("isError", False):
error_text = item.get("text", "unknown error")
msg = f"Tool returned error: {error_text}"
raise HubClientError(msg)
return result
async def _send_notification(
self,
@@ -442,3 +521,218 @@ class HubClient:
notification_line = json.dumps(notification) + "\n"
writer.write(notification_line.encode())
await writer.drain()
# ------------------------------------------------------------------
# Persistent session management
# ------------------------------------------------------------------
async def start_persistent_session(
    self,
    config: HubServerConfig,
) -> PersistentSession:
    """Start a persistent Docker container and initialise MCP session.

    The container stays running until :meth:`stop_persistent_session` is
    called, allowing multiple tool calls on the same session.

    :param config: Server configuration (must be Docker type).
    :returns: The created persistent session.
    :raises HubClientError: If the container cannot be started.
    """
    logger = get_logger()
    if config.name in self._persistent_sessions:
        session = self._persistent_sessions[config.name]
        if session.alive:
            # Idempotent: reuse the running session instead of starting twice.
            logger.info("Persistent session already running", server=config.name)
            return session
        # Dead session — clean up and restart
        await self._cleanup_session(config.name)
    if config.type != HubServerType.DOCKER:
        msg = f"Persistent mode only supports Docker servers (got {config.type.value})"
        raise HubClientError(msg)
    if not config.image:
        msg = f"Docker image not specified for server '{config.name}'"
        raise HubClientError(msg)
    container_name = f"fuzzforge-{config.name}"
    # Remove stale container with same name if it exists; best-effort, so a
    # missing container (or missing docker) must not abort the start.
    try:
        rm_proc = await asyncio.create_subprocess_exec(
            "docker", "rm", "-f", container_name,
            stdout=subprocess.PIPE,
            stderr=subprocess.PIPE,
        )
        await rm_proc.wait()
    except Exception:
        pass
    # Build docker run command (no --rm, with --name): the named container
    # persists so _cleanup_session can remove it explicitly later.
    cmd = ["docker", "run", "-i", "--name", container_name]
    for cap in config.capabilities:
        cmd.extend(["--cap-add", cap])
    for volume in config.volumes:
        cmd.extend(["-v", os.path.expanduser(volume)])
    for key, value in config.environment.items():
        cmd.extend(["-e", f"{key}={value}"])
    cmd.append(config.image)
    # 4 MB stream buffer to handle large tool responses.
    _STREAM_LIMIT = 4 * 1024 * 1024
    logger.info(
        "Starting persistent container",
        server=config.name,
        container=container_name,
        image=config.image,
    )
    process: Process = await asyncio.create_subprocess_exec(
        *cmd,
        stdin=subprocess.PIPE,
        stdout=subprocess.PIPE,
        stderr=subprocess.PIPE,
        limit=_STREAM_LIMIT,
    )
    if process.stdin is None or process.stdout is None:
        process.terminate()
        msg = "Failed to get process streams"
        raise HubClientError(msg)
    session = PersistentSession(
        server_name=config.name,
        container_name=container_name,
        process=process,
        reader=process.stdout,
        writer=process.stdin,
    )
    # Initialise MCP session; on failure, tear the container process down
    # (terminate, then kill after a 5 s grace period) before re-raising.
    try:
        await self._initialize_session(
            session.reader,  # type: ignore[arg-type]
            session.writer,  # type: ignore[arg-type]
            config.name,
        )
        session.initialized = True
    except Exception as e:
        process.terminate()
        try:
            await asyncio.wait_for(process.wait(), timeout=5)
        except asyncio.TimeoutError:
            process.kill()
        msg = f"Failed to initialise MCP session for {config.name}: {e}"
        raise HubClientError(msg) from e
    # Only register the session once it is fully initialised.
    self._persistent_sessions[config.name] = session
    logger.info(
        "Persistent session started",
        server=config.name,
        container=container_name,
    )
    return session
async def stop_persistent_session(self, server_name: str) -> bool:
    """Stop a persistent container session.

    Thin public wrapper over :meth:`_cleanup_session`, which terminates the
    process and force-removes the Docker container.

    :param server_name: Name of the server whose session to stop.
    :returns: True if a session was stopped, False if none found.
    """
    return await self._cleanup_session(server_name)
def get_persistent_session(self, server_name: str) -> PersistentSession | None:
    """Look up a live persistent session by server name.

    :param server_name: Server name.
    :returns: The session if running, None otherwise.
    """
    session = self._persistent_sessions.get(server_name)
    if session is None or not session.alive:
        # A dead session is reported as absent but intentionally left in the
        # map — removal happens on the async cleanup path.
        return None
    return session
def list_persistent_sessions(self) -> list[dict[str, Any]]:
    """Summarise every persistent session as a status dictionary.

    :returns: List of session info dictionaries.
    """
    return [
        {
            "server_name": name,
            "container_name": sess.container_name,
            "alive": sess.alive,
            "initialized": sess.initialized,
            "started_at": sess.started_at.isoformat(),
            "uptime_seconds": int(
                (datetime.now(tz=timezone.utc) - sess.started_at).total_seconds()
            ),
        }
        for name, sess in self._persistent_sessions.items()
    ]
async def stop_all_persistent_sessions(self) -> int:
    """Stop every persistent session.

    :returns: Number of sessions stopped.
    """
    stopped = 0
    # Snapshot the keys first: _cleanup_session mutates the dict.
    for name in list(self._persistent_sessions):
        if await self._cleanup_session(name):
            stopped += 1
    return stopped
async def _cleanup_session(self, server_name: str) -> bool:
    """Clean up a persistent session (terminate process, remove container).

    :param server_name: Server name.
    :returns: True if cleaned up, False if not found.
    """
    logger = get_logger()
    # pop() both looks up and unregisters the session atomically.
    session = self._persistent_sessions.pop(server_name, None)
    if session is None:
        return False
    logger.info("Stopping persistent session", server=server_name)
    # Terminate process: SIGTERM first, escalate to SIGKILL after 10 s.
    if session.alive:
        session.process.terminate()
        try:
            await asyncio.wait_for(session.process.wait(), timeout=10)
        except asyncio.TimeoutError:
            session.process.kill()
            await session.process.wait()
    # Remove Docker container; best-effort, since the container was started
    # without --rm and may already be gone.
    try:
        rm_proc = await asyncio.create_subprocess_exec(
            "docker", "rm", "-f", session.container_name,
            stdout=subprocess.PIPE,
            stderr=subprocess.PIPE,
        )
        await rm_proc.wait()
    except Exception:
        pass
    logger.info(
        "Persistent session stopped",
        server=server_name,
        container=session.container_name,
    )
    return True

View File

@@ -12,7 +12,7 @@ from __future__ import annotations
from pathlib import Path
from typing import TYPE_CHECKING, Any, cast
from fuzzforge_common.hub.client import HubClient, HubClientError
from fuzzforge_common.hub.client import HubClient, HubClientError, PersistentSession
from fuzzforge_common.hub.models import HubServer, HubServerConfig, HubTool
from fuzzforge_common.hub.registry import HubRegistry
@@ -106,6 +106,7 @@ class HubExecutor:
"""
self._registry = HubRegistry(config_path)
self._client = HubClient(timeout=timeout)
self._continuous_sessions: dict[str, dict[str, Any]] = {}
@property
def registry(self) -> HubRegistry:
@@ -291,6 +292,7 @@ class HubExecutor:
"""
servers = []
for server in self._registry.servers:
session = self._client.get_persistent_session(server.name)
servers.append({
"name": server.name,
"identifier": server.identifier,
@@ -298,6 +300,8 @@ class HubExecutor:
"enabled": server.config.enabled,
"category": server.config.category,
"description": server.config.description,
"persistent": server.config.persistent,
"persistent_session_active": session is not None and session.alive,
"discovered": server.discovered,
"tool_count": len(server.tools),
"error": server.discovery_error,
@@ -332,3 +336,287 @@ class HubExecutor:
if tool:
return tool.input_schema
return None
# ------------------------------------------------------------------
# Persistent session management
# ------------------------------------------------------------------
async def start_persistent_server(self, server_name: str) -> dict[str, Any]:
    """Start a persistent container session for a server.

    The container stays running between tool calls, allowing stateful
    interactions (e.g., radare2 sessions, long-running fuzzing).

    :param server_name: Name of the hub server to start.
    :returns: Session status dictionary.
    :raises ValueError: If server not found.
    """
    logger = get_logger()
    server = self._registry.get_server(server_name)
    if not server:
        msg = f"Server '{server_name}' not found"
        raise ValueError(msg)
    session = await self._client.start_persistent_session(server.config)
    # Auto-discover tools on the new session; discovery failure is logged
    # but does not fail the start (the session itself is up).
    try:
        tools = await self._client.discover_tools(server)
        self._registry.update_server_tools(server_name, tools)
    except HubClientError as e:
        logger.warning(
            "Tool discovery failed on persistent session",
            server=server_name,
            error=str(e),
        )
    # Include discovered tools in the result so agent knows what's available.
    # Re-fetch the server: update_server_tools may have replaced its tool list.
    discovered_tools = []
    server_obj = self._registry.get_server(server_name)
    if server_obj:
        for tool in server_obj.tools:
            discovered_tools.append({
                "identifier": tool.identifier,
                "name": tool.name,
                "description": tool.description,
            })
    return {
        "server_name": session.server_name,
        "container_name": session.container_name,
        "alive": session.alive,
        "initialized": session.initialized,
        "started_at": session.started_at.isoformat(),
        "tools": discovered_tools,
        "tool_count": len(discovered_tools),
    }
async def stop_persistent_server(self, server_name: str) -> bool:
    """Stop a persistent container session.

    Delegates to the hub client, which owns session lifecycle.

    :param server_name: Server name.
    :returns: True if a session was stopped.
    """
    return await self._client.stop_persistent_session(server_name)
def get_persistent_status(self, server_name: str) -> dict[str, Any] | None:
    """Report the status of a persistent session, if one is live.

    :param server_name: Server name.
    :returns: Status dict or None if no session.
    """
    active = self._client.get_persistent_session(server_name)
    if not active:
        return None
    from datetime import datetime, timezone  # noqa: PLC0415

    uptime = datetime.now(tz=timezone.utc) - active.started_at
    return {
        "server_name": active.server_name,
        "container_name": active.container_name,
        "alive": active.alive,
        "initialized": active.initialized,
        "started_at": active.started_at.isoformat(),
        "uptime_seconds": int(uptime.total_seconds()),
    }
def list_persistent_sessions(self) -> list[dict[str, Any]]:
    """List all persistent sessions.

    Delegates to the hub client, which tracks the sessions.

    :returns: List of session status dicts.
    """
    return self._client.list_persistent_sessions()
async def stop_all_persistent_servers(self) -> int:
    """Stop all persistent sessions.

    Delegates to the hub client.

    :returns: Number of sessions stopped.
    """
    return await self._client.stop_all_persistent_sessions()
# ------------------------------------------------------------------
# Continuous session management
# ------------------------------------------------------------------
async def start_continuous_tool(
    self,
    server_name: str,
    start_tool: str,
    arguments: dict[str, Any],
) -> dict[str, Any]:
    """Start a continuous hub tool session.

    Ensures a persistent container is running, then calls the start tool
    (e.g., ``cargo_fuzz_start``) which returns a session_id. Tracks the
    session for subsequent status/stop calls.

    :param server_name: Hub server name.
    :param start_tool: Name of the start tool on the server.
    :param arguments: Arguments for the start tool.
    :returns: Start result including session_id.
    :raises ValueError: If server not found.
    """
    logger = get_logger()
    server = self._registry.get_server(server_name)
    if not server:
        msg = f"Server '{server_name}' not found"
        raise ValueError(msg)
    # Ensure persistent session is running
    persistent = self._client.get_persistent_session(server_name)
    if not persistent or not persistent.alive:
        logger.info(
            "Auto-starting persistent session for continuous tool",
            server=server_name,
        )
        await self._client.start_persistent_session(server.config)
        # Discover tools on the new session; discovery failure is non-fatal
        # because the start tool may still be callable.
        try:
            tools = await self._client.discover_tools(server)
            self._registry.update_server_tools(server_name, tools)
        except HubClientError as e:
            logger.warning(
                "Tool discovery failed on persistent session",
                server=server_name,
                error=str(e),
            )
    # Call the start tool
    result = await self._client.execute_tool(
        server, start_tool, arguments,
    )
    # Extract the first text item from the MCP content payload.
    content_text = ""
    for item in result.get("content", []):
        if item.get("type") == "text":
            content_text = item.get("text", "")
            break
    import json  # noqa: PLC0415
    try:
        start_result = json.loads(content_text) if content_text else result
    except json.JSONDecodeError:
        start_result = result
    # Valid JSON is not necessarily a JSON object (it may be a bare string
    # or list); fall back to the raw result so .get() below cannot raise.
    if not isinstance(start_result, dict):
        start_result = result
    session_id = start_result.get("session_id", "")
    if session_id:
        from datetime import datetime, timezone  # noqa: PLC0415
        # Companion tool names follow the hub convention:
        # foo_start -> foo_status / foo_stop.
        self._continuous_sessions[session_id] = {
            "session_id": session_id,
            "server_name": server_name,
            "start_tool": start_tool,
            "status_tool": start_tool.replace("_start", "_status"),
            "stop_tool": start_tool.replace("_start", "_stop"),
            "started_at": datetime.now(tz=timezone.utc).isoformat(),
            "status": "running",
        }
    return start_result
async def get_continuous_tool_status(
    self,
    session_id: str,
) -> dict[str, Any]:
    """Get status of a continuous hub tool session.

    :param session_id: Session ID from start_continuous_tool.
    :returns: Status dict from the hub server's status tool.
    :raises ValueError: If session not found.
    """
    session_info = self._continuous_sessions.get(session_id)
    if not session_info:
        msg = f"Unknown continuous session: {session_id}"
        raise ValueError(msg)
    server = self._registry.get_server(session_info["server_name"])
    if not server:
        msg = f"Server '{session_info['server_name']}' not found"
        raise ValueError(msg)

    result = await self._client.execute_tool(
        server,
        session_info["status_tool"],
        {"session_id": session_id},
    )

    import json  # noqa: PLC0415

    # First text payload from the MCP content list, if any.
    content_text = next(
        (
            item.get("text", "")
            for item in result.get("content", [])
            if item.get("type") == "text"
        ),
        "",
    )
    if not content_text:
        return result
    try:
        return json.loads(content_text)
    except json.JSONDecodeError:
        return result
async def stop_continuous_tool(
    self,
    session_id: str,
) -> dict[str, Any]:
    """Stop a continuous hub tool session.

    :param session_id: Session ID to stop.
    :returns: Final results from the hub server's stop tool.
    :raises ValueError: If session not found.
    """
    session_info = self._continuous_sessions.get(session_id)
    if not session_info:
        msg = f"Unknown continuous session: {session_id}"
        raise ValueError(msg)
    server = self._registry.get_server(session_info["server_name"])
    if not server:
        msg = f"Server '{session_info['server_name']}' not found"
        raise ValueError(msg)

    result = await self._client.execute_tool(
        server,
        session_info["stop_tool"],
        {"session_id": session_id},
    )

    import json  # noqa: PLC0415

    # First text payload from the MCP content list, if any.
    content_text = next(
        (
            item.get("text", "")
            for item in result.get("content", [])
            if item.get("type") == "text"
        ),
        "",
    )
    stop_result = result
    if content_text:
        try:
            stop_result = json.loads(content_text)
        except json.JSONDecodeError:
            stop_result = result

    # Mark the tracked session as finished before handing back results.
    session_info["status"] = "stopped"
    return stop_result
def list_continuous_sessions(self) -> list[dict[str, Any]]:
    """List all tracked continuous sessions.

    :returns: List of continuous session info dicts.
    """
    # Copy into a fresh list so callers cannot mutate internal state.
    tracked = self._continuous_sessions
    return list(tracked.values())

View File

@@ -92,6 +92,18 @@ class HubServerConfig(BaseModel):
description="Category for grouping servers",
)
#: Per-server timeout override in seconds (None = use default_timeout).
timeout: int | None = Field(
default=None,
description="Per-server execution timeout override in seconds",
)
#: Whether to use persistent container mode (keep container running between calls).
persistent: bool = Field(
default=False,
description="Keep container running between tool calls for stateful interactions",
)
class HubToolParameter(BaseModel):
"""A parameter for an MCP tool.

View File

@@ -8,7 +8,6 @@ requires-python = ">=3.14"
dependencies = [
"fastmcp==2.14.1",
"fuzzforge-common==0.0.1",
"fuzzforge-runner==0.0.1",
"pydantic==2.12.4",
"pydantic-settings==2.12.0",
"structlog==25.5.0",
@@ -32,5 +31,4 @@ tests = [
[tool.uv.sources]
fuzzforge-common = { workspace = true }
fuzzforge-runner = { workspace = true }
fuzzforge-tests = { workspace = true }

View File

@@ -1,7 +1,8 @@
"""FuzzForge MCP Server Application.
This is the main entry point for the FuzzForge MCP server, providing
AI agents with tools to execute security research modules.
AI agents with tools to discover and execute MCP hub tools for
security research.
"""
@@ -12,7 +13,7 @@ from fastmcp import FastMCP
from fastmcp.server.middleware.error_handling import ErrorHandlingMiddleware
from fuzzforge_mcp import resources, tools
from fuzzforge_runner import Settings
from fuzzforge_mcp.settings import Settings
if TYPE_CHECKING:
from collections.abc import AsyncGenerator
@@ -38,19 +39,18 @@ mcp: FastMCP = FastMCP(
instructions="""
FuzzForge is a security research orchestration platform. Use these tools to:
1. **List modules**: Discover available security research modules
2. **Execute modules**: Run modules in isolated containers
3. **Execute workflows**: Chain multiple modules together
1. **List hub servers**: Discover registered MCP tool servers
2. **Discover tools**: Find available tools from hub servers
3. **Execute hub tools**: Run security tools in isolated containers
4. **Manage projects**: Initialize and configure projects
5. **Get results**: Retrieve execution results
6. **Hub tools**: Discover and execute tools from external MCP servers
Typical workflow:
1. Initialize a project with `init_project`
2. Set project assets with `set_project_assets` (optional, only needed once for the source directory)
3. List available modules with `list_modules`
4. Execute a module with `execute_module` — use `assets_path` param to pass different inputs per module
5. Read outputs from `results_path` returned by `execute_module` — check module's `output_artifacts` metadata for filenames
3. List available hub servers with `list_hub_servers`
4. Discover tools from servers with `discover_hub_tools`
5. Execute hub tools with `execute_hub_tool`
Hub workflow:
1. List available hub servers with `list_hub_servers`

View File

@@ -6,9 +6,10 @@ from pathlib import Path
from typing import TYPE_CHECKING, cast
from fastmcp.server.dependencies import get_context
from fuzzforge_runner import Runner, Settings
from fuzzforge_mcp.exceptions import FuzzForgeMCPError
from fuzzforge_mcp.settings import Settings
from fuzzforge_mcp.storage import LocalStorage
if TYPE_CHECKING:
from fastmcp import Context
@@ -17,6 +18,9 @@ if TYPE_CHECKING:
# Track the current active project path (set by init_project)
_current_project_path: Path | None = None
# Singleton storage instance
_storage: LocalStorage | None = None
def set_current_project_path(project_path: Path) -> None:
"""Set the current project path.
@@ -60,11 +64,14 @@ def get_project_path() -> Path:
return Path.cwd()
def get_runner() -> Runner:
"""Get a configured Runner instance.
def get_storage() -> LocalStorage:
"""Get the storage backend instance.
:return: Runner instance configured from MCP settings.
:return: LocalStorage instance.
"""
settings: Settings = get_settings()
return Runner(settings)
global _storage
if _storage is None:
settings = get_settings()
_storage = LocalStorage(settings.storage.path)
return _storage

View File

@@ -2,14 +2,12 @@
from fastmcp import FastMCP
from fuzzforge_mcp.resources import executions, modules, project, workflows
from fuzzforge_mcp.resources import executions, project
mcp: FastMCP = FastMCP()
mcp.mount(executions.mcp)
mcp.mount(modules.mcp)
mcp.mount(project.mcp)
mcp.mount(workflows.mcp)
__all__ = [
"mcp",

View File

@@ -3,15 +3,12 @@
from __future__ import annotations
from pathlib import Path
from typing import TYPE_CHECKING, Any
from typing import Any
from fastmcp import FastMCP
from fastmcp.exceptions import ResourceError
from fuzzforge_mcp.dependencies import get_project_path, get_runner
if TYPE_CHECKING:
from fuzzforge_runner import Runner
from fuzzforge_mcp.dependencies import get_project_path, get_storage
mcp: FastMCP = FastMCP()
@@ -26,16 +23,16 @@ async def list_executions() -> list[dict[str, Any]]:
:return: List of execution information dictionaries.
"""
runner: Runner = get_runner()
storage = get_storage()
project_path: Path = get_project_path()
try:
execution_ids = runner.list_executions(project_path)
execution_ids = storage.list_executions(project_path)
return [
{
"execution_id": exec_id,
"has_results": runner.get_execution_results(project_path, exec_id) is not None,
"has_results": storage.get_execution_results(project_path, exec_id) is not None,
}
for exec_id in execution_ids
]
@@ -53,11 +50,11 @@ async def get_execution(execution_id: str) -> dict[str, Any]:
:return: Execution information dictionary.
"""
runner: Runner = get_runner()
storage = get_storage()
project_path: Path = get_project_path()
try:
results_path = runner.get_execution_results(project_path, execution_id)
results_path = storage.get_execution_results(project_path, execution_id)
if results_path is None:
raise ResourceError(f"Execution not found: {execution_id}")

View File

@@ -1,78 +0,0 @@
"""Module resources for FuzzForge MCP."""
from __future__ import annotations
from typing import TYPE_CHECKING, Any
from fastmcp import FastMCP
from fastmcp.exceptions import ResourceError
from fuzzforge_mcp.dependencies import get_runner
if TYPE_CHECKING:
from fuzzforge_runner import Runner
from fuzzforge_runner.runner import ModuleInfo
mcp: FastMCP = FastMCP()
@mcp.resource("fuzzforge://modules/")
async def list_modules() -> list[dict[str, Any]]:
"""List all available FuzzForge modules.
Returns information about modules that can be executed,
including their identifiers and availability status.
:return: List of module information dictionaries.
"""
runner: Runner = get_runner()
try:
modules: list[ModuleInfo] = runner.list_modules()
return [
{
"identifier": module.identifier,
"description": module.description,
"version": module.version,
"available": module.available,
}
for module in modules
]
except Exception as exception:
message: str = f"Failed to list modules: {exception}"
raise ResourceError(message) from exception
@mcp.resource("fuzzforge://modules/{module_identifier}")
async def get_module(module_identifier: str) -> dict[str, Any]:
"""Get information about a specific module.
:param module_identifier: The identifier of the module to retrieve.
:return: Module information dictionary.
"""
runner: Runner = get_runner()
try:
module: ModuleInfo | None = runner.get_module_info(module_identifier)
if module is None:
raise ResourceError(f"Module not found: {module_identifier}")
return {
"identifier": module.identifier,
"description": module.description,
"version": module.version,
"available": module.available,
}
except ResourceError:
raise
except Exception as exception:
message: str = f"Failed to get module: {exception}"
raise ResourceError(message) from exception

View File

@@ -3,15 +3,12 @@
from __future__ import annotations
from pathlib import Path
from typing import TYPE_CHECKING, Any
from typing import Any
from fastmcp import FastMCP
from fastmcp.exceptions import ResourceError
from fuzzforge_mcp.dependencies import get_project_path, get_runner
if TYPE_CHECKING:
from fuzzforge_runner import Runner
from fuzzforge_mcp.dependencies import get_project_path, get_settings, get_storage
mcp: FastMCP = FastMCP()
@@ -27,12 +24,12 @@ async def get_project() -> dict[str, Any]:
:return: Project information dictionary.
"""
runner: Runner = get_runner()
storage = get_storage()
project_path: Path = get_project_path()
try:
executions = runner.list_executions(project_path)
assets_path = runner.storage.get_project_assets_path(project_path)
executions = storage.list_executions(project_path)
assets_path = storage.get_project_assets_path(project_path)
return {
"path": str(project_path),
@@ -40,7 +37,7 @@ async def get_project() -> dict[str, Any]:
"has_assets": assets_path is not None,
"assets_path": str(assets_path) if assets_path else None,
"execution_count": len(executions),
"recent_executions": executions[:10], # Last 10 executions
"recent_executions": executions[:10],
}
except Exception as exception:
@@ -53,13 +50,11 @@ async def get_project_settings() -> dict[str, Any]:
"""Get current FuzzForge settings.
Returns the active configuration for the MCP server including
engine, storage, and project settings.
engine, storage, and hub settings.
:return: Settings dictionary.
"""
from fuzzforge_mcp.dependencies import get_settings
try:
settings = get_settings()
@@ -71,9 +66,10 @@ async def get_project_settings() -> dict[str, Any]:
"storage": {
"path": str(settings.storage.path),
},
"project": {
"path": str(settings.project.path),
"modules_path": str(settings.modules_path),
"hub": {
"enabled": settings.hub.enabled,
"config_path": str(settings.hub.config_path),
"timeout": settings.hub.timeout,
},
"debug": settings.debug,
}

View File

@@ -1,53 +0,0 @@
"""Workflow resources for FuzzForge MCP.
Note: In FuzzForge AI, workflows are defined at runtime rather than
stored. This resource provides documentation about workflow capabilities.
"""
from __future__ import annotations
from typing import Any
from fastmcp import FastMCP
mcp: FastMCP = FastMCP()
@mcp.resource("fuzzforge://workflows/help")
async def get_workflow_help() -> dict[str, Any]:
"""Get help information about creating workflows.
Workflows in FuzzForge AI are defined at execution time rather
than stored. Use the execute_workflow tool with step definitions.
:return: Workflow documentation.
"""
return {
"description": "Workflows chain multiple modules together",
"usage": "Use the execute_workflow tool with step definitions",
"example": {
"workflow_name": "security-audit",
"steps": [
{
"module": "compile-contracts",
"configuration": {"solc_version": "0.8.0"},
},
{
"module": "slither",
"configuration": {},
},
{
"module": "echidna",
"configuration": {"test_limit": 10000},
},
],
},
"step_format": {
"module": "Module identifier (required)",
"configuration": "Module-specific configuration (optional)",
"name": "Step name for logging (optional)",
},
}

View File

@@ -1,10 +1,23 @@
"""FuzzForge Runner settings configuration."""
"""FuzzForge MCP Server settings.
Standalone settings for the MCP server. Replaces the previous dependency
on fuzzforge-runner Settings now that the module system has been removed
and FuzzForge operates exclusively through MCP hub tools.
All settings can be configured via environment variables with the prefix
``FUZZFORGE_``. Nested settings use double-underscore as delimiter.
Example:
``FUZZFORGE_ENGINE__TYPE=docker``
``FUZZFORGE_STORAGE__PATH=/data/fuzzforge``
``FUZZFORGE_HUB__CONFIG_PATH=/path/to/hub-config.json``
"""
from __future__ import annotations
from enum import StrEnum
from pathlib import Path
from typing import Literal
from pydantic import BaseModel, Field
from pydantic_settings import BaseSettings, SettingsConfigDict
@@ -20,24 +33,21 @@ class EngineType(StrEnum):
class EngineSettings(BaseModel):
"""Container engine configuration."""
#: Type of container engine to use. Docker is the default for simplicity.
#: Type of container engine to use.
type: EngineType = EngineType.DOCKER
#: Path to the container engine socket (only used as fallback for socket-based engines).
#: Path to the container engine socket.
socket: str = Field(default="")
#: Custom graph root for Podman storage (only used with Podman under Snap).
#: Custom graph root for Podman storage.
graphroot: Path = Field(default=Path.home() / ".fuzzforge" / "containers" / "storage")
#: Custom run root for Podman runtime state (only used with Podman under Snap).
#: Custom run root for Podman runtime state.
runroot: Path = Field(default=Path.home() / ".fuzzforge" / "containers" / "run")
class StorageSettings(BaseModel):
"""Storage configuration for local filesystem storage.
OSS uses direct file mounting without archiving for simplicity.
"""
"""Storage configuration for local filesystem storage."""
#: Base path for local storage.
path: Path = Field(default=Path.home() / ".fuzzforge" / "storage")
@@ -50,33 +60,12 @@ class ProjectSettings(BaseModel):
default_path: Path = Field(default=Path.home() / ".fuzzforge" / "projects")
class RegistrySettings(BaseModel):
"""Container registry configuration for module images.
By default, registry URL is empty (local-only mode). When empty,
modules must be built locally with `make build-modules`.
Set via FUZZFORGE_REGISTRY__URL environment variable if needed.
"""
#: Registry URL for pulling module images (empty = local-only mode).
url: str = Field(default="")
#: Default tag to use when pulling images.
default_tag: str = Field(default="latest")
#: Registry username for authentication (optional).
username: str | None = None
#: Registry password/token for authentication (optional).
password: str | None = None
class HubSettings(BaseModel):
"""MCP Hub configuration for external tool servers.
Controls the hub that bridges FuzzForge with external MCP servers
(e.g., mcp-security-hub). When enabled, AI agents can discover
and execute tools from registered MCP servers.
(e.g., mcp-security-hub). AI agents discover and execute tools
from registered MCP servers.
Configure via environment variables:
``FUZZFORGE_HUB__ENABLED=true``
@@ -95,15 +84,10 @@ class HubSettings(BaseModel):
class Settings(BaseSettings):
"""FuzzForge Runner settings.
"""FuzzForge MCP Server settings.
Settings can be configured via environment variables with the prefix
``FUZZFORGE_``. Nested settings use underscore as delimiter.
Example:
``FUZZFORGE_ENGINE_TYPE=docker``
``FUZZFORGE_STORAGE_PATH=/data/fuzzforge``
``FUZZFORGE_MODULES_PATH=/path/to/modules``
``FUZZFORGE_``. Nested settings use double-underscore as delimiter.
"""
@@ -122,14 +106,8 @@ class Settings(BaseSettings):
#: Project settings.
project: ProjectSettings = Field(default_factory=ProjectSettings)
#: Container registry settings.
registry: RegistrySettings = Field(default_factory=RegistrySettings)
#: MCP Hub settings.
hub: HubSettings = Field(default_factory=HubSettings)
#: Path to modules directory (for development/local builds).
modules_path: Path = Field(default=Path.home() / ".fuzzforge" / "modules")
#: Enable debug logging.
debug: bool = False

View File

@@ -0,0 +1,203 @@
"""FuzzForge MCP Server - Local project storage.
Lightweight project storage for managing `.fuzzforge/` directories,
execution results, and project configuration. Extracted from the
former fuzzforge-runner storage module.
Storage is placed directly in the project directory as `.fuzzforge/`
for maximum visibility and ease of debugging.
"""
from __future__ import annotations
import json
import logging
import shutil
from pathlib import Path
from tarfile import open as Archive # noqa: N812
logger = logging.getLogger("fuzzforge-mcp")
#: Name of the FuzzForge storage directory within projects.
FUZZFORGE_DIR_NAME: str = ".fuzzforge"
#: Standard results archive filename.
RESULTS_ARCHIVE_FILENAME: str = "results.tar.gz"
class StorageError(Exception):
"""Raised when a storage operation fails."""
class LocalStorage:
"""Local filesystem storage backend for FuzzForge.
Provides lightweight storage for project configuration and
execution results tracking.
Directory structure (inside project directory)::
{project_path}/.fuzzforge/
config.json # Project config (source path reference)
runs/ # Execution results
{execution_id}/
results.tar.gz
"""
_base_path: Path
def __init__(self, base_path: Path) -> None:
"""Initialize storage backend.
:param base_path: Root directory for global storage (fallback).
"""
self._base_path = base_path
self._base_path.mkdir(parents=True, exist_ok=True)
def _get_project_path(self, project_path: Path) -> Path:
"""Get the .fuzzforge storage path for a project.
:param project_path: Path to the project directory.
:returns: Storage path (.fuzzforge inside project).
"""
return project_path / FUZZFORGE_DIR_NAME
def init_project(self, project_path: Path) -> Path:
"""Initialize storage for a new project.
Creates a .fuzzforge/ directory inside the project for storing
configuration and execution results.
:param project_path: Path to the project directory.
:returns: Path to the project storage directory.
"""
storage_path = self._get_project_path(project_path)
storage_path.mkdir(parents=True, exist_ok=True)
(storage_path / "runs").mkdir(parents=True, exist_ok=True)
# Create .gitignore to avoid committing large files
gitignore_path = storage_path / ".gitignore"
if not gitignore_path.exists():
gitignore_path.write_text(
"# FuzzForge storage - ignore large/temporary files\n"
"runs/\n"
"!config.json\n"
)
logger.info("Initialized project storage: %s", storage_path)
return storage_path
def get_project_assets_path(self, project_path: Path) -> Path | None:
"""Get the configured source path for a project.
:param project_path: Path to the project directory.
:returns: Path to source directory, or None if not configured.
"""
storage_path = self._get_project_path(project_path)
config_path = storage_path / "config.json"
if config_path.exists():
config = json.loads(config_path.read_text())
source_path = config.get("source_path")
if source_path:
path = Path(source_path)
if path.exists():
return path
return None
def set_project_assets(self, project_path: Path, assets_path: Path) -> Path:
"""Set the source path for a project (reference only, no copying).
:param project_path: Path to the project directory.
:param assets_path: Path to source directory.
:returns: The assets path (unchanged).
:raises StorageError: If path doesn't exist.
"""
if not assets_path.exists():
msg = f"Assets path does not exist: {assets_path}"
raise StorageError(msg)
assets_path = assets_path.resolve()
storage_path = self._get_project_path(project_path)
storage_path.mkdir(parents=True, exist_ok=True)
config_path = storage_path / "config.json"
config: dict = {}
if config_path.exists():
config = json.loads(config_path.read_text())
config["source_path"] = str(assets_path)
config_path.write_text(json.dumps(config, indent=2))
logger.info("Set project assets: %s -> %s", project_path.name, assets_path)
return assets_path
def list_executions(self, project_path: Path) -> list[str]:
"""List all execution IDs for a project.
:param project_path: Path to the project directory.
:returns: List of execution IDs.
"""
runs_dir = self._get_project_path(project_path) / "runs"
if not runs_dir.exists():
return []
return [d.name for d in runs_dir.iterdir() if d.is_dir()]
def get_execution_results(
self,
project_path: Path,
execution_id: str,
) -> Path | None:
"""Retrieve execution results path.
:param project_path: Path to the project directory.
:param execution_id: Execution ID.
:returns: Path to results archive, or None if not found.
"""
storage_path = self._get_project_path(project_path)
# Try direct path
results_path = storage_path / "runs" / execution_id / RESULTS_ARCHIVE_FILENAME
if results_path.exists():
return results_path
# Search in all run directories
runs_dir = storage_path / "runs"
if runs_dir.exists():
for run_dir in runs_dir.iterdir():
if run_dir.is_dir() and execution_id in run_dir.name:
candidate = run_dir / RESULTS_ARCHIVE_FILENAME
if candidate.exists():
return candidate
return None
def extract_results(self, results_path: Path, destination: Path) -> Path:
"""Extract a results archive to a destination directory.
:param results_path: Path to the results archive.
:param destination: Directory to extract to.
:returns: Path to extracted directory.
:raises StorageError: If extraction fails.
"""
try:
destination.mkdir(parents=True, exist_ok=True)
with Archive(results_path, "r:gz") as tar:
tar.extractall(path=destination) # noqa: S202
logger.info("Extracted results: %s -> %s", results_path, destination)
return destination
except Exception as exc:
msg = f"Failed to extract results: {exc}"
raise StorageError(msg) from exc

View File

@@ -2,13 +2,11 @@
from fastmcp import FastMCP
from fuzzforge_mcp.tools import hub, modules, projects, workflows
from fuzzforge_mcp.tools import hub, projects
mcp: FastMCP = FastMCP()
mcp.mount(modules.mcp)
mcp.mount(projects.mcp)
mcp.mount(workflows.mcp)
mcp.mount(hub.mcp)
__all__ = [

View File

@@ -313,3 +313,249 @@ async def add_hub_server(
raise
msg = f"Failed to add hub server: {e}"
raise ToolError(msg) from e
@mcp.tool
async def start_hub_server(server_name: str) -> dict[str, Any]:
    """Start a persistent container session for a hub server.

    Starts a Docker container that stays running between tool calls,
    allowing stateful interactions. Tools are auto-discovered on start.

    Use this for servers like radare2 or ghidra where you want to
    keep an analysis session open across multiple tool calls.

    After starting, use execute_hub_tool as normal - calls will be
    routed to the persistent container automatically.

    :param server_name: Name of the hub server to start (e.g., "radare2-mcp").
    :return: Session status with container name and start time.
    """
    try:
        executor = _get_hub_executor()
        result = await executor.start_persistent_server(server_name)
        return {
            "success": True,
            "session": result,
            "tools": result.get("tools", []),
            "tool_count": result.get("tool_count", 0),
            "message": (
                f"Persistent session started for '{server_name}'. "
                f"Discovered {result.get('tool_count', 0)} tools. "
                "Use execute_hub_tool to call them — they will reuse this container. "
                f"Stop with stop_hub_server('{server_name}') when done."
            ),
        }
    except ValueError as e:
        msg = f"Server not found: {e}"
        raise ToolError(msg) from e
    except ToolError:
        # Already a well-formed tool error; propagate unchanged.
        raise
    except Exception as e:
        msg = f"Failed to start persistent server: {e}"
        raise ToolError(msg) from e
@mcp.tool
async def stop_hub_server(server_name: str) -> dict[str, Any]:
    """Stop a persistent container session for a hub server.

    Terminates the running Docker container and cleans up resources.
    After stopping, tool calls will fall back to ephemeral mode
    (a new container per call).

    :param server_name: Name of the hub server to stop.
    :return: Result indicating if the session was stopped.
    """
    try:
        executor = _get_hub_executor()
        stopped = await executor.stop_persistent_server(server_name)
        if stopped:
            return {
                "success": True,
                "message": f"Persistent session for '{server_name}' stopped and container removed.",
            }
        # No session to stop is a soft failure, not an error.
        return {
            "success": False,
            "message": f"No active persistent session found for '{server_name}'.",
        }
    except ToolError:
        # Already a well-formed tool error; propagate unchanged.
        raise
    except Exception as e:
        msg = f"Failed to stop persistent server: {e}"
        raise ToolError(msg) from e
@mcp.tool
async def hub_server_status(server_name: str | None = None) -> dict[str, Any]:
    """Get status of persistent hub server sessions.

    If server_name is provided, returns status for that specific server.
    Otherwise returns status for all active persistent sessions.

    :param server_name: Optional specific server to check.
    :return: Session status information.
    """
    try:
        executor = _get_hub_executor()
        if server_name:
            status = executor.get_persistent_status(server_name)
            if status:
                return {"active": True, "session": status}
            return {
                "active": False,
                "message": f"No active persistent session for '{server_name}'.",
            }
        sessions = executor.list_persistent_sessions()
        return {
            "active_sessions": sessions,
            "count": len(sessions),
        }
    except ToolError:
        # Already a well-formed tool error; propagate unchanged.
        raise
    except Exception as e:
        msg = f"Failed to get server status: {e}"
        raise ToolError(msg) from e
# ------------------------------------------------------------------
# Continuous mode tools
# ------------------------------------------------------------------
@mcp.tool
async def start_continuous_hub_tool(
    server_name: str,
    start_tool: str,
    arguments: dict[str, Any] | None = None,
) -> dict[str, Any]:
    """Start a continuous/background tool on a hub server.

    Automatically starts a persistent container if not already running,
    then calls the server's start tool (e.g., cargo_fuzz_start) which
    launches a background process and returns a session_id.

    The tool runs indefinitely until stopped with stop_continuous_hub_tool.
    Use get_continuous_hub_status to monitor progress.

    Example workflow for continuous cargo fuzzing:
    1. start_continuous_hub_tool("cargo-fuzzer-mcp", "cargo_fuzz_start", {"project_path": "/data/myproject"})
    2. get_continuous_hub_status(session_id) -- poll every 10-30s
    3. stop_continuous_hub_tool(session_id) -- when done

    :param server_name: Hub server name (e.g., "cargo-fuzzer-mcp").
    :param start_tool: Name of the start tool on the server.
    :param arguments: Arguments for the start tool.
    :return: Start result including session_id for monitoring.
    """
    try:
        executor = _get_hub_executor()
        # Return the server's response directly — it already contains
        # session_id, status, targets, and a message.
        return await executor.start_continuous_tool(
            server_name=server_name,
            start_tool=start_tool,
            arguments=arguments or {},
        )
    except ValueError as e:
        msg = f"Server not found: {e}"
        raise ToolError(msg) from e
    except ToolError:
        # Already a well-formed tool error; propagate unchanged.
        raise
    except Exception as e:
        msg = f"Failed to start continuous tool: {e}"
        raise ToolError(msg) from e
@mcp.tool
async def get_continuous_hub_status(session_id: str) -> dict[str, Any]:
    """Get live status of a continuous hub tool session.

    Returns current metrics, progress, and recent output from the
    running tool. Call periodically (every 10-30 seconds) to monitor.

    :param session_id: Session ID returned by start_continuous_hub_tool.
    :return: Current status with metrics (executions, coverage, crashes, etc.).
    """
    try:
        executor = _get_hub_executor()
        return await executor.get_continuous_tool_status(session_id)
    except ValueError as e:
        # Raised for unknown session IDs; surface the message as-is.
        msg = str(e)
        raise ToolError(msg) from e
    except ToolError:
        # Already a well-formed tool error; propagate unchanged.
        raise
    except Exception as e:
        msg = f"Failed to get continuous status: {e}"
        raise ToolError(msg) from e
@mcp.tool
async def stop_continuous_hub_tool(session_id: str) -> dict[str, Any]:
    """Stop a running continuous hub tool session.

    Gracefully stops the background process and returns final results
    including total metrics and any artifacts (crash files, etc.).

    :param session_id: Session ID of the session to stop.
    :return: Final metrics and results summary.
    """
    try:
        executor = _get_hub_executor()
        return await executor.stop_continuous_tool(session_id)
    except ValueError as e:
        # Raised for unknown session IDs; surface the message as-is.
        msg = str(e)
        raise ToolError(msg) from e
    except ToolError:
        # Already a well-formed tool error; propagate unchanged.
        raise
    except Exception as e:
        msg = f"Failed to stop continuous tool: {e}"
        raise ToolError(msg) from e
@mcp.tool
async def list_continuous_hub_sessions() -> dict[str, Any]:
    """List all active and recent continuous hub tool sessions.

    :return: List of sessions with their status and server info.
    """
    try:
        executor = _get_hub_executor()
        sessions = executor.list_continuous_sessions()
        return {
            "sessions": sessions,
            "count": len(sessions),
        }
    except ToolError:
        # Already a well-formed tool error; propagate unchanged.
        raise
    except Exception as e:
        msg = f"Failed to list continuous sessions: {e}"
        raise ToolError(msg) from e

View File

@@ -1,392 +0,0 @@
"""Module tools for FuzzForge MCP."""
from __future__ import annotations
import json
import uuid
from datetime import datetime, timezone
from pathlib import Path
from typing import TYPE_CHECKING, Any
from fastmcp import FastMCP
from fastmcp.exceptions import ToolError
from fuzzforge_mcp.dependencies import get_project_path, get_runner, get_settings
if TYPE_CHECKING:
from fuzzforge_runner import Runner
from fuzzforge_runner.orchestrator import StepResult
mcp: FastMCP = FastMCP()
# Track running background executions
_background_executions: dict[str, dict[str, Any]] = {}
@mcp.tool
async def list_modules() -> dict[str, Any]:
    """List all available FuzzForge modules.

    Returns information about modules that can be executed, including their
    identifiers, availability status, and metadata such as use cases, input
    requirements, and output artifacts.

    :return: Dictionary with list of available modules and their details.
    :raises ToolError: If the module images cannot be enumerated.
    """
    try:
        runner: Runner = get_runner()
        settings = get_settings()

        # Use the engine abstraction to list images.
        # Default filter matches locally-built fuzzforge-* modules.
        modules = runner.list_module_images(filter_prefix="fuzzforge-")

        available_modules = [
            {
                "identifier": module.identifier,
                "image": f"{module.identifier}:{module.version or 'latest'}",
                "available": module.available,
                "description": module.description,
                # Metadata fields read from pyproject.toml
                "category": module.category,
                "language": module.language,
                "pipeline_stage": module.pipeline_stage,
                "pipeline_order": module.pipeline_order,
                "dependencies": module.dependencies,
                "continuous_mode": module.continuous_mode,
                "typical_duration": module.typical_duration,
                # AI-discoverable metadata
                "use_cases": module.use_cases,
                "input_requirements": module.input_requirements,
                "output_artifacts": module.output_artifacts,
            }
            for module in modules
        ]

        # Sort by pipeline_order when available; modules without an order sort
        # last. Use an explicit None check so a legitimate order of 0 is not
        # treated as missing (the previous `or 999` misplaced order-0 modules).
        available_modules.sort(
            key=lambda m: (
                m["pipeline_order"] if m["pipeline_order"] is not None else 999,
                m["identifier"],
            )
        )

        return {
            "modules": available_modules,
            "count": len(available_modules),
            "container_engine": settings.engine.type,
            "registry_url": settings.registry.url,
            "registry_tag": settings.registry.default_tag,
        }
    except Exception as exception:
        message: str = f"Failed to list modules: {exception}"
        raise ToolError(message) from exception
@mcp.tool
async def execute_module(
    module_identifier: str,
    configuration: dict[str, Any] | None = None,
    assets_path: str | None = None,
) -> dict[str, Any]:
    """Execute a FuzzForge module in an isolated container.

    The module runs in a sandboxed environment, receives input assets, and
    produces output results. The response includes `results_path` pointing
    to the stored results archive — read outputs from that path directly; no
    need to call `get_execution_results`.

    :param module_identifier: The identifier of the module to execute.
    :param configuration: Optional configuration dict to pass to the module.
    :param assets_path: Optional path to input assets. Use this to pass specific
        inputs to a module (e.g. crash files to crash-analyzer) without changing
        the project's default assets. If not provided, uses project assets.
    :return: Execution result including status and results path.
    """
    runner: Runner = get_runner()
    project_path: Path = get_project_path()

    try:
        outcome: StepResult = await runner.execute_module(
            module_identifier=module_identifier,
            project_path=project_path,
            configuration=configuration,
            assets_path=Path(assets_path) if assets_path else None,
        )
        return {
            "success": outcome.success,
            "execution_id": outcome.execution_id,
            "module": outcome.module_identifier,
            "results_path": str(outcome.results_path) if outcome.results_path else None,
            "started_at": outcome.started_at.isoformat(),
            "completed_at": outcome.completed_at.isoformat(),
            "error": outcome.error,
        }
    except Exception as exc:
        raise ToolError(f"Module execution failed: {exc}") from exc
@mcp.tool
async def start_continuous_module(
    module_identifier: str,
    configuration: dict[str, Any] | None = None,
    assets_path: str | None = None,
) -> dict[str, Any]:
    """Start a module in continuous/background mode.

    The module runs indefinitely until stopped with stop_continuous_module().
    Use get_continuous_status() to check progress and metrics. Useful for
    long-running modules that should run until the user decides to stop them.

    :param module_identifier: The module to run.
    :param configuration: Optional configuration. Set max_duration to 0 for infinite.
    :param assets_path: Optional path to input assets.
    :return: Execution info including session_id for monitoring.
    """
    runner: Runner = get_runner()
    project_path: Path = get_project_path()
    session_id = str(uuid.uuid4())[:8]

    # Default to infinite duration (0) unless the caller set one explicitly.
    if configuration is None:
        configuration = {}
    configuration.setdefault("max_duration", 0)  # 0 = infinite

    try:
        # Resolve the assets directory: an explicit path wins, otherwise
        # fall back to the project's stored assets.
        if assets_path:
            resolved_assets = Path(assets_path)
        else:
            resolved_assets = runner.storage.get_project_assets_path(project_path)

        # Launch via the non-blocking executor entry point.
        launch = runner._executor.start_module_continuous(
            module_identifier=module_identifier,
            assets_path=resolved_assets,
            configuration=configuration,
            project_path=project_path,
            execution_id=session_id,
        )

        # Record session state for status polling and teardown.
        _background_executions[session_id] = {
            "session_id": session_id,
            "module": module_identifier,
            "configuration": configuration,
            "started_at": datetime.now(timezone.utc).isoformat(),
            "status": "running",
            "container_id": launch["container_id"],
            "input_dir": launch["input_dir"],
            "project_path": str(project_path),
            # Incremental stream.jsonl tracking
            "stream_lines_read": 0,
            "total_crashes": 0,
        }

        return {
            "success": True,
            "session_id": session_id,
            "module": module_identifier,
            "container_id": launch["container_id"],
            "status": "running",
            "message": f"Continuous module started. Use get_continuous_status('{session_id}') to monitor progress.",
        }
    except Exception as exc:
        raise ToolError(f"Failed to start continuous module: {exc}") from exc
def _get_continuous_status_impl(session_id: str) -> dict[str, Any]:
    """Internal helper to get continuous session status (non-tool version).

    Uses incremental reads of ``stream.jsonl`` via ``tail -n +offset`` so that
    only new lines appended since the last poll are fetched and parsed. Crash
    counts and latest metrics are accumulated across polls.

    :param session_id: Session ID of a tracked continuous execution.
    :return: Status dict with session info, elapsed time, and metrics.
    :raises ToolError: If the session ID is unknown.
    """
    if session_id not in _background_executions:
        raise ToolError(f"Unknown session: {session_id}. Use list_continuous_sessions() to see active sessions.")
    execution = _background_executions[session_id]
    container_id = execution.get("container_id")
    # Carry forward accumulated state from previous polls.
    metrics: dict[str, Any] = {
        "total_executions": 0,
        "total_crashes": execution.get("total_crashes", 0),
        "exec_per_sec": 0,
        "coverage": 0,
        "current_target": "",
        "new_events": [],
    }
    if container_id:
        try:
            runner: Runner = get_runner()
            executor = runner._executor
            # Check container status first
            container_status = executor.get_module_status(container_id)
            if container_status != "running":
                execution["status"] = "stopped" if container_status == "exited" else container_status
            # Incremental read: only fetch lines we haven't seen yet
            lines_read: int = execution.get("stream_lines_read", 0)
            stream_content = executor.read_module_output_incremental(
                container_id,
                start_line=lines_read + 1,
                output_file="/data/output/stream.jsonl",
            )
            if stream_content:
                new_lines = stream_content.strip().split("\n")
                # Count every line we have fully consumed so the offset
                # advances past it. Previously only successfully parsed lines
                # were counted, so a blank or malformed line made every later
                # line be re-read (and crashes double-counted) on each poll.
                consumed = 0
                last_index = len(new_lines) - 1
                for index, line in enumerate(new_lines):
                    if not line.strip():
                        consumed += 1
                        continue
                    try:
                        event = json.loads(line)
                    except json.JSONDecodeError:
                        if index == last_index:
                            # Possible torn read on the very last line — do
                            # NOT advance past it so it is re-read next poll
                            # when the write is complete.
                            break
                        # A malformed line mid-stream will never become valid
                        # JSON; skip it permanently.
                        consumed += 1
                        continue
                    consumed += 1
                    metrics["new_events"].append(event)
                    # Extract latest metrics snapshot
                    if event.get("event") == "metrics":
                        metrics["total_executions"] = event.get("executions", 0)
                        metrics["current_target"] = event.get("target", "")
                        metrics["exec_per_sec"] = event.get("exec_per_sec", 0)
                        metrics["coverage"] = event.get("coverage", 0)
                    if event.get("event") == "crash_detected":
                        metrics["total_crashes"] += 1
                # Advance the offset past every fully-consumed line.
                execution["stream_lines_read"] = lines_read + consumed
                execution["total_crashes"] = metrics["total_crashes"]
        except Exception as e:
            metrics["error"] = str(e)
    # Calculate elapsed time
    started_at = execution.get("started_at", "")
    elapsed_seconds = 0
    if started_at:
        try:
            start_time = datetime.fromisoformat(started_at)
            elapsed_seconds = int((datetime.now(timezone.utc) - start_time).total_seconds())
        except Exception:
            pass
    return {
        "session_id": session_id,
        "module": execution.get("module"),
        "status": execution.get("status"),
        "container_id": container_id,
        "started_at": started_at,
        "elapsed_seconds": elapsed_seconds,
        "elapsed_human": f"{elapsed_seconds // 60}m {elapsed_seconds % 60}s",
        "metrics": metrics,
    }
@mcp.tool
async def get_continuous_status(session_id: str) -> dict[str, Any]:
    """Report the current status and metrics of a running continuous session.

    Poll this periodically (e.g., every 30 seconds) for live updates on
    progress and metrics.

    :param session_id: The session ID returned by start_continuous_module().
    :return: Current status, metrics, and any events found.
    """
    # Thin wrapper: the logic lives in a shared helper so other tools
    # (e.g. stop_continuous_module) can reuse it without a tool call.
    status = _get_continuous_status_impl(session_id)
    return status
@mcp.tool
async def stop_continuous_module(session_id: str) -> dict[str, Any]:
    """Stop a running continuous session.

    Gracefully stops the module and collects any results.

    :param session_id: The session ID of the session to stop.
    :return: Final status and summary of the session.
    """
    try:
        execution = _background_executions[session_id]
    except KeyError:
        raise ToolError(f"Unknown session: {session_id}") from None

    container_id = execution.get("container_id")
    input_dir = execution.get("input_dir")

    try:
        # Snapshot final metrics before tearing down (use the helper, not the tool).
        snapshot = _get_continuous_status_impl(session_id)

        # Stop the container and collect results.
        results_path = None
        if container_id:
            executor = get_runner()._executor
            try:
                results_path = executor.stop_module_continuous(container_id, input_dir)
            except Exception:
                # The container may have already stopped on its own.
                pass

        execution["status"] = "stopped"
        execution["stopped_at"] = datetime.now(timezone.utc).isoformat()

        return {
            "success": True,
            "session_id": session_id,
            "message": "Continuous session stopped",
            "results_path": str(results_path) if results_path else None,
            "final_metrics": snapshot.get("metrics", {}),
            "elapsed": snapshot.get("elapsed_human", ""),
        }
    except Exception as exc:
        raise ToolError(f"Failed to stop continuous module: {exc}") from exc
@mcp.tool
async def list_continuous_sessions() -> dict[str, Any]:
    """List all active and recent continuous sessions.

    :return: List of continuous sessions with their status.
    """
    summaries = [
        {
            "session_id": sid,
            "module": info.get("module"),
            "status": info.get("status"),
            "started_at": info.get("started_at"),
        }
        for sid, info in _background_executions.items()
    ]
    return {"sessions": summaries, "count": len(summaries)}

View File

@@ -3,15 +3,12 @@
from __future__ import annotations
from pathlib import Path
from typing import TYPE_CHECKING, Any
from typing import Any
from fastmcp import FastMCP
from fastmcp.exceptions import ToolError
from fuzzforge_mcp.dependencies import get_project_path, get_runner, set_current_project_path
if TYPE_CHECKING:
from fuzzforge_runner import Runner
from fuzzforge_mcp.dependencies import get_project_path, get_storage, set_current_project_path
mcp: FastMCP = FastMCP()
@@ -22,25 +19,24 @@ async def init_project(project_path: str | None = None) -> dict[str, Any]:
"""Initialize a new FuzzForge project.
Creates a `.fuzzforge/` directory inside the project for storing:
- assets/: Input files (source code, etc.)
- inputs/: Prepared module inputs (for debugging)
- runs/: Execution results from each module
- config.json: Project configuration
- runs/: Execution results
This should be called before executing modules or workflows.
This should be called before executing hub tools.
:param project_path: Path to the project directory. If not provided, uses current directory.
:return: Project initialization result.
"""
runner: Runner = get_runner()
storage = get_storage()
try:
path = Path(project_path) if project_path else get_project_path()
# Track this as the current active project
set_current_project_path(path)
storage_path = runner.init_project(path)
storage_path = storage.init_project(path)
return {
"success": True,
@@ -58,23 +54,18 @@ async def init_project(project_path: str | None = None) -> dict[str, Any]:
async def set_project_assets(assets_path: str) -> dict[str, Any]:
"""Set the initial assets (source code) for a project.
This sets the DEFAULT source directory mounted into modules.
Usually this is the project root containing source code (e.g. Cargo.toml, src/).
This sets the DEFAULT source directory that will be mounted into
hub tool containers via volume mounts.
IMPORTANT: This OVERWRITES the previous assets path. Only call this once
during project setup. To pass different inputs to a specific module
(e.g. crash files to crash-analyzer), use the `assets_path` parameter
on `execute_module` instead.
:param assets_path: Path to the project source directory or archive.
:param assets_path: Path to the project source directory.
:return: Result including stored assets path.
"""
runner: Runner = get_runner()
storage = get_storage()
project_path: Path = get_project_path()
try:
stored_path = runner.set_project_assets(
stored_path = storage.set_project_assets(
project_path=project_path,
assets_path=Path(assets_path),
)
@@ -100,11 +91,11 @@ async def list_executions() -> dict[str, Any]:
:return: List of execution IDs.
"""
runner: Runner = get_runner()
storage = get_storage()
project_path: Path = get_project_path()
try:
executions = runner.list_executions(project_path)
executions = storage.list_executions(project_path)
return {
"success": True,
@@ -127,11 +118,11 @@ async def get_execution_results(execution_id: str, extract_to: str | None = None
:return: Result including path to results archive.
"""
runner: Runner = get_runner()
storage = get_storage()
project_path: Path = get_project_path()
try:
results_path = runner.get_execution_results(project_path, execution_id)
results_path = storage.get_execution_results(project_path, execution_id)
if results_path is None:
return {
@@ -140,7 +131,7 @@ async def get_execution_results(execution_id: str, extract_to: str | None = None
"error": "Execution results not found",
}
result = {
result: dict[str, Any] = {
"success": True,
"execution_id": execution_id,
"results_path": str(results_path),
@@ -148,7 +139,7 @@ async def get_execution_results(execution_id: str, extract_to: str | None = None
# Extract if requested
if extract_to:
extracted_path = runner.extract_results(results_path, Path(extract_to))
extracted_path = storage.extract_results(results_path, Path(extract_to))
result["extracted_path"] = str(extracted_path)
return result

View File

@@ -1,92 +0,0 @@
"""Workflow tools for FuzzForge MCP."""
from __future__ import annotations
from pathlib import Path
from typing import TYPE_CHECKING, Any
from fastmcp import FastMCP
from fastmcp.exceptions import ToolError
from fuzzforge_runner.orchestrator import WorkflowDefinition, WorkflowStep
from fuzzforge_mcp.dependencies import get_project_path, get_runner
if TYPE_CHECKING:
from fuzzforge_runner import Runner
from fuzzforge_runner.orchestrator import WorkflowResult
mcp: FastMCP = FastMCP()
@mcp.tool
async def execute_workflow(
    workflow_name: str,
    steps: list[dict[str, Any]],
    initial_assets_path: str | None = None,
) -> dict[str, Any]:
    """Execute a workflow consisting of multiple module steps.

    A workflow chains multiple modules together, passing the output of each
    module as input to the next. This enables complex pipelines.

    Example steps format::

        [
            {"module": "module-a", "configuration": {"key": "value"}},
            {"module": "module-b", "configuration": {}},
            {"module": "module-c"}
        ]

    :param workflow_name: Name for this workflow execution.
    :param steps: List of step definitions, each with "module" and optional "configuration".
    :param initial_assets_path: Optional path to initial assets for the first step.
    :return: Workflow execution result including status of each step.
    """
    runner: Runner = get_runner()
    project_path: Path = get_project_path()

    try:
        # Materialize the raw step dicts into typed WorkflowStep objects.
        definition = WorkflowDefinition(
            name=workflow_name,
            steps=[
                WorkflowStep(
                    module_identifier=spec["module"],
                    configuration=spec.get("configuration"),
                    name=spec.get("name", f"step-{index}"),
                )
                for index, spec in enumerate(steps)
            ],
        )

        outcome: WorkflowResult = await runner.execute_workflow(
            workflow=definition,
            project_path=project_path,
            initial_assets_path=Path(initial_assets_path) if initial_assets_path else None,
        )

        return {
            "success": outcome.success,
            "execution_id": outcome.execution_id,
            "workflow_name": outcome.name,
            "final_results_path": str(outcome.final_results_path) if outcome.final_results_path else None,
            "steps": [
                {
                    "step_index": step.step_index,
                    "module": step.module_identifier,
                    "success": step.success,
                    "execution_id": step.execution_id,
                    "results_path": str(step.results_path) if step.results_path else None,
                    "error": step.error,
                }
                for step in outcome.steps
            ],
        }
    except Exception as exc:
        raise ToolError(f"Workflow execution failed: {exc}") from exc

View File

@@ -11,16 +11,6 @@ if TYPE_CHECKING:
from fastmcp.client import FastMCPTransport
async def test_list_modules_tool_exists(
    mcp_client: "Client[FastMCPTransport]",
) -> None:
    """Verify the list_modules tool is registered with the MCP server."""
    registered = {tool.name for tool in await mcp_client.list_tools()}
    assert "list_modules" in registered
async def test_init_project_tool_exists(
mcp_client: "Client[FastMCPTransport]",
) -> None:
@@ -31,31 +21,11 @@ async def test_init_project_tool_exists(
assert "init_project" in tool_names
async def test_execute_module_tool_exists(
    mcp_client: "Client[FastMCPTransport]",
) -> None:
    """Verify the execute_module tool is registered with the MCP server."""
    registered = {tool.name for tool in await mcp_client.list_tools()}
    assert "execute_module" in registered
async def test_execute_workflow_tool_exists(
    mcp_client: "Client[FastMCPTransport]",
) -> None:
    """Verify the execute_workflow tool is registered with the MCP server."""
    registered = {tool.name for tool in await mcp_client.list_tools()}
    assert "execute_workflow" in registered
async def test_mcp_has_expected_tool_count(
    mcp_client: "Client[FastMCPTransport]",
) -> None:
    """Test that MCP has the expected number of tools."""
    tools = await mcp_client.list_tools()
    # Should have at least 4 core tools
    assert len(tools) >= 4
    # NOTE(review): redundant — already implied by the >= 4 assertion above;
    # looks like a leftover from a diff merge. Confirm intended threshold.
    assert len(tools) >= 2

View File

@@ -1,26 +0,0 @@
# Image for the cargo-fuzzer module, built on the FuzzForge modules SDK base.
FROM localhost/fuzzforge-modules-sdk:0.1.0

# Module metadata is now read from pyproject.toml [tool.fuzzforge.module] section

# Install system dependencies for Rust compilation
RUN apt-get update && apt-get install -y \
    curl \
    build-essential \
    pkg-config \
    libssl-dev \
    && rm -rf /var/lib/apt/lists/*

# Install Rust toolchain with nightly (required for cargo-fuzz)
RUN curl --proto '=https' --tlsv1.2 -sSf https://sh.rustup.rs | sh -s -- -y --default-toolchain nightly
ENV PATH="/root/.cargo/bin:${PATH}"

# Install cargo-fuzz
# NOTE(review): `|| true` lets the image build even if the install fails;
# a missing cargo-fuzz binary then only surfaces at run time. Confirm this
# best-effort behavior is intended.
RUN cargo install cargo-fuzz --locked || true

# Copy the module sources and project metadata into the image.
COPY ./src /app/src
COPY ./pyproject.toml /app/pyproject.toml

# Remove workspace reference since we're using wheels
RUN sed -i '/\[tool\.uv\.sources\]/,/^$/d' /app/pyproject.toml

# Resolve dependencies against the local wheels available under /wheels.
RUN uv sync --find-links /wheels

View File

@@ -1,45 +0,0 @@
# Package name and version parsed from `uv version` output ("<name> <version>").
PACKAGE=$(word 1, $(shell uv version))
VERSION=$(word 2, $(shell uv version))
# Container engine binary; override with `make PODMAN=/path/to/podman <target>`.
PODMAN?=/usr/bin/podman

SOURCES=./src
TESTS=./tests

.PHONY: bandit build clean format mypy pytest ruff version

# Static security analysis of the sources.
bandit:
	uv run bandit --recursive $(SOURCES)

# Build the module container image tagged $(PACKAGE):$(VERSION).
build:
	$(PODMAN) build --file ./Dockerfile --no-cache --tag $(PACKAGE):$(VERSION)

# Export the built image as an OCI archive under /tmp.
# NOTE(review): `save` (and `cloc` below) are not listed in .PHONY — confirm.
save: build
	$(PODMAN) save --format oci-archive --output /tmp/$(PACKAGE)-$(VERSION).oci $(PACKAGE):$(VERSION)

# Remove tool caches and build artifacts.
clean:
	@find . -type d \( \
		-name '*.egg-info' \
		-o -name '.mypy_cache' \
		-o -name '.pytest_cache' \
		-o -name '.ruff_cache' \
		-o -name '__pycache__' \
		\) -printf 'removing directory %p\n' -exec rm -rf {} +

# Count lines of code.
cloc:
	cloc $(SOURCES)

# Auto-format sources and tests with ruff.
format:
	uv run ruff format $(SOURCES) $(TESTS)

# Type-check the sources with mypy.
mypy:
	uv run mypy $(SOURCES)

# Run the test suite.
pytest:
	uv run pytest $(TESTS)

# Lint (and auto-fix) sources and tests with ruff.
ruff:
	uv run ruff check --fix $(SOURCES) $(TESTS)

# Print <package>@<version>.
version:
	@echo '$(PACKAGE)@$(VERSION)'

View File

@@ -1,46 +0,0 @@
# FuzzForge Modules
## Installation
### Python
```shell
# install the package (users)
uv sync
# install the package and all development dependencies (developers)
uv sync --all-extras
```
### Container
```shell
# build the image
make build
# run the container
mkdir -p "${PWD}/data" "${PWD}/data/input" "${PWD}/data/output"
echo '{"settings":{},"resources":[]}' > "${PWD}/data/input/input.json"
podman run --rm \
--volume "${PWD}/data:/data" \
'<name>:<version>' 'uv run module'
```
## Usage
```shell
uv run module
```
## Development tools
```shell
# run ruff (formatter)
make format
# run mypy (type checker)
make mypy
# run tests (pytest)
make pytest
# run ruff (linter)
make ruff
```
See the file `Makefile` at the root of this directory for more tools.

View File

@@ -1,6 +0,0 @@
[mypy]
plugins = pydantic.mypy
strict = True
warn_unused_ignores = True
warn_redundant_casts = True
warn_return_any = True

View File

@@ -1,58 +0,0 @@
[project]
name = "fuzzforge-cargo-fuzzer"
version = "0.1.0"
description = "Runs continuous coverage-guided fuzzing on Rust targets using cargo-fuzz"
authors = []
readme = "README.md"
requires-python = ">=3.14"
dependencies = [
"fuzzforge-modules-sdk==0.0.1",
"pydantic==2.12.4",
"structlog==25.5.0",
]
[project.optional-dependencies]
lints = [
"bandit==1.8.6",
"mypy==1.18.2",
"ruff==0.14.4",
]
tests = [
"pytest==9.0.2",
]
[project.scripts]
module = "module.__main__:main"
[tool.uv.sources]
fuzzforge-modules-sdk = { workspace = true }
[tool.uv]
package = true
# FuzzForge module metadata for AI agent discovery
[tool.fuzzforge.module]
identifier = "fuzzforge-cargo-fuzzer"
suggested_predecessors = ["fuzzforge-harness-tester"]
continuous_mode = true
use_cases = [
"Run continuous coverage-guided fuzzing on Rust targets with libFuzzer",
"Execute cargo-fuzz on validated harnesses",
"Produce crash artifacts for analysis",
"Long-running fuzzing campaign"
]
common_inputs = [
"validated-harnesses",
"Cargo.toml",
"rust-source-code"
]
output_artifacts = [
"fuzzing_results.json",
"crashes/",
"results.json"
]
output_treatment = "Read fuzzing_results.json which contains: targets_fuzzed, total_crashes, total_executions, crashes_path, and results array with per-target crash info. Display summary of crashes found. The crashes/ directory contains crash inputs for downstream crash-analyzer."

View File

@@ -1,19 +0,0 @@
line-length = 120
[lint]
select = [ "ALL" ]
ignore = [
"COM812", # conflicts with the formatter
"D100", # ignoring missing docstrings in public modules
"D104", # ignoring missing docstrings in public packages
"D203", # conflicts with 'D211'
"D213", # conflicts with 'D212'
"TD002", # ignoring missing author in 'TODO' statements
"TD003", # ignoring missing issue link in 'TODO' statements
]
[lint.per-file-ignores]
"tests/*" = [
"PLR2004", # allowing comparisons using unnamed numerical constants in tests
"S101", # allowing 'assert' statements in tests
]

View File

@@ -1,19 +0,0 @@
from typing import TYPE_CHECKING
from fuzzforge_modules_sdk.api import logs
from module.mod import Module
if TYPE_CHECKING:
from fuzzforge_modules_sdk.api.modules.base import FuzzForgeModule
def main() -> None:
    """Entry point: configure logging, then instantiate and run the module."""
    logs.configure()
    module: FuzzForgeModule = Module()
    module.main()
if __name__ == "__main__":
    main()

View File

@@ -1,538 +0,0 @@
"""Cargo Fuzzer module for FuzzForge.
This module runs cargo-fuzz (libFuzzer) on validated Rust fuzz targets.
It takes a fuzz project with compiled harnesses and runs fuzzing for a
configurable duration, collecting crashes and statistics.
"""
from __future__ import annotations
import json
import os
import re
import shutil
import subprocess
import signal
import time
from pathlib import Path
from typing import TYPE_CHECKING
import structlog
from fuzzforge_modules_sdk.api.constants import PATH_TO_INPUTS, PATH_TO_OUTPUTS
from fuzzforge_modules_sdk.api.models import FuzzForgeModuleResults, FuzzForgeModuleStatus
from fuzzforge_modules_sdk.api.modules.base import FuzzForgeModule
from module.models import Input, Output, CrashInfo, FuzzingStats, TargetResult
from module.settings import Settings
if TYPE_CHECKING:
from fuzzforge_modules_sdk.api.models import FuzzForgeModuleResource
logger = structlog.get_logger()
class Module(FuzzForgeModule):
"""Cargo Fuzzer module - runs cargo-fuzz with libFuzzer on Rust targets."""
_settings: Settings | None
_fuzz_project_path: Path | None
_target_results: list[TargetResult]
_crashes_path: Path | None
    def __init__(self) -> None:
        """Initialize an instance of the class.

        Registers the module's name/version with the SDK base class and
        resets all per-run state.
        """
        name: str = "cargo-fuzzer"
        version: str = "0.1.0"
        FuzzForgeModule.__init__(self, name=name, version=version)
        # Per-run state, populated during _prepare()/_run().
        self._settings = None            # settings received in _prepare()
        self._fuzz_project_path = None   # writable copy of the fuzz project
        self._target_results = []        # one TargetResult per fuzzed target
        self._crashes_path = None        # output directory for crash artifacts
    @classmethod
    def _get_input_type(cls) -> type[Input]:
        """Return the model type describing this module's input payload."""
        return Input
    @classmethod
    def _get_output_type(cls) -> type[Output]:
        """Return the model type describing this module's output payload."""
        return Output
    def _prepare(self, settings: Settings) -> None:  # type: ignore[override]
        """Prepare the module with settings.

        Stores the settings for later use by ``_run``.

        :param settings: Module settings.
        """
        self._settings = settings
        logger.info("cargo-fuzzer preparing", settings=settings.model_dump() if settings else {})
    def _run(self, resources: list[FuzzForgeModuleResource]) -> FuzzForgeModuleResults:
        """Run the fuzzer.

        Sets up a writable copy of the fuzz project, discovers (and optionally
        filters) fuzz targets, then fuzzes each target. With ``max_duration``
        set to 0 the method loops over the targets indefinitely (continuous
        mode); otherwise the total budget is split evenly across targets and a
        single pass is made. Progress and events are emitted throughout so a
        supervisor can track metrics.

        :param resources: Input resources (fuzz project + source).
        :returns: Module execution result.
        """
        logger.info("cargo-fuzzer starting", resource_count=len(resources))
        # Emit initial progress so consumers see the run as started.
        self.emit_progress(0, status=FuzzForgeModuleStatus.INITIALIZING, message="Setting up fuzzing environment")
        self.emit_event("module_started", resource_count=len(resources))
        # Copy the fuzz project to a writable location (see _setup_environment).
        if not self._setup_environment(resources):
            self.emit_progress(100, status=FuzzForgeModuleStatus.FAILED, message="Failed to setup environment")
            return FuzzForgeModuleResults.FAILURE
        # Discover fuzz targets from fuzz_targets/*.rs.
        targets = self._get_fuzz_targets()
        if not targets:
            logger.error("no fuzz targets found")
            self.emit_progress(100, status=FuzzForgeModuleStatus.FAILED, message="No fuzz targets found")
            return FuzzForgeModuleResults.FAILURE
        # Restrict to the requested subset, if the settings name specific targets.
        if self._settings and self._settings.targets:
            requested = set(self._settings.targets)
            targets = [t for t in targets if t in requested]
            if not targets:
                logger.error("none of the requested targets found", requested=list(requested))
                self.emit_progress(100, status=FuzzForgeModuleStatus.FAILED, message="Requested targets not found")
                return FuzzForgeModuleResults.FAILURE
        logger.info("found fuzz targets", targets=targets)
        self.emit_event("targets_found", targets=targets, count=len(targets))
        # Create the crash-artifact output directory.
        self._crashes_path = PATH_TO_OUTPUTS / "crashes"
        self._crashes_path.mkdir(parents=True, exist_ok=True)
        # Work out the per-target time budget.
        # max_duration=0 means infinite/continuous mode.
        max_duration = self._settings.max_duration if self._settings else 60
        is_continuous = max_duration == 0
        if is_continuous:
            # Continuous mode: cycle through targets indefinitely,
            # 60 seconds per target before moving to the next.
            duration_per_target = 60
        else:
            # Split the total budget evenly (max() guards against empty lists).
            duration_per_target = max_duration // max(len(targets), 1)
        total_crashes = 0
        # In continuous mode, loop forever; otherwise loop once.
        round_num = 0
        while True:
            round_num += 1
            for i, target in enumerate(targets):
                if is_continuous:
                    progress_msg = f"Round {round_num}: Fuzzing {target}"
                else:
                    progress_msg = f"Fuzzing target {i+1}/{len(targets)}"
                # Continuous mode has no meaningful completion %, so pin it at 50.
                progress = int((i / len(targets)) * 100) if not is_continuous else 50
                self.emit_progress(
                    progress,
                    status=FuzzForgeModuleStatus.RUNNING,
                    message=progress_msg,
                    current_task=target,
                    metrics={
                        "targets_completed": i,
                        "total_targets": len(targets),
                        "crashes_found": total_crashes,
                        "round": round_num if is_continuous else 1,
                    }
                )
                self.emit_event("target_started", target=target, index=i, total=len(targets), round=round_num)
                result = self._fuzz_target(target, duration_per_target)
                self._target_results.append(result)
                total_crashes += len(result.crashes)
                # Emit target completion with its headline stats.
                self.emit_event(
                    "target_completed",
                    target=target,
                    crashes=len(result.crashes),
                    executions=result.stats.total_executions if result.stats else 0,
                    coverage=result.stats.coverage_edges if result.stats else 0,
                )
                logger.info("target completed",
                           target=target,
                           crashes=len(result.crashes),
                           execs=result.stats.total_executions if result.stats else 0)
            # Exit loop if not continuous mode.
            if not is_continuous:
                break
        # Persist aggregated results to the output directory.
        self._write_output()
        # Emit final progress with summary metrics.
        self.emit_progress(
            100,
            status=FuzzForgeModuleStatus.COMPLETED,
            message=f"Fuzzing completed. Found {total_crashes} crashes.",
            metrics={
                "targets_fuzzed": len(self._target_results),
                "total_crashes": total_crashes,
                "total_executions": sum(r.stats.total_executions for r in self._target_results if r.stats),
            }
        )
        self.emit_event("module_completed", total_crashes=total_crashes, targets_fuzzed=len(targets))
        logger.info("cargo-fuzzer completed",
                   targets=len(self._target_results),
                   total_crashes=total_crashes)
        return FuzzForgeModuleResults.SUCCESS
    def _cleanup(self, settings: Settings) -> None:  # type: ignore[override]
        """Clean up after execution.

        Intentionally a no-op: nothing is released here. NOTE(review): the
        /tmp/fuzz-work copy created in _setup_environment is presumably
        discarded with the container — confirm.

        :param settings: Module settings.
        """
        pass
def _setup_environment(self, resources: list[FuzzForgeModuleResource]) -> bool:
"""Setup the fuzzing environment.
:param resources: Input resources.
:returns: True if setup successful.
"""
import shutil
# Find fuzz project in resources
source_fuzz_project = None
source_project_root = None
for resource in resources:
path = Path(resource.path)
if path.is_dir():
# Check for fuzz subdirectory
fuzz_dir = path / "fuzz"
if fuzz_dir.is_dir() and (fuzz_dir / "Cargo.toml").exists():
source_fuzz_project = fuzz_dir
source_project_root = path
break
# Or direct fuzz project
if (path / "Cargo.toml").exists() and (path / "fuzz_targets").is_dir():
source_fuzz_project = path
source_project_root = path.parent
break
if source_fuzz_project is None:
logger.error("no fuzz project found in resources")
return False
# Copy project to writable location since /data/input is read-only
# and cargo-fuzz needs to write corpus, artifacts, and build cache
work_dir = Path("/tmp/fuzz-work")
if work_dir.exists():
shutil.rmtree(work_dir)
# Copy the entire project root
work_project = work_dir / source_project_root.name
shutil.copytree(source_project_root, work_project, dirs_exist_ok=True)
# Update fuzz_project_path to point to the copied location
relative_fuzz = source_fuzz_project.relative_to(source_project_root)
self._fuzz_project_path = work_project / relative_fuzz
logger.info("using fuzz project", path=str(self._fuzz_project_path))
return True
def _get_fuzz_targets(self) -> list[str]:
"""Get list of fuzz target names.
:returns: List of target names.
"""
if self._fuzz_project_path is None:
return []
targets = []
fuzz_targets_dir = self._fuzz_project_path / "fuzz_targets"
if fuzz_targets_dir.is_dir():
for rs_file in fuzz_targets_dir.glob("*.rs"):
targets.append(rs_file.stem)
return targets
def _fuzz_target(self, target: str, duration: int) -> TargetResult:
    """Run fuzzing on a single target.

    Launches ``cargo +nightly fuzz run`` in libFuzzer fork mode, streams
    its merged stdout/stderr to emit live metrics and crash events,
    enforces the time budget (plus a grace period), then parses final
    statistics and gathers crash artifacts.

    :param target: Name of the fuzz target.
    :param duration: Maximum duration in seconds; 0 means run indefinitely.
    :returns: Fuzzing result for this target.
    """
    logger.info("fuzzing target", target=target, duration=duration)
    crashes: list[CrashInfo] = []
    stats = FuzzingStats()
    if self._fuzz_project_path is None:
        # No project configured — return an empty result rather than raise.
        return TargetResult(target=target, crashes=crashes, stats=stats)
    # Create corpus directory for this target
    corpus_dir = self._fuzz_project_path / "corpus" / target
    corpus_dir.mkdir(parents=True, exist_ok=True)
    # Build the command; arguments after "--" are passed to libFuzzer.
    cmd = [
        "cargo", "+nightly", "fuzz", "run",
        target,
        "--",
    ]
    # Add time limit
    if duration > 0:
        cmd.append(f"-max_total_time={duration}")
    # Use fork mode to continue after crashes
    # This makes libFuzzer restart worker after crash instead of exiting
    cmd.append("-fork=1")
    cmd.append("-ignore_crashes=1")
    cmd.append("-print_final_stats=1")
    # Add jobs if specified
    if self._settings and self._settings.jobs > 1:
        cmd.extend([f"-jobs={self._settings.jobs}"])
    try:
        env = os.environ.copy()
        # Disable incremental compilation for this one-shot build.
        env["CARGO_INCREMENTAL"] = "0"
        process = subprocess.Popen(
            cmd,
            cwd=self._fuzz_project_path,
            stdout=subprocess.PIPE,
            stderr=subprocess.STDOUT,  # libFuzzer logs on stderr; merge streams
            text=True,
            env=env,
        )
        output_lines = []
        start_time = time.time()
        last_metrics_emit = 0.0
        current_execs = 0
        current_cov = 0
        current_exec_s = 0
        crash_count = 0
        # Read output with timeout (skip timeout check in infinite mode)
        while True:
            if process.poll() is not None:
                break
            elapsed = time.time() - start_time
            # Only enforce timeout if duration > 0 (not infinite mode)
            if duration > 0 and elapsed > duration + 30:  # Grace period
                logger.warning("fuzzer timeout, terminating", target=target)
                process.terminate()
                try:
                    process.wait(timeout=10)
                except subprocess.TimeoutExpired:
                    process.kill()
                break
            try:
                if process.stdout:
                    # NOTE: readline() blocks until the fuzzer emits a line,
                    # so metric emission cadence follows the fuzzer's output.
                    line = process.stdout.readline()
                    if line:
                        output_lines.append(line)
                        # Parse real-time metrics from libFuzzer output
                        # Example: "#12345 NEW cov: 100 ft: 50 corp: 25/1Kb exec/s: 1000"
                        exec_match = re.search(r"#(\d+)", line)
                        if exec_match:
                            current_execs = int(exec_match.group(1))
                        cov_match = re.search(r"cov:\s*(\d+)", line)
                        if cov_match:
                            current_cov = int(cov_match.group(1))
                        exec_s_match = re.search(r"exec/s:\s*(\d+)", line)
                        if exec_s_match:
                            current_exec_s = int(exec_s_match.group(1))
                        # Check for crash indicators
                        if "SUMMARY:" in line or "ERROR:" in line or "crash-" in line.lower():
                            crash_count += 1
                            self.emit_event(
                                "crash_detected",
                                target=target,
                                crash_number=crash_count,
                                line=line.strip(),
                            )
                        logger.debug("fuzzer output", line=line.strip())
                # Emit metrics periodically (every 2 seconds)
                if elapsed - last_metrics_emit >= 2.0:
                    last_metrics_emit = elapsed
                    self.emit_event(
                        "metrics",
                        target=target,
                        executions=current_execs,
                        coverage=current_cov,
                        exec_per_sec=current_exec_s,
                        crashes=crash_count,
                        elapsed_seconds=int(elapsed),
                        remaining_seconds=max(0, duration - int(elapsed)),
                    )
            except Exception:
                # Best-effort streaming: a parse/emit failure must not abort
                # the fuzzing session.
                pass
        # Parse statistics from output
        stats = self._parse_fuzzer_stats(output_lines)
        # Collect crashes
        crashes = self._collect_crashes(target)
        # Emit final event for this target if crashes were found
        if crashes:
            self.emit_event(
                "crashes_collected",
                target=target,
                count=len(crashes),
                paths=[c.file_path for c in crashes],
            )
    except FileNotFoundError:
        logger.error("cargo-fuzz not found, please install with: cargo install cargo-fuzz")
        stats.error = "cargo-fuzz not installed"
        self.emit_event("error", target=target, message="cargo-fuzz not installed")
    except Exception as e:
        logger.exception("fuzzing error", target=target, error=str(e))
        stats.error = str(e)
        self.emit_event("error", target=target, message=str(e))
    return TargetResult(target=target, crashes=crashes, stats=stats)
def _parse_fuzzer_stats(self, output_lines: list[str]) -> FuzzingStats:
    """Parse libFuzzer console output into final run statistics.

    libFuzzer prints a status line per event, e.g.::

        #12345 DONE cov: 100 ft: 50 corp: 25/1Kb exec/s: 1000

    Each counter appears many times over the run; the *last* occurrence
    reflects the final state, so the last match is kept for each metric.
    (``re.search`` on the joined output would return the first, i.e. the
    earliest snapshot, badly under-reporting totals.)

    :param output_lines: Lines of fuzzer output.
    :returns: Parsed statistics; fields stay at their defaults when a
        metric never appears in the output.
    """
    stats = FuzzingStats()
    full_output = "".join(output_lines)

    def _last_int(pattern: str) -> int | None:
        # Last match of ``pattern`` in the output as an int, or None.
        matches = re.findall(pattern, full_output)
        return int(matches[-1]) if matches else None

    execs = _last_int(r"#(\d+)")
    if execs is not None:
        stats.total_executions = execs
    cov = _last_int(r"cov:\s*(\d+)")
    if cov is not None:
        stats.coverage_edges = cov
    corp = _last_int(r"corp:\s*(\d+)")
    if corp is not None:
        stats.corpus_size = corp
    rate = _last_int(r"exec/s:\s*(\d+)")
    if rate is not None:
        stats.executions_per_second = rate
    return stats
def _collect_crashes(self, target: str) -> list[CrashInfo]:
    """Collect crash files produced by the fuzzer for one target.

    cargo-fuzz / libFuzzer write ``crash-*`` artifacts in several places
    depending on mode (fork mode in particular scatters them), so a list
    of candidate directories is searched recursively. Each unique crash is
    copied into the module's output crashes directory.

    :param target: Name of the fuzz target.
    :returns: List of crash info.
    """
    # Local import: shutil is not imported at module level (the sibling
    # _setup_environment also imports it locally), so referencing it here
    # bare would raise NameError at runtime.
    import shutil

    crashes: list[CrashInfo] = []
    seen_hashes: set[str] = set()
    if self._fuzz_project_path is None or self._crashes_path is None:
        return crashes
    # Check multiple possible crash locations:
    # 1. Standard artifacts directory (target-specific)
    # 2. Generic artifacts directory
    # 3. Fuzz project root (fork mode sometimes writes here)
    # 4. Project root (parent of fuzz directory)
    search_paths = [
        self._fuzz_project_path / "artifacts" / target,
        self._fuzz_project_path / "artifacts",
        self._fuzz_project_path,
        self._fuzz_project_path.parent,
    ]
    for search_dir in search_paths:
        if not search_dir.is_dir():
            continue
        # Use rglob to recursively find crash files
        for crash_file in search_dir.rglob("crash-*"):
            if not crash_file.is_file():
                continue
            # The search paths overlap (rglob from the project root revisits
            # the artifacts dirs), so dedupe by file name. libFuzzer names
            # crash files after a hash of the input — presumably making the
            # name a content signature; verify against libFuzzer docs.
            if crash_file.name in seen_hashes:
                continue
            seen_hashes.add(crash_file.name)
            # Copy crash to output
            output_crash = self._crashes_path / target
            output_crash.mkdir(parents=True, exist_ok=True)
            dest = output_crash / crash_file.name
            shutil.copy2(crash_file, dest)
            # Read crash input (only its size is recorded)
            crash_data = crash_file.read_bytes()
            crash_info = CrashInfo(
                file_path=str(dest),
                input_hash=crash_file.name,
                input_size=len(crash_data),
            )
            crashes.append(crash_info)
            logger.info("found crash", target=target, file=crash_file.name, source=str(search_dir))
    logger.info("crash collection complete", target=target, total_crashes=len(crashes))
    return crashes
def _write_output(self) -> None:
    """Serialize the aggregated fuzzing results to ``fuzzing_results.json``."""
    destination = PATH_TO_OUTPUTS / "fuzzing_results.json"
    destination.parent.mkdir(parents=True, exist_ok=True)

    # Per-target detail plus cross-target totals.
    per_target = [
        {
            "target": result.target,
            "crashes": [crash.model_dump() for crash in result.crashes],
            "stats": result.stats.model_dump() if result.stats else None,
        }
        for result in self._target_results
    ]
    crash_total = sum(len(result.crashes) for result in self._target_results)
    exec_total = sum(
        result.stats.total_executions for result in self._target_results if result.stats
    )

    payload = {
        "fuzz_project": str(self._fuzz_project_path),
        "targets_fuzzed": len(self._target_results),
        "total_crashes": crash_total,
        "total_executions": exec_total,
        "crashes_path": str(self._crashes_path),
        "results": per_target,
    }
    destination.write_text(json.dumps(payload, indent=2))
    logger.info("wrote fuzzing results", path=str(destination))

View File

@@ -1,88 +0,0 @@
"""Models for the cargo-fuzzer module."""
from pydantic import BaseModel, Field
from fuzzforge_modules_sdk.api.models import FuzzForgeModuleInputBase, FuzzForgeModuleOutputBase
from module.settings import Settings
class FuzzingStats(BaseModel):
    """Aggregate statistics from one libFuzzer run of a single target.

    All counters default to zero so an aborted run still serializes.
    """

    #: Total number of test case executions
    total_executions: int = 0
    #: Executions per second
    executions_per_second: int = 0
    #: Number of coverage edges discovered
    coverage_edges: int = 0
    #: Size of the corpus
    corpus_size: int = 0
    #: Any error message (empty string means no error)
    error: str = ""
class CrashInfo(BaseModel):
    """Information about a discovered crash artifact."""

    #: Path to the crash input file (copied into the module output)
    file_path: str
    #: Hash/name of the crash input (the libFuzzer artifact file name)
    input_hash: str
    #: Size of the crash input in bytes
    input_size: int = 0
    #: Crash type (if identified; empty when unclassified)
    crash_type: str = ""
    #: Stack trace (if available)
    stack_trace: str = ""
class TargetResult(BaseModel):
    """Result of fuzzing a single cargo-fuzz target."""

    #: Name of the fuzz target
    target: str
    #: List of crashes found
    crashes: list[CrashInfo] = Field(default_factory=list)
    #: Fuzzing statistics (zeroed when the run produced no output)
    stats: FuzzingStats = Field(default_factory=FuzzingStats)
class Input(FuzzForgeModuleInputBase[Settings]):
    """Input for the cargo-fuzzer module.

    Expects:
    - A fuzz project directory with validated harnesses
    - Optionally the source crate to link against
    """
class Output(FuzzForgeModuleOutputBase):
    """Output from the cargo-fuzzer module.

    Mirrors the structure written to ``fuzzing_results.json``.
    """

    #: Path to the fuzz project
    fuzz_project: str = ""
    #: Number of targets fuzzed
    targets_fuzzed: int = 0
    #: Total crashes found across all targets
    total_crashes: int = 0
    #: Total executions across all targets
    total_executions: int = 0
    #: Path to collected crash files
    crashes_path: str = ""
    #: Results per target
    results: list[TargetResult] = Field(default_factory=list)

View File

@@ -1,35 +0,0 @@
"""Settings for the cargo-fuzzer module."""
from typing import Optional
from pydantic import model_validator
from fuzzforge_modules_sdk.api.models import FuzzForgeModulesSettingsBase
class Settings(FuzzForgeModulesSettingsBase):
    """Settings for the cargo-fuzzer module."""

    #: Maximum fuzzing duration in seconds (total across all targets)
    #: Set to 0 for infinite/continuous mode
    max_duration: int = 60
    #: Number of parallel fuzzing jobs
    jobs: int = 1
    #: Maximum length of generated inputs
    max_len: int = 4096
    #: Whether to use AddressSanitizer
    use_asan: bool = True
    #: Specific targets to fuzz (empty = all targets)
    #: (mutable default is safe on a pydantic model: field defaults are
    #: copied per instance, unlike plain function defaults)
    targets: list[str] = []
    #: Single target to fuzz (convenience alias for targets)
    target: Optional[str] = None

    @model_validator(mode="after")
    def handle_single_target(self) -> "Settings":
        """Merge the ``target`` convenience alias into ``targets``.

        Runs after field validation; appends ``target`` to ``targets``
        unless it is already listed, so callers can use either form.
        """
        if self.target and self.target not in self.targets:
            self.targets.append(self.target)
        return self

View File

@@ -1,11 +0,0 @@
FROM localhost/fuzzforge-modules-sdk:0.1.0
# Module metadata is now read from pyproject.toml [tool.fuzzforge.module] section
COPY ./src /app/src
COPY ./pyproject.toml /app/pyproject.toml
# Remove workspace reference since we're using wheels
RUN sed -i '/\[tool\.uv\.sources\]/,/^$/d' /app/pyproject.toml
RUN uv sync --find-links /wheels

View File

@@ -1,45 +0,0 @@
PACKAGE=$(word 1, $(shell uv version))
VERSION=$(word 2, $(shell uv version))
PODMAN?=/usr/bin/podman
SOURCES=./src
TESTS=./tests
.PHONY: bandit build clean format mypy pytest ruff version
bandit:
uv run bandit --recursive $(SOURCES)
build:
$(PODMAN) build --file ./Dockerfile --no-cache --tag $(PACKAGE):$(VERSION)
save: build
$(PODMAN) save --format oci-archive --output /tmp/$(PACKAGE)-$(VERSION).oci $(PACKAGE):$(VERSION)
clean:
@find . -type d \( \
-name '*.egg-info' \
-o -name '.mypy_cache' \
-o -name '.pytest_cache' \
-o -name '.ruff_cache' \
-o -name '__pycache__' \
\) -printf 'removing directory %p\n' -exec rm -rf {} +
cloc:
cloc $(SOURCES)
format:
uv run ruff format $(SOURCES) $(TESTS)
mypy:
uv run mypy $(SOURCES)
pytest:
uv run pytest $(TESTS)
ruff:
uv run ruff check --fix $(SOURCES) $(TESTS)
version:
@echo '$(PACKAGE)@$(VERSION)'

View File

@@ -1,46 +0,0 @@
# FuzzForge Modules - FIXME
## Installation
### Python
```shell
# install the package (users)
uv sync
# install the package and all development dependencies (developers)
uv sync --all-extras
```
### Container
```shell
# build the image
make build
# run the container
mkdir -p "${PWD}/data" "${PWD}/data/input" "${PWD}/data/output"
echo '{"settings":{},"resources":[]}' > "${PWD}/data/input/input.json"
podman run --rm \
--volume "${PWD}/data:/data" \
'<name>:<version>' 'uv run module'
```
## Usage
```shell
uv run module
```
## Development tools
```shell
# run ruff (formatter)
make format
# run mypy (type checker)
make mypy
# run tests (pytest)
make pytest
# run ruff (linter)
make ruff
```
See the file `Makefile` at the root of this directory for more tools.

View File

@@ -1,6 +0,0 @@
[mypy]
plugins = pydantic.mypy
strict = True
warn_unused_ignores = True
warn_redundant_casts = True
warn_return_any = True

View File

@@ -1,58 +0,0 @@
[project]
name = "fuzzforge-crash-analyzer"
version = "0.1.0"
description = "Analyzes fuzzing crashes, deduplicates them, and generates security reports"
authors = []
readme = "README.md"
requires-python = ">=3.14"
dependencies = [
"fuzzforge-modules-sdk==0.0.1",
"pydantic==2.12.4",
"structlog==25.5.0",
"jinja2==3.1.6",
]
[project.optional-dependencies]
lints = [
"bandit==1.8.6",
"mypy==1.18.2",
"ruff==0.14.4",
]
tests = [
"pytest==9.0.2",
]
[project.scripts]
module = "module.__main__:main"
[tool.uv.sources]
fuzzforge-modules-sdk = { workspace = true }
[tool.uv]
package = true
# FuzzForge module metadata for AI agent discovery
[tool.fuzzforge.module]
identifier = "fuzzforge-crash-analyzer"
suggested_predecessors = ["fuzzforge-cargo-fuzzer"]
continuous_mode = false
use_cases = [
"Analyze Rust crash artifacts from fuzzing",
"Deduplicate crashes by stack trace signature",
"Triage crashes by severity (critical, high, medium, low)",
"Generate security vulnerability reports"
]
common_inputs = [
"crash-artifacts",
"stack-traces",
"rust-source-code"
]
output_artifacts = [
"crash_analysis.json",
"results.json"
]
output_treatment = "Read crash_analysis.json which contains: total_crashes, unique_crashes, duplicate_crashes, severity_summary (high/medium/low/unknown counts), and unique_analyses array with details per crash. Display a summary table of unique crashes by severity."

View File

@@ -1,19 +0,0 @@
line-length = 120
[lint]
select = [ "ALL" ]
ignore = [
"COM812", # conflicts with the formatter
"D100", # ignoring missing docstrings in public modules
"D104", # ignoring missing docstrings in public packages
"D203", # conflicts with 'D211'
"D213", # conflicts with 'D212'
"TD002", # ignoring missing author in 'TODO' statements
"TD003", # ignoring missing issue link in 'TODO' statements
]
[lint.per-file-ignores]
"tests/*" = [
"PLR2004", # allowing comparisons using unamed numerical constants in tests
"S101", # allowing 'assert' statements in tests
]

View File

@@ -1,19 +0,0 @@
from typing import TYPE_CHECKING
from fuzzforge_modules_sdk.api import logs
from module.mod import Module
if TYPE_CHECKING:
from fuzzforge_modules_sdk.api.modules.base import FuzzForgeModule
def main() -> None:
    """Entry point: configure structured logging, then run the module."""
    logs.configure()
    module: FuzzForgeModule = Module()
    module.main()


if __name__ == "__main__":
    main()

View File

@@ -1,340 +0,0 @@
"""Crash Analyzer module for FuzzForge.
This module analyzes crashes from cargo-fuzz, deduplicates them,
extracts stack traces, and triages them by severity.
"""
from __future__ import annotations
import hashlib
import json
import os
import re
import subprocess
from pathlib import Path
from typing import TYPE_CHECKING
import structlog
from fuzzforge_modules_sdk.api.constants import PATH_TO_INPUTS, PATH_TO_OUTPUTS
from fuzzforge_modules_sdk.api.models import FuzzForgeModuleResults
from fuzzforge_modules_sdk.api.modules.base import FuzzForgeModule
from module.models import Input, Output, CrashAnalysis, Severity
from module.settings import Settings
if TYPE_CHECKING:
from fuzzforge_modules_sdk.api.models import FuzzForgeModuleResource
logger = structlog.get_logger()
class Module(FuzzForgeModule):
    """Crash Analyzer module - analyzes and triages fuzzer crashes.

    Consumes the crash artifacts produced by the cargo-fuzzer module,
    optionally reproduces each crash to extract a stack trace and crash
    type, assigns a severity, and deduplicates crashes by signature.

    The behavior toggles declared in :class:`Settings`
    (``reproduce_crashes``, ``reproduce_timeout``, ``deduplicate``) are
    honored here; previously they were declared but never consulted.
    """

    #: Parsed module settings (populated in :meth:`_prepare`).
    _settings: Settings | None
    #: Analyses accumulated across all crash files.
    _analyses: list[CrashAnalysis]
    #: Path to the cargo-fuzz project, used to reproduce crashes.
    _fuzz_project_path: Path | None

    def __init__(self) -> None:
        """Initialize an instance of the class."""
        name: str = "crash-analyzer"
        version: str = "0.1.0"
        FuzzForgeModule.__init__(self, name=name, version=version)
        self._settings = None
        self._analyses = []
        self._fuzz_project_path = None

    @classmethod
    def _get_input_type(cls) -> type[Input]:
        """Return the input type."""
        return Input

    @classmethod
    def _get_output_type(cls) -> type[Output]:
        """Return the output type."""
        return Output

    def _prepare(self, settings: Settings) -> None:  # type: ignore[override]
        """Store settings for the run.

        :param settings: Module settings.
        """
        self._settings = settings
        logger.info("crash-analyzer preparing", settings=settings.model_dump() if settings else {})

    def _run(self, resources: list[FuzzForgeModuleResource]) -> FuzzForgeModuleResults:
        """Run the crash analyzer.

        :param resources: Input resources (fuzzing results + crashes).
        :returns: Module execution result.
        """
        logger.info("crash-analyzer starting", resource_count=len(resources))
        # Find crashes directory and fuzz project among the resources.
        crashes_path = None
        for resource in resources:
            path = Path(resource.path)
            if path.is_dir():
                if path.name == "crashes" or (path / "crashes").is_dir():
                    crashes_path = path if path.name == "crashes" else path / "crashes"
                if (path / "fuzz_targets").is_dir():
                    self._fuzz_project_path = path
                if (path / "fuzz" / "fuzz_targets").is_dir():
                    self._fuzz_project_path = path / "fuzz"
        if crashes_path is None:
            # Fall back to the crashes path recorded by the fuzzer module
            # in its fuzzing_results.json report.
            for resource in resources:
                path = Path(resource.path)
                if path.name == "fuzzing_results.json" and path.exists():
                    with open(path) as f:
                        data = json.load(f)
                    if "crashes_path" in data:
                        crashes_path = Path(data["crashes_path"])
                        break
        if crashes_path is None or not crashes_path.exists():
            # Nothing to analyze is not an error: emit an empty report.
            logger.warning("no crashes found to analyze")
            self._write_output()
            return FuzzForgeModuleResults.SUCCESS
        logger.info("analyzing crashes", path=str(crashes_path))
        # Analyze crashes per target (one sub-directory per fuzz target).
        for target_dir in crashes_path.iterdir():
            if target_dir.is_dir():
                target = target_dir.name
                for crash_file in target_dir.glob("crash-*"):
                    if crash_file.is_file():
                        analysis = self._analyze_crash(target, crash_file)
                        self._analyses.append(analysis)
        # Deduplicate crashes unless disabled via settings.
        if self._settings is None or self._settings.deduplicate:
            self._deduplicate_crashes()
        # Write output
        self._write_output()
        unique_count = sum(1 for a in self._analyses if not a.is_duplicate)
        logger.info("crash-analyzer completed",
                    total=len(self._analyses),
                    unique=unique_count)
        return FuzzForgeModuleResults.SUCCESS

    def _cleanup(self, settings: Settings) -> None:  # type: ignore[override]
        """Clean up after execution. No temporary state is held.

        :param settings: Module settings (unused).
        """

    def _analyze_crash(self, target: str, crash_file: Path) -> CrashAnalysis:
        """Analyze a single crash.

        Reproduction (to obtain a stack trace, crash type and severity) is
        only attempted when a fuzz project is available and the
        ``reproduce_crashes`` setting allows it.

        :param target: Name of the fuzz target.
        :param crash_file: Path to the crash input file.
        :returns: Crash analysis result.
        """
        logger.debug("analyzing crash", target=target, file=crash_file.name)
        # Read crash input; a short content hash identifies the input.
        crash_data = crash_file.read_bytes()
        input_hash = hashlib.sha256(crash_data).hexdigest()[:16]
        stack_trace = ""
        crash_type = "unknown"
        severity = Severity.UNKNOWN
        reproduce = self._settings is None or self._settings.reproduce_crashes
        if self._fuzz_project_path and reproduce:
            stack_trace, crash_type = self._reproduce_crash(target, crash_file)
            severity = self._determine_severity(crash_type, stack_trace)
        return CrashAnalysis(
            target=target,
            input_file=str(crash_file),
            input_hash=input_hash,
            input_size=len(crash_data),
            crash_type=crash_type,
            severity=severity,
            stack_trace=stack_trace,
            is_duplicate=False,
        )

    def _reproduce_crash(self, target: str, crash_file: Path) -> tuple[str, str]:
        """Reproduce a crash to get a stack trace and crash type.

        Re-runs the target once on the crashing input with backtraces
        enabled, then classifies the combined output by sanitizer/panic
        keywords. The per-crash timeout comes from the
        ``reproduce_timeout`` setting (default 30s).

        :param target: Name of the fuzz target.
        :param crash_file: Path to the crash input file.
        :returns: Tuple of (stack_trace, crash_type).
        """
        if self._fuzz_project_path is None:
            return "", "unknown"
        timeout = self._settings.reproduce_timeout if self._settings else 30
        try:
            env = os.environ.copy()
            env["RUST_BACKTRACE"] = "1"
            result = subprocess.run(
                [
                    "cargo", "+nightly", "fuzz", "run",
                    target,
                    str(crash_file),
                    "--",
                    "-runs=1",
                ],
                cwd=self._fuzz_project_path,
                capture_output=True,
                text=True,
                timeout=timeout,
                env=env,
            )
            output = result.stdout + result.stderr
            # Classify the crash by the first matching keyword; order
            # matters (memory-safety findings take precedence over panics).
            crash_type = "unknown"
            if "heap-buffer-overflow" in output.lower():
                crash_type = "heap-buffer-overflow"
            elif "stack-buffer-overflow" in output.lower():
                crash_type = "stack-buffer-overflow"
            elif "heap-use-after-free" in output.lower():
                crash_type = "use-after-free"
            elif "null" in output.lower() and "deref" in output.lower():
                crash_type = "null-pointer-dereference"
            elif "panic" in output.lower():
                crash_type = "panic"
            elif "assertion" in output.lower():
                crash_type = "assertion-failure"
            elif "timeout" in output.lower():
                crash_type = "timeout"
            elif "out of memory" in output.lower() or "oom" in output.lower():
                crash_type = "out-of-memory"
            # Extract the stack trace starting at the first SUMMARY/ERROR
            # marker, capped to keep reports readable.
            stack_lines = []
            in_stack = False
            for line in output.splitlines():
                if "SUMMARY:" in line or "ERROR:" in line:
                    in_stack = True
                if in_stack:
                    stack_lines.append(line)
                    if len(stack_lines) > 50:  # Limit stack trace length
                        break
            return "\n".join(stack_lines), crash_type
        except subprocess.TimeoutExpired:
            return "", "timeout"
        except Exception as e:
            logger.warning("failed to reproduce crash", error=str(e))
            return "", "unknown"

    def _determine_severity(self, crash_type: str, stack_trace: str) -> Severity:
        """Determine crash severity based on type.

        :param crash_type: Type of the crash.
        :param stack_trace: Stack trace string (currently unused by the
            classification, kept for future heuristics).
        :returns: Severity level.
        """
        high_severity = [
            "heap-buffer-overflow",
            "stack-buffer-overflow",
            "use-after-free",
            "double-free",
        ]
        medium_severity = [
            "null-pointer-dereference",
            "out-of-memory",
            "integer-overflow",
        ]
        low_severity = [
            "panic",
            "assertion-failure",
            "timeout",
        ]
        if crash_type in high_severity:
            return Severity.HIGH
        if crash_type in medium_severity:
            return Severity.MEDIUM
        if crash_type in low_severity:
            return Severity.LOW
        return Severity.UNKNOWN

    def _deduplicate_crashes(self) -> None:
        """Mark duplicate crashes based on stack trace similarity."""
        seen_signatures: set[str] = set()
        for analysis in self._analyses:
            # Create a signature from crash type and key stack frames;
            # record it on the analysis so it appears in the report.
            signature = self._create_signature(analysis)
            analysis.signature = signature
            if signature in seen_signatures:
                analysis.is_duplicate = True
            else:
                seen_signatures.add(signature)

    def _create_signature(self, analysis: CrashAnalysis) -> str:
        """Create a deduplication signature for a crash.

        :param analysis: Crash analysis.
        :returns: Signature string.
        """
        # Use target + crash type + first few significant stack frames.
        parts = [analysis.target, analysis.crash_type]
        # Extract function names from stack trace frames ("... in <func> ...").
        func_pattern = re.compile(r"in (\S+)")
        funcs = func_pattern.findall(analysis.stack_trace)
        # Use first 3 unique non-stdlib functions to keep signatures stable
        # across minor frame differences.
        seen = set()
        for func in funcs:
            if func not in seen and not func.startswith("std::"):
                parts.append(func)
                seen.add(func)
                if len(seen) >= 3:
                    break
        return "|".join(parts)

    def _write_output(self) -> None:
        """Write the analysis results to ``crash_analysis.json``."""
        output_path = PATH_TO_OUTPUTS / "crash_analysis.json"
        output_path.parent.mkdir(parents=True, exist_ok=True)
        unique = [a for a in self._analyses if not a.is_duplicate]
        duplicates = [a for a in self._analyses if a.is_duplicate]
        # Group by severity
        by_severity = {
            "high": [a for a in unique if a.severity == Severity.HIGH],
            "medium": [a for a in unique if a.severity == Severity.MEDIUM],
            "low": [a for a in unique if a.severity == Severity.LOW],
            "unknown": [a for a in unique if a.severity == Severity.UNKNOWN],
        }
        output_data = {
            "total_crashes": len(self._analyses),
            "unique_crashes": len(unique),
            "duplicate_crashes": len(duplicates),
            "severity_summary": {k: len(v) for k, v in by_severity.items()},
            "unique_analyses": [a.model_dump() for a in unique],
            "duplicate_analyses": [a.model_dump() for a in duplicates],
        }
        output_path.write_text(json.dumps(output_data, indent=2, default=str))
        logger.info("wrote crash analysis", path=str(output_path))

View File

@@ -1,79 +0,0 @@
"""Models for the crash-analyzer module."""
from enum import Enum
from pydantic import BaseModel, Field
from fuzzforge_modules_sdk.api.models import FuzzForgeModuleInputBase, FuzzForgeModuleOutputBase
from module.settings import Settings
class Severity(str, Enum):
    """Severity level assigned to a crash during triage.

    Inherits ``str`` so values serialize directly into JSON reports.
    """

    HIGH = "high"
    MEDIUM = "medium"
    LOW = "low"
    UNKNOWN = "unknown"
class CrashAnalysis(BaseModel):
    """Analysis of a single crash artifact."""

    #: Name of the fuzz target
    target: str
    #: Path to the input file that caused the crash
    input_file: str
    #: Hash of the input for identification (truncated SHA-256)
    input_hash: str
    #: Size of the input in bytes
    input_size: int = 0
    #: Type of crash (e.g., "heap-buffer-overflow", "panic")
    crash_type: str = "unknown"
    #: Severity level
    severity: Severity = Severity.UNKNOWN
    #: Stack trace from reproducing the crash (empty if not reproduced)
    stack_trace: str = ""
    #: Whether this crash is a duplicate of another
    is_duplicate: bool = False
    #: Signature for deduplication
    signature: str = ""
class Input(FuzzForgeModuleInputBase[Settings]):
    """Input for the crash-analyzer module.

    Expects:
    - Crashes directory from cargo-fuzzer
    - Optionally the fuzz project for reproduction
    """
class Output(FuzzForgeModuleOutputBase):
    """Output from the crash-analyzer module.

    Mirrors the structure written to ``crash_analysis.json``.
    """

    #: Total number of crashes analyzed
    total_crashes: int = 0
    #: Number of unique crashes (after deduplication)
    unique_crashes: int = 0
    #: Number of duplicate crashes
    duplicate_crashes: int = 0
    #: Summary by severity (severity name -> count of unique crashes)
    severity_summary: dict[str, int] = Field(default_factory=dict)
    #: Unique crash analyses
    unique_analyses: list[CrashAnalysis] = Field(default_factory=list)
    #: Duplicate crash analyses
    duplicate_analyses: list[CrashAnalysis] = Field(default_factory=list)

View File

@@ -1,16 +0,0 @@
"""Settings for the crash-analyzer module."""
from fuzzforge_modules_sdk.api.models import FuzzForgeModulesSettingsBase
class Settings(FuzzForgeModulesSettingsBase):
    """Settings for the crash-analyzer module.

    NOTE(review): confirm the module implementation actually consults
    these flags — they are declared here as the configuration contract.
    """

    #: Whether to reproduce crashes for stack traces
    reproduce_crashes: bool = True
    #: Timeout for reproducing each crash (seconds)
    reproduce_timeout: int = 30
    #: Whether to deduplicate crashes
    deduplicate: bool = True

View File

@@ -1,12 +0,0 @@
FROM localhost/fuzzforge-modules-sdk:0.1.0
# Module metadata is now read from pyproject.toml [tool.fuzzforge.module] section
# See MODULE_METADATA.md for documentation on configuring metadata
COPY ./src /app/src
COPY ./pyproject.toml /app/pyproject.toml
# Remove workspace reference since we're using wheels
RUN sed -i '/\[tool\.uv\.sources\]/,/^$/d' /app/pyproject.toml
RUN uv sync --find-links /wheels

View File

@@ -1,45 +0,0 @@
PACKAGE=$(word 1, $(shell uv version))
VERSION=$(word 2, $(shell uv version))
PODMAN?=/usr/bin/podman
SOURCES=./src
TESTS=./tests
.PHONY: bandit build clean format mypy pytest ruff version
bandit:
uv run bandit --recursive $(SOURCES)
build:
$(PODMAN) build --file ./Dockerfile --no-cache --tag $(PACKAGE):$(VERSION)
save: build
$(PODMAN) save --format oci-archive --output /tmp/$(PACKAGE)-$(VERSION).oci $(PACKAGE):$(VERSION)
clean:
@find . -type d \( \
-name '*.egg-info' \
-o -name '.mypy_cache' \
-o -name '.pytest_cache' \
-o -name '.ruff_cache' \
-o -name '__pycache__' \
\) -printf 'removing directory %p\n' -exec rm -rf {} +
cloc:
cloc $(SOURCES)
format:
uv run ruff format $(SOURCES) $(TESTS)
mypy:
uv run mypy $(SOURCES)
pytest:
uv run pytest $(TESTS)
ruff:
uv run ruff check --fix $(SOURCES) $(TESTS)
version:
@echo '$(PACKAGE)@$(VERSION)'

View File

@@ -1,46 +0,0 @@
# FuzzForge Modules - FIXME
## Installation
### Python
```shell
# install the package (users)
uv sync
# install the package and all development dependencies (developers)
uv sync --all-extras
```
### Container
```shell
# build the image
make build
# run the container
mkdir -p "${PWD}/data" "${PWD}/data/input" "${PWD}/data/output"
echo '{"settings":{},"resources":[]}' > "${PWD}/data/input/input.json"
podman run --rm \
--volume "${PWD}/data:/data" \
'<name>:<version>' 'uv run module'
```
## Usage
```shell
uv run module
```
## Development tools
```shell
# run ruff (formatter)
make format
# run mypy (type checker)
make mypy
# run tests (pytest)
make pytest
# run ruff (linter)
make ruff
```
See the file `Makefile` at the root of this directory for more tools.

View File

@@ -1,6 +0,0 @@
[mypy]
plugins = pydantic.mypy
strict = True
warn_unused_ignores = True
warn_redundant_casts = True
warn_return_any = True

View File

@@ -1,59 +0,0 @@
[project]
name = "fuzzforge-module-template"
version = "0.1.0"
description = "FIXME: Add module description"
authors = []
readme = "README.md"
requires-python = ">=3.14"
dependencies = [
"fuzzforge-modules-sdk==0.0.1",
"pydantic==2.12.4",
"structlog==25.5.0",
]
[project.optional-dependencies]
lints = [
"bandit==1.8.6",
"mypy==1.18.2",
"ruff==0.14.4",
]
tests = [
"pytest==9.0.2",
]
[project.scripts]
module = "module.__main__:main"
[tool.uv.sources]
fuzzforge-modules-sdk = { workspace = true }
[tool.uv]
package = true
# FuzzForge module metadata for AI agent discovery
[tool.fuzzforge.module]
# REQUIRED: Unique module identifier (should match Docker image name)
identifier = "fuzzforge-module-template"
# Optional: List of module identifiers that should run before this one
suggested_predecessors = []
# Optional: Whether this module supports continuous/background execution
continuous_mode = false
# REQUIRED: Use cases help AI agents understand when to use this module
# Include language/target info here (e.g., "Analyze Rust crate...")
use_cases = [
"FIXME: Describe what this module does",
"FIXME: Describe typical usage scenario"
]
# REQUIRED: What inputs the module expects
common_inputs = [
"FIXME: List required input files or artifacts"
]
# REQUIRED: What outputs the module produces
output_artifacts = [
"FIXME: List output files produced"
]

View File

@@ -1,19 +0,0 @@
line-length = 120
[lint]
select = [ "ALL" ]
ignore = [
"COM812", # conflicts with the formatter
"D100", # ignoring missing docstrings in public modules
"D104", # ignoring missing docstrings in public packages
"D203", # conflicts with 'D211'
"D213", # conflicts with 'D212'
"TD002", # ignoring missing author in 'TODO' statements
"TD003", # ignoring missing issue link in 'TODO' statements
]
[lint.per-file-ignores]
"tests/*" = [
"PLR2004", # allowing comparisons using unamed numerical constants in tests
"S101", # allowing 'assert' statements in tests
]

View File

@@ -1,19 +0,0 @@
from typing import TYPE_CHECKING
from fuzzforge_modules_sdk.api import logs
from module.mod import Module
if TYPE_CHECKING:
from fuzzforge_modules_sdk.api.modules.base import FuzzForgeModule
def main() -> None:
    """Entry point: configure structured logging, then run the module."""
    logs.configure()
    module: FuzzForgeModule = Module()
    module.main()


if __name__ == "__main__":
    main()

View File

@@ -1,54 +0,0 @@
from __future__ import annotations
from typing import TYPE_CHECKING
from fuzzforge_modules_sdk.api.models import FuzzForgeModuleResults
from fuzzforge_modules_sdk.api.modules.base import FuzzForgeModule
from module.models import Input, Output
if TYPE_CHECKING:
from fuzzforge_modules_sdk.api.models import FuzzForgeModuleResource, FuzzForgeModulesSettingsType
class Module(FuzzForgeModule):
    """Skeleton FuzzForge module — fill in the FIXME markers and hooks."""

    def __init__(self) -> None:
        """Initialize an instance of the class."""
        name: str = "FIXME"
        version: str = "FIXME"
        FuzzForgeModule.__init__(self, name=name, version=version)

    @classmethod
    def _get_input_type(cls) -> type[Input]:
        """Return the input model type for this module."""
        return Input

    @classmethod
    def _get_output_type(cls) -> type[Output]:
        """Return the output model type for this module."""
        return Output

    def _prepare(self, settings: FuzzForgeModulesSettingsType) -> None:
        """Hook run before execution — set up state from settings here.

        :param settings: Module settings.
        """

    def _run(self, resources: list[FuzzForgeModuleResource]) -> FuzzForgeModuleResults:  # noqa: ARG002
        """Hook containing the module's main work.

        :param resources: Input resources to process.
        :returns: Module execution result.
        """
        return FuzzForgeModuleResults.SUCCESS

    def _cleanup(self, settings: FuzzForgeModulesSettingsType) -> None:
        """Hook run after execution — release resources here.

        :param settings: Module settings.
        """

View File

@@ -1,11 +0,0 @@
from fuzzforge_modules_sdk.api.models import FuzzForgeModuleInputBase, FuzzForgeModuleOutputBase
from module.settings import Settings
class Input(FuzzForgeModuleInputBase[Settings]):
    """Input model for this module — document expected resources here."""


class Output(FuzzForgeModuleOutputBase):
    """Output model for this module — declare produced fields here."""

View File

@@ -1,7 +0,0 @@
from fuzzforge_modules_sdk.api.models import FuzzForgeModulesSettingsBase
class Settings(FuzzForgeModulesSettingsBase):
    """Settings for this module — declare configurable attributes here."""

    # Here goes your attributes

View File

@@ -1,30 +0,0 @@
# FuzzForge Modules SDK - Base image for all modules
#
# This image provides:
# - Python 3.14 with uv package manager
# - Pre-built wheels for common dependencies
# - Standard module directory structure
FROM ghcr.io/astral-sh/uv:python3.14-bookworm-slim
# Install system dependencies commonly needed by modules
RUN apt-get update && apt-get install -y --no-install-recommends \
build-essential \
&& rm -rf /var/lib/apt/lists/*
# Set up application directory structure
WORKDIR /app
# Create FuzzForge standard directories
RUN mkdir -p /fuzzforge/input /fuzzforge/output
# Copy wheels directory (built by parent Makefile)
COPY .wheels /wheels
# Set up uv for the container
ENV UV_SYSTEM_PYTHON=1
ENV UV_COMPILE_BYTECODE=1
ENV UV_LINK_MODE=copy
# Default entrypoint - modules override this
ENTRYPOINT ["uv", "run", "module"]

View File

@@ -1,39 +0,0 @@
PACKAGE=$(word 1, $(shell uv version))
VERSION=$(word 2, $(shell uv version))
SOURCES=./src
TESTS=./tests
FUZZFORGE_MODULE_TEMPLATE=$(PWD)/src/fuzzforge_modules_sdk/templates/module
.PHONY: bandit clean cloc format mypy pytest ruff version
bandit:
uv run bandit --recursive $(SOURCES)
clean:
@find . -type d \( \
-name '*.egg-info' \
-o -name '.mypy_cache' \
-o -name '.pytest_cache' \
-o -name '.ruff_cache' \
-o -name '__pycache__' \
\) -printf 'removing directory %p\n' -exec rm -rf {} +
cloc:
cloc $(SOURCES)
format:
uv run ruff format $(SOURCES) $(TESTS)
mypy:
uv run mypy $(SOURCES)
pytest:
uv run pytest $(TESTS)
ruff:
uv run ruff check --fix $(SOURCES) $(TESTS)
version:
@echo '$(PACKAGE)@$(VERSION)'

View File

@@ -1,67 +0,0 @@
# FuzzForge Modules SDK
...
# Setup
- start the podman user socket
```shell
systemctl --user start podman.socket
```
NB: you can also automatically start it at boot
```shell
systemctl --user enable --now podman.socket
```
## HACK : fix missing `fuzzforge-modules-sdk`
- if you have this error when using some fuzzforge-modules-sdk deps :
```shell
make format
uv run ruff format ./src ./tests
× No solution found when resolving dependencies:
╰─▶ Because fuzzforge-modules-sdk was not found in the package registry and your project depends on fuzzforge-modules-sdk==0.0.1, we can
conclude that your project's requirements are unsatisfiable.
And because your project requires opengrep[lints], we can conclude that your project's requirements are unsatisfiable.
make: *** [Makefile:30: format] Error 1
```
- build a wheel package of fuzzforge-modules-sdk
```shell
cd fuzzforge_ng/fuzzforge-modules/fuzzforge-modules-sdk
uv build
```
- then inside your module project, install it
```shell
cd fuzzforge_ng_modules/mymodule
uv sync --all-extras --find-links ../../fuzzforge_ng/dist/
```
# Usage
## Prepare
- enter venv (or use uv run)
```shell
source .venv/bin/activate
```
- create a new module
```shell
fuzzforge-modules-sdk new module --name my_new_module --directory ../fuzzforge_ng_modules/
```
- build the base image
```shell
fuzzforge-modules-sdk build image
```

View File

@@ -1,7 +0,0 @@
[mypy]
exclude = ^src/fuzzforge_modules_sdk/templates/.*
plugins = pydantic.mypy
strict = True
warn_unused_ignores = True
warn_redundant_casts = True
warn_return_any = True

View File

@@ -1,32 +0,0 @@
[project]
name = "fuzzforge-modules-sdk"
version = "0.0.1"
description = "Software development kit (SDK) for FuzzForge's modules."
authors = []
readme = "README.md"
requires-python = ">=3.14"
dependencies = [
"podman==5.6.0",
"pydantic==2.12.4",
"structlog==25.5.0",
"tomlkit==0.13.3",
]
[project.optional-dependencies]
lints = [
"bandit==1.8.6",
"mypy==1.18.2",
"ruff==0.14.4",
]
tests = [
"pytest==9.0.2",
]
[project.scripts]
fuzzforge-modules-sdk = "fuzzforge_modules_sdk._cli.main:main"
[tool.setuptools.package-data]
fuzzforge_modules_sdk = [
"assets/**/*",
"templates/**/*",
]

View File

@@ -1,19 +0,0 @@
line-length = 120
[lint]
select = [ "ALL" ]
ignore = [
"COM812", # conflicts with the formatter
"D100", # ignoring missing docstrings in public modules
"D104", # ignoring missing docstrings in public packages
"D203", # conflicts with 'D211'
"D213", # conflicts with 'D212'
"TD002", # ignoring missing author in 'TODO' statements
"TD003", # ignoring missing issue link in 'TODO' statements
]
[lint.per-file-ignores]
"tests/*" = [
"PLR2004", # allowing comparisons using unnamed numerical constants in tests
"S101", # allowing 'assert' statements in tests
]

View File

@@ -1,66 +0,0 @@
from importlib.resources import files
from pathlib import Path
from shutil import copyfile, copytree
from tempfile import TemporaryDirectory
from typing import TYPE_CHECKING, Literal
import os
from podman import PodmanClient
from tomlkit import TOMLDocument, parse
if TYPE_CHECKING:
from importlib.resources.abc import Traversable
def _get_default_podman_socket() -> str:
"""Get the default Podman socket path for the current user."""
uid = os.getuid()
return f"unix:///run/user/{uid}/podman/podman.sock"
PATH_TO_SOURCES: Path = Path(__file__).parent.parent
def _build_podman_image(directory: Path, tag: str, socket: str | None = None) -> None:
    """Build a container image from *directory* with Podman.

    :param directory: The build context; must contain a ``Dockerfile``.
    :param tag: The ``name:version`` tag to apply to the built image.
    :param socket: The Podman socket URL; defaults to the current user's
        rootless socket.
    """
    if socket is None:
        socket = _get_default_podman_socket()
    with PodmanClient(base_url=socket) as client:
        client.images.build(
            dockerfile="Dockerfile",
            nocache=True,
            # NOTE(review): podman-py documents 'path' as a str — confirm that a
            # pathlib.Path is accepted here.
            path=directory,
            tag=tag,
        )
def build_base_image(engine: Literal["podman"], socket: str | None = None) -> None:
with TemporaryDirectory() as directory:
path_to_assets: Traversable = files("fuzzforge_modules_sdk").joinpath("assets")
copyfile(
src=str(path_to_assets.joinpath("Dockerfile")),
dst=Path(directory).joinpath("Dockerfile"),
)
copyfile(
src=str(path_to_assets.joinpath("pyproject.toml")),
dst=Path(directory).joinpath("pyproject.toml"),
)
copytree(src=str(PATH_TO_SOURCES), dst=Path(directory).joinpath("src").joinpath(PATH_TO_SOURCES.name))
# update the file 'pyproject.toml'
path: Path = Path(directory).joinpath("pyproject.toml")
data: TOMLDocument = parse(path.read_text())
name: str = data["project"]["name"] # type: ignore[assignment, index]
version: str = data["project"]["version"] # type: ignore[assignment, index]
tag: str = f"{name}:{version}"
match engine:
case "podman":
_build_podman_image(
directory=Path(directory),
socket=socket,
tag=tag,
)
case _:
message: str = f"unsupported engine '{engine}'"
raise Exception(message) # noqa: TRY002

View File

@@ -1,30 +0,0 @@
from __future__ import annotations
from importlib.resources import files
from shutil import copytree, ignore_patterns
from typing import TYPE_CHECKING
from tomlkit import dumps, parse
if TYPE_CHECKING:
from importlib.resources.abc import Traversable
from pathlib import Path
from tomlkit import TOMLDocument
def create_new_module(name: str, directory: Path) -> None:
    """Generate a new module project from the packaged template.

    The packaged ``fuzzforge-module-template`` tree is copied to
    ``directory/name`` and its ``pyproject.toml`` is rewritten: the project
    name is set to *name* and the workspace-only ``[tool.uv.sources]`` table
    is dropped.

    :param name: The module name; must be a single path component.
    :param directory: The directory the new module should be created into.
    :raises ValueError: If *name* is empty, a dot entry, or contains a path
        separator (which could otherwise escape *directory*).
    """
    # Sanitize the name so the destination stays a direct child of 'directory'
    # (resolves the former 'TODO: sanitize path').
    if not name or name in {".", ".."} or "/" in name or "\\" in name:
        message = f"invalid module name '{name}'"
        raise ValueError(message)
    source: Traversable = files("fuzzforge_modules_sdk").joinpath("templates").joinpath("fuzzforge-module-template")
    destination: Path = directory.joinpath(name)
    copytree(
        src=str(source),
        dst=destination,
        ignore=ignore_patterns("__pycache__", "*.egg-info", "*.pyc", ".mypy_cache", ".ruff_cache", ".venv"),
    )
    # Rewrite the generated 'pyproject.toml' for a standalone project.
    path: Path = destination.joinpath("pyproject.toml")
    data: TOMLDocument = parse(path.read_text())
    data["project"]["name"] = name  # type: ignore[index]
    del data["tool"]["uv"]["sources"]  # type: ignore[index, union-attr]
    path.write_text(dumps(data))

View File

@@ -1,71 +0,0 @@
from argparse import ArgumentParser
from pathlib import Path
from fuzzforge_modules_sdk._cli.build_base_image import build_base_image
from fuzzforge_modules_sdk._cli.create_new_module import create_new_module
def create_parser() -> ArgumentParser:
    """Build the argument parser for the 'fuzzforge-modules-sdk' CLI.

    :returns: The fully configured top-level parser.
    """
    root = ArgumentParser(prog="fuzzforge-modules-sdk", description="Utilities for the Fuzzforge Modules SDK.")
    commands = root.add_subparsers(required=True)

    # fuzzforge-modules-sdk build image ...
    build_command = commands.add_parser(name="build")
    build_subcommands = build_command.add_subparsers(required=True)
    image_command = build_subcommands.add_parser(name="image", help="Build the image.")
    image_command.add_argument("--engine", default="podman")
    image_command.add_argument("--socket", default=None)
    image_command.set_defaults(function_to_execute=build_base_image)

    # fuzzforge-modules-sdk new module ...
    new_command = commands.add_parser(name="new")
    new_subcommands = new_command.add_subparsers(required=True)
    module_command = new_subcommands.add_parser(
        name="module",
        help="Generate the boilerplate required to create a new module.",
    )
    module_command.add_argument(
        "--name",
        required=True,
        help="The name of the module to create.",
    )
    module_command.add_argument(
        "--directory",
        default=".",
        type=Path,
        help="The directory the new module should be created into (defaults to current working directory).",
    )
    module_command.set_defaults(function_to_execute=create_new_module)

    return root
def main() -> None:
    """Entry point for the command-line interface."""
    namespace = create_parser().parse_args()
    kwargs = vars(namespace)
    # Pop the dispatch target so only real CLI options are forwarded.
    function_to_execute = kwargs.pop("function_to_execute")
    function_to_execute(**kwargs)

View File

@@ -1,9 +0,0 @@
from pathlib import Path
# Root of the standardized FuzzForge data layout inside a module container.
PATH_TO_DATA: Path = Path("/fuzzforge")
# Directory holding the inputs provided by the orchestrator.
PATH_TO_INPUTS: Path = PATH_TO_DATA.joinpath("input")
# JSON-serialized module input (settings + resources), read at startup.
PATH_TO_INPUT: Path = PATH_TO_INPUTS.joinpath("input.json")
# Directory holding everything the module produces.
PATH_TO_OUTPUTS: Path = PATH_TO_DATA.joinpath("output")
# Artifacts registered by the module during its run.
PATH_TO_ARTIFACTS: Path = PATH_TO_OUTPUTS.joinpath("artifacts")
# JSON-serialized module output, written at the end of the run.
PATH_TO_RESULTS: Path = PATH_TO_OUTPUTS.joinpath("results.json")
# Structured JSONL log file written by the logging configuration.
PATH_TO_LOGS: Path = PATH_TO_OUTPUTS.joinpath("logs.jsonl")

View File

@@ -1,2 +0,0 @@
class FuzzForgeModuleError(Exception):
    """Exception raised for FuzzForge modules SDK errors (e.g. invalid artifact registration)."""

View File

@@ -1,43 +0,0 @@
import logging
import sys
import structlog
from fuzzforge_modules_sdk.api.constants import PATH_TO_LOGS
class Formatter(logging.Formatter):
    """A :class:`logging.Formatter` that suppresses exception tracebacks.

    Tracebacks are rendered separately by structlog's ``dict_tracebacks``
    processor (see :func:`configure`), so the stdlib formatter must not
    append its own multi-line traceback text to the formatted record.
    """

    def format(self, record: logging.LogRecord) -> str:
        """Format *record* without any exception or stack-trace trailer.

        :param record: The log record to format.
        :returns: The formatted message, with traceback text stripped.
        """
        # Clear the raw exception info AND the cached rendered traceback:
        # logging caches the rendered text in 'exc_text', so clearing
        # 'exc_info' alone would not suppress an already-rendered traceback.
        record.exc_info = None
        record.exc_text = None
        record.stack_info = None
        return super().format(record)
def configure() -> None:
    """Configure stdlib logging and structlog for a module run.

    Messages are sent both to stderr and to the JSONL log file at
    ``PATH_TO_LOGS`` (recreated on every run), at DEBUG level.
    """
    log_format: str = "%(message)s"
    log_level = logging.DEBUG

    # Start from a fresh log file for this run.
    PATH_TO_LOGS.parent.mkdir(exist_ok=True, parents=True)
    PATH_TO_LOGS.unlink(missing_ok=True)

    root_logger: logging.Logger = logging.getLogger()
    root_logger.setLevel(level=log_level)
    for handler in (
        logging.FileHandler(filename=PATH_TO_LOGS, mode="a"),
        logging.StreamHandler(stream=sys.stderr),
    ):
        handler.setFormatter(fmt=Formatter(fmt=log_format))
        handler.setLevel(level=log_level)
        root_logger.addHandler(handler)

    structlog.configure(
        processors=[
            structlog.stdlib.add_log_level,
            structlog.processors.TimeStamper(fmt="iso"),
            structlog.processors.dict_tracebacks,
            structlog.processors.JSONRenderer(),
        ],
        logger_factory=structlog.stdlib.LoggerFactory(),
        wrapper_class=structlog.stdlib.BoundLogger,
    )

View File

@@ -1,90 +0,0 @@
"""FuzzForge modules SDK models.
This module provides backward-compatible exports for all model types.
For Core SDK compatibility, use imports from `fuzzforge_modules_sdk.api.models.mod`.
"""
from enum import StrEnum
from pathlib import Path # noqa: TC003 (required by pydantic at runtime)
from pydantic import ConfigDict
# Re-export from mod.py for Core SDK compatibility
from fuzzforge_modules_sdk.api.models.mod import (
Base,
FuzzForgeModuleInputBase,
FuzzForgeModuleResource,
FuzzForgeModuleResources,
FuzzForgeModulesSettingsBase,
FuzzForgeModulesSettingsType,
)
class FuzzForgeModuleArtifacts(StrEnum):
    """Enumeration of artifact types (the ``kind`` of a :class:`FuzzForgeModuleArtifact`)."""

    #: The artifact is an asset.
    ASSET = "asset"
class FuzzForgeModuleArtifact(Base):
    """An artifact generated by the module during its run.

    Artifacts are registered through ``FuzzForgeModule._register_artifact``
    and listed in the module output.
    """

    #: The description of the artifact.
    description: str

    #: The type of the artifact.
    kind: FuzzForgeModuleArtifacts

    #: The name of the artifact.
    name: str

    #: The path to the artifact on disk.
    path: Path
class FuzzForgeModuleResults(StrEnum):
    """Module execution result enumeration."""

    #: The module completed its lifecycle without a fatal error.
    SUCCESS = "success"

    #: A lifecycle step failed; details are in the module logs.
    FAILURE = "failure"
class FuzzForgeModuleStatus(StrEnum):
    """Possible statuses emitted by a running module (used in progress events)."""

    #: Module is setting up its environment.
    INITIALIZING = "initializing"

    #: Module is actively running.
    RUNNING = "running"

    #: Module finished successfully.
    COMPLETED = "completed"

    #: Module encountered an error.
    FAILED = "failed"

    #: Module was stopped by the orchestrator (SIGTERM).
    STOPPED = "stopped"
class FuzzForgeModuleOutputBase(Base):
    """The (standardized) output of a FuzzForge module.

    Concrete modules subclass this to add module-specific output fields.
    """

    #: The collection of artifacts generated by the module during its run.
    artifacts: list[FuzzForgeModuleArtifact]

    #: The path to the logs.
    logs: Path

    #: The result of the module's run.
    result: FuzzForgeModuleResults
__all__ = [
# Core SDK compatible exports
"Base",
"FuzzForgeModuleInputBase",
"FuzzForgeModuleResource",
"FuzzForgeModuleResources",
"FuzzForgeModulesSettingsBase",
"FuzzForgeModulesSettingsType",
# OSS-specific exports (also used in OSS modules)
"FuzzForgeModuleArtifact",
"FuzzForgeModuleArtifacts",
"FuzzForgeModuleOutputBase",
"FuzzForgeModuleResults",
"FuzzForgeModuleStatus",
]

View File

@@ -1,53 +0,0 @@
"""Core module models for FuzzForge modules SDK.
This module contains the base classes for module settings, inputs, and resources.
These are compatible with the fuzzforge-core SDK structure.
"""
from enum import StrEnum
from pathlib import Path # noqa: TC003 (required by pydantic at runtime)
from typing import TypeVar
from pydantic import BaseModel, ConfigDict
class Base(BaseModel):
    """Base model for all FuzzForge module types.

    Unknown fields are rejected (``extra="forbid"``) so typos in module
    input/output payloads fail loudly instead of being silently dropped.
    """

    model_config = ConfigDict(extra="forbid")
class FuzzForgeModulesSettingsBase(Base):
    """Base class for module settings."""


# Type variable bound to settings subclasses.
# NOTE(review): FuzzForgeModuleInputBase declares a PEP 695 type parameter
# with this same name, which shadows this TypeVar — confirm both are needed.
FuzzForgeModulesSettingsType = TypeVar("FuzzForgeModulesSettingsType", bound=FuzzForgeModulesSettingsBase)
class FuzzForgeModuleResources(StrEnum):
    """Enumeration of resource types (the ``kind`` of a :class:`FuzzForgeModuleResource`)."""

    #: The type of the resource is unknown or irrelevant.
    UNKNOWN = "unknown"
class FuzzForgeModuleResource(Base):
    """A resource provided to a module as input (carried by the input payload)."""

    #: The description of the resource.
    description: str

    #: The type of the resource.
    kind: FuzzForgeModuleResources

    #: The name of the resource.
    name: str

    #: The path of the resource on disk.
    path: Path
class FuzzForgeModuleInputBase[FuzzForgeModulesSettingsType: FuzzForgeModulesSettingsBase](Base):
    """The (standardized) input of a FuzzForge module.

    NOTE: the PEP 695 type parameter here shadows the module-level
    ``FuzzForgeModulesSettingsType`` TypeVar of the same name.
    """

    #: The collection of resources given to the module as inputs.
    resources: list[FuzzForgeModuleResource]

    #: The settings of the module.
    settings: FuzzForgeModulesSettingsType

View File

@@ -1,335 +0,0 @@
from abc import ABC, abstractmethod
import json
import signal
import threading
import time
from datetime import datetime, timezone
from shutil import rmtree
from typing import TYPE_CHECKING, Any, Final, final
from structlog import get_logger
from fuzzforge_modules_sdk.api.constants import (
PATH_TO_ARTIFACTS,
PATH_TO_INPUT,
PATH_TO_LOGS,
PATH_TO_RESULTS,
)
from fuzzforge_modules_sdk.api.exceptions import FuzzForgeModuleError
from fuzzforge_modules_sdk.api.models import (
FuzzForgeModuleArtifact,
FuzzForgeModuleArtifacts,
FuzzForgeModuleInputBase,
FuzzForgeModuleOutputBase,
FuzzForgeModuleResource,
FuzzForgeModuleResults,
FuzzForgeModuleStatus,
FuzzForgeModulesSettingsType,
)
if TYPE_CHECKING:
from pathlib import Path
from structlog.stdlib import BoundLogger
class FuzzForgeModule(ABC):
    """FuzzForge Modules' base.

    Concrete modules implement the abstract lifecycle hooks
    (:meth:`_prepare`, :meth:`_run`, :meth:`_cleanup`) and the input/output
    type accessors; :meth:`main` drives the whole lifecycle and writes the
    standardized results file.
    """

    #: Artifacts registered during the run, keyed by artifact name.
    __artifacts: dict[str, FuzzForgeModuleArtifact]

    #: The logger associated with the module.
    __logger: Final[BoundLogger]

    #: The name of the module.
    __name: Final[str]

    #: The version of the module.
    __version: Final[str]

    #: Start time for progress tracking.
    __start_time: float

    #: Custom output data set by the module.
    __output_data: dict[str, Any]

    #: Event set when stop is requested (SIGTERM received).
    #: Using :class:`threading.Event` so multi-threaded modules can
    #: efficiently wait on it via :meth:`threading.Event.wait`.
    __stop_requested: threading.Event

    def __init__(self, name: str, version: str) -> None:
        """Initialize an instance of the class.

        :param name: The name of the module.
        :param version: The version of the module.
        """
        self.__artifacts = {}
        self.__logger = get_logger("module")
        self.__name = name
        self.__version = version
        self.__start_time = time.time()
        self.__output_data = {}
        self.__stop_requested = threading.Event()
        # Register SIGTERM handler for graceful shutdown.
        signal.signal(signal.SIGTERM, self._handle_sigterm)

    @final
    def get_logger(self) -> BoundLogger:
        """Return the logger associated with the module."""
        return self.__logger

    @final
    def get_name(self) -> str:
        """Return the name of the module."""
        return self.__name

    @final
    def get_version(self) -> str:
        """Return the version of the module."""
        return self.__version

    @final
    def is_stop_requested(self) -> bool:
        """Check if stop was requested (SIGTERM received).

        Long-running modules should check this periodically and exit gracefully
        when True. Results will be written automatically on SIGTERM.

        The underlying :class:`threading.Event` can be obtained via
        :meth:`stop_event` for modules that need to *wait* on it.

        :returns: True if SIGTERM was received.
        """
        return self.__stop_requested.is_set()

    @final
    def stop_event(self) -> threading.Event:
        """Return the stop :class:`threading.Event`.

        Multi-threaded modules can use ``self.stop_event().wait(timeout)``
        instead of polling :meth:`is_stop_requested` in a busy-loop.

        :returns: The threading event that is set on SIGTERM.
        """
        return self.__stop_requested

    @final
    def _handle_sigterm(self, signum: int, frame: Any) -> None:
        """Handle SIGTERM signal for graceful shutdown.

        Sets the stop event and emits a final progress update, then returns.
        The normal :meth:`main` lifecycle (run → cleanup → write results) will
        complete as usual once :meth:`_run` observes :meth:`is_stop_requested`
        and returns, giving the module a chance to do any last-minute work
        before the process exits.

        :param signum: Signal number.
        :param frame: Current stack frame.
        """
        self.__stop_requested.set()
        self.get_logger().info("received SIGTERM, stopping after current operation")
        # Emit final progress update.
        self.emit_progress(
            progress=100,
            status=FuzzForgeModuleStatus.STOPPED,
            message="Module stopped by orchestrator (SIGTERM)",
        )

    @final
    def set_output(self, **kwargs: Any) -> None:
        """Set custom output data to be included in results.json.

        Call this from _run() to add module-specific fields to the output.

        :param kwargs: Key-value pairs to include in the output.

        Example:
            self.set_output(
                total_targets=4,
                valid_targets=["target1", "target2"],
                results=[...]
            )
        """
        self.__output_data.update(kwargs)

    @final
    def emit_progress(
        self,
        progress: int,
        status: FuzzForgeModuleStatus = FuzzForgeModuleStatus.RUNNING,
        message: str = "",
        metrics: dict[str, Any] | None = None,
        current_task: str = "",
    ) -> None:
        """Emit a structured progress event to stdout (JSONL).

        Progress is written as a single JSON line to stdout so that the
        orchestrator can capture it via ``kubectl logs`` without requiring
        any file-system access inside the container.

        :param progress: Progress percentage (0-100); clamped to that range.
        :param status: Current module status.
        :param message: Human-readable status message.
        :param metrics: Dictionary of metrics (e.g., {"executions": 1000, "coverage": 50}).
        :param current_task: Name of the current task being performed.
        """
        self.emit_event(
            "progress",
            status=status.value,
            progress=max(0, min(100, progress)),
            message=message,
            current_task=current_task,
            metrics=metrics or {},
        )

    @final
    def emit_event(self, event: str, **data: Any) -> None:
        """Emit a structured event to stdout as a single JSONL line.

        All module events (including progress updates) are written to stdout
        so the orchestrator can stream them in real time via ``kubectl logs``.

        :param event: Event type (e.g., ``"crash_found"``, ``"target_started"``,
            ``"progress"``, ``"metrics"``).
        :param data: Additional event data as keyword arguments.
        """
        event_data = {
            "timestamp": datetime.now(timezone.utc).isoformat(),
            "elapsed_seconds": round(self.get_elapsed_seconds(), 2),
            "module": self.__name,
            "event": event,
            **data,
        }
        # flush=True so the orchestrator sees the event immediately.
        print(json.dumps(event_data), flush=True)

    @final
    def get_elapsed_seconds(self) -> float:
        """Return the elapsed time since module start.

        :returns: Elapsed time in seconds.
        """
        return time.time() - self.__start_time

    @final
    def _register_artifact(self, name: str, kind: FuzzForgeModuleArtifacts, description: str, path: Path) -> None:
        """Register an artifact.

        The artifact is copied into the standard artifacts directory; *name*
        must not contain path separators so the copy stays a direct child of
        that directory.

        :param name: The name of the artifact.
        :param kind: The type of the artifact.
        :param description: The description of the artifact.
        :param path: The path of the artifact on the file system.
        :raises FuzzForgeModuleError: If *name* escapes the artifacts
            directory, or an existing entry of unsupported type blocks it.
        """
        source: Path = path.resolve(strict=True)
        destination: Path = PATH_TO_ARTIFACTS.joinpath(name).resolve()
        if destination.parent != PATH_TO_ARTIFACTS:
            # Fixed: the original message was missing the closing quote after
            # the destination path.
            message: str = f"path '{destination}' is not a direct descendant of path '{PATH_TO_ARTIFACTS}'"
            raise FuzzForgeModuleError(message)
        if destination.exists(follow_symlinks=False):
            # Replace whatever previously occupied this artifact slot.
            if destination.is_file() or destination.is_symlink():
                destination.unlink()
            elif destination.is_dir():
                rmtree(destination)
            else:
                message = f"unable to remove resource at path '{destination}': unsupported resource type"
                raise FuzzForgeModuleError(message)
        destination.parent.mkdir(exist_ok=True, parents=True)
        # Path.copy is a Python 3.14+ API (the package requires >=3.14).
        source.copy(destination)
        self.__artifacts[name] = FuzzForgeModuleArtifact(
            description=description,
            kind=kind,
            name=name,
            path=path,
        )

    @final
    def main(self) -> None:
        """Execute the module lifecycle: prepare → run → cleanup → write results."""
        result = FuzzForgeModuleResults.SUCCESS
        try:
            buffer: bytes = PATH_TO_INPUT.read_bytes()
            data = self._get_input_type().model_validate_json(buffer)
            self._prepare(settings=data.settings)
        except:  # noqa: E722
            self.get_logger().exception(event="exception during 'prepare' step")
            result = FuzzForgeModuleResults.FAILURE
        # 'data' is only referenced below when 'prepare' succeeded, so it is
        # always bound when used.
        if result != FuzzForgeModuleResults.FAILURE:
            try:
                result = self._run(resources=data.resources)
            except:  # noqa: E722
                self.get_logger().exception(event="exception during 'run' step")
                result = FuzzForgeModuleResults.FAILURE
        # NOTE(review): cleanup is skipped when prepare/run failed — confirm
        # this is the intended lifecycle.
        if result != FuzzForgeModuleResults.FAILURE:
            try:
                self._cleanup(settings=data.settings)
            except:  # noqa: E722
                self.get_logger().exception(event="exception during 'cleanup' step")
        output = self._get_output_type()(
            artifacts=list(self.__artifacts.values()),
            logs=PATH_TO_LOGS,
            result=result,
            **self.__output_data,
        )
        PATH_TO_RESULTS.parent.mkdir(exist_ok=True, parents=True)
        PATH_TO_RESULTS.write_bytes(output.model_dump_json().encode("utf-8"))

    @classmethod
    @abstractmethod
    def _get_input_type(cls) -> type[FuzzForgeModuleInputBase[Any]]:
        """Return the model type used to deserialize the module input."""
        message: str = f"method '_get_input_type' is not implemented for class '{cls.__name__}'"
        raise NotImplementedError(message)

    @classmethod
    @abstractmethod
    def _get_output_type(cls) -> type[FuzzForgeModuleOutputBase]:
        """Return the model type used to serialize the module output."""
        message: str = f"method '_get_output_type' is not implemented for class '{cls.__name__}'"
        raise NotImplementedError(message)

    @abstractmethod
    def _prepare(self, settings: FuzzForgeModulesSettingsType) -> None:
        """Set up the module environment before the run.

        :param settings: The module settings parsed from the input.
        """
        message: str = f"method '_prepare' is not implemented for class '{self.__class__.__name__}'"
        raise NotImplementedError(message)

    @abstractmethod
    def _run(self, resources: list[FuzzForgeModuleResource]) -> FuzzForgeModuleResults:
        """Execute the module's work.

        :param resources: The resources given to the module as inputs.
        :returns: The result of the run.
        """
        message: str = f"method '_run' is not implemented for class '{self.__class__.__name__}'"
        raise NotImplementedError(message)

    @abstractmethod
    def _cleanup(self, settings: FuzzForgeModulesSettingsType) -> None:
        """Tear down the module environment after the run.

        :param settings: The module settings parsed from the input.
        """
        message: str = f"method '_cleanup' is not implemented for class '{self.__class__.__name__}'"
        raise NotImplementedError(message)

View File

@@ -1,20 +0,0 @@
FROM docker.io/debian:trixie as base
COPY --from=ghcr.io/astral-sh/uv:0.9.10 /uv /uvx /bin/
FROM base as builder
WORKDIR /sdk
COPY ./src /sdk/src
COPY ./pyproject.toml /sdk/pyproject.toml
RUN uv build --wheel -o /sdk/distributions
FROM base as final
COPY --from=builder /sdk/distributions /wheels
WORKDIR /app
CMD [ "/usr/bin/sleep", "infinity" ]

View File

@@ -1,12 +0,0 @@
FROM localhost/fuzzforge-modules-sdk:0.1.0
# Module metadata is read from pyproject.toml [tool.fuzzforge.module] section
# See MODULE_METADATA.md for documentation on configuring metadata
COPY ./src /app/src
COPY ./pyproject.toml /app/pyproject.toml
# Remove workspace reference since we're using wheels
RUN sed -i '/\[tool\.uv\.sources\]/,/^$/d' /app/pyproject.toml
RUN uv sync --find-links /wheels

View File

@@ -1,45 +0,0 @@
PACKAGE=$(word 1, $(shell uv version))
VERSION=$(word 2, $(shell uv version))
PODMAN?=/usr/bin/podman
SOURCES=./src
TESTS=./tests
.PHONY: bandit build clean cloc format mypy pytest ruff save version
bandit:
uv run bandit --recursive $(SOURCES)
build:
$(PODMAN) build --file ./Dockerfile --no-cache --tag $(PACKAGE):$(VERSION)
save: build
$(PODMAN) save --format oci-archive --output /tmp/$(PACKAGE)-$(VERSION).oci $(PACKAGE):$(VERSION)
clean:
@find . -type d \( \
-name '*.egg-info' \
-o -name '.mypy_cache' \
-o -name '.pytest_cache' \
-o -name '.ruff_cache' \
-o -name '__pycache__' \
\) -printf 'removing directory %p\n' -exec rm -rf {} +
cloc:
cloc $(SOURCES)
format:
uv run ruff format $(SOURCES) $(TESTS)
mypy:
uv run mypy $(SOURCES)
pytest:
uv run pytest $(TESTS)
ruff:
uv run ruff check --fix $(SOURCES) $(TESTS)
version:
@echo '$(PACKAGE)@$(VERSION)'

View File

@@ -1,46 +0,0 @@
# FuzzForge Modules - FIXME
## Installation
### Python
```shell
# install the package (users)
uv sync
# install the package and all development dependencies (developers)
uv sync --all-extras
```
### Container
```shell
# build the image
make build
# run the container
mkdir -p "${PWD}/data" "${PWD}/data/input" "${PWD}/data/output"
echo '{"settings":{},"resources":[]}' > "${PWD}/data/input/input.json"
podman run --rm \
    --volume "${PWD}/data:/fuzzforge" \
    '<name>:<version>' uv run module
```
## Usage
```shell
uv run module
```
## Development tools
```shell
# run ruff (formatter)
make format
# run mypy (type checker)
make mypy
# run tests (pytest)
make pytest
# run ruff (linter)
make ruff
```
See the file `Makefile` at the root of this directory for more tools.

View File

@@ -1,6 +0,0 @@
[mypy]
plugins = pydantic.mypy
strict = True
warn_unused_ignores = True
warn_redundant_casts = True
warn_return_any = True

View File

@@ -1,62 +0,0 @@
[project]
name = "fuzzforge-module-template"
version = "0.1.0"
description = "FIXME: Add module description"
authors = []
readme = "README.md"
requires-python = ">=3.14"
dependencies = [
"fuzzforge-modules-sdk==0.0.1",
"pydantic==2.12.4",
"structlog==25.5.0",
]
[project.optional-dependencies]
lints = [
"bandit==1.8.6",
"mypy==1.18.2",
"ruff==0.14.4",
]
tests = [
"pytest==9.0.2",
]
[project.scripts]
module = "module.__main__:main"
[tool.uv.sources]
fuzzforge-modules-sdk = { workspace = true }
[tool.uv]
package = true
# FuzzForge module metadata for AI agent discovery
[tool.fuzzforge.module]
# REQUIRED: Unique module identifier (should match Docker image name)
identifier = "fuzzforge-module-template"
# Optional: List of module identifiers that should run before this one
suggested_predecessors = []
# Optional: Whether this module supports continuous/background execution
continuous_mode = false
# REQUIRED: Use cases help AI agents understand when to use this module
# Include language/target info here (e.g., "Analyze Rust crate...")
use_cases = [
"FIXME: Describe what this module does",
"FIXME: Describe typical usage scenario"
]
# REQUIRED: What inputs the module expects
common_inputs = [
"FIXME: List required input files or artifacts"
]
# REQUIRED: What outputs the module produces
output_artifacts = [
"FIXME: List output files produced"
]
# REQUIRED: How AI should display output to user
output_treatment = "FIXME: Describe how to present the output"

View File

@@ -1,19 +0,0 @@
line-length = 120
[lint]
select = [ "ALL" ]
ignore = [
"COM812", # conflicts with the formatter
"D100", # ignoring missing docstrings in public modules
"D104", # ignoring missing docstrings in public packages
"D203", # conflicts with 'D211'
"D213", # conflicts with 'D212'
"TD002", # ignoring missing author in 'TODO' statements
"TD003", # ignoring missing issue link in 'TODO' statements
]
[lint.per-file-ignores]
"tests/*" = [
"PLR2004", # allowing comparisons using unnamed numerical constants in tests
"S101", # allowing 'assert' statements in tests
]

View File

@@ -1,19 +0,0 @@
from typing import TYPE_CHECKING
from fuzzforge_modules_sdk.api import logs
from module.mod import Module
if TYPE_CHECKING:
from fuzzforge_modules_sdk.api.modules.base import FuzzForgeModule
def main() -> None:
    """Configure logging, then run the module's lifecycle."""
    logs.configure()
    module: FuzzForgeModule = Module()
    module.main()


if __name__ == "__main__":
    main()

View File

@@ -1,54 +0,0 @@
from __future__ import annotations
from typing import TYPE_CHECKING
from fuzzforge_modules_sdk.api.models import FuzzForgeModuleResults
from fuzzforge_modules_sdk.api.modules.base import FuzzForgeModule
from module.models import Input, Output
if TYPE_CHECKING:
from fuzzforge_modules_sdk.api.models import FuzzForgeModuleResource, FuzzForgeModulesSettingsType
class Module(FuzzForgeModule):
    """Module implementation (template: fill in name, version and the hooks)."""

    def __init__(self) -> None:
        """Initialize an instance of the class."""
        name: str = "FIXME"
        version: str = "FIXME"
        FuzzForgeModule.__init__(self, name=name, version=version)

    @classmethod
    def _get_input_type(cls) -> type[Input]:
        """Return the model used to deserialize the module input."""
        return Input

    @classmethod
    def _get_output_type(cls) -> type[Output]:
        """Return the model used to serialize the module output."""
        return Output

    def _prepare(self, settings: FuzzForgeModulesSettingsType) -> None:
        """Set up the module environment before the run.

        :param settings: The module settings parsed from the input.
        """

    def _run(self, resources: list[FuzzForgeModuleResource]) -> FuzzForgeModuleResults:  # noqa: ARG002
        """Execute the module's work.

        :param resources: The resources given to the module as inputs.
        :returns: The result of the run.
        """
        return FuzzForgeModuleResults.SUCCESS

    def _cleanup(self, settings: FuzzForgeModulesSettingsType) -> None:
        """Tear down the module environment after the run.

        :param settings: The module settings parsed from the input.
        """

View File

@@ -1,11 +0,0 @@
from fuzzforge_modules_sdk.api.models import FuzzForgeModuleInputBase, FuzzForgeModuleOutputBase
from module.settings import Settings
class Input(FuzzForgeModuleInputBase[Settings]):
    """The module's input: standard resources plus this module's ``Settings``."""


class Output(FuzzForgeModuleOutputBase):
    """The module's output; add module-specific result fields here."""

View File

@@ -1,7 +0,0 @@
from fuzzforge_modules_sdk.api.models import FuzzForgeModulesSettingsBase
class Settings(FuzzForgeModulesSettingsBase):
    """The module's settings, parsed from the input payload."""

    # Your settings attributes go here.

View File

@@ -1,26 +0,0 @@
FROM localhost/fuzzforge-modules-sdk:0.1.0
# Module metadata is now read from pyproject.toml [tool.fuzzforge.module] section
# Install build tools and Rust nightly for compiling and testing fuzz harnesses
RUN apt-get update && apt-get install -y \
curl \
build-essential \
pkg-config \
libssl-dev \
&& rm -rf /var/lib/apt/lists/*
RUN curl --proto '=https' --tlsv1.2 -sSf https://sh.rustup.rs | sh -s -- -y --default-toolchain nightly
ENV PATH="/root/.cargo/bin:${PATH}"
# Install cargo-fuzz for testing harnesses
RUN cargo install cargo-fuzz --locked || true
COPY ./src /app/src
COPY ./pyproject.toml /app/pyproject.toml
COPY ./README.md /app/README.md
# Remove workspace reference since we're using wheels
RUN sed -i '/\[tool\.uv\.sources\]/,/^$/d' /app/pyproject.toml
RUN uv sync --find-links /wheels

View File

@@ -1,289 +0,0 @@
# Harness Tester Feedback Types
Complete reference of all feedback the `harness-tester` module provides to help AI agents improve fuzz harnesses.
## Overview
The harness-tester evaluates harnesses across **6 dimensions** and provides specific, actionable suggestions for each issue detected.
---
## 1. Compilation Feedback
### ✅ Success Cases
- **Compiles successfully** → Strength noted
### ❌ Error Cases
| Issue Type | Severity | Detection | Suggestion |
|------------|----------|-----------|------------|
| `undefined_variable` | CRITICAL | "cannot find" in error | Check variable names match function signature. Use exact names from fuzzable_functions.json |
| `type_mismatch` | CRITICAL | "mismatched types" in error | Check function expects types you're passing. Convert fuzzer input to correct type (e.g., &[u8] to &str with from_utf8) |
| `trait_not_implemented` | CRITICAL | "trait" + "not implemented" | Ensure you're using correct types. Some functions require specific trait implementations |
| `compilation_error` | CRITICAL | Any other error | Review error message and fix syntax/type issues. Check function signatures in source code |
### ⚠️ Warning Cases
| Issue Type | Severity | Detection | Suggestion |
|------------|----------|-----------|------------|
| `unused_variable` | INFO | "unused" in warning | Remove unused variables or use underscore prefix (_variable) to suppress warning |
---
## 2. Execution Feedback
### ✅ Success Cases
- **Executes without crashing** → Strength noted
### ❌ Error Cases
| Issue Type | Severity | Detection | Suggestion |
|------------|----------|-----------|------------|
| `stack_overflow` | CRITICAL | "stack overflow" in crash | Check for infinite recursion or large stack allocations. Use heap allocation (Box, Vec) for large data structures |
| `panic_on_start` | CRITICAL | "panic" in crash | Check initialization code. Ensure required resources are available and input validation doesn't panic on empty input |
| `immediate_crash` | CRITICAL | Crashes on first run | Debug harness initialization. Add error handling and check for null/invalid pointers |
| `infinite_loop` | CRITICAL | Execution timeout | Check for loops that depend on fuzzer input. Add iteration limits or timeout mechanisms |
---
## 3. Coverage Feedback
### ✅ Success Cases
- **>50% coverage** → "Excellent coverage"
- **Good growth** → "Harness exploring code paths"
### ❌ Error Cases
| Issue Type | Severity | Detection | Suggestion |
|------------|----------|-----------|------------|
| `no_coverage` | CRITICAL | 0 new edges found | Ensure you're actually calling the target function with fuzzer-provided data. Check that 'data' parameter is passed to function |
| `very_low_coverage` | WARNING | <5% coverage or "none" growth | Harness may not be reaching target code. Verify correct entry point function. Check if input validation rejects all fuzzer data |
| `low_coverage` | WARNING | <20% coverage or "poor" growth | Try fuzzing multiple entry points or remove restrictive input validation. Consider using dictionary for structured inputs |
| `early_stagnation` | INFO | Coverage stops growing <10s | Harness may be hitting input validation barriers. Consider fuzzing with seed corpus of valid inputs |
---
## 4. Performance Feedback
### ✅ Success Cases
- **>1000 execs/s** → "Excellent performance"
- **>500 execs/s** → "Good performance"
### ❌ Error Cases
| Issue Type | Severity | Detection | Suggestion |
|------------|----------|-----------|------------|
| `extremely_slow` | CRITICAL | <10 execs/s | Remove file I/O, network operations, or expensive computations from harness loop. Move setup code outside fuzz target function |
| `slow_execution` | WARNING | <100 execs/s | Optimize harness: avoid allocations in hot path, reuse buffers, remove logging. Profile to find bottlenecks |
---
## 5. Stability Feedback
### ✅ Success Cases
- **Stable execution** → Strength noted
- **Found unique crashes** → "Found N potential bugs!"
### ⚠️ Warning Cases
| Issue Type | Severity | Detection | Suggestion |
|------------|----------|-----------|------------|
| `unstable_frequent_crashes` | WARNING | >10 crashes per 1000 execs | This might be expected if testing buggy code. If not, add error handling for edge cases or invalid inputs |
| `hangs_detected` | WARNING | Hangs found during trial | Add timeouts to prevent infinite loops. Check for blocking operations or resource exhaustion |
---
## 6. Code Quality Feedback
### Informational
| Issue Type | Severity | Detection | Suggestion |
|------------|----------|-----------|------------|
| `unused_variable` | INFO | Compiler warnings | Clean up code for better maintainability |
---
## Quality Scoring Formula
```
Base Score: 20 points (for compiling + running)
+ Coverage (0-40 points):
- Excellent growth: +40
- Good growth: +30
- Poor growth: +10
- No growth: +0
+ Performance (0-25 points):
- >1000 execs/s: +25
- >500 execs/s: +20
- >100 execs/s: +10
- >10 execs/s: +5
- <10 execs/s: +0
+ Stability (0-15 points):
- Stable: +15
- Unstable: +10
- Crashes frequently: +5
Maximum: 100 points
```
### Verdicts
- **70-100**: `production-ready` → Use for long-term fuzzing campaigns
- **30-69**: `needs-improvement` → Fix issues before production use
- **0-29**: `broken` → Critical issues block execution
---
## Example Feedback Flow
### Scenario 1: Broken Harness (Type Mismatch)
```json
{
"quality": {
"score": 0,
"verdict": "broken",
"issues": [
{
"category": "compilation",
"severity": "critical",
"type": "type_mismatch",
"message": "Type mismatch: expected &[u8], found &str",
"suggestion": "Check function expects types you're passing. Convert fuzzer input to correct type (e.g., &[u8] to &str with from_utf8)"
}
],
"recommended_actions": [
"Fix 1 critical issue(s) preventing execution"
]
}
}
```
**AI Agent Action**: Regenerate harness with correct type conversion
---
### Scenario 2: Low Coverage Harness
```json
{
"quality": {
"score": 35,
"verdict": "needs-improvement",
"issues": [
{
"category": "coverage",
"severity": "warning",
"type": "low_coverage",
"message": "Low coverage: 12% - not exploring enough code paths",
"suggestion": "Try fuzzing multiple entry points or remove restrictive input validation"
},
{
"category": "performance",
"severity": "warning",
"type": "slow_execution",
"message": "Slow execution: 45 execs/sec (expected 500+)",
"suggestion": "Optimize harness: avoid allocations in hot path, reuse buffers"
}
],
"strengths": [
"Compiles successfully",
"Executes without crashing"
],
"recommended_actions": [
"Address 2 warning(s) to improve harness quality"
]
}
}
```
**AI Agent Action**: Remove input validation, optimize performance
---
### Scenario 3: Production-Ready Harness
```json
{
"quality": {
"score": 85,
"verdict": "production-ready",
"issues": [],
"strengths": [
"Compiles successfully",
"Executes without crashing",
"Excellent coverage: 67% of target code reached",
"Excellent performance: 1507 execs/sec",
"Stable execution - no crashes or hangs"
],
"recommended_actions": [
"Harness is ready for production fuzzing"
]
}
}
```
**AI Agent Action**: Proceed to long-term fuzzing with cargo-fuzzer
---
## Integration with AI Workflow
```python
def iterative_harness_generation(target_function):
"""AI agent iteratively improves harness based on feedback."""
max_iterations = 3
for iteration in range(max_iterations):
# Generate or improve harness
if iteration == 0:
harness = ai_generate_harness(target_function)
else:
harness = ai_improve_harness(previous_harness, feedback)
# Test harness
result = execute_module("harness-tester", harness)
evaluation = result["harnesses"][0]
# Check verdict
if evaluation["quality"]["verdict"] == "production-ready":
return harness # Success!
# Extract feedback for next iteration
feedback = {
"issues": evaluation["quality"]["issues"],
"suggestions": [issue["suggestion"] for issue in evaluation["quality"]["issues"]],
"score": evaluation["quality"]["score"],
"coverage": evaluation["fuzzing_trial"]["coverage"] if "fuzzing_trial" in evaluation else None,
"performance": evaluation["fuzzing_trial"]["performance"] if "fuzzing_trial" in evaluation else None
}
# Store for next iteration
previous_harness = harness
return harness # Return best attempt after max iterations
```
---
## Summary
The harness-tester provides **comprehensive, actionable feedback** across 6 dimensions:
1. **Compilation** - Syntax and type correctness
2. **Execution** - Runtime stability
3. **Coverage** - Code exploration effectiveness
4. **Performance** - Execution speed
5. **Stability** - Crash/hang frequency
6. **Code Quality** - Best practices
Each issue includes:
- **Clear detection** of what went wrong
- **Specific suggestion** on how to fix it
- **Severity level** to prioritize fixes
This enables AI agents to rapidly iterate and produce high-quality fuzz harnesses with minimal human intervention.

Some files were not shown because too many files have changed in this diff Show More