Files
fuzzforge_ai/backend/toolbox/workflows/registry.py
Tanguy Duhamel 323a434c73 Initial commit
2025-09-29 21:26:41 +02:00

187 lines
6.0 KiB
Python

"""
Manual Workflow Registry for Prefect Deployment
This file contains the manual registry of all workflows that can be deployed.
Developers MUST add their workflows here after creating them.
This approach is required because:
1. Prefect cannot deploy dynamically imported flows
2. Docker deployment needs static flow references
3. Explicit registration provides better control and visibility
"""
# Copyright (c) 2025 FuzzingLabs
#
# Licensed under the Business Source License 1.1 (BSL). See the LICENSE file
# at the root of this repository for details.
#
# After the Change Date (four years from publication), this version of the
# Licensed Work will be made available under the Apache License, Version 2.0.
# See the LICENSE-APACHE file or http://www.apache.org/licenses/LICENSE-2.0
#
# Additional attribution and requirements are provided in the NOTICE file.
from typing import Dict, Any, Callable
import logging
logger = logging.getLogger(__name__)
# Import only essential workflows.
# Every workflow gets its own try/except so that one package failing to
# import does not stop the remaining workflows from being loaded. A flow
# whose import fails is bound to None, which the registration code below
# uses to skip it.
try:
    from .security_assessment.workflow import main_flow as security_assessment_flow
except ImportError as exc:
    security_assessment_flow = None
    logger.warning(f"Failed to import security_assessment workflow: {exc}")

try:
    from .comprehensive.secret_detection_scan.workflow import main_flow as secret_detection_flow
except ImportError as exc:
    secret_detection_flow = None
    logger.warning(f"Failed to import secret_detection_scan workflow: {exc}")
# Manual registry, keyed by workflow name. Developers add their workflows
# here after creating them. An entry is only created for a flow whose import
# succeeded (its name is not None), so the registry never holds a None flow.
WORKFLOW_REGISTRY: Dict[str, Dict[str, Any]] = {}

# Static security assessment workflow.
if security_assessment_flow is not None:
    WORKFLOW_REGISTRY["security_assessment"] = {
        "flow": security_assessment_flow,
        "module_path": "toolbox.workflows.security_assessment.workflow",
        "function_name": "main_flow",
        "description": "Comprehensive security assessment workflow that scans files, analyzes code for vulnerabilities, and generates SARIF reports",
        "version": "1.0.0",
        "author": "FuzzForge Team",
        "tags": ["security", "scanner", "analyzer", "static-analysis", "sarif"],
    }

# Secret / credential detection workflow.
if secret_detection_flow is not None:
    WORKFLOW_REGISTRY["secret_detection_scan"] = {
        "flow": secret_detection_flow,
        "module_path": "toolbox.workflows.comprehensive.secret_detection_scan.workflow",
        "function_name": "main_flow",
        "description": "Comprehensive secret detection using TruffleHog and Gitleaks for thorough credential scanning",
        "version": "1.0.0",
        "author": "FuzzForge Team",
        "tags": ["secrets", "credentials", "detection", "trufflehog", "gitleaks", "comprehensive"],
    }
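#
# To add a new workflow, follow this pattern:
#
# 1. Import the flow function above, next to the other workflow imports,
#    inside its own try/except ImportError so a failed import leaves the
#    name set to None instead of breaking this module.
# 2. Register it here, guarded so it is only added when the import succeeded:
#
# if my_new_flow_function is not None:
#     WORKFLOW_REGISTRY["my_new_workflow"] = {
#         "flow": my_new_flow_function,  # The flow function imported above
#         "module_path": "toolbox.workflows.my_new_workflow.workflow",
#         "function_name": "my_new_flow_function",
#         "description": "Description of what this workflow does",
#         "version": "1.0.0",
#         "author": "Developer Name",
#         "tags": ["tag1", "tag2"]
#     }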
def get_workflow_flow(workflow_name: str) -> Callable:
    """
    Look up the flow function registered under a workflow name.

    Args:
        workflow_name: Name of the workflow

    Returns:
        The object stored under the "flow" key of the workflow's registry entry

    Raises:
        KeyError: If workflow not found in registry
    """
    if workflow_name in WORKFLOW_REGISTRY:
        return WORKFLOW_REGISTRY[workflow_name]["flow"]

    # Unknown name: include the registered names in the error so that typos
    # and forgotten registrations are easy to spot.
    registered_names = list(WORKFLOW_REGISTRY)
    raise KeyError(
        f"Workflow '{workflow_name}' not found in registry. "
        f"Available workflows: {registered_names}. "
        f"Please add the workflow to toolbox/workflows/registry.py"
    )
def get_workflow_info(workflow_name: str) -> Dict[str, Any]:
    """
    Return the full registry entry for a workflow.

    The returned dictionary is the entry stored in the registry itself,
    not a copy.

    Args:
        workflow_name: Name of the workflow

    Returns:
        Registry information dictionary

    Raises:
        KeyError: If workflow not found in registry
    """
    if workflow_name in WORKFLOW_REGISTRY:
        return WORKFLOW_REGISTRY[workflow_name]

    # Unknown name: report which workflows are actually registered.
    registered_names = list(WORKFLOW_REGISTRY)
    raise KeyError(
        f"Workflow '{workflow_name}' not found in registry. "
        f"Available workflows: {registered_names}"
    )
def list_registered_workflows() -> Dict[str, Dict[str, Any]]:
    """
    Return a snapshot of every registered workflow.

    The outer dictionary is a new object, so adding or removing keys on the
    result does not affect the registry. The copy is shallow: the per-workflow
    entry dictionaries are the same objects the registry holds.

    Returns:
        Dictionary of all workflow registry entries
    """
    return dict(WORKFLOW_REGISTRY)
def validate_registry() -> bool:
    """
    Check that the workflow registry is non-empty and every entry is well formed.

    Each entry must contain the keys "flow", "module_path", "function_name"
    and "description"; its "flow" must be callable and must expose a
    ``deploy`` attribute, which is used here as the marker of a Prefect flow.
    Validation stops at the first problem found.

    Returns:
        True if valid, raises exceptions if not

    Raises:
        ValueError: If registry is invalid
    """
    if not WORKFLOW_REGISTRY:
        raise ValueError("Workflow registry is empty")

    mandatory_keys = ("flow", "module_path", "function_name", "description")

    for workflow_name, record in WORKFLOW_REGISTRY.items():
        # Structural check first: the lookups below rely on "flow" existing.
        absent_keys = [key for key in mandatory_keys if key not in record]
        if absent_keys:
            raise ValueError(
                f"Workflow '{workflow_name}' missing required fields: {absent_keys}"
            )

        flow = record["flow"]

        if not callable(flow):
            raise ValueError(f"Workflow '{workflow_name}' flow is not callable")

        # A plain function passes the callable check but has no deploy attribute.
        if not hasattr(flow, "deploy"):
            raise ValueError(
                f"Workflow '{workflow_name}' flow is not a Prefect flow (missing deploy method)"
            )

    logger.info(f"Registry validation passed. {len(WORKFLOW_REGISTRY)} workflows registered.")
    return True
# Validate the registry on import so that a broken registry is detected as
# soon as this module is loaded. Any validation error is logged and then
# re-raised, which makes importing this module fail.
try:
    validate_registry()
except Exception as e:
    logger.error(f"Workflow registry validation failed: {e}")
    raise
else:
    # The success message lives in the else clause so that only errors raised
    # by validate_registry() itself are reported as validation failures.
    logger.info(f"Workflow registry loaded successfully with {len(WORKFLOW_REGISTRY)} workflows")