Add missing modules and workflow

This commit is contained in:
Tanguy Duhamel
2025-09-30 15:36:23 +02:00
parent 7382ea6e20
commit b1e13ec5d1
9 changed files with 1336 additions and 0 deletions

View File

@@ -0,0 +1,36 @@
"""
Secret Detection Modules
This package contains modules for detecting secrets, credentials, and sensitive information
in codebases and repositories.
Available modules:
- TruffleHog: Comprehensive secret detection with verification
- Gitleaks: Git-specific secret scanning and leak detection
"""
# Copyright (c) 2025 FuzzingLabs
#
# Licensed under the Business Source License 1.1 (BSL). See the LICENSE file
# at the root of this repository for details.
#
# After the Change Date (four years from publication), this version of the
# Licensed Work will be made available under the Apache License, Version 2.0.
# See the LICENSE-APACHE file or http://www.apache.org/licenses/LICENSE-2.0
#
# Additional attribution and requirements are provided in the NOTICE file.
from typing import List, Type
from ..base import BaseModule
# Module registry for automatic discovery: populated at import time by the
# @register_module class decorator on each module in this package.
SECRET_DETECTION_MODULES: List[Type[BaseModule]] = []


def register_module(module_class: Type[BaseModule]) -> Type[BaseModule]:
    """Register a secret detection module.

    Intended for use as a class decorator; appends *module_class* to the
    registry and returns it unchanged so the class definition is unaffected.
    """
    SECRET_DETECTION_MODULES.append(module_class)
    return module_class


def get_available_modules() -> List[Type[BaseModule]]:
    """Get all available secret detection modules.

    Returns a shallow copy so callers cannot mutate the registry.
    """
    return SECRET_DETECTION_MODULES.copy()

View File

@@ -0,0 +1,351 @@
"""
Gitleaks Secret Detection Module
This module uses Gitleaks to detect secrets and sensitive information in Git repositories
and file systems.
"""
# Copyright (c) 2025 FuzzingLabs
#
# Licensed under the Business Source License 1.1 (BSL). See the LICENSE file
# at the root of this repository for details.
#
# After the Change Date (four years from publication), this version of the
# Licensed Work will be made available under the Apache License, Version 2.0.
# See the LICENSE-APACHE file or http://www.apache.org/licenses/LICENSE-2.0
#
# Additional attribution and requirements are provided in the NOTICE file.
import asyncio
import json
from pathlib import Path
from typing import Dict, Any, List
import subprocess
import logging
from ..base import BaseModule, ModuleMetadata, ModuleFinding, ModuleResult
from . import register_module
logger = logging.getLogger(__name__)
@register_module
class GitleaksModule(BaseModule):
    """Gitleaks secret detection module.

    Wraps the external ``gitleaks`` CLI: builds a command line from the
    user-supplied configuration, runs it against the workspace, and converts
    the JSON report into :class:`ModuleFinding` objects.
    """

    def get_metadata(self) -> ModuleMetadata:
        """Get module metadata, including the input/output JSON schemas."""
        return ModuleMetadata(
            name="gitleaks",
            version="8.18.0",
            description="Git-specific secret scanning and leak detection using Gitleaks",
            author="FuzzForge Team",
            category="secret_detection",
            tags=["secrets", "git", "leak-detection", "credentials"],
            input_schema={
                "type": "object",
                "properties": {
                    "scan_mode": {
                        "type": "string",
                        "enum": ["detect", "protect"],
                        "default": "detect",
                        "description": "Scan mode: detect (entire repo history) or protect (staged changes)"
                    },
                    "config_file": {
                        "type": "string",
                        "description": "Path to custom Gitleaks configuration file"
                    },
                    "baseline_file": {
                        "type": "string",
                        "description": "Path to baseline file to ignore known findings"
                    },
                    "max_target_megabytes": {
                        "type": "integer",
                        "default": 100,
                        "description": "Maximum size of files to scan (in MB)"
                    },
                    "redact": {
                        "type": "boolean",
                        "default": True,
                        "description": "Redact secrets in output"
                    },
                    "no_git": {
                        "type": "boolean",
                        "default": False,
                        "description": "Scan files without Git context"
                    }
                }
            },
            output_schema={
                "type": "object",
                "properties": {
                    "findings": {
                        "type": "array",
                        "items": {
                            "type": "object",
                            "properties": {
                                "rule_id": {"type": "string"},
                                "category": {"type": "string"},
                                "file_path": {"type": "string"},
                                "line_number": {"type": "integer"},
                                "secret": {"type": "string"}
                            }
                        }
                    }
                }
            }
        )

    def validate_config(self, config: Dict[str, Any]) -> bool:
        """Validate configuration.

        Raises:
            ValueError: if ``scan_mode`` or ``max_target_megabytes`` is out
                of range.
        """
        scan_mode = config.get("scan_mode", "detect")
        if scan_mode not in ["detect", "protect"]:
            raise ValueError("scan_mode must be 'detect' or 'protect'")
        max_size = config.get("max_target_megabytes", 100)
        if not isinstance(max_size, int) or max_size < 1 or max_size > 1000:
            raise ValueError("max_target_megabytes must be between 1 and 1000")
        return True

    async def execute(self, config: Dict[str, Any], workspace: Path) -> ModuleResult:
        """Execute Gitleaks secret detection.

        Args:
            config: Module configuration (see ``input_schema``).
            workspace: Directory to scan.

        Returns:
            ModuleResult with status "success" and parsed findings, or
            status "failed" carrying the error message.
        """
        import os
        import tempfile

        self.start_timer()
        try:
            # Validate inputs
            self.validate_config(config)
            self.validate_workspace(workspace)
            logger.info(f"Running Gitleaks on {workspace}")

            # Build Gitleaks command
            scan_mode = config.get("scan_mode", "detect")
            cmd = ["gitleaks", scan_mode]
            cmd.extend(["--source", str(workspace)])

            # Gitleaks writes its JSON report to a file rather than stdout,
            # so create a temp file to receive it (delete=False: the CLI
            # re-opens it by path).
            output_file = tempfile.NamedTemporaryFile(mode='w+', suffix='.json', delete=False)
            output_path = output_file.name
            output_file.close()

            cmd.extend(["--report-format", "json"])
            cmd.extend(["--report-path", output_path])

            # Redact secret values in the report unless explicitly disabled
            if config.get("redact", True):
                cmd.append("--redact")

            max_size = config.get("max_target_megabytes", 100)
            cmd.extend(["--max-target-megabytes", str(max_size)])

            # Optional custom rules file: only passed when it actually exists
            if config.get("config_file"):
                config_path = Path(config["config_file"])
                if config_path.exists():
                    cmd.extend(["--config", str(config_path)])

            # Optional baseline of known/accepted findings
            if config.get("baseline_file"):
                baseline_path = Path(config["baseline_file"])
                if baseline_path.exists():
                    cmd.extend(["--baseline-path", str(baseline_path)])

            # Scan plain files without Git history
            if config.get("no_git", False):
                cmd.append("--no-git")

            cmd.append("--verbose")
            logger.debug(f"Running command: {' '.join(cmd)}")

            # Run Gitleaks
            process = await asyncio.create_subprocess_exec(
                *cmd,
                stdout=asyncio.subprocess.PIPE,
                stderr=asyncio.subprocess.PIPE,
                cwd=workspace
            )
            stdout, stderr = await process.communicate()

            # Parse results
            findings = []
            try:
                # Read the JSON report written by the CLI
                with open(output_path, 'r') as f:
                    output_content = f.read()
                # Gitleaks exit codes: 0 = clean, 1 = leaks found, >1 = error
                if process.returncode == 0:
                    logger.info("No secrets detected by Gitleaks")
                elif process.returncode == 1:
                    findings = self._parse_gitleaks_output(output_content, workspace)
                else:
                    error_msg = stderr.decode()
                    logger.error(f"Gitleaks failed: {error_msg}")
                    return self.create_result(
                        findings=[],
                        status="failed",
                        error=f"Gitleaks execution failed: {error_msg}"
                    )
            finally:
                # Best-effort cleanup of the temp report. Only swallow
                # filesystem errors: a bare `except:` here would also hide
                # KeyboardInterrupt/SystemExit.
                try:
                    os.unlink(output_path)
                except OSError:
                    pass

            # Create summary
            summary = {
                "total_leaks": len(findings),
                "unique_rules": len(set(f.metadata.get("rule_id", "") for f in findings)),
                "files_with_leaks": len(set(f.file_path for f in findings if f.file_path)),
                "scan_mode": scan_mode
            }
            logger.info(f"Gitleaks found {len(findings)} potential leaks")
            return self.create_result(
                findings=findings,
                status="success",
                summary=summary
            )
        except Exception as e:
            # Top-level boundary: convert any failure into a failed result
            logger.error(f"Gitleaks module failed: {e}")
            return self.create_result(
                findings=[],
                status="failed",
                error=str(e)
            )

    def _parse_gitleaks_output(self, output: str, workspace: Path) -> List[ModuleFinding]:
        """Parse Gitleaks JSON report content into findings.

        Malformed output is logged and yields an empty/partial list rather
        than raising.
        """
        findings = []
        if not output.strip():
            return findings
        try:
            # Gitleaks emits a single JSON array of leak records
            results = json.loads(output)
            if not isinstance(results, list):
                logger.warning("Unexpected Gitleaks output format")
                return findings
            for result in results:
                # Extract leak information (keys follow the Gitleaks report schema)
                rule_id = result.get("RuleID", "unknown")
                description = result.get("Description", "")
                file_path = result.get("File", "")
                line_number = result.get("LineNumber", 0)
                secret = result.get("Secret", "")
                match_text = result.get("Match", "")
                # Commit info (empty when scanning without Git context)
                commit = result.get("Commit", "")
                author = result.get("Author", "")
                email = result.get("Email", "")
                date = result.get("Date", "")
                # Report workspace-relative paths where possible
                if file_path:
                    try:
                        rel_path = Path(file_path).relative_to(workspace)
                        file_path = str(rel_path)
                    except ValueError:
                        # File outside the workspace: keep the absolute path
                        pass
                severity = self._get_leak_severity(rule_id, description)
                finding = self.create_finding(
                    title=f"Secret leak detected: {rule_id}",
                    description=self._get_leak_description(rule_id, description, commit),
                    severity=severity,
                    category="secret_leak",
                    file_path=file_path if file_path else None,
                    line_start=line_number if line_number > 0 else None,
                    code_snippet=match_text if match_text else secret,
                    recommendation=self._get_leak_recommendation(rule_id),
                    metadata={
                        "rule_id": rule_id,
                        "secret_type": description,
                        "commit": commit,
                        "author": author,
                        "email": email,
                        "date": date,
                        "entropy": result.get("Entropy", 0),
                        "fingerprint": result.get("Fingerprint", "")
                    }
                )
                findings.append(finding)
        except json.JSONDecodeError as e:
            logger.warning(f"Failed to parse Gitleaks output: {e}")
        except Exception as e:
            logger.warning(f"Error processing Gitleaks results: {e}")
        return findings

    def _get_leak_severity(self, rule_id: str, description: str) -> str:
        """Map a rule id/description onto a coarse severity level.

        Cloud-provider credentials rank critical; keys and passwords high;
        other recognized secret keywords medium; everything else low.
        """
        critical_patterns = [
            "aws", "amazon", "gcp", "google", "azure", "microsoft",
            "private_key", "rsa", "ssh", "certificate", "database",
            "password", "auth", "token", "secret", "key"
        ]
        rule_lower = rule_id.lower()
        desc_lower = description.lower()
        for pattern in critical_patterns:
            if pattern in rule_lower or pattern in desc_lower:
                if any(x in rule_lower for x in ["aws", "gcp", "azure"]):
                    return "critical"
                elif any(x in rule_lower for x in ["private", "key", "password"]):
                    return "high"
                else:
                    return "medium"
        return "low"

    def _get_leak_description(self, rule_id: str, description: str, commit: str) -> str:
        """Build a human-readable description for a leak finding."""
        base_desc = f"Gitleaks detected a potential secret leak matching rule '{rule_id}'"
        if description:
            base_desc += f" ({description})"
        if commit:
            base_desc += f" in commit {commit[:8]}"
        base_desc += ". This may indicate sensitive information has been committed to version control."
        return base_desc

    def _get_leak_recommendation(self, rule_id: str) -> str:
        """Build a remediation recommendation, with extra urgency for cloud creds."""
        base_rec = "Remove the secret from the codebase and Git history. "
        if any(pattern in rule_id.lower() for pattern in ["aws", "gcp", "azure"]):
            base_rec += "Revoke the cloud credentials immediately and rotate them. "
        base_rec += "Consider using Git history rewriting tools (git-filter-branch, BFG) " \
                    "to remove sensitive data from commit history. Implement pre-commit hooks " \
                    "to prevent future secret commits."
        return base_rec

View File

@@ -0,0 +1,294 @@
"""
TruffleHog Secret Detection Module
This module uses TruffleHog to detect secrets, credentials, and sensitive information
with verification capabilities.
"""
# Copyright (c) 2025 FuzzingLabs
#
# Licensed under the Business Source License 1.1 (BSL). See the LICENSE file
# at the root of this repository for details.
#
# After the Change Date (four years from publication), this version of the
# Licensed Work will be made available under the Apache License, Version 2.0.
# See the LICENSE-APACHE file or http://www.apache.org/licenses/LICENSE-2.0
#
# Additional attribution and requirements are provided in the NOTICE file.
import asyncio
import json
import tempfile
from pathlib import Path
from typing import Dict, Any, List
import subprocess
import logging
from ..base import BaseModule, ModuleMetadata, ModuleFinding, ModuleResult
from . import register_module
logger = logging.getLogger(__name__)
@register_module
class TruffleHogModule(BaseModule):
    """TruffleHog secret detection module"""

    def get_metadata(self) -> ModuleMetadata:
        """Get module metadata"""
        return ModuleMetadata(
            name="trufflehog",
            version="3.63.2",
            description="Comprehensive secret detection with verification using TruffleHog",
            author="FuzzForge Team",
            category="secret_detection",
            tags=["secrets", "credentials", "sensitive-data", "verification"],
            input_schema={
                "type": "object",
                "properties": {
                    "verify": {
                        "type": "boolean",
                        "default": False,
                        "description": "Verify discovered secrets"
                    },
                    "include_detectors": {
                        "type": "array",
                        "items": {"type": "string"},
                        "description": "Specific detectors to include"
                    },
                    "exclude_detectors": {
                        "type": "array",
                        "items": {"type": "string"},
                        "description": "Specific detectors to exclude"
                    },
                    "max_depth": {
                        "type": "integer",
                        "default": 10,
                        "description": "Maximum directory depth to scan"
                    },
                    "concurrency": {
                        "type": "integer",
                        "default": 10,
                        "description": "Number of concurrent workers"
                    }
                }
            },
            output_schema={
                "type": "object",
                "properties": {
                    "findings": {
                        "type": "array",
                        "items": {
                            "type": "object",
                            "properties": {
                                "detector": {"type": "string"},
                                "verified": {"type": "boolean"},
                                "file_path": {"type": "string"},
                                "line": {"type": "integer"},
                                "secret": {"type": "string"}
                            }
                        }
                    }
                }
            }
        )

    def validate_config(self, config: Dict[str, Any]) -> bool:
        """Validate configuration bounds for concurrency and scan depth."""
        workers = config.get("concurrency", 10)
        if not (isinstance(workers, int) and 1 <= workers <= 50):
            raise ValueError("Concurrency must be between 1 and 50")
        depth = config.get("max_depth", 10)
        if not (isinstance(depth, int) and 1 <= depth <= 20):
            raise ValueError("Max depth must be between 1 and 20")
        return True

    async def execute(self, config: Dict[str, Any], workspace: Path) -> ModuleResult:
        """Execute TruffleHog secret detection"""
        self.start_timer()
        try:
            self.validate_config(config)
            self.validate_workspace(workspace)
            logger.info(f"Running TruffleHog on {workspace}")

            # Assemble the CLI invocation piece by piece
            command = ["trufflehog", "filesystem", str(workspace)]
            if config.get("verify", False):
                command.append("--verify")
            command += ["--json", "--no-update"]
            command += ["--concurrency", str(config.get("concurrency", 10))]
            command += ["--max-depth", str(config.get("max_depth", 10))]
            if config.get("include_detectors"):
                command += ["--include-detectors", ",".join(config["include_detectors"])]
            if config.get("exclude_detectors"):
                command += ["--exclude-detectors", ",".join(config["exclude_detectors"])]
            logger.debug(f"Running command: {' '.join(command)}")

            scanner = await asyncio.create_subprocess_exec(
                *command,
                stdout=asyncio.subprocess.PIPE,
                stderr=asyncio.subprocess.PIPE,
                cwd=workspace
            )
            stdout, stderr = await scanner.communicate()

            # Exit code 1 just signals that secrets were found;
            # anything else (besides 0) is a genuine failure.
            if scanner.returncode in (0, 1):
                findings = self._parse_trufflehog_output(stdout.decode(), workspace)
            else:
                error_msg = stderr.decode()
                logger.error(f"TruffleHog failed: {error_msg}")
                return self.create_result(
                    findings=[],
                    status="failed",
                    error=f"TruffleHog execution failed: {error_msg}"
                )

            summary = {
                "total_secrets": len(findings),
                "verified_secrets": len([f for f in findings if f.metadata.get("verified", False)]),
                "detectors_triggered": len({f.metadata.get("detector", "") for f in findings}),
                "files_with_secrets": len({f.file_path for f in findings if f.file_path})
            }
            logger.info(f"TruffleHog found {len(findings)} secrets")
            return self.create_result(
                findings=findings,
                status="success",
                summary=summary
            )
        except Exception as e:
            logger.error(f"TruffleHog module failed: {e}")
            return self.create_result(
                findings=[],
                status="failed",
                error=str(e)
            )

    def _parse_trufflehog_output(self, output: str, workspace: Path) -> List[ModuleFinding]:
        """Parse TruffleHog's line-delimited JSON output into findings."""
        findings: List[ModuleFinding] = []
        for raw_line in output.strip().split('\n'):
            if not raw_line.strip():
                continue
            try:
                record = json.loads(raw_line)
                detector = record.get("DetectorName", "unknown")
                verified = record.get("Verified", False)
                raw_secret = record.get("Raw", "")
                # Filesystem source details (file + line) live under SourceMetadata.Data
                fs_info = record.get("SourceMetadata", {}).get("Data", {}).get("Filesystem", {})
                file_path = fs_info.get("file", "")
                line_num = fs_info.get("line", 0)
                if file_path:
                    try:
                        file_path = str(Path(file_path).relative_to(workspace))
                    except ValueError:
                        # Outside the workspace: keep the absolute path
                        pass
                severity = self._get_secret_severity(detector, verified, raw_secret)
                findings.append(self.create_finding(
                    title=f"{detector} secret detected",
                    description=self._get_secret_description(detector, verified),
                    severity=severity,
                    category="secret_detection",
                    file_path=file_path if file_path else None,
                    line_start=line_num if line_num > 0 else None,
                    code_snippet=self._truncate_secret(raw_secret),
                    recommendation=self._get_secret_recommendation(detector, verified),
                    metadata={
                        "detector": detector,
                        "verified": verified,
                        "detector_type": record.get("DetectorType", ""),
                        "decoder_type": record.get("DecoderType", ""),
                        "structured_data": record.get("StructuredData", {})
                    }
                ))
            except json.JSONDecodeError as e:
                logger.warning(f"Failed to parse TruffleHog output line: {e}")
                continue
            except Exception as e:
                logger.warning(f"Error processing TruffleHog result: {e}")
                continue
        return findings

    def _get_secret_severity(self, detector: str, verified: bool, secret: str) -> str:
        """Rank a secret by detector type and verification status."""
        name = detector.lower()
        if verified:
            # A live, verified credential is always at least high severity
            for crit in ("aws", "gcp", "azure", "github", "gitlab", "database"):
                if crit in name:
                    return "critical"
            return "high"
        for risky in ("private_key", "certificate", "password", "token"):
            if risky in name:
                return "medium"
        return "low"

    def _get_secret_description(self, detector: str, verified: bool) -> str:
        """Get description for the secret finding"""
        verification_status = "verified and active" if verified else "unverified"
        return (
            f"A {detector} secret was detected and is {verification_status}. "
            f"This may represent a security risk if the credential is valid."
        )

    def _get_secret_recommendation(self, detector: str, verified: bool) -> str:
        """Get remediation recommendation"""
        if not verified:
            return (
                f"Review this {detector} secret to determine if it's valid. "
                f"If real, revoke the credential and remove it from the codebase. "
                f"Consider implementing secret scanning in CI/CD pipelines."
            )
        return (
            f"IMMEDIATE ACTION REQUIRED: This {detector} secret is verified and active. "
            f"Revoke the credential immediately, remove it from the codebase, and "
            f"implement proper secret management practices."
        )

    def _truncate_secret(self, secret: str, max_length: int = 50) -> str:
        """Truncate secret for display purposes"""
        return secret if len(secret) <= max_length else secret[:max_length] + "..."

View File

@@ -0,0 +1,47 @@
# Secret Detection Workflow Dockerfile
FROM prefecthq/prefect:3-python3.11

# Install system dependencies
RUN apt-get update && apt-get install -y \
    curl \
    wget \
    git \
    ca-certificates \
    gnupg \
    && rm -rf /var/lib/apt/lists/*

# Install TruffleHog (use direct pinned binary download to avoid install script issues)
RUN curl -sSfL "https://github.com/trufflesecurity/trufflehog/releases/download/v3.63.2/trufflehog_3.63.2_linux_amd64.tar.gz" -o trufflehog.tar.gz \
    && tar -xzf trufflehog.tar.gz \
    && mv trufflehog /usr/local/bin/ \
    && rm trufflehog.tar.gz

# Install Gitleaks (use specific version to avoid API rate limiting)
RUN wget https://github.com/gitleaks/gitleaks/releases/download/v8.18.2/gitleaks_8.18.2_linux_x64.tar.gz \
    && tar -xzf gitleaks_8.18.2_linux_x64.tar.gz \
    && mv gitleaks /usr/local/bin/ \
    && rm gitleaks_8.18.2_linux_x64.tar.gz

# Fail the build early if either scanner is missing or broken
RUN trufflehog --version && gitleaks version

# Working directory for workflow execution (set once; previously declared twice)
WORKDIR /opt/prefect

# Create toolbox directory structure. The toolbox code is mounted at runtime
# from the backend container and includes:
# - /opt/prefect/toolbox/modules/base.py
# - /opt/prefect/toolbox/modules/secret_detection/ (TruffleHog, Gitleaks modules)
# - /opt/prefect/toolbox/modules/reporter/ (SARIF reporter)
# - /opt/prefect/toolbox/workflows/comprehensive/secret_detection_scan/
RUN mkdir -p /opt/prefect/toolbox
VOLUME /opt/prefect/toolbox

# Environment variables for module discovery
ENV PYTHONPATH=/opt/prefect/toolbox:/opt/prefect/toolbox/workflows
ENV WORKFLOW_NAME=secret_detection_scan

View File

@@ -0,0 +1,58 @@
# Secret Detection Workflow Dockerfile - Self-Contained Version
# This version copies all required modules into the image for complete isolation
FROM prefecthq/prefect:3-python3.11

# Install system dependencies
RUN apt-get update && apt-get install -y \
    curl \
    wget \
    git \
    ca-certificates \
    gnupg \
    && rm -rf /var/lib/apt/lists/*

# Install TruffleHog (pinned binary download; the install script fetches
# "latest", which is not reproducible and has broken builds before)
RUN curl -sSfL "https://github.com/trufflesecurity/trufflehog/releases/download/v3.63.2/trufflehog_3.63.2_linux_amd64.tar.gz" -o trufflehog.tar.gz \
    && tar -xzf trufflehog.tar.gz \
    && mv trufflehog /usr/local/bin/ \
    && rm trufflehog.tar.gz

# Install Gitleaks (pinned version; "latest" downloads are subject to
# GitHub API rate limiting and breaking changes)
RUN wget https://github.com/gitleaks/gitleaks/releases/download/v8.18.2/gitleaks_8.18.2_linux_x64.tar.gz \
    && tar -xzf gitleaks_8.18.2_linux_x64.tar.gz \
    && mv gitleaks /usr/local/bin/ \
    && rm gitleaks_8.18.2_linux_x64.tar.gz

# Verify installations
RUN trufflehog --version && gitleaks version

# Set working directory
WORKDIR /opt/prefect

# Create directory structure
RUN mkdir -p /opt/prefect/toolbox/modules/secret_detection \
    /opt/prefect/toolbox/modules/reporter \
    /opt/prefect/toolbox/workflows/comprehensive/secret_detection_scan

# Copy the base module and required modules
COPY toolbox/modules/base.py /opt/prefect/toolbox/modules/base.py
COPY toolbox/modules/__init__.py /opt/prefect/toolbox/modules/__init__.py
COPY toolbox/modules/secret_detection/ /opt/prefect/toolbox/modules/secret_detection/
COPY toolbox/modules/reporter/ /opt/prefect/toolbox/modules/reporter/

# Copy the workflow code
COPY toolbox/workflows/comprehensive/secret_detection_scan/ /opt/prefect/toolbox/workflows/comprehensive/secret_detection_scan/

# Copy toolbox init files
COPY toolbox/__init__.py /opt/prefect/toolbox/__init__.py
COPY toolbox/workflows/__init__.py /opt/prefect/toolbox/workflows/__init__.py
COPY toolbox/workflows/comprehensive/__init__.py /opt/prefect/toolbox/workflows/comprehensive/__init__.py

# Install Python dependencies for the modules.
# NOTE: do NOT pip-install "asyncio" -- asyncio is part of the standard
# library; the PyPI package of that name is a stale Python 3.3 backport
# that can shadow the stdlib module and break Python 3.11.
RUN pip install --no-cache-dir pydantic

# Set environment variables
ENV PYTHONPATH=/opt/prefect/toolbox:/opt/prefect/toolbox/workflows
ENV WORKFLOW_NAME=secret_detection_scan

# Set default command (can be overridden)
CMD ["python", "-m", "toolbox.workflows.comprehensive.secret_detection_scan.workflow"]

View File

@@ -0,0 +1,130 @@
# Secret Detection Scan Workflow
This workflow performs comprehensive secret detection using multiple industry-standard tools:
- **TruffleHog**: Comprehensive secret detection with verification capabilities
- **Gitleaks**: Git-specific secret scanning and leak detection
## Features
- **Parallel Execution**: Runs TruffleHog and Gitleaks concurrently for faster results
- **Deduplication**: Automatically removes duplicate findings across tools
- **SARIF Output**: Generates standardized SARIF reports for integration with security tools
- **Configurable**: Supports extensive configuration for both tools
## Dependencies
### Required Modules
- `toolbox.modules.secret_detection.trufflehog`
- `toolbox.modules.secret_detection.gitleaks`
- `toolbox.modules.reporter` (SARIF reporter)
- `toolbox.modules.base` (Base module interface)
### External Tools
- TruffleHog v3.63.2+
- Gitleaks v8.18.0+
## Docker Deployment
This workflow provides two Docker deployment approaches:
### 1. Volume-Based Approach (Default: `Dockerfile`)
**Advantages:**
- Live code updates without rebuilding images
- Smaller image sizes
- Consistent module versions across workflows
- Faster development iteration
**How it works:**
- Docker image contains only external tools (TruffleHog, Gitleaks)
- Python modules are mounted at runtime from the backend container
- Backend manages code synchronization via shared volumes
### 2. Self-Contained Approach (`Dockerfile.self-contained`)
**Advantages:**
- Complete isolation and reproducibility
- No runtime dependencies on backend code
- Can run independently of FuzzForge platform
- Better for CI/CD integration
**How it works:**
- All required Python modules are copied into the Docker image
- Image is completely self-contained
- Larger image size but fully portable
## Configuration
### TruffleHog Configuration
```json
{
"trufflehog_config": {
"verify": true, // Verify discovered secrets
"concurrency": 10, // Number of concurrent workers
"max_depth": 10, // Maximum directory depth
"include_detectors": [], // Specific detectors to include
"exclude_detectors": [] // Specific detectors to exclude
}
}
```
### Gitleaks Configuration
```json
{
"gitleaks_config": {
"scan_mode": "detect", // "detect" or "protect"
"redact": true, // Redact secrets in output
"max_target_megabytes": 100, // Maximum file size (MB)
"no_git": false, // Scan without Git context
"config_file": "", // Custom Gitleaks config
"baseline_file": "" // Baseline file for known findings
}
}
```
## Usage Example
```bash
curl -X POST "http://localhost:8000/workflows/secret_detection_scan/submit" \
-H "Content-Type: application/json" \
-d '{
"target_path": "/path/to/scan",
"volume_mode": "ro",
"parameters": {
"trufflehog_config": {
"verify": true,
"concurrency": 15
},
"gitleaks_config": {
"scan_mode": "detect",
"max_target_megabytes": 200
}
}
}'
```
## Output Format
The workflow generates a SARIF report containing:
- All unique findings from both tools
- Severity levels mapped to standard scale
- File locations and line numbers
- Detailed descriptions and recommendations
- Tool-specific metadata
## Performance Considerations
- **TruffleHog**: CPU-intensive with verification enabled
- **Gitleaks**: Memory-intensive for large repositories
- **Recommended Resources**: 512Mi memory, 500m CPU
- **Typical Runtime**: 1-5 minutes for small repos, 10-30 minutes for large ones
## Security Notes
- Secrets are redacted in output by default
- Verified secrets are marked with higher severity
- Both tools support custom rules and exclusions
- Consider using baseline files for known false positives

View File

@@ -0,0 +1,17 @@
"""
Secret Detection Scan Workflow
This package contains the comprehensive secret detection workflow that combines
multiple secret detection tools for thorough analysis.
"""
# Copyright (c) 2025 FuzzingLabs
#
# Licensed under the Business Source License 1.1 (BSL). See the LICENSE file
# at the root of this repository for details.
#
# After the Change Date (four years from publication), this version of the
# Licensed Work will be made available under the Apache License, Version 2.0.
# See the LICENSE-APACHE file or http://www.apache.org/licenses/LICENSE-2.0
#
# Additional attribution and requirements are provided in the NOTICE file.

View File

@@ -0,0 +1,113 @@
name: secret_detection_scan
version: "2.0.0"
description: "Comprehensive secret detection using TruffleHog and Gitleaks"
author: "FuzzForge Team"
category: "comprehensive"
tags:
- "secrets"
- "credentials"
- "detection"
- "trufflehog"
- "gitleaks"
- "comprehensive"
supported_volume_modes:
- "ro"
- "rw"
default_volume_mode: "ro"
default_target_path: "/workspace"
requirements:
tools:
- "trufflehog"
- "gitleaks"
resources:
memory: "512Mi"
cpu: "500m"
timeout: 1800
has_docker: true
default_parameters:
target_path: "/workspace"
volume_mode: "ro"
trufflehog_config: {}
gitleaks_config: {}
reporter_config: {}
parameters:
type: object
properties:
target_path:
type: string
default: "/workspace"
description: "Path to analyze"
volume_mode:
type: string
enum: ["ro", "rw"]
default: "ro"
description: "Volume mount mode"
trufflehog_config:
type: object
description: "TruffleHog configuration"
properties:
verify:
type: boolean
description: "Verify discovered secrets"
concurrency:
type: integer
description: "Number of concurrent workers"
max_depth:
type: integer
description: "Maximum directory depth to scan"
include_detectors:
type: array
items:
type: string
description: "Specific detectors to include"
exclude_detectors:
type: array
items:
type: string
description: "Specific detectors to exclude"
gitleaks_config:
type: object
description: "Gitleaks configuration"
properties:
scan_mode:
type: string
enum: ["detect", "protect"]
description: "Scan mode"
redact:
type: boolean
description: "Redact secrets in output"
max_target_megabytes:
type: integer
description: "Maximum file size to scan (MB)"
no_git:
type: boolean
description: "Scan files without Git context"
config_file:
type: string
description: "Path to custom configuration file"
baseline_file:
type: string
description: "Path to baseline file"
reporter_config:
type: object
description: "SARIF reporter configuration"
properties:
output_file:
type: string
description: "Output SARIF file name"
include_code_flows:
type: boolean
description: "Include code flow information"
output_schema:
type: object
properties:
sarif:
type: object
description: "SARIF-formatted security findings"

View File

@@ -0,0 +1,290 @@
"""
Secret Detection Scan Workflow
This workflow performs comprehensive secret detection using multiple tools:
- TruffleHog: Comprehensive secret detection with verification
- Gitleaks: Git-specific secret scanning
"""
# Copyright (c) 2025 FuzzingLabs
#
# Licensed under the Business Source License 1.1 (BSL). See the LICENSE file
# at the root of this repository for details.
#
# After the Change Date (four years from publication), this version of the
# Licensed Work will be made available under the Apache License, Version 2.0.
# See the LICENSE-APACHE file or http://www.apache.org/licenses/LICENSE-2.0
#
# Additional attribution and requirements are provided in the NOTICE file.
import sys
import logging
from pathlib import Path
from typing import Dict, Any, List, Optional
from prefect import flow, task
from prefect.artifacts import create_markdown_artifact, create_table_artifact
import asyncio
import json
# Add modules to path
sys.path.insert(0, '/app')
# Import modules
from toolbox.modules.secret_detection.trufflehog import TruffleHogModule
from toolbox.modules.secret_detection.gitleaks import GitleaksModule
from toolbox.modules.reporter import SARIFReporter
# Configure logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
@task(name="trufflehog_scan")
async def run_trufflehog_task(workspace: Path, config: Dict[str, Any]) -> Dict[str, Any]:
    """
    Task to run TruffleHog secret detection.

    Args:
        workspace: Path to the workspace
        config: TruffleHog configuration

    Returns:
        TruffleHog results as a plain dict
    """
    logger.info("Running TruffleHog secret detection")
    scanner = TruffleHogModule()
    outcome = await scanner.execute(config, workspace)
    logger.info(f"TruffleHog completed: {outcome.summary.get('total_secrets', 0)} secrets found")
    return outcome.dict()
@task(name="gitleaks_scan")
async def run_gitleaks_task(workspace: Path, config: Dict[str, Any]) -> Dict[str, Any]:
    """
    Task to run Gitleaks secret detection.

    Args:
        workspace: Path to the workspace
        config: Gitleaks configuration

    Returns:
        Gitleaks results as a plain dict
    """
    logger.info("Running Gitleaks secret detection")
    scanner = GitleaksModule()
    outcome = await scanner.execute(config, workspace)
    logger.info(f"Gitleaks completed: {outcome.summary.get('total_leaks', 0)} leaks found")
    return outcome.dict()
@task(name="aggregate_findings")
async def aggregate_findings_task(
    trufflehog_results: Dict[str, Any],
    gitleaks_results: Dict[str, Any],
    config: Dict[str, Any],
    workspace: Path
) -> Dict[str, Any]:
    """
    Task to aggregate findings from all secret detection tools.

    Merges TruffleHog and Gitleaks findings, drops duplicates (same file,
    line and title prefix), and renders the remainder as a SARIF report.

    Args:
        trufflehog_results: Results from TruffleHog
        gitleaks_results: Results from Gitleaks
        config: Reporter configuration
        workspace: Path to workspace

    Returns:
        Aggregated SARIF report
    """
    logger.info("Aggregating secret detection findings")

    # Pool findings from both tools, TruffleHog first
    all_findings = list(trufflehog_results.get("findings", []))
    all_findings.extend(gitleaks_results.get("findings", []))

    # Deduplicate on (file, line, first 50 chars of lowercased title),
    # keeping the first occurrence of each signature
    seen_signatures = set()
    unique_findings = []
    for finding in all_findings:
        signature = (
            finding.get("file_path", ""),
            finding.get("line_start", 0),
            finding.get("title", "").lower()[:50]
        )
        if signature in seen_signatures:
            logger.debug(f"Deduplicated finding: {signature}")
            continue
        seen_signatures.add(signature)
        unique_findings.append(finding)

    logger.info(f"Aggregated {len(unique_findings)} unique findings from {len(all_findings)} total")

    # Render the merged findings as a SARIF document
    reporter = SARIFReporter()
    reporter_config = {
        **config,
        "findings": unique_findings,
        "tool_name": "FuzzForge Secret Detection",
        "tool_version": "1.0.0",
        "tool_description": "Comprehensive secret detection using TruffleHog and Gitleaks"
    }
    result = await reporter.execute(reporter_config, workspace)
    return result.dict().get("sarif", {})
@flow(name="secret_detection_scan", log_prints=True)
async def main_flow(
    target_path: str = "/workspace",
    volume_mode: str = "ro",
    trufflehog_config: Optional[Dict[str, Any]] = None,
    gitleaks_config: Optional[Dict[str, Any]] = None,
    reporter_config: Optional[Dict[str, Any]] = None
) -> Dict[str, Any]:
    """
    Main secret detection workflow.

    This workflow:
    1. Runs TruffleHog for comprehensive secret detection
    2. Runs Gitleaks for Git-specific secret detection
    3. Aggregates and deduplicates findings
    4. Generates a unified SARIF report

    Args:
        target_path: Path to the mounted workspace (default: /workspace)
        volume_mode: Volume mount mode (ro/rw); only logged here — the flow
            itself never writes to the workspace
        trufflehog_config: Configuration for TruffleHog (merged over defaults)
        gitleaks_config: Configuration for Gitleaks (merged over defaults)
        reporter_config: Configuration for SARIF reporter (merged over defaults)

    Returns:
        SARIF-formatted findings report; an ``{"error": ..., "sarif": None}``
        payload when the workspace is missing; or a SARIF error document with
        ``executionSuccessful: False`` when a phase raises
    """
    logger.info("Starting comprehensive secret detection workflow")
    logger.info(f"Workspace: {target_path}, Mode: {volume_mode}")

    # Set workspace path and fail fast if the mount is absent
    workspace = Path(target_path)
    if not workspace.exists():
        logger.error(f"Workspace does not exist: {workspace}")
        return {
            "error": f"Workspace not found: {workspace}",
            "sarif": None
        }

    # Default configurations - merge with provided configs so that defaults
    # are always applied and callers only need to override what they change.
    default_trufflehog_config = {
        "verify": False,
        "concurrency": 10,
        "max_depth": 10,
        "no_git": True  # Add no_git for filesystem scanning
    }
    trufflehog_config = {**default_trufflehog_config, **(trufflehog_config or {})}
    default_gitleaks_config = {
        "scan_mode": "detect",
        "redact": True,
        "max_target_megabytes": 100,
        "no_git": True  # Critical for non-git directories
    }
    gitleaks_config = {**default_gitleaks_config, **(gitleaks_config or {})}
    default_reporter_config = {
        "include_code_flows": False
    }
    reporter_config = {**default_reporter_config, **(reporter_config or {})}

    try:
        # Phase 1: run both secret detection tools concurrently
        logger.info("Phase 1: Running secret detection tools")
        trufflehog_task_result = run_trufflehog_task(workspace, trufflehog_config)
        gitleaks_task_result = run_gitleaks_task(workspace, gitleaks_config)
        # return_exceptions=True so one tool failing does not abort the other
        trufflehog_results, gitleaks_results = await asyncio.gather(
            trufflehog_task_result,
            gitleaks_task_result,
            return_exceptions=True
        )

        # Degrade gracefully: a failed tool contributes zero findings
        if isinstance(trufflehog_results, Exception):
            logger.error(f"TruffleHog failed: {trufflehog_results}")
            trufflehog_results = {"findings": [], "status": "failed"}
        if isinstance(gitleaks_results, Exception):
            logger.error(f"Gitleaks failed: {gitleaks_results}")
            gitleaks_results = {"findings": [], "status": "failed"}

        # Phase 2: merge, deduplicate, and render to SARIF
        logger.info("Phase 2: Aggregating findings")
        sarif_report = await aggregate_findings_task(
            trufflehog_results,
            gitleaks_results,
            reporter_config,
            workspace
        )

        # Log summary. Guard against "runs" being present but empty —
        # indexing runs[0] unconditionally would raise IndexError.
        runs = sarif_report.get("runs") if sarif_report else None
        if runs:
            results_count = len(runs[0].get("results", []))
            logger.info(f"Workflow completed successfully with {results_count} unique secret findings")
            # Log tool-specific stats
            trufflehog_count = len(trufflehog_results.get("findings", []))
            gitleaks_count = len(gitleaks_results.get("findings", []))
            logger.info(f"Tool results - TruffleHog: {trufflehog_count}, Gitleaks: {gitleaks_count}")
        else:
            logger.info("Workflow completed successfully with no findings")
        return sarif_report

    except Exception as e:
        logger.error(f"Secret detection workflow failed: {e}")
        # Return error in SARIF format so consumers always get a valid document
        return {
            "$schema": "https://raw.githubusercontent.com/oasis-tcs/sarif-spec/master/Schemata/sarif-schema-2.1.0.json",
            "version": "2.1.0",
            "runs": [
                {
                    "tool": {
                        "driver": {
                            "name": "FuzzForge Secret Detection",
                            "version": "1.0.0"
                        }
                    },
                    "results": [],
                    "invocations": [
                        {
                            "executionSuccessful": False,
                            "exitCode": 1,
                            "exitCodeDescription": str(e)
                        }
                    ]
                }
            ]
        }
if __name__ == "__main__":
    # For local testing: run the flow directly against a sample directory.
    # asyncio is already imported at module level; the redundant local
    # `import asyncio` has been removed.
    asyncio.run(main_flow(
        target_path="/tmp/test",
        trufflehog_config={"verify": True, "max_depth": 5},
        gitleaks_config={"scan_mode": "detect"}
    ))