Compare commits

...

8 Commits

Author SHA1 Message Date
Tanguy Duhamel
1ba80c466b fix: register config as command group instead of custom function
The config command was implemented as a custom function that manually
routed to subcommands, which caused 'ff config show' to fail. It
treated 'show' as a configuration key argument instead of a subcommand.

Now properly registered as a Typer command group, enabling all config
subcommands (show, set, get, reset, edit) to work correctly.
2025-10-03 11:13:34 +02:00
abel
c9f8926bc3 ci: run in any situation on docs folder changes 2025-10-02 17:22:15 +02:00
abel
d2e0b61b67 fix: run only when changes to docs folder 2025-10-02 17:21:14 +02:00
tduhamel42
c2de6eae7d Merge pull request #10 from FuzzingLabs/refactor/remove-monitor-command
refactor: removed monitor command and --live parameter
2025-10-02 16:17:33 +02:00
tduhamel42
60b69667e7 Merge branch 'master' into refactor/remove-monitor-command 2025-10-02 16:12:01 +02:00
tduhamel42
28b0712f2f Merge pull request #11 from FuzzingLabs/fix/remove-erroneous-cli-example
fix: removed erroneous example
2025-10-02 16:08:23 +02:00
abel
a53d6c9ae5 fix: removed erroneous example 2025-10-02 16:01:54 +02:00
abel
928a5f5f77 refactor: removed monitor command and --live parameter 2025-10-02 15:49:18 +02:00
8 changed files with 350 additions and 774 deletions

View File

@@ -1,10 +1,13 @@
name: Deploy Docusaurus to GitHub Pages
on:
workflow_dispatch:
push:
branches:
- master
workflow_dispatch:
paths:
- "docs/**"
jobs:
build:

View File

@@ -1,9 +1,14 @@
name: Docusaurus test deployment
on:
workflow_dispatch:
push:
paths:
- "docs/**"
pull_request:
branches:
- master
paths:
- "docs/**"
jobs:
test-deploy:

View File

@@ -80,8 +80,6 @@ fuzzforge workflows info security_assessment
# Submit a workflow for analysis
fuzzforge workflow security_assessment /path/to/your/code
# Monitor progress in real-time
fuzzforge monitor live <execution-id>
# View findings when complete
fuzzforge finding <execution-id>
@@ -179,7 +177,6 @@ fuzzforge workflow security_assessment /path/to/code --wait
- `--timeout, -t` - Execution timeout in seconds
- `--interactive/--no-interactive, -i/-n` - Interactive parameter input
- `--wait, -w` - Wait for execution to complete
- `--live, -l` - Show live monitoring during execution
#### `fuzzforge workflow status [execution-id]`
Check the status of a workflow execution.
@@ -261,39 +258,6 @@ fuzzforge finding export abc123def456 --format csv --output report.csv
fuzzforge finding export --format html --output report.html
```
### Real-time Monitoring
#### `fuzzforge monitor stats <execution-id>`
Show current fuzzing statistics.
```bash
# Show stats once
fuzzforge monitor stats abc123def456 --once
# Live updating stats (default)
fuzzforge monitor stats abc123def456 --refresh 5
```
#### `fuzzforge monitor crashes <run-id>`
Display crash reports for a fuzzing run.
```bash
fuzzforge monitor crashes abc123def456 --limit 50
```
#### `fuzzforge monitor live <run-id>`
Real-time monitoring dashboard with live updates.
```bash
fuzzforge monitor live abc123def456 --refresh 3
```
Features:
- Live updating statistics
- Progress indicators and bars
- Run status monitoring
- Automatic completion detection
### Configuration Management
#### `fuzzforge config show`
@@ -495,7 +459,6 @@ cli/
│ ├── workflows.py # Workflow management
│ ├── runs.py # Run management
│ ├── findings.py # Findings management
│ ├── monitor.py # Real-time monitoring
│ ├── config.py # Configuration commands
│ └── status.py # Status information
├── pyproject.toml # Project configuration
@@ -576,7 +539,6 @@ fuzzforge --help
# Command-specific help
ff workflows --help
ff workflow run --help
ff monitor live --help
# Show version
fuzzforge --version
@@ -618,4 +580,4 @@ Contributions are welcome! Please see the main FuzzForge repository for contribu
---
**FuzzForge CLI** - Making security testing workflows accessible and efficient from the command line.
**FuzzForge CLI** - Making security testing workflows accessible and efficient from the command line.

View File

@@ -10,11 +10,10 @@
#
# Additional attribution and requirements are provided in the NOTICE file.
from __future__ import annotations
from pathlib import Path
import os
from pathlib import Path
from textwrap import dedent
from typing import Optional
@@ -32,17 +31,20 @@ app = typer.Typer()
@app.command()
def project(
name: Optional[str] = typer.Option(
None, "--name", "-n",
help="Project name (defaults to current directory name)"
None, "--name", "-n", help="Project name (defaults to current directory name)"
),
api_url: Optional[str] = typer.Option(
None, "--api-url", "-u",
help="FuzzForge API URL (defaults to http://localhost:8000)"
None,
"--api-url",
"-u",
help="FuzzForge API URL (defaults to http://localhost:8000)",
),
force: bool = typer.Option(
False, "--force", "-f",
help="Force initialization even if project already exists"
)
False,
"--force",
"-f",
help="Force initialization even if project already exists",
),
):
"""
📁 Initialize a new FuzzForge project in the current directory.
@@ -58,24 +60,20 @@ def project(
# Check if project already exists
if fuzzforge_dir.exists() and not force:
if fuzzforge_dir.is_dir() and any(fuzzforge_dir.iterdir()):
console.print("❌ FuzzForge project already exists in this directory", style="red")
console.print(
"❌ FuzzForge project already exists in this directory", style="red"
)
console.print("Use --force to reinitialize", style="dim")
raise typer.Exit(1)
# Get project name
if not name:
name = Prompt.ask(
"Project name",
default=current_dir.name,
console=console
)
name = Prompt.ask("Project name", default=current_dir.name, console=console)
# Get API URL
if not api_url:
api_url = Prompt.ask(
"FuzzForge API URL",
default="http://localhost:8000",
console=console
"FuzzForge API URL", default="http://localhost:8000", console=console
)
# Confirm initialization
@@ -117,15 +115,15 @@ def project(
]
if gitignore_path.exists():
with open(gitignore_path, 'r') as f:
with open(gitignore_path, "r") as f:
existing_content = f.read()
if "# FuzzForge CLI" not in existing_content:
with open(gitignore_path, 'a') as f:
with open(gitignore_path, "a") as f:
f.write(f"\n{chr(10).join(gitignore_entries)}\n")
console.print("📝 Updated .gitignore with FuzzForge entries")
else:
with open(gitignore_path, 'w') as f:
with open(gitignore_path, "w") as f:
f.write(f"{chr(10).join(gitignore_entries)}\n")
console.print("📝 Created .gitignore")
@@ -145,9 +143,6 @@ fuzzforge workflows
# Submit a workflow for analysis
fuzzforge workflow <workflow-name> /path/to/target
# Monitor run progress
fuzzforge monitor live <run-id>
# View findings
fuzzforge finding <run-id>
```
@@ -159,12 +154,12 @@ fuzzforge finding <run-id>
- `.fuzzforge/findings.db` - Local database for runs and findings
"""
with open(readme_path, 'w') as f:
with open(readme_path, "w") as f:
f.write(readme_content)
console.print("📚 Created README.md")
console.print("\n✅ FuzzForge project initialized successfully!", style="green")
console.print(f"\n🎯 Next steps:")
console.print("\n🎯 Next steps:")
console.print(" • ff workflows - See available workflows")
console.print(" • ff status - Check API connectivity")
console.print(" • ff workflow <workflow> <path> - Start your first analysis")

View File

@@ -1,436 +0,0 @@
"""
Real-time monitoring and statistics commands.
"""
# Copyright (c) 2025 FuzzingLabs
#
# Licensed under the Business Source License 1.1 (BSL). See the LICENSE file
# at the root of this repository for details.
#
# After the Change Date (four years from publication), this version of the
# Licensed Work will be made available under the Apache License, Version 2.0.
# See the LICENSE-APACHE file or http://www.apache.org/licenses/LICENSE-2.0
#
# Additional attribution and requirements are provided in the NOTICE file.
import asyncio
import time
from datetime import datetime, timedelta
from typing import Optional
import typer
from rich.console import Console
from rich.table import Table
from rich.panel import Panel
from rich.live import Live
from rich.layout import Layout
from rich.progress import Progress, BarColumn, TextColumn, SpinnerColumn
from rich.align import Align
from rich import box
from ..config import get_project_config, FuzzForgeConfig
from ..database import get_project_db, ensure_project_db, CrashRecord
from fuzzforge_sdk import FuzzForgeClient
console = Console()
app = typer.Typer()
def get_client() -> FuzzForgeClient:
"""Get configured FuzzForge client"""
config = get_project_config() or FuzzForgeConfig()
return FuzzForgeClient(base_url=config.get_api_url(), timeout=config.get_timeout())
def format_duration(seconds: int) -> str:
"""Format duration in human readable format"""
if seconds < 60:
return f"{seconds}s"
elif seconds < 3600:
return f"{seconds // 60}m {seconds % 60}s"
else:
hours = seconds // 3600
minutes = (seconds % 3600) // 60
return f"{hours}h {minutes}m"
def format_number(num: int) -> str:
"""Format large numbers with K, M suffixes"""
if num >= 1000000:
return f"{num / 1000000:.1f}M"
elif num >= 1000:
return f"{num / 1000:.1f}K"
else:
return str(num)
@app.command("stats")
def fuzzing_stats(
run_id: str = typer.Argument(..., help="Run ID to get statistics for"),
refresh: int = typer.Option(
5, "--refresh", "-r",
help="Refresh interval in seconds"
),
once: bool = typer.Option(
False, "--once",
help="Show stats once and exit"
)
):
"""
📊 Show current fuzzing statistics for a run
"""
try:
with get_client() as client:
if once:
# Show stats once
stats = client.get_fuzzing_stats(run_id)
display_stats_table(stats)
else:
# Live updating stats
console.print(f"📊 [bold]Live Fuzzing Statistics[/bold] (Run: {run_id[:12]}...)")
console.print(f"Refreshing every {refresh}s. Press Ctrl+C to stop.\n")
with Live(auto_refresh=False, console=console) as live:
while True:
try:
stats = client.get_fuzzing_stats(run_id)
table = create_stats_table(stats)
live.update(table, refresh=True)
time.sleep(refresh)
except KeyboardInterrupt:
console.print("\n📊 Monitoring stopped", style="yellow")
break
except Exception as e:
console.print(f"❌ Failed to get fuzzing stats: {e}", style="red")
raise typer.Exit(1)
def display_stats_table(stats):
"""Display stats in a simple table"""
table = create_stats_table(stats)
console.print(table)
def create_stats_table(stats) -> Panel:
"""Create a rich table for fuzzing statistics"""
# Create main stats table
stats_table = Table(show_header=False, box=box.SIMPLE)
stats_table.add_column("Metric", style="bold cyan")
stats_table.add_column("Value", justify="right", style="bold white")
stats_table.add_row("Total Executions", format_number(stats.executions))
stats_table.add_row("Executions/sec", f"{stats.executions_per_sec:.1f}")
stats_table.add_row("Total Crashes", format_number(stats.crashes))
stats_table.add_row("Unique Crashes", format_number(stats.unique_crashes))
if stats.coverage is not None:
stats_table.add_row("Code Coverage", f"{stats.coverage:.1f}%")
stats_table.add_row("Corpus Size", format_number(stats.corpus_size))
stats_table.add_row("Elapsed Time", format_duration(stats.elapsed_time))
if stats.last_crash_time:
time_since_crash = datetime.now() - stats.last_crash_time
stats_table.add_row("Last Crash", f"{format_duration(int(time_since_crash.total_seconds()))} ago")
return Panel.fit(
stats_table,
title=f"📊 Fuzzing Statistics - {stats.workflow}",
subtitle=f"Run: {stats.run_id[:12]}...",
box=box.ROUNDED
)
@app.command("crashes")
def crash_reports(
run_id: str = typer.Argument(..., help="Run ID to get crash reports for"),
save: bool = typer.Option(
True, "--save/--no-save",
help="Save crashes to local database"
),
limit: int = typer.Option(
50, "--limit", "-l",
help="Maximum number of crashes to show"
)
):
"""
🐛 Display crash reports for a fuzzing run
"""
try:
with get_client() as client:
console.print(f"🐛 Fetching crash reports for run: {run_id}")
crashes = client.get_crash_reports(run_id)
if not crashes:
console.print("✅ No crashes found!", style="green")
return
# Save to database if requested
if save:
db = ensure_project_db()
for crash in crashes:
crash_record = CrashRecord(
run_id=run_id,
crash_id=crash.crash_id,
signal=crash.signal,
stack_trace=crash.stack_trace,
input_file=crash.input_file,
severity=crash.severity,
timestamp=crash.timestamp
)
db.save_crash(crash_record)
console.print("✅ Crashes saved to local database")
# Display crashes
crashes_to_show = crashes[:limit]
# Summary
severity_counts = {}
signal_counts = {}
for crash in crashes:
severity_counts[crash.severity] = severity_counts.get(crash.severity, 0) + 1
if crash.signal:
signal_counts[crash.signal] = signal_counts.get(crash.signal, 0) + 1
summary_table = Table(show_header=False, box=box.SIMPLE)
summary_table.add_column("Metric", style="bold cyan")
summary_table.add_column("Value", justify="right")
summary_table.add_row("Total Crashes", str(len(crashes)))
summary_table.add_row("Unique Signals", str(len(signal_counts)))
for severity, count in sorted(severity_counts.items()):
summary_table.add_row(f"{severity.title()} Severity", str(count))
console.print(
Panel.fit(
summary_table,
title=f"🐛 Crash Summary",
box=box.ROUNDED
)
)
# Detailed crash table
if crashes_to_show:
crashes_table = Table(box=box.ROUNDED)
crashes_table.add_column("Crash ID", style="bold cyan")
crashes_table.add_column("Signal", justify="center")
crashes_table.add_column("Severity", justify="center")
crashes_table.add_column("Timestamp", justify="center")
crashes_table.add_column("Input File", style="dim")
for crash in crashes_to_show:
signal_emoji = {
"SIGSEGV": "💥",
"SIGABRT": "🛑",
"SIGFPE": "🧮",
"SIGILL": "⚠️"
}.get(crash.signal or "", "🐛")
severity_style = {
"high": "red",
"medium": "yellow",
"low": "green"
}.get(crash.severity.lower(), "white")
input_display = ""
if crash.input_file:
input_display = crash.input_file.split("/")[-1] # Show just filename
crashes_table.add_row(
crash.crash_id[:12] + "..." if len(crash.crash_id) > 15 else crash.crash_id,
f"{signal_emoji} {crash.signal or 'Unknown'}",
f"[{severity_style}]{crash.severity}[/{severity_style}]",
crash.timestamp.strftime("%H:%M:%S"),
input_display
)
console.print(f"\n🐛 [bold]Crash Details[/bold]")
if len(crashes) > limit:
console.print(f"Showing first {limit} of {len(crashes)} crashes")
console.print()
console.print(crashes_table)
console.print(f"\n💡 Use [bold cyan]fuzzforge finding {run_id}[/bold cyan] for detailed analysis")
except Exception as e:
console.print(f"❌ Failed to get crash reports: {e}", style="red")
raise typer.Exit(1)
def _live_monitor(run_id: str, refresh: int):
"""Helper for live monitoring to allow for cleaner exit handling"""
with get_client() as client:
start_time = time.time()
def render_layout(run_status, stats):
layout = Layout()
layout.split_column(
Layout(name="header", size=3),
Layout(name="main", ratio=1),
Layout(name="footer", size=3)
)
layout["main"].split_row(
Layout(name="stats", ratio=1),
Layout(name="progress", ratio=1)
)
header = Panel(
f"[bold]FuzzForge Live Monitor[/bold]\n"
f"Run: {run_id[:12]}... | Status: {run_status.status} | "
f"Uptime: {format_duration(int(time.time() - start_time))}",
box=box.ROUNDED,
style="cyan"
)
layout["header"].update(header)
layout["stats"].update(create_stats_table(stats))
progress_table = Table(show_header=False, box=box.SIMPLE)
progress_table.add_column("Metric", style="bold")
progress_table.add_column("Progress")
if stats.executions > 0:
exec_rate_percent = min(100, (stats.executions_per_sec / 1000) * 100)
progress_table.add_row("Exec Rate", create_progress_bar(exec_rate_percent, "green"))
crash_rate = (stats.crashes / stats.executions) * 100000
crash_rate_percent = min(100, crash_rate * 10)
progress_table.add_row("Crash Rate", create_progress_bar(crash_rate_percent, "red"))
if stats.coverage is not None:
progress_table.add_row("Coverage", create_progress_bar(stats.coverage, "blue"))
layout["progress"].update(Panel.fit(progress_table, title="📊 Progress Indicators", box=box.ROUNDED))
footer = Panel(
f"Last updated: {datetime.now().strftime('%H:%M:%S')} | "
f"Refresh interval: {refresh}s | Press Ctrl+C to exit",
box=box.ROUNDED,
style="dim"
)
layout["footer"].update(footer)
return layout
with Live(auto_refresh=False, console=console, screen=True) as live:
# Initial fetch
try:
run_status = client.get_run_status(run_id)
stats = client.get_fuzzing_stats(run_id)
except Exception:
# Minimal fallback stats
class FallbackStats:
def __init__(self, run_id):
self.run_id = run_id
self.workflow = "unknown"
self.executions = 0
self.executions_per_sec = 0.0
self.crashes = 0
self.unique_crashes = 0
self.coverage = None
self.corpus_size = 0
self.elapsed_time = 0
self.last_crash_time = None
stats = FallbackStats(run_id)
run_status = type("RS", (), {"status":"Unknown","is_completed":False,"is_failed":False})()
live.update(render_layout(run_status, stats), refresh=True)
# Simple polling approach that actually works
consecutive_errors = 0
max_errors = 5
while True:
try:
# Poll for updates
try:
run_status = client.get_run_status(run_id)
consecutive_errors = 0
except Exception as e:
consecutive_errors += 1
if consecutive_errors >= max_errors:
console.print(f"❌ Too many errors getting run status: {e}", style="red")
break
time.sleep(refresh)
continue
# Try to get fuzzing stats
try:
stats = client.get_fuzzing_stats(run_id)
except Exception as e:
# Create fallback stats if not available
stats = FallbackStats(run_id)
# Update display
live.update(render_layout(run_status, stats), refresh=True)
# Check if completed
if getattr(run_status, 'is_completed', False) or getattr(run_status, 'is_failed', False):
# Show final state for a few seconds
console.print("\n🏁 Run completed. Showing final state for 10 seconds...")
time.sleep(10)
break
# Wait before next poll
time.sleep(refresh)
except KeyboardInterrupt:
raise
except Exception as e:
console.print(f"⚠️ Monitoring error: {e}", style="yellow")
time.sleep(refresh)
# Completed status update
final_message = (
f"[bold]FuzzForge Live Monitor - COMPLETED[/bold]\n"
f"Run: {run_id[:12]}... | Status: {run_status.status} | "
f"Total runtime: {format_duration(int(time.time() - start_time))}"
)
style = "green" if getattr(run_status, 'is_completed', False) else "red"
live.update(Panel(final_message, box=box.ROUNDED, style=style), refresh=True)
@app.command("live")
def live_monitor(
run_id: str = typer.Argument(..., help="Run ID to monitor live"),
refresh: int = typer.Option(
2, "--refresh", "-r",
help="Refresh interval in seconds (fallback when streaming unavailable)"
)
):
"""
📺 Real-time monitoring dashboard with live updates (WebSocket/SSE with REST fallback)
"""
console.print(f"📺 [bold]Live Monitoring Dashboard[/bold]")
console.print(f"Run: {run_id}")
console.print(f"Press Ctrl+C to stop monitoring\n")
try:
_live_monitor(run_id, refresh)
except KeyboardInterrupt:
console.print("\n📊 Monitoring stopped by user.", style="yellow")
except Exception as e:
console.print(f"❌ Failed to start live monitoring: {e}", style="red")
raise typer.Exit(1)
def create_progress_bar(percentage: float, color: str = "green") -> str:
"""Create a simple text progress bar"""
width = 20
filled = int((percentage / 100) * width)
bar = "" * filled + "" * (width - filled)
return f"[{color}]{bar}[/{color}] {percentage:.1f}%"
@app.callback(invoke_without_command=True)
def monitor_callback(ctx: typer.Context):
"""
📊 Real-time monitoring and statistics
"""
# Check if a subcommand is being invoked
if ctx.invoked_subcommand is not None:
# Let the subcommand handle it
return
# Show not implemented message for default command
from rich.console import Console
console = Console()
console.print("🚧 [yellow]Monitor command is not fully implemented yet.[/yellow]")
console.print("Please use specific subcommands:")
console.print(" • [cyan]ff monitor stats <run-id>[/cyan] - Show execution statistics")
console.print(" • [cyan]ff monitor crashes <run-id>[/cyan] - Show crash reports")
console.print(" • [cyan]ff monitor live <run-id>[/cyan] - Live monitoring dashboard")

View File

@@ -13,39 +13,47 @@ Replaces the old 'runs' terminology with cleaner workflow-centric commands.
#
# Additional attribution and requirements are provided in the NOTICE file.
import json
import time
from datetime import datetime
from pathlib import Path
from typing import Optional, Dict, Any, List
from typing import Any, Dict, List, Optional
import typer
from rich.console import Console
from rich.table import Table
from rich.panel import Panel
from rich.progress import Progress, SpinnerColumn, TextColumn, BarColumn, TaskProgressColumn
from rich.prompt import Prompt, Confirm
from rich.live import Live
from rich import box
from ..config import get_project_config, FuzzForgeConfig
from ..database import get_project_db, ensure_project_db, RunRecord
from ..exceptions import (
handle_error, retry_on_network_error, safe_json_load, require_project,
APIConnectionError, ValidationError, DatabaseError, FileOperationError
)
from ..validation import (
validate_run_id, validate_workflow_name, validate_target_path,
validate_volume_mode, validate_parameters, validate_timeout
)
from ..progress import progress_manager, spinner, step_progress
from ..completion import WorkflowNameComplete, TargetPathComplete, VolumeModetComplete
from ..constants import (
STATUS_EMOJIS, MAX_RUN_ID_DISPLAY_LENGTH, DEFAULT_VOLUME_MODE,
PROGRESS_STEP_DELAYS, MAX_RETRIES, RETRY_DELAY, POLL_INTERVAL
)
from fuzzforge_sdk import FuzzForgeClient, WorkflowSubmission
from rich import box
from rich.console import Console
from rich.panel import Panel
from rich.prompt import Confirm, Prompt
from rich.table import Table
from ..config import FuzzForgeConfig, get_project_config
from ..constants import (
DEFAULT_VOLUME_MODE,
MAX_RETRIES,
MAX_RUN_ID_DISPLAY_LENGTH,
POLL_INTERVAL,
PROGRESS_STEP_DELAYS,
RETRY_DELAY,
STATUS_EMOJIS,
)
from ..database import RunRecord, ensure_project_db, get_project_db
from ..exceptions import (
DatabaseError,
ValidationError,
handle_error,
require_project,
retry_on_network_error,
safe_json_load,
)
from ..progress import step_progress
from ..validation import (
validate_parameters,
validate_run_id,
validate_target_path,
validate_timeout,
validate_volume_mode,
validate_workflow_name,
)
console = Console()
app = typer.Typer()
@@ -75,7 +83,7 @@ def execute_workflow_submission(
parameters: Dict[str, Any],
volume_mode: str,
timeout: Optional[int],
interactive: bool
interactive: bool,
) -> Any:
"""Handle the workflow submission process"""
# Get workflow metadata for parameter validation
@@ -92,7 +100,9 @@ def execute_workflow_submission(
missing_required = required_params - set(parameters.keys())
if missing_required:
console.print(f"\n📝 [bold]Missing required parameters:[/bold] {', '.join(missing_required)}")
console.print(
f"\n📝 [bold]Missing required parameters:[/bold] {', '.join(missing_required)}"
)
console.print("Please provide values:\n")
for param_name in missing_required:
@@ -114,9 +124,18 @@ def execute_workflow_submission(
elif param_type == "number":
parameters[param_name] = float(user_input)
elif param_type == "boolean":
parameters[param_name] = user_input.lower() in ("true", "yes", "1", "on")
parameters[param_name] = user_input.lower() in (
"true",
"yes",
"1",
"on",
)
elif param_type == "array":
parameters[param_name] = [item.strip() for item in user_input.split(",") if item.strip()]
parameters[param_name] = [
item.strip()
for item in user_input.split(",")
if item.strip()
]
else:
parameters[param_name] = user_input
break
@@ -127,8 +146,9 @@ def execute_workflow_submission(
validate_volume_mode(volume_mode)
if volume_mode not in workflow_meta.supported_volume_modes:
raise ValidationError(
"volume mode", volume_mode,
f"one of: {', '.join(workflow_meta.supported_volume_modes)}"
"volume mode",
volume_mode,
f"one of: {', '.join(workflow_meta.supported_volume_modes)}",
)
# Create submission
@@ -136,11 +156,11 @@ def execute_workflow_submission(
target_path=target_path,
volume_mode=volume_mode,
parameters=parameters,
timeout=timeout
timeout=timeout,
)
# Show submission summary
console.print(f"\n🎯 [bold]Executing workflow:[/bold]")
console.print("\n🎯 [bold]Executing workflow:[/bold]")
console.print(f" Workflow: {workflow}")
console.print(f" Target: {target_path}")
console.print(f" Volume Mode: {volume_mode}")
@@ -165,7 +185,7 @@ def execute_workflow_submission(
"Connecting to FuzzForge API",
"Uploading parameters and settings",
"Creating workflow deployment",
"Initializing execution environment"
"Initializing execution environment",
]
with step_progress(steps, f"Executing {workflow}") as progress:
@@ -185,46 +205,46 @@ def execute_workflow_submission(
progress.next_step() # Initializing
time.sleep(PROGRESS_STEP_DELAYS["initializing"])
progress.complete(f"Workflow started successfully!")
progress.complete("Workflow started successfully!")
return response
# Main workflow execution command (replaces 'runs submit')
@app.command(name="exec", hidden=True) # Hidden because it will be called from main workflow command
@app.command(
name="exec", hidden=True
) # Hidden because it will be called from main workflow command
def execute_workflow(
workflow: str = typer.Argument(..., help="Workflow name to execute"),
target_path: str = typer.Argument(..., help="Path to analyze"),
params: List[str] = typer.Argument(default=None, help="Parameters as key=value pairs"),
params: List[str] = typer.Argument(
default=None, help="Parameters as key=value pairs"
),
param_file: Optional[str] = typer.Option(
None, "--param-file", "-f",
help="JSON file containing workflow parameters"
None, "--param-file", "-f", help="JSON file containing workflow parameters"
),
volume_mode: str = typer.Option(
DEFAULT_VOLUME_MODE, "--volume-mode", "-v",
help="Volume mount mode: ro (read-only) or rw (read-write)"
DEFAULT_VOLUME_MODE,
"--volume-mode",
"-v",
help="Volume mount mode: ro (read-only) or rw (read-write)",
),
timeout: Optional[int] = typer.Option(
None, "--timeout", "-t",
help="Execution timeout in seconds"
None, "--timeout", "-t", help="Execution timeout in seconds"
),
interactive: bool = typer.Option(
True, "--interactive/--no-interactive", "-i/-n",
help="Interactive parameter input for missing required parameters"
True,
"--interactive/--no-interactive",
"-i/-n",
help="Interactive parameter input for missing required parameters",
),
wait: bool = typer.Option(
False, "--wait", "-w",
help="Wait for execution to complete"
False, "--wait", "-w", help="Wait for execution to complete"
),
live: bool = typer.Option(
False, "--live", "-l",
help="Start live monitoring after execution (useful for fuzzing workflows)"
)
):
"""
🚀 Execute a workflow on a target
Use --live for fuzzing workflows to see real-time progress.
Use --wait to wait for completion without live dashboard.
"""
try:
@@ -264,13 +284,20 @@ def execute_workflow(
try:
with get_client() as client:
response = execute_workflow_submission(
client, workflow, target_path, parameters,
volume_mode, timeout, interactive
client,
workflow,
target_path,
parameters,
volume_mode,
timeout,
interactive,
)
console.print(f"✅ Workflow execution started!", style="green")
console.print("✅ Workflow execution started!", style="green")
console.print(f" Execution ID: [bold cyan]{response.run_id}[/bold cyan]")
console.print(f" Status: {status_emoji(response.status)} {response.status}")
console.print(
f" Status: {status_emoji(response.status)} {response.status}"
)
# Save to database
try:
@@ -281,65 +308,55 @@ def execute_workflow(
status=response.status,
target_path=target_path,
parameters=parameters,
created_at=datetime.now()
created_at=datetime.now(),
)
db.save_run(run_record)
except Exception as e:
# Don't fail the whole operation if database save fails
console.print(f"⚠️ Failed to save execution to database: {e}", style="yellow")
console.print(
f"⚠️ Failed to save execution to database: {e}", style="yellow"
)
console.print(f"\n💡 Monitor progress: [bold cyan]fuzzforge monitor {response.run_id}[/bold cyan]")
console.print(f"💡 Check status: [bold cyan]fuzzforge workflow status {response.run_id}[/bold cyan]")
# Suggest --live for fuzzing workflows
if not live and not wait and "fuzzing" in workflow.lower():
console.print(f"💡 Next time try: [bold cyan]fuzzforge workflow {workflow} {target_path} --live[/bold cyan] for real-time fuzzing dashboard", style="dim")
# Start live monitoring if requested
if live:
# Check if this is a fuzzing workflow to show appropriate messaging
is_fuzzing = "fuzzing" in workflow.lower()
if is_fuzzing:
console.print(f"\n📺 Starting live fuzzing dashboard...")
console.print("💡 You'll see real-time crash discovery, execution stats, and coverage data.")
else:
console.print(f"\n📺 Starting live monitoring dashboard...")
console.print("Press Ctrl+C to stop monitoring (execution continues in background).\n")
try:
from ..commands.monitor import live_monitor
# Import monitor command and run it
live_monitor(response.run_id, refresh=3)
except KeyboardInterrupt:
console.print(f"\n⏹️ Live monitoring stopped (execution continues in background)", style="yellow")
except Exception as e:
console.print(f"⚠️ Failed to start live monitoring: {e}", style="yellow")
console.print(f"💡 You can still monitor manually: [bold cyan]fuzzforge monitor {response.run_id}[/bold cyan]")
console.print(
f"💡 Check status: [bold cyan]fuzzforge workflow status {response.run_id}[/bold cyan]"
)
# Wait for completion if requested
elif wait:
console.print(f"\n⏳ Waiting for execution to complete...")
if wait:
console.print("\n⏳ Waiting for execution to complete...")
try:
final_status = client.wait_for_completion(response.run_id, poll_interval=POLL_INTERVAL)
final_status = client.wait_for_completion(
response.run_id, poll_interval=POLL_INTERVAL
)
# Update database
try:
db.update_run_status(
response.run_id,
final_status.status,
completed_at=datetime.now() if final_status.is_completed else None
completed_at=datetime.now()
if final_status.is_completed
else None,
)
except Exception as e:
console.print(f"⚠️ Failed to update database: {e}", style="yellow")
console.print(
f"⚠️ Failed to update database: {e}", style="yellow"
)
console.print(f"🏁 Execution completed with status: {status_emoji(final_status.status)} {final_status.status}")
console.print(
f"🏁 Execution completed with status: {status_emoji(final_status.status)} {final_status.status}"
)
if final_status.is_completed:
console.print(f"💡 View findings: [bold cyan]fuzzforge findings {response.run_id}[/bold cyan]")
console.print(
f"💡 View findings: [bold cyan]fuzzforge findings {response.run_id}[/bold cyan]"
)
except KeyboardInterrupt:
console.print(f"\n⏹️ Monitoring cancelled (execution continues in background)", style="yellow")
console.print(
"\n⏹️ Monitoring cancelled (execution continues in background)",
style="yellow",
)
except Exception as e:
handle_error(e, "waiting for completion")
@@ -349,7 +366,9 @@ def execute_workflow(
@app.command("status")
def workflow_status(
execution_id: Optional[str] = typer.Argument(None, help="Execution ID to check (defaults to most recent)")
execution_id: Optional[str] = typer.Argument(
None, help="Execution ID to check (defaults to most recent)"
),
):
"""
📊 Check the status of a workflow execution
@@ -368,7 +387,9 @@ def workflow_status(
if not execution_id:
recent_runs = db.list_runs(limit=1)
if not recent_runs:
console.print("⚠️ No executions found in project database", style="yellow")
console.print(
"⚠️ No executions found in project database", style="yellow"
)
raise typer.Exit(0)
execution_id = recent_runs[0].run_id
console.print(f"🔍 Using most recent execution: {execution_id}")
@@ -384,7 +405,7 @@ def workflow_status(
db.update_run_status(
execution_id,
status.status,
completed_at=status.updated_at if status.is_completed else None
completed_at=status.updated_at if status.is_completed else None,
)
except Exception as e:
console.print(f"⚠️ Failed to update database: {e}", style="yellow")
@@ -404,23 +425,24 @@ def workflow_status(
if status.is_completed:
duration = status.updated_at - status.created_at
status_table.add_row("Duration", str(duration).split('.')[0]) # Remove microseconds
status_table.add_row(
"Duration", str(duration).split(".")[0]
) # Remove microseconds
console.print(
Panel.fit(
status_table,
title=f"📊 Status Information",
box=box.ROUNDED
)
Panel.fit(status_table, title="📊 Status Information", box=box.ROUNDED)
)
# Show next steps
if status.is_running:
console.print(f"\n💡 Monitor live: [bold cyan]fuzzforge monitor {execution_id}[/bold cyan]")
elif status.is_completed:
console.print(f"💡 View findings: [bold cyan]fuzzforge finding {execution_id}[/bold cyan]")
if status.is_completed:
console.print(
f"💡 View findings: [bold cyan]fuzzforge finding {execution_id}[/bold cyan]"
)
elif status.is_failed:
console.print(f"💡 Check logs: [bold cyan]fuzzforge workflow logs {execution_id}[/bold cyan]")
console.print(
f"💡 Check logs: [bold cyan]fuzzforge workflow logs {execution_id}[/bold cyan]"
)
except Exception as e:
handle_error(e, "getting execution status")
@@ -428,9 +450,15 @@ def workflow_status(
@app.command("history")
def workflow_history(
workflow: Optional[str] = typer.Option(None, "--workflow", "-w", help="Filter by workflow name"),
status: Optional[str] = typer.Option(None, "--status", "-s", help="Filter by status"),
limit: int = typer.Option(20, "--limit", "-l", help="Maximum number of executions to show")
workflow: Optional[str] = typer.Option(
None, "--workflow", "-w", help="Filter by workflow name"
),
status: Optional[str] = typer.Option(
None, "--status", "-s", help="Filter by status"
),
limit: int = typer.Option(
20, "--limit", "-l", help="Maximum number of executions to show"
),
):
"""
📋 Show workflow execution history
@@ -463,12 +491,14 @@ def workflow_history(
param_str = f"{param_count} params" if param_count > 0 else "-"
table.add_row(
run.run_id[:12] + "..." if len(run.run_id) > MAX_RUN_ID_DISPLAY_LENGTH else run.run_id,
run.run_id[:12] + "..."
if len(run.run_id) > MAX_RUN_ID_DISPLAY_LENGTH
else run.run_id,
run.workflow,
f"{status_emoji(run.status)} {run.status}",
Path(run.target_path).name,
run.created_at.strftime("%m-%d %H:%M"),
param_str
param_str,
)
console.print(f"\n📋 [bold]Workflow Execution History ({len(runs)})[/bold]")
@@ -479,7 +509,9 @@ def workflow_history(
console.print()
console.print(table)
console.print(f"\n💡 Use [bold cyan]fuzzforge workflow status <execution-id>[/bold cyan] for detailed status")
console.print(
"\n💡 Use [bold cyan]fuzzforge workflow status <execution-id>[/bold cyan] for detailed status"
)
except Exception as e:
handle_error(e, "listing execution history")
@@ -487,11 +519,15 @@ def workflow_history(
@app.command("retry")
def retry_workflow(
execution_id: Optional[str] = typer.Argument(None, help="Execution ID to retry (defaults to most recent)"),
execution_id: Optional[str] = typer.Argument(
None, help="Execution ID to retry (defaults to most recent)"
),
modify_params: bool = typer.Option(
False, "--modify-params", "-m",
help="Interactively modify parameters before retrying"
)
False,
"--modify-params",
"-m",
help="Interactively modify parameters before retrying",
),
):
"""
🔄 Retry a workflow execution with the same or modified parameters
@@ -517,7 +553,9 @@ def retry_workflow(
# Get original execution
original_run = db.get_run(execution_id)
if not original_run:
raise ValidationError("execution_id", execution_id, "an existing execution ID in the database")
raise ValidationError(
"execution_id", execution_id, "an existing execution ID in the database"
)
console.print(f"🔄 [bold]Retrying workflow:[/bold] {original_run.workflow}")
console.print(f" Original Execution ID: {execution_id}")
@@ -527,24 +565,29 @@ def retry_workflow(
# Modify parameters if requested
if modify_params and parameters:
console.print(f"\n📝 [bold]Current parameters:[/bold]")
console.print("\n📝 [bold]Current parameters:[/bold]")
for key, value in parameters.items():
new_value = Prompt.ask(
f"{key}",
default=str(value),
console=console
)
new_value = Prompt.ask(f"{key}", default=str(value), console=console)
if new_value != str(value):
# Try to maintain type
try:
if isinstance(value, bool):
parameters[key] = new_value.lower() in ("true", "yes", "1", "on")
parameters[key] = new_value.lower() in (
"true",
"yes",
"1",
"on",
)
elif isinstance(value, int):
parameters[key] = int(new_value)
elif isinstance(value, float):
parameters[key] = float(new_value)
elif isinstance(value, list):
parameters[key] = [item.strip() for item in new_value.split(",") if item.strip()]
parameters[key] = [
item.strip()
for item in new_value.split(",")
if item.strip()
]
else:
parameters[key] = new_value
except ValueError:
@@ -553,15 +596,18 @@ def retry_workflow(
# Submit new execution
with get_client() as client:
submission = WorkflowSubmission(
target_path=original_run.target_path,
parameters=parameters
target_path=original_run.target_path, parameters=parameters
)
response = client.submit_workflow(original_run.workflow, submission)
console.print(f"\n✅ Retry submitted successfully!", style="green")
console.print(f" New Execution ID: [bold cyan]{response.run_id}[/bold cyan]")
console.print(f" Status: {status_emoji(response.status)} {response.status}")
console.print("\n✅ Retry submitted successfully!", style="green")
console.print(
f" New Execution ID: [bold cyan]{response.run_id}[/bold cyan]"
)
console.print(
f" Status: {status_emoji(response.status)} {response.status}"
)
# Save to database
try:
@@ -572,13 +618,17 @@ def retry_workflow(
target_path=original_run.target_path,
parameters=parameters,
created_at=datetime.now(),
metadata={"retry_of": execution_id}
metadata={"retry_of": execution_id},
)
db.save_run(run_record)
except Exception as e:
console.print(f"⚠️ Failed to save execution to database: {e}", style="yellow")
console.print(
f"⚠️ Failed to save execution to database: {e}", style="yellow"
)
console.print(f"\n💡 Monitor progress: [bold cyan]fuzzforge monitor {response.run_id}[/bold cyan]")
console.print(
f"\n💡 Monitor progress: [bold cyan]fuzzforge monitor {response.run_id}[/bold cyan]"
)
except Exception as e:
handle_error(e, "retrying workflow")
@@ -588,4 +638,4 @@ def retry_workflow(
def workflow_exec_callback():
"""
🚀 Workflow execution management
"""
"""

View File

@@ -14,9 +14,9 @@ Provides "Did you mean...?" functionality and intelligent command/parameter sugg
#
# Additional attribution and requirements are provided in the NOTICE file.
import difflib
from typing import List, Optional, Dict, Any, Tuple
from typing import Any, Dict, List, Optional, Tuple
from rich.console import Console
from rich.panel import Panel
from rich.text import Text
@@ -34,10 +34,9 @@ class FuzzyMatcher:
"workflows": ["list", "info"],
"runs": ["submit", "status", "list", "rerun"],
"findings": ["get", "list", "export", "all"],
"monitor": ["stats", "crashes", "live"],
"config": ["set", "get", "list", "init"],
"ai": ["ask", "summarize", "explain"],
"ingest": ["project", "findings"]
"ingest": ["project", "findings"],
}
# Common workflow names
@@ -47,7 +46,7 @@ class FuzzyMatcher:
"infrastructure_scan",
"static_analysis_scan",
"penetration_testing_scan",
"secret_detection_scan"
"secret_detection_scan",
]
# Common parameter names
@@ -60,24 +59,25 @@ class FuzzyMatcher:
"param-file",
"interactive",
"wait",
"live",
"format",
"output",
"severity",
"since",
"limit",
"stats",
"export"
"export",
]
# Common values
self.common_values = {
"volume_mode": ["ro", "rw"],
"format": ["json", "csv", "html", "sarif"],
"severity": ["critical", "high", "medium", "low", "info"]
"severity": ["critical", "high", "medium", "low", "info"],
}
def find_closest_command(self, user_input: str, command_group: Optional[str] = None) -> Optional[Tuple[str, float]]:
def find_closest_command(
self, user_input: str, command_group: Optional[str] = None
) -> Optional[Tuple[str, float]]:
"""Find the closest matching command."""
if command_group and command_group in self.commands:
# Search within a specific command group
@@ -86,9 +86,7 @@ class FuzzyMatcher:
# Search all main commands
candidates = list(self.commands.keys())
matches = difflib.get_close_matches(
user_input, candidates, n=1, cutoff=0.6
)
matches = difflib.get_close_matches(user_input, candidates, n=1, cutoff=0.6)
if matches:
match = matches[0]
@@ -114,7 +112,7 @@ class FuzzyMatcher:
def find_closest_parameter(self, user_input: str) -> Optional[Tuple[str, float]]:
"""Find the closest matching parameter name."""
# Remove leading dashes
clean_input = user_input.lstrip('-')
clean_input = user_input.lstrip("-")
matches = difflib.get_close_matches(
clean_input, self.parameter_names, n=1, cutoff=0.6
@@ -139,7 +137,9 @@ class FuzzyMatcher:
return []
def get_command_suggestions(self, user_command: List[str]) -> Optional[Dict[str, Any]]:
def get_command_suggestions(
self, user_command: List[str]
) -> Optional[Dict[str, Any]]:
"""Get suggestions for a user command that may have typos."""
if not user_command:
return None
@@ -153,11 +153,9 @@ class FuzzyMatcher:
if closest:
match, confidence = closest
suggestions["type"] = "main_command"
suggestions["suggestions"].append({
"text": match,
"confidence": confidence,
"type": "command"
})
suggestions["suggestions"].append(
{"text": match, "confidence": confidence, "type": "command"}
)
# Check subcommand if present
elif len(user_command) > 1:
@@ -167,11 +165,13 @@ class FuzzyMatcher:
if closest:
match, confidence = closest
suggestions["type"] = "subcommand"
suggestions["suggestions"].append({
"text": f"{main_cmd} {match}",
"confidence": confidence,
"type": "subcommand"
})
suggestions["suggestions"].append(
{
"text": f"{main_cmd} {match}",
"confidence": confidence,
"type": "subcommand",
}
)
return suggestions if suggestions["suggestions"] else None
@@ -210,17 +210,19 @@ def display_command_suggestion(suggestions: Dict[str, Any]):
# Add helpful context
if suggestion_type == "main_command":
text.append("\n💡 Use 'fuzzforge --help' to see all available commands", style="dim")
text.append(
"\n💡 Use 'fuzzforge --help' to see all available commands", style="dim"
)
elif suggestion_type == "subcommand":
main_cmd = suggestions["original"][0]
text.append(f"\n💡 Use 'fuzzforge {main_cmd} --help' to see available subcommands", style="dim")
text.append(
f"\n💡 Use 'fuzzforge {main_cmd} --help' to see available subcommands",
style="dim",
)
console.print(Panel(
text,
title="🤔 Command Suggestion",
border_style="yellow",
expand=False
))
console.print(
Panel(text, title="🤔 Command Suggestion", border_style="yellow", expand=False)
)
def display_workflow_suggestion(original: str, suggestion: str):
@@ -234,14 +236,13 @@ def display_workflow_suggestion(original: str, suggestion: str):
text.append(f"'{suggestion}'", style="bold green")
text.append("?\n\n")
text.append("💡 Use 'fuzzforge workflows' to see all available workflows", style="dim")
text.append(
"💡 Use 'fuzzforge workflows' to see all available workflows", style="dim"
)
console.print(Panel(
text,
title="🔧 Workflow Suggestion",
border_style="yellow",
expand=False
))
console.print(
Panel(text, title="🔧 Workflow Suggestion", border_style="yellow", expand=False)
)
def display_parameter_suggestion(original: str, suggestion: str):
@@ -257,12 +258,9 @@ def display_parameter_suggestion(original: str, suggestion: str):
text.append("💡 Use '--help' to see all available parameters", style="dim")
console.print(Panel(
text,
title="⚙️ Parameter Suggestion",
border_style="yellow",
expand=False
))
console.print(
Panel(text, title="⚙️ Parameter Suggestion", border_style="yellow", expand=False)
)
def enhanced_command_not_found_handler(command_parts: List[str]):
@@ -306,4 +304,4 @@ def enhanced_parameter_not_found_handler(parameter_name: str):
# Global fuzzy matcher instance
fuzzy_matcher = FuzzyMatcher()
fuzzy_matcher = FuzzyMatcher()

View File

@@ -12,22 +12,23 @@ Main CLI application with improved command structure.
#
# Additional attribution and requirements are provided in the NOTICE file.
import sys
from typing import List, Optional
import typer
from rich.console import Console
from rich.traceback import install
from typing import Optional, List
import sys
from .commands import (
init,
workflows,
workflow_exec,
findings,
monitor,
config as config_cmd,
ai,
findings,
ingest,
init,
workflow_exec,
workflows,
)
from .commands import (
config as config_cmd,
)
from .constants import DEFAULT_VOLUME_MODE
from .fuzzy import enhanced_command_not_found_handler
@@ -78,25 +79,30 @@ finding_app = typer.Typer(
# === Top-level commands ===
@app.command()
def init(
name: Optional[str] = typer.Option(
None, "--name", "-n",
help="Project name (defaults to current directory name)"
None, "--name", "-n", help="Project name (defaults to current directory name)"
),
api_url: Optional[str] = typer.Option(
None, "--api-url", "-u",
help="FuzzForge API URL (defaults to http://localhost:8000)"
None,
"--api-url",
"-u",
help="FuzzForge API URL (defaults to http://localhost:8000)",
),
force: bool = typer.Option(
False, "--force", "-f",
help="Force initialization even if project already exists"
)
False,
"--force",
"-f",
help="Force initialization even if project already exists",
),
):
"""
📁 Initialize a new FuzzForge project
"""
from .commands.init import project
project(name=name, api_url=api_url, force=force)
@@ -106,40 +112,18 @@ def status():
📊 Show project and latest execution status
"""
from .commands.status import show_status
show_status()
@app.command()
def config(
key: Optional[str] = typer.Argument(None, help="Configuration key"),
value: Optional[str] = typer.Argument(None, help="Configuration value to set")
):
"""
⚙️ Manage configuration (show all, get, or set values)
"""
from .commands import config as config_cmd
if key is None:
# No arguments: show all config
config_cmd.show_config(global_config=False)
elif value is None:
# Key only: get specific value
config_cmd.get_config(key=key, global_config=False)
else:
# Key and value: set value
config_cmd.set_config(key=key, value=value, global_config=False)
@app.command()
def clean(
days: int = typer.Option(
90, "--days", "-d",
help="Remove data older than this many days"
90, "--days", "-d", help="Remove data older than this many days"
),
dry_run: bool = typer.Option(
False, "--dry-run",
help="Show what would be deleted without actually deleting"
)
False, "--dry-run", help="Show what would be deleted without actually deleting"
),
):
"""
🧹 Clean old execution data and findings
@@ -155,7 +139,9 @@ def clean(
raise typer.Exit(1)
if dry_run:
console.print(f"🔍 [bold]Dry run:[/bold] Would clean data older than {days} days")
console.print(
f"🔍 [bold]Dry run:[/bold] Would clean data older than {days} days"
)
deleted = db.cleanup_old_runs(keep_days=days)
@@ -177,35 +163,35 @@ workflow_app.command("retry")(workflow_exec.retry_workflow)
workflow_app.command("info")(workflows.workflow_info)
workflow_app.command("params")(workflows.workflow_parameters)
@workflow_app.command("run")
def run_workflow(
workflow: str = typer.Argument(help="Workflow name"),
target: str = typer.Argument(help="Target path"),
params: List[str] = typer.Argument(default=None, help="Parameters as key=value pairs"),
params: List[str] = typer.Argument(
default=None, help="Parameters as key=value pairs"
),
param_file: Optional[str] = typer.Option(
None, "--param-file", "-f",
help="JSON file containing workflow parameters"
None, "--param-file", "-f", help="JSON file containing workflow parameters"
),
volume_mode: str = typer.Option(
DEFAULT_VOLUME_MODE, "--volume-mode", "-v",
help="Volume mount mode: ro (read-only) or rw (read-write)"
DEFAULT_VOLUME_MODE,
"--volume-mode",
"-v",
help="Volume mount mode: ro (read-only) or rw (read-write)",
),
timeout: Optional[int] = typer.Option(
None, "--timeout", "-t",
help="Execution timeout in seconds"
None, "--timeout", "-t", help="Execution timeout in seconds"
),
interactive: bool = typer.Option(
True, "--interactive/--no-interactive", "-i/-n",
help="Interactive parameter input for missing required parameters"
True,
"--interactive/--no-interactive",
"-i/-n",
help="Interactive parameter input for missing required parameters",
),
wait: bool = typer.Option(
False, "--wait", "-w",
help="Wait for execution to complete"
False, "--wait", "-w", help="Wait for execution to complete"
),
live: bool = typer.Option(
False, "--live", "-l",
help="Start live monitoring after execution (useful for fuzzing workflows)"
)
):
"""
🚀 Execute a security testing workflow
@@ -221,9 +207,9 @@ def run_workflow(
timeout=timeout,
interactive=interactive,
wait=wait,
live=live
)
@workflow_app.callback()
def workflow_main():
"""
@@ -239,17 +225,18 @@ def workflow_main():
# === Finding commands (singular) ===
@finding_app.command("export")
def export_finding(
execution_id: Optional[str] = typer.Argument(None, help="Execution ID (defaults to latest)"),
execution_id: Optional[str] = typer.Argument(
None, help="Execution ID (defaults to latest)"
),
format: str = typer.Option(
"sarif", "--format", "-f",
help="Export format: sarif, json, csv"
"sarif", "--format", "-f", help="Export format: sarif, json, csv"
),
output: Optional[str] = typer.Option(
None, "--output", "-o",
help="Output file (defaults to stdout)"
)
None, "--output", "-o", help="Output file (defaults to stdout)"
),
):
"""
📤 Export findings to file
@@ -270,7 +257,9 @@ def export_finding(
execution_id = recent_runs[0].run_id
console.print(f"🔍 Using most recent execution: {execution_id}")
else:
console.print("⚠️ No findings found in project database", style="yellow")
console.print(
"⚠️ No findings found in project database", style="yellow"
)
return
else:
console.print("❌ No project database found", style="red")
@@ -283,14 +272,16 @@ def export_finding(
@finding_app.command("analyze")
def analyze_finding(
finding_id: Optional[str] = typer.Argument(None, help="Finding ID to analyze")
finding_id: Optional[str] = typer.Argument(None, help="Finding ID to analyze"),
):
"""
🤖 AI analysis of a finding
"""
from .commands.ai import analyze_finding as ai_analyze
ai_analyze(finding_id)
@finding_app.callback(invoke_without_command=True)
def finding_main(
ctx: typer.Context,
@@ -309,7 +300,7 @@ def finding_main(
return
# Get remaining arguments for direct viewing
args = ctx.args if hasattr(ctx, 'args') else []
args = ctx.args if hasattr(ctx, "args") else []
finding_id = args[0] if args else None
# Direct viewing: fuzzforge finding [id]
@@ -329,7 +320,9 @@ def finding_main(
finding_id = recent_runs[0].run_id
console.print(f"🔍 Using most recent execution: {finding_id}")
else:
console.print("⚠️ No findings found in project database", style="yellow")
console.print(
"⚠️ No findings found in project database", style="yellow"
)
return
else:
console.print("❌ No project database found", style="red")
@@ -351,9 +344,10 @@ app.add_typer(workflow_app, name="workflow", help="🚀 Execute and manage workf
app.add_typer(finding_app, name="finding", help="🔍 View and analyze findings")
# Other command groups
app.add_typer(monitor.app, name="monitor", help="📊 Real-time monitoring")
app.add_typer(ai.app, name="ai", help="🤖 AI integration features")
app.add_typer(ingest.app, name="ingest", help="🧠 Ingest knowledge into AI")
app.add_typer(config_cmd.app, name="config", help="⚙️ Manage configuration settings")
# Help and utility commands
@app.command()
@@ -371,13 +365,10 @@ def examples():
[bold]Execute Workflows:[/bold]
ff workflow afl-fuzzing ./target # Run fuzzing on target
ff workflow afl-fuzzing . --live # Run with live monitoring
ff workflow scan-c ./src timeout=300 threads=4 # With parameters
[bold]Monitor Execution:[/bold]
ff status # Check latest execution
ff workflow status # Same as above
ff monitor # Live monitoring dashboard
ff workflow history # Show past executions
[bold]Review Findings:[/bold]
@@ -399,16 +390,16 @@ def version():
📦 Show version information
"""
from . import __version__
console.print(f"FuzzForge CLI v{__version__}")
console.print(f"Short command: ff")
console.print("Short command: ff")
@app.callback()
def main_callback(
ctx: typer.Context,
version: Optional[bool] = typer.Option(
None, "--version", "-v",
help="Show version information"
None, "--version", "-v", help="Show version information"
),
):
"""
@@ -422,6 +413,7 @@ def main_callback(
"""
if version:
from . import __version__
console.print(f"FuzzForge CLI v{__version__}")
raise typer.Exit()
@@ -432,12 +424,11 @@ def main():
if len(sys.argv) > 1:
args = sys.argv[1:]
# Handle finding command with pattern recognition
if len(args) >= 2 and args[0] == 'finding':
finding_subcommands = ['export', 'analyze']
if len(args) >= 2 and args[0] == "finding":
finding_subcommands = ["export", "analyze"]
# Skip custom dispatching if help flags are present
if not any(arg in ['--help', '-h', '--version', '-v'] for arg in args):
if not any(arg in ["--help", "-h", "--version", "-v"] for arg in args):
if args[1] not in finding_subcommands:
# Direct finding display: ff finding <id>
from .commands.findings import get_findings
@@ -457,18 +448,26 @@ def main():
app()
except SystemExit as e:
# Enhanced error handling for command not found
if hasattr(e, 'code') and e.code != 0 and len(sys.argv) > 1:
if hasattr(e, "code") and e.code != 0 and len(sys.argv) > 1:
command_parts = sys.argv[1:]
clean_parts = [part for part in command_parts if not part.startswith('-')]
clean_parts = [part for part in command_parts if not part.startswith("-")]
if clean_parts:
main_cmd = clean_parts[0]
valid_commands = [
'init', 'status', 'config', 'clean',
'workflows', 'workflow',
'findings', 'finding',
'monitor', 'ai', 'ingest',
'examples', 'version'
"init",
"status",
"config",
"clean",
"workflows",
"workflow",
"findings",
"finding",
"monitor",
"ai",
"ingest",
"examples",
"version",
]
if main_cmd not in valid_commands: