mirror of
https://github.com/FuzzingLabs/fuzzforge_ai.git
synced 2026-04-10 00:22:12 +02:00
Compare commits
1 Commits
feat/artif
...
feat/skill
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
d3a20b3846 |
@@ -10,6 +10,7 @@ dependencies = [
|
||||
"fuzzforge-common==0.0.1",
|
||||
"pydantic==2.12.4",
|
||||
"pydantic-settings==2.12.0",
|
||||
"pyyaml>=6.0",
|
||||
"structlog==25.5.0",
|
||||
]
|
||||
|
||||
|
||||
@@ -53,40 +53,30 @@ Typical workflow:
|
||||
4. Discover tools from servers with `discover_hub_tools`
|
||||
5. Execute hub tools with `execute_hub_tool`
|
||||
|
||||
Skill packs:
|
||||
Use `list_skills` to see available analysis pipelines (e.g. firmware-analysis).
|
||||
Load one with `load_skill("firmware-analysis")` to get domain-specific guidance
|
||||
and a scoped list of relevant hub servers. Skill packs describe the methodology —
|
||||
follow the pipeline steps while adapting to what you find at each stage.
|
||||
|
||||
Agent context convention:
|
||||
When you call `discover_hub_tools`, some servers return an `agent_context` field
|
||||
with usage tips, known issues, rule templates, and workflow guidance. Always read
|
||||
this context before using the server's tools.
|
||||
|
||||
Artifact tracking:
|
||||
After each `execute_hub_tool` call, new output files are automatically tracked.
|
||||
Use `list_artifacts` to find files produced by previous tools instead of parsing
|
||||
paths from tool output text. Filter by source server or file type.
|
||||
|
||||
File access in containers:
|
||||
- Assets set via `set_project_assets` are mounted read-only at `/app/uploads/` and `/app/samples/`
|
||||
- A writable output directory is mounted at `/app/output/` — use it for extraction results, reports, etc.
|
||||
- Always use container paths (e.g. `/app/uploads/file`) when passing file arguments to hub tools
|
||||
|
||||
Stateful tools:
|
||||
- Some tools (e.g. radare2-mcp) require multi-step sessions. Use `start_hub_server` to launch
|
||||
- Some tools require multi-step sessions. Use `start_hub_server` to launch
|
||||
a persistent container, then `execute_hub_tool` calls reuse that container. Stop with `stop_hub_server`.
|
||||
|
||||
Firmware analysis pipeline (when analyzing firmware images):
|
||||
1. **binwalk-mcp** (`binwalk_scan` + `binwalk_extract`) — identify and extract filesystem from firmware
|
||||
2. **yara-mcp** (`yara_scan_with_rules`) — scan extracted files with vulnerability rules to prioritize targets
|
||||
3. **radare2-mcp** (persistent session) — confirm dangerous code paths
|
||||
4. **searchsploit-mcp** (`search_exploitdb`) — query version strings from radare2 against ExploitDB
|
||||
Run steps 3 and 4 outputs feed into a final triage summary.
|
||||
|
||||
radare2-mcp agent context (upstream tool — no embedded context):
|
||||
- Start a persistent session with `start_hub_server("radare2-mcp")` before any calls.
|
||||
- IMPORTANT: the `open_file` tool requires the parameter name `file_path` (with underscore),
|
||||
not `filepath`. Example: `execute_hub_tool("hub:radare2-mcp:open_file", {"file_path": "/app/output/..."})`
|
||||
- Workflow: `open_file` → `analyze` → `list_imports` → `xrefs_to` → `run_command` with `pdf @ <addr>`.
|
||||
- Static binary fallback: firmware binaries are often statically linked. When `list_imports`
|
||||
returns an empty result, fall back to `list_symbols` and search for dangerous function names
|
||||
(system, strcpy, gets, popen, sprintf) in the output. Then use `xrefs_to` on their addresses.
|
||||
- For string extraction, use `run_command` with `iz` (data section strings).
|
||||
The `list_all_strings` tool may return garbled output for large binaries.
|
||||
- For decompilation, use `run_command` with `pdc @ <addr>` (pseudo-C) or `pdf @ <addr>`
|
||||
(annotated disassembly). The `decompile` tool may fail with "not available in current mode".
|
||||
- Stop the session with `stop_hub_server("radare2-mcp")` when done.
|
||||
""",
|
||||
lifespan=lifespan,
|
||||
)
|
||||
|
||||
@@ -3,7 +3,7 @@
|
||||
from __future__ import annotations
|
||||
|
||||
from pathlib import Path
|
||||
from typing import TYPE_CHECKING, cast
|
||||
from typing import TYPE_CHECKING, Any, cast
|
||||
|
||||
from fastmcp.server.dependencies import get_context
|
||||
|
||||
@@ -21,6 +21,9 @@ _current_project_path: Path | None = None
|
||||
# Singleton storage instance
|
||||
_storage: LocalStorage | None = None
|
||||
|
||||
# Currently loaded skill pack (set by load_skill)
|
||||
_active_skill: dict[str, Any] | None = None
|
||||
|
||||
|
||||
def set_current_project_path(project_path: Path) -> None:
|
||||
"""Set the current project path.
|
||||
@@ -75,3 +78,22 @@ def get_storage() -> LocalStorage:
|
||||
settings = get_settings()
|
||||
_storage = LocalStorage(settings.storage.path)
|
||||
return _storage
|
||||
|
||||
|
||||
def set_active_skill(skill: dict[str, Any] | None) -> None:
|
||||
"""Set (or clear) the currently loaded skill pack.
|
||||
|
||||
:param skill: Parsed skill dict, or None to unload.
|
||||
|
||||
"""
|
||||
global _active_skill
|
||||
_active_skill = skill
|
||||
|
||||
|
||||
def get_active_skill() -> dict[str, Any] | None:
|
||||
"""Get the currently loaded skill pack.
|
||||
|
||||
:return: Active skill dict, or None if no skill is loaded.
|
||||
|
||||
"""
|
||||
return _active_skill
|
||||
|
||||
@@ -0,0 +1,44 @@
|
||||
name: firmware-analysis
|
||||
description: |
|
||||
## Firmware Binary Vulnerability Analysis
|
||||
|
||||
Goal: Find exploitable vulnerabilities in firmware images.
|
||||
|
||||
### Pipeline
|
||||
|
||||
1. **Extract the filesystem** from the firmware image.
|
||||
Look for SquashFS, JFFS2, CPIO, or other embedded filesystems.
|
||||
|
||||
2. **Scan extracted files for vulnerability patterns.**
|
||||
Use vulnerability-focused rules to identify binaries with dangerous
|
||||
function calls (system, strcpy, popen, sprintf, gets).
|
||||
Prioritize targets by match count — the binary with the most hits
|
||||
is the highest-priority target.
|
||||
|
||||
3. **Deep-analyze the highest-priority binary.**
|
||||
Open a persistent analysis session. Look for:
|
||||
- Dangerous function calls with unsanitized input
|
||||
- Hardcoded credentials or backdoor strings
|
||||
- Network service listeners with weak input validation
|
||||
Focus on confirming whether flagged patterns are actually reachable.
|
||||
|
||||
4. **Search for known CVEs** matching library version strings found
|
||||
during analysis. Cross-reference with public exploit databases.
|
||||
|
||||
5. **Compile findings** with severity ratings:
|
||||
- CRITICAL: confirmed remote code execution paths
|
||||
- HIGH: command injection or buffer overflow with reachable input
|
||||
- MEDIUM: hardcoded credentials, weak crypto, format string issues
|
||||
- LOW: informational findings (library versions, service fingerprints)
|
||||
|
||||
### Key files to prioritize in extracted firmware
|
||||
- `usr/sbin/httpd`, `usr/bin/httpd` — web servers (high-priority)
|
||||
- `etc/shadow`, `etc/passwd` — credential files
|
||||
- `www/cgi-bin/*` — CGI scripts (command injection vectors)
|
||||
- Custom binaries in `usr/sbin/`, `usr/bin/` — vendor attack surface
|
||||
|
||||
servers:
|
||||
- binwalk-mcp
|
||||
- yara-mcp
|
||||
- radare2-mcp
|
||||
- searchsploit-mcp
|
||||
90
fuzzforge-mcp/src/fuzzforge_mcp/skills/go-fuzzing.yaml
Normal file
90
fuzzforge-mcp/src/fuzzforge_mcp/skills/go-fuzzing.yaml
Normal file
@@ -0,0 +1,90 @@
|
||||
name: go-fuzzing
|
||||
description: |
|
||||
## Go Fuzzing Vulnerability Discovery
|
||||
|
||||
Goal: Find memory safety bugs, panics, and logic errors in a Go project
|
||||
using native Go fuzzing (go test -fuzz).
|
||||
|
||||
### Pipeline
|
||||
|
||||
1. **Analyze the Go project** to understand its attack surface.
|
||||
Use `go_analyze` to scan the codebase and identify:
|
||||
- Fuzzable entry points: functions accepting `[]byte`, `string`,
|
||||
`io.Reader`, or other parser-like signatures (`Parse*`, `Decode*`,
|
||||
`Unmarshal*`, `Read*`, `Open*`)
|
||||
- Existing `Fuzz*` test functions already in `*_test.go` files
|
||||
- Unsafe/cgo usage that increases the severity of any bugs found
|
||||
- Known CVEs via govulncheck (enable with `run_vulncheck: true`)
|
||||
|
||||
If there are **no existing Fuzz targets**, stop here and report
|
||||
that the project needs fuzz harnesses written first, listing the
|
||||
recommended entry points from the analysis.
|
||||
|
||||
2. **Test harness quality** before committing to a long fuzzing campaign.
|
||||
Use `go_harness_test` to evaluate each Fuzz* function:
|
||||
- Compilation check — does `go test -c` succeed?
|
||||
- Seed execution — do the seed corpus entries pass without panics?
|
||||
- Short fuzzing trial — does the harness sustain fuzzing for 15-30s?
|
||||
- Quality score (0-100): ≥80 = production-ready, ≥50 = needs work, <50 = broken
|
||||
|
||||
**Decision point:**
|
||||
- If all harnesses are **broken** (score < 50): stop and report issues.
|
||||
The user needs to fix them before fuzzing is useful.
|
||||
- If some are **production-ready** or **needs-improvement** (score ≥ 50):
|
||||
proceed with those targets to step 3.
|
||||
- Skip broken harnesses — do not waste fuzzing time on them.
|
||||
|
||||
3. **Run fuzzing** on the viable targets.
|
||||
Use `go_fuzz_run` for a bounded campaign:
|
||||
- Set `duration` based on project size: 60-120s for quick scan,
|
||||
300-600s for thorough analysis.
|
||||
- Pass only the targets that scored ≥ 50 in step 2 via the `targets`
|
||||
parameter — do not fuzz broken harnesses.
|
||||
- The fuzzer collects crash inputs to `/app/output/crashes/{FuzzName}/`.
|
||||
|
||||
**Alternative — continuous mode** for deeper exploration:
|
||||
- Use `go_fuzz_start` to begin background fuzzing.
|
||||
- Periodically check `go_fuzz_status` to monitor progress.
|
||||
- Use `go_fuzz_stop` when satisfied or when crashes are found.
|
||||
|
||||
If **no crashes** are found after a reasonable duration, report that
|
||||
the fuzzing campaign completed cleanly with the execution metrics.
|
||||
|
||||
4. **Analyze crashes** found during fuzzing.
|
||||
Use `go_crash_analyze` to process the crash inputs:
|
||||
- Reproduction: re-run each crash input to confirm it's real
|
||||
- Classification: categorize by type (nil-dereference, index-out-of-range,
|
||||
slice-bounds, divide-by-zero, stack-overflow, data-race, panic, etc.)
|
||||
- Severity assignment: critical / high / medium / low
|
||||
- Deduplication: group crashes by signature (target + type + top 3 frames)
|
||||
|
||||
Skip this step if no crashes were found in step 3.
|
||||
|
||||
5. **Compile the vulnerability report** with findings organized by severity:
|
||||
- **CRITICAL**: nil-dereference, segfault, data-race, stack-overflow
|
||||
- **HIGH**: index/slice out of bounds, allocation overflow
|
||||
- **MEDIUM**: integer overflow, divide by zero, explicit panics
|
||||
- **LOW**: timeout, unclassified crashes
|
||||
|
||||
For each unique crash, include:
|
||||
- The fuzz target that triggered it
|
||||
- The crash type and root cause function + file + line
|
||||
- Whether it was reproducible
|
||||
- The crash input file path for manual investigation
|
||||
|
||||
### What the user's project needs
|
||||
- A `go.mod` file (any Go module)
|
||||
- At least one `*_test.go` file with `func FuzzXxx(f *testing.F)` functions
|
||||
- Seed corpus entries added via `f.Add(...)` in the Fuzz functions
|
||||
|
||||
### Interpretation guide
|
||||
- **govulncheck CVEs** (step 1) are known dependency vulnerabilities — report separately
|
||||
- **Fuzzer crashes** (steps 3-4) are new bugs found by fuzzing the project's own code
|
||||
- High execution counts with zero crashes = good sign (code is robust to that input space)
|
||||
- Low quality scores in step 2 usually mean the harness needs better seed corpus or input handling
|
||||
|
||||
servers:
|
||||
- go-analyzer-mcp
|
||||
- go-harness-tester-mcp
|
||||
- go-fuzzer-mcp
|
||||
- go-crash-analyzer-mcp
|
||||
@@ -20,6 +20,8 @@ from tarfile import open as Archive # noqa: N812
|
||||
from typing import Any
|
||||
from uuid import uuid4
|
||||
|
||||
import yaml
|
||||
|
||||
logger = logging.getLogger("fuzzforge-mcp")
|
||||
|
||||
#: Name of the FuzzForge storage directory within projects.
|
||||
@@ -475,3 +477,101 @@ class LocalStorage:
|
||||
if artifact["path"] == path:
|
||||
return artifact
|
||||
return None
|
||||
|
||||
# ------------------------------------------------------------------
|
||||
# Skill packs
|
||||
# ------------------------------------------------------------------
|
||||
|
||||
#: Directory containing built-in skill packs shipped with FuzzForge.
|
||||
_BUILTIN_SKILLS_DIR: Path = Path(__file__).parent / "skills"
|
||||
|
||||
def _skill_dirs(self, project_path: Path) -> list[Path]:
|
||||
"""Return skill directories in priority order (project-local first).
|
||||
|
||||
:param project_path: Path to the project directory.
|
||||
:returns: List of directories that may contain skill YAML files.
|
||||
|
||||
"""
|
||||
dirs: list[Path] = []
|
||||
project_skills = self._get_project_path(project_path) / "skills"
|
||||
if project_skills.is_dir():
|
||||
dirs.append(project_skills)
|
||||
if self._BUILTIN_SKILLS_DIR.is_dir():
|
||||
dirs.append(self._BUILTIN_SKILLS_DIR)
|
||||
return dirs
|
||||
|
||||
def list_skills(self, project_path: Path) -> list[dict[str, Any]]:
|
||||
"""List available skill packs from project and built-in directories.
|
||||
|
||||
:param project_path: Path to the project directory.
|
||||
:returns: List of skill summaries (name, description first line, source).
|
||||
|
||||
"""
|
||||
seen: set[str] = set()
|
||||
skills: list[dict[str, Any]] = []
|
||||
|
||||
for skill_dir in self._skill_dirs(project_path):
|
||||
for yaml_path in sorted(skill_dir.glob("*.yaml")):
|
||||
skill = self._parse_skill_file(yaml_path)
|
||||
if skill is None:
|
||||
continue
|
||||
name = skill["name"]
|
||||
if name in seen:
|
||||
continue # project-local overrides built-in
|
||||
seen.add(name)
|
||||
desc = skill.get("description", "")
|
||||
first_line = desc.strip().split("\n", 1)[0] if desc else ""
|
||||
is_project = ".fuzzforge" in str(yaml_path.parent)
|
||||
source = "project" if is_project else "builtin"
|
||||
skills.append({
|
||||
"name": name,
|
||||
"summary": first_line,
|
||||
"source": source,
|
||||
"servers": skill.get("servers", []),
|
||||
})
|
||||
|
||||
return skills
|
||||
|
||||
def load_skill(self, project_path: Path, name: str) -> dict[str, Any] | None:
|
||||
"""Load a skill pack by name.
|
||||
|
||||
Searches project-local skills first, then built-in skills.
|
||||
|
||||
:param project_path: Path to the project directory.
|
||||
:param name: Skill name (filename without .yaml extension).
|
||||
:returns: Parsed skill dict with name, description, servers — or None.
|
||||
|
||||
"""
|
||||
for skill_dir in self._skill_dirs(project_path):
|
||||
yaml_path = skill_dir / f"{name}.yaml"
|
||||
if yaml_path.is_file():
|
||||
return self._parse_skill_file(yaml_path)
|
||||
return None
|
||||
|
||||
@staticmethod
|
||||
def _parse_skill_file(yaml_path: Path) -> dict[str, Any] | None:
|
||||
"""Parse and validate a skill YAML file.
|
||||
|
||||
:param yaml_path: Path to the YAML file.
|
||||
:returns: Parsed skill dict, or None if invalid.
|
||||
|
||||
"""
|
||||
try:
|
||||
data = yaml.safe_load(yaml_path.read_text())
|
||||
except (yaml.YAMLError, OSError):
|
||||
logger.warning("Failed to parse skill file: %s", yaml_path)
|
||||
return None
|
||||
|
||||
if not isinstance(data, dict):
|
||||
return None
|
||||
|
||||
name = data.get("name")
|
||||
if not name or not isinstance(name, str):
|
||||
logger.warning("Skill file missing 'name': %s", yaml_path)
|
||||
return None
|
||||
|
||||
return {
|
||||
"name": name,
|
||||
"description": data.get("description", ""),
|
||||
"servers": data.get("servers", []),
|
||||
}
|
||||
|
||||
@@ -8,7 +8,13 @@ from typing import Any
|
||||
from fastmcp import FastMCP
|
||||
from fastmcp.exceptions import ToolError
|
||||
|
||||
from fuzzforge_mcp.dependencies import get_project_path, get_storage, set_current_project_path
|
||||
from fuzzforge_mcp.dependencies import (
|
||||
get_active_skill,
|
||||
get_project_path,
|
||||
get_storage,
|
||||
set_active_skill,
|
||||
set_current_project_path,
|
||||
)
|
||||
|
||||
mcp: FastMCP = FastMCP()
|
||||
|
||||
@@ -213,3 +219,99 @@ async def get_artifact(path: str) -> dict[str, Any]:
|
||||
except Exception as exception:
|
||||
message: str = f"Failed to get artifact: {exception}"
|
||||
raise ToolError(message) from exception
|
||||
|
||||
|
||||
@mcp.tool
|
||||
async def list_skills() -> dict[str, Any]:
|
||||
"""List available skill packs.
|
||||
|
||||
Skill packs provide domain-specific pipeline guidance for AI agents.
|
||||
They describe analysis methodologies and list the hub servers needed.
|
||||
Project-local skills (.fuzzforge/skills/) override built-in skills.
|
||||
|
||||
:return: List of available skills with name, summary, and server list.
|
||||
|
||||
"""
|
||||
storage = get_storage()
|
||||
project_path: Path = get_project_path()
|
||||
|
||||
try:
|
||||
skills = storage.list_skills(project_path)
|
||||
active = get_active_skill()
|
||||
|
||||
return {
|
||||
"success": True,
|
||||
"skills": skills,
|
||||
"count": len(skills),
|
||||
"active_skill": active["name"] if active else None,
|
||||
}
|
||||
|
||||
except Exception as exception:
|
||||
message: str = f"Failed to list skills: {exception}"
|
||||
raise ToolError(message) from exception
|
||||
|
||||
|
||||
@mcp.tool
|
||||
async def load_skill(name: str) -> dict[str, Any]:
|
||||
"""Load a skill pack to guide the current analysis session.
|
||||
|
||||
A skill pack provides a pipeline description and specifies which
|
||||
hub servers are relevant. Once loaded, the pipeline description
|
||||
is available as context and only the listed servers need discovery.
|
||||
|
||||
:param name: Skill name (e.g. "firmware-analysis").
|
||||
:return: Loaded skill with full description and server list.
|
||||
|
||||
"""
|
||||
storage = get_storage()
|
||||
project_path: Path = get_project_path()
|
||||
|
||||
try:
|
||||
skill = storage.load_skill(project_path, name)
|
||||
|
||||
if skill is None:
|
||||
return {
|
||||
"success": False,
|
||||
"name": name,
|
||||
"error": f"Skill '{name}' not found. Use list_skills to see available skills.",
|
||||
}
|
||||
|
||||
set_active_skill(skill)
|
||||
|
||||
return {
|
||||
"success": True,
|
||||
"name": skill["name"],
|
||||
"description": skill["description"],
|
||||
"servers": skill["servers"],
|
||||
"message": f"Skill '{name}' loaded. Follow the pipeline description above. "
|
||||
f"Discover tools from: {', '.join(skill['servers'])}" if skill["servers"] else f"Skill '{name}' loaded.",
|
||||
}
|
||||
|
||||
except Exception as exception:
|
||||
message: str = f"Failed to load skill: {exception}"
|
||||
raise ToolError(message) from exception
|
||||
|
||||
|
||||
@mcp.tool
|
||||
async def unload_skill() -> dict[str, Any]:
|
||||
"""Unload the currently active skill pack.
|
||||
|
||||
Clears the active pipeline guidance. The agent returns to generic mode.
|
||||
|
||||
:return: Confirmation of unload.
|
||||
|
||||
"""
|
||||
active = get_active_skill()
|
||||
if active is None:
|
||||
return {
|
||||
"success": True,
|
||||
"message": "No skill was loaded.",
|
||||
}
|
||||
|
||||
name = active["name"]
|
||||
set_active_skill(None)
|
||||
|
||||
return {
|
||||
"success": True,
|
||||
"message": f"Skill '{name}' unloaded.",
|
||||
}
|
||||
|
||||
4
uv.lock
generated
4
uv.lock
generated
@@ -432,6 +432,7 @@ dependencies = [
|
||||
{ name = "fuzzforge-common" },
|
||||
{ name = "pydantic" },
|
||||
{ name = "pydantic-settings" },
|
||||
{ name = "pyyaml" },
|
||||
{ name = "structlog" },
|
||||
]
|
||||
|
||||
@@ -460,6 +461,7 @@ requires-dist = [
|
||||
{ name = "pytest", marker = "extra == 'tests'", specifier = "==9.0.2" },
|
||||
{ name = "pytest-asyncio", marker = "extra == 'tests'", specifier = "==1.3.0" },
|
||||
{ name = "pytest-httpx", marker = "extra == 'tests'", specifier = "==0.36.0" },
|
||||
{ name = "pyyaml", specifier = ">=6.0" },
|
||||
{ name = "ruff", marker = "extra == 'lints'", specifier = "==0.14.4" },
|
||||
{ name = "structlog", specifier = "==25.5.0" },
|
||||
]
|
||||
@@ -467,7 +469,7 @@ provides-extras = ["lints", "tests"]
|
||||
|
||||
[[package]]
|
||||
name = "fuzzforge-oss"
|
||||
version = "1.0.0"
|
||||
version = "0.8.0"
|
||||
source = { virtual = "." }
|
||||
dependencies = [
|
||||
{ name = "fuzzforge-cli" },
|
||||
|
||||
Reference in New Issue
Block a user