diff --git a/docs/android/intrusion_logs.md b/docs/android/intrusion_logs.md new file mode 100644 index 0000000..da8fc83 --- /dev/null +++ b/docs/android/intrusion_logs.md @@ -0,0 +1,82 @@ +# Check Android Intrusion Logs + +Recent versions of Android can produce structured *Intrusion Logs* — newline-delimited JSON records derived from the platform's [SecurityLog API](https://developer.android.com/reference/android/app/admin/SecurityLog). Intrusion Logging is offered as a new option under Android's **Advanced Protection Mode**, which users can opt into on their device; no MDM or device-policy configuration is required. When enabled, these logs provide a high-fidelity record of process starts, DNS queries, outbound network connections, ADB activity, keyguard events, and other security-relevant operations. The initial Intrusion Logging feature was released for Android 16 in May 2026. The feature and supported events is likely to be expanded over time. + +For background on how this data source was introduced and why it is forensically valuable, see the Amnesty International Security Lab announcement: [Android Intrusion Logging as a new source of data for consensual forensic analysis](https://securitylab.amnesty.org/latest/2026/05/android-intrusion-logging-as-a-new-source-of-data-for-consensual-forensic-analysis/). + +## Recommended workflow: collect with AndroidQF + +[AndroidQF](https://github.com/mvt-project/androidqf) is the recommended way to acquire data from an Android device for analysis with MVT. During acquisition AndroidQF will prompt the user to also collect intrusion logs from the device, and writes them into an `intrusion-logs/` subdirectory of the acquisition output. + +When you analyse such an acquisition with `mvt-android check-androidqf`, MVT automatically detects the `intrusion-logs/` directory and runs the same intrusion-log checks described below — there is no need to invoke a separate command: + +```bash +mvt-android check-androidqf --output /path/to/results/ /path/to/androidqf-output/ +``` + +The device timezone is read from the AndroidQF acquisition (`getprop.txt`) and applied to event timestamps automatically. + +## Standalone command: `check-intrusion-logs` + +The `mvt-android check-intrusion-logs` command runs the intrusion-log analysis directly against a set of log files. Prefer the AndroidQF workflow above; use the standalone command when the intrusion logs were collected outside of an AndroidQF acquisition, or when re-analysing only a set of intrusion logs. + +## Expected input + +`check-intrusion-logs` accepts either: + +- a **directory** containing one or more `.txt` files (recursively), or +- a **`.zip` archive** containing such `.txt` files (nested `.zip` archives are also walked). + +Each `.txt` file is expected to contain newline-delimited JSON, with one JSON object per line. Each object wraps a single event under a top-level key indicating its type, for example: + +```json +{"dns_event": {"event_time": 1746979200000, "hostname": "example.com", "ip_addresses": ["93.184.216.34"], "package_name": "com.example.app"}} +{"connect_event": {"event_time": 1746979201000, "ip_address": "93.184.216.34", "port": 443, "package_name": "com.example.app"}} +{"security_event": {"event_time": 1746979202000, "tag": 210005, "data": ["..."]}} +``` + +Identical events that appear across multiple overlapping log files (e.g. daily rotations) are de-duplicated on a first-seen basis. + +## Running the analysis + +```bash +mvt-android check-intrusion-logs --output /path/to/results/ /path/to/intrusion-logs/ +``` + +A `.zip` archive can be passed directly in place of the directory: + +```bash +mvt-android check-intrusion-logs --output /path/to/results/ /path/to/intrusion-logs.zip +``` + +### Options + +| Option | Description | +| --- | --- | +| `-i, --iocs PATH` | Path to a STIX2 indicator file. May be passed multiple times. | +| `-o, --output PATH` | Directory where JSON results and the timeline CSV will be written. | +| `-l, --list-modules` | List the available intrusion-log modules and exit. | +| `-m, --module NAME` | Run a single module (e.g. `DnsEvent`) instead of all of them. | +| `-t, --timezone TZ` | IANA timezone name for the device (e.g. `Europe/Paris`). When set, event timestamps are converted to the device's local time instead of UTC. | +| `-v, --verbose` | Verbose logging. | + +## Modules + +The command runs the following modules over the parsed events: + +- **`DnsEvent`** — DNS resolution events. Hostnames and resolved IP addresses are checked against domain indicators, and the requesting `package_name` is checked against app-identifier indicators. +- **`ConnectEvent`** — Outbound network connection events. Destination IPs (with localhost addresses skipped) are checked against domain indicators, and `package_name` is checked against app-identifier indicators. +- **`SecurityEvent`** — Security log events keyed by Android `SecurityLog` tag IDs (e.g. `app_process_start`, `adb_shell_cmd`, `keyguard_dismissed`, `os_startup`, `cert_*` events). These are surfaced in the timeline to help reconstruct device activity around suspected events. + +All three modules share a single pre-parsing pass over the input, so adding more modules in the future does not multiply I/O cost. Additional modules will be added in the future to support new event types which are generated by the Intrusion Logging feature. + +## Interpreting results + +A successful IOC match raises a `CRITICAL` alert that includes the matched indicator, the offending event, and the event timestamp. Alerts are summarised at the end of the run and persisted alongside the per-module JSON results. + +When `--timezone` is provided, timestamps in the timeline and JSON output reflect the device's local wall-clock time. Otherwise timestamps are in UTC, consistent with the rest of MVT. + +## Limitations + +- This page assumes the intrusion logs have already been collected from the device. The recommended collection path is via AndroidQF (see above); intrusion logging itself must have been enabled on the device beforehand by opting into Android's Advanced Protection mode and also enabling the optional Intrusion Logging feature (see the [Amnesty blog post](https://securitylab.amnesty.org/latest/2026/05/android-intrusion-logging-as-a-new-source-of-data-for-consensual-forensic-analysis/) for details). +- As with all IOC-based analysis, public indicators alone are not sufficient to conclude that a device is uncompromised. See the [Indicators of Compromise](../iocs.md) page for context. diff --git a/docs/android/methodology.md b/docs/android/methodology.md index 7d7a019..fc1a6c7 100644 --- a/docs/android/methodology.md +++ b/docs/android/methodology.md @@ -38,9 +38,17 @@ By separating artifact collection from forensic analysis, this approach ensures For more information, refer to the [AndroidQF project documentation](https://github.com/mvt-project/androidqf). +## Android Intrusion Logs + +On devices where the user has opted into Android's [**Advanced Protection Mode**](https://support.google.com/android/answer/16339980) and turned on the optional Intrusion Logging featrue, Android can create and archive structured *Intrusion Logs* in an encrypted format. These logs record DNS queries, outbound network connections, process starts, ADB activity and other security-relevant events, and are a high-fidelity complement to the rest of an AndroidQF acquisition. The logs are generated on-device and encrypted before being stored in the Google account associated with the device. The encryption key is protected by the user device PIN. The intrusion log data is not accessible to Google. + +AndroidQF will prompt the user to download, decrypt and collect device intrusion logs as part of an acquisition. When they are present, `mvt-android check-androidqf` will automatically run the intrusion-log checks alongside the other AndroidQF modules — no extra command is required. This is the recommended workflow for Android forensic analysis with MVT. + +For cases where intrusion logs were collected outside of an AndroidQF acquisition, the standalone `mvt-android check-intrusion-logs` command can analyse them directly. See [Check Android Intrusion Logs](intrusion_logs.md) for details, and the [feature announcment from Amnesty International's Security Lab](https://securitylab.amnesty.org/latest/2026/05/android-intrusion-logging-as-a-new-source-of-data-for-consensual-forensic-analysis/) for background on the data source. + ## Android Debug Bridge analysis removed -The ability to analyze Android devices directly over ADB has been removed from MVT. Use AndroidQF for device acquisition and `mvt-android check-androidqf` for analysis. +The ability to analyze Android devices directly over ADB has been removed from MVT. Direct extraction of data from ADB was error-prone and frequently resulted in inconsistent data collection between ADB and AndroidQF acquisitions. Use AndroidQF for device acquisition and `mvt-android check-androidqf` for analysis. ## Check an Android Backup (SMS messages) diff --git a/mkdocs.yml b/mkdocs.yml index ab4b34b..d34c3fa 100644 --- a/mkdocs.yml +++ b/mkdocs.yml @@ -43,6 +43,7 @@ nav: - MVT for Android: - Android Forensic Methodology: "android/methodology.md" - Check an Android Backup (SMS messages): "android/backup.md" + - Check Android Intrusion Logs: "android/intrusion_logs.md" - Indicators of Compromise: "iocs.md" - Development: "development.md" - License: "license.md" diff --git a/pyproject.toml b/pyproject.toml index 52205f0..11e75de 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -35,7 +35,7 @@ dependencies = [ "pydantic-settings==2.13.1", "NSKeyedUnArchiver==1.5.2", "python-dateutil==2.9.0.post0", - "tzdata==2026.1", + "tzdata==2026.2", ] requires-python = ">= 3.10" diff --git a/src/mvt/android/artifacts/dumpsys_accessibility.py b/src/mvt/android/artifacts/dumpsys_accessibility.py index 2a17df6..3d8fa6c 100644 --- a/src/mvt/android/artifacts/dumpsys_accessibility.py +++ b/src/mvt/android/artifacts/dumpsys_accessibility.py @@ -10,16 +10,20 @@ from .artifact import AndroidArtifact class DumpsysAccessibilityArtifact(AndroidArtifact): def check_indicators(self) -> None: - if not self.indicators: - return - for result in self.results: - ioc_match = self.indicators.check_app_id(result["package_name"]) - if ioc_match: - self.alertstore.critical( - ioc_match.message, "", result, matched_indicator=ioc_match.ioc + if self.indicators: + ioc_match = self.indicators.check_app_id(result["package_name"]) + if ioc_match: + self.alertstore.critical( + ioc_match.message, "", result, matched_indicator=ioc_match.ioc + ) + continue + + self.alertstore.medium( + f'Found accessibility service: "{result["service"]}"', + "", + result, ) - continue def parse(self, content: str) -> None: """ diff --git a/src/mvt/android/cli.py b/src/mvt/android/cli.py index 1244506..cdb3b3e 100644 --- a/src/mvt/android/cli.py +++ b/src/mvt/android/cli.py @@ -16,6 +16,7 @@ from mvt.common.help import ( HELP_MSG_CHECK_ANDROIDQF, HELP_MSG_CHECK_BUGREPORT, HELP_MSG_CHECK_IOCS, + HELP_MSG_CHECK_INTRUSION_LOGS, HELP_MSG_DISABLE_INDICATOR_UPDATE_CHECK, HELP_MSG_DISABLE_UPDATE_CHECK, HELP_MSG_HASHES, @@ -35,6 +36,8 @@ from mvt.common.utils import init_logging, set_verbose_logging from .cmd_check_androidqf import CmdAndroidCheckAndroidQF from .cmd_check_backup import CmdAndroidCheckBackup from .cmd_check_bugreport import CmdAndroidCheckBugreport +from .cmd_check_intrusion_logs import CmdAndroidCheckIntrusionLogs +from .modules.intrusion_logs import INTRUSION_LOGS_MODULES from .modules.androidqf import ANDROIDQF_MODULES from .modules.backup import BACKUP_MODULES from .modules.backup.helpers import cli_load_android_backup_password @@ -266,6 +269,75 @@ def check_androidqf( cmd.show_support_message() +# ============================================================================== +# Command: check-intrusion-logs +# ============================================================================== +@cli.command( + "check-intrusion-logs", + context_settings=CONTEXT_SETTINGS, + help=HELP_MSG_CHECK_INTRUSION_LOGS, +) +@click.option( + "--iocs", + "-i", + type=click.Path(exists=True), + multiple=True, + default=[], + help=HELP_MSG_IOC, +) +@click.option("--output", "-o", type=click.Path(exists=False), help=HELP_MSG_OUTPUT) +@click.option("--list-modules", "-l", is_flag=True, help=HELP_MSG_LIST_MODULES) +@click.option("--module", "-m", help=HELP_MSG_MODULE) +@click.option( + "--timezone", + "-t", + default=None, + help=( + "IANA timezone name for the device, for example 'Europe/Paris'. " + "When provided, event timestamps are expressed in the device's local " + "time instead of UTC." + ), +) +@click.option("--verbose", "-v", is_flag=True, help=HELP_MSG_VERBOSE) +@click.argument("LOGS_PATH", type=click.Path(exists=True)) +@click.pass_context +def check_intrusion_logs( + ctx, + iocs, + output, + list_modules, + module, + timezone, + verbose, + logs_path, +): + set_verbose_logging(verbose) + + module_options = {} + if timezone: + module_options["device_timezone"] = timezone + + cmd = CmdAndroidCheckIntrusionLogs( + target_path=logs_path, + results_path=output, + ioc_files=iocs, + module_name=module, + module_options=module_options if module_options else None, + disable_version_check=_get_disable_flags(ctx)[0], + disable_indicator_check=_get_disable_flags(ctx)[1], + ) + + if list_modules: + cmd.list_modules() + return + + log.info("Checking intrusion logs at path: %s", logs_path) + + cmd.run() + cmd.show_alerts_brief() + cmd.show_support_message() + + # ============================================================================== # Command: check-iocs # ============================================================================== @@ -290,7 +362,9 @@ def check_iocs(ctx, iocs, list_modules, module, folder): disable_version_check=_get_disable_flags(ctx)[0], disable_indicator_check=_get_disable_flags(ctx)[1], ) - cmd.modules = BACKUP_MODULES + BUGREPORT_MODULES + ANDROIDQF_MODULES + cmd.modules = ( + BACKUP_MODULES + BUGREPORT_MODULES + ANDROIDQF_MODULES + INTRUSION_LOGS_MODULES + ) if list_modules: cmd.list_modules() diff --git a/src/mvt/android/cmd_check_androidqf.py b/src/mvt/android/cmd_check_androidqf.py index ff96a24..54dbc3d 100644 --- a/src/mvt/android/cmd_check_androidqf.py +++ b/src/mvt/android/cmd_check_androidqf.py @@ -5,10 +5,14 @@ import logging import os +import shutil +import tempfile import zipfile from pathlib import Path from typing import List, Optional +from mvt.android.artifacts.getprop import GetProp +from mvt.android.cmd_check_intrusion_logs import CmdAndroidCheckIntrusionLogs from mvt.android.cmd_check_backup import CmdAndroidCheckBackup from mvt.android.cmd_check_bugreport import CmdAndroidCheckBugreport from mvt.common.command import Command @@ -139,6 +143,55 @@ class CmdAndroidCheckAndroidQF(Command): raise NoAndroidQFBackup + def _read_device_timezone(self) -> Optional[str]: + getprop_files = [ + f for f in self.__files if f.replace("\\", "/").endswith("getprop.txt") + ] + if not getprop_files: + self.log.warning( + "Could not find getprop.txt; intrusion log timestamps will use UTC." + ) + return None + + try: + content = self._get_file_content(getprop_files[0]).decode( + "utf-8", errors="ignore" + ) + except Exception as exc: + self.log.warning("Could not read getprop.txt: %s", exc) + return None + + props = GetProp() + props.parse(content) + timezone = props.get_device_timezone() + if timezone: + self.log.info( + "Device timezone identified from getprop.txt: %s", + timezone, + ) + else: + self.log.warning( + "persist.sys.timezone not found in getprop.txt; " + "intrusion log timestamps will use UTC." + ) + + return timezone + + def _get_file_content(self, file_path: str) -> bytes: + if self.__format == "zip" and self.__zip: + handle = self.__zip.open(file_path) + try: + return handle.read() + finally: + handle.close() + + if self.__format == "dir" and self.target_path: + parent_path = Path(self.target_path).absolute().parent.as_posix() + with open(os.path.join(parent_path, file_path), "rb") as handle: + return handle.read() + + raise FileNotFoundError(file_path) + def run_bugreport_cmd(self) -> bool: bugreport = None try: @@ -194,9 +247,85 @@ class CmdAndroidCheckAndroidQF(Command): self.alertstore.extend(cmd.alertstore.alerts) return True + def run_intrusion_logs_cmd(self) -> bool: + intrusion_log_files = [ + f + for f in self.__files + if "/intrusion_logs/" in f.replace("\\", "/") + or f.replace("\\", "/").startswith("intrusion_logs/") + ] + + if not intrusion_log_files: + self.log.info( + "No intrusion_logs folder found in AndroidQF data, " + "skipping intrusion logs analysis." + ) + return False + + self.log.info( + "Found intrusion_logs folder in AndroidQF data, running intrusion logs analysis." + ) + + intrusion_logs_path = None + temp_dir = None + + try: + if self.__format == "dir" and self.target_path: + intrusion_logs_path = os.path.join( + os.path.abspath(self.target_path), "intrusion_logs" + ) + if not os.path.isdir(intrusion_logs_path): + self.log.warning( + "intrusion_logs directory not found at %s", + intrusion_logs_path, + ) + return False + + elif self.__format == "zip" and self.__zip: + temp_dir = tempfile.mkdtemp(prefix="mvt_intrusion_logs_") + for entry in intrusion_log_files: + normalized = entry.replace("\\", "/") + idx = normalized.find("intrusion_logs/") + relative = normalized[idx + len("intrusion_logs/") :] + if not relative or relative.endswith("/"): + continue + + target = os.path.join(temp_dir, relative) + os.makedirs(os.path.dirname(target), exist_ok=True) + with self.__zip.open(entry) as src, open(target, "wb") as dst: + dst.write(src.read()) + + intrusion_logs_path = temp_dir + else: + return False + + adv_module_options = dict(self.module_options or {}) + if device_timezone := self._read_device_timezone(): + adv_module_options["device_timezone"] = device_timezone + + cmd = CmdAndroidCheckIntrusionLogs( + target_path=intrusion_logs_path, + results_path=self.results_path, + ioc_files=self.ioc_files, + iocs=self.iocs, + module_options=adv_module_options, + hashes=self.hashes, + sub_command=True, + ) + cmd.run() + + self.timeline.extend(cmd.timeline) + self.alertstore.extend(cmd.alertstore.alerts) + return True + + finally: + if temp_dir: + shutil.rmtree(temp_dir, ignore_errors=True) + def finish(self) -> None: """ - Run the bugreport and backup modules if the respective files are found in the AndroidQF data. + Run nested modules if their respective files are found in AndroidQF data. """ self.run_bugreport_cmd() self.run_backup_cmd() + self.run_intrusion_logs_cmd() diff --git a/src/mvt/android/cmd_check_intrusion_logs.py b/src/mvt/android/cmd_check_intrusion_logs.py new file mode 100644 index 0000000..8541f9a --- /dev/null +++ b/src/mvt/android/cmd_check_intrusion_logs.py @@ -0,0 +1,113 @@ +# Mobile Verification Toolkit (MVT) +# Copyright (c) 2021-2023 The MVT Authors. +# Use of this software is governed by the MVT License 1.1 that can be found at +# https://license.mvt.re/1.1/ + +import logging +import os +from typing import Optional + +from mvt.common.command import Command +from mvt.common.indicators import Indicators + +from .modules.intrusion_logs import ( + INTRUSION_LOGS_MODULES, + KNOWN_INTRUSION_LOG_EVENT_TYPES, +) +from .modules.intrusion_logs.base import IntrusionLogsModule + +log = logging.getLogger(__name__) + + +class CmdAndroidCheckIntrusionLogs(Command): + """Command to check Android Intrusion Logging files.""" + + def __init__( + self, + target_path: Optional[str] = None, + results_path: Optional[str] = None, + ioc_files: Optional[list] = None, + iocs: Optional[Indicators] = None, + module_name: Optional[str] = None, + serial: Optional[str] = None, + module_options: Optional[dict] = None, + hashes: Optional[bool] = False, + sub_command: Optional[bool] = False, + disable_version_check: bool = False, + disable_indicator_check: bool = False, + ) -> None: + super().__init__( + target_path=target_path, + results_path=results_path, + ioc_files=ioc_files, + iocs=iocs, + module_name=module_name, + serial=serial, + module_options=module_options, + hashes=hashes, + sub_command=sub_command, + log=log, + disable_version_check=disable_version_check, + disable_indicator_check=disable_indicator_check, + ) + + self.name = "check-intrusion-logs" + self.modules = INTRUSION_LOGS_MODULES + self._all_events: dict[str, list[dict]] = {} + + def init(self) -> None: + if not self.target_path: + raise ValueError("No target path specified") + + if not os.path.isdir(self.target_path) and not ( + os.path.isfile(self.target_path) + and self.target_path.lower().endswith(".zip") + ): + raise ValueError( + f"Target path must be a directory or a .zip file: {self.target_path}" + ) + + self.log.info("Checking intrusion logs at path: %s", self.target_path) + self._all_events = self._pre_load_events() + + def module_init(self, module: IntrusionLogsModule) -> None: # type: ignore[override] + module.il_events_by_type = self._all_events + + def finish(self) -> None: + return + + def _pre_load_events(self) -> dict[str, list[dict]]: + """Load and parse all advanced-log files once for reuse by all modules.""" + self.log.info("Pre-loading intrusion log files from: %s", self.target_path) + + loader = IntrusionLogsModule( + target_path=self.target_path, + log=self.log, + ) + + try: + all_events = loader.load_all_events(self.target_path) + except Exception as exc: + self.log.error("Failed to pre-load events: %s", exc) + return {} + + total_events = sum(len(events) for events in all_events.values()) + self.log.info( + "Pre-loaded %d events across %d type(s); modules will reuse this data", + total_events, + len(all_events), + ) + + unknown_event_types = sorted( + event_type + for event_type in all_events + if event_type not in KNOWN_INTRUSION_LOG_EVENT_TYPES + ) + if unknown_event_types: + self.log.warning( + "Found unknown intrusion logging event type(s): %s. " + "Please open an issue on GitHub so MVT can add support for them.", + ", ".join(unknown_event_types), + ) + + return all_events diff --git a/src/mvt/android/modules/bugreport/base.py b/src/mvt/android/modules/bugreport/base.py index e73b119..156e01c 100644 --- a/src/mvt/android/modules/bugreport/base.py +++ b/src/mvt/android/modules/bugreport/base.py @@ -6,6 +6,7 @@ import datetime import fnmatch import logging import os +from pathlib import Path from typing import List, Optional from zipfile import ZipFile @@ -70,7 +71,10 @@ class BugReportModule(MVTModule): else: if not self.extract_path: raise ValueError("extract_path is not set") - handle = open(os.path.join(self.extract_path, file_path), "rb") + joined = os.path.join(self.extract_path, file_path) + if not Path(joined).resolve().is_relative_to(Path(self.extract_path).resolve()): + raise ValueError("unsafe file_path") + handle = open(joined, "rb") data = handle.read() handle.close() diff --git a/src/mvt/android/modules/intrusion_logs/__init__.py b/src/mvt/android/modules/intrusion_logs/__init__.py new file mode 100644 index 0000000..f8be973 --- /dev/null +++ b/src/mvt/android/modules/intrusion_logs/__init__.py @@ -0,0 +1,20 @@ +# Mobile Verification Toolkit (MVT) +# Copyright (c) 2021-2023 The MVT Authors. +# Use of this software is governed by the MVT License 1.1 that can be found at +# https://license.mvt.re/1.1/ + +from .connect_event import ConnectEvent +from .dns_event import DnsEvent +from .security_event import SecurityEvent + +INTRUSION_LOGS_MODULES = [ + DnsEvent, + ConnectEvent, + SecurityEvent, +] + +KNOWN_INTRUSION_LOG_EVENT_TYPES = { + "connect_event", + "dns_event", + "security_event", +} diff --git a/src/mvt/android/modules/intrusion_logs/base.py b/src/mvt/android/modules/intrusion_logs/base.py new file mode 100644 index 0000000..4aac618 --- /dev/null +++ b/src/mvt/android/modules/intrusion_logs/base.py @@ -0,0 +1,395 @@ +# Mobile Verification Toolkit (MVT) +# Copyright (c) 2021-2023 The MVT Authors. +# Use of this software is governed by the MVT License 1.1 that can be found at +# https://license.mvt.re/1.1/ + +import datetime +import io +import json +import logging +import zipfile +from pathlib import Path +from typing import Optional, Union + +try: + import zoneinfo +except ImportError: + from backports import zoneinfo # type: ignore[no-redef] + +from mvt.common.module import MVTModule +from mvt.common.utils import convert_datetime_to_iso, convert_unix_to_iso + + +class IntrusionLogsModule(MVTModule): + """Base class for modules analyzing intrusion logs (newline-delimited JSON). + + Performance note + ---------------- + Log files can be large and are shared by every module in this package. + To avoid re-reading and re-parsing the same files N times (once per + module), the command layer should call :meth:`load_all_events` exactly + once and then assign the returned dict to the ``il_events_by_type`` + attribute of every module instance **before** calling ``run_module``. + + When ``il_events_by_type`` is populated: + * :meth:`collect_txt` becomes a no-op (no disk I/O). + * :meth:`parse_collected_txt` iterates the in-memory list for the + requested event type instead of re-parsing raw text. + + Modules that are used standalone (e.g. in tests) still work as before + because ``il_events_by_type`` defaults to ``None``, which preserves the + original file-loading code path. + """ + + def __init__( + self, + file_path: Optional[str] = None, + target_path: Optional[str] = None, + results_path: Optional[str] = None, + module_options: Optional[dict] = None, + log: logging.Logger = logging.getLogger(__name__), + results: Optional[list] = None, + ) -> None: + super().__init__( + file_path=file_path, + target_path=target_path, + results_path=results_path, + module_options=module_options, + log=log, + results=results, + ) + # Raw file content collected by collect_txt (fallback path only). + self.il_files: list[tuple[str, str]] = [] + + # Pre-parsed events injected by the command layer. + # Keys are event-type strings (e.g. "dns_event"), values are lists of + # raw event-data dicts exactly as they appear in the JSON lines. + # When this is not None, collect_txt and parse_collected_txt use it + # instead of touching the file system. + self.il_events_by_type: Optional[dict[str, list[dict]]] = None + + # ------------------------------------------------------------------ + # Serialization helper + # ------------------------------------------------------------------ + + def serialize(self, record: dict) -> Union[dict, list]: + """Serialize a record for timeline output.""" + return { + "timestamp": record.get("timestamp", record.get("isodate")), + "module": self.__class__.__name__, + "event": record.get("event_type", ""), + "data": str(record), + } + + # ------------------------------------------------------------------ + # File collection + # ------------------------------------------------------------------ + + def collect_txt(self, source) -> None: + """Collect text log files from *source* into ``self.il_files``. + + Entry points: + * directory → walk recursively + * zip file → walk zip entries + * anything else → silently skip + + If ``self.il_events_by_type`` has already been populated (i.e. the + command layer pre-loaded the events), this method returns immediately + without any disk I/O. + """ + if self.il_events_by_type is not None: + self.log.debug( + "Pre-loaded events available — skipping file collection for %s", + self.__class__.__name__, + ) + return + + path = Path(source) + + if path.is_dir(): + self._walk_directory(path) + return + + if path.is_file() and path.suffix.lower() == ".zip": + try: + with zipfile.ZipFile(path) as z: + self._walk_zip(z) + except zipfile.BadZipFile: + self.log.debug("Skipping invalid zip: %s", path) + return + + self.log.debug("Skipping unsupported source: %s", source) + + def _walk_directory(self, root: Path, prefix: str = "") -> None: + for item in root.iterdir(): + if item.is_dir(): + self._walk_directory(item, prefix=f"{prefix}{item.name}/") + continue + + if item.suffix.lower() == ".txt": + self.il_files.append( + (f"{prefix}{item.name}", item.read_text(errors="ignore")) + ) + + elif item.suffix.lower() == ".zip": + try: + with zipfile.ZipFile(item) as z: + self._walk_zip(z, prefix=f"{prefix}{item.name}::") + except zipfile.BadZipFile: + self.log.warning("Skipping invalid zip: %s", item) + + def _walk_zip(self, zf: zipfile.ZipFile, prefix: str = "") -> None: + for info in zf.infolist(): + if info.is_dir(): + continue + + name = info.filename + with zf.open(info) as f: + data = f.read() + + if name.lower().endswith(".txt"): + self.il_files.append((f"{prefix}{name}", data.decode(errors="ignore"))) + + elif name.lower().endswith(".zip"): + with zipfile.ZipFile(io.BytesIO(data)) as inner: + self._walk_zip(inner, prefix=f"{prefix}{name}::") + + # ------------------------------------------------------------------ + # Single-pass loader (used by the command layer) + # ------------------------------------------------------------------ + + def load_all_events(self, source) -> dict[str, list[dict]]: + """Read every log file under *source* **once** and parse all JSON + lines in a single pass, routing events into per-type buckets. + + Returns a ``dict`` mapping *event_type* strings to lists of raw + event-data dicts. The result is also stored in + ``self.il_events_by_type`` so that subsequent calls to + :meth:`collect_txt` and :meth:`parse_collected_txt` on *this* + instance are no-ops. + + Intended usage in the command layer:: + + loader = IntrusionLogsModule(target_path=target, log=log) + all_events = loader.load_all_events(target) + + for module_cls in INTRUSION_LOGS_MODULES: + m = module_cls(target_path=target, ...) + m.il_events_by_type = all_events # inject — no re-reading + run_module(m) + """ + # Reset so that _collect_txt actually runs (il_events_by_type is None). + self.il_events_by_type = None + self.il_files = [] + self.collect_txt(source) + + events_by_type: dict[str, list[dict]] = {} + # JSON fingerprints used to drop events that appear in more than one + # log file (overlapping daily files are the most common source of + # cross-file duplicates). + seen_fingerprints: set[str] = set() + total_lines = 0 + skipped_lines = 0 + duplicate_lines = 0 + + for file_name, text in self.il_files: + for line_num, line in enumerate(text.splitlines(), start=1): + line = line.strip() + if not line: + continue + + total_lines += 1 + try: + entry = json.loads(line) + for event_type, event_data in entry.items(): + if isinstance(event_data, dict): + fingerprint = json.dumps(event_data, sort_keys=True) + if fingerprint in seen_fingerprints: + duplicate_lines += 1 + continue + seen_fingerprints.add(fingerprint) + events_by_type.setdefault(event_type, []).append(event_data) + except json.JSONDecodeError as e: + skipped_lines += 1 + self.log.warning( + "Failed to parse JSON on line %d in %s: %s", + line_num, + file_name, + e, + ) + except Exception as e: + skipped_lines += 1 + self.log.warning( + "Error processing line %d in %s: %s", + line_num, + file_name, + e, + ) + + if duplicate_lines: + self.log.info( + "Removed %d duplicate event(s) seen across multiple log files", + duplicate_lines, + ) + + self.log.info( + "Loaded %d log files, parsed %d lines (%d skipped), found event types: %s", + len(self.il_files), + total_lines, + skipped_lines, + {k: len(v) for k, v in events_by_type.items()}, + ) + + # Cache so this instance also benefits from the fast path. + self.il_events_by_type = events_by_type + return events_by_type + + # ------------------------------------------------------------------ + # Parsing + # ------------------------------------------------------------------ + + def parse_collected_txt(self, event_type: str) -> None: + """Parse collected log text and dispatch events of *event_type*. + + Fast path + ~~~~~~~~~ + When ``self.il_events_by_type`` is populated (injected by the command + layer after a single shared :meth:`load_all_events` call), the method + iterates the already-parsed in-memory list for *event_type* — no + re-reading, no re-parsing of JSON. + + Fallback path + ~~~~~~~~~~~~~ + When ``self.il_events_by_type`` is ``None``, the method falls back to + iterating ``self.il_files`` and parsing each JSON line, which is the + original behaviour. + """ + if self.il_events_by_type is not None: + events = self.il_events_by_type.get(event_type, []) + self.log.debug( + "Using pre-loaded events: dispatching %d '%s' events", + len(events), + event_type, + ) + for event_data in events: + try: + # Work on a shallow copy so that mutations in one module + # (e.g. adding "timestamp") do not affect other modules + # that share the same dict reference. + self.process_event(dict(event_data)) + except Exception as e: + self.log.warning( + "Error processing pre-parsed '%s' event: %s", + event_type, + e, + ) + return + + # Fallback: parse raw text collected by collect_txt(). + # Use the same JSON-fingerprint approach as MVTModule._deduplicate_timeline + # to drop events that appear verbatim in more than one log file. + seen_fingerprints: set[str] = set() + duplicate_count = 0 + for file_name, text in self.il_files: + for line_num, line in enumerate(text.splitlines(), start=1): + line = line.strip() + if not line: + continue + try: + entry = json.loads(line) + if event_type in entry: + event_data = entry[event_type] + fingerprint = json.dumps(event_data, sort_keys=True) + if fingerprint in seen_fingerprints: + duplicate_count += 1 + continue + seen_fingerprints.add(fingerprint) + event_data["event_type"] = event_type + self.process_event(event_data) + except json.JSONDecodeError as e: + self.log.warning( + "Failed to parse JSON on line %d in %s: %s", + line_num, + file_name, + str(e), + ) + except Exception as e: + self.log.warning( + "Error processing line %d in %s: %s", + line_num, + file_name, + str(e), + ) + if duplicate_count: + self.log.info( + "Removed %d duplicate '%s' event(s) seen across multiple log files", + duplicate_count, + event_type, + ) + + # ------------------------------------------------------------------ + # Event processing + # ------------------------------------------------------------------ + + def process_event(self, event_data: dict) -> None: + """Process an individual event. Override this in subclasses. + + Args: + event_data: Dictionary containing the event data. + """ + self.results.append(event_data) + + # ------------------------------------------------------------------ + # Timestamp localisation + # ------------------------------------------------------------------ + + def _localize_timestamp(self, event_time_seconds: float) -> str: + """Convert a Unix timestamp (in seconds) to an ISO string. + + When the device timezone is available via ``module_options["device_timezone"]`` + (a IANA timezone name such as ``"Europe/Paris"`` read from + ``persist.sys.timezone`` in ``getprop.txt``), the UTC instant is + converted to the device's local time before formatting — mirroring the + approach used by ``AQFFiles``. + + When no timezone is configured the method falls back to UTC, which is + consistent with all other MVT modules that call ``convert_unix_to_iso``. + + Args: + event_time_seconds: Unix epoch timestamp expressed in **seconds** + (callers are responsible for dividing ms/ns values first). + + Returns: + ISO-formatted datetime string (``YYYY-mm-dd HH:MM:SS.ffffff``). + The string always represents the device-local time (or UTC when no + timezone is known); no UTC offset suffix is appended, matching the + format produced by :func:`mvt.common.utils.convert_unix_to_iso`. + """ + tz_name: Optional[str] = self.module_options.get("device_timezone") + if tz_name: + try: + device_tz = zoneinfo.ZoneInfo(tz_name) + utc_dt = datetime.datetime.fromtimestamp( + event_time_seconds, tz=datetime.timezone.utc + ) + local_dt = utc_dt.astimezone(device_tz) + # Strip tzinfo so that convert_datetime_to_iso outputs the + # local wall-clock time without a timezone suffix. This is + # the same pattern used by AQFFiles. + return convert_datetime_to_iso(local_dt.replace(tzinfo=None)) + except Exception as e: + self.log.warning( + "Could not apply device timezone '%s', falling back to UTC: %s", + tz_name, + e, + ) + + return convert_unix_to_iso(event_time_seconds) + + # ------------------------------------------------------------------ + # Abstract interface + # ------------------------------------------------------------------ + + def run(self) -> None: + """Main execution method. Must be implemented by subclasses.""" + raise NotImplementedError("Subclasses must implement the run() method") diff --git a/src/mvt/android/modules/intrusion_logs/connect_event.py b/src/mvt/android/modules/intrusion_logs/connect_event.py new file mode 100644 index 0000000..054dcb1 --- /dev/null +++ b/src/mvt/android/modules/intrusion_logs/connect_event.py @@ -0,0 +1,121 @@ +# Mobile Verification Toolkit (MVT) +# Copyright (c) 2021-2023 The MVT Authors. +# Use of this software is governed by the MVT License 1.1 that can be found at +# https://license.mvt.re/1.1/ + +import logging +from typing import Optional, Union + +from .base import IntrusionLogsModule + + +class ConnectEvent(IntrusionLogsModule): + """This module analyzes network connection events from intrusion logs.""" + + def __init__( + self, + file_path: Optional[str] = None, + target_path: Optional[str] = None, + results_path: Optional[str] = None, + module_options: Optional[dict] = None, + log: logging.Logger = logging.getLogger(__name__), + results: Optional[list] = None, + ) -> None: + super().__init__( + file_path=file_path, + target_path=target_path, + results_path=results_path, + module_options=module_options, + log=log, + results=results, + ) + + def check_indicators(self) -> None: + """Check connection events against indicators of compromise.""" + if not self.indicators: + return + + for result in self.results: + # Check IP address against indicators + ip_address = result.get("ip_address", "") + if ip_address: + # Clean IP address (remove leading slash and extract IP from format like "ip6-localhost/::1") + if "/" in ip_address: + parts = ip_address.split("/") + clean_ip = parts[-1] if len(parts) > 1 else parts[0] + else: + clean_ip = ip_address.lstrip("/") + + # Skip localhost addresses + if clean_ip and clean_ip not in ["::1", "127.0.0.1", "0.0.0.0"]: + ioc = self.indicators.check_domain(clean_ip) + if ioc: + result["matched_ip"] = clean_ip + self.alertstore.critical( + ioc.message, + result.get("timestamp") or "", + result, + matched_indicator=ioc.ioc, + ) + + # Check package name against app identifiers + package_name = result.get("package_name", "") + if package_name: + ioc = self.indicators.check_app_id(package_name) + if ioc: + self.alertstore.critical( + ioc.message, + result.get("timestamp") or "", + result, + matched_indicator=ioc.ioc, + ) + + def serialize(self, record: dict) -> Union[dict, list]: + """Serialize a connection event record for timeline output.""" + ip_address = record.get("ip_address", "") + port = record.get("port", 0) + package_name = record.get("package_name", "") + matched_ip = record.get("matched_ip", "") + + # Clean IP address for display + if "/" in ip_address: + parts = ip_address.split("/") + clean_ip = parts[-1] if len(parts) > 1 else parts[0] + else: + clean_ip = ip_address.lstrip("/") + + # Indicate when IP matched an IoC + if matched_ip: + data = f"Connection to {clean_ip}:{port} by {package_name} [Matched IP: {matched_ip}]" + else: + data = f"Connection to {clean_ip}:{port} by {package_name}" + + return { + "timestamp": record.get("timestamp"), + "module": self.__class__.__name__, + "event": "network_connection", + "data": data, + } + + def process_event(self, event_data: dict) -> None: + """Process a connection event and add it to results.""" + # Convert event_time from milliseconds to ISO format + event_time = event_data.get("event_time") + if event_time: + # Android event times are in milliseconds since epoch + event_data["timestamp"] = self._localize_timestamp(event_time / 1000.0) + else: + event_data["timestamp"] = None + + self.results.append(event_data) + + def run(self) -> None: + """Extract and analyze connection events from intrusion logs.""" + if not self.target_path: + self.log.error("No target path specified") + return + + self.collect_txt(self.target_path) + self.parse_collected_txt("connect_event") + + self.log.info("Identified %d connection events", len(self.results)) diff --git a/src/mvt/android/modules/intrusion_logs/dns_event.py b/src/mvt/android/modules/intrusion_logs/dns_event.py new file mode 100644 index 0000000..242ae7b --- /dev/null +++ b/src/mvt/android/modules/intrusion_logs/dns_event.py @@ -0,0 +1,141 @@ +# Mobile Verification Toolkit (MVT) +# Copyright (c) 2021-2023 The MVT Authors. +# Use of this software is governed by the MVT License 1.1 that can be found at +# https://license.mvt.re/1.1/ + +import logging +from typing import Optional, Union + +from .base import IntrusionLogsModule + + +class DnsEvent(IntrusionLogsModule): + """This module analyzes DNS events from intrusion logs.""" + + def __init__( + self, + file_path: Optional[str] = None, + target_path: Optional[str] = None, + results_path: Optional[str] = None, + module_options: Optional[dict] = None, + log: logging.Logger = logging.getLogger(__name__), + results: Optional[list] = None, + ) -> None: + super().__init__( + file_path=file_path, + target_path=target_path, + results_path=results_path, + module_options=module_options, + log=log, + results=results, + ) + + def check_indicators(self) -> None: + """Check DNS events against indicators of compromise.""" + if not self.indicators: + return + + for result in self.results: + # Check hostname against domain indicators + hostname = result.get("hostname", "") + if hostname: + ioc = self.indicators.check_domain(hostname) + if ioc: + self.alertstore.critical( + ioc.message, + result.get("timestamp") or "", + result, + matched_indicator=ioc.ioc, + ) + + # Check IP addresses against indicators + ip_addresses = result.get("ip_addresses", []) + matched_ips = [] + for ip_addr in ip_addresses: + # Remove leading slash if present + clean_ip = ( + ip_addr.lstrip("/") if isinstance(ip_addr, str) else str(ip_addr) + ) + if clean_ip and clean_ip != "0.0.0.0": + ioc = self.indicators.check_domain(clean_ip) + if ioc: + matched_ips.append(clean_ip) + self.alertstore.critical( + ioc.message, + result.get("timestamp") or "", + result, + matched_indicator=ioc.ioc, + ) + + # Store matched IPs for timeline display + if matched_ips: + result["matched_ips"] = matched_ips + + # Check package name against app identifiers + package_name = result.get("package_name", "") + if package_name: + ioc = self.indicators.check_app_id(package_name) + if ioc: + self.alertstore.critical( + ioc.message, + result.get("timestamp") or "", + result, + matched_indicator=ioc.ioc, + ) + + def serialize(self, record: dict) -> Union[dict, list]: + """Serialize a DNS event record for timeline output.""" + hostname = record.get("hostname", "") + package_name = record.get("package_name", "") + + # Get IP addresses for display + ip_addresses = record.get("ip_addresses", []) + matched_ips = record.get("matched_ips", []) + + # Clean up IP addresses (remove leading slashes) + clean_ips = [] + for ip_addr in ip_addresses: + clean_ip = ip_addr.lstrip("/") if isinstance(ip_addr, str) else str(ip_addr) + if clean_ip and clean_ip != "0.0.0.0": + clean_ips.append(clean_ip) + + # Build the data string with actual IPs + if matched_ips: + # Highlight matched IPs in the output + ip_display = ", ".join(matched_ips) + data = f"DNS query for {hostname} by {package_name} [Matched IPs: {ip_display}]" + elif clean_ips: + ip_display = ", ".join(clean_ips) + data = f"DNS query for {hostname} by {package_name} [IPs: {ip_display}]" + else: + data = f"DNS query for {hostname} by {package_name}" + + return { + "timestamp": record.get("timestamp"), + "module": self.__class__.__name__, + "event": "dns_query", + "data": data, + } + + def process_event(self, event_data: dict) -> None: + """Process a DNS event and add it to results.""" + # Convert event_time from milliseconds to ISO format + event_time = event_data.get("event_time") + if event_time: + # Android event times are in milliseconds since epoch + event_data["timestamp"] = self._localize_timestamp(event_time / 1000.0) + else: + event_data["timestamp"] = None + + self.results.append(event_data) + + def run(self) -> None: + """Extract and analyze DNS events from intrusion logs.""" + if not self.target_path: + self.log.error("No target path specified") + return + + self.collect_txt(self.target_path) + self.parse_collected_txt("dns_event") + + self.log.info("Identified %d DNS events", len(self.results)) diff --git a/src/mvt/android/modules/intrusion_logs/security_event.py b/src/mvt/android/modules/intrusion_logs/security_event.py new file mode 100644 index 0000000..7190db4 --- /dev/null +++ b/src/mvt/android/modules/intrusion_logs/security_event.py @@ -0,0 +1,758 @@ +# Mobile Verification Toolkit (MVT) +# Copyright (c) 2021-2023 The MVT Authors. +# Use of this software is governed by the MVT License 1.1 that can be found at +# https://license.mvt.re/1.1/ + +import logging +from typing import Optional, Union + +from .base import IntrusionLogsModule + +# Security event tags based on Android SecurityLog API +# Reference: https://developer.android.com/reference/android/app/admin/SecurityLog +SECURITY_EVENT_TAGS = { + # ADB events (API level 24) + "adb_shell_interactive": { + "tag_id": 210001, + "name": "ADB Shell Interactive", + "description": "An ADB interactive shell was opened via 'adb shell'", + }, + "adb_shell_cmd": { + "tag_id": 210002, + "name": "ADB Shell Command", + "description": "A shell command was issued over ADB via 'adb shell '", + }, + "adb_sync_recv_file": { + "tag_id": 210003, + "name": "ADB Sync Recv File", + "description": "A file was pulled from the device via adb daemon (adb pull)", + }, + "adb_sync_send_file": { + "tag_id": 210004, + "name": "ADB Sync Send File", + "description": "A file was pushed to the device via adb daemon (adb push)", + }, + # App process events (API level 24) + "app_process_start": { + "tag_id": 210005, + "name": "App Process Start", + "description": "An app process was started", + }, + # Keyguard events (API level 24) + "keyguard_dismissed": { + "tag_id": 210006, + "name": "Keyguard Dismissed", + "description": "Keyguard has been dismissed", + }, + "keyguard_dismiss_auth_attempt": { + "tag_id": 210007, + "name": "Keyguard Dismiss Auth Attempt", + "description": "Authentication attempt to dismiss keyguard", + }, + "keyguard_secured": { + "tag_id": 210008, + "name": "Keyguard Secured", + "description": "Device has been locked", + }, + # OS events (API level 28) + "os_startup": { + "tag_id": 210009, + "name": "OS Startup", + "description": "Android OS has started", + }, + "os_shutdown": { + "tag_id": 210010, + "name": "OS Shutdown", + "description": "Android OS has shutdown", + }, + # Logging events (API level 28) + "logging_started": { + "tag_id": 210011, + "name": "Logging Started", + "description": "Audit logging has started", + }, + "logging_stopped": { + "tag_id": 210012, + "name": "Logging Stopped", + "description": "Audit logging has stopped", + }, + # Media events (API level 28) + "media_mount": { + "tag_id": 210013, + "name": "Media Mount", + "description": "Removable media has been mounted", + }, + "media_unmount": { + "tag_id": 210014, + "name": "Media Unmount", + "description": "Removable media was unmounted", + }, + # Log buffer event (API level 28) + "log_buffer_size_critical": { + "tag_id": 210015, + "name": "Log Buffer Size Critical", + "description": "Audit log buffer has reached 90% capacity", + }, + # Password policy events (API level 28) + "password_expiration_set": { + "tag_id": 210016, + "name": "Password Expiration Set", + "description": "Admin set password expiration timeout", + }, + "password_complexity_set": { + "tag_id": 210017, + "name": "Password Complexity Set", + "description": "Admin set password complexity requirement", + }, + "password_history_length_set": { + "tag_id": 210018, + "name": "Password History Length Set", + "description": "Admin set password history length", + }, + "max_screen_lock_timeout_set": { + "tag_id": 210019, + "name": "Max Screen Lock Timeout Set", + "description": "Admin set maximum screen lock timeout", + }, + "max_password_attempts_set": { + "tag_id": 210020, + "name": "Max Password Attempts Set", + "description": "Admin set maximum failed password attempts before wipe", + }, + "keyguard_disabled_features_set": { + "tag_id": 210021, + "name": "Keyguard Disabled Features Set", + "description": "Admin set disabled keyguard features", + }, + # Remote lock event (API level 28) + "remote_lock": { + "tag_id": 210022, + "name": "Remote Lock", + "description": "Admin remotely locked the device or profile", + }, + # Wipe failure event (API level 28) + "wipe_failure": { + "tag_id": 210023, + "name": "Wipe Failure", + "description": "Failed to wipe device or user data", + }, + # Cryptographic key events (API level 28) + "key_generated": { + "tag_id": 210024, + "name": "Key Generated", + "description": "Cryptographic key was generated", + }, + "key_import": { + "tag_id": 210025, + "name": "Key Import", + "description": "Cryptographic key was imported", + }, + "key_destruction": { + "tag_id": 210026, + "name": "Key Destruction", + "description": "Cryptographic key was destroyed", + }, + # User restriction events (API level 28) + "user_restriction_added": { + "tag_id": 210027, + "name": "User Restriction Added", + "description": "Admin added a user restriction", + }, + "user_restriction_removed": { + "tag_id": 210028, + "name": "User Restriction Removed", + "description": "Admin removed a user restriction", + }, + # Certificate events (API level 28) + "cert_authority_installed": { + "tag_id": 210029, + "name": "Certificate Authority Installed", + "description": "Root certificate installed to trusted storage", + }, + "cert_authority_removed": { + "tag_id": 210030, + "name": "Certificate Authority Removed", + "description": "Root certificate removed from trusted storage", + }, + "crypto_self_test_completed": { + "tag_id": 210031, + "name": "Crypto Self Test Completed", + "description": "Cryptographic functionality self test completed", + }, + "key_integrity_violation": { + "tag_id": 210032, + "name": "Key Integrity Violation", + "description": "Key integrity violation detected", + }, + "cert_validation_failure": { + "tag_id": 210033, + "name": "Certificate Validation Failure", + "description": "X.509v3 certificate validation failed", + }, + # Camera policy event (API level 30) + "camera_policy_set": { + "tag_id": 210034, + "name": "Camera Policy Set", + "description": "Admin set policy to disable camera", + }, + # Password complexity events (API level 31/33) + "password_complexity_required": { + "tag_id": 210035, + "name": "Password Complexity Required", + "description": "Admin set password complexity requirement using predefined levels", + }, + "password_changed": { + "tag_id": 210036, + "name": "Password Changed", + "description": "User changed their lockscreen password", + }, + # WiFi events (API level 33) + "wifi_connection": { + "tag_id": 210037, + "name": "WiFi Connection", + "description": "Device attempted to connect to a managed WiFi network", + }, + "wifi_disconnection": { + "tag_id": 210038, + "name": "WiFi Disconnection", + "description": "Device disconnected from a managed WiFi network", + }, + # Bluetooth events (API level 33) + "bluetooth_connection": { + "tag_id": 210039, + "name": "Bluetooth Connection", + "description": "Device attempted to connect to a Bluetooth device", + }, + "bluetooth_disconnection": { + "tag_id": 210040, + "name": "Bluetooth Disconnection", + "description": "Device disconnected from a Bluetooth device", + }, + # Package events (API level 34) + "package_installed": { + "tag_id": 210041, + "name": "Package Installed", + "description": "Application package was installed", + }, + "package_updated": { + "tag_id": 210042, + "name": "Package Updated", + "description": "Application package was updated", + }, + "package_uninstalled": { + "tag_id": 210043, + "name": "Package Uninstalled", + "description": "Application package was uninstalled", + }, + # Backup service event (API level 35) + "backup_service_toggled": { + "tag_id": 210044, + "name": "Backup Service Toggled", + "description": "Admin enabled or disabled backup service", + }, + # NFC events (API level 36) + "nfc_enabled": { + "tag_id": 210045, + "name": "NFC Enabled", + "description": "NFC service is enabled", + }, + "nfc_disabled": { + "tag_id": 210046, + "name": "NFC Disabled", + "description": "NFC service is disabled", + }, +} + +SECURITY_EVENT_METADATA_KEYS = { + "event_time", + "event_type", + "timestamp", +} + + +class SecurityEvent(IntrusionLogsModule): + """This module analyzes security events from intrusion logs.""" + + def __init__( + self, + file_path: Optional[str] = None, + target_path: Optional[str] = None, + results_path: Optional[str] = None, + module_options: Optional[dict] = None, + log: logging.Logger = logging.getLogger(__name__), + results: Optional[list] = None, + ) -> None: + super().__init__( + file_path=file_path, + target_path=target_path, + results_path=results_path, + module_options=module_options, + log=log, + results=results, + ) + self.event_type_counts: dict[str, int] = {} + + def _get_event_tag(self, event_data: dict) -> Optional[str]: + """Return the security-event tag key, including tags unknown to MVT.""" + for key in event_data: + if key not in SECURITY_EVENT_METADATA_KEYS: + return key + + return None + + def check_indicators(self) -> None: + """Check security events against indicators of compromise.""" + if not self.indicators: + return + + for result in self.results: + # Check app process start events for suspicious package names + if "app_process_start" in result: + process_info = result["app_process_start"] + process_name = process_info.get("process", "") + if process_name: + # Check the full process name + ioc = self.indicators.check_app_id(process_name) + if ioc: + self.alertstore.critical( + ioc.message, + result.get("timestamp") or "", + result, + matched_indicator=ioc.ioc, + ) + + # Also check process components after the first colon + # Example: "com.google.android.webview:sandboxed_process0:org.chromium.content.app.SandboxedProcessService0:0" + # We want to check "sandboxed_process0" and subsequent components + if ":" in process_name: + components = process_name.split(":") + for component in components[ + 1: + ]: # Skip the first component (main package name) + if component: + ioc = self.indicators.check_app_id(component) + if ioc: + self.alertstore.critical( + ioc.message, + result.get("timestamp") or "", + result, + matched_indicator=ioc.ioc, + ) + break + + # Check package operations for suspicious packages + for pkg_event in [ + "package_installed", + "package_updated", + "package_uninstalled", + ]: + if pkg_event in result: + pkg_info = result[pkg_event] + pkg_name = pkg_info.get("package_name", "") + if pkg_name: + ioc = self.indicators.check_app_id(pkg_name) + if ioc: + self.alertstore.critical( + ioc.message, + result.get("timestamp") or "", + result, + matched_indicator=ioc.ioc, + ) + + # Check ADB shell commands for suspicious patterns + if "adb_shell_cmd" in result: + cmd_info = result["adb_shell_cmd"] + command = cmd_info.get("command", "") + if command: + # Check if command contains any suspicious app IDs + ioc = self.indicators.check_app_id(command) + if ioc: + self.alertstore.critical( + ioc.message, + result.get("timestamp") or "", + result, + matched_indicator=ioc.ioc, + ) + + # Check ADB file sync events for suspicious paths + for adb_event in ["adb_sync_recv_file", "adb_sync_send_file"]: + if adb_event in result: + file_info = result[adb_event] + file_path = file_info.get("path", "") + if file_path: + ioc = self.indicators.check_file_path(file_path) + if ioc: + self.alertstore.critical( + ioc.message, + result.get("timestamp") or "", + result, + matched_indicator=ioc.ioc, + ) + + # Flag failed cryptographic operations as potentially suspicious + if "key_generated" in result: + if not result["key_generated"].get("success", True): + self.log.warning( + "Failed key generation detected for key_id: %s", + result["key_generated"].get("key_id", "unknown"), + ) + + # Flag certificate validation failures + if "cert_validation_failure" in result: + self.log.warning( + "Certificate validation failure detected: %s", + result.get("cert_validation_failure"), + ) + + # Flag key integrity violations + if "key_integrity_violation" in result: + self.alertstore.medium( + f"Key integrity violation detected: {result.get('key_integrity_violation')}", + result.get("timestamp") or "", + result, + ) + + # Flag certificate authority installations (potential MITM) + if "cert_authority_installed" in result: + cert_info = result["cert_authority_installed"] + self.log.warning( + "Certificate authority installed: %s (success: %s)", + cert_info.get("subject", "unknown"), + cert_info.get("success", "unknown"), + ) + + # Flag wipe failures + if "wipe_failure" in result: + self.alertstore.medium( + "Device wipe failure detected", + result.get("timestamp") or "", + result, + ) + + # Flag crypto self test failures + if "crypto_self_test_completed" in result: + test_result = result["crypto_self_test_completed"] + if isinstance(test_result, dict): + success = test_result.get("success", True) + else: + success = test_result == 1 + if not success: + self.alertstore.medium( + "Cryptographic self test failed", + result.get("timestamp") or "", + result, + ) + + def serialize(self, record: dict) -> Union[dict, list]: + """Serialize a security event record for timeline output.""" + # Determine the event sub-type + event_subtype = None + event_data_str = "" + + event_subtype = self._get_event_tag(record) + if event_subtype: + event_info = record[event_subtype] + + if event_subtype in SECURITY_EVENT_TAGS: + # ADB events + if event_subtype == "adb_shell_interactive": + event_data_str = "ADB interactive shell opened" + elif event_subtype == "adb_shell_cmd": + command = event_info.get("command", "") + event_data_str = f"ADB shell command: {command}" + elif event_subtype == "adb_sync_recv_file": + path = event_info.get("path", "") + event_data_str = f"File pulled via ADB: {path}" + elif event_subtype == "adb_sync_send_file": + path = event_info.get("path", "") + event_data_str = f"File pushed via ADB: {path}" + + # App process events + elif event_subtype == "app_process_start": + process_name = event_info.get("process", "") + uid = event_info.get("uid", "") + pid = event_info.get("pid", "") + event_data_str = ( + f"Process started: {process_name} (UID: {uid}, PID: {pid})" + ) + + # Keyguard events + elif event_subtype == "keyguard_dismiss_auth_attempt": + success = event_info.get("success", False) + method = event_info.get("method_strength", 0) + event_data_str = f"Auth attempt: {'Success' if success else 'Failed'} (method strength: {method})" + elif event_subtype == "keyguard_dismissed": + event_data_str = "Keyguard dismissed" + elif event_subtype == "keyguard_secured": + event_data_str = "Device locked" + elif event_subtype == "keyguard_disabled_features_set": + admin = event_info.get("admin_package", "") + features = event_info.get("disabled_features", "") + event_data_str = ( + f"Keyguard features disabled by {admin}: {features}" + ) + + # Key events + elif event_subtype == "key_generated": + success = event_info.get("success", False) + key_id = event_info.get("key_id", "unknown") + uid = event_info.get("uid", "") + event_data_str = f"Key {'generated' if success else 'generation failed'}: {key_id} (UID: {uid})" + elif event_subtype == "key_destruction": + success = event_info.get("success", False) + key_id = event_info.get("key_id", "unknown") + uid = event_info.get("uid", "") + event_data_str = f"Key {'destroyed' if success else 'destruction failed'}: {key_id} (UID: {uid})" + elif event_subtype == "key_import": + success = event_info.get("success", False) + key_id = event_info.get("key_id", "unknown") + event_data_str = ( + f"Key {'imported' if success else 'import failed'}: {key_id}" + ) + elif event_subtype == "key_integrity_violation": + key_id = event_info.get("key_id", "unknown") + event_data_str = f"Key integrity violation: {key_id}" + + # Certificate events + elif event_subtype == "cert_authority_installed": + success = event_info.get("success", False) + subject = event_info.get("subject", "unknown") + event_data_str = f"Cert {'installed' if success else 'install failed'}: {subject}" + elif event_subtype == "cert_authority_removed": + success = event_info.get("success", False) + subject = event_info.get("subject", "unknown") + event_data_str = ( + f"Cert {'removed' if success else 'removal failed'}: {subject}" + ) + elif event_subtype == "cert_validation_failure": + reason = ( + event_info if isinstance(event_info, str) else str(event_info) + ) + event_data_str = f"Certificate validation failure: {reason}" + elif event_subtype == "crypto_self_test_completed": + if isinstance(event_info, dict): + success = event_info.get("success", False) + else: + success = event_info == 1 + event_data_str = ( + f"Crypto self test: {'passed' if success else 'FAILED'}" + ) + + # Package events + elif event_subtype in [ + "package_installed", + "package_updated", + "package_uninstalled", + ]: + pkg_name = event_info.get("package_name", "") + version = event_info.get("version_code", "") + user_id = event_info.get("user_id", "") + action = event_subtype.replace("package_", "").title() + event_data_str = ( + f"Package {action}: {pkg_name} (v{version}, user: {user_id})" + ) + + # OS events + elif event_subtype == "os_startup": + verified_boot = event_info.get("verified_boot_state", "") + dm_verity = event_info.get("dm_verity_mode", "") + event_data_str = f"OS startup (verified boot: {verified_boot}, dm-verity: {dm_verity})" + elif event_subtype == "os_shutdown": + event_data_str = "OS shutdown" + + # Logging events + elif event_subtype == "logging_started": + event_data_str = "Audit logging started" + elif event_subtype == "logging_stopped": + event_data_str = "Audit logging stopped" + elif event_subtype == "log_buffer_size_critical": + event_data_str = "Log buffer at 90% capacity" + + # Media events + elif event_subtype == "media_mount": + mount_point = event_info.get("mount_point", "") + label = event_info.get("volume_label", "") + event_data_str = f"Media mounted: {mount_point} ({label})" + elif event_subtype == "media_unmount": + mount_point = event_info.get("mount_point", "") + label = event_info.get("volume_label", "") + event_data_str = f"Media unmounted: {mount_point} ({label})" + + # Password policy events + elif event_subtype == "password_expiration_set": + admin = event_info.get("admin_package", "") + timeout = event_info.get("timeout_ms", "") + event_data_str = f"Password expiration set by {admin}: {timeout}ms" + elif event_subtype == "password_complexity_set": + admin = event_info.get("admin_package", "") + event_data_str = f"Password complexity set by {admin}" + elif event_subtype == "password_complexity_required": + admin = event_info.get("admin_package", "") + complexity = event_info.get("complexity", "") + event_data_str = ( + f"Password complexity required by {admin}: {complexity}" + ) + elif event_subtype == "password_history_length_set": + admin = event_info.get("admin_package", "") + length = event_info.get("length", "") + event_data_str = f"Password history length set by {admin}: {length}" + elif event_subtype == "password_changed": + complexity = event_info.get("complexity", "") + user_id = event_info.get("user_id", "") + event_data_str = ( + f"Password changed (complexity: {complexity}, user: {user_id})" + ) + elif event_subtype == "max_screen_lock_timeout_set": + admin = event_info.get("admin_package", "") + timeout = event_info.get("timeout_ms", "") + event_data_str = ( + f"Max screen lock timeout set by {admin}: {timeout}ms" + ) + elif event_subtype == "max_password_attempts_set": + admin = event_info.get("admin_package", "") + attempts = event_info.get("max_attempts", "") + event_data_str = f"Max password attempts set by {admin}: {attempts}" + + # Remote lock and wipe events + elif event_subtype == "remote_lock": + admin = event_info.get("admin_package", "") + event_data_str = f"Device remotely locked by {admin}" + elif event_subtype == "wipe_failure": + event_data_str = "Device wipe failed" + + # User restriction events + elif event_subtype == "user_restriction_added": + admin = event_info.get("admin_package", "") + restriction = event_info.get("restriction", "") + event_data_str = f"User restriction added by {admin}: {restriction}" + elif event_subtype == "user_restriction_removed": + admin = event_info.get("admin_package", "") + restriction = event_info.get("restriction", "") + event_data_str = ( + f"User restriction removed by {admin}: {restriction}" + ) + + # WiFi events + elif event_subtype == "wifi_connection": + bssid = event_info.get("bssid", "") + event_type = event_info.get("event_type", "") + reason = event_info.get("reason", "") + event_data_str = f"WiFi connection: {event_type} (BSSID: {bssid})" + if reason: + event_data_str += f" - {reason}" + elif event_subtype == "wifi_disconnection": + bssid = event_info.get("bssid", "") + reason = event_info.get("reason", "") + event_data_str = f"WiFi disconnection (BSSID: {bssid})" + if reason: + event_data_str += f" - {reason}" + + # Bluetooth events + elif event_subtype == "bluetooth_connection": + mac = event_info.get("mac_address", "") + success = event_info.get("success", False) + reason = event_info.get("reason", "") + event_data_str = f"Bluetooth {'connected' if success else 'connection failed'}: {mac}" + if reason: + event_data_str += f" - {reason}" + elif event_subtype == "bluetooth_disconnection": + mac = event_info.get("mac_address", "") + reason = event_info.get("reason", "") + event_data_str = f"Bluetooth disconnected: {mac}" + if reason: + event_data_str += f" - {reason}" + + # Camera policy event + elif event_subtype == "camera_policy_set": + admin = event_info.get("admin_package", "") + disabled = event_info.get("disabled", False) + event_data_str = ( + f"Camera {'disabled' if disabled else 'enabled'} by {admin}" + ) + + # Backup service event + elif event_subtype == "backup_service_toggled": + admin = event_info.get("admin_package", "") + enabled = event_info.get("enabled", False) + event_data_str = f"Backup service {'enabled' if enabled else 'disabled'} by {admin}" + + # NFC events + elif event_subtype == "nfc_enabled": + event_data_str = "NFC enabled" + elif event_subtype == "nfc_disabled": + event_data_str = "NFC disabled" + + else: + event_data_str = ( + f"{SECURITY_EVENT_TAGS.get(event_subtype, {}).get('name', event_subtype)}: " + f"{event_info}" + ) + else: + event_data_str = f"{event_subtype}: {event_info}" + + if not event_subtype: + event_subtype = "unknown" + event_data_str = str(record) + + return { + "timestamp": record.get("timestamp"), + "module": self.__class__.__name__, + "event": event_subtype, + "data": event_data_str, + } + + def process_event(self, event_data: dict) -> None: + """Process a security event and add it to results.""" + # Convert event_time to ISO format + # Security events use nanoseconds since epoch + event_time = event_data.get("event_time") + if event_time: + # Convert nanoseconds to seconds + event_data["timestamp"] = self._localize_timestamp( + event_time / 1_000_000_000.0 + ) + else: + event_data["timestamp"] = None + + # Track event type statistics, including future tags unknown to MVT. + event_tag = self._get_event_tag(event_data) + if event_tag: + self.event_type_counts[event_tag] = ( + self.event_type_counts.get(event_tag, 0) + 1 + ) + + self.results.append(event_data) + + def run(self) -> None: + """Extract and analyze security events from intrusion logs.""" + if not self.target_path: + self.log.error("No target path specified") + return + + self.collect_txt(self.target_path) + self.parse_collected_txt("security_event") + + self.log.info("Identified %d security events", len(self.results)) + + # Log event type breakdown + if self.event_type_counts: + self.log.info("Security event breakdown:") + for event_type, count in sorted( + self.event_type_counts.items(), key=lambda x: x[1], reverse=True + ): + event_name = SECURITY_EVENT_TAGS.get(event_type, {}).get( + "name", event_type + ) + self.log.info(" - %s: %d", event_name, count) + + unknown_event_types = sorted( + event_type + for event_type in self.event_type_counts + if event_type not in SECURITY_EVENT_TAGS + ) + if unknown_event_types: + self.log.warning( + "Found unknown intrusion logging security event type(s): %s. " + "Please open an issue on GitHub so MVT can add support for them.", + ", ".join(unknown_event_types), + ) diff --git a/src/mvt/common/alerts.py b/src/mvt/common/alerts.py index 64f1334..540325e 100644 --- a/src/mvt/common/alerts.py +++ b/src/mvt/common/alerts.py @@ -219,7 +219,7 @@ class AlertStore: return alerts def save_timeline(self, timeline_path: str) -> None: - with open(timeline_path, "a+", encoding="utf-8") as handle: + with open(timeline_path, "w", encoding="utf-8") as handle: csvoutput = csv.writer( handle, delimiter=",", diff --git a/src/mvt/common/help.py b/src/mvt/common/help.py index fcee8a2..535a059 100644 --- a/src/mvt/common/help.py +++ b/src/mvt/common/help.py @@ -47,3 +47,4 @@ HELP_MSG_CHECK_ADB_REMOVED_DESCRIPTION = ( HELP_MSG_CHECK_BUGREPORT = "Check an Android Bug Report" HELP_MSG_CHECK_ANDROID_BACKUP = "Check an Android Backup" HELP_MSG_CHECK_ANDROIDQF = "Check data collected with AndroidQF" +HELP_MSG_CHECK_INTRUSION_LOGS = "Check Android Intrusion Logging files" diff --git a/src/mvt/common/module.py b/src/mvt/common/module.py index 2d8a395..62053fe 100644 --- a/src/mvt/common/module.py +++ b/src/mvt/common/module.py @@ -260,7 +260,7 @@ def save_timeline(timeline: list, timeline_path: str, is_utc: bool = True) -> No :param timeline_path: Path to the csv file to store the timeline to """ - with open(timeline_path, "a+", encoding="utf-8") as handle: + with open(timeline_path, "w", encoding="utf-8") as handle: csvoutput = csv.writer( handle, delimiter=",", quotechar='"', quoting=csv.QUOTE_ALL, escapechar="\\" ) diff --git a/src/mvt/common/version.py b/src/mvt/common/version.py index a96d1e2..d3fc913 100644 --- a/src/mvt/common/version.py +++ b/src/mvt/common/version.py @@ -3,4 +3,4 @@ # Use of this software is governed by the MVT License 1.1 that can be found at # https://license.mvt.re/1.1/ -MVT_VERSION = "2026.4.28" +MVT_VERSION = "2026.5.12" diff --git a/src/mvt/ios/data/ios_versions.json b/src/mvt/ios/data/ios_versions.json index 1a15aed..ffe09ac 100644 --- a/src/mvt/ios/data/ios_versions.json +++ b/src/mvt/ios/data/ios_versions.json @@ -911,6 +911,10 @@ "version": "15.8.7", "build": "19H411" }, + { + "version": "15.8.8", + "build": "19H422" + }, { "build": "20A362", "version": "16.0" @@ -1028,6 +1032,10 @@ "version": "16.7.15", "build": "20H380" }, + { + "version": "16.7.16", + "build": "20H392" + }, { "version": "17.0", "build": "21A327" @@ -1204,6 +1212,10 @@ "version": "18.7.8", "build": "22H352" }, + { + "version": "18.7.9", + "build": "22H355" + }, { "version": "26", "build": "23A341" @@ -1239,5 +1251,9 @@ { "version": "26.4.2", "build": "23E261" + }, + { + "version": "26.5", + "build": "23F77" } ] \ No newline at end of file diff --git a/src/mvt/ios/decrypt.py b/src/mvt/ios/decrypt.py index 620ee7e..ceac3e2 100644 --- a/src/mvt/ios/decrypt.py +++ b/src/mvt/ios/decrypt.py @@ -11,6 +11,7 @@ import os import os.path import shutil import sqlite3 +from pathlib import Path from typing import Optional from iOSbackup import iOSbackup @@ -96,6 +97,9 @@ class DecryptBackup: # This may be a partial backup. Skip files from the manifest # which do not exist locally. source_file_path = os.path.join(self.backup_path, file_id[0:2], file_id) + if not Path(source_file_path).resolve().is_relative_to(Path(self.backup_path).resolve()): + log.warning("Skipping unsafe file_id: %r", file_id) + continue if not os.path.exists(source_file_path): log.debug( "Skipping file %s. File not found in encrypted backup directory.", @@ -104,6 +108,9 @@ class DecryptBackup: continue item_folder = os.path.join(self.dest_path, file_id[0:2]) # type: ignore[arg-type] + if not Path(os.path.join(item_folder, file_id)).resolve().is_relative_to(Path(self.dest_path).resolve()): + log.warning("Skipping unsafe file_id: %r", file_id) + continue if not os.path.exists(item_folder): os.makedirs(item_folder) diff --git a/src/mvt/ios/modules/base.py b/src/mvt/ios/modules/base.py index 5ac9071..121bad5 100644 --- a/src/mvt/ios/modules/base.py +++ b/src/mvt/ios/modules/base.py @@ -9,6 +9,7 @@ import os import shutil import sqlite3 import subprocess +from pathlib import Path from typing import Iterator, Optional, Union from mvt.common.module import ( @@ -165,6 +166,8 @@ class IOSExtraction(MVTModule): if not self.target_path: return None file_path = os.path.join(self.target_path, file_id[0:2], file_id) + if not Path(file_path).resolve().is_relative_to(Path(self.target_path).resolve()): + return None if os.path.exists(file_path): return file_path diff --git a/tests/android/test_artifact_dumpsys_accessibility.py b/tests/android/test_artifact_dumpsys_accessibility.py index 0c9d8d1..1f0b234 100644 --- a/tests/android/test_artifact_dumpsys_accessibility.py +++ b/tests/android/test_artifact_dumpsys_accessibility.py @@ -5,6 +5,7 @@ import logging from mvt.android.artifacts.dumpsys_accessibility import DumpsysAccessibilityArtifact +from mvt.common.alerts import AlertLevel from mvt.common.indicators import Indicators from ..utils import get_artifact @@ -38,6 +39,19 @@ class TestDumpsysAccessibilityArtifact: assert da.results[0]["package_name"] == "com.malware.accessibility" assert da.results[0]["service"] == "com.malware.service.malwareservice" + def test_accessibility_service_alert(self): + da = DumpsysAccessibilityArtifact() + file = get_artifact("android_data/dumpsys_accessibility_v14_or_later.txt") + with open(file) as f: + data = f.read() + da.parse(data) + + da.check_indicators() + + assert len(da.alertstore.alerts) == 1 + assert da.alertstore.alerts[0].level == AlertLevel.MEDIUM + assert da.alertstore.alerts[0].event == da.results[0] + def test_ioc_check(self, indicator_file): da = DumpsysAccessibilityArtifact() file = get_artifact("android_data/dumpsys_accessibility.txt") @@ -51,4 +65,12 @@ class TestDumpsysAccessibilityArtifact: da.indicators = ind assert len(da.alertstore.alerts) == 0 da.check_indicators() - assert len(da.alertstore.alerts) == 1 + assert len(da.alertstore.alerts) == len(da.results) + assert da.alertstore.count(AlertLevel.MEDIUM) == 3 + assert da.alertstore.count(AlertLevel.CRITICAL) == 1 + critical_alert = next( + alert + for alert in da.alertstore.alerts + if alert.level == AlertLevel.CRITICAL + ) + assert critical_alert.event["package_name"] == "com.sec.android.app.camera" diff --git a/tests/android/test_intrusion_logs.py b/tests/android/test_intrusion_logs.py new file mode 100644 index 0000000..6e8f253 --- /dev/null +++ b/tests/android/test_intrusion_logs.py @@ -0,0 +1,154 @@ +# Mobile Verification Toolkit (MVT) +# Copyright (c) 2021-2023 The MVT Authors. +# Use of this software is governed by the MVT License 1.1 that can be found at +# https://license.mvt.re/1.1/ + +import json +import logging + +from click.testing import CliRunner + +from mvt.android.cli import check_intrusion_logs +from mvt.android.cmd_check_intrusion_logs import CmdAndroidCheckIntrusionLogs +from mvt.android.modules.intrusion_logs.base import IntrusionLogsModule +from mvt.android.modules.intrusion_logs.security_event import SecurityEvent + + +def _write_ndjson(path, records): + path.write_text( + "\n".join(json.dumps(record) for record in records), + encoding="utf-8", + ) + + +def test_load_all_events_preserves_unknown_top_level_event(tmp_path): + _write_ndjson( + tmp_path / "intrusion.txt", + [ + { + "future_event": { + "event_time": 1_700_000_000_000, + "field": "value", + } + } + ], + ) + + module = IntrusionLogsModule(target_path=str(tmp_path)) + events = module.load_all_events(str(tmp_path)) + + assert events == { + "future_event": [ + { + "event_time": 1_700_000_000_000, + "field": "value", + } + ] + } + + +def test_check_intrusion_logs_warns_about_unknown_top_level_event_type( + tmp_path, caplog +): + _write_ndjson( + tmp_path / "intrusion.txt", + [ + { + "future_event": { + "event_time": 1_700_000_000_000, + "field": "value", + } + } + ], + ) + + with caplog.at_level(logging.WARNING): + cmd = CmdAndroidCheckIntrusionLogs(target_path=str(tmp_path)) + cmd.run() + + assert "Found unknown intrusion logging event type(s): future_event" in caplog.text + assert "Please open an issue on GitHub" in caplog.text + + +def test_check_intrusion_logs_parses_core_and_unknown_security_events( + tmp_path, caplog +): + _write_ndjson( + tmp_path / "intrusion.txt", + [ + { + "dns_event": { + "event_time": 1_700_000_000_000, + "hostname": "example.com", + "package_name": "com.example.app", + "ip_addresses": ["/1.2.3.4"], + } + }, + { + "connect_event": { + "event_time": 1_700_000_001_000, + "ip_address": "/5.6.7.8", + "port": 443, + "package_name": "com.example.app", + } + }, + { + "security_event": { + "event_time": 1_700_000_002_000_000_000, + "app_process_start": { + "process": "com.example.app", + "uid": 10_000, + "pid": 1234, + }, + } + }, + { + "security_event": { + "event_time": 1_700_000_003_000_000_000, + "future_google_event": { + "field": "value", + }, + } + }, + ], + ) + + with caplog.at_level(logging.WARNING): + cmd = CmdAndroidCheckIntrusionLogs(target_path=str(tmp_path)) + cmd.run() + + assert [module.__class__.__name__ for module in cmd.executed] == [ + "DnsEvent", + "ConnectEvent", + "SecurityEvent", + ] + assert [len(module.results) for module in cmd.executed] == [1, 1, 2] + + security_module = next( + module for module in cmd.executed if isinstance(module, SecurityEvent) + ) + assert security_module.event_type_counts["app_process_start"] == 1 + assert security_module.event_type_counts["future_google_event"] == 1 + + future_timeline_events = [ + event for event in cmd.timeline if event["event"] == "future_google_event" + ] + assert len(future_timeline_events) == 1 + assert "future_google_event" in future_timeline_events[0]["data"] + assert "field" in future_timeline_events[0]["data"] + assert ( + "Found unknown intrusion logging security event type(s): future_google_event" + in caplog.text + ) + assert "Please open an issue on GitHub" in caplog.text + + +def test_check_intrusion_logs_cli_lists_modules(tmp_path): + _write_ndjson(tmp_path / "intrusion.txt", []) + + result = CliRunner().invoke(check_intrusion_logs, ["--list-modules", str(tmp_path)]) + + assert result.exit_code == 0 + assert "DnsEvent" in result.output + assert "ConnectEvent" in result.output + assert "SecurityEvent" in result.output diff --git a/uv.lock b/uv.lock index fec3b30..f8ae019 100644 --- a/uv.lock +++ b/uv.lock @@ -950,7 +950,7 @@ requires-dist = [ { name = "rich", specifier = "==14.3.3" }, { name = "simplejson", specifier = "==3.20.2" }, { name = "tld", specifier = "==0.13.1" }, - { name = "tzdata", specifier = "==2026.1" }, + { name = "tzdata", specifier = "==2026.2" }, ] [package.metadata.requires-dev] @@ -1794,20 +1794,20 @@ wheels = [ [[package]] name = "tzdata" -version = "2026.1" +version = "2026.2" source = { registry = "https://pypi.org/simple" } -sdist = { url = "https://files.pythonhosted.org/packages/19/f5/cd531b2d15a671a40c0f66cf06bc3570a12cd56eef98960068ebbad1bf5a/tzdata-2026.1.tar.gz", hash = "sha256:67658a1903c75917309e753fdc349ac0efd8c27db7a0cb406a25be4840f87f98", size = 197639, upload-time = "2026-04-03T11:25:22.002Z" } +sdist = { url = "https://files.pythonhosted.org/packages/ba/19/1b9b0e29f30c6d35cb345486df41110984ea67ae69dddbc0e8a100999493/tzdata-2026.2.tar.gz", hash = "sha256:9173fde7d80d9018e02a662e168e5a2d04f87c41ea174b139fbef642eda62d10", size = 198254, upload-time = "2026-04-24T15:22:08.651Z" } wheels = [ - { url = "https://files.pythonhosted.org/packages/b0/70/d460bd685a170790ec89317e9bd33047988e4bce507b831f5db771e142de/tzdata-2026.1-py2.py3-none-any.whl", hash = "sha256:4b1d2be7ac37ceafd7327b961aa3a54e467efbdb563a23655fbfe0d39cfc42a9", size = 348952, upload-time = "2026-04-03T11:25:20.313Z" }, + { url = "https://files.pythonhosted.org/packages/ce/e4/dccd7f47c4b64213ac01ef921a1337ee6e30e8c6466046018326977efd95/tzdata-2026.2-py2.py3-none-any.whl", hash = "sha256:bbe9af844f658da81a5f95019480da3a89415801f6cc966806612cc7169bffe7", size = 349321, upload-time = "2026-04-24T15:22:05.876Z" }, ] [[package]] name = "urllib3" -version = "2.6.3" +version = "2.7.0" source = { registry = "https://pypi.org/simple" } -sdist = { url = "https://files.pythonhosted.org/packages/c7/24/5f1b3bdffd70275f6661c76461e25f024d5a38a46f04aaca912426a2b1d3/urllib3-2.6.3.tar.gz", hash = "sha256:1b62b6884944a57dbe321509ab94fd4d3b307075e0c2eae991ac71ee15ad38ed", size = 435556, upload-time = "2026-01-07T16:24:43.925Z" } +sdist = { url = "https://files.pythonhosted.org/packages/53/0c/06f8b233b8fd13b9e5ee11424ef85419ba0d8ba0b3138bf360be2ff56953/urllib3-2.7.0.tar.gz", hash = "sha256:231e0ec3b63ceb14667c67be60f2f2c40a518cb38b03af60abc813da26505f4c", size = 433602, upload-time = "2026-05-07T16:13:18.596Z" } wheels = [ - { url = "https://files.pythonhosted.org/packages/39/08/aaaad47bc4e9dc8c725e68f9d04865dbcb2052843ff09c97b08904852d84/urllib3-2.6.3-py3-none-any.whl", hash = "sha256:bf272323e553dfb2e87d9bfd225ca7b0f467b919d7bbd355436d3fd37cb0acd4", size = 131584, upload-time = "2026-01-07T16:24:42.685Z" }, + { url = "https://files.pythonhosted.org/packages/7f/3e/5db95bcf282c52709639744ca2a8b149baccf648e39c8cc87553df9eae0c/urllib3-2.7.0-py3-none-any.whl", hash = "sha256:9fb4c81ebbb1ce9531cce37674bbc6f1360472bc18ca9a553ede278ef7276897", size = 131087, upload-time = "2026-05-07T16:13:17.151Z" }, ] [[package]]