feat: Add Python SAST workflow with three security analysis tools

Implements Issue #5 - Python SAST workflow that combines:
- Dependency scanning (pip-audit) for CVE detection
- Security linting (Bandit) for vulnerability patterns
- Type checking (Mypy) for type safety issues

## Changes

**New Modules:**
- `DependencyScanner`: Scans Python dependencies for known CVEs using pip-audit
- `BanditAnalyzer`: Analyzes Python code for security issues using Bandit
- `MypyAnalyzer`: Checks Python code for type safety issues using Mypy

**New Workflow:**
- `python_sast`: Temporal workflow that orchestrates all three SAST tools
  - Runs all three tools in a single pass for fast feedback (3-5 min vs hours for fuzzing); see the concurrency sketch after this list
  - Generates unified SARIF report with findings from all tools
  - Supports configurable severity/confidence thresholds
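
For reference, here is a minimal sketch of how the three activities could be fanned out concurrently inside a Temporal workflow. This is illustrative only — the committed workflow below awaits each step in turn — and while the activity names match this commit, the configs and timeouts are placeholders:

```python
import asyncio
from datetime import timedelta

from temporalio import workflow

async def run_analyzers_concurrently(target_path: str) -> tuple:
    # Must be called from inside a @workflow.run method.
    # Start all three activities without awaiting them one by one;
    # asyncio.gather lets Temporal run them in parallel.
    deps, bandit, mypy = await asyncio.gather(
        workflow.execute_activity(
            "scan_dependencies", args=[target_path, {}],
            start_to_close_timeout=timedelta(minutes=10),
        ),
        workflow.execute_activity(
            "analyze_with_bandit", args=[target_path, {}],
            start_to_close_timeout=timedelta(minutes=10),
        ),
        workflow.execute_activity(
            "analyze_with_mypy", args=[target_path, {}],
            start_to_close_timeout=timedelta(minutes=10),
        ),
    )
    return deps, bandit, mypy
```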

**Updates:**
- Added SAST dependencies to Python worker (bandit, pip-audit, mypy)
- Updated module __init__.py files to export new analyzers
- Added type_errors.py test file to vulnerable_app for Mypy validation

## Testing

Workflow tested successfully on vulnerable_app:
- Bandit: Detected 9 security issues (command injection, unsafe functions)
- Mypy: Detected 5 type errors
- DependencyScanner: Ran successfully (no CVEs in test dependencies)
- SARIF export: Generated valid SARIF with 14 total findings

Author: tduhamel42
Date: 2025-10-22 15:28:19 +02:00
Parent: 0ed6809d94
Commit: 6abf4ef71d
11 changed files with 1556 additions and 2 deletions


@@ -10,5 +10,7 @@
 # Additional attribution and requirements are provided in the NOTICE file.
 from .security_analyzer import SecurityAnalyzer
+from .bandit_analyzer import BanditAnalyzer
+from .mypy_analyzer import MypyAnalyzer
-__all__ = ["SecurityAnalyzer"]
+__all__ = ["SecurityAnalyzer", "BanditAnalyzer", "MypyAnalyzer"]


@@ -0,0 +1,329 @@
"""
Bandit Analyzer Module - Analyzes Python code for security issues using Bandit
"""
# Copyright (c) 2025 FuzzingLabs
#
# Licensed under the Business Source License 1.1 (BSL). See the LICENSE file
# at the root of this repository for details.
#
# After the Change Date (four years from publication), this version of the
# Licensed Work will be made available under the Apache License, Version 2.0.
# See the LICENSE-APACHE file or http://www.apache.org/licenses/LICENSE-2.0
#
# Additional attribution and requirements are provided in the NOTICE file.
import asyncio
import json
import logging
import time
from pathlib import Path
from typing import Dict, Any, List
try:
from toolbox.modules.base import BaseModule, ModuleMetadata, ModuleResult, ModuleFinding
except ImportError:
try:
from modules.base import BaseModule, ModuleMetadata, ModuleResult, ModuleFinding
except ImportError:
from src.toolbox.modules.base import BaseModule, ModuleMetadata, ModuleResult, ModuleFinding
logger = logging.getLogger(__name__)
class BanditAnalyzer(BaseModule):
"""
Analyzes Python code for security issues using Bandit.
This module:
- Runs Bandit security linter on Python files
- Detects common security issues (SQL injection, hardcoded secrets, etc.)
- Reports findings with severity levels
"""
# Severity mapping from Bandit levels to our standard
SEVERITY_MAP = {
"LOW": "low",
"MEDIUM": "medium",
"HIGH": "high"
}
def get_metadata(self) -> ModuleMetadata:
"""Get module metadata"""
return ModuleMetadata(
name="bandit_analyzer",
version="1.0.0",
description="Analyzes Python code for security issues using Bandit",
author="FuzzForge Team",
category="analyzer",
tags=["python", "security", "bandit", "sast"],
input_schema={
"severity_level": {
"type": "string",
"enum": ["low", "medium", "high"],
"description": "Minimum severity level to report",
"default": "low"
},
"confidence_level": {
"type": "string",
"enum": ["low", "medium", "high"],
"description": "Minimum confidence level to report",
"default": "medium"
},
"exclude_tests": {
"type": "boolean",
"description": "Exclude test files from analysis",
"default": True
},
"skip_ids": {
"type": "array",
"items": {"type": "string"},
"description": "List of Bandit test IDs to skip",
"default": []
}
},
output_schema={
"findings": {
"type": "array",
"description": "List of security issues found by Bandit"
}
},
requires_workspace=True
)
def validate_config(self, config: Dict[str, Any]) -> bool:
"""Validate module configuration"""
severity = config.get("severity_level", "low")
if severity not in ["low", "medium", "high"]:
raise ValueError("severity_level must be one of: low, medium, high")
confidence = config.get("confidence_level", "medium")
if confidence not in ["low", "medium", "high"]:
raise ValueError("confidence_level must be one of: low, medium, high")
skip_ids = config.get("skip_ids", [])
if not isinstance(skip_ids, list):
raise ValueError("skip_ids must be a list")
return True
async def _run_bandit(
self,
workspace: Path,
severity_level: str,
confidence_level: str,
exclude_tests: bool,
skip_ids: List[str]
) -> Dict[str, Any]:
"""
Run Bandit on the workspace.
Args:
workspace: Path to workspace
severity_level: Minimum severity to report
confidence_level: Minimum confidence to report
exclude_tests: Whether to exclude test files
skip_ids: List of test IDs to skip
Returns:
Bandit JSON output as dict
"""
try:
# Build bandit command
cmd = [
"bandit",
"-r", str(workspace),
"-f", "json",
"-ll", # Report all findings (we'll filter later)
]
# Add exclude patterns for test files
if exclude_tests:
cmd.extend(["-x", "*/test_*.py,*/tests/*,*_test.py"])
# Add skip IDs if specified
if skip_ids:
cmd.extend(["-s", ",".join(skip_ids)])
logger.info(f"Running Bandit on: {workspace}")
process = await asyncio.create_subprocess_exec(
*cmd,
stdout=asyncio.subprocess.PIPE,
stderr=asyncio.subprocess.PIPE
)
stdout, stderr = await process.communicate()
# Bandit returns non-zero if issues found, which is expected
if process.returncode not in [0, 1]:
logger.error(f"Bandit failed: {stderr.decode()}")
return {"results": []}
# Parse JSON output
result = json.loads(stdout.decode())
return result
except Exception as e:
logger.error(f"Error running Bandit: {e}")
return {"results": []}
def _should_include_finding(
self,
issue: Dict[str, Any],
min_severity: str,
min_confidence: str
) -> bool:
"""
Determine if a Bandit issue should be included based on severity/confidence.
Args:
issue: Bandit issue dict
min_severity: Minimum severity threshold
min_confidence: Minimum confidence threshold
Returns:
True if issue should be included
"""
severity_order = ["low", "medium", "high"]
issue_severity = issue.get("issue_severity", "LOW").lower()
issue_confidence = issue.get("issue_confidence", "LOW").lower()
severity_meets_threshold = severity_order.index(issue_severity) >= severity_order.index(min_severity)
confidence_meets_threshold = severity_order.index(issue_confidence) >= severity_order.index(min_confidence)
return severity_meets_threshold and confidence_meets_threshold
def _convert_to_findings(
self,
bandit_result: Dict[str, Any],
workspace: Path,
min_severity: str,
min_confidence: str
) -> List[ModuleFinding]:
"""
Convert Bandit results to ModuleFindings.
Args:
bandit_result: Bandit JSON output
workspace: Workspace path for relative paths
min_severity: Minimum severity to include
min_confidence: Minimum confidence to include
Returns:
List of ModuleFindings
"""
findings = []
for issue in bandit_result.get("results", []):
# Filter by severity and confidence
if not self._should_include_finding(issue, min_severity, min_confidence):
continue
# Extract issue details
test_id = issue.get("test_id", "B000")
test_name = issue.get("test_name", "unknown")
issue_text = issue.get("issue_text", "No description")
severity = self.SEVERITY_MAP.get(issue.get("issue_severity", "LOW"), "low")
# File location
filename = issue.get("filename", "")
line_number = issue.get("line_number", 0)
code = issue.get("code", "")
# Try to get relative path
try:
file_path = Path(filename)
rel_path = file_path.relative_to(workspace)
except (ValueError, TypeError):
rel_path = Path(filename).name
# Create finding
finding = self.create_finding(
title=f"{test_name} ({test_id})",
description=issue_text,
severity=severity,
category="security-issue",
file_path=str(rel_path),
line_start=line_number,
line_end=line_number,
code_snippet=code.strip() if code else None,
recommendation=f"Review and fix the security issue identified by Bandit test {test_id}",
metadata={
"test_id": test_id,
"test_name": test_name,
"confidence": issue.get("issue_confidence", "LOW").lower(),
"cwe": issue.get("issue_cwe", {}).get("id") if issue.get("issue_cwe") else None,
"more_info": issue.get("more_info", "")
}
)
findings.append(finding)
return findings
async def execute(self, config: Dict[str, Any], workspace: Path) -> ModuleResult:
"""
Execute the Bandit analyzer module.
Args:
config: Module configuration
workspace: Path to workspace
Returns:
ModuleResult with security findings
"""
start_time = time.time()
metadata = self.get_metadata()
# Validate inputs
self.validate_config(config)
self.validate_workspace(workspace)
# Get configuration
severity_level = config.get("severity_level", "low")
confidence_level = config.get("confidence_level", "medium")
exclude_tests = config.get("exclude_tests", True)
skip_ids = config.get("skip_ids", [])
# Run Bandit
logger.info("Starting Bandit analysis...")
bandit_result = await self._run_bandit(
workspace,
severity_level,
confidence_level,
exclude_tests,
skip_ids
)
# Convert to findings
findings = self._convert_to_findings(
bandit_result,
workspace,
severity_level,
confidence_level
)
# Calculate summary
severity_counts = {}
for finding in findings:
sev = finding.severity
severity_counts[sev] = severity_counts.get(sev, 0) + 1
execution_time = time.time() - start_time
return ModuleResult(
module=metadata.name,
version=metadata.version,
status="success",
execution_time=execution_time,
findings=findings,
summary={
"total_issues": len(findings),
"by_severity": severity_counts,
"files_analyzed": len(set(f.file_path for f in findings if f.file_path))
},
metadata={
"bandit_version": bandit_result.get("generated_at", "unknown"),
"metrics": bandit_result.get("metrics", {})
}
)
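
A hedged usage sketch for the analyzer above, run standalone rather than as a Temporal activity. The import path mirrors the one used by the workflow activities later in this commit; the workspace path is illustrative:

```python
import asyncio
from pathlib import Path

from modules.analyzer import BanditAnalyzer  # path assumed from the activities below

async def main() -> None:
    analyzer = BanditAnalyzer()
    result = await analyzer.execute(
        {"severity_level": "medium", "confidence_level": "medium"},
        Path("test_projects/vulnerable_app"),  # illustrative workspace
    )
    for finding in result.findings:
        print(finding.severity, finding.title, finding.file_path, finding.line_start)

asyncio.run(main())
```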


@@ -0,0 +1,269 @@
"""
Mypy Analyzer Module - Analyzes Python code for type safety issues using Mypy
"""
# Copyright (c) 2025 FuzzingLabs
#
# Licensed under the Business Source License 1.1 (BSL). See the LICENSE file
# at the root of this repository for details.
#
# After the Change Date (four years from publication), this version of the
# Licensed Work will be made available under the Apache License, Version 2.0.
# See the LICENSE-APACHE file or http://www.apache.org/licenses/LICENSE-2.0
#
# Additional attribution and requirements are provided in the NOTICE file.
import asyncio
import logging
import re
import time
from pathlib import Path
from typing import Dict, Any, List
try:
from toolbox.modules.base import BaseModule, ModuleMetadata, ModuleResult, ModuleFinding
except ImportError:
try:
from modules.base import BaseModule, ModuleMetadata, ModuleResult, ModuleFinding
except ImportError:
from src.toolbox.modules.base import BaseModule, ModuleMetadata, ModuleResult, ModuleFinding
logger = logging.getLogger(__name__)
class MypyAnalyzer(BaseModule):
"""
Analyzes Python code for type safety issues using Mypy.
This module:
- Runs Mypy type checker on Python files
- Detects type errors and inconsistencies
- Reports findings with configurable strictness
"""
    # Map Mypy message levels to severity
ERROR_SEVERITY_MAP = {
"error": "medium",
"note": "info"
}
def get_metadata(self) -> ModuleMetadata:
"""Get module metadata"""
return ModuleMetadata(
name="mypy_analyzer",
version="1.0.0",
description="Analyzes Python code for type safety issues using Mypy",
author="FuzzForge Team",
category="analyzer",
tags=["python", "type-checking", "mypy", "sast"],
input_schema={
"strict_mode": {
"type": "boolean",
"description": "Enable strict type checking",
"default": False
},
"ignore_missing_imports": {
"type": "boolean",
"description": "Ignore errors about missing imports",
"default": True
},
"follow_imports": {
"type": "string",
"enum": ["normal", "silent", "skip", "error"],
"description": "How to handle imports",
"default": "silent"
}
},
output_schema={
"findings": {
"type": "array",
"description": "List of type errors found by Mypy"
}
},
requires_workspace=True
)
def validate_config(self, config: Dict[str, Any]) -> bool:
"""Validate module configuration"""
follow_imports = config.get("follow_imports", "silent")
if follow_imports not in ["normal", "silent", "skip", "error"]:
raise ValueError("follow_imports must be one of: normal, silent, skip, error")
return True
async def _run_mypy(
self,
workspace: Path,
strict_mode: bool,
ignore_missing_imports: bool,
follow_imports: str
) -> str:
"""
Run Mypy on the workspace.
Args:
workspace: Path to workspace
strict_mode: Enable strict checking
ignore_missing_imports: Ignore missing import errors
follow_imports: How to handle imports
Returns:
Mypy output as string
"""
try:
# Build mypy command
cmd = [
"mypy",
str(workspace),
"--show-column-numbers",
"--no-error-summary",
f"--follow-imports={follow_imports}"
]
if strict_mode:
cmd.append("--strict")
if ignore_missing_imports:
cmd.append("--ignore-missing-imports")
logger.info(f"Running Mypy on: {workspace}")
process = await asyncio.create_subprocess_exec(
*cmd,
stdout=asyncio.subprocess.PIPE,
stderr=asyncio.subprocess.PIPE
)
stdout, stderr = await process.communicate()
# Mypy returns non-zero if errors found, which is expected
output = stdout.decode()
return output
except Exception as e:
logger.error(f"Error running Mypy: {e}")
return ""
def _parse_mypy_output(self, output: str, workspace: Path) -> List[ModuleFinding]:
"""
Parse Mypy output and convert to findings.
Mypy output format:
file.py:10:5: error: Incompatible return value type [return-value]
file.py:15: note: See https://...
Args:
output: Mypy stdout
workspace: Workspace path for relative paths
Returns:
List of ModuleFindings
"""
findings = []
# Regex to parse mypy output lines
# Format: filename:line:column: level: message [error-code]
pattern = r'^(.+?):(\d+)(?::(\d+))?: (error|note): (.+?)(?:\s+\[([^\]]+)\])?$'
for line in output.splitlines():
match = re.match(pattern, line.strip())
if not match:
continue
filename, line_num, column, level, message, error_code = match.groups()
# Convert to relative path
try:
file_path = Path(filename)
rel_path = file_path.relative_to(workspace)
except (ValueError, TypeError):
rel_path = Path(filename).name
            # Skip notes that carry no error code (follow-up hints to a previous error)
if level == "note" and not error_code:
continue
# Map severity
severity = self.ERROR_SEVERITY_MAP.get(level, "medium")
# Create finding
title = f"Type error: {error_code or 'type-issue'}"
description = message
finding = self.create_finding(
title=title,
description=description,
severity=severity,
category="type-error",
file_path=str(rel_path),
line_start=int(line_num),
line_end=int(line_num),
recommendation="Review and fix the type inconsistency or add appropriate type annotations",
metadata={
"error_code": error_code or "unknown",
"column": int(column) if column else None,
"level": level
}
)
findings.append(finding)
return findings
async def execute(self, config: Dict[str, Any], workspace: Path) -> ModuleResult:
"""
Execute the Mypy analyzer module.
Args:
config: Module configuration
workspace: Path to workspace
Returns:
ModuleResult with type checking findings
"""
start_time = time.time()
metadata = self.get_metadata()
# Validate inputs
self.validate_config(config)
self.validate_workspace(workspace)
# Get configuration
strict_mode = config.get("strict_mode", False)
ignore_missing_imports = config.get("ignore_missing_imports", True)
follow_imports = config.get("follow_imports", "silent")
# Run Mypy
logger.info("Starting Mypy analysis...")
mypy_output = await self._run_mypy(
workspace,
strict_mode,
ignore_missing_imports,
follow_imports
)
# Parse output to findings
findings = self._parse_mypy_output(mypy_output, workspace)
# Calculate summary
error_code_counts = {}
for finding in findings:
code = finding.metadata.get("error_code", "unknown")
error_code_counts[code] = error_code_counts.get(code, 0) + 1
execution_time = time.time() - start_time
return ModuleResult(
module=metadata.name,
version=metadata.version,
status="success",
execution_time=execution_time,
findings=findings,
summary={
"total_errors": len(findings),
"by_error_code": error_code_counts,
"files_with_errors": len(set(f.file_path for f in findings if f.file_path))
},
metadata={
"strict_mode": strict_mode,
"ignore_missing_imports": ignore_missing_imports
}
)
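
To show what `_parse_mypy_output` consumes, here is the module's regex applied to a representative Mypy diagnostic line (the sample is typical Mypy output, not captured from a real run):

```python
import re

PATTERN = r'^(.+?):(\d+)(?::(\d+))?: (error|note): (.+?)(?:\s+\[([^\]]+)\])?$'
sample = 'app/models.py:42:13: error: Incompatible return value type (got "str", expected "int")  [return-value]'

match = re.match(PATTERN, sample)
assert match is not None
filename, line_num, column, level, message, error_code = match.groups()
print(filename, line_num, column, level, error_code)
# -> app/models.py 42 13 error return-value
```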


@@ -10,5 +10,6 @@
 # Additional attribution and requirements are provided in the NOTICE file.
 from .file_scanner import FileScanner
+from .dependency_scanner import DependencyScanner
-__all__ = ["FileScanner"]
+__all__ = ["FileScanner", "DependencyScanner"]


@@ -0,0 +1,304 @@
"""
Dependency Scanner Module - Scans Python dependencies for known vulnerabilities using pip-audit
"""
# Copyright (c) 2025 FuzzingLabs
#
# Licensed under the Business Source License 1.1 (BSL). See the LICENSE file
# at the root of this repository for details.
#
# After the Change Date (four years from publication), this version of the
# Licensed Work will be made available under the Apache License, Version 2.0.
# See the LICENSE-APACHE file or http://www.apache.org/licenses/LICENSE-2.0
#
# Additional attribution and requirements are provided in the NOTICE file.
import asyncio
import json
import logging
import time
from pathlib import Path
from typing import Dict, Any, List
try:
from toolbox.modules.base import BaseModule, ModuleMetadata, ModuleResult, ModuleFinding
except ImportError:
try:
from modules.base import BaseModule, ModuleMetadata, ModuleResult, ModuleFinding
except ImportError:
from src.toolbox.modules.base import BaseModule, ModuleMetadata, ModuleResult, ModuleFinding
logger = logging.getLogger(__name__)
class DependencyScanner(BaseModule):
"""
Scans Python dependencies for known vulnerabilities using pip-audit.
This module:
- Discovers dependency files (requirements.txt, pyproject.toml, setup.py, Pipfile)
- Runs pip-audit to check for vulnerable dependencies
- Reports CVEs with severity and affected versions
"""
def get_metadata(self) -> ModuleMetadata:
"""Get module metadata"""
return ModuleMetadata(
name="dependency_scanner",
version="1.0.0",
description="Scans Python dependencies for known vulnerabilities",
author="FuzzForge Team",
category="scanner",
tags=["dependencies", "cve", "vulnerabilities", "pip-audit"],
input_schema={
"dependency_files": {
"type": "array",
"items": {"type": "string"},
"description": "List of dependency files to scan (auto-discovered if empty)",
"default": []
},
"ignore_vulns": {
"type": "array",
"items": {"type": "string"},
"description": "List of vulnerability IDs to ignore",
"default": []
}
},
output_schema={
"findings": {
"type": "array",
"description": "List of vulnerable dependencies with CVE information"
}
},
requires_workspace=True
)
def validate_config(self, config: Dict[str, Any]) -> bool:
"""Validate module configuration"""
dep_files = config.get("dependency_files", [])
if not isinstance(dep_files, list):
raise ValueError("dependency_files must be a list")
ignore_vulns = config.get("ignore_vulns", [])
if not isinstance(ignore_vulns, list):
raise ValueError("ignore_vulns must be a list")
return True
def _discover_dependency_files(self, workspace: Path) -> List[Path]:
"""
Discover Python dependency files in workspace.
Returns:
List of discovered dependency file paths
"""
dependency_patterns = [
"requirements.txt",
"*requirements*.txt",
"pyproject.toml",
"setup.py",
"Pipfile",
"poetry.lock"
]
found_files = []
for pattern in dependency_patterns:
found_files.extend(workspace.rglob(pattern))
# Deduplicate and return
unique_files = list(set(found_files))
logger.info(f"Discovered {len(unique_files)} dependency files")
return unique_files
async def _run_pip_audit(self, file_path: Path) -> Dict[str, Any]:
"""
Run pip-audit on a specific dependency file.
Args:
file_path: Path to dependency file
Returns:
pip-audit JSON output as dict
"""
try:
# Run pip-audit with JSON output
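            # Caveat: "--requirement" expects requirements-style files; entries
            # like pyproject.toml or Pipfile found by discovery may not parse here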
cmd = [
"pip-audit",
"--requirement", str(file_path),
"--format", "json",
"--progress-spinner", "off"
]
logger.info(f"Running pip-audit on: {file_path.name}")
process = await asyncio.create_subprocess_exec(
*cmd,
stdout=asyncio.subprocess.PIPE,
stderr=asyncio.subprocess.PIPE
)
stdout, stderr = await process.communicate()
# pip-audit returns 0 if no vulns, 1 if vulns found
if process.returncode not in [0, 1]:
logger.error(f"pip-audit failed: {stderr.decode()}")
return {"dependencies": []}
# Parse JSON output
result = json.loads(stdout.decode())
return result
except Exception as e:
logger.error(f"Error running pip-audit on {file_path}: {e}")
return {"dependencies": []}
def _convert_to_findings(
self,
audit_result: Dict[str, Any],
file_path: Path,
workspace: Path,
ignore_vulns: List[str]
) -> List[ModuleFinding]:
"""
Convert pip-audit results to ModuleFindings.
Args:
audit_result: pip-audit JSON output
file_path: Path to scanned file
workspace: Workspace path for relative path calculation
ignore_vulns: List of vulnerability IDs to ignore
Returns:
List of ModuleFindings
"""
findings = []
# pip-audit format: {"dependencies": [{package, version, vulns: []}]}
for dep in audit_result.get("dependencies", []):
package_name = dep.get("name", "unknown")
package_version = dep.get("version", "unknown")
vulnerabilities = dep.get("vulns", [])
for vuln in vulnerabilities:
vuln_id = vuln.get("id", "UNKNOWN")
# Skip if in ignore list
if vuln_id in ignore_vulns:
logger.debug(f"Ignoring vulnerability: {vuln_id}")
continue
description = vuln.get("description", "No description available")
fix_versions = vuln.get("fix_versions", [])
# Map CVSS scores to severity
# pip-audit doesn't always provide CVSS, so we default to medium
severity = "medium"
# Try to get relative path
try:
rel_path = file_path.relative_to(workspace)
except ValueError:
rel_path = file_path
recommendation = f"Upgrade {package_name} to a fixed version: {', '.join(fix_versions)}" if fix_versions else f"Check for updates to {package_name}"
finding = self.create_finding(
title=f"Vulnerable dependency: {package_name} ({vuln_id})",
description=f"{description}\n\nAffected package: {package_name} {package_version}",
severity=severity,
category="vulnerable-dependency",
file_path=str(rel_path),
recommendation=recommendation,
metadata={
"cve_id": vuln_id,
"package": package_name,
"installed_version": package_version,
"fix_versions": fix_versions,
"aliases": vuln.get("aliases", []),
"link": vuln.get("link", "")
}
)
findings.append(finding)
return findings
async def execute(self, config: Dict[str, Any], workspace: Path) -> ModuleResult:
"""
Execute the dependency scanning module.
Args:
config: Module configuration
workspace: Path to workspace
Returns:
ModuleResult with vulnerability findings
"""
start_time = time.time()
metadata = self.get_metadata()
# Validate inputs
self.validate_config(config)
self.validate_workspace(workspace)
# Get configuration
specified_files = config.get("dependency_files", [])
ignore_vulns = config.get("ignore_vulns", [])
# Discover or use specified dependency files
if specified_files:
dep_files = [workspace / f for f in specified_files]
else:
dep_files = self._discover_dependency_files(workspace)
if not dep_files:
logger.warning("No dependency files found in workspace")
return ModuleResult(
module=metadata.name,
version=metadata.version,
status="success",
execution_time=time.time() - start_time,
findings=[],
summary={
"total_files": 0,
"total_vulnerabilities": 0,
"vulnerable_packages": 0
}
)
# Scan each dependency file
all_findings = []
files_scanned = 0
for dep_file in dep_files:
if not dep_file.exists():
logger.warning(f"Dependency file not found: {dep_file}")
continue
logger.info(f"Scanning dependencies in: {dep_file.name}")
audit_result = await self._run_pip_audit(dep_file)
findings = self._convert_to_findings(audit_result, dep_file, workspace, ignore_vulns)
all_findings.extend(findings)
files_scanned += 1
# Calculate summary
unique_packages = len(set(f.metadata.get("package") for f in all_findings))
execution_time = time.time() - start_time
return ModuleResult(
module=metadata.name,
version=metadata.version,
status="success",
execution_time=execution_time,
findings=all_findings,
summary={
"total_files": files_scanned,
"total_vulnerabilities": len(all_findings),
"vulnerable_packages": unique_packages
},
metadata={
"scanned_files": [str(f.name) for f in dep_files if f.exists()]
}
)
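
For clarity, a hedged sketch of the pip-audit JSON shape that `_convert_to_findings` walks; the package, version, and advisory values are made up for illustration:

```python
from pathlib import Path

sample_audit = {
    "dependencies": [
        {
            "name": "examplepkg",  # hypothetical package
            "version": "1.0.0",
            "vulns": [
                {
                    "id": "PYSEC-0000-0000",  # placeholder advisory ID
                    "description": "Example vulnerability description",
                    "fix_versions": ["1.0.1"],
                    "aliases": ["CVE-0000-00000"],
                    "link": "https://example.invalid/advisory",
                }
            ],
        }
    ]
}

scanner = DependencyScanner()  # class defined above
findings = scanner._convert_to_findings(
    sample_audit,
    Path("/workspace/requirements.txt"),
    Path("/workspace"),
    ignore_vulns=[],
)
print(findings[0].title)  # -> Vulnerable dependency: examplepkg (PYSEC-0000-0000)
```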


@@ -0,0 +1,10 @@
# Copyright (c) 2025 FuzzingLabs
#
# Licensed under the Business Source License 1.1 (BSL). See the LICENSE file
# at the root of this repository for details.
#
# After the Change Date (four years from publication), this version of the
# Licensed Work will be made available under the Apache License, Version 2.0.
# See the LICENSE-APACHE file or http://www.apache.org/licenses/LICENSE-2.0
#
# Additional attribution and requirements are provided in the NOTICE file.


@@ -0,0 +1,191 @@
"""
Python SAST Workflow Activities
Activities specific to the Python SAST workflow:
- scan_dependencies_activity: Scan Python dependencies for CVEs using pip-audit
- analyze_with_bandit_activity: Analyze Python code for security issues using Bandit
- analyze_with_mypy_activity: Analyze Python code for type safety using Mypy
- generate_python_sast_sarif_activity: Generate SARIF report from all findings
"""
import logging
import sys
from pathlib import Path
from temporalio import activity
# Configure logging
logger = logging.getLogger(__name__)
# Add toolbox to path for module imports
sys.path.insert(0, '/app/toolbox')
@activity.defn(name="scan_dependencies")
async def scan_dependencies_activity(workspace_path: str, config: dict) -> dict:
"""
Scan Python dependencies for known vulnerabilities using pip-audit.
Args:
workspace_path: Path to the workspace directory
config: DependencyScanner configuration
Returns:
Scanner results dictionary
"""
logger.info(f"Activity: scan_dependencies (workspace={workspace_path})")
try:
from modules.scanner import DependencyScanner
workspace = Path(workspace_path)
if not workspace.exists():
raise FileNotFoundError(f"Workspace not found: {workspace_path}")
scanner = DependencyScanner()
result = await scanner.execute(config, workspace)
logger.info(
f"✓ Dependency scanning completed: "
f"{result.summary.get('total_vulnerabilities', 0)} vulnerabilities found"
)
return result.dict()
except Exception as e:
logger.error(f"Dependency scanning failed: {e}", exc_info=True)
raise
@activity.defn(name="analyze_with_bandit")
async def analyze_with_bandit_activity(workspace_path: str, config: dict) -> dict:
"""
Analyze Python code for security issues using Bandit.
Args:
workspace_path: Path to the workspace directory
config: BanditAnalyzer configuration
Returns:
Analysis results dictionary
"""
logger.info(f"Activity: analyze_with_bandit (workspace={workspace_path})")
try:
from modules.analyzer import BanditAnalyzer
workspace = Path(workspace_path)
if not workspace.exists():
raise FileNotFoundError(f"Workspace not found: {workspace_path}")
analyzer = BanditAnalyzer()
result = await analyzer.execute(config, workspace)
logger.info(
f"✓ Bandit analysis completed: "
f"{result.summary.get('total_issues', 0)} security issues found"
)
return result.dict()
except Exception as e:
logger.error(f"Bandit analysis failed: {e}", exc_info=True)
raise
@activity.defn(name="analyze_with_mypy")
async def analyze_with_mypy_activity(workspace_path: str, config: dict) -> dict:
"""
Analyze Python code for type safety issues using Mypy.
Args:
workspace_path: Path to the workspace directory
config: MypyAnalyzer configuration
Returns:
Analysis results dictionary
"""
logger.info(f"Activity: analyze_with_mypy (workspace={workspace_path})")
try:
from modules.analyzer import MypyAnalyzer
workspace = Path(workspace_path)
if not workspace.exists():
raise FileNotFoundError(f"Workspace not found: {workspace_path}")
analyzer = MypyAnalyzer()
result = await analyzer.execute(config, workspace)
logger.info(
f"✓ Mypy analysis completed: "
f"{result.summary.get('total_errors', 0)} type errors found"
)
return result.dict()
except Exception as e:
logger.error(f"Mypy analysis failed: {e}", exc_info=True)
raise
@activity.defn(name="generate_python_sast_sarif")
async def generate_python_sast_sarif_activity(
dependency_results: dict,
bandit_results: dict,
mypy_results: dict,
config: dict,
workspace_path: str
) -> dict:
"""
Generate SARIF report from all SAST analysis results.
Args:
dependency_results: Results from dependency scanner
bandit_results: Results from Bandit analyzer
mypy_results: Results from Mypy analyzer
config: Reporter configuration
workspace_path: Path to the workspace
Returns:
SARIF report dictionary
"""
logger.info("Activity: generate_python_sast_sarif")
try:
from modules.reporter import SARIFReporter
workspace = Path(workspace_path)
# Combine findings from all modules
all_findings = []
# Add dependency scanner findings
dependency_findings = dependency_results.get("findings", [])
all_findings.extend(dependency_findings)
# Add Bandit findings
bandit_findings = bandit_results.get("findings", [])
all_findings.extend(bandit_findings)
# Add Mypy findings
mypy_findings = mypy_results.get("findings", [])
all_findings.extend(mypy_findings)
# Prepare reporter config
reporter_config = {
**config,
"findings": all_findings,
"tool_name": "FuzzForge Python SAST",
"tool_version": "1.0.0"
}
reporter = SARIFReporter()
result = await reporter.execute(reporter_config, workspace)
# Extract SARIF from result
sarif = result.dict().get("sarif", {})
logger.info(f"✓ SARIF report generated with {len(all_findings)} findings")
return sarif
except Exception as e:
logger.error(f"SARIF report generation failed: {e}", exc_info=True)
raise
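
A hedged sketch of how these activities and the workflow might be registered on a Temporal worker; the module paths, server address, and task-queue name are assumptions, and the repository's real worker setup may differ:

```python
import asyncio

from temporalio.client import Client
from temporalio.worker import Worker

from activities import (  # assumed module layout
    scan_dependencies_activity,
    analyze_with_bandit_activity,
    analyze_with_mypy_activity,
    generate_python_sast_sarif_activity,
)
from workflow import PythonSastWorkflow  # assumed module layout

async def main() -> None:
    client = await Client.connect("localhost:7233")  # illustrative address
    worker = Worker(
        client,
        task_queue="python-sast",  # illustrative queue name
        workflows=[PythonSastWorkflow],
        activities=[
            scan_dependencies_activity,
            analyze_with_bandit_activity,
            analyze_with_mypy_activity,
            generate_python_sast_sarif_activity,
        ],
    )
    await worker.run()

asyncio.run(main())
```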


@@ -0,0 +1,116 @@
name: python_sast
version: "1.0.0"
vertical: python
description: "Python Static Application Security Testing (SAST) workflow combining dependency scanning (pip-audit), security linting (Bandit), and type checking (Mypy)"
author: "FuzzForge Team"
tags:
- "python"
- "sast"
- "security"
- "type-checking"
- "dependencies"
- "bandit"
- "mypy"
- "pip-audit"
- "sarif"
# Workspace isolation mode (system-level configuration)
# Using "shared" mode for read-only SAST analysis (no file modifications)
workspace_isolation: "shared"
default_parameters:
dependency_config: {}
bandit_config: {}
mypy_config: {}
reporter_config: {}
parameters:
type: object
properties:
dependency_config:
type: object
description: "Dependency scanner (pip-audit) configuration"
properties:
dependency_files:
type: array
items:
type: string
description: "List of dependency files to scan (auto-discovered if empty)"
default: []
ignore_vulns:
type: array
items:
type: string
description: "List of vulnerability IDs to ignore"
default: []
bandit_config:
type: object
description: "Bandit security analyzer configuration"
properties:
severity_level:
type: string
enum: ["low", "medium", "high"]
description: "Minimum severity level to report"
default: "low"
confidence_level:
type: string
enum: ["low", "medium", "high"]
description: "Minimum confidence level to report"
default: "medium"
exclude_tests:
type: boolean
description: "Exclude test files from analysis"
default: true
skip_ids:
type: array
items:
type: string
description: "List of Bandit test IDs to skip"
default: []
mypy_config:
type: object
description: "Mypy type checker configuration"
properties:
strict_mode:
type: boolean
description: "Enable strict type checking"
default: false
ignore_missing_imports:
type: boolean
description: "Ignore errors about missing imports"
default: true
follow_imports:
type: string
enum: ["normal", "silent", "skip", "error"]
description: "How to handle imports"
default: "silent"
reporter_config:
type: object
description: "SARIF reporter configuration"
properties:
include_code_flows:
type: boolean
description: "Include code flow information"
default: false
output_schema:
type: object
properties:
sarif:
type: object
description: "SARIF-formatted SAST findings from all tools"
summary:
type: object
description: "SAST execution summary"
properties:
total_findings:
type: integer
vulnerabilities:
type: integer
description: "CVEs found in dependencies"
security_issues:
type: integer
description: "Security issues found by Bandit"
type_errors:
type: integer
description: "Type errors found by Mypy"


@@ -0,0 +1,265 @@
"""
Python SAST Workflow - Temporal Version
Static Application Security Testing for Python projects using multiple tools.
"""
# Copyright (c) 2025 FuzzingLabs
#
# Licensed under the Business Source License 1.1 (BSL). See the LICENSE file
# at the root of this repository for details.
#
# After the Change Date (four years from publication), this version of the
# Licensed Work will be made available under the Apache License, Version 2.0.
# See the LICENSE-APACHE file or http://www.apache.org/licenses/LICENSE-2.0
#
# Additional attribution and requirements are provided in the NOTICE file.
from datetime import timedelta
from typing import Dict, Any, Optional
from temporalio import workflow
from temporalio.common import RetryPolicy
# Import activity interfaces (will be executed by worker)
with workflow.unsafe.imports_passed_through():
import logging
logger = logging.getLogger(__name__)
@workflow.defn
class PythonSastWorkflow:
"""
Python Static Application Security Testing workflow.
This workflow:
1. Downloads target from MinIO
2. Runs dependency scanning (pip-audit for CVEs)
3. Runs security linting (Bandit for security issues)
4. Runs type checking (Mypy for type safety)
5. Generates a SARIF report with all findings
6. Uploads results to MinIO
7. Cleans up cache
"""
@workflow.run
async def run(
self,
target_id: str,
dependency_config: Optional[Dict[str, Any]] = None,
bandit_config: Optional[Dict[str, Any]] = None,
mypy_config: Optional[Dict[str, Any]] = None,
reporter_config: Optional[Dict[str, Any]] = None
) -> Dict[str, Any]:
"""
Main workflow execution.
Args:
target_id: UUID of the uploaded target in MinIO
dependency_config: Configuration for dependency scanner
bandit_config: Configuration for Bandit analyzer
mypy_config: Configuration for Mypy analyzer
reporter_config: Configuration for SARIF reporter
Returns:
Dictionary containing SARIF report and summary
"""
workflow_id = workflow.info().workflow_id
workflow.logger.info(
f"Starting PythonSASTWorkflow "
f"(workflow_id={workflow_id}, target_id={target_id})"
)
# Default configurations
if not dependency_config:
dependency_config = {
"dependency_files": [], # Auto-discover
"ignore_vulns": []
}
if not bandit_config:
bandit_config = {
"severity_level": "low",
"confidence_level": "medium",
"exclude_tests": True,
"skip_ids": []
}
if not mypy_config:
mypy_config = {
"strict_mode": False,
"ignore_missing_imports": True,
"follow_imports": "silent"
}
if not reporter_config:
reporter_config = {
"include_code_flows": False
}
results = {
"workflow_id": workflow_id,
"target_id": target_id,
"status": "running",
"steps": []
}
try:
# Get run ID for workspace isolation (using shared mode for read-only analysis)
run_id = workflow.info().run_id
# Step 1: Download target from MinIO
workflow.logger.info("Step 1: Downloading target from MinIO")
target_path = await workflow.execute_activity(
"get_target",
args=[target_id, run_id, "shared"], # target_id, run_id, workspace_isolation
start_to_close_timeout=timedelta(minutes=5),
retry_policy=RetryPolicy(
initial_interval=timedelta(seconds=1),
maximum_interval=timedelta(seconds=30),
maximum_attempts=3
)
)
results["steps"].append({
"step": "download_target",
"status": "success",
"target_path": target_path
})
workflow.logger.info(f"✓ Target downloaded to: {target_path}")
# Step 2: Dependency scanning (pip-audit)
workflow.logger.info("Step 2: Scanning dependencies for vulnerabilities")
dependency_results = await workflow.execute_activity(
"scan_dependencies",
args=[target_path, dependency_config],
start_to_close_timeout=timedelta(minutes=10),
retry_policy=RetryPolicy(
initial_interval=timedelta(seconds=2),
maximum_interval=timedelta(seconds=60),
maximum_attempts=2
)
)
results["steps"].append({
"step": "dependency_scanning",
"status": "success",
"vulnerabilities": dependency_results.get("summary", {}).get("total_vulnerabilities", 0)
})
workflow.logger.info(
f"✓ Dependency scanning completed: "
f"{dependency_results.get('summary', {}).get('total_vulnerabilities', 0)} vulnerabilities"
)
# Step 3: Security linting (Bandit)
workflow.logger.info("Step 3: Analyzing security issues with Bandit")
bandit_results = await workflow.execute_activity(
"analyze_with_bandit",
args=[target_path, bandit_config],
start_to_close_timeout=timedelta(minutes=10),
retry_policy=RetryPolicy(
initial_interval=timedelta(seconds=2),
maximum_interval=timedelta(seconds=60),
maximum_attempts=2
)
)
results["steps"].append({
"step": "bandit_analysis",
"status": "success",
"issues": bandit_results.get("summary", {}).get("total_issues", 0)
})
workflow.logger.info(
f"✓ Bandit analysis completed: "
f"{bandit_results.get('summary', {}).get('total_issues', 0)} security issues"
)
# Step 4: Type checking (Mypy)
workflow.logger.info("Step 4: Type checking with Mypy")
mypy_results = await workflow.execute_activity(
"analyze_with_mypy",
args=[target_path, mypy_config],
start_to_close_timeout=timedelta(minutes=10),
retry_policy=RetryPolicy(
initial_interval=timedelta(seconds=2),
maximum_interval=timedelta(seconds=60),
maximum_attempts=2
)
)
results["steps"].append({
"step": "mypy_analysis",
"status": "success",
"type_errors": mypy_results.get("summary", {}).get("total_errors", 0)
})
workflow.logger.info(
f"✓ Mypy analysis completed: "
f"{mypy_results.get('summary', {}).get('total_errors', 0)} type errors"
)
# Step 5: Generate SARIF report
workflow.logger.info("Step 5: Generating SARIF report")
sarif_report = await workflow.execute_activity(
"generate_python_sast_sarif",
args=[dependency_results, bandit_results, mypy_results, reporter_config, target_path],
start_to_close_timeout=timedelta(minutes=5)
)
results["steps"].append({
"step": "report_generation",
"status": "success"
})
# Count total findings in SARIF
total_findings = 0
if sarif_report and "runs" in sarif_report:
total_findings = len(sarif_report["runs"][0].get("results", []))
workflow.logger.info(f"✓ SARIF report generated with {total_findings} findings")
# Step 6: Upload results to MinIO
workflow.logger.info("Step 6: Uploading results")
try:
results_url = await workflow.execute_activity(
"upload_results",
args=[workflow_id, sarif_report, "sarif"],
start_to_close_timeout=timedelta(minutes=2)
)
results["results_url"] = results_url
workflow.logger.info(f"✓ Results uploaded to: {results_url}")
except Exception as e:
workflow.logger.warning(f"Failed to upload results: {e}")
results["results_url"] = None
# Step 7: Cleanup cache
workflow.logger.info("Step 7: Cleaning up cache")
try:
await workflow.execute_activity(
"cleanup_cache",
args=[target_path, "shared"], # target_path, workspace_isolation
start_to_close_timeout=timedelta(minutes=1)
)
workflow.logger.info("✓ Cache cleaned up (skipped for shared mode)")
except Exception as e:
workflow.logger.warning(f"Cache cleanup failed: {e}")
# Mark workflow as successful
results["status"] = "success"
results["sarif"] = sarif_report
results["summary"] = {
"total_findings": total_findings,
"vulnerabilities": dependency_results.get("summary", {}).get("total_vulnerabilities", 0),
"security_issues": bandit_results.get("summary", {}).get("total_issues", 0),
"type_errors": mypy_results.get("summary", {}).get("total_errors", 0)
}
workflow.logger.info(f"✓ Workflow completed successfully: {workflow_id}")
return results
except Exception as e:
workflow.logger.error(f"Workflow failed: {e}")
results["status"] = "error"
results["error"] = str(e)
results["steps"].append({
"step": "error",
"status": "failed",
"error": str(e)
})
raise
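
Finally, a hedged sketch of starting this workflow from a Temporal client; the server address, workflow ID, task queue, and target UUID are illustrative, and the argument order follows the `run()` signature above:

```python
import asyncio

from temporalio.client import Client

async def main() -> None:
    client = await Client.connect("localhost:7233")  # illustrative address
    result = await client.execute_workflow(
        "PythonSastWorkflow",
        args=[
            "123e4567-e89b-12d3-a456-426614174000",  # illustrative target_id
            None,                          # dependency_config -> defaults
            {"severity_level": "medium"},  # bandit_config override
            None,                          # mypy_config -> defaults
            None,                          # reporter_config -> defaults
        ],
        id="python-sast-demo",     # illustrative workflow ID
        task_queue="python-sast",  # illustrative queue name
    )
    print(result["summary"])

asyncio.run(main())
```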


@@ -0,0 +1,62 @@
#!/usr/bin/env python3
# Copyright (c) 2025 FuzzingLabs
#
# Licensed under the Business Source License 1.1 (BSL). See the LICENSE file
# at the root of this repository for details.
#
# After the Change Date (four years from publication), this version of the
# Licensed Work will be made available under the Apache License, Version 2.0.
# See the LICENSE-APACHE file or http://www.apache.org/licenses/LICENSE-2.0
#
# Additional attribution and requirements are provided in the NOTICE file.
"""
Test file with type errors for Mypy testing.
"""
from typing import List, Dict
def add_numbers(a: int, b: int) -> int:
"""Add two integers"""
# Type error: returning string instead of int
return str(a + b)
def process_items(items: List[str]) -> None:
"""Process a list of strings"""
    # (No type error inside this function; the bad call below supplies one)
for item in items:
print(item.upper())
# Type error: passing int to function expecting string list
process_items(123)
def get_user_data() -> Dict[str, str]:
"""Get user data"""
# Type error: returning wrong type
return ["user1", "user2"]
def calculate_total(numbers: List[int]) -> float:
"""Calculate total"""
# Type error: calling method that doesn't exist
return numbers.sum()
class User:
def __init__(self, name: str, age: int):
self.name = name
self.age = age
def create_user(name: str, age: int) -> User:
"""Create a user"""
# Type error: returning dict instead of User
return {"name": name, "age": age}
# Missing type annotations
def unsafe_function(x, y):
return x + y


@@ -16,3 +16,8 @@
 a2a-sdk[all]>=0.1.0
 # Fuzzing
 atheris>=2.3.0
+
+# SAST Tools
+bandit>=1.7.0
+pip-audit>=2.6.0
+mypy>=1.8.0