Mirror of https://github.com/FuzzingLabs/fuzzforge_ai.git (synced 2026-02-13 15:52:46 +00:00)
* feat: Complete migration from Prefect to Temporal

BREAKING CHANGE: Replaces Prefect workflow orchestration with Temporal

## Major Changes
- Replace Prefect with Temporal for workflow orchestration
- Implement vertical worker architecture (rust, android)
- Replace Docker registry with MinIO for unified storage
- Refactor activities to be co-located with workflows
- Update all API endpoints for Temporal compatibility

## Infrastructure
- New: docker-compose.temporal.yaml (Temporal + MinIO + workers)
- New: workers/ directory with rust and android vertical workers
- New: backend/src/temporal/ (manager, discovery)
- New: backend/src/storage/ (S3-cached storage with MinIO)
- New: backend/toolbox/common/ (shared storage activities)
- Deleted: docker-compose.yaml (old Prefect setup)
- Deleted: backend/src/core/prefect_manager.py
- Deleted: backend/src/services/prefect_stats_monitor.py
- Deleted: Docker registry and insecure-registries requirement

## Workflows
- Migrated: security_assessment workflow to Temporal
- New: rust_test workflow (example/test workflow)
- Deleted: secret_detection_scan (Prefect-based, to be reimplemented)
- Activities now co-located with workflows for independent testing

## API Changes
- Updated: backend/src/api/workflows.py (Temporal submission)
- Updated: backend/src/api/runs.py (Temporal status/results)
- Updated: backend/src/main.py (727 lines, TemporalManager integration)
- Updated: All 16 MCP tools to use TemporalManager

## Testing
- ✅ All services healthy (Temporal, PostgreSQL, MinIO, workers, backend)
- ✅ All API endpoints functional
- ✅ End-to-end workflow test passed (72 findings from vulnerable_app)
- ✅ MinIO storage integration working (target upload/download, results)
- ✅ Worker activity discovery working (6 activities registered)
- ✅ Tarball extraction working
- ✅ SARIF report generation working

## Documentation
- ARCHITECTURE.md: Complete Temporal architecture documentation
- QUICKSTART_TEMPORAL.md: Getting started guide
- MIGRATION_DECISION.md: Why we chose Temporal over Prefect
- IMPLEMENTATION_STATUS.md: Migration progress tracking
- workers/README.md: Worker development guide

## Dependencies
- Added: temporalio>=1.6.0
- Added: boto3>=1.34.0 (MinIO S3 client)
- Removed: prefect>=3.4.18

* feat: Add Python fuzzing vertical with Atheris integration

This commit implements a complete Python fuzzing workflow using Atheris:

## Python Worker (workers/python/)
- Dockerfile with Python 3.11, Atheris, and build tools
- Generic worker.py for dynamic workflow discovery
- requirements.txt with temporalio, boto3, atheris dependencies
- Added to docker-compose.temporal.yaml with dedicated cache volume

## AtherisFuzzer Module (backend/toolbox/modules/fuzzer/)
- Reusable module extending BaseModule
- Auto-discovers fuzz targets (fuzz_*.py, *_fuzz.py, fuzz_target.py)
- Recursive search to find targets in nested directories
- Dynamically loads TestOneInput() function
- Configurable max_iterations and timeout
- Real-time stats callback support for live monitoring
- Returns findings as ModuleFinding objects

## Atheris Fuzzing Workflow (backend/toolbox/workflows/atheris_fuzzing/)
- Temporal workflow for orchestrating fuzzing
- Downloads user code from MinIO
- Executes AtherisFuzzer module
- Uploads results to MinIO
- Cleans up cache after execution
- metadata.yaml with vertical: python for routing

## Test Project (test_projects/python_fuzz_waterfall/)
- Demonstrates stateful waterfall vulnerability
- main.py with check_secret() that leaks progress
- fuzz_target.py with Atheris TestOneInput() harness
- Complete README with usage instructions

## Backend Fixes
- Fixed parameter merging in REST API endpoints (workflows.py)
- Changed workflow parameter passing from positional args to kwargs (manager.py)
- Default parameters now properly merged with user parameters

## Testing
✅ Worker discovered AtherisFuzzingWorkflow
✅ Workflow executed end-to-end successfully
✅ Fuzz target auto-discovered in nested directories
✅ Atheris ran 100,000 iterations
✅ Results uploaded and cache cleaned

* chore: Complete Temporal migration with updated CLI/SDK/docs

This commit includes all remaining Temporal migration changes:

## CLI Updates (cli/)
- Updated workflow execution commands for Temporal
- Enhanced error handling and exceptions
- Updated dependencies in uv.lock

## SDK Updates (sdk/)
- Client methods updated for Temporal workflows
- Updated models for new workflow execution
- Updated dependencies in uv.lock

## Documentation Updates (docs/)
- Architecture documentation for Temporal
- Workflow concept documentation
- Resource management documentation (new)
- Debugging guide (new)
- Updated tutorials and how-to guides
- Troubleshooting updates

## README Updates
- Main README with Temporal instructions
- Backend README
- CLI README
- SDK README

## Other
- Updated IMPLEMENTATION_STATUS.md
- Removed old vulnerable_app.tar.gz

These changes complete the Temporal migration and ensure the CLI/SDK work correctly with the new backend.

* fix: Use positional args instead of kwargs for Temporal workflows

The Temporal Python SDK's start_workflow() method doesn't accept a 'kwargs' parameter. Workflows must receive parameters as positional arguments via the 'args' parameter. Workflows are now submitted with `args=workflow_args` (positional arguments).

This fixes the error:
TypeError: Client.start_workflow() got an unexpected keyword argument 'kwargs'

Workflows now correctly receive parameters in order (see the sketch further below):
- security_assessment: [target_id, scanner_config, analyzer_config, reporter_config]
- atheris_fuzzing: [target_id, target_file, max_iterations, timeout_seconds]
- rust_test: [target_id, test_message]

* fix: Filter metadata-only parameters from workflow arguments

SecurityAssessmentWorkflow was receiving 7 arguments instead of 2-5. The issue was that target_path and volume_mode from default_parameters were being passed to the workflow, when they should only be used by the system for configuration.

Now filters out metadata-only parameters (target_path, volume_mode) before passing arguments to workflow execution.

* refactor: Remove Prefect leftovers and volume mounting legacy

Complete cleanup of Prefect migration artifacts:

Backend:
- Delete registry.py and workflow_discovery.py (Prefect-specific files)
- Remove Docker validation from setup.py (no longer needed)
- Remove ResourceLimits and VolumeMount models
- Remove target_path and volume_mode from WorkflowSubmission
- Remove supported_volume_modes from API and discovery
- Clean up metadata.yaml files (remove volume/path fields)
- Simplify parameter filtering in manager.py

SDK:
- Remove volume_mode parameter from client methods
- Remove ResourceLimits and VolumeMount models
- Remove Prefect error patterns from docker_logs.py
- Clean up WorkflowSubmission and WorkflowMetadata models

CLI:
- Remove Volume Modes display from workflow info

All removed features are Prefect-specific or Docker volume mounting artifacts. Temporal workflows use MinIO storage exclusively.
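The workflow-argument fixes above come down to how the Temporal Python client is called: parameters are passed through the keyword-only `args` list of `Client.start_workflow()`, in the order the workflow expects them. A minimal sketch of the corrected submission — the connection address, workflow name, and task queue are assumptions for illustration, not values taken from the repo:

```python
from temporalio.client import Client


async def submit_security_assessment(target_id: str) -> str:
    client = await Client.connect("localhost:7233")  # assumed Temporal address
    handle = await client.start_workflow(
        "SecurityAssessmentWorkflow",                # assumed registered workflow name
        # Positional arguments in the order the workflow expects them; metadata-only
        # keys such as target_path/volume_mode are filtered out before this call.
        args=[target_id, {}, {}, {}],                # scanner/analyzer/reporter configs
        id=f"security-assessment-{target_id}",
        task_queue="python",                         # assumed vertical task queue
    )
    return handle.id
```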
* feat: Add comprehensive test suite and benchmark infrastructure

- Add 68 unit tests for fuzzer, scanner, and analyzer modules
- Implement pytest-based test infrastructure with fixtures
- Add 6 performance benchmarks with category-specific thresholds
- Configure GitHub Actions for automated testing and benchmarking
- Add test and benchmark documentation

Test coverage:
- AtherisFuzzer: 8 tests
- CargoFuzzer: 14 tests
- FileScanner: 22 tests
- SecurityAnalyzer: 24 tests

All tests passing (68/68)
All benchmarks passing (6/6)

* fix: Resolve all ruff linting violations across codebase

Fixed 27 ruff violations in 12 files:
- Removed unused imports (Depends, Dict, Any, Optional, etc.)
- Fixed undefined workflow_info variable in workflows.py
- Removed dead code with undefined variables in atheris_fuzzer.py
- Changed f-string to regular string where no placeholders used

All files now pass ruff checks for CI/CD compliance.

* fix: Configure CI for unit tests only

- Renamed docker-compose.temporal.yaml → docker-compose.yml for CI compatibility
- Commented out integration-tests job (no integration tests yet)
- Updated test-summary to only depend on lint and unit-tests

CI will now run successfully with 68 unit tests. Integration tests can be added later.

* feat: Add CI/CD integration with ephemeral deployment model

Implements comprehensive CI/CD support for FuzzForge with on-demand worker management:

**Worker Management (v0.7.0)**
- Add WorkerManager for automatic worker lifecycle control
- Auto-start workers from stopped state when workflows execute
- Auto-stop workers after workflow completion
- Health checks and startup timeout handling (90s default)

**CI/CD Features**
- `--fail-on` flag: Fail builds based on SARIF severity levels (error/warning/note/info) (see the sketch after this entry)
- `--export-sarif` flag: Export findings in SARIF 2.1.0 format
- `--auto-start`/`--auto-stop` flags: Control worker lifecycle
- Exit code propagation: Returns 1 on blocking findings, 0 on success

**Exit Code Fix**
- Add `except typer.Exit: raise` handlers at 3 critical locations
- Move worker cleanup to finally block for guaranteed execution
- Exit codes now propagate correctly even when build fails

**CI Scripts & Examples**
- ci-start.sh: Start FuzzForge services with health checks
- ci-stop.sh: Clean shutdown with volume preservation option
- GitHub Actions workflow example (security-scan.yml)
- GitLab CI pipeline example (.gitlab-ci.example.yml)
- docker-compose.ci.yml: CI-optimized compose file with profiles

**OSS-Fuzz Integration**
- New ossfuzz_campaign workflow for running OSS-Fuzz projects
- OSS-Fuzz worker with Docker-in-Docker support
- Configurable campaign duration and project selection

**Documentation**
- Comprehensive CI/CD integration guide (docs/how-to/cicd-integration.md)
- Updated architecture docs with worker lifecycle details
- Updated workspace isolation documentation
- CLI README with worker management examples

**SDK Enhancements**
- Add get_workflow_worker_info() endpoint
- Worker vertical metadata in workflow responses

**Testing**
- All workflows tested: security_assessment, atheris_fuzzing, secret_detection, cargo_fuzzing
- All monitoring commands tested: stats, crashes, status, finding
- Full CI pipeline simulation verified
- Exit codes verified for success/failure scenarios

Ephemeral CI/CD model: ~3-4GB RAM, ~60-90s startup, runs entirely in CI containers.
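The `--fail-on` gate described in the CI/CD entry above reduces to comparing SARIF result levels against a threshold and mapping the outcome to an exit code. A rough sketch of that idea — not the CLI's actual implementation; the level names simply follow the error/warning/note/info order listed above:

```python
import json
import sys

# Levels ordered from least to most severe; "info" stands in for SARIF's "none" level
LEVELS = ["info", "note", "warning", "error"]


def should_fail(sarif_path: str, fail_on: str) -> bool:
    """Return True if any SARIF result is at or above the fail_on threshold."""
    threshold = LEVELS.index(fail_on)
    with open(sarif_path) as fh:
        report = json.load(fh)
    for run in report.get("runs", []):
        for result in run.get("results", []):
            level = result.get("level", "warning")  # SARIF defaults results to "warning"
            if level in LEVELS and LEVELS.index(level) >= threshold:
                return True
    return False


if __name__ == "__main__":
    # Exit 1 on blocking findings, 0 on success, mirroring the behaviour described above
    sys.exit(1 if should_fail("findings.sarif", fail_on="error") else 0)
```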
* fix: Resolve ruff linting violations in CI/CD code

- Remove unused variables (run_id, defaults, result)
- Remove unused imports
- Fix f-string without placeholders

All CI/CD integration files now pass ruff checks.
609 lines · 22 KiB · Python
"""
|
|
Atheris Fuzzer Module
|
|
|
|
Reusable module for fuzzing Python code using Atheris.
|
|
Discovers and fuzzes user-provided Python targets with TestOneInput() function.
|
|
"""

import asyncio
import base64
import importlib.util
import logging
import multiprocessing
import os
import sys
import time
from datetime import datetime
from pathlib import Path
from typing import Dict, Any, List, Optional, Callable
import uuid

import httpx
from modules.base import BaseModule, ModuleMetadata, ModuleResult, ModuleFinding

logger = logging.getLogger(__name__)


def _run_atheris_in_subprocess(
    target_path_str: str,
    corpus_dir_str: str,
    max_iterations: int,
    timeout_seconds: int,
    shared_crashes: Any,
    exec_counter: multiprocessing.Value,
    crash_counter: multiprocessing.Value,
    coverage_counter: multiprocessing.Value
):
    """
    Run atheris.Fuzz() in a separate process to isolate os._exit() calls.

    This function runs in a subprocess and loads the target module,
    sets up atheris, and runs fuzzing. Stats are communicated via shared memory.

    Args:
        target_path_str: String path to target file
        corpus_dir_str: String path to corpus directory
        max_iterations: Maximum fuzzing iterations
        timeout_seconds: Timeout in seconds
        shared_crashes: Manager().list() for storing crash details
        exec_counter: Shared counter for executions
        crash_counter: Shared counter for crashes
        coverage_counter: Shared counter for coverage edges
    """
    import atheris
    import importlib.util
    import traceback
    from pathlib import Path

    target_path = Path(target_path_str)
    total_executions = 0

    # NOTE: Crash details are written directly to shared_crashes (Manager().list())
    # so they can be accessed by parent process after subprocess exits.
    # We don't use a local crashes list because os._exit() prevents cleanup code.

    try:
        # Load target module in subprocess
        module_name = f"fuzz_target_{uuid.uuid4().hex[:8]}"
        spec = importlib.util.spec_from_file_location(module_name, target_path)
        if spec is None or spec.loader is None:
            raise ImportError(f"Could not load module from {target_path}")

        module = importlib.util.module_from_spec(spec)
        sys.modules[module_name] = module
        spec.loader.exec_module(module)

        if not hasattr(module, "TestOneInput"):
            raise AttributeError("Module does not have TestOneInput() function")

        test_one_input = module.TestOneInput

        # Wrapper to track executions and crashes
        def fuzz_wrapper(data):
            nonlocal total_executions
            total_executions += 1

            # Update shared counter for live stats
            with exec_counter.get_lock():
                exec_counter.value += 1

            try:
                test_one_input(data)
            except Exception as e:
                # Capture crash details to shared memory
                crash_info = {
                    "input": bytes(data),  # Convert to bytes for serialization
                    "exception_type": type(e).__name__,
                    "exception_message": str(e),
                    "stack_trace": traceback.format_exc(),
                    "execution": total_executions
                }
                # Write to shared memory so parent process can access crash details
                shared_crashes.append(crash_info)

                # Update shared crash counter
                with crash_counter.get_lock():
                    crash_counter.value += 1

                # Re-raise so Atheris detects it
                raise

        # Check for dictionary file in target directory
        dict_args = []
        target_dir = target_path.parent
        for dict_name in ["fuzz.dict", "fuzzing.dict", "dict.txt"]:
            dict_path = target_dir / dict_name
            if dict_path.exists():
                dict_args.append(f"-dict={dict_path}")
                break

        # Configure Atheris
        atheris_args = [
            "atheris_fuzzer",
            f"-runs={max_iterations}",
            f"-max_total_time={timeout_seconds}",
            "-print_final_stats=1"
        ] + dict_args + [corpus_dir_str]  # Corpus directory as positional arg

        atheris.Setup(atheris_args, fuzz_wrapper)

        # Run fuzzing (this will call os._exit() when done)
        atheris.Fuzz()

    except SystemExit:
        # Atheris exits when done - this is normal
        # Crash details already written to shared_crashes
        pass
    except Exception:
        # Fatal error - traceback already written to shared memory
        # via crash handler in fuzz_wrapper
        pass


class AtherisFuzzer(BaseModule):
    """
    Atheris fuzzing module - discovers and fuzzes Python code.

    This module can be used by any workflow to fuzz Python targets.
    """

    def __init__(self):
        super().__init__()
        self.crashes = []
        self.total_executions = 0
        self.start_time = None
        self.last_stats_time = 0
        self.run_id = None

    def get_metadata(self) -> ModuleMetadata:
        """Return module metadata"""
        return ModuleMetadata(
            name="atheris_fuzzer",
            version="1.0.0",
            description="Python fuzzing using Atheris - discovers and fuzzes TestOneInput() functions",
            author="FuzzForge Team",
            category="fuzzer",
            tags=["fuzzing", "atheris", "python", "coverage"],
            input_schema={
                "type": "object",
                "properties": {
                    "target_file": {
                        "type": "string",
                        "description": "Python file with TestOneInput() function (auto-discovered if not specified)"
                    },
                    "max_iterations": {
                        "type": "integer",
                        "description": "Maximum fuzzing iterations",
                        "default": 100000
                    },
                    "timeout_seconds": {
                        "type": "integer",
                        "description": "Fuzzing timeout in seconds",
                        "default": 300
                    },
                    "stats_callback": {
                        "description": "Optional callback for real-time statistics"
                    }
                }
            },
            requires_workspace=True
        )

    def validate_config(self, config: Dict[str, Any]) -> bool:
        """Validate fuzzing configuration"""
        max_iterations = config.get("max_iterations", 100000)
        if not isinstance(max_iterations, int) or max_iterations <= 0:
            raise ValueError(f"max_iterations must be positive integer, got: {max_iterations}")

        timeout = config.get("timeout_seconds", 300)
        if not isinstance(timeout, int) or timeout <= 0:
            raise ValueError(f"timeout_seconds must be positive integer, got: {timeout}")

        return True

    async def execute(self, config: Dict[str, Any], workspace: Path) -> ModuleResult:
        """
        Execute Atheris fuzzing on user code.

        Args:
            config: Fuzzing configuration
            workspace: Path to user's uploaded code

        Returns:
            ModuleResult with crash findings
        """
        self.start_timer()
        self.start_time = time.time()

        # Validate configuration
        self.validate_config(config)
        self.validate_workspace(workspace)

        # Extract config
        target_file = config.get("target_file")
        max_iterations = config.get("max_iterations", 100000)
        timeout_seconds = config.get("timeout_seconds", 300)
        stats_callback = config.get("stats_callback")
        self.run_id = config.get("run_id")

        logger.info(
            f"Starting Atheris fuzzing (max_iterations={max_iterations}, "
            f"timeout={timeout_seconds}s, target={target_file or 'auto-discover'})"
        )

        try:
            # Step 1: Discover or load target
            target_path = self._discover_target(workspace, target_file)
            logger.info(f"Using fuzz target: {target_path}")

            # Step 2: Load target module
            test_one_input = self._load_target_module(target_path)
            logger.info(f"Loaded TestOneInput function from {target_path}")

            # Step 3: Run fuzzing
            await self._run_fuzzing(
                test_one_input=test_one_input,
                target_path=target_path,
                workspace=workspace,
                max_iterations=max_iterations,
                timeout_seconds=timeout_seconds,
                stats_callback=stats_callback
            )

            # Step 4: Generate findings from crashes
            findings = await self._generate_findings(target_path)

            logger.info(
                f"Fuzzing completed: {self.total_executions} executions, "
                f"{len(self.crashes)} crashes found"
            )

            # Generate SARIF report (always, even with no findings)
            from modules.reporter import SARIFReporter
            reporter = SARIFReporter()
            reporter_config = {
                "findings": findings,
                "tool_name": "Atheris Fuzzer",
                "tool_version": self._metadata.version
            }
            reporter_result = await reporter.execute(reporter_config, workspace)
            sarif_report = reporter_result.sarif

            return ModuleResult(
                module=self._metadata.name,
                version=self._metadata.version,
                status="success",
                execution_time=self.get_execution_time(),
                findings=findings,
                summary={
                    "total_executions": self.total_executions,
                    "crashes_found": len(self.crashes),
                    "execution_time": self.get_execution_time(),
                    "target_file": str(target_path.relative_to(workspace))
                },
                metadata={
                    "max_iterations": max_iterations,
                    "timeout_seconds": timeout_seconds
                },
                sarif=sarif_report
            )

        except Exception as e:
            logger.error(f"Fuzzing failed: {e}", exc_info=True)
            return self.create_result(
                findings=[],
                status="failed",
                error=str(e)
            )

    def _discover_target(self, workspace: Path, target_file: Optional[str]) -> Path:
        """
        Discover fuzz target in workspace.

        Args:
            workspace: Path to workspace
            target_file: Explicit target file or None for auto-discovery

        Returns:
            Path to target file
        """
        if target_file:
            # Use specified target
            target_path = workspace / target_file
            if not target_path.exists():
                raise FileNotFoundError(f"Target file not found: {target_file}")
            return target_path

        # Auto-discover: look for fuzz_*.py or *_fuzz.py
        logger.info("Auto-discovering fuzz targets...")

        candidates = []
        # Use rglob for recursive search (searches all subdirectories)
        for pattern in ["fuzz_*.py", "*_fuzz.py", "fuzz_target.py"]:
            matches = list(workspace.rglob(pattern))
            candidates.extend(matches)

        if not candidates:
            raise FileNotFoundError(
                "No fuzz targets found. Expected files matching: fuzz_*.py, *_fuzz.py, or fuzz_target.py"
            )

        # Use first candidate
        target = candidates[0]
        if len(candidates) > 1:
            logger.warning(
                f"Multiple fuzz targets found: {[str(c) for c in candidates]}. "
                f"Using: {target.name}"
            )

        return target

    def _load_target_module(self, target_path: Path) -> Callable:
        """
        Load target module and get TestOneInput function.

        Args:
            target_path: Path to Python file with TestOneInput

        Returns:
            TestOneInput function
        """
        # Add target directory to sys.path
        target_dir = target_path.parent
        if str(target_dir) not in sys.path:
            sys.path.insert(0, str(target_dir))

        # Load module dynamically
        module_name = target_path.stem
        spec = importlib.util.spec_from_file_location(module_name, target_path)
        if spec is None or spec.loader is None:
            raise ImportError(f"Cannot load module from {target_path}")

        module = importlib.util.module_from_spec(spec)
        spec.loader.exec_module(module)

        # Get TestOneInput function
        if not hasattr(module, "TestOneInput"):
            raise AttributeError(
                f"Module {module_name} does not have TestOneInput() function. "
                "Atheris requires a TestOneInput(data: bytes) function."
            )

        return module.TestOneInput

    async def _run_fuzzing(
        self,
        test_one_input: Callable,
        target_path: Path,
        workspace: Path,
        max_iterations: int,
        timeout_seconds: int,
        stats_callback: Optional[Callable] = None
    ):
        """
        Run Atheris fuzzing with real-time monitoring.

        Args:
            test_one_input: TestOneInput function to fuzz (not used, loaded in subprocess)
            target_path: Path to target file
            workspace: Path to workspace directory
            max_iterations: Max iterations
            timeout_seconds: Timeout in seconds
            stats_callback: Optional callback for stats
        """
        self.crashes = []
        self.total_executions = 0

        # Create corpus directory in workspace
        corpus_dir = workspace / ".fuzzforge_corpus"
        corpus_dir.mkdir(exist_ok=True)
        logger.info(f"Using corpus directory: {corpus_dir}")

        logger.info(f"Starting Atheris fuzzer in subprocess (max_runs={max_iterations}, timeout={timeout_seconds}s)...")

        # Create shared memory for subprocess communication
        ctx = multiprocessing.get_context('spawn')
        manager = ctx.Manager()
        shared_crashes = manager.list()  # Shared list for crash details
        exec_counter = ctx.Value('i', 0)  # Shared execution counter
        crash_counter = ctx.Value('i', 0)  # Shared crash counter
        coverage_counter = ctx.Value('i', 0)  # Shared coverage counter

        # Start fuzzing in subprocess
        process = ctx.Process(
            target=_run_atheris_in_subprocess,
            args=(str(target_path), str(corpus_dir), max_iterations, timeout_seconds, shared_crashes, exec_counter, crash_counter, coverage_counter)
        )

        # Run fuzzing in a separate task with monitoring
        async def monitor_stats():
            """Monitor and report stats every 0.5 seconds"""
            while True:
                await asyncio.sleep(0.5)

                if stats_callback:
                    elapsed = time.time() - self.start_time
                    # Read from shared counters
                    current_execs = exec_counter.value
                    current_crashes = crash_counter.value
                    current_coverage = coverage_counter.value
                    execs_per_sec = current_execs / elapsed if elapsed > 0 else 0

                    # Count corpus files
                    try:
                        corpus_size = len(list(corpus_dir.iterdir())) if corpus_dir.exists() else 0
                    except Exception:
                        corpus_size = 0

                    # TODO: Get real coverage from Atheris
                    # For now use corpus_size as proxy
                    coverage_value = current_coverage if current_coverage > 0 else corpus_size

                    await stats_callback({
                        "total_execs": current_execs,
                        "execs_per_sec": execs_per_sec,
                        "crashes": current_crashes,
                        "corpus_size": corpus_size,
                        "coverage": coverage_value,  # Using corpus as coverage proxy
                        "elapsed_time": int(elapsed)
                    })

        # Start monitoring task
        monitor_task = None
        if stats_callback:
            monitor_task = asyncio.create_task(monitor_stats())

        try:
            # Start subprocess
            process.start()
            logger.info(f"Fuzzing subprocess started (PID: {process.pid})")

            # Wait for subprocess to complete
            while process.is_alive():
                await asyncio.sleep(0.1)

            # NOTE: We cannot use result_queue because Atheris calls os._exit()
            # which terminates immediately without putting results in the queue.
            # Instead, we rely on shared memory (Manager().list() and Value counters).

            # Read final values from shared memory
            self.total_executions = exec_counter.value
            total_crashes = crash_counter.value

            # Read crash details from shared memory and convert to our format
            self.crashes = []
            for crash_data in shared_crashes:
                # Reconstruct crash info with exception object
                crash_info = {
                    "input": crash_data["input"],
                    "exception": Exception(crash_data["exception_message"]),
                    "exception_type": crash_data["exception_type"],
                    "stack_trace": crash_data["stack_trace"],
                    "execution": crash_data["execution"]
                }
                self.crashes.append(crash_info)

                logger.warning(
                    f"Crash found (execution {crash_data['execution']}): "
                    f"{crash_data['exception_type']}: {crash_data['exception_message']}"
                )

            logger.info(f"Fuzzing completed: {self.total_executions} executions, {total_crashes} crashes found")

            # Send final stats update
            if stats_callback:
                elapsed = time.time() - self.start_time
                execs_per_sec = self.total_executions / elapsed if elapsed > 0 else 0

                # Count final corpus size
                try:
                    final_corpus_size = len(list(corpus_dir.iterdir())) if corpus_dir.exists() else 0
                except Exception:
                    final_corpus_size = 0

                # TODO: Parse coverage from Atheris output
                # For now, use corpus size as proxy (corpus grows with coverage)
                # libFuzzer writes coverage to stdout but sys.stdout redirection
                # doesn't work because it writes to FD 1 directly from C++
                final_coverage = coverage_counter.value if coverage_counter.value > 0 else final_corpus_size

                await stats_callback({
                    "total_execs": self.total_executions,
                    "execs_per_sec": execs_per_sec,
                    "crashes": total_crashes,
                    "corpus_size": final_corpus_size,
                    "coverage": final_coverage,
                    "elapsed_time": int(elapsed)
                })

            # Wait for process to fully terminate
            process.join(timeout=5)

            if process.exitcode is not None and process.exitcode != 0:
                logger.warning(f"Subprocess exited with code: {process.exitcode}")

        except Exception as e:
            logger.error(f"Fuzzing execution error: {e}")
            if process.is_alive():
                logger.warning("Terminating fuzzing subprocess...")
                process.terminate()
                process.join(timeout=5)
                if process.is_alive():
                    process.kill()
            raise
        finally:
            # Stop monitoring
            if monitor_task:
                monitor_task.cancel()
                try:
                    await monitor_task
                except asyncio.CancelledError:
                    pass

    async def _generate_findings(self, target_path: Path) -> List[ModuleFinding]:
        """
        Generate ModuleFinding objects from crashes.

        Args:
            target_path: Path to target file

        Returns:
            List of findings
        """
        findings = []

        for idx, crash in enumerate(self.crashes):
            # Encode crash input for storage
            crash_input_b64 = base64.b64encode(crash["input"]).decode()

            finding = self.create_finding(
                title=f"Crash: {crash['exception_type']}",
                description=(
                    f"Atheris found crash during fuzzing:\n"
                    f"Exception: {crash['exception_type']}\n"
                    f"Message: {str(crash['exception'])}\n"
                    f"Execution: {crash['execution']}"
                ),
                severity="critical",
                category="crash",
                file_path=str(target_path),
                metadata={
                    "crash_input_base64": crash_input_b64,
                    "crash_input_hex": crash["input"].hex(),
                    "exception_type": crash["exception_type"],
                    "stack_trace": crash["stack_trace"],
                    "execution_number": crash["execution"]
                },
                recommendation=(
                    "Review the crash stack trace and input to identify the vulnerability. "
                    "The crash input is provided in base64 and hex formats for reproduction."
                )
            )
            findings.append(finding)

            # Report crash to backend for real-time monitoring
            if self.run_id:
                try:
                    crash_report = {
                        "run_id": self.run_id,
                        "crash_id": f"crash_{idx + 1}",
                        "timestamp": datetime.utcnow().isoformat(),
                        "crash_type": crash["exception_type"],
                        "stack_trace": crash["stack_trace"],
                        "input_file": crash_input_b64,
                        "severity": "critical",
                        "exploitability": "unknown"
                    }

                    backend_url = os.getenv("BACKEND_URL", "http://backend:8000")
                    async with httpx.AsyncClient(timeout=5.0) as client:
                        await client.post(
                            f"{backend_url}/fuzzing/{self.run_id}/crash",
                            json=crash_report
                        )
                    logger.debug(f"Crash report sent to backend: {crash_report['crash_id']}")
                except Exception as e:
                    logger.debug(f"Failed to post crash report to backend: {e}")

        return findings