Files
fuzzforge_ai/backend/src/api/workflows.py
tduhamel42 8063f03d87 docs: Update README and fix worker startup instructions
README updates:
- Update docker compose command (now main docker-compose.yml)
- Remove obsolete insecure registries section (MinIO replaces local registry)
- Add .env configuration section for AI agent API keys

Worker management fixes:
- Add worker_service field to API response (backend)
- Fix CLI help message to use correct service name with 'docker compose up -d'
- Use modern 'docker compose' syntax instead of deprecated 'docker-compose'

This ensures users get correct instructions when workers aren't running.
2025-10-16 12:12:49 +02:00

635 lines
22 KiB
Python

"""
API endpoints for workflow management with enhanced error handling
"""
# Copyright (c) 2025 FuzzingLabs
#
# Licensed under the Business Source License 1.1 (BSL). See the LICENSE file
# at the root of this repository for details.
#
# After the Change Date (four years from publication), this version of the
# Licensed Work will be made available under the Apache License, Version 2.0.
# See the LICENSE-APACHE file or http://www.apache.org/licenses/LICENSE-2.0
#
# Additional attribution and requirements are provided in the NOTICE file.
import logging
import traceback
import tempfile
from typing import List, Dict, Any, Optional
from fastapi import APIRouter, HTTPException, Depends, UploadFile, File, Form
from pathlib import Path
from src.models.findings import (
WorkflowSubmission,
WorkflowMetadata,
WorkflowListItem,
RunSubmissionResponse
)
from src.temporal.discovery import WorkflowDiscovery
logger = logging.getLogger(__name__)
# Configuration for file uploads
MAX_UPLOAD_SIZE = 10 * 1024 * 1024 * 1024 # 10 GB
ALLOWED_CONTENT_TYPES = [
"application/gzip",
"application/x-gzip",
"application/x-tar",
"application/x-compressed-tar",
"application/octet-stream", # Generic binary
]
router = APIRouter(prefix="/workflows", tags=["workflows"])
def create_structured_error_response(
error_type: str,
message: str,
workflow_name: Optional[str] = None,
run_id: Optional[str] = None,
container_info: Optional[Dict[str, Any]] = None,
deployment_info: Optional[Dict[str, Any]] = None,
suggestions: Optional[List[str]] = None
) -> Dict[str, Any]:
"""Create a structured error response with rich context."""
error_response = {
"error": {
"type": error_type,
"message": message,
"timestamp": __import__("datetime").datetime.utcnow().isoformat() + "Z"
}
}
if workflow_name:
error_response["error"]["workflow_name"] = workflow_name
if run_id:
error_response["error"]["run_id"] = run_id
if container_info:
error_response["error"]["container"] = container_info
if deployment_info:
error_response["error"]["deployment"] = deployment_info
if suggestions:
error_response["error"]["suggestions"] = suggestions
return error_response
def get_temporal_manager():
"""Dependency to get the Temporal manager instance"""
from src.main import temporal_mgr
return temporal_mgr
@router.get("/", response_model=List[WorkflowListItem])
async def list_workflows(
temporal_mgr=Depends(get_temporal_manager)
) -> List[WorkflowListItem]:
"""
List all discovered workflows with their metadata.
Returns a summary of each workflow including name, version, description,
author, and tags.
"""
workflows = []
for name, info in temporal_mgr.workflows.items():
workflows.append(WorkflowListItem(
name=name,
version=info.metadata.get("version", "0.6.0"),
description=info.metadata.get("description", ""),
author=info.metadata.get("author"),
tags=info.metadata.get("tags", [])
))
return workflows
@router.get("/metadata/schema")
async def get_metadata_schema() -> Dict[str, Any]:
"""
Get the JSON schema for workflow metadata files.
This schema defines the structure and requirements for metadata.yaml files
that must accompany each workflow.
"""
return WorkflowDiscovery.get_metadata_schema()
@router.get("/{workflow_name}/metadata", response_model=WorkflowMetadata)
async def get_workflow_metadata(
workflow_name: str,
temporal_mgr=Depends(get_temporal_manager)
) -> WorkflowMetadata:
"""
Get complete metadata for a specific workflow.
Args:
workflow_name: Name of the workflow
Returns:
Complete metadata including parameters schema, supported volume modes,
required modules, and more.
Raises:
HTTPException: 404 if workflow not found
"""
if workflow_name not in temporal_mgr.workflows:
available_workflows = list(temporal_mgr.workflows.keys())
error_response = create_structured_error_response(
error_type="WorkflowNotFound",
message=f"Workflow '{workflow_name}' not found",
workflow_name=workflow_name,
suggestions=[
f"Available workflows: {', '.join(available_workflows)}",
"Use GET /workflows/ to see all available workflows",
"Check workflow name spelling and case sensitivity"
]
)
raise HTTPException(
status_code=404,
detail=error_response
)
info = temporal_mgr.workflows[workflow_name]
metadata = info.metadata
return WorkflowMetadata(
name=workflow_name,
version=metadata.get("version", "0.6.0"),
description=metadata.get("description", ""),
author=metadata.get("author"),
tags=metadata.get("tags", []),
parameters=metadata.get("parameters", {}),
default_parameters=metadata.get("default_parameters", {}),
required_modules=metadata.get("required_modules", [])
)
@router.post("/{workflow_name}/submit", response_model=RunSubmissionResponse)
async def submit_workflow(
workflow_name: str,
submission: WorkflowSubmission,
temporal_mgr=Depends(get_temporal_manager)
) -> RunSubmissionResponse:
"""
Submit a workflow for execution.
Args:
workflow_name: Name of the workflow to execute
submission: Submission parameters including target path and parameters
Returns:
Run submission response with run_id and initial status
Raises:
HTTPException: 404 if workflow not found, 400 for invalid parameters
"""
if workflow_name not in temporal_mgr.workflows:
available_workflows = list(temporal_mgr.workflows.keys())
error_response = create_structured_error_response(
error_type="WorkflowNotFound",
message=f"Workflow '{workflow_name}' not found",
workflow_name=workflow_name,
suggestions=[
f"Available workflows: {', '.join(available_workflows)}",
"Use GET /workflows/ to see all available workflows",
"Check workflow name spelling and case sensitivity"
]
)
raise HTTPException(
status_code=404,
detail=error_response
)
try:
# Upload target file to MinIO and get target_id
target_path = Path(submission.target_path)
if not target_path.exists():
raise ValueError(f"Target path does not exist: {submission.target_path}")
# Upload target (using anonymous user for now)
target_id = await temporal_mgr.upload_target(
file_path=target_path,
user_id="api-user",
metadata={"workflow": workflow_name}
)
# Merge default parameters with user parameters
workflow_info = temporal_mgr.workflows[workflow_name]
metadata = workflow_info.metadata or {}
defaults = metadata.get("default_parameters", {})
user_params = submission.parameters or {}
workflow_params = {**defaults, **user_params}
# Start workflow execution
handle = await temporal_mgr.run_workflow(
workflow_name=workflow_name,
target_id=target_id,
workflow_params=workflow_params
)
run_id = handle.id
# Initialize fuzzing tracking if this looks like a fuzzing workflow
workflow_info = temporal_mgr.workflows.get(workflow_name, {})
workflow_tags = workflow_info.metadata.get("tags", []) if hasattr(workflow_info, 'metadata') else []
if "fuzzing" in workflow_tags or "fuzz" in workflow_name.lower():
from src.api.fuzzing import initialize_fuzzing_tracking
initialize_fuzzing_tracking(run_id, workflow_name)
return RunSubmissionResponse(
run_id=run_id,
status="RUNNING",
workflow=workflow_name,
message=f"Workflow '{workflow_name}' submitted successfully"
)
except ValueError as e:
# Parameter validation errors
error_response = create_structured_error_response(
error_type="ValidationError",
message=str(e),
workflow_name=workflow_name,
suggestions=[
"Check parameter types and values",
"Use GET /workflows/{workflow_name}/parameters for schema",
"Ensure all required parameters are provided"
]
)
raise HTTPException(status_code=400, detail=error_response)
except Exception as e:
logger.error(f"Failed to submit workflow '{workflow_name}': {e}")
logger.error(f"Traceback: {traceback.format_exc()}")
# Try to get more context about the error
container_info = None
deployment_info = None
suggestions = []
error_message = str(e)
error_type = "WorkflowSubmissionError"
# Detect specific error patterns
if "workflow" in error_message.lower() and "not found" in error_message.lower():
error_type = "WorkflowError"
suggestions.extend([
"Check if Temporal server is running and accessible",
"Verify workflow workers are running",
"Check if workflow is registered with correct vertical",
"Ensure Docker is running and has sufficient resources"
])
elif "volume" in error_message.lower() or "mount" in error_message.lower():
error_type = "VolumeError"
suggestions.extend([
"Check if the target path exists and is accessible",
"Verify file permissions (Docker needs read access)",
"Ensure the path is not in use by another process",
"Try using an absolute path instead of relative path"
])
elif "memory" in error_message.lower() or "resource" in error_message.lower():
error_type = "ResourceError"
suggestions.extend([
"Check system memory and CPU availability",
"Consider reducing resource limits or dataset size",
"Monitor Docker resource usage",
"Increase Docker memory limits if needed"
])
elif "image" in error_message.lower():
error_type = "ImageError"
suggestions.extend([
"Check if the workflow image exists",
"Verify Docker registry access",
"Try rebuilding the workflow image",
"Check network connectivity to registries"
])
else:
suggestions.extend([
"Check FuzzForge backend logs for details",
"Verify all services are running (docker-compose up -d)",
"Try restarting the workflow deployment",
"Contact support if the issue persists"
])
error_response = create_structured_error_response(
error_type=error_type,
message=f"Failed to submit workflow: {error_message}",
workflow_name=workflow_name,
container_info=container_info,
deployment_info=deployment_info,
suggestions=suggestions
)
raise HTTPException(
status_code=500,
detail=error_response
)
@router.post("/{workflow_name}/upload-and-submit", response_model=RunSubmissionResponse)
async def upload_and_submit_workflow(
workflow_name: str,
file: UploadFile = File(..., description="Target file or tarball to analyze"),
parameters: Optional[str] = Form(None, description="JSON-encoded workflow parameters"),
timeout: Optional[int] = Form(None, description="Timeout in seconds"),
temporal_mgr=Depends(get_temporal_manager)
) -> RunSubmissionResponse:
"""
Upload a target file/tarball and submit workflow for execution.
This endpoint accepts multipart/form-data uploads and is the recommended
way to submit workflows from remote CLI clients.
Args:
workflow_name: Name of the workflow to execute
file: Target file or tarball (compressed directory)
parameters: JSON string of workflow parameters (optional)
timeout: Execution timeout in seconds (optional)
Returns:
Run submission response with run_id and initial status
Raises:
HTTPException: 404 if workflow not found, 400 for invalid parameters,
413 if file too large
"""
if workflow_name not in temporal_mgr.workflows:
available_workflows = list(temporal_mgr.workflows.keys())
error_response = create_structured_error_response(
error_type="WorkflowNotFound",
message=f"Workflow '{workflow_name}' not found",
workflow_name=workflow_name,
suggestions=[
f"Available workflows: {', '.join(available_workflows)}",
"Use GET /workflows/ to see all available workflows"
]
)
raise HTTPException(status_code=404, detail=error_response)
temp_file_path = None
try:
# Validate file size
file_size = 0
chunk_size = 1024 * 1024 # 1MB chunks
# Create temporary file
temp_fd, temp_file_path = tempfile.mkstemp(suffix=".tar.gz")
logger.info(f"Receiving file upload for workflow '{workflow_name}': {file.filename}")
# Stream file to disk
with open(temp_fd, 'wb') as temp_file:
while True:
chunk = await file.read(chunk_size)
if not chunk:
break
file_size += len(chunk)
# Check size limit
if file_size > MAX_UPLOAD_SIZE:
raise HTTPException(
status_code=413,
detail=create_structured_error_response(
error_type="FileTooLarge",
message=f"File size exceeds maximum allowed size of {MAX_UPLOAD_SIZE / (1024**3):.1f} GB",
workflow_name=workflow_name,
suggestions=[
"Reduce the size of your target directory",
"Exclude unnecessary files (build artifacts, dependencies, etc.)",
"Consider splitting into smaller analysis targets"
]
)
)
temp_file.write(chunk)
logger.info(f"Received file: {file_size / (1024**2):.2f} MB")
# Parse parameters
workflow_params = {}
if parameters:
try:
import json
workflow_params = json.loads(parameters)
if not isinstance(workflow_params, dict):
raise ValueError("Parameters must be a JSON object")
except (json.JSONDecodeError, ValueError) as e:
raise HTTPException(
status_code=400,
detail=create_structured_error_response(
error_type="InvalidParameters",
message=f"Invalid parameters JSON: {e}",
workflow_name=workflow_name,
suggestions=["Ensure parameters is valid JSON object"]
)
)
# Upload to MinIO
target_id = await temporal_mgr.upload_target(
file_path=Path(temp_file_path),
user_id="api-user",
metadata={
"workflow": workflow_name,
"original_filename": file.filename,
"upload_method": "multipart"
}
)
logger.info(f"Uploaded to MinIO with target_id: {target_id}")
# Merge default parameters with user parameters
workflow_info = temporal_mgr.workflows.get(workflow_name)
metadata = workflow_info.metadata or {}
defaults = metadata.get("default_parameters", {})
workflow_params = {**defaults, **workflow_params}
# Start workflow execution
handle = await temporal_mgr.run_workflow(
workflow_name=workflow_name,
target_id=target_id,
workflow_params=workflow_params
)
run_id = handle.id
# Initialize fuzzing tracking if needed
workflow_info = temporal_mgr.workflows.get(workflow_name, {})
workflow_tags = workflow_info.metadata.get("tags", []) if hasattr(workflow_info, 'metadata') else []
if "fuzzing" in workflow_tags or "fuzz" in workflow_name.lower():
from src.api.fuzzing import initialize_fuzzing_tracking
initialize_fuzzing_tracking(run_id, workflow_name)
return RunSubmissionResponse(
run_id=run_id,
status="RUNNING",
workflow=workflow_name,
message=f"Workflow '{workflow_name}' submitted successfully with uploaded target"
)
except HTTPException:
raise
except Exception as e:
logger.error(f"Failed to upload and submit workflow '{workflow_name}': {e}")
logger.error(f"Traceback: {traceback.format_exc()}")
error_response = create_structured_error_response(
error_type="WorkflowSubmissionError",
message=f"Failed to process upload and submit workflow: {str(e)}",
workflow_name=workflow_name,
suggestions=[
"Check if the uploaded file is a valid tarball",
"Verify MinIO storage is accessible",
"Check backend logs for detailed error information",
"Ensure Temporal workers are running"
]
)
raise HTTPException(status_code=500, detail=error_response)
finally:
# Cleanup temporary file
if temp_file_path and Path(temp_file_path).exists():
try:
Path(temp_file_path).unlink()
logger.debug(f"Cleaned up temp file: {temp_file_path}")
except Exception as e:
logger.warning(f"Failed to cleanup temp file {temp_file_path}: {e}")
@router.get("/{workflow_name}/worker-info")
async def get_workflow_worker_info(
workflow_name: str,
temporal_mgr=Depends(get_temporal_manager)
) -> Dict[str, Any]:
"""
Get worker information for a workflow.
Returns details about which worker is required to execute this workflow,
including container name, task queue, and vertical.
Args:
workflow_name: Name of the workflow
Returns:
Worker information including vertical, container name, and task queue
Raises:
HTTPException: 404 if workflow not found
"""
if workflow_name not in temporal_mgr.workflows:
available_workflows = list(temporal_mgr.workflows.keys())
error_response = create_structured_error_response(
error_type="WorkflowNotFound",
message=f"Workflow '{workflow_name}' not found",
workflow_name=workflow_name,
suggestions=[
f"Available workflows: {', '.join(available_workflows)}",
"Use GET /workflows/ to see all available workflows"
]
)
raise HTTPException(
status_code=404,
detail=error_response
)
info = temporal_mgr.workflows[workflow_name]
metadata = info.metadata
# Extract vertical from metadata
vertical = metadata.get("vertical")
if not vertical:
error_response = create_structured_error_response(
error_type="MissingVertical",
message=f"Workflow '{workflow_name}' does not specify a vertical in metadata",
workflow_name=workflow_name,
suggestions=[
"Check workflow metadata.yaml for 'vertical' field",
"Contact workflow author for support"
]
)
raise HTTPException(
status_code=500,
detail=error_response
)
return {
"workflow": workflow_name,
"vertical": vertical,
"worker_container": f"fuzzforge-worker-{vertical}",
"worker_service": f"worker-{vertical}",
"task_queue": f"{vertical}-queue",
"required": True
}
@router.get("/{workflow_name}/parameters")
async def get_workflow_parameters(
workflow_name: str,
temporal_mgr=Depends(get_temporal_manager)
) -> Dict[str, Any]:
"""
Get the parameters schema for a workflow.
Args:
workflow_name: Name of the workflow
Returns:
Parameters schema with types, descriptions, and defaults
Raises:
HTTPException: 404 if workflow not found
"""
if workflow_name not in temporal_mgr.workflows:
available_workflows = list(temporal_mgr.workflows.keys())
error_response = create_structured_error_response(
error_type="WorkflowNotFound",
message=f"Workflow '{workflow_name}' not found",
workflow_name=workflow_name,
suggestions=[
f"Available workflows: {', '.join(available_workflows)}",
"Use GET /workflows/ to see all available workflows"
]
)
raise HTTPException(
status_code=404,
detail=error_response
)
info = temporal_mgr.workflows[workflow_name]
metadata = info.metadata
# Return parameters with enhanced schema information
parameters_schema = metadata.get("parameters", {})
# Extract the actual parameter definitions from JSON schema structure
if "properties" in parameters_schema:
param_definitions = parameters_schema["properties"]
else:
param_definitions = parameters_schema
# Add default values to the schema
default_params = metadata.get("default_parameters", {})
for param_name, param_schema in param_definitions.items():
if isinstance(param_schema, dict) and param_name in default_params:
param_schema["default"] = default_params[param_name]
return {
"workflow": workflow_name,
"parameters": param_definitions,
"default_parameters": default_params,
"required_parameters": [
name for name, schema in param_definitions.items()
if isinstance(schema, dict) and schema.get("required", False)
]
}