Files
fuzzforge_ai/backend/toolbox/modules/base.py
tduhamel42 268aae37ad feat: Add native findings format and fix critical ID bug
Priority 1 implementation:
- Created native FuzzForge findings format schema with full support for:
  - 5-level severity (critical/high/medium/low/info)
  - Confidence levels
  - CWE and OWASP categorization
  - found_by attribution (module, tool, type)
  - LLM context tracking (model, prompt, temperature)

- Updated ModuleFinding model with new fields:
  - Added rule_id for pattern identification
  - Added found_by for detection attribution
  - Added llm_context for LLM-detected findings
  - Added confidence, cwe, owasp, references
  - Added column_start/end for precise location
  - Updated create_finding() helper with new required fields
  - Enhanced _generate_summary() with confidence and source tracking

- Fixed critical ID bug in CLI:
  - Changed 'ff finding show' to use --id (unique) instead of --rule
  - Added new show_findings_by_rule() function to show ALL findings matching a rule
  - Updated display_finding_detail() to support both native and SARIF formats
  - Now properly handles multiple findings with same rule_id

Breaking changes:
- create_finding() now requires rule_id and found_by parameters
- show_finding() now uses --id instead of --rule flag
2025-11-14 10:51:38 +01:00

352 lines
12 KiB
Python

"""
Base module interface for all FuzzForge modules
"""
# Copyright (c) 2025 FuzzingLabs
#
# Licensed under the Business Source License 1.1 (BSL). See the LICENSE file
# at the root of this repository for details.
#
# After the Change Date (four years from publication), this version of the
# Licensed Work will be made available under the Apache License, Version 2.0.
# See the LICENSE-APACHE file or http://www.apache.org/licenses/LICENSE-2.0
#
# Additional attribution and requirements are provided in the NOTICE file.
from abc import ABC, abstractmethod
from pathlib import Path
from typing import Dict, Any, List, Optional
from pydantic import BaseModel, Field
import logging
logger = logging.getLogger(__name__)
class ModuleMetadata(BaseModel):
"""Metadata describing a module's capabilities and requirements"""
name: str = Field(..., description="Module name")
version: str = Field(..., description="Module version")
description: str = Field(..., description="Module description")
author: Optional[str] = Field(None, description="Module author")
category: str = Field(..., description="Module category (scanner, analyzer, reporter, etc.)")
tags: List[str] = Field(default_factory=list, description="Module tags")
input_schema: Dict[str, Any] = Field(default_factory=dict, description="Expected input schema")
output_schema: Dict[str, Any] = Field(default_factory=dict, description="Output schema")
requires_workspace: bool = Field(True, description="Whether module requires workspace access")
class FoundBy(BaseModel):
"""Information about who/what found the vulnerability"""
module: str = Field(..., description="FuzzForge module that detected the finding")
tool_name: str = Field(..., description="Name of the underlying tool")
tool_version: str = Field(..., description="Version of the tool")
type: str = Field(..., description="Type of detection method (llm, tool, fuzzer, manual)")
class LLMContext(BaseModel):
"""Context information for LLM-detected findings"""
model: str = Field(..., description="LLM model used")
prompt: str = Field(..., description="Prompt or analysis instructions used")
temperature: Optional[float] = Field(None, description="Temperature parameter used for generation")
class ModuleFinding(BaseModel):
"""Individual finding from a module"""
id: str = Field(..., description="Unique finding ID (UUID)")
rule_id: str = Field(..., description="Rule/pattern identifier")
found_by: FoundBy = Field(..., description="Detection attribution")
llm_context: Optional[LLMContext] = Field(None, description="LLM-specific context")
title: str = Field(..., description="Finding title")
description: str = Field(..., description="Detailed description")
severity: str = Field(..., description="Severity level (critical, high, medium, low, info)")
confidence: str = Field(default="medium", description="Confidence level (high, medium, low)")
category: str = Field(..., description="Finding category")
cwe: Optional[str] = Field(None, description="CWE identifier (e.g., 'CWE-89')")
owasp: Optional[str] = Field(None, description="OWASP category")
file_path: Optional[str] = Field(None, description="Affected file path relative to workspace")
line_start: Optional[int] = Field(None, description="Starting line number")
line_end: Optional[int] = Field(None, description="Ending line number")
column_start: Optional[int] = Field(None, description="Starting column number")
column_end: Optional[int] = Field(None, description="Ending column number")
code_snippet: Optional[str] = Field(None, description="Relevant code snippet")
recommendation: Optional[str] = Field(None, description="Remediation recommendation")
references: List[str] = Field(default_factory=list, description="External references")
metadata: Dict[str, Any] = Field(default_factory=dict, description="Additional metadata")
class ModuleResult(BaseModel):
"""Standard result format from module execution"""
module: str = Field(..., description="Module name")
version: str = Field(..., description="Module version")
status: str = Field(default="success", description="Execution status (success, partial, failed)")
execution_time: float = Field(..., description="Execution time in seconds")
findings: List[ModuleFinding] = Field(default_factory=list, description="List of findings")
summary: Dict[str, Any] = Field(default_factory=dict, description="Summary statistics")
metadata: Dict[str, Any] = Field(default_factory=dict, description="Additional metadata")
error: Optional[str] = Field(None, description="Error message if failed")
sarif: Optional[Dict[str, Any]] = Field(None, description="SARIF report if generated by reporter module")
class BaseModule(ABC):
"""
Base interface for all security testing modules.
All modules must inherit from this class and implement the required methods.
Modules are designed to be stateless and reusable across different workflows.
"""
def __init__(self):
"""Initialize the module"""
self._metadata = self.get_metadata()
self._start_time = None
logger.info(f"Initialized module: {self._metadata.name} v{self._metadata.version}")
@abstractmethod
def get_metadata(self) -> ModuleMetadata:
"""
Get module metadata.
Returns:
ModuleMetadata object describing the module
"""
pass
@abstractmethod
async def execute(self, config: Dict[str, Any], workspace: Path) -> ModuleResult:
"""
Execute the module with given configuration and workspace.
Args:
config: Module-specific configuration parameters
workspace: Path to the mounted workspace directory
Returns:
ModuleResult containing findings and metadata
"""
pass
@abstractmethod
def validate_config(self, config: Dict[str, Any]) -> bool:
"""
Validate the provided configuration against module requirements.
Args:
config: Configuration to validate
Returns:
True if configuration is valid, False otherwise
Raises:
ValueError: If configuration is invalid with details
"""
pass
def validate_workspace(self, workspace: Path) -> bool:
"""
Validate that the workspace exists and is accessible.
Args:
workspace: Path to the workspace
Returns:
True if workspace is valid
Raises:
ValueError: If workspace is invalid
"""
if not workspace.exists():
raise ValueError(f"Workspace does not exist: {workspace}")
if not workspace.is_dir():
raise ValueError(f"Workspace is not a directory: {workspace}")
return True
def create_finding(
self,
rule_id: str,
title: str,
description: str,
severity: str,
category: str,
found_by: FoundBy,
confidence: str = "medium",
llm_context: Optional[LLMContext] = None,
cwe: Optional[str] = None,
owasp: Optional[str] = None,
**kwargs
) -> ModuleFinding:
"""
Helper method to create a standardized finding.
Args:
rule_id: Rule/pattern identifier
title: Finding title
description: Detailed description
severity: Severity level (critical, high, medium, low, info)
category: Finding category
found_by: Detection attribution (FoundBy object)
confidence: Confidence level (high, medium, low)
llm_context: Optional LLM context information
cwe: Optional CWE identifier
owasp: Optional OWASP category
**kwargs: Additional finding fields
Returns:
ModuleFinding object
"""
import uuid
finding_id = str(uuid.uuid4())
return ModuleFinding(
id=finding_id,
rule_id=rule_id,
found_by=found_by,
llm_context=llm_context,
title=title,
description=description,
severity=severity,
confidence=confidence,
category=category,
cwe=cwe,
owasp=owasp,
**kwargs
)
def start_timer(self):
"""Start the execution timer"""
from time import time
self._start_time = time()
def get_execution_time(self) -> float:
"""Get the execution time in seconds"""
from time import time
if self._start_time is None:
return 0.0
return time() - self._start_time
def create_result(
self,
findings: List[ModuleFinding],
status: str = "success",
summary: Dict[str, Any] = None,
metadata: Dict[str, Any] = None,
error: str = None
) -> ModuleResult:
"""
Helper method to create a module result.
Args:
findings: List of findings
status: Execution status
summary: Summary statistics
metadata: Additional metadata
error: Error message if failed
Returns:
ModuleResult object
"""
return ModuleResult(
module=self._metadata.name,
version=self._metadata.version,
status=status,
execution_time=self.get_execution_time(),
findings=findings,
summary=summary or self._generate_summary(findings),
metadata=metadata or {},
error=error
)
def _generate_summary(self, findings: List[ModuleFinding]) -> Dict[str, Any]:
"""
Generate summary statistics from findings.
Args:
findings: List of findings
Returns:
Summary dictionary
"""
severity_counts = {
"critical": 0,
"high": 0,
"medium": 0,
"low": 0,
"info": 0
}
confidence_counts = {
"high": 0,
"medium": 0,
"low": 0
}
category_counts = {}
source_counts = {}
type_counts = {}
affected_files = set()
for finding in findings:
# Count by severity
if finding.severity in severity_counts:
severity_counts[finding.severity] += 1
# Count by confidence
if finding.confidence in confidence_counts:
confidence_counts[finding.confidence] += 1
# Count by category
if finding.category not in category_counts:
category_counts[finding.category] = 0
category_counts[finding.category] += 1
# Count by source (module)
module = finding.found_by.module
if module not in source_counts:
source_counts[module] = 0
source_counts[module] += 1
# Count by type
detection_type = finding.found_by.type
if detection_type not in type_counts:
type_counts[detection_type] = 0
type_counts[detection_type] += 1
# Track affected files
if finding.file_path:
affected_files.add(finding.file_path)
return {
"total_findings": len(findings),
"severity_counts": severity_counts,
"confidence_counts": confidence_counts,
"category_counts": category_counts,
"source_counts": source_counts,
"type_counts": type_counts,
"affected_files": len(affected_files),
"highest_severity": self._get_highest_severity(findings)
}
def _get_highest_severity(self, findings: List[ModuleFinding]) -> str:
"""
Get the highest severity from findings.
Args:
findings: List of findings
Returns:
Highest severity level
"""
severity_order = ["critical", "high", "medium", "low", "info"]
for severity in severity_order:
if any(f.severity == severity for f in findings):
return severity
return "none"