fix: worker naming, monitor commands, and findings CLI improvements

This PR addresses multiple issues and improvements across the CLI and backend:

**Worker Naming Fixes:**
- Fix worker container naming mismatch between CLI and docker-compose
- Update worker_manager.py to use docker compose commands with service names
- Remove worker_container field from workflows API, keep only worker_service
- Backend now correctly uses service names (worker-python, worker-secrets, etc.)

**Backend API Fixes:**
- Fix workflow name extraction from run_id in runs.py (was showing "unknown")
- Update monitor command suggestions from 'monitor stats' to 'monitor live'

**Monitor Command Consolidation:**
- Merge 'monitor stats' and 'monitor live' into single 'monitor live' command
- Add --once and --style flags for flexibility
- Remove all references to deprecated 'monitor stats' command

**Findings CLI Structure Improvements (Closes #18):**
- Move 'show' command from 'findings' (plural) to 'finding' (singular)
- Keep 'export' command in 'findings' (plural) as it exports all findings
- Remove broken 'analyze' command (imported non-existent function)
- Update all command suggestions to use correct paths
- Fix smart routing logic in main.py to handle new command structure
- Add export suggestions after viewing findings with unique timestamps
- Change default export format to SARIF (industry standard)

**Docker Compose:**
- Remove obsolete version field to fix deprecation warning

All commands tested and working:
- ff finding show <run-id> --rule <rule-id> ✓
- ff findings export <run-id> ✓
- ff finding <run-id> (direct viewing) ✓
- ff monitor live <run-id> ✓
This commit is contained in:
tduhamel42
2025-10-21 16:53:08 +02:00
parent 00e23450c9
commit 8c04ff7bda
9 changed files with 283 additions and 204 deletions
+149 -12
View File
@@ -140,11 +140,145 @@ def get_findings(
else: # table format
display_findings_table(findings.sarif)
# Suggest export command and show command
console.print(f"\n💡 View full details of a finding: [bold cyan]ff finding show {run_id} --rule <rule-id>[/bold cyan]")
console.print(f"💡 Export these findings: [bold cyan]ff findings export {run_id} --format sarif[/bold cyan]")
console.print(" Supported formats: [cyan]sarif[/cyan] (standard), [cyan]json[/cyan], [cyan]csv[/cyan], [cyan]html[/cyan]")
except Exception as e:
console.print(f"❌ Failed to get findings: {e}", style="red")
raise typer.Exit(1)
def show_finding(
run_id: str = typer.Argument(..., help="Run ID to get finding from"),
rule_id: str = typer.Option(..., "--rule", "-r", help="Rule ID of the specific finding to show")
):
"""
🔍 Show detailed information about a specific finding
This function is registered as a command in main.py under the finding (singular) command group.
"""
try:
require_project()
validate_run_id(run_id)
# Try to get from database first, fallback to API
db = get_project_db()
findings_data = None
if db:
findings_data = db.get_findings(run_id)
if not findings_data:
with get_client() as client:
console.print(f"🔍 Fetching findings for run: {run_id}")
findings = client.get_run_findings(run_id)
sarif_data = findings.sarif
else:
sarif_data = findings_data.sarif_data
# Find the specific finding by rule_id
runs = sarif_data.get("runs", [])
if not runs:
console.print("❌ No findings data available", style="red")
raise typer.Exit(1)
run_data = runs[0]
results = run_data.get("results", [])
tool = run_data.get("tool", {}).get("driver", {})
# Search for matching finding
matching_finding = None
for result in results:
if result.get("ruleId") == rule_id:
matching_finding = result
break
if not matching_finding:
console.print(f"❌ No finding found with rule ID: {rule_id}", style="red")
console.print(f"💡 Use [bold cyan]ff findings get {run_id}[/bold cyan] to see all findings", style="dim")
raise typer.Exit(1)
# Display detailed finding
display_finding_detail(matching_finding, tool, run_id)
except Exception as e:
console.print(f"❌ Failed to get finding: {e}", style="red")
raise typer.Exit(1)
def display_finding_detail(finding: Dict[str, Any], tool: Dict[str, Any], run_id: str):
"""Display detailed information about a single finding"""
rule_id = finding.get("ruleId", "unknown")
level = finding.get("level", "note")
message = finding.get("message", {})
message_text = message.get("text", "No summary available")
message_markdown = message.get("markdown", message_text)
# Get location
locations = finding.get("locations", [])
location_str = "Unknown location"
code_snippet = None
if locations:
physical_location = locations[0].get("physicalLocation", {})
artifact_location = physical_location.get("artifactLocation", {})
region = physical_location.get("region", {})
file_path = artifact_location.get("uri", "")
if file_path:
location_str = file_path
if region.get("startLine"):
location_str += f":{region['startLine']}"
if region.get("startColumn"):
location_str += f":{region['startColumn']}"
# Get code snippet if available
if region.get("snippet", {}).get("text"):
code_snippet = region["snippet"]["text"].strip()
# Get severity style
severity_color = {
"error": "red",
"warning": "yellow",
"note": "blue",
"info": "cyan"
}.get(level.lower(), "white")
# Build detailed content
content_lines = []
content_lines.append(f"[bold]Rule ID:[/bold] {rule_id}")
content_lines.append(f"[bold]Severity:[/bold] [{severity_color}]{level.upper()}[/{severity_color}]")
content_lines.append(f"[bold]Location:[/bold] {location_str}")
content_lines.append(f"[bold]Tool:[/bold] {tool.get('name', 'Unknown')} v{tool.get('version', 'unknown')}")
content_lines.append(f"[bold]Run ID:[/bold] {run_id}")
content_lines.append("")
content_lines.append(f"[bold]Summary:[/bold]")
content_lines.append(message_text)
content_lines.append("")
content_lines.append(f"[bold]Description:[/bold]")
content_lines.append(message_markdown)
if code_snippet:
content_lines.append("")
content_lines.append(f"[bold]Code Snippet:[/bold]")
content_lines.append(f"[dim]{code_snippet}[/dim]")
content = "\n".join(content_lines)
# Display in panel
console.print()
console.print(Panel(
content,
title=f"🔍 Finding Detail",
border_style=severity_color,
box=box.ROUNDED,
padding=(1, 2)
))
console.print()
console.print(f"💡 Export this run: [bold cyan]ff findings export {run_id} --format sarif[/bold cyan]")
def display_findings_table(sarif_data: Dict[str, Any]):
"""Display SARIF findings in a rich table format"""
runs = sarif_data.get("runs", [])
@@ -195,8 +329,8 @@ def display_findings_table(sarif_data: Dict[str, Any]):
# Detailed results - Rich Text-based table with proper emoji alignment
results_table = Table(box=box.ROUNDED)
results_table.add_column("Severity", width=12, justify="left", no_wrap=True)
results_table.add_column("Rule", width=25, justify="left", style="bold cyan", no_wrap=True)
results_table.add_column("Message", width=55, justify="left", no_wrap=True)
results_table.add_column("Rule", justify="left", style="bold cyan", no_wrap=True)
results_table.add_column("Message", width=45, justify="left", no_wrap=True)
results_table.add_column("Location", width=20, justify="left", style="dim", no_wrap=True)
for result in results[:50]: # Limit to first 50 results
@@ -224,18 +358,16 @@ def display_findings_table(sarif_data: Dict[str, Any]):
severity_text = Text(level.upper(), style=severity_style(level))
severity_text.truncate(12, overflow="ellipsis")
rule_text = Text(rule_id)
rule_text.truncate(25, overflow="ellipsis")
# Show full rule ID without truncation
message_text = Text(message)
message_text.truncate(55, overflow="ellipsis")
message_text.truncate(45, overflow="ellipsis")
location_text = Text(location_str)
location_text.truncate(20, overflow="ellipsis")
results_table.add_row(
severity_text,
rule_text,
rule_id, # Pass string directly to show full UUID
message_text,
location_text
)
@@ -307,16 +439,20 @@ def findings_history(
def export_findings(
run_id: str = typer.Argument(..., help="Run ID to export findings for"),
format: str = typer.Option(
"json", "--format", "-f",
help="Export format: json, csv, html, sarif"
"sarif", "--format", "-f",
help="Export format: sarif (standard), json, csv, html"
),
output: Optional[str] = typer.Option(
None, "--output", "-o",
help="Output file path (defaults to findings-<run-id>.<format>)"
help="Output file path (defaults to findings-<run-id>-<timestamp>.<format>)"
)
):
"""
📤 Export security findings in various formats
SARIF is the standard format for security findings and is recommended
for interoperability with other security tools. Filenames are automatically
made unique with timestamps to prevent overwriting previous exports.
"""
db = get_project_db()
if not db:
@@ -334,9 +470,10 @@ def export_findings(
else:
sarif_data = findings_data.sarif_data
# Generate output filename
# Generate output filename with timestamp for uniqueness
if not output:
output = f"findings-{run_id[:8]}.{format}"
timestamp = datetime.now().strftime("%Y%m%d-%H%M%S")
output = f"findings-{run_id[:8]}-{timestamp}.{format}"
output_path = Path(output)