feat(cli): add worker management commands with improved progress feedback

Add comprehensive CLI commands for managing Temporal workers:
- ff worker list - List workers with status and uptime
- ff worker start <name> - Start specific worker with optional rebuild
- ff worker stop - Safely stop all workers without affecting core services

Improvements:
- Live progress display during worker startup with Rich Status spinner
- Real-time elapsed time counter and container state updates
- Health check status tracking (starting → unhealthy → healthy)
- Helpful contextual hints at 10s, 30s, 60s intervals
- Better timeout messages showing last known state

Worker management enhancements:
- Use 'docker compose' (space) instead of 'docker-compose' (hyphen)
- Stop workers individually with 'docker stop' to avoid stopping core services
- Platform detection and Dockerfile selection (ARM64/AMD64)

Documentation:
- Updated docker-setup.md with CLI commands as primary method
- Created comprehensive cli-reference.md with all commands and examples
- Added worker management best practices
This commit is contained in:
tduhamel42
2025-10-29 13:24:04 +01:00
parent fd21a5e7b1
commit 38b2c8ea6e
6 changed files with 1114 additions and 55 deletions

View File

@@ -12,3 +12,6 @@ Command modules for FuzzForge CLI.
#
# Additional attribution and requirements are provided in the NOTICE file.
from . import worker
__all__ = ["worker"]

View File

@@ -0,0 +1,225 @@
"""
Worker management commands for FuzzForge CLI.
Provides commands to start, stop, and list Temporal workers.
"""
# Copyright (c) 2025 FuzzingLabs
#
# Licensed under the Business Source License 1.1 (BSL). See the LICENSE file
# at the root of this repository for details.
#
# After the Change Date (four years from publication), this version of the
# Licensed Work will be made available under the Apache License, Version 2.0.
# See the LICENSE-APACHE file or http://www.apache.org/licenses/LICENSE-2.0
#
# Additional attribution and requirements are provided in the NOTICE file.
import subprocess
import sys
import typer
from pathlib import Path
from rich.console import Console
from rich.table import Table
from typing import Optional
from ..worker_manager import WorkerManager
# Shared Rich console used by the commands in this module for styled output.
console = Console()
# Typer sub-application that groups the worker commands defined below
# ("stop", "list" and "start"). no_args_is_help=True makes the group print its
# help text when it is invoked without a subcommand.
app = typer.Typer(
    name="worker",
    help="🔧 Manage Temporal workers",
    no_args_is_help=True,
)
@app.command("stop")
def stop_workers(
    all: bool = typer.Option(
        False, "--all",
        help="Stop all workers (default behavior, flag for clarity)"
    )
):
    """
    🛑 Stop all running FuzzForge workers.

    Each running worker container is stopped individually with `docker stop`,
    so core services are left untouched.

    Examples:
        $ ff worker stop
        $ ff worker stop --all
    """
    # NOTE: `all` is accepted purely for readability on the command line; every
    # running worker is stopped whether or not the flag is passed.
    try:
        stopped = WorkerManager().stop_all_workers()
    except Exception as e:
        # Top-level CLI boundary: report the failure and exit non-zero rather
        # than showing a traceback.
        console.print(f"❌ Error: {e}", style="red")
        sys.exit(1)

    if not stopped:
        console.print("⚠️ Some workers may not have stopped properly", style="yellow")
        sys.exit(1)
    sys.exit(0)
def _docker_ps_worker_rows(fields: str, include_stopped: bool = False) -> list:
    """
    Run `docker ps` restricted to FuzzForge worker containers.

    Args:
        fields: Template passed to `--format`; columns must be tab-separated.
        include_stopped: Pass `-a` so non-running containers are listed too.

    Returns:
        One list of column values per line of output.

    Raises:
        RuntimeError: If `docker ps` exits with a non-zero status.
    """
    cmd = ["docker", "ps"]
    if include_stopped:
        cmd.append("-a")
    cmd += ["--filter", "name=fuzzforge-worker-", "--format", fields]
    result = subprocess.run(cmd, capture_output=True, text=True, check=False)
    if result.returncode != 0:
        # Without this check a failing Docker call would be indistinguishable
        # from "no workers exist".
        detail = result.stderr.strip() or f"exit code {result.returncode}"
        raise RuntimeError(f"docker ps failed: {detail}")
    return [line.split("\t") for line in result.stdout.strip().splitlines()]


@app.command("list")
def list_workers(
    all: bool = typer.Option(
        False, "--all", "-a",
        help="Show all workers (including stopped)"
    )
):
    """
    📋 List FuzzForge workers and their status.

    By default, shows only running workers. Use --all to see all workers.

    Examples:
        $ ff worker list
        $ ff worker list --all
    """
    try:
        # Containers currently reported by a plain `docker ps`.
        running_workers = [
            {
                "name": parts[0].replace("fuzzforge-worker-", ""),
                "status": "Running",
                "uptime": parts[2],
            }
            for parts in _docker_ps_worker_rows("{{.Names}}\t{{.Status}}\t{{.RunningFor}}")
            if len(parts) >= 3
        ]

        # With --all, anything listed by `docker ps -a` that is not in the
        # running set is reported as stopped.
        stopped_workers = []
        if all:
            running_names = {worker["name"] for worker in running_workers}
            for parts in _docker_ps_worker_rows("{{.Names}}\t{{.Status}}", include_stopped=True):
                if len(parts) < 2:
                    continue
                worker_name = parts[0].replace("fuzzforge-worker-", "")
                if worker_name not in running_names:
                    stopped_workers.append({
                        "name": worker_name,
                        "status": "Stopped",
                        "uptime": "-",
                    })

        # Nothing to show: print hints on how to get a worker running.
        if not running_workers and not stopped_workers:
            console.print(" No workers found", style="cyan")
            console.print("\n💡 Start a worker with: [cyan]docker compose up -d worker-<name>[/cyan]")
            console.print(" Or run a workflow, which auto-starts workers: [cyan]ff workflow run <workflow> <target>[/cyan]")
            return

        table = Table(title="FuzzForge Workers", show_header=True, header_style="bold cyan")
        table.add_column("Worker", style="cyan", no_wrap=True)
        table.add_column("Status", style="green")
        table.add_column("Uptime", style="dim")

        # Running workers first (green marker), then stopped ones (red marker).
        for worker in running_workers:
            table.add_row(
                worker["name"],
                f"[green]●[/green] {worker['status']}",
                worker["uptime"]
            )
        for worker in stopped_workers:
            table.add_row(
                worker["name"],
                f"[red]●[/red] {worker['status']}",
                worker["uptime"]
            )

        console.print(table)

        # Summary counts below the table.
        if running_workers:
            console.print(f"\n{len(running_workers)} worker(s) running")
        if stopped_workers:
            console.print(f"⏹️ {len(stopped_workers)} worker(s) stopped")

    except Exception as e:
        console.print(f"❌ Error listing workers: {e}", style="red")
        sys.exit(1)
@app.command("start")
def start_worker(
    name: str = typer.Argument(
        ...,
        help="Worker name (e.g., 'python', 'android', 'secrets')"
    ),
    build: bool = typer.Option(
        False, "--build",
        help="Rebuild worker image before starting"
    )
):
    """
    🚀 Start a specific worker.

    The worker name should be the vertical name (e.g., 'python', 'android', 'rust').

    Examples:
        $ ff worker start python
        $ ff worker start android --build
    """
    try:
        # Compose services for workers are named "worker-<vertical>".
        compose_service = f"worker-{name}"
        console.print(f"🚀 Starting worker: [cyan]{compose_service}[/cyan]")

        # `--build` is inserted before the service name only when requested.
        build_flag = ["--build"] if build else []
        compose_cmd = ["docker", "compose", "up", "-d", *build_flag, compose_service]

        proc = subprocess.run(
            compose_cmd,
            capture_output=True,
            text=True,
            check=False
        )

        # Failure path first: show Docker's stderr plus a manual fallback hint.
        if proc.returncode != 0:
            console.print(f"❌ Failed to start worker: {proc.stderr}", style="red")
            console.print(
                f"\n💡 Try manually: [yellow]docker compose up -d {compose_service}[/yellow]",
                style="dim"
            )
            sys.exit(1)

        console.print(f"✅ Worker [cyan]{compose_service}[/cyan] started successfully")

    except Exception as exc:
        console.print(f"❌ Error: {exc}", style="red")
        sys.exit(1)
# Run the Typer application when this module is executed directly as a script.
if __name__ == "__main__":
    app()

View File

@@ -29,6 +29,7 @@ from .commands import (
config as config_cmd,
ai,
ingest,
worker,
)
from .fuzzy import enhanced_command_not_found_handler
@@ -334,6 +335,7 @@ app.add_typer(finding_app, name="finding", help="🔍 View and analyze findings"
app.add_typer(monitor.app, name="monitor", help="📊 Real-time monitoring")
app.add_typer(ai.app, name="ai", help="🤖 AI integration features")
app.add_typer(ingest.app, name="ingest", help="🧠 Ingest knowledge into AI")
app.add_typer(worker.app, name="worker", help="🔧 Manage Temporal workers")
# Help and utility commands
@app.command()
@@ -409,7 +411,7 @@ def main():
'init', 'status', 'config', 'clean',
'workflows', 'workflow',
'findings', 'finding',
'monitor', 'ai', 'ingest',
'monitor', 'ai', 'ingest', 'worker',
'version'
]

View File

@@ -25,6 +25,7 @@ from typing import Optional, Dict, Any
import requests
import yaml
from rich.console import Console
from rich.status import Status
logger = logging.getLogger(__name__)
console = Console()
@@ -163,11 +164,25 @@ class WorkerManager:
Platform string: "linux/amd64" or "linux/arm64"
"""
machine = platform.machine().lower()
if machine in ["x86_64", "amd64"]:
return "linux/amd64"
elif machine in ["arm64", "aarch64"]:
return "linux/arm64"
return "unknown"
system = platform.system().lower()
logger.debug(f"Platform detection: machine={machine}, system={system}")
# Normalize machine architecture
if machine in ["x86_64", "amd64", "x64"]:
detected = "linux/amd64"
elif machine in ["arm64", "aarch64", "armv8", "arm64v8"]:
detected = "linux/arm64"
else:
# Fallback to amd64 for unknown architectures
logger.warning(
f"Unknown architecture '{machine}' detected, falling back to linux/amd64. "
f"Please report this issue if you're experiencing problems."
)
detected = "linux/amd64"
logger.info(f"Detected platform: {detected}")
return detected
def _read_worker_metadata(self, vertical: str) -> dict:
"""
@@ -213,28 +228,39 @@ class WorkerManager:
platforms = metadata.get("platforms", {})
if not platforms:
# Metadata exists but no platform definitions
logger.debug(f"No platform definitions in metadata for {vertical}, using Dockerfile")
return "Dockerfile"
# Try detected platform first
if detected_platform in platforms:
dockerfile = platforms[detected_platform].get("dockerfile", "Dockerfile")
logger.debug(f"Selected {dockerfile} for {vertical} on {detected_platform}")
logger.info(f"Selected {dockerfile} for {vertical} on {detected_platform}")
return dockerfile
# Fallback to default platform
default_platform = metadata.get("default_platform", "linux/amd64")
logger.warning(
f"Platform {detected_platform} not found in metadata for {vertical}, "
f"falling back to default: {default_platform}"
)
if default_platform in platforms:
dockerfile = platforms[default_platform].get("dockerfile", "Dockerfile.amd64")
logger.debug(f"Using default platform {default_platform}: {dockerfile}")
logger.info(f"Using default platform {default_platform}: {dockerfile}")
return dockerfile
# Last resort
# Last resort: just use Dockerfile
logger.warning(f"No suitable Dockerfile found for {vertical}, using 'Dockerfile'")
return "Dockerfile"
def _run_docker_compose(self, *args: str, env: Optional[Dict[str, str]] = None) -> subprocess.CompletedProcess:
"""
Run docker-compose command with optional environment variables.
Run docker compose command with optional environment variables.
Args:
*args: Arguments to pass to docker-compose
*args: Arguments to pass to docker compose
env: Optional environment variables to set
Returns:
@@ -243,7 +269,7 @@ class WorkerManager:
Raises:
subprocess.CalledProcessError: If command fails
"""
cmd = ["docker-compose", "-f", str(self.compose_file)] + list(args)
cmd = ["docker", "compose", "-f", str(self.compose_file)] + list(args)
logger.debug(f"Running: {' '.join(cmd)}")
# Merge with current environment
@@ -342,9 +368,67 @@ class WorkerManager:
console.print(f"❌ Unexpected error: {e}", style="red")
return False
def _get_container_state(self, service_name: str) -> str:
    """
    Look up the Docker status of the container behind a Compose service.

    Args:
        service_name: Name of the Docker Compose service

    Returns:
        The `.State.Status` value printed by `docker inspect` (running,
        created, restarting, exited, etc.), or "unknown" when the inspect
        command exits non-zero or raises.
    """
    try:
        inspect_cmd = [
            "docker", "inspect", "-f", "{{.State.Status}}",
            self._service_to_container_name(service_name),
        ]
        proc = subprocess.run(inspect_cmd, capture_output=True, text=True, check=False)
        # A non-zero exit means Docker could not report on the container.
        return proc.stdout.strip() if proc.returncode == 0 else "unknown"
    except Exception as exc:
        logger.debug(f"Failed to get container state: {exc}")
        return "unknown"
def _get_health_status(self, container_name: str) -> str:
    """
    Get container health status.

    Args:
        container_name: Docker container name

    Returns:
        "healthy", "unhealthy" or "starting" as reported by Docker, "none"
        when the container has no health check, or "unknown" when the
        inspect command exits non-zero or raises.
    """
    try:
        # Guard the template on `.State.Health`: rendering
        # `.State.Health.Status` directly makes `docker inspect` exit non-zero
        # for containers without a health check, which would be reported as
        # "unknown" here and never as "none".
        result = subprocess.run(
            [
                "docker", "inspect", "-f",
                "{{if .State.Health}}{{.State.Health.Status}}{{else}}none{{end}}",
                container_name,
            ],
            capture_output=True,
            text=True,
            check=False
        )

        if result.returncode != 0:
            return "unknown"

        health_status = result.stdout.strip()

        # Kept for safety: treat an empty or placeholder rendering as
        # "no health check defined" as well.
        if health_status in ("", "<no value>"):
            return "none"

        return health_status  # healthy, unhealthy, starting, or none

    except Exception as e:
        logger.debug(f"Failed to check health: {e}")
        return "unknown"
def wait_for_worker_ready(self, service_name: str, timeout: Optional[int] = None) -> bool:
"""
Wait for a worker to be healthy and ready to process tasks.
Shows live progress updates during startup.
Args:
service_name: Name of the Docker Compose service
@@ -352,56 +436,74 @@ class WorkerManager:
Returns:
True if worker is ready, False if timeout reached
Raises:
TimeoutError: If worker doesn't become ready within timeout
"""
timeout = timeout or self.startup_timeout
start_time = time.time()
container_name = self._service_to_container_name(service_name)
last_status_msg = ""
console.print("⏳ Waiting for worker to be ready...")
with Status("[bold cyan]Starting worker...", console=console, spinner="dots") as status:
while time.time() - start_time < timeout:
elapsed = int(time.time() - start_time)
# Get container state
container_state = self._get_container_state(service_name)
# Get health status
health_status = self._get_health_status(container_name)
# Build status message based on current state
if container_state == "created":
status_msg = f"[cyan]Worker starting... ({elapsed}s)[/cyan]"
elif container_state == "restarting":
status_msg = f"[yellow]Worker restarting... ({elapsed}s)[/yellow]"
elif container_state == "running":
if health_status == "starting":
status_msg = f"[cyan]Worker running, health check starting... ({elapsed}s)[/cyan]"
elif health_status == "unhealthy":
status_msg = f"[yellow]Worker running, health check: unhealthy ({elapsed}s)[/yellow]"
elif health_status == "healthy":
status_msg = f"[green]Worker healthy! ({elapsed}s)[/green]"
status.update(status_msg)
console.print(f"✅ Worker ready: {service_name} (took {elapsed}s)")
logger.info(f"Worker {service_name} is healthy (took {elapsed}s)")
return True
elif health_status == "none":
# No health check defined, assume ready
status_msg = f"[green]Worker running (no health check) ({elapsed}s)[/green]"
status.update(status_msg)
console.print(f"✅ Worker ready: {service_name} (took {elapsed}s)")
logger.info(f"Worker {service_name} is running, no health check (took {elapsed}s)")
return True
else:
status_msg = f"[cyan]Worker running ({elapsed}s)[/cyan]"
elif not container_state or container_state == "exited":
status_msg = f"[yellow]Waiting for container to start... ({elapsed}s)[/yellow]"
else:
status_msg = f"[cyan]Worker state: {container_state} ({elapsed}s)[/cyan]"
# Show helpful hints at certain intervals
if elapsed == 10:
status_msg += " [dim](pulling image if not cached)[/dim]"
elif elapsed == 30:
status_msg += " [dim](large images can take time)[/dim]"
elif elapsed == 60:
status_msg += " [dim](still working...)[/dim]"
# Update status if changed
if status_msg != last_status_msg:
status.update(status_msg)
last_status_msg = status_msg
logger.debug(f"Worker {service_name} - state: {container_state}, health: {health_status}")
while time.time() - start_time < timeout:
# Check if container is running
if not self.is_worker_running(service_name):
logger.debug(f"Worker {service_name} not running yet")
time.sleep(self.health_check_interval)
continue
# Check container health status
try:
result = subprocess.run(
["docker", "inspect", "-f", "{{.State.Health.Status}}", container_name],
capture_output=True,
text=True,
check=False
)
health_status = result.stdout.strip()
# If no health check is defined, assume healthy after running
if health_status == "<no value>" or health_status == "":
logger.info(f"Worker {service_name} is running (no health check)")
console.print(f"✅ Worker ready: {service_name}")
return True
if health_status == "healthy":
logger.info(f"Worker {service_name} is healthy")
console.print(f"✅ Worker ready: {service_name}")
return True
logger.debug(f"Worker {service_name} health: {health_status}")
except Exception as e:
logger.debug(f"Failed to check health: {e}")
time.sleep(self.health_check_interval)
elapsed = time.time() - start_time
logger.warning(f"Worker {service_name} did not become ready within {elapsed:.1f}s")
console.print(f"⚠️ Worker startup timeout after {elapsed:.1f}s", style="yellow")
return False
# Timeout reached
elapsed = int(time.time() - start_time)
logger.warning(f"Worker {service_name} did not become ready within {elapsed}s")
console.print(f"⚠️ Worker startup timeout after {elapsed}s", style="yellow")
console.print(f" Last state: {container_state}, health: {health_status}", style="dim")
return False
def stop_worker(self, service_name: str) -> bool:
"""
@@ -432,6 +534,75 @@ class WorkerManager:
console.print(f"❌ Unexpected error: {e}", style="red")
return False
def stop_all_workers(self) -> bool:
    """
    Stop all running FuzzForge worker containers.

    Worker containers are discovered with `docker ps` (name filter
    "fuzzforge-worker-") and stopped one by one with `docker stop`, which
    avoids the Docker Compose profile issue and prevents accidental shutdown
    of core services.

    Returns:
        True if no workers were running or all of them stopped successfully;
        False if listing the containers failed, any worker failed to stop or
        timed out, or an unexpected error occurred.
    """
    try:
        console.print("🛑 Stopping all FuzzForge workers...")

        # Get list of all running worker containers
        result = subprocess.run(
            ["docker", "ps", "--filter", "name=fuzzforge-worker-", "--format", "{{.Names}}"],
            capture_output=True,
            text=True,
            check=False
        )

        # A failed listing must not be mistaken for "nothing is running".
        if result.returncode != 0:
            detail = result.stderr.strip() or f"exit code {result.returncode}"
            logger.error(f"Failed to list worker containers: {detail}")
            console.print(f"❌ Failed to list worker containers: {detail}", style="red")
            return False

        running_workers = [name.strip() for name in result.stdout.splitlines() if name.strip()]

        if not running_workers:
            console.print("✓ No workers running")
            return True

        console.print(f"Found {len(running_workers)} running worker(s):")
        for worker in running_workers:
            console.print(f" - {worker}")

        # Stop each worker container individually using docker stop
        # This is safer than docker compose down and won't affect core services
        failed_workers = []
        for worker in running_workers:
            try:
                logger.info(f"Stopping {worker}...")
                # check=True turns a non-zero exit into CalledProcessError;
                # timeout bounds how long a single stop may block.
                subprocess.run(
                    ["docker", "stop", worker],
                    capture_output=True,
                    text=True,
                    check=True,
                    timeout=30
                )
                console.print(f" ✓ Stopped {worker}")
            except subprocess.CalledProcessError as e:
                logger.error(f"Failed to stop {worker}: {e.stderr}")
                failed_workers.append(worker)
                console.print(f" ✗ Failed to stop {worker}", style="red")
            except subprocess.TimeoutExpired:
                logger.error(f"Timeout stopping {worker}")
                failed_workers.append(worker)
                console.print(f" ✗ Timeout stopping {worker}", style="red")

        if failed_workers:
            console.print(f"\n⚠️ {len(failed_workers)} worker(s) failed to stop", style="yellow")
            console.print("💡 Try manually: docker stop " + " ".join(failed_workers), style="dim")
            return False

        console.print("\n✅ All workers stopped")
        logger.info("All workers stopped successfully")
        return True

    except Exception as e:
        logger.error(f"Unexpected error stopping workers: {e}")
        console.print(f"❌ Unexpected error: {e}", style="red")
        return False
def ensure_worker_running(
self,
worker_info: Dict[str, Any],