Merge remote-tracking branch 'origin/main' into todos

# Conflicts:
#	src/mvt/android/cli.py
This commit is contained in:
Janik Besendorf
2026-06-05 20:01:41 +02:00
24 changed files with 2077 additions and 24 deletions
+82
View File
@@ -0,0 +1,82 @@
# Check Android Intrusion Logs
Recent versions of Android can produce structured *Intrusion Logs* — newline-delimited JSON records derived from the platform's [SecurityLog API](https://developer.android.com/reference/android/app/admin/SecurityLog). Intrusion Logging is offered as a new option under Android's **Advanced Protection Mode**, which users can opt into on their device; no MDM or device-policy configuration is required. When enabled, these logs provide a high-fidelity record of process starts, DNS queries, outbound network connections, ADB activity, keyguard events, and other security-relevant operations. The initial Intrusion Logging feature was released for Android 16 in May 2026. The feature and supported events is likely to be expanded over time.
For background on how this data source was introduced and why it is forensically valuable, see the Amnesty International Security Lab announcement: [Android Intrusion Logging as a new source of data for consensual forensic analysis](https://securitylab.amnesty.org/latest/2026/05/android-intrusion-logging-as-a-new-source-of-data-for-consensual-forensic-analysis/).
## Recommended workflow: collect with AndroidQF
[AndroidQF](https://github.com/mvt-project/androidqf) is the recommended way to acquire data from an Android device for analysis with MVT. During acquisition AndroidQF will prompt the user to also collect intrusion logs from the device, and writes them into an `intrusion-logs/` subdirectory of the acquisition output.
When you analyse such an acquisition with `mvt-android check-androidqf`, MVT automatically detects the `intrusion-logs/` directory and runs the same intrusion-log checks described below — there is no need to invoke a separate command:
```bash
mvt-android check-androidqf --output /path/to/results/ /path/to/androidqf-output/
```
The device timezone is read from the AndroidQF acquisition (`getprop.txt`) and applied to event timestamps automatically.
## Standalone command: `check-intrusion-logs`
The `mvt-android check-intrusion-logs` command runs the intrusion-log analysis directly against a set of log files. Prefer the AndroidQF workflow above; use the standalone command when the intrusion logs were collected outside of an AndroidQF acquisition, or when re-analysing only a set of intrusion logs.
## Expected input
`check-intrusion-logs` accepts either:
- a **directory** containing one or more `.txt` files (recursively), or
- a **`.zip` archive** containing such `.txt` files (nested `.zip` archives are also walked).
Each `.txt` file is expected to contain newline-delimited JSON, with one JSON object per line. Each object wraps a single event under a top-level key indicating its type, for example:
```json
{"dns_event": {"event_time": 1746979200000, "hostname": "example.com", "ip_addresses": ["93.184.216.34"], "package_name": "com.example.app"}}
{"connect_event": {"event_time": 1746979201000, "ip_address": "93.184.216.34", "port": 443, "package_name": "com.example.app"}}
{"security_event": {"event_time": 1746979202000, "tag": 210005, "data": ["..."]}}
```
Identical events that appear across multiple overlapping log files (e.g. daily rotations) are de-duplicated on a first-seen basis.
## Running the analysis
```bash
mvt-android check-intrusion-logs --output /path/to/results/ /path/to/intrusion-logs/
```
A `.zip` archive can be passed directly in place of the directory:
```bash
mvt-android check-intrusion-logs --output /path/to/results/ /path/to/intrusion-logs.zip
```
### Options
| Option | Description |
| --- | --- |
| `-i, --iocs PATH` | Path to a STIX2 indicator file. May be passed multiple times. |
| `-o, --output PATH` | Directory where JSON results and the timeline CSV will be written. |
| `-l, --list-modules` | List the available intrusion-log modules and exit. |
| `-m, --module NAME` | Run a single module (e.g. `DnsEvent`) instead of all of them. |
| `-t, --timezone TZ` | IANA timezone name for the device (e.g. `Europe/Paris`). When set, event timestamps are converted to the device's local time instead of UTC. |
| `-v, --verbose` | Verbose logging. |
## Modules
The command runs the following modules over the parsed events:
- **`DnsEvent`** — DNS resolution events. Hostnames and resolved IP addresses are checked against domain indicators, and the requesting `package_name` is checked against app-identifier indicators.
- **`ConnectEvent`** — Outbound network connection events. Destination IPs (with localhost addresses skipped) are checked against domain indicators, and `package_name` is checked against app-identifier indicators.
- **`SecurityEvent`** — Security log events keyed by Android `SecurityLog` tag IDs (e.g. `app_process_start`, `adb_shell_cmd`, `keyguard_dismissed`, `os_startup`, `cert_*` events). These are surfaced in the timeline to help reconstruct device activity around suspected events.
All three modules share a single pre-parsing pass over the input, so adding more modules in the future does not multiply I/O cost. Additional modules will be added in the future to support new event types which are generated by the Intrusion Logging feature.
## Interpreting results
A successful IOC match raises a `CRITICAL` alert that includes the matched indicator, the offending event, and the event timestamp. Alerts are summarised at the end of the run and persisted alongside the per-module JSON results.
When `--timezone` is provided, timestamps in the timeline and JSON output reflect the device's local wall-clock time. Otherwise timestamps are in UTC, consistent with the rest of MVT.
## Limitations
- This page assumes the intrusion logs have already been collected from the device. The recommended collection path is via AndroidQF (see above); intrusion logging itself must have been enabled on the device beforehand by opting into Android's Advanced Protection mode and also enabling the optional Intrusion Logging feature (see the [Amnesty blog post](https://securitylab.amnesty.org/latest/2026/05/android-intrusion-logging-as-a-new-source-of-data-for-consensual-forensic-analysis/) for details).
- As with all IOC-based analysis, public indicators alone are not sufficient to conclude that a device is uncompromised. See the [Indicators of Compromise](../iocs.md) page for context.
+9 -1
View File
@@ -38,9 +38,17 @@ By separating artifact collection from forensic analysis, this approach ensures
For more information, refer to the [AndroidQF project documentation](https://github.com/mvt-project/androidqf).
## Android Intrusion Logs
On devices where the user has opted into Android's [**Advanced Protection Mode**](https://support.google.com/android/answer/16339980) and turned on the optional Intrusion Logging featrue, Android can create and archive structured *Intrusion Logs* in an encrypted format. These logs record DNS queries, outbound network connections, process starts, ADB activity and other security-relevant events, and are a high-fidelity complement to the rest of an AndroidQF acquisition. The logs are generated on-device and encrypted before being stored in the Google account associated with the device. The encryption key is protected by the user device PIN. The intrusion log data is not accessible to Google.
AndroidQF will prompt the user to download, decrypt and collect device intrusion logs as part of an acquisition. When they are present, `mvt-android check-androidqf` will automatically run the intrusion-log checks alongside the other AndroidQF modules — no extra command is required. This is the recommended workflow for Android forensic analysis with MVT.
For cases where intrusion logs were collected outside of an AndroidQF acquisition, the standalone `mvt-android check-intrusion-logs` command can analyse them directly. See [Check Android Intrusion Logs](intrusion_logs.md) for details, and the [feature announcment from Amnesty International's Security Lab](https://securitylab.amnesty.org/latest/2026/05/android-intrusion-logging-as-a-new-source-of-data-for-consensual-forensic-analysis/) for background on the data source.
## Android Debug Bridge analysis removed
The ability to analyze Android devices directly over ADB has been removed from MVT. Use AndroidQF for device acquisition and `mvt-android check-androidqf` for analysis.
The ability to analyze Android devices directly over ADB has been removed from MVT. Direct extraction of data from ADB was error-prone and frequently resulted in inconsistent data collection between ADB and AndroidQF acquisitions. Use AndroidQF for device acquisition and `mvt-android check-androidqf` for analysis.
## Check an Android Backup (SMS messages)
+1
View File
@@ -43,6 +43,7 @@ nav:
- MVT for Android:
- Android Forensic Methodology: "android/methodology.md"
- Check an Android Backup (SMS messages): "android/backup.md"
- Check Android Intrusion Logs: "android/intrusion_logs.md"
- Indicators of Compromise: "iocs.md"
- Development: "development.md"
- License: "license.md"
+1 -1
View File
@@ -35,7 +35,7 @@ dependencies = [
"pydantic-settings==2.13.1",
"NSKeyedUnArchiver==1.5.2",
"python-dateutil==2.9.0.post0",
"tzdata==2026.1",
"tzdata==2026.2",
]
requires-python = ">= 3.10"
@@ -10,16 +10,20 @@ from .artifact import AndroidArtifact
class DumpsysAccessibilityArtifact(AndroidArtifact):
def check_indicators(self) -> None:
if not self.indicators:
return
for result in self.results:
ioc_match = self.indicators.check_app_id(result["package_name"])
if ioc_match:
self.alertstore.critical(
ioc_match.message, "", result, matched_indicator=ioc_match.ioc
if self.indicators:
ioc_match = self.indicators.check_app_id(result["package_name"])
if ioc_match:
self.alertstore.critical(
ioc_match.message, "", result, matched_indicator=ioc_match.ioc
)
continue
self.alertstore.medium(
f'Found accessibility service: "{result["service"]}"',
"",
result,
)
continue
def parse(self, content: str) -> None:
"""
+75 -1
View File
@@ -16,6 +16,7 @@ from mvt.common.help import (
HELP_MSG_CHECK_ANDROIDQF,
HELP_MSG_CHECK_BUGREPORT,
HELP_MSG_CHECK_IOCS,
HELP_MSG_CHECK_INTRUSION_LOGS,
HELP_MSG_DISABLE_INDICATOR_UPDATE_CHECK,
HELP_MSG_DISABLE_UPDATE_CHECK,
HELP_MSG_HASHES,
@@ -35,6 +36,8 @@ from mvt.common.utils import init_logging, set_verbose_logging
from .cmd_check_androidqf import CmdAndroidCheckAndroidQF
from .cmd_check_backup import CmdAndroidCheckBackup
from .cmd_check_bugreport import CmdAndroidCheckBugreport
from .cmd_check_intrusion_logs import CmdAndroidCheckIntrusionLogs
from .modules.intrusion_logs import INTRUSION_LOGS_MODULES
from .modules.androidqf import ANDROIDQF_MODULES
from .modules.backup import BACKUP_MODULES
from .modules.backup.helpers import cli_load_android_backup_password
@@ -266,6 +269,75 @@ def check_androidqf(
cmd.show_support_message()
# ==============================================================================
# Command: check-intrusion-logs
# ==============================================================================
@cli.command(
"check-intrusion-logs",
context_settings=CONTEXT_SETTINGS,
help=HELP_MSG_CHECK_INTRUSION_LOGS,
)
@click.option(
"--iocs",
"-i",
type=click.Path(exists=True),
multiple=True,
default=[],
help=HELP_MSG_IOC,
)
@click.option("--output", "-o", type=click.Path(exists=False), help=HELP_MSG_OUTPUT)
@click.option("--list-modules", "-l", is_flag=True, help=HELP_MSG_LIST_MODULES)
@click.option("--module", "-m", help=HELP_MSG_MODULE)
@click.option(
"--timezone",
"-t",
default=None,
help=(
"IANA timezone name for the device, for example 'Europe/Paris'. "
"When provided, event timestamps are expressed in the device's local "
"time instead of UTC."
),
)
@click.option("--verbose", "-v", is_flag=True, help=HELP_MSG_VERBOSE)
@click.argument("LOGS_PATH", type=click.Path(exists=True))
@click.pass_context
def check_intrusion_logs(
ctx,
iocs,
output,
list_modules,
module,
timezone,
verbose,
logs_path,
):
set_verbose_logging(verbose)
module_options = {}
if timezone:
module_options["device_timezone"] = timezone
cmd = CmdAndroidCheckIntrusionLogs(
target_path=logs_path,
results_path=output,
ioc_files=iocs,
module_name=module,
module_options=module_options if module_options else None,
disable_version_check=_get_disable_flags(ctx)[0],
disable_indicator_check=_get_disable_flags(ctx)[1],
)
if list_modules:
cmd.list_modules()
return
log.info("Checking intrusion logs at path: %s", logs_path)
cmd.run()
cmd.show_alerts_brief()
cmd.show_support_message()
# ==============================================================================
# Command: check-iocs
# ==============================================================================
@@ -290,7 +362,9 @@ def check_iocs(ctx, iocs, list_modules, module, folder):
disable_version_check=_get_disable_flags(ctx)[0],
disable_indicator_check=_get_disable_flags(ctx)[1],
)
cmd.modules = BACKUP_MODULES + BUGREPORT_MODULES + ANDROIDQF_MODULES
cmd.modules = (
BACKUP_MODULES + BUGREPORT_MODULES + ANDROIDQF_MODULES + INTRUSION_LOGS_MODULES
)
if list_modules:
cmd.list_modules()
+130 -1
View File
@@ -5,10 +5,14 @@
import logging
import os
import shutil
import tempfile
import zipfile
from pathlib import Path
from typing import List, Optional
from mvt.android.artifacts.getprop import GetProp
from mvt.android.cmd_check_intrusion_logs import CmdAndroidCheckIntrusionLogs
from mvt.android.cmd_check_backup import CmdAndroidCheckBackup
from mvt.android.cmd_check_bugreport import CmdAndroidCheckBugreport
from mvt.common.command import Command
@@ -139,6 +143,55 @@ class CmdAndroidCheckAndroidQF(Command):
raise NoAndroidQFBackup
def _read_device_timezone(self) -> Optional[str]:
getprop_files = [
f for f in self.__files if f.replace("\\", "/").endswith("getprop.txt")
]
if not getprop_files:
self.log.warning(
"Could not find getprop.txt; intrusion log timestamps will use UTC."
)
return None
try:
content = self._get_file_content(getprop_files[0]).decode(
"utf-8", errors="ignore"
)
except Exception as exc:
self.log.warning("Could not read getprop.txt: %s", exc)
return None
props = GetProp()
props.parse(content)
timezone = props.get_device_timezone()
if timezone:
self.log.info(
"Device timezone identified from getprop.txt: %s",
timezone,
)
else:
self.log.warning(
"persist.sys.timezone not found in getprop.txt; "
"intrusion log timestamps will use UTC."
)
return timezone
def _get_file_content(self, file_path: str) -> bytes:
if self.__format == "zip" and self.__zip:
handle = self.__zip.open(file_path)
try:
return handle.read()
finally:
handle.close()
if self.__format == "dir" and self.target_path:
parent_path = Path(self.target_path).absolute().parent.as_posix()
with open(os.path.join(parent_path, file_path), "rb") as handle:
return handle.read()
raise FileNotFoundError(file_path)
def run_bugreport_cmd(self) -> bool:
bugreport = None
try:
@@ -194,9 +247,85 @@ class CmdAndroidCheckAndroidQF(Command):
self.alertstore.extend(cmd.alertstore.alerts)
return True
def run_intrusion_logs_cmd(self) -> bool:
intrusion_log_files = [
f
for f in self.__files
if "/intrusion_logs/" in f.replace("\\", "/")
or f.replace("\\", "/").startswith("intrusion_logs/")
]
if not intrusion_log_files:
self.log.info(
"No intrusion_logs folder found in AndroidQF data, "
"skipping intrusion logs analysis."
)
return False
self.log.info(
"Found intrusion_logs folder in AndroidQF data, running intrusion logs analysis."
)
intrusion_logs_path = None
temp_dir = None
try:
if self.__format == "dir" and self.target_path:
intrusion_logs_path = os.path.join(
os.path.abspath(self.target_path), "intrusion_logs"
)
if not os.path.isdir(intrusion_logs_path):
self.log.warning(
"intrusion_logs directory not found at %s",
intrusion_logs_path,
)
return False
elif self.__format == "zip" and self.__zip:
temp_dir = tempfile.mkdtemp(prefix="mvt_intrusion_logs_")
for entry in intrusion_log_files:
normalized = entry.replace("\\", "/")
idx = normalized.find("intrusion_logs/")
relative = normalized[idx + len("intrusion_logs/") :]
if not relative or relative.endswith("/"):
continue
target = os.path.join(temp_dir, relative)
os.makedirs(os.path.dirname(target), exist_ok=True)
with self.__zip.open(entry) as src, open(target, "wb") as dst:
dst.write(src.read())
intrusion_logs_path = temp_dir
else:
return False
adv_module_options = dict(self.module_options or {})
if device_timezone := self._read_device_timezone():
adv_module_options["device_timezone"] = device_timezone
cmd = CmdAndroidCheckIntrusionLogs(
target_path=intrusion_logs_path,
results_path=self.results_path,
ioc_files=self.ioc_files,
iocs=self.iocs,
module_options=adv_module_options,
hashes=self.hashes,
sub_command=True,
)
cmd.run()
self.timeline.extend(cmd.timeline)
self.alertstore.extend(cmd.alertstore.alerts)
return True
finally:
if temp_dir:
shutil.rmtree(temp_dir, ignore_errors=True)
def finish(self) -> None:
"""
Run the bugreport and backup modules if the respective files are found in the AndroidQF data.
Run nested modules if their respective files are found in AndroidQF data.
"""
self.run_bugreport_cmd()
self.run_backup_cmd()
self.run_intrusion_logs_cmd()
+113
View File
@@ -0,0 +1,113 @@
# Mobile Verification Toolkit (MVT)
# Copyright (c) 2021-2023 The MVT Authors.
# Use of this software is governed by the MVT License 1.1 that can be found at
# https://license.mvt.re/1.1/
import logging
import os
from typing import Optional
from mvt.common.command import Command
from mvt.common.indicators import Indicators
from .modules.intrusion_logs import (
INTRUSION_LOGS_MODULES,
KNOWN_INTRUSION_LOG_EVENT_TYPES,
)
from .modules.intrusion_logs.base import IntrusionLogsModule
log = logging.getLogger(__name__)
class CmdAndroidCheckIntrusionLogs(Command):
"""Command to check Android Intrusion Logging files."""
def __init__(
self,
target_path: Optional[str] = None,
results_path: Optional[str] = None,
ioc_files: Optional[list] = None,
iocs: Optional[Indicators] = None,
module_name: Optional[str] = None,
serial: Optional[str] = None,
module_options: Optional[dict] = None,
hashes: Optional[bool] = False,
sub_command: Optional[bool] = False,
disable_version_check: bool = False,
disable_indicator_check: bool = False,
) -> None:
super().__init__(
target_path=target_path,
results_path=results_path,
ioc_files=ioc_files,
iocs=iocs,
module_name=module_name,
serial=serial,
module_options=module_options,
hashes=hashes,
sub_command=sub_command,
log=log,
disable_version_check=disable_version_check,
disable_indicator_check=disable_indicator_check,
)
self.name = "check-intrusion-logs"
self.modules = INTRUSION_LOGS_MODULES
self._all_events: dict[str, list[dict]] = {}
def init(self) -> None:
if not self.target_path:
raise ValueError("No target path specified")
if not os.path.isdir(self.target_path) and not (
os.path.isfile(self.target_path)
and self.target_path.lower().endswith(".zip")
):
raise ValueError(
f"Target path must be a directory or a .zip file: {self.target_path}"
)
self.log.info("Checking intrusion logs at path: %s", self.target_path)
self._all_events = self._pre_load_events()
def module_init(self, module: IntrusionLogsModule) -> None: # type: ignore[override]
module.il_events_by_type = self._all_events
def finish(self) -> None:
return
def _pre_load_events(self) -> dict[str, list[dict]]:
"""Load and parse all advanced-log files once for reuse by all modules."""
self.log.info("Pre-loading intrusion log files from: %s", self.target_path)
loader = IntrusionLogsModule(
target_path=self.target_path,
log=self.log,
)
try:
all_events = loader.load_all_events(self.target_path)
except Exception as exc:
self.log.error("Failed to pre-load events: %s", exc)
return {}
total_events = sum(len(events) for events in all_events.values())
self.log.info(
"Pre-loaded %d events across %d type(s); modules will reuse this data",
total_events,
len(all_events),
)
unknown_event_types = sorted(
event_type
for event_type in all_events
if event_type not in KNOWN_INTRUSION_LOG_EVENT_TYPES
)
if unknown_event_types:
self.log.warning(
"Found unknown intrusion logging event type(s): %s. "
"Please open an issue on GitHub so MVT can add support for them.",
", ".join(unknown_event_types),
)
return all_events
+5 -1
View File
@@ -6,6 +6,7 @@ import datetime
import fnmatch
import logging
import os
from pathlib import Path
from typing import List, Optional
from zipfile import ZipFile
@@ -70,7 +71,10 @@ class BugReportModule(MVTModule):
else:
if not self.extract_path:
raise ValueError("extract_path is not set")
handle = open(os.path.join(self.extract_path, file_path), "rb")
joined = os.path.join(self.extract_path, file_path)
if not Path(joined).resolve().is_relative_to(Path(self.extract_path).resolve()):
raise ValueError("unsafe file_path")
handle = open(joined, "rb")
data = handle.read()
handle.close()
@@ -0,0 +1,20 @@
# Mobile Verification Toolkit (MVT)
# Copyright (c) 2021-2023 The MVT Authors.
# Use of this software is governed by the MVT License 1.1 that can be found at
# https://license.mvt.re/1.1/
from .connect_event import ConnectEvent
from .dns_event import DnsEvent
from .security_event import SecurityEvent
INTRUSION_LOGS_MODULES = [
DnsEvent,
ConnectEvent,
SecurityEvent,
]
KNOWN_INTRUSION_LOG_EVENT_TYPES = {
"connect_event",
"dns_event",
"security_event",
}
@@ -0,0 +1,395 @@
# Mobile Verification Toolkit (MVT)
# Copyright (c) 2021-2023 The MVT Authors.
# Use of this software is governed by the MVT License 1.1 that can be found at
# https://license.mvt.re/1.1/
import datetime
import io
import json
import logging
import zipfile
from pathlib import Path
from typing import Optional, Union
try:
import zoneinfo
except ImportError:
from backports import zoneinfo # type: ignore[no-redef]
from mvt.common.module import MVTModule
from mvt.common.utils import convert_datetime_to_iso, convert_unix_to_iso
class IntrusionLogsModule(MVTModule):
"""Base class for modules analyzing intrusion logs (newline-delimited JSON).
Performance note
----------------
Log files can be large and are shared by every module in this package.
To avoid re-reading and re-parsing the same files N times (once per
module), the command layer should call :meth:`load_all_events` exactly
once and then assign the returned dict to the ``il_events_by_type``
attribute of every module instance **before** calling ``run_module``.
When ``il_events_by_type`` is populated:
* :meth:`collect_txt` becomes a no-op (no disk I/O).
* :meth:`parse_collected_txt` iterates the in-memory list for the
requested event type instead of re-parsing raw text.
Modules that are used standalone (e.g. in tests) still work as before
because ``il_events_by_type`` defaults to ``None``, which preserves the
original file-loading code path.
"""
def __init__(
self,
file_path: Optional[str] = None,
target_path: Optional[str] = None,
results_path: Optional[str] = None,
module_options: Optional[dict] = None,
log: logging.Logger = logging.getLogger(__name__),
results: Optional[list] = None,
) -> None:
super().__init__(
file_path=file_path,
target_path=target_path,
results_path=results_path,
module_options=module_options,
log=log,
results=results,
)
# Raw file content collected by collect_txt (fallback path only).
self.il_files: list[tuple[str, str]] = []
# Pre-parsed events injected by the command layer.
# Keys are event-type strings (e.g. "dns_event"), values are lists of
# raw event-data dicts exactly as they appear in the JSON lines.
# When this is not None, collect_txt and parse_collected_txt use it
# instead of touching the file system.
self.il_events_by_type: Optional[dict[str, list[dict]]] = None
# ------------------------------------------------------------------
# Serialization helper
# ------------------------------------------------------------------
def serialize(self, record: dict) -> Union[dict, list]:
"""Serialize a record for timeline output."""
return {
"timestamp": record.get("timestamp", record.get("isodate")),
"module": self.__class__.__name__,
"event": record.get("event_type", ""),
"data": str(record),
}
# ------------------------------------------------------------------
# File collection
# ------------------------------------------------------------------
def collect_txt(self, source) -> None:
"""Collect text log files from *source* into ``self.il_files``.
Entry points:
* directory → walk recursively
* zip file → walk zip entries
* anything else → silently skip
If ``self.il_events_by_type`` has already been populated (i.e. the
command layer pre-loaded the events), this method returns immediately
without any disk I/O.
"""
if self.il_events_by_type is not None:
self.log.debug(
"Pre-loaded events available — skipping file collection for %s",
self.__class__.__name__,
)
return
path = Path(source)
if path.is_dir():
self._walk_directory(path)
return
if path.is_file() and path.suffix.lower() == ".zip":
try:
with zipfile.ZipFile(path) as z:
self._walk_zip(z)
except zipfile.BadZipFile:
self.log.debug("Skipping invalid zip: %s", path)
return
self.log.debug("Skipping unsupported source: %s", source)
def _walk_directory(self, root: Path, prefix: str = "") -> None:
for item in root.iterdir():
if item.is_dir():
self._walk_directory(item, prefix=f"{prefix}{item.name}/")
continue
if item.suffix.lower() == ".txt":
self.il_files.append(
(f"{prefix}{item.name}", item.read_text(errors="ignore"))
)
elif item.suffix.lower() == ".zip":
try:
with zipfile.ZipFile(item) as z:
self._walk_zip(z, prefix=f"{prefix}{item.name}::")
except zipfile.BadZipFile:
self.log.warning("Skipping invalid zip: %s", item)
def _walk_zip(self, zf: zipfile.ZipFile, prefix: str = "") -> None:
for info in zf.infolist():
if info.is_dir():
continue
name = info.filename
with zf.open(info) as f:
data = f.read()
if name.lower().endswith(".txt"):
self.il_files.append((f"{prefix}{name}", data.decode(errors="ignore")))
elif name.lower().endswith(".zip"):
with zipfile.ZipFile(io.BytesIO(data)) as inner:
self._walk_zip(inner, prefix=f"{prefix}{name}::")
# ------------------------------------------------------------------
# Single-pass loader (used by the command layer)
# ------------------------------------------------------------------
def load_all_events(self, source) -> dict[str, list[dict]]:
"""Read every log file under *source* **once** and parse all JSON
lines in a single pass, routing events into per-type buckets.
Returns a ``dict`` mapping *event_type* strings to lists of raw
event-data dicts. The result is also stored in
``self.il_events_by_type`` so that subsequent calls to
:meth:`collect_txt` and :meth:`parse_collected_txt` on *this*
instance are no-ops.
Intended usage in the command layer::
loader = IntrusionLogsModule(target_path=target, log=log)
all_events = loader.load_all_events(target)
for module_cls in INTRUSION_LOGS_MODULES:
m = module_cls(target_path=target, ...)
m.il_events_by_type = all_events # inject — no re-reading
run_module(m)
"""
# Reset so that _collect_txt actually runs (il_events_by_type is None).
self.il_events_by_type = None
self.il_files = []
self.collect_txt(source)
events_by_type: dict[str, list[dict]] = {}
# JSON fingerprints used to drop events that appear in more than one
# log file (overlapping daily files are the most common source of
# cross-file duplicates).
seen_fingerprints: set[str] = set()
total_lines = 0
skipped_lines = 0
duplicate_lines = 0
for file_name, text in self.il_files:
for line_num, line in enumerate(text.splitlines(), start=1):
line = line.strip()
if not line:
continue
total_lines += 1
try:
entry = json.loads(line)
for event_type, event_data in entry.items():
if isinstance(event_data, dict):
fingerprint = json.dumps(event_data, sort_keys=True)
if fingerprint in seen_fingerprints:
duplicate_lines += 1
continue
seen_fingerprints.add(fingerprint)
events_by_type.setdefault(event_type, []).append(event_data)
except json.JSONDecodeError as e:
skipped_lines += 1
self.log.warning(
"Failed to parse JSON on line %d in %s: %s",
line_num,
file_name,
e,
)
except Exception as e:
skipped_lines += 1
self.log.warning(
"Error processing line %d in %s: %s",
line_num,
file_name,
e,
)
if duplicate_lines:
self.log.info(
"Removed %d duplicate event(s) seen across multiple log files",
duplicate_lines,
)
self.log.info(
"Loaded %d log files, parsed %d lines (%d skipped), found event types: %s",
len(self.il_files),
total_lines,
skipped_lines,
{k: len(v) for k, v in events_by_type.items()},
)
# Cache so this instance also benefits from the fast path.
self.il_events_by_type = events_by_type
return events_by_type
# ------------------------------------------------------------------
# Parsing
# ------------------------------------------------------------------
def parse_collected_txt(self, event_type: str) -> None:
"""Parse collected log text and dispatch events of *event_type*.
Fast path
~~~~~~~~~
When ``self.il_events_by_type`` is populated (injected by the command
layer after a single shared :meth:`load_all_events` call), the method
iterates the already-parsed in-memory list for *event_type* — no
re-reading, no re-parsing of JSON.
Fallback path
~~~~~~~~~~~~~
When ``self.il_events_by_type`` is ``None``, the method falls back to
iterating ``self.il_files`` and parsing each JSON line, which is the
original behaviour.
"""
if self.il_events_by_type is not None:
events = self.il_events_by_type.get(event_type, [])
self.log.debug(
"Using pre-loaded events: dispatching %d '%s' events",
len(events),
event_type,
)
for event_data in events:
try:
# Work on a shallow copy so that mutations in one module
# (e.g. adding "timestamp") do not affect other modules
# that share the same dict reference.
self.process_event(dict(event_data))
except Exception as e:
self.log.warning(
"Error processing pre-parsed '%s' event: %s",
event_type,
e,
)
return
# Fallback: parse raw text collected by collect_txt().
# Use the same JSON-fingerprint approach as MVTModule._deduplicate_timeline
# to drop events that appear verbatim in more than one log file.
seen_fingerprints: set[str] = set()
duplicate_count = 0
for file_name, text in self.il_files:
for line_num, line in enumerate(text.splitlines(), start=1):
line = line.strip()
if not line:
continue
try:
entry = json.loads(line)
if event_type in entry:
event_data = entry[event_type]
fingerprint = json.dumps(event_data, sort_keys=True)
if fingerprint in seen_fingerprints:
duplicate_count += 1
continue
seen_fingerprints.add(fingerprint)
event_data["event_type"] = event_type
self.process_event(event_data)
except json.JSONDecodeError as e:
self.log.warning(
"Failed to parse JSON on line %d in %s: %s",
line_num,
file_name,
str(e),
)
except Exception as e:
self.log.warning(
"Error processing line %d in %s: %s",
line_num,
file_name,
str(e),
)
if duplicate_count:
self.log.info(
"Removed %d duplicate '%s' event(s) seen across multiple log files",
duplicate_count,
event_type,
)
# ------------------------------------------------------------------
# Event processing
# ------------------------------------------------------------------
def process_event(self, event_data: dict) -> None:
"""Process an individual event. Override this in subclasses.
Args:
event_data: Dictionary containing the event data.
"""
self.results.append(event_data)
# ------------------------------------------------------------------
# Timestamp localisation
# ------------------------------------------------------------------
def _localize_timestamp(self, event_time_seconds: float) -> str:
"""Convert a Unix timestamp (in seconds) to an ISO string.
When the device timezone is available via ``module_options["device_timezone"]``
(a IANA timezone name such as ``"Europe/Paris"`` read from
``persist.sys.timezone`` in ``getprop.txt``), the UTC instant is
converted to the device's local time before formatting — mirroring the
approach used by ``AQFFiles``.
When no timezone is configured the method falls back to UTC, which is
consistent with all other MVT modules that call ``convert_unix_to_iso``.
Args:
event_time_seconds: Unix epoch timestamp expressed in **seconds**
(callers are responsible for dividing ms/ns values first).
Returns:
ISO-formatted datetime string (``YYYY-mm-dd HH:MM:SS.ffffff``).
The string always represents the device-local time (or UTC when no
timezone is known); no UTC offset suffix is appended, matching the
format produced by :func:`mvt.common.utils.convert_unix_to_iso`.
"""
tz_name: Optional[str] = self.module_options.get("device_timezone")
if tz_name:
try:
device_tz = zoneinfo.ZoneInfo(tz_name)
utc_dt = datetime.datetime.fromtimestamp(
event_time_seconds, tz=datetime.timezone.utc
)
local_dt = utc_dt.astimezone(device_tz)
# Strip tzinfo so that convert_datetime_to_iso outputs the
# local wall-clock time without a timezone suffix. This is
# the same pattern used by AQFFiles.
return convert_datetime_to_iso(local_dt.replace(tzinfo=None))
except Exception as e:
self.log.warning(
"Could not apply device timezone '%s', falling back to UTC: %s",
tz_name,
e,
)
return convert_unix_to_iso(event_time_seconds)
# ------------------------------------------------------------------
# Abstract interface
# ------------------------------------------------------------------
def run(self) -> None:
"""Main execution method. Must be implemented by subclasses."""
raise NotImplementedError("Subclasses must implement the run() method")
@@ -0,0 +1,121 @@
# Mobile Verification Toolkit (MVT)
# Copyright (c) 2021-2023 The MVT Authors.
# Use of this software is governed by the MVT License 1.1 that can be found at
# https://license.mvt.re/1.1/
import logging
from typing import Optional, Union
from .base import IntrusionLogsModule
class ConnectEvent(IntrusionLogsModule):
"""This module analyzes network connection events from intrusion logs."""
def __init__(
self,
file_path: Optional[str] = None,
target_path: Optional[str] = None,
results_path: Optional[str] = None,
module_options: Optional[dict] = None,
log: logging.Logger = logging.getLogger(__name__),
results: Optional[list] = None,
) -> None:
super().__init__(
file_path=file_path,
target_path=target_path,
results_path=results_path,
module_options=module_options,
log=log,
results=results,
)
def check_indicators(self) -> None:
"""Check connection events against indicators of compromise."""
if not self.indicators:
return
for result in self.results:
# Check IP address against indicators
ip_address = result.get("ip_address", "")
if ip_address:
# Clean IP address (remove leading slash and extract IP from format like "ip6-localhost/::1")
if "/" in ip_address:
parts = ip_address.split("/")
clean_ip = parts[-1] if len(parts) > 1 else parts[0]
else:
clean_ip = ip_address.lstrip("/")
# Skip localhost addresses
if clean_ip and clean_ip not in ["::1", "127.0.0.1", "0.0.0.0"]:
ioc = self.indicators.check_domain(clean_ip)
if ioc:
result["matched_ip"] = clean_ip
self.alertstore.critical(
ioc.message,
result.get("timestamp") or "",
result,
matched_indicator=ioc.ioc,
)
# Check package name against app identifiers
package_name = result.get("package_name", "")
if package_name:
ioc = self.indicators.check_app_id(package_name)
if ioc:
self.alertstore.critical(
ioc.message,
result.get("timestamp") or "",
result,
matched_indicator=ioc.ioc,
)
def serialize(self, record: dict) -> Union[dict, list]:
"""Serialize a connection event record for timeline output."""
ip_address = record.get("ip_address", "")
port = record.get("port", 0)
package_name = record.get("package_name", "")
matched_ip = record.get("matched_ip", "")
# Clean IP address for display
if "/" in ip_address:
parts = ip_address.split("/")
clean_ip = parts[-1] if len(parts) > 1 else parts[0]
else:
clean_ip = ip_address.lstrip("/")
# Indicate when IP matched an IoC
if matched_ip:
data = f"Connection to {clean_ip}:{port} by {package_name} [Matched IP: {matched_ip}]"
else:
data = f"Connection to {clean_ip}:{port} by {package_name}"
return {
"timestamp": record.get("timestamp"),
"module": self.__class__.__name__,
"event": "network_connection",
"data": data,
}
def process_event(self, event_data: dict) -> None:
"""Process a connection event and add it to results."""
# Convert event_time from milliseconds to ISO format
event_time = event_data.get("event_time")
if event_time:
# Android event times are in milliseconds since epoch
event_data["timestamp"] = self._localize_timestamp(event_time / 1000.0)
else:
event_data["timestamp"] = None
self.results.append(event_data)
def run(self) -> None:
"""Extract and analyze connection events from intrusion logs."""
if not self.target_path:
self.log.error("No target path specified")
return
self.collect_txt(self.target_path)
self.parse_collected_txt("connect_event")
self.log.info("Identified %d connection events", len(self.results))
@@ -0,0 +1,141 @@
# Mobile Verification Toolkit (MVT)
# Copyright (c) 2021-2023 The MVT Authors.
# Use of this software is governed by the MVT License 1.1 that can be found at
# https://license.mvt.re/1.1/
import logging
from typing import Optional, Union
from .base import IntrusionLogsModule
class DnsEvent(IntrusionLogsModule):
"""This module analyzes DNS events from intrusion logs."""
def __init__(
self,
file_path: Optional[str] = None,
target_path: Optional[str] = None,
results_path: Optional[str] = None,
module_options: Optional[dict] = None,
log: logging.Logger = logging.getLogger(__name__),
results: Optional[list] = None,
) -> None:
super().__init__(
file_path=file_path,
target_path=target_path,
results_path=results_path,
module_options=module_options,
log=log,
results=results,
)
def check_indicators(self) -> None:
"""Check DNS events against indicators of compromise."""
if not self.indicators:
return
for result in self.results:
# Check hostname against domain indicators
hostname = result.get("hostname", "")
if hostname:
ioc = self.indicators.check_domain(hostname)
if ioc:
self.alertstore.critical(
ioc.message,
result.get("timestamp") or "",
result,
matched_indicator=ioc.ioc,
)
# Check IP addresses against indicators
ip_addresses = result.get("ip_addresses", [])
matched_ips = []
for ip_addr in ip_addresses:
# Remove leading slash if present
clean_ip = (
ip_addr.lstrip("/") if isinstance(ip_addr, str) else str(ip_addr)
)
if clean_ip and clean_ip != "0.0.0.0":
ioc = self.indicators.check_domain(clean_ip)
if ioc:
matched_ips.append(clean_ip)
self.alertstore.critical(
ioc.message,
result.get("timestamp") or "",
result,
matched_indicator=ioc.ioc,
)
# Store matched IPs for timeline display
if matched_ips:
result["matched_ips"] = matched_ips
# Check package name against app identifiers
package_name = result.get("package_name", "")
if package_name:
ioc = self.indicators.check_app_id(package_name)
if ioc:
self.alertstore.critical(
ioc.message,
result.get("timestamp") or "",
result,
matched_indicator=ioc.ioc,
)
def serialize(self, record: dict) -> Union[dict, list]:
"""Serialize a DNS event record for timeline output."""
hostname = record.get("hostname", "")
package_name = record.get("package_name", "")
# Get IP addresses for display
ip_addresses = record.get("ip_addresses", [])
matched_ips = record.get("matched_ips", [])
# Clean up IP addresses (remove leading slashes)
clean_ips = []
for ip_addr in ip_addresses:
clean_ip = ip_addr.lstrip("/") if isinstance(ip_addr, str) else str(ip_addr)
if clean_ip and clean_ip != "0.0.0.0":
clean_ips.append(clean_ip)
# Build the data string with actual IPs
if matched_ips:
# Highlight matched IPs in the output
ip_display = ", ".join(matched_ips)
data = f"DNS query for {hostname} by {package_name} [Matched IPs: {ip_display}]"
elif clean_ips:
ip_display = ", ".join(clean_ips)
data = f"DNS query for {hostname} by {package_name} [IPs: {ip_display}]"
else:
data = f"DNS query for {hostname} by {package_name}"
return {
"timestamp": record.get("timestamp"),
"module": self.__class__.__name__,
"event": "dns_query",
"data": data,
}
def process_event(self, event_data: dict) -> None:
"""Process a DNS event and add it to results."""
# Convert event_time from milliseconds to ISO format
event_time = event_data.get("event_time")
if event_time:
# Android event times are in milliseconds since epoch
event_data["timestamp"] = self._localize_timestamp(event_time / 1000.0)
else:
event_data["timestamp"] = None
self.results.append(event_data)
def run(self) -> None:
"""Extract and analyze DNS events from intrusion logs."""
if not self.target_path:
self.log.error("No target path specified")
return
self.collect_txt(self.target_path)
self.parse_collected_txt("dns_event")
self.log.info("Identified %d DNS events", len(self.results))
@@ -0,0 +1,758 @@
# Mobile Verification Toolkit (MVT)
# Copyright (c) 2021-2023 The MVT Authors.
# Use of this software is governed by the MVT License 1.1 that can be found at
# https://license.mvt.re/1.1/
import logging
from typing import Optional, Union
from .base import IntrusionLogsModule
# Security event tags based on Android SecurityLog API
# Reference: https://developer.android.com/reference/android/app/admin/SecurityLog
SECURITY_EVENT_TAGS = {
# ADB events (API level 24)
"adb_shell_interactive": {
"tag_id": 210001,
"name": "ADB Shell Interactive",
"description": "An ADB interactive shell was opened via 'adb shell'",
},
"adb_shell_cmd": {
"tag_id": 210002,
"name": "ADB Shell Command",
"description": "A shell command was issued over ADB via 'adb shell <command>'",
},
"adb_sync_recv_file": {
"tag_id": 210003,
"name": "ADB Sync Recv File",
"description": "A file was pulled from the device via adb daemon (adb pull)",
},
"adb_sync_send_file": {
"tag_id": 210004,
"name": "ADB Sync Send File",
"description": "A file was pushed to the device via adb daemon (adb push)",
},
# App process events (API level 24)
"app_process_start": {
"tag_id": 210005,
"name": "App Process Start",
"description": "An app process was started",
},
# Keyguard events (API level 24)
"keyguard_dismissed": {
"tag_id": 210006,
"name": "Keyguard Dismissed",
"description": "Keyguard has been dismissed",
},
"keyguard_dismiss_auth_attempt": {
"tag_id": 210007,
"name": "Keyguard Dismiss Auth Attempt",
"description": "Authentication attempt to dismiss keyguard",
},
"keyguard_secured": {
"tag_id": 210008,
"name": "Keyguard Secured",
"description": "Device has been locked",
},
# OS events (API level 28)
"os_startup": {
"tag_id": 210009,
"name": "OS Startup",
"description": "Android OS has started",
},
"os_shutdown": {
"tag_id": 210010,
"name": "OS Shutdown",
"description": "Android OS has shutdown",
},
# Logging events (API level 28)
"logging_started": {
"tag_id": 210011,
"name": "Logging Started",
"description": "Audit logging has started",
},
"logging_stopped": {
"tag_id": 210012,
"name": "Logging Stopped",
"description": "Audit logging has stopped",
},
# Media events (API level 28)
"media_mount": {
"tag_id": 210013,
"name": "Media Mount",
"description": "Removable media has been mounted",
},
"media_unmount": {
"tag_id": 210014,
"name": "Media Unmount",
"description": "Removable media was unmounted",
},
# Log buffer event (API level 28)
"log_buffer_size_critical": {
"tag_id": 210015,
"name": "Log Buffer Size Critical",
"description": "Audit log buffer has reached 90% capacity",
},
# Password policy events (API level 28)
"password_expiration_set": {
"tag_id": 210016,
"name": "Password Expiration Set",
"description": "Admin set password expiration timeout",
},
"password_complexity_set": {
"tag_id": 210017,
"name": "Password Complexity Set",
"description": "Admin set password complexity requirement",
},
"password_history_length_set": {
"tag_id": 210018,
"name": "Password History Length Set",
"description": "Admin set password history length",
},
"max_screen_lock_timeout_set": {
"tag_id": 210019,
"name": "Max Screen Lock Timeout Set",
"description": "Admin set maximum screen lock timeout",
},
"max_password_attempts_set": {
"tag_id": 210020,
"name": "Max Password Attempts Set",
"description": "Admin set maximum failed password attempts before wipe",
},
"keyguard_disabled_features_set": {
"tag_id": 210021,
"name": "Keyguard Disabled Features Set",
"description": "Admin set disabled keyguard features",
},
# Remote lock event (API level 28)
"remote_lock": {
"tag_id": 210022,
"name": "Remote Lock",
"description": "Admin remotely locked the device or profile",
},
# Wipe failure event (API level 28)
"wipe_failure": {
"tag_id": 210023,
"name": "Wipe Failure",
"description": "Failed to wipe device or user data",
},
# Cryptographic key events (API level 28)
"key_generated": {
"tag_id": 210024,
"name": "Key Generated",
"description": "Cryptographic key was generated",
},
"key_import": {
"tag_id": 210025,
"name": "Key Import",
"description": "Cryptographic key was imported",
},
"key_destruction": {
"tag_id": 210026,
"name": "Key Destruction",
"description": "Cryptographic key was destroyed",
},
# User restriction events (API level 28)
"user_restriction_added": {
"tag_id": 210027,
"name": "User Restriction Added",
"description": "Admin added a user restriction",
},
"user_restriction_removed": {
"tag_id": 210028,
"name": "User Restriction Removed",
"description": "Admin removed a user restriction",
},
# Certificate events (API level 28)
"cert_authority_installed": {
"tag_id": 210029,
"name": "Certificate Authority Installed",
"description": "Root certificate installed to trusted storage",
},
"cert_authority_removed": {
"tag_id": 210030,
"name": "Certificate Authority Removed",
"description": "Root certificate removed from trusted storage",
},
"crypto_self_test_completed": {
"tag_id": 210031,
"name": "Crypto Self Test Completed",
"description": "Cryptographic functionality self test completed",
},
"key_integrity_violation": {
"tag_id": 210032,
"name": "Key Integrity Violation",
"description": "Key integrity violation detected",
},
"cert_validation_failure": {
"tag_id": 210033,
"name": "Certificate Validation Failure",
"description": "X.509v3 certificate validation failed",
},
# Camera policy event (API level 30)
"camera_policy_set": {
"tag_id": 210034,
"name": "Camera Policy Set",
"description": "Admin set policy to disable camera",
},
# Password complexity events (API level 31/33)
"password_complexity_required": {
"tag_id": 210035,
"name": "Password Complexity Required",
"description": "Admin set password complexity requirement using predefined levels",
},
"password_changed": {
"tag_id": 210036,
"name": "Password Changed",
"description": "User changed their lockscreen password",
},
# WiFi events (API level 33)
"wifi_connection": {
"tag_id": 210037,
"name": "WiFi Connection",
"description": "Device attempted to connect to a managed WiFi network",
},
"wifi_disconnection": {
"tag_id": 210038,
"name": "WiFi Disconnection",
"description": "Device disconnected from a managed WiFi network",
},
# Bluetooth events (API level 33)
"bluetooth_connection": {
"tag_id": 210039,
"name": "Bluetooth Connection",
"description": "Device attempted to connect to a Bluetooth device",
},
"bluetooth_disconnection": {
"tag_id": 210040,
"name": "Bluetooth Disconnection",
"description": "Device disconnected from a Bluetooth device",
},
# Package events (API level 34)
"package_installed": {
"tag_id": 210041,
"name": "Package Installed",
"description": "Application package was installed",
},
"package_updated": {
"tag_id": 210042,
"name": "Package Updated",
"description": "Application package was updated",
},
"package_uninstalled": {
"tag_id": 210043,
"name": "Package Uninstalled",
"description": "Application package was uninstalled",
},
# Backup service event (API level 35)
"backup_service_toggled": {
"tag_id": 210044,
"name": "Backup Service Toggled",
"description": "Admin enabled or disabled backup service",
},
# NFC events (API level 36)
"nfc_enabled": {
"tag_id": 210045,
"name": "NFC Enabled",
"description": "NFC service is enabled",
},
"nfc_disabled": {
"tag_id": 210046,
"name": "NFC Disabled",
"description": "NFC service is disabled",
},
}
SECURITY_EVENT_METADATA_KEYS = {
"event_time",
"event_type",
"timestamp",
}
class SecurityEvent(IntrusionLogsModule):
"""This module analyzes security events from intrusion logs."""
def __init__(
self,
file_path: Optional[str] = None,
target_path: Optional[str] = None,
results_path: Optional[str] = None,
module_options: Optional[dict] = None,
log: logging.Logger = logging.getLogger(__name__),
results: Optional[list] = None,
) -> None:
super().__init__(
file_path=file_path,
target_path=target_path,
results_path=results_path,
module_options=module_options,
log=log,
results=results,
)
self.event_type_counts: dict[str, int] = {}
def _get_event_tag(self, event_data: dict) -> Optional[str]:
"""Return the security-event tag key, including tags unknown to MVT."""
for key in event_data:
if key not in SECURITY_EVENT_METADATA_KEYS:
return key
return None
def check_indicators(self) -> None:
"""Check security events against indicators of compromise."""
if not self.indicators:
return
for result in self.results:
# Check app process start events for suspicious package names
if "app_process_start" in result:
process_info = result["app_process_start"]
process_name = process_info.get("process", "")
if process_name:
# Check the full process name
ioc = self.indicators.check_app_id(process_name)
if ioc:
self.alertstore.critical(
ioc.message,
result.get("timestamp") or "",
result,
matched_indicator=ioc.ioc,
)
# Also check process components after the first colon
# Example: "com.google.android.webview:sandboxed_process0:org.chromium.content.app.SandboxedProcessService0:0"
# We want to check "sandboxed_process0" and subsequent components
if ":" in process_name:
components = process_name.split(":")
for component in components[
1:
]: # Skip the first component (main package name)
if component:
ioc = self.indicators.check_app_id(component)
if ioc:
self.alertstore.critical(
ioc.message,
result.get("timestamp") or "",
result,
matched_indicator=ioc.ioc,
)
break
# Check package operations for suspicious packages
for pkg_event in [
"package_installed",
"package_updated",
"package_uninstalled",
]:
if pkg_event in result:
pkg_info = result[pkg_event]
pkg_name = pkg_info.get("package_name", "")
if pkg_name:
ioc = self.indicators.check_app_id(pkg_name)
if ioc:
self.alertstore.critical(
ioc.message,
result.get("timestamp") or "",
result,
matched_indicator=ioc.ioc,
)
# Check ADB shell commands for suspicious patterns
if "adb_shell_cmd" in result:
cmd_info = result["adb_shell_cmd"]
command = cmd_info.get("command", "")
if command:
# Check if command contains any suspicious app IDs
ioc = self.indicators.check_app_id(command)
if ioc:
self.alertstore.critical(
ioc.message,
result.get("timestamp") or "",
result,
matched_indicator=ioc.ioc,
)
# Check ADB file sync events for suspicious paths
for adb_event in ["adb_sync_recv_file", "adb_sync_send_file"]:
if adb_event in result:
file_info = result[adb_event]
file_path = file_info.get("path", "")
if file_path:
ioc = self.indicators.check_file_path(file_path)
if ioc:
self.alertstore.critical(
ioc.message,
result.get("timestamp") or "",
result,
matched_indicator=ioc.ioc,
)
# Flag failed cryptographic operations as potentially suspicious
if "key_generated" in result:
if not result["key_generated"].get("success", True):
self.log.warning(
"Failed key generation detected for key_id: %s",
result["key_generated"].get("key_id", "unknown"),
)
# Flag certificate validation failures
if "cert_validation_failure" in result:
self.log.warning(
"Certificate validation failure detected: %s",
result.get("cert_validation_failure"),
)
# Flag key integrity violations
if "key_integrity_violation" in result:
self.alertstore.medium(
f"Key integrity violation detected: {result.get('key_integrity_violation')}",
result.get("timestamp") or "",
result,
)
# Flag certificate authority installations (potential MITM)
if "cert_authority_installed" in result:
cert_info = result["cert_authority_installed"]
self.log.warning(
"Certificate authority installed: %s (success: %s)",
cert_info.get("subject", "unknown"),
cert_info.get("success", "unknown"),
)
# Flag wipe failures
if "wipe_failure" in result:
self.alertstore.medium(
"Device wipe failure detected",
result.get("timestamp") or "",
result,
)
# Flag crypto self test failures
if "crypto_self_test_completed" in result:
test_result = result["crypto_self_test_completed"]
if isinstance(test_result, dict):
success = test_result.get("success", True)
else:
success = test_result == 1
if not success:
self.alertstore.medium(
"Cryptographic self test failed",
result.get("timestamp") or "",
result,
)
def serialize(self, record: dict) -> Union[dict, list]:
"""Serialize a security event record for timeline output."""
# Determine the event sub-type
event_subtype = None
event_data_str = ""
event_subtype = self._get_event_tag(record)
if event_subtype:
event_info = record[event_subtype]
if event_subtype in SECURITY_EVENT_TAGS:
# ADB events
if event_subtype == "adb_shell_interactive":
event_data_str = "ADB interactive shell opened"
elif event_subtype == "adb_shell_cmd":
command = event_info.get("command", "")
event_data_str = f"ADB shell command: {command}"
elif event_subtype == "adb_sync_recv_file":
path = event_info.get("path", "")
event_data_str = f"File pulled via ADB: {path}"
elif event_subtype == "adb_sync_send_file":
path = event_info.get("path", "")
event_data_str = f"File pushed via ADB: {path}"
# App process events
elif event_subtype == "app_process_start":
process_name = event_info.get("process", "")
uid = event_info.get("uid", "")
pid = event_info.get("pid", "")
event_data_str = (
f"Process started: {process_name} (UID: {uid}, PID: {pid})"
)
# Keyguard events
elif event_subtype == "keyguard_dismiss_auth_attempt":
success = event_info.get("success", False)
method = event_info.get("method_strength", 0)
event_data_str = f"Auth attempt: {'Success' if success else 'Failed'} (method strength: {method})"
elif event_subtype == "keyguard_dismissed":
event_data_str = "Keyguard dismissed"
elif event_subtype == "keyguard_secured":
event_data_str = "Device locked"
elif event_subtype == "keyguard_disabled_features_set":
admin = event_info.get("admin_package", "")
features = event_info.get("disabled_features", "")
event_data_str = (
f"Keyguard features disabled by {admin}: {features}"
)
# Key events
elif event_subtype == "key_generated":
success = event_info.get("success", False)
key_id = event_info.get("key_id", "unknown")
uid = event_info.get("uid", "")
event_data_str = f"Key {'generated' if success else 'generation failed'}: {key_id} (UID: {uid})"
elif event_subtype == "key_destruction":
success = event_info.get("success", False)
key_id = event_info.get("key_id", "unknown")
uid = event_info.get("uid", "")
event_data_str = f"Key {'destroyed' if success else 'destruction failed'}: {key_id} (UID: {uid})"
elif event_subtype == "key_import":
success = event_info.get("success", False)
key_id = event_info.get("key_id", "unknown")
event_data_str = (
f"Key {'imported' if success else 'import failed'}: {key_id}"
)
elif event_subtype == "key_integrity_violation":
key_id = event_info.get("key_id", "unknown")
event_data_str = f"Key integrity violation: {key_id}"
# Certificate events
elif event_subtype == "cert_authority_installed":
success = event_info.get("success", False)
subject = event_info.get("subject", "unknown")
event_data_str = f"Cert {'installed' if success else 'install failed'}: {subject}"
elif event_subtype == "cert_authority_removed":
success = event_info.get("success", False)
subject = event_info.get("subject", "unknown")
event_data_str = (
f"Cert {'removed' if success else 'removal failed'}: {subject}"
)
elif event_subtype == "cert_validation_failure":
reason = (
event_info if isinstance(event_info, str) else str(event_info)
)
event_data_str = f"Certificate validation failure: {reason}"
elif event_subtype == "crypto_self_test_completed":
if isinstance(event_info, dict):
success = event_info.get("success", False)
else:
success = event_info == 1
event_data_str = (
f"Crypto self test: {'passed' if success else 'FAILED'}"
)
# Package events
elif event_subtype in [
"package_installed",
"package_updated",
"package_uninstalled",
]:
pkg_name = event_info.get("package_name", "")
version = event_info.get("version_code", "")
user_id = event_info.get("user_id", "")
action = event_subtype.replace("package_", "").title()
event_data_str = (
f"Package {action}: {pkg_name} (v{version}, user: {user_id})"
)
# OS events
elif event_subtype == "os_startup":
verified_boot = event_info.get("verified_boot_state", "")
dm_verity = event_info.get("dm_verity_mode", "")
event_data_str = f"OS startup (verified boot: {verified_boot}, dm-verity: {dm_verity})"
elif event_subtype == "os_shutdown":
event_data_str = "OS shutdown"
# Logging events
elif event_subtype == "logging_started":
event_data_str = "Audit logging started"
elif event_subtype == "logging_stopped":
event_data_str = "Audit logging stopped"
elif event_subtype == "log_buffer_size_critical":
event_data_str = "Log buffer at 90% capacity"
# Media events
elif event_subtype == "media_mount":
mount_point = event_info.get("mount_point", "")
label = event_info.get("volume_label", "")
event_data_str = f"Media mounted: {mount_point} ({label})"
elif event_subtype == "media_unmount":
mount_point = event_info.get("mount_point", "")
label = event_info.get("volume_label", "")
event_data_str = f"Media unmounted: {mount_point} ({label})"
# Password policy events
elif event_subtype == "password_expiration_set":
admin = event_info.get("admin_package", "")
timeout = event_info.get("timeout_ms", "")
event_data_str = f"Password expiration set by {admin}: {timeout}ms"
elif event_subtype == "password_complexity_set":
admin = event_info.get("admin_package", "")
event_data_str = f"Password complexity set by {admin}"
elif event_subtype == "password_complexity_required":
admin = event_info.get("admin_package", "")
complexity = event_info.get("complexity", "")
event_data_str = (
f"Password complexity required by {admin}: {complexity}"
)
elif event_subtype == "password_history_length_set":
admin = event_info.get("admin_package", "")
length = event_info.get("length", "")
event_data_str = f"Password history length set by {admin}: {length}"
elif event_subtype == "password_changed":
complexity = event_info.get("complexity", "")
user_id = event_info.get("user_id", "")
event_data_str = (
f"Password changed (complexity: {complexity}, user: {user_id})"
)
elif event_subtype == "max_screen_lock_timeout_set":
admin = event_info.get("admin_package", "")
timeout = event_info.get("timeout_ms", "")
event_data_str = (
f"Max screen lock timeout set by {admin}: {timeout}ms"
)
elif event_subtype == "max_password_attempts_set":
admin = event_info.get("admin_package", "")
attempts = event_info.get("max_attempts", "")
event_data_str = f"Max password attempts set by {admin}: {attempts}"
# Remote lock and wipe events
elif event_subtype == "remote_lock":
admin = event_info.get("admin_package", "")
event_data_str = f"Device remotely locked by {admin}"
elif event_subtype == "wipe_failure":
event_data_str = "Device wipe failed"
# User restriction events
elif event_subtype == "user_restriction_added":
admin = event_info.get("admin_package", "")
restriction = event_info.get("restriction", "")
event_data_str = f"User restriction added by {admin}: {restriction}"
elif event_subtype == "user_restriction_removed":
admin = event_info.get("admin_package", "")
restriction = event_info.get("restriction", "")
event_data_str = (
f"User restriction removed by {admin}: {restriction}"
)
# WiFi events
elif event_subtype == "wifi_connection":
bssid = event_info.get("bssid", "")
event_type = event_info.get("event_type", "")
reason = event_info.get("reason", "")
event_data_str = f"WiFi connection: {event_type} (BSSID: {bssid})"
if reason:
event_data_str += f" - {reason}"
elif event_subtype == "wifi_disconnection":
bssid = event_info.get("bssid", "")
reason = event_info.get("reason", "")
event_data_str = f"WiFi disconnection (BSSID: {bssid})"
if reason:
event_data_str += f" - {reason}"
# Bluetooth events
elif event_subtype == "bluetooth_connection":
mac = event_info.get("mac_address", "")
success = event_info.get("success", False)
reason = event_info.get("reason", "")
event_data_str = f"Bluetooth {'connected' if success else 'connection failed'}: {mac}"
if reason:
event_data_str += f" - {reason}"
elif event_subtype == "bluetooth_disconnection":
mac = event_info.get("mac_address", "")
reason = event_info.get("reason", "")
event_data_str = f"Bluetooth disconnected: {mac}"
if reason:
event_data_str += f" - {reason}"
# Camera policy event
elif event_subtype == "camera_policy_set":
admin = event_info.get("admin_package", "")
disabled = event_info.get("disabled", False)
event_data_str = (
f"Camera {'disabled' if disabled else 'enabled'} by {admin}"
)
# Backup service event
elif event_subtype == "backup_service_toggled":
admin = event_info.get("admin_package", "")
enabled = event_info.get("enabled", False)
event_data_str = f"Backup service {'enabled' if enabled else 'disabled'} by {admin}"
# NFC events
elif event_subtype == "nfc_enabled":
event_data_str = "NFC enabled"
elif event_subtype == "nfc_disabled":
event_data_str = "NFC disabled"
else:
event_data_str = (
f"{SECURITY_EVENT_TAGS.get(event_subtype, {}).get('name', event_subtype)}: "
f"{event_info}"
)
else:
event_data_str = f"{event_subtype}: {event_info}"
if not event_subtype:
event_subtype = "unknown"
event_data_str = str(record)
return {
"timestamp": record.get("timestamp"),
"module": self.__class__.__name__,
"event": event_subtype,
"data": event_data_str,
}
def process_event(self, event_data: dict) -> None:
"""Process a security event and add it to results."""
# Convert event_time to ISO format
# Security events use nanoseconds since epoch
event_time = event_data.get("event_time")
if event_time:
# Convert nanoseconds to seconds
event_data["timestamp"] = self._localize_timestamp(
event_time / 1_000_000_000.0
)
else:
event_data["timestamp"] = None
# Track event type statistics, including future tags unknown to MVT.
event_tag = self._get_event_tag(event_data)
if event_tag:
self.event_type_counts[event_tag] = (
self.event_type_counts.get(event_tag, 0) + 1
)
self.results.append(event_data)
def run(self) -> None:
"""Extract and analyze security events from intrusion logs."""
if not self.target_path:
self.log.error("No target path specified")
return
self.collect_txt(self.target_path)
self.parse_collected_txt("security_event")
self.log.info("Identified %d security events", len(self.results))
# Log event type breakdown
if self.event_type_counts:
self.log.info("Security event breakdown:")
for event_type, count in sorted(
self.event_type_counts.items(), key=lambda x: x[1], reverse=True
):
event_name = SECURITY_EVENT_TAGS.get(event_type, {}).get(
"name", event_type
)
self.log.info(" - %s: %d", event_name, count)
unknown_event_types = sorted(
event_type
for event_type in self.event_type_counts
if event_type not in SECURITY_EVENT_TAGS
)
if unknown_event_types:
self.log.warning(
"Found unknown intrusion logging security event type(s): %s. "
"Please open an issue on GitHub so MVT can add support for them.",
", ".join(unknown_event_types),
)
+1 -1
View File
@@ -219,7 +219,7 @@ class AlertStore:
return alerts
def save_timeline(self, timeline_path: str) -> None:
with open(timeline_path, "a+", encoding="utf-8") as handle:
with open(timeline_path, "w", encoding="utf-8") as handle:
csvoutput = csv.writer(
handle,
delimiter=",",
+1
View File
@@ -47,3 +47,4 @@ HELP_MSG_CHECK_ADB_REMOVED_DESCRIPTION = (
HELP_MSG_CHECK_BUGREPORT = "Check an Android Bug Report"
HELP_MSG_CHECK_ANDROID_BACKUP = "Check an Android Backup"
HELP_MSG_CHECK_ANDROIDQF = "Check data collected with AndroidQF"
HELP_MSG_CHECK_INTRUSION_LOGS = "Check Android Intrusion Logging files"
+1 -1
View File
@@ -260,7 +260,7 @@ def save_timeline(timeline: list, timeline_path: str, is_utc: bool = True) -> No
:param timeline_path: Path to the csv file to store the timeline to
"""
with open(timeline_path, "a+", encoding="utf-8") as handle:
with open(timeline_path, "w", encoding="utf-8") as handle:
csvoutput = csv.writer(
handle, delimiter=",", quotechar='"', quoting=csv.QUOTE_ALL, escapechar="\\"
)
+1 -1
View File
@@ -3,4 +3,4 @@
# Use of this software is governed by the MVT License 1.1 that can be found at
# https://license.mvt.re/1.1/
MVT_VERSION = "2026.4.28"
MVT_VERSION = "2026.5.12"
+16
View File
@@ -911,6 +911,10 @@
"version": "15.8.7",
"build": "19H411"
},
{
"version": "15.8.8",
"build": "19H422"
},
{
"build": "20A362",
"version": "16.0"
@@ -1028,6 +1032,10 @@
"version": "16.7.15",
"build": "20H380"
},
{
"version": "16.7.16",
"build": "20H392"
},
{
"version": "17.0",
"build": "21A327"
@@ -1204,6 +1212,10 @@
"version": "18.7.8",
"build": "22H352"
},
{
"version": "18.7.9",
"build": "22H355"
},
{
"version": "26",
"build": "23A341"
@@ -1239,5 +1251,9 @@
{
"version": "26.4.2",
"build": "23E261"
},
{
"version": "26.5",
"build": "23F77"
}
]
+7
View File
@@ -11,6 +11,7 @@ import os
import os.path
import shutil
import sqlite3
from pathlib import Path
from typing import Optional
from iOSbackup import iOSbackup
@@ -96,6 +97,9 @@ class DecryptBackup:
# This may be a partial backup. Skip files from the manifest
# which do not exist locally.
source_file_path = os.path.join(self.backup_path, file_id[0:2], file_id)
if not Path(source_file_path).resolve().is_relative_to(Path(self.backup_path).resolve()):
log.warning("Skipping unsafe file_id: %r", file_id)
continue
if not os.path.exists(source_file_path):
log.debug(
"Skipping file %s. File not found in encrypted backup directory.",
@@ -104,6 +108,9 @@ class DecryptBackup:
continue
item_folder = os.path.join(self.dest_path, file_id[0:2]) # type: ignore[arg-type]
if not Path(os.path.join(item_folder, file_id)).resolve().is_relative_to(Path(self.dest_path).resolve()):
log.warning("Skipping unsafe file_id: %r", file_id)
continue
if not os.path.exists(item_folder):
os.makedirs(item_folder)
+3
View File
@@ -9,6 +9,7 @@ import os
import shutil
import sqlite3
import subprocess
from pathlib import Path
from typing import Iterator, Optional, Union
from mvt.common.module import (
@@ -165,6 +166,8 @@ class IOSExtraction(MVTModule):
if not self.target_path:
return None
file_path = os.path.join(self.target_path, file_id[0:2], file_id)
if not Path(file_path).resolve().is_relative_to(Path(self.target_path).resolve()):
return None
if os.path.exists(file_path):
return file_path
@@ -5,6 +5,7 @@
import logging
from mvt.android.artifacts.dumpsys_accessibility import DumpsysAccessibilityArtifact
from mvt.common.alerts import AlertLevel
from mvt.common.indicators import Indicators
from ..utils import get_artifact
@@ -38,6 +39,19 @@ class TestDumpsysAccessibilityArtifact:
assert da.results[0]["package_name"] == "com.malware.accessibility"
assert da.results[0]["service"] == "com.malware.service.malwareservice"
def test_accessibility_service_alert(self):
da = DumpsysAccessibilityArtifact()
file = get_artifact("android_data/dumpsys_accessibility_v14_or_later.txt")
with open(file) as f:
data = f.read()
da.parse(data)
da.check_indicators()
assert len(da.alertstore.alerts) == 1
assert da.alertstore.alerts[0].level == AlertLevel.MEDIUM
assert da.alertstore.alerts[0].event == da.results[0]
def test_ioc_check(self, indicator_file):
da = DumpsysAccessibilityArtifact()
file = get_artifact("android_data/dumpsys_accessibility.txt")
@@ -51,4 +65,12 @@ class TestDumpsysAccessibilityArtifact:
da.indicators = ind
assert len(da.alertstore.alerts) == 0
da.check_indicators()
assert len(da.alertstore.alerts) == 1
assert len(da.alertstore.alerts) == len(da.results)
assert da.alertstore.count(AlertLevel.MEDIUM) == 3
assert da.alertstore.count(AlertLevel.CRITICAL) == 1
critical_alert = next(
alert
for alert in da.alertstore.alerts
if alert.level == AlertLevel.CRITICAL
)
assert critical_alert.event["package_name"] == "com.sec.android.app.camera"
+154
View File
@@ -0,0 +1,154 @@
# Mobile Verification Toolkit (MVT)
# Copyright (c) 2021-2023 The MVT Authors.
# Use of this software is governed by the MVT License 1.1 that can be found at
# https://license.mvt.re/1.1/
import json
import logging
from click.testing import CliRunner
from mvt.android.cli import check_intrusion_logs
from mvt.android.cmd_check_intrusion_logs import CmdAndroidCheckIntrusionLogs
from mvt.android.modules.intrusion_logs.base import IntrusionLogsModule
from mvt.android.modules.intrusion_logs.security_event import SecurityEvent
def _write_ndjson(path, records):
path.write_text(
"\n".join(json.dumps(record) for record in records),
encoding="utf-8",
)
def test_load_all_events_preserves_unknown_top_level_event(tmp_path):
_write_ndjson(
tmp_path / "intrusion.txt",
[
{
"future_event": {
"event_time": 1_700_000_000_000,
"field": "value",
}
}
],
)
module = IntrusionLogsModule(target_path=str(tmp_path))
events = module.load_all_events(str(tmp_path))
assert events == {
"future_event": [
{
"event_time": 1_700_000_000_000,
"field": "value",
}
]
}
def test_check_intrusion_logs_warns_about_unknown_top_level_event_type(
tmp_path, caplog
):
_write_ndjson(
tmp_path / "intrusion.txt",
[
{
"future_event": {
"event_time": 1_700_000_000_000,
"field": "value",
}
}
],
)
with caplog.at_level(logging.WARNING):
cmd = CmdAndroidCheckIntrusionLogs(target_path=str(tmp_path))
cmd.run()
assert "Found unknown intrusion logging event type(s): future_event" in caplog.text
assert "Please open an issue on GitHub" in caplog.text
def test_check_intrusion_logs_parses_core_and_unknown_security_events(
tmp_path, caplog
):
_write_ndjson(
tmp_path / "intrusion.txt",
[
{
"dns_event": {
"event_time": 1_700_000_000_000,
"hostname": "example.com",
"package_name": "com.example.app",
"ip_addresses": ["/1.2.3.4"],
}
},
{
"connect_event": {
"event_time": 1_700_000_001_000,
"ip_address": "/5.6.7.8",
"port": 443,
"package_name": "com.example.app",
}
},
{
"security_event": {
"event_time": 1_700_000_002_000_000_000,
"app_process_start": {
"process": "com.example.app",
"uid": 10_000,
"pid": 1234,
},
}
},
{
"security_event": {
"event_time": 1_700_000_003_000_000_000,
"future_google_event": {
"field": "value",
},
}
},
],
)
with caplog.at_level(logging.WARNING):
cmd = CmdAndroidCheckIntrusionLogs(target_path=str(tmp_path))
cmd.run()
assert [module.__class__.__name__ for module in cmd.executed] == [
"DnsEvent",
"ConnectEvent",
"SecurityEvent",
]
assert [len(module.results) for module in cmd.executed] == [1, 1, 2]
security_module = next(
module for module in cmd.executed if isinstance(module, SecurityEvent)
)
assert security_module.event_type_counts["app_process_start"] == 1
assert security_module.event_type_counts["future_google_event"] == 1
future_timeline_events = [
event for event in cmd.timeline if event["event"] == "future_google_event"
]
assert len(future_timeline_events) == 1
assert "future_google_event" in future_timeline_events[0]["data"]
assert "field" in future_timeline_events[0]["data"]
assert (
"Found unknown intrusion logging security event type(s): future_google_event"
in caplog.text
)
assert "Please open an issue on GitHub" in caplog.text
def test_check_intrusion_logs_cli_lists_modules(tmp_path):
_write_ndjson(tmp_path / "intrusion.txt", [])
result = CliRunner().invoke(check_intrusion_logs, ["--list-modules", str(tmp_path)])
assert result.exit_code == 0
assert "DnsEvent" in result.output
assert "ConnectEvent" in result.output
assert "SecurityEvent" in result.output
Generated
+7 -7
View File
@@ -950,7 +950,7 @@ requires-dist = [
{ name = "rich", specifier = "==14.3.3" },
{ name = "simplejson", specifier = "==3.20.2" },
{ name = "tld", specifier = "==0.13.1" },
{ name = "tzdata", specifier = "==2026.1" },
{ name = "tzdata", specifier = "==2026.2" },
]
[package.metadata.requires-dev]
@@ -1794,20 +1794,20 @@ wheels = [
[[package]]
name = "tzdata"
version = "2026.1"
version = "2026.2"
source = { registry = "https://pypi.org/simple" }
sdist = { url = "https://files.pythonhosted.org/packages/19/f5/cd531b2d15a671a40c0f66cf06bc3570a12cd56eef98960068ebbad1bf5a/tzdata-2026.1.tar.gz", hash = "sha256:67658a1903c75917309e753fdc349ac0efd8c27db7a0cb406a25be4840f87f98", size = 197639, upload-time = "2026-04-03T11:25:22.002Z" }
sdist = { url = "https://files.pythonhosted.org/packages/ba/19/1b9b0e29f30c6d35cb345486df41110984ea67ae69dddbc0e8a100999493/tzdata-2026.2.tar.gz", hash = "sha256:9173fde7d80d9018e02a662e168e5a2d04f87c41ea174b139fbef642eda62d10", size = 198254, upload-time = "2026-04-24T15:22:08.651Z" }
wheels = [
{ url = "https://files.pythonhosted.org/packages/b0/70/d460bd685a170790ec89317e9bd33047988e4bce507b831f5db771e142de/tzdata-2026.1-py2.py3-none-any.whl", hash = "sha256:4b1d2be7ac37ceafd7327b961aa3a54e467efbdb563a23655fbfe0d39cfc42a9", size = 348952, upload-time = "2026-04-03T11:25:20.313Z" },
{ url = "https://files.pythonhosted.org/packages/ce/e4/dccd7f47c4b64213ac01ef921a1337ee6e30e8c6466046018326977efd95/tzdata-2026.2-py2.py3-none-any.whl", hash = "sha256:bbe9af844f658da81a5f95019480da3a89415801f6cc966806612cc7169bffe7", size = 349321, upload-time = "2026-04-24T15:22:05.876Z" },
]
[[package]]
name = "urllib3"
version = "2.6.3"
version = "2.7.0"
source = { registry = "https://pypi.org/simple" }
sdist = { url = "https://files.pythonhosted.org/packages/c7/24/5f1b3bdffd70275f6661c76461e25f024d5a38a46f04aaca912426a2b1d3/urllib3-2.6.3.tar.gz", hash = "sha256:1b62b6884944a57dbe321509ab94fd4d3b307075e0c2eae991ac71ee15ad38ed", size = 435556, upload-time = "2026-01-07T16:24:43.925Z" }
sdist = { url = "https://files.pythonhosted.org/packages/53/0c/06f8b233b8fd13b9e5ee11424ef85419ba0d8ba0b3138bf360be2ff56953/urllib3-2.7.0.tar.gz", hash = "sha256:231e0ec3b63ceb14667c67be60f2f2c40a518cb38b03af60abc813da26505f4c", size = 433602, upload-time = "2026-05-07T16:13:18.596Z" }
wheels = [
{ url = "https://files.pythonhosted.org/packages/39/08/aaaad47bc4e9dc8c725e68f9d04865dbcb2052843ff09c97b08904852d84/urllib3-2.6.3-py3-none-any.whl", hash = "sha256:bf272323e553dfb2e87d9bfd225ca7b0f467b919d7bbd355436d3fd37cb0acd4", size = 131584, upload-time = "2026-01-07T16:24:42.685Z" },
{ url = "https://files.pythonhosted.org/packages/7f/3e/5db95bcf282c52709639744ca2a8b149baccf648e39c8cc87553df9eae0c/urllib3-2.7.0-py3-none-any.whl", hash = "sha256:9fb4c81ebbb1ce9531cce37674bbc6f1360472bc18ca9a553ede278ef7276897", size = 131087, upload-time = "2026-05-07T16:13:17.151Z" },
]
[[package]]