Compare commits

...

11 Commits

Author SHA1 Message Date
AFredefon
bbf864e88b Merge pull request #55 from FuzzingLabs/feat/report-generation
feat: implement report generation
2026-04-08 03:14:45 +02:00
AFredefon
d04797b21d Merge pull request #54 from FuzzingLabs/feat/skill-packs
feat: implement skill packs system
2026-04-08 03:10:47 +02:00
AFredefon
0ea8c4bd1d Merge pull request #53 from FuzzingLabs/feat/artifact-management
feat: implement artifact management tools
2026-04-08 03:09:31 +02:00
AFredefon
af7532c811 Merge pull request #52 from FuzzingLabs/feat/workflow-hints
feat: implement workflow suggestions pipeline
2026-04-08 03:08:52 +02:00
AFredefon
0d410bd5b4 feat: implement report generation 2026-04-07 16:25:36 +02:00
AFredefon
d3a20b3846 feat: implement skill packs system 2026-04-07 16:12:14 +02:00
AFredefon
664278da3f feat: implement artifact management tools 2026-04-07 16:06:47 +02:00
tduhamel42
01e6bc3fb1 docs: rename FuzzForge to SecPipe in all markdown files
Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
2026-04-03 14:04:49 +02:00
tduhamel42
b634214e01 Update README.md 2026-04-03 13:52:06 +02:00
AFredefon
e7022c2c82 Merge pull request #48 from FuzzingLabs/dev 2026-03-17 08:15:42 +01:00
AFredefon
07c32de294 Merge pull request #46 from FuzzingLabs/dev
Refactor hub integration and enhance TUI with new features
2026-03-11 08:07:05 +01:00
19 changed files with 1136 additions and 94 deletions

View File

@@ -1,8 +1,8 @@
# Contributing to FuzzForge AI
# Contributing to SecPipe AI
Thank you for your interest in contributing to FuzzForge AI! We welcome contributions from the community and are excited to collaborate with you.
Thank you for your interest in contributing to SecPipe AI! We welcome contributions from the community and are excited to collaborate with you.
**Our Vision**: FuzzForge aims to be a **universal platform for security research** across all cybersecurity domains. Through our modular architecture, any security tool—from fuzzing engines to cloud scanners, from mobile app analyzers to IoT security tools—can be integrated as a containerized module and controlled via AI agents.
**Our Vision**: SecPipe aims to be a **universal platform for security research** across all cybersecurity domains. Through our modular architecture, any security tool—from fuzzing engines to cloud scanners, from mobile app analyzers to IoT security tools—can be integrated as a containerized module and controlled via AI agents.
## Ways to Contribute
@@ -13,7 +13,7 @@ Thank you for your interest in contributing to FuzzForge AI! We welcome contribu
- **Documentation** - Improve guides, tutorials, and module documentation
- **Testing** - Help test new features and report issues
- **AI Integration** - Improve MCP tools and AI agent interactions
- **Tool Integrations** - Wrap existing security tools as FuzzForge modules
- **Tool Integrations** - Wrap existing security tools as SecPipe modules
## Contribution Guidelines
@@ -88,7 +88,7 @@ test(runner): add container execution tests
## Module Development
FuzzForge uses a modular architecture where security tools run as isolated containers. The `fuzzforge-modules-sdk` provides everything you need to create new modules.
SecPipe uses a modular architecture where security tools run as isolated containers. The `fuzzforge-modules-sdk` provides everything you need to create new modules.
**Documentation:**
- [Module SDK Documentation](fuzzforge-modules/fuzzforge-modules-sdk/README.md) - Complete SDK reference
@@ -211,7 +211,7 @@ FuzzForge uses a modular architecture where security tools run as isolated conta
### Module Types
FuzzForge is designed to support modules across **all cybersecurity domains**. The modular architecture allows any security tool to be containerized and integrated. Here are the main categories:
SecPipe is designed to support modules across **all cybersecurity domains**. The modular architecture allows any security tool to be containerized and integrated. Here are the main categories:
**Application Security**
- Fuzzing engines (coverage-guided, grammar-based, mutation-based)
@@ -341,7 +341,7 @@ uv run pytest
## Contributing to Core Features
Beyond modules, you can contribute to FuzzForge's core components.
Beyond modules, you can contribute to SecPipe's core components.
**Useful Resources:**
- [Project Structure](README.md) - Overview of the codebase
@@ -399,7 +399,7 @@ Beyond modules, you can contribute to FuzzForge's core components.
When reporting bugs, please include:
- **Environment**: OS, Python version, Docker version, uv version
- **FuzzForge Version**: Output of `uv run fuzzforge --version`
- **SecPipe Version**: Output of `uv run fuzzforge --version`
- **Module**: Which module or component is affected
- **Steps to Reproduce**: Clear steps to recreate the issue
- **Expected Behavior**: What should happen
@@ -538,7 +538,7 @@ Before submitting a new module:
## License
By contributing to FuzzForge AI, you agree that your contributions will be licensed under the same license as the project (see [LICENSE](LICENSE)).
By contributing to SecPipe AI, you agree that your contributions will be licensed under the same license as the project (see [LICENSE](LICENSE)).
For module contributions:
- Modules you create remain under the project license
@@ -558,6 +558,6 @@ Need help contributing?
---
**Thank you for making FuzzForge better!**
**Thank you for making SecPipe better!**
Every contribution, no matter how small, helps build a stronger security research platform. Whether you're creating a module for web security, cloud scanning, mobile analysis, or any other cybersecurity domain, your work makes FuzzForge more powerful and versatile for the entire security community!
Every contribution, no matter how small, helps build a stronger security research platform. Whether you're creating a module for web security, cloud scanning, mobile analysis, or any other cybersecurity domain, your work makes SecPipe more powerful and versatile for the entire security community!

View File

@@ -1,4 +1,4 @@
<h1 align="center"> FuzzForge AI</h1>
<h1 align="center">SecPipe</h1>
<h3 align="center">AI-Powered Security Research Orchestration via MCP</h3>
<p align="center">
@@ -6,7 +6,6 @@
<a href="LICENSE"><img src="https://img.shields.io/badge/license-BSL%201.1-blue" alt="License: BSL 1.1"></a>
<a href="https://www.python.org/downloads/"><img src="https://img.shields.io/badge/python-3.12%2B-blue" alt="Python 3.12+"/></a>
<a href="https://modelcontextprotocol.io"><img src="https://img.shields.io/badge/MCP-compatible-green" alt="MCP Compatible"/></a>
<a href="https://fuzzforge.ai"><img src="https://img.shields.io/badge/Website-fuzzforge.ai-purple" alt="Website"/></a>
</p>
<p align="center">
@@ -26,19 +25,19 @@
---
> 🚧 **FuzzForge AI is under active development.** Expect breaking changes and new features!
> 🚧 **SecPipe AI is under active development.** Expect breaking changes and new features!
---
## 🚀 Overview
**FuzzForge AI** is an open-source MCP server that enables AI agents (GitHub Copilot, Claude, etc.) to orchestrate security research workflows through the **Model Context Protocol (MCP)**.
**SecPipe AI** is an open-source MCP server that enables AI agents (GitHub Copilot, Claude, etc.) to orchestrate security research workflows through the **Model Context Protocol (MCP)**.
FuzzForge connects your AI assistant to **MCP tool hubs** — collections of containerized security tools that the agent can discover, chain, and execute autonomously. Instead of manually running security tools, describe what you want and let your AI assistant handle it.
SecPipe connects your AI assistant to **MCP tool hubs** — collections of containerized security tools that the agent can discover, chain, and execute autonomously. Instead of manually running security tools, describe what you want and let your AI assistant handle it.
### The Core: Hub Architecture
FuzzForge acts as a **meta-MCP server** — a single MCP endpoint that gives your AI agent access to tools from multiple MCP hub servers. Each hub server is a containerized security tool (Binwalk, YARA, Radare2, Nmap, etc.) that the agent can discover at runtime.
SecPipe acts as a **meta-MCP server** — a single MCP endpoint that gives your AI agent access to tools from multiple MCP hub servers. Each hub server is a containerized security tool (Binwalk, YARA, Radare2, Nmap, etc.) that the agent can discover at runtime.
- **🔍 Discovery**: The agent lists available hub servers and discovers their tools
- **🤖 AI-Native**: Hub tools provide agent context — usage tips, workflow guidance, and domain knowledge
@@ -75,7 +74,7 @@ Agent → Crash Analysis: Deduplicate and triage discovered crashes
## ⭐ Support the Project
If you find FuzzForge useful, please **star the repo** to support development! 🚀
If you find SecPipe useful, please **star the repo** to support development! 🚀
<a href="https://github.com/FuzzingLabs/fuzzforge_ai/stargazers">
<img src="https://img.shields.io/github/stars/FuzzingLabs/fuzzforge_ai?style=social" alt="GitHub Stars">
@@ -106,7 +105,7 @@ If you find FuzzForge useful, please **star the repo** to support development!
│ MCP Protocol (stdio)
┌─────────────────────────────────────────────────────────────────┐
FuzzForge MCP Server │
SecPipe MCP Server
│ │
│ Projects Hub Discovery Hub Execution │
│ ┌──────────────┐ ┌──────────────────┐ ┌───────────────────┐ │
@@ -135,7 +134,7 @@ If you find FuzzForge useful, please **star the repo** to support development!
## 🔧 MCP Security Hub
FuzzForge ships with built-in support for the **[MCP Security Hub](https://github.com/FuzzingLabs/mcp-security-hub)** — a collection of 36 production-ready, Dockerized MCP servers covering offensive security:
SecPipe ships with built-in support for the **[MCP Security Hub](https://github.com/FuzzingLabs/mcp-security-hub)** — a collection of 36 production-ready, Dockerized MCP servers covering offensive security:
| Category | Servers | Examples |
|----------|---------|----------|
@@ -205,7 +204,7 @@ uv run fuzzforge mcp install claude-desktop
uv run fuzzforge mcp status
```
**Restart your editor** and your AI agent will have access to FuzzForge tools!
**Restart your editor** and your AI agent will have access to SecPipe tools!
---
@@ -220,7 +219,7 @@ Once installed, just talk to your AI agent:
"Run nuclei against https://example.com"
```
The agent will use FuzzForge to discover the right hub tools, chain them into a pipeline, and return results — all without you touching a terminal.
The agent will use SecPipe to discover the right hub tools, chain them into a pipeline, and return results — all without you touching a terminal.
See the [Usage Guide](USAGE.md) for detailed setup and advanced workflows.
@@ -230,7 +229,7 @@ See the [Usage Guide](USAGE.md) for detailed setup and advanced workflows.
```
fuzzforge_ai/
├── fuzzforge-mcp/ # MCP server — the core of FuzzForge
├── fuzzforge-mcp/ # MCP server — the core of SecPipe
├── fuzzforge-cli/ # Command-line interface & terminal UI
├── fuzzforge-common/ # Shared abstractions (containers, storage)
├── fuzzforge-runner/ # Container execution engine (Docker/Podman)
@@ -263,4 +262,4 @@ BSL 1.1 - See [LICENSE](LICENSE) for details.
<p align="center">
<strong>Maintained by <a href="https://fuzzinglabs.com">FuzzingLabs</a></strong>
<br>
</p>
</p>

View File

@@ -1,6 +1,6 @@
# FuzzForge AI Roadmap
# SecPipe AI Roadmap
This document outlines the planned features and development direction for FuzzForge AI.
This document outlines the planned features and development direction for SecPipe AI.
---
@@ -10,27 +10,27 @@ This document outlines the planned features and development direction for FuzzFo
**Status:** 🔄 Planned
Integrate [mcp-security-hub](https://github.com/FuzzingLabs/mcp-security-hub) tools into FuzzForge, giving AI agents access to 28 MCP servers and 163+ security tools through a unified interface.
Integrate [mcp-security-hub](https://github.com/FuzzingLabs/mcp-security-hub) tools into SecPipe, giving AI agents access to 28 MCP servers and 163+ security tools through a unified interface.
#### How It Works
Unlike native FuzzForge modules (built with the SDK), mcp-security-hub tools are **standalone MCP servers**. The integration will bridge these tools so they can be:
Unlike native SecPipe modules (built with the SDK), mcp-security-hub tools are **standalone MCP servers**. The integration will bridge these tools so they can be:
- Discovered via `list_modules` alongside native modules
- Executed through FuzzForge's orchestration layer
- Executed through SecPipe's orchestration layer
- Chained with native modules in workflows
| Aspect | Native Modules | MCP Hub Tools |
|--------|----------------|---------------|
| **Runtime** | FuzzForge SDK container | Standalone MCP server container |
| **Runtime** | SecPipe SDK container | Standalone MCP server container |
| **Protocol** | Direct execution | MCP-to-MCP bridge |
| **Configuration** | Module config | Tool-specific args |
| **Output** | FuzzForge results format | Tool-native format (normalized) |
| **Output** | SecPipe results format | Tool-native format (normalized) |
#### Goals
- Unified discovery of all available tools (native + hub)
- Orchestrate hub tools through FuzzForge's workflow engine
- Orchestrate hub tools through SecPipe's workflow engine
- Normalize outputs for consistent result handling
- No modification required to mcp-security-hub tools
@@ -65,7 +65,7 @@ AI Agent:
**Status:** 🔄 Planned
A graphical interface to manage FuzzForge without the command line.
A graphical interface to manage SecPipe without the command line.
#### Goals

View File

@@ -1,8 +1,8 @@
# FuzzForge AI Usage Guide
# SecPipe AI Usage Guide
This guide covers everything you need to know to get started with FuzzForge AI — from installation to linking your first MCP hub and running security research workflows with AI.
This guide covers everything you need to know to get started with SecPipe AI — from installation to linking your first MCP hub and running security research workflows with AI.
> **FuzzForge is designed to be used with AI agents** (GitHub Copilot, Claude, etc.) via MCP.
> **SecPipe is designed to be used with AI agents** (GitHub Copilot, Claude, etc.) via MCP.
> A terminal UI (`fuzzforge ui`) is provided for managing agents and hubs.
> The CLI is available for advanced users but the primary experience is through natural language interaction with your AI assistant.
@@ -27,7 +27,7 @@ This guide covers everything you need to know to get started with FuzzForge AI
- [GitHub Copilot](#github-copilot)
- [Claude Code (CLI)](#claude-code-cli)
- [Claude Desktop](#claude-desktop)
- [Using FuzzForge with AI](#using-fuzzforge-with-ai)
- [Using SecPipe with AI](#using-secpipe-with-ai)
- [CLI Reference](#cli-reference)
- [Environment Variables](#environment-variables)
- [Troubleshooting](#troubleshooting)
@@ -76,13 +76,13 @@ git clone git@github.com:FuzzingLabs/mcp-security-hub.git ~/.fuzzforge/hubs/mcp-
# Restart your AI agent — done!
```
> **Note:** FuzzForge uses Docker by default. Podman is also supported via `--engine podman`.
> **Note:** SecPipe uses Docker by default. Podman is also supported via `--engine podman`.
---
## Prerequisites
Before installing FuzzForge AI, ensure you have:
Before installing SecPipe AI, ensure you have:
- **Python 3.12+** — [Download Python](https://www.python.org/downloads/)
- **uv** package manager — [Install uv](https://docs.astral.sh/uv/)
@@ -131,7 +131,7 @@ cd fuzzforge_ai
uv sync
```
This installs all FuzzForge components in a virtual environment.
This installs all SecPipe components in a virtual environment.
### 3. Verify Installation
@@ -143,7 +143,7 @@ uv run fuzzforge --help
## Terminal UI
FuzzForge ships with a terminal user interface (TUI) built on [Textual](https://textual.textualize.io/) for managing AI agents and MCP hub servers from a single dashboard.
SecPipe ships with a terminal user interface (TUI) built on [Textual](https://textual.textualize.io/) for managing AI agents and MCP hub servers from a single dashboard.
### Launching the UI
@@ -173,11 +173,11 @@ The main screen is split into two panels:
Select an agent row in the AI Agents table and press `Enter`:
- **If the agent is not linked** → a setup dialog opens asking for your container engine (Docker or Podman), then installs the FuzzForge MCP configuration
- **If the agent is not linked** → a setup dialog opens asking for your container engine (Docker or Podman), then installs the SecPipe MCP configuration
- **If the agent is already linked** → a confirmation dialog offers to unlink it (removes the `fuzzforge` entry without touching other MCP servers)
The setup auto-detects:
- FuzzForge installation root
- SecPipe installation root
- Docker/Podman socket path
- Hub configuration from `hub-config.json`
@@ -188,7 +188,7 @@ Press `h` to open the hub manager. This is where you manage your MCP hub reposit
| Button | Action |
|--------|--------|
| **FuzzingLabs Hub** | One-click clone of the official [mcp-security-hub](https://github.com/FuzzingLabs/mcp-security-hub) repository — clones to `~/.fuzzforge/hubs/mcp-security-hub`, scans for tools, and registers them in `hub-config.json` |
| **Link Path** | Link any local directory as a hub — enter a name and path, FuzzForge scans it for `category/tool-name/Dockerfile` patterns |
| **Link Path** | Link any local directory as a hub — enter a name and path, SecPipe scans it for `category/tool-name/Dockerfile` patterns |
| **Clone URL** | Clone any git repository and link it as a hub |
| **Remove** | Unlink the selected hub and remove its servers from the configuration |
@@ -219,7 +219,7 @@ my-hub/
└── ...
```
FuzzForge scans for the pattern `category/tool-name/Dockerfile` and auto-generates server configuration entries for each discovered tool.
SecPipe scans for the pattern `category/tool-name/Dockerfile` and auto-generates server configuration entries for each discovered tool.
### FuzzingLabs Security Hub
@@ -290,7 +290,7 @@ uv run fuzzforge mcp install copilot
```
The command auto-detects:
- **FuzzForge root** — Where FuzzForge is installed
- **SecPipe root** — Where SecPipe is installed
- **Docker socket** — Auto-detects `/var/run/docker.sock`
**Optional overrides:**
@@ -298,7 +298,7 @@ The command auto-detects:
uv run fuzzforge mcp install copilot --engine podman
```
**After installation:** Restart VS Code. FuzzForge tools appear in GitHub Copilot Chat.
**After installation:** Restart VS Code. SecPipe tools appear in GitHub Copilot Chat.
### Claude Code (CLI)
@@ -306,7 +306,7 @@ uv run fuzzforge mcp install copilot --engine podman
uv run fuzzforge mcp install claude-code
```
Installs to `~/.claude.json`. FuzzForge tools are available from any directory after restarting Claude.
Installs to `~/.claude.json`. SecPipe tools are available from any directory after restarting Claude.
### Claude Desktop
@@ -332,15 +332,15 @@ uv run fuzzforge mcp uninstall claude-desktop
---
## Using FuzzForge with AI
## Using SecPipe with AI
Once MCP is configured and hub images are built, interact with FuzzForge through natural language with your AI assistant.
Once MCP is configured and hub images are built, interact with SecPipe through natural language with your AI assistant.
### Example Conversations
**Discover available tools:**
```
You: "What security tools are available in FuzzForge?"
You: "What security tools are available in SecPipe?"
AI: Queries hub tools → "I found 15 tools across categories: nmap for
port scanning, binwalk for firmware analysis, semgrep for code
scanning, cargo-fuzzer for Rust fuzzing..."
@@ -402,10 +402,10 @@ uv run fuzzforge project results <id> # Get execution results
## Environment Variables
Configure FuzzForge using environment variables:
Configure SecPipe using environment variables:
```bash
# Override the FuzzForge installation root (auto-detected from cwd by default)
# Override the SecPipe installation root (auto-detected from cwd by default)
export FUZZFORGE_ROOT=/path/to/fuzzforge_ai
# Override the user-global data directory (default: ~/.fuzzforge)
@@ -492,7 +492,7 @@ export FUZZFORGE_ENGINE=podman
### Hub Registry
FuzzForge stores linked hub information in `~/.fuzzforge/hubs.json`. If something goes wrong:
SecPipe stores linked hub information in `~/.fuzzforge/hubs.json`. If something goes wrong:
```bash
# View registry

View File

@@ -1,3 +1,3 @@
# FuzzForge CLI
# SecPipe CLI
...

View File

@@ -1,3 +1,3 @@
# FuzzForge Common
# SecPipe Common
...

View File

@@ -1,10 +1,10 @@
# FuzzForge MCP
# SecPipe MCP
Model Context Protocol (MCP) server that enables AI agents to orchestrate FuzzForge security research modules.
Model Context Protocol (MCP) server that enables AI agents to orchestrate SecPipe security research modules.
## Overview
FuzzForge MCP provides a standardized interface for AI agents (Claude Code, GitHub Copilot, Claude Desktop) to:
SecPipe MCP provides a standardized interface for AI agents (Claude Code, GitHub Copilot, Claude Desktop) to:
- List and discover available security modules
- Execute modules in isolated containers
@@ -17,7 +17,7 @@ The server communicates with AI agents using the [Model Context Protocol](https:
### Automatic Installation (Recommended)
Use the FuzzForge CLI to automatically configure MCP for your AI agent:
Use the SecPipe CLI to automatically configure MCP for your AI agent:
```bash
# For GitHub Copilot
@@ -110,7 +110,7 @@ The MCP server exposes the following tools to AI agents:
### Project Management
- **`init_project`** - Initialize a new FuzzForge project
- **`init_project`** - Initialize a new SecPipe project
- **`set_project_assets`** - Set initial assets (source code, contracts, etc.) for the project
### Module Management
@@ -135,7 +135,7 @@ The server also provides resources for accessing:
### From AI Agent (e.g., Claude Code)
Once configured, AI agents can interact with FuzzForge naturally:
Once configured, AI agents can interact with SecPipe naturally:
```text
User: List the available security modules
@@ -178,14 +178,14 @@ uv run uvicorn fuzzforge_mcp.application:app --reload
│ stdio/JSON-RPC
┌─────────────────────────────────────────┐
FuzzForge MCP Server │
SecPipe MCP Server
│ Tools: init_project, list_modules, │
│ execute_module, execute_workflow│
└─────────────────────────────────────────┘
┌─────────────────────────────────────────┐
FuzzForge Runner │
SecPipe Runner
│ Podman/Docker Orchestration │
└─────────────────────────────────────────┘
@@ -212,6 +212,6 @@ uv run pytest
## See Also
- [FuzzForge Main README](../README.md) - Overall project documentation
- [SecPipe Main README](../README.md) - Overall project documentation
- [Module SDK](../fuzzforge-modules/fuzzforge-modules-sdk/README.md) - Creating custom modules
- [Model Context Protocol](https://modelcontextprotocol.io/) - MCP specification

View File

@@ -10,6 +10,7 @@ dependencies = [
"fuzzforge-common==0.0.1",
"pydantic==2.12.4",
"pydantic-settings==2.12.0",
"pyyaml>=6.0",
"structlog==25.5.0",
]

View File

@@ -53,40 +53,30 @@ Typical workflow:
4. Discover tools from servers with `discover_hub_tools`
5. Execute hub tools with `execute_hub_tool`
Skill packs:
Use `list_skills` to see available analysis pipelines (e.g. firmware-analysis).
Load one with `load_skill("firmware-analysis")` to get domain-specific guidance
and a scoped list of relevant hub servers. Skill packs describe the methodology —
follow the pipeline steps while adapting to what you find at each stage.
Agent context convention:
When you call `discover_hub_tools`, some servers return an `agent_context` field
with usage tips, known issues, rule templates, and workflow guidance. Always read
this context before using the server's tools.
Artifact tracking:
After each `execute_hub_tool` call, new output files are automatically tracked.
Use `list_artifacts` to find files produced by previous tools instead of parsing
paths from tool output text. Filter by source server or file type.
File access in containers:
- Assets set via `set_project_assets` are mounted read-only at `/app/uploads/` and `/app/samples/`
- A writable output directory is mounted at `/app/output/` — use it for extraction results, reports, etc.
- Always use container paths (e.g. `/app/uploads/file`) when passing file arguments to hub tools
Stateful tools:
- Some tools (e.g. radare2-mcp) require multi-step sessions. Use `start_hub_server` to launch
- Some tools require multi-step sessions. Use `start_hub_server` to launch
a persistent container, then `execute_hub_tool` calls reuse that container. Stop with `stop_hub_server`.
Firmware analysis pipeline (when analyzing firmware images):
1. **binwalk-mcp** (`binwalk_scan` + `binwalk_extract`) — identify and extract filesystem from firmware
2. **yara-mcp** (`yara_scan_with_rules`) — scan extracted files with vulnerability rules to prioritize targets
3. **radare2-mcp** (persistent session) — confirm dangerous code paths
4. **searchsploit-mcp** (`search_exploitdb`) — query version strings from radare2 against ExploitDB
The outputs of steps 3 and 4 feed into a final triage summary.
radare2-mcp agent context (upstream tool — no embedded context):
- Start a persistent session with `start_hub_server("radare2-mcp")` before any calls.
- IMPORTANT: the `open_file` tool requires the parameter name `file_path` (with underscore),
not `filepath`. Example: `execute_hub_tool("hub:radare2-mcp:open_file", {"file_path": "/app/output/..."})`
- Workflow: `open_file` → `analyze` → `list_imports` → `xrefs_to` → `run_command` with `pdf @ <addr>`.
- Static binary fallback: firmware binaries are often statically linked. When `list_imports`
returns an empty result, fall back to `list_symbols` and search for dangerous function names
(system, strcpy, gets, popen, sprintf) in the output. Then use `xrefs_to` on their addresses.
- For string extraction, use `run_command` with `iz` (data section strings).
The `list_all_strings` tool may return garbled output for large binaries.
- For decompilation, use `run_command` with `pdc @ <addr>` (pseudo-C) or `pdf @ <addr>`
(annotated disassembly). The `decompile` tool may fail with "not available in current mode".
- Stop the session with `stop_hub_server("radare2-mcp")` when done.
""",
lifespan=lifespan,
)

View File

@@ -3,7 +3,7 @@
from __future__ import annotations
from pathlib import Path
from typing import TYPE_CHECKING, cast
from typing import TYPE_CHECKING, Any, cast
from fastmcp.server.dependencies import get_context
@@ -21,6 +21,9 @@ _current_project_path: Path | None = None
# Singleton storage instance
_storage: LocalStorage | None = None
# Currently loaded skill pack (set by load_skill)
_active_skill: dict[str, Any] | None = None
def set_current_project_path(project_path: Path) -> None:
"""Set the current project path.
@@ -75,3 +78,22 @@ def get_storage() -> LocalStorage:
settings = get_settings()
_storage = LocalStorage(settings.storage.path)
return _storage
def set_active_skill(skill: dict[str, Any] | None) -> None:
"""Set (or clear) the currently loaded skill pack.
:param skill: Parsed skill dict, or None to unload.
"""
global _active_skill
_active_skill = skill
def get_active_skill() -> dict[str, Any] | None:
"""Get the currently loaded skill pack.
:return: Active skill dict, or None if no skill is loaded.
"""
return _active_skill

View File

@@ -0,0 +1,44 @@
name: firmware-analysis
description: |
## Firmware Binary Vulnerability Analysis
Goal: Find exploitable vulnerabilities in firmware images.
### Pipeline
1. **Extract the filesystem** from the firmware image.
Look for SquashFS, JFFS2, CPIO, or other embedded filesystems.
2. **Scan extracted files for vulnerability patterns.**
Use vulnerability-focused rules to identify binaries with dangerous
function calls (system, strcpy, popen, sprintf, gets).
Prioritize targets by match count — the binary with the most hits
is the highest-priority target.
3. **Deep-analyze the highest-priority binary.**
Open a persistent analysis session. Look for:
- Dangerous function calls with unsanitized input
- Hardcoded credentials or backdoor strings
- Network service listeners with weak input validation
Focus on confirming whether flagged patterns are actually reachable.
4. **Search for known CVEs** matching library version strings found
during analysis. Cross-reference with public exploit databases.
5. **Compile findings** with severity ratings:
- CRITICAL: confirmed remote code execution paths
- HIGH: command injection or buffer overflow with reachable input
- MEDIUM: hardcoded credentials, weak crypto, format string issues
- LOW: informational findings (library versions, service fingerprints)
### Key files to prioritize in extracted firmware
- `usr/sbin/httpd`, `usr/bin/httpd` — web servers (high-priority)
- `etc/shadow`, `etc/passwd` — credential files
- `www/cgi-bin/*` — CGI scripts (command injection vectors)
- Custom binaries in `usr/sbin/`, `usr/bin/` — vendor attack surface
servers:
- binwalk-mcp
- yara-mcp
- radare2-mcp
- searchsploit-mcp
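A skill pack like the one above is plain YAML with `name`, `description`, and `servers` keys, and the diff adds `pyyaml` to the server's dependencies. A sketch of loading one (the actual loader lives in the MCP server; the inline document here is an abbreviated stand-in):

```python
# Sketch: parse a skill pack document into its three top-level fields.
import yaml

SKILL_YAML = """\
name: firmware-analysis
description: |
  Firmware binary vulnerability analysis pipeline.
servers:
  - binwalk-mcp
  - yara-mcp
  - radare2-mcp
  - searchsploit-mcp
"""

skill = yaml.safe_load(SKILL_YAML)
scoped_servers = skill["servers"]  # hub servers the skill makes relevant
```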

View File

@@ -0,0 +1,90 @@
name: go-fuzzing
description: |
## Go Fuzzing Vulnerability Discovery
Goal: Find memory safety bugs, panics, and logic errors in a Go project
using native Go fuzzing (go test -fuzz).
### Pipeline
1. **Analyze the Go project** to understand its attack surface.
Use `go_analyze` to scan the codebase and identify:
- Fuzzable entry points: functions accepting `[]byte`, `string`,
`io.Reader`, or other parser-like signatures (`Parse*`, `Decode*`,
`Unmarshal*`, `Read*`, `Open*`)
- Existing `Fuzz*` test functions already in `*_test.go` files
- Unsafe/cgo usage that increases the severity of any bugs found
- Known CVEs via govulncheck (enable with `run_vulncheck: true`)
If there are **no existing Fuzz targets**, stop here and report
that the project needs fuzz harnesses written first, listing the
recommended entry points from the analysis.
2. **Test harness quality** before committing to a long fuzzing campaign.
Use `go_harness_test` to evaluate each Fuzz* function:
- Compilation check — does `go test -c` succeed?
- Seed execution — do the seed corpus entries pass without panics?
- Short fuzzing trial — does the harness sustain fuzzing for 15-30s?
- Quality score (0-100): ≥80 = production-ready, ≥50 = needs work, <50 = broken
**Decision point:**
- If all harnesses are **broken** (score < 50): stop and report issues.
The user needs to fix them before fuzzing is useful.
- If some are **production-ready** or **needs-improvement** (score ≥ 50):
proceed with those targets to step 3.
- Skip broken harnesses — do not waste fuzzing time on them.
3. **Run fuzzing** on the viable targets.
Use `go_fuzz_run` for a bounded campaign:
- Set `duration` based on project size: 60-120s for quick scan,
300-600s for thorough analysis.
- Pass only the targets that scored ≥ 50 in step 2 via the `targets`
parameter — do not fuzz broken harnesses.
- The fuzzer collects crash inputs to `/app/output/crashes/{FuzzName}/`.
**Alternative — continuous mode** for deeper exploration:
- Use `go_fuzz_start` to begin background fuzzing.
- Periodically check `go_fuzz_status` to monitor progress.
- Use `go_fuzz_stop` when satisfied or when crashes are found.
If **no crashes** are found after a reasonable duration, report that
the fuzzing campaign completed cleanly with the execution metrics.
4. **Analyze crashes** found during fuzzing.
Use `go_crash_analyze` to process the crash inputs:
- Reproduction: re-run each crash input to confirm it's real
- Classification: categorize by type (nil-dereference, index-out-of-range,
slice-bounds, divide-by-zero, stack-overflow, data-race, panic, etc.)
- Severity assignment: critical / high / medium / low
- Deduplication: group crashes by signature (target + type + top 3 frames)
Skip this step if no crashes were found in step 3.
5. **Compile the vulnerability report** with findings organized by severity:
- **CRITICAL**: nil-dereference, segfault, data-race, stack-overflow
- **HIGH**: index/slice out of bounds, allocation overflow
- **MEDIUM**: integer overflow, divide by zero, explicit panics
- **LOW**: timeout, unclassified crashes
For each unique crash, include:
- The fuzz target that triggered it
- The crash type and root cause function + file + line
- Whether it was reproducible
- The crash input file path for manual investigation
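The step 2 decision point can be sketched as a simple filter (the result shape used here, one dict per harness with `target` and `score`, is an assumption for illustration):

```python
# Hypothetical harness-quality results from step 2, one entry per Fuzz target.
results = [
    {"target": "FuzzParseConfig", "score": 92},  # production-ready (>= 80)
    {"target": "FuzzDecodeFrame", "score": 61},  # needs work, still viable (>= 50)
    {"target": "FuzzLegacyPath", "score": 17},   # broken (< 50): skip
]

VIABLE_THRESHOLD = 50

# Only harnesses scoring >= 50 go into the `targets` parameter of step 3.
viable = [r["target"] for r in results if r["score"] >= VIABLE_THRESHOLD]

if not viable:
    print("All harnesses are broken; stop and report issues.")
else:
    print(f"Proceed to fuzzing with: {', '.join(viable)}")
```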
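Steps 4 and 5 amount to grouping crashes by signature and bucketing them by severity. A minimal sketch (the crash record fields are assumptions for illustration; the severity table follows the buckets listed above, with unlisted types defaulting to low):

```python
# Severity buckets from the report section above.
SEVERITY = {
    "nil-dereference": "critical", "segfault": "critical",
    "data-race": "critical", "stack-overflow": "critical",
    "index-out-of-range": "high", "slice-bounds": "high",
    "allocation-overflow": "high",
    "integer-overflow": "medium", "divide-by-zero": "medium",
    "panic": "medium",
    "timeout": "low",
}

# Hypothetical reproduced crashes from step 4.
crashes = [
    {"target": "FuzzDecodeFrame", "type": "index-out-of-range",
     "frames": ["decode", "readChunk", "main"], "input": "crashes/FuzzDecodeFrame/a1"},
    {"target": "FuzzDecodeFrame", "type": "index-out-of-range",
     "frames": ["decode", "readChunk", "main"], "input": "crashes/FuzzDecodeFrame/b2"},
    {"target": "FuzzParseConfig", "type": "nil-dereference",
     "frames": ["parse", "lookupSection", "main"], "input": "crashes/FuzzParseConfig/c3"},
]

# Deduplicate by signature: target + crash type + top 3 stack frames.
unique: dict[tuple, dict] = {}
for crash in crashes:
    sig = (crash["target"], crash["type"], tuple(crash["frames"][:3]))
    unique.setdefault(sig, crash)  # keep one representative per signature

# Organize the report by severity; unclassified types fall to "low".
report: dict[str, list[dict]] = {}
for crash in unique.values():
    report.setdefault(SEVERITY.get(crash["type"], "low"), []).append(crash)

print(f"{len(crashes)} crashes -> {len(unique)} unique")
for sev in ("critical", "high", "medium", "low"):
    for crash in report.get(sev, []):
        print(f"[{sev.upper()}] {crash['target']}: {crash['type']} ({crash['input']})")
```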
### What the user's project needs
- A `go.mod` file (any Go module)
- At least one `*_test.go` file with `func FuzzXxx(f *testing.F)` functions
- Seed corpus entries added via `f.Add(...)` in the Fuzz functions
### Interpretation guide
- **govulncheck CVEs** (step 1) are known dependency vulnerabilities — report separately
- **Fuzzer crashes** (steps 3-4) are new bugs found by fuzzing the project's own code
- High execution counts with zero crashes = good sign (code is robust to that input space)
- Low quality scores in step 2 usually mean the harness needs better seed corpus or input handling
servers:
- go-analyzer-mcp
- go-harness-tester-mcp
- go-fuzzer-mcp
- go-crash-analyzer-mcp

@@ -13,12 +13,15 @@ from __future__ import annotations
import json
import logging
import mimetypes
from datetime import UTC, datetime
from pathlib import Path
from tarfile import open as Archive # noqa: N812
from typing import Any
from uuid import uuid4
import yaml
logger = logging.getLogger("fuzzforge-mcp")
#: Name of the FuzzForge storage directory within projects.
@@ -90,6 +93,7 @@ class LocalStorage:
"# FuzzForge storage - ignore large/temporary files\n"
"runs/\n"
"output/\n"
"artifacts.json\n"
"!config.json\n"
)
@@ -273,3 +277,377 @@ class LocalStorage:
except Exception as exc:
msg = f"Failed to extract results: {exc}"
raise StorageError(msg) from exc
# ------------------------------------------------------------------
# Artifact tracking
# ------------------------------------------------------------------
def _artifacts_path(self, project_path: Path) -> Path:
"""Get the path to the artifacts registry file.
:param project_path: Path to the project directory.
:returns: Path to artifacts.json.
"""
return self._get_project_path(project_path) / "artifacts.json"
def _load_artifacts(self, project_path: Path) -> list[dict[str, Any]]:
"""Load the artifact registry from disk.
:param project_path: Path to the project directory.
:returns: List of artifact dicts.
"""
path = self._artifacts_path(project_path)
if path.exists():
try:
return json.loads(path.read_text()) # type: ignore[no-any-return]
except (json.JSONDecodeError, OSError):
return []
return []
def _save_artifacts(self, project_path: Path, artifacts: list[dict[str, Any]]) -> None:
"""Persist the artifact registry to disk.
:param project_path: Path to the project directory.
:param artifacts: Full artifact list to write.
"""
path = self._artifacts_path(project_path)
path.parent.mkdir(parents=True, exist_ok=True)
path.write_text(json.dumps(artifacts, indent=2, default=str))
def _classify_file(self, file_path: Path) -> str:
"""Classify a file into a human-friendly type string.
:param file_path: Path to the file.
:returns: Type string (e.g. "elf-binary", "text", "directory").
"""
mime, _ = mimetypes.guess_type(str(file_path))
suffix = file_path.suffix.lower()
# Try reading ELF magic for binaries with no extension
if mime is None and suffix == "":
try:
header = file_path.read_bytes()[:4]
if header == b"\x7fELF":
return "elf-binary"
except OSError:
pass
if mime:
if "json" in mime:
return "json"
if "text" in mime or "xml" in mime or "yaml" in mime:
return "text"
if "image" in mime:
return "image"
if "octet-stream" in mime:
return "binary"
type_map: dict[str, str] = {
".json": "json",
".sarif": "sarif",
".md": "markdown",
".txt": "text",
".log": "text",
".csv": "csv",
".yaml": "yaml",
".yml": "yaml",
".xml": "xml",
".html": "html",
".elf": "elf-binary",
".so": "elf-binary",
".bin": "binary",
".gz": "archive",
".tar": "archive",
".zip": "archive",
}
return type_map.get(suffix, "binary")
def scan_artifacts(
self,
project_path: Path,
server_name: str,
tool_name: str,
) -> list[dict[str, Any]]:
"""Scan the output directory for new or modified files and register them.
Compares the current state of .fuzzforge/output/ against the existing
artifact registry and registers any new or modified files.
:param project_path: Path to the project directory.
:param server_name: Hub server that produced the artifacts.
:param tool_name: Tool that produced the artifacts.
:returns: List of newly registered artifact dicts.
"""
output_path = self.get_project_output_path(project_path)
if output_path is None or not output_path.exists():
return []
existing = self._load_artifacts(project_path)
known: dict[str, dict[str, Any]] = {a["path"]: a for a in existing}
now = datetime.now(tz=UTC).isoformat()
new_artifacts: list[dict[str, Any]] = []
for file_path in output_path.rglob("*"):
if not file_path.is_file():
continue
# Use the container-style path (/app/output/...) so it's
# directly usable in subsequent tool calls.
relative = file_path.relative_to(output_path)
container_path = f"/app/output/{relative}"
stat = file_path.stat()
size = stat.st_size
mtime = datetime.fromtimestamp(stat.st_mtime, tz=UTC).isoformat()
prev = known.get(container_path)
if prev and prev.get("mtime") == mtime and prev.get("size") == size:
continue # Unchanged — skip
artifact: dict[str, Any] = {
"path": container_path,
"host_path": str(file_path),
"type": self._classify_file(file_path),
"size": size,
"mtime": mtime,
"source_server": server_name,
"source_tool": tool_name,
"registered_at": now,
}
if prev:
# Update existing entry in-place
idx = next(i for i, a in enumerate(existing) if a["path"] == container_path)
existing[idx] = artifact
else:
existing.append(artifact)
new_artifacts.append(artifact)
if new_artifacts:
self._save_artifacts(project_path, existing)
logger.info(
"Registered %d new artifact(s) from %s:%s",
len(new_artifacts),
server_name,
tool_name,
)
return new_artifacts
def list_artifacts(
self,
project_path: Path,
*,
source: str | None = None,
artifact_type: str | None = None,
) -> list[dict[str, Any]]:
"""List registered artifacts, with optional filters.
:param project_path: Path to the project directory.
:param source: Filter by source server name.
:param artifact_type: Filter by artifact type (e.g. "elf-binary", "json").
:returns: List of matching artifact dicts.
"""
artifacts = self._load_artifacts(project_path)
if source:
artifacts = [a for a in artifacts if a.get("source_server") == source]
if artifact_type:
artifacts = [a for a in artifacts if a.get("type") == artifact_type]
return artifacts
def get_artifact(self, project_path: Path, path: str) -> dict[str, Any] | None:
"""Get a single artifact by its container path.
:param project_path: Path to the project directory.
:param path: Container path of the artifact (e.g. /app/output/...).
:returns: Artifact dict, or None if not found.
"""
artifacts = self._load_artifacts(project_path)
for artifact in artifacts:
if artifact["path"] == path:
return artifact
return None
# ------------------------------------------------------------------
# Reports
# ------------------------------------------------------------------
def list_execution_metadata(self, project_path: Path) -> list[dict[str, Any]]:
"""Load full execution metadata for all runs, sorted oldest-first.
:param project_path: Path to the project directory.
:returns: List of full metadata dicts (includes arguments, result).
"""
runs_dir = self._get_project_path(project_path) / "runs"
if not runs_dir.exists():
return []
metadata: list[dict[str, Any]] = []
for run_dir in sorted(runs_dir.iterdir()):
if not run_dir.is_dir():
continue
meta_path = run_dir / "metadata.json"
if meta_path.exists():
try:
metadata.append(json.loads(meta_path.read_text()))
except (json.JSONDecodeError, OSError):
continue
return metadata
def save_report(
self,
project_path: Path,
content: str,
fmt: str = "markdown",
) -> Path:
"""Save a generated report to .fuzzforge/reports/.
:param project_path: Path to the project directory.
:param content: Report content string.
:param fmt: Format name used to choose file extension.
:returns: Path to the saved report file.
"""
reports_dir = self._get_project_path(project_path) / "reports"
reports_dir.mkdir(parents=True, exist_ok=True)
ext_map = {"markdown": "md", "json": "json", "sarif": "sarif"}
ext = ext_map.get(fmt, "md")
filename = f"{datetime.now(tz=UTC).strftime('%Y%m%dT%H%M%SZ')}_report.{ext}"
report_path = reports_dir / filename
report_path.write_text(content)
logger.info("Saved report: %s", report_path)
return report_path
def list_reports(self, project_path: Path) -> list[dict[str, Any]]:
"""List generated reports for a project, newest first.
:param project_path: Path to the project directory.
:returns: List of report dicts with filename, host_path, size, created_at.
"""
reports_dir = self._get_project_path(project_path) / "reports"
if not reports_dir.exists():
return []
reports: list[dict[str, Any]] = []
for report_path in sorted(reports_dir.iterdir(), reverse=True):
if report_path.is_file():
stat = report_path.stat()
reports.append({
"filename": report_path.name,
"host_path": str(report_path),
"size": stat.st_size,
"created_at": datetime.fromtimestamp(stat.st_mtime, tz=UTC).isoformat(),
})
return reports
# ------------------------------------------------------------------
# Skill packs
# ------------------------------------------------------------------
#: Directory containing built-in skill packs shipped with FuzzForge.
_BUILTIN_SKILLS_DIR: Path = Path(__file__).parent / "skills"
def _skill_dirs(self, project_path: Path) -> list[Path]:
"""Return skill directories in priority order (project-local first).
:param project_path: Path to the project directory.
:returns: List of directories that may contain skill YAML files.
"""
dirs: list[Path] = []
project_skills = self._get_project_path(project_path) / "skills"
if project_skills.is_dir():
dirs.append(project_skills)
if self._BUILTIN_SKILLS_DIR.is_dir():
dirs.append(self._BUILTIN_SKILLS_DIR)
return dirs
def list_skills(self, project_path: Path) -> list[dict[str, Any]]:
"""List available skill packs from project and built-in directories.
:param project_path: Path to the project directory.
:returns: List of skill summaries (name, description first line, source).
"""
seen: set[str] = set()
skills: list[dict[str, Any]] = []
for skill_dir in self._skill_dirs(project_path):
for yaml_path in sorted(skill_dir.glob("*.yaml")):
skill = self._parse_skill_file(yaml_path)
if skill is None:
continue
name = skill["name"]
if name in seen:
continue # project-local overrides built-in
seen.add(name)
desc = skill.get("description", "")
first_line = desc.strip().split("\n", 1)[0] if desc else ""
is_project = ".fuzzforge" in str(yaml_path.parent)
source = "project" if is_project else "builtin"
skills.append({
"name": name,
"summary": first_line,
"source": source,
"servers": skill.get("servers", []),
})
return skills
def load_skill(self, project_path: Path, name: str) -> dict[str, Any] | None:
"""Load a skill pack by name.
Searches project-local skills first, then built-in skills.
:param project_path: Path to the project directory.
:param name: Skill name (filename without .yaml extension).
:returns: Parsed skill dict with name, description, servers — or None.
"""
for skill_dir in self._skill_dirs(project_path):
yaml_path = skill_dir / f"{name}.yaml"
if yaml_path.is_file():
return self._parse_skill_file(yaml_path)
return None
@staticmethod
def _parse_skill_file(yaml_path: Path) -> dict[str, Any] | None:
"""Parse and validate a skill YAML file.
:param yaml_path: Path to the YAML file.
:returns: Parsed skill dict, or None if invalid.
"""
try:
data = yaml.safe_load(yaml_path.read_text())
except (yaml.YAMLError, OSError):
logger.warning("Failed to parse skill file: %s", yaml_path)
return None
if not isinstance(data, dict):
return None
name = data.get("name")
if not name or not isinstance(name, str):
logger.warning("Skill file missing 'name': %s", yaml_path)
return None
return {
"name": name,
"description": data.get("description", ""),
"servers": data.get("servers", []),
}
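A minimal skill file accepted by this parser might look like the following (hypothetical content; only `name` is required, while `description` and `servers` default to empty):

```yaml
# .fuzzforge/skills/firmware-analysis.yaml
name: firmware-analysis
description: |
  Extract the firmware image, identify interesting binaries, and fuzz them.
servers:
  - binwalk-mcp
  - go-fuzzer-mcp
```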

@@ -2,12 +2,13 @@
from fastmcp import FastMCP
from fuzzforge_mcp.tools import hub, projects
from fuzzforge_mcp.tools import hub, projects, reports
mcp: FastMCP = FastMCP()
mcp.mount(projects.mcp)
mcp.mount(hub.mcp)
mcp.mount(reports.mcp)
__all__ = [
"mcp",

@@ -8,7 +8,13 @@ from typing import Any
from fastmcp import FastMCP
from fastmcp.exceptions import ToolError
from fuzzforge_mcp.dependencies import get_project_path, get_storage, set_current_project_path
from fuzzforge_mcp.dependencies import (
get_active_skill,
get_project_path,
get_storage,
set_active_skill,
set_current_project_path,
)
mcp: FastMCP = FastMCP()
@@ -146,3 +152,166 @@ async def get_execution_results(execution_id: str, extract_to: str | None = None
except Exception as exception:
message: str = f"Failed to get execution results: {exception}"
raise ToolError(message) from exception
@mcp.tool
async def list_artifacts(
source: str | None = None,
artifact_type: str | None = None,
) -> dict[str, Any]:
"""List all artifacts produced by hub tools in the current project.
Artifacts are files created by tool executions in /app/output/.
They are automatically tracked after each execute_hub_tool call.
:param source: Filter by source server name (e.g. "binwalk-mcp").
:param artifact_type: Filter by type (e.g. "elf-binary", "json", "text", "archive").
:return: List of artifacts with path, type, size, and source info.
"""
storage = get_storage()
project_path: Path = get_project_path()
try:
artifacts = storage.list_artifacts(
project_path,
source=source,
artifact_type=artifact_type,
)
return {
"success": True,
"artifacts": artifacts,
"count": len(artifacts),
}
except Exception as exception:
message: str = f"Failed to list artifacts: {exception}"
raise ToolError(message) from exception
@mcp.tool
async def get_artifact(path: str) -> dict[str, Any]:
"""Get metadata for a specific artifact by its container path.
:param path: Container path of the artifact (e.g. /app/output/extract_abc123/squashfs-root/usr/sbin/httpd).
:return: Artifact metadata including path, type, size, source tool, and timestamps.
"""
storage = get_storage()
project_path: Path = get_project_path()
try:
artifact = storage.get_artifact(project_path, path)
if artifact is None:
return {
"success": False,
"path": path,
"error": "Artifact not found",
}
return {
"success": True,
"artifact": artifact,
}
except Exception as exception:
message: str = f"Failed to get artifact: {exception}"
raise ToolError(message) from exception
@mcp.tool
async def list_skills() -> dict[str, Any]:
"""List available skill packs.
Skill packs provide domain-specific pipeline guidance for AI agents.
They describe analysis methodologies and list the hub servers needed.
Project-local skills (.fuzzforge/skills/) override built-in skills.
:return: List of available skills with name, summary, and server list.
"""
storage = get_storage()
project_path: Path = get_project_path()
try:
skills = storage.list_skills(project_path)
active = get_active_skill()
return {
"success": True,
"skills": skills,
"count": len(skills),
"active_skill": active["name"] if active else None,
}
except Exception as exception:
message: str = f"Failed to list skills: {exception}"
raise ToolError(message) from exception
@mcp.tool
async def load_skill(name: str) -> dict[str, Any]:
"""Load a skill pack to guide the current analysis session.
A skill pack provides a pipeline description and specifies which
hub servers are relevant. Once loaded, the pipeline description
is available as context and only the listed servers need discovery.
:param name: Skill name (e.g. "firmware-analysis").
:return: Loaded skill with full description and server list.
"""
storage = get_storage()
project_path: Path = get_project_path()
try:
skill = storage.load_skill(project_path, name)
if skill is None:
return {
"success": False,
"name": name,
"error": f"Skill '{name}' not found. Use list_skills to see available skills.",
}
set_active_skill(skill)
return {
"success": True,
"name": skill["name"],
"description": skill["description"],
"servers": skill["servers"],
"message": f"Skill '{name}' loaded. Follow the pipeline description above. "
f"Discover tools from: {', '.join(skill['servers'])}" if skill["servers"] else f"Skill '{name}' loaded.",
}
except Exception as exception:
message: str = f"Failed to load skill: {exception}"
raise ToolError(message) from exception
@mcp.tool
async def unload_skill() -> dict[str, Any]:
"""Unload the currently active skill pack.
Clears the active pipeline guidance. The agent returns to generic mode.
:return: Confirmation of unload.
"""
active = get_active_skill()
if active is None:
return {
"success": True,
"message": "No skill was loaded.",
}
name = active["name"]
set_active_skill(None)
return {
"success": True,
"message": f"Skill '{name}' unloaded.",
}

@@ -0,0 +1,346 @@
"""Report generation tools for FuzzForge MCP."""
from __future__ import annotations
import json
from datetime import UTC, datetime
from pathlib import Path
from typing import Any
from fastmcp import FastMCP
from fastmcp.exceptions import ToolError
from fuzzforge_mcp.dependencies import get_project_path, get_storage
mcp: FastMCP = FastMCP()
# Maximum characters of tool output to embed per execution in markdown reports.
_OUTPUT_TRUNCATE_CHARS: int = 2000
# ------------------------------------------------------------------
# Formatting helpers
# ------------------------------------------------------------------
def _format_size(size: int) -> str:
    """Format a byte count as a human-friendly string."""
    value = float(size)
    for unit in ("B", "KB", "MB", "GB"):
        if value < 1024:  # noqa: PLR2004
            return f"{int(value)} {unit}" if unit == "B" else f"{value:.1f} {unit}"
        # True division keeps the fraction (1536 B -> "1.5 KB", not "1.0 KB").
        value /= 1024
    return f"{value:.1f} TB"
def _truncate(text: str, max_chars: int = _OUTPUT_TRUNCATE_CHARS) -> str:
"""Truncate text and append an indicator when truncated."""
if len(text) <= max_chars:
return text
omitted = len(text) - max_chars
return text[:max_chars] + f"\n... [{omitted} chars omitted]"
def _extract_output_text(result: dict[str, Any]) -> str:
"""Extract a human-readable output string from an execution result dict.
Handles both flat dicts (``{"output": "..."}`` or ``{"content": [...]}``),
and the nested format stored by ``record_execution`` where the MCP tool
response is stored one level deeper under the ``"result"`` key.
"""
# Flat output field (most hub tools set this)
output = result.get("output", "")
if output and isinstance(output, str):
return output
# MCP content list format — check both at this level and one level down
for candidate in (result, result.get("result") or {}):
content = candidate.get("content", [])
if isinstance(content, list):
texts = [item.get("text", "") for item in content if isinstance(item, dict)]
combined = "\n".join(t for t in texts if t)
if combined:
return combined
parts: list[str] = []
if result.get("stdout"):
parts.append(f"stdout:\n{result['stdout']}")
if result.get("stderr"):
parts.append(f"stderr:\n{result['stderr']}")
return "\n".join(parts)
# ------------------------------------------------------------------
# Report builders
# ------------------------------------------------------------------
def _report_header(
title: str,
project_path: Path,
assets_path: Path | None,
now: str,
) -> list[str]:
"""Build the header block of the Markdown report."""
lines = [
f"# {title}",
"",
f"**Generated:** {now} ",
f"**Project:** `{project_path}` ",
]
if assets_path:
lines.append(f"**Assets:** `{assets_path}` ")
lines += ["", "---", ""]
return lines
def _report_summary(
executions: list[dict[str, Any]],
artifacts: list[dict[str, Any]],
) -> list[str]:
"""Build the summary table block of the Markdown report."""
success_count = sum(1 for e in executions if e.get("success"))
fail_count = len(executions) - success_count
tool_ids = list(dict.fromkeys(
f"{e.get('server', '?')}:{e.get('tool', '?')}" for e in executions
))
timestamps = [e["timestamp"] for e in executions if e.get("timestamp")]
lines = [
"## Summary",
"",
"| Metric | Value |",
"|--------|-------|",
f"| Total executions | {len(executions)} |",
f"| Successful | {success_count} |",
f"| Failed | {fail_count} |",
f"| Artifacts produced | {len(artifacts)} |",
f"| Unique tools | {len(set(tool_ids))} |",
]
if len(timestamps) >= 2: # noqa: PLR2004
lines.append(f"| Time range | {timestamps[0]}{timestamps[-1]} |")
elif timestamps:
lines.append(f"| Time | {timestamps[0]} |")
lines.append("")
    if tool_ids:
        tools = ", ".join(f"`{t}`" for t in tool_ids)
        lines += [f"**Tools used:** {tools}", ""]
lines += ["---", ""]
return lines
def _report_timeline(
executions: list[dict[str, Any]],
artifacts: list[dict[str, Any]],
) -> list[str]:
"""Build the execution timeline block of the Markdown report."""
if not executions:
return []
lines: list[str] = ["## Execution Timeline", ""]
for idx, meta in enumerate(executions, 1):
server = meta.get("server", "unknown")
tool = meta.get("tool", "unknown")
ts = meta.get("timestamp", "")
status = "✓ Success" if meta.get("success") else "✗ Failed"
lines.append(f"### [{idx}] {server} :: {tool}{ts}")
lines += ["", f"- **Status:** {status}"]
arguments = meta.get("arguments") or {}
if arguments:
lines.append("- **Arguments:**")
for k, v in arguments.items():
lines.append(f" - `{k}`: `{v}`")
result = meta.get("result") or {}
output_text = _extract_output_text(result).strip()
if output_text:
truncated = _truncate(output_text)
lines += ["- **Output:**", " ```"]
lines.extend(f" {line}" for line in truncated.splitlines())
lines.append(" ```")
exec_artifacts = [
a for a in artifacts
if a.get("source_server") == server and a.get("source_tool") == tool
]
if exec_artifacts:
lines.append(f"- **Artifacts produced:** {len(exec_artifacts)} file(s)")
lines.append("")
return lines
def _report_artifacts(artifacts: list[dict[str, Any]]) -> list[str]:
"""Build the artifacts section of the Markdown report."""
if not artifacts:
return []
lines: list[str] = ["---", "", "## Artifacts", "", f"**{len(artifacts)} file(s) total**", ""]
by_type: dict[str, list[dict[str, Any]]] = {}
for a in artifacts:
by_type.setdefault(a.get("type", "unknown"), []).append(a)
for art_type, arts in sorted(by_type.items()):
lines += [
f"### {art_type} ({len(arts)})",
"",
"| Path | Size | Source |",
"|------|------|--------|",
]
for a in arts:
path = a.get("path", "")
size = _format_size(a.get("size", 0))
source = f"`{a.get('source_server', '?')}:{a.get('source_tool', '?')}`"
lines.append(f"| `{path}` | {size} | {source} |")
lines.append("")
return lines
def _build_markdown_report(
title: str,
project_path: Path,
assets_path: Path | None,
executions: list[dict[str, Any]],
artifacts: list[dict[str, Any]],
) -> str:
"""Build a Markdown-formatted analysis report."""
now = datetime.now(tz=UTC).strftime("%Y-%m-%d %H:%M:%S UTC")
lines: list[str] = (
_report_header(title, project_path, assets_path, now)
+ _report_summary(executions, artifacts)
+ _report_timeline(executions, artifacts)
+ _report_artifacts(artifacts)
+ ["---", "", "*Generated by FuzzForge*", ""]
)
return "\n".join(lines)
def _build_json_report(
title: str,
project_path: Path,
assets_path: Path | None,
executions: list[dict[str, Any]],
artifacts: list[dict[str, Any]],
) -> str:
"""Build a JSON-formatted analysis report."""
success_count = sum(1 for e in executions if e.get("success"))
report = {
"title": title,
"generated_at": datetime.now(tz=UTC).isoformat(),
"project_path": str(project_path),
"assets_path": str(assets_path) if assets_path else None,
"summary": {
"total_executions": len(executions),
"successful": success_count,
"failed": len(executions) - success_count,
"artifact_count": len(artifacts),
},
"executions": executions,
"artifacts": artifacts,
}
return json.dumps(report, indent=2, default=str)
def _write_to_path(content: str, path: Path) -> None:
"""Write report content to an explicit output path (sync helper)."""
path.parent.mkdir(parents=True, exist_ok=True)
path.write_text(content)
# ------------------------------------------------------------------
# MCP tools
# ------------------------------------------------------------------
@mcp.tool
async def generate_report(
title: str | None = None,
report_format: str = "markdown",
output_path: str | None = None,
) -> dict[str, Any]:
"""Generate a comprehensive analysis report for the current project.
Aggregates all execution history, tool outputs, and tracked artifacts
into a structured report. The report is saved to `.fuzzforge/reports/`
and its content is returned so the agent can read it immediately.
:param title: Optional report title. Defaults to the project folder name.
:param report_format: Output format — ``"markdown"`` (default) or ``"json"``.
:param output_path: Optional absolute path to save the report. When omitted,
the report is saved automatically to `.fuzzforge/reports/`.
:return: Report content, save path, and counts of included items.
"""
storage = get_storage()
project_path = get_project_path()
try:
fmt = report_format.lower().strip()
if fmt not in ("markdown", "json"):
return {
"success": False,
"error": f"Unsupported format '{fmt}'. Use 'markdown' or 'json'.",
}
executions = storage.list_execution_metadata(project_path)
artifacts = storage.list_artifacts(project_path)
assets_path = storage.get_project_assets_path(project_path)
resolved_title = title or f"FuzzForge Analysis Report — {project_path.name}"
if fmt == "json":
content = _build_json_report(
resolved_title, project_path, assets_path, executions, artifacts
)
else:
content = _build_markdown_report(
resolved_title, project_path, assets_path, executions, artifacts
)
if output_path:
save_path = Path(output_path)
_write_to_path(content, save_path)
else:
save_path = storage.save_report(project_path, content, fmt)
return {
"success": True,
"report_path": str(save_path),
"format": fmt,
"executions_included": len(executions),
"artifacts_included": len(artifacts),
"content": content,
}
except Exception as exception:
message: str = f"Failed to generate report: {exception}"
raise ToolError(message) from exception
@mcp.tool
async def list_reports() -> dict[str, Any]:
"""List all generated reports for the current project.
Reports are stored in `.fuzzforge/reports/` and are ordered newest-first.
:return: List of report files with filename, path, size, and creation time.
"""
storage = get_storage()
project_path = get_project_path()
try:
reports = storage.list_reports(project_path)
return {
"success": True,
"reports": reports,
"count": len(reports),
}
except Exception as exception:
message: str = f"Failed to list reports: {exception}"
raise ToolError(message) from exception

@@ -1,3 +1,3 @@
# fuzzforge-tests
# SecPipe Tests
Common test utilities and fixtures for FuzzForge packages.
Common test utilities and fixtures for SecPipe packages.

@@ -1,6 +1,6 @@
[project]
name = "fuzzforge-oss"
version = "1.0.0"
version = "0.8.0"
description = "FuzzForge AI - AI-driven security research platform for local execution"
readme = "README.md"
requires-python = ">=3.14"

uv.lock generated
@@ -432,6 +432,7 @@ dependencies = [
{ name = "fuzzforge-common" },
{ name = "pydantic" },
{ name = "pydantic-settings" },
{ name = "pyyaml" },
{ name = "structlog" },
]
@@ -460,6 +461,7 @@ requires-dist = [
{ name = "pytest", marker = "extra == 'tests'", specifier = "==9.0.2" },
{ name = "pytest-asyncio", marker = "extra == 'tests'", specifier = "==1.3.0" },
{ name = "pytest-httpx", marker = "extra == 'tests'", specifier = "==0.36.0" },
{ name = "pyyaml", specifier = ">=6.0" },
{ name = "ruff", marker = "extra == 'lints'", specifier = "==0.14.4" },
{ name = "structlog", specifier = "==25.5.0" },
]
@@ -467,7 +469,7 @@ provides-extras = ["lints", "tests"]
[[package]]
name = "fuzzforge-oss"
version = "1.0.0"
version = "0.8.0"
source = { virtual = "." }
dependencies = [
{ name = "fuzzforge-cli" },