This PR addresses multiple issues and improvements across the CLI and backend:

**Worker Naming Fixes:**
- Fix worker container naming mismatch between CLI and docker-compose
- Update worker_manager.py to use docker compose commands with service names
- Remove worker_container field from workflows API, keep only worker_service
- Backend now correctly uses service names (worker-python, worker-secrets, etc.)

**Backend API Fixes:**
- Fix workflow name extraction from run_id in runs.py (was showing "unknown")
- Update monitor command suggestions from 'monitor stats' to 'monitor live'

**Monitor Command Consolidation:**
- Merge 'monitor stats' and 'monitor live' into a single 'monitor live' command
- Add --once and --style flags for flexibility
- Remove all references to the deprecated 'monitor stats' command

**Findings CLI Structure Improvements (Closes #18):**
- Move 'show' command from 'findings' (plural) to 'finding' (singular)
- Keep 'export' command in 'findings' (plural), as it exports all findings
- Remove broken 'analyze' command (it imported a non-existent function)
- Update all command suggestions to use the correct paths
- Fix smart routing logic in main.py to handle the new command structure
- Add export suggestions after viewing findings, with unique timestamps
- Change default export format to SARIF (industry standard)

**Docker Compose:**
- Remove obsolete version field to fix deprecation warning

All commands tested and working:
- ff finding show <run-id> --rule <rule-id> ✓
- ff findings export <run-id> ✓
- ff finding <run-id> (direct viewing) ✓
- ff monitor live <run-id> ✓
"""
|
|
API endpoints for workflow management with enhanced error handling
|
|
"""
|
|
|
|
# Copyright (c) 2025 FuzzingLabs
|
|
#
|
|
# Licensed under the Business Source License 1.1 (BSL). See the LICENSE file
|
|
# at the root of this repository for details.
|
|
#
|
|
# After the Change Date (four years from publication), this version of the
|
|
# Licensed Work will be made available under the Apache License, Version 2.0.
|
|
# See the LICENSE-APACHE file or http://www.apache.org/licenses/LICENSE-2.0
|
|
#
|
|
# Additional attribution and requirements are provided in the NOTICE file.
|
|
|
|

import json
import logging
import tempfile
import traceback
from datetime import datetime
from pathlib import Path
from typing import List, Dict, Any, Optional

from fastapi import APIRouter, HTTPException, Depends, UploadFile, File, Form

from src.models.findings import (
    WorkflowSubmission,
    WorkflowMetadata,
    WorkflowListItem,
    RunSubmissionResponse,
)
from src.temporal.discovery import WorkflowDiscovery

logger = logging.getLogger(__name__)

# Configuration for file uploads
MAX_UPLOAD_SIZE = 10 * 1024 * 1024 * 1024  # 10 GB
ALLOWED_CONTENT_TYPES = [
    "application/gzip",
    "application/x-gzip",
    "application/x-tar",
    "application/x-compressed-tar",
    "application/octet-stream",  # Generic binary
]

router = APIRouter(prefix="/workflows", tags=["workflows"])


def create_structured_error_response(
    error_type: str,
    message: str,
    workflow_name: Optional[str] = None,
    run_id: Optional[str] = None,
    container_info: Optional[Dict[str, Any]] = None,
    deployment_info: Optional[Dict[str, Any]] = None,
    suggestions: Optional[List[str]] = None,
) -> Dict[str, Any]:
    """Create a structured error response with rich context."""
    error_response = {
        "error": {
            "type": error_type,
            "message": message,
            "timestamp": datetime.utcnow().isoformat() + "Z",
        }
    }

    if workflow_name:
        error_response["error"]["workflow_name"] = workflow_name

    if run_id:
        error_response["error"]["run_id"] = run_id

    if container_info:
        error_response["error"]["container"] = container_info

    if deployment_info:
        error_response["error"]["deployment"] = deployment_info

    if suggestions:
        error_response["error"]["suggestions"] = suggestions

    return error_response
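

# Example of the resulting payload (illustrative values; the shape follows
# directly from the function above):
#
#   {
#       "error": {
#           "type": "WorkflowNotFound",
#           "message": "Workflow 'security_scan' not found",
#           "timestamp": "2025-01-01T00:00:00.000000Z",
#           "workflow_name": "security_scan",
#           "suggestions": ["Available workflows: ..."]
#       }
#   }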


def get_temporal_manager():
    """Dependency to get the Temporal manager instance"""
    from src.main import temporal_mgr
    return temporal_mgr
@router.get("/", response_model=List[WorkflowListItem])
|
|
async def list_workflows(
|
|
temporal_mgr=Depends(get_temporal_manager)
|
|
) -> List[WorkflowListItem]:
|
|
"""
|
|
List all discovered workflows with their metadata.
|
|
|
|
Returns a summary of each workflow including name, version, description,
|
|
author, and tags.
|
|
"""
|
|
workflows = []
|
|
for name, info in temporal_mgr.workflows.items():
|
|
workflows.append(WorkflowListItem(
|
|
name=name,
|
|
version=info.metadata.get("version", "0.6.0"),
|
|
description=info.metadata.get("description", ""),
|
|
author=info.metadata.get("author"),
|
|
tags=info.metadata.get("tags", [])
|
|
))
|
|
|
|
return workflows
|
|
|
|
|
|
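

# Example client call (a minimal sketch, not part of the API itself; it
# assumes the backend listens on http://localhost:8000 -- adjust the base
# URL for your deployment):
#
#   import requests
#
#   workflows = requests.get("http://localhost:8000/workflows/").json()
#   for wf in workflows:
#       print(wf["name"], wf["version"])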
@router.get("/metadata/schema")
|
|
async def get_metadata_schema() -> Dict[str, Any]:
|
|
"""
|
|
Get the JSON schema for workflow metadata files.
|
|
|
|
This schema defines the structure and requirements for metadata.yaml files
|
|
that must accompany each workflow.
|
|
"""
|
|
return WorkflowDiscovery.get_metadata_schema()
|
|
|
|
|
|
@router.get("/{workflow_name}/metadata", response_model=WorkflowMetadata)
|
|
async def get_workflow_metadata(
|
|
workflow_name: str,
|
|
temporal_mgr=Depends(get_temporal_manager)
|
|
) -> WorkflowMetadata:
|
|
"""
|
|
Get complete metadata for a specific workflow.
|
|
|
|
Args:
|
|
workflow_name: Name of the workflow
|
|
|
|
Returns:
|
|
Complete metadata including parameters schema, supported volume modes,
|
|
required modules, and more.
|
|
|
|
Raises:
|
|
HTTPException: 404 if workflow not found
|
|
"""
|
|
if workflow_name not in temporal_mgr.workflows:
|
|
available_workflows = list(temporal_mgr.workflows.keys())
|
|
error_response = create_structured_error_response(
|
|
error_type="WorkflowNotFound",
|
|
message=f"Workflow '{workflow_name}' not found",
|
|
workflow_name=workflow_name,
|
|
suggestions=[
|
|
f"Available workflows: {', '.join(available_workflows)}",
|
|
"Use GET /workflows/ to see all available workflows",
|
|
"Check workflow name spelling and case sensitivity"
|
|
]
|
|
)
|
|
raise HTTPException(
|
|
status_code=404,
|
|
detail=error_response
|
|
)
|
|
|
|
info = temporal_mgr.workflows[workflow_name]
|
|
metadata = info.metadata
|
|
|
|
return WorkflowMetadata(
|
|
name=workflow_name,
|
|
version=metadata.get("version", "0.6.0"),
|
|
description=metadata.get("description", ""),
|
|
author=metadata.get("author"),
|
|
tags=metadata.get("tags", []),
|
|
parameters=metadata.get("parameters", {}),
|
|
default_parameters=metadata.get("default_parameters", {}),
|
|
required_modules=metadata.get("required_modules", [])
|
|
)
|
|
|
|
|
|
@router.post("/{workflow_name}/submit", response_model=RunSubmissionResponse)
|
|
async def submit_workflow(
|
|
workflow_name: str,
|
|
submission: WorkflowSubmission,
|
|
temporal_mgr=Depends(get_temporal_manager)
|
|
) -> RunSubmissionResponse:
|
|
"""
|
|
Submit a workflow for execution.
|
|
|
|
Args:
|
|
workflow_name: Name of the workflow to execute
|
|
submission: Submission parameters including target path and parameters
|
|
|
|
Returns:
|
|
Run submission response with run_id and initial status
|
|
|
|
Raises:
|
|
HTTPException: 404 if workflow not found, 400 for invalid parameters
|
|
"""
|
|
if workflow_name not in temporal_mgr.workflows:
|
|
available_workflows = list(temporal_mgr.workflows.keys())
|
|
error_response = create_structured_error_response(
|
|
error_type="WorkflowNotFound",
|
|
message=f"Workflow '{workflow_name}' not found",
|
|
workflow_name=workflow_name,
|
|
suggestions=[
|
|
f"Available workflows: {', '.join(available_workflows)}",
|
|
"Use GET /workflows/ to see all available workflows",
|
|
"Check workflow name spelling and case sensitivity"
|
|
]
|
|
)
|
|
raise HTTPException(
|
|
status_code=404,
|
|
detail=error_response
|
|
)
|
|
|
|
try:
|
|
# Upload target file to MinIO and get target_id
|
|
target_path = Path(submission.target_path)
|
|
if not target_path.exists():
|
|
raise ValueError(f"Target path does not exist: {submission.target_path}")
|
|
|
|
# Upload target (using anonymous user for now)
|
|
target_id = await temporal_mgr.upload_target(
|
|
file_path=target_path,
|
|
user_id="api-user",
|
|
metadata={"workflow": workflow_name}
|
|
)
|
|
|
|
# Merge default parameters with user parameters
|
|
workflow_info = temporal_mgr.workflows[workflow_name]
|
|
metadata = workflow_info.metadata or {}
|
|
defaults = metadata.get("default_parameters", {})
|
|
user_params = submission.parameters or {}
|
|
workflow_params = {**defaults, **user_params}
|
|
|
|
# Start workflow execution
|
|
handle = await temporal_mgr.run_workflow(
|
|
workflow_name=workflow_name,
|
|
target_id=target_id,
|
|
workflow_params=workflow_params
|
|
)
|
|
|
|
run_id = handle.id
|
|
|
|
# Initialize fuzzing tracking if this looks like a fuzzing workflow
|
|
workflow_info = temporal_mgr.workflows.get(workflow_name, {})
|
|
workflow_tags = workflow_info.metadata.get("tags", []) if hasattr(workflow_info, 'metadata') else []
|
|
if "fuzzing" in workflow_tags or "fuzz" in workflow_name.lower():
|
|
from src.api.fuzzing import initialize_fuzzing_tracking
|
|
initialize_fuzzing_tracking(run_id, workflow_name)
|
|
|
|
return RunSubmissionResponse(
|
|
run_id=run_id,
|
|
status="RUNNING",
|
|
workflow=workflow_name,
|
|
message=f"Workflow '{workflow_name}' submitted successfully"
|
|
)
|
|
|
|
except ValueError as e:
|
|
# Parameter validation errors
|
|
error_response = create_structured_error_response(
|
|
error_type="ValidationError",
|
|
message=str(e),
|
|
workflow_name=workflow_name,
|
|
suggestions=[
|
|
"Check parameter types and values",
|
|
"Use GET /workflows/{workflow_name}/parameters for schema",
|
|
"Ensure all required parameters are provided"
|
|
]
|
|
)
|
|
raise HTTPException(status_code=400, detail=error_response)
|
|
|
|
except Exception as e:
|
|
logger.error(f"Failed to submit workflow '{workflow_name}': {e}")
|
|
logger.error(f"Traceback: {traceback.format_exc()}")
|
|
|
|
# Try to get more context about the error
|
|
container_info = None
|
|
deployment_info = None
|
|
suggestions = []
|
|
|
|
error_message = str(e)
|
|
error_type = "WorkflowSubmissionError"
|
|
|
|
# Detect specific error patterns
|
|
if "workflow" in error_message.lower() and "not found" in error_message.lower():
|
|
error_type = "WorkflowError"
|
|
suggestions.extend([
|
|
"Check if Temporal server is running and accessible",
|
|
"Verify workflow workers are running",
|
|
"Check if workflow is registered with correct vertical",
|
|
"Ensure Docker is running and has sufficient resources"
|
|
])
|
|
|
|
elif "volume" in error_message.lower() or "mount" in error_message.lower():
|
|
error_type = "VolumeError"
|
|
suggestions.extend([
|
|
"Check if the target path exists and is accessible",
|
|
"Verify file permissions (Docker needs read access)",
|
|
"Ensure the path is not in use by another process",
|
|
"Try using an absolute path instead of relative path"
|
|
])
|
|
|
|
elif "memory" in error_message.lower() or "resource" in error_message.lower():
|
|
error_type = "ResourceError"
|
|
suggestions.extend([
|
|
"Check system memory and CPU availability",
|
|
"Consider reducing resource limits or dataset size",
|
|
"Monitor Docker resource usage",
|
|
"Increase Docker memory limits if needed"
|
|
])
|
|
|
|
elif "image" in error_message.lower():
|
|
error_type = "ImageError"
|
|
suggestions.extend([
|
|
"Check if the workflow image exists",
|
|
"Verify Docker registry access",
|
|
"Try rebuilding the workflow image",
|
|
"Check network connectivity to registries"
|
|
])
|
|
|
|
else:
|
|
suggestions.extend([
|
|
"Check FuzzForge backend logs for details",
|
|
"Verify all services are running (docker-compose up -d)",
|
|
"Try restarting the workflow deployment",
|
|
"Contact support if the issue persists"
|
|
])
|
|
|
|
error_response = create_structured_error_response(
|
|
error_type=error_type,
|
|
message=f"Failed to submit workflow: {error_message}",
|
|
workflow_name=workflow_name,
|
|
container_info=container_info,
|
|
deployment_info=deployment_info,
|
|
suggestions=suggestions
|
|
)
|
|
|
|
raise HTTPException(
|
|
status_code=500,
|
|
detail=error_response
|
|
)
|
|
|
|
|
|
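

# Example submission (a hedged sketch; the base URL and workflow name are
# illustrative, and the JSON field names follow the WorkflowSubmission usage
# above -- target_path plus an optional parameters object):
#
#   import requests
#
#   resp = requests.post(
#       "http://localhost:8000/workflows/security_scan/submit",
#       json={"target_path": "/abs/path/to/target", "parameters": {}},
#   )
#   print(resp.json()["run_id"])
#
# Note that target_path is resolved on the backend host, so this endpoint is
# only useful when the client and backend share a filesystem; remote clients
# should prefer the /upload-and-submit endpoint below.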
@router.post("/{workflow_name}/upload-and-submit", response_model=RunSubmissionResponse)
|
|
async def upload_and_submit_workflow(
|
|
workflow_name: str,
|
|
file: UploadFile = File(..., description="Target file or tarball to analyze"),
|
|
parameters: Optional[str] = Form(None, description="JSON-encoded workflow parameters"),
|
|
timeout: Optional[int] = Form(None, description="Timeout in seconds"),
|
|
temporal_mgr=Depends(get_temporal_manager)
|
|
) -> RunSubmissionResponse:
|
|
"""
|
|
Upload a target file/tarball and submit workflow for execution.
|
|
|
|
This endpoint accepts multipart/form-data uploads and is the recommended
|
|
way to submit workflows from remote CLI clients.
|
|
|
|
Args:
|
|
workflow_name: Name of the workflow to execute
|
|
file: Target file or tarball (compressed directory)
|
|
parameters: JSON string of workflow parameters (optional)
|
|
timeout: Execution timeout in seconds (optional)
|
|
|
|
Returns:
|
|
Run submission response with run_id and initial status
|
|
|
|
Raises:
|
|
HTTPException: 404 if workflow not found, 400 for invalid parameters,
|
|
413 if file too large
|
|
"""
|
|
if workflow_name not in temporal_mgr.workflows:
|
|
available_workflows = list(temporal_mgr.workflows.keys())
|
|
error_response = create_structured_error_response(
|
|
error_type="WorkflowNotFound",
|
|
message=f"Workflow '{workflow_name}' not found",
|
|
workflow_name=workflow_name,
|
|
suggestions=[
|
|
f"Available workflows: {', '.join(available_workflows)}",
|
|
"Use GET /workflows/ to see all available workflows"
|
|
]
|
|
)
|
|
raise HTTPException(status_code=404, detail=error_response)
|
|
|
|
temp_file_path = None
|
|
|
|
try:
|
|
# Validate file size
|
|
file_size = 0
|
|
chunk_size = 1024 * 1024 # 1MB chunks
|
|
|
|
# Create temporary file
|
|
temp_fd, temp_file_path = tempfile.mkstemp(suffix=".tar.gz")
|
|
|
|
logger.info(f"Receiving file upload for workflow '{workflow_name}': {file.filename}")
|
|
|
|
# Stream file to disk
|
|
with open(temp_fd, 'wb') as temp_file:
|
|
while True:
|
|
chunk = await file.read(chunk_size)
|
|
if not chunk:
|
|
break
|
|
|
|
file_size += len(chunk)
|
|
|
|
# Check size limit
|
|
if file_size > MAX_UPLOAD_SIZE:
|
|
raise HTTPException(
|
|
status_code=413,
|
|
detail=create_structured_error_response(
|
|
error_type="FileTooLarge",
|
|
message=f"File size exceeds maximum allowed size of {MAX_UPLOAD_SIZE / (1024**3):.1f} GB",
|
|
workflow_name=workflow_name,
|
|
suggestions=[
|
|
"Reduce the size of your target directory",
|
|
"Exclude unnecessary files (build artifacts, dependencies, etc.)",
|
|
"Consider splitting into smaller analysis targets"
|
|
]
|
|
)
|
|
)
|
|
|
|
temp_file.write(chunk)
|
|
|
|
logger.info(f"Received file: {file_size / (1024**2):.2f} MB")
|
|
|
|
# Parse parameters
|
|
workflow_params = {}
|
|
if parameters:
|
|
try:
|
|
import json
|
|
workflow_params = json.loads(parameters)
|
|
if not isinstance(workflow_params, dict):
|
|
raise ValueError("Parameters must be a JSON object")
|
|
except (json.JSONDecodeError, ValueError) as e:
|
|
raise HTTPException(
|
|
status_code=400,
|
|
detail=create_structured_error_response(
|
|
error_type="InvalidParameters",
|
|
message=f"Invalid parameters JSON: {e}",
|
|
workflow_name=workflow_name,
|
|
suggestions=["Ensure parameters is valid JSON object"]
|
|
)
|
|
)
|
|
|
|
# Upload to MinIO
|
|
target_id = await temporal_mgr.upload_target(
|
|
file_path=Path(temp_file_path),
|
|
user_id="api-user",
|
|
metadata={
|
|
"workflow": workflow_name,
|
|
"original_filename": file.filename,
|
|
"upload_method": "multipart"
|
|
}
|
|
)
|
|
|
|
logger.info(f"Uploaded to MinIO with target_id: {target_id}")
|
|
|
|
# Merge default parameters with user parameters
|
|
workflow_info = temporal_mgr.workflows.get(workflow_name)
|
|
metadata = workflow_info.metadata or {}
|
|
defaults = metadata.get("default_parameters", {})
|
|
workflow_params = {**defaults, **workflow_params}
|
|
|
|
# Start workflow execution
|
|
handle = await temporal_mgr.run_workflow(
|
|
workflow_name=workflow_name,
|
|
target_id=target_id,
|
|
workflow_params=workflow_params
|
|
)
|
|
|
|
run_id = handle.id
|
|
|
|
# Initialize fuzzing tracking if needed
|
|
workflow_info = temporal_mgr.workflows.get(workflow_name, {})
|
|
workflow_tags = workflow_info.metadata.get("tags", []) if hasattr(workflow_info, 'metadata') else []
|
|
if "fuzzing" in workflow_tags or "fuzz" in workflow_name.lower():
|
|
from src.api.fuzzing import initialize_fuzzing_tracking
|
|
initialize_fuzzing_tracking(run_id, workflow_name)
|
|
|
|
return RunSubmissionResponse(
|
|
run_id=run_id,
|
|
status="RUNNING",
|
|
workflow=workflow_name,
|
|
message=f"Workflow '{workflow_name}' submitted successfully with uploaded target"
|
|
)
|
|
|
|
except HTTPException:
|
|
raise
|
|
except Exception as e:
|
|
logger.error(f"Failed to upload and submit workflow '{workflow_name}': {e}")
|
|
logger.error(f"Traceback: {traceback.format_exc()}")
|
|
|
|
error_response = create_structured_error_response(
|
|
error_type="WorkflowSubmissionError",
|
|
message=f"Failed to process upload and submit workflow: {str(e)}",
|
|
workflow_name=workflow_name,
|
|
suggestions=[
|
|
"Check if the uploaded file is a valid tarball",
|
|
"Verify MinIO storage is accessible",
|
|
"Check backend logs for detailed error information",
|
|
"Ensure Temporal workers are running"
|
|
]
|
|
)
|
|
|
|
raise HTTPException(status_code=500, detail=error_response)
|
|
|
|
finally:
|
|
# Cleanup temporary file
|
|
if temp_file_path and Path(temp_file_path).exists():
|
|
try:
|
|
Path(temp_file_path).unlink()
|
|
logger.debug(f"Cleaned up temp file: {temp_file_path}")
|
|
except Exception as e:
|
|
logger.warning(f"Failed to cleanup temp file {temp_file_path}: {e}")
|
|
|
|
|
|
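

# Example upload (a hedged sketch; the base URL, workflow name, parameter
# name, and tarball path are illustrative assumptions, not fixed by this
# module):
#
#   import json
#   import requests
#
#   with open("target.tar.gz", "rb") as fh:
#       resp = requests.post(
#           "http://localhost:8000/workflows/security_scan/upload-and-submit",
#           files={"file": ("target.tar.gz", fh, "application/gzip")},
#           data={"parameters": json.dumps({"depth": 2})},
#       )
#   print(resp.json())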
@router.get("/{workflow_name}/worker-info")
|
|
async def get_workflow_worker_info(
|
|
workflow_name: str,
|
|
temporal_mgr=Depends(get_temporal_manager)
|
|
) -> Dict[str, Any]:
|
|
"""
|
|
Get worker information for a workflow.
|
|
|
|
Returns details about which worker is required to execute this workflow,
|
|
including container name, task queue, and vertical.
|
|
|
|
Args:
|
|
workflow_name: Name of the workflow
|
|
|
|
Returns:
|
|
Worker information including vertical, container name, and task queue
|
|
|
|
Raises:
|
|
HTTPException: 404 if workflow not found
|
|
"""
|
|
if workflow_name not in temporal_mgr.workflows:
|
|
available_workflows = list(temporal_mgr.workflows.keys())
|
|
error_response = create_structured_error_response(
|
|
error_type="WorkflowNotFound",
|
|
message=f"Workflow '{workflow_name}' not found",
|
|
workflow_name=workflow_name,
|
|
suggestions=[
|
|
f"Available workflows: {', '.join(available_workflows)}",
|
|
"Use GET /workflows/ to see all available workflows"
|
|
]
|
|
)
|
|
raise HTTPException(
|
|
status_code=404,
|
|
detail=error_response
|
|
)
|
|
|
|
info = temporal_mgr.workflows[workflow_name]
|
|
metadata = info.metadata
|
|
|
|
# Extract vertical from metadata
|
|
vertical = metadata.get("vertical")
|
|
|
|
if not vertical:
|
|
error_response = create_structured_error_response(
|
|
error_type="MissingVertical",
|
|
message=f"Workflow '{workflow_name}' does not specify a vertical in metadata",
|
|
workflow_name=workflow_name,
|
|
suggestions=[
|
|
"Check workflow metadata.yaml for 'vertical' field",
|
|
"Contact workflow author for support"
|
|
]
|
|
)
|
|
raise HTTPException(
|
|
status_code=500,
|
|
detail=error_response
|
|
)
|
|
|
|
return {
|
|
"workflow": workflow_name,
|
|
"vertical": vertical,
|
|
"worker_service": f"worker-{vertical}",
|
|
"task_queue": f"{vertical}-queue",
|
|
"required": True
|
|
}
|
|
|
|
|
|
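

# Example response for a workflow whose metadata declares vertical "python"
# (the workflow name is illustrative; the other values follow directly from
# the f-strings above, matching compose service names such as worker-python):
#
#   {
#       "workflow": "security_scan",
#       "vertical": "python",
#       "worker_service": "worker-python",
#       "task_queue": "python-queue",
#       "required": true
#   }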
@router.get("/{workflow_name}/parameters")
|
|
async def get_workflow_parameters(
|
|
workflow_name: str,
|
|
temporal_mgr=Depends(get_temporal_manager)
|
|
) -> Dict[str, Any]:
|
|
"""
|
|
Get the parameters schema for a workflow.
|
|
|
|
Args:
|
|
workflow_name: Name of the workflow
|
|
|
|
Returns:
|
|
Parameters schema with types, descriptions, and defaults
|
|
|
|
Raises:
|
|
HTTPException: 404 if workflow not found
|
|
"""
|
|
if workflow_name not in temporal_mgr.workflows:
|
|
available_workflows = list(temporal_mgr.workflows.keys())
|
|
error_response = create_structured_error_response(
|
|
error_type="WorkflowNotFound",
|
|
message=f"Workflow '{workflow_name}' not found",
|
|
workflow_name=workflow_name,
|
|
suggestions=[
|
|
f"Available workflows: {', '.join(available_workflows)}",
|
|
"Use GET /workflows/ to see all available workflows"
|
|
]
|
|
)
|
|
raise HTTPException(
|
|
status_code=404,
|
|
detail=error_response
|
|
)
|
|
|
|
info = temporal_mgr.workflows[workflow_name]
|
|
metadata = info.metadata
|
|
|
|
# Return parameters with enhanced schema information
|
|
parameters_schema = metadata.get("parameters", {})
|
|
|
|
# Extract the actual parameter definitions from JSON schema structure
|
|
if "properties" in parameters_schema:
|
|
param_definitions = parameters_schema["properties"]
|
|
else:
|
|
param_definitions = parameters_schema
|
|
|
|
# Add default values to the schema
|
|
default_params = metadata.get("default_parameters", {})
|
|
for param_name, param_schema in param_definitions.items():
|
|
if isinstance(param_schema, dict) and param_name in default_params:
|
|
param_schema["default"] = default_params[param_name]
|
|
|
|
return {
|
|
"workflow": workflow_name,
|
|
"parameters": param_definitions,
|
|
"default_parameters": default_params,
|
|
"required_parameters": [
|
|
name for name, schema in param_definitions.items()
|
|
if isinstance(schema, dict) and schema.get("required", False)
|
|
]
|
|
} |
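

# Example response (illustrative; the parameter name and its schema are
# assumptions, not defined by this module -- only the top-level keys follow
# from the return statement above):
#
#   {
#       "workflow": "security_scan",
#       "parameters": {
#           "depth": {"type": "integer", "default": 2}
#       },
#       "default_parameters": {"depth": 2},
#       "required_parameters": []
#   }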