Files
fuzzforge_ai/backend/toolbox/workflows/python_sast/activities.py
tduhamel42 9468a8b023 feat: Add Python SAST workflow with three security analysis tools
Implements Issue #5 - Python SAST workflow that combines:
- Dependency scanning (pip-audit) for CVE detection
- Security linting (Bandit) for vulnerability patterns
- Type checking (Mypy) for type safety issues

## Changes

**New Modules:**
- `DependencyScanner`: Scans Python dependencies for known CVEs using pip-audit
- `BanditAnalyzer`: Analyzes Python code for security issues using Bandit
- `MypyAnalyzer`: Checks Python code for type safety issues using Mypy

**New Workflow:**
- `python_sast`: Temporal workflow that orchestrates all three SAST tools
  - Runs tools in parallel for fast feedback (3-5 min vs hours for fuzzing)
  - Generates unified SARIF report with findings from all tools
  - Supports configurable severity/confidence thresholds

**Updates:**
- Added SAST dependencies to Python worker (bandit, pip-audit, mypy)
- Updated module __init__.py files to export new analyzers
- Added type_errors.py test file to vulnerable_app for Mypy validation

## Testing

Workflow tested successfully on vulnerable_app:
-  Bandit: Detected 9 security issues (command injection, unsafe functions)
-  Mypy: Detected 5 type errors
-  DependencyScanner: Ran successfully (no CVEs in test dependencies)
-  SARIF export: Generated valid SARIF with 14 total findings
2025-10-22 15:28:19 +02:00

192 lines
5.6 KiB
Python

"""
Python SAST Workflow Activities
Activities specific to the Python SAST workflow:
- scan_dependencies_activity: Scan Python dependencies for CVEs using pip-audit
- analyze_with_bandit_activity: Analyze Python code for security issues using Bandit
- analyze_with_mypy_activity: Analyze Python code for type safety using Mypy
- generate_python_sast_sarif_activity: Generate SARIF report from all findings
"""
import logging
import sys
from pathlib import Path
from temporalio import activity
# Configure logging
logger = logging.getLogger(__name__)
# Add toolbox to path for module imports
sys.path.insert(0, '/app/toolbox')
@activity.defn(name="scan_dependencies")
async def scan_dependencies_activity(workspace_path: str, config: dict) -> dict:
"""
Scan Python dependencies for known vulnerabilities using pip-audit.
Args:
workspace_path: Path to the workspace directory
config: DependencyScanner configuration
Returns:
Scanner results dictionary
"""
logger.info(f"Activity: scan_dependencies (workspace={workspace_path})")
try:
from modules.scanner import DependencyScanner
workspace = Path(workspace_path)
if not workspace.exists():
raise FileNotFoundError(f"Workspace not found: {workspace_path}")
scanner = DependencyScanner()
result = await scanner.execute(config, workspace)
logger.info(
f"✓ Dependency scanning completed: "
f"{result.summary.get('total_vulnerabilities', 0)} vulnerabilities found"
)
return result.dict()
except Exception as e:
logger.error(f"Dependency scanning failed: {e}", exc_info=True)
raise
@activity.defn(name="analyze_with_bandit")
async def analyze_with_bandit_activity(workspace_path: str, config: dict) -> dict:
"""
Analyze Python code for security issues using Bandit.
Args:
workspace_path: Path to the workspace directory
config: BanditAnalyzer configuration
Returns:
Analysis results dictionary
"""
logger.info(f"Activity: analyze_with_bandit (workspace={workspace_path})")
try:
from modules.analyzer import BanditAnalyzer
workspace = Path(workspace_path)
if not workspace.exists():
raise FileNotFoundError(f"Workspace not found: {workspace_path}")
analyzer = BanditAnalyzer()
result = await analyzer.execute(config, workspace)
logger.info(
f"✓ Bandit analysis completed: "
f"{result.summary.get('total_issues', 0)} security issues found"
)
return result.dict()
except Exception as e:
logger.error(f"Bandit analysis failed: {e}", exc_info=True)
raise
@activity.defn(name="analyze_with_mypy")
async def analyze_with_mypy_activity(workspace_path: str, config: dict) -> dict:
"""
Analyze Python code for type safety issues using Mypy.
Args:
workspace_path: Path to the workspace directory
config: MypyAnalyzer configuration
Returns:
Analysis results dictionary
"""
logger.info(f"Activity: analyze_with_mypy (workspace={workspace_path})")
try:
from modules.analyzer import MypyAnalyzer
workspace = Path(workspace_path)
if not workspace.exists():
raise FileNotFoundError(f"Workspace not found: {workspace_path}")
analyzer = MypyAnalyzer()
result = await analyzer.execute(config, workspace)
logger.info(
f"✓ Mypy analysis completed: "
f"{result.summary.get('total_errors', 0)} type errors found"
)
return result.dict()
except Exception as e:
logger.error(f"Mypy analysis failed: {e}", exc_info=True)
raise
@activity.defn(name="generate_python_sast_sarif")
async def generate_python_sast_sarif_activity(
dependency_results: dict,
bandit_results: dict,
mypy_results: dict,
config: dict,
workspace_path: str
) -> dict:
"""
Generate SARIF report from all SAST analysis results.
Args:
dependency_results: Results from dependency scanner
bandit_results: Results from Bandit analyzer
mypy_results: Results from Mypy analyzer
config: Reporter configuration
workspace_path: Path to the workspace
Returns:
SARIF report dictionary
"""
logger.info("Activity: generate_python_sast_sarif")
try:
from modules.reporter import SARIFReporter
workspace = Path(workspace_path)
# Combine findings from all modules
all_findings = []
# Add dependency scanner findings
dependency_findings = dependency_results.get("findings", [])
all_findings.extend(dependency_findings)
# Add Bandit findings
bandit_findings = bandit_results.get("findings", [])
all_findings.extend(bandit_findings)
# Add Mypy findings
mypy_findings = mypy_results.get("findings", [])
all_findings.extend(mypy_findings)
# Prepare reporter config
reporter_config = {
**config,
"findings": all_findings,
"tool_name": "FuzzForge Python SAST",
"tool_version": "1.0.0"
}
reporter = SARIFReporter()
result = await reporter.execute(reporter_config, workspace)
# Extract SARIF from result
sarif = result.dict().get("sarif", {})
logger.info(f"✓ SARIF report generated with {len(all_findings)} findings")
return sarif
except Exception as e:
logger.error(f"SARIF report generation failed: {e}", exc_info=True)
raise