Files
tduhamel42 2da986ebb0 feat: Add secret detection workflows and comprehensive benchmarking (#15)
Add three production-ready secret detection workflows with full benchmarking infrastructure:

**New Workflows:**
- gitleaks_detection: Pattern-based secret scanning (13/32 benchmark secrets)
- trufflehog_detection: Entropy-based detection with verification (1/32 benchmark secrets)
- llm_secret_detection: AI-powered semantic analysis (32/32 benchmark secrets - 100% recall)

**Benchmarking Infrastructure:**
- Ground truth dataset with 32 documented secrets (12 Easy, 10 Medium, 10 Hard)
- Automated comparison tools for precision/recall testing
- SARIF output format for all workflows
- Performance metrics and tool comparison reports

**Fixes:**
- Set gitleaks default to no_git=True for uploaded directories
- Update documentation with correct secret counts and workflow names
- Temporarily deactivate AI agent command
- Clean up deprecated test files and GitGuardian workflow

**Testing:**
All workflows verified on secret_detection_benchmark and vulnerable_app test projects.
Workers healthy and system fully functional.
2025-10-16 11:21:24 +02:00

353 lines
13 KiB
Python

"""
Gitleaks Secret Detection Module
This module uses Gitleaks to detect secrets and sensitive information in Git repositories
and file systems.
"""
# Copyright (c) 2025 FuzzingLabs
#
# Licensed under the Business Source License 1.1 (BSL). See the LICENSE file
# at the root of this repository for details.
#
# After the Change Date (four years from publication), this version of the
# Licensed Work will be made available under the Apache License, Version 2.0.
# See the LICENSE-APACHE file or http://www.apache.org/licenses/LICENSE-2.0
#
# Additional attribution and requirements are provided in the NOTICE file.
import asyncio
import json
from pathlib import Path
from typing import Dict, Any, List
import subprocess
import logging
from ..base import BaseModule, ModuleMetadata, ModuleFinding, ModuleResult
from . import register_module
logger = logging.getLogger(__name__)
@register_module
class GitleaksModule(BaseModule):
"""Gitleaks secret detection module"""
def get_metadata(self) -> ModuleMetadata:
    """Return the static metadata (identity, tags, and I/O schemas) for this module."""
    # JSON-schema describing the accepted configuration keys.
    scan_options = {
        "scan_mode": {
            "type": "string",
            "enum": ["detect", "protect"],
            "default": "detect",
            "description": "Scan mode: detect (entire repo history) or protect (staged changes)"
        },
        "config_file": {
            "type": "string",
            "description": "Path to custom Gitleaks configuration file"
        },
        "baseline_file": {
            "type": "string",
            "description": "Path to baseline file to ignore known findings"
        },
        "max_target_megabytes": {
            "type": "integer",
            "default": 100,
            "description": "Maximum size of files to scan (in MB)"
        },
        "redact": {
            "type": "boolean",
            "default": True,
            "description": "Redact secrets in output"
        },
        "no_git": {
            "type": "boolean",
            "default": False,
            "description": "Scan files without Git context"
        }
    }
    # JSON-schema for a single reported finding.
    finding_properties = {
        "rule_id": {"type": "string"},
        "category": {"type": "string"},
        "file_path": {"type": "string"},
        "line_number": {"type": "integer"},
        "secret": {"type": "string"}
    }
    return ModuleMetadata(
        name="gitleaks",
        version="8.18.0",
        description="Git-specific secret scanning and leak detection using Gitleaks",
        author="FuzzForge Team",
        category="secret_detection",
        tags=["secrets", "git", "leak-detection", "credentials"],
        input_schema={
            "type": "object",
            "properties": scan_options
        },
        output_schema={
            "type": "object",
            "properties": {
                "findings": {
                    "type": "array",
                    "items": {
                        "type": "object",
                        "properties": finding_properties
                    }
                }
            }
        }
    )
def validate_config(self, config: Dict[str, Any]) -> bool:
"""Validate configuration"""
scan_mode = config.get("scan_mode", "detect")
if scan_mode not in ["detect", "protect"]:
raise ValueError("scan_mode must be 'detect' or 'protect'")
max_size = config.get("max_target_megabytes", 100)
if not isinstance(max_size, int) or max_size < 1 or max_size > 1000:
raise ValueError("max_target_megabytes must be between 1 and 1000")
return True
async def execute(self, config: Dict[str, Any], workspace: Path) -> ModuleResult:
    """Execute Gitleaks secret detection against the workspace.

    Builds the gitleaks CLI invocation from *config*, runs it as an async
    subprocess, parses the JSON report it writes, and returns a ModuleResult.

    Args:
        config: Module configuration (see get_metadata input_schema).
        workspace: Directory (or Git repository) to scan.

    Returns:
        ModuleResult with status "success" and the parsed findings, or
        status "failed" with an error message on any failure.
    """
    import contextlib
    import os
    import tempfile

    self.start_timer()
    try:
        # Validate inputs before spawning anything.
        self.validate_config(config)
        self.validate_workspace(workspace)
        logger.info(f"Running Gitleaks on {workspace}")

        scan_mode = config.get("scan_mode", "detect")

        # Gitleaks writes its JSON report to a file; use a temp file so
        # stdout/stderr remain available for log/diagnostic output.
        output_file = tempfile.NamedTemporaryFile(mode='w+', suffix='.json', delete=False)
        output_path = output_file.name
        output_file.close()

        cmd = self._build_gitleaks_command(config, workspace, scan_mode, output_path)
        logger.debug(f"Running command: {' '.join(cmd)}")

        process = await asyncio.create_subprocess_exec(
            *cmd,
            stdout=asyncio.subprocess.PIPE,
            stderr=asyncio.subprocess.PIPE,
            cwd=workspace
        )
        stdout, stderr = await process.communicate()

        findings = []
        try:
            with open(output_path, 'r') as f:
                output_content = f.read()
            # Gitleaks exit codes: 0 = no leaks, 1 = leaks found, >1 = error.
            if process.returncode == 0:
                logger.info("No secrets detected by Gitleaks")
            elif process.returncode == 1:
                findings = self._parse_gitleaks_output(output_content, workspace)
            else:
                error_msg = stderr.decode()
                logger.error(f"Gitleaks failed: {error_msg}")
                return self.create_result(
                    findings=[],
                    status="failed",
                    error=f"Gitleaks execution failed: {error_msg}"
                )
        finally:
            # Best-effort temp-file cleanup; only filesystem errors are
            # expected here, so suppress exactly those (not a bare except).
            with contextlib.suppress(OSError):
                os.unlink(output_path)

        summary = {
            "total_leaks": len(findings),
            "unique_rules": len(set(f.metadata.get("rule_id", "") for f in findings)),
            "files_with_leaks": len(set(f.file_path for f in findings if f.file_path)),
            "scan_mode": scan_mode
        }
        logger.info(f"Gitleaks found {len(findings)} potential leaks")
        return self.create_result(
            findings=findings,
            status="success",
            summary=summary
        )
    except Exception as e:
        logger.error(f"Gitleaks module failed: {e}")
        return self.create_result(
            findings=[],
            status="failed",
            error=str(e)
        )

def _build_gitleaks_command(self, config: Dict[str, Any], workspace: Path,
                            scan_mode: str, output_path: str) -> List[str]:
    """Assemble the gitleaks CLI argument list from the module configuration."""
    cmd = ["gitleaks", scan_mode, "--source", str(workspace)]
    cmd.extend(["--report-format", "json"])
    cmd.extend(["--report-path", output_path])
    if config.get("redact", True):
        cmd.append("--redact")
    cmd.extend(["--max-target-megabytes", str(config.get("max_target_megabytes", 100))])
    # Optional custom rules file; silently skipped if the path does not exist.
    if config.get("config_file"):
        config_path = Path(config["config_file"])
        if config_path.exists():
            cmd.extend(["--config", str(config_path)])
    # Optional baseline of known/accepted findings to suppress.
    if config.get("baseline_file"):
        baseline_path = Path(config["baseline_file"])
        if baseline_path.exists():
            cmd.extend(["--baseline-path", str(baseline_path)])
    # Scan plain directories (uploaded trees) without requiring Git history.
    if config.get("no_git", False):
        cmd.append("--no-git")
    cmd.append("--verbose")
    return cmd
def _parse_gitleaks_output(self, output: str, workspace: Path) -> List[ModuleFinding]:
"""Parse Gitleaks JSON output into findings"""
findings = []
if not output.strip():
return findings
try:
# Gitleaks outputs JSON array
results = json.loads(output)
if not isinstance(results, list):
logger.warning("Unexpected Gitleaks output format")
return findings
for result in results:
# Extract information
rule_id = result.get("RuleID", "unknown")
description = result.get("Description", "")
file_path = result.get("File", "")
line_number = result.get("StartLine", 0) # Gitleaks outputs "StartLine", not "LineNumber"
line_end = result.get("EndLine", 0)
secret = result.get("Secret", "")
match_text = result.get("Match", "")
# Commit info (if available)
commit = result.get("Commit", "")
author = result.get("Author", "")
email = result.get("Email", "")
date = result.get("Date", "")
# Make file path relative to workspace
if file_path:
try:
rel_path = Path(file_path).relative_to(workspace)
file_path = str(rel_path)
except ValueError:
# If file is outside workspace, keep absolute path
pass
# Determine severity based on rule type
severity = self._get_leak_severity(rule_id, description)
# Create finding
finding = self.create_finding(
title=f"Secret leak detected: {rule_id}",
description=self._get_leak_description(rule_id, description, commit),
severity=severity,
category="secret_leak",
file_path=file_path if file_path else None,
line_start=line_number if line_number > 0 else None,
line_end=line_end if line_end > 0 else None,
code_snippet=match_text if match_text else secret,
recommendation=self._get_leak_recommendation(rule_id),
metadata={
"rule_id": rule_id,
"secret_type": description,
"commit": commit,
"author": author,
"email": email,
"date": date,
"entropy": result.get("Entropy", 0),
"fingerprint": result.get("Fingerprint", "")
}
)
findings.append(finding)
except json.JSONDecodeError as e:
logger.warning(f"Failed to parse Gitleaks output: {e}")
except Exception as e:
logger.warning(f"Error processing Gitleaks results: {e}")
return findings
def _get_leak_severity(self, rule_id: str, description: str) -> str:
"""Determine severity based on secret type"""
critical_patterns = [
"aws", "amazon", "gcp", "google", "azure", "microsoft",
"private_key", "rsa", "ssh", "certificate", "database",
"password", "auth", "token", "secret", "key"
]
rule_lower = rule_id.lower()
desc_lower = description.lower()
# Check for critical patterns
for pattern in critical_patterns:
if pattern in rule_lower or pattern in desc_lower:
if any(x in rule_lower for x in ["aws", "gcp", "azure"]):
return "critical"
elif any(x in rule_lower for x in ["private", "key", "password"]):
return "high"
else:
return "medium"
return "low"
def _get_leak_description(self, rule_id: str, description: str, commit: str) -> str:
"""Get description for the leak finding"""
base_desc = f"Gitleaks detected a potential secret leak matching rule '{rule_id}'"
if description:
base_desc += f" ({description})"
if commit:
base_desc += f" in commit {commit[:8]}"
base_desc += ". This may indicate sensitive information has been committed to version control."
return base_desc
def _get_leak_recommendation(self, rule_id: str) -> str:
"""Get remediation recommendation"""
base_rec = "Remove the secret from the codebase and Git history. "
if any(pattern in rule_id.lower() for pattern in ["aws", "gcp", "azure"]):
base_rec += "Revoke the cloud credentials immediately and rotate them. "
base_rec += "Consider using Git history rewriting tools (git-filter-branch, BFG) " \
"to remove sensitive data from commit history. Implement pre-commit hooks " \
"to prevent future secret commits."
return base_rec