mirror of
https://github.com/FuzzingLabs/fuzzforge_ai.git
synced 2026-02-13 03:12:44 +00:00
refactor: removed monitor command and --live parameter
This commit is contained in:
@@ -80,8 +80,6 @@ fuzzforge workflows info security_assessment
|
||||
# Submit a workflow for analysis
|
||||
fuzzforge workflow security_assessment /path/to/your/code
|
||||
|
||||
# Monitor progress in real-time
|
||||
fuzzforge monitor live <execution-id>
|
||||
|
||||
# View findings when complete
|
||||
fuzzforge finding <execution-id>
|
||||
@@ -179,7 +177,6 @@ fuzzforge workflow security_assessment /path/to/code --wait
|
||||
- `--timeout, -t` - Execution timeout in seconds
|
||||
- `--interactive/--no-interactive, -i/-n` - Interactive parameter input
|
||||
- `--wait, -w` - Wait for execution to complete
|
||||
- `--live, -l` - Show live monitoring during execution
|
||||
|
||||
#### `fuzzforge workflow status [execution-id]`
|
||||
Check the status of a workflow execution.
|
||||
@@ -261,39 +258,6 @@ fuzzforge finding export abc123def456 --format csv --output report.csv
|
||||
fuzzforge finding export --format html --output report.html
|
||||
```
|
||||
|
||||
### Real-time Monitoring
|
||||
|
||||
#### `fuzzforge monitor stats <execution-id>`
|
||||
Show current fuzzing statistics.
|
||||
|
||||
```bash
|
||||
# Show stats once
|
||||
fuzzforge monitor stats abc123def456 --once
|
||||
|
||||
# Live updating stats (default)
|
||||
fuzzforge monitor stats abc123def456 --refresh 5
|
||||
```
|
||||
|
||||
#### `fuzzforge monitor crashes <run-id>`
|
||||
Display crash reports for a fuzzing run.
|
||||
|
||||
```bash
|
||||
fuzzforge monitor crashes abc123def456 --limit 50
|
||||
```
|
||||
|
||||
#### `fuzzforge monitor live <run-id>`
|
||||
Real-time monitoring dashboard with live updates.
|
||||
|
||||
```bash
|
||||
fuzzforge monitor live abc123def456 --refresh 3
|
||||
```
|
||||
|
||||
Features:
|
||||
- Live updating statistics
|
||||
- Progress indicators and bars
|
||||
- Run status monitoring
|
||||
- Automatic completion detection
|
||||
|
||||
### Configuration Management
|
||||
|
||||
#### `fuzzforge config show`
|
||||
@@ -495,7 +459,6 @@ cli/
|
||||
│ ├── workflows.py # Workflow management
|
||||
│ ├── runs.py # Run management
|
||||
│ ├── findings.py # Findings management
|
||||
│ ├── monitor.py # Real-time monitoring
|
||||
│ ├── config.py # Configuration commands
|
||||
│ └── status.py # Status information
|
||||
├── pyproject.toml # Project configuration
|
||||
@@ -576,7 +539,6 @@ fuzzforge --help
|
||||
# Command-specific help
|
||||
ff workflows --help
|
||||
ff workflow run --help
|
||||
ff monitor live --help
|
||||
|
||||
# Show version
|
||||
fuzzforge --version
|
||||
@@ -618,4 +580,4 @@ Contributions are welcome! Please see the main FuzzForge repository for contribu
|
||||
|
||||
---
|
||||
|
||||
**FuzzForge CLI** - Making security testing workflows accessible and efficient from the command line.
|
||||
**FuzzForge CLI** - Making security testing workflows accessible and efficient from the command line.
|
||||
|
||||
@@ -10,11 +10,10 @@
|
||||
#
|
||||
# Additional attribution and requirements are provided in the NOTICE file.
|
||||
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
from pathlib import Path
|
||||
import os
|
||||
from pathlib import Path
|
||||
from textwrap import dedent
|
||||
from typing import Optional
|
||||
|
||||
@@ -32,17 +31,20 @@ app = typer.Typer()
|
||||
@app.command()
|
||||
def project(
|
||||
name: Optional[str] = typer.Option(
|
||||
None, "--name", "-n",
|
||||
help="Project name (defaults to current directory name)"
|
||||
None, "--name", "-n", help="Project name (defaults to current directory name)"
|
||||
),
|
||||
api_url: Optional[str] = typer.Option(
|
||||
None, "--api-url", "-u",
|
||||
help="FuzzForge API URL (defaults to http://localhost:8000)"
|
||||
None,
|
||||
"--api-url",
|
||||
"-u",
|
||||
help="FuzzForge API URL (defaults to http://localhost:8000)",
|
||||
),
|
||||
force: bool = typer.Option(
|
||||
False, "--force", "-f",
|
||||
help="Force initialization even if project already exists"
|
||||
)
|
||||
False,
|
||||
"--force",
|
||||
"-f",
|
||||
help="Force initialization even if project already exists",
|
||||
),
|
||||
):
|
||||
"""
|
||||
📁 Initialize a new FuzzForge project in the current directory.
|
||||
@@ -58,24 +60,20 @@ def project(
|
||||
# Check if project already exists
|
||||
if fuzzforge_dir.exists() and not force:
|
||||
if fuzzforge_dir.is_dir() and any(fuzzforge_dir.iterdir()):
|
||||
console.print("❌ FuzzForge project already exists in this directory", style="red")
|
||||
console.print(
|
||||
"❌ FuzzForge project already exists in this directory", style="red"
|
||||
)
|
||||
console.print("Use --force to reinitialize", style="dim")
|
||||
raise typer.Exit(1)
|
||||
|
||||
# Get project name
|
||||
if not name:
|
||||
name = Prompt.ask(
|
||||
"Project name",
|
||||
default=current_dir.name,
|
||||
console=console
|
||||
)
|
||||
name = Prompt.ask("Project name", default=current_dir.name, console=console)
|
||||
|
||||
# Get API URL
|
||||
if not api_url:
|
||||
api_url = Prompt.ask(
|
||||
"FuzzForge API URL",
|
||||
default="http://localhost:8000",
|
||||
console=console
|
||||
"FuzzForge API URL", default="http://localhost:8000", console=console
|
||||
)
|
||||
|
||||
# Confirm initialization
|
||||
@@ -117,15 +115,15 @@ def project(
|
||||
]
|
||||
|
||||
if gitignore_path.exists():
|
||||
with open(gitignore_path, 'r') as f:
|
||||
with open(gitignore_path, "r") as f:
|
||||
existing_content = f.read()
|
||||
|
||||
if "# FuzzForge CLI" not in existing_content:
|
||||
with open(gitignore_path, 'a') as f:
|
||||
with open(gitignore_path, "a") as f:
|
||||
f.write(f"\n{chr(10).join(gitignore_entries)}\n")
|
||||
console.print("📝 Updated .gitignore with FuzzForge entries")
|
||||
else:
|
||||
with open(gitignore_path, 'w') as f:
|
||||
with open(gitignore_path, "w") as f:
|
||||
f.write(f"{chr(10).join(gitignore_entries)}\n")
|
||||
console.print("📝 Created .gitignore")
|
||||
|
||||
@@ -145,9 +143,6 @@ fuzzforge workflows
|
||||
# Submit a workflow for analysis
|
||||
fuzzforge workflow <workflow-name> /path/to/target
|
||||
|
||||
# Monitor run progress
|
||||
fuzzforge monitor live <run-id>
|
||||
|
||||
# View findings
|
||||
fuzzforge finding <run-id>
|
||||
```
|
||||
@@ -159,12 +154,12 @@ fuzzforge finding <run-id>
|
||||
- `.fuzzforge/findings.db` - Local database for runs and findings
|
||||
"""
|
||||
|
||||
with open(readme_path, 'w') as f:
|
||||
with open(readme_path, "w") as f:
|
||||
f.write(readme_content)
|
||||
console.print("📚 Created README.md")
|
||||
|
||||
console.print("\n✅ FuzzForge project initialized successfully!", style="green")
|
||||
console.print(f"\n🎯 Next steps:")
|
||||
console.print("\n🎯 Next steps:")
|
||||
console.print(" • ff workflows - See available workflows")
|
||||
console.print(" • ff status - Check API connectivity")
|
||||
console.print(" • ff workflow <workflow> <path> - Start your first analysis")
|
||||
|
||||
@@ -1,436 +0,0 @@
|
||||
"""
|
||||
Real-time monitoring and statistics commands.
|
||||
"""
|
||||
# Copyright (c) 2025 FuzzingLabs
|
||||
#
|
||||
# Licensed under the Business Source License 1.1 (BSL). See the LICENSE file
|
||||
# at the root of this repository for details.
|
||||
#
|
||||
# After the Change Date (four years from publication), this version of the
|
||||
# Licensed Work will be made available under the Apache License, Version 2.0.
|
||||
# See the LICENSE-APACHE file or http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Additional attribution and requirements are provided in the NOTICE file.
|
||||
|
||||
|
||||
import asyncio
|
||||
import time
|
||||
from datetime import datetime, timedelta
|
||||
from typing import Optional
|
||||
|
||||
import typer
|
||||
from rich.console import Console
|
||||
from rich.table import Table
|
||||
from rich.panel import Panel
|
||||
from rich.live import Live
|
||||
from rich.layout import Layout
|
||||
from rich.progress import Progress, BarColumn, TextColumn, SpinnerColumn
|
||||
from rich.align import Align
|
||||
from rich import box
|
||||
|
||||
from ..config import get_project_config, FuzzForgeConfig
|
||||
from ..database import get_project_db, ensure_project_db, CrashRecord
|
||||
from fuzzforge_sdk import FuzzForgeClient
|
||||
|
||||
console = Console()
|
||||
app = typer.Typer()
|
||||
|
||||
|
||||
def get_client() -> FuzzForgeClient:
|
||||
"""Get configured FuzzForge client"""
|
||||
config = get_project_config() or FuzzForgeConfig()
|
||||
return FuzzForgeClient(base_url=config.get_api_url(), timeout=config.get_timeout())
|
||||
|
||||
|
||||
def format_duration(seconds: int) -> str:
|
||||
"""Format duration in human readable format"""
|
||||
if seconds < 60:
|
||||
return f"{seconds}s"
|
||||
elif seconds < 3600:
|
||||
return f"{seconds // 60}m {seconds % 60}s"
|
||||
else:
|
||||
hours = seconds // 3600
|
||||
minutes = (seconds % 3600) // 60
|
||||
return f"{hours}h {minutes}m"
|
||||
|
||||
|
||||
def format_number(num: int) -> str:
|
||||
"""Format large numbers with K, M suffixes"""
|
||||
if num >= 1000000:
|
||||
return f"{num / 1000000:.1f}M"
|
||||
elif num >= 1000:
|
||||
return f"{num / 1000:.1f}K"
|
||||
else:
|
||||
return str(num)
|
||||
|
||||
|
||||
@app.command("stats")
|
||||
def fuzzing_stats(
|
||||
run_id: str = typer.Argument(..., help="Run ID to get statistics for"),
|
||||
refresh: int = typer.Option(
|
||||
5, "--refresh", "-r",
|
||||
help="Refresh interval in seconds"
|
||||
),
|
||||
once: bool = typer.Option(
|
||||
False, "--once",
|
||||
help="Show stats once and exit"
|
||||
)
|
||||
):
|
||||
"""
|
||||
📊 Show current fuzzing statistics for a run
|
||||
"""
|
||||
try:
|
||||
with get_client() as client:
|
||||
if once:
|
||||
# Show stats once
|
||||
stats = client.get_fuzzing_stats(run_id)
|
||||
display_stats_table(stats)
|
||||
else:
|
||||
# Live updating stats
|
||||
console.print(f"📊 [bold]Live Fuzzing Statistics[/bold] (Run: {run_id[:12]}...)")
|
||||
console.print(f"Refreshing every {refresh}s. Press Ctrl+C to stop.\n")
|
||||
|
||||
with Live(auto_refresh=False, console=console) as live:
|
||||
while True:
|
||||
try:
|
||||
stats = client.get_fuzzing_stats(run_id)
|
||||
table = create_stats_table(stats)
|
||||
live.update(table, refresh=True)
|
||||
time.sleep(refresh)
|
||||
except KeyboardInterrupt:
|
||||
console.print("\n📊 Monitoring stopped", style="yellow")
|
||||
break
|
||||
|
||||
except Exception as e:
|
||||
console.print(f"❌ Failed to get fuzzing stats: {e}", style="red")
|
||||
raise typer.Exit(1)
|
||||
|
||||
|
||||
def display_stats_table(stats):
|
||||
"""Display stats in a simple table"""
|
||||
table = create_stats_table(stats)
|
||||
console.print(table)
|
||||
|
||||
|
||||
def create_stats_table(stats) -> Panel:
|
||||
"""Create a rich table for fuzzing statistics"""
|
||||
# Create main stats table
|
||||
stats_table = Table(show_header=False, box=box.SIMPLE)
|
||||
stats_table.add_column("Metric", style="bold cyan")
|
||||
stats_table.add_column("Value", justify="right", style="bold white")
|
||||
|
||||
stats_table.add_row("Total Executions", format_number(stats.executions))
|
||||
stats_table.add_row("Executions/sec", f"{stats.executions_per_sec:.1f}")
|
||||
stats_table.add_row("Total Crashes", format_number(stats.crashes))
|
||||
stats_table.add_row("Unique Crashes", format_number(stats.unique_crashes))
|
||||
|
||||
if stats.coverage is not None:
|
||||
stats_table.add_row("Code Coverage", f"{stats.coverage:.1f}%")
|
||||
|
||||
stats_table.add_row("Corpus Size", format_number(stats.corpus_size))
|
||||
stats_table.add_row("Elapsed Time", format_duration(stats.elapsed_time))
|
||||
|
||||
if stats.last_crash_time:
|
||||
time_since_crash = datetime.now() - stats.last_crash_time
|
||||
stats_table.add_row("Last Crash", f"{format_duration(int(time_since_crash.total_seconds()))} ago")
|
||||
|
||||
return Panel.fit(
|
||||
stats_table,
|
||||
title=f"📊 Fuzzing Statistics - {stats.workflow}",
|
||||
subtitle=f"Run: {stats.run_id[:12]}...",
|
||||
box=box.ROUNDED
|
||||
)
|
||||
|
||||
|
||||
@app.command("crashes")
|
||||
def crash_reports(
|
||||
run_id: str = typer.Argument(..., help="Run ID to get crash reports for"),
|
||||
save: bool = typer.Option(
|
||||
True, "--save/--no-save",
|
||||
help="Save crashes to local database"
|
||||
),
|
||||
limit: int = typer.Option(
|
||||
50, "--limit", "-l",
|
||||
help="Maximum number of crashes to show"
|
||||
)
|
||||
):
|
||||
"""
|
||||
🐛 Display crash reports for a fuzzing run
|
||||
"""
|
||||
try:
|
||||
with get_client() as client:
|
||||
console.print(f"🐛 Fetching crash reports for run: {run_id}")
|
||||
crashes = client.get_crash_reports(run_id)
|
||||
|
||||
if not crashes:
|
||||
console.print("✅ No crashes found!", style="green")
|
||||
return
|
||||
|
||||
# Save to database if requested
|
||||
if save:
|
||||
db = ensure_project_db()
|
||||
for crash in crashes:
|
||||
crash_record = CrashRecord(
|
||||
run_id=run_id,
|
||||
crash_id=crash.crash_id,
|
||||
signal=crash.signal,
|
||||
stack_trace=crash.stack_trace,
|
||||
input_file=crash.input_file,
|
||||
severity=crash.severity,
|
||||
timestamp=crash.timestamp
|
||||
)
|
||||
db.save_crash(crash_record)
|
||||
console.print("✅ Crashes saved to local database")
|
||||
|
||||
# Display crashes
|
||||
crashes_to_show = crashes[:limit]
|
||||
|
||||
# Summary
|
||||
severity_counts = {}
|
||||
signal_counts = {}
|
||||
for crash in crashes:
|
||||
severity_counts[crash.severity] = severity_counts.get(crash.severity, 0) + 1
|
||||
if crash.signal:
|
||||
signal_counts[crash.signal] = signal_counts.get(crash.signal, 0) + 1
|
||||
|
||||
summary_table = Table(show_header=False, box=box.SIMPLE)
|
||||
summary_table.add_column("Metric", style="bold cyan")
|
||||
summary_table.add_column("Value", justify="right")
|
||||
|
||||
summary_table.add_row("Total Crashes", str(len(crashes)))
|
||||
summary_table.add_row("Unique Signals", str(len(signal_counts)))
|
||||
|
||||
for severity, count in sorted(severity_counts.items()):
|
||||
summary_table.add_row(f"{severity.title()} Severity", str(count))
|
||||
|
||||
console.print(
|
||||
Panel.fit(
|
||||
summary_table,
|
||||
title=f"🐛 Crash Summary",
|
||||
box=box.ROUNDED
|
||||
)
|
||||
)
|
||||
|
||||
# Detailed crash table
|
||||
if crashes_to_show:
|
||||
crashes_table = Table(box=box.ROUNDED)
|
||||
crashes_table.add_column("Crash ID", style="bold cyan")
|
||||
crashes_table.add_column("Signal", justify="center")
|
||||
crashes_table.add_column("Severity", justify="center")
|
||||
crashes_table.add_column("Timestamp", justify="center")
|
||||
crashes_table.add_column("Input File", style="dim")
|
||||
|
||||
for crash in crashes_to_show:
|
||||
signal_emoji = {
|
||||
"SIGSEGV": "💥",
|
||||
"SIGABRT": "🛑",
|
||||
"SIGFPE": "🧮",
|
||||
"SIGILL": "⚠️"
|
||||
}.get(crash.signal or "", "🐛")
|
||||
|
||||
severity_style = {
|
||||
"high": "red",
|
||||
"medium": "yellow",
|
||||
"low": "green"
|
||||
}.get(crash.severity.lower(), "white")
|
||||
|
||||
input_display = ""
|
||||
if crash.input_file:
|
||||
input_display = crash.input_file.split("/")[-1] # Show just filename
|
||||
|
||||
crashes_table.add_row(
|
||||
crash.crash_id[:12] + "..." if len(crash.crash_id) > 15 else crash.crash_id,
|
||||
f"{signal_emoji} {crash.signal or 'Unknown'}",
|
||||
f"[{severity_style}]{crash.severity}[/{severity_style}]",
|
||||
crash.timestamp.strftime("%H:%M:%S"),
|
||||
input_display
|
||||
)
|
||||
|
||||
console.print(f"\n🐛 [bold]Crash Details[/bold]")
|
||||
if len(crashes) > limit:
|
||||
console.print(f"Showing first {limit} of {len(crashes)} crashes")
|
||||
console.print()
|
||||
console.print(crashes_table)
|
||||
|
||||
console.print(f"\n💡 Use [bold cyan]fuzzforge finding {run_id}[/bold cyan] for detailed analysis")
|
||||
|
||||
except Exception as e:
|
||||
console.print(f"❌ Failed to get crash reports: {e}", style="red")
|
||||
raise typer.Exit(1)
|
||||
|
||||
|
||||
def _live_monitor(run_id: str, refresh: int):
|
||||
"""Helper for live monitoring to allow for cleaner exit handling"""
|
||||
with get_client() as client:
|
||||
start_time = time.time()
|
||||
|
||||
def render_layout(run_status, stats):
|
||||
layout = Layout()
|
||||
layout.split_column(
|
||||
Layout(name="header", size=3),
|
||||
Layout(name="main", ratio=1),
|
||||
Layout(name="footer", size=3)
|
||||
)
|
||||
layout["main"].split_row(
|
||||
Layout(name="stats", ratio=1),
|
||||
Layout(name="progress", ratio=1)
|
||||
)
|
||||
header = Panel(
|
||||
f"[bold]FuzzForge Live Monitor[/bold]\n"
|
||||
f"Run: {run_id[:12]}... | Status: {run_status.status} | "
|
||||
f"Uptime: {format_duration(int(time.time() - start_time))}",
|
||||
box=box.ROUNDED,
|
||||
style="cyan"
|
||||
)
|
||||
layout["header"].update(header)
|
||||
layout["stats"].update(create_stats_table(stats))
|
||||
|
||||
progress_table = Table(show_header=False, box=box.SIMPLE)
|
||||
progress_table.add_column("Metric", style="bold")
|
||||
progress_table.add_column("Progress")
|
||||
if stats.executions > 0:
|
||||
exec_rate_percent = min(100, (stats.executions_per_sec / 1000) * 100)
|
||||
progress_table.add_row("Exec Rate", create_progress_bar(exec_rate_percent, "green"))
|
||||
crash_rate = (stats.crashes / stats.executions) * 100000
|
||||
crash_rate_percent = min(100, crash_rate * 10)
|
||||
progress_table.add_row("Crash Rate", create_progress_bar(crash_rate_percent, "red"))
|
||||
if stats.coverage is not None:
|
||||
progress_table.add_row("Coverage", create_progress_bar(stats.coverage, "blue"))
|
||||
layout["progress"].update(Panel.fit(progress_table, title="📊 Progress Indicators", box=box.ROUNDED))
|
||||
|
||||
footer = Panel(
|
||||
f"Last updated: {datetime.now().strftime('%H:%M:%S')} | "
|
||||
f"Refresh interval: {refresh}s | Press Ctrl+C to exit",
|
||||
box=box.ROUNDED,
|
||||
style="dim"
|
||||
)
|
||||
layout["footer"].update(footer)
|
||||
return layout
|
||||
|
||||
with Live(auto_refresh=False, console=console, screen=True) as live:
|
||||
# Initial fetch
|
||||
try:
|
||||
run_status = client.get_run_status(run_id)
|
||||
stats = client.get_fuzzing_stats(run_id)
|
||||
except Exception:
|
||||
# Minimal fallback stats
|
||||
class FallbackStats:
|
||||
def __init__(self, run_id):
|
||||
self.run_id = run_id
|
||||
self.workflow = "unknown"
|
||||
self.executions = 0
|
||||
self.executions_per_sec = 0.0
|
||||
self.crashes = 0
|
||||
self.unique_crashes = 0
|
||||
self.coverage = None
|
||||
self.corpus_size = 0
|
||||
self.elapsed_time = 0
|
||||
self.last_crash_time = None
|
||||
stats = FallbackStats(run_id)
|
||||
run_status = type("RS", (), {"status":"Unknown","is_completed":False,"is_failed":False})()
|
||||
|
||||
live.update(render_layout(run_status, stats), refresh=True)
|
||||
|
||||
# Simple polling approach that actually works
|
||||
consecutive_errors = 0
|
||||
max_errors = 5
|
||||
|
||||
while True:
|
||||
try:
|
||||
# Poll for updates
|
||||
try:
|
||||
run_status = client.get_run_status(run_id)
|
||||
consecutive_errors = 0
|
||||
except Exception as e:
|
||||
consecutive_errors += 1
|
||||
if consecutive_errors >= max_errors:
|
||||
console.print(f"❌ Too many errors getting run status: {e}", style="red")
|
||||
break
|
||||
time.sleep(refresh)
|
||||
continue
|
||||
|
||||
# Try to get fuzzing stats
|
||||
try:
|
||||
stats = client.get_fuzzing_stats(run_id)
|
||||
except Exception as e:
|
||||
# Create fallback stats if not available
|
||||
stats = FallbackStats(run_id)
|
||||
|
||||
# Update display
|
||||
live.update(render_layout(run_status, stats), refresh=True)
|
||||
|
||||
# Check if completed
|
||||
if getattr(run_status, 'is_completed', False) or getattr(run_status, 'is_failed', False):
|
||||
# Show final state for a few seconds
|
||||
console.print("\n🏁 Run completed. Showing final state for 10 seconds...")
|
||||
time.sleep(10)
|
||||
break
|
||||
|
||||
# Wait before next poll
|
||||
time.sleep(refresh)
|
||||
|
||||
except KeyboardInterrupt:
|
||||
raise
|
||||
except Exception as e:
|
||||
console.print(f"⚠️ Monitoring error: {e}", style="yellow")
|
||||
time.sleep(refresh)
|
||||
|
||||
# Completed status update
|
||||
final_message = (
|
||||
f"[bold]FuzzForge Live Monitor - COMPLETED[/bold]\n"
|
||||
f"Run: {run_id[:12]}... | Status: {run_status.status} | "
|
||||
f"Total runtime: {format_duration(int(time.time() - start_time))}"
|
||||
)
|
||||
style = "green" if getattr(run_status, 'is_completed', False) else "red"
|
||||
live.update(Panel(final_message, box=box.ROUNDED, style=style), refresh=True)
|
||||
|
||||
|
||||
@app.command("live")
|
||||
def live_monitor(
|
||||
run_id: str = typer.Argument(..., help="Run ID to monitor live"),
|
||||
refresh: int = typer.Option(
|
||||
2, "--refresh", "-r",
|
||||
help="Refresh interval in seconds (fallback when streaming unavailable)"
|
||||
)
|
||||
):
|
||||
"""
|
||||
📺 Real-time monitoring dashboard with live updates (WebSocket/SSE with REST fallback)
|
||||
"""
|
||||
console.print(f"📺 [bold]Live Monitoring Dashboard[/bold]")
|
||||
console.print(f"Run: {run_id}")
|
||||
console.print(f"Press Ctrl+C to stop monitoring\n")
|
||||
try:
|
||||
_live_monitor(run_id, refresh)
|
||||
except KeyboardInterrupt:
|
||||
console.print("\n📊 Monitoring stopped by user.", style="yellow")
|
||||
except Exception as e:
|
||||
console.print(f"❌ Failed to start live monitoring: {e}", style="red")
|
||||
raise typer.Exit(1)
|
||||
|
||||
|
||||
def create_progress_bar(percentage: float, color: str = "green") -> str:
|
||||
"""Create a simple text progress bar"""
|
||||
width = 20
|
||||
filled = int((percentage / 100) * width)
|
||||
bar = "█" * filled + "░" * (width - filled)
|
||||
return f"[{color}]{bar}[/{color}] {percentage:.1f}%"
|
||||
|
||||
|
||||
@app.callback(invoke_without_command=True)
|
||||
def monitor_callback(ctx: typer.Context):
|
||||
"""
|
||||
📊 Real-time monitoring and statistics
|
||||
"""
|
||||
# Check if a subcommand is being invoked
|
||||
if ctx.invoked_subcommand is not None:
|
||||
# Let the subcommand handle it
|
||||
return
|
||||
|
||||
# Show not implemented message for default command
|
||||
from rich.console import Console
|
||||
console = Console()
|
||||
console.print("🚧 [yellow]Monitor command is not fully implemented yet.[/yellow]")
|
||||
console.print("Please use specific subcommands:")
|
||||
console.print(" • [cyan]ff monitor stats <run-id>[/cyan] - Show execution statistics")
|
||||
console.print(" • [cyan]ff monitor crashes <run-id>[/cyan] - Show crash reports")
|
||||
console.print(" • [cyan]ff monitor live <run-id>[/cyan] - Live monitoring dashboard")
|
||||
@@ -13,39 +13,47 @@ Replaces the old 'runs' terminology with cleaner workflow-centric commands.
|
||||
#
|
||||
# Additional attribution and requirements are provided in the NOTICE file.
|
||||
|
||||
|
||||
import json
|
||||
import time
|
||||
from datetime import datetime
|
||||
from pathlib import Path
|
||||
from typing import Optional, Dict, Any, List
|
||||
from typing import Any, Dict, List, Optional
|
||||
|
||||
import typer
|
||||
from rich.console import Console
|
||||
from rich.table import Table
|
||||
from rich.panel import Panel
|
||||
from rich.progress import Progress, SpinnerColumn, TextColumn, BarColumn, TaskProgressColumn
|
||||
from rich.prompt import Prompt, Confirm
|
||||
from rich.live import Live
|
||||
from rich import box
|
||||
|
||||
from ..config import get_project_config, FuzzForgeConfig
|
||||
from ..database import get_project_db, ensure_project_db, RunRecord
|
||||
from ..exceptions import (
|
||||
handle_error, retry_on_network_error, safe_json_load, require_project,
|
||||
APIConnectionError, ValidationError, DatabaseError, FileOperationError
|
||||
)
|
||||
from ..validation import (
|
||||
validate_run_id, validate_workflow_name, validate_target_path,
|
||||
validate_volume_mode, validate_parameters, validate_timeout
|
||||
)
|
||||
from ..progress import progress_manager, spinner, step_progress
|
||||
from ..completion import WorkflowNameComplete, TargetPathComplete, VolumeModetComplete
|
||||
from ..constants import (
|
||||
STATUS_EMOJIS, MAX_RUN_ID_DISPLAY_LENGTH, DEFAULT_VOLUME_MODE,
|
||||
PROGRESS_STEP_DELAYS, MAX_RETRIES, RETRY_DELAY, POLL_INTERVAL
|
||||
)
|
||||
from fuzzforge_sdk import FuzzForgeClient, WorkflowSubmission
|
||||
from rich import box
|
||||
from rich.console import Console
|
||||
from rich.panel import Panel
|
||||
from rich.prompt import Confirm, Prompt
|
||||
from rich.table import Table
|
||||
|
||||
from ..config import FuzzForgeConfig, get_project_config
|
||||
from ..constants import (
|
||||
DEFAULT_VOLUME_MODE,
|
||||
MAX_RETRIES,
|
||||
MAX_RUN_ID_DISPLAY_LENGTH,
|
||||
POLL_INTERVAL,
|
||||
PROGRESS_STEP_DELAYS,
|
||||
RETRY_DELAY,
|
||||
STATUS_EMOJIS,
|
||||
)
|
||||
from ..database import RunRecord, ensure_project_db, get_project_db
|
||||
from ..exceptions import (
|
||||
DatabaseError,
|
||||
ValidationError,
|
||||
handle_error,
|
||||
require_project,
|
||||
retry_on_network_error,
|
||||
safe_json_load,
|
||||
)
|
||||
from ..progress import step_progress
|
||||
from ..validation import (
|
||||
validate_parameters,
|
||||
validate_run_id,
|
||||
validate_target_path,
|
||||
validate_timeout,
|
||||
validate_volume_mode,
|
||||
validate_workflow_name,
|
||||
)
|
||||
|
||||
console = Console()
|
||||
app = typer.Typer()
|
||||
@@ -75,7 +83,7 @@ def execute_workflow_submission(
|
||||
parameters: Dict[str, Any],
|
||||
volume_mode: str,
|
||||
timeout: Optional[int],
|
||||
interactive: bool
|
||||
interactive: bool,
|
||||
) -> Any:
|
||||
"""Handle the workflow submission process"""
|
||||
# Get workflow metadata for parameter validation
|
||||
@@ -92,7 +100,9 @@ def execute_workflow_submission(
|
||||
missing_required = required_params - set(parameters.keys())
|
||||
|
||||
if missing_required:
|
||||
console.print(f"\n📝 [bold]Missing required parameters:[/bold] {', '.join(missing_required)}")
|
||||
console.print(
|
||||
f"\n📝 [bold]Missing required parameters:[/bold] {', '.join(missing_required)}"
|
||||
)
|
||||
console.print("Please provide values:\n")
|
||||
|
||||
for param_name in missing_required:
|
||||
@@ -114,9 +124,18 @@ def execute_workflow_submission(
|
||||
elif param_type == "number":
|
||||
parameters[param_name] = float(user_input)
|
||||
elif param_type == "boolean":
|
||||
parameters[param_name] = user_input.lower() in ("true", "yes", "1", "on")
|
||||
parameters[param_name] = user_input.lower() in (
|
||||
"true",
|
||||
"yes",
|
||||
"1",
|
||||
"on",
|
||||
)
|
||||
elif param_type == "array":
|
||||
parameters[param_name] = [item.strip() for item in user_input.split(",") if item.strip()]
|
||||
parameters[param_name] = [
|
||||
item.strip()
|
||||
for item in user_input.split(",")
|
||||
if item.strip()
|
||||
]
|
||||
else:
|
||||
parameters[param_name] = user_input
|
||||
break
|
||||
@@ -127,8 +146,9 @@ def execute_workflow_submission(
|
||||
validate_volume_mode(volume_mode)
|
||||
if volume_mode not in workflow_meta.supported_volume_modes:
|
||||
raise ValidationError(
|
||||
"volume mode", volume_mode,
|
||||
f"one of: {', '.join(workflow_meta.supported_volume_modes)}"
|
||||
"volume mode",
|
||||
volume_mode,
|
||||
f"one of: {', '.join(workflow_meta.supported_volume_modes)}",
|
||||
)
|
||||
|
||||
# Create submission
|
||||
@@ -136,11 +156,11 @@ def execute_workflow_submission(
|
||||
target_path=target_path,
|
||||
volume_mode=volume_mode,
|
||||
parameters=parameters,
|
||||
timeout=timeout
|
||||
timeout=timeout,
|
||||
)
|
||||
|
||||
# Show submission summary
|
||||
console.print(f"\n🎯 [bold]Executing workflow:[/bold]")
|
||||
console.print("\n🎯 [bold]Executing workflow:[/bold]")
|
||||
console.print(f" Workflow: {workflow}")
|
||||
console.print(f" Target: {target_path}")
|
||||
console.print(f" Volume Mode: {volume_mode}")
|
||||
@@ -165,7 +185,7 @@ def execute_workflow_submission(
|
||||
"Connecting to FuzzForge API",
|
||||
"Uploading parameters and settings",
|
||||
"Creating workflow deployment",
|
||||
"Initializing execution environment"
|
||||
"Initializing execution environment",
|
||||
]
|
||||
|
||||
with step_progress(steps, f"Executing {workflow}") as progress:
|
||||
@@ -185,46 +205,46 @@ def execute_workflow_submission(
|
||||
progress.next_step() # Initializing
|
||||
time.sleep(PROGRESS_STEP_DELAYS["initializing"])
|
||||
|
||||
progress.complete(f"Workflow started successfully!")
|
||||
progress.complete("Workflow started successfully!")
|
||||
|
||||
return response
|
||||
|
||||
|
||||
# Main workflow execution command (replaces 'runs submit')
|
||||
@app.command(name="exec", hidden=True) # Hidden because it will be called from main workflow command
|
||||
@app.command(
|
||||
name="exec", hidden=True
|
||||
) # Hidden because it will be called from main workflow command
|
||||
def execute_workflow(
|
||||
workflow: str = typer.Argument(..., help="Workflow name to execute"),
|
||||
target_path: str = typer.Argument(..., help="Path to analyze"),
|
||||
params: List[str] = typer.Argument(default=None, help="Parameters as key=value pairs"),
|
||||
params: List[str] = typer.Argument(
|
||||
default=None, help="Parameters as key=value pairs"
|
||||
),
|
||||
param_file: Optional[str] = typer.Option(
|
||||
None, "--param-file", "-f",
|
||||
help="JSON file containing workflow parameters"
|
||||
None, "--param-file", "-f", help="JSON file containing workflow parameters"
|
||||
),
|
||||
volume_mode: str = typer.Option(
|
||||
DEFAULT_VOLUME_MODE, "--volume-mode", "-v",
|
||||
help="Volume mount mode: ro (read-only) or rw (read-write)"
|
||||
DEFAULT_VOLUME_MODE,
|
||||
"--volume-mode",
|
||||
"-v",
|
||||
help="Volume mount mode: ro (read-only) or rw (read-write)",
|
||||
),
|
||||
timeout: Optional[int] = typer.Option(
|
||||
None, "--timeout", "-t",
|
||||
help="Execution timeout in seconds"
|
||||
None, "--timeout", "-t", help="Execution timeout in seconds"
|
||||
),
|
||||
interactive: bool = typer.Option(
|
||||
True, "--interactive/--no-interactive", "-i/-n",
|
||||
help="Interactive parameter input for missing required parameters"
|
||||
True,
|
||||
"--interactive/--no-interactive",
|
||||
"-i/-n",
|
||||
help="Interactive parameter input for missing required parameters",
|
||||
),
|
||||
wait: bool = typer.Option(
|
||||
False, "--wait", "-w",
|
||||
help="Wait for execution to complete"
|
||||
False, "--wait", "-w", help="Wait for execution to complete"
|
||||
),
|
||||
live: bool = typer.Option(
|
||||
False, "--live", "-l",
|
||||
help="Start live monitoring after execution (useful for fuzzing workflows)"
|
||||
)
|
||||
):
|
||||
"""
|
||||
🚀 Execute a workflow on a target
|
||||
|
||||
Use --live for fuzzing workflows to see real-time progress.
|
||||
Use --wait to wait for completion without live dashboard.
|
||||
"""
|
||||
try:
|
||||
@@ -264,13 +284,20 @@ def execute_workflow(
|
||||
try:
|
||||
with get_client() as client:
|
||||
response = execute_workflow_submission(
|
||||
client, workflow, target_path, parameters,
|
||||
volume_mode, timeout, interactive
|
||||
client,
|
||||
workflow,
|
||||
target_path,
|
||||
parameters,
|
||||
volume_mode,
|
||||
timeout,
|
||||
interactive,
|
||||
)
|
||||
|
||||
console.print(f"✅ Workflow execution started!", style="green")
|
||||
console.print("✅ Workflow execution started!", style="green")
|
||||
console.print(f" Execution ID: [bold cyan]{response.run_id}[/bold cyan]")
|
||||
console.print(f" Status: {status_emoji(response.status)} {response.status}")
|
||||
console.print(
|
||||
f" Status: {status_emoji(response.status)} {response.status}"
|
||||
)
|
||||
|
||||
# Save to database
|
||||
try:
|
||||
@@ -281,65 +308,55 @@ def execute_workflow(
|
||||
status=response.status,
|
||||
target_path=target_path,
|
||||
parameters=parameters,
|
||||
created_at=datetime.now()
|
||||
created_at=datetime.now(),
|
||||
)
|
||||
db.save_run(run_record)
|
||||
except Exception as e:
|
||||
# Don't fail the whole operation if database save fails
|
||||
console.print(f"⚠️ Failed to save execution to database: {e}", style="yellow")
|
||||
console.print(
|
||||
f"⚠️ Failed to save execution to database: {e}", style="yellow"
|
||||
)
|
||||
|
||||
console.print(f"\n💡 Monitor progress: [bold cyan]fuzzforge monitor {response.run_id}[/bold cyan]")
|
||||
console.print(f"💡 Check status: [bold cyan]fuzzforge workflow status {response.run_id}[/bold cyan]")
|
||||
|
||||
# Suggest --live for fuzzing workflows
|
||||
if not live and not wait and "fuzzing" in workflow.lower():
|
||||
console.print(f"💡 Next time try: [bold cyan]fuzzforge workflow {workflow} {target_path} --live[/bold cyan] for real-time fuzzing dashboard", style="dim")
|
||||
|
||||
# Start live monitoring if requested
|
||||
if live:
|
||||
# Check if this is a fuzzing workflow to show appropriate messaging
|
||||
is_fuzzing = "fuzzing" in workflow.lower()
|
||||
if is_fuzzing:
|
||||
console.print(f"\n📺 Starting live fuzzing dashboard...")
|
||||
console.print("💡 You'll see real-time crash discovery, execution stats, and coverage data.")
|
||||
else:
|
||||
console.print(f"\n📺 Starting live monitoring dashboard...")
|
||||
|
||||
console.print("Press Ctrl+C to stop monitoring (execution continues in background).\n")
|
||||
|
||||
try:
|
||||
from ..commands.monitor import live_monitor
|
||||
# Import monitor command and run it
|
||||
live_monitor(response.run_id, refresh=3)
|
||||
except KeyboardInterrupt:
|
||||
console.print(f"\n⏹️ Live monitoring stopped (execution continues in background)", style="yellow")
|
||||
except Exception as e:
|
||||
console.print(f"⚠️ Failed to start live monitoring: {e}", style="yellow")
|
||||
console.print(f"💡 You can still monitor manually: [bold cyan]fuzzforge monitor {response.run_id}[/bold cyan]")
|
||||
console.print(
|
||||
f"💡 Check status: [bold cyan]fuzzforge workflow status {response.run_id}[/bold cyan]"
|
||||
)
|
||||
|
||||
# Wait for completion if requested
|
||||
elif wait:
|
||||
console.print(f"\n⏳ Waiting for execution to complete...")
|
||||
if wait:
|
||||
console.print("\n⏳ Waiting for execution to complete...")
|
||||
try:
|
||||
final_status = client.wait_for_completion(response.run_id, poll_interval=POLL_INTERVAL)
|
||||
final_status = client.wait_for_completion(
|
||||
response.run_id, poll_interval=POLL_INTERVAL
|
||||
)
|
||||
|
||||
# Update database
|
||||
try:
|
||||
db.update_run_status(
|
||||
response.run_id,
|
||||
final_status.status,
|
||||
completed_at=datetime.now() if final_status.is_completed else None
|
||||
completed_at=datetime.now()
|
||||
if final_status.is_completed
|
||||
else None,
|
||||
)
|
||||
except Exception as e:
|
||||
console.print(f"⚠️ Failed to update database: {e}", style="yellow")
|
||||
console.print(
|
||||
f"⚠️ Failed to update database: {e}", style="yellow"
|
||||
)
|
||||
|
||||
console.print(f"🏁 Execution completed with status: {status_emoji(final_status.status)} {final_status.status}")
|
||||
console.print(
|
||||
f"🏁 Execution completed with status: {status_emoji(final_status.status)} {final_status.status}"
|
||||
)
|
||||
|
||||
if final_status.is_completed:
|
||||
console.print(f"💡 View findings: [bold cyan]fuzzforge findings {response.run_id}[/bold cyan]")
|
||||
console.print(
|
||||
f"💡 View findings: [bold cyan]fuzzforge findings {response.run_id}[/bold cyan]"
|
||||
)
|
||||
|
||||
except KeyboardInterrupt:
|
||||
console.print(f"\n⏹️ Monitoring cancelled (execution continues in background)", style="yellow")
|
||||
console.print(
|
||||
"\n⏹️ Monitoring cancelled (execution continues in background)",
|
||||
style="yellow",
|
||||
)
|
||||
except Exception as e:
|
||||
handle_error(e, "waiting for completion")
|
||||
|
||||
@@ -349,7 +366,9 @@ def execute_workflow(
|
||||
|
||||
@app.command("status")
|
||||
def workflow_status(
|
||||
execution_id: Optional[str] = typer.Argument(None, help="Execution ID to check (defaults to most recent)")
|
||||
execution_id: Optional[str] = typer.Argument(
|
||||
None, help="Execution ID to check (defaults to most recent)"
|
||||
),
|
||||
):
|
||||
"""
|
||||
📊 Check the status of a workflow execution
|
||||
@@ -368,7 +387,9 @@ def workflow_status(
|
||||
if not execution_id:
|
||||
recent_runs = db.list_runs(limit=1)
|
||||
if not recent_runs:
|
||||
console.print("⚠️ No executions found in project database", style="yellow")
|
||||
console.print(
|
||||
"⚠️ No executions found in project database", style="yellow"
|
||||
)
|
||||
raise typer.Exit(0)
|
||||
execution_id = recent_runs[0].run_id
|
||||
console.print(f"🔍 Using most recent execution: {execution_id}")
|
||||
@@ -384,7 +405,7 @@ def workflow_status(
|
||||
db.update_run_status(
|
||||
execution_id,
|
||||
status.status,
|
||||
completed_at=status.updated_at if status.is_completed else None
|
||||
completed_at=status.updated_at if status.is_completed else None,
|
||||
)
|
||||
except Exception as e:
|
||||
console.print(f"⚠️ Failed to update database: {e}", style="yellow")
|
||||
@@ -404,23 +425,24 @@ def workflow_status(
|
||||
|
||||
if status.is_completed:
|
||||
duration = status.updated_at - status.created_at
|
||||
status_table.add_row("Duration", str(duration).split('.')[0]) # Remove microseconds
|
||||
status_table.add_row(
|
||||
"Duration", str(duration).split(".")[0]
|
||||
) # Remove microseconds
|
||||
|
||||
console.print(
|
||||
Panel.fit(
|
||||
status_table,
|
||||
title=f"📊 Status Information",
|
||||
box=box.ROUNDED
|
||||
)
|
||||
Panel.fit(status_table, title="📊 Status Information", box=box.ROUNDED)
|
||||
)
|
||||
|
||||
# Show next steps
|
||||
if status.is_running:
|
||||
console.print(f"\n💡 Monitor live: [bold cyan]fuzzforge monitor {execution_id}[/bold cyan]")
|
||||
elif status.is_completed:
|
||||
console.print(f"💡 View findings: [bold cyan]fuzzforge finding {execution_id}[/bold cyan]")
|
||||
|
||||
if status.is_completed:
|
||||
console.print(
|
||||
f"💡 View findings: [bold cyan]fuzzforge finding {execution_id}[/bold cyan]"
|
||||
)
|
||||
elif status.is_failed:
|
||||
console.print(f"💡 Check logs: [bold cyan]fuzzforge workflow logs {execution_id}[/bold cyan]")
|
||||
console.print(
|
||||
f"💡 Check logs: [bold cyan]fuzzforge workflow logs {execution_id}[/bold cyan]"
|
||||
)
|
||||
|
||||
except Exception as e:
|
||||
handle_error(e, "getting execution status")
|
||||
@@ -428,9 +450,15 @@ def workflow_status(
|
||||
|
||||
@app.command("history")
|
||||
def workflow_history(
|
||||
workflow: Optional[str] = typer.Option(None, "--workflow", "-w", help="Filter by workflow name"),
|
||||
status: Optional[str] = typer.Option(None, "--status", "-s", help="Filter by status"),
|
||||
limit: int = typer.Option(20, "--limit", "-l", help="Maximum number of executions to show")
|
||||
workflow: Optional[str] = typer.Option(
|
||||
None, "--workflow", "-w", help="Filter by workflow name"
|
||||
),
|
||||
status: Optional[str] = typer.Option(
|
||||
None, "--status", "-s", help="Filter by status"
|
||||
),
|
||||
limit: int = typer.Option(
|
||||
20, "--limit", "-l", help="Maximum number of executions to show"
|
||||
),
|
||||
):
|
||||
"""
|
||||
📋 Show workflow execution history
|
||||
@@ -463,12 +491,14 @@ def workflow_history(
|
||||
param_str = f"{param_count} params" if param_count > 0 else "-"
|
||||
|
||||
table.add_row(
|
||||
run.run_id[:12] + "..." if len(run.run_id) > MAX_RUN_ID_DISPLAY_LENGTH else run.run_id,
|
||||
run.run_id[:12] + "..."
|
||||
if len(run.run_id) > MAX_RUN_ID_DISPLAY_LENGTH
|
||||
else run.run_id,
|
||||
run.workflow,
|
||||
f"{status_emoji(run.status)} {run.status}",
|
||||
Path(run.target_path).name,
|
||||
run.created_at.strftime("%m-%d %H:%M"),
|
||||
param_str
|
||||
param_str,
|
||||
)
|
||||
|
||||
console.print(f"\n📋 [bold]Workflow Execution History ({len(runs)})[/bold]")
|
||||
@@ -479,7 +509,9 @@ def workflow_history(
|
||||
console.print()
|
||||
console.print(table)
|
||||
|
||||
console.print(f"\n💡 Use [bold cyan]fuzzforge workflow status <execution-id>[/bold cyan] for detailed status")
|
||||
console.print(
|
||||
"\n💡 Use [bold cyan]fuzzforge workflow status <execution-id>[/bold cyan] for detailed status"
|
||||
)
|
||||
|
||||
except Exception as e:
|
||||
handle_error(e, "listing execution history")
|
||||
@@ -487,11 +519,15 @@ def workflow_history(
|
||||
|
||||
@app.command("retry")
|
||||
def retry_workflow(
|
||||
execution_id: Optional[str] = typer.Argument(None, help="Execution ID to retry (defaults to most recent)"),
|
||||
execution_id: Optional[str] = typer.Argument(
|
||||
None, help="Execution ID to retry (defaults to most recent)"
|
||||
),
|
||||
modify_params: bool = typer.Option(
|
||||
False, "--modify-params", "-m",
|
||||
help="Interactively modify parameters before retrying"
|
||||
)
|
||||
False,
|
||||
"--modify-params",
|
||||
"-m",
|
||||
help="Interactively modify parameters before retrying",
|
||||
),
|
||||
):
|
||||
"""
|
||||
🔄 Retry a workflow execution with the same or modified parameters
|
||||
@@ -517,7 +553,9 @@ def retry_workflow(
|
||||
# Get original execution
|
||||
original_run = db.get_run(execution_id)
|
||||
if not original_run:
|
||||
raise ValidationError("execution_id", execution_id, "an existing execution ID in the database")
|
||||
raise ValidationError(
|
||||
"execution_id", execution_id, "an existing execution ID in the database"
|
||||
)
|
||||
|
||||
console.print(f"🔄 [bold]Retrying workflow:[/bold] {original_run.workflow}")
|
||||
console.print(f" Original Execution ID: {execution_id}")
|
||||
@@ -527,24 +565,29 @@ def retry_workflow(
|
||||
|
||||
# Modify parameters if requested
|
||||
if modify_params and parameters:
|
||||
console.print(f"\n📝 [bold]Current parameters:[/bold]")
|
||||
console.print("\n📝 [bold]Current parameters:[/bold]")
|
||||
for key, value in parameters.items():
|
||||
new_value = Prompt.ask(
|
||||
f"{key}",
|
||||
default=str(value),
|
||||
console=console
|
||||
)
|
||||
new_value = Prompt.ask(f"{key}", default=str(value), console=console)
|
||||
if new_value != str(value):
|
||||
# Try to maintain type
|
||||
try:
|
||||
if isinstance(value, bool):
|
||||
parameters[key] = new_value.lower() in ("true", "yes", "1", "on")
|
||||
parameters[key] = new_value.lower() in (
|
||||
"true",
|
||||
"yes",
|
||||
"1",
|
||||
"on",
|
||||
)
|
||||
elif isinstance(value, int):
|
||||
parameters[key] = int(new_value)
|
||||
elif isinstance(value, float):
|
||||
parameters[key] = float(new_value)
|
||||
elif isinstance(value, list):
|
||||
parameters[key] = [item.strip() for item in new_value.split(",") if item.strip()]
|
||||
parameters[key] = [
|
||||
item.strip()
|
||||
for item in new_value.split(",")
|
||||
if item.strip()
|
||||
]
|
||||
else:
|
||||
parameters[key] = new_value
|
||||
except ValueError:
|
||||
@@ -553,15 +596,18 @@ def retry_workflow(
|
||||
# Submit new execution
|
||||
with get_client() as client:
|
||||
submission = WorkflowSubmission(
|
||||
target_path=original_run.target_path,
|
||||
parameters=parameters
|
||||
target_path=original_run.target_path, parameters=parameters
|
||||
)
|
||||
|
||||
response = client.submit_workflow(original_run.workflow, submission)
|
||||
|
||||
console.print(f"\n✅ Retry submitted successfully!", style="green")
|
||||
console.print(f" New Execution ID: [bold cyan]{response.run_id}[/bold cyan]")
|
||||
console.print(f" Status: {status_emoji(response.status)} {response.status}")
|
||||
console.print("\n✅ Retry submitted successfully!", style="green")
|
||||
console.print(
|
||||
f" New Execution ID: [bold cyan]{response.run_id}[/bold cyan]"
|
||||
)
|
||||
console.print(
|
||||
f" Status: {status_emoji(response.status)} {response.status}"
|
||||
)
|
||||
|
||||
# Save to database
|
||||
try:
|
||||
@@ -572,13 +618,17 @@ def retry_workflow(
|
||||
target_path=original_run.target_path,
|
||||
parameters=parameters,
|
||||
created_at=datetime.now(),
|
||||
metadata={"retry_of": execution_id}
|
||||
metadata={"retry_of": execution_id},
|
||||
)
|
||||
db.save_run(run_record)
|
||||
except Exception as e:
|
||||
console.print(f"⚠️ Failed to save execution to database: {e}", style="yellow")
|
||||
console.print(
|
||||
f"⚠️ Failed to save execution to database: {e}", style="yellow"
|
||||
)
|
||||
|
||||
console.print(f"\n💡 Monitor progress: [bold cyan]fuzzforge monitor {response.run_id}[/bold cyan]")
|
||||
console.print(
|
||||
f"\n💡 Monitor progress: [bold cyan]fuzzforge monitor {response.run_id}[/bold cyan]"
|
||||
)
|
||||
|
||||
except Exception as e:
|
||||
handle_error(e, "retrying workflow")
|
||||
@@ -588,4 +638,4 @@ def retry_workflow(
|
||||
def workflow_exec_callback():
|
||||
"""
|
||||
🚀 Workflow execution management
|
||||
"""
|
||||
"""
|
||||
|
||||
@@ -14,9 +14,9 @@ Provides "Did you mean...?" functionality and intelligent command/parameter sugg
|
||||
#
|
||||
# Additional attribution and requirements are provided in the NOTICE file.
|
||||
|
||||
|
||||
import difflib
|
||||
from typing import List, Optional, Dict, Any, Tuple
|
||||
from typing import Any, Dict, List, Optional, Tuple
|
||||
|
||||
from rich.console import Console
|
||||
from rich.panel import Panel
|
||||
from rich.text import Text
|
||||
@@ -34,10 +34,9 @@ class FuzzyMatcher:
|
||||
"workflows": ["list", "info"],
|
||||
"runs": ["submit", "status", "list", "rerun"],
|
||||
"findings": ["get", "list", "export", "all"],
|
||||
"monitor": ["stats", "crashes", "live"],
|
||||
"config": ["set", "get", "list", "init"],
|
||||
"ai": ["ask", "summarize", "explain"],
|
||||
"ingest": ["project", "findings"]
|
||||
"ingest": ["project", "findings"],
|
||||
}
|
||||
|
||||
# Common workflow names
|
||||
@@ -47,7 +46,7 @@ class FuzzyMatcher:
|
||||
"infrastructure_scan",
|
||||
"static_analysis_scan",
|
||||
"penetration_testing_scan",
|
||||
"secret_detection_scan"
|
||||
"secret_detection_scan",
|
||||
]
|
||||
|
||||
# Common parameter names
|
||||
@@ -60,24 +59,25 @@ class FuzzyMatcher:
|
||||
"param-file",
|
||||
"interactive",
|
||||
"wait",
|
||||
"live",
|
||||
"format",
|
||||
"output",
|
||||
"severity",
|
||||
"since",
|
||||
"limit",
|
||||
"stats",
|
||||
"export"
|
||||
"export",
|
||||
]
|
||||
|
||||
# Common values
|
||||
self.common_values = {
|
||||
"volume_mode": ["ro", "rw"],
|
||||
"format": ["json", "csv", "html", "sarif"],
|
||||
"severity": ["critical", "high", "medium", "low", "info"]
|
||||
"severity": ["critical", "high", "medium", "low", "info"],
|
||||
}
|
||||
|
||||
def find_closest_command(self, user_input: str, command_group: Optional[str] = None) -> Optional[Tuple[str, float]]:
|
||||
def find_closest_command(
|
||||
self, user_input: str, command_group: Optional[str] = None
|
||||
) -> Optional[Tuple[str, float]]:
|
||||
"""Find the closest matching command."""
|
||||
if command_group and command_group in self.commands:
|
||||
# Search within a specific command group
|
||||
@@ -86,9 +86,7 @@ class FuzzyMatcher:
|
||||
# Search all main commands
|
||||
candidates = list(self.commands.keys())
|
||||
|
||||
matches = difflib.get_close_matches(
|
||||
user_input, candidates, n=1, cutoff=0.6
|
||||
)
|
||||
matches = difflib.get_close_matches(user_input, candidates, n=1, cutoff=0.6)
|
||||
|
||||
if matches:
|
||||
match = matches[0]
|
||||
@@ -114,7 +112,7 @@ class FuzzyMatcher:
|
||||
def find_closest_parameter(self, user_input: str) -> Optional[Tuple[str, float]]:
|
||||
"""Find the closest matching parameter name."""
|
||||
# Remove leading dashes
|
||||
clean_input = user_input.lstrip('-')
|
||||
clean_input = user_input.lstrip("-")
|
||||
|
||||
matches = difflib.get_close_matches(
|
||||
clean_input, self.parameter_names, n=1, cutoff=0.6
|
||||
@@ -139,7 +137,9 @@ class FuzzyMatcher:
|
||||
|
||||
return []
|
||||
|
||||
def get_command_suggestions(self, user_command: List[str]) -> Optional[Dict[str, Any]]:
|
||||
def get_command_suggestions(
|
||||
self, user_command: List[str]
|
||||
) -> Optional[Dict[str, Any]]:
|
||||
"""Get suggestions for a user command that may have typos."""
|
||||
if not user_command:
|
||||
return None
|
||||
@@ -153,11 +153,9 @@ class FuzzyMatcher:
|
||||
if closest:
|
||||
match, confidence = closest
|
||||
suggestions["type"] = "main_command"
|
||||
suggestions["suggestions"].append({
|
||||
"text": match,
|
||||
"confidence": confidence,
|
||||
"type": "command"
|
||||
})
|
||||
suggestions["suggestions"].append(
|
||||
{"text": match, "confidence": confidence, "type": "command"}
|
||||
)
|
||||
|
||||
# Check subcommand if present
|
||||
elif len(user_command) > 1:
|
||||
@@ -167,11 +165,13 @@ class FuzzyMatcher:
|
||||
if closest:
|
||||
match, confidence = closest
|
||||
suggestions["type"] = "subcommand"
|
||||
suggestions["suggestions"].append({
|
||||
"text": f"{main_cmd} {match}",
|
||||
"confidence": confidence,
|
||||
"type": "subcommand"
|
||||
})
|
||||
suggestions["suggestions"].append(
|
||||
{
|
||||
"text": f"{main_cmd} {match}",
|
||||
"confidence": confidence,
|
||||
"type": "subcommand",
|
||||
}
|
||||
)
|
||||
|
||||
return suggestions if suggestions["suggestions"] else None
|
||||
|
||||
@@ -210,17 +210,19 @@ def display_command_suggestion(suggestions: Dict[str, Any]):
|
||||
|
||||
# Add helpful context
|
||||
if suggestion_type == "main_command":
|
||||
text.append("\n💡 Use 'fuzzforge --help' to see all available commands", style="dim")
|
||||
text.append(
|
||||
"\n💡 Use 'fuzzforge --help' to see all available commands", style="dim"
|
||||
)
|
||||
elif suggestion_type == "subcommand":
|
||||
main_cmd = suggestions["original"][0]
|
||||
text.append(f"\n💡 Use 'fuzzforge {main_cmd} --help' to see available subcommands", style="dim")
|
||||
text.append(
|
||||
f"\n💡 Use 'fuzzforge {main_cmd} --help' to see available subcommands",
|
||||
style="dim",
|
||||
)
|
||||
|
||||
console.print(Panel(
|
||||
text,
|
||||
title="🤔 Command Suggestion",
|
||||
border_style="yellow",
|
||||
expand=False
|
||||
))
|
||||
console.print(
|
||||
Panel(text, title="🤔 Command Suggestion", border_style="yellow", expand=False)
|
||||
)
|
||||
|
||||
|
||||
def display_workflow_suggestion(original: str, suggestion: str):
|
||||
@@ -234,14 +236,13 @@ def display_workflow_suggestion(original: str, suggestion: str):
|
||||
text.append(f"'{suggestion}'", style="bold green")
|
||||
text.append("?\n\n")
|
||||
|
||||
text.append("💡 Use 'fuzzforge workflows' to see all available workflows", style="dim")
|
||||
text.append(
|
||||
"💡 Use 'fuzzforge workflows' to see all available workflows", style="dim"
|
||||
)
|
||||
|
||||
console.print(Panel(
|
||||
text,
|
||||
title="🔧 Workflow Suggestion",
|
||||
border_style="yellow",
|
||||
expand=False
|
||||
))
|
||||
console.print(
|
||||
Panel(text, title="🔧 Workflow Suggestion", border_style="yellow", expand=False)
|
||||
)
|
||||
|
||||
|
||||
def display_parameter_suggestion(original: str, suggestion: str):
|
||||
@@ -257,12 +258,9 @@ def display_parameter_suggestion(original: str, suggestion: str):
|
||||
|
||||
text.append("💡 Use '--help' to see all available parameters", style="dim")
|
||||
|
||||
console.print(Panel(
|
||||
text,
|
||||
title="⚙️ Parameter Suggestion",
|
||||
border_style="yellow",
|
||||
expand=False
|
||||
))
|
||||
console.print(
|
||||
Panel(text, title="⚙️ Parameter Suggestion", border_style="yellow", expand=False)
|
||||
)
|
||||
|
||||
|
||||
def enhanced_command_not_found_handler(command_parts: List[str]):
|
||||
@@ -306,4 +304,4 @@ def enhanced_parameter_not_found_handler(parameter_name: str):
|
||||
|
||||
|
||||
# Global fuzzy matcher instance
|
||||
fuzzy_matcher = FuzzyMatcher()
|
||||
fuzzy_matcher = FuzzyMatcher()
|
||||
|
||||
@@ -12,22 +12,23 @@ Main CLI application with improved command structure.
|
||||
#
|
||||
# Additional attribution and requirements are provided in the NOTICE file.
|
||||
|
||||
import sys
|
||||
from typing import List, Optional
|
||||
|
||||
import typer
|
||||
from rich.console import Console
|
||||
from rich.traceback import install
|
||||
from typing import Optional, List
|
||||
import sys
|
||||
|
||||
from .commands import (
|
||||
init,
|
||||
workflows,
|
||||
workflow_exec,
|
||||
findings,
|
||||
monitor,
|
||||
config as config_cmd,
|
||||
ai,
|
||||
findings,
|
||||
ingest,
|
||||
init,
|
||||
workflow_exec,
|
||||
workflows,
|
||||
)
|
||||
from .commands import (
|
||||
config as config_cmd,
|
||||
)
|
||||
from .constants import DEFAULT_VOLUME_MODE
|
||||
from .fuzzy import enhanced_command_not_found_handler
|
||||
@@ -78,25 +79,30 @@ finding_app = typer.Typer(
|
||||
|
||||
# === Top-level commands ===
|
||||
|
||||
|
||||
@app.command()
|
||||
def init(
|
||||
name: Optional[str] = typer.Option(
|
||||
None, "--name", "-n",
|
||||
help="Project name (defaults to current directory name)"
|
||||
None, "--name", "-n", help="Project name (defaults to current directory name)"
|
||||
),
|
||||
api_url: Optional[str] = typer.Option(
|
||||
None, "--api-url", "-u",
|
||||
help="FuzzForge API URL (defaults to http://localhost:8000)"
|
||||
None,
|
||||
"--api-url",
|
||||
"-u",
|
||||
help="FuzzForge API URL (defaults to http://localhost:8000)",
|
||||
),
|
||||
force: bool = typer.Option(
|
||||
False, "--force", "-f",
|
||||
help="Force initialization even if project already exists"
|
||||
)
|
||||
False,
|
||||
"--force",
|
||||
"-f",
|
||||
help="Force initialization even if project already exists",
|
||||
),
|
||||
):
|
||||
"""
|
||||
📁 Initialize a new FuzzForge project
|
||||
"""
|
||||
from .commands.init import project
|
||||
|
||||
project(name=name, api_url=api_url, force=force)
|
||||
|
||||
|
||||
@@ -106,18 +112,18 @@ def status():
|
||||
📊 Show project and latest execution status
|
||||
"""
|
||||
from .commands.status import show_status
|
||||
|
||||
show_status()
|
||||
|
||||
|
||||
@app.command()
|
||||
def config(
|
||||
key: Optional[str] = typer.Argument(None, help="Configuration key"),
|
||||
value: Optional[str] = typer.Argument(None, help="Configuration value to set")
|
||||
value: Optional[str] = typer.Argument(None, help="Configuration value to set"),
|
||||
):
|
||||
"""
|
||||
⚙️ Manage configuration (show all, get, or set values)
|
||||
"""
|
||||
from .commands import config as config_cmd
|
||||
|
||||
if key is None:
|
||||
# No arguments: show all config
|
||||
@@ -133,13 +139,11 @@ def config(
|
||||
@app.command()
|
||||
def clean(
|
||||
days: int = typer.Option(
|
||||
90, "--days", "-d",
|
||||
help="Remove data older than this many days"
|
||||
90, "--days", "-d", help="Remove data older than this many days"
|
||||
),
|
||||
dry_run: bool = typer.Option(
|
||||
False, "--dry-run",
|
||||
help="Show what would be deleted without actually deleting"
|
||||
)
|
||||
False, "--dry-run", help="Show what would be deleted without actually deleting"
|
||||
),
|
||||
):
|
||||
"""
|
||||
🧹 Clean old execution data and findings
|
||||
@@ -155,7 +159,9 @@ def clean(
|
||||
raise typer.Exit(1)
|
||||
|
||||
if dry_run:
|
||||
console.print(f"🔍 [bold]Dry run:[/bold] Would clean data older than {days} days")
|
||||
console.print(
|
||||
f"🔍 [bold]Dry run:[/bold] Would clean data older than {days} days"
|
||||
)
|
||||
|
||||
deleted = db.cleanup_old_runs(keep_days=days)
|
||||
|
||||
@@ -177,35 +183,35 @@ workflow_app.command("retry")(workflow_exec.retry_workflow)
|
||||
workflow_app.command("info")(workflows.workflow_info)
|
||||
workflow_app.command("params")(workflows.workflow_parameters)
|
||||
|
||||
|
||||
@workflow_app.command("run")
|
||||
def run_workflow(
|
||||
workflow: str = typer.Argument(help="Workflow name"),
|
||||
target: str = typer.Argument(help="Target path"),
|
||||
params: List[str] = typer.Argument(default=None, help="Parameters as key=value pairs"),
|
||||
params: List[str] = typer.Argument(
|
||||
default=None, help="Parameters as key=value pairs"
|
||||
),
|
||||
param_file: Optional[str] = typer.Option(
|
||||
None, "--param-file", "-f",
|
||||
help="JSON file containing workflow parameters"
|
||||
None, "--param-file", "-f", help="JSON file containing workflow parameters"
|
||||
),
|
||||
volume_mode: str = typer.Option(
|
||||
DEFAULT_VOLUME_MODE, "--volume-mode", "-v",
|
||||
help="Volume mount mode: ro (read-only) or rw (read-write)"
|
||||
DEFAULT_VOLUME_MODE,
|
||||
"--volume-mode",
|
||||
"-v",
|
||||
help="Volume mount mode: ro (read-only) or rw (read-write)",
|
||||
),
|
||||
timeout: Optional[int] = typer.Option(
|
||||
None, "--timeout", "-t",
|
||||
help="Execution timeout in seconds"
|
||||
None, "--timeout", "-t", help="Execution timeout in seconds"
|
||||
),
|
||||
interactive: bool = typer.Option(
|
||||
True, "--interactive/--no-interactive", "-i/-n",
|
||||
help="Interactive parameter input for missing required parameters"
|
||||
True,
|
||||
"--interactive/--no-interactive",
|
||||
"-i/-n",
|
||||
help="Interactive parameter input for missing required parameters",
|
||||
),
|
||||
wait: bool = typer.Option(
|
||||
False, "--wait", "-w",
|
||||
help="Wait for execution to complete"
|
||||
False, "--wait", "-w", help="Wait for execution to complete"
|
||||
),
|
||||
live: bool = typer.Option(
|
||||
False, "--live", "-l",
|
||||
help="Start live monitoring after execution (useful for fuzzing workflows)"
|
||||
)
|
||||
):
|
||||
"""
|
||||
🚀 Execute a security testing workflow
|
||||
@@ -221,9 +227,9 @@ def run_workflow(
|
||||
timeout=timeout,
|
||||
interactive=interactive,
|
||||
wait=wait,
|
||||
live=live
|
||||
)
|
||||
|
||||
|
||||
@workflow_app.callback()
|
||||
def workflow_main():
|
||||
"""
|
||||
@@ -239,17 +245,18 @@ def workflow_main():
|
||||
|
||||
# === Finding commands (singular) ===
|
||||
|
||||
|
||||
@finding_app.command("export")
|
||||
def export_finding(
|
||||
execution_id: Optional[str] = typer.Argument(None, help="Execution ID (defaults to latest)"),
|
||||
execution_id: Optional[str] = typer.Argument(
|
||||
None, help="Execution ID (defaults to latest)"
|
||||
),
|
||||
format: str = typer.Option(
|
||||
"sarif", "--format", "-f",
|
||||
help="Export format: sarif, json, csv"
|
||||
"sarif", "--format", "-f", help="Export format: sarif, json, csv"
|
||||
),
|
||||
output: Optional[str] = typer.Option(
|
||||
None, "--output", "-o",
|
||||
help="Output file (defaults to stdout)"
|
||||
)
|
||||
None, "--output", "-o", help="Output file (defaults to stdout)"
|
||||
),
|
||||
):
|
||||
"""
|
||||
📤 Export findings to file
|
||||
@@ -270,7 +277,9 @@ def export_finding(
|
||||
execution_id = recent_runs[0].run_id
|
||||
console.print(f"🔍 Using most recent execution: {execution_id}")
|
||||
else:
|
||||
console.print("⚠️ No findings found in project database", style="yellow")
|
||||
console.print(
|
||||
"⚠️ No findings found in project database", style="yellow"
|
||||
)
|
||||
return
|
||||
else:
|
||||
console.print("❌ No project database found", style="red")
|
||||
@@ -283,14 +292,16 @@ def export_finding(
|
||||
|
||||
@finding_app.command("analyze")
|
||||
def analyze_finding(
|
||||
finding_id: Optional[str] = typer.Argument(None, help="Finding ID to analyze")
|
||||
finding_id: Optional[str] = typer.Argument(None, help="Finding ID to analyze"),
|
||||
):
|
||||
"""
|
||||
🤖 AI analysis of a finding
|
||||
"""
|
||||
from .commands.ai import analyze_finding as ai_analyze
|
||||
|
||||
ai_analyze(finding_id)
|
||||
|
||||
|
||||
@finding_app.callback(invoke_without_command=True)
|
||||
def finding_main(
|
||||
ctx: typer.Context,
|
||||
@@ -309,7 +320,7 @@ def finding_main(
|
||||
return
|
||||
|
||||
# Get remaining arguments for direct viewing
|
||||
args = ctx.args if hasattr(ctx, 'args') else []
|
||||
args = ctx.args if hasattr(ctx, "args") else []
|
||||
finding_id = args[0] if args else None
|
||||
|
||||
# Direct viewing: fuzzforge finding [id]
|
||||
@@ -329,7 +340,9 @@ def finding_main(
|
||||
finding_id = recent_runs[0].run_id
|
||||
console.print(f"🔍 Using most recent execution: {finding_id}")
|
||||
else:
|
||||
console.print("⚠️ No findings found in project database", style="yellow")
|
||||
console.print(
|
||||
"⚠️ No findings found in project database", style="yellow"
|
||||
)
|
||||
return
|
||||
else:
|
||||
console.print("❌ No project database found", style="red")
|
||||
@@ -351,10 +364,10 @@ app.add_typer(workflow_app, name="workflow", help="🚀 Execute and manage workf
|
||||
app.add_typer(finding_app, name="finding", help="🔍 View and analyze findings")
|
||||
|
||||
# Other command groups
|
||||
app.add_typer(monitor.app, name="monitor", help="📊 Real-time monitoring")
|
||||
app.add_typer(ai.app, name="ai", help="🤖 AI integration features")
|
||||
app.add_typer(ingest.app, name="ingest", help="🧠 Ingest knowledge into AI")
|
||||
|
||||
|
||||
# Help and utility commands
|
||||
@app.command()
|
||||
def examples():
|
||||
@@ -371,13 +384,11 @@ def examples():
|
||||
|
||||
[bold]Execute Workflows:[/bold]
|
||||
ff workflow afl-fuzzing ./target # Run fuzzing on target
|
||||
ff workflow afl-fuzzing . --live # Run with live monitoring
|
||||
ff workflow scan-c ./src timeout=300 threads=4 # With parameters
|
||||
|
||||
[bold]Monitor Execution:[/bold]
|
||||
ff status # Check latest execution
|
||||
ff workflow status # Same as above
|
||||
ff monitor # Live monitoring dashboard
|
||||
ff workflow history # Show past executions
|
||||
|
||||
[bold]Review Findings:[/bold]
|
||||
@@ -399,16 +410,16 @@ def version():
|
||||
📦 Show version information
|
||||
"""
|
||||
from . import __version__
|
||||
|
||||
console.print(f"FuzzForge CLI v{__version__}")
|
||||
console.print(f"Short command: ff")
|
||||
console.print("Short command: ff")
|
||||
|
||||
|
||||
@app.callback()
|
||||
def main_callback(
|
||||
ctx: typer.Context,
|
||||
version: Optional[bool] = typer.Option(
|
||||
None, "--version", "-v",
|
||||
help="Show version information"
|
||||
None, "--version", "-v", help="Show version information"
|
||||
),
|
||||
):
|
||||
"""
|
||||
@@ -422,6 +433,7 @@ def main_callback(
|
||||
"""
|
||||
if version:
|
||||
from . import __version__
|
||||
|
||||
console.print(f"FuzzForge CLI v{__version__}")
|
||||
raise typer.Exit()
|
||||
|
||||
@@ -432,12 +444,11 @@ def main():
|
||||
if len(sys.argv) > 1:
|
||||
args = sys.argv[1:]
|
||||
|
||||
|
||||
# Handle finding command with pattern recognition
|
||||
if len(args) >= 2 and args[0] == 'finding':
|
||||
finding_subcommands = ['export', 'analyze']
|
||||
if len(args) >= 2 and args[0] == "finding":
|
||||
finding_subcommands = ["export", "analyze"]
|
||||
# Skip custom dispatching if help flags are present
|
||||
if not any(arg in ['--help', '-h', '--version', '-v'] for arg in args):
|
||||
if not any(arg in ["--help", "-h", "--version", "-v"] for arg in args):
|
||||
if args[1] not in finding_subcommands:
|
||||
# Direct finding display: ff finding <id>
|
||||
from .commands.findings import get_findings
|
||||
@@ -457,18 +468,26 @@ def main():
|
||||
app()
|
||||
except SystemExit as e:
|
||||
# Enhanced error handling for command not found
|
||||
if hasattr(e, 'code') and e.code != 0 and len(sys.argv) > 1:
|
||||
if hasattr(e, "code") and e.code != 0 and len(sys.argv) > 1:
|
||||
command_parts = sys.argv[1:]
|
||||
clean_parts = [part for part in command_parts if not part.startswith('-')]
|
||||
clean_parts = [part for part in command_parts if not part.startswith("-")]
|
||||
|
||||
if clean_parts:
|
||||
main_cmd = clean_parts[0]
|
||||
valid_commands = [
|
||||
'init', 'status', 'config', 'clean',
|
||||
'workflows', 'workflow',
|
||||
'findings', 'finding',
|
||||
'monitor', 'ai', 'ingest',
|
||||
'examples', 'version'
|
||||
"init",
|
||||
"status",
|
||||
"config",
|
||||
"clean",
|
||||
"workflows",
|
||||
"workflow",
|
||||
"findings",
|
||||
"finding",
|
||||
"monitor",
|
||||
"ai",
|
||||
"ingest",
|
||||
"examples",
|
||||
"version",
|
||||
]
|
||||
|
||||
if main_cmd not in valid_commands:
|
||||
|
||||
Reference in New Issue
Block a user