fuzzforge_ai/backend/src/main.py
tduhamel42 60ca088ecf CI/CD Integration with Ephemeral Deployment Model (#14)
* feat: Complete migration from Prefect to Temporal

BREAKING CHANGE: Replaces Prefect workflow orchestration with Temporal

## Major Changes
- Replace Prefect with Temporal for workflow orchestration
- Implement vertical worker architecture (rust, android)
- Replace Docker registry with MinIO for unified storage
- Refactor activities to be co-located with workflows
- Update all API endpoints for Temporal compatibility

## Infrastructure
- New: docker-compose.temporal.yaml (Temporal + MinIO + workers)
- New: workers/ directory with rust and android vertical workers
- New: backend/src/temporal/ (manager, discovery)
- New: backend/src/storage/ (S3-cached storage with MinIO)
- New: backend/toolbox/common/ (shared storage activities)
- Deleted: docker-compose.yaml (old Prefect setup)
- Deleted: backend/src/core/prefect_manager.py
- Deleted: backend/src/services/prefect_stats_monitor.py
- Deleted: Docker registry and insecure-registries requirement

## Workflows
- Migrated: security_assessment workflow to Temporal
- New: rust_test workflow (example/test workflow)
- Deleted: secret_detection_scan (Prefect-based, to be reimplemented)
- Activities now co-located with workflows for independent testing

## API Changes
- Updated: backend/src/api/workflows.py (Temporal submission)
- Updated: backend/src/api/runs.py (Temporal status/results)
- Updated: backend/src/main.py (727 lines, TemporalManager integration)
- Updated: All 16 MCP tools to use TemporalManager

## Testing
- All services healthy (Temporal, PostgreSQL, MinIO, workers, backend)
- All API endpoints functional
- End-to-end workflow test passed (72 findings from vulnerable_app)
- MinIO storage integration working (target upload/download, results)
- Worker activity discovery working (6 activities registered)
- Tarball extraction working
- SARIF report generation working

## Documentation
- ARCHITECTURE.md: Complete Temporal architecture documentation
- QUICKSTART_TEMPORAL.md: Getting started guide
- MIGRATION_DECISION.md: Why we chose Temporal over Prefect
- IMPLEMENTATION_STATUS.md: Migration progress tracking
- workers/README.md: Worker development guide

## Dependencies
- Added: temporalio>=1.6.0
- Added: boto3>=1.34.0 (MinIO S3 client)
- Removed: prefect>=3.4.18

* feat: Add Python fuzzing vertical with Atheris integration

This commit implements a complete Python fuzzing workflow using Atheris:

## Python Worker (workers/python/)
- Dockerfile with Python 3.11, Atheris, and build tools
- Generic worker.py for dynamic workflow discovery
- requirements.txt with temporalio, boto3, atheris dependencies
- Added to docker-compose.temporal.yaml with dedicated cache volume

## AtherisFuzzer Module (backend/toolbox/modules/fuzzer/)
- Reusable module extending BaseModule
- Auto-discovers fuzz targets (fuzz_*.py, *_fuzz.py, fuzz_target.py)
- Recursive search to find targets in nested directories
- Dynamically loads TestOneInput() function
- Configurable max_iterations and timeout
- Real-time stats callback support for live monitoring
- Returns findings as ModuleFinding objects
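The auto-discovery behavior described above can be sketched roughly as follows. This is an illustrative helper, not the module's actual code; the function name `discover_fuzz_targets` is hypothetical, while the three glob patterns come from the commit description:

```python
from pathlib import Path

# Filename patterns the fuzzer looks for, per the commit description
FUZZ_PATTERNS = ("fuzz_*.py", "*_fuzz.py", "fuzz_target.py")

def discover_fuzz_targets(root: str) -> list:
    """Recursively collect candidate fuzz harnesses under `root`."""
    root_path = Path(root)
    found = set()
    for pattern in FUZZ_PATTERNS:
        # rglob searches nested directories, matching the recursive behavior
        found.update(root_path.rglob(pattern))
    return sorted(found)
```

A set is used because `fuzz_target.py` matches both the first and third patterns; deduplication keeps each harness listed once.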

## Atheris Fuzzing Workflow (backend/toolbox/workflows/atheris_fuzzing/)
- Temporal workflow for orchestrating fuzzing
- Downloads user code from MinIO
- Executes AtherisFuzzer module
- Uploads results to MinIO
- Cleans up cache after execution
- metadata.yaml with vertical: python for routing

## Test Project (test_projects/python_fuzz_waterfall/)
- Demonstrates stateful waterfall vulnerability
- main.py with check_secret() that leaks progress
- fuzz_target.py with Atheris TestOneInput() harness
- Complete README with usage instructions

## Backend Fixes
- Fixed parameter merging in REST API endpoints (workflows.py)
- Changed workflow parameter passing from positional args to kwargs (manager.py)
- Default parameters now properly merged with user parameters
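The merge semantics are the usual dict-spread pattern, sketched here for clarity (the helper name is hypothetical):

```python
from typing import Any, Dict

def merge_parameters(defaults: Dict[str, Any], user: Dict[str, Any]) -> Dict[str, Any]:
    """User-supplied values override workflow defaults; unset defaults survive."""
    # Later spreads win, so user parameters shadow defaults key by key
    return {**defaults, **user}
```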

## Testing
- Worker discovered AtherisFuzzingWorkflow
- Workflow executed end-to-end successfully
- Fuzz target auto-discovered in nested directories
- Atheris ran 100,000 iterations
- Results uploaded and cache cleaned

* chore: Complete Temporal migration with updated CLI/SDK/docs

This commit includes all remaining Temporal migration changes:

## CLI Updates (cli/)
- Updated workflow execution commands for Temporal
- Enhanced error handling and exceptions
- Updated dependencies in uv.lock

## SDK Updates (sdk/)
- Client methods updated for Temporal workflows
- Updated models for new workflow execution
- Updated dependencies in uv.lock

## Documentation Updates (docs/)
- Architecture documentation for Temporal
- Workflow concept documentation
- Resource management documentation (new)
- Debugging guide (new)
- Updated tutorials and how-to guides
- Troubleshooting updates

## README Updates
- Main README with Temporal instructions
- Backend README
- CLI README
- SDK README

## Other
- Updated IMPLEMENTATION_STATUS.md
- Removed old vulnerable_app.tar.gz

These changes complete the Temporal migration and ensure the
CLI/SDK work correctly with the new backend.

* fix: Use positional args instead of kwargs for Temporal workflows

The Temporal Python SDK's start_workflow() method doesn't accept
a 'kwargs' parameter. Workflows must receive parameters as positional
arguments via the 'args' parameter.

Changed to:
  args=workflow_args  # Positional arguments

This fixes the error:
  TypeError: Client.start_workflow() got an unexpected keyword argument 'kwargs'

Workflows now correctly receive parameters in order:
- security_assessment: [target_id, scanner_config, analyzer_config, reporter_config]
- atheris_fuzzing: [target_id, target_file, max_iterations, timeout_seconds]
- rust_test: [target_id, test_message]
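Since ordering is now significant, flattening a parameter dict into the positional list the SDK expects can be sketched like this. The helper and the ordering table are illustrative, built from the parameter orders listed above:

```python
# Order matters: Temporal passes `args` positionally to the workflow's run method
WORKFLOW_ARG_ORDER = {
    "security_assessment": ["target_id", "scanner_config", "analyzer_config", "reporter_config"],
    "atheris_fuzzing": ["target_id", "target_file", "max_iterations", "timeout_seconds"],
    "rust_test": ["target_id", "test_message"],
}

def build_workflow_args(workflow: str, params: dict) -> list:
    """Flatten a parameter dict into the positional list for start_workflow(args=...)."""
    # A missing key raises KeyError, surfacing incomplete parameters early
    return [params[name] for name in WORKFLOW_ARG_ORDER[workflow]]
```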

* fix: Filter metadata-only parameters from workflow arguments

SecurityAssessmentWorkflow was receiving 7 arguments instead of 2-5.
The issue was that target_path and volume_mode from default_parameters
were being passed to the workflow, when they should only be used by
the system for configuration.

Now filters out metadata-only parameters (target_path, volume_mode)
before passing arguments to workflow execution.
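The filtering step amounts to dropping a fixed set of system-only keys; a minimal sketch (helper name hypothetical, key names from the commit message):

```python
# Keys consumed by the platform itself and never forwarded to workflow code
METADATA_ONLY_PARAMS = {"target_path", "volume_mode"}

def filter_workflow_params(params: dict) -> dict:
    """Drop system-only configuration keys before workflow execution."""
    return {k: v for k, v in params.items() if k not in METADATA_ONLY_PARAMS}
```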

* refactor: Remove Prefect leftovers and volume mounting legacy

Complete cleanup of Prefect migration artifacts:

Backend:
- Delete registry.py and workflow_discovery.py (Prefect-specific files)
- Remove Docker validation from setup.py (no longer needed)
- Remove ResourceLimits and VolumeMount models
- Remove target_path and volume_mode from WorkflowSubmission
- Remove supported_volume_modes from API and discovery
- Clean up metadata.yaml files (remove volume/path fields)
- Simplify parameter filtering in manager.py

SDK:
- Remove volume_mode parameter from client methods
- Remove ResourceLimits and VolumeMount models
- Remove Prefect error patterns from docker_logs.py
- Clean up WorkflowSubmission and WorkflowMetadata models

CLI:
- Remove Volume Modes display from workflow info

All removed features are Prefect-specific or Docker volume mounting
artifacts. Temporal workflows use MinIO storage exclusively.

* feat: Add comprehensive test suite and benchmark infrastructure

- Add 68 unit tests for fuzzer, scanner, and analyzer modules
- Implement pytest-based test infrastructure with fixtures
- Add 6 performance benchmarks with category-specific thresholds
- Configure GitHub Actions for automated testing and benchmarking
- Add test and benchmark documentation

Test coverage:
- AtherisFuzzer: 8 tests
- CargoFuzzer: 14 tests
- FileScanner: 22 tests
- SecurityAnalyzer: 24 tests

All tests passing (68/68)
All benchmarks passing (6/6)

* fix: Resolve all ruff linting violations across codebase

Fixed 27 ruff violations in 12 files:
- Removed unused imports (Depends, Dict, Any, Optional, etc.)
- Fixed undefined workflow_info variable in workflows.py
- Removed dead code with undefined variables in atheris_fuzzer.py
- Changed f-string to regular string where no placeholders used

All files now pass ruff checks for CI/CD compliance.

* fix: Configure CI for unit tests only

- Renamed docker-compose.temporal.yaml → docker-compose.yml for CI compatibility
- Commented out integration-tests job (no integration tests yet)
- Updated test-summary to only depend on lint and unit-tests

CI will now run successfully with 68 unit tests. Integration tests can be added later.

* feat: Add CI/CD integration with ephemeral deployment model

Implements comprehensive CI/CD support for FuzzForge with on-demand worker management:

**Worker Management (v0.7.0)**
- Add WorkerManager for automatic worker lifecycle control
- Auto-start workers from stopped state when workflows execute
- Auto-stop workers after workflow completion
- Health checks and startup timeout handling (90s default)

**CI/CD Features**
- `--fail-on` flag: Fail builds based on SARIF severity levels (error/warning/note/info)
- `--export-sarif` flag: Export findings in SARIF 2.1.0 format
- `--auto-start`/`--auto-stop` flags: Control worker lifecycle
- Exit code propagation: Returns 1 on blocking findings, 0 on success
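The `--fail-on` gating logic can be sketched as a severity-threshold scan over SARIF results. This is an assumption-laden illustration, not the CLI's actual code: SARIF 2.1.0 itself defines the levels `none`/`note`/`warning`/`error` (with `warning` as the default when `level` is omitted), so mapping the flag's `info` value to the same rank as `note` is a guess here:

```python
# Hypothetical severity ranking; "info" aliased to "note" is an assumption
SEVERITY_RANK = {"none": 0, "info": 1, "note": 1, "warning": 2, "error": 3}

def exit_code_for_findings(sarif: dict, fail_on: str) -> int:
    """Return 1 if any SARIF result meets or exceeds the --fail-on level, else 0."""
    threshold = SEVERITY_RANK[fail_on]
    for run in sarif.get("runs", []):
        for result in run.get("results", []):
            # SARIF 2.1.0 specifies "warning" as the default result level
            if SEVERITY_RANK.get(result.get("level", "warning"), 0) >= threshold:
                return 1
    return 0
```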

**Exit Code Fix**
- Add `except typer.Exit: raise` handlers at 3 critical locations
- Move worker cleanup to finally block for guaranteed execution
- Exit codes now propagate correctly even when build fails

**CI Scripts & Examples**
- ci-start.sh: Start FuzzForge services with health checks
- ci-stop.sh: Clean shutdown with volume preservation option
- GitHub Actions workflow example (security-scan.yml)
- GitLab CI pipeline example (.gitlab-ci.example.yml)
- docker-compose.ci.yml: CI-optimized compose file with profiles

**OSS-Fuzz Integration**
- New ossfuzz_campaign workflow for running OSS-Fuzz projects
- OSS-Fuzz worker with Docker-in-Docker support
- Configurable campaign duration and project selection

**Documentation**
- Comprehensive CI/CD integration guide (docs/how-to/cicd-integration.md)
- Updated architecture docs with worker lifecycle details
- Updated workspace isolation documentation
- CLI README with worker management examples

**SDK Enhancements**
- Add get_workflow_worker_info() endpoint
- Worker vertical metadata in workflow responses

**Testing**
- All workflows tested: security_assessment, atheris_fuzzing, secret_detection, cargo_fuzzing
- All monitoring commands tested: stats, crashes, status, finding
- Full CI pipeline simulation verified
- Exit codes verified for success/failure scenarios

Ephemeral CI/CD model: ~3-4GB RAM, ~60-90s startup, runs entirely in CI containers.

* fix: Resolve ruff linting violations in CI/CD code

- Remove unused variables (run_id, defaults, result)
- Remove unused imports
- Fix f-string without placeholders

All CI/CD integration files now pass ruff checks.
2025-10-14 10:13:45 +02:00


# Copyright (c) 2025 FuzzingLabs
#
# Licensed under the Business Source License 1.1 (BSL). See the LICENSE file
# at the root of this repository for details.
#
# After the Change Date (four years from publication), this version of the
# Licensed Work will be made available under the Apache License, Version 2.0.
# See the LICENSE-APACHE file or http://www.apache.org/licenses/LICENSE-2.0
#
# Additional attribution and requirements are provided in the NOTICE file.
import asyncio
import logging
import os
from contextlib import AsyncExitStack, asynccontextmanager, suppress
from typing import Any, Dict, Optional, List
import uvicorn
from fastapi import FastAPI
from starlette.applications import Starlette
from starlette.routing import Mount
from fastmcp.server.http import create_sse_app
from src.temporal.manager import TemporalManager
from src.core.setup import setup_result_storage, validate_infrastructure
from src.api import workflows, runs, fuzzing
from fastmcp import FastMCP
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
temporal_mgr = TemporalManager()
class TemporalBootstrapState:
    """Tracks Temporal initialization progress for API and MCP consumers."""

    def __init__(self) -> None:
        self.ready: bool = False
        self.status: str = "not_started"
        self.last_error: Optional[str] = None
        self.task_running: bool = False

    def as_dict(self) -> Dict[str, Any]:
        return {
            "ready": self.ready,
            "status": self.status,
            "last_error": self.last_error,
            "task_running": self.task_running,
        }


temporal_bootstrap_state = TemporalBootstrapState()
# Configure retry strategy for bootstrapping Temporal + infrastructure
STARTUP_RETRY_SECONDS = max(1, int(os.getenv("FUZZFORGE_STARTUP_RETRY_SECONDS", "5")))
STARTUP_RETRY_MAX_SECONDS = max(
    STARTUP_RETRY_SECONDS,
    int(os.getenv("FUZZFORGE_STARTUP_RETRY_MAX_SECONDS", "60")),
)
temporal_bootstrap_task: Optional[asyncio.Task] = None
# ---------------------------------------------------------------------------
# FastAPI application (REST API)
# ---------------------------------------------------------------------------
app = FastAPI(
    title="FuzzForge API",
    description="Security testing workflow orchestration API with fuzzing support",
    version="0.6.0",
)
app.include_router(workflows.router)
app.include_router(runs.router)
app.include_router(fuzzing.router)
def get_temporal_status() -> Dict[str, Any]:
    """Return a snapshot of Temporal bootstrap state for diagnostics."""
    status = temporal_bootstrap_state.as_dict()
    status["workflows_loaded"] = len(temporal_mgr.workflows)
    status["bootstrap_task_running"] = (
        temporal_bootstrap_task is not None and not temporal_bootstrap_task.done()
    )
    return status


def _temporal_not_ready_status() -> Optional[Dict[str, Any]]:
    """Return status details if Temporal is not ready yet."""
    status = get_temporal_status()
    if status.get("ready"):
        return None
    return status


@app.get("/")
async def root() -> Dict[str, Any]:
    status = get_temporal_status()
    return {
        "name": "FuzzForge API",
        "version": "0.6.0",
        "status": "ready" if status.get("ready") else "initializing",
        "workflows_loaded": status.get("workflows_loaded", 0),
        "temporal": status,
    }


@app.get("/health")
async def health() -> Dict[str, str]:
    status = get_temporal_status()
    health_status = "healthy" if status.get("ready") else "initializing"
    return {"status": health_status}
# Map FastAPI OpenAPI operationIds to readable MCP tool names
FASTAPI_MCP_NAME_OVERRIDES: Dict[str, str] = {
    "list_workflows_workflows__get": "api_list_workflows",
    "get_metadata_schema_workflows_metadata_schema_get": "api_get_metadata_schema",
    "get_workflow_metadata_workflows__workflow_name__metadata_get": "api_get_workflow_metadata",
    "submit_workflow_workflows__workflow_name__submit_post": "api_submit_workflow",
    "get_workflow_parameters_workflows__workflow_name__parameters_get": "api_get_workflow_parameters",
    "get_run_status_runs__run_id__status_get": "api_get_run_status",
    "get_run_findings_runs__run_id__findings_get": "api_get_run_findings",
    "get_workflow_findings_runs__workflow_name__findings__run_id__get": "api_get_workflow_findings",
    "get_fuzzing_stats_fuzzing__run_id__stats_get": "api_get_fuzzing_stats",
    "update_fuzzing_stats_fuzzing__run_id__stats_post": "api_update_fuzzing_stats",
    "get_crash_reports_fuzzing__run_id__crashes_get": "api_get_crash_reports",
    "report_crash_fuzzing__run_id__crash_post": "api_report_crash",
    "stream_fuzzing_updates_fuzzing__run_id__stream_get": "api_stream_fuzzing_updates",
    "cleanup_fuzzing_run_fuzzing__run_id__delete": "api_cleanup_fuzzing_run",
    "root__get": "api_root",
    "health_health_get": "api_health",
}
# Create an MCP adapter exposing all FastAPI endpoints via OpenAPI parsing
FASTAPI_MCP_ADAPTER = FastMCP.from_fastapi(
    app,
    name="FuzzForge FastAPI",
    mcp_names=FASTAPI_MCP_NAME_OVERRIDES,
)
_fastapi_mcp_imported = False
# ---------------------------------------------------------------------------
# FastMCP server (runs on dedicated port outside FastAPI)
# ---------------------------------------------------------------------------
mcp = FastMCP(name="FuzzForge MCP")
async def _bootstrap_temporal_with_retries() -> None:
    """Initialize Temporal infrastructure with exponential backoff retries."""
    attempt = 0
    while True:
        attempt += 1
        temporal_bootstrap_state.task_running = True
        temporal_bootstrap_state.status = "starting"
        temporal_bootstrap_state.ready = False
        temporal_bootstrap_state.last_error = None
        try:
            logger.info("Bootstrapping Temporal infrastructure...")
            await validate_infrastructure()
            await setup_result_storage()
            await temporal_mgr.initialize()
            temporal_bootstrap_state.ready = True
            temporal_bootstrap_state.status = "ready"
            temporal_bootstrap_state.task_running = False
            logger.info("Temporal infrastructure ready")
            return
        except asyncio.CancelledError:
            temporal_bootstrap_state.status = "cancelled"
            temporal_bootstrap_state.task_running = False
            logger.info("Temporal bootstrap task cancelled")
            raise
        except Exception as exc:  # pragma: no cover - defensive logging on infra startup
            logger.exception("Temporal bootstrap failed")
            temporal_bootstrap_state.ready = False
            temporal_bootstrap_state.status = "error"
            temporal_bootstrap_state.last_error = str(exc)
            # Ensure partial initialization does not leave stale state behind
            temporal_mgr.workflows.clear()
            wait_time = min(
                STARTUP_RETRY_SECONDS * (2 ** (attempt - 1)),
                STARTUP_RETRY_MAX_SECONDS,
            )
            logger.info("Retrying Temporal bootstrap in %s second(s)", wait_time)
            try:
                await asyncio.sleep(wait_time)
            except asyncio.CancelledError:
                temporal_bootstrap_state.status = "cancelled"
                temporal_bootstrap_state.task_running = False
                raise
def _lookup_workflow(workflow_name: str):
    info = temporal_mgr.workflows.get(workflow_name)
    if not info:
        return None
    metadata = info.metadata
    defaults = metadata.get("default_parameters", {})
    default_target_path = metadata.get("default_target_path") or defaults.get("target_path")
    supported_modes = metadata.get("supported_volume_modes") or ["ro", "rw"]
    if not isinstance(supported_modes, list) or not supported_modes:
        supported_modes = ["ro", "rw"]
    default_volume_mode = (
        metadata.get("default_volume_mode")
        or defaults.get("volume_mode")
        or supported_modes[0]
    )
    return {
        "name": workflow_name,
        "version": metadata.get("version", "0.6.0"),
        "description": metadata.get("description", ""),
        "author": metadata.get("author"),
        "tags": metadata.get("tags", []),
        "parameters": metadata.get("parameters", {}),
        "default_parameters": metadata.get("default_parameters", {}),
        "required_modules": metadata.get("required_modules", []),
        "supported_volume_modes": supported_modes,
        "default_target_path": default_target_path,
        "default_volume_mode": default_volume_mode,
    }
@mcp.tool
async def list_workflows_mcp() -> Dict[str, Any]:
    """List all discovered workflows and their metadata summary."""
    not_ready = _temporal_not_ready_status()
    if not_ready:
        return {
            "workflows": [],
            "temporal": not_ready,
            "message": "Temporal infrastructure is still initializing",
        }
    workflows_summary = []
    for name, info in temporal_mgr.workflows.items():
        metadata = info.metadata
        defaults = metadata.get("default_parameters", {})
        workflows_summary.append({
            "name": name,
            "version": metadata.get("version", "0.6.0"),
            "description": metadata.get("description", ""),
            "author": metadata.get("author"),
            "tags": metadata.get("tags", []),
            "supported_volume_modes": metadata.get("supported_volume_modes", ["ro", "rw"]),
            "default_volume_mode": metadata.get("default_volume_mode")
            or defaults.get("volume_mode")
            or "ro",
            "default_target_path": metadata.get("default_target_path")
            or defaults.get("target_path"),
        })
    return {"workflows": workflows_summary, "temporal": get_temporal_status()}
@mcp.tool
async def get_workflow_metadata_mcp(workflow_name: str) -> Dict[str, Any]:
    """Fetch detailed metadata for a workflow."""
    not_ready = _temporal_not_ready_status()
    if not_ready:
        return {
            "error": "Temporal infrastructure not ready",
            "temporal": not_ready,
        }
    data = _lookup_workflow(workflow_name)
    if not data:
        return {"error": f"Workflow not found: {workflow_name}"}
    return data


@mcp.tool
async def get_workflow_parameters_mcp(workflow_name: str) -> Dict[str, Any]:
    """Return the parameter schema and defaults for a workflow."""
    not_ready = _temporal_not_ready_status()
    if not_ready:
        return {
            "error": "Temporal infrastructure not ready",
            "temporal": not_ready,
        }
    data = _lookup_workflow(workflow_name)
    if not data:
        return {"error": f"Workflow not found: {workflow_name}"}
    return {
        "parameters": data.get("parameters", {}),
        "defaults": data.get("default_parameters", {}),
    }


@mcp.tool
async def get_workflow_metadata_schema_mcp() -> Dict[str, Any]:
    """Return the JSON schema describing workflow metadata files."""
    from src.temporal.discovery import WorkflowDiscovery

    return WorkflowDiscovery.get_metadata_schema()
@mcp.tool
async def submit_security_scan_mcp(
    workflow_name: str,
    target_id: str,
    parameters: Dict[str, Any] | None = None,
) -> Dict[str, Any] | Dict[str, str]:
    """Submit a Temporal workflow via MCP."""
    try:
        not_ready = _temporal_not_ready_status()
        if not_ready:
            return {
                "error": "Temporal infrastructure not ready",
                "temporal": not_ready,
            }
        workflow_info = temporal_mgr.workflows.get(workflow_name)
        if not workflow_info:
            return {"error": f"Workflow '{workflow_name}' not found"}
        metadata = workflow_info.metadata or {}
        defaults = metadata.get("default_parameters", {})
        parameters = parameters or {}
        cleaned_parameters: Dict[str, Any] = {**defaults, **parameters}
        # Ensure *_config structures default to dicts
        for key, value in list(cleaned_parameters.items()):
            if isinstance(key, str) and key.endswith("_config") and value is None:
                cleaned_parameters[key] = {}
        # Some workflows expect configuration dictionaries even when omitted
        parameter_definitions = (
            metadata.get("parameters", {}).get("properties", {})
            if isinstance(metadata.get("parameters"), dict)
            else {}
        )
        for key, definition in parameter_definitions.items():
            if not isinstance(key, str) or not key.endswith("_config"):
                continue
            if key not in cleaned_parameters:
                default_value = definition.get("default") if isinstance(definition, dict) else None
                cleaned_parameters[key] = default_value if default_value is not None else {}
            elif cleaned_parameters[key] is None:
                cleaned_parameters[key] = {}
        # Start workflow
        handle = await temporal_mgr.run_workflow(
            workflow_name=workflow_name,
            target_id=target_id,
            workflow_params=cleaned_parameters,
        )
        return {
            "run_id": handle.id,
            "status": "RUNNING",
            "workflow": workflow_name,
            "message": f"Workflow '{workflow_name}' submitted successfully",
            "target_id": target_id,
            "parameters": cleaned_parameters,
            "mcp_enabled": True,
        }
    except Exception as exc:  # pragma: no cover - defensive logging
        logger.exception("MCP submit failed")
        return {"error": f"Failed to submit workflow: {exc}"}
@mcp.tool
async def get_comprehensive_scan_summary(run_id: str) -> Dict[str, Any] | Dict[str, str]:
    """Return a summary for the given workflow run via MCP."""
    try:
        not_ready = _temporal_not_ready_status()
        if not_ready:
            return {
                "error": "Temporal infrastructure not ready",
                "temporal": not_ready,
            }
        status = await temporal_mgr.get_workflow_status(run_id)
        # Try to get result if completed
        total_findings = 0
        severity_summary = {"critical": 0, "high": 0, "medium": 0, "low": 0, "info": 0}
        if status.get("status") == "COMPLETED":
            try:
                result = await temporal_mgr.get_workflow_result(run_id)
                if isinstance(result, dict):
                    summary = result.get("summary", {})
                    total_findings = summary.get("total_findings", 0)
            except Exception as e:
                logger.debug(f"Could not retrieve result for {run_id}: {e}")
        return {
            "run_id": run_id,
            "workflow": "unknown",  # Temporal doesn't track workflow name in status
            "status": status.get("status", "unknown"),
            "is_completed": status.get("status") == "COMPLETED",
            "total_findings": total_findings,
            "severity_summary": severity_summary,
            "scan_duration": status.get("close_time", "In progress"),
            "recommendations": (
                [
                    "Review high and critical severity findings first",
                    "Implement security fixes based on finding recommendations",
                    "Re-run scan after applying fixes to verify remediation",
                ]
                if total_findings > 0
                else ["No security issues found"]
            ),
            "mcp_analysis": True,
        }
    except Exception as exc:  # pragma: no cover
        logger.exception("MCP summary failed")
        return {"error": f"Failed to summarize run: {exc}"}
@mcp.tool
async def get_run_status_mcp(run_id: str) -> Dict[str, Any]:
    """Return current status information for a Temporal run."""
    try:
        not_ready = _temporal_not_ready_status()
        if not_ready:
            return {
                "error": "Temporal infrastructure not ready",
                "temporal": not_ready,
            }
        status = await temporal_mgr.get_workflow_status(run_id)
        return {
            "run_id": run_id,
            "workflow": "unknown",
            "status": status["status"],
            "is_completed": status["status"] in ["COMPLETED", "FAILED", "CANCELLED"],
            "is_failed": status["status"] == "FAILED",
            "is_running": status["status"] == "RUNNING",
            "created_at": status.get("start_time"),
            "updated_at": status.get("close_time") or status.get("execution_time"),
        }
    except Exception as exc:
        logger.exception("MCP run status failed")
        return {"error": f"Failed to get run status: {exc}"}


@mcp.tool
async def get_run_findings_mcp(run_id: str) -> Dict[str, Any]:
    """Return SARIF findings for a completed run."""
    try:
        not_ready = _temporal_not_ready_status()
        if not_ready:
            return {
                "error": "Temporal infrastructure not ready",
                "temporal": not_ready,
            }
        status = await temporal_mgr.get_workflow_status(run_id)
        if status.get("status") != "COMPLETED":
            return {"error": f"Run {run_id} not completed. Status: {status.get('status')}"}
        result = await temporal_mgr.get_workflow_result(run_id)
        metadata = {
            "completion_time": status.get("close_time"),
            "workflow_version": "unknown",
        }
        sarif = result.get("sarif", {}) if isinstance(result, dict) else {}
        return {
            "workflow": "unknown",
            "run_id": run_id,
            "sarif": sarif,
            "metadata": metadata,
        }
    except Exception as exc:
        logger.exception("MCP findings failed")
        return {"error": f"Failed to retrieve findings: {exc}"}
@mcp.tool
async def list_recent_runs_mcp(
    limit: int = 10,
    workflow_name: str | None = None,
) -> Dict[str, Any]:
    """List recent Temporal runs with optional workflow filter."""
    not_ready = _temporal_not_ready_status()
    if not_ready:
        return {
            "runs": [],
            "temporal": not_ready,
            "message": "Temporal infrastructure is still initializing",
        }
    try:
        limit_value = int(limit)
    except (TypeError, ValueError):
        limit_value = 10
    limit_value = max(1, min(limit_value, 100))
    try:
        # Build filter query
        filter_query = None
        if workflow_name:
            workflow_info = temporal_mgr.workflows.get(workflow_name)
            if workflow_info:
                filter_query = f'WorkflowType="{workflow_info.workflow_type}"'
        workflows = await temporal_mgr.list_workflows(filter_query, limit_value)
        results: List[Dict[str, Any]] = []
        for wf in workflows:
            results.append({
                "run_id": wf["workflow_id"],
                "workflow": workflow_name or "unknown",
                "state": wf["status"],
                "state_type": wf["status"],
                "is_completed": wf["status"] in ["COMPLETED", "FAILED", "CANCELLED"],
                "is_running": wf["status"] == "RUNNING",
                "is_failed": wf["status"] == "FAILED",
                "created_at": wf.get("start_time"),
                "updated_at": wf.get("close_time"),
            })
        return {"runs": results, "temporal": get_temporal_status()}
    except Exception as exc:
        logger.exception("Failed to list runs")
        return {
            "runs": [],
            "temporal": get_temporal_status(),
            "error": str(exc),
        }
@mcp.tool
async def get_fuzzing_stats_mcp(run_id: str) -> Dict[str, Any]:
    """Return fuzzing statistics for a run if available."""
    not_ready = _temporal_not_ready_status()
    if not_ready:
        return {
            "error": "Temporal infrastructure not ready",
            "temporal": not_ready,
        }
    stats = fuzzing.fuzzing_stats.get(run_id)
    if not stats:
        return {"error": f"Fuzzing run not found: {run_id}"}
    # Be resilient if a plain dict slipped into the cache
    if isinstance(stats, dict):
        return stats
    if hasattr(stats, "model_dump"):
        return stats.model_dump()
    if hasattr(stats, "dict"):
        return stats.dict()
    # Last resort
    return getattr(stats, "__dict__", {"run_id": run_id})


@mcp.tool
async def get_fuzzing_crash_reports_mcp(run_id: str) -> Dict[str, Any]:
    """Return crash reports collected for a fuzzing run."""
    not_ready = _temporal_not_ready_status()
    if not_ready:
        return {
            "error": "Temporal infrastructure not ready",
            "temporal": not_ready,
        }
    reports = fuzzing.crash_reports.get(run_id)
    if reports is None:
        return {"error": f"Fuzzing run not found: {run_id}"}
    return {"run_id": run_id, "crashes": [report.model_dump() for report in reports]}
@mcp.tool
async def get_backend_status_mcp() -> Dict[str, Any]:
    """Expose backend readiness, workflows, and registered MCP tools."""
    status = get_temporal_status()
    response: Dict[str, Any] = {"temporal": status}
    if status.get("ready"):
        response["workflows"] = list(temporal_mgr.workflows.keys())
    try:
        tools = await mcp._tool_manager.list_tools()
        response["mcp_tools"] = sorted(tool.name for tool in tools)
    except Exception as exc:  # pragma: no cover - defensive logging
        logger.debug("Failed to enumerate MCP tools: %s", exc)
    return response
def create_mcp_transport_app() -> Starlette:
    """Build a Starlette app serving HTTP + SSE transports on one port."""
    http_app = mcp.http_app(path="/", transport="streamable-http")
    sse_app = create_sse_app(
        server=mcp,
        message_path="/messages",
        sse_path="/",
        auth=mcp.auth,
    )
    routes = [
        Mount("/mcp", app=http_app),
        Mount("/mcp/sse", app=sse_app),
    ]

    @asynccontextmanager
    async def lifespan(app: Starlette):  # pragma: no cover - integration wiring
        async with AsyncExitStack() as stack:
            await stack.enter_async_context(
                http_app.router.lifespan_context(http_app)
            )
            await stack.enter_async_context(
                sse_app.router.lifespan_context(sse_app)
            )
            yield

    combined_app = Starlette(routes=routes, lifespan=lifespan)
    combined_app.state.fastmcp_server = mcp
    combined_app.state.http_app = http_app
    combined_app.state.sse_app = sse_app
    return combined_app
# ---------------------------------------------------------------------------
# Combined lifespan: Temporal init + dedicated MCP transports
# ---------------------------------------------------------------------------
@asynccontextmanager
async def combined_lifespan(app: FastAPI):
    global temporal_bootstrap_task, _fastapi_mcp_imported
    logger.info("Starting FuzzForge backend...")
    # Ensure FastAPI endpoints are exposed via MCP once
    if not _fastapi_mcp_imported:
        try:
            await mcp.import_server(FASTAPI_MCP_ADAPTER)
            _fastapi_mcp_imported = True
            logger.info("Mounted FastAPI endpoints as MCP tools")
        except Exception as exc:
            logger.exception("Failed to import FastAPI endpoints into MCP", exc_info=exc)
    # Kick off Temporal bootstrap in the background if needed
    if temporal_bootstrap_task is None or temporal_bootstrap_task.done():
        temporal_bootstrap_task = asyncio.create_task(_bootstrap_temporal_with_retries())
        logger.info("Temporal bootstrap task started")
    else:
        logger.info("Temporal bootstrap task already running")
    # Start MCP transports on shared port (HTTP + SSE)
    mcp_app = create_mcp_transport_app()
    mcp_config = uvicorn.Config(
        app=mcp_app,
        host="0.0.0.0",
        port=8010,
        log_level="info",
        lifespan="on",
    )
    mcp_server = uvicorn.Server(mcp_config)
    mcp_server.install_signal_handlers = lambda: None  # type: ignore[assignment]
    mcp_task = asyncio.create_task(mcp_server.serve())

    async def _wait_for_uvicorn_startup() -> None:
        started_attr = getattr(mcp_server, "started", None)
        if hasattr(started_attr, "wait"):
            await asyncio.wait_for(started_attr.wait(), timeout=10)
            return
        # Fallback for uvicorn versions where "started" is a bool
        poll_interval = 0.1
        checks = int(10 / poll_interval)
        for _ in range(checks):
            if getattr(mcp_server, "started", False):
                return
            await asyncio.sleep(poll_interval)
        raise asyncio.TimeoutError

    try:
        await _wait_for_uvicorn_startup()
    except asyncio.TimeoutError:  # pragma: no cover - defensive logging
        if mcp_task.done():
            raise RuntimeError("MCP server failed to start") from mcp_task.exception()
        logger.warning("Timed out waiting for MCP server startup; continuing anyway")
    logger.info("MCP HTTP available at http://0.0.0.0:8010/mcp")
    logger.info("MCP SSE available at http://0.0.0.0:8010/mcp/sse")
    try:
        yield
    finally:
        logger.info("Shutting down MCP transports...")
        mcp_server.should_exit = True
        mcp_server.force_exit = True
        await asyncio.gather(mcp_task, return_exceptions=True)
        if temporal_bootstrap_task and not temporal_bootstrap_task.done():
            temporal_bootstrap_task.cancel()
            with suppress(asyncio.CancelledError):
                await temporal_bootstrap_task
        temporal_bootstrap_state.task_running = False
        if not temporal_bootstrap_state.ready:
            temporal_bootstrap_state.status = "stopped"
        temporal_bootstrap_task = None
        # Close Temporal client
        await temporal_mgr.close()
        logger.info("Shutting down FuzzForge backend...")


app.router.lifespan_context = combined_lifespan