Compare commits

...

25 Commits

Author SHA1 Message Date
AFredefon
664278da3f feat: implement artifact management tools 2026-04-07 16:06:47 +02:00
AFredefon
9374fd3aee feat: implement workflow suggestions pipeline 2026-04-07 01:50:21 +02:00
AFredefon
2e96517d11 chore: reset hub-config.json to empty default and gitignore it 2026-03-17 08:08:31 +01:00
AFredefon
575b90f8d4 docs: rewrite README for hub-centric architecture, remove demo GIFs 2026-03-17 07:58:26 +01:00
AFredefon
c59b6ba81a fix(mcp): fix mypy type error in executions resource and improve tool docstrings 2026-03-17 04:21:06 +01:00
AFredefon
a51c495d34 feat(mcp): update application instructions and hub config 2026-03-16 02:10:16 +01:00
AFredefon
7924e44245 feat(hub): volume mounts, get_agent_context convention, category filter 2026-03-16 02:09:04 +01:00
AFredefon
a824809294 feat(mcp): add project assets storage and output directory management 2026-03-16 02:08:20 +01:00
AFredefon
bc5e9373ce fix: document mount paths in execute_hub_tool and inject volumes into persistent sessions 2026-03-11 07:55:58 +01:00
AFredefon
73a0170d65 fix: resolve mypy type errors in TUI app and build_log screen 2026-03-11 07:09:58 +01:00
AFredefon
6cdd0caec0 fix: suppress BLE001 for intentional broad catch in execute_hub_tool 2026-03-11 07:00:25 +01:00
AFredefon
462f6ed408 fix: resolve ruff lint errors in TUI modules 2026-03-11 06:57:38 +01:00
AFredefon
9cfbc29677 fix: add noqa for optional git URL fetch exception 2026-03-11 06:49:37 +01:00
AFredefon
6ced81affc fix: inject project assets as Docker volume mounts in execute_hub_tool 2026-03-11 06:46:51 +01:00
AFredefon
b975d285c6 tui: fix single-click buttons and double-modal push 2026-03-11 05:30:29 +01:00
AFredefon
1891a43189 tui: background image builds with live log viewer 2026-03-11 04:42:25 +01:00
AFredefon
a3441676a3 tui: in-UI image building, hub registry auto-recovery, clean hub-config 2026-03-11 03:02:36 +01:00
AFredefon
f192771b9b fix: improve new user experience and docs 2026-03-11 02:20:18 +01:00
AFredefon
976947cf5c feat: add FUZZFORGE_USER_DIR env var to override user-global data dir 2026-03-11 02:09:06 +01:00
AFredefon
544569ddbd fix: use ~/.fuzzforge for user-global data, keep workspace .fuzzforge for project storage 2026-03-11 02:04:51 +01:00
AFredefon
6f967fff63 fix: find_fuzzforge_root searches cwd first instead of __file__ 2026-03-11 01:41:47 +01:00
AFredefon
47c254e2bd fix: add workspace packages as root deps so uv sync installs everything 2026-03-11 01:32:17 +01:00
AFredefon
b137f48e7f fix(ci): use uv sync 2026-03-11 01:17:43 +01:00
AFredefon
f8002254e5 ci: add GitHub Actions workflows with lint, typecheck and tests 2026-03-11 01:13:35 +01:00
AFredefon
f2dca0a7e7 Merge pull request #45 from FuzzingLabs/feature/tui-agent-setup
feat(tui): add terminal UI with hub and agent management
2026-03-10 04:11:50 +01:00
32 changed files with 1776 additions and 398 deletions

86
.github/workflows/ci.yml vendored Normal file
View File

@@ -0,0 +1,86 @@
name: CI
on:
push:
branches: [main, dev, feature/*]
pull_request:
branches: [main, dev]
workflow_dispatch:
jobs:
lint-and-typecheck:
name: Lint & Type Check
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v4
- name: Install uv
uses: astral-sh/setup-uv@v5
with:
version: "latest"
- name: Set up Python
run: uv python install 3.14
- name: Install dependencies
run: uv sync
- name: Ruff check (fuzzforge-cli)
run: |
cd fuzzforge-cli
uv run --extra lints ruff check src/
- name: Ruff check (fuzzforge-mcp)
run: |
cd fuzzforge-mcp
uv run --extra lints ruff check src/
- name: Ruff check (fuzzforge-common)
run: |
cd fuzzforge-common
uv run --extra lints ruff check src/
- name: Mypy type check (fuzzforge-cli)
run: |
cd fuzzforge-cli
uv run --extra lints mypy src/
- name: Mypy type check (fuzzforge-mcp)
run: |
cd fuzzforge-mcp
uv run --extra lints mypy src/
# NOTE: Mypy check for fuzzforge-common temporarily disabled
# due to 37 pre-existing type errors in legacy code.
# TODO: Fix type errors and re-enable strict checking
#- name: Mypy type check (fuzzforge-common)
# run: |
# cd fuzzforge-common
# uv run --extra lints mypy src/
test:
name: Tests
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v4
- name: Install uv
uses: astral-sh/setup-uv@v5
with:
version: "latest"
- name: Set up Python
run: uv python install 3.14
- name: Install dependencies
run: uv sync --all-extras
- name: Run MCP tests
run: |
cd fuzzforge-mcp
uv run --extra tests pytest -v
- name: Run common tests
run: |
cd fuzzforge-common
uv run --extra tests pytest -v

49
.github/workflows/mcp-server.yml vendored Normal file
View File

@@ -0,0 +1,49 @@
name: MCP Server Smoke Test
on:
push:
branches: [main, dev]
pull_request:
branches: [main, dev]
workflow_dispatch:
jobs:
mcp-server:
name: MCP Server Test
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v4
- name: Install uv
uses: astral-sh/setup-uv@v5
with:
version: "latest"
- name: Set up Python
run: uv python install 3.14
- name: Install dependencies
run: uv sync --all-extras
- name: Start MCP server in background
run: |
cd fuzzforge-mcp
nohup uv run python -m fuzzforge_mcp.server > server.log 2>&1 &
echo $! > server.pid
sleep 3
- name: Run MCP tool tests
run: |
cd fuzzforge-mcp
uv run --extra tests pytest tests/test_resources.py -v
- name: Stop MCP server
if: always()
run: |
if [ -f fuzzforge-mcp/server.pid ]; then
kill $(cat fuzzforge-mcp/server.pid) || true
fi
- name: Show server logs
if: failure()
run: cat fuzzforge-mcp/server.log || true

3
.gitignore vendored
View File

@@ -10,3 +10,6 @@ __pycache__
# Podman/Docker container storage artifacts
~/.fuzzforge/
# User-specific hub config (generated at runtime)
hub-config.json

242
README.md
View File

@@ -17,9 +17,9 @@
<sub>
<a href="#-overview"><b>Overview</b></a> •
<a href="#-features"><b>Features</b></a> •
<a href="#-mcp-security-hub"><b>Security Hub</b></a> •
<a href="#-installation"><b>Installation</b></a> •
<a href="USAGE.md"><b>Usage Guide</b></a> •
<a href="#-modules"><b>Modules</b></a> •
<a href="#-contributing"><b>Contributing</b></a>
</sub>
</p>
@@ -32,39 +32,44 @@
## 🚀 Overview
**FuzzForge AI** is an open-source runtime that enables AI agents (GitHub Copilot, Claude, etc.) to orchestrate security research workflows through the **Model Context Protocol (MCP)**.
**FuzzForge AI** is an open-source MCP server that enables AI agents (GitHub Copilot, Claude, etc.) to orchestrate security research workflows through the **Model Context Protocol (MCP)**.
### The Core: Modules
FuzzForge connects your AI assistant to **MCP tool hubs** — collections of containerized security tools that the agent can discover, chain, and execute autonomously. Instead of manually running security tools, describe what you want and let your AI assistant handle it.
At the heart of FuzzForge are **modules** - containerized security tools that AI agents can discover, configure, and orchestrate. Each module encapsulates a specific security capability (static analysis, fuzzing, crash analysis, etc.) and runs in an isolated container.
### The Core: Hub Architecture
- **🔌 Plug & Play**: Modules are self-contained - just pull and run
- **🤖 AI-Native**: Designed for AI agent orchestration via MCP
- **🔗 Composable**: Chain modules together into automated workflows
- **📦 Extensible**: Build custom modules with the Python SDK
FuzzForge acts as a **meta-MCP server** — a single MCP endpoint that gives your AI agent access to tools from multiple MCP hub servers. Each hub server is a containerized security tool (Binwalk, YARA, Radare2, Nmap, etc.) that the agent can discover at runtime.
FuzzForge AI handles module discovery, execution, and result collection. Security modules (developed separately) provide the actual security tooling - from static analyzers to fuzzers to crash triagers.
- **🔍 Discovery**: The agent lists available hub servers and discovers their tools
- **🤖 AI-Native**: Hub tools provide agent context — usage tips, workflow guidance, and domain knowledge
- **🔗 Composable**: Chain tools from different hubs into automated pipelines
- **📦 Extensible**: Add your own MCP servers to the hub registry
Instead of manually running security tools, describe what you want and let your AI assistant handle it.
### 🎬 Use Case: Firmware Vulnerability Research
> **Scenario**: Analyze a firmware image to find security vulnerabilities — fully automated by an AI agent.
```
User: "Search for vulnerabilities in firmware.bin"
Agent → Binwalk: Extract filesystem from firmware image
Agent → YARA: Scan extracted files for vulnerability patterns
Agent → Radare2: Trace dangerous function calls in prioritized binaries
Agent → Report: 8 vulnerabilities found (2 critical, 4 high, 2 medium)
```
### 🎬 Use Case: Rust Fuzzing Pipeline
> **Scenario**: Fuzz a Rust crate to discover vulnerabilities using AI-assisted harness generation and parallel fuzzing.
<table align="center">
<tr>
<th>1⃣ Analyze, Generate & Validate Harnesses</th>
<th>2⃣ Run Parallel Continuous Fuzzing</th>
</tr>
<tr>
<td><img src="assets/demopart2.gif" alt="FuzzForge Demo - Analysis Pipeline" width="100%"></td>
<td><img src="assets/demopart1.gif" alt="FuzzForge Demo - Parallel Fuzzing" width="100%"></td>
</tr>
<tr>
<td align="center"><sub>AI agent analyzes code, generates harnesses, and validates they compile</sub></td>
<td align="center"><sub>Multiple fuzzing sessions run in parallel with live metrics</sub></td>
</tr>
</table>
```
User: "Fuzz the blurhash crate for vulnerabilities"
Agent → Rust Analyzer: Identify fuzzable functions and attack surface
Agent → Harness Gen: Generate and validate fuzzing harnesses
Agent → Cargo Fuzzer: Run parallel coverage-guided fuzzing sessions
Agent → Crash Analysis: Deduplicate and triage discovered crashes
```
---
@@ -82,13 +87,13 @@ If you find FuzzForge useful, please **star the repo** to support development!
| Feature | Description |
|---------|-------------|
| 🤖 **AI-Native** | Built for MCP - works with GitHub Copilot, Claude, and any MCP-compatible agent |
| 📦 **Containerized** | Each module runs in isolation via Docker or Podman |
| 🔄 **Continuous Mode** | Long-running tasks (fuzzing) with real-time metrics streaming |
| 🔗 **Workflows** | Chain multiple modules together in automated pipelines |
| 🛠️ **Extensible** | Create custom modules with the Python SDK |
| 🏠 **Local First** | All execution happens on your machine - no cloud required |
| 🔒 **Secure** | Sandboxed containers with no network access by default |
| 🤖 **AI-Native** | Built for MCP works with GitHub Copilot, Claude, and any MCP-compatible agent |
| 🔌 **Hub System** | Connect to MCP tool hubs — each hub brings dozens of containerized security tools |
| 🔍 **Tool Discovery** | Agents discover available tools at runtime with built-in usage guidance |
| 🔗 **Pipelines** | Chain tools from different hubs into automated multi-step workflows |
| 🔄 **Persistent Sessions** | Long-running tools (Radare2, fuzzers) with stateful container sessions |
| 🏠 **Local First** | All execution happens on your machine no cloud required |
| 🔒 **Sandboxed** | Every tool runs in an isolated container via Docker or Podman |
---
@@ -102,27 +107,57 @@ If you find FuzzForge useful, please **star the repo** to support development!
┌─────────────────────────────────────────────────────────────────┐
│ FuzzForge MCP Server │
┌─────────────┐ ┌──────────────┐ ┌────────────────────────┐
│list_modules │ │execute_module│ │start_continuous_module │
───────────── ──────────────┘ └────────────────────────┘
Projects Hub Discovery Hub Execution
┌────────────── ──────────────────┐ ┌───────────────────
│ │init_project │ │list_hub_servers │ │execute_hub_tool │ │
│ │set_assets │ │discover_hub_tools│ │start_hub_server │ │
│ │list_results │ │get_tool_schema │ │stop_hub_server │ │
│ └──────────────┘ └──────────────────┘ └───────────────────┘ │
└───────────────────────────┬─────────────────────────────────────┘
Docker/Podman
┌─────────────────────────────────────────────────────────────────┐
FuzzForge Runner
Container Engine (Docker/Podman)
└────────────────────────────────────────────────────────────────
┌───────────────────┼───────────────────┐
▼ ▼
───────────────┐ ┌───────────────┐ ┌───────────────┐
Module A │ Module B │ │ Module C
(Container) │ │ (Container) (Container)
───────────────┘ └───────────────┘ └───────────────┘
MCP Hub Servers
│ ┌───────────┐ ┌───────────┐ ┌───────────┐ ┌───────────┐ │
│ Binwalk YARA │ Radare2 │ │ Nmap │
6 tools │ │ 5 tools │ │ 32 tools │ │ 8 tools │ │
│ └───────────┘ └───────────┘ └───────────┘ └───────────┘
┌───────────┐ ─────────── ┌───────────┐ ┌───────────┐
│ Nuclei SQLMap │ │ Trivy │ │ ...
│ 7 tools │ │ 8 tools │ │ 7 tools │ │ 36 hubs │
└───────────┘ ─────────── └───────────┘ └───────────┘
└─────────────────────────────────────────────────────────────────┘
```
---
## 🔧 MCP Security Hub
FuzzForge ships with built-in support for the **[MCP Security Hub](https://github.com/FuzzingLabs/mcp-security-hub)** — a collection of 36 production-ready, Dockerized MCP servers covering offensive security:
| Category | Servers | Examples |
|----------|---------|----------|
| 🔍 **Reconnaissance** | 8 | Nmap, Masscan, Shodan, WhatWeb |
| 🌐 **Web Security** | 6 | Nuclei, SQLMap, ffuf, Nikto |
| 🔬 **Binary Analysis** | 6 | Radare2, Binwalk, YARA, Capa, Ghidra |
| ⛓️ **Blockchain** | 3 | Medusa, Solazy, DAML Viewer |
| ☁️ **Cloud Security** | 3 | Trivy, Prowler, RoadRecon |
| 💻 **Code Security** | 1 | Semgrep |
| 🔑 **Secrets Detection** | 1 | Gitleaks |
| 💥 **Exploitation** | 1 | SearchSploit |
| 🎯 **Fuzzing** | 2 | Boofuzz, Dharma |
| 🕵️ **OSINT** | 2 | Maigret, DNSTwist |
| 🛡️ **Threat Intel** | 2 | VirusTotal, AlienVault OTX |
| 🏰 **Active Directory** | 1 | BloodHound |
> 185+ individual tools accessible through a single MCP connection.
The hub is open source and can be extended with your own MCP servers. See the [mcp-security-hub repository](https://github.com/FuzzingLabs/mcp-security-hub) for details.
---
## 📦 Installation
### Prerequisites
@@ -140,11 +175,20 @@ cd fuzzforge_ai
# Install dependencies
uv sync
# Build module images
make build-modules
```
### Link the Security Hub
```bash
# Clone the MCP Security Hub
git clone https://github.com/FuzzingLabs/mcp-security-hub.git ~/.fuzzforge/hubs/mcp-security-hub
# Build the Docker images for the hub tools
./scripts/build-hub-images.sh
```
Or use the terminal UI (`uv run fuzzforge ui`) to link hubs interactively.
### Configure MCP for Your AI Agent
```bash
@@ -165,81 +209,20 @@ uv run fuzzforge mcp status
---
## 📦 Modules
## 🧑‍💻 Usage
FuzzForge modules are containerized security tools that AI agents can orchestrate. The module ecosystem is designed around a simple principle: **the OSS runtime orchestrates, enterprise modules execute**.
Once installed, just talk to your AI agent:
### Module Ecosystem
| | FuzzForge AI | FuzzForge Enterprise Modules |
|---|---|---|
| **What** | Runtime & MCP server | Security research modules |
| **License** | Apache 2.0 | BSL 1.1 (Business Source License) |
| **Compatibility** | ✅ Runs any compatible module | ✅ Works with FuzzForge AI |
**Enterprise modules** are developed separately and provide production-ready security tooling:
| Category | Modules | Description |
|----------|---------|-------------|
| 🔍 **Static Analysis** | Rust Analyzer, Solidity Analyzer, Cairo Analyzer | Code analysis and fuzzable function detection |
| 🎯 **Fuzzing** | Cargo Fuzzer, Honggfuzz, AFL++ | Coverage-guided fuzz testing |
| 💥 **Crash Analysis** | Crash Triager, Root Cause Analyzer | Automated crash deduplication and analysis |
| 🔐 **Vulnerability Detection** | Pattern Matcher, Taint Analyzer | Security vulnerability scanning |
| 📝 **Reporting** | Report Generator, SARIF Exporter | Automated security report generation |
> 💡 **Build your own modules!** The FuzzForge SDK allows you to create custom modules that integrate seamlessly with FuzzForge AI. See [Creating Custom Modules](#-creating-custom-modules).
### Execution Modes
Modules run in two execution modes:
#### One-shot Execution
Run a module once and get results:
```python
result = execute_module("my-analyzer", assets_path="/path/to/project")
```
"What security tools are available?"
"Scan this firmware image for vulnerabilities"
"Analyze this binary with radare2"
"Run nuclei against https://example.com"
```
#### Continuous Execution
The agent will use FuzzForge to discover the right hub tools, chain them into a pipeline, and return results — all without you touching a terminal.
For long-running tasks like fuzzing, with real-time metrics:
```python
# Start continuous execution
session = start_continuous_module("my-fuzzer",
assets_path="/path/to/project",
configuration={"target": "my_target"})
# Check status with live metrics
status = get_continuous_status(session["session_id"])
# Stop and collect results
stop_continuous_module(session["session_id"])
```
---
## 🛠️ Creating Custom Modules
Build your own security modules with the FuzzForge SDK:
```python
from fuzzforge_modules_sdk import FuzzForgeModule, FuzzForgeModuleResults
class MySecurityModule(FuzzForgeModule):
def _run(self, resources):
self.emit_event("started", target=resources[0].path)
# Your analysis logic here
results = self.analyze(resources)
self.emit_progress(100, status="completed",
message=f"Analysis complete")
return FuzzForgeModuleResults.SUCCESS
```
📖 See the [Module SDK Guide](fuzzforge-modules/fuzzforge-modules-sdk/README.md) for details.
See the [Usage Guide](USAGE.md) for detailed setup and advanced workflows.
---
@@ -247,26 +230,17 @@ class MySecurityModule(FuzzForgeModule):
```
fuzzforge_ai/
├── fuzzforge-cli/ # Command-line interface
├── fuzzforge-mcp/ # MCP server — the core of FuzzForge
├── fuzzforge-cli/ # Command-line interface & terminal UI
├── fuzzforge-common/ # Shared abstractions (containers, storage)
├── fuzzforge-mcp/ # MCP server for AI agents
├── fuzzforge-modules/ # Security modules
│ └── fuzzforge-modules-sdk/ # Module development SDK
── fuzzforge-runner/ # Local execution engine
├── fuzzforge-types/ # Type definitions & schemas
└── demo/ # Demo projects for testing
├── fuzzforge-runner/ # Container execution engine (Docker/Podman)
├── fuzzforge-tests/ # Integration tests
├── mcp-security-hub/ # Default hub: 36 offensive security MCP servers
── scripts/ # Hub image build scripts
```
---
## 🗺️ What's Next
**[MCP Security Hub](https://github.com/FuzzingLabs/mcp-security-hub) integration** — Bridge 175+ offensive security tools (Nmap, Nuclei, Ghidra, and more) into FuzzForge workflows, all orchestrated by AI agents.
See [ROADMAP.md](ROADMAP.md) for the full roadmap.
---
## 🤝 Contributing
We welcome contributions from the community!
@@ -274,7 +248,7 @@ We welcome contributions from the community!
- 🐛 Report bugs via [GitHub Issues](../../issues)
- 💡 Suggest features or improvements
- 🔧 Submit pull requests
- 📦 Share your custom modules
- 🔌 Add new MCP servers to the [Security Hub](https://github.com/FuzzingLabs/mcp-security-hub)
See [CONTRIBUTING.md](CONTRIBUTING.md) for guidelines.

View File

@@ -49,8 +49,11 @@ uv sync
uv run fuzzforge ui
# 3. Press 'h' → "FuzzingLabs Hub" to clone & link the default security hub
# 4. Select an agent row and press Enter to link it
# 5. Restart your AI agent and start talking:
# 4. Select an agent row and press Enter to install the MCP server for your agent
# 5. Build the Docker images for the hub tools (required before tools can run)
./scripts/build-hub-images.sh
# 6. Restart your AI agent and start talking:
# "What security tools are available?"
# "Scan this binary with binwalk and yara"
# "Analyze this Rust crate for fuzzable functions"
@@ -64,7 +67,10 @@ uv run fuzzforge mcp install copilot # For VS Code + GitHub Copilot
# OR
uv run fuzzforge mcp install claude-code # For Claude Code CLI
# Build hub tool images
# Clone and link the default security hub
git clone git@github.com:FuzzingLabs/mcp-security-hub.git ~/.fuzzforge/hubs/mcp-security-hub
# Build hub tool images (required — tools only run once their image is built)
./scripts/build-hub-images.sh
# Restart your AI agent — done!
@@ -399,13 +405,20 @@ uv run fuzzforge project results <id> # Get execution results
Configure FuzzForge using environment variables:
```bash
# Storage path for projects and execution results
export FUZZFORGE_STORAGE_PATH=/path/to/storage
# Override the FuzzForge installation root (auto-detected from cwd by default)
export FUZZFORGE_ROOT=/path/to/fuzzforge_ai
# Override the user-global data directory (default: ~/.fuzzforge)
# Useful for isolated testing without touching your real installation
export FUZZFORGE_USER_DIR=/tmp/my-fuzzforge-test
# Storage path for projects and execution results (default: <workspace>/.fuzzforge/storage)
export FUZZFORGE_STORAGE__PATH=/path/to/storage
# Container engine (Docker is default)
export FUZZFORGE_ENGINE__TYPE=docker # or podman
# Podman-specific settings
# Podman-specific container storage paths
export FUZZFORGE_ENGINE__GRAPHROOT=~/.fuzzforge/containers/storage
export FUZZFORGE_ENGINE__RUNROOT=~/.fuzzforge/containers/run
```

Binary file not shown.

Before

Width:  |  Height:  |  Size: 360 KiB

Binary file not shown.

Before

Width:  |  Height:  |  Size: 2.1 MiB

View File

@@ -13,3 +13,49 @@ ignore = [
"PLR2004", # allowing comparisons using unamed numerical constants in tests
"S101", # allowing 'assert' statements in tests
]
"src/fuzzforge_cli/tui/**" = [
"ARG002", # unused method argument: callback signature
"BLE001", # blind exception: broad error handling in UI
"C901", # complexity: UI logic
"D107", # missing docstring in __init__: simple dataclasses
"FBT001", # boolean positional arg
"FBT002", # boolean default arg
"PLC0415", # import outside top-level: lazy loading
"PLR0911", # too many return statements
"PLR0912", # too many branches
"PLR2004", # magic value comparison
"RUF012", # mutable class default: Textual pattern
"S603", # subprocess: validated inputs
"S607", # subprocess: PATH lookup
"SIM108", # ternary: readability preference
"TC001", # TYPE_CHECKING: runtime type needs
"TC002", # TYPE_CHECKING: runtime type needs
"TC003", # TYPE_CHECKING: runtime type needs
"TRY300", # try-else: existing pattern
]
"tui/*.py" = [
"D107", # missing docstring in __init__: simple dataclasses
"TC001", # TYPE_CHECKING: runtime type needs
"TC002", # TYPE_CHECKING: runtime type needs
"TC003", # TYPE_CHECKING: runtime type needs
]
"src/fuzzforge_cli/commands/mcp.py" = [
"ARG001", # unused argument: callback signature
"B904", # raise from: existing pattern
"F841", # unused variable: legacy code
"FBT002", # boolean default arg
"PLR0912", # too many branches
"PLR0915", # too many statements
"SIM108", # ternary: readability preference
]
"src/fuzzforge_cli/application.py" = [
"B008", # function call in default: Path.cwd()
"PLC0415", # import outside top-level: lazy loading
]
"src/fuzzforge_cli/commands/projects.py" = [
"TC003", # TYPE_CHECKING: runtime type needs
]
"src/fuzzforge_cli/context.py" = [
"TC002", # TYPE_CHECKING: runtime type needs
"TC003", # TYPE_CHECKING: runtime type needs
]

View File

@@ -3,12 +3,12 @@
from pathlib import Path
from typing import Annotated
from fuzzforge_mcp.storage import LocalStorage # type: ignore[import-untyped]
from typer import Context as TyperContext
from typer import Option, Typer
from fuzzforge_cli.commands import mcp, projects
from fuzzforge_cli.context import Context
from fuzzforge_mcp.storage import LocalStorage
application: Typer = Typer(
name="fuzzforge",

View File

@@ -12,7 +12,7 @@ import os
import sys
from enum import StrEnum
from pathlib import Path
from typing import Annotated
from typing import Annotated, Any
from rich.console import Console
from rich.panel import Panel
@@ -44,10 +44,10 @@ def _get_copilot_mcp_path() -> Path:
"""
if sys.platform == "darwin":
return Path.home() / "Library" / "Application Support" / "Code" / "User" / "mcp.json"
elif sys.platform == "win32":
if sys.platform == "win32":
return Path(os.environ.get("APPDATA", "")) / "Code" / "User" / "mcp.json"
else: # Linux
return Path.home() / ".config" / "Code" / "User" / "mcp.json"
# Linux
return Path.home() / ".config" / "Code" / "User" / "mcp.json"
def _get_claude_desktop_mcp_path() -> Path:
@@ -58,10 +58,10 @@ def _get_claude_desktop_mcp_path() -> Path:
"""
if sys.platform == "darwin":
return Path.home() / "Library" / "Application Support" / "Claude" / "claude_desktop_config.json"
elif sys.platform == "win32":
if sys.platform == "win32":
return Path(os.environ.get("APPDATA", "")) / "Claude" / "claude_desktop_config.json"
else: # Linux
return Path.home() / ".config" / "Claude" / "claude_desktop_config.json"
# Linux
return Path.home() / ".config" / "Claude" / "claude_desktop_config.json"
def _get_claude_code_mcp_path(project_path: Path | None = None) -> Path:
@@ -114,13 +114,13 @@ def _detect_docker_socket() -> str:
:returns: Path to the Docker socket.
"""
socket_paths = [
"/var/run/docker.sock",
socket_paths: list[Path] = [
Path("/var/run/docker.sock"),
Path.home() / ".docker" / "run" / "docker.sock",
]
for path in socket_paths:
if Path(path).exists():
if path.exists():
return str(path)
return "/var/run/docker.sock"
@@ -132,15 +132,22 @@ def _find_fuzzforge_root() -> Path:
:returns: Path to fuzzforge-oss directory.
"""
# Try to find from current file location
current = Path(__file__).resolve()
# Check environment variable override first
env_root = os.environ.get("FUZZFORGE_ROOT")
if env_root:
return Path(env_root).resolve()
# Walk up to find fuzzforge-oss root
# Walk up from cwd to find a fuzzforge root (hub-config.json is the marker)
for parent in [Path.cwd(), *Path.cwd().parents]:
if (parent / "hub-config.json").is_file():
return parent
# Fall back to __file__-based search (dev install inside fuzzforge-oss)
current = Path(__file__).resolve()
for parent in current.parents:
if (parent / "fuzzforge-mcp").is_dir():
return parent
# Fall back to cwd
return Path.cwd()
@@ -148,7 +155,7 @@ def _generate_mcp_config(
fuzzforge_root: Path,
engine_type: str,
engine_socket: str,
) -> dict:
) -> dict[str, Any]:
"""Generate MCP server configuration.
:param fuzzforge_root: Path to fuzzforge-oss installation.
@@ -167,9 +174,12 @@ def _generate_mcp_config(
command = "uv"
args = ["--directory", str(fuzzforge_root), "run", "fuzzforge-mcp"]
# Self-contained storage paths for FuzzForge containers
# This isolates FuzzForge from system Podman and avoids snap issues
fuzzforge_home = Path.cwd() / ".fuzzforge"
# User-global storage paths for FuzzForge containers.
# Kept under ~/.fuzzforge so images are built once and shared across
# all workspaces — regardless of where `fuzzforge mcp install` is run.
# Override with FUZZFORGE_USER_DIR for isolated testing.
user_dir_env = os.environ.get("FUZZFORGE_USER_DIR")
fuzzforge_home = Path(user_dir_env).resolve() if user_dir_env else Path.home() / ".fuzzforge"
graphroot = fuzzforge_home / "containers" / "storage"
runroot = fuzzforge_home / "containers" / "run"

View File

@@ -5,7 +5,7 @@ from __future__ import annotations
from pathlib import Path
from typing import TYPE_CHECKING, cast
from fuzzforge_mcp.storage import LocalStorage
from fuzzforge_mcp.storage import LocalStorage # type: ignore[import-untyped]
if TYPE_CHECKING:
from typer import Context as TyperContext

View File

@@ -9,12 +9,16 @@ hub management capabilities.
from __future__ import annotations
from collections import defaultdict
from pathlib import Path
from typing import TYPE_CHECKING, Any
from rich.text import Text
from textual import events, work
from textual.app import App, ComposeResult
from textual.binding import Binding
from textual.containers import Horizontal, Vertical, VerticalScroll
from textual.widgets import Button, DataTable, Footer, Header, Label
from textual.message import Message
from textual.widgets import Button, DataTable, Footer, Header
from fuzzforge_cli.tui.helpers import (
check_agent_status,
@@ -24,11 +28,49 @@ from fuzzforge_cli.tui.helpers import (
load_hub_config,
)
if TYPE_CHECKING:
from fuzzforge_cli.commands.mcp import AIAgent
# Agent config entries stored alongside their linked status for row mapping
_AgentRow = tuple[str, "AIAgent", "Path", str, bool] # noqa: F821
_AgentRow = tuple[str, "AIAgent", Path, str, bool]
class FuzzForgeApp(App):
class SingleClickDataTable(DataTable[Any]):
"""DataTable subclass that also fires ``RowClicked`` on a single mouse click.
Textual's built-in ``RowSelected`` only fires on Enter or on a second click
of an already-highlighted row. ``RowClicked`` fires on every first click,
enabling single-click-to-act UX without requiring Enter.
"""
class RowClicked(Message):
"""Fired on every single mouse click on a data row."""
def __init__(self, data_table: SingleClickDataTable, cursor_row: int) -> None:
self.data_table = data_table
self.cursor_row = cursor_row
super().__init__()
@property
def control(self) -> SingleClickDataTable:
"""Return the data table that fired this event."""
return self.data_table
async def _on_click(self, event: events.Click) -> None:
"""Forward to parent, then post RowClicked on every mouse click.
The hub table is handled exclusively via RowClicked. RowSelected is
intentionally NOT used for the hub table to avoid double-dispatch.
"""
await super()._on_click(event)
meta = event.style.meta
if meta and "row" in meta and self.cursor_type == "row":
row_index: int = int(meta["row"])
if row_index >= 0:
self.post_message(SingleClickDataTable.RowClicked(self, row_index))
class FuzzForgeApp(App[None]):
"""FuzzForge AI terminal user interface."""
TITLE = "FuzzForge AI"
@@ -91,7 +133,8 @@ class FuzzForgeApp(App):
/* Modal screens */
AgentSetupScreen, AgentUnlinkScreen,
HubManagerScreen, LinkHubScreen, CloneHubScreen {
HubManagerScreen, LinkHubScreen, CloneHubScreen,
BuildImageScreen, BuildLogScreen {
align: center middle;
}
@@ -125,6 +168,35 @@ class FuzzForgeApp(App):
overflow-y: auto;
}
#build-dialog {
width: 72;
height: auto;
max-height: 80%;
border: thick #4699fc;
background: $surface;
padding: 2 3;
}
#confirm-text {
margin: 1 0 2 0;
}
#build-log {
height: 30;
border: round $panel;
margin: 1 0;
}
#build-subtitle {
color: $text-muted;
margin-bottom: 1;
}
#build-status {
height: 1;
margin-top: 1;
}
.dialog-title {
text-style: bold;
text-align: center;
@@ -163,6 +235,7 @@ class FuzzForgeApp(App):
Binding("q", "quit", "Quit"),
Binding("h", "manage_hubs", "Hub Manager"),
Binding("r", "refresh", "Refresh"),
Binding("enter", "select_row", "Select", show=False),
]
def compose(self) -> ComposeResult:
@@ -170,7 +243,7 @@ class FuzzForgeApp(App):
yield Header()
with VerticalScroll(id="main"):
with Vertical(id="hub-panel", classes="panel"):
yield DataTable(id="hub-table")
yield SingleClickDataTable(id="hub-table")
with Horizontal(id="hub-title-bar"):
yield Button(
"Hub Manager (h)",
@@ -189,7 +262,12 @@ class FuzzForgeApp(App):
def on_mount(self) -> None:
"""Populate tables on startup."""
self._agent_rows: list[_AgentRow] = []
self.query_one("#hub-panel").border_title = "Hub Servers"
self._hub_rows: list[tuple[str, str, str, bool] | None] = []
# Background build tracking
self._active_builds: dict[str, object] = {} # image -> Popen
self._build_logs: dict[str, list[str]] = {} # image -> log lines
self._build_results: dict[str, bool] = {} # image -> success
self.query_one("#hub-panel").border_title = "Hub Servers [dim](click ✗ Not built to build)[/dim]"
self.query_one("#agents-panel").border_title = "AI Agents"
self._refresh_agents()
self._refresh_hub()
@@ -215,9 +293,11 @@ class FuzzForgeApp(App):
def _refresh_hub(self) -> None:
"""Refresh the hub servers table, grouped by source hub."""
table = self.query_one("#hub-table", DataTable)
self._hub_rows = []
table = self.query_one("#hub-table", SingleClickDataTable)
table.clear(columns=True)
table.add_columns("Server", "Image", "Hub", "Status")
table.cursor_type = "row"
try:
fuzzforge_root = find_fuzzforge_root()
@@ -236,7 +316,7 @@ class FuzzForgeApp(App):
return
# Group servers by source hub
groups: dict[str, list[dict]] = defaultdict(list)
groups: dict[str, list[dict[str, Any]]] = defaultdict(list)
for server in servers:
source = server.get("source_hub", "manual")
groups[source].append(server)
@@ -245,7 +325,7 @@ class FuzzForgeApp(App):
ready_count = 0
total = len(hub_servers)
statuses: list[tuple[dict, bool, str]] = []
statuses: list[tuple[dict[str, Any], bool, str]] = []
for server in hub_servers:
enabled = server.get("enabled", True)
if not enabled:
@@ -270,6 +350,7 @@ class FuzzForgeApp(App):
style="bold",
)
table.add_row(header, "", "", "")
self._hub_rows.append(None) # group header — not selectable
# Tool rows
for server, is_ready, status_text in statuses:
@@ -277,12 +358,14 @@ class FuzzForgeApp(App):
image = server.get("image", "unknown")
enabled = server.get("enabled", True)
if not enabled:
if image in getattr(self, "_active_builds", {}):
status_cell = Text("⏳ Building…", style="yellow")
elif not enabled:
status_cell = Text("Disabled", style="dim")
elif is_ready:
status_cell = Text("✓ Ready", style="green")
else:
status_cell = Text(f"{status_text}", style="red")
status_cell = Text(f"{status_text}", style="red dim")
table.add_row(
f" {name}",
@@ -290,13 +373,27 @@ class FuzzForgeApp(App):
hub_name,
status_cell,
)
self._hub_rows.append((name, image, hub_name, is_ready))
def on_data_table_row_selected(self, event: DataTable.RowSelected) -> None:
"""Handle row selection on the agents table."""
if event.data_table.id != "agents-table":
return
"""Handle Enter-key row selection (agents table only).
idx = event.cursor_row
Hub table uses RowClicked exclusively — wiring it to RowSelected too
would cause a double push on every click since Textual 8 fires
RowSelected on ALL clicks, not just second-click-on-same-row.
"""
if event.data_table.id == "agents-table":
self._handle_agent_row(event.cursor_row)
def on_single_click_data_table_row_clicked(
self, event: SingleClickDataTable.RowClicked
) -> None:
"""Handle single mouse-click on a hub table row."""
if event.data_table.id == "hub-table":
self._handle_hub_row(event.cursor_row)
def _handle_agent_row(self, idx: int) -> None:
"""Open agent setup/unlink for the selected agent row."""
if idx < 0 or idx >= len(self._agent_rows):
return
@@ -317,6 +414,111 @@ class FuzzForgeApp(App):
callback=self._on_agent_changed,
)
def _handle_hub_row(self, idx: int) -> None:
"""Handle a click on a hub table row."""
# Guard: never push two build dialogs at once (double-click protection)
if getattr(self, "_build_dialog_open", False):
return
if idx < 0 or idx >= len(self._hub_rows):
return
row_data = self._hub_rows[idx]
if row_data is None:
return # group header row — ignore
server_name, image, hub_name, is_ready = row_data
# If a build is already running, open the live log viewer
if image in self._active_builds:
from fuzzforge_cli.tui.screens.build_log import BuildLogScreen
self._build_dialog_open = True
self.push_screen(
BuildLogScreen(image),
callback=lambda _: setattr(self, "_build_dialog_open", False),
)
return
if is_ready:
self.notify(f"{image} is already built ✓", severity="information")
return
if hub_name == "manual":
self.notify("Manual servers must be built outside FuzzForge")
return
from fuzzforge_cli.tui.screens.build_image import BuildImageScreen
self._build_dialog_open = True
def _on_build_dialog_done(result: bool | None) -> None:
self._build_dialog_open = False
if result is not None:
self._on_build_confirmed(result, server_name, image, hub_name)
self.push_screen(
BuildImageScreen(server_name, image, hub_name),
callback=_on_build_dialog_done,
)
def _on_build_confirmed(self, confirmed: bool, server_name: str, image: str, hub_name: str) -> None:
"""Start a background build if the user confirmed."""
if not confirmed:
return
self._build_logs[image] = []
self._build_results.pop(image, None)
self._active_builds[image] = True # mark as pending so ⏳ shows immediately
self._refresh_hub() # show ⏳ Building… immediately
self._run_build(server_name, image, hub_name)
@work(thread=True)
def _run_build(self, server_name: str, image: str, hub_name: str) -> None:
"""Build a Docker/Podman image in a background thread."""
from fuzzforge_cli.tui.helpers import build_image, find_dockerfile_for_server
logs = self._build_logs.setdefault(image, [])
dockerfile = find_dockerfile_for_server(server_name, hub_name)
if dockerfile is None:
logs.append(f"ERROR: Dockerfile not found for '{server_name}' in hub '{hub_name}'")
self._build_results[image] = False
self._active_builds.pop(image, None)
self.call_from_thread(self._on_build_done, image, success=False)
return
logs.append(f"Building {image} from {dockerfile.parent}")
logs.append("")
try:
proc = build_image(image, dockerfile)
except FileNotFoundError as exc:
logs.append(f"ERROR: {exc}")
self._build_results[image] = False
self._active_builds.pop(image, None)
self.call_from_thread(self._on_build_done, image, success=False)
return
self._active_builds[image] = proc # replace pending marker with actual process
self.call_from_thread(self._refresh_hub) # show ⏳ in table
if proc.stdout is None:
return
for line in proc.stdout:
logs.append(line.rstrip())
proc.wait()
self._active_builds.pop(image, None)
success = proc.returncode == 0
self._build_results[image] = success
self.call_from_thread(self._on_build_done, image, success=success)
def _on_build_done(self, image: str, *, success: bool) -> None:
"""Handle completion of a background build on the main thread."""
self._refresh_hub()
if success:
self.notify(f"{image} built successfully", severity="information")
else:
self.notify(f"{image} build failed — click row for log", severity="error")
def on_button_pressed(self, event: Button.Pressed) -> None:
"""Handle button presses."""
if event.button.id == "btn-hub-manager":

View File

@@ -8,7 +8,9 @@ and managing linked MCP hub repositories.
from __future__ import annotations
import contextlib
import json
import os
import subprocess
from pathlib import Path
from typing import Any
@@ -30,12 +32,32 @@ FUZZFORGE_DEFAULT_HUB_URL = "git@github.com:FuzzingLabs/mcp-security-hub.git"
FUZZFORGE_DEFAULT_HUB_NAME = "mcp-security-hub"
def get_fuzzforge_user_dir() -> Path:
"""Return the user-global ``~/.fuzzforge/`` directory.
Stores data that is shared across all workspaces: cloned hub
repositories, the hub registry, container storage (graphroot/runroot),
and the hub workspace volume.
Override with the ``FUZZFORGE_USER_DIR`` environment variable to
redirect all user-global data to a custom path — useful for testing
a fresh install without touching the real ``~/.fuzzforge/``.
:return: ``Path.home() / ".fuzzforge"`` or ``$FUZZFORGE_USER_DIR``
"""
env_dir = os.environ.get("FUZZFORGE_USER_DIR")
if env_dir:
return Path(env_dir).resolve()
return Path.home() / ".fuzzforge"
def get_fuzzforge_dir() -> Path:
"""Return the project-local ``.fuzzforge/`` directory.
Uses the current working directory so that each project gets its
own isolated FuzzForge configuration, hubs, and storage — similar
to how ``.git/`` or ``.venv/`` work.
Stores data that is specific to the current workspace: fuzzing
results and project artifacts. Similar to how ``.git/`` scopes
version-control data to a single project.
:return: ``Path.cwd() / ".fuzzforge"``
@@ -99,16 +121,21 @@ def check_agent_status(config_path: Path, servers_key: str) -> tuple[bool, str]:
def check_hub_image(image: str) -> tuple[bool, str]:
"""Check whether a Docker image exists locally.
"""Check whether a container image exists locally.
:param image: Docker image name (e.g. "semgrep-mcp:latest").
Respects the ``FUZZFORGE_ENGINE__TYPE`` environment variable so that
Podman users see the correct build status instead of always "Not built".
:param image: Image name (e.g. "semgrep-mcp:latest").
:return: Tuple of (is_ready, status_description).
"""
engine = os.environ.get("FUZZFORGE_ENGINE__TYPE", "docker").lower()
cmd = "podman" if engine == "podman" else "docker"
try:
result = subprocess.run(
["docker", "image", "inspect", image],
capture_output=True,
[cmd, "image", "inspect", image],
check=False, capture_output=True,
text=True,
timeout=5,
)
@@ -118,7 +145,7 @@ def check_hub_image(image: str) -> tuple[bool, str]:
except subprocess.TimeoutExpired:
return False, "Timeout"
except FileNotFoundError:
return False, "Docker not found"
return False, f"{cmd} not found"
def load_hub_config(fuzzforge_root: Path) -> dict[str, Any]:
@@ -132,7 +159,8 @@ def load_hub_config(fuzzforge_root: Path) -> dict[str, Any]:
if not config_path.exists():
return {}
try:
return json.loads(config_path.read_text())
data: dict[str, Any] = json.loads(config_path.read_text())
return data
except json.JSONDecodeError:
return {}
@@ -237,37 +265,103 @@ def uninstall_agent_config(agent: AIAgent) -> str:
def get_hubs_registry_path() -> Path:
"""Return path to the hubs registry file (``.fuzzforge/hubs.json``).
"""Return path to the hubs registry file (``~/.fuzzforge/hubs.json``).
Stored in the user-global directory so the registry is shared across
all workspaces.
:return: Path to the registry JSON file.
"""
return get_fuzzforge_dir() / "hubs.json"
return get_fuzzforge_user_dir() / "hubs.json"
def get_default_hubs_dir() -> Path:
"""Return default directory for cloned hubs (``.fuzzforge/hubs/``).
"""Return default directory for cloned hubs (``~/.fuzzforge/hubs/``).
Stored in the user-global directory so hubs are cloned once and
reused in every workspace.
:return: Path to the default hubs directory.
"""
return get_fuzzforge_dir() / "hubs"
return get_fuzzforge_user_dir() / "hubs"
def _discover_hub_dirs() -> list[Path]:
"""Scan known hub directories for cloned repos.
Checks both the current global location (``~/.fuzzforge/hubs/``) and the
legacy workspace-local location (``<cwd>/.fuzzforge/hubs/``) so that hubs
cloned before the global-dir migration are still found.
:return: List of hub directory paths (each is a direct child with a ``.git``
sub-directory).
"""
candidates: list[Path] = []
for base in (get_fuzzforge_user_dir() / "hubs", get_fuzzforge_dir() / "hubs"):
if base.is_dir():
candidates.extend(
entry for entry in base.iterdir()
if entry.is_dir() and (entry / ".git").is_dir()
)
return candidates
def load_hubs_registry() -> dict[str, Any]:
"""Load the hubs registry from disk.
If the registry file does not exist, auto-recovers it by scanning known hub
directories and rebuilding entries for any discovered hubs. This handles
the migration from the old workspace-local ``<cwd>/.fuzzforge/hubs.json``
path to the global ``~/.fuzzforge/hubs.json`` path, as well as any case
where the registry was lost.
:return: Registry dict with ``hubs`` key containing a list of hub entries.
"""
path = get_hubs_registry_path()
if not path.exists():
return {"hubs": []}
try:
return json.loads(path.read_text())
except (json.JSONDecodeError, OSError):
if path.exists():
try:
data: dict[str, Any] = json.loads(path.read_text())
return data
except (json.JSONDecodeError, OSError):
pass
# Registry missing — attempt to rebuild from discovered hub directories.
discovered = _discover_hub_dirs()
if not discovered:
return {"hubs": []}
hubs: list[dict[str, Any]] = []
for hub_dir in discovered:
name = hub_dir.name
# Try to read the git remote URL
git_url: str = ""
try:
import subprocess as _sp
r = _sp.run(
["git", "-C", str(hub_dir), "remote", "get-url", "origin"],
check=False, capture_output=True, text=True, timeout=5,
)
if r.returncode == 0:
git_url = r.stdout.strip()
except Exception: # noqa: S110 - git URL is optional, failure is acceptable
pass
hubs.append({
"name": name,
"path": str(hub_dir),
"git_url": git_url,
"is_default": name == FUZZFORGE_DEFAULT_HUB_NAME,
})
registry: dict[str, Any] = {"hubs": hubs}
# Persist so we don't re-scan on every load
with contextlib.suppress(OSError):
save_hubs_registry(registry)
return registry
def save_hubs_registry(registry: dict[str, Any]) -> None:
"""Save the hubs registry to disk.
@@ -320,7 +414,7 @@ def scan_hub_for_servers(hub_path: Path) -> list[dict[str, Any]]:
"image": f"{tool_name}:latest",
"category": category,
"capabilities": capabilities,
"volumes": [f"{get_fuzzforge_dir()}/hub/workspace:/data"],
"volumes": [f"{get_fuzzforge_user_dir()}/hub/workspace:/data"],
"enabled": True,
}
)
@@ -422,8 +516,7 @@ def clone_hub(
"""
if name is None:
name = git_url.rstrip("/").split("/")[-1]
if name.endswith(".git"):
name = name[:-4]
name = name.removesuffix(".git")
if dest is None:
dest = get_default_hubs_dir() / name
@@ -433,7 +526,7 @@ def clone_hub(
try:
result = subprocess.run(
["git", "-C", str(dest), "pull"],
capture_output=True,
check=False, capture_output=True,
text=True,
timeout=120,
)
@@ -451,7 +544,7 @@ def clone_hub(
try:
result = subprocess.run(
["git", "clone", git_url, str(dest)],
capture_output=True,
check=False, capture_output=True,
text=True,
timeout=300,
)
@@ -533,3 +626,62 @@ def _remove_hub_servers_from_config(hub_name: str) -> int:
config_path.write_text(json.dumps(config, indent=2))
return before - after
def find_dockerfile_for_server(server_name: str, hub_name: str) -> Path | None:
"""Find the Dockerfile for a hub server tool.
Looks up the hub path from the registry, then scans for
``category/<server_name>/Dockerfile``.
:param server_name: Tool name (e.g. ``"nmap-mcp"``).
:param hub_name: Hub name as stored in the registry.
:return: Absolute path to the Dockerfile, or ``None`` if not found.
"""
registry = load_hubs_registry()
hub_entry = next(
(h for h in registry.get("hubs", []) if h.get("name") == hub_name),
None,
)
if not hub_entry:
return None
hub_path = Path(hub_entry["path"])
for dockerfile in hub_path.rglob("Dockerfile"):
rel = dockerfile.relative_to(hub_path)
parts = rel.parts
if len(parts) == 3 and parts[1] == server_name:
return dockerfile
return None
def build_image(
image: str,
dockerfile: Path,
*,
engine: str | None = None,
) -> subprocess.Popen[str]:
"""Start a non-blocking ``docker/podman build`` subprocess.
Returns the running :class:`subprocess.Popen` object so the caller
can stream ``stdout`` / ``stderr`` lines incrementally.
:param image: Image tag (e.g. ``"nmap-mcp:latest"``).
:param dockerfile: Path to the ``Dockerfile``.
:param engine: ``"docker"`` or ``"podman"`` (auto-detected if ``None``).
:return: Running subprocess with merged stdout+stderr.
"""
if engine is None:
engine = os.environ.get("FUZZFORGE_ENGINE__TYPE", "docker").lower()
engine = "podman" if engine == "podman" else "docker"
context_dir = str(dockerfile.parent)
return subprocess.Popen(
[engine, "build", "-t", image, context_dir],
stdout=subprocess.PIPE,
stderr=subprocess.STDOUT,
text=True,
)

View File

@@ -0,0 +1,58 @@
"""Build-image confirm dialog for FuzzForge TUI.
Simple modal that asks the user to confirm before starting a background
build. The actual build is managed by the app so the user is never
locked on this screen.
"""
from __future__ import annotations
from textual.app import ComposeResult
from textual.containers import Horizontal, Vertical
from textual.screen import ModalScreen
from textual.widgets import Button, Label
class _NoFocusButton(Button):
can_focus = False
class BuildImageScreen(ModalScreen[bool]):
"""Quick confirmation before starting a background Docker/Podman build."""
BINDINGS = [("escape", "cancel", "Cancel")]
def __init__(self, server_name: str, image: str, hub_name: str) -> None:
super().__init__()
self._server_name = server_name
self._image = image
self._hub_name = hub_name
def compose(self) -> ComposeResult:
"""Build the confirmation dialog UI."""
with Vertical(id="build-dialog"):
yield Label(f"Build {self._image}", classes="dialog-title")
yield Label(
f"Hub: {self._hub_name} • Tool: {self._server_name}",
id="build-subtitle",
)
yield Label(
"The image will be built in the background.\n"
"You'll receive a notification when it's done.",
id="confirm-text",
)
with Horizontal(classes="dialog-buttons"):
yield _NoFocusButton("Build", variant="primary", id="btn-build")
yield _NoFocusButton("Cancel", variant="default", id="btn-cancel")
def on_button_pressed(self, event: Button.Pressed) -> None:
"""Handle Build or Cancel button clicks."""
if event.button.id == "btn-build":
self.dismiss(result=True)
elif event.button.id == "btn-cancel":
self.dismiss(result=False)
def action_cancel(self) -> None:
"""Dismiss the dialog when Escape is pressed."""
self.dismiss(result=False)

View File

@@ -0,0 +1,80 @@
"""Build-log viewer screen for FuzzForge TUI.
Shows live output of a background build started by the app. Polls the
app's ``_build_logs`` buffer every 500 ms so the user can pop this screen
open at any time while the build is running and see up-to-date output.
"""
from __future__ import annotations
from typing import Any
from textual.app import ComposeResult
from textual.containers import Horizontal, Vertical
from textual.screen import ModalScreen
from textual.widgets import Button, Label, Log
class _NoFocusButton(Button):
can_focus = False
class BuildLogScreen(ModalScreen[None]):
"""Live log viewer for a background build job managed by the app."""
BINDINGS = [("escape", "close", "Close")]
def __init__(self, image: str) -> None:
super().__init__()
self._image = image
self._last_line: int = 0
def compose(self) -> ComposeResult:
"""Build the log viewer UI."""
with Vertical(id="build-dialog"):
yield Label(f"Build log — {self._image}", classes="dialog-title")
yield Label("", id="build-status")
yield Log(id="build-log", auto_scroll=True)
with Horizontal(classes="dialog-buttons"):
yield _NoFocusButton("Close", variant="default", id="btn-close")
def on_mount(self) -> None:
"""Initialize log polling when the screen is mounted."""
self._flush_log()
self.set_interval(0.5, self._poll_log)
def _flush_log(self) -> None:
"""Write any new lines since the last flush."""
logs: list[str] = getattr(self.app, "_build_logs", {}).get(self._image, [])
log_widget = self.query_one("#build-log", Log)
new_lines = logs[self._last_line :]
for line in new_lines:
log_widget.write_line(line)
self._last_line += len(new_lines)
active: dict[str, Any] = getattr(self.app, "_active_builds", {})
status = self.query_one("#build-status", Label)
if self._image in active:
status.update("[yellow]⏳ Building…[/yellow]")
else:
# Build is done — check if we have a result stored
results: dict[str, Any] = getattr(self.app, "_build_results", {})
if self._image in results:
if results[self._image]:
status.update(f"[green]✓ {self._image} built successfully[/green]")
else:
status.update(f"[red]✗ {self._image} build failed[/red]")
def _poll_log(self) -> None:
"""Poll for new log lines periodically."""
self._flush_log()
def on_button_pressed(self, event: Button.Pressed) -> None:
"""Handle Close button click."""
if event.button.id == "btn-close":
self.dismiss(None)
def action_close(self) -> None:
"""Dismiss the dialog when Escape is pressed."""
self.dismiss(None)

View File

@@ -81,6 +81,7 @@ class HubManagerScreen(ModalScreen[str | None]):
is_default = hub.get("is_default", False)
hub_path = Path(path)
count: str | Text
if hub_path.is_dir():
servers = scan_hub_for_servers(hub_path)
count = str(len(servers))
@@ -88,10 +89,11 @@ class HubManagerScreen(ModalScreen[str | None]):
count = Text("dir missing", style="yellow")
source = git_url or "local"
name_cell: str | Text
if is_default:
name_cell = Text(f"{name}", style="bold")
else:
name_cell = Text(name)
name_cell = name
table.add_row(name_cell, path, count, source)

View File

@@ -18,3 +18,32 @@ ignore = [
"PLR2004", # allowing comparisons using unamed numerical constants in tests
"S101", # allowing 'assert' statements in tests
]
"src/**" = [
"ANN201", # missing return type: legacy code
"ARG002", # unused argument: callback pattern
"ASYNC109", # async with timeout param: intentional pattern
"BLE001", # blind exception: broad error handling needed
"C901", # complexity: legacy code
"EM102", # f-string in exception: existing pattern
"F401", # unused import: re-export pattern
"FBT001", # boolean positional arg
"FBT002", # boolean default arg
"FIX002", # TODO comments: documented tech debt
"N806", # variable naming: intentional constants
"PERF401", # list comprehension: readability over perf
"PLW0603", # global statement: intentional for shared state
"PTH111", # os.path usage: legacy code
"RUF005", # collection literal: legacy style
"S110", # try-except-pass: intentional suppression
"S603", # subprocess: validated inputs
"SIM108", # ternary: readability preference
"TC001", # TYPE_CHECKING: causes circular imports
"TC003", # TYPE_CHECKING: causes circular imports
"TRY003", # message in exception: existing pattern
"TRY300", # try-else: existing pattern
"TRY400", # logging.error vs exception: existing pattern
"UP017", # datetime.UTC: Python 3.11+ only
"UP041", # TimeoutError alias: compatibility
"UP043", # unnecessary type args: compatibility
"W293", # blank line whitespace: formatting
]

View File

@@ -176,6 +176,7 @@ class HubClient:
arguments: dict[str, Any],
*,
timeout: int | None = None,
extra_volumes: list[str] | None = None,
) -> dict[str, Any]:
"""Execute a tool on a hub server.
@@ -183,6 +184,7 @@ class HubClient:
:param tool_name: Name of the tool to execute.
:param arguments: Tool arguments.
:param timeout: Execution timeout (uses default if None).
:param extra_volumes: Additional Docker volume mounts to inject.
:returns: Tool execution result.
:raises HubClientError: If execution fails.
@@ -199,7 +201,7 @@ class HubClient:
)
try:
async with self._connect(config) as (reader, writer):
async with self._connect(config, extra_volumes=extra_volumes) as (reader, writer):
# Initialise MCP session (skip for persistent — already done)
if not self._persistent_sessions.get(config.name):
await self._initialize_session(reader, writer, config.name)
@@ -248,6 +250,7 @@ class HubClient:
async def _connect(
self,
config: HubServerConfig,
extra_volumes: list[str] | None = None,
) -> AsyncGenerator[tuple[asyncio.StreamReader, asyncio.StreamWriter], None]:
"""Connect to an MCP server.
@@ -256,6 +259,7 @@ class HubClient:
ephemeral per-call connection logic.
:param config: Server configuration.
:param extra_volumes: Additional Docker volume mounts to inject.
:yields: Tuple of (reader, writer) for communication.
"""
@@ -268,7 +272,7 @@ class HubClient:
# Ephemeral connection (original behaviour)
if config.type == HubServerType.DOCKER:
async with self._connect_docker(config) as streams:
async with self._connect_docker(config, extra_volumes=extra_volumes) as streams:
yield streams
elif config.type == HubServerType.COMMAND:
async with self._connect_command(config) as streams:
@@ -284,10 +288,12 @@ class HubClient:
async def _connect_docker(
self,
config: HubServerConfig,
extra_volumes: list[str] | None = None,
) -> AsyncGenerator[tuple[asyncio.StreamReader, asyncio.StreamWriter], None]:
"""Connect to a Docker-based MCP server.
:param config: Server configuration with image name.
:param extra_volumes: Additional volume mounts to inject (e.g. project assets).
:yields: Tuple of (reader, writer) for stdio communication.
"""
@@ -302,10 +308,14 @@ class HubClient:
for cap in config.capabilities:
cmd.extend(["--cap-add", cap])
# Add volumes
# Add volumes from server config
for volume in config.volumes:
cmd.extend(["-v", os.path.expanduser(volume)])
# Add extra volumes (e.g. project assets injected at runtime)
for volume in (extra_volumes or []):
cmd.extend(["-v", os.path.expanduser(volume)])
# Add environment variables
for key, value in config.environment.items():
cmd.extend(["-e", f"{key}={value}"])
@@ -529,6 +539,7 @@ class HubClient:
async def start_persistent_session(
self,
config: HubServerConfig,
extra_volumes: list[str] | None = None,
) -> PersistentSession:
"""Start a persistent Docker container and initialise MCP session.
@@ -536,6 +547,7 @@ class HubClient:
called, allowing multiple tool calls on the same session.
:param config: Server configuration (must be Docker type).
:param extra_volumes: Additional host:container volume mounts to inject.
:returns: The created persistent session.
:raises HubClientError: If the container cannot be started.
@@ -580,6 +592,9 @@ class HubClient:
for volume in config.volumes:
cmd.extend(["-v", os.path.expanduser(volume)])
for extra_vol in (extra_volumes or []):
cmd.extend(["-v", extra_vol])
for key, value in config.environment.items():
cmd.extend(["-e", f"{key}={value}"])

View File

@@ -180,12 +180,14 @@ class HubExecutor:
arguments: dict[str, Any] | None = None,
*,
timeout: int | None = None,
extra_volumes: list[str] | None = None,
) -> HubExecutionResult:
"""Execute a hub tool.
:param identifier: Tool identifier (hub:server:tool or server:tool).
:param arguments: Tool arguments.
:param timeout: Execution timeout.
:param extra_volumes: Additional Docker volume mounts to inject.
:returns: Execution result.
"""
@@ -232,6 +234,7 @@ class HubExecutor:
tool_name_to_use or tool_name,
arguments,
timeout=timeout,
extra_volumes=extra_volumes,
)
return HubExecutionResult(
success=True,
@@ -268,6 +271,7 @@ class HubExecutor:
tool.name,
arguments,
timeout=timeout,
extra_volumes=extra_volumes,
)
return HubExecutionResult(
success=True,
@@ -341,13 +345,14 @@ class HubExecutor:
# Persistent session management
# ------------------------------------------------------------------
async def start_persistent_server(self, server_name: str) -> dict[str, Any]:
async def start_persistent_server(self, server_name: str, extra_volumes: list[str] | None = None) -> dict[str, Any]:
"""Start a persistent container session for a server.
The container stays running between tool calls, allowing stateful
interactions (e.g., radare2 sessions, long-running fuzzing).
:param server_name: Name of the hub server to start.
:param extra_volumes: Additional host:container volume mounts to inject.
:returns: Session status dictionary.
:raises ValueError: If server not found.
@@ -358,7 +363,7 @@ class HubExecutor:
msg = f"Server '{server_name}' not found"
raise ValueError(msg)
session = await self._client.start_persistent_session(server.config)
session = await self._client.start_persistent_session(server.config, extra_volumes=extra_volumes)
# Auto-discover tools on the new session
try:

View File

@@ -294,3 +294,17 @@ class HubConfig(BaseModel):
default=True,
description="Cache discovered tools",
)
#: Workflow hints indexed by "after:<tool_name>" keys.
#: Loaded inline or merged from workflow_hints_file.
workflow_hints: dict[str, Any] = Field(
default_factory=dict,
description="Workflow hints indexed by 'after:<tool_name>'",
)
#: Optional path to an external workflow-hints.json file.
#: Relative paths are resolved relative to the hub-config.json location.
workflow_hints_file: str | None = Field(
default=None,
description="Path to an external workflow-hints.json to load and merge",
)

View File

@@ -87,6 +87,28 @@ class HubRegistry:
config=server_config,
)
# Load and merge external workflow hints file if specified.
if self._config.workflow_hints_file:
hints_path = Path(self._config.workflow_hints_file)
if not hints_path.is_absolute():
hints_path = config_path.parent / hints_path
if hints_path.exists():
try:
with hints_path.open() as hf:
hints_data = json.load(hf)
self._config.workflow_hints.update(hints_data.get("hints", {}))
logger.info(
"Loaded workflow hints",
path=str(hints_path),
hints=len(self._config.workflow_hints),
)
except Exception as hints_err:
logger.warning(
"Failed to load workflow hints file",
path=str(hints_path),
error=str(hints_err),
)
logger.info(
"Loaded hub configuration",
path=str(config_path),
@@ -218,6 +240,15 @@ class HubRegistry:
server.discovery_error = None
server.tools = tools
def get_workflow_hint(self, tool_name: str) -> dict | None:
"""Get the workflow hint for a tool by name.
:param tool_name: Tool name (e.g. ``binwalk_extract``).
:returns: Hint dict for the ``after:<tool_name>`` key, or None.
"""
return self._config.workflow_hints.get(f"after:{tool_name}") or None
def get_all_tools(self) -> list:
"""Get all discovered tools from all servers.

View File

@@ -4,7 +4,7 @@ from typing import TYPE_CHECKING
from pydantic import BaseModel
from fuzzforge_common.sandboxes.engines.enumeration import (
FuzzForgeSandboxEngines, # noqa: TC001 (required by 'pydantic' at runtime)
FuzzForgeSandboxEngines,
)
if TYPE_CHECKING:

View File

@@ -14,3 +14,18 @@ ignore = [
"PLR2004", # allowing comparisons using unamed numerical constants in tests
"S101", # allowing 'assert' statements in tests
]
"src/**" = [
"ASYNC109", # async with timeout param: intentional pattern
"EM102", # f-string in exception: existing pattern
"PERF401", # list comprehension: readability over perf
"PLR0913", # too many arguments: API compatibility
"PLW0602", # global variable: intentional for shared state
"PLW0603", # global statement: intentional for shared state
"RET504", # unnecessary assignment: readability
"RET505", # unnecessary elif after return: readability
"TC001", # TYPE_CHECKING: causes circular imports
"TC003", # TYPE_CHECKING: causes circular imports
"TRY300", # try-else: existing pattern
"TRY301", # abstract raise: existing pattern
"TRY003", # message in exception: existing pattern
]

View File

@@ -47,15 +47,46 @@ FuzzForge is a security research orchestration platform. Use these tools to:
Typical workflow:
1. Initialize a project with `init_project`
2. Set project assets with `set_project_assets` (optional, only needed once for the source directory)
2. Set project assets with `set_project_assets` — path to the directory containing
target files (firmware images, binaries, source code, etc.)
3. List available hub servers with `list_hub_servers`
4. Discover tools from servers with `discover_hub_tools`
5. Execute hub tools with `execute_hub_tool`
Hub workflow:
1. List available hub servers with `list_hub_servers`
2. Discover tools from servers with `discover_hub_tools`
3. Execute hub tools with `execute_hub_tool`
Agent context convention:
When you call `discover_hub_tools`, some servers return an `agent_context` field
with usage tips, known issues, rule templates, and workflow guidance. Always read
this context before using the server's tools.
File access in containers:
- Assets set via `set_project_assets` are mounted read-only at `/app/uploads/` and `/app/samples/`
- A writable output directory is mounted at `/app/output/` — use it for extraction results, reports, etc.
- Always use container paths (e.g. `/app/uploads/file`) when passing file arguments to hub tools
Stateful tools:
- Some tools (e.g. radare2-mcp) require multi-step sessions. Use `start_hub_server` to launch
a persistent container, then `execute_hub_tool` calls reuse that container. Stop with `stop_hub_server`.
Firmware analysis pipeline (when analyzing firmware images):
1. **binwalk-mcp** (`binwalk_scan` + `binwalk_extract`) — identify and extract filesystem from firmware
2. **yara-mcp** (`yara_scan_with_rules`) — scan extracted files with vulnerability rules to prioritize targets
3. **radare2-mcp** (persistent session) — confirm dangerous code paths
4. **searchsploit-mcp** (`search_exploitdb`) — query version strings from radare2 against ExploitDB
Run steps 3 and 4 outputs feed into a final triage summary.
radare2-mcp agent context (upstream tool — no embedded context):
- Start a persistent session with `start_hub_server("radare2-mcp")` before any calls.
- IMPORTANT: the `open_file` tool requires the parameter name `file_path` (with underscore),
not `filepath`. Example: `execute_hub_tool("hub:radare2-mcp:open_file", {"file_path": "/app/output/..."})`
- Workflow: `open_file` → `analyze` → `list_imports` → `xrefs_to` → `run_command` with `pdf @ <addr>`.
- Static binary fallback: firmware binaries are often statically linked. When `list_imports`
returns an empty result, fall back to `list_symbols` and search for dangerous function names
(system, strcpy, gets, popen, sprintf) in the output. Then use `xrefs_to` on their addresses.
- For string extraction, use `run_command` with `iz` (data section strings).
The `list_all_strings` tool may return garbled output for large binaries.
- For decompilation, use `run_command` with `pdc @ <addr>` (pseudo-C) or `pdf @ <addr>`
(annotated disassembly). The `decompile` tool may fail with "not available in current mode".
- Stop the session with `stop_hub_server("radare2-mcp")` when done.
""",
lifespan=lifespan,
)

View File

@@ -10,7 +10,6 @@ from fastmcp.exceptions import ResourceError
from fuzzforge_mcp.dependencies import get_project_path, get_storage
mcp: FastMCP = FastMCP()
@@ -31,10 +30,10 @@ async def list_executions() -> list[dict[str, Any]]:
return [
{
"execution_id": exec_id,
"has_results": storage.get_execution_results(project_path, exec_id) is not None,
"execution_id": entry["execution_id"],
"has_results": storage.get_execution_results(project_path, entry["execution_id"]) is not None,
}
for exec_id in execution_ids
for entry in execution_ids
]
except Exception as exception:

View File

@@ -10,7 +10,6 @@ from fastmcp.exceptions import ResourceError
from fuzzforge_mcp.dependencies import get_project_path, get_settings, get_storage
mcp: FastMCP = FastMCP()

View File

@@ -13,9 +13,12 @@ from __future__ import annotations
import json
import logging
import shutil
import mimetypes
from datetime import UTC, datetime
from pathlib import Path
from tarfile import open as Archive # noqa: N812
from typing import Any
from uuid import uuid4
logger = logging.getLogger("fuzzforge-mcp")
@@ -79,6 +82,7 @@ class LocalStorage:
storage_path = self._get_project_path(project_path)
storage_path.mkdir(parents=True, exist_ok=True)
(storage_path / "runs").mkdir(parents=True, exist_ok=True)
(storage_path / "output").mkdir(parents=True, exist_ok=True)
# Create .gitignore to avoid committing large files
gitignore_path = storage_path / ".gitignore"
@@ -86,6 +90,8 @@ class LocalStorage:
gitignore_path.write_text(
"# FuzzForge storage - ignore large/temporary files\n"
"runs/\n"
"output/\n"
"artifacts.json\n"
"!config.json\n"
)
@@ -131,7 +137,7 @@ class LocalStorage:
storage_path.mkdir(parents=True, exist_ok=True)
config_path = storage_path / "config.json"
config: dict = {}
config: dict[str, Any] = {}
if config_path.exists():
config = json.loads(config_path.read_text())
@@ -141,17 +147,85 @@ class LocalStorage:
logger.info("Set project assets: %s -> %s", project_path.name, assets_path)
return assets_path
def list_executions(self, project_path: Path) -> list[str]:
"""List all execution IDs for a project.
def get_project_output_path(self, project_path: Path) -> Path | None:
"""Get the output directory path for a project.
Returns the path to the writable output directory that is mounted
into hub tool containers at /app/output.
:param project_path: Path to the project directory.
:returns: List of execution IDs.
:returns: Path to output directory, or None if project not initialized.
"""
output_path = self._get_project_path(project_path) / "output"
if output_path.exists():
return output_path
return None
def record_execution(
self,
project_path: Path,
server_name: str,
tool_name: str,
arguments: dict[str, Any],
result: dict[str, Any],
) -> str:
"""Record an execution result to the project's runs directory.
:param project_path: Path to the project directory.
:param server_name: Hub server name.
:param tool_name: Tool name that was executed.
:param arguments: Arguments passed to the tool.
:param result: Execution result dictionary.
:returns: Execution ID.
"""
execution_id = f"{datetime.now(tz=UTC).strftime('%Y%m%dT%H%M%SZ')}_{uuid4().hex[:8]}"
run_dir = self._get_project_path(project_path) / "runs" / execution_id
run_dir.mkdir(parents=True, exist_ok=True)
metadata = {
"execution_id": execution_id,
"timestamp": datetime.now(tz=UTC).isoformat(),
"server": server_name,
"tool": tool_name,
"arguments": arguments,
"success": result.get("success", False),
"result": result,
}
(run_dir / "metadata.json").write_text(json.dumps(metadata, indent=2, default=str))
logger.info("Recorded execution %s: %s:%s", execution_id, server_name, tool_name)
return execution_id
def list_executions(self, project_path: Path) -> list[dict[str, Any]]:
"""List all executions for a project with summary metadata.
:param project_path: Path to the project directory.
:returns: List of execution summaries (id, timestamp, server, tool, success).
"""
runs_dir = self._get_project_path(project_path) / "runs"
if not runs_dir.exists():
return []
return [d.name for d in runs_dir.iterdir() if d.is_dir()]
executions: list[dict[str, Any]] = []
for run_dir in sorted(runs_dir.iterdir(), reverse=True):
if not run_dir.is_dir():
continue
meta_path = run_dir / "metadata.json"
if meta_path.exists():
meta = json.loads(meta_path.read_text())
executions.append({
"execution_id": meta.get("execution_id", run_dir.name),
"timestamp": meta.get("timestamp"),
"server": meta.get("server"),
"tool": meta.get("tool"),
"success": meta.get("success"),
})
else:
executions.append({"execution_id": run_dir.name})
return executions
def get_execution_results(
self,
@@ -201,3 +275,203 @@ class LocalStorage:
except Exception as exc:
msg = f"Failed to extract results: {exc}"
raise StorageError(msg) from exc
# ------------------------------------------------------------------
# Artifact tracking
# ------------------------------------------------------------------
def _artifacts_path(self, project_path: Path) -> Path:
"""Get the path to the artifacts registry file.
:param project_path: Path to the project directory.
:returns: Path to artifacts.json.
"""
return self._get_project_path(project_path) / "artifacts.json"
def _load_artifacts(self, project_path: Path) -> list[dict[str, Any]]:
"""Load the artifact registry from disk.
:param project_path: Path to the project directory.
:returns: List of artifact dicts.
"""
path = self._artifacts_path(project_path)
if path.exists():
try:
return json.loads(path.read_text()) # type: ignore[no-any-return]
except (json.JSONDecodeError, OSError):
return []
return []
def _save_artifacts(self, project_path: Path, artifacts: list[dict[str, Any]]) -> None:
"""Persist the artifact registry to disk.
:param project_path: Path to the project directory.
:param artifacts: Full artifact list to write.
"""
path = self._artifacts_path(project_path)
path.parent.mkdir(parents=True, exist_ok=True)
path.write_text(json.dumps(artifacts, indent=2, default=str))
def _classify_file(self, file_path: Path) -> str:
"""Classify a file into a human-friendly type string.
:param file_path: Path to the file.
:returns: Type string (e.g. "elf-binary", "text", "directory").
"""
mime, _ = mimetypes.guess_type(str(file_path))
suffix = file_path.suffix.lower()
# Try reading ELF magic for binaries with no extension
if mime is None and suffix == "":
try:
header = file_path.read_bytes()[:4]
if header == b"\x7fELF":
return "elf-binary"
except OSError:
pass
if mime:
if "json" in mime:
return "json"
if "text" in mime or "xml" in mime or "yaml" in mime:
return "text"
if "image" in mime:
return "image"
if "octet-stream" in mime:
return "binary"
type_map: dict[str, str] = {
".json": "json",
".sarif": "sarif",
".md": "markdown",
".txt": "text",
".log": "text",
".csv": "csv",
".yaml": "yaml",
".yml": "yaml",
".xml": "xml",
".html": "html",
".elf": "elf-binary",
".so": "elf-binary",
".bin": "binary",
".gz": "archive",
".tar": "archive",
".zip": "archive",
}
return type_map.get(suffix, "binary")
def scan_artifacts(
self,
project_path: Path,
server_name: str,
tool_name: str,
) -> list[dict[str, Any]]:
"""Scan the output directory for new or modified files and register them.
Compares the current state of .fuzzforge/output/ against the existing
artifact registry and registers any new or modified files.
:param project_path: Path to the project directory.
:param server_name: Hub server that produced the artifacts.
:param tool_name: Tool that produced the artifacts.
:returns: List of newly registered artifact dicts.
"""
output_path = self.get_project_output_path(project_path)
if output_path is None or not output_path.exists():
return []
existing = self._load_artifacts(project_path)
known: dict[str, dict[str, Any]] = {a["path"]: a for a in existing}
now = datetime.now(tz=UTC).isoformat()
new_artifacts: list[dict[str, Any]] = []
for file_path in output_path.rglob("*"):
if not file_path.is_file():
continue
# Use the container-style path (/app/output/...) so it's
# directly usable in subsequent tool calls.
relative = file_path.relative_to(output_path)
container_path = f"/app/output/{relative}"
stat = file_path.stat()
size = stat.st_size
mtime = datetime.fromtimestamp(stat.st_mtime, tz=UTC).isoformat()
prev = known.get(container_path)
if prev and prev.get("mtime") == mtime and prev.get("size") == size:
continue # Unchanged — skip
artifact: dict[str, Any] = {
"path": container_path,
"host_path": str(file_path),
"type": self._classify_file(file_path),
"size": size,
"mtime": mtime,
"source_server": server_name,
"source_tool": tool_name,
"registered_at": now,
}
if prev:
# Update existing entry in-place
idx = next(i for i, a in enumerate(existing) if a["path"] == container_path)
existing[idx] = artifact
else:
existing.append(artifact)
new_artifacts.append(artifact)
if new_artifacts:
self._save_artifacts(project_path, existing)
logger.info(
"Registered %d new artifact(s) from %s:%s",
len(new_artifacts),
server_name,
tool_name,
)
return new_artifacts
def list_artifacts(
self,
project_path: Path,
*,
source: str | None = None,
artifact_type: str | None = None,
) -> list[dict[str, Any]]:
"""List registered artifacts, with optional filters.
:param project_path: Path to the project directory.
:param source: Filter by source server name.
:param artifact_type: Filter by artifact type (e.g. "elf-binary", "json").
:returns: List of matching artifact dicts.
"""
artifacts = self._load_artifacts(project_path)
if source:
artifacts = [a for a in artifacts if a.get("source_server") == source]
if artifact_type:
artifacts = [a for a in artifacts if a.get("type") == artifact_type]
return artifacts
def get_artifact(self, project_path: Path, path: str) -> dict[str, Any] | None:
"""Get a single artifact by its container path.
:param project_path: Path to the project directory.
:param path: Container path of the artifact (e.g. /app/output/...).
:returns: Artifact dict, or None if not found.
"""
artifacts = self._load_artifacts(project_path)
for artifact in artifacts:
if artifact["path"] == path:
return artifact
return None

View File

@@ -10,21 +10,51 @@ through the FuzzForge hub. AI agents can:
from __future__ import annotations
from pathlib import Path
from typing import Any
from fastmcp import FastMCP
from fastmcp.exceptions import ToolError
from fuzzforge_common.hub import HubExecutor, HubServerConfig, HubServerType
from fuzzforge_mcp.dependencies import get_settings
from fuzzforge_mcp.dependencies import get_project_path, get_settings, get_storage
mcp: FastMCP = FastMCP()
# Name of the convention tool that hub servers can implement to provide
# rich usage context for AI agents (known issues, workflow tips, rules, etc.).
_AGENT_CONTEXT_TOOL = "get_agent_context"
# Global hub executor instance (lazy initialization)
_hub_executor: HubExecutor | None = None
async def _fetch_agent_context(
executor: HubExecutor,
server_name: str,
tools: list[Any],
) -> str | None:
"""Call get_agent_context if the server provides it.
Returns the context string, or None if the server doesn't implement
the convention or the call fails.
"""
if not any(t.name == _AGENT_CONTEXT_TOOL for t in tools):
return None
try:
result = await executor.execute_tool(
identifier=f"hub:{server_name}:{_AGENT_CONTEXT_TOOL}",
arguments={},
)
if result.success and result.result:
content = result.result.get("content", [])
if content and isinstance(content, list):
text: str = content[0].get("text", "")
return text
except Exception: # noqa: BLE001, S110 - best-effort context fetch
pass
return None
def _get_hub_executor() -> HubExecutor:
"""Get or create the hub executor instance.
@@ -51,12 +81,15 @@ def _get_hub_executor() -> HubExecutor:
@mcp.tool
async def list_hub_servers() -> dict[str, Any]:
async def list_hub_servers(category: str | None = None) -> dict[str, Any]:
"""List all registered MCP hub servers.
Returns information about configured hub servers, including
their connection type, status, and discovered tool count.
:param category: Optional category to filter by (e.g. "binary-analysis",
"web-security", "reconnaissance"). Only servers in this category
are returned.
:return: Dictionary with list of hub servers.
"""
@@ -64,6 +97,9 @@ async def list_hub_servers() -> dict[str, Any]:
executor = _get_hub_executor()
servers = executor.list_servers()
if category:
servers = [s for s in servers if s.get("category") == category]
return {
"servers": servers,
"count": len(servers),
@@ -94,7 +130,14 @@ async def discover_hub_tools(server_name: str | None = None) -> dict[str, Any]:
if server_name:
tools = await executor.discover_server_tools(server_name)
return {
# Convention: auto-fetch agent context if server provides it.
agent_context = await _fetch_agent_context(executor, server_name, tools)
# Hide the convention tool from the agent's tool list.
visible_tools = [t for t in tools if t.name != "get_agent_context"]
result: dict[str, Any] = {
"server": server_name,
"tools": [
{
@@ -103,15 +146,24 @@ async def discover_hub_tools(server_name: str | None = None) -> dict[str, Any]:
"description": t.description,
"parameters": [p.model_dump() for p in t.parameters],
}
for t in tools
for t in visible_tools
],
"count": len(tools),
"count": len(visible_tools),
}
if agent_context:
result["agent_context"] = agent_context
return result
else:
results = await executor.discover_all_tools()
all_tools = []
contexts: dict[str, str] = {}
for server, tools in results.items():
ctx = await _fetch_agent_context(executor, server, tools)
if ctx:
contexts[server] = ctx
for tool in tools:
if tool.name == "get_agent_context":
continue
all_tools.append({
"identifier": tool.identifier,
"name": tool.name,
@@ -120,11 +172,14 @@ async def discover_hub_tools(server_name: str | None = None) -> dict[str, Any]:
"parameters": [p.model_dump() for p in tool.parameters],
})
return {
result = {
"servers_discovered": len(results),
"tools": all_tools,
"count": len(all_tools),
}
if contexts:
result["agent_contexts"] = contexts
return result
except Exception as e:
if isinstance(e, ToolError):
@@ -173,21 +228,96 @@ async def execute_hub_tool(
:return: Tool execution result.
Example identifiers:
- "hub:binwalk-mcp:binwalk_scan"
- "hub:yara-mcp:yara_scan_with_rules"
- "hub:nmap:nmap_scan"
- "nmap:nmap_scan"
- "hub:nuclei:nuclei_scan"
FILE ACCESS — if set_project_assets was called, the assets directory is
mounted read-only inside the container at two standard paths:
- /app/uploads/ (used by binwalk, and tools with UPLOAD_DIR)
- /app/samples/ (used by yara, capa, and tools with SAMPLES_DIR)
Always use /app/uploads/<filename> or /app/samples/<filename> when
passing file paths to hub tools — do NOT use the host path.
Tool outputs are persisted to a writable shared volume:
- /app/output/ (writable — extraction results, reports, etc.)
Files written here survive container destruction and are available
to subsequent tool calls. The host path is .fuzzforge/output/.
"""
try:
executor = _get_hub_executor()
# Inject project assets as Docker volume mounts if configured.
# Mounts the assets directory at the standard paths used by hub tools:
# /app/uploads — binwalk, and other tools that use UPLOAD_DIR
# /app/samples — yara, capa, and other tools that use SAMPLES_DIR
# /app/output — writable volume for tool outputs (persists across calls)
extra_volumes: list[str] = []
try:
storage = get_storage()
project_path = get_project_path()
assets_path = storage.get_project_assets_path(project_path)
if assets_path:
assets_str = str(assets_path)
extra_volumes = [
f"{assets_str}:/app/uploads:ro",
f"{assets_str}:/app/samples:ro",
]
output_path = storage.get_project_output_path(project_path)
if output_path:
extra_volumes.append(f"{output_path!s}:/app/output:rw")
except Exception: # noqa: BLE001 - never block tool execution due to asset injection failure
extra_volumes = []
result = await executor.execute_tool(
identifier=identifier,
arguments=arguments or {},
timeout=timeout,
extra_volumes=extra_volumes or None,
)
return result.to_dict()
# Record execution history for list_executions / get_execution_results.
try:
storage = get_storage()
project_path = get_project_path()
storage.record_execution(
project_path=project_path,
server_name=result.server_name,
tool_name=result.tool_name,
arguments=arguments or {},
result=result.to_dict(),
)
except Exception: # noqa: BLE001, S110 - never fail the tool call due to recording issues
pass
# Scan for new artifacts produced by the tool in /app/output.
response = result.to_dict()
try:
storage = get_storage()
project_path = get_project_path()
new_artifacts = storage.scan_artifacts(
project_path=project_path,
server_name=result.server_name,
tool_name=result.tool_name,
)
if new_artifacts:
response["artifacts"] = [
{"path": a["path"], "type": a["type"], "size": a["size"]}
for a in new_artifacts
]
except Exception: # noqa: BLE001, S110 - never fail the tool call due to artifact scanning
pass
# Append workflow suggestions based on hints configured for this tool.
try:
hint = executor.registry.get_workflow_hint(result.tool_name)
if hint:
response["suggested_next_steps"] = hint
except Exception: # noqa: BLE001, S110 - never fail the tool call due to hint lookup
pass
return response
except Exception as e:
if isinstance(e, ToolError):
@@ -335,7 +465,25 @@ async def start_hub_server(server_name: str) -> dict[str, Any]:
try:
executor = _get_hub_executor()
result = await executor.start_persistent_server(server_name)
# Inject project assets as Docker volume mounts (same logic as execute_hub_tool).
extra_volumes: list[str] = []
try:
storage = get_storage()
project_path = get_project_path()
assets_path = storage.get_project_assets_path(project_path)
if assets_path:
assets_str = str(assets_path)
extra_volumes = [
f"{assets_str}:/app/uploads:ro",
f"{assets_str}:/app/samples:ro",
]
output_path = storage.get_project_output_path(project_path)
if output_path:
extra_volumes.append(f"{output_path!s}:/app/output:rw")
except Exception: # noqa: BLE001 - never block server start due to asset injection failure
extra_volumes = []
result = await executor.start_persistent_server(server_name, extra_volumes=extra_volumes or None)
return {
"success": True,

View File

@@ -10,21 +10,19 @@ from fastmcp.exceptions import ToolError
from fuzzforge_mcp.dependencies import get_project_path, get_storage, set_current_project_path
mcp: FastMCP = FastMCP()
@mcp.tool
async def init_project(project_path: str | None = None) -> dict[str, Any]:
"""Initialize a new FuzzForge project.
"""Initialize a new FuzzForge project workspace.
Creates a `.fuzzforge/` directory inside the project for storing:
- config.json: Project configuration
- runs/: Execution results
Creates a `.fuzzforge/` directory for storing configuration and execution results.
Call this once before using hub tools. The project path is a working directory
for FuzzForge state — it does not need to contain the files you want to analyze.
Use `set_project_assets` separately to specify the target files.
This should be called before executing hub tools.
:param project_path: Path to the project directory. If not provided, uses current directory.
:param project_path: Working directory for FuzzForge state. Defaults to current directory.
:return: Project initialization result.
"""
@@ -52,12 +50,13 @@ async def init_project(project_path: str | None = None) -> dict[str, Any]:
@mcp.tool
async def set_project_assets(assets_path: str) -> dict[str, Any]:
"""Set the initial assets (source code) for a project.
"""Set the directory containing target files to analyze.
This sets the DEFAULT source directory that will be mounted into
hub tool containers via volume mounts.
Points FuzzForge to the directory with your analysis targets
(firmware images, binaries, source code, etc.). This directory
is mounted read-only into hub tool containers.
:param assets_path: Path to the project source directory.
:param assets_path: Path to the directory containing files to analyze.
:return: Result including stored assets path.
"""
@@ -86,9 +85,9 @@ async def set_project_assets(assets_path: str) -> dict[str, Any]:
async def list_executions() -> dict[str, Any]:
"""List all executions for the current project.
Returns a list of execution IDs that can be used to retrieve results.
Returns execution summaries including server, tool, timestamp, and success status.
:return: List of execution IDs.
:return: List of execution summaries.
"""
storage = get_storage()
@@ -147,3 +146,70 @@ async def get_execution_results(execution_id: str, extract_to: str | None = None
except Exception as exception:
message: str = f"Failed to get execution results: {exception}"
raise ToolError(message) from exception
@mcp.tool
async def list_artifacts(
source: str | None = None,
artifact_type: str | None = None,
) -> dict[str, Any]:
"""List all artifacts produced by hub tools in the current project.
Artifacts are files created by tool executions in /app/output/.
They are automatically tracked after each execute_hub_tool call.
:param source: Filter by source server name (e.g. "binwalk-mcp").
:param artifact_type: Filter by type (e.g. "elf-binary", "json", "text", "archive").
:return: List of artifacts with path, type, size, and source info.
"""
storage = get_storage()
project_path: Path = get_project_path()
try:
artifacts = storage.list_artifacts(
project_path,
source=source,
artifact_type=artifact_type,
)
return {
"success": True,
"artifacts": artifacts,
"count": len(artifacts),
}
except Exception as exception:
message: str = f"Failed to list artifacts: {exception}"
raise ToolError(message) from exception
@mcp.tool
async def get_artifact(path: str) -> dict[str, Any]:
"""Get metadata for a specific artifact by its container path.
:param path: Container path of the artifact (e.g. /app/output/extract_abc123/squashfs-root/usr/sbin/httpd).
:return: Artifact metadata including path, type, size, source tool, and timestamps.
"""
storage = get_storage()
project_path: Path = get_project_path()
try:
artifact = storage.get_artifact(project_path, path)
if artifact is None:
return {
"success": False,
"path": path,
"error": "Artifact not found",
}
return {
"success": True,
"artifact": artifact,
}
except Exception as exception:
message: str = f"Failed to get artifact: {exception}"
raise ToolError(message) from exception

View File

@@ -1,126 +1,5 @@
{
"servers": [
{
"name": "nmap-mcp",
"description": "Network reconnaissance using Nmap - port scanning, service detection, OS fingerprinting",
"type": "docker",
"image": "nmap-mcp:latest",
"category": "reconnaissance",
"capabilities": [
"NET_RAW"
],
"enabled": true
},
{
"name": "binwalk-mcp",
"description": "Firmware extraction and analysis using Binwalk - file signatures, entropy analysis, embedded file extraction",
"type": "docker",
"image": "binwalk-mcp:latest",
"category": "binary-analysis",
"capabilities": [],
"volumes": [
"~/.fuzzforge/hub/workspace:/data"
],
"enabled": true
},
{
"name": "yara-mcp",
"description": "Pattern matching and malware classification using YARA rules",
"type": "docker",
"image": "yara-mcp:latest",
"category": "binary-analysis",
"capabilities": [],
"volumes": [
"~/.fuzzforge/hub/workspace:/data"
],
"enabled": true
},
{
"name": "capa-mcp",
"description": "Static capability detection using capa - identifies malware capabilities in binaries",
"type": "docker",
"image": "capa-mcp:latest",
"category": "binary-analysis",
"capabilities": [],
"volumes": [
"~/.fuzzforge/hub/workspace:/data"
],
"enabled": true
},
{
"name": "radare2-mcp",
"description": "Binary analysis and reverse engineering using radare2",
"type": "docker",
"image": "radare2-mcp:latest",
"category": "binary-analysis",
"capabilities": [],
"volumes": [
"~/.fuzzforge/hub/workspace:/data"
],
"enabled": true
},
{
"name": "ghidra-mcp",
"description": "Advanced binary decompilation and reverse engineering using Ghidra",
"type": "docker",
"image": "ghcr.io/clearbluejar/pyghidra-mcp:latest",
"category": "binary-analysis",
"capabilities": [],
"volumes": [
"~/.fuzzforge/hub/workspace:/data"
],
"enabled": true
},
{
"name": "searchsploit-mcp",
"description": "CVE and exploit search using SearchSploit / Exploit-DB",
"type": "docker",
"image": "searchsploit-mcp:latest",
"category": "exploitation",
"capabilities": [],
"volumes": [
"~/.fuzzforge/hub/workspace:/data"
],
"enabled": true
},
{
"name": "nuclei-mcp",
"description": "Vulnerability scanning using Nuclei templates",
"type": "docker",
"image": "nuclei-mcp:latest",
"category": "web-security",
"capabilities": [
"NET_RAW"
],
"volumes": [
"~/.fuzzforge/hub/workspace:/data"
],
"enabled": true
},
{
"name": "trivy-mcp",
"description": "Container and filesystem vulnerability scanning using Trivy",
"type": "docker",
"image": "trivy-mcp:latest",
"category": "cloud-security",
"capabilities": [],
"volumes": [
"~/.fuzzforge/hub/workspace:/data"
],
"enabled": true
},
{
"name": "gitleaks-mcp",
"description": "Secret and credential detection in code and firmware using Gitleaks",
"type": "docker",
"image": "gitleaks-mcp:latest",
"category": "secrets",
"capabilities": [],
"volumes": [
"~/.fuzzforge/hub/workspace:/data"
],
"enabled": true
},
{
"name": "bloodhound-mcp",
"description": "bloodhound-mcp \u2014 active-directory",
@@ -129,7 +8,46 @@
"category": "active-directory",
"capabilities": [],
"volumes": [
"/home/afredefon/FuzzingLabs/FuzzForge/fuzzforge-oss/.fuzzforge/hub/workspace:/data"
"/home/afredefon/.fuzzforge/hub/workspace:/data"
],
"enabled": true,
"source_hub": "mcp-security-hub"
},
{
"name": "binwalk-mcp",
"description": "binwalk-mcp \u2014 binary-analysis",
"type": "docker",
"image": "binwalk-mcp:latest",
"category": "binary-analysis",
"capabilities": [],
"volumes": [
"/home/afredefon/.fuzzforge/hub/workspace:/data"
],
"enabled": true,
"source_hub": "mcp-security-hub"
},
{
"name": "capa-mcp",
"description": "capa-mcp \u2014 binary-analysis",
"type": "docker",
"image": "capa-mcp:latest",
"category": "binary-analysis",
"capabilities": [],
"volumes": [
"/home/afredefon/.fuzzforge/hub/workspace:/data"
],
"enabled": true,
"source_hub": "mcp-security-hub"
},
{
"name": "ghidra-mcp",
"description": "ghidra-mcp \u2014 binary-analysis",
"type": "docker",
"image": "ghidra-mcp:latest",
"category": "binary-analysis",
"capabilities": [],
"volumes": [
"/home/afredefon/.fuzzforge/hub/workspace:/data"
],
"enabled": true,
"source_hub": "mcp-security-hub"
@@ -142,7 +60,33 @@
"category": "binary-analysis",
"capabilities": [],
"volumes": [
"/home/afredefon/FuzzingLabs/FuzzForge/fuzzforge-oss/.fuzzforge/hub/workspace:/data"
"/home/afredefon/.fuzzforge/hub/workspace:/data"
],
"enabled": true,
"source_hub": "mcp-security-hub"
},
{
"name": "radare2-mcp",
"description": "radare2-mcp \u2014 binary-analysis",
"type": "docker",
"image": "radare2-mcp:latest",
"category": "binary-analysis",
"capabilities": [],
"volumes": [
"/home/afredefon/.fuzzforge/hub/workspace:/data"
],
"enabled": true,
"source_hub": "mcp-security-hub"
},
{
"name": "yara-mcp",
"description": "yara-mcp \u2014 binary-analysis",
"type": "docker",
"image": "yara-mcp:latest",
"category": "binary-analysis",
"capabilities": [],
"volumes": [
"/home/afredefon/.fuzzforge/hub/workspace:/data"
],
"enabled": true,
"source_hub": "mcp-security-hub"
@@ -155,7 +99,7 @@
"category": "blockchain",
"capabilities": [],
"volumes": [
"/home/afredefon/FuzzingLabs/FuzzForge/fuzzforge-oss/.fuzzforge/hub/workspace:/data"
"/home/afredefon/.fuzzforge/hub/workspace:/data"
],
"enabled": true,
"source_hub": "mcp-security-hub"
@@ -168,7 +112,7 @@
"category": "blockchain",
"capabilities": [],
"volumes": [
"/home/afredefon/FuzzingLabs/FuzzForge/fuzzforge-oss/.fuzzforge/hub/workspace:/data"
"/home/afredefon/.fuzzforge/hub/workspace:/data"
],
"enabled": true,
"source_hub": "mcp-security-hub"
@@ -181,7 +125,7 @@
"category": "blockchain",
"capabilities": [],
"volumes": [
"/home/afredefon/FuzzingLabs/FuzzForge/fuzzforge-oss/.fuzzforge/hub/workspace:/data"
"/home/afredefon/.fuzzforge/hub/workspace:/data"
],
"enabled": true,
"source_hub": "mcp-security-hub"
@@ -194,7 +138,7 @@
"category": "cloud-security",
"capabilities": [],
"volumes": [
"/home/afredefon/FuzzingLabs/FuzzForge/fuzzforge-oss/.fuzzforge/hub/workspace:/data"
"/home/afredefon/.fuzzforge/hub/workspace:/data"
],
"enabled": true,
"source_hub": "mcp-security-hub"
@@ -207,7 +151,20 @@
"category": "cloud-security",
"capabilities": [],
"volumes": [
"/home/afredefon/FuzzingLabs/FuzzForge/fuzzforge-oss/.fuzzforge/hub/workspace:/data"
"/home/afredefon/.fuzzforge/hub/workspace:/data"
],
"enabled": true,
"source_hub": "mcp-security-hub"
},
{
"name": "trivy-mcp",
"description": "trivy-mcp \u2014 cloud-security",
"type": "docker",
"image": "trivy-mcp:latest",
"category": "cloud-security",
"capabilities": [],
"volumes": [
"/home/afredefon/.fuzzforge/hub/workspace:/data"
],
"enabled": true,
"source_hub": "mcp-security-hub"
@@ -220,7 +177,20 @@
"category": "code-security",
"capabilities": [],
"volumes": [
"/home/afredefon/FuzzingLabs/FuzzForge/fuzzforge-oss/.fuzzforge/hub/workspace:/data"
"/home/afredefon/.fuzzforge/hub/workspace:/data"
],
"enabled": true,
"source_hub": "mcp-security-hub"
},
{
"name": "searchsploit-mcp",
"description": "searchsploit-mcp \u2014 exploitation",
"type": "docker",
"image": "searchsploit-mcp:latest",
"category": "exploitation",
"capabilities": [],
"volumes": [
"/home/afredefon/.fuzzforge/hub/workspace:/data"
],
"enabled": true,
"source_hub": "mcp-security-hub"
@@ -233,7 +203,7 @@
"category": "fuzzing",
"capabilities": [],
"volumes": [
"/home/afredefon/FuzzingLabs/FuzzForge/fuzzforge-oss/.fuzzforge/hub/workspace:/data"
"/home/afredefon/.fuzzforge/hub/workspace:/data"
],
"enabled": true,
"source_hub": "mcp-security-hub"
@@ -246,7 +216,7 @@
"category": "fuzzing",
"capabilities": [],
"volumes": [
"/home/afredefon/FuzzingLabs/FuzzForge/fuzzforge-oss/.fuzzforge/hub/workspace:/data"
"/home/afredefon/.fuzzforge/hub/workspace:/data"
],
"enabled": true,
"source_hub": "mcp-security-hub"
@@ -259,7 +229,7 @@
"category": "osint",
"capabilities": [],
"volumes": [
"/home/afredefon/FuzzingLabs/FuzzForge/fuzzforge-oss/.fuzzforge/hub/workspace:/data"
"/home/afredefon/.fuzzforge/hub/workspace:/data"
],
"enabled": true,
"source_hub": "mcp-security-hub"
@@ -272,7 +242,7 @@
"category": "osint",
"capabilities": [],
"volumes": [
"/home/afredefon/FuzzingLabs/FuzzForge/fuzzforge-oss/.fuzzforge/hub/workspace:/data"
"/home/afredefon/.fuzzforge/hub/workspace:/data"
],
"enabled": true,
"source_hub": "mcp-security-hub"
@@ -285,7 +255,7 @@
"category": "password-cracking",
"capabilities": [],
"volumes": [
"/home/afredefon/FuzzingLabs/FuzzForge/fuzzforge-oss/.fuzzforge/hub/workspace:/data"
"/home/afredefon/.fuzzforge/hub/workspace:/data"
],
"enabled": true,
"source_hub": "mcp-security-hub"
@@ -300,7 +270,7 @@
"NET_RAW"
],
"volumes": [
"/home/afredefon/FuzzingLabs/FuzzForge/fuzzforge-oss/.fuzzforge/hub/workspace:/data"
"/home/afredefon/.fuzzforge/hub/workspace:/data"
],
"enabled": true,
"source_hub": "mcp-security-hub"
@@ -315,7 +285,7 @@
"NET_RAW"
],
"volumes": [
"/home/afredefon/FuzzingLabs/FuzzForge/fuzzforge-oss/.fuzzforge/hub/workspace:/data"
"/home/afredefon/.fuzzforge/hub/workspace:/data"
],
"enabled": true,
"source_hub": "mcp-security-hub"
@@ -330,7 +300,22 @@
"NET_RAW"
],
"volumes": [
"/home/afredefon/FuzzingLabs/FuzzForge/fuzzforge-oss/.fuzzforge/hub/workspace:/data"
"/home/afredefon/.fuzzforge/hub/workspace:/data"
],
"enabled": true,
"source_hub": "mcp-security-hub"
},
{
"name": "nmap-mcp",
"description": "nmap-mcp \u2014 reconnaissance",
"type": "docker",
"image": "nmap-mcp:latest",
"category": "reconnaissance",
"capabilities": [
"NET_RAW"
],
"volumes": [
"/home/afredefon/.fuzzforge/hub/workspace:/data"
],
"enabled": true,
"source_hub": "mcp-security-hub"
@@ -345,7 +330,7 @@
"NET_RAW"
],
"volumes": [
"/home/afredefon/FuzzingLabs/FuzzForge/fuzzforge-oss/.fuzzforge/hub/workspace:/data"
"/home/afredefon/.fuzzforge/hub/workspace:/data"
],
"enabled": true,
"source_hub": "mcp-security-hub"
@@ -360,7 +345,7 @@
"NET_RAW"
],
"volumes": [
"/home/afredefon/FuzzingLabs/FuzzForge/fuzzforge-oss/.fuzzforge/hub/workspace:/data"
"/home/afredefon/.fuzzforge/hub/workspace:/data"
],
"enabled": true,
"source_hub": "mcp-security-hub"
@@ -375,7 +360,7 @@
"NET_RAW"
],
"volumes": [
"/home/afredefon/FuzzingLabs/FuzzForge/fuzzforge-oss/.fuzzforge/hub/workspace:/data"
"/home/afredefon/.fuzzforge/hub/workspace:/data"
],
"enabled": true,
"source_hub": "mcp-security-hub"
@@ -390,7 +375,20 @@
"NET_RAW"
],
"volumes": [
"/home/afredefon/FuzzingLabs/FuzzForge/fuzzforge-oss/.fuzzforge/hub/workspace:/data"
"/home/afredefon/.fuzzforge/hub/workspace:/data"
],
"enabled": true,
"source_hub": "mcp-security-hub"
},
{
"name": "gitleaks-mcp",
"description": "gitleaks-mcp \u2014 secrets",
"type": "docker",
"image": "gitleaks-mcp:latest",
"category": "secrets",
"capabilities": [],
"volumes": [
"/home/afredefon/.fuzzforge/hub/workspace:/data"
],
"enabled": true,
"source_hub": "mcp-security-hub"
@@ -403,7 +401,7 @@
"category": "threat-intel",
"capabilities": [],
"volumes": [
"/home/afredefon/FuzzingLabs/FuzzForge/fuzzforge-oss/.fuzzforge/hub/workspace:/data"
"/home/afredefon/.fuzzforge/hub/workspace:/data"
],
"enabled": true,
"source_hub": "mcp-security-hub"
@@ -416,7 +414,7 @@
"category": "threat-intel",
"capabilities": [],
"volumes": [
"/home/afredefon/FuzzingLabs/FuzzForge/fuzzforge-oss/.fuzzforge/hub/workspace:/data"
"/home/afredefon/.fuzzforge/hub/workspace:/data"
],
"enabled": true,
"source_hub": "mcp-security-hub"
@@ -431,7 +429,7 @@
"NET_RAW"
],
"volumes": [
"/home/afredefon/FuzzingLabs/FuzzForge/fuzzforge-oss/.fuzzforge/hub/workspace:/data"
"/home/afredefon/.fuzzforge/hub/workspace:/data"
],
"enabled": true,
"source_hub": "mcp-security-hub"
@@ -446,7 +444,7 @@
"NET_RAW"
],
"volumes": [
"/home/afredefon/FuzzingLabs/FuzzForge/fuzzforge-oss/.fuzzforge/hub/workspace:/data"
"/home/afredefon/.fuzzforge/hub/workspace:/data"
],
"enabled": true,
"source_hub": "mcp-security-hub"
@@ -461,7 +459,22 @@
"NET_RAW"
],
"volumes": [
"/home/afredefon/FuzzingLabs/FuzzForge/fuzzforge-oss/.fuzzforge/hub/workspace:/data"
"/home/afredefon/.fuzzforge/hub/workspace:/data"
],
"enabled": true,
"source_hub": "mcp-security-hub"
},
{
"name": "nuclei-mcp",
"description": "nuclei-mcp \u2014 web-security",
"type": "docker",
"image": "nuclei-mcp:latest",
"category": "web-security",
"capabilities": [
"NET_RAW"
],
"volumes": [
"/home/afredefon/.fuzzforge/hub/workspace:/data"
],
"enabled": true,
"source_hub": "mcp-security-hub"
@@ -476,7 +489,7 @@
"NET_RAW"
],
"volumes": [
"/home/afredefon/FuzzingLabs/FuzzForge/fuzzforge-oss/.fuzzforge/hub/workspace:/data"
"/home/afredefon/.fuzzforge/hub/workspace:/data"
],
"enabled": true,
"source_hub": "mcp-security-hub"
@@ -491,12 +504,63 @@
"NET_RAW"
],
"volumes": [
"/home/afredefon/FuzzingLabs/FuzzForge/fuzzforge-oss/.fuzzforge/hub/workspace:/data"
"/home/afredefon/.fuzzforge/hub/workspace:/data"
],
"enabled": true,
"source_hub": "mcp-security-hub"
},
{
"name": "go-analyzer-mcp",
"description": "Go static analysis: fuzzable entry points, existing Fuzz* targets, unsafe/cgo usage, CVE scanning via govulncheck",
"type": "docker",
"image": "go-analyzer-mcp:latest",
"category": "code-security",
"capabilities": [],
"volumes": [
"/home/afredefon/.fuzzforge/hub/workspace:/data"
],
"enabled": true,
"source_hub": "mcp-security-hub"
},
{
"name": "go-harness-tester-mcp",
"description": "Test Go fuzz harness quality: compilation, seed execution, fuzzing trial, quality scoring 0-100",
"type": "docker",
"image": "go-harness-tester-mcp:latest",
"category": "code-security",
"capabilities": [],
"volumes": [
"/home/afredefon/.fuzzforge/hub/workspace:/data"
],
"enabled": true,
"source_hub": "mcp-security-hub"
},
{
"name": "go-fuzzer-mcp",
"description": "Run Go native fuzzing (go test -fuzz) with blocking and continuous modes, crash collection, session management",
"type": "docker",
"image": "go-fuzzer-mcp:latest",
"category": "code-security",
"capabilities": [],
"volumes": [
"/home/afredefon/.fuzzforge/hub/workspace:/data"
],
"enabled": true,
"source_hub": "mcp-security-hub"
},
{
"name": "go-crash-analyzer-mcp",
"description": "Analyze Go fuzzing crashes: reproduce, classify (nil-deref, OOR, panic, race, etc.), deduplicate by stack signature",
"type": "docker",
"image": "go-crash-analyzer-mcp:latest",
"category": "code-security",
"capabilities": [],
"volumes": [
"/home/afredefon/.fuzzforge/hub/workspace:/data"
],
"enabled": true,
"source_hub": "mcp-security-hub"
}
],
"default_timeout": 300,
"cache_tools": true
"workflow_hints_file": "mcp-security-hub/workflow-hints.json"
}

View File

@@ -1,12 +1,17 @@
[project]
name = "fuzzforge-oss"
version = "1.0.0"
version = "0.8.0"
description = "FuzzForge AI - AI-driven security research platform for local execution"
readme = "README.md"
requires-python = ">=3.14"
authors = [
{ name = "FuzzingLabs", email = "contact@fuzzinglabs.com" }
]
dependencies = [
"fuzzforge-cli",
"fuzzforge-mcp",
"fuzzforge-common",
]
[project.optional-dependencies]
dev = [

8
uv.lock generated
View File

@@ -469,6 +469,11 @@ provides-extras = ["lints", "tests"]
name = "fuzzforge-oss"
version = "1.0.0"
source = { virtual = "." }
dependencies = [
{ name = "fuzzforge-cli" },
{ name = "fuzzforge-common" },
{ name = "fuzzforge-mcp" },
]
[package.optional-dependencies]
dev = [
@@ -482,7 +487,10 @@ dev = [
[package.metadata]
requires-dist = [
{ name = "fuzzforge-cli", editable = "fuzzforge-cli" },
{ name = "fuzzforge-common", editable = "fuzzforge-common" },
{ name = "fuzzforge-common", marker = "extra == 'dev'", editable = "fuzzforge-common" },
{ name = "fuzzforge-mcp", editable = "fuzzforge-mcp" },
{ name = "fuzzforge-mcp", marker = "extra == 'dev'", editable = "fuzzforge-mcp" },
{ name = "fuzzforge-tests", marker = "extra == 'dev'", editable = "fuzzforge-tests" },
{ name = "pytest", marker = "extra == 'dev'", specifier = "==9.0.2" },