mirror of
https://github.com/mvt-project/mvt.git
synced 2026-02-15 18:02:44 +00:00
Compare commits
2 Commits
root_binar
...
dev_deps
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
f9a3016dd1 | ||
|
|
2b10e86f53 |
@@ -1,5 +1,5 @@
|
|||||||
mkdocs==1.6.1
|
mkdocs==1.6.1
|
||||||
mkdocs-autorefs==1.4.3
|
mkdocs-autorefs==1.4.2
|
||||||
mkdocs-material==9.6.20
|
mkdocs-material==9.6.17
|
||||||
mkdocs-material-extensions==1.3.1
|
mkdocs-material-extensions==1.3.1
|
||||||
mkdocstrings==0.30.1
|
mkdocstrings==0.30.0
|
||||||
@@ -35,7 +35,6 @@ dependencies = [
|
|||||||
"pydantic-settings==2.10.1",
|
"pydantic-settings==2.10.1",
|
||||||
"NSKeyedUnArchiver==1.5.2",
|
"NSKeyedUnArchiver==1.5.2",
|
||||||
"python-dateutil==2.9.0.post0",
|
"python-dateutil==2.9.0.post0",
|
||||||
"tzdata==2025.2",
|
|
||||||
]
|
]
|
||||||
requires-python = ">= 3.10"
|
requires-python = ">= 3.10"
|
||||||
|
|
||||||
|
|||||||
@@ -1,186 +0,0 @@
|
|||||||
# Mobile Verification Toolkit (MVT)
|
|
||||||
# Copyright (c) 2021-2023 The MVT Authors.
|
|
||||||
# Use of this software is governed by the MVT License 1.1 that can be found at
|
|
||||||
# https://license.mvt.re/1.1/
|
|
||||||
|
|
||||||
from typing import Any
|
|
||||||
|
|
||||||
from .artifact import AndroidArtifact
|
|
||||||
|
|
||||||
SUSPICIOUS_MOUNT_POINTS = [
|
|
||||||
"/system",
|
|
||||||
"/vendor",
|
|
||||||
"/product",
|
|
||||||
"/system_ext",
|
|
||||||
]
|
|
||||||
|
|
||||||
SUSPICIOUS_OPTIONS = [
|
|
||||||
"rw",
|
|
||||||
"remount",
|
|
||||||
"noatime",
|
|
||||||
"nodiratime",
|
|
||||||
]
|
|
||||||
|
|
||||||
ALLOWLIST_NOATIME = [
|
|
||||||
"/system_dlkm",
|
|
||||||
"/system_ext",
|
|
||||||
"/product",
|
|
||||||
"/vendor",
|
|
||||||
"/vendor_dlkm",
|
|
||||||
]
|
|
||||||
|
|
||||||
|
|
||||||
class Mounts(AndroidArtifact):
|
|
||||||
"""
|
|
||||||
This artifact parses mount information from /proc/mounts or similar mount data.
|
|
||||||
It can detect potentially suspicious mount configurations that may indicate
|
|
||||||
a rooted or compromised device.
|
|
||||||
"""
|
|
||||||
|
|
||||||
def parse(self, entry: str) -> None:
|
|
||||||
"""
|
|
||||||
Parse mount information from the provided entry.
|
|
||||||
|
|
||||||
Examples:
|
|
||||||
/dev/block/bootdevice/by-name/system /system ext4 ro,seclabel,relatime 0 0
|
|
||||||
/dev/block/dm-12 on / type ext4 (ro,seclabel,noatime)
|
|
||||||
"""
|
|
||||||
self.results: list[dict[str, Any]] = []
|
|
||||||
|
|
||||||
for line in entry.splitlines():
|
|
||||||
line = line.strip()
|
|
||||||
if not line:
|
|
||||||
continue
|
|
||||||
|
|
||||||
device = None
|
|
||||||
mount_point = None
|
|
||||||
filesystem_type = None
|
|
||||||
mount_options = ""
|
|
||||||
|
|
||||||
if " on " in line and " type " in line:
|
|
||||||
try:
|
|
||||||
# Format: device on mount_point type filesystem_type (options)
|
|
||||||
device_part, rest = line.split(" on ", 1)
|
|
||||||
device = device_part.strip()
|
|
||||||
|
|
||||||
# Split by 'type' to get mount_point and filesystem info
|
|
||||||
mount_part, fs_part = rest.split(" type ", 1)
|
|
||||||
mount_point = mount_part.strip()
|
|
||||||
|
|
||||||
# Parse filesystem and options
|
|
||||||
if "(" in fs_part and fs_part.endswith(")"):
|
|
||||||
# Format: filesystem_type (options)
|
|
||||||
fs_and_opts = fs_part.strip()
|
|
||||||
paren_idx = fs_and_opts.find("(")
|
|
||||||
filesystem_type = fs_and_opts[:paren_idx].strip()
|
|
||||||
mount_options = fs_and_opts[paren_idx + 1 : -1].strip()
|
|
||||||
else:
|
|
||||||
# No options in parentheses, just filesystem type
|
|
||||||
filesystem_type = fs_part.strip()
|
|
||||||
mount_options = ""
|
|
||||||
|
|
||||||
# Skip if we don't have essential info
|
|
||||||
if not device or not mount_point or not filesystem_type:
|
|
||||||
continue
|
|
||||||
|
|
||||||
# Parse options into list
|
|
||||||
options_list = (
|
|
||||||
[opt.strip() for opt in mount_options.split(",") if opt.strip()]
|
|
||||||
if mount_options
|
|
||||||
else []
|
|
||||||
)
|
|
||||||
|
|
||||||
# Check if it's a system partition
|
|
||||||
is_system_partition = mount_point in SUSPICIOUS_MOUNT_POINTS or any(
|
|
||||||
mount_point.startswith(sp) for sp in SUSPICIOUS_MOUNT_POINTS
|
|
||||||
)
|
|
||||||
|
|
||||||
# Check if it's mounted read-write
|
|
||||||
is_read_write = "rw" in options_list
|
|
||||||
|
|
||||||
mount_entry = {
|
|
||||||
"device": device,
|
|
||||||
"mount_point": mount_point,
|
|
||||||
"filesystem_type": filesystem_type,
|
|
||||||
"mount_options": mount_options,
|
|
||||||
"options_list": options_list,
|
|
||||||
"is_system_partition": is_system_partition,
|
|
||||||
"is_read_write": is_read_write,
|
|
||||||
}
|
|
||||||
|
|
||||||
self.results.append(mount_entry)
|
|
||||||
|
|
||||||
except ValueError:
|
|
||||||
# If parsing fails, skip this line
|
|
||||||
continue
|
|
||||||
else:
|
|
||||||
# Skip lines that don't match expected format
|
|
||||||
continue
|
|
||||||
|
|
||||||
def check_indicators(self) -> None:
|
|
||||||
"""
|
|
||||||
Check for suspicious mount configurations that may indicate root access
|
|
||||||
or other security concerns.
|
|
||||||
"""
|
|
||||||
system_rw_mounts = []
|
|
||||||
suspicious_mounts = []
|
|
||||||
|
|
||||||
for mount in self.results:
|
|
||||||
mount_point = mount["mount_point"]
|
|
||||||
options = mount["options_list"]
|
|
||||||
|
|
||||||
# Check for system partitions mounted as read-write
|
|
||||||
if mount["is_system_partition"] and mount["is_read_write"]:
|
|
||||||
system_rw_mounts.append(mount)
|
|
||||||
if mount_point == "/system":
|
|
||||||
self.log.warning(
|
|
||||||
"Root detected /system partition is mounted as read-write (rw). "
|
|
||||||
)
|
|
||||||
else:
|
|
||||||
self.log.warning(
|
|
||||||
"System partition %s is mounted as read-write (rw). This may indicate system modifications.",
|
|
||||||
mount_point,
|
|
||||||
)
|
|
||||||
|
|
||||||
# Check for other suspicious mount options
|
|
||||||
suspicious_opts = [opt for opt in options if opt in SUSPICIOUS_OPTIONS]
|
|
||||||
if suspicious_opts and mount["is_system_partition"]:
|
|
||||||
if (
|
|
||||||
"noatime" in mount["mount_options"]
|
|
||||||
and mount["mount_point"] in ALLOWLIST_NOATIME
|
|
||||||
):
|
|
||||||
continue
|
|
||||||
suspicious_mounts.append(mount)
|
|
||||||
self.log.warning(
|
|
||||||
"Suspicious mount options found for %s: %s",
|
|
||||||
mount_point,
|
|
||||||
", ".join(suspicious_opts),
|
|
||||||
)
|
|
||||||
|
|
||||||
# Log interesting mount information
|
|
||||||
if mount_point == "/data" or mount_point.startswith("/sdcard"):
|
|
||||||
self.log.info(
|
|
||||||
"Data partition: %s mounted as %s with options: %s",
|
|
||||||
mount_point,
|
|
||||||
mount["filesystem_type"],
|
|
||||||
mount["mount_options"],
|
|
||||||
)
|
|
||||||
|
|
||||||
self.log.info("Parsed %d mount entries", len(self.results))
|
|
||||||
|
|
||||||
# Check indicators if available
|
|
||||||
if not self.indicators:
|
|
||||||
return
|
|
||||||
|
|
||||||
for mount in self.results:
|
|
||||||
# Check if any mount points match indicators
|
|
||||||
ioc = self.indicators.check_file_path(mount.get("mount_point", ""))
|
|
||||||
if ioc:
|
|
||||||
mount["matched_indicator"] = ioc
|
|
||||||
self.detected.append(mount)
|
|
||||||
|
|
||||||
# Check device paths for indicators
|
|
||||||
ioc = self.indicators.check_file_path(mount.get("device", ""))
|
|
||||||
if ioc:
|
|
||||||
mount["matched_indicator"] = ioc
|
|
||||||
self.detected.append(mount)
|
|
||||||
@@ -53,7 +53,7 @@ class TombstoneCrashResult(pydantic.BaseModel):
|
|||||||
file_name: str
|
file_name: str
|
||||||
file_timestamp: str # We store the timestamp as a string to avoid timezone issues
|
file_timestamp: str # We store the timestamp as a string to avoid timezone issues
|
||||||
build_fingerprint: str
|
build_fingerprint: str
|
||||||
revision: str
|
revision: int
|
||||||
arch: Optional[str] = None
|
arch: Optional[str] = None
|
||||||
timestamp: str # We store the timestamp as a string to avoid timezone issues
|
timestamp: str # We store the timestamp as a string to avoid timezone issues
|
||||||
process_uptime: Optional[int] = None
|
process_uptime: Optional[int] = None
|
||||||
@@ -70,7 +70,7 @@ class TombstoneCrashResult(pydantic.BaseModel):
|
|||||||
|
|
||||||
|
|
||||||
class TombstoneCrashArtifact(AndroidArtifact):
|
class TombstoneCrashArtifact(AndroidArtifact):
|
||||||
"""
|
""" "
|
||||||
Parser for Android tombstone crash files.
|
Parser for Android tombstone crash files.
|
||||||
|
|
||||||
This parser can parse both text and protobuf tombstone crash files.
|
This parser can parse both text and protobuf tombstone crash files.
|
||||||
@@ -121,7 +121,9 @@ class TombstoneCrashArtifact(AndroidArtifact):
|
|||||||
def parse_protobuf(
|
def parse_protobuf(
|
||||||
self, file_name: str, file_timestamp: datetime.datetime, data: bytes
|
self, file_name: str, file_timestamp: datetime.datetime, data: bytes
|
||||||
) -> None:
|
) -> None:
|
||||||
"""Parse Android tombstone crash files from a protobuf object."""
|
"""
|
||||||
|
Parse Android tombstone crash files from a protobuf object.
|
||||||
|
"""
|
||||||
tombstone_pb = Tombstone().parse(data)
|
tombstone_pb = Tombstone().parse(data)
|
||||||
tombstone_dict = tombstone_pb.to_dict(
|
tombstone_dict = tombstone_pb.to_dict(
|
||||||
betterproto.Casing.SNAKE, include_default_values=True
|
betterproto.Casing.SNAKE, include_default_values=True
|
||||||
@@ -142,23 +144,21 @@ class TombstoneCrashArtifact(AndroidArtifact):
|
|||||||
def parse(
|
def parse(
|
||||||
self, file_name: str, file_timestamp: datetime.datetime, content: bytes
|
self, file_name: str, file_timestamp: datetime.datetime, content: bytes
|
||||||
) -> None:
|
) -> None:
|
||||||
"""Parse text Android tombstone crash files."""
|
"""
|
||||||
|
Parse text Android tombstone crash files.
|
||||||
|
"""
|
||||||
|
|
||||||
|
# Split the tombstone file into a dictonary
|
||||||
tombstone_dict = {
|
tombstone_dict = {
|
||||||
"file_name": file_name,
|
"file_name": file_name,
|
||||||
"file_timestamp": convert_datetime_to_iso(file_timestamp),
|
"file_timestamp": convert_datetime_to_iso(file_timestamp),
|
||||||
}
|
}
|
||||||
lines = content.decode("utf-8").splitlines()
|
lines = content.decode("utf-8").splitlines()
|
||||||
for line_num, line in enumerate(lines, 1):
|
for line in lines:
|
||||||
if not line.strip() or TOMBSTONE_DELIMITER in line:
|
if not line.strip() or TOMBSTONE_DELIMITER in line:
|
||||||
continue
|
continue
|
||||||
try:
|
for key, destination_key in TOMBSTONE_TEXT_KEY_MAPPINGS.items():
|
||||||
for key, destination_key in TOMBSTONE_TEXT_KEY_MAPPINGS.items():
|
self._parse_tombstone_line(line, key, destination_key, tombstone_dict)
|
||||||
if self._parse_tombstone_line(
|
|
||||||
line, key, destination_key, tombstone_dict
|
|
||||||
):
|
|
||||||
break
|
|
||||||
except Exception as e:
|
|
||||||
raise ValueError(f"Error parsing line {line_num}: {str(e)}")
|
|
||||||
|
|
||||||
# Validate the tombstone and add it to the results
|
# Validate the tombstone and add it to the results
|
||||||
tombstone = TombstoneCrashResult.model_validate(tombstone_dict)
|
tombstone = TombstoneCrashResult.model_validate(tombstone_dict)
|
||||||
@@ -168,7 +168,7 @@ class TombstoneCrashArtifact(AndroidArtifact):
|
|||||||
self, line: str, key: str, destination_key: str, tombstone: dict
|
self, line: str, key: str, destination_key: str, tombstone: dict
|
||||||
) -> bool:
|
) -> bool:
|
||||||
if not line.startswith(f"{key}"):
|
if not line.startswith(f"{key}"):
|
||||||
return False
|
return None
|
||||||
|
|
||||||
if key == "pid":
|
if key == "pid":
|
||||||
return self._load_pid_line(line, tombstone)
|
return self._load_pid_line(line, tombstone)
|
||||||
@@ -187,7 +187,7 @@ class TombstoneCrashArtifact(AndroidArtifact):
|
|||||||
raise ValueError(f"Expected key {key}, got {line_key}")
|
raise ValueError(f"Expected key {key}, got {line_key}")
|
||||||
|
|
||||||
value_clean = value.strip().strip("'")
|
value_clean = value.strip().strip("'")
|
||||||
if destination_key == "uid":
|
if destination_key in ["uid", "revision"]:
|
||||||
tombstone[destination_key] = int(value_clean)
|
tombstone[destination_key] = int(value_clean)
|
||||||
elif destination_key == "process_uptime":
|
elif destination_key == "process_uptime":
|
||||||
# eg. "Process uptime: 40s"
|
# eg. "Process uptime: 40s"
|
||||||
@@ -200,50 +200,51 @@ class TombstoneCrashArtifact(AndroidArtifact):
|
|||||||
return True
|
return True
|
||||||
|
|
||||||
def _load_pid_line(self, line: str, tombstone: dict) -> bool:
|
def _load_pid_line(self, line: str, tombstone: dict) -> bool:
|
||||||
try:
|
pid_part, tid_part, name_part = [part.strip() for part in line.split(",")]
|
||||||
parts = line.split(" >>> ") if " >>> " in line else line.split(">>>")
|
|
||||||
process_info = parts[0]
|
|
||||||
|
|
||||||
# Parse pid, tid, name from process info
|
pid_key, pid_value = pid_part.split(":", 1)
|
||||||
info_parts = [p.strip() for p in process_info.split(",")]
|
if pid_key != "pid":
|
||||||
for info in info_parts:
|
raise ValueError(f"Expected key pid, got {pid_key}")
|
||||||
key, value = info.split(":", 1)
|
pid_value = int(pid_value.strip())
|
||||||
key = key.strip()
|
|
||||||
value = value.strip()
|
|
||||||
|
|
||||||
if key == "pid":
|
tid_key, tid_value = tid_part.split(":", 1)
|
||||||
tombstone["pid"] = int(value)
|
if tid_key != "tid":
|
||||||
elif key == "tid":
|
raise ValueError(f"Expected key tid, got {tid_key}")
|
||||||
tombstone["tid"] = int(value)
|
tid_value = int(tid_value.strip())
|
||||||
elif key == "name":
|
|
||||||
tombstone["process_name"] = value
|
|
||||||
|
|
||||||
# Extract binary path if it exists
|
name_key, name_value = name_part.split(":", 1)
|
||||||
if len(parts) > 1:
|
if name_key != "name":
|
||||||
tombstone["binary_path"] = parts[1].strip().rstrip(" <")
|
raise ValueError(f"Expected key name, got {name_key}")
|
||||||
|
name_value = name_value.strip()
|
||||||
|
process_name, binary_path = self._parse_process_name(name_value, tombstone)
|
||||||
|
|
||||||
return True
|
tombstone["pid"] = pid_value
|
||||||
|
tombstone["tid"] = tid_value
|
||||||
|
tombstone["process_name"] = process_name
|
||||||
|
tombstone["binary_path"] = binary_path
|
||||||
|
return True
|
||||||
|
|
||||||
except Exception as e:
|
def _parse_process_name(self, process_name_part, tombstone: dict) -> bool:
|
||||||
raise ValueError(f"Failed to parse PID line: {str(e)}")
|
process_name, process_path = process_name_part.split(">>>")
|
||||||
|
process_name = process_name.strip()
|
||||||
|
binary_path = process_path.strip().split(" ")[0]
|
||||||
|
return process_name, binary_path
|
||||||
|
|
||||||
def _load_signal_line(self, line: str, tombstone: dict) -> bool:
|
def _load_signal_line(self, line: str, tombstone: dict) -> bool:
|
||||||
signal_part, code_part = map(str.strip, line.split(",")[:2])
|
signal, code, _ = [part.strip() for part in line.split(",", 2)]
|
||||||
|
signal = signal.split("signal ")[1]
|
||||||
|
signal_code, signal_name = signal.split(" ")
|
||||||
|
signal_name = signal_name.strip("()")
|
||||||
|
|
||||||
def parse_part(part: str, prefix: str) -> tuple[int, str]:
|
code_part = code.split("code ")[1]
|
||||||
match = part.split(prefix)[1]
|
code_number, code_name = code_part.split(" ")
|
||||||
number = int(match.split()[0])
|
code_name = code_name.strip("()")
|
||||||
name = match.split("(")[1].split(")")[0] if "(" in match else "UNKNOWN"
|
|
||||||
return number, name
|
|
||||||
|
|
||||||
signal_number, signal_name = parse_part(signal_part, "signal ")
|
|
||||||
code_number, code_name = parse_part(code_part, "code ")
|
|
||||||
|
|
||||||
tombstone["signal_info"] = {
|
tombstone["signal_info"] = {
|
||||||
"code": code_number,
|
"code": int(code_number),
|
||||||
"code_name": code_name,
|
"code_name": code_name,
|
||||||
"name": signal_name,
|
"name": signal_name,
|
||||||
"number": signal_number,
|
"number": int(signal_code),
|
||||||
}
|
}
|
||||||
return True
|
return True
|
||||||
|
|
||||||
@@ -255,6 +256,7 @@ class TombstoneCrashArtifact(AndroidArtifact):
|
|||||||
@staticmethod
|
@staticmethod
|
||||||
def _parse_timestamp_string(timestamp: str) -> str:
|
def _parse_timestamp_string(timestamp: str) -> str:
|
||||||
timestamp_parsed = parser.parse(timestamp)
|
timestamp_parsed = parser.parse(timestamp)
|
||||||
|
|
||||||
# HACK: Swap the local timestamp to UTC, so keep the original time and avoid timezone conversion.
|
# HACK: Swap the local timestamp to UTC, so keep the original time and avoid timezone conversion.
|
||||||
local_timestamp = timestamp_parsed.replace(tzinfo=datetime.timezone.utc)
|
local_timestamp = timestamp_parsed.replace(tzinfo=datetime.timezone.utc)
|
||||||
return convert_datetime_to_iso(local_timestamp)
|
return convert_datetime_to_iso(local_timestamp)
|
||||||
|
|||||||
@@ -107,7 +107,8 @@ class Packages(AndroidExtraction):
|
|||||||
result["matched_indicator"] = ioc
|
result["matched_indicator"] = ioc
|
||||||
self.detected.append(result)
|
self.detected.append(result)
|
||||||
|
|
||||||
def check_virustotal(self, packages: list) -> None:
|
@staticmethod
|
||||||
|
def check_virustotal(packages: list) -> None:
|
||||||
hashes = []
|
hashes = []
|
||||||
for package in packages:
|
for package in packages:
|
||||||
for file in package.get("files", []):
|
for file in package.get("files", []):
|
||||||
@@ -142,15 +143,8 @@ class Packages(AndroidExtraction):
|
|||||||
|
|
||||||
for package in packages:
|
for package in packages:
|
||||||
for file in package.get("files", []):
|
for file in package.get("files", []):
|
||||||
if "package_name" in package:
|
row = [package["package_name"], file["path"]]
|
||||||
row = [package["package_name"], file["path"]]
|
|
||||||
elif "name" in package:
|
|
||||||
row = [package["name"], file["path"]]
|
|
||||||
else:
|
|
||||||
self.log.error(
|
|
||||||
f"Package {package} has no name or package_name. packages.json or apks.json is malformed"
|
|
||||||
)
|
|
||||||
continue
|
|
||||||
if file["sha256"] in detections:
|
if file["sha256"] in detections:
|
||||||
detection = detections[file["sha256"]]
|
detection = detections[file["sha256"]]
|
||||||
positives = detection.split("/")[0]
|
positives = detection.split("/")[0]
|
||||||
|
|||||||
@@ -19,8 +19,6 @@ from .processes import Processes
|
|||||||
from .settings import Settings
|
from .settings import Settings
|
||||||
from .sms import SMS
|
from .sms import SMS
|
||||||
from .files import Files
|
from .files import Files
|
||||||
from .root_binaries import RootBinaries
|
|
||||||
from .mounts import Mounts
|
|
||||||
|
|
||||||
ANDROIDQF_MODULES = [
|
ANDROIDQF_MODULES = [
|
||||||
DumpsysActivities,
|
DumpsysActivities,
|
||||||
@@ -39,6 +37,4 @@ ANDROIDQF_MODULES = [
|
|||||||
SMS,
|
SMS,
|
||||||
DumpsysPackages,
|
DumpsysPackages,
|
||||||
Files,
|
Files,
|
||||||
RootBinaries,
|
|
||||||
Mounts,
|
|
||||||
]
|
]
|
||||||
|
|||||||
@@ -1,74 +0,0 @@
|
|||||||
# Mobile Verification Toolkit (MVT)
|
|
||||||
# Copyright (c) 2021-2023 The MVT Authors.
|
|
||||||
# Use of this software is governed by the MVT License 1.1 that can be found at
|
|
||||||
# https://license.mvt.re/1.1/
|
|
||||||
|
|
||||||
import logging
|
|
||||||
import json
|
|
||||||
from typing import Optional
|
|
||||||
|
|
||||||
from mvt.android.artifacts.mounts import Mounts as MountsArtifact
|
|
||||||
|
|
||||||
from .base import AndroidQFModule
|
|
||||||
|
|
||||||
|
|
||||||
class Mounts(MountsArtifact, AndroidQFModule):
|
|
||||||
"""This module extracts and analyzes mount information from AndroidQF acquisitions."""
|
|
||||||
|
|
||||||
def __init__(
|
|
||||||
self,
|
|
||||||
file_path: Optional[str] = None,
|
|
||||||
target_path: Optional[str] = None,
|
|
||||||
results_path: Optional[str] = None,
|
|
||||||
module_options: Optional[dict] = None,
|
|
||||||
log: logging.Logger = logging.getLogger(__name__),
|
|
||||||
results: Optional[list] = None,
|
|
||||||
) -> None:
|
|
||||||
super().__init__(
|
|
||||||
file_path=file_path,
|
|
||||||
target_path=target_path,
|
|
||||||
results_path=results_path,
|
|
||||||
module_options=module_options,
|
|
||||||
log=log,
|
|
||||||
results=results,
|
|
||||||
)
|
|
||||||
self.results = []
|
|
||||||
|
|
||||||
def run(self) -> None:
|
|
||||||
"""
|
|
||||||
Run the mounts analysis module.
|
|
||||||
|
|
||||||
This module looks for mount information files collected by androidqf
|
|
||||||
and analyzes them for suspicious configurations, particularly focusing
|
|
||||||
on detecting root access indicators like /system mounted as read-write.
|
|
||||||
"""
|
|
||||||
mount_files = self._get_files_by_pattern("*/mounts.json")
|
|
||||||
|
|
||||||
if not mount_files:
|
|
||||||
self.log.info("No mount information file found")
|
|
||||||
return
|
|
||||||
|
|
||||||
self.log.info("Found mount information file: %s", mount_files[0])
|
|
||||||
|
|
||||||
try:
|
|
||||||
data = self._get_file_content(mount_files[0]).decode(
|
|
||||||
"utf-8", errors="replace"
|
|
||||||
)
|
|
||||||
except Exception as exc:
|
|
||||||
self.log.error("Failed to read mount information file: %s", exc)
|
|
||||||
return
|
|
||||||
|
|
||||||
# Parse the mount data
|
|
||||||
try:
|
|
||||||
json_data = json.loads(data)
|
|
||||||
|
|
||||||
if isinstance(json_data, list):
|
|
||||||
# AndroidQF format: array of strings like
|
|
||||||
# "/dev/block/dm-12 on / type ext4 (ro,seclabel,noatime)"
|
|
||||||
mount_content = "\n".join(json_data)
|
|
||||||
self.parse(mount_content)
|
|
||||||
except Exception as exc:
|
|
||||||
self.log.error("Failed to parse mount information: %s", exc)
|
|
||||||
return
|
|
||||||
|
|
||||||
self.log.info("Extracted a total of %d mount entries", len(self.results))
|
|
||||||
@@ -1,121 +0,0 @@
|
|||||||
# Mobile Verification Toolkit (MVT)
|
|
||||||
# Copyright (c) 2021-2023 The MVT Authors.
|
|
||||||
# Use of this software is governed by the MVT License 1.1 that can be found at
|
|
||||||
# https://license.mvt.re/1.1/
|
|
||||||
|
|
||||||
import json
|
|
||||||
import logging
|
|
||||||
from typing import Optional
|
|
||||||
|
|
||||||
from .base import AndroidQFModule
|
|
||||||
|
|
||||||
|
|
||||||
class RootBinaries(AndroidQFModule):
|
|
||||||
"""This module analyzes root_binaries.json for root binaries found by androidqf."""
|
|
||||||
|
|
||||||
def __init__(
|
|
||||||
self,
|
|
||||||
file_path: Optional[str] = None,
|
|
||||||
target_path: Optional[str] = None,
|
|
||||||
results_path: Optional[str] = None,
|
|
||||||
module_options: Optional[dict] = None,
|
|
||||||
log: logging.Logger = logging.getLogger(__name__),
|
|
||||||
results: Optional[list] = None,
|
|
||||||
) -> None:
|
|
||||||
super().__init__(
|
|
||||||
file_path=file_path,
|
|
||||||
target_path=target_path,
|
|
||||||
results_path=results_path,
|
|
||||||
module_options=module_options,
|
|
||||||
log=log,
|
|
||||||
results=results,
|
|
||||||
)
|
|
||||||
|
|
||||||
def serialize(self, record: dict) -> dict:
|
|
||||||
return {
|
|
||||||
"timestamp": record.get("timestamp"),
|
|
||||||
"module": self.__class__.__name__,
|
|
||||||
"event": "root_binary_found",
|
|
||||||
"data": f"Root binary found: {record['path']} (binary: {record['binary_name']})",
|
|
||||||
}
|
|
||||||
|
|
||||||
def check_indicators(self) -> None:
|
|
||||||
"""Check for indicators of device rooting."""
|
|
||||||
if not self.results:
|
|
||||||
return
|
|
||||||
|
|
||||||
# All found root binaries are considered indicators of rooting
|
|
||||||
for result in self.results:
|
|
||||||
self.log.warning(
|
|
||||||
'Found root binary "%s" at path "%s"',
|
|
||||||
result["binary_name"],
|
|
||||||
result["path"],
|
|
||||||
)
|
|
||||||
self.detected.append(result)
|
|
||||||
|
|
||||||
if self.detected:
|
|
||||||
self.log.warning(
|
|
||||||
"Device shows signs of rooting with %d root binaries found",
|
|
||||||
len(self.detected),
|
|
||||||
)
|
|
||||||
|
|
||||||
def run(self) -> None:
|
|
||||||
"""Run the root binaries analysis."""
|
|
||||||
root_binaries_files = self._get_files_by_pattern("*/root_binaries.json")
|
|
||||||
|
|
||||||
if not root_binaries_files:
|
|
||||||
self.log.info("No root_binaries.json file found")
|
|
||||||
return
|
|
||||||
|
|
||||||
rawdata = self._get_file_content(root_binaries_files[0]).decode(
|
|
||||||
"utf-8", errors="ignore"
|
|
||||||
)
|
|
||||||
|
|
||||||
try:
|
|
||||||
root_binary_paths = json.loads(rawdata)
|
|
||||||
except json.JSONDecodeError as e:
|
|
||||||
self.log.error("Failed to parse root_binaries.json: %s", e)
|
|
||||||
return
|
|
||||||
|
|
||||||
if not isinstance(root_binary_paths, list):
|
|
||||||
self.log.error("Expected root_binaries.json to contain a list of paths")
|
|
||||||
return
|
|
||||||
|
|
||||||
# Known root binary names that might be found and their descriptions
|
|
||||||
# This maps the binary name to a human-readable description
|
|
||||||
known_root_binaries = {
|
|
||||||
"su": "SuperUser binary",
|
|
||||||
"busybox": "BusyBox utilities",
|
|
||||||
"supersu": "SuperSU root management",
|
|
||||||
"Superuser.apk": "Superuser app",
|
|
||||||
"KingoUser.apk": "KingRoot app",
|
|
||||||
"SuperSu.apk": "SuperSU app",
|
|
||||||
"magisk": "Magisk root framework",
|
|
||||||
"magiskhide": "Magisk hide utility",
|
|
||||||
"magiskinit": "Magisk init binary",
|
|
||||||
"magiskpolicy": "Magisk policy binary",
|
|
||||||
}
|
|
||||||
|
|
||||||
for path in root_binary_paths:
|
|
||||||
if not path or not isinstance(path, str):
|
|
||||||
continue
|
|
||||||
|
|
||||||
# Extract binary name from path
|
|
||||||
binary_name = path.split("/")[-1].lower()
|
|
||||||
|
|
||||||
# Check if this matches a known root binary by exact name match
|
|
||||||
description = "Unknown root binary"
|
|
||||||
for known_binary in known_root_binaries:
|
|
||||||
if binary_name == known_binary.lower():
|
|
||||||
description = known_root_binaries[known_binary]
|
|
||||||
break
|
|
||||||
|
|
||||||
result = {
|
|
||||||
"path": path.strip(),
|
|
||||||
"binary_name": binary_name,
|
|
||||||
"description": description,
|
|
||||||
}
|
|
||||||
|
|
||||||
self.results.append(result)
|
|
||||||
|
|
||||||
self.log.info("Found %d root binaries", len(self.results))
|
|
||||||
@@ -895,10 +895,6 @@
|
|||||||
"version": "15.8.4",
|
"version": "15.8.4",
|
||||||
"build": "19H390"
|
"build": "19H390"
|
||||||
},
|
},
|
||||||
{
|
|
||||||
"version": "15.8.5",
|
|
||||||
"build": "19H394"
|
|
||||||
},
|
|
||||||
{
|
{
|
||||||
"build": "20A362",
|
"build": "20A362",
|
||||||
"version": "16.0"
|
"version": "16.0"
|
||||||
@@ -1004,10 +1000,6 @@
|
|||||||
"version": "16.7.11",
|
"version": "16.7.11",
|
||||||
"build": "20H360"
|
"build": "20H360"
|
||||||
},
|
},
|
||||||
{
|
|
||||||
"version": "16.7.12",
|
|
||||||
"build": "20H364"
|
|
||||||
},
|
|
||||||
{
|
{
|
||||||
"version": "17.0",
|
"version": "17.0",
|
||||||
"build": "21A327"
|
"build": "21A327"
|
||||||
@@ -1147,21 +1139,5 @@
|
|||||||
{
|
{
|
||||||
"version": "18.6.1",
|
"version": "18.6.1",
|
||||||
"build": "22G90"
|
"build": "22G90"
|
||||||
},
|
|
||||||
{
|
|
||||||
"version": "18.6.2",
|
|
||||||
"build": "22G100"
|
|
||||||
},
|
|
||||||
{
|
|
||||||
"version": "18.7",
|
|
||||||
"build": "22H20"
|
|
||||||
},
|
|
||||||
{
|
|
||||||
"version": "26",
|
|
||||||
"build": "23A341"
|
|
||||||
},
|
|
||||||
{
|
|
||||||
"version": "26.0.1",
|
|
||||||
"build": "23A355"
|
|
||||||
}
|
}
|
||||||
]
|
]
|
||||||
@@ -1,97 +0,0 @@
|
|||||||
# Mobile Verification Toolkit (MVT)
|
|
||||||
# Copyright (c) 2021-2023 The MVT Authors.
|
|
||||||
# Use of this software is governed by the MVT License 1.1 that can be found at
|
|
||||||
# https://license.mvt.re/1.1/
|
|
||||||
|
|
||||||
import logging
|
|
||||||
from pathlib import Path
|
|
||||||
|
|
||||||
from mvt.common.module import run_module
|
|
||||||
|
|
||||||
from ..utils import get_android_androidqf, list_files
|
|
||||||
|
|
||||||
|
|
||||||
class TestAndroidqfMountsArtifact:
|
|
||||||
def test_parse_mounts_token_checks(self):
|
|
||||||
"""
|
|
||||||
Test the artifact-level `parse` method using tolerant token checks.
|
|
||||||
|
|
||||||
Different parser variants may place mount tokens into different dict
|
|
||||||
keys (for example `mount_options`, `pass_num`, `dump_freq`, etc.). To
|
|
||||||
avoid brittle assertions we concatenate each parsed entry's values and
|
|
||||||
look for expected tokens (device names, mount points, options) somewhere
|
|
||||||
in the combined representation.
|
|
||||||
"""
|
|
||||||
from mvt.android.artifacts.mounts import Mounts as MountsArtifact
|
|
||||||
|
|
||||||
m = MountsArtifact()
|
|
||||||
|
|
||||||
mount_lines = [
|
|
||||||
"/dev/block/dm-12 on / type ext4 (ro,seclabel,noatime)",
|
|
||||||
"/dev/block/by-name/system on /system type ext4 (rw,seclabel,noatime)",
|
|
||||||
"/dev/block/by-name/data on /data type f2fs (rw,nosuid,nodev,noatime)",
|
|
||||||
]
|
|
||||||
mount_content = "\n".join(mount_lines)
|
|
||||||
|
|
||||||
# Parse the mount lines (artifact-level)
|
|
||||||
m.parse(mount_content)
|
|
||||||
|
|
||||||
# Basic sanity: parser should return one entry per input line
|
|
||||||
assert len(m.results) == 3, f"Expected 3 parsed mounts, got: {m.results}"
|
|
||||||
|
|
||||||
# Concatenate each entry's values into a single string so token checks
|
|
||||||
# are tolerant to which dict keys were used by the parser.
|
|
||||||
def concat_values(entry):
|
|
||||||
parts = []
|
|
||||||
for v in entry.values():
|
|
||||||
try:
|
|
||||||
parts.append(str(v))
|
|
||||||
except Exception:
|
|
||||||
# Skip values that can't be stringified
|
|
||||||
continue
|
|
||||||
return " ".join(parts)
|
|
||||||
|
|
||||||
concatenated = [concat_values(e) for e in m.results]
|
|
||||||
|
|
||||||
# Token expectations (tolerant):
|
|
||||||
# - Root line should include 'dm-12' and 'noatime' (and typically 'ro')
|
|
||||||
assert any("dm-12" in s and "noatime" in s for s in concatenated), (
|
|
||||||
f"No root-like tokens (dm-12 + noatime) found in parsed results: {concatenated}"
|
|
||||||
)
|
|
||||||
|
|
||||||
# - System line should include '/system' or 'by-name/system' and 'rw'
|
|
||||||
assert any(
|
|
||||||
(("by-name/system" in s or "/system" in s) and "rw" in s)
|
|
||||||
for s in concatenated
|
|
||||||
), (
|
|
||||||
f"No system-like tokens (system + rw) found in parsed results: {concatenated}"
|
|
||||||
)
|
|
||||||
|
|
||||||
# - Data line should include '/data' or 'by-name/data' and 'rw'
|
|
||||||
assert any(
|
|
||||||
(("by-name/data" in s or "/data" in s) and "rw" in s) for s in concatenated
|
|
||||||
), f"No data-like tokens (data + rw) found in parsed results: {concatenated}"
|
|
||||||
|
|
||||||
|
|
||||||
class TestAndroidqfMountsModule:
|
|
||||||
def test_androidqf_module_no_mounts_file(self):
|
|
||||||
"""
|
|
||||||
When no `mounts.json` is present in the androidqf dataset, the module
|
|
||||||
should not produce results nor detections.
|
|
||||||
"""
|
|
||||||
from mvt.android.modules.androidqf.mounts import Mounts
|
|
||||||
|
|
||||||
data_path = get_android_androidqf()
|
|
||||||
m = Mounts(target_path=data_path, log=logging)
|
|
||||||
files = list_files(data_path)
|
|
||||||
parent_path = Path(data_path).absolute().parent.as_posix()
|
|
||||||
m.from_folder(parent_path, files)
|
|
||||||
|
|
||||||
run_module(m)
|
|
||||||
|
|
||||||
# The provided androidqf test dataset does not include mounts.json, so
|
|
||||||
# results should remain empty.
|
|
||||||
assert len(m.results) == 0, (
|
|
||||||
f"Expected no results when mounts.json is absent, got: {m.results}"
|
|
||||||
)
|
|
||||||
assert len(m.detected) == 0, f"Expected no detections, got: {m.detected}"
|
|
||||||
@@ -1,116 +0,0 @@
|
|||||||
# Mobile Verification Toolkit (MVT)
|
|
||||||
# Copyright (c) 2021-2023 The MVT Authors.
|
|
||||||
# Use of this software is governed by the MVT License 1.1 that can be found at
|
|
||||||
# https://license.mvt.re/1.1/
|
|
||||||
|
|
||||||
import logging
|
|
||||||
from pathlib import Path
|
|
||||||
|
|
||||||
import pytest
|
|
||||||
|
|
||||||
from mvt.android.modules.androidqf.root_binaries import RootBinaries
|
|
||||||
from mvt.common.module import run_module
|
|
||||||
|
|
||||||
from ..utils import get_android_androidqf, list_files
|
|
||||||
|
|
||||||
|
|
||||||
@pytest.fixture()
|
|
||||||
def data_path():
|
|
||||||
return get_android_androidqf()
|
|
||||||
|
|
||||||
|
|
||||||
@pytest.fixture()
|
|
||||||
def parent_data_path(data_path):
|
|
||||||
return Path(data_path).absolute().parent.as_posix()
|
|
||||||
|
|
||||||
|
|
||||||
@pytest.fixture()
|
|
||||||
def file_list(data_path):
|
|
||||||
return list_files(data_path)
|
|
||||||
|
|
||||||
|
|
||||||
@pytest.fixture()
|
|
||||||
def module(parent_data_path, file_list):
|
|
||||||
m = RootBinaries(target_path=parent_data_path, log=logging)
|
|
||||||
m.from_folder(parent_data_path, file_list)
|
|
||||||
return m
|
|
||||||
|
|
||||||
|
|
||||||
class TestAndroidqfRootBinaries:
|
|
||||||
def test_root_binaries_detection(self, module):
|
|
||||||
run_module(module)
|
|
||||||
|
|
||||||
# Should find 4 root binaries from the test file
|
|
||||||
assert len(module.results) == 4
|
|
||||||
assert len(module.detected) == 4
|
|
||||||
|
|
||||||
# Check that all results are detected as indicators
|
|
||||||
binary_paths = [result["path"] for result in module.results]
|
|
||||||
assert "/system/bin/su" in binary_paths
|
|
||||||
assert "/system/xbin/busybox" in binary_paths
|
|
||||||
assert "/data/local/tmp/magisk" in binary_paths
|
|
||||||
assert "/system/bin/magiskhide" in binary_paths
|
|
||||||
|
|
||||||
def test_root_binaries_descriptions(self, module):
|
|
||||||
run_module(module)
|
|
||||||
|
|
||||||
# Check that binary descriptions are correctly identified
|
|
||||||
su_result = next((r for r in module.results if "su" in r["binary_name"]), None)
|
|
||||||
assert su_result is not None
|
|
||||||
assert "SuperUser binary" in su_result["description"]
|
|
||||||
|
|
||||||
busybox_result = next(
|
|
||||||
(r for r in module.results if "busybox" in r["binary_name"]), None
|
|
||||||
)
|
|
||||||
assert busybox_result is not None
|
|
||||||
assert "BusyBox utilities" in busybox_result["description"]
|
|
||||||
|
|
||||||
magisk_result = next(
|
|
||||||
(r for r in module.results if r["binary_name"] == "magisk"), None
|
|
||||||
)
|
|
||||||
assert magisk_result is not None
|
|
||||||
assert "Magisk root framework" in magisk_result["description"]
|
|
||||||
|
|
||||||
magiskhide_result = next(
|
|
||||||
(r for r in module.results if "magiskhide" in r["binary_name"]), None
|
|
||||||
)
|
|
||||||
assert magiskhide_result is not None
|
|
||||||
assert "Magisk hide utility" in magiskhide_result["description"]
|
|
||||||
|
|
||||||
def test_root_binaries_warnings(self, caplog, module):
|
|
||||||
run_module(module)
|
|
||||||
|
|
||||||
# Check that warnings are logged for each root binary found
|
|
||||||
assert 'Found root binary "su" at path "/system/bin/su"' in caplog.text
|
|
||||||
assert (
|
|
||||||
'Found root binary "busybox" at path "/system/xbin/busybox"' in caplog.text
|
|
||||||
)
|
|
||||||
assert (
|
|
||||||
'Found root binary "magisk" at path "/data/local/tmp/magisk"' in caplog.text
|
|
||||||
)
|
|
||||||
assert (
|
|
||||||
'Found root binary "magiskhide" at path "/system/bin/magiskhide"'
|
|
||||||
in caplog.text
|
|
||||||
)
|
|
||||||
assert "Device shows signs of rooting with 4 root binaries found" in caplog.text
|
|
||||||
|
|
||||||
def test_serialize_method(self, module):
|
|
||||||
run_module(module)
|
|
||||||
|
|
||||||
# Test that serialize method works correctly
|
|
||||||
if module.results:
|
|
||||||
serialized = module.serialize(module.results[0])
|
|
||||||
assert serialized["module"] == "RootBinaries"
|
|
||||||
assert serialized["event"] == "root_binary_found"
|
|
||||||
assert "Root binary found:" in serialized["data"]
|
|
||||||
|
|
||||||
def test_no_root_binaries_file(self, parent_data_path):
|
|
||||||
# Test behavior when no root_binaries.json file is present
|
|
||||||
empty_file_list = []
|
|
||||||
m = RootBinaries(target_path=parent_data_path, log=logging)
|
|
||||||
m.from_folder(parent_data_path, empty_file_list)
|
|
||||||
|
|
||||||
run_module(m)
|
|
||||||
|
|
||||||
assert len(m.results) == 0
|
|
||||||
assert len(m.detected) == 0
|
|
||||||
@@ -1,6 +0,0 @@
|
|||||||
[
|
|
||||||
"/system/bin/su",
|
|
||||||
"/system/xbin/busybox",
|
|
||||||
"/data/local/tmp/magisk",
|
|
||||||
"/system/bin/magiskhide"
|
|
||||||
]
|
|
||||||
@@ -62,7 +62,7 @@ class TestHashes:
|
|||||||
def test_hash_from_folder(self):
|
def test_hash_from_folder(self):
|
||||||
path = os.path.join(get_artifact_folder(), "androidqf")
|
path = os.path.join(get_artifact_folder(), "androidqf")
|
||||||
hashes = list(generate_hashes_from_path(path, logging))
|
hashes = list(generate_hashes_from_path(path, logging))
|
||||||
assert len(hashes) == 8
|
assert len(hashes) == 7
|
||||||
# Sort the files to have reliable order for tests.
|
# Sort the files to have reliable order for tests.
|
||||||
hashes = sorted(hashes, key=lambda x: x["file_path"])
|
hashes = sorted(hashes, key=lambda x: x["file_path"])
|
||||||
assert hashes[0]["file_path"] == os.path.join(path, "backup.ab")
|
assert hashes[0]["file_path"] == os.path.join(path, "backup.ab")
|
||||||
|
|||||||
Reference in New Issue
Block a user