mirror of
https://github.com/FuzzingLabs/fuzzforge_ai.git
synced 2026-02-13 19:12:44 +00:00
Implements Issue #5 - Python SAST workflow that combines:
- Dependency scanning (pip-audit) for CVE detection
- Security linting (Bandit) for vulnerability patterns
- Type checking (Mypy) for type safety issues

## Changes

**New Modules:**
- `DependencyScanner`: Scans Python dependencies for known CVEs using pip-audit
- `BanditAnalyzer`: Analyzes Python code for security issues using Bandit
- `MypyAnalyzer`: Checks Python code for type safety issues using Mypy

**New Workflow:**
- `python_sast`: Temporal workflow that orchestrates all three SAST tools
- Runs tools in parallel for fast feedback (3-5 min vs hours for fuzzing)
- Generates unified SARIF report with findings from all tools
- Supports configurable severity/confidence thresholds

**Updates:**
- Added SAST dependencies to Python worker (bandit, pip-audit, mypy)
- Updated module `__init__.py` files to export new analyzers
- Added `type_errors.py` test file to vulnerable_app for Mypy validation

## Testing

Workflow tested successfully on vulnerable_app:
- ✅ Bandit: Detected 9 security issues (command injection, unsafe functions)
- ✅ Mypy: Detected 5 type errors
- ✅ DependencyScanner: Ran successfully (no CVEs in test dependencies)
- ✅ SARIF export: Generated valid SARIF with 14 total findings
270 lines
8.6 KiB
Python
270 lines
8.6 KiB
Python
"""
|
|
Mypy Analyzer Module - Analyzes Python code for type safety issues using Mypy
|
|
"""
|
|
|
|
# Copyright (c) 2025 FuzzingLabs
|
|
#
|
|
# Licensed under the Business Source License 1.1 (BSL). See the LICENSE file
|
|
# at the root of this repository for details.
|
|
#
|
|
# After the Change Date (four years from publication), this version of the
|
|
# Licensed Work will be made available under the Apache License, Version 2.0.
|
|
# See the LICENSE-APACHE file or http://www.apache.org/licenses/LICENSE-2.0
|
|
#
|
|
# Additional attribution and requirements are provided in the NOTICE file.
|
|
|
|
import asyncio
|
|
import logging
|
|
import re
|
|
import time
|
|
from pathlib import Path
|
|
from typing import Dict, Any, List
|
|
|
|
try:
|
|
from toolbox.modules.base import BaseModule, ModuleMetadata, ModuleResult, ModuleFinding
|
|
except ImportError:
|
|
try:
|
|
from modules.base import BaseModule, ModuleMetadata, ModuleResult, ModuleFinding
|
|
except ImportError:
|
|
from src.toolbox.modules.base import BaseModule, ModuleMetadata, ModuleResult, ModuleFinding
|
|
|
|
logger = logging.getLogger(__name__)
|
|
|
|
|
|
class MypyAnalyzer(BaseModule):
    """
    Analyzes Python code for type safety issues using Mypy.

    This module:
    - Runs the Mypy type checker on the workspace as a subprocess
    - Parses Mypy's line-oriented output into findings
    - Reports findings with configurable strictness
    """

    # Map Mypy message levels to finding severity
    ERROR_SEVERITY_MAP = {
        "error": "medium",
        "note": "info"
    }

    # Accepted values for the ``follow_imports`` option; the chosen value is
    # forwarded verbatim as ``--follow-imports=<value>``.
    FOLLOW_IMPORTS_CHOICES = ("normal", "silent", "skip", "error")

    # One Mypy output line: filename:line[:column]: level: message [error-code]
    # Compiled once here instead of on every parsed line.
    _OUTPUT_LINE_RE = re.compile(
        r'^(.+?):(\d+)(?::(\d+))?: (error|note): (.+?)(?:\s+\[([^\]]+)\])?$'
    )

    def get_metadata(self) -> ModuleMetadata:
        """Get module metadata"""
        return ModuleMetadata(
            name="mypy_analyzer",
            version="1.0.0",
            description="Analyzes Python code for type safety issues using Mypy",
            author="FuzzForge Team",
            category="analyzer",
            tags=["python", "type-checking", "mypy", "sast"],
            input_schema={
                "strict_mode": {
                    "type": "boolean",
                    "description": "Enable strict type checking",
                    "default": False
                },
                "ignore_missing_imports": {
                    "type": "boolean",
                    "description": "Ignore errors about missing imports",
                    "default": True
                },
                "follow_imports": {
                    "type": "string",
                    "enum": list(self.FOLLOW_IMPORTS_CHOICES),
                    "description": "How to handle imports",
                    "default": "silent"
                }
            },
            output_schema={
                "findings": {
                    "type": "array",
                    "description": "List of type errors found by Mypy"
                }
            },
            requires_workspace=True
        )

    def validate_config(self, config: Dict[str, Any]) -> bool:
        """
        Validate module configuration.

        Only ``follow_imports`` is checked; a missing key falls back to
        ``"silent"``.

        Args:
            config: Module configuration

        Returns:
            True when the configuration is acceptable

        Raises:
            ValueError: If ``follow_imports`` is not one of the accepted values
        """
        follow_imports = config.get("follow_imports", "silent")
        if follow_imports not in self.FOLLOW_IMPORTS_CHOICES:
            raise ValueError("follow_imports must be one of: normal, silent, skip, error")

        return True

    async def _run_mypy(
        self,
        workspace: Path,
        strict_mode: bool,
        ignore_missing_imports: bool,
        follow_imports: str
    ) -> str:
        """
        Run Mypy on the workspace.

        This is best-effort: any failure to launch or run Mypy is logged and
        reported to the caller as an empty string rather than an exception.

        Args:
            workspace: Path to workspace
            strict_mode: Enable strict checking
            ignore_missing_imports: Ignore missing import errors
            follow_imports: How to handle imports

        Returns:
            Mypy stdout as a string, or "" if Mypy could not be run
        """
        # Build mypy command
        cmd = [
            "mypy",
            str(workspace),
            "--show-column-numbers",
            "--no-error-summary",
            f"--follow-imports={follow_imports}"
        ]

        if strict_mode:
            cmd.append("--strict")

        if ignore_missing_imports:
            cmd.append("--ignore-missing-imports")

        try:
            logger.info("Running Mypy on: %s", workspace)
            process = await asyncio.create_subprocess_exec(
                *cmd,
                stdout=asyncio.subprocess.PIPE,
                stderr=asyncio.subprocess.PIPE
            )

            stdout, stderr = await process.communicate()
        except FileNotFoundError:
            # Most likely the mypy executable itself is missing from PATH.
            logger.error("Error running Mypy: 'mypy' executable not found")
            return ""
        except Exception as e:
            # Keep the best-effort contract, but record the traceback.
            logger.exception("Error running Mypy: %s", e)
            return ""

        # Surface anything Mypy wrote to stderr; otherwise a run that failed
        # outright is indistinguishable from a clean scan with no findings.
        stderr_text = stderr.decode(errors="replace").strip()
        if stderr_text:
            logger.warning(
                "Mypy (exit code %s) wrote to stderr: %s",
                process.returncode,
                stderr_text
            )

        # Mypy returns non-zero if errors found, which is expected.
        # Decode leniently so one undecodable byte does not discard the
        # whole report.
        return stdout.decode(errors="replace")

    @staticmethod
    def _workspace_relative_path(filename: str, workspace: Path) -> str:
        """
        Express a filename reported by Mypy relative to the workspace.

        Args:
            filename: File path exactly as it appears in Mypy's output
            workspace: Workspace path

        Returns:
            The workspace-relative path when one can be derived. Otherwise
            the bare file name for absolute paths outside the workspace, or
            the unchanged relative path.
        """
        file_path = Path(filename)
        try:
            return str(file_path.relative_to(workspace))
        except (ValueError, TypeError):
            pass

        if file_path.is_absolute():
            # Absolute path outside the workspace: report the file name only.
            return file_path.name

        # NOTE(review): Mypy appears to print paths relative to the current
        # working directory when it can, so retry with both sides made
        # absolute against cwd - confirm against the worker's actual cwd.
        try:
            absolute_file = Path.cwd() / file_path
            return str(absolute_file.relative_to(Path(workspace).absolute()))
        except (ValueError, TypeError, OSError):
            # Keep the directories so that same-named files in different
            # folders remain distinguishable.
            return str(file_path)

    def _parse_mypy_output(self, output: str, workspace: Path) -> List[ModuleFinding]:
        """
        Parse Mypy output and convert to findings.

        Mypy output format:
            file.py:10:5: error: Incompatible return value type [return-value]
            file.py:15: note: See https://...

        Lines that do not match this format are ignored, as are notes that
        carry no error code.

        Args:
            output: Mypy stdout
            workspace: Workspace path for relative paths

        Returns:
            List of ModuleFindings
        """
        findings = []

        for raw_line in output.splitlines():
            match = self._OUTPUT_LINE_RE.match(raw_line.strip())
            if not match:
                continue

            filename, line_num, column, level, message, error_code = match.groups()

            # Skip notes without an error code before doing any path work
            if level == "note" and not error_code:
                continue

            line_number = int(line_num)

            finding = self.create_finding(
                title=f"Type error: {error_code or 'type-issue'}",
                description=message,
                severity=self.ERROR_SEVERITY_MAP.get(level, "medium"),
                category="type-error",
                file_path=self._workspace_relative_path(filename, workspace),
                line_start=line_number,
                line_end=line_number,
                recommendation="Review and fix the type inconsistency or add appropriate type annotations",
                metadata={
                    "error_code": error_code or "unknown",
                    "column": int(column) if column else None,
                    "level": level
                }
            )
            findings.append(finding)

        return findings

    async def execute(self, config: Dict[str, Any], workspace: Path) -> ModuleResult:
        """
        Execute the Mypy analyzer module.

        Validates the inputs, runs Mypy, parses its output into findings and
        summarises them per error code and per file.

        Args:
            config: Module configuration
            workspace: Path to workspace

        Returns:
            ModuleResult with type checking findings

        Raises:
            ValueError: If ``follow_imports`` in ``config`` is invalid
        """
        start_time = time.time()
        metadata = self.get_metadata()

        # Validate inputs
        self.validate_config(config)
        self.validate_workspace(workspace)

        # Get configuration
        strict_mode = config.get("strict_mode", False)
        ignore_missing_imports = config.get("ignore_missing_imports", True)
        follow_imports = config.get("follow_imports", "silent")

        # Run Mypy
        logger.info("Starting Mypy analysis...")
        mypy_output = await self._run_mypy(
            workspace,
            strict_mode,
            ignore_missing_imports,
            follow_imports
        )

        # Parse output to findings
        findings = self._parse_mypy_output(mypy_output, workspace)

        # Calculate summary: number of findings per Mypy error code
        error_code_counts: Dict[str, int] = {}
        for finding in findings:
            code = finding.metadata.get("error_code", "unknown")
            error_code_counts[code] = error_code_counts.get(code, 0) + 1

        execution_time = time.time() - start_time

        return ModuleResult(
            module=metadata.name,
            version=metadata.version,
            status="success",
            execution_time=execution_time,
            findings=findings,
            summary={
                "total_errors": len(findings),
                "by_error_code": error_code_counts,
                "files_with_errors": len({f.file_path for f in findings if f.file_path})
            },
            metadata={
                "strict_mode": strict_mode,
                "ignore_missing_imports": ignore_missing_imports
            }
        )
|