mirror of
https://github.com/FuzzingLabs/fuzzforge_ai.git
synced 2026-02-12 23:52:47 +00:00
MobSF returns 'files' as a dict (not list):
{"filename": "line_numbers"}
The parser was treating it as a list, causing zero findings
to be extracted. Now properly iterates over the dict and
creates one finding per affected file with correct line numbers
and metadata (CWE, OWASP, MASVS, CVSS).
Fixed in both code_analysis and behaviour sections.
438 lines
18 KiB
Python
438 lines
18 KiB
Python
"""
|
|
MobSF Scanner Module
|
|
|
|
Mobile Security Framework (MobSF) integration for comprehensive Android app security analysis.
|
|
Performs static analysis on APK files including permissions, manifest analysis, code analysis, and behavior checks.
|
|
"""
|
|
|
|
# Copyright (c) 2025 FuzzingLabs
|
|
#
|
|
# Licensed under the Business Source License 1.1 (BSL). See the LICENSE file
|
|
# at the root of this repository for details.
|
|
#
|
|
# After the Change Date (four years from publication), this version of the
|
|
# Licensed Work will be made available under the Apache License, Version 2.0.
|
|
# See the LICENSE-APACHE file or http://www.apache.org/licenses/LICENSE-2.0
|
|
#
|
|
# Additional attribution and requirements are provided in the NOTICE file.
|
|
|
|
import logging
|
|
import os
|
|
from collections import Counter
|
|
from pathlib import Path
|
|
from typing import Dict, Any, List
|
|
import aiohttp
|
|
|
|
try:
|
|
from toolbox.modules.base import BaseModule, ModuleMetadata, ModuleFinding, ModuleResult
|
|
except ImportError:
|
|
try:
|
|
from modules.base import BaseModule, ModuleMetadata, ModuleFinding, ModuleResult
|
|
except ImportError:
|
|
from src.toolbox.modules.base import BaseModule, ModuleMetadata, ModuleFinding, ModuleResult
|
|
|
|
# Module-level logger, named after this module via __name__.
logger = logging.getLogger(__name__)
|
|
|
|
|
|
class MobSFScanner(BaseModule):
    """Mobile Security Framework (MobSF) scanner module for Android applications.

    Uploads an APK to a MobSF server through its REST API, triggers a scan,
    downloads the JSON report and converts the ``permissions``,
    ``manifest_analysis``, ``code_analysis`` and ``behaviour`` report sections
    into module findings.
    """

    # MobSF version reported in the module metadata and in result metadata.
    TOOL_VERSION = "3.9.7"

    # Server used when the configuration does not provide ``mobsf_url``.
    DEFAULT_MOBSF_URL = "http://localhost:8877"

    # MIME type sent with the uploaded APK.
    APK_CONTENT_TYPE = "application/vnd.android.package-archive"

    # Placeholder used when a report entry carries no usable description.
    NO_DESCRIPTION = "No description"

    # Maps lower-cased MobSF status/severity labels to finding severities.
    SEVERITY_MAP = {
        "dangerous": "critical",
        "high": "high",
        "warning": "medium",
        "medium": "medium",
        "low": "low",
        "info": "low",
        "secure": "low",
    }

    def get_metadata(self) -> ModuleMetadata:
        """Return the module description with its input and output schemas."""
        return ModuleMetadata(
            name="mobsf_scanner",
            version=self.TOOL_VERSION,
            description="Comprehensive Android security analysis using Mobile Security Framework (MobSF)",
            author="FuzzForge Team",
            category="android",
            tags=["mobile", "android", "mobsf", "sast", "scanner", "security"],
            input_schema={
                "type": "object",
                "properties": {
                    "mobsf_url": {
                        "type": "string",
                        "description": "MobSF server URL",
                        "default": self.DEFAULT_MOBSF_URL,
                    },
                    "file_path": {
                        "type": "string",
                        "description": "Path to the APK file to scan (absolute or relative to workspace)",
                    },
                    "api_key": {
                        "type": "string",
                        "description": "MobSF API key (if not provided, will try MOBSF_API_KEY env var)",
                        "default": None,
                    },
                    "rescan": {
                        "type": "boolean",
                        "description": "Force rescan even if file was previously analyzed",
                        "default": False,
                    },
                },
                "required": ["file_path"],
            },
            output_schema={
                "type": "object",
                "properties": {
                    "findings": {
                        "type": "array",
                        "description": "Security findings from MobSF analysis",
                    },
                    "scan_hash": {"type": "string"},
                    "total_findings": {"type": "integer"},
                    "severity_counts": {"type": "object"},
                },
            },
            requires_workspace=True,
        )

    def validate_config(self, config: Dict[str, Any]) -> bool:
        """Validate module configuration.

        Args:
            config: Configuration dict; ``file_path`` is mandatory and
                ``mobsf_url``, when present, must be a string.

        Returns:
            True when the configuration is acceptable.

        Raises:
            ValueError: If ``mobsf_url`` is not a string, or ``file_path`` is
                missing, empty or not a string / path-like object.
        """
        if "mobsf_url" in config and not isinstance(config["mobsf_url"], str):
            raise ValueError("mobsf_url must be a string")

        file_path = config.get("file_path")
        if not file_path:
            raise ValueError("file_path is required for MobSF scanning")
        # Reject values Path() cannot consume here instead of failing later
        # with an unrelated TypeError.
        if not isinstance(file_path, (str, os.PathLike)):
            raise ValueError("file_path must be a string or path-like object")

        return True

    async def execute(self, config: Dict[str, Any], workspace: Path) -> ModuleResult:
        """
        Execute MobSF security analysis on an APK file.

        Args:
            config: Configuration dict with file_path, mobsf_url, api_key, rescan
            workspace: Workspace directory path; relative file paths are
                resolved against it

        Returns:
            ModuleResult with security findings from MobSF. Any exception
            raised along the way is logged and reported as a result with
            status "failed" rather than propagated.
        """
        self.start_timer()

        try:
            self.validate_config(config)
            self.validate_workspace(workspace)

            # Get configuration
            mobsf_url = config.get("mobsf_url", self.DEFAULT_MOBSF_URL)
            rescan = config.get("rescan", False)

            # API key from config first, environment second
            api_key = config.get("api_key") or os.environ.get("MOBSF_API_KEY", "")
            if not api_key:
                logger.warning("No MobSF API key provided. Some functionality may be limited.")

            # Resolve APK file path (relative paths live inside the workspace)
            file_path = Path(config["file_path"])
            if not file_path.is_absolute():
                file_path = (workspace / file_path).resolve()

            if not file_path.exists():
                raise FileNotFoundError(f"APK file not found: {file_path}")
            if not file_path.is_file():
                raise ValueError(f"APK path must be a file: {file_path}")

            logger.info("Starting MobSF scan of APK: %s", file_path)

            # Upload, scan, fetch the report
            scan_hash = await self._upload_file(mobsf_url, file_path, api_key)
            logger.info("APK uploaded to MobSF with hash: %s", scan_hash)

            await self._start_scan(mobsf_url, scan_hash, api_key, rescan=rescan)
            logger.info("MobSF scan completed for hash: %s", scan_hash)

            scan_results = await self._get_json_results(mobsf_url, scan_hash, api_key)

            # Convert the report and summarise it
            findings = self._parse_scan_results(scan_results, file_path)
            summary = self._create_summary(findings, scan_hash)

            logger.info("✓ MobSF scan completed: %d findings", len(findings))

            return self.create_result(
                findings=findings,
                status="success",
                summary=summary,
                metadata={
                    "tool": "mobsf",
                    "tool_version": self.TOOL_VERSION,
                    "scan_hash": scan_hash,
                    "apk_file": str(file_path),
                    "mobsf_url": mobsf_url,
                },
            )

        except Exception as exc:
            # Top-level boundary: log with traceback and report the failure.
            logger.exception("MobSF scanner failed: %s", exc)
            return self.create_result(
                findings=[],
                status="failed",
                error=str(exc),
                metadata={"tool": "mobsf", "file_path": config.get("file_path")},
            )

    async def _post(
        self,
        mobsf_url: str,
        endpoint: str,
        api_key: str,
        data: Any,
        timeout_seconds: int,
        error_prefix: str,
    ) -> Any:
        """POST ``data`` to ``/api/v1/<endpoint>`` and return the decoded JSON body.

        Args:
            mobsf_url: Base URL of the MobSF server; a trailing slash is tolerated
            endpoint: API endpoint name, e.g. ``upload``
            api_key: API key sent as ``X-Mobsf-Api-Key``; header omitted when empty
            data: Request payload handed to aiohttp
            timeout_seconds: Total request timeout in seconds
            error_prefix: Text placed before the server response in the error

        Raises:
            RuntimeError: If the server answers with a status other than 200.
        """
        headers = {'X-Mobsf-Api-Key': api_key} if api_key else {}
        # Strip trailing slashes so "http://host/" does not become "//api/v1".
        url = f"{mobsf_url.rstrip('/')}/api/v1/{endpoint}"

        async with aiohttp.ClientSession() as session:
            async with session.post(
                url,
                headers=headers,
                data=data,
                timeout=aiohttp.ClientTimeout(total=timeout_seconds),
            ) as response:
                if response.status != 200:
                    error_text = await response.text()
                    raise RuntimeError(f"{error_prefix}: {error_text}")

                return await response.json()

    async def _upload_file(self, mobsf_url: str, file_path: Path, api_key: str) -> str:
        """
        Upload APK file to MobSF server.

        Returns:
            Scan hash for the uploaded file

        Raises:
            RuntimeError: If the upload is rejected or the response has no hash.
        """
        # The file handle must stay open until the request has been sent.
        with open(file_path, 'rb') as apk:
            form = aiohttp.FormData()
            form.add_field(
                'file',
                apk,
                filename=file_path.name,
                content_type=self.APK_CONTENT_TYPE,
            )
            result = await self._post(
                mobsf_url,
                "upload",
                api_key,
                form,
                timeout_seconds=300,
                error_prefix="Failed to upload file to MobSF",
            )

        scan_hash = result.get('hash') if isinstance(result, dict) else None
        if not scan_hash:
            raise RuntimeError(f"MobSF upload failed: {result}")

        return scan_hash

    async def _start_scan(self, mobsf_url: str, scan_hash: str, api_key: str, rescan: bool = False) -> Dict[str, Any]:
        """
        Start MobSF scan for uploaded file.

        Args:
            mobsf_url: Base URL of the MobSF server
            scan_hash: Hash returned by the upload step
            api_key: MobSF API key (may be empty)
            rescan: Sent as ``re_scan=1`` when true, ``re_scan=0`` otherwise

        Returns:
            Scan result dictionary

        Raises:
            RuntimeError: If the server answers with a status other than 200.
        """
        payload = {
            'hash': scan_hash,
            're_scan': '1' if rescan else '0',
        }
        return await self._post(
            mobsf_url,
            "scan",
            api_key,
            payload,
            timeout_seconds=600,  # 10 minutes for scan
            error_prefix="MobSF scan failed",
        )

    async def _get_json_results(self, mobsf_url: str, scan_hash: str, api_key: str) -> Dict[str, Any]:
        """
        Retrieve JSON scan results from MobSF.

        Returns:
            Scan results dictionary

        Raises:
            RuntimeError: If the server answers with a status other than 200.
        """
        return await self._post(
            mobsf_url,
            "report_json",
            api_key,
            {'hash': scan_hash},
            timeout_seconds=60,
            error_prefix="Failed to retrieve MobSF results",
        )

    def _map_severity(self, label: Any, default: str) -> str:
        """Translate a MobSF label via SEVERITY_MAP; non-strings and unknown labels give ``default``."""
        if not isinstance(label, str):
            return default
        return self.SEVERITY_MAP.get(label.lower(), default)

    def _findings_for_files(
        self,
        files: Any,
        title: str,
        description: str,
        severity: str,
        category: str,
        metadata: Dict[str, Any],
    ) -> List[ModuleFinding]:
        """Create one finding per affected file, or a single file-less finding.

        MobSF returns 'files' as a dict: {filename: line_numbers}. When it is
        missing, empty or not a dict, one finding without file info is created.
        """
        if isinstance(files, dict) and files:
            return [
                self.create_finding(
                    title=title,
                    description=description,
                    severity=severity,
                    category=category,
                    file_path=file_path,
                    line_number=line_numbers,  # Can be string like "28" or "65,81"
                    metadata={**metadata, 'line_numbers': line_numbers, 'tool': 'mobsf'},
                )
                for file_path, line_numbers in files.items()
            ]

        # Fallback: create one finding without file info
        return [
            self.create_finding(
                title=title,
                description=description,
                severity=severity,
                category=category,
                metadata={**metadata, 'tool': 'mobsf'},
            )
        ]

    def _parse_permissions(self, permissions: Any) -> List[ModuleFinding]:
        """Turn the report's ``permissions`` mapping into findings."""
        if not isinstance(permissions, dict):
            return []

        findings = []
        for perm_name, perm_attrs in permissions.items():
            if not isinstance(perm_attrs, dict):
                continue
            findings.append(
                self.create_finding(
                    title=f"Android Permission: {perm_name}",
                    description=perm_attrs.get('description') or self.NO_DESCRIPTION,
                    severity=self._map_severity(perm_attrs.get('status'), 'low'),
                    category="android-permission",
                    metadata={
                        'permission': perm_name,
                        'status': perm_attrs.get('status'),
                        'info': perm_attrs.get('info'),
                        'tool': 'mobsf',
                    },
                )
            )
        return findings

    def _parse_manifest(self, manifest_analysis: Any) -> List[ModuleFinding]:
        """Turn the report's ``manifest_analysis`` section into findings."""
        if isinstance(manifest_analysis, dict):
            items = manifest_analysis.get('manifest_findings') or []
        elif isinstance(manifest_analysis, list):
            # Tolerate a bare list instead of failing on .get().
            items = manifest_analysis
        else:
            return []

        findings = []
        for item in items:
            if not isinstance(item, dict):
                continue
            findings.append(
                self.create_finding(
                    title=item.get('title') or item.get('name') or "Manifest Issue",
                    description=item.get('description') or self.NO_DESCRIPTION,
                    severity=self._map_severity(item.get('severity'), 'medium'),
                    category="android-manifest",
                    metadata={
                        'rule': item.get('rule'),
                        'tool': 'mobsf',
                    },
                )
            )
        return findings

    def _parse_code_analysis(self, code_analysis: Any) -> List[ModuleFinding]:
        """Turn ``code_analysis['findings']`` into findings, one per affected file."""
        if not isinstance(code_analysis, dict):
            return []
        code_findings = code_analysis.get('findings')
        if not isinstance(code_findings, dict):
            return []

        findings = []
        for finding_name, finding_data in code_findings.items():
            if not isinstance(finding_data, dict):
                continue
            rule_meta = finding_data.get('metadata')
            if not isinstance(rule_meta, dict):
                rule_meta = {}

            findings.extend(
                self._findings_for_files(
                    finding_data.get('files'),
                    title=finding_name,
                    description=rule_meta.get('description') or self.NO_DESCRIPTION,
                    severity=self._map_severity(rule_meta.get('severity'), 'medium'),
                    category="android-code-analysis",
                    metadata={
                        'cwe': rule_meta.get('cwe'),
                        'owasp': rule_meta.get('owasp'),
                        'masvs': rule_meta.get('masvs'),
                        'cvss': rule_meta.get('cvss'),
                        'ref': rule_meta.get('ref'),
                    },
                )
            )
        return findings

    def _parse_behaviour(self, behaviour: Any) -> List[ModuleFinding]:
        """Turn the report's ``behaviour`` section into findings, one per affected file."""
        if not isinstance(behaviour, dict):
            return []

        findings = []
        for key, value in behaviour.items():
            if not isinstance(value, dict):
                continue
            rule_meta = value.get('metadata')
            if not isinstance(rule_meta, dict):
                rule_meta = {}

            labels = rule_meta.get('label')
            if isinstance(labels, str) and labels:
                # A plain string must not be indexed, or only its first
                # character would end up in the title.
                label = labels
            elif isinstance(labels, (list, tuple)) and labels:
                label = labels[0]
            else:
                label = 'Unknown Behavior'

            findings.extend(
                self._findings_for_files(
                    value.get('files'),
                    title=f"Behavior: {label}",
                    description=rule_meta.get('description') or self.NO_DESCRIPTION,
                    severity=self._map_severity(rule_meta.get('severity'), 'medium'),
                    category="android-behavior",
                    metadata={'behavior_key': key},
                )
            )
        return findings

    def _parse_scan_results(self, scan_data: Dict[str, Any], apk_path: Path) -> List[ModuleFinding]:
        """Parse MobSF JSON results into standardized findings.

        Args:
            scan_data: Decoded MobSF JSON report
            apk_path: Path of the scanned APK (currently unused; kept for
                interface compatibility)

        Returns:
            Findings from the permissions, manifest, code analysis and
            behaviour sections, in that order. Sections that are absent or
            not of the expected shape contribute nothing.
        """
        if not isinstance(scan_data, dict):
            logger.warning("Unexpected MobSF report type: %s", type(scan_data).__name__)
            return []

        findings: List[ModuleFinding] = []
        findings.extend(self._parse_permissions(scan_data.get('permissions')))
        findings.extend(self._parse_manifest(scan_data.get('manifest_analysis')))
        findings.extend(self._parse_code_analysis(scan_data.get('code_analysis')))
        findings.extend(self._parse_behaviour(scan_data.get('behaviour')))

        logger.debug("Parsed %d findings from MobSF results", len(findings))
        return findings

    def _create_summary(self, findings: List[ModuleFinding], scan_hash: str) -> Dict[str, Any]:
        """Create analysis summary.

        Returns:
            Dict with the scan hash, the total number of findings and the
            number of findings per severity and per category.
        """
        severity_counter = Counter(finding.severity for finding in findings)
        category_counter = Counter(finding.category for finding in findings)

        return {
            "scan_hash": scan_hash,
            "total_findings": len(findings),
            "severity_counts": dict(severity_counter),
            "category_counts": dict(category_counter),
        }
|