Compare commits

...

4 Commits

Author SHA1 Message Date
AFredefon
0d410bd5b4 feat: implement report generation 2026-04-07 16:25:36 +02:00
AFredefon
d3a20b3846 feat: implement skill packs system 2026-04-07 16:12:14 +02:00
AFredefon
664278da3f feat: implement artifact management tools 2026-04-07 16:06:47 +02:00
AFredefon
9374fd3aee feat: implement workflow suggestions pipeline 2026-04-07 01:50:21 +02:00
15 changed files with 1708 additions and 29 deletions

View File

@@ -294,3 +294,17 @@ class HubConfig(BaseModel):
default=True,
description="Cache discovered tools",
)
#: Workflow hints indexed by "after:<tool_name>" keys.
#: Loaded inline or merged from workflow_hints_file.
workflow_hints: dict[str, Any] = Field(
default_factory=dict,
description="Workflow hints indexed by 'after:<tool_name>'",
)
#: Optional path to an external workflow-hints.json file.
#: Relative paths are resolved relative to the hub-config.json location.
workflow_hints_file: str | None = Field(
default=None,
description="Path to an external workflow-hints.json to load and merge",
)

View File

@@ -87,6 +87,28 @@ class HubRegistry:
config=server_config,
)
# Load and merge external workflow hints file if specified.
if self._config.workflow_hints_file:
hints_path = Path(self._config.workflow_hints_file)
if not hints_path.is_absolute():
hints_path = config_path.parent / hints_path
if hints_path.exists():
try:
with hints_path.open() as hf:
hints_data = json.load(hf)
self._config.workflow_hints.update(hints_data.get("hints", {}))
logger.info(
"Loaded workflow hints",
path=str(hints_path),
hints=len(self._config.workflow_hints),
)
except Exception as hints_err:
logger.warning(
"Failed to load workflow hints file",
path=str(hints_path),
error=str(hints_err),
)
logger.info(
"Loaded hub configuration",
path=str(config_path),
@@ -218,6 +240,15 @@ class HubRegistry:
server.discovery_error = None
server.tools = tools
def get_workflow_hint(self, tool_name: str) -> dict | None:
"""Get the workflow hint for a tool by name.
:param tool_name: Tool name (e.g. ``binwalk_extract``).
:returns: Hint dict for the ``after:<tool_name>`` key, or None.
"""
return self._config.workflow_hints.get(f"after:{tool_name}") or None
def get_all_tools(self) -> list:
"""Get all discovered tools from all servers.

View File

@@ -10,6 +10,7 @@ dependencies = [
"fuzzforge-common==0.0.1",
"pydantic==2.12.4",
"pydantic-settings==2.12.0",
"pyyaml>=6.0",
"structlog==25.5.0",
]

View File

@@ -53,40 +53,30 @@ Typical workflow:
4. Discover tools from servers with `discover_hub_tools`
5. Execute hub tools with `execute_hub_tool`
Skill packs:
Use `list_skills` to see available analysis pipelines (e.g. firmware-analysis).
Load one with `load_skill("firmware-analysis")` to get domain-specific guidance
and a scoped list of relevant hub servers. Skill packs describe the methodology —
follow the pipeline steps while adapting to what you find at each stage.
Agent context convention:
When you call `discover_hub_tools`, some servers return an `agent_context` field
with usage tips, known issues, rule templates, and workflow guidance. Always read
this context before using the server's tools.
Artifact tracking:
After each `execute_hub_tool` call, new output files are automatically tracked.
Use `list_artifacts` to find files produced by previous tools instead of parsing
paths from tool output text. Filter by source server or file type.
File access in containers:
- Assets set via `set_project_assets` are mounted read-only at `/app/uploads/` and `/app/samples/`
- A writable output directory is mounted at `/app/output/` — use it for extraction results, reports, etc.
- Always use container paths (e.g. `/app/uploads/file`) when passing file arguments to hub tools
Stateful tools:
- Some tools (e.g. radare2-mcp) require multi-step sessions. Use `start_hub_server` to launch
- Some tools require multi-step sessions. Use `start_hub_server` to launch
a persistent container, then `execute_hub_tool` calls reuse that container. Stop with `stop_hub_server`.
Firmware analysis pipeline (when analyzing firmware images):
1. **binwalk-mcp** (`binwalk_scan` + `binwalk_extract`) — identify and extract filesystem from firmware
2. **yara-mcp** (`yara_scan_with_rules`) — scan extracted files with vulnerability rules to prioritize targets
3. **radare2-mcp** (persistent session) — confirm dangerous code paths
4. **searchsploit-mcp** (`search_exploitdb`) — query version strings from radare2 against ExploitDB
The outputs of steps 3 and 4 feed into a final triage summary.
radare2-mcp agent context (upstream tool — no embedded context):
- Start a persistent session with `start_hub_server("radare2-mcp")` before any calls.
- IMPORTANT: the `open_file` tool requires the parameter name `file_path` (with underscore),
not `filepath`. Example: `execute_hub_tool("hub:radare2-mcp:open_file", {"file_path": "/app/output/..."})`
- Workflow: `open_file` → `analyze` → `list_imports` → `xrefs_to` → `run_command` with `pdf @ <addr>`.
- Static binary fallback: firmware binaries are often statically linked. When `list_imports`
returns an empty result, fall back to `list_symbols` and search for dangerous function names
(system, strcpy, gets, popen, sprintf) in the output. Then use `xrefs_to` on their addresses.
- For string extraction, use `run_command` with `iz` (data section strings).
The `list_all_strings` tool may return garbled output for large binaries.
- For decompilation, use `run_command` with `pdc @ <addr>` (pseudo-C) or `pdf @ <addr>`
(annotated disassembly). The `decompile` tool may fail with "not available in current mode".
- Stop the session with `stop_hub_server("radare2-mcp")` when done.
""",
lifespan=lifespan,
)

View File

@@ -3,7 +3,7 @@
from __future__ import annotations
from pathlib import Path
from typing import TYPE_CHECKING, cast
from typing import TYPE_CHECKING, Any, cast
from fastmcp.server.dependencies import get_context
@@ -21,6 +21,9 @@ _current_project_path: Path | None = None
# Singleton storage instance
_storage: LocalStorage | None = None
# Currently loaded skill pack (set by load_skill)
_active_skill: dict[str, Any] | None = None
def set_current_project_path(project_path: Path) -> None:
"""Set the current project path.
@@ -75,3 +78,22 @@ def get_storage() -> LocalStorage:
settings = get_settings()
_storage = LocalStorage(settings.storage.path)
return _storage
def set_active_skill(skill: dict[str, Any] | None) -> None:
    """Set (or clear) the currently loaded skill pack.

    Stores the skill in the module-level ``_active_skill`` singleton, so the
    loaded skill is shared by all tools in this process.

    :param skill: Parsed skill dict, or None to unload.
    """
    global _active_skill
    _active_skill = skill


def get_active_skill() -> dict[str, Any] | None:
    """Get the currently loaded skill pack.

    :return: Active skill dict, or None if no skill is loaded.
    """
    return _active_skill

View File

@@ -0,0 +1,44 @@
name: firmware-analysis
description: |
## Firmware Binary Vulnerability Analysis
Goal: Find exploitable vulnerabilities in firmware images.
### Pipeline
1. **Extract the filesystem** from the firmware image.
Look for SquashFS, JFFS2, CPIO, or other embedded filesystems.
2. **Scan extracted files for vulnerability patterns.**
Use vulnerability-focused rules to identify binaries with dangerous
function calls (system, strcpy, popen, sprintf, gets).
Prioritize targets by match count — the binary with the most hits
is the highest-priority target.
3. **Deep-analyze the highest-priority binary.**
Open a persistent analysis session. Look for:
- Dangerous function calls with unsanitized input
- Hardcoded credentials or backdoor strings
- Network service listeners with weak input validation
Focus on confirming whether flagged patterns are actually reachable.
4. **Search for known CVEs** matching library version strings found
during analysis. Cross-reference with public exploit databases.
5. **Compile findings** with severity ratings:
- CRITICAL: confirmed remote code execution paths
- HIGH: command injection or buffer overflow with reachable input
- MEDIUM: hardcoded credentials, weak crypto, format string issues
- LOW: informational findings (library versions, service fingerprints)
### Key files to prioritize in extracted firmware
- `usr/sbin/httpd`, `usr/bin/httpd` — web servers (high-priority)
- `etc/shadow`, `etc/passwd` — credential files
- `www/cgi-bin/*` — CGI scripts (command injection vectors)
- Custom binaries in `usr/sbin/`, `usr/bin/` — vendor attack surface
servers:
- binwalk-mcp
- yara-mcp
- radare2-mcp
- searchsploit-mcp

View File

@@ -0,0 +1,90 @@
name: go-fuzzing
description: |
## Go Fuzzing Vulnerability Discovery
Goal: Find memory safety bugs, panics, and logic errors in a Go project
using native Go fuzzing (go test -fuzz).
### Pipeline
1. **Analyze the Go project** to understand its attack surface.
Use `go_analyze` to scan the codebase and identify:
- Fuzzable entry points: functions accepting `[]byte`, `string`,
`io.Reader`, or other parser-like signatures (`Parse*`, `Decode*`,
`Unmarshal*`, `Read*`, `Open*`)
- Existing `Fuzz*` test functions already in `*_test.go` files
- Unsafe/cgo usage that increases the severity of any bugs found
- Known CVEs via govulncheck (enable with `run_vulncheck: true`)
If there are **no existing Fuzz targets**, stop here and report
that the project needs fuzz harnesses written first, listing the
recommended entry points from the analysis.
2. **Test harness quality** before committing to a long fuzzing campaign.
Use `go_harness_test` to evaluate each Fuzz* function:
- Compilation check — does `go test -c` succeed?
- Seed execution — do the seed corpus entries pass without panics?
- Short fuzzing trial — does the harness sustain fuzzing for 15-30s?
- Quality score (0-100): ≥80 = production-ready, ≥50 = needs work, <50 = broken
**Decision point:**
- If all harnesses are **broken** (score < 50): stop and report issues.
The user needs to fix them before fuzzing is useful.
- If some are **production-ready** or **needs-improvement** (score ≥ 50):
proceed with those targets to step 3.
- Skip broken harnesses — do not waste fuzzing time on them.
3. **Run fuzzing** on the viable targets.
Use `go_fuzz_run` for a bounded campaign:
- Set `duration` based on project size: 60-120s for quick scan,
300-600s for thorough analysis.
- Pass only the targets that scored ≥ 50 in step 2 via the `targets`
parameter — do not fuzz broken harnesses.
- The fuzzer collects crash inputs to `/app/output/crashes/{FuzzName}/`.
**Alternative — continuous mode** for deeper exploration:
- Use `go_fuzz_start` to begin background fuzzing.
- Periodically check `go_fuzz_status` to monitor progress.
- Use `go_fuzz_stop` when satisfied or when crashes are found.
If **no crashes** are found after a reasonable duration, report that
the fuzzing campaign completed cleanly with the execution metrics.
4. **Analyze crashes** found during fuzzing.
Use `go_crash_analyze` to process the crash inputs:
- Reproduction: re-run each crash input to confirm it's real
- Classification: categorize by type (nil-dereference, index-out-of-range,
slice-bounds, divide-by-zero, stack-overflow, data-race, panic, etc.)
- Severity assignment: critical / high / medium / low
- Deduplication: group crashes by signature (target + type + top 3 frames)
Skip this step if no crashes were found in step 3.
5. **Compile the vulnerability report** with findings organized by severity:
- **CRITICAL**: nil-dereference, segfault, data-race, stack-overflow
- **HIGH**: index/slice out of bounds, allocation overflow
- **MEDIUM**: integer overflow, divide by zero, explicit panics
- **LOW**: timeout, unclassified crashes
For each unique crash, include:
- The fuzz target that triggered it
- The crash type and root cause function + file + line
- Whether it was reproducible
- The crash input file path for manual investigation
### What the user's project needs
- A `go.mod` file (any Go module)
- At least one `*_test.go` file with `func FuzzXxx(f *testing.F)` functions
- Seed corpus entries added via `f.Add(...)` in the Fuzz functions
### Interpretation guide
- **govulncheck CVEs** (step 1) are known dependency vulnerabilities — report separately
- **Fuzzer crashes** (steps 3-4) are new bugs found by fuzzing the project's own code
- High execution counts with zero crashes = good sign (code is robust to that input space)
- Low quality scores in step 2 usually mean the harness needs better seed corpus or input handling
servers:
- go-analyzer-mcp
- go-harness-tester-mcp
- go-fuzzer-mcp
- go-crash-analyzer-mcp

View File

@@ -13,12 +13,15 @@ from __future__ import annotations
import json
import logging
import mimetypes
from datetime import UTC, datetime
from pathlib import Path
from tarfile import open as Archive # noqa: N812
from typing import Any
from uuid import uuid4
import yaml
logger = logging.getLogger("fuzzforge-mcp")
#: Name of the FuzzForge storage directory within projects.
@@ -90,6 +93,7 @@ class LocalStorage:
"# FuzzForge storage - ignore large/temporary files\n"
"runs/\n"
"output/\n"
"artifacts.json\n"
"!config.json\n"
)
@@ -273,3 +277,377 @@ class LocalStorage:
except Exception as exc:
msg = f"Failed to extract results: {exc}"
raise StorageError(msg) from exc
# ------------------------------------------------------------------
# Artifact tracking
# ------------------------------------------------------------------
def _artifacts_path(self, project_path: Path) -> Path:
"""Get the path to the artifacts registry file.
:param project_path: Path to the project directory.
:returns: Path to artifacts.json.
"""
return self._get_project_path(project_path) / "artifacts.json"
def _load_artifacts(self, project_path: Path) -> list[dict[str, Any]]:
"""Load the artifact registry from disk.
:param project_path: Path to the project directory.
:returns: List of artifact dicts.
"""
path = self._artifacts_path(project_path)
if path.exists():
try:
return json.loads(path.read_text()) # type: ignore[no-any-return]
except (json.JSONDecodeError, OSError):
return []
return []
def _save_artifacts(self, project_path: Path, artifacts: list[dict[str, Any]]) -> None:
"""Persist the artifact registry to disk.
:param project_path: Path to the project directory.
:param artifacts: Full artifact list to write.
"""
path = self._artifacts_path(project_path)
path.parent.mkdir(parents=True, exist_ok=True)
path.write_text(json.dumps(artifacts, indent=2, default=str))
def _classify_file(self, file_path: Path) -> str:
"""Classify a file into a human-friendly type string.
:param file_path: Path to the file.
:returns: Type string (e.g. "elf-binary", "text", "directory").
"""
mime, _ = mimetypes.guess_type(str(file_path))
suffix = file_path.suffix.lower()
# Try reading ELF magic for binaries with no extension
if mime is None and suffix == "":
try:
header = file_path.read_bytes()[:4]
if header == b"\x7fELF":
return "elf-binary"
except OSError:
pass
if mime:
if "json" in mime:
return "json"
if "text" in mime or "xml" in mime or "yaml" in mime:
return "text"
if "image" in mime:
return "image"
if "octet-stream" in mime:
return "binary"
type_map: dict[str, str] = {
".json": "json",
".sarif": "sarif",
".md": "markdown",
".txt": "text",
".log": "text",
".csv": "csv",
".yaml": "yaml",
".yml": "yaml",
".xml": "xml",
".html": "html",
".elf": "elf-binary",
".so": "elf-binary",
".bin": "binary",
".gz": "archive",
".tar": "archive",
".zip": "archive",
}
return type_map.get(suffix, "binary")
def scan_artifacts(
    self,
    project_path: Path,
    server_name: str,
    tool_name: str,
) -> list[dict[str, Any]]:
    """Scan the output directory for new or modified files and register them.

    Compares the current state of .fuzzforge/output/ against the existing
    artifact registry and registers any new or modified files. A file is
    considered unchanged (and skipped) when both its mtime and size match
    the registry entry; a modified file replaces its registry entry in
    place and is also reported in the returned list.

    :param project_path: Path to the project directory.
    :param server_name: Hub server that produced the artifacts.
    :param tool_name: Tool that produced the artifacts.
    :returns: List of newly registered artifact dicts.
    """
    output_path = self.get_project_output_path(project_path)
    if output_path is None or not output_path.exists():
        return []
    existing = self._load_artifacts(project_path)
    # Index the registry by container path for O(1) change detection.
    known: dict[str, dict[str, Any]] = {a["path"]: a for a in existing}
    now = datetime.now(tz=UTC).isoformat()
    new_artifacts: list[dict[str, Any]] = []
    for file_path in output_path.rglob("*"):
        if not file_path.is_file():
            continue
        # Use the container-style path (/app/output/...) so it's
        # directly usable in subsequent tool calls.
        relative = file_path.relative_to(output_path)
        container_path = f"/app/output/{relative}"
        stat = file_path.stat()
        size = stat.st_size
        mtime = datetime.fromtimestamp(stat.st_mtime, tz=UTC).isoformat()
        prev = known.get(container_path)
        if prev and prev.get("mtime") == mtime and prev.get("size") == size:
            continue  # Unchanged — skip
        artifact: dict[str, Any] = {
            "path": container_path,
            "host_path": str(file_path),
            "type": self._classify_file(file_path),
            "size": size,
            "mtime": mtime,
            "source_server": server_name,
            "source_tool": tool_name,
            "registered_at": now,
        }
        if prev:
            # Update existing entry in-place
            idx = next(i for i, a in enumerate(existing) if a["path"] == container_path)
            existing[idx] = artifact
        else:
            existing.append(artifact)
        new_artifacts.append(artifact)
    if new_artifacts:
        # Persist only when something changed, to avoid churning the file.
        self._save_artifacts(project_path, existing)
        logger.info(
            "Registered %d new artifact(s) from %s:%s",
            len(new_artifacts),
            server_name,
            tool_name,
        )
    return new_artifacts
def list_artifacts(
self,
project_path: Path,
*,
source: str | None = None,
artifact_type: str | None = None,
) -> list[dict[str, Any]]:
"""List registered artifacts, with optional filters.
:param project_path: Path to the project directory.
:param source: Filter by source server name.
:param artifact_type: Filter by artifact type (e.g. "elf-binary", "json").
:returns: List of matching artifact dicts.
"""
artifacts = self._load_artifacts(project_path)
if source:
artifacts = [a for a in artifacts if a.get("source_server") == source]
if artifact_type:
artifacts = [a for a in artifacts if a.get("type") == artifact_type]
return artifacts
def get_artifact(self, project_path: Path, path: str) -> dict[str, Any] | None:
"""Get a single artifact by its container path.
:param project_path: Path to the project directory.
:param path: Container path of the artifact (e.g. /app/output/...).
:returns: Artifact dict, or None if not found.
"""
artifacts = self._load_artifacts(project_path)
for artifact in artifacts:
if artifact["path"] == path:
return artifact
return None
# ------------------------------------------------------------------
# Reports
# ------------------------------------------------------------------
def list_execution_metadata(self, project_path: Path) -> list[dict[str, Any]]:
    """Load full execution metadata for all runs, sorted oldest-first.

    :param project_path: Path to the project directory.
    :returns: List of full metadata dicts (includes arguments, result).
    """
    runs_dir = self._get_project_path(project_path) / "runs"
    if not runs_dir.exists():
        return []
    collected: list[dict[str, Any]] = []
    # Sorted directory order gives oldest-first for timestamped run names.
    for run_dir in sorted(runs_dir.iterdir()):
        meta_path = run_dir / "metadata.json"
        if not run_dir.is_dir() or not meta_path.exists():
            continue
        try:
            collected.append(json.loads(meta_path.read_text()))
        except (json.JSONDecodeError, OSError):
            # Skip unreadable/corrupt run metadata rather than failing.
            continue
    return collected
def save_report(
    self,
    project_path: Path,
    content: str,
    fmt: str = "markdown",
) -> Path:
    """Save a generated report to .fuzzforge/reports/.

    :param project_path: Path to the project directory.
    :param content: Report content string.
    :param fmt: Format name used to choose file extension.
    :returns: Path to the saved report file.
    """
    reports_dir = self._get_project_path(project_path) / "reports"
    reports_dir.mkdir(parents=True, exist_ok=True)
    # Unknown formats fall back to the markdown extension.
    extension = {"markdown": "md", "json": "json", "sarif": "sarif"}.get(fmt, "md")
    stamp = datetime.now(tz=UTC).strftime("%Y%m%dT%H%M%SZ")
    report_path = reports_dir / f"{stamp}_report.{extension}"
    report_path.write_text(content)
    logger.info("Saved report: %s", report_path)
    return report_path
def list_reports(self, project_path: Path) -> list[dict[str, Any]]:
"""List generated reports for a project, newest first.
:param project_path: Path to the project directory.
:returns: List of report dicts with filename, host_path, size, created_at.
"""
reports_dir = self._get_project_path(project_path) / "reports"
if not reports_dir.exists():
return []
reports: list[dict[str, Any]] = []
for report_path in sorted(reports_dir.iterdir(), reverse=True):
if report_path.is_file():
stat = report_path.stat()
reports.append({
"filename": report_path.name,
"host_path": str(report_path),
"size": stat.st_size,
"created_at": datetime.fromtimestamp(stat.st_mtime, tz=UTC).isoformat(),
})
return reports
# ------------------------------------------------------------------
# Skill packs
# ------------------------------------------------------------------
#: Directory containing built-in skill packs shipped with FuzzForge.
_BUILTIN_SKILLS_DIR: Path = Path(__file__).parent / "skills"
def _skill_dirs(self, project_path: Path) -> list[Path]:
"""Return skill directories in priority order (project-local first).
:param project_path: Path to the project directory.
:returns: List of directories that may contain skill YAML files.
"""
dirs: list[Path] = []
project_skills = self._get_project_path(project_path) / "skills"
if project_skills.is_dir():
dirs.append(project_skills)
if self._BUILTIN_SKILLS_DIR.is_dir():
dirs.append(self._BUILTIN_SKILLS_DIR)
return dirs
def list_skills(self, project_path: Path) -> list[dict[str, Any]]:
    """List available skill packs from project and built-in directories.

    :param project_path: Path to the project directory.
    :returns: List of skill summaries (name, description first line, source).
    """
    summaries: list[dict[str, Any]] = []
    seen_names: set[str] = set()
    for skill_dir in self._skill_dirs(project_path):
        for yaml_path in sorted(skill_dir.glob("*.yaml")):
            skill = self._parse_skill_file(yaml_path)
            # First occurrence wins: project-local overrides built-in.
            if skill is None or skill["name"] in seen_names:
                continue
            seen_names.add(skill["name"])
            description = skill.get("description", "")
            summary_line = description.strip().split("\n", 1)[0] if description else ""
            origin = "project" if ".fuzzforge" in str(yaml_path.parent) else "builtin"
            summaries.append({
                "name": skill["name"],
                "summary": summary_line,
                "source": origin,
                "servers": skill.get("servers", []),
            })
    return summaries
def load_skill(self, project_path: Path, name: str) -> dict[str, Any] | None:
"""Load a skill pack by name.
Searches project-local skills first, then built-in skills.
:param project_path: Path to the project directory.
:param name: Skill name (filename without .yaml extension).
:returns: Parsed skill dict with name, description, servers — or None.
"""
for skill_dir in self._skill_dirs(project_path):
yaml_path = skill_dir / f"{name}.yaml"
if yaml_path.is_file():
return self._parse_skill_file(yaml_path)
return None
@staticmethod
def _parse_skill_file(yaml_path: Path) -> dict[str, Any] | None:
    """Parse and validate a skill YAML file.

    :param yaml_path: Path to the YAML file.
    :returns: Parsed skill dict, or None if invalid.
    """
    try:
        raw = yaml.safe_load(yaml_path.read_text())
    except (yaml.YAMLError, OSError):
        logger.warning("Failed to parse skill file: %s", yaml_path)
        return None
    if not isinstance(raw, dict):
        return None
    name = raw.get("name")
    # A non-empty string name is mandatory; everything else is optional.
    if not isinstance(name, str) or not name:
        logger.warning("Skill file missing 'name': %s", yaml_path)
        return None
    return {
        "name": name,
        "description": raw.get("description", ""),
        "servers": raw.get("servers", []),
    }

View File

@@ -2,12 +2,13 @@
from fastmcp import FastMCP
from fuzzforge_mcp.tools import hub, projects
from fuzzforge_mcp.tools import hub, projects, reports
mcp: FastMCP = FastMCP()
mcp.mount(projects.mcp)
mcp.mount(hub.mcp)
mcp.mount(reports.mcp)
__all__ = [
"mcp",

View File

@@ -291,7 +291,33 @@ async def execute_hub_tool(
except Exception: # noqa: BLE001, S110 - never fail the tool call due to recording issues
pass
return result.to_dict()
# Scan for new artifacts produced by the tool in /app/output.
response = result.to_dict()
try:
storage = get_storage()
project_path = get_project_path()
new_artifacts = storage.scan_artifacts(
project_path=project_path,
server_name=result.server_name,
tool_name=result.tool_name,
)
if new_artifacts:
response["artifacts"] = [
{"path": a["path"], "type": a["type"], "size": a["size"]}
for a in new_artifacts
]
except Exception: # noqa: BLE001, S110 - never fail the tool call due to artifact scanning
pass
# Append workflow suggestions based on hints configured for this tool.
try:
hint = executor.registry.get_workflow_hint(result.tool_name)
if hint:
response["suggested_next_steps"] = hint
except Exception: # noqa: BLE001, S110 - never fail the tool call due to hint lookup
pass
return response
except Exception as e:
if isinstance(e, ToolError):

View File

@@ -8,7 +8,13 @@ from typing import Any
from fastmcp import FastMCP
from fastmcp.exceptions import ToolError
from fuzzforge_mcp.dependencies import get_project_path, get_storage, set_current_project_path
from fuzzforge_mcp.dependencies import (
get_active_skill,
get_project_path,
get_storage,
set_active_skill,
set_current_project_path,
)
mcp: FastMCP = FastMCP()
@@ -146,3 +152,166 @@ async def get_execution_results(execution_id: str, extract_to: str | None = None
except Exception as exception:
message: str = f"Failed to get execution results: {exception}"
raise ToolError(message) from exception
@mcp.tool
async def list_artifacts(
    source: str | None = None,
    artifact_type: str | None = None,
) -> dict[str, Any]:
    """List all artifacts produced by hub tools in the current project.

    Artifacts are files created by tool executions in /app/output/.
    They are automatically tracked after each execute_hub_tool call.

    :param source: Filter by source server name (e.g. "binwalk-mcp").
    :param artifact_type: Filter by type (e.g. "elf-binary", "json", "text", "archive").
    :return: List of artifacts with path, type, size, and source info.
    """
    storage = get_storage()
    project_path: Path = get_project_path()
    try:
        matching = storage.list_artifacts(
            project_path,
            source=source,
            artifact_type=artifact_type,
        )
    except Exception as exception:
        message: str = f"Failed to list artifacts: {exception}"
        raise ToolError(message) from exception
    return {
        "success": True,
        "artifacts": matching,
        "count": len(matching),
    }
@mcp.tool
async def get_artifact(path: str) -> dict[str, Any]:
    """Get metadata for a specific artifact by its container path.

    :param path: Container path of the artifact (e.g. /app/output/extract_abc123/squashfs-root/usr/sbin/httpd).
    :return: Artifact metadata including path, type, size, source tool, and timestamps.
    """
    storage = get_storage()
    project_path: Path = get_project_path()
    try:
        found = storage.get_artifact(project_path, path)
    except Exception as exception:
        message: str = f"Failed to get artifact: {exception}"
        raise ToolError(message) from exception
    if found is None:
        return {
            "success": False,
            "path": path,
            "error": "Artifact not found",
        }
    return {
        "success": True,
        "artifact": found,
    }
@mcp.tool
async def list_skills() -> dict[str, Any]:
    """List available skill packs.

    Skill packs provide domain-specific pipeline guidance for AI agents.
    They describe analysis methodologies and list the hub servers needed.
    Project-local skills (.fuzzforge/skills/) override built-in skills.

    :return: List of available skills with name, summary, and server list.
    """
    storage = get_storage()
    project_path: Path = get_project_path()
    try:
        available = storage.list_skills(project_path)
    except Exception as exception:
        message: str = f"Failed to list skills: {exception}"
        raise ToolError(message) from exception
    current = get_active_skill()
    return {
        "success": True,
        "skills": available,
        "count": len(available),
        "active_skill": current["name"] if current else None,
    }
@mcp.tool
async def load_skill(name: str) -> dict[str, Any]:
    """Load a skill pack to guide the current analysis session.

    A skill pack provides a pipeline description and specifies which
    hub servers are relevant. Once loaded, the pipeline description
    is available as context and only the listed servers need discovery.

    :param name: Skill name (e.g. "firmware-analysis").
    :return: Loaded skill with full description and server list.
    """
    storage = get_storage()
    project_path: Path = get_project_path()
    try:
        skill = storage.load_skill(project_path, name)
    except Exception as exception:
        message: str = f"Failed to load skill: {exception}"
        raise ToolError(message) from exception
    if skill is None:
        return {
            "success": False,
            "name": name,
            "error": f"Skill '{name}' not found. Use list_skills to see available skills.",
        }
    set_active_skill(skill)
    # The message points the agent at the skill's scoped server list.
    if skill["servers"]:
        guidance = (
            f"Skill '{name}' loaded. Follow the pipeline description above. "
            f"Discover tools from: {', '.join(skill['servers'])}"
        )
    else:
        guidance = f"Skill '{name}' loaded."
    return {
        "success": True,
        "name": skill["name"],
        "description": skill["description"],
        "servers": skill["servers"],
        "message": guidance,
    }
@mcp.tool
async def unload_skill() -> dict[str, Any]:
    """Unload the currently active skill pack.

    Clears the active pipeline guidance. The agent returns to generic mode.

    :return: Confirmation of unload.
    """
    current = get_active_skill()
    if current is None:
        return {
            "success": True,
            "message": "No skill was loaded.",
        }
    set_active_skill(None)
    return {
        "success": True,
        "message": f"Skill '{current['name']}' unloaded.",
    }

View File

@@ -0,0 +1,346 @@
"""Report generation tools for FuzzForge MCP."""
from __future__ import annotations
import json
from datetime import UTC, datetime
from pathlib import Path
from typing import Any
from fastmcp import FastMCP
from fastmcp.exceptions import ToolError
from fuzzforge_mcp.dependencies import get_project_path, get_storage
mcp: FastMCP = FastMCP()
# Maximum characters of tool output to embed per execution in markdown reports.
_OUTPUT_TRUNCATE_CHARS: int = 2000
# ------------------------------------------------------------------
# Formatting helpers
# ------------------------------------------------------------------
def _format_size(size: int) -> str:
"""Format a byte count as a human-friendly string."""
for unit in ("B", "KB", "MB", "GB"):
if size < 1024: # noqa: PLR2004
return f"{size} {unit}" if unit == "B" else f"{size:.1f} {unit}"
size //= 1024
return f"{size:.1f} TB"
def _truncate(text: str, max_chars: int = _OUTPUT_TRUNCATE_CHARS) -> str:
    """Truncate text and append an indicator when truncated."""
    overflow = len(text) - max_chars
    if overflow <= 0:
        return text
    return f"{text[:max_chars]}\n... [{overflow} chars omitted]"
def _extract_output_text(result: dict[str, Any]) -> str:
"""Extract a human-readable output string from an execution result dict.
Handles both flat dicts (``{"output": "..."}`` or ``{"content": [...]}``),
and the nested format stored by ``record_execution`` where the MCP tool
response is stored one level deeper under the ``"result"`` key.
"""
# Flat output field (most hub tools set this)
output = result.get("output", "")
if output and isinstance(output, str):
return output
# MCP content list format — check both at this level and one level down
for candidate in (result, result.get("result") or {}):
content = candidate.get("content", [])
if isinstance(content, list):
texts = [item.get("text", "") for item in content if isinstance(item, dict)]
combined = "\n".join(t for t in texts if t)
if combined:
return combined
parts: list[str] = []
if result.get("stdout"):
parts.append(f"stdout:\n{result['stdout']}")
if result.get("stderr"):
parts.append(f"stderr:\n{result['stderr']}")
return "\n".join(parts)
# ------------------------------------------------------------------
# Report builders
# ------------------------------------------------------------------
def _report_header(
title: str,
project_path: Path,
assets_path: Path | None,
now: str,
) -> list[str]:
"""Build the header block of the Markdown report."""
lines = [
f"# {title}",
"",
f"**Generated:** {now} ",
f"**Project:** `{project_path}` ",
]
if assets_path:
lines.append(f"**Assets:** `{assets_path}` ")
lines += ["", "---", ""]
return lines
def _report_summary(
executions: list[dict[str, Any]],
artifacts: list[dict[str, Any]],
) -> list[str]:
"""Build the summary table block of the Markdown report."""
success_count = sum(1 for e in executions if e.get("success"))
fail_count = len(executions) - success_count
tool_ids = list(dict.fromkeys(
f"{e.get('server', '?')}:{e.get('tool', '?')}" for e in executions
))
timestamps = [e["timestamp"] for e in executions if e.get("timestamp")]
lines = [
"## Summary",
"",
"| Metric | Value |",
"|--------|-------|",
f"| Total executions | {len(executions)} |",
f"| Successful | {success_count} |",
f"| Failed | {fail_count} |",
f"| Artifacts produced | {len(artifacts)} |",
f"| Unique tools | {len(set(tool_ids))} |",
]
if len(timestamps) >= 2: # noqa: PLR2004
lines.append(f"| Time range | {timestamps[0]}{timestamps[-1]} |")
elif timestamps:
lines.append(f"| Time | {timestamps[0]} |")
lines.append("")
if tool_ids:
lines += [", ".join(f"`{t}`" for t in tool_ids), ""]
lines[-2] = f"**Tools used:** {lines[-2]}"
lines += ["---", ""]
return lines
def _report_timeline(
    executions: list[dict[str, Any]],
    artifacts: list[dict[str, Any]],
) -> list[str]:
    """Build the execution timeline block of the Markdown report.

    :param executions: Execution metadata dicts, in recorded order.
    :param artifacts: Artifact metadata used to count per-execution outputs.
    :return: Markdown lines, or an empty list when nothing was executed.
    """
    if not executions:
        return []
    lines: list[str] = ["## Execution Timeline", ""]
    for idx, meta in enumerate(executions, 1):
        server = meta.get("server", "unknown")
        tool = meta.get("tool", "unknown")
        ts = meta.get("timestamp", "")
        status = "✓ Success" if meta.get("success") else "✗ Failed"
        # Separate the timestamp from the tool name, and omit it entirely
        # when the execution record has none.
        heading = f"### [{idx}] {server} :: {tool}"
        if ts:
            heading += f" — {ts}"
        lines.append(heading)
        lines += ["", f"- **Status:** {status}"]
        arguments = meta.get("arguments") or {}
        if arguments:
            lines.append("- **Arguments:**")
            lines.extend(f"  - `{k}`: `{v}`" for k, v in arguments.items())
        result = meta.get("result") or {}
        output_text = _extract_output_text(result).strip()
        if output_text:
            truncated = _truncate(output_text)
            lines += ["- **Output:**", "  ```"]
            lines.extend(f"  {line}" for line in truncated.splitlines())
            lines.append("  ```")
        # Attribute artifacts to the execution that produced them by
        # matching the recorded source server/tool pair.
        exec_artifacts = [
            a for a in artifacts
            if a.get("source_server") == server and a.get("source_tool") == tool
        ]
        if exec_artifacts:
            lines.append(f"- **Artifacts produced:** {len(exec_artifacts)} file(s)")
        lines.append("")
    return lines
def _report_artifacts(artifacts: list[dict[str, Any]]) -> list[str]:
    """Build the artifacts section of the Markdown report."""
    if not artifacts:
        return []
    out: list[str] = ["---", "", "## Artifacts", "", f"**{len(artifacts)} file(s) total**", ""]
    # Group artifacts by their declared type for per-type tables.
    grouped: dict[str, list[dict[str, Any]]] = {}
    for artifact in artifacts:
        grouped.setdefault(artifact.get("type", "unknown"), []).append(artifact)
    for kind in sorted(grouped):
        members = grouped[kind]
        out += [
            f"### {kind} ({len(members)})",
            "",
            "| Path | Size | Source |",
            "|------|------|--------|",
        ]
        for artifact in members:
            origin = f"`{artifact.get('source_server', '?')}:{artifact.get('source_tool', '?')}`"
            readable_size = _format_size(artifact.get("size", 0))
            out.append(f"| `{artifact.get('path', '')}` | {readable_size} | {origin} |")
        out.append("")
    return out
def _build_markdown_report(
    title: str,
    project_path: Path,
    assets_path: Path | None,
    executions: list[dict[str, Any]],
    artifacts: list[dict[str, Any]],
) -> str:
    """Build a Markdown-formatted analysis report."""
    generated = datetime.now(tz=UTC).strftime("%Y-%m-%d %H:%M:%S UTC")
    # Assemble the report section by section, footer last.
    sections: list[str] = []
    sections += _report_header(title, project_path, assets_path, generated)
    sections += _report_summary(executions, artifacts)
    sections += _report_timeline(executions, artifacts)
    sections += _report_artifacts(artifacts)
    sections += ["---", "", "*Generated by FuzzForge*", ""]
    return "\n".join(sections)
def _build_json_report(
title: str,
project_path: Path,
assets_path: Path | None,
executions: list[dict[str, Any]],
artifacts: list[dict[str, Any]],
) -> str:
"""Build a JSON-formatted analysis report."""
success_count = sum(1 for e in executions if e.get("success"))
report = {
"title": title,
"generated_at": datetime.now(tz=UTC).isoformat(),
"project_path": str(project_path),
"assets_path": str(assets_path) if assets_path else None,
"summary": {
"total_executions": len(executions),
"successful": success_count,
"failed": len(executions) - success_count,
"artifact_count": len(artifacts),
},
"executions": executions,
"artifacts": artifacts,
}
return json.dumps(report, indent=2, default=str)
def _write_to_path(content: str, path: Path) -> None:
"""Write report content to an explicit output path (sync helper)."""
path.parent.mkdir(parents=True, exist_ok=True)
path.write_text(content)
# ------------------------------------------------------------------
# MCP tools
# ------------------------------------------------------------------
@mcp.tool
async def generate_report(
    title: str | None = None,
    report_format: str = "markdown",
    output_path: str | None = None,
) -> dict[str, Any]:
    """Generate a comprehensive analysis report for the current project.

    Aggregates all execution history, tool outputs, and tracked artifacts
    into a structured report. The report is saved to `.fuzzforge/reports/`
    and its content is returned so the agent can read it immediately.

    :param title: Optional report title. Defaults to a title derived from
        the project folder name.
    :param report_format: Output format — ``"markdown"`` (default) or ``"json"``.
    :param output_path: Optional absolute path to save the report. When omitted,
        the report is saved automatically to `.fuzzforge/reports/`.
    :return: Report content, save path, and counts of included items.
    :raises ToolError: If gathering data, building, or saving the report fails.
    """
    storage = get_storage()
    project_path = get_project_path()
    try:
        # Normalize and validate the requested format up front; an unknown
        # format yields a structured error dict (not an exception) so the
        # caller can correct the argument and retry.
        fmt = report_format.lower().strip()
        if fmt not in ("markdown", "json"):
            return {
                "success": False,
                "error": f"Unsupported format '{fmt}'. Use 'markdown' or 'json'.",
            }
        # Pull everything recorded for this project from storage.
        executions = storage.list_execution_metadata(project_path)
        artifacts = storage.list_artifacts(project_path)
        assets_path = storage.get_project_assets_path(project_path)
        resolved_title = title or f"FuzzForge Analysis Report — {project_path.name}"
        if fmt == "json":
            content = _build_json_report(
                resolved_title, project_path, assets_path, executions, artifacts
            )
        else:
            content = _build_markdown_report(
                resolved_title, project_path, assets_path, executions, artifacts
            )
        # An explicit output path wins; otherwise storage chooses the
        # canonical location under .fuzzforge/reports/.
        if output_path:
            save_path = Path(output_path)
            _write_to_path(content, save_path)
        else:
            save_path = storage.save_report(project_path, content, fmt)
        # Return the content inline so the agent can read the report
        # without a second round-trip to disk.
        return {
            "success": True,
            "report_path": str(save_path),
            "format": fmt,
            "executions_included": len(executions),
            "artifacts_included": len(artifacts),
            "content": content,
        }
    except Exception as exception:
        # Wrap any failure in ToolError so the MCP layer reports it cleanly.
        message: str = f"Failed to generate report: {exception}"
        raise ToolError(message) from exception
@mcp.tool
async def list_reports() -> dict[str, Any]:
    """List all generated reports for the current project.

    Reports are stored in `.fuzzforge/reports/` and are ordered newest-first.

    :return: List of report files with filename, path, size, and creation time.
    :raises ToolError: If the report listing cannot be read.
    """
    storage = get_storage()
    project_path = get_project_path()
    try:
        found = storage.list_reports(project_path)
    except Exception as exception:
        # Surface storage failures through the MCP error channel.
        message: str = f"Failed to list reports: {exception}"
        raise ToolError(message) from exception
    return {"success": True, "reports": found, "count": len(found)}

View File

@@ -1 +1,566 @@
{"servers": []}
{
"servers": [
{
"name": "bloodhound-mcp",
"description": "bloodhound-mcp \u2014 active-directory",
"type": "docker",
"image": "bloodhound-mcp:latest",
"category": "active-directory",
"capabilities": [],
"volumes": [
"/home/afredefon/.fuzzforge/hub/workspace:/data"
],
"enabled": true,
"source_hub": "mcp-security-hub"
},
{
"name": "binwalk-mcp",
"description": "binwalk-mcp \u2014 binary-analysis",
"type": "docker",
"image": "binwalk-mcp:latest",
"category": "binary-analysis",
"capabilities": [],
"volumes": [
"/home/afredefon/.fuzzforge/hub/workspace:/data"
],
"enabled": true,
"source_hub": "mcp-security-hub"
},
{
"name": "capa-mcp",
"description": "capa-mcp \u2014 binary-analysis",
"type": "docker",
"image": "capa-mcp:latest",
"category": "binary-analysis",
"capabilities": [],
"volumes": [
"/home/afredefon/.fuzzforge/hub/workspace:/data"
],
"enabled": true,
"source_hub": "mcp-security-hub"
},
{
"name": "ghidra-mcp",
"description": "ghidra-mcp \u2014 binary-analysis",
"type": "docker",
"image": "ghidra-mcp:latest",
"category": "binary-analysis",
"capabilities": [],
"volumes": [
"/home/afredefon/.fuzzforge/hub/workspace:/data"
],
"enabled": true,
"source_hub": "mcp-security-hub"
},
{
"name": "ida-mcp",
"description": "ida-mcp \u2014 binary-analysis",
"type": "docker",
"image": "ida-mcp:latest",
"category": "binary-analysis",
"capabilities": [],
"volumes": [
"/home/afredefon/.fuzzforge/hub/workspace:/data"
],
"enabled": true,
"source_hub": "mcp-security-hub"
},
{
"name": "radare2-mcp",
"description": "radare2-mcp \u2014 binary-analysis",
"type": "docker",
"image": "radare2-mcp:latest",
"category": "binary-analysis",
"capabilities": [],
"volumes": [
"/home/afredefon/.fuzzforge/hub/workspace:/data"
],
"enabled": true,
"source_hub": "mcp-security-hub"
},
{
"name": "yara-mcp",
"description": "yara-mcp \u2014 binary-analysis",
"type": "docker",
"image": "yara-mcp:latest",
"category": "binary-analysis",
"capabilities": [],
"volumes": [
"/home/afredefon/.fuzzforge/hub/workspace:/data"
],
"enabled": true,
"source_hub": "mcp-security-hub"
},
{
"name": "daml-viewer-mcp",
"description": "daml-viewer-mcp \u2014 blockchain",
"type": "docker",
"image": "daml-viewer-mcp:latest",
"category": "blockchain",
"capabilities": [],
"volumes": [
"/home/afredefon/.fuzzforge/hub/workspace:/data"
],
"enabled": true,
"source_hub": "mcp-security-hub"
},
{
"name": "medusa-mcp",
"description": "medusa-mcp \u2014 blockchain",
"type": "docker",
"image": "medusa-mcp:latest",
"category": "blockchain",
"capabilities": [],
"volumes": [
"/home/afredefon/.fuzzforge/hub/workspace:/data"
],
"enabled": true,
"source_hub": "mcp-security-hub"
},
{
"name": "solazy-mcp",
"description": "solazy-mcp \u2014 blockchain",
"type": "docker",
"image": "solazy-mcp:latest",
"category": "blockchain",
"capabilities": [],
"volumes": [
"/home/afredefon/.fuzzforge/hub/workspace:/data"
],
"enabled": true,
"source_hub": "mcp-security-hub"
},
{
"name": "prowler-mcp",
"description": "prowler-mcp \u2014 cloud-security",
"type": "docker",
"image": "prowler-mcp:latest",
"category": "cloud-security",
"capabilities": [],
"volumes": [
"/home/afredefon/.fuzzforge/hub/workspace:/data"
],
"enabled": true,
"source_hub": "mcp-security-hub"
},
{
"name": "roadrecon-mcp",
"description": "roadrecon-mcp \u2014 cloud-security",
"type": "docker",
"image": "roadrecon-mcp:latest",
"category": "cloud-security",
"capabilities": [],
"volumes": [
"/home/afredefon/.fuzzforge/hub/workspace:/data"
],
"enabled": true,
"source_hub": "mcp-security-hub"
},
{
"name": "trivy-mcp",
"description": "trivy-mcp \u2014 cloud-security",
"type": "docker",
"image": "trivy-mcp:latest",
"category": "cloud-security",
"capabilities": [],
"volumes": [
"/home/afredefon/.fuzzforge/hub/workspace:/data"
],
"enabled": true,
"source_hub": "mcp-security-hub"
},
{
"name": "semgrep-mcp",
"description": "semgrep-mcp \u2014 code-security",
"type": "docker",
"image": "semgrep-mcp:latest",
"category": "code-security",
"capabilities": [],
"volumes": [
"/home/afredefon/.fuzzforge/hub/workspace:/data"
],
"enabled": true,
"source_hub": "mcp-security-hub"
},
{
"name": "searchsploit-mcp",
"description": "searchsploit-mcp \u2014 exploitation",
"type": "docker",
"image": "searchsploit-mcp:latest",
"category": "exploitation",
"capabilities": [],
"volumes": [
"/home/afredefon/.fuzzforge/hub/workspace:/data"
],
"enabled": true,
"source_hub": "mcp-security-hub"
},
{
"name": "boofuzz-mcp",
"description": "boofuzz-mcp \u2014 fuzzing",
"type": "docker",
"image": "boofuzz-mcp:latest",
"category": "fuzzing",
"capabilities": [],
"volumes": [
"/home/afredefon/.fuzzforge/hub/workspace:/data"
],
"enabled": true,
"source_hub": "mcp-security-hub"
},
{
"name": "dharma-mcp",
"description": "dharma-mcp \u2014 fuzzing",
"type": "docker",
"image": "dharma-mcp:latest",
"category": "fuzzing",
"capabilities": [],
"volumes": [
"/home/afredefon/.fuzzforge/hub/workspace:/data"
],
"enabled": true,
"source_hub": "mcp-security-hub"
},
{
"name": "dnstwist-mcp",
"description": "dnstwist-mcp \u2014 osint",
"type": "docker",
"image": "dnstwist-mcp:latest",
"category": "osint",
"capabilities": [],
"volumes": [
"/home/afredefon/.fuzzforge/hub/workspace:/data"
],
"enabled": true,
"source_hub": "mcp-security-hub"
},
{
"name": "maigret-mcp",
"description": "maigret-mcp \u2014 osint",
"type": "docker",
"image": "maigret-mcp:latest",
"category": "osint",
"capabilities": [],
"volumes": [
"/home/afredefon/.fuzzforge/hub/workspace:/data"
],
"enabled": true,
"source_hub": "mcp-security-hub"
},
{
"name": "hashcat-mcp",
"description": "hashcat-mcp \u2014 password-cracking",
"type": "docker",
"image": "hashcat-mcp:latest",
"category": "password-cracking",
"capabilities": [],
"volumes": [
"/home/afredefon/.fuzzforge/hub/workspace:/data"
],
"enabled": true,
"source_hub": "mcp-security-hub"
},
{
"name": "externalattacker-mcp",
"description": "externalattacker-mcp \u2014 reconnaissance",
"type": "docker",
"image": "externalattacker-mcp:latest",
"category": "reconnaissance",
"capabilities": [
"NET_RAW"
],
"volumes": [
"/home/afredefon/.fuzzforge/hub/workspace:/data"
],
"enabled": true,
"source_hub": "mcp-security-hub"
},
{
"name": "masscan-mcp",
"description": "masscan-mcp \u2014 reconnaissance",
"type": "docker",
"image": "masscan-mcp:latest",
"category": "reconnaissance",
"capabilities": [
"NET_RAW"
],
"volumes": [
"/home/afredefon/.fuzzforge/hub/workspace:/data"
],
"enabled": true,
"source_hub": "mcp-security-hub"
},
{
"name": "networksdb-mcp",
"description": "networksdb-mcp \u2014 reconnaissance",
"type": "docker",
"image": "networksdb-mcp:latest",
"category": "reconnaissance",
"capabilities": [
"NET_RAW"
],
"volumes": [
"/home/afredefon/.fuzzforge/hub/workspace:/data"
],
"enabled": true,
"source_hub": "mcp-security-hub"
},
{
"name": "nmap-mcp",
"description": "nmap-mcp \u2014 reconnaissance",
"type": "docker",
"image": "nmap-mcp:latest",
"category": "reconnaissance",
"capabilities": [
"NET_RAW"
],
"volumes": [
"/home/afredefon/.fuzzforge/hub/workspace:/data"
],
"enabled": true,
"source_hub": "mcp-security-hub"
},
{
"name": "pd-tools-mcp",
"description": "pd-tools-mcp \u2014 reconnaissance",
"type": "docker",
"image": "pd-tools-mcp:latest",
"category": "reconnaissance",
"capabilities": [
"NET_RAW"
],
"volumes": [
"/home/afredefon/.fuzzforge/hub/workspace:/data"
],
"enabled": true,
"source_hub": "mcp-security-hub"
},
{
"name": "shodan-mcp",
"description": "shodan-mcp \u2014 reconnaissance",
"type": "docker",
"image": "shodan-mcp:latest",
"category": "reconnaissance",
"capabilities": [
"NET_RAW"
],
"volumes": [
"/home/afredefon/.fuzzforge/hub/workspace:/data"
],
"enabled": true,
"source_hub": "mcp-security-hub"
},
{
"name": "whatweb-mcp",
"description": "whatweb-mcp \u2014 reconnaissance",
"type": "docker",
"image": "whatweb-mcp:latest",
"category": "reconnaissance",
"capabilities": [
"NET_RAW"
],
"volumes": [
"/home/afredefon/.fuzzforge/hub/workspace:/data"
],
"enabled": true,
"source_hub": "mcp-security-hub"
},
{
"name": "zoomeye-mcp",
"description": "zoomeye-mcp \u2014 reconnaissance",
"type": "docker",
"image": "zoomeye-mcp:latest",
"category": "reconnaissance",
"capabilities": [
"NET_RAW"
],
"volumes": [
"/home/afredefon/.fuzzforge/hub/workspace:/data"
],
"enabled": true,
"source_hub": "mcp-security-hub"
},
{
"name": "gitleaks-mcp",
"description": "gitleaks-mcp \u2014 secrets",
"type": "docker",
"image": "gitleaks-mcp:latest",
"category": "secrets",
"capabilities": [],
"volumes": [
"/home/afredefon/.fuzzforge/hub/workspace:/data"
],
"enabled": true,
"source_hub": "mcp-security-hub"
},
{
"name": "otx-mcp",
"description": "otx-mcp \u2014 threat-intel",
"type": "docker",
"image": "otx-mcp:latest",
"category": "threat-intel",
"capabilities": [],
"volumes": [
"/home/afredefon/.fuzzforge/hub/workspace:/data"
],
"enabled": true,
"source_hub": "mcp-security-hub"
},
{
"name": "virustotal-mcp",
"description": "virustotal-mcp \u2014 threat-intel",
"type": "docker",
"image": "virustotal-mcp:latest",
"category": "threat-intel",
"capabilities": [],
"volumes": [
"/home/afredefon/.fuzzforge/hub/workspace:/data"
],
"enabled": true,
"source_hub": "mcp-security-hub"
},
{
"name": "burp-mcp",
"description": "burp-mcp \u2014 web-security",
"type": "docker",
"image": "burp-mcp:latest",
"category": "web-security",
"capabilities": [
"NET_RAW"
],
"volumes": [
"/home/afredefon/.fuzzforge/hub/workspace:/data"
],
"enabled": true,
"source_hub": "mcp-security-hub"
},
{
"name": "ffuf-mcp",
"description": "ffuf-mcp \u2014 web-security",
"type": "docker",
"image": "ffuf-mcp:latest",
"category": "web-security",
"capabilities": [
"NET_RAW"
],
"volumes": [
"/home/afredefon/.fuzzforge/hub/workspace:/data"
],
"enabled": true,
"source_hub": "mcp-security-hub"
},
{
"name": "nikto-mcp",
"description": "nikto-mcp \u2014 web-security",
"type": "docker",
"image": "nikto-mcp:latest",
"category": "web-security",
"capabilities": [
"NET_RAW"
],
"volumes": [
"/home/afredefon/.fuzzforge/hub/workspace:/data"
],
"enabled": true,
"source_hub": "mcp-security-hub"
},
{
"name": "nuclei-mcp",
"description": "nuclei-mcp \u2014 web-security",
"type": "docker",
"image": "nuclei-mcp:latest",
"category": "web-security",
"capabilities": [
"NET_RAW"
],
"volumes": [
"/home/afredefon/.fuzzforge/hub/workspace:/data"
],
"enabled": true,
"source_hub": "mcp-security-hub"
},
{
"name": "sqlmap-mcp",
"description": "sqlmap-mcp \u2014 web-security",
"type": "docker",
"image": "sqlmap-mcp:latest",
"category": "web-security",
"capabilities": [
"NET_RAW"
],
"volumes": [
"/home/afredefon/.fuzzforge/hub/workspace:/data"
],
"enabled": true,
"source_hub": "mcp-security-hub"
},
{
"name": "waybackurls-mcp",
"description": "waybackurls-mcp \u2014 web-security",
"type": "docker",
"image": "waybackurls-mcp:latest",
"category": "web-security",
"capabilities": [
"NET_RAW"
],
"volumes": [
"/home/afredefon/.fuzzforge/hub/workspace:/data"
],
"enabled": true,
"source_hub": "mcp-security-hub"
},
{
"name": "go-analyzer-mcp",
"description": "Go static analysis: fuzzable entry points, existing Fuzz* targets, unsafe/cgo usage, CVE scanning via govulncheck",
"type": "docker",
"image": "go-analyzer-mcp:latest",
"category": "code-security",
"capabilities": [],
"volumes": [
"/home/afredefon/.fuzzforge/hub/workspace:/data"
],
"enabled": true,
"source_hub": "mcp-security-hub"
},
{
"name": "go-harness-tester-mcp",
"description": "Test Go fuzz harness quality: compilation, seed execution, fuzzing trial, quality scoring 0-100",
"type": "docker",
"image": "go-harness-tester-mcp:latest",
"category": "code-security",
"capabilities": [],
"volumes": [
"/home/afredefon/.fuzzforge/hub/workspace:/data"
],
"enabled": true,
"source_hub": "mcp-security-hub"
},
{
"name": "go-fuzzer-mcp",
"description": "Run Go native fuzzing (go test -fuzz) with blocking and continuous modes, crash collection, session management",
"type": "docker",
"image": "go-fuzzer-mcp:latest",
"category": "code-security",
"capabilities": [],
"volumes": [
"/home/afredefon/.fuzzforge/hub/workspace:/data"
],
"enabled": true,
"source_hub": "mcp-security-hub"
},
{
"name": "go-crash-analyzer-mcp",
"description": "Analyze Go fuzzing crashes: reproduce, classify (nil-deref, OOR, panic, race, etc.), deduplicate by stack signature",
"type": "docker",
"image": "go-crash-analyzer-mcp:latest",
"category": "code-security",
"capabilities": [],
"volumes": [
"/home/afredefon/.fuzzforge/hub/workspace:/data"
],
"enabled": true,
"source_hub": "mcp-security-hub"
}
],
"workflow_hints_file": "mcp-security-hub/workflow-hints.json"
}

View File

@@ -1,6 +1,6 @@
[project]
name = "fuzzforge-oss"
version = "1.0.0"
version = "0.8.0"
description = "FuzzForge AI - AI-driven security research platform for local execution"
readme = "README.md"
requires-python = ">=3.14"

4
uv.lock generated
View File

@@ -432,6 +432,7 @@ dependencies = [
{ name = "fuzzforge-common" },
{ name = "pydantic" },
{ name = "pydantic-settings" },
{ name = "pyyaml" },
{ name = "structlog" },
]
@@ -460,6 +461,7 @@ requires-dist = [
{ name = "pytest", marker = "extra == 'tests'", specifier = "==9.0.2" },
{ name = "pytest-asyncio", marker = "extra == 'tests'", specifier = "==1.3.0" },
{ name = "pytest-httpx", marker = "extra == 'tests'", specifier = "==0.36.0" },
{ name = "pyyaml", specifier = ">=6.0" },
{ name = "ruff", marker = "extra == 'lints'", specifier = "==0.14.4" },
{ name = "structlog", specifier = "==25.5.0" },
]
@@ -467,7 +469,7 @@ provides-extras = ["lints", "tests"]
[[package]]
name = "fuzzforge-oss"
version = "1.0.0"
version = "0.8.0"
source = { virtual = "." }
dependencies = [
{ name = "fuzzforge-cli" },