mirror of
https://github.com/FuzzingLabs/fuzzforge_ai.git
synced 2026-02-13 17:12:49 +00:00
303 lines
10 KiB
Python
303 lines
10 KiB
Python
"""
|
|
Dependency Scanner Module - Scans Python dependencies for known vulnerabilities using pip-audit
|
|
"""
|
|
|
|
# Copyright (c) 2025 FuzzingLabs
|
|
#
|
|
# Licensed under the Business Source License 1.1 (BSL). See the LICENSE file
|
|
# at the root of this repository for details.
|
|
#
|
|
# After the Change Date (four years from publication), this version of the
|
|
# Licensed Work will be made available under the Apache License, Version 2.0.
|
|
# See the LICENSE-APACHE file or http://www.apache.org/licenses/LICENSE-2.0
|
|
#
|
|
# Additional attribution and requirements are provided in the NOTICE file.
|
|
|
|
import asyncio
|
|
import json
|
|
import logging
|
|
import time
|
|
from pathlib import Path
|
|
from typing import Dict, Any, List
|
|
|
|
try:
|
|
from toolbox.modules.base import BaseModule, ModuleMetadata, ModuleResult, ModuleFinding
|
|
except ImportError:
|
|
try:
|
|
from modules.base import BaseModule, ModuleMetadata, ModuleResult, ModuleFinding
|
|
except ImportError:
|
|
from src.toolbox.modules.base import BaseModule, ModuleMetadata, ModuleResult, ModuleFinding
|
|
|
|
logger = logging.getLogger(__name__)
|
|
|
|
|
|
class DependencyScanner(BaseModule):
|
|
"""
|
|
Scans Python dependencies for known vulnerabilities using pip-audit.
|
|
|
|
This module:
|
|
- Discovers dependency files (requirements.txt, pyproject.toml, setup.py, Pipfile)
|
|
- Runs pip-audit to check for vulnerable dependencies
|
|
- Reports CVEs with severity and affected versions
|
|
"""
|
|
|
|
def get_metadata(self) -> ModuleMetadata:
|
|
"""Get module metadata"""
|
|
return ModuleMetadata(
|
|
name="dependency_scanner",
|
|
version="1.0.0",
|
|
description="Scans Python dependencies for known vulnerabilities",
|
|
author="FuzzForge Team",
|
|
category="scanner",
|
|
tags=["dependencies", "cve", "vulnerabilities", "pip-audit"],
|
|
input_schema={
|
|
"dependency_files": {
|
|
"type": "array",
|
|
"items": {"type": "string"},
|
|
"description": "List of dependency files to scan (auto-discovered if empty)",
|
|
"default": []
|
|
},
|
|
"ignore_vulns": {
|
|
"type": "array",
|
|
"items": {"type": "string"},
|
|
"description": "List of vulnerability IDs to ignore",
|
|
"default": []
|
|
}
|
|
},
|
|
output_schema={
|
|
"findings": {
|
|
"type": "array",
|
|
"description": "List of vulnerable dependencies with CVE information"
|
|
}
|
|
},
|
|
requires_workspace=True
|
|
)
|
|
|
|
def validate_config(self, config: Dict[str, Any]) -> bool:
|
|
"""Validate module configuration"""
|
|
dep_files = config.get("dependency_files", [])
|
|
if not isinstance(dep_files, list):
|
|
raise ValueError("dependency_files must be a list")
|
|
|
|
ignore_vulns = config.get("ignore_vulns", [])
|
|
if not isinstance(ignore_vulns, list):
|
|
raise ValueError("ignore_vulns must be a list")
|
|
|
|
return True
|
|
|
|
def _discover_dependency_files(self, workspace: Path) -> List[Path]:
|
|
"""
|
|
Discover Python dependency files in workspace.
|
|
|
|
Returns:
|
|
List of discovered dependency file paths
|
|
"""
|
|
dependency_patterns = [
|
|
"requirements.txt",
|
|
"*requirements*.txt",
|
|
"pyproject.toml",
|
|
"setup.py",
|
|
"Pipfile",
|
|
"poetry.lock"
|
|
]
|
|
|
|
found_files = []
|
|
for pattern in dependency_patterns:
|
|
found_files.extend(workspace.rglob(pattern))
|
|
|
|
# Deduplicate and return
|
|
unique_files = list(set(found_files))
|
|
logger.info(f"Discovered {len(unique_files)} dependency files")
|
|
return unique_files
|
|
|
|
async def _run_pip_audit(self, file_path: Path) -> Dict[str, Any]:
|
|
"""
|
|
Run pip-audit on a specific dependency file.
|
|
|
|
Args:
|
|
file_path: Path to dependency file
|
|
|
|
Returns:
|
|
pip-audit JSON output as dict
|
|
"""
|
|
try:
|
|
# Run pip-audit with JSON output
|
|
cmd = [
|
|
"pip-audit",
|
|
"--requirement", str(file_path),
|
|
"--format", "json",
|
|
"--progress-spinner", "off"
|
|
]
|
|
|
|
logger.info(f"Running pip-audit on: {file_path.name}")
|
|
process = await asyncio.create_subprocess_exec(
|
|
*cmd,
|
|
stdout=asyncio.subprocess.PIPE,
|
|
stderr=asyncio.subprocess.PIPE
|
|
)
|
|
|
|
stdout, stderr = await process.communicate()
|
|
|
|
# pip-audit returns 0 if no vulns, 1 if vulns found
|
|
if process.returncode not in [0, 1]:
|
|
logger.error(f"pip-audit failed: {stderr.decode()}")
|
|
return {"dependencies": []}
|
|
|
|
# Parse JSON output
|
|
result = json.loads(stdout.decode())
|
|
return result
|
|
|
|
except Exception as e:
|
|
logger.error(f"Error running pip-audit on {file_path}: {e}")
|
|
return {"dependencies": []}
|
|
|
|
def _convert_to_findings(
|
|
self,
|
|
audit_result: Dict[str, Any],
|
|
file_path: Path,
|
|
workspace: Path,
|
|
ignore_vulns: List[str]
|
|
) -> List[ModuleFinding]:
|
|
"""
|
|
Convert pip-audit results to ModuleFindings.
|
|
|
|
Args:
|
|
audit_result: pip-audit JSON output
|
|
file_path: Path to scanned file
|
|
workspace: Workspace path for relative path calculation
|
|
ignore_vulns: List of vulnerability IDs to ignore
|
|
|
|
Returns:
|
|
List of ModuleFindings
|
|
"""
|
|
findings = []
|
|
|
|
# pip-audit format: {"dependencies": [{package, version, vulns: []}]}
|
|
for dep in audit_result.get("dependencies", []):
|
|
package_name = dep.get("name", "unknown")
|
|
package_version = dep.get("version", "unknown")
|
|
vulnerabilities = dep.get("vulns", [])
|
|
|
|
for vuln in vulnerabilities:
|
|
vuln_id = vuln.get("id", "UNKNOWN")
|
|
|
|
# Skip if in ignore list
|
|
if vuln_id in ignore_vulns:
|
|
logger.debug(f"Ignoring vulnerability: {vuln_id}")
|
|
continue
|
|
|
|
description = vuln.get("description", "No description available")
|
|
fix_versions = vuln.get("fix_versions", [])
|
|
|
|
# Map CVSS scores to severity
|
|
# pip-audit doesn't always provide CVSS, so we default to medium
|
|
severity = "medium"
|
|
|
|
# Try to get relative path
|
|
try:
|
|
rel_path = file_path.relative_to(workspace)
|
|
except ValueError:
|
|
rel_path = file_path
|
|
|
|
recommendation = f"Upgrade {package_name} to a fixed version: {', '.join(fix_versions)}" if fix_versions else f"Check for updates to {package_name}"
|
|
|
|
finding = self.create_finding(
|
|
title=f"Vulnerable dependency: {package_name} ({vuln_id})",
|
|
description=f"{description}\n\nAffected package: {package_name} {package_version}",
|
|
severity=severity,
|
|
category="vulnerable-dependency",
|
|
file_path=str(rel_path),
|
|
recommendation=recommendation,
|
|
metadata={
|
|
"cve_id": vuln_id,
|
|
"package": package_name,
|
|
"installed_version": package_version,
|
|
"fix_versions": fix_versions,
|
|
"aliases": vuln.get("aliases", []),
|
|
"link": vuln.get("link", "")
|
|
}
|
|
)
|
|
findings.append(finding)
|
|
|
|
return findings
|
|
|
|
async def execute(self, config: Dict[str, Any], workspace: Path) -> ModuleResult:
|
|
"""
|
|
Execute the dependency scanning module.
|
|
|
|
Args:
|
|
config: Module configuration
|
|
workspace: Path to workspace
|
|
|
|
Returns:
|
|
ModuleResult with vulnerability findings
|
|
"""
|
|
start_time = time.time()
|
|
metadata = self.get_metadata()
|
|
|
|
# Validate inputs
|
|
self.validate_config(config)
|
|
self.validate_workspace(workspace)
|
|
|
|
# Get configuration
|
|
specified_files = config.get("dependency_files", [])
|
|
ignore_vulns = config.get("ignore_vulns", [])
|
|
|
|
# Discover or use specified dependency files
|
|
if specified_files:
|
|
dep_files = [workspace / f for f in specified_files]
|
|
else:
|
|
dep_files = self._discover_dependency_files(workspace)
|
|
|
|
if not dep_files:
|
|
logger.warning("No dependency files found in workspace")
|
|
return ModuleResult(
|
|
module=metadata.name,
|
|
version=metadata.version,
|
|
status="success",
|
|
execution_time=time.time() - start_time,
|
|
findings=[],
|
|
summary={
|
|
"total_files": 0,
|
|
"total_vulnerabilities": 0,
|
|
"vulnerable_packages": 0
|
|
}
|
|
)
|
|
|
|
# Scan each dependency file
|
|
all_findings = []
|
|
files_scanned = 0
|
|
|
|
for dep_file in dep_files:
|
|
if not dep_file.exists():
|
|
logger.warning(f"Dependency file not found: {dep_file}")
|
|
continue
|
|
|
|
logger.info(f"Scanning dependencies in: {dep_file.name}")
|
|
audit_result = await self._run_pip_audit(dep_file)
|
|
findings = self._convert_to_findings(audit_result, dep_file, workspace, ignore_vulns)
|
|
|
|
all_findings.extend(findings)
|
|
files_scanned += 1
|
|
|
|
# Calculate summary
|
|
unique_packages = len(set(f.metadata.get("package") for f in all_findings))
|
|
|
|
execution_time = time.time() - start_time
|
|
|
|
return ModuleResult(
|
|
module=metadata.name,
|
|
version=metadata.version,
|
|
status="success",
|
|
execution_time=execution_time,
|
|
findings=all_findings,
|
|
summary={
|
|
"total_files": files_scanned,
|
|
"total_vulnerabilities": len(all_findings),
|
|
"vulnerable_packages": unique_packages
|
|
},
|
|
metadata={
|
|
"scanned_files": [str(f.name) for f in dep_files if f.exists()]
|
|
}
|
|
)
|