From b8ea29cde56244f0fa62db5343cf7d8d2e1f443a Mon Sep 17 00:00:00 2001 From: besendorf Date: Tue, 12 May 2026 17:24:29 +0200 Subject: [PATCH] Add Android intrusion log checks (#788) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit * Add Android intrusion log checks * Warn on unknown intrusion log event types * Rename intrusion logs folder from intrusion-logs to instrusion_logs to match AndroidQF output --------- Co-authored-by: tes Co-authored-by: Donncha Ó Cearbhaill --- src/mvt/android/cli.py | 76 +- src/mvt/android/cmd_check_androidqf.py | 131 ++- src/mvt/android/cmd_check_intrusion_logs.py | 113 +++ .../modules/intrusion_logs/__init__.py | 20 + .../android/modules/intrusion_logs/base.py | 395 +++++++++ .../modules/intrusion_logs/connect_event.py | 121 +++ .../modules/intrusion_logs/dns_event.py | 141 ++++ .../modules/intrusion_logs/security_event.py | 758 ++++++++++++++++++ src/mvt/common/help.py | 1 + tests/android/test_intrusion_logs.py | 154 ++++ 10 files changed, 1908 insertions(+), 2 deletions(-) create mode 100644 src/mvt/android/cmd_check_intrusion_logs.py create mode 100644 src/mvt/android/modules/intrusion_logs/__init__.py create mode 100644 src/mvt/android/modules/intrusion_logs/base.py create mode 100644 src/mvt/android/modules/intrusion_logs/connect_event.py create mode 100644 src/mvt/android/modules/intrusion_logs/dns_event.py create mode 100644 src/mvt/android/modules/intrusion_logs/security_event.py create mode 100644 tests/android/test_intrusion_logs.py diff --git a/src/mvt/android/cli.py b/src/mvt/android/cli.py index 96c1c7d..690e410 100644 --- a/src/mvt/android/cli.py +++ b/src/mvt/android/cli.py @@ -16,6 +16,7 @@ from mvt.common.help import ( HELP_MSG_CHECK_ANDROIDQF, HELP_MSG_CHECK_BUGREPORT, HELP_MSG_CHECK_IOCS, + HELP_MSG_CHECK_INTRUSION_LOGS, HELP_MSG_DISABLE_INDICATOR_UPDATE_CHECK, HELP_MSG_DISABLE_UPDATE_CHECK, HELP_MSG_HASHES, @@ -35,6 +36,8 @@ from mvt.common.utils import init_logging, set_verbose_logging from .cmd_check_androidqf import CmdAndroidCheckAndroidQF from .cmd_check_backup import CmdAndroidCheckBackup from .cmd_check_bugreport import CmdAndroidCheckBugreport +from .cmd_check_intrusion_logs import CmdAndroidCheckIntrusionLogs +from .modules.intrusion_logs import INTRUSION_LOGS_MODULES from .modules.androidqf import ANDROIDQF_MODULES from .modules.backup import BACKUP_MODULES from .modules.backup.helpers import cli_load_android_backup_password @@ -266,6 +269,75 @@ def check_androidqf( cmd.show_support_message() +# ============================================================================== +# Command: check-intrusion-logs +# ============================================================================== +@cli.command( + "check-intrusion-logs", + context_settings=CONTEXT_SETTINGS, + help=HELP_MSG_CHECK_INTRUSION_LOGS, +) +@click.option( + "--iocs", + "-i", + type=click.Path(exists=True), + multiple=True, + default=[], + help=HELP_MSG_IOC, +) +@click.option("--output", "-o", type=click.Path(exists=False), help=HELP_MSG_OUTPUT) +@click.option("--list-modules", "-l", is_flag=True, help=HELP_MSG_LIST_MODULES) +@click.option("--module", "-m", help=HELP_MSG_MODULE) +@click.option( + "--timezone", + "-t", + default=None, + help=( + "IANA timezone name for the device, for example 'Europe/Paris'. " + "When provided, event timestamps are expressed in the device's local " + "time instead of UTC." + ), +) +@click.option("--verbose", "-v", is_flag=True, help=HELP_MSG_VERBOSE) +@click.argument("LOGS_PATH", type=click.Path(exists=True)) +@click.pass_context +def check_intrusion_logs( + ctx, + iocs, + output, + list_modules, + module, + timezone, + verbose, + logs_path, +): + set_verbose_logging(verbose) + + module_options = {} + if timezone: + module_options["device_timezone"] = timezone + + cmd = CmdAndroidCheckIntrusionLogs( + target_path=logs_path, + results_path=output, + ioc_files=iocs, + module_name=module, + module_options=module_options if module_options else None, + disable_version_check=_get_disable_flags(ctx)[0], + disable_indicator_check=_get_disable_flags(ctx)[1], + ) + + if list_modules: + cmd.list_modules() + return + + log.info("Checking intrusion logs at path: %s", logs_path) + + cmd.run() + cmd.show_alerts_brief() + cmd.show_support_message() + + # ============================================================================== # Command: check-iocs # ============================================================================== @@ -284,7 +356,9 @@ def check_androidqf( @click.pass_context def check_iocs(ctx, iocs, list_modules, module, folder): cmd = CmdCheckIOCS(target_path=folder, ioc_files=iocs, module_name=module) - cmd.modules = BACKUP_MODULES + BUGREPORT_MODULES + ANDROIDQF_MODULES + cmd.modules = ( + BACKUP_MODULES + BUGREPORT_MODULES + ANDROIDQF_MODULES + INTRUSION_LOGS_MODULES + ) if list_modules: cmd.list_modules() diff --git a/src/mvt/android/cmd_check_androidqf.py b/src/mvt/android/cmd_check_androidqf.py index ff96a24..54dbc3d 100644 --- a/src/mvt/android/cmd_check_androidqf.py +++ b/src/mvt/android/cmd_check_androidqf.py @@ -5,10 +5,14 @@ import logging import os +import shutil +import tempfile import zipfile from pathlib import Path from typing import List, Optional +from mvt.android.artifacts.getprop import GetProp +from mvt.android.cmd_check_intrusion_logs import CmdAndroidCheckIntrusionLogs from mvt.android.cmd_check_backup import CmdAndroidCheckBackup from mvt.android.cmd_check_bugreport import CmdAndroidCheckBugreport from mvt.common.command import Command @@ -139,6 +143,55 @@ class CmdAndroidCheckAndroidQF(Command): raise NoAndroidQFBackup + def _read_device_timezone(self) -> Optional[str]: + getprop_files = [ + f for f in self.__files if f.replace("\\", "/").endswith("getprop.txt") + ] + if not getprop_files: + self.log.warning( + "Could not find getprop.txt; intrusion log timestamps will use UTC." + ) + return None + + try: + content = self._get_file_content(getprop_files[0]).decode( + "utf-8", errors="ignore" + ) + except Exception as exc: + self.log.warning("Could not read getprop.txt: %s", exc) + return None + + props = GetProp() + props.parse(content) + timezone = props.get_device_timezone() + if timezone: + self.log.info( + "Device timezone identified from getprop.txt: %s", + timezone, + ) + else: + self.log.warning( + "persist.sys.timezone not found in getprop.txt; " + "intrusion log timestamps will use UTC." + ) + + return timezone + + def _get_file_content(self, file_path: str) -> bytes: + if self.__format == "zip" and self.__zip: + handle = self.__zip.open(file_path) + try: + return handle.read() + finally: + handle.close() + + if self.__format == "dir" and self.target_path: + parent_path = Path(self.target_path).absolute().parent.as_posix() + with open(os.path.join(parent_path, file_path), "rb") as handle: + return handle.read() + + raise FileNotFoundError(file_path) + def run_bugreport_cmd(self) -> bool: bugreport = None try: @@ -194,9 +247,85 @@ class CmdAndroidCheckAndroidQF(Command): self.alertstore.extend(cmd.alertstore.alerts) return True + def run_intrusion_logs_cmd(self) -> bool: + intrusion_log_files = [ + f + for f in self.__files + if "/intrusion_logs/" in f.replace("\\", "/") + or f.replace("\\", "/").startswith("intrusion_logs/") + ] + + if not intrusion_log_files: + self.log.info( + "No intrusion_logs folder found in AndroidQF data, " + "skipping intrusion logs analysis." + ) + return False + + self.log.info( + "Found intrusion_logs folder in AndroidQF data, running intrusion logs analysis." + ) + + intrusion_logs_path = None + temp_dir = None + + try: + if self.__format == "dir" and self.target_path: + intrusion_logs_path = os.path.join( + os.path.abspath(self.target_path), "intrusion_logs" + ) + if not os.path.isdir(intrusion_logs_path): + self.log.warning( + "intrusion_logs directory not found at %s", + intrusion_logs_path, + ) + return False + + elif self.__format == "zip" and self.__zip: + temp_dir = tempfile.mkdtemp(prefix="mvt_intrusion_logs_") + for entry in intrusion_log_files: + normalized = entry.replace("\\", "/") + idx = normalized.find("intrusion_logs/") + relative = normalized[idx + len("intrusion_logs/") :] + if not relative or relative.endswith("/"): + continue + + target = os.path.join(temp_dir, relative) + os.makedirs(os.path.dirname(target), exist_ok=True) + with self.__zip.open(entry) as src, open(target, "wb") as dst: + dst.write(src.read()) + + intrusion_logs_path = temp_dir + else: + return False + + adv_module_options = dict(self.module_options or {}) + if device_timezone := self._read_device_timezone(): + adv_module_options["device_timezone"] = device_timezone + + cmd = CmdAndroidCheckIntrusionLogs( + target_path=intrusion_logs_path, + results_path=self.results_path, + ioc_files=self.ioc_files, + iocs=self.iocs, + module_options=adv_module_options, + hashes=self.hashes, + sub_command=True, + ) + cmd.run() + + self.timeline.extend(cmd.timeline) + self.alertstore.extend(cmd.alertstore.alerts) + return True + + finally: + if temp_dir: + shutil.rmtree(temp_dir, ignore_errors=True) + def finish(self) -> None: """ - Run the bugreport and backup modules if the respective files are found in the AndroidQF data. + Run nested modules if their respective files are found in AndroidQF data. """ self.run_bugreport_cmd() self.run_backup_cmd() + self.run_intrusion_logs_cmd() diff --git a/src/mvt/android/cmd_check_intrusion_logs.py b/src/mvt/android/cmd_check_intrusion_logs.py new file mode 100644 index 0000000..8541f9a --- /dev/null +++ b/src/mvt/android/cmd_check_intrusion_logs.py @@ -0,0 +1,113 @@ +# Mobile Verification Toolkit (MVT) +# Copyright (c) 2021-2023 The MVT Authors. +# Use of this software is governed by the MVT License 1.1 that can be found at +# https://license.mvt.re/1.1/ + +import logging +import os +from typing import Optional + +from mvt.common.command import Command +from mvt.common.indicators import Indicators + +from .modules.intrusion_logs import ( + INTRUSION_LOGS_MODULES, + KNOWN_INTRUSION_LOG_EVENT_TYPES, +) +from .modules.intrusion_logs.base import IntrusionLogsModule + +log = logging.getLogger(__name__) + + +class CmdAndroidCheckIntrusionLogs(Command): + """Command to check Android Intrusion Logging files.""" + + def __init__( + self, + target_path: Optional[str] = None, + results_path: Optional[str] = None, + ioc_files: Optional[list] = None, + iocs: Optional[Indicators] = None, + module_name: Optional[str] = None, + serial: Optional[str] = None, + module_options: Optional[dict] = None, + hashes: Optional[bool] = False, + sub_command: Optional[bool] = False, + disable_version_check: bool = False, + disable_indicator_check: bool = False, + ) -> None: + super().__init__( + target_path=target_path, + results_path=results_path, + ioc_files=ioc_files, + iocs=iocs, + module_name=module_name, + serial=serial, + module_options=module_options, + hashes=hashes, + sub_command=sub_command, + log=log, + disable_version_check=disable_version_check, + disable_indicator_check=disable_indicator_check, + ) + + self.name = "check-intrusion-logs" + self.modules = INTRUSION_LOGS_MODULES + self._all_events: dict[str, list[dict]] = {} + + def init(self) -> None: + if not self.target_path: + raise ValueError("No target path specified") + + if not os.path.isdir(self.target_path) and not ( + os.path.isfile(self.target_path) + and self.target_path.lower().endswith(".zip") + ): + raise ValueError( + f"Target path must be a directory or a .zip file: {self.target_path}" + ) + + self.log.info("Checking intrusion logs at path: %s", self.target_path) + self._all_events = self._pre_load_events() + + def module_init(self, module: IntrusionLogsModule) -> None: # type: ignore[override] + module.il_events_by_type = self._all_events + + def finish(self) -> None: + return + + def _pre_load_events(self) -> dict[str, list[dict]]: + """Load and parse all advanced-log files once for reuse by all modules.""" + self.log.info("Pre-loading intrusion log files from: %s", self.target_path) + + loader = IntrusionLogsModule( + target_path=self.target_path, + log=self.log, + ) + + try: + all_events = loader.load_all_events(self.target_path) + except Exception as exc: + self.log.error("Failed to pre-load events: %s", exc) + return {} + + total_events = sum(len(events) for events in all_events.values()) + self.log.info( + "Pre-loaded %d events across %d type(s); modules will reuse this data", + total_events, + len(all_events), + ) + + unknown_event_types = sorted( + event_type + for event_type in all_events + if event_type not in KNOWN_INTRUSION_LOG_EVENT_TYPES + ) + if unknown_event_types: + self.log.warning( + "Found unknown intrusion logging event type(s): %s. " + "Please open an issue on GitHub so MVT can add support for them.", + ", ".join(unknown_event_types), + ) + + return all_events diff --git a/src/mvt/android/modules/intrusion_logs/__init__.py b/src/mvt/android/modules/intrusion_logs/__init__.py new file mode 100644 index 0000000..f8be973 --- /dev/null +++ b/src/mvt/android/modules/intrusion_logs/__init__.py @@ -0,0 +1,20 @@ +# Mobile Verification Toolkit (MVT) +# Copyright (c) 2021-2023 The MVT Authors. +# Use of this software is governed by the MVT License 1.1 that can be found at +# https://license.mvt.re/1.1/ + +from .connect_event import ConnectEvent +from .dns_event import DnsEvent +from .security_event import SecurityEvent + +INTRUSION_LOGS_MODULES = [ + DnsEvent, + ConnectEvent, + SecurityEvent, +] + +KNOWN_INTRUSION_LOG_EVENT_TYPES = { + "connect_event", + "dns_event", + "security_event", +} diff --git a/src/mvt/android/modules/intrusion_logs/base.py b/src/mvt/android/modules/intrusion_logs/base.py new file mode 100644 index 0000000..4aac618 --- /dev/null +++ b/src/mvt/android/modules/intrusion_logs/base.py @@ -0,0 +1,395 @@ +# Mobile Verification Toolkit (MVT) +# Copyright (c) 2021-2023 The MVT Authors. +# Use of this software is governed by the MVT License 1.1 that can be found at +# https://license.mvt.re/1.1/ + +import datetime +import io +import json +import logging +import zipfile +from pathlib import Path +from typing import Optional, Union + +try: + import zoneinfo +except ImportError: + from backports import zoneinfo # type: ignore[no-redef] + +from mvt.common.module import MVTModule +from mvt.common.utils import convert_datetime_to_iso, convert_unix_to_iso + + +class IntrusionLogsModule(MVTModule): + """Base class for modules analyzing intrusion logs (newline-delimited JSON). + + Performance note + ---------------- + Log files can be large and are shared by every module in this package. + To avoid re-reading and re-parsing the same files N times (once per + module), the command layer should call :meth:`load_all_events` exactly + once and then assign the returned dict to the ``il_events_by_type`` + attribute of every module instance **before** calling ``run_module``. + + When ``il_events_by_type`` is populated: + * :meth:`collect_txt` becomes a no-op (no disk I/O). + * :meth:`parse_collected_txt` iterates the in-memory list for the + requested event type instead of re-parsing raw text. + + Modules that are used standalone (e.g. in tests) still work as before + because ``il_events_by_type`` defaults to ``None``, which preserves the + original file-loading code path. + """ + + def __init__( + self, + file_path: Optional[str] = None, + target_path: Optional[str] = None, + results_path: Optional[str] = None, + module_options: Optional[dict] = None, + log: logging.Logger = logging.getLogger(__name__), + results: Optional[list] = None, + ) -> None: + super().__init__( + file_path=file_path, + target_path=target_path, + results_path=results_path, + module_options=module_options, + log=log, + results=results, + ) + # Raw file content collected by collect_txt (fallback path only). + self.il_files: list[tuple[str, str]] = [] + + # Pre-parsed events injected by the command layer. + # Keys are event-type strings (e.g. "dns_event"), values are lists of + # raw event-data dicts exactly as they appear in the JSON lines. + # When this is not None, collect_txt and parse_collected_txt use it + # instead of touching the file system. + self.il_events_by_type: Optional[dict[str, list[dict]]] = None + + # ------------------------------------------------------------------ + # Serialization helper + # ------------------------------------------------------------------ + + def serialize(self, record: dict) -> Union[dict, list]: + """Serialize a record for timeline output.""" + return { + "timestamp": record.get("timestamp", record.get("isodate")), + "module": self.__class__.__name__, + "event": record.get("event_type", ""), + "data": str(record), + } + + # ------------------------------------------------------------------ + # File collection + # ------------------------------------------------------------------ + + def collect_txt(self, source) -> None: + """Collect text log files from *source* into ``self.il_files``. + + Entry points: + * directory → walk recursively + * zip file → walk zip entries + * anything else → silently skip + + If ``self.il_events_by_type`` has already been populated (i.e. the + command layer pre-loaded the events), this method returns immediately + without any disk I/O. + """ + if self.il_events_by_type is not None: + self.log.debug( + "Pre-loaded events available — skipping file collection for %s", + self.__class__.__name__, + ) + return + + path = Path(source) + + if path.is_dir(): + self._walk_directory(path) + return + + if path.is_file() and path.suffix.lower() == ".zip": + try: + with zipfile.ZipFile(path) as z: + self._walk_zip(z) + except zipfile.BadZipFile: + self.log.debug("Skipping invalid zip: %s", path) + return + + self.log.debug("Skipping unsupported source: %s", source) + + def _walk_directory(self, root: Path, prefix: str = "") -> None: + for item in root.iterdir(): + if item.is_dir(): + self._walk_directory(item, prefix=f"{prefix}{item.name}/") + continue + + if item.suffix.lower() == ".txt": + self.il_files.append( + (f"{prefix}{item.name}", item.read_text(errors="ignore")) + ) + + elif item.suffix.lower() == ".zip": + try: + with zipfile.ZipFile(item) as z: + self._walk_zip(z, prefix=f"{prefix}{item.name}::") + except zipfile.BadZipFile: + self.log.warning("Skipping invalid zip: %s", item) + + def _walk_zip(self, zf: zipfile.ZipFile, prefix: str = "") -> None: + for info in zf.infolist(): + if info.is_dir(): + continue + + name = info.filename + with zf.open(info) as f: + data = f.read() + + if name.lower().endswith(".txt"): + self.il_files.append((f"{prefix}{name}", data.decode(errors="ignore"))) + + elif name.lower().endswith(".zip"): + with zipfile.ZipFile(io.BytesIO(data)) as inner: + self._walk_zip(inner, prefix=f"{prefix}{name}::") + + # ------------------------------------------------------------------ + # Single-pass loader (used by the command layer) + # ------------------------------------------------------------------ + + def load_all_events(self, source) -> dict[str, list[dict]]: + """Read every log file under *source* **once** and parse all JSON + lines in a single pass, routing events into per-type buckets. + + Returns a ``dict`` mapping *event_type* strings to lists of raw + event-data dicts. The result is also stored in + ``self.il_events_by_type`` so that subsequent calls to + :meth:`collect_txt` and :meth:`parse_collected_txt` on *this* + instance are no-ops. + + Intended usage in the command layer:: + + loader = IntrusionLogsModule(target_path=target, log=log) + all_events = loader.load_all_events(target) + + for module_cls in INTRUSION_LOGS_MODULES: + m = module_cls(target_path=target, ...) + m.il_events_by_type = all_events # inject — no re-reading + run_module(m) + """ + # Reset so that _collect_txt actually runs (il_events_by_type is None). + self.il_events_by_type = None + self.il_files = [] + self.collect_txt(source) + + events_by_type: dict[str, list[dict]] = {} + # JSON fingerprints used to drop events that appear in more than one + # log file (overlapping daily files are the most common source of + # cross-file duplicates). + seen_fingerprints: set[str] = set() + total_lines = 0 + skipped_lines = 0 + duplicate_lines = 0 + + for file_name, text in self.il_files: + for line_num, line in enumerate(text.splitlines(), start=1): + line = line.strip() + if not line: + continue + + total_lines += 1 + try: + entry = json.loads(line) + for event_type, event_data in entry.items(): + if isinstance(event_data, dict): + fingerprint = json.dumps(event_data, sort_keys=True) + if fingerprint in seen_fingerprints: + duplicate_lines += 1 + continue + seen_fingerprints.add(fingerprint) + events_by_type.setdefault(event_type, []).append(event_data) + except json.JSONDecodeError as e: + skipped_lines += 1 + self.log.warning( + "Failed to parse JSON on line %d in %s: %s", + line_num, + file_name, + e, + ) + except Exception as e: + skipped_lines += 1 + self.log.warning( + "Error processing line %d in %s: %s", + line_num, + file_name, + e, + ) + + if duplicate_lines: + self.log.info( + "Removed %d duplicate event(s) seen across multiple log files", + duplicate_lines, + ) + + self.log.info( + "Loaded %d log files, parsed %d lines (%d skipped), found event types: %s", + len(self.il_files), + total_lines, + skipped_lines, + {k: len(v) for k, v in events_by_type.items()}, + ) + + # Cache so this instance also benefits from the fast path. + self.il_events_by_type = events_by_type + return events_by_type + + # ------------------------------------------------------------------ + # Parsing + # ------------------------------------------------------------------ + + def parse_collected_txt(self, event_type: str) -> None: + """Parse collected log text and dispatch events of *event_type*. + + Fast path + ~~~~~~~~~ + When ``self.il_events_by_type`` is populated (injected by the command + layer after a single shared :meth:`load_all_events` call), the method + iterates the already-parsed in-memory list for *event_type* — no + re-reading, no re-parsing of JSON. + + Fallback path + ~~~~~~~~~~~~~ + When ``self.il_events_by_type`` is ``None``, the method falls back to + iterating ``self.il_files`` and parsing each JSON line, which is the + original behaviour. + """ + if self.il_events_by_type is not None: + events = self.il_events_by_type.get(event_type, []) + self.log.debug( + "Using pre-loaded events: dispatching %d '%s' events", + len(events), + event_type, + ) + for event_data in events: + try: + # Work on a shallow copy so that mutations in one module + # (e.g. adding "timestamp") do not affect other modules + # that share the same dict reference. + self.process_event(dict(event_data)) + except Exception as e: + self.log.warning( + "Error processing pre-parsed '%s' event: %s", + event_type, + e, + ) + return + + # Fallback: parse raw text collected by collect_txt(). + # Use the same JSON-fingerprint approach as MVTModule._deduplicate_timeline + # to drop events that appear verbatim in more than one log file. + seen_fingerprints: set[str] = set() + duplicate_count = 0 + for file_name, text in self.il_files: + for line_num, line in enumerate(text.splitlines(), start=1): + line = line.strip() + if not line: + continue + try: + entry = json.loads(line) + if event_type in entry: + event_data = entry[event_type] + fingerprint = json.dumps(event_data, sort_keys=True) + if fingerprint in seen_fingerprints: + duplicate_count += 1 + continue + seen_fingerprints.add(fingerprint) + event_data["event_type"] = event_type + self.process_event(event_data) + except json.JSONDecodeError as e: + self.log.warning( + "Failed to parse JSON on line %d in %s: %s", + line_num, + file_name, + str(e), + ) + except Exception as e: + self.log.warning( + "Error processing line %d in %s: %s", + line_num, + file_name, + str(e), + ) + if duplicate_count: + self.log.info( + "Removed %d duplicate '%s' event(s) seen across multiple log files", + duplicate_count, + event_type, + ) + + # ------------------------------------------------------------------ + # Event processing + # ------------------------------------------------------------------ + + def process_event(self, event_data: dict) -> None: + """Process an individual event. Override this in subclasses. + + Args: + event_data: Dictionary containing the event data. + """ + self.results.append(event_data) + + # ------------------------------------------------------------------ + # Timestamp localisation + # ------------------------------------------------------------------ + + def _localize_timestamp(self, event_time_seconds: float) -> str: + """Convert a Unix timestamp (in seconds) to an ISO string. + + When the device timezone is available via ``module_options["device_timezone"]`` + (a IANA timezone name such as ``"Europe/Paris"`` read from + ``persist.sys.timezone`` in ``getprop.txt``), the UTC instant is + converted to the device's local time before formatting — mirroring the + approach used by ``AQFFiles``. + + When no timezone is configured the method falls back to UTC, which is + consistent with all other MVT modules that call ``convert_unix_to_iso``. + + Args: + event_time_seconds: Unix epoch timestamp expressed in **seconds** + (callers are responsible for dividing ms/ns values first). + + Returns: + ISO-formatted datetime string (``YYYY-mm-dd HH:MM:SS.ffffff``). + The string always represents the device-local time (or UTC when no + timezone is known); no UTC offset suffix is appended, matching the + format produced by :func:`mvt.common.utils.convert_unix_to_iso`. + """ + tz_name: Optional[str] = self.module_options.get("device_timezone") + if tz_name: + try: + device_tz = zoneinfo.ZoneInfo(tz_name) + utc_dt = datetime.datetime.fromtimestamp( + event_time_seconds, tz=datetime.timezone.utc + ) + local_dt = utc_dt.astimezone(device_tz) + # Strip tzinfo so that convert_datetime_to_iso outputs the + # local wall-clock time without a timezone suffix. This is + # the same pattern used by AQFFiles. + return convert_datetime_to_iso(local_dt.replace(tzinfo=None)) + except Exception as e: + self.log.warning( + "Could not apply device timezone '%s', falling back to UTC: %s", + tz_name, + e, + ) + + return convert_unix_to_iso(event_time_seconds) + + # ------------------------------------------------------------------ + # Abstract interface + # ------------------------------------------------------------------ + + def run(self) -> None: + """Main execution method. Must be implemented by subclasses.""" + raise NotImplementedError("Subclasses must implement the run() method") diff --git a/src/mvt/android/modules/intrusion_logs/connect_event.py b/src/mvt/android/modules/intrusion_logs/connect_event.py new file mode 100644 index 0000000..054dcb1 --- /dev/null +++ b/src/mvt/android/modules/intrusion_logs/connect_event.py @@ -0,0 +1,121 @@ +# Mobile Verification Toolkit (MVT) +# Copyright (c) 2021-2023 The MVT Authors. +# Use of this software is governed by the MVT License 1.1 that can be found at +# https://license.mvt.re/1.1/ + +import logging +from typing import Optional, Union + +from .base import IntrusionLogsModule + + +class ConnectEvent(IntrusionLogsModule): + """This module analyzes network connection events from intrusion logs.""" + + def __init__( + self, + file_path: Optional[str] = None, + target_path: Optional[str] = None, + results_path: Optional[str] = None, + module_options: Optional[dict] = None, + log: logging.Logger = logging.getLogger(__name__), + results: Optional[list] = None, + ) -> None: + super().__init__( + file_path=file_path, + target_path=target_path, + results_path=results_path, + module_options=module_options, + log=log, + results=results, + ) + + def check_indicators(self) -> None: + """Check connection events against indicators of compromise.""" + if not self.indicators: + return + + for result in self.results: + # Check IP address against indicators + ip_address = result.get("ip_address", "") + if ip_address: + # Clean IP address (remove leading slash and extract IP from format like "ip6-localhost/::1") + if "/" in ip_address: + parts = ip_address.split("/") + clean_ip = parts[-1] if len(parts) > 1 else parts[0] + else: + clean_ip = ip_address.lstrip("/") + + # Skip localhost addresses + if clean_ip and clean_ip not in ["::1", "127.0.0.1", "0.0.0.0"]: + ioc = self.indicators.check_domain(clean_ip) + if ioc: + result["matched_ip"] = clean_ip + self.alertstore.critical( + ioc.message, + result.get("timestamp") or "", + result, + matched_indicator=ioc.ioc, + ) + + # Check package name against app identifiers + package_name = result.get("package_name", "") + if package_name: + ioc = self.indicators.check_app_id(package_name) + if ioc: + self.alertstore.critical( + ioc.message, + result.get("timestamp") or "", + result, + matched_indicator=ioc.ioc, + ) + + def serialize(self, record: dict) -> Union[dict, list]: + """Serialize a connection event record for timeline output.""" + ip_address = record.get("ip_address", "") + port = record.get("port", 0) + package_name = record.get("package_name", "") + matched_ip = record.get("matched_ip", "") + + # Clean IP address for display + if "/" in ip_address: + parts = ip_address.split("/") + clean_ip = parts[-1] if len(parts) > 1 else parts[0] + else: + clean_ip = ip_address.lstrip("/") + + # Indicate when IP matched an IoC + if matched_ip: + data = f"Connection to {clean_ip}:{port} by {package_name} [Matched IP: {matched_ip}]" + else: + data = f"Connection to {clean_ip}:{port} by {package_name}" + + return { + "timestamp": record.get("timestamp"), + "module": self.__class__.__name__, + "event": "network_connection", + "data": data, + } + + def process_event(self, event_data: dict) -> None: + """Process a connection event and add it to results.""" + # Convert event_time from milliseconds to ISO format + event_time = event_data.get("event_time") + if event_time: + # Android event times are in milliseconds since epoch + event_data["timestamp"] = self._localize_timestamp(event_time / 1000.0) + else: + event_data["timestamp"] = None + + self.results.append(event_data) + + def run(self) -> None: + """Extract and analyze connection events from intrusion logs.""" + if not self.target_path: + self.log.error("No target path specified") + return + + self.collect_txt(self.target_path) + self.parse_collected_txt("connect_event") + + self.log.info("Identified %d connection events", len(self.results)) diff --git a/src/mvt/android/modules/intrusion_logs/dns_event.py b/src/mvt/android/modules/intrusion_logs/dns_event.py new file mode 100644 index 0000000..242ae7b --- /dev/null +++ b/src/mvt/android/modules/intrusion_logs/dns_event.py @@ -0,0 +1,141 @@ +# Mobile Verification Toolkit (MVT) +# Copyright (c) 2021-2023 The MVT Authors. +# Use of this software is governed by the MVT License 1.1 that can be found at +# https://license.mvt.re/1.1/ + +import logging +from typing import Optional, Union + +from .base import IntrusionLogsModule + + +class DnsEvent(IntrusionLogsModule): + """This module analyzes DNS events from intrusion logs.""" + + def __init__( + self, + file_path: Optional[str] = None, + target_path: Optional[str] = None, + results_path: Optional[str] = None, + module_options: Optional[dict] = None, + log: logging.Logger = logging.getLogger(__name__), + results: Optional[list] = None, + ) -> None: + super().__init__( + file_path=file_path, + target_path=target_path, + results_path=results_path, + module_options=module_options, + log=log, + results=results, + ) + + def check_indicators(self) -> None: + """Check DNS events against indicators of compromise.""" + if not self.indicators: + return + + for result in self.results: + # Check hostname against domain indicators + hostname = result.get("hostname", "") + if hostname: + ioc = self.indicators.check_domain(hostname) + if ioc: + self.alertstore.critical( + ioc.message, + result.get("timestamp") or "", + result, + matched_indicator=ioc.ioc, + ) + + # Check IP addresses against indicators + ip_addresses = result.get("ip_addresses", []) + matched_ips = [] + for ip_addr in ip_addresses: + # Remove leading slash if present + clean_ip = ( + ip_addr.lstrip("/") if isinstance(ip_addr, str) else str(ip_addr) + ) + if clean_ip and clean_ip != "0.0.0.0": + ioc = self.indicators.check_domain(clean_ip) + if ioc: + matched_ips.append(clean_ip) + self.alertstore.critical( + ioc.message, + result.get("timestamp") or "", + result, + matched_indicator=ioc.ioc, + ) + + # Store matched IPs for timeline display + if matched_ips: + result["matched_ips"] = matched_ips + + # Check package name against app identifiers + package_name = result.get("package_name", "") + if package_name: + ioc = self.indicators.check_app_id(package_name) + if ioc: + self.alertstore.critical( + ioc.message, + result.get("timestamp") or "", + result, + matched_indicator=ioc.ioc, + ) + + def serialize(self, record: dict) -> Union[dict, list]: + """Serialize a DNS event record for timeline output.""" + hostname = record.get("hostname", "") + package_name = record.get("package_name", "") + + # Get IP addresses for display + ip_addresses = record.get("ip_addresses", []) + matched_ips = record.get("matched_ips", []) + + # Clean up IP addresses (remove leading slashes) + clean_ips = [] + for ip_addr in ip_addresses: + clean_ip = ip_addr.lstrip("/") if isinstance(ip_addr, str) else str(ip_addr) + if clean_ip and clean_ip != "0.0.0.0": + clean_ips.append(clean_ip) + + # Build the data string with actual IPs + if matched_ips: + # Highlight matched IPs in the output + ip_display = ", ".join(matched_ips) + data = f"DNS query for {hostname} by {package_name} [Matched IPs: {ip_display}]" + elif clean_ips: + ip_display = ", ".join(clean_ips) + data = f"DNS query for {hostname} by {package_name} [IPs: {ip_display}]" + else: + data = f"DNS query for {hostname} by {package_name}" + + return { + "timestamp": record.get("timestamp"), + "module": self.__class__.__name__, + "event": "dns_query", + "data": data, + } + + def process_event(self, event_data: dict) -> None: + """Process a DNS event and add it to results.""" + # Convert event_time from milliseconds to ISO format + event_time = event_data.get("event_time") + if event_time: + # Android event times are in milliseconds since epoch + event_data["timestamp"] = self._localize_timestamp(event_time / 1000.0) + else: + event_data["timestamp"] = None + + self.results.append(event_data) + + def run(self) -> None: + """Extract and analyze DNS events from intrusion logs.""" + if not self.target_path: + self.log.error("No target path specified") + return + + self.collect_txt(self.target_path) + self.parse_collected_txt("dns_event") + + self.log.info("Identified %d DNS events", len(self.results)) diff --git a/src/mvt/android/modules/intrusion_logs/security_event.py b/src/mvt/android/modules/intrusion_logs/security_event.py new file mode 100644 index 0000000..7190db4 --- /dev/null +++ b/src/mvt/android/modules/intrusion_logs/security_event.py @@ -0,0 +1,758 @@ +# Mobile Verification Toolkit (MVT) +# Copyright (c) 2021-2023 The MVT Authors. +# Use of this software is governed by the MVT License 1.1 that can be found at +# https://license.mvt.re/1.1/ + +import logging +from typing import Optional, Union + +from .base import IntrusionLogsModule + +# Security event tags based on Android SecurityLog API +# Reference: https://developer.android.com/reference/android/app/admin/SecurityLog +SECURITY_EVENT_TAGS = { + # ADB events (API level 24) + "adb_shell_interactive": { + "tag_id": 210001, + "name": "ADB Shell Interactive", + "description": "An ADB interactive shell was opened via 'adb shell'", + }, + "adb_shell_cmd": { + "tag_id": 210002, + "name": "ADB Shell Command", + "description": "A shell command was issued over ADB via 'adb shell '", + }, + "adb_sync_recv_file": { + "tag_id": 210003, + "name": "ADB Sync Recv File", + "description": "A file was pulled from the device via adb daemon (adb pull)", + }, + "adb_sync_send_file": { + "tag_id": 210004, + "name": "ADB Sync Send File", + "description": "A file was pushed to the device via adb daemon (adb push)", + }, + # App process events (API level 24) + "app_process_start": { + "tag_id": 210005, + "name": "App Process Start", + "description": "An app process was started", + }, + # Keyguard events (API level 24) + "keyguard_dismissed": { + "tag_id": 210006, + "name": "Keyguard Dismissed", + "description": "Keyguard has been dismissed", + }, + "keyguard_dismiss_auth_attempt": { + "tag_id": 210007, + "name": "Keyguard Dismiss Auth Attempt", + "description": "Authentication attempt to dismiss keyguard", + }, + "keyguard_secured": { + "tag_id": 210008, + "name": "Keyguard Secured", + "description": "Device has been locked", + }, + # OS events (API level 28) + "os_startup": { + "tag_id": 210009, + "name": "OS Startup", + "description": "Android OS has started", + }, + "os_shutdown": { + "tag_id": 210010, + "name": "OS Shutdown", + "description": "Android OS has shutdown", + }, + # Logging events (API level 28) + "logging_started": { + "tag_id": 210011, + "name": "Logging Started", + "description": "Audit logging has started", + }, + "logging_stopped": { + "tag_id": 210012, + "name": "Logging Stopped", + "description": "Audit logging has stopped", + }, + # Media events (API level 28) + "media_mount": { + "tag_id": 210013, + "name": "Media Mount", + "description": "Removable media has been mounted", + }, + "media_unmount": { + "tag_id": 210014, + "name": "Media Unmount", + "description": "Removable media was unmounted", + }, + # Log buffer event (API level 28) + "log_buffer_size_critical": { + "tag_id": 210015, + "name": "Log Buffer Size Critical", + "description": "Audit log buffer has reached 90% capacity", + }, + # Password policy events (API level 28) + "password_expiration_set": { + "tag_id": 210016, + "name": "Password Expiration Set", + "description": "Admin set password expiration timeout", + }, + "password_complexity_set": { + "tag_id": 210017, + "name": "Password Complexity Set", + "description": "Admin set password complexity requirement", + }, + "password_history_length_set": { + "tag_id": 210018, + "name": "Password History Length Set", + "description": "Admin set password history length", + }, + "max_screen_lock_timeout_set": { + "tag_id": 210019, + "name": "Max Screen Lock Timeout Set", + "description": "Admin set maximum screen lock timeout", + }, + "max_password_attempts_set": { + "tag_id": 210020, + "name": "Max Password Attempts Set", + "description": "Admin set maximum failed password attempts before wipe", + }, + "keyguard_disabled_features_set": { + "tag_id": 210021, + "name": "Keyguard Disabled Features Set", + "description": "Admin set disabled keyguard features", + }, + # Remote lock event (API level 28) + "remote_lock": { + "tag_id": 210022, + "name": "Remote Lock", + "description": "Admin remotely locked the device or profile", + }, + # Wipe failure event (API level 28) + "wipe_failure": { + "tag_id": 210023, + "name": "Wipe Failure", + "description": "Failed to wipe device or user data", + }, + # Cryptographic key events (API level 28) + "key_generated": { + "tag_id": 210024, + "name": "Key Generated", + "description": "Cryptographic key was generated", + }, + "key_import": { + "tag_id": 210025, + "name": "Key Import", + "description": "Cryptographic key was imported", + }, + "key_destruction": { + "tag_id": 210026, + "name": "Key Destruction", + "description": "Cryptographic key was destroyed", + }, + # User restriction events (API level 28) + "user_restriction_added": { + "tag_id": 210027, + "name": "User Restriction Added", + "description": "Admin added a user restriction", + }, + "user_restriction_removed": { + "tag_id": 210028, + "name": "User Restriction Removed", + "description": "Admin removed a user restriction", + }, + # Certificate events (API level 28) + "cert_authority_installed": { + "tag_id": 210029, + "name": "Certificate Authority Installed", + "description": "Root certificate installed to trusted storage", + }, + "cert_authority_removed": { + "tag_id": 210030, + "name": "Certificate Authority Removed", + "description": "Root certificate removed from trusted storage", + }, + "crypto_self_test_completed": { + "tag_id": 210031, + "name": "Crypto Self Test Completed", + "description": "Cryptographic functionality self test completed", + }, + "key_integrity_violation": { + "tag_id": 210032, + "name": "Key Integrity Violation", + "description": "Key integrity violation detected", + }, + "cert_validation_failure": { + "tag_id": 210033, + "name": "Certificate Validation Failure", + "description": "X.509v3 certificate validation failed", + }, + # Camera policy event (API level 30) + "camera_policy_set": { + "tag_id": 210034, + "name": "Camera Policy Set", + "description": "Admin set policy to disable camera", + }, + # Password complexity events (API level 31/33) + "password_complexity_required": { + "tag_id": 210035, + "name": "Password Complexity Required", + "description": "Admin set password complexity requirement using predefined levels", + }, + "password_changed": { + "tag_id": 210036, + "name": "Password Changed", + "description": "User changed their lockscreen password", + }, + # WiFi events (API level 33) + "wifi_connection": { + "tag_id": 210037, + "name": "WiFi Connection", + "description": "Device attempted to connect to a managed WiFi network", + }, + "wifi_disconnection": { + "tag_id": 210038, + "name": "WiFi Disconnection", + "description": "Device disconnected from a managed WiFi network", + }, + # Bluetooth events (API level 33) + "bluetooth_connection": { + "tag_id": 210039, + "name": "Bluetooth Connection", + "description": "Device attempted to connect to a Bluetooth device", + }, + "bluetooth_disconnection": { + "tag_id": 210040, + "name": "Bluetooth Disconnection", + "description": "Device disconnected from a Bluetooth device", + }, + # Package events (API level 34) + "package_installed": { + "tag_id": 210041, + "name": "Package Installed", + "description": "Application package was installed", + }, + "package_updated": { + "tag_id": 210042, + "name": "Package Updated", + "description": "Application package was updated", + }, + "package_uninstalled": { + "tag_id": 210043, + "name": "Package Uninstalled", + "description": "Application package was uninstalled", + }, + # Backup service event (API level 35) + "backup_service_toggled": { + "tag_id": 210044, + "name": "Backup Service Toggled", + "description": "Admin enabled or disabled backup service", + }, + # NFC events (API level 36) + "nfc_enabled": { + "tag_id": 210045, + "name": "NFC Enabled", + "description": "NFC service is enabled", + }, + "nfc_disabled": { + "tag_id": 210046, + "name": "NFC Disabled", + "description": "NFC service is disabled", + }, +} + +SECURITY_EVENT_METADATA_KEYS = { + "event_time", + "event_type", + "timestamp", +} + + +class SecurityEvent(IntrusionLogsModule): + """This module analyzes security events from intrusion logs.""" + + def __init__( + self, + file_path: Optional[str] = None, + target_path: Optional[str] = None, + results_path: Optional[str] = None, + module_options: Optional[dict] = None, + log: logging.Logger = logging.getLogger(__name__), + results: Optional[list] = None, + ) -> None: + super().__init__( + file_path=file_path, + target_path=target_path, + results_path=results_path, + module_options=module_options, + log=log, + results=results, + ) + self.event_type_counts: dict[str, int] = {} + + def _get_event_tag(self, event_data: dict) -> Optional[str]: + """Return the security-event tag key, including tags unknown to MVT.""" + for key in event_data: + if key not in SECURITY_EVENT_METADATA_KEYS: + return key + + return None + + def check_indicators(self) -> None: + """Check security events against indicators of compromise.""" + if not self.indicators: + return + + for result in self.results: + # Check app process start events for suspicious package names + if "app_process_start" in result: + process_info = result["app_process_start"] + process_name = process_info.get("process", "") + if process_name: + # Check the full process name + ioc = self.indicators.check_app_id(process_name) + if ioc: + self.alertstore.critical( + ioc.message, + result.get("timestamp") or "", + result, + matched_indicator=ioc.ioc, + ) + + # Also check process components after the first colon + # Example: "com.google.android.webview:sandboxed_process0:org.chromium.content.app.SandboxedProcessService0:0" + # We want to check "sandboxed_process0" and subsequent components + if ":" in process_name: + components = process_name.split(":") + for component in components[ + 1: + ]: # Skip the first component (main package name) + if component: + ioc = self.indicators.check_app_id(component) + if ioc: + self.alertstore.critical( + ioc.message, + result.get("timestamp") or "", + result, + matched_indicator=ioc.ioc, + ) + break + + # Check package operations for suspicious packages + for pkg_event in [ + "package_installed", + "package_updated", + "package_uninstalled", + ]: + if pkg_event in result: + pkg_info = result[pkg_event] + pkg_name = pkg_info.get("package_name", "") + if pkg_name: + ioc = self.indicators.check_app_id(pkg_name) + if ioc: + self.alertstore.critical( + ioc.message, + result.get("timestamp") or "", + result, + matched_indicator=ioc.ioc, + ) + + # Check ADB shell commands for suspicious patterns + if "adb_shell_cmd" in result: + cmd_info = result["adb_shell_cmd"] + command = cmd_info.get("command", "") + if command: + # Check if command contains any suspicious app IDs + ioc = self.indicators.check_app_id(command) + if ioc: + self.alertstore.critical( + ioc.message, + result.get("timestamp") or "", + result, + matched_indicator=ioc.ioc, + ) + + # Check ADB file sync events for suspicious paths + for adb_event in ["adb_sync_recv_file", "adb_sync_send_file"]: + if adb_event in result: + file_info = result[adb_event] + file_path = file_info.get("path", "") + if file_path: + ioc = self.indicators.check_file_path(file_path) + if ioc: + self.alertstore.critical( + ioc.message, + result.get("timestamp") or "", + result, + matched_indicator=ioc.ioc, + ) + + # Flag failed cryptographic operations as potentially suspicious + if "key_generated" in result: + if not result["key_generated"].get("success", True): + self.log.warning( + "Failed key generation detected for key_id: %s", + result["key_generated"].get("key_id", "unknown"), + ) + + # Flag certificate validation failures + if "cert_validation_failure" in result: + self.log.warning( + "Certificate validation failure detected: %s", + result.get("cert_validation_failure"), + ) + + # Flag key integrity violations + if "key_integrity_violation" in result: + self.alertstore.medium( + f"Key integrity violation detected: {result.get('key_integrity_violation')}", + result.get("timestamp") or "", + result, + ) + + # Flag certificate authority installations (potential MITM) + if "cert_authority_installed" in result: + cert_info = result["cert_authority_installed"] + self.log.warning( + "Certificate authority installed: %s (success: %s)", + cert_info.get("subject", "unknown"), + cert_info.get("success", "unknown"), + ) + + # Flag wipe failures + if "wipe_failure" in result: + self.alertstore.medium( + "Device wipe failure detected", + result.get("timestamp") or "", + result, + ) + + # Flag crypto self test failures + if "crypto_self_test_completed" in result: + test_result = result["crypto_self_test_completed"] + if isinstance(test_result, dict): + success = test_result.get("success", True) + else: + success = test_result == 1 + if not success: + self.alertstore.medium( + "Cryptographic self test failed", + result.get("timestamp") or "", + result, + ) + + def serialize(self, record: dict) -> Union[dict, list]: + """Serialize a security event record for timeline output.""" + # Determine the event sub-type + event_subtype = None + event_data_str = "" + + event_subtype = self._get_event_tag(record) + if event_subtype: + event_info = record[event_subtype] + + if event_subtype in SECURITY_EVENT_TAGS: + # ADB events + if event_subtype == "adb_shell_interactive": + event_data_str = "ADB interactive shell opened" + elif event_subtype == "adb_shell_cmd": + command = event_info.get("command", "") + event_data_str = f"ADB shell command: {command}" + elif event_subtype == "adb_sync_recv_file": + path = event_info.get("path", "") + event_data_str = f"File pulled via ADB: {path}" + elif event_subtype == "adb_sync_send_file": + path = event_info.get("path", "") + event_data_str = f"File pushed via ADB: {path}" + + # App process events + elif event_subtype == "app_process_start": + process_name = event_info.get("process", "") + uid = event_info.get("uid", "") + pid = event_info.get("pid", "") + event_data_str = ( + f"Process started: {process_name} (UID: {uid}, PID: {pid})" + ) + + # Keyguard events + elif event_subtype == "keyguard_dismiss_auth_attempt": + success = event_info.get("success", False) + method = event_info.get("method_strength", 0) + event_data_str = f"Auth attempt: {'Success' if success else 'Failed'} (method strength: {method})" + elif event_subtype == "keyguard_dismissed": + event_data_str = "Keyguard dismissed" + elif event_subtype == "keyguard_secured": + event_data_str = "Device locked" + elif event_subtype == "keyguard_disabled_features_set": + admin = event_info.get("admin_package", "") + features = event_info.get("disabled_features", "") + event_data_str = ( + f"Keyguard features disabled by {admin}: {features}" + ) + + # Key events + elif event_subtype == "key_generated": + success = event_info.get("success", False) + key_id = event_info.get("key_id", "unknown") + uid = event_info.get("uid", "") + event_data_str = f"Key {'generated' if success else 'generation failed'}: {key_id} (UID: {uid})" + elif event_subtype == "key_destruction": + success = event_info.get("success", False) + key_id = event_info.get("key_id", "unknown") + uid = event_info.get("uid", "") + event_data_str = f"Key {'destroyed' if success else 'destruction failed'}: {key_id} (UID: {uid})" + elif event_subtype == "key_import": + success = event_info.get("success", False) + key_id = event_info.get("key_id", "unknown") + event_data_str = ( + f"Key {'imported' if success else 'import failed'}: {key_id}" + ) + elif event_subtype == "key_integrity_violation": + key_id = event_info.get("key_id", "unknown") + event_data_str = f"Key integrity violation: {key_id}" + + # Certificate events + elif event_subtype == "cert_authority_installed": + success = event_info.get("success", False) + subject = event_info.get("subject", "unknown") + event_data_str = f"Cert {'installed' if success else 'install failed'}: {subject}" + elif event_subtype == "cert_authority_removed": + success = event_info.get("success", False) + subject = event_info.get("subject", "unknown") + event_data_str = ( + f"Cert {'removed' if success else 'removal failed'}: {subject}" + ) + elif event_subtype == "cert_validation_failure": + reason = ( + event_info if isinstance(event_info, str) else str(event_info) + ) + event_data_str = f"Certificate validation failure: {reason}" + elif event_subtype == "crypto_self_test_completed": + if isinstance(event_info, dict): + success = event_info.get("success", False) + else: + success = event_info == 1 + event_data_str = ( + f"Crypto self test: {'passed' if success else 'FAILED'}" + ) + + # Package events + elif event_subtype in [ + "package_installed", + "package_updated", + "package_uninstalled", + ]: + pkg_name = event_info.get("package_name", "") + version = event_info.get("version_code", "") + user_id = event_info.get("user_id", "") + action = event_subtype.replace("package_", "").title() + event_data_str = ( + f"Package {action}: {pkg_name} (v{version}, user: {user_id})" + ) + + # OS events + elif event_subtype == "os_startup": + verified_boot = event_info.get("verified_boot_state", "") + dm_verity = event_info.get("dm_verity_mode", "") + event_data_str = f"OS startup (verified boot: {verified_boot}, dm-verity: {dm_verity})" + elif event_subtype == "os_shutdown": + event_data_str = "OS shutdown" + + # Logging events + elif event_subtype == "logging_started": + event_data_str = "Audit logging started" + elif event_subtype == "logging_stopped": + event_data_str = "Audit logging stopped" + elif event_subtype == "log_buffer_size_critical": + event_data_str = "Log buffer at 90% capacity" + + # Media events + elif event_subtype == "media_mount": + mount_point = event_info.get("mount_point", "") + label = event_info.get("volume_label", "") + event_data_str = f"Media mounted: {mount_point} ({label})" + elif event_subtype == "media_unmount": + mount_point = event_info.get("mount_point", "") + label = event_info.get("volume_label", "") + event_data_str = f"Media unmounted: {mount_point} ({label})" + + # Password policy events + elif event_subtype == "password_expiration_set": + admin = event_info.get("admin_package", "") + timeout = event_info.get("timeout_ms", "") + event_data_str = f"Password expiration set by {admin}: {timeout}ms" + elif event_subtype == "password_complexity_set": + admin = event_info.get("admin_package", "") + event_data_str = f"Password complexity set by {admin}" + elif event_subtype == "password_complexity_required": + admin = event_info.get("admin_package", "") + complexity = event_info.get("complexity", "") + event_data_str = ( + f"Password complexity required by {admin}: {complexity}" + ) + elif event_subtype == "password_history_length_set": + admin = event_info.get("admin_package", "") + length = event_info.get("length", "") + event_data_str = f"Password history length set by {admin}: {length}" + elif event_subtype == "password_changed": + complexity = event_info.get("complexity", "") + user_id = event_info.get("user_id", "") + event_data_str = ( + f"Password changed (complexity: {complexity}, user: {user_id})" + ) + elif event_subtype == "max_screen_lock_timeout_set": + admin = event_info.get("admin_package", "") + timeout = event_info.get("timeout_ms", "") + event_data_str = ( + f"Max screen lock timeout set by {admin}: {timeout}ms" + ) + elif event_subtype == "max_password_attempts_set": + admin = event_info.get("admin_package", "") + attempts = event_info.get("max_attempts", "") + event_data_str = f"Max password attempts set by {admin}: {attempts}" + + # Remote lock and wipe events + elif event_subtype == "remote_lock": + admin = event_info.get("admin_package", "") + event_data_str = f"Device remotely locked by {admin}" + elif event_subtype == "wipe_failure": + event_data_str = "Device wipe failed" + + # User restriction events + elif event_subtype == "user_restriction_added": + admin = event_info.get("admin_package", "") + restriction = event_info.get("restriction", "") + event_data_str = f"User restriction added by {admin}: {restriction}" + elif event_subtype == "user_restriction_removed": + admin = event_info.get("admin_package", "") + restriction = event_info.get("restriction", "") + event_data_str = ( + f"User restriction removed by {admin}: {restriction}" + ) + + # WiFi events + elif event_subtype == "wifi_connection": + bssid = event_info.get("bssid", "") + event_type = event_info.get("event_type", "") + reason = event_info.get("reason", "") + event_data_str = f"WiFi connection: {event_type} (BSSID: {bssid})" + if reason: + event_data_str += f" - {reason}" + elif event_subtype == "wifi_disconnection": + bssid = event_info.get("bssid", "") + reason = event_info.get("reason", "") + event_data_str = f"WiFi disconnection (BSSID: {bssid})" + if reason: + event_data_str += f" - {reason}" + + # Bluetooth events + elif event_subtype == "bluetooth_connection": + mac = event_info.get("mac_address", "") + success = event_info.get("success", False) + reason = event_info.get("reason", "") + event_data_str = f"Bluetooth {'connected' if success else 'connection failed'}: {mac}" + if reason: + event_data_str += f" - {reason}" + elif event_subtype == "bluetooth_disconnection": + mac = event_info.get("mac_address", "") + reason = event_info.get("reason", "") + event_data_str = f"Bluetooth disconnected: {mac}" + if reason: + event_data_str += f" - {reason}" + + # Camera policy event + elif event_subtype == "camera_policy_set": + admin = event_info.get("admin_package", "") + disabled = event_info.get("disabled", False) + event_data_str = ( + f"Camera {'disabled' if disabled else 'enabled'} by {admin}" + ) + + # Backup service event + elif event_subtype == "backup_service_toggled": + admin = event_info.get("admin_package", "") + enabled = event_info.get("enabled", False) + event_data_str = f"Backup service {'enabled' if enabled else 'disabled'} by {admin}" + + # NFC events + elif event_subtype == "nfc_enabled": + event_data_str = "NFC enabled" + elif event_subtype == "nfc_disabled": + event_data_str = "NFC disabled" + + else: + event_data_str = ( + f"{SECURITY_EVENT_TAGS.get(event_subtype, {}).get('name', event_subtype)}: " + f"{event_info}" + ) + else: + event_data_str = f"{event_subtype}: {event_info}" + + if not event_subtype: + event_subtype = "unknown" + event_data_str = str(record) + + return { + "timestamp": record.get("timestamp"), + "module": self.__class__.__name__, + "event": event_subtype, + "data": event_data_str, + } + + def process_event(self, event_data: dict) -> None: + """Process a security event and add it to results.""" + # Convert event_time to ISO format + # Security events use nanoseconds since epoch + event_time = event_data.get("event_time") + if event_time: + # Convert nanoseconds to seconds + event_data["timestamp"] = self._localize_timestamp( + event_time / 1_000_000_000.0 + ) + else: + event_data["timestamp"] = None + + # Track event type statistics, including future tags unknown to MVT. + event_tag = self._get_event_tag(event_data) + if event_tag: + self.event_type_counts[event_tag] = ( + self.event_type_counts.get(event_tag, 0) + 1 + ) + + self.results.append(event_data) + + def run(self) -> None: + """Extract and analyze security events from intrusion logs.""" + if not self.target_path: + self.log.error("No target path specified") + return + + self.collect_txt(self.target_path) + self.parse_collected_txt("security_event") + + self.log.info("Identified %d security events", len(self.results)) + + # Log event type breakdown + if self.event_type_counts: + self.log.info("Security event breakdown:") + for event_type, count in sorted( + self.event_type_counts.items(), key=lambda x: x[1], reverse=True + ): + event_name = SECURITY_EVENT_TAGS.get(event_type, {}).get( + "name", event_type + ) + self.log.info(" - %s: %d", event_name, count) + + unknown_event_types = sorted( + event_type + for event_type in self.event_type_counts + if event_type not in SECURITY_EVENT_TAGS + ) + if unknown_event_types: + self.log.warning( + "Found unknown intrusion logging security event type(s): %s. " + "Please open an issue on GitHub so MVT can add support for them.", + ", ".join(unknown_event_types), + ) diff --git a/src/mvt/common/help.py b/src/mvt/common/help.py index fcee8a2..535a059 100644 --- a/src/mvt/common/help.py +++ b/src/mvt/common/help.py @@ -47,3 +47,4 @@ HELP_MSG_CHECK_ADB_REMOVED_DESCRIPTION = ( HELP_MSG_CHECK_BUGREPORT = "Check an Android Bug Report" HELP_MSG_CHECK_ANDROID_BACKUP = "Check an Android Backup" HELP_MSG_CHECK_ANDROIDQF = "Check data collected with AndroidQF" +HELP_MSG_CHECK_INTRUSION_LOGS = "Check Android Intrusion Logging files" diff --git a/tests/android/test_intrusion_logs.py b/tests/android/test_intrusion_logs.py new file mode 100644 index 0000000..6e8f253 --- /dev/null +++ b/tests/android/test_intrusion_logs.py @@ -0,0 +1,154 @@ +# Mobile Verification Toolkit (MVT) +# Copyright (c) 2021-2023 The MVT Authors. +# Use of this software is governed by the MVT License 1.1 that can be found at +# https://license.mvt.re/1.1/ + +import json +import logging + +from click.testing import CliRunner + +from mvt.android.cli import check_intrusion_logs +from mvt.android.cmd_check_intrusion_logs import CmdAndroidCheckIntrusionLogs +from mvt.android.modules.intrusion_logs.base import IntrusionLogsModule +from mvt.android.modules.intrusion_logs.security_event import SecurityEvent + + +def _write_ndjson(path, records): + path.write_text( + "\n".join(json.dumps(record) for record in records), + encoding="utf-8", + ) + + +def test_load_all_events_preserves_unknown_top_level_event(tmp_path): + _write_ndjson( + tmp_path / "intrusion.txt", + [ + { + "future_event": { + "event_time": 1_700_000_000_000, + "field": "value", + } + } + ], + ) + + module = IntrusionLogsModule(target_path=str(tmp_path)) + events = module.load_all_events(str(tmp_path)) + + assert events == { + "future_event": [ + { + "event_time": 1_700_000_000_000, + "field": "value", + } + ] + } + + +def test_check_intrusion_logs_warns_about_unknown_top_level_event_type( + tmp_path, caplog +): + _write_ndjson( + tmp_path / "intrusion.txt", + [ + { + "future_event": { + "event_time": 1_700_000_000_000, + "field": "value", + } + } + ], + ) + + with caplog.at_level(logging.WARNING): + cmd = CmdAndroidCheckIntrusionLogs(target_path=str(tmp_path)) + cmd.run() + + assert "Found unknown intrusion logging event type(s): future_event" in caplog.text + assert "Please open an issue on GitHub" in caplog.text + + +def test_check_intrusion_logs_parses_core_and_unknown_security_events( + tmp_path, caplog +): + _write_ndjson( + tmp_path / "intrusion.txt", + [ + { + "dns_event": { + "event_time": 1_700_000_000_000, + "hostname": "example.com", + "package_name": "com.example.app", + "ip_addresses": ["/1.2.3.4"], + } + }, + { + "connect_event": { + "event_time": 1_700_000_001_000, + "ip_address": "/5.6.7.8", + "port": 443, + "package_name": "com.example.app", + } + }, + { + "security_event": { + "event_time": 1_700_000_002_000_000_000, + "app_process_start": { + "process": "com.example.app", + "uid": 10_000, + "pid": 1234, + }, + } + }, + { + "security_event": { + "event_time": 1_700_000_003_000_000_000, + "future_google_event": { + "field": "value", + }, + } + }, + ], + ) + + with caplog.at_level(logging.WARNING): + cmd = CmdAndroidCheckIntrusionLogs(target_path=str(tmp_path)) + cmd.run() + + assert [module.__class__.__name__ for module in cmd.executed] == [ + "DnsEvent", + "ConnectEvent", + "SecurityEvent", + ] + assert [len(module.results) for module in cmd.executed] == [1, 1, 2] + + security_module = next( + module for module in cmd.executed if isinstance(module, SecurityEvent) + ) + assert security_module.event_type_counts["app_process_start"] == 1 + assert security_module.event_type_counts["future_google_event"] == 1 + + future_timeline_events = [ + event for event in cmd.timeline if event["event"] == "future_google_event" + ] + assert len(future_timeline_events) == 1 + assert "future_google_event" in future_timeline_events[0]["data"] + assert "field" in future_timeline_events[0]["data"] + assert ( + "Found unknown intrusion logging security event type(s): future_google_event" + in caplog.text + ) + assert "Please open an issue on GitHub" in caplog.text + + +def test_check_intrusion_logs_cli_lists_modules(tmp_path): + _write_ndjson(tmp_path / "intrusion.txt", []) + + result = CliRunner().invoke(check_intrusion_logs, ["--list-modules", str(tmp_path)]) + + assert result.exit_code == 0 + assert "DnsEvent" in result.output + assert "ConnectEvent" in result.output + assert "SecurityEvent" in result.output