Compare commits

..

1 Commits

Author SHA1 Message Date
AFredefon
aa50787869 fix(hub): increase StreamReader limit and add volumes/expandvars support 2026-03-02 02:19:08 +01:00
254 changed files with 11139 additions and 8937 deletions

View File

@@ -1,86 +0,0 @@
name: CI
on:
push:
branches: [main, dev, feature/*]
pull_request:
branches: [main, dev]
workflow_dispatch:
jobs:
lint-and-typecheck:
name: Lint & Type Check
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v4
- name: Install uv
uses: astral-sh/setup-uv@v5
with:
version: "latest"
- name: Set up Python
run: uv python install 3.14
- name: Install dependencies
run: uv sync
- name: Ruff check (secpipe-cli)
run: |
cd secpipe-cli
uv run --extra lints ruff check src/
- name: Ruff check (secpipe-mcp)
run: |
cd secpipe-mcp
uv run --extra lints ruff check src/
- name: Ruff check (secpipe-common)
run: |
cd secpipe-common
uv run --extra lints ruff check src/
- name: Mypy type check (secpipe-cli)
run: |
cd secpipe-cli
uv run --extra lints mypy src/
- name: Mypy type check (secpipe-mcp)
run: |
cd secpipe-mcp
uv run --extra lints mypy src/
# NOTE: Mypy check for secpipe-common temporarily disabled
# due to 37 pre-existing type errors in legacy code.
# TODO: Fix type errors and re-enable strict checking
#- name: Mypy type check (secpipe-common)
# run: |
# cd secpipe-common
# uv run --extra lints mypy src/
test:
name: Tests
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v4
- name: Install uv
uses: astral-sh/setup-uv@v5
with:
version: "latest"
- name: Set up Python
run: uv python install 3.14
- name: Install dependencies
run: uv sync --all-extras
- name: Run MCP tests
run: |
cd secpipe-mcp
uv run --extra tests pytest -v
- name: Run common tests
run: |
cd secpipe-common
uv run --extra tests pytest -v

View File

@@ -1,49 +0,0 @@
name: MCP Server Smoke Test
on:
push:
branches: [main, dev]
pull_request:
branches: [main, dev]
workflow_dispatch:
jobs:
mcp-server:
name: MCP Server Test
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v4
- name: Install uv
uses: astral-sh/setup-uv@v5
with:
version: "latest"
- name: Set up Python
run: uv python install 3.14
- name: Install dependencies
run: uv sync --all-extras
- name: Start MCP server in background
run: |
cd secpipe-mcp
nohup uv run uvicorn secpipe_mcp.application:app --host 127.0.0.1 --port 8000 > server.log 2>&1 &
echo $! > server.pid
sleep 3
- name: Run MCP tool tests
run: |
cd secpipe-mcp
uv run --extra tests pytest tests/test_resources.py -v
- name: Stop MCP server
if: always()
run: |
if [ -f secpipe-mcp/server.pid ]; then
kill $(cat secpipe-mcp/server.pid) || true
fi
- name: Show server logs
if: failure()
run: cat secpipe-mcp/server.log || true

4
.gitignore vendored
View File

@@ -10,7 +10,3 @@ __pycache__
# Podman/Docker container storage artifacts
~/.fuzzforge/
# User-specific hub config (generated at runtime)
hub-config.json
*.egg-info/

View File

@@ -1,8 +1,8 @@
# Contributing to SecPipe AI
# Contributing to FuzzForge AI
Thank you for your interest in contributing to SecPipe AI! We welcome contributions from the community and are excited to collaborate with you.
Thank you for your interest in contributing to FuzzForge AI! We welcome contributions from the community and are excited to collaborate with you.
**Our Vision**: SecPipe aims to be a **universal platform for security research** across all cybersecurity domains. Through our modular architecture, any security tool—from fuzzing engines to cloud scanners, from mobile app analyzers to IoT security tools—can be integrated as a containerized module and controlled via AI agents.
**Our Vision**: FuzzForge aims to be a **universal platform for security research** across all cybersecurity domains. Through our modular architecture, any security tool—from fuzzing engines to cloud scanners, from mobile app analyzers to IoT security tools—can be integrated as a containerized module and controlled via AI agents.
## Ways to Contribute
@@ -13,7 +13,7 @@ Thank you for your interest in contributing to SecPipe AI! We welcome contributi
- **Documentation** - Improve guides, tutorials, and module documentation
- **Testing** - Help test new features and report issues
- **AI Integration** - Improve MCP tools and AI agent interactions
- **Tool Integrations** - Wrap existing security tools as SecPipe modules
- **Tool Integrations** - Wrap existing security tools as FuzzForge modules
## Contribution Guidelines
@@ -71,13 +71,13 @@ test(runner): add container execution tests
3. **Test Your Changes**
```bash
# Test modules
SECPIPE_MODULES_PATH=./secpipe-modules uv run secpipe modules list
FUZZFORGE_MODULES_PATH=./fuzzforge-modules uv run fuzzforge modules list
# Run a module
uv run secpipe modules run your-module --assets ./test-assets
uv run fuzzforge modules run your-module --assets ./test-assets
# Test MCP integration (if applicable)
uv run secpipe mcp status
uv run fuzzforge mcp status
```
4. **Submit Pull Request**
@@ -88,11 +88,11 @@ test(runner): add container execution tests
## Module Development
SecPipe uses a modular architecture where security tools run as isolated containers. The `secpipe-modules-sdk` provides everything you need to create new modules.
FuzzForge uses a modular architecture where security tools run as isolated containers. The `fuzzforge-modules-sdk` provides everything you need to create new modules.
**Documentation:**
- [Module SDK Documentation](secpipe-modules/secpipe-modules-sdk/README.md) - Complete SDK reference
- [Module Template](secpipe-modules/secpipe-module-template/) - Starting point for new modules
- [Module SDK Documentation](fuzzforge-modules/fuzzforge-modules-sdk/README.md) - Complete SDK reference
- [Module Template](fuzzforge-modules/fuzzforge-module-template/) - Starting point for new modules
- [USAGE Guide](USAGE.md) - Setup and installation instructions
### Creating a New Module
@@ -100,8 +100,8 @@ SecPipe uses a modular architecture where security tools run as isolated contain
1. **Use the Module Template**
```bash
# Generate a new module from template
cd secpipe-modules/
cp -r secpipe-module-template my-new-module
cd fuzzforge-modules/
cp -r fuzzforge-module-template my-new-module
cd my-new-module
```
@@ -127,8 +127,8 @@ SecPipe uses a modular architecture where security tools run as isolated contain
Edit `src/module/mod.py`:
```python
from secpipe_modules_sdk.api.modules import BaseModule
from secpipe_modules_sdk.api.models import ModuleResult
from fuzzforge_modules_sdk.api.modules import BaseModule
from fuzzforge_modules_sdk.api.models import ModuleResult
from .models import MyModuleConfig, MyModuleOutput
class MyModule(BaseModule[MyModuleConfig, MyModuleOutput]):
@@ -157,7 +157,7 @@ SecPipe uses a modular architecture where security tools run as isolated contain
Edit `src/module/models.py`:
```python
from pydantic import BaseModel, Field
from secpipe_modules_sdk.api.models import BaseModuleConfig, BaseModuleOutput
from fuzzforge_modules_sdk.api.models import BaseModuleConfig, BaseModuleOutput
class MyModuleConfig(BaseModuleConfig):
"""Configuration for your module."""
@@ -173,31 +173,31 @@ SecPipe uses a modular architecture where security tools run as isolated contain
5. **Build Your Module**
```bash
# Build the SDK first (if not already done)
cd ../secpipe-modules-sdk
cd ../fuzzforge-modules-sdk
uv build
mkdir -p .wheels
cp ../../dist/secpipe_modules_sdk-*.whl .wheels/
cp ../../dist/fuzzforge_modules_sdk-*.whl .wheels/
cd ../..
docker build -t localhost/secpipe-modules-sdk:0.1.0 secpipe-modules/secpipe-modules-sdk/
docker build -t localhost/fuzzforge-modules-sdk:0.1.0 fuzzforge-modules/fuzzforge-modules-sdk/
# Build your module
cd secpipe-modules/my-new-module
docker build -t secpipe-my-new-module:0.1.0 .
cd fuzzforge-modules/my-new-module
docker build -t fuzzforge-my-new-module:0.1.0 .
```
6. **Test Your Module**
```bash
# Run with test assets
uv run secpipe modules run my-new-module --assets ./test-assets
uv run fuzzforge modules run my-new-module --assets ./test-assets
# Check module info
uv run secpipe modules info my-new-module
uv run fuzzforge modules info my-new-module
```
### Module Development Guidelines
**Important Conventions:**
- **Input/Output**: Use `/secpipe/input` for assets and `/secpipe/output` for results
- **Input/Output**: Use `/fuzzforge/input` for assets and `/fuzzforge/output` for results
- **Configuration**: Support JSON configuration via stdin or file
- **Logging**: Use structured logging (structlog is pre-configured)
- **Error Handling**: Return proper exit codes and error messages
@@ -206,12 +206,12 @@ SecPipe uses a modular architecture where security tools run as isolated contain
- **Dependencies**: Minimize container size, use multi-stage builds
**See also:**
- [Module SDK API Reference](secpipe-modules/secpipe-modules-sdk/src/secpipe_modules_sdk/api/)
- [Module SDK API Reference](fuzzforge-modules/fuzzforge-modules-sdk/src/fuzzforge_modules_sdk/api/)
- [Dockerfile Best Practices](https://docs.docker.com/develop/develop-images/dockerfile_best-practices/)
### Module Types
SecPipe is designed to support modules across **all cybersecurity domains**. The modular architecture allows any security tool to be containerized and integrated. Here are the main categories:
FuzzForge is designed to support modules across **all cybersecurity domains**. The modular architecture allows any security tool to be containerized and integrated. Here are the main categories:
**Application Security**
- Fuzzing engines (coverage-guided, grammar-based, mutation-based)
@@ -273,8 +273,8 @@ SecPipe is designed to support modules across **all cybersecurity domains**. The
```python
# src/module/mod.py
from pathlib import Path
from secpipe_modules_sdk.api.modules import BaseModule
from secpipe_modules_sdk.api.models import ModuleResult
from fuzzforge_modules_sdk.api.modules import BaseModule
from fuzzforge_modules_sdk.api.models import ModuleResult
from .models import ScannerConfig, ScannerOutput
class SecurityScanner(BaseModule[ScannerConfig, ScannerOutput]):
@@ -341,7 +341,7 @@ uv run pytest
## Contributing to Core Features
Beyond modules, you can contribute to SecPipe's core components.
Beyond modules, you can contribute to FuzzForge's core components.
**Useful Resources:**
- [Project Structure](README.md) - Overview of the codebase
@@ -350,18 +350,18 @@ Beyond modules, you can contribute to SecPipe's core components.
### Core Components
- **secpipe-mcp** - MCP server for AI agent integration
- **secpipe-runner** - Module execution engine
- **secpipe-cli** - Command-line interface
- **secpipe-common** - Shared utilities and sandbox engines
- **secpipe-types** - Type definitions and schemas
- **fuzzforge-mcp** - MCP server for AI agent integration
- **fuzzforge-runner** - Module execution engine
- **fuzzforge-cli** - Command-line interface
- **fuzzforge-common** - Shared utilities and sandbox engines
- **fuzzforge-types** - Type definitions and schemas
### Development Setup
1. **Clone and Install**
```bash
git clone https://github.com/FuzzingLabs/secpipe_ai.git
cd secpipe_ai
git clone https://github.com/FuzzingLabs/fuzzforge_ai.git
cd fuzzforge_ai
uv sync --all-extras
```
@@ -371,7 +371,7 @@ Beyond modules, you can contribute to SecPipe's core components.
make test
# Run specific package tests
cd secpipe-mcp
cd fuzzforge-mcp
uv run pytest
```
@@ -381,7 +381,7 @@ Beyond modules, you can contribute to SecPipe's core components.
make typecheck
# Type check specific package
cd secpipe-runner
cd fuzzforge-runner
uv run mypy .
```
@@ -399,7 +399,7 @@ Beyond modules, you can contribute to SecPipe's core components.
When reporting bugs, please include:
- **Environment**: OS, Python version, Docker version, uv version
- **SecPipe Version**: Output of `uv run secpipe --version`
- **FuzzForge Version**: Output of `uv run fuzzforge --version`
- **Module**: Which module or component is affected
- **Steps to Reproduce**: Clear steps to recreate the issue
- **Expected Behavior**: What should happen
@@ -419,7 +419,7 @@ When reporting bugs, please include:
**Module:** my-custom-scanner
**Steps to Reproduce:**
1. Run `uv run secpipe modules run my-scanner --assets ./test-target`
1. Run `uv run fuzzforge modules run my-scanner --assets ./test-target`
2. Module fails with timeout error
**Expected:** Module completes analysis
@@ -491,7 +491,7 @@ Brief description of what this module does.
## Usage
\`\`\`bash
uv run secpipe modules run module-name --assets ./path/to/assets
uv run fuzzforge modules run module-name --assets ./path/to/assets
\`\`\`
## Output
@@ -538,7 +538,7 @@ Before submitting a new module:
## License
By contributing to SecPipe AI, you agree that your contributions will be licensed under the same license as the project (see [LICENSE](LICENSE)).
By contributing to FuzzForge AI, you agree that your contributions will be licensed under the same license as the project (see [LICENSE](LICENSE)).
For module contributions:
- Modules you create remain under the project license
@@ -552,12 +552,12 @@ For module contributions:
Need help contributing?
- Join our [Discord](https://discord.gg/8XEX33UUwZ)
- Read the [Module SDK Documentation](secpipe-modules/secpipe-modules-sdk/README.md)
- Read the [Module SDK Documentation](fuzzforge-modules/fuzzforge-modules-sdk/README.md)
- Check the module template for examples
- Contact: contact@fuzzinglabs.com
---
**Thank you for making SecPipe better!**
**Thank you for making FuzzForge better!**
Every contribution, no matter how small, helps build a stronger security research platform. Whether you're creating a module for web security, cloud scanning, mobile analysis, or any other cybersecurity domain, your work makes SecPipe more powerful and versatile for the entire security community!
Every contribution, no matter how small, helps build a stronger security research platform. Whether you're creating a module for web security, cloud scanning, mobile analysis, or any other cybersecurity domain, your work makes FuzzForge more powerful and versatile for the entire security community!

View File

@@ -1,10 +1,10 @@
.PHONY: help install sync format lint typecheck test build-hub-images clean
.PHONY: help install sync format lint typecheck test build-modules build-hub-images clean
SHELL := /bin/bash
# Default target
help:
@echo "SecPipe AI Development Commands"
@echo "FuzzForge AI Development Commands"
@echo ""
@echo " make install - Install all dependencies"
@echo " make sync - Sync shared packages from upstream"
@@ -12,6 +12,7 @@ help:
@echo " make lint - Lint code with ruff"
@echo " make typecheck - Type check with mypy"
@echo " make test - Run all tests"
@echo " make build-modules - Build all module container images"
@echo " make build-hub-images - Build all mcp-security-hub images"
@echo " make clean - Clean build artifacts"
@echo ""
@@ -20,17 +21,17 @@ help:
install:
uv sync
# Sync shared packages from upstream secpipe-core
# Sync shared packages from upstream fuzzforge-core
sync:
@if [ -z "$(UPSTREAM)" ]; then \
echo "Usage: make sync UPSTREAM=/path/to/secpipe-core"; \
echo "Usage: make sync UPSTREAM=/path/to/fuzzforge-core"; \
exit 1; \
fi
./scripts/sync-upstream.sh $(UPSTREAM)
# Format all packages
format:
@for pkg in packages/secpipe-*/; do \
@for pkg in packages/fuzzforge-*/; do \
if [ -f "$$pkg/pyproject.toml" ]; then \
echo "Formatting $$pkg..."; \
cd "$$pkg" && uv run ruff format . && cd -; \
@@ -39,7 +40,7 @@ format:
# Lint all packages
lint:
@for pkg in packages/secpipe-*/; do \
@for pkg in packages/fuzzforge-*/; do \
if [ -f "$$pkg/pyproject.toml" ]; then \
echo "Linting $$pkg..."; \
cd "$$pkg" && uv run ruff check . && cd -; \
@@ -48,7 +49,7 @@ lint:
# Type check all packages
typecheck:
@for pkg in packages/secpipe-*/; do \
@for pkg in packages/fuzzforge-*/; do \
if [ -f "$$pkg/pyproject.toml" ] && [ -f "$$pkg/mypy.ini" ]; then \
echo "Type checking $$pkg..."; \
cd "$$pkg" && uv run mypy . && cd -; \
@@ -57,13 +58,42 @@ typecheck:
# Run all tests
test:
@for pkg in packages/secpipe-*/; do \
@for pkg in packages/fuzzforge-*/; do \
if [ -f "$$pkg/pytest.ini" ]; then \
echo "Testing $$pkg..."; \
cd "$$pkg" && uv run pytest && cd -; \
fi \
done
# Build all module container images
# Uses Docker by default, or Podman if FUZZFORGE_ENGINE=podman
build-modules:
@echo "Building FuzzForge module images..."
@if [ "$$FUZZFORGE_ENGINE" = "podman" ]; then \
if [ -n "$$SNAP" ]; then \
echo "Using Podman with isolated storage (Snap detected)"; \
CONTAINER_CMD="podman --root ~/.fuzzforge/containers/storage --runroot ~/.fuzzforge/containers/run"; \
else \
echo "Using Podman"; \
CONTAINER_CMD="podman"; \
fi; \
else \
echo "Using Docker"; \
CONTAINER_CMD="docker"; \
fi; \
for module in fuzzforge-modules/*/; do \
if [ -f "$$module/Dockerfile" ] && \
[ "$$module" != "fuzzforge-modules/fuzzforge-modules-sdk/" ] && \
[ "$$module" != "fuzzforge-modules/fuzzforge-module-template/" ]; then \
name=$$(basename $$module); \
version=$$(grep 'version' "$$module/pyproject.toml" 2>/dev/null | head -1 | sed 's/.*"\(.*\\)".*/\\1/' || echo "0.1.0"); \
echo "Building $$name:$$version..."; \
$$CONTAINER_CMD build -t "fuzzforge-$$name:$$version" "$$module" || exit 1; \
fi \
done
@echo ""
@echo "✓ All modules built successfully!"
# Build all mcp-security-hub images for the firmware analysis pipeline
build-hub-images:
@bash scripts/build-hub-images.sh

4
NOTICE
View File

@@ -1,4 +1,4 @@
SecPipe
FuzzForge
Copyright (c) 2025 FuzzingLabs
This product includes software developed by FuzzingLabs (https://fuzzforge.ai).
@@ -7,6 +7,6 @@ Licensed under the Business Source License 1.1 (BSL).
After the Change Date (four years from the date of publication), this version
of the Licensed Work will be made available under the Apache License, Version 2.0.
You may not use the name "FuzzingLabs" or "SecPipe" nor the names of its
You may not use the name "FuzzingLabs" or "FuzzForge" nor the names of its
contributors to endorse or promote products derived from this software
without specific prior written permission.

275
README.md
View File

@@ -1,4 +1,4 @@
<h1 align="center">SecPipe</h1>
<h1 align="center"> FuzzForge AI</h1>
<h3 align="center">AI-Powered Security Research Orchestration via MCP</h3>
<p align="center">
@@ -6,6 +6,7 @@
<a href="LICENSE"><img src="https://img.shields.io/badge/license-BSL%201.1-blue" alt="License: BSL 1.1"></a>
<a href="https://www.python.org/downloads/"><img src="https://img.shields.io/badge/python-3.12%2B-blue" alt="Python 3.12+"/></a>
<a href="https://modelcontextprotocol.io"><img src="https://img.shields.io/badge/MCP-compatible-green" alt="MCP Compatible"/></a>
<a href="https://fuzzforge.ai"><img src="https://img.shields.io/badge/Website-fuzzforge.ai-purple" alt="Website"/></a>
</p>
<p align="center">
@@ -16,68 +17,63 @@
<sub>
<a href="#-overview"><b>Overview</b></a> •
<a href="#-features"><b>Features</b></a> •
<a href="#-mcp-security-hub"><b>Security Hub</b></a> •
<a href="#-installation"><b>Installation</b></a> •
<a href="USAGE.md"><b>Usage Guide</b></a> •
<a href="#-modules"><b>Modules</b></a> •
<a href="#-contributing"><b>Contributing</b></a>
</sub>
</p>
---
> 🚧 **SecPipe AI is under active development.** Expect breaking changes and new features!
> 🚧 **FuzzForge AI is under active development.** Expect breaking changes and new features!
---
## 🚀 Overview
**SecPipe AI** is an open-source MCP server that enables AI agents (GitHub Copilot, Claude, etc.) to orchestrate security research workflows through the **Model Context Protocol (MCP)**.
**FuzzForge AI** is an open-source runtime that enables AI agents (GitHub Copilot, Claude, etc.) to orchestrate security research workflows through the **Model Context Protocol (MCP)**.
SecPipe connects your AI assistant to **MCP tool hubs** — collections of containerized security tools that the agent can discover, chain, and execute autonomously. Instead of manually running security tools, describe what you want and let your AI assistant handle it.
### The Core: Modules
### The Core: Hub Architecture
At the heart of FuzzForge are **modules** - containerized security tools that AI agents can discover, configure, and orchestrate. Each module encapsulates a specific security capability (static analysis, fuzzing, crash analysis, etc.) and runs in an isolated container.
SecPipe acts as a **meta-MCP server** — a single MCP endpoint that gives your AI agent access to tools from multiple MCP hub servers. Each hub server is a containerized security tool (Binwalk, YARA, Radare2, Nmap, etc.) that the agent can discover at runtime.
- **🔌 Plug & Play**: Modules are self-contained - just pull and run
- **🤖 AI-Native**: Designed for AI agent orchestration via MCP
- **🔗 Composable**: Chain modules together into automated workflows
- **📦 Extensible**: Build custom modules with the Python SDK
- **🔍 Discovery**: The agent lists available hub servers and discovers their tools
- **🤖 AI-Native**: Hub tools provide agent context — usage tips, workflow guidance, and domain knowledge
- **🔗 Composable**: Chain tools from different hubs into automated pipelines
- **📦 Extensible**: Add your own MCP servers to the hub registry
FuzzForge AI handles module discovery, execution, and result collection. Security modules (developed separately) provide the actual security tooling - from static analyzers to fuzzers to crash triagers.
### 🎬 Use Case: Firmware Vulnerability Research
> **Scenario**: Analyze a firmware image to find security vulnerabilities — fully automated by an AI agent.
```
User: "Search for vulnerabilities in firmware.bin"
Agent → Binwalk: Extract filesystem from firmware image
Agent → YARA: Scan extracted files for vulnerability patterns
Agent → Radare2: Trace dangerous function calls in prioritized binaries
Agent → Report: 8 vulnerabilities found (2 critical, 4 high, 2 medium)
```
Instead of manually running security tools, describe what you want and let your AI assistant handle it.
### 🎬 Use Case: Rust Fuzzing Pipeline
> **Scenario**: Fuzz a Rust crate to discover vulnerabilities using AI-assisted harness generation and parallel fuzzing.
```
User: "Fuzz the blurhash crate for vulnerabilities"
Agent → Rust Analyzer: Identify fuzzable functions and attack surface
Agent → Harness Gen: Generate and validate fuzzing harnesses
Agent → Cargo Fuzzer: Run parallel coverage-guided fuzzing sessions
Agent → Crash Analysis: Deduplicate and triage discovered crashes
```
<table align="center">
<tr>
<th>1⃣ Analyze, Generate & Validate Harnesses</th>
<th>2⃣ Run Parallel Continuous Fuzzing</th>
</tr>
<tr>
<td><img src="assets/demopart2.gif" alt="FuzzForge Demo - Analysis Pipeline" width="100%"></td>
<td><img src="assets/demopart1.gif" alt="FuzzForge Demo - Parallel Fuzzing" width="100%"></td>
</tr>
<tr>
<td align="center"><sub>AI agent analyzes code, generates harnesses, and validates they compile</sub></td>
<td align="center"><sub>Multiple fuzzing sessions run in parallel with live metrics</sub></td>
</tr>
</table>
---
## ⭐ Support the Project
If you find SecPipe useful, please **star the repo** to support development! 🚀
If you find FuzzForge useful, please **star the repo** to support development! 🚀
<a href="https://github.com/FuzzingLabs/secpipe_ai/stargazers">
<img src="https://img.shields.io/github/stars/FuzzingLabs/secpipe_ai?style=social" alt="GitHub Stars">
<a href="https://github.com/FuzzingLabs/fuzzforge_ai/stargazers">
<img src="https://img.shields.io/github/stars/FuzzingLabs/fuzzforge_ai?style=social" alt="GitHub Stars">
</a>
---
@@ -86,13 +82,13 @@ If you find SecPipe useful, please **star the repo** to support development!
| Feature | Description |
|---------|-------------|
| 🤖 **AI-Native** | Built for MCP works with GitHub Copilot, Claude, and any MCP-compatible agent |
| 🔌 **Hub System** | Connect to MCP tool hubs — each hub brings dozens of containerized security tools |
| 🔍 **Tool Discovery** | Agents discover available tools at runtime with built-in usage guidance |
| 🔗 **Pipelines** | Chain tools from different hubs into automated multi-step workflows |
| 🔄 **Persistent Sessions** | Long-running tools (Radare2, fuzzers) with stateful container sessions |
| 🏠 **Local First** | All execution happens on your machine no cloud required |
| 🔒 **Sandboxed** | Every tool runs in an isolated container via Docker or Podman |
| 🤖 **AI-Native** | Built for MCP - works with GitHub Copilot, Claude, and any MCP-compatible agent |
| 📦 **Containerized** | Each module runs in isolation via Docker or Podman |
| 🔄 **Continuous Mode** | Long-running tasks (fuzzing) with real-time metrics streaming |
| 🔗 **Workflows** | Chain multiple modules together in automated pipelines |
| 🛠️ **Extensible** | Create custom modules with the Python SDK |
| 🏠 **Local First** | All execution happens on your machine - no cloud required |
| 🔒 **Secure** | Sandboxed containers with no network access by default |
---
@@ -105,58 +101,28 @@ If you find SecPipe useful, please **star the repo** to support development!
│ MCP Protocol (stdio)
┌─────────────────────────────────────────────────────────────────┐
SecPipe MCP Server
Projects Hub Discovery Hub Execution
┌────────────── ──────────────────┐ ───────────────────
│ │init_project │ │list_hub_servers │ │execute_hub_tool │ │
│ │set_assets │ │discover_hub_tools│ │start_hub_server │ │
│ │list_results │ │get_tool_schema │ │stop_hub_server │ │
│ └──────────────┘ └──────────────────┘ └───────────────────┘ │
FuzzForge MCP Server │
┌─────────────┐ ┌──────────────┐ ┌────────────────────────┐
│list_modules │ │execute_module│ │start_continuous_module │
───────────── ────────────── ────────────────────────┘
└───────────────────────────┬─────────────────────────────────────┘
Docker/Podman
┌─────────────────────────────────────────────────────────────────┐
MCP Hub Servers
│ ┌───────────┐ ┌───────────┐ ┌───────────┐ ┌───────────┐ │
│ Binwalk YARA │ Radare2 │ │ Nmap │
6 tools │ │ 5 tools │ │ 32 tools │ │ 8 tools │ │
└───────────┘ └───────────┘ └───────────┘ └───────────┘ │
┌───────────┐ ┌───────────┐ ───────────┐ ───────────┐
│ Nuclei SQLMap │ │ Trivy │ │ ...
│ 7 tools │ │ 8 tools │ │ 7 tools │ │ 36 hubs │
└───────────┘ └───────────┘ ───────────┘ ───────────┘
└─────────────────────────────────────────────────────────────────┘
FuzzForge Runner
Container Engine (Docker/Podman)
────────────────────────────────────────────────────────────────┘
┌───────────────────┼───────────────────┐
▼ ▼ ▼
┌───────────────┐ ┌───────────────┐ ┌───────────────
Module A │ Module B │ │ Module C
(Container) (Container) │ │ (Container)
└───────────────┘ └───────────────┘ └───────────────
```
---
## 🔧 MCP Security Hub
SecPipe ships with built-in support for the **[MCP Security Hub](https://github.com/FuzzingLabs/mcp-security-hub)** — a collection of 36 production-ready, Dockerized MCP servers covering offensive security:
| Category | Servers | Examples |
|----------|---------|----------|
| 🔍 **Reconnaissance** | 8 | Nmap, Masscan, Shodan, WhatWeb |
| 🌐 **Web Security** | 6 | Nuclei, SQLMap, ffuf, Nikto |
| 🔬 **Binary Analysis** | 6 | Radare2, Binwalk, YARA, Capa, Ghidra |
| ⛓️ **Blockchain** | 3 | Medusa, Solazy, DAML Viewer |
| ☁️ **Cloud Security** | 3 | Trivy, Prowler, RoadRecon |
| 💻 **Code Security** | 1 | Semgrep |
| 🔑 **Secrets Detection** | 1 | Gitleaks |
| 💥 **Exploitation** | 1 | SearchSploit |
| 🎯 **Fuzzing** | 2 | Boofuzz, Dharma |
| 🕵️ **OSINT** | 2 | Maigret, DNSTwist |
| 🛡️ **Threat Intel** | 2 | VirusTotal, AlienVault OTX |
| 🏰 **Active Directory** | 1 | BloodHound |
> 185+ individual tools accessible through a single MCP connection.
The hub is open source and can be extended with your own MCP servers. See the [mcp-security-hub repository](https://github.com/FuzzingLabs/mcp-security-hub) for details.
---
## 📦 Installation
### Prerequisites
@@ -169,77 +135,138 @@ The hub is open source and can be extended with your own MCP servers. See the [m
```bash
# Clone the repository
git clone https://github.com/FuzzingLabs/secpipe_ai.git
cd secpipe_ai
git clone https://github.com/FuzzingLabs/fuzzforge_ai.git
cd fuzzforge_ai
# Install dependencies
uv sync
# Build module images
make build-modules
```
### Link the Security Hub
```bash
# Clone the MCP Security Hub
git clone https://github.com/FuzzingLabs/mcp-security-hub.git ~/.secpipe/hubs/mcp-security-hub
# Build the Docker images for the hub tools
./scripts/build-hub-images.sh
```
Or use the terminal UI (`uv run secpipe ui`) to link hubs interactively.
### Configure MCP for Your AI Agent
```bash
# For GitHub Copilot
uv run secpipe mcp install copilot
uv run fuzzforge mcp install copilot
# For Claude Code (CLI)
uv run secpipe mcp install claude-code
uv run fuzzforge mcp install claude-code
# For Claude Desktop (standalone app)
uv run secpipe mcp install claude-desktop
uv run fuzzforge mcp install claude-desktop
# Verify installation
uv run secpipe mcp status
uv run fuzzforge mcp status
```
**Restart your editor** and your AI agent will have access to SecPipe tools!
**Restart your editor** and your AI agent will have access to FuzzForge tools!
---
## 🧑‍💻 Usage
## 📦 Modules
Once installed, just talk to your AI agent:
FuzzForge modules are containerized security tools that AI agents can orchestrate. The module ecosystem is designed around a simple principle: **the OSS runtime orchestrates, enterprise modules execute**.
```
"What security tools are available?"
"Scan this firmware image for vulnerabilities"
"Analyze this binary with radare2"
"Run nuclei against https://example.com"
### Module Ecosystem
| | FuzzForge AI | FuzzForge Enterprise Modules |
|---|---|---|
| **What** | Runtime & MCP server | Security research modules |
| **License** | Apache 2.0 | BSL 1.1 (Business Source License) |
| **Compatibility** | ✅ Runs any compatible module | ✅ Works with FuzzForge AI |
**Enterprise modules** are developed separately and provide production-ready security tooling:
| Category | Modules | Description |
|----------|---------|-------------|
| 🔍 **Static Analysis** | Rust Analyzer, Solidity Analyzer, Cairo Analyzer | Code analysis and fuzzable function detection |
| 🎯 **Fuzzing** | Cargo Fuzzer, Honggfuzz, AFL++ | Coverage-guided fuzz testing |
| 💥 **Crash Analysis** | Crash Triager, Root Cause Analyzer | Automated crash deduplication and analysis |
| 🔐 **Vulnerability Detection** | Pattern Matcher, Taint Analyzer | Security vulnerability scanning |
| 📝 **Reporting** | Report Generator, SARIF Exporter | Automated security report generation |
> 💡 **Build your own modules!** The FuzzForge SDK allows you to create custom modules that integrate seamlessly with FuzzForge AI. See [Creating Custom Modules](#-creating-custom-modules).
### Execution Modes
Modules run in two execution modes:
#### One-shot Execution
Run a module once and get results:
```python
result = execute_module("my-analyzer", assets_path="/path/to/project")
```
The agent will use SecPipe to discover the right hub tools, chain them into a pipeline, and return results — all without you touching a terminal.
#### Continuous Execution
See the [Usage Guide](USAGE.md) for detailed setup and advanced workflows.
For long-running tasks like fuzzing, with real-time metrics:
```python
# Start continuous execution
session = start_continuous_module("my-fuzzer",
assets_path="/path/to/project",
configuration={"target": "my_target"})
# Check status with live metrics
status = get_continuous_status(session["session_id"])
# Stop and collect results
stop_continuous_module(session["session_id"])
```
---
## 🛠️ Creating Custom Modules
Build your own security modules with the FuzzForge SDK:
```python
from fuzzforge_modules_sdk import FuzzForgeModule, FuzzForgeModuleResults
class MySecurityModule(FuzzForgeModule):
def _run(self, resources):
self.emit_event("started", target=resources[0].path)
# Your analysis logic here
results = self.analyze(resources)
self.emit_progress(100, status="completed",
message=f"Analysis complete")
return FuzzForgeModuleResults.SUCCESS
```
📖 See the [Module SDK Guide](fuzzforge-modules/fuzzforge-modules-sdk/README.md) for details.
---
## 📁 Project Structure
```
secpipe_ai/
├── secpipe-mcp/ # MCP server — the core of SecPipe
├── secpipe-cli/ # Command-line interface & terminal UI
├── secpipe-common/ # Shared abstractions (containers, storage)
├── secpipe-runner/ # Container execution engine (Docker/Podman)
├── secpipe-tests/ # Integration tests
├── mcp-security-hub/ # Default hub: 36 offensive security MCP servers
── scripts/ # Hub image build scripts
fuzzforge_ai/
├── fuzzforge-cli/ # Command-line interface
├── fuzzforge-common/ # Shared abstractions (containers, storage)
├── fuzzforge-mcp/ # MCP server for AI agents
├── fuzzforge-modules/ # Security modules
│ └── fuzzforge-modules-sdk/ # Module development SDK
├── fuzzforge-runner/ # Local execution engine
── fuzzforge-types/ # Type definitions & schemas
└── demo/ # Demo projects for testing
```
---
## 🗺️ What's Next
**[MCP Security Hub](https://github.com/FuzzingLabs/mcp-security-hub) integration** — Bridge 175+ offensive security tools (Nmap, Nuclei, Ghidra, and more) into FuzzForge workflows, all orchestrated by AI agents.
See [ROADMAP.md](ROADMAP.md) for the full roadmap.
---
## 🤝 Contributing
We welcome contributions from the community!
@@ -247,7 +274,7 @@ We welcome contributions from the community!
- 🐛 Report bugs via [GitHub Issues](../../issues)
- 💡 Suggest features or improvements
- 🔧 Submit pull requests
- 🔌 Add new MCP servers to the [Security Hub](https://github.com/FuzzingLabs/mcp-security-hub)
- 📦 Share your custom modules
See [CONTRIBUTING.md](CONTRIBUTING.md) for guidelines.
@@ -262,4 +289,4 @@ BSL 1.1 - See [LICENSE](LICENSE) for details.
<p align="center">
<strong>Maintained by <a href="https://fuzzinglabs.com">FuzzingLabs</a></strong>
<br>
</p>
</p>

View File

@@ -1,65 +0,0 @@
# v0.8.0 — MCP Hub Architecture
SecPipe AI v0.8.0 is a major architectural rewrite. The previous module system has been replaced by the **MCP Hub** architecture — SecPipe now acts as a meta-MCP server that connects AI agents to collections of containerized security tools, discovered and orchestrated at runtime.
---
## Highlights
### MCP Hub System
SecPipe no longer ships its own security modules. Instead, it connects to **MCP tool hubs** — registries of Dockerized MCP servers that AI agents can discover, chain, and execute autonomously.
- **Runtime tool discovery** — agents call `list_hub_servers` and `discover_hub_tools` to find available tools
- **Agent context convention** — hub tools provide built-in usage tips, workflow guidance, and domain knowledge so agents can use them without human intervention
- **Category filtering** — servers are organized by category (`binary-analysis`, `web-security`, `reconnaissance`, etc.) for efficient discovery
- **Persistent sessions** — stateful tools like Radare2 run in long-lived containers with `start_hub_server` / `stop_hub_server`
- **Volume mounts** — project assets are automatically mounted into tool containers for seamless file access
- **Continuous mode** — long-running tools (fuzzers) with real-time status via `start_continuous_hub_tool`
### MCP Security Hub Integration
Ships with built-in support for the [MCP Security Hub](https://github.com/FuzzingLabs/mcp-security-hub) — **36 production-ready MCP servers** covering:
| Category | Servers | Examples |
|----------|---------|----------|
| Reconnaissance | 8 | Nmap, Masscan, Shodan, WhatWeb |
| Web Security | 6 | Nuclei, SQLMap, ffuf, Nikto |
| Binary Analysis | 6 | Radare2, Binwalk, YARA, Capa, Ghidra |
| Blockchain | 3 | Medusa, Solazy, DAML Viewer |
| Cloud Security | 3 | Trivy, Prowler, RoadRecon |
| Code Security | 1 | Semgrep |
| Secrets Detection | 1 | Gitleaks |
| Exploitation | 1 | SearchSploit |
| Fuzzing | 2 | Boofuzz, Dharma |
| OSINT | 2 | Maigret, DNSTwist |
| Threat Intel | 2 | VirusTotal, AlienVault OTX |
| Active Directory | 1 | BloodHound |
> **185+ individual security tools** accessible through a single MCP connection.
### Terminal UI
A new interactive terminal interface (`uv run secpipe ui`) for managing hubs and agents:
- Dashboard with hub status overview
- One-click MCP server installation for GitHub Copilot, Claude Code, and Claude Desktop
- In-UI Docker image building with live log viewer
- Hub linking and registry management
---
## Breaking Changes
- The module system has been removed (`list_modules`, `execute_module`, `start_continuous_module`)
- Replaced by hub tools: `list_hub_servers`, `discover_hub_tools`, `execute_hub_tool`, `start_hub_server`, `stop_hub_server`, etc.
- `make build-modules` replaced by `./scripts/build-hub-images.sh`
---
## Other Changes
- **CI**: GitHub Actions workflows with ruff lint, mypy typecheck, and tests
- **Config**: `SECPIPE_USER_DIR` environment variable to override user-global data directory
- **Storage**: `~/.secpipe` for user-global data, `.secpipe/` in workspace for project storage
- **Docs**: README rewritten for hub-centric architecture

View File

@@ -1,6 +1,6 @@
# SecPipe AI Roadmap
# FuzzForge AI Roadmap
This document outlines the planned features and development direction for SecPipe AI.
This document outlines the planned features and development direction for FuzzForge AI.
---
@@ -10,27 +10,27 @@ This document outlines the planned features and development direction for SecPip
**Status:** 🔄 Planned
Integrate [mcp-security-hub](https://github.com/FuzzingLabs/mcp-security-hub) tools into SecPipe, giving AI agents access to 28 MCP servers and 163+ security tools through a unified interface.
Integrate [mcp-security-hub](https://github.com/FuzzingLabs/mcp-security-hub) tools into FuzzForge, giving AI agents access to 28 MCP servers and 163+ security tools through a unified interface.
#### How It Works
Unlike native SecPipe modules (built with the SDK), mcp-security-hub tools are **standalone MCP servers**. The integration will bridge these tools so they can be:
Unlike native FuzzForge modules (built with the SDK), mcp-security-hub tools are **standalone MCP servers**. The integration will bridge these tools so they can be:
- Discovered via `list_modules` alongside native modules
- Executed through SecPipe's orchestration layer
- Executed through FuzzForge's orchestration layer
- Chained with native modules in workflows
| Aspect | Native Modules | MCP Hub Tools |
|--------|----------------|---------------|
| **Runtime** | SecPipe SDK container | Standalone MCP server container |
| **Runtime** | FuzzForge SDK container | Standalone MCP server container |
| **Protocol** | Direct execution | MCP-to-MCP bridge |
| **Configuration** | Module config | Tool-specific args |
| **Output** | SecPipe results format | Tool-native format (normalized) |
| **Output** | FuzzForge results format | Tool-native format (normalized) |
#### Goals
- Unified discovery of all available tools (native + hub)
- Orchestrate hub tools through SecPipe's workflow engine
- Orchestrate hub tools through FuzzForge's workflow engine
- Normalize outputs for consistent result handling
- No modification required to mcp-security-hub tools
@@ -65,7 +65,7 @@ AI Agent:
**Status:** 🔄 Planned
A graphical interface to manage SecPipe without the command line.
A graphical interface to manage FuzzForge without the command line.
#### Goals
@@ -115,7 +115,7 @@ Features under consideration for future releases:
Have suggestions for the roadmap?
- Open an issue on [GitHub](https://github.com/FuzzingLabs/secpipe_ai/issues)
- Open an issue on [GitHub](https://github.com/FuzzingLabs/fuzzforge_ai/issues)
- Join our [Discord](https://discord.gg/8XEX33UUwZ)
---

488
USAGE.md
View File

@@ -1,9 +1,8 @@
# SecPipe AI Usage Guide
# FuzzForge AI Usage Guide
This guide covers everything you need to know to get started with SecPipe AI from installation to linking your first MCP hub and running security research workflows with AI.
This guide covers everything you need to know to get started with FuzzForge AI - from installation to running your first security research workflow with AI.
> **SecPipe is designed to be used with AI agents** (GitHub Copilot, Claude, etc.) via MCP.
> A terminal UI (`secpipe ui`) is provided for managing agents and hubs.
> **FuzzForge is designed to be used with AI agents** (GitHub Copilot, Claude, etc.) via MCP.
> The CLI is available for advanced users but the primary experience is through natural language interaction with your AI assistant.
---
@@ -13,21 +12,12 @@ This guide covers everything you need to know to get started with SecPipe AI —
- [Quick Start](#quick-start)
- [Prerequisites](#prerequisites)
- [Installation](#installation)
- [Terminal UI](#terminal-ui)
- [Launching the UI](#launching-the-ui)
- [Dashboard](#dashboard)
- [Agent Setup](#agent-setup)
- [Hub Manager](#hub-manager)
- [MCP Hub System](#mcp-hub-system)
- [What is an MCP Hub?](#what-is-an-mcp-hub)
- [FuzzingLabs Security Hub](#fuzzinglabs-security-hub)
- [Linking a Custom Hub](#linking-a-custom-hub)
- [Building Hub Images](#building-hub-images)
- [MCP Server Configuration (CLI)](#mcp-server-configuration-cli)
- [Building Modules](#building-modules)
- [MCP Server Configuration](#mcp-server-configuration)
- [GitHub Copilot](#github-copilot)
- [Claude Code (CLI)](#claude-code-cli)
- [Claude Desktop](#claude-desktop)
- [Using SecPipe with AI](#using-secpipe-with-ai)
- [Using FuzzForge with AI](#using-fuzzforge-with-ai)
- [CLI Reference](#cli-reference)
- [Environment Variables](#environment-variables)
- [Troubleshooting](#troubleshooting)
@@ -37,57 +27,41 @@ This guide covers everything you need to know to get started with SecPipe AI —
## Quick Start
> **Prerequisites:** You need [uv](https://docs.astral.sh/uv/) and [Docker](https://docs.docker.com/get-docker/) installed.
> See the [Prerequisites](#prerequisites) section for details.
> See the [Prerequisites](#prerequisites) section for installation instructions.
```bash
# 1. Clone and install
git clone https://github.com/FuzzingLabs/secpipe_ai.git
cd secpipe_ai
git clone https://github.com/FuzzingLabs/fuzzforge_ai.git
cd fuzzforge_ai
uv sync
# 2. Launch the terminal UI
uv run secpipe ui
# 2. Build the module images (one-time setup)
make build-modules
# 3. Press 'h' → "FuzzingLabs Hub" to clone & link the default security hub
# 4. Select an agent row and press Enter to install the MCP server for your agent
# 5. Build the Docker images for the hub tools (required before tools can run)
./scripts/build-hub-images.sh
# 6. Restart your AI agent and start talking:
# "What security tools are available?"
# "Scan this binary with binwalk and yara"
# "Analyze this Rust crate for fuzzable functions"
```
Or do it entirely from the command line:
```bash
# Install MCP for your AI agent
uv run secpipe mcp install copilot # For VS Code + GitHub Copilot
# 3. Install MCP for your AI agent
uv run fuzzforge mcp install copilot # For VS Code + GitHub Copilot
# OR
uv run secpipe mcp install claude-code # For Claude Code CLI
uv run fuzzforge mcp install claude-code # For Claude Code CLI
# Clone and link the default security hub
git clone git@github.com:FuzzingLabs/mcp-security-hub.git ~/.secpipe/hubs/mcp-security-hub
# 4. Restart your AI agent (VS Code, Claude, etc.)
# Build hub tool images (required — tools only run once their image is built)
./scripts/build-hub-images.sh
# Restart your AI agent — done!
# 5. Start talking to your AI:
# "List available FuzzForge modules"
# "Analyze this Rust crate for fuzzable functions"
# "Start fuzzing the parse_input function"
```
> **Note:** SecPipe uses Docker by default. Podman is also supported via `--engine podman`.
> **Note:** FuzzForge uses Docker by default. Podman is also supported via `--engine podman`.
---
## Prerequisites
Before installing SecPipe AI, ensure you have:
Before installing FuzzForge AI, ensure you have:
- **Python 3.12+** [Download Python](https://www.python.org/downloads/)
- **uv** package manager [Install uv](https://docs.astral.sh/uv/)
- **Docker** Container runtime ([Install Docker](https://docs.docker.com/get-docker/))
- **Git** — For cloning hub repositories
- **Python 3.12+** - [Download Python](https://www.python.org/downloads/)
- **uv** package manager - [Install uv](https://docs.astral.sh/uv/)
- **Docker** - Container runtime ([Install Docker](https://docs.docker.com/get-docker/))
### Installing uv
@@ -112,7 +86,7 @@ sudo usermod -aG docker $USER
```
> **Note:** Podman is also supported. Use `--engine podman` with CLI commands
> or set `SECPIPE_ENGINE=podman` environment variable.
> or set `FUZZFORGE_ENGINE=podman` environment variable.
---
@@ -121,8 +95,8 @@ sudo usermod -aG docker $USER
### 1. Clone the Repository
```bash
git clone https://github.com/FuzzingLabs/secpipe_ai.git
cd secpipe_ai
git clone https://github.com/FuzzingLabs/fuzzforge_ai.git
cd fuzzforge_ai
```
### 2. Install Dependencies
@@ -131,296 +105,254 @@ cd secpipe_ai
uv sync
```
This installs all SecPipe components in a virtual environment.
This installs all FuzzForge components in a virtual environment.
### 3. Verify Installation
```bash
uv run secpipe --help
uv run fuzzforge --help
```
---
## Terminal UI
## Building Modules
SecPipe ships with a terminal user interface (TUI) built on [Textual](https://textual.textualize.io/) for managing AI agents and MCP hub servers from a single dashboard.
FuzzForge modules are containerized security tools. After cloning, you need to build them once:
### Launching the UI
### Build All Modules
```bash
uv run secpipe ui
# From the fuzzforge_ai directory
make build-modules
```
### Dashboard
This builds all available modules:
- `fuzzforge-rust-analyzer` - Analyzes Rust code for fuzzable functions
- `fuzzforge-cargo-fuzzer` - Runs cargo-fuzz on Rust crates
- `fuzzforge-harness-validator` - Validates generated fuzzing harnesses
- `fuzzforge-crash-analyzer` - Analyzes crash inputs
The main screen is split into two panels:
### Build a Single Module
| Panel | Content |
|-------|---------|
| **AI Agents** (left) | Shows GitHub Copilot, Claude Desktop, and Claude Code with live link status and config file path |
| **Hub Servers** (right) | Shows all configured MCP hub tools with Docker image name, source hub, and build status (✓ Ready / ✗ Not built) |
```bash
# Build a specific module
cd fuzzforge-modules/rust-analyzer
make build
```
### Keyboard Shortcuts
### Verify Modules are Built
| Key | Action |
|-----|--------|
| `Enter` | **Select** — Act on the selected row (setup/unlink an agent) |
| `h` | **Hub Manager** — Open the hub management screen |
| `r` | **Refresh** — Re-check all agent and hub statuses |
| `q` | **Quit** |
```bash
# List built module images
docker images | grep fuzzforge
```
### Agent Setup
Select an agent row in the AI Agents table and press `Enter`:
- **If the agent is not linked** → a setup dialog opens asking for your container engine (Docker or Podman), then installs the SecPipe MCP configuration
- **If the agent is already linked** → a confirmation dialog offers to unlink it (removes the `secpipe` entry without touching other MCP servers)
The setup auto-detects:
- SecPipe installation root
- Docker/Podman socket path
- Hub configuration from `hub-config.json`
### Hub Manager
Press `h` to open the hub manager. This is where you manage your MCP hub repositories:
| Button | Action |
|--------|--------|
| **FuzzingLabs Hub** | One-click clone of the official [mcp-security-hub](https://github.com/FuzzingLabs/mcp-security-hub) repository — clones to `~/.secpipe/hubs/mcp-security-hub`, scans for tools, and registers them in `hub-config.json` |
| **Link Path** | Link any local directory as a hub — enter a name and path, SecPipe scans it for `category/tool-name/Dockerfile` patterns |
| **Clone URL** | Clone any git repository and link it as a hub |
| **Remove** | Unlink the selected hub and remove its servers from the configuration |
The hub table shows:
- **Name** — Hub name (★ prefix for the default hub)
- **Path** — Local directory path
- **Servers** — Number of MCP tools discovered
- **Source** — Git URL or "local"
You should see something like:
```
fuzzforge-rust-analyzer 0.1.0 abc123def456 2 minutes ago 850 MB
fuzzforge-cargo-fuzzer 0.1.0 789ghi012jkl 2 minutes ago 1.2 GB
...
```
---
## MCP Hub System
## MCP Server Configuration
### What is an MCP Hub?
An MCP hub is a directory containing one or more containerized MCP tools, organized by category:
```
my-hub/
├── category-a/
│ ├── tool-1/
│ │ └── Dockerfile
│ └── tool-2/
│ └── Dockerfile
├── category-b/
│ └── tool-3/
│ └── Dockerfile
└── ...
```
SecPipe scans for the pattern `category/tool-name/Dockerfile` and auto-generates server configuration entries for each discovered tool.
### FuzzingLabs Security Hub
The default MCP hub is [mcp-security-hub](https://github.com/FuzzingLabs/mcp-security-hub), maintained by FuzzingLabs. It includes **40+ security tools** across categories:
| Category | Tools |
|----------|-------|
| **Reconnaissance** | nmap, masscan, shodan, zoomeye, whatweb, pd-tools, externalattacker, networksdb |
| **Binary Analysis** | binwalk, yara, capa, radare2, ghidra, ida |
| **Code Security** | semgrep, rust-analyzer, harness-tester, cargo-fuzzer, crash-analyzer |
| **Web Security** | nuclei, nikto, sqlmap, ffuf, burp, waybackurls |
| **Fuzzing** | boofuzz, dharma |
| **Exploitation** | searchsploit |
| **Secrets** | gitleaks |
| **Cloud Security** | trivy, prowler, roadrecon |
| **OSINT** | maigret, dnstwist |
| **Threat Intel** | virustotal, otx |
| **Password Cracking** | hashcat |
| **Blockchain** | medusa, solazy, daml-viewer |
**Clone it via the UI:**
1. `uv run secpipe ui`
2. Press `h` → click **FuzzingLabs Hub**
3. Wait for the clone to finish — servers are auto-registered
**Or clone manually:**
```bash
git clone git@github.com:FuzzingLabs/mcp-security-hub.git ~/.secpipe/hubs/mcp-security-hub
```
### Linking a Custom Hub
You can link any directory that follows the `category/tool-name/Dockerfile` layout:
**Via the UI:**
1. Press `h`**Link Path**
2. Enter a name and the directory path
**Via the CLI (planned):** Not yet available — use the UI.
### Building Hub Images
After linking a hub, you need to build the Docker images before the tools can be used:
```bash
# Build all images from the default security hub
./scripts/build-hub-images.sh
# Or build a single tool image
docker build -t semgrep-mcp:latest mcp-security-hub/code-security/semgrep-mcp/
```
The dashboard hub table shows ✓ Ready for built images and ✗ Not built for missing ones.
---
## MCP Server Configuration (CLI)
If you prefer the command line over the TUI, you can configure agents directly:
FuzzForge integrates with AI agents through the Model Context Protocol (MCP). Configure your preferred AI agent to use FuzzForge tools.
### GitHub Copilot
```bash
uv run secpipe mcp install copilot
# That's it! Just run this command:
uv run fuzzforge mcp install copilot
```
The command auto-detects:
- **SecPipe root** Where SecPipe is installed
- **Docker socket** — Auto-detects `/var/run/docker.sock`
The command auto-detects everything:
- **FuzzForge root** - Where FuzzForge is installed
- **Modules path** - Defaults to `fuzzforge_ai/fuzzforge-modules`
- **Docker socket** - Auto-detects `/var/run/docker.sock`
**Optional overrides:**
**Optional overrides** (usually not needed):
```bash
uv run secpipe mcp install copilot --engine podman
uv run fuzzforge mcp install copilot \
--modules /path/to/modules \
--engine podman # if using Podman instead of Docker
```
**After installation:** Restart VS Code. SecPipe tools appear in GitHub Copilot Chat.
**After installation:**
1. Restart VS Code
2. Open GitHub Copilot Chat
3. FuzzForge tools are now available!
### Claude Code (CLI)
```bash
uv run secpipe mcp install claude-code
uv run fuzzforge mcp install claude-code
```
Installs to `~/.claude.json`. SecPipe tools are available from any directory after restarting Claude.
Installs to `~/.claude.json` so FuzzForge tools are available from any directory.
**After installation:**
1. Run `claude` from any directory
2. FuzzForge tools are now available!
### Claude Desktop
```bash
uv run secpipe mcp install claude-desktop
# Automatic installation
uv run fuzzforge mcp install claude-desktop
# Verify
uv run fuzzforge mcp status
```
**After installation:** Restart Claude Desktop.
**After installation:**
1. Restart Claude Desktop
2. FuzzForge tools are now available!
### Check Status
### Check MCP Status
```bash
uv run secpipe mcp status
uv run fuzzforge mcp status
```
### Remove Configuration
Shows configuration status for all supported AI agents:
```
┏━━━━━━━━━━━━━━━━━━━━━━┳━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━┳━━━━━━━━━━━━━━┳━━━━━━━━━━━━━━━━━━━━━━━━━━┓
┃ Agent ┃ Config Path ┃ Status ┃ FuzzForge Configured ┃
┡━━━━━━━━━━━━━━━━━━━━━━╇━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━╇━━━━━━━━━━━━━━╇━━━━━━━━━━━━━━━━━━━━━━━━━━┩
│ GitHub Copilot │ ~/.config/Code/User/mcp.json │ ✓ Exists │ ✓ Yes │
│ Claude Desktop │ ~/.config/Claude/claude_desktop_config... │ Not found │ - │
│ Claude Code │ ~/.claude.json │ ✓ Exists │ ✓ Yes │
└──────────────────────┴───────────────────────────────────────────┴──────────────┴─────────────────────────┘
```
### Generate Config Without Installing
```bash
uv run secpipe mcp uninstall copilot
uv run secpipe mcp uninstall claude-code
uv run secpipe mcp uninstall claude-desktop
# Preview the configuration that would be installed
uv run fuzzforge mcp generate copilot
uv run fuzzforge mcp generate claude-desktop
uv run fuzzforge mcp generate claude-code
```
### Remove MCP Configuration
```bash
uv run fuzzforge mcp uninstall copilot
uv run fuzzforge mcp uninstall claude-desktop
uv run fuzzforge mcp uninstall claude-code
```
---
## Using SecPipe with AI
## Using FuzzForge with AI
Once MCP is configured and hub images are built, interact with SecPipe through natural language with your AI assistant.
Once MCP is configured, you interact with FuzzForge through natural language with your AI assistant.
### Example Conversations
**Discover available tools:**
```
You: "What security tools are available in SecPipe?"
AI: Queries hub tools → "I found 15 tools across categories: nmap for
port scanning, binwalk for firmware analysis, semgrep for code
scanning, cargo-fuzzer for Rust fuzzing..."
You: "What FuzzForge modules are available?"
AI: Uses list_modules → "I found 4 modules: rust-analyzer, cargo-fuzzer,
harness-validator, and crash-analyzer..."
```
**Analyze a binary:**
```
You: "Extract and analyze this firmware image"
AI: Uses binwalk to extract → yara for pattern matching → capa for
capability detection → "Found 3 embedded filesystems, 2 YARA
matches for known vulnerabilities..."
```
**Fuzz Rust code:**
**Analyze code for fuzzing targets:**
```
You: "Analyze this Rust crate for functions I should fuzz"
AI: Uses rust-analyzer → "Found 3 fuzzable entry points..."
AI: Uses execute_module("rust-analyzer") → "I found 3 good fuzzing candidates:
- parse_input() in src/parser.rs - handles untrusted input
- decode_message() in src/codec.rs - complex parsing logic
..."
```
**Generate and validate harnesses:**
```
You: "Generate a fuzzing harness for the parse_input function"
AI: Creates harness code, then uses execute_module("harness-validator")
→ "Here's a harness that compiles successfully..."
```
**Run continuous fuzzing:**
```
You: "Start fuzzing parse_input for 10 minutes"
AI: Uses cargo-fuzzer → "Fuzzing session started. 2 crashes found..."
AI: Uses start_continuous_module("cargo-fuzzer") → "Started fuzzing session abc123"
You: "How's the fuzzing going?"
AI: Uses get_continuous_status("abc123") → "Running for 5 minutes:
- 150,000 executions
- 2 crashes found
- 45% edge coverage"
You: "Stop and show me the crashes"
AI: Uses stop_continuous_module("abc123") → "Found 2 unique crashes..."
```
**Scan for vulnerabilities:**
```
You: "Scan this codebase with semgrep for security issues"
AI: Uses semgrep-mcp → "Found 5 findings: 2 high severity SQL injection
patterns, 3 medium severity hardcoded secrets..."
```
### Available MCP Tools
| Tool | Description |
|------|-------------|
| `list_modules` | List all available security modules |
| `execute_module` | Run a module once and get results |
| `start_continuous_module` | Start a long-running module (e.g., fuzzing) |
| `get_continuous_status` | Check status of a continuous session |
| `stop_continuous_module` | Stop a continuous session |
| `list_continuous_sessions` | List all active sessions |
| `get_execution_results` | Retrieve results from an execution |
| `execute_workflow` | Run a multi-step workflow |
---
## CLI Reference
### UI Command
```bash
uv run secpipe ui # Launch the terminal dashboard
```
> **Note:** The CLI is for advanced users. Most users should interact with FuzzForge through their AI assistant.
### MCP Commands
```bash
uv run secpipe mcp status # Check agent configuration status
uv run secpipe mcp install <agent> # Install MCP config (copilot|claude-code|claude-desktop)
uv run secpipe mcp uninstall <agent> # Remove MCP config
uv run secpipe mcp generate <agent> # Preview config without installing
uv run fuzzforge mcp status # Check configuration status
uv run fuzzforge mcp install <agent> # Install MCP config
uv run fuzzforge mcp uninstall <agent> # Remove MCP config
uv run fuzzforge mcp generate <agent> # Preview config without installing
```
### Module Commands
```bash
uv run fuzzforge modules list # List available modules
uv run fuzzforge modules info <module> # Show module details
uv run fuzzforge modules run <module> --assets . # Run a module
```
### Project Commands
```bash
uv run secpipe project init # Initialize a project
uv run secpipe project info # Show project info
uv run secpipe project executions # List executions
uv run secpipe project results <id> # Get execution results
uv run fuzzforge project init # Initialize a project
uv run fuzzforge project info # Show project info
uv run fuzzforge project executions # List executions
uv run fuzzforge project results <id> # Get execution results
```
---
## Environment Variables
Configure SecPipe using environment variables:
Configure FuzzForge using environment variables:
```bash
# Override the SecPipe installation root (auto-detected from cwd by default)
export SECPIPE_ROOT=/path/to/secpipe_ai
# Override the user-global data directory (default: ~/.secpipe)
# Useful for isolated testing without touching your real installation
export SECPIPE_USER_DIR=/tmp/my-secpipe-test
# Storage path for projects and execution results (default: <workspace>/.secpipe/storage)
export SECPIPE_STORAGE__PATH=/path/to/storage
# Project paths
export FUZZFORGE_MODULES_PATH=/path/to/modules
export FUZZFORGE_STORAGE_PATH=/path/to/storage
# Container engine (Docker is default)
export SECPIPE_ENGINE__TYPE=docker # or podman
export FUZZFORGE_ENGINE__TYPE=docker # or podman
# Podman-specific container storage paths
export SECPIPE_ENGINE__GRAPHROOT=~/.secpipe/containers/storage
export SECPIPE_ENGINE__RUNROOT=~/.secpipe/containers/run
# Podman-specific settings (only needed if using Podman under Snap)
export FUZZFORGE_ENGINE__GRAPHROOT=~/.fuzzforge/containers/storage
export FUZZFORGE_ENGINE__RUNROOT=~/.fuzzforge/containers/run
```
---
@@ -452,62 +384,66 @@ Error: Permission denied connecting to Docker socket
**Solution:**
```bash
# Add your user to the docker group
sudo usermod -aG docker $USER
# Log out and back in, then verify:
# Log out and back in for changes to take effect
# Then verify:
docker run --rm hello-world
```
### Hub Images Not Built
### No Modules Found
The dashboard shows ✗ Not built for tools:
```bash
# Build all hub images
./scripts/build-hub-images.sh
# Or build a single tool
docker build -t <tool-name>:latest mcp-security-hub/<category>/<tool-name>/
```
No modules found.
```
**Solution:**
1. Build the modules first: `make build-modules`
2. Check the modules path: `uv run fuzzforge modules list`
3. Verify images exist: `docker images | grep fuzzforge`
### MCP Server Not Starting
Check the MCP configuration:
```bash
# Check agent configuration
uv run secpipe mcp status
uv run fuzzforge mcp status
```
# Verify the config file path exists and contains valid JSON
cat ~/.config/Code/User/mcp.json # Copilot
cat ~/.claude.json # Claude Code
Verify the configuration file path exists and contains valid JSON.
### Module Container Fails to Build
```bash
# Build module container manually to see errors
cd fuzzforge-modules/<module-name>
docker build -t <module-name> .
```
### Using Podman Instead of Docker
If you prefer Podman:
```bash
# Install with Podman engine
uv run secpipe mcp install copilot --engine podman
# Use --engine podman with CLI
uv run fuzzforge mcp install copilot --engine podman
# Or set environment variable
export SECPIPE_ENGINE=podman
export FUZZFORGE_ENGINE=podman
```
### Hub Registry
SecPipe stores linked hub information in `~/.secpipe/hubs.json`. If something goes wrong:
### Check Logs
FuzzForge stores execution logs in the storage directory:
```bash
# View registry
cat ~/.secpipe/hubs.json
# Reset registry
rm ~/.secpipe/hubs.json
ls -la ~/.fuzzforge/storage/<project-id>/<execution-id>/
```
---
## Next Steps
- 🖥️ Launch `uv run secpipe ui` and explore the dashboard
- 🔒 Clone the [mcp-security-hub](https://github.com/FuzzingLabs/mcp-security-hub) for 40+ security tools
- 📖 Read the [Module SDK Guide](fuzzforge-modules/fuzzforge-modules-sdk/README.md) to create custom modules
- 🎬 Check the demos in the [README](README.md)
- 💬 Join our [Discord](https://discord.gg/8XEX33UUwZ) for support
---

BIN
assets/demopart1.gif Normal file

Binary file not shown.

After

Width:  |  Height:  |  Size: 360 KiB

BIN
assets/demopart2.gif Normal file

Binary file not shown.

After

Width:  |  Height:  |  Size: 2.1 MiB

3
fuzzforge-cli/README.md Normal file
View File

@@ -0,0 +1,3 @@
# FuzzForge CLI
...

View File

@@ -1,14 +1,13 @@
[project]
name = "secpipe-cli"
name = "fuzzforge-cli"
version = "0.0.1"
description = "SecPipe CLI - Command-line interface for SecPipe AI."
description = "FuzzForge CLI - Command-line interface for FuzzForge AI."
authors = []
readme = "README.md"
requires-python = ">=3.14"
dependencies = [
"secpipe-mcp==0.0.1",
"fuzzforge-runner==0.0.1",
"rich>=14.0.0",
"textual>=1.0.0",
"typer==0.20.1",
]
@@ -23,7 +22,7 @@ tests = [
]
[project.scripts]
secpipe = "secpipe_cli.__main__:main"
fuzzforge = "fuzzforge_cli.__main__:main"
[tool.uv.sources]
secpipe-mcp = { workspace = true }
fuzzforge-runner = { workspace = true }

15
fuzzforge-cli/ruff.toml Normal file
View File

@@ -0,0 +1,15 @@
line-length = 120
[lint]
select = [ "ALL" ]
ignore = [
"COM812", # conflicts with the formatter
"D203", # conflicts with 'D211'
"D213", # conflicts with 'D212'
]
[lint.per-file-ignores]
"tests/*" = [
"PLR2004", # allowing comparisons using unamed numerical constants in tests
"S101", # allowing 'assert' statements in tests
]

View File

@@ -1,6 +1,6 @@
"""TODO."""
from secpipe_cli.application import application
from fuzzforge_cli.application import application
def main() -> None:

View File

@@ -0,0 +1,96 @@
"""FuzzForge CLI application."""
from pathlib import Path
from typing import Annotated
from fuzzforge_runner import Runner, Settings
from typer import Context as TyperContext
from typer import Option, Typer
from fuzzforge_cli.commands import mcp, modules, projects
from fuzzforge_cli.context import Context
application: Typer = Typer(
name="fuzzforge",
help="FuzzForge AI - Security research orchestration platform.",
)
@application.callback()
def main(
project_path: Annotated[
Path,
Option(
"--project",
"-p",
envvar="FUZZFORGE_PROJECT__DEFAULT_PATH",
help="Path to the FuzzForge project directory.",
),
] = Path.cwd(),
modules_path: Annotated[
Path,
Option(
"--modules",
"-m",
envvar="FUZZFORGE_MODULES_PATH",
help="Path to the modules directory.",
),
] = Path.home() / ".fuzzforge" / "modules",
storage_path: Annotated[
Path,
Option(
"--storage",
envvar="FUZZFORGE_STORAGE__PATH",
help="Path to the storage directory.",
),
] = Path.home() / ".fuzzforge" / "storage",
engine_type: Annotated[
str,
Option(
"--engine",
envvar="FUZZFORGE_ENGINE__TYPE",
help="Container engine type (docker or podman).",
),
] = "docker",
engine_socket: Annotated[
str,
Option(
"--socket",
envvar="FUZZFORGE_ENGINE__SOCKET",
help="Container engine socket path.",
),
] = "",
context: TyperContext = None, # type: ignore[assignment]
) -> None:
"""FuzzForge AI - Security research orchestration platform.
Execute security research modules in isolated containers.
"""
from fuzzforge_runner.settings import EngineSettings, ProjectSettings, StorageSettings
settings = Settings(
engine=EngineSettings(
type=engine_type, # type: ignore[arg-type]
socket=engine_socket,
),
storage=StorageSettings(
path=storage_path,
),
project=ProjectSettings(
default_path=project_path,
modules_path=modules_path,
),
)
runner = Runner(settings)
context.obj = Context(
runner=runner,
project_path=project_path,
)
application.add_typer(mcp.application)
application.add_typer(modules.application)
application.add_typer(projects.application)

View File

@@ -1,4 +1,4 @@
"""MCP server configuration commands for SecPipe CLI.
"""MCP server configuration commands for FuzzForge CLI.
This module provides commands for setting up MCP server connections
with various AI agents (VS Code Copilot, Claude Code, etc.).
@@ -12,7 +12,7 @@ import os
import sys
from enum import StrEnum
from pathlib import Path
from typing import Annotated, Any
from typing import Annotated
from rich.console import Console
from rich.panel import Panel
@@ -44,10 +44,10 @@ def _get_copilot_mcp_path() -> Path:
"""
if sys.platform == "darwin":
return Path.home() / "Library" / "Application Support" / "Code" / "User" / "mcp.json"
if sys.platform == "win32":
elif sys.platform == "win32":
return Path(os.environ.get("APPDATA", "")) / "Code" / "User" / "mcp.json"
# Linux
return Path.home() / ".config" / "Code" / "User" / "mcp.json"
else: # Linux
return Path.home() / ".config" / "Code" / "User" / "mcp.json"
def _get_claude_desktop_mcp_path() -> Path:
@@ -58,10 +58,10 @@ def _get_claude_desktop_mcp_path() -> Path:
"""
if sys.platform == "darwin":
return Path.home() / "Library" / "Application Support" / "Claude" / "claude_desktop_config.json"
if sys.platform == "win32":
elif sys.platform == "win32":
return Path(os.environ.get("APPDATA", "")) / "Claude" / "claude_desktop_config.json"
# Linux
return Path.home() / ".config" / "Claude" / "claude_desktop_config.json"
else: # Linux
return Path.home() / ".config" / "Claude" / "claude_desktop_config.json"
def _get_claude_code_mcp_path(project_path: Path | None = None) -> Path:
@@ -114,86 +114,79 @@ def _detect_docker_socket() -> str:
:returns: Path to the Docker socket.
"""
socket_paths: list[Path] = [
Path("/var/run/docker.sock"),
socket_paths = [
"/var/run/docker.sock",
Path.home() / ".docker" / "run" / "docker.sock",
]
for path in socket_paths:
if path.exists():
if Path(path).exists():
return str(path)
return "/var/run/docker.sock"
def _find_secpipe_root() -> Path:
"""Find the SecPipe installation root.
def _find_fuzzforge_root() -> Path:
"""Find the FuzzForge installation root.
:returns: Path to secpipe-oss directory.
:returns: Path to fuzzforge-oss directory.
"""
# Check environment variable override first
env_root = os.environ.get("SECPIPE_ROOT")
if env_root:
return Path(env_root).resolve()
# Walk up from cwd to find a secpipe root (hub-config.json is the marker)
for parent in [Path.cwd(), *Path.cwd().parents]:
if (parent / "hub-config.json").is_file():
return parent
# Fall back to __file__-based search (dev install inside secpipe-oss)
# Try to find from current file location
current = Path(__file__).resolve()
# Walk up to find fuzzforge-oss root
for parent in current.parents:
if (parent / "secpipe-mcp").is_dir():
if (parent / "fuzzforge-mcp").is_dir() and (parent / "fuzzforge-runner").is_dir():
return parent
# Fall back to cwd
return Path.cwd()
def _generate_mcp_config(
secpipe_root: Path,
fuzzforge_root: Path,
modules_path: Path,
engine_type: str,
engine_socket: str,
) -> dict[str, Any]:
) -> dict:
"""Generate MCP server configuration.
:param secpipe_root: Path to secpipe-oss installation.
:param fuzzforge_root: Path to fuzzforge-oss installation.
:param modules_path: Path to the modules directory.
:param engine_type: Container engine type (podman or docker).
:param engine_socket: Container engine socket path.
:returns: MCP configuration dictionary.
"""
venv_python = secpipe_root / ".venv" / "bin" / "python"
venv_python = fuzzforge_root / ".venv" / "bin" / "python"
# Use uv run if no venv, otherwise use venv python directly
if venv_python.exists():
command = str(venv_python)
args = ["-m", "secpipe_mcp"]
args = ["-m", "fuzzforge_mcp"]
else:
command = "uv"
args = ["--directory", str(secpipe_root), "run", "secpipe-mcp"]
args = ["--directory", str(fuzzforge_root), "run", "fuzzforge-mcp"]
# User-global storage paths for SecPipe containers.
# Kept under ~/.secpipe so images are built once and shared across
# all workspaces — regardless of where `secpipe mcp install` is run.
# Override with SECPIPE_USER_DIR for isolated testing.
user_dir_env = os.environ.get("SECPIPE_USER_DIR")
secpipe_home = Path(user_dir_env).resolve() if user_dir_env else Path.home() / ".secpipe"
graphroot = secpipe_home / "containers" / "storage"
runroot = secpipe_home / "containers" / "run"
# Self-contained storage paths for FuzzForge containers
# This isolates FuzzForge from system Podman and avoids snap issues
fuzzforge_home = Path.home() / ".fuzzforge"
graphroot = fuzzforge_home / "containers" / "storage"
runroot = fuzzforge_home / "containers" / "run"
return {
"type": "stdio",
"command": command,
"args": args,
"cwd": str(secpipe_root),
"cwd": str(fuzzforge_root),
"env": {
"SECPIPE_ENGINE__TYPE": engine_type,
"SECPIPE_ENGINE__GRAPHROOT": str(graphroot),
"SECPIPE_ENGINE__RUNROOT": str(runroot),
"SECPIPE_HUB__ENABLED": "true",
"SECPIPE_HUB__CONFIG_PATH": str(secpipe_root / "hub-config.json"),
"FUZZFORGE_MODULES_PATH": str(modules_path),
"FUZZFORGE_ENGINE__TYPE": engine_type,
"FUZZFORGE_ENGINE__GRAPHROOT": str(graphroot),
"FUZZFORGE_ENGINE__RUNROOT": str(runroot),
"FUZZFORGE_HUB__ENABLED": "true",
"FUZZFORGE_HUB__CONFIG_PATH": str(fuzzforge_root / "hub-config.json"),
},
}
@@ -214,9 +207,9 @@ def status(context: Context) -> None:
table.add_column("Agent", style="cyan")
table.add_column("Config Path")
table.add_column("Status")
table.add_column("SecPipe Configured")
table.add_column("FuzzForge Configured")
secpipe_root = _find_secpipe_root()
fuzzforge_root = _find_fuzzforge_root()
agents = [
("GitHub Copilot", _get_copilot_mcp_path(), "servers"),
@@ -229,12 +222,12 @@ def status(context: Context) -> None:
try:
config = json.loads(config_path.read_text())
servers = config.get(servers_key, {})
has_secpipe = "secpipe" in servers
has_fuzzforge = "fuzzforge" in servers
table.add_row(
name,
str(config_path),
"[green]✓ Exists[/green]",
"[green]✓ Yes[/green]" if has_secpipe else "[yellow]✗ No[/yellow]",
"[green]✓ Yes[/green]" if has_fuzzforge else "[yellow]✗ No[/yellow]",
)
except json.JSONDecodeError:
table.add_row(
@@ -256,7 +249,7 @@ def status(context: Context) -> None:
# Show detected environment
console.print()
console.print("[bold]Detected Environment:[/bold]")
console.print(f" SecPipe Root: {_find_secpipe_root()}")
console.print(f" FuzzForge Root: {_find_fuzzforge_root()}")
console.print(f" Podman Socket: {_detect_podman_socket()}")
console.print(f" Docker Socket: {_detect_docker_socket()}")
@@ -273,6 +266,14 @@ def generate(
help="AI agent to generate config for (copilot, claude-desktop, or claude-code).",
),
],
modules_path: Annotated[
Path | None,
Option(
"--modules",
"-m",
help="Path to the modules directory.",
),
] = None,
engine: Annotated[
str,
Option(
@@ -286,11 +287,15 @@ def generate(
:param context: Typer context.
:param agent: Target AI agent.
:param modules_path: Override modules path.
:param engine: Container engine type.
"""
console = Console()
secpipe_root = _find_secpipe_root()
fuzzforge_root = _find_fuzzforge_root()
# Use defaults if not specified
resolved_modules = modules_path or (fuzzforge_root / "fuzzforge-modules")
# Detect socket
if engine == "podman":
@@ -300,16 +305,17 @@ def generate(
# Generate config
server_config = _generate_mcp_config(
secpipe_root=secpipe_root,
fuzzforge_root=fuzzforge_root,
modules_path=resolved_modules,
engine_type=engine,
engine_socket=socket,
)
# Format based on agent
if agent == AIAgent.COPILOT:
full_config = {"servers": {"secpipe": server_config}}
full_config = {"servers": {"fuzzforge": server_config}}
else: # Claude Desktop or Claude Code
full_config = {"mcpServers": {"secpipe": server_config}}
full_config = {"mcpServers": {"fuzzforge": server_config}}
config_json = json.dumps(full_config, indent=4)
@@ -322,14 +328,14 @@ def generate(
if agent == AIAgent.COPILOT:
config_path = _get_copilot_mcp_path()
elif agent == AIAgent.CLAUDE_CODE:
config_path = _get_claude_code_mcp_path(secpipe_root)
config_path = _get_claude_code_mcp_path(fuzzforge_root)
else: # Claude Desktop
config_path = _get_claude_desktop_mcp_path()
console.print()
console.print(f"[bold]Save to:[/bold] {config_path}")
console.print()
console.print("[dim]Or run 'secpipe mcp install' to install automatically.[/dim]")
console.print("[dim]Or run 'fuzzforge mcp install' to install automatically.[/dim]")
@application.command(
@@ -344,6 +350,14 @@ def install(
help="AI agent to install config for (copilot, claude-desktop, or claude-code).",
),
],
modules_path: Annotated[
Path | None,
Option(
"--modules",
"-m",
help="Path to the modules directory.",
),
] = None,
engine: Annotated[
str,
Option(
@@ -357,23 +371,24 @@ def install(
Option(
"--force",
"-f",
help="Overwrite existing secpipe configuration.",
help="Overwrite existing fuzzforge configuration.",
),
] = False,
) -> None:
"""Install MCP configuration for the specified AI agent.
This will create or update the MCP configuration file, adding the
secpipe server configuration.
fuzzforge server configuration.
:param context: Typer context.
:param agent: Target AI agent.
:param modules_path: Override modules path.
:param engine: Container engine type.
:param force: Overwrite existing configuration.
"""
console = Console()
secpipe_root = _find_secpipe_root()
fuzzforge_root = _find_fuzzforge_root()
# Determine config path
if agent == AIAgent.COPILOT:
@@ -386,6 +401,9 @@ def install(
config_path = _get_claude_desktop_mcp_path()
servers_key = "mcpServers"
# Use defaults if not specified
resolved_modules = modules_path or (fuzzforge_root / "fuzzforge-modules")
# Detect socket
if engine == "podman":
socket = _detect_podman_socket()
@@ -394,7 +412,8 @@ def install(
# Generate server config
server_config = _generate_mcp_config(
secpipe_root=secpipe_root,
fuzzforge_root=fuzzforge_root,
modules_path=resolved_modules,
engine_type=engine,
engine_socket=socket,
)
@@ -408,47 +427,48 @@ def install(
console.print("[dim]Please fix the file manually or delete it.[/dim]")
raise SystemExit(1)
# Check if secpipe already exists
# Check if fuzzforge already exists
servers = existing_config.get(servers_key, {})
if "secpipe" in servers and not force:
console.print("[yellow]SecPipe is already configured.[/yellow]")
if "fuzzforge" in servers and not force:
console.print("[yellow]FuzzForge is already configured.[/yellow]")
console.print("[dim]Use --force to overwrite existing configuration.[/dim]")
raise SystemExit(1)
# Add/update secpipe
# Add/update fuzzforge
if servers_key not in existing_config:
existing_config[servers_key] = {}
existing_config[servers_key]["secpipe"] = server_config
existing_config[servers_key]["fuzzforge"] = server_config
full_config = existing_config
else:
# Create new config
config_path.parent.mkdir(parents=True, exist_ok=True)
full_config = {servers_key: {"secpipe": server_config}}
full_config = {servers_key: {"fuzzforge": server_config}}
# Write config
config_path.write_text(json.dumps(full_config, indent=4))
console.print(f"[green]✓ Installed SecPipe MCP configuration for {agent.value}[/green]")
console.print(f"[green]✓ Installed FuzzForge MCP configuration for {agent.value}[/green]")
console.print()
console.print(f"[bold]Configuration file:[/bold] {config_path}")
console.print()
console.print("[bold]Settings:[/bold]")
console.print(f" Modules Path: {resolved_modules}")
console.print(f" Engine: {engine}")
console.print(f" Socket: {socket}")
console.print(f" Hub Config: {secpipe_root / 'hub-config.json'}")
console.print(f" Hub Config: {fuzzforge_root / 'hub-config.json'}")
console.print()
console.print("[bold]Next steps:[/bold]")
if agent == AIAgent.COPILOT:
console.print(" 1. Restart VS Code")
console.print(" 2. Open Copilot Chat and look for SecPipe tools")
console.print(" 2. Open Copilot Chat and look for FuzzForge tools")
elif agent == AIAgent.CLAUDE_CODE:
console.print(" 1. Run 'claude' from any directory")
console.print(" 2. SecPipe tools will be available")
console.print(" 2. FuzzForge tools will be available")
else: # Claude Desktop
console.print(" 1. Restart Claude Desktop")
console.print(" 2. The secpipe MCP server will be available")
console.print(" 2. The fuzzforge MCP server will be available")
@application.command(
@@ -464,14 +484,14 @@ def uninstall(
),
],
) -> None:
"""Remove SecPipe MCP configuration from the specified AI agent.
"""Remove FuzzForge MCP configuration from the specified AI agent.
:param context: Typer context.
:param agent: Target AI agent.
"""
console = Console()
secpipe_root = _find_secpipe_root()
fuzzforge_root = _find_fuzzforge_root()
# Determine config path
if agent == AIAgent.COPILOT:
@@ -495,16 +515,16 @@ def uninstall(
raise SystemExit(1)
servers = config.get(servers_key, {})
if "secpipe" not in servers:
console.print("[yellow]SecPipe is not configured.[/yellow]")
if "fuzzforge" not in servers:
console.print("[yellow]FuzzForge is not configured.[/yellow]")
return
# Remove secpipe
del servers["secpipe"]
# Remove fuzzforge
del servers["fuzzforge"]
# Write back
config_path.write_text(json.dumps(config, indent=4))
console.print(f"[green]✓ Removed SecPipe MCP configuration from {agent.value}[/green]")
console.print(f"[green]✓ Removed FuzzForge MCP configuration from {agent.value}[/green]")
console.print()
console.print("[dim]Restart your AI agent for changes to take effect.[/dim]")

View File

@@ -0,0 +1,166 @@
"""Module management commands for FuzzForge CLI."""
import asyncio
from pathlib import Path
from typing import Annotated, Any
from rich.console import Console
from rich.table import Table
from typer import Argument, Context, Option, Typer
from fuzzforge_cli.context import get_project_path, get_runner
application: Typer = Typer(
name="modules",
help="Module management commands.",
)
@application.command(
help="List available modules.",
name="list",
)
def list_modules(
context: Context,
) -> None:
"""List all available modules.
:param context: Typer context.
"""
runner = get_runner(context)
modules = runner.list_modules()
console = Console()
if not modules:
console.print("[yellow]No modules found.[/yellow]")
console.print(f" Modules directory: {runner.settings.modules_path}")
return
table = Table(title="Available Modules")
table.add_column("Identifier", style="cyan")
table.add_column("Available")
table.add_column("Description")
for module in modules:
table.add_row(
module.identifier,
"" if module.available else "",
module.description or "-",
)
console.print(table)
@application.command(
help="Execute a module.",
name="run",
)
def run_module(
context: Context,
module_identifier: Annotated[
str,
Argument(
help="Identifier of the module to execute.",
),
],
assets_path: Annotated[
Path | None,
Option(
"--assets",
"-a",
help="Path to input assets.",
),
] = None,
config: Annotated[
str | None,
Option(
"--config",
"-c",
help="Module configuration as JSON string.",
),
] = None,
) -> None:
"""Execute a module.
:param context: Typer context.
:param module_identifier: Module to execute.
:param assets_path: Optional path to input assets.
:param config: Optional JSON configuration.
"""
import json
runner = get_runner(context)
project_path = get_project_path(context)
configuration: dict[str, Any] | None = None
if config:
try:
configuration = json.loads(config)
except json.JSONDecodeError as e:
console = Console()
console.print(f"[red]✗[/red] Invalid JSON configuration: {e}")
return
console = Console()
console.print(f"[blue]→[/blue] Executing module: {module_identifier}")
async def execute() -> None:
result = await runner.execute_module(
module_identifier=module_identifier,
project_path=project_path,
configuration=configuration,
assets_path=assets_path,
)
if result.success:
console.print(f"[green]✓[/green] Module execution completed")
console.print(f" Execution ID: {result.execution_id}")
console.print(f" Results: {result.results_path}")
else:
console.print(f"[red]✗[/red] Module execution failed")
console.print(f" Error: {result.error}")
asyncio.run(execute())
@application.command(
help="Show module information.",
name="info",
)
def module_info(
context: Context,
module_identifier: Annotated[
str,
Argument(
help="Identifier of the module.",
),
],
) -> None:
"""Show information about a specific module.
:param context: Typer context.
:param module_identifier: Module to get info for.
"""
runner = get_runner(context)
module = runner.get_module_info(module_identifier)
console = Console()
if module is None:
console.print(f"[red]✗[/red] Module not found: {module_identifier}")
return
table = Table(title=f"Module: {module.identifier}")
table.add_column("Property", style="cyan")
table.add_column("Value")
table.add_row("Identifier", module.identifier)
table.add_row("Available", "Yes" if module.available else "No")
table.add_row("Description", module.description or "-")
table.add_row("Version", module.version or "-")
console.print(table)

View File

@@ -1,4 +1,4 @@
"""Project management commands for SecPipe CLI."""
"""Project management commands for FuzzForge CLI."""
from pathlib import Path
from typing import Annotated
@@ -7,7 +7,7 @@ from rich.console import Console
from rich.table import Table
from typer import Argument, Context, Option, Typer
from secpipe_cli.context import get_project_path, get_storage
from fuzzforge_cli.context import get_project_path, get_runner
application: Typer = Typer(
name="project",
@@ -16,7 +16,7 @@ application: Typer = Typer(
@application.command(
help="Initialize a new SecPipe project.",
help="Initialize a new FuzzForge project.",
name="init",
)
def init_project(
@@ -28,7 +28,7 @@ def init_project(
),
] = None,
) -> None:
"""Initialize a new SecPipe project.
"""Initialize a new FuzzForge project.
Creates the necessary storage directories for the project.
@@ -36,10 +36,10 @@ def init_project(
:param path: Path to initialize (defaults to current directory).
"""
storage = get_storage(context)
runner = get_runner(context)
project_path = path or get_project_path(context)
storage_path = storage.init_project(project_path)
storage_path = runner.init_project(project_path)
console = Console()
console.print(f"[green]✓[/green] Project initialized at {project_path}")
@@ -65,10 +65,10 @@ def set_assets(
:param assets_path: Path to assets.
"""
storage = get_storage(context)
runner = get_runner(context)
project_path = get_project_path(context)
stored_path = storage.set_project_assets(project_path, assets_path)
stored_path = runner.set_project_assets(project_path, assets_path)
console = Console()
console.print(f"[green]✓[/green] Assets stored from {assets_path}")
@@ -87,11 +87,11 @@ def show_info(
:param context: Typer context.
"""
storage = get_storage(context)
runner = get_runner(context)
project_path = get_project_path(context)
executions = storage.list_executions(project_path)
assets_path = storage.get_project_assets_path(project_path)
executions = runner.list_executions(project_path)
assets_path = runner.storage.get_project_assets_path(project_path)
console = Console()
table = Table(title=f"Project: {project_path.name}")
@@ -118,10 +118,10 @@ def list_executions(
:param context: Typer context.
"""
storage = get_storage(context)
runner = get_runner(context)
project_path = get_project_path(context)
executions = storage.list_executions(project_path)
executions = runner.list_executions(project_path)
console = Console()
@@ -134,7 +134,7 @@ def list_executions(
table.add_column("Has Results")
for exec_id in executions:
has_results = storage.get_execution_results(project_path, exec_id) is not None
has_results = runner.get_execution_results(project_path, exec_id) is not None
table.add_row(exec_id, "" if has_results else "-")
console.print(table)
@@ -168,10 +168,10 @@ def get_results(
:param extract_to: Optional directory to extract to.
"""
storage = get_storage(context)
runner = get_runner(context)
project_path = get_project_path(context)
results_path = storage.get_execution_results(project_path, execution_id)
results_path = runner.get_execution_results(project_path, execution_id)
console = Console()
@@ -182,5 +182,5 @@ def get_results(
console.print(f"[green]✓[/green] Results: {results_path}")
if extract_to:
extracted = storage.extract_results(results_path, extract_to)
extracted = runner.extract_results(results_path, extract_to)
console.print(f" Extracted to: {extracted}")

View File

@@ -1,39 +1,39 @@
"""SecPipe CLI context management."""
"""FuzzForge CLI context management."""
from __future__ import annotations
from pathlib import Path
from typing import TYPE_CHECKING, cast
from secpipe_mcp.storage import LocalStorage # type: ignore[import-untyped]
from fuzzforge_runner import Runner, Settings
if TYPE_CHECKING:
from typer import Context as TyperContext
class Context:
"""CLI context holding the storage instance and settings."""
"""CLI context holding the runner instance and settings."""
_storage: LocalStorage
_runner: Runner
_project_path: Path
def __init__(self, storage: LocalStorage, project_path: Path) -> None:
def __init__(self, runner: Runner, project_path: Path) -> None:
"""Initialize an instance of the class.
:param storage: SecPipe local storage instance.
:param runner: FuzzForge runner instance.
:param project_path: Path to the current project.
"""
self._storage = storage
self._runner = runner
self._project_path = project_path
def get_storage(self) -> LocalStorage:
"""Get the storage instance.
def get_runner(self) -> Runner:
"""Get the runner instance.
:return: LocalStorage instance.
:return: Runner instance.
"""
return self._storage
return self._runner
def get_project_path(self) -> Path:
"""Get the current project path.
@@ -44,14 +44,14 @@ class Context:
return self._project_path
def get_storage(context: TyperContext) -> LocalStorage:
"""Get storage from Typer context.
def get_runner(context: TyperContext) -> Runner:
"""Get runner from Typer context.
:param context: Typer context.
:return: LocalStorage instance.
:return: Runner instance.
"""
return cast("Context", context.obj).get_storage()
return cast("Context", context.obj).get_runner()
def get_project_path(context: TyperContext) -> Path:

View File

@@ -0,0 +1,3 @@
# FuzzForge Common
...

View File

@@ -1,7 +1,7 @@
[project]
name = "secpipe-common"
name = "fuzzforge-common"
version = "0.0.1"
description = "SecPipe's common types and utilities."
description = "FuzzForge's common types and utilities."
authors = []
readme = "README.md"
requires-python = ">=3.14"

View File

@@ -0,0 +1,20 @@
line-length = 120
[lint]
select = [ "ALL" ]
ignore = [
"COM812", # conflicts with the formatter
"D100", # ignoring missing docstrings in public modules
"D104", # ignoring missing docstrings in public packages
"D203", # conflicts with 'D211'
"D213", # conflicts with 'D212'
"TD002", # ignoring missing author in 'TODO' statements
"TD003", # ignoring missing issue link in 'TODO' statements
]
[lint.per-file-ignores]
"tests/*" = [
"ANN401", # allowing 'typing.Any' to be used to type function parameters in tests
"PLR2004", # allowing comparisons using unamed numerical constants in tests
"S101", # allowing 'assert' statements in tests
]

View File

@@ -0,0 +1,38 @@
"""FuzzForge Common - Shared abstractions and implementations for FuzzForge.
This package provides:
- Sandbox engine abstractions (Podman, Docker)
- Common exceptions
Example usage:
from fuzzforge_common import (
AbstractFuzzForgeSandboxEngine,
ImageInfo,
Podman,
PodmanConfiguration,
)
"""
from fuzzforge_common.exceptions import FuzzForgeError
from fuzzforge_common.sandboxes import (
AbstractFuzzForgeEngineConfiguration,
AbstractFuzzForgeSandboxEngine,
Docker,
DockerConfiguration,
FuzzForgeSandboxEngines,
ImageInfo,
Podman,
PodmanConfiguration,
)
__all__ = [
"AbstractFuzzForgeEngineConfiguration",
"AbstractFuzzForgeSandboxEngine",
"Docker",
"DockerConfiguration",
"FuzzForgeError",
"FuzzForgeSandboxEngines",
"ImageInfo",
"Podman",
"PodmanConfiguration",
]

View File

@@ -4,8 +4,8 @@ if TYPE_CHECKING:
from typing import Any
class SecPipeError(Exception):
"""Base exception for all SecPipe custom exceptions.
class FuzzForgeError(Exception):
"""Base exception for all FuzzForge custom exceptions.
All domain exceptions should inherit from this base to enable
consistent exception handling and hierarchy navigation.
@@ -13,7 +13,7 @@ class SecPipeError(Exception):
"""
def __init__(self, message: str, details: dict[str, Any] | None = None) -> None:
"""Initialize SecPipe error.
"""Initialize FuzzForge error.
:param message: Error message.
:param details: Optional error details dictionary.

View File

@@ -1,8 +1,8 @@
"""SecPipe Hub - Generic MCP server bridge.
"""FuzzForge Hub - Generic MCP server bridge.
This module provides a generic bridge to connect SecPipe with any MCP server.
This module provides a generic bridge to connect FuzzForge with any MCP server.
It allows AI agents to discover and execute tools from external MCP servers
(like mcp-security-hub) through the same interface as native SecPipe modules.
(like mcp-security-hub) through the same interface as native FuzzForge modules.
The hub is server-agnostic: it doesn't hardcode any specific tools or servers.
Instead, it dynamically discovers tools by connecting to configured MCP servers
@@ -15,9 +15,9 @@ Supported transport types:
"""
from secpipe_common.hub.client import HubClient, HubClientError, PersistentSession
from secpipe_common.hub.executor import HubExecutionResult, HubExecutor
from secpipe_common.hub.models import (
from fuzzforge_common.hub.client import HubClient, HubClientError
from fuzzforge_common.hub.executor import HubExecutionResult, HubExecutor
from fuzzforge_common.hub.models import (
HubConfig,
HubServer,
HubServerConfig,
@@ -25,7 +25,7 @@ from secpipe_common.hub.models import (
HubTool,
HubToolParameter,
)
from secpipe_common.hub.registry import HubRegistry
from fuzzforge_common.hub.registry import HubRegistry
__all__ = [
"HubClient",
@@ -39,5 +39,4 @@ __all__ = [
"HubServerType",
"HubTool",
"HubToolParameter",
"PersistentSession",
]

View File

@@ -6,7 +6,6 @@ via stdio (docker/command) or SSE transport. It handles:
- Connecting to SSE endpoints
- Discovering tools via list_tools()
- Executing tools via call_tool()
- Persistent container sessions for stateful interactions
"""
@@ -17,11 +16,9 @@ import json
import os
import subprocess
from contextlib import asynccontextmanager
from dataclasses import dataclass, field
from datetime import datetime, timezone
from typing import TYPE_CHECKING, Any, cast
from secpipe_common.hub.models import (
from fuzzforge_common.hub.models import (
HubServer,
HubServerConfig,
HubServerType,
@@ -50,48 +47,6 @@ class HubClientError(Exception):
"""Error in hub client operations."""
@dataclass
class PersistentSession:
"""A persistent container session with an active MCP connection.
Keeps a Docker container running between tool calls to allow
stateful interactions (e.g., radare2 analysis, long-running fuzzing).
"""
#: Server name this session belongs to.
server_name: str
#: Docker container name.
container_name: str
#: Underlying process (docker run).
process: Process
#: Stream reader (process stdout).
reader: asyncio.StreamReader
#: Stream writer (process stdin).
writer: asyncio.StreamWriter
#: Whether the MCP session has been initialized.
initialized: bool = False
#: Lock to serialise concurrent requests on the same session.
lock: asyncio.Lock = field(default_factory=asyncio.Lock)
#: When the session was started.
started_at: datetime = field(default_factory=lambda: datetime.now(tz=timezone.utc))
#: Monotonic counter for JSON-RPC request IDs.
request_id: int = 0
@property
def alive(self) -> bool:
"""Check if the underlying process is still running."""
return self.process.returncode is None
class HubClient:
"""Client for communicating with MCP hub servers.
@@ -110,8 +65,6 @@ class HubClient:
"""
self._timeout = timeout
self._persistent_sessions: dict[str, PersistentSession] = {}
self._request_id: int = 0
async def discover_tools(self, server: HubServer) -> list[HubTool]:
"""Discover tools from a hub server.
@@ -131,9 +84,8 @@ class HubClient:
try:
async with self._connect(config) as (reader, writer):
# Initialise MCP session (skip for persistent — already done)
if not self._persistent_sessions.get(config.name):
await self._initialize_session(reader, writer, config.name)
# Initialize MCP session
await self._initialize_session(reader, writer, config.name)
# List tools
tools_data = await self._call_method(
@@ -176,7 +128,6 @@ class HubClient:
arguments: dict[str, Any],
*,
timeout: int | None = None,
extra_volumes: list[str] | None = None,
) -> dict[str, Any]:
"""Execute a tool on a hub server.
@@ -184,14 +135,13 @@ class HubClient:
:param tool_name: Name of the tool to execute.
:param arguments: Tool arguments.
:param timeout: Execution timeout (uses default if None).
:param extra_volumes: Additional Docker volume mounts to inject.
:returns: Tool execution result.
:raises HubClientError: If execution fails.
"""
logger = get_logger()
config = server.config
exec_timeout = timeout or config.timeout or self._timeout
exec_timeout = timeout or self._timeout
logger.info(
"Executing hub tool",
@@ -201,10 +151,9 @@ class HubClient:
)
try:
async with self._connect(config, extra_volumes=extra_volumes) as (reader, writer):
# Initialise MCP session (skip for persistent — already done)
if not self._persistent_sessions.get(config.name):
await self._initialize_session(reader, writer, config.name)
async with self._connect(config) as (reader, writer):
# Initialize MCP session
await self._initialize_session(reader, writer, config.name)
# Call tool
result = await asyncio.wait_for(
@@ -250,29 +199,15 @@ class HubClient:
async def _connect(
self,
config: HubServerConfig,
extra_volumes: list[str] | None = None,
) -> AsyncGenerator[tuple[asyncio.StreamReader, asyncio.StreamWriter], None]:
"""Connect to an MCP server.
If a persistent session exists for this server, reuse it (with a lock
to serialise concurrent requests). Otherwise, fall through to the
ephemeral per-call connection logic.
:param config: Server configuration.
:param extra_volumes: Additional Docker volume mounts to inject.
:yields: Tuple of (reader, writer) for communication.
"""
# Check for active persistent session
session = self._persistent_sessions.get(config.name)
if session and session.initialized and session.alive:
async with session.lock:
yield session.reader, session.writer # type: ignore[misc]
return
# Ephemeral connection (original behaviour)
if config.type == HubServerType.DOCKER:
async with self._connect_docker(config, extra_volumes=extra_volumes) as streams:
async with self._connect_docker(config) as streams:
yield streams
elif config.type == HubServerType.COMMAND:
async with self._connect_command(config) as streams:
@@ -288,12 +223,10 @@ class HubClient:
async def _connect_docker(
self,
config: HubServerConfig,
extra_volumes: list[str] | None = None,
) -> AsyncGenerator[tuple[asyncio.StreamReader, asyncio.StreamWriter], None]:
"""Connect to a Docker-based MCP server.
:param config: Server configuration with image name.
:param extra_volumes: Additional volume mounts to inject (e.g. project assets).
:yields: Tuple of (reader, writer) for stdio communication.
"""
@@ -308,13 +241,9 @@ class HubClient:
for cap in config.capabilities:
cmd.extend(["--cap-add", cap])
# Add volumes from server config
# Add volumes
for volume in config.volumes:
cmd.extend(["-v", os.path.expanduser(volume)])
# Add extra volumes (e.g. project assets injected at runtime)
for volume in (extra_volumes or []):
cmd.extend(["-v", os.path.expanduser(volume)])
cmd.extend(["-v", os.path.expandvars(os.path.expanduser(volume))])
# Add environment variables
for key, value in config.environment.items():
@@ -322,15 +251,12 @@ class HubClient:
cmd.append(config.image)
# Use 4 MB buffer to handle large tool responses (YARA rulesets, trivy output, etc.)
_STREAM_LIMIT = 4 * 1024 * 1024
process: Process = await asyncio.create_subprocess_exec(
*cmd,
stdin=subprocess.PIPE,
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
limit=_STREAM_LIMIT,
limit=4 * 1024 * 1024, # 4 MB — default 64 KB breaks large YARA/capa results
)
try:
@@ -369,16 +295,13 @@ class HubClient:
# Set up environment
env = dict(config.environment) if config.environment else None
# Use 4 MB buffer to handle large tool responses
_STREAM_LIMIT = 4 * 1024 * 1024
process: Process = await asyncio.create_subprocess_exec(
*config.command,
stdin=subprocess.PIPE,
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
env=env,
limit=_STREAM_LIMIT,
limit=4 * 1024 * 1024, # 4 MB — default 64 KB breaks large tool results
)
try:
@@ -437,7 +360,7 @@ class HubClient:
"protocolVersion": "2024-11-05",
"capabilities": {},
"clientInfo": {
"name": "secpipe-hub",
"name": "fuzzforge-hub",
"version": "0.1.0",
},
},
@@ -464,11 +387,10 @@ class HubClient:
:returns: Method result.
"""
# Create JSON-RPC request with unique ID
self._request_id += 1
# Create JSON-RPC request
request = {
"jsonrpc": "2.0",
"id": self._request_id,
"id": 1,
"method": method,
"params": params,
}
@@ -495,16 +417,7 @@ class HubClient:
msg = f"MCP error: {error.get('message', 'Unknown error')}"
raise HubClientError(msg)
result = response.get("result", {})
# Check for tool-level errors in content items
for item in result.get("content", []):
if item.get("isError", False):
error_text = item.get("text", "unknown error")
msg = f"Tool returned error: {error_text}"
raise HubClientError(msg)
return result
return response.get("result", {})
async def _send_notification(
self,
@@ -531,223 +444,3 @@ class HubClient:
notification_line = json.dumps(notification) + "\n"
writer.write(notification_line.encode())
await writer.drain()
# ------------------------------------------------------------------
# Persistent session management
# ------------------------------------------------------------------
async def start_persistent_session(
self,
config: HubServerConfig,
extra_volumes: list[str] | None = None,
) -> PersistentSession:
"""Start a persistent Docker container and initialise MCP session.
The container stays running until :meth:`stop_persistent_session` is
called, allowing multiple tool calls on the same session.
:param config: Server configuration (must be Docker type).
:param extra_volumes: Additional host:container volume mounts to inject.
:returns: The created persistent session.
:raises HubClientError: If the container cannot be started.
"""
logger = get_logger()
if config.name in self._persistent_sessions:
session = self._persistent_sessions[config.name]
if session.alive:
logger.info("Persistent session already running", server=config.name)
return session
# Dead session — clean up and restart
await self._cleanup_session(config.name)
if config.type != HubServerType.DOCKER:
msg = f"Persistent mode only supports Docker servers (got {config.type.value})"
raise HubClientError(msg)
if not config.image:
msg = f"Docker image not specified for server '{config.name}'"
raise HubClientError(msg)
container_name = f"secpipe-{config.name}"
# Remove stale container with same name if it exists
try:
rm_proc = await asyncio.create_subprocess_exec(
"docker", "rm", "-f", container_name,
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
)
await rm_proc.wait()
except Exception:
pass
# Build docker run command (no --rm, with --name)
cmd = ["docker", "run", "-i", "--name", container_name]
for cap in config.capabilities:
cmd.extend(["--cap-add", cap])
for volume in config.volumes:
cmd.extend(["-v", os.path.expanduser(volume)])
for extra_vol in (extra_volumes or []):
cmd.extend(["-v", extra_vol])
for key, value in config.environment.items():
cmd.extend(["-e", f"{key}={value}"])
cmd.append(config.image)
_STREAM_LIMIT = 4 * 1024 * 1024
logger.info(
"Starting persistent container",
server=config.name,
container=container_name,
image=config.image,
)
process: Process = await asyncio.create_subprocess_exec(
*cmd,
stdin=subprocess.PIPE,
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
limit=_STREAM_LIMIT,
)
if process.stdin is None or process.stdout is None:
process.terminate()
msg = "Failed to get process streams"
raise HubClientError(msg)
session = PersistentSession(
server_name=config.name,
container_name=container_name,
process=process,
reader=process.stdout,
writer=process.stdin,
)
# Initialise MCP session
try:
await self._initialize_session(
session.reader, # type: ignore[arg-type]
session.writer, # type: ignore[arg-type]
config.name,
)
session.initialized = True
except Exception as e:
process.terminate()
try:
await asyncio.wait_for(process.wait(), timeout=5)
except asyncio.TimeoutError:
process.kill()
msg = f"Failed to initialise MCP session for {config.name}: {e}"
raise HubClientError(msg) from e
self._persistent_sessions[config.name] = session
logger.info(
"Persistent session started",
server=config.name,
container=container_name,
)
return session
async def stop_persistent_session(self, server_name: str) -> bool:
"""Stop a persistent container session.
:param server_name: Name of the server whose session to stop.
:returns: True if a session was stopped, False if none found.
"""
return await self._cleanup_session(server_name)
def get_persistent_session(self, server_name: str) -> PersistentSession | None:
"""Get a persistent session by server name.
:param server_name: Server name.
:returns: The session if running, None otherwise.
"""
session = self._persistent_sessions.get(server_name)
if session and not session.alive:
# Mark dead session — don't remove here to avoid async issues
return None
return session
def list_persistent_sessions(self) -> list[dict[str, Any]]:
"""List all persistent sessions with their status.
:returns: List of session info dictionaries.
"""
sessions = []
for name, session in self._persistent_sessions.items():
sessions.append({
"server_name": name,
"container_name": session.container_name,
"alive": session.alive,
"initialized": session.initialized,
"started_at": session.started_at.isoformat(),
"uptime_seconds": int(
(datetime.now(tz=timezone.utc) - session.started_at).total_seconds()
),
})
return sessions
async def stop_all_persistent_sessions(self) -> int:
"""Stop all persistent sessions.
:returns: Number of sessions stopped.
"""
names = list(self._persistent_sessions.keys())
count = 0
for name in names:
if await self._cleanup_session(name):
count += 1
return count
async def _cleanup_session(self, server_name: str) -> bool:
"""Clean up a persistent session (terminate process, remove container).
:param server_name: Server name.
:returns: True if cleaned up, False if not found.
"""
logger = get_logger()
session = self._persistent_sessions.pop(server_name, None)
if session is None:
return False
logger.info("Stopping persistent session", server=server_name)
# Terminate process
if session.alive:
session.process.terminate()
try:
await asyncio.wait_for(session.process.wait(), timeout=10)
except asyncio.TimeoutError:
session.process.kill()
await session.process.wait()
# Remove Docker container
try:
rm_proc = await asyncio.create_subprocess_exec(
"docker", "rm", "-f", session.container_name,
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
)
await rm_proc.wait()
except Exception:
pass
logger.info(
"Persistent session stopped",
server=server_name,
container=session.container_name,
)
return True

View File

@@ -0,0 +1,334 @@
"""Hub executor for managing MCP server lifecycle and tool execution.
This module provides a high-level interface for:
- Discovering tools from all registered hub servers
- Executing tools with proper error handling
- Managing the lifecycle of hub operations
"""
from __future__ import annotations
from pathlib import Path
from typing import TYPE_CHECKING, Any, cast
from fuzzforge_common.hub.client import HubClient, HubClientError
from fuzzforge_common.hub.models import HubServer, HubServerConfig, HubTool
from fuzzforge_common.hub.registry import HubRegistry
if TYPE_CHECKING:
from structlog.stdlib import BoundLogger
def get_logger() -> BoundLogger:
"""Get structlog logger instance.
:returns: Configured structlog logger.
"""
from structlog import get_logger # noqa: PLC0415
return cast("BoundLogger", get_logger())
class HubExecutionResult:
"""Result of a hub tool execution."""
def __init__(
self,
*,
success: bool,
server_name: str,
tool_name: str,
result: dict[str, Any] | None = None,
error: str | None = None,
) -> None:
"""Initialize execution result.
:param success: Whether execution succeeded.
:param server_name: Name of the hub server.
:param tool_name: Name of the executed tool.
:param result: Tool execution result data.
:param error: Error message if execution failed.
"""
self.success = success
self.server_name = server_name
self.tool_name = tool_name
self.result = result or {}
self.error = error
@property
def identifier(self) -> str:
"""Get full tool identifier."""
return f"hub:{self.server_name}:{self.tool_name}"
def to_dict(self) -> dict[str, Any]:
"""Convert to dictionary.
:returns: Dictionary representation.
"""
return {
"success": self.success,
"identifier": self.identifier,
"server": self.server_name,
"tool": self.tool_name,
"result": self.result,
"error": self.error,
}
class HubExecutor:
"""Executor for hub server operations.
Provides high-level methods for discovering and executing
tools from hub servers.
"""
#: Hub registry instance.
_registry: HubRegistry
#: MCP client instance.
_client: HubClient
def __init__(
self,
config_path: Path | None = None,
timeout: int = 300,
) -> None:
"""Initialize the hub executor.
:param config_path: Path to hub-servers.json config file.
:param timeout: Default timeout for tool execution.
"""
self._registry = HubRegistry(config_path)
self._client = HubClient(timeout=timeout)
@property
def registry(self) -> HubRegistry:
"""Get the hub registry.
:returns: Hub registry instance.
"""
return self._registry
def add_server(self, config: HubServerConfig) -> HubServer:
"""Add a server to the registry.
:param config: Server configuration.
:returns: Created HubServer instance.
"""
return self._registry.add_server(config)
async def discover_all_tools(self) -> dict[str, list[HubTool]]:
"""Discover tools from all enabled servers.
:returns: Dict mapping server names to lists of discovered tools.
"""
logger = get_logger()
results: dict[str, list[HubTool]] = {}
for server in self._registry.enabled_servers:
try:
tools = await self._client.discover_tools(server)
self._registry.update_server_tools(server.name, tools)
results[server.name] = tools
except HubClientError as e:
logger.warning(
"Failed to discover tools",
server=server.name,
error=str(e),
)
self._registry.update_server_tools(server.name, [], error=str(e))
results[server.name] = []
return results
async def discover_server_tools(self, server_name: str) -> list[HubTool]:
"""Discover tools from a specific server.
:param server_name: Name of the server.
:returns: List of discovered tools.
:raises ValueError: If server not found.
"""
server = self._registry.get_server(server_name)
if not server:
msg = f"Server '{server_name}' not found"
raise ValueError(msg)
try:
tools = await self._client.discover_tools(server)
self._registry.update_server_tools(server_name, tools)
return tools
except HubClientError as e:
self._registry.update_server_tools(server_name, [], error=str(e))
raise
async def execute_tool(
self,
identifier: str,
arguments: dict[str, Any] | None = None,
*,
timeout: int | None = None,
) -> HubExecutionResult:
"""Execute a hub tool.
:param identifier: Tool identifier (hub:server:tool or server:tool).
:param arguments: Tool arguments.
:param timeout: Execution timeout.
:returns: Execution result.
"""
logger = get_logger()
arguments = arguments or {}
# Parse identifier and find tool
server, tool = self._registry.find_tool(identifier)
if not server or not tool:
# Try to parse as server:tool and discover
parts = identifier.replace("hub:", "").split(":")
if len(parts) == 2: # noqa: PLR2004
server_name, tool_name = parts
server = self._registry.get_server(server_name)
if server and not server.discovered:
# Try to discover tools first
try:
await self.discover_server_tools(server_name)
tool = server.get_tool(tool_name)
except HubClientError:
pass
if server and not tool:
# Tool not found, but server exists - try to execute anyway
# The server might have the tool even if discovery failed
tool_name_to_use = tool_name
else:
tool_name_to_use = tool.name if tool else ""
if not server:
return HubExecutionResult(
success=False,
server_name=server_name,
tool_name=tool_name,
error=f"Server '{server_name}' not found",
)
# Execute even if tool wasn't discovered (server might still have it)
try:
result = await self._client.execute_tool(
server,
tool_name_to_use or tool_name,
arguments,
timeout=timeout,
)
return HubExecutionResult(
success=True,
server_name=server.name,
tool_name=tool_name_to_use or tool_name,
result=result,
)
except HubClientError as e:
return HubExecutionResult(
success=False,
server_name=server.name,
tool_name=tool_name_to_use or tool_name,
error=str(e),
)
else:
return HubExecutionResult(
success=False,
server_name="unknown",
tool_name=identifier,
error=f"Invalid tool identifier: {identifier}",
)
# Execute the tool
logger.info(
"Executing hub tool",
server=server.name,
tool=tool.name,
arguments=arguments,
)
try:
result = await self._client.execute_tool(
server,
tool.name,
arguments,
timeout=timeout,
)
return HubExecutionResult(
success=True,
server_name=server.name,
tool_name=tool.name,
result=result,
)
except HubClientError as e:
return HubExecutionResult(
success=False,
server_name=server.name,
tool_name=tool.name,
error=str(e),
)
def list_servers(self) -> list[dict[str, Any]]:
"""List all registered servers with their status.
:returns: List of server info dicts.
"""
servers = []
for server in self._registry.servers:
servers.append({
"name": server.name,
"identifier": server.identifier,
"type": server.config.type.value,
"enabled": server.config.enabled,
"category": server.config.category,
"description": server.config.description,
"discovered": server.discovered,
"tool_count": len(server.tools),
"error": server.discovery_error,
})
return servers
def list_tools(self) -> list[dict[str, Any]]:
"""List all discovered tools.
:returns: List of tool info dicts.
"""
tools = []
for tool in self._registry.get_all_tools():
tools.append({
"identifier": tool.identifier,
"name": tool.name,
"server": tool.server_name,
"description": tool.description,
"parameters": [p.model_dump() for p in tool.parameters],
})
return tools
def get_tool_schema(self, identifier: str) -> dict[str, Any] | None:
"""Get the JSON Schema for a tool's input.
:param identifier: Tool identifier.
:returns: JSON Schema dict or None if not found.
"""
_, tool = self._registry.find_tool(identifier)
if tool:
return tool.input_schema
return None

View File

@@ -1,4 +1,4 @@
"""Data models for SecPipe Hub.
"""Data models for FuzzForge Hub.
This module defines the Pydantic models used to represent MCP servers
and their tools in the hub registry.
@@ -92,18 +92,6 @@ class HubServerConfig(BaseModel):
description="Category for grouping servers",
)
#: Per-server timeout override in seconds (None = use default_timeout).
timeout: int | None = Field(
default=None,
description="Per-server execution timeout override in seconds",
)
#: Whether to use persistent container mode (keep container running between calls).
persistent: bool = Field(
default=False,
description="Keep container running between tool calls for stateful interactions",
)
class HubToolParameter(BaseModel):
"""A parameter for an MCP tool.
@@ -294,17 +282,3 @@ class HubConfig(BaseModel):
default=True,
description="Cache discovered tools",
)
#: Workflow hints indexed by "after:<tool_name>" keys.
#: Loaded inline or merged from workflow_hints_file.
workflow_hints: dict[str, Any] = Field(
default_factory=dict,
description="Workflow hints indexed by 'after:<tool_name>'",
)
#: Optional path to an external workflow-hints.json file.
#: Relative paths are resolved relative to the hub-config.json location.
workflow_hints_file: str | None = Field(
default=None,
description="Path to an external workflow-hints.json to load and merge",
)

View File

@@ -12,7 +12,7 @@ import json
from pathlib import Path
from typing import TYPE_CHECKING, cast
from secpipe_common.hub.models import (
from fuzzforge_common.hub.models import (
HubConfig,
HubServer,
HubServerConfig,
@@ -87,28 +87,6 @@ class HubRegistry:
config=server_config,
)
# Load and merge external workflow hints file if specified.
if self._config.workflow_hints_file:
hints_path = Path(self._config.workflow_hints_file)
if not hints_path.is_absolute():
hints_path = config_path.parent / hints_path
if hints_path.exists():
try:
with hints_path.open() as hf:
hints_data = json.load(hf)
self._config.workflow_hints.update(hints_data.get("hints", {}))
logger.info(
"Loaded workflow hints",
path=str(hints_path),
hints=len(self._config.workflow_hints),
)
except Exception as hints_err:
logger.warning(
"Failed to load workflow hints file",
path=str(hints_path),
error=str(hints_err),
)
logger.info(
"Loaded hub configuration",
path=str(config_path),
@@ -240,15 +218,6 @@ class HubRegistry:
server.discovery_error = None
server.tools = tools
def get_workflow_hint(self, tool_name: str) -> dict | None:
"""Get the workflow hint for a tool by name.
:param tool_name: Tool name (e.g. ``binwalk_extract``).
:returns: Hint dict for the ``after:<tool_name>`` key, or None.
"""
return self._config.workflow_hints.get(f"after:{tool_name}") or None
def get_all_tools(self) -> list:
"""Get all discovered tools from all servers.

View File

@@ -0,0 +1,23 @@
"""FuzzForge sandbox abstractions and implementations."""
from fuzzforge_common.sandboxes.engines import (
AbstractFuzzForgeEngineConfiguration,
AbstractFuzzForgeSandboxEngine,
Docker,
DockerConfiguration,
FuzzForgeSandboxEngines,
ImageInfo,
Podman,
PodmanConfiguration,
)
__all__ = [
"AbstractFuzzForgeEngineConfiguration",
"AbstractFuzzForgeSandboxEngine",
"Docker",
"DockerConfiguration",
"FuzzForgeSandboxEngines",
"ImageInfo",
"Podman",
"PodmanConfiguration",
]

View File

@@ -0,0 +1,21 @@
"""Container engine implementations for FuzzForge sandboxes."""
from fuzzforge_common.sandboxes.engines.base import (
AbstractFuzzForgeEngineConfiguration,
AbstractFuzzForgeSandboxEngine,
ImageInfo,
)
from fuzzforge_common.sandboxes.engines.docker import Docker, DockerConfiguration
from fuzzforge_common.sandboxes.engines.enumeration import FuzzForgeSandboxEngines
from fuzzforge_common.sandboxes.engines.podman import Podman, PodmanConfiguration
__all__ = [
"AbstractFuzzForgeEngineConfiguration",
"AbstractFuzzForgeSandboxEngine",
"Docker",
"DockerConfiguration",
"FuzzForgeSandboxEngines",
"ImageInfo",
"Podman",
"PodmanConfiguration",
]

View File

@@ -0,0 +1,15 @@
"""Base engine abstractions."""
from fuzzforge_common.sandboxes.engines.base.configuration import (
AbstractFuzzForgeEngineConfiguration,
)
from fuzzforge_common.sandboxes.engines.base.engine import (
AbstractFuzzForgeSandboxEngine,
ImageInfo,
)
__all__ = [
"AbstractFuzzForgeEngineConfiguration",
"AbstractFuzzForgeSandboxEngine",
"ImageInfo",
]

View File

@@ -0,0 +1,24 @@
from abc import ABC, abstractmethod
from typing import TYPE_CHECKING
from pydantic import BaseModel
from fuzzforge_common.sandboxes.engines.enumeration import (
FuzzForgeSandboxEngines, # noqa: TC001 (required by 'pydantic' at runtime)
)
if TYPE_CHECKING:
from fuzzforge_common.sandboxes.engines.base.engine import AbstractFuzzForgeSandboxEngine
class AbstractFuzzForgeEngineConfiguration(ABC, BaseModel):
"""TODO."""
#: TODO.
kind: FuzzForgeSandboxEngines
@abstractmethod
def into_engine(self) -> AbstractFuzzForgeSandboxEngine:
"""TODO."""
message: str = f"method 'into_engine' is not implemented for class '{self.__class__.__name__}'"
raise NotImplementedError(message)

View File

@@ -10,10 +10,10 @@ if TYPE_CHECKING:
class ImageInfo:
"""Information about a container image."""
#: Full image reference (e.g., "localhost/secpipe-module-echidna:latest").
#: Full image reference (e.g., "localhost/fuzzforge-module-echidna:latest").
reference: str
#: Repository name (e.g., "localhost/secpipe-module-echidna").
#: Repository name (e.g., "localhost/fuzzforge-module-echidna").
repository: str
#: Image tag (e.g., "latest").
@@ -29,8 +29,8 @@ class ImageInfo:
labels: dict[str, str] | None = None
class AbstractSecPipeSandboxEngine(ABC):
"""Abstract class used as a base for all SecPipe sandbox engine classes."""
class AbstractFuzzForgeSandboxEngine(ABC):
"""Abstract class used as a base for all FuzzForge sandbox engine classes."""
@abstractmethod
def list_images(self, filter_prefix: str | None = None) -> list[ImageInfo]:
@@ -140,7 +140,7 @@ class AbstractSecPipeSandboxEngine(ABC):
:param image: Full image reference to pull.
:param timeout: Timeout in seconds for the pull operation.
:raises SecPipeError: If pull fails.
:raises FuzzForgeError: If pull fails.
"""
message: str = f"method 'pull_image' is not implemented for class '{self.__class__.__name__}'"
@@ -306,7 +306,7 @@ class AbstractSecPipeSandboxEngine(ABC):
Creates a temporary container, copies the file, and removes the container.
:param image: Image reference (e.g., "secpipe-rust-analyzer:latest").
:param image: Image reference (e.g., "fuzzforge-rust-analyzer:latest").
:param path: Path to file inside image.
:returns: File contents as string.

View File

@@ -0,0 +1,13 @@
"""Docker container engine implementation."""
from fuzzforge_common.sandboxes.engines.docker.cli import DockerCLI
from fuzzforge_common.sandboxes.engines.docker.configuration import (
DockerConfiguration,
)
from fuzzforge_common.sandboxes.engines.docker.engine import Docker
__all__ = [
"Docker",
"DockerCLI",
"DockerConfiguration",
]

View File

@@ -13,8 +13,8 @@ from pathlib import Path, PurePath
from tempfile import NamedTemporaryFile
from typing import TYPE_CHECKING, cast
from secpipe_common.exceptions import SecPipeError
from secpipe_common.sandboxes.engines.base.engine import AbstractSecPipeSandboxEngine, ImageInfo
from fuzzforge_common.exceptions import FuzzForgeError
from fuzzforge_common.sandboxes.engines.base.engine import AbstractFuzzForgeSandboxEngine, ImageInfo
if TYPE_CHECKING:
from structlog.stdlib import BoundLogger
@@ -27,7 +27,7 @@ def get_logger() -> BoundLogger:
return cast("BoundLogger", get_logger())
class DockerCLI(AbstractSecPipeSandboxEngine):
class DockerCLI(AbstractFuzzForgeSandboxEngine):
"""Docker engine using CLI commands.
This implementation uses subprocess calls to the Docker CLI,
@@ -37,7 +37,7 @@ class DockerCLI(AbstractSecPipeSandboxEngine):
def __init__(self) -> None:
"""Initialize the DockerCLI engine."""
AbstractSecPipeSandboxEngine.__init__(self)
AbstractFuzzForgeSandboxEngine.__init__(self)
def _base_cmd(self) -> list[str]:
"""Get base Docker command.
@@ -147,7 +147,7 @@ class DockerCLI(AbstractSecPipeSandboxEngine):
get_logger().info("image pulled successfully", image=image)
except subprocess.CalledProcessError as exc:
message = f"Failed to pull image '{image}': {exc.stderr}"
raise SecPipeError(message) from exc
raise FuzzForgeError(message) from exc
def tag_image(self, source: str, target: str) -> None:
"""Tag an image with a new name.
@@ -440,7 +440,7 @@ class DockerCLI(AbstractSecPipeSandboxEngine):
Uses docker run with --entrypoint override to read the file via cat.
:param image: Image reference (e.g., "secpipe-rust-analyzer:latest").
:param image: Image reference (e.g., "fuzzforge-rust-analyzer:latest").
:param path: Path to file inside image.
:returns: File contents as string.

View File

@@ -0,0 +1,22 @@
from typing import TYPE_CHECKING, Literal
from fuzzforge_common.sandboxes.engines.base.configuration import AbstractFuzzForgeEngineConfiguration
from fuzzforge_common.sandboxes.engines.docker.engine import Docker
from fuzzforge_common.sandboxes.engines.enumeration import FuzzForgeSandboxEngines
if TYPE_CHECKING:
from fuzzforge_common.sandboxes.engines.base.engine import AbstractFuzzForgeSandboxEngine
class DockerConfiguration(AbstractFuzzForgeEngineConfiguration):
"""TODO."""
#: TODO.
kind: Literal[FuzzForgeSandboxEngines.DOCKER] = FuzzForgeSandboxEngines.DOCKER
#: TODO.
socket: str
def into_engine(self) -> AbstractFuzzForgeSandboxEngine:
"""TODO."""
return Docker(socket=self.socket)

View File

@@ -2,13 +2,13 @@ from __future__ import annotations
from typing import TYPE_CHECKING
from secpipe_common.sandboxes.engines.base.engine import AbstractSecPipeSandboxEngine, ImageInfo
from fuzzforge_common.sandboxes.engines.base.engine import AbstractFuzzForgeSandboxEngine, ImageInfo
if TYPE_CHECKING:
from pathlib import Path, PurePath
class Docker(AbstractSecPipeSandboxEngine):
class Docker(AbstractFuzzForgeSandboxEngine):
"""TODO."""
#: TODO.

View File

@@ -1,7 +1,7 @@
from enum import StrEnum
class SecPipeSandboxEngines(StrEnum):
class FuzzForgeSandboxEngines(StrEnum):
"""TODO."""
#: TODO.

View File

@@ -0,0 +1,13 @@
"""Podman container engine implementation."""
from fuzzforge_common.sandboxes.engines.podman.cli import PodmanCLI
from fuzzforge_common.sandboxes.engines.podman.configuration import (
PodmanConfiguration,
)
from fuzzforge_common.sandboxes.engines.podman.engine import Podman
__all__ = [
"Podman",
"PodmanCLI",
"PodmanConfiguration",
]

View File

@@ -15,8 +15,8 @@ from pathlib import Path, PurePath
from tempfile import NamedTemporaryFile
from typing import TYPE_CHECKING, cast
from secpipe_common.exceptions import SecPipeError
from secpipe_common.sandboxes.engines.base.engine import AbstractSecPipeSandboxEngine, ImageInfo
from fuzzforge_common.exceptions import FuzzForgeError
from fuzzforge_common.sandboxes.engines.base.engine import AbstractFuzzForgeSandboxEngine, ImageInfo
if TYPE_CHECKING:
from structlog.stdlib import BoundLogger
@@ -43,7 +43,7 @@ def _is_running_under_snap() -> bool:
return os.getenv("SNAP") is not None
class PodmanCLI(AbstractSecPipeSandboxEngine):
class PodmanCLI(AbstractFuzzForgeSandboxEngine):
"""Podman engine using CLI with custom storage paths.
This implementation uses subprocess calls to the Podman CLI with --root
@@ -71,7 +71,7 @@ class PodmanCLI(AbstractSecPipeSandboxEngine):
Custom storage is used when running under Snap AND paths are provided.
:raises SecPipeError: If running on macOS (Podman not supported).
:raises FuzzForgeError: If running on macOS (Podman not supported).
"""
import sys # noqa: PLC0415
@@ -81,9 +81,9 @@ class PodmanCLI(AbstractSecPipeSandboxEngine):
" brew install --cask docker\n"
" # Or download from https://docker.com/products/docker-desktop"
)
raise SecPipeError(msg)
raise FuzzForgeError(msg)
AbstractSecPipeSandboxEngine.__init__(self)
AbstractFuzzForgeSandboxEngine.__init__(self)
# Use custom storage only under Snap (to fix XDG_DATA_HOME issues)
self.__use_custom_storage = _is_running_under_snap() and graphroot is not None and runroot is not None
@@ -206,7 +206,7 @@ class PodmanCLI(AbstractSecPipeSandboxEngine):
get_logger().info("image pulled successfully", image=image)
except subprocess.CalledProcessError as exc:
message = f"Failed to pull image '{image}': {exc.stderr}"
raise SecPipeError(message) from exc
raise FuzzForgeError(message) from exc
def tag_image(self, source: str, target: str) -> None:
"""Tag an image with a new name.
@@ -501,7 +501,7 @@ class PodmanCLI(AbstractSecPipeSandboxEngine):
Uses podman run with --entrypoint override to read the file via cat.
:param image: Image reference (e.g., "secpipe-rust-analyzer:latest").
:param image: Image reference (e.g., "fuzzforge-rust-analyzer:latest").
:param path: Path to file inside image.
:returns: File contents as string.

View File

@@ -0,0 +1,22 @@
from typing import TYPE_CHECKING, Literal
from fuzzforge_common.sandboxes.engines.base.configuration import AbstractFuzzForgeEngineConfiguration
from fuzzforge_common.sandboxes.engines.enumeration import FuzzForgeSandboxEngines
from fuzzforge_common.sandboxes.engines.podman.engine import Podman
if TYPE_CHECKING:
from fuzzforge_common.sandboxes.engines.base.engine import AbstractFuzzForgeSandboxEngine
class PodmanConfiguration(AbstractFuzzForgeEngineConfiguration):
"""TODO."""
#: TODO.
kind: Literal[FuzzForgeSandboxEngines.PODMAN] = FuzzForgeSandboxEngines.PODMAN
#: TODO.
socket: str
def into_engine(self) -> AbstractFuzzForgeSandboxEngine:
"""TODO."""
return Podman(socket=self.socket)

View File

@@ -8,8 +8,8 @@ from typing import TYPE_CHECKING, cast
from podman.errors import ImageNotFound
from secpipe_common.exceptions import SecPipeError
from secpipe_common.sandboxes.engines.base.engine import AbstractSecPipeSandboxEngine, ImageInfo
from fuzzforge_common.exceptions import FuzzForgeError
from fuzzforge_common.sandboxes.engines.base.engine import AbstractFuzzForgeSandboxEngine, ImageInfo
if TYPE_CHECKING:
from podman import PodmanClient
@@ -24,7 +24,7 @@ def get_logger() -> BoundLogger:
return cast("BoundLogger", get_logger())
class Podman(AbstractSecPipeSandboxEngine):
class Podman(AbstractFuzzForgeSandboxEngine):
"""TODO."""
#: TODO.
@@ -36,7 +36,7 @@ class Podman(AbstractSecPipeSandboxEngine):
:param socket: TODO.
"""
AbstractSecPipeSandboxEngine.__init__(self)
AbstractFuzzForgeSandboxEngine.__init__(self)
self.__socket = socket
def get_client(self) -> PodmanClient:
@@ -99,7 +99,7 @@ class Podman(AbstractSecPipeSandboxEngine):
images = list(client.images.load(file_path=archive))
if len(images) != 1:
message: str = "expected only one image"
raise SecPipeError(message)
raise FuzzForgeError(message)
image = images[0]
image.tag(repository=repository, tag="latest")
@@ -254,7 +254,7 @@ class Podman(AbstractSecPipeSandboxEngine):
:param image: Full image reference to pull.
:param timeout: Timeout in seconds for the pull operation.
:raises SecPipeError: If pull fails.
:raises FuzzForgeError: If pull fails.
"""
client: PodmanClient = self.get_client()
@@ -265,7 +265,7 @@ class Podman(AbstractSecPipeSandboxEngine):
get_logger().info("image pulled successfully", image=image)
except Exception as exc:
message = f"Failed to pull image '{image}': {exc}"
raise SecPipeError(message) from exc
raise FuzzForgeError(message) from exc
def tag_image(self, source: str, target: str) -> None:
"""Tag an image with a new name.
@@ -524,7 +524,7 @@ class Podman(AbstractSecPipeSandboxEngine):
Creates a temporary container, reads the file, and removes the container.
:param image: Image reference (e.g., "secpipe-rust-analyzer:latest").
:param image: Image reference (e.g., "fuzzforge-rust-analyzer:latest").
:param path: Path to file inside image.
:returns: File contents as string.

View File

@@ -0,0 +1 @@
pytest_plugins = ["fuzzforge_tests.fixtures"]

View File

@@ -4,7 +4,7 @@ from unittest import mock
import pytest
from secpipe_common.sandboxes.engines.docker.cli import DockerCLI
from fuzzforge_common.sandboxes.engines.docker.cli import DockerCLI
def test_docker_cli_base_cmd() -> None:

View File

@@ -9,8 +9,8 @@ from unittest import mock
import pytest
from secpipe_common.exceptions import SecPipeError
from secpipe_common.sandboxes.engines.podman.cli import PodmanCLI, _is_running_under_snap
from fuzzforge_common.exceptions import FuzzForgeError
from fuzzforge_common.sandboxes.engines.podman.cli import PodmanCLI, _is_running_under_snap
# Helper to mock Linux platform for testing (since Podman is Linux-only)
@@ -60,7 +60,7 @@ def test_snap_detection_when_snap_not_set() -> None:
def test_podman_cli_blocks_macos() -> None:
"""Test that PodmanCLI raises error on macOS."""
with mock.patch.object(sys, "platform", "darwin"):
with pytest.raises(SecPipeError) as exc_info:
with pytest.raises(FuzzForgeError) as exc_info:
PodmanCLI()
assert "Podman is not supported on macOS" in str(exc_info.value)
assert "Docker" in str(exc_info.value)

View File

@@ -8,4 +8,4 @@ WORKDIR /app
RUN /bin/uv venv && /bin/uv pip install --find-links /wheels $PACKAGE
CMD [ "/bin/uv", "run", "uvicorn", "secpipe_mcp.application:app"]
CMD [ "/bin/uv", "run", "uvicorn", "fuzzforge_mcp.application:app"]

View File

@@ -1,10 +1,10 @@
# SecPipe MCP
# FuzzForge MCP
Model Context Protocol (MCP) server that enables AI agents to orchestrate SecPipe security research modules.
Model Context Protocol (MCP) server that enables AI agents to orchestrate FuzzForge security research modules.
## Overview
SecPipe MCP provides a standardized interface for AI agents (Claude Code, GitHub Copilot, Claude Desktop) to:
FuzzForge MCP provides a standardized interface for AI agents (Claude Code, GitHub Copilot, Claude Desktop) to:
- List and discover available security modules
- Execute modules in isolated containers
@@ -17,20 +17,20 @@ The server communicates with AI agents using the [Model Context Protocol](https:
### Automatic Installation (Recommended)
Use the SecPipe CLI to automatically configure MCP for your AI agent:
Use the FuzzForge CLI to automatically configure MCP for your AI agent:
```bash
# For GitHub Copilot
uv run secpipe mcp install copilot
uv run fuzzforge mcp install copilot
# For Claude Code (VS Code extension)
uv run secpipe mcp install claude-code
uv run fuzzforge mcp install claude-code
# For Claude Desktop (standalone app)
uv run secpipe mcp install claude-desktop
uv run fuzzforge mcp install claude-desktop
# Verify installation
uv run secpipe mcp status
uv run fuzzforge mcp status
```
After installation, restart your AI agent to activate the connection.
@@ -44,13 +44,13 @@ For custom setups, you can manually configure the MCP server.
```json
{
"mcpServers": {
"secpipe": {
"command": "/path/to/secpipe_ai/.venv/bin/python",
"args": ["-m", "secpipe_mcp"],
"cwd": "/path/to/secpipe_ai",
"fuzzforge": {
"command": "/path/to/fuzzforge_ai/.venv/bin/python",
"args": ["-m", "fuzzforge_mcp"],
"cwd": "/path/to/fuzzforge_ai",
"env": {
"SECPIPE_MODULES_PATH": "/path/to/secpipe_ai/secpipe-modules",
"SECPIPE_ENGINE__TYPE": "docker"
"FUZZFORGE_MODULES_PATH": "/path/to/fuzzforge_ai/fuzzforge-modules",
"FUZZFORGE_ENGINE__TYPE": "docker"
}
}
}
@@ -62,14 +62,14 @@ For custom setups, you can manually configure the MCP server.
```json
{
"servers": {
"secpipe": {
"fuzzforge": {
"type": "stdio",
"command": "/path/to/secpipe_ai/.venv/bin/python",
"args": ["-m", "secpipe_mcp"],
"cwd": "/path/to/secpipe_ai",
"command": "/path/to/fuzzforge_ai/.venv/bin/python",
"args": ["-m", "fuzzforge_mcp"],
"cwd": "/path/to/fuzzforge_ai",
"env": {
"SECPIPE_MODULES_PATH": "/path/to/secpipe_ai/secpipe-modules",
"SECPIPE_ENGINE__TYPE": "docker"
"FUZZFORGE_MODULES_PATH": "/path/to/fuzzforge_ai/fuzzforge-modules",
"FUZZFORGE_ENGINE__TYPE": "docker"
}
}
}
@@ -81,14 +81,14 @@ For custom setups, you can manually configure the MCP server.
```json
{
"mcpServers": {
"secpipe": {
"fuzzforge": {
"type": "stdio",
"command": "/path/to/secpipe_ai/.venv/bin/python",
"args": ["-m", "secpipe_mcp"],
"cwd": "/path/to/secpipe_ai",
"command": "/path/to/fuzzforge_ai/.venv/bin/python",
"args": ["-m", "fuzzforge_mcp"],
"cwd": "/path/to/fuzzforge_ai",
"env": {
"SECPIPE_MODULES_PATH": "/path/to/secpipe_ai/secpipe-modules",
"SECPIPE_ENGINE__TYPE": "docker"
"FUZZFORGE_MODULES_PATH": "/path/to/fuzzforge_ai/fuzzforge-modules",
"FUZZFORGE_ENGINE__TYPE": "docker"
}
}
}
@@ -99,10 +99,10 @@ For custom setups, you can manually configure the MCP server.
| Variable | Required | Default | Description |
| -------- | -------- | ------- | ----------- |
| `SECPIPE_MODULES_PATH` | Yes | - | Path to the modules directory |
| `SECPIPE_ENGINE__TYPE` | No | `docker` | Container engine (`docker` or `podman`) |
| `SECPIPE_ENGINE__GRAPHROOT` | No | - | Container storage path (Podman under Snap only) |
| `SECPIPE_ENGINE__RUNROOT` | No | - | Container runtime state path (Podman under Snap only) |
| `FUZZFORGE_MODULES_PATH` | Yes | - | Path to the modules directory |
| `FUZZFORGE_ENGINE__TYPE` | No | `docker` | Container engine (`docker` or `podman`) |
| `FUZZFORGE_ENGINE__GRAPHROOT` | No | - | Container storage path (Podman under Snap only) |
| `FUZZFORGE_ENGINE__RUNROOT` | No | - | Container runtime state path (Podman under Snap only) |
## Available Tools
@@ -110,7 +110,7 @@ The MCP server exposes the following tools to AI agents:
### Project Management
- **`init_project`** - Initialize a new SecPipe project
- **`init_project`** - Initialize a new FuzzForge project
- **`set_project_assets`** - Set initial assets (source code, contracts, etc.) for the project
### Module Management
@@ -135,7 +135,7 @@ The server also provides resources for accessing:
### From AI Agent (e.g., Claude Code)
Once configured, AI agents can interact with SecPipe naturally:
Once configured, AI agents can interact with FuzzForge naturally:
```text
User: List the available security modules
@@ -161,10 +161,10 @@ For testing during development, you can run the MCP server directly:
```bash
# Run MCP server in stdio mode (for AI agents)
uv run python -m secpipe_mcp
uv run python -m fuzzforge_mcp
# Run HTTP server for testing (not for production)
uv run uvicorn secpipe_mcp.application:app --reload
uv run uvicorn fuzzforge_mcp.application:app --reload
```
## Architecture
@@ -178,14 +178,14 @@ uv run uvicorn secpipe_mcp.application:app --reload
│ stdio/JSON-RPC
┌─────────────────────────────────────────┐
SecPipe MCP Server
FuzzForge MCP Server │
│ Tools: init_project, list_modules, │
│ execute_module, execute_workflow│
└─────────────────────────────────────────┘
┌─────────────────────────────────────────┐
SecPipe Runner
FuzzForge Runner │
│ Podman/Docker Orchestration │
└─────────────────────────────────────────┘
@@ -212,6 +212,6 @@ uv run pytest
## See Also
- [SecPipe Main README](../README.md) - Overall project documentation
- [Module SDK](../secpipe-modules/secpipe-modules-sdk/README.md) - Creating custom modules
- [FuzzForge Main README](../README.md) - Overall project documentation
- [Module SDK](../fuzzforge-modules/fuzzforge-modules-sdk/README.md) - Creating custom modules
- [Model Context Protocol](https://modelcontextprotocol.io/) - MCP specification

View File

@@ -1,21 +1,21 @@
[project]
name = "secpipe-mcp"
name = "fuzzforge-mcp"
version = "0.0.1"
description = "SecPipe MCP Server - AI agent gateway for SecPipe AI."
description = "FuzzForge MCP Server - AI agent gateway for FuzzForge AI."
authors = []
readme = "README.md"
requires-python = ">=3.14"
dependencies = [
"fastmcp==2.14.1",
"secpipe-common==0.0.1",
"fuzzforge-common==0.0.1",
"fuzzforge-runner==0.0.1",
"pydantic==2.12.4",
"pydantic-settings==2.12.0",
"pyyaml>=6.0",
"structlog==25.5.0",
]
[project.scripts]
secpipe-mcp = "secpipe_mcp.__main__:main"
fuzzforge-mcp = "fuzzforge_mcp.__main__:main"
[project.optional-dependencies]
lints = [
@@ -24,12 +24,13 @@ lints = [
"ruff==0.14.4",
]
tests = [
"secpipe-tests==0.0.1",
"fuzzforge-tests==0.0.1",
"pytest==9.0.2",
"pytest-asyncio==1.3.0",
"pytest-httpx==0.36.0",
]
[tool.uv.sources]
secpipe-common = { workspace = true }
secpipe-tests = { workspace = true }
fuzzforge-common = { workspace = true }
fuzzforge-runner = { workspace = true }
fuzzforge-tests = { workspace = true }

16
fuzzforge-mcp/ruff.toml Normal file
View File

@@ -0,0 +1,16 @@
line-length = 120
[lint]
select = [ "ALL" ]
ignore = [
"COM812", # conflicts with the formatter
"D203", # conflicts with 'D211'
"D213", # conflicts with 'D212'
]
[lint.per-file-ignores]
"tests/*" = [
"PLR0913", # allowing functions with many arguments in tests (required for fixtures)
"PLR2004", # allowing comparisons using unamed numerical constants in tests
"S101", # allowing 'assert' statements in tests
]

View File

@@ -1,10 +1,10 @@
"""SecPipe MCP Server entry point."""
"""FuzzForge MCP Server entry point."""
from secpipe_mcp.application import mcp
from fuzzforge_mcp.application import mcp
def main() -> None:
"""Run the SecPipe MCP server in stdio mode.
"""Run the FuzzForge MCP server in stdio mode.
This is the primary entry point for AI agent integration.
The server communicates via stdin/stdout using the MCP protocol.

View File

@@ -0,0 +1,70 @@
"""FuzzForge MCP Server Application.
This is the main entry point for the FuzzForge MCP server, providing
AI agents with tools to execute security research modules.
"""
from contextlib import asynccontextmanager
from typing import TYPE_CHECKING
from fastmcp import FastMCP
from fastmcp.server.middleware.error_handling import ErrorHandlingMiddleware
from fuzzforge_mcp import resources, tools
from fuzzforge_runner import Settings
if TYPE_CHECKING:
from collections.abc import AsyncGenerator
@asynccontextmanager
async def lifespan(_: FastMCP) -> AsyncGenerator[Settings]:
"""Initialize MCP server lifespan context.
Loads settings from environment variables and makes them
available to all tools and resources.
:param mcp: FastMCP server instance (unused).
:return: Settings instance for dependency injection.
"""
settings: Settings = Settings()
yield settings
mcp: FastMCP = FastMCP(
name="FuzzForge MCP Server",
instructions="""
FuzzForge is a security research orchestration platform. Use these tools to:
1. **List modules**: Discover available security research modules
2. **Execute modules**: Run modules in isolated containers
3. **Execute workflows**: Chain multiple modules together
4. **Manage projects**: Initialize and configure projects
5. **Get results**: Retrieve execution results
6. **Hub tools**: Discover and execute tools from external MCP servers
Typical workflow:
1. Initialize a project with `init_project`
2. Set project assets with `set_project_assets` (optional, only needed once for the source directory)
3. List available modules with `list_modules`
4. Execute a module with `execute_module` — use `assets_path` param to pass different inputs per module
5. Read outputs from `results_path` returned by `execute_module` — check module's `output_artifacts` metadata for filenames
Hub workflow:
1. List available hub servers with `list_hub_servers`
2. Discover tools from servers with `discover_hub_tools`
3. Execute hub tools with `execute_hub_tool`
""",
lifespan=lifespan,
)
mcp.add_middleware(middleware=ErrorHandlingMiddleware())
mcp.mount(resources.mcp)
mcp.mount(tools.mcp)
# HTTP app for testing (primary mode is stdio)
app = mcp.http_app()

View File

@@ -1,15 +1,14 @@
"""Dependency injection helpers for SecPipe MCP."""
"""Dependency injection helpers for FuzzForge MCP."""
from __future__ import annotations
from pathlib import Path
from typing import TYPE_CHECKING, Any, cast
from typing import TYPE_CHECKING, cast
from fastmcp.server.dependencies import get_context
from fuzzforge_runner import Runner, Settings
from secpipe_mcp.exceptions import SecPipeMCPError
from secpipe_mcp.settings import Settings
from secpipe_mcp.storage import LocalStorage
from fuzzforge_mcp.exceptions import FuzzForgeMCPError
if TYPE_CHECKING:
from fastmcp import Context
@@ -18,12 +17,6 @@ if TYPE_CHECKING:
# Track the current active project path (set by init_project)
_current_project_path: Path | None = None
# Singleton storage instance
_storage: LocalStorage | None = None
# Currently loaded skill pack (set by load_skill)
_active_skill: dict[str, Any] | None = None
def set_current_project_path(project_path: Path) -> None:
"""Set the current project path.
@@ -41,13 +34,13 @@ def get_settings() -> Settings:
"""Get MCP server settings from context.
:return: Settings instance.
:raises SecPipeMCPError: If settings not available.
:raises FuzzForgeMCPError: If settings not available.
"""
context: Context = get_context()
if context.request_context is None:
message: str = "Request context not available"
raise SecPipeMCPError(message)
raise FuzzForgeMCPError(message)
return cast("Settings", context.request_context.lifespan_context)
@@ -67,33 +60,11 @@ def get_project_path() -> Path:
return Path.cwd()
def get_storage() -> LocalStorage:
"""Get the storage backend instance.
def get_runner() -> Runner:
"""Get a configured Runner instance.
:return: LocalStorage instance.
:return: Runner instance configured from MCP settings.
"""
global _storage
if _storage is None:
settings = get_settings()
_storage = LocalStorage(settings.storage.path)
return _storage
def set_active_skill(skill: dict[str, Any] | None) -> None:
"""Set (or clear) the currently loaded skill pack.
:param skill: Parsed skill dict, or None to unload.
"""
global _active_skill
_active_skill = skill
def get_active_skill() -> dict[str, Any] | None:
"""Get the currently loaded skill pack.
:return: Active skill dict, or None if no skill is loaded.
"""
return _active_skill
settings: Settings = get_settings()
return Runner(settings)

View File

@@ -0,0 +1,5 @@
"""TODO."""
class FuzzForgeMCPError(Exception):
"""TODO."""

View File

@@ -0,0 +1,16 @@
"""FuzzForge MCP Resources."""
from fastmcp import FastMCP
from fuzzforge_mcp.resources import executions, modules, project, workflows
mcp: FastMCP = FastMCP()
mcp.mount(executions.mcp)
mcp.mount(modules.mcp)
mcp.mount(project.mcp)
mcp.mount(workflows.mcp)
__all__ = [
"mcp",
]

View File

@@ -1,19 +1,23 @@
"""Execution resources for SecPipe MCP."""
"""Execution resources for FuzzForge MCP."""
from __future__ import annotations
from pathlib import Path
from typing import Any
from typing import TYPE_CHECKING, Any
from fastmcp import FastMCP
from fastmcp.exceptions import ResourceError
from secpipe_mcp.dependencies import get_project_path, get_storage
from fuzzforge_mcp.dependencies import get_project_path, get_runner
if TYPE_CHECKING:
from fuzzforge_runner import Runner
mcp: FastMCP = FastMCP()
@mcp.resource("secpipe://executions/")
@mcp.resource("fuzzforge://executions/")
async def list_executions() -> list[dict[str, Any]]:
"""List all executions for the current project.
@@ -22,18 +26,18 @@ async def list_executions() -> list[dict[str, Any]]:
:return: List of execution information dictionaries.
"""
storage = get_storage()
runner: Runner = get_runner()
project_path: Path = get_project_path()
try:
execution_ids = storage.list_executions(project_path)
execution_ids = runner.list_executions(project_path)
return [
{
"execution_id": entry["execution_id"],
"has_results": storage.get_execution_results(project_path, entry["execution_id"]) is not None,
"execution_id": exec_id,
"has_results": runner.get_execution_results(project_path, exec_id) is not None,
}
for entry in execution_ids
for exec_id in execution_ids
]
except Exception as exception:
@@ -41,7 +45,7 @@ async def list_executions() -> list[dict[str, Any]]:
raise ResourceError(message) from exception
@mcp.resource("secpipe://executions/{execution_id}")
@mcp.resource("fuzzforge://executions/{execution_id}")
async def get_execution(execution_id: str) -> dict[str, Any]:
"""Get information about a specific execution.
@@ -49,11 +53,11 @@ async def get_execution(execution_id: str) -> dict[str, Any]:
:return: Execution information dictionary.
"""
storage = get_storage()
runner: Runner = get_runner()
project_path: Path = get_project_path()
try:
results_path = storage.get_execution_results(project_path, execution_id)
results_path = runner.get_execution_results(project_path, execution_id)
if results_path is None:
raise ResourceError(f"Execution not found: {execution_id}")

View File

@@ -0,0 +1,78 @@
"""Module resources for FuzzForge MCP."""
from __future__ import annotations
from typing import TYPE_CHECKING, Any
from fastmcp import FastMCP
from fastmcp.exceptions import ResourceError
from fuzzforge_mcp.dependencies import get_runner
if TYPE_CHECKING:
from fuzzforge_runner import Runner
from fuzzforge_runner.runner import ModuleInfo
mcp: FastMCP = FastMCP()
@mcp.resource("fuzzforge://modules/")
async def list_modules() -> list[dict[str, Any]]:
"""List all available FuzzForge modules.
Returns information about modules that can be executed,
including their identifiers and availability status.
:return: List of module information dictionaries.
"""
runner: Runner = get_runner()
try:
modules: list[ModuleInfo] = runner.list_modules()
return [
{
"identifier": module.identifier,
"description": module.description,
"version": module.version,
"available": module.available,
}
for module in modules
]
except Exception as exception:
message: str = f"Failed to list modules: {exception}"
raise ResourceError(message) from exception
@mcp.resource("fuzzforge://modules/{module_identifier}")
async def get_module(module_identifier: str) -> dict[str, Any]:
"""Get information about a specific module.
:param module_identifier: The identifier of the module to retrieve.
:return: Module information dictionary.
"""
runner: Runner = get_runner()
try:
module: ModuleInfo | None = runner.get_module_info(module_identifier)
if module is None:
raise ResourceError(f"Module not found: {module_identifier}")
return {
"identifier": module.identifier,
"description": module.description,
"version": module.version,
"available": module.available,
}
except ResourceError:
raise
except Exception as exception:
message: str = f"Failed to get module: {exception}"
raise ResourceError(message) from exception

View File

@@ -1,19 +1,23 @@
"""Project resources for SecPipe MCP."""
"""Project resources for FuzzForge MCP."""
from __future__ import annotations
from pathlib import Path
from typing import Any
from typing import TYPE_CHECKING, Any
from fastmcp import FastMCP
from fastmcp.exceptions import ResourceError
from secpipe_mcp.dependencies import get_project_path, get_settings, get_storage
from fuzzforge_mcp.dependencies import get_project_path, get_runner
if TYPE_CHECKING:
from fuzzforge_runner import Runner
mcp: FastMCP = FastMCP()
@mcp.resource("secpipe://project")
@mcp.resource("fuzzforge://project")
async def get_project() -> dict[str, Any]:
"""Get information about the current project.
@@ -23,12 +27,12 @@ async def get_project() -> dict[str, Any]:
:return: Project information dictionary.
"""
storage = get_storage()
runner: Runner = get_runner()
project_path: Path = get_project_path()
try:
executions = storage.list_executions(project_path)
assets_path = storage.get_project_assets_path(project_path)
executions = runner.list_executions(project_path)
assets_path = runner.storage.get_project_assets_path(project_path)
return {
"path": str(project_path),
@@ -36,7 +40,7 @@ async def get_project() -> dict[str, Any]:
"has_assets": assets_path is not None,
"assets_path": str(assets_path) if assets_path else None,
"execution_count": len(executions),
"recent_executions": executions[:10],
"recent_executions": executions[:10], # Last 10 executions
}
except Exception as exception:
@@ -44,16 +48,18 @@ async def get_project() -> dict[str, Any]:
raise ResourceError(message) from exception
@mcp.resource("secpipe://project/settings")
@mcp.resource("fuzzforge://project/settings")
async def get_project_settings() -> dict[str, Any]:
"""Get current SecPipe settings.
"""Get current FuzzForge settings.
Returns the active configuration for the MCP server including
engine, storage, and hub settings.
engine, storage, and project settings.
:return: Settings dictionary.
"""
from fuzzforge_mcp.dependencies import get_settings
try:
settings = get_settings()
@@ -65,10 +71,9 @@ async def get_project_settings() -> dict[str, Any]:
"storage": {
"path": str(settings.storage.path),
},
"hub": {
"enabled": settings.hub.enabled,
"config_path": str(settings.hub.config_path),
"timeout": settings.hub.timeout,
"project": {
"path": str(settings.project.path),
"modules_path": str(settings.modules_path),
},
"debug": settings.debug,
}

View File

@@ -0,0 +1,53 @@
"""Workflow resources for FuzzForge MCP.
Note: In FuzzForge AI, workflows are defined at runtime rather than
stored. This resource provides documentation about workflow capabilities.
"""
from __future__ import annotations
from typing import Any
from fastmcp import FastMCP
mcp: FastMCP = FastMCP()
@mcp.resource("fuzzforge://workflows/help")
async def get_workflow_help() -> dict[str, Any]:
"""Get help information about creating workflows.
Workflows in FuzzForge AI are defined at execution time rather
than stored. Use the execute_workflow tool with step definitions.
:return: Workflow documentation.
"""
return {
"description": "Workflows chain multiple modules together",
"usage": "Use the execute_workflow tool with step definitions",
"example": {
"workflow_name": "security-audit",
"steps": [
{
"module": "compile-contracts",
"configuration": {"solc_version": "0.8.0"},
},
{
"module": "slither",
"configuration": {},
},
{
"module": "echidna",
"configuration": {"test_limit": 10000},
},
],
},
"step_format": {
"module": "Module identifier (required)",
"configuration": "Module-specific configuration (optional)",
"name": "Step name for logging (optional)",
},
}

View File

@@ -0,0 +1,17 @@
"""FuzzForge MCP Tools."""
from fastmcp import FastMCP
from fuzzforge_mcp.tools import hub, modules, projects, workflows
mcp: FastMCP = FastMCP()
mcp.mount(modules.mcp)
mcp.mount(projects.mcp)
mcp.mount(workflows.mcp)
mcp.mount(hub.mcp)
__all__ = [
"mcp",
]

View File

@@ -0,0 +1,318 @@
"""MCP Hub tools for FuzzForge MCP server.
This module provides tools for interacting with external MCP servers
through the FuzzForge hub. AI agents can:
- List available hub servers and their tools
- Discover tools from hub servers
- Execute hub tools
"""
from __future__ import annotations
from pathlib import Path
from typing import Any
from fastmcp import FastMCP
from fastmcp.exceptions import ToolError
from fuzzforge_common.hub import HubExecutor, HubServerConfig, HubServerType
from fuzzforge_mcp.dependencies import get_settings
mcp: FastMCP = FastMCP()
# Global hub executor instance (lazy initialization)
_hub_executor: HubExecutor | None = None
def _get_hub_executor() -> HubExecutor:
"""Get or create the hub executor instance.
:returns: Hub executor instance.
:raises ToolError: If hub is disabled.
"""
global _hub_executor
settings = get_settings()
if not settings.hub.enabled:
msg = "MCP Hub is disabled. Enable it via FUZZFORGE_HUB__ENABLED=true"
raise ToolError(msg)
if _hub_executor is None:
config_path = settings.hub.config_path
_hub_executor = HubExecutor(
config_path=config_path,
timeout=settings.hub.timeout,
)
return _hub_executor
@mcp.tool
async def list_hub_servers() -> dict[str, Any]:
"""List all registered MCP hub servers.
Returns information about configured hub servers, including
their connection type, status, and discovered tool count.
:return: Dictionary with list of hub servers.
"""
try:
executor = _get_hub_executor()
servers = executor.list_servers()
return {
"servers": servers,
"count": len(servers),
"enabled_count": len([s for s in servers if s["enabled"]]),
}
except Exception as e:
if isinstance(e, ToolError):
raise
msg = f"Failed to list hub servers: {e}"
raise ToolError(msg) from e
@mcp.tool
async def discover_hub_tools(server_name: str | None = None) -> dict[str, Any]:
"""Discover tools from hub servers.
Connects to hub servers and retrieves their available tools.
If server_name is provided, only discovers from that server.
Otherwise discovers from all enabled servers.
:param server_name: Optional specific server to discover from.
:return: Dictionary with discovered tools.
"""
try:
executor = _get_hub_executor()
if server_name:
tools = await executor.discover_server_tools(server_name)
return {
"server": server_name,
"tools": [
{
"identifier": t.identifier,
"name": t.name,
"description": t.description,
"parameters": [p.model_dump() for p in t.parameters],
}
for t in tools
],
"count": len(tools),
}
else:
results = await executor.discover_all_tools()
all_tools = []
for server, tools in results.items():
for tool in tools:
all_tools.append({
"identifier": tool.identifier,
"name": tool.name,
"server": server,
"description": tool.description,
"parameters": [p.model_dump() for p in tool.parameters],
})
return {
"servers_discovered": len(results),
"tools": all_tools,
"count": len(all_tools),
}
except Exception as e:
if isinstance(e, ToolError):
raise
msg = f"Failed to discover hub tools: {e}"
raise ToolError(msg) from e
@mcp.tool
async def list_hub_tools() -> dict[str, Any]:
"""List all discovered hub tools.
Returns tools that have been previously discovered from hub servers.
Run discover_hub_tools first if no tools are listed.
:return: Dictionary with list of discovered tools.
"""
try:
executor = _get_hub_executor()
tools = executor.list_tools()
return {
"tools": tools,
"count": len(tools),
}
except Exception as e:
if isinstance(e, ToolError):
raise
msg = f"Failed to list hub tools: {e}"
raise ToolError(msg) from e
@mcp.tool
async def execute_hub_tool(
identifier: str,
arguments: dict[str, Any] | None = None,
timeout: int | None = None,
) -> dict[str, Any]:
"""Execute a tool from a hub server.
:param identifier: Tool identifier (format: hub:server:tool or server:tool).
:param arguments: Tool arguments matching the tool's input schema.
:param timeout: Optional execution timeout in seconds.
:return: Tool execution result.
Example identifiers:
- "hub:nmap:nmap_scan"
- "nmap:nmap_scan"
- "hub:nuclei:nuclei_scan"
"""
try:
executor = _get_hub_executor()
result = await executor.execute_tool(
identifier=identifier,
arguments=arguments or {},
timeout=timeout,
)
return result.to_dict()
except Exception as e:
if isinstance(e, ToolError):
raise
msg = f"Hub tool execution failed: {e}"
raise ToolError(msg) from e
@mcp.tool
async def get_hub_tool_schema(identifier: str) -> dict[str, Any]:
"""Get the input schema for a hub tool.
Returns the JSON Schema that describes the tool's expected arguments.
:param identifier: Tool identifier (format: hub:server:tool or server:tool).
:return: JSON Schema for the tool's input.
"""
try:
executor = _get_hub_executor()
schema = executor.get_tool_schema(identifier)
if schema is None:
msg = f"Tool '{identifier}' not found. Run discover_hub_tools first."
raise ToolError(msg)
return {
"identifier": identifier,
"schema": schema,
}
except Exception as e:
if isinstance(e, ToolError):
raise
msg = f"Failed to get tool schema: {e}"
raise ToolError(msg) from e
@mcp.tool
async def add_hub_server(
name: str,
server_type: str,
image: str | None = None,
command: list[str] | None = None,
url: str | None = None,
category: str | None = None,
description: str | None = None,
capabilities: list[str] | None = None,
environment: dict[str, str] | None = None,
volumes: list[str] | None = None,
) -> dict[str, Any]:
"""Add a new MCP server to the hub.
Register a new external MCP server that can be used for tool discovery
and execution. Servers can be Docker images, local commands, or SSE endpoints.
:param name: Unique name for the server (e.g., "nmap", "nuclei").
:param server_type: Connection type ("docker", "command", or "sse").
:param image: Docker image name (for docker type).
:param command: Command and args (for command type).
:param url: SSE endpoint URL (for sse type).
:param category: Category for grouping (e.g., "reconnaissance").
:param description: Human-readable description.
:param capabilities: Docker capabilities to add (e.g., ["NET_RAW"]).
:param environment: Environment variables to pass.
:param volumes: Docker volume mounts (e.g., ["~/.fuzzforge/hub/workspace:/data"]).
:return: Information about the added server.
Examples:
- Docker: add_hub_server("nmap", "docker", image="nmap-mcp:latest", capabilities=["NET_RAW"])
- Command: add_hub_server("custom", "command", command=["python", "server.py"])
"""
try:
executor = _get_hub_executor()
# Parse server type
try:
stype = HubServerType(server_type)
except ValueError:
msg = f"Invalid server type: {server_type}. Use 'docker', 'command', or 'sse'."
raise ToolError(msg) from None
# Validate required fields based on type
if stype == HubServerType.DOCKER and not image:
msg = "Docker image required for docker type"
raise ToolError(msg)
if stype == HubServerType.COMMAND and not command:
msg = "Command required for command type"
raise ToolError(msg)
if stype == HubServerType.SSE and not url:
msg = "URL required for sse type"
raise ToolError(msg)
config = HubServerConfig(
name=name,
type=stype,
image=image,
command=command,
url=url,
category=category,
description=description,
capabilities=capabilities or [],
environment=environment or {},
volumes=volumes or [],
)
server = executor.add_server(config)
return {
"success": True,
"server": {
"name": server.name,
"identifier": server.identifier,
"type": server.config.type.value,
"enabled": server.config.enabled,
},
"message": f"Server '{name}' added. Use discover_hub_tools('{name}') to discover its tools.",
}
except ValueError as e:
msg = f"Failed to add server: {e}"
raise ToolError(msg) from e
except Exception as e:
if isinstance(e, ToolError):
raise
msg = f"Failed to add hub server: {e}"
raise ToolError(msg) from e

View File

@@ -0,0 +1,392 @@
"""Module tools for FuzzForge MCP."""
from __future__ import annotations
import json
import uuid
from datetime import datetime, timezone
from pathlib import Path
from typing import TYPE_CHECKING, Any
from fastmcp import FastMCP
from fastmcp.exceptions import ToolError
from fuzzforge_mcp.dependencies import get_project_path, get_runner, get_settings
if TYPE_CHECKING:
from fuzzforge_runner import Runner
from fuzzforge_runner.orchestrator import StepResult
mcp: FastMCP = FastMCP()
# Track running background executions
_background_executions: dict[str, dict[str, Any]] = {}
@mcp.tool
async def list_modules() -> dict[str, Any]:
"""List all available FuzzForge modules.
Returns information about modules that can be executed,
including their identifiers, availability status, and metadata
such as use cases, input requirements, and output artifacts.
:return: Dictionary with list of available modules and their details.
"""
try:
runner: Runner = get_runner()
settings = get_settings()
# Use the engine abstraction to list images
# Default filter matches locally-built fuzzforge-* modules
modules = runner.list_module_images(filter_prefix="fuzzforge-")
available_modules = [
{
"identifier": module.identifier,
"image": f"{module.identifier}:{module.version or 'latest'}",
"available": module.available,
"description": module.description,
# New metadata fields from pyproject.toml
"category": module.category,
"language": module.language,
"pipeline_stage": module.pipeline_stage,
"pipeline_order": module.pipeline_order,
"dependencies": module.dependencies,
"continuous_mode": module.continuous_mode,
"typical_duration": module.typical_duration,
# AI-discoverable metadata
"use_cases": module.use_cases,
"input_requirements": module.input_requirements,
"output_artifacts": module.output_artifacts,
}
for module in modules
]
# Sort by pipeline_order if available
available_modules.sort(key=lambda m: (m.get("pipeline_order") or 999, m["identifier"]))
return {
"modules": available_modules,
"count": len(available_modules),
"container_engine": settings.engine.type,
"registry_url": settings.registry.url,
"registry_tag": settings.registry.default_tag,
}
except Exception as exception:
message: str = f"Failed to list modules: {exception}"
raise ToolError(message) from exception
@mcp.tool
async def execute_module(
module_identifier: str,
configuration: dict[str, Any] | None = None,
assets_path: str | None = None,
) -> dict[str, Any]:
"""Execute a FuzzForge module in an isolated container.
This tool runs a module in a sandboxed environment.
The module receives input assets and produces output results.
The response includes `results_path` pointing to the stored results archive.
Use this path directly to read outputs — no need to call `get_execution_results`.
:param module_identifier: The identifier of the module to execute.
:param configuration: Optional configuration dict to pass to the module.
:param assets_path: Optional path to input assets. Use this to pass specific
inputs to a module (e.g. crash files to crash-analyzer) without changing
the project's default assets. If not provided, uses project assets.
:return: Execution result including status and results path.
"""
runner: Runner = get_runner()
project_path: Path = get_project_path()
try:
result: StepResult = await runner.execute_module(
module_identifier=module_identifier,
project_path=project_path,
configuration=configuration,
assets_path=Path(assets_path) if assets_path else None,
)
return {
"success": result.success,
"execution_id": result.execution_id,
"module": result.module_identifier,
"results_path": str(result.results_path) if result.results_path else None,
"started_at": result.started_at.isoformat(),
"completed_at": result.completed_at.isoformat(),
"error": result.error,
}
except Exception as exception:
message: str = f"Module execution failed: {exception}"
raise ToolError(message) from exception
@mcp.tool
async def start_continuous_module(
module_identifier: str,
configuration: dict[str, Any] | None = None,
assets_path: str | None = None,
) -> dict[str, Any]:
"""Start a module in continuous/background mode.
The module will run indefinitely until stopped with stop_continuous_module().
Use get_continuous_status() to check progress and metrics.
This is useful for long-running modules that should run until
the user decides to stop them.
:param module_identifier: The module to run.
:param configuration: Optional configuration. Set max_duration to 0 for infinite.
:param assets_path: Optional path to input assets.
:return: Execution info including session_id for monitoring.
"""
runner: Runner = get_runner()
project_path: Path = get_project_path()
session_id = str(uuid.uuid4())[:8]
# Set infinite duration if not specified
if configuration is None:
configuration = {}
if "max_duration" not in configuration:
configuration["max_duration"] = 0 # 0 = infinite
try:
# Determine assets path
if assets_path:
actual_assets_path = Path(assets_path)
else:
storage = runner.storage
actual_assets_path = storage.get_project_assets_path(project_path)
# Use the new non-blocking executor method
executor = runner._executor
result = executor.start_module_continuous(
module_identifier=module_identifier,
assets_path=actual_assets_path,
configuration=configuration,
project_path=project_path,
execution_id=session_id,
)
# Store execution info for tracking
_background_executions[session_id] = {
"session_id": session_id,
"module": module_identifier,
"configuration": configuration,
"started_at": datetime.now(timezone.utc).isoformat(),
"status": "running",
"container_id": result["container_id"],
"input_dir": result["input_dir"],
"project_path": str(project_path),
# Incremental stream.jsonl tracking
"stream_lines_read": 0,
"total_crashes": 0,
}
return {
"success": True,
"session_id": session_id,
"module": module_identifier,
"container_id": result["container_id"],
"status": "running",
"message": f"Continuous module started. Use get_continuous_status('{session_id}') to monitor progress.",
}
except Exception as exception:
message: str = f"Failed to start continuous module: {exception}"
raise ToolError(message) from exception
def _get_continuous_status_impl(session_id: str) -> dict[str, Any]:
"""Internal helper to get continuous session status (non-tool version).
Uses incremental reads of ``stream.jsonl`` via ``tail -n +offset`` so that
only new lines appended since the last poll are fetched and parsed. Crash
counts and latest metrics are accumulated across polls.
"""
if session_id not in _background_executions:
raise ToolError(f"Unknown session: {session_id}. Use list_continuous_sessions() to see active sessions.")
execution = _background_executions[session_id]
container_id = execution.get("container_id")
# Carry forward accumulated state
metrics: dict[str, Any] = {
"total_executions": 0,
"total_crashes": execution.get("total_crashes", 0),
"exec_per_sec": 0,
"coverage": 0,
"current_target": "",
"new_events": [],
}
if container_id:
try:
runner: Runner = get_runner()
executor = runner._executor
# Check container status first
container_status = executor.get_module_status(container_id)
if container_status != "running":
execution["status"] = "stopped" if container_status == "exited" else container_status
# Incremental read: only fetch lines we haven't seen yet
lines_read: int = execution.get("stream_lines_read", 0)
stream_content = executor.read_module_output_incremental(
container_id,
start_line=lines_read + 1,
output_file="/data/output/stream.jsonl",
)
if stream_content:
new_lines = stream_content.strip().split("\n")
new_line_count = 0
for line in new_lines:
if not line.strip():
continue
try:
event = json.loads(line)
except json.JSONDecodeError:
# Possible torn read on the very last line — skip it
# and do NOT advance the offset so it is re-read next
# poll when the write is complete.
continue
new_line_count += 1
metrics["new_events"].append(event)
# Extract latest metrics snapshot
if event.get("event") == "metrics":
metrics["total_executions"] = event.get("executions", 0)
metrics["current_target"] = event.get("target", "")
metrics["exec_per_sec"] = event.get("exec_per_sec", 0)
metrics["coverage"] = event.get("coverage", 0)
if event.get("event") == "crash_detected":
metrics["total_crashes"] += 1
# Advance offset by successfully parsed lines only
execution["stream_lines_read"] = lines_read + new_line_count
execution["total_crashes"] = metrics["total_crashes"]
except Exception as e:
metrics["error"] = str(e)
# Calculate elapsed time
started_at = execution.get("started_at", "")
elapsed_seconds = 0
if started_at:
try:
start_time = datetime.fromisoformat(started_at)
elapsed_seconds = int((datetime.now(timezone.utc) - start_time).total_seconds())
except Exception:
pass
return {
"session_id": session_id,
"module": execution.get("module"),
"status": execution.get("status"),
"container_id": container_id,
"started_at": started_at,
"elapsed_seconds": elapsed_seconds,
"elapsed_human": f"{elapsed_seconds // 60}m {elapsed_seconds % 60}s",
"metrics": metrics,
}
@mcp.tool
async def get_continuous_status(session_id: str) -> dict[str, Any]:
"""Get the current status and metrics of a running continuous session.
Call this periodically (e.g., every 30 seconds) to get live updates
on progress and metrics.
:param session_id: The session ID returned by start_continuous_module().
:return: Current status, metrics, and any events found.
"""
return _get_continuous_status_impl(session_id)
@mcp.tool
async def stop_continuous_module(session_id: str) -> dict[str, Any]:
"""Stop a running continuous session.
This will gracefully stop the module and collect any results.
:param session_id: The session ID of the session to stop.
:return: Final status and summary of the session.
"""
if session_id not in _background_executions:
raise ToolError(f"Unknown session: {session_id}")
execution = _background_executions[session_id]
container_id = execution.get("container_id")
input_dir = execution.get("input_dir")
try:
# Get final metrics before stopping (use helper, not the tool)
final_metrics = _get_continuous_status_impl(session_id)
# Stop the container and collect results
results_path = None
if container_id:
runner: Runner = get_runner()
executor = runner._executor
try:
results_path = executor.stop_module_continuous(container_id, input_dir)
except Exception:
# Container may have already stopped
pass
execution["status"] = "stopped"
execution["stopped_at"] = datetime.now(timezone.utc).isoformat()
return {
"success": True,
"session_id": session_id,
"message": "Continuous session stopped",
"results_path": str(results_path) if results_path else None,
"final_metrics": final_metrics.get("metrics", {}),
"elapsed": final_metrics.get("elapsed_human", ""),
}
except Exception as exception:
message: str = f"Failed to stop continuous module: {exception}"
raise ToolError(message) from exception
@mcp.tool
async def list_continuous_sessions() -> dict[str, Any]:
"""List all active and recent continuous sessions.
:return: List of continuous sessions with their status.
"""
sessions = []
for session_id, execution in _background_executions.items():
sessions.append({
"session_id": session_id,
"module": execution.get("module"),
"status": execution.get("status"),
"started_at": execution.get("started_at"),
})
return {
"sessions": sessions,
"count": len(sessions),
}

View File

@@ -0,0 +1,158 @@
"""Project management tools for FuzzForge MCP."""
from __future__ import annotations
from pathlib import Path
from typing import TYPE_CHECKING, Any
from fastmcp import FastMCP
from fastmcp.exceptions import ToolError
from fuzzforge_mcp.dependencies import get_project_path, get_runner, set_current_project_path
if TYPE_CHECKING:
from fuzzforge_runner import Runner
mcp: FastMCP = FastMCP()
@mcp.tool
async def init_project(project_path: str | None = None) -> dict[str, Any]:
"""Initialize a new FuzzForge project.
Creates a `.fuzzforge/` directory inside the project for storing:
- assets/: Input files (source code, etc.)
- inputs/: Prepared module inputs (for debugging)
- runs/: Execution results from each module
This should be called before executing modules or workflows.
:param project_path: Path to the project directory. If not provided, uses current directory.
:return: Project initialization result.
"""
runner: Runner = get_runner()
try:
path = Path(project_path) if project_path else get_project_path()
# Track this as the current active project
set_current_project_path(path)
storage_path = runner.init_project(path)
return {
"success": True,
"project_path": str(path),
"storage_path": str(storage_path),
"message": f"Project initialized. Storage at {path}/.fuzzforge/",
}
except Exception as exception:
message: str = f"Failed to initialize project: {exception}"
raise ToolError(message) from exception
@mcp.tool
async def set_project_assets(assets_path: str) -> dict[str, Any]:
"""Set the initial assets (source code) for a project.
This sets the DEFAULT source directory mounted into modules.
Usually this is the project root containing source code (e.g. Cargo.toml, src/).
IMPORTANT: This OVERWRITES the previous assets path. Only call this once
during project setup. To pass different inputs to a specific module
(e.g. crash files to crash-analyzer), use the `assets_path` parameter
on `execute_module` instead.
:param assets_path: Path to the project source directory or archive.
:return: Result including stored assets path.
"""
runner: Runner = get_runner()
project_path: Path = get_project_path()
try:
stored_path = runner.set_project_assets(
project_path=project_path,
assets_path=Path(assets_path),
)
return {
"success": True,
"project_path": str(project_path),
"assets_path": str(stored_path),
"message": f"Assets stored from {assets_path}",
}
except Exception as exception:
message: str = f"Failed to set project assets: {exception}"
raise ToolError(message) from exception
@mcp.tool
async def list_executions() -> dict[str, Any]:
"""List all executions for the current project.
Returns a list of execution IDs that can be used to retrieve results.
:return: List of execution IDs.
"""
runner: Runner = get_runner()
project_path: Path = get_project_path()
try:
executions = runner.list_executions(project_path)
return {
"success": True,
"project_path": str(project_path),
"executions": executions,
"count": len(executions),
}
except Exception as exception:
message: str = f"Failed to list executions: {exception}"
raise ToolError(message) from exception
@mcp.tool
async def get_execution_results(execution_id: str, extract_to: str | None = None) -> dict[str, Any]:
"""Get results for a specific execution.
:param execution_id: The execution ID to retrieve results for.
:param extract_to: Optional directory to extract results to.
:return: Result including path to results archive.
"""
runner: Runner = get_runner()
project_path: Path = get_project_path()
try:
results_path = runner.get_execution_results(project_path, execution_id)
if results_path is None:
return {
"success": False,
"execution_id": execution_id,
"error": "Execution results not found",
}
result = {
"success": True,
"execution_id": execution_id,
"results_path": str(results_path),
}
# Extract if requested
if extract_to:
extracted_path = runner.extract_results(results_path, Path(extract_to))
result["extracted_path"] = str(extracted_path)
return result
except Exception as exception:
message: str = f"Failed to get execution results: {exception}"
raise ToolError(message) from exception

View File

@@ -0,0 +1,92 @@
"""Workflow tools for FuzzForge MCP."""
from __future__ import annotations
from pathlib import Path
from typing import TYPE_CHECKING, Any
from fastmcp import FastMCP
from fastmcp.exceptions import ToolError
from fuzzforge_runner.orchestrator import WorkflowDefinition, WorkflowStep
from fuzzforge_mcp.dependencies import get_project_path, get_runner
if TYPE_CHECKING:
from fuzzforge_runner import Runner
from fuzzforge_runner.orchestrator import WorkflowResult
mcp: FastMCP = FastMCP()
@mcp.tool
async def execute_workflow(
workflow_name: str,
steps: list[dict[str, Any]],
initial_assets_path: str | None = None,
) -> dict[str, Any]:
"""Execute a workflow consisting of multiple module steps.
A workflow chains multiple modules together, passing the output of each
module as input to the next. This enables complex pipelines.
:param workflow_name: Name for this workflow execution.
:param steps: List of step definitions, each with "module" and optional "configuration".
:param initial_assets_path: Optional path to initial assets for the first step.
:return: Workflow execution result including status of each step.
Example steps format:
[
{"module": "module-a", "configuration": {"key": "value"}},
{"module": "module-b", "configuration": {}},
{"module": "module-c"}
]
"""
runner: Runner = get_runner()
project_path: Path = get_project_path()
try:
# Convert step dicts to WorkflowStep objects
workflow_steps = [
WorkflowStep(
module_identifier=step["module"],
configuration=step.get("configuration"),
name=step.get("name", f"step-{i}"),
)
for i, step in enumerate(steps)
]
workflow = WorkflowDefinition(
name=workflow_name,
steps=workflow_steps,
)
result: WorkflowResult = await runner.execute_workflow(
workflow=workflow,
project_path=project_path,
initial_assets_path=Path(initial_assets_path) if initial_assets_path else None,
)
return {
"success": result.success,
"execution_id": result.execution_id,
"workflow_name": result.name,
"final_results_path": str(result.final_results_path) if result.final_results_path else None,
"steps": [
{
"step_index": step.step_index,
"module": step.module_identifier,
"success": step.success,
"execution_id": step.execution_id,
"results_path": str(step.results_path) if step.results_path else None,
"error": step.error,
}
for step in result.steps
],
}
except Exception as exception:
message: str = f"Workflow execution failed: {exception}"
raise ToolError(message) from exception

View File

@@ -5,26 +5,26 @@ from typing import TYPE_CHECKING
import pytest
from fastmcp import Client
from secpipe_mcp.application import mcp
from fuzzforge_mcp.application import mcp
if TYPE_CHECKING:
from collections.abc import AsyncGenerator, Callable
from fastmcp.client import FastMCPTransport
from secpipe_tests.fixtures import SecPipeProjectIdentifier
from fuzzforge_tests.fixtures import FuzzForgeProjectIdentifier
pytest_plugins = ["secpipe_tests.fixtures"]
pytest_plugins = ["fuzzforge_tests.fixtures"]
@pytest.fixture(autouse=True)
def environment(
monkeypatch: pytest.MonkeyPatch,
random_project_identifier: Callable[[], SecPipeProjectIdentifier],
random_project_identifier: Callable[[], FuzzForgeProjectIdentifier],
) -> None:
"""TODO."""
monkeypatch.setenv("SECPIPE_PROJECT_IDENTIFIER", str(random_project_identifier()))
monkeypatch.setenv("SECPIPE_API_HOST", "127.0.0.1")
monkeypatch.setenv("SECPIPE_API_PORT", "8000")
monkeypatch.setenv("FUZZFORGE_PROJECT_IDENTIFIER", str(random_project_identifier()))
monkeypatch.setenv("FUZZFORGE_API_HOST", "127.0.0.1")
monkeypatch.setenv("FUZZFORGE_API_PORT", "8000")
@pytest.fixture

View File

@@ -0,0 +1,61 @@
"""MCP tool tests for FuzzForge AI.
Tests the MCP tools that are available in FuzzForge AI.
"""
import pytest
from typing import TYPE_CHECKING
if TYPE_CHECKING:
from fastmcp import Client
from fastmcp.client import FastMCPTransport
async def test_list_modules_tool_exists(
mcp_client: "Client[FastMCPTransport]",
) -> None:
"""Test that the list_modules tool is available."""
tools = await mcp_client.list_tools()
tool_names = [tool.name for tool in tools]
assert "list_modules" in tool_names
async def test_init_project_tool_exists(
mcp_client: "Client[FastMCPTransport]",
) -> None:
"""Test that the init_project tool is available."""
tools = await mcp_client.list_tools()
tool_names = [tool.name for tool in tools]
assert "init_project" in tool_names
async def test_execute_module_tool_exists(
mcp_client: "Client[FastMCPTransport]",
) -> None:
"""Test that the execute_module tool is available."""
tools = await mcp_client.list_tools()
tool_names = [tool.name for tool in tools]
assert "execute_module" in tool_names
async def test_execute_workflow_tool_exists(
mcp_client: "Client[FastMCPTransport]",
) -> None:
"""Test that the execute_workflow tool is available."""
tools = await mcp_client.list_tools()
tool_names = [tool.name for tool in tools]
assert "execute_workflow" in tool_names
async def test_mcp_has_expected_tool_count(
mcp_client: "Client[FastMCPTransport]",
) -> None:
"""Test that MCP has the expected number of tools."""
tools = await mcp_client.list_tools()
# Should have at least 4 core tools
assert len(tools) >= 4

View File

@@ -0,0 +1,26 @@
FROM localhost/fuzzforge-modules-sdk:0.1.0
# Module metadata is now read from pyproject.toml [tool.fuzzforge.module] section
# Install system dependencies for Rust compilation
RUN apt-get update && apt-get install -y \
curl \
build-essential \
pkg-config \
libssl-dev \
&& rm -rf /var/lib/apt/lists/*
# Install Rust toolchain with nightly (required for cargo-fuzz)
RUN curl --proto '=https' --tlsv1.2 -sSf https://sh.rustup.rs | sh -s -- -y --default-toolchain nightly
ENV PATH="/root/.cargo/bin:${PATH}"
# Install cargo-fuzz
RUN cargo install cargo-fuzz --locked || true
COPY ./src /app/src
COPY ./pyproject.toml /app/pyproject.toml
# Remove workspace reference since we're using wheels
RUN sed -i '/\[tool\.uv\.sources\]/,/^$/d' /app/pyproject.toml
RUN uv sync --find-links /wheels

View File

@@ -0,0 +1,45 @@
PACKAGE=$(word 1, $(shell uv version))
VERSION=$(word 2, $(shell uv version))
PODMAN?=/usr/bin/podman
SOURCES=./src
TESTS=./tests
.PHONY: bandit build clean format mypy pytest ruff version
bandit:
uv run bandit --recursive $(SOURCES)
build:
$(PODMAN) build --file ./Dockerfile --no-cache --tag $(PACKAGE):$(VERSION)
save: build
$(PODMAN) save --format oci-archive --output /tmp/$(PACKAGE)-$(VERSION).oci $(PACKAGE):$(VERSION)
clean:
@find . -type d \( \
-name '*.egg-info' \
-o -name '.mypy_cache' \
-o -name '.pytest_cache' \
-o -name '.ruff_cache' \
-o -name '__pycache__' \
\) -printf 'removing directory %p\n' -exec rm -rf {} +
cloc:
cloc $(SOURCES)
format:
uv run ruff format $(SOURCES) $(TESTS)
mypy:
uv run mypy $(SOURCES)
pytest:
uv run pytest $(TESTS)
ruff:
uv run ruff check --fix $(SOURCES) $(TESTS)
version:
@echo '$(PACKAGE)@$(VERSION)'

View File

@@ -0,0 +1,46 @@
# FuzzForge Modules - FIXME
## Installation
### Python
```shell
# install the package (users)
uv sync
# install the package and all development dependencies (developers)
uv sync --all-extras
```
### Container
```shell
# build the image
make build
# run the container
mkdir -p "${PWD}/data" "${PWD}/data/input" "${PWD}/data/output"
echo '{"settings":{},"resources":[]}' > "${PWD}/data/input/input.json"
podman run --rm \
--volume "${PWD}/data:/data" \
'<name>:<version>' 'uv run module'
```
## Usage
```shell
uv run module
```
## Development tools
```shell
# run ruff (formatter)
make format
# run mypy (type checker)
make mypy
# run tests (pytest)
make pytest
# run ruff (linter)
make ruff
```
See the file `Makefile` at the root of this directory for more tools.

View File

@@ -0,0 +1,6 @@
[mypy]
plugins = pydantic.mypy
strict = True
warn_unused_ignores = True
warn_redundant_casts = True
warn_return_any = True

View File

@@ -0,0 +1,58 @@
[project]
name = "fuzzforge-cargo-fuzzer"
version = "0.1.0"
description = "Runs continuous coverage-guided fuzzing on Rust targets using cargo-fuzz"
authors = []
readme = "README.md"
requires-python = ">=3.14"
dependencies = [
"fuzzforge-modules-sdk==0.0.1",
"pydantic==2.12.4",
"structlog==25.5.0",
]
[project.optional-dependencies]
lints = [
"bandit==1.8.6",
"mypy==1.18.2",
"ruff==0.14.4",
]
tests = [
"pytest==9.0.2",
]
[project.scripts]
module = "module.__main__:main"
[tool.uv.sources]
fuzzforge-modules-sdk = { workspace = true }
[tool.uv]
package = true
# FuzzForge module metadata for AI agent discovery
[tool.fuzzforge.module]
identifier = "fuzzforge-cargo-fuzzer"
suggested_predecessors = ["fuzzforge-harness-tester"]
continuous_mode = true
use_cases = [
"Run continuous coverage-guided fuzzing on Rust targets with libFuzzer",
"Execute cargo-fuzz on validated harnesses",
"Produce crash artifacts for analysis",
"Long-running fuzzing campaign"
]
common_inputs = [
"validated-harnesses",
"Cargo.toml",
"rust-source-code"
]
output_artifacts = [
"fuzzing_results.json",
"crashes/",
"results.json"
]
output_treatment = "Read fuzzing_results.json which contains: targets_fuzzed, total_crashes, total_executions, crashes_path, and results array with per-target crash info. Display summary of crashes found. The crashes/ directory contains crash inputs for downstream crash-analyzer."

View File

@@ -0,0 +1,19 @@
line-length = 120
[lint]
select = [ "ALL" ]
ignore = [
"COM812", # conflicts with the formatter
"D100", # ignoring missing docstrings in public modules
"D104", # ignoring missing docstrings in public packages
"D203", # conflicts with 'D211'
"D213", # conflicts with 'D212'
"TD002", # ignoring missing author in 'TODO' statements
"TD003", # ignoring missing issue link in 'TODO' statements
]
[lint.per-file-ignores]
"tests/*" = [
"PLR2004", # allowing comparisons using unamed numerical constants in tests
"S101", # allowing 'assert' statements in tests
]

View File

@@ -0,0 +1,19 @@
from typing import TYPE_CHECKING
from fuzzforge_modules_sdk.api import logs
from module.mod import Module
if TYPE_CHECKING:
from fuzzforge_modules_sdk.api.modules.base import FuzzForgeModule
def main() -> None:
"""TODO."""
logs.configure()
module: FuzzForgeModule = Module()
module.main()
if __name__ == "__main__":
main()

View File

@@ -0,0 +1,538 @@
"""Cargo Fuzzer module for FuzzForge.
This module runs cargo-fuzz (libFuzzer) on validated Rust fuzz targets.
It takes a fuzz project with compiled harnesses and runs fuzzing for a
configurable duration, collecting crashes and statistics.
"""
from __future__ import annotations
import json
import os
import re
import shutil
import subprocess
import signal
import time
from pathlib import Path
from typing import TYPE_CHECKING
import structlog
from fuzzforge_modules_sdk.api.constants import PATH_TO_INPUTS, PATH_TO_OUTPUTS
from fuzzforge_modules_sdk.api.models import FuzzForgeModuleResults, FuzzForgeModuleStatus
from fuzzforge_modules_sdk.api.modules.base import FuzzForgeModule
from module.models import Input, Output, CrashInfo, FuzzingStats, TargetResult
from module.settings import Settings
if TYPE_CHECKING:
from fuzzforge_modules_sdk.api.models import FuzzForgeModuleResource
logger = structlog.get_logger()
class Module(FuzzForgeModule):
"""Cargo Fuzzer module - runs cargo-fuzz with libFuzzer on Rust targets."""
_settings: Settings | None
_fuzz_project_path: Path | None
_target_results: list[TargetResult]
_crashes_path: Path | None
def __init__(self) -> None:
"""Initialize an instance of the class."""
name: str = "cargo-fuzzer"
version: str = "0.1.0"
FuzzForgeModule.__init__(self, name=name, version=version)
self._settings = None
self._fuzz_project_path = None
self._target_results = []
self._crashes_path = None
@classmethod
def _get_input_type(cls) -> type[Input]:
"""Return the input type."""
return Input
@classmethod
def _get_output_type(cls) -> type[Output]:
"""Return the output type."""
return Output
def _prepare(self, settings: Settings) -> None: # type: ignore[override]
"""Prepare the module with settings.
:param settings: Module settings.
"""
self._settings = settings
logger.info("cargo-fuzzer preparing", settings=settings.model_dump() if settings else {})
def _run(self, resources: list[FuzzForgeModuleResource]) -> FuzzForgeModuleResults:
"""Run the fuzzer.
:param resources: Input resources (fuzz project + source).
:returns: Module execution result.
"""
logger.info("cargo-fuzzer starting", resource_count=len(resources))
# Emit initial progress
self.emit_progress(0, status=FuzzForgeModuleStatus.INITIALIZING, message="Setting up fuzzing environment")
self.emit_event("module_started", resource_count=len(resources))
# Setup the fuzzing environment
if not self._setup_environment(resources):
self.emit_progress(100, status=FuzzForgeModuleStatus.FAILED, message="Failed to setup environment")
return FuzzForgeModuleResults.FAILURE
# Get list of fuzz targets
targets = self._get_fuzz_targets()
if not targets:
logger.error("no fuzz targets found")
self.emit_progress(100, status=FuzzForgeModuleStatus.FAILED, message="No fuzz targets found")
return FuzzForgeModuleResults.FAILURE
# Filter targets if specific ones were requested
if self._settings and self._settings.targets:
requested = set(self._settings.targets)
targets = [t for t in targets if t in requested]
if not targets:
logger.error("none of the requested targets found", requested=list(requested))
self.emit_progress(100, status=FuzzForgeModuleStatus.FAILED, message="Requested targets not found")
return FuzzForgeModuleResults.FAILURE
logger.info("found fuzz targets", targets=targets)
self.emit_event("targets_found", targets=targets, count=len(targets))
# Setup output directories
self._crashes_path = PATH_TO_OUTPUTS / "crashes"
self._crashes_path.mkdir(parents=True, exist_ok=True)
# Run fuzzing on each target
# max_duration=0 means infinite/continuous mode
max_duration = self._settings.max_duration if self._settings else 60
is_continuous = max_duration == 0
if is_continuous:
# Continuous mode: cycle through targets indefinitely
# Each target runs for 60 seconds before moving to next
duration_per_target = 60
else:
duration_per_target = max_duration // max(len(targets), 1)
total_crashes = 0
# In continuous mode, loop forever; otherwise loop once
round_num = 0
while True:
round_num += 1
for i, target in enumerate(targets):
if is_continuous:
progress_msg = f"Round {round_num}: Fuzzing {target}"
else:
progress_msg = f"Fuzzing target {i+1}/{len(targets)}"
progress = int((i / len(targets)) * 100) if not is_continuous else 50
self.emit_progress(
progress,
status=FuzzForgeModuleStatus.RUNNING,
message=progress_msg,
current_task=target,
metrics={
"targets_completed": i,
"total_targets": len(targets),
"crashes_found": total_crashes,
"round": round_num if is_continuous else 1,
}
)
self.emit_event("target_started", target=target, index=i, total=len(targets), round=round_num)
result = self._fuzz_target(target, duration_per_target)
self._target_results.append(result)
total_crashes += len(result.crashes)
# Emit target completion
self.emit_event(
"target_completed",
target=target,
crashes=len(result.crashes),
executions=result.stats.total_executions if result.stats else 0,
coverage=result.stats.coverage_edges if result.stats else 0,
)
logger.info("target completed",
target=target,
crashes=len(result.crashes),
execs=result.stats.total_executions if result.stats else 0)
# Exit loop if not continuous mode
if not is_continuous:
break
# Write output
self._write_output()
# Emit final progress
self.emit_progress(
100,
status=FuzzForgeModuleStatus.COMPLETED,
message=f"Fuzzing completed. Found {total_crashes} crashes.",
metrics={
"targets_fuzzed": len(self._target_results),
"total_crashes": total_crashes,
"total_executions": sum(r.stats.total_executions for r in self._target_results if r.stats),
}
)
self.emit_event("module_completed", total_crashes=total_crashes, targets_fuzzed=len(targets))
logger.info("cargo-fuzzer completed",
targets=len(self._target_results),
total_crashes=total_crashes)
return FuzzForgeModuleResults.SUCCESS
def _cleanup(self, settings: Settings) -> None: # type: ignore[override]
"""Clean up after execution.
:param settings: Module settings.
"""
pass
def _setup_environment(self, resources: list[FuzzForgeModuleResource]) -> bool:
"""Setup the fuzzing environment.
:param resources: Input resources.
:returns: True if setup successful.
"""
import shutil
# Find fuzz project in resources
source_fuzz_project = None
source_project_root = None
for resource in resources:
path = Path(resource.path)
if path.is_dir():
# Check for fuzz subdirectory
fuzz_dir = path / "fuzz"
if fuzz_dir.is_dir() and (fuzz_dir / "Cargo.toml").exists():
source_fuzz_project = fuzz_dir
source_project_root = path
break
# Or direct fuzz project
if (path / "Cargo.toml").exists() and (path / "fuzz_targets").is_dir():
source_fuzz_project = path
source_project_root = path.parent
break
if source_fuzz_project is None:
logger.error("no fuzz project found in resources")
return False
# Copy project to writable location since /data/input is read-only
# and cargo-fuzz needs to write corpus, artifacts, and build cache
work_dir = Path("/tmp/fuzz-work")
if work_dir.exists():
shutil.rmtree(work_dir)
# Copy the entire project root
work_project = work_dir / source_project_root.name
shutil.copytree(source_project_root, work_project, dirs_exist_ok=True)
# Update fuzz_project_path to point to the copied location
relative_fuzz = source_fuzz_project.relative_to(source_project_root)
self._fuzz_project_path = work_project / relative_fuzz
logger.info("using fuzz project", path=str(self._fuzz_project_path))
return True
def _get_fuzz_targets(self) -> list[str]:
"""Get list of fuzz target names.
:returns: List of target names.
"""
if self._fuzz_project_path is None:
return []
targets = []
fuzz_targets_dir = self._fuzz_project_path / "fuzz_targets"
if fuzz_targets_dir.is_dir():
for rs_file in fuzz_targets_dir.glob("*.rs"):
targets.append(rs_file.stem)
return targets
def _fuzz_target(self, target: str, duration: int) -> TargetResult:
"""Run fuzzing on a single target.
:param target: Name of the fuzz target.
:param duration: Maximum duration in seconds.
:returns: Fuzzing result for this target.
"""
logger.info("fuzzing target", target=target, duration=duration)
crashes: list[CrashInfo] = []
stats = FuzzingStats()
if self._fuzz_project_path is None:
return TargetResult(target=target, crashes=crashes, stats=stats)
# Create corpus directory for this target
corpus_dir = self._fuzz_project_path / "corpus" / target
corpus_dir.mkdir(parents=True, exist_ok=True)
# Build the command
cmd = [
"cargo", "+nightly", "fuzz", "run",
target,
"--",
]
# Add time limit
if duration > 0:
cmd.append(f"-max_total_time={duration}")
# Use fork mode to continue after crashes
# This makes libFuzzer restart worker after crash instead of exiting
cmd.append("-fork=1")
cmd.append("-ignore_crashes=1")
cmd.append("-print_final_stats=1")
# Add jobs if specified
if self._settings and self._settings.jobs > 1:
cmd.extend([f"-jobs={self._settings.jobs}"])
try:
env = os.environ.copy()
env["CARGO_INCREMENTAL"] = "0"
process = subprocess.Popen(
cmd,
cwd=self._fuzz_project_path,
stdout=subprocess.PIPE,
stderr=subprocess.STDOUT,
text=True,
env=env,
)
output_lines = []
start_time = time.time()
last_metrics_emit = 0.0
current_execs = 0
current_cov = 0
current_exec_s = 0
crash_count = 0
# Read output with timeout (skip timeout check in infinite mode)
while True:
if process.poll() is not None:
break
elapsed = time.time() - start_time
# Only enforce timeout if duration > 0 (not infinite mode)
if duration > 0 and elapsed > duration + 30: # Grace period
logger.warning("fuzzer timeout, terminating", target=target)
process.terminate()
try:
process.wait(timeout=10)
except subprocess.TimeoutExpired:
process.kill()
break
try:
if process.stdout:
line = process.stdout.readline()
if line:
output_lines.append(line)
# Parse real-time metrics from libFuzzer output
# Example: "#12345 NEW cov: 100 ft: 50 corp: 25/1Kb exec/s: 1000"
exec_match = re.search(r"#(\d+)", line)
if exec_match:
current_execs = int(exec_match.group(1))
cov_match = re.search(r"cov:\s*(\d+)", line)
if cov_match:
current_cov = int(cov_match.group(1))
exec_s_match = re.search(r"exec/s:\s*(\d+)", line)
if exec_s_match:
current_exec_s = int(exec_s_match.group(1))
# Check for crash indicators
if "SUMMARY:" in line or "ERROR:" in line or "crash-" in line.lower():
crash_count += 1
self.emit_event(
"crash_detected",
target=target,
crash_number=crash_count,
line=line.strip(),
)
logger.debug("fuzzer output", line=line.strip())
# Emit metrics periodically (every 2 seconds)
if elapsed - last_metrics_emit >= 2.0:
last_metrics_emit = elapsed
self.emit_event(
"metrics",
target=target,
executions=current_execs,
coverage=current_cov,
exec_per_sec=current_exec_s,
crashes=crash_count,
elapsed_seconds=int(elapsed),
remaining_seconds=max(0, duration - int(elapsed)),
)
except Exception:
pass
# Parse statistics from output
stats = self._parse_fuzzer_stats(output_lines)
# Collect crashes
crashes = self._collect_crashes(target)
# Emit final event for this target if crashes were found
if crashes:
self.emit_event(
"crashes_collected",
target=target,
count=len(crashes),
paths=[c.file_path for c in crashes],
)
except FileNotFoundError:
logger.error("cargo-fuzz not found, please install with: cargo install cargo-fuzz")
stats.error = "cargo-fuzz not installed"
self.emit_event("error", target=target, message="cargo-fuzz not installed")
except Exception as e:
logger.exception("fuzzing error", target=target, error=str(e))
stats.error = str(e)
self.emit_event("error", target=target, message=str(e))
return TargetResult(target=target, crashes=crashes, stats=stats)
def _parse_fuzzer_stats(self, output_lines: list[str]) -> FuzzingStats:
"""Parse fuzzer output for statistics.
:param output_lines: Lines of fuzzer output.
:returns: Parsed statistics.
"""
stats = FuzzingStats()
full_output = "".join(output_lines)
# Parse libFuzzer stats
# Example: "#12345 DONE cov: 100 ft: 50 corp: 25/1Kb exec/s: 1000"
exec_match = re.search(r"#(\d+)", full_output)
if exec_match:
stats.total_executions = int(exec_match.group(1))
cov_match = re.search(r"cov:\s*(\d+)", full_output)
if cov_match:
stats.coverage_edges = int(cov_match.group(1))
corp_match = re.search(r"corp:\s*(\d+)", full_output)
if corp_match:
stats.corpus_size = int(corp_match.group(1))
exec_s_match = re.search(r"exec/s:\s*(\d+)", full_output)
if exec_s_match:
stats.executions_per_second = int(exec_s_match.group(1))
return stats
def _collect_crashes(self, target: str) -> list[CrashInfo]:
"""Collect crash files from fuzzer output.
:param target: Name of the fuzz target.
:returns: List of crash info.
"""
crashes: list[CrashInfo] = []
seen_hashes: set[str] = set()
if self._fuzz_project_path is None or self._crashes_path is None:
return crashes
# Check multiple possible crash locations:
# 1. Standard artifacts directory (target-specific)
# 2. Generic artifacts directory
# 3. Fuzz project root (fork mode sometimes writes here)
# 4. Project root (parent of fuzz directory)
search_paths = [
self._fuzz_project_path / "artifacts" / target,
self._fuzz_project_path / "artifacts",
self._fuzz_project_path,
self._fuzz_project_path.parent,
]
for search_dir in search_paths:
if not search_dir.is_dir():
continue
# Use rglob to recursively find crash files
for crash_file in search_dir.rglob("crash-*"):
if not crash_file.is_file():
continue
# Skip duplicates by hash
if crash_file.name in seen_hashes:
continue
seen_hashes.add(crash_file.name)
# Copy crash to output
output_crash = self._crashes_path / target
output_crash.mkdir(parents=True, exist_ok=True)
dest = output_crash / crash_file.name
shutil.copy2(crash_file, dest)
# Read crash input
crash_data = crash_file.read_bytes()
crash_info = CrashInfo(
file_path=str(dest),
input_hash=crash_file.name,
input_size=len(crash_data),
)
crashes.append(crash_info)
logger.info("found crash", target=target, file=crash_file.name, source=str(search_dir))
logger.info("crash collection complete", target=target, total_crashes=len(crashes))
return crashes
def _write_output(self) -> None:
"""Write the fuzzing results to output."""
output_path = PATH_TO_OUTPUTS / "fuzzing_results.json"
output_path.parent.mkdir(parents=True, exist_ok=True)
total_crashes = sum(len(r.crashes) for r in self._target_results)
total_execs = sum(r.stats.total_executions for r in self._target_results if r.stats)
output_data = {
"fuzz_project": str(self._fuzz_project_path),
"targets_fuzzed": len(self._target_results),
"total_crashes": total_crashes,
"total_executions": total_execs,
"crashes_path": str(self._crashes_path),
"results": [
{
"target": r.target,
"crashes": [c.model_dump() for c in r.crashes],
"stats": r.stats.model_dump() if r.stats else None,
}
for r in self._target_results
],
}
output_path.write_text(json.dumps(output_data, indent=2))
logger.info("wrote fuzzing results", path=str(output_path))

View File

@@ -0,0 +1,88 @@
"""Models for the cargo-fuzzer module."""
from pydantic import BaseModel, Field
from fuzzforge_modules_sdk.api.models import FuzzForgeModuleInputBase, FuzzForgeModuleOutputBase
from module.settings import Settings
class FuzzingStats(BaseModel):
"""Statistics from a fuzzing run."""
#: Total number of test case executions
total_executions: int = 0
#: Executions per second
executions_per_second: int = 0
#: Number of coverage edges discovered
coverage_edges: int = 0
#: Size of the corpus
corpus_size: int = 0
#: Any error message
error: str = ""
class CrashInfo(BaseModel):
"""Information about a discovered crash."""
#: Path to the crash input file
file_path: str
#: Hash/name of the crash input
input_hash: str
#: Size of the crash input in bytes
input_size: int = 0
#: Crash type (if identified)
crash_type: str = ""
#: Stack trace (if available)
stack_trace: str = ""
class TargetResult(BaseModel):
"""Result of fuzzing a single target."""
#: Name of the fuzz target
target: str
#: List of crashes found
crashes: list[CrashInfo] = Field(default_factory=list)
#: Fuzzing statistics
stats: FuzzingStats = Field(default_factory=FuzzingStats)
class Input(FuzzForgeModuleInputBase[Settings]):
"""Input for the cargo-fuzzer module.
Expects:
- A fuzz project directory with validated harnesses
- Optionally the source crate to link against
"""
class Output(FuzzForgeModuleOutputBase):
"""Output from the cargo-fuzzer module."""
#: Path to the fuzz project
fuzz_project: str = ""
#: Number of targets fuzzed
targets_fuzzed: int = 0
#: Total crashes found across all targets
total_crashes: int = 0
#: Total executions across all targets
total_executions: int = 0
#: Path to collected crash files
crashes_path: str = ""
#: Results per target
results: list[TargetResult] = Field(default_factory=list)

Some files were not shown because too many files have changed in this diff Show More