Files
fuzzforge_ai/backend/toolbox/modules/scanner/dependency_scanner.py
2025-10-22 15:36:35 +02:00

303 lines
10 KiB
Python

"""
Dependency Scanner Module - Scans Python dependencies for known vulnerabilities using pip-audit
"""
# Copyright (c) 2025 FuzzingLabs
#
# Licensed under the Business Source License 1.1 (BSL). See the LICENSE file
# at the root of this repository for details.
#
# After the Change Date (four years from publication), this version of the
# Licensed Work will be made available under the Apache License, Version 2.0.
# See the LICENSE-APACHE file or http://www.apache.org/licenses/LICENSE-2.0
#
# Additional attribution and requirements are provided in the NOTICE file.
import asyncio
import json
import logging
import time
from pathlib import Path
from typing import Dict, Any, List
try:
from toolbox.modules.base import BaseModule, ModuleMetadata, ModuleResult, ModuleFinding
except ImportError:
try:
from modules.base import BaseModule, ModuleMetadata, ModuleResult, ModuleFinding
except ImportError:
from src.toolbox.modules.base import BaseModule, ModuleMetadata, ModuleResult, ModuleFinding
logger = logging.getLogger(__name__)
class DependencyScanner(BaseModule):
"""
Scans Python dependencies for known vulnerabilities using pip-audit.
This module:
- Discovers dependency files (requirements.txt, pyproject.toml, setup.py, Pipfile)
- Runs pip-audit to check for vulnerable dependencies
- Reports CVEs with severity and affected versions
"""
def get_metadata(self) -> ModuleMetadata:
"""Get module metadata"""
return ModuleMetadata(
name="dependency_scanner",
version="1.0.0",
description="Scans Python dependencies for known vulnerabilities",
author="FuzzForge Team",
category="scanner",
tags=["dependencies", "cve", "vulnerabilities", "pip-audit"],
input_schema={
"dependency_files": {
"type": "array",
"items": {"type": "string"},
"description": "List of dependency files to scan (auto-discovered if empty)",
"default": []
},
"ignore_vulns": {
"type": "array",
"items": {"type": "string"},
"description": "List of vulnerability IDs to ignore",
"default": []
}
},
output_schema={
"findings": {
"type": "array",
"description": "List of vulnerable dependencies with CVE information"
}
},
requires_workspace=True
)
def validate_config(self, config: Dict[str, Any]) -> bool:
"""Validate module configuration"""
dep_files = config.get("dependency_files", [])
if not isinstance(dep_files, list):
raise ValueError("dependency_files must be a list")
ignore_vulns = config.get("ignore_vulns", [])
if not isinstance(ignore_vulns, list):
raise ValueError("ignore_vulns must be a list")
return True
def _discover_dependency_files(self, workspace: Path) -> List[Path]:
"""
Discover Python dependency files in workspace.
Returns:
List of discovered dependency file paths
"""
dependency_patterns = [
"requirements.txt",
"*requirements*.txt",
"pyproject.toml",
"setup.py",
"Pipfile",
"poetry.lock"
]
found_files = []
for pattern in dependency_patterns:
found_files.extend(workspace.rglob(pattern))
# Deduplicate and return
unique_files = list(set(found_files))
logger.info(f"Discovered {len(unique_files)} dependency files")
return unique_files
async def _run_pip_audit(self, file_path: Path) -> Dict[str, Any]:
"""
Run pip-audit on a specific dependency file.
Args:
file_path: Path to dependency file
Returns:
pip-audit JSON output as dict
"""
try:
# Run pip-audit with JSON output
cmd = [
"pip-audit",
"--requirement", str(file_path),
"--format", "json",
"--progress-spinner", "off"
]
logger.info(f"Running pip-audit on: {file_path.name}")
process = await asyncio.create_subprocess_exec(
*cmd,
stdout=asyncio.subprocess.PIPE,
stderr=asyncio.subprocess.PIPE
)
stdout, stderr = await process.communicate()
# pip-audit returns 0 if no vulns, 1 if vulns found
if process.returncode not in [0, 1]:
logger.error(f"pip-audit failed: {stderr.decode()}")
return {"dependencies": []}
# Parse JSON output
result = json.loads(stdout.decode())
return result
except Exception as e:
logger.error(f"Error running pip-audit on {file_path}: {e}")
return {"dependencies": []}
def _convert_to_findings(
self,
audit_result: Dict[str, Any],
file_path: Path,
workspace: Path,
ignore_vulns: List[str]
) -> List[ModuleFinding]:
"""
Convert pip-audit results to ModuleFindings.
Args:
audit_result: pip-audit JSON output
file_path: Path to scanned file
workspace: Workspace path for relative path calculation
ignore_vulns: List of vulnerability IDs to ignore
Returns:
List of ModuleFindings
"""
findings = []
# pip-audit format: {"dependencies": [{package, version, vulns: []}]}
for dep in audit_result.get("dependencies", []):
package_name = dep.get("name", "unknown")
package_version = dep.get("version", "unknown")
vulnerabilities = dep.get("vulns", [])
for vuln in vulnerabilities:
vuln_id = vuln.get("id", "UNKNOWN")
# Skip if in ignore list
if vuln_id in ignore_vulns:
logger.debug(f"Ignoring vulnerability: {vuln_id}")
continue
description = vuln.get("description", "No description available")
fix_versions = vuln.get("fix_versions", [])
# Map CVSS scores to severity
# pip-audit doesn't always provide CVSS, so we default to medium
severity = "medium"
# Try to get relative path
try:
rel_path = file_path.relative_to(workspace)
except ValueError:
rel_path = file_path
recommendation = f"Upgrade {package_name} to a fixed version: {', '.join(fix_versions)}" if fix_versions else f"Check for updates to {package_name}"
finding = self.create_finding(
title=f"Vulnerable dependency: {package_name} ({vuln_id})",
description=f"{description}\n\nAffected package: {package_name} {package_version}",
severity=severity,
category="vulnerable-dependency",
file_path=str(rel_path),
recommendation=recommendation,
metadata={
"cve_id": vuln_id,
"package": package_name,
"installed_version": package_version,
"fix_versions": fix_versions,
"aliases": vuln.get("aliases", []),
"link": vuln.get("link", "")
}
)
findings.append(finding)
return findings
async def execute(self, config: Dict[str, Any], workspace: Path) -> ModuleResult:
"""
Execute the dependency scanning module.
Args:
config: Module configuration
workspace: Path to workspace
Returns:
ModuleResult with vulnerability findings
"""
start_time = time.time()
metadata = self.get_metadata()
# Validate inputs
self.validate_config(config)
self.validate_workspace(workspace)
# Get configuration
specified_files = config.get("dependency_files", [])
ignore_vulns = config.get("ignore_vulns", [])
# Discover or use specified dependency files
if specified_files:
dep_files = [workspace / f for f in specified_files]
else:
dep_files = self._discover_dependency_files(workspace)
if not dep_files:
logger.warning("No dependency files found in workspace")
return ModuleResult(
module=metadata.name,
version=metadata.version,
status="success",
execution_time=time.time() - start_time,
findings=[],
summary={
"total_files": 0,
"total_vulnerabilities": 0,
"vulnerable_packages": 0
}
)
# Scan each dependency file
all_findings = []
files_scanned = 0
for dep_file in dep_files:
if not dep_file.exists():
logger.warning(f"Dependency file not found: {dep_file}")
continue
logger.info(f"Scanning dependencies in: {dep_file.name}")
audit_result = await self._run_pip_audit(dep_file)
findings = self._convert_to_findings(audit_result, dep_file, workspace, ignore_vulns)
all_findings.extend(findings)
files_scanned += 1
# Calculate summary
unique_packages = len(set(f.metadata.get("package") for f in all_findings))
execution_time = time.time() - start_time
return ModuleResult(
module=metadata.name,
version=metadata.version,
status="success",
execution_time=execution_time,
findings=all_findings,
summary={
"total_files": files_scanned,
"total_vulnerabilities": len(all_findings),
"vulnerable_packages": unique_packages
},
metadata={
"scanned_files": [str(f.name) for f in dep_files if f.exists()]
}
)