Compare commits

...

8 Commits

Author SHA1 Message Date
Tanguy Duhamel
1ba80c466b fix: register config as command group instead of custom function
The config command was implemented as a custom function that manually
routed to subcommands, which caused 'ff config show' to fail. It
treated 'show' as a configuration key argument instead of a subcommand.

Now properly registered as a Typer command group, enabling all config
subcommands (show, set, get, reset, edit) to work correctly.
2025-10-03 11:13:34 +02:00
abel
c9f8926bc3 ci: run in any situation on docs folder changes 2025-10-02 17:22:15 +02:00
abel
d2e0b61b67 fix: run only when changes to docs folder 2025-10-02 17:21:14 +02:00
tduhamel42
c2de6eae7d Merge pull request #10 from FuzzingLabs/refactor/remove-monitor-command
refactor: removed monitor command and --live parameter
2025-10-02 16:17:33 +02:00
tduhamel42
60b69667e7 Merge branch 'master' into refactor/remove-monitor-command 2025-10-02 16:12:01 +02:00
tduhamel42
28b0712f2f Merge pull request #11 from FuzzingLabs/fix/remove-erroneous-cli-example
fix: removed erroneous example
2025-10-02 16:08:23 +02:00
abel
a53d6c9ae5 fix: removed erroneous example 2025-10-02 16:01:54 +02:00
abel
928a5f5f77 refactor: removed monitor command and --live parameter 2025-10-02 15:49:18 +02:00
8 changed files with 350 additions and 774 deletions

View File

@@ -1,10 +1,13 @@
name: Deploy Docusaurus to GitHub Pages name: Deploy Docusaurus to GitHub Pages
on: on:
workflow_dispatch:
push: push:
branches: branches:
- master - master
workflow_dispatch: paths:
- "docs/**"
jobs: jobs:
build: build:

View File

@@ -1,9 +1,14 @@
name: Docusaurus test deployment name: Docusaurus test deployment
on: on:
workflow_dispatch:
push:
paths:
- "docs/**"
pull_request: pull_request:
branches: paths:
- master - "docs/**"
jobs: jobs:
test-deploy: test-deploy:

View File

@@ -80,8 +80,6 @@ fuzzforge workflows info security_assessment
# Submit a workflow for analysis # Submit a workflow for analysis
fuzzforge workflow security_assessment /path/to/your/code fuzzforge workflow security_assessment /path/to/your/code
# Monitor progress in real-time
fuzzforge monitor live <execution-id>
# View findings when complete # View findings when complete
fuzzforge finding <execution-id> fuzzforge finding <execution-id>
@@ -179,7 +177,6 @@ fuzzforge workflow security_assessment /path/to/code --wait
- `--timeout, -t` - Execution timeout in seconds - `--timeout, -t` - Execution timeout in seconds
- `--interactive/--no-interactive, -i/-n` - Interactive parameter input - `--interactive/--no-interactive, -i/-n` - Interactive parameter input
- `--wait, -w` - Wait for execution to complete - `--wait, -w` - Wait for execution to complete
- `--live, -l` - Show live monitoring during execution
#### `fuzzforge workflow status [execution-id]` #### `fuzzforge workflow status [execution-id]`
Check the status of a workflow execution. Check the status of a workflow execution.
@@ -261,39 +258,6 @@ fuzzforge finding export abc123def456 --format csv --output report.csv
fuzzforge finding export --format html --output report.html fuzzforge finding export --format html --output report.html
``` ```
### Real-time Monitoring
#### `fuzzforge monitor stats <execution-id>`
Show current fuzzing statistics.
```bash
# Show stats once
fuzzforge monitor stats abc123def456 --once
# Live updating stats (default)
fuzzforge monitor stats abc123def456 --refresh 5
```
#### `fuzzforge monitor crashes <run-id>`
Display crash reports for a fuzzing run.
```bash
fuzzforge monitor crashes abc123def456 --limit 50
```
#### `fuzzforge monitor live <run-id>`
Real-time monitoring dashboard with live updates.
```bash
fuzzforge monitor live abc123def456 --refresh 3
```
Features:
- Live updating statistics
- Progress indicators and bars
- Run status monitoring
- Automatic completion detection
### Configuration Management ### Configuration Management
#### `fuzzforge config show` #### `fuzzforge config show`
@@ -495,7 +459,6 @@ cli/
│ ├── workflows.py # Workflow management │ ├── workflows.py # Workflow management
│ ├── runs.py # Run management │ ├── runs.py # Run management
│ ├── findings.py # Findings management │ ├── findings.py # Findings management
│ ├── monitor.py # Real-time monitoring
│ ├── config.py # Configuration commands │ ├── config.py # Configuration commands
│ └── status.py # Status information │ └── status.py # Status information
├── pyproject.toml # Project configuration ├── pyproject.toml # Project configuration
@@ -576,7 +539,6 @@ fuzzforge --help
# Command-specific help # Command-specific help
ff workflows --help ff workflows --help
ff workflow run --help ff workflow run --help
ff monitor live --help
# Show version # Show version
fuzzforge --version fuzzforge --version

View File

@@ -10,11 +10,10 @@
# #
# Additional attribution and requirements are provided in the NOTICE file. # Additional attribution and requirements are provided in the NOTICE file.
from __future__ import annotations from __future__ import annotations
from pathlib import Path
import os import os
from pathlib import Path
from textwrap import dedent from textwrap import dedent
from typing import Optional from typing import Optional
@@ -32,17 +31,20 @@ app = typer.Typer()
@app.command() @app.command()
def project( def project(
name: Optional[str] = typer.Option( name: Optional[str] = typer.Option(
None, "--name", "-n", None, "--name", "-n", help="Project name (defaults to current directory name)"
help="Project name (defaults to current directory name)"
), ),
api_url: Optional[str] = typer.Option( api_url: Optional[str] = typer.Option(
None, "--api-url", "-u", None,
help="FuzzForge API URL (defaults to http://localhost:8000)" "--api-url",
"-u",
help="FuzzForge API URL (defaults to http://localhost:8000)",
), ),
force: bool = typer.Option( force: bool = typer.Option(
False, "--force", "-f", False,
help="Force initialization even if project already exists" "--force",
) "-f",
help="Force initialization even if project already exists",
),
): ):
""" """
📁 Initialize a new FuzzForge project in the current directory. 📁 Initialize a new FuzzForge project in the current directory.
@@ -58,24 +60,20 @@ def project(
# Check if project already exists # Check if project already exists
if fuzzforge_dir.exists() and not force: if fuzzforge_dir.exists() and not force:
if fuzzforge_dir.is_dir() and any(fuzzforge_dir.iterdir()): if fuzzforge_dir.is_dir() and any(fuzzforge_dir.iterdir()):
console.print("❌ FuzzForge project already exists in this directory", style="red") console.print(
"❌ FuzzForge project already exists in this directory", style="red"
)
console.print("Use --force to reinitialize", style="dim") console.print("Use --force to reinitialize", style="dim")
raise typer.Exit(1) raise typer.Exit(1)
# Get project name # Get project name
if not name: if not name:
name = Prompt.ask( name = Prompt.ask("Project name", default=current_dir.name, console=console)
"Project name",
default=current_dir.name,
console=console
)
# Get API URL # Get API URL
if not api_url: if not api_url:
api_url = Prompt.ask( api_url = Prompt.ask(
"FuzzForge API URL", "FuzzForge API URL", default="http://localhost:8000", console=console
default="http://localhost:8000",
console=console
) )
# Confirm initialization # Confirm initialization
@@ -117,15 +115,15 @@ def project(
] ]
if gitignore_path.exists(): if gitignore_path.exists():
with open(gitignore_path, 'r') as f: with open(gitignore_path, "r") as f:
existing_content = f.read() existing_content = f.read()
if "# FuzzForge CLI" not in existing_content: if "# FuzzForge CLI" not in existing_content:
with open(gitignore_path, 'a') as f: with open(gitignore_path, "a") as f:
f.write(f"\n{chr(10).join(gitignore_entries)}\n") f.write(f"\n{chr(10).join(gitignore_entries)}\n")
console.print("📝 Updated .gitignore with FuzzForge entries") console.print("📝 Updated .gitignore with FuzzForge entries")
else: else:
with open(gitignore_path, 'w') as f: with open(gitignore_path, "w") as f:
f.write(f"{chr(10).join(gitignore_entries)}\n") f.write(f"{chr(10).join(gitignore_entries)}\n")
console.print("📝 Created .gitignore") console.print("📝 Created .gitignore")
@@ -145,9 +143,6 @@ fuzzforge workflows
# Submit a workflow for analysis # Submit a workflow for analysis
fuzzforge workflow <workflow-name> /path/to/target fuzzforge workflow <workflow-name> /path/to/target
# Monitor run progress
fuzzforge monitor live <run-id>
# View findings # View findings
fuzzforge finding <run-id> fuzzforge finding <run-id>
``` ```
@@ -159,12 +154,12 @@ fuzzforge finding <run-id>
- `.fuzzforge/findings.db` - Local database for runs and findings - `.fuzzforge/findings.db` - Local database for runs and findings
""" """
with open(readme_path, 'w') as f: with open(readme_path, "w") as f:
f.write(readme_content) f.write(readme_content)
console.print("📚 Created README.md") console.print("📚 Created README.md")
console.print("\n✅ FuzzForge project initialized successfully!", style="green") console.print("\n✅ FuzzForge project initialized successfully!", style="green")
console.print(f"\n🎯 Next steps:") console.print("\n🎯 Next steps:")
console.print(" • ff workflows - See available workflows") console.print(" • ff workflows - See available workflows")
console.print(" • ff status - Check API connectivity") console.print(" • ff status - Check API connectivity")
console.print(" • ff workflow <workflow> <path> - Start your first analysis") console.print(" • ff workflow <workflow> <path> - Start your first analysis")

View File

@@ -1,436 +0,0 @@
"""
Real-time monitoring and statistics commands.
"""
# Copyright (c) 2025 FuzzingLabs
#
# Licensed under the Business Source License 1.1 (BSL). See the LICENSE file
# at the root of this repository for details.
#
# After the Change Date (four years from publication), this version of the
# Licensed Work will be made available under the Apache License, Version 2.0.
# See the LICENSE-APACHE file or http://www.apache.org/licenses/LICENSE-2.0
#
# Additional attribution and requirements are provided in the NOTICE file.
import asyncio
import time
from datetime import datetime, timedelta
from typing import Optional
import typer
from rich.console import Console
from rich.table import Table
from rich.panel import Panel
from rich.live import Live
from rich.layout import Layout
from rich.progress import Progress, BarColumn, TextColumn, SpinnerColumn
from rich.align import Align
from rich import box
from ..config import get_project_config, FuzzForgeConfig
from ..database import get_project_db, ensure_project_db, CrashRecord
from fuzzforge_sdk import FuzzForgeClient
console = Console()
app = typer.Typer()
def get_client() -> FuzzForgeClient:
"""Get configured FuzzForge client"""
config = get_project_config() or FuzzForgeConfig()
return FuzzForgeClient(base_url=config.get_api_url(), timeout=config.get_timeout())
def format_duration(seconds: int) -> str:
"""Format duration in human readable format"""
if seconds < 60:
return f"{seconds}s"
elif seconds < 3600:
return f"{seconds // 60}m {seconds % 60}s"
else:
hours = seconds // 3600
minutes = (seconds % 3600) // 60
return f"{hours}h {minutes}m"
def format_number(num: int) -> str:
"""Format large numbers with K, M suffixes"""
if num >= 1000000:
return f"{num / 1000000:.1f}M"
elif num >= 1000:
return f"{num / 1000:.1f}K"
else:
return str(num)
@app.command("stats")
def fuzzing_stats(
run_id: str = typer.Argument(..., help="Run ID to get statistics for"),
refresh: int = typer.Option(
5, "--refresh", "-r",
help="Refresh interval in seconds"
),
once: bool = typer.Option(
False, "--once",
help="Show stats once and exit"
)
):
"""
📊 Show current fuzzing statistics for a run
"""
try:
with get_client() as client:
if once:
# Show stats once
stats = client.get_fuzzing_stats(run_id)
display_stats_table(stats)
else:
# Live updating stats
console.print(f"📊 [bold]Live Fuzzing Statistics[/bold] (Run: {run_id[:12]}...)")
console.print(f"Refreshing every {refresh}s. Press Ctrl+C to stop.\n")
with Live(auto_refresh=False, console=console) as live:
while True:
try:
stats = client.get_fuzzing_stats(run_id)
table = create_stats_table(stats)
live.update(table, refresh=True)
time.sleep(refresh)
except KeyboardInterrupt:
console.print("\n📊 Monitoring stopped", style="yellow")
break
except Exception as e:
console.print(f"❌ Failed to get fuzzing stats: {e}", style="red")
raise typer.Exit(1)
def display_stats_table(stats):
"""Display stats in a simple table"""
table = create_stats_table(stats)
console.print(table)
def create_stats_table(stats) -> Panel:
"""Create a rich table for fuzzing statistics"""
# Create main stats table
stats_table = Table(show_header=False, box=box.SIMPLE)
stats_table.add_column("Metric", style="bold cyan")
stats_table.add_column("Value", justify="right", style="bold white")
stats_table.add_row("Total Executions", format_number(stats.executions))
stats_table.add_row("Executions/sec", f"{stats.executions_per_sec:.1f}")
stats_table.add_row("Total Crashes", format_number(stats.crashes))
stats_table.add_row("Unique Crashes", format_number(stats.unique_crashes))
if stats.coverage is not None:
stats_table.add_row("Code Coverage", f"{stats.coverage:.1f}%")
stats_table.add_row("Corpus Size", format_number(stats.corpus_size))
stats_table.add_row("Elapsed Time", format_duration(stats.elapsed_time))
if stats.last_crash_time:
time_since_crash = datetime.now() - stats.last_crash_time
stats_table.add_row("Last Crash", f"{format_duration(int(time_since_crash.total_seconds()))} ago")
return Panel.fit(
stats_table,
title=f"📊 Fuzzing Statistics - {stats.workflow}",
subtitle=f"Run: {stats.run_id[:12]}...",
box=box.ROUNDED
)
@app.command("crashes")
def crash_reports(
run_id: str = typer.Argument(..., help="Run ID to get crash reports for"),
save: bool = typer.Option(
True, "--save/--no-save",
help="Save crashes to local database"
),
limit: int = typer.Option(
50, "--limit", "-l",
help="Maximum number of crashes to show"
)
):
"""
🐛 Display crash reports for a fuzzing run
"""
try:
with get_client() as client:
console.print(f"🐛 Fetching crash reports for run: {run_id}")
crashes = client.get_crash_reports(run_id)
if not crashes:
console.print("✅ No crashes found!", style="green")
return
# Save to database if requested
if save:
db = ensure_project_db()
for crash in crashes:
crash_record = CrashRecord(
run_id=run_id,
crash_id=crash.crash_id,
signal=crash.signal,
stack_trace=crash.stack_trace,
input_file=crash.input_file,
severity=crash.severity,
timestamp=crash.timestamp
)
db.save_crash(crash_record)
console.print("✅ Crashes saved to local database")
# Display crashes
crashes_to_show = crashes[:limit]
# Summary
severity_counts = {}
signal_counts = {}
for crash in crashes:
severity_counts[crash.severity] = severity_counts.get(crash.severity, 0) + 1
if crash.signal:
signal_counts[crash.signal] = signal_counts.get(crash.signal, 0) + 1
summary_table = Table(show_header=False, box=box.SIMPLE)
summary_table.add_column("Metric", style="bold cyan")
summary_table.add_column("Value", justify="right")
summary_table.add_row("Total Crashes", str(len(crashes)))
summary_table.add_row("Unique Signals", str(len(signal_counts)))
for severity, count in sorted(severity_counts.items()):
summary_table.add_row(f"{severity.title()} Severity", str(count))
console.print(
Panel.fit(
summary_table,
title=f"🐛 Crash Summary",
box=box.ROUNDED
)
)
# Detailed crash table
if crashes_to_show:
crashes_table = Table(box=box.ROUNDED)
crashes_table.add_column("Crash ID", style="bold cyan")
crashes_table.add_column("Signal", justify="center")
crashes_table.add_column("Severity", justify="center")
crashes_table.add_column("Timestamp", justify="center")
crashes_table.add_column("Input File", style="dim")
for crash in crashes_to_show:
signal_emoji = {
"SIGSEGV": "💥",
"SIGABRT": "🛑",
"SIGFPE": "🧮",
"SIGILL": "⚠️"
}.get(crash.signal or "", "🐛")
severity_style = {
"high": "red",
"medium": "yellow",
"low": "green"
}.get(crash.severity.lower(), "white")
input_display = ""
if crash.input_file:
input_display = crash.input_file.split("/")[-1] # Show just filename
crashes_table.add_row(
crash.crash_id[:12] + "..." if len(crash.crash_id) > 15 else crash.crash_id,
f"{signal_emoji} {crash.signal or 'Unknown'}",
f"[{severity_style}]{crash.severity}[/{severity_style}]",
crash.timestamp.strftime("%H:%M:%S"),
input_display
)
console.print(f"\n🐛 [bold]Crash Details[/bold]")
if len(crashes) > limit:
console.print(f"Showing first {limit} of {len(crashes)} crashes")
console.print()
console.print(crashes_table)
console.print(f"\n💡 Use [bold cyan]fuzzforge finding {run_id}[/bold cyan] for detailed analysis")
except Exception as e:
console.print(f"❌ Failed to get crash reports: {e}", style="red")
raise typer.Exit(1)
def _live_monitor(run_id: str, refresh: int):
"""Helper for live monitoring to allow for cleaner exit handling"""
with get_client() as client:
start_time = time.time()
def render_layout(run_status, stats):
layout = Layout()
layout.split_column(
Layout(name="header", size=3),
Layout(name="main", ratio=1),
Layout(name="footer", size=3)
)
layout["main"].split_row(
Layout(name="stats", ratio=1),
Layout(name="progress", ratio=1)
)
header = Panel(
f"[bold]FuzzForge Live Monitor[/bold]\n"
f"Run: {run_id[:12]}... | Status: {run_status.status} | "
f"Uptime: {format_duration(int(time.time() - start_time))}",
box=box.ROUNDED,
style="cyan"
)
layout["header"].update(header)
layout["stats"].update(create_stats_table(stats))
progress_table = Table(show_header=False, box=box.SIMPLE)
progress_table.add_column("Metric", style="bold")
progress_table.add_column("Progress")
if stats.executions > 0:
exec_rate_percent = min(100, (stats.executions_per_sec / 1000) * 100)
progress_table.add_row("Exec Rate", create_progress_bar(exec_rate_percent, "green"))
crash_rate = (stats.crashes / stats.executions) * 100000
crash_rate_percent = min(100, crash_rate * 10)
progress_table.add_row("Crash Rate", create_progress_bar(crash_rate_percent, "red"))
if stats.coverage is not None:
progress_table.add_row("Coverage", create_progress_bar(stats.coverage, "blue"))
layout["progress"].update(Panel.fit(progress_table, title="📊 Progress Indicators", box=box.ROUNDED))
footer = Panel(
f"Last updated: {datetime.now().strftime('%H:%M:%S')} | "
f"Refresh interval: {refresh}s | Press Ctrl+C to exit",
box=box.ROUNDED,
style="dim"
)
layout["footer"].update(footer)
return layout
with Live(auto_refresh=False, console=console, screen=True) as live:
# Initial fetch
try:
run_status = client.get_run_status(run_id)
stats = client.get_fuzzing_stats(run_id)
except Exception:
# Minimal fallback stats
class FallbackStats:
def __init__(self, run_id):
self.run_id = run_id
self.workflow = "unknown"
self.executions = 0
self.executions_per_sec = 0.0
self.crashes = 0
self.unique_crashes = 0
self.coverage = None
self.corpus_size = 0
self.elapsed_time = 0
self.last_crash_time = None
stats = FallbackStats(run_id)
run_status = type("RS", (), {"status":"Unknown","is_completed":False,"is_failed":False})()
live.update(render_layout(run_status, stats), refresh=True)
# Simple polling approach that actually works
consecutive_errors = 0
max_errors = 5
while True:
try:
# Poll for updates
try:
run_status = client.get_run_status(run_id)
consecutive_errors = 0
except Exception as e:
consecutive_errors += 1
if consecutive_errors >= max_errors:
console.print(f"❌ Too many errors getting run status: {e}", style="red")
break
time.sleep(refresh)
continue
# Try to get fuzzing stats
try:
stats = client.get_fuzzing_stats(run_id)
except Exception as e:
# Create fallback stats if not available
stats = FallbackStats(run_id)
# Update display
live.update(render_layout(run_status, stats), refresh=True)
# Check if completed
if getattr(run_status, 'is_completed', False) or getattr(run_status, 'is_failed', False):
# Show final state for a few seconds
console.print("\n🏁 Run completed. Showing final state for 10 seconds...")
time.sleep(10)
break
# Wait before next poll
time.sleep(refresh)
except KeyboardInterrupt:
raise
except Exception as e:
console.print(f"⚠️ Monitoring error: {e}", style="yellow")
time.sleep(refresh)
# Completed status update
final_message = (
f"[bold]FuzzForge Live Monitor - COMPLETED[/bold]\n"
f"Run: {run_id[:12]}... | Status: {run_status.status} | "
f"Total runtime: {format_duration(int(time.time() - start_time))}"
)
style = "green" if getattr(run_status, 'is_completed', False) else "red"
live.update(Panel(final_message, box=box.ROUNDED, style=style), refresh=True)
@app.command("live")
def live_monitor(
run_id: str = typer.Argument(..., help="Run ID to monitor live"),
refresh: int = typer.Option(
2, "--refresh", "-r",
help="Refresh interval in seconds (fallback when streaming unavailable)"
)
):
"""
📺 Real-time monitoring dashboard with live updates (WebSocket/SSE with REST fallback)
"""
console.print(f"📺 [bold]Live Monitoring Dashboard[/bold]")
console.print(f"Run: {run_id}")
console.print(f"Press Ctrl+C to stop monitoring\n")
try:
_live_monitor(run_id, refresh)
except KeyboardInterrupt:
console.print("\n📊 Monitoring stopped by user.", style="yellow")
except Exception as e:
console.print(f"❌ Failed to start live monitoring: {e}", style="red")
raise typer.Exit(1)
def create_progress_bar(percentage: float, color: str = "green") -> str:
"""Create a simple text progress bar"""
width = 20
filled = int((percentage / 100) * width)
bar = "" * filled + "" * (width - filled)
return f"[{color}]{bar}[/{color}] {percentage:.1f}%"
@app.callback(invoke_without_command=True)
def monitor_callback(ctx: typer.Context):
"""
📊 Real-time monitoring and statistics
"""
# Check if a subcommand is being invoked
if ctx.invoked_subcommand is not None:
# Let the subcommand handle it
return
# Show not implemented message for default command
from rich.console import Console
console = Console()
console.print("🚧 [yellow]Monitor command is not fully implemented yet.[/yellow]")
console.print("Please use specific subcommands:")
console.print(" • [cyan]ff monitor stats <run-id>[/cyan] - Show execution statistics")
console.print(" • [cyan]ff monitor crashes <run-id>[/cyan] - Show crash reports")
console.print(" • [cyan]ff monitor live <run-id>[/cyan] - Live monitoring dashboard")

View File

@@ -13,39 +13,47 @@ Replaces the old 'runs' terminology with cleaner workflow-centric commands.
# #
# Additional attribution and requirements are provided in the NOTICE file. # Additional attribution and requirements are provided in the NOTICE file.
import json
import time import time
from datetime import datetime from datetime import datetime
from pathlib import Path from pathlib import Path
from typing import Optional, Dict, Any, List from typing import Any, Dict, List, Optional
import typer import typer
from rich.console import Console
from rich.table import Table
from rich.panel import Panel
from rich.progress import Progress, SpinnerColumn, TextColumn, BarColumn, TaskProgressColumn
from rich.prompt import Prompt, Confirm
from rich.live import Live
from rich import box
from ..config import get_project_config, FuzzForgeConfig
from ..database import get_project_db, ensure_project_db, RunRecord
from ..exceptions import (
handle_error, retry_on_network_error, safe_json_load, require_project,
APIConnectionError, ValidationError, DatabaseError, FileOperationError
)
from ..validation import (
validate_run_id, validate_workflow_name, validate_target_path,
validate_volume_mode, validate_parameters, validate_timeout
)
from ..progress import progress_manager, spinner, step_progress
from ..completion import WorkflowNameComplete, TargetPathComplete, VolumeModetComplete
from ..constants import (
STATUS_EMOJIS, MAX_RUN_ID_DISPLAY_LENGTH, DEFAULT_VOLUME_MODE,
PROGRESS_STEP_DELAYS, MAX_RETRIES, RETRY_DELAY, POLL_INTERVAL
)
from fuzzforge_sdk import FuzzForgeClient, WorkflowSubmission from fuzzforge_sdk import FuzzForgeClient, WorkflowSubmission
from rich import box
from rich.console import Console
from rich.panel import Panel
from rich.prompt import Confirm, Prompt
from rich.table import Table
from ..config import FuzzForgeConfig, get_project_config
from ..constants import (
DEFAULT_VOLUME_MODE,
MAX_RETRIES,
MAX_RUN_ID_DISPLAY_LENGTH,
POLL_INTERVAL,
PROGRESS_STEP_DELAYS,
RETRY_DELAY,
STATUS_EMOJIS,
)
from ..database import RunRecord, ensure_project_db, get_project_db
from ..exceptions import (
DatabaseError,
ValidationError,
handle_error,
require_project,
retry_on_network_error,
safe_json_load,
)
from ..progress import step_progress
from ..validation import (
validate_parameters,
validate_run_id,
validate_target_path,
validate_timeout,
validate_volume_mode,
validate_workflow_name,
)
console = Console() console = Console()
app = typer.Typer() app = typer.Typer()
@@ -75,7 +83,7 @@ def execute_workflow_submission(
parameters: Dict[str, Any], parameters: Dict[str, Any],
volume_mode: str, volume_mode: str,
timeout: Optional[int], timeout: Optional[int],
interactive: bool interactive: bool,
) -> Any: ) -> Any:
"""Handle the workflow submission process""" """Handle the workflow submission process"""
# Get workflow metadata for parameter validation # Get workflow metadata for parameter validation
@@ -92,7 +100,9 @@ def execute_workflow_submission(
missing_required = required_params - set(parameters.keys()) missing_required = required_params - set(parameters.keys())
if missing_required: if missing_required:
console.print(f"\n📝 [bold]Missing required parameters:[/bold] {', '.join(missing_required)}") console.print(
f"\n📝 [bold]Missing required parameters:[/bold] {', '.join(missing_required)}"
)
console.print("Please provide values:\n") console.print("Please provide values:\n")
for param_name in missing_required: for param_name in missing_required:
@@ -114,9 +124,18 @@ def execute_workflow_submission(
elif param_type == "number": elif param_type == "number":
parameters[param_name] = float(user_input) parameters[param_name] = float(user_input)
elif param_type == "boolean": elif param_type == "boolean":
parameters[param_name] = user_input.lower() in ("true", "yes", "1", "on") parameters[param_name] = user_input.lower() in (
"true",
"yes",
"1",
"on",
)
elif param_type == "array": elif param_type == "array":
parameters[param_name] = [item.strip() for item in user_input.split(",") if item.strip()] parameters[param_name] = [
item.strip()
for item in user_input.split(",")
if item.strip()
]
else: else:
parameters[param_name] = user_input parameters[param_name] = user_input
break break
@@ -127,8 +146,9 @@ def execute_workflow_submission(
validate_volume_mode(volume_mode) validate_volume_mode(volume_mode)
if volume_mode not in workflow_meta.supported_volume_modes: if volume_mode not in workflow_meta.supported_volume_modes:
raise ValidationError( raise ValidationError(
"volume mode", volume_mode, "volume mode",
f"one of: {', '.join(workflow_meta.supported_volume_modes)}" volume_mode,
f"one of: {', '.join(workflow_meta.supported_volume_modes)}",
) )
# Create submission # Create submission
@@ -136,11 +156,11 @@ def execute_workflow_submission(
target_path=target_path, target_path=target_path,
volume_mode=volume_mode, volume_mode=volume_mode,
parameters=parameters, parameters=parameters,
timeout=timeout timeout=timeout,
) )
# Show submission summary # Show submission summary
console.print(f"\n🎯 [bold]Executing workflow:[/bold]") console.print("\n🎯 [bold]Executing workflow:[/bold]")
console.print(f" Workflow: {workflow}") console.print(f" Workflow: {workflow}")
console.print(f" Target: {target_path}") console.print(f" Target: {target_path}")
console.print(f" Volume Mode: {volume_mode}") console.print(f" Volume Mode: {volume_mode}")
@@ -165,7 +185,7 @@ def execute_workflow_submission(
"Connecting to FuzzForge API", "Connecting to FuzzForge API",
"Uploading parameters and settings", "Uploading parameters and settings",
"Creating workflow deployment", "Creating workflow deployment",
"Initializing execution environment" "Initializing execution environment",
] ]
with step_progress(steps, f"Executing {workflow}") as progress: with step_progress(steps, f"Executing {workflow}") as progress:
@@ -185,46 +205,46 @@ def execute_workflow_submission(
progress.next_step() # Initializing progress.next_step() # Initializing
time.sleep(PROGRESS_STEP_DELAYS["initializing"]) time.sleep(PROGRESS_STEP_DELAYS["initializing"])
progress.complete(f"Workflow started successfully!") progress.complete("Workflow started successfully!")
return response return response
# Main workflow execution command (replaces 'runs submit') # Main workflow execution command (replaces 'runs submit')
@app.command(name="exec", hidden=True) # Hidden because it will be called from main workflow command @app.command(
name="exec", hidden=True
) # Hidden because it will be called from main workflow command
def execute_workflow( def execute_workflow(
workflow: str = typer.Argument(..., help="Workflow name to execute"), workflow: str = typer.Argument(..., help="Workflow name to execute"),
target_path: str = typer.Argument(..., help="Path to analyze"), target_path: str = typer.Argument(..., help="Path to analyze"),
params: List[str] = typer.Argument(default=None, help="Parameters as key=value pairs"), params: List[str] = typer.Argument(
default=None, help="Parameters as key=value pairs"
),
param_file: Optional[str] = typer.Option( param_file: Optional[str] = typer.Option(
None, "--param-file", "-f", None, "--param-file", "-f", help="JSON file containing workflow parameters"
help="JSON file containing workflow parameters"
), ),
volume_mode: str = typer.Option( volume_mode: str = typer.Option(
DEFAULT_VOLUME_MODE, "--volume-mode", "-v", DEFAULT_VOLUME_MODE,
help="Volume mount mode: ro (read-only) or rw (read-write)" "--volume-mode",
"-v",
help="Volume mount mode: ro (read-only) or rw (read-write)",
), ),
timeout: Optional[int] = typer.Option( timeout: Optional[int] = typer.Option(
None, "--timeout", "-t", None, "--timeout", "-t", help="Execution timeout in seconds"
help="Execution timeout in seconds"
), ),
interactive: bool = typer.Option( interactive: bool = typer.Option(
True, "--interactive/--no-interactive", "-i/-n", True,
help="Interactive parameter input for missing required parameters" "--interactive/--no-interactive",
"-i/-n",
help="Interactive parameter input for missing required parameters",
), ),
wait: bool = typer.Option( wait: bool = typer.Option(
False, "--wait", "-w", False, "--wait", "-w", help="Wait for execution to complete"
help="Wait for execution to complete"
), ),
live: bool = typer.Option(
False, "--live", "-l",
help="Start live monitoring after execution (useful for fuzzing workflows)"
)
): ):
""" """
🚀 Execute a workflow on a target 🚀 Execute a workflow on a target
Use --live for fuzzing workflows to see real-time progress.
Use --wait to wait for completion without live dashboard. Use --wait to wait for completion without live dashboard.
""" """
try: try:
@@ -264,13 +284,20 @@ def execute_workflow(
try: try:
with get_client() as client: with get_client() as client:
response = execute_workflow_submission( response = execute_workflow_submission(
client, workflow, target_path, parameters, client,
volume_mode, timeout, interactive workflow,
target_path,
parameters,
volume_mode,
timeout,
interactive,
) )
console.print(f"✅ Workflow execution started!", style="green") console.print("✅ Workflow execution started!", style="green")
console.print(f" Execution ID: [bold cyan]{response.run_id}[/bold cyan]") console.print(f" Execution ID: [bold cyan]{response.run_id}[/bold cyan]")
console.print(f" Status: {status_emoji(response.status)} {response.status}") console.print(
f" Status: {status_emoji(response.status)} {response.status}"
)
# Save to database # Save to database
try: try:
@@ -281,65 +308,55 @@ def execute_workflow(
status=response.status, status=response.status,
target_path=target_path, target_path=target_path,
parameters=parameters, parameters=parameters,
created_at=datetime.now() created_at=datetime.now(),
) )
db.save_run(run_record) db.save_run(run_record)
except Exception as e: except Exception as e:
# Don't fail the whole operation if database save fails # Don't fail the whole operation if database save fails
console.print(f"⚠️ Failed to save execution to database: {e}", style="yellow") console.print(
f"⚠️ Failed to save execution to database: {e}", style="yellow"
)
console.print(f"\n💡 Monitor progress: [bold cyan]fuzzforge monitor {response.run_id}[/bold cyan]") console.print(
console.print(f"💡 Check status: [bold cyan]fuzzforge workflow status {response.run_id}[/bold cyan]") f"💡 Check status: [bold cyan]fuzzforge workflow status {response.run_id}[/bold cyan]"
)
# Suggest --live for fuzzing workflows
if not live and not wait and "fuzzing" in workflow.lower():
console.print(f"💡 Next time try: [bold cyan]fuzzforge workflow {workflow} {target_path} --live[/bold cyan] for real-time fuzzing dashboard", style="dim")
# Start live monitoring if requested
if live:
# Check if this is a fuzzing workflow to show appropriate messaging
is_fuzzing = "fuzzing" in workflow.lower()
if is_fuzzing:
console.print(f"\n📺 Starting live fuzzing dashboard...")
console.print("💡 You'll see real-time crash discovery, execution stats, and coverage data.")
else:
console.print(f"\n📺 Starting live monitoring dashboard...")
console.print("Press Ctrl+C to stop monitoring (execution continues in background).\n")
try:
from ..commands.monitor import live_monitor
# Import monitor command and run it
live_monitor(response.run_id, refresh=3)
except KeyboardInterrupt:
console.print(f"\n⏹️ Live monitoring stopped (execution continues in background)", style="yellow")
except Exception as e:
console.print(f"⚠️ Failed to start live monitoring: {e}", style="yellow")
console.print(f"💡 You can still monitor manually: [bold cyan]fuzzforge monitor {response.run_id}[/bold cyan]")
# Wait for completion if requested # Wait for completion if requested
elif wait: if wait:
console.print(f"\n⏳ Waiting for execution to complete...") console.print("\n⏳ Waiting for execution to complete...")
try: try:
final_status = client.wait_for_completion(response.run_id, poll_interval=POLL_INTERVAL) final_status = client.wait_for_completion(
response.run_id, poll_interval=POLL_INTERVAL
)
# Update database # Update database
try: try:
db.update_run_status( db.update_run_status(
response.run_id, response.run_id,
final_status.status, final_status.status,
completed_at=datetime.now() if final_status.is_completed else None completed_at=datetime.now()
if final_status.is_completed
else None,
) )
except Exception as e: except Exception as e:
console.print(f"⚠️ Failed to update database: {e}", style="yellow") console.print(
f"⚠️ Failed to update database: {e}", style="yellow"
)
console.print(f"🏁 Execution completed with status: {status_emoji(final_status.status)} {final_status.status}") console.print(
f"🏁 Execution completed with status: {status_emoji(final_status.status)} {final_status.status}"
)
if final_status.is_completed: if final_status.is_completed:
console.print(f"💡 View findings: [bold cyan]fuzzforge findings {response.run_id}[/bold cyan]") console.print(
f"💡 View findings: [bold cyan]fuzzforge findings {response.run_id}[/bold cyan]"
)
except KeyboardInterrupt: except KeyboardInterrupt:
console.print(f"\n⏹️ Monitoring cancelled (execution continues in background)", style="yellow") console.print(
"\n⏹️ Monitoring cancelled (execution continues in background)",
style="yellow",
)
except Exception as e: except Exception as e:
handle_error(e, "waiting for completion") handle_error(e, "waiting for completion")
@@ -349,7 +366,9 @@ def execute_workflow(
@app.command("status") @app.command("status")
def workflow_status( def workflow_status(
execution_id: Optional[str] = typer.Argument(None, help="Execution ID to check (defaults to most recent)") execution_id: Optional[str] = typer.Argument(
None, help="Execution ID to check (defaults to most recent)"
),
): ):
""" """
📊 Check the status of a workflow execution 📊 Check the status of a workflow execution
@@ -368,7 +387,9 @@ def workflow_status(
if not execution_id: if not execution_id:
recent_runs = db.list_runs(limit=1) recent_runs = db.list_runs(limit=1)
if not recent_runs: if not recent_runs:
console.print("⚠️ No executions found in project database", style="yellow") console.print(
"⚠️ No executions found in project database", style="yellow"
)
raise typer.Exit(0) raise typer.Exit(0)
execution_id = recent_runs[0].run_id execution_id = recent_runs[0].run_id
console.print(f"🔍 Using most recent execution: {execution_id}") console.print(f"🔍 Using most recent execution: {execution_id}")
@@ -384,7 +405,7 @@ def workflow_status(
db.update_run_status( db.update_run_status(
execution_id, execution_id,
status.status, status.status,
completed_at=status.updated_at if status.is_completed else None completed_at=status.updated_at if status.is_completed else None,
) )
except Exception as e: except Exception as e:
console.print(f"⚠️ Failed to update database: {e}", style="yellow") console.print(f"⚠️ Failed to update database: {e}", style="yellow")
@@ -404,23 +425,24 @@ def workflow_status(
if status.is_completed: if status.is_completed:
duration = status.updated_at - status.created_at duration = status.updated_at - status.created_at
status_table.add_row("Duration", str(duration).split('.')[0]) # Remove microseconds status_table.add_row(
"Duration", str(duration).split(".")[0]
) # Remove microseconds
console.print( console.print(
Panel.fit( Panel.fit(status_table, title="📊 Status Information", box=box.ROUNDED)
status_table,
title=f"📊 Status Information",
box=box.ROUNDED
)
) )
# Show next steps # Show next steps
if status.is_running:
console.print(f"\n💡 Monitor live: [bold cyan]fuzzforge monitor {execution_id}[/bold cyan]") if status.is_completed:
elif status.is_completed: console.print(
console.print(f"💡 View findings: [bold cyan]fuzzforge finding {execution_id}[/bold cyan]") f"💡 View findings: [bold cyan]fuzzforge finding {execution_id}[/bold cyan]"
)
elif status.is_failed: elif status.is_failed:
console.print(f"💡 Check logs: [bold cyan]fuzzforge workflow logs {execution_id}[/bold cyan]") console.print(
f"💡 Check logs: [bold cyan]fuzzforge workflow logs {execution_id}[/bold cyan]"
)
except Exception as e: except Exception as e:
handle_error(e, "getting execution status") handle_error(e, "getting execution status")
@@ -428,9 +450,15 @@ def workflow_status(
@app.command("history") @app.command("history")
def workflow_history( def workflow_history(
workflow: Optional[str] = typer.Option(None, "--workflow", "-w", help="Filter by workflow name"), workflow: Optional[str] = typer.Option(
status: Optional[str] = typer.Option(None, "--status", "-s", help="Filter by status"), None, "--workflow", "-w", help="Filter by workflow name"
limit: int = typer.Option(20, "--limit", "-l", help="Maximum number of executions to show") ),
status: Optional[str] = typer.Option(
None, "--status", "-s", help="Filter by status"
),
limit: int = typer.Option(
20, "--limit", "-l", help="Maximum number of executions to show"
),
): ):
""" """
📋 Show workflow execution history 📋 Show workflow execution history
@@ -463,12 +491,14 @@ def workflow_history(
param_str = f"{param_count} params" if param_count > 0 else "-" param_str = f"{param_count} params" if param_count > 0 else "-"
table.add_row( table.add_row(
run.run_id[:12] + "..." if len(run.run_id) > MAX_RUN_ID_DISPLAY_LENGTH else run.run_id, run.run_id[:12] + "..."
if len(run.run_id) > MAX_RUN_ID_DISPLAY_LENGTH
else run.run_id,
run.workflow, run.workflow,
f"{status_emoji(run.status)} {run.status}", f"{status_emoji(run.status)} {run.status}",
Path(run.target_path).name, Path(run.target_path).name,
run.created_at.strftime("%m-%d %H:%M"), run.created_at.strftime("%m-%d %H:%M"),
param_str param_str,
) )
console.print(f"\n📋 [bold]Workflow Execution History ({len(runs)})[/bold]") console.print(f"\n📋 [bold]Workflow Execution History ({len(runs)})[/bold]")
@@ -479,7 +509,9 @@ def workflow_history(
console.print() console.print()
console.print(table) console.print(table)
console.print(f"\n💡 Use [bold cyan]fuzzforge workflow status <execution-id>[/bold cyan] for detailed status") console.print(
"\n💡 Use [bold cyan]fuzzforge workflow status <execution-id>[/bold cyan] for detailed status"
)
except Exception as e: except Exception as e:
handle_error(e, "listing execution history") handle_error(e, "listing execution history")
@@ -487,11 +519,15 @@ def workflow_history(
@app.command("retry") @app.command("retry")
def retry_workflow( def retry_workflow(
execution_id: Optional[str] = typer.Argument(None, help="Execution ID to retry (defaults to most recent)"), execution_id: Optional[str] = typer.Argument(
None, help="Execution ID to retry (defaults to most recent)"
),
modify_params: bool = typer.Option( modify_params: bool = typer.Option(
False, "--modify-params", "-m", False,
help="Interactively modify parameters before retrying" "--modify-params",
) "-m",
help="Interactively modify parameters before retrying",
),
): ):
""" """
🔄 Retry a workflow execution with the same or modified parameters 🔄 Retry a workflow execution with the same or modified parameters
@@ -517,7 +553,9 @@ def retry_workflow(
# Get original execution # Get original execution
original_run = db.get_run(execution_id) original_run = db.get_run(execution_id)
if not original_run: if not original_run:
raise ValidationError("execution_id", execution_id, "an existing execution ID in the database") raise ValidationError(
"execution_id", execution_id, "an existing execution ID in the database"
)
console.print(f"🔄 [bold]Retrying workflow:[/bold] {original_run.workflow}") console.print(f"🔄 [bold]Retrying workflow:[/bold] {original_run.workflow}")
console.print(f" Original Execution ID: {execution_id}") console.print(f" Original Execution ID: {execution_id}")
@@ -527,24 +565,29 @@ def retry_workflow(
# Modify parameters if requested # Modify parameters if requested
if modify_params and parameters: if modify_params and parameters:
console.print(f"\n📝 [bold]Current parameters:[/bold]") console.print("\n📝 [bold]Current parameters:[/bold]")
for key, value in parameters.items(): for key, value in parameters.items():
new_value = Prompt.ask( new_value = Prompt.ask(f"{key}", default=str(value), console=console)
f"{key}",
default=str(value),
console=console
)
if new_value != str(value): if new_value != str(value):
# Try to maintain type # Try to maintain type
try: try:
if isinstance(value, bool): if isinstance(value, bool):
parameters[key] = new_value.lower() in ("true", "yes", "1", "on") parameters[key] = new_value.lower() in (
"true",
"yes",
"1",
"on",
)
elif isinstance(value, int): elif isinstance(value, int):
parameters[key] = int(new_value) parameters[key] = int(new_value)
elif isinstance(value, float): elif isinstance(value, float):
parameters[key] = float(new_value) parameters[key] = float(new_value)
elif isinstance(value, list): elif isinstance(value, list):
parameters[key] = [item.strip() for item in new_value.split(",") if item.strip()] parameters[key] = [
item.strip()
for item in new_value.split(",")
if item.strip()
]
else: else:
parameters[key] = new_value parameters[key] = new_value
except ValueError: except ValueError:
@@ -553,15 +596,18 @@ def retry_workflow(
# Submit new execution # Submit new execution
with get_client() as client: with get_client() as client:
submission = WorkflowSubmission( submission = WorkflowSubmission(
target_path=original_run.target_path, target_path=original_run.target_path, parameters=parameters
parameters=parameters
) )
response = client.submit_workflow(original_run.workflow, submission) response = client.submit_workflow(original_run.workflow, submission)
console.print(f"\n✅ Retry submitted successfully!", style="green") console.print("\n✅ Retry submitted successfully!", style="green")
console.print(f" New Execution ID: [bold cyan]{response.run_id}[/bold cyan]") console.print(
console.print(f" Status: {status_emoji(response.status)} {response.status}") f" New Execution ID: [bold cyan]{response.run_id}[/bold cyan]"
)
console.print(
f" Status: {status_emoji(response.status)} {response.status}"
)
# Save to database # Save to database
try: try:
@@ -572,13 +618,17 @@ def retry_workflow(
target_path=original_run.target_path, target_path=original_run.target_path,
parameters=parameters, parameters=parameters,
created_at=datetime.now(), created_at=datetime.now(),
metadata={"retry_of": execution_id} metadata={"retry_of": execution_id},
) )
db.save_run(run_record) db.save_run(run_record)
except Exception as e: except Exception as e:
console.print(f"⚠️ Failed to save execution to database: {e}", style="yellow") console.print(
f"⚠️ Failed to save execution to database: {e}", style="yellow"
)
console.print(f"\n💡 Monitor progress: [bold cyan]fuzzforge monitor {response.run_id}[/bold cyan]") console.print(
f"\n💡 Monitor progress: [bold cyan]fuzzforge monitor {response.run_id}[/bold cyan]"
)
except Exception as e: except Exception as e:
handle_error(e, "retrying workflow") handle_error(e, "retrying workflow")

View File

@@ -14,9 +14,9 @@ Provides "Did you mean...?" functionality and intelligent command/parameter sugg
# #
# Additional attribution and requirements are provided in the NOTICE file. # Additional attribution and requirements are provided in the NOTICE file.
import difflib import difflib
from typing import List, Optional, Dict, Any, Tuple from typing import Any, Dict, List, Optional, Tuple
from rich.console import Console from rich.console import Console
from rich.panel import Panel from rich.panel import Panel
from rich.text import Text from rich.text import Text
@@ -34,10 +34,9 @@ class FuzzyMatcher:
"workflows": ["list", "info"], "workflows": ["list", "info"],
"runs": ["submit", "status", "list", "rerun"], "runs": ["submit", "status", "list", "rerun"],
"findings": ["get", "list", "export", "all"], "findings": ["get", "list", "export", "all"],
"monitor": ["stats", "crashes", "live"],
"config": ["set", "get", "list", "init"], "config": ["set", "get", "list", "init"],
"ai": ["ask", "summarize", "explain"], "ai": ["ask", "summarize", "explain"],
"ingest": ["project", "findings"] "ingest": ["project", "findings"],
} }
# Common workflow names # Common workflow names
@@ -47,7 +46,7 @@ class FuzzyMatcher:
"infrastructure_scan", "infrastructure_scan",
"static_analysis_scan", "static_analysis_scan",
"penetration_testing_scan", "penetration_testing_scan",
"secret_detection_scan" "secret_detection_scan",
] ]
# Common parameter names # Common parameter names
@@ -60,24 +59,25 @@ class FuzzyMatcher:
"param-file", "param-file",
"interactive", "interactive",
"wait", "wait",
"live",
"format", "format",
"output", "output",
"severity", "severity",
"since", "since",
"limit", "limit",
"stats", "stats",
"export" "export",
] ]
# Common values # Common values
self.common_values = { self.common_values = {
"volume_mode": ["ro", "rw"], "volume_mode": ["ro", "rw"],
"format": ["json", "csv", "html", "sarif"], "format": ["json", "csv", "html", "sarif"],
"severity": ["critical", "high", "medium", "low", "info"] "severity": ["critical", "high", "medium", "low", "info"],
} }
def find_closest_command(self, user_input: str, command_group: Optional[str] = None) -> Optional[Tuple[str, float]]: def find_closest_command(
self, user_input: str, command_group: Optional[str] = None
) -> Optional[Tuple[str, float]]:
"""Find the closest matching command.""" """Find the closest matching command."""
if command_group and command_group in self.commands: if command_group and command_group in self.commands:
# Search within a specific command group # Search within a specific command group
@@ -86,9 +86,7 @@ class FuzzyMatcher:
# Search all main commands # Search all main commands
candidates = list(self.commands.keys()) candidates = list(self.commands.keys())
matches = difflib.get_close_matches( matches = difflib.get_close_matches(user_input, candidates, n=1, cutoff=0.6)
user_input, candidates, n=1, cutoff=0.6
)
if matches: if matches:
match = matches[0] match = matches[0]
@@ -114,7 +112,7 @@ class FuzzyMatcher:
def find_closest_parameter(self, user_input: str) -> Optional[Tuple[str, float]]: def find_closest_parameter(self, user_input: str) -> Optional[Tuple[str, float]]:
"""Find the closest matching parameter name.""" """Find the closest matching parameter name."""
# Remove leading dashes # Remove leading dashes
clean_input = user_input.lstrip('-') clean_input = user_input.lstrip("-")
matches = difflib.get_close_matches( matches = difflib.get_close_matches(
clean_input, self.parameter_names, n=1, cutoff=0.6 clean_input, self.parameter_names, n=1, cutoff=0.6
@@ -139,7 +137,9 @@ class FuzzyMatcher:
return [] return []
def get_command_suggestions(self, user_command: List[str]) -> Optional[Dict[str, Any]]: def get_command_suggestions(
self, user_command: List[str]
) -> Optional[Dict[str, Any]]:
"""Get suggestions for a user command that may have typos.""" """Get suggestions for a user command that may have typos."""
if not user_command: if not user_command:
return None return None
@@ -153,11 +153,9 @@ class FuzzyMatcher:
if closest: if closest:
match, confidence = closest match, confidence = closest
suggestions["type"] = "main_command" suggestions["type"] = "main_command"
suggestions["suggestions"].append({ suggestions["suggestions"].append(
"text": match, {"text": match, "confidence": confidence, "type": "command"}
"confidence": confidence, )
"type": "command"
})
# Check subcommand if present # Check subcommand if present
elif len(user_command) > 1: elif len(user_command) > 1:
@@ -167,11 +165,13 @@ class FuzzyMatcher:
if closest: if closest:
match, confidence = closest match, confidence = closest
suggestions["type"] = "subcommand" suggestions["type"] = "subcommand"
suggestions["suggestions"].append({ suggestions["suggestions"].append(
"text": f"{main_cmd} {match}", {
"confidence": confidence, "text": f"{main_cmd} {match}",
"type": "subcommand" "confidence": confidence,
}) "type": "subcommand",
}
)
return suggestions if suggestions["suggestions"] else None return suggestions if suggestions["suggestions"] else None
@@ -210,17 +210,19 @@ def display_command_suggestion(suggestions: Dict[str, Any]):
# Add helpful context # Add helpful context
if suggestion_type == "main_command": if suggestion_type == "main_command":
text.append("\n💡 Use 'fuzzforge --help' to see all available commands", style="dim") text.append(
"\n💡 Use 'fuzzforge --help' to see all available commands", style="dim"
)
elif suggestion_type == "subcommand": elif suggestion_type == "subcommand":
main_cmd = suggestions["original"][0] main_cmd = suggestions["original"][0]
text.append(f"\n💡 Use 'fuzzforge {main_cmd} --help' to see available subcommands", style="dim") text.append(
f"\n💡 Use 'fuzzforge {main_cmd} --help' to see available subcommands",
style="dim",
)
console.print(Panel( console.print(
text, Panel(text, title="🤔 Command Suggestion", border_style="yellow", expand=False)
title="🤔 Command Suggestion", )
border_style="yellow",
expand=False
))
def display_workflow_suggestion(original: str, suggestion: str): def display_workflow_suggestion(original: str, suggestion: str):
@@ -234,14 +236,13 @@ def display_workflow_suggestion(original: str, suggestion: str):
text.append(f"'{suggestion}'", style="bold green") text.append(f"'{suggestion}'", style="bold green")
text.append("?\n\n") text.append("?\n\n")
text.append("💡 Use 'fuzzforge workflows' to see all available workflows", style="dim") text.append(
"💡 Use 'fuzzforge workflows' to see all available workflows", style="dim"
)
console.print(Panel( console.print(
text, Panel(text, title="🔧 Workflow Suggestion", border_style="yellow", expand=False)
title="🔧 Workflow Suggestion", )
border_style="yellow",
expand=False
))
def display_parameter_suggestion(original: str, suggestion: str): def display_parameter_suggestion(original: str, suggestion: str):
@@ -257,12 +258,9 @@ def display_parameter_suggestion(original: str, suggestion: str):
text.append("💡 Use '--help' to see all available parameters", style="dim") text.append("💡 Use '--help' to see all available parameters", style="dim")
console.print(Panel( console.print(
text, Panel(text, title="⚙️ Parameter Suggestion", border_style="yellow", expand=False)
title="⚙️ Parameter Suggestion", )
border_style="yellow",
expand=False
))
def enhanced_command_not_found_handler(command_parts: List[str]): def enhanced_command_not_found_handler(command_parts: List[str]):

View File

@@ -12,22 +12,23 @@ Main CLI application with improved command structure.
# #
# Additional attribution and requirements are provided in the NOTICE file. # Additional attribution and requirements are provided in the NOTICE file.
import sys
from typing import List, Optional
import typer import typer
from rich.console import Console from rich.console import Console
from rich.traceback import install from rich.traceback import install
from typing import Optional, List
import sys
from .commands import ( from .commands import (
init,
workflows,
workflow_exec,
findings,
monitor,
config as config_cmd,
ai, ai,
findings,
ingest, ingest,
init,
workflow_exec,
workflows,
)
from .commands import (
config as config_cmd,
) )
from .constants import DEFAULT_VOLUME_MODE from .constants import DEFAULT_VOLUME_MODE
from .fuzzy import enhanced_command_not_found_handler from .fuzzy import enhanced_command_not_found_handler
@@ -78,25 +79,30 @@ finding_app = typer.Typer(
# === Top-level commands === # === Top-level commands ===
@app.command() @app.command()
def init( def init(
name: Optional[str] = typer.Option( name: Optional[str] = typer.Option(
None, "--name", "-n", None, "--name", "-n", help="Project name (defaults to current directory name)"
help="Project name (defaults to current directory name)"
), ),
api_url: Optional[str] = typer.Option( api_url: Optional[str] = typer.Option(
None, "--api-url", "-u", None,
help="FuzzForge API URL (defaults to http://localhost:8000)" "--api-url",
"-u",
help="FuzzForge API URL (defaults to http://localhost:8000)",
), ),
force: bool = typer.Option( force: bool = typer.Option(
False, "--force", "-f", False,
help="Force initialization even if project already exists" "--force",
) "-f",
help="Force initialization even if project already exists",
),
): ):
""" """
📁 Initialize a new FuzzForge project 📁 Initialize a new FuzzForge project
""" """
from .commands.init import project from .commands.init import project
project(name=name, api_url=api_url, force=force) project(name=name, api_url=api_url, force=force)
@@ -106,40 +112,18 @@ def status():
📊 Show project and latest execution status 📊 Show project and latest execution status
""" """
from .commands.status import show_status from .commands.status import show_status
show_status() show_status()
@app.command()
def config(
key: Optional[str] = typer.Argument(None, help="Configuration key"),
value: Optional[str] = typer.Argument(None, help="Configuration value to set")
):
"""
⚙️ Manage configuration (show all, get, or set values)
"""
from .commands import config as config_cmd
if key is None:
# No arguments: show all config
config_cmd.show_config(global_config=False)
elif value is None:
# Key only: get specific value
config_cmd.get_config(key=key, global_config=False)
else:
# Key and value: set value
config_cmd.set_config(key=key, value=value, global_config=False)
@app.command() @app.command()
def clean( def clean(
days: int = typer.Option( days: int = typer.Option(
90, "--days", "-d", 90, "--days", "-d", help="Remove data older than this many days"
help="Remove data older than this many days"
), ),
dry_run: bool = typer.Option( dry_run: bool = typer.Option(
False, "--dry-run", False, "--dry-run", help="Show what would be deleted without actually deleting"
help="Show what would be deleted without actually deleting" ),
)
): ):
""" """
🧹 Clean old execution data and findings 🧹 Clean old execution data and findings
@@ -155,7 +139,9 @@ def clean(
raise typer.Exit(1) raise typer.Exit(1)
if dry_run: if dry_run:
console.print(f"🔍 [bold]Dry run:[/bold] Would clean data older than {days} days") console.print(
f"🔍 [bold]Dry run:[/bold] Would clean data older than {days} days"
)
deleted = db.cleanup_old_runs(keep_days=days) deleted = db.cleanup_old_runs(keep_days=days)
@@ -177,35 +163,35 @@ workflow_app.command("retry")(workflow_exec.retry_workflow)
workflow_app.command("info")(workflows.workflow_info) workflow_app.command("info")(workflows.workflow_info)
workflow_app.command("params")(workflows.workflow_parameters) workflow_app.command("params")(workflows.workflow_parameters)
@workflow_app.command("run") @workflow_app.command("run")
def run_workflow( def run_workflow(
workflow: str = typer.Argument(help="Workflow name"), workflow: str = typer.Argument(help="Workflow name"),
target: str = typer.Argument(help="Target path"), target: str = typer.Argument(help="Target path"),
params: List[str] = typer.Argument(default=None, help="Parameters as key=value pairs"), params: List[str] = typer.Argument(
default=None, help="Parameters as key=value pairs"
),
param_file: Optional[str] = typer.Option( param_file: Optional[str] = typer.Option(
None, "--param-file", "-f", None, "--param-file", "-f", help="JSON file containing workflow parameters"
help="JSON file containing workflow parameters"
), ),
volume_mode: str = typer.Option( volume_mode: str = typer.Option(
DEFAULT_VOLUME_MODE, "--volume-mode", "-v", DEFAULT_VOLUME_MODE,
help="Volume mount mode: ro (read-only) or rw (read-write)" "--volume-mode",
"-v",
help="Volume mount mode: ro (read-only) or rw (read-write)",
), ),
timeout: Optional[int] = typer.Option( timeout: Optional[int] = typer.Option(
None, "--timeout", "-t", None, "--timeout", "-t", help="Execution timeout in seconds"
help="Execution timeout in seconds"
), ),
interactive: bool = typer.Option( interactive: bool = typer.Option(
True, "--interactive/--no-interactive", "-i/-n", True,
help="Interactive parameter input for missing required parameters" "--interactive/--no-interactive",
"-i/-n",
help="Interactive parameter input for missing required parameters",
), ),
wait: bool = typer.Option( wait: bool = typer.Option(
False, "--wait", "-w", False, "--wait", "-w", help="Wait for execution to complete"
help="Wait for execution to complete"
), ),
live: bool = typer.Option(
False, "--live", "-l",
help="Start live monitoring after execution (useful for fuzzing workflows)"
)
): ):
""" """
🚀 Execute a security testing workflow 🚀 Execute a security testing workflow
@@ -221,9 +207,9 @@ def run_workflow(
timeout=timeout, timeout=timeout,
interactive=interactive, interactive=interactive,
wait=wait, wait=wait,
live=live
) )
@workflow_app.callback() @workflow_app.callback()
def workflow_main(): def workflow_main():
""" """
@@ -239,17 +225,18 @@ def workflow_main():
# === Finding commands (singular) === # === Finding commands (singular) ===
@finding_app.command("export") @finding_app.command("export")
def export_finding( def export_finding(
execution_id: Optional[str] = typer.Argument(None, help="Execution ID (defaults to latest)"), execution_id: Optional[str] = typer.Argument(
None, help="Execution ID (defaults to latest)"
),
format: str = typer.Option( format: str = typer.Option(
"sarif", "--format", "-f", "sarif", "--format", "-f", help="Export format: sarif, json, csv"
help="Export format: sarif, json, csv"
), ),
output: Optional[str] = typer.Option( output: Optional[str] = typer.Option(
None, "--output", "-o", None, "--output", "-o", help="Output file (defaults to stdout)"
help="Output file (defaults to stdout)" ),
)
): ):
""" """
📤 Export findings to file 📤 Export findings to file
@@ -270,7 +257,9 @@ def export_finding(
execution_id = recent_runs[0].run_id execution_id = recent_runs[0].run_id
console.print(f"🔍 Using most recent execution: {execution_id}") console.print(f"🔍 Using most recent execution: {execution_id}")
else: else:
console.print("⚠️ No findings found in project database", style="yellow") console.print(
"⚠️ No findings found in project database", style="yellow"
)
return return
else: else:
console.print("❌ No project database found", style="red") console.print("❌ No project database found", style="red")
@@ -283,14 +272,16 @@ def export_finding(
@finding_app.command("analyze") @finding_app.command("analyze")
def analyze_finding( def analyze_finding(
finding_id: Optional[str] = typer.Argument(None, help="Finding ID to analyze") finding_id: Optional[str] = typer.Argument(None, help="Finding ID to analyze"),
): ):
""" """
🤖 AI analysis of a finding 🤖 AI analysis of a finding
""" """
from .commands.ai import analyze_finding as ai_analyze from .commands.ai import analyze_finding as ai_analyze
ai_analyze(finding_id) ai_analyze(finding_id)
@finding_app.callback(invoke_without_command=True) @finding_app.callback(invoke_without_command=True)
def finding_main( def finding_main(
ctx: typer.Context, ctx: typer.Context,
@@ -309,7 +300,7 @@ def finding_main(
return return
# Get remaining arguments for direct viewing # Get remaining arguments for direct viewing
args = ctx.args if hasattr(ctx, 'args') else [] args = ctx.args if hasattr(ctx, "args") else []
finding_id = args[0] if args else None finding_id = args[0] if args else None
# Direct viewing: fuzzforge finding [id] # Direct viewing: fuzzforge finding [id]
@@ -329,7 +320,9 @@ def finding_main(
finding_id = recent_runs[0].run_id finding_id = recent_runs[0].run_id
console.print(f"🔍 Using most recent execution: {finding_id}") console.print(f"🔍 Using most recent execution: {finding_id}")
else: else:
console.print("⚠️ No findings found in project database", style="yellow") console.print(
"⚠️ No findings found in project database", style="yellow"
)
return return
else: else:
console.print("❌ No project database found", style="red") console.print("❌ No project database found", style="red")
@@ -351,9 +344,10 @@ app.add_typer(workflow_app, name="workflow", help="🚀 Execute and manage workf
app.add_typer(finding_app, name="finding", help="🔍 View and analyze findings") app.add_typer(finding_app, name="finding", help="🔍 View and analyze findings")
# Other command groups # Other command groups
app.add_typer(monitor.app, name="monitor", help="📊 Real-time monitoring")
app.add_typer(ai.app, name="ai", help="🤖 AI integration features") app.add_typer(ai.app, name="ai", help="🤖 AI integration features")
app.add_typer(ingest.app, name="ingest", help="🧠 Ingest knowledge into AI") app.add_typer(ingest.app, name="ingest", help="🧠 Ingest knowledge into AI")
app.add_typer(config_cmd.app, name="config", help="⚙️ Manage configuration settings")
# Help and utility commands # Help and utility commands
@app.command() @app.command()
@@ -371,13 +365,10 @@ def examples():
[bold]Execute Workflows:[/bold] [bold]Execute Workflows:[/bold]
ff workflow afl-fuzzing ./target # Run fuzzing on target ff workflow afl-fuzzing ./target # Run fuzzing on target
ff workflow afl-fuzzing . --live # Run with live monitoring
ff workflow scan-c ./src timeout=300 threads=4 # With parameters
[bold]Monitor Execution:[/bold] [bold]Monitor Execution:[/bold]
ff status # Check latest execution ff status # Check latest execution
ff workflow status # Same as above ff workflow status # Same as above
ff monitor # Live monitoring dashboard
ff workflow history # Show past executions ff workflow history # Show past executions
[bold]Review Findings:[/bold] [bold]Review Findings:[/bold]
@@ -399,16 +390,16 @@ def version():
📦 Show version information 📦 Show version information
""" """
from . import __version__ from . import __version__
console.print(f"FuzzForge CLI v{__version__}") console.print(f"FuzzForge CLI v{__version__}")
console.print(f"Short command: ff") console.print("Short command: ff")
@app.callback() @app.callback()
def main_callback( def main_callback(
ctx: typer.Context, ctx: typer.Context,
version: Optional[bool] = typer.Option( version: Optional[bool] = typer.Option(
None, "--version", "-v", None, "--version", "-v", help="Show version information"
help="Show version information"
), ),
): ):
""" """
@@ -422,6 +413,7 @@ def main_callback(
""" """
if version: if version:
from . import __version__ from . import __version__
console.print(f"FuzzForge CLI v{__version__}") console.print(f"FuzzForge CLI v{__version__}")
raise typer.Exit() raise typer.Exit()
@@ -432,12 +424,11 @@ def main():
if len(sys.argv) > 1: if len(sys.argv) > 1:
args = sys.argv[1:] args = sys.argv[1:]
# Handle finding command with pattern recognition # Handle finding command with pattern recognition
if len(args) >= 2 and args[0] == 'finding': if len(args) >= 2 and args[0] == "finding":
finding_subcommands = ['export', 'analyze'] finding_subcommands = ["export", "analyze"]
# Skip custom dispatching if help flags are present # Skip custom dispatching if help flags are present
if not any(arg in ['--help', '-h', '--version', '-v'] for arg in args): if not any(arg in ["--help", "-h", "--version", "-v"] for arg in args):
if args[1] not in finding_subcommands: if args[1] not in finding_subcommands:
# Direct finding display: ff finding <id> # Direct finding display: ff finding <id>
from .commands.findings import get_findings from .commands.findings import get_findings
@@ -457,18 +448,26 @@ def main():
app() app()
except SystemExit as e: except SystemExit as e:
# Enhanced error handling for command not found # Enhanced error handling for command not found
if hasattr(e, 'code') and e.code != 0 and len(sys.argv) > 1: if hasattr(e, "code") and e.code != 0 and len(sys.argv) > 1:
command_parts = sys.argv[1:] command_parts = sys.argv[1:]
clean_parts = [part for part in command_parts if not part.startswith('-')] clean_parts = [part for part in command_parts if not part.startswith("-")]
if clean_parts: if clean_parts:
main_cmd = clean_parts[0] main_cmd = clean_parts[0]
valid_commands = [ valid_commands = [
'init', 'status', 'config', 'clean', "init",
'workflows', 'workflow', "status",
'findings', 'finding', "config",
'monitor', 'ai', 'ingest', "clean",
'examples', 'version' "workflows",
"workflow",
"findings",
"finding",
"monitor",
"ai",
"ingest",
"examples",
"version",
] ]
if main_cmd not in valid_commands: if main_cmd not in valid_commands: