feat: Add Android static analysis workflow with Jadx, OpenGrep, and MobSF

Comprehensive Android security testing workflow converted from Prefect to Temporal architecture:

Modules (3):
- JadxDecompiler: APK to Java source code decompilation
- OpenGrepAndroid: Static analysis with Android-specific security rules
- MobSFScanner: Comprehensive mobile security framework integration

Custom Rules (13):
- clipboard-sensitive-data, hardcoded-secrets, insecure-data-storage
- insecure-deeplink, insecure-logging, intent-redirection
- sensitive_data_sharedPreferences, sqlite-injection
- vulnerable-activity, vulnerable-content-provider, vulnerable-service
- webview-javascript-enabled, webview-load-arbitrary-url
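
Several of these rules (hardcoded-secrets, insecure-logging, sensitive-data-in-shared-preferences) narrow a structural match with a case-insensitive keyword regex on a name or message. The sketch below shows how the hardcoded-secrets keyword list behaves on a few sample variable names. It uses Python's `re` as a stand-in for the rule engine, and `looks_sensitive` is a hypothetical helper that is not part of this commit.

```python
import re

# Keyword list copied from the hardcoded-secrets rule in this commit.
SECRET_NAME_RE = re.compile(
    r"(?i).*(api|key|token|secret|pass|auth|session|bearer|access|private).*"
)


def looks_sensitive(variable_name: str) -> bool:
    """Hypothetical helper: True if the name contains one of the keywords."""
    return SECRET_NAME_RE.match(variable_name) is not None


if __name__ == "__main__":
    for name in ["apiKey", "AUTH_TOKEN", "userName", "compassHeading"]:
        print(f"{name}: {looks_sensitive(name)}")
```

Because the keywords match as substrings, a name such as `compassHeading` is flagged through `pass`, so findings from these rules are candidates for review rather than confirmed leaks.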

Workflow:
- 6-phase Temporal workflow: download → Jadx → OpenGrep → MobSF → SARIF → upload
- 4 activities: decompile_with_jadx, scan_with_opengrep, scan_with_mobsf, generate_android_sarif
- SARIF output combining findings from all security tools
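
One way to combine findings from several tools into a single SARIF 2.1.0 log is to emit one run per tool. The sketch below is an illustration under assumptions, not the `generate_android_sarif` activity itself: the finding keys (`rule_id`, `severity`, `message`, `file_path`, `line`) and the severity-to-level table are hypothetical.

```python
from typing import Any, Dict, List

# Assumed mapping from normalized severities to SARIF result levels.
SARIF_LEVELS = {"critical": "error", "high": "error", "medium": "warning", "low": "note"}


def build_sarif(findings_by_tool: Dict[str, List[Dict[str, Any]]]) -> Dict[str, Any]:
    """Build a SARIF 2.1.0 log with one run per tool (hypothetical finding shape)."""
    runs = []
    for tool_name, findings in findings_by_tool.items():
        results = []
        for finding in findings:
            result: Dict[str, Any] = {
                "ruleId": finding["rule_id"],
                "level": SARIF_LEVELS.get(finding.get("severity", "low"), "note"),
                "message": {"text": finding["message"]},
            }
            # Only attach a location when the tool reported a file.
            if finding.get("file_path"):
                physical: Dict[str, Any] = {
                    "artifactLocation": {"uri": finding["file_path"]}
                }
                if finding.get("line"):
                    physical["region"] = {"startLine": finding["line"]}
                result["locations"] = [{"physicalLocation": physical}]
            results.append(result)
        runs.append({"tool": {"driver": {"name": tool_name}}, "results": results})
    return {
        "$schema": "https://json.schemastore.org/sarif-2.1.0.json",
        "version": "2.1.0",
        "runs": runs,
    }
```

Keeping one run per tool preserves which scanner produced each result, which helps when the same issue is reported by both OpenGrep and MobSF.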

Docker Worker:
- ARM64 Mac compatibility via amd64 platform emulation
- Pre-installed: Android SDK, Jadx 1.4.7, OpenGrep 1.45.0, MobSF 3.9.7
- MobSF runs as background service with API key auto-generation
- Added aiohttp for async HTTP communication
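
The scanner module talks to the background MobSF service in three POST requests: upload, scan, then JSON report. The sketch below only lists those requests in order without sending anything. `mobsf_request_plan` and `auth_headers` are hypothetical helpers; the endpoint paths, form fields, and header name are the ones used by `MobSFScanner` in this commit.

```python
from typing import Dict, List, Tuple


def auth_headers(api_key: str) -> Dict[str, str]:
    """Header sent with every request; empty when no key is configured."""
    return {"X-Mobsf-Api-Key": api_key} if api_key else {}


def mobsf_request_plan(
    base_url: str, scan_hash: str, rescan: bool = False
) -> List[Tuple[str, str, Dict[str, str]]]:
    """Hypothetical helper: (step, URL, form fields) for each POST, in order.

    In the real flow, scan_hash comes from the upload response.
    """
    base = base_url.rstrip("/")
    return [
        # The APK itself travels as a multipart 'file' field, not a form value.
        ("upload", f"{base}/api/v1/upload", {}),
        ("scan", f"{base}/api/v1/scan",
         {"hash": scan_hash, "re_scan": "1" if rescan else "0"}),
        ("report", f"{base}/api/v1/report_json", {"hash": scan_hash}),
    ]


if __name__ == "__main__":
    for step, url, form in mobsf_request_plan("http://localhost:8877", "abc123"):
        print(step, url, form)
```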

Test APKs:
- BeetleBug.apk and shopnest.apk for workflow validation
tduhamel42
2025-10-23 10:25:52 +02:00
parent 171941ef26
commit aa2cd48b00
25 changed files with 2776 additions and 5 deletions
@@ -0,0 +1,25 @@
"""
Android Security Analysis Modules
Modules for Android application security testing:
- JadxDecompiler: APK decompilation using Jadx
- MobSFScanner: Mobile security analysis using MobSF
- OpenGrepAndroid: Static analysis using OpenGrep/Semgrep with Android-specific rules
"""
# Copyright (c) 2025 FuzzingLabs
#
# Licensed under the Business Source License 1.1 (BSL). See the LICENSE file
# at the root of this repository for details.
#
# After the Change Date (four years from publication), this version of the
# Licensed Work will be made available under the Apache License, Version 2.0.
# See the LICENSE-APACHE file or http://www.apache.org/licenses/LICENSE-2.0
#
# Additional attribution and requirements are provided in the NOTICE file.
from .jadx_decompiler import JadxDecompiler
from .mobsf_scanner import MobSFScanner
from .opengrep_android import OpenGrepAndroid
__all__ = ["JadxDecompiler", "MobSFScanner", "OpenGrepAndroid"]
@@ -0,0 +1,15 @@
rules:
  - id: clipboard-sensitive-data
    severity: WARNING
    languages: [java]
    message: "Sensitive data may be copied to the clipboard."
    metadata:
      authors:
        - Guerric ELOI (FuzzingLabs)
      category: security
      area: clipboard
      verification-level: [L1]
    paths:
      include:
        - "**/*.java"
    pattern: "$CLIPBOARD.setPrimaryClip($CLIP)"
@@ -0,0 +1,23 @@
rules:
  - id: hardcoded-secrets
    severity: WARNING
    languages: [java]
    message: "Possible hardcoded secret found in variable '$NAME'."
    metadata:
      authors:
        - Guerric ELOI (FuzzingLabs)
      owasp-mobile: M2
      category: secrets
      verification-level: [L1]
    paths:
      include:
        - "**/*.java"
    patterns:
      - pattern-either:
          - pattern: 'String $NAME = "$VAL";'
          - pattern: 'final String $NAME = "$VAL";'
          - pattern: 'private String $NAME = "$VAL";'
          - pattern: 'public static String $NAME = "$VAL";'
          - pattern: 'static final String $NAME = "$VAL";'
      - metavariable-regex:
          metavariable: $NAME
          regex: "(?i).*(api|key|token|secret|pass|auth|session|bearer|access|private).*"
@@ -0,0 +1,18 @@
rules:
  - id: insecure-data-storage
    severity: WARNING
    languages: [java]
    message: "Potential insecure data storage (external storage)."
    metadata:
      authors:
        - Guerric ELOI (FuzzingLabs)
      owasp-mobile: M2
      category: security
      area: storage
      verification-level: [L1]
    paths:
      include:
        - "**/*.java"
    pattern-either:
      - pattern: "$CTX.openFileOutput($NAME, $MODE)"
      - pattern: "Environment.getExternalStorageDirectory()"
@@ -0,0 +1,16 @@
rules:
  - id: insecure-deeplink
    severity: WARNING
    languages: [xml]
    message: "Potential insecure deeplink found in intent-filter."
    metadata:
      authors:
        - Guerric ELOI (FuzzingLabs)
      category: component
      area: manifest
      verification-level: [L1]
    paths:
      include:
        - "**/AndroidManifest.xml"
    pattern: |
      <intent-filter>
@@ -0,0 +1,21 @@
rules:
  - id: insecure-logging
    severity: WARNING
    languages: [java]
    message: "Sensitive data logged via Android Log API."
    metadata:
      authors:
        - Guerric ELOI (FuzzingLabs)
      owasp-mobile: M2
      category: logging
      verification-level: [L1]
    paths:
      include:
        - "**/*.java"
    patterns:
      - pattern-either:
          - pattern: "Log.d($TAG, $MSG)"
          - pattern: "Log.e($TAG, $MSG)"
          - pattern: "System.out.println($MSG)"
      - metavariable-regex:
          metavariable: $MSG
          regex: "(?i).*(password|token|secret|api|auth|session).*"
@@ -0,0 +1,15 @@
rules:
  - id: intent-redirection
    severity: WARNING
    languages: [java]
    message: "Potential intent redirection: using getIntent().getExtras() without validation."
    metadata:
      authors:
        - Guerric ELOI (FuzzingLabs)
      category: intent
      area: intercomponent
      verification-level: [L1]
    paths:
      include:
        - "**/*.java"
    pattern: "$ACT.getIntent().getExtras()"
@@ -0,0 +1,18 @@
rules:
  - id: sensitive-data-in-shared-preferences
    severity: WARNING
    languages: [java]
    message: "Sensitive data may be stored in SharedPreferences. Please review the key '$KEY'."
    metadata:
      authors:
        - Guerric ELOI (FuzzingLabs)
      owasp-mobile: M2
      category: security
      area: storage
      verification-level: [L1]
    paths:
      include:
        - "**/*.java"
    patterns:
      - pattern: "$EDITOR.putString($KEY, $VAL);"
      - metavariable-regex:
          metavariable: $KEY
          regex: "(?i).*(username|password|pass|token|auth_token|api_key|secret|sessionid|email).*"
@@ -0,0 +1,21 @@
rules:
  - id: sqlite-injection
    severity: ERROR
    languages: [java]
    message: "Possible SQL injection: concatenated input in rawQuery or execSQL."
    metadata:
      authors:
        - Guerric ELOI (FuzzingLabs)
      owasp-mobile: M7
      category: injection
      area: database
      verification-level: [L1]
    paths:
      include:
        - "**/*.java"
    patterns:
      - pattern-either:
          - pattern: "$DB.rawQuery($QUERY, ...)"
          - pattern: "$DB.execSQL($QUERY)"
      - metavariable-regex:
          metavariable: $QUERY
          regex: '.*".*".*\+.*'
@@ -0,0 +1,16 @@
rules:
  - id: vulnerable-activity
    severity: WARNING
    languages: [xml]
    message: "Activity exported without permission."
    metadata:
      authors:
        - Guerric ELOI (FuzzingLabs)
      category: component
      area: manifest
      verification-level: [L1]
    paths:
      include:
        - "**/AndroidManifest.xml"
    pattern: |
      <activity android:exported="true"
@@ -0,0 +1,16 @@
rules:
  - id: vulnerable-content-provider
    severity: WARNING
    languages: [xml]
    message: "ContentProvider exported without permission."
    metadata:
      authors:
        - Guerric ELOI (FuzzingLabs)
      category: component
      area: manifest
      verification-level: [L1]
    paths:
      include:
        - "**/AndroidManifest.xml"
    pattern: |
      <provider android:exported="true"
@@ -0,0 +1,16 @@
rules:
  - id: vulnerable-service
    severity: WARNING
    languages: [xml]
    message: "Service exported without permission."
    metadata:
      authors:
        - Guerric ELOI (FuzzingLabs)
      category: component
      area: manifest
      verification-level: [L1]
    paths:
      include:
        - "**/AndroidManifest.xml"
    pattern: |
      <service android:exported="true"
@@ -0,0 +1,16 @@
rules:
  - id: webview-javascript-enabled
    severity: ERROR
    languages: [java]
    message: "WebView with JavaScript enabled can be dangerous if loading untrusted content."
    metadata:
      authors:
        - Guerric ELOI (FuzzingLabs)
      owasp-mobile: M7
      category: webview
      area: ui
      verification-level: [L1]
    paths:
      include:
        - "**/*.java"
    pattern: "$W.getSettings().setJavaScriptEnabled(true)"
@@ -0,0 +1,16 @@
rules:
  - id: webview-load-arbitrary-url
    severity: WARNING
    languages: [java]
    message: "Loading unvalidated URL in WebView may cause open redirect or XSS."
    metadata:
      authors:
        - Guerric ELOI (FuzzingLabs)
      owasp-mobile: M7
      category: webview
      area: ui
      verification-level: [L1]
    paths:
      include:
        - "**/*.java"
    pattern: "$W.loadUrl($URL)"
@@ -0,0 +1,270 @@
"""
Jadx APK Decompilation Module
Decompiles Android APK files to Java source code using Jadx.
"""
# Copyright (c) 2025 FuzzingLabs
#
# Licensed under the Business Source License 1.1 (BSL). See the LICENSE file
# at the root of this repository for details.
#
# After the Change Date (four years from publication), this version of the
# Licensed Work will be made available under the Apache License, Version 2.0.
# See the LICENSE-APACHE file or http://www.apache.org/licenses/LICENSE-2.0
#
# Additional attribution and requirements are provided in the NOTICE file.
import asyncio
import shutil
import logging
from pathlib import Path
from typing import Dict, Any
try:
from toolbox.modules.base import BaseModule, ModuleMetadata, ModuleResult
except ImportError:
try:
from modules.base import BaseModule, ModuleMetadata, ModuleResult
except ImportError:
from src.toolbox.modules.base import BaseModule, ModuleMetadata, ModuleResult
logger = logging.getLogger(__name__)
class JadxDecompiler(BaseModule):
"""Module for decompiling APK files to Java source code using Jadx"""
def get_metadata(self) -> ModuleMetadata:
return ModuleMetadata(
name="jadx_decompiler",
version="1.5.0",
description="Android APK decompilation using Jadx - converts DEX bytecode to Java source",
author="FuzzForge Team",
category="android",
tags=["android", "jadx", "decompilation", "reverse", "apk"],
input_schema={
"type": "object",
"properties": {
"apk_path": {
"type": "string",
"description": "Path to the APK to decompile (absolute or relative to workspace)",
},
"output_dir": {
"type": "string",
"description": "Directory (relative to workspace) where Jadx output should be written",
"default": "jadx_output",
},
"overwrite": {
"type": "boolean",
"description": "Overwrite existing output directory if present",
"default": True,
},
"threads": {
"type": "integer",
"description": "Number of Jadx decompilation threads",
"default": 4,
"minimum": 1,
"maximum": 32,
},
"decompiler_args": {
"type": "array",
"items": {"type": "string"},
"description": "Additional arguments passed directly to Jadx",
"default": [],
},
},
"required": ["apk_path"],
},
output_schema={
"type": "object",
"properties": {
"output_dir": {
"type": "string",
"description": "Path to decompiled output directory",
},
"source_dir": {
"type": "string",
"description": "Path to decompiled Java sources",
},
"resource_dir": {
"type": "string",
"description": "Path to extracted resources",
},
"java_files": {
"type": "integer",
"description": "Number of Java files decompiled",
},
},
},
requires_workspace=True,
)
def validate_config(self, config: Dict[str, Any]) -> bool:
"""Validate module configuration"""
apk_path = config.get("apk_path")
if not apk_path:
raise ValueError("'apk_path' must be provided for Jadx decompilation")
threads = config.get("threads", 4)
if not isinstance(threads, int) or threads < 1 or threads > 32:
raise ValueError("threads must be between 1 and 32")
return True
async def execute(self, config: Dict[str, Any], workspace: Path) -> ModuleResult:
"""
Execute Jadx decompilation on an APK file.
Args:
config: Configuration dict with apk_path, output_dir, etc.
workspace: Workspace directory path
Returns:
ModuleResult with decompilation summary and metadata
"""
self.start_timer()
try:
self.validate_config(config)
self.validate_workspace(workspace)
workspace = workspace.resolve()
# Resolve APK path
apk_path = Path(config["apk_path"])
if not apk_path.is_absolute():
apk_path = (workspace / apk_path).resolve()
if not apk_path.exists():
raise ValueError(f"APK not found: {apk_path}")
if apk_path.is_dir():
raise ValueError(f"APK path must be a file, not a directory: {apk_path}")
logger.info(f"Decompiling APK: {apk_path}")
# Resolve output directory
output_dir = Path(config.get("output_dir", "jadx_output"))
if not output_dir.is_absolute():
output_dir = (workspace / output_dir).resolve()
# Handle existing output directory
if output_dir.exists():
if config.get("overwrite", True):
logger.info(f"Removing existing output directory: {output_dir}")
shutil.rmtree(output_dir)
else:
raise ValueError(
f"Output directory already exists: {output_dir}. Set overwrite=true to replace it."
)
output_dir.mkdir(parents=True, exist_ok=True)
# Build Jadx command
threads = str(config.get("threads", 4))
extra_args = config.get("decompiler_args", []) or []
cmd = [
"jadx",
"--threads-count",
threads,
"--deobf", # Deobfuscate code
"--output-dir",
str(output_dir),
]
cmd.extend(extra_args)
cmd.append(str(apk_path))
logger.info(f"Running Jadx: {' '.join(cmd)}")
# Execute Jadx
process = await asyncio.create_subprocess_exec(
*cmd,
stdout=asyncio.subprocess.PIPE,
stderr=asyncio.subprocess.PIPE,
cwd=str(workspace),
)
stdout, stderr = await process.communicate()
stdout_str = stdout.decode(errors="ignore") if stdout else ""
stderr_str = stderr.decode(errors="ignore") if stderr else ""
if stdout_str:
logger.debug(f"Jadx stdout: {stdout_str[:200]}...")
if stderr_str:
logger.debug(f"Jadx stderr: {stderr_str[:200]}...")
if process.returncode != 0:
error_output = stderr_str or stdout_str or "No error output"
raise RuntimeError(
f"Jadx failed with exit code {process.returncode}: {error_output[:500]}"
)
# Verify output structure
source_dir = output_dir / "sources"
resource_dir = output_dir / "resources"
if not source_dir.exists():
logger.warning(
f"Jadx sources directory not found at expected path: {source_dir}"
)
# Use output_dir as fallback
source_dir = output_dir
# Count decompiled Java files
java_files = 0
if source_dir.exists():
java_files = sum(1 for _ in source_dir.rglob("*.java"))
logger.info(f"Decompiled {java_files} Java files")
# Log sample files for debugging
sample_files = []
for idx, file_path in enumerate(source_dir.rglob("*.java")):
sample_files.append(str(file_path.relative_to(workspace)))
if idx >= 4:
break
if sample_files:
logger.debug(f"Sample Java files: {sample_files}")
# Create summary
summary = {
"output_dir": str(output_dir),
"source_dir": str(source_dir if source_dir.exists() else output_dir),
"resource_dir": str(
resource_dir if resource_dir.exists() else output_dir
),
"java_files": java_files,
"apk_name": apk_path.name,
"apk_size_bytes": apk_path.stat().st_size,
}
metadata = {
"apk_path": str(apk_path),
"output_dir": str(output_dir),
"source_dir": summary["source_dir"],
"resource_dir": summary["resource_dir"],
"threads": int(threads),
"decompiler": "jadx",
"decompiler_version": "1.5.0",
}
logger.info(
f"✓ Jadx decompilation completed: {java_files} Java files generated"
)
return self.create_result(
findings=[], # Jadx doesn't generate findings, only decompiles
status="success",
summary=summary,
metadata=metadata,
)
except Exception as exc:
logger.error(f"Jadx decompilation failed: {exc}", exc_info=True)
return self.create_result(
findings=[],
status="failed",
error=str(exc),
metadata={"decompiler": "jadx", "apk_path": config.get("apk_path")},
)
@@ -0,0 +1,396 @@
"""
MobSF Scanner Module
Mobile Security Framework (MobSF) integration for comprehensive Android app security analysis.
Performs static analysis on APK files including permissions, manifest analysis, code analysis, and behavior checks.
"""
# Copyright (c) 2025 FuzzingLabs
#
# Licensed under the Business Source License 1.1 (BSL). See the LICENSE file
# at the root of this repository for details.
#
# After the Change Date (four years from publication), this version of the
# Licensed Work will be made available under the Apache License, Version 2.0.
# See the LICENSE-APACHE file or http://www.apache.org/licenses/LICENSE-2.0
#
# Additional attribution and requirements are provided in the NOTICE file.
import asyncio
import hashlib
import json
import logging
import os
from collections import Counter
from pathlib import Path
from typing import Dict, Any, List, Optional
import aiohttp
try:
from toolbox.modules.base import BaseModule, ModuleMetadata, ModuleFinding, ModuleResult
except ImportError:
try:
from modules.base import BaseModule, ModuleMetadata, ModuleFinding, ModuleResult
except ImportError:
from src.toolbox.modules.base import BaseModule, ModuleMetadata, ModuleFinding, ModuleResult
logger = logging.getLogger(__name__)
class MobSFScanner(BaseModule):
"""Mobile Security Framework (MobSF) scanner module for Android applications"""
SEVERITY_MAP = {
"dangerous": "critical",
"high": "high",
"warning": "medium",
"medium": "medium",
"low": "low",
"info": "low",
"secure": "low",
}
def get_metadata(self) -> ModuleMetadata:
return ModuleMetadata(
name="mobsf_scanner",
version="3.9.7",
description="Comprehensive Android security analysis using Mobile Security Framework (MobSF)",
author="FuzzForge Team",
category="android",
tags=["mobile", "android", "mobsf", "sast", "scanner", "security"],
input_schema={
"type": "object",
"properties": {
"mobsf_url": {
"type": "string",
"description": "MobSF server URL",
"default": "http://localhost:8877",
},
"file_path": {
"type": "string",
"description": "Path to the APK file to scan (absolute or relative to workspace)",
},
"api_key": {
"type": "string",
"description": "MobSF API key (if not provided, will try MOBSF_API_KEY env var)",
"default": None,
},
"rescan": {
"type": "boolean",
"description": "Force rescan even if file was previously analyzed",
"default": False,
},
},
"required": ["file_path"],
},
output_schema={
"type": "object",
"properties": {
"findings": {
"type": "array",
"description": "Security findings from MobSF analysis"
},
"scan_hash": {"type": "string"},
"total_findings": {"type": "integer"},
"severity_counts": {"type": "object"},
}
},
requires_workspace=True,
)
def validate_config(self, config: Dict[str, Any]) -> bool:
"""Validate module configuration"""
if "mobsf_url" in config and not isinstance(config["mobsf_url"], str):
raise ValueError("mobsf_url must be a string")
file_path = config.get("file_path")
if not file_path:
raise ValueError("file_path is required for MobSF scanning")
return True
async def execute(self, config: Dict[str, Any], workspace: Path) -> ModuleResult:
"""
Execute MobSF security analysis on an APK file.
Args:
config: Configuration dict with file_path, mobsf_url, api_key
workspace: Workspace directory path
Returns:
ModuleResult with security findings from MobSF
"""
self.start_timer()
try:
self.validate_config(config)
self.validate_workspace(workspace)
# Get configuration
mobsf_url = config.get("mobsf_url", "http://localhost:8877")
file_path_str = config["file_path"]
rescan = config.get("rescan", False)
# Get API key from config or environment
api_key = config.get("api_key") or os.environ.get("MOBSF_API_KEY", "")
if not api_key:
logger.warning("No MobSF API key provided. Some functionality may be limited.")
# Resolve APK file path
file_path = Path(file_path_str)
if not file_path.is_absolute():
file_path = (workspace / file_path).resolve()
if not file_path.exists():
raise FileNotFoundError(f"APK file not found: {file_path}")
if not file_path.is_file():
raise ValueError(f"APK path must be a file: {file_path}")
logger.info(f"Starting MobSF scan of APK: {file_path}")
# Upload and scan APK
scan_hash = await self._upload_file(mobsf_url, file_path, api_key)
logger.info(f"APK uploaded to MobSF with hash: {scan_hash}")
# Start scan
await self._start_scan(mobsf_url, scan_hash, api_key, rescan=rescan)
logger.info(f"MobSF scan completed for hash: {scan_hash}")
# Get JSON results
scan_results = await self._get_json_results(mobsf_url, scan_hash, api_key)
# Parse results into findings
findings = self._parse_scan_results(scan_results, file_path)
# Create summary
summary = self._create_summary(findings, scan_hash)
logger.info(f"✓ MobSF scan completed: {len(findings)} findings")
return self.create_result(
findings=findings,
status="success",
summary=summary,
metadata={
"tool": "mobsf",
"tool_version": "3.9.7",
"scan_hash": scan_hash,
"apk_file": str(file_path),
"mobsf_url": mobsf_url,
}
)
except Exception as exc:
logger.error(f"MobSF scanner failed: {exc}", exc_info=True)
return self.create_result(
findings=[],
status="failed",
error=str(exc),
metadata={"tool": "mobsf", "file_path": config.get("file_path")}
)
async def _upload_file(self, mobsf_url: str, file_path: Path, api_key: str) -> str:
"""
Upload APK file to MobSF server.
Returns:
Scan hash for the uploaded file
"""
headers = {'X-Mobsf-Api-Key': api_key} if api_key else {}
# Create multipart form data
filename = file_path.name
async with aiohttp.ClientSession() as session:
with open(file_path, 'rb') as f:
data = aiohttp.FormData()
data.add_field('file',
f,
filename=filename,
content_type='application/vnd.android.package-archive')
async with session.post(
f"{mobsf_url}/api/v1/upload",
headers=headers,
data=data,
timeout=aiohttp.ClientTimeout(total=300)
) as response:
if response.status != 200:
error_text = await response.text()
raise Exception(f"Failed to upload file to MobSF: {error_text}")
result = await response.json()
scan_hash = result.get('hash')
if not scan_hash:
raise Exception(f"MobSF upload failed: {result}")
return scan_hash
async def _start_scan(self, mobsf_url: str, scan_hash: str, api_key: str, rescan: bool = False) -> Dict[str, Any]:
"""
Start MobSF scan for uploaded file.
Returns:
Scan result dictionary
"""
headers = {'X-Mobsf-Api-Key': api_key} if api_key else {}
data = {
'hash': scan_hash,
're_scan': '1' if rescan else '0'
}
async with aiohttp.ClientSession() as session:
async with session.post(
f"{mobsf_url}/api/v1/scan",
headers=headers,
data=data,
timeout=aiohttp.ClientTimeout(total=600) # 10 minutes for scan
) as response:
if response.status != 200:
error_text = await response.text()
raise Exception(f"MobSF scan failed: {error_text}")
result = await response.json()
return result
async def _get_json_results(self, mobsf_url: str, scan_hash: str, api_key: str) -> Dict[str, Any]:
"""
Retrieve JSON scan results from MobSF.
Returns:
Scan results dictionary
"""
headers = {'X-Mobsf-Api-Key': api_key} if api_key else {}
data = {'hash': scan_hash}
async with aiohttp.ClientSession() as session:
async with session.post(
f"{mobsf_url}/api/v1/report_json",
headers=headers,
data=data,
timeout=aiohttp.ClientTimeout(total=60)
) as response:
if response.status != 200:
error_text = await response.text()
raise Exception(f"Failed to retrieve MobSF results: {error_text}")
return await response.json()
def _parse_scan_results(self, scan_data: Dict[str, Any], apk_path: Path) -> List[ModuleFinding]:
"""Parse MobSF JSON results into standardized findings"""
findings = []
# Parse permissions
if 'permissions' in scan_data:
for perm_name, perm_attrs in scan_data['permissions'].items():
if isinstance(perm_attrs, dict):
severity = self.SEVERITY_MAP.get(
perm_attrs.get('status', '').lower(), 'low'
)
finding = self.create_finding(
title=f"Android Permission: {perm_name}",
description=perm_attrs.get('description', 'No description'),
severity=severity,
category="android-permission",
metadata={
'permission': perm_name,
'status': perm_attrs.get('status'),
'info': perm_attrs.get('info'),
'tool': 'mobsf',
}
)
findings.append(finding)
# Parse manifest analysis
if 'manifest_analysis' in scan_data:
manifest_findings = scan_data['manifest_analysis'].get('manifest_findings', [])
for item in manifest_findings:
if isinstance(item, dict):
severity = self.SEVERITY_MAP.get(item.get('severity', '').lower(), 'medium')
finding = self.create_finding(
title=item.get('title') or item.get('name') or "Manifest Issue",
description=item.get('description', 'No description'),
severity=severity,
category="android-manifest",
metadata={
'rule': item.get('rule'),
'tool': 'mobsf',
}
)
findings.append(finding)
# Parse code analysis
if 'code_analysis' in scan_data:
code_findings = scan_data['code_analysis'].get('findings', {})
for finding_name, finding_data in code_findings.items():
if isinstance(finding_data, dict):
metadata_dict = finding_data.get('metadata', {})
severity = self.SEVERITY_MAP.get(
metadata_dict.get('severity', '').lower(), 'medium'
)
files_list = finding_data.get('files', [])
file_path = files_list[0] if files_list else None
finding = self.create_finding(
title=finding_name,
description=metadata_dict.get('description', 'No description'),
severity=severity,
category="android-code-analysis",
file_path=file_path,
metadata={
'cwe': metadata_dict.get('cwe'),
'owasp': metadata_dict.get('owasp'),
'files': files_list,
'tool': 'mobsf',
}
)
findings.append(finding)
# Parse behavior analysis
if 'behaviour' in scan_data:
for key, value in scan_data['behaviour'].items():
if isinstance(value, dict):
metadata_dict = value.get('metadata', {})
labels = metadata_dict.get('label', [])
label = labels[0] if labels else 'Unknown Behavior'
severity = self.SEVERITY_MAP.get(
metadata_dict.get('severity', '').lower(), 'medium'
)
files_list = value.get('files', [])
finding = self.create_finding(
title=f"Behavior: {label}",
description=metadata_dict.get('description', 'No description'),
severity=severity,
category="android-behavior",
metadata={
'files': files_list,
'tool': 'mobsf',
}
)
findings.append(finding)
logger.debug(f"Parsed {len(findings)} findings from MobSF results")
return findings
def _create_summary(self, findings: List[ModuleFinding], scan_hash: str) -> Dict[str, Any]:
"""Create analysis summary"""
severity_counter = Counter()
category_counter = Counter()
for finding in findings:
severity_counter[finding.severity] += 1
category_counter[finding.category] += 1
return {
"scan_hash": scan_hash,
"total_findings": len(findings),
"severity_counts": dict(severity_counter),
"category_counts": dict(category_counter),
}
@@ -0,0 +1,442 @@
"""
OpenGrep Android Static Analysis Module
Pattern-based static analysis for Android applications using OpenGrep/Semgrep
with Android-specific security rules.
"""
# Copyright (c) 2025 FuzzingLabs
#
# Licensed under the Business Source License 1.1 (BSL). See the LICENSE file
# at the root of this repository for details.
#
# After the Change Date (four years from publication), this version of the
# Licensed Work will be made available under the Apache License, Version 2.0.
# See the LICENSE-APACHE file or http://www.apache.org/licenses/LICENSE-2.0
#
# Additional attribution and requirements are provided in the NOTICE file.
import asyncio
import json
import logging
from pathlib import Path
from typing import Dict, Any, List
try:
from toolbox.modules.base import BaseModule, ModuleMetadata, ModuleFinding, ModuleResult
except ImportError:
try:
from modules.base import BaseModule, ModuleMetadata, ModuleFinding, ModuleResult
except ImportError:
from src.toolbox.modules.base import BaseModule, ModuleMetadata, ModuleFinding, ModuleResult
logger = logging.getLogger(__name__)
class OpenGrepAndroid(BaseModule):
"""OpenGrep static analysis module specialized for Android security"""
def get_metadata(self) -> ModuleMetadata:
"""Get module metadata"""
return ModuleMetadata(
name="opengrep_android",
version="1.45.0",
description="Android-focused static analysis using OpenGrep/Semgrep with custom security rules for Java/Kotlin",
author="FuzzForge Team",
category="android",
tags=["sast", "android", "opengrep", "semgrep", "java", "kotlin", "security"],
input_schema={
"type": "object",
"properties": {
"config": {
"type": "string",
"enum": ["auto", "p/security-audit", "p/owasp-top-ten", "p/cwe-top-25"],
"default": "auto",
"description": "Rule configuration to use"
},
"custom_rules_path": {
"type": "string",
"description": "Path to a directory containing custom OpenGrep rules (Android-specific rules recommended)",
"default": None,
},
"languages": {
"type": "array",
"items": {"type": "string"},
"description": "Specific languages to analyze (defaults to java, kotlin for Android)",
"default": ["java", "kotlin"],
},
"include_patterns": {
"type": "array",
"items": {"type": "string"},
"description": "File patterns to include",
"default": [],
},
"exclude_patterns": {
"type": "array",
"items": {"type": "string"},
"description": "File patterns to exclude",
"default": [],
},
"max_target_bytes": {
"type": "integer",
"default": 1000000,
"description": "Maximum file size to analyze (bytes)"
},
"timeout": {
"type": "integer",
"default": 300,
"description": "Analysis timeout in seconds"
},
"severity": {
"type": "array",
"items": {"type": "string", "enum": ["ERROR", "WARNING", "INFO"]},
"default": ["ERROR", "WARNING", "INFO"],
"description": "Severity levels to report (results at other levels are dropped)"
},
"confidence": {
"type": "array",
"items": {"type": "string", "enum": ["HIGH", "MEDIUM", "LOW"]},
"default": ["HIGH", "MEDIUM", "LOW"],
"description": "Confidence levels to report (results at other levels are dropped)"
}
}
},
output_schema={
"type": "object",
"properties": {
"findings": {
"type": "array",
"description": "Security findings from OpenGrep analysis"
},
"total_findings": {"type": "integer"},
"severity_counts": {"type": "object"},
"files_analyzed": {"type": "integer"},
}
},
requires_workspace=True,
)
def validate_config(self, config: Dict[str, Any]) -> bool:
"""Validate configuration"""
timeout = config.get("timeout", 300)
if not isinstance(timeout, int) or timeout < 30 or timeout > 3600:
raise ValueError("Timeout must be between 30 and 3600 seconds")
max_bytes = config.get("max_target_bytes", 1000000)
if not isinstance(max_bytes, int) or max_bytes < 1000 or max_bytes > 10000000:
raise ValueError("max_target_bytes must be between 1000 and 10000000")
custom_rules_path = config.get("custom_rules_path")
if custom_rules_path:
rules_path = Path(custom_rules_path)
if not rules_path.exists():
logger.warning(f"Custom rules path does not exist: {custom_rules_path}")
return True
async def execute(self, config: Dict[str, Any], workspace: Path) -> ModuleResult:
"""Execute OpenGrep static analysis on Android code"""
self.start_timer()
try:
# Validate inputs
self.validate_config(config)
self.validate_workspace(workspace)
logger.info(f"Running OpenGrep Android analysis on {workspace}")
# Build opengrep command
cmd = ["opengrep", "scan", "--json"]
# Add configuration
custom_rules_path = config.get("custom_rules_path")
use_custom_rules = False
if custom_rules_path and Path(custom_rules_path).exists():
cmd.extend(["--config", custom_rules_path])
use_custom_rules = True
logger.info(f"Using custom Android rules from: {custom_rules_path}")
else:
config_type = config.get("config", "auto")
cmd.extend(["--config", config_type])
# Add timeout
cmd.extend(["--timeout", str(config.get("timeout", 300))])
# Add max target bytes
cmd.extend(["--max-target-bytes", str(config.get("max_target_bytes", 1000000))])
# Add languages if specified (but NOT when using custom rules)
languages = config.get("languages", ["java", "kotlin"])
if languages and not use_custom_rules:
langs = ",".join(languages)
cmd.extend(["--lang", langs])
logger.debug(f"Analyzing languages: {langs}")
# Add include patterns
include_patterns = config.get("include_patterns", [])
for pattern in include_patterns:
cmd.extend(["--include", pattern])
# Add exclude patterns
exclude_patterns = config.get("exclude_patterns", [])
for pattern in exclude_patterns:
cmd.extend(["--exclude", pattern])
# Add severity filter if single level requested
severity_levels = config.get("severity", ["ERROR", "WARNING", "INFO"])
if severity_levels and len(severity_levels) == 1:
cmd.extend(["--severity", severity_levels[0]])
# Skip the version check and scan files even when .gitignore would exclude them
cmd.append("--disable-version-check")
cmd.append("--no-git-ignore")
# Add target directory
cmd.append(str(workspace))
logger.debug(f"Running command: {' '.join(cmd)}")
# Run OpenGrep
process = await asyncio.create_subprocess_exec(
*cmd,
stdout=asyncio.subprocess.PIPE,
stderr=asyncio.subprocess.PIPE,
cwd=workspace
)
stdout, stderr = await process.communicate()
# Parse results
findings = []
if process.returncode in [0, 1]: # 0 = no findings, 1 = findings found
findings = self._parse_opengrep_output(stdout.decode(), workspace, config)
logger.info(f"OpenGrep found {len(findings)} potential security issues")
else:
error_msg = stderr.decode()
logger.error(f"OpenGrep failed: {error_msg}")
return self.create_result(
findings=[],
status="failed",
error=f"OpenGrep execution failed (exit code {process.returncode}): {error_msg[:500]}"
)
# Create summary
summary = self._create_summary(findings)
return self.create_result(
findings=findings,
status="success",
summary=summary,
metadata={
"tool": "opengrep",
"tool_version": "1.45.0",
"languages": languages,
"custom_rules": use_custom_rules,
}
)
except Exception as e:
logger.error(f"OpenGrep Android module failed: {e}", exc_info=True)
return self.create_result(
findings=[],
status="failed",
error=str(e)
)
def _parse_opengrep_output(self, output: str, workspace: Path, config: Dict[str, Any]) -> List[ModuleFinding]:
"""Parse OpenGrep JSON output into findings"""
findings = []
if not output.strip():
return findings
try:
data = json.loads(output)
results = data.get("results", [])
logger.debug(f"OpenGrep returned {len(results)} raw results")
# Get filtering criteria
allowed_severities = set(config.get("severity", ["ERROR", "WARNING", "INFO"]))
allowed_confidences = set(config.get("confidence", ["HIGH", "MEDIUM", "LOW"]))
for result in results:
# Extract basic info
rule_id = result.get("check_id", "unknown")
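extra = result.get("extra", {})
# Semgrep-compatible JSON nests the message under "extra"; keep the top-level lookup as a fallback
message = extra.get("message") or result.get("message", "")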
severity = extra.get("severity", "INFO").upper()
# File location info
path_info = result.get("path", "")
start_line = result.get("start", {}).get("line", 0)
end_line = result.get("end", {}).get("line", 0)
start_col = result.get("start", {}).get("col", 0)
end_col = result.get("end", {}).get("col", 0)
# Code snippet
lines = extra.get("lines", "")
# Metadata
rule_metadata = extra.get("metadata", {})
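# Rules may declare "cwe"/"owasp" as a single string; normalize both to lists
cwe = rule_metadata.get("cwe") or []
cwe = [cwe] if isinstance(cwe, str) else cwe
owasp = rule_metadata.get("owasp") or []
owasp = [owasp] if isinstance(owasp, str) else owasp
# Confidence may sit in "extra" or in the rule metadata; guard against missing or None values
confidence = str(extra.get("confidence") or rule_metadata.get("confidence") or "MEDIUM").upper()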
# Apply severity filter
if severity not in allowed_severities:
continue
# Apply confidence filter
if confidence not in allowed_confidences:
continue
# Make file path relative to workspace
if path_info:
try:
rel_path = Path(path_info).relative_to(workspace)
path_info = str(rel_path)
except ValueError:
pass
# Map severity to our standard levels
finding_severity = self._map_severity(severity)
# Create finding
finding = self.create_finding(
title=f"Android Security: {rule_id}",
description=message or f"OpenGrep rule {rule_id} triggered",
severity=finding_severity,
category=self._get_category(rule_id, extra),
file_path=path_info if path_info else None,
line_start=start_line if start_line > 0 else None,
line_end=end_line if end_line > 0 and end_line != start_line else None,
code_snippet=lines.strip() if lines else None,
recommendation=self._get_recommendation(rule_id, extra),
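metadata={
"rule_id": rule_id,
"opengrep_severity": severity,
"confidence": confidence,
"cwe": cwe,
"owasp": owasp,
"fix": extra.get("fix", ""),
# impact, likelihood and references are rule metadata fields; fall back to "extra" if absent
"impact": rule_metadata.get("impact", extra.get("impact", "")),
"likelihood": rule_metadata.get("likelihood", extra.get("likelihood", "")),
"references": rule_metadata.get("references", extra.get("references", [])),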
"tool": "opengrep",
}
)
findings.append(finding)
except json.JSONDecodeError as e:
logger.warning(f"Failed to parse OpenGrep output: {e}. Output snippet: {output[:200]}...")
except Exception as e:
logger.warning(f"Error processing OpenGrep results: {e}", exc_info=True)
return findings
def _map_severity(self, opengrep_severity: str) -> str:
"""Map OpenGrep severity to our standard severity levels"""
severity_map = {
"ERROR": "high",
"WARNING": "medium",
"INFO": "low"
}
return severity_map.get(opengrep_severity.upper(), "medium")
def _get_category(self, rule_id: str, extra: Dict[str, Any]) -> str:
"""Determine finding category based on rule and metadata"""
rule_metadata = extra.get("metadata", {})
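# Metadata values may be a single string rather than a list; wrap them so [0] yields the whole entry
cwe_list = rule_metadata.get("cwe") or []
cwe_list = [cwe_list] if isinstance(cwe_list, str) else cwe_list
owasp_list = rule_metadata.get("owasp") or []
owasp_list = [owasp_list] if isinstance(owasp_list, str) else owasp_list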
rule_lower = rule_id.lower()
# Android-specific categories
if "injection" in rule_lower or "sql" in rule_lower:
return "injection"
elif "intent" in rule_lower:
return "android-intent"
elif "webview" in rule_lower:
return "android-webview"
elif "deeplink" in rule_lower:
return "android-deeplink"
elif "storage" in rule_lower or "sharedpreferences" in rule_lower:
return "android-storage"
elif "log" in rule_lower:  # substring match, so this also covers "logging"
return "android-logging"
elif "clipboard" in rule_lower:
return "android-clipboard"
elif "activity" in rule_lower or "service" in rule_lower or "provider" in rule_lower:
return "android-component"
elif "crypto" in rule_lower or "encrypt" in rule_lower:
return "cryptography"
elif "hardcode" in rule_lower or "secret" in rule_lower:
return "secrets"
elif "auth" in rule_lower:
return "authentication"
elif cwe_list:
return f"cwe-{cwe_list[0]}"
elif owasp_list:
return f"owasp-{owasp_list[0].replace(' ', '-').lower()}"
else:
return "android-security"
def _get_recommendation(self, rule_id: str, extra: Dict[str, Any]) -> str:
"""Generate recommendation based on rule and metadata"""
fix_suggestion = extra.get("fix", "")
if fix_suggestion:
return fix_suggestion
rule_lower = rule_id.lower()
# Android-specific recommendations
if "injection" in rule_lower or "sql" in rule_lower:
return "Use parameterized queries or Room database with type-safe queries to prevent SQL injection."
elif "intent" in rule_lower:
return "Validate all incoming Intent data and use explicit Intents when possible to prevent Intent manipulation attacks."
elif "webview" in rule_lower and "javascript" in rule_lower:
return "Disable JavaScript in WebView if not needed, or implement proper JavaScript interfaces with @JavascriptInterface annotation."
elif "deeplink" in rule_lower:
return "Validate all deeplink URLs and sanitize user input to prevent deeplink hijacking attacks."
elif "storage" in rule_lower or "sharedpreferences" in rule_lower:
return "Encrypt sensitive data before storing it in SharedPreferences, or use EncryptedSharedPreferences (available on Android API 23+)."
elif "logging" in rule_lower:
return "Remove sensitive data from logs in production builds. Use ProGuard/R8 to strip logging statements."
elif "clipboard" in rule_lower:
return "Avoid placing sensitive data on the clipboard. If necessary, clear clipboard data when no longer needed."
elif "crypto" in rule_lower:
return "Use modern cryptographic algorithms (AES-GCM, RSA-OAEP) and Android Keystore for key management."
elif "hardcode" in rule_lower or "secret" in rule_lower:
return "Remove hardcoded secrets. Use Android Keystore, environment variables, or secure configuration management."
else:
return "Review this Android security issue and apply appropriate fixes based on Android security best practices."
def _create_summary(self, findings: List[ModuleFinding]) -> Dict[str, Any]:
"""Create analysis summary"""
severity_counts = {"critical": 0, "high": 0, "medium": 0, "low": 0}
category_counts = {}
rule_counts = {}
for finding in findings:
# Count by severity
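# Use .get so an unexpected severity value is counted instead of raising KeyError
severity_counts[finding.severity] = severity_counts.get(finding.severity, 0) + 1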
# Count by category
category = finding.category
category_counts[category] = category_counts.get(category, 0) + 1
# Count by rule
rule_id = finding.metadata.get("rule_id", "unknown")
rule_counts[rule_id] = rule_counts.get(rule_id, 0) + 1
return {
"total_findings": len(findings),
"severity_counts": severity_counts,
"category_counts": category_counts,
"top_rules": dict(sorted(rule_counts.items(), key=lambda x: x[1], reverse=True)[:10]),
# Counts distinct files with at least one finding, not every file that was scanned
"files_analyzed": len(set(f.file_path for f in findings if f.file_path))
}