"""
Bandit Analyzer Module - Analyzes Python code for security issues using Bandit
"""
# Copyright (c) 2025 FuzzingLabs
#
# Licensed under the Business Source License 1.1 (BSL). See the LICENSE file
# at the root of this repository for details.
#
# After the Change Date (four years from publication), this version of the
# Licensed Work will be made available under the Apache License, Version 2.0.
# See the LICENSE-APACHE file or http://www.apache.org/licenses/LICENSE-2.0
#
# Additional attribution and requirements are provided in the NOTICE file.
import asyncio
import json
import logging
import time
from pathlib import Path
from typing import Dict, Any, List

# Import the shared module base classes from whichever package layout is on sys.path
try:
    from toolbox.modules.base import BaseModule, ModuleMetadata, ModuleResult, ModuleFinding
except ImportError:
    try:
        from modules.base import BaseModule, ModuleMetadata, ModuleResult, ModuleFinding
    except ImportError:
        from src.toolbox.modules.base import BaseModule, ModuleMetadata, ModuleResult, ModuleFinding

logger = logging.getLogger(__name__)


class BanditAnalyzer(BaseModule):
    """
    Analyzes Python code for security issues using Bandit.

    This module:
    - Runs the Bandit security linter on Python files
    - Detects common security issues (SQL injection, hardcoded secrets, etc.)
    - Reports findings with severity and confidence levels
    """

    # Severity mapping from Bandit levels to our standard
    SEVERITY_MAP = {
        "LOW": "low",
        "MEDIUM": "medium",
        "HIGH": "high"
    }

    def get_metadata(self) -> ModuleMetadata:
        """Get module metadata"""
        return ModuleMetadata(
            name="bandit_analyzer",
            version="1.0.0",
            description="Analyzes Python code for security issues using Bandit",
            author="FuzzForge Team",
            category="analyzer",
            tags=["python", "security", "bandit", "sast"],
            input_schema={
                "severity_level": {
                    "type": "string",
                    "enum": ["low", "medium", "high"],
                    "description": "Minimum severity level to report",
                    "default": "low"
                },
                "confidence_level": {
                    "type": "string",
                    "enum": ["low", "medium", "high"],
                    "description": "Minimum confidence level to report",
                    "default": "medium"
                },
                "exclude_tests": {
                    "type": "boolean",
                    "description": "Exclude test files from analysis",
                    "default": True
                },
                "skip_ids": {
                    "type": "array",
                    "items": {"type": "string"},
                    "description": "List of Bandit test IDs to skip",
                    "default": []
                }
            },
            output_schema={
                "findings": {
                    "type": "array",
                    "description": "List of security issues found by Bandit"
                }
            },
            requires_workspace=True
        )
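
    # Illustrative configuration accepted by this module (hypothetical values;
    # field names and defaults mirror the input_schema above, and "B101" is
    # Bandit's assert_used check):
    #
    #   config = {
    #       "severity_level": "medium",
    #       "confidence_level": "medium",
    #       "exclude_tests": True,
    #       "skip_ids": ["B101"],
    #   }
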
    def validate_config(self, config: Dict[str, Any]) -> bool:
        """Validate module configuration"""
        severity = config.get("severity_level", "low")
        if severity not in ["low", "medium", "high"]:
            raise ValueError("severity_level must be one of: low, medium, high")

        confidence = config.get("confidence_level", "medium")
        if confidence not in ["low", "medium", "high"]:
            raise ValueError("confidence_level must be one of: low, medium, high")

        skip_ids = config.get("skip_ids", [])
        if not isinstance(skip_ids, list):
            raise ValueError("skip_ids must be a list")

        return True

    async def _run_bandit(
        self,
        workspace: Path,
        severity_level: str,
        confidence_level: str,
        exclude_tests: bool,
        skip_ids: List[str]
    ) -> Dict[str, Any]:
        """
        Run Bandit on the workspace.

        Args:
            workspace: Path to workspace
            severity_level: Minimum severity to report
            confidence_level: Minimum confidence to report
            exclude_tests: Whether to exclude test files
            skip_ids: List of test IDs to skip

        Returns:
            Bandit JSON output as dict
        """
        try:
            # Build the bandit command. No severity flag (-l/-ll) is passed, so
            # Bandit reports all severities; severity/confidence filtering is
            # applied later in _convert_to_findings.
            cmd = [
                "bandit",
                "-r", str(workspace),
                "-f", "json",
            ]

            # Add exclude patterns for test files
            if exclude_tests:
                cmd.extend(["-x", "*/test_*.py,*/tests/*,*_test.py"])

            # Add skip IDs if specified
            if skip_ids:
                cmd.extend(["-s", ",".join(skip_ids)])

            logger.info(f"Running Bandit on: {workspace}")

            process = await asyncio.create_subprocess_exec(
                *cmd,
                stdout=asyncio.subprocess.PIPE,
                stderr=asyncio.subprocess.PIPE
            )
            stdout, stderr = await process.communicate()

            # Bandit exits with 1 when issues are found, which is expected
            if process.returncode not in [0, 1]:
                logger.error(f"Bandit failed: {stderr.decode()}")
                return {"results": []}

            # Parse JSON output
            return json.loads(stdout.decode())

        except Exception as e:
            logger.error(f"Error running Bandit: {e}")
            return {"results": []}

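    # Abridged shape of the Bandit JSON consumed below (only the keys accessed
    # in _convert_to_findings are shown; values are illustrative):
    #
    #   {
    #       "generated_at": "2025-01-01T00:00:00Z",
    #       "metrics": {...},
    #       "results": [
    #           {
    #               "test_id": "B608",
    #               "test_name": "hardcoded_sql_expressions",
    #               "issue_text": "Possible SQL injection vector ...",
    #               "issue_severity": "MEDIUM",
    #               "issue_confidence": "HIGH",
    #               "filename": "/workspace/app/db.py",
    #               "line_number": 42,
    #               "code": "...",
    #               "issue_cwe": {"id": 89},
    #               "more_info": "https://bandit.readthedocs.io/..."
    #           }
    #       ]
    #   }
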
    def _should_include_finding(
        self,
        issue: Dict[str, Any],
        min_severity: str,
        min_confidence: str
    ) -> bool:
        """
        Determine if a Bandit issue should be included based on severity/confidence.

        Args:
            issue: Bandit issue dict
            min_severity: Minimum severity threshold
            min_confidence: Minimum confidence threshold

        Returns:
            True if issue should be included
        """
        severity_order = ["low", "medium", "high"]
        issue_severity = issue.get("issue_severity", "LOW").lower()
        issue_confidence = issue.get("issue_confidence", "LOW").lower()

        severity_meets_threshold = severity_order.index(issue_severity) >= severity_order.index(min_severity)
        confidence_meets_threshold = severity_order.index(issue_confidence) >= severity_order.index(min_confidence)

        return severity_meets_threshold and confidence_meets_threshold

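    # Example: with min_severity="medium" and min_confidence="medium", an issue
    # rated HIGH severity / LOW confidence is excluded (confidence falls below
    # the threshold), while a MEDIUM severity / HIGH confidence issue is kept.
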
    def _convert_to_findings(
        self,
        bandit_result: Dict[str, Any],
        workspace: Path,
        min_severity: str,
        min_confidence: str
    ) -> List[ModuleFinding]:
        """
        Convert Bandit results to ModuleFindings.

        Args:
            bandit_result: Bandit JSON output
            workspace: Workspace path for relative paths
            min_severity: Minimum severity to include
            min_confidence: Minimum confidence to include

        Returns:
            List of ModuleFindings
        """
        findings = []

        for issue in bandit_result.get("results", []):
            # Filter by severity and confidence
            if not self._should_include_finding(issue, min_severity, min_confidence):
                continue

            # Extract issue details
            test_id = issue.get("test_id", "B000")
            test_name = issue.get("test_name", "unknown")
            issue_text = issue.get("issue_text", "No description")
            severity = self.SEVERITY_MAP.get(issue.get("issue_severity", "LOW"), "low")

            # File location
            filename = issue.get("filename", "")
            line_number = issue.get("line_number", 0)
            code = issue.get("code", "")

            # Try to get relative path
            try:
                file_path = Path(filename)
                rel_path = file_path.relative_to(workspace)
            except (ValueError, TypeError):
                rel_path = Path(filename).name

            # Create finding
            finding = self.create_finding(
                title=f"{test_name} ({test_id})",
                description=issue_text,
                severity=severity,
                category="security-issue",
                file_path=str(rel_path),
                line_start=line_number,
                line_end=line_number,
                code_snippet=code.strip() if code else None,
                recommendation=f"Review and fix the security issue identified by Bandit test {test_id}",
                metadata={
                    "test_id": test_id,
                    "test_name": test_name,
                    "confidence": issue.get("issue_confidence", "LOW").lower(),
                    "cwe": issue.get("issue_cwe", {}).get("id") if issue.get("issue_cwe") else None,
                    "more_info": issue.get("more_info", "")
                }
            )
            findings.append(finding)

        return findings

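    # Example of a resulting finding (illustrative; the exact ModuleFinding
    # structure is defined in toolbox.modules.base):
    #
    #   title="hardcoded_sql_expressions (B608)", severity="medium",
    #   category="security-issue", file_path="app/db.py", line_start=42,
    #   metadata={"test_id": "B608", "confidence": "high", "cwe": 89, ...}
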
    async def execute(self, config: Dict[str, Any], workspace: Path) -> ModuleResult:
        """
        Execute the Bandit analyzer module.

        Args:
            config: Module configuration
            workspace: Path to workspace

        Returns:
            ModuleResult with security findings
        """
        start_time = time.time()
        metadata = self.get_metadata()

        # Validate inputs
        self.validate_config(config)
        self.validate_workspace(workspace)

        # Get configuration
        severity_level = config.get("severity_level", "low")
        confidence_level = config.get("confidence_level", "medium")
        exclude_tests = config.get("exclude_tests", True)
        skip_ids = config.get("skip_ids", [])

        # Run Bandit
        logger.info("Starting Bandit analysis...")
        bandit_result = await self._run_bandit(
            workspace,
            severity_level,
            confidence_level,
            exclude_tests,
            skip_ids
        )

        # Convert to findings
        findings = self._convert_to_findings(
            bandit_result,
            workspace,
            severity_level,
            confidence_level
        )

        # Calculate summary
        severity_counts = {}
        for finding in findings:
            sev = finding.severity
            severity_counts[sev] = severity_counts.get(sev, 0) + 1

        execution_time = time.time() - start_time

        return ModuleResult(
            module=metadata.name,
            version=metadata.version,
            status="success",
            execution_time=execution_time,
            findings=findings,
            summary={
                "total_issues": len(findings),
                "by_severity": severity_counts,
                # Distinct files with at least one reported finding
                "files_analyzed": len(set(f.file_path for f in findings if f.file_path))
            },
            metadata={
                # Note: "generated_at" is Bandit's report timestamp, not a version string
                "bandit_version": bandit_result.get("generated_at", "unknown"),
                "metrics": bandit_result.get("metrics", {})
            }
        )
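

# Minimal local usage sketch (illustrative only): assumes BanditAnalyzer can be
# instantiated without arguments and that the `bandit` CLI is available on PATH.
# The real entry point is the FuzzForge toolbox runner, not this block.
if __name__ == "__main__":
    import sys

    async def _demo(target: Path) -> None:
        analyzer = BanditAnalyzer()  # assumption: no-argument construction is supported
        result = await analyzer.execute(
            config={"severity_level": "low", "confidence_level": "medium"},
            workspace=target,
        )
        print(json.dumps(result.summary, indent=2))

    asyncio.run(_demo(Path(sys.argv[1]) if len(sys.argv) > 1 else Path(".")))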