feat: Add Python fuzzing vertical with Atheris integration

This commit implements a complete Python fuzzing workflow using Atheris:

## Python Worker (workers/python/)
- Dockerfile with Python 3.11, Atheris, and build tools
- Generic worker.py for dynamic workflow discovery
- requirements.txt with temporalio, boto3, atheris dependencies
- Added to docker-compose.temporal.yaml with dedicated cache volume

## AtherisFuzzer Module (backend/toolbox/modules/fuzzer/)
- Reusable module extending BaseModule
- Auto-discovers fuzz targets (fuzz_*.py, *_fuzz.py, fuzz_target.py)
- Recursive search to find targets in nested directories
- Dynamically loads TestOneInput() function
- Configurable max_iterations and timeout
- Real-time stats callback support for live monitoring
- Returns findings as ModuleFinding objects

## Atheris Fuzzing Workflow (backend/toolbox/workflows/atheris_fuzzing/)
- Temporal workflow for orchestrating fuzzing
- Downloads user code from MinIO
- Executes AtherisFuzzer module
- Uploads results to MinIO
- Cleans up cache after execution
- metadata.yaml with vertical: python for routing

## Test Project (test_projects/python_fuzz_waterfall/)
- Demonstrates stateful waterfall vulnerability
- main.py with check_secret() that leaks progress
- fuzz_target.py with Atheris TestOneInput() harness
- Complete README with usage instructions

## Backend Fixes
- Fixed parameter merging in REST API endpoints (workflows.py)
- Changed workflow parameter passing from positional args to kwargs (manager.py)
- Default parameters now properly merged with user parameters

## Testing
- Worker discovered AtherisFuzzingWorkflow
- Workflow executed end-to-end successfully
- Fuzz target auto-discovered in nested directories
- Atheris ran 100,000 iterations
- Results uploaded and cache cleaned
This commit is contained in:
Tanguy Duhamel
2025-10-02 11:06:34 +02:00
parent 0680f14df6
commit fe50d4ef72
16 changed files with 1668 additions and 13 deletions
+192 -3
View File
@@ -15,8 +15,10 @@ API endpoints for workflow management with enhanced error handling
import logging
import traceback
import tempfile
import shutil
from typing import List, Dict, Any, Optional
from fastapi import APIRouter, HTTPException, Depends
from fastapi import APIRouter, HTTPException, Depends, UploadFile, File, Form
from pathlib import Path
from src.models.findings import (
@@ -29,6 +31,16 @@ from src.temporal.discovery import WorkflowDiscovery
logger = logging.getLogger(__name__)
# Configuration for file uploads
MAX_UPLOAD_SIZE = 10 * 1024 * 1024 * 1024 # 10 GB
ALLOWED_CONTENT_TYPES = [
"application/gzip",
"application/x-gzip",
"application/x-tar",
"application/x-compressed-tar",
"application/octet-stream", # Generic binary
]
router = APIRouter(prefix="/workflows", tags=["workflows"])
@@ -209,8 +221,11 @@ async def submit_workflow(
metadata={"workflow": workflow_name}
)
# Prepare workflow parameters
workflow_params = submission.parameters or {}
# Merge default parameters with user parameters
metadata = workflow_info.metadata or {}
defaults = metadata.get("default_parameters", {})
user_params = submission.parameters or {}
workflow_params = {**defaults, **user_params}
# Start workflow execution
handle = await temporal_mgr.run_workflow(
@@ -321,6 +336,180 @@ async def submit_workflow(
)
@router.post("/{workflow_name}/upload-and-submit", response_model=RunSubmissionResponse)
async def upload_and_submit_workflow(
    workflow_name: str,
    file: UploadFile = File(..., description="Target file or tarball to analyze"),
    parameters: Optional[str] = Form(None, description="JSON-encoded workflow parameters"),
    volume_mode: str = Form("ro", description="Volume mount mode (ro/rw)"),
    timeout: Optional[int] = Form(None, description="Timeout in seconds"),
    temporal_mgr=Depends(get_temporal_manager)
) -> RunSubmissionResponse:
    """
    Upload a target file/tarball and submit workflow for execution.

    This endpoint accepts multipart/form-data uploads and is the recommended
    way to submit workflows from remote CLI clients.

    The request is validated cheapest-first: workflow name, then the
    ``parameters`` JSON, and only then is the upload streamed to a temporary
    file. A malformed ``parameters`` value is therefore rejected before any
    upload data is written to disk.

    Args:
        workflow_name: Name of the workflow to execute
        file: Target file or tarball (compressed directory)
        parameters: JSON string of workflow parameters (optional)
        volume_mode: Volume mount mode - "ro" (read-only) or "rw" (read-write).
            NOTE(review): accepted but not read anywhere in this handler.
        timeout: Execution timeout in seconds (optional).
            NOTE(review): accepted but not read anywhere in this handler.

    Returns:
        Run submission response with run_id and initial status

    Raises:
        HTTPException: 404 if workflow not found, 400 for invalid parameters,
            413 if file too large, 500 for any other failure
    """
    # Function-scope import keeps this block self-contained.
    import json

    if workflow_name not in temporal_mgr.workflows:
        available_workflows = list(temporal_mgr.workflows.keys())
        error_response = create_structured_error_response(
            error_type="WorkflowNotFound",
            message=f"Workflow '{workflow_name}' not found",
            workflow_name=workflow_name,
            suggestions=[
                f"Available workflows: {', '.join(available_workflows)}",
                "Use GET /workflows/ to see all available workflows"
            ]
        )
        raise HTTPException(status_code=404, detail=error_response)

    # Parse parameters BEFORE receiving the (potentially multi-GB) upload so a
    # bad request fails fast instead of after the whole file hit the disk.
    workflow_params: Dict[str, Any] = {}
    if parameters:
        try:
            workflow_params = json.loads(parameters)
            if not isinstance(workflow_params, dict):
                raise ValueError("Parameters must be a JSON object")
        except ValueError as e:
            # json.JSONDecodeError subclasses ValueError, so one clause covers both.
            raise HTTPException(
                status_code=400,
                detail=create_structured_error_response(
                    error_type="InvalidParameters",
                    message=f"Invalid parameters JSON: {e}",
                    workflow_name=workflow_name,
                    suggestions=["Ensure parameters is valid JSON object"]
                )
            )

    temp_file_path = None
    try:
        file_size = 0
        chunk_size = 1024 * 1024  # 1MB chunks

        # Create temporary file
        temp_fd, temp_file_path = tempfile.mkstemp(suffix=".tar.gz")
        logger.info(f"Receiving file upload for workflow '{workflow_name}': {file.filename}")

        # Stream file to disk; open() takes ownership of the descriptor and
        # closes it when the with-block exits.
        with open(temp_fd, 'wb') as temp_file:
            while True:
                chunk = await file.read(chunk_size)
                if not chunk:
                    break
                file_size += len(chunk)

                # Enforce the size limit while streaming so an oversized
                # upload is aborted without being fully written.
                if file_size > MAX_UPLOAD_SIZE:
                    raise HTTPException(
                        status_code=413,
                        detail=create_structured_error_response(
                            error_type="FileTooLarge",
                            message=f"File size exceeds maximum allowed size of {MAX_UPLOAD_SIZE / (1024**3):.1f} GB",
                            workflow_name=workflow_name,
                            suggestions=[
                                "Reduce the size of your target directory",
                                "Exclude unnecessary files (build artifacts, dependencies, etc.)",
                                "Consider splitting into smaller analysis targets"
                            ]
                        )
                    )
                temp_file.write(chunk)

        logger.info(f"Received file: {file_size / (1024**2):.2f} MB")

        # Upload to MinIO
        target_id = await temporal_mgr.upload_target(
            file_path=Path(temp_file_path),
            user_id="api-user",
            metadata={
                "workflow": workflow_name,
                "original_filename": file.filename,
                "upload_method": "multipart"
            }
        )
        logger.info(f"Uploaded to MinIO with target_id: {target_id}")

        # Merge default parameters with user parameters (user values win).
        # `or {}` / `or []` guard against keys that are present but null.
        workflow_info = temporal_mgr.workflows.get(workflow_name)
        metadata = workflow_info.metadata or {}
        defaults = metadata.get("default_parameters") or {}
        workflow_params = {**defaults, **workflow_params}

        # Start workflow execution
        handle = await temporal_mgr.run_workflow(
            workflow_name=workflow_name,
            target_id=target_id,
            workflow_params=workflow_params
        )
        run_id = handle.id

        # Initialize fuzzing tracking if needed. Reuses the None-safe metadata
        # dict from above so a workflow without metadata cannot fail here
        # after it has already been started.
        workflow_tags = metadata.get("tags") or []
        if "fuzzing" in workflow_tags or "fuzz" in workflow_name.lower():
            from src.api.fuzzing import initialize_fuzzing_tracking
            initialize_fuzzing_tracking(run_id, workflow_name)

        return RunSubmissionResponse(
            run_id=run_id,
            status="RUNNING",
            workflow=workflow_name,
            message=f"Workflow '{workflow_name}' submitted successfully with uploaded target"
        )

    except HTTPException:
        # Structured HTTP errors raised above pass through untouched.
        raise
    except Exception as e:
        logger.error(f"Failed to upload and submit workflow '{workflow_name}': {e}")
        logger.error(f"Traceback: {traceback.format_exc()}")
        error_response = create_structured_error_response(
            error_type="WorkflowSubmissionError",
            message=f"Failed to process upload and submit workflow: {str(e)}",
            workflow_name=workflow_name,
            suggestions=[
                "Check if the uploaded file is a valid tarball",
                "Verify MinIO storage is accessible",
                "Check backend logs for detailed error information",
                "Ensure Temporal workers are running"
            ]
        )
        raise HTTPException(status_code=500, detail=error_response)
    finally:
        # Cleanup temporary file (best-effort: a failure is only logged)
        if temp_file_path and Path(temp_file_path).exists():
            try:
                Path(temp_file_path).unlink()
                logger.debug(f"Cleaned up temp file: {temp_file_path}")
            except Exception as e:
                logger.warning(f"Failed to cleanup temp file {temp_file_path}: {e}")
@router.get("/{workflow_name}/parameters")
async def get_workflow_parameters(
workflow_name: str,
+4 -10
View File
@@ -179,15 +179,8 @@ class TemporalManager:
if not workflow_id:
workflow_id = f"{workflow_name}-{str(uuid4())[:8]}"
# Prepare workflow input arguments in order
# For security_assessment: (target_id, scanner_config, analyzer_config, reporter_config)
# Prepare workflow input - target_id as first arg, rest as kwargs
workflow_params = workflow_params or {}
workflow_args = [
target_id,
workflow_params.get("scanner_config"),
workflow_params.get("analyzer_config"),
workflow_params.get("reporter_config")
]
# Determine task queue from workflow vertical
vertical = workflow_info.metadata.get("vertical", "default")
@@ -199,10 +192,11 @@ class TemporalManager:
)
try:
# Start workflow execution with positional arguments
# Start workflow execution with target_id + keyword arguments
handle = await self.client.start_workflow(
workflow=workflow_info.workflow_type, # Workflow class name
args=workflow_args, # Positional arguments
arg=target_id, # First positional argument
kwargs=workflow_params, # Rest as keyword arguments
id=workflow_id,
task_queue=task_queue,
retry_policy=RetryPolicy(
@@ -0,0 +1,9 @@
"""
Fuzzing modules for FuzzForge
This package contains fuzzing modules for different fuzzing engines.
"""
from .atheris_fuzzer import AtherisFuzzer
__all__ = ["AtherisFuzzer"]
@@ -0,0 +1,377 @@
"""
Atheris Fuzzer Module
Reusable module for fuzzing Python code using Atheris.
Discovers and fuzzes user-provided Python targets with TestOneInput() function.
"""
import asyncio
import base64
import importlib.util
import logging
import sys
import time
import traceback
from pathlib import Path
from typing import Dict, Any, List, Optional, Callable
import uuid
from modules.base import BaseModule, ModuleMetadata, ModuleResult, ModuleFinding
logger = logging.getLogger(__name__)
class AtherisFuzzer(BaseModule):
    """
    Atheris fuzzing module - discovers and fuzzes Python code.

    This module can be used by any workflow to fuzz Python targets. It locates
    a harness file in the workspace, imports it, wraps its TestOneInput()
    function to count executions and record raised exceptions, and converts
    each recorded exception into a finding.
    """

    def __init__(self):
        super().__init__()
        # Crash records collected by the wrapper installed in _run_fuzzing().
        self.crashes = []
        # Number of times the wrapped TestOneInput() has been invoked.
        self.total_executions = 0
        # Wall-clock start (time.time()) of the current execute() call.
        self.start_time = None
        self.last_stats_time = 0

    def get_metadata(self) -> ModuleMetadata:
        """Return module metadata"""
        return ModuleMetadata(
            name="atheris_fuzzer",
            version="1.0.0",
            description="Python fuzzing using Atheris - discovers and fuzzes TestOneInput() functions",
            author="FuzzForge Team",
            category="fuzzer",
            tags=["fuzzing", "atheris", "python", "coverage"],
            input_schema={
                "type": "object",
                "properties": {
                    "target_file": {
                        "type": "string",
                        "description": "Python file with TestOneInput() function (auto-discovered if not specified)"
                    },
                    "max_iterations": {
                        "type": "integer",
                        "description": "Maximum fuzzing iterations",
                        "default": 100000
                    },
                    "timeout_seconds": {
                        "type": "integer",
                        "description": "Fuzzing timeout in seconds",
                        "default": 300
                    },
                    "stats_callback": {
                        "description": "Optional callback for real-time statistics"
                    }
                }
            },
            requires_workspace=True
        )

    @staticmethod
    def _is_positive_int(value: Any) -> bool:
        """Return True for a strictly positive int; bools are rejected."""
        # bool is a subclass of int, so True would otherwise pass as 1 and end
        # up rendered as "-runs=True" on the fuzzer command line.
        return isinstance(value, int) and not isinstance(value, bool) and value > 0

    def validate_config(self, config: Dict[str, Any]) -> bool:
        """
        Validate fuzzing configuration.

        Args:
            config: Fuzzing configuration; "max_iterations" and
                "timeout_seconds" are checked (defaults apply when absent)

        Returns:
            True when the configuration is valid

        Raises:
            ValueError: If either value is not a positive integer
        """
        max_iterations = config.get("max_iterations", 100000)
        if not self._is_positive_int(max_iterations):
            raise ValueError(f"max_iterations must be positive integer, got: {max_iterations}")

        timeout = config.get("timeout_seconds", 300)
        if not self._is_positive_int(timeout):
            raise ValueError(f"timeout_seconds must be positive integer, got: {timeout}")

        return True

    async def execute(self, config: Dict[str, Any], workspace: Path) -> ModuleResult:
        """
        Execute Atheris fuzzing on user code.

        Configuration and workspace validation happen before the try block,
        so their errors propagate to the caller. Failures during discovery,
        loading or fuzzing are caught and reported as a "failed" result.

        Args:
            config: Fuzzing configuration
            workspace: Path to user's uploaded code

        Returns:
            ModuleResult with crash findings
        """
        self.start_timer()
        self.start_time = time.time()

        # Validate configuration
        self.validate_config(config)
        self.validate_workspace(workspace)

        # Extract config
        target_file = config.get("target_file")
        max_iterations = config.get("max_iterations", 100000)
        timeout_seconds = config.get("timeout_seconds", 300)
        stats_callback = config.get("stats_callback")

        logger.info(
            f"Starting Atheris fuzzing (max_iterations={max_iterations}, "
            f"timeout={timeout_seconds}s, target={target_file or 'auto-discover'})"
        )

        try:
            # Step 1: Discover or load target
            target_path = self._discover_target(workspace, target_file)
            logger.info(f"Using fuzz target: {target_path}")

            # Step 2: Load target module
            test_one_input = self._load_target_module(target_path)
            logger.info(f"Loaded TestOneInput function from {target_path}")

            # Step 3: Run fuzzing
            await self._run_fuzzing(
                test_one_input=test_one_input,
                target_path=target_path,
                max_iterations=max_iterations,
                timeout_seconds=timeout_seconds,
                stats_callback=stats_callback
            )

            # Step 4: Generate findings from crashes
            findings = self._generate_findings(target_path)

            logger.info(
                f"Fuzzing completed: {self.total_executions} executions, "
                f"{len(self.crashes)} crashes found"
            )

            return self.create_result(
                findings=findings,
                status="success",
                summary={
                    "total_executions": self.total_executions,
                    "crashes_found": len(self.crashes),
                    "execution_time": self.get_execution_time(),
                    "target_file": str(target_path.relative_to(workspace))
                },
                metadata={
                    "max_iterations": max_iterations,
                    "timeout_seconds": timeout_seconds
                }
            )

        except Exception as e:
            logger.error(f"Fuzzing failed: {e}", exc_info=True)
            return self.create_result(
                findings=[],
                status="failed",
                error=str(e)
            )

    def _discover_target(self, workspace: Path, target_file: Optional[str]) -> Path:
        """
        Discover fuzz target in workspace.

        Args:
            workspace: Path to workspace
            target_file: Explicit target file (relative to the workspace) or
                None for auto-discovery

        Returns:
            Path to target file

        Raises:
            ValueError: If target_file resolves to a location outside workspace
            FileNotFoundError: If the explicit target is not an existing file,
                or auto-discovery finds no candidate
        """
        if target_file:
            # Use specified target
            target_path = workspace / target_file

            # target_file is user-supplied: an absolute path or ".." segments
            # must not let the fuzzer import code from outside the workspace.
            try:
                target_path.resolve().relative_to(workspace.resolve())
            except ValueError:
                raise ValueError(
                    f"Target file must be inside the workspace: {target_file}"
                ) from None

            if not target_path.is_file():
                raise FileNotFoundError(f"Target file not found: {target_file}")
            return target_path

        # Auto-discover: look for fuzz_*.py or *_fuzz.py
        logger.info("Auto-discovering fuzz targets...")

        candidates = []
        seen = set()
        # Use rglob for recursive search (searches all subdirectories).
        # Patterns overlap (fuzz_target.py also matches fuzz_*.py), so
        # de-duplicate; sort each pattern's matches so the chosen target does
        # not depend on filesystem enumeration order.
        for pattern in ["fuzz_*.py", "*_fuzz.py", "fuzz_target.py"]:
            for match in sorted(workspace.rglob(pattern)):
                if match in seen or not match.is_file():
                    continue
                seen.add(match)
                candidates.append(match)

        if not candidates:
            raise FileNotFoundError(
                "No fuzz targets found. Expected files matching: fuzz_*.py, *_fuzz.py, or fuzz_target.py"
            )

        # Use first candidate
        target = candidates[0]
        if len(candidates) > 1:
            logger.warning(
                f"Multiple fuzz targets found: {[str(c) for c in candidates]}. "
                f"Using: {target.name}"
            )

        return target

    def _load_target_module(self, target_path: Path) -> Callable:
        """
        Load target module and get TestOneInput function.

        The target's directory is prepended to sys.path so the harness can
        import sibling modules; the entry is not removed afterwards.

        Args:
            target_path: Path to Python file with TestOneInput

        Returns:
            TestOneInput function

        Raises:
            ImportError: If no import spec/loader can be built for the file
            AttributeError: If the module defines no TestOneInput
        """
        # Add target directory to sys.path
        target_dir = target_path.parent
        if str(target_dir) not in sys.path:
            sys.path.insert(0, str(target_dir))

        # Load module dynamically
        module_name = target_path.stem
        spec = importlib.util.spec_from_file_location(module_name, target_path)
        if spec is None or spec.loader is None:
            raise ImportError(f"Cannot load module from {target_path}")

        module = importlib.util.module_from_spec(spec)
        spec.loader.exec_module(module)

        # Get TestOneInput function
        if not hasattr(module, "TestOneInput"):
            raise AttributeError(
                f"Module {module_name} does not have TestOneInput() function. "
                "Atheris requires a TestOneInput(data: bytes) function."
            )

        return module.TestOneInput

    async def _run_fuzzing(
        self,
        test_one_input: Callable,
        target_path: Path,
        max_iterations: int,
        timeout_seconds: int,
        stats_callback: Optional[Callable] = None
    ):
        """
        Run Atheris fuzzing with real-time monitoring.

        Args:
            test_one_input: TestOneInput function to fuzz
            target_path: Path to target file
            max_iterations: Max iterations
            timeout_seconds: Timeout in seconds
            stats_callback: Optional async callback receiving a stats dict
        """
        import atheris

        self.crashes = []
        self.total_executions = 0
        # Never updated in this method; reported to the callback as-is.
        corpus_size = 0

        # Wrapper to track executions and crashes
        def fuzz_wrapper(data):
            self.total_executions += 1
            try:
                test_one_input(data)
            except Exception as e:
                # Capture crash
                crash_info = {
                    "input": data,
                    "exception": e,
                    "exception_type": type(e).__name__,
                    "stack_trace": traceback.format_exc(),
                    "execution": self.total_executions
                }
                self.crashes.append(crash_info)
                logger.warning(
                    f"Crash found (execution {self.total_executions}): "
                    f"{type(e).__name__}: {str(e)}"
                )
                # Re-raise so Atheris detects it
                raise

        # Configure Atheris
        atheris.Setup(
            [
                "atheris_fuzzer",
                f"-runs={max_iterations}",
                f"-max_total_time={timeout_seconds}",
                "-print_final_stats=1"
            ],
            fuzz_wrapper
        )

        logger.info(f"Starting Atheris fuzzer (max_runs={max_iterations}, timeout={timeout_seconds}s)...")

        # Run fuzzing in a separate task with monitoring
        async def monitor_stats():
            """Monitor and report stats every 5 seconds"""
            while True:
                await asyncio.sleep(5)
                if stats_callback:
                    elapsed = time.time() - self.start_time
                    execs_per_sec = self.total_executions / elapsed if elapsed > 0 else 0
                    await stats_callback({
                        "total_execs": self.total_executions,
                        "execs_per_sec": execs_per_sec,
                        "crashes": len(self.crashes),
                        "corpus_size": corpus_size,
                        "coverage": 0.0,  # Atheris doesn't expose coverage easily
                        "elapsed_time": int(elapsed)
                    })

        # Start monitoring task
        monitor_task = None
        if stats_callback:
            monitor_task = asyncio.create_task(monitor_stats())

        try:
            # Run fuzzing (blocking)
            # NOTE(review): this synchronous call is made directly in the
            # coroutine, so the event loop is presumably blocked until it
            # returns and monitor_stats() may never get scheduled. Moving it
            # to an executor/subprocess needs confirmation that Atheris
            # supports running off the main thread - TODO confirm.
            atheris.Fuzz()
        except SystemExit:
            # Atheris exits when done
            pass
        finally:
            # Stop monitoring
            if monitor_task:
                monitor_task.cancel()
                try:
                    await monitor_task
                except asyncio.CancelledError:
                    pass

    def _generate_findings(self, target_path: Path) -> List[ModuleFinding]:
        """
        Generate ModuleFinding objects from crashes.

        Args:
            target_path: Path to target file

        Returns:
            List of findings, one per entry in self.crashes
        """
        findings = []

        for crash in self.crashes:
            # Encode crash input for storage
            crash_input_b64 = base64.b64encode(crash["input"]).decode()

            finding = self.create_finding(
                title=f"Crash: {crash['exception_type']}",
                description=(
                    f"Atheris found crash during fuzzing:\n"
                    f"Exception: {crash['exception_type']}\n"
                    f"Message: {str(crash['exception'])}\n"
                    f"Execution: {crash['execution']}"
                ),
                severity="critical",
                category="crash",
                file_path=str(target_path),
                metadata={
                    "crash_input_base64": crash_input_b64,
                    "crash_input_hex": crash["input"].hex(),
                    "exception_type": crash["exception_type"],
                    "stack_trace": crash["stack_trace"],
                    "execution_number": crash["execution"]
                },
                recommendation=(
                    "Review the crash stack trace and input to identify the vulnerability. "
                    "The crash input is provided in base64 and hex formats for reproduction."
                )
            )
            findings.append(finding)

        return findings
@@ -0,0 +1,9 @@
"""
Atheris Fuzzing Workflow
Fuzzes user-provided Python code using Atheris.
"""
from .workflow import AtherisFuzzingWorkflow
__all__ = ["AtherisFuzzingWorkflow"]
@@ -0,0 +1,90 @@
"""
Atheris Fuzzing Workflow Activities
Activities specific to the Atheris fuzzing workflow.
"""
import logging
import sys
from datetime import datetime
from pathlib import Path
from typing import Dict, Any
from temporalio import activity
# Configure logging
logger = logging.getLogger(__name__)
# Add toolbox to path for module imports
sys.path.insert(0, '/app/toolbox')
@activity.defn(name="fuzz_with_atheris")
async def fuzz_activity(workspace_path: str, config: dict) -> dict:
    """
    Fuzzing activity using the AtherisFuzzer module on user code.

    This activity:
    1. Imports the reusable AtherisFuzzer module
    2. Sets up real-time stats callback
    3. Executes fuzzing on user's TestOneInput() function
    4. Returns findings as ModuleResult

    Args:
        workspace_path: Path to the workspace directory (user's uploaded code)
        config: Fuzzer configuration (target_file, max_iterations, timeout_seconds).
            The dict is not modified; the callback is added to a copy.

    Returns:
        Fuzzer results dictionary (findings, summary, metadata)

    Raises:
        FileNotFoundError: If workspace_path does not exist
        Exception: Any other failure is logged and re-raised
    """
    logger.info(f"Activity: fuzz_with_atheris (workspace={workspace_path})")

    try:
        # Import reusable AtherisFuzzer module
        from modules.fuzzer import AtherisFuzzer

        workspace = Path(workspace_path)
        if not workspace.exists():
            raise FileNotFoundError(f"Workspace not found: {workspace_path}")

        # Get activity info for real-time stats
        info = activity.info()
        run_id = info.workflow_id

        # Define stats callback for real-time monitoring
        async def stats_callback(stats_data: Dict[str, Any]):
            """Callback for live fuzzing statistics"""
            try:
                logger.info("LIVE_STATS", extra={
                    "stats_type": "fuzzing_live_update",
                    "workflow_type": "atheris_fuzzing",
                    "run_id": run_id,
                    "executions": stats_data.get("total_execs", 0),
                    "executions_per_sec": stats_data.get("execs_per_sec", 0.0),
                    "crashes": stats_data.get("crashes", 0),
                    "corpus_size": stats_data.get("corpus_size", 0),
                    "coverage": stats_data.get("coverage", 0.0),
                    "elapsed_time": stats_data.get("elapsed_time", 0),
                    "timestamp": datetime.utcnow().isoformat()
                })
            except Exception as e:
                # Stats are best-effort; never let logging abort the fuzz run.
                logger.warning(f"Error in stats callback: {e}")

        # Add stats callback to a COPY of the config so the caller's dict
        # does not end up holding a non-serializable function.
        fuzzer_config = {**(config or {}), "stats_callback": stats_callback}

        # Execute the fuzzer module
        fuzzer = AtherisFuzzer()
        result = await fuzzer.execute(fuzzer_config, workspace)

        # A failed run is created without a summary argument; guard in case
        # the attribute is then None - TODO confirm against BaseModule.
        summary = result.summary or {}
        logger.info(
            f"✓ Fuzzing completed: "
            f"{summary.get('total_executions', 0)} executions, "
            f"{summary.get('crashes_found', 0)} crashes"
        )

        return result.dict()

    except Exception as e:
        logger.error(f"Fuzzing failed: {e}", exc_info=True)
        raise
@@ -0,0 +1,76 @@
name: atheris_fuzzing
version: "1.0.0"
vertical: python
description: "Fuzz Python code using Atheris with real-time monitoring. Automatically discovers and fuzzes TestOneInput() functions in user code."
author: "FuzzForge Team"
category: "fuzzing"
tags:
- "fuzzing"
- "atheris"
- "python"
- "coverage"
- "security"
supported_volume_modes:
- "ro"
default_volume_mode: "ro"
default_target_path: "/workspace"
requirements:
tools:
- "atheris_fuzzer"
resources:
memory: "512Mi"
cpu: "500m"
timeout: 3600
has_docker: false
default_parameters:
target_file: null
max_iterations: 100000
timeout_seconds: 300
parameters:
type: object
properties:
target_file:
type: string
description: "Python file with TestOneInput() function (auto-discovered if not specified)"
max_iterations:
type: integer
default: 100000
description: "Maximum fuzzing iterations"
timeout_seconds:
type: integer
default: 300
description: "Fuzzing timeout in seconds (5 minutes)"
output_schema:
type: object
properties:
findings:
type: array
description: "Crashes and vulnerabilities found during fuzzing"
items:
type: object
properties:
title:
type: string
severity:
type: string
category:
type: string
metadata:
type: object
summary:
type: object
description: "Fuzzing execution summary"
properties:
total_executions:
type: integer
crashes_found:
type: integer
execution_time:
type: number
@@ -0,0 +1,171 @@
"""
Atheris Fuzzing Workflow - Temporal Version
Fuzzes user-provided Python code using Atheris with real-time monitoring.
"""
from datetime import timedelta
from typing import Dict, Any, Optional
from temporalio import workflow
from temporalio.common import RetryPolicy
# Import for type hints (will be executed by worker)
with workflow.unsafe.imports_passed_through():
import logging
logger = logging.getLogger(__name__)
@workflow.defn
class AtherisFuzzingWorkflow:
    """
    Fuzz Python code using Atheris.

    User workflow:
    1. User runs: ff workflow run atheris_fuzzing .
    2. CLI uploads project to MinIO
    3. Worker downloads project
    4. Worker fuzzes TestOneInput() function
    5. Crashes reported as findings
    """

    @workflow.run
    async def run(
        self,
        target_id: str,  # MinIO UUID of uploaded user code
        target_file: Optional[str] = None,  # Optional: specific file to fuzz
        max_iterations: int = 100000,
        timeout_seconds: int = 300
    ) -> Dict[str, Any]:
        """
        Main workflow execution.

        Runs four steps: download target, fuzz, upload results, clean up the
        cache. Upload and cleanup failures are logged and tolerated; a failure
        in download or fuzzing is recorded in the results and re-raised. The
        cache is also cleaned up when the workflow fails after the download
        succeeded, so a failed run does not leave the target behind.

        Args:
            target_id: UUID of the uploaded target in MinIO
            target_file: Optional specific Python file with TestOneInput() (auto-discovered if None)
            max_iterations: Maximum fuzzing iterations
            timeout_seconds: Fuzzing timeout in seconds

        Returns:
            Dictionary containing findings and summary
        """
        workflow_id = workflow.info().workflow_id

        workflow.logger.info(
            f"Starting AtherisFuzzingWorkflow "
            f"(workflow_id={workflow_id}, target_id={target_id}, "
            f"target_file={target_file or 'auto-discover'}, max_iterations={max_iterations}, "
            f"timeout_seconds={timeout_seconds})"
        )

        results = {
            "workflow_id": workflow_id,
            "target_id": target_id,
            "status": "running",
            "steps": []
        }

        # Tracked so the error path knows whether there is anything to clean
        # and never cleans twice.
        target_path = None
        cache_cleaned = False

        try:
            # Step 1: Download user's project from MinIO
            workflow.logger.info("Step 1: Downloading user code from MinIO")
            target_path = await workflow.execute_activity(
                "get_target",
                target_id,
                start_to_close_timeout=timedelta(minutes=5),
                retry_policy=RetryPolicy(
                    initial_interval=timedelta(seconds=1),
                    maximum_interval=timedelta(seconds=30),
                    maximum_attempts=3
                )
            )
            results["steps"].append({
                "step": "download_target",
                "status": "success",
                "target_path": target_path
            })
            workflow.logger.info(f"✓ User code downloaded to: {target_path}")

            # Step 2: Run Atheris fuzzing
            workflow.logger.info("Step 2: Running Atheris fuzzing")

            # Use defaults if parameters are None
            max_iterations = max_iterations if max_iterations is not None else 100000
            timeout_seconds = timeout_seconds if timeout_seconds is not None else 300

            fuzz_config = {
                "target_file": target_file,
                "max_iterations": max_iterations,
                "timeout_seconds": timeout_seconds
            }

            fuzz_results = await workflow.execute_activity(
                "fuzz_with_atheris",
                args=[target_path, fuzz_config],
                # Give the activity a minute of headroom beyond the fuzzer's
                # own time budget.
                start_to_close_timeout=timedelta(seconds=timeout_seconds + 60),
                retry_policy=RetryPolicy(
                    initial_interval=timedelta(seconds=2),
                    maximum_interval=timedelta(seconds=60),
                    maximum_attempts=1  # Fuzzing shouldn't retry
                )
            )

            # `or {}` also covers a summary key that is present but null.
            summary = fuzz_results.get("summary") or {}

            results["steps"].append({
                "step": "fuzzing",
                "status": "success",
                "executions": summary.get("total_executions", 0),
                "crashes": summary.get("crashes_found", 0)
            })
            workflow.logger.info(
                f"✓ Fuzzing completed: "
                f"{summary.get('total_executions', 0)} executions, "
                f"{summary.get('crashes_found', 0)} crashes"
            )

            # Step 3: Upload results to MinIO
            workflow.logger.info("Step 3: Uploading results")
            try:
                results_url = await workflow.execute_activity(
                    "upload_results",
                    args=[workflow_id, fuzz_results, "json"],
                    start_to_close_timeout=timedelta(minutes=2)
                )
                results["results_url"] = results_url
                workflow.logger.info(f"✓ Results uploaded to: {results_url}")
            except Exception as e:
                workflow.logger.warning(f"Failed to upload results: {e}")
                results["results_url"] = None

            # Step 4: Cleanup cache
            workflow.logger.info("Step 4: Cleaning up cache")
            cache_cleaned = True
            await self._cleanup_cache(target_path)

            # Mark workflow as successful
            results["status"] = "success"
            results["findings"] = fuzz_results.get("findings", [])
            results["summary"] = summary
            workflow.logger.info(
                f"✓ Workflow completed successfully: {workflow_id} "
                f"({results['summary'].get('crashes_found', 0)} crashes found)"
            )

            return results

        except Exception as e:
            workflow.logger.error(f"Workflow failed: {e}")
            results["status"] = "error"
            results["error"] = str(e)
            results["steps"].append({
                "step": "error",
                "status": "failed",
                "error": str(e)
            })

            # Release the downloaded target even though the run failed.
            if target_path is not None and not cache_cleaned:
                workflow.logger.info("Cleaning up cache after failure")
                cache_cleaned = True
                await self._cleanup_cache(target_path)

            raise

    async def _cleanup_cache(self, target_path: str) -> None:
        """Run the cleanup_cache activity; failures are logged, never raised."""
        try:
            await workflow.execute_activity(
                "cleanup_cache",
                target_path,
                start_to_close_timeout=timedelta(minutes=1)
            )
            workflow.logger.info("✓ Cache cleaned up")
        except Exception as e:
            workflow.logger.warning(f"Cache cleanup failed: {e}")
+62
View File
@@ -208,6 +208,66 @@ services:
cpus: '1'
memory: 512M
# ============================================================================
# Vertical Worker: Python Fuzzing
# ============================================================================
worker-python:
build:
context: ./workers/python
dockerfile: Dockerfile
container_name: fuzzforge-worker-python
depends_on:
postgresql:
condition: service_healthy
temporal:
condition: service_healthy
minio:
condition: service_healthy
environment:
# Temporal configuration
TEMPORAL_ADDRESS: temporal:7233
TEMPORAL_NAMESPACE: default
# Worker configuration
WORKER_VERTICAL: python
WORKER_TASK_QUEUE: python-queue
MAX_CONCURRENT_ACTIVITIES: 5
# Storage configuration (MinIO)
STORAGE_BACKEND: s3
S3_ENDPOINT: http://minio:9000
S3_ACCESS_KEY: fuzzforge
S3_SECRET_KEY: fuzzforge123
S3_BUCKET: targets
S3_REGION: us-east-1
S3_USE_SSL: "false"
# Cache configuration
CACHE_DIR: /cache
CACHE_MAX_SIZE: 10GB
CACHE_TTL: 7d
# Logging
LOG_LEVEL: INFO
PYTHONUNBUFFERED: 1
volumes:
# Mount workflow code (read-only) for dynamic discovery
- ./backend/toolbox:/app/toolbox:ro
# Worker cache for downloaded targets
- worker_python_cache:/cache
networks:
- fuzzforge-network
restart: unless-stopped
# Resource limits (lighter than rust)
deploy:
resources:
limits:
cpus: '1'
memory: 1G
reservations:
cpus: '0.5'
memory: 256M
# ============================================================================
# Vertical Worker: Android Security
# ============================================================================
@@ -329,6 +389,8 @@ volumes:
name: fuzzforge_minio_data
worker_rust_cache:
name: fuzzforge_worker_rust_cache
worker_python_cache:
name: fuzzforge_worker_python_cache
worker_android_cache:
name: fuzzforge_worker_android_cache
# Add more worker caches as you add verticals:
@@ -0,0 +1,18 @@
# Python
__pycache__/
*.py[cod]
*$py.class
*.so
.Python
*.egg-info/
dist/
build/
# FuzzForge
.fuzzforge/
# Atheris fuzzing artifacts
corpus/
crashes/
*.profraw
*.profdata
@@ -0,0 +1,137 @@
# Python Fuzzing Test - Waterfall Vulnerability
This project demonstrates a **stateful vulnerability** that Atheris can discover through fuzzing.
## Vulnerability Description
The `check_secret()` function in `main.py` validates input character by character against the secret string "FUZZINGLABS". This creates a **waterfall vulnerability** where:
1. State leaks through the global `progress` variable
2. Each correct character advances the progress counter
3. When all 11 characters are provided in order, the function crashes with `SystemError`
This pattern is analogous to:
- Timing attacks on password checkers
- Protocol state machines with sequential validation
- Multi-step authentication flows
## Files
- `main.py` - Main application with vulnerable `check_secret()` function
- `fuzz_target.py` - Atheris fuzzing harness (contains `TestOneInput()`)
- `README.md` - This file
## How to Fuzz
### Using FuzzForge CLI
```bash
# Initialize FuzzForge in this directory
cd test_projects/python_fuzz_waterfall/
ff init
# Run fuzzing workflow (uploads code to MinIO)
ff workflow run atheris_fuzzing .
# The workflow will:
# 1. Upload this directory to MinIO
# 2. Worker downloads and extracts the code
# 3. Worker discovers fuzz_target.py (has TestOneInput)
# 4. Worker runs Atheris fuzzing
# 5. Reports real-time stats every 5 seconds
# 6. Finds crash when "FUZZINGLABS" is discovered
```
### Using FuzzForge SDK
```python
from fuzzforge_sdk import FuzzForgeClient
from pathlib import Path
client = FuzzForgeClient(base_url="http://localhost:8000")
# Upload and run fuzzing
response = client.submit_workflow_with_upload(
workflow_name="atheris_fuzzing",
target_path=Path("./"),
parameters={
"max_iterations": 100000,
"timeout_seconds": 300
}
)
print(f"Workflow started: {response.run_id}")
# Wait for completion
final_status = client.wait_for_completion(response.run_id)
findings = client.get_run_findings(response.run_id)
for finding in findings:
print(f"Crash: {finding.title}")
print(f"Input: {finding.metadata.get('crash_input_hex')}")
```
### Standalone (Without FuzzForge)
```bash
# Install Atheris
pip install atheris
# Run fuzzing directly
python fuzz_target.py
```
## Expected Behavior
When fuzzing:
1. **Initial phase**: Random exploration, progress = 0
2. **Discovery phase**: Atheris finds 'F' (first char), progress = 1
3. **Incremental progress**: Finds 'U', then 'Z', etc.
4. **Crash**: When full "FUZZINGLABS" discovered, crashes with:
```
SystemError: SECRET COMPROMISED: FUZZINGLABS
```
## Monitoring
Watch real-time fuzzing stats:
```bash
docker logs fuzzforge-worker-python -f | grep LIVE_STATS
```
Output example:
```
INFO - LIVE_STATS - executions=1523 execs_per_sec=1523.0 crashes=0
INFO - LIVE_STATS - executions=7842 execs_per_sec=2104.2 crashes=0
INFO - LIVE_STATS - executions=15234 execs_per_sec=2167.0 crashes=1 ← Crash found!
```
## Vulnerability Details
**CVE**: N/A (demonstration vulnerability)
**CWE**: CWE-208 (Observable Timing Discrepancy)
**Severity**: Critical (in real systems)
**Fix**: Remove state-based checking or implement constant-time comparison:
```python
def check_secret_safe(input_data: bytes) -> bool:
"""Constant-time comparison"""
import hmac
return hmac.compare_digest(input_data, SECRET.encode())
```
## Adjusting Difficulty
If fuzzing finds the crash too quickly, extend the secret:
```python
# In main.py, change:
SECRET = "FUZZINGLABSSECURITYTESTING" # 26 characters instead of 11
```
## License
MIT License - This is a demonstration project for educational purposes.
@@ -0,0 +1,59 @@
"""
Atheris fuzzing target for the waterfall vulnerability.
This file is automatically discovered by FuzzForge's AtherisFuzzer module.
The fuzzer looks for files named: fuzz_*.py, *_fuzz.py, or fuzz_target.py
"""
import sys
import atheris
# Import the vulnerable function
from main import check_secret
def TestOneInput(data):
    """
    Entry point invoked by Atheris once per fuzzing iteration.

    The fuzzer-generated input is converted to ``bytes`` and handed to
    check_secret(). Nothing is caught here on purpose: the SystemError that
    check_secret() raises once the complete secret has been matched has to
    propagate so that Atheris records it as a crash.

    Args:
        data: Input generated by Atheris for this iteration

    Raises:
        SystemError: Propagated unchanged from check_secret() - this is the
            vulnerability the fuzzer is looking for.
    """
    payload = bytes(data)
    check_secret(payload)
if __name__ == "__main__":
    # Standalone fuzzing mode.
    # Run directly: python fuzz_target.py
    banner = "=" * 60
    print(banner)
    print("Atheris Fuzzing - Waterfall Vulnerability")
    print(banner)
    print("Fuzzing will try to discover the secret string...")
    print("Watch for progress indicators: [DEBUG] Progress: X/11")
    print()
    print("Press Ctrl+C to stop fuzzing")
    print(banner)
    print()

    # check_secret() was brought in with a plain `import`, so its bytecode has
    # no coverage instrumentation. Instrument the already-loaded code before
    # starting; without it the fuzzer gets no coverage feedback from the
    # target and degrades to blind random mutation.
    atheris.instrument_all()

    # Setup Atheris with command-line args
    atheris.Setup(sys.argv, TestOneInput)

    # Start fuzzing
    atheris.Fuzz()
@@ -0,0 +1,96 @@
"""
Example application with a stateful vulnerability.
This simulates a password checking system that leaks state information
through a global progress variable - a classic waterfall vulnerability.
"""
# Global state - simulates session state that survives between calls
progress = 0
SECRET = "FUZZINGLABS"  # 11 characters


def check_secret(input_data: bytes) -> bool:
    """
    Vulnerable function: checks the secret one character per call.

    This is a waterfall vulnerability - state leaks through the global
    ``progress`` variable. Each call looks only at the byte at index
    ``progress``: a match advances ``progress`` by one, a mismatch resets it
    to zero, and an input too short to contain that byte leaves it untouched.

    Real-world analogy:
    - Timing attacks on password checkers
    - Protocol state machines with sequential validation
    - Multi-step authentication flows

    Args:
        input_data: Input bytes to check

    Returns:
        True if progress was made, False otherwise

    Raises:
        SystemError: When complete secret is discovered (vulnerability trigger)
    """
    global progress

    # Input does not reach the position currently being checked: no match,
    # and the accumulated progress is kept as-is.
    if len(input_data) <= progress:
        return False

    if input_data[progress] != ord(SECRET[progress]):
        # Wrong character - reset progress
        progress = 0
        return False

    progress += 1

    # Progress indicator (useful for monitoring during fuzzing)
    if progress % 2 == 0:  # Every 2 characters
        print(f"[DEBUG] Progress: {progress}/{len(SECRET)} characters matched")

    # VULNERABILITY: Crashes when complete secret found
    if progress == len(SECRET):
        # Start from a clean state before crashing. If progress stayed at
        # len(SECRET), a later call with a longer input would index past the
        # end of SECRET and fail with an unrelated IndexError.
        progress = 0
        raise SystemError(f"SECRET COMPROMISED: {SECRET}")

    return True
def reset_state():
    """Put the global match counter back to zero (handy between tests)."""
    global progress
    progress = 0
if __name__ == "__main__":
    # Example usage showing the vulnerability
    print("=" * 60)
    print("Waterfall Vulnerability Demonstration")
    print("=" * 60)
    print(f"Secret: {SECRET}")
    print(f"Secret length: {len(SECRET)} characters")
    print()

    # check_secret() inspects a single byte per call - the one at index
    # `progress` - so the state has to carry over from one call to the next.
    # Resetting it before every input would cap progress at 1 and the crash
    # could never be reached; reset once, up front, instead.
    reset_state()
    full_secret = SECRET.encode()

    # Test inputs showing progressive discovery
    test_inputs = [
        b"F",      # Byte 0 matches -> progress 1
        b"FU",     # Byte 1 matches -> progress 2
        b"FUZ",    # Byte 2 matches -> progress 3
        b"WRONG",  # Byte 3 is 'N', not 'Z' -> progress resets to 0
        # One call per secret byte; the final call completes the match
        # and triggers the crash.
        *([full_secret] * len(full_secret)),
    ]

    for test in test_inputs:
        print(f"Testing input: {test.decode(errors='ignore')!r}")
        try:
            result = check_secret(test)
            print(f" Result: {result}, Progress: {progress}/{len(SECRET)}")
        except SystemError as e:
            print(f" 💥 CRASH: {e}")
        print()

    print("=" * 60)
    print("To fuzz this vulnerability with FuzzForge:")
    print(" ff init")
    print(" ff workflow run atheris_fuzzing .")
    print("=" * 60)
+47
View File
@@ -0,0 +1,47 @@
# FuzzForge Vertical Worker: Python Fuzzing
#
# Pre-installed tools for Python fuzzing and security analysis:
# - Python 3.11
# - Atheris (Python fuzzing)
# - Common Python security tools
# - Temporal worker
FROM python:3.11-slim

# Set working directory
WORKDIR /app

# Install system dependencies
RUN apt-get update && apt-get install -y \
    # Build essentials for Atheris
    build-essential \
    clang \
    llvm \
    # Development tools
    git \
    curl \
    wget \
    # Cleanup (drop the apt package lists so they do not stay in the layer)
    && rm -rf /var/lib/apt/lists/*

# Install Python dependencies for Temporal worker
# (the requirements file is removed again in the same layer)
COPY requirements.txt /tmp/requirements.txt
RUN pip3 install --no-cache-dir -r /tmp/requirements.txt && \
    rm /tmp/requirements.txt

# Create cache directory for downloaded targets
RUN mkdir -p /cache && chmod 755 /cache

# Copy worker entrypoint
COPY worker.py /app/worker.py

# Add toolbox to Python path (mounted at runtime)
# NOTE(review): ${PYTHONPATH} is not defined earlier in this file; if the base
# image does not set it either, the value ends in a trailing ':' (an empty
# path entry) - confirm this is intended.
ENV PYTHONPATH="/app:/app/toolbox:${PYTHONPATH}"
# Unbuffered stdout/stderr so log lines show up immediately
ENV PYTHONUNBUFFERED=1

# Healthcheck
# NOTE(review): this probe only verifies that a Python interpreter can start;
# it does not look at the worker process launched by CMD.
HEALTHCHECK --interval=30s --timeout=10s --start-period=40s --retries=3 \
    CMD python3 -c "import sys; sys.exit(0)"

# Run worker
CMD ["python3", "/app/worker.py"]
+12
View File
@@ -0,0 +1,12 @@
# Temporal worker dependencies
temporalio>=1.5.0
pydantic>=2.0.0
# Storage (MinIO/S3)
boto3>=1.34.0
# Configuration
pyyaml>=6.0.0
# Fuzzing
atheris>=2.3.0
+309
View File
@@ -0,0 +1,309 @@
"""
FuzzForge Vertical Worker: Python Fuzzing
This worker:
1. Discovers workflows for the configured vertical (WORKER_VERTICAL) from mounted toolbox
2. Dynamically imports and registers workflow classes
3. Connects to Temporal and processes tasks
4. Handles activities for target download/upload from MinIO
"""
import asyncio
import importlib
import inspect
import logging
import os
import sys
from pathlib import Path
from typing import List, Any
import yaml
from temporalio.client import Client
from temporalio.worker import Worker
# Add toolbox to path for workflow and activity imports
sys.path.insert(0, '/app/toolbox')
# Import common storage activities
from toolbox.common.storage_activities import (
get_target_activity,
cleanup_cache_activity,
upload_results_activity
)
# Configure logging
logging.basicConfig(
level=os.getenv('LOG_LEVEL', 'INFO'),
format='%(asctime)s - %(name)s - %(levelname)s - %(message)s'
)
logger = logging.getLogger(__name__)
async def discover_workflows(vertical: str) -> List[Any]:
    """
    Discover workflows for this vertical from mounted toolbox.

    Each sub-directory of /app/toolbox/workflows is considered. A directory
    contributes workflows when it has a metadata.yaml whose ``vertical`` key
    equals *vertical* and a workflow.py; that file is imported as
    ``toolbox.workflows.<dir>.workflow`` and every class in it carrying the
    ``__temporal_workflow_definition`` attribute is collected. A problem with
    one directory is logged and only that directory is skipped.

    Args:
        vertical: The vertical name (e.g., 'python', 'rust', 'android', 'web')

    Returns:
        List of workflow classes decorated with @workflow.defn
    """
    workflows = []
    toolbox_path = Path("/app/toolbox/workflows")

    if not toolbox_path.exists():
        logger.warning(f"Toolbox path does not exist: {toolbox_path}")
        return workflows

    logger.info(f"Scanning for workflows in: {toolbox_path}")

    for workflow_dir in toolbox_path.iterdir():
        if not workflow_dir.is_dir():
            continue

        # Skip special directories
        if workflow_dir.name.startswith('.') or workflow_dir.name == '__pycache__':
            continue

        metadata_file = workflow_dir / "metadata.yaml"
        if not metadata_file.exists():
            logger.debug(f"No metadata.yaml in {workflow_dir.name}, skipping")
            continue

        try:
            # Parse metadata
            with open(metadata_file, encoding="utf-8") as f:
                metadata = yaml.safe_load(f)

            # An empty file parses to None, and other YAML documents may be a
            # list or scalar; only a mapping can carry the "vertical" key.
            if not isinstance(metadata, dict):
                logger.warning(
                    f"Workflow {workflow_dir.name} has an empty or invalid metadata.yaml "
                    f"(expected a mapping), skipping"
                )
                continue

            # Check if workflow is for this vertical
            workflow_vertical = metadata.get("vertical")
            if workflow_vertical != vertical:
                logger.debug(
                    f"Workflow {workflow_dir.name} is for vertical '{workflow_vertical}', "
                    f"not '{vertical}', skipping"
                )
                continue

            # Check if workflow.py exists
            workflow_file = workflow_dir / "workflow.py"
            if not workflow_file.exists():
                logger.warning(
                    f"Workflow {workflow_dir.name} has metadata but no workflow.py, skipping"
                )
                continue

            # Dynamically import workflow module
            module_name = f"toolbox.workflows.{workflow_dir.name}.workflow"
            logger.info(f"Importing workflow module: {module_name}")

            try:
                module = importlib.import_module(module_name)
            except Exception as e:
                logger.error(
                    f"Failed to import workflow module {module_name}: {e}",
                    exc_info=True
                )
                continue

            # Find @workflow.defn decorated classes
            found_workflows = False
            for name, obj in inspect.getmembers(module, inspect.isclass):
                # Check if class has Temporal workflow definition
                if hasattr(obj, '__temporal_workflow_definition'):
                    workflows.append(obj)
                    found_workflows = True
                    logger.info(
                        f"✓ Discovered workflow: {name} from {workflow_dir.name} "
                        f"(vertical: {vertical})"
                    )

            if not found_workflows:
                logger.warning(
                    f"Workflow {workflow_dir.name} has no @workflow.defn decorated classes"
                )

        except Exception as e:
            # Best effort: one broken workflow directory must not stop discovery
            logger.error(
                f"Error processing workflow {workflow_dir.name}: {e}",
                exc_info=True
            )
            continue

    logger.info(f"Discovered {len(workflows)} workflows for vertical '{vertical}'")
    return workflows
async def discover_activities(workflows_dir: Path) -> List[Any]:
    """
    Collect workflow-specific activities from the workflow directories.

    For every sub-directory of *workflows_dir* that contains an activities.py,
    the module ``toolbox.workflows.<dir>.activities`` is imported and each
    function carrying the ``__temporal_activity_definition`` attribute is
    gathered. Failures are logged and the affected directory is skipped.

    Args:
        workflows_dir: Path to workflows directory

    Returns:
        List of activity functions decorated with @activity.defn
    """
    activities = []

    if not workflows_dir.exists():
        logger.warning(f"Workflows directory does not exist: {workflows_dir}")
        return activities

    logger.info(f"Scanning for workflow activities in: {workflows_dir}")

    for candidate in workflows_dir.iterdir():
        # Only real directories can hold a workflow
        if not candidate.is_dir():
            continue

        dir_name = candidate.name

        # Hidden directories and bytecode caches are never workflows
        if dir_name.startswith('.') or dir_name == '__pycache__':
            continue

        # Nothing to load when the workflow ships no activities.py
        if not (candidate / "activities.py").exists():
            logger.debug(f"No activities.py in {dir_name}, skipping")
            continue

        try:
            # Import the activities module by its dotted toolbox path
            module_name = f"toolbox.workflows.{dir_name}.activities"
            logger.info(f"Importing activities module: {module_name}")

            try:
                module = importlib.import_module(module_name)
            except Exception as import_error:
                logger.error(
                    f"Failed to import activities module {module_name}: {import_error}",
                    exc_info=True
                )
                continue

            # Remember the list size so we can tell whether this module added anything
            known_before = len(activities)
            for func_name, func in inspect.getmembers(module, inspect.isfunction):
                # Only functions marked by Temporal's activity decorator qualify
                if not hasattr(func, '__temporal_activity_definition'):
                    continue
                activities.append(func)
                logger.info(
                    f"✓ Discovered activity: {func_name} from {dir_name}"
                )

            if len(activities) == known_before:
                logger.warning(
                    f"Workflow {dir_name} has activities.py but no @activity.defn decorated functions"
                )

        except Exception as error:
            # Log and move on to the next directory
            logger.error(
                f"Error processing activities from {dir_name}: {error}",
                exc_info=True
            )

    logger.info(f"Discovered {len(activities)} workflow-specific activities")
    return activities
async def main():
    """
    Main worker entry point.

    Reads the configuration from the environment, discovers the workflows
    and activities to serve, connects to Temporal, and runs the worker until
    it stops. The process exits with status 1 when no workflows are found,
    when the Temporal connection fails, or when the worker cannot be created.

    Environment variables:
        WORKER_VERTICAL: Vertical whose workflows are served (default "python")
        TEMPORAL_ADDRESS: Temporal host:port (default "localhost:7233")
        TEMPORAL_NAMESPACE: Temporal namespace (default "default")
        WORKER_TASK_QUEUE: Task queue name (default "<vertical>-queue")
        MAX_CONCURRENT_ACTIVITIES: Activity concurrency limit (default 5)
    """
    # Get configuration from environment.
    # This is the Python fuzzing worker, so the fallback vertical (and with it
    # the fallback task queue) is 'python'.
    vertical = os.getenv("WORKER_VERTICAL", "python")
    temporal_address = os.getenv("TEMPORAL_ADDRESS", "localhost:7233")
    temporal_namespace = os.getenv("TEMPORAL_NAMESPACE", "default")
    task_queue = os.getenv("WORKER_TASK_QUEUE", f"{vertical}-queue")
    max_concurrent_activities = int(os.getenv("MAX_CONCURRENT_ACTIVITIES", "5"))

    logger.info("=" * 60)
    logger.info(f"FuzzForge Vertical Worker: {vertical}")
    logger.info("=" * 60)
    logger.info(f"Temporal Address: {temporal_address}")
    logger.info(f"Temporal Namespace: {temporal_namespace}")
    logger.info(f"Task Queue: {task_queue}")
    logger.info(f"Max Concurrent Activities: {max_concurrent_activities}")
    logger.info("=" * 60)

    # Discover workflows for this vertical
    logger.info(f"Discovering workflows for vertical: {vertical}")
    workflows = await discover_workflows(vertical)

    if not workflows:
        logger.error(f"No workflows found for vertical: {vertical}")
        logger.error("Worker cannot start without workflows. Exiting...")
        sys.exit(1)

    # Discover activities from workflow directories
    logger.info("Discovering workflow-specific activities...")
    workflows_dir = Path("/app/toolbox/workflows")
    workflow_activities = await discover_activities(workflows_dir)

    # Combine common storage activities with workflow-specific activities
    common_activities = [
        get_target_activity,
        cleanup_cache_activity,
        upload_results_activity,
    ]
    activities = common_activities + workflow_activities

    # Derive the count from the list so the log cannot drift from reality
    logger.info(
        f"Total activities registered: {len(activities)} "
        f"({len(common_activities)} common + {len(workflow_activities)} workflow-specific)"
    )

    # Connect to Temporal
    logger.info(f"Connecting to Temporal at {temporal_address}...")
    try:
        client = await Client.connect(
            temporal_address,
            namespace=temporal_namespace
        )
        logger.info("✓ Connected to Temporal successfully")
    except Exception as e:
        logger.error(f"Failed to connect to Temporal: {e}", exc_info=True)
        sys.exit(1)

    # Create worker with discovered workflows and activities
    logger.info(f"Creating worker on task queue: {task_queue}")
    try:
        worker = Worker(
            client,
            task_queue=task_queue,
            workflows=workflows,
            activities=activities,
            max_concurrent_activities=max_concurrent_activities
        )
        logger.info("✓ Worker created successfully")
    except Exception as e:
        logger.error(f"Failed to create worker: {e}", exc_info=True)
        sys.exit(1)

    # Start worker
    logger.info("=" * 60)
    logger.info(f"🚀 Worker started for vertical '{vertical}'")
    logger.info(f"📦 Registered {len(workflows)} workflows")
    logger.info(f"⚙️ Registered {len(activities)} activities")
    logger.info(f"📨 Listening on task queue: {task_queue}")
    logger.info("=" * 60)
    logger.info("Worker is ready to process tasks...")

    try:
        await worker.run()
    except KeyboardInterrupt:
        logger.info("Shutting down worker (keyboard interrupt)...")
    except Exception as e:
        # Log with traceback, then let the caller decide how to exit
        logger.error(f"Worker error: {e}", exc_info=True)
        raise
if __name__ == "__main__":
    # Script entry point: drive the async main() to completion.
    try:
        asyncio.run(main())
    except KeyboardInterrupt:
        # Ctrl+C is treated as a normal stop, not a failure
        logger.info("Worker stopped")
    except Exception as fatal:
        # Anything else is fatal: log it with a traceback and exit non-zero
        logger.error(f"Fatal error: {fatal}", exc_info=True)
        sys.exit(1)