refactor: Convert reporter to native format and update backend models

- Renamed sarif_reporter.py to native_reporter.py to reflect new functionality

- Updated WorkflowFindings model to use native format
  - Field name 'sarif' kept for API compatibility but now contains native format
  - Updated docstring to reflect native format usage

- Converted SARIFReporter to Native Reporter:
  - Module name changed from sarif_reporter to native_reporter (v2.0.0)
  - Updated metadata and input/output schemas
  - Removed SARIF-specific config (tool_name, include_code_flows)
  - Added native format config (workflow_name, run_id)

- Implemented native report generation:
  - Added _generate_native_report() method
  - Generates native FuzzForge format with full field support:
    - Unique finding IDs
    - found_by attribution (module, tool, type)
    - LLM context when applicable
    - Full severity scale (critical/high/medium/low/info)
    - Confidence levels
    - CWE and OWASP mappings
    - Enhanced location info (columns, snippets)
    - References and metadata

  - Added _create_native_summary() for aggregated stats
  - Summary includes counts by severity, confidence, category, source, and type
  - Tracks affected files count

- Kept old SARIF generation methods for reference
  - Will be moved to separate SARIF exporter module

Breaking changes:
- Reporter now outputs native format instead of SARIF
- Existing workflows using sarif_reporter will need updates
- Config parameters changed (tool_name -> workflow_name, etc.)
This commit is contained in:
tduhamel42
2025-11-02 14:52:15 +01:00
parent 4aa8187716
commit d327ff06ab
2 changed files with 150 additions and 38 deletions

View File

@@ -19,10 +19,10 @@ from datetime import datetime
class WorkflowFindings(BaseModel):
"""Findings from a workflow execution in SARIF format"""
"""Findings from a workflow execution in native FuzzForge format"""
workflow: str = Field(..., description="Workflow name")
run_id: str = Field(..., description="Unique run identifier")
sarif: Dict[str, Any] = Field(..., description="SARIF formatted findings")
sarif: Dict[str, Any] = Field(..., description="Findings in native FuzzForge format (field name kept for API compatibility)")
metadata: Dict[str, Any] = Field(default_factory=dict, description="Additional metadata")

View File

@@ -1,5 +1,6 @@
"""
SARIF Reporter Module - Generates SARIF-formatted security reports
Native Reporter Module - Generates native FuzzForge format security reports
(Previously SARIF Reporter - now generates native format, SARIF export available separately)
"""
# Copyright (c) 2025 FuzzingLabs
@@ -31,50 +32,46 @@ logger = logging.getLogger(__name__)
class SARIFReporter(BaseModule):
"""
Generates SARIF (Static Analysis Results Interchange Format) reports.
Generates native FuzzForge format security reports.
This module:
- Converts findings to SARIF format
- Converts findings to native FuzzForge format
- Aggregates results from multiple modules
- Adds metadata and context
- Provides actionable recommendations
- (SARIF export available via separate exporter module)
"""
def get_metadata(self) -> ModuleMetadata:
"""Get module metadata"""
return ModuleMetadata(
name="sarif_reporter",
version="1.0.0",
description="Generates SARIF-formatted security reports",
name="native_reporter",
version="2.0.0",
description="Generates native FuzzForge format security reports",
author="FuzzForge Team",
category="reporter",
tags=["reporting", "sarif", "output"],
tags=["reporting", "native", "output"],
input_schema={
"findings": {
"type": "array",
"description": "List of findings to report",
"required": True
},
"tool_name": {
"workflow_name": {
"type": "string",
"description": "Name of the tool",
"description": "Name of the workflow",
"default": "FuzzForge Security Assessment"
},
"tool_version": {
"run_id": {
"type": "string",
"description": "Tool version",
"default": "1.0.0"
},
"include_code_flows": {
"type": "boolean",
"description": "Include code flow information",
"default": False
"description": "Run identifier",
"required": True
}
},
output_schema={
"sarif": {
"native": {
"type": "object",
"description": "SARIF 2.1.0 formatted report"
"description": "Native FuzzForge findings format"
}
},
requires_workspace=False # Reporter doesn't need direct workspace access
@@ -88,22 +85,21 @@ class SARIFReporter(BaseModule):
async def execute(self, config: Dict[str, Any], workspace: Path = None) -> ModuleResult:
"""
Execute the SARIF reporter module.
Execute the native reporter module.
Args:
config: Module configuration with findings
workspace: Optional workspace path for context
Returns:
ModuleResult with SARIF report
ModuleResult with native format report
"""
self.start_timer()
self.validate_config(config)
# Get configuration
tool_name = config.get("tool_name", "FuzzForge Security Assessment")
tool_version = config.get("tool_version", "1.0.0")
include_code_flows = config.get("include_code_flows", False)
workflow_name = config.get("workflow_name", "FuzzForge Security Assessment")
run_id = config.get("run_id", "unknown")
# Collect findings from either direct findings or module results
all_findings = []
@@ -123,16 +119,14 @@ class SARIFReporter(BaseModule):
elif hasattr(module_result, "findings"):
all_findings.extend(module_result.findings)
logger.info(f"Generating SARIF report for {len(all_findings)} findings")
logger.info(f"Generating native format report for {len(all_findings)} findings")
try:
# Generate SARIF report
sarif_report = self._generate_sarif(
# Generate native format report
native_report = self._generate_native_report(
findings=all_findings,
tool_name=tool_name,
tool_version=tool_version,
include_code_flows=include_code_flows,
workspace_path=str(workspace) if workspace else None
workflow_name=workflow_name,
run_id=run_id
)
# Create summary
@@ -146,23 +140,141 @@ class SARIFReporter(BaseModule):
findings=[], # Reporter doesn't generate new findings
summary=summary,
metadata={
"tool_name": tool_name,
"tool_version": tool_version,
"report_format": "SARIF 2.1.0",
"workflow_name": workflow_name,
"run_id": run_id,
"report_format": "Native FuzzForge 1.0.0",
"total_findings": len(all_findings)
},
error=None,
sarif=sarif_report # Add SARIF as custom field
sarif=native_report # Field name kept for API compatibility
)
except Exception as e:
logger.error(f"SARIF reporter failed: {e}")
logger.error(f"Native reporter failed: {e}")
return self.create_result(
findings=[],
status="failed",
error=str(e)
)
def _generate_native_report(
self,
findings: List[ModuleFinding],
workflow_name: str,
run_id: str
) -> Dict[str, Any]:
"""
Generate native FuzzForge format report.
Args:
findings: List of findings to report
workflow_name: Name of the workflow
run_id: Run identifier
Returns:
Native FuzzForge formatted dictionary
"""
# Convert ModuleFinding objects to native format dictionaries
findings_list = []
for finding in findings:
finding_dict = {
"id": finding.id,
"rule_id": finding.rule_id,
"found_by": {
"module": finding.found_by.module,
"tool_name": finding.found_by.tool_name,
"tool_version": finding.found_by.tool_version,
"type": finding.found_by.type
},
"title": finding.title,
"description": finding.description,
"severity": finding.severity,
"confidence": finding.confidence,
"category": finding.category,
"recommendation": finding.recommendation,
"references": finding.references
}
# Add optional fields
if finding.cwe:
finding_dict["cwe"] = finding.cwe
if finding.owasp:
finding_dict["owasp"] = finding.owasp
if finding.llm_context:
finding_dict["llm_context"] = {
"model": finding.llm_context.model,
"prompt": finding.llm_context.prompt,
"temperature": finding.llm_context.temperature
}
# Add location if available
if finding.file_path:
finding_dict["location"] = {
"file": finding.file_path,
"line_start": finding.line_start,
"line_end": finding.line_end,
"column_start": finding.column_start,
"column_end": finding.column_end,
"snippet": finding.code_snippet
}
finding_dict["metadata"] = finding.metadata
findings_list.append(finding_dict)
# Create summary
from datetime import datetime
summary = self._create_native_summary(findings)
# Build native format structure
native_report = {
"version": "1.0.0",
"run_id": run_id,
"workflow": workflow_name,
"timestamp": datetime.utcnow().isoformat() + "Z",
"findings": findings_list,
"summary": summary
}
return native_report
def _create_native_summary(self, findings: List[ModuleFinding]) -> Dict[str, Any]:
"""Create summary for native format"""
summary = {
"total_findings": len(findings),
"by_severity": {},
"by_confidence": {},
"by_category": {},
"by_source": {},
"by_type": {},
"affected_files": 0
}
affected_files = set()
for finding in findings:
# Count by severity
summary["by_severity"][finding.severity] = summary["by_severity"].get(finding.severity, 0) + 1
# Count by confidence
summary["by_confidence"][finding.confidence] = summary["by_confidence"].get(finding.confidence, 0) + 1
# Count by category
summary["by_category"][finding.category] = summary["by_category"].get(finding.category, 0) + 1
# Count by source (module)
summary["by_source"][finding.found_by.module] = summary["by_source"].get(finding.found_by.module, 0) + 1
# Count by type
summary["by_type"][finding.found_by.type] = summary["by_type"].get(finding.found_by.type, 0) + 1
# Track affected files
if finding.file_path:
affected_files.add(finding.file_path)
summary["affected_files"] = len(affected_files)
return summary
# Keep old SARIF methods for reference/future SARIF export module
def _generate_sarif(
self,
findings: List[ModuleFinding],