feat: FuzzForge AI - complete rewrite for OSS release

This commit is contained in:
AFredefon
2026-01-30 09:57:48 +01:00
commit b46f050aef
226 changed files with 12943 additions and 0 deletions

View File

@@ -0,0 +1,24 @@
FROM localhost/fuzzforge-modules-sdk:0.1.0
# Install system dependencies for Rust compilation
RUN apt-get update && apt-get install -y \
curl \
build-essential \
pkg-config \
libssl-dev \
&& rm -rf /var/lib/apt/lists/*
# Install Rust toolchain with nightly (required for cargo-fuzz)
RUN curl --proto '=https' --tlsv1.2 -sSf https://sh.rustup.rs | sh -s -- -y --default-toolchain nightly
ENV PATH="/root/.cargo/bin:${PATH}"
# Install cargo-fuzz
RUN cargo install cargo-fuzz --locked || true
COPY ./src /app/src
COPY ./pyproject.toml /app/pyproject.toml
# Remove workspace reference since we're using wheels
RUN sed -i '/\[tool\.uv\.sources\]/,/^$/d' /app/pyproject.toml
RUN uv sync --find-links /wheels

View File

@@ -0,0 +1,45 @@
PACKAGE=$(word 1, $(shell uv version))
VERSION=$(word 2, $(shell uv version))
PODMAN?=/usr/bin/podman
SOURCES=./src
TESTS=./tests
.PHONY: bandit build clean format mypy pytest ruff version
bandit:
uv run bandit --recursive $(SOURCES)
build:
$(PODMAN) build --file ./Dockerfile --no-cache --tag $(PACKAGE):$(VERSION)
save: build
$(PODMAN) save --format oci-archive --output /tmp/$(PACKAGE)-$(VERSION).oci $(PACKAGE):$(VERSION)
clean:
@find . -type d \( \
-name '*.egg-info' \
-o -name '.mypy_cache' \
-o -name '.pytest_cache' \
-o -name '.ruff_cache' \
-o -name '__pycache__' \
\) -printf 'removing directory %p\n' -exec rm -rf {} +
cloc:
cloc $(SOURCES)
format:
uv run ruff format $(SOURCES) $(TESTS)
mypy:
uv run mypy $(SOURCES)
pytest:
uv run pytest $(TESTS)
ruff:
uv run ruff check --fix $(SOURCES) $(TESTS)
version:
@echo '$(PACKAGE)@$(VERSION)'

View File

@@ -0,0 +1,46 @@
# FuzzForge Modules - FIXME
## Installation
### Python
```shell
# install the package (users)
uv sync
# install the package and all development dependencies (developers)
uv sync --all-extras
```
### Container
```shell
# build the image
make build
# run the container
mkdir -p "${PWD}/data" "${PWD}/data/input" "${PWD}/data/output"
echo '{"settings":{},"resources":[]}' > "${PWD}/data/input/input.json"
podman run --rm \
--volume "${PWD}/data:/data" \
'<name>:<version>' 'uv run module'
```
## Usage
```shell
uv run module
```
## Development tools
```shell
# run ruff (formatter)
make format
# run mypy (type checker)
make mypy
# run tests (pytest)
make pytest
# run ruff (linter)
make ruff
```
See the file `Makefile` at the root of this directory for more tools.

View File

@@ -0,0 +1,6 @@
[mypy]
plugins = pydantic.mypy
strict = True
warn_unused_ignores = True
warn_redundant_casts = True
warn_return_any = True

View File

@@ -0,0 +1,31 @@
[project]
name = "cargo-fuzzer"
version = "0.1.0"
description = "FuzzForge module that runs cargo-fuzz with libFuzzer on Rust targets"
authors = []
readme = "README.md"
requires-python = ">=3.14"
dependencies = [
"fuzzforge-modules-sdk==0.0.1",
"pydantic==2.12.4",
"structlog==25.5.0",
]
[project.optional-dependencies]
lints = [
"bandit==1.8.6",
"mypy==1.18.2",
"ruff==0.14.4",
]
tests = [
"pytest==9.0.2",
]
[project.scripts]
module = "module.__main__:main"
[tool.uv.sources]
fuzzforge-modules-sdk = { workspace = true }
[tool.uv]
package = true

View File

@@ -0,0 +1,19 @@
line-length = 120
[lint]
select = [ "ALL" ]
ignore = [
"COM812", # conflicts with the formatter
"D100", # ignoring missing docstrings in public modules
"D104", # ignoring missing docstrings in public packages
"D203", # conflicts with 'D211'
"D213", # conflicts with 'D212'
"TD002", # ignoring missing author in 'TODO' statements
"TD003", # ignoring missing issue link in 'TODO' statements
]
[lint.per-file-ignores]
"tests/*" = [
"PLR2004", # allowing comparisons using unamed numerical constants in tests
"S101", # allowing 'assert' statements in tests
]

View File

@@ -0,0 +1,19 @@
from typing import TYPE_CHECKING
from fuzzforge_modules_sdk.api import logs
from module.mod import Module
if TYPE_CHECKING:
from fuzzforge_modules_sdk.api.modules.base import FuzzForgeModule
def main() -> None:
"""TODO."""
logs.configure()
module: FuzzForgeModule = Module()
module.main()
if __name__ == "__main__":
main()

View File

@@ -0,0 +1,516 @@
"""Cargo Fuzzer module for FuzzForge.
This module runs cargo-fuzz (libFuzzer) on validated Rust fuzz targets.
It takes a fuzz project with compiled harnesses and runs fuzzing for a
configurable duration, collecting crashes and statistics.
"""
from __future__ import annotations
import json
import os
import re
import shutil
import subprocess
import signal
import time
from pathlib import Path
from typing import TYPE_CHECKING
import structlog
from fuzzforge_modules_sdk.api.constants import PATH_TO_INPUTS, PATH_TO_OUTPUTS
from fuzzforge_modules_sdk.api.models import FuzzForgeModuleResults
from fuzzforge_modules_sdk.api.modules.base import FuzzForgeModule
from module.models import Input, Output, CrashInfo, FuzzingStats, TargetResult
from module.settings import Settings
if TYPE_CHECKING:
from fuzzforge_modules_sdk.api.models import FuzzForgeModuleResource
logger = structlog.get_logger()
class Module(FuzzForgeModule):
"""Cargo Fuzzer module - runs cargo-fuzz with libFuzzer on Rust targets."""
_settings: Settings | None
_fuzz_project_path: Path | None
_target_results: list[TargetResult]
_crashes_path: Path | None
def __init__(self) -> None:
"""Initialize an instance of the class."""
name: str = "cargo-fuzzer"
version: str = "0.1.0"
FuzzForgeModule.__init__(self, name=name, version=version)
self._settings = None
self._fuzz_project_path = None
self._target_results = []
self._crashes_path = None
@classmethod
def _get_input_type(cls) -> type[Input]:
"""Return the input type."""
return Input
@classmethod
def _get_output_type(cls) -> type[Output]:
"""Return the output type."""
return Output
def _prepare(self, settings: Settings) -> None: # type: ignore[override]
"""Prepare the module with settings.
:param settings: Module settings.
"""
self._settings = settings
logger.info("cargo-fuzzer preparing", settings=settings.model_dump() if settings else {})
def _run(self, resources: list[FuzzForgeModuleResource]) -> FuzzForgeModuleResults:
"""Run the fuzzer.
:param resources: Input resources (fuzz project + source).
:returns: Module execution result.
"""
logger.info("cargo-fuzzer starting", resource_count=len(resources))
# Emit initial progress
self.emit_progress(0, status="initializing", message="Setting up fuzzing environment")
self.emit_event("module_started", resource_count=len(resources))
# Setup the fuzzing environment
if not self._setup_environment(resources):
self.emit_progress(100, status="failed", message="Failed to setup environment")
return FuzzForgeModuleResults.FAILURE
# Get list of fuzz targets
targets = self._get_fuzz_targets()
if not targets:
logger.error("no fuzz targets found")
self.emit_progress(100, status="failed", message="No fuzz targets found")
return FuzzForgeModuleResults.FAILURE
# Filter targets if specific ones were requested
if self._settings and self._settings.targets:
requested = set(self._settings.targets)
targets = [t for t in targets if t in requested]
if not targets:
logger.error("none of the requested targets found", requested=list(requested))
self.emit_progress(100, status="failed", message="Requested targets not found")
return FuzzForgeModuleResults.FAILURE
logger.info("found fuzz targets", targets=targets)
self.emit_event("targets_found", targets=targets, count=len(targets))
# Setup output directories
self._crashes_path = PATH_TO_OUTPUTS / "crashes"
self._crashes_path.mkdir(parents=True, exist_ok=True)
# Run fuzzing on each target
# max_duration=0 means infinite/continuous mode
max_duration = self._settings.max_duration if self._settings else 60
is_continuous = max_duration == 0
if is_continuous:
# Continuous mode: cycle through targets indefinitely
# Each target runs for 60 seconds before moving to next
duration_per_target = 60
else:
duration_per_target = max_duration // max(len(targets), 1)
total_crashes = 0
# In continuous mode, loop forever; otherwise loop once
round_num = 0
while True:
round_num += 1
for i, target in enumerate(targets):
if is_continuous:
progress_msg = f"Round {round_num}: Fuzzing {target}"
else:
progress_msg = f"Fuzzing target {i+1}/{len(targets)}"
progress = int((i / len(targets)) * 100) if not is_continuous else 50
self.emit_progress(
progress,
status="running",
message=progress_msg,
current_task=target,
metrics={
"targets_completed": i,
"total_targets": len(targets),
"crashes_found": total_crashes,
"round": round_num if is_continuous else 1,
}
)
self.emit_event("target_started", target=target, index=i, total=len(targets), round=round_num)
result = self._fuzz_target(target, duration_per_target)
self._target_results.append(result)
total_crashes += len(result.crashes)
# Emit target completion
self.emit_event(
"target_completed",
target=target,
crashes=len(result.crashes),
executions=result.stats.total_executions if result.stats else 0,
coverage=result.stats.coverage_edges if result.stats else 0,
)
logger.info("target completed",
target=target,
crashes=len(result.crashes),
execs=result.stats.total_executions if result.stats else 0)
# Exit loop if not continuous mode
if not is_continuous:
break
# Write output
self._write_output()
# Emit final progress
self.emit_progress(
100,
status="completed",
message=f"Fuzzing completed. Found {total_crashes} crashes.",
metrics={
"targets_fuzzed": len(self._target_results),
"total_crashes": total_crashes,
"total_executions": sum(r.stats.total_executions for r in self._target_results if r.stats),
}
)
self.emit_event("module_completed", total_crashes=total_crashes, targets_fuzzed=len(targets))
logger.info("cargo-fuzzer completed",
targets=len(self._target_results),
total_crashes=total_crashes)
return FuzzForgeModuleResults.SUCCESS
def _cleanup(self, settings: Settings) -> None: # type: ignore[override]
"""Clean up after execution.
:param settings: Module settings.
"""
pass
def _setup_environment(self, resources: list[FuzzForgeModuleResource]) -> bool:
"""Setup the fuzzing environment.
:param resources: Input resources.
:returns: True if setup successful.
"""
import shutil
# Find fuzz project in resources
source_fuzz_project = None
source_project_root = None
for resource in resources:
path = Path(resource.path)
if path.is_dir():
# Check for fuzz subdirectory
fuzz_dir = path / "fuzz"
if fuzz_dir.is_dir() and (fuzz_dir / "Cargo.toml").exists():
source_fuzz_project = fuzz_dir
source_project_root = path
break
# Or direct fuzz project
if (path / "Cargo.toml").exists() and (path / "fuzz_targets").is_dir():
source_fuzz_project = path
source_project_root = path.parent
break
if source_fuzz_project is None:
logger.error("no fuzz project found in resources")
return False
# Copy project to writable location since /data/input is read-only
# and cargo-fuzz needs to write corpus, artifacts, and build cache
work_dir = Path("/tmp/fuzz-work")
if work_dir.exists():
shutil.rmtree(work_dir)
# Copy the entire project root
work_project = work_dir / source_project_root.name
shutil.copytree(source_project_root, work_project, dirs_exist_ok=True)
# Update fuzz_project_path to point to the copied location
relative_fuzz = source_fuzz_project.relative_to(source_project_root)
self._fuzz_project_path = work_project / relative_fuzz
logger.info("using fuzz project", path=str(self._fuzz_project_path))
return True
def _get_fuzz_targets(self) -> list[str]:
"""Get list of fuzz target names.
:returns: List of target names.
"""
if self._fuzz_project_path is None:
return []
targets = []
fuzz_targets_dir = self._fuzz_project_path / "fuzz_targets"
if fuzz_targets_dir.is_dir():
for rs_file in fuzz_targets_dir.glob("*.rs"):
targets.append(rs_file.stem)
return targets
def _fuzz_target(self, target: str, duration: int) -> TargetResult:
"""Run fuzzing on a single target.
:param target: Name of the fuzz target.
:param duration: Maximum duration in seconds.
:returns: Fuzzing result for this target.
"""
logger.info("fuzzing target", target=target, duration=duration)
crashes: list[CrashInfo] = []
stats = FuzzingStats()
if self._fuzz_project_path is None:
return TargetResult(target=target, crashes=crashes, stats=stats)
# Create corpus directory for this target
corpus_dir = self._fuzz_project_path / "corpus" / target
corpus_dir.mkdir(parents=True, exist_ok=True)
# Build the command
cmd = [
"cargo", "+nightly", "fuzz", "run",
target,
"--",
]
# Add time limit
if duration > 0:
cmd.append(f"-max_total_time={duration}")
# Use fork mode to continue after crashes
# This makes libFuzzer restart worker after crash instead of exiting
cmd.append("-fork=1")
cmd.append("-ignore_crashes=1")
cmd.append("-print_final_stats=1")
# Add jobs if specified
if self._settings and self._settings.jobs > 1:
cmd.extend([f"-jobs={self._settings.jobs}"])
try:
env = os.environ.copy()
env["CARGO_INCREMENTAL"] = "0"
process = subprocess.Popen(
cmd,
cwd=self._fuzz_project_path,
stdout=subprocess.PIPE,
stderr=subprocess.STDOUT,
text=True,
env=env,
)
output_lines = []
start_time = time.time()
last_metrics_emit = 0.0
current_execs = 0
current_cov = 0
current_exec_s = 0
crash_count = 0
# Read output with timeout (skip timeout check in infinite mode)
while True:
if process.poll() is not None:
break
elapsed = time.time() - start_time
# Only enforce timeout if duration > 0 (not infinite mode)
if duration > 0 and elapsed > duration + 30: # Grace period
logger.warning("fuzzer timeout, terminating", target=target)
process.terminate()
try:
process.wait(timeout=10)
except subprocess.TimeoutExpired:
process.kill()
break
try:
if process.stdout:
line = process.stdout.readline()
if line:
output_lines.append(line)
# Parse real-time metrics from libFuzzer output
# Example: "#12345 NEW cov: 100 ft: 50 corp: 25/1Kb exec/s: 1000"
exec_match = re.search(r"#(\d+)", line)
if exec_match:
current_execs = int(exec_match.group(1))
cov_match = re.search(r"cov:\s*(\d+)", line)
if cov_match:
current_cov = int(cov_match.group(1))
exec_s_match = re.search(r"exec/s:\s*(\d+)", line)
if exec_s_match:
current_exec_s = int(exec_s_match.group(1))
# Check for crash indicators
if "SUMMARY:" in line or "ERROR:" in line or "crash-" in line.lower():
crash_count += 1
self.emit_event(
"crash_detected",
target=target,
crash_number=crash_count,
line=line.strip(),
)
logger.debug("fuzzer output", line=line.strip())
# Emit metrics periodically (every 2 seconds)
if elapsed - last_metrics_emit >= 2.0:
last_metrics_emit = elapsed
self.emit_event(
"metrics",
target=target,
executions=current_execs,
coverage=current_cov,
exec_per_sec=current_exec_s,
crashes=crash_count,
elapsed_seconds=int(elapsed),
remaining_seconds=max(0, duration - int(elapsed)),
)
except Exception:
pass
# Parse statistics from output
stats = self._parse_fuzzer_stats(output_lines)
# Collect crashes
crashes = self._collect_crashes(target)
# Emit final event for this target if crashes were found
if crashes:
self.emit_event(
"crashes_collected",
target=target,
count=len(crashes),
paths=[c.file_path for c in crashes],
)
except FileNotFoundError:
logger.error("cargo-fuzz not found, please install with: cargo install cargo-fuzz")
stats.error = "cargo-fuzz not installed"
self.emit_event("error", target=target, message="cargo-fuzz not installed")
except Exception as e:
logger.exception("fuzzing error", target=target, error=str(e))
stats.error = str(e)
self.emit_event("error", target=target, message=str(e))
return TargetResult(target=target, crashes=crashes, stats=stats)
def _parse_fuzzer_stats(self, output_lines: list[str]) -> FuzzingStats:
"""Parse fuzzer output for statistics.
:param output_lines: Lines of fuzzer output.
:returns: Parsed statistics.
"""
stats = FuzzingStats()
full_output = "".join(output_lines)
# Parse libFuzzer stats
# Example: "#12345 DONE cov: 100 ft: 50 corp: 25/1Kb exec/s: 1000"
exec_match = re.search(r"#(\d+)", full_output)
if exec_match:
stats.total_executions = int(exec_match.group(1))
cov_match = re.search(r"cov:\s*(\d+)", full_output)
if cov_match:
stats.coverage_edges = int(cov_match.group(1))
corp_match = re.search(r"corp:\s*(\d+)", full_output)
if corp_match:
stats.corpus_size = int(corp_match.group(1))
exec_s_match = re.search(r"exec/s:\s*(\d+)", full_output)
if exec_s_match:
stats.executions_per_second = int(exec_s_match.group(1))
return stats
def _collect_crashes(self, target: str) -> list[CrashInfo]:
"""Collect crash files from fuzzer output.
:param target: Name of the fuzz target.
:returns: List of crash info.
"""
crashes: list[CrashInfo] = []
if self._fuzz_project_path is None or self._crashes_path is None:
return crashes
# Check for crashes in the artifacts directory
artifacts_dir = self._fuzz_project_path / "artifacts" / target
if artifacts_dir.is_dir():
for crash_file in artifacts_dir.glob("crash-*"):
if crash_file.is_file():
# Copy crash to output
output_crash = self._crashes_path / target
output_crash.mkdir(parents=True, exist_ok=True)
dest = output_crash / crash_file.name
shutil.copy2(crash_file, dest)
# Read crash input
crash_data = crash_file.read_bytes()
crash_info = CrashInfo(
file_path=str(dest),
input_hash=crash_file.name,
input_size=len(crash_data),
)
crashes.append(crash_info)
logger.info("found crash", target=target, file=crash_file.name)
return crashes
def _write_output(self) -> None:
"""Write the fuzzing results to output."""
output_path = PATH_TO_OUTPUTS / "fuzzing_results.json"
output_path.parent.mkdir(parents=True, exist_ok=True)
total_crashes = sum(len(r.crashes) for r in self._target_results)
total_execs = sum(r.stats.total_executions for r in self._target_results if r.stats)
output_data = {
"fuzz_project": str(self._fuzz_project_path),
"targets_fuzzed": len(self._target_results),
"total_crashes": total_crashes,
"total_executions": total_execs,
"crashes_path": str(self._crashes_path),
"results": [
{
"target": r.target,
"crashes": [c.model_dump() for c in r.crashes],
"stats": r.stats.model_dump() if r.stats else None,
}
for r in self._target_results
],
}
output_path.write_text(json.dumps(output_data, indent=2))
logger.info("wrote fuzzing results", path=str(output_path))

View File

@@ -0,0 +1,88 @@
"""Models for the cargo-fuzzer module."""
from pydantic import BaseModel, Field
from fuzzforge_modules_sdk.api.models import FuzzForgeModuleInputBase, FuzzForgeModuleOutputBase
from module.settings import Settings
class FuzzingStats(BaseModel):
"""Statistics from a fuzzing run."""
#: Total number of test case executions
total_executions: int = 0
#: Executions per second
executions_per_second: int = 0
#: Number of coverage edges discovered
coverage_edges: int = 0
#: Size of the corpus
corpus_size: int = 0
#: Any error message
error: str = ""
class CrashInfo(BaseModel):
"""Information about a discovered crash."""
#: Path to the crash input file
file_path: str
#: Hash/name of the crash input
input_hash: str
#: Size of the crash input in bytes
input_size: int = 0
#: Crash type (if identified)
crash_type: str = ""
#: Stack trace (if available)
stack_trace: str = ""
class TargetResult(BaseModel):
"""Result of fuzzing a single target."""
#: Name of the fuzz target
target: str
#: List of crashes found
crashes: list[CrashInfo] = Field(default_factory=list)
#: Fuzzing statistics
stats: FuzzingStats = Field(default_factory=FuzzingStats)
class Input(FuzzForgeModuleInputBase[Settings]):
"""Input for the cargo-fuzzer module.
Expects:
- A fuzz project directory with validated harnesses
- Optionally the source crate to link against
"""
class Output(FuzzForgeModuleOutputBase):
"""Output from the cargo-fuzzer module."""
#: Path to the fuzz project
fuzz_project: str = ""
#: Number of targets fuzzed
targets_fuzzed: int = 0
#: Total crashes found across all targets
total_crashes: int = 0
#: Total executions across all targets
total_executions: int = 0
#: Path to collected crash files
crashes_path: str = ""
#: Results per target
results: list[TargetResult] = Field(default_factory=list)

View File

@@ -0,0 +1,35 @@
"""Settings for the cargo-fuzzer module."""
from typing import Optional
from pydantic import model_validator
from fuzzforge_modules_sdk.api.models import FuzzForgeModulesSettingsBase
class Settings(FuzzForgeModulesSettingsBase):
"""Settings for the cargo-fuzzer module."""
#: Maximum fuzzing duration in seconds (total across all targets)
#: Set to 0 for infinite/continuous mode
max_duration: int = 60
#: Number of parallel fuzzing jobs
jobs: int = 1
#: Maximum length of generated inputs
max_len: int = 4096
#: Whether to use AddressSanitizer
use_asan: bool = True
#: Specific targets to fuzz (empty = all targets)
targets: list[str] = []
#: Single target to fuzz (convenience alias for targets)
target: Optional[str] = None
@model_validator(mode="after")
def handle_single_target(self) -> "Settings":
"""Convert single target to targets list if provided."""
if self.target and self.target not in self.targets:
self.targets.append(self.target)
return self

View File

@@ -0,0 +1,9 @@
FROM localhost/fuzzforge-modules-sdk:0.1.0
COPY ./src /app/src
COPY ./pyproject.toml /app/pyproject.toml
# Remove workspace reference since we're using wheels
RUN sed -i '/\[tool\.uv\.sources\]/,/^$/d' /app/pyproject.toml
RUN uv sync --find-links /wheels

View File

@@ -0,0 +1,45 @@
PACKAGE=$(word 1, $(shell uv version))
VERSION=$(word 2, $(shell uv version))
PODMAN?=/usr/bin/podman
SOURCES=./src
TESTS=./tests
.PHONY: bandit build clean format mypy pytest ruff version
bandit:
uv run bandit --recursive $(SOURCES)
build:
$(PODMAN) build --file ./Dockerfile --no-cache --tag $(PACKAGE):$(VERSION)
save: build
$(PODMAN) save --format oci-archive --output /tmp/$(PACKAGE)-$(VERSION).oci $(PACKAGE):$(VERSION)
clean:
@find . -type d \( \
-name '*.egg-info' \
-o -name '.mypy_cache' \
-o -name '.pytest_cache' \
-o -name '.ruff_cache' \
-o -name '__pycache__' \
\) -printf 'removing directory %p\n' -exec rm -rf {} +
cloc:
cloc $(SOURCES)
format:
uv run ruff format $(SOURCES) $(TESTS)
mypy:
uv run mypy $(SOURCES)
pytest:
uv run pytest $(TESTS)
ruff:
uv run ruff check --fix $(SOURCES) $(TESTS)
version:
@echo '$(PACKAGE)@$(VERSION)'

View File

@@ -0,0 +1,46 @@
# FuzzForge Modules - FIXME
## Installation
### Python
```shell
# install the package (users)
uv sync
# install the package and all development dependencies (developers)
uv sync --all-extras
```
### Container
```shell
# build the image
make build
# run the container
mkdir -p "${PWD}/data" "${PWD}/data/input" "${PWD}/data/output"
echo '{"settings":{},"resources":[]}' > "${PWD}/data/input/input.json"
podman run --rm \
--volume "${PWD}/data:/data" \
'<name>:<version>' 'uv run module'
```
## Usage
```shell
uv run module
```
## Development tools
```shell
# run ruff (formatter)
make format
# run mypy (type checker)
make mypy
# run tests (pytest)
make pytest
# run ruff (linter)
make ruff
```
See the file `Makefile` at the root of this directory for more tools.

View File

@@ -0,0 +1,6 @@
[mypy]
plugins = pydantic.mypy
strict = True
warn_unused_ignores = True
warn_redundant_casts = True
warn_return_any = True

View File

@@ -0,0 +1,32 @@
[project]
name = "crash-analyzer"
version = "0.1.0"
description = "FuzzForge module that analyzes fuzzing crashes and generates security reports"
authors = []
readme = "README.md"
requires-python = ">=3.14"
dependencies = [
"fuzzforge-modules-sdk==0.0.1",
"pydantic==2.12.4",
"structlog==25.5.0",
"jinja2==3.1.6",
]
[project.optional-dependencies]
lints = [
"bandit==1.8.6",
"mypy==1.18.2",
"ruff==0.14.4",
]
tests = [
"pytest==9.0.2",
]
[project.scripts]
module = "module.__main__:main"
[tool.uv.sources]
fuzzforge-modules-sdk = { workspace = true }
[tool.uv]
package = true

View File

@@ -0,0 +1,19 @@
line-length = 120
[lint]
select = [ "ALL" ]
ignore = [
"COM812", # conflicts with the formatter
"D100", # ignoring missing docstrings in public modules
"D104", # ignoring missing docstrings in public packages
"D203", # conflicts with 'D211'
"D213", # conflicts with 'D212'
"TD002", # ignoring missing author in 'TODO' statements
"TD003", # ignoring missing issue link in 'TODO' statements
]
[lint.per-file-ignores]
"tests/*" = [
"PLR2004", # allowing comparisons using unamed numerical constants in tests
"S101", # allowing 'assert' statements in tests
]

View File

@@ -0,0 +1,19 @@
from typing import TYPE_CHECKING
from fuzzforge_modules_sdk.api import logs
from module.mod import Module
if TYPE_CHECKING:
from fuzzforge_modules_sdk.api.modules.base import FuzzForgeModule
def main() -> None:
"""TODO."""
logs.configure()
module: FuzzForgeModule = Module()
module.main()
if __name__ == "__main__":
main()

View File

@@ -0,0 +1,340 @@
"""Crash Analyzer module for FuzzForge.
This module analyzes crashes from cargo-fuzz, deduplicates them,
extracts stack traces, and triages them by severity.
"""
from __future__ import annotations
import hashlib
import json
import os
import re
import subprocess
from pathlib import Path
from typing import TYPE_CHECKING
import structlog
from fuzzforge_modules_sdk.api.constants import PATH_TO_INPUTS, PATH_TO_OUTPUTS
from fuzzforge_modules_sdk.api.models import FuzzForgeModuleResults
from fuzzforge_modules_sdk.api.modules.base import FuzzForgeModule
from module.models import Input, Output, CrashAnalysis, Severity
from module.settings import Settings
if TYPE_CHECKING:
from fuzzforge_modules_sdk.api.models import FuzzForgeModuleResource
logger = structlog.get_logger()
class Module(FuzzForgeModule):
"""Crash Analyzer module - analyzes and triages fuzzer crashes."""
_settings: Settings | None
_analyses: list[CrashAnalysis]
_fuzz_project_path: Path | None
def __init__(self) -> None:
"""Initialize an instance of the class."""
name: str = "crash-analyzer"
version: str = "0.1.0"
FuzzForgeModule.__init__(self, name=name, version=version)
self._settings = None
self._analyses = []
self._fuzz_project_path = None
@classmethod
def _get_input_type(cls) -> type[Input]:
"""Return the input type."""
return Input
@classmethod
def _get_output_type(cls) -> type[Output]:
"""Return the output type."""
return Output
def _prepare(self, settings: Settings) -> None: # type: ignore[override]
"""Prepare the module.
:param settings: Module settings.
"""
self._settings = settings
logger.info("crash-analyzer preparing", settings=settings.model_dump() if settings else {})
def _run(self, resources: list[FuzzForgeModuleResource]) -> FuzzForgeModuleResults:
"""Run the crash analyzer.
:param resources: Input resources (fuzzing results + crashes).
:returns: Module execution result.
"""
logger.info("crash-analyzer starting", resource_count=len(resources))
# Find crashes directory and fuzz project
crashes_path = None
for resource in resources:
path = Path(resource.path)
if path.is_dir():
if path.name == "crashes" or (path / "crashes").is_dir():
crashes_path = path if path.name == "crashes" else path / "crashes"
if (path / "fuzz_targets").is_dir():
self._fuzz_project_path = path
if (path / "fuzz" / "fuzz_targets").is_dir():
self._fuzz_project_path = path / "fuzz"
if crashes_path is None:
# Try to find crashes in fuzzing_results.json
for resource in resources:
path = Path(resource.path)
if path.name == "fuzzing_results.json" and path.exists():
with open(path) as f:
data = json.load(f)
if "crashes_path" in data:
crashes_path = Path(data["crashes_path"])
break
if crashes_path is None or not crashes_path.exists():
logger.warning("no crashes found to analyze")
self._write_output()
return FuzzForgeModuleResults.SUCCESS
logger.info("analyzing crashes", path=str(crashes_path))
# Analyze crashes per target
for target_dir in crashes_path.iterdir():
if target_dir.is_dir():
target = target_dir.name
for crash_file in target_dir.glob("crash-*"):
if crash_file.is_file():
analysis = self._analyze_crash(target, crash_file)
self._analyses.append(analysis)
# Deduplicate crashes
self._deduplicate_crashes()
# Write output
self._write_output()
unique_count = sum(1 for a in self._analyses if not a.is_duplicate)
logger.info("crash-analyzer completed",
total=len(self._analyses),
unique=unique_count)
return FuzzForgeModuleResults.SUCCESS
def _cleanup(self, settings: Settings) -> None: # type: ignore[override]
"""Clean up after execution.
:param settings: Module settings.
"""
pass
def _analyze_crash(self, target: str, crash_file: Path) -> CrashAnalysis:
"""Analyze a single crash.
:param target: Name of the fuzz target.
:param crash_file: Path to the crash input file.
:returns: Crash analysis result.
"""
logger.debug("analyzing crash", target=target, file=crash_file.name)
# Read crash input
crash_data = crash_file.read_bytes()
input_hash = hashlib.sha256(crash_data).hexdigest()[:16]
# Try to reproduce and get stack trace
stack_trace = ""
crash_type = "unknown"
severity = Severity.UNKNOWN
if self._fuzz_project_path:
stack_trace, crash_type = self._reproduce_crash(target, crash_file)
severity = self._determine_severity(crash_type, stack_trace)
return CrashAnalysis(
target=target,
input_file=str(crash_file),
input_hash=input_hash,
input_size=len(crash_data),
crash_type=crash_type,
severity=severity,
stack_trace=stack_trace,
is_duplicate=False,
)
def _reproduce_crash(self, target: str, crash_file: Path) -> tuple[str, str]:
"""Reproduce a crash to get stack trace.
:param target: Name of the fuzz target.
:param crash_file: Path to the crash input file.
:returns: Tuple of (stack_trace, crash_type).
"""
if self._fuzz_project_path is None:
return "", "unknown"
try:
env = os.environ.copy()
env["RUST_BACKTRACE"] = "1"
result = subprocess.run(
[
"cargo", "+nightly", "fuzz", "run",
target,
str(crash_file),
"--",
"-runs=1",
],
cwd=self._fuzz_project_path,
capture_output=True,
text=True,
timeout=30,
env=env,
)
output = result.stdout + result.stderr
# Extract crash type
crash_type = "unknown"
if "heap-buffer-overflow" in output.lower():
crash_type = "heap-buffer-overflow"
elif "stack-buffer-overflow" in output.lower():
crash_type = "stack-buffer-overflow"
elif "heap-use-after-free" in output.lower():
crash_type = "use-after-free"
elif "null" in output.lower() and "deref" in output.lower():
crash_type = "null-pointer-dereference"
elif "panic" in output.lower():
crash_type = "panic"
elif "assertion" in output.lower():
crash_type = "assertion-failure"
elif "timeout" in output.lower():
crash_type = "timeout"
elif "out of memory" in output.lower() or "oom" in output.lower():
crash_type = "out-of-memory"
# Extract stack trace
stack_lines = []
in_stack = False
for line in output.splitlines():
if "SUMMARY:" in line or "ERROR:" in line:
in_stack = True
if in_stack:
stack_lines.append(line)
if len(stack_lines) > 50: # Limit stack trace length
break
return "\n".join(stack_lines), crash_type
except subprocess.TimeoutExpired:
return "", "timeout"
except Exception as e:
logger.warning("failed to reproduce crash", error=str(e))
return "", "unknown"
def _determine_severity(self, crash_type: str, stack_trace: str) -> Severity:
"""Determine crash severity based on type and stack trace.
:param crash_type: Type of the crash.
:param stack_trace: Stack trace string.
:returns: Severity level.
"""
high_severity = [
"heap-buffer-overflow",
"stack-buffer-overflow",
"use-after-free",
"double-free",
]
medium_severity = [
"null-pointer-dereference",
"out-of-memory",
"integer-overflow",
]
low_severity = [
"panic",
"assertion-failure",
"timeout",
]
if crash_type in high_severity:
return Severity.HIGH
elif crash_type in medium_severity:
return Severity.MEDIUM
elif crash_type in low_severity:
return Severity.LOW
else:
return Severity.UNKNOWN
def _deduplicate_crashes(self) -> None:
"""Mark duplicate crashes based on stack trace similarity."""
seen_signatures: set[str] = set()
for analysis in self._analyses:
# Create a signature from crash type and key stack frames
signature = self._create_signature(analysis)
if signature in seen_signatures:
analysis.is_duplicate = True
else:
seen_signatures.add(signature)
def _create_signature(self, analysis: CrashAnalysis) -> str:
"""Create a unique signature for a crash.
:param analysis: Crash analysis.
:returns: Signature string.
"""
# Use crash type + first few significant stack frames
parts = [analysis.target, analysis.crash_type]
# Extract function names from stack trace
func_pattern = re.compile(r"in (\S+)")
funcs = func_pattern.findall(analysis.stack_trace)
# Use first 3 unique functions
seen = set()
for func in funcs:
if func not in seen and not func.startswith("std::"):
parts.append(func)
seen.add(func)
if len(seen) >= 3:
break
return "|".join(parts)
def _write_output(self) -> None:
"""Write the analysis results to output."""
output_path = PATH_TO_OUTPUTS / "crash_analysis.json"
output_path.parent.mkdir(parents=True, exist_ok=True)
unique = [a for a in self._analyses if not a.is_duplicate]
duplicates = [a for a in self._analyses if a.is_duplicate]
# Group by severity
by_severity = {
"high": [a for a in unique if a.severity == Severity.HIGH],
"medium": [a for a in unique if a.severity == Severity.MEDIUM],
"low": [a for a in unique if a.severity == Severity.LOW],
"unknown": [a for a in unique if a.severity == Severity.UNKNOWN],
}
output_data = {
"total_crashes": len(self._analyses),
"unique_crashes": len(unique),
"duplicate_crashes": len(duplicates),
"severity_summary": {k: len(v) for k, v in by_severity.items()},
"unique_analyses": [a.model_dump() for a in unique],
"duplicate_analyses": [a.model_dump() for a in duplicates],
}
output_path.write_text(json.dumps(output_data, indent=2, default=str))
logger.info("wrote crash analysis", path=str(output_path))

View File

@@ -0,0 +1,79 @@
"""Models for the crash-analyzer module."""
from enum import Enum
from pydantic import BaseModel, Field
from fuzzforge_modules_sdk.api.models import FuzzForgeModuleInputBase, FuzzForgeModuleOutputBase
from module.settings import Settings
class Severity(str, Enum):
"""Severity level of a crash."""
HIGH = "high"
MEDIUM = "medium"
LOW = "low"
UNKNOWN = "unknown"
class CrashAnalysis(BaseModel):
"""Analysis of a single crash."""
#: Name of the fuzz target
target: str
#: Path to the input file that caused the crash
input_file: str
#: Hash of the input for identification
input_hash: str
#: Size of the input in bytes
input_size: int = 0
#: Type of crash (e.g., "heap-buffer-overflow", "panic")
crash_type: str = "unknown"
#: Severity level
severity: Severity = Severity.UNKNOWN
#: Stack trace from reproducing the crash
stack_trace: str = ""
#: Whether this crash is a duplicate of another
is_duplicate: bool = False
#: Signature for deduplication
signature: str = ""
class Input(FuzzForgeModuleInputBase[Settings]):
"""Input for the crash-analyzer module.
Expects:
- Crashes directory from cargo-fuzzer
- Optionally the fuzz project for reproduction
"""
class Output(FuzzForgeModuleOutputBase):
"""Output from the crash-analyzer module."""
#: Total number of crashes analyzed
total_crashes: int = 0
#: Number of unique crashes (after deduplication)
unique_crashes: int = 0
#: Number of duplicate crashes
duplicate_crashes: int = 0
#: Summary by severity
severity_summary: dict[str, int] = Field(default_factory=dict)
#: Unique crash analyses
unique_analyses: list[CrashAnalysis] = Field(default_factory=list)
#: Duplicate crash analyses
duplicate_analyses: list[CrashAnalysis] = Field(default_factory=list)

View File

@@ -0,0 +1,16 @@
"""Settings for the crash-analyzer module."""
from fuzzforge_modules_sdk.api.models import FuzzForgeModulesSettingsBase
class Settings(FuzzForgeModulesSettingsBase):
"""Settings for the crash-analyzer module."""
#: Whether to reproduce crashes for stack traces
reproduce_crashes: bool = True
#: Timeout for reproducing each crash (seconds)
reproduce_timeout: int = 30
#: Whether to deduplicate crashes
deduplicate: bool = True

View File

@@ -0,0 +1,9 @@
FROM localhost/fuzzforge-modules-sdk:0.0.1
COPY ./src /app/src
COPY ./pyproject.toml /app/pyproject.toml
# Remove workspace reference since we're using wheels
RUN sed -i '/\[tool\.uv\.sources\]/,/^$/d' /app/pyproject.toml
RUN uv sync --find-links /wheels

View File

@@ -0,0 +1,45 @@
PACKAGE=$(word 1, $(shell uv version))
VERSION=$(word 2, $(shell uv version))
PODMAN?=/usr/bin/podman
SOURCES=./src
TESTS=./tests
.PHONY: bandit build clean format mypy pytest ruff version
bandit:
uv run bandit --recursive $(SOURCES)
build:
$(PODMAN) build --file ./Dockerfile --no-cache --tag $(PACKAGE):$(VERSION)
save: build
$(PODMAN) save --format oci-archive --output /tmp/$(PACKAGE)-$(VERSION).oci $(PACKAGE):$(VERSION)
clean:
@find . -type d \( \
-name '*.egg-info' \
-o -name '.mypy_cache' \
-o -name '.pytest_cache' \
-o -name '.ruff_cache' \
-o -name '__pycache__' \
\) -printf 'removing directory %p\n' -exec rm -rf {} +
cloc:
cloc $(SOURCES)
format:
uv run ruff format $(SOURCES) $(TESTS)
mypy:
uv run mypy $(SOURCES)
pytest:
uv run pytest $(TESTS)
ruff:
uv run ruff check --fix $(SOURCES) $(TESTS)
version:
@echo '$(PACKAGE)@$(VERSION)'

View File

@@ -0,0 +1,46 @@
# FuzzForge Modules - FIXME
## Installation
### Python
```shell
# install the package (users)
uv sync
# install the package and all development dependencies (developers)
uv sync --all-extras
```
### Container
```shell
# build the image
make build
# run the container
mkdir -p "${PWD}/data" "${PWD}/data/input" "${PWD}/data/output"
echo '{"settings":{},"resources":[]}' > "${PWD}/data/input/input.json"
podman run --rm \
--volume "${PWD}/data:/data" \
'<name>:<version>' 'uv run module'
```
## Usage
```shell
uv run module
```
## Development tools
```shell
# run ruff (formatter)
make format
# run mypy (type checker)
make mypy
# run tests (pytest)
make pytest
# run ruff (linter)
make ruff
```
See the file `Makefile` at the root of this directory for more tools.

View File

@@ -0,0 +1,6 @@
[mypy]
plugins = pydantic.mypy
strict = True
warn_unused_ignores = True
warn_redundant_casts = True
warn_return_any = True

View File

@@ -0,0 +1,31 @@
[project]
name = "fuzzforge-module-template"
version = "0.0.1"
description = "FIXME"
authors = []
readme = "README.md"
requires-python = ">=3.14"
dependencies = [
"fuzzforge-modules-sdk==0.0.1",
"pydantic==2.12.4",
"structlog==25.5.0",
]
[project.optional-dependencies]
lints = [
"bandit==1.8.6",
"mypy==1.18.2",
"ruff==0.14.4",
]
tests = [
"pytest==9.0.2",
]
[project.scripts]
module = "module.__main__:main"
[tool.uv.sources]
fuzzforge-modules-sdk = { workspace = true }
[tool.uv]
package = true

View File

@@ -0,0 +1,19 @@
line-length = 120
[lint]
select = [ "ALL" ]
ignore = [
"COM812", # conflicts with the formatter
"D100", # ignoring missing docstrings in public modules
"D104", # ignoring missing docstrings in public packages
"D203", # conflicts with 'D211'
"D213", # conflicts with 'D212'
"TD002", # ignoring missing author in 'TODO' statements
"TD003", # ignoring missing issue link in 'TODO' statements
]
[lint.per-file-ignores]
"tests/*" = [
"PLR2004", # allowing comparisons using unamed numerical constants in tests
"S101", # allowing 'assert' statements in tests
]

View File

@@ -0,0 +1,19 @@
from typing import TYPE_CHECKING
from fuzzforge_modules_sdk.api import logs
from module.mod import Module
if TYPE_CHECKING:
from fuzzforge_modules_sdk.api.modules.base import FuzzForgeModule
def main() -> None:
"""TODO."""
logs.configure()
module: FuzzForgeModule = Module()
module.main()
if __name__ == "__main__":
main()

View File

@@ -0,0 +1,54 @@
from __future__ import annotations
from typing import TYPE_CHECKING
from fuzzforge_modules_sdk.api.models import FuzzForgeModuleResults
from fuzzforge_modules_sdk.api.modules.base import FuzzForgeModule
from module.models import Input, Output
if TYPE_CHECKING:
from fuzzforge_modules_sdk.api.models import FuzzForgeModuleResource, FuzzForgeModulesSettingsType
class Module(FuzzForgeModule):
"""TODO."""
def __init__(self) -> None:
"""Initialize an instance of the class."""
name: str = "FIXME"
version: str = "FIXME"
FuzzForgeModule.__init__(self, name=name, version=version)
@classmethod
def _get_input_type(cls) -> type[Input]:
"""TODO."""
return Input
@classmethod
def _get_output_type(cls) -> type[Output]:
"""TODO."""
return Output
def _prepare(self, settings: FuzzForgeModulesSettingsType) -> None:
"""TODO.
:param settings: TODO.
"""
def _run(self, resources: list[FuzzForgeModuleResource]) -> FuzzForgeModuleResults: # noqa: ARG002
"""TODO.
:param resources: TODO.
:returns: TODO.
"""
return FuzzForgeModuleResults.SUCCESS
def _cleanup(self, settings: FuzzForgeModulesSettingsType) -> None:
"""TODO.
:param settings: TODO.
"""

View File

@@ -0,0 +1,11 @@
from fuzzforge_modules_sdk.api.models import FuzzForgeModuleInputBase, FuzzForgeModuleOutputBase
from module.settings import Settings
class Input(FuzzForgeModuleInputBase[Settings]):
"""TODO."""
class Output(FuzzForgeModuleOutputBase):
"""TODO."""

View File

@@ -0,0 +1,7 @@
from fuzzforge_modules_sdk.api.models import FuzzForgeModulesSettingsBase
class Settings(FuzzForgeModulesSettingsBase):
"""TODO."""
# Here goes your attributes

View File

@@ -0,0 +1,30 @@
# FuzzForge Modules SDK - Base image for all modules
#
# This image provides:
# - Python 3.14 with uv package manager
# - Pre-built wheels for common dependencies
# - Standard module directory structure
FROM ghcr.io/astral-sh/uv:python3.14-bookworm-slim
# Install system dependencies commonly needed by modules
RUN apt-get update && apt-get install -y --no-install-recommends \
build-essential \
&& rm -rf /var/lib/apt/lists/*
# Set up application directory structure
WORKDIR /app
# Create FuzzForge standard directories
RUN mkdir -p /fuzzforge/input /fuzzforge/output
# Copy wheels directory (built by parent Makefile)
COPY .wheels /wheels
# Set up uv for the container
ENV UV_SYSTEM_PYTHON=1
ENV UV_COMPILE_BYTECODE=1
ENV UV_LINK_MODE=copy
# Default entrypoint - modules override this
ENTRYPOINT ["uv", "run", "module"]

View File

@@ -0,0 +1,39 @@
PACKAGE=$(word 1, $(shell uv version))
VERSION=$(word 2, $(shell uv version))
SOURCES=./src
TESTS=./tests
FUZZFORGE_MODULE_TEMPLATE=$(PWD)/src/fuzzforge_modules_sdk/templates/module
.PHONY: bandit clean format mypy pytest ruff version
bandit:
uv run bandit --recursive $(SOURCES)
clean:
@find . -type d \( \
-name '*.egg-info' \
-o -name '.mypy_cache' \
-o -name '.pytest_cache' \
-o -name '.ruff_cache' \
-o -name '__pycache__' \
\) -printf 'removing directory %p\n' -exec rm -rf {} +
cloc:
cloc $(SOURCES)
format:
uv run ruff format $(SOURCES) $(TESTS)
mypy:
uv run mypy $(SOURCES)
pytest:
uv run pytest $(TESTS)
ruff:
uv run ruff check --fix $(SOURCES) $(TESTS)
version:
@echo '$(PACKAGE)@$(VERSION)'

View File

@@ -0,0 +1,67 @@
# FuzzForge Modules SDK
...
# Setup
- start the podman user socket
```shell
systemctl --user start podman.socket
```
NB : you can also automaticllay start it at boot
```shell
systemctl --user enable --now podman.socket
```
## HACK : fix missing `fuzzforge-modules-sdk`
- if you have this error when using some fuzzforge-modules-sdk deps :
```shell
make format
uv run ruff format ./src ./tests
× No solution found when resolving dependencies:
╰─▶ Because fuzzforge-modules-sdk was not found in the package registry and your project depends on fuzzforge-modules-sdk==0.0.1, we can
conclude that your project's requirements are unsatisfiable.
And because your project requires opengrep[lints], we can conclude that your project's requirements are unsatisfiable.
make: *** [Makefile:30: format] Error 1
```
- build a wheel package of fuzzforge-modules-sdk
```shell
cd fuzzforge_ng/fuzzforge-modules/fuzzforge-modules-sdk
uv build
```
- then inside your module project, install it
```shell
cd fuzzforge_ng_modules/mymodule
uv sync --all-extras --find-links ../../fuzzforge_ng/dist/
```
# Usage
## Prepare
- enter venv (or use uv run)
```shell
source .venv/bin/activate
```
- create a new module
```shell
fuzzforge-modules-sdk new module --name my_new_module --directory ../fuzzforge_ng_modules/
```
- build the base image
```shell
fuzzforge-modules-sdk build image
```

View File

@@ -0,0 +1,7 @@
[mypy]
exclude = ^src/fuzzforge_modules_sdk/templates/.*
plugins = pydantic.mypy
strict = True
warn_unused_ignores = True
warn_redundant_casts = True
warn_return_any = True

View File

@@ -0,0 +1,31 @@
[project]
name = "fuzzforge-modules-sdk"
version = "0.0.1"
description = "Software development kit (SDK) for FuzzForge's modules."
authors = []
readme = "README.md"
requires-python = ">=3.14"
dependencies = [
"podman==5.6.0",
"pydantic==2.12.4",
"tomlkit==0.13.3",
]
[project.optional-dependencies]
lints = [
"bandit==1.8.6",
"mypy==1.18.2",
"ruff==0.14.4",
]
tests = [
"pytest==9.0.2",
]
[project.scripts]
fuzzforge-modules-sdk = "fuzzforge_modules_sdk._cli.main:main"
[tool.setuptools.package-data]
fuzzforge_modules_sdk = [
"assets/**/*",
"templates/**/*",
]

View File

@@ -0,0 +1,19 @@
line-length = 120
[lint]
select = [ "ALL" ]
ignore = [
"COM812", # conflicts with the formatter
"D100", # ignoring missing docstrings in public modules
"D104", # ignoring missing docstrings in public packages
"D203", # conflicts with 'D211'
"D213", # conflicts with 'D212'
"TD002", # ignoring missing author in 'TODO' statements
"TD003", # ignoring missing issue link in 'TODO' statements
]
[lint.per-file-ignores]
"tests/*" = [
"PLR2004", # allowing comparisons using unamed numerical constants in tests
"S101", # allowing 'assert' statements in tests
]

View File

@@ -0,0 +1,66 @@
from importlib.resources import files
from pathlib import Path
from shutil import copyfile, copytree
from tempfile import TemporaryDirectory
from typing import TYPE_CHECKING, Literal
import os
from podman import PodmanClient
from tomlkit import TOMLDocument, parse
if TYPE_CHECKING:
from importlib.resources.abc import Traversable
def _get_default_podman_socket() -> str:
"""Get the default Podman socket path for the current user."""
uid = os.getuid()
return f"unix:///run/user/{uid}/podman/podman.sock"
PATH_TO_SOURCES: Path = Path(__file__).parent.parent
def _build_podman_image(directory: Path, tag: str, socket: str | None = None) -> None:
if socket is None:
socket = _get_default_podman_socket()
with PodmanClient(base_url=socket) as client:
client.images.build(
dockerfile="Dockerfile",
nocache=True,
path=directory,
tag=tag,
)
def build_base_image(engine: Literal["podman"], socket: str | None = None) -> None:
with TemporaryDirectory() as directory:
path_to_assets: Traversable = files("fuzzforge_modules_sdk").joinpath("assets")
copyfile(
src=str(path_to_assets.joinpath("Dockerfile")),
dst=Path(directory).joinpath("Dockerfile"),
)
copyfile(
src=str(path_to_assets.joinpath("pyproject.toml")),
dst=Path(directory).joinpath("pyproject.toml"),
)
copytree(src=str(PATH_TO_SOURCES), dst=Path(directory).joinpath("src").joinpath(PATH_TO_SOURCES.name))
# update the file 'pyproject.toml'
path: Path = Path(directory).joinpath("pyproject.toml")
data: TOMLDocument = parse(path.read_text())
name: str = data["project"]["name"] # type: ignore[assignment, index]
version: str = data["project"]["version"] # type: ignore[assignment, index]
tag: str = f"{name}:{version}"
match engine:
case "podman":
_build_podman_image(
directory=Path(directory),
socket=socket,
tag=tag,
)
case _:
message: str = f"unsupported engine '{engine}'"
raise Exception(message) # noqa: TRY002

View File

@@ -0,0 +1,30 @@
from __future__ import annotations
from importlib.resources import files
from shutil import copytree, ignore_patterns
from typing import TYPE_CHECKING
from tomlkit import dumps, parse
if TYPE_CHECKING:
from importlib.resources.abc import Traversable
from pathlib import Path
from tomlkit import TOMLDocument
def create_new_module(name: str, directory: Path) -> None:
source: Traversable = files("fuzzforge_modules_sdk").joinpath("templates").joinpath("fuzzforge-module-template")
destination: Path = directory.joinpath(name) # TODO: sanitize path
copytree(
src=str(source),
dst=destination,
ignore=ignore_patterns("__pycache__", "*.egg-info", "*.pyc", ".mypy_cache", ".ruff_cache", ".venv"),
)
# update the file 'pyproject.toml'
path: Path = destination.joinpath("pyproject.toml")
data: TOMLDocument = parse(path.read_text())
data["project"]["name"] = name # type: ignore[index]
del data["tool"]["uv"]["sources"] # type: ignore[index, union-attr]
path.write_text(dumps(data))

View File

@@ -0,0 +1,71 @@
from argparse import ArgumentParser
from pathlib import Path
from fuzzforge_modules_sdk._cli.build_base_image import build_base_image
from fuzzforge_modules_sdk._cli.create_new_module import create_new_module
def create_parser() -> ArgumentParser:
parser: ArgumentParser = ArgumentParser(
prog="fuzzforge-modules-sdk", description="Utilities for the Fuzzforge Modules SDK."
)
subparsers = parser.add_subparsers(required=True)
# fuzzforge-modules-sdk build ...
parser_build = subparsers.add_parser(name="build")
subparsers_build = parser_build.add_subparsers(required=True)
# fuzzforge-modules-sdk build image ...
parser_build_image = subparsers_build.add_parser(
name="image",
help="Build the image.",
)
parser_build_image.add_argument(
"--engine",
default="podman",
)
parser_build_image.add_argument(
"--socket",
default=None,
)
parser_build_image.set_defaults(
function_to_execute=build_base_image,
)
# fuzzforge-modules-sdk new ...
parser_new = subparsers.add_parser(name="new")
subparsers_new = parser_new.add_subparsers(required=True)
# fuzzforge-modules-sdk new module ...
parser_new_module = subparsers_new.add_parser(
name="module",
help="Generate the boilerplate required to create a new module.",
)
parser_new_module.add_argument(
"--name",
help="The name of the module to create.",
required=True,
)
parser_new_module.add_argument(
"--directory",
default=".",
type=Path,
help="The directory the new module should be created into (defaults to current working directory).",
)
parser_new_module.set_defaults(
function_to_execute=create_new_module,
)
return parser
def main() -> None:
"""Entry point for the command-line interface."""
parser: ArgumentParser = create_parser()
arguments = parser.parse_args()
function_to_execute = arguments.function_to_execute
del arguments.function_to_execute
function_to_execute(**vars(arguments))

View File

@@ -0,0 +1,13 @@
from pathlib import Path
PATH_TO_DATA: Path = Path("/data")
PATH_TO_INPUTS: Path = PATH_TO_DATA.joinpath("input")
PATH_TO_INPUT: Path = PATH_TO_INPUTS.joinpath("input.json")
PATH_TO_OUTPUTS: Path = PATH_TO_DATA.joinpath("output")
PATH_TO_ARTIFACTS: Path = PATH_TO_OUTPUTS.joinpath("artifacts")
PATH_TO_RESULTS: Path = PATH_TO_OUTPUTS.joinpath("results.json")
PATH_TO_LOGS: Path = PATH_TO_OUTPUTS.joinpath("logs.jsonl")
# Streaming output paths for real-time progress
PATH_TO_PROGRESS: Path = PATH_TO_OUTPUTS.joinpath("progress.json")
PATH_TO_STREAM: Path = PATH_TO_OUTPUTS.joinpath("stream.jsonl")

View File

@@ -0,0 +1,2 @@
class FuzzForgeModuleError(Exception):
"""TODO."""

View File

@@ -0,0 +1,43 @@
import logging
import sys
import structlog
from fuzzforge_modules_sdk.api.constants import PATH_TO_LOGS
class Formatter(logging.Formatter):
"""TODO."""
def format(self, record: logging.LogRecord) -> str:
"""TODO."""
record.exc_info = None
return super().format(record)
def configure() -> None:
"""TODO."""
fmt: str = "%(message)s"
level = logging.DEBUG
PATH_TO_LOGS.parent.mkdir(exist_ok=True, parents=True)
PATH_TO_LOGS.unlink(missing_ok=True)
handler_file = logging.FileHandler(filename=PATH_TO_LOGS, mode="a")
handler_file.setFormatter(fmt=Formatter(fmt=fmt))
handler_file.setLevel(level=level)
handler_stderr = logging.StreamHandler(stream=sys.stderr)
handler_stderr.setFormatter(fmt=Formatter(fmt=fmt))
handler_stderr.setLevel(level=level)
logger: logging.Logger = logging.getLogger()
logger.setLevel(level=level)
logger.addHandler(handler_file)
logger.addHandler(handler_stderr)
structlog.configure(
processors=[
structlog.stdlib.add_log_level,
structlog.processors.TimeStamper(fmt="iso"),
structlog.processors.dict_tracebacks,
structlog.processors.JSONRenderer(),
],
logger_factory=structlog.stdlib.LoggerFactory(),
wrapper_class=structlog.stdlib.BoundLogger,
)

View File

@@ -0,0 +1,85 @@
from enum import StrEnum
from pathlib import Path # noqa: TC003 (required by pydantic at runtime)
from typing import TypeVar
from pydantic import BaseModel, ConfigDict
class Base(BaseModel):
"""TODO."""
model_config = ConfigDict(extra="forbid")
class FuzzForgeModulesSettingsBase(Base):
"""TODO."""
FuzzForgeModulesSettingsType = TypeVar("FuzzForgeModulesSettingsType", bound=FuzzForgeModulesSettingsBase)
class FuzzForgeModuleResources(StrEnum):
"""Enumeration of artifact types."""
#: The type of the resource is unknown or irrelevant.
UNKNOWN = "unknown"
class FuzzForgeModuleResource(Base):
"""TODO."""
#: The description of the resource.
description: str
#: The type of the resource.
kind: FuzzForgeModuleResources
#: The name of the resource.
name: str
#: The path of the resource on disk.
path: Path
class FuzzForgeModuleInputBase[FuzzForgeModulesSettingsType: FuzzForgeModulesSettingsBase](Base):
"""The (standardized) input of a FuzzForge module."""
#: The collection of resources given to the module as inputs.
resources: list[FuzzForgeModuleResource]
#: The settings of the module.
settings: FuzzForgeModulesSettingsType
class FuzzForgeModuleArtifacts(StrEnum):
"""Enumeration of artifact types."""
#: The artifact is an asset.
ASSET = "asset"
class FuzzForgeModuleArtifact(Base):
"""An artifact generated by the module during its run."""
#: The description of the artifact.
description: str
#: The type of the artifact.
kind: FuzzForgeModuleArtifacts
#: The name of the artifact.
name: str
#: The path to the artifact on disk.
path: Path
class FuzzForgeModuleResults(StrEnum):
"""TODO."""
SUCCESS = "success"
FAILURE = "failure"
class FuzzForgeModuleOutputBase(Base):
"""The (standardized) output of a FuzzForge module."""
#: The collection of artifacts generated by the module during its run.
artifacts: list[FuzzForgeModuleArtifacts]
#: The path to the logs.
logs: Path
#: The result of the module's run.
result: FuzzForgeModuleResults

View File

@@ -0,0 +1,288 @@
from abc import ABC, abstractmethod
import json
import time
from datetime import datetime, timezone
from shutil import rmtree
from typing import TYPE_CHECKING, Any, Final, final
from structlog import get_logger
from fuzzforge_modules_sdk.api.constants import (
PATH_TO_ARTIFACTS,
PATH_TO_INPUT,
PATH_TO_LOGS,
PATH_TO_PROGRESS,
PATH_TO_RESULTS,
PATH_TO_STREAM,
)
from fuzzforge_modules_sdk.api.exceptions import FuzzForgeModuleError
from fuzzforge_modules_sdk.api.models import (
FuzzForgeModuleArtifact,
FuzzForgeModuleArtifacts,
FuzzForgeModuleInputBase,
FuzzForgeModuleOutputBase,
FuzzForgeModuleResource,
FuzzForgeModuleResults,
FuzzForgeModulesSettingsType,
)
if TYPE_CHECKING:
from pathlib import Path
from structlog.stdlib import BoundLogger
class FuzzForgeModule(ABC):
"""FuzzForge Modules' base."""
__artifacts: dict[str, FuzzForgeModuleArtifact]
#: The logger associated with the module.
__logger: Final[BoundLogger]
#: The name of the module.
__name: Final[str]
#: The version of the module.
__version: Final[str]
#: Start time for progress tracking.
__start_time: float
#: Custom output data set by the module.
__output_data: dict[str, Any]
def __init__(self, name: str, version: str) -> None:
"""Initialize an instance of the class.
:param name: The name of the module.
:param version: The version of the module.
"""
self.__artifacts = {}
self.__logger = get_logger("module")
self.__name = name
self.__version = version
self.__start_time = time.time()
self.__output_data = {}
# Initialize streaming output files
PATH_TO_PROGRESS.parent.mkdir(exist_ok=True, parents=True)
PATH_TO_STREAM.parent.mkdir(exist_ok=True, parents=True)
@final
def get_logger(self) -> BoundLogger:
"""Return the logger associated with the module."""
return self.__logger
@final
def get_name(self) -> str:
"""Return the name of the module."""
return self.__name
@final
def get_version(self) -> str:
"""Return the version of the module."""
return self.__version
@final
def set_output(self, **kwargs: Any) -> None:
"""Set custom output data to be included in results.json.
Call this from _run() to add module-specific fields to the output.
:param kwargs: Key-value pairs to include in the output.
Example:
self.set_output(
total_targets=4,
valid_targets=["target1", "target2"],
results=[...]
)
"""
self.__output_data.update(kwargs)
@final
def emit_progress(
self,
progress: int,
status: str = "running",
message: str = "",
metrics: dict[str, Any] | None = None,
current_task: str = "",
) -> None:
"""Emit a progress update to the progress file.
This method writes to /data/output/progress.json which can be polled
by the orchestrator or UI to show real-time progress.
:param progress: Progress percentage (0-100).
:param status: Current status ("initializing", "running", "completed", "failed").
:param message: Human-readable status message.
:param metrics: Dictionary of metrics (e.g., {"executions": 1000, "coverage": 50}).
:param current_task: Name of the current task being performed.
"""
elapsed = time.time() - self.__start_time
progress_data = {
"module": self.__name,
"version": self.__version,
"status": status,
"progress": max(0, min(100, progress)),
"message": message,
"current_task": current_task,
"elapsed_seconds": round(elapsed, 2),
"timestamp": datetime.now(timezone.utc).isoformat(),
"metrics": metrics or {},
}
PATH_TO_PROGRESS.write_text(json.dumps(progress_data, indent=2))
@final
def emit_event(self, event: str, **data: Any) -> None:
"""Emit a streaming event to the stream file.
This method appends to /data/output/stream.jsonl which can be tailed
by the orchestrator or UI for real-time event streaming.
:param event: Event type (e.g., "crash_found", "target_started", "metrics").
:param data: Additional event data as keyword arguments.
"""
elapsed = time.time() - self.__start_time
event_data = {
"timestamp": datetime.now(timezone.utc).isoformat(),
"elapsed_seconds": round(elapsed, 2),
"module": self.__name,
"event": event,
**data,
}
# Append to stream file (create if doesn't exist)
with PATH_TO_STREAM.open("a") as f:
f.write(json.dumps(event_data) + "\n")
@final
def get_elapsed_seconds(self) -> float:
"""Return the elapsed time since module start.
:returns: Elapsed time in seconds.
"""
return time.time() - self.__start_time
@final
def _register_artifact(self, name: str, kind: FuzzForgeModuleArtifacts, description: str, path: Path) -> None:
"""Register an artifact.
:param name: The name of the artifact.
:param kind: The type of the artifact.
:param description: The description of the artifact.
:param path: The path of the artifact on the file system.
"""
source: Path = path.resolve(strict=True)
destination: Path = PATH_TO_ARTIFACTS.joinpath(name).resolve()
if destination.parent != PATH_TO_ARTIFACTS:
message: str = f"path '{destination} is not a direct descendant of path '{PATH_TO_ARTIFACTS}'"
raise FuzzForgeModuleError(message)
if destination.exists(follow_symlinks=False):
if destination.is_file() or destination.is_symlink():
destination.unlink()
elif destination.is_dir():
rmtree(destination)
else:
message = f"unable to remove resource at path '{destination}': unsupported resource type"
raise FuzzForgeModuleError(message)
destination.parent.mkdir(exist_ok=True, parents=True)
source.copy(destination)
self.__artifacts[name] = FuzzForgeModuleArtifact(
description=description,
kind=kind,
name=name,
path=path,
)
@final
def main(self) -> None:
"""TODO."""
result = FuzzForgeModuleResults.SUCCESS
try:
buffer: bytes = PATH_TO_INPUT.read_bytes()
data = self._get_input_type().model_validate_json(buffer)
self._prepare(settings=data.settings)
except: # noqa: E722
self.get_logger().exception(event="exception during 'prepare' step")
result = FuzzForgeModuleResults.FAILURE
if result != FuzzForgeModuleResults.FAILURE:
try:
result = self._run(resources=data.resources)
except: # noqa: E722
self.get_logger().exception(event="exception during 'run' step")
result = FuzzForgeModuleResults.FAILURE
if result != FuzzForgeModuleResults.FAILURE:
try:
self._cleanup(settings=data.settings)
except: # noqa: E722
self.get_logger().exception(event="exception during 'cleanup' step")
output = self._get_output_type()(
artifacts=list(self.__artifacts.values()),
logs=PATH_TO_LOGS,
result=result,
**self.__output_data,
)
buffer = output.model_dump_json().encode("utf-8")
PATH_TO_RESULTS.parent.mkdir(exist_ok=True, parents=True)
PATH_TO_RESULTS.write_bytes(buffer)
@classmethod
@abstractmethod
def _get_input_type(cls) -> type[FuzzForgeModuleInputBase[Any]]:
"""TODO."""
message: str = f"method '_get_input_type' is not implemented for class '{cls.__name__}'"
raise NotImplementedError(message)
@classmethod
@abstractmethod
def _get_output_type(cls) -> type[FuzzForgeModuleOutputBase]:
"""TODO."""
message: str = f"method '_get_output_type' is not implemented for class '{cls.__name__}'"
raise NotImplementedError(message)
@abstractmethod
def _prepare(self, settings: FuzzForgeModulesSettingsType) -> None:
"""TODO.
:param settings: TODO.
"""
message: str = f"method '_prepare' is not implemented for class '{self.__class__.__name__}'"
raise NotImplementedError(message)
@abstractmethod
def _run(self, resources: list[FuzzForgeModuleResource]) -> FuzzForgeModuleResults:
"""TODO.
:param resources: TODO.
:returns: TODO.
"""
message: str = f"method '_run' is not implemented for class '{self.__class__.__name__}'"
raise NotImplementedError(message)
@abstractmethod
def _cleanup(self, settings: FuzzForgeModulesSettingsType) -> None:
"""TODO.
:param settings: TODO.
"""
message: str = f"method '_cleanup' is not implemented for class '{self.__class__.__name__}'"
raise NotImplementedError(message)

View File

@@ -0,0 +1,20 @@
FROM docker.io/debian:trixie as base
COPY --from=ghcr.io/astral-sh/uv:0.9.10 /uv /uvx /bin/
FROM base as builder
WORKDIR /sdk
COPY ./src /sdk/src
COPY ./pyproject.toml /sdk/pyproject.toml
RUN uv build --wheel -o /sdk/distributions
FROM base as final
COPY --from=builder /sdk/distributions /wheels
WORKDIR /app
CMD [ "/usr/bin/sleep", "infinity" ]

View File

@@ -0,0 +1 @@
../../../pyproject.toml

View File

@@ -0,0 +1,9 @@
FROM localhost/fuzzforge-modules-sdk:0.0.1
COPY ./src /app/src
COPY ./pyproject.toml /app/pyproject.toml
# Remove workspace reference since we're using wheels
RUN sed -i '/\[tool\.uv\.sources\]/,/^$/d' /app/pyproject.toml
RUN uv sync --find-links /wheels

View File

@@ -0,0 +1,45 @@
PACKAGE=$(word 1, $(shell uv version))
VERSION=$(word 2, $(shell uv version))
PODMAN?=/usr/bin/podman
SOURCES=./src
TESTS=./tests
.PHONY: bandit build clean format mypy pytest ruff version
bandit:
uv run bandit --recursive $(SOURCES)
build:
$(PODMAN) build --file ./Dockerfile --no-cache --tag $(PACKAGE):$(VERSION)
save: build
$(PODMAN) save --format oci-archive --output /tmp/$(PACKAGE)-$(VERSION).oci $(PACKAGE):$(VERSION)
clean:
@find . -type d \( \
-name '*.egg-info' \
-o -name '.mypy_cache' \
-o -name '.pytest_cache' \
-o -name '.ruff_cache' \
-o -name '__pycache__' \
\) -printf 'removing directory %p\n' -exec rm -rf {} +
cloc:
cloc $(SOURCES)
format:
uv run ruff format $(SOURCES) $(TESTS)
mypy:
uv run mypy $(SOURCES)
pytest:
uv run pytest $(TESTS)
ruff:
uv run ruff check --fix $(SOURCES) $(TESTS)
version:
@echo '$(PACKAGE)@$(VERSION)'

View File

@@ -0,0 +1,46 @@
# FuzzForge Modules - FIXME
## Installation
### Python
```shell
# install the package (users)
uv sync
# install the package and all development dependencies (developers)
uv sync --all-extras
```
### Container
```shell
# build the image
make build
# run the container
mkdir -p "${PWD}/data" "${PWD}/data/input" "${PWD}/data/output"
echo '{"settings":{},"resources":[]}' > "${PWD}/data/input/input.json"
podman run --rm \
--volume "${PWD}/data:/data" \
'<name>:<version>' 'uv run module'
```
## Usage
```shell
uv run module
```
## Development tools
```shell
# run ruff (formatter)
make format
# run mypy (type checker)
make mypy
# run tests (pytest)
make pytest
# run ruff (linter)
make ruff
```
See the file `Makefile` at the root of this directory for more tools.

View File

@@ -0,0 +1,6 @@
[mypy]
plugins = pydantic.mypy
strict = True
warn_unused_ignores = True
warn_redundant_casts = True
warn_return_any = True

View File

@@ -0,0 +1,31 @@
[project]
name = "fuzzforge-module-template"
version = "0.0.1"
description = "FIXME"
authors = []
readme = "README.md"
requires-python = ">=3.14"
dependencies = [
"fuzzforge-modules-sdk==0.0.1",
"pydantic==2.12.4",
"structlog==25.5.0",
]
[project.optional-dependencies]
lints = [
"bandit==1.8.6",
"mypy==1.18.2",
"ruff==0.14.4",
]
tests = [
"pytest==9.0.2",
]
[project.scripts]
module = "module.__main__:main"
[tool.uv.sources]
fuzzforge-modules-sdk = { workspace = true }
[tool.uv]
package = true

View File

@@ -0,0 +1,19 @@
line-length = 120
[lint]
select = [ "ALL" ]
ignore = [
"COM812", # conflicts with the formatter
"D100", # ignoring missing docstrings in public modules
"D104", # ignoring missing docstrings in public packages
"D203", # conflicts with 'D211'
"D213", # conflicts with 'D212'
"TD002", # ignoring missing author in 'TODO' statements
"TD003", # ignoring missing issue link in 'TODO' statements
]
[lint.per-file-ignores]
"tests/*" = [
"PLR2004", # allowing comparisons using unamed numerical constants in tests
"S101", # allowing 'assert' statements in tests
]

View File

@@ -0,0 +1,19 @@
from typing import TYPE_CHECKING
from fuzzforge_modules_sdk.api import logs
from module.mod import Module
if TYPE_CHECKING:
from fuzzforge_modules_sdk.api.modules.base import FuzzForgeModule
def main() -> None:
"""TODO."""
logs.configure()
module: FuzzForgeModule = Module()
module.main()
if __name__ == "__main__":
main()

View File

@@ -0,0 +1,54 @@
from __future__ import annotations
from typing import TYPE_CHECKING
from fuzzforge_modules_sdk.api.models import FuzzForgeModuleResults
from fuzzforge_modules_sdk.api.modules.base import FuzzForgeModule
from module.models import Input, Output
if TYPE_CHECKING:
from fuzzforge_modules_sdk.api.models import FuzzForgeModuleResource, FuzzForgeModulesSettingsType
class Module(FuzzForgeModule):
"""TODO."""
def __init__(self) -> None:
"""Initialize an instance of the class."""
name: str = "FIXME"
version: str = "FIXME"
FuzzForgeModule.__init__(self, name=name, version=version)
@classmethod
def _get_input_type(cls) -> type[Input]:
"""TODO."""
return Input
@classmethod
def _get_output_type(cls) -> type[Output]:
"""TODO."""
return Output
def _prepare(self, settings: FuzzForgeModulesSettingsType) -> None:
"""TODO.
:param settings: TODO.
"""
def _run(self, resources: list[FuzzForgeModuleResource]) -> FuzzForgeModuleResults: # noqa: ARG002
"""TODO.
:param resources: TODO.
:returns: TODO.
"""
return FuzzForgeModuleResults.SUCCESS
def _cleanup(self, settings: FuzzForgeModulesSettingsType) -> None:
"""TODO.
:param settings: TODO.
"""

View File

@@ -0,0 +1,11 @@
from fuzzforge_modules_sdk.api.models import FuzzForgeModuleInputBase, FuzzForgeModuleOutputBase
from module.settings import Settings
class Input(FuzzForgeModuleInputBase[Settings]):
"""TODO."""
class Output(FuzzForgeModuleOutputBase):
"""TODO."""

View File

@@ -0,0 +1,7 @@
from fuzzforge_modules_sdk.api.models import FuzzForgeModulesSettingsBase
class Settings(FuzzForgeModulesSettingsBase):
"""TODO."""
# Here goes your attributes

View File

@@ -0,0 +1,23 @@
FROM localhost/fuzzforge-modules-sdk:0.1.0
# Install build tools and Rust nightly for compiling fuzz harnesses
RUN apt-get update && apt-get install -y \
curl \
build-essential \
pkg-config \
libssl-dev \
&& rm -rf /var/lib/apt/lists/*
RUN curl --proto '=https' --tlsv1.2 -sSf https://sh.rustup.rs | sh -s -- -y --default-toolchain nightly
ENV PATH="/root/.cargo/bin:${PATH}"
# Install cargo-fuzz for validation
RUN cargo install cargo-fuzz --locked || true
COPY ./src /app/src
COPY ./pyproject.toml /app/pyproject.toml
# Remove workspace reference since we're using wheels
RUN sed -i '/\[tool\.uv\.sources\]/,/^$/d' /app/pyproject.toml
RUN uv sync --find-links /wheels

View File

@@ -0,0 +1,45 @@
PACKAGE=$(word 1, $(shell uv version))
VERSION=$(word 2, $(shell uv version))
PODMAN?=/usr/bin/podman
SOURCES=./src
TESTS=./tests
.PHONY: bandit build clean format mypy pytest ruff version
bandit:
uv run bandit --recursive $(SOURCES)
build:
$(PODMAN) build --file ./Dockerfile --no-cache --tag $(PACKAGE):$(VERSION)
save: build
$(PODMAN) save --format oci-archive --output /tmp/$(PACKAGE)-$(VERSION).oci $(PACKAGE):$(VERSION)
clean:
@find . -type d \( \
-name '*.egg-info' \
-o -name '.mypy_cache' \
-o -name '.pytest_cache' \
-o -name '.ruff_cache' \
-o -name '__pycache__' \
\) -printf 'removing directory %p\n' -exec rm -rf {} +
cloc:
cloc $(SOURCES)
format:
uv run ruff format $(SOURCES) $(TESTS)
mypy:
uv run mypy $(SOURCES)
pytest:
uv run pytest $(TESTS)
ruff:
uv run ruff check --fix $(SOURCES) $(TESTS)
version:
@echo '$(PACKAGE)@$(VERSION)'

View File

@@ -0,0 +1,46 @@
# FuzzForge Modules - FIXME
## Installation
### Python
```shell
# install the package (users)
uv sync
# install the package and all development dependencies (developers)
uv sync --all-extras
```
### Container
```shell
# build the image
make build
# run the container
mkdir -p "${PWD}/data" "${PWD}/data/input" "${PWD}/data/output"
echo '{"settings":{},"resources":[]}' > "${PWD}/data/input/input.json"
podman run --rm \
--volume "${PWD}/data:/data" \
'<name>:<version>' 'uv run module'
```
## Usage
```shell
uv run module
```
## Development tools
```shell
# run ruff (formatter)
make format
# run mypy (type checker)
make mypy
# run tests (pytest)
make pytest
# run ruff (linter)
make ruff
```
See the file `Makefile` at the root of this directory for more tools.

View File

@@ -0,0 +1,6 @@
[mypy]
plugins = pydantic.mypy
strict = True
warn_unused_ignores = True
warn_redundant_casts = True
warn_return_any = True

View File

@@ -0,0 +1,31 @@
[project]
name = "harness-validator"
version = "0.1.0"
description = "FuzzForge module that validates fuzz harnesses compile correctly"
authors = []
readme = "README.md"
requires-python = ">=3.14"
dependencies = [
"fuzzforge-modules-sdk==0.0.1",
"pydantic==2.12.4",
"structlog==25.5.0",
]
[project.optional-dependencies]
lints = [
"bandit==1.8.6",
"mypy==1.18.2",
"ruff==0.14.4",
]
tests = [
"pytest==9.0.2",
]
[project.scripts]
module = "module.__main__:main"
[tool.uv.sources]
fuzzforge-modules-sdk = { workspace = true }
[tool.uv]
package = true

View File

@@ -0,0 +1,19 @@
line-length = 120
[lint]
select = [ "ALL" ]
ignore = [
"COM812", # conflicts with the formatter
"D100", # ignoring missing docstrings in public modules
"D104", # ignoring missing docstrings in public packages
"D203", # conflicts with 'D211'
"D213", # conflicts with 'D212'
"TD002", # ignoring missing author in 'TODO' statements
"TD003", # ignoring missing issue link in 'TODO' statements
]
[lint.per-file-ignores]
"tests/*" = [
"PLR2004", # allowing comparisons using unamed numerical constants in tests
"S101", # allowing 'assert' statements in tests
]

View File

@@ -0,0 +1,19 @@
from typing import TYPE_CHECKING
from fuzzforge_modules_sdk.api import logs
from module.mod import Module
if TYPE_CHECKING:
from fuzzforge_modules_sdk.api.modules.base import FuzzForgeModule
def main() -> None:
"""TODO."""
logs.configure()
module: FuzzForgeModule = Module()
module.main()
if __name__ == "__main__":
main()

View File

@@ -0,0 +1,309 @@
"""Harness Validator module for FuzzForge.
This module validates that fuzz harnesses compile correctly.
It takes a Rust project with a fuzz directory containing harnesses
and runs cargo build to verify they compile.
"""
from __future__ import annotations
import json
import subprocess
import os
from pathlib import Path
from typing import TYPE_CHECKING
import structlog
from fuzzforge_modules_sdk.api.constants import PATH_TO_INPUTS, PATH_TO_OUTPUTS
from fuzzforge_modules_sdk.api.models import FuzzForgeModuleResults
from fuzzforge_modules_sdk.api.modules.base import FuzzForgeModule
from module.models import Input, Output, ValidationResult, HarnessStatus
from module.settings import Settings
if TYPE_CHECKING:
from fuzzforge_modules_sdk.api.models import FuzzForgeModuleResource
logger = structlog.get_logger()
class Module(FuzzForgeModule):
"""Harness Validator module - validates that fuzz harnesses compile."""
_settings: Settings | None
_results: list[ValidationResult]
def __init__(self) -> None:
"""Initialize an instance of the class."""
name: str = "harness-validator"
version: str = "0.1.0"
FuzzForgeModule.__init__(self, name=name, version=version)
self._settings = None
self._results = []
@classmethod
def _get_input_type(cls) -> type[Input]:
"""Return the input type."""
return Input
@classmethod
def _get_output_type(cls) -> type[Output]:
"""Return the output type."""
return Output
def _prepare(self, settings: Settings) -> None: # type: ignore[override]
"""Prepare the module.
:param settings: Module settings.
"""
self._settings = settings
logger.info("harness-validator preparing", settings=settings.model_dump() if settings else {})
def _run(self, resources: list[FuzzForgeModuleResource]) -> FuzzForgeModuleResults:
"""Run the harness validator.
:param resources: Input resources (fuzz project directory).
:returns: Module execution result.
"""
logger.info("harness-validator starting", resource_count=len(resources))
# Find the fuzz project directory
fuzz_project_src = self._find_fuzz_project(resources)
if fuzz_project_src is None:
logger.error("No fuzz project found in resources")
return FuzzForgeModuleResults.FAILURE
logger.info("Found fuzz project", path=str(fuzz_project_src))
# Copy the project to a writable location since /data/input is read-only
# and cargo needs to write Cargo.lock and build artifacts
import shutil
work_dir = Path("/tmp/fuzz-build")
if work_dir.exists():
shutil.rmtree(work_dir)
# Copy entire project root (parent of fuzz directory)
project_root = fuzz_project_src.parent
work_project = work_dir / project_root.name
shutil.copytree(project_root, work_project, dirs_exist_ok=True)
# Adjust fuzz_project to point to the copied location
fuzz_project = work_dir / project_root.name / fuzz_project_src.name
logger.info("Copied project to writable location", work_dir=str(fuzz_project))
# Find all harness targets
targets = self._find_harness_targets(fuzz_project)
if not targets:
logger.error("No harness targets found")
return FuzzForgeModuleResults.FAILURE
logger.info("Found harness targets", count=len(targets))
# Validate each harness
all_valid = True
for target in targets:
result = self._validate_harness(fuzz_project, target)
self._results.append(result)
if result.status != HarnessStatus.VALID:
all_valid = False
logger.warning("Harness validation failed",
target=target,
status=result.status.value,
errors=result.errors)
else:
logger.info("Harness valid", target=target)
# Set output data for results.json
valid_targets = [r.target for r in self._results if r.status == HarnessStatus.VALID]
invalid_targets = [r.target for r in self._results if r.status != HarnessStatus.VALID]
self.set_output(
fuzz_project=str(fuzz_project),
total_targets=len(self._results),
valid_count=len(valid_targets),
invalid_count=len(invalid_targets),
valid_targets=valid_targets,
invalid_targets=invalid_targets,
results=[r.model_dump() for r in self._results],
)
valid_count = sum(1 for r in self._results if r.status == HarnessStatus.VALID)
logger.info("harness-validator completed",
total=len(self._results),
valid=valid_count,
invalid=len(self._results) - valid_count)
return FuzzForgeModuleResults.SUCCESS
def _cleanup(self, settings: Settings) -> None: # type: ignore[override]
"""Clean up after execution.
:param settings: Module settings.
"""
pass
def _find_fuzz_project(self, resources: list[FuzzForgeModuleResource]) -> Path | None:
"""Find the fuzz project directory in the resources.
:param resources: List of input resources.
:returns: Path to fuzz project or None.
"""
for resource in resources:
path = Path(resource.path)
# Check if it's a fuzz directory with Cargo.toml
if path.is_dir():
cargo_toml = path / "Cargo.toml"
if cargo_toml.exists():
# Check if it has fuzz_targets directory
fuzz_targets = path / "fuzz_targets"
if fuzz_targets.is_dir():
return path
# Check for fuzz subdirectory
fuzz_dir = path / "fuzz"
if fuzz_dir.is_dir():
cargo_toml = fuzz_dir / "Cargo.toml"
if cargo_toml.exists():
return fuzz_dir
return None
def _find_harness_targets(self, fuzz_project: Path) -> list[str]:
"""Find all harness target names in the fuzz project.
:param fuzz_project: Path to the fuzz project.
:returns: List of target names.
"""
targets = []
fuzz_targets_dir = fuzz_project / "fuzz_targets"
if fuzz_targets_dir.is_dir():
for rs_file in fuzz_targets_dir.glob("*.rs"):
# Target name is the file name without extension
target_name = rs_file.stem
targets.append(target_name)
return targets
def _validate_harness(self, fuzz_project: Path, target: str) -> ValidationResult:
"""Validate a single harness by compiling it.
:param fuzz_project: Path to the fuzz project.
:param target: Name of the harness target.
:returns: Validation result.
"""
harness_file = fuzz_project / "fuzz_targets" / f"{target}.rs"
if not harness_file.exists():
return ValidationResult(
target=target,
file_path=str(harness_file),
status=HarnessStatus.NOT_FOUND,
errors=["Harness file not found"],
)
# Try to compile just this target
try:
env = os.environ.copy()
env["CARGO_INCREMENTAL"] = "0"
result = subprocess.run(
[
"cargo", "build",
"--bin", target,
"--message-format=json",
],
cwd=fuzz_project,
capture_output=True,
text=True,
timeout=self._settings.compile_timeout if self._settings else 120,
env=env,
)
# Parse cargo output for errors
errors = []
warnings = []
for line in result.stdout.splitlines():
try:
msg = json.loads(line)
if msg.get("reason") == "compiler-message":
message = msg.get("message", {})
level = message.get("level", "")
rendered = message.get("rendered", "")
if level == "error":
errors.append(rendered.strip())
elif level == "warning":
warnings.append(rendered.strip())
except json.JSONDecodeError:
pass
# Also check stderr for any cargo errors
if result.returncode != 0 and not errors:
errors.append(result.stderr.strip() if result.stderr else "Build failed with unknown error")
if result.returncode == 0:
return ValidationResult(
target=target,
file_path=str(harness_file),
status=HarnessStatus.VALID,
errors=[],
warnings=warnings,
)
else:
return ValidationResult(
target=target,
file_path=str(harness_file),
status=HarnessStatus.COMPILE_ERROR,
errors=errors,
warnings=warnings,
)
except subprocess.TimeoutExpired:
return ValidationResult(
target=target,
file_path=str(harness_file),
status=HarnessStatus.TIMEOUT,
errors=["Compilation timed out"],
)
except Exception as e:
return ValidationResult(
target=target,
file_path=str(harness_file),
status=HarnessStatus.ERROR,
errors=[str(e)],
)
def _write_output(self, fuzz_project: Path) -> None:
"""Write the validation results to output.
:param fuzz_project: Path to the fuzz project.
"""
output_path = PATH_TO_OUTPUTS / "validation.json"
output_path.parent.mkdir(parents=True, exist_ok=True)
valid_targets = [r.target for r in self._results if r.status == HarnessStatus.VALID]
invalid_targets = [r.target for r in self._results if r.status != HarnessStatus.VALID]
output_data = {
"fuzz_project": str(fuzz_project),
"total_targets": len(self._results),
"valid_count": len(valid_targets),
"invalid_count": len(invalid_targets),
"valid_targets": valid_targets,
"invalid_targets": invalid_targets,
"results": [r.model_dump() for r in self._results],
}
output_path.write_text(json.dumps(output_data, indent=2))
logger.info("wrote validation results", path=str(output_path))

View File

@@ -0,0 +1,71 @@
"""Models for the harness-validator module."""
from enum import Enum
from pydantic import BaseModel, Field
from fuzzforge_modules_sdk.api.models import FuzzForgeModuleInputBase, FuzzForgeModuleOutputBase
from module.settings import Settings
class HarnessStatus(str, Enum):
"""Status of harness validation."""
VALID = "valid"
COMPILE_ERROR = "compile_error"
NOT_FOUND = "not_found"
TIMEOUT = "timeout"
ERROR = "error"
class ValidationResult(BaseModel):
"""Result of validating a single harness."""
#: Name of the harness target
target: str
#: Path to the harness file
file_path: str
#: Validation status
status: HarnessStatus
#: Compilation errors (if any)
errors: list[str] = Field(default_factory=list)
#: Compilation warnings (if any)
warnings: list[str] = Field(default_factory=list)
class Input(FuzzForgeModuleInputBase[Settings]):
"""Input for the harness-validator module.
Expects a fuzz project directory with:
- Cargo.toml
- fuzz_targets/ directory with .rs harness files
"""
class Output(FuzzForgeModuleOutputBase):
"""Output from the harness-validator module."""
#: Path to the fuzz project
fuzz_project: str = ""
#: Total number of harness targets
total_targets: int = 0
#: Number of valid (compilable) harnesses
valid_count: int = 0
#: Number of invalid harnesses
invalid_count: int = 0
#: List of valid target names (ready for fuzzing)
valid_targets: list[str] = Field(default_factory=list)
#: List of invalid target names (need fixes)
invalid_targets: list[str] = Field(default_factory=list)
#: Detailed validation results per target
results: list[ValidationResult] = Field(default_factory=list)

View File

@@ -0,0 +1,13 @@
"""Settings for the harness-validator module."""
from fuzzforge_modules_sdk.api.models import FuzzForgeModulesSettingsBase
class Settings(FuzzForgeModulesSettingsBase):
"""Settings for the harness-validator module."""
#: Timeout for compiling each harness (seconds)
compile_timeout: int = 120
#: Whether to stop on first error
fail_fast: bool = False

View File

@@ -0,0 +1,25 @@
FROM localhost/fuzzforge-modules-sdk:0.1.0
# Install system dependencies
RUN apt-get update && apt-get install -y \
curl \
build-essential \
pkg-config \
libssl-dev \
&& rm -rf /var/lib/apt/lists/*
# Install Rust toolchain
RUN curl --proto '=https' --tlsv1.2 -sSf https://sh.rustup.rs | sh -s -- -y
ENV PATH="/root/.cargo/bin:${PATH}"
# Install Rust analysis tools (skipping cargo-geiger as it's heavy)
# RUN cargo install cargo-geiger --locked || true
RUN cargo install cargo-audit --locked || true
COPY ./src /app/src
COPY ./pyproject.toml /app/pyproject.toml
# Remove workspace reference since we're using wheels
RUN sed -i '/\[tool\.uv\.sources\]/,/^$/d' /app/pyproject.toml
RUN uv sync --find-links /wheels

View File

@@ -0,0 +1,45 @@
PACKAGE=$(word 1, $(shell uv version))
VERSION=$(word 2, $(shell uv version))
PODMAN?=/usr/bin/podman
SOURCES=./src
TESTS=./tests
.PHONY: bandit build clean format mypy pytest ruff version
bandit:
uv run bandit --recursive $(SOURCES)
build:
$(PODMAN) build --file ./Dockerfile --no-cache --tag $(PACKAGE):$(VERSION)
save: build
$(PODMAN) save --format oci-archive --output /tmp/$(PACKAGE)-$(VERSION).oci $(PACKAGE):$(VERSION)
clean:
@find . -type d \( \
-name '*.egg-info' \
-o -name '.mypy_cache' \
-o -name '.pytest_cache' \
-o -name '.ruff_cache' \
-o -name '__pycache__' \
\) -printf 'removing directory %p\n' -exec rm -rf {} +
cloc:
cloc $(SOURCES)
format:
uv run ruff format $(SOURCES) $(TESTS)
mypy:
uv run mypy $(SOURCES)
pytest:
uv run pytest $(TESTS)
ruff:
uv run ruff check --fix $(SOURCES) $(TESTS)
version:
@echo '$(PACKAGE)@$(VERSION)'

View File

@@ -0,0 +1,46 @@
# FuzzForge Modules - FIXME
## Installation
### Python
```shell
# install the package (users)
uv sync
# install the package and all development dependencies (developers)
uv sync --all-extras
```
### Container
```shell
# build the image
make build
# run the container
mkdir -p "${PWD}/data" "${PWD}/data/input" "${PWD}/data/output"
echo '{"settings":{},"resources":[]}' > "${PWD}/data/input/input.json"
podman run --rm \
--volume "${PWD}/data:/data" \
'<name>:<version>' 'uv run module'
```
## Usage
```shell
uv run module
```
## Development tools
```shell
# run ruff (formatter)
make format
# run mypy (type checker)
make mypy
# run tests (pytest)
make pytest
# run ruff (linter)
make ruff
```
See the file `Makefile` at the root of this directory for more tools.

View File

@@ -0,0 +1,6 @@
[mypy]
plugins = pydantic.mypy
strict = True
warn_unused_ignores = True
warn_redundant_casts = True
warn_return_any = True

View File

@@ -0,0 +1,28 @@
[project]
name = "rust-analyzer"
version = "0.0.1"
description = "FIXME"
authors = []
readme = "README.md"
requires-python = ">=3.14"
dependencies = [
"fuzzforge-modules-sdk==0.0.1",
"pydantic==2.12.4",
"structlog==25.5.0",
]
[project.optional-dependencies]
lints = [
"bandit==1.8.6",
"mypy==1.18.2",
"ruff==0.14.4",
]
tests = [
"pytest==9.0.2",
]
[project.scripts]
module = "module.__main__:main"
[tool.uv]
package = true

View File

@@ -0,0 +1,19 @@
line-length = 120
[lint]
select = [ "ALL" ]
ignore = [
"COM812", # conflicts with the formatter
"D100", # ignoring missing docstrings in public modules
"D104", # ignoring missing docstrings in public packages
"D203", # conflicts with 'D211'
"D213", # conflicts with 'D212'
"TD002", # ignoring missing author in 'TODO' statements
"TD003", # ignoring missing issue link in 'TODO' statements
]
[lint.per-file-ignores]
"tests/*" = [
"PLR2004", # allowing comparisons using unamed numerical constants in tests
"S101", # allowing 'assert' statements in tests
]

View File

@@ -0,0 +1,19 @@
from typing import TYPE_CHECKING
from fuzzforge_modules_sdk.api import logs
from module.mod import Module
if TYPE_CHECKING:
from fuzzforge_modules_sdk.api.modules.base import FuzzForgeModule
def main() -> None:
"""TODO."""
logs.configure()
module: FuzzForgeModule = Module()
module.main()
if __name__ == "__main__":
main()

View File

@@ -0,0 +1,314 @@
"""Rust Analyzer module for FuzzForge.
This module analyzes Rust source code to identify fuzzable entry points,
unsafe blocks, and known vulnerabilities.
"""
from __future__ import annotations
import json
import re
import subprocess
from pathlib import Path
from typing import TYPE_CHECKING
from fuzzforge_modules_sdk.api.constants import PATH_TO_OUTPUTS
from fuzzforge_modules_sdk.api.models import FuzzForgeModuleResults
from fuzzforge_modules_sdk.api.modules.base import FuzzForgeModule
from module.models import AnalysisResult, EntryPoint, Input, Output, UnsafeBlock, Vulnerability
from module.settings import Settings
if TYPE_CHECKING:
from fuzzforge_modules_sdk.api.models import FuzzForgeModuleResource
class Module(FuzzForgeModule):
"""Rust Analyzer module - analyzes Rust code for fuzzable entry points."""
def __init__(self) -> None:
"""Initialize an instance of the class."""
name: str = "rust-analyzer"
version: str = "0.1.0"
FuzzForgeModule.__init__(self, name=name, version=version)
self._project_path: Path | None = None
self._settings: Settings | None = None
@classmethod
def _get_input_type(cls) -> type[Input]:
"""Return the input type."""
return Input
@classmethod
def _get_output_type(cls) -> type[Output]:
"""Return the output type."""
return Output
def _prepare(self, settings: Settings) -> None: # type: ignore[override]
"""Prepare the module.
:param settings: Module settings.
"""
self._settings = settings
def _find_cargo_toml(self, resources: list[FuzzForgeModuleResource]) -> Path | None:
"""Find the Cargo.toml file in the resources.
:param resources: List of input resources.
:returns: Path to Cargo.toml or None.
"""
for resource in resources:
if resource.path.name == "Cargo.toml":
return resource.path
# Check if resource is a directory containing Cargo.toml
cargo_path = resource.path / "Cargo.toml"
if cargo_path.exists():
return cargo_path
return None
def _parse_cargo_toml(self, cargo_path: Path) -> tuple[str, str, str]:
"""Parse Cargo.toml to extract crate name, version, and lib name.
:param cargo_path: Path to Cargo.toml.
:returns: Tuple of (crate_name, version, lib_name).
"""
import tomllib
with cargo_path.open("rb") as f:
data = tomllib.load(f)
package = data.get("package", {})
crate_name = package.get("name", "unknown")
version = package.get("version", "0.0.0")
# Get lib name - defaults to crate name with dashes converted to underscores
lib_section = data.get("lib", {})
lib_name = lib_section.get("name", crate_name.replace("-", "_"))
return crate_name, version, lib_name
def _find_entry_points(self, project_path: Path) -> list[EntryPoint]:
"""Find fuzzable entry points in the Rust source.
:param project_path: Path to the Rust project.
:returns: List of entry points.
"""
entry_points: list[EntryPoint] = []
# Patterns for fuzzable functions (take &[u8], &str, or impl Read)
fuzzable_patterns = [
r"pub\s+fn\s+(\w+)\s*\([^)]*&\[u8\][^)]*\)",
r"pub\s+fn\s+(\w+)\s*\([^)]*&str[^)]*\)",
r"pub\s+fn\s+(\w+)\s*\([^)]*impl\s+Read[^)]*\)",
r"pub\s+fn\s+(\w+)\s*\([^)]*data:\s*&\[u8\][^)]*\)",
r"pub\s+fn\s+(\w+)\s*\([^)]*input:\s*&\[u8\][^)]*\)",
r"pub\s+fn\s+(\w+)\s*\([^)]*buf:\s*&\[u8\][^)]*\)",
]
# Also find parse/decode functions
parser_patterns = [
r"pub\s+fn\s+(parse\w*)\s*\([^)]*\)",
r"pub\s+fn\s+(decode\w*)\s*\([^)]*\)",
r"pub\s+fn\s+(deserialize\w*)\s*\([^)]*\)",
r"pub\s+fn\s+(from_bytes\w*)\s*\([^)]*\)",
r"pub\s+fn\s+(read\w*)\s*\([^)]*\)",
]
src_path = project_path / "src"
if not src_path.exists():
src_path = project_path
for rust_file in src_path.rglob("*.rs"):
try:
content = rust_file.read_text()
lines = content.split("\n")
for line_num, line in enumerate(lines, 1):
# Check fuzzable patterns
for pattern in fuzzable_patterns:
match = re.search(pattern, line)
if match:
entry_points.append(
EntryPoint(
function=match.group(1),
file=str(rust_file.relative_to(project_path)),
line=line_num,
signature=line.strip(),
fuzzable=True,
)
)
# Check parser patterns (may need manual review)
for pattern in parser_patterns:
match = re.search(pattern, line)
if match:
# Avoid duplicates
func_name = match.group(1)
if not any(ep.function == func_name for ep in entry_points):
entry_points.append(
EntryPoint(
function=func_name,
file=str(rust_file.relative_to(project_path)),
line=line_num,
signature=line.strip(),
fuzzable=True,
)
)
except Exception:
continue
return entry_points
def _find_unsafe_blocks(self, project_path: Path) -> list[UnsafeBlock]:
"""Find unsafe blocks in the Rust source.
:param project_path: Path to the Rust project.
:returns: List of unsafe blocks.
"""
unsafe_blocks: list[UnsafeBlock] = []
src_path = project_path / "src"
if not src_path.exists():
src_path = project_path
for rust_file in src_path.rglob("*.rs"):
try:
content = rust_file.read_text()
lines = content.split("\n")
for line_num, line in enumerate(lines, 1):
if "unsafe" in line and ("{" in line or "fn" in line):
# Determine context
context = "unsafe block"
if "unsafe fn" in line:
context = "unsafe function"
elif "unsafe impl" in line:
context = "unsafe impl"
elif "*const" in line or "*mut" in line:
context = "raw pointer operation"
unsafe_blocks.append(
UnsafeBlock(
file=str(rust_file.relative_to(project_path)),
line=line_num,
context=context,
)
)
except Exception:
continue
return unsafe_blocks
def _run_cargo_audit(self, project_path: Path) -> list[Vulnerability]:
"""Run cargo-audit to find known vulnerabilities.
:param project_path: Path to the Rust project.
:returns: List of vulnerabilities.
"""
vulnerabilities: list[Vulnerability] = []
try:
result = subprocess.run(
["cargo", "audit", "--json"],
cwd=project_path,
capture_output=True,
text=True,
timeout=120,
)
if result.stdout:
audit_data = json.loads(result.stdout)
for vuln in audit_data.get("vulnerabilities", {}).get("list", []):
advisory = vuln.get("advisory", {})
vulnerabilities.append(
Vulnerability(
advisory_id=advisory.get("id", "UNKNOWN"),
crate_name=vuln.get("package", {}).get("name", "unknown"),
version=vuln.get("package", {}).get("version", "0.0.0"),
title=advisory.get("title", "Unknown vulnerability"),
severity=advisory.get("severity", "unknown"),
)
)
except (subprocess.TimeoutExpired, json.JSONDecodeError, FileNotFoundError):
pass
return vulnerabilities
def _run(self, resources: list[FuzzForgeModuleResource]) -> FuzzForgeModuleResults:
"""Run the analysis.
:param resources: Input resources.
:returns: Module result status.
"""
# Find the Rust project
cargo_path = self._find_cargo_toml(resources)
if cargo_path is None:
self.get_logger().error("No Cargo.toml found in resources")
return FuzzForgeModuleResults.FAILURE
project_path = cargo_path.parent
self._project_path = project_path
self.get_logger().info("Analyzing Rust project", project=str(project_path))
# Parse Cargo.toml
crate_name, crate_version, lib_name = self._parse_cargo_toml(cargo_path)
self.get_logger().info("Found crate", name=crate_name, version=crate_version, lib_name=lib_name)
# Find entry points
entry_points = self._find_entry_points(project_path)
self.get_logger().info("Found entry points", count=len(entry_points))
# Find unsafe blocks
unsafe_blocks = self._find_unsafe_blocks(project_path)
self.get_logger().info("Found unsafe blocks", count=len(unsafe_blocks))
# Run cargo-audit if enabled
vulnerabilities: list[Vulnerability] = []
if self._settings and self._settings.run_audit:
vulnerabilities = self._run_cargo_audit(project_path)
self.get_logger().info("Found vulnerabilities", count=len(vulnerabilities))
# Build result
analysis = AnalysisResult(
crate_name=crate_name,
crate_version=crate_version,
lib_name=lib_name,
entry_points=entry_points,
unsafe_blocks=unsafe_blocks,
vulnerabilities=vulnerabilities,
summary={
"entry_points": len(entry_points),
"unsafe_blocks": len(unsafe_blocks),
"vulnerabilities": len(vulnerabilities),
},
)
# Set output data for results.json
self.set_output(
analysis=analysis.model_dump(),
)
# Write analysis to output file (for backwards compatibility)
output_path = PATH_TO_OUTPUTS / "analysis.json"
output_path.parent.mkdir(parents=True, exist_ok=True)
output_path.write_text(analysis.model_dump_json(indent=2))
self.get_logger().info("Analysis complete", output=str(output_path))
return FuzzForgeModuleResults.SUCCESS
def _cleanup(self, settings: Settings) -> None: # type: ignore[override]
"""Clean up after execution.
:param settings: Module settings.
"""
pass

View File

@@ -0,0 +1,100 @@
"""Models for rust-analyzer module."""
from pathlib import Path
from pydantic import BaseModel
from fuzzforge_modules_sdk.api.models import FuzzForgeModuleInputBase, FuzzForgeModuleOutputBase
from module.settings import Settings
class Input(FuzzForgeModuleInputBase[Settings]):
"""Input for the rust-analyzer module."""
class EntryPoint(BaseModel):
"""A fuzzable entry point in the Rust codebase."""
#: Function name.
function: str
#: Source file path.
file: str
#: Line number.
line: int
#: Function signature.
signature: str
#: Whether the function takes &[u8] or similar fuzzable input.
fuzzable: bool = True
class UnsafeBlock(BaseModel):
"""An unsafe block detected in the codebase."""
#: Source file path.
file: str
#: Line number.
line: int
#: Context description.
context: str
class Vulnerability(BaseModel):
"""A known vulnerability from cargo-audit."""
#: Advisory ID (e.g., RUSTSEC-2021-0001).
advisory_id: str
#: Affected crate name.
crate_name: str
#: Affected version.
version: str
#: Vulnerability title.
title: str
#: Severity level.
severity: str
class AnalysisResult(BaseModel):
"""The complete analysis result."""
#: Crate name from Cargo.toml (use this in fuzz/Cargo.toml dependencies).
crate_name: str
#: Crate version.
crate_version: str
#: Library name for use in Rust code (use in `use` statements).
#: In Rust, dashes become underscores: "fuzz-demo" -> "fuzz_demo".
lib_name: str = ""
#: List of fuzzable entry points.
entry_points: list[EntryPoint]
#: List of unsafe blocks.
unsafe_blocks: list[UnsafeBlock]
#: List of known vulnerabilities.
vulnerabilities: list[Vulnerability]
#: Summary statistics.
summary: dict[str, int]
class Output(FuzzForgeModuleOutputBase):
"""Output for the rust-analyzer module."""
#: The analysis result (as dict for serialization).
analysis: dict | None = None
#: Path to the analysis JSON file.
analysis_file: Path | None = None

View File

@@ -0,0 +1,16 @@
"""Settings for rust-analyzer module."""
from fuzzforge_modules_sdk.api.models import FuzzForgeModulesSettingsBase
class Settings(FuzzForgeModulesSettingsBase):
"""Settings for the rust-analyzer module."""
#: Whether to run cargo-audit for CVE detection.
run_audit: bool = True
#: Whether to run cargo-geiger for unsafe detection.
run_geiger: bool = True
#: Maximum depth for dependency analysis.
max_depth: int = 3