From f360bf424f2115b0e10e3a0aa205043924b7827e Mon Sep 17 00:00:00 2001 From: tduhamel42 Date: Sun, 2 Nov 2025 14:52:15 +0100 Subject: [PATCH] refactor: Convert reporter to native format and update backend models - Renamed sarif_reporter.py to native_reporter.py to reflect new functionality - Updated WorkflowFindings model to use native format - Field name 'sarif' kept for API compatibility but now contains native format - Updated docstring to reflect native format usage - Converted SARIFReporter to Native Reporter: - Module name changed from sarif_reporter to native_reporter (v2.0.0) - Updated metadata and input/output schemas - Removed SARIF-specific config (tool_name, include_code_flows) - Added native format config (workflow_name, run_id) - Implemented native report generation: - Added _generate_native_report() method - Generates native FuzzForge format with full field support: - Unique finding IDs - found_by attribution (module, tool, type) - LLM context when applicable - Full severity scale (critical/high/medium/low/info) - Confidence levels - CWE and OWASP mappings - Enhanced location info (columns, snippets) - References and metadata - Added _create_native_summary() for aggregated stats - Summary includes counts by severity, confidence, category, source, and type - Tracks affected files count - Kept old SARIF generation methods for reference - Will be moved to separate SARIF exporter module Breaking changes: - Reporter now outputs native format instead of SARIF - Existing workflows using sarif_reporter will need updates - Config parameters changed (tool_name -> workflow_name, etc.) --- backend/src/models/findings.py | 4 +- .../{sarif_reporter.py => native_reporter.py} | 184 ++++++++++++++---- 2 files changed, 150 insertions(+), 38 deletions(-) rename backend/toolbox/modules/reporter/{sarif_reporter.py => native_reporter.py} (66%) diff --git a/backend/src/models/findings.py b/backend/src/models/findings.py index b71a9b6..0eaea6b 100644 --- a/backend/src/models/findings.py +++ b/backend/src/models/findings.py @@ -19,10 +19,10 @@ from datetime import datetime class WorkflowFindings(BaseModel): - """Findings from a workflow execution in SARIF format""" + """Findings from a workflow execution in native FuzzForge format""" workflow: str = Field(..., description="Workflow name") run_id: str = Field(..., description="Unique run identifier") - sarif: Dict[str, Any] = Field(..., description="SARIF formatted findings") + sarif: Dict[str, Any] = Field(..., description="Findings in native FuzzForge format (field name kept for API compatibility)") metadata: Dict[str, Any] = Field(default_factory=dict, description="Additional metadata") diff --git a/backend/toolbox/modules/reporter/sarif_reporter.py b/backend/toolbox/modules/reporter/native_reporter.py similarity index 66% rename from backend/toolbox/modules/reporter/sarif_reporter.py rename to backend/toolbox/modules/reporter/native_reporter.py index 2a8bec7..eb799f8 100644 --- a/backend/toolbox/modules/reporter/sarif_reporter.py +++ b/backend/toolbox/modules/reporter/native_reporter.py @@ -1,5 +1,6 @@ """ -SARIF Reporter Module - Generates SARIF-formatted security reports +Native Reporter Module - Generates native FuzzForge format security reports +(Previously SARIF Reporter - now generates native format, SARIF export available separately) """ # Copyright (c) 2025 FuzzingLabs @@ -31,50 +32,46 @@ logger = logging.getLogger(__name__) class SARIFReporter(BaseModule): """ - Generates SARIF (Static Analysis Results Interchange Format) reports. + Generates native FuzzForge format security reports. This module: - - Converts findings to SARIF format + - Converts findings to native FuzzForge format - Aggregates results from multiple modules - Adds metadata and context - Provides actionable recommendations + - (SARIF export available via separate exporter module) """ def get_metadata(self) -> ModuleMetadata: """Get module metadata""" return ModuleMetadata( - name="sarif_reporter", - version="1.0.0", - description="Generates SARIF-formatted security reports", + name="native_reporter", + version="2.0.0", + description="Generates native FuzzForge format security reports", author="FuzzForge Team", category="reporter", - tags=["reporting", "sarif", "output"], + tags=["reporting", "native", "output"], input_schema={ "findings": { "type": "array", "description": "List of findings to report", "required": True }, - "tool_name": { + "workflow_name": { "type": "string", - "description": "Name of the tool", + "description": "Name of the workflow", "default": "FuzzForge Security Assessment" }, - "tool_version": { + "run_id": { "type": "string", - "description": "Tool version", - "default": "1.0.0" - }, - "include_code_flows": { - "type": "boolean", - "description": "Include code flow information", - "default": False + "description": "Run identifier", + "required": True } }, output_schema={ - "sarif": { + "native": { "type": "object", - "description": "SARIF 2.1.0 formatted report" + "description": "Native FuzzForge findings format" } }, requires_workspace=False # Reporter doesn't need direct workspace access @@ -88,22 +85,21 @@ class SARIFReporter(BaseModule): async def execute(self, config: Dict[str, Any], workspace: Path = None) -> ModuleResult: """ - Execute the SARIF reporter module. + Execute the native reporter module. Args: config: Module configuration with findings workspace: Optional workspace path for context Returns: - ModuleResult with SARIF report + ModuleResult with native format report """ self.start_timer() self.validate_config(config) # Get configuration - tool_name = config.get("tool_name", "FuzzForge Security Assessment") - tool_version = config.get("tool_version", "1.0.0") - include_code_flows = config.get("include_code_flows", False) + workflow_name = config.get("workflow_name", "FuzzForge Security Assessment") + run_id = config.get("run_id", "unknown") # Collect findings from either direct findings or module results all_findings = [] @@ -123,16 +119,14 @@ class SARIFReporter(BaseModule): elif hasattr(module_result, "findings"): all_findings.extend(module_result.findings) - logger.info(f"Generating SARIF report for {len(all_findings)} findings") + logger.info(f"Generating native format report for {len(all_findings)} findings") try: - # Generate SARIF report - sarif_report = self._generate_sarif( + # Generate native format report + native_report = self._generate_native_report( findings=all_findings, - tool_name=tool_name, - tool_version=tool_version, - include_code_flows=include_code_flows, - workspace_path=str(workspace) if workspace else None + workflow_name=workflow_name, + run_id=run_id ) # Create summary @@ -146,23 +140,141 @@ class SARIFReporter(BaseModule): findings=[], # Reporter doesn't generate new findings summary=summary, metadata={ - "tool_name": tool_name, - "tool_version": tool_version, - "report_format": "SARIF 2.1.0", + "workflow_name": workflow_name, + "run_id": run_id, + "report_format": "Native FuzzForge 1.0.0", "total_findings": len(all_findings) }, error=None, - sarif=sarif_report # Add SARIF as custom field + sarif=native_report # Field name kept for API compatibility ) except Exception as e: - logger.error(f"SARIF reporter failed: {e}") + logger.error(f"Native reporter failed: {e}") return self.create_result( findings=[], status="failed", error=str(e) ) + def _generate_native_report( + self, + findings: List[ModuleFinding], + workflow_name: str, + run_id: str + ) -> Dict[str, Any]: + """ + Generate native FuzzForge format report. + + Args: + findings: List of findings to report + workflow_name: Name of the workflow + run_id: Run identifier + + Returns: + Native FuzzForge formatted dictionary + """ + # Convert ModuleFinding objects to native format dictionaries + findings_list = [] + for finding in findings: + finding_dict = { + "id": finding.id, + "rule_id": finding.rule_id, + "found_by": { + "module": finding.found_by.module, + "tool_name": finding.found_by.tool_name, + "tool_version": finding.found_by.tool_version, + "type": finding.found_by.type + }, + "title": finding.title, + "description": finding.description, + "severity": finding.severity, + "confidence": finding.confidence, + "category": finding.category, + "recommendation": finding.recommendation, + "references": finding.references + } + + # Add optional fields + if finding.cwe: + finding_dict["cwe"] = finding.cwe + if finding.owasp: + finding_dict["owasp"] = finding.owasp + if finding.llm_context: + finding_dict["llm_context"] = { + "model": finding.llm_context.model, + "prompt": finding.llm_context.prompt, + "temperature": finding.llm_context.temperature + } + + # Add location if available + if finding.file_path: + finding_dict["location"] = { + "file": finding.file_path, + "line_start": finding.line_start, + "line_end": finding.line_end, + "column_start": finding.column_start, + "column_end": finding.column_end, + "snippet": finding.code_snippet + } + + finding_dict["metadata"] = finding.metadata + findings_list.append(finding_dict) + + # Create summary + from datetime import datetime + summary = self._create_native_summary(findings) + + # Build native format structure + native_report = { + "version": "1.0.0", + "run_id": run_id, + "workflow": workflow_name, + "timestamp": datetime.utcnow().isoformat() + "Z", + "findings": findings_list, + "summary": summary + } + + return native_report + + def _create_native_summary(self, findings: List[ModuleFinding]) -> Dict[str, Any]: + """Create summary for native format""" + summary = { + "total_findings": len(findings), + "by_severity": {}, + "by_confidence": {}, + "by_category": {}, + "by_source": {}, + "by_type": {}, + "affected_files": 0 + } + + affected_files = set() + + for finding in findings: + # Count by severity + summary["by_severity"][finding.severity] = summary["by_severity"].get(finding.severity, 0) + 1 + + # Count by confidence + summary["by_confidence"][finding.confidence] = summary["by_confidence"].get(finding.confidence, 0) + 1 + + # Count by category + summary["by_category"][finding.category] = summary["by_category"].get(finding.category, 0) + 1 + + # Count by source (module) + summary["by_source"][finding.found_by.module] = summary["by_source"].get(finding.found_by.module, 0) + 1 + + # Count by type + summary["by_type"][finding.found_by.type] = summary["by_type"].get(finding.found_by.type, 0) + 1 + + # Track affected files + if finding.file_path: + affected_files.add(finding.file_path) + + summary["affected_files"] = len(affected_files) + return summary + + # Keep old SARIF methods for reference/future SARIF export module def _generate_sarif( self, findings: List[ModuleFinding],