Merge pull request #23 from FuzzingLabs/feature/python-sast-workflow

feat: Add Python SAST workflow (Issue #5)
Authored by tduhamel42 on 2025-10-22 15:55:26 +02:00 · committed by GitHub
11 changed files with 1553 additions and 2 deletions

View File

@@ -10,5 +10,7 @@
 # Additional attribution and requirements are provided in the NOTICE file.
 from .security_analyzer import SecurityAnalyzer
+from .bandit_analyzer import BanditAnalyzer
+from .mypy_analyzer import MypyAnalyzer
-__all__ = ["SecurityAnalyzer"]
+__all__ = ["SecurityAnalyzer", "BanditAnalyzer", "MypyAnalyzer"]

View File

@@ -0,0 +1,328 @@
"""
Bandit Analyzer Module - Analyzes Python code for security issues using Bandit
"""
# Copyright (c) 2025 FuzzingLabs
#
# Licensed under the Business Source License 1.1 (BSL). See the LICENSE file
# at the root of this repository for details.
#
# After the Change Date (four years from publication), this version of the
# Licensed Work will be made available under the Apache License, Version 2.0.
# See the LICENSE-APACHE file or http://www.apache.org/licenses/LICENSE-2.0
#
# Additional attribution and requirements are provided in the NOTICE file.
import asyncio
import json
import logging
import time
from pathlib import Path
from typing import Dict, Any, List
try:
from toolbox.modules.base import BaseModule, ModuleMetadata, ModuleResult, ModuleFinding
except ImportError:
try:
from modules.base import BaseModule, ModuleMetadata, ModuleResult, ModuleFinding
except ImportError:
from src.toolbox.modules.base import BaseModule, ModuleMetadata, ModuleResult, ModuleFinding
logger = logging.getLogger(__name__)
class BanditAnalyzer(BaseModule):
"""
Analyzes Python code for security issues using Bandit.
This module:
- Runs Bandit security linter on Python files
- Detects common security issues (SQL injection, hardcoded secrets, etc.)
- Reports findings with severity levels
"""
# Severity mapping from Bandit levels to our standard
SEVERITY_MAP = {
"LOW": "low",
"MEDIUM": "medium",
"HIGH": "high"
}
def get_metadata(self) -> ModuleMetadata:
"""Get module metadata"""
return ModuleMetadata(
name="bandit_analyzer",
version="1.0.0",
description="Analyzes Python code for security issues using Bandit",
author="FuzzForge Team",
category="analyzer",
tags=["python", "security", "bandit", "sast"],
input_schema={
"severity_level": {
"type": "string",
"enum": ["low", "medium", "high"],
"description": "Minimum severity level to report",
"default": "low"
},
"confidence_level": {
"type": "string",
"enum": ["low", "medium", "high"],
"description": "Minimum confidence level to report",
"default": "medium"
},
"exclude_tests": {
"type": "boolean",
"description": "Exclude test files from analysis",
"default": True
},
"skip_ids": {
"type": "array",
"items": {"type": "string"},
"description": "List of Bandit test IDs to skip",
"default": []
}
},
output_schema={
"findings": {
"type": "array",
"description": "List of security issues found by Bandit"
}
},
requires_workspace=True
)
def validate_config(self, config: Dict[str, Any]) -> bool:
"""Validate module configuration"""
severity = config.get("severity_level", "low")
if severity not in ["low", "medium", "high"]:
raise ValueError("severity_level must be one of: low, medium, high")
confidence = config.get("confidence_level", "medium")
if confidence not in ["low", "medium", "high"]:
raise ValueError("confidence_level must be one of: low, medium, high")
skip_ids = config.get("skip_ids", [])
if not isinstance(skip_ids, list):
raise ValueError("skip_ids must be a list")
return True
async def _run_bandit(
self,
workspace: Path,
severity_level: str,
confidence_level: str,
exclude_tests: bool,
skip_ids: List[str]
) -> Dict[str, Any]:
"""
Run Bandit on the workspace.
Args:
workspace: Path to workspace
severity_level: Minimum severity to report
confidence_level: Minimum confidence to report
exclude_tests: Whether to exclude test files
skip_ids: List of test IDs to skip
Returns:
Bandit JSON output as dict
"""
try:
# Build bandit command
cmd = [
"bandit",
"-r", str(workspace),
"-f", "json",
"-ll", # Report all findings (we'll filter later)
]
# Add exclude patterns for test files
if exclude_tests:
cmd.extend(["-x", "*/test_*.py,*/tests/*,*_test.py"])
# Add skip IDs if specified
if skip_ids:
cmd.extend(["-s", ",".join(skip_ids)])
logger.info(f"Running Bandit on: {workspace}")
process = await asyncio.create_subprocess_exec(
*cmd,
stdout=asyncio.subprocess.PIPE,
stderr=asyncio.subprocess.PIPE
)
stdout, stderr = await process.communicate()
# Bandit returns non-zero if issues found, which is expected
if process.returncode not in [0, 1]:
logger.error(f"Bandit failed: {stderr.decode()}")
return {"results": []}
# Parse JSON output
result = json.loads(stdout.decode())
return result
except Exception as e:
logger.error(f"Error running Bandit: {e}")
return {"results": []}
def _should_include_finding(
self,
issue: Dict[str, Any],
min_severity: str,
min_confidence: str
) -> bool:
"""
Determine if a Bandit issue should be included based on severity/confidence.
Args:
issue: Bandit issue dict
min_severity: Minimum severity threshold
min_confidence: Minimum confidence threshold
Returns:
True if issue should be included
"""
severity_order = ["low", "medium", "high"]
issue_severity = issue.get("issue_severity", "LOW").lower()
issue_confidence = issue.get("issue_confidence", "LOW").lower()
severity_meets_threshold = severity_order.index(issue_severity) >= severity_order.index(min_severity)
confidence_meets_threshold = severity_order.index(issue_confidence) >= severity_order.index(min_confidence)
return severity_meets_threshold and confidence_meets_threshold
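    # Example: an issue with issue_severity "HIGH" and issue_confidence "LOW"
    # passes min_severity="medium" (high >= medium) but fails
    # min_confidence="medium" (low < medium), so it is excluded.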
def _convert_to_findings(
self,
bandit_result: Dict[str, Any],
workspace: Path,
min_severity: str,
min_confidence: str
) -> List[ModuleFinding]:
"""
Convert Bandit results to ModuleFindings.
Args:
bandit_result: Bandit JSON output
workspace: Workspace path for relative paths
min_severity: Minimum severity to include
min_confidence: Minimum confidence to include
Returns:
List of ModuleFindings
"""
findings = []
for issue in bandit_result.get("results", []):
# Filter by severity and confidence
if not self._should_include_finding(issue, min_severity, min_confidence):
continue
# Extract issue details
test_id = issue.get("test_id", "B000")
test_name = issue.get("test_name", "unknown")
issue_text = issue.get("issue_text", "No description")
severity = self.SEVERITY_MAP.get(issue.get("issue_severity", "LOW"), "low")
# File location
filename = issue.get("filename", "")
line_number = issue.get("line_number", 0)
code = issue.get("code", "")
# Try to get relative path
try:
file_path = Path(filename)
rel_path = file_path.relative_to(workspace)
except (ValueError, TypeError):
rel_path = Path(filename).name
# Create finding
finding = self.create_finding(
title=f"{test_name} ({test_id})",
description=issue_text,
severity=severity,
category="security-issue",
file_path=str(rel_path),
line_start=line_number,
line_end=line_number,
code_snippet=code.strip() if code else None,
recommendation=f"Review and fix the security issue identified by Bandit test {test_id}",
metadata={
"test_id": test_id,
"test_name": test_name,
"confidence": issue.get("issue_confidence", "LOW").lower(),
"cwe": issue.get("issue_cwe", {}).get("id") if issue.get("issue_cwe") else None,
"more_info": issue.get("more_info", "")
}
)
findings.append(finding)
return findings
async def execute(self, config: Dict[str, Any], workspace: Path) -> ModuleResult:
"""
Execute the Bandit analyzer module.
Args:
config: Module configuration
workspace: Path to workspace
Returns:
ModuleResult with security findings
"""
start_time = time.time()
metadata = self.get_metadata()
# Validate inputs
self.validate_config(config)
self.validate_workspace(workspace)
# Get configuration
severity_level = config.get("severity_level", "low")
confidence_level = config.get("confidence_level", "medium")
exclude_tests = config.get("exclude_tests", True)
skip_ids = config.get("skip_ids", [])
# Run Bandit
logger.info("Starting Bandit analysis...")
bandit_result = await self._run_bandit(
workspace,
severity_level,
confidence_level,
exclude_tests,
skip_ids
)
# Convert to findings
findings = self._convert_to_findings(
bandit_result,
workspace,
severity_level,
confidence_level
)
# Calculate summary
severity_counts = {}
for finding in findings:
sev = finding.severity
severity_counts[sev] = severity_counts.get(sev, 0) + 1
execution_time = time.time() - start_time
return ModuleResult(
module=metadata.name,
version=metadata.version,
status="success",
execution_time=execution_time,
findings=findings,
summary={
"total_issues": len(findings),
"by_severity": severity_counts,
"files_analyzed": len(set(f.file_path for f in findings if f.file_path))
},
metadata={
"bandit_version": bandit_result.get("generated_at", "unknown"),
"metrics": bandit_result.get("metrics", {})
}
)
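
A minimal standalone driver for the analyzer above — a sketch only, assuming Bandit is installed and the class is importable via the toolbox package as in the __init__ diff earlier; the workspace path and config values are placeholders:

import asyncio
from pathlib import Path

from toolbox.modules.analyzer import BanditAnalyzer

async def main() -> None:
    analyzer = BanditAnalyzer()
    config = {"severity_level": "medium", "confidence_level": "medium"}
    # Point at any local checkout containing Python sources
    result = await analyzer.execute(config, Path("/tmp/target"))
    for finding in result.findings:
        print(finding.severity, finding.title, finding.file_path)

asyncio.run(main())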

View File

@@ -0,0 +1,269 @@
"""
Mypy Analyzer Module - Analyzes Python code for type safety issues using Mypy
"""
# Copyright (c) 2025 FuzzingLabs
#
# Licensed under the Business Source License 1.1 (BSL). See the LICENSE file
# at the root of this repository for details.
#
# After the Change Date (four years from publication), this version of the
# Licensed Work will be made available under the Apache License, Version 2.0.
# See the LICENSE-APACHE file or http://www.apache.org/licenses/LICENSE-2.0
#
# Additional attribution and requirements are provided in the NOTICE file.
import asyncio
import logging
import re
import time
from pathlib import Path
from typing import Dict, Any, List
try:
from toolbox.modules.base import BaseModule, ModuleMetadata, ModuleResult, ModuleFinding
except ImportError:
try:
from modules.base import BaseModule, ModuleMetadata, ModuleResult, ModuleFinding
except ImportError:
from src.toolbox.modules.base import BaseModule, ModuleMetadata, ModuleResult, ModuleFinding
logger = logging.getLogger(__name__)
class MypyAnalyzer(BaseModule):
"""
Analyzes Python code for type safety issues using Mypy.
This module:
- Runs Mypy type checker on Python files
- Detects type errors and inconsistencies
- Reports findings with configurable strictness
"""
# Map Mypy error codes to severity
ERROR_SEVERITY_MAP = {
"error": "medium",
"note": "info"
}
def get_metadata(self) -> ModuleMetadata:
"""Get module metadata"""
return ModuleMetadata(
name="mypy_analyzer",
version="1.0.0",
description="Analyzes Python code for type safety issues using Mypy",
author="FuzzForge Team",
category="analyzer",
tags=["python", "type-checking", "mypy", "sast"],
input_schema={
"strict_mode": {
"type": "boolean",
"description": "Enable strict type checking",
"default": False
},
"ignore_missing_imports": {
"type": "boolean",
"description": "Ignore errors about missing imports",
"default": True
},
"follow_imports": {
"type": "string",
"enum": ["normal", "silent", "skip", "error"],
"description": "How to handle imports",
"default": "silent"
}
},
output_schema={
"findings": {
"type": "array",
"description": "List of type errors found by Mypy"
}
},
requires_workspace=True
)
def validate_config(self, config: Dict[str, Any]) -> bool:
"""Validate module configuration"""
follow_imports = config.get("follow_imports", "silent")
if follow_imports not in ["normal", "silent", "skip", "error"]:
raise ValueError("follow_imports must be one of: normal, silent, skip, error")
return True
async def _run_mypy(
self,
workspace: Path,
strict_mode: bool,
ignore_missing_imports: bool,
follow_imports: str
) -> str:
"""
Run Mypy on the workspace.
Args:
workspace: Path to workspace
strict_mode: Enable strict checking
ignore_missing_imports: Ignore missing import errors
follow_imports: How to handle imports
Returns:
Mypy output as string
"""
try:
# Build mypy command
cmd = [
"mypy",
str(workspace),
"--show-column-numbers",
"--no-error-summary",
f"--follow-imports={follow_imports}"
]
if strict_mode:
cmd.append("--strict")
if ignore_missing_imports:
cmd.append("--ignore-missing-imports")
logger.info(f"Running Mypy on: {workspace}")
process = await asyncio.create_subprocess_exec(
*cmd,
stdout=asyncio.subprocess.PIPE,
stderr=asyncio.subprocess.PIPE
)
            stdout, stderr = await process.communicate()
            # Mypy exits 0 (clean) or 1 (errors found); anything else is a crash or usage error
            if process.returncode not in (0, 1):
                logger.error(f"Mypy failed: {stderr.decode()}")
                return ""
            return stdout.decode()
except Exception as e:
logger.error(f"Error running Mypy: {e}")
return ""
def _parse_mypy_output(self, output: str, workspace: Path) -> List[ModuleFinding]:
"""
Parse Mypy output and convert to findings.
Mypy output format:
file.py:10:5: error: Incompatible return value type [return-value]
file.py:15: note: See https://...
Args:
output: Mypy stdout
workspace: Workspace path for relative paths
Returns:
List of ModuleFindings
"""
findings = []
# Regex to parse mypy output lines
# Format: filename:line:column: level: message [error-code]
pattern = r'^(.+?):(\d+)(?::(\d+))?: (error|note): (.+?)(?:\s+\[([^\]]+)\])?$'
for line in output.splitlines():
match = re.match(pattern, line.strip())
if not match:
continue
filename, line_num, column, level, message, error_code = match.groups()
# Convert to relative path
try:
file_path = Path(filename)
rel_path = file_path.relative_to(workspace)
except (ValueError, TypeError):
rel_path = Path(filename).name
            # Skip bare notes (no error code); they are follow-up hints to a previous error
            if level == "note" and not error_code:
                continue
# Map severity
severity = self.ERROR_SEVERITY_MAP.get(level, "medium")
# Create finding
title = f"Type error: {error_code or 'type-issue'}"
description = message
finding = self.create_finding(
title=title,
description=description,
severity=severity,
category="type-error",
file_path=str(rel_path),
line_start=int(line_num),
line_end=int(line_num),
recommendation="Review and fix the type inconsistency or add appropriate type annotations",
metadata={
"error_code": error_code or "unknown",
"column": int(column) if column else None,
"level": level
}
)
findings.append(finding)
return findings
async def execute(self, config: Dict[str, Any], workspace: Path) -> ModuleResult:
"""
Execute the Mypy analyzer module.
Args:
config: Module configuration
workspace: Path to workspace
Returns:
ModuleResult with type checking findings
"""
start_time = time.time()
metadata = self.get_metadata()
# Validate inputs
self.validate_config(config)
self.validate_workspace(workspace)
# Get configuration
strict_mode = config.get("strict_mode", False)
ignore_missing_imports = config.get("ignore_missing_imports", True)
follow_imports = config.get("follow_imports", "silent")
# Run Mypy
logger.info("Starting Mypy analysis...")
mypy_output = await self._run_mypy(
workspace,
strict_mode,
ignore_missing_imports,
follow_imports
)
# Parse output to findings
findings = self._parse_mypy_output(mypy_output, workspace)
# Calculate summary
error_code_counts = {}
for finding in findings:
code = finding.metadata.get("error_code", "unknown")
error_code_counts[code] = error_code_counts.get(code, 0) + 1
execution_time = time.time() - start_time
return ModuleResult(
module=metadata.name,
version=metadata.version,
status="success",
execution_time=execution_time,
findings=findings,
summary={
"total_errors": len(findings),
"by_error_code": error_code_counts,
"files_with_errors": len(set(f.file_path for f in findings if f.file_path))
},
metadata={
"strict_mode": strict_mode,
"ignore_missing_imports": ignore_missing_imports
}
)
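
The parsing regex above can be exercised in isolation; a quick sketch with hand-written sample lines (file paths and messages are illustrative only). Note that the module itself skips bare notes without an error code:

import re

PATTERN = r'^(.+?):(\d+)(?::(\d+))?: (error|note): (.+?)(?:\s+\[([^\]]+)\])?$'

samples = [
    'app/models.py:10:5: error: Incompatible return value type  [return-value]',
    'app/models.py:15: note: See the mypy documentation for details',
]
for line in samples:
    match = re.match(PATTERN, line.strip())
    if match:
        filename, line_num, column, level, error_message, error_code = match.groups()
        print(f"{level:5} code={error_code} {filename}:{line_num} {error_message}")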

View File

@@ -10,5 +10,6 @@
 # Additional attribution and requirements are provided in the NOTICE file.
 from .file_scanner import FileScanner
+from .dependency_scanner import DependencyScanner
-__all__ = ["FileScanner"]
+__all__ = ["FileScanner", "DependencyScanner"]

View File

@@ -0,0 +1,302 @@
"""
Dependency Scanner Module - Scans Python dependencies for known vulnerabilities using pip-audit
"""
# Copyright (c) 2025 FuzzingLabs
#
# Licensed under the Business Source License 1.1 (BSL). See the LICENSE file
# at the root of this repository for details.
#
# After the Change Date (four years from publication), this version of the
# Licensed Work will be made available under the Apache License, Version 2.0.
# See the LICENSE-APACHE file or http://www.apache.org/licenses/LICENSE-2.0
#
# Additional attribution and requirements are provided in the NOTICE file.
import asyncio
import json
import logging
import time
from pathlib import Path
from typing import Dict, Any, List
try:
from toolbox.modules.base import BaseModule, ModuleMetadata, ModuleResult, ModuleFinding
except ImportError:
try:
from modules.base import BaseModule, ModuleMetadata, ModuleResult, ModuleFinding
except ImportError:
from src.toolbox.modules.base import BaseModule, ModuleMetadata, ModuleResult, ModuleFinding
logger = logging.getLogger(__name__)
class DependencyScanner(BaseModule):
"""
Scans Python dependencies for known vulnerabilities using pip-audit.
This module:
- Discovers dependency files (requirements.txt, pyproject.toml, setup.py, Pipfile)
- Runs pip-audit to check for vulnerable dependencies
- Reports CVEs with severity and affected versions
"""
def get_metadata(self) -> ModuleMetadata:
"""Get module metadata"""
return ModuleMetadata(
name="dependency_scanner",
version="1.0.0",
description="Scans Python dependencies for known vulnerabilities",
author="FuzzForge Team",
category="scanner",
tags=["dependencies", "cve", "vulnerabilities", "pip-audit"],
input_schema={
"dependency_files": {
"type": "array",
"items": {"type": "string"},
"description": "List of dependency files to scan (auto-discovered if empty)",
"default": []
},
"ignore_vulns": {
"type": "array",
"items": {"type": "string"},
"description": "List of vulnerability IDs to ignore",
"default": []
}
},
output_schema={
"findings": {
"type": "array",
"description": "List of vulnerable dependencies with CVE information"
}
},
requires_workspace=True
)
def validate_config(self, config: Dict[str, Any]) -> bool:
"""Validate module configuration"""
dep_files = config.get("dependency_files", [])
if not isinstance(dep_files, list):
raise ValueError("dependency_files must be a list")
ignore_vulns = config.get("ignore_vulns", [])
if not isinstance(ignore_vulns, list):
raise ValueError("ignore_vulns must be a list")
return True
def _discover_dependency_files(self, workspace: Path) -> List[Path]:
"""
Discover Python dependency files in workspace.
Returns:
List of discovered dependency file paths
"""
        dependency_patterns = [
            "*requirements*.txt",  # matches requirements.txt and variants such as dev-requirements.txt
            "pyproject.toml",
            "setup.py",
            "Pipfile",
            "poetry.lock"
        ]
found_files = []
for pattern in dependency_patterns:
found_files.extend(workspace.rglob(pattern))
# Deduplicate and return
unique_files = list(set(found_files))
logger.info(f"Discovered {len(unique_files)} dependency files")
return unique_files
async def _run_pip_audit(self, file_path: Path) -> Dict[str, Any]:
"""
Run pip-audit on a specific dependency file.
Args:
file_path: Path to dependency file
Returns:
pip-audit JSON output as dict
"""
        try:
            # Run pip-audit with JSON output. Note: --requirement expects
            # requirements-format files; other discovered manifests (pyproject.toml,
            # setup.py, Pipfile, poetry.lock) may fail to parse and fall through
            # to the error handler below, yielding no findings for that file.
            cmd = [
                "pip-audit",
                "--requirement", str(file_path),
                "--format", "json",
                "--progress-spinner", "off"
            ]
logger.info(f"Running pip-audit on: {file_path.name}")
process = await asyncio.create_subprocess_exec(
*cmd,
stdout=asyncio.subprocess.PIPE,
stderr=asyncio.subprocess.PIPE
)
stdout, stderr = await process.communicate()
# pip-audit returns 0 if no vulns, 1 if vulns found
if process.returncode not in [0, 1]:
logger.error(f"pip-audit failed: {stderr.decode()}")
return {"dependencies": []}
# Parse JSON output
result = json.loads(stdout.decode())
return result
except Exception as e:
logger.error(f"Error running pip-audit on {file_path}: {e}")
return {"dependencies": []}
def _convert_to_findings(
self,
audit_result: Dict[str, Any],
file_path: Path,
workspace: Path,
ignore_vulns: List[str]
) -> List[ModuleFinding]:
"""
Convert pip-audit results to ModuleFindings.
Args:
audit_result: pip-audit JSON output
file_path: Path to scanned file
workspace: Workspace path for relative path calculation
ignore_vulns: List of vulnerability IDs to ignore
Returns:
List of ModuleFindings
"""
findings = []
# pip-audit format: {"dependencies": [{package, version, vulns: []}]}
for dep in audit_result.get("dependencies", []):
package_name = dep.get("name", "unknown")
package_version = dep.get("version", "unknown")
vulnerabilities = dep.get("vulns", [])
for vuln in vulnerabilities:
vuln_id = vuln.get("id", "UNKNOWN")
# Skip if in ignore list
if vuln_id in ignore_vulns:
logger.debug(f"Ignoring vulnerability: {vuln_id}")
continue
description = vuln.get("description", "No description available")
fix_versions = vuln.get("fix_versions", [])
# Map CVSS scores to severity
# pip-audit doesn't always provide CVSS, so we default to medium
severity = "medium"
# Try to get relative path
try:
rel_path = file_path.relative_to(workspace)
except ValueError:
rel_path = file_path
recommendation = f"Upgrade {package_name} to a fixed version: {', '.join(fix_versions)}" if fix_versions else f"Check for updates to {package_name}"
finding = self.create_finding(
title=f"Vulnerable dependency: {package_name} ({vuln_id})",
description=f"{description}\n\nAffected package: {package_name} {package_version}",
severity=severity,
category="vulnerable-dependency",
file_path=str(rel_path),
recommendation=recommendation,
metadata={
"cve_id": vuln_id,
"package": package_name,
"installed_version": package_version,
"fix_versions": fix_versions,
"aliases": vuln.get("aliases", []),
"link": vuln.get("link", "")
}
)
findings.append(finding)
return findings
async def execute(self, config: Dict[str, Any], workspace: Path) -> ModuleResult:
"""
Execute the dependency scanning module.
Args:
config: Module configuration
workspace: Path to workspace
Returns:
ModuleResult with vulnerability findings
"""
start_time = time.time()
metadata = self.get_metadata()
# Validate inputs
self.validate_config(config)
self.validate_workspace(workspace)
# Get configuration
specified_files = config.get("dependency_files", [])
ignore_vulns = config.get("ignore_vulns", [])
# Discover or use specified dependency files
if specified_files:
dep_files = [workspace / f for f in specified_files]
else:
dep_files = self._discover_dependency_files(workspace)
if not dep_files:
logger.warning("No dependency files found in workspace")
return ModuleResult(
module=metadata.name,
version=metadata.version,
status="success",
execution_time=time.time() - start_time,
findings=[],
summary={
"total_files": 0,
"total_vulnerabilities": 0,
"vulnerable_packages": 0
}
)
# Scan each dependency file
all_findings = []
files_scanned = 0
for dep_file in dep_files:
if not dep_file.exists():
logger.warning(f"Dependency file not found: {dep_file}")
continue
logger.info(f"Scanning dependencies in: {dep_file.name}")
audit_result = await self._run_pip_audit(dep_file)
findings = self._convert_to_findings(audit_result, dep_file, workspace, ignore_vulns)
all_findings.extend(findings)
files_scanned += 1
# Calculate summary
unique_packages = len(set(f.metadata.get("package") for f in all_findings))
execution_time = time.time() - start_time
return ModuleResult(
module=metadata.name,
version=metadata.version,
status="success",
execution_time=execution_time,
findings=all_findings,
summary={
"total_files": files_scanned,
"total_vulnerabilities": len(all_findings),
"vulnerable_packages": unique_packages
},
metadata={
"scanned_files": [str(f.name) for f in dep_files if f.exists()]
}
)
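
The converter above can be exercised without invoking pip-audit, since it only consumes the tool's JSON shape ({"dependencies": [{"name", "version", "vulns": [...]}]}). A sketch with a hand-rolled record — the package name, versions, and advisory ID below are placeholders, not real advisories:

from pathlib import Path

scanner = DependencyScanner()
fake_audit = {
    "dependencies": [
        {
            "name": "somepackage",       # placeholder package
            "version": "1.0.0",
            "vulns": [
                {
                    "id": "PYSEC-EXAMPLE-0001",   # placeholder advisory ID
                    "description": "Illustrative advisory text.",
                    "fix_versions": ["1.0.1"],
                    "aliases": [],
                    "link": ""
                }
            ]
        }
    ]
}
findings = scanner._convert_to_findings(
    fake_audit,
    file_path=Path("/tmp/target/requirements.txt"),
    workspace=Path("/tmp/target"),
    ignore_vulns=[]
)
print(findings[0].title)  # Vulnerable dependency: somepackage (PYSEC-EXAMPLE-0001)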

View File

@@ -0,0 +1,10 @@
# Copyright (c) 2025 FuzzingLabs
#
# Licensed under the Business Source License 1.1 (BSL). See the LICENSE file
# at the root of this repository for details.
#
# After the Change Date (four years from publication), this version of the
# Licensed Work will be made available under the Apache License, Version 2.0.
# See the LICENSE-APACHE file or http://www.apache.org/licenses/LICENSE-2.0
#
# Additional attribution and requirements are provided in the NOTICE file.

View File

@@ -0,0 +1,191 @@
"""
Python SAST Workflow Activities
Activities specific to the Python SAST workflow:
- scan_dependencies_activity: Scan Python dependencies for CVEs using pip-audit
- analyze_with_bandit_activity: Analyze Python code for security issues using Bandit
- analyze_with_mypy_activity: Analyze Python code for type safety using Mypy
- generate_python_sast_sarif_activity: Generate SARIF report from all findings
"""
import logging
import sys
from pathlib import Path
from temporalio import activity
# Configure logging
logger = logging.getLogger(__name__)
# Add toolbox to path for module imports
sys.path.insert(0, '/app/toolbox')
@activity.defn(name="scan_dependencies")
async def scan_dependencies_activity(workspace_path: str, config: dict) -> dict:
"""
Scan Python dependencies for known vulnerabilities using pip-audit.
Args:
workspace_path: Path to the workspace directory
config: DependencyScanner configuration
Returns:
Scanner results dictionary
"""
logger.info(f"Activity: scan_dependencies (workspace={workspace_path})")
try:
from modules.scanner import DependencyScanner
workspace = Path(workspace_path)
if not workspace.exists():
raise FileNotFoundError(f"Workspace not found: {workspace_path}")
scanner = DependencyScanner()
result = await scanner.execute(config, workspace)
logger.info(
f"✓ Dependency scanning completed: "
f"{result.summary.get('total_vulnerabilities', 0)} vulnerabilities found"
)
return result.dict()
except Exception as e:
logger.error(f"Dependency scanning failed: {e}", exc_info=True)
raise
@activity.defn(name="analyze_with_bandit")
async def analyze_with_bandit_activity(workspace_path: str, config: dict) -> dict:
"""
Analyze Python code for security issues using Bandit.
Args:
workspace_path: Path to the workspace directory
config: BanditAnalyzer configuration
Returns:
Analysis results dictionary
"""
logger.info(f"Activity: analyze_with_bandit (workspace={workspace_path})")
try:
from modules.analyzer import BanditAnalyzer
workspace = Path(workspace_path)
if not workspace.exists():
raise FileNotFoundError(f"Workspace not found: {workspace_path}")
analyzer = BanditAnalyzer()
result = await analyzer.execute(config, workspace)
logger.info(
f"✓ Bandit analysis completed: "
f"{result.summary.get('total_issues', 0)} security issues found"
)
return result.dict()
except Exception as e:
logger.error(f"Bandit analysis failed: {e}", exc_info=True)
raise
@activity.defn(name="analyze_with_mypy")
async def analyze_with_mypy_activity(workspace_path: str, config: dict) -> dict:
"""
Analyze Python code for type safety issues using Mypy.
Args:
workspace_path: Path to the workspace directory
config: MypyAnalyzer configuration
Returns:
Analysis results dictionary
"""
logger.info(f"Activity: analyze_with_mypy (workspace={workspace_path})")
try:
from modules.analyzer import MypyAnalyzer
workspace = Path(workspace_path)
if not workspace.exists():
raise FileNotFoundError(f"Workspace not found: {workspace_path}")
analyzer = MypyAnalyzer()
result = await analyzer.execute(config, workspace)
logger.info(
f"✓ Mypy analysis completed: "
f"{result.summary.get('total_errors', 0)} type errors found"
)
return result.dict()
except Exception as e:
logger.error(f"Mypy analysis failed: {e}", exc_info=True)
raise
@activity.defn(name="generate_python_sast_sarif")
async def generate_python_sast_sarif_activity(
dependency_results: dict,
bandit_results: dict,
mypy_results: dict,
config: dict,
workspace_path: str
) -> dict:
"""
Generate SARIF report from all SAST analysis results.
Args:
dependency_results: Results from dependency scanner
bandit_results: Results from Bandit analyzer
mypy_results: Results from Mypy analyzer
config: Reporter configuration
workspace_path: Path to the workspace
Returns:
SARIF report dictionary
"""
logger.info("Activity: generate_python_sast_sarif")
try:
from modules.reporter import SARIFReporter
workspace = Path(workspace_path)
# Combine findings from all modules
all_findings = []
# Add dependency scanner findings
dependency_findings = dependency_results.get("findings", [])
all_findings.extend(dependency_findings)
# Add Bandit findings
bandit_findings = bandit_results.get("findings", [])
all_findings.extend(bandit_findings)
# Add Mypy findings
mypy_findings = mypy_results.get("findings", [])
all_findings.extend(mypy_findings)
# Prepare reporter config
reporter_config = {
**config,
"findings": all_findings,
"tool_name": "FuzzForge Python SAST",
"tool_version": "1.0.0"
}
reporter = SARIFReporter()
result = await reporter.execute(reporter_config, workspace)
# Extract SARIF from result
sarif = result.dict().get("sarif", {})
logger.info(f"✓ SARIF report generated with {len(all_findings)} findings")
return sarif
except Exception as e:
logger.error(f"SARIF report generation failed: {e}", exc_info=True)
raise
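
These activities only run once registered with a Temporal worker. A sketch of a worker entrypoint, where the server address, task queue name, and import paths are assumptions; the real worker must also register the shared activities the workflow calls (get_target, upload_results, cleanup_cache):

import asyncio
from temporalio.client import Client
from temporalio.worker import Worker

from activities import (          # hypothetical module path
    scan_dependencies_activity,
    analyze_with_bandit_activity,
    analyze_with_mypy_activity,
    generate_python_sast_sarif_activity,
)
from workflow import PythonSastWorkflow  # hypothetical module path

async def main() -> None:
    client = await Client.connect("localhost:7233")  # assumed Temporal endpoint
    worker = Worker(
        client,
        task_queue="python-sast",  # assumed queue name
        workflows=[PythonSastWorkflow],
        activities=[
            scan_dependencies_activity,
            analyze_with_bandit_activity,
            analyze_with_mypy_activity,
            generate_python_sast_sarif_activity,
        ],
    )
    await worker.run()

asyncio.run(main())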

View File

@@ -0,0 +1,116 @@
name: python_sast
version: "1.0.0"
vertical: python
description: "Python Static Application Security Testing (SAST) workflow combining dependency scanning (pip-audit), security linting (Bandit), and type checking (Mypy)"
author: "FuzzForge Team"
tags:
- "python"
- "sast"
- "security"
- "type-checking"
- "dependencies"
- "bandit"
- "mypy"
- "pip-audit"
- "sarif"
# Workspace isolation mode (system-level configuration)
# Using "shared" mode for read-only SAST analysis (no file modifications)
workspace_isolation: "shared"
default_parameters:
dependency_config: {}
bandit_config: {}
mypy_config: {}
reporter_config: {}
parameters:
type: object
properties:
dependency_config:
type: object
description: "Dependency scanner (pip-audit) configuration"
properties:
dependency_files:
type: array
items:
type: string
description: "List of dependency files to scan (auto-discovered if empty)"
default: []
ignore_vulns:
type: array
items:
type: string
description: "List of vulnerability IDs to ignore"
default: []
bandit_config:
type: object
description: "Bandit security analyzer configuration"
properties:
severity_level:
type: string
enum: ["low", "medium", "high"]
description: "Minimum severity level to report"
default: "low"
confidence_level:
type: string
enum: ["low", "medium", "high"]
description: "Minimum confidence level to report"
default: "medium"
exclude_tests:
type: boolean
description: "Exclude test files from analysis"
default: true
skip_ids:
type: array
items:
type: string
description: "List of Bandit test IDs to skip"
default: []
mypy_config:
type: object
description: "Mypy type checker configuration"
properties:
strict_mode:
type: boolean
description: "Enable strict type checking"
default: false
ignore_missing_imports:
type: boolean
description: "Ignore errors about missing imports"
default: true
follow_imports:
type: string
enum: ["normal", "silent", "skip", "error"]
description: "How to handle imports"
default: "silent"
reporter_config:
type: object
description: "SARIF reporter configuration"
properties:
include_code_flows:
type: boolean
description: "Include code flow information"
default: false
output_schema:
type: object
properties:
sarif:
type: object
description: "SARIF-formatted SAST findings from all tools"
summary:
type: object
description: "SAST execution summary"
properties:
total_findings:
type: integer
vulnerabilities:
type: integer
description: "CVEs found in dependencies"
security_issues:
type: integer
description: "Security issues found by Bandit"
type_errors:
type: integer
description: "Type errors found by Mypy"

View File

@@ -0,0 +1,265 @@
"""
Python SAST Workflow - Temporal Version
Static Application Security Testing for Python projects using multiple tools.
"""
# Copyright (c) 2025 FuzzingLabs
#
# Licensed under the Business Source License 1.1 (BSL). See the LICENSE file
# at the root of this repository for details.
#
# After the Change Date (four years from publication), this version of the
# Licensed Work will be made available under the Apache License, Version 2.0.
# See the LICENSE-APACHE file or http://www.apache.org/licenses/LICENSE-2.0
#
# Additional attribution and requirements are provided in the NOTICE file.
from datetime import timedelta
from typing import Dict, Any, Optional
from temporalio import workflow
from temporalio.common import RetryPolicy
# Import activity interfaces (will be executed by worker)
with workflow.unsafe.imports_passed_through():
import logging
logger = logging.getLogger(__name__)
@workflow.defn
class PythonSastWorkflow:
"""
Python Static Application Security Testing workflow.
This workflow:
1. Downloads target from MinIO
2. Runs dependency scanning (pip-audit for CVEs)
3. Runs security linting (Bandit for security issues)
4. Runs type checking (Mypy for type safety)
5. Generates a SARIF report with all findings
6. Uploads results to MinIO
7. Cleans up cache
"""
@workflow.run
async def run(
self,
target_id: str,
dependency_config: Optional[Dict[str, Any]] = None,
bandit_config: Optional[Dict[str, Any]] = None,
mypy_config: Optional[Dict[str, Any]] = None,
reporter_config: Optional[Dict[str, Any]] = None
) -> Dict[str, Any]:
"""
Main workflow execution.
Args:
target_id: UUID of the uploaded target in MinIO
dependency_config: Configuration for dependency scanner
bandit_config: Configuration for Bandit analyzer
mypy_config: Configuration for Mypy analyzer
reporter_config: Configuration for SARIF reporter
Returns:
Dictionary containing SARIF report and summary
"""
workflow_id = workflow.info().workflow_id
workflow.logger.info(
f"Starting PythonSASTWorkflow "
f"(workflow_id={workflow_id}, target_id={target_id})"
)
# Default configurations
if not dependency_config:
dependency_config = {
"dependency_files": [], # Auto-discover
"ignore_vulns": []
}
if not bandit_config:
bandit_config = {
"severity_level": "low",
"confidence_level": "medium",
"exclude_tests": True,
"skip_ids": []
}
if not mypy_config:
mypy_config = {
"strict_mode": False,
"ignore_missing_imports": True,
"follow_imports": "silent"
}
if not reporter_config:
reporter_config = {
"include_code_flows": False
}
results = {
"workflow_id": workflow_id,
"target_id": target_id,
"status": "running",
"steps": []
}
try:
# Get run ID for workspace isolation (using shared mode for read-only analysis)
run_id = workflow.info().run_id
# Step 1: Download target from MinIO
workflow.logger.info("Step 1: Downloading target from MinIO")
target_path = await workflow.execute_activity(
"get_target",
args=[target_id, run_id, "shared"], # target_id, run_id, workspace_isolation
start_to_close_timeout=timedelta(minutes=5),
retry_policy=RetryPolicy(
initial_interval=timedelta(seconds=1),
maximum_interval=timedelta(seconds=30),
maximum_attempts=3
)
)
results["steps"].append({
"step": "download_target",
"status": "success",
"target_path": target_path
})
workflow.logger.info(f"✓ Target downloaded to: {target_path}")
# Step 2: Dependency scanning (pip-audit)
workflow.logger.info("Step 2: Scanning dependencies for vulnerabilities")
dependency_results = await workflow.execute_activity(
"scan_dependencies",
args=[target_path, dependency_config],
start_to_close_timeout=timedelta(minutes=10),
retry_policy=RetryPolicy(
initial_interval=timedelta(seconds=2),
maximum_interval=timedelta(seconds=60),
maximum_attempts=2
)
)
results["steps"].append({
"step": "dependency_scanning",
"status": "success",
"vulnerabilities": dependency_results.get("summary", {}).get("total_vulnerabilities", 0)
})
workflow.logger.info(
f"✓ Dependency scanning completed: "
f"{dependency_results.get('summary', {}).get('total_vulnerabilities', 0)} vulnerabilities"
)
# Step 3: Security linting (Bandit)
workflow.logger.info("Step 3: Analyzing security issues with Bandit")
bandit_results = await workflow.execute_activity(
"analyze_with_bandit",
args=[target_path, bandit_config],
start_to_close_timeout=timedelta(minutes=10),
retry_policy=RetryPolicy(
initial_interval=timedelta(seconds=2),
maximum_interval=timedelta(seconds=60),
maximum_attempts=2
)
)
results["steps"].append({
"step": "bandit_analysis",
"status": "success",
"issues": bandit_results.get("summary", {}).get("total_issues", 0)
})
workflow.logger.info(
f"✓ Bandit analysis completed: "
f"{bandit_results.get('summary', {}).get('total_issues', 0)} security issues"
)
# Step 4: Type checking (Mypy)
workflow.logger.info("Step 4: Type checking with Mypy")
mypy_results = await workflow.execute_activity(
"analyze_with_mypy",
args=[target_path, mypy_config],
start_to_close_timeout=timedelta(minutes=10),
retry_policy=RetryPolicy(
initial_interval=timedelta(seconds=2),
maximum_interval=timedelta(seconds=60),
maximum_attempts=2
)
)
results["steps"].append({
"step": "mypy_analysis",
"status": "success",
"type_errors": mypy_results.get("summary", {}).get("total_errors", 0)
})
workflow.logger.info(
f"✓ Mypy analysis completed: "
f"{mypy_results.get('summary', {}).get('total_errors', 0)} type errors"
)
# Step 5: Generate SARIF report
workflow.logger.info("Step 5: Generating SARIF report")
sarif_report = await workflow.execute_activity(
"generate_python_sast_sarif",
args=[dependency_results, bandit_results, mypy_results, reporter_config, target_path],
start_to_close_timeout=timedelta(minutes=5)
)
results["steps"].append({
"step": "report_generation",
"status": "success"
})
# Count total findings in SARIF
            total_findings = 0
            if sarif_report and sarif_report.get("runs"):
                total_findings = len(sarif_report["runs"][0].get("results", []))
workflow.logger.info(f"✓ SARIF report generated with {total_findings} findings")
# Step 6: Upload results to MinIO
workflow.logger.info("Step 6: Uploading results")
try:
results_url = await workflow.execute_activity(
"upload_results",
args=[workflow_id, sarif_report, "sarif"],
start_to_close_timeout=timedelta(minutes=2)
)
results["results_url"] = results_url
workflow.logger.info(f"✓ Results uploaded to: {results_url}")
except Exception as e:
workflow.logger.warning(f"Failed to upload results: {e}")
results["results_url"] = None
# Step 7: Cleanup cache
workflow.logger.info("Step 7: Cleaning up cache")
try:
await workflow.execute_activity(
"cleanup_cache",
args=[target_path, "shared"], # target_path, workspace_isolation
start_to_close_timeout=timedelta(minutes=1)
)
workflow.logger.info("✓ Cache cleaned up (skipped for shared mode)")
except Exception as e:
workflow.logger.warning(f"Cache cleanup failed: {e}")
# Mark workflow as successful
results["status"] = "success"
results["sarif"] = sarif_report
results["summary"] = {
"total_findings": total_findings,
"vulnerabilities": dependency_results.get("summary", {}).get("total_vulnerabilities", 0),
"security_issues": bandit_results.get("summary", {}).get("total_issues", 0),
"type_errors": mypy_results.get("summary", {}).get("total_errors", 0)
}
workflow.logger.info(f"✓ Workflow completed successfully: {workflow_id}")
return results
except Exception as e:
workflow.logger.error(f"Workflow failed: {e}")
results["status"] = "error"
results["error"] = str(e)
results["steps"].append({
"step": "error",
"status": "failed",
"error": str(e)
})
raise
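
A client-side sketch for starting this workflow, assuming a reachable Temporal server and the same task queue name used by the worker; the target UUID and workflow id are placeholders. Passing None for a config argument triggers the in-workflow defaults shown above:

import asyncio
from temporalio.client import Client

async def main() -> None:
    client = await Client.connect("localhost:7233")  # assumed endpoint
    result = await client.execute_workflow(
        "PythonSastWorkflow",
        args=["<target-uuid>", None, {"severity_level": "medium"}, None, None],
        id="python-sast-demo",       # placeholder workflow id
        task_queue="python-sast",    # assumed queue name
    )
    print(result["summary"])

asyncio.run(main())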

View File

@@ -0,0 +1,62 @@
#!/usr/bin/env python3
# Copyright (c) 2025 FuzzingLabs
#
# Licensed under the Business Source License 1.1 (BSL). See the LICENSE file
# at the root of this repository for details.
#
# After the Change Date (four years from publication), this version of the
# Licensed Work will be made available under the Apache License, Version 2.0.
# See the LICENSE-APACHE file or http://www.apache.org/licenses/LICENSE-2.0
#
# Additional attribution and requirements are provided in the NOTICE file.
"""
Test file with type errors for Mypy testing.
"""
from typing import List, Dict
def add_numbers(a: int, b: int) -> int:
"""Add two integers"""
# Type error: returning string instead of int
return str(a + b)
def process_items(items: List[str]) -> None:
"""Process a list of strings"""
    # (The function body itself is fine; the deliberate type error is at the call site below)
for item in items:
print(item.upper())
# Type error: passing int to function expecting string list
process_items(123)
def get_user_data() -> Dict[str, str]:
"""Get user data"""
# Type error: returning wrong type
return ["user1", "user2"]
def calculate_total(numbers: List[int]) -> float:
"""Calculate total"""
# Type error: calling method that doesn't exist
return numbers.sum()
class User:
def __init__(self, name: str, age: int):
self.name = name
self.age = age
def create_user(name: str, age: int) -> User:
"""Create a user"""
# Type error: returning dict instead of User
return {"name": name, "age": age}
# Missing type annotations
def unsafe_function(x, y):
return x + y

View File

@@ -16,3 +16,8 @@ a2a-sdk[all]>=0.1.0
 
 # Fuzzing
 atheris>=2.3.0
+
+# SAST Tools
+bandit>=1.7.0
+pip-audit>=2.6.0
+mypy>=1.8.0