fuzzforge_ai/test_temporal_workflow.py
Tanguy Duhamel 0680f14df6 feat: Complete migration from Prefect to Temporal
BREAKING CHANGE: Replaces Prefect workflow orchestration with Temporal

## Major Changes
- Replace Prefect with Temporal for workflow orchestration
- Implement vertical worker architecture (rust, android)
- Replace Docker registry with MinIO for unified storage
- Refactor activities to be co-located with workflows
- Update all API endpoints for Temporal compatibility

## Infrastructure
- New: docker-compose.temporal.yaml (Temporal + MinIO + workers)
- New: workers/ directory with rust and android vertical workers
- New: backend/src/temporal/ (manager, discovery)
- New: backend/src/storage/ (S3-cached storage with MinIO)
- New: backend/toolbox/common/ (shared storage activities)
- Deleted: docker-compose.yaml (old Prefect setup)
- Deleted: backend/src/core/prefect_manager.py
- Deleted: backend/src/services/prefect_stats_monitor.py
- Deleted: Docker registry and insecure-registries requirement
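
The "S3-cached storage" idea listed above can be illustrated with a small sketch. This is not the code in backend/src/storage/, which is not shown here; `cached_fetch` and its `download` callback are hypothetical names, and the only detail taken from this page is the `<target_id>/target` key layout used by the test script. The sketch assumes a worker keeps a local copy of each target and only contacts MinIO on a cache miss.

```python
from pathlib import Path
from typing import Callable


def cached_fetch(
    cache_dir: Path,
    target_id: str,
    download: Callable[[str, Path], None],
) -> Path:
    """Return a local path for the target, downloading only on a cache miss.

    `download(key, dest)` is a hypothetical callback that would wrap the
    real S3/MinIO client; it is injected so the caching logic stays testable.
    """
    local = cache_dir / target_id / "target"
    if local.exists():
        # Cache hit: no network round trip.
        return local

    local.parent.mkdir(parents=True, exist_ok=True)
    # Download to a temporary name first so a partial file is never
    # mistaken for a complete cached target.
    partial = local.with_name(local.name + ".part")
    download(f"{target_id}/target", partial)
    partial.replace(local)
    return local
```

With boto3, the callback could plausibly be `lambda key, dest: s3_client.download_file("targets", key, str(dest))`, where the bucket name `targets` is the one used by the test script.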

## Workflows
- Migrated: security_assessment workflow to Temporal
- New: rust_test workflow (example/test workflow)
- Deleted: secret_detection_scan (Prefect-based, to be reimplemented)
- Activities now co-located with workflows for independent testing

## API Changes
- Updated: backend/src/api/workflows.py (Temporal submission)
- Updated: backend/src/api/runs.py (Temporal status/results)
- Updated: backend/src/main.py (727 lines, TemporalManager integration)
- Updated: All 16 MCP tools to use TemporalManager

## Testing
- All services healthy (Temporal, PostgreSQL, MinIO, workers, backend)
- All API endpoints functional
- End-to-end workflow test passed (72 findings from vulnerable_app)
- MinIO storage integration working (target upload/download, results)
- Worker activity discovery working (6 activities registered)
- Tarball extraction working
- SARIF report generation working
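
How the findings count in the testing notes was obtained is not stated. As a rough sketch, a count like this could be read from a SARIF report by summing the `results` arrays across all `runs`, which is the standard SARIF 2.1.0 layout. The function name `count_sarif_findings` is an assumption for illustration and is not part of FuzzForge.

```python
import json


def count_sarif_findings(sarif_text: str) -> int:
    """Count result entries across every run in a SARIF document."""
    doc = json.loads(sarif_text)
    total = 0
    for run in doc.get("runs", []):
        # "results" may be absent or null when a run produced no findings.
        total += len(run.get("results") or [])
    return total
```

For example, a report with two runs holding two and one results respectively would yield 3.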

## Documentation
- ARCHITECTURE.md: Complete Temporal architecture documentation
- QUICKSTART_TEMPORAL.md: Getting started guide
- MIGRATION_DECISION.md: Why we chose Temporal over Prefect
- IMPLEMENTATION_STATUS.md: Migration progress tracking
- workers/README.md: Worker development guide

## Dependencies
- Added: temporalio>=1.6.0
- Added: boto3>=1.34.0 (MinIO S3 client)
- Removed: prefect>=3.4.18
2025-10-01 15:11:24 +02:00

106 lines
3.0 KiB
Python

#!/usr/bin/env python3
"""
Test script for Temporal workflow execution.

This script:
1. Creates a test target file
2. Uploads it to MinIO
3. Executes the rust_test workflow
4. Prints the results
"""

import asyncio
import uuid
from pathlib import Path

import boto3
from temporalio.client import Client


async def main():
    print("=" * 60)
    print("Testing Temporal Workflow Execution")
    print("=" * 60)

    # Step 1: Create a test target file
    print("\n[1/4] Creating test target file...")
    test_file = Path("/tmp/test_target.txt")
    test_file.write_text("This is a test target file for FuzzForge Temporal architecture.")
    print(f"✓ Created test file: {test_file} ({test_file.stat().st_size} bytes)")

    # Step 2: Upload to MinIO
    print("\n[2/4] Uploading target to MinIO...")
    s3_client = boto3.client(
        's3',
        endpoint_url='http://localhost:9000',
        aws_access_key_id='fuzzforge',
        aws_secret_access_key='fuzzforge123',
        region_name='us-east-1',
        use_ssl=False
    )

    # Generate target ID
    target_id = str(uuid.uuid4())
    s3_key = f'{target_id}/target'

    # Upload file
    s3_client.upload_file(
        str(test_file),
        'targets',
        s3_key,
        ExtraArgs={
            'Metadata': {
                'test': 'true',
                'uploaded_by': 'test_script'
            }
        }
    )
    print(f"✓ Uploaded to MinIO: s3://targets/{s3_key}")
    print(f"  Target ID: {target_id}")

    # Step 3: Execute workflow
    print("\n[3/4] Connecting to Temporal...")
    client = await Client.connect("localhost:7233")
    print("✓ Connected to Temporal")

    print("\n[4/4] Starting workflow execution...")
    workflow_id = f"test-workflow-{uuid.uuid4().hex[:8]}"

    # Start workflow
    handle = await client.start_workflow(
        "RustTestWorkflow",       # Workflow name (class name)
        args=[target_id],         # Arguments: target_id
        id=workflow_id,
        task_queue="rust-queue",  # Route to rust worker
    )
    print("✓ Workflow started!")
    print(f"  Workflow ID: {workflow_id}")
    print(f"  Run ID: {handle.first_execution_run_id}")
    print(f"\n  View in UI: http://localhost:8080/namespaces/default/workflows/{workflow_id}")

    # Block until the workflow finishes and returns its result dict
    print("\nWaiting for workflow to complete...")
    result = await handle.result()

    print("\n" + "=" * 60)
    print("✓ WORKFLOW COMPLETED SUCCESSFULLY!")
    print("=" * 60)
    print("\nResults:")
    print(f"  Status: {result.get('status')}")
    print(f"  Workflow ID: {result.get('workflow_id')}")
    print(f"  Target ID: {result.get('target_id')}")
    print(f"  Message: {result.get('message')}")
    print(f"  Results URL: {result.get('results_url')}")

    print("\nSteps executed:")
    for i, step in enumerate(result.get('steps', []), 1):
        print(f"  {i}. {step.get('step')}: {step.get('status')}")

    print("\n" + "=" * 60)
    print("Test completed successfully! 🎉")
    print("=" * 60)


if __name__ == "__main__":
    asyncio.run(main())
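
The script above awaits `handle.result()` with no deadline, so it would presumably wait indefinitely if no worker is polling `rust-queue`. A minimal sketch of bounding that wait with `asyncio.wait_for` follows. `result_with_timeout` and `FakeHandle` are hypothetical names introduced only for illustration; `FakeHandle` stands in for the real Temporal workflow handle so the example runs without a Temporal server.

```python
import asyncio


class FakeHandle:
    """Stand-in for a workflow handle; it only mimics an awaitable result()."""

    def __init__(self, delay_s: float, value: dict):
        self.delay_s = delay_s
        self.value = value

    async def result(self) -> dict:
        # Simulate a workflow that takes delay_s seconds to finish.
        await asyncio.sleep(self.delay_s)
        return self.value


async def result_with_timeout(handle, timeout_s: float):
    """Await handle.result(); return None if it does not finish in time."""
    try:
        return await asyncio.wait_for(handle.result(), timeout=timeout_s)
    except asyncio.TimeoutError:
        # Only the local wait is abandoned here; nothing is sent to the server.
        return None
```

In the test script, this could replace the bare await, for example `result = await result_with_timeout(handle, 300)`, followed by a check for `None` before the results are printed. As far as I understand the client, giving up on the await only stops waiting locally, so the workflow itself would keep running unless it is cancelled or terminated separately.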