Files
fuzzforge_ai/backend/toolbox/modules/android/mobsf_scanner.py
tduhamel42 fccd8f32ab refactor: Update all modules to use new create_finding signature
Updated 10 modules to use the new create_finding() signature with required rule_id and found_by parameters:

- llm_analyzer.py: Added FoundBy and LLMContext for AI-detected findings
- bandit_analyzer.py: Added tool attribution and moved CWE/confidence to proper fields
- security_analyzer.py: Updated all three finding types (secrets, SQL injection, dangerous functions)
- mypy_analyzer.py: Added tool attribution and moved column info to column_start
- mobsf_scanner.py: Updated all 6 finding types (permissions, manifest, code analysis, behavior) with proper line number handling
- opengrep_android.py: Added tool attribution, proper CWE/OWASP formatting, and confidence mapping
- dependency_scanner.py: Added pip-audit attribution for CVE findings
- file_scanner.py: Updated both sensitive file and enumeration findings
- cargo_fuzzer.py: Added fuzzer type attribution for crash findings
- atheris_fuzzer.py: Added fuzzer type attribution for Python crash findings

All modules now properly track:
- Finding source (module, tool name, version, type)
- Confidence levels (high/medium/low)
- CWE and OWASP mappings where applicable
- LLM context for AI-detected issues
2025-11-14 10:51:38 +01:00

493 lines
20 KiB
Python

"""
MobSF Scanner Module
Mobile Security Framework (MobSF) integration for comprehensive Android app security analysis.
Performs static analysis on APK files including permissions, manifest analysis, code analysis, and behavior checks.
"""
# Copyright (c) 2025 FuzzingLabs
#
# Licensed under the Business Source License 1.1 (BSL). See the LICENSE file
# at the root of this repository for details.
#
# After the Change Date (four years from publication), this version of the
# Licensed Work will be made available under the Apache License, Version 2.0.
# See the LICENSE-APACHE file or http://www.apache.org/licenses/LICENSE-2.0
#
# Additional attribution and requirements are provided in the NOTICE file.
import logging
import os
import re
from collections import Counter
from pathlib import Path
from typing import Any, Dict, List, Optional

import aiohttp

try:
    from toolbox.modules.base import BaseModule, ModuleMetadata, ModuleFinding, ModuleResult, FoundBy
except ImportError:
    try:
        from modules.base import BaseModule, ModuleMetadata, ModuleFinding, ModuleResult, FoundBy
    except ImportError:
        from src.toolbox.modules.base import BaseModule, ModuleMetadata, ModuleFinding, ModuleResult, FoundBy
# Module-level logger, named after this module's import path.
logger = logging.getLogger(__name__)
class MobSFScanner(BaseModule):
    """Mobile Security Framework (MobSF) scanner module for Android applications.

    Uploads an APK to a MobSF server through its REST API, triggers a scan,
    downloads the JSON report and converts the report's ``permissions``,
    ``manifest_analysis``, ``code_analysis`` and ``behaviour`` sections into
    module findings.
    """

    # Version string reported in the module metadata, the result metadata and
    # the FoundBy attribution.
    # NOTE(review): this is hard-coded, not read from the server - confirm it
    # matches the MobSF instance actually deployed.
    MOBSF_VERSION = "3.9.7"

    # Server URL used when the configuration does not provide ``mobsf_url``.
    DEFAULT_MOBSF_URL = "http://localhost:8877"

    # MobSF status/severity label (lower-cased) -> finding severity.
    SEVERITY_MAP = {
        "dangerous": "critical",
        "high": "high",
        "warning": "medium",
        "medium": "medium",
        "low": "low",
        "info": "low",
        "secure": "low",
    }

    # Pulls the numeric identifier out of values such as "CWE-532: ..." or
    # "cwe-200" so that the "CWE-" prefix is never emitted twice.
    _CWE_ID_PATTERN = re.compile(r"cwe[-_\s]?(\d+)", re.IGNORECASE)

    def get_metadata(self) -> ModuleMetadata:
        """Return the static description of this module and its config schema."""
        return ModuleMetadata(
            name="mobsf_scanner",
            version=self.MOBSF_VERSION,
            description="Comprehensive Android security analysis using Mobile Security Framework (MobSF)",
            author="FuzzForge Team",
            category="android",
            tags=["mobile", "android", "mobsf", "sast", "scanner", "security"],
            input_schema={
                "type": "object",
                "properties": {
                    "mobsf_url": {
                        "type": "string",
                        "description": "MobSF server URL",
                        "default": self.DEFAULT_MOBSF_URL,
                    },
                    "file_path": {
                        "type": "string",
                        "description": "Path to the APK file to scan (absolute or relative to workspace)",
                    },
                    "api_key": {
                        "type": "string",
                        "description": "MobSF API key (if not provided, will try MOBSF_API_KEY env var)",
                        "default": None,
                    },
                    "rescan": {
                        "type": "boolean",
                        "description": "Force rescan even if file was previously analyzed",
                        "default": False,
                    },
                },
                "required": ["file_path"],
            },
            output_schema={
                "type": "object",
                "properties": {
                    "findings": {
                        "type": "array",
                        "description": "Security findings from MobSF analysis",
                    },
                    "scan_hash": {"type": "string"},
                    "total_findings": {"type": "integer"},
                    "severity_counts": {"type": "object"},
                },
            },
            requires_workspace=True,
        )

    def validate_config(self, config: Dict[str, Any]) -> bool:
        """Validate module configuration.

        Args:
            config: Configuration dict; ``file_path`` is mandatory and
                ``mobsf_url``, when present, must be a string.

        Returns:
            True when the configuration is acceptable.

        Raises:
            ValueError: If ``mobsf_url`` is not a string, or ``file_path`` is
                missing, empty, or not a string / path-like object.
        """
        if "mobsf_url" in config and not isinstance(config["mobsf_url"], str):
            raise ValueError("mobsf_url must be a string")
        file_path = config.get("file_path")
        if not file_path:
            raise ValueError("file_path is required for MobSF scanning")
        # Reject values Path() cannot consume here, so the caller gets a clear
        # message instead of a TypeError from deeper in execute().
        if not isinstance(file_path, (str, os.PathLike)):
            raise ValueError("file_path must be a string")
        return True

    async def execute(self, config: Dict[str, Any], workspace: Path) -> ModuleResult:
        """
        Execute MobSF security analysis on an APK file.

        Exceptions raised by validation, file resolution, the HTTP calls or
        report parsing are caught, logged, and turned into a result with
        status "failed".

        Args:
            config: Configuration dict with file_path, mobsf_url, api_key, rescan
            workspace: Workspace directory path

        Returns:
            ModuleResult with security findings from MobSF
        """
        self.start_timer()
        try:
            self.validate_config(config)
            self.validate_workspace(workspace)

            # Get configuration
            mobsf_url = config.get("mobsf_url", self.DEFAULT_MOBSF_URL)
            file_path_str = config["file_path"]
            rescan = config.get("rescan", False)

            # Get API key from config or environment
            api_key = config.get("api_key") or os.environ.get("MOBSF_API_KEY", "")
            if not api_key:
                logger.warning("No MobSF API key provided. Some functionality may be limited.")

            # Resolve APK file path; relative paths are taken from the workspace
            file_path = Path(file_path_str)
            if not file_path.is_absolute():
                file_path = (workspace / file_path).resolve()
            if not file_path.exists():
                raise FileNotFoundError(f"APK file not found: {file_path}")
            if not file_path.is_file():
                raise ValueError(f"APK path must be a file: {file_path}")

            logger.info(f"Starting MobSF scan of APK: {file_path}")

            # Upload and scan APK
            scan_hash = await self._upload_file(mobsf_url, file_path, api_key)
            logger.info(f"APK uploaded to MobSF with hash: {scan_hash}")

            # Start scan
            await self._start_scan(mobsf_url, scan_hash, api_key, rescan=rescan)
            logger.info(f"MobSF scan completed for hash: {scan_hash}")

            # Get JSON results
            scan_results = await self._get_json_results(mobsf_url, scan_hash, api_key)

            # Parse results into findings
            findings = self._parse_scan_results(scan_results, file_path)

            # Create summary
            summary = self._create_summary(findings, scan_hash)
            logger.info(f"✓ MobSF scan completed: {len(findings)} findings")

            return self.create_result(
                findings=findings,
                status="success",
                summary=summary,
                metadata={
                    "tool": "mobsf",
                    "tool_version": self.MOBSF_VERSION,
                    "scan_hash": scan_hash,
                    "apk_file": str(file_path),
                    "mobsf_url": mobsf_url,
                },
            )
        except Exception as exc:
            # Module boundary: report the failure as a result instead of raising.
            logger.error(f"MobSF scanner failed: {exc}", exc_info=True)
            return self.create_result(
                findings=[],
                status="failed",
                error=str(exc),
                metadata={"tool": "mobsf", "file_path": config.get("file_path")},
            )

    async def _post_json(
        self,
        mobsf_url: str,
        endpoint: str,
        api_key: str,
        data: Any,
        timeout_seconds: int,
        error_prefix: str,
    ) -> Any:
        """POST ``data`` to ``{mobsf_url}/api/v1/{endpoint}`` and decode the JSON reply.

        Args:
            mobsf_url: Base URL of the MobSF server; a trailing slash is tolerated.
            endpoint: Path component after ``/api/v1/``.
            api_key: Sent as the ``X-Mobsf-Api-Key`` header when non-empty.
            data: Request body handed to aiohttp (form dict or FormData).
            timeout_seconds: Total timeout for the request.
            error_prefix: Text placed before the server's error body in the
                raised exception.

        Returns:
            The decoded JSON body of a 200 response.

        Raises:
            RuntimeError: If the server answers with a status other than 200.
        """
        headers = {'X-Mobsf-Api-Key': api_key} if api_key else {}
        # Strip a trailing slash so the joined URL never contains "//api".
        url = f"{mobsf_url.rstrip('/')}/api/v1/{endpoint}"
        async with aiohttp.ClientSession() as session:
            async with session.post(
                url,
                headers=headers,
                data=data,
                timeout=aiohttp.ClientTimeout(total=timeout_seconds),
            ) as response:
                if response.status != 200:
                    error_text = await response.text()
                    raise RuntimeError(f"{error_prefix}: {error_text}")
                return await response.json()

    async def _upload_file(self, mobsf_url: str, file_path: Path, api_key: str) -> str:
        """
        Upload APK file to MobSF server.

        Returns:
            Scan hash for the uploaded file

        Raises:
            RuntimeError: If the upload is rejected or the reply has no ``hash``.
        """
        # The file handle must stay open until the request has been sent, so
        # the POST happens inside the ``with`` block.
        with open(file_path, 'rb') as apk_handle:
            form = aiohttp.FormData()
            form.add_field(
                'file',
                apk_handle,
                filename=file_path.name,
                content_type='application/vnd.android.package-archive',
            )
            result = await self._post_json(
                mobsf_url, "upload", api_key, form, 300,
                "Failed to upload file to MobSF",
            )
        scan_hash = result.get('hash') if isinstance(result, dict) else None
        if not scan_hash:
            raise RuntimeError(f"MobSF upload failed: {result}")
        return scan_hash

    async def _start_scan(self, mobsf_url: str, scan_hash: str, api_key: str, rescan: bool = False) -> Dict[str, Any]:
        """
        Start MobSF scan for uploaded file.

        Args:
            mobsf_url: Base URL of the MobSF server.
            scan_hash: Hash returned by the upload step.
            api_key: MobSF API key (may be empty).
            rescan: When True, ``re_scan`` is sent as '1', otherwise '0'.

        Returns:
            Scan result dictionary

        Raises:
            RuntimeError: If the server answers with a status other than 200.
        """
        form = {
            'hash': scan_hash,
            're_scan': '1' if rescan else '0',
        }
        # 10 minutes for scan
        return await self._post_json(mobsf_url, "scan", api_key, form, 600, "MobSF scan failed")

    async def _get_json_results(self, mobsf_url: str, scan_hash: str, api_key: str) -> Dict[str, Any]:
        """
        Retrieve JSON scan results from MobSF.

        Returns:
            Scan results dictionary

        Raises:
            RuntimeError: If the server answers with a status other than 200.
        """
        return await self._post_json(
            mobsf_url, "report_json", api_key, {'hash': scan_hash}, 60,
            "Failed to retrieve MobSF results",
        )

    def _map_severity(self, raw: Any, default: str) -> str:
        """Translate a MobSF status/severity label via SEVERITY_MAP.

        ``None`` and other falsy values are treated as an empty label, so they
        resolve to ``default`` instead of raising.
        """
        return self.SEVERITY_MAP.get(str(raw or '').lower(), default)

    @classmethod
    def _normalize_cwe(cls, raw: Any) -> Optional[str]:
        """Return ``raw`` as a ``CWE-<id>`` string, or None when it is empty.

        Values that already carry a CWE prefix (for example
        "CWE-532: Insertion of ...") are reduced to "CWE-532" rather than
        being prefixed a second time. Anything else is prefixed as-is.
        """
        if not raw:
            return None
        text = str(raw).strip()
        match = cls._CWE_ID_PATTERN.search(text)
        if match:
            return f"CWE-{match.group(1)}"
        return f"CWE-{text}"

    @staticmethod
    def _first_line_number(line_numbers: Any) -> Optional[int]:
        """Extract the first line number from a MobSF ``files`` value.

        Accepts a number, a string such as "28" or "65,81", or a list/tuple of
        those. Returns None when nothing usable is present.
        """
        if not line_numbers:
            return None
        if isinstance(line_numbers, (list, tuple)):
            first = line_numbers[0]
        else:
            first = str(line_numbers).split(',')[0]
        try:
            return int(str(first).strip())
        except (ValueError, TypeError):
            return None

    @staticmethod
    def _as_dict(value: Any) -> Dict[str, Any]:
        """Return ``value`` if it is a dict, otherwise an empty dict."""
        return value if isinstance(value, dict) else {}

    def _findings_per_file(
        self,
        files: Any,
        common: Dict[str, Any],
        extra_metadata: Dict[str, Any],
    ) -> List[ModuleFinding]:
        """Create one finding per affected file, or a single file-less finding.

        Args:
            files: MobSF ``files`` value, expected to be ``{filename: line_numbers}``.
            common: Keyword arguments shared by every ``create_finding`` call.
            extra_metadata: Metadata entries shared by every finding.

        Returns:
            The created findings. When ``files`` is not a non-empty dict, one
            finding is created without ``file_path``/``line_start``.
        """
        if not (isinstance(files, dict) and files):
            # Fallback: create one finding without file info
            return [self.create_finding(**common, metadata=dict(extra_metadata))]
        created = []
        for src_file, line_numbers in files.items():
            created.append(
                self.create_finding(
                    **common,
                    file_path=src_file,
                    line_start=self._first_line_number(line_numbers),
                    metadata={**extra_metadata, 'line_numbers': line_numbers},
                )
            )
        return created

    def _parse_permissions(self, section: Any, found_by: FoundBy) -> List[ModuleFinding]:
        """Convert the report's ``permissions`` mapping into findings."""
        created = []
        for perm_name, perm_attrs in self._as_dict(section).items():
            if not isinstance(perm_attrs, dict):
                continue
            created.append(
                self.create_finding(
                    rule_id=f"android_permission_{perm_name.replace('.', '_')}",
                    title=f"Android Permission: {perm_name}",
                    description=perm_attrs.get('description', 'No description'),
                    severity=self._map_severity(perm_attrs.get('status'), 'low'),
                    category="android-permission",
                    found_by=found_by,
                    confidence="high",
                    metadata={
                        'permission': perm_name,
                        'status': perm_attrs.get('status'),
                        'info': perm_attrs.get('info'),
                        'tool': 'mobsf',
                    },
                )
            )
        return created

    def _parse_manifest(self, section: Any, found_by: FoundBy) -> List[ModuleFinding]:
        """Convert ``manifest_analysis.manifest_findings`` entries into findings."""
        items = self._as_dict(section).get('manifest_findings', [])
        if not isinstance(items, (list, tuple)):
            return []
        created = []
        for item in items:
            if not isinstance(item, dict):
                continue
            rule = item.get('rule') or "manifest_issue"
            rule_slug = str(rule).replace(' ', '_').replace('-', '_')
            created.append(
                self.create_finding(
                    rule_id=f"android_manifest_{rule_slug}",
                    title=item.get('title') or item.get('name') or "Manifest Issue",
                    description=item.get('description', 'No description'),
                    severity=self._map_severity(item.get('severity'), 'medium'),
                    category="android-manifest",
                    found_by=found_by,
                    confidence="high",
                    metadata={
                        'rule': rule,
                        'tool': 'mobsf',
                    },
                )
            )
        return created

    def _parse_code_analysis(self, section: Any, found_by: FoundBy) -> List[ModuleFinding]:
        """Convert ``code_analysis.findings`` entries into findings."""
        code_findings = self._as_dict(section).get('findings', {})
        created = []
        for finding_name, finding_data in self._as_dict(code_findings).items():
            if not isinstance(finding_data, dict):
                continue
            rule_meta = self._as_dict(finding_data.get('metadata'))
            common = {
                'rule_id': finding_name.replace(' ', '_').replace('-', '_'),
                'title': finding_name,
                'description': rule_meta.get('description', 'No description'),
                'severity': self._map_severity(rule_meta.get('severity'), 'medium'),
                'category': "android-code-analysis",
                'found_by': found_by,
                'confidence': "medium",
                'cwe': self._normalize_cwe(rule_meta.get('cwe')),
                # NOTE(review): MobSF rule metadata appears to use the key
                # "owasp-mobile"; both spellings are read - confirm against a
                # real report.
                'owasp': rule_meta.get('owasp') or rule_meta.get('owasp-mobile') or None,
            }
            extra_metadata = {
                'masvs': rule_meta.get('masvs'),
                'cvss': rule_meta.get('cvss'),
                'ref': rule_meta.get('ref'),
                'tool': 'mobsf',
            }
            # MobSF returns 'files' as a dict: {filename: line_numbers}
            created.extend(
                self._findings_per_file(finding_data.get('files'), common, extra_metadata)
            )
        return created

    def _parse_behaviour(self, section: Any, found_by: FoundBy) -> List[ModuleFinding]:
        """Convert the report's ``behaviour`` mapping into findings."""
        created = []
        for key, value in self._as_dict(section).items():
            if not isinstance(value, dict):
                continue
            rule_meta = self._as_dict(value.get('metadata'))
            labels = rule_meta.get('label', [])
            if isinstance(labels, str):
                # A bare string must not be indexed, or only its first
                # character would end up in the title.
                label = labels or 'Unknown Behavior'
            elif isinstance(labels, (list, tuple)) and labels:
                label = labels[0]
            else:
                label = 'Unknown Behavior'
            common = {
                'rule_id': f"android_behavior_{key.replace(' ', '_').replace('-', '_')}",
                'title': f"Behavior: {label}",
                'description': rule_meta.get('description', 'No description'),
                'severity': self._map_severity(rule_meta.get('severity'), 'medium'),
                'category': "android-behavior",
                'found_by': found_by,
                'confidence': "medium",
            }
            extra_metadata = {
                'behavior_key': key,
                'tool': 'mobsf',
            }
            # MobSF returns 'files' as a dict: {filename: line_numbers}
            created.extend(
                self._findings_per_file(value.get('files'), common, extra_metadata)
            )
        return created

    def _parse_scan_results(self, scan_data: Dict[str, Any], apk_path: Path) -> List[ModuleFinding]:
        """Parse MobSF JSON results into standardized findings.

        Args:
            scan_data: Decoded JSON report. Sections that are missing or not
                of the expected container type are skipped.
            apk_path: Path of the scanned APK; currently unused by the parser.

        Returns:
            Findings from the permissions, manifest, code-analysis and
            behaviour sections, in that order.
        """
        if not isinstance(scan_data, dict):
            logger.warning("MobSF report is not a JSON object; no findings parsed")
            return []

        # Create FoundBy attribution for all MobSF findings
        found_by = FoundBy(
            module="mobsf_scanner",
            tool_name="MobSF",
            tool_version=self.MOBSF_VERSION,
            type="tool",
        )

        findings: List[ModuleFinding] = []
        findings.extend(self._parse_permissions(scan_data.get('permissions'), found_by))
        findings.extend(self._parse_manifest(scan_data.get('manifest_analysis'), found_by))
        findings.extend(self._parse_code_analysis(scan_data.get('code_analysis'), found_by))
        findings.extend(self._parse_behaviour(scan_data.get('behaviour'), found_by))

        logger.debug(f"Parsed {len(findings)} findings from MobSF results")
        return findings

    def _create_summary(self, findings: List[ModuleFinding], scan_hash: str) -> Dict[str, Any]:
        """Create analysis summary.

        Args:
            findings: Findings to aggregate; each must expose ``severity`` and
                ``category`` attributes.
            scan_hash: MobSF hash identifying the scan.

        Returns:
            Dict with the scan hash, the total finding count, and per-severity
            and per-category counts.
        """
        severity_counts = Counter(finding.severity for finding in findings)
        category_counts = Counter(finding.category for finding in findings)
        return {
            "scan_hash": scan_hash,
            "total_findings": len(findings),
            "severity_counts": dict(severity_counts),
            "category_counts": dict(category_counts),
        }