From 268aae37adf893b21b579b347c208182b519b3fa Mon Sep 17 00:00:00 2001 From: tduhamel42 Date: Sun, 2 Nov 2025 14:28:32 +0100 Subject: [PATCH] feat: Add native findings format and fix critical ID bug Priority 1 implementation: - Created native FuzzForge findings format schema with full support for: - 5-level severity (critical/high/medium/low/info) - Confidence levels - CWE and OWASP categorization - found_by attribution (module, tool, type) - LLM context tracking (model, prompt, temperature) - Updated ModuleFinding model with new fields: - Added rule_id for pattern identification - Added found_by for detection attribution - Added llm_context for LLM-detected findings - Added confidence, cwe, owasp, references - Added column_start/end for precise location - Updated create_finding() helper with new required fields - Enhanced _generate_summary() with confidence and source tracking - Fixed critical ID bug in CLI: - Changed 'ff finding show' to use --id (unique) instead of --rule - Added new show_findings_by_rule() function to show ALL findings matching a rule - Updated display_finding_detail() to support both native and SARIF formats - Now properly handles multiple findings with same rule_id Breaking changes: - create_finding() now requires rule_id and found_by parameters - show_finding() now uses --id instead of --rule flag --- backend/src/models/finding_schema.py | 166 +++++++++++++ backend/toolbox/modules/base.py | 95 +++++++- cli/src/fuzzforge_cli/commands/findings.py | 259 ++++++++++++++++----- 3 files changed, 458 insertions(+), 62 deletions(-) create mode 100644 backend/src/models/finding_schema.py diff --git a/backend/src/models/finding_schema.py b/backend/src/models/finding_schema.py new file mode 100644 index 0000000..a58b829 --- /dev/null +++ b/backend/src/models/finding_schema.py @@ -0,0 +1,166 @@ +""" +FuzzForge Native Finding Format Schema + +This module defines the native finding format used internally by FuzzForge. +This format is more expressive than SARIF and optimized for security testing workflows. +""" + +# Copyright (c) 2025 FuzzingLabs +# +# Licensed under the Business Source License 1.1 (BSL). See the LICENSE file +# at the root of this repository for details. +# +# After the Change Date (four years from publication), this version of the +# Licensed Work will be made available under the Apache License, Version 2.0. +# See the LICENSE-APACHE file or http://www.apache.org/licenses/LICENSE-2.0 +# +# Additional attribution and requirements are provided in the NOTICE file. + +from typing import Dict, Any, List, Optional, Literal +from pydantic import BaseModel, Field +from datetime import datetime + + +class FoundBy(BaseModel): + """Information about who/what found the vulnerability""" + module: str = Field(..., description="FuzzForge module that detected the finding (e.g., 'semgrep_scanner', 'llm_analysis')") + tool_name: str = Field(..., description="Name of the underlying tool (e.g., 'Semgrep', 'Claude-3.5-Sonnet', 'MobSF')") + tool_version: str = Field(..., description="Version of the tool") + type: Literal["llm", "tool", "fuzzer", "manual"] = Field(..., description="Type of detection method") + + +class LLMContext(BaseModel): + """Context information for LLM-detected findings""" + model: str = Field(..., description="LLM model used (e.g., 'claude-3-5-sonnet-20250129')") + prompt: str = Field(..., description="Prompt or analysis instructions used") + temperature: Optional[float] = Field(None, description="Temperature parameter used for generation") + + +class Location(BaseModel): + """Location information for a finding""" + file: str = Field(..., description="File path relative to workspace root") + line_start: Optional[int] = Field(None, description="Starting line number (1-indexed)") + line_end: Optional[int] = Field(None, description="Ending line number (1-indexed)") + column_start: Optional[int] = Field(None, description="Starting column number (1-indexed)") + column_end: Optional[int] = Field(None, description="Ending column number (1-indexed)") + snippet: Optional[str] = Field(None, description="Code snippet at the location") + + +class Finding(BaseModel): + """Individual security finding""" + id: str = Field(..., description="Unique finding identifier (UUID)") + rule_id: str = Field(..., description="Rule/pattern identifier (e.g., 'sql_injection', 'hardcoded_secret')") + found_by: FoundBy = Field(..., description="Detection attribution") + llm_context: Optional[LLMContext] = Field(None, description="LLM-specific context (only if found_by.type == 'llm')") + + title: str = Field(..., description="Short finding title") + description: str = Field(..., description="Detailed description of the finding") + + severity: Literal["critical", "high", "medium", "low", "info"] = Field(..., description="Severity level") + confidence: Literal["high", "medium", "low"] = Field(..., description="Confidence level in the finding") + + category: str = Field(..., description="Finding category (e.g., 'injection', 'authentication', 'cryptography')") + cwe: Optional[str] = Field(None, description="CWE identifier (e.g., 'CWE-89')") + owasp: Optional[str] = Field(None, description="OWASP category (e.g., 'A03:2021-Injection')") + + location: Optional[Location] = Field(None, description="Location of the finding in source code") + + recommendation: Optional[str] = Field(None, description="Remediation recommendation") + references: List[str] = Field(default_factory=list, description="External references (URLs, documentation)") + + metadata: Dict[str, Any] = Field(default_factory=dict, description="Additional metadata") + + +class FindingsSummary(BaseModel): + """Summary statistics for findings""" + total_findings: int = Field(..., description="Total number of findings") + by_severity: Dict[str, int] = Field(default_factory=dict, description="Count by severity level") + by_confidence: Dict[str, int] = Field(default_factory=dict, description="Count by confidence level") + by_category: Dict[str, int] = Field(default_factory=dict, description="Count by category") + by_source: Dict[str, int] = Field(default_factory=dict, description="Count by detection source (module name)") + by_type: Dict[str, int] = Field(default_factory=dict, description="Count by detection type (llm/tool/fuzzer)") + affected_files: int = Field(0, description="Number of unique files with findings") + + +class FuzzForgeFindingsReport(BaseModel): + """Native FuzzForge findings report format""" + version: str = Field(default="1.0.0", description="Format version") + run_id: str = Field(..., description="Workflow run identifier") + workflow: str = Field(..., description="Workflow name") + timestamp: datetime = Field(default_factory=datetime.utcnow, description="Report generation timestamp") + + findings: List[Finding] = Field(default_factory=list, description="List of security findings") + summary: FindingsSummary = Field(..., description="Summary statistics") + + metadata: Dict[str, Any] = Field(default_factory=dict, description="Additional report metadata") + + +# JSON Schema export for documentation +FINDING_SCHEMA_VERSION = "1.0.0" + +def get_json_schema() -> Dict[str, Any]: + """Get JSON schema for the FuzzForge findings format""" + return FuzzForgeFindingsReport.model_json_schema() + + +def validate_findings_report(data: Dict[str, Any]) -> FuzzForgeFindingsReport: + """ + Validate a findings report against the schema + + Args: + data: Dictionary containing findings report data + + Returns: + Validated FuzzForgeFindingsReport object + + Raises: + ValidationError: If data doesn't match schema + """ + return FuzzForgeFindingsReport(**data) + + +def create_summary(findings: List[Finding]) -> FindingsSummary: + """ + Generate summary statistics from a list of findings + + Args: + findings: List of Finding objects + + Returns: + FindingsSummary with aggregated statistics + """ + summary = FindingsSummary( + total_findings=len(findings), + by_severity={}, + by_confidence={}, + by_category={}, + by_source={}, + by_type={}, + affected_files=0 + ) + + affected_files = set() + + for finding in findings: + # Count by severity + summary.by_severity[finding.severity] = summary.by_severity.get(finding.severity, 0) + 1 + + # Count by confidence + summary.by_confidence[finding.confidence] = summary.by_confidence.get(finding.confidence, 0) + 1 + + # Count by category + summary.by_category[finding.category] = summary.by_category.get(finding.category, 0) + 1 + + # Count by source (module) + summary.by_source[finding.found_by.module] = summary.by_source.get(finding.found_by.module, 0) + 1 + + # Count by type + summary.by_type[finding.found_by.type] = summary.by_type.get(finding.found_by.type, 0) + 1 + + # Track affected files + if finding.location and finding.location.file: + affected_files.add(finding.location.file) + + summary.affected_files = len(affected_files) + + return summary diff --git a/backend/toolbox/modules/base.py b/backend/toolbox/modules/base.py index dcef98d..2990ede 100644 --- a/backend/toolbox/modules/base.py +++ b/backend/toolbox/modules/base.py @@ -35,18 +35,48 @@ class ModuleMetadata(BaseModel): requires_workspace: bool = Field(True, description="Whether module requires workspace access") +class FoundBy(BaseModel): + """Information about who/what found the vulnerability""" + module: str = Field(..., description="FuzzForge module that detected the finding") + tool_name: str = Field(..., description="Name of the underlying tool") + tool_version: str = Field(..., description="Version of the tool") + type: str = Field(..., description="Type of detection method (llm, tool, fuzzer, manual)") + + +class LLMContext(BaseModel): + """Context information for LLM-detected findings""" + model: str = Field(..., description="LLM model used") + prompt: str = Field(..., description="Prompt or analysis instructions used") + temperature: Optional[float] = Field(None, description="Temperature parameter used for generation") + + class ModuleFinding(BaseModel): """Individual finding from a module""" - id: str = Field(..., description="Unique finding ID") + id: str = Field(..., description="Unique finding ID (UUID)") + rule_id: str = Field(..., description="Rule/pattern identifier") + found_by: FoundBy = Field(..., description="Detection attribution") + llm_context: Optional[LLMContext] = Field(None, description="LLM-specific context") + title: str = Field(..., description="Finding title") description: str = Field(..., description="Detailed description") - severity: str = Field(..., description="Severity level (info, low, medium, high, critical)") + + severity: str = Field(..., description="Severity level (critical, high, medium, low, info)") + confidence: str = Field(default="medium", description="Confidence level (high, medium, low)") + category: str = Field(..., description="Finding category") + cwe: Optional[str] = Field(None, description="CWE identifier (e.g., 'CWE-89')") + owasp: Optional[str] = Field(None, description="OWASP category") + file_path: Optional[str] = Field(None, description="Affected file path relative to workspace") line_start: Optional[int] = Field(None, description="Starting line number") line_end: Optional[int] = Field(None, description="Ending line number") + column_start: Optional[int] = Field(None, description="Starting column number") + column_end: Optional[int] = Field(None, description="Ending column number") code_snippet: Optional[str] = Field(None, description="Relevant code snippet") + recommendation: Optional[str] = Field(None, description="Remediation recommendation") + references: List[str] = Field(default_factory=list, description="External references") + metadata: Dict[str, Any] = Field(default_factory=dict, description="Additional metadata") @@ -140,20 +170,32 @@ class BaseModule(ABC): def create_finding( self, + rule_id: str, title: str, description: str, severity: str, category: str, + found_by: FoundBy, + confidence: str = "medium", + llm_context: Optional[LLMContext] = None, + cwe: Optional[str] = None, + owasp: Optional[str] = None, **kwargs ) -> ModuleFinding: """ Helper method to create a standardized finding. Args: + rule_id: Rule/pattern identifier title: Finding title description: Detailed description - severity: Severity level + severity: Severity level (critical, high, medium, low, info) category: Finding category + found_by: Detection attribution (FoundBy object) + confidence: Confidence level (high, medium, low) + llm_context: Optional LLM context information + cwe: Optional CWE identifier + owasp: Optional OWASP category **kwargs: Additional finding fields Returns: @@ -164,10 +206,16 @@ class BaseModule(ABC): return ModuleFinding( id=finding_id, + rule_id=rule_id, + found_by=found_by, + llm_context=llm_context, title=title, description=description, severity=severity, + confidence=confidence, category=category, + cwe=cwe, + owasp=owasp, **kwargs ) @@ -226,29 +274,62 @@ class BaseModule(ABC): Summary dictionary """ severity_counts = { - "info": 0, - "low": 0, - "medium": 0, + "critical": 0, "high": 0, - "critical": 0 + "medium": 0, + "low": 0, + "info": 0 + } + + confidence_counts = { + "high": 0, + "medium": 0, + "low": 0 } category_counts = {} + source_counts = {} + type_counts = {} + affected_files = set() for finding in findings: # Count by severity if finding.severity in severity_counts: severity_counts[finding.severity] += 1 + # Count by confidence + if finding.confidence in confidence_counts: + confidence_counts[finding.confidence] += 1 + # Count by category if finding.category not in category_counts: category_counts[finding.category] = 0 category_counts[finding.category] += 1 + # Count by source (module) + module = finding.found_by.module + if module not in source_counts: + source_counts[module] = 0 + source_counts[module] += 1 + + # Count by type + detection_type = finding.found_by.type + if detection_type not in type_counts: + type_counts[detection_type] = 0 + type_counts[detection_type] += 1 + + # Track affected files + if finding.file_path: + affected_files.add(finding.file_path) + return { "total_findings": len(findings), "severity_counts": severity_counts, + "confidence_counts": confidence_counts, "category_counts": category_counts, + "source_counts": source_counts, + "type_counts": type_counts, + "affected_files": len(affected_files), "highest_severity": self._get_highest_severity(findings) } diff --git a/cli/src/fuzzforge_cli/commands/findings.py b/cli/src/fuzzforge_cli/commands/findings.py index 7058527..b556415 100644 --- a/cli/src/fuzzforge_cli/commands/findings.py +++ b/cli/src/fuzzforge_cli/commands/findings.py @@ -152,12 +152,13 @@ def get_findings( def show_finding( run_id: str = typer.Argument(..., help="Run ID to get finding from"), - rule_id: str = typer.Option(..., "--rule", "-r", help="Rule ID of the specific finding to show") + finding_id: str = typer.Option(..., "--id", "-i", help="Unique ID of the specific finding to show") ): """ šŸ” Show detailed information about a specific finding This function is registered as a command in main.py under the finding (singular) command group. + Use the unique finding ID (shown in the findings table) to view details. """ try: require_project() @@ -173,91 +174,239 @@ def show_finding( with get_client() as client: console.print(f"šŸ” Fetching findings for run: {run_id}") findings = client.get_run_findings(run_id) - sarif_data = findings.sarif + findings_dict = findings.sarif # Will become native format else: - sarif_data = findings_data.sarif_data + findings_dict = findings_data.sarif_data # Will become findings_data - # Find the specific finding by rule_id - runs = sarif_data.get("runs", []) - if not runs: - console.print("āŒ No findings data available", style="red") - raise typer.Exit(1) - - run_data = runs[0] - results = run_data.get("results", []) - tool = run_data.get("tool", {}).get("driver", {}) - - # Search for matching finding + # Find the specific finding by unique ID + # For now, support both SARIF (old) and native format (new) matching_finding = None - for result in results: - if result.get("ruleId") == rule_id: - matching_finding = result - break + + # Try native format first + if "findings" in findings_dict: + for finding in findings_dict.get("findings", []): + if finding.get("id") == finding_id or finding.get("id", "").startswith(finding_id): + matching_finding = finding + break + # Fallback to SARIF format (for backward compatibility during transition) + elif "runs" in findings_dict: + runs = findings_dict.get("runs", []) + if runs: + run_data = runs[0] + results = run_data.get("results", []) + for result in results: + # Check if finding ID is in properties + props = result.get("properties", {}) + fid = props.get("findingId", "") + if fid == finding_id or fid.startswith(finding_id): + matching_finding = result + break if not matching_finding: - console.print(f"āŒ No finding found with rule ID: {rule_id}", style="red") + console.print(f"āŒ No finding found with ID: {finding_id}", style="red") console.print(f"šŸ’” Use [bold cyan]ff findings get {run_id}[/bold cyan] to see all findings", style="dim") raise typer.Exit(1) # Display detailed finding - display_finding_detail(matching_finding, tool, run_id) + display_finding_detail(matching_finding, run_id) except Exception as e: console.print(f"āŒ Failed to get finding: {e}", style="red") raise typer.Exit(1) -def display_finding_detail(finding: Dict[str, Any], tool: Dict[str, Any], run_id: str): - """Display detailed information about a single finding""" - rule_id = finding.get("ruleId", "unknown") - level = finding.get("level", "note") - message = finding.get("message", {}) - message_text = message.get("text", "No summary available") - message_markdown = message.get("markdown", message_text) +def show_findings_by_rule( + run_id: str = typer.Argument(..., help="Run ID to get findings from"), + rule_id: str = typer.Option(..., "--rule", "-r", help="Rule ID to filter findings") +): + """ + šŸ” Show all findings matching a specific rule - # Get location - locations = finding.get("locations", []) - location_str = "Unknown location" - code_snippet = None + This command shows ALL findings that match the given rule ID. + Useful when you have multiple instances of the same vulnerability type. + """ + try: + require_project() + validate_run_id(run_id) - if locations: - physical_location = locations[0].get("physicalLocation", {}) - artifact_location = physical_location.get("artifactLocation", {}) - region = physical_location.get("region", {}) + # Try to get from database first, fallback to API + db = get_project_db() + findings_data = None + if db: + findings_data = db.get_findings(run_id) - file_path = artifact_location.get("uri", "") - if file_path: - location_str = file_path - if region.get("startLine"): - location_str += f":{region['startLine']}" - if region.get("startColumn"): - location_str += f":{region['startColumn']}" + if not findings_data: + with get_client() as client: + console.print(f"šŸ” Fetching findings for run: {run_id}") + findings = client.get_run_findings(run_id) + findings_dict = findings.sarif + else: + findings_dict = findings_data.sarif_data - # Get code snippet if available - if region.get("snippet", {}).get("text"): - code_snippet = region["snippet"]["text"].strip() + # Find all findings matching the rule + matching_findings = [] + + # Try native format first + if "findings" in findings_dict: + for finding in findings_dict.get("findings", []): + if finding.get("rule_id") == rule_id: + matching_findings.append(finding) + # Fallback to SARIF format + elif "runs" in findings_dict: + runs = findings_dict.get("runs", []) + if runs: + run_data = runs[0] + results = run_data.get("results", []) + for result in results: + if result.get("ruleId") == rule_id: + matching_findings.append(result) + + if not matching_findings: + console.print(f"āŒ No findings found with rule ID: {rule_id}", style="red") + console.print(f"šŸ’” Use [bold cyan]ff findings get {run_id}[/bold cyan] to see all findings", style="dim") + raise typer.Exit(1) + + console.print(f"\nšŸ” Found {len(matching_findings)} finding(s) matching rule: [bold cyan]{rule_id}[/bold cyan]\n") + + # Display each finding + for i, finding in enumerate(matching_findings, 1): + console.print(f"[bold]Finding {i} of {len(matching_findings)}[/bold]") + display_finding_detail(finding, run_id) + if i < len(matching_findings): + console.print("\n" + "─" * 80 + "\n") + + except Exception as e: + console.print(f"āŒ Failed to get findings: {e}", style="red") + raise typer.Exit(1) + + +def display_finding_detail(finding: Dict[str, Any], run_id: str): + """Display detailed information about a single finding (supports both native and SARIF format)""" + + # Detect format and extract fields + is_native = "rule_id" in finding # Native format has rule_id, SARIF has ruleId + + if is_native: + # Native FuzzForge format + finding_id = finding.get("id", "unknown") + rule_id = finding.get("rule_id", "unknown") + title = finding.get("title", "No title") + description = finding.get("description", "No description") + severity = finding.get("severity", "info") + confidence = finding.get("confidence", "medium") + category = finding.get("category", "unknown") + cwe = finding.get("cwe") + owasp = finding.get("owasp") + recommendation = finding.get("recommendation") + + # Found by information + found_by = finding.get("found_by", {}) + module = found_by.get("module", "unknown") + tool_name = found_by.get("tool_name", "Unknown") + tool_version = found_by.get("tool_version", "unknown") + detection_type = found_by.get("type", "unknown") + + # LLM context if available + llm_context = finding.get("llm_context") + + # Location + location = finding.get("location", {}) + file_path = location.get("file", "") + line_start = location.get("line_start") + column_start = location.get("column_start") + code_snippet = location.get("snippet") + + location_str = file_path if file_path else "Unknown location" + if line_start: + location_str += f":{line_start}" + if column_start: + location_str += f":{column_start}" + + else: + # SARIF format (backward compatibility) + props = finding.get("properties", {}) + finding_id = props.get("findingId", "unknown") + rule_id = finding.get("ruleId", "unknown") + title = props.get("title", "No title") + severity = finding.get("level", "note") + confidence = "medium" # Not available in SARIF + category = "unknown" + cwe = None + owasp = None + + message = finding.get("message", {}) + description = message.get("text", "No description") + recommendation = None + + module = "unknown" + tool_name = "Unknown" + tool_version = "unknown" + detection_type = "tool" + llm_context = None + + # Location from SARIF + locations = finding.get("locations", []) + location_str = "Unknown location" + code_snippet = None + + if locations: + physical_location = locations[0].get("physicalLocation", {}) + artifact_location = physical_location.get("artifactLocation", {}) + region = physical_location.get("region", {}) + + file_path = artifact_location.get("uri", "") + if file_path: + location_str = file_path + if region.get("startLine"): + location_str += f":{region['startLine']}" + if region.get("startColumn"): + location_str += f":{region['startColumn']}" + + if region.get("snippet", {}).get("text"): + code_snippet = region["snippet"]["text"].strip() # Get severity style severity_color = { + "critical": "red", + "high": "red", + "medium": "yellow", + "low": "blue", + "info": "cyan", + # SARIF levels "error": "red", "warning": "yellow", - "note": "blue", - "info": "cyan" - }.get(level.lower(), "white") + "note": "blue" + }.get(severity.lower(), "white") # Build detailed content content_lines = [] + content_lines.append(f"[bold]Finding ID:[/bold] {finding_id}") content_lines.append(f"[bold]Rule ID:[/bold] {rule_id}") - content_lines.append(f"[bold]Severity:[/bold] [{severity_color}]{level.upper()}[/{severity_color}]") + content_lines.append(f"[bold]Title:[/bold] {title}") + content_lines.append(f"[bold]Severity:[/bold] [{severity_color}]{severity.upper()}[/{severity_color}] (Confidence: {confidence})") + + if cwe: + content_lines.append(f"[bold]CWE:[/bold] {cwe}") + if owasp: + content_lines.append(f"[bold]OWASP:[/bold] {owasp}") + + content_lines.append(f"[bold]Category:[/bold] {category}") content_lines.append(f"[bold]Location:[/bold] {location_str}") - content_lines.append(f"[bold]Tool:[/bold] {tool.get('name', 'Unknown')} v{tool.get('version', 'unknown')}") + content_lines.append(f"[bold]Found by:[/bold] {tool_name} v{tool_version} ({module}) [{detection_type}]") + + if llm_context: + model = llm_context.get("model", "unknown") + content_lines.append(f"[bold]LLM Model:[/bold] {model}") + content_lines.append(f"[bold]Run ID:[/bold] {run_id}") content_lines.append("") - content_lines.append("[bold]Summary:[/bold]") - content_lines.append(message_text) - content_lines.append("") content_lines.append("[bold]Description:[/bold]") - content_lines.append(message_markdown) + content_lines.append(description) + + if recommendation: + content_lines.append("") + content_lines.append("[bold]Recommendation:[/bold]") + content_lines.append(recommendation) if code_snippet: content_lines.append("") @@ -276,7 +425,7 @@ def display_finding_detail(finding: Dict[str, Any], tool: Dict[str, Any], run_id padding=(1, 2) )) console.print() - console.print(f"šŸ’” Export this run: [bold cyan]ff findings export {run_id} --format sarif[/bold cyan]") + console.print(f"šŸ’” Export this run: [bold cyan]ff findings export {run_id} --format native[/bold cyan]") def display_findings_table(sarif_data: Dict[str, Any]):