From 6d1d499c4eadaf0e632d307284b02d0e2cebf316 Mon Sep 17 00:00:00 2001 From: Janik Besendorf Date: Fri, 7 Nov 2025 18:52:31 +0100 Subject: [PATCH] . --- .../artifacts/dumpsys_accessibility.py | 5 +- src/mvt/android/artifacts/dumpsys_appops.py | 12 +++-- .../artifacts/dumpsys_battery_daily.py | 8 ++-- .../artifacts/dumpsys_battery_history.py | 5 +- src/mvt/android/artifacts/dumpsys_dbinfo.py | 5 +- .../artifacts/dumpsys_package_activities.py | 5 +- src/mvt/android/artifacts/dumpsys_packages.py | 13 ++--- .../artifacts/dumpsys_platform_compat.py | 5 +- .../android/artifacts/dumpsys_receivers.py | 10 ++-- src/mvt/android/artifacts/getprop.py | 11 +++-- src/mvt/android/artifacts/mounts.py | 4 +- src/mvt/android/artifacts/processes.py | 10 ++-- .../android/artifacts/tombstone_crashes.py | 11 +++-- src/mvt/android/modules/adb/chrome_history.py | 5 +- src/mvt/android/modules/adb/packages.py | 10 ++-- src/mvt/android/modules/adb/sms.py | 5 +- .../android/modules/androidqf/aqf_files.py | 10 ++-- .../android/modules/androidqf/aqf_packages.py | 18 ++++--- src/mvt/android/modules/androidqf/sms.py | 5 +- src/mvt/android/modules/backup/sms.py | 9 ++-- src/mvt/common/alerts.py | 48 ++++++++++++++++--- 21 files changed, 142 insertions(+), 72 deletions(-) diff --git a/src/mvt/android/artifacts/dumpsys_accessibility.py b/src/mvt/android/artifacts/dumpsys_accessibility.py index 33fe8fa..2a17df6 100644 --- a/src/mvt/android/artifacts/dumpsys_accessibility.py +++ b/src/mvt/android/artifacts/dumpsys_accessibility.py @@ -16,8 +16,9 @@ class DumpsysAccessibilityArtifact(AndroidArtifact): for result in self.results: ioc_match = self.indicators.check_app_id(result["package_name"]) if ioc_match: - result["matched_indicator"] = ioc_match.ioc - self.alertstore.critical(ioc_match.message, "", result) + self.alertstore.critical( + ioc_match.message, "", result, matched_indicator=ioc_match.ioc + ) continue def parse(self, content: str) -> None: diff --git a/src/mvt/android/artifacts/dumpsys_appops.py b/src/mvt/android/artifacts/dumpsys_appops.py index 52ec7d7..492eac7 100644 --- a/src/mvt/android/artifacts/dumpsys_appops.py +++ b/src/mvt/android/artifacts/dumpsys_appops.py @@ -4,6 +4,7 @@ # https://license.mvt.re/1.1/ from datetime import datetime +from typing import Any from mvt.common.module_types import ModuleAtomicResult, ModuleSerializedResult from mvt.common.utils import convert_datetime_to_iso @@ -44,8 +45,9 @@ class DumpsysAppopsArtifact(AndroidArtifact): if self.indicators: ioc_match = self.indicators.check_app_id(result.get("package_name")) if ioc_match: - result["matched_indicator"] = ioc_match.ioc - self.alertstore.critical(ioc_match.message, "", result) + self.alertstore.critical( + ioc_match.message, "", result, matched_indicator=ioc_match.ioc + ) continue # We use a placeholder entry to create a basic alert even without permission entries. @@ -83,9 +85,9 @@ class DumpsysAppopsArtifact(AndroidArtifact): def parse(self, output: str) -> None: # self.results: List[Dict[str, Any]] = [] - perm = {} - package = {} - entry = {} + perm: dict[str, Any] = {} + package: dict[str, Any] = {} + entry: dict[str, Any] = {} uid = None in_packages = False diff --git a/src/mvt/android/artifacts/dumpsys_battery_daily.py b/src/mvt/android/artifacts/dumpsys_battery_daily.py index 32d765a..4eb2033 100644 --- a/src/mvt/android/artifacts/dumpsys_battery_daily.py +++ b/src/mvt/android/artifacts/dumpsys_battery_daily.py @@ -3,6 +3,7 @@ # Use of this software is governed by the MVT License 1.1 that can be found at # https://license.mvt.re/1.1/ +from typing import Any from mvt.common.module_types import ModuleAtomicResult, ModuleSerializedResult @@ -30,13 +31,14 @@ class DumpsysBatteryDailyArtifact(AndroidArtifact): for result in self.results: ioc_match = self.indicators.check_app_id(result["package_name"]) if ioc_match: - result["matched_indicator"] = ioc_match.ioc - self.alertstore.critical(ioc_match.message, "", result) + self.alertstore.critical( + ioc_match.message, "", result, matched_indicator=ioc_match.ioc + ) continue def parse(self, output: str) -> None: daily = None - daily_updates = [] + daily_updates: list[dict[str, Any]] = [] for line in output.splitlines(): if line.startswith(" Daily from "): if len(daily_updates) > 0: diff --git a/src/mvt/android/artifacts/dumpsys_battery_history.py b/src/mvt/android/artifacts/dumpsys_battery_history.py index a6cec3e..fb42903 100644 --- a/src/mvt/android/artifacts/dumpsys_battery_history.py +++ b/src/mvt/android/artifacts/dumpsys_battery_history.py @@ -18,8 +18,9 @@ class DumpsysBatteryHistoryArtifact(AndroidArtifact): for result in self.results: ioc_match = self.indicators.check_app_id(result["package_name"]) if ioc_match: - result["matched_indicator"] = ioc_match.ioc - self.alertstore.critical(ioc_match.message, "", result) + self.alertstore.critical( + ioc_match.message, "", result, matched_indicator=ioc_match.ioc + ) continue def parse(self, data: str) -> None: diff --git a/src/mvt/android/artifacts/dumpsys_dbinfo.py b/src/mvt/android/artifacts/dumpsys_dbinfo.py index 6dc9c32..a3a4eb1 100644 --- a/src/mvt/android/artifacts/dumpsys_dbinfo.py +++ b/src/mvt/android/artifacts/dumpsys_dbinfo.py @@ -22,8 +22,9 @@ class DumpsysDBInfoArtifact(AndroidArtifact): for part in path.split("/"): ioc_match = self.indicators.check_app_id(part) if ioc_match: - result["matched_indicator"] = ioc_match.ioc - self.alertstore.critical(ioc_match.message, "", result) + self.alertstore.critical( + ioc_match.message, "", result, matched_indicator=ioc_match.ioc + ) continue def parse(self, output: str) -> None: diff --git a/src/mvt/android/artifacts/dumpsys_package_activities.py b/src/mvt/android/artifacts/dumpsys_package_activities.py index ef07492..31c6bbd 100644 --- a/src/mvt/android/artifacts/dumpsys_package_activities.py +++ b/src/mvt/android/artifacts/dumpsys_package_activities.py @@ -14,8 +14,9 @@ class DumpsysPackageActivitiesArtifact(AndroidArtifact): for activity in self.results: ioc_match = self.indicators.check_app_id(activity["package_name"]) if ioc_match: - activity["matched_indicator"] = ioc_match.ioc - self.alertstore.critical(ioc_match.message, "", activity) + self.alertstore.critical( + ioc_match.message, "", activity, matched_indicator=ioc_match.ioc + ) continue def parse(self, content: str): diff --git a/src/mvt/android/artifacts/dumpsys_packages.py b/src/mvt/android/artifacts/dumpsys_packages.py index e5b3828..41891e0 100644 --- a/src/mvt/android/artifacts/dumpsys_packages.py +++ b/src/mvt/android/artifacts/dumpsys_packages.py @@ -30,8 +30,9 @@ class DumpsysPackagesArtifact(AndroidArtifact): ioc_match = self.indicators.check_app_id(result.get("package_name", "")) if ioc_match: - result["matched_indicator"] = ioc_match.ioc - self.alertstore.critical(ioc_match.message, "", result) + self.alertstore.critical( + ioc_match.message, "", result, matched_indicator=ioc_match.ioc + ) self.alertstore.log_latest() def serialize(self, record: ModuleAtomicResult) -> ModuleSerializedResult: @@ -62,15 +63,15 @@ class DumpsysPackagesArtifact(AndroidArtifact): """ Parse one entry of a dumpsys package information """ - details = { + details: Dict[str, Any] = { "uid": "", "version_name": "", "version_code": "", "timestamp": "", "first_install_time": "", "last_update_time": "", - "permissions": [], - "requested_permissions": [], + "permissions": list(), + "requested_permissions": list(), } in_install_permissions = False in_runtime_permissions = False @@ -148,7 +149,7 @@ class DumpsysPackagesArtifact(AndroidArtifact): results = [] package_name = None package = {} - lines = [] + lines: list[str] = [] for line in output.splitlines(): if line.startswith(" Package ["): if len(lines) > 0: diff --git a/src/mvt/android/artifacts/dumpsys_platform_compat.py b/src/mvt/android/artifacts/dumpsys_platform_compat.py index 45573e3..c01fbfd 100644 --- a/src/mvt/android/artifacts/dumpsys_platform_compat.py +++ b/src/mvt/android/artifacts/dumpsys_platform_compat.py @@ -18,8 +18,9 @@ class DumpsysPlatformCompatArtifact(AndroidArtifact): for result in self.results: ioc_match = self.indicators.check_app_id(result["package_name"]) if ioc_match: - result["matched_indicator"] = ioc_match.ioc - self.alertstore.critical(ioc_match.message, "", result) + self.alertstore.critical( + ioc_match.message, "", result, matched_indicator=ioc_match.ioc + ) continue def parse(self, data: str) -> None: diff --git a/src/mvt/android/artifacts/dumpsys_receivers.py b/src/mvt/android/artifacts/dumpsys_receivers.py index 98b0e0e..c7130ed 100644 --- a/src/mvt/android/artifacts/dumpsys_receivers.py +++ b/src/mvt/android/artifacts/dumpsys_receivers.py @@ -52,12 +52,16 @@ class DumpsysReceiversArtifact(AndroidArtifact): ioc_match = self.indicators.check_app_id(receiver["package_name"]) if ioc_match: - receiver["matched_indicator"] = ioc_match.ioc - self.alertstore.critical(ioc_match.message, "", {intent: receiver}) + self.alertstore.critical( + ioc_match.message, + "", + {intent: receiver}, + matched_indicator=ioc_match.ioc, + ) continue def parse(self, output: str) -> None: - self.results = {} + self.results: dict[str, list[dict[str, str]]] = {} in_receiver_resolver_table = False in_non_data_actions = False diff --git a/src/mvt/android/artifacts/getprop.py b/src/mvt/android/artifacts/getprop.py index ba827f9..71e4993 100644 --- a/src/mvt/android/artifacts/getprop.py +++ b/src/mvt/android/artifacts/getprop.py @@ -39,10 +39,10 @@ class GetProp(AndroidArtifact): if not matches or len(matches[0]) != 2: continue - entry = {"name": matches[0][0], "value": matches[0][1]} - self.results.append(entry) + prop_entry = {"name": matches[0][0], "value": matches[0][1]} + self.results.append(prop_entry) - def get_device_timezone(self) -> str: + def get_device_timezone(self) -> str | None: """ Get the device timezone from the getprop results @@ -70,5 +70,6 @@ class GetProp(AndroidArtifact): result.get("name", "") ) if ioc_match: - result["matched_indicator"] = ioc_match.ioc - self.alertstore.critical(ioc_match.message, "", result) + self.alertstore.critical( + ioc_match.message, "", result, matched_indicator=ioc_match.ioc + ) diff --git a/src/mvt/android/artifacts/mounts.py b/src/mvt/android/artifacts/mounts.py index f6a4258..d4370ca 100644 --- a/src/mvt/android/artifacts/mounts.py +++ b/src/mvt/android/artifacts/mounts.py @@ -179,19 +179,19 @@ class Mounts(AndroidArtifact): # Check if any mount points match indicators ioc = self.indicators.check_file_path(mount.get("mount_point", "")) if ioc: - mount["matched_indicator"] = ioc self.alertstore.critical( f"Mount point matches indicator: {mount.get('mount_point', '')}", "", mount, + matched_indicator=ioc, ) # Check device paths for indicators ioc = self.indicators.check_file_path(mount.get("device", "")) if ioc: - mount["matched_indicator"] = ioc self.alertstore.critical( f"Device path matches indicator: {mount.get('device', '')}", "", mount, + matched_indicator=ioc, ) diff --git a/src/mvt/android/artifacts/processes.py b/src/mvt/android/artifacts/processes.py index 2f1dcac..7cf962e 100644 --- a/src/mvt/android/artifacts/processes.py +++ b/src/mvt/android/artifacts/processes.py @@ -60,11 +60,13 @@ class Processes(AndroidArtifact): ioc_match = self.indicators.check_app_id(proc_name) if ioc_match: - result["matched_indicator"] = ioc_match.ioc - self.alertstore.critical(ioc_match.message, "", result) + self.alertstore.critical( + ioc_match.message, "", result, matched_indicator=ioc_match.ioc + ) continue ioc_match = self.indicators.check_process(proc_name) if ioc_match: - result["matched_indicator"] = ioc_match.ioc - self.alertstore.critical(ioc_match.message, "", result) + self.alertstore.critical( + ioc_match.message, "", result, matched_indicator=ioc_match.ioc + ) diff --git a/src/mvt/android/artifacts/tombstone_crashes.py b/src/mvt/android/artifacts/tombstone_crashes.py index a73b800..63e7c53 100644 --- a/src/mvt/android/artifacts/tombstone_crashes.py +++ b/src/mvt/android/artifacts/tombstone_crashes.py @@ -95,16 +95,19 @@ class TombstoneCrashArtifact(AndroidArtifact): for result in self.results: ioc_match = self.indicators.check_process(result["process_name"]) if ioc_match: - result["matched_indicator"] = ioc_match.ioc - self.alertstore.critical(ioc_match.message, "", result) + self.alertstore.critical( + ioc_match.message, "", result, matched_indicator=ioc_match.ioc + ) continue if result.get("command_line", []): command_name = result.get("command_line")[0].split("/")[-1] + command_name = result["command_line"][0] ioc_match = self.indicators.check_process(command_name) if ioc_match: - result["matched_indicator"] = ioc_match.ioc - self.alertstore.critical(ioc_match.message, "", result) + self.alertstore.critical( + ioc_match.message, "", result, matched_indicator=ioc_match.ioc + ) continue SUSPICIOUS_UIDS = [ diff --git a/src/mvt/android/modules/adb/chrome_history.py b/src/mvt/android/modules/adb/chrome_history.py index c00735d..2be1d33 100644 --- a/src/mvt/android/modules/adb/chrome_history.py +++ b/src/mvt/android/modules/adb/chrome_history.py @@ -58,8 +58,9 @@ class ChromeHistory(AndroidExtraction): for result in self.results: ioc_match = self.indicators.check_url(result["url"]) if ioc_match: - result["matched_indicator"] = ioc_match.ioc - self.alertstore.critical(ioc_match.message, "", result) + self.alertstore.critical( + ioc_match.message, "", result, matched_indicator=ioc_match.ioc + ) def _parse_db(self, db_path: str) -> None: """Parse a Chrome History database file. diff --git a/src/mvt/android/modules/adb/packages.py b/src/mvt/android/modules/adb/packages.py index a73caf6..74e3afc 100644 --- a/src/mvt/android/modules/adb/packages.py +++ b/src/mvt/android/modules/adb/packages.py @@ -96,14 +96,16 @@ class Packages(AndroidExtraction): ioc_match = self.indicators.check_app_id(result["package_name"]) if ioc_match: - result["matched_indicator"] = ioc_match.ioc - self.alertstore.critical(ioc_match.message, "", result) + self.alertstore.critical( + ioc_match.message, "", result, matched_indicator=ioc_match.ioc + ) for package_file in result.get("files", []): ioc_match = self.indicators.check_file_hash(package_file["sha256"]) if ioc_match: - result["matched_indicator"] = ioc_match.ioc - self.alertstore.critical(ioc_match.message, "", result) + self.alertstore.critical( + ioc_match.message, "", result, matched_indicator=ioc_match.ioc + ) # @staticmethod # def check_virustotal(packages: list) -> None: diff --git a/src/mvt/android/modules/adb/sms.py b/src/mvt/android/modules/adb/sms.py index 47f0a20..0b26131 100644 --- a/src/mvt/android/modules/adb/sms.py +++ b/src/mvt/android/modules/adb/sms.py @@ -92,8 +92,9 @@ class SMS(AndroidExtraction): ioc_match = self.indicators.check_urls(message_links) if ioc_match: - message["matched_indicator"] = ioc_match.ioc - self.alertstore.critical(ioc_match.message, "", message) + self.alertstore.critical( + ioc_match.message, "", message, matched_indicator=ioc_match.ioc + ) def _parse_db(self, db_path: str) -> None: """Parse an Android bugle_db SMS database file. diff --git a/src/mvt/android/modules/androidqf/aqf_files.py b/src/mvt/android/modules/androidqf/aqf_files.py index d355a9d..35a848d 100644 --- a/src/mvt/android/modules/androidqf/aqf_files.py +++ b/src/mvt/android/modules/androidqf/aqf_files.py @@ -89,8 +89,9 @@ class AQFFiles(AndroidQFModule): for result in self.results: ioc_match = self.indicators.check_file_path(result["path"]) if ioc_match: - result["matched_indicator"] = ioc_match.ioc - self.alertstore.critical(ioc_match.message, "", result) + self.alertstore.critical( + ioc_match.message, "", result, matched_indicator=ioc_match.ioc + ) self.alertstore.log_latest() continue @@ -113,8 +114,9 @@ class AQFFiles(AndroidQFModule): ioc_match = self.indicators.check_file_hash(result.get("sha256")) if ioc_match: - result["matched_indicator"] = ioc_match.ioc - self.alertstore.critical(ioc_match.message, "", result) + self.alertstore.critical( + ioc_match.message, "", result, matched_indicator=ioc_match.ioc + ) # TODO: adds SHA1 and MD5 when available in MVT diff --git a/src/mvt/android/modules/androidqf/aqf_packages.py b/src/mvt/android/modules/androidqf/aqf_packages.py index 19cc0d2..34ed0cf 100644 --- a/src/mvt/android/modules/androidqf/aqf_packages.py +++ b/src/mvt/android/modules/androidqf/aqf_packages.py @@ -100,15 +100,17 @@ class AQFPackages(AndroidQFModule): ioc_match = self.indicators.check_app_id(result.get("name")) if ioc_match: - result["matched_indicator"] = ioc_match.ioc - self.alertstore.critical(ioc_match.message, "", result) + self.alertstore.critical( + ioc_match.message, "", result, matched_indicator=ioc_match.ioc + ) self.alertstore.log_latest() for package_file in result.get("files", []): ioc_match = self.indicators.check_file_hash(package_file["sha256"]) if ioc_match: - result["matched_indicator"] = ioc_match.ioc - self.alertstore.critical(ioc_match.message, "", result) + self.alertstore.critical( + ioc_match.message, "", result, matched_indicator=ioc_match.ioc + ) self.alertstore.log_latest() if "certificate" not in package_file: @@ -121,8 +123,12 @@ class AQFPackages(AndroidQFModule): certificate_hash ) if ioc_match: - result["matched_indicator"] = ioc_match.ioc - self.alertstore.critical(ioc_match.message, "", result) + self.alertstore.critical( + ioc_match.message, + "", + result, + matched_indicator=ioc_match.ioc, + ) self.alertstore.log_latest() break diff --git a/src/mvt/android/modules/androidqf/sms.py b/src/mvt/android/modules/androidqf/sms.py index 687f402..46cc3f6 100644 --- a/src/mvt/android/modules/androidqf/sms.py +++ b/src/mvt/android/modules/androidqf/sms.py @@ -55,8 +55,9 @@ class SMS(AndroidQFModule): ioc_match = self.indicators.check_domains(message.get("links", [])) if ioc_match: - message["matched_indicator"] = ioc_match.ioc - self.alertstore.critical(ioc_match.message, "", message) + self.alertstore.critical( + ioc_match.message, "", message, matched_indicator=ioc_match.ioc + ) def parse_backup(self, data): header = parse_ab_header(data) diff --git a/src/mvt/android/modules/backup/sms.py b/src/mvt/android/modules/backup/sms.py index fe76049..1c75587 100644 --- a/src/mvt/android/modules/backup/sms.py +++ b/src/mvt/android/modules/backup/sms.py @@ -4,7 +4,7 @@ # https://license.mvt.re/1.1/ import logging -from typing import Optional +from typing import Any, Optional from mvt.android.modules.backup.base import BackupModule from mvt.android.parsers.backup import parse_sms_file @@ -30,7 +30,7 @@ class SMS(BackupModule): log=log, results=results, ) - self.results = [] + self.results: list[dict[str, Any]] = [] def check_indicators(self) -> None: if not self.indicators: @@ -46,8 +46,9 @@ class SMS(BackupModule): ioc_match = self.indicators.check_urls(message_links) if ioc_match: - message["matched_indicator"] = ioc_match.ioc - self.alertstore.critical(ioc_match.message, "", message) + self.alertstore.critical( + ioc_match.message, "", message, matched_indicator=ioc_match.ioc + ) continue def run(self) -> None: diff --git a/src/mvt/common/alerts.py b/src/mvt/common/alerts.py index 5ea73e6..004d422 100644 --- a/src/mvt/common/alerts.py +++ b/src/mvt/common/alerts.py @@ -29,6 +29,7 @@ class Alert: message: str event_time: str event: ModuleAtomicResult + matched_indicator: Optional[Any] = None class AlertStore: @@ -60,7 +61,7 @@ class AlertStore: # Check if it has a get_slug method (MVT modules have this) if hasattr(obj, "get_slug") and callable(obj.get_slug): try: - return obj.get_slug() + return str(obj.get_slug()) except Exception: pass @@ -81,7 +82,13 @@ class AlertStore: for alert in alerts: self.add(alert) - def info(self, message: str, event_time: str, event: ModuleAtomicResult): + def info( + self, + message: str, + event_time: str, + event: ModuleAtomicResult, + matched_indicator: Optional[Any] = None, + ): self.add( Alert( level=AlertLevel.INFORMATIONAL, @@ -89,10 +96,17 @@ class AlertStore: message=message, event_time=event_time, event=event, + matched_indicator=matched_indicator, ) ) - def low(self, message: str, event_time: str, event: ModuleAtomicResult): + def low( + self, + message: str, + event_time: str, + event: ModuleAtomicResult, + matched_indicator: Optional[Any] = None, + ): self.add( Alert( level=AlertLevel.LOW, @@ -100,10 +114,17 @@ class AlertStore: message=message, event_time=event_time, event=event, + matched_indicator=matched_indicator, ) ) - def medium(self, message: str, event_time: str, event: ModuleAtomicResult): + def medium( + self, + message: str, + event_time: str, + event: ModuleAtomicResult, + matched_indicator: Optional[Any] = None, + ): self.add( Alert( level=AlertLevel.MEDIUM, @@ -111,10 +132,17 @@ class AlertStore: message=message, event_time=event_time, event=event, + matched_indicator=matched_indicator, ) ) - def high(self, message: str, event_time: str, event: ModuleAtomicResult): + def high( + self, + message: str, + event_time: str, + event: ModuleAtomicResult, + matched_indicator: Optional[Any] = None, + ): self.add( Alert( level=AlertLevel.HIGH, @@ -122,10 +150,17 @@ class AlertStore: message=message, event_time=event_time, event=event, + matched_indicator=matched_indicator, ) ) - def critical(self, message: str, event_time: str, event: ModuleAtomicResult): + def critical( + self, + message: str, + event_time: str, + event: ModuleAtomicResult, + matched_indicator: Optional[Any] = None, + ): self.add( Alert( level=AlertLevel.CRITICAL, @@ -133,6 +168,7 @@ class AlertStore: message=message, event_time=event_time, event=event, + matched_indicator=matched_indicator, ) )