Files
fuzzforge_ai/backend/toolbox/workflows/trufflehog_detection/activities.py
tduhamel42 2da986ebb0 feat: Add secret detection workflows and comprehensive benchmarking (#15)
Add three production-ready secret detection workflows with full benchmarking infrastructure:

**New Workflows:**
- gitleaks_detection: Pattern-based secret scanning (13/32 benchmark secrets)
- trufflehog_detection: Entropy-based detection with verification (1/32 benchmark secrets)
- llm_secret_detection: AI-powered semantic analysis (32/32 benchmark secrets - 100% recall)

**Benchmarking Infrastructure:**
- Ground truth dataset with 32 documented secrets (12 Easy, 10 Medium, 10 Hard)
- Automated comparison tools for precision/recall testing
- SARIF output format for all workflows
- Performance metrics and tool comparison reports

**Fixes:**
- Set gitleaks default to no_git=True for uploaded directories
- Update documentation with correct secret counts and workflow names
- Temporarily deactivate AI agent command
- Clean up deprecated test files and GitGuardian workflow

**Testing:**
All workflows verified on secret_detection_benchmark and vulnerable_app test projects.
Workers healthy and system fully functional.
2025-10-16 11:21:24 +02:00

112 lines
3.6 KiB
Python

"""TruffleHog Detection Workflow Activities"""
import logging
from pathlib import Path
from typing import Dict, Any
from temporalio import activity
try:
from toolbox.modules.secret_detection.trufflehog import TruffleHogModule
except ImportError:
from modules.secret_detection.trufflehog import TruffleHogModule
@activity.defn(name="scan_with_trufflehog")
async def scan_with_trufflehog(target_path: str, config: Dict[str, Any]) -> Dict[str, Any]:
    """
    Run the TruffleHog module against a workspace and return its findings.

    Args:
        target_path: Path of the workspace to scan
        config: Module configuration, passed to validate_config() and execute()

    Returns:
        Dictionary with "findings" (each finding dumped to a plain dict via
        model_dump()) and "summary" (taken from the module result)

    Raises:
        RuntimeError: If the module result reports the "failed" status
    """
    activity.logger.info(f"Starting TruffleHog scan: {target_path}")

    scan_root = Path(target_path)
    scanner = TruffleHogModule()

    # Validation runs before execute() so the scan is not started with a
    # configuration the module rejects (assumes validate_config raises on
    # invalid input - confirm against the module).
    scanner.validate_config(config)
    result = await scanner.execute(config, scan_root)

    if result.status == "failed":
        raise RuntimeError(f"TruffleHog scan failed: {result.error}")

    # Findings are dumped to plain dicts before being returned from the activity.
    return {
        "findings": [item.model_dump() for item in result.findings],
        "summary": result.summary,
    }
@activity.defn(name="trufflehog_generate_sarif")
async def trufflehog_generate_sarif(findings: list, metadata: Dict[str, Any]) -> Dict[str, Any]:
    """
    Generate SARIF report from TruffleHog findings.

    Args:
        findings: List of finding dictionaries (None is treated as empty)
        metadata: Metadata including tool_name, tool_version (None is treated
            as empty, so the built-in defaults are used)

    Returns:
        SARIF report dictionary
    """
    # Tolerate None for either argument instead of failing on len()/.get().
    findings = findings or []
    metadata = metadata or {}

    activity.logger.info(f"Generating SARIF report from {len(findings)} findings")

    # Basic SARIF 2.1.0 structure
    sarif_report = {
        "version": "2.1.0",
        "$schema": "https://raw.githubusercontent.com/oasis-tcs/sarif-spec/master/Schemata/sarif-schema-2.1.0.json",
        "runs": [
            {
                "tool": {
                    "driver": {
                        "name": metadata.get("tool_name", "trufflehog"),
                        "version": metadata.get("tool_version", "3.63.2"),
                        "informationUri": "https://github.com/trufflesecurity/trufflehog"
                    }
                },
                # Convert findings to SARIF results
                "results": [_finding_to_sarif_result(finding) for finding in findings]
            }
        ]
    }

    activity.logger.info(f"Generated SARIF report with {len(sarif_report['runs'][0]['results'])} results")
    return sarif_report


def _finding_to_sarif_result(finding: Dict[str, Any]) -> Dict[str, Any]:
    """Convert a single finding dictionary into a SARIF result object."""
    # A key may be present with a None value, in which case
    # `.get(key, default)` returns None rather than the default. Using
    # `or` keeps the fallbacks effective for both missing and None values.
    finding_metadata = finding.get("metadata") or {}

    sarif_result = {
        "ruleId": finding_metadata.get("detector") or "unknown",
        "level": _severity_to_sarif_level(finding.get("severity") or "warning"),
        "message": {
            "text": finding.get("title") or "Secret detected"
        },
        "locations": []
    }

    # Add description if present
    description = finding.get("description")
    if description:
        sarif_result["message"]["markdown"] = description

    # Add location if file path is present
    file_path = finding.get("file_path")
    if file_path:
        physical_location = {
            "artifactLocation": {
                "uri": file_path
            }
        }

        # Add region if line number is present (a missing, None or 0 line
        # number produces no region)
        line_start = finding.get("line_start")
        if line_start:
            physical_location["region"] = {
                "startLine": line_start
            }

        sarif_result["locations"].append({"physicalLocation": physical_location})

    return sarif_result
def _severity_to_sarif_level(severity: str) -> str:
"""Convert severity to SARIF level"""
severity_map = {
"critical": "error",
"high": "error",
"medium": "warning",
"low": "note",
"info": "note"
}
return severity_map.get(severity.lower(), "warning")