mirror of
https://github.com/FuzzingLabs/fuzzforge_ai.git
synced 2026-02-12 20:32:46 +00:00
- Remove unused imports from mobsf_scanner.py (asyncio, hashlib, json, Optional) - Remove unused variables from opengrep_android.py (start_col, end_col) - Remove duplicate Path import from workflow.py
441 lines
18 KiB
Python
441 lines
18 KiB
Python
"""
|
|
OpenGrep Android Static Analysis Module
|
|
|
|
Pattern-based static analysis for Android applications using OpenGrep/Semgrep
|
|
with Android-specific security rules.
|
|
"""
|
|
|
|
# Copyright (c) 2025 FuzzingLabs
|
|
#
|
|
# Licensed under the Business Source License 1.1 (BSL). See the LICENSE file
|
|
# at the root of this repository for details.
|
|
#
|
|
# After the Change Date (four years from publication), this version of the
|
|
# Licensed Work will be made available under the Apache License, Version 2.0.
|
|
# See the LICENSE-APACHE file or http://www.apache.org/licenses/LICENSE-2.0
|
|
#
|
|
# Additional attribution and requirements are provided in the NOTICE file.
|
|
|
|
import asyncio
|
|
import json
|
|
import logging
|
|
from pathlib import Path
|
|
from typing import Dict, Any, List
|
|
|
|
try:
|
|
from toolbox.modules.base import BaseModule, ModuleMetadata, ModuleFinding, ModuleResult
|
|
except ImportError:
|
|
try:
|
|
from modules.base import BaseModule, ModuleMetadata, ModuleFinding, ModuleResult
|
|
except ImportError:
|
|
from src.toolbox.modules.base import BaseModule, ModuleMetadata, ModuleFinding, ModuleResult
|
|
|
|
logger = logging.getLogger(__name__)
|
|
|
|
|
|
class OpenGrepAndroid(BaseModule):
|
|
"""OpenGrep static analysis module specialized for Android security"""
|
|
|
|
def get_metadata(self) -> ModuleMetadata:
|
|
"""Get module metadata"""
|
|
return ModuleMetadata(
|
|
name="opengrep_android",
|
|
version="1.45.0",
|
|
description="Android-focused static analysis using OpenGrep/Semgrep with custom security rules for Java/Kotlin",
|
|
author="FuzzForge Team",
|
|
category="android",
|
|
tags=["sast", "android", "opengrep", "semgrep", "java", "kotlin", "security"],
|
|
input_schema={
|
|
"type": "object",
|
|
"properties": {
|
|
"config": {
|
|
"type": "string",
|
|
"enum": ["auto", "p/security-audit", "p/owasp-top-ten", "p/cwe-top-25"],
|
|
"default": "auto",
|
|
"description": "Rule configuration to use"
|
|
},
|
|
"custom_rules_path": {
|
|
"type": "string",
|
|
"description": "Path to a directory containing custom OpenGrep rules (Android-specific rules recommended)",
|
|
"default": None,
|
|
},
|
|
"languages": {
|
|
"type": "array",
|
|
"items": {"type": "string"},
|
|
"description": "Specific languages to analyze (defaults to java, kotlin for Android)",
|
|
"default": ["java", "kotlin"],
|
|
},
|
|
"include_patterns": {
|
|
"type": "array",
|
|
"items": {"type": "string"},
|
|
"description": "File patterns to include",
|
|
"default": [],
|
|
},
|
|
"exclude_patterns": {
|
|
"type": "array",
|
|
"items": {"type": "string"},
|
|
"description": "File patterns to exclude",
|
|
"default": [],
|
|
},
|
|
"max_target_bytes": {
|
|
"type": "integer",
|
|
"default": 1000000,
|
|
"description": "Maximum file size to analyze (bytes)"
|
|
},
|
|
"timeout": {
|
|
"type": "integer",
|
|
"default": 300,
|
|
"description": "Analysis timeout in seconds"
|
|
},
|
|
"severity": {
|
|
"type": "array",
|
|
"items": {"type": "string", "enum": ["ERROR", "WARNING", "INFO"]},
|
|
"default": ["ERROR", "WARNING", "INFO"],
|
|
"description": "Minimum severity levels to report"
|
|
},
|
|
"confidence": {
|
|
"type": "array",
|
|
"items": {"type": "string", "enum": ["HIGH", "MEDIUM", "LOW"]},
|
|
"default": ["HIGH", "MEDIUM", "LOW"],
|
|
"description": "Minimum confidence levels to report"
|
|
}
|
|
}
|
|
},
|
|
output_schema={
|
|
"type": "object",
|
|
"properties": {
|
|
"findings": {
|
|
"type": "array",
|
|
"description": "Security findings from OpenGrep analysis"
|
|
},
|
|
"total_findings": {"type": "integer"},
|
|
"severity_counts": {"type": "object"},
|
|
"files_analyzed": {"type": "integer"},
|
|
}
|
|
},
|
|
requires_workspace=True,
|
|
)
|
|
|
|
def validate_config(self, config: Dict[str, Any]) -> bool:
|
|
"""Validate configuration"""
|
|
timeout = config.get("timeout", 300)
|
|
if not isinstance(timeout, int) or timeout < 30 or timeout > 3600:
|
|
raise ValueError("Timeout must be between 30 and 3600 seconds")
|
|
|
|
max_bytes = config.get("max_target_bytes", 1000000)
|
|
if not isinstance(max_bytes, int) or max_bytes < 1000 or max_bytes > 10000000:
|
|
raise ValueError("max_target_bytes must be between 1000 and 10000000")
|
|
|
|
custom_rules_path = config.get("custom_rules_path")
|
|
if custom_rules_path:
|
|
rules_path = Path(custom_rules_path)
|
|
if not rules_path.exists():
|
|
logger.warning(f"Custom rules path does not exist: {custom_rules_path}")
|
|
|
|
return True
|
|
|
|
async def execute(self, config: Dict[str, Any], workspace: Path) -> ModuleResult:
|
|
"""Execute OpenGrep static analysis on Android code"""
|
|
self.start_timer()
|
|
|
|
try:
|
|
# Validate inputs
|
|
self.validate_config(config)
|
|
self.validate_workspace(workspace)
|
|
|
|
logger.info(f"Running OpenGrep Android analysis on {workspace}")
|
|
|
|
# Build opengrep command
|
|
cmd = ["opengrep", "scan", "--json"]
|
|
|
|
# Add configuration
|
|
custom_rules_path = config.get("custom_rules_path")
|
|
use_custom_rules = False
|
|
if custom_rules_path and Path(custom_rules_path).exists():
|
|
cmd.extend(["--config", custom_rules_path])
|
|
use_custom_rules = True
|
|
logger.info(f"Using custom Android rules from: {custom_rules_path}")
|
|
else:
|
|
config_type = config.get("config", "auto")
|
|
if config_type == "auto":
|
|
cmd.extend(["--config", "auto"])
|
|
else:
|
|
cmd.extend(["--config", config_type])
|
|
|
|
# Add timeout
|
|
cmd.extend(["--timeout", str(config.get("timeout", 300))])
|
|
|
|
# Add max target bytes
|
|
cmd.extend(["--max-target-bytes", str(config.get("max_target_bytes", 1000000))])
|
|
|
|
# Add languages if specified (but NOT when using custom rules)
|
|
languages = config.get("languages", ["java", "kotlin"])
|
|
if languages and not use_custom_rules:
|
|
langs = ",".join(languages)
|
|
cmd.extend(["--lang", langs])
|
|
logger.debug(f"Analyzing languages: {langs}")
|
|
|
|
# Add include patterns
|
|
include_patterns = config.get("include_patterns", [])
|
|
for pattern in include_patterns:
|
|
cmd.extend(["--include", pattern])
|
|
|
|
# Add exclude patterns
|
|
exclude_patterns = config.get("exclude_patterns", [])
|
|
for pattern in exclude_patterns:
|
|
cmd.extend(["--exclude", pattern])
|
|
|
|
# Add severity filter if single level requested
|
|
severity_levels = config.get("severity", ["ERROR", "WARNING", "INFO"])
|
|
if severity_levels and len(severity_levels) == 1:
|
|
cmd.extend(["--severity", severity_levels[0]])
|
|
|
|
# Disable metrics collection
|
|
cmd.append("--disable-version-check")
|
|
cmd.append("--no-git-ignore")
|
|
|
|
# Add target directory
|
|
cmd.append(str(workspace))
|
|
|
|
logger.debug(f"Running command: {' '.join(cmd)}")
|
|
|
|
# Run OpenGrep
|
|
process = await asyncio.create_subprocess_exec(
|
|
*cmd,
|
|
stdout=asyncio.subprocess.PIPE,
|
|
stderr=asyncio.subprocess.PIPE,
|
|
cwd=workspace
|
|
)
|
|
|
|
stdout, stderr = await process.communicate()
|
|
|
|
# Parse results
|
|
findings = []
|
|
if process.returncode in [0, 1]: # 0 = no findings, 1 = findings found
|
|
findings = self._parse_opengrep_output(stdout.decode(), workspace, config)
|
|
logger.info(f"OpenGrep found {len(findings)} potential security issues")
|
|
else:
|
|
error_msg = stderr.decode()
|
|
logger.error(f"OpenGrep failed: {error_msg}")
|
|
return self.create_result(
|
|
findings=[],
|
|
status="failed",
|
|
error=f"OpenGrep execution failed (exit code {process.returncode}): {error_msg[:500]}"
|
|
)
|
|
|
|
# Create summary
|
|
summary = self._create_summary(findings)
|
|
|
|
return self.create_result(
|
|
findings=findings,
|
|
status="success",
|
|
summary=summary,
|
|
metadata={
|
|
"tool": "opengrep",
|
|
"tool_version": "1.45.0",
|
|
"languages": languages,
|
|
"custom_rules": bool(custom_rules_path),
|
|
}
|
|
)
|
|
|
|
except Exception as e:
|
|
logger.error(f"OpenGrep Android module failed: {e}", exc_info=True)
|
|
return self.create_result(
|
|
findings=[],
|
|
status="failed",
|
|
error=str(e)
|
|
)
|
|
|
|
def _parse_opengrep_output(self, output: str, workspace: Path, config: Dict[str, Any]) -> List[ModuleFinding]:
|
|
"""Parse OpenGrep JSON output into findings"""
|
|
findings = []
|
|
|
|
if not output.strip():
|
|
return findings
|
|
|
|
try:
|
|
data = json.loads(output)
|
|
results = data.get("results", [])
|
|
logger.debug(f"OpenGrep returned {len(results)} raw results")
|
|
|
|
# Get filtering criteria
|
|
allowed_severities = set(config.get("severity", ["ERROR", "WARNING", "INFO"]))
|
|
allowed_confidences = set(config.get("confidence", ["HIGH", "MEDIUM", "LOW"]))
|
|
|
|
for result in results:
|
|
# Extract basic info
|
|
rule_id = result.get("check_id", "unknown")
|
|
message = result.get("message", "")
|
|
extra = result.get("extra", {})
|
|
severity = extra.get("severity", "INFO").upper()
|
|
|
|
# File location info
|
|
path_info = result.get("path", "")
|
|
start_line = result.get("start", {}).get("line", 0)
|
|
end_line = result.get("end", {}).get("line", 0)
|
|
|
|
# Code snippet
|
|
lines = extra.get("lines", "")
|
|
|
|
# Metadata
|
|
rule_metadata = extra.get("metadata", {})
|
|
cwe = rule_metadata.get("cwe", [])
|
|
owasp = rule_metadata.get("owasp", [])
|
|
confidence = extra.get("confidence", rule_metadata.get("confidence", "MEDIUM")).upper()
|
|
|
|
# Apply severity filter
|
|
if severity not in allowed_severities:
|
|
continue
|
|
|
|
# Apply confidence filter
|
|
if confidence not in allowed_confidences:
|
|
continue
|
|
|
|
# Make file path relative to workspace
|
|
if path_info:
|
|
try:
|
|
rel_path = Path(path_info).relative_to(workspace)
|
|
path_info = str(rel_path)
|
|
except ValueError:
|
|
pass
|
|
|
|
# Map severity to our standard levels
|
|
finding_severity = self._map_severity(severity)
|
|
|
|
# Create finding
|
|
finding = self.create_finding(
|
|
title=f"Android Security: {rule_id}",
|
|
description=message or f"OpenGrep rule {rule_id} triggered",
|
|
severity=finding_severity,
|
|
category=self._get_category(rule_id, extra),
|
|
file_path=path_info if path_info else None,
|
|
line_start=start_line if start_line > 0 else None,
|
|
line_end=end_line if end_line > 0 and end_line != start_line else None,
|
|
code_snippet=lines.strip() if lines else None,
|
|
recommendation=self._get_recommendation(rule_id, extra),
|
|
metadata={
|
|
"rule_id": rule_id,
|
|
"opengrep_severity": severity,
|
|
"confidence": confidence,
|
|
"cwe": cwe,
|
|
"owasp": owasp,
|
|
"fix": extra.get("fix", ""),
|
|
"impact": extra.get("impact", ""),
|
|
"likelihood": extra.get("likelihood", ""),
|
|
"references": extra.get("references", []),
|
|
"tool": "opengrep",
|
|
}
|
|
)
|
|
|
|
findings.append(finding)
|
|
|
|
except json.JSONDecodeError as e:
|
|
logger.warning(f"Failed to parse OpenGrep output: {e}. Output snippet: {output[:200]}...")
|
|
except Exception as e:
|
|
logger.warning(f"Error processing OpenGrep results: {e}", exc_info=True)
|
|
|
|
return findings
|
|
|
|
def _map_severity(self, opengrep_severity: str) -> str:
|
|
"""Map OpenGrep severity to our standard severity levels"""
|
|
severity_map = {
|
|
"ERROR": "high",
|
|
"WARNING": "medium",
|
|
"INFO": "low"
|
|
}
|
|
return severity_map.get(opengrep_severity.upper(), "medium")
|
|
|
|
def _get_category(self, rule_id: str, extra: Dict[str, Any]) -> str:
|
|
"""Determine finding category based on rule and metadata"""
|
|
rule_metadata = extra.get("metadata", {})
|
|
cwe_list = rule_metadata.get("cwe", [])
|
|
owasp_list = rule_metadata.get("owasp", [])
|
|
|
|
rule_lower = rule_id.lower()
|
|
|
|
# Android-specific categories
|
|
if "injection" in rule_lower or "sql" in rule_lower:
|
|
return "injection"
|
|
elif "intent" in rule_lower:
|
|
return "android-intent"
|
|
elif "webview" in rule_lower:
|
|
return "android-webview"
|
|
elif "deeplink" in rule_lower:
|
|
return "android-deeplink"
|
|
elif "storage" in rule_lower or "sharedpreferences" in rule_lower:
|
|
return "android-storage"
|
|
elif "logging" in rule_lower or "log" in rule_lower:
|
|
return "android-logging"
|
|
elif "clipboard" in rule_lower:
|
|
return "android-clipboard"
|
|
elif "activity" in rule_lower or "service" in rule_lower or "provider" in rule_lower:
|
|
return "android-component"
|
|
elif "crypto" in rule_lower or "encrypt" in rule_lower:
|
|
return "cryptography"
|
|
elif "hardcode" in rule_lower or "secret" in rule_lower:
|
|
return "secrets"
|
|
elif "auth" in rule_lower:
|
|
return "authentication"
|
|
elif cwe_list:
|
|
return f"cwe-{cwe_list[0]}"
|
|
elif owasp_list:
|
|
return f"owasp-{owasp_list[0].replace(' ', '-').lower()}"
|
|
else:
|
|
return "android-security"
|
|
|
|
def _get_recommendation(self, rule_id: str, extra: Dict[str, Any]) -> str:
|
|
"""Generate recommendation based on rule and metadata"""
|
|
fix_suggestion = extra.get("fix", "")
|
|
if fix_suggestion:
|
|
return fix_suggestion
|
|
|
|
rule_lower = rule_id.lower()
|
|
|
|
# Android-specific recommendations
|
|
if "injection" in rule_lower or "sql" in rule_lower:
|
|
return "Use parameterized queries or Room database with type-safe queries to prevent SQL injection."
|
|
elif "intent" in rule_lower:
|
|
return "Validate all incoming Intent data and use explicit Intents when possible to prevent Intent manipulation attacks."
|
|
elif "webview" in rule_lower and "javascript" in rule_lower:
|
|
return "Disable JavaScript in WebView if not needed, or implement proper JavaScript interfaces with @JavascriptInterface annotation."
|
|
elif "deeplink" in rule_lower:
|
|
return "Validate all deeplink URLs and sanitize user input to prevent deeplink hijacking attacks."
|
|
elif "storage" in rule_lower or "sharedpreferences" in rule_lower:
|
|
return "Encrypt sensitive data before storing in SharedPreferences or use EncryptedSharedPreferences for Android API 23+."
|
|
elif "logging" in rule_lower:
|
|
return "Remove sensitive data from logs in production builds. Use ProGuard/R8 to strip logging statements."
|
|
elif "clipboard" in rule_lower:
|
|
return "Avoid placing sensitive data on the clipboard. If necessary, clear clipboard data when no longer needed."
|
|
elif "crypto" in rule_lower:
|
|
return "Use modern cryptographic algorithms (AES-GCM, RSA-OAEP) and Android Keystore for key management."
|
|
elif "hardcode" in rule_lower or "secret" in rule_lower:
|
|
return "Remove hardcoded secrets. Use Android Keystore, environment variables, or secure configuration management."
|
|
else:
|
|
return "Review this Android security issue and apply appropriate fixes based on Android security best practices."
|
|
|
|
def _create_summary(self, findings: List[ModuleFinding]) -> Dict[str, Any]:
|
|
"""Create analysis summary"""
|
|
severity_counts = {"critical": 0, "high": 0, "medium": 0, "low": 0}
|
|
category_counts = {}
|
|
rule_counts = {}
|
|
|
|
for finding in findings:
|
|
# Count by severity
|
|
severity_counts[finding.severity] += 1
|
|
|
|
# Count by category
|
|
category = finding.category
|
|
category_counts[category] = category_counts.get(category, 0) + 1
|
|
|
|
# Count by rule
|
|
rule_id = finding.metadata.get("rule_id", "unknown")
|
|
rule_counts[rule_id] = rule_counts.get(rule_id, 0) + 1
|
|
|
|
return {
|
|
"total_findings": len(findings),
|
|
"severity_counts": severity_counts,
|
|
"category_counts": category_counts,
|
|
"top_rules": dict(sorted(rule_counts.items(), key=lambda x: x[1], reverse=True)[:10]),
|
|
"files_analyzed": len(set(f.file_path for f in findings if f.file_path))
|
|
}
|