Add Android intrusion log checks

This commit is contained in:
Janik Besendorf
2026-05-11 19:45:06 +02:00
parent c2b0b6db28
commit 758903811c
10 changed files with 1843 additions and 2 deletions
+75 -1
View File
@@ -16,6 +16,7 @@ from mvt.common.help import (
HELP_MSG_CHECK_ANDROIDQF,
HELP_MSG_CHECK_BUGREPORT,
HELP_MSG_CHECK_IOCS,
HELP_MSG_CHECK_INTRUSION_LOGS,
HELP_MSG_DISABLE_INDICATOR_UPDATE_CHECK,
HELP_MSG_DISABLE_UPDATE_CHECK,
HELP_MSG_HASHES,
@@ -35,6 +36,8 @@ from mvt.common.utils import init_logging, set_verbose_logging
from .cmd_check_androidqf import CmdAndroidCheckAndroidQF
from .cmd_check_backup import CmdAndroidCheckBackup
from .cmd_check_bugreport import CmdAndroidCheckBugreport
from .cmd_check_intrusion_logs import CmdAndroidCheckIntrusionLogs
from .modules.intrusion_logs import INTRUSION_LOGS_MODULES
from .modules.androidqf import ANDROIDQF_MODULES
from .modules.backup import BACKUP_MODULES
from .modules.backup.helpers import cli_load_android_backup_password
@@ -266,6 +269,75 @@ def check_androidqf(
cmd.show_support_message()
# ==============================================================================
# Command: check-intrusion-logs
# ==============================================================================
@cli.command(
"check-intrusion-logs",
context_settings=CONTEXT_SETTINGS,
help=HELP_MSG_CHECK_INTRUSION_LOGS,
)
@click.option(
"--iocs",
"-i",
type=click.Path(exists=True),
multiple=True,
default=[],
help=HELP_MSG_IOC,
)
@click.option("--output", "-o", type=click.Path(exists=False), help=HELP_MSG_OUTPUT)
@click.option("--list-modules", "-l", is_flag=True, help=HELP_MSG_LIST_MODULES)
@click.option("--module", "-m", help=HELP_MSG_MODULE)
@click.option(
"--timezone",
"-t",
default=None,
help=(
"IANA timezone name for the device, for example 'Europe/Paris'. "
"When provided, event timestamps are expressed in the device's local "
"time instead of UTC."
),
)
@click.option("--verbose", "-v", is_flag=True, help=HELP_MSG_VERBOSE)
@click.argument("LOGS_PATH", type=click.Path(exists=True))
@click.pass_context
def check_intrusion_logs(
ctx,
iocs,
output,
list_modules,
module,
timezone,
verbose,
logs_path,
):
set_verbose_logging(verbose)
module_options = {}
if timezone:
module_options["device_timezone"] = timezone
cmd = CmdAndroidCheckIntrusionLogs(
target_path=logs_path,
results_path=output,
ioc_files=iocs,
module_name=module,
module_options=module_options if module_options else None,
disable_version_check=_get_disable_flags(ctx)[0],
disable_indicator_check=_get_disable_flags(ctx)[1],
)
if list_modules:
cmd.list_modules()
return
log.info("Checking intrusion logs at path: %s", logs_path)
cmd.run()
cmd.show_alerts_brief()
cmd.show_support_message()
# ==============================================================================
# Command: check-iocs
# ==============================================================================
@@ -284,7 +356,9 @@ def check_androidqf(
@click.pass_context
def check_iocs(ctx, iocs, list_modules, module, folder):
cmd = CmdCheckIOCS(target_path=folder, ioc_files=iocs, module_name=module)
cmd.modules = BACKUP_MODULES + BUGREPORT_MODULES + ANDROIDQF_MODULES
cmd.modules = (
BACKUP_MODULES + BUGREPORT_MODULES + ANDROIDQF_MODULES + INTRUSION_LOGS_MODULES
)
if list_modules:
cmd.list_modules()
+130 -1
View File
@@ -5,10 +5,14 @@
import logging
import os
import shutil
import tempfile
import zipfile
from pathlib import Path
from typing import List, Optional
from mvt.android.artifacts.getprop import GetProp
from mvt.android.cmd_check_intrusion_logs import CmdAndroidCheckIntrusionLogs
from mvt.android.cmd_check_backup import CmdAndroidCheckBackup
from mvt.android.cmd_check_bugreport import CmdAndroidCheckBugreport
from mvt.common.command import Command
@@ -139,6 +143,55 @@ class CmdAndroidCheckAndroidQF(Command):
raise NoAndroidQFBackup
def _read_device_timezone(self) -> Optional[str]:
getprop_files = [
f for f in self.__files if f.replace("\\", "/").endswith("getprop.txt")
]
if not getprop_files:
self.log.warning(
"Could not find getprop.txt; intrusion log timestamps will use UTC."
)
return None
try:
content = self._get_file_content(getprop_files[0]).decode(
"utf-8", errors="ignore"
)
except Exception as exc:
self.log.warning("Could not read getprop.txt: %s", exc)
return None
props = GetProp()
props.parse(content)
timezone = props.get_device_timezone()
if timezone:
self.log.info(
"Device timezone identified from getprop.txt: %s",
timezone,
)
else:
self.log.warning(
"persist.sys.timezone not found in getprop.txt; "
"intrusion log timestamps will use UTC."
)
return timezone
def _get_file_content(self, file_path: str) -> bytes:
if self.__format == "zip" and self.__zip:
handle = self.__zip.open(file_path)
try:
return handle.read()
finally:
handle.close()
if self.__format == "dir" and self.target_path:
parent_path = Path(self.target_path).absolute().parent.as_posix()
with open(os.path.join(parent_path, file_path), "rb") as handle:
return handle.read()
raise FileNotFoundError(file_path)
def run_bugreport_cmd(self) -> bool:
bugreport = None
try:
@@ -194,9 +247,85 @@ class CmdAndroidCheckAndroidQF(Command):
self.alertstore.extend(cmd.alertstore.alerts)
return True
def run_intrusion_logs_cmd(self) -> bool:
intrusion_log_files = [
f
for f in self.__files
if "/intrusion-logs/" in f.replace("\\", "/")
or f.replace("\\", "/").startswith("intrusion-logs/")
]
if not intrusion_log_files:
self.log.info(
"No intrusion-logs found in AndroidQF data, "
"skipping intrusion logs analysis."
)
return False
self.log.info(
"Found intrusion-logs in AndroidQF data, running intrusion logs analysis."
)
intrusion_logs_path = None
temp_dir = None
try:
if self.__format == "dir" and self.target_path:
intrusion_logs_path = os.path.join(
os.path.abspath(self.target_path), "intrusion-logs"
)
if not os.path.isdir(intrusion_logs_path):
self.log.warning(
"intrusion-logs directory not found at %s",
intrusion_logs_path,
)
return False
elif self.__format == "zip" and self.__zip:
temp_dir = tempfile.mkdtemp(prefix="mvt_intrusion_logs_")
for entry in intrusion_log_files:
normalized = entry.replace("\\", "/")
idx = normalized.find("intrusion-logs/")
relative = normalized[idx + len("intrusion-logs/") :]
if not relative or relative.endswith("/"):
continue
target = os.path.join(temp_dir, relative)
os.makedirs(os.path.dirname(target), exist_ok=True)
with self.__zip.open(entry) as src, open(target, "wb") as dst:
dst.write(src.read())
intrusion_logs_path = temp_dir
else:
return False
adv_module_options = dict(self.module_options or {})
if device_timezone := self._read_device_timezone():
adv_module_options["device_timezone"] = device_timezone
cmd = CmdAndroidCheckIntrusionLogs(
target_path=intrusion_logs_path,
results_path=self.results_path,
ioc_files=self.ioc_files,
iocs=self.iocs,
module_options=adv_module_options,
hashes=self.hashes,
sub_command=True,
)
cmd.run()
self.timeline.extend(cmd.timeline)
self.alertstore.extend(cmd.alertstore.alerts)
return True
finally:
if temp_dir:
shutil.rmtree(temp_dir, ignore_errors=True)
def finish(self) -> None:
"""
Run the bugreport and backup modules if the respective files are found in the AndroidQF data.
Run nested modules if their respective files are found in AndroidQF data.
"""
self.run_bugreport_cmd()
self.run_backup_cmd()
self.run_intrusion_logs_cmd()
@@ -0,0 +1,98 @@
# Mobile Verification Toolkit (MVT)
# Copyright (c) 2021-2023 The MVT Authors.
# Use of this software is governed by the MVT License 1.1 that can be found at
# https://license.mvt.re/1.1/
import logging
import os
from typing import Optional
from mvt.common.command import Command
from mvt.common.indicators import Indicators
from .modules.intrusion_logs import INTRUSION_LOGS_MODULES
from .modules.intrusion_logs.base import IntrusionLogsModule
log = logging.getLogger(__name__)
class CmdAndroidCheckIntrusionLogs(Command):
"""Command to check Android Intrusion Logging files."""
def __init__(
self,
target_path: Optional[str] = None,
results_path: Optional[str] = None,
ioc_files: Optional[list] = None,
iocs: Optional[Indicators] = None,
module_name: Optional[str] = None,
serial: Optional[str] = None,
module_options: Optional[dict] = None,
hashes: Optional[bool] = False,
sub_command: Optional[bool] = False,
disable_version_check: bool = False,
disable_indicator_check: bool = False,
) -> None:
super().__init__(
target_path=target_path,
results_path=results_path,
ioc_files=ioc_files,
iocs=iocs,
module_name=module_name,
serial=serial,
module_options=module_options,
hashes=hashes,
sub_command=sub_command,
log=log,
disable_version_check=disable_version_check,
disable_indicator_check=disable_indicator_check,
)
self.name = "check-intrusion-logs"
self.modules = INTRUSION_LOGS_MODULES
self._all_events: dict[str, list[dict]] = {}
def init(self) -> None:
if not self.target_path:
raise ValueError("No target path specified")
if not os.path.isdir(self.target_path) and not (
os.path.isfile(self.target_path)
and self.target_path.lower().endswith(".zip")
):
raise ValueError(
f"Target path must be a directory or a .zip file: {self.target_path}"
)
self.log.info("Checking intrusion logs at path: %s", self.target_path)
self._all_events = self._pre_load_events()
def module_init(self, module: IntrusionLogsModule) -> None: # type: ignore[override]
module.il_events_by_type = self._all_events
def finish(self) -> None:
return
def _pre_load_events(self) -> dict[str, list[dict]]:
"""Load and parse all advanced-log files once for reuse by all modules."""
self.log.info("Pre-loading intrusion log files from: %s", self.target_path)
loader = IntrusionLogsModule(
target_path=self.target_path,
log=self.log,
)
try:
all_events = loader.load_all_events(self.target_path)
except Exception as exc:
self.log.error("Failed to pre-load events: %s", exc)
return {}
total_events = sum(len(events) for events in all_events.values())
self.log.info(
"Pre-loaded %d events across %d type(s); modules will reuse this data",
total_events,
len(all_events),
)
return all_events
@@ -0,0 +1,14 @@
# Mobile Verification Toolkit (MVT)
# Copyright (c) 2021-2023 The MVT Authors.
# Use of this software is governed by the MVT License 1.1 that can be found at
# https://license.mvt.re/1.1/
from .connect_event import ConnectEvent
from .dns_event import DnsEvent
from .security_event import SecurityEvent
INTRUSION_LOGS_MODULES = [
DnsEvent,
ConnectEvent,
SecurityEvent,
]
@@ -0,0 +1,395 @@
# Mobile Verification Toolkit (MVT)
# Copyright (c) 2021-2023 The MVT Authors.
# Use of this software is governed by the MVT License 1.1 that can be found at
# https://license.mvt.re/1.1/
import datetime
import io
import json
import logging
import zipfile
from pathlib import Path
from typing import Optional, Union
try:
import zoneinfo
except ImportError:
from backports import zoneinfo # type: ignore[no-redef]
from mvt.common.module import MVTModule
from mvt.common.utils import convert_datetime_to_iso, convert_unix_to_iso
class IntrusionLogsModule(MVTModule):
"""Base class for modules analyzing intrusion logs (newline-delimited JSON).
Performance note
----------------
Log files can be large and are shared by every module in this package.
To avoid re-reading and re-parsing the same files N times (once per
module), the command layer should call :meth:`load_all_events` exactly
once and then assign the returned dict to the ``il_events_by_type``
attribute of every module instance **before** calling ``run_module``.
When ``il_events_by_type`` is populated:
* :meth:`collect_txt` becomes a no-op (no disk I/O).
* :meth:`parse_collected_txt` iterates the in-memory list for the
requested event type instead of re-parsing raw text.
Modules that are used standalone (e.g. in tests) still work as before
because ``il_events_by_type`` defaults to ``None``, which preserves the
original file-loading code path.
"""
def __init__(
self,
file_path: Optional[str] = None,
target_path: Optional[str] = None,
results_path: Optional[str] = None,
module_options: Optional[dict] = None,
log: logging.Logger = logging.getLogger(__name__),
results: Optional[list] = None,
) -> None:
super().__init__(
file_path=file_path,
target_path=target_path,
results_path=results_path,
module_options=module_options,
log=log,
results=results,
)
# Raw file content collected by collect_txt (fallback path only).
self.il_files: list[tuple[str, str]] = []
# Pre-parsed events injected by the command layer.
# Keys are event-type strings (e.g. "dns_event"), values are lists of
# raw event-data dicts exactly as they appear in the JSON lines.
# When this is not None, collect_txt and parse_collected_txt use it
# instead of touching the file system.
self.il_events_by_type: Optional[dict[str, list[dict]]] = None
# ------------------------------------------------------------------
# Serialization helper
# ------------------------------------------------------------------
def serialize(self, record: dict) -> Union[dict, list]:
"""Serialize a record for timeline output."""
return {
"timestamp": record.get("timestamp", record.get("isodate")),
"module": self.__class__.__name__,
"event": record.get("event_type", ""),
"data": str(record),
}
# ------------------------------------------------------------------
# File collection
# ------------------------------------------------------------------
def collect_txt(self, source) -> None:
"""Collect text log files from *source* into ``self.il_files``.
Entry points:
* directory → walk recursively
* zip file → walk zip entries
* anything else → silently skip
If ``self.il_events_by_type`` has already been populated (i.e. the
command layer pre-loaded the events), this method returns immediately
without any disk I/O.
"""
if self.il_events_by_type is not None:
self.log.debug(
"Pre-loaded events available — skipping file collection for %s",
self.__class__.__name__,
)
return
path = Path(source)
if path.is_dir():
self._walk_directory(path)
return
if path.is_file() and path.suffix.lower() == ".zip":
try:
with zipfile.ZipFile(path) as z:
self._walk_zip(z)
except zipfile.BadZipFile:
self.log.debug("Skipping invalid zip: %s", path)
return
self.log.debug("Skipping unsupported source: %s", source)
def _walk_directory(self, root: Path, prefix: str = "") -> None:
for item in root.iterdir():
if item.is_dir():
self._walk_directory(item, prefix=f"{prefix}{item.name}/")
continue
if item.suffix.lower() == ".txt":
self.il_files.append(
(f"{prefix}{item.name}", item.read_text(errors="ignore"))
)
elif item.suffix.lower() == ".zip":
try:
with zipfile.ZipFile(item) as z:
self._walk_zip(z, prefix=f"{prefix}{item.name}::")
except zipfile.BadZipFile:
self.log.warning("Skipping invalid zip: %s", item)
def _walk_zip(self, zf: zipfile.ZipFile, prefix: str = "") -> None:
for info in zf.infolist():
if info.is_dir():
continue
name = info.filename
with zf.open(info) as f:
data = f.read()
if name.lower().endswith(".txt"):
self.il_files.append((f"{prefix}{name}", data.decode(errors="ignore")))
elif name.lower().endswith(".zip"):
with zipfile.ZipFile(io.BytesIO(data)) as inner:
self._walk_zip(inner, prefix=f"{prefix}{name}::")
# ------------------------------------------------------------------
# Single-pass loader (used by the command layer)
# ------------------------------------------------------------------
def load_all_events(self, source) -> dict[str, list[dict]]:
"""Read every log file under *source* **once** and parse all JSON
lines in a single pass, routing events into per-type buckets.
Returns a ``dict`` mapping *event_type* strings to lists of raw
event-data dicts. The result is also stored in
``self.il_events_by_type`` so that subsequent calls to
:meth:`collect_txt` and :meth:`parse_collected_txt` on *this*
instance are no-ops.
Intended usage in the command layer::
loader = IntrusionLogsModule(target_path=target, log=log)
all_events = loader.load_all_events(target)
for module_cls in INTRUSION_LOGS_MODULES:
m = module_cls(target_path=target, ...)
m.il_events_by_type = all_events # inject — no re-reading
run_module(m)
"""
# Reset so that _collect_txt actually runs (il_events_by_type is None).
self.il_events_by_type = None
self.il_files = []
self.collect_txt(source)
events_by_type: dict[str, list[dict]] = {}
# JSON fingerprints used to drop events that appear in more than one
# log file (overlapping daily files are the most common source of
# cross-file duplicates).
seen_fingerprints: set[str] = set()
total_lines = 0
skipped_lines = 0
duplicate_lines = 0
for file_name, text in self.il_files:
for line_num, line in enumerate(text.splitlines(), start=1):
line = line.strip()
if not line:
continue
total_lines += 1
try:
entry = json.loads(line)
for event_type, event_data in entry.items():
if isinstance(event_data, dict):
fingerprint = json.dumps(event_data, sort_keys=True)
if fingerprint in seen_fingerprints:
duplicate_lines += 1
continue
seen_fingerprints.add(fingerprint)
events_by_type.setdefault(event_type, []).append(event_data)
except json.JSONDecodeError as e:
skipped_lines += 1
self.log.warning(
"Failed to parse JSON on line %d in %s: %s",
line_num,
file_name,
e,
)
except Exception as e:
skipped_lines += 1
self.log.warning(
"Error processing line %d in %s: %s",
line_num,
file_name,
e,
)
if duplicate_lines:
self.log.info(
"Removed %d duplicate event(s) seen across multiple log files",
duplicate_lines,
)
self.log.info(
"Loaded %d log files, parsed %d lines (%d skipped), found event types: %s",
len(self.il_files),
total_lines,
skipped_lines,
{k: len(v) for k, v in events_by_type.items()},
)
# Cache so this instance also benefits from the fast path.
self.il_events_by_type = events_by_type
return events_by_type
# ------------------------------------------------------------------
# Parsing
# ------------------------------------------------------------------
def parse_collected_txt(self, event_type: str) -> None:
"""Parse collected log text and dispatch events of *event_type*.
Fast path
~~~~~~~~~
When ``self.il_events_by_type`` is populated (injected by the command
layer after a single shared :meth:`load_all_events` call), the method
iterates the already-parsed in-memory list for *event_type* — no
re-reading, no re-parsing of JSON.
Fallback path
~~~~~~~~~~~~~
When ``self.il_events_by_type`` is ``None``, the method falls back to
iterating ``self.il_files`` and parsing each JSON line, which is the
original behaviour.
"""
if self.il_events_by_type is not None:
events = self.il_events_by_type.get(event_type, [])
self.log.debug(
"Using pre-loaded events: dispatching %d '%s' events",
len(events),
event_type,
)
for event_data in events:
try:
# Work on a shallow copy so that mutations in one module
# (e.g. adding "timestamp") do not affect other modules
# that share the same dict reference.
self.process_event(dict(event_data))
except Exception as e:
self.log.warning(
"Error processing pre-parsed '%s' event: %s",
event_type,
e,
)
return
# Fallback: parse raw text collected by collect_txt().
# Use the same JSON-fingerprint approach as MVTModule._deduplicate_timeline
# to drop events that appear verbatim in more than one log file.
seen_fingerprints: set[str] = set()
duplicate_count = 0
for file_name, text in self.il_files:
for line_num, line in enumerate(text.splitlines(), start=1):
line = line.strip()
if not line:
continue
try:
entry = json.loads(line)
if event_type in entry:
event_data = entry[event_type]
fingerprint = json.dumps(event_data, sort_keys=True)
if fingerprint in seen_fingerprints:
duplicate_count += 1
continue
seen_fingerprints.add(fingerprint)
event_data["event_type"] = event_type
self.process_event(event_data)
except json.JSONDecodeError as e:
self.log.warning(
"Failed to parse JSON on line %d in %s: %s",
line_num,
file_name,
str(e),
)
except Exception as e:
self.log.warning(
"Error processing line %d in %s: %s",
line_num,
file_name,
str(e),
)
if duplicate_count:
self.log.info(
"Removed %d duplicate '%s' event(s) seen across multiple log files",
duplicate_count,
event_type,
)
# ------------------------------------------------------------------
# Event processing
# ------------------------------------------------------------------
def process_event(self, event_data: dict) -> None:
"""Process an individual event. Override this in subclasses.
Args:
event_data: Dictionary containing the event data.
"""
self.results.append(event_data)
# ------------------------------------------------------------------
# Timestamp localisation
# ------------------------------------------------------------------
def _localize_timestamp(self, event_time_seconds: float) -> str:
"""Convert a Unix timestamp (in seconds) to an ISO string.
When the device timezone is available via ``module_options["device_timezone"]``
(a IANA timezone name such as ``"Europe/Paris"`` read from
``persist.sys.timezone`` in ``getprop.txt``), the UTC instant is
converted to the device's local time before formatting — mirroring the
approach used by ``AQFFiles``.
When no timezone is configured the method falls back to UTC, which is
consistent with all other MVT modules that call ``convert_unix_to_iso``.
Args:
event_time_seconds: Unix epoch timestamp expressed in **seconds**
(callers are responsible for dividing ms/ns values first).
Returns:
ISO-formatted datetime string (``YYYY-mm-dd HH:MM:SS.ffffff``).
The string always represents the device-local time (or UTC when no
timezone is known); no UTC offset suffix is appended, matching the
format produced by :func:`mvt.common.utils.convert_unix_to_iso`.
"""
tz_name: Optional[str] = self.module_options.get("device_timezone")
if tz_name:
try:
device_tz = zoneinfo.ZoneInfo(tz_name)
utc_dt = datetime.datetime.fromtimestamp(
event_time_seconds, tz=datetime.timezone.utc
)
local_dt = utc_dt.astimezone(device_tz)
# Strip tzinfo so that convert_datetime_to_iso outputs the
# local wall-clock time without a timezone suffix. This is
# the same pattern used by AQFFiles.
return convert_datetime_to_iso(local_dt.replace(tzinfo=None))
except Exception as e:
self.log.warning(
"Could not apply device timezone '%s', falling back to UTC: %s",
tz_name,
e,
)
return convert_unix_to_iso(event_time_seconds)
# ------------------------------------------------------------------
# Abstract interface
# ------------------------------------------------------------------
def run(self) -> None:
"""Main execution method. Must be implemented by subclasses."""
raise NotImplementedError("Subclasses must implement the run() method")
@@ -0,0 +1,121 @@
# Mobile Verification Toolkit (MVT)
# Copyright (c) 2021-2023 The MVT Authors.
# Use of this software is governed by the MVT License 1.1 that can be found at
# https://license.mvt.re/1.1/
import logging
from typing import Optional, Union
from .base import IntrusionLogsModule
class ConnectEvent(IntrusionLogsModule):
"""This module analyzes network connection events from intrusion logs."""
def __init__(
self,
file_path: Optional[str] = None,
target_path: Optional[str] = None,
results_path: Optional[str] = None,
module_options: Optional[dict] = None,
log: logging.Logger = logging.getLogger(__name__),
results: Optional[list] = None,
) -> None:
super().__init__(
file_path=file_path,
target_path=target_path,
results_path=results_path,
module_options=module_options,
log=log,
results=results,
)
def check_indicators(self) -> None:
"""Check connection events against indicators of compromise."""
if not self.indicators:
return
for result in self.results:
# Check IP address against indicators
ip_address = result.get("ip_address", "")
if ip_address:
# Clean IP address (remove leading slash and extract IP from format like "ip6-localhost/::1")
if "/" in ip_address:
parts = ip_address.split("/")
clean_ip = parts[-1] if len(parts) > 1 else parts[0]
else:
clean_ip = ip_address.lstrip("/")
# Skip localhost addresses
if clean_ip and clean_ip not in ["::1", "127.0.0.1", "0.0.0.0"]:
ioc = self.indicators.check_domain(clean_ip)
if ioc:
result["matched_ip"] = clean_ip
self.alertstore.critical(
ioc.message,
result.get("timestamp") or "",
result,
matched_indicator=ioc.ioc,
)
# Check package name against app identifiers
package_name = result.get("package_name", "")
if package_name:
ioc = self.indicators.check_app_id(package_name)
if ioc:
self.alertstore.critical(
ioc.message,
result.get("timestamp") or "",
result,
matched_indicator=ioc.ioc,
)
def serialize(self, record: dict) -> Union[dict, list]:
"""Serialize a connection event record for timeline output."""
ip_address = record.get("ip_address", "")
port = record.get("port", 0)
package_name = record.get("package_name", "")
matched_ip = record.get("matched_ip", "")
# Clean IP address for display
if "/" in ip_address:
parts = ip_address.split("/")
clean_ip = parts[-1] if len(parts) > 1 else parts[0]
else:
clean_ip = ip_address.lstrip("/")
# Indicate when IP matched an IoC
if matched_ip:
data = f"Connection to {clean_ip}:{port} by {package_name} [Matched IP: {matched_ip}]"
else:
data = f"Connection to {clean_ip}:{port} by {package_name}"
return {
"timestamp": record.get("timestamp"),
"module": self.__class__.__name__,
"event": "network_connection",
"data": data,
}
def process_event(self, event_data: dict) -> None:
"""Process a connection event and add it to results."""
# Convert event_time from milliseconds to ISO format
event_time = event_data.get("event_time")
if event_time:
# Android event times are in milliseconds since epoch
event_data["timestamp"] = self._localize_timestamp(event_time / 1000.0)
else:
event_data["timestamp"] = None
self.results.append(event_data)
def run(self) -> None:
"""Extract and analyze connection events from intrusion logs."""
if not self.target_path:
self.log.error("No target path specified")
return
self.collect_txt(self.target_path)
self.parse_collected_txt("connect_event")
self.log.info("Identified %d connection events", len(self.results))
@@ -0,0 +1,141 @@
# Mobile Verification Toolkit (MVT)
# Copyright (c) 2021-2023 The MVT Authors.
# Use of this software is governed by the MVT License 1.1 that can be found at
# https://license.mvt.re/1.1/
import logging
from typing import Optional, Union
from .base import IntrusionLogsModule
class DnsEvent(IntrusionLogsModule):
"""This module analyzes DNS events from intrusion logs."""
def __init__(
self,
file_path: Optional[str] = None,
target_path: Optional[str] = None,
results_path: Optional[str] = None,
module_options: Optional[dict] = None,
log: logging.Logger = logging.getLogger(__name__),
results: Optional[list] = None,
) -> None:
super().__init__(
file_path=file_path,
target_path=target_path,
results_path=results_path,
module_options=module_options,
log=log,
results=results,
)
def check_indicators(self) -> None:
"""Check DNS events against indicators of compromise."""
if not self.indicators:
return
for result in self.results:
# Check hostname against domain indicators
hostname = result.get("hostname", "")
if hostname:
ioc = self.indicators.check_domain(hostname)
if ioc:
self.alertstore.critical(
ioc.message,
result.get("timestamp") or "",
result,
matched_indicator=ioc.ioc,
)
# Check IP addresses against indicators
ip_addresses = result.get("ip_addresses", [])
matched_ips = []
for ip_addr in ip_addresses:
# Remove leading slash if present
clean_ip = (
ip_addr.lstrip("/") if isinstance(ip_addr, str) else str(ip_addr)
)
if clean_ip and clean_ip != "0.0.0.0":
ioc = self.indicators.check_domain(clean_ip)
if ioc:
matched_ips.append(clean_ip)
self.alertstore.critical(
ioc.message,
result.get("timestamp") or "",
result,
matched_indicator=ioc.ioc,
)
# Store matched IPs for timeline display
if matched_ips:
result["matched_ips"] = matched_ips
# Check package name against app identifiers
package_name = result.get("package_name", "")
if package_name:
ioc = self.indicators.check_app_id(package_name)
if ioc:
self.alertstore.critical(
ioc.message,
result.get("timestamp") or "",
result,
matched_indicator=ioc.ioc,
)
def serialize(self, record: dict) -> Union[dict, list]:
"""Serialize a DNS event record for timeline output."""
hostname = record.get("hostname", "")
package_name = record.get("package_name", "")
# Get IP addresses for display
ip_addresses = record.get("ip_addresses", [])
matched_ips = record.get("matched_ips", [])
# Clean up IP addresses (remove leading slashes)
clean_ips = []
for ip_addr in ip_addresses:
clean_ip = ip_addr.lstrip("/") if isinstance(ip_addr, str) else str(ip_addr)
if clean_ip and clean_ip != "0.0.0.0":
clean_ips.append(clean_ip)
# Build the data string with actual IPs
if matched_ips:
# Highlight matched IPs in the output
ip_display = ", ".join(matched_ips)
data = f"DNS query for {hostname} by {package_name} [Matched IPs: {ip_display}]"
elif clean_ips:
ip_display = ", ".join(clean_ips)
data = f"DNS query for {hostname} by {package_name} [IPs: {ip_display}]"
else:
data = f"DNS query for {hostname} by {package_name}"
return {
"timestamp": record.get("timestamp"),
"module": self.__class__.__name__,
"event": "dns_query",
"data": data,
}
def process_event(self, event_data: dict) -> None:
"""Process a DNS event and add it to results."""
# Convert event_time from milliseconds to ISO format
event_time = event_data.get("event_time")
if event_time:
# Android event times are in milliseconds since epoch
event_data["timestamp"] = self._localize_timestamp(event_time / 1000.0)
else:
event_data["timestamp"] = None
self.results.append(event_data)
def run(self) -> None:
"""Extract and analyze DNS events from intrusion logs."""
if not self.target_path:
self.log.error("No target path specified")
return
self.collect_txt(self.target_path)
self.parse_collected_txt("dns_event")
self.log.info("Identified %d DNS events", len(self.results))
@@ -0,0 +1,746 @@
# Mobile Verification Toolkit (MVT)
# Copyright (c) 2021-2023 The MVT Authors.
# Use of this software is governed by the MVT License 1.1 that can be found at
# https://license.mvt.re/1.1/
import logging
from typing import Optional, Union
from .base import IntrusionLogsModule
# Security event tags based on Android SecurityLog API
# Reference: https://developer.android.com/reference/android/app/admin/SecurityLog
SECURITY_EVENT_TAGS = {
# ADB events (API level 24)
"adb_shell_interactive": {
"tag_id": 210001,
"name": "ADB Shell Interactive",
"description": "An ADB interactive shell was opened via 'adb shell'",
},
"adb_shell_cmd": {
"tag_id": 210002,
"name": "ADB Shell Command",
"description": "A shell command was issued over ADB via 'adb shell <command>'",
},
"adb_sync_recv_file": {
"tag_id": 210003,
"name": "ADB Sync Recv File",
"description": "A file was pulled from the device via adb daemon (adb pull)",
},
"adb_sync_send_file": {
"tag_id": 210004,
"name": "ADB Sync Send File",
"description": "A file was pushed to the device via adb daemon (adb push)",
},
# App process events (API level 24)
"app_process_start": {
"tag_id": 210005,
"name": "App Process Start",
"description": "An app process was started",
},
# Keyguard events (API level 24)
"keyguard_dismissed": {
"tag_id": 210006,
"name": "Keyguard Dismissed",
"description": "Keyguard has been dismissed",
},
"keyguard_dismiss_auth_attempt": {
"tag_id": 210007,
"name": "Keyguard Dismiss Auth Attempt",
"description": "Authentication attempt to dismiss keyguard",
},
"keyguard_secured": {
"tag_id": 210008,
"name": "Keyguard Secured",
"description": "Device has been locked",
},
# OS events (API level 28)
"os_startup": {
"tag_id": 210009,
"name": "OS Startup",
"description": "Android OS has started",
},
"os_shutdown": {
"tag_id": 210010,
"name": "OS Shutdown",
"description": "Android OS has shutdown",
},
# Logging events (API level 28)
"logging_started": {
"tag_id": 210011,
"name": "Logging Started",
"description": "Audit logging has started",
},
"logging_stopped": {
"tag_id": 210012,
"name": "Logging Stopped",
"description": "Audit logging has stopped",
},
# Media events (API level 28)
"media_mount": {
"tag_id": 210013,
"name": "Media Mount",
"description": "Removable media has been mounted",
},
"media_unmount": {
"tag_id": 210014,
"name": "Media Unmount",
"description": "Removable media was unmounted",
},
# Log buffer event (API level 28)
"log_buffer_size_critical": {
"tag_id": 210015,
"name": "Log Buffer Size Critical",
"description": "Audit log buffer has reached 90% capacity",
},
# Password policy events (API level 28)
"password_expiration_set": {
"tag_id": 210016,
"name": "Password Expiration Set",
"description": "Admin set password expiration timeout",
},
"password_complexity_set": {
"tag_id": 210017,
"name": "Password Complexity Set",
"description": "Admin set password complexity requirement",
},
"password_history_length_set": {
"tag_id": 210018,
"name": "Password History Length Set",
"description": "Admin set password history length",
},
"max_screen_lock_timeout_set": {
"tag_id": 210019,
"name": "Max Screen Lock Timeout Set",
"description": "Admin set maximum screen lock timeout",
},
"max_password_attempts_set": {
"tag_id": 210020,
"name": "Max Password Attempts Set",
"description": "Admin set maximum failed password attempts before wipe",
},
"keyguard_disabled_features_set": {
"tag_id": 210021,
"name": "Keyguard Disabled Features Set",
"description": "Admin set disabled keyguard features",
},
# Remote lock event (API level 28)
"remote_lock": {
"tag_id": 210022,
"name": "Remote Lock",
"description": "Admin remotely locked the device or profile",
},
# Wipe failure event (API level 28)
"wipe_failure": {
"tag_id": 210023,
"name": "Wipe Failure",
"description": "Failed to wipe device or user data",
},
# Cryptographic key events (API level 28)
"key_generated": {
"tag_id": 210024,
"name": "Key Generated",
"description": "Cryptographic key was generated",
},
"key_import": {
"tag_id": 210025,
"name": "Key Import",
"description": "Cryptographic key was imported",
},
"key_destruction": {
"tag_id": 210026,
"name": "Key Destruction",
"description": "Cryptographic key was destroyed",
},
# User restriction events (API level 28)
"user_restriction_added": {
"tag_id": 210027,
"name": "User Restriction Added",
"description": "Admin added a user restriction",
},
"user_restriction_removed": {
"tag_id": 210028,
"name": "User Restriction Removed",
"description": "Admin removed a user restriction",
},
# Certificate events (API level 28)
"cert_authority_installed": {
"tag_id": 210029,
"name": "Certificate Authority Installed",
"description": "Root certificate installed to trusted storage",
},
"cert_authority_removed": {
"tag_id": 210030,
"name": "Certificate Authority Removed",
"description": "Root certificate removed from trusted storage",
},
"crypto_self_test_completed": {
"tag_id": 210031,
"name": "Crypto Self Test Completed",
"description": "Cryptographic functionality self test completed",
},
"key_integrity_violation": {
"tag_id": 210032,
"name": "Key Integrity Violation",
"description": "Key integrity violation detected",
},
"cert_validation_failure": {
"tag_id": 210033,
"name": "Certificate Validation Failure",
"description": "X.509v3 certificate validation failed",
},
# Camera policy event (API level 30)
"camera_policy_set": {
"tag_id": 210034,
"name": "Camera Policy Set",
"description": "Admin set policy to disable camera",
},
# Password complexity events (API level 31/33)
"password_complexity_required": {
"tag_id": 210035,
"name": "Password Complexity Required",
"description": "Admin set password complexity requirement using predefined levels",
},
"password_changed": {
"tag_id": 210036,
"name": "Password Changed",
"description": "User changed their lockscreen password",
},
# WiFi events (API level 33)
"wifi_connection": {
"tag_id": 210037,
"name": "WiFi Connection",
"description": "Device attempted to connect to a managed WiFi network",
},
"wifi_disconnection": {
"tag_id": 210038,
"name": "WiFi Disconnection",
"description": "Device disconnected from a managed WiFi network",
},
# Bluetooth events (API level 33)
"bluetooth_connection": {
"tag_id": 210039,
"name": "Bluetooth Connection",
"description": "Device attempted to connect to a Bluetooth device",
},
"bluetooth_disconnection": {
"tag_id": 210040,
"name": "Bluetooth Disconnection",
"description": "Device disconnected from a Bluetooth device",
},
# Package events (API level 34)
"package_installed": {
"tag_id": 210041,
"name": "Package Installed",
"description": "Application package was installed",
},
"package_updated": {
"tag_id": 210042,
"name": "Package Updated",
"description": "Application package was updated",
},
"package_uninstalled": {
"tag_id": 210043,
"name": "Package Uninstalled",
"description": "Application package was uninstalled",
},
# Backup service event (API level 35)
"backup_service_toggled": {
"tag_id": 210044,
"name": "Backup Service Toggled",
"description": "Admin enabled or disabled backup service",
},
# NFC events (API level 36)
"nfc_enabled": {
"tag_id": 210045,
"name": "NFC Enabled",
"description": "NFC service is enabled",
},
"nfc_disabled": {
"tag_id": 210046,
"name": "NFC Disabled",
"description": "NFC service is disabled",
},
}
SECURITY_EVENT_METADATA_KEYS = {
"event_time",
"event_type",
"timestamp",
}
class SecurityEvent(IntrusionLogsModule):
"""This module analyzes security events from intrusion logs."""
def __init__(
self,
file_path: Optional[str] = None,
target_path: Optional[str] = None,
results_path: Optional[str] = None,
module_options: Optional[dict] = None,
log: logging.Logger = logging.getLogger(__name__),
results: Optional[list] = None,
) -> None:
super().__init__(
file_path=file_path,
target_path=target_path,
results_path=results_path,
module_options=module_options,
log=log,
results=results,
)
self.event_type_counts: dict[str, int] = {}
def _get_event_tag(self, event_data: dict) -> Optional[str]:
"""Return the security-event tag key, including tags unknown to MVT."""
for key in event_data:
if key not in SECURITY_EVENT_METADATA_KEYS:
return key
return None
def check_indicators(self) -> None:
"""Check security events against indicators of compromise."""
if not self.indicators:
return
for result in self.results:
# Check app process start events for suspicious package names
if "app_process_start" in result:
process_info = result["app_process_start"]
process_name = process_info.get("process", "")
if process_name:
# Check the full process name
ioc = self.indicators.check_app_id(process_name)
if ioc:
self.alertstore.critical(
ioc.message,
result.get("timestamp") or "",
result,
matched_indicator=ioc.ioc,
)
# Also check process components after the first colon
# Example: "com.google.android.webview:sandboxed_process0:org.chromium.content.app.SandboxedProcessService0:0"
# We want to check "sandboxed_process0" and subsequent components
if ":" in process_name:
components = process_name.split(":")
for component in components[
1:
]: # Skip the first component (main package name)
if component:
ioc = self.indicators.check_app_id(component)
if ioc:
self.alertstore.critical(
ioc.message,
result.get("timestamp") or "",
result,
matched_indicator=ioc.ioc,
)
break
# Check package operations for suspicious packages
for pkg_event in [
"package_installed",
"package_updated",
"package_uninstalled",
]:
if pkg_event in result:
pkg_info = result[pkg_event]
pkg_name = pkg_info.get("package_name", "")
if pkg_name:
ioc = self.indicators.check_app_id(pkg_name)
if ioc:
self.alertstore.critical(
ioc.message,
result.get("timestamp") or "",
result,
matched_indicator=ioc.ioc,
)
# Check ADB shell commands for suspicious patterns
if "adb_shell_cmd" in result:
cmd_info = result["adb_shell_cmd"]
command = cmd_info.get("command", "")
if command:
# Check if command contains any suspicious app IDs
ioc = self.indicators.check_app_id(command)
if ioc:
self.alertstore.critical(
ioc.message,
result.get("timestamp") or "",
result,
matched_indicator=ioc.ioc,
)
# Check ADB file sync events for suspicious paths
for adb_event in ["adb_sync_recv_file", "adb_sync_send_file"]:
if adb_event in result:
file_info = result[adb_event]
file_path = file_info.get("path", "")
if file_path:
ioc = self.indicators.check_file_path(file_path)
if ioc:
self.alertstore.critical(
ioc.message,
result.get("timestamp") or "",
result,
matched_indicator=ioc.ioc,
)
# Flag failed cryptographic operations as potentially suspicious
if "key_generated" in result:
if not result["key_generated"].get("success", True):
self.log.warning(
"Failed key generation detected for key_id: %s",
result["key_generated"].get("key_id", "unknown"),
)
# Flag certificate validation failures
if "cert_validation_failure" in result:
self.log.warning(
"Certificate validation failure detected: %s",
result.get("cert_validation_failure"),
)
# Flag key integrity violations
if "key_integrity_violation" in result:
self.alertstore.medium(
f"Key integrity violation detected: {result.get('key_integrity_violation')}",
result.get("timestamp") or "",
result,
)
# Flag certificate authority installations (potential MITM)
if "cert_authority_installed" in result:
cert_info = result["cert_authority_installed"]
self.log.warning(
"Certificate authority installed: %s (success: %s)",
cert_info.get("subject", "unknown"),
cert_info.get("success", "unknown"),
)
# Flag wipe failures
if "wipe_failure" in result:
self.alertstore.medium(
"Device wipe failure detected",
result.get("timestamp") or "",
result,
)
# Flag crypto self test failures
if "crypto_self_test_completed" in result:
test_result = result["crypto_self_test_completed"]
if isinstance(test_result, dict):
success = test_result.get("success", True)
else:
success = test_result == 1
if not success:
self.alertstore.medium(
"Cryptographic self test failed",
result.get("timestamp") or "",
result,
)
def serialize(self, record: dict) -> Union[dict, list]:
"""Serialize a security event record for timeline output."""
# Determine the event sub-type
event_subtype = None
event_data_str = ""
event_subtype = self._get_event_tag(record)
if event_subtype:
event_info = record[event_subtype]
if event_subtype in SECURITY_EVENT_TAGS:
# ADB events
if event_subtype == "adb_shell_interactive":
event_data_str = "ADB interactive shell opened"
elif event_subtype == "adb_shell_cmd":
command = event_info.get("command", "")
event_data_str = f"ADB shell command: {command}"
elif event_subtype == "adb_sync_recv_file":
path = event_info.get("path", "")
event_data_str = f"File pulled via ADB: {path}"
elif event_subtype == "adb_sync_send_file":
path = event_info.get("path", "")
event_data_str = f"File pushed via ADB: {path}"
# App process events
elif event_subtype == "app_process_start":
process_name = event_info.get("process", "")
uid = event_info.get("uid", "")
pid = event_info.get("pid", "")
event_data_str = (
f"Process started: {process_name} (UID: {uid}, PID: {pid})"
)
# Keyguard events
elif event_subtype == "keyguard_dismiss_auth_attempt":
success = event_info.get("success", False)
method = event_info.get("method_strength", 0)
event_data_str = f"Auth attempt: {'Success' if success else 'Failed'} (method strength: {method})"
elif event_subtype == "keyguard_dismissed":
event_data_str = "Keyguard dismissed"
elif event_subtype == "keyguard_secured":
event_data_str = "Device locked"
elif event_subtype == "keyguard_disabled_features_set":
admin = event_info.get("admin_package", "")
features = event_info.get("disabled_features", "")
event_data_str = (
f"Keyguard features disabled by {admin}: {features}"
)
# Key events
elif event_subtype == "key_generated":
success = event_info.get("success", False)
key_id = event_info.get("key_id", "unknown")
uid = event_info.get("uid", "")
event_data_str = f"Key {'generated' if success else 'generation failed'}: {key_id} (UID: {uid})"
elif event_subtype == "key_destruction":
success = event_info.get("success", False)
key_id = event_info.get("key_id", "unknown")
uid = event_info.get("uid", "")
event_data_str = f"Key {'destroyed' if success else 'destruction failed'}: {key_id} (UID: {uid})"
elif event_subtype == "key_import":
success = event_info.get("success", False)
key_id = event_info.get("key_id", "unknown")
event_data_str = (
f"Key {'imported' if success else 'import failed'}: {key_id}"
)
elif event_subtype == "key_integrity_violation":
key_id = event_info.get("key_id", "unknown")
event_data_str = f"Key integrity violation: {key_id}"
# Certificate events
elif event_subtype == "cert_authority_installed":
success = event_info.get("success", False)
subject = event_info.get("subject", "unknown")
event_data_str = f"Cert {'installed' if success else 'install failed'}: {subject}"
elif event_subtype == "cert_authority_removed":
success = event_info.get("success", False)
subject = event_info.get("subject", "unknown")
event_data_str = (
f"Cert {'removed' if success else 'removal failed'}: {subject}"
)
elif event_subtype == "cert_validation_failure":
reason = (
event_info if isinstance(event_info, str) else str(event_info)
)
event_data_str = f"Certificate validation failure: {reason}"
elif event_subtype == "crypto_self_test_completed":
if isinstance(event_info, dict):
success = event_info.get("success", False)
else:
success = event_info == 1
event_data_str = (
f"Crypto self test: {'passed' if success else 'FAILED'}"
)
# Package events
elif event_subtype in [
"package_installed",
"package_updated",
"package_uninstalled",
]:
pkg_name = event_info.get("package_name", "")
version = event_info.get("version_code", "")
user_id = event_info.get("user_id", "")
action = event_subtype.replace("package_", "").title()
event_data_str = (
f"Package {action}: {pkg_name} (v{version}, user: {user_id})"
)
# OS events
elif event_subtype == "os_startup":
verified_boot = event_info.get("verified_boot_state", "")
dm_verity = event_info.get("dm_verity_mode", "")
event_data_str = f"OS startup (verified boot: {verified_boot}, dm-verity: {dm_verity})"
elif event_subtype == "os_shutdown":
event_data_str = "OS shutdown"
# Logging events
elif event_subtype == "logging_started":
event_data_str = "Audit logging started"
elif event_subtype == "logging_stopped":
event_data_str = "Audit logging stopped"
elif event_subtype == "log_buffer_size_critical":
event_data_str = "Log buffer at 90% capacity"
# Media events
elif event_subtype == "media_mount":
mount_point = event_info.get("mount_point", "")
label = event_info.get("volume_label", "")
event_data_str = f"Media mounted: {mount_point} ({label})"
elif event_subtype == "media_unmount":
mount_point = event_info.get("mount_point", "")
label = event_info.get("volume_label", "")
event_data_str = f"Media unmounted: {mount_point} ({label})"
# Password policy events
elif event_subtype == "password_expiration_set":
admin = event_info.get("admin_package", "")
timeout = event_info.get("timeout_ms", "")
event_data_str = f"Password expiration set by {admin}: {timeout}ms"
elif event_subtype == "password_complexity_set":
admin = event_info.get("admin_package", "")
event_data_str = f"Password complexity set by {admin}"
elif event_subtype == "password_complexity_required":
admin = event_info.get("admin_package", "")
complexity = event_info.get("complexity", "")
event_data_str = (
f"Password complexity required by {admin}: {complexity}"
)
elif event_subtype == "password_history_length_set":
admin = event_info.get("admin_package", "")
length = event_info.get("length", "")
event_data_str = f"Password history length set by {admin}: {length}"
elif event_subtype == "password_changed":
complexity = event_info.get("complexity", "")
user_id = event_info.get("user_id", "")
event_data_str = (
f"Password changed (complexity: {complexity}, user: {user_id})"
)
elif event_subtype == "max_screen_lock_timeout_set":
admin = event_info.get("admin_package", "")
timeout = event_info.get("timeout_ms", "")
event_data_str = (
f"Max screen lock timeout set by {admin}: {timeout}ms"
)
elif event_subtype == "max_password_attempts_set":
admin = event_info.get("admin_package", "")
attempts = event_info.get("max_attempts", "")
event_data_str = f"Max password attempts set by {admin}: {attempts}"
# Remote lock and wipe events
elif event_subtype == "remote_lock":
admin = event_info.get("admin_package", "")
event_data_str = f"Device remotely locked by {admin}"
elif event_subtype == "wipe_failure":
event_data_str = "Device wipe failed"
# User restriction events
elif event_subtype == "user_restriction_added":
admin = event_info.get("admin_package", "")
restriction = event_info.get("restriction", "")
event_data_str = f"User restriction added by {admin}: {restriction}"
elif event_subtype == "user_restriction_removed":
admin = event_info.get("admin_package", "")
restriction = event_info.get("restriction", "")
event_data_str = (
f"User restriction removed by {admin}: {restriction}"
)
# WiFi events
elif event_subtype == "wifi_connection":
bssid = event_info.get("bssid", "")
event_type = event_info.get("event_type", "")
reason = event_info.get("reason", "")
event_data_str = f"WiFi connection: {event_type} (BSSID: {bssid})"
if reason:
event_data_str += f" - {reason}"
elif event_subtype == "wifi_disconnection":
bssid = event_info.get("bssid", "")
reason = event_info.get("reason", "")
event_data_str = f"WiFi disconnection (BSSID: {bssid})"
if reason:
event_data_str += f" - {reason}"
# Bluetooth events
elif event_subtype == "bluetooth_connection":
mac = event_info.get("mac_address", "")
success = event_info.get("success", False)
reason = event_info.get("reason", "")
event_data_str = f"Bluetooth {'connected' if success else 'connection failed'}: {mac}"
if reason:
event_data_str += f" - {reason}"
elif event_subtype == "bluetooth_disconnection":
mac = event_info.get("mac_address", "")
reason = event_info.get("reason", "")
event_data_str = f"Bluetooth disconnected: {mac}"
if reason:
event_data_str += f" - {reason}"
# Camera policy event
elif event_subtype == "camera_policy_set":
admin = event_info.get("admin_package", "")
disabled = event_info.get("disabled", False)
event_data_str = (
f"Camera {'disabled' if disabled else 'enabled'} by {admin}"
)
# Backup service event
elif event_subtype == "backup_service_toggled":
admin = event_info.get("admin_package", "")
enabled = event_info.get("enabled", False)
event_data_str = f"Backup service {'enabled' if enabled else 'disabled'} by {admin}"
# NFC events
elif event_subtype == "nfc_enabled":
event_data_str = "NFC enabled"
elif event_subtype == "nfc_disabled":
event_data_str = "NFC disabled"
else:
event_data_str = (
f"{SECURITY_EVENT_TAGS.get(event_subtype, {}).get('name', event_subtype)}: "
f"{event_info}"
)
else:
event_data_str = f"{event_subtype}: {event_info}"
if not event_subtype:
event_subtype = "unknown"
event_data_str = str(record)
return {
"timestamp": record.get("timestamp"),
"module": self.__class__.__name__,
"event": event_subtype,
"data": event_data_str,
}
def process_event(self, event_data: dict) -> None:
"""Process a security event and add it to results."""
# Convert event_time to ISO format
# Security events use nanoseconds since epoch
event_time = event_data.get("event_time")
if event_time:
# Convert nanoseconds to seconds
event_data["timestamp"] = self._localize_timestamp(
event_time / 1_000_000_000.0
)
else:
event_data["timestamp"] = None
# Track event type statistics, including future tags unknown to MVT.
event_tag = self._get_event_tag(event_data)
if event_tag:
self.event_type_counts[event_tag] = (
self.event_type_counts.get(event_tag, 0) + 1
)
self.results.append(event_data)
def run(self) -> None:
"""Extract and analyze security events from intrusion logs."""
if not self.target_path:
self.log.error("No target path specified")
return
self.collect_txt(self.target_path)
self.parse_collected_txt("security_event")
self.log.info("Identified %d security events", len(self.results))
# Log event type breakdown
if self.event_type_counts:
self.log.info("Security event breakdown:")
for event_type, count in sorted(
self.event_type_counts.items(), key=lambda x: x[1], reverse=True
):
event_name = SECURITY_EVENT_TAGS.get(event_type, {}).get(
"name", event_type
)
self.log.info(" - %s: %d", event_name, count)
+1
View File
@@ -47,3 +47,4 @@ HELP_MSG_CHECK_ADB_REMOVED_DESCRIPTION = (
HELP_MSG_CHECK_BUGREPORT = "Check an Android Bug Report"
HELP_MSG_CHECK_ANDROID_BACKUP = "Check an Android Backup"
HELP_MSG_CHECK_ANDROIDQF = "Check data collected with AndroidQF"
HELP_MSG_CHECK_INTRUSION_LOGS = "Check Android Intrusion Logging files"
+122
View File
@@ -0,0 +1,122 @@
# Mobile Verification Toolkit (MVT)
# Copyright (c) 2021-2023 The MVT Authors.
# Use of this software is governed by the MVT License 1.1 that can be found at
# https://license.mvt.re/1.1/
import json
from click.testing import CliRunner
from mvt.android.cli import check_intrusion_logs
from mvt.android.cmd_check_intrusion_logs import CmdAndroidCheckIntrusionLogs
from mvt.android.modules.intrusion_logs.base import IntrusionLogsModule
from mvt.android.modules.intrusion_logs.security_event import SecurityEvent
def _write_ndjson(path, records):
path.write_text(
"\n".join(json.dumps(record) for record in records),
encoding="utf-8",
)
def test_load_all_events_preserves_unknown_top_level_event(tmp_path):
_write_ndjson(
tmp_path / "intrusion.txt",
[
{
"future_event": {
"event_time": 1_700_000_000_000,
"field": "value",
}
}
],
)
module = IntrusionLogsModule(target_path=str(tmp_path))
events = module.load_all_events(str(tmp_path))
assert events == {
"future_event": [
{
"event_time": 1_700_000_000_000,
"field": "value",
}
]
}
def test_check_intrusion_logs_parses_core_and_unknown_security_events(tmp_path):
_write_ndjson(
tmp_path / "intrusion.txt",
[
{
"dns_event": {
"event_time": 1_700_000_000_000,
"hostname": "example.com",
"package_name": "com.example.app",
"ip_addresses": ["/1.2.3.4"],
}
},
{
"connect_event": {
"event_time": 1_700_000_001_000,
"ip_address": "/5.6.7.8",
"port": 443,
"package_name": "com.example.app",
}
},
{
"security_event": {
"event_time": 1_700_000_002_000_000_000,
"app_process_start": {
"process": "com.example.app",
"uid": 10_000,
"pid": 1234,
},
}
},
{
"security_event": {
"event_time": 1_700_000_003_000_000_000,
"future_google_event": {
"field": "value",
},
}
},
],
)
cmd = CmdAndroidCheckIntrusionLogs(target_path=str(tmp_path))
cmd.run()
assert [module.__class__.__name__ for module in cmd.executed] == [
"DnsEvent",
"ConnectEvent",
"SecurityEvent",
]
assert [len(module.results) for module in cmd.executed] == [1, 1, 2]
security_module = next(
module for module in cmd.executed if isinstance(module, SecurityEvent)
)
assert security_module.event_type_counts["app_process_start"] == 1
assert security_module.event_type_counts["future_google_event"] == 1
future_timeline_events = [
event for event in cmd.timeline if event["event"] == "future_google_event"
]
assert len(future_timeline_events) == 1
assert "future_google_event" in future_timeline_events[0]["data"]
assert "field" in future_timeline_events[0]["data"]
def test_check_intrusion_logs_cli_lists_modules(tmp_path):
_write_ndjson(tmp_path / "intrusion.txt", [])
result = CliRunner().invoke(check_intrusion_logs, ["--list-modules", str(tmp_path)])
assert result.exit_code == 0
assert "DnsEvent" in result.output
assert "ConnectEvent" in result.output
assert "SecurityEvent" in result.output