feat(modules): add harness-tester module for Rust fuzzing pipeline

This commit is contained in:
AFredefon
2026-02-03 18:09:14 +01:00
parent f099bd018d
commit 8b8662d7af
35 changed files with 2571 additions and 280 deletions

USAGE.md
View File

@@ -33,18 +33,9 @@ This guide covers everything you need to know to get started with FuzzForge OSS
# 1. Clone and install # 1. Clone and install
git clone https://github.com/FuzzingLabs/fuzzforge-oss.git git clone https://github.com/FuzzingLabs/fuzzforge-oss.git
cd fuzzforge-oss cd fuzzforge-oss
uv sync --all-extras uv sync
# 2. Build the SDK and module images (one-time setup) # 2. Build the module images (one-time setup)
# First, build the SDK base image and wheel
cd fuzzforge-modules/fuzzforge-modules-sdk
uv build
mkdir -p .wheels
cp ../../dist/fuzzforge_modules_sdk-*.whl .wheels/
cd ../..
docker build -t localhost/fuzzforge-modules-sdk:0.1.0 fuzzforge-modules/fuzzforge-modules-sdk/
# Then build all modules
make build-modules make build-modules
# 3. Install MCP for your AI agent # 3. Install MCP for your AI agent
@@ -111,15 +102,10 @@ cd fuzzforge-oss
### 2. Install Dependencies ### 2. Install Dependencies
```bash ```bash
# Install all workspace dependencies including the CLI uv sync
uv sync --all-extras
``` ```
This installs all FuzzForge components in a virtual environment, including: This installs all FuzzForge components in a virtual environment.
- `fuzzforge-cli` - Command-line interface
- `fuzzforge-mcp` - MCP server
- `fuzzforge-runner` - Module execution engine
- All supporting libraries
### 3. Verify Installation ### 3. Verify Installation
@@ -131,30 +117,10 @@ uv run fuzzforge --help
## Building Modules ## Building Modules
FuzzForge modules are containerized security tools. After cloning, you need to build them once. FuzzForge modules are containerized security tools. After cloning, you need to build them once:
> **Important:** The modules depend on a base SDK image that must be built first.
### Build the SDK Base Image (Required First)
```bash
# 1. Build the SDK Python package wheel
cd fuzzforge-modules/fuzzforge-modules-sdk
uv build
# 2. Copy wheel to the .wheels directory
mkdir -p .wheels
cp ../../dist/fuzzforge_modules_sdk-*.whl .wheels/
# 3. Build the SDK Docker image
cd ../..
docker build -t localhost/fuzzforge-modules-sdk:0.1.0 fuzzforge-modules/fuzzforge-modules-sdk/
```
### Build All Modules ### Build All Modules
Once the SDK is built, build all modules:
```bash ```bash
# From the fuzzforge-oss directory # From the fuzzforge-oss directory
make build-modules make build-modules
@@ -166,14 +132,12 @@ This builds all available modules:
- `fuzzforge-harness-validator` - Validates generated fuzzing harnesses - `fuzzforge-harness-validator` - Validates generated fuzzing harnesses
- `fuzzforge-crash-analyzer` - Analyzes crash inputs - `fuzzforge-crash-analyzer` - Analyzes crash inputs
> **Note:** The first build will take several minutes as it downloads Rust toolchains and dependencies.
### Build a Single Module ### Build a Single Module
```bash ```bash
# Build a specific module (after SDK is built) # Build a specific module
cd fuzzforge-modules/rust-analyzer cd fuzzforge-modules/rust-analyzer
docker build -t fuzzforge-rust-analyzer:0.1.0 . make build
``` ```
### Verify Modules are Built ### Verify Modules are Built
@@ -183,27 +147,13 @@ docker build -t fuzzforge-rust-analyzer:0.1.0 .
docker images | grep fuzzforge docker images | grep fuzzforge
``` ```
You should see at least 5 images: You should see something like:
``` ```
localhost/fuzzforge-modules-sdk 0.1.0 abc123def456 5 minutes ago 465 MB fuzzforge-rust-analyzer 0.1.0 abc123def456 2 minutes ago 850 MB
fuzzforge-rust-analyzer 0.1.0 def789ghi012 2 minutes ago 2.0 GB fuzzforge-cargo-fuzzer 0.1.0 789ghi012jkl 2 minutes ago 1.2 GB
fuzzforge-cargo-fuzzer 0.1.0 ghi012jkl345 2 minutes ago 1.9 GB ...
fuzzforge-harness-validator 0.1.0 jkl345mno678 2 minutes ago 1.9 GB
fuzzforge-crash-analyzer 0.1.0 mno678pqr901 2 minutes ago 517 MB
``` ```
### Verify CLI Installation
```bash
# Test the CLI
uv run fuzzforge --help
# List modules (with environment variable for modules path)
FUZZFORGE_MODULES_PATH=/path/to/fuzzforge-modules uv run fuzzforge modules list
```
You should see 4 available modules listed.
--- ---
## MCP Server Configuration ## MCP Server Configuration
@@ -295,21 +245,6 @@ uv run fuzzforge mcp uninstall claude-desktop
uv run fuzzforge mcp uninstall claude-code uv run fuzzforge mcp uninstall claude-code
``` ```
### Test MCP Server
After installation, verify the MCP server is working:
```bash
# Check if MCP server process is running (in VS Code)
ps aux | grep fuzzforge_mcp
```
You can also test the MCP integration directly in your AI agent:
- **GitHub Copilot**: Ask "List available FuzzForge modules"
- **Claude**: Ask "What FuzzForge modules are available?"
The AI should respond with a list of 4 modules (rust-analyzer, cargo-fuzzer, harness-validator, crash-analyzer).
--- ---
## Using FuzzForge with AI ## Using FuzzForge with AI
@@ -457,39 +392,6 @@ sudo usermod -aG docker $USER
docker run --rm hello-world docker run --rm hello-world
``` ```
### Module Build Fails: "fuzzforge-modules-sdk not found"
```
ERROR: failed to solve: localhost/fuzzforge-modules-sdk:0.1.0: not found
```
**Solution:** You need to build the SDK base image first:
```bash
# 1. Build SDK wheel
cd fuzzforge-modules/fuzzforge-modules-sdk
uv build
mkdir -p .wheels
cp ../../dist/fuzzforge_modules_sdk-*.whl .wheels/
# 2. Build SDK Docker image
cd ../..
docker build -t localhost/fuzzforge-modules-sdk:0.1.0 fuzzforge-modules/fuzzforge-modules-sdk/
# 3. Now build modules
make build-modules
```
### fuzzforge Command Not Found
```
error: Failed to spawn: `fuzzforge`
```
**Solution:** Install with `--all-extras` to include the CLI:
```bash
uv sync --all-extras
```
### No Modules Found ### No Modules Found
``` ```
@@ -497,13 +399,9 @@ No modules found.
``` ```
**Solution:** **Solution:**
1. Build the SDK first (see above) 1. Build the modules first: `make build-modules`
2. Build the modules: `make build-modules` 2. Check the modules path: `uv run fuzzforge modules list`
3. Check the modules path with environment variable: 3. Verify images exist: `docker images | grep fuzzforge`
```bash
FUZZFORGE_MODULES_PATH=/path/to/fuzzforge-modules uv run fuzzforge modules list
```
4. Verify images exist: `docker images | grep fuzzforge`
### MCP Server Not Starting ### MCP Server Not Starting
@@ -514,15 +412,6 @@ uv run fuzzforge mcp status
Verify the configuration file path exists and contains valid JSON. Verify the configuration file path exists and contains valid JSON.
If the server process isn't running:
```bash
# Check if MCP server is running
ps aux | grep fuzzforge_mcp
# Test the MCP server manually
uv run python -m fuzzforge_mcp
```
### Module Container Fails to Build ### Module Container Fails to Build
```bash ```bash

View File

@@ -25,6 +25,9 @@ class ImageInfo:
#: Image size in bytes. #: Image size in bytes.
size: int | None = None size: int | None = None
#: Image labels/metadata.
labels: dict[str, str] | None = None
class AbstractFuzzForgeSandboxEngine(ABC): class AbstractFuzzForgeSandboxEngine(ABC):
"""Abstract class used as a base for all FuzzForge sandbox engine classes.""" """Abstract class used as a base for all FuzzForge sandbox engine classes."""
@@ -279,3 +282,17 @@ class AbstractFuzzForgeSandboxEngine(ABC):
""" """
message: str = f"method 'list_containers' is not implemented for class '{self.__class__.__name__}'" message: str = f"method 'list_containers' is not implemented for class '{self.__class__.__name__}'"
raise NotImplementedError(message) raise NotImplementedError(message)
@abstractmethod
def read_file_from_image(self, image: str, path: str) -> str:
"""Read a file from inside an image without starting a container.
Creates a temporary container, copies the file, and removes the container.
:param image: Image reference (e.g., "fuzzforge-rust-analyzer:latest").
:param path: Path to file inside image.
:returns: File contents as string.
"""
message: str = f"method 'read_file_from_image' is not implemented for class '{self.__class__.__name__}'"
raise NotImplementedError(message)
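A brief usage sketch of the new engine API. The import path is an assumption (the commit does not show package layout), but the `/app/README.md` location matches the `COPY` steps in the module Dockerfiles in this commit:
```python
from fuzzforge_sandbox import AbstractFuzzForgeSandboxEngine  # hypothetical import path


def read_module_readme(engine: AbstractFuzzForgeSandboxEngine) -> str:
    """Fetch /app/README.md from a module image without running the module."""
    content = engine.read_file_from_image(
        "fuzzforge-rust-analyzer:latest", "/app/README.md"
    )
    # The CLI engines return "" when the image is missing or the file is unreadable
    if not content:
        raise FileNotFoundError("README not found in image")
    return content
```
Using `cat` with `start -a` captures the file on stdout, so no `docker cp` round-trip through the filesystem is needed.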

View File

@@ -99,6 +99,17 @@ class DockerCLI(AbstractFuzzForgeSandboxEngine):
if filter_prefix and filter_prefix not in reference: if filter_prefix and filter_prefix not in reference:
continue continue
# Try to get labels from image inspect
labels = {}
try:
inspect_result = self._run(["image", "inspect", reference], check=False)
if inspect_result.returncode == 0:
inspect_data = json.loads(inspect_result.stdout)
if inspect_data and len(inspect_data) > 0:
labels = inspect_data[0].get("Config", {}).get("Labels") or {}
except (json.JSONDecodeError, IndexError):
pass
images.append( images.append(
ImageInfo( ImageInfo(
reference=reference, reference=reference,
@@ -106,6 +117,7 @@ class DockerCLI(AbstractFuzzForgeSandboxEngine):
tag=tag, tag=tag,
image_id=image.get("ID", "")[:12], image_id=image.get("ID", "")[:12],
size=image.get("Size"), size=image.get("Size"),
labels=labels,
) )
) )
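For reference, a sketch (fields abbreviated) of the `docker image inspect` output shape this label extraction assumes:
```python
# `docker image inspect <ref>` emits a JSON array; the first entry carries
# Config.Labels, which may be null when no labels are set.
inspect_data = [
    {
        "Id": "sha256:abc123...",
        "Config": {
            "Labels": {"maintainer": "fuzzforge"},  # or None when unset
        },
    }
]
labels = inspect_data[0].get("Config", {}).get("Labels") or {}
```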
@@ -404,3 +416,43 @@ class DockerCLI(AbstractFuzzForgeSandboxEngine):
] ]
except json.JSONDecodeError: except json.JSONDecodeError:
return [] return []
def read_file_from_image(self, image: str, path: str) -> str:
"""Read a file from inside an image without starting a long-running container.
Creates a temporary container, reads the file via cat, and removes it.
:param image: Image reference (e.g., "fuzzforge-rust-analyzer:latest").
:param path: Path to file inside image.
:returns: File contents as string.
"""
logger = get_logger()
# Create a temporary container (don't start it)
create_result = self._run(
["create", "--rm", image, "cat", path],
check=False,
)
if create_result.returncode != 0:
logger.debug("failed to create container for file read", image=image, path=path)
return ""
container_id = create_result.stdout.strip()
try:
# Start the container and capture output (cat will run and exit)
start_result = self._run(
["start", "-a", container_id],
check=False,
)
if start_result.returncode != 0:
logger.debug("failed to read file from image", image=image, path=path)
return ""
return start_result.stdout
finally:
# Cleanup: remove the container (may already be removed due to --rm)
self._run(["rm", "-f", container_id], check=False)

View File

@@ -172,3 +172,8 @@ class Docker(AbstractFuzzForgeSandboxEngine):
"""List containers.""" """List containers."""
message: str = "Docker engine list_containers is not yet implemented" message: str = "Docker engine list_containers is not yet implemented"
raise NotImplementedError(message) raise NotImplementedError(message)
def read_file_from_image(self, image: str, path: str) -> str:
"""Read a file from inside an image without starting a long-running container."""
message: str = "Docker engine read_file_from_image is not yet implemented"
raise NotImplementedError(message)

View File

@@ -166,6 +166,9 @@ class PodmanCLI(AbstractFuzzForgeSandboxEngine):
repo = name repo = name
tag = "latest" tag = "latest"
# Get labels if available
labels = image.get("Labels") or {}
images.append( images.append(
ImageInfo( ImageInfo(
reference=name, reference=name,
@@ -173,6 +176,7 @@ class PodmanCLI(AbstractFuzzForgeSandboxEngine):
tag=tag, tag=tag,
image_id=image.get("Id", "")[:12], image_id=image.get("Id", "")[:12],
size=image.get("Size"), size=image.get("Size"),
labels=labels,
) )
) )
@@ -474,6 +478,46 @@ class PodmanCLI(AbstractFuzzForgeSandboxEngine):
except json.JSONDecodeError: except json.JSONDecodeError:
return [] return []
def read_file_from_image(self, image: str, path: str) -> str:
"""Read a file from inside an image without starting a long-running container.
Creates a temporary container, reads the file via cat, and removes it.
:param image: Image reference (e.g., "fuzzforge-rust-analyzer:latest").
:param path: Path to file inside image.
:returns: File contents as string.
"""
logger = get_logger()
# Create a temporary container (don't start it)
create_result = self._run(
["create", "--rm", image, "cat", path],
check=False,
)
if create_result.returncode != 0:
logger.debug("failed to create container for file read", image=image, path=path)
return ""
container_id = create_result.stdout.strip()
try:
# Start the container and capture output (cat will run and exit)
start_result = self._run(
["start", "-a", container_id],
check=False,
)
if start_result.returncode != 0:
logger.debug("failed to read file from image", image=image, path=path)
return ""
return start_result.stdout
finally:
# Cleanup: remove the container (may already be removed due to --rm)
self._run(["rm", "-f", container_id], check=False)
# ------------------------------------------------------------------------- # -------------------------------------------------------------------------
# Utility Methods # Utility Methods
# ------------------------------------------------------------------------- # -------------------------------------------------------------------------

View File

@@ -494,3 +494,40 @@ class Podman(AbstractFuzzForgeSandboxEngine):
} }
for c in containers for c in containers
] ]
def read_file_from_image(self, image: str, path: str) -> str:
"""Read a file from inside an image without starting a long-running container.
Creates a temporary container, reads the file, and removes the container.
:param image: Image reference (e.g., "fuzzforge-rust-analyzer:latest").
:param path: Path to file inside image.
:returns: File contents as string.
"""
logger = get_logger()
client: PodmanClient = self.get_client()
with client:
try:
# Create a container that just runs cat on the file
container = client.containers.create(
image=image,
command=["cat", path],
remove=True,
)
# Start it and wait for completion
container.start()
container.wait()
# Get the logs (which contain stdout)
output = container.logs(stdout=True, stderr=False)
if isinstance(output, bytes):
return output.decode("utf-8", errors="replace")
return str(output)
except Exception as exc:
logger.debug("failed to read file from image", image=image, path=path, error=str(exc))
return ""

View File

@@ -14,6 +14,22 @@ if TYPE_CHECKING:
from fastmcp import Context from fastmcp import Context
# Track the current active project path (set by init_project)
_current_project_path: Path | None = None
def set_current_project_path(project_path: Path) -> None:
"""Set the current project path.
Called by init_project to track which project is active.
:param project_path: Path to the project directory.
"""
global _current_project_path
_current_project_path = project_path
def get_settings() -> Settings: def get_settings() -> Settings:
"""Get MCP server settings from context. """Get MCP server settings from context.
@@ -31,11 +47,17 @@ def get_settings() -> Settings:
def get_project_path() -> Path: def get_project_path() -> Path:
"""Get the current project path. """Get the current project path.
Returns the project path set by init_project, or falls back to
the current working directory if no project has been initialized.
:return: Path to the current project. :return: Path to the current project.
""" """
settings: Settings = get_settings() global _current_project_path
return Path(settings.project.default_path) if _current_project_path is not None:
return _current_project_path
# Fall back to current working directory (where the AI agent is working)
return Path.cwd()
def get_runner() -> Runner: def get_runner() -> Runner:
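A short sketch of the resulting lookup order, using the imports the tools diff below confirms: the explicitly tracked project wins, otherwise the agent's working directory is used.
```python
from pathlib import Path

from fuzzforge_mcp.dependencies import get_project_path, set_current_project_path

# Before any init_project call, lookup falls back to the current directory
assert get_project_path() == Path.cwd()

# init_project records the active project via set_current_project_path
set_current_project_path(Path("/work/my-rust-crate"))  # hypothetical project path
assert get_project_path() == Path("/work/my-rust-crate")
```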

View File

@@ -29,7 +29,8 @@ async def list_modules() -> dict[str, Any]:
"""List all available FuzzForge modules. """List all available FuzzForge modules.
Returns information about modules that can be executed, Returns information about modules that can be executed,
including their identifiers and availability status. including their identifiers, availability status, and metadata
such as use cases, input requirements, and output artifacts.
:return: Dictionary with list of available modules and their details. :return: Dictionary with list of available modules and their details.
@@ -47,10 +48,26 @@ async def list_modules() -> dict[str, Any]:
"identifier": module.identifier, "identifier": module.identifier,
"image": f"{module.identifier}:{module.version or 'latest'}", "image": f"{module.identifier}:{module.version or 'latest'}",
"available": module.available, "available": module.available,
"description": module.description,
# New metadata fields from pyproject.toml
"category": module.category,
"language": module.language,
"pipeline_stage": module.pipeline_stage,
"pipeline_order": module.pipeline_order,
"dependencies": module.dependencies,
"continuous_mode": module.continuous_mode,
"typical_duration": module.typical_duration,
# AI-discoverable metadata
"use_cases": module.use_cases,
"input_requirements": module.input_requirements,
"output_artifacts": module.output_artifacts,
} }
for module in modules for module in modules
] ]
# Sort by pipeline_order if available
available_modules.sort(key=lambda m: (m.get("pipeline_order") or 999, m["identifier"]))
return { return {
"modules": available_modules, "modules": available_modules,
"count": len(available_modules), "count": len(available_modules),
@@ -151,6 +168,8 @@ async def start_continuous_module(
module_identifier=module_identifier, module_identifier=module_identifier,
assets_path=actual_assets_path, assets_path=actual_assets_path,
configuration=configuration, configuration=configuration,
project_path=project_path,
execution_id=session_id,
) )
# Store execution info for tracking # Store execution info for tracking
@@ -162,6 +181,7 @@ async def start_continuous_module(
"status": "running", "status": "running",
"container_id": result["container_id"], "container_id": result["container_id"],
"input_dir": result["input_dir"], "input_dir": result["input_dir"],
"project_path": str(project_path),
} }
return { return {

View File

@@ -8,7 +8,7 @@ from typing import TYPE_CHECKING, Any
from fastmcp import FastMCP from fastmcp import FastMCP
from fastmcp.exceptions import ToolError from fastmcp.exceptions import ToolError
from fuzzforge_mcp.dependencies import get_project_path, get_runner from fuzzforge_mcp.dependencies import get_project_path, get_runner, set_current_project_path
if TYPE_CHECKING: if TYPE_CHECKING:
from fuzzforge_runner import Runner from fuzzforge_runner import Runner
@@ -21,8 +21,12 @@ mcp: FastMCP = FastMCP()
async def init_project(project_path: str | None = None) -> dict[str, Any]: async def init_project(project_path: str | None = None) -> dict[str, Any]:
"""Initialize a new FuzzForge project. """Initialize a new FuzzForge project.
Creates the necessary storage directories for a project. This should Creates a `.fuzzforge/` directory inside the project for storing:
be called before executing modules or workflows. - assets/: Input files (source code, etc.)
- inputs/: Prepared module inputs (for debugging)
- runs/: Execution results from each module
This should be called before executing modules or workflows.
:param project_path: Path to the project directory. If not provided, uses current directory. :param project_path: Path to the project directory. If not provided, uses current directory.
:return: Project initialization result. :return: Project initialization result.
@@ -32,13 +36,17 @@ async def init_project(project_path: str | None = None) -> dict[str, Any]:
try: try:
path = Path(project_path) if project_path else get_project_path() path = Path(project_path) if project_path else get_project_path()
# Track this as the current active project
set_current_project_path(path)
storage_path = runner.init_project(path) storage_path = runner.init_project(path)
return { return {
"success": True, "success": True,
"project_path": str(path), "project_path": str(path),
"storage_path": str(storage_path), "storage_path": str(storage_path),
"message": f"Project initialized at {path}", "message": f"Project initialized. Storage at {path}/.fuzzforge/",
} }
except Exception as exception: except Exception as exception:

View File

@@ -1,5 +1,7 @@
FROM localhost/fuzzforge-modules-sdk:0.1.0 FROM localhost/fuzzforge-modules-sdk:0.1.0
# Module metadata is now read from pyproject.toml [tool.fuzzforge.module] section
# Install system dependencies for Rust compilation # Install system dependencies for Rust compilation
RUN apt-get update && apt-get install -y \ RUN apt-get update && apt-get install -y \
curl \ curl \

View File

@@ -1,7 +1,7 @@
[project] [project]
name = "cargo-fuzzer" name = "fuzzforge-cargo-fuzzer"
version = "0.1.0" version = "0.1.0"
description = "FuzzForge module that runs cargo-fuzz with libFuzzer on Rust targets" description = "Runs continuous coverage-guided fuzzing on Rust targets using cargo-fuzz"
authors = [] authors = []
readme = "README.md" readme = "README.md"
requires-python = ">=3.14" requires-python = ">=3.14"
@@ -29,3 +29,34 @@ fuzzforge-modules-sdk = { workspace = true }
[tool.uv] [tool.uv]
package = true package = true
# FuzzForge module metadata for AI agent discovery
[tool.fuzzforge.module]
identifier = "fuzzforge-cargo-fuzzer"
category = "fuzzer"
language = "rust"
pipeline_stage = "fuzzing"
pipeline_order = 3
dependencies = ["fuzzforge-harness-tester"]
continuous_mode = true
typical_duration = "continuous"
use_cases = [
"Run continuous coverage-guided fuzzing with libFuzzer",
"Execute cargo-fuzz on validated harnesses",
"Produce crash artifacts for analysis",
"Long-running fuzzing campaign"
]
input_requirements = [
"validated-harnesses",
"Cargo.toml",
"rust-source-code"
]
output_artifacts = [
"crashes/",
"coverage-data/",
"corpus/",
"fuzzing-stats.json"
]

View File

@@ -1,5 +1,7 @@
FROM localhost/fuzzforge-modules-sdk:0.1.0 FROM localhost/fuzzforge-modules-sdk:0.1.0
# Module metadata is now read from pyproject.toml [tool.fuzzforge.module] section
COPY ./src /app/src COPY ./src /app/src
COPY ./pyproject.toml /app/pyproject.toml COPY ./pyproject.toml /app/pyproject.toml

View File

@@ -1,7 +1,7 @@
[project] [project]
name = "crash-analyzer" name = "fuzzforge-crash-analyzer"
version = "0.1.0" version = "0.1.0"
description = "FuzzForge module that analyzes fuzzing crashes and generates security reports" description = "Analyzes fuzzing crashes, deduplicates them, and generates security reports"
authors = [] authors = []
readme = "README.md" readme = "README.md"
requires-python = ">=3.14" requires-python = ">=3.14"
@@ -30,3 +30,33 @@ fuzzforge-modules-sdk = { workspace = true }
[tool.uv] [tool.uv]
package = true package = true
# FuzzForge module metadata for AI agent discovery
[tool.fuzzforge.module]
identifier = "fuzzforge-crash-analyzer"
category = "reporter"
language = "rust"
pipeline_stage = "crash-analysis"
pipeline_order = 4
dependencies = ["fuzzforge-cargo-fuzzer"]
continuous_mode = false
typical_duration = "1m"
use_cases = [
"Analyze crash artifacts from fuzzing",
"Deduplicate crashes by stack trace signature",
"Triage crashes by severity (critical, high, medium, low)",
"Generate security vulnerability reports"
]
input_requirements = [
"crash-artifacts",
"stack-traces",
"rust-source-code"
]
output_artifacts = [
"unique-crashes.json",
"crash-report.md",
"severity-analysis.json"
]

View File

@@ -1,4 +1,7 @@
FROM localhost/fuzzforge-modules-sdk:0.0.1 FROM localhost/fuzzforge-modules-sdk:0.1.0
# Module metadata is now read from pyproject.toml [tool.fuzzforge.module] section
# See MODULE_METADATA.md for documentation on configuring metadata
COPY ./src /app/src COPY ./src /app/src
COPY ./pyproject.toml /app/pyproject.toml COPY ./pyproject.toml /app/pyproject.toml

View File

@@ -1,7 +1,7 @@
[project] [project]
name = "fuzzforge-module-template" name = "fuzzforge-module-template"
version = "0.0.1" version = "0.1.0"
description = "FIXME" description = "FIXME: Add module description"
authors = [] authors = []
readme = "README.md" readme = "README.md"
requires-python = ">=3.14" requires-python = ">=3.14"
@@ -29,3 +29,46 @@ fuzzforge-modules-sdk = { workspace = true }
[tool.uv] [tool.uv]
package = true package = true
# FuzzForge module metadata for AI agent discovery
# See MODULE_METADATA.md for full documentation
[tool.fuzzforge.module]
# REQUIRED: Unique module identifier (should match Docker image name)
identifier = "fuzzforge-module-template"
# REQUIRED: Module category - one of: analyzer, validator, fuzzer, reporter
category = "analyzer"
# Optional: Target programming language
language = "rust"
# Optional: Pipeline stage name
pipeline_stage = "analysis"
# Optional: Numeric order in pipeline (for sorting)
pipeline_order = 1
# Optional: List of module identifiers that must run before this one
dependencies = []
# Optional: Whether this module supports continuous/background execution
continuous_mode = false
# Optional: Expected runtime (e.g., "30s", "5m", "continuous")
typical_duration = "30s"
# REQUIRED: Use cases help AI agents understand when to use this module
use_cases = [
"FIXME: Describe what this module does",
"FIXME: Describe typical usage scenario"
]
# REQUIRED: What inputs the module expects
input_requirements = [
"FIXME: List required input files or artifacts"
]
# REQUIRED: What outputs the module produces
output_artifacts = [
"FIXME: List output files produced"
]
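A minimal sketch, assuming Python 3.11+ `tomllib` and a hypothetical helper name (this commit does not show the actual loader), of how a runner could read this table for module discovery:
```python
import tomllib  # stdlib TOML parser, Python 3.11+
from pathlib import Path


def load_module_metadata(pyproject: Path) -> dict:
    """Hypothetical helper: read [tool.fuzzforge.module] from a module's pyproject.toml."""
    with pyproject.open("rb") as f:
        data = tomllib.load(f)
    # Modules without the table yield an empty dict instead of raising
    return data.get("tool", {}).get("fuzzforge", {}).get("module", {})


meta = load_module_metadata(Path("pyproject.toml"))  # path is illustrative
print(meta.get("identifier"), meta.get("pipeline_order"))
```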

View File

@@ -1,4 +1,7 @@
FROM localhost/fuzzforge-modules-sdk:0.0.1 FROM localhost/fuzzforge-modules-sdk:0.1.0
# Module metadata is read from pyproject.toml [tool.fuzzforge.module] section
# See MODULE_METADATA.md for documentation on configuring metadata
COPY ./src /app/src COPY ./src /app/src
COPY ./pyproject.toml /app/pyproject.toml COPY ./pyproject.toml /app/pyproject.toml

View File

@@ -1,7 +1,7 @@
[project] [project]
name = "fuzzforge-module-template" name = "fuzzforge-module-template"
version = "0.0.1" version = "0.1.0"
description = "FIXME" description = "FIXME: Add module description"
authors = [] authors = []
readme = "README.md" readme = "README.md"
requires-python = ">=3.14" requires-python = ">=3.14"
@@ -29,3 +29,46 @@ fuzzforge-modules-sdk = { workspace = true }
[tool.uv] [tool.uv]
package = true package = true
# FuzzForge module metadata for AI agent discovery
# See MODULE_METADATA.md for full documentation
[tool.fuzzforge.module]
# REQUIRED: Unique module identifier (should match Docker image name)
identifier = "fuzzforge-module-template"
# REQUIRED: Module category - one of: analyzer, validator, fuzzer, reporter
category = "analyzer"
# Optional: Target programming language
language = "rust"
# Optional: Pipeline stage name
pipeline_stage = "analysis"
# Optional: Numeric order in pipeline (for sorting)
pipeline_order = 1
# Optional: List of module identifiers that must run before this one
dependencies = []
# Optional: Whether this module supports continuous/background execution
continuous_mode = false
# Optional: Expected runtime (e.g., "30s", "5m", "continuous")
typical_duration = "30s"
# REQUIRED: Use cases help AI agents understand when to use this module
use_cases = [
"FIXME: Describe what this module does",
"FIXME: Describe typical usage scenario"
]
# REQUIRED: What inputs the module expects
input_requirements = [
"FIXME: List required input files or artifacts"
]
# REQUIRED: What outputs the module produces
output_artifacts = [
"FIXME: List output files produced"
]

View File

@@ -0,0 +1,26 @@
FROM localhost/fuzzforge-modules-sdk:0.1.0
# Module metadata is now read from pyproject.toml [tool.fuzzforge.module] section
# Install build tools and Rust nightly for compiling and testing fuzz harnesses
RUN apt-get update && apt-get install -y \
curl \
build-essential \
pkg-config \
libssl-dev \
&& rm -rf /var/lib/apt/lists/*
RUN curl --proto '=https' --tlsv1.2 -sSf https://sh.rustup.rs | sh -s -- -y --default-toolchain nightly
ENV PATH="/root/.cargo/bin:${PATH}"
# Install cargo-fuzz for testing harnesses
RUN cargo install cargo-fuzz --locked || true
COPY ./src /app/src
COPY ./pyproject.toml /app/pyproject.toml
COPY ./README.md /app/README.md
# Remove workspace reference since we're using wheels
RUN sed -i '/\[tool\.uv\.sources\]/,/^$/d' /app/pyproject.toml
RUN uv sync --find-links /wheels

View File

@@ -0,0 +1,289 @@
# Harness Tester Feedback Types
Complete reference of all feedback the `harness-tester` module provides to help AI agents improve fuzz harnesses.
## Overview
The harness-tester evaluates harnesses across **6 dimensions** and provides specific, actionable suggestions for each issue detected.
---
## 1. Compilation Feedback
### ✅ Success Cases
- **Compiles successfully** → Strength noted
### ❌ Error Cases
| Issue Type | Severity | Detection | Suggestion |
|------------|----------|-----------|------------|
| `undefined_variable` | CRITICAL | "cannot find" in error | Check variable names match function signature. Use exact names from fuzzable_functions.json |
| `type_mismatch` | CRITICAL | "mismatched types" in error | Check function expects types you're passing. Convert fuzzer input to correct type (e.g., &[u8] to &str with from_utf8) |
| `trait_not_implemented` | CRITICAL | "trait" + "not implemented" | Ensure you're using correct types. Some functions require specific trait implementations |
| `compilation_error` | CRITICAL | Any other error | Review error message and fix syntax/type issues. Check function signatures in source code |
### ⚠️ Warning Cases
| Issue Type | Severity | Detection | Suggestion |
|------------|----------|-----------|------------|
| `unused_variable` | INFO | "unused" in warning | Remove unused variables or use underscore prefix (_variable) to suppress warning |
---
## 2. Execution Feedback
### ✅ Success Cases
- **Executes without crashing** → Strength noted
### ❌ Error Cases
| Issue Type | Severity | Detection | Suggestion |
|------------|----------|-----------|------------|
| `stack_overflow` | CRITICAL | "stack overflow" in crash | Check for infinite recursion or large stack allocations. Use heap allocation (Box, Vec) for large data structures |
| `panic_on_start` | CRITICAL | "panic" in crash | Check initialization code. Ensure required resources are available and input validation doesn't panic on empty input |
| `immediate_crash` | CRITICAL | Crashes on first run | Debug harness initialization. Add error handling and check for null/invalid pointers |
| `infinite_loop` | CRITICAL | Execution timeout | Check for loops that depend on fuzzer input. Add iteration limits or timeout mechanisms |
---
## 3. Coverage Feedback
### ✅ Success Cases
- **>50% coverage** → "Excellent coverage"
- **Good growth** → "Harness exploring code paths"
### ❌ Error Cases
| Issue Type | Severity | Detection | Suggestion |
|------------|----------|-----------|------------|
| `no_coverage` | CRITICAL | 0 new edges found | Ensure you're actually calling the target function with fuzzer-provided data. Check that 'data' parameter is passed to function |
| `very_low_coverage` | WARNING | <5% coverage or "none" growth | Harness may not be reaching target code. Verify correct entry point function. Check if input validation rejects all fuzzer data |
| `low_coverage` | WARNING | <20% coverage or "poor" growth | Try fuzzing multiple entry points or remove restrictive input validation. Consider using dictionary for structured inputs |
| `early_stagnation` | INFO | Coverage stops growing <10s | Harness may be hitting input validation barriers. Consider fuzzing with seed corpus of valid inputs |
---
## 4. Performance Feedback
### ✅ Success Cases
- **>1000 execs/s** → "Excellent performance"
- **>500 execs/s** → "Good performance"
### ❌ Error Cases
| Issue Type | Severity | Detection | Suggestion |
|------------|----------|-----------|------------|
| `extremely_slow` | CRITICAL | <10 execs/s | Remove file I/O, network operations, or expensive computations from harness loop. Move setup code outside fuzz target function |
| `slow_execution` | WARNING | <100 execs/s | Optimize harness: avoid allocations in hot path, reuse buffers, remove logging. Profile to find bottlenecks |
---
## 5. Stability Feedback
### ✅ Success Cases
- **Stable execution** → Strength noted
- **Found unique crashes** → "Found N potential bugs!"
### ⚠️ Warning Cases
| Issue Type | Severity | Detection | Suggestion |
|------------|----------|-----------|------------|
| `unstable_frequent_crashes` | WARNING | >10 crashes per 1000 execs | This might be expected if testing buggy code. If not, add error handling for edge cases or invalid inputs |
| `hangs_detected` | WARNING | Hangs found during trial | Add timeouts to prevent infinite loops. Check for blocking operations or resource exhaustion |
---
## 6. Code Quality Feedback
### Informational
| Issue Type | Severity | Detection | Suggestion |
|------------|----------|-----------|------------|
| `unused_variable` | INFO | Compiler warnings | Clean up code for better maintainability |
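The rules in these tables are substring checks on compiler and fuzzer output. A condensed sketch, using plain dicts rather than the module's actual pydantic models, of how one category's rules could be expressed:
```python
def classify_compile_error(error: str) -> dict:
    """Sketch of the compilation-feedback rules tabulated above (section 1)."""
    rules = [
        ("cannot find", "undefined_variable",
         "Check variable names match function signature."),
        ("mismatched types", "type_mismatch",
         "Convert fuzzer input to the type the function expects."),
    ]
    for needle, issue_type, suggestion in rules:
        if needle in error.lower():
            return {"category": "compilation", "severity": "critical",
                    "type": issue_type, "suggestion": suggestion}
    # Fallback mirrors the catch-all `compilation_error` row
    return {"category": "compilation", "severity": "critical",
            "type": "compilation_error",
            "suggestion": "Review the error message and fix syntax/type issues."}
```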
---
## Quality Scoring Formula
```
Base Score: 20 points (for compiling + running)
+ Coverage (0-40 points):
- Excellent growth: +40
- Good growth: +30
- Poor growth: +10
- No growth: +0
+ Performance (0-25 points):
- >1000 execs/s: +25
- >500 execs/s: +20
- >100 execs/s: +10
- >10 execs/s: +5
- <10 execs/s: +0
+ Stability (0-15 points):
- Stable: +15
- Unstable: +10
- Crashes frequently: +5
Maximum: 100 points
```
### Verdicts
- **70-100**: `production-ready` → Use for long-term fuzzing campaigns
- **30-69**: `needs-improvement` → Fix issues before production use
- **0-29**: `broken` → Critical issues block execution
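A direct transcription of the formula and verdict bands into Python, as a sketch only; the module's actual scorer lives in `analyzer.py` and may differ in detail:
```python
def quality_score(growth: str, execs_per_sec: float, stability: str) -> tuple[int, str]:
    score = 20  # base: compiles and runs
    # Coverage (0-40 points), keyed on the growth ratings used by the module
    score += {"excellent": 40, "good": 30, "poor": 10, "none": 0}.get(growth, 0)
    # Performance (0-25 points)
    if execs_per_sec > 1000:
        score += 25
    elif execs_per_sec > 500:
        score += 20
    elif execs_per_sec > 100:
        score += 10
    elif execs_per_sec > 10:
        score += 5
    # Stability (0-15 points)
    score += {"stable": 15, "unstable": 10, "crashes_frequently": 5}.get(stability, 0)
    verdict = ("production-ready" if score >= 70
               else "needs-improvement" if score >= 30 else "broken")
    return score, verdict
```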
---
## Example Feedback Flow
### Scenario 1: Broken Harness (Type Mismatch)
```json
{
"quality": {
"score": 0,
"verdict": "broken",
"issues": [
{
"category": "compilation",
"severity": "critical",
"type": "type_mismatch",
"message": "Type mismatch: expected &[u8], found &str",
"suggestion": "Check function expects types you're passing. Convert fuzzer input to correct type (e.g., &[u8] to &str with from_utf8)"
}
],
"recommended_actions": [
"Fix 1 critical issue(s) preventing execution"
]
}
}
```
**AI Agent Action**: Regenerate harness with correct type conversion
---
### Scenario 2: Low Coverage Harness
```json
{
"quality": {
"score": 35,
"verdict": "needs-improvement",
"issues": [
{
"category": "coverage",
"severity": "warning",
"type": "low_coverage",
"message": "Low coverage: 12% - not exploring enough code paths",
"suggestion": "Try fuzzing multiple entry points or remove restrictive input validation"
},
{
"category": "performance",
"severity": "warning",
"type": "slow_execution",
"message": "Slow execution: 45 execs/sec (expected 500+)",
"suggestion": "Optimize harness: avoid allocations in hot path, reuse buffers"
}
],
"strengths": [
"Compiles successfully",
"Executes without crashing"
],
"recommended_actions": [
"Address 2 warning(s) to improve harness quality"
]
}
}
```
**AI Agent Action**: Remove input validation, optimize performance
---
### Scenario 3: Production-Ready Harness
```json
{
"quality": {
"score": 85,
"verdict": "production-ready",
"issues": [],
"strengths": [
"Compiles successfully",
"Executes without crashing",
"Excellent coverage: 67% of target code reached",
"Excellent performance: 1507 execs/sec",
"Stable execution - no crashes or hangs"
],
"recommended_actions": [
"Harness is ready for production fuzzing"
]
}
}
```
**AI Agent Action**: Proceed to long-term fuzzing with cargo-fuzzer
---
## Integration with AI Workflow
```python
def iterative_harness_generation(target_function):
"""AI agent iteratively improves harness based on feedback."""
max_iterations = 3
for iteration in range(max_iterations):
# Generate or improve harness
if iteration == 0:
harness = ai_generate_harness(target_function)
else:
harness = ai_improve_harness(previous_harness, feedback)
# Test harness
result = execute_module("harness-tester", harness)
evaluation = result["harnesses"][0]
# Check verdict
if evaluation["quality"]["verdict"] == "production-ready":
return harness # Success!
# Extract feedback for next iteration
feedback = {
"issues": evaluation["quality"]["issues"],
"suggestions": [issue["suggestion"] for issue in evaluation["quality"]["issues"]],
"score": evaluation["quality"]["score"],
"coverage": evaluation["fuzzing_trial"]["coverage"] if "fuzzing_trial" in evaluation else None,
"performance": evaluation["fuzzing_trial"]["performance"] if "fuzzing_trial" in evaluation else None
}
# Store for next iteration
previous_harness = harness
return harness  # Return the last attempt after max iterations
```
---
## Summary
The harness-tester provides **comprehensive, actionable feedback** across 6 dimensions:
1. **Compilation** - Syntax and type correctness
2. **Execution** - Runtime stability
3. **Coverage** - Code exploration effectiveness
4. **Performance** - Execution speed
5. **Stability** - Crash/hang frequency
6. **Code Quality** - Best practices
Each issue includes:
- **Clear detection** of what went wrong
- **Specific suggestion** on how to fix it
- **Severity level** to prioritize fixes
This enables AI agents to rapidly iterate and produce high-quality fuzz harnesses with minimal human intervention.

View File

@@ -0,0 +1,28 @@
.PHONY: help build clean format lint test
help:
@echo "Available targets:"
@echo " build - Build Docker image"
@echo " clean - Remove build artifacts"
@echo " format - Format code with ruff"
@echo " lint - Lint code with ruff and mypy"
@echo " test - Run tests"
build:
docker build -t fuzzforge-harness-tester:0.1.0 .
clean:
rm -rf .pytest_cache
rm -rf .mypy_cache
rm -rf .ruff_cache
find . -type d -name __pycache__ -exec rm -rf {} +
format:
uv run ruff format ./src ./tests
lint:
uv run ruff check ./src ./tests
uv run mypy ./src
test:
uv run pytest tests/ -v

View File

@@ -0,0 +1,155 @@
# Harness Tester Module
Tests and evaluates fuzz harnesses with comprehensive feedback for AI-driven iteration.
## Overview
The `harness-tester` module runs a battery of tests on fuzz harnesses to provide actionable feedback:
1. **Compilation Testing** - Validates harness compiles correctly
2. **Execution Testing** - Ensures harness runs without immediate crashes
3. **Fuzzing Trial** - Runs a short fuzzing session (default: 30s) to measure:
- Coverage growth
- Execution performance (execs/sec)
- Stability (crashes, hangs)
4. **Quality Assessment** - Generates scored evaluation with specific issues and suggestions
## Feedback Categories
### 1. Compilation Feedback
- Undefined variables → "Check variable names match function signature"
- Type mismatches → "Convert fuzzer input to correct type"
- Missing traits → "Ensure you're using correct types"
### 2. Execution Feedback
- Stack overflow → "Check for infinite recursion, use heap allocation"
- Immediate panic → "Check initialization code and input validation"
- Timeout/infinite loop → "Add iteration limits"
### 3. Coverage Feedback
- No coverage → "Harness may not be using fuzzer input"
- Very low coverage (<5%) → "May not be reaching target code, check entry point"
- Low coverage (<20%) → "Try fuzzing multiple entry points"
- Good/Excellent coverage → "Harness is exploring code paths well"
### 4. Performance Feedback
- Extremely slow (<10 execs/s) → "Remove file I/O or network operations"
- Slow (<100 execs/s) → "Optimize harness, avoid allocations in hot path"
- Good (>500 execs/s) → Ready for production
- Excellent (>1000 execs/s) → Optimal performance
### 5. Stability Feedback
- Frequent crashes → "Add error handling for edge cases"
- Hangs detected → "Add timeouts to prevent infinite loops"
- Stable → Ready for production
## Usage
```python
# Via MCP
result = execute_module("harness-tester",
assets_path="/path/to/rust/project",
configuration={
"trial_duration_sec": 30,
"execution_timeout_sec": 10
})
```
## Input Requirements
- Rust project with `Cargo.toml`
- Fuzz harnesses in `fuzz/fuzz_targets/`
- Source code to analyze
## Output Artifacts
### `harness-evaluation.json`
Complete structured evaluation with:
```json
{
"harnesses": [
{
"name": "fuzz_png_decode",
"compilation": { "success": true, "time_ms": 4523 },
"execution": { "success": true },
"fuzzing_trial": {
"coverage": {
"final_edges": 891,
"growth_rate": "good",
"percentage_estimate": 67.0
},
"performance": {
"execs_per_sec": 1507.0,
"performance_rating": "excellent"
},
"stability": { "status": "stable" }
},
"quality": {
"score": 85,
"verdict": "production-ready",
"issues": [],
"strengths": ["Excellent performance", "Good coverage"],
"recommended_actions": ["Ready for production fuzzing"]
}
}
],
"summary": {
"total_harnesses": 1,
"production_ready": 1,
"average_score": 85.0
}
}
```
### `feedback-summary.md`
Human-readable summary with all issues and suggestions.
## Quality Scoring
Harnesses are scored 0-100 based on:
- **Compilation** (20 points): Must compile to proceed
- **Execution** (20 points): Must run without crashing
- **Coverage** (40 points):
- Excellent growth: 40 pts
- Good growth: 30 pts
- Poor growth: 10 pts
- **Performance** (25 points):
- >1000 execs/s: 25 pts
- >500 execs/s: 20 pts
- >100 execs/s: 10 pts
- **Stability** (15 points):
- Stable: 15 pts
- Unstable: 10 pts
- Crashes frequently: 5 pts
**Verdicts:**
- 70-100: `production-ready`
- 30-69: `needs-improvement`
- 0-29: `broken`
## AI Agent Iteration Pattern
```
1. AI generates harness
2. harness-tester evaluates it
3. Returns: score=35, verdict="needs-improvement"
Issues: "Low coverage (8%), slow execution (7.8 execs/s)"
Suggestions: "Check entry point function, remove I/O operations"
4. AI fixes harness based on feedback
5. harness-tester re-evaluates
6. Returns: score=85, verdict="production-ready"
7. Proceed to production fuzzing
```
## Configuration Options
| Option | Default | Description |
|--------|---------|-------------|
| `trial_duration_sec` | 30 | How long to run fuzzing trial |
| `execution_timeout_sec` | 10 | Timeout for execution test |
## See Also
- [Module SDK Documentation](../fuzzforge-modules-sdk/README.md)
- [MODULE_METADATA.md](../MODULE_METADATA.md)

View File

@@ -0,0 +1,6 @@
[mypy]
plugins = pydantic.mypy
strict = True
warn_unused_ignores = True
warn_redundant_casts = True
warn_return_any = True

View File

@@ -0,0 +1,58 @@
[project]
name = "fuzzforge-harness-tester"
version = "0.1.0"
description = "Tests and evaluates fuzz harnesses with detailed feedback for AI-driven iteration"
readme = "README.md"
requires-python = ">=3.14"
dependencies = [
"fuzzforge-modules-sdk==0.0.1",
]
[tool.uv.sources]
fuzzforge-modules-sdk = { workspace = true }
[build-system]
requires = ["hatchling"]
build-backend = "hatchling.build"
[tool.hatch.build.targets.wheel]
packages = ["src/module"]
[tool.uv]
dev-dependencies = [
"mypy>=1.8.0",
"pytest>=7.4.3",
"pytest-asyncio>=0.21.1",
"pytest-cov>=4.1.0",
"ruff>=0.1.9",
]
# FuzzForge module metadata for AI agent discovery
[tool.fuzzforge.module]
identifier = "fuzzforge-harness-tester"
category = "validator"
language = "rust"
pipeline_stage = "harness-testing"
pipeline_order = 2
dependencies = ["fuzzforge-rust-analyzer"]
continuous_mode = false
typical_duration = "2m"
use_cases = [
"Validate fuzz harnesses compile correctly",
"Run short fuzzing trials to assess harness quality",
"Provide detailed feedback for AI to improve harnesses",
"Gate before running expensive long fuzzing campaigns"
]
input_requirements = [
"fuzz-harnesses",
"Cargo.toml",
"rust-source-code"
]
output_artifacts = [
"harness-evaluation.json",
"coverage-report.json",
"feedback-summary.md"
]

View File

@@ -0,0 +1,19 @@
line-length = 120
[lint]
select = [ "ALL" ]
ignore = [
"COM812", # conflicts with the formatter
"D100", # ignoring missing docstrings in public modules
"D104", # ignoring missing docstrings in public packages
"D203", # conflicts with 'D211'
"D213", # conflicts with 'D212'
"TD002", # ignoring missing author in 'TODO' statements
"TD003", # ignoring missing issue link in 'TODO' statements
]
[lint.per-file-ignores]
"tests/*" = [
"PLR2004", # allowing comparisons using unamed numerical constants in tests
"S101", # allowing 'assert' statements in tests
]

View File

@@ -0,0 +1,623 @@
"""Harness tester module - tests and evaluates fuzz harnesses."""
import json
import subprocess
import time
from pathlib import Path
from fuzzforge_modules_sdk import (
FuzzForgeModule,
FuzzForgeModuleResults,
FuzzForgeResource,
)
from module.analyzer import FeedbackGenerator
from module.feedback import (
CompilationResult,
CoverageMetrics,
EvaluationSummary,
ExecutionResult,
FuzzingTrial,
HarnessEvaluation,
HarnessTestReport,
PerformanceMetrics,
StabilityMetrics,
)
class HarnessTesterModule(FuzzForgeModule):
"""Tests fuzz harnesses with compilation, execution, and short fuzzing trials."""
def _run(self, resources: list[FuzzForgeResource]) -> FuzzForgeModuleResults:
"""Run harness testing on provided resources.
:param resources: List of resources (Rust project with fuzz harnesses)
:returns: Module execution result
"""
self.emit_event("started", message="Beginning harness testing")
# Configuration
trial_duration = self.configuration.get("trial_duration_sec", 30)
timeout_sec = self.configuration.get("execution_timeout_sec", 10)
# Find Rust project
project_path = self._find_rust_project(resources)
if not project_path:
self.emit_event("error", message="No Rust project found in resources")
return FuzzForgeModuleResults.FAILURE
# Find fuzz harnesses
harnesses = self._find_fuzz_harnesses(project_path)
if not harnesses:
self.emit_event("error", message="No fuzz harnesses found")
return FuzzForgeModuleResults.FAILURE
self.emit_event(
"found_harnesses",
count=len(harnesses),
harnesses=[h.name for h in harnesses],
)
# Test each harness
evaluations = []
total_harnesses = len(harnesses)
for idx, harness in enumerate(harnesses, 1):
self.emit_progress(
int((idx / total_harnesses) * 90),
status="testing",
message=f"Testing harness {idx}/{total_harnesses}: {harness.name}",
)
evaluation = self._test_harness(
project_path, harness, trial_duration, timeout_sec
)
evaluations.append(evaluation)
# Emit evaluation summary
self.emit_event(
"harness_tested",
harness=harness.name,
verdict=evaluation.quality.verdict,
score=evaluation.quality.score,
issues=len(evaluation.quality.issues),
)
# Generate summary
summary = self._generate_summary(evaluations)
# Create report
report = HarnessTestReport(
harnesses=evaluations,
summary=summary,
test_configuration={
"trial_duration_sec": trial_duration,
"execution_timeout_sec": timeout_sec,
},
)
# Save report
self._save_report(report)
self.emit_progress(100, status="completed", message="Harness testing complete")
self.emit_event(
"completed",
total_harnesses=total_harnesses,
production_ready=summary.production_ready,
needs_improvement=summary.needs_improvement,
broken=summary.broken,
)
return FuzzForgeModuleResults.SUCCESS
def _find_rust_project(self, resources: list[FuzzForgeResource]) -> Path | None:
"""Find Rust project with Cargo.toml.
:param resources: List of resources
:returns: Path to Rust project or None
"""
for resource in resources:
cargo_toml = Path(resource.path) / "Cargo.toml"
if cargo_toml.exists():
return Path(resource.path)
return None
def _find_fuzz_harnesses(self, project_path: Path) -> list[Path]:
"""Find fuzz harnesses in project.
:param project_path: Path to Rust project
:returns: List of harness file paths
"""
fuzz_dir = project_path / "fuzz" / "fuzz_targets"
if not fuzz_dir.exists():
return []
harnesses = list(fuzz_dir.glob("*.rs"))
return harnesses
def _test_harness(
self,
project_path: Path,
harness_path: Path,
trial_duration: int,
timeout_sec: int,
) -> HarnessEvaluation:
"""Test a single harness comprehensively.
:param project_path: Path to Rust project
:param harness_path: Path to harness file
:param trial_duration: Duration for fuzzing trial in seconds
:param timeout_sec: Timeout for execution test
:returns: Harness evaluation
"""
harness_name = harness_path.stem
# Step 1: Compilation
self.emit_event("compiling", harness=harness_name)
compilation = self._test_compilation(project_path, harness_name)
# Initialize evaluation
evaluation = HarnessEvaluation(
name=harness_name,
path=str(harness_path),
compilation=compilation,
execution=None,
fuzzing_trial=None,
quality=None, # type: ignore
)
# If compilation failed, generate feedback and return
if not compilation.success:
evaluation.quality = FeedbackGenerator.generate_quality_assessment(
compilation_result=compilation.dict(),
execution_result=None,
coverage=None,
performance=None,
stability=None,
)
return evaluation
# Step 2: Execution test
self.emit_event("testing_execution", harness=harness_name)
execution = self._test_execution(project_path, harness_name, timeout_sec)
evaluation.execution = execution
if not execution.success:
evaluation.quality = FeedbackGenerator.generate_quality_assessment(
compilation_result=compilation.dict(),
execution_result=execution.dict(),
coverage=None,
performance=None,
stability=None,
)
return evaluation
# Step 3: Fuzzing trial
self.emit_event("running_trial", harness=harness_name, duration=trial_duration)
fuzzing_trial = self._run_fuzzing_trial(
project_path, harness_name, trial_duration
)
evaluation.fuzzing_trial = fuzzing_trial
# Generate quality assessment
evaluation.quality = FeedbackGenerator.generate_quality_assessment(
compilation_result=compilation.dict(),
execution_result=execution.dict(),
coverage=fuzzing_trial.coverage if fuzzing_trial else None,
performance=fuzzing_trial.performance if fuzzing_trial else None,
stability=fuzzing_trial.stability if fuzzing_trial else None,
)
return evaluation
def _test_compilation(self, project_path: Path, harness_name: str) -> CompilationResult:
"""Test harness compilation.
:param project_path: Path to Rust project
:param harness_name: Name of harness to compile
:returns: Compilation result
"""
start_time = time.time()
try:
result = subprocess.run(
["cargo", "fuzz", "build", harness_name],
cwd=project_path,
capture_output=True,
text=True,
timeout=300, # 5 min timeout for compilation
)
compilation_time = int((time.time() - start_time) * 1000)
if result.returncode == 0:
# Parse warnings
warnings = self._parse_compiler_warnings(result.stderr)
return CompilationResult(
success=True, time_ms=compilation_time, warnings=warnings
)
else:
# Parse errors
errors = self._parse_compiler_errors(result.stderr)
return CompilationResult(
success=False,
time_ms=compilation_time,
errors=errors,
stderr=result.stderr,
)
except subprocess.TimeoutExpired:
return CompilationResult(
success=False,
errors=["Compilation timed out after 5 minutes"],
stderr="Timeout",
)
except Exception as e:
return CompilationResult(
success=False, errors=[f"Compilation failed: {e!s}"], stderr=str(e)
)
def _test_execution(
self, project_path: Path, harness_name: str, timeout_sec: int
) -> ExecutionResult:
"""Test harness execution with minimal input.
:param project_path: Path to Rust project
:param harness_name: Name of harness
:param timeout_sec: Timeout for execution
:returns: Execution result
"""
try:
# Run with very short timeout and max runs
result = subprocess.run(
[
"cargo",
"fuzz",
"run",
harness_name,
"--",
"-runs=10",
f"-max_total_time={timeout_sec}",
],
cwd=project_path,
capture_output=True,
text=True,
timeout=timeout_sec + 5,
)
# Check if it crashed immediately
if "SUMMARY: libFuzzer: deadly signal" in result.stderr:
return ExecutionResult(
success=False,
immediate_crash=True,
crash_details=self._extract_crash_info(result.stderr),
)
# Success: the bounded run completed without a crash
return ExecutionResult(success=True, runs_completed=10)
except subprocess.TimeoutExpired:
return ExecutionResult(success=False, timeout=True)
except Exception as e:
return ExecutionResult(
success=False, immediate_crash=True, crash_details=str(e)
)
def _run_fuzzing_trial(
self, project_path: Path, harness_name: str, duration_sec: int
) -> FuzzingTrial | None:
"""Run short fuzzing trial to gather metrics.
:param project_path: Path to Rust project
:param harness_name: Name of harness
:param duration_sec: Duration to run fuzzing
:returns: Fuzzing trial results or None if failed
"""
try:
result = subprocess.run(
[
"cargo",
"fuzz",
"run",
harness_name,
"--",
f"-max_total_time={duration_sec}",
"-print_final_stats=1",
],
cwd=project_path,
capture_output=True,
text=True,
timeout=duration_sec + 30,
)
# Parse fuzzing statistics
stats = self._parse_fuzzing_stats(result.stderr)
# Create metrics
coverage = CoverageMetrics(
initial_edges=stats.get("initial_edges", 0),
final_edges=stats.get("cov_edges", 0),
new_edges_found=stats.get("cov_edges", 0) - stats.get("initial_edges", 0),
growth_rate=self._assess_coverage_growth(stats),
percentage_estimate=self._estimate_coverage_percentage(stats),
stagnation_time_sec=stats.get("stagnation_time"),
)
performance = PerformanceMetrics(
total_execs=stats.get("total_execs", 0),
execs_per_sec=stats.get("exec_per_sec", 0.0),
performance_rating=self._assess_performance(stats.get("exec_per_sec", 0.0)),
)
stability = StabilityMetrics(
status=self._assess_stability(stats),
crashes_found=stats.get("crashes", 0),
unique_crashes=stats.get("unique_crashes", 0),
crash_rate=self._calculate_crash_rate(stats),
)
return FuzzingTrial(
duration_seconds=duration_sec,
coverage=coverage,
performance=performance,
stability=stability,
trial_successful=True,
)
except Exception:
return None
def _parse_compiler_errors(self, stderr: str) -> list[str]:
"""Parse compiler error messages.
:param stderr: Compiler stderr output
:returns: List of error messages
"""
errors = []
for line in stderr.split("\n"):
if "error:" in line or "error[" in line:
errors.append(line.strip())
return errors[:10] # Limit to first 10 errors
def _parse_compiler_warnings(self, stderr: str) -> list[str]:
"""Parse compiler warnings.
:param stderr: Compiler stderr output
:returns: List of warning messages
"""
warnings = []
for line in stderr.split("\n"):
if "warning:" in line:
warnings.append(line.strip())
return warnings[:5] # Limit to first 5 warnings
def _extract_crash_info(self, stderr: str) -> str:
"""Extract crash information from stderr.
:param stderr: Fuzzer stderr output
:returns: Crash details
"""
lines = stderr.split("\n")
for i, line in enumerate(lines):
if "SUMMARY:" in line or "deadly signal" in line:
return "\n".join(lines[max(0, i - 3) : i + 5])
return stderr[:500] # First 500 chars if no specific crash info
def _parse_fuzzing_stats(self, stderr: str) -> dict:
"""Parse fuzzing statistics from libFuzzer output.
:param stderr: Fuzzer stderr output
:returns: Dictionary of statistics
"""
stats = {
"total_execs": 0,
"exec_per_sec": 0.0,
"cov_edges": 0,
"initial_edges": 0,
"crashes": 0,
"unique_crashes": 0,
}
lines = stderr.split("\n")
# Find initial coverage
for line in lines[:20]:
if "cov:" in line:
try:
cov_part = line.split("cov:")[1].split()[0]
stats["initial_edges"] = int(cov_part)
break
except (IndexError, ValueError):
pass
# Parse final stats
for line in reversed(lines):
if "#" in line and "cov:" in line and "exec/s:" in line:
try:
# Parse line like: "#12345 cov: 891 ft: 1234 corp: 56/789b exec/s: 1507"
parts = line.split()
for i, part in enumerate(parts):
if part.startswith("#"):
stats["total_execs"] = int(part[1:])
elif part == "cov:":
stats["cov_edges"] = int(parts[i + 1])
elif part == "exec/s:":
stats["exec_per_sec"] = float(parts[i + 1])
except (IndexError, ValueError):
pass
# Count crashes
if "crash-" in line or "leak-" in line or "timeout-" in line:
stats["crashes"] += 1
# Estimate unique crashes (simplified)
stats["unique_crashes"] = min(stats["crashes"], 10)
return stats
def _assess_coverage_growth(self, stats: dict) -> str:
"""Assess coverage growth quality.
:param stats: Fuzzing statistics
:returns: Growth rate assessment
"""
new_edges = stats.get("cov_edges", 0) - stats.get("initial_edges", 0)
if new_edges == 0:
return "none"
elif new_edges < 50:
return "poor"
elif new_edges < 200:
return "good"
else:
return "excellent"
def _estimate_coverage_percentage(self, stats: dict) -> float | None:
"""Estimate coverage percentage (rough heuristic).
:param stats: Fuzzing statistics
:returns: Estimated percentage or None
"""
edges = stats.get("cov_edges", 0)
if edges == 0:
return 0.0
# Rough heuristic: assume medium-sized function has ~2000 edges
# This is very approximate
estimated = min((edges / 2000) * 100, 100)
return round(estimated, 1)
def _assess_performance(self, execs_per_sec: float) -> str:
"""Assess performance rating.
:param execs_per_sec: Executions per second
:returns: Performance rating
"""
if execs_per_sec > 1000:
return "excellent"
elif execs_per_sec > 100:
return "good"
else:
return "poor"
def _assess_stability(self, stats: dict) -> str:
"""Assess stability status.
:param stats: Fuzzing statistics
:returns: Stability status
"""
crashes = stats.get("crashes", 0)
total_execs = stats.get("total_execs", 0)
if total_execs == 0:
return "unknown"
crash_rate = (crashes / total_execs) * 1000
if crash_rate > 10:
return "crashes_frequently"
elif crash_rate > 1:
return "unstable"
else:
return "stable"
def _calculate_crash_rate(self, stats: dict) -> float:
"""Calculate crash rate per 1000 executions.
:param stats: Fuzzing statistics
:returns: Crash rate
"""
crashes = stats.get("crashes", 0)
total = stats.get("total_execs", 0)
if total == 0:
return 0.0
return (crashes / total) * 1000
def _generate_summary(self, evaluations: list[HarnessEvaluation]) -> EvaluationSummary:
"""Generate evaluation summary.
:param evaluations: List of harness evaluations
:returns: Summary statistics
"""
production_ready = sum(
1 for e in evaluations if e.quality.verdict == "production-ready"
)
needs_improvement = sum(
1 for e in evaluations if e.quality.verdict == "needs-improvement"
)
broken = sum(1 for e in evaluations if e.quality.verdict == "broken")
avg_score = (
sum(e.quality.score for e in evaluations) / len(evaluations)
if evaluations
else 0
)
# Generate recommendation
if broken > 0:
recommended_action = f"Fix {broken} broken harness(es) before proceeding."
elif needs_improvement > 0:
recommended_action = f"Improve {needs_improvement} harness(es) for better results."
else:
recommended_action = "All harnesses are production-ready!"
return EvaluationSummary(
total_harnesses=len(evaluations),
production_ready=production_ready,
needs_improvement=needs_improvement,
broken=broken,
average_score=round(avg_score, 1),
recommended_action=recommended_action,
)
def _save_report(self, report: HarnessTestReport) -> None:
"""Save test report to results directory.
:param report: Harness test report
"""
# Save JSON report
results_path = Path("/results/harness-evaluation.json")
with results_path.open("w") as f:
            json.dump(report.model_dump(), f, indent=2)  # Pydantic v2 serialization
# Save human-readable summary
summary_path = Path("/results/feedback-summary.md")
with summary_path.open("w") as f:
f.write("# Harness Testing Report\n\n")
f.write(f"**Total Harnesses:** {report.summary.total_harnesses}\n")
f.write(f"**Production Ready:** {report.summary.production_ready}\n")
f.write(f"**Needs Improvement:** {report.summary.needs_improvement}\n")
f.write(f"**Broken:** {report.summary.broken}\n")
f.write(f"**Average Score:** {report.summary.average_score}/100\n\n")
f.write(f"**Recommendation:** {report.summary.recommended_action}\n\n")
f.write("## Individual Harness Results\n\n")
for harness in report.harnesses:
f.write(f"### {harness.name}\n\n")
f.write(f"- **Verdict:** {harness.quality.verdict}\n")
f.write(f"- **Score:** {harness.quality.score}/100\n\n")
if harness.quality.strengths:
f.write("**Strengths:**\n")
for strength in harness.quality.strengths:
f.write(f"- {strength}\n")
f.write("\n")
if harness.quality.issues:
f.write("**Issues:**\n")
for issue in harness.quality.issues:
f.write(f"- [{issue.severity.upper()}] {issue.message}\n")
f.write(f" - **Suggestion:** {issue.suggestion}\n")
f.write("\n")
if harness.quality.recommended_actions:
f.write("**Actions:**\n")
for action in harness.quality.recommended_actions:
f.write(f"- {action}\n")
f.write("\n")
# Entry point
harness_tester = HarnessTesterModule()
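
The JSON report written above is the contract that downstream agents consume. A minimal host-side sketch of reading it back, assuming the container's `/results` volume is mounted at `./results` (field names follow the schemas defined further below):

```python
import json
from pathlib import Path

# Load the report written by _save_report (host-side mount of /results assumed)
report = json.loads(Path("results/harness-evaluation.json").read_text())

summary = report["summary"]
print(f"{summary['production_ready']}/{summary['total_harnesses']} production-ready")
print(summary["recommended_action"])

# Surface critical issues first; these block execution entirely
for harness in report["harnesses"]:
    for issue in harness["quality"]["issues"]:
        if issue["severity"] == "critical":
            print(f"{harness['name']}: {issue['message']} -> {issue['suggestion']}")
```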


@@ -0,0 +1,486 @@
"""Feedback generator with actionable suggestions for AI agents."""
from module.feedback import (
CoverageMetrics,
FeedbackCategory,
FeedbackIssue,
FeedbackSeverity,
PerformanceMetrics,
QualityAssessment,
StabilityMetrics,
)
class FeedbackGenerator:
"""Generates actionable feedback based on harness test results."""
@staticmethod
def analyze_compilation(
compilation_result: dict,
) -> tuple[list[FeedbackIssue], list[str]]:
"""Analyze compilation results and generate feedback.
:param compilation_result: Compilation output and errors
:returns: Tuple of (issues, strengths)
"""
issues = []
strengths = []
if not compilation_result.get("success"):
errors = compilation_result.get("errors", [])
for error in errors:
# Analyze specific error types
if "cannot find" in error.lower():
issues.append(
FeedbackIssue(
category=FeedbackCategory.COMPILATION,
severity=FeedbackSeverity.CRITICAL,
type="undefined_variable",
message=f"Compilation error: {error}",
suggestion="Check variable names match the function signature. Use the exact names from fuzzable_functions.json.",
details={"error": error},
)
)
elif "mismatched types" in error.lower():
issues.append(
FeedbackIssue(
category=FeedbackCategory.COMPILATION,
severity=FeedbackSeverity.CRITICAL,
type="type_mismatch",
message=f"Type mismatch: {error}",
suggestion="Check the function expects the types you're passing. Convert fuzzer input to the correct type (e.g., &[u8] to &str with from_utf8).",
details={"error": error},
)
)
elif "trait" in error.lower() and "not implemented" in error.lower():
issues.append(
FeedbackIssue(
category=FeedbackCategory.COMPILATION,
severity=FeedbackSeverity.CRITICAL,
type="trait_not_implemented",
message=f"Trait not implemented: {error}",
suggestion="Ensure you're using the correct types. Some functions require specific trait implementations.",
details={"error": error},
)
)
else:
issues.append(
FeedbackIssue(
category=FeedbackCategory.COMPILATION,
severity=FeedbackSeverity.CRITICAL,
type="compilation_error",
message=f"Compilation failed: {error}",
suggestion="Review the error message and fix syntax/type issues. Check function signatures in the source code.",
details={"error": error},
)
)
else:
strengths.append("Compiles successfully")
# Check for warnings
warnings = compilation_result.get("warnings", [])
if warnings:
            for warning in warnings[:3]:  # Limit to the first 3 warnings
if "unused" in warning.lower():
issues.append(
FeedbackIssue(
category=FeedbackCategory.CODE_QUALITY,
severity=FeedbackSeverity.INFO,
type="unused_variable",
message=f"Code quality: {warning}",
suggestion="Remove unused variables or use underscore prefix (_variable) to suppress warning.",
details={"warning": warning},
)
)
return issues, strengths
@staticmethod
def analyze_execution(
execution_result: dict,
) -> tuple[list[FeedbackIssue], list[str]]:
"""Analyze execution results.
:param execution_result: Execution test results
:returns: Tuple of (issues, strengths)
"""
issues = []
strengths = []
if not execution_result.get("success"):
if execution_result.get("immediate_crash"):
crash_details = execution_result.get("crash_details", "")
if "stack overflow" in crash_details.lower():
issues.append(
FeedbackIssue(
category=FeedbackCategory.EXECUTION,
severity=FeedbackSeverity.CRITICAL,
type="stack_overflow",
message="Harness crashes immediately with stack overflow",
suggestion="Check for infinite recursion or large stack allocations. Use heap allocation (Box, Vec) for large data structures.",
details={"crash": crash_details},
)
)
elif "panic" in crash_details.lower():
issues.append(
FeedbackIssue(
category=FeedbackCategory.EXECUTION,
severity=FeedbackSeverity.CRITICAL,
type="panic_on_start",
message="Harness panics immediately",
suggestion="Check initialization code. Ensure required resources are available and input validation doesn't panic on empty input.",
details={"crash": crash_details},
)
)
else:
issues.append(
FeedbackIssue(
category=FeedbackCategory.EXECUTION,
severity=FeedbackSeverity.CRITICAL,
type="immediate_crash",
message=f"Harness crashes immediately: {crash_details}",
suggestion="Debug the harness initialization. Add error handling and check for null/invalid pointers.",
details={"crash": crash_details},
)
)
elif execution_result.get("timeout"):
issues.append(
FeedbackIssue(
category=FeedbackCategory.EXECUTION,
severity=FeedbackSeverity.CRITICAL,
type="infinite_loop",
message="Harness times out - likely infinite loop",
suggestion="Check for loops that depend on fuzzer input. Add iteration limits or timeout mechanisms.",
details={},
)
)
else:
strengths.append("Executes without crashing")
return issues, strengths
@staticmethod
def analyze_coverage(
coverage: CoverageMetrics,
) -> tuple[list[FeedbackIssue], list[str]]:
"""Analyze coverage metrics.
:param coverage: Coverage metrics from fuzzing trial
:returns: Tuple of (issues, strengths)
"""
issues = []
strengths = []
# No coverage growth
if coverage.new_edges_found == 0:
issues.append(
FeedbackIssue(
category=FeedbackCategory.COVERAGE,
severity=FeedbackSeverity.CRITICAL,
type="no_coverage",
message="No coverage detected - harness may not be using fuzzer input",
suggestion="Ensure you're actually calling the target function with fuzzer-provided data. Check that 'data' parameter is passed to the function being fuzzed.",
details={"initial_edges": coverage.initial_edges},
)
)
# Very low coverage
elif coverage.growth_rate == "none" or (
coverage.percentage_estimate and coverage.percentage_estimate < 5
):
issues.append(
FeedbackIssue(
category=FeedbackCategory.COVERAGE,
severity=FeedbackSeverity.WARNING,
type="very_low_coverage",
message=f"Very low coverage: ~{coverage.percentage_estimate}%",
suggestion="Harness may not be reaching the target code. Verify you're calling the correct entry point function. Check if there's input validation that rejects all fuzzer data.",
details={
"percentage": coverage.percentage_estimate,
"edges": coverage.final_edges,
},
)
)
# Low coverage
elif coverage.growth_rate == "poor" or (
coverage.percentage_estimate and coverage.percentage_estimate < 20
):
issues.append(
FeedbackIssue(
category=FeedbackCategory.COVERAGE,
severity=FeedbackSeverity.WARNING,
type="low_coverage",
message=f"Low coverage: {coverage.percentage_estimate}% - not exploring enough code paths",
suggestion="Try fuzzing multiple entry points or remove restrictive input validation. Consider using a dictionary for structured inputs.",
details={
"percentage": coverage.percentage_estimate,
"new_edges": coverage.new_edges_found,
},
)
)
# Good coverage
elif coverage.growth_rate in ["good", "excellent"]:
if coverage.percentage_estimate and coverage.percentage_estimate > 50:
strengths.append(
f"Excellent coverage: {coverage.percentage_estimate}% of target code reached"
)
else:
strengths.append("Good coverage growth - harness is exploring code paths")
# Coverage stagnation
if (
coverage.stagnation_time_sec
and coverage.stagnation_time_sec < 10
and coverage.final_edges < 500
):
issues.append(
FeedbackIssue(
category=FeedbackCategory.COVERAGE,
severity=FeedbackSeverity.INFO,
type="early_stagnation",
message=f"Coverage stopped growing after {coverage.stagnation_time_sec}s",
suggestion="Harness may be hitting input validation barriers. Consider fuzzing with a seed corpus of valid inputs.",
details={"stagnation_time": coverage.stagnation_time_sec},
)
)
return issues, strengths
@staticmethod
def analyze_performance(
performance: PerformanceMetrics,
) -> tuple[list[FeedbackIssue], list[str]]:
"""Analyze performance metrics.
:param performance: Performance metrics from fuzzing trial
:returns: Tuple of (issues, strengths)
"""
issues = []
strengths = []
execs_per_sec = performance.execs_per_sec
# Very slow execution
if execs_per_sec < 10:
issues.append(
FeedbackIssue(
category=FeedbackCategory.PERFORMANCE,
severity=FeedbackSeverity.CRITICAL,
type="extremely_slow",
message=f"Extremely slow: {execs_per_sec:.1f} execs/sec",
suggestion="Remove file I/O, network operations, or expensive computations from the harness loop. Move setup code outside the fuzz target function.",
details={"execs_per_sec": execs_per_sec},
)
)
# Slow execution
elif execs_per_sec < 100:
issues.append(
FeedbackIssue(
category=FeedbackCategory.PERFORMANCE,
severity=FeedbackSeverity.WARNING,
type="slow_execution",
message=f"Slow execution: {execs_per_sec:.1f} execs/sec (expected 500+)",
suggestion="Optimize harness: avoid allocations in hot path, reuse buffers, remove logging. Profile to find bottlenecks.",
details={"execs_per_sec": execs_per_sec},
)
)
# Good performance
elif execs_per_sec > 1000:
strengths.append(f"Excellent performance: {execs_per_sec:.0f} execs/sec")
elif execs_per_sec > 500:
strengths.append(f"Good performance: {execs_per_sec:.0f} execs/sec")
return issues, strengths
@staticmethod
def analyze_stability(
stability: StabilityMetrics,
) -> tuple[list[FeedbackIssue], list[str]]:
"""Analyze stability metrics.
:param stability: Stability metrics from fuzzing trial
:returns: Tuple of (issues, strengths)
"""
issues = []
strengths = []
if stability.status == "crashes_frequently":
issues.append(
FeedbackIssue(
category=FeedbackCategory.STABILITY,
severity=FeedbackSeverity.WARNING,
type="unstable_frequent_crashes",
message=f"Harness crashes frequently: {stability.crash_rate:.1f} crashes per 1000 execs",
suggestion="This might be expected if testing buggy code. If not, add error handling for edge cases or invalid inputs.",
details={
"crashes": stability.crashes_found,
"crash_rate": stability.crash_rate,
},
)
)
elif stability.status == "hangs":
issues.append(
FeedbackIssue(
category=FeedbackCategory.STABILITY,
severity=FeedbackSeverity.WARNING,
type="hangs_detected",
message=f"Harness hangs: {stability.hangs_found} detected",
suggestion="Add timeouts to prevent infinite loops. Check for blocking operations or resource exhaustion.",
details={"hangs": stability.hangs_found},
)
)
elif stability.status == "stable":
strengths.append("Stable execution - no crashes or hangs")
# Finding crashes can be good!
if stability.unique_crashes > 0 and stability.status != "crashes_frequently":
strengths.append(
f"Found {stability.unique_crashes} potential bugs during trial!"
)
return issues, strengths
@staticmethod
def calculate_quality_score(
compilation_success: bool,
execution_success: bool,
coverage: CoverageMetrics | None,
performance: PerformanceMetrics | None,
stability: StabilityMetrics | None,
) -> int:
"""Calculate overall quality score (0-100).
:param compilation_success: Whether compilation succeeded
:param execution_success: Whether execution succeeded
:param coverage: Coverage metrics
:param performance: Performance metrics
:param stability: Stability metrics
:returns: Quality score 0-100
"""
if not compilation_success:
return 0
if not execution_success:
return 10
score = 20 # Base score for compiling and running
# Coverage contribution (0-40 points)
if coverage:
if coverage.growth_rate == "excellent":
score += 40
elif coverage.growth_rate == "good":
score += 30
elif coverage.growth_rate == "poor":
score += 10
# Performance contribution (0-25 points)
if performance:
if performance.execs_per_sec > 1000:
score += 25
elif performance.execs_per_sec > 500:
score += 20
elif performance.execs_per_sec > 100:
score += 10
elif performance.execs_per_sec > 10:
score += 5
# Stability contribution (0-15 points)
if stability:
if stability.status == "stable":
score += 15
elif stability.status == "unstable":
score += 10
elif stability.status == "crashes_frequently":
score += 5
return min(score, 100)
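        # Worked example (illustrative): a harness that compiles and runs (20)
        # with "good" coverage growth (+30), 600 execs/sec (+20), and stable
        # execution (+15) scores 85 -- "production-ready" under the verdict
        # thresholds applied in generate_quality_assessment below.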
@classmethod
def generate_quality_assessment(
cls,
compilation_result: dict,
execution_result: dict | None,
coverage: CoverageMetrics | None,
performance: PerformanceMetrics | None,
stability: StabilityMetrics | None,
) -> QualityAssessment:
"""Generate complete quality assessment with all feedback.
:param compilation_result: Compilation results
:param execution_result: Execution results
:param coverage: Coverage metrics
:param performance: Performance metrics
:param stability: Stability metrics
:returns: Complete quality assessment
"""
all_issues = []
all_strengths = []
# Analyze each aspect
comp_issues, comp_strengths = cls.analyze_compilation(compilation_result)
all_issues.extend(comp_issues)
all_strengths.extend(comp_strengths)
if execution_result:
exec_issues, exec_strengths = cls.analyze_execution(execution_result)
all_issues.extend(exec_issues)
all_strengths.extend(exec_strengths)
if coverage:
cov_issues, cov_strengths = cls.analyze_coverage(coverage)
all_issues.extend(cov_issues)
all_strengths.extend(cov_strengths)
if performance:
perf_issues, perf_strengths = cls.analyze_performance(performance)
all_issues.extend(perf_issues)
all_strengths.extend(perf_strengths)
if stability:
stab_issues, stab_strengths = cls.analyze_stability(stability)
all_issues.extend(stab_issues)
all_strengths.extend(stab_strengths)
# Calculate score
score = cls.calculate_quality_score(
compilation_result.get("success", False),
execution_result.get("success", False) if execution_result else False,
coverage,
performance,
stability,
)
# Determine verdict
if score >= 70:
verdict = "production-ready"
elif score >= 30:
verdict = "needs-improvement"
else:
verdict = "broken"
# Generate recommended actions
recommended_actions = []
critical_issues = [i for i in all_issues if i.severity == FeedbackSeverity.CRITICAL]
warning_issues = [i for i in all_issues if i.severity == FeedbackSeverity.WARNING]
if critical_issues:
recommended_actions.append(
f"Fix {len(critical_issues)} critical issue(s) preventing execution"
)
if warning_issues:
recommended_actions.append(
f"Address {len(warning_issues)} warning(s) to improve harness quality"
)
if verdict == "production-ready":
recommended_actions.append("Harness is ready for production fuzzing")
return QualityAssessment(
score=score,
verdict=verdict,
issues=all_issues,
strengths=all_strengths,
recommended_actions=recommended_actions,
)
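
To see how the analyzers compose end-to-end, here is a minimal sketch driving the generator with synthetic metrics; all values are illustrative, and the `FeedbackGenerator` import path is hypothetical (the file's module name is not shown here):

```python
from module.feedback import CoverageMetrics, PerformanceMetrics, StabilityMetrics
from module.feedback_generator import FeedbackGenerator  # hypothetical path

# Synthetic results standing in for a real harness test run
compilation = {"success": True, "warnings": []}
execution = {"success": True}
coverage = CoverageMetrics(
    initial_edges=120, final_edges=420, new_edges_found=300, growth_rate="excellent"
)
performance = PerformanceMetrics(
    total_execs=90_000, execs_per_sec=1500.0, performance_rating="excellent"
)
stability = StabilityMetrics(status="stable")

assessment = FeedbackGenerator.generate_quality_assessment(
    compilation, execution, coverage, performance, stability
)
print(assessment.score, assessment.verdict)  # 100 production-ready
```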


@@ -0,0 +1,148 @@
"""Feedback types and schemas for harness testing."""
from enum import Enum
from typing import Any
from pydantic import BaseModel, Field
class FeedbackSeverity(str, Enum):
"""Severity levels for feedback issues."""
CRITICAL = "critical" # Blocks execution (compilation errors, crashes)
WARNING = "warning" # Should fix (low coverage, slow execution)
INFO = "info" # Nice to have (optimization suggestions)
class FeedbackCategory(str, Enum):
"""Categories of feedback."""
COMPILATION = "compilation"
EXECUTION = "execution"
PERFORMANCE = "performance"
COVERAGE = "coverage"
STABILITY = "stability"
CODE_QUALITY = "code_quality"
class FeedbackIssue(BaseModel):
"""A single feedback issue with actionable suggestion."""
category: FeedbackCategory
severity: FeedbackSeverity
type: str = Field(description="Specific issue type (e.g., 'low_coverage', 'compilation_error')")
message: str = Field(description="Human-readable description of the issue")
suggestion: str = Field(description="Actionable suggestion for AI agent to fix the issue")
details: dict[str, Any] = Field(default_factory=dict, description="Additional technical details")
class CompilationResult(BaseModel):
"""Results from compilation attempt."""
success: bool
time_ms: int | None = None
errors: list[str] = Field(default_factory=list)
warnings: list[str] = Field(default_factory=list)
stderr: str | None = None
class ExecutionResult(BaseModel):
"""Results from execution test."""
success: bool
runs_completed: int | None = None
immediate_crash: bool = False
timeout: bool = False
crash_details: str | None = None
class CoverageMetrics(BaseModel):
"""Coverage metrics from fuzzing trial."""
initial_edges: int = 0
final_edges: int = 0
new_edges_found: int = 0
growth_rate: str = Field(
description="Qualitative assessment: 'excellent', 'good', 'poor', 'none'"
)
percentage_estimate: float | None = Field(
None, description="Estimated percentage of target code covered"
)
stagnation_time_sec: float | None = Field(
None, description="Time until coverage stopped growing"
)
class PerformanceMetrics(BaseModel):
"""Performance metrics from fuzzing trial."""
total_execs: int
execs_per_sec: float
average_exec_time_us: float | None = None
performance_rating: str = Field(
description="'excellent' (>1000/s), 'good' (100-1000/s), 'poor' (<100/s)"
)
class StabilityMetrics(BaseModel):
"""Stability metrics from fuzzing trial."""
status: str = Field(
description="'stable', 'unstable', 'crashes_frequently', 'hangs'"
)
crashes_found: int = 0
hangs_found: int = 0
unique_crashes: int = 0
crash_rate: float = Field(0.0, description="Crashes per 1000 executions")
class FuzzingTrial(BaseModel):
"""Results from short fuzzing trial."""
duration_seconds: int
coverage: CoverageMetrics
performance: PerformanceMetrics
stability: StabilityMetrics
trial_successful: bool
class QualityAssessment(BaseModel):
"""Overall quality assessment of the harness."""
score: int = Field(ge=0, le=100, description="Quality score 0-100")
verdict: str = Field(
description="'production-ready', 'needs-improvement', 'broken'"
)
issues: list[FeedbackIssue] = Field(default_factory=list)
strengths: list[str] = Field(default_factory=list)
recommended_actions: list[str] = Field(default_factory=list)
class HarnessEvaluation(BaseModel):
"""Complete evaluation of a single harness."""
name: str
path: str | None = None
compilation: CompilationResult
execution: ExecutionResult | None = None
fuzzing_trial: FuzzingTrial | None = None
quality: QualityAssessment
class EvaluationSummary(BaseModel):
"""Summary of all harness evaluations."""
total_harnesses: int
production_ready: int
needs_improvement: int
broken: int
average_score: float
recommended_action: str
class HarnessTestReport(BaseModel):
"""Complete harness testing report."""
harnesses: list[HarnessEvaluation]
summary: EvaluationSummary
test_configuration: dict[str, Any] = Field(default_factory=dict)
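
Because these are Pydantic models, individual pieces serialize directly to the JSON an agent receives. An illustrative example, assuming Pydantic v2 for `model_dump_json`:

```python
from module.feedback import FeedbackCategory, FeedbackIssue, FeedbackSeverity

# A representative issue as an agent would receive it (values illustrative)
issue = FeedbackIssue(
    category=FeedbackCategory.COVERAGE,
    severity=FeedbackSeverity.WARNING,
    type="low_coverage",
    message="Low coverage: 12.5% - not exploring enough code paths",
    suggestion="Try fuzzing multiple entry points or remove restrictive input validation.",
    details={"percentage": 12.5},
)
print(issue.model_dump_json(indent=2))
```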


@@ -1,5 +1,7 @@
 FROM localhost/fuzzforge-modules-sdk:0.1.0
 
+# Module metadata is now read from pyproject.toml [tool.fuzzforge.module] section
+
 # Install system dependencies
 RUN apt-get update && apt-get install -y \
     curl \


@@ -1,7 +1,7 @@
 [project]
-name = "rust-analyzer"
-version = "0.0.1"
-description = "FIXME"
+name = "fuzzforge-rust-analyzer"
+version = "0.1.0"
+description = "Analyzes Rust projects to identify functions suitable for fuzzing"
 authors = []
 readme = "README.md"
 requires-python = ">=3.14"
@@ -26,3 +26,30 @@ module = "module.__main__:main"
 [tool.uv]
 package = true
+
+# FuzzForge module metadata for AI agent discovery
+[tool.fuzzforge.module]
+identifier = "fuzzforge-rust-analyzer"
+category = "analyzer"
+language = "rust"
+pipeline_stage = "analysis"
+pipeline_order = 1
+dependencies = []
+continuous_mode = false
+typical_duration = "30s"
+use_cases = [
+    "Analyze Rust crate to find fuzzable functions",
+    "First step in Rust fuzzing pipeline before harness generation",
+    "Produces fuzzable_functions.json for AI harness generation"
+]
+input_requirements = [
+    "rust-source-code",
+    "Cargo.toml"
+]
+output_artifacts = [
+    "fuzzable_functions.json",
+    "analysis_report.md"
+]
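
The metadata table above is what the runner later reads out of built images; it can also be sanity-checked locally with the standard library. A small sketch, run from the module directory:

```python
import tomllib
from pathlib import Path

# Parse the [tool.fuzzforge.module] table added above
pyproject = tomllib.loads(Path("pyproject.toml").read_text())
meta = pyproject["tool"]["fuzzforge"]["module"]
print(meta["identifier"], meta["pipeline_stage"], meta["typical_duration"])
```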


@@ -322,14 +322,21 @@ class ModuleExecutor:
         self,
         assets_path: Path,
         configuration: dict[str, Any] | None = None,
+        project_path: Path | None = None,
+        execution_id: str | None = None,
     ) -> Path:
         """Prepare input directory with assets and configuration.
 
-        Creates a temporary directory with input.json describing all resources.
+        Creates a directory with input.json describing all resources.
         This directory can be volume-mounted into the container.
 
+        If assets_path is a directory, it is used directly (zero-copy mount).
+        If assets_path is a file (e.g., tar.gz), it is extracted first.
+
         :param assets_path: Path to the assets (file or directory).
         :param configuration: Optional module configuration dict.
+        :param project_path: Project directory for storing inputs in .fuzzforge/.
+        :param execution_id: Execution ID for organizing inputs.
         :returns: Path to prepared input directory.
         :raises SandboxError: If preparation fails.
@@ -339,12 +346,65 @@ class ModuleExecutor:
logger.info("preparing input directory", assets=str(assets_path)) logger.info("preparing input directory", assets=str(assets_path))
try: try:
# Create temporary directory - caller must clean it up after container finishes # If assets_path is already a directory, use it directly (zero-copy mount)
from tempfile import mkdtemp if assets_path.exists() and assets_path.is_dir():
# Create input.json directly in the source directory
input_json_path = assets_path / "input.json"
# Scan files and build resource list
resources = []
for item in assets_path.iterdir():
if item.name == "input.json":
continue
if item.is_file():
resources.append(
{
"name": item.stem,
"description": f"Input file: {item.name}",
"kind": "unknown",
"path": f"/data/input/{item.name}",
}
)
elif item.is_dir():
resources.append(
{
"name": item.name,
"description": f"Input directory: {item.name}",
"kind": "unknown",
"path": f"/data/input/{item.name}",
}
)
temp_path = Path(mkdtemp(prefix="fuzzforge-input-")) input_data = {
"settings": configuration or {},
"resources": resources,
}
input_json_path.write_text(json.dumps(input_data, indent=2))
# Copy assets to temp directory logger.debug("using source directory directly", path=str(assets_path))
return assets_path
# File input: extract to a directory first
# Determine input directory location
if project_path:
# Store inputs in .fuzzforge/inputs/ for visibility
from fuzzforge_runner.storage import FUZZFORGE_DIR_NAME
exec_id = execution_id or "latest"
input_dir = project_path / FUZZFORGE_DIR_NAME / "inputs" / exec_id
input_dir.mkdir(parents=True, exist_ok=True)
# Clean previous contents if exists
import shutil
for item in input_dir.iterdir():
if item.is_file():
item.unlink()
elif item.is_dir():
shutil.rmtree(item)
else:
# Fallback to temporary directory
from tempfile import mkdtemp
input_dir = Path(mkdtemp(prefix="fuzzforge-input-"))
# Copy/extract assets to input directory
if assets_path.exists(): if assets_path.exists():
if assets_path.is_file(): if assets_path.is_file():
# Check if it's a tar.gz archive that needs extraction # Check if it's a tar.gz archive that needs extraction
@@ -353,26 +413,26 @@ class ModuleExecutor:
                     import tarfile
 
                     with tarfile.open(assets_path, "r:gz") as tar:
-                        tar.extractall(path=temp_path)
+                        tar.extractall(path=input_dir)
                     logger.debug("extracted tar.gz archive", archive=str(assets_path))
                 else:
                     # Single file - copy it
                     import shutil
 
-                    shutil.copy2(assets_path, temp_path / assets_path.name)
+                    shutil.copy2(assets_path, input_dir / assets_path.name)
             else:
                 # Directory - copy all files (including subdirectories)
                 import shutil
 
                 for item in assets_path.iterdir():
                     if item.is_file():
-                        shutil.copy2(item, temp_path / item.name)
+                        shutil.copy2(item, input_dir / item.name)
                     elif item.is_dir():
-                        shutil.copytree(item, temp_path / item.name)
+                        shutil.copytree(item, input_dir / item.name, dirs_exist_ok=True)
 
             # Scan files and directories and build resource list
             resources = []
-            for item in temp_path.iterdir():
+            for item in input_dir.iterdir():
                 if item.name == "input.json":
                     continue
                 if item.is_file():
@@ -399,11 +459,11 @@ class ModuleExecutor:
"settings": configuration or {}, "settings": configuration or {},
"resources": resources, "resources": resources,
} }
input_json_path = temp_path / "input.json" input_json_path = input_dir / "input.json"
input_json_path.write_text(json.dumps(input_data, indent=2)) input_json_path.write_text(json.dumps(input_data, indent=2))
logger.debug("prepared input directory", resources=len(resources), path=str(temp_path)) logger.debug("prepared input directory", resources=len(resources), path=str(input_dir))
return temp_path return input_dir
except Exception as exc: except Exception as exc:
message = f"Failed to prepare input directory" message = f"Failed to prepare input directory"
@@ -542,6 +602,8 @@ class ModuleExecutor:
         module_identifier: str,
         assets_path: Path,
         configuration: dict[str, Any] | None = None,
+        project_path: Path | None = None,
+        execution_id: str | None = None,
     ) -> Path:
         """Execute a module end-to-end.
@@ -552,9 +614,17 @@ class ModuleExecutor:
         4. Pull results
         5. Terminate sandbox
 
+        All intermediate files are stored in {project_path}/.fuzzforge/ for
+        easy debugging and visibility.
+
+        Source directories are mounted directly without tar.gz compression
+        for better performance.
+
         :param module_identifier: Name/identifier of the module to execute.
-        :param assets_path: Path to the input assets archive.
+        :param assets_path: Path to the input assets (file or directory).
         :param configuration: Optional module configuration.
+        :param project_path: Project directory for .fuzzforge/ storage.
+        :param execution_id: Execution ID for organizing files.
         :returns: Path to the results archive.
         :raises ModuleExecutionError: If any step fails.
@@ -562,10 +632,20 @@ class ModuleExecutor:
         logger = get_logger()
         sandbox: str | None = None
         input_dir: Path | None = None
+        # Don't cleanup if we're using the source directory directly
+        cleanup_input = False
 
         try:
             # 1. Prepare input directory with assets
-            input_dir = self.prepare_input_directory(assets_path, configuration)
+            input_dir = self.prepare_input_directory(
+                assets_path,
+                configuration,
+                project_path=project_path,
+                execution_id=execution_id,
+            )
+            # Only cleanup if we created a temp directory (file input case)
+            cleanup_input = input_dir != assets_path and project_path is None
 
             # 2. Spawn sandbox with volume mount
             sandbox = self.spawn_sandbox(module_identifier, input_volume=input_dir)
@@ -585,12 +665,12 @@ class ModuleExecutor:
             return results_path
 
         finally:
-            # 5. Always cleanup
+            # 5. Always cleanup sandbox
             if sandbox:
                 self.terminate_sandbox(sandbox)
-            if input_dir and input_dir.exists():
+            # Only cleanup input if it was a temp directory
+            if cleanup_input and input_dir and input_dir.exists():
                 import shutil
 
                 shutil.rmtree(input_dir, ignore_errors=True)
 
     # -------------------------------------------------------------------------
@@ -602,22 +682,34 @@ class ModuleExecutor:
         module_identifier: str,
         assets_path: Path,
         configuration: dict[str, Any] | None = None,
+        project_path: Path | None = None,
+        execution_id: str | None = None,
     ) -> dict[str, Any]:
         """Start a module in continuous/background mode without waiting.
 
         Returns immediately with container info. Use read_module_output() to
         get current status and stop_module_continuous() to stop.
 
+        Source directories are mounted directly without tar.gz compression
+        for better performance.
+
         :param module_identifier: Name/identifier of the module to execute.
-        :param assets_path: Path to the input assets archive.
+        :param assets_path: Path to the input assets (file or directory).
         :param configuration: Optional module configuration.
+        :param project_path: Project directory for .fuzzforge/ storage.
+        :param execution_id: Execution ID for organizing files.
         :returns: Dict with container_id, input_dir for later cleanup.
         """
         logger = get_logger()
 
         # 1. Prepare input directory with assets
-        input_dir = self.prepare_input_directory(assets_path, configuration)
+        input_dir = self.prepare_input_directory(
+            assets_path,
+            configuration,
+            project_path=project_path,
+            execution_id=execution_id,
+        )
 
         # 2. Spawn sandbox with volume mount
         sandbox = self.spawn_sandbox(module_identifier, input_volume=input_dir)
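
For reference, the `input.json` that `prepare_input_directory` writes into the mounted directory is shaped roughly like this (illustrative values; the `trial_duration` setting is a hypothetical example):

```json
{
  "settings": { "trial_duration": 30 },
  "resources": [
    {
      "name": "my-crate",
      "description": "Input directory: my-crate",
      "kind": "unknown",
      "path": "/data/input/my-crate"
    }
  ]
}
```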


@@ -214,11 +214,13 @@ class WorkflowOrchestrator:
message = f"No assets available for step {step_index}" message = f"No assets available for step {step_index}"
raise WorkflowExecutionError(message) raise WorkflowExecutionError(message)
# Execute the module # Execute the module (inputs stored in .fuzzforge/inputs/)
results_path = await self._executor.execute( results_path = await self._executor.execute(
module_identifier=step.module_identifier, module_identifier=step.module_identifier,
assets_path=current_assets, assets_path=current_assets,
configuration=step.configuration, configuration=step.configuration,
project_path=project_path,
execution_id=step_execution_id,
) )
completed_at = datetime.now(UTC) completed_at = datetime.now(UTC)


@@ -53,6 +53,36 @@ class ModuleInfo:
     #: Whether module image exists locally.
     available: bool = True
 
+    #: Module category (analyzer, validator, fuzzer, reporter).
+    category: str | None = None
+
+    #: Target programming language (e.g., "rust", "python").
+    language: str | None = None
+
+    #: Pipeline stage name (e.g., "analysis", "fuzzing").
+    pipeline_stage: str | None = None
+
+    #: Numeric order in pipeline for sorting.
+    pipeline_order: int | None = None
+
+    #: Module identifiers that must run before this one.
+    dependencies: list[str] | None = None
+
+    #: Whether module supports continuous/background execution.
+    continuous_mode: bool = False
+
+    #: Expected runtime (e.g., "30s", "5m", "continuous").
+    typical_duration: str | None = None
+
+    #: Typical use cases and scenarios for this module.
+    use_cases: list[str] | None = None
+
+    #: Input requirements (e.g., ["rust-source-code", "Cargo.toml"]).
+    input_requirements: list[str] | None = None
+
+    #: Output artifacts produced (e.g., ["fuzzable_functions.json"]).
+    output_artifacts: list[str] | None = None
+
 
 class Runner:
     """Main FuzzForge Runner interface.
@@ -125,16 +155,19 @@ class Runner:
         return self._storage.init_project(project_path)
 
     def set_project_assets(self, project_path: Path, assets_path: Path) -> Path:
-        """Set initial assets for a project.
+        """Set source path for a project (no copying).
+
+        Just stores a reference to the source directory.
+        The source is mounted directly into containers at runtime.
 
         :param project_path: Path to the project directory.
-        :param assets_path: Path to assets (file or directory).
-        :returns: Path to stored assets.
+        :param assets_path: Path to source directory.
+        :returns: The assets path (unchanged).
         """
         logger = get_logger()
         logger.info("setting project assets", project=str(project_path), assets=str(assets_path))
-        return self._storage.store_assets(project_path, assets_path)
+        return self._storage.set_project_assets(project_path, assets_path)
 
     # -------------------------------------------------------------------------
     # Module Discovery
@@ -182,12 +215,15 @@ class Runner:
"""List available module images from the container engine. """List available module images from the container engine.
Uses the container engine API to discover built module images. Uses the container engine API to discover built module images.
Reads metadata from pyproject.toml inside each image.
:param filter_prefix: Prefix to filter images (default: "fuzzforge-"). :param filter_prefix: Prefix to filter images (default: "fuzzforge-").
:param include_all_tags: If True, include all image tags, not just 'latest'. :param include_all_tags: If True, include all image tags, not just 'latest'.
:returns: List of available module images. :returns: List of available module images.
""" """
import tomllib # noqa: PLC0415
logger = get_logger() logger = get_logger()
modules: list[ModuleInfo] = [] modules: list[ModuleInfo] = []
seen: set[str] = set() seen: set[str] = set()
@@ -223,18 +259,67 @@ class Runner:
             # Add unique modules
             if module_name not in seen:
                 seen.add(module_name)
+
+                # Read metadata from pyproject.toml inside the image
+                image_ref = f"{image.repository}:{image.tag}"
+                module_meta = self._get_module_metadata_from_image(engine, image_ref)
+
+                # Get basic info from pyproject.toml [project] section
+                project_info = module_meta.get("_project", {})
+                fuzzforge_meta = module_meta.get("module", {})
+
                 modules.append(
                     ModuleInfo(
-                        identifier=module_name,
-                        description=None,
-                        version=image.tag,
+                        identifier=fuzzforge_meta.get("identifier", module_name),
+                        description=project_info.get("description"),
+                        version=project_info.get("version", image.tag),
                         available=True,
+                        category=fuzzforge_meta.get("category"),
+                        language=fuzzforge_meta.get("language"),
+                        pipeline_stage=fuzzforge_meta.get("pipeline_stage"),
+                        pipeline_order=fuzzforge_meta.get("pipeline_order"),
+                        dependencies=fuzzforge_meta.get("dependencies", []),
+                        continuous_mode=fuzzforge_meta.get("continuous_mode", False),
+                        typical_duration=fuzzforge_meta.get("typical_duration"),
+                        use_cases=fuzzforge_meta.get("use_cases", []),
+                        input_requirements=fuzzforge_meta.get("input_requirements", []),
+                        output_artifacts=fuzzforge_meta.get("output_artifacts", []),
                     )
                 )
 
         logger.info("listed module images", count=len(modules))
         return modules
 
+    def _get_module_metadata_from_image(self, engine: Any, image_ref: str) -> dict:
+        """Read module metadata from pyproject.toml inside a container image.
+
+        :param engine: Container engine instance.
+        :param image_ref: Image reference (e.g., "fuzzforge-rust-analyzer:latest").
+        :returns: Dict with module metadata from [tool.fuzzforge] section.
+        """
+        import tomllib  # noqa: PLC0415
+
+        logger = get_logger()
+
+        try:
+            # Read pyproject.toml from the image
+            content = engine.read_file_from_image(image_ref, "/app/pyproject.toml")
+            if not content:
+                logger.debug("no pyproject.toml found in image", image=image_ref)
+                return {}
+
+            pyproject = tomllib.loads(content)
+
+            # Return the [tool.fuzzforge] section plus [project] info
+            result = pyproject.get("tool", {}).get("fuzzforge", {})
+            result["_project"] = pyproject.get("project", {})
+            return result
+        except Exception as exc:
+            logger.debug("failed to read metadata from image", image=image_ref, error=str(exc))
+            return {}
+
     def get_module_info(self, module_identifier: str) -> ModuleInfo | None:
         """Get information about a specific module.


@@ -34,23 +34,14 @@ class EngineSettings(BaseModel):
 class StorageSettings(BaseModel):
-    """Storage configuration for local or S3 storage."""
+    """Storage configuration for local filesystem storage.
 
-    #: Storage backend type.
-    type: Literal["local", "s3"] = "local"
+    OSS uses direct file mounting without archiving for simplicity.
+    """
 
-    #: Base path for local storage (used when type is "local").
+    #: Base path for local storage.
     path: Path = Field(default=Path.home() / ".fuzzforge" / "storage")
 
-    #: S3 endpoint URL (used when type is "s3").
-    s3_endpoint: str | None = None
-
-    #: S3 access key (used when type is "s3").
-    s3_access_key: str | None = None
-
-    #: S3 secret key (used when type is "s3").
-    s3_secret_key: str | None = None
-
 
 class ProjectSettings(BaseModel):
     """Project configuration."""


@@ -1,16 +1,20 @@
"""FuzzForge Runner - Local filesystem storage. """FuzzForge Runner - Local filesystem storage.
This module provides local filesystem storage as an alternative to S3, This module provides local filesystem storage for OSS deployments.
enabling zero-configuration operation for OSS deployments.
Storage is placed directly in the project directory as `.fuzzforge/`
for maximum visibility and ease of debugging.
In OSS mode, source files are referenced (not copied) and mounted
directly into containers at runtime for zero-copy performance.
""" """
from __future__ import annotations from __future__ import annotations
import shutil import shutil
from pathlib import Path, PurePath from pathlib import Path
from tarfile import open as Archive # noqa: N812 from tarfile import open as Archive # noqa: N812
from tempfile import NamedTemporaryFile, TemporaryDirectory
from typing import TYPE_CHECKING, cast from typing import TYPE_CHECKING, cast
from fuzzforge_runner.constants import RESULTS_ARCHIVE_FILENAME from fuzzforge_runner.constants import RESULTS_ARCHIVE_FILENAME
@@ -19,6 +23,9 @@ from fuzzforge_runner.exceptions import StorageError
 if TYPE_CHECKING:
     from structlog.stdlib import BoundLogger
 
+#: Name of the FuzzForge storage directory within projects.
+FUZZFORGE_DIR_NAME: str = ".fuzzforge"
+
 
 def get_logger() -> BoundLogger:
     """Get structlog logger instance.
@@ -32,33 +39,36 @@ def get_logger() -> BoundLogger:
 class LocalStorage:
-    """Local filesystem storage backend.
+    """Local filesystem storage backend for FuzzForge OSS.
 
-    Provides S3-like operations using local filesystem, enabling
-    FuzzForge operation without external storage infrastructure.
+    Provides lightweight storage for execution results while using
+    direct source mounting (no copying) for input assets.
 
-    Directory structure:
-        {base_path}/
-            projects/
-                {project_id}/
-                    assets/    # Initial project assets
-                    runs/
-                        {execution_id}/
-                            results.tar.gz
-                        {workflow_id}/
-                            modules/
-                                step-0-{exec_id}/
-                                    results.tar.gz
+    Storage is placed directly in the project directory as `.fuzzforge/`
+    so users can easily inspect outputs and configuration.
+
+    Directory structure (inside project directory):
+        {project_path}/.fuzzforge/
+            config.json   # Project config (source path reference)
+            runs/         # Execution results
+                {execution_id}/
+                    results.tar.gz
+                {workflow_id}/
+                    modules/
+                        step-0-{exec_id}/
+                            results.tar.gz
+
+    Source files are NOT copied - they are referenced and mounted directly.
     """
 
-    #: Base path for all storage operations.
+    #: Base path for global storage (only used for fallback/config).
     _base_path: Path
 
     def __init__(self, base_path: Path) -> None:
         """Initialize an instance of the class.
 
-        :param base_path: Root directory for storage.
+        :param base_path: Root directory for global storage (fallback only).
         """
         self._base_path = base_path
@@ -71,17 +81,22 @@ class LocalStorage:
     def _get_project_path(self, project_path: Path) -> Path:
         """Get the storage path for a project.
 
-        :param project_path: Original project path (used as identifier).
-        :returns: Storage path for the project.
+        Storage is placed directly inside the project as `.fuzzforge/`.
+
+        :param project_path: Path to the project directory.
+        :returns: Storage path for the project (.fuzzforge inside project).
         """
-        # Use project path name as identifier
-        project_id = project_path.name
-        return self._base_path / "projects" / project_id
+        return project_path / FUZZFORGE_DIR_NAME
 
     def init_project(self, project_path: Path) -> Path:
         """Initialize storage for a new project.
 
+        Creates a .fuzzforge/ directory inside the project for storing:
+        - assets/: Input files (source code, etc.)
+        - inputs/: Prepared module inputs (for debugging)
+        - runs/: Execution results from each module
+
         :param project_path: Path to the project directory.
         :returns: Path to the project storage directory.
@@ -89,102 +104,91 @@ class LocalStorage:
         logger = get_logger()
         storage_path = self._get_project_path(project_path)
 
-        # Create directory structure
-        (storage_path / "assets").mkdir(parents=True, exist_ok=True)
+        # Create directory structure (minimal for OSS)
+        storage_path.mkdir(parents=True, exist_ok=True)
         (storage_path / "runs").mkdir(parents=True, exist_ok=True)
 
+        # Create .gitignore to avoid committing large files
+        gitignore_path = storage_path / ".gitignore"
+        if not gitignore_path.exists():
+            gitignore_content = """# FuzzForge storage - ignore large/temporary files
+
+# Execution results (can be very large)
+runs/
+
+# Project configuration
+!config.json
+"""
+            gitignore_path.write_text(gitignore_content)
+
         logger.info("initialized project storage", project=project_path.name, storage=str(storage_path))
         return storage_path
 
     def get_project_assets_path(self, project_path: Path) -> Path | None:
-        """Get the path to project assets archive.
+        """Get the path to project assets (source directory).
+
+        Returns the configured source path for the project.
+        In OSS mode, this is just a reference to the user's source - no copying.
 
         :param project_path: Path to the project directory.
-        :returns: Path to assets archive, or None if not found.
+        :returns: Path to source directory, or None if not configured.
         """
         storage_path = self._get_project_path(project_path)
-        assets_dir = storage_path / "assets"
-
-        # Look for assets archive
-        archive_path = assets_dir / "assets.tar.gz"
-        if archive_path.exists():
-            return archive_path
-
-        # Check if there are any files in assets directory
-        if assets_dir.exists() and any(assets_dir.iterdir()):
-            # Create archive from directory contents
-            return self._create_archive_from_directory(assets_dir)
+        config_path = storage_path / "config.json"
+
+        if config_path.exists():
+            import json
+
+            config = json.loads(config_path.read_text())
+            source_path = config.get("source_path")
+            if source_path:
+                path = Path(source_path)
+                if path.exists():
+                    return path
+
+        # Fallback: check if project_path itself is the source
+        # (common case: user runs from their project directory)
+        if (project_path / "Cargo.toml").exists() or (project_path / "src").exists():
+            return project_path
 
         return None
 
-    def _create_archive_from_directory(self, directory: Path) -> Path:
-        """Create a tar.gz archive from a directory's contents.
-
-        :param directory: Directory to archive.
-        :returns: Path to the created archive.
-        """
-        archive_path = directory.parent / f"{directory.name}.tar.gz"
-        with Archive(archive_path, "w:gz") as tar:
-            for item in directory.iterdir():
-                tar.add(item, arcname=item.name)
-        return archive_path
-
-    def create_empty_assets_archive(self, project_path: Path) -> Path:
-        """Create an empty assets archive for a project.
-
-        :param project_path: Path to the project directory.
-        :returns: Path to the empty archive.
-        """
-        storage_path = self._get_project_path(project_path)
-        assets_dir = storage_path / "assets"
-        assets_dir.mkdir(parents=True, exist_ok=True)
-
-        archive_path = assets_dir / "assets.tar.gz"
-
-        # Create empty archive
-        with Archive(archive_path, "w:gz") as tar:
-            pass  # Empty archive
-
-        return archive_path
-
-    def store_assets(self, project_path: Path, assets_path: Path) -> Path:
-        """Store project assets from a local path.
-
-        :param project_path: Path to the project directory.
-        :param assets_path: Source path (file or directory) to store.
-        :returns: Path to the stored assets.
-        :raises StorageError: If storage operation fails.
-        """
+    def set_project_assets(self, project_path: Path, assets_path: Path) -> Path:
+        """Set the source path for a project (no copying).
+
+        Just stores a reference to the source directory.
+        The source is mounted directly into containers at runtime.
+
+        :param project_path: Path to the project directory.
+        :param assets_path: Path to source directory.
+        :returns: The assets path (unchanged).
+        :raises StorageError: If path doesn't exist.
+        """
+        import json
+
         logger = get_logger()
 
-        storage_path = self._get_project_path(project_path)
-        assets_dir = storage_path / "assets"
-        assets_dir.mkdir(parents=True, exist_ok=True)
-
-        try:
-            if assets_path.is_file():
-                # Copy archive directly
-                dest_path = assets_dir / "assets.tar.gz"
-                shutil.copy2(assets_path, dest_path)
-            else:
-                # Create archive from directory
-                dest_path = assets_dir / "assets.tar.gz"
-                with Archive(dest_path, "w:gz") as tar:
-                    for item in assets_path.iterdir():
-                        tar.add(item, arcname=item.name)
-
-            logger.info("stored project assets", project=project_path.name, path=str(dest_path))
-            return dest_path
-        except Exception as exc:
-            message = f"Failed to store assets: {exc}"
-            raise StorageError(message) from exc
+        if not assets_path.exists():
+            raise StorageError(f"Assets path does not exist: {assets_path}")
+
+        # Resolve to absolute path
+        assets_path = assets_path.resolve()
+
+        # Store reference in config
+        storage_path = self._get_project_path(project_path)
+        storage_path.mkdir(parents=True, exist_ok=True)
+        config_path = storage_path / "config.json"
+
+        config: dict = {}
+        if config_path.exists():
+            config = json.loads(config_path.read_text())
+
+        config["source_path"] = str(assets_path)
+        config_path.write_text(json.dumps(config, indent=2))
+
+        logger.info("set project assets", project=project_path.name, source=str(assets_path))
+        return assets_path
 
     def store_execution_results(
         self,
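
The `config.json` written by `set_project_assets` is deliberately minimal; on disk it would look something like this (path illustrative):

```json
{
  "source_path": "/home/user/projects/my-crate"
}
```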


@@ -17,7 +17,6 @@ dev = [
"fuzzforge-common", "fuzzforge-common",
"fuzzforge-types", "fuzzforge-types",
"fuzzforge-mcp", "fuzzforge-mcp",
"fuzzforge-cli",
] ]
[tool.uv.workspace] [tool.uv.workspace]