Files
fuzzforge_ai/test_security_workflow.py
Tanguy Duhamel 0680f14df6 feat: Complete migration from Prefect to Temporal
BREAKING CHANGE: Replaces Prefect workflow orchestration with Temporal

## Major Changes
- Replace Prefect with Temporal for workflow orchestration
- Implement vertical worker architecture (rust, android)
- Replace Docker registry with MinIO for unified storage
- Refactor activities to be co-located with workflows
- Update all API endpoints for Temporal compatibility

## Infrastructure
- New: docker-compose.temporal.yaml (Temporal + MinIO + workers)
- New: workers/ directory with rust and android vertical workers
- New: backend/src/temporal/ (manager, discovery)
- New: backend/src/storage/ (S3-cached storage with MinIO)
- New: backend/toolbox/common/ (shared storage activities)
- Deleted: docker-compose.yaml (old Prefect setup)
- Deleted: backend/src/core/prefect_manager.py
- Deleted: backend/src/services/prefect_stats_monitor.py
- Deleted: Docker registry and insecure-registries requirement

## Workflows
- Migrated: security_assessment workflow to Temporal
- New: rust_test workflow (example/test workflow)
- Deleted: secret_detection_scan (Prefect-based, to be reimplemented)
- Activities now co-located with workflows for independent testing

## API Changes
- Updated: backend/src/api/workflows.py (Temporal submission)
- Updated: backend/src/api/runs.py (Temporal status/results)
- Updated: backend/src/main.py (727 lines, TemporalManager integration)
- Updated: All 16 MCP tools to use TemporalManager

## Testing
-  All services healthy (Temporal, PostgreSQL, MinIO, workers, backend)
-  All API endpoints functional
-  End-to-end workflow test passed (72 findings from vulnerable_app)
-  MinIO storage integration working (target upload/download, results)
-  Worker activity discovery working (6 activities registered)
-  Tarball extraction working
-  SARIF report generation working

## Documentation
- ARCHITECTURE.md: Complete Temporal architecture documentation
- QUICKSTART_TEMPORAL.md: Getting started guide
- MIGRATION_DECISION.md: Why we chose Temporal over Prefect
- IMPLEMENTATION_STATUS.md: Migration progress tracking
- workers/README.md: Worker development guide

## Dependencies
- Added: temporalio>=1.6.0
- Added: boto3>=1.34.0 (MinIO S3 client)
- Removed: prefect>=3.4.18
2025-10-01 15:11:24 +02:00

144 lines
4.1 KiB
Python

#!/usr/bin/env python3
"""
Test security_assessment workflow with vulnerable_app test project
"""
import asyncio
import os
import shutil
import sys
import uuid
from pathlib import Path
import boto3
from temporalio.client import Client
async def main():
# Configuration
temporal_address = "localhost:7233"
s3_endpoint = "http://localhost:9000"
s3_access_key = "fuzzforge"
s3_secret_key = "fuzzforge123"
# Initialize S3 client
s3_client = boto3.client(
's3',
endpoint_url=s3_endpoint,
aws_access_key_id=s3_access_key,
aws_secret_access_key=s3_secret_key,
region_name='us-east-1',
use_ssl=False
)
print("=" * 70)
print("Testing security_assessment workflow with vulnerable_app")
print("=" * 70)
# Step 1: Create tarball of vulnerable_app
print("\n[1/5] Creating tarball of test_projects/vulnerable_app...")
vulnerable_app_dir = Path("test_projects/vulnerable_app")
if not vulnerable_app_dir.exists():
print(f"❌ Error: {vulnerable_app_dir} not found")
return 1
target_id = str(uuid.uuid4())
tarball_path = f"/tmp/{target_id}.tar.gz"
# Create tarball
shutil.make_archive(
tarball_path.replace('.tar.gz', ''),
'gztar',
root_dir=vulnerable_app_dir.parent,
base_dir=vulnerable_app_dir.name
)
tarball_size = Path(tarball_path).stat().st_size
print(f"✓ Created tarball: {tarball_path} ({tarball_size / 1024:.2f} KB)")
# Step 2: Upload to MinIO
print(f"\n[2/5] Uploading target to MinIO (target_id={target_id})...")
try:
s3_key = f'{target_id}/target'
s3_client.upload_file(
Filename=tarball_path,
Bucket='targets',
Key=s3_key
)
print(f"✓ Uploaded to s3://targets/{s3_key}")
except Exception as e:
print(f"❌ Failed to upload: {e}")
return 1
finally:
# Cleanup local tarball
Path(tarball_path).unlink(missing_ok=True)
# Step 3: Connect to Temporal
print(f"\n[3/5] Connecting to Temporal at {temporal_address}...")
try:
client = await Client.connect(temporal_address)
print("✓ Connected to Temporal")
except Exception as e:
print(f"❌ Failed to connect to Temporal: {e}")
return 1
# Step 4: Execute workflow
print(f"\n[4/5] Executing security_assessment workflow...")
workflow_id = f"security-assessment-{target_id}"
try:
result = await client.execute_workflow(
"SecurityAssessmentWorkflow",
args=[target_id],
id=workflow_id,
task_queue="rust-queue"
)
print(f"✓ Workflow completed successfully: {workflow_id}")
except Exception as e:
print(f"❌ Workflow execution failed: {e}")
return 1
# Step 5: Display results
print(f"\n[5/5] Results Summary:")
print("=" * 70)
if result.get("status") == "success":
summary = result.get("summary", {})
print(f"Total findings: {summary.get('total_findings', 0)}")
print(f"Files scanned: {summary.get('files_scanned', 0)}")
# Display SARIF results URL if available
if result.get("results_url"):
print(f"Results URL: {result['results_url']}")
# Show workflow steps
print("\nWorkflow steps:")
for step in result.get("steps", []):
status_icon = "" if step["status"] == "success" else ""
print(f" {status_icon} {step['step']}")
print("\n" + "=" * 70)
print("✅ Security assessment workflow test PASSED")
print("=" * 70)
return 0
else:
print(f"❌ Workflow failed: {result.get('error', 'Unknown error')}")
return 1
if __name__ == "__main__":
try:
exit_code = asyncio.run(main())
sys.exit(exit_code)
except KeyboardInterrupt:
print("\n\nTest interrupted by user")
sys.exit(1)
except Exception as e:
print(f"\n❌ Fatal error: {e}")
import traceback
traceback.print_exc()
sys.exit(1)