From fe50d4ef727921013569f43e19d3298993c2f859 Mon Sep 17 00:00:00 2001 From: Tanguy Duhamel Date: Thu, 2 Oct 2025 11:06:34 +0200 Subject: [PATCH] feat: Add Python fuzzing vertical with Atheris integration MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit This commit implements a complete Python fuzzing workflow using Atheris: ## Python Worker (workers/python/) - Dockerfile with Python 3.11, Atheris, and build tools - Generic worker.py for dynamic workflow discovery - requirements.txt with temporalio, boto3, atheris dependencies - Added to docker-compose.temporal.yaml with dedicated cache volume ## AtherisFuzzer Module (backend/toolbox/modules/fuzzer/) - Reusable module extending BaseModule - Auto-discovers fuzz targets (fuzz_*.py, *_fuzz.py, fuzz_target.py) - Recursive search to find targets in nested directories - Dynamically loads TestOneInput() function - Configurable max_iterations and timeout - Real-time stats callback support for live monitoring - Returns findings as ModuleFinding objects ## Atheris Fuzzing Workflow (backend/toolbox/workflows/atheris_fuzzing/) - Temporal workflow for orchestrating fuzzing - Downloads user code from MinIO - Executes AtherisFuzzer module - Uploads results to MinIO - Cleans up cache after execution - metadata.yaml with vertical: python for routing ## Test Project (test_projects/python_fuzz_waterfall/) - Demonstrates stateful waterfall vulnerability - main.py with check_secret() that leaks progress - fuzz_target.py with Atheris TestOneInput() harness - Complete README with usage instructions ## Backend Fixes - Fixed parameter merging in REST API endpoints (workflows.py) - Changed workflow parameter passing from positional args to kwargs (manager.py) - Default parameters now properly merged with user parameters ## Testing ✅ Worker discovered AtherisFuzzingWorkflow ✅ Workflow executed end-to-end successfully ✅ Fuzz target auto-discovered in nested directories ✅ Atheris ran 100,000 iterations 
✅ Results uploaded and cache cleaned --- backend/src/api/workflows.py | 195 ++++++++- backend/src/temporal/manager.py | 14 +- backend/toolbox/modules/fuzzer/__init__.py | 9 + .../toolbox/modules/fuzzer/atheris_fuzzer.py | 377 ++++++++++++++++++ .../workflows/atheris_fuzzing/__init__.py | 9 + .../workflows/atheris_fuzzing/activities.py | 90 +++++ .../workflows/atheris_fuzzing/metadata.yaml | 76 ++++ .../workflows/atheris_fuzzing/workflow.py | 171 ++++++++ docker-compose.temporal.yaml | 62 +++ .../python_fuzz_waterfall/.gitignore | 18 + test_projects/python_fuzz_waterfall/README.md | 137 +++++++ .../python_fuzz_waterfall/fuzz_target.py | 59 +++ test_projects/python_fuzz_waterfall/main.py | 96 +++++ workers/python/Dockerfile | 47 +++ workers/python/requirements.txt | 12 + workers/python/worker.py | 309 ++++++++++++++ 16 files changed, 1668 insertions(+), 13 deletions(-) create mode 100644 backend/toolbox/modules/fuzzer/__init__.py create mode 100644 backend/toolbox/modules/fuzzer/atheris_fuzzer.py create mode 100644 backend/toolbox/workflows/atheris_fuzzing/__init__.py create mode 100644 backend/toolbox/workflows/atheris_fuzzing/activities.py create mode 100644 backend/toolbox/workflows/atheris_fuzzing/metadata.yaml create mode 100644 backend/toolbox/workflows/atheris_fuzzing/workflow.py create mode 100644 test_projects/python_fuzz_waterfall/.gitignore create mode 100644 test_projects/python_fuzz_waterfall/README.md create mode 100644 test_projects/python_fuzz_waterfall/fuzz_target.py create mode 100644 test_projects/python_fuzz_waterfall/main.py create mode 100644 workers/python/Dockerfile create mode 100644 workers/python/requirements.txt create mode 100644 workers/python/worker.py diff --git a/backend/src/api/workflows.py b/backend/src/api/workflows.py index 608a4f4..2ac8696 100644 --- a/backend/src/api/workflows.py +++ b/backend/src/api/workflows.py @@ -15,8 +15,10 @@ API endpoints for workflow management with enhanced error handling import logging import 
traceback +import tempfile +import shutil from typing import List, Dict, Any, Optional -from fastapi import APIRouter, HTTPException, Depends +from fastapi import APIRouter, HTTPException, Depends, UploadFile, File, Form from pathlib import Path from src.models.findings import ( @@ -29,6 +31,16 @@ from src.temporal.discovery import WorkflowDiscovery logger = logging.getLogger(__name__) +# Configuration for file uploads +MAX_UPLOAD_SIZE = 10 * 1024 * 1024 * 1024 # 10 GB +ALLOWED_CONTENT_TYPES = [ + "application/gzip", + "application/x-gzip", + "application/x-tar", + "application/x-compressed-tar", + "application/octet-stream", # Generic binary +] + router = APIRouter(prefix="/workflows", tags=["workflows"]) @@ -209,8 +221,11 @@ async def submit_workflow( metadata={"workflow": workflow_name} ) - # Prepare workflow parameters - workflow_params = submission.parameters or {} + # Merge default parameters with user parameters + metadata = workflow_info.metadata or {} + defaults = metadata.get("default_parameters", {}) + user_params = submission.parameters or {} + workflow_params = {**defaults, **user_params} # Start workflow execution handle = await temporal_mgr.run_workflow( @@ -321,6 +336,180 @@ async def submit_workflow( ) +@router.post("/{workflow_name}/upload-and-submit", response_model=RunSubmissionResponse) +async def upload_and_submit_workflow( + workflow_name: str, + file: UploadFile = File(..., description="Target file or tarball to analyze"), + parameters: Optional[str] = Form(None, description="JSON-encoded workflow parameters"), + volume_mode: str = Form("ro", description="Volume mount mode (ro/rw)"), + timeout: Optional[int] = Form(None, description="Timeout in seconds"), + temporal_mgr=Depends(get_temporal_manager) +) -> RunSubmissionResponse: + """ + Upload a target file/tarball and submit workflow for execution. + + This endpoint accepts multipart/form-data uploads and is the recommended + way to submit workflows from remote CLI clients. 
+ + Args: + workflow_name: Name of the workflow to execute + file: Target file or tarball (compressed directory) + parameters: JSON string of workflow parameters (optional) + volume_mode: Volume mount mode - "ro" (read-only) or "rw" (read-write) + timeout: Execution timeout in seconds (optional) + + Returns: + Run submission response with run_id and initial status + + Raises: + HTTPException: 404 if workflow not found, 400 for invalid parameters, + 413 if file too large + """ + if workflow_name not in temporal_mgr.workflows: + available_workflows = list(temporal_mgr.workflows.keys()) + error_response = create_structured_error_response( + error_type="WorkflowNotFound", + message=f"Workflow '{workflow_name}' not found", + workflow_name=workflow_name, + suggestions=[ + f"Available workflows: {', '.join(available_workflows)}", + "Use GET /workflows/ to see all available workflows" + ] + ) + raise HTTPException(status_code=404, detail=error_response) + + temp_file_path = None + + try: + # Validate file size + file_size = 0 + chunk_size = 1024 * 1024 # 1MB chunks + + # Create temporary file + temp_fd, temp_file_path = tempfile.mkstemp(suffix=".tar.gz") + + logger.info(f"Receiving file upload for workflow '{workflow_name}': {file.filename}") + + # Stream file to disk + with open(temp_fd, 'wb') as temp_file: + while True: + chunk = await file.read(chunk_size) + if not chunk: + break + + file_size += len(chunk) + + # Check size limit + if file_size > MAX_UPLOAD_SIZE: + raise HTTPException( + status_code=413, + detail=create_structured_error_response( + error_type="FileTooLarge", + message=f"File size exceeds maximum allowed size of {MAX_UPLOAD_SIZE / (1024**3):.1f} GB", + workflow_name=workflow_name, + suggestions=[ + "Reduce the size of your target directory", + "Exclude unnecessary files (build artifacts, dependencies, etc.)", + "Consider splitting into smaller analysis targets" + ] + ) + ) + + temp_file.write(chunk) + + logger.info(f"Received file: {file_size / 
(1024**2):.2f} MB") + + # Parse parameters + workflow_params = {} + if parameters: + try: + import json + workflow_params = json.loads(parameters) + if not isinstance(workflow_params, dict): + raise ValueError("Parameters must be a JSON object") + except (json.JSONDecodeError, ValueError) as e: + raise HTTPException( + status_code=400, + detail=create_structured_error_response( + error_type="InvalidParameters", + message=f"Invalid parameters JSON: {e}", + workflow_name=workflow_name, + suggestions=["Ensure parameters is valid JSON object"] + ) + ) + + # Upload to MinIO + target_id = await temporal_mgr.upload_target( + file_path=Path(temp_file_path), + user_id="api-user", + metadata={ + "workflow": workflow_name, + "original_filename": file.filename, + "upload_method": "multipart" + } + ) + + logger.info(f"Uploaded to MinIO with target_id: {target_id}") + + # Merge default parameters with user parameters + workflow_info = temporal_mgr.workflows.get(workflow_name) + metadata = workflow_info.metadata or {} + defaults = metadata.get("default_parameters", {}) + workflow_params = {**defaults, **workflow_params} + + # Start workflow execution + handle = await temporal_mgr.run_workflow( + workflow_name=workflow_name, + target_id=target_id, + workflow_params=workflow_params + ) + + run_id = handle.id + + # Initialize fuzzing tracking if needed + workflow_info = temporal_mgr.workflows.get(workflow_name, {}) + workflow_tags = workflow_info.metadata.get("tags", []) if hasattr(workflow_info, 'metadata') else [] + if "fuzzing" in workflow_tags or "fuzz" in workflow_name.lower(): + from src.api.fuzzing import initialize_fuzzing_tracking + initialize_fuzzing_tracking(run_id, workflow_name) + + return RunSubmissionResponse( + run_id=run_id, + status="RUNNING", + workflow=workflow_name, + message=f"Workflow '{workflow_name}' submitted successfully with uploaded target" + ) + + except HTTPException: + raise + except Exception as e: + logger.error(f"Failed to upload and submit 
workflow '{workflow_name}': {e}") + logger.error(f"Traceback: {traceback.format_exc()}") + + error_response = create_structured_error_response( + error_type="WorkflowSubmissionError", + message=f"Failed to process upload and submit workflow: {str(e)}", + workflow_name=workflow_name, + suggestions=[ + "Check if the uploaded file is a valid tarball", + "Verify MinIO storage is accessible", + "Check backend logs for detailed error information", + "Ensure Temporal workers are running" + ] + ) + + raise HTTPException(status_code=500, detail=error_response) + + finally: + # Cleanup temporary file + if temp_file_path and Path(temp_file_path).exists(): + try: + Path(temp_file_path).unlink() + logger.debug(f"Cleaned up temp file: {temp_file_path}") + except Exception as e: + logger.warning(f"Failed to cleanup temp file {temp_file_path}: {e}") + + @router.get("/{workflow_name}/parameters") async def get_workflow_parameters( workflow_name: str, diff --git a/backend/src/temporal/manager.py b/backend/src/temporal/manager.py index cb3cfca..ea1a7d4 100644 --- a/backend/src/temporal/manager.py +++ b/backend/src/temporal/manager.py @@ -179,15 +179,8 @@ class TemporalManager: if not workflow_id: workflow_id = f"{workflow_name}-{str(uuid4())[:8]}" - # Prepare workflow input arguments in order - # For security_assessment: (target_id, scanner_config, analyzer_config, reporter_config) + # Prepare workflow input - target_id as first arg, rest as kwargs workflow_params = workflow_params or {} - workflow_args = [ - target_id, - workflow_params.get("scanner_config"), - workflow_params.get("analyzer_config"), - workflow_params.get("reporter_config") - ] # Determine task queue from workflow vertical vertical = workflow_info.metadata.get("vertical", "default") @@ -199,10 +192,11 @@ class TemporalManager: ) try: - # Start workflow execution with positional arguments + # Start workflow execution with target_id + keyword arguments handle = await self.client.start_workflow( 
workflow=workflow_info.workflow_type, # Workflow class name - args=workflow_args, # Positional arguments + arg=target_id, # First positional argument + kwargs=workflow_params, # Rest as keyword arguments id=workflow_id, task_queue=task_queue, retry_policy=RetryPolicy( diff --git a/backend/toolbox/modules/fuzzer/__init__.py b/backend/toolbox/modules/fuzzer/__init__.py new file mode 100644 index 0000000..eeb560b --- /dev/null +++ b/backend/toolbox/modules/fuzzer/__init__.py @@ -0,0 +1,9 @@ +""" +Fuzzing modules for FuzzForge + +This package contains fuzzing modules for different fuzzing engines. +""" + +from .atheris_fuzzer import AtherisFuzzer + +__all__ = ["AtherisFuzzer"] diff --git a/backend/toolbox/modules/fuzzer/atheris_fuzzer.py b/backend/toolbox/modules/fuzzer/atheris_fuzzer.py new file mode 100644 index 0000000..732556d --- /dev/null +++ b/backend/toolbox/modules/fuzzer/atheris_fuzzer.py @@ -0,0 +1,377 @@ +""" +Atheris Fuzzer Module + +Reusable module for fuzzing Python code using Atheris. +Discovers and fuzzes user-provided Python targets with TestOneInput() function. +""" + +import asyncio +import base64 +import importlib.util +import logging +import sys +import time +import traceback +from pathlib import Path +from typing import Dict, Any, List, Optional, Callable +import uuid + +from modules.base import BaseModule, ModuleMetadata, ModuleResult, ModuleFinding + +logger = logging.getLogger(__name__) + + +class AtherisFuzzer(BaseModule): + """ + Atheris fuzzing module - discovers and fuzzes Python code. + + This module can be used by any workflow to fuzz Python targets. 
+ """ + + def __init__(self): + super().__init__() + self.crashes = [] + self.total_executions = 0 + self.start_time = None + self.last_stats_time = 0 + + def get_metadata(self) -> ModuleMetadata: + """Return module metadata""" + return ModuleMetadata( + name="atheris_fuzzer", + version="1.0.0", + description="Python fuzzing using Atheris - discovers and fuzzes TestOneInput() functions", + author="FuzzForge Team", + category="fuzzer", + tags=["fuzzing", "atheris", "python", "coverage"], + input_schema={ + "type": "object", + "properties": { + "target_file": { + "type": "string", + "description": "Python file with TestOneInput() function (auto-discovered if not specified)" + }, + "max_iterations": { + "type": "integer", + "description": "Maximum fuzzing iterations", + "default": 100000 + }, + "timeout_seconds": { + "type": "integer", + "description": "Fuzzing timeout in seconds", + "default": 300 + }, + "stats_callback": { + "description": "Optional callback for real-time statistics" + } + } + }, + requires_workspace=True + ) + + def validate_config(self, config: Dict[str, Any]) -> bool: + """Validate fuzzing configuration""" + max_iterations = config.get("max_iterations", 100000) + if not isinstance(max_iterations, int) or max_iterations <= 0: + raise ValueError(f"max_iterations must be positive integer, got: {max_iterations}") + + timeout = config.get("timeout_seconds", 300) + if not isinstance(timeout, int) or timeout <= 0: + raise ValueError(f"timeout_seconds must be positive integer, got: {timeout}") + + return True + + async def execute(self, config: Dict[str, Any], workspace: Path) -> ModuleResult: + """ + Execute Atheris fuzzing on user code. 
+ + Args: + config: Fuzzing configuration + workspace: Path to user's uploaded code + + Returns: + ModuleResult with crash findings + """ + self.start_timer() + self.start_time = time.time() + + # Validate configuration + self.validate_config(config) + self.validate_workspace(workspace) + + # Extract config + target_file = config.get("target_file") + max_iterations = config.get("max_iterations", 100000) + timeout_seconds = config.get("timeout_seconds", 300) + stats_callback = config.get("stats_callback") + + logger.info( + f"Starting Atheris fuzzing (max_iterations={max_iterations}, " + f"timeout={timeout_seconds}s, target={target_file or 'auto-discover'})" + ) + + try: + # Step 1: Discover or load target + target_path = self._discover_target(workspace, target_file) + logger.info(f"Using fuzz target: {target_path}") + + # Step 2: Load target module + test_one_input = self._load_target_module(target_path) + logger.info(f"Loaded TestOneInput function from {target_path}") + + # Step 3: Run fuzzing + await self._run_fuzzing( + test_one_input=test_one_input, + target_path=target_path, + max_iterations=max_iterations, + timeout_seconds=timeout_seconds, + stats_callback=stats_callback + ) + + # Step 4: Generate findings from crashes + findings = self._generate_findings(target_path) + + logger.info( + f"Fuzzing completed: {self.total_executions} executions, " + f"{len(self.crashes)} crashes found" + ) + + return self.create_result( + findings=findings, + status="success", + summary={ + "total_executions": self.total_executions, + "crashes_found": len(self.crashes), + "execution_time": self.get_execution_time(), + "target_file": str(target_path.relative_to(workspace)) + }, + metadata={ + "max_iterations": max_iterations, + "timeout_seconds": timeout_seconds + } + ) + + except Exception as e: + logger.error(f"Fuzzing failed: {e}", exc_info=True) + return self.create_result( + findings=[], + status="failed", + error=str(e) + ) + + def _discover_target(self, workspace: Path, 
target_file: Optional[str]) -> Path: + """ + Discover fuzz target in workspace. + + Args: + workspace: Path to workspace + target_file: Explicit target file or None for auto-discovery + + Returns: + Path to target file + """ + if target_file: + # Use specified target + target_path = workspace / target_file + if not target_path.exists(): + raise FileNotFoundError(f"Target file not found: {target_file}") + return target_path + + # Auto-discover: look for fuzz_*.py or *_fuzz.py + logger.info("Auto-discovering fuzz targets...") + + candidates = [] + # Use rglob for recursive search (searches all subdirectories) + for pattern in ["fuzz_*.py", "*_fuzz.py", "fuzz_target.py"]: + matches = list(workspace.rglob(pattern)) + candidates.extend(matches) + + if not candidates: + raise FileNotFoundError( + "No fuzz targets found. Expected files matching: fuzz_*.py, *_fuzz.py, or fuzz_target.py" + ) + + # Use first candidate + target = candidates[0] + if len(candidates) > 1: + logger.warning( + f"Multiple fuzz targets found: {[str(c) for c in candidates]}. " + f"Using: {target.name}" + ) + + return target + + def _load_target_module(self, target_path: Path) -> Callable: + """ + Load target module and get TestOneInput function. + + Args: + target_path: Path to Python file with TestOneInput + + Returns: + TestOneInput function + """ + # Add target directory to sys.path + target_dir = target_path.parent + if str(target_dir) not in sys.path: + sys.path.insert(0, str(target_dir)) + + # Load module dynamically + module_name = target_path.stem + spec = importlib.util.spec_from_file_location(module_name, target_path) + if spec is None or spec.loader is None: + raise ImportError(f"Cannot load module from {target_path}") + + module = importlib.util.module_from_spec(spec) + spec.loader.exec_module(module) + + # Get TestOneInput function + if not hasattr(module, "TestOneInput"): + raise AttributeError( + f"Module {module_name} does not have TestOneInput() function. 
" + "Atheris requires a TestOneInput(data: bytes) function." + ) + + return module.TestOneInput + + async def _run_fuzzing( + self, + test_one_input: Callable, + target_path: Path, + max_iterations: int, + timeout_seconds: int, + stats_callback: Optional[Callable] = None + ): + """ + Run Atheris fuzzing with real-time monitoring. + + Args: + test_one_input: TestOneInput function to fuzz + target_path: Path to target file + max_iterations: Max iterations + timeout_seconds: Timeout in seconds + stats_callback: Optional callback for stats + """ + import atheris + + self.crashes = [] + self.total_executions = 0 + corpus_size = 0 + + # Wrapper to track executions and crashes + def fuzz_wrapper(data): + self.total_executions += 1 + + try: + test_one_input(data) + except Exception as e: + # Capture crash + crash_info = { + "input": data, + "exception": e, + "exception_type": type(e).__name__, + "stack_trace": traceback.format_exc(), + "execution": self.total_executions + } + self.crashes.append(crash_info) + logger.warning( + f"Crash found (execution {self.total_executions}): " + f"{type(e).__name__}: {str(e)}" + ) + # Re-raise so Atheris detects it + raise + + # Configure Atheris + atheris.Setup( + [ + "atheris_fuzzer", + f"-runs={max_iterations}", + f"-max_total_time={timeout_seconds}", + "-print_final_stats=1" + ], + fuzz_wrapper + ) + + logger.info(f"Starting Atheris fuzzer (max_runs={max_iterations}, timeout={timeout_seconds}s)...") + + # Run fuzzing in a separate task with monitoring + async def monitor_stats(): + """Monitor and report stats every 5 seconds""" + while True: + await asyncio.sleep(5) + + if stats_callback: + elapsed = time.time() - self.start_time + execs_per_sec = self.total_executions / elapsed if elapsed > 0 else 0 + + await stats_callback({ + "total_execs": self.total_executions, + "execs_per_sec": execs_per_sec, + "crashes": len(self.crashes), + "corpus_size": corpus_size, + "coverage": 0.0, # Atheris doesn't expose coverage easily + 
"elapsed_time": int(elapsed) + }) + + # Start monitoring task + monitor_task = None + if stats_callback: + monitor_task = asyncio.create_task(monitor_stats()) + + try: + # Run fuzzing (blocking) + atheris.Fuzz() + except SystemExit: + # Atheris exits when done + pass + finally: + # Stop monitoring + if monitor_task: + monitor_task.cancel() + try: + await monitor_task + except asyncio.CancelledError: + pass + + def _generate_findings(self, target_path: Path) -> List[ModuleFinding]: + """ + Generate ModuleFinding objects from crashes. + + Args: + target_path: Path to target file + + Returns: + List of findings + """ + findings = [] + + for crash in self.crashes: + # Encode crash input for storage + crash_input_b64 = base64.b64encode(crash["input"]).decode() + + finding = self.create_finding( + title=f"Crash: {crash['exception_type']}", + description=( + f"Atheris found crash during fuzzing:\n" + f"Exception: {crash['exception_type']}\n" + f"Message: {str(crash['exception'])}\n" + f"Execution: {crash['execution']}" + ), + severity="critical", + category="crash", + file_path=str(target_path), + metadata={ + "crash_input_base64": crash_input_b64, + "crash_input_hex": crash["input"].hex(), + "exception_type": crash["exception_type"], + "stack_trace": crash["stack_trace"], + "execution_number": crash["execution"] + }, + recommendation=( + "Review the crash stack trace and input to identify the vulnerability. " + "The crash input is provided in base64 and hex formats for reproduction." + ) + ) + findings.append(finding) + + return findings diff --git a/backend/toolbox/workflows/atheris_fuzzing/__init__.py b/backend/toolbox/workflows/atheris_fuzzing/__init__.py new file mode 100644 index 0000000..38b1648 --- /dev/null +++ b/backend/toolbox/workflows/atheris_fuzzing/__init__.py @@ -0,0 +1,9 @@ +""" +Atheris Fuzzing Workflow + +Fuzzes user-provided Python code using Atheris. 
+""" + +from .workflow import AtherisFuzzingWorkflow + +__all__ = ["AtherisFuzzingWorkflow"] diff --git a/backend/toolbox/workflows/atheris_fuzzing/activities.py b/backend/toolbox/workflows/atheris_fuzzing/activities.py new file mode 100644 index 0000000..f2fc3c4 --- /dev/null +++ b/backend/toolbox/workflows/atheris_fuzzing/activities.py @@ -0,0 +1,90 @@ +""" +Atheris Fuzzing Workflow Activities + +Activities specific to the Atheris fuzzing workflow. +""" + +import logging +import sys +from datetime import datetime +from pathlib import Path +from typing import Dict, Any + +from temporalio import activity + +# Configure logging +logger = logging.getLogger(__name__) + +# Add toolbox to path for module imports +sys.path.insert(0, '/app/toolbox') + + +@activity.defn(name="fuzz_with_atheris") +async def fuzz_activity(workspace_path: str, config: dict) -> dict: + """ + Fuzzing activity using the AtherisFuzzer module on user code. + + This activity: + 1. Imports the reusable AtherisFuzzer module + 2. Sets up real-time stats callback + 3. Executes fuzzing on user's TestOneInput() function + 4. 
Returns findings as ModuleResult + + Args: + workspace_path: Path to the workspace directory (user's uploaded code) + config: Fuzzer configuration (target_file, max_iterations, timeout_seconds) + + Returns: + Fuzzer results dictionary (findings, summary, metadata) + """ + logger.info(f"Activity: fuzz_with_atheris (workspace={workspace_path})") + + try: + # Import reusable AtherisFuzzer module + from modules.fuzzer import AtherisFuzzer + + workspace = Path(workspace_path) + if not workspace.exists(): + raise FileNotFoundError(f"Workspace not found: {workspace_path}") + + # Get activity info for real-time stats + info = activity.info() + run_id = info.workflow_id + + # Define stats callback for real-time monitoring + async def stats_callback(stats_data: Dict[str, Any]): + """Callback for live fuzzing statistics""" + try: + logger.info("LIVE_STATS", extra={ + "stats_type": "fuzzing_live_update", + "workflow_type": "atheris_fuzzing", + "run_id": run_id, + "executions": stats_data.get("total_execs", 0), + "executions_per_sec": stats_data.get("execs_per_sec", 0.0), + "crashes": stats_data.get("crashes", 0), + "corpus_size": stats_data.get("corpus_size", 0), + "coverage": stats_data.get("coverage", 0.0), + "elapsed_time": stats_data.get("elapsed_time", 0), + "timestamp": datetime.utcnow().isoformat() + }) + except Exception as e: + logger.warning(f"Error in stats callback: {e}") + + # Add stats callback to config + config["stats_callback"] = stats_callback + + # Execute the fuzzer module + fuzzer = AtherisFuzzer() + result = await fuzzer.execute(config, workspace) + + logger.info( + f"✓ Fuzzing completed: " + f"{result.summary.get('total_executions', 0)} executions, " + f"{result.summary.get('crashes_found', 0)} crashes" + ) + + return result.dict() + + except Exception as e: + logger.error(f"Fuzzing failed: {e}", exc_info=True) + raise diff --git a/backend/toolbox/workflows/atheris_fuzzing/metadata.yaml b/backend/toolbox/workflows/atheris_fuzzing/metadata.yaml new file 
mode 100644 index 0000000..1d725e6 --- /dev/null +++ b/backend/toolbox/workflows/atheris_fuzzing/metadata.yaml @@ -0,0 +1,76 @@ +name: atheris_fuzzing +version: "1.0.0" +vertical: python +description: "Fuzz Python code using Atheris with real-time monitoring. Automatically discovers and fuzzes TestOneInput() functions in user code." +author: "FuzzForge Team" +category: "fuzzing" +tags: + - "fuzzing" + - "atheris" + - "python" + - "coverage" + - "security" + +supported_volume_modes: + - "ro" + +default_volume_mode: "ro" +default_target_path: "/workspace" + +requirements: + tools: + - "atheris_fuzzer" + resources: + memory: "512Mi" + cpu: "500m" + timeout: 3600 + +has_docker: false + +default_parameters: + target_file: null + max_iterations: 100000 + timeout_seconds: 300 + +parameters: + type: object + properties: + target_file: + type: string + description: "Python file with TestOneInput() function (auto-discovered if not specified)" + max_iterations: + type: integer + default: 100000 + description: "Maximum fuzzing iterations" + timeout_seconds: + type: integer + default: 300 + description: "Fuzzing timeout in seconds (5 minutes)" + +output_schema: + type: object + properties: + findings: + type: array + description: "Crashes and vulnerabilities found during fuzzing" + items: + type: object + properties: + title: + type: string + severity: + type: string + category: + type: string + metadata: + type: object + summary: + type: object + description: "Fuzzing execution summary" + properties: + total_executions: + type: integer + crashes_found: + type: integer + execution_time: + type: number diff --git a/backend/toolbox/workflows/atheris_fuzzing/workflow.py b/backend/toolbox/workflows/atheris_fuzzing/workflow.py new file mode 100644 index 0000000..99e5269 --- /dev/null +++ b/backend/toolbox/workflows/atheris_fuzzing/workflow.py @@ -0,0 +1,171 @@ +""" +Atheris Fuzzing Workflow - Temporal Version + +Fuzzes user-provided Python code using Atheris with real-time 
monitoring. +""" + +from datetime import timedelta +from typing import Dict, Any, Optional + +from temporalio import workflow +from temporalio.common import RetryPolicy + +# Import for type hints (will be executed by worker) +with workflow.unsafe.imports_passed_through(): + import logging + +logger = logging.getLogger(__name__) + + +@workflow.defn +class AtherisFuzzingWorkflow: + """ + Fuzz Python code using Atheris. + + User workflow: + 1. User runs: ff workflow run atheris_fuzzing . + 2. CLI uploads project to MinIO + 3. Worker downloads project + 4. Worker fuzzes TestOneInput() function + 5. Crashes reported as findings + """ + + @workflow.run + async def run( + self, + target_id: str, # MinIO UUID of uploaded user code + target_file: Optional[str] = None, # Optional: specific file to fuzz + max_iterations: int = 100000, + timeout_seconds: int = 300 + ) -> Dict[str, Any]: + """ + Main workflow execution. + + Args: + target_id: UUID of the uploaded target in MinIO + target_file: Optional specific Python file with TestOneInput() (auto-discovered if None) + max_iterations: Maximum fuzzing iterations + timeout_seconds: Fuzzing timeout in seconds + + Returns: + Dictionary containing findings and summary + """ + workflow_id = workflow.info().workflow_id + + workflow.logger.info( + f"Starting AtherisFuzzingWorkflow " + f"(workflow_id={workflow_id}, target_id={target_id}, " + f"target_file={target_file or 'auto-discover'}, max_iterations={max_iterations}, " + f"timeout_seconds={timeout_seconds})" + ) + + results = { + "workflow_id": workflow_id, + "target_id": target_id, + "status": "running", + "steps": [] + } + + try: + # Step 1: Download user's project from MinIO + workflow.logger.info("Step 1: Downloading user code from MinIO") + target_path = await workflow.execute_activity( + "get_target", + target_id, + start_to_close_timeout=timedelta(minutes=5), + retry_policy=RetryPolicy( + initial_interval=timedelta(seconds=1), + maximum_interval=timedelta(seconds=30), + 
maximum_attempts=3 + ) + ) + results["steps"].append({ + "step": "download_target", + "status": "success", + "target_path": target_path + }) + workflow.logger.info(f"✓ User code downloaded to: {target_path}") + + # Step 2: Run Atheris fuzzing + workflow.logger.info("Step 2: Running Atheris fuzzing") + + # Use defaults if parameters are None + max_iterations = max_iterations if max_iterations is not None else 100000 + timeout_seconds = timeout_seconds if timeout_seconds is not None else 300 + + fuzz_config = { + "target_file": target_file, + "max_iterations": max_iterations, + "timeout_seconds": timeout_seconds + } + + fuzz_results = await workflow.execute_activity( + "fuzz_with_atheris", + args=[target_path, fuzz_config], + start_to_close_timeout=timedelta(seconds=timeout_seconds + 60), + retry_policy=RetryPolicy( + initial_interval=timedelta(seconds=2), + maximum_interval=timedelta(seconds=60), + maximum_attempts=1 # Fuzzing shouldn't retry + ) + ) + + results["steps"].append({ + "step": "fuzzing", + "status": "success", + "executions": fuzz_results.get("summary", {}).get("total_executions", 0), + "crashes": fuzz_results.get("summary", {}).get("crashes_found", 0) + }) + workflow.logger.info( + f"✓ Fuzzing completed: " + f"{fuzz_results.get('summary', {}).get('total_executions', 0)} executions, " + f"{fuzz_results.get('summary', {}).get('crashes_found', 0)} crashes" + ) + + # Step 3: Upload results to MinIO + workflow.logger.info("Step 3: Uploading results") + try: + results_url = await workflow.execute_activity( + "upload_results", + args=[workflow_id, fuzz_results, "json"], + start_to_close_timeout=timedelta(minutes=2) + ) + results["results_url"] = results_url + workflow.logger.info(f"✓ Results uploaded to: {results_url}") + except Exception as e: + workflow.logger.warning(f"Failed to upload results: {e}") + results["results_url"] = None + + # Step 4: Cleanup cache + workflow.logger.info("Step 4: Cleaning up cache") + try: + await workflow.execute_activity( + 
"cleanup_cache", + target_path, + start_to_close_timeout=timedelta(minutes=1) + ) + workflow.logger.info("✓ Cache cleaned up") + except Exception as e: + workflow.logger.warning(f"Cache cleanup failed: {e}") + + # Mark workflow as successful + results["status"] = "success" + results["findings"] = fuzz_results.get("findings", []) + results["summary"] = fuzz_results.get("summary", {}) + workflow.logger.info( + f"✓ Workflow completed successfully: {workflow_id} " + f"({results['summary'].get('crashes_found', 0)} crashes found)" + ) + + return results + + except Exception as e: + workflow.logger.error(f"Workflow failed: {e}") + results["status"] = "error" + results["error"] = str(e) + results["steps"].append({ + "step": "error", + "status": "failed", + "error": str(e) + }) + raise diff --git a/docker-compose.temporal.yaml b/docker-compose.temporal.yaml index 9dbdb73..e875af7 100644 --- a/docker-compose.temporal.yaml +++ b/docker-compose.temporal.yaml @@ -208,6 +208,66 @@ services: cpus: '1' memory: 512M + # ============================================================================ + # Vertical Worker: Python Fuzzing + # ============================================================================ + worker-python: + build: + context: ./workers/python + dockerfile: Dockerfile + container_name: fuzzforge-worker-python + depends_on: + postgresql: + condition: service_healthy + temporal: + condition: service_healthy + minio: + condition: service_healthy + environment: + # Temporal configuration + TEMPORAL_ADDRESS: temporal:7233 + TEMPORAL_NAMESPACE: default + + # Worker configuration + WORKER_VERTICAL: python + WORKER_TASK_QUEUE: python-queue + MAX_CONCURRENT_ACTIVITIES: 5 + + # Storage configuration (MinIO) + STORAGE_BACKEND: s3 + S3_ENDPOINT: http://minio:9000 + S3_ACCESS_KEY: fuzzforge + S3_SECRET_KEY: fuzzforge123 + S3_BUCKET: targets + S3_REGION: us-east-1 + S3_USE_SSL: "false" + + # Cache configuration + CACHE_DIR: /cache + CACHE_MAX_SIZE: 10GB + CACHE_TTL: 7d + + # 
Logging + LOG_LEVEL: INFO + PYTHONUNBUFFERED: 1 + volumes: + # Mount workflow code (read-only) for dynamic discovery + - ./backend/toolbox:/app/toolbox:ro + # Worker cache for downloaded targets + - worker_python_cache:/cache + networks: + - fuzzforge-network + restart: unless-stopped + # Resource limits (lighter than rust) + deploy: + resources: + limits: + cpus: '1' + memory: 1G + reservations: + cpus: '0.5' + memory: 256M + # ============================================================================ # Vertical Worker: Android Security # ============================================================================ @@ -329,6 +389,8 @@ volumes: name: fuzzforge_minio_data worker_rust_cache: name: fuzzforge_worker_rust_cache + worker_python_cache: + name: fuzzforge_worker_python_cache worker_android_cache: name: fuzzforge_worker_android_cache # Add more worker caches as you add verticals: diff --git a/test_projects/python_fuzz_waterfall/.gitignore b/test_projects/python_fuzz_waterfall/.gitignore new file mode 100644 index 0000000..ad398c8 --- /dev/null +++ b/test_projects/python_fuzz_waterfall/.gitignore @@ -0,0 +1,18 @@ +# Python +__pycache__/ +*.py[cod] +*$py.class +*.so +.Python +*.egg-info/ +dist/ +build/ + +# FuzzForge +.fuzzforge/ + +# Atheris fuzzing artifacts +corpus/ +crashes/ +*.profraw +*.profdata diff --git a/test_projects/python_fuzz_waterfall/README.md b/test_projects/python_fuzz_waterfall/README.md new file mode 100644 index 0000000..2987ed9 --- /dev/null +++ b/test_projects/python_fuzz_waterfall/README.md @@ -0,0 +1,137 @@ +# Python Fuzzing Test - Waterfall Vulnerability + +This project demonstrates a **stateful vulnerability** that Atheris can discover through fuzzing. + +## Vulnerability Description + +The `check_secret()` function in `main.py` validates input character by character against the secret string "FUZZINGLABS". This creates a **waterfall vulnerability** where: + +1. State leaks through the global `progress` variable +2. 
Each correct character advances the progress counter +3. When all 11 characters are provided in order, the function crashes with `SystemError` + +This pattern is analogous to: +- Timing attacks on password checkers +- Protocol state machines with sequential validation +- Multi-step authentication flows + +## Files + +- `main.py` - Main application with vulnerable `check_secret()` function +- `fuzz_target.py` - Atheris fuzzing harness (contains `TestOneInput()`) +- `README.md` - This file + +## How to Fuzz + +### Using FuzzForge CLI + +```bash +# Initialize FuzzForge in this directory +cd test_projects/python_fuzz_waterfall/ +ff init + +# Run fuzzing workflow (uploads code to MinIO) +ff workflow run atheris_fuzzing . + +# The workflow will: +# 1. Upload this directory to MinIO +# 2. Worker downloads and extracts the code +# 3. Worker discovers fuzz_target.py (has TestOneInput) +# 4. Worker runs Atheris fuzzing +# 5. Reports real-time stats every 5 seconds +# 6. Finds crash when "FUZZINGLABS" is discovered +``` + +### Using FuzzForge SDK + +```python +from fuzzforge_sdk import FuzzForgeClient +from pathlib import Path + +client = FuzzForgeClient(base_url="http://localhost:8000") + +# Upload and run fuzzing +response = client.submit_workflow_with_upload( + workflow_name="atheris_fuzzing", + target_path=Path("./"), + parameters={ + "max_iterations": 100000, + "timeout_seconds": 300 + } +) + +print(f"Workflow started: {response.run_id}") + +# Wait for completion +final_status = client.wait_for_completion(response.run_id) +findings = client.get_run_findings(response.run_id) + +for finding in findings: + print(f"Crash: {finding.title}") + print(f"Input: {finding.metadata.get('crash_input_hex')}") +``` + +### Standalone (Without FuzzForge) + +```bash +# Install Atheris +pip install atheris + +# Run fuzzing directly +python fuzz_target.py +``` + +## Expected Behavior + +When fuzzing: + +1. **Initial phase**: Random exploration, progress = 0 +2. 
**Discovery phase**: Atheris finds 'F' (first char), progress = 1 +3. **Incremental progress**: Finds 'U', then 'Z', etc. +4. **Crash**: When full "FUZZINGLABS" discovered, crashes with: + ``` + SystemError: SECRET COMPROMISED: FUZZINGLABS + ``` + +## Monitoring + +Watch real-time fuzzing stats: + +```bash +docker logs fuzzforge-worker-python -f | grep LIVE_STATS +``` + +Output example: +``` +INFO - LIVE_STATS - executions=1523 execs_per_sec=1523.0 crashes=0 +INFO - LIVE_STATS - executions=7842 execs_per_sec=2104.2 crashes=0 +INFO - LIVE_STATS - executions=15234 execs_per_sec=2167.0 crashes=1 ← Crash found! +``` + +## Vulnerability Details + +**CVE**: N/A (demonstration vulnerability) +**CWE**: CWE-208 (Observable Timing Discrepancy) +**Severity**: Critical (in real systems) + +**Fix**: Remove state-based checking or implement constant-time comparison: + +```python +def check_secret_safe(input_data: bytes) -> bool: + """Constant-time comparison""" + import hmac + return hmac.compare_digest(input_data, SECRET.encode()) +``` + +## Adjusting Difficulty + +If fuzzing finds the crash too quickly, extend the secret: + +```python +# In main.py, change: +SECRET = "FUZZINGLABSSECURITYTESTING" # 26 characters instead of 11 +``` + +## License + +MIT License - This is a demonstration project for educational purposes. diff --git a/test_projects/python_fuzz_waterfall/fuzz_target.py b/test_projects/python_fuzz_waterfall/fuzz_target.py new file mode 100644 index 0000000..5e474dc --- /dev/null +++ b/test_projects/python_fuzz_waterfall/fuzz_target.py @@ -0,0 +1,59 @@ +""" +Atheris fuzzing target for the waterfall vulnerability. + +This file is automatically discovered by FuzzForge's AtherisFuzzer module. +The fuzzer looks for files named: fuzz_*.py, *_fuzz.py, or fuzz_target.py +""" + +import sys +import atheris + +# Import the vulnerable function +from main import check_secret + + +def TestOneInput(data): + """ + Atheris fuzzing entry point. 
+ + This function is called by Atheris for each fuzzing iteration. + The fuzzer will try to find inputs that cause crashes. + + Args: + data: Bytes to test (generated by Atheris) + + The waterfall vulnerability means: + - Random inputs will mostly fail (progress = 0) + - Atheris will discover inputs that make progress + - Eventually Atheris will find the complete secret "FUZZINGLABS" + - When found, check_secret() will crash with SystemError + """ + try: + check_secret(bytes(data)) + except SystemError: + # Let Atheris detect the crash + # This is the vulnerability we're trying to find! + raise + + +if __name__ == "__main__": + """ + Standalone fuzzing mode. + + Run directly: python fuzz_target.py + """ + print("=" * 60) + print("Atheris Fuzzing - Waterfall Vulnerability") + print("=" * 60) + print("Fuzzing will try to discover the secret string...") + print("Watch for progress indicators: [DEBUG] Progress: X/11") + print() + print("Press Ctrl+C to stop fuzzing") + print("=" * 60) + print() + + # Setup Atheris with command-line args + atheris.Setup(sys.argv, TestOneInput) + + # Start fuzzing + atheris.Fuzz() diff --git a/test_projects/python_fuzz_waterfall/main.py b/test_projects/python_fuzz_waterfall/main.py new file mode 100644 index 0000000..79e1b85 --- /dev/null +++ b/test_projects/python_fuzz_waterfall/main.py @@ -0,0 +1,96 @@ +""" +Example application with a stateful vulnerability. + +This simulates a password checking system that leaks state information +through a global progress variable - a classic waterfall vulnerability. +""" + +# Global state - simulates session state +progress = 0 +SECRET = "FUZZINGLABS" # 11 characters + + +def check_secret(input_data: bytes) -> bool: + """ + Vulnerable function: checks secret character by character. + + This is a waterfall vulnerability - state leaks through the progress variable. 
+ + Real-world analogy: + - Timing attacks on password checkers + - Protocol state machines with sequential validation + - Multi-step authentication flows + + Args: + input_data: Input bytes to check + + Returns: + True if progress was made, False otherwise + + Raises: + SystemError: When complete secret is discovered (vulnerability trigger) + """ + global progress + + if len(input_data) > progress: + if input_data[progress] == ord(SECRET[progress]): + progress += 1 + + # Progress indicator (useful for monitoring during fuzzing) + if progress % 2 == 0: # Every 2 characters + print(f"[DEBUG] Progress: {progress}/{len(SECRET)} characters matched") + + # VULNERABILITY: Crashes when complete secret found + if progress == len(SECRET): + raise SystemError(f"SECRET COMPROMISED: {SECRET}") + + return True + else: + # Wrong character - reset progress + progress = 0 + return False + + return False + + +def reset_state(): + """Reset the global state (useful for testing)""" + global progress + progress = 0 + + +if __name__ == "__main__": + """Example usage showing the vulnerability""" + print("=" * 60) + print("Waterfall Vulnerability Demonstration") + print("=" * 60) + print(f"Secret: {SECRET}") + print(f"Secret length: {len(SECRET)} characters") + print() + + # Test inputs showing progressive discovery + test_inputs = [ + b"F", # First char correct + b"FU", # First two chars correct + b"FUZ", # First three chars correct + b"WRONG", # Wrong - resets progress + b"FUZZINGLABS", # Complete secret - triggers crash! 
+ ] + + for test in test_inputs: + reset_state() # Start fresh for each test + print(f"Testing input: {test.decode(errors='ignore')!r}") + + try: + result = check_secret(test) + print(f" Result: {result}, Progress: {progress}/{len(SECRET)}") + except SystemError as e: + print(f" 💥 CRASH: {e}") + + print() + + print("=" * 60) + print("To fuzz this vulnerability with FuzzForge:") + print(" ff init") + print(" ff workflow run atheris_fuzzing .") + print("=" * 60) diff --git a/workers/python/Dockerfile b/workers/python/Dockerfile new file mode 100644 index 0000000..09b4689 --- /dev/null +++ b/workers/python/Dockerfile @@ -0,0 +1,47 @@ +# FuzzForge Vertical Worker: Python Fuzzing +# +# Pre-installed tools for Python fuzzing and security analysis: +# - Python 3.11 +# - Atheris (Python fuzzing) +# - Common Python security tools +# - Temporal worker + +FROM python:3.11-slim + +# Set working directory +WORKDIR /app + +# Install system dependencies +RUN apt-get update && apt-get install -y \ + # Build essentials for Atheris + build-essential \ + clang \ + llvm \ + # Development tools + git \ + curl \ + wget \ + # Cleanup + && rm -rf /var/lib/apt/lists/* + +# Install Python dependencies for Temporal worker +COPY requirements.txt /tmp/requirements.txt +RUN pip3 install --no-cache-dir -r /tmp/requirements.txt && \ + rm /tmp/requirements.txt + +# Create cache directory for downloaded targets +RUN mkdir -p /cache && chmod 755 /cache + +# Copy worker entrypoint +COPY worker.py /app/worker.py + +# Add toolbox to Python path (mounted at runtime) +ENV PYTHONPATH="/app:/app/toolbox:${PYTHONPATH}" +ENV PYTHONUNBUFFERED=1 + +# Healthcheck +HEALTHCHECK --interval=30s --timeout=10s --start-period=40s --retries=3 \ + CMD python3 -c "import sys; sys.exit(0)" + +# Run worker +CMD ["python3", "/app/worker.py"] diff --git a/workers/python/requirements.txt b/workers/python/requirements.txt new file mode 100644 index 0000000..9997574 --- /dev/null +++ b/workers/python/requirements.txt @@ -0,0 
+1,12 @@ +# Temporal worker dependencies +temporalio>=1.5.0 +pydantic>=2.0.0 + +# Storage (MinIO/S3) +boto3>=1.34.0 + +# Configuration +pyyaml>=6.0.0 + +# Fuzzing +atheris>=2.3.0 diff --git a/workers/python/worker.py b/workers/python/worker.py new file mode 100644 index 0000000..1254ab5 --- /dev/null +++ b/workers/python/worker.py @@ -0,0 +1,309 @@ +""" +FuzzForge Vertical Worker: Python Fuzzing + +This worker: +1. Discovers workflows for the 'python' vertical from mounted toolbox +2. Dynamically imports and registers workflow classes +3. Connects to Temporal and processes tasks +4. Handles activities for target download/upload from MinIO +""" + +import asyncio +import importlib +import inspect +import logging +import os +import sys +from pathlib import Path +from typing import List, Any + +import yaml +from temporalio.client import Client +from temporalio.worker import Worker + +# Add toolbox to path for workflow and activity imports +sys.path.insert(0, '/app/toolbox') + +# Import common storage activities +from toolbox.common.storage_activities import ( + get_target_activity, + cleanup_cache_activity, + upload_results_activity +) + +# Configure logging +logging.basicConfig( + level=os.getenv('LOG_LEVEL', 'INFO'), + format='%(asctime)s - %(name)s - %(levelname)s - %(message)s' +) +logger = logging.getLogger(__name__) + + +async def discover_workflows(vertical: str) -> List[Any]: + """ + Discover workflows for this vertical from mounted toolbox. 
+ + Args: + vertical: The vertical name (e.g., 'rust', 'android', 'web') + + Returns: + List of workflow classes decorated with @workflow.defn + """ + workflows = [] + toolbox_path = Path("/app/toolbox/workflows") + + if not toolbox_path.exists(): + logger.warning(f"Toolbox path does not exist: {toolbox_path}") + return workflows + + logger.info(f"Scanning for workflows in: {toolbox_path}") + + for workflow_dir in toolbox_path.iterdir(): + if not workflow_dir.is_dir(): + continue + + # Skip special directories + if workflow_dir.name.startswith('.') or workflow_dir.name == '__pycache__': + continue + + metadata_file = workflow_dir / "metadata.yaml" + if not metadata_file.exists(): + logger.debug(f"No metadata.yaml in {workflow_dir.name}, skipping") + continue + + try: + # Parse metadata + with open(metadata_file) as f: + metadata = yaml.safe_load(f) + + # Check if workflow is for this vertical + workflow_vertical = metadata.get("vertical") + if workflow_vertical != vertical: + logger.debug( + f"Workflow {workflow_dir.name} is for vertical '{workflow_vertical}', " + f"not '{vertical}', skipping" + ) + continue + + # Check if workflow.py exists + workflow_file = workflow_dir / "workflow.py" + if not workflow_file.exists(): + logger.warning( + f"Workflow {workflow_dir.name} has metadata but no workflow.py, skipping" + ) + continue + + # Dynamically import workflow module + module_name = f"toolbox.workflows.{workflow_dir.name}.workflow" + logger.info(f"Importing workflow module: {module_name}") + + try: + module = importlib.import_module(module_name) + except Exception as e: + logger.error( + f"Failed to import workflow module {module_name}: {e}", + exc_info=True + ) + continue + + # Find @workflow.defn decorated classes + found_workflows = False + for name, obj in inspect.getmembers(module, inspect.isclass): + # Check if class has Temporal workflow definition + if hasattr(obj, '__temporal_workflow_definition'): + workflows.append(obj) + found_workflows = True + 
logger.info( + f"✓ Discovered workflow: {name} from {workflow_dir.name} " + f"(vertical: {vertical})" + ) + + if not found_workflows: + logger.warning( + f"Workflow {workflow_dir.name} has no @workflow.defn decorated classes" + ) + + except Exception as e: + logger.error( + f"Error processing workflow {workflow_dir.name}: {e}", + exc_info=True + ) + continue + + logger.info(f"Discovered {len(workflows)} workflows for vertical '{vertical}'") + return workflows + + +async def discover_activities(workflows_dir: Path) -> List[Any]: + """ + Discover activities from workflow directories. + + Looks for activities.py files alongside workflow.py in each workflow directory. + + Args: + workflows_dir: Path to workflows directory + + Returns: + List of activity functions decorated with @activity.defn + """ + activities = [] + + if not workflows_dir.exists(): + logger.warning(f"Workflows directory does not exist: {workflows_dir}") + return activities + + logger.info(f"Scanning for workflow activities in: {workflows_dir}") + + for workflow_dir in workflows_dir.iterdir(): + if not workflow_dir.is_dir(): + continue + + # Skip special directories + if workflow_dir.name.startswith('.') or workflow_dir.name == '__pycache__': + continue + + # Check if activities.py exists + activities_file = workflow_dir / "activities.py" + if not activities_file.exists(): + logger.debug(f"No activities.py in {workflow_dir.name}, skipping") + continue + + try: + # Dynamically import activities module + module_name = f"toolbox.workflows.{workflow_dir.name}.activities" + logger.info(f"Importing activities module: {module_name}") + + try: + module = importlib.import_module(module_name) + except Exception as e: + logger.error( + f"Failed to import activities module {module_name}: {e}", + exc_info=True + ) + continue + + # Find @activity.defn decorated functions + found_activities = False + for name, obj in inspect.getmembers(module, inspect.isfunction): + # Check if function has Temporal activity 
definition + if hasattr(obj, '__temporal_activity_definition'): + activities.append(obj) + found_activities = True + logger.info( + f"✓ Discovered activity: {name} from {workflow_dir.name}" + ) + + if not found_activities: + logger.warning( + f"Workflow {workflow_dir.name} has activities.py but no @activity.defn decorated functions" + ) + + except Exception as e: + logger.error( + f"Error processing activities from {workflow_dir.name}: {e}", + exc_info=True + ) + continue + + logger.info(f"Discovered {len(activities)} workflow-specific activities") + return activities + + +async def main(): + """Main worker entry point""" + # Get configuration from environment + vertical = os.getenv("WORKER_VERTICAL", "python") + temporal_address = os.getenv("TEMPORAL_ADDRESS", "localhost:7233") + temporal_namespace = os.getenv("TEMPORAL_NAMESPACE", "default") + task_queue = os.getenv("WORKER_TASK_QUEUE", f"{vertical}-queue") + max_concurrent_activities = int(os.getenv("MAX_CONCURRENT_ACTIVITIES", "5")) + + logger.info("=" * 60) + logger.info(f"FuzzForge Vertical Worker: {vertical}") + logger.info("=" * 60) + logger.info(f"Temporal Address: {temporal_address}") + logger.info(f"Temporal Namespace: {temporal_namespace}") + logger.info(f"Task Queue: {task_queue}") + logger.info(f"Max Concurrent Activities: {max_concurrent_activities}") + logger.info("=" * 60) + + # Discover workflows for this vertical + logger.info(f"Discovering workflows for vertical: {vertical}") + workflows = await discover_workflows(vertical) + + if not workflows: + logger.error(f"No workflows found for vertical: {vertical}") + logger.error("Worker cannot start without workflows. 
Exiting...") + sys.exit(1) + + # Discover activities from workflow directories + logger.info("Discovering workflow-specific activities...") + workflows_dir = Path("/app/toolbox/workflows") + workflow_activities = await discover_activities(workflows_dir) + + # Combine common storage activities with workflow-specific activities + activities = [ + get_target_activity, + cleanup_cache_activity, + upload_results_activity + ] + workflow_activities + + logger.info( + f"Total activities registered: {len(activities)} " + f"(3 common + {len(workflow_activities)} workflow-specific)" + ) + + # Connect to Temporal + logger.info(f"Connecting to Temporal at {temporal_address}...") + try: + client = await Client.connect( + temporal_address, + namespace=temporal_namespace + ) + logger.info("✓ Connected to Temporal successfully") + except Exception as e: + logger.error(f"Failed to connect to Temporal: {e}", exc_info=True) + sys.exit(1) + + # Create worker with discovered workflows and activities + logger.info(f"Creating worker on task queue: {task_queue}") + + try: + worker = Worker( + client, + task_queue=task_queue, + workflows=workflows, + activities=activities, + max_concurrent_activities=max_concurrent_activities + ) + logger.info("✓ Worker created successfully") + except Exception as e: + logger.error(f"Failed to create worker: {e}", exc_info=True) + sys.exit(1) + + # Start worker + logger.info("=" * 60) + logger.info(f"🚀 Worker started for vertical '{vertical}'") + logger.info(f"📦 Registered {len(workflows)} workflows") + logger.info(f"⚙️ Registered {len(activities)} activities") + logger.info(f"📨 Listening on task queue: {task_queue}") + logger.info("=" * 60) + logger.info("Worker is ready to process tasks...") + + try: + await worker.run() + except KeyboardInterrupt: + logger.info("Shutting down worker (keyboard interrupt)...") + except Exception as e: + logger.error(f"Worker error: {e}", exc_info=True) + raise + + +if __name__ == "__main__": + try: + asyncio.run(main()) + 
except KeyboardInterrupt: + logger.info("Worker stopped") + except Exception as e: + logger.error(f"Fatal error: {e}", exc_info=True) + sys.exit(1)