Compare commits

...

27 Commits

Author SHA1 Message Date
AFredefon
0d410bd5b4 feat: implement report generation 2026-04-07 16:25:36 +02:00
AFredefon
d3a20b3846 feat: implement skill packs system 2026-04-07 16:12:14 +02:00
AFredefon
664278da3f feat: implement artifact management tools 2026-04-07 16:06:47 +02:00
AFredefon
9374fd3aee feat: implement workflow suggestions pipeline 2026-04-07 01:50:21 +02:00
AFredefon
2e96517d11 chore: reset hub-config.json to empty default and gitignore it 2026-03-17 08:08:31 +01:00
AFredefon
575b90f8d4 docs: rewrite README for hub-centric architecture, remove demo GIFs 2026-03-17 07:58:26 +01:00
AFredefon
c59b6ba81a fix(mcp): fix mypy type error in executions resource and improve tool docstrings 2026-03-17 04:21:06 +01:00
AFredefon
a51c495d34 feat(mcp): update application instructions and hub config 2026-03-16 02:10:16 +01:00
AFredefon
7924e44245 feat(hub): volume mounts, get_agent_context convention, category filter 2026-03-16 02:09:04 +01:00
AFredefon
a824809294 feat(mcp): add project assets storage and output directory management 2026-03-16 02:08:20 +01:00
AFredefon
bc5e9373ce fix: document mount paths in execute_hub_tool and inject volumes into persistent sessions 2026-03-11 07:55:58 +01:00
AFredefon
73a0170d65 fix: resolve mypy type errors in TUI app and build_log screen 2026-03-11 07:09:58 +01:00
AFredefon
6cdd0caec0 fix: suppress BLE001 for intentional broad catch in execute_hub_tool 2026-03-11 07:00:25 +01:00
AFredefon
462f6ed408 fix: resolve ruff lint errors in TUI modules 2026-03-11 06:57:38 +01:00
AFredefon
9cfbc29677 fix: add noqa for optional git URL fetch exception 2026-03-11 06:49:37 +01:00
AFredefon
6ced81affc fix: inject project assets as Docker volume mounts in execute_hub_tool 2026-03-11 06:46:51 +01:00
AFredefon
b975d285c6 tui: fix single-click buttons and double-modal push 2026-03-11 05:30:29 +01:00
AFredefon
1891a43189 tui: background image builds with live log viewer 2026-03-11 04:42:25 +01:00
AFredefon
a3441676a3 tui: in-UI image building, hub registry auto-recovery, clean hub-config 2026-03-11 03:02:36 +01:00
AFredefon
f192771b9b fix: improve new user experience and docs 2026-03-11 02:20:18 +01:00
AFredefon
976947cf5c feat: add FUZZFORGE_USER_DIR env var to override user-global data dir 2026-03-11 02:09:06 +01:00
AFredefon
544569ddbd fix: use ~/.fuzzforge for user-global data, keep workspace .fuzzforge for project storage 2026-03-11 02:04:51 +01:00
AFredefon
6f967fff63 fix: find_fuzzforge_root searches cwd first instead of __file__ 2026-03-11 01:41:47 +01:00
AFredefon
47c254e2bd fix: add workspace packages as root deps so uv sync installs everything 2026-03-11 01:32:17 +01:00
AFredefon
b137f48e7f fix(ci): use uv sync 2026-03-11 01:17:43 +01:00
AFredefon
f8002254e5 ci: add GitHub Actions workflows with lint, typecheck and tests 2026-03-11 01:13:35 +01:00
AFredefon
f2dca0a7e7 Merge pull request #45 from FuzzingLabs/feature/tui-agent-setup
feat(tui): add terminal UI with hub and agent management
2026-03-10 04:11:50 +01:00
38 changed files with 2554 additions and 402 deletions

86
.github/workflows/ci.yml vendored Normal file
View File

@@ -0,0 +1,86 @@
name: CI
on:
push:
branches: [main, dev, feature/*]
pull_request:
branches: [main, dev]
workflow_dispatch:
jobs:
lint-and-typecheck:
name: Lint & Type Check
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v4
- name: Install uv
uses: astral-sh/setup-uv@v5
with:
version: "latest"
- name: Set up Python
run: uv python install 3.14
- name: Install dependencies
run: uv sync
- name: Ruff check (fuzzforge-cli)
run: |
cd fuzzforge-cli
uv run --extra lints ruff check src/
- name: Ruff check (fuzzforge-mcp)
run: |
cd fuzzforge-mcp
uv run --extra lints ruff check src/
- name: Ruff check (fuzzforge-common)
run: |
cd fuzzforge-common
uv run --extra lints ruff check src/
- name: Mypy type check (fuzzforge-cli)
run: |
cd fuzzforge-cli
uv run --extra lints mypy src/
- name: Mypy type check (fuzzforge-mcp)
run: |
cd fuzzforge-mcp
uv run --extra lints mypy src/
# NOTE: Mypy check for fuzzforge-common temporarily disabled
# due to 37 pre-existing type errors in legacy code.
# TODO: Fix type errors and re-enable strict checking
#- name: Mypy type check (fuzzforge-common)
# run: |
# cd fuzzforge-common
# uv run --extra lints mypy src/
test:
name: Tests
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v4
- name: Install uv
uses: astral-sh/setup-uv@v5
with:
version: "latest"
- name: Set up Python
run: uv python install 3.14
- name: Install dependencies
run: uv sync --all-extras
- name: Run MCP tests
run: |
cd fuzzforge-mcp
uv run --extra tests pytest -v
- name: Run common tests
run: |
cd fuzzforge-common
uv run --extra tests pytest -v

49
.github/workflows/mcp-server.yml vendored Normal file
View File

@@ -0,0 +1,49 @@
name: MCP Server Smoke Test
on:
push:
branches: [main, dev]
pull_request:
branches: [main, dev]
workflow_dispatch:
jobs:
mcp-server:
name: MCP Server Test
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v4
- name: Install uv
uses: astral-sh/setup-uv@v5
with:
version: "latest"
- name: Set up Python
run: uv python install 3.14
- name: Install dependencies
run: uv sync --all-extras
- name: Start MCP server in background
run: |
cd fuzzforge-mcp
nohup uv run python -m fuzzforge_mcp.server > server.log 2>&1 &
echo $! > server.pid
sleep 3
- name: Run MCP tool tests
run: |
cd fuzzforge-mcp
uv run --extra tests pytest tests/test_resources.py -v
- name: Stop MCP server
if: always()
run: |
if [ -f fuzzforge-mcp/server.pid ]; then
kill $(cat fuzzforge-mcp/server.pid) || true
fi
- name: Show server logs
if: failure()
run: cat fuzzforge-mcp/server.log || true

3
.gitignore vendored
View File

@@ -10,3 +10,6 @@ __pycache__
# Podman/Docker container storage artifacts
~/.fuzzforge/
# User-specific hub config (generated at runtime)
hub-config.json

242
README.md
View File

@@ -17,9 +17,9 @@
<sub>
<a href="#-overview"><b>Overview</b></a> •
<a href="#-features"><b>Features</b></a> •
<a href="#-mcp-security-hub"><b>Security Hub</b></a> •
<a href="#-installation"><b>Installation</b></a> •
<a href="USAGE.md"><b>Usage Guide</b></a> •
<a href="#-modules"><b>Modules</b></a> •
<a href="#-contributing"><b>Contributing</b></a>
</sub>
</p>
@@ -32,39 +32,44 @@
## 🚀 Overview
**FuzzForge AI** is an open-source runtime that enables AI agents (GitHub Copilot, Claude, etc.) to orchestrate security research workflows through the **Model Context Protocol (MCP)**.
**FuzzForge AI** is an open-source MCP server that enables AI agents (GitHub Copilot, Claude, etc.) to orchestrate security research workflows through the **Model Context Protocol (MCP)**.
### The Core: Modules
FuzzForge connects your AI assistant to **MCP tool hubs** — collections of containerized security tools that the agent can discover, chain, and execute autonomously. Instead of manually running security tools, describe what you want and let your AI assistant handle it.
At the heart of FuzzForge are **modules** - containerized security tools that AI agents can discover, configure, and orchestrate. Each module encapsulates a specific security capability (static analysis, fuzzing, crash analysis, etc.) and runs in an isolated container.
### The Core: Hub Architecture
- **🔌 Plug & Play**: Modules are self-contained - just pull and run
- **🤖 AI-Native**: Designed for AI agent orchestration via MCP
- **🔗 Composable**: Chain modules together into automated workflows
- **📦 Extensible**: Build custom modules with the Python SDK
FuzzForge acts as a **meta-MCP server** — a single MCP endpoint that gives your AI agent access to tools from multiple MCP hub servers. Each hub server is a containerized security tool (Binwalk, YARA, Radare2, Nmap, etc.) that the agent can discover at runtime.
FuzzForge AI handles module discovery, execution, and result collection. Security modules (developed separately) provide the actual security tooling - from static analyzers to fuzzers to crash triagers.
- **🔍 Discovery**: The agent lists available hub servers and discovers their tools
- **🤖 AI-Native**: Hub tools provide agent context — usage tips, workflow guidance, and domain knowledge
- **🔗 Composable**: Chain tools from different hubs into automated pipelines
- **📦 Extensible**: Add your own MCP servers to the hub registry
Instead of manually running security tools, describe what you want and let your AI assistant handle it.
### 🎬 Use Case: Firmware Vulnerability Research
> **Scenario**: Analyze a firmware image to find security vulnerabilities — fully automated by an AI agent.
```
User: "Search for vulnerabilities in firmware.bin"
Agent → Binwalk: Extract filesystem from firmware image
Agent → YARA: Scan extracted files for vulnerability patterns
Agent → Radare2: Trace dangerous function calls in prioritized binaries
Agent → Report: 8 vulnerabilities found (2 critical, 4 high, 2 medium)
```
### 🎬 Use Case: Rust Fuzzing Pipeline
> **Scenario**: Fuzz a Rust crate to discover vulnerabilities using AI-assisted harness generation and parallel fuzzing.
<table align="center">
<tr>
<th>1️⃣ Analyze, Generate & Validate Harnesses</th>
<th>2️⃣ Run Parallel Continuous Fuzzing</th>
</tr>
<tr>
<td><img src="assets/demopart2.gif" alt="FuzzForge Demo - Analysis Pipeline" width="100%"></td>
<td><img src="assets/demopart1.gif" alt="FuzzForge Demo - Parallel Fuzzing" width="100%"></td>
</tr>
<tr>
<td align="center"><sub>AI agent analyzes code, generates harnesses, and validates they compile</sub></td>
<td align="center"><sub>Multiple fuzzing sessions run in parallel with live metrics</sub></td>
</tr>
</table>
```
User: "Fuzz the blurhash crate for vulnerabilities"
Agent → Rust Analyzer: Identify fuzzable functions and attack surface
Agent → Harness Gen: Generate and validate fuzzing harnesses
Agent → Cargo Fuzzer: Run parallel coverage-guided fuzzing sessions
Agent → Crash Analysis: Deduplicate and triage discovered crashes
```
---
@@ -82,13 +87,13 @@ If you find FuzzForge useful, please **star the repo** to support development!
| Feature | Description |
|---------|-------------|
| 🤖 **AI-Native** | Built for MCP - works with GitHub Copilot, Claude, and any MCP-compatible agent |
| 📦 **Containerized** | Each module runs in isolation via Docker or Podman |
| 🔄 **Continuous Mode** | Long-running tasks (fuzzing) with real-time metrics streaming |
| 🔗 **Workflows** | Chain multiple modules together in automated pipelines |
| 🛠️ **Extensible** | Create custom modules with the Python SDK |
| 🏠 **Local First** | All execution happens on your machine - no cloud required |
| 🔒 **Secure** | Sandboxed containers with no network access by default |
| 🤖 **AI-Native** | Built for MCP — works with GitHub Copilot, Claude, and any MCP-compatible agent |
| 🔌 **Hub System** | Connect to MCP tool hubs — each hub brings dozens of containerized security tools |
| 🔍 **Tool Discovery** | Agents discover available tools at runtime with built-in usage guidance |
| 🔗 **Pipelines** | Chain tools from different hubs into automated multi-step workflows |
| 🔄 **Persistent Sessions** | Long-running tools (Radare2, fuzzers) with stateful container sessions |
| 🏠 **Local First** | All execution happens on your machine — no cloud required |
| 🔒 **Sandboxed** | Every tool runs in an isolated container via Docker or Podman |
---
@@ -102,27 +107,57 @@ If you find FuzzForge useful, please **star the repo** to support development!
┌─────────────────────────────────────────────────────────────────┐
│ FuzzForge MCP Server │
┌─────────────┐ ┌──────────────┐ ┌────────────────────────┐
│list_modules │ │execute_module│ │start_continuous_module │
───────────── ──────────────┘ └────────────────────────┘
Projects Hub Discovery Hub Execution
┌────────────── ──────────────────┐ ┌───────────────────
│ │init_project │ │list_hub_servers │ │execute_hub_tool │ │
│ │set_assets │ │discover_hub_tools│ │start_hub_server │ │
│ │list_results │ │get_tool_schema │ │stop_hub_server │ │
│ └──────────────┘ └──────────────────┘ └───────────────────┘ │
└───────────────────────────┬─────────────────────────────────────┘
Docker/Podman
┌─────────────────────────────────────────────────────────────────┐
FuzzForge Runner
Container Engine (Docker/Podman)
└────────────────────────────────────────────────────────────────
┌───────────────────┼───────────────────┐
▼ ▼
───────────────┐ ┌───────────────┐ ┌───────────────┐
Module A │ Module B │ │ Module C
(Container) │ │ (Container) (Container)
───────────────┘ └───────────────┘ └───────────────┘
MCP Hub Servers
│ ┌───────────┐ ┌───────────┐ ┌───────────┐ ┌───────────┐ │
│ Binwalk YARA │ Radare2 │ │ Nmap │
6 tools │ │ 5 tools │ │ 32 tools │ │ 8 tools │ │
│ └───────────┘ └───────────┘ └───────────┘ └───────────┘
┌───────────┐ ─────────── ┌───────────┐ ┌───────────┐
│ Nuclei SQLMap │ │ Trivy │ │ ...
│ 7 tools │ │ 8 tools │ │ 7 tools │ │ 36 hubs │
└───────────┘ ─────────── └───────────┘ └───────────┘
└─────────────────────────────────────────────────────────────────┘
```
---
## 🔧 MCP Security Hub
FuzzForge ships with built-in support for the **[MCP Security Hub](https://github.com/FuzzingLabs/mcp-security-hub)** — a collection of 36 production-ready, Dockerized MCP servers covering offensive security:
| Category | Servers | Examples |
|----------|---------|----------|
| 🔍 **Reconnaissance** | 8 | Nmap, Masscan, Shodan, WhatWeb |
| 🌐 **Web Security** | 6 | Nuclei, SQLMap, ffuf, Nikto |
| 🔬 **Binary Analysis** | 6 | Radare2, Binwalk, YARA, Capa, Ghidra |
| ⛓️ **Blockchain** | 3 | Medusa, Solazy, DAML Viewer |
| ☁️ **Cloud Security** | 3 | Trivy, Prowler, RoadRecon |
| 💻 **Code Security** | 1 | Semgrep |
| 🔑 **Secrets Detection** | 1 | Gitleaks |
| 💥 **Exploitation** | 1 | SearchSploit |
| 🎯 **Fuzzing** | 2 | Boofuzz, Dharma |
| 🕵️ **OSINT** | 2 | Maigret, DNSTwist |
| 🛡️ **Threat Intel** | 2 | VirusTotal, AlienVault OTX |
| 🏰 **Active Directory** | 1 | BloodHound |
> 185+ individual tools accessible through a single MCP connection.
The hub is open source and can be extended with your own MCP servers. See the [mcp-security-hub repository](https://github.com/FuzzingLabs/mcp-security-hub) for details.
---
## 📦 Installation
### Prerequisites
@@ -140,11 +175,20 @@ cd fuzzforge_ai
# Install dependencies
uv sync
# Build module images
make build-modules
```
### Link the Security Hub
```bash
# Clone the MCP Security Hub
git clone https://github.com/FuzzingLabs/mcp-security-hub.git ~/.fuzzforge/hubs/mcp-security-hub
# Build the Docker images for the hub tools
./scripts/build-hub-images.sh
```
Or use the terminal UI (`uv run fuzzforge ui`) to link hubs interactively.
### Configure MCP for Your AI Agent
```bash
@@ -165,81 +209,20 @@ uv run fuzzforge mcp status
---
## 📦 Modules
## 🧑‍💻 Usage
FuzzForge modules are containerized security tools that AI agents can orchestrate. The module ecosystem is designed around a simple principle: **the OSS runtime orchestrates, enterprise modules execute**.
Once installed, just talk to your AI agent:
### Module Ecosystem
| | FuzzForge AI | FuzzForge Enterprise Modules |
|---|---|---|
| **What** | Runtime & MCP server | Security research modules |
| **License** | Apache 2.0 | BSL 1.1 (Business Source License) |
| **Compatibility** | ✅ Runs any compatible module | ✅ Works with FuzzForge AI |
**Enterprise modules** are developed separately and provide production-ready security tooling:
| Category | Modules | Description |
|----------|---------|-------------|
| 🔍 **Static Analysis** | Rust Analyzer, Solidity Analyzer, Cairo Analyzer | Code analysis and fuzzable function detection |
| 🎯 **Fuzzing** | Cargo Fuzzer, Honggfuzz, AFL++ | Coverage-guided fuzz testing |
| 💥 **Crash Analysis** | Crash Triager, Root Cause Analyzer | Automated crash deduplication and analysis |
| 🔐 **Vulnerability Detection** | Pattern Matcher, Taint Analyzer | Security vulnerability scanning |
| 📝 **Reporting** | Report Generator, SARIF Exporter | Automated security report generation |
> 💡 **Build your own modules!** The FuzzForge SDK allows you to create custom modules that integrate seamlessly with FuzzForge AI. See [Creating Custom Modules](#-creating-custom-modules).
### Execution Modes
Modules run in two execution modes:
#### One-shot Execution
Run a module once and get results:
```python
result = execute_module("my-analyzer", assets_path="/path/to/project")
```
"What security tools are available?"
"Scan this firmware image for vulnerabilities"
"Analyze this binary with radare2"
"Run nuclei against https://example.com"
```
#### Continuous Execution
The agent will use FuzzForge to discover the right hub tools, chain them into a pipeline, and return results — all without you touching a terminal.
For long-running tasks like fuzzing, with real-time metrics:
```python
# Start continuous execution
session = start_continuous_module("my-fuzzer",
assets_path="/path/to/project",
configuration={"target": "my_target"})
# Check status with live metrics
status = get_continuous_status(session["session_id"])
# Stop and collect results
stop_continuous_module(session["session_id"])
```
---
## 🛠️ Creating Custom Modules
Build your own security modules with the FuzzForge SDK:
```python
from fuzzforge_modules_sdk import FuzzForgeModule, FuzzForgeModuleResults
class MySecurityModule(FuzzForgeModule):
def _run(self, resources):
self.emit_event("started", target=resources[0].path)
# Your analysis logic here
results = self.analyze(resources)
self.emit_progress(100, status="completed",
message=f"Analysis complete")
return FuzzForgeModuleResults.SUCCESS
```
📖 See the [Module SDK Guide](fuzzforge-modules/fuzzforge-modules-sdk/README.md) for details.
See the [Usage Guide](USAGE.md) for detailed setup and advanced workflows.
---
@@ -247,26 +230,17 @@ class MySecurityModule(FuzzForgeModule):
```
fuzzforge_ai/
├── fuzzforge-cli/ # Command-line interface
├── fuzzforge-mcp/ # MCP server — the core of FuzzForge
├── fuzzforge-cli/ # Command-line interface & terminal UI
├── fuzzforge-common/ # Shared abstractions (containers, storage)
├── fuzzforge-mcp/ # MCP server for AI agents
├── fuzzforge-modules/ # Security modules
│ └── fuzzforge-modules-sdk/ # Module development SDK
── fuzzforge-runner/ # Local execution engine
├── fuzzforge-types/ # Type definitions & schemas
└── demo/ # Demo projects for testing
├── fuzzforge-runner/ # Container execution engine (Docker/Podman)
├── fuzzforge-tests/ # Integration tests
├── mcp-security-hub/ # Default hub: 36 offensive security MCP servers
── scripts/ # Hub image build scripts
```
---
## 🗺️ What's Next
**[MCP Security Hub](https://github.com/FuzzingLabs/mcp-security-hub) integration** — Bridge 175+ offensive security tools (Nmap, Nuclei, Ghidra, and more) into FuzzForge workflows, all orchestrated by AI agents.
See [ROADMAP.md](ROADMAP.md) for the full roadmap.
---
## 🤝 Contributing
We welcome contributions from the community!
@@ -274,7 +248,7 @@ We welcome contributions from the community!
- 🐛 Report bugs via [GitHub Issues](../../issues)
- 💡 Suggest features or improvements
- 🔧 Submit pull requests
- 📦 Share your custom modules
- 🔌 Add new MCP servers to the [Security Hub](https://github.com/FuzzingLabs/mcp-security-hub)
See [CONTRIBUTING.md](CONTRIBUTING.md) for guidelines.

View File

@@ -49,8 +49,11 @@ uv sync
uv run fuzzforge ui
# 3. Press 'h' → "FuzzingLabs Hub" to clone & link the default security hub
# 4. Select an agent row and press Enter to link it
# 5. Restart your AI agent and start talking:
# 4. Select an agent row and press Enter to install the MCP server for your agent
# 5. Build the Docker images for the hub tools (required before tools can run)
./scripts/build-hub-images.sh
# 6. Restart your AI agent and start talking:
# "What security tools are available?"
# "Scan this binary with binwalk and yara"
# "Analyze this Rust crate for fuzzable functions"
@@ -64,7 +67,10 @@ uv run fuzzforge mcp install copilot # For VS Code + GitHub Copilot
# OR
uv run fuzzforge mcp install claude-code # For Claude Code CLI
# Build hub tool images
# Clone and link the default security hub
git clone git@github.com:FuzzingLabs/mcp-security-hub.git ~/.fuzzforge/hubs/mcp-security-hub
# Build hub tool images (required — tools only run once their image is built)
./scripts/build-hub-images.sh
# Restart your AI agent — done!
@@ -399,13 +405,20 @@ uv run fuzzforge project results <id> # Get execution results
Configure FuzzForge using environment variables:
```bash
# Storage path for projects and execution results
export FUZZFORGE_STORAGE_PATH=/path/to/storage
# Override the FuzzForge installation root (auto-detected from cwd by default)
export FUZZFORGE_ROOT=/path/to/fuzzforge_ai
# Override the user-global data directory (default: ~/.fuzzforge)
# Useful for isolated testing without touching your real installation
export FUZZFORGE_USER_DIR=/tmp/my-fuzzforge-test
# Storage path for projects and execution results (default: <workspace>/.fuzzforge/storage)
export FUZZFORGE_STORAGE__PATH=/path/to/storage
# Container engine (Docker is default)
export FUZZFORGE_ENGINE__TYPE=docker # or podman
# Podman-specific settings
# Podman-specific container storage paths
export FUZZFORGE_ENGINE__GRAPHROOT=~/.fuzzforge/containers/storage
export FUZZFORGE_ENGINE__RUNROOT=~/.fuzzforge/containers/run
```

Binary file not shown.

Before

Width:  |  Height:  |  Size: 360 KiB

Binary file not shown.

Before

Width:  |  Height:  |  Size: 2.1 MiB

View File

@@ -13,3 +13,49 @@ ignore = [
"PLR2004", # allowing comparisons using unamed numerical constants in tests
"S101", # allowing 'assert' statements in tests
]
"src/fuzzforge_cli/tui/**" = [
"ARG002", # unused method argument: callback signature
"BLE001", # blind exception: broad error handling in UI
"C901", # complexity: UI logic
"D107", # missing docstring in __init__: simple dataclasses
"FBT001", # boolean positional arg
"FBT002", # boolean default arg
"PLC0415", # import outside top-level: lazy loading
"PLR0911", # too many return statements
"PLR0912", # too many branches
"PLR2004", # magic value comparison
"RUF012", # mutable class default: Textual pattern
"S603", # subprocess: validated inputs
"S607", # subprocess: PATH lookup
"SIM108", # ternary: readability preference
"TC001", # TYPE_CHECKING: runtime type needs
"TC002", # TYPE_CHECKING: runtime type needs
"TC003", # TYPE_CHECKING: runtime type needs
"TRY300", # try-else: existing pattern
]
"tui/*.py" = [
"D107", # missing docstring in __init__: simple dataclasses
"TC001", # TYPE_CHECKING: runtime type needs
"TC002", # TYPE_CHECKING: runtime type needs
"TC003", # TYPE_CHECKING: runtime type needs
]
"src/fuzzforge_cli/commands/mcp.py" = [
"ARG001", # unused argument: callback signature
"B904", # raise from: existing pattern
"F841", # unused variable: legacy code
"FBT002", # boolean default arg
"PLR0912", # too many branches
"PLR0915", # too many statements
"SIM108", # ternary: readability preference
]
"src/fuzzforge_cli/application.py" = [
"B008", # function call in default: Path.cwd()
"PLC0415", # import outside top-level: lazy loading
]
"src/fuzzforge_cli/commands/projects.py" = [
"TC003", # TYPE_CHECKING: runtime type needs
]
"src/fuzzforge_cli/context.py" = [
"TC002", # TYPE_CHECKING: runtime type needs
"TC003", # TYPE_CHECKING: runtime type needs
]

View File

@@ -3,12 +3,12 @@
from pathlib import Path
from typing import Annotated
from fuzzforge_mcp.storage import LocalStorage # type: ignore[import-untyped]
from typer import Context as TyperContext
from typer import Option, Typer
from fuzzforge_cli.commands import mcp, projects
from fuzzforge_cli.context import Context
from fuzzforge_mcp.storage import LocalStorage
application: Typer = Typer(
name="fuzzforge",

View File

@@ -12,7 +12,7 @@ import os
import sys
from enum import StrEnum
from pathlib import Path
from typing import Annotated
from typing import Annotated, Any
from rich.console import Console
from rich.panel import Panel
@@ -44,10 +44,10 @@ def _get_copilot_mcp_path() -> Path:
"""
if sys.platform == "darwin":
return Path.home() / "Library" / "Application Support" / "Code" / "User" / "mcp.json"
elif sys.platform == "win32":
if sys.platform == "win32":
return Path(os.environ.get("APPDATA", "")) / "Code" / "User" / "mcp.json"
else: # Linux
return Path.home() / ".config" / "Code" / "User" / "mcp.json"
# Linux
return Path.home() / ".config" / "Code" / "User" / "mcp.json"
def _get_claude_desktop_mcp_path() -> Path:
@@ -58,10 +58,10 @@ def _get_claude_desktop_mcp_path() -> Path:
"""
if sys.platform == "darwin":
return Path.home() / "Library" / "Application Support" / "Claude" / "claude_desktop_config.json"
elif sys.platform == "win32":
if sys.platform == "win32":
return Path(os.environ.get("APPDATA", "")) / "Claude" / "claude_desktop_config.json"
else: # Linux
return Path.home() / ".config" / "Claude" / "claude_desktop_config.json"
# Linux
return Path.home() / ".config" / "Claude" / "claude_desktop_config.json"
def _get_claude_code_mcp_path(project_path: Path | None = None) -> Path:
@@ -114,13 +114,13 @@ def _detect_docker_socket() -> str:
:returns: Path to the Docker socket.
"""
socket_paths = [
"/var/run/docker.sock",
socket_paths: list[Path] = [
Path("/var/run/docker.sock"),
Path.home() / ".docker" / "run" / "docker.sock",
]
for path in socket_paths:
if Path(path).exists():
if path.exists():
return str(path)
return "/var/run/docker.sock"
@@ -132,15 +132,22 @@ def _find_fuzzforge_root() -> Path:
:returns: Path to fuzzforge-oss directory.
"""
# Try to find from current file location
current = Path(__file__).resolve()
# Check environment variable override first
env_root = os.environ.get("FUZZFORGE_ROOT")
if env_root:
return Path(env_root).resolve()
# Walk up to find fuzzforge-oss root
# Walk up from cwd to find a fuzzforge root (hub-config.json is the marker)
for parent in [Path.cwd(), *Path.cwd().parents]:
if (parent / "hub-config.json").is_file():
return parent
# Fall back to __file__-based search (dev install inside fuzzforge-oss)
current = Path(__file__).resolve()
for parent in current.parents:
if (parent / "fuzzforge-mcp").is_dir():
return parent
# Fall back to cwd
return Path.cwd()
@@ -148,7 +155,7 @@ def _generate_mcp_config(
fuzzforge_root: Path,
engine_type: str,
engine_socket: str,
) -> dict:
) -> dict[str, Any]:
"""Generate MCP server configuration.
:param fuzzforge_root: Path to fuzzforge-oss installation.
@@ -167,9 +174,12 @@ def _generate_mcp_config(
command = "uv"
args = ["--directory", str(fuzzforge_root), "run", "fuzzforge-mcp"]
# Self-contained storage paths for FuzzForge containers
# This isolates FuzzForge from system Podman and avoids snap issues
fuzzforge_home = Path.cwd() / ".fuzzforge"
# User-global storage paths for FuzzForge containers.
# Kept under ~/.fuzzforge so images are built once and shared across
# all workspaces — regardless of where `fuzzforge mcp install` is run.
# Override with FUZZFORGE_USER_DIR for isolated testing.
user_dir_env = os.environ.get("FUZZFORGE_USER_DIR")
fuzzforge_home = Path(user_dir_env).resolve() if user_dir_env else Path.home() / ".fuzzforge"
graphroot = fuzzforge_home / "containers" / "storage"
runroot = fuzzforge_home / "containers" / "run"

View File

@@ -5,7 +5,7 @@ from __future__ import annotations
from pathlib import Path
from typing import TYPE_CHECKING, cast
from fuzzforge_mcp.storage import LocalStorage
from fuzzforge_mcp.storage import LocalStorage # type: ignore[import-untyped]
if TYPE_CHECKING:
from typer import Context as TyperContext

View File

@@ -9,12 +9,16 @@ hub management capabilities.
from __future__ import annotations
from collections import defaultdict
from pathlib import Path
from typing import TYPE_CHECKING, Any
from rich.text import Text
from textual import events, work
from textual.app import App, ComposeResult
from textual.binding import Binding
from textual.containers import Horizontal, Vertical, VerticalScroll
from textual.widgets import Button, DataTable, Footer, Header, Label
from textual.message import Message
from textual.widgets import Button, DataTable, Footer, Header
from fuzzforge_cli.tui.helpers import (
check_agent_status,
@@ -24,11 +28,49 @@ from fuzzforge_cli.tui.helpers import (
load_hub_config,
)
if TYPE_CHECKING:
from fuzzforge_cli.commands.mcp import AIAgent
# Agent config entries stored alongside their linked status for row mapping
_AgentRow = tuple[str, "AIAgent", "Path", str, bool] # noqa: F821
_AgentRow = tuple[str, "AIAgent", Path, str, bool]
class FuzzForgeApp(App):
class SingleClickDataTable(DataTable[Any]):
"""DataTable subclass that also fires ``RowClicked`` on a single mouse click.
Textual's built-in ``RowSelected`` only fires on Enter or on a second click
of an already-highlighted row. ``RowClicked`` fires on every first click,
enabling single-click-to-act UX without requiring Enter.
"""
class RowClicked(Message):
"""Fired on every single mouse click on a data row."""
def __init__(self, data_table: SingleClickDataTable, cursor_row: int) -> None:
self.data_table = data_table
self.cursor_row = cursor_row
super().__init__()
@property
def control(self) -> SingleClickDataTable:
"""Return the data table that fired this event."""
return self.data_table
async def _on_click(self, event: events.Click) -> None:
"""Forward to parent, then post RowClicked on every mouse click.
The hub table is handled exclusively via RowClicked. RowSelected is
intentionally NOT used for the hub table to avoid double-dispatch.
"""
await super()._on_click(event)
meta = event.style.meta
if meta and "row" in meta and self.cursor_type == "row":
row_index: int = int(meta["row"])
if row_index >= 0:
self.post_message(SingleClickDataTable.RowClicked(self, row_index))
class FuzzForgeApp(App[None]):
"""FuzzForge AI terminal user interface."""
TITLE = "FuzzForge AI"
@@ -91,7 +133,8 @@ class FuzzForgeApp(App):
/* Modal screens */
AgentSetupScreen, AgentUnlinkScreen,
HubManagerScreen, LinkHubScreen, CloneHubScreen {
HubManagerScreen, LinkHubScreen, CloneHubScreen,
BuildImageScreen, BuildLogScreen {
align: center middle;
}
@@ -125,6 +168,35 @@ class FuzzForgeApp(App):
overflow-y: auto;
}
#build-dialog {
width: 72;
height: auto;
max-height: 80%;
border: thick #4699fc;
background: $surface;
padding: 2 3;
}
#confirm-text {
margin: 1 0 2 0;
}
#build-log {
height: 30;
border: round $panel;
margin: 1 0;
}
#build-subtitle {
color: $text-muted;
margin-bottom: 1;
}
#build-status {
height: 1;
margin-top: 1;
}
.dialog-title {
text-style: bold;
text-align: center;
@@ -163,6 +235,7 @@ class FuzzForgeApp(App):
Binding("q", "quit", "Quit"),
Binding("h", "manage_hubs", "Hub Manager"),
Binding("r", "refresh", "Refresh"),
Binding("enter", "select_row", "Select", show=False),
]
def compose(self) -> ComposeResult:
@@ -170,7 +243,7 @@ class FuzzForgeApp(App):
yield Header()
with VerticalScroll(id="main"):
with Vertical(id="hub-panel", classes="panel"):
yield DataTable(id="hub-table")
yield SingleClickDataTable(id="hub-table")
with Horizontal(id="hub-title-bar"):
yield Button(
"Hub Manager (h)",
@@ -189,7 +262,12 @@ class FuzzForgeApp(App):
def on_mount(self) -> None:
"""Populate tables on startup."""
self._agent_rows: list[_AgentRow] = []
self.query_one("#hub-panel").border_title = "Hub Servers"
self._hub_rows: list[tuple[str, str, str, bool] | None] = []
# Background build tracking
self._active_builds: dict[str, object] = {} # image -> Popen
self._build_logs: dict[str, list[str]] = {} # image -> log lines
self._build_results: dict[str, bool] = {} # image -> success
self.query_one("#hub-panel").border_title = "Hub Servers [dim](click ✗ Not built to build)[/dim]"
self.query_one("#agents-panel").border_title = "AI Agents"
self._refresh_agents()
self._refresh_hub()
@@ -215,9 +293,11 @@ class FuzzForgeApp(App):
def _refresh_hub(self) -> None:
"""Refresh the hub servers table, grouped by source hub."""
table = self.query_one("#hub-table", DataTable)
self._hub_rows = []
table = self.query_one("#hub-table", SingleClickDataTable)
table.clear(columns=True)
table.add_columns("Server", "Image", "Hub", "Status")
table.cursor_type = "row"
try:
fuzzforge_root = find_fuzzforge_root()
@@ -236,7 +316,7 @@ class FuzzForgeApp(App):
return
# Group servers by source hub
groups: dict[str, list[dict]] = defaultdict(list)
groups: dict[str, list[dict[str, Any]]] = defaultdict(list)
for server in servers:
source = server.get("source_hub", "manual")
groups[source].append(server)
@@ -245,7 +325,7 @@ class FuzzForgeApp(App):
ready_count = 0
total = len(hub_servers)
statuses: list[tuple[dict, bool, str]] = []
statuses: list[tuple[dict[str, Any], bool, str]] = []
for server in hub_servers:
enabled = server.get("enabled", True)
if not enabled:
@@ -270,6 +350,7 @@ class FuzzForgeApp(App):
style="bold",
)
table.add_row(header, "", "", "")
self._hub_rows.append(None) # group header — not selectable
# Tool rows
for server, is_ready, status_text in statuses:
@@ -277,12 +358,14 @@ class FuzzForgeApp(App):
image = server.get("image", "unknown")
enabled = server.get("enabled", True)
if not enabled:
if image in getattr(self, "_active_builds", {}):
status_cell = Text("⏳ Building…", style="yellow")
elif not enabled:
status_cell = Text("Disabled", style="dim")
elif is_ready:
status_cell = Text("✓ Ready", style="green")
else:
status_cell = Text(f"{status_text}", style="red")
status_cell = Text(f"{status_text}", style="red dim")
table.add_row(
f" {name}",
@@ -290,13 +373,27 @@ class FuzzForgeApp(App):
hub_name,
status_cell,
)
self._hub_rows.append((name, image, hub_name, is_ready))
def on_data_table_row_selected(self, event: DataTable.RowSelected) -> None:
"""Handle row selection on the agents table."""
if event.data_table.id != "agents-table":
return
"""Handle Enter-key row selection (agents table only).
idx = event.cursor_row
Hub table uses RowClicked exclusively — wiring it to RowSelected too
would cause a double push on every click since Textual 8 fires
RowSelected on ALL clicks, not just second-click-on-same-row.
"""
if event.data_table.id == "agents-table":
self._handle_agent_row(event.cursor_row)
def on_single_click_data_table_row_clicked(
self, event: SingleClickDataTable.RowClicked
) -> None:
"""Handle single mouse-click on a hub table row."""
if event.data_table.id == "hub-table":
self._handle_hub_row(event.cursor_row)
def _handle_agent_row(self, idx: int) -> None:
"""Open agent setup/unlink for the selected agent row."""
if idx < 0 or idx >= len(self._agent_rows):
return
@@ -317,6 +414,111 @@ class FuzzForgeApp(App):
callback=self._on_agent_changed,
)
def _handle_hub_row(self, idx: int) -> None:
    """Dispatch a click on a hub table row.

    Depending on row state this opens the live build-log viewer, shows an
    informational toast, or pushes the build-confirmation dialog.

    :param idx: Cursor row index into ``self._hub_rows``.
    """
    # Double-click protection: only one build dialog may be open at a time.
    if getattr(self, "_build_dialog_open", False):
        return
    if not (0 <= idx < len(self._hub_rows)):
        return
    row = self._hub_rows[idx]
    if row is None:
        # Group header rows carry no server data.
        return
    server_name, image, hub_name, is_ready = row

    if image in self._active_builds:
        # Build in progress — pop the live log viewer instead of a new dialog.
        from fuzzforge_cli.tui.screens.build_log import BuildLogScreen

        self._build_dialog_open = True
        self.push_screen(
            BuildLogScreen(image),
            callback=lambda _: setattr(self, "_build_dialog_open", False),
        )
        return

    if is_ready:
        self.notify(f"{image} is already built ✓", severity="information")
        return
    if hub_name == "manual":
        self.notify("Manual servers must be built outside FuzzForge")
        return

    from fuzzforge_cli.tui.screens.build_image import BuildImageScreen

    self._build_dialog_open = True

    def _dialog_closed(result: bool | None) -> None:
        # Clear the guard first, then act on the user's choice (if any).
        self._build_dialog_open = False
        if result is not None:
            self._on_build_confirmed(result, server_name, image, hub_name)

    self.push_screen(
        BuildImageScreen(server_name, image, hub_name),
        callback=_dialog_closed,
    )
def _on_build_confirmed(self, confirmed: bool, server_name: str, image: str, hub_name: str) -> None:
    """Kick off a background build once the confirm dialog closes.

    :param confirmed: True when the user pressed Build; False cancels.
    :param server_name: Hub tool name used to locate the Dockerfile.
    :param image: Image tag used as the key for all tracking state.
    :param hub_name: Hub the server belongs to.
    """
    if not confirmed:
        return
    # Reset per-image tracking, then mark the build as pending so the
    # table shows the Building indicator before the worker even starts.
    self._build_results.pop(image, None)
    self._build_logs[image] = []
    self._active_builds[image] = True
    self._refresh_hub()
    self._run_build(server_name, image, hub_name)
@work(thread=True)
def _run_build(self, server_name: str, image: str, hub_name: str) -> None:
    """Build a Docker/Podman image in a background worker thread.

    Appends build output to ``self._build_logs[image]`` (polled by the
    log-viewer screen) and reports completion back on the main thread
    via ``call_from_thread``.

    :param server_name: Hub tool name (used to locate the Dockerfile).
    :param image: Image tag being built — key for all tracking dicts.
    :param hub_name: Hub the server belongs to.
    """
    from fuzzforge_cli.tui.helpers import build_image, find_dockerfile_for_server

    logs = self._build_logs.setdefault(image, [])

    dockerfile = find_dockerfile_for_server(server_name, hub_name)
    if dockerfile is None:
        logs.append(f"ERROR: Dockerfile not found for '{server_name}' in hub '{hub_name}'")
        self._build_results[image] = False
        self._active_builds.pop(image, None)
        self.call_from_thread(self._on_build_done, image, success=False)
        return

    logs.append(f"Building {image} from {dockerfile.parent}")
    logs.append("")

    try:
        proc = build_image(image, dockerfile)
    except FileNotFoundError as exc:
        # Container engine binary (docker/podman) is not installed.
        logs.append(f"ERROR: {exc}")
        self._build_results[image] = False
        self._active_builds.pop(image, None)
        self.call_from_thread(self._on_build_done, image, success=False)
        return

    self._active_builds[image] = proc  # replace pending marker with actual process
    self.call_from_thread(self._refresh_hub)  # show ⏳ in table

    # FIX: previously a bare `return` when proc.stdout was None left the
    # image stuck in _active_builds (row showed "Building…" forever) and
    # never fired the completion callback. Always wait and report instead.
    if proc.stdout is not None:
        for line in proc.stdout:
            logs.append(line.rstrip())
    proc.wait()

    self._active_builds.pop(image, None)
    success = proc.returncode == 0
    self._build_results[image] = success
    self.call_from_thread(self._on_build_done, image, success=success)
def _on_build_done(self, image: str, *, success: bool) -> None:
    """Main-thread completion handler for a background build.

    Refreshes the hub table (clears the Building indicator) and raises a
    toast describing the outcome.

    :param image: Image tag whose build just finished.
    :param success: True when the build subprocess exited cleanly.
    """
    self._refresh_hub()
    message, severity = (
        (f"{image} built successfully", "information")
        if success
        else (f"{image} build failed — click row for log", "error")
    )
    self.notify(message, severity=severity)
def on_button_pressed(self, event: Button.Pressed) -> None:
"""Handle button presses."""
if event.button.id == "btn-hub-manager":

View File

@@ -8,7 +8,9 @@ and managing linked MCP hub repositories.
from __future__ import annotations
import contextlib
import json
import os
import subprocess
from pathlib import Path
from typing import Any
@@ -30,12 +32,32 @@ FUZZFORGE_DEFAULT_HUB_URL = "git@github.com:FuzzingLabs/mcp-security-hub.git"
FUZZFORGE_DEFAULT_HUB_NAME = "mcp-security-hub"
def get_fuzzforge_user_dir() -> Path:
    """Return the user-global ``~/.fuzzforge/`` directory.

    Holds data shared across every workspace: cloned hub repositories,
    the hub registry, container storage (graphroot/runroot), and the hub
    workspace volume.

    Setting the ``FUZZFORGE_USER_DIR`` environment variable redirects all
    user-global data to a custom path — handy for exercising a fresh
    install without touching the real ``~/.fuzzforge/``.

    :return: ``$FUZZFORGE_USER_DIR`` (resolved) when set and non-empty,
        otherwise ``Path.home() / ".fuzzforge"``.
    """
    override = os.environ.get("FUZZFORGE_USER_DIR")
    return Path(override).resolve() if override else Path.home() / ".fuzzforge"
def get_fuzzforge_dir() -> Path:
"""Return the project-local ``.fuzzforge/`` directory.
Uses the current working directory so that each project gets its
own isolated FuzzForge configuration, hubs, and storage — similar
to how ``.git/`` or ``.venv/`` work.
Stores data that is specific to the current workspace: fuzzing
results and project artifacts. Similar to how ``.git/`` scopes
version-control data to a single project.
:return: ``Path.cwd() / ".fuzzforge"``
@@ -99,16 +121,21 @@ def check_agent_status(config_path: Path, servers_key: str) -> tuple[bool, str]:
def check_hub_image(image: str) -> tuple[bool, str]:
"""Check whether a Docker image exists locally.
"""Check whether a container image exists locally.
:param image: Docker image name (e.g. "semgrep-mcp:latest").
Respects the ``FUZZFORGE_ENGINE__TYPE`` environment variable so that
Podman users see the correct build status instead of always "Not built".
:param image: Image name (e.g. "semgrep-mcp:latest").
:return: Tuple of (is_ready, status_description).
"""
engine = os.environ.get("FUZZFORGE_ENGINE__TYPE", "docker").lower()
cmd = "podman" if engine == "podman" else "docker"
try:
result = subprocess.run(
["docker", "image", "inspect", image],
capture_output=True,
[cmd, "image", "inspect", image],
check=False, capture_output=True,
text=True,
timeout=5,
)
@@ -118,7 +145,7 @@ def check_hub_image(image: str) -> tuple[bool, str]:
except subprocess.TimeoutExpired:
return False, "Timeout"
except FileNotFoundError:
return False, "Docker not found"
return False, f"{cmd} not found"
def load_hub_config(fuzzforge_root: Path) -> dict[str, Any]:
@@ -132,7 +159,8 @@ def load_hub_config(fuzzforge_root: Path) -> dict[str, Any]:
if not config_path.exists():
return {}
try:
return json.loads(config_path.read_text())
data: dict[str, Any] = json.loads(config_path.read_text())
return data
except json.JSONDecodeError:
return {}
@@ -237,37 +265,103 @@ def uninstall_agent_config(agent: AIAgent) -> str:
def get_hubs_registry_path() -> Path:
"""Return path to the hubs registry file (``.fuzzforge/hubs.json``).
"""Return path to the hubs registry file (``~/.fuzzforge/hubs.json``).
Stored in the user-global directory so the registry is shared across
all workspaces.
:return: Path to the registry JSON file.
"""
return get_fuzzforge_dir() / "hubs.json"
return get_fuzzforge_user_dir() / "hubs.json"
def get_default_hubs_dir() -> Path:
"""Return default directory for cloned hubs (``.fuzzforge/hubs/``).
"""Return default directory for cloned hubs (``~/.fuzzforge/hubs/``).
Stored in the user-global directory so hubs are cloned once and
reused in every workspace.
:return: Path to the default hubs directory.
"""
return get_fuzzforge_dir() / "hubs"
return get_fuzzforge_user_dir() / "hubs"
def _discover_hub_dirs() -> list[Path]:
    """Scan known hub locations for cloned hub repositories.

    Looks in both the user-global ``~/.fuzzforge/hubs/`` directory and the
    legacy workspace-local ``<cwd>/.fuzzforge/hubs/`` directory, so hubs
    cloned before the global-dir migration are still discovered.

    :return: Paths of direct children that contain a ``.git`` sub-directory.
    """
    found: list[Path] = []
    for base in (get_fuzzforge_user_dir() / "hubs", get_fuzzforge_dir() / "hubs"):
        if not base.is_dir():
            continue
        for entry in base.iterdir():
            # Only count actual git clones, not stray files or folders.
            if entry.is_dir() and (entry / ".git").is_dir():
                found.append(entry)
    return found
def load_hubs_registry() -> dict[str, Any]:
    """Load the hubs registry from disk, rebuilding it if missing.

    If the registry file does not exist (or is unreadable), auto-recovers
    by scanning known hub directories and rebuilding entries for any
    discovered hubs. This handles the migration from the old
    workspace-local ``<cwd>/.fuzzforge/hubs.json`` path to the global
    ``~/.fuzzforge/hubs.json`` path, as well as any case where the
    registry was lost.

    :return: Registry dict with a ``hubs`` key holding a list of entries.
    """
    path = get_hubs_registry_path()
    # NOTE: do NOT early-return on a missing file here — the recovery
    # scan below must run in that case.
    if path.exists():
        try:
            data: dict[str, Any] = json.loads(path.read_text())
            return data
        except (json.JSONDecodeError, OSError):
            pass

    # Registry missing or unreadable — rebuild from discovered hub dirs.
    discovered = _discover_hub_dirs()
    if not discovered:
        return {"hubs": []}

    hubs: list[dict[str, Any]] = []
    for hub_dir in discovered:
        name = hub_dir.name
        # Best-effort: recover the git remote URL for the entry.
        git_url: str = ""
        try:
            # Module-level `subprocess` import is sufficient; no local alias needed.
            r = subprocess.run(
                ["git", "-C", str(hub_dir), "remote", "get-url", "origin"],
                check=False, capture_output=True, text=True, timeout=5,
            )
            if r.returncode == 0:
                git_url = r.stdout.strip()
        except Exception:  # noqa: S110 - git URL is optional, failure is acceptable
            pass
        hubs.append({
            "name": name,
            "path": str(hub_dir),
            "git_url": git_url,
            "is_default": name == FUZZFORGE_DEFAULT_HUB_NAME,
        })

    registry: dict[str, Any] = {"hubs": hubs}
    # Persist so we don't re-scan on every load.
    with contextlib.suppress(OSError):
        save_hubs_registry(registry)
    return registry
def save_hubs_registry(registry: dict[str, Any]) -> None:
"""Save the hubs registry to disk.
@@ -320,7 +414,7 @@ def scan_hub_for_servers(hub_path: Path) -> list[dict[str, Any]]:
"image": f"{tool_name}:latest",
"category": category,
"capabilities": capabilities,
"volumes": [f"{get_fuzzforge_dir()}/hub/workspace:/data"],
"volumes": [f"{get_fuzzforge_user_dir()}/hub/workspace:/data"],
"enabled": True,
}
)
@@ -422,8 +516,7 @@ def clone_hub(
"""
if name is None:
name = git_url.rstrip("/").split("/")[-1]
if name.endswith(".git"):
name = name[:-4]
name = name.removesuffix(".git")
if dest is None:
dest = get_default_hubs_dir() / name
@@ -433,7 +526,7 @@ def clone_hub(
try:
result = subprocess.run(
["git", "-C", str(dest), "pull"],
capture_output=True,
check=False, capture_output=True,
text=True,
timeout=120,
)
@@ -451,7 +544,7 @@ def clone_hub(
try:
result = subprocess.run(
["git", "clone", git_url, str(dest)],
capture_output=True,
check=False, capture_output=True,
text=True,
timeout=300,
)
@@ -533,3 +626,62 @@ def _remove_hub_servers_from_config(hub_name: str) -> int:
config_path.write_text(json.dumps(config, indent=2))
return before - after
def find_dockerfile_for_server(server_name: str, hub_name: str) -> Path | None:
    """Locate the Dockerfile for a hub server tool.

    Resolves the hub's path from the registry, then searches it for a
    Dockerfile laid out as ``<category>/<server_name>/Dockerfile``.

    :param server_name: Tool name (e.g. ``"nmap-mcp"``).
    :param hub_name: Hub name as stored in the registry.
    :return: Absolute path to the Dockerfile, or ``None`` if not found.
    """
    registry = load_hubs_registry()
    hub_entry: dict[str, Any] | None = None
    for entry in registry.get("hubs", []):
        if entry.get("name") == hub_name:
            hub_entry = entry
            break
    if hub_entry is None:
        return None

    hub_path = Path(hub_entry["path"])
    for dockerfile in hub_path.rglob("Dockerfile"):
        parts = dockerfile.relative_to(hub_path).parts
        # Expect exactly category/<server_name>/Dockerfile (depth 3).
        if len(parts) == 3 and parts[1] == server_name:
            return dockerfile
    return None
def build_image(
    image: str,
    dockerfile: Path,
    *,
    engine: str | None = None,
) -> subprocess.Popen[str]:
    """Launch a non-blocking ``docker``/``podman build`` subprocess.

    The caller receives the live :class:`subprocess.Popen` object and is
    responsible for streaming its output and waiting for completion.

    :param image: Image tag (e.g. ``"nmap-mcp:latest"``).
    :param dockerfile: Path to the ``Dockerfile``; its parent directory
        serves as the build context.
    :param engine: ``"docker"`` or ``"podman"``; when ``None`` the
        ``FUZZFORGE_ENGINE__TYPE`` environment variable decides
        (defaulting to docker).
    :return: Running build subprocess with stderr merged into stdout.
    """
    if engine is None:
        engine = os.environ.get("FUZZFORGE_ENGINE__TYPE", "docker").lower()
    # Anything other than an explicit "podman" falls back to docker.
    binary = "podman" if engine == "podman" else "docker"
    return subprocess.Popen(
        [binary, "build", "-t", image, str(dockerfile.parent)],
        stdout=subprocess.PIPE,
        stderr=subprocess.STDOUT,
        text=True,
    )

View File

@@ -0,0 +1,58 @@
"""Build-image confirm dialog for FuzzForge TUI.
Simple modal that asks the user to confirm before starting a background
build. The actual build is managed by the app so the user is never
locked on this screen.
"""
from __future__ import annotations
from textual.app import ComposeResult
from textual.containers import Horizontal, Vertical
from textual.screen import ModalScreen
from textual.widgets import Button, Label
class _NoFocusButton(Button):
    # Mouse-click-only button: disabling focus keeps keyboard navigation
    # (Tab/Enter) from landing on the dialog buttons.
    can_focus = False
class BuildImageScreen(ModalScreen[bool]):
    """Confirmation modal shown before starting a background build.

    Dismisses with ``True`` (start the build) or ``False`` (cancelled);
    the app itself runs the build, so this screen never blocks the user.
    """

    BINDINGS = [("escape", "cancel", "Cancel")]

    def __init__(self, server_name: str, image: str, hub_name: str) -> None:
        super().__init__()
        self._server_name = server_name
        self._image = image
        self._hub_name = hub_name

    def compose(self) -> ComposeResult:
        """Lay out the confirmation dialog."""
        with Vertical(id="build-dialog"):
            yield Label(f"Build {self._image}", classes="dialog-title")
            yield Label(
                f"Hub: {self._hub_name} • Tool: {self._server_name}",
                id="build-subtitle",
            )
            yield Label(
                "The image will be built in the background.\n"
                "You'll receive a notification when it's done.",
                id="confirm-text",
            )
            with Horizontal(classes="dialog-buttons"):
                yield _NoFocusButton("Build", variant="primary", id="btn-build")
                yield _NoFocusButton("Cancel", variant="default", id="btn-cancel")

    def on_button_pressed(self, event: Button.Pressed) -> None:
        """Dismiss with True for Build, False for Cancel; ignore others."""
        outcome = {"btn-build": True, "btn-cancel": False}.get(event.button.id)
        if outcome is not None:
            self.dismiss(result=outcome)

    def action_cancel(self) -> None:
        """Escape key — equivalent to pressing Cancel."""
        self.dismiss(result=False)

View File

@@ -0,0 +1,80 @@
"""Build-log viewer screen for FuzzForge TUI.
Shows live output of a background build started by the app. Polls the
app's ``_build_logs`` buffer every 500 ms so the user can pop this screen
open at any time while the build is running and see up-to-date output.
"""
from __future__ import annotations
from typing import Any
from textual.app import ComposeResult
from textual.containers import Horizontal, Vertical
from textual.screen import ModalScreen
from textual.widgets import Button, Label, Log
class _NoFocusButton(Button):
    # Mouse-click-only button: disabling focus keeps keyboard navigation
    # (Tab/Enter) from landing on the dialog buttons.
    can_focus = False
class BuildLogScreen(ModalScreen[None]):
    """Live log viewer for a background build managed by the app.

    Every 500 ms it copies any new lines from the app's ``_build_logs``
    buffer into the Log widget, so the screen can be opened (and closed)
    at any point during a build without losing output.
    """

    BINDINGS = [("escape", "close", "Close")]

    def __init__(self, image: str) -> None:
        super().__init__()
        self._image = image
        self._last_line: int = 0  # count of buffered lines already rendered

    def compose(self) -> ComposeResult:
        """Lay out the log viewer dialog."""
        with Vertical(id="build-dialog"):
            yield Label(f"Build log — {self._image}", classes="dialog-title")
            yield Label("", id="build-status")
            yield Log(id="build-log", auto_scroll=True)
            with Horizontal(classes="dialog-buttons"):
                yield _NoFocusButton("Close", variant="default", id="btn-close")

    def on_mount(self) -> None:
        """Render buffered output immediately, then start polling."""
        self._flush_log()
        self.set_interval(0.5, self._poll_log)

    def _flush_log(self) -> None:
        """Append unseen buffer lines and refresh the status label."""
        buffered: list[str] = getattr(self.app, "_build_logs", {}).get(self._image, [])
        widget = self.query_one("#build-log", Log)
        pending = buffered[self._last_line :]
        for entry in pending:
            widget.write_line(entry)
        self._last_line += len(pending)

        status = self.query_one("#build-status", Label)
        active: dict[str, Any] = getattr(self.app, "_active_builds", {})
        if self._image in active:
            status.update("[yellow]⏳ Building…[/yellow]")
            return
        # Build finished — show the outcome if the app recorded one.
        results: dict[str, Any] = getattr(self.app, "_build_results", {})
        if self._image not in results:
            return
        if results[self._image]:
            status.update(f"[green]✓ {self._image} built successfully[/green]")
        else:
            status.update(f"[red]✗ {self._image} build failed[/red]")

    def _poll_log(self) -> None:
        """Timer callback: flush any newly-buffered log lines."""
        self._flush_log()

    def on_button_pressed(self, event: Button.Pressed) -> None:
        """Close button dismisses the screen."""
        if event.button.id == "btn-close":
            self.dismiss(None)

    def action_close(self) -> None:
        """Escape key dismisses the screen."""
        self.dismiss(None)

View File

@@ -81,6 +81,7 @@ class HubManagerScreen(ModalScreen[str | None]):
is_default = hub.get("is_default", False)
hub_path = Path(path)
count: str | Text
if hub_path.is_dir():
servers = scan_hub_for_servers(hub_path)
count = str(len(servers))
@@ -88,10 +89,11 @@ class HubManagerScreen(ModalScreen[str | None]):
count = Text("dir missing", style="yellow")
source = git_url or "local"
name_cell: str | Text
if is_default:
name_cell = Text(f"{name}", style="bold")
else:
name_cell = Text(name)
name_cell = name
table.add_row(name_cell, path, count, source)

View File

@@ -18,3 +18,32 @@ ignore = [
"PLR2004", # allowing comparisons using unamed numerical constants in tests
"S101", # allowing 'assert' statements in tests
]
"src/**" = [
"ANN201", # missing return type: legacy code
"ARG002", # unused argument: callback pattern
"ASYNC109", # async with timeout param: intentional pattern
"BLE001", # blind exception: broad error handling needed
"C901", # complexity: legacy code
"EM102", # f-string in exception: existing pattern
"F401", # unused import: re-export pattern
"FBT001", # boolean positional arg
"FBT002", # boolean default arg
"FIX002", # TODO comments: documented tech debt
"N806", # variable naming: intentional constants
"PERF401", # list comprehension: readability over perf
"PLW0603", # global statement: intentional for shared state
"PTH111", # os.path usage: legacy code
"RUF005", # collection literal: legacy style
"S110", # try-except-pass: intentional suppression
"S603", # subprocess: validated inputs
"SIM108", # ternary: readability preference
"TC001", # TYPE_CHECKING: causes circular imports
"TC003", # TYPE_CHECKING: causes circular imports
"TRY003", # message in exception: existing pattern
"TRY300", # try-else: existing pattern
"TRY400", # logging.error vs exception: existing pattern
"UP017", # datetime.UTC: Python 3.11+ only
"UP041", # TimeoutError alias: compatibility
"UP043", # unnecessary type args: compatibility
"W293", # blank line whitespace: formatting
]

View File

@@ -176,6 +176,7 @@ class HubClient:
arguments: dict[str, Any],
*,
timeout: int | None = None,
extra_volumes: list[str] | None = None,
) -> dict[str, Any]:
"""Execute a tool on a hub server.
@@ -183,6 +184,7 @@ class HubClient:
:param tool_name: Name of the tool to execute.
:param arguments: Tool arguments.
:param timeout: Execution timeout (uses default if None).
:param extra_volumes: Additional Docker volume mounts to inject.
:returns: Tool execution result.
:raises HubClientError: If execution fails.
@@ -199,7 +201,7 @@ class HubClient:
)
try:
async with self._connect(config) as (reader, writer):
async with self._connect(config, extra_volumes=extra_volumes) as (reader, writer):
# Initialise MCP session (skip for persistent — already done)
if not self._persistent_sessions.get(config.name):
await self._initialize_session(reader, writer, config.name)
@@ -248,6 +250,7 @@ class HubClient:
async def _connect(
self,
config: HubServerConfig,
extra_volumes: list[str] | None = None,
) -> AsyncGenerator[tuple[asyncio.StreamReader, asyncio.StreamWriter], None]:
"""Connect to an MCP server.
@@ -256,6 +259,7 @@ class HubClient:
ephemeral per-call connection logic.
:param config: Server configuration.
:param extra_volumes: Additional Docker volume mounts to inject.
:yields: Tuple of (reader, writer) for communication.
"""
@@ -268,7 +272,7 @@ class HubClient:
# Ephemeral connection (original behaviour)
if config.type == HubServerType.DOCKER:
async with self._connect_docker(config) as streams:
async with self._connect_docker(config, extra_volumes=extra_volumes) as streams:
yield streams
elif config.type == HubServerType.COMMAND:
async with self._connect_command(config) as streams:
@@ -284,10 +288,12 @@ class HubClient:
async def _connect_docker(
self,
config: HubServerConfig,
extra_volumes: list[str] | None = None,
) -> AsyncGenerator[tuple[asyncio.StreamReader, asyncio.StreamWriter], None]:
"""Connect to a Docker-based MCP server.
:param config: Server configuration with image name.
:param extra_volumes: Additional volume mounts to inject (e.g. project assets).
:yields: Tuple of (reader, writer) for stdio communication.
"""
@@ -302,10 +308,14 @@ class HubClient:
for cap in config.capabilities:
cmd.extend(["--cap-add", cap])
# Add volumes
# Add volumes from server config
for volume in config.volumes:
cmd.extend(["-v", os.path.expanduser(volume)])
# Add extra volumes (e.g. project assets injected at runtime)
for volume in (extra_volumes or []):
cmd.extend(["-v", os.path.expanduser(volume)])
# Add environment variables
for key, value in config.environment.items():
cmd.extend(["-e", f"{key}={value}"])
@@ -529,6 +539,7 @@ class HubClient:
async def start_persistent_session(
self,
config: HubServerConfig,
extra_volumes: list[str] | None = None,
) -> PersistentSession:
"""Start a persistent Docker container and initialise MCP session.
@@ -536,6 +547,7 @@ class HubClient:
called, allowing multiple tool calls on the same session.
:param config: Server configuration (must be Docker type).
:param extra_volumes: Additional host:container volume mounts to inject.
:returns: The created persistent session.
:raises HubClientError: If the container cannot be started.
@@ -580,6 +592,9 @@ class HubClient:
for volume in config.volumes:
cmd.extend(["-v", os.path.expanduser(volume)])
for extra_vol in (extra_volumes or []):
cmd.extend(["-v", extra_vol])
for key, value in config.environment.items():
cmd.extend(["-e", f"{key}={value}"])

View File

@@ -180,12 +180,14 @@ class HubExecutor:
arguments: dict[str, Any] | None = None,
*,
timeout: int | None = None,
extra_volumes: list[str] | None = None,
) -> HubExecutionResult:
"""Execute a hub tool.
:param identifier: Tool identifier (hub:server:tool or server:tool).
:param arguments: Tool arguments.
:param timeout: Execution timeout.
:param extra_volumes: Additional Docker volume mounts to inject.
:returns: Execution result.
"""
@@ -232,6 +234,7 @@ class HubExecutor:
tool_name_to_use or tool_name,
arguments,
timeout=timeout,
extra_volumes=extra_volumes,
)
return HubExecutionResult(
success=True,
@@ -268,6 +271,7 @@ class HubExecutor:
tool.name,
arguments,
timeout=timeout,
extra_volumes=extra_volumes,
)
return HubExecutionResult(
success=True,
@@ -341,13 +345,14 @@ class HubExecutor:
# Persistent session management
# ------------------------------------------------------------------
async def start_persistent_server(self, server_name: str) -> dict[str, Any]:
async def start_persistent_server(self, server_name: str, extra_volumes: list[str] | None = None) -> dict[str, Any]:
"""Start a persistent container session for a server.
The container stays running between tool calls, allowing stateful
interactions (e.g., radare2 sessions, long-running fuzzing).
:param server_name: Name of the hub server to start.
:param extra_volumes: Additional host:container volume mounts to inject.
:returns: Session status dictionary.
:raises ValueError: If server not found.
@@ -358,7 +363,7 @@ class HubExecutor:
msg = f"Server '{server_name}' not found"
raise ValueError(msg)
session = await self._client.start_persistent_session(server.config)
session = await self._client.start_persistent_session(server.config, extra_volumes=extra_volumes)
# Auto-discover tools on the new session
try:

View File

@@ -294,3 +294,17 @@ class HubConfig(BaseModel):
default=True,
description="Cache discovered tools",
)
#: Workflow hints indexed by "after:<tool_name>" keys.
#: Loaded inline or merged from workflow_hints_file.
workflow_hints: dict[str, Any] = Field(
default_factory=dict,
description="Workflow hints indexed by 'after:<tool_name>'",
)
#: Optional path to an external workflow-hints.json file.
#: Relative paths are resolved relative to the hub-config.json location.
workflow_hints_file: str | None = Field(
default=None,
description="Path to an external workflow-hints.json to load and merge",
)

View File

@@ -87,6 +87,28 @@ class HubRegistry:
config=server_config,
)
# Load and merge external workflow hints file if specified.
if self._config.workflow_hints_file:
hints_path = Path(self._config.workflow_hints_file)
if not hints_path.is_absolute():
hints_path = config_path.parent / hints_path
if hints_path.exists():
try:
with hints_path.open() as hf:
hints_data = json.load(hf)
self._config.workflow_hints.update(hints_data.get("hints", {}))
logger.info(
"Loaded workflow hints",
path=str(hints_path),
hints=len(self._config.workflow_hints),
)
except Exception as hints_err:
logger.warning(
"Failed to load workflow hints file",
path=str(hints_path),
error=str(hints_err),
)
logger.info(
"Loaded hub configuration",
path=str(config_path),
@@ -218,6 +240,15 @@ class HubRegistry:
server.discovery_error = None
server.tools = tools
def get_workflow_hint(self, tool_name: str) -> dict | None:
"""Get the workflow hint for a tool by name.
:param tool_name: Tool name (e.g. ``binwalk_extract``).
:returns: Hint dict for the ``after:<tool_name>`` key, or None.
"""
return self._config.workflow_hints.get(f"after:{tool_name}") or None
def get_all_tools(self) -> list:
"""Get all discovered tools from all servers.

View File

@@ -4,7 +4,7 @@ from typing import TYPE_CHECKING
from pydantic import BaseModel
from fuzzforge_common.sandboxes.engines.enumeration import (
FuzzForgeSandboxEngines, # noqa: TC001 (required by 'pydantic' at runtime)
FuzzForgeSandboxEngines,
)
if TYPE_CHECKING:

View File

@@ -10,6 +10,7 @@ dependencies = [
"fuzzforge-common==0.0.1",
"pydantic==2.12.4",
"pydantic-settings==2.12.0",
"pyyaml>=6.0",
"structlog==25.5.0",
]

View File

@@ -14,3 +14,18 @@ ignore = [
"PLR2004", # allowing comparisons using unamed numerical constants in tests
"S101", # allowing 'assert' statements in tests
]
"src/**" = [
"ASYNC109", # async with timeout param: intentional pattern
"EM102", # f-string in exception: existing pattern
"PERF401", # list comprehension: readability over perf
"PLR0913", # too many arguments: API compatibility
"PLW0602", # global variable: intentional for shared state
"PLW0603", # global statement: intentional for shared state
"RET504", # unnecessary assignment: readability
"RET505", # unnecessary elif after return: readability
"TC001", # TYPE_CHECKING: causes circular imports
"TC003", # TYPE_CHECKING: causes circular imports
"TRY300", # try-else: existing pattern
"TRY301", # abstract raise: existing pattern
"TRY003", # message in exception: existing pattern
]

View File

@@ -47,15 +47,36 @@ FuzzForge is a security research orchestration platform. Use these tools to:
Typical workflow:
1. Initialize a project with `init_project`
2. Set project assets with `set_project_assets` (optional, only needed once for the source directory)
2. Set project assets with `set_project_assets` — path to the directory containing
target files (firmware images, binaries, source code, etc.)
3. List available hub servers with `list_hub_servers`
4. Discover tools from servers with `discover_hub_tools`
5. Execute hub tools with `execute_hub_tool`
Hub workflow:
1. List available hub servers with `list_hub_servers`
2. Discover tools from servers with `discover_hub_tools`
3. Execute hub tools with `execute_hub_tool`
Skill packs:
Use `list_skills` to see available analysis pipelines (e.g. firmware-analysis).
Load one with `load_skill("firmware-analysis")` to get domain-specific guidance
and a scoped list of relevant hub servers. Skill packs describe the methodology —
follow the pipeline steps while adapting to what you find at each stage.
Agent context convention:
When you call `discover_hub_tools`, some servers return an `agent_context` field
with usage tips, known issues, rule templates, and workflow guidance. Always read
this context before using the server's tools.
Artifact tracking:
After each `execute_hub_tool` call, new output files are automatically tracked.
Use `list_artifacts` to find files produced by previous tools instead of parsing
paths from tool output text. Filter by source server or file type.
File access in containers:
- Assets set via `set_project_assets` are mounted read-only at `/app/uploads/` and `/app/samples/`
- A writable output directory is mounted at `/app/output/` — use it for extraction results, reports, etc.
- Always use container paths (e.g. `/app/uploads/file`) when passing file arguments to hub tools
Stateful tools:
- Some tools require multi-step sessions. Use `start_hub_server` to launch
a persistent container, then `execute_hub_tool` calls reuse that container. Stop with `stop_hub_server`.
""",
lifespan=lifespan,
)

View File

@@ -3,7 +3,7 @@
from __future__ import annotations
from pathlib import Path
from typing import TYPE_CHECKING, cast
from typing import TYPE_CHECKING, Any, cast
from fastmcp.server.dependencies import get_context
@@ -21,6 +21,9 @@ _current_project_path: Path | None = None
# Singleton storage instance
_storage: LocalStorage | None = None
# Currently loaded skill pack (set by load_skill)
_active_skill: dict[str, Any] | None = None
def set_current_project_path(project_path: Path) -> None:
"""Set the current project path.
@@ -75,3 +78,22 @@ def get_storage() -> LocalStorage:
settings = get_settings()
_storage = LocalStorage(settings.storage.path)
return _storage
def set_active_skill(skill: dict[str, Any] | None) -> None:
"""Set (or clear) the currently loaded skill pack.
:param skill: Parsed skill dict, or None to unload.
"""
global _active_skill
_active_skill = skill
def get_active_skill() -> dict[str, Any] | None:
    """Return the currently loaded skill pack, if any.

    :return: Active skill dict, or None if no skill is loaded.
    """
    # Reads the module-level singleton set by set_active_skill().
    return _active_skill

View File

@@ -10,7 +10,6 @@ from fastmcp.exceptions import ResourceError
from fuzzforge_mcp.dependencies import get_project_path, get_storage
mcp: FastMCP = FastMCP()
@@ -31,10 +30,10 @@ async def list_executions() -> list[dict[str, Any]]:
return [
{
"execution_id": exec_id,
"has_results": storage.get_execution_results(project_path, exec_id) is not None,
"execution_id": entry["execution_id"],
"has_results": storage.get_execution_results(project_path, entry["execution_id"]) is not None,
}
for exec_id in execution_ids
for entry in execution_ids
]
except Exception as exception:

View File

@@ -10,7 +10,6 @@ from fastmcp.exceptions import ResourceError
from fuzzforge_mcp.dependencies import get_project_path, get_settings, get_storage
mcp: FastMCP = FastMCP()

View File

@@ -0,0 +1,44 @@
name: firmware-analysis
description: |
  ## Firmware Binary Vulnerability Analysis

  Goal: Find exploitable vulnerabilities in firmware images.

  ### Pipeline

  1. **Extract the filesystem** from the firmware image.
     Look for SquashFS, JFFS2, CPIO, or other embedded filesystems.
  2. **Scan extracted files for vulnerability patterns.**
     Use vulnerability-focused rules to identify binaries with dangerous
     function calls (system, strcpy, popen, sprintf, gets).
     Prioritize targets by match count — the binary with the most hits
     is the highest-priority target.
  3. **Deep-analyze the highest-priority binary.**
     Open a persistent analysis session. Look for:
     - Dangerous function calls with unsanitized input
     - Hardcoded credentials or backdoor strings
     - Network service listeners with weak input validation
     Focus on confirming whether flagged patterns are actually reachable.
  4. **Search for known CVEs** matching library version strings found
     during analysis. Cross-reference with public exploit databases.
  5. **Compile findings** with severity ratings:
     - CRITICAL: confirmed remote code execution paths
     - HIGH: command injection or buffer overflow with reachable input
     - MEDIUM: hardcoded credentials, weak crypto, format string issues
     - LOW: informational findings (library versions, service fingerprints)

  ### Key files to prioritize in extracted firmware

  - `usr/sbin/httpd`, `usr/bin/httpd` — web servers (high-priority)
  - `etc/shadow`, `etc/passwd` — credential files
  - `www/cgi-bin/*` — CGI scripts (command injection vectors)
  - Custom binaries in `usr/sbin/`, `usr/bin/` — vendor attack surface
servers:
  - binwalk-mcp
  - yara-mcp
  - radare2-mcp
  - searchsploit-mcp

View File

@@ -0,0 +1,90 @@
name: go-fuzzing
description: |
  ## Go Fuzzing Vulnerability Discovery

  Goal: Find memory safety bugs, panics, and logic errors in a Go project
  using native Go fuzzing (go test -fuzz).

  ### Pipeline

  1. **Analyze the Go project** to understand its attack surface.
     Use `go_analyze` to scan the codebase and identify:
     - Fuzzable entry points: functions accepting `[]byte`, `string`,
       `io.Reader`, or other parser-like signatures (`Parse*`, `Decode*`,
       `Unmarshal*`, `Read*`, `Open*`)
     - Existing `Fuzz*` test functions already in `*_test.go` files
     - Unsafe/cgo usage that increases the severity of any bugs found
     - Known CVEs via govulncheck (enable with `run_vulncheck: true`)
     If there are **no existing Fuzz targets**, stop here and report
     that the project needs fuzz harnesses written first, listing the
     recommended entry points from the analysis.
  2. **Test harness quality** before committing to a long fuzzing campaign.
     Use `go_harness_test` to evaluate each Fuzz* function:
     - Compilation check — does `go test -c` succeed?
     - Seed execution — do the seed corpus entries pass without panics?
     - Short fuzzing trial — does the harness sustain fuzzing for 15-30s?
     - Quality score (0-100): ≥80 = production-ready, ≥50 = needs work, <50 = broken
     **Decision point:**
     - If all harnesses are **broken** (score < 50): stop and report issues.
       The user needs to fix them before fuzzing is useful.
     - If some are **production-ready** or **needs-improvement** (score ≥ 50):
       proceed with those targets to step 3.
     - Skip broken harnesses — do not waste fuzzing time on them.
  3. **Run fuzzing** on the viable targets.
     Use `go_fuzz_run` for a bounded campaign:
     - Set `duration` based on project size: 60-120s for quick scan,
       300-600s for thorough analysis.
     - Pass only the targets that scored ≥ 50 in step 2 via the `targets`
       parameter — do not fuzz broken harnesses.
     - The fuzzer collects crash inputs to `/app/output/crashes/{FuzzName}/`.
     **Alternative — continuous mode** for deeper exploration:
     - Use `go_fuzz_start` to begin background fuzzing.
     - Periodically check `go_fuzz_status` to monitor progress.
     - Use `go_fuzz_stop` when satisfied or when crashes are found.
     If **no crashes** are found after a reasonable duration, report that
     the fuzzing campaign completed cleanly with the execution metrics.
  4. **Analyze crashes** found during fuzzing.
     Use `go_crash_analyze` to process the crash inputs:
     - Reproduction: re-run each crash input to confirm it's real
     - Classification: categorize by type (nil-dereference, index-out-of-range,
       slice-bounds, divide-by-zero, stack-overflow, data-race, panic, etc.)
     - Severity assignment: critical / high / medium / low
     - Deduplication: group crashes by signature (target + type + top 3 frames)
     Skip this step if no crashes were found in step 3.
  5. **Compile the vulnerability report** with findings organized by severity:
     - **CRITICAL**: nil-dereference, segfault, data-race, stack-overflow
     - **HIGH**: index/slice out of bounds, allocation overflow
     - **MEDIUM**: integer overflow, divide by zero, explicit panics
     - **LOW**: timeout, unclassified crashes
     For each unique crash, include:
     - The fuzz target that triggered it
     - The crash type and root cause function + file + line
     - Whether it was reproducible
     - The crash input file path for manual investigation

  ### What the user's project needs

  - A `go.mod` file (any Go module)
  - At least one `*_test.go` file with `func FuzzXxx(f *testing.F)` functions
  - Seed corpus entries added via `f.Add(...)` in the Fuzz functions

  ### Interpretation guide

  - **govulncheck CVEs** (step 1) are known dependency vulnerabilities — report separately
  - **Fuzzer crashes** (steps 3-4) are new bugs found by fuzzing the project's own code
  - High execution counts with zero crashes = good sign (code is robust to that input space)
  - Low quality scores in step 2 usually mean the harness needs better seed corpus or input handling
servers:
  - go-analyzer-mcp
  - go-harness-tester-mcp
  - go-fuzzer-mcp
  - go-crash-analyzer-mcp

View File

@@ -13,9 +13,14 @@ from __future__ import annotations
import json
import logging
import shutil
import mimetypes
from datetime import UTC, datetime
from pathlib import Path
from tarfile import open as Archive # noqa: N812
from typing import Any
from uuid import uuid4
import yaml
logger = logging.getLogger("fuzzforge-mcp")
@@ -79,6 +84,7 @@ class LocalStorage:
storage_path = self._get_project_path(project_path)
storage_path.mkdir(parents=True, exist_ok=True)
(storage_path / "runs").mkdir(parents=True, exist_ok=True)
(storage_path / "output").mkdir(parents=True, exist_ok=True)
# Create .gitignore to avoid committing large files
gitignore_path = storage_path / ".gitignore"
@@ -86,6 +92,8 @@ class LocalStorage:
gitignore_path.write_text(
"# FuzzForge storage - ignore large/temporary files\n"
"runs/\n"
"output/\n"
"artifacts.json\n"
"!config.json\n"
)
@@ -131,7 +139,7 @@ class LocalStorage:
storage_path.mkdir(parents=True, exist_ok=True)
config_path = storage_path / "config.json"
config: dict = {}
config: dict[str, Any] = {}
if config_path.exists():
config = json.loads(config_path.read_text())
@@ -141,17 +149,85 @@ class LocalStorage:
logger.info("Set project assets: %s -> %s", project_path.name, assets_path)
return assets_path
def list_executions(self, project_path: Path) -> list[str]:
"""List all execution IDs for a project.
def get_project_output_path(self, project_path: Path) -> Path | None:
    """Locate the writable output directory for a project.

    This directory is mounted into hub tool containers at /app/output.

    :param project_path: Path to the project directory.
    :returns: Path to the output directory, or None if it does not
        exist (project not initialized).
    """
    candidate = self._get_project_path(project_path) / "output"
    return candidate if candidate.exists() else None
def record_execution(
    self,
    project_path: Path,
    server_name: str,
    tool_name: str,
    arguments: dict[str, Any],
    result: dict[str, Any],
) -> str:
    """Record an execution result to the project's runs directory.

    Creates ``runs/<execution_id>/metadata.json`` containing the tool
    identity, its arguments, and the full result payload.

    :param project_path: Path to the project directory.
    :param server_name: Hub server name.
    :param tool_name: Tool name that was executed.
    :param arguments: Arguments passed to the tool.
    :param result: Execution result dictionary.
    :returns: Execution ID.
    """
    # Capture a single timestamp so the ID prefix and the recorded
    # "timestamp" field cannot disagree (two now() calls could straddle
    # a second boundary).
    now = datetime.now(tz=UTC)
    execution_id = f"{now.strftime('%Y%m%dT%H%M%SZ')}_{uuid4().hex[:8]}"
    run_dir = self._get_project_path(project_path) / "runs" / execution_id
    run_dir.mkdir(parents=True, exist_ok=True)

    metadata = {
        "execution_id": execution_id,
        "timestamp": now.isoformat(),
        "server": server_name,
        "tool": tool_name,
        "arguments": arguments,
        # Mirrored at the top level so listings can read it cheaply.
        "success": result.get("success", False),
        "result": result,
    }
    # default=str keeps non-JSON-native values (paths, datetimes) serializable.
    (run_dir / "metadata.json").write_text(json.dumps(metadata, indent=2, default=str))
    logger.info("Recorded execution %s: %s:%s", execution_id, server_name, tool_name)
    return execution_id
def list_executions(self, project_path: Path) -> list[dict[str, Any]]:
    """List all executions for a project with summary metadata.

    Run directories are named with a sortable UTC timestamp prefix, so
    reverse-sorted iteration yields newest runs first.

    :param project_path: Path to the project directory.
    :returns: List of execution summaries (id, timestamp, server, tool, success).
    """
    runs_dir = self._get_project_path(project_path) / "runs"
    if not runs_dir.exists():
        return []

    executions: list[dict[str, Any]] = []
    for run_dir in sorted(runs_dir.iterdir(), reverse=True):
        if not run_dir.is_dir():
            continue
        meta_path = run_dir / "metadata.json"
        meta: dict[str, Any] = {}
        if meta_path.exists():
            try:
                meta = json.loads(meta_path.read_text())
            except (json.JSONDecodeError, OSError):
                # One corrupt/unreadable metadata file must not break the
                # whole listing; fall back to a bare directory-name entry.
                meta = {}
        if meta:
            executions.append({
                "execution_id": meta.get("execution_id", run_dir.name),
                "timestamp": meta.get("timestamp"),
                "server": meta.get("server"),
                "tool": meta.get("tool"),
                "success": meta.get("success"),
            })
        else:
            executions.append({"execution_id": run_dir.name})
    return executions
def get_execution_results(
self,
@@ -201,3 +277,377 @@ class LocalStorage:
except Exception as exc:
msg = f"Failed to extract results: {exc}"
raise StorageError(msg) from exc
# ------------------------------------------------------------------
# Artifact tracking
# ------------------------------------------------------------------
def _artifacts_path(self, project_path: Path) -> Path:
    """Return the location of the artifact registry file.

    :param project_path: Path to the project directory.
    :returns: Path to artifacts.json inside the project storage dir.
    """
    return self._get_project_path(project_path) / "artifacts.json"
def _load_artifacts(self, project_path: Path) -> list[dict[str, Any]]:
    """Load the artifact registry from disk.

    :param project_path: Path to the project directory.
    :returns: List of artifact dicts; empty when the registry is
        missing, unreadable, or malformed.
    """
    path = self._artifacts_path(project_path)
    if not path.exists():
        return []
    try:
        data = json.loads(path.read_text())
    except (json.JSONDecodeError, OSError):
        return []
    # Guard against a hand-edited or corrupted registry whose top-level
    # value is not a list — callers iterate over dict entries.
    return data if isinstance(data, list) else []
def _save_artifacts(self, project_path: Path, artifacts: list[dict[str, Any]]) -> None:
    """Persist the full artifact registry to disk as indented JSON.

    :param project_path: Path to the project directory.
    :param artifacts: Full artifact list to write.
    """
    registry = self._artifacts_path(project_path)
    # Make sure the storage directory exists before the first write.
    registry.parent.mkdir(parents=True, exist_ok=True)
    registry.write_text(json.dumps(artifacts, indent=2, default=str))
def _classify_file(self, file_path: Path) -> str:
"""Classify a file into a human-friendly type string.
:param file_path: Path to the file.
:returns: Type string (e.g. "elf-binary", "text", "directory").
"""
mime, _ = mimetypes.guess_type(str(file_path))
suffix = file_path.suffix.lower()
# Try reading ELF magic for binaries with no extension
if mime is None and suffix == "":
try:
header = file_path.read_bytes()[:4]
if header == b"\x7fELF":
return "elf-binary"
except OSError:
pass
if mime:
if "json" in mime:
return "json"
if "text" in mime or "xml" in mime or "yaml" in mime:
return "text"
if "image" in mime:
return "image"
if "octet-stream" in mime:
return "binary"
type_map: dict[str, str] = {
".json": "json",
".sarif": "sarif",
".md": "markdown",
".txt": "text",
".log": "text",
".csv": "csv",
".yaml": "yaml",
".yml": "yaml",
".xml": "xml",
".html": "html",
".elf": "elf-binary",
".so": "elf-binary",
".bin": "binary",
".gz": "archive",
".tar": "archive",
".zip": "archive",
}
return type_map.get(suffix, "binary")
def scan_artifacts(
    self,
    project_path: Path,
    server_name: str,
    tool_name: str,
) -> list[dict[str, Any]]:
    """Scan the output directory for new or modified files and register them.

    Compares the current contents of .fuzzforge/output/ against the
    artifact registry; files that are new, or whose size/mtime changed,
    are (re-)registered and attributed to the given server and tool.

    :param project_path: Path to the project directory.
    :param server_name: Hub server that produced the artifacts.
    :param tool_name: Tool that produced the artifacts.
    :returns: List of newly registered (or updated) artifact dicts.
    """
    output_path = self.get_project_output_path(project_path)
    if output_path is None or not output_path.exists():
        return []

    existing = self._load_artifacts(project_path)
    # Map container path -> position in `existing` so modified entries
    # can be replaced in O(1) instead of a linear search per file.
    index: dict[str, int] = {a["path"]: i for i, a in enumerate(existing)}
    now = datetime.now(tz=UTC).isoformat()
    new_artifacts: list[dict[str, Any]] = []

    for file_path in output_path.rglob("*"):
        if not file_path.is_file():
            continue

        # Register the container-style path (/app/output/...) so entries
        # are directly usable as arguments in subsequent hub tool calls.
        relative = file_path.relative_to(output_path)
        container_path = f"/app/output/{relative}"

        stat = file_path.stat()
        size = stat.st_size
        mtime = datetime.fromtimestamp(stat.st_mtime, tz=UTC).isoformat()

        pos = index.get(container_path)
        if pos is not None:
            prev = existing[pos]
            if prev.get("mtime") == mtime and prev.get("size") == size:
                continue  # Unchanged — skip

        artifact: dict[str, Any] = {
            "path": container_path,
            "host_path": str(file_path),
            "type": self._classify_file(file_path),
            "size": size,
            "mtime": mtime,
            "source_server": server_name,
            "source_tool": tool_name,
            "registered_at": now,
        }
        if pos is not None:
            existing[pos] = artifact  # update modified entry in place
        else:
            index[container_path] = len(existing)
            existing.append(artifact)
        new_artifacts.append(artifact)

    if new_artifacts:
        self._save_artifacts(project_path, existing)
        logger.info(
            "Registered %d new artifact(s) from %s:%s",
            len(new_artifacts),
            server_name,
            tool_name,
        )
    return new_artifacts
def list_artifacts(
    self,
    project_path: Path,
    *,
    source: str | None = None,
    artifact_type: str | None = None,
) -> list[dict[str, Any]]:
    """List registered artifacts, applying any requested filters.

    :param project_path: Path to the project directory.
    :param source: Keep only artifacts produced by this server.
    :param artifact_type: Keep only artifacts of this type (e.g. "elf-binary").
    :returns: List of matching artifact dicts.
    """
    def matches(artifact: dict[str, Any]) -> bool:
        # Falsy filter values (None or "") mean "no filter".
        if source and artifact.get("source_server") != source:
            return False
        return not (artifact_type and artifact.get("type") != artifact_type)

    return [a for a in self._load_artifacts(project_path) if matches(a)]
def get_artifact(self, project_path: Path, path: str) -> dict[str, Any] | None:
    """Look up a single artifact by its container path.

    :param project_path: Path to the project directory.
    :param path: Container path of the artifact (e.g. /app/output/...).
    :returns: Artifact dict, or None if not found.
    """
    return next(
        (a for a in self._load_artifacts(project_path) if a["path"] == path),
        None,
    )
# ------------------------------------------------------------------
# Reports
# ------------------------------------------------------------------
def list_execution_metadata(self, project_path: Path) -> list[dict[str, Any]]:
    """Load full execution metadata for all runs, sorted oldest-first.

    Runs with a missing or unparsable metadata.json are silently skipped.

    :param project_path: Path to the project directory.
    :returns: List of full metadata dicts (includes arguments, result).
    """
    runs_dir = self._get_project_path(project_path) / "runs"
    if not runs_dir.exists():
        return []

    collected: list[dict[str, Any]] = []
    for run_dir in sorted(runs_dir.iterdir()):
        meta_path = run_dir / "metadata.json"
        if not run_dir.is_dir() or not meta_path.exists():
            continue
        try:
            collected.append(json.loads(meta_path.read_text()))
        except (json.JSONDecodeError, OSError):
            # Skip unreadable runs rather than failing the whole listing.
            continue
    return collected
def save_report(
    self,
    project_path: Path,
    content: str,
    fmt: str = "markdown",
) -> Path:
    """Save a generated report under .fuzzforge/reports/.

    The filename is a UTC timestamp plus an extension derived from
    ``fmt``; unknown formats fall back to ``.md``.

    :param project_path: Path to the project directory.
    :param content: Report content string.
    :param fmt: Format name used to choose the file extension.
    :returns: Path to the saved report file.
    """
    reports_dir = self._get_project_path(project_path) / "reports"
    reports_dir.mkdir(parents=True, exist_ok=True)

    extensions = {"markdown": "md", "json": "json", "sarif": "sarif"}
    stamp = datetime.now(tz=UTC).strftime("%Y%m%dT%H%M%SZ")
    report_path = reports_dir / f"{stamp}_report.{extensions.get(fmt, 'md')}"
    report_path.write_text(content)
    logger.info("Saved report: %s", report_path)
    return report_path
def list_reports(self, project_path: Path) -> list[dict[str, Any]]:
    """List generated reports for a project, newest first.

    :param project_path: Path to the project directory.
    :returns: List of report dicts with filename, host_path, size, created_at.
    """
    reports_dir = self._get_project_path(project_path) / "reports"
    if not reports_dir.exists():
        return []

    entries: list[dict[str, Any]] = []
    # Timestamp-prefixed filenames make reverse lexicographic order newest-first.
    for report_path in sorted(reports_dir.iterdir(), reverse=True):
        if not report_path.is_file():
            continue
        stat = report_path.stat()
        entries.append({
            "filename": report_path.name,
            "host_path": str(report_path),
            "size": stat.st_size,
            "created_at": datetime.fromtimestamp(stat.st_mtime, tz=UTC).isoformat(),
        })
    return entries
# ------------------------------------------------------------------
# Skill packs
# ------------------------------------------------------------------
#: Directory containing built-in skill packs shipped with FuzzForge.
_BUILTIN_SKILLS_DIR: Path = Path(__file__).parent / "skills"
def _skill_dirs(self, project_path: Path) -> list[Path]:
    """Return skill directories in priority order (project-local first).

    :param project_path: Path to the project directory.
    :returns: Existing directories that may contain skill YAML files.
    """
    candidates = [
        self._get_project_path(project_path) / "skills",  # project-local wins
        self._BUILTIN_SKILLS_DIR,  # packaged defaults
    ]
    return [directory for directory in candidates if directory.is_dir()]
def list_skills(self, project_path: Path) -> list[dict[str, Any]]:
    """List available skill packs from project and built-in directories.

    Project-local skills shadow built-in skills of the same name.

    :param project_path: Path to the project directory.
    :returns: List of skill summaries (name, first description line,
        source, servers).
    """
    seen: set[str] = set()
    skills: list[dict[str, Any]] = []
    for skill_dir in self._skill_dirs(project_path):
        # Derive provenance from the directory being scanned instead of
        # substring-matching ".fuzzforge" in the file path, which would
        # misclassify built-in skills on hosts whose install path happens
        # to contain that string.
        source = "builtin" if skill_dir == self._BUILTIN_SKILLS_DIR else "project"
        for yaml_path in sorted(skill_dir.glob("*.yaml")):
            skill = self._parse_skill_file(yaml_path)
            if skill is None:
                continue
            name = skill["name"]
            if name in seen:
                continue  # project-local overrides built-in
            seen.add(name)
            desc = skill.get("description", "")
            first_line = desc.strip().split("\n", 1)[0] if desc else ""
            skills.append({
                "name": name,
                "summary": first_line,
                "source": source,
                "servers": skill.get("servers", []),
            })
    return skills
def load_skill(self, project_path: Path, name: str) -> dict[str, Any] | None:
    """Load a skill pack by name.

    Project-local skills are searched before built-in skills, so a
    project can override a packaged skill of the same name.

    :param project_path: Path to the project directory.
    :param name: Skill name (filename without .yaml extension).
    :returns: Parsed skill dict with name, description, servers — or None.
    """
    for skill_dir in self._skill_dirs(project_path):
        candidate = skill_dir / f"{name}.yaml"
        if candidate.is_file():
            return self._parse_skill_file(candidate)
    return None
@staticmethod
def _parse_skill_file(yaml_path: Path) -> dict[str, Any] | None:
    """Parse and validate a skill YAML file.

    A valid skill file is a YAML mapping with a non-empty string "name".

    :param yaml_path: Path to the YAML file.
    :returns: Parsed skill dict, or None if unreadable or invalid.
    """
    try:
        data = yaml.safe_load(yaml_path.read_text())
    except (yaml.YAMLError, OSError):
        logger.warning("Failed to parse skill file: %s", yaml_path)
        return None

    if not isinstance(data, dict):
        return None

    name = data.get("name")
    if not isinstance(name, str) or not name:
        logger.warning("Skill file missing 'name': %s", yaml_path)
        return None

    return {
        "name": name,
        "description": data.get("description", ""),
        "servers": data.get("servers", []),
    }

View File

@@ -2,12 +2,13 @@
from fastmcp import FastMCP
from fuzzforge_mcp.tools import hub, projects
from fuzzforge_mcp.tools import hub, projects, reports
mcp: FastMCP = FastMCP()
mcp.mount(projects.mcp)
mcp.mount(hub.mcp)
mcp.mount(reports.mcp)
__all__ = [
"mcp",

View File

@@ -10,21 +10,51 @@ through the FuzzForge hub. AI agents can:
from __future__ import annotations
from pathlib import Path
from typing import Any
from fastmcp import FastMCP
from fastmcp.exceptions import ToolError
from fuzzforge_common.hub import HubExecutor, HubServerConfig, HubServerType
from fuzzforge_mcp.dependencies import get_settings
from fuzzforge_mcp.dependencies import get_project_path, get_settings, get_storage
mcp: FastMCP = FastMCP()
# Name of the convention tool that hub servers can implement to provide
# rich usage context for AI agents (known issues, workflow tips, rules, etc.).
_AGENT_CONTEXT_TOOL = "get_agent_context"
# Global hub executor instance (lazy initialization)
_hub_executor: HubExecutor | None = None
async def _fetch_agent_context(
    executor: HubExecutor,
    server_name: str,
    tools: list[Any],
) -> str | None:
    """Call the get_agent_context convention tool if the server exposes it.

    Best-effort: returns None when the server does not implement the
    convention, the call fails, or the response carries no text content.
    """
    exposes_convention = any(t.name == _AGENT_CONTEXT_TOOL for t in tools)
    if not exposes_convention:
        return None
    try:
        result = await executor.execute_tool(
            identifier=f"hub:{server_name}:{_AGENT_CONTEXT_TOOL}",
            arguments={},
        )
        if result.success and result.result:
            content = result.result.get("content", [])
            if isinstance(content, list) and content:
                text: str = content[0].get("text", "")
                return text
    except Exception:  # noqa: BLE001, S110 - best-effort context fetch
        pass
    return None
def _get_hub_executor() -> HubExecutor:
"""Get or create the hub executor instance.
@@ -51,12 +81,15 @@ def _get_hub_executor() -> HubExecutor:
@mcp.tool
async def list_hub_servers() -> dict[str, Any]:
async def list_hub_servers(category: str | None = None) -> dict[str, Any]:
"""List all registered MCP hub servers.
Returns information about configured hub servers, including
their connection type, status, and discovered tool count.
:param category: Optional category to filter by (e.g. "binary-analysis",
"web-security", "reconnaissance"). Only servers in this category
are returned.
:return: Dictionary with list of hub servers.
"""
@@ -64,6 +97,9 @@ async def list_hub_servers() -> dict[str, Any]:
executor = _get_hub_executor()
servers = executor.list_servers()
if category:
servers = [s for s in servers if s.get("category") == category]
return {
"servers": servers,
"count": len(servers),
@@ -94,7 +130,14 @@ async def discover_hub_tools(server_name: str | None = None) -> dict[str, Any]:
if server_name:
tools = await executor.discover_server_tools(server_name)
return {
# Convention: auto-fetch agent context if server provides it.
agent_context = await _fetch_agent_context(executor, server_name, tools)
# Hide the convention tool from the agent's tool list.
visible_tools = [t for t in tools if t.name != "get_agent_context"]
result: dict[str, Any] = {
"server": server_name,
"tools": [
{
@@ -103,15 +146,24 @@ async def discover_hub_tools(server_name: str | None = None) -> dict[str, Any]:
"description": t.description,
"parameters": [p.model_dump() for p in t.parameters],
}
for t in tools
for t in visible_tools
],
"count": len(tools),
"count": len(visible_tools),
}
if agent_context:
result["agent_context"] = agent_context
return result
else:
results = await executor.discover_all_tools()
all_tools = []
contexts: dict[str, str] = {}
for server, tools in results.items():
ctx = await _fetch_agent_context(executor, server, tools)
if ctx:
contexts[server] = ctx
for tool in tools:
if tool.name == "get_agent_context":
continue
all_tools.append({
"identifier": tool.identifier,
"name": tool.name,
@@ -120,11 +172,14 @@ async def discover_hub_tools(server_name: str | None = None) -> dict[str, Any]:
"parameters": [p.model_dump() for p in tool.parameters],
})
return {
result = {
"servers_discovered": len(results),
"tools": all_tools,
"count": len(all_tools),
}
if contexts:
result["agent_contexts"] = contexts
return result
except Exception as e:
if isinstance(e, ToolError):
@@ -173,21 +228,96 @@ async def execute_hub_tool(
:return: Tool execution result.
Example identifiers:
- "hub:binwalk-mcp:binwalk_scan"
- "hub:yara-mcp:yara_scan_with_rules"
- "hub:nmap:nmap_scan"
- "nmap:nmap_scan"
- "hub:nuclei:nuclei_scan"
FILE ACCESS — if set_project_assets was called, the assets directory is
mounted read-only inside the container at two standard paths:
- /app/uploads/ (used by binwalk, and tools with UPLOAD_DIR)
- /app/samples/ (used by yara, capa, and tools with SAMPLES_DIR)
Always use /app/uploads/<filename> or /app/samples/<filename> when
passing file paths to hub tools — do NOT use the host path.
Tool outputs are persisted to a writable shared volume:
- /app/output/ (writable — extraction results, reports, etc.)
Files written here survive container destruction and are available
to subsequent tool calls. The host path is .fuzzforge/output/.
"""
try:
executor = _get_hub_executor()
# Inject project assets as Docker volume mounts if configured.
# Mounts the assets directory at the standard paths used by hub tools:
# /app/uploads — binwalk, and other tools that use UPLOAD_DIR
# /app/samples — yara, capa, and other tools that use SAMPLES_DIR
# /app/output — writable volume for tool outputs (persists across calls)
extra_volumes: list[str] = []
try:
storage = get_storage()
project_path = get_project_path()
assets_path = storage.get_project_assets_path(project_path)
if assets_path:
assets_str = str(assets_path)
extra_volumes = [
f"{assets_str}:/app/uploads:ro",
f"{assets_str}:/app/samples:ro",
]
output_path = storage.get_project_output_path(project_path)
if output_path:
extra_volumes.append(f"{output_path!s}:/app/output:rw")
except Exception: # noqa: BLE001 - never block tool execution due to asset injection failure
extra_volumes = []
result = await executor.execute_tool(
identifier=identifier,
arguments=arguments or {},
timeout=timeout,
extra_volumes=extra_volumes or None,
)
return result.to_dict()
# Record execution history for list_executions / get_execution_results.
try:
storage = get_storage()
project_path = get_project_path()
storage.record_execution(
project_path=project_path,
server_name=result.server_name,
tool_name=result.tool_name,
arguments=arguments or {},
result=result.to_dict(),
)
except Exception: # noqa: BLE001, S110 - never fail the tool call due to recording issues
pass
# Scan for new artifacts produced by the tool in /app/output.
response = result.to_dict()
try:
storage = get_storage()
project_path = get_project_path()
new_artifacts = storage.scan_artifacts(
project_path=project_path,
server_name=result.server_name,
tool_name=result.tool_name,
)
if new_artifacts:
response["artifacts"] = [
{"path": a["path"], "type": a["type"], "size": a["size"]}
for a in new_artifacts
]
except Exception: # noqa: BLE001, S110 - never fail the tool call due to artifact scanning
pass
# Append workflow suggestions based on hints configured for this tool.
try:
hint = executor.registry.get_workflow_hint(result.tool_name)
if hint:
response["suggested_next_steps"] = hint
except Exception: # noqa: BLE001, S110 - never fail the tool call due to hint lookup
pass
return response
except Exception as e:
if isinstance(e, ToolError):
@@ -335,7 +465,25 @@ async def start_hub_server(server_name: str) -> dict[str, Any]:
try:
executor = _get_hub_executor()
result = await executor.start_persistent_server(server_name)
# Inject project assets as Docker volume mounts (same logic as execute_hub_tool).
extra_volumes: list[str] = []
try:
storage = get_storage()
project_path = get_project_path()
assets_path = storage.get_project_assets_path(project_path)
if assets_path:
assets_str = str(assets_path)
extra_volumes = [
f"{assets_str}:/app/uploads:ro",
f"{assets_str}:/app/samples:ro",
]
output_path = storage.get_project_output_path(project_path)
if output_path:
extra_volumes.append(f"{output_path!s}:/app/output:rw")
except Exception: # noqa: BLE001 - never block server start due to asset injection failure
extra_volumes = []
result = await executor.start_persistent_server(server_name, extra_volumes=extra_volumes or None)
return {
"success": True,

View File

@@ -8,23 +8,27 @@ from typing import Any
from fastmcp import FastMCP
from fastmcp.exceptions import ToolError
from fuzzforge_mcp.dependencies import get_project_path, get_storage, set_current_project_path
from fuzzforge_mcp.dependencies import (
get_active_skill,
get_project_path,
get_storage,
set_active_skill,
set_current_project_path,
)
mcp: FastMCP = FastMCP()
@mcp.tool
async def init_project(project_path: str | None = None) -> dict[str, Any]:
"""Initialize a new FuzzForge project.
"""Initialize a new FuzzForge project workspace.
Creates a `.fuzzforge/` directory inside the project for storing:
- config.json: Project configuration
- runs/: Execution results
Creates a `.fuzzforge/` directory for storing configuration and execution results.
Call this once before using hub tools. The project path is a working directory
for FuzzForge state — it does not need to contain the files you want to analyze.
Use `set_project_assets` separately to specify the target files.
This should be called before executing hub tools.
:param project_path: Path to the project directory. If not provided, uses current directory.
:param project_path: Working directory for FuzzForge state. Defaults to current directory.
:return: Project initialization result.
"""
@@ -52,12 +56,13 @@ async def init_project(project_path: str | None = None) -> dict[str, Any]:
@mcp.tool
async def set_project_assets(assets_path: str) -> dict[str, Any]:
"""Set the initial assets (source code) for a project.
"""Set the directory containing target files to analyze.
This sets the DEFAULT source directory that will be mounted into
hub tool containers via volume mounts.
Points FuzzForge to the directory with your analysis targets
(firmware images, binaries, source code, etc.). This directory
is mounted read-only into hub tool containers.
:param assets_path: Path to the project source directory.
:param assets_path: Path to the directory containing files to analyze.
:return: Result including stored assets path.
"""
@@ -86,9 +91,9 @@ async def set_project_assets(assets_path: str) -> dict[str, Any]:
async def list_executions() -> dict[str, Any]:
"""List all executions for the current project.
Returns a list of execution IDs that can be used to retrieve results.
Returns execution summaries including server, tool, timestamp, and success status.
:return: List of execution IDs.
:return: List of execution summaries.
"""
storage = get_storage()
@@ -147,3 +152,166 @@ async def get_execution_results(execution_id: str, extract_to: str | None = None
except Exception as exception:
message: str = f"Failed to get execution results: {exception}"
raise ToolError(message) from exception
@mcp.tool
async def list_artifacts(
    source: str | None = None,
    artifact_type: str | None = None,
) -> dict[str, Any]:
    """List all artifacts produced by hub tools in the current project.

    Artifacts are files created by tool executions in /app/output/.
    They are automatically tracked after each execute_hub_tool call.

    :param source: Filter by source server name (e.g. "binwalk-mcp").
    :param artifact_type: Filter by type (e.g. "elf-binary", "json", "text", "archive").
    :return: List of artifacts with path, type, size, and source info.
    """
    storage = get_storage()
    project_dir: Path = get_project_path()
    try:
        matching = storage.list_artifacts(
            project_dir,
            source=source,
            artifact_type=artifact_type,
        )
    except Exception as exc:
        detail: str = f"Failed to list artifacts: {exc}"
        raise ToolError(detail) from exc
    return {
        "success": True,
        "artifacts": matching,
        "count": len(matching),
    }
@mcp.tool
async def get_artifact(path: str) -> dict[str, Any]:
    """Get metadata for a specific artifact by its container path.

    :param path: Container path of the artifact (e.g. /app/output/extract_abc123/squashfs-root/usr/sbin/httpd).
    :return: Artifact metadata including path, type, size, source tool, and timestamps.
    """
    storage = get_storage()
    project_dir: Path = get_project_path()
    try:
        record = storage.get_artifact(project_dir, path)
    except Exception as exc:
        detail: str = f"Failed to get artifact: {exc}"
        raise ToolError(detail) from exc
    if record is None:
        return {
            "success": False,
            "path": path,
            "error": "Artifact not found",
        }
    return {
        "success": True,
        "artifact": record,
    }
@mcp.tool
async def list_skills() -> dict[str, Any]:
    """List available skill packs.

    Skill packs provide domain-specific pipeline guidance for AI agents.
    They describe analysis methodologies and list the hub servers needed.
    Project-local skills (.fuzzforge/skills/) override built-in skills.

    :return: List of available skills with name, summary, and server list.
    """
    storage = get_storage()
    project_dir: Path = get_project_path()
    try:
        available = storage.list_skills(project_dir)
        current = get_active_skill()
    except Exception as exc:
        detail: str = f"Failed to list skills: {exc}"
        raise ToolError(detail) from exc
    return {
        "success": True,
        "skills": available,
        "count": len(available),
        "active_skill": current["name"] if current else None,
    }
@mcp.tool
async def load_skill(name: str) -> dict[str, Any]:
    """Load a skill pack to guide the current analysis session.

    A skill pack provides a pipeline description and specifies which
    hub servers are relevant. Once loaded, the pipeline description
    is available as context and only the listed servers need discovery.

    :param name: Skill name (e.g. "firmware-analysis").
    :return: Loaded skill with full description and server list.
    """
    storage = get_storage()
    project_dir: Path = get_project_path()
    try:
        skill = storage.load_skill(project_dir, name)
        if skill is None:
            return {
                "success": False,
                "name": name,
                "error": f"Skill '{name}' not found. Use list_skills to see available skills.",
            }
        set_active_skill(skill)
        # Build the guidance message explicitly instead of relying on
        # implicit string concatenation inside a conditional expression.
        if skill["servers"]:
            guidance = (
                f"Skill '{name}' loaded. Follow the pipeline description above. "
                f"Discover tools from: {', '.join(skill['servers'])}"
            )
        else:
            guidance = f"Skill '{name}' loaded."
        return {
            "success": True,
            "name": skill["name"],
            "description": skill["description"],
            "servers": skill["servers"],
            "message": guidance,
        }
    except Exception as exc:
        detail: str = f"Failed to load skill: {exc}"
        raise ToolError(detail) from exc
@mcp.tool
async def unload_skill() -> dict[str, Any]:
    """Unload the currently active skill pack.

    Clears the active pipeline guidance. The agent returns to generic mode.

    :return: Confirmation of unload.
    """
    current = get_active_skill()
    if current is None:
        return {
            "success": True,
            "message": "No skill was loaded.",
        }
    released = current["name"]
    set_active_skill(None)
    return {
        "success": True,
        "message": f"Skill '{released}' unloaded.",
    }

View File

@@ -0,0 +1,346 @@
"""Report generation tools for FuzzForge MCP."""
from __future__ import annotations
import json
from datetime import UTC, datetime
from pathlib import Path
from typing import Any
from fastmcp import FastMCP
from fastmcp.exceptions import ToolError
from fuzzforge_mcp.dependencies import get_project_path, get_storage
mcp: FastMCP = FastMCP()
# Maximum characters of tool output to embed per execution in markdown reports.
_OUTPUT_TRUNCATE_CHARS: int = 2000
# ------------------------------------------------------------------
# Formatting helpers
# ------------------------------------------------------------------
def _format_size(size: int) -> str:
"""Format a byte count as a human-friendly string."""
for unit in ("B", "KB", "MB", "GB"):
if size < 1024: # noqa: PLR2004
return f"{size} {unit}" if unit == "B" else f"{size:.1f} {unit}"
size //= 1024
return f"{size:.1f} TB"
def _truncate(text: str, max_chars: int = _OUTPUT_TRUNCATE_CHARS) -> str:
    """Shorten ``text`` to at most ``max_chars``, noting how much was dropped.

    :param text: Raw text to shorten.
    :param max_chars: Length budget; defaults to the module-wide limit.
    :return: The text unchanged if it fits, otherwise the first
        ``max_chars`` characters plus an omission marker line.
    """
    if len(text) <= max_chars:
        return text
    dropped = len(text) - max_chars
    head = text[:max_chars]
    return f"{head}\n... [{dropped} chars omitted]"
def _extract_output_text(result: dict[str, Any]) -> str:
"""Extract a human-readable output string from an execution result dict.
Handles both flat dicts (``{"output": "..."}`` or ``{"content": [...]}``),
and the nested format stored by ``record_execution`` where the MCP tool
response is stored one level deeper under the ``"result"`` key.
"""
# Flat output field (most hub tools set this)
output = result.get("output", "")
if output and isinstance(output, str):
return output
# MCP content list format — check both at this level and one level down
for candidate in (result, result.get("result") or {}):
content = candidate.get("content", [])
if isinstance(content, list):
texts = [item.get("text", "") for item in content if isinstance(item, dict)]
combined = "\n".join(t for t in texts if t)
if combined:
return combined
parts: list[str] = []
if result.get("stdout"):
parts.append(f"stdout:\n{result['stdout']}")
if result.get("stderr"):
parts.append(f"stderr:\n{result['stderr']}")
return "\n".join(parts)
# ------------------------------------------------------------------
# Report builders
# ------------------------------------------------------------------
def _report_header(
title: str,
project_path: Path,
assets_path: Path | None,
now: str,
) -> list[str]:
"""Build the header block of the Markdown report."""
lines = [
f"# {title}",
"",
f"**Generated:** {now} ",
f"**Project:** `{project_path}` ",
]
if assets_path:
lines.append(f"**Assets:** `{assets_path}` ")
lines += ["", "---", ""]
return lines
def _report_summary(
    executions: list[dict[str, Any]],
    artifacts: list[dict[str, Any]],
) -> list[str]:
    """Build the summary table block of the Markdown report.

    :param executions: Execution metadata dicts (reads the "success",
        "server", "tool", and "timestamp" keys).
    :param artifacts: Tracked artifact dicts; only the count is used here.
    :return: Markdown lines for the summary table, ending with a rule.
    """
    success_count = sum(1 for e in executions if e.get("success"))
    fail_count = len(executions) - success_count
    # dict.fromkeys de-duplicates the "server:tool" ids while preserving
    # first-seen order (order is used for the "Tools used" line below).
    tool_ids = list(dict.fromkeys(
        f"{e.get('server', '?')}:{e.get('tool', '?')}" for e in executions
    ))
    timestamps = [e["timestamp"] for e in executions if e.get("timestamp")]
    lines = [
        "## Summary",
        "",
        "| Metric | Value |",
        "|--------|-------|",
        f"| Total executions | {len(executions)} |",
        f"| Successful | {success_count} |",
        f"| Failed | {fail_count} |",
        f"| Artifacts produced | {len(artifacts)} |",
        # tool_ids is already de-duplicated above, so set() is a no-op here.
        f"| Unique tools | {len(set(tool_ids))} |",
    ]
    # NOTE(review): the range uses the first and last list elements, not
    # min/max — assumes executions arrive in chronological order; confirm
    # against the storage layer's ordering.
    if len(timestamps) >= 2:  # noqa: PLR2004
        lines.append(f"| Time range | {timestamps[0]} → {timestamps[-1]} |")
    elif timestamps:
        lines.append(f"| Time | {timestamps[0]} |")
    lines.append("")
    if tool_ids:
        # Append the joined tool list plus a trailing blank line, then
        # prefix the list entry (now at index -2) with the bold label.
        lines += [", ".join(f"`{t}`" for t in tool_ids), ""]
        lines[-2] = f"**Tools used:** {lines[-2]}"
    lines += ["---", ""]
    return lines
def _report_timeline(
    executions: list[dict[str, Any]],
    artifacts: list[dict[str, Any]],
) -> list[str]:
    """Build the execution timeline block of the Markdown report.

    Emits one "###" entry per execution, in the given order, with status,
    arguments, truncated output, and a count of matching artifacts.

    :param executions: Execution metadata dicts.
    :param artifacts: Artifact dicts, matched to executions by source tool.
    :return: Markdown lines; empty list when there are no executions.
    """
    if not executions:
        return []
    lines: list[str] = ["## Execution Timeline", ""]
    for idx, meta in enumerate(executions, 1):
        server = meta.get("server", "unknown")
        tool = meta.get("tool", "unknown")
        ts = meta.get("timestamp", "")
        status = "✓ Success" if meta.get("success") else "✗ Failed"
        lines.append(f"### [{idx}] {server} :: {tool} — {ts}")
        lines += ["", f"- **Status:** {status}"]
        arguments = meta.get("arguments") or {}
        if arguments:
            lines.append("- **Arguments:**")
            for k, v in arguments.items():
                lines.append(f"  - `{k}`: `{v}`")
        result = meta.get("result") or {}
        output_text = _extract_output_text(result).strip()
        if output_text:
            # Embed the (truncated) tool output as an indented fenced block.
            truncated = _truncate(output_text)
            lines += ["- **Output:**", "  ```"]
            lines.extend(f"  {line}" for line in truncated.splitlines())
            lines.append("  ```")
        # Artifacts are matched by (server, tool) only — if the same tool
        # ran multiple times, each execution entry counts all of that
        # tool's artifacts, not just its own run's output.
        exec_artifacts = [
            a for a in artifacts
            if a.get("source_server") == server and a.get("source_tool") == tool
        ]
        if exec_artifacts:
            lines.append(f"- **Artifacts produced:** {len(exec_artifacts)} file(s)")
        lines.append("")
    return lines
def _report_artifacts(artifacts: list[dict[str, Any]]) -> list[str]:
"""Build the artifacts section of the Markdown report."""
if not artifacts:
return []
lines: list[str] = ["---", "", "## Artifacts", "", f"**{len(artifacts)} file(s) total**", ""]
by_type: dict[str, list[dict[str, Any]]] = {}
for a in artifacts:
by_type.setdefault(a.get("type", "unknown"), []).append(a)
for art_type, arts in sorted(by_type.items()):
lines += [
f"### {art_type} ({len(arts)})",
"",
"| Path | Size | Source |",
"|------|------|--------|",
]
for a in arts:
path = a.get("path", "")
size = _format_size(a.get("size", 0))
source = f"`{a.get('source_server', '?')}:{a.get('source_tool', '?')}`"
lines.append(f"| `{path}` | {size} | {source} |")
lines.append("")
return lines
def _build_markdown_report(
    title: str,
    project_path: Path,
    assets_path: Path | None,
    executions: list[dict[str, Any]],
    artifacts: list[dict[str, Any]],
) -> str:
    """Build a Markdown-formatted analysis report.

    Concatenates the header, summary, timeline, and artifacts sections,
    appends a footer, and joins everything with newlines.
    """
    generated_at = datetime.now(tz=UTC).strftime("%Y-%m-%d %H:%M:%S UTC")
    sections: list[str] = []
    sections += _report_header(title, project_path, assets_path, generated_at)
    sections += _report_summary(executions, artifacts)
    sections += _report_timeline(executions, artifacts)
    sections += _report_artifacts(artifacts)
    sections += ["---", "", "*Generated by FuzzForge*", ""]
    return "\n".join(sections)
def _build_json_report(
title: str,
project_path: Path,
assets_path: Path | None,
executions: list[dict[str, Any]],
artifacts: list[dict[str, Any]],
) -> str:
"""Build a JSON-formatted analysis report."""
success_count = sum(1 for e in executions if e.get("success"))
report = {
"title": title,
"generated_at": datetime.now(tz=UTC).isoformat(),
"project_path": str(project_path),
"assets_path": str(assets_path) if assets_path else None,
"summary": {
"total_executions": len(executions),
"successful": success_count,
"failed": len(executions) - success_count,
"artifact_count": len(artifacts),
},
"executions": executions,
"artifacts": artifacts,
}
return json.dumps(report, indent=2, default=str)
def _write_to_path(content: str, path: Path) -> None:
"""Write report content to an explicit output path (sync helper)."""
path.parent.mkdir(parents=True, exist_ok=True)
path.write_text(content)
# ------------------------------------------------------------------
# MCP tools
# ------------------------------------------------------------------
@mcp.tool
async def generate_report(
    title: str | None = None,
    report_format: str = "markdown",
    output_path: str | None = None,
) -> dict[str, Any]:
    """Generate a comprehensive analysis report for the current project.

    Aggregates all execution history, tool outputs, and tracked artifacts
    into a structured report. The report is saved to `.fuzzforge/reports/`
    and its content is returned so the agent can read it immediately.

    :param title: Optional report title. Defaults to the project folder name.
    :param report_format: Output format — ``"markdown"`` (default) or ``"json"``.
    :param output_path: Optional absolute path to save the report. When omitted,
        the report is saved automatically to `.fuzzforge/reports/`.
    :return: Report content, save path, and counts of included items.
    """
    storage = get_storage()
    project_dir = get_project_path()
    try:
        fmt = report_format.lower().strip()
        if fmt not in ("markdown", "json"):
            return {
                "success": False,
                "error": f"Unsupported format '{fmt}'. Use 'markdown' or 'json'.",
            }
        executions = storage.list_execution_metadata(project_dir)
        artifacts = storage.list_artifacts(project_dir)
        assets_dir = storage.get_project_assets_path(project_dir)
        resolved_title = title or f"FuzzForge Analysis Report — {project_dir.name}"
        # Select the builder by format; both share the same signature.
        builder = _build_json_report if fmt == "json" else _build_markdown_report
        content = builder(
            resolved_title, project_dir, assets_dir, executions, artifacts
        )
        if output_path:
            destination = Path(output_path)
            _write_to_path(content, destination)
        else:
            destination = storage.save_report(project_dir, content, fmt)
        return {
            "success": True,
            "report_path": str(destination),
            "format": fmt,
            "executions_included": len(executions),
            "artifacts_included": len(artifacts),
            "content": content,
        }
    except Exception as exc:
        detail: str = f"Failed to generate report: {exc}"
        raise ToolError(detail) from exc
@mcp.tool
async def list_reports() -> dict[str, Any]:
    """List all generated reports for the current project.

    Reports are stored in `.fuzzforge/reports/` and are ordered newest-first.

    :return: List of report files with filename, path, size, and creation time.
    """
    storage = get_storage()
    project_dir = get_project_path()
    try:
        found = storage.list_reports(project_dir)
    except Exception as exc:
        detail: str = f"Failed to list reports: {exc}"
        raise ToolError(detail) from exc
    return {
        "success": True,
        "reports": found,
        "count": len(found),
    }

View File

@@ -1,126 +1,5 @@
{
"servers": [
{
"name": "nmap-mcp",
"description": "Network reconnaissance using Nmap - port scanning, service detection, OS fingerprinting",
"type": "docker",
"image": "nmap-mcp:latest",
"category": "reconnaissance",
"capabilities": [
"NET_RAW"
],
"enabled": true
},
{
"name": "binwalk-mcp",
"description": "Firmware extraction and analysis using Binwalk - file signatures, entropy analysis, embedded file extraction",
"type": "docker",
"image": "binwalk-mcp:latest",
"category": "binary-analysis",
"capabilities": [],
"volumes": [
"~/.fuzzforge/hub/workspace:/data"
],
"enabled": true
},
{
"name": "yara-mcp",
"description": "Pattern matching and malware classification using YARA rules",
"type": "docker",
"image": "yara-mcp:latest",
"category": "binary-analysis",
"capabilities": [],
"volumes": [
"~/.fuzzforge/hub/workspace:/data"
],
"enabled": true
},
{
"name": "capa-mcp",
"description": "Static capability detection using capa - identifies malware capabilities in binaries",
"type": "docker",
"image": "capa-mcp:latest",
"category": "binary-analysis",
"capabilities": [],
"volumes": [
"~/.fuzzforge/hub/workspace:/data"
],
"enabled": true
},
{
"name": "radare2-mcp",
"description": "Binary analysis and reverse engineering using radare2",
"type": "docker",
"image": "radare2-mcp:latest",
"category": "binary-analysis",
"capabilities": [],
"volumes": [
"~/.fuzzforge/hub/workspace:/data"
],
"enabled": true
},
{
"name": "ghidra-mcp",
"description": "Advanced binary decompilation and reverse engineering using Ghidra",
"type": "docker",
"image": "ghcr.io/clearbluejar/pyghidra-mcp:latest",
"category": "binary-analysis",
"capabilities": [],
"volumes": [
"~/.fuzzforge/hub/workspace:/data"
],
"enabled": true
},
{
"name": "searchsploit-mcp",
"description": "CVE and exploit search using SearchSploit / Exploit-DB",
"type": "docker",
"image": "searchsploit-mcp:latest",
"category": "exploitation",
"capabilities": [],
"volumes": [
"~/.fuzzforge/hub/workspace:/data"
],
"enabled": true
},
{
"name": "nuclei-mcp",
"description": "Vulnerability scanning using Nuclei templates",
"type": "docker",
"image": "nuclei-mcp:latest",
"category": "web-security",
"capabilities": [
"NET_RAW"
],
"volumes": [
"~/.fuzzforge/hub/workspace:/data"
],
"enabled": true
},
{
"name": "trivy-mcp",
"description": "Container and filesystem vulnerability scanning using Trivy",
"type": "docker",
"image": "trivy-mcp:latest",
"category": "cloud-security",
"capabilities": [],
"volumes": [
"~/.fuzzforge/hub/workspace:/data"
],
"enabled": true
},
{
"name": "gitleaks-mcp",
"description": "Secret and credential detection in code and firmware using Gitleaks",
"type": "docker",
"image": "gitleaks-mcp:latest",
"category": "secrets",
"capabilities": [],
"volumes": [
"~/.fuzzforge/hub/workspace:/data"
],
"enabled": true
},
{
"name": "bloodhound-mcp",
"description": "bloodhound-mcp \u2014 active-directory",
@@ -129,7 +8,46 @@
"category": "active-directory",
"capabilities": [],
"volumes": [
"/home/afredefon/FuzzingLabs/FuzzForge/fuzzforge-oss/.fuzzforge/hub/workspace:/data"
"/home/afredefon/.fuzzforge/hub/workspace:/data"
],
"enabled": true,
"source_hub": "mcp-security-hub"
},
{
"name": "binwalk-mcp",
"description": "binwalk-mcp \u2014 binary-analysis",
"type": "docker",
"image": "binwalk-mcp:latest",
"category": "binary-analysis",
"capabilities": [],
"volumes": [
"/home/afredefon/.fuzzforge/hub/workspace:/data"
],
"enabled": true,
"source_hub": "mcp-security-hub"
},
{
"name": "capa-mcp",
"description": "capa-mcp \u2014 binary-analysis",
"type": "docker",
"image": "capa-mcp:latest",
"category": "binary-analysis",
"capabilities": [],
"volumes": [
"/home/afredefon/.fuzzforge/hub/workspace:/data"
],
"enabled": true,
"source_hub": "mcp-security-hub"
},
{
"name": "ghidra-mcp",
"description": "ghidra-mcp \u2014 binary-analysis",
"type": "docker",
"image": "ghidra-mcp:latest",
"category": "binary-analysis",
"capabilities": [],
"volumes": [
"/home/afredefon/.fuzzforge/hub/workspace:/data"
],
"enabled": true,
"source_hub": "mcp-security-hub"
@@ -142,7 +60,33 @@
"category": "binary-analysis",
"capabilities": [],
"volumes": [
"/home/afredefon/FuzzingLabs/FuzzForge/fuzzforge-oss/.fuzzforge/hub/workspace:/data"
"/home/afredefon/.fuzzforge/hub/workspace:/data"
],
"enabled": true,
"source_hub": "mcp-security-hub"
},
{
"name": "radare2-mcp",
"description": "radare2-mcp \u2014 binary-analysis",
"type": "docker",
"image": "radare2-mcp:latest",
"category": "binary-analysis",
"capabilities": [],
"volumes": [
"/home/afredefon/.fuzzforge/hub/workspace:/data"
],
"enabled": true,
"source_hub": "mcp-security-hub"
},
{
"name": "yara-mcp",
"description": "yara-mcp \u2014 binary-analysis",
"type": "docker",
"image": "yara-mcp:latest",
"category": "binary-analysis",
"capabilities": [],
"volumes": [
"/home/afredefon/.fuzzforge/hub/workspace:/data"
],
"enabled": true,
"source_hub": "mcp-security-hub"
@@ -155,7 +99,7 @@
"category": "blockchain",
"capabilities": [],
"volumes": [
"/home/afredefon/FuzzingLabs/FuzzForge/fuzzforge-oss/.fuzzforge/hub/workspace:/data"
"/home/afredefon/.fuzzforge/hub/workspace:/data"
],
"enabled": true,
"source_hub": "mcp-security-hub"
@@ -168,7 +112,7 @@
"category": "blockchain",
"capabilities": [],
"volumes": [
"/home/afredefon/FuzzingLabs/FuzzForge/fuzzforge-oss/.fuzzforge/hub/workspace:/data"
"/home/afredefon/.fuzzforge/hub/workspace:/data"
],
"enabled": true,
"source_hub": "mcp-security-hub"
@@ -181,7 +125,7 @@
"category": "blockchain",
"capabilities": [],
"volumes": [
"/home/afredefon/FuzzingLabs/FuzzForge/fuzzforge-oss/.fuzzforge/hub/workspace:/data"
"/home/afredefon/.fuzzforge/hub/workspace:/data"
],
"enabled": true,
"source_hub": "mcp-security-hub"
@@ -194,7 +138,7 @@
"category": "cloud-security",
"capabilities": [],
"volumes": [
"/home/afredefon/FuzzingLabs/FuzzForge/fuzzforge-oss/.fuzzforge/hub/workspace:/data"
"/home/afredefon/.fuzzforge/hub/workspace:/data"
],
"enabled": true,
"source_hub": "mcp-security-hub"
@@ -207,7 +151,20 @@
"category": "cloud-security",
"capabilities": [],
"volumes": [
"/home/afredefon/FuzzingLabs/FuzzForge/fuzzforge-oss/.fuzzforge/hub/workspace:/data"
"/home/afredefon/.fuzzforge/hub/workspace:/data"
],
"enabled": true,
"source_hub": "mcp-security-hub"
},
{
"name": "trivy-mcp",
"description": "trivy-mcp \u2014 cloud-security",
"type": "docker",
"image": "trivy-mcp:latest",
"category": "cloud-security",
"capabilities": [],
"volumes": [
"/home/afredefon/.fuzzforge/hub/workspace:/data"
],
"enabled": true,
"source_hub": "mcp-security-hub"
@@ -220,7 +177,20 @@
"category": "code-security",
"capabilities": [],
"volumes": [
"/home/afredefon/FuzzingLabs/FuzzForge/fuzzforge-oss/.fuzzforge/hub/workspace:/data"
"/home/afredefon/.fuzzforge/hub/workspace:/data"
],
"enabled": true,
"source_hub": "mcp-security-hub"
},
{
"name": "searchsploit-mcp",
"description": "searchsploit-mcp \u2014 exploitation",
"type": "docker",
"image": "searchsploit-mcp:latest",
"category": "exploitation",
"capabilities": [],
"volumes": [
"/home/afredefon/.fuzzforge/hub/workspace:/data"
],
"enabled": true,
"source_hub": "mcp-security-hub"
@@ -233,7 +203,7 @@
"category": "fuzzing",
"capabilities": [],
"volumes": [
"/home/afredefon/FuzzingLabs/FuzzForge/fuzzforge-oss/.fuzzforge/hub/workspace:/data"
"/home/afredefon/.fuzzforge/hub/workspace:/data"
],
"enabled": true,
"source_hub": "mcp-security-hub"
@@ -246,7 +216,7 @@
"category": "fuzzing",
"capabilities": [],
"volumes": [
"/home/afredefon/FuzzingLabs/FuzzForge/fuzzforge-oss/.fuzzforge/hub/workspace:/data"
"/home/afredefon/.fuzzforge/hub/workspace:/data"
],
"enabled": true,
"source_hub": "mcp-security-hub"
@@ -259,7 +229,7 @@
"category": "osint",
"capabilities": [],
"volumes": [
"/home/afredefon/FuzzingLabs/FuzzForge/fuzzforge-oss/.fuzzforge/hub/workspace:/data"
"/home/afredefon/.fuzzforge/hub/workspace:/data"
],
"enabled": true,
"source_hub": "mcp-security-hub"
@@ -272,7 +242,7 @@
"category": "osint",
"capabilities": [],
"volumes": [
"/home/afredefon/FuzzingLabs/FuzzForge/fuzzforge-oss/.fuzzforge/hub/workspace:/data"
"/home/afredefon/.fuzzforge/hub/workspace:/data"
],
"enabled": true,
"source_hub": "mcp-security-hub"
@@ -285,7 +255,7 @@
"category": "password-cracking",
"capabilities": [],
"volumes": [
"/home/afredefon/FuzzingLabs/FuzzForge/fuzzforge-oss/.fuzzforge/hub/workspace:/data"
"/home/afredefon/.fuzzforge/hub/workspace:/data"
],
"enabled": true,
"source_hub": "mcp-security-hub"
@@ -300,7 +270,7 @@
"NET_RAW"
],
"volumes": [
"/home/afredefon/FuzzingLabs/FuzzForge/fuzzforge-oss/.fuzzforge/hub/workspace:/data"
"/home/afredefon/.fuzzforge/hub/workspace:/data"
],
"enabled": true,
"source_hub": "mcp-security-hub"
@@ -315,7 +285,7 @@
"NET_RAW"
],
"volumes": [
"/home/afredefon/FuzzingLabs/FuzzForge/fuzzforge-oss/.fuzzforge/hub/workspace:/data"
"/home/afredefon/.fuzzforge/hub/workspace:/data"
],
"enabled": true,
"source_hub": "mcp-security-hub"
@@ -330,7 +300,22 @@
"NET_RAW"
],
"volumes": [
"/home/afredefon/FuzzingLabs/FuzzForge/fuzzforge-oss/.fuzzforge/hub/workspace:/data"
"/home/afredefon/.fuzzforge/hub/workspace:/data"
],
"enabled": true,
"source_hub": "mcp-security-hub"
},
{
"name": "nmap-mcp",
"description": "nmap-mcp \u2014 reconnaissance",
"type": "docker",
"image": "nmap-mcp:latest",
"category": "reconnaissance",
"capabilities": [
"NET_RAW"
],
"volumes": [
"/home/afredefon/.fuzzforge/hub/workspace:/data"
],
"enabled": true,
"source_hub": "mcp-security-hub"
@@ -345,7 +330,7 @@
"NET_RAW"
],
"volumes": [
"/home/afredefon/FuzzingLabs/FuzzForge/fuzzforge-oss/.fuzzforge/hub/workspace:/data"
"/home/afredefon/.fuzzforge/hub/workspace:/data"
],
"enabled": true,
"source_hub": "mcp-security-hub"
@@ -360,7 +345,7 @@
"NET_RAW"
],
"volumes": [
"/home/afredefon/FuzzingLabs/FuzzForge/fuzzforge-oss/.fuzzforge/hub/workspace:/data"
"/home/afredefon/.fuzzforge/hub/workspace:/data"
],
"enabled": true,
"source_hub": "mcp-security-hub"
@@ -375,7 +360,7 @@
"NET_RAW"
],
"volumes": [
"/home/afredefon/FuzzingLabs/FuzzForge/fuzzforge-oss/.fuzzforge/hub/workspace:/data"
"/home/afredefon/.fuzzforge/hub/workspace:/data"
],
"enabled": true,
"source_hub": "mcp-security-hub"
@@ -390,7 +375,20 @@
"NET_RAW"
],
"volumes": [
"/home/afredefon/FuzzingLabs/FuzzForge/fuzzforge-oss/.fuzzforge/hub/workspace:/data"
"/home/afredefon/.fuzzforge/hub/workspace:/data"
],
"enabled": true,
"source_hub": "mcp-security-hub"
},
{
"name": "gitleaks-mcp",
"description": "gitleaks-mcp \u2014 secrets",
"type": "docker",
"image": "gitleaks-mcp:latest",
"category": "secrets",
"capabilities": [],
"volumes": [
"/home/afredefon/.fuzzforge/hub/workspace:/data"
],
"enabled": true,
"source_hub": "mcp-security-hub"
@@ -403,7 +401,7 @@
"category": "threat-intel",
"capabilities": [],
"volumes": [
"/home/afredefon/FuzzingLabs/FuzzForge/fuzzforge-oss/.fuzzforge/hub/workspace:/data"
"/home/afredefon/.fuzzforge/hub/workspace:/data"
],
"enabled": true,
"source_hub": "mcp-security-hub"
@@ -416,7 +414,7 @@
"category": "threat-intel",
"capabilities": [],
"volumes": [
"/home/afredefon/FuzzingLabs/FuzzForge/fuzzforge-oss/.fuzzforge/hub/workspace:/data"
"/home/afredefon/.fuzzforge/hub/workspace:/data"
],
"enabled": true,
"source_hub": "mcp-security-hub"
@@ -431,7 +429,7 @@
"NET_RAW"
],
"volumes": [
"/home/afredefon/FuzzingLabs/FuzzForge/fuzzforge-oss/.fuzzforge/hub/workspace:/data"
"/home/afredefon/.fuzzforge/hub/workspace:/data"
],
"enabled": true,
"source_hub": "mcp-security-hub"
@@ -446,7 +444,7 @@
"NET_RAW"
],
"volumes": [
"/home/afredefon/FuzzingLabs/FuzzForge/fuzzforge-oss/.fuzzforge/hub/workspace:/data"
"/home/afredefon/.fuzzforge/hub/workspace:/data"
],
"enabled": true,
"source_hub": "mcp-security-hub"
@@ -461,7 +459,22 @@
"NET_RAW"
],
"volumes": [
"/home/afredefon/FuzzingLabs/FuzzForge/fuzzforge-oss/.fuzzforge/hub/workspace:/data"
"/home/afredefon/.fuzzforge/hub/workspace:/data"
],
"enabled": true,
"source_hub": "mcp-security-hub"
},
{
"name": "nuclei-mcp",
"description": "nuclei-mcp \u2014 web-security",
"type": "docker",
"image": "nuclei-mcp:latest",
"category": "web-security",
"capabilities": [
"NET_RAW"
],
"volumes": [
"/home/afredefon/.fuzzforge/hub/workspace:/data"
],
"enabled": true,
"source_hub": "mcp-security-hub"
@@ -476,7 +489,7 @@
"NET_RAW"
],
"volumes": [
"/home/afredefon/FuzzingLabs/FuzzForge/fuzzforge-oss/.fuzzforge/hub/workspace:/data"
"/home/afredefon/.fuzzforge/hub/workspace:/data"
],
"enabled": true,
"source_hub": "mcp-security-hub"
@@ -491,12 +504,63 @@
"NET_RAW"
],
"volumes": [
"/home/afredefon/FuzzingLabs/FuzzForge/fuzzforge-oss/.fuzzforge/hub/workspace:/data"
"/home/afredefon/.fuzzforge/hub/workspace:/data"
],
"enabled": true,
"source_hub": "mcp-security-hub"
},
{
"name": "go-analyzer-mcp",
"description": "Go static analysis: fuzzable entry points, existing Fuzz* targets, unsafe/cgo usage, CVE scanning via govulncheck",
"type": "docker",
"image": "go-analyzer-mcp:latest",
"category": "code-security",
"capabilities": [],
"volumes": [
"/home/afredefon/.fuzzforge/hub/workspace:/data"
],
"enabled": true,
"source_hub": "mcp-security-hub"
},
{
"name": "go-harness-tester-mcp",
"description": "Test Go fuzz harness quality: compilation, seed execution, fuzzing trial, quality scoring 0-100",
"type": "docker",
"image": "go-harness-tester-mcp:latest",
"category": "code-security",
"capabilities": [],
"volumes": [
"/home/afredefon/.fuzzforge/hub/workspace:/data"
],
"enabled": true,
"source_hub": "mcp-security-hub"
},
{
"name": "go-fuzzer-mcp",
"description": "Run Go native fuzzing (go test -fuzz) with blocking and continuous modes, crash collection, session management",
"type": "docker",
"image": "go-fuzzer-mcp:latest",
"category": "code-security",
"capabilities": [],
"volumes": [
"/home/afredefon/.fuzzforge/hub/workspace:/data"
],
"enabled": true,
"source_hub": "mcp-security-hub"
},
{
"name": "go-crash-analyzer-mcp",
"description": "Analyze Go fuzzing crashes: reproduce, classify (nil-deref, OOR, panic, race, etc.), deduplicate by stack signature",
"type": "docker",
"image": "go-crash-analyzer-mcp:latest",
"category": "code-security",
"capabilities": [],
"volumes": [
"/home/afredefon/.fuzzforge/hub/workspace:/data"
],
"enabled": true,
"source_hub": "mcp-security-hub"
}
],
"default_timeout": 300,
"cache_tools": true
"workflow_hints_file": "mcp-security-hub/workflow-hints.json"
}

View File

@@ -1,12 +1,17 @@
[project]
name = "fuzzforge-oss"
version = "1.0.0"
version = "0.8.0"
description = "FuzzForge AI - AI-driven security research platform for local execution"
readme = "README.md"
requires-python = ">=3.14"
authors = [
{ name = "FuzzingLabs", email = "contact@fuzzinglabs.com" }
]
dependencies = [
"fuzzforge-cli",
"fuzzforge-mcp",
"fuzzforge-common",
]
[project.optional-dependencies]
dev = [

12
uv.lock generated
View File

@@ -432,6 +432,7 @@ dependencies = [
{ name = "fuzzforge-common" },
{ name = "pydantic" },
{ name = "pydantic-settings" },
{ name = "pyyaml" },
{ name = "structlog" },
]
@@ -460,6 +461,7 @@ requires-dist = [
{ name = "pytest", marker = "extra == 'tests'", specifier = "==9.0.2" },
{ name = "pytest-asyncio", marker = "extra == 'tests'", specifier = "==1.3.0" },
{ name = "pytest-httpx", marker = "extra == 'tests'", specifier = "==0.36.0" },
{ name = "pyyaml", specifier = ">=6.0" },
{ name = "ruff", marker = "extra == 'lints'", specifier = "==0.14.4" },
{ name = "structlog", specifier = "==25.5.0" },
]
@@ -467,8 +469,13 @@ provides-extras = ["lints", "tests"]
[[package]]
name = "fuzzforge-oss"
version = "1.0.0"
version = "0.8.0"
source = { virtual = "." }
dependencies = [
{ name = "fuzzforge-cli" },
{ name = "fuzzforge-common" },
{ name = "fuzzforge-mcp" },
]
[package.optional-dependencies]
dev = [
@@ -482,7 +489,10 @@ dev = [
[package.metadata]
requires-dist = [
{ name = "fuzzforge-cli", editable = "fuzzforge-cli" },
{ name = "fuzzforge-common", editable = "fuzzforge-common" },
{ name = "fuzzforge-common", marker = "extra == 'dev'", editable = "fuzzforge-common" },
{ name = "fuzzforge-mcp", editable = "fuzzforge-mcp" },
{ name = "fuzzforge-mcp", marker = "extra == 'dev'", editable = "fuzzforge-mcp" },
{ name = "fuzzforge-tests", marker = "extra == 'dev'", editable = "fuzzforge-tests" },
{ name = "pytest", marker = "extra == 'dev'", specifier = "==9.0.2" },