fix: pipeline module fixes and improved AI agent guidance

This commit is contained in:
AFredefon
2026-02-16 10:08:46 +01:00
parent 8adc7a2e00
commit cd5bfc27ee
16 changed files with 302 additions and 128 deletions

View File

@@ -50,10 +50,9 @@ common_inputs = [
]
output_artifacts = [
"fuzzing_results.json",
"crashes/",
"coverage-data/",
"corpus/",
"fuzzing-stats.json"
"results.json"
]
output_treatment = "Show fuzzing-stats.json as a live summary with total_executions, exec/sec, coverage_percent, and crashes_found. List files in crashes/ directory if any crashes found. The corpus/ and coverage-data/ directories are artifacts for downstream modules, don't display their contents."
output_treatment = "Read fuzzing_results.json which contains: targets_fuzzed, total_crashes, total_executions, crashes_path, and results array with per-target crash info. Display summary of crashes found. The crashes/ directory contains crash inputs for downstream crash-analyzer."

View File

@@ -458,34 +458,56 @@ class Module(FuzzForgeModule):
"""
crashes: list[CrashInfo] = []
seen_hashes: set[str] = set()
if self._fuzz_project_path is None or self._crashes_path is None:
return crashes
# Check for crashes in the artifacts directory
artifacts_dir = self._fuzz_project_path / "artifacts" / target
# Check multiple possible crash locations:
# 1. Standard artifacts directory (target-specific)
# 2. Generic artifacts directory
# 3. Fuzz project root (fork mode sometimes writes here)
# 4. Project root (parent of fuzz directory)
search_paths = [
self._fuzz_project_path / "artifacts" / target,
self._fuzz_project_path / "artifacts",
self._fuzz_project_path,
self._fuzz_project_path.parent,
]
if artifacts_dir.is_dir():
for crash_file in artifacts_dir.glob("crash-*"):
if crash_file.is_file():
# Copy crash to output
output_crash = self._crashes_path / target
output_crash.mkdir(parents=True, exist_ok=True)
dest = output_crash / crash_file.name
shutil.copy2(crash_file, dest)
for search_dir in search_paths:
if not search_dir.is_dir():
continue
# Use rglob to recursively find crash files
for crash_file in search_dir.rglob("crash-*"):
if not crash_file.is_file():
continue
# Skip duplicates by hash
if crash_file.name in seen_hashes:
continue
seen_hashes.add(crash_file.name)
# Read crash input
crash_data = crash_file.read_bytes()
# Copy crash to output
output_crash = self._crashes_path / target
output_crash.mkdir(parents=True, exist_ok=True)
dest = output_crash / crash_file.name
shutil.copy2(crash_file, dest)
crash_info = CrashInfo(
file_path=str(dest),
input_hash=crash_file.name,
input_size=len(crash_data),
)
crashes.append(crash_info)
# Read crash input
crash_data = crash_file.read_bytes()
logger.info("found crash", target=target, file=crash_file.name)
crash_info = CrashInfo(
file_path=str(dest),
input_hash=crash_file.name,
input_size=len(crash_data),
)
crashes.append(crash_info)
logger.info("found crash", target=target, file=crash_file.name, source=str(search_dir))
logger.info("crash collection complete", target=target, total_crashes=len(crashes))
return crashes
def _write_output(self) -> None:

View File

@@ -51,9 +51,8 @@ common_inputs = [
]
output_artifacts = [
"unique-crashes.json",
"crash-report.md",
"severity-analysis.json"
"crash_analysis.json",
"results.json"
]
output_treatment = "Display crash-report.md as rendered markdown - this is the primary output. Show unique-crashes.json as a table with crash ID, severity, and affected function. Summarize severity-analysis.json showing counts by severity level (critical, high, medium, low)."
output_treatment = "Read crash_analysis.json which contains: total_crashes, unique_crashes, duplicate_crashes, severity_summary (high/medium/low/unknown counts), and unique_analyses array with details per crash. Display a summary table of unique crashes by severity."

View File

@@ -8,6 +8,7 @@ requires-python = ">=3.14"
dependencies = [
"podman==5.6.0",
"pydantic==2.12.4",
"structlog==25.5.0",
"tomlkit==0.13.3",
]

View File

@@ -6,8 +6,13 @@ readme = "README.md"
requires-python = ">=3.14"
dependencies = [
"fuzzforge-modules-sdk==0.0.1",
"pydantic==2.12.4",
"structlog==25.5.0",
]
[project.scripts]
module = "module.__main__:main"
[tool.uv.sources]
fuzzforge-modules-sdk = { workspace = true }
@@ -18,8 +23,8 @@ build-backend = "hatchling.build"
[tool.hatch.build.targets.wheel]
packages = ["src/module"]
[tool.uv]
dev-dependencies = [
[dependency-groups]
dev = [
"mypy>=1.8.0",
"pytest>=7.4.3",
"pytest-asyncio>=0.21.1",
@@ -47,9 +52,9 @@ common_inputs = [
]
output_artifacts = [
"harness-evaluation.json",
"coverage-report.json",
"feedback-summary.md"
"artifacts/harness-evaluation.json",
"artifacts/feedback-summary.md",
"results.json"
]
output_treatment = "Display feedback-summary.md as rendered markdown for quick review. Show harness-evaluation.json summary with pass/fail status and error messages. Show coverage-report.json as a table of covered functions."
output_treatment = "Display artifacts/feedback-summary.md as rendered markdown for quick review. Read artifacts/harness-evaluation.json for detailed per-harness results with verdict (production_ready/needs_improvement/broken), score, strengths, and issues with suggestions."

View File

@@ -1,15 +1,18 @@
"""Harness tester module - tests and evaluates fuzz harnesses."""
from __future__ import annotations
import json
import subprocess
import time
from pathlib import Path
from typing import TYPE_CHECKING, Any
from fuzzforge_modules_sdk import (
FuzzForgeModule,
from fuzzforge_modules_sdk.api.models import (
FuzzForgeModuleResource,
FuzzForgeModuleResults,
FuzzForgeResource,
)
from fuzzforge_modules_sdk.api.modules.base import FuzzForgeModule
from module.analyzer import FeedbackGenerator
from module.feedback import (
@@ -23,31 +26,102 @@ from module.feedback import (
PerformanceMetrics,
StabilityMetrics,
)
from module.models import Input, Output
from module.settings import Settings
class HarnessTesterModule(FuzzForgeModule):
"""Tests fuzz harnesses with compilation, execution, and short fuzzing trials."""
def _run(self, resources: list[FuzzForgeResource]) -> FuzzForgeModuleResults:
_settings: Settings | None
def __init__(self) -> None:
"""Initialize an instance of the class."""
name: str = "harness-tester"
version: str = "0.1.0"
FuzzForgeModule.__init__(self, name=name, version=version)
self._settings = None
self.configuration: dict[str, Any] = {}
@classmethod
def _get_input_type(cls) -> type[Input]:
"""Return the input type."""
return Input
@classmethod
def _get_output_type(cls) -> type[Output]:
"""Return the output type."""
return Output
def _prepare(self, settings: Settings) -> None: # type: ignore[override]
"""Prepare the module.
:param settings: Module settings.
"""
self._settings = settings
self.configuration = {
"trial_duration_sec": settings.trial_duration_sec,
"execution_timeout_sec": settings.execution_timeout_sec,
"enable_coverage": settings.enable_coverage,
"min_quality_score": settings.min_quality_score,
}
def _cleanup(self, settings: Settings) -> None: # type: ignore[override]
"""Cleanup after module execution.
:param settings: Module settings.
"""
pass # No cleanup needed
def _run(self, resources: list[FuzzForgeModuleResource]) -> FuzzForgeModuleResults:
"""Run harness testing on provided resources.
:param resources: List of resources (Rust project with fuzz harnesses)
:returns: Module execution result
"""
import shutil
self.emit_event("started", message="Beginning harness testing")
# Configuration
trial_duration = self.configuration.get("trial_duration_sec", 30)
timeout_sec = self.configuration.get("execution_timeout_sec", 10)
# Debug: Log resources
self.get_logger().info(
"Received resources",
count=len(resources),
resources=[str(r.path) for r in resources],
)
# Find Rust project
project_path = self._find_rust_project(resources)
if not project_path:
self.emit_event("error", message="No Rust project found in resources")
return FuzzForgeModuleResults.FAILURE
# Copy project to writable workspace (input is read-only)
workspace = Path("/tmp/harness-workspace")
if workspace.exists():
shutil.rmtree(workspace)
shutil.copytree(project_path, workspace)
project_path = workspace
self.get_logger().info("Copied project to writable workspace", path=str(project_path))
# Find fuzz harnesses
harnesses = self._find_fuzz_harnesses(project_path)
# Debug: Log fuzz directory status
fuzz_dir = project_path / "fuzz" / "fuzz_targets"
self.get_logger().info(
"Checking fuzz directory",
fuzz_dir=str(fuzz_dir),
exists=fuzz_dir.exists(),
)
if not harnesses:
self.emit_event("error", message="No fuzz harnesses found")
return FuzzForgeModuleResults.FAILURE
@@ -110,16 +184,35 @@ class HarnessTesterModule(FuzzForgeModule):
return FuzzForgeModuleResults.SUCCESS
def _find_rust_project(self, resources: list[FuzzForgeResource]) -> Path | None:
"""Find Rust project with Cargo.toml.
def _find_rust_project(self, resources: list[FuzzForgeModuleResource]) -> Path | None:
"""Find Rust project with Cargo.toml (the main project, not fuzz workspace).
:param resources: List of resources
:returns: Path to Rust project or None
"""
# First, try to find a directory with both Cargo.toml and src/
for resource in resources:
cargo_toml = Path(resource.path) / "Cargo.toml"
path = Path(resource.path)
cargo_toml = path / "Cargo.toml"
src_dir = path / "src"
if cargo_toml.exists() and src_dir.exists():
return path
# Fall back to finding parent of fuzz directory
for resource in resources:
path = Path(resource.path)
if path.name == "fuzz" and (path / "Cargo.toml").exists():
# This is the fuzz workspace, return parent
parent = path.parent
if (parent / "Cargo.toml").exists():
return parent
# Last resort: find any Cargo.toml
for resource in resources:
path = Path(resource.path)
cargo_toml = path / "Cargo.toml"
if cargo_toml.exists():
return Path(resource.path)
return path
return None
def _find_fuzz_harnesses(self, project_path: Path) -> list[Path]:
@@ -156,59 +249,68 @@ class HarnessTesterModule(FuzzForgeModule):
self.emit_event("compiling", harness=harness_name)
compilation = self._test_compilation(project_path, harness_name)
# Initialize evaluation
evaluation = HarnessEvaluation(
name=harness_name,
path=str(harness_path),
compilation=compilation,
execution=None,
fuzzing_trial=None,
quality=None, # type: ignore
)
# If compilation failed, generate feedback and return
# If compilation failed, generate feedback and return early
if not compilation.success:
evaluation.quality = FeedbackGenerator.generate_quality_assessment(
compilation_result=compilation.dict(),
quality = FeedbackGenerator.generate_quality_assessment(
compilation_result=compilation.model_dump(),
execution_result=None,
coverage=None,
performance=None,
stability=None,
)
return evaluation
return HarnessEvaluation(
name=harness_name,
path=str(harness_path),
compilation=compilation,
execution=None,
fuzzing_trial=None,
quality=quality,
)
# Step 2: Execution test
self.emit_event("testing_execution", harness=harness_name)
execution = self._test_execution(project_path, harness_name, timeout_sec)
evaluation.execution = execution
if not execution.success:
evaluation.quality = FeedbackGenerator.generate_quality_assessment(
compilation_result=compilation.dict(),
execution_result=execution.dict(),
quality = FeedbackGenerator.generate_quality_assessment(
compilation_result=compilation.model_dump(),
execution_result=execution.model_dump(),
coverage=None,
performance=None,
stability=None,
)
return evaluation
return HarnessEvaluation(
name=harness_name,
path=str(harness_path),
compilation=compilation,
execution=execution,
fuzzing_trial=None,
quality=quality,
)
# Step 3: Fuzzing trial
self.emit_event("running_trial", harness=harness_name, duration=trial_duration)
fuzzing_trial = self._run_fuzzing_trial(
project_path, harness_name, trial_duration
)
evaluation.fuzzing_trial = fuzzing_trial
# Generate quality assessment
evaluation.quality = FeedbackGenerator.generate_quality_assessment(
compilation_result=compilation.dict(),
execution_result=execution.dict(),
quality = FeedbackGenerator.generate_quality_assessment(
compilation_result=compilation.model_dump(),
execution_result=execution.model_dump(),
coverage=fuzzing_trial.coverage if fuzzing_trial else None,
performance=fuzzing_trial.performance if fuzzing_trial else None,
stability=fuzzing_trial.stability if fuzzing_trial else None,
)
return evaluation
return HarnessEvaluation(
name=harness_name,
path=str(harness_path),
compilation=compilation,
execution=execution,
fuzzing_trial=fuzzing_trial,
quality=quality,
)
def _test_compilation(self, project_path: Path, harness_name: str) -> CompilationResult:
"""Test harness compilation.
@@ -577,13 +679,18 @@ class HarnessTesterModule(FuzzForgeModule):
:param report: Harness test report
"""
from fuzzforge_modules_sdk.api.constants import PATH_TO_ARTIFACTS
# Ensure artifacts directory exists
PATH_TO_ARTIFACTS.mkdir(parents=True, exist_ok=True)
# Save JSON report
results_path = Path("/results/harness-evaluation.json")
results_path = PATH_TO_ARTIFACTS / "harness-evaluation.json"
with results_path.open("w") as f:
json.dump(report.dict(), f, indent=2)
json.dump(report.model_dump(), f, indent=2)
# Save human-readable summary
summary_path = Path("/results/feedback-summary.md")
summary_path = PATH_TO_ARTIFACTS / "feedback-summary.md"
with summary_path.open("w") as f:
f.write("# Harness Testing Report\n\n")
f.write(f"**Total Harnesses:** {report.summary.total_harnesses}\n")
@@ -619,5 +726,5 @@ class HarnessTesterModule(FuzzForgeModule):
f.write("\n")
# Entry point
harness_tester = HarnessTesterModule()
# Export the module class for use by __main__.py
__all__ = ["HarnessTesterModule"]

View File

@@ -0,0 +1,16 @@
"""Harness tester module entrypoint."""
from fuzzforge_modules_sdk.api import logs
from module import HarnessTesterModule
def main() -> None:
    """Entry point for the harness tester module.

    Configures SDK logging first so the module's structured log output is
    set up before any module code runs, then delegates to the SDK's
    ``main`` loop on a fresh module instance.
    """
    logs.configure()
    HarnessTesterModule().main()


if __name__ == "__main__":
    main()

View File

@@ -0,0 +1,27 @@
"""Models for harness-tester module."""
from pathlib import Path
from typing import Any
from pydantic import BaseModel
from fuzzforge_modules_sdk.api.models import (
FuzzForgeModuleInputBase,
FuzzForgeModuleOutputBase,
)
from module.settings import Settings
class Input(FuzzForgeModuleInputBase[Settings]):
    """Input for the harness-tester module.

    Declares no fields of its own; it exists to bind this module's
    :class:`Settings` type into the SDK's generic input base class.
    """
class Output(FuzzForgeModuleOutputBase):
    """Output for the harness-tester module."""

    #: The harness test report data as a plain dict; ``None`` if no
    #: report was produced.
    report: dict[str, Any] | None = None
    #: Filesystem path to the JSON report file; ``None`` if not written.
    report_file: Path | None = None

View File

@@ -0,0 +1,19 @@
"""Settings for harness-tester module."""
from pydantic import BaseModel, Field
class Settings(BaseModel):
    """Settings for the harness-tester module.

    All numeric fields are validated by pydantic ``Field`` constraints;
    out-of-range values raise a validation error at construction time.
    """

    #: Duration of each fuzzing trial in seconds (validated: 1-300).
    trial_duration_sec: int = Field(default=30, ge=1, le=300)
    #: Timeout for a single harness execution in seconds (validated: 1-60).
    execution_timeout_sec: int = Field(default=10, ge=1, le=60)
    #: Whether to generate coverage reports during trials.
    enable_coverage: bool = Field(default=True)
    #: Minimum score (validated: 0-100) for a harness to be considered "good".
    min_quality_score: int = Field(default=50, ge=0, le=100)

View File

@@ -45,8 +45,8 @@ common_inputs = [
]
output_artifacts = [
"fuzzable_functions.json",
"analysis_report.md"
"analysis.json",
"results.json"
]
output_treatment = "Display analysis_report.md as rendered markdown. Show fuzzable_functions.json as a table listing function names, signatures, and fuzz-worthiness scores."
output_treatment = "Read analysis.json which contains: project_info, fuzzable_functions (array with name, signature, file_path, fuzz_score), and vulnerabilities (array of known CVEs). Display fuzzable_functions as a table. Highlight any vulnerabilities found."