mirror of
https://github.com/FuzzingLabs/fuzzforge_ai.git
synced 2026-02-13 23:12:48 +00:00
Add three production-ready secret detection workflows with full benchmarking infrastructure: **New Workflows:** - gitleaks_detection: Pattern-based secret scanning (13/32 benchmark secrets) - trufflehog_detection: Entropy-based detection with verification (1/32 benchmark secrets) - llm_secret_detection: AI-powered semantic analysis (32/32 benchmark secrets - 100% recall) **Benchmarking Infrastructure:** - Ground truth dataset with 32 documented secrets (12 Easy, 10 Medium, 10 Hard) - Automated comparison tools for precision/recall testing - SARIF output format for all workflows - Performance metrics and tool comparison reports **Fixes:** - Set gitleaks default to no_git=True for uploaded directories - Update documentation with correct secret counts and workflow names - Temporarily deactivate AI agent command - Clean up deprecated test files and GitGuardian workflow **Testing:** All workflows verified on secret_detection_benchmark and vulnerable_app test projects. Workers healthy and system fully functional.
353 lines
13 KiB
Python
353 lines
13 KiB
Python
"""
|
|
Gitleaks Secret Detection Module
|
|
|
|
This module uses Gitleaks to detect secrets and sensitive information in Git repositories
|
|
and file systems.
|
|
"""
|
|
# Copyright (c) 2025 FuzzingLabs
|
|
#
|
|
# Licensed under the Business Source License 1.1 (BSL). See the LICENSE file
|
|
# at the root of this repository for details.
|
|
#
|
|
# After the Change Date (four years from publication), this version of the
|
|
# Licensed Work will be made available under the Apache License, Version 2.0.
|
|
# See the LICENSE-APACHE file or http://www.apache.org/licenses/LICENSE-2.0
|
|
#
|
|
# Additional attribution and requirements are provided in the NOTICE file.
|
|
|
|
|
|
import asyncio
|
|
import json
|
|
from pathlib import Path
|
|
from typing import Dict, Any, List
|
|
import subprocess
|
|
import logging
|
|
|
|
from ..base import BaseModule, ModuleMetadata, ModuleFinding, ModuleResult
|
|
from . import register_module
|
|
|
|
logger = logging.getLogger(__name__)
|
|
|
|
|
|
@register_module
|
|
class GitleaksModule(BaseModule):
|
|
"""Gitleaks secret detection module"""
|
|
|
|
def get_metadata(self) -> ModuleMetadata:
|
|
"""Get module metadata"""
|
|
return ModuleMetadata(
|
|
name="gitleaks",
|
|
version="8.18.0",
|
|
description="Git-specific secret scanning and leak detection using Gitleaks",
|
|
author="FuzzForge Team",
|
|
category="secret_detection",
|
|
tags=["secrets", "git", "leak-detection", "credentials"],
|
|
input_schema={
|
|
"type": "object",
|
|
"properties": {
|
|
"scan_mode": {
|
|
"type": "string",
|
|
"enum": ["detect", "protect"],
|
|
"default": "detect",
|
|
"description": "Scan mode: detect (entire repo history) or protect (staged changes)"
|
|
},
|
|
"config_file": {
|
|
"type": "string",
|
|
"description": "Path to custom Gitleaks configuration file"
|
|
},
|
|
"baseline_file": {
|
|
"type": "string",
|
|
"description": "Path to baseline file to ignore known findings"
|
|
},
|
|
"max_target_megabytes": {
|
|
"type": "integer",
|
|
"default": 100,
|
|
"description": "Maximum size of files to scan (in MB)"
|
|
},
|
|
"redact": {
|
|
"type": "boolean",
|
|
"default": True,
|
|
"description": "Redact secrets in output"
|
|
},
|
|
"no_git": {
|
|
"type": "boolean",
|
|
"default": False,
|
|
"description": "Scan files without Git context"
|
|
}
|
|
}
|
|
},
|
|
output_schema={
|
|
"type": "object",
|
|
"properties": {
|
|
"findings": {
|
|
"type": "array",
|
|
"items": {
|
|
"type": "object",
|
|
"properties": {
|
|
"rule_id": {"type": "string"},
|
|
"category": {"type": "string"},
|
|
"file_path": {"type": "string"},
|
|
"line_number": {"type": "integer"},
|
|
"secret": {"type": "string"}
|
|
}
|
|
}
|
|
}
|
|
}
|
|
}
|
|
)
|
|
|
|
def validate_config(self, config: Dict[str, Any]) -> bool:
|
|
"""Validate configuration"""
|
|
scan_mode = config.get("scan_mode", "detect")
|
|
if scan_mode not in ["detect", "protect"]:
|
|
raise ValueError("scan_mode must be 'detect' or 'protect'")
|
|
|
|
max_size = config.get("max_target_megabytes", 100)
|
|
if not isinstance(max_size, int) or max_size < 1 or max_size > 1000:
|
|
raise ValueError("max_target_megabytes must be between 1 and 1000")
|
|
|
|
return True
|
|
|
|
async def execute(self, config: Dict[str, Any], workspace: Path) -> ModuleResult:
|
|
"""Execute Gitleaks secret detection"""
|
|
self.start_timer()
|
|
|
|
try:
|
|
# Validate inputs
|
|
self.validate_config(config)
|
|
self.validate_workspace(workspace)
|
|
|
|
logger.info(f"Running Gitleaks on {workspace}")
|
|
|
|
# Build Gitleaks command
|
|
scan_mode = config.get("scan_mode", "detect")
|
|
cmd = ["gitleaks", scan_mode]
|
|
|
|
# Add source path
|
|
cmd.extend(["--source", str(workspace)])
|
|
|
|
# Create temp file for JSON output
|
|
import tempfile
|
|
output_file = tempfile.NamedTemporaryFile(mode='w+', suffix='.json', delete=False)
|
|
output_path = output_file.name
|
|
output_file.close()
|
|
|
|
# Add report format and output file
|
|
cmd.extend(["--report-format", "json"])
|
|
cmd.extend(["--report-path", output_path])
|
|
|
|
# Add redact option
|
|
if config.get("redact", True):
|
|
cmd.append("--redact")
|
|
|
|
# Add max target size
|
|
max_size = config.get("max_target_megabytes", 100)
|
|
cmd.extend(["--max-target-megabytes", str(max_size)])
|
|
|
|
# Add config file if specified
|
|
if config.get("config_file"):
|
|
config_path = Path(config["config_file"])
|
|
if config_path.exists():
|
|
cmd.extend(["--config", str(config_path)])
|
|
|
|
# Add baseline file if specified
|
|
if config.get("baseline_file"):
|
|
baseline_path = Path(config["baseline_file"])
|
|
if baseline_path.exists():
|
|
cmd.extend(["--baseline-path", str(baseline_path)])
|
|
|
|
# Add no-git flag if specified
|
|
if config.get("no_git", False):
|
|
cmd.append("--no-git")
|
|
|
|
# Add verbose output
|
|
cmd.append("--verbose")
|
|
|
|
logger.debug(f"Running command: {' '.join(cmd)}")
|
|
|
|
# Run Gitleaks
|
|
process = await asyncio.create_subprocess_exec(
|
|
*cmd,
|
|
stdout=asyncio.subprocess.PIPE,
|
|
stderr=asyncio.subprocess.PIPE,
|
|
cwd=workspace
|
|
)
|
|
|
|
stdout, stderr = await process.communicate()
|
|
|
|
# Parse results
|
|
findings = []
|
|
try:
|
|
# Read the JSON output from file
|
|
with open(output_path, 'r') as f:
|
|
output_content = f.read()
|
|
|
|
if process.returncode == 0:
|
|
# No secrets found
|
|
logger.info("No secrets detected by Gitleaks")
|
|
elif process.returncode == 1:
|
|
# Secrets found - parse from file content
|
|
findings = self._parse_gitleaks_output(output_content, workspace)
|
|
else:
|
|
# Error occurred
|
|
error_msg = stderr.decode()
|
|
logger.error(f"Gitleaks failed: {error_msg}")
|
|
return self.create_result(
|
|
findings=[],
|
|
status="failed",
|
|
error=f"Gitleaks execution failed: {error_msg}"
|
|
)
|
|
finally:
|
|
# Clean up temp file
|
|
import os
|
|
try:
|
|
os.unlink(output_path)
|
|
except:
|
|
pass
|
|
|
|
# Create summary
|
|
summary = {
|
|
"total_leaks": len(findings),
|
|
"unique_rules": len(set(f.metadata.get("rule_id", "") for f in findings)),
|
|
"files_with_leaks": len(set(f.file_path for f in findings if f.file_path)),
|
|
"scan_mode": scan_mode
|
|
}
|
|
|
|
logger.info(f"Gitleaks found {len(findings)} potential leaks")
|
|
|
|
return self.create_result(
|
|
findings=findings,
|
|
status="success",
|
|
summary=summary
|
|
)
|
|
|
|
except Exception as e:
|
|
logger.error(f"Gitleaks module failed: {e}")
|
|
return self.create_result(
|
|
findings=[],
|
|
status="failed",
|
|
error=str(e)
|
|
)
|
|
|
|
def _parse_gitleaks_output(self, output: str, workspace: Path) -> List[ModuleFinding]:
|
|
"""Parse Gitleaks JSON output into findings"""
|
|
findings = []
|
|
|
|
if not output.strip():
|
|
return findings
|
|
|
|
try:
|
|
# Gitleaks outputs JSON array
|
|
results = json.loads(output)
|
|
if not isinstance(results, list):
|
|
logger.warning("Unexpected Gitleaks output format")
|
|
return findings
|
|
|
|
for result in results:
|
|
# Extract information
|
|
rule_id = result.get("RuleID", "unknown")
|
|
description = result.get("Description", "")
|
|
file_path = result.get("File", "")
|
|
line_number = result.get("StartLine", 0) # Gitleaks outputs "StartLine", not "LineNumber"
|
|
line_end = result.get("EndLine", 0)
|
|
secret = result.get("Secret", "")
|
|
match_text = result.get("Match", "")
|
|
|
|
# Commit info (if available)
|
|
commit = result.get("Commit", "")
|
|
author = result.get("Author", "")
|
|
email = result.get("Email", "")
|
|
date = result.get("Date", "")
|
|
|
|
# Make file path relative to workspace
|
|
if file_path:
|
|
try:
|
|
rel_path = Path(file_path).relative_to(workspace)
|
|
file_path = str(rel_path)
|
|
except ValueError:
|
|
# If file is outside workspace, keep absolute path
|
|
pass
|
|
|
|
# Determine severity based on rule type
|
|
severity = self._get_leak_severity(rule_id, description)
|
|
|
|
# Create finding
|
|
finding = self.create_finding(
|
|
title=f"Secret leak detected: {rule_id}",
|
|
description=self._get_leak_description(rule_id, description, commit),
|
|
severity=severity,
|
|
category="secret_leak",
|
|
file_path=file_path if file_path else None,
|
|
line_start=line_number if line_number > 0 else None,
|
|
line_end=line_end if line_end > 0 else None,
|
|
code_snippet=match_text if match_text else secret,
|
|
recommendation=self._get_leak_recommendation(rule_id),
|
|
metadata={
|
|
"rule_id": rule_id,
|
|
"secret_type": description,
|
|
"commit": commit,
|
|
"author": author,
|
|
"email": email,
|
|
"date": date,
|
|
"entropy": result.get("Entropy", 0),
|
|
"fingerprint": result.get("Fingerprint", "")
|
|
}
|
|
)
|
|
|
|
findings.append(finding)
|
|
|
|
except json.JSONDecodeError as e:
|
|
logger.warning(f"Failed to parse Gitleaks output: {e}")
|
|
except Exception as e:
|
|
logger.warning(f"Error processing Gitleaks results: {e}")
|
|
|
|
return findings
|
|
|
|
def _get_leak_severity(self, rule_id: str, description: str) -> str:
|
|
"""Determine severity based on secret type"""
|
|
critical_patterns = [
|
|
"aws", "amazon", "gcp", "google", "azure", "microsoft",
|
|
"private_key", "rsa", "ssh", "certificate", "database",
|
|
"password", "auth", "token", "secret", "key"
|
|
]
|
|
|
|
rule_lower = rule_id.lower()
|
|
desc_lower = description.lower()
|
|
|
|
# Check for critical patterns
|
|
for pattern in critical_patterns:
|
|
if pattern in rule_lower or pattern in desc_lower:
|
|
if any(x in rule_lower for x in ["aws", "gcp", "azure"]):
|
|
return "critical"
|
|
elif any(x in rule_lower for x in ["private", "key", "password"]):
|
|
return "high"
|
|
else:
|
|
return "medium"
|
|
|
|
return "low"
|
|
|
|
def _get_leak_description(self, rule_id: str, description: str, commit: str) -> str:
|
|
"""Get description for the leak finding"""
|
|
base_desc = f"Gitleaks detected a potential secret leak matching rule '{rule_id}'"
|
|
if description:
|
|
base_desc += f" ({description})"
|
|
|
|
if commit:
|
|
base_desc += f" in commit {commit[:8]}"
|
|
|
|
base_desc += ". This may indicate sensitive information has been committed to version control."
|
|
|
|
return base_desc
|
|
|
|
def _get_leak_recommendation(self, rule_id: str) -> str:
|
|
"""Get remediation recommendation"""
|
|
base_rec = "Remove the secret from the codebase and Git history. "
|
|
|
|
if any(pattern in rule_id.lower() for pattern in ["aws", "gcp", "azure"]):
|
|
base_rec += "Revoke the cloud credentials immediately and rotate them. "
|
|
|
|
base_rec += "Consider using Git history rewriting tools (git-filter-branch, BFG) " \
|
|
"to remove sensitive data from commit history. Implement pre-commit hooks " \
|
|
"to prevent future secret commits."
|
|
|
|
return base_rec |