Compare commits

...

3 Commits

Author SHA1 Message Date
AFredefon
d3a20b3846 feat: implement skill packs system 2026-04-07 16:12:14 +02:00
AFredefon
664278da3f feat: implement artifact management tools 2026-04-07 16:06:47 +02:00
AFredefon
9374fd3aee feat: implement workflow suggestions pipeline 2026-04-07 01:50:21 +02:00
13 changed files with 1284 additions and 28 deletions

View File

@@ -294,3 +294,17 @@ class HubConfig(BaseModel):
default=True, default=True,
description="Cache discovered tools", description="Cache discovered tools",
) )
#: Workflow hints indexed by "after:<tool_name>" keys.
#: Loaded inline or merged from workflow_hints_file.
workflow_hints: dict[str, Any] = Field(
default_factory=dict,
description="Workflow hints indexed by 'after:<tool_name>'",
)
#: Optional path to an external workflow-hints.json file.
#: Relative paths are resolved relative to the hub-config.json location.
workflow_hints_file: str | None = Field(
default=None,
description="Path to an external workflow-hints.json to load and merge",
)

View File

@@ -87,6 +87,28 @@ class HubRegistry:
config=server_config, config=server_config,
) )
# Load and merge external workflow hints file if specified.
if self._config.workflow_hints_file:
hints_path = Path(self._config.workflow_hints_file)
if not hints_path.is_absolute():
hints_path = config_path.parent / hints_path
if hints_path.exists():
try:
with hints_path.open() as hf:
hints_data = json.load(hf)
self._config.workflow_hints.update(hints_data.get("hints", {}))
logger.info(
"Loaded workflow hints",
path=str(hints_path),
hints=len(self._config.workflow_hints),
)
except Exception as hints_err:
logger.warning(
"Failed to load workflow hints file",
path=str(hints_path),
error=str(hints_err),
)
logger.info( logger.info(
"Loaded hub configuration", "Loaded hub configuration",
path=str(config_path), path=str(config_path),
@@ -218,6 +240,15 @@ class HubRegistry:
server.discovery_error = None server.discovery_error = None
server.tools = tools server.tools = tools
def get_workflow_hint(self, tool_name: str) -> dict | None:
"""Get the workflow hint for a tool by name.
:param tool_name: Tool name (e.g. ``binwalk_extract``).
:returns: Hint dict for the ``after:<tool_name>`` key, or None.
"""
return self._config.workflow_hints.get(f"after:{tool_name}") or None
def get_all_tools(self) -> list: def get_all_tools(self) -> list:
"""Get all discovered tools from all servers. """Get all discovered tools from all servers.

View File

@@ -10,6 +10,7 @@ dependencies = [
"fuzzforge-common==0.0.1", "fuzzforge-common==0.0.1",
"pydantic==2.12.4", "pydantic==2.12.4",
"pydantic-settings==2.12.0", "pydantic-settings==2.12.0",
"pyyaml>=6.0",
"structlog==25.5.0", "structlog==25.5.0",
] ]

View File

@@ -53,40 +53,30 @@ Typical workflow:
4. Discover tools from servers with `discover_hub_tools` 4. Discover tools from servers with `discover_hub_tools`
5. Execute hub tools with `execute_hub_tool` 5. Execute hub tools with `execute_hub_tool`
Skill packs:
Use `list_skills` to see available analysis pipelines (e.g. firmware-analysis).
Load one with `load_skill("firmware-analysis")` to get domain-specific guidance
and a scoped list of relevant hub servers. Skill packs describe the methodology —
follow the pipeline steps while adapting to what you find at each stage.
Agent context convention: Agent context convention:
When you call `discover_hub_tools`, some servers return an `agent_context` field When you call `discover_hub_tools`, some servers return an `agent_context` field
with usage tips, known issues, rule templates, and workflow guidance. Always read with usage tips, known issues, rule templates, and workflow guidance. Always read
this context before using the server's tools. this context before using the server's tools.
Artifact tracking:
After each `execute_hub_tool` call, new output files are automatically tracked.
Use `list_artifacts` to find files produced by previous tools instead of parsing
paths from tool output text. Filter by source server or file type.
File access in containers: File access in containers:
- Assets set via `set_project_assets` are mounted read-only at `/app/uploads/` and `/app/samples/` - Assets set via `set_project_assets` are mounted read-only at `/app/uploads/` and `/app/samples/`
- A writable output directory is mounted at `/app/output/` — use it for extraction results, reports, etc. - A writable output directory is mounted at `/app/output/` — use it for extraction results, reports, etc.
- Always use container paths (e.g. `/app/uploads/file`) when passing file arguments to hub tools - Always use container paths (e.g. `/app/uploads/file`) when passing file arguments to hub tools
Stateful tools: Stateful tools:
- Some tools (e.g. radare2-mcp) require multi-step sessions. Use `start_hub_server` to launch - Some tools require multi-step sessions. Use `start_hub_server` to launch
a persistent container, then `execute_hub_tool` calls reuse that container. Stop with `stop_hub_server`. a persistent container, then `execute_hub_tool` calls reuse that container. Stop with `stop_hub_server`.
Firmware analysis pipeline (when analyzing firmware images):
1. **binwalk-mcp** (`binwalk_scan` + `binwalk_extract`) — identify and extract filesystem from firmware
2. **yara-mcp** (`yara_scan_with_rules`) — scan extracted files with vulnerability rules to prioritize targets
3. **radare2-mcp** (persistent session) — confirm dangerous code paths
4. **searchsploit-mcp** (`search_exploitdb`) — query version strings from radare2 against ExploitDB
Run steps 3 and 4 outputs feed into a final triage summary.
radare2-mcp agent context (upstream tool — no embedded context):
- Start a persistent session with `start_hub_server("radare2-mcp")` before any calls.
- IMPORTANT: the `open_file` tool requires the parameter name `file_path` (with underscore),
not `filepath`. Example: `execute_hub_tool("hub:radare2-mcp:open_file", {"file_path": "/app/output/..."})`
- Workflow: `open_file` → `analyze` → `list_imports` → `xrefs_to` → `run_command` with `pdf @ <addr>`.
- Static binary fallback: firmware binaries are often statically linked. When `list_imports`
returns an empty result, fall back to `list_symbols` and search for dangerous function names
(system, strcpy, gets, popen, sprintf) in the output. Then use `xrefs_to` on their addresses.
- For string extraction, use `run_command` with `iz` (data section strings).
The `list_all_strings` tool may return garbled output for large binaries.
- For decompilation, use `run_command` with `pdc @ <addr>` (pseudo-C) or `pdf @ <addr>`
(annotated disassembly). The `decompile` tool may fail with "not available in current mode".
- Stop the session with `stop_hub_server("radare2-mcp")` when done.
""", """,
lifespan=lifespan, lifespan=lifespan,
) )

View File

@@ -3,7 +3,7 @@
from __future__ import annotations from __future__ import annotations
from pathlib import Path from pathlib import Path
from typing import TYPE_CHECKING, cast from typing import TYPE_CHECKING, Any, cast
from fastmcp.server.dependencies import get_context from fastmcp.server.dependencies import get_context
@@ -21,6 +21,9 @@ _current_project_path: Path | None = None
# Singleton storage instance # Singleton storage instance
_storage: LocalStorage | None = None _storage: LocalStorage | None = None
# Currently loaded skill pack (set by load_skill)
_active_skill: dict[str, Any] | None = None
def set_current_project_path(project_path: Path) -> None: def set_current_project_path(project_path: Path) -> None:
"""Set the current project path. """Set the current project path.
@@ -75,3 +78,22 @@ def get_storage() -> LocalStorage:
settings = get_settings() settings = get_settings()
_storage = LocalStorage(settings.storage.path) _storage = LocalStorage(settings.storage.path)
return _storage return _storage
def set_active_skill(skill: dict[str, Any] | None) -> None:
"""Set (or clear) the currently loaded skill pack.
:param skill: Parsed skill dict, or None to unload.
"""
global _active_skill
_active_skill = skill
def get_active_skill() -> dict[str, Any] | None:
"""Get the currently loaded skill pack.
:return: Active skill dict, or None if no skill is loaded.
"""
return _active_skill

View File

@@ -0,0 +1,44 @@
name: firmware-analysis
description: |
## Firmware Binary Vulnerability Analysis
Goal: Find exploitable vulnerabilities in firmware images.
### Pipeline
1. **Extract the filesystem** from the firmware image.
Look for SquashFS, JFFS2, CPIO, or other embedded filesystems.
2. **Scan extracted files for vulnerability patterns.**
Use vulnerability-focused rules to identify binaries with dangerous
function calls (system, strcpy, popen, sprintf, gets).
Prioritize targets by match count — the binary with the most hits
is the highest-priority target.
3. **Deep-analyze the highest-priority binary.**
Open a persistent analysis session. Look for:
- Dangerous function calls with unsanitized input
- Hardcoded credentials or backdoor strings
- Network service listeners with weak input validation
Focus on confirming whether flagged patterns are actually reachable.
4. **Search for known CVEs** matching library version strings found
during analysis. Cross-reference with public exploit databases.
5. **Compile findings** with severity ratings:
- CRITICAL: confirmed remote code execution paths
- HIGH: command injection or buffer overflow with reachable input
- MEDIUM: hardcoded credentials, weak crypto, format string issues
- LOW: informational findings (library versions, service fingerprints)
### Key files to prioritize in extracted firmware
- `usr/sbin/httpd`, `usr/bin/httpd` — web servers (high-priority)
- `etc/shadow`, `etc/passwd` — credential files
- `www/cgi-bin/*` — CGI scripts (command injection vectors)
- Custom binaries in `usr/sbin/`, `usr/bin/` — vendor attack surface
servers:
- binwalk-mcp
- yara-mcp
- radare2-mcp
- searchsploit-mcp

View File

@@ -0,0 +1,90 @@
name: go-fuzzing
description: |
## Go Fuzzing Vulnerability Discovery
Goal: Find memory safety bugs, panics, and logic errors in a Go project
using native Go fuzzing (go test -fuzz).
### Pipeline
1. **Analyze the Go project** to understand its attack surface.
Use `go_analyze` to scan the codebase and identify:
- Fuzzable entry points: functions accepting `[]byte`, `string`,
`io.Reader`, or other parser-like signatures (`Parse*`, `Decode*`,
`Unmarshal*`, `Read*`, `Open*`)
- Existing `Fuzz*` test functions already in `*_test.go` files
- Unsafe/cgo usage that increases the severity of any bugs found
- Known CVEs via govulncheck (enable with `run_vulncheck: true`)
If there are **no existing Fuzz targets**, stop here and report
that the project needs fuzz harnesses written first, listing the
recommended entry points from the analysis.
2. **Test harness quality** before committing to a long fuzzing campaign.
Use `go_harness_test` to evaluate each Fuzz* function:
- Compilation check — does `go test -c` succeed?
- Seed execution — do the seed corpus entries pass without panics?
- Short fuzzing trial — does the harness sustain fuzzing for 15-30s?
- Quality score (0-100): ≥80 = production-ready, ≥50 = needs work, <50 = broken
**Decision point:**
- If all harnesses are **broken** (score < 50): stop and report issues.
The user needs to fix them before fuzzing is useful.
- If some are **production-ready** or **needs-improvement** (score ≥ 50):
proceed with those targets to step 3.
- Skip broken harnesses — do not waste fuzzing time on them.
3. **Run fuzzing** on the viable targets.
Use `go_fuzz_run` for a bounded campaign:
- Set `duration` based on project size: 60-120s for quick scan,
300-600s for thorough analysis.
- Pass only the targets that scored ≥ 50 in step 2 via the `targets`
parameter — do not fuzz broken harnesses.
- The fuzzer collects crash inputs to `/app/output/crashes/{FuzzName}/`.
**Alternative — continuous mode** for deeper exploration:
- Use `go_fuzz_start` to begin background fuzzing.
- Periodically check `go_fuzz_status` to monitor progress.
- Use `go_fuzz_stop` when satisfied or when crashes are found.
If **no crashes** are found after a reasonable duration, report that
the fuzzing campaign completed cleanly with the execution metrics.
4. **Analyze crashes** found during fuzzing.
Use `go_crash_analyze` to process the crash inputs:
- Reproduction: re-run each crash input to confirm it's real
- Classification: categorize by type (nil-dereference, index-out-of-range,
slice-bounds, divide-by-zero, stack-overflow, data-race, panic, etc.)
- Severity assignment: critical / high / medium / low
- Deduplication: group crashes by signature (target + type + top 3 frames)
Skip this step if no crashes were found in step 3.
5. **Compile the vulnerability report** with findings organized by severity:
- **CRITICAL**: nil-dereference, segfault, data-race, stack-overflow
- **HIGH**: index/slice out of bounds, allocation overflow
- **MEDIUM**: integer overflow, divide by zero, explicit panics
- **LOW**: timeout, unclassified crashes
For each unique crash, include:
- The fuzz target that triggered it
- The crash type and root cause function + file + line
- Whether it was reproducible
- The crash input file path for manual investigation
### What the user's project needs
- A `go.mod` file (any Go module)
- At least one `*_test.go` file with `func FuzzXxx(f *testing.F)` functions
- Seed corpus entries added via `f.Add(...)` in the Fuzz functions
### Interpretation guide
- **govulncheck CVEs** (step 1) are known dependency vulnerabilities — report separately
- **Fuzzer crashes** (steps 3-4) are new bugs found by fuzzing the project's own code
- High execution counts with zero crashes = good sign (code is robust to that input space)
- Low quality scores in step 2 usually mean the harness needs better seed corpus or input handling
servers:
- go-analyzer-mcp
- go-harness-tester-mcp
- go-fuzzer-mcp
- go-crash-analyzer-mcp

View File

@@ -13,12 +13,15 @@ from __future__ import annotations
import json import json
import logging import logging
import mimetypes
from datetime import UTC, datetime from datetime import UTC, datetime
from pathlib import Path from pathlib import Path
from tarfile import open as Archive # noqa: N812 from tarfile import open as Archive # noqa: N812
from typing import Any from typing import Any
from uuid import uuid4 from uuid import uuid4
import yaml
logger = logging.getLogger("fuzzforge-mcp") logger = logging.getLogger("fuzzforge-mcp")
#: Name of the FuzzForge storage directory within projects. #: Name of the FuzzForge storage directory within projects.
@@ -90,6 +93,7 @@ class LocalStorage:
"# FuzzForge storage - ignore large/temporary files\n" "# FuzzForge storage - ignore large/temporary files\n"
"runs/\n" "runs/\n"
"output/\n" "output/\n"
"artifacts.json\n"
"!config.json\n" "!config.json\n"
) )
@@ -273,3 +277,301 @@ class LocalStorage:
except Exception as exc: except Exception as exc:
msg = f"Failed to extract results: {exc}" msg = f"Failed to extract results: {exc}"
raise StorageError(msg) from exc raise StorageError(msg) from exc
# ------------------------------------------------------------------
# Artifact tracking
# ------------------------------------------------------------------
def _artifacts_path(self, project_path: Path) -> Path:
"""Get the path to the artifacts registry file.
:param project_path: Path to the project directory.
:returns: Path to artifacts.json.
"""
return self._get_project_path(project_path) / "artifacts.json"
def _load_artifacts(self, project_path: Path) -> list[dict[str, Any]]:
"""Load the artifact registry from disk.
:param project_path: Path to the project directory.
:returns: List of artifact dicts.
"""
path = self._artifacts_path(project_path)
if path.exists():
try:
return json.loads(path.read_text()) # type: ignore[no-any-return]
except (json.JSONDecodeError, OSError):
return []
return []
def _save_artifacts(self, project_path: Path, artifacts: list[dict[str, Any]]) -> None:
"""Persist the artifact registry to disk.
:param project_path: Path to the project directory.
:param artifacts: Full artifact list to write.
"""
path = self._artifacts_path(project_path)
path.parent.mkdir(parents=True, exist_ok=True)
path.write_text(json.dumps(artifacts, indent=2, default=str))
def _classify_file(self, file_path: Path) -> str:
"""Classify a file into a human-friendly type string.
:param file_path: Path to the file.
:returns: Type string (e.g. "elf-binary", "text", "directory").
"""
mime, _ = mimetypes.guess_type(str(file_path))
suffix = file_path.suffix.lower()
# Try reading ELF magic for binaries with no extension
if mime is None and suffix == "":
try:
header = file_path.read_bytes()[:4]
if header == b"\x7fELF":
return "elf-binary"
except OSError:
pass
if mime:
if "json" in mime:
return "json"
if "text" in mime or "xml" in mime or "yaml" in mime:
return "text"
if "image" in mime:
return "image"
if "octet-stream" in mime:
return "binary"
type_map: dict[str, str] = {
".json": "json",
".sarif": "sarif",
".md": "markdown",
".txt": "text",
".log": "text",
".csv": "csv",
".yaml": "yaml",
".yml": "yaml",
".xml": "xml",
".html": "html",
".elf": "elf-binary",
".so": "elf-binary",
".bin": "binary",
".gz": "archive",
".tar": "archive",
".zip": "archive",
}
return type_map.get(suffix, "binary")
def scan_artifacts(
self,
project_path: Path,
server_name: str,
tool_name: str,
) -> list[dict[str, Any]]:
"""Scan the output directory for new or modified files and register them.
Compares the current state of .fuzzforge/output/ against the existing
artifact registry and registers any new or modified files.
:param project_path: Path to the project directory.
:param server_name: Hub server that produced the artifacts.
:param tool_name: Tool that produced the artifacts.
:returns: List of newly registered artifact dicts.
"""
output_path = self.get_project_output_path(project_path)
if output_path is None or not output_path.exists():
return []
existing = self._load_artifacts(project_path)
known: dict[str, dict[str, Any]] = {a["path"]: a for a in existing}
now = datetime.now(tz=UTC).isoformat()
new_artifacts: list[dict[str, Any]] = []
for file_path in output_path.rglob("*"):
if not file_path.is_file():
continue
# Use the container-style path (/app/output/...) so it's
# directly usable in subsequent tool calls.
relative = file_path.relative_to(output_path)
container_path = f"/app/output/{relative}"
stat = file_path.stat()
size = stat.st_size
mtime = datetime.fromtimestamp(stat.st_mtime, tz=UTC).isoformat()
prev = known.get(container_path)
if prev and prev.get("mtime") == mtime and prev.get("size") == size:
continue # Unchanged — skip
artifact: dict[str, Any] = {
"path": container_path,
"host_path": str(file_path),
"type": self._classify_file(file_path),
"size": size,
"mtime": mtime,
"source_server": server_name,
"source_tool": tool_name,
"registered_at": now,
}
if prev:
# Update existing entry in-place
idx = next(i for i, a in enumerate(existing) if a["path"] == container_path)
existing[idx] = artifact
else:
existing.append(artifact)
new_artifacts.append(artifact)
if new_artifacts:
self._save_artifacts(project_path, existing)
logger.info(
"Registered %d new artifact(s) from %s:%s",
len(new_artifacts),
server_name,
tool_name,
)
return new_artifacts
def list_artifacts(
self,
project_path: Path,
*,
source: str | None = None,
artifact_type: str | None = None,
) -> list[dict[str, Any]]:
"""List registered artifacts, with optional filters.
:param project_path: Path to the project directory.
:param source: Filter by source server name.
:param artifact_type: Filter by artifact type (e.g. "elf-binary", "json").
:returns: List of matching artifact dicts.
"""
artifacts = self._load_artifacts(project_path)
if source:
artifacts = [a for a in artifacts if a.get("source_server") == source]
if artifact_type:
artifacts = [a for a in artifacts if a.get("type") == artifact_type]
return artifacts
def get_artifact(self, project_path: Path, path: str) -> dict[str, Any] | None:
"""Get a single artifact by its container path.
:param project_path: Path to the project directory.
:param path: Container path of the artifact (e.g. /app/output/...).
:returns: Artifact dict, or None if not found.
"""
artifacts = self._load_artifacts(project_path)
for artifact in artifacts:
if artifact["path"] == path:
return artifact
return None
# ------------------------------------------------------------------
# Skill packs
# ------------------------------------------------------------------
#: Directory containing built-in skill packs shipped with FuzzForge.
_BUILTIN_SKILLS_DIR: Path = Path(__file__).parent / "skills"
def _skill_dirs(self, project_path: Path) -> list[Path]:
"""Return skill directories in priority order (project-local first).
:param project_path: Path to the project directory.
:returns: List of directories that may contain skill YAML files.
"""
dirs: list[Path] = []
project_skills = self._get_project_path(project_path) / "skills"
if project_skills.is_dir():
dirs.append(project_skills)
if self._BUILTIN_SKILLS_DIR.is_dir():
dirs.append(self._BUILTIN_SKILLS_DIR)
return dirs
def list_skills(self, project_path: Path) -> list[dict[str, Any]]:
"""List available skill packs from project and built-in directories.
:param project_path: Path to the project directory.
:returns: List of skill summaries (name, description first line, source).
"""
seen: set[str] = set()
skills: list[dict[str, Any]] = []
for skill_dir in self._skill_dirs(project_path):
for yaml_path in sorted(skill_dir.glob("*.yaml")):
skill = self._parse_skill_file(yaml_path)
if skill is None:
continue
name = skill["name"]
if name in seen:
continue # project-local overrides built-in
seen.add(name)
desc = skill.get("description", "")
first_line = desc.strip().split("\n", 1)[0] if desc else ""
is_project = ".fuzzforge" in str(yaml_path.parent)
source = "project" if is_project else "builtin"
skills.append({
"name": name,
"summary": first_line,
"source": source,
"servers": skill.get("servers", []),
})
return skills
def load_skill(self, project_path: Path, name: str) -> dict[str, Any] | None:
"""Load a skill pack by name.
Searches project-local skills first, then built-in skills.
:param project_path: Path to the project directory.
:param name: Skill name (filename without .yaml extension).
:returns: Parsed skill dict with name, description, servers — or None.
"""
for skill_dir in self._skill_dirs(project_path):
yaml_path = skill_dir / f"{name}.yaml"
if yaml_path.is_file():
return self._parse_skill_file(yaml_path)
return None
@staticmethod
def _parse_skill_file(yaml_path: Path) -> dict[str, Any] | None:
"""Parse and validate a skill YAML file.
:param yaml_path: Path to the YAML file.
:returns: Parsed skill dict, or None if invalid.
"""
try:
data = yaml.safe_load(yaml_path.read_text())
except (yaml.YAMLError, OSError):
logger.warning("Failed to parse skill file: %s", yaml_path)
return None
if not isinstance(data, dict):
return None
name = data.get("name")
if not name or not isinstance(name, str):
logger.warning("Skill file missing 'name': %s", yaml_path)
return None
return {
"name": name,
"description": data.get("description", ""),
"servers": data.get("servers", []),
}

View File

@@ -291,7 +291,33 @@ async def execute_hub_tool(
except Exception: # noqa: BLE001, S110 - never fail the tool call due to recording issues except Exception: # noqa: BLE001, S110 - never fail the tool call due to recording issues
pass pass
return result.to_dict() # Scan for new artifacts produced by the tool in /app/output.
response = result.to_dict()
try:
storage = get_storage()
project_path = get_project_path()
new_artifacts = storage.scan_artifacts(
project_path=project_path,
server_name=result.server_name,
tool_name=result.tool_name,
)
if new_artifacts:
response["artifacts"] = [
{"path": a["path"], "type": a["type"], "size": a["size"]}
for a in new_artifacts
]
except Exception: # noqa: BLE001, S110 - never fail the tool call due to artifact scanning
pass
# Append workflow suggestions based on hints configured for this tool.
try:
hint = executor.registry.get_workflow_hint(result.tool_name)
if hint:
response["suggested_next_steps"] = hint
except Exception: # noqa: BLE001, S110 - never fail the tool call due to hint lookup
pass
return response
except Exception as e: except Exception as e:
if isinstance(e, ToolError): if isinstance(e, ToolError):

View File

@@ -8,7 +8,13 @@ from typing import Any
from fastmcp import FastMCP from fastmcp import FastMCP
from fastmcp.exceptions import ToolError from fastmcp.exceptions import ToolError
from fuzzforge_mcp.dependencies import get_project_path, get_storage, set_current_project_path from fuzzforge_mcp.dependencies import (
get_active_skill,
get_project_path,
get_storage,
set_active_skill,
set_current_project_path,
)
mcp: FastMCP = FastMCP() mcp: FastMCP = FastMCP()
@@ -146,3 +152,166 @@ async def get_execution_results(execution_id: str, extract_to: str | None = None
except Exception as exception: except Exception as exception:
message: str = f"Failed to get execution results: {exception}" message: str = f"Failed to get execution results: {exception}"
raise ToolError(message) from exception raise ToolError(message) from exception
@mcp.tool
async def list_artifacts(
source: str | None = None,
artifact_type: str | None = None,
) -> dict[str, Any]:
"""List all artifacts produced by hub tools in the current project.
Artifacts are files created by tool executions in /app/output/.
They are automatically tracked after each execute_hub_tool call.
:param source: Filter by source server name (e.g. "binwalk-mcp").
:param artifact_type: Filter by type (e.g. "elf-binary", "json", "text", "archive").
:return: List of artifacts with path, type, size, and source info.
"""
storage = get_storage()
project_path: Path = get_project_path()
try:
artifacts = storage.list_artifacts(
project_path,
source=source,
artifact_type=artifact_type,
)
return {
"success": True,
"artifacts": artifacts,
"count": len(artifacts),
}
except Exception as exception:
message: str = f"Failed to list artifacts: {exception}"
raise ToolError(message) from exception
@mcp.tool
async def get_artifact(path: str) -> dict[str, Any]:
"""Get metadata for a specific artifact by its container path.
:param path: Container path of the artifact (e.g. /app/output/extract_abc123/squashfs-root/usr/sbin/httpd).
:return: Artifact metadata including path, type, size, source tool, and timestamps.
"""
storage = get_storage()
project_path: Path = get_project_path()
try:
artifact = storage.get_artifact(project_path, path)
if artifact is None:
return {
"success": False,
"path": path,
"error": "Artifact not found",
}
return {
"success": True,
"artifact": artifact,
}
except Exception as exception:
message: str = f"Failed to get artifact: {exception}"
raise ToolError(message) from exception
@mcp.tool
async def list_skills() -> dict[str, Any]:
"""List available skill packs.
Skill packs provide domain-specific pipeline guidance for AI agents.
They describe analysis methodologies and list the hub servers needed.
Project-local skills (.fuzzforge/skills/) override built-in skills.
:return: List of available skills with name, summary, and server list.
"""
storage = get_storage()
project_path: Path = get_project_path()
try:
skills = storage.list_skills(project_path)
active = get_active_skill()
return {
"success": True,
"skills": skills,
"count": len(skills),
"active_skill": active["name"] if active else None,
}
except Exception as exception:
message: str = f"Failed to list skills: {exception}"
raise ToolError(message) from exception
@mcp.tool
async def load_skill(name: str) -> dict[str, Any]:
"""Load a skill pack to guide the current analysis session.
A skill pack provides a pipeline description and specifies which
hub servers are relevant. Once loaded, the pipeline description
is available as context and only the listed servers need discovery.
:param name: Skill name (e.g. "firmware-analysis").
:return: Loaded skill with full description and server list.
"""
storage = get_storage()
project_path: Path = get_project_path()
try:
skill = storage.load_skill(project_path, name)
if skill is None:
return {
"success": False,
"name": name,
"error": f"Skill '{name}' not found. Use list_skills to see available skills.",
}
set_active_skill(skill)
return {
"success": True,
"name": skill["name"],
"description": skill["description"],
"servers": skill["servers"],
"message": f"Skill '{name}' loaded. Follow the pipeline description above. "
f"Discover tools from: {', '.join(skill['servers'])}" if skill["servers"] else f"Skill '{name}' loaded.",
}
except Exception as exception:
message: str = f"Failed to load skill: {exception}"
raise ToolError(message) from exception
@mcp.tool
async def unload_skill() -> dict[str, Any]:
"""Unload the currently active skill pack.
Clears the active pipeline guidance. The agent returns to generic mode.
:return: Confirmation of unload.
"""
active = get_active_skill()
if active is None:
return {
"success": True,
"message": "No skill was loaded.",
}
name = active["name"]
set_active_skill(None)
return {
"success": True,
"message": f"Skill '{name}' unloaded.",
}

View File

@@ -1 +1,566 @@
{"servers": []} {
"servers": [
{
"name": "bloodhound-mcp",
"description": "bloodhound-mcp \u2014 active-directory",
"type": "docker",
"image": "bloodhound-mcp:latest",
"category": "active-directory",
"capabilities": [],
"volumes": [
"/home/afredefon/.fuzzforge/hub/workspace:/data"
],
"enabled": true,
"source_hub": "mcp-security-hub"
},
{
"name": "binwalk-mcp",
"description": "binwalk-mcp \u2014 binary-analysis",
"type": "docker",
"image": "binwalk-mcp:latest",
"category": "binary-analysis",
"capabilities": [],
"volumes": [
"/home/afredefon/.fuzzforge/hub/workspace:/data"
],
"enabled": true,
"source_hub": "mcp-security-hub"
},
{
"name": "capa-mcp",
"description": "capa-mcp \u2014 binary-analysis",
"type": "docker",
"image": "capa-mcp:latest",
"category": "binary-analysis",
"capabilities": [],
"volumes": [
"/home/afredefon/.fuzzforge/hub/workspace:/data"
],
"enabled": true,
"source_hub": "mcp-security-hub"
},
{
"name": "ghidra-mcp",
"description": "ghidra-mcp \u2014 binary-analysis",
"type": "docker",
"image": "ghidra-mcp:latest",
"category": "binary-analysis",
"capabilities": [],
"volumes": [
"/home/afredefon/.fuzzforge/hub/workspace:/data"
],
"enabled": true,
"source_hub": "mcp-security-hub"
},
{
"name": "ida-mcp",
"description": "ida-mcp \u2014 binary-analysis",
"type": "docker",
"image": "ida-mcp:latest",
"category": "binary-analysis",
"capabilities": [],
"volumes": [
"/home/afredefon/.fuzzforge/hub/workspace:/data"
],
"enabled": true,
"source_hub": "mcp-security-hub"
},
{
"name": "radare2-mcp",
"description": "radare2-mcp \u2014 binary-analysis",
"type": "docker",
"image": "radare2-mcp:latest",
"category": "binary-analysis",
"capabilities": [],
"volumes": [
"/home/afredefon/.fuzzforge/hub/workspace:/data"
],
"enabled": true,
"source_hub": "mcp-security-hub"
},
{
"name": "yara-mcp",
"description": "yara-mcp \u2014 binary-analysis",
"type": "docker",
"image": "yara-mcp:latest",
"category": "binary-analysis",
"capabilities": [],
"volumes": [
"/home/afredefon/.fuzzforge/hub/workspace:/data"
],
"enabled": true,
"source_hub": "mcp-security-hub"
},
{
"name": "daml-viewer-mcp",
"description": "daml-viewer-mcp \u2014 blockchain",
"type": "docker",
"image": "daml-viewer-mcp:latest",
"category": "blockchain",
"capabilities": [],
"volumes": [
"/home/afredefon/.fuzzforge/hub/workspace:/data"
],
"enabled": true,
"source_hub": "mcp-security-hub"
},
{
"name": "medusa-mcp",
"description": "medusa-mcp \u2014 blockchain",
"type": "docker",
"image": "medusa-mcp:latest",
"category": "blockchain",
"capabilities": [],
"volumes": [
"/home/afredefon/.fuzzforge/hub/workspace:/data"
],
"enabled": true,
"source_hub": "mcp-security-hub"
},
{
"name": "solazy-mcp",
"description": "solazy-mcp \u2014 blockchain",
"type": "docker",
"image": "solazy-mcp:latest",
"category": "blockchain",
"capabilities": [],
"volumes": [
"/home/afredefon/.fuzzforge/hub/workspace:/data"
],
"enabled": true,
"source_hub": "mcp-security-hub"
},
{
"name": "prowler-mcp",
"description": "prowler-mcp \u2014 cloud-security",
"type": "docker",
"image": "prowler-mcp:latest",
"category": "cloud-security",
"capabilities": [],
"volumes": [
"/home/afredefon/.fuzzforge/hub/workspace:/data"
],
"enabled": true,
"source_hub": "mcp-security-hub"
},
{
"name": "roadrecon-mcp",
"description": "roadrecon-mcp \u2014 cloud-security",
"type": "docker",
"image": "roadrecon-mcp:latest",
"category": "cloud-security",
"capabilities": [],
"volumes": [
"/home/afredefon/.fuzzforge/hub/workspace:/data"
],
"enabled": true,
"source_hub": "mcp-security-hub"
},
{
"name": "trivy-mcp",
"description": "trivy-mcp \u2014 cloud-security",
"type": "docker",
"image": "trivy-mcp:latest",
"category": "cloud-security",
"capabilities": [],
"volumes": [
"/home/afredefon/.fuzzforge/hub/workspace:/data"
],
"enabled": true,
"source_hub": "mcp-security-hub"
},
{
"name": "semgrep-mcp",
"description": "semgrep-mcp \u2014 code-security",
"type": "docker",
"image": "semgrep-mcp:latest",
"category": "code-security",
"capabilities": [],
"volumes": [
"/home/afredefon/.fuzzforge/hub/workspace:/data"
],
"enabled": true,
"source_hub": "mcp-security-hub"
},
{
"name": "searchsploit-mcp",
"description": "searchsploit-mcp \u2014 exploitation",
"type": "docker",
"image": "searchsploit-mcp:latest",
"category": "exploitation",
"capabilities": [],
"volumes": [
"/home/afredefon/.fuzzforge/hub/workspace:/data"
],
"enabled": true,
"source_hub": "mcp-security-hub"
},
{
"name": "boofuzz-mcp",
"description": "boofuzz-mcp \u2014 fuzzing",
"type": "docker",
"image": "boofuzz-mcp:latest",
"category": "fuzzing",
"capabilities": [],
"volumes": [
"/home/afredefon/.fuzzforge/hub/workspace:/data"
],
"enabled": true,
"source_hub": "mcp-security-hub"
},
{
"name": "dharma-mcp",
"description": "dharma-mcp \u2014 fuzzing",
"type": "docker",
"image": "dharma-mcp:latest",
"category": "fuzzing",
"capabilities": [],
"volumes": [
"/home/afredefon/.fuzzforge/hub/workspace:/data"
],
"enabled": true,
"source_hub": "mcp-security-hub"
},
{
"name": "dnstwist-mcp",
"description": "dnstwist-mcp \u2014 osint",
"type": "docker",
"image": "dnstwist-mcp:latest",
"category": "osint",
"capabilities": [],
"volumes": [
"/home/afredefon/.fuzzforge/hub/workspace:/data"
],
"enabled": true,
"source_hub": "mcp-security-hub"
},
{
"name": "maigret-mcp",
"description": "maigret-mcp \u2014 osint",
"type": "docker",
"image": "maigret-mcp:latest",
"category": "osint",
"capabilities": [],
"volumes": [
"/home/afredefon/.fuzzforge/hub/workspace:/data"
],
"enabled": true,
"source_hub": "mcp-security-hub"
},
{
"name": "hashcat-mcp",
"description": "hashcat-mcp \u2014 password-cracking",
"type": "docker",
"image": "hashcat-mcp:latest",
"category": "password-cracking",
"capabilities": [],
"volumes": [
"/home/afredefon/.fuzzforge/hub/workspace:/data"
],
"enabled": true,
"source_hub": "mcp-security-hub"
},
{
"name": "externalattacker-mcp",
"description": "externalattacker-mcp \u2014 reconnaissance",
"type": "docker",
"image": "externalattacker-mcp:latest",
"category": "reconnaissance",
"capabilities": [
"NET_RAW"
],
"volumes": [
"/home/afredefon/.fuzzforge/hub/workspace:/data"
],
"enabled": true,
"source_hub": "mcp-security-hub"
},
{
"name": "masscan-mcp",
"description": "masscan-mcp \u2014 reconnaissance",
"type": "docker",
"image": "masscan-mcp:latest",
"category": "reconnaissance",
"capabilities": [
"NET_RAW"
],
"volumes": [
"/home/afredefon/.fuzzforge/hub/workspace:/data"
],
"enabled": true,
"source_hub": "mcp-security-hub"
},
{
"name": "networksdb-mcp",
"description": "networksdb-mcp \u2014 reconnaissance",
"type": "docker",
"image": "networksdb-mcp:latest",
"category": "reconnaissance",
"capabilities": [
"NET_RAW"
],
"volumes": [
"/home/afredefon/.fuzzforge/hub/workspace:/data"
],
"enabled": true,
"source_hub": "mcp-security-hub"
},
{
"name": "nmap-mcp",
"description": "nmap-mcp \u2014 reconnaissance",
"type": "docker",
"image": "nmap-mcp:latest",
"category": "reconnaissance",
"capabilities": [
"NET_RAW"
],
"volumes": [
"/home/afredefon/.fuzzforge/hub/workspace:/data"
],
"enabled": true,
"source_hub": "mcp-security-hub"
},
{
"name": "pd-tools-mcp",
"description": "pd-tools-mcp \u2014 reconnaissance",
"type": "docker",
"image": "pd-tools-mcp:latest",
"category": "reconnaissance",
"capabilities": [
"NET_RAW"
],
"volumes": [
"/home/afredefon/.fuzzforge/hub/workspace:/data"
],
"enabled": true,
"source_hub": "mcp-security-hub"
},
{
"name": "shodan-mcp",
"description": "shodan-mcp \u2014 reconnaissance",
"type": "docker",
"image": "shodan-mcp:latest",
"category": "reconnaissance",
"capabilities": [
"NET_RAW"
],
"volumes": [
"/home/afredefon/.fuzzforge/hub/workspace:/data"
],
"enabled": true,
"source_hub": "mcp-security-hub"
},
{
"name": "whatweb-mcp",
"description": "whatweb-mcp \u2014 reconnaissance",
"type": "docker",
"image": "whatweb-mcp:latest",
"category": "reconnaissance",
"capabilities": [
"NET_RAW"
],
"volumes": [
"/home/afredefon/.fuzzforge/hub/workspace:/data"
],
"enabled": true,
"source_hub": "mcp-security-hub"
},
{
"name": "zoomeye-mcp",
"description": "zoomeye-mcp \u2014 reconnaissance",
"type": "docker",
"image": "zoomeye-mcp:latest",
"category": "reconnaissance",
"capabilities": [
"NET_RAW"
],
"volumes": [
"/home/afredefon/.fuzzforge/hub/workspace:/data"
],
"enabled": true,
"source_hub": "mcp-security-hub"
},
{
"name": "gitleaks-mcp",
"description": "gitleaks-mcp \u2014 secrets",
"type": "docker",
"image": "gitleaks-mcp:latest",
"category": "secrets",
"capabilities": [],
"volumes": [
"/home/afredefon/.fuzzforge/hub/workspace:/data"
],
"enabled": true,
"source_hub": "mcp-security-hub"
},
{
"name": "otx-mcp",
"description": "otx-mcp \u2014 threat-intel",
"type": "docker",
"image": "otx-mcp:latest",
"category": "threat-intel",
"capabilities": [],
"volumes": [
"/home/afredefon/.fuzzforge/hub/workspace:/data"
],
"enabled": true,
"source_hub": "mcp-security-hub"
},
{
"name": "virustotal-mcp",
"description": "virustotal-mcp \u2014 threat-intel",
"type": "docker",
"image": "virustotal-mcp:latest",
"category": "threat-intel",
"capabilities": [],
"volumes": [
"/home/afredefon/.fuzzforge/hub/workspace:/data"
],
"enabled": true,
"source_hub": "mcp-security-hub"
},
{
"name": "burp-mcp",
"description": "burp-mcp \u2014 web-security",
"type": "docker",
"image": "burp-mcp:latest",
"category": "web-security",
"capabilities": [
"NET_RAW"
],
"volumes": [
"/home/afredefon/.fuzzforge/hub/workspace:/data"
],
"enabled": true,
"source_hub": "mcp-security-hub"
},
{
"name": "ffuf-mcp",
"description": "ffuf-mcp \u2014 web-security",
"type": "docker",
"image": "ffuf-mcp:latest",
"category": "web-security",
"capabilities": [
"NET_RAW"
],
"volumes": [
"/home/afredefon/.fuzzforge/hub/workspace:/data"
],
"enabled": true,
"source_hub": "mcp-security-hub"
},
{
"name": "nikto-mcp",
"description": "nikto-mcp \u2014 web-security",
"type": "docker",
"image": "nikto-mcp:latest",
"category": "web-security",
"capabilities": [
"NET_RAW"
],
"volumes": [
"/home/afredefon/.fuzzforge/hub/workspace:/data"
],
"enabled": true,
"source_hub": "mcp-security-hub"
},
{
"name": "nuclei-mcp",
"description": "nuclei-mcp \u2014 web-security",
"type": "docker",
"image": "nuclei-mcp:latest",
"category": "web-security",
"capabilities": [
"NET_RAW"
],
"volumes": [
"/home/afredefon/.fuzzforge/hub/workspace:/data"
],
"enabled": true,
"source_hub": "mcp-security-hub"
},
{
"name": "sqlmap-mcp",
"description": "sqlmap-mcp \u2014 web-security",
"type": "docker",
"image": "sqlmap-mcp:latest",
"category": "web-security",
"capabilities": [
"NET_RAW"
],
"volumes": [
"/home/afredefon/.fuzzforge/hub/workspace:/data"
],
"enabled": true,
"source_hub": "mcp-security-hub"
},
{
"name": "waybackurls-mcp",
"description": "waybackurls-mcp \u2014 web-security",
"type": "docker",
"image": "waybackurls-mcp:latest",
"category": "web-security",
"capabilities": [
"NET_RAW"
],
"volumes": [
"/home/afredefon/.fuzzforge/hub/workspace:/data"
],
"enabled": true,
"source_hub": "mcp-security-hub"
},
{
"name": "go-analyzer-mcp",
"description": "Go static analysis: fuzzable entry points, existing Fuzz* targets, unsafe/cgo usage, CVE scanning via govulncheck",
"type": "docker",
"image": "go-analyzer-mcp:latest",
"category": "code-security",
"capabilities": [],
"volumes": [
"/home/afredefon/.fuzzforge/hub/workspace:/data"
],
"enabled": true,
"source_hub": "mcp-security-hub"
},
{
"name": "go-harness-tester-mcp",
"description": "Test Go fuzz harness quality: compilation, seed execution, fuzzing trial, quality scoring 0-100",
"type": "docker",
"image": "go-harness-tester-mcp:latest",
"category": "code-security",
"capabilities": [],
"volumes": [
"/home/afredefon/.fuzzforge/hub/workspace:/data"
],
"enabled": true,
"source_hub": "mcp-security-hub"
},
{
"name": "go-fuzzer-mcp",
"description": "Run Go native fuzzing (go test -fuzz) with blocking and continuous modes, crash collection, session management",
"type": "docker",
"image": "go-fuzzer-mcp:latest",
"category": "code-security",
"capabilities": [],
"volumes": [
"/home/afredefon/.fuzzforge/hub/workspace:/data"
],
"enabled": true,
"source_hub": "mcp-security-hub"
},
{
"name": "go-crash-analyzer-mcp",
"description": "Analyze Go fuzzing crashes: reproduce, classify (nil-deref, OOR, panic, race, etc.), deduplicate by stack signature",
"type": "docker",
"image": "go-crash-analyzer-mcp:latest",
"category": "code-security",
"capabilities": [],
"volumes": [
"/home/afredefon/.fuzzforge/hub/workspace:/data"
],
"enabled": true,
"source_hub": "mcp-security-hub"
}
],
"workflow_hints_file": "mcp-security-hub/workflow-hints.json"
}

View File

@@ -1,6 +1,6 @@
[project] [project]
name = "fuzzforge-oss" name = "fuzzforge-oss"
version = "1.0.0" version = "0.8.0"
description = "FuzzForge AI - AI-driven security research platform for local execution" description = "FuzzForge AI - AI-driven security research platform for local execution"
readme = "README.md" readme = "README.md"
requires-python = ">=3.14" requires-python = ">=3.14"

4
uv.lock generated
View File

@@ -432,6 +432,7 @@ dependencies = [
{ name = "fuzzforge-common" }, { name = "fuzzforge-common" },
{ name = "pydantic" }, { name = "pydantic" },
{ name = "pydantic-settings" }, { name = "pydantic-settings" },
{ name = "pyyaml" },
{ name = "structlog" }, { name = "structlog" },
] ]
@@ -460,6 +461,7 @@ requires-dist = [
{ name = "pytest", marker = "extra == 'tests'", specifier = "==9.0.2" }, { name = "pytest", marker = "extra == 'tests'", specifier = "==9.0.2" },
{ name = "pytest-asyncio", marker = "extra == 'tests'", specifier = "==1.3.0" }, { name = "pytest-asyncio", marker = "extra == 'tests'", specifier = "==1.3.0" },
{ name = "pytest-httpx", marker = "extra == 'tests'", specifier = "==0.36.0" }, { name = "pytest-httpx", marker = "extra == 'tests'", specifier = "==0.36.0" },
{ name = "pyyaml", specifier = ">=6.0" },
{ name = "ruff", marker = "extra == 'lints'", specifier = "==0.14.4" }, { name = "ruff", marker = "extra == 'lints'", specifier = "==0.14.4" },
{ name = "structlog", specifier = "==25.5.0" }, { name = "structlog", specifier = "==25.5.0" },
] ]
@@ -467,7 +469,7 @@ provides-extras = ["lints", "tests"]
[[package]] [[package]]
name = "fuzzforge-oss" name = "fuzzforge-oss"
version = "1.0.0" version = "0.8.0"
source = { virtual = "." } source = { virtual = "." }
dependencies = [ dependencies = [
{ name = "fuzzforge-cli" }, { name = "fuzzforge-cli" },