mirror of
https://github.com/FuzzingLabs/fuzzforge_ai.git
synced 2026-02-13 13:52:54 +00:00
329 lines
11 KiB
Python
329 lines
11 KiB
Python
"""
|
|
Bandit Analyzer Module - Analyzes Python code for security issues using Bandit
|
|
"""
|
|
|
|
# Copyright (c) 2025 FuzzingLabs
|
|
#
|
|
# Licensed under the Business Source License 1.1 (BSL). See the LICENSE file
|
|
# at the root of this repository for details.
|
|
#
|
|
# After the Change Date (four years from publication), this version of the
|
|
# Licensed Work will be made available under the Apache License, Version 2.0.
|
|
# See the LICENSE-APACHE file or http://www.apache.org/licenses/LICENSE-2.0
|
|
#
|
|
# Additional attribution and requirements are provided in the NOTICE file.
|
|
|
|
import asyncio
|
|
import json
|
|
import logging
|
|
import time
|
|
from pathlib import Path
|
|
from typing import Dict, Any, List
|
|
|
|
try:
|
|
from toolbox.modules.base import BaseModule, ModuleMetadata, ModuleResult, ModuleFinding
|
|
except ImportError:
|
|
try:
|
|
from modules.base import BaseModule, ModuleMetadata, ModuleResult, ModuleFinding
|
|
except ImportError:
|
|
from src.toolbox.modules.base import BaseModule, ModuleMetadata, ModuleResult, ModuleFinding
|
|
|
|
logger = logging.getLogger(__name__)
|
|
|
|
|
|
class BanditAnalyzer(BaseModule):
    """
    Analyzes Python code for security issues using Bandit.

    This module:
    - Runs Bandit security linter on Python files
    - Detects common security issues (SQL injection, hardcoded secrets, etc.)
    - Reports findings with severity levels
    """

    # Severity mapping from Bandit levels to our standard
    SEVERITY_MAP = {
        "LOW": "low",
        "MEDIUM": "medium",
        "HIGH": "high"
    }

    def get_metadata(self) -> ModuleMetadata:
        """Get module metadata"""
        return ModuleMetadata(
            name="bandit_analyzer",
            version="1.0.0",
            description="Analyzes Python code for security issues using Bandit",
            author="FuzzForge Team",
            category="analyzer",
            tags=["python", "security", "bandit", "sast"],
            input_schema={
                "severity_level": {
                    "type": "string",
                    "enum": ["low", "medium", "high"],
                    "description": "Minimum severity level to report",
                    "default": "low"
                },
                "confidence_level": {
                    "type": "string",
                    "enum": ["low", "medium", "high"],
                    "description": "Minimum confidence level to report",
                    "default": "medium"
                },
                "exclude_tests": {
                    "type": "boolean",
                    "description": "Exclude test files from analysis",
                    "default": True
                },
                "skip_ids": {
                    "type": "array",
                    "items": {"type": "string"},
                    "description": "List of Bandit test IDs to skip",
                    "default": []
                }
            },
            output_schema={
                "findings": {
                    "type": "array",
                    "description": "List of security issues found by Bandit"
                }
            },
            requires_workspace=True
        )

    def validate_config(self, config: Dict[str, Any]) -> bool:
        """
        Validate module configuration.

        Args:
            config: Module configuration dict (see input_schema).

        Returns:
            True if the configuration is valid.

        Raises:
            ValueError: If any option has an invalid value or type.
        """
        severity = config.get("severity_level", "low")
        if severity not in ["low", "medium", "high"]:
            raise ValueError("severity_level must be one of: low, medium, high")

        confidence = config.get("confidence_level", "medium")
        if confidence not in ["low", "medium", "high"]:
            raise ValueError("confidence_level must be one of: low, medium, high")

        skip_ids = config.get("skip_ids", [])
        if not isinstance(skip_ids, list):
            raise ValueError("skip_ids must be a list")

        return True

    async def _run_bandit(
        self,
        workspace: Path,
        severity_level: str,
        confidence_level: str,
        exclude_tests: bool,
        skip_ids: List[str]
    ) -> Dict[str, Any]:
        """
        Run Bandit on the workspace.

        Args:
            workspace: Path to workspace
            severity_level: Minimum severity to report (currently unused here;
                filtering is applied later in _should_include_finding)
            confidence_level: Minimum confidence to report (currently unused
                here; filtering is applied later in _should_include_finding)
            exclude_tests: Whether to exclude test files
            skip_ids: List of test IDs to skip

        Returns:
            Bandit JSON output as dict; {"results": []} on any failure.
        """
        try:
            # Build bandit command.
            #
            # BUG FIX: the original used "-ll", which tells Bandit to report
            # only MEDIUM and HIGH severity issues, silently discarding LOW
            # findings before our own filtering ever ran (so a configured
            # severity_level of "low" could never be honored). "-l" reports
            # all severities; thresholding happens in _should_include_finding.
            cmd = [
                "bandit",
                "-r", str(workspace),
                "-f", "json",
                "-l",  # Report all findings (we'll filter later)
            ]

            # Add exclude patterns for test files
            if exclude_tests:
                cmd.extend(["-x", "*/test_*.py,*/tests/*,*_test.py"])

            # Add skip IDs if specified
            if skip_ids:
                cmd.extend(["-s", ",".join(skip_ids)])

            logger.info(f"Running Bandit on: {workspace}")
            process = await asyncio.create_subprocess_exec(
                *cmd,
                stdout=asyncio.subprocess.PIPE,
                stderr=asyncio.subprocess.PIPE
            )

            stdout, stderr = await process.communicate()

            # Bandit returns non-zero if issues found, which is expected:
            # 0 = clean, 1 = issues reported; anything else is a real error.
            if process.returncode not in [0, 1]:
                logger.error(f"Bandit failed: {stderr.decode()}")
                return {"results": []}

            # Parse JSON output
            result = json.loads(stdout.decode())
            return result

        except Exception as e:
            # Best-effort: a missing bandit binary or malformed JSON should
            # degrade to "no results" rather than crash the whole workflow.
            logger.error(f"Error running Bandit: {e}")
            return {"results": []}

    def _should_include_finding(
        self,
        issue: Dict[str, Any],
        min_severity: str,
        min_confidence: str
    ) -> bool:
        """
        Determine if a Bandit issue should be included based on severity/confidence.

        Args:
            issue: Bandit issue dict
            min_severity: Minimum severity threshold ("low"/"medium"/"high")
            min_confidence: Minimum confidence threshold ("low"/"medium"/"high")

        Returns:
            True if issue should be included
        """
        severity_order = ["low", "medium", "high"]
        issue_severity = issue.get("issue_severity", "LOW").lower()
        issue_confidence = issue.get("issue_confidence", "LOW").lower()

        # ROBUSTNESS FIX: Bandit can emit levels outside low/medium/high
        # (e.g. "UNDEFINED"); the original list.index() call raised an
        # uncaught ValueError and aborted the whole conversion. Rank any
        # unknown level as the lowest instead.
        def _rank(level: str) -> int:
            return severity_order.index(level) if level in severity_order else 0

        severity_meets_threshold = _rank(issue_severity) >= severity_order.index(min_severity)
        confidence_meets_threshold = _rank(issue_confidence) >= severity_order.index(min_confidence)

        return severity_meets_threshold and confidence_meets_threshold

    def _convert_to_findings(
        self,
        bandit_result: Dict[str, Any],
        workspace: Path,
        min_severity: str,
        min_confidence: str
    ) -> List[ModuleFinding]:
        """
        Convert Bandit results to ModuleFindings.

        Args:
            bandit_result: Bandit JSON output
            workspace: Workspace path for relative paths
            min_severity: Minimum severity to include
            min_confidence: Minimum confidence to include

        Returns:
            List of ModuleFindings
        """
        findings = []

        for issue in bandit_result.get("results", []):
            # Filter by severity and confidence
            if not self._should_include_finding(issue, min_severity, min_confidence):
                continue

            # Extract issue details
            test_id = issue.get("test_id", "B000")
            test_name = issue.get("test_name", "unknown")
            issue_text = issue.get("issue_text", "No description")
            severity = self.SEVERITY_MAP.get(issue.get("issue_severity", "LOW"), "low")

            # File location
            filename = issue.get("filename", "")
            line_number = issue.get("line_number", 0)
            code = issue.get("code", "")

            # Try to get relative path; fall back to the bare filename when
            # the issue's path is outside the workspace or malformed.
            try:
                file_path = Path(filename)
                rel_path = file_path.relative_to(workspace)
            except (ValueError, TypeError):
                rel_path = Path(filename).name

            # Create finding
            finding = self.create_finding(
                title=f"{test_name} ({test_id})",
                description=issue_text,
                severity=severity,
                category="security-issue",
                file_path=str(rel_path),
                line_start=line_number,
                line_end=line_number,
                code_snippet=code.strip() if code else None,
                recommendation=f"Review and fix the security issue identified by Bandit test {test_id}",
                metadata={
                    "test_id": test_id,
                    "test_name": test_name,
                    "confidence": issue.get("issue_confidence", "LOW").lower(),
                    "cwe": issue.get("issue_cwe", {}).get("id") if issue.get("issue_cwe") else None,
                    "more_info": issue.get("more_info", "")
                }
            )
            findings.append(finding)

        return findings

    async def execute(self, config: Dict[str, Any], workspace: Path) -> ModuleResult:
        """
        Execute the Bandit analyzer module.

        Args:
            config: Module configuration
            workspace: Path to workspace

        Returns:
            ModuleResult with security findings
        """
        start_time = time.time()
        metadata = self.get_metadata()

        # Validate inputs
        self.validate_config(config)
        self.validate_workspace(workspace)

        # Get configuration
        severity_level = config.get("severity_level", "low")
        confidence_level = config.get("confidence_level", "medium")
        exclude_tests = config.get("exclude_tests", True)
        skip_ids = config.get("skip_ids", [])

        # Run Bandit
        logger.info("Starting Bandit analysis...")
        bandit_result = await self._run_bandit(
            workspace,
            severity_level,
            confidence_level,
            exclude_tests,
            skip_ids
        )

        # Convert to findings
        findings = self._convert_to_findings(
            bandit_result,
            workspace,
            severity_level,
            confidence_level
        )

        # Calculate summary
        severity_counts = {}
        for finding in findings:
            sev = finding.severity
            severity_counts[sev] = severity_counts.get(sev, 0) + 1

        execution_time = time.time() - start_time

        return ModuleResult(
            module=metadata.name,
            version=metadata.version,
            status="success",
            execution_time=execution_time,
            findings=findings,
            summary={
                "total_issues": len(findings),
                "by_severity": severity_counts,
                "files_analyzed": len(set(f.file_path for f in findings if f.file_path))
            },
            metadata={
                # NOTE(review): "generated_at" is Bandit's run timestamp, not
                # its version string — key kept as-is for caller compatibility.
                "bandit_version": bandit_result.get("generated_at", "unknown"),
                "metrics": bandit_result.get("metrics", {})
            }
        )