Compare commits

...

7 Commits

Author SHA1 Message Date
tduhamel42
3e949b2ae8 ci: add worker validation and Docker build checks
Add automated validation to prevent worker-related issues:

**Worker Validation Script:**
- New script: .github/scripts/validate-workers.sh
- Validates all workers in docker-compose.yml exist
- Checks required files: Dockerfile, requirements.txt, worker.py
- Verifies files are tracked by git (not gitignored)
- Detects gitignore issues that could hide workers

**CI Workflow Updates:**
- Added validate-workers job (runs on every PR)
- Added build-workers job (runs if workers/ modified)
- Uses Docker Buildx for caching
- Validates Docker images build successfully
- Updated test-summary to check validation results

**PR Template:**
- New pull request template with comprehensive checklist
- Specific section for worker-related changes
- Reminds contributors to validate worker files
- Includes documentation and changelog reminders

These checks would have caught the secrets worker gitignore issue.

Implements Phase 1 improvements from CI/CD quality assessment.
2025-10-22 11:45:04 +02:00
tduhamel42
731927667d fix/ Change default llm_secret_detection to gpt-5-mini 2025-10-22 10:17:41 +02:00
tduhamel42
75df59ddef fix: add missing secrets worker to repository
The secrets worker was being ignored due to broad gitignore pattern.
Added exception to allow workers/secrets/ directory while still ignoring actual secrets.

Files added:
- workers/secrets/Dockerfile
- workers/secrets/requirements.txt
- workers/secrets/worker.py
2025-10-22 08:39:20 +02:00
tduhamel42
4e14b4207d Merge pull request #20 from FuzzingLabs/dev
Release: v0.7.1 - Worker fixes, monitor consolidation, and findings improvements
2025-10-21 16:59:44 +02:00
tduhamel42
4cf4a1e5e8 Merge pull request #19 from FuzzingLabs/fix/worker-naming-and-compose-version
fix: worker naming, monitor commands, and findings CLI improvements
2025-10-21 16:54:51 +02:00
tduhamel42
076ec71482 fix: worker naming, monitor commands, and findings CLI improvements
This PR addresses multiple issues and improvements across the CLI and backend:

**Worker Naming Fixes:**
- Fix worker container naming mismatch between CLI and docker-compose
- Update worker_manager.py to use docker compose commands with service names
- Remove worker_container field from workflows API, keep only worker_service
- Backend now correctly uses service names (worker-python, worker-secrets, etc.)

**Backend API Fixes:**
- Fix workflow name extraction from run_id in runs.py (was showing "unknown")
- Update monitor command suggestions from 'monitor stats' to 'monitor live'

**Monitor Command Consolidation:**
- Merge 'monitor stats' and 'monitor live' into single 'monitor live' command
- Add --once and --style flags for flexibility
- Remove all references to deprecated 'monitor stats' command

**Findings CLI Structure Improvements (Closes #18):**
- Move 'show' command from 'findings' (plural) to 'finding' (singular)
- Keep 'export' command in 'findings' (plural) as it exports all findings
- Remove broken 'analyze' command (imported non-existent function)
- Update all command suggestions to use correct paths
- Fix smart routing logic in main.py to handle new command structure
- Add export suggestions after viewing findings with unique timestamps
- Change default export format to SARIF (industry standard)

**Docker Compose:**
- Remove obsolete version field to fix deprecation warning

All commands tested and working:
- ff finding show <run-id> --rule <rule-id> ✓
- ff findings export <run-id> ✓
- ff finding <run-id> (direct viewing) ✓
- ff monitor live <run-id> ✓
2025-10-21 16:53:08 +02:00
tduhamel42
fe58b39abf fix: Add benchmark results files to git
- Added exception in .gitignore for benchmark results directory
- Force-added comparison_report.md and comparison_results.json
- These files contain benchmark metrics, not actual secrets
- Fixes broken link in README to benchmark results
2025-10-17 09:56:09 +02:00
17 changed files with 886 additions and 210 deletions

79
.github/pull_request_template.md vendored Normal file
View File

@@ -0,0 +1,79 @@
## Description
<!-- Provide a brief description of the changes in this PR -->
## Type of Change
<!-- Mark the appropriate option with an 'x' -->
- [ ] 🐛 Bug fix (non-breaking change which fixes an issue)
- [ ] ✨ New feature (non-breaking change which adds functionality)
- [ ] 💥 Breaking change (fix or feature that would cause existing functionality to not work as expected)
- [ ] 📝 Documentation update
- [ ] 🔧 Configuration change
- [ ] ♻️ Refactoring (no functional changes)
- [ ] 🎨 Style/formatting changes
- [ ] ✅ Test additions or updates
## Related Issues
<!-- Link to related issues using #issue_number -->
<!-- Example: Closes #123, Relates to #456 -->
## Changes Made
<!-- List the specific changes made in this PR -->
-
-
-
## Testing
<!-- Describe the tests you ran to verify your changes -->
### Tested Locally
- [ ] All tests pass (`pytest`, `uv build`, etc.)
- [ ] Linting passes (`ruff check`)
- [ ] Code builds successfully
### Worker Changes (if applicable)
- [ ] Docker images build successfully (`docker compose build`)
- [ ] Worker containers start correctly
- [ ] Tested with actual workflow execution
### Documentation
- [ ] Documentation updated (if needed)
- [ ] README updated (if needed)
- [ ] CHANGELOG.md updated (if user-facing changes)
## Pre-Merge Checklist
<!-- Ensure all items are completed before requesting review -->
- [ ] My code follows the project's coding standards
- [ ] I have performed a self-review of my code
- [ ] I have commented my code, particularly in hard-to-understand areas
- [ ] I have made corresponding changes to the documentation
- [ ] My changes generate no new warnings
- [ ] I have added tests that prove my fix is effective or that my feature works
- [ ] New and existing unit tests pass locally with my changes
- [ ] Any dependent changes have been merged and published
### Worker-Specific Checks (if workers/ modified)
- [ ] All worker files properly tracked by git (not gitignored)
- [ ] Worker validation script passes (`.github/scripts/validate-workers.sh`)
- [ ] Docker images build without errors
- [ ] Worker configuration updated in `docker-compose.yml` (if needed)
## Screenshots (if applicable)
<!-- Add screenshots to help explain your changes -->
## Additional Notes
<!-- Any additional information that reviewers should know -->

95
.github/scripts/validate-workers.sh vendored Executable file
View File

@@ -0,0 +1,95 @@
#!/bin/bash
# Worker Validation Script
# Ensures all workers defined in docker-compose.yml exist in the repository
# and are properly tracked by git.
set -e
echo "🔍 Validating worker completeness..."
# Colors for output
RED='\033[0;31m'
GREEN='\033[0;32m'
YELLOW='\033[1;33m'
NC='\033[0m' # No Color
ERRORS=0
WARNINGS=0
# Extract worker service names from docker-compose.yml
echo ""
echo "📋 Checking workers defined in docker-compose.yml..."
WORKERS=$(grep -E "^\s+worker-" docker-compose.yml | grep -v "#" | cut -d: -f1 | tr -d ' ' | sort -u)
if [ -z "$WORKERS" ]; then
echo -e "${RED}❌ No workers found in docker-compose.yml${NC}"
exit 1
fi
echo "Found workers:"
for worker in $WORKERS; do
echo " - $worker"
done
# Check each worker
echo ""
echo "🔎 Validating worker files..."
for worker in $WORKERS; do
WORKER_DIR="workers/${worker#worker-}"
echo ""
echo "Checking $worker ($WORKER_DIR)..."
# Check if directory exists
if [ ! -d "$WORKER_DIR" ]; then
echo -e "${RED} ❌ Directory not found: $WORKER_DIR${NC}"
ERRORS=$((ERRORS + 1))
continue
fi
# Check required files
REQUIRED_FILES=("Dockerfile" "requirements.txt" "worker.py")
for file in "${REQUIRED_FILES[@]}"; do
FILE_PATH="$WORKER_DIR/$file"
if [ ! -f "$FILE_PATH" ]; then
echo -e "${RED} ❌ Missing file: $FILE_PATH${NC}"
ERRORS=$((ERRORS + 1))
else
# Check if file is tracked by git
if ! git ls-files --error-unmatch "$FILE_PATH" &> /dev/null; then
echo -e "${RED} ❌ File not tracked by git: $FILE_PATH${NC}"
echo -e "${YELLOW} Check .gitignore patterns!${NC}"
ERRORS=$((ERRORS + 1))
else
echo -e "${GREEN}$file (tracked)${NC}"
fi
fi
done
done
# Check for any ignored worker files
echo ""
echo "🚫 Checking for gitignored worker files..."
IGNORED_FILES=$(git check-ignore workers/*/* 2>/dev/null || true)
if [ -n "$IGNORED_FILES" ]; then
echo -e "${YELLOW}⚠️ Warning: Some worker files are being ignored:${NC}"
echo "$IGNORED_FILES" | while read -r file; do
echo -e "${YELLOW} - $file${NC}"
done
WARNINGS=$((WARNINGS + 1))
fi
# Summary
echo ""
echo "━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━"
if [ $ERRORS -eq 0 ] && [ $WARNINGS -eq 0 ]; then
echo -e "${GREEN}✅ All workers validated successfully!${NC}"
exit 0
elif [ $ERRORS -eq 0 ]; then
echo -e "${YELLOW}⚠️ Validation passed with $WARNINGS warning(s)${NC}"
exit 0
else
echo -e "${RED}❌ Validation failed with $ERRORS error(s) and $WARNINGS warning(s)${NC}"
exit 1
fi

View File

@@ -7,6 +7,36 @@ on:
branches: [ main, master, develop ]
jobs:
validate-workers:
name: Validate Workers
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v4
- name: Run worker validation
run: |
chmod +x .github/scripts/validate-workers.sh
.github/scripts/validate-workers.sh
build-workers:
name: Build Worker Docker Images
runs-on: ubuntu-latest
# Only run if workers directory is modified
if: |
github.event_name == 'pull_request' &&
contains(github.event.pull_request.changed_files, 'workers/')
steps:
- uses: actions/checkout@v4
- name: Set up Docker Buildx
uses: docker/setup-buildx-action@v3
- name: Build worker images
run: |
echo "Building worker Docker images..."
docker compose build worker-python worker-secrets worker-rust worker-android worker-ossfuzz --no-cache
continue-on-error: false
lint:
name: Lint
runs-on: ubuntu-latest
@@ -143,11 +173,15 @@ jobs:
test-summary:
name: Test Summary
runs-on: ubuntu-latest
needs: [lint, unit-tests]
needs: [validate-workers, lint, unit-tests]
if: always()
steps:
- name: Check test results
run: |
if [ "${{ needs.validate-workers.result }}" != "success" ]; then
echo "Worker validation failed"
exit 1
fi
if [ "${{ needs.unit-tests.result }}" != "success" ]; then
echo "Unit tests failed"
exit 1

4
.gitignore vendored
View File

@@ -240,6 +240,10 @@ yarn-error.log*
!**/secret_detection_benchmark_GROUND_TRUTH.json
!**/secret_detection/results/
# Exception: Allow workers/secrets/ directory (secrets detection worker)
!workers/secrets/
!workers/secrets/**
secret*
secrets/
credentials*

View File

@@ -55,9 +55,12 @@ async def get_run_status(
is_failed = workflow_status == "FAILED"
is_running = workflow_status == "RUNNING"
# Extract workflow name from run_id (format: workflow_name-unique_id)
workflow_name = run_id.rsplit('-', 1)[0] if '-' in run_id else "unknown"
return WorkflowStatus(
run_id=run_id,
workflow="unknown", # Temporal doesn't track workflow name in status
workflow=workflow_name,
status=workflow_status,
is_completed=is_completed,
is_failed=is_failed,
@@ -123,6 +126,9 @@ async def get_run_findings(
else:
sarif = {}
# Extract workflow name from run_id (format: workflow_name-unique_id)
workflow_name = run_id.rsplit('-', 1)[0] if '-' in run_id else "unknown"
# Metadata
metadata = {
"completion_time": status.get("close_time"),
@@ -130,7 +136,7 @@ async def get_run_findings(
}
return WorkflowFindings(
workflow="unknown",
workflow=workflow_name,
run_id=run_id,
sarif=sarif,
metadata=metadata

View File

@@ -566,7 +566,6 @@ async def get_workflow_worker_info(
return {
"workflow": workflow_name,
"vertical": vertical,
"worker_container": f"fuzzforge-worker-{vertical}",
"worker_service": f"worker-{vertical}",
"task_queue": f"{vertical}-queue",
"required": True

View File

@@ -17,22 +17,22 @@ parameters:
agent_url:
type: string
default: "http://fuzzforge-task-agent:8000/a2a/litellm_agent"
llm_model:
type: string
default: "gpt-4o-mini"
default: "gpt-5-mini"
llm_provider:
type: string
default: "openai"
max_files:
type: integer
default: 20
default_parameters:
agent_url: "http://fuzzforge-task-agent:8000/a2a/litellm_agent"
llm_model: "gpt-4o-mini"
llm_model: "gpt-5-mini"
llm_provider: "openai"
max_files: 20

View File

@@ -140,11 +140,145 @@ def get_findings(
else: # table format
display_findings_table(findings.sarif)
# Suggest export command and show command
console.print(f"\n💡 View full details of a finding: [bold cyan]ff finding show {run_id} --rule <rule-id>[/bold cyan]")
console.print(f"💡 Export these findings: [bold cyan]ff findings export {run_id} --format sarif[/bold cyan]")
console.print(" Supported formats: [cyan]sarif[/cyan] (standard), [cyan]json[/cyan], [cyan]csv[/cyan], [cyan]html[/cyan]")
except Exception as e:
console.print(f"❌ Failed to get findings: {e}", style="red")
raise typer.Exit(1)
def show_finding(
run_id: str = typer.Argument(..., help="Run ID to get finding from"),
rule_id: str = typer.Option(..., "--rule", "-r", help="Rule ID of the specific finding to show")
):
"""
🔍 Show detailed information about a specific finding
This function is registered as a command in main.py under the finding (singular) command group.
"""
try:
require_project()
validate_run_id(run_id)
# Try to get from database first, fallback to API
db = get_project_db()
findings_data = None
if db:
findings_data = db.get_findings(run_id)
if not findings_data:
with get_client() as client:
console.print(f"🔍 Fetching findings for run: {run_id}")
findings = client.get_run_findings(run_id)
sarif_data = findings.sarif
else:
sarif_data = findings_data.sarif_data
# Find the specific finding by rule_id
runs = sarif_data.get("runs", [])
if not runs:
console.print("❌ No findings data available", style="red")
raise typer.Exit(1)
run_data = runs[0]
results = run_data.get("results", [])
tool = run_data.get("tool", {}).get("driver", {})
# Search for matching finding
matching_finding = None
for result in results:
if result.get("ruleId") == rule_id:
matching_finding = result
break
if not matching_finding:
console.print(f"❌ No finding found with rule ID: {rule_id}", style="red")
console.print(f"💡 Use [bold cyan]ff findings get {run_id}[/bold cyan] to see all findings", style="dim")
raise typer.Exit(1)
# Display detailed finding
display_finding_detail(matching_finding, tool, run_id)
except Exception as e:
console.print(f"❌ Failed to get finding: {e}", style="red")
raise typer.Exit(1)
def display_finding_detail(finding: Dict[str, Any], tool: Dict[str, Any], run_id: str):
"""Display detailed information about a single finding"""
rule_id = finding.get("ruleId", "unknown")
level = finding.get("level", "note")
message = finding.get("message", {})
message_text = message.get("text", "No summary available")
message_markdown = message.get("markdown", message_text)
# Get location
locations = finding.get("locations", [])
location_str = "Unknown location"
code_snippet = None
if locations:
physical_location = locations[0].get("physicalLocation", {})
artifact_location = physical_location.get("artifactLocation", {})
region = physical_location.get("region", {})
file_path = artifact_location.get("uri", "")
if file_path:
location_str = file_path
if region.get("startLine"):
location_str += f":{region['startLine']}"
if region.get("startColumn"):
location_str += f":{region['startColumn']}"
# Get code snippet if available
if region.get("snippet", {}).get("text"):
code_snippet = region["snippet"]["text"].strip()
# Get severity style
severity_color = {
"error": "red",
"warning": "yellow",
"note": "blue",
"info": "cyan"
}.get(level.lower(), "white")
# Build detailed content
content_lines = []
content_lines.append(f"[bold]Rule ID:[/bold] {rule_id}")
content_lines.append(f"[bold]Severity:[/bold] [{severity_color}]{level.upper()}[/{severity_color}]")
content_lines.append(f"[bold]Location:[/bold] {location_str}")
content_lines.append(f"[bold]Tool:[/bold] {tool.get('name', 'Unknown')} v{tool.get('version', 'unknown')}")
content_lines.append(f"[bold]Run ID:[/bold] {run_id}")
content_lines.append("")
content_lines.append(f"[bold]Summary:[/bold]")
content_lines.append(message_text)
content_lines.append("")
content_lines.append(f"[bold]Description:[/bold]")
content_lines.append(message_markdown)
if code_snippet:
content_lines.append("")
content_lines.append(f"[bold]Code Snippet:[/bold]")
content_lines.append(f"[dim]{code_snippet}[/dim]")
content = "\n".join(content_lines)
# Display in panel
console.print()
console.print(Panel(
content,
title=f"🔍 Finding Detail",
border_style=severity_color,
box=box.ROUNDED,
padding=(1, 2)
))
console.print()
console.print(f"💡 Export this run: [bold cyan]ff findings export {run_id} --format sarif[/bold cyan]")
def display_findings_table(sarif_data: Dict[str, Any]):
"""Display SARIF findings in a rich table format"""
runs = sarif_data.get("runs", [])
@@ -195,8 +329,8 @@ def display_findings_table(sarif_data: Dict[str, Any]):
# Detailed results - Rich Text-based table with proper emoji alignment
results_table = Table(box=box.ROUNDED)
results_table.add_column("Severity", width=12, justify="left", no_wrap=True)
results_table.add_column("Rule", width=25, justify="left", style="bold cyan", no_wrap=True)
results_table.add_column("Message", width=55, justify="left", no_wrap=True)
results_table.add_column("Rule", justify="left", style="bold cyan", no_wrap=True)
results_table.add_column("Message", width=45, justify="left", no_wrap=True)
results_table.add_column("Location", width=20, justify="left", style="dim", no_wrap=True)
for result in results[:50]: # Limit to first 50 results
@@ -224,18 +358,16 @@ def display_findings_table(sarif_data: Dict[str, Any]):
severity_text = Text(level.upper(), style=severity_style(level))
severity_text.truncate(12, overflow="ellipsis")
rule_text = Text(rule_id)
rule_text.truncate(25, overflow="ellipsis")
# Show full rule ID without truncation
message_text = Text(message)
message_text.truncate(55, overflow="ellipsis")
message_text.truncate(45, overflow="ellipsis")
location_text = Text(location_str)
location_text.truncate(20, overflow="ellipsis")
results_table.add_row(
severity_text,
rule_text,
rule_id, # Pass string directly to show full UUID
message_text,
location_text
)
@@ -307,16 +439,20 @@ def findings_history(
def export_findings(
run_id: str = typer.Argument(..., help="Run ID to export findings for"),
format: str = typer.Option(
"json", "--format", "-f",
help="Export format: json, csv, html, sarif"
"sarif", "--format", "-f",
help="Export format: sarif (standard), json, csv, html"
),
output: Optional[str] = typer.Option(
None, "--output", "-o",
help="Output file path (defaults to findings-<run-id>.<format>)"
help="Output file path (defaults to findings-<run-id>-<timestamp>.<format>)"
)
):
"""
📤 Export security findings in various formats
SARIF is the standard format for security findings and is recommended
for interoperability with other security tools. Filenames are automatically
made unique with timestamps to prevent overwriting previous exports.
"""
db = get_project_db()
if not db:
@@ -334,9 +470,10 @@ def export_findings(
else:
sarif_data = findings_data.sarif_data
# Generate output filename
# Generate output filename with timestamp for uniqueness
if not output:
output = f"findings-{run_id[:8]}.{format}"
timestamp = datetime.now().strftime("%Y%m%d-%H%M%S")
output = f"findings-{run_id[:8]}-{timestamp}.{format}"
output_path = Path(output)

View File

@@ -59,66 +59,6 @@ def format_number(num: int) -> str:
return str(num)
@app.command("stats")
def fuzzing_stats(
run_id: str = typer.Argument(..., help="Run ID to get statistics for"),
refresh: int = typer.Option(
5, "--refresh", "-r",
help="Refresh interval in seconds"
),
once: bool = typer.Option(
False, "--once",
help="Show stats once and exit"
)
):
"""
📊 Show current fuzzing statistics for a run
"""
try:
with get_client() as client:
if once:
# Show stats once
stats = client.get_fuzzing_stats(run_id)
display_stats_table(stats)
else:
# Live updating stats
console.print(f"📊 [bold]Live Fuzzing Statistics[/bold] (Run: {run_id[:12]}...)")
console.print(f"Refreshing every {refresh}s. Press Ctrl+C to stop.\n")
with Live(auto_refresh=False, console=console) as live:
while True:
try:
# Check workflow status
run_status = client.get_run_status(run_id)
stats = client.get_fuzzing_stats(run_id)
table = create_stats_table(stats)
live.update(table, refresh=True)
# Exit if workflow completed or failed
if getattr(run_status, 'is_completed', False) or getattr(run_status, 'is_failed', False):
final_status = getattr(run_status, 'status', 'Unknown')
if getattr(run_status, 'is_completed', False):
console.print("\n✅ [bold green]Workflow completed[/bold green]", style="green")
else:
console.print(f"\n⚠️ [bold yellow]Workflow ended[/bold yellow] | Status: {final_status}", style="yellow")
break
time.sleep(refresh)
except KeyboardInterrupt:
console.print("\n📊 Monitoring stopped", style="yellow")
break
except Exception as e:
console.print(f"❌ Failed to get fuzzing stats: {e}", style="red")
raise typer.Exit(1)
def display_stats_table(stats):
"""Display stats in a simple table"""
table = create_stats_table(stats)
console.print(table)
def create_stats_table(stats) -> Panel:
"""Create a rich table for fuzzing statistics"""
# Create main stats table
@@ -266,8 +206,8 @@ def crash_reports(
raise typer.Exit(1)
def _live_monitor(run_id: str, refresh: int):
"""Helper for live monitoring with inline real-time display"""
def _live_monitor(run_id: str, refresh: int, once: bool = False, style: str = "inline"):
"""Helper for live monitoring with inline real-time display or table display"""
with get_client() as client:
start_time = time.time()
@@ -319,16 +259,29 @@ def _live_monitor(run_id: str, refresh: int):
self.elapsed_time = 0
self.last_crash_time = None
with Live(auto_refresh=False, console=console) as live:
# Initial fetch
try:
run_status = client.get_run_status(run_id)
stats = client.get_fuzzing_stats(run_id)
except Exception:
stats = FallbackStats(run_id)
run_status = type("RS", (), {"status":"Unknown","is_completed":False,"is_failed":False})()
# Initial fetch
try:
run_status = client.get_run_status(run_id)
stats = client.get_fuzzing_stats(run_id)
except Exception:
stats = FallbackStats(run_id)
run_status = type("RS", (), {"status":"Unknown","is_completed":False,"is_failed":False})()
live.update(render_inline_stats(run_status, stats), refresh=True)
# Handle --once mode: show stats once and exit
if once:
if style == "table":
console.print(create_stats_table(stats))
else:
console.print(render_inline_stats(run_status, stats))
return
# Live monitoring mode
with Live(auto_refresh=False, console=console) as live:
# Render based on style
if style == "table":
live.update(create_stats_table(stats), refresh=True)
else:
live.update(render_inline_stats(run_status, stats), refresh=True)
# Polling loop
consecutive_errors = 0
@@ -354,8 +307,11 @@ def _live_monitor(run_id: str, refresh: int):
except Exception:
stats = FallbackStats(run_id)
# Update display
live.update(render_inline_stats(run_status, stats), refresh=True)
# Update display based on style
if style == "table":
live.update(create_stats_table(stats), refresh=True)
else:
live.update(render_inline_stats(run_status, stats), refresh=True)
# Check if completed
if getattr(run_status, 'is_completed', False) or getattr(run_status, 'is_failed', False):
@@ -386,17 +342,36 @@ def live_monitor(
refresh: int = typer.Option(
2, "--refresh", "-r",
help="Refresh interval in seconds"
),
once: bool = typer.Option(
False, "--once",
help="Show stats once and exit"
),
style: str = typer.Option(
"inline", "--style",
help="Display style: 'inline' (default) or 'table'"
)
):
"""
📺 Real-time inline monitoring with live statistics updates
📺 Real-time monitoring with live statistics updates
Display styles:
- inline: Visual inline display with emojis (default)
- table: Clean table-based display
Use --once to show stats once without continuous monitoring (useful for scripts)
"""
try:
_live_monitor(run_id, refresh)
# Validate style
if style not in ["inline", "table"]:
console.print(f"❌ Invalid style: {style}. Must be 'inline' or 'table'", style="red")
raise typer.Exit(1)
_live_monitor(run_id, refresh, once, style)
except KeyboardInterrupt:
console.print("\n\n📊 Monitoring stopped by user.", style="yellow")
except Exception as e:
console.print(f"\n❌ Failed to start live monitoring: {e}", style="red")
console.print(f"\n❌ Failed to start monitoring: {e}", style="red")
raise typer.Exit(1)
@@ -423,6 +398,5 @@ def monitor_callback(ctx: typer.Context):
console = Console()
console.print("📊 [bold cyan]Monitor Command[/bold cyan]")
console.print("\nAvailable subcommands:")
console.print(" • [cyan]ff monitor stats <run-id>[/cyan] - Show execution statistics")
console.print(" • [cyan]ff monitor live <run-id>[/cyan] - Monitor with live updates (supports --once, --style)")
console.print(" • [cyan]ff monitor crashes <run-id>[/cyan] - Show crash reports")
console.print(" • [cyan]ff monitor live <run-id>[/cyan] - Real-time inline monitoring")

View File

@@ -365,7 +365,7 @@ def execute_workflow(
should_auto_start = auto_start if auto_start is not None else config.workers.auto_start_workers
should_auto_stop = auto_stop if auto_stop is not None else config.workers.auto_stop_workers
worker_container = None # Track for cleanup
worker_service = None # Track for cleanup
worker_mgr = None
wait_completed = False # Track if wait completed successfully
@@ -384,7 +384,6 @@ def execute_workflow(
)
# Ensure worker is running
worker_container = worker_info["worker_container"]
worker_service = worker_info.get("worker_service", f"worker-{worker_info['vertical']}")
if not worker_mgr.ensure_worker_running(worker_info, auto_start=should_auto_start):
console.print(
@@ -434,7 +433,7 @@ def execute_workflow(
# Don't fail the whole operation if database save fails
console.print(f"⚠️ Failed to save execution to database: {e}", style="yellow")
console.print(f"\n💡 Monitor progress: [bold cyan]fuzzforge monitor stats {response.run_id}[/bold cyan]")
console.print(f"\n💡 Monitor progress: [bold cyan]fuzzforge monitor live {response.run_id}[/bold cyan]")
console.print(f"💡 Check status: [bold cyan]fuzzforge workflow status {response.run_id}[/bold cyan]")
# Suggest --live for fuzzing workflows
@@ -461,7 +460,7 @@ def execute_workflow(
console.print("\n⏹️ Live monitoring stopped (execution continues in background)", style="yellow")
except Exception as e:
console.print(f"⚠️ Failed to start live monitoring: {e}", style="yellow")
console.print(f"💡 You can still monitor manually: [bold cyan]fuzzforge monitor {response.run_id}[/bold cyan]")
console.print(f"💡 You can still monitor manually: [bold cyan]fuzzforge monitor live {response.run_id}[/bold cyan]")
# Wait for completion if requested
elif wait:
@@ -527,11 +526,11 @@ def execute_workflow(
handle_error(e, "executing workflow")
finally:
# Stop worker if auto-stop is enabled and wait completed
if should_auto_stop and worker_container and worker_mgr and wait_completed:
if should_auto_stop and worker_service and worker_mgr and wait_completed:
try:
console.print("\n🛑 Stopping worker (auto-stop enabled)...")
if worker_mgr.stop_worker(worker_container):
console.print(f"✅ Worker stopped: {worker_container}")
if worker_mgr.stop_worker(worker_service):
console.print(f"✅ Worker stopped: {worker_service}")
except Exception as e:
console.print(
f"⚠️ Failed to stop worker: {e}",
@@ -608,7 +607,7 @@ def workflow_status(
# Show next steps
if status.is_running:
console.print(f"\n💡 Monitor live: [bold cyan]fuzzforge monitor {execution_id}[/bold cyan]")
console.print(f"\n💡 Monitor live: [bold cyan]fuzzforge monitor live {execution_id}[/bold cyan]")
elif status.is_completed:
console.print(f"💡 View findings: [bold cyan]fuzzforge finding {execution_id}[/bold cyan]")
elif status.is_failed:
@@ -770,7 +769,7 @@ def retry_workflow(
except Exception as e:
console.print(f"⚠️ Failed to save execution to database: {e}", style="yellow")
console.print(f"\n💡 Monitor progress: [bold cyan]fuzzforge monitor stats {response.run_id}[/bold cyan]")
console.print(f"\n💡 Monitor progress: [bold cyan]fuzzforge monitor live {response.run_id}[/bold cyan]")
except Exception as e:
handle_error(e, "retrying workflow")

View File

@@ -260,57 +260,17 @@ def workflow_main():
# === Finding commands (singular) ===
@finding_app.command("export")
def export_finding(
execution_id: Optional[str] = typer.Argument(None, help="Execution ID (defaults to latest)"),
format: str = typer.Option(
"sarif", "--format", "-f",
help="Export format: sarif, json, csv"
),
output: Optional[str] = typer.Option(
None, "--output", "-o",
help="Output file (defaults to stdout)"
)
@finding_app.command("show")
def show_finding_detail(
run_id: str = typer.Argument(..., help="Run ID to get finding from"),
rule_id: str = typer.Option(..., "--rule", "-r", help="Rule ID of the specific finding to show")
):
"""
📤 Export findings to file
🔍 Show detailed information about a specific finding
"""
from .commands.findings import export_findings
from .database import get_project_db
from .exceptions import require_project
from .commands.findings import show_finding
show_finding(run_id=run_id, rule_id=rule_id)
try:
require_project()
# If no ID provided, get the latest
if not execution_id:
db = get_project_db()
if db:
recent_runs = db.list_runs(limit=1)
if recent_runs:
execution_id = recent_runs[0].run_id
console.print(f"🔍 Using most recent execution: {execution_id}")
else:
console.print("⚠️ No findings found in project database", style="yellow")
return
else:
console.print("❌ No project database found", style="red")
return
export_findings(run_id=execution_id, format=format, output=output)
except Exception as e:
console.print(f"❌ Failed to export findings: {e}", style="red")
@finding_app.command("analyze")
def analyze_finding(
finding_id: Optional[str] = typer.Argument(None, help="Finding ID to analyze")
):
"""
🤖 AI analysis of a finding
"""
from .commands.ai import analyze_finding as ai_analyze
ai_analyze(finding_id)
@finding_app.callback(invoke_without_command=True)
def finding_main(
@@ -320,9 +280,9 @@ def finding_main(
View and analyze individual findings
Examples:
fuzzforge finding # Show latest finding
fuzzforge finding <id> # Show specific finding
fuzzforge finding export # Export latest findings
fuzzforge finding # Show latest finding
fuzzforge finding <id> # Show specific finding
fuzzforge finding show <run-id> --rule <id> # Show specific finding detail
"""
# Check if a subcommand is being invoked
if ctx.invoked_subcommand is not None:
@@ -418,7 +378,7 @@ def main():
# Handle finding command with pattern recognition
if len(args) >= 2 and args[0] == 'finding':
finding_subcommands = ['export', 'analyze']
finding_subcommands = ['show']
# Skip custom dispatching if help flags are present
if not any(arg in ['--help', '-h', '--version', '-v'] for arg in args):
if args[1] not in finding_subcommands:

View File

@@ -95,17 +95,30 @@ class WorkerManager:
check=True
)
def is_worker_running(self, container_name: str) -> bool:
def _service_to_container_name(self, service_name: str) -> str:
"""
Check if a worker container is running.
Convert service name to container name based on docker-compose naming convention.
Args:
container_name: Name of the Docker container (e.g., "fuzzforge-worker-ossfuzz")
service_name: Docker Compose service name (e.g., "worker-python")
Returns:
Container name (e.g., "fuzzforge-worker-python")
"""
return f"fuzzforge-{service_name}"
def is_worker_running(self, service_name: str) -> bool:
"""
Check if a worker service is running.
Args:
service_name: Name of the Docker Compose service (e.g., "worker-ossfuzz")
Returns:
True if container is running, False otherwise
"""
try:
container_name = self._service_to_container_name(service_name)
result = subprocess.run(
["docker", "inspect", "-f", "{{.State.Running}}", container_name],
capture_output=True,
@@ -120,46 +133,42 @@ class WorkerManager:
logger.debug(f"Failed to check worker status: {e}")
return False
def start_worker(self, container_name: str) -> bool:
def start_worker(self, service_name: str) -> bool:
"""
Start a worker container using docker.
Start a worker service using docker-compose.
Args:
container_name: Name of the Docker container to start
service_name: Name of the Docker Compose service to start (e.g., "worker-python")
Returns:
True if started successfully, False otherwise
"""
try:
console.print(f"🚀 Starting worker: {container_name}")
console.print(f"🚀 Starting worker: {service_name}")
# Use docker start directly (works with container name)
subprocess.run(
["docker", "start", container_name],
capture_output=True,
text=True,
check=True
)
# Use docker-compose up to create and start the service
result = self._run_docker_compose("up", "-d", service_name)
logger.info(f"Worker {container_name} started")
logger.info(f"Worker {service_name} started")
return True
except subprocess.CalledProcessError as e:
logger.error(f"Failed to start worker {container_name}: {e.stderr}")
logger.error(f"Failed to start worker {service_name}: {e.stderr}")
console.print(f"❌ Failed to start worker: {e.stderr}", style="red")
console.print(f"💡 Start the worker manually: docker compose up -d {service_name}", style="yellow")
return False
except Exception as e:
logger.error(f"Unexpected error starting worker {container_name}: {e}")
logger.error(f"Unexpected error starting worker {service_name}: {e}")
console.print(f"❌ Unexpected error: {e}", style="red")
return False
def wait_for_worker_ready(self, container_name: str, timeout: Optional[int] = None) -> bool:
def wait_for_worker_ready(self, service_name: str, timeout: Optional[int] = None) -> bool:
"""
Wait for a worker to be healthy and ready to process tasks.
Args:
container_name: Name of the Docker container
service_name: Name of the Docker Compose service
timeout: Maximum seconds to wait (uses instance default if not specified)
Returns:
@@ -170,13 +179,14 @@ class WorkerManager:
"""
timeout = timeout or self.startup_timeout
start_time = time.time()
container_name = self._service_to_container_name(service_name)
console.print("⏳ Waiting for worker to be ready...")
while time.time() - start_time < timeout:
# Check if container is running
if not self.is_worker_running(container_name):
logger.debug(f"Worker {container_name} not running yet")
if not self.is_worker_running(service_name):
logger.debug(f"Worker {service_name} not running yet")
time.sleep(self.health_check_interval)
continue
@@ -193,16 +203,16 @@ class WorkerManager:
# If no health check is defined, assume healthy after running
if health_status == "<no value>" or health_status == "":
logger.info(f"Worker {container_name} is running (no health check)")
console.print(f"✅ Worker ready: {container_name}")
logger.info(f"Worker {service_name} is running (no health check)")
console.print(f"✅ Worker ready: {service_name}")
return True
if health_status == "healthy":
logger.info(f"Worker {container_name} is healthy")
console.print(f"✅ Worker ready: {container_name}")
logger.info(f"Worker {service_name} is healthy")
console.print(f"✅ Worker ready: {service_name}")
return True
logger.debug(f"Worker {container_name} health: {health_status}")
logger.debug(f"Worker {service_name} health: {health_status}")
except Exception as e:
logger.debug(f"Failed to check health: {e}")
@@ -210,41 +220,36 @@ class WorkerManager:
time.sleep(self.health_check_interval)
elapsed = time.time() - start_time
logger.warning(f"Worker {container_name} did not become ready within {elapsed:.1f}s")
logger.warning(f"Worker {service_name} did not become ready within {elapsed:.1f}s")
console.print(f"⚠️ Worker startup timeout after {elapsed:.1f}s", style="yellow")
return False
def stop_worker(self, container_name: str) -> bool:
def stop_worker(self, service_name: str) -> bool:
"""
Stop a worker container using docker.
Stop a worker service using docker-compose.
Args:
container_name: Name of the Docker container to stop
service_name: Name of the Docker Compose service to stop
Returns:
True if stopped successfully, False otherwise
"""
try:
console.print(f"🛑 Stopping worker: {container_name}")
console.print(f"🛑 Stopping worker: {service_name}")
# Use docker stop directly (works with container name)
subprocess.run(
["docker", "stop", container_name],
capture_output=True,
text=True,
check=True
)
# Use docker-compose down to stop and remove the service
result = self._run_docker_compose("stop", service_name)
logger.info(f"Worker {container_name} stopped")
logger.info(f"Worker {service_name} stopped")
return True
except subprocess.CalledProcessError as e:
logger.error(f"Failed to stop worker {container_name}: {e.stderr}")
logger.error(f"Failed to stop worker {service_name}: {e.stderr}")
console.print(f"❌ Failed to stop worker: {e.stderr}", style="red")
return False
except Exception as e:
logger.error(f"Unexpected error stopping worker {container_name}: {e}")
logger.error(f"Unexpected error stopping worker {service_name}: {e}")
console.print(f"❌ Unexpected error: {e}", style="red")
return False
@@ -257,17 +262,18 @@ class WorkerManager:
Ensure a worker is running, starting it if necessary.
Args:
worker_info: Worker information dict from API (contains worker_container, etc.)
worker_info: Worker information dict from API (contains worker_service, etc.)
auto_start: Whether to automatically start the worker if not running
Returns:
True if worker is running, False otherwise
"""
container_name = worker_info["worker_container"]
# Get worker_service (docker-compose service name)
service_name = worker_info.get("worker_service", f"worker-{worker_info['vertical']}")
vertical = worker_info["vertical"]
# Check if already running
if self.is_worker_running(container_name):
if self.is_worker_running(service_name):
console.print(f"✓ Worker already running: {vertical}")
return True
@@ -279,8 +285,8 @@ class WorkerManager:
return False
# Start the worker
if not self.start_worker(container_name):
if not self.start_worker(service_name):
return False
# Wait for it to be ready
return self.wait_for_worker_ready(container_name)
return self.wait_for_worker_ready(service_name)

6
cli/uv.lock generated
View File

@@ -1257,7 +1257,7 @@ wheels = [
[[package]]
name = "fuzzforge-ai"
version = "0.6.0"
version = "0.7.0"
source = { editable = "../ai" }
dependencies = [
{ name = "a2a-sdk" },
@@ -1303,7 +1303,7 @@ dev = [
[[package]]
name = "fuzzforge-cli"
version = "0.6.0"
version = "0.7.0"
source = { editable = "." }
dependencies = [
{ name = "fuzzforge-ai" },
@@ -1347,7 +1347,7 @@ provides-extras = ["dev"]
[[package]]
name = "fuzzforge-sdk"
version = "0.6.0"
version = "0.7.0"
source = { editable = "../sdk" }
dependencies = [
{ name = "httpx" },

View File

@@ -9,8 +9,6 @@
# Development: docker-compose -f docker-compose.temporal.yaml up
# Production: docker-compose -f docker-compose.temporal.yaml -f docker-compose.temporal.prod.yaml up
version: '3.8'
services:
# ============================================================================
# Temporal Server - Workflow Orchestration

View File

@@ -0,0 +1,61 @@
# FuzzForge Vertical Worker: Secret Detection
#
# Pre-installed tools for secret detection:
# - Gitleaks v8.18.0
# - TruffleHog v3.63.2
# - Temporal worker
FROM python:3.11-slim
# Set working directory
WORKDIR /app
# Install system dependencies
RUN apt-get update && apt-get install -y \
# Build essentials
build-essential \
# Development tools
git \
curl \
wget \
# Cleanup
&& rm -rf /var/lib/apt/lists/*
# Install Gitleaks v8.18.0
RUN wget -q https://github.com/gitleaks/gitleaks/releases/download/v8.18.0/gitleaks_8.18.0_linux_x64.tar.gz && \
tar -xzf gitleaks_8.18.0_linux_x64.tar.gz && \
mv gitleaks /usr/local/bin/ && \
chmod +x /usr/local/bin/gitleaks && \
rm gitleaks_8.18.0_linux_x64.tar.gz
# Install TruffleHog v3.63.2
RUN wget -q https://github.com/trufflesecurity/trufflehog/releases/download/v3.63.2/trufflehog_3.63.2_linux_amd64.tar.gz && \
tar -xzf trufflehog_3.63.2_linux_amd64.tar.gz && \
mv trufflehog /usr/local/bin/ && \
chmod +x /usr/local/bin/trufflehog && \
rm trufflehog_3.63.2_linux_amd64.tar.gz
# Verify installations
RUN gitleaks version && trufflehog --version
# Install Python dependencies for Temporal worker
COPY requirements.txt /tmp/requirements.txt
RUN pip3 install --no-cache-dir -r /tmp/requirements.txt && \
rm /tmp/requirements.txt
# Create cache directory for downloaded targets
RUN mkdir -p /cache && chmod 755 /cache
# Copy worker entrypoint
COPY worker.py /app/worker.py
# Add toolbox and AI module to Python path (mounted at runtime)
ENV PYTHONPATH="/app:/app/toolbox:/app/ai_src:${PYTHONPATH}"
ENV PYTHONUNBUFFERED=1
# Healthcheck
HEALTHCHECK --interval=30s --timeout=10s --start-period=40s --retries=3 \
CMD python3 -c "import sys; sys.exit(0)"
# Run worker
CMD ["python3", "/app/worker.py"]

View File

@@ -0,0 +1,15 @@
# Temporal worker dependencies
temporalio>=1.5.0
pydantic>=2.0.0
# Storage (MinIO/S3)
boto3>=1.34.0
# Configuration
pyyaml>=6.0.0
# HTTP Client (for real-time stats reporting)
httpx>=0.27.0
# A2A Agent Communication (for LLM-based secret detection)
a2a-sdk[all]>=0.1.0

309
workers/secrets/worker.py Normal file
View File

@@ -0,0 +1,309 @@
"""
FuzzForge Vertical Worker: Secret Detection
This worker:
1. Discovers workflows for the 'secrets' vertical from mounted toolbox
2. Dynamically imports and registers workflow classes
3. Connects to Temporal and processes tasks
4. Handles activities for target download/upload from MinIO
"""
import asyncio
import importlib
import inspect
import logging
import os
import sys
from pathlib import Path
from typing import List, Any
import yaml
from temporalio.client import Client
from temporalio.worker import Worker
# Add toolbox to path for workflow and activity imports
sys.path.insert(0, '/app/toolbox')
# Import common storage activities
from toolbox.common.storage_activities import (
get_target_activity,
cleanup_cache_activity,
upload_results_activity
)
# Configure logging
logging.basicConfig(
level=os.getenv('LOG_LEVEL', 'INFO'),
format='%(asctime)s - %(name)s - %(levelname)s - %(message)s'
)
logger = logging.getLogger(__name__)
async def discover_workflows(vertical: str) -> List[Any]:
"""
Discover workflows for this vertical from mounted toolbox.
Args:
vertical: The vertical name (e.g., 'secrets', 'python', 'web')
Returns:
List of workflow classes decorated with @workflow.defn
"""
workflows = []
toolbox_path = Path("/app/toolbox/workflows")
if not toolbox_path.exists():
logger.warning(f"Toolbox path does not exist: {toolbox_path}")
return workflows
logger.info(f"Scanning for workflows in: {toolbox_path}")
for workflow_dir in toolbox_path.iterdir():
if not workflow_dir.is_dir():
continue
# Skip special directories
if workflow_dir.name.startswith('.') or workflow_dir.name == '__pycache__':
continue
metadata_file = workflow_dir / "metadata.yaml"
if not metadata_file.exists():
logger.debug(f"No metadata.yaml in {workflow_dir.name}, skipping")
continue
try:
# Parse metadata
with open(metadata_file) as f:
metadata = yaml.safe_load(f)
# Check if workflow is for this vertical
workflow_vertical = metadata.get("vertical")
if workflow_vertical != vertical:
logger.debug(
f"Workflow {workflow_dir.name} is for vertical '{workflow_vertical}', "
f"not '{vertical}', skipping"
)
continue
# Check if workflow.py exists
workflow_file = workflow_dir / "workflow.py"
if not workflow_file.exists():
logger.warning(
f"Workflow {workflow_dir.name} has metadata but no workflow.py, skipping"
)
continue
# Dynamically import workflow module
module_name = f"toolbox.workflows.{workflow_dir.name}.workflow"
logger.info(f"Importing workflow module: {module_name}")
try:
module = importlib.import_module(module_name)
except Exception as e:
logger.error(
f"Failed to import workflow module {module_name}: {e}",
exc_info=True
)
continue
# Find @workflow.defn decorated classes
found_workflows = False
for name, obj in inspect.getmembers(module, inspect.isclass):
# Check if class has Temporal workflow definition
if hasattr(obj, '__temporal_workflow_definition'):
workflows.append(obj)
found_workflows = True
logger.info(
f"✓ Discovered workflow: {name} from {workflow_dir.name} "
f"(vertical: {vertical})"
)
if not found_workflows:
logger.warning(
f"Workflow {workflow_dir.name} has no @workflow.defn decorated classes"
)
except Exception as e:
logger.error(
f"Error processing workflow {workflow_dir.name}: {e}",
exc_info=True
)
continue
logger.info(f"Discovered {len(workflows)} workflows for vertical '{vertical}'")
return workflows
async def discover_activities(workflows_dir: Path) -> List[Any]:
"""
Discover activities from workflow directories.
Looks for activities.py files alongside workflow.py in each workflow directory.
Args:
workflows_dir: Path to workflows directory
Returns:
List of activity functions decorated with @activity.defn
"""
activities = []
if not workflows_dir.exists():
logger.warning(f"Workflows directory does not exist: {workflows_dir}")
return activities
logger.info(f"Scanning for workflow activities in: {workflows_dir}")
for workflow_dir in workflows_dir.iterdir():
if not workflow_dir.is_dir():
continue
# Skip special directories
if workflow_dir.name.startswith('.') or workflow_dir.name == '__pycache__':
continue
# Check if activities.py exists
activities_file = workflow_dir / "activities.py"
if not activities_file.exists():
logger.debug(f"No activities.py in {workflow_dir.name}, skipping")
continue
try:
# Dynamically import activities module
module_name = f"toolbox.workflows.{workflow_dir.name}.activities"
logger.info(f"Importing activities module: {module_name}")
try:
module = importlib.import_module(module_name)
except Exception as e:
logger.error(
f"Failed to import activities module {module_name}: {e}",
exc_info=True
)
continue
# Find @activity.defn decorated functions
found_activities = False
for name, obj in inspect.getmembers(module, inspect.isfunction):
# Check if function has Temporal activity definition
if hasattr(obj, '__temporal_activity_definition'):
activities.append(obj)
found_activities = True
logger.info(
f"✓ Discovered activity: {name} from {workflow_dir.name}"
)
if not found_activities:
logger.warning(
f"Workflow {workflow_dir.name} has activities.py but no @activity.defn decorated functions"
)
except Exception as e:
logger.error(
f"Error processing activities from {workflow_dir.name}: {e}",
exc_info=True
)
continue
logger.info(f"Discovered {len(activities)} workflow-specific activities")
return activities
async def main():
"""Main worker entry point"""
# Get configuration from environment
vertical = os.getenv("WORKER_VERTICAL", "secrets")
temporal_address = os.getenv("TEMPORAL_ADDRESS", "localhost:7233")
temporal_namespace = os.getenv("TEMPORAL_NAMESPACE", "default")
task_queue = os.getenv("WORKER_TASK_QUEUE", f"{vertical}-queue")
max_concurrent_activities = int(os.getenv("MAX_CONCURRENT_ACTIVITIES", "5"))
logger.info("=" * 60)
logger.info(f"FuzzForge Vertical Worker: {vertical}")
logger.info("=" * 60)
logger.info(f"Temporal Address: {temporal_address}")
logger.info(f"Temporal Namespace: {temporal_namespace}")
logger.info(f"Task Queue: {task_queue}")
logger.info(f"Max Concurrent Activities: {max_concurrent_activities}")
logger.info("=" * 60)
# Discover workflows for this vertical
logger.info(f"Discovering workflows for vertical: {vertical}")
workflows = await discover_workflows(vertical)
if not workflows:
logger.error(f"No workflows found for vertical: {vertical}")
logger.error("Worker cannot start without workflows. Exiting...")
sys.exit(1)
# Discover activities from workflow directories
logger.info("Discovering workflow-specific activities...")
workflows_dir = Path("/app/toolbox/workflows")
workflow_activities = await discover_activities(workflows_dir)
# Combine common storage activities with workflow-specific activities
activities = [
get_target_activity,
cleanup_cache_activity,
upload_results_activity
] + workflow_activities
logger.info(
f"Total activities registered: {len(activities)} "
f"(3 common + {len(workflow_activities)} workflow-specific)"
)
# Connect to Temporal
logger.info(f"Connecting to Temporal at {temporal_address}...")
try:
client = await Client.connect(
temporal_address,
namespace=temporal_namespace
)
logger.info("✓ Connected to Temporal successfully")
except Exception as e:
logger.error(f"Failed to connect to Temporal: {e}", exc_info=True)
sys.exit(1)
# Create worker with discovered workflows and activities
logger.info(f"Creating worker on task queue: {task_queue}")
try:
worker = Worker(
client,
task_queue=task_queue,
workflows=workflows,
activities=activities,
max_concurrent_activities=max_concurrent_activities
)
logger.info("✓ Worker created successfully")
except Exception as e:
logger.error(f"Failed to create worker: {e}", exc_info=True)
sys.exit(1)
# Start worker
logger.info("=" * 60)
logger.info(f"🚀 Worker started for vertical '{vertical}'")
logger.info(f"📦 Registered {len(workflows)} workflows")
logger.info(f"⚙️ Registered {len(activities)} activities")
logger.info(f"📨 Listening on task queue: {task_queue}")
logger.info("=" * 60)
logger.info("Worker is ready to process tasks...")
try:
await worker.run()
except KeyboardInterrupt:
logger.info("Shutting down worker (keyboard interrupt)...")
except Exception as e:
logger.error(f"Worker error: {e}", exc_info=True)
raise
if __name__ == "__main__":
try:
asyncio.run(main())
except KeyboardInterrupt:
logger.info("Worker stopped")
except Exception as e:
logger.error(f"Fatal error: {e}", exc_info=True)
sys.exit(1)