fuzzforge_ai/workers/ossfuzz/activities.py
tduhamel42 · 60ca088ecf · CI/CD Integration with Ephemeral Deployment Model (#14) · 2025-10-14 10:13:45 +02:00
* feat: Complete migration from Prefect to Temporal

BREAKING CHANGE: Replaces Prefect workflow orchestration with Temporal

## Major Changes
- Replace Prefect with Temporal for workflow orchestration
- Implement vertical worker architecture (rust, android)
- Replace Docker registry with MinIO for unified storage
- Refactor activities to be co-located with workflows
- Update all API endpoints for Temporal compatibility

## Infrastructure
- New: docker-compose.temporal.yaml (Temporal + MinIO + workers)
- New: workers/ directory with rust and android vertical workers
- New: backend/src/temporal/ (manager, discovery)
- New: backend/src/storage/ (S3-cached storage with MinIO; client sketch after this list)
- New: backend/toolbox/common/ (shared storage activities)
- Deleted: docker-compose.yaml (old Prefect setup)
- Deleted: backend/src/core/prefect_manager.py
- Deleted: backend/src/services/prefect_stats_monitor.py
- Deleted: Docker registry and insecure-registries requirement
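
For illustration, target upload/download against MinIO goes through the standard
boto3 S3 client pointed at the MinIO endpoint. A minimal sketch (endpoint,
credentials, and bucket/key names are made-up placeholders, not the real config):

  import boto3

  # Placeholder endpoint and credentials; real values come from configuration
  s3 = boto3.client(
      "s3",
      endpoint_url="http://minio:9000",
      aws_access_key_id="minioadmin",
      aws_secret_access_key="minioadmin",
  )
  s3.upload_file("vulnerable_app.tar.gz", "targets", "run-123/target.tar.gz")
  s3.download_file("targets", "run-123/target.tar.gz", "/cache/target.tar.gz")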

## Workflows
- Migrated: security_assessment workflow to Temporal
- New: rust_test workflow (example/test workflow)
- Deleted: secret_detection_scan (Prefect-based, to be reimplemented)
- Activities now co-located with workflows for independent testing

## API Changes
- Updated: backend/src/api/workflows.py (Temporal submission)
- Updated: backend/src/api/runs.py (Temporal status/results)
- Updated: backend/src/main.py (727 lines, TemporalManager integration)
- Updated: All 16 MCP tools to use TemporalManager

## Testing
- All services healthy (Temporal, PostgreSQL, MinIO, workers, backend)
- All API endpoints functional
- End-to-end workflow test passed (72 findings from vulnerable_app)
- MinIO storage integration working (target upload/download, results)
- Worker activity discovery working (6 activities registered)
- Tarball extraction working
- SARIF report generation working

## Documentation
- ARCHITECTURE.md: Complete Temporal architecture documentation
- QUICKSTART_TEMPORAL.md: Getting started guide
- MIGRATION_DECISION.md: Why we chose Temporal over Prefect
- IMPLEMENTATION_STATUS.md: Migration progress tracking
- workers/README.md: Worker development guide

## Dependencies
- Added: temporalio>=1.6.0
- Added: boto3>=1.34.0 (MinIO S3 client)
- Removed: prefect>=3.4.18

* feat: Add Python fuzzing vertical with Atheris integration

This commit implements a complete Python fuzzing workflow using Atheris:

## Python Worker (workers/python/)
- Dockerfile with Python 3.11, Atheris, and build tools
- Generic worker.py for dynamic workflow discovery
- requirements.txt with temporalio, boto3, atheris dependencies
- Added to docker-compose.temporal.yaml with dedicated cache volume

## AtherisFuzzer Module (backend/toolbox/modules/fuzzer/)
- Reusable module extending BaseModule
- Auto-discovers fuzz targets (fuzz_*.py, *_fuzz.py, fuzz_target.py); see the sketch after this list
- Recursive search to find targets in nested directories
- Dynamically loads TestOneInput() function
- Configurable max_iterations and timeout
- Real-time stats callback support for live monitoring
- Returns findings as ModuleFinding objects
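
A minimal sketch of the discovery step (the glob patterns come from the list
above; the rglob-based walk and function name are assumptions for illustration):

  from pathlib import Path

  TARGET_PATTERNS = ["fuzz_*.py", "*_fuzz.py", "fuzz_target.py"]

  def discover_fuzz_targets(root: Path) -> list[Path]:
      """Recursively collect candidate Atheris harnesses under root."""
      targets: list[Path] = []
      for pattern in TARGET_PATTERNS:
          targets.extend(root.rglob(pattern))  # rglob descends into nested dirs
      return sorted(set(targets))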

## Atheris Fuzzing Workflow (backend/toolbox/workflows/atheris_fuzzing/)
- Temporal workflow for orchestrating fuzzing (shape sketched after this list)
- Downloads user code from MinIO
- Executes AtherisFuzzer module
- Uploads results to MinIO
- Cleans up cache after execution
- metadata.yaml with vertical: python for routing
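
The workflow shape is roughly the following (activity names, payloads, and
timeouts are assumptions for illustration, not the actual implementation):

  from datetime import timedelta
  from temporalio import workflow

  @workflow.defn(name="AtherisFuzzingWorkflow")
  class AtherisFuzzingWorkflow:
      @workflow.run
      async def run(self, target_id: str, target_file: str,
                    max_iterations: int, timeout_seconds: int) -> dict:
          # Download user code from MinIO (assumed shared storage activity)
          local_path = await workflow.execute_activity(
              "download_target", args=[target_id],
              start_to_close_timeout=timedelta(minutes=5),
          )
          # Execute the AtherisFuzzer module (assumed activity name)
          findings = await workflow.execute_activity(
              "run_atheris_fuzzer",
              args=[local_path, target_file, max_iterations],
              start_to_close_timeout=timedelta(seconds=timeout_seconds + 300),
          )
          # Upload results, then clean up the cache (assumed activity names)
          await workflow.execute_activity(
              "upload_results", args=[target_id, findings],
              start_to_close_timeout=timedelta(minutes=5),
          )
          await workflow.execute_activity(
              "cleanup_cache", args=[target_id],
              start_to_close_timeout=timedelta(minutes=1),
          )
          return {"target_id": target_id, "findings": findings}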

## Test Project (test_projects/python_fuzz_waterfall/)
- Demonstrates stateful waterfall vulnerability
- main.py with check_secret() that leaks progress
- fuzz_target.py with Atheris TestOneInput() harness
- Complete README with usage instructions

## Backend Fixes
- Fixed parameter merging in REST API endpoints (workflows.py)
- Changed workflow parameter passing from positional args to kwargs (manager.py)
- Default parameters now properly merged with user parameters (see the sketch below)
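
The merge is plain "user values override defaults" dict semantics. A minimal
sketch (the function name is illustrative, not the actual backend code):

  from typing import Any, Dict

  def merge_parameters(defaults: Dict[str, Any], user: Dict[str, Any]) -> Dict[str, Any]:
      """Start from workflow defaults and let user-supplied values win."""
      return {**defaults, **user}

  # merge_parameters({"max_iterations": 100000}, {"max_iterations": 500})
  # -> {"max_iterations": 500}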

## Testing
- Worker discovered AtherisFuzzingWorkflow
- Workflow executed end-to-end successfully
- Fuzz target auto-discovered in nested directories
- Atheris ran 100,000 iterations
- Results uploaded and cache cleaned

* chore: Complete Temporal migration with updated CLI/SDK/docs

This commit includes all remaining Temporal migration changes:

## CLI Updates (cli/)
- Updated workflow execution commands for Temporal
- Enhanced error handling and exceptions
- Updated dependencies in uv.lock

## SDK Updates (sdk/)
- Client methods updated for Temporal workflows
- Updated models for new workflow execution
- Updated dependencies in uv.lock

## Documentation Updates (docs/)
- Architecture documentation for Temporal
- Workflow concept documentation
- Resource management documentation (new)
- Debugging guide (new)
- Updated tutorials and how-to guides
- Troubleshooting updates

## README Updates
- Main README with Temporal instructions
- Backend README
- CLI README
- SDK README

## Other
- Updated IMPLEMENTATION_STATUS.md
- Removed old vulnerable_app.tar.gz

These changes complete the Temporal migration and ensure the
CLI/SDK work correctly with the new backend.

* fix: Use positional args instead of kwargs for Temporal workflows

The Temporal Python SDK's start_workflow() method doesn't accept
a 'kwargs' parameter. Workflows must receive parameters as positional
arguments via the 'args' parameter.

Changed to:
  args=workflow_args  # Positional arguments

This fixes the error:
  TypeError: Client.start_workflow() got an unexpected keyword argument 'kwargs'

Workflows now correctly receive parameters in order (example below):
- security_assessment: [target_id, scanner_config, analyzer_config, reporter_config]
- atheris_fuzzing: [target_id, target_file, max_iterations, timeout_seconds]
- rust_test: [target_id, test_message]
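
For illustration, a submission with positional args might look like this
(the workflow type name, run ID scheme, and task queue are assumed; the args
list mirrors the atheris_fuzzing ordering above):

  from temporalio.client import Client

  async def submit_atheris(client: Client, target_id: str) -> None:
      await client.start_workflow(
          "AtherisFuzzingWorkflow",                     # workflow type (assumed)
          args=[target_id, "fuzz_target.py", 100000, 600],
          id=f"atheris-{target_id}",                    # run ID (assumed scheme)
          task_queue="python",                          # vertical queue (assumed)
      )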

* fix: Filter metadata-only parameters from workflow arguments

SecurityAssessmentWorkflow was receiving 7 arguments instead of 2-5.
The issue was that target_path and volume_mode from default_parameters
were being passed to the workflow, when they should only be used by
the system for configuration.

Now filters out metadata-only parameters (target_path, volume_mode)
before passing arguments to workflow execution.
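
A minimal sketch of the filter (constant and function names are illustrative;
the real logic lives in manager.py):

  from typing import Any, Dict

  METADATA_ONLY_PARAMS = {"target_path", "volume_mode"}

  def filter_workflow_params(params: Dict[str, Any]) -> Dict[str, Any]:
      """Drop system-only keys before building the positional args list."""
      return {k: v for k, v in params.items() if k not in METADATA_ONLY_PARAMS}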

* refactor: Remove Prefect leftovers and volume mounting legacy

Complete cleanup of Prefect migration artifacts:

Backend:
- Delete registry.py and workflow_discovery.py (Prefect-specific files)
- Remove Docker validation from setup.py (no longer needed)
- Remove ResourceLimits and VolumeMount models
- Remove target_path and volume_mode from WorkflowSubmission
- Remove supported_volume_modes from API and discovery
- Clean up metadata.yaml files (remove volume/path fields)
- Simplify parameter filtering in manager.py

SDK:
- Remove volume_mode parameter from client methods
- Remove ResourceLimits and VolumeMount models
- Remove Prefect error patterns from docker_logs.py
- Clean up WorkflowSubmission and WorkflowMetadata models

CLI:
- Remove Volume Modes display from workflow info

All removed features are Prefect-specific or Docker volume mounting
artifacts. Temporal workflows use MinIO storage exclusively.

* feat: Add comprehensive test suite and benchmark infrastructure

- Add 68 unit tests for fuzzer, scanner, and analyzer modules
- Implement pytest-based test infrastructure with fixtures
- Add 6 performance benchmarks with category-specific thresholds
- Configure GitHub Actions for automated testing and benchmarking
- Add test and benchmark documentation

Test coverage:
- AtherisFuzzer: 8 tests
- CargoFuzzer: 14 tests
- FileScanner: 22 tests
- SecurityAnalyzer: 24 tests

All tests passing (68/68)
All benchmarks passing (6/6)

* fix: Resolve all ruff linting violations across codebase

Fixed 27 ruff violations in 12 files:
- Removed unused imports (Depends, Dict, Any, Optional, etc.)
- Fixed undefined workflow_info variable in workflows.py
- Removed dead code with undefined variables in atheris_fuzzer.py
- Changed f-string to regular string where no placeholders used

All files now pass ruff checks for CI/CD compliance.

* fix: Configure CI for unit tests only

- Renamed docker-compose.temporal.yaml → docker-compose.yml for CI compatibility
- Commented out integration-tests job (no integration tests yet)
- Updated test-summary to only depend on lint and unit-tests

CI will now run successfully with 68 unit tests. Integration tests can be added later.

* feat: Add CI/CD integration with ephemeral deployment model

Implements comprehensive CI/CD support for FuzzForge with on-demand worker management:

**Worker Management (v0.7.0)**
- Add WorkerManager for automatic worker lifecycle control
- Auto-start workers from stopped state when workflows execute
- Auto-stop workers after workflow completion
- Health checks and startup timeout handling (90s default)

**CI/CD Features**
- `--fail-on` flag: Fail builds based on SARIF severity levels (error/warning/note/info); gate sketched after this list
- `--export-sarif` flag: Export findings in SARIF 2.1.0 format
- `--auto-start`/`--auto-stop` flags: Control worker lifecycle
- Exit code propagation: Returns 1 on blocking findings, 0 on success
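
The --fail-on gate reduces to a threshold check over SARIF result levels. A
minimal sketch, assuming findings are dicts carrying a SARIF "level" field
(the real CLI parses full SARIF 2.1.0 documents):

  # Levels ordered least to most severe, matching the flag's choices
  SARIF_LEVELS = ["info", "note", "warning", "error"]

  def exit_code_for(findings: list[dict], fail_on: str) -> int:
      """Return 1 if any finding meets or exceeds the fail-on level."""
      threshold = SARIF_LEVELS.index(fail_on)
      for finding in findings:
          level = finding.get("level", "warning")
          if level in SARIF_LEVELS and SARIF_LEVELS.index(level) >= threshold:
              return 1
      return 0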

**Exit Code Fix**
- Add `except typer.Exit: raise` handlers at 3 critical locations (pattern sketched below)
- Move worker cleanup to finally block for guaranteed execution
- Exit codes now propagate correctly even when build fails
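
The pattern looks roughly like this (command and helper names are hypothetical;
typer.Exit is Typer's real control-flow exception):

  import typer

  app = typer.Typer()

  def run_scan(fail_on: str) -> int:
      """Hypothetical helper: run the workflow, return count of blocking findings."""
      return 0

  def stop_worker() -> None:
      """Hypothetical helper: auto-stop the vertical worker."""

  @app.command()
  def scan(fail_on: str = "error", auto_stop: bool = True) -> None:
      try:
          if run_scan(fail_on) > 0:
              raise typer.Exit(code=1)
      except typer.Exit:
          raise  # re-raise so the exit code isn't swallowed by the broad handler
      except Exception as exc:
          typer.echo(f"Scan failed: {exc}", err=True)
          raise typer.Exit(code=1)
      finally:
          if auto_stop:
              stop_worker()  # cleanup runs even when the build fails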

**CI Scripts & Examples**
- ci-start.sh: Start FuzzForge services with health checks
- ci-stop.sh: Clean shutdown with volume preservation option
- GitHub Actions workflow example (security-scan.yml)
- GitLab CI pipeline example (.gitlab-ci.example.yml)
- docker-compose.ci.yml: CI-optimized compose file with profiles

**OSS-Fuzz Integration**
- New ossfuzz_campaign workflow for running OSS-Fuzz projects
- OSS-Fuzz worker with Docker-in-Docker support
- Configurable campaign duration and project selection

**Documentation**
- Comprehensive CI/CD integration guide (docs/how-to/cicd-integration.md)
- Updated architecture docs with worker lifecycle details
- Updated workspace isolation documentation
- CLI README with worker management examples

**SDK Enhancements**
- Add get_workflow_worker_info() endpoint
- Worker vertical metadata in workflow responses

**Testing**
- All workflows tested: security_assessment, atheris_fuzzing, secret_detection, cargo_fuzzing
- All monitoring commands tested: stats, crashes, status, finding
- Full CI pipeline simulation verified
- Exit codes verified for success/failure scenarios

Ephemeral CI/CD model: ~3-4GB RAM, ~60-90s startup, runs entirely in CI containers.

* fix: Resolve ruff linting violations in CI/CD code

- Remove unused variables (run_id, defaults, result)
- Remove unused imports
- Fix f-string without placeholders

All CI/CD integration files now pass ruff checks.

"""
OSS-Fuzz Campaign Activities
Activities for running OSS-Fuzz campaigns using Google's infrastructure.
"""
import logging
import os
import subprocess
import shutil
from pathlib import Path
from typing import Dict, Any, List, Optional
from datetime import datetime
import yaml
from temporalio import activity
logger = logging.getLogger(__name__)
# Paths
OSS_FUZZ_REPO = Path("/opt/oss-fuzz")
CACHE_DIR = Path(os.getenv("CACHE_DIR", "/cache"))
@activity.defn(name="load_ossfuzz_project")
async def load_ossfuzz_project_activity(project_name: str) -> Dict[str, Any]:
"""
Load OSS-Fuzz project configuration from project.yaml.
Args:
project_name: Name of the OSS-Fuzz project (e.g., "curl", "sqlite3")
Returns:
Dictionary with project config, paths, and metadata
"""
logger.info(f"Loading OSS-Fuzz project: {project_name}")
# Update OSS-Fuzz repo if it exists, clone if not
if OSS_FUZZ_REPO.exists():
logger.info("Updating OSS-Fuzz repository...")
subprocess.run(
["git", "-C", str(OSS_FUZZ_REPO), "pull", "--depth=1"],
check=False # Don't fail if already up to date
)
else:
logger.info("Cloning OSS-Fuzz repository...")
subprocess.run(
[
"git", "clone", "--depth=1",
"https://github.com/google/oss-fuzz.git",
str(OSS_FUZZ_REPO)
],
check=True
)
# Find project directory
project_path = OSS_FUZZ_REPO / "projects" / project_name
if not project_path.exists():
raise ValueError(
f"Project '{project_name}' not found in OSS-Fuzz. "
f"Available projects: https://github.com/google/oss-fuzz/tree/master/projects"
)
# Read project.yaml
config_file = project_path / "project.yaml"
if not config_file.exists():
raise ValueError(f"No project.yaml found for project '{project_name}'")
with open(config_file) as f:
config = yaml.safe_load(f)
# Add paths
config["project_name"] = project_name
config["project_path"] = str(project_path)
config["dockerfile_path"] = str(project_path / "Dockerfile")
config["build_script_path"] = str(project_path / "build.sh")
# Validate required fields
if not config.get("language"):
logger.warning(f"No language specified in project.yaml for {project_name}")
logger.info(
f"✓ Loaded project {project_name}: "
f"language={config.get('language', 'unknown')}, "
f"engines={config.get('fuzzing_engines', [])}, "
f"sanitizers={config.get('sanitizers', [])}"
)
return config
@activity.defn(name="build_ossfuzz_project")
async def build_ossfuzz_project_activity(
project_name: str,
project_config: Dict[str, Any],
sanitizer: Optional[str] = None,
engine: Optional[str] = None
) -> Dict[str, Any]:
"""
Build OSS-Fuzz project directly using build.sh (no Docker-in-Docker).
Args:
project_name: Name of the project
project_config: Configuration from project.yaml
sanitizer: Override sanitizer (default: first from project.yaml)
engine: Override engine (default: first from project.yaml)
Returns:
Dictionary with build results and discovered fuzz targets
"""
logger.info(f"Building OSS-Fuzz project: {project_name}")
# Determine sanitizer and engine
sanitizers = project_config.get("sanitizers", ["address"])
engines = project_config.get("fuzzing_engines", ["libfuzzer"])
use_sanitizer = sanitizer if sanitizer else sanitizers[0]
use_engine = engine if engine else engines[0]
logger.info(f"Building with sanitizer={use_sanitizer}, engine={use_engine}")
# Setup directories
src_dir = Path("/src")
out_dir = Path("/out")
src_dir.mkdir(exist_ok=True)
out_dir.mkdir(exist_ok=True)
# Clean previous build artifacts
for item in out_dir.glob("*"):
if item.is_file():
item.unlink()
elif item.is_dir():
shutil.rmtree(item)
# Copy project files from OSS-Fuzz repo to /src
project_path = Path(project_config["project_path"])
build_script = project_path / "build.sh"
if not build_script.exists():
raise Exception(f"build.sh not found for project {project_name}")
logger.info(f"Copying project files from {project_path} to {src_dir}")
# Copy build.sh
shutil.copy2(build_script, src_dir / "build.sh")
os.chmod(src_dir / "build.sh", 0o755)
# Copy any fuzzer source files (*.cc, *.c, *.cpp files)
for pattern in ["*.cc", "*.c", "*.cpp", "*.h", "*.hh", "*.hpp"]:
for src_file in project_path.glob(pattern):
dest_file = src_dir / src_file.name
shutil.copy2(src_file, dest_file)
logger.info(f"Copied: {src_file.name}")
# Clone project source code to subdirectory
main_repo = project_config.get("main_repo")
work_dir = src_dir
if main_repo:
logger.info(f"Cloning project source from {main_repo}")
project_src_dir = src_dir / project_name
# Remove existing directory if present
if project_src_dir.exists():
shutil.rmtree(project_src_dir)
clone_cmd = ["git", "clone", "--depth=1", main_repo, str(project_src_dir)]
result = subprocess.run(clone_cmd, capture_output=True, text=True, timeout=600)
if result.returncode != 0:
logger.warning(f"Failed to clone {main_repo}: {result.stderr}")
logger.info("Continuing without cloning (build.sh may download source)")
else:
# Copy build.sh into the project source directory
shutil.copy2(src_dir / "build.sh", project_src_dir / "build.sh")
os.chmod(project_src_dir / "build.sh", 0o755)
# build.sh should run from within the project directory
work_dir = project_src_dir
logger.info(f"Build will run from: {work_dir}")
else:
logger.info("No main_repo in project.yaml, build.sh will download source")
# Set OSS-Fuzz environment variables
build_env = os.environ.copy()
build_env.update({
"SRC": str(src_dir),
"OUT": str(out_dir),
"FUZZING_ENGINE": use_engine,
"SANITIZER": use_sanitizer,
"ARCHITECTURE": "x86_64",
# Use clang's built-in libfuzzer instead of separate library
"LIB_FUZZING_ENGINE": "-fsanitize=fuzzer",
})
# Set sanitizer flags
if use_sanitizer == "address":
build_env["CFLAGS"] = build_env.get("CFLAGS", "") + " -fsanitize=address"
build_env["CXXFLAGS"] = build_env.get("CXXFLAGS", "") + " -fsanitize=address"
elif use_sanitizer == "memory":
build_env["CFLAGS"] = build_env.get("CFLAGS", "") + " -fsanitize=memory"
build_env["CXXFLAGS"] = build_env.get("CXXFLAGS", "") + " -fsanitize=memory"
elif use_sanitizer == "undefined":
build_env["CFLAGS"] = build_env.get("CFLAGS", "") + " -fsanitize=undefined"
build_env["CXXFLAGS"] = build_env.get("CXXFLAGS", "") + " -fsanitize=undefined"
# Execute build.sh from the work directory
logger.info(f"Executing build.sh in {work_dir}")
build_cmd = ["bash", "./build.sh"]
result = subprocess.run(
build_cmd,
cwd=str(work_dir),
env=build_env,
capture_output=True,
text=True,
timeout=1800 # 30 minutes max build time
)
if result.returncode != 0:
logger.error(f"Build failed:\nSTDOUT:\n{result.stdout}\nSTDERR:\n{result.stderr}")
raise Exception(f"Build failed for {project_name}: {result.stderr}")
logger.info("✓ Build completed successfully")
logger.info(f"Build output:\n{result.stdout[-2000:]}") # Last 2000 chars
# Discover fuzz targets in /out
fuzz_targets = []
for file in out_dir.glob("*"):
if file.is_file() and os.access(file, os.X_OK):
# Check if it's a fuzz target (executable, not .so/.a/.o)
if file.suffix not in ['.so', '.a', '.o', '.zip']:
fuzz_targets.append(str(file))
logger.info(f"Found fuzz target: {file.name}")
if not fuzz_targets:
logger.warning(f"No fuzz targets found in {out_dir}")
logger.info(f"Directory contents: {list(out_dir.glob('*'))}")
return {
"fuzz_targets": fuzz_targets,
"build_log": result.stdout[-5000:], # Last 5000 chars
"sanitizer_used": use_sanitizer,
"engine_used": use_engine,
"out_dir": str(out_dir)
}
@activity.defn(name="fuzz_target")
async def fuzz_target_activity(
target_path: str,
engine: str,
duration_seconds: int,
corpus_dir: Optional[str] = None,
dict_file: Optional[str] = None
) -> Dict[str, Any]:
"""
Run fuzzing on a target with specified engine.
Args:
target_path: Path to fuzz target executable
engine: Fuzzing engine (libfuzzer, afl, honggfuzz)
duration_seconds: How long to fuzz
corpus_dir: Optional corpus directory
dict_file: Optional dictionary file
Returns:
Dictionary with fuzzing stats and results
"""
logger.info(f"Fuzzing {Path(target_path).name} with {engine} for {duration_seconds}s")
# Prepare corpus directory
if not corpus_dir:
corpus_dir = str(CACHE_DIR / "corpus" / Path(target_path).stem)
Path(corpus_dir).mkdir(parents=True, exist_ok=True)
output_dir = CACHE_DIR / "output" / Path(target_path).stem
output_dir.mkdir(parents=True, exist_ok=True)
start_time = datetime.now()
try:
if engine == "libfuzzer":
cmd = [
target_path,
corpus_dir,
f"-max_total_time={duration_seconds}",
"-print_final_stats=1",
f"-artifact_prefix={output_dir}/"
]
if dict_file:
cmd.append(f"-dict={dict_file}")
elif engine == "afl":
cmd = [
"afl-fuzz",
"-i", corpus_dir if Path(corpus_dir).glob("*") else "-", # Empty corpus OK
"-o", str(output_dir),
"-t", "1000", # Timeout per execution
"-m", "none", # No memory limit
"--", target_path, "@@"
]
elif engine == "honggfuzz":
cmd = [
"honggfuzz",
f"--run_time={duration_seconds}",
"-i", corpus_dir,
"-o", str(output_dir),
"--", target_path
]
else:
raise ValueError(f"Unsupported fuzzing engine: {engine}")
logger.info(f"Starting fuzzer: {' '.join(cmd[:5])}...")
result = subprocess.run(
cmd,
capture_output=True,
text=True,
timeout=duration_seconds + 120 # Add 2 minute buffer
)
end_time = datetime.now()
elapsed = (end_time - start_time).total_seconds()
# Parse stats from output
stats = parse_fuzzing_stats(result.stdout, result.stderr, engine)
stats["elapsed_time"] = elapsed
stats["target_name"] = Path(target_path).name
stats["engine"] = engine
# Find crashes
crashes = find_crashes(output_dir)
stats["crashes"] = len(crashes)
stats["crash_files"] = crashes
# Collect new corpus files
new_corpus = collect_corpus(corpus_dir)
stats["corpus_size"] = len(new_corpus)
stats["corpus_files"] = new_corpus
logger.info(
f"✓ Fuzzing completed: {stats.get('total_executions', 0)} execs, "
f"{len(crashes)} crashes"
)
return stats
except subprocess.TimeoutExpired:
logger.warning(f"Fuzzing timed out after {duration_seconds}s")
return {
"target_name": Path(target_path).name,
"engine": engine,
"status": "timeout",
"elapsed_time": duration_seconds
}
def parse_fuzzing_stats(stdout: str, stderr: str, engine: str) -> Dict[str, Any]:
    """Parse fuzzing statistics from fuzzer output."""
    stats = {}
    if engine == "libfuzzer":
        # Parse libFuzzer status lines, e.g.:
        #   #8192 NEW cov: 1234 ft: 5678 corp: 89/10KB
        for line in (stdout + stderr).split('\n'):
            if "#" in line and "NEW" in line:
                parts = line.split()
                for i, part in enumerate(parts):
                    if part.startswith("cov:"):
                        stats["coverage"] = int(parts[i + 1])
                    elif part.startswith("corp:"):
                        stats["corpus_entries"] = int(parts[i + 1].split('/')[0])
                    elif part.startswith("exec/s:"):
                        stats["executions_per_sec"] = float(parts[i + 1])
                    elif part.startswith("#"):
                        stats["total_executions"] = int(part[1:])
    elif engine == "afl":
        # Parsing AFL stats would require reading the fuzzer_stats file
        pass
    elif engine == "honggfuzz":
        # Honggfuzz stats parsing not implemented yet
        pass
    return stats
def find_crashes(output_dir: Path) -> List[str]:
    """Find crash files in the output directory."""
    crashes = []

    # libFuzzer artifact files start with "crash-", "leak-", or "timeout-"
    for pattern in ["crash-*", "leak-*", "timeout-*"]:
        crashes.extend([str(f) for f in output_dir.glob(pattern)])

    # AFL places crashes in a crashes/ subdirectory
    crashes_dir = output_dir / "crashes"
    if crashes_dir.exists():
        crashes.extend([str(f) for f in crashes_dir.glob("*") if f.is_file()])

    return crashes


def collect_corpus(corpus_dir: str) -> List[str]:
    """Collect corpus files from the corpus directory."""
    corpus_path = Path(corpus_dir)
    if not corpus_path.exists():
        return []
    return [str(f) for f in corpus_path.glob("*") if f.is_file()]