diff --git a/cli/src/fuzzforge_cli/commands/findings.py b/cli/src/fuzzforge_cli/commands/findings.py
index 242995c..ea5a536 100644
--- a/cli/src/fuzzforge_cli/commands/findings.py
+++ b/cli/src/fuzzforge_cli/commands/findings.py
@@ -757,14 +757,14 @@ def export_findings(
     try:
         # Get findings from database first, fallback to API
-        findings_data = db.get_findings(run_id)
-        if not findings_data:
+        findings_record = db.get_findings(run_id)
+        if not findings_record:
             console.print(f"📡 Fetching findings from API for run: {run_id}")
             with get_client() as client:
                 findings = client.get_run_findings(run_id)
-            sarif_data = findings.sarif
+            findings_data = findings.sarif
         else:
-            sarif_data = findings_data.sarif_data
+            findings_data = findings_record.findings_data
 
         # Generate output filename with timestamp for uniqueness
         if not output:
@@ -776,19 +776,19 @@ def export_findings(
         # Export based on format
         if format == "sarif":
            with open(output_path, 'w') as f:
-                json.dump(sarif_data, f, indent=2)
+                json.dump(findings_data, f, indent=2)
 
         elif format == "json":
             # Simplified JSON format
-            simplified_data = extract_simplified_findings(sarif_data)
+            simplified_data = extract_simplified_findings(findings_data)
             with open(output_path, 'w') as f:
                 json.dump(simplified_data, f, indent=2)
 
         elif format == "csv":
-            export_to_csv(sarif_data, output_path)
+            export_to_csv(findings_data, output_path)
 
         elif format == "html":
-            export_to_html(sarif_data, output_path, run_id)
+            export_to_html(findings_data, output_path, run_id)
 
         else:
             console.print(f"❌ Unsupported format: {format}", style="red")
@@ -801,71 +801,81 @@ def export_findings(
         raise typer.Exit(1)
 
 
-def extract_simplified_findings(sarif_data: Dict[str, Any]) -> Dict[str, Any]:
-    """Extract simplified findings structure from SARIF"""
-    runs = sarif_data.get("runs", [])
-    if not runs:
-        return {"findings": [], "summary": {}}
+def extract_simplified_findings(findings_data: Dict[str, Any]) -> Dict[str, Any]:
+    """Extract simplified findings structure from native format or SARIF"""
+    # Detect format
+    is_native = "findings" in findings_data and "version" in findings_data
 
-    run_data = runs[0]
-    results = run_data.get("results", [])
-    tool = run_data.get("tool", {}).get("driver", {})
+    if is_native:
+        # Native FuzzForge format
+        findings_list = findings_data.get("findings", [])
+        workflow = findings_data.get("workflow", "Unknown")
+        summary = findings_data.get("summary", {})
 
-    simplified = {
-        "tool": {
-            "name": tool.get("name", "Unknown"),
-            "version": tool.get("version", "Unknown")
-        },
-        "summary": {
-            "total_issues": len(results),
-            "by_severity": {}
-        },
-        "findings": []
-    }
+        simplified = {
+            "tool": {
+                "name": workflow,
+                "version": findings_data.get("version", "1.0.0")
+            },
+            "summary": summary if summary else {
+                "total_issues": len(findings_list),
+                "by_severity": {}
+            },
+            "findings": []
+        }
 
-    for result in results:
-        level = result.get("level", "note")
-        simplified["summary"]["by_severity"][level] = simplified["summary"]["by_severity"].get(level, 0) + 1
+        # Count by severity if not in summary
+        if not summary:
+            for finding in findings_list:
+                severity = finding.get("severity", "info")
+                simplified["summary"]["by_severity"][severity] = simplified["summary"]["by_severity"].get(severity, 0) + 1
 
-        # Extract location
-        location_info = {}
-        locations = result.get("locations", [])
-        if locations:
-            physical_location = locations[0].get("physicalLocation", {})
-            artifact_location = physical_location.get("artifactLocation", {})
-            region = physical_location.get("region", {})
+        # Extract simplified findings
+        for finding in findings_list:
+            location = finding.get("location", {})
+            simplified["findings"].append({
+                "id": finding.get("id"),
+                "rule_id": finding.get("rule_id", "unknown"),
+                "severity": finding.get("severity", "info"),
+                "confidence": finding.get("confidence", "medium"),
+                "title": finding.get("title", ""),
+                "description": finding.get("description", ""),
+                "category": finding.get("category", "other"),
+                "found_by": finding.get("found_by", {}),
+                "location": {
+                    "file": location.get("file", ""),
+                    "line": location.get("line_start"),
+                    "column": location.get("column_start")
+                }
+            })
+    else:
+        # SARIF format
+        runs = findings_data.get("runs", [])
+        if not runs:
+            return {"findings": [], "summary": {}}
 
-            location_info = {
-                "file": artifact_location.get("uri", ""),
-                "line": region.get("startLine"),
-                "column": region.get("startColumn")
-            }
+        run_data = runs[0]
+        results = run_data.get("results", [])
+        tool = run_data.get("tool", {}).get("driver", {})
 
-        simplified["findings"].append({
-            "rule_id": result.get("ruleId", "unknown"),
-            "severity": level,
-            "message": result.get("message", {}).get("text", ""),
-            "location": location_info
-        })
-
-    return simplified
-
-
-def export_to_csv(sarif_data: Dict[str, Any], output_path: Path):
-    """Export findings to CSV format"""
-    runs = sarif_data.get("runs", [])
-    if not runs:
-        return
-
-    results = runs[0].get("results", [])
-
-    with open(output_path, 'w', newline='', encoding='utf-8') as csvfile:
-        fieldnames = ['rule_id', 'severity', 'message', 'file', 'line', 'column']
-        writer = csv.DictWriter(csvfile, fieldnames=fieldnames)
-        writer.writeheader()
+        simplified = {
+            "tool": {
+                "name": tool.get("name", "Unknown"),
+                "version": tool.get("version", "Unknown")
+            },
+            "summary": {
+                "total_issues": len(results),
+                "by_severity": {}
+            },
+            "findings": []
+        }
 
         for result in results:
-            location_info = {"file": "", "line": "", "column": ""}
+            level = result.get("level", "note")
+            simplified["summary"]["by_severity"][level] = simplified["summary"]["by_severity"].get(level, 0) + 1
+
+            # Extract location
+            location_info = {}
             locations = result.get("locations", [])
             if locations:
                 physical_location = locations[0].get("physicalLocation", {})
@@ -874,109 +884,732 @@ def export_to_csv(sarif_data: Dict[str, Any], output_path: Path):
 
                 location_info = {
                     "file": artifact_location.get("uri", ""),
-                    "line": region.get("startLine", ""),
-                    "column": region.get("startColumn", "")
+                    "line": region.get("startLine"),
+                    "column": region.get("startColumn")
                 }
 
-            writer.writerow({
-                "rule_id": result.get("ruleId", ""),
-                "severity": result.get("level", "note"),
+            simplified["findings"].append({
+                "rule_id": result.get("ruleId", "unknown"),
+                "severity": level,
                 "message": result.get("message", {}).get("text", ""),
-                **location_info
+                "location": location_info
             })
 
+    return simplified
+
 
-def export_to_html(sarif_data: Dict[str, Any], output_path: Path, run_id: str):
-    """Export findings to HTML format"""
-    runs = sarif_data.get("runs", [])
-    if not runs:
-        return
-
-    run_data = runs[0]
-    results = run_data.get("results", [])
-    tool = run_data.get("tool", {}).get("driver", {})
+def export_to_csv(findings_data: Dict[str, Any], output_path: Path):
+    """Export findings to CSV format (supports both native and SARIF)"""
+    # Detect format
+    is_native = "findings" in findings_data and "version" in findings_data
 
-    # Simple HTML template
+    with open(output_path, 'w', newline='', encoding='utf-8') as csvfile:
+        if is_native:
+            # Native FuzzForge format - include more fields
+            fieldnames = ['id', 'rule_id', 'severity', 'confidence', 'title', 'category', 'module', 'file', 'line', 'column']
+            writer = csv.DictWriter(csvfile, fieldnames=fieldnames)
+            writer.writeheader()
+
+            findings_list = findings_data.get("findings", [])
+            for finding in findings_list:
+                location = finding.get("location", {})
+                found_by = finding.get("found_by", {})
+
+                writer.writerow({
+                    "id": finding.get("id", "")[:8],
+                    "rule_id": finding.get("rule_id", ""),
+                    "severity": finding.get("severity", "info"),
+                    "confidence": finding.get("confidence", "medium"),
+                    "title": finding.get("title", ""),
+                    "category": finding.get("category", ""),
+                    "module": found_by.get("module", ""),
+                    "file": location.get("file", ""),
+                    "line": location.get("line_start", ""),
+                    "column": location.get("column_start", "")
+                })
+        else:
+            # SARIF format
+            fieldnames = ['rule_id', 'severity', 'message', 'file', 'line', 'column']
+            writer = csv.DictWriter(csvfile, fieldnames=fieldnames)
+            writer.writeheader()
+
+            runs = findings_data.get("runs", [])
+            if not runs:
+                return
+
+            results = runs[0].get("results", [])
+
+            for result in results:
+                location_info = {"file": "", "line": "", "column": ""}
+                locations = result.get("locations", [])
+                if locations:
+                    physical_location = locations[0].get("physicalLocation", {})
+                    artifact_location = physical_location.get("artifactLocation", {})
+                    region = physical_location.get("region", {})
+
+                    location_info = {
+                        "file": artifact_location.get("uri", ""),
+                        "line": region.get("startLine", ""),
+                        "column": region.get("startColumn", "")
+                    }
+
+                writer.writerow({
+                    "rule_id": result.get("ruleId", ""),
+                    "severity": result.get("level", "note"),
+                    "message": result.get("message", {}).get("text", ""),
+                    **location_info
+                })
+
+
+def export_to_html(findings_data: Dict[str, Any], output_path: Path, run_id: str):
+    """Export findings to modern, interactive HTML format with charts"""
+    import html
+    from datetime import datetime
+
+    # Helper function to safely escape strings
+    def safe_escape(value):
+        """Safely escape a value, handling None and non-string types"""
+        if value is None:
+            return ""
+        return html.escape(str(value))
+
+    # Detect format (native or SARIF)
+    is_native = "findings" in findings_data and "version" in findings_data
+
+    if is_native:
+        # Native FuzzForge format
+        findings_list = findings_data.get("findings", [])
+        workflow = findings_data.get("workflow", "Security Assessment")
+        summary = findings_data.get("summary", {})
+        total_findings = len(findings_list)
+    else:
+        # SARIF format (backward compatibility)
+        runs = findings_data.get("runs", [])
+        if not runs:
+            # Empty report
+            findings_list = []
+            workflow = "Security Assessment"
+            summary = {}
+            total_findings = 0
+        else:
+            run_data = runs[0]
+            findings_list = run_data.get("results", [])
+            tool = run_data.get("tool", {}).get("driver", {})
+            workflow = tool.get("name", "Security Assessment")
+            total_findings = len(findings_list)
+            summary = {}
+
+    # Calculate statistics
+    severity_counts = {"critical": 0, "high": 0, "medium": 0, "low": 0, "info": 0}
+    confidence_counts = {"high": 0, "medium": 0, "low": 0}
+    category_counts = {}
+    source_counts = {}
+    type_counts = {}
+
+    for finding in findings_list:
+        if is_native:
+            severity = finding.get("severity", "info")
+            confidence = finding.get("confidence", "medium")
+            category = finding.get("category", "other")
+            found_by = finding.get("found_by", {})
+            source = found_by.get("module", "unknown")
+            detection_type = found_by.get("type", "tool")
+        else:
+            # Map SARIF levels to severity
+            level = finding.get("level", "note")
+            severity_map = {"error": "high", "warning": "medium", "note": "low", "none": "info"}
+            severity = severity_map.get(level, "info")
+            confidence = "medium"
+            category = "other"
+            source = "unknown"
+            detection_type = "tool"
+
+        severity_counts[severity] = severity_counts.get(severity, 0) + 1
+        confidence_counts[confidence] = confidence_counts.get(confidence, 0) + 1
+        category_counts[category] = category_counts.get(category, 0) + 1
+        source_counts[source] = source_counts.get(source, 0) + 1
+        type_counts[detection_type] = type_counts.get(detection_type, 0) + 1
+
+    # Prepare chart data
+    severity_data = {k: v for k, v in severity_counts.items() if v > 0}
+    category_data = dict(sorted(category_counts.items(), key=lambda x: x[1], reverse=True)[:10])
+    source_data = dict(sorted(source_counts.items(), key=lambda x: x[1], reverse=True)[:10])
+    type_data = {k: v for k, v in type_counts.items() if v > 0}
+
+    # Generate findings rows
+    findings_rows = ""
+    for idx, finding in enumerate(findings_list):
+        if is_native:
+            finding_id = finding.get("id", "")[:8] if finding.get("id") else ""
+            severity = finding.get("severity", "info")
+            confidence = finding.get("confidence", "medium")
+            title = safe_escape(finding.get("title") or "No title")
+            description = safe_escape(finding.get("description"))
+            rule_id = safe_escape(finding.get("rule_id") or "unknown")
+            category = safe_escape(finding.get("category") or "other")
+
+            found_by = finding.get("found_by") or {}
+            module = safe_escape(found_by.get("module") or "unknown")
+            tool_name = safe_escape(found_by.get("tool_name") or "Unknown")
+            detection_type = found_by.get("type") or "tool"
+
+            location = finding.get("location") or {}
+            file_path = safe_escape(location.get("file"))
+            line_start = location.get("line_start")
+            code_snippet = safe_escape(location.get("snippet"))
+
+            cwe = safe_escape(finding.get("cwe"))
+            owasp = safe_escape(finding.get("owasp"))
+            recommendation = safe_escape(finding.get("recommendation"))
+
+            llm_context = finding.get("llm_context")
+            if llm_context:
+                llm_model = safe_escape(llm_context.get("model"))
+                prompt_text = llm_context.get("prompt", "")
+                if prompt_text:
+                    llm_prompt_preview = safe_escape(prompt_text[:100] + "..." if len(prompt_text) > 100 else prompt_text)
+                else:
+                    llm_prompt_preview = ""
+            else:
+                llm_model = ""
+                llm_prompt_preview = ""
+        else:
+            # SARIF format
+            props = finding.get("properties") or {}
+            finding_id = props.get("findingId", "")[:8] if props.get("findingId") else ""
+            level = finding.get("level", "note")
+            severity_map = {"error": "high", "warning": "medium", "note": "low", "none": "info"}
+            severity = severity_map.get(level, "info")
+            confidence = "medium"
+            rule_id = safe_escape(finding.get("ruleId") or "unknown")
+            message = finding.get("message") or {}
+            title = safe_escape(message.get("text") or "No message")
+            description = title
+            category = "other"
+            module = "unknown"
+            tool_name = "Unknown"
+            detection_type = "tool"
+
+            locations = finding.get("locations", [])
+            if locations:
+                physical_location = locations[0].get("physicalLocation") or {}
+                artifact_location = physical_location.get("artifactLocation") or {}
+                region = physical_location.get("region") or {}
+                file_path = safe_escape(artifact_location.get("uri"))
+                line_start = region.get("startLine")
+                snippet_obj = region.get("snippet") or {}
+                code_snippet = safe_escape(snippet_obj.get("text"))
+            else:
+                file_path = ""
+                line_start = None
+                code_snippet = ""
+
+            cwe = ""
+            owasp = ""
+            recommendation = ""
+            llm_model = ""
+            llm_prompt_preview = ""
+
+        location_str = file_path
+        if line_start:
+            location_str += f":{line_start}"
+
+        severity_badge = {
+            "critical": 'CRITICAL',
+            "high": 'HIGH',
+            "medium": 'MEDIUM',
+            "low": 'LOW',
+            "info": 'INFO'
+        }.get(severity, 'INFO')
+
+        confidence_badge = {
+            "high": 'High',
+            "medium": 'Medium',
+            "low": 'Low'
+        }.get(confidence, 'Medium')
+
+        type_icon = {
+            "llm": "🤖",
+            "tool": "🔧",
+            "fuzzer": "🎯",
+            "manual": "👤"
+        }.get(detection_type, "🔧")
+
+        # Build details HTML
+        details_html = f"""
[Remainder of the hunk: the markup inside the added f-string templates (details_html, findings_rows += f"""...""", and the full page layout) and inside the removed simple template is not present in this excerpt. Recoverable fragments of the new template: a header with {workflow} and "Run ID: {run_id} | Generated: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}"; summary cards for Total Findings, Critical, High, Medium, Low, and Info; a findings table with ID / Severity / Finding / Source / Location columns; and a footer reading "{total_findings} of {total_findings} findings shown. Click on a row to view details." Recoverable fragments of the removed template: "Run ID: {run_id}", "Tool: {tool.get('name', 'Unknown')} v{tool.get('version', 'Unknown')}", "Generated: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}", "Total Issues: {len(results)}", and a table with Rule ID / Severity / Message / Location columns.]
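Reviewer note: all three exporters in this patch branch on the same native-vs-SARIF check (`is_native = "findings" in findings_data and "version" in findings_data`). A minimal sketch of that check factored into a standalone helper; the `detect_findings_format` name and the sample payloads are illustrative only, not part of the patch:

```python
from typing import Any, Dict


def detect_findings_format(findings_data: Dict[str, Any]) -> str:
    """Return 'native' for FuzzForge-native payloads, otherwise 'sarif'.

    Mirrors the is_native check used in the patch: native payloads carry
    top-level "findings" and "version" keys, SARIF payloads carry "runs".
    """
    if "findings" in findings_data and "version" in findings_data:
        return "native"
    return "sarif"


# Illustrative payloads; field shapes are assumed from the accessors in the diff.
native_payload = {
    "version": "1.0.0",
    "workflow": "secret_detection",
    "findings": [{"id": "abc12345", "severity": "high",
                  "location": {"file": "app.py", "line_start": 3}}],
}
sarif_payload = {"runs": [{"tool": {"driver": {"name": "demo"}}, "results": []}]}

assert detect_findings_format(native_payload) == "native"
assert detect_findings_format(sarif_payload) == "sarif"
```

If the detection rule ever changes (for example, a new schema version key), a shared helper like this would keep `extract_simplified_findings`, `export_to_csv`, and `export_to_html` in sync instead of repeating the check in each function.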