mirror of
https://github.com/FuzzingLabs/fuzzforge_ai.git
synced 2026-02-12 20:32:46 +00:00
fix: removed erroneous example
This commit is contained in:
@@ -12,22 +12,24 @@ Main CLI application with improved command structure.
|
||||
#
|
||||
# Additional attribution and requirements are provided in the NOTICE file.
|
||||
|
||||
import sys
|
||||
from typing import List, Optional
|
||||
|
||||
import typer
|
||||
from rich.console import Console
|
||||
from rich.traceback import install
|
||||
from typing import Optional, List
|
||||
import sys
|
||||
|
||||
from .commands import (
|
||||
init,
|
||||
workflows,
|
||||
workflow_exec,
|
||||
findings,
|
||||
monitor,
|
||||
config as config_cmd,
|
||||
ai,
|
||||
findings,
|
||||
ingest,
|
||||
init,
|
||||
monitor,
|
||||
workflow_exec,
|
||||
workflows,
|
||||
)
|
||||
from .commands import (
|
||||
config as config_cmd,
|
||||
)
|
||||
from .constants import DEFAULT_VOLUME_MODE
|
||||
from .fuzzy import enhanced_command_not_found_handler
|
||||
@@ -78,25 +80,30 @@ finding_app = typer.Typer(
|
||||
|
||||
# === Top-level commands ===
|
||||
|
||||
|
||||
@app.command()
|
||||
def init(
|
||||
name: Optional[str] = typer.Option(
|
||||
None, "--name", "-n",
|
||||
help="Project name (defaults to current directory name)"
|
||||
None, "--name", "-n", help="Project name (defaults to current directory name)"
|
||||
),
|
||||
api_url: Optional[str] = typer.Option(
|
||||
None, "--api-url", "-u",
|
||||
help="FuzzForge API URL (defaults to http://localhost:8000)"
|
||||
None,
|
||||
"--api-url",
|
||||
"-u",
|
||||
help="FuzzForge API URL (defaults to http://localhost:8000)",
|
||||
),
|
||||
force: bool = typer.Option(
|
||||
False, "--force", "-f",
|
||||
help="Force initialization even if project already exists"
|
||||
)
|
||||
False,
|
||||
"--force",
|
||||
"-f",
|
||||
help="Force initialization even if project already exists",
|
||||
),
|
||||
):
|
||||
"""
|
||||
📁 Initialize a new FuzzForge project
|
||||
"""
|
||||
from .commands.init import project
|
||||
|
||||
project(name=name, api_url=api_url, force=force)
|
||||
|
||||
|
||||
@@ -106,18 +113,18 @@ def status():
|
||||
📊 Show project and latest execution status
|
||||
"""
|
||||
from .commands.status import show_status
|
||||
|
||||
show_status()
|
||||
|
||||
|
||||
@app.command()
|
||||
def config(
|
||||
key: Optional[str] = typer.Argument(None, help="Configuration key"),
|
||||
value: Optional[str] = typer.Argument(None, help="Configuration value to set")
|
||||
value: Optional[str] = typer.Argument(None, help="Configuration value to set"),
|
||||
):
|
||||
"""
|
||||
⚙️ Manage configuration (show all, get, or set values)
|
||||
"""
|
||||
from .commands import config as config_cmd
|
||||
|
||||
if key is None:
|
||||
# No arguments: show all config
|
||||
@@ -133,13 +140,11 @@ def config(
|
||||
@app.command()
|
||||
def clean(
|
||||
days: int = typer.Option(
|
||||
90, "--days", "-d",
|
||||
help="Remove data older than this many days"
|
||||
90, "--days", "-d", help="Remove data older than this many days"
|
||||
),
|
||||
dry_run: bool = typer.Option(
|
||||
False, "--dry-run",
|
||||
help="Show what would be deleted without actually deleting"
|
||||
)
|
||||
False, "--dry-run", help="Show what would be deleted without actually deleting"
|
||||
),
|
||||
):
|
||||
"""
|
||||
🧹 Clean old execution data and findings
|
||||
@@ -155,7 +160,9 @@ def clean(
|
||||
raise typer.Exit(1)
|
||||
|
||||
if dry_run:
|
||||
console.print(f"🔍 [bold]Dry run:[/bold] Would clean data older than {days} days")
|
||||
console.print(
|
||||
f"🔍 [bold]Dry run:[/bold] Would clean data older than {days} days"
|
||||
)
|
||||
|
||||
deleted = db.cleanup_old_runs(keep_days=days)
|
||||
|
||||
@@ -177,35 +184,41 @@ workflow_app.command("retry")(workflow_exec.retry_workflow)
|
||||
workflow_app.command("info")(workflows.workflow_info)
|
||||
workflow_app.command("params")(workflows.workflow_parameters)
|
||||
|
||||
|
||||
@workflow_app.command("run")
|
||||
def run_workflow(
|
||||
workflow: str = typer.Argument(help="Workflow name"),
|
||||
target: str = typer.Argument(help="Target path"),
|
||||
params: List[str] = typer.Argument(default=None, help="Parameters as key=value pairs"),
|
||||
params: List[str] = typer.Argument(
|
||||
default=None, help="Parameters as key=value pairs"
|
||||
),
|
||||
param_file: Optional[str] = typer.Option(
|
||||
None, "--param-file", "-f",
|
||||
help="JSON file containing workflow parameters"
|
||||
None, "--param-file", "-f", help="JSON file containing workflow parameters"
|
||||
),
|
||||
volume_mode: str = typer.Option(
|
||||
DEFAULT_VOLUME_MODE, "--volume-mode", "-v",
|
||||
help="Volume mount mode: ro (read-only) or rw (read-write)"
|
||||
DEFAULT_VOLUME_MODE,
|
||||
"--volume-mode",
|
||||
"-v",
|
||||
help="Volume mount mode: ro (read-only) or rw (read-write)",
|
||||
),
|
||||
timeout: Optional[int] = typer.Option(
|
||||
None, "--timeout", "-t",
|
||||
help="Execution timeout in seconds"
|
||||
None, "--timeout", "-t", help="Execution timeout in seconds"
|
||||
),
|
||||
interactive: bool = typer.Option(
|
||||
True, "--interactive/--no-interactive", "-i/-n",
|
||||
help="Interactive parameter input for missing required parameters"
|
||||
True,
|
||||
"--interactive/--no-interactive",
|
||||
"-i/-n",
|
||||
help="Interactive parameter input for missing required parameters",
|
||||
),
|
||||
wait: bool = typer.Option(
|
||||
False, "--wait", "-w",
|
||||
help="Wait for execution to complete"
|
||||
False, "--wait", "-w", help="Wait for execution to complete"
|
||||
),
|
||||
live: bool = typer.Option(
|
||||
False, "--live", "-l",
|
||||
help="Start live monitoring after execution (useful for fuzzing workflows)"
|
||||
)
|
||||
False,
|
||||
"--live",
|
||||
"-l",
|
||||
help="Start live monitoring after execution (useful for fuzzing workflows)",
|
||||
),
|
||||
):
|
||||
"""
|
||||
🚀 Execute a security testing workflow
|
||||
@@ -221,9 +234,10 @@ def run_workflow(
|
||||
timeout=timeout,
|
||||
interactive=interactive,
|
||||
wait=wait,
|
||||
live=live
|
||||
live=live,
|
||||
)
|
||||
|
||||
|
||||
@workflow_app.callback()
|
||||
def workflow_main():
|
||||
"""
|
||||
@@ -239,17 +253,18 @@ def workflow_main():
|
||||
|
||||
# === Finding commands (singular) ===
|
||||
|
||||
|
||||
@finding_app.command("export")
|
||||
def export_finding(
|
||||
execution_id: Optional[str] = typer.Argument(None, help="Execution ID (defaults to latest)"),
|
||||
execution_id: Optional[str] = typer.Argument(
|
||||
None, help="Execution ID (defaults to latest)"
|
||||
),
|
||||
format: str = typer.Option(
|
||||
"sarif", "--format", "-f",
|
||||
help="Export format: sarif, json, csv"
|
||||
"sarif", "--format", "-f", help="Export format: sarif, json, csv"
|
||||
),
|
||||
output: Optional[str] = typer.Option(
|
||||
None, "--output", "-o",
|
||||
help="Output file (defaults to stdout)"
|
||||
)
|
||||
None, "--output", "-o", help="Output file (defaults to stdout)"
|
||||
),
|
||||
):
|
||||
"""
|
||||
📤 Export findings to file
|
||||
@@ -270,7 +285,9 @@ def export_finding(
|
||||
execution_id = recent_runs[0].run_id
|
||||
console.print(f"🔍 Using most recent execution: {execution_id}")
|
||||
else:
|
||||
console.print("⚠️ No findings found in project database", style="yellow")
|
||||
console.print(
|
||||
"⚠️ No findings found in project database", style="yellow"
|
||||
)
|
||||
return
|
||||
else:
|
||||
console.print("❌ No project database found", style="red")
|
||||
@@ -283,14 +300,16 @@ def export_finding(
|
||||
|
||||
@finding_app.command("analyze")
|
||||
def analyze_finding(
|
||||
finding_id: Optional[str] = typer.Argument(None, help="Finding ID to analyze")
|
||||
finding_id: Optional[str] = typer.Argument(None, help="Finding ID to analyze"),
|
||||
):
|
||||
"""
|
||||
🤖 AI analysis of a finding
|
||||
"""
|
||||
from .commands.ai import analyze_finding as ai_analyze
|
||||
|
||||
ai_analyze(finding_id)
|
||||
|
||||
|
||||
@finding_app.callback(invoke_without_command=True)
|
||||
def finding_main(
|
||||
ctx: typer.Context,
|
||||
@@ -309,7 +328,7 @@ def finding_main(
|
||||
return
|
||||
|
||||
# Get remaining arguments for direct viewing
|
||||
args = ctx.args if hasattr(ctx, 'args') else []
|
||||
args = ctx.args if hasattr(ctx, "args") else []
|
||||
finding_id = args[0] if args else None
|
||||
|
||||
# Direct viewing: fuzzforge finding [id]
|
||||
@@ -329,7 +348,9 @@ def finding_main(
|
||||
finding_id = recent_runs[0].run_id
|
||||
console.print(f"🔍 Using most recent execution: {finding_id}")
|
||||
else:
|
||||
console.print("⚠️ No findings found in project database", style="yellow")
|
||||
console.print(
|
||||
"⚠️ No findings found in project database", style="yellow"
|
||||
)
|
||||
return
|
||||
else:
|
||||
console.print("❌ No project database found", style="red")
|
||||
@@ -355,6 +376,7 @@ app.add_typer(monitor.app, name="monitor", help="📊 Real-time monitoring")
|
||||
app.add_typer(ai.app, name="ai", help="🤖 AI integration features")
|
||||
app.add_typer(ingest.app, name="ingest", help="🧠 Ingest knowledge into AI")
|
||||
|
||||
|
||||
# Help and utility commands
|
||||
@app.command()
|
||||
def examples():
|
||||
@@ -372,7 +394,6 @@ def examples():
|
||||
[bold]Execute Workflows:[/bold]
|
||||
ff workflow afl-fuzzing ./target # Run fuzzing on target
|
||||
ff workflow afl-fuzzing . --live # Run with live monitoring
|
||||
ff workflow scan-c ./src timeout=300 threads=4 # With parameters
|
||||
|
||||
[bold]Monitor Execution:[/bold]
|
||||
ff status # Check latest execution
|
||||
@@ -399,16 +420,16 @@ def version():
|
||||
📦 Show version information
|
||||
"""
|
||||
from . import __version__
|
||||
|
||||
console.print(f"FuzzForge CLI v{__version__}")
|
||||
console.print(f"Short command: ff")
|
||||
console.print("Short command: ff")
|
||||
|
||||
|
||||
@app.callback()
|
||||
def main_callback(
|
||||
ctx: typer.Context,
|
||||
version: Optional[bool] = typer.Option(
|
||||
None, "--version", "-v",
|
||||
help="Show version information"
|
||||
None, "--version", "-v", help="Show version information"
|
||||
),
|
||||
):
|
||||
"""
|
||||
@@ -422,6 +443,7 @@ def main_callback(
|
||||
"""
|
||||
if version:
|
||||
from . import __version__
|
||||
|
||||
console.print(f"FuzzForge CLI v{__version__}")
|
||||
raise typer.Exit()
|
||||
|
||||
@@ -432,12 +454,11 @@ def main():
|
||||
if len(sys.argv) > 1:
|
||||
args = sys.argv[1:]
|
||||
|
||||
|
||||
# Handle finding command with pattern recognition
|
||||
if len(args) >= 2 and args[0] == 'finding':
|
||||
finding_subcommands = ['export', 'analyze']
|
||||
if len(args) >= 2 and args[0] == "finding":
|
||||
finding_subcommands = ["export", "analyze"]
|
||||
# Skip custom dispatching if help flags are present
|
||||
if not any(arg in ['--help', '-h', '--version', '-v'] for arg in args):
|
||||
if not any(arg in ["--help", "-h", "--version", "-v"] for arg in args):
|
||||
if args[1] not in finding_subcommands:
|
||||
# Direct finding display: ff finding <id>
|
||||
from .commands.findings import get_findings
|
||||
@@ -457,18 +478,26 @@ def main():
|
||||
app()
|
||||
except SystemExit as e:
|
||||
# Enhanced error handling for command not found
|
||||
if hasattr(e, 'code') and e.code != 0 and len(sys.argv) > 1:
|
||||
if hasattr(e, "code") and e.code != 0 and len(sys.argv) > 1:
|
||||
command_parts = sys.argv[1:]
|
||||
clean_parts = [part for part in command_parts if not part.startswith('-')]
|
||||
clean_parts = [part for part in command_parts if not part.startswith("-")]
|
||||
|
||||
if clean_parts:
|
||||
main_cmd = clean_parts[0]
|
||||
valid_commands = [
|
||||
'init', 'status', 'config', 'clean',
|
||||
'workflows', 'workflow',
|
||||
'findings', 'finding',
|
||||
'monitor', 'ai', 'ingest',
|
||||
'examples', 'version'
|
||||
"init",
|
||||
"status",
|
||||
"config",
|
||||
"clean",
|
||||
"workflows",
|
||||
"workflow",
|
||||
"findings",
|
||||
"finding",
|
||||
"monitor",
|
||||
"ai",
|
||||
"ingest",
|
||||
"examples",
|
||||
"version",
|
||||
]
|
||||
|
||||
if main_cmd not in valid_commands:
|
||||
|
||||
Reference in New Issue
Block a user