first commit

This commit is contained in:
tmarschutz
2025-10-03 11:45:17 +02:00
parent 09821c1c43
commit 5da3f1e071
10571 changed files with 1386578 additions and 1 deletions
@@ -0,0 +1,25 @@
"""
Android Security Modules
This package contains modules for Android static code analysis and security testing.
Available modules:
- MobSF: Mobile Security Framework
- Jadx: Dex to Java decompiler
- OpenGrep: Open-source pattern-based static analysis tool
"""
from typing import List, Type
from ..base import BaseModule
# Module registry for automatic discovery
ANDROID_MODULES: List[Type[BaseModule]] = []
def register_module(module_class: Type[BaseModule]):
    """Register an Android security module class for discovery.

    Intended to be used as a class decorator. The class is added to
    ``ANDROID_MODULES`` and returned unchanged so the decorated name still
    refers to the original class.

    Registration is idempotent: decorating (or re-importing/reloading) the
    same class twice does not create a duplicate registry entry.
    """
    if module_class not in ANDROID_MODULES:
        ANDROID_MODULES.append(module_class)
    return module_class
def get_available_modules() -> List[Type[BaseModule]]:
    """Return a snapshot of the registered Android module classes.

    A new list is returned on every call, so callers may mutate the result
    without affecting the ``ANDROID_MODULES`` registry.
    """
    snapshot = list(ANDROID_MODULES)
    return snapshot
@@ -0,0 +1,15 @@
# Rule: clipboard-sensitive-data
# Flags every call to setPrimaryClip(...) on any receiver in Java sources.
# The pattern does not inspect what the clip contains, so each hit needs
# manual review to decide whether the copied data is actually sensitive.
rules:
- id: clipboard-sensitive-data
severity: WARNING
languages: [java]
message: "Sensitive data may be copied to the clipboard."
metadata:
authors:
- Guerric ELOI (FuzzingLabs)
category: security
area: clipboard
verification-level: [L1]
paths:
include:
- "**/*.java"
pattern: "$CLIPBOARD.setPrimaryClip($CLIP)"
@@ -0,0 +1,23 @@
# Rule: hardcoded-secrets
# Flags Java String declarations initialised with a string literal whose
# variable name looks secret-related (api, key, token, ...).
rules:
  - id: hardcoded-secrets
    severity: WARNING
    languages: [java]
    message: "Possible hardcoded secret found in variable '$NAME'."
    metadata:
      authors:
        - Guerric ELOI (FuzzingLabs)
      owasp-mobile: M2
      category: secrets
      verification-level: [L1]
    paths:
      include:
        - "**/*.java"
    patterns:
      - pattern-either:
          - pattern: 'String $NAME = "$VAL";'
          - pattern: 'final String $NAME = "$VAL";'
          - pattern: 'private String $NAME = "$VAL";'
          - pattern: 'public static String $NAME = "$VAL";'
          - pattern: 'static final String $NAME = "$VAL";'
      # The name filter must be a metavariable-regex: `pattern-regex` runs a
      # raw regex over the file text and does not understand `$NAME =~ /.../`,
      # so the previous form could never match and the rule never fired.
      - metavariable-regex:
          metavariable: $NAME
          regex: '(?i).*(api|key|token|secret|pass|auth|session|bearer|access|private).*'
@@ -0,0 +1,18 @@
# Rule: insecure-data-storage
# Matches any openFileOutput(name, mode) call and any call to
# Environment.getExternalStorageDirectory() in Java sources.
# NOTE(review): openFileOutput is matched regardless of the mode argument,
# while the message only mentions external storage - confirm this is intended.
rules:
- id: insecure-data-storage
severity: WARNING
languages: [java]
message: "Potential insecure data storage (external storage)."
metadata:
authors:
- Guerric ELOI (FuzzingLabs)
owasp-mobile: M2
category: security
area: storage
verification-level: [L1]
paths:
include:
- "**/*.java"
pattern-either:
- pattern: "$CTX.openFileOutput($NAME, $MODE)"
- pattern: "Environment.getExternalStorageDirectory()"
@@ -0,0 +1,16 @@
# Rule: insecure-deeplink
# Scoped to AndroidManifest.xml. The pattern is only the opening
# <intent-filter> tag; it expresses no condition on the filter's contents.
# NOTE(review): the pattern is an unclosed XML tag - confirm the engine
# accepts it as a valid XML pattern.
rules:
- id: insecure-deeplink
severity: WARNING
languages: [xml]
message: "Potential insecure deeplink found in intent-filter."
metadata:
authors:
- Guerric ELOI (FuzzingLabs)
category: component
area: manifest
verification-level: [L1]
paths:
include:
- "**/AndroidManifest.xml"
pattern: |
<intent-filter>
@@ -0,0 +1,21 @@
# Rule: insecure-logging
# Flags Log.d / Log.e / System.out.println calls whose message expression
# text mentions a sensitive-looking term (password, token, ...).
rules:
  - id: insecure-logging
    severity: WARNING
    languages: [java]
    message: "Sensitive data logged via Android Log API."
    metadata:
      authors:
        - Guerric ELOI (FuzzingLabs)
      owasp-mobile: M2
      category: logging
      verification-level: [L1]
    paths:
      include:
        - "**/*.java"
    patterns:
      - pattern-either:
          - pattern: "Log.d($TAG, $MSG)"
          - pattern: "Log.e($TAG, $MSG)"
          - pattern: "System.out.println($MSG)"
      # Filter on the text bound to $MSG. `pattern-regex` does not support the
      # `$MSG =~ /.../` form (it is matched as a raw regex against the file),
      # so metavariable-regex is required for this rule to ever match.
      - metavariable-regex:
          metavariable: $MSG
          regex: '(?i).*(password|token|secret|api|auth|session).*'
@@ -0,0 +1,15 @@
# Rule: intent-redirection
# Flags any getIntent().getExtras() call chain in Java sources. The pattern
# itself does not check whether the extras are validated afterwards, so hits
# are candidates for manual review.
rules:
- id: intent-redirection
severity: WARNING
languages: [java]
message: "Potential intent redirection: using getIntent().getExtras() without validation."
metadata:
authors:
- Guerric ELOI (FuzzingLabs)
category: intent
area: intercomponent
verification-level: [L1]
paths:
include:
- "**/*.java"
pattern: "$ACT.getIntent().getExtras()"
@@ -0,0 +1,18 @@
# Rule: sensitive-data-in-shared-preferences
# Flags putString(key, value) calls whose key expression text looks like it
# names sensitive data (password, token, email, ...).
rules:
  - id: sensitive-data-in-shared-preferences
    severity: WARNING
    languages: [java]
    message: "Sensitive data may be stored in SharedPreferences. Please review the key '$KEY'."
    metadata:
      authors:
        - Guerric ELOI (FuzzingLabs)
      owasp-mobile: M2
      category: security
      area: storage
      verification-level: [L1]
    paths:
      include:
        - "**/*.java"
    patterns:
      - pattern: "$EDITOR.putString($KEY, $VAL);"
      # Filter on the text bound to $KEY. The earlier `pattern-regex` with a
      # `$KEY =~ /.../` string was evaluated as a raw file regex and could
      # never match, which silently disabled the rule.
      - metavariable-regex:
          metavariable: $KEY
          regex: '(?i).*(username|password|pass|token|auth_token|api_key|secret|sessionid|email).*'
@@ -0,0 +1,21 @@
# Rule: sqlite-injection
# Flags rawQuery / execSQL calls whose query argument text contains a string
# literal followed by a `+` concatenation.
rules:
  - id: sqlite-injection
    severity: ERROR
    languages: [java]
    message: "Possible SQL injection: concatenated input in rawQuery or execSQL."
    metadata:
      authors:
        - Guerric ELOI (FuzzingLabs)
      owasp-mobile: M7
      category: injection
      area: database
      verification-level: [L1]
    paths:
      include:
        - "**/*.java"
    patterns:
      - pattern-either:
          - pattern: "$DB.rawQuery($QUERY, ...)"
          - pattern: "$DB.execSQL($QUERY)"
      # Same regex as before (quote ... quote ... plus), now applied to the
      # text bound to $QUERY. `pattern-regex` cannot reference metavariables
      # with `$QUERY =~ /.../`, so the previous form never matched.
      - metavariable-regex:
          metavariable: $QUERY
          regex: '.*".*".*\+.*'
@@ -0,0 +1,16 @@
# Rule: vulnerable-activity
# Scoped to AndroidManifest.xml; matches on an <activity> tag carrying
# android:exported="true". The pattern has no condition on
# android:permission, although the message says "without permission".
# NOTE(review): the pattern is an unterminated tag fragment - confirm the
# engine accepts it as a valid XML pattern.
rules:
- id: vulnerable-activity
severity: WARNING
languages: [xml]
message: "Activity exported without permission."
metadata:
authors:
- Guerric ELOI (FuzzingLabs)
category: component
area: manifest
verification-level: [L1]
paths:
include:
- "**/AndroidManifest.xml"
pattern: |
<activity android:exported="true"
@@ -0,0 +1,16 @@
# Rule: vulnerable-content-provider
# Scoped to AndroidManifest.xml; matches on a <provider> tag carrying
# android:exported="true". The pattern has no condition on
# android:permission, although the message says "without permission".
# NOTE(review): the pattern is an unterminated tag fragment - confirm the
# engine accepts it as a valid XML pattern.
rules:
- id: vulnerable-content-provider
severity: WARNING
languages: [xml]
message: "ContentProvider exported without permission."
metadata:
authors:
- Guerric ELOI (FuzzingLabs)
category: component
area: manifest
verification-level: [L1]
paths:
include:
- "**/AndroidManifest.xml"
pattern: |
<provider android:exported="true"
@@ -0,0 +1,16 @@
# Rule: vulnerable-service
# Scoped to AndroidManifest.xml; matches on a <service> tag carrying
# android:exported="true". The pattern has no condition on
# android:permission, although the message says "without permission".
# NOTE(review): the pattern is an unterminated tag fragment - confirm the
# engine accepts it as a valid XML pattern.
rules:
- id: vulnerable-service
severity: WARNING
languages: [xml]
message: "Service exported without permission."
metadata:
authors:
- Guerric ELOI (FuzzingLabs)
category: component
area: manifest
verification-level: [L1]
paths:
include:
- "**/AndroidManifest.xml"
pattern: |
<service android:exported="true"
@@ -0,0 +1,16 @@
# Rule: webview-javascript-enabled
# Flags setJavaScriptEnabled(true) invoked directly on the result of
# getSettings() in Java sources.
# NOTE(review): settings objects stored in a local variable first
# (`s = w.getSettings(); s.setJavaScriptEnabled(true)`) are presumably not
# covered by this single-expression pattern - confirm.
rules:
- id: webview-javascript-enabled
severity: ERROR
languages: [java]
message: "WebView with JavaScript enabled can be dangerous if loading untrusted content."
metadata:
authors:
- Guerric ELOI (FuzzingLabs)
owasp-mobile: M7
category: webview
area: ui
verification-level: [L1]
paths:
include:
- "**/*.java"
pattern: "$W.getSettings().setJavaScriptEnabled(true)"
@@ -0,0 +1,16 @@
# Rule: webview-load-arbitrary-url
# Flags every single-argument loadUrl(...) call on any receiver in Java
# sources. The pattern does not distinguish constant URLs from
# attacker-influenced ones, so hits need manual triage.
rules:
- id: webview-load-arbitrary-url
severity: WARNING
languages: [java]
message: "Loading unvalidated URL in WebView may cause open redirect or XSS."
metadata:
authors:
- Guerric ELOI (FuzzingLabs)
owasp-mobile: M7
category: webview
area: ui
verification-level: [L1]
paths:
include:
- "**/*.java"
pattern: "$W.loadUrl($URL)"
+197
View File
@@ -0,0 +1,197 @@
"""Jadx APK Decompilation Module"""
import asyncio
import shutil
from pathlib import Path
from typing import Dict, Any
import logging
from ..base import BaseModule, ModuleMetadata, ModuleResult
from . import register_module
logger = logging.getLogger(__name__)
@register_module
class JadxModule(BaseModule):
    """Decompile an Android APK by running the external ``jadx`` CLI.

    The module produces no findings. Its result summary reports where the
    decompiled sources/resources were written and how many ``.java`` files
    were produced.
    """

    # How many decompiled file paths are logged as a quick sanity sample.
    _SAMPLE_SIZE = 5

    def get_metadata(self) -> ModuleMetadata:
        """Return the module descriptor with its config and output schemas."""
        return ModuleMetadata(
            name="jadx",
            version="1.5.0",
            description="Android APK decompilation using Jadx",
            author="FuzzForge Team",
            category="android",
            tags=["android", "jadx", "decompilation", "reverse"],
            input_schema={
                "type": "object",
                "properties": {
                    "apk_path": {
                        "type": "string",
                        "description": "Path to the APK to decompile (absolute or relative to workspace)",
                    },
                    "output_dir": {
                        "type": "string",
                        "description": "Directory (relative to workspace) where Jadx output should be written",
                        "default": "jadx_output",
                    },
                    "overwrite": {
                        "type": "boolean",
                        "description": "Overwrite existing output directory if present",
                        "default": True,
                    },
                    "threads": {
                        "type": "integer",
                        "description": "Number of Jadx decompilation threads",
                        "default": 4,
                    },
                    "decompiler_args": {
                        "type": "array",
                        "items": {"type": "string"},
                        "description": "Additional arguments passed directly to Jadx",
                    },
                },
                "required": ["apk_path"],
            },
            output_schema={
                "type": "object",
                "properties": {
                    "output_dir": {"type": "string"},
                    "source_dir": {"type": "string"},
                    "resource_dir": {"type": "string"},
                },
            },
        )

    def validate_config(self, config: Dict[str, Any]) -> bool:
        """Validate the user-supplied configuration.

        Returns:
            True when the configuration is acceptable.

        Raises:
            ValueError: if ``apk_path`` is missing, ``threads`` is not an
                integer in [1, 32], or ``decompiler_args`` is not a list of
                strings.
        """
        if not config.get("apk_path"):
            raise ValueError("'apk_path' must be provided for Jadx decompilation")

        threads = config.get("threads", 4)
        # bool is a subclass of int; reject it so True is not sent as "True".
        if isinstance(threads, bool) or not isinstance(threads, int) or not 1 <= threads <= 32:
            raise ValueError("threads must be between 1 and 32")

        extra_args = config.get("decompiler_args")
        # A bare string would otherwise be splatted into single characters.
        if extra_args and (
            not isinstance(extra_args, (list, tuple))
            or not all(isinstance(arg, str) for arg in extra_args)
        ):
            raise ValueError("decompiler_args must be a list of strings")

        return True

    @staticmethod
    def _resolve_apk(raw_path: str, workspace: Path) -> Path:
        """Resolve ``raw_path`` against ``workspace`` and check it is a file."""
        apk_path = Path(raw_path)
        if not apk_path.is_absolute():
            apk_path = (workspace / apk_path).resolve()

        if not apk_path.exists():
            raise ValueError(f"APK not found: {apk_path}")
        if apk_path.is_dir():
            raise ValueError(f"APK path must be a file, not a directory: {apk_path}")
        return apk_path

    @staticmethod
    def _prepare_output_dir(config: Dict[str, Any], workspace: Path, apk_path: Path) -> Path:
        """Create a fresh output directory, honouring the ``overwrite`` flag.

        Raises:
            ValueError: if the directory exists and ``overwrite`` is false, or
                if overwriting it would delete the workspace or the directory
                tree that holds the APK.
        """
        output_dir = Path(config.get("output_dir", "jadx_output"))
        if not output_dir.is_absolute():
            output_dir = (workspace / output_dir).resolve()

        if output_dir.exists():
            if not config.get("overwrite", True):
                raise ValueError(
                    f"Output directory already exists: {output_dir}. Set overwrite=true to replace it."
                )
            # Never rmtree the workspace, one of its ancestors, or any
            # directory containing the APK we are about to decompile.
            protected = {workspace, *workspace.parents, *apk_path.resolve().parents}
            if output_dir.resolve() in protected:
                raise ValueError(
                    f"Refusing to overwrite output directory that contains the workspace or APK: {output_dir}"
                )
            shutil.rmtree(output_dir)

        output_dir.mkdir(parents=True, exist_ok=True)
        return output_dir

    @classmethod
    def _scan_java_sources(cls, source_dir: Path):
        """Walk ``source_dir`` once; return (java file count, first few paths)."""
        total = 0
        sample = []
        for java_file in source_dir.rglob("*.java"):
            if total < cls._SAMPLE_SIZE:
                sample.append(str(java_file))
            total += 1
        return total, sample

    async def execute(self, config: Dict[str, Any], workspace: Path) -> ModuleResult:
        """Run Jadx on the configured APK and report the output locations.

        Any failure (invalid config, missing APK, non-zero Jadx exit code,
        missing ``jadx`` binary, ...) is caught and returned as a result with
        ``status="failed"`` rather than raised.
        """
        self.start_timer()

        try:
            self.validate_config(config)

            workspace = workspace.resolve()
            if not workspace.exists():
                raise ValueError(f"Workspace does not exist: {workspace}")

            apk_path = self._resolve_apk(config["apk_path"], workspace)
            output_dir = self._prepare_output_dir(config, workspace, apk_path)

            threads = str(config.get("threads", 4))
            cmd = [
                "jadx",
                "--threads-count",
                threads,
                "--deobf",
                "--output-dir",
                str(output_dir),
                *(config.get("decompiler_args") or []),
                str(apk_path),
            ]

            logger.info("Running Jadx decompilation: %s", " ".join(cmd))

            process = await asyncio.create_subprocess_exec(
                *cmd,
                stdout=asyncio.subprocess.PIPE,
                stderr=asyncio.subprocess.PIPE,
                cwd=str(workspace),
            )
            stdout, stderr = await process.communicate()
            stdout_str = stdout.decode(errors="ignore") if stdout else ""
            stderr_str = stderr.decode(errors="ignore") if stderr else ""

            if stdout_str:
                logger.debug("Jadx stdout: %s", stdout_str[:200])
            if stderr_str:
                logger.debug("Jadx stderr: %s", stderr_str[:200])

            if process.returncode != 0:
                error_output = stderr_str or stdout_str or "No error output"
                raise RuntimeError(
                    f"Jadx failed with exit code {process.returncode}: {error_output[:500]}"
                )

            source_dir = output_dir / "sources"
            resource_dir = output_dir / "resources"

            java_files = 0
            if source_dir.exists():
                java_files, sample_files = self._scan_java_sources(source_dir)
                logger.info("Sample Jadx Java files: %s", sample_files or "<none>")
            else:
                logger.warning("Jadx sources directory not found at expected path: %s", source_dir)

            # Fall back to the output root when the expected sub-directory is absent.
            summary = {
                "output_dir": str(output_dir),
                "source_dir": str(source_dir if source_dir.exists() else output_dir),
                "resource_dir": str(resource_dir if resource_dir.exists() else output_dir),
                "java_files": java_files,
            }

            metadata = {
                "apk_path": str(apk_path),
                "output_dir": str(output_dir),
                "source_dir": summary["source_dir"],
                "resource_dir": summary["resource_dir"],
                "threads": threads,
            }

            return self.create_result(
                findings=[],
                status="success",
                summary=summary,
                metadata=metadata,
            )

        except Exception as exc:
            logger.error("Jadx module failed: %s", exc)
            return self.create_result(
                findings=[],
                status="failed",
                error=str(exc),
            )
+293
View File
@@ -0,0 +1,293 @@
from pathlib import Path
from typing import Dict, Any
from toolbox.modules.base import BaseModule, ModuleResult, ModuleMetadata, ModuleFinding
import requests
import os
import time
import json
from collections import Counter
"""
TODO:
* Configure workspace storage for apk and reports
* Think about mobsf repo implementation inside workflow
* Curl mobsf pdf report
* Save Json mobsf report
* Export Web server interface from the Workflow docker
"""
class MobSFModule(BaseModule):
    """Scan a mobile application with a running MobSF server.

    Workflow (see ``execute``): upload the file, trigger the scan, download
    the JSON report to ``dump.json`` and convert selected report sections
    into findings.
    """

    # Report file written by get_json_results() and read by parse_json_results().
    _REPORT_FILENAME = 'dump.json'
    # (connect, read) timeout for MobSF calls: fail fast when the server is
    # unreachable, but do not cap the read side because scans can run long.
    _HTTP_TIMEOUT = (10, None)
    # Report sections turned into findings, in output order.
    _SECTIONS = ('permissions', 'manifest_analysis', 'code_analysis', 'behaviour')

    def __init__(self):
        """Initialise connection defaults and start the module timer."""
        super().__init__()
        self.mobsf_url = "http://localhost:8877"
        self.file_path = ""
        self.api_key = ""
        self.scan_id = None
        self.scan_hash = ""
        self.report_file = ""
        self._metadata = self.get_metadata()
        self.start_timer()

    def _auth_headers(self):
        """Return the header MobSF expects the API key in."""
        return {'X-Mobsf-Api-Key': self.api_key}

    def upload_file(self):
        """Upload ``self.file_path`` to MobSF and return the scan hash.

        Raises:
            ValueError: if ``self.file_path`` is empty or not a file.
            Exception: if the server answers with a non-200 status or a body
                without a ``hash``.
        """
        if not self.file_path or not os.path.isfile(self.file_path):
            raise ValueError("Invalid or missing file_path for upload.")

        filename = os.path.basename(self.file_path)
        # Content-Type is left to requests so it can set the multipart boundary.
        # The file must stay open for the whole request.
        with open(self.file_path, 'rb') as f:
            files = {'file': (filename, f, 'application/vnd.android.package-archive')}
            response = requests.post(
                f"{self.mobsf_url}/api/v1/upload",
                files=files,
                headers=self._auth_headers(),
                timeout=self._HTTP_TIMEOUT,
            )

        if response.status_code != 200:
            raise Exception(f"Failed to upload file: {response.text}")

        resp_json = response.json()
        if not resp_json.get('hash'):
            raise Exception(f"File upload failed: {resp_json}")

        print("[+] Upload succeeded, scan hash:", resp_json['hash'])
        return resp_json['hash']

    def start_scan(self, re_scan: int = 0, max_attempts: int = 10, delay: int = 3):
        """Trigger the scan of the already-uploaded file and return its result.

        Args:
            re_scan: forwarded to MobSF as the ``re_scan`` form field
                (0 = reuse an existing scan, 1 = force a new one).
            max_attempts: accepted for backward compatibility; no retry is
                performed, a single request is sent.
            delay: accepted for backward compatibility; unused.

        Raises:
            Exception: if the server answers with a non-200 status, or the
                response body is empty or not valid JSON.
        """
        print("[+] Starting scan for hash", self.scan_hash)
        response = requests.post(
            f"{self.mobsf_url}/api/v1/scan",
            data={'hash': self.scan_hash, 're_scan': re_scan},
            headers=self._auth_headers(),
            timeout=self._HTTP_TIMEOUT,
        )
        if response.status_code != 200:
            raise Exception(f"Scan request failed ({response.status_code}): {response.text}")

        try:
            result = response.json()
        except ValueError as e:
            raise Exception(f"Error parsing scan result: {e}") from e
        if not result:
            raise Exception("Scan returned an empty result")

        print("[+] Scan succeeded for hash", self.scan_hash)
        return result

    def get_json_results(self):
        """Download the JSON report, save it to ``dump.json`` and return it.

        Raises:
            Exception: if the server answers with a non-200 status.
        """
        response = requests.post(
            f"{self.mobsf_url}/api/v1/report_json",
            data={'hash': self.scan_hash},
            headers=self._auth_headers(),
            timeout=self._HTTP_TIMEOUT,
        )
        if response.status_code != 200:
            raise Exception(f"Failed to retrieve JSON results: {response.text}")

        report = response.json()
        with open(self._REPORT_FILENAME, 'w', encoding='utf-8') as out:
            out.write(json.dumps(report, indent=2))
        print("[+] Retrieved JSON results")
        return report

    def create_summary(self, findings):
        """Count findings per severity.

        Accepts finding objects (``.severity`` attribute) or dicts
        (``"severity"`` key). Findings without a severity are not counted.

        Returns:
            A dict such as ``{'high': 3, 'info': 2}``.
        """
        severity_counter = Counter()
        for finding in findings:
            sev = getattr(finding, "severity", None)
            if sev is None and isinstance(finding, dict):
                sev = finding.get("severity")
            if sev:
                severity_counter[sev] += 1

        res = dict(severity_counter)
        print("Total Findings:", len(findings))
        print("Severity counts:")
        print(res)
        return res

    def _parse_permissions(self, section):
        """Build one finding per entry of the ``permissions`` section."""
        return [
            self.create_finding(
                title=name,
                description=attrs.get('description'),
                severity=attrs.get('status'),
                category="permission",
                metadata={'info': attrs.get('info')},
            )
            for name, attrs in section.items()
        ]

    def _parse_manifest(self, section):
        """Build one finding per entry of ``manifest_analysis.manifest_findings``."""
        return [
            self.create_finding(
                title=item.get('title') or item.get('name') or "unknown",
                description=item.get('description', "No description"),
                severity=item.get('severity', "unknown"),
                category='manifest_analysis',
                metadata={'tag': item.get('rule')},
            )
            for item in section.get('manifest_findings') or []
        ]

    def _parse_code_analysis(self, section):
        """Build one finding per entry of ``code_analysis.findings``."""
        parsed = []
        for name, attrs in (section.get('findings') or {}).items():
            meta = attrs.get('metadata', {})
            parsed.append(self.create_finding(
                title=name,
                description=meta.get('description'),
                severity=meta.get('severity'),
                category="code_analysis",
                metadata={
                    'cwe': meta.get('cwe'),
                    'owasp': meta.get('owasp'),
                    'files': attrs.get('file'),
                },
            ))
        return parsed

    def _parse_behaviour(self, section):
        """Build one finding per entry of the ``behaviour`` section."""
        parsed = []
        for value in section.values():
            meta = value.get('metadata', {})
            label = meta.get('label')
            # The label is indexed as a sequence; tolerate a bare string or a
            # missing/empty label instead of crashing.
            if isinstance(label, (list, tuple)) and label:
                label = label[0]
            elif not isinstance(label, str) or not label:
                label = "unknown"
            parsed.append(self.create_finding(
                title="behaviour_" + str(label),
                description=meta.get('description'),
                severity=meta.get('severity'),
                category="behaviour",
                metadata={'file': value.get('files', {})},
            ))
        return parsed

    def parse_json_results(self):
        """Read ``self.report_file`` and convert it into a list of findings.

        Only the ``permissions``, ``manifest_analysis``, ``code_analysis``
        and ``behaviour`` sections are used; missing sections are skipped.

        Raises:
            ValueError: if ``self.report_file`` is empty or not a file.
        """
        if not self.report_file or not os.path.isfile(self.report_file):
            raise ValueError("Invalid or missing report_file for parsing.")

        with open(self.report_file, 'r', encoding='utf-8') as report:
            data = json.load(report)

        parsers = {
            'permissions': self._parse_permissions,
            'manifest_analysis': self._parse_manifest,
            'code_analysis': self._parse_code_analysis,
            'behaviour': self._parse_behaviour,
        }
        findings = []
        for section_name in self._SECTIONS:
            section = data.get(section_name)
            if section:
                findings.extend(parsers[section_name](section))
        return findings

    async def execute(self, config: Dict[str, Any], workspace: Path) -> ModuleResult:
        """Upload, scan and report on the file named in ``config``.

        Config keys: ``file_path`` (required), ``mobsf_url`` (defaults to the
        instance URL), ``api_key`` (falls back to ``MOBSF_API_KEY`` in the
        environment). ``workspace`` is currently unused.

        NOTE(review): the HTTP calls are blocking and run on the event loop.

        Raises:
            ValueError: if ``file_path`` is missing or not a file.
            Exception: if the upload or the scan fails.
        """
        # Keep the instance default when the caller does not supply a URL.
        self.mobsf_url = config.get("mobsf_url") or self.mobsf_url
        self.file_path = config.get("file_path", "")
        self.api_key = config.get("api_key", "") or os.environ.get("MOBSF_API_KEY", "")

        file_path = config.get("file_path", None)
        if not file_path or not os.path.isfile(file_path):
            raise ValueError(f"Invalid or missing file_path in configuration: {file_path}")

        try:
            self.scan_hash = self.upload_file()
        except Exception as e:
            raise Exception(f"Failed to upload file to MobSF: {e}") from e

        if not self.scan_hash:
            raise Exception("scan_hash not returned after upload.")

        try:
            self.start_scan()
        except Exception as e:
            raise Exception(f"Failed to scan file in MobSF: {e}") from e

        try:
            self.get_json_results()
        except json.JSONDecodeError:
            return self.create_result(
                findings=[],
                status="failed",
                summary={"error": "Invalid JSON output from MOBSF"},
                metadata={"engine": "mobsf", "file_scanned": file_path, "mobsf_url": self.mobsf_url}
            )

        self.report_file = self._REPORT_FILENAME
        findings = self.parse_json_results()

        tmp_summary = self.create_summary(findings)
        summary = {
            "total_findings": len(findings),
            "dangerous_severity": tmp_summary.get('dangerous', 0),
            "warning_severity": tmp_summary.get('warning', 0),
            "high_severity": tmp_summary.get('high', 0),
            "medium_severity": tmp_summary.get('medium', 0),
            "low_severity": tmp_summary.get('low', 0),
            "info_severity": tmp_summary.get('info', 0),
        }
        metadata = {"engine": "mobsf", "file_scanned": file_path, "mobsf_url": self.mobsf_url}

        return self.create_result(findings=findings, status="success", summary=summary, metadata=metadata)

    def get_metadata(self) -> ModuleMetadata:
        """Return the module descriptor."""
        return ModuleMetadata(
            name="Mobile Security Framework (MobSF)",
            version="1.0.0",
            description="Integrates MobSF for mobile app security scanning",
            author="FuzzForge Team",
            category="scanner",
            tags=["mobsf", "mobile", "sast", "scanner"]
        )

    def validate_config(self, config: Dict[str, Any]) -> bool:
        """Type-check the optional string settings.

        Recognised keys:
            "mobsf_url": base URL of the MobSF server
                (instance default: http://localhost:8877)
            "file_path": path to the application file to scan

        Returns:
            False if ``mobsf_url`` or ``file_path`` is present but not a
            string, True otherwise. Reachability of the server is not checked.
        """
        for key in ("mobsf_url", "file_path"):
            if key in config and not isinstance(config[key], str):
                return False
        return True
if __name__ == "__main__":
    # Manual smoke test: run one scan against a local MobSF server and print
    # the resulting ModuleResult.
    import asyncio

    workspace = Path("./toolbox/modules/android/")
    config = dict(
        mobsf_url="http://localhost:8877",
        file_path="./toolbox/modules/android/beetlebug.apk",
    )
    module = MobSFModule()
    result = asyncio.run(module.execute(config, workspace))
    print(result)
+411
View File
@@ -0,0 +1,411 @@
"""
OpenGrep Static Analysis Module
This module uses OpenGrep (open-source version of Semgrep) for pattern-based
static analysis across multiple programming languages.
"""
# Copyright (c) 2025 FuzzingLabs
#
# Licensed under the Business Source License 1.1 (BSL). See the LICENSE file
# at the root of this repository for details.
#
# After the Change Date (four years from publication), this version of the
# Licensed Work will be made available under the Apache License, Version 2.0.
# See the LICENSE-APACHE file or http://www.apache.org/licenses/LICENSE-2.0
#
# Additional attribution and requirements are provided in the NOTICE file.
import asyncio
import json
import tempfile
from pathlib import Path
from typing import Dict, Any, List
import subprocess
import logging
from ..base import BaseModule, ModuleMetadata, ModuleFinding, ModuleResult
from . import register_module
logger = logging.getLogger(__name__)
@register_module
class OpenGrepModule(BaseModule):
    """Run the ``opengrep`` CLI over a workspace and convert its JSON output
    into module findings."""

    # Severity / confidence levels used when the config does not restrict them.
    _ALL_SEVERITIES = ("ERROR", "WARNING", "INFO")
    _ALL_CONFIDENCES = ("HIGH", "MEDIUM", "LOW")

    # OpenGrep severity -> module severity.
    _SEVERITY_MAP = {
        "ERROR": "high",
        "WARNING": "medium",
        "INFO": "low",
    }

    # (substring of rule id, category), checked in order.
    _RULE_ID_CATEGORIES = (
        ("injection", "injection"),
        ("xss", "xss"),
        ("csrf", "csrf"),
        ("auth", "authentication"),
        ("crypto", "cryptography"),
    )

    # (substring of rule id, generic recommendation), checked in order.
    _RULE_ID_RECOMMENDATIONS = (
        ("injection", "Use parameterized queries or prepared statements to prevent injection attacks."),
        ("xss", "Properly encode/escape user input before displaying it in web pages."),
        ("crypto", "Use cryptographically secure algorithms and proper key management."),
        ("hardcode", "Remove hardcoded secrets and use secure configuration management."),
    )

    def get_metadata(self) -> ModuleMetadata:
        """Return the module descriptor with its config and output schemas."""
        return ModuleMetadata(
            name="opengrep",
            version="1.45.0",
            description="Open-source pattern-based static analysis tool for security vulnerabilities",
            author="FuzzForge Team",
            category="static_analysis",
            tags=["sast", "pattern-matching", "multi-language", "security"],
            input_schema={
                "type": "object",
                "properties": {
                    "config": {
                        "type": "string",
                        "enum": ["auto", "p/security-audit", "p/owasp-top-ten", "p/cwe-top-25"],
                        "default": "auto",
                        "description": "Rule configuration to use"
                    },
                    "custom_rules_path": {
                        "type": "string",
                        "description": "Path to a directory containing custom OpenGrep rules"
                    },
                    "languages": {
                        "type": "array",
                        "items": {"type": "string"},
                        "description": "Specific languages to analyze"
                    },
                    "include_patterns": {
                        "type": "array",
                        "items": {"type": "string"},
                        "description": "File patterns to include"
                    },
                    "exclude_patterns": {
                        "type": "array",
                        "items": {"type": "string"},
                        "description": "File patterns to exclude"
                    },
                    "max_target_bytes": {
                        "type": "integer",
                        "default": 1000000,
                        "description": "Maximum file size to analyze (bytes)"
                    },
                    "timeout": {
                        "type": "integer",
                        "default": 300,
                        "description": "Analysis timeout in seconds"
                    },
                    "severity": {
                        "type": "array",
                        "items": {"type": "string", "enum": ["ERROR", "WARNING", "INFO"]},
                        "default": ["ERROR", "WARNING", "INFO"],
                        "description": "Minimum severity levels to report"
                    },
                    "confidence": {
                        "type": "array",
                        "items": {"type": "string", "enum": ["HIGH", "MEDIUM", "LOW"]},
                        "default": ["HIGH", "MEDIUM", "LOW"],
                        "description": "Minimum confidence levels to report"
                    }
                }
            },
            output_schema={
                "type": "object",
                "properties": {
                    "findings": {
                        "type": "array",
                        "items": {
                            "type": "object",
                            "properties": {
                                "rule_id": {"type": "string"},
                                "severity": {"type": "string"},
                                "confidence": {"type": "string"},
                                "file_path": {"type": "string"},
                                "line_number": {"type": "integer"}
                            }
                        }
                    }
                }
            }
        )

    def validate_config(self, config: Dict[str, Any]) -> bool:
        """Validate the configuration.

        Returns:
            True when the configuration is acceptable.

        Raises:
            ValueError: if ``timeout`` is not an int in [30, 3600],
                ``max_target_bytes`` is not an int in [1000, 10000000], or
                ``custom_rules_path`` is set but is not a directory.
        """
        timeout = config.get("timeout", 300)
        if not isinstance(timeout, int) or timeout < 30 or timeout > 3600:
            raise ValueError("Timeout must be between 30 and 3600 seconds")

        max_bytes = config.get("max_target_bytes", 1000000)
        if not isinstance(max_bytes, int) or max_bytes < 1000 or max_bytes > 10000000:
            raise ValueError("max_target_bytes must be between 1000 and 10000000")

        custom_rules_path = config.get("custom_rules_path")
        if custom_rules_path and not Path(custom_rules_path).is_dir():
            raise ValueError(f"Custom rules path must be a valid directory: {custom_rules_path}")

        return True

    def _build_command(self, config: Dict[str, Any], workspace: Path) -> List[str]:
        """Translate the module config into an ``opengrep scan`` argument list."""
        cmd = ["opengrep", "scan", "--json"]

        custom_rules_path = config.get("custom_rules_path")
        if custom_rules_path:
            cmd.extend(["--config", custom_rules_path])
        else:
            cmd.extend(["--config", config.get("config", "auto")])

        cmd.extend(["--timeout", str(config.get("timeout", 300))])
        cmd.extend(["--max-target-bytes", str(config.get("max_target_bytes", 1000000))])

        # Custom rules declare their own languages, so --lang is skipped for them.
        # NOTE(review): confirm opengrep accepts --lang (and a comma-separated
        # value) together with --config.
        if config.get("languages") and not custom_rules_path:
            cmd.extend(["--lang", ",".join(config["languages"])])

        for pattern in config.get("include_patterns") or []:
            cmd.extend(["--include", pattern])
        for pattern in config.get("exclude_patterns") or []:
            cmd.extend(["--exclude", pattern])

        # The CLI filter is only used for a single level; multi-level and
        # confidence filtering happen when the JSON output is parsed.
        severity_levels = config.get("severity", list(self._ALL_SEVERITIES))
        if severity_levels and len(severity_levels) == 1:
            cmd.extend(["--severity", severity_levels[0]])

        # Skip the version check and scan files even if they are git-ignored.
        cmd.append("--disable-version-check")
        cmd.append("--no-git-ignore")

        cmd.append(str(workspace))
        return cmd

    async def execute(self, config: Dict[str, Any], workspace: Path) -> ModuleResult:
        """Execute OpenGrep static analysis on ``workspace``.

        Exit codes 0 and 1 are treated as a completed scan; any other exit
        code, and any exception, yields a result with ``status="failed"``.
        """
        self.start_timer()

        try:
            self.validate_config(config)
            self.validate_workspace(workspace)

            logger.info(f"Running OpenGrep analysis on {workspace}")

            cmd = self._build_command(config, workspace)
            logger.debug(f"Running command: {' '.join(cmd)}")

            process = await asyncio.create_subprocess_exec(
                *cmd,
                stdout=asyncio.subprocess.PIPE,
                stderr=asyncio.subprocess.PIPE,
                cwd=workspace
            )
            stdout, stderr = await process.communicate()

            if process.returncode not in (0, 1):
                error_msg = stderr.decode(errors="replace")
                logger.error(f"OpenGrep failed: {error_msg}")
                return self.create_result(
                    findings=[],
                    status="failed",
                    error=f"OpenGrep execution failed: {error_msg}"
                )

            findings = self._parse_opengrep_output(stdout.decode(errors="replace"), workspace, config)
            summary = self._create_summary(findings)

            logger.info(f"OpenGrep found {len(findings)} potential issues")

            return self.create_result(
                findings=findings,
                status="success",
                summary=summary
            )

        except Exception as e:
            logger.error(f"OpenGrep module failed: {e}")
            return self.create_result(
                findings=[],
                status="failed",
                error=str(e)
            )

    @staticmethod
    def _as_list(value: Any) -> List[Any]:
        """Normalise rule metadata that may be a scalar, a list or missing."""
        if value is None or value == "":
            return []
        if isinstance(value, (list, tuple)):
            return list(value)
        return [value]

    def _result_to_finding(self, result: Dict[str, Any], workspace: Path,
                           allowed_severities: set, allowed_confidences: set):
        """Convert one raw result into a finding, or None if it is filtered out."""
        rule_id = result.get("check_id", "unknown")
        message = result.get("message", "")
        extra = result.get("extra", {})
        rule_metadata = extra.get("metadata", {})

        severity = str(extra.get("severity") or "INFO").upper()
        if severity not in allowed_severities:
            return None

        confidence = str(
            extra.get("confidence") or rule_metadata.get("confidence") or "MEDIUM"
        ).upper()
        if confidence not in allowed_confidences:
            return None

        start_line = result.get("start", {}).get("line", 0)
        end_line = result.get("end", {}).get("line", 0)
        lines = extra.get("lines", "")

        # Report paths relative to the workspace when they live inside it.
        path_info = result.get("path", "")
        if path_info:
            try:
                path_info = str(Path(path_info).relative_to(workspace))
            except ValueError:
                pass

        return self.create_finding(
            title=f"Security issue: {rule_id}",
            description=message or f"OpenGrep rule {rule_id} triggered",
            severity=self._map_severity(severity),
            category=self._get_category(rule_id, extra),
            file_path=path_info if path_info else None,
            line_start=start_line if start_line > 0 else None,
            line_end=end_line if end_line > 0 and end_line != start_line else None,
            code_snippet=lines.strip() if lines else None,
            recommendation=self._get_recommendation(rule_id, extra),
            metadata={
                "rule_id": rule_id,
                "opengrep_severity": severity,
                "confidence": confidence,
                "cwe": rule_metadata.get("cwe", []),
                "owasp": rule_metadata.get("owasp", []),
                "fix": extra.get("fix", ""),
                "impact": extra.get("impact", ""),
                "likelihood": extra.get("likelihood", ""),
                "references": extra.get("references", [])
            }
        )

    def _parse_opengrep_output(self, output: str, workspace: Path, config: Dict[str, Any]) -> List[ModuleFinding]:
        """Parse OpenGrep JSON output into findings.

        Results are filtered by the configured ``severity`` and ``confidence``
        levels (a missing or empty setting means "all levels"). Unparseable
        output yields an empty list; a malformed individual result is logged
        and skipped without discarding the remaining results.
        """
        findings: List[ModuleFinding] = []

        if not output.strip():
            return findings

        try:
            data = json.loads(output)
        except json.JSONDecodeError as e:
            logger.warning(f"Failed to parse OpenGrep output: {e}. Output snippet: {output[:200]}...")
            return findings

        results = data.get("results", []) if isinstance(data, dict) else []
        logger.debug(f"OpenGrep returned {len(results)} raw results")

        allowed_severities = {
            str(level).upper() for level in (config.get("severity") or self._ALL_SEVERITIES)
        }
        allowed_confidences = {
            str(level).upper() for level in (config.get("confidence") or self._ALL_CONFIDENCES)
        }

        for result in results:
            try:
                finding = self._result_to_finding(
                    result, workspace, allowed_severities, allowed_confidences
                )
            except Exception as e:
                logger.warning(f"Error processing OpenGrep results: {e}")
                continue
            if finding is not None:
                findings.append(finding)

        return findings

    def _map_severity(self, opengrep_severity: str) -> str:
        """Map an OpenGrep severity to a module severity ("medium" if unknown)."""
        return self._SEVERITY_MAP.get(opengrep_severity.upper(), "medium")

    def _get_category(self, rule_id: str, extra: Dict[str, Any]) -> str:
        """Derive a finding category.

        Keywords in the rule id take precedence, then the first CWE entry,
        then the first OWASP entry of the rule metadata; "security" otherwise.
        """
        lowered_id = rule_id.lower()
        for keyword, category in self._RULE_ID_CATEGORIES:
            if keyword in lowered_id:
                return category

        rule_metadata = extra.get("metadata", {})
        # cwe/owasp may be a single string; indexing it with [0] would yield
        # one character, so normalise to a list first.
        cwe_list = self._as_list(rule_metadata.get("cwe"))
        if cwe_list:
            return f"cwe-{cwe_list[0]}"

        owasp_list = self._as_list(rule_metadata.get("owasp"))
        if owasp_list:
            return f"owasp-{str(owasp_list[0]).replace(' ', '-').lower()}"

        return "security"

    def _get_recommendation(self, rule_id: str, extra: Dict[str, Any]) -> str:
        """Return the rule's own fix text if present, else a generic hint
        chosen from keywords in the rule id."""
        fix_suggestion = extra.get("fix", "")
        if fix_suggestion:
            return fix_suggestion

        lowered_id = rule_id.lower()
        for keyword, advice in self._RULE_ID_RECOMMENDATIONS:
            if keyword in lowered_id:
                return advice
        return "Review this security issue and apply appropriate fixes based on your security requirements."

    def _create_summary(self, findings: List[ModuleFinding]) -> Dict[str, Any]:
        """Aggregate findings by severity, category and rule.

        ``files_analyzed`` counts distinct files that have at least one
        finding, and ``top_rules`` holds the ten most frequent rule ids.
        """
        severity_counts = {"critical": 0, "high": 0, "medium": 0, "low": 0}
        category_counts: Dict[str, int] = {}
        rule_counts: Dict[str, int] = {}

        for finding in findings:
            # .get() so an unexpected severity is counted instead of raising KeyError.
            severity_counts[finding.severity] = severity_counts.get(finding.severity, 0) + 1

            category = finding.category
            category_counts[category] = category_counts.get(category, 0) + 1

            rule_id = finding.metadata.get("rule_id", "unknown")
            rule_counts[rule_id] = rule_counts.get(rule_id, 0) + 1

        return {
            "total_findings": len(findings),
            "severity_counts": severity_counts,
            "category_counts": category_counts,
            "top_rules": dict(sorted(rule_counts.items(), key=lambda x: x[1], reverse=True)[:10]),
            "files_analyzed": len(set(f.file_path for f in findings if f.file_path))
        }