mirror of
https://github.com/mvt-project/mvt.git
synced 2026-02-15 01:52:45 +00:00
Compare commits
100 Commits
feature/co
...
feature/an
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
d82e55e12c | ||
|
|
981371bd8b | ||
|
|
c7d00978c6 | ||
|
|
339a1d0712 | ||
|
|
7009cddc8c | ||
|
|
9b4d10139c | ||
|
|
b795ea3129 | ||
|
|
5be5ffbf49 | ||
|
|
2701490501 | ||
|
|
779842567d | ||
|
|
d3cc8cf590 | ||
|
|
b8a42eaf8f | ||
|
|
62b880fbff | ||
|
|
0778d448df | ||
|
|
f020655a1a | ||
|
|
91c34e6664 | ||
|
|
b4a8dd226a | ||
|
|
88213e12c9 | ||
|
|
f75b8e186a | ||
|
|
5babc1fcf3 | ||
|
|
b723ebf28e | ||
|
|
616e870212 | ||
|
|
847b0e087b | ||
|
|
86a0772eb2 | ||
|
|
7d0be9db4f | ||
|
|
4e120b2640 | ||
|
|
dbe9e5db9b | ||
|
|
0b00398729 | ||
|
|
87034d2c7a | ||
|
|
595a2f6536 | ||
|
|
8ead44a31e | ||
|
|
5c19d02a73 | ||
|
|
14ebc9ee4e | ||
|
|
de53cc07f8 | ||
|
|
22e066fc4a | ||
|
|
242052b8ec | ||
|
|
1df61b5bbf | ||
|
|
b691de2cc0 | ||
|
|
10915f250c | ||
|
|
c60cef4009 | ||
|
|
dda798df8e | ||
|
|
ffe6ad2014 | ||
|
|
a125b20fc5 | ||
|
|
49108e67e2 | ||
|
|
883b450601 | ||
|
|
ce813568ff | ||
|
|
93303f181a | ||
|
|
bee453a090 | ||
|
|
42106aa4d6 | ||
|
|
95076c8f71 | ||
|
|
c9ac12f336 | ||
|
|
486e3e7e9b | ||
|
|
be1fc3bd8b | ||
|
|
4757cff262 | ||
|
|
61f51caf31 | ||
|
|
511063fd0e | ||
|
|
88bc5672cb | ||
|
|
0fce0acf7a | ||
|
|
61f95d07d3 | ||
|
|
3dedd169c4 | ||
|
|
e34e03d3a3 | ||
|
|
34374699ce | ||
|
|
cf5aa7c89f | ||
|
|
2766739512 | ||
|
|
9c84afb4b0 | ||
|
|
80fc8bd879 | ||
|
|
ca41f7f106 | ||
|
|
55ddd86ad5 | ||
|
|
b184eeedf4 | ||
|
|
4e97e85350 | ||
|
|
e5865b166e | ||
|
|
a2dabb4267 | ||
|
|
b7595b62eb | ||
|
|
02c02ca15c | ||
|
|
6da33394fe | ||
|
|
086871e21d | ||
|
|
f32830c649 | ||
|
|
edcad488ab | ||
|
|
43901c96a0 | ||
|
|
0962383b46 | ||
|
|
34cd08fd9a | ||
|
|
579b53f7ec | ||
|
|
dbb80d6320 | ||
|
|
0fbf24e82a | ||
|
|
a2493baead | ||
|
|
0dc6228a59 | ||
|
|
6e230bdb6a | ||
|
|
2aa76c8a1c | ||
|
|
7d6dc9e6dc | ||
|
|
458195a0ab | ||
|
|
52e854b8b7 | ||
|
|
0f1eec3971 | ||
|
|
f4425865c0 | ||
|
|
28c0c86c4e | ||
|
|
154e6dab15 | ||
|
|
8e895d3d07 | ||
|
|
bc09e2a394 | ||
|
|
2d0de088dd | ||
|
|
8694e7a047 | ||
|
|
9b41ba99aa |
11
.github/dependabot.yml
vendored
Normal file
11
.github/dependabot.yml
vendored
Normal file
@@ -0,0 +1,11 @@
|
||||
# To get started with Dependabot version updates, you'll need to specify which
|
||||
# package ecosystems to update and where the package manifests are located.
|
||||
# Please see the documentation for all configuration options:
|
||||
# https://docs.github.com/code-security/dependabot/dependabot-version-updates/configuration-options-for-the-dependabot.yml-file
|
||||
|
||||
version: 2
|
||||
updates:
|
||||
- package-ecosystem: "pip" # See documentation for possible values
|
||||
directory: "/" # Location of package manifests
|
||||
schedule:
|
||||
interval: "weekly"
|
||||
4
.github/workflows/tests.yml
vendored
4
.github/workflows/tests.yml
vendored
@@ -12,7 +12,7 @@ jobs:
|
||||
strategy:
|
||||
fail-fast: false
|
||||
matrix:
|
||||
python-version: ['3.8', '3.9', '3.10'] # , '3.11']
|
||||
python-version: ['3.10', '3.11', '3.12', '3.13']
|
||||
|
||||
steps:
|
||||
- uses: actions/checkout@v4
|
||||
@@ -35,4 +35,4 @@ jobs:
|
||||
if: github.event_name == 'pull_request'
|
||||
with:
|
||||
pytest-coverage-path: ./pytest-coverage.txt
|
||||
junitxml-path: ./pytest.xml
|
||||
junitxml-path: ./pytest.xml
|
||||
|
||||
1
.github/workflows/update-ios-data.yml
vendored
1
.github/workflows/update-ios-data.yml
vendored
@@ -21,6 +21,7 @@ jobs:
|
||||
title: '[auto] Update iOS releases and versions'
|
||||
commit-message: Add new iOS versions and build numbers
|
||||
branch: auto/add-new-ios-releases
|
||||
draft: true
|
||||
body: |
|
||||
This is an automated pull request to update the iOS releases and version numbers.
|
||||
add-paths: |
|
||||
|
||||
@@ -103,7 +103,7 @@ RUN git clone https://github.com/libimobiledevice/usbmuxd && cd usbmuxd \
|
||||
|
||||
|
||||
# Create main image
|
||||
FROM ubuntu:22.04 as main
|
||||
FROM ubuntu:24.04 as main
|
||||
|
||||
LABEL org.opencontainers.image.url="https://mvt.re"
|
||||
LABEL org.opencontainers.image.documentation="https://docs.mvt.re"
|
||||
@@ -135,8 +135,7 @@ COPY --from=build-usbmuxd /build /
|
||||
COPY . mvt/
|
||||
RUN apt-get update \
|
||||
&& apt-get install -y git python3-pip \
|
||||
&& PIP_NO_CACHE_DIR=1 pip3 install --upgrade pip \
|
||||
&& PIP_NO_CACHE_DIR=1 pip3 install ./mvt \
|
||||
&& PIP_NO_CACHE_DIR=1 pip3 install --break-system-packages ./mvt \
|
||||
&& apt-get remove -y python3-pip git && apt-get autoremove -y \
|
||||
&& rm -rf /var/lib/apt/lists/* \
|
||||
&& rm -rf mvt
|
||||
|
||||
7
Makefile
7
Makefile
@@ -23,7 +23,12 @@ install:
|
||||
python3 -m pip install --upgrade -e .
|
||||
|
||||
test-requirements:
|
||||
python3 -m pip install --upgrade -r test-requirements.txt
|
||||
python3 -m pip install --upgrade --group dev
|
||||
|
||||
generate-proto-parsers:
|
||||
# Generate python parsers for protobuf files
|
||||
PROTO_FILES=$$(find src/mvt/android/parsers/proto/ -iname "*.proto"); \
|
||||
protoc -Isrc/mvt/android/parsers/proto/ --python_betterproto_out=src/mvt/android/parsers/proto/ $$PROTO_FILES
|
||||
|
||||
clean:
|
||||
rm -rf $(PWD)/build $(PWD)/dist $(PWD)/src/mvt.egg-info
|
||||
|
||||
43
docs/command_completion.md
Normal file
43
docs/command_completion.md
Normal file
@@ -0,0 +1,43 @@
|
||||
# Command Completion
|
||||
|
||||
MVT utilizes the [Click](https://click.palletsprojects.com/en/stable/) library for creating its command line interface.
|
||||
|
||||
Click provides tab completion support for Bash (version 4.4 and up), Zsh, and Fish.
|
||||
|
||||
To enable it, you need to manually register a special function with your shell, which varies depending on the shell you are using.
|
||||
|
||||
The following describes how to generate the command completion scripts and add them to your shell configuration.
|
||||
|
||||
> **Note: You will need to start a new shell for the changes to take effect.**
|
||||
|
||||
### For Bash
|
||||
|
||||
```bash
|
||||
# Generates bash completion scripts
|
||||
echo "$(_MVT_IOS_COMPLETE=bash_source mvt-ios)" > ~/.mvt-ios-complete.bash &&
|
||||
echo "$(_MVT_ANDROID_COMPLETE=bash_source mvt-android)" > ~/.mvt-android-complete.bash
|
||||
```
|
||||
|
||||
Add the following to `~/.bashrc`:
|
||||
```bash
|
||||
# source mvt completion scripts
|
||||
. ~/.mvt-ios-complete.bash && . ~/.mvt-android-complete.bash
|
||||
```
|
||||
|
||||
### For Zsh
|
||||
|
||||
```bash
|
||||
# Generates zsh completion scripts
|
||||
echo "$(_MVT_IOS_COMPLETE=zsh_source mvt-ios)" > ~/.mvt-ios-complete.zsh &&
|
||||
echo "$(_MVT_ANDROID_COMPLETE=zsh_source mvt-android)" > ~/.mvt-android-complete.zsh
|
||||
```
|
||||
|
||||
Add the following to `~/.zshrc`:
|
||||
```bash
|
||||
# source mvt completion scripts
|
||||
. ~/.mvt-ios-complete.zsh && . ~/.mvt-android-complete.zsh
|
||||
```
|
||||
|
||||
For more information, visit the official [Click Docs](https://click.palletsprojects.com/en/stable/shell-completion/#enabling-completion).
|
||||
|
||||
|
||||
@@ -98,3 +98,7 @@ You now should have the `mvt-ios` and `mvt-android` utilities installed.
|
||||
**Notes:**
|
||||
1. The `--force` flag is necessary to force the reinstallation of the package.
|
||||
2. To revert to using a PyPI version, it will be necessary to `pipx uninstall mvt` first.
|
||||
|
||||
## Setting up command completions
|
||||
|
||||
See ["Command completions"](command_completion.md)
|
||||
|
||||
@@ -1,5 +1,5 @@
|
||||
mkdocs==1.6.1
|
||||
mkdocs-autorefs==1.2.0
|
||||
mkdocs-material==9.5.42
|
||||
mkdocs-autorefs==1.4.3
|
||||
mkdocs-material==9.6.20
|
||||
mkdocs-material-extensions==1.3.1
|
||||
mkdocstrings==0.23.0
|
||||
mkdocstrings==0.30.1
|
||||
@@ -1,13 +1,11 @@
|
||||
[project]
|
||||
name = "mvt"
|
||||
dynamic = ["version"]
|
||||
authors = [
|
||||
{name = "Claudio Guarnieri", email = "nex@nex.sx"}
|
||||
]
|
||||
authors = [{ name = "Claudio Guarnieri", email = "nex@nex.sx" }]
|
||||
maintainers = [
|
||||
{name = "Etienne Maynier", email = "tek@randhome.io"},
|
||||
{name = "Donncha Ó Cearbhaill", email = "donncha.ocearbhaill@amnesty.org"},
|
||||
{name = "Rory Flynn", email = "rory.flynn@amnesty.org"}
|
||||
{ name = "Etienne Maynier", email = "tek@randhome.io" },
|
||||
{ name = "Donncha Ó Cearbhaill", email = "donncha.ocearbhaill@amnesty.org" },
|
||||
{ name = "Rory Flynn", email = "rory.flynn@amnesty.org" },
|
||||
]
|
||||
description = "Mobile Verification Toolkit"
|
||||
readme = "README.md"
|
||||
@@ -16,44 +14,61 @@ classifiers = [
|
||||
"Development Status :: 5 - Production/Stable",
|
||||
"Intended Audience :: Information Technology",
|
||||
"Operating System :: OS Independent",
|
||||
"Programming Language :: Python"
|
||||
"Programming Language :: Python",
|
||||
]
|
||||
dependencies = [
|
||||
"click >=8.1.3",
|
||||
"rich >=12.6.0",
|
||||
"tld >=0.12.6",
|
||||
"requests >=2.28.1",
|
||||
"simplejson >=3.17.6",
|
||||
"packaging >=21.3",
|
||||
"appdirs >=1.4.4",
|
||||
"iOSbackup >=0.9.923",
|
||||
"adb-shell[usb] >=0.4.3",
|
||||
"libusb1 >=3.0.0",
|
||||
"cryptography >=42.0.5",
|
||||
"pyyaml >=6.0",
|
||||
"pyahocorasick >= 2.0.0",
|
||||
"click==8.2.1",
|
||||
"rich==14.1.0",
|
||||
"tld==0.13.1",
|
||||
"requests==2.32.4",
|
||||
"simplejson==3.20.1",
|
||||
"packaging==25.0",
|
||||
"appdirs==1.4.4",
|
||||
"iOSbackup==0.9.925",
|
||||
"adb-shell[usb]==0.4.4",
|
||||
"libusb1==3.3.1",
|
||||
"cryptography==45.0.6",
|
||||
"PyYAML>=6.0.2",
|
||||
"pyahocorasick==2.2.0",
|
||||
"betterproto==1.2.5",
|
||||
"pydantic==2.11.7",
|
||||
"pydantic-settings==2.10.1",
|
||||
"NSKeyedUnArchiver==1.5.2",
|
||||
"python-dateutil==2.9.0.post0",
|
||||
"tzdata==2025.2",
|
||||
]
|
||||
requires-python = ">= 3.8"
|
||||
requires-python = ">= 3.10"
|
||||
|
||||
[project.urls]
|
||||
homepage = "https://docs.mvt.re/en/latest/"
|
||||
repository = "https://github.com/mvt-project/mvt"
|
||||
|
||||
[project.scripts]
|
||||
mvt-ios = "mvt.ios:cli"
|
||||
mvt-android = "mvt.android:cli"
|
||||
mvt-ios = "mvt.ios:cli"
|
||||
mvt-android = "mvt.android:cli"
|
||||
|
||||
[dependency-groups]
|
||||
dev = [
|
||||
"requests>=2.31.0",
|
||||
"pytest>=7.4.3",
|
||||
"pytest-cov>=4.1.0",
|
||||
"pytest-github-actions-annotate-failures>=0.2.0",
|
||||
"pytest-mock>=3.14.0",
|
||||
"stix2>=3.0.1",
|
||||
"ruff>=0.1.6",
|
||||
"mypy>=1.7.1",
|
||||
"betterproto[compiler]",
|
||||
]
|
||||
|
||||
[build-system]
|
||||
requires = ["setuptools>=61.0"]
|
||||
build-backend = "setuptools.build_meta"
|
||||
|
||||
[tool.coverage.run]
|
||||
omit = [
|
||||
"tests/*",
|
||||
]
|
||||
omit = ["tests/*"]
|
||||
|
||||
[tool.coverage.html]
|
||||
directory= "htmlcov"
|
||||
directory = "htmlcov"
|
||||
|
||||
[tool.mypy]
|
||||
install_types = true
|
||||
@@ -63,15 +78,13 @@ packages = "src"
|
||||
|
||||
[tool.pytest.ini_options]
|
||||
addopts = "-ra -q --cov=mvt --cov-report html --junitxml=pytest.xml --cov-report=term-missing:skip-covered"
|
||||
testpaths = [
|
||||
"tests"
|
||||
]
|
||||
testpaths = ["tests"]
|
||||
|
||||
[tool.ruff.lint]
|
||||
select = ["C90", "E", "F", "W"] # flake8 default set
|
||||
select = ["C90", "E", "F", "W"] # flake8 default set
|
||||
ignore = [
|
||||
"E501", # don't enforce line length violations
|
||||
"C901", # complex-structure
|
||||
"E501", # don't enforce line length violations
|
||||
"C901", # complex-structure
|
||||
|
||||
# These were previously ignored but don't seem to be required:
|
||||
# "E265", # no-space-after-block-comment
|
||||
@@ -83,14 +96,14 @@ ignore = [
|
||||
]
|
||||
|
||||
[tool.ruff.lint.per-file-ignores]
|
||||
"__init__.py" = ["F401"] # unused-import
|
||||
"__init__.py" = ["F401"] # unused-import
|
||||
|
||||
[tool.ruff.lint.mccabe]
|
||||
max-complexity = 10
|
||||
|
||||
[tool.setuptools]
|
||||
include-package-data = true
|
||||
package-dir = {"" = "src"}
|
||||
package-dir = { "" = "src" }
|
||||
|
||||
[tool.setuptools.packages.find]
|
||||
where = ["src"]
|
||||
@@ -99,4 +112,4 @@ where = ["src"]
|
||||
mvt = ["ios/data/*.json"]
|
||||
|
||||
[tool.setuptools.dynamic]
|
||||
version = {attr = "mvt.common.version.MVT_VERSION"}
|
||||
version = { attr = "mvt.common.version.MVT_VERSION" }
|
||||
|
||||
@@ -4,13 +4,14 @@
|
||||
# https://license.mvt.re/1.1/
|
||||
|
||||
import base64
|
||||
import binascii
|
||||
import hashlib
|
||||
|
||||
from .artifact import AndroidArtifact
|
||||
|
||||
|
||||
class DumpsysADBArtifact(AndroidArtifact):
|
||||
multiline_fields = ["user_keys"]
|
||||
multiline_fields = ["user_keys", "keystore"]
|
||||
|
||||
def indented_dump_parser(self, dump_data):
|
||||
"""
|
||||
@@ -67,14 +68,38 @@ class DumpsysADBArtifact(AndroidArtifact):
|
||||
|
||||
return res
|
||||
|
||||
def parse_xml(self, xml_data):
|
||||
"""
|
||||
Parse XML data from dumpsys ADB output
|
||||
"""
|
||||
import xml.etree.ElementTree as ET
|
||||
|
||||
keystore = []
|
||||
keystore_root = ET.fromstring(xml_data)
|
||||
for adb_key in keystore_root.findall("adbKey"):
|
||||
key_info = self.calculate_key_info(adb_key.get("key").encode("utf-8"))
|
||||
key_info["last_connected"] = adb_key.get("lastConnection")
|
||||
keystore.append(key_info)
|
||||
|
||||
return keystore
|
||||
|
||||
@staticmethod
|
||||
def calculate_key_info(user_key: bytes) -> str:
|
||||
key_base64, user = user_key.split(b" ", 1)
|
||||
key_raw = base64.b64decode(key_base64)
|
||||
key_fingerprint = hashlib.md5(key_raw).hexdigest().upper()
|
||||
key_fingerprint_colon = ":".join(
|
||||
[key_fingerprint[i : i + 2] for i in range(0, len(key_fingerprint), 2)]
|
||||
)
|
||||
if b" " in user_key:
|
||||
key_base64, user = user_key.split(b" ", 1)
|
||||
else:
|
||||
key_base64, user = user_key, b""
|
||||
|
||||
try:
|
||||
key_raw = base64.b64decode(key_base64)
|
||||
key_fingerprint = hashlib.md5(key_raw).hexdigest().upper()
|
||||
key_fingerprint_colon = ":".join(
|
||||
[key_fingerprint[i : i + 2] for i in range(0, len(key_fingerprint), 2)]
|
||||
)
|
||||
except binascii.Error:
|
||||
# Impossible to parse base64
|
||||
key_fingerprint_colon = ""
|
||||
|
||||
return {
|
||||
"user": user.decode("utf-8"),
|
||||
"fingerprint": key_fingerprint_colon,
|
||||
@@ -115,8 +140,24 @@ class DumpsysADBArtifact(AndroidArtifact):
|
||||
if parsed.get("debugging_manager") is None:
|
||||
self.log.error("Unable to find expected ADB entries in dumpsys output") # noqa
|
||||
return
|
||||
|
||||
# Keystore can be in different levels, as the basic parser
|
||||
# is not always consistent due to different dumpsys formats.
|
||||
if parsed.get("keystore"):
|
||||
keystore_data = b"\n".join(parsed["keystore"])
|
||||
elif parsed["debugging_manager"].get("keystore"):
|
||||
keystore_data = b"\n".join(parsed["debugging_manager"]["keystore"])
|
||||
else:
|
||||
parsed = parsed["debugging_manager"]
|
||||
keystore_data = None
|
||||
|
||||
# Keystore is in XML format on some devices and we need to parse it
|
||||
if keystore_data and keystore_data.startswith(b"<?xml"):
|
||||
parsed["debugging_manager"]["keystore"] = self.parse_xml(keystore_data)
|
||||
else:
|
||||
# Keystore is not XML format
|
||||
parsed["debugging_manager"]["keystore"] = keystore_data
|
||||
|
||||
parsed = parsed["debugging_manager"]
|
||||
|
||||
# Calculate key fingerprints for better readability
|
||||
key_info = []
|
||||
|
||||
@@ -11,6 +11,10 @@ from mvt.common.utils import convert_datetime_to_iso
|
||||
from .artifact import AndroidArtifact
|
||||
|
||||
|
||||
RISKY_PERMISSIONS = ["REQUEST_INSTALL_PACKAGES"]
|
||||
RISKY_PACKAGES = ["com.android.shell"]
|
||||
|
||||
|
||||
class DumpsysAppopsArtifact(AndroidArtifact):
|
||||
"""
|
||||
Parser for dumpsys app ops info
|
||||
@@ -45,15 +49,39 @@ class DumpsysAppopsArtifact(AndroidArtifact):
|
||||
self.detected.append(result)
|
||||
continue
|
||||
|
||||
detected_permissions = []
|
||||
for perm in result["permissions"]:
|
||||
if (
|
||||
perm["name"] == "REQUEST_INSTALL_PACKAGES"
|
||||
and perm["access"] == "allow"
|
||||
perm["name"] in RISKY_PERMISSIONS
|
||||
# and perm["access"] == "allow"
|
||||
):
|
||||
self.log.info(
|
||||
"Package %s with REQUEST_INSTALL_PACKAGES " "permission",
|
||||
result["package_name"],
|
||||
)
|
||||
detected_permissions.append(perm)
|
||||
for entry in sorted(perm["entries"], key=lambda x: x["timestamp"]):
|
||||
self.log.warning(
|
||||
"Package '%s' had risky permission '%s' set to '%s' at %s",
|
||||
result["package_name"],
|
||||
perm["name"],
|
||||
entry["access"],
|
||||
entry["timestamp"],
|
||||
)
|
||||
|
||||
elif result["package_name"] in RISKY_PACKAGES:
|
||||
detected_permissions.append(perm)
|
||||
for entry in sorted(perm["entries"], key=lambda x: x["timestamp"]):
|
||||
self.log.warning(
|
||||
"Risky package '%s' had '%s' permission set to '%s' at %s",
|
||||
result["package_name"],
|
||||
perm["name"],
|
||||
entry["access"],
|
||||
entry["timestamp"],
|
||||
)
|
||||
|
||||
if detected_permissions:
|
||||
# We clean the result to only include the risky permission, otherwise the timeline
|
||||
# will be polluted with all the other irrelevant permissions
|
||||
cleaned_result = result.copy()
|
||||
cleaned_result["permissions"] = detected_permissions
|
||||
self.detected.append(cleaned_result)
|
||||
|
||||
def parse(self, output: str) -> None:
|
||||
self.results: List[Dict[str, Any]] = []
|
||||
@@ -121,11 +149,16 @@ class DumpsysAppopsArtifact(AndroidArtifact):
|
||||
if line.startswith(" "):
|
||||
# Permission entry like:
|
||||
# Reject: [fg-s]2021-05-19 22:02:52.054 (-314d1h25m2s33ms)
|
||||
access_type = line.split(":")[0].strip()
|
||||
if access_type not in ["Access", "Reject"]:
|
||||
# Skipping invalid access type. Some entries are not in the format we expect
|
||||
continue
|
||||
|
||||
if entry:
|
||||
perm["entries"].append(entry)
|
||||
entry = {}
|
||||
|
||||
entry["access"] = line.split(":")[0].strip()
|
||||
entry["access"] = access_type
|
||||
entry["type"] = line[line.find("[") + 1 : line.find("]")]
|
||||
|
||||
try:
|
||||
|
||||
@@ -16,8 +16,7 @@ class DumpsysPackagesArtifact(AndroidArtifact):
|
||||
for result in self.results:
|
||||
if result["package_name"] in ROOT_PACKAGES:
|
||||
self.log.warning(
|
||||
"Found an installed package related to "
|
||||
'rooting/jailbreaking: "%s"',
|
||||
'Found an installed package related to rooting/jailbreaking: "%s"',
|
||||
result["package_name"],
|
||||
)
|
||||
self.detected.append(result)
|
||||
|
||||
43
src/mvt/android/artifacts/file_timestamps.py
Normal file
43
src/mvt/android/artifacts/file_timestamps.py
Normal file
@@ -0,0 +1,43 @@
|
||||
# Mobile Verification Toolkit (MVT)
|
||||
# Copyright (c) 2021-2023 The MVT Authors.
|
||||
# Use of this software is governed by the MVT License 1.1 that can be found at
|
||||
# https://license.mvt.re/1.1/
|
||||
from typing import Union
|
||||
|
||||
from .artifact import AndroidArtifact
|
||||
|
||||
|
||||
class FileTimestampsArtifact(AndroidArtifact):
|
||||
def serialize(self, record: dict) -> Union[dict, list]:
|
||||
records = []
|
||||
|
||||
for ts in set(
|
||||
[
|
||||
record.get("access_time"),
|
||||
record.get("changed_time"),
|
||||
record.get("modified_time"),
|
||||
]
|
||||
):
|
||||
if not ts:
|
||||
continue
|
||||
|
||||
macb = ""
|
||||
macb += "M" if ts == record.get("modified_time") else "-"
|
||||
macb += "A" if ts == record.get("access_time") else "-"
|
||||
macb += "C" if ts == record.get("changed_time") else "-"
|
||||
macb += "-"
|
||||
|
||||
msg = record["path"]
|
||||
if record.get("context"):
|
||||
msg += f" ({record['context']})"
|
||||
|
||||
records.append(
|
||||
{
|
||||
"timestamp": ts,
|
||||
"module": self.__class__.__name__,
|
||||
"event": macb,
|
||||
"data": msg,
|
||||
}
|
||||
)
|
||||
|
||||
return records
|
||||
@@ -42,6 +42,17 @@ class GetProp(AndroidArtifact):
|
||||
entry = {"name": matches[0][0], "value": matches[0][1]}
|
||||
self.results.append(entry)
|
||||
|
||||
def get_device_timezone(self) -> str:
|
||||
"""
|
||||
Get the device timezone from the getprop results
|
||||
|
||||
Used in other moduels to calculate the timezone offset
|
||||
"""
|
||||
for entry in self.results:
|
||||
if entry["name"] == "persist.sys.timezone":
|
||||
return entry["value"]
|
||||
return None
|
||||
|
||||
def check_indicators(self) -> None:
|
||||
for entry in self.results:
|
||||
if entry["name"] in INTERESTING_PROPERTIES:
|
||||
|
||||
186
src/mvt/android/artifacts/mounts.py
Normal file
186
src/mvt/android/artifacts/mounts.py
Normal file
@@ -0,0 +1,186 @@
|
||||
# Mobile Verification Toolkit (MVT)
|
||||
# Copyright (c) 2021-2023 The MVT Authors.
|
||||
# Use of this software is governed by the MVT License 1.1 that can be found at
|
||||
# https://license.mvt.re/1.1/
|
||||
|
||||
from typing import Any
|
||||
|
||||
from .artifact import AndroidArtifact
|
||||
|
||||
SUSPICIOUS_MOUNT_POINTS = [
|
||||
"/system",
|
||||
"/vendor",
|
||||
"/product",
|
||||
"/system_ext",
|
||||
]
|
||||
|
||||
SUSPICIOUS_OPTIONS = [
|
||||
"rw",
|
||||
"remount",
|
||||
"noatime",
|
||||
"nodiratime",
|
||||
]
|
||||
|
||||
ALLOWLIST_NOATIME = [
|
||||
"/system_dlkm",
|
||||
"/system_ext",
|
||||
"/product",
|
||||
"/vendor",
|
||||
"/vendor_dlkm",
|
||||
]
|
||||
|
||||
|
||||
class Mounts(AndroidArtifact):
|
||||
"""
|
||||
This artifact parses mount information from /proc/mounts or similar mount data.
|
||||
It can detect potentially suspicious mount configurations that may indicate
|
||||
a rooted or compromised device.
|
||||
"""
|
||||
|
||||
def parse(self, entry: str) -> None:
|
||||
"""
|
||||
Parse mount information from the provided entry.
|
||||
|
||||
Examples:
|
||||
/dev/block/bootdevice/by-name/system /system ext4 ro,seclabel,relatime 0 0
|
||||
/dev/block/dm-12 on / type ext4 (ro,seclabel,noatime)
|
||||
"""
|
||||
self.results: list[dict[str, Any]] = []
|
||||
|
||||
for line in entry.splitlines():
|
||||
line = line.strip()
|
||||
if not line:
|
||||
continue
|
||||
|
||||
device = None
|
||||
mount_point = None
|
||||
filesystem_type = None
|
||||
mount_options = ""
|
||||
|
||||
if " on " in line and " type " in line:
|
||||
try:
|
||||
# Format: device on mount_point type filesystem_type (options)
|
||||
device_part, rest = line.split(" on ", 1)
|
||||
device = device_part.strip()
|
||||
|
||||
# Split by 'type' to get mount_point and filesystem info
|
||||
mount_part, fs_part = rest.split(" type ", 1)
|
||||
mount_point = mount_part.strip()
|
||||
|
||||
# Parse filesystem and options
|
||||
if "(" in fs_part and fs_part.endswith(")"):
|
||||
# Format: filesystem_type (options)
|
||||
fs_and_opts = fs_part.strip()
|
||||
paren_idx = fs_and_opts.find("(")
|
||||
filesystem_type = fs_and_opts[:paren_idx].strip()
|
||||
mount_options = fs_and_opts[paren_idx + 1 : -1].strip()
|
||||
else:
|
||||
# No options in parentheses, just filesystem type
|
||||
filesystem_type = fs_part.strip()
|
||||
mount_options = ""
|
||||
|
||||
# Skip if we don't have essential info
|
||||
if not device or not mount_point or not filesystem_type:
|
||||
continue
|
||||
|
||||
# Parse options into list
|
||||
options_list = (
|
||||
[opt.strip() for opt in mount_options.split(",") if opt.strip()]
|
||||
if mount_options
|
||||
else []
|
||||
)
|
||||
|
||||
# Check if it's a system partition
|
||||
is_system_partition = mount_point in SUSPICIOUS_MOUNT_POINTS or any(
|
||||
mount_point.startswith(sp) for sp in SUSPICIOUS_MOUNT_POINTS
|
||||
)
|
||||
|
||||
# Check if it's mounted read-write
|
||||
is_read_write = "rw" in options_list
|
||||
|
||||
mount_entry = {
|
||||
"device": device,
|
||||
"mount_point": mount_point,
|
||||
"filesystem_type": filesystem_type,
|
||||
"mount_options": mount_options,
|
||||
"options_list": options_list,
|
||||
"is_system_partition": is_system_partition,
|
||||
"is_read_write": is_read_write,
|
||||
}
|
||||
|
||||
self.results.append(mount_entry)
|
||||
|
||||
except ValueError:
|
||||
# If parsing fails, skip this line
|
||||
continue
|
||||
else:
|
||||
# Skip lines that don't match expected format
|
||||
continue
|
||||
|
||||
def check_indicators(self) -> None:
|
||||
"""
|
||||
Check for suspicious mount configurations that may indicate root access
|
||||
or other security concerns.
|
||||
"""
|
||||
system_rw_mounts = []
|
||||
suspicious_mounts = []
|
||||
|
||||
for mount in self.results:
|
||||
mount_point = mount["mount_point"]
|
||||
options = mount["options_list"]
|
||||
|
||||
# Check for system partitions mounted as read-write
|
||||
if mount["is_system_partition"] and mount["is_read_write"]:
|
||||
system_rw_mounts.append(mount)
|
||||
if mount_point == "/system":
|
||||
self.log.warning(
|
||||
"Root detected /system partition is mounted as read-write (rw). "
|
||||
)
|
||||
else:
|
||||
self.log.warning(
|
||||
"System partition %s is mounted as read-write (rw). This may indicate system modifications.",
|
||||
mount_point,
|
||||
)
|
||||
|
||||
# Check for other suspicious mount options
|
||||
suspicious_opts = [opt for opt in options if opt in SUSPICIOUS_OPTIONS]
|
||||
if suspicious_opts and mount["is_system_partition"]:
|
||||
if (
|
||||
"noatime" in mount["mount_options"]
|
||||
and mount["mount_point"] in ALLOWLIST_NOATIME
|
||||
):
|
||||
continue
|
||||
suspicious_mounts.append(mount)
|
||||
self.log.warning(
|
||||
"Suspicious mount options found for %s: %s",
|
||||
mount_point,
|
||||
", ".join(suspicious_opts),
|
||||
)
|
||||
|
||||
# Log interesting mount information
|
||||
if mount_point == "/data" or mount_point.startswith("/sdcard"):
|
||||
self.log.info(
|
||||
"Data partition: %s mounted as %s with options: %s",
|
||||
mount_point,
|
||||
mount["filesystem_type"],
|
||||
mount["mount_options"],
|
||||
)
|
||||
|
||||
self.log.info("Parsed %d mount entries", len(self.results))
|
||||
|
||||
# Check indicators if available
|
||||
if not self.indicators:
|
||||
return
|
||||
|
||||
for mount in self.results:
|
||||
# Check if any mount points match indicators
|
||||
ioc = self.indicators.check_file_path(mount.get("mount_point", ""))
|
||||
if ioc:
|
||||
mount["matched_indicator"] = ioc
|
||||
self.detected.append(mount)
|
||||
|
||||
# Check device paths for indicators
|
||||
ioc = self.indicators.check_file_path(mount.get("device", ""))
|
||||
if ioc:
|
||||
mount["matched_indicator"] = ioc
|
||||
self.detected.append(mount)
|
||||
@@ -16,6 +16,11 @@ ANDROID_DANGEROUS_SETTINGS = [
|
||||
"key": "package_verifier_enable",
|
||||
"safe_value": "1",
|
||||
},
|
||||
{
|
||||
"description": "disabled APK package verification",
|
||||
"key": "package_verifier_state",
|
||||
"safe_value": "1",
|
||||
},
|
||||
{
|
||||
"description": "disabled Google Play Protect",
|
||||
"key": "package_verifier_user_consent",
|
||||
@@ -46,11 +51,6 @@ ANDROID_DANGEROUS_SETTINGS = [
|
||||
"key": "send_action_app_error",
|
||||
"safe_value": "1",
|
||||
},
|
||||
{
|
||||
"description": "enabled installation of non Google Play apps",
|
||||
"key": "install_non_market_apps",
|
||||
"safe_value": "0",
|
||||
},
|
||||
{
|
||||
"description": "enabled accessibility services",
|
||||
"key": "accessibility_enabled",
|
||||
|
||||
268
src/mvt/android/artifacts/tombstone_crashes.py
Normal file
268
src/mvt/android/artifacts/tombstone_crashes.py
Normal file
@@ -0,0 +1,268 @@
|
||||
# Mobile Verification Toolkit (MVT)
|
||||
# Copyright (c) 2021-2023 The MVT Authors.
|
||||
# Use of this software is governed by the MVT License 1.1 that can be found at
|
||||
# https://license.mvt.re/1.1/
|
||||
|
||||
import datetime
|
||||
from typing import List, Optional, Union
|
||||
|
||||
import pydantic
|
||||
import betterproto
|
||||
from dateutil import parser
|
||||
|
||||
from mvt.common.utils import convert_datetime_to_iso
|
||||
from mvt.android.parsers.proto.tombstone import Tombstone
|
||||
from .artifact import AndroidArtifact
|
||||
|
||||
|
||||
TOMBSTONE_DELIMITER = "*** *** *** *** *** *** *** *** *** *** *** *** *** *** *** ***"
|
||||
|
||||
# Map the legacy crash file keys to the new format.
|
||||
TOMBSTONE_TEXT_KEY_MAPPINGS = {
|
||||
"Build fingerprint": "build_fingerprint",
|
||||
"Revision": "revision",
|
||||
"ABI": "arch",
|
||||
"Timestamp": "timestamp",
|
||||
"Process uptime": "process_uptime",
|
||||
"Cmdline": "command_line",
|
||||
"pid": "pid",
|
||||
"tid": "tid",
|
||||
"name": "process_name",
|
||||
"binary_path": "binary_path",
|
||||
"uid": "uid",
|
||||
"signal": "signal_info",
|
||||
"code": "code",
|
||||
"Cause": "cause",
|
||||
}
|
||||
|
||||
|
||||
class SignalInfo(pydantic.BaseModel):
|
||||
code: int
|
||||
code_name: str
|
||||
name: str
|
||||
number: Optional[int] = None
|
||||
|
||||
|
||||
class TombstoneCrashResult(pydantic.BaseModel):
|
||||
"""
|
||||
MVT Result model for a tombstone crash result.
|
||||
|
||||
Needed for validation and serialization, and consistency between text and protobuf tombstones.
|
||||
"""
|
||||
|
||||
file_name: str
|
||||
file_timestamp: str # We store the timestamp as a string to avoid timezone issues
|
||||
build_fingerprint: str
|
||||
revision: str
|
||||
arch: Optional[str] = None
|
||||
timestamp: str # We store the timestamp as a string to avoid timezone issues
|
||||
process_uptime: Optional[int] = None
|
||||
command_line: Optional[List[str]] = None
|
||||
pid: int
|
||||
tid: int
|
||||
process_name: Optional[str] = None
|
||||
binary_path: Optional[str] = None
|
||||
selinux_label: Optional[str] = None
|
||||
uid: int
|
||||
signal_info: SignalInfo
|
||||
cause: Optional[str] = None
|
||||
extra: Optional[str] = None
|
||||
|
||||
|
||||
class TombstoneCrashArtifact(AndroidArtifact):
|
||||
"""
|
||||
Parser for Android tombstone crash files.
|
||||
|
||||
This parser can parse both text and protobuf tombstone crash files.
|
||||
"""
|
||||
|
||||
def serialize(self, record: dict) -> Union[dict, list]:
|
||||
return {
|
||||
"timestamp": record["timestamp"],
|
||||
"module": self.__class__.__name__,
|
||||
"event": "Tombstone",
|
||||
"data": (
|
||||
f"Crash in '{record['process_name']}' process running as UID '{record['uid']}' in file '{record['file_name']}' "
|
||||
f"Crash type '{record['signal_info']['name']}' with code '{record['signal_info']['code_name']}'"
|
||||
),
|
||||
}
|
||||
|
||||
def check_indicators(self) -> None:
|
||||
if not self.indicators:
|
||||
return
|
||||
|
||||
for result in self.results:
|
||||
ioc = self.indicators.check_process(result["process_name"])
|
||||
if ioc:
|
||||
result["matched_indicator"] = ioc
|
||||
self.detected.append(result)
|
||||
continue
|
||||
|
||||
if result.get("command_line", []):
|
||||
command_name = result.get("command_line")[0].split("/")[-1]
|
||||
ioc = self.indicators.check_process(command_name)
|
||||
if ioc:
|
||||
result["matched_indicator"] = ioc
|
||||
self.detected.append(result)
|
||||
continue
|
||||
|
||||
SUSPICIOUS_UIDS = [
|
||||
0, # root
|
||||
1000, # system
|
||||
2000, # shell
|
||||
]
|
||||
if result["uid"] in SUSPICIOUS_UIDS:
|
||||
self.log.warning(
|
||||
f"Potentially suspicious crash in process '{result['process_name']}' "
|
||||
f"running as UID '{result['uid']}' in tombstone '{result['file_name']}' at {result['timestamp']}"
|
||||
)
|
||||
self.detected.append(result)
|
||||
|
||||
def parse_protobuf(
|
||||
self, file_name: str, file_timestamp: datetime.datetime, data: bytes
|
||||
) -> None:
|
||||
"""Parse Android tombstone crash files from a protobuf object."""
|
||||
tombstone_pb = Tombstone().parse(data)
|
||||
tombstone_dict = tombstone_pb.to_dict(
|
||||
betterproto.Casing.SNAKE, include_default_values=True
|
||||
)
|
||||
|
||||
# Add some extra metadata
|
||||
tombstone_dict["timestamp"] = self._parse_timestamp_string(
|
||||
tombstone_pb.timestamp
|
||||
)
|
||||
tombstone_dict["file_name"] = file_name
|
||||
tombstone_dict["file_timestamp"] = convert_datetime_to_iso(file_timestamp)
|
||||
tombstone_dict["process_name"] = self._proccess_name_from_thread(tombstone_dict)
|
||||
|
||||
# Confirm the tombstone is valid, and matches the output model
|
||||
tombstone = TombstoneCrashResult.model_validate(tombstone_dict)
|
||||
self.results.append(tombstone.model_dump())
|
||||
|
||||
def parse(
|
||||
self, file_name: str, file_timestamp: datetime.datetime, content: bytes
|
||||
) -> None:
|
||||
"""Parse text Android tombstone crash files."""
|
||||
tombstone_dict = {
|
||||
"file_name": file_name,
|
||||
"file_timestamp": convert_datetime_to_iso(file_timestamp),
|
||||
}
|
||||
lines = content.decode("utf-8").splitlines()
|
||||
for line_num, line in enumerate(lines, 1):
|
||||
if not line.strip() or TOMBSTONE_DELIMITER in line:
|
||||
continue
|
||||
try:
|
||||
for key, destination_key in TOMBSTONE_TEXT_KEY_MAPPINGS.items():
|
||||
if self._parse_tombstone_line(
|
||||
line, key, destination_key, tombstone_dict
|
||||
):
|
||||
break
|
||||
except Exception as e:
|
||||
raise ValueError(f"Error parsing line {line_num}: {str(e)}")
|
||||
|
||||
# Validate the tombstone and add it to the results
|
||||
tombstone = TombstoneCrashResult.model_validate(tombstone_dict)
|
||||
self.results.append(tombstone.model_dump())
|
||||
|
||||
def _parse_tombstone_line(
|
||||
self, line: str, key: str, destination_key: str, tombstone: dict
|
||||
) -> bool:
|
||||
if not line.startswith(f"{key}"):
|
||||
return False
|
||||
|
||||
if key == "pid":
|
||||
return self._load_pid_line(line, tombstone)
|
||||
elif key == "signal":
|
||||
return self._load_signal_line(line, tombstone)
|
||||
elif key == "Timestamp":
|
||||
return self._load_timestamp_line(line, tombstone)
|
||||
else:
|
||||
return self._load_key_value_line(line, key, destination_key, tombstone)
|
||||
|
||||
def _load_key_value_line(
|
||||
self, line: str, key: str, destination_key: str, tombstone: dict
|
||||
) -> bool:
|
||||
line_key, value = line.split(":", 1)
|
||||
if line_key != key:
|
||||
raise ValueError(f"Expected key {key}, got {line_key}")
|
||||
|
||||
value_clean = value.strip().strip("'")
|
||||
if destination_key == "uid":
|
||||
tombstone[destination_key] = int(value_clean)
|
||||
elif destination_key == "process_uptime":
|
||||
# eg. "Process uptime: 40s"
|
||||
tombstone[destination_key] = int(value_clean.rstrip("s"))
|
||||
elif destination_key == "command_line":
|
||||
# XXX: Check if command line should be a single string in a list, or a list of strings.
|
||||
tombstone[destination_key] = [value_clean]
|
||||
else:
|
||||
tombstone[destination_key] = value_clean
|
||||
return True
|
||||
|
||||
def _load_pid_line(self, line: str, tombstone: dict) -> bool:
|
||||
try:
|
||||
parts = line.split(" >>> ") if " >>> " in line else line.split(">>>")
|
||||
process_info = parts[0]
|
||||
|
||||
# Parse pid, tid, name from process info
|
||||
info_parts = [p.strip() for p in process_info.split(",")]
|
||||
for info in info_parts:
|
||||
key, value = info.split(":", 1)
|
||||
key = key.strip()
|
||||
value = value.strip()
|
||||
|
||||
if key == "pid":
|
||||
tombstone["pid"] = int(value)
|
||||
elif key == "tid":
|
||||
tombstone["tid"] = int(value)
|
||||
elif key == "name":
|
||||
tombstone["process_name"] = value
|
||||
|
||||
# Extract binary path if it exists
|
||||
if len(parts) > 1:
|
||||
tombstone["binary_path"] = parts[1].strip().rstrip(" <")
|
||||
|
||||
return True
|
||||
|
||||
except Exception as e:
|
||||
raise ValueError(f"Failed to parse PID line: {str(e)}")
|
||||
|
||||
def _load_signal_line(self, line: str, tombstone: dict) -> bool:
|
||||
signal_part, code_part = map(str.strip, line.split(",")[:2])
|
||||
|
||||
def parse_part(part: str, prefix: str) -> tuple[int, str]:
|
||||
match = part.split(prefix)[1]
|
||||
number = int(match.split()[0])
|
||||
name = match.split("(")[1].split(")")[0] if "(" in match else "UNKNOWN"
|
||||
return number, name
|
||||
|
||||
signal_number, signal_name = parse_part(signal_part, "signal ")
|
||||
code_number, code_name = parse_part(code_part, "code ")
|
||||
|
||||
tombstone["signal_info"] = {
|
||||
"code": code_number,
|
||||
"code_name": code_name,
|
||||
"name": signal_name,
|
||||
"number": signal_number,
|
||||
}
|
||||
return True
|
||||
|
||||
def _load_timestamp_line(self, line: str, tombstone: dict) -> bool:
|
||||
timestamp = line.split(":", 1)[1].strip()
|
||||
tombstone["timestamp"] = self._parse_timestamp_string(timestamp)
|
||||
return True
|
||||
|
||||
@staticmethod
|
||||
def _parse_timestamp_string(timestamp: str) -> str:
|
||||
timestamp_parsed = parser.parse(timestamp)
|
||||
# HACK: Swap the local timestamp to UTC, so keep the original time and avoid timezone conversion.
|
||||
local_timestamp = timestamp_parsed.replace(tzinfo=datetime.timezone.utc)
|
||||
return convert_datetime_to_iso(local_timestamp)
|
||||
|
||||
@staticmethod
|
||||
def _proccess_name_from_thread(tombstone_dict: dict) -> str:
|
||||
if tombstone_dict.get("threads"):
|
||||
for thread in tombstone_dict["threads"].values():
|
||||
if thread.get("id") == tombstone_dict["tid"] and thread.get("name"):
|
||||
return thread["name"]
|
||||
return "Unknown"
|
||||
@@ -31,10 +31,12 @@ from mvt.common.help import (
|
||||
HELP_MSG_HASHES,
|
||||
HELP_MSG_CHECK_IOCS,
|
||||
HELP_MSG_STIX2,
|
||||
HELP_MSG_DISABLE_UPDATE_CHECK,
|
||||
HELP_MSG_DISABLE_INDICATOR_UPDATE_CHECK,
|
||||
)
|
||||
from mvt.common.logo import logo
|
||||
from mvt.common.updates import IndicatorsUpdates
|
||||
from mvt.common.utils import init_logging, set_verbose_logging
|
||||
from mvt.common.utils import init_logging, set_verbose_logging, CommandWrapperGroup
|
||||
|
||||
from .cmd_check_adb import CmdAndroidCheckADB
|
||||
from .cmd_check_androidqf import CmdAndroidCheckAndroidQF
|
||||
@@ -53,12 +55,37 @@ log = logging.getLogger("mvt")
|
||||
CONTEXT_SETTINGS = dict(help_option_names=["-h", "--help"])
|
||||
|
||||
|
||||
def _get_disable_flags(ctx):
|
||||
"""Helper function to safely get disable flags from context."""
|
||||
if ctx.obj is None:
|
||||
return False, False
|
||||
return (
|
||||
ctx.obj.get("disable_version_check", False),
|
||||
ctx.obj.get("disable_indicator_check", False),
|
||||
)
|
||||
|
||||
|
||||
# ==============================================================================
|
||||
# Main
|
||||
# ==============================================================================
|
||||
@click.group(invoke_without_command=False)
|
||||
def cli():
|
||||
logo()
|
||||
@click.group(invoke_without_command=False, cls=CommandWrapperGroup)
|
||||
@click.option(
|
||||
"--disable-update-check", is_flag=True, help=HELP_MSG_DISABLE_UPDATE_CHECK
|
||||
)
|
||||
@click.option(
|
||||
"--disable-indicator-update-check",
|
||||
is_flag=True,
|
||||
help=HELP_MSG_DISABLE_INDICATOR_UPDATE_CHECK,
|
||||
)
|
||||
@click.pass_context
|
||||
def cli(ctx, disable_update_check, disable_indicator_update_check):
|
||||
ctx.ensure_object(dict)
|
||||
ctx.obj["disable_version_check"] = disable_update_check
|
||||
ctx.obj["disable_indicator_check"] = disable_indicator_update_check
|
||||
logo(
|
||||
disable_version_check=disable_update_check,
|
||||
disable_indicator_check=disable_indicator_update_check,
|
||||
)
|
||||
|
||||
|
||||
# ==============================================================================
|
||||
@@ -166,6 +193,8 @@ def check_adb(
|
||||
module_name=module,
|
||||
serial=serial,
|
||||
module_options=module_options,
|
||||
disable_version_check=_get_disable_flags(ctx)[0],
|
||||
disable_indicator_check=_get_disable_flags(ctx)[1],
|
||||
)
|
||||
|
||||
if list_modules:
|
||||
@@ -212,6 +241,8 @@ def check_bugreport(ctx, iocs, output, list_modules, module, verbose, bugreport_
|
||||
ioc_files=iocs,
|
||||
module_name=module,
|
||||
hashes=True,
|
||||
disable_version_check=_get_disable_flags(ctx)[0],
|
||||
disable_indicator_check=_get_disable_flags(ctx)[1],
|
||||
)
|
||||
|
||||
if list_modules:
|
||||
@@ -274,6 +305,8 @@ def check_backup(
|
||||
"interactive": not non_interactive,
|
||||
"backup_password": cli_load_android_backup_password(log, backup_password),
|
||||
},
|
||||
disable_version_check=_get_disable_flags(ctx)[0],
|
||||
disable_indicator_check=_get_disable_flags(ctx)[1],
|
||||
)
|
||||
|
||||
if list_modules:
|
||||
@@ -338,6 +371,8 @@ def check_androidqf(
|
||||
"interactive": not non_interactive,
|
||||
"backup_password": cli_load_android_backup_password(log, backup_password),
|
||||
},
|
||||
disable_version_check=_get_disable_flags(ctx)[0],
|
||||
disable_indicator_check=_get_disable_flags(ctx)[1],
|
||||
)
|
||||
|
||||
if list_modules:
|
||||
@@ -372,7 +407,13 @@ def check_androidqf(
|
||||
@click.argument("FOLDER", type=click.Path(exists=True))
|
||||
@click.pass_context
|
||||
def check_iocs(ctx, iocs, list_modules, module, folder):
|
||||
cmd = CmdCheckIOCS(target_path=folder, ioc_files=iocs, module_name=module)
|
||||
cmd = CmdCheckIOCS(
|
||||
target_path=folder,
|
||||
ioc_files=iocs,
|
||||
module_name=module,
|
||||
disable_version_check=_get_disable_flags(ctx)[0],
|
||||
disable_indicator_check=_get_disable_flags(ctx)[1],
|
||||
)
|
||||
cmd.modules = BACKUP_MODULES + ADB_MODULES + BUGREPORT_MODULES
|
||||
|
||||
if list_modules:
|
||||
|
||||
@@ -7,6 +7,7 @@ import logging
|
||||
from typing import Optional
|
||||
|
||||
from mvt.common.command import Command
|
||||
from mvt.common.indicators import Indicators
|
||||
|
||||
from .modules.adb import ADB_MODULES
|
||||
|
||||
@@ -19,18 +20,28 @@ class CmdAndroidCheckADB(Command):
|
||||
target_path: Optional[str] = None,
|
||||
results_path: Optional[str] = None,
|
||||
ioc_files: Optional[list] = None,
|
||||
iocs: Optional[Indicators] = None,
|
||||
module_name: Optional[str] = None,
|
||||
serial: Optional[str] = None,
|
||||
module_options: Optional[dict] = None,
|
||||
hashes: Optional[bool] = False,
|
||||
sub_command: Optional[bool] = False,
|
||||
disable_version_check: bool = False,
|
||||
disable_indicator_check: bool = False,
|
||||
) -> None:
|
||||
super().__init__(
|
||||
target_path=target_path,
|
||||
results_path=results_path,
|
||||
ioc_files=ioc_files,
|
||||
iocs=iocs,
|
||||
module_name=module_name,
|
||||
serial=serial,
|
||||
module_options=module_options,
|
||||
hashes=hashes,
|
||||
sub_command=sub_command,
|
||||
log=log,
|
||||
disable_version_check=disable_version_check,
|
||||
disable_indicator_check=disable_indicator_check,
|
||||
)
|
||||
|
||||
self.name = "check-adb"
|
||||
|
||||
@@ -9,59 +9,186 @@ import zipfile
|
||||
from pathlib import Path
|
||||
from typing import List, Optional
|
||||
|
||||
from mvt.android.cmd_check_backup import CmdAndroidCheckBackup
|
||||
from mvt.android.cmd_check_bugreport import CmdAndroidCheckBugreport
|
||||
from mvt.common.command import Command
|
||||
from mvt.common.indicators import Indicators
|
||||
|
||||
from .modules.androidqf import ANDROIDQF_MODULES
|
||||
from .modules.androidqf.base import AndroidQFModule
|
||||
|
||||
log = logging.getLogger(__name__)
|
||||
|
||||
|
||||
class NoAndroidQFTargetPath(Exception):
|
||||
pass
|
||||
|
||||
|
||||
class NoAndroidQFBugReport(Exception):
|
||||
pass
|
||||
|
||||
|
||||
class NoAndroidQFBackup(Exception):
|
||||
pass
|
||||
|
||||
|
||||
class CmdAndroidCheckAndroidQF(Command):
|
||||
def __init__(
|
||||
self,
|
||||
target_path: Optional[str] = None,
|
||||
results_path: Optional[str] = None,
|
||||
ioc_files: Optional[list] = None,
|
||||
iocs: Optional[Indicators] = None,
|
||||
module_name: Optional[str] = None,
|
||||
serial: Optional[str] = None,
|
||||
module_options: Optional[dict] = None,
|
||||
hashes: bool = False,
|
||||
hashes: Optional[bool] = False,
|
||||
sub_command: Optional[bool] = False,
|
||||
disable_version_check: bool = False,
|
||||
disable_indicator_check: bool = False,
|
||||
) -> None:
|
||||
super().__init__(
|
||||
target_path=target_path,
|
||||
results_path=results_path,
|
||||
ioc_files=ioc_files,
|
||||
iocs=iocs,
|
||||
module_name=module_name,
|
||||
serial=serial,
|
||||
module_options=module_options,
|
||||
hashes=hashes,
|
||||
sub_command=sub_command,
|
||||
log=log,
|
||||
disable_version_check=disable_version_check,
|
||||
disable_indicator_check=disable_indicator_check,
|
||||
)
|
||||
|
||||
self.name = "check-androidqf"
|
||||
self.modules = ANDROIDQF_MODULES
|
||||
|
||||
self.format: Optional[str] = None
|
||||
self.archive: Optional[zipfile.ZipFile] = None
|
||||
self.files: List[str] = []
|
||||
self.__format: Optional[str] = None
|
||||
self.__zip: Optional[zipfile.ZipFile] = None
|
||||
self.__files: List[str] = []
|
||||
|
||||
def init(self):
|
||||
if os.path.isdir(self.target_path):
|
||||
self.format = "dir"
|
||||
self.__format = "dir"
|
||||
parent_path = Path(self.target_path).absolute().parent.as_posix()
|
||||
target_abs_path = os.path.abspath(self.target_path)
|
||||
for root, subdirs, subfiles in os.walk(target_abs_path):
|
||||
for fname in subfiles:
|
||||
file_path = os.path.relpath(os.path.join(root, fname), parent_path)
|
||||
self.files.append(file_path)
|
||||
self.__files.append(file_path)
|
||||
elif os.path.isfile(self.target_path):
|
||||
self.format = "zip"
|
||||
self.archive = zipfile.ZipFile(self.target_path)
|
||||
self.files = self.archive.namelist()
|
||||
self.__format = "zip"
|
||||
self.__zip = zipfile.ZipFile(self.target_path)
|
||||
self.__files = self.__zip.namelist()
|
||||
|
||||
def module_init(self, module):
|
||||
if self.format == "zip":
|
||||
module.from_zip_file(self.archive, self.files)
|
||||
def module_init(self, module: AndroidQFModule) -> None: # type: ignore[override]
|
||||
if self.__format == "zip" and self.__zip:
|
||||
module.from_zip(self.__zip, self.__files)
|
||||
return
|
||||
|
||||
if not self.target_path:
|
||||
raise NoAndroidQFTargetPath
|
||||
|
||||
parent_path = Path(self.target_path).absolute().parent.as_posix()
|
||||
module.from_dir(parent_path, self.__files)
|
||||
|
||||
def load_bugreport(self) -> zipfile.ZipFile:
|
||||
bugreport_zip_path = None
|
||||
for file_name in self.__files:
|
||||
if file_name.endswith("bugreport.zip"):
|
||||
bugreport_zip_path = file_name
|
||||
break
|
||||
else:
|
||||
raise NoAndroidQFBugReport
|
||||
|
||||
if self.__format == "zip" and self.__zip:
|
||||
handle = self.__zip.open(bugreport_zip_path)
|
||||
return zipfile.ZipFile(handle)
|
||||
|
||||
if self.__format == "dir" and self.target_path:
|
||||
parent_path = Path(self.target_path).absolute().parent.as_posix()
|
||||
module.from_folder(parent_path, self.files)
|
||||
bug_report_path = os.path.join(parent_path, bugreport_zip_path)
|
||||
return zipfile.ZipFile(bug_report_path)
|
||||
|
||||
raise NoAndroidQFBugReport
|
||||
|
||||
def load_backup(self) -> bytes:
|
||||
backup_ab_path = None
|
||||
for file_name in self.__files:
|
||||
if file_name.endswith("backup.ab"):
|
||||
backup_ab_path = file_name
|
||||
break
|
||||
else:
|
||||
raise NoAndroidQFBackup
|
||||
|
||||
if self.__format == "zip" and self.__zip:
|
||||
backup_file_handle = self.__zip.open(backup_ab_path)
|
||||
return backup_file_handle.read()
|
||||
|
||||
if self.__format == "dir" and self.target_path:
|
||||
parent_path = Path(self.target_path).absolute().parent.as_posix()
|
||||
backup_path = os.path.join(parent_path, backup_ab_path)
|
||||
with open(backup_path, "rb") as backup_file:
|
||||
backup_ab_data = backup_file.read()
|
||||
return backup_ab_data
|
||||
|
||||
raise NoAndroidQFBackup
|
||||
|
||||
def run_bugreport_cmd(self) -> bool:
|
||||
try:
|
||||
bugreport = self.load_bugreport()
|
||||
except NoAndroidQFBugReport:
|
||||
self.log.warning(
|
||||
"Skipping bugreport modules as no bugreport.zip found in AndroidQF data."
|
||||
)
|
||||
return False
|
||||
else:
|
||||
cmd = CmdAndroidCheckBugreport(
|
||||
target_path=None,
|
||||
results_path=self.results_path,
|
||||
ioc_files=self.ioc_files,
|
||||
iocs=self.iocs,
|
||||
module_options=self.module_options,
|
||||
hashes=self.hashes,
|
||||
sub_command=True,
|
||||
)
|
||||
cmd.from_zip(bugreport)
|
||||
cmd.run()
|
||||
|
||||
self.detected_count += cmd.detected_count
|
||||
self.timeline.extend(cmd.timeline)
|
||||
self.timeline_detected.extend(cmd.timeline_detected)
|
||||
|
||||
def run_backup_cmd(self) -> bool:
|
||||
try:
|
||||
backup = self.load_backup()
|
||||
except NoAndroidQFBackup:
|
||||
self.log.warning(
|
||||
"Skipping backup modules as no backup.ab found in AndroidQF data."
|
||||
)
|
||||
return False
|
||||
else:
|
||||
cmd = CmdAndroidCheckBackup(
|
||||
target_path=None,
|
||||
results_path=self.results_path,
|
||||
ioc_files=self.ioc_files,
|
||||
iocs=self.iocs,
|
||||
module_options=self.module_options,
|
||||
hashes=self.hashes,
|
||||
sub_command=True,
|
||||
)
|
||||
cmd.from_ab(backup)
|
||||
cmd.run()
|
||||
|
||||
self.detected_count += cmd.detected_count
|
||||
self.timeline.extend(cmd.timeline)
|
||||
self.timeline_detected.extend(cmd.timeline_detected)
|
||||
|
||||
def finish(self) -> None:
|
||||
"""
|
||||
Run the bugreport and backup modules if the respective files are found in the AndroidQF data.
|
||||
"""
|
||||
self.run_bugreport_cmd()
|
||||
self.run_backup_cmd()
|
||||
|
||||
@@ -20,6 +20,7 @@ from mvt.android.parsers.backup import (
|
||||
parse_backup_file,
|
||||
)
|
||||
from mvt.common.command import Command
|
||||
from mvt.common.indicators import Indicators
|
||||
|
||||
from .modules.backup import BACKUP_MODULES
|
||||
|
||||
@@ -32,20 +33,28 @@ class CmdAndroidCheckBackup(Command):
|
||||
target_path: Optional[str] = None,
|
||||
results_path: Optional[str] = None,
|
||||
ioc_files: Optional[list] = None,
|
||||
iocs: Optional[Indicators] = None,
|
||||
module_name: Optional[str] = None,
|
||||
serial: Optional[str] = None,
|
||||
module_options: Optional[dict] = None,
|
||||
hashes: bool = False,
|
||||
hashes: Optional[bool] = False,
|
||||
sub_command: Optional[bool] = False,
|
||||
disable_version_check: bool = False,
|
||||
disable_indicator_check: bool = False,
|
||||
) -> None:
|
||||
super().__init__(
|
||||
target_path=target_path,
|
||||
results_path=results_path,
|
||||
ioc_files=ioc_files,
|
||||
iocs=iocs,
|
||||
module_name=module_name,
|
||||
serial=serial,
|
||||
module_options=module_options,
|
||||
hashes=hashes,
|
||||
sub_command=sub_command,
|
||||
log=log,
|
||||
disable_version_check=disable_version_check,
|
||||
disable_indicator_check=disable_indicator_check,
|
||||
)
|
||||
|
||||
self.name = "check-backup"
|
||||
@@ -55,6 +64,34 @@ class CmdAndroidCheckBackup(Command):
|
||||
self.backup_archive: Optional[tarfile.TarFile] = None
|
||||
self.backup_files: List[str] = []
|
||||
|
||||
def from_ab(self, ab_file_bytes: bytes) -> None:
|
||||
self.backup_type = "ab"
|
||||
header = parse_ab_header(ab_file_bytes)
|
||||
if not header["backup"]:
|
||||
log.critical("Invalid backup format, file should be in .ab format")
|
||||
sys.exit(1)
|
||||
|
||||
password = None
|
||||
if header["encryption"] != "none":
|
||||
password = prompt_or_load_android_backup_password(log, self.module_options)
|
||||
if not password:
|
||||
log.critical("No backup password provided.")
|
||||
sys.exit(1)
|
||||
try:
|
||||
tardata = parse_backup_file(ab_file_bytes, password=password)
|
||||
except InvalidBackupPassword:
|
||||
log.critical("Invalid backup password")
|
||||
sys.exit(1)
|
||||
except AndroidBackupParsingError as exc:
|
||||
log.critical("Impossible to parse this backup file: %s", exc)
|
||||
log.critical("Please use Android Backup Extractor (ABE) instead")
|
||||
sys.exit(1)
|
||||
|
||||
dbytes = io.BytesIO(tardata)
|
||||
self.backup_archive = tarfile.open(fileobj=dbytes)
|
||||
for member in self.backup_archive:
|
||||
self.backup_files.append(member.name)
|
||||
|
||||
def init(self) -> None:
|
||||
if not self.target_path:
|
||||
return
|
||||
@@ -62,35 +99,8 @@ class CmdAndroidCheckBackup(Command):
|
||||
if os.path.isfile(self.target_path):
|
||||
self.backup_type = "ab"
|
||||
with open(self.target_path, "rb") as handle:
|
||||
data = handle.read()
|
||||
|
||||
header = parse_ab_header(data)
|
||||
if not header["backup"]:
|
||||
log.critical("Invalid backup format, file should be in .ab format")
|
||||
sys.exit(1)
|
||||
|
||||
password = None
|
||||
if header["encryption"] != "none":
|
||||
password = prompt_or_load_android_backup_password(
|
||||
log, self.module_options
|
||||
)
|
||||
if not password:
|
||||
log.critical("No backup password provided.")
|
||||
sys.exit(1)
|
||||
try:
|
||||
tardata = parse_backup_file(data, password=password)
|
||||
except InvalidBackupPassword:
|
||||
log.critical("Invalid backup password")
|
||||
sys.exit(1)
|
||||
except AndroidBackupParsingError as exc:
|
||||
log.critical("Impossible to parse this backup file: %s", exc)
|
||||
log.critical("Please use Android Backup Extractor (ABE) instead")
|
||||
sys.exit(1)
|
||||
|
||||
dbytes = io.BytesIO(tardata)
|
||||
self.backup_archive = tarfile.open(fileobj=dbytes)
|
||||
for member in self.backup_archive:
|
||||
self.backup_files.append(member.name)
|
||||
ab_file_bytes = handle.read()
|
||||
self.from_ab(ab_file_bytes)
|
||||
|
||||
elif os.path.isdir(self.target_path):
|
||||
self.backup_type = "folder"
|
||||
@@ -109,6 +119,6 @@ class CmdAndroidCheckBackup(Command):
|
||||
|
||||
def module_init(self, module: BackupExtraction) -> None: # type: ignore[override]
|
||||
if self.backup_type == "folder":
|
||||
module.from_folder(self.target_path, self.backup_files)
|
||||
module.from_dir(self.target_path, self.backup_files)
|
||||
else:
|
||||
module.from_ab(self.target_path, self.backup_archive, self.backup_files)
|
||||
|
||||
@@ -11,6 +11,7 @@ from zipfile import ZipFile
|
||||
|
||||
from mvt.android.modules.bugreport.base import BugReportModule
|
||||
from mvt.common.command import Command
|
||||
from mvt.common.indicators import Indicators
|
||||
|
||||
from .modules.bugreport import BUGREPORT_MODULES
|
||||
|
||||
@@ -23,54 +24,80 @@ class CmdAndroidCheckBugreport(Command):
|
||||
target_path: Optional[str] = None,
|
||||
results_path: Optional[str] = None,
|
||||
ioc_files: Optional[list] = None,
|
||||
iocs: Optional[Indicators] = None,
|
||||
module_name: Optional[str] = None,
|
||||
serial: Optional[str] = None,
|
||||
module_options: Optional[dict] = None,
|
||||
hashes: bool = False,
|
||||
hashes: Optional[bool] = False,
|
||||
sub_command: Optional[bool] = False,
|
||||
disable_version_check: bool = False,
|
||||
disable_indicator_check: bool = False,
|
||||
) -> None:
|
||||
super().__init__(
|
||||
target_path=target_path,
|
||||
results_path=results_path,
|
||||
ioc_files=ioc_files,
|
||||
iocs=iocs,
|
||||
module_name=module_name,
|
||||
serial=serial,
|
||||
module_options=module_options,
|
||||
hashes=hashes,
|
||||
sub_command=sub_command,
|
||||
log=log,
|
||||
disable_version_check=disable_version_check,
|
||||
disable_indicator_check=disable_indicator_check,
|
||||
)
|
||||
|
||||
self.name = "check-bugreport"
|
||||
self.modules = BUGREPORT_MODULES
|
||||
|
||||
self.bugreport_format: str = ""
|
||||
self.bugreport_archive: Optional[ZipFile] = None
|
||||
self.bugreport_files: List[str] = []
|
||||
self.__format: str = ""
|
||||
self.__zip: Optional[ZipFile] = None
|
||||
self.__files: List[str] = []
|
||||
|
||||
def from_dir(self, dir_path: str) -> None:
|
||||
"""This method is used to initialize the bug report analysis from an
|
||||
uncompressed directory.
|
||||
"""
|
||||
self.__format = "dir"
|
||||
self.target_path = dir_path
|
||||
parent_path = Path(dir_path).absolute().as_posix()
|
||||
for root, _, subfiles in os.walk(os.path.abspath(dir_path)):
|
||||
for file_name in subfiles:
|
||||
file_path = os.path.relpath(os.path.join(root, file_name), parent_path)
|
||||
self.__files.append(file_path)
|
||||
|
||||
def from_zip(self, bugreport_zip: ZipFile) -> None:
|
||||
"""This method is used to initialize the bug report analysis from a
|
||||
compressed archive.
|
||||
"""
|
||||
# NOTE: This will be invoked either by the CLI directly,or by the
|
||||
# check-androidqf command. We need this because we want to support
|
||||
# check-androidqf to analyse compressed archives itself too.
|
||||
# So, we'll need to extract bugreport.zip from a 'androidqf.zip', and
|
||||
# since nothing is written on disk, we need to be able to pass this
|
||||
# command a ZipFile instance in memory.
|
||||
|
||||
self.__format = "zip"
|
||||
self.__zip = bugreport_zip
|
||||
for file_name in self.__zip.namelist():
|
||||
self.__files.append(file_name)
|
||||
|
||||
def init(self) -> None:
|
||||
if not self.target_path:
|
||||
return
|
||||
|
||||
if os.path.isfile(self.target_path):
|
||||
self.bugreport_format = "zip"
|
||||
self.bugreport_archive = ZipFile(self.target_path)
|
||||
for file_name in self.bugreport_archive.namelist():
|
||||
self.bugreport_files.append(file_name)
|
||||
self.from_zip(ZipFile(self.target_path))
|
||||
elif os.path.isdir(self.target_path):
|
||||
self.bugreport_format = "dir"
|
||||
parent_path = Path(self.target_path).absolute().as_posix()
|
||||
for root, _, subfiles in os.walk(os.path.abspath(self.target_path)):
|
||||
for file_name in subfiles:
|
||||
file_path = os.path.relpath(
|
||||
os.path.join(root, file_name), parent_path
|
||||
)
|
||||
self.bugreport_files.append(file_path)
|
||||
self.from_dir(self.target_path)
|
||||
|
||||
def module_init(self, module: BugReportModule) -> None: # type: ignore[override]
|
||||
if self.bugreport_format == "zip":
|
||||
module.from_zip(self.bugreport_archive, self.bugreport_files)
|
||||
if self.__format == "zip":
|
||||
module.from_zip(self.__zip, self.__files)
|
||||
else:
|
||||
module.from_folder(self.target_path, self.bugreport_files)
|
||||
module.from_dir(self.target_path, self.__files)
|
||||
|
||||
def finish(self) -> None:
|
||||
if self.bugreport_archive:
|
||||
self.bugreport_archive.close()
|
||||
if self.__zip:
|
||||
self.__zip.close()
|
||||
|
||||
@@ -4,15 +4,7 @@
|
||||
# https://license.mvt.re/1.1/
|
||||
|
||||
from .chrome_history import ChromeHistory
|
||||
from .dumpsys_accessibility import DumpsysAccessibility
|
||||
from .dumpsys_activities import DumpsysActivities
|
||||
from .dumpsys_appops import DumpsysAppOps
|
||||
from .dumpsys_battery_daily import DumpsysBatteryDaily
|
||||
from .dumpsys_battery_history import DumpsysBatteryHistory
|
||||
from .dumpsys_dbinfo import DumpsysDBInfo
|
||||
from .dumpsys_adbstate import DumpsysADBState
|
||||
from .dumpsys_full import DumpsysFull
|
||||
from .dumpsys_receivers import DumpsysReceivers
|
||||
from .files import Files
|
||||
from .getprop import Getprop
|
||||
from .logcat import Logcat
|
||||
@@ -32,15 +24,7 @@ ADB_MODULES = [
|
||||
Getprop,
|
||||
Settings,
|
||||
SELinuxStatus,
|
||||
DumpsysBatteryHistory,
|
||||
DumpsysBatteryDaily,
|
||||
DumpsysReceivers,
|
||||
DumpsysActivities,
|
||||
DumpsysAccessibility,
|
||||
DumpsysDBInfo,
|
||||
DumpsysADBState,
|
||||
DumpsysFull,
|
||||
DumpsysAppOps,
|
||||
Packages,
|
||||
Logcat,
|
||||
RootBinaries,
|
||||
|
||||
@@ -326,8 +326,7 @@ class AndroidExtraction(MVTModule):
|
||||
|
||||
if not header["backup"]:
|
||||
self.log.error(
|
||||
"Extracting SMS via Android backup failed. "
|
||||
"No valid backup data found."
|
||||
"Extracting SMS via Android backup failed. No valid backup data found."
|
||||
)
|
||||
return None
|
||||
|
||||
|
||||
@@ -1,49 +0,0 @@
|
||||
# Mobile Verification Toolkit (MVT)
|
||||
# Copyright (c) 2021-2023 The MVT Authors.
|
||||
# Use of this software is governed by the MVT License 1.1 that can be found at
|
||||
# https://license.mvt.re/1.1/
|
||||
|
||||
import logging
|
||||
from typing import Optional
|
||||
|
||||
from mvt.android.artifacts.dumpsys_accessibility import DumpsysAccessibilityArtifact
|
||||
|
||||
from .base import AndroidExtraction
|
||||
|
||||
|
||||
class DumpsysAccessibility(DumpsysAccessibilityArtifact, AndroidExtraction):
|
||||
"""This module extracts stats on accessibility."""
|
||||
|
||||
def __init__(
|
||||
self,
|
||||
file_path: Optional[str] = None,
|
||||
target_path: Optional[str] = None,
|
||||
results_path: Optional[str] = None,
|
||||
module_options: Optional[dict] = None,
|
||||
log: logging.Logger = logging.getLogger(__name__),
|
||||
results: Optional[list] = None,
|
||||
) -> None:
|
||||
super().__init__(
|
||||
file_path=file_path,
|
||||
target_path=target_path,
|
||||
results_path=results_path,
|
||||
module_options=module_options,
|
||||
log=log,
|
||||
results=results,
|
||||
)
|
||||
|
||||
def run(self) -> None:
|
||||
self._adb_connect()
|
||||
output = self._adb_command("dumpsys accessibility")
|
||||
self._adb_disconnect()
|
||||
|
||||
self.parse(output)
|
||||
|
||||
for result in self.results:
|
||||
self.log.info(
|
||||
'Found installed accessibility service "%s"', result.get("service")
|
||||
)
|
||||
|
||||
self.log.info(
|
||||
"Identified a total of %d accessibility services", len(self.results)
|
||||
)
|
||||
@@ -1,45 +0,0 @@
|
||||
# Mobile Verification Toolkit (MVT)
|
||||
# Copyright (c) 2021-2023 The MVT Authors.
|
||||
# Use of this software is governed by the MVT License 1.1 that can be found at
|
||||
# https://license.mvt.re/1.1/
|
||||
|
||||
import logging
|
||||
from typing import Optional
|
||||
|
||||
from mvt.android.artifacts.dumpsys_package_activities import (
|
||||
DumpsysPackageActivitiesArtifact,
|
||||
)
|
||||
|
||||
from .base import AndroidExtraction
|
||||
|
||||
|
||||
class DumpsysActivities(DumpsysPackageActivitiesArtifact, AndroidExtraction):
|
||||
"""This module extracts details on receivers for risky activities."""
|
||||
|
||||
def __init__(
|
||||
self,
|
||||
file_path: Optional[str] = None,
|
||||
target_path: Optional[str] = None,
|
||||
results_path: Optional[str] = None,
|
||||
module_options: Optional[dict] = None,
|
||||
log: logging.Logger = logging.getLogger(__name__),
|
||||
results: Optional[list] = None,
|
||||
) -> None:
|
||||
super().__init__(
|
||||
file_path=file_path,
|
||||
target_path=target_path,
|
||||
results_path=results_path,
|
||||
module_options=module_options,
|
||||
log=log,
|
||||
results=results,
|
||||
)
|
||||
|
||||
self.results = results if results else []
|
||||
|
||||
def run(self) -> None:
|
||||
self._adb_connect()
|
||||
output = self._adb_command("dumpsys package")
|
||||
self._adb_disconnect()
|
||||
self.parse(output)
|
||||
|
||||
self.log.info("Extracted %d package activities", len(self.results))
|
||||
@@ -1,45 +0,0 @@
|
||||
# Mobile Verification Toolkit (MVT)
|
||||
# Copyright (c) 2021-2023 The MVT Authors.
|
||||
# Use of this software is governed by the MVT License 1.1 that can be found at
|
||||
# https://license.mvt.re/1.1/
|
||||
|
||||
import logging
|
||||
from typing import Optional
|
||||
|
||||
from mvt.android.artifacts.dumpsys_adb import DumpsysADBArtifact
|
||||
|
||||
from .base import AndroidExtraction
|
||||
|
||||
|
||||
class DumpsysADBState(DumpsysADBArtifact, AndroidExtraction):
|
||||
"""This module extracts ADB keystore state."""
|
||||
|
||||
def __init__(
|
||||
self,
|
||||
file_path: Optional[str] = None,
|
||||
target_path: Optional[str] = None,
|
||||
results_path: Optional[str] = None,
|
||||
module_options: Optional[dict] = None,
|
||||
log: logging.Logger = logging.getLogger(__name__),
|
||||
results: Optional[list] = None,
|
||||
) -> None:
|
||||
super().__init__(
|
||||
file_path=file_path,
|
||||
target_path=target_path,
|
||||
results_path=results_path,
|
||||
module_options=module_options,
|
||||
log=log,
|
||||
results=results,
|
||||
)
|
||||
|
||||
def run(self) -> None:
|
||||
self._adb_connect()
|
||||
output = self._adb_command("dumpsys adb", decode=False)
|
||||
self._adb_disconnect()
|
||||
|
||||
self.parse(output)
|
||||
if self.results:
|
||||
self.log.info(
|
||||
"Identified a total of %d trusted ADB keys",
|
||||
len(self.results[0].get("user_keys", [])),
|
||||
)
|
||||
@@ -1,46 +0,0 @@
|
||||
# Mobile Verification Toolkit (MVT)
|
||||
# Copyright (c) 2021-2023 The MVT Authors.
|
||||
# Use of this software is governed by the MVT License 1.1 that can be found at
|
||||
# https://license.mvt.re/1.1/
|
||||
|
||||
import logging
|
||||
from typing import Optional
|
||||
|
||||
from mvt.android.artifacts.dumpsys_appops import DumpsysAppopsArtifact
|
||||
|
||||
from .base import AndroidExtraction
|
||||
|
||||
|
||||
class DumpsysAppOps(DumpsysAppopsArtifact, AndroidExtraction):
|
||||
"""This module extracts records from App-op Manager."""
|
||||
|
||||
slug = "dumpsys_appops"
|
||||
|
||||
def __init__(
|
||||
self,
|
||||
file_path: Optional[str] = None,
|
||||
target_path: Optional[str] = None,
|
||||
results_path: Optional[str] = None,
|
||||
module_options: Optional[dict] = None,
|
||||
log: logging.Logger = logging.getLogger(__name__),
|
||||
results: Optional[list] = None,
|
||||
) -> None:
|
||||
super().__init__(
|
||||
file_path=file_path,
|
||||
target_path=target_path,
|
||||
results_path=results_path,
|
||||
module_options=module_options,
|
||||
log=log,
|
||||
results=results,
|
||||
)
|
||||
|
||||
def run(self) -> None:
|
||||
self._adb_connect()
|
||||
output = self._adb_command("dumpsys appops")
|
||||
self._adb_disconnect()
|
||||
|
||||
self.parse(output)
|
||||
|
||||
self.log.info(
|
||||
"Extracted a total of %d records from app-ops manager", len(self.results)
|
||||
)
|
||||
@@ -1,44 +0,0 @@
|
||||
# Mobile Verification Toolkit (MVT)
|
||||
# Copyright (c) 2021-2023 The MVT Authors.
|
||||
# Use of this software is governed by the MVT License 1.1 that can be found at
|
||||
# https://license.mvt.re/1.1/
|
||||
|
||||
import logging
|
||||
from typing import Optional
|
||||
|
||||
from mvt.android.artifacts.dumpsys_battery_daily import DumpsysBatteryDailyArtifact
|
||||
|
||||
from .base import AndroidExtraction
|
||||
|
||||
|
||||
class DumpsysBatteryDaily(DumpsysBatteryDailyArtifact, AndroidExtraction):
|
||||
"""This module extracts records from battery daily updates."""
|
||||
|
||||
def __init__(
|
||||
self,
|
||||
file_path: Optional[str] = None,
|
||||
target_path: Optional[str] = None,
|
||||
results_path: Optional[str] = None,
|
||||
module_options: Optional[dict] = None,
|
||||
log: logging.Logger = logging.getLogger(__name__),
|
||||
results: Optional[list] = None,
|
||||
) -> None:
|
||||
super().__init__(
|
||||
file_path=file_path,
|
||||
target_path=target_path,
|
||||
results_path=results_path,
|
||||
module_options=module_options,
|
||||
log=log,
|
||||
results=results,
|
||||
)
|
||||
|
||||
def run(self) -> None:
|
||||
self._adb_connect()
|
||||
output = self._adb_command("dumpsys batterystats --daily")
|
||||
self._adb_disconnect()
|
||||
|
||||
self.parse(output)
|
||||
|
||||
self.log.info(
|
||||
"Extracted %d records from battery daily stats", len(self.results)
|
||||
)
|
||||
@@ -1,42 +0,0 @@
|
||||
# Mobile Verification Toolkit (MVT)
|
||||
# Copyright (c) 2021-2023 The MVT Authors.
|
||||
# Use of this software is governed by the MVT License 1.1 that can be found at
|
||||
# https://license.mvt.re/1.1/
|
||||
|
||||
import logging
|
||||
from typing import Optional
|
||||
|
||||
from mvt.android.artifacts.dumpsys_battery_history import DumpsysBatteryHistoryArtifact
|
||||
|
||||
from .base import AndroidExtraction
|
||||
|
||||
|
||||
class DumpsysBatteryHistory(DumpsysBatteryHistoryArtifact, AndroidExtraction):
|
||||
"""This module extracts records from battery history events."""
|
||||
|
||||
def __init__(
|
||||
self,
|
||||
file_path: Optional[str] = None,
|
||||
target_path: Optional[str] = None,
|
||||
results_path: Optional[str] = None,
|
||||
module_options: Optional[dict] = None,
|
||||
log: logging.Logger = logging.getLogger(__name__),
|
||||
results: Optional[list] = None,
|
||||
) -> None:
|
||||
super().__init__(
|
||||
file_path=file_path,
|
||||
target_path=target_path,
|
||||
results_path=results_path,
|
||||
module_options=module_options,
|
||||
log=log,
|
||||
results=results,
|
||||
)
|
||||
|
||||
def run(self) -> None:
|
||||
self._adb_connect()
|
||||
output = self._adb_command("dumpsys batterystats --history")
|
||||
self._adb_disconnect()
|
||||
|
||||
self.parse(output)
|
||||
|
||||
self.log.info("Extracted %d records from battery history", len(self.results))
|
||||
@@ -1,44 +0,0 @@
|
||||
# Mobile Verification Toolkit (MVT)
|
||||
# Copyright (c) 2021-2023 The MVT Authors.
|
||||
# Use of this software is governed by the MVT License 1.1 that can be found at
|
||||
# https://license.mvt.re/1.1/
|
||||
|
||||
import logging
|
||||
from typing import Optional
|
||||
|
||||
from mvt.android.artifacts.dumpsys_receivers import DumpsysReceiversArtifact
|
||||
|
||||
from .base import AndroidExtraction
|
||||
|
||||
|
||||
class DumpsysReceivers(DumpsysReceiversArtifact, AndroidExtraction):
|
||||
"""This module extracts details on receivers for risky activities."""
|
||||
|
||||
def __init__(
|
||||
self,
|
||||
file_path: Optional[str] = None,
|
||||
target_path: Optional[str] = None,
|
||||
results_path: Optional[str] = None,
|
||||
module_options: Optional[dict] = None,
|
||||
log: logging.Logger = logging.getLogger(__name__),
|
||||
results: Optional[list] = None,
|
||||
) -> None:
|
||||
super().__init__(
|
||||
file_path=file_path,
|
||||
target_path=target_path,
|
||||
results_path=results_path,
|
||||
module_options=module_options,
|
||||
log=log,
|
||||
results=results,
|
||||
)
|
||||
|
||||
self.results = results if results else {}
|
||||
|
||||
def run(self) -> None:
|
||||
self._adb_connect()
|
||||
|
||||
output = self._adb_command("dumpsys package")
|
||||
self.parse(output)
|
||||
|
||||
self._adb_disconnect()
|
||||
self.log.info("Extracted receivers for %d intents", len(self.results))
|
||||
@@ -75,8 +75,7 @@ class Packages(AndroidExtraction):
|
||||
for result in self.results:
|
||||
if result["package_name"] in ROOT_PACKAGES:
|
||||
self.log.warning(
|
||||
"Found an installed package related to "
|
||||
'rooting/jailbreaking: "%s"',
|
||||
'Found an installed package related to rooting/jailbreaking: "%s"',
|
||||
result["package_name"],
|
||||
)
|
||||
self.detected.append(result)
|
||||
@@ -108,8 +107,7 @@ class Packages(AndroidExtraction):
|
||||
result["matched_indicator"] = ioc
|
||||
self.detected.append(result)
|
||||
|
||||
@staticmethod
|
||||
def check_virustotal(packages: list) -> None:
|
||||
def check_virustotal(self, packages: list) -> None:
|
||||
hashes = []
|
||||
for package in packages:
|
||||
for file in package.get("files", []):
|
||||
@@ -144,8 +142,15 @@ class Packages(AndroidExtraction):
|
||||
|
||||
for package in packages:
|
||||
for file in package.get("files", []):
|
||||
row = [package["package_name"], file["path"]]
|
||||
|
||||
if "package_name" in package:
|
||||
row = [package["package_name"], file["path"]]
|
||||
elif "name" in package:
|
||||
row = [package["name"], file["path"]]
|
||||
else:
|
||||
self.log.error(
|
||||
f"Package {package} has no name or package_name. packages.json or apks.json is malformed"
|
||||
)
|
||||
continue
|
||||
if file["sha256"] in detections:
|
||||
detection = detections[file["sha256"]]
|
||||
positives = detection.split("/")[0]
|
||||
|
||||
@@ -70,7 +70,7 @@ class SMS(AndroidExtraction):
|
||||
"timestamp": record["isodate"],
|
||||
"module": self.__class__.__name__,
|
||||
"event": f"sms_{record['direction']}",
|
||||
"data": f"{record.get('address', 'unknown source')}: \"{body}\"",
|
||||
"data": f'{record.get("address", "unknown source")}: "{body}"',
|
||||
}
|
||||
|
||||
def check_indicators(self) -> None:
|
||||
|
||||
@@ -3,38 +3,22 @@
|
||||
# Use of this software is governed by the MVT License 1.1 that can be found at
|
||||
# https://license.mvt.re/1.1/
|
||||
|
||||
from .dumpsys_accessibility import DumpsysAccessibility
|
||||
from .dumpsys_activities import DumpsysActivities
|
||||
from .dumpsys_appops import DumpsysAppops
|
||||
from .dumpsys_battery_daily import DumpsysBatteryDaily
|
||||
from .dumpsys_battery_history import DumpsysBatteryHistory
|
||||
from .dumpsys_dbinfo import DumpsysDBInfo
|
||||
from .dumpsys_packages import DumpsysPackages
|
||||
from .dumpsys_receivers import DumpsysReceivers
|
||||
from .dumpsys_adb import DumpsysADBState
|
||||
from .getprop import Getprop
|
||||
from .packages import Packages
|
||||
from .dumpsys_platform_compat import DumpsysPlatformCompat
|
||||
from .processes import Processes
|
||||
from .settings import Settings
|
||||
from .aqf_files import AQFFiles
|
||||
from .aqf_getprop import AQFGetProp
|
||||
from .aqf_packages import AQFPackages
|
||||
from .aqf_processes import AQFProcesses
|
||||
from .aqf_settings import AQFSettings
|
||||
from .mounts import Mounts
|
||||
from .root_binaries import RootBinaries
|
||||
from .sms import SMS
|
||||
from .files import Files
|
||||
|
||||
ANDROIDQF_MODULES = [
|
||||
DumpsysActivities,
|
||||
DumpsysReceivers,
|
||||
DumpsysAccessibility,
|
||||
DumpsysAppops,
|
||||
DumpsysDBInfo,
|
||||
DumpsysBatteryDaily,
|
||||
DumpsysBatteryHistory,
|
||||
DumpsysADBState,
|
||||
Packages,
|
||||
DumpsysPlatformCompat,
|
||||
Processes,
|
||||
Getprop,
|
||||
Settings,
|
||||
AQFPackages,
|
||||
AQFProcesses,
|
||||
AQFGetProp,
|
||||
AQFSettings,
|
||||
AQFFiles,
|
||||
SMS,
|
||||
DumpsysPackages,
|
||||
Files,
|
||||
RootBinaries,
|
||||
Mounts,
|
||||
]
|
||||
|
||||
@@ -6,6 +6,11 @@
|
||||
import datetime
|
||||
import json
|
||||
import logging
|
||||
|
||||
try:
|
||||
import zoneinfo
|
||||
except ImportError:
|
||||
from backports import zoneinfo
|
||||
from typing import Optional, Union
|
||||
|
||||
from mvt.android.modules.androidqf.base import AndroidQFModule
|
||||
@@ -16,8 +21,13 @@ SUSPICIOUS_PATHS = [
|
||||
]
|
||||
|
||||
|
||||
class Files(AndroidQFModule):
|
||||
"""This module analyse list of files"""
|
||||
class AQFFiles(AndroidQFModule):
|
||||
"""
|
||||
This module analyzes the files.json dump generated by AndroidQF.
|
||||
|
||||
The format needs to be kept in sync with the AndroidQF module code.
|
||||
https://github.com/mvt-project/androidqf/blob/main/android-collector/cmd/find.go#L28
|
||||
"""
|
||||
|
||||
def __init__(
|
||||
self,
|
||||
@@ -106,6 +116,12 @@ class Files(AndroidQFModule):
|
||||
# TODO: adds SHA1 and MD5 when available in MVT
|
||||
|
||||
def run(self) -> None:
|
||||
if timezone := self._get_device_timezone():
|
||||
device_timezone = zoneinfo.ZoneInfo(timezone)
|
||||
else:
|
||||
self.log.warning("Unable to determine device timezone, using UTC")
|
||||
device_timezone = zoneinfo.ZoneInfo("UTC")
|
||||
|
||||
for file in self._get_files_by_pattern("*/files.json"):
|
||||
rawdata = self._get_file_content(file).decode("utf-8", errors="ignore")
|
||||
try:
|
||||
@@ -120,11 +136,18 @@ class Files(AndroidQFModule):
|
||||
for file_data in data:
|
||||
for ts in ["access_time", "changed_time", "modified_time"]:
|
||||
if ts in file_data:
|
||||
file_data[ts] = convert_datetime_to_iso(
|
||||
datetime.datetime.fromtimestamp(
|
||||
file_data[ts], tz=datetime.timezone.utc
|
||||
)
|
||||
utc_timestamp = datetime.datetime.fromtimestamp(
|
||||
file_data[ts], tz=datetime.timezone.utc
|
||||
)
|
||||
# Convert the UTC timestamp to local tiem on Android device's local timezone
|
||||
local_timestamp = utc_timestamp.astimezone(device_timezone)
|
||||
|
||||
# HACK: We only output the UTC timestamp in convert_datetime_to_iso, we
|
||||
# set the timestamp timezone to UTC, to avoid the timezone conversion again.
|
||||
local_timestamp = local_timestamp.replace(
|
||||
tzinfo=datetime.timezone.utc
|
||||
)
|
||||
file_data[ts] = convert_datetime_to_iso(local_timestamp)
|
||||
|
||||
self.results.append(file_data)
|
||||
|
||||
@@ -11,7 +11,7 @@ from mvt.android.artifacts.getprop import GetProp as GetPropArtifact
|
||||
from .base import AndroidQFModule
|
||||
|
||||
|
||||
class Getprop(GetPropArtifact, AndroidQFModule):
|
||||
class AQFGetProp(GetPropArtifact, AndroidQFModule):
|
||||
"""This module extracts data from get properties."""
|
||||
|
||||
def __init__(
|
||||
65
src/mvt/android/modules/androidqf/aqf_log_timestamps.py
Normal file
65
src/mvt/android/modules/androidqf/aqf_log_timestamps.py
Normal file
@@ -0,0 +1,65 @@
|
||||
# Mobile Verification Toolkit (MVT)
|
||||
# Copyright (c) 2021-2023 The MVT Authors.
|
||||
# Use of this software is governed by the MVT License 1.1 that can be found at
|
||||
# https://license.mvt.re/1.1/
|
||||
|
||||
import os
|
||||
import datetime
|
||||
import logging
|
||||
from typing import Optional
|
||||
|
||||
from mvt.common.utils import convert_datetime_to_iso
|
||||
from .base import AndroidQFModule
|
||||
from mvt.android.artifacts.file_timestamps import FileTimestampsArtifact
|
||||
|
||||
|
||||
class AQFLogTimestamps(FileTimestampsArtifact, AndroidQFModule):
|
||||
"""This module creates timeline for log files extracted by AQF."""
|
||||
|
||||
slug = "aqf_log_timestamps"
|
||||
|
||||
def __init__(
|
||||
self,
|
||||
file_path: Optional[str] = None,
|
||||
target_path: Optional[str] = None,
|
||||
results_path: Optional[str] = None,
|
||||
module_options: Optional[dict] = None,
|
||||
log: logging.Logger = logging.getLogger(__name__),
|
||||
results: Optional[list] = None,
|
||||
) -> None:
|
||||
super().__init__(
|
||||
file_path=file_path,
|
||||
target_path=target_path,
|
||||
results_path=results_path,
|
||||
module_options=module_options,
|
||||
log=log,
|
||||
results=results,
|
||||
)
|
||||
|
||||
def _get_file_modification_time(self, file_path: str) -> dict:
|
||||
if self.archive:
|
||||
file_timetuple = self.archive.getinfo(file_path).date_time
|
||||
return datetime.datetime(*file_timetuple)
|
||||
else:
|
||||
file_stat = os.stat(os.path.join(self.parent_path, file_path))
|
||||
return datetime.datetime.fromtimestamp(file_stat.st_mtime)
|
||||
|
||||
def run(self) -> None:
|
||||
filesystem_files = self._get_files_by_pattern("*/logs/*")
|
||||
|
||||
self.results = []
|
||||
for file in filesystem_files:
|
||||
# Only the modification time is available in the zip file metadata.
|
||||
# The timezone is the local timezone of the machine the phone.
|
||||
modification_time = self._get_file_modification_time(file)
|
||||
self.results.append(
|
||||
{
|
||||
"path": file,
|
||||
"modified_time": convert_datetime_to_iso(modification_time),
|
||||
}
|
||||
)
|
||||
|
||||
self.log.info(
|
||||
"Extracted a total of %d filesystem timestamps from AndroidQF logs directory.",
|
||||
len(self.results),
|
||||
)
|
||||
@@ -19,7 +19,7 @@ from mvt.android.utils import (
|
||||
from .base import AndroidQFModule
|
||||
|
||||
|
||||
class Packages(AndroidQFModule):
|
||||
class AQFPackages(AndroidQFModule):
|
||||
"""This module examines the installed packages in packages.json"""
|
||||
|
||||
def __init__(
|
||||
@@ -44,8 +44,7 @@ class Packages(AndroidQFModule):
|
||||
for result in self.results:
|
||||
if result["name"] in ROOT_PACKAGES:
|
||||
self.log.warning(
|
||||
"Found an installed package related to "
|
||||
'rooting/jailbreaking: "%s"',
|
||||
'Found an installed package related to rooting/jailbreaking: "%s"',
|
||||
result["name"],
|
||||
)
|
||||
self.detected.append(result)
|
||||
@@ -11,7 +11,7 @@ from mvt.android.artifacts.processes import Processes as ProcessesArtifact
|
||||
from .base import AndroidQFModule
|
||||
|
||||
|
||||
class Processes(ProcessesArtifact, AndroidQFModule):
|
||||
class AQFProcesses(ProcessesArtifact, AndroidQFModule):
|
||||
"""This module analyse running processes"""
|
||||
|
||||
def __init__(
|
||||
@@ -11,7 +11,7 @@ from mvt.android.artifacts.settings import Settings as SettingsArtifact
|
||||
from .base import AndroidQFModule
|
||||
|
||||
|
||||
class Settings(SettingsArtifact, AndroidQFModule):
|
||||
class AQFSettings(SettingsArtifact, AndroidQFModule):
|
||||
"""This module analyse setting files"""
|
||||
|
||||
def __init__(
|
||||
@@ -37,17 +37,48 @@ class AndroidQFModule(MVTModule):
|
||||
self.files: List[str] = []
|
||||
self.archive: Optional[zipfile.ZipFile] = None
|
||||
|
||||
def from_folder(self, parent_path: str, files: List[str]):
|
||||
def from_dir(self, parent_path: str, files: List[str]) -> None:
|
||||
self.parent_path = parent_path
|
||||
self.files = files
|
||||
|
||||
def from_zip_file(self, archive: zipfile.ZipFile, files: List[str]):
|
||||
def from_zip(self, archive: zipfile.ZipFile, files: List[str]) -> None:
|
||||
self.archive = archive
|
||||
self.files = files
|
||||
|
||||
def _get_files_by_pattern(self, pattern: str):
|
||||
return fnmatch.filter(self.files, pattern)
|
||||
|
||||
def _get_device_timezone(self):
|
||||
"""
|
||||
Get the device timezone from the getprop.txt file.
|
||||
|
||||
This is needed to map local timestamps stored in some
|
||||
Android log files to UTC/timezone-aware timestamps.
|
||||
"""
|
||||
get_prop_files = self._get_files_by_pattern("*/getprop.txt")
|
||||
if not get_prop_files:
|
||||
self.log.warning(
|
||||
"Could not find getprop.txt file. "
|
||||
"Some timestamps and timeline data may be incorrect."
|
||||
)
|
||||
return None
|
||||
|
||||
from mvt.android.artifacts.getprop import GetProp
|
||||
|
||||
properties_artifact = GetProp()
|
||||
prop_data = self._get_file_content(get_prop_files[0]).decode("utf-8")
|
||||
properties_artifact.parse(prop_data)
|
||||
timezone = properties_artifact.get_device_timezone()
|
||||
if timezone:
|
||||
self.log.debug("Identified local phone timezone: %s", timezone)
|
||||
return timezone
|
||||
|
||||
self.log.warning(
|
||||
"Could not find or determine local device timezone. "
|
||||
"Some timestamps and timeline data may be incorrect."
|
||||
)
|
||||
return None
|
||||
|
||||
def _get_file_content(self, file_path):
|
||||
if self.archive:
|
||||
handle = self.archive.open(file_path)
|
||||
|
||||
@@ -1,51 +0,0 @@
|
||||
# Mobile Verification Toolkit (MVT)
|
||||
# Copyright (c) 2021-2023 The MVT Authors.
|
||||
# Use of this software is governed by the MVT License 1.1 that can be found at
|
||||
# https://license.mvt.re/1.1/
|
||||
|
||||
import logging
|
||||
from typing import Optional
|
||||
|
||||
from mvt.android.artifacts.dumpsys_accessibility import DumpsysAccessibilityArtifact
|
||||
|
||||
from .base import AndroidQFModule
|
||||
|
||||
|
||||
class DumpsysAccessibility(DumpsysAccessibilityArtifact, AndroidQFModule):
|
||||
"""This module analyses dumpsys accessibility"""
|
||||
|
||||
def __init__(
|
||||
self,
|
||||
file_path: Optional[str] = None,
|
||||
target_path: Optional[str] = None,
|
||||
results_path: Optional[str] = None,
|
||||
module_options: Optional[dict] = None,
|
||||
log: logging.Logger = logging.getLogger(__name__),
|
||||
results: Optional[list] = None,
|
||||
) -> None:
|
||||
super().__init__(
|
||||
file_path=file_path,
|
||||
target_path=target_path,
|
||||
results_path=results_path,
|
||||
module_options=module_options,
|
||||
log=log,
|
||||
results=results,
|
||||
)
|
||||
|
||||
def run(self) -> None:
|
||||
dumpsys_file = self._get_files_by_pattern("*/dumpsys.txt")
|
||||
if not dumpsys_file:
|
||||
return
|
||||
|
||||
data = self._get_file_content(dumpsys_file[0]).decode("utf-8", errors="replace")
|
||||
content = self.extract_dumpsys_section(data, "DUMP OF SERVICE accessibility:")
|
||||
self.parse(content)
|
||||
|
||||
for result in self.results:
|
||||
self.log.info(
|
||||
'Found installed accessibility service "%s"', result.get("service")
|
||||
)
|
||||
|
||||
self.log.info(
|
||||
"Identified a total of %d accessibility services", len(self.results)
|
||||
)
|
||||
@@ -1,50 +0,0 @@
|
||||
# Mobile Verification Toolkit (MVT)
|
||||
# Copyright (c) 2021-2023 The MVT Authors.
|
||||
# Use of this software is governed by the MVT License 1.1 that can be found at
|
||||
# https://license.mvt.re/1.1/
|
||||
|
||||
import logging
|
||||
from typing import Optional
|
||||
|
||||
from mvt.android.artifacts.dumpsys_package_activities import (
|
||||
DumpsysPackageActivitiesArtifact,
|
||||
)
|
||||
|
||||
from .base import AndroidQFModule
|
||||
|
||||
|
||||
class DumpsysActivities(DumpsysPackageActivitiesArtifact, AndroidQFModule):
|
||||
"""This module extracts details on receivers for risky activities."""
|
||||
|
||||
def __init__(
|
||||
self,
|
||||
file_path: Optional[str] = None,
|
||||
target_path: Optional[str] = None,
|
||||
results_path: Optional[str] = None,
|
||||
module_options: Optional[dict] = None,
|
||||
log: logging.Logger = logging.getLogger(__name__),
|
||||
results: Optional[list] = None,
|
||||
) -> None:
|
||||
super().__init__(
|
||||
file_path=file_path,
|
||||
target_path=target_path,
|
||||
results_path=results_path,
|
||||
module_options=module_options,
|
||||
log=log,
|
||||
results=results,
|
||||
)
|
||||
|
||||
self.results = results if results else []
|
||||
|
||||
def run(self) -> None:
|
||||
dumpsys_file = self._get_files_by_pattern("*/dumpsys.txt")
|
||||
if not dumpsys_file:
|
||||
return
|
||||
|
||||
# Get data and extract the dumpsys section
|
||||
data = self._get_file_content(dumpsys_file[0]).decode("utf-8", errors="replace")
|
||||
content = self.extract_dumpsys_section(data, "DUMP OF SERVICE package:")
|
||||
# Parse it
|
||||
self.parse(content)
|
||||
|
||||
self.log.info("Extracted %d package activities", len(self.results))
|
||||
@@ -1,51 +0,0 @@
|
||||
# Mobile Verification Toolkit (MVT)
|
||||
# Copyright (c) 2021-2023 The MVT Authors.
|
||||
# Use of this software is governed by the MVT License 1.1 that can be found at
|
||||
# https://license.mvt.re/1.1/
|
||||
|
||||
import logging
|
||||
from typing import Optional
|
||||
|
||||
from mvt.android.artifacts.dumpsys_adb import DumpsysADBArtifact
|
||||
|
||||
from .base import AndroidQFModule
|
||||
|
||||
|
||||
class DumpsysADBState(DumpsysADBArtifact, AndroidQFModule):
|
||||
"""This module extracts ADB keystore state."""
|
||||
|
||||
def __init__(
|
||||
self,
|
||||
file_path: Optional[str] = None,
|
||||
target_path: Optional[str] = None,
|
||||
results_path: Optional[str] = None,
|
||||
module_options: Optional[dict] = None,
|
||||
log: logging.Logger = logging.getLogger(__name__),
|
||||
results: Optional[list] = None,
|
||||
) -> None:
|
||||
super().__init__(
|
||||
file_path=file_path,
|
||||
target_path=target_path,
|
||||
results_path=results_path,
|
||||
module_options=module_options,
|
||||
log=log,
|
||||
results=results,
|
||||
)
|
||||
|
||||
def run(self) -> None:
|
||||
dumpsys_file = self._get_files_by_pattern("*/dumpsys.txt")
|
||||
if not dumpsys_file:
|
||||
return
|
||||
|
||||
full_dumpsys = self._get_file_content(dumpsys_file[0])
|
||||
content = self.extract_dumpsys_section(
|
||||
full_dumpsys,
|
||||
b"DUMP OF SERVICE adb:",
|
||||
binary=True,
|
||||
)
|
||||
self.parse(content)
|
||||
if self.results:
|
||||
self.log.info(
|
||||
"Identified a total of %d trusted ADB keys",
|
||||
len(self.results[0].get("user_keys", [])),
|
||||
)
|
||||
@@ -1,46 +0,0 @@
|
||||
# Mobile Verification Toolkit (MVT)
|
||||
# Copyright (c) 2021-2023 The MVT Authors.
|
||||
# Use of this software is governed by the MVT License 1.1 that can be found at
|
||||
# https://license.mvt.re/1.1/
|
||||
|
||||
import logging
|
||||
from typing import Optional
|
||||
|
||||
from mvt.android.artifacts.dumpsys_appops import DumpsysAppopsArtifact
|
||||
|
||||
from .base import AndroidQFModule
|
||||
|
||||
|
||||
class DumpsysAppops(DumpsysAppopsArtifact, AndroidQFModule):
|
||||
def __init__(
|
||||
self,
|
||||
file_path: Optional[str] = None,
|
||||
target_path: Optional[str] = None,
|
||||
results_path: Optional[str] = None,
|
||||
module_options: Optional[dict] = None,
|
||||
log: logging.Logger = logging.getLogger(__name__),
|
||||
results: Optional[list] = None,
|
||||
) -> None:
|
||||
super().__init__(
|
||||
file_path=file_path,
|
||||
target_path=target_path,
|
||||
results_path=results_path,
|
||||
module_options=module_options,
|
||||
log=log,
|
||||
results=results,
|
||||
)
|
||||
|
||||
def run(self) -> None:
|
||||
dumpsys_file = self._get_files_by_pattern("*/dumpsys.txt")
|
||||
if not dumpsys_file:
|
||||
return
|
||||
|
||||
# Extract section
|
||||
data = self._get_file_content(dumpsys_file[0])
|
||||
section = self.extract_dumpsys_section(
|
||||
data.decode("utf-8", errors="replace"), "DUMP OF SERVICE appops:"
|
||||
)
|
||||
|
||||
# Parse it
|
||||
self.parse(section)
|
||||
self.log.info("Identified %d applications in AppOps Manager", len(self.results))
|
||||
@@ -1,46 +0,0 @@
|
||||
# Mobile Verification Toolkit (MVT)
|
||||
# Copyright (c) 2021-2023 The MVT Authors.
|
||||
# Use of this software is governed by the MVT License 1.1 that can be found at
|
||||
# https://license.mvt.re/1.1/
|
||||
|
||||
import logging
|
||||
from typing import Optional
|
||||
|
||||
from mvt.android.artifacts.dumpsys_battery_daily import DumpsysBatteryDailyArtifact
|
||||
|
||||
from .base import AndroidQFModule
|
||||
|
||||
|
||||
class DumpsysBatteryDaily(DumpsysBatteryDailyArtifact, AndroidQFModule):
|
||||
def __init__(
|
||||
self,
|
||||
file_path: Optional[str] = None,
|
||||
target_path: Optional[str] = None,
|
||||
results_path: Optional[str] = None,
|
||||
module_options: Optional[dict] = None,
|
||||
log: logging.Logger = logging.getLogger(__name__),
|
||||
results: Optional[list] = None,
|
||||
) -> None:
|
||||
super().__init__(
|
||||
file_path=file_path,
|
||||
target_path=target_path,
|
||||
results_path=results_path,
|
||||
module_options=module_options,
|
||||
log=log,
|
||||
results=results,
|
||||
)
|
||||
|
||||
def run(self) -> None:
|
||||
dumpsys_file = self._get_files_by_pattern("*/dumpsys.txt")
|
||||
if not dumpsys_file:
|
||||
return
|
||||
|
||||
# Extract section
|
||||
data = self._get_file_content(dumpsys_file[0])
|
||||
section = self.extract_dumpsys_section(
|
||||
data.decode("utf-8", errors="replace"), "DUMP OF SERVICE batterystats:"
|
||||
)
|
||||
|
||||
# Parse it
|
||||
self.parse(section)
|
||||
self.log.info("Extracted a total of %d battery daily stats", len(self.results))
|
||||
@@ -1,46 +0,0 @@
|
||||
# Mobile Verification Toolkit (MVT)
|
||||
# Copyright (c) 2021-2023 The MVT Authors.
|
||||
# Use of this software is governed by the MVT License 1.1 that can be found at
|
||||
# https://license.mvt.re/1.1/
|
||||
|
||||
import logging
|
||||
from typing import Optional
|
||||
|
||||
from mvt.android.artifacts.dumpsys_battery_history import DumpsysBatteryHistoryArtifact
|
||||
|
||||
from .base import AndroidQFModule
|
||||
|
||||
|
||||
class DumpsysBatteryHistory(DumpsysBatteryHistoryArtifact, AndroidQFModule):
|
||||
def __init__(
|
||||
self,
|
||||
file_path: Optional[str] = None,
|
||||
target_path: Optional[str] = None,
|
||||
results_path: Optional[str] = None,
|
||||
module_options: Optional[dict] = None,
|
||||
log: logging.Logger = logging.getLogger(__name__),
|
||||
results: Optional[list] = None,
|
||||
) -> None:
|
||||
super().__init__(
|
||||
file_path=file_path,
|
||||
target_path=target_path,
|
||||
results_path=results_path,
|
||||
module_options=module_options,
|
||||
log=log,
|
||||
results=results,
|
||||
)
|
||||
|
||||
def run(self) -> None:
|
||||
dumpsys_file = self._get_files_by_pattern("*/dumpsys.txt")
|
||||
if not dumpsys_file:
|
||||
return
|
||||
|
||||
# Extract section
|
||||
data = self._get_file_content(dumpsys_file[0])
|
||||
section = self.extract_dumpsys_section(
|
||||
data.decode("utf-8", errors="replace"), "DUMP OF SERVICE batterystats:"
|
||||
)
|
||||
|
||||
# Parse it
|
||||
self.parse(section)
|
||||
self.log.info("Extracted a total of %d battery daily stats", len(self.results))
|
||||
@@ -1,46 +0,0 @@
|
||||
# Mobile Verification Toolkit (MVT)
|
||||
# Copyright (c) 2021-2023 The MVT Authors.
|
||||
# Use of this software is governed by the MVT License 1.1 that can be found at
|
||||
# https://license.mvt.re/1.1/
|
||||
|
||||
import logging
|
||||
from typing import Optional
|
||||
|
||||
from mvt.android.artifacts.dumpsys_dbinfo import DumpsysDBInfoArtifact
|
||||
|
||||
from .base import AndroidQFModule
|
||||
|
||||
|
||||
class DumpsysDBInfo(DumpsysDBInfoArtifact, AndroidQFModule):
|
||||
def __init__(
|
||||
self,
|
||||
file_path: Optional[str] = None,
|
||||
target_path: Optional[str] = None,
|
||||
results_path: Optional[str] = None,
|
||||
module_options: Optional[dict] = None,
|
||||
log: logging.Logger = logging.getLogger(__name__),
|
||||
results: Optional[list] = None,
|
||||
) -> None:
|
||||
super().__init__(
|
||||
file_path=file_path,
|
||||
target_path=target_path,
|
||||
results_path=results_path,
|
||||
module_options=module_options,
|
||||
log=log,
|
||||
results=results,
|
||||
)
|
||||
|
||||
def run(self) -> None:
|
||||
dumpsys_file = self._get_files_by_pattern("*/dumpsys.txt")
|
||||
if not dumpsys_file:
|
||||
return
|
||||
|
||||
# Extract dumpsys DBInfo section
|
||||
data = self._get_file_content(dumpsys_file[0])
|
||||
section = self.extract_dumpsys_section(
|
||||
data.decode("utf-8", errors="replace"), "DUMP OF SERVICE dbinfo:"
|
||||
)
|
||||
|
||||
# Parse it
|
||||
self.parse(section)
|
||||
self.log.info("Identified %d DB Info entries", len(self.results))
|
||||
@@ -1,62 +0,0 @@
|
||||
# Mobile Verification Toolkit (MVT)
|
||||
# Copyright (c) 2021-2023 The MVT Authors.
|
||||
# Use of this software is governed by the MVT License 1.1 that can be found at
|
||||
# https://license.mvt.re/1.1/
|
||||
|
||||
import logging
|
||||
from typing import Any, Dict, List, Optional
|
||||
|
||||
from mvt.android.artifacts.dumpsys_packages import DumpsysPackagesArtifact
|
||||
from mvt.android.modules.adb.packages import (
|
||||
DANGEROUS_PERMISSIONS,
|
||||
DANGEROUS_PERMISSIONS_THRESHOLD,
|
||||
)
|
||||
|
||||
from .base import AndroidQFModule
|
||||
|
||||
|
||||
class DumpsysPackages(DumpsysPackagesArtifact, AndroidQFModule):
|
||||
"""This module analyse dumpsys packages"""
|
||||
|
||||
def __init__(
|
||||
self,
|
||||
file_path: Optional[str] = None,
|
||||
target_path: Optional[str] = None,
|
||||
results_path: Optional[str] = None,
|
||||
module_options: Optional[dict] = None,
|
||||
log: logging.Logger = logging.getLogger(__name__),
|
||||
results: Optional[List[Dict[str, Any]]] = None,
|
||||
) -> None:
|
||||
super().__init__(
|
||||
file_path=file_path,
|
||||
target_path=target_path,
|
||||
results_path=results_path,
|
||||
module_options=module_options,
|
||||
log=log,
|
||||
results=results,
|
||||
)
|
||||
|
||||
def run(self) -> None:
|
||||
dumpsys_file = self._get_files_by_pattern("*/dumpsys.txt")
|
||||
if len(dumpsys_file) != 1:
|
||||
self.log.info("Dumpsys file not found")
|
||||
return
|
||||
|
||||
data = self._get_file_content(dumpsys_file[0]).decode("utf-8", errors="replace")
|
||||
content = self.extract_dumpsys_section(data, "DUMP OF SERVICE package:")
|
||||
self.parse(content)
|
||||
|
||||
for result in self.results:
|
||||
dangerous_permissions_count = 0
|
||||
for perm in result["permissions"]:
|
||||
if perm["name"] in DANGEROUS_PERMISSIONS:
|
||||
dangerous_permissions_count += 1
|
||||
|
||||
if dangerous_permissions_count >= DANGEROUS_PERMISSIONS_THRESHOLD:
|
||||
self.log.info(
|
||||
'Found package "%s" requested %d potentially dangerous permissions',
|
||||
result["package_name"],
|
||||
dangerous_permissions_count,
|
||||
)
|
||||
|
||||
self.log.info("Extracted details on %d packages", len(self.results))
|
||||
@@ -1,44 +0,0 @@
|
||||
# Mobile Verification Toolkit (MVT)
|
||||
# Copyright (c) 2021-2023 The MVT Authors.
|
||||
# Use of this software is governed by the MVT License 1.1 that can be found at
|
||||
# https://license.mvt.re/1.1/
|
||||
|
||||
import logging
|
||||
from typing import Optional
|
||||
|
||||
from mvt.android.artifacts.dumpsys_platform_compat import DumpsysPlatformCompatArtifact
|
||||
|
||||
from .base import AndroidQFModule
|
||||
|
||||
|
||||
class DumpsysPlatformCompat(DumpsysPlatformCompatArtifact, AndroidQFModule):
|
||||
"""This module extracts details on uninstalled apps."""
|
||||
|
||||
def __init__(
|
||||
self,
|
||||
file_path: Optional[str] = None,
|
||||
target_path: Optional[str] = None,
|
||||
results_path: Optional[str] = None,
|
||||
module_options: Optional[dict] = None,
|
||||
log: logging.Logger = logging.getLogger(__name__),
|
||||
results: Optional[list] = None,
|
||||
) -> None:
|
||||
super().__init__(
|
||||
file_path=file_path,
|
||||
target_path=target_path,
|
||||
results_path=results_path,
|
||||
module_options=module_options,
|
||||
log=log,
|
||||
results=results,
|
||||
)
|
||||
|
||||
def run(self) -> None:
|
||||
dumpsys_file = self._get_files_by_pattern("*/dumpsys.txt")
|
||||
if not dumpsys_file:
|
||||
return
|
||||
|
||||
data = self._get_file_content(dumpsys_file[0]).decode("utf-8", errors="replace")
|
||||
content = self.extract_dumpsys_section(data, "DUMP OF SERVICE platform_compat:")
|
||||
self.parse(content)
|
||||
|
||||
self.log.info("Found %d uninstalled apps", len(self.results))
|
||||
@@ -1,49 +0,0 @@
|
||||
# Mobile Verification Toolkit (MVT)
|
||||
# Copyright (c) 2021-2023 The MVT Authors.
|
||||
# Use of this software is governed by the MVT License 1.1 that can be found at
|
||||
# https://license.mvt.re/1.1/
|
||||
|
||||
import logging
|
||||
from typing import Any, Dict, List, Optional, Union
|
||||
|
||||
from mvt.android.artifacts.dumpsys_receivers import DumpsysReceiversArtifact
|
||||
|
||||
from .base import AndroidQFModule
|
||||
|
||||
|
||||
class DumpsysReceivers(DumpsysReceiversArtifact, AndroidQFModule):
|
||||
"""This module analyse dumpsys receivers"""
|
||||
|
||||
def __init__(
|
||||
self,
|
||||
file_path: Optional[str] = None,
|
||||
target_path: Optional[str] = None,
|
||||
results_path: Optional[str] = None,
|
||||
module_options: Optional[dict] = None,
|
||||
log: logging.Logger = logging.getLogger(__name__),
|
||||
results: Union[List[Any], Dict[str, Any], None] = None,
|
||||
) -> None:
|
||||
super().__init__(
|
||||
file_path=file_path,
|
||||
target_path=target_path,
|
||||
results_path=results_path,
|
||||
module_options=module_options,
|
||||
log=log,
|
||||
results=results,
|
||||
)
|
||||
|
||||
self.results = results if results else {}
|
||||
|
||||
def run(self) -> None:
|
||||
dumpsys_file = self._get_files_by_pattern("*/dumpsys.txt")
|
||||
if not dumpsys_file:
|
||||
return
|
||||
data = self._get_file_content(dumpsys_file[0])
|
||||
|
||||
dumpsys_section = self.extract_dumpsys_section(
|
||||
data.decode("utf-8", errors="replace"), "DUMP OF SERVICE package:"
|
||||
)
|
||||
|
||||
self.parse(dumpsys_section)
|
||||
|
||||
self.log.info("Extracted receivers for %d intents", len(self.results))
|
||||
74
src/mvt/android/modules/androidqf/mounts.py
Normal file
74
src/mvt/android/modules/androidqf/mounts.py
Normal file
@@ -0,0 +1,74 @@
|
||||
# Mobile Verification Toolkit (MVT)
|
||||
# Copyright (c) 2021-2023 The MVT Authors.
|
||||
# Use of this software is governed by the MVT License 1.1 that can be found at
|
||||
# https://license.mvt.re/1.1/
|
||||
|
||||
import logging
|
||||
import json
|
||||
from typing import Optional
|
||||
|
||||
from mvt.android.artifacts.mounts import Mounts as MountsArtifact
|
||||
|
||||
from .base import AndroidQFModule
|
||||
|
||||
|
||||
class Mounts(MountsArtifact, AndroidQFModule):
|
||||
"""This module extracts and analyzes mount information from AndroidQF acquisitions."""
|
||||
|
||||
def __init__(
|
||||
self,
|
||||
file_path: Optional[str] = None,
|
||||
target_path: Optional[str] = None,
|
||||
results_path: Optional[str] = None,
|
||||
module_options: Optional[dict] = None,
|
||||
log: logging.Logger = logging.getLogger(__name__),
|
||||
results: Optional[list] = None,
|
||||
) -> None:
|
||||
super().__init__(
|
||||
file_path=file_path,
|
||||
target_path=target_path,
|
||||
results_path=results_path,
|
||||
module_options=module_options,
|
||||
log=log,
|
||||
results=results,
|
||||
)
|
||||
self.results = []
|
||||
|
||||
def run(self) -> None:
|
||||
"""
|
||||
Run the mounts analysis module.
|
||||
|
||||
This module looks for mount information files collected by androidqf
|
||||
and analyzes them for suspicious configurations, particularly focusing
|
||||
on detecting root access indicators like /system mounted as read-write.
|
||||
"""
|
||||
mount_files = self._get_files_by_pattern("*/mounts.json")
|
||||
|
||||
if not mount_files:
|
||||
self.log.info("No mount information file found")
|
||||
return
|
||||
|
||||
self.log.info("Found mount information file: %s", mount_files[0])
|
||||
|
||||
try:
|
||||
data = self._get_file_content(mount_files[0]).decode(
|
||||
"utf-8", errors="replace"
|
||||
)
|
||||
except Exception as exc:
|
||||
self.log.error("Failed to read mount information file: %s", exc)
|
||||
return
|
||||
|
||||
# Parse the mount data
|
||||
try:
|
||||
json_data = json.loads(data)
|
||||
|
||||
if isinstance(json_data, list):
|
||||
# AndroidQF format: array of strings like
|
||||
# "/dev/block/dm-12 on / type ext4 (ro,seclabel,noatime)"
|
||||
mount_content = "\n".join(json_data)
|
||||
self.parse(mount_content)
|
||||
except Exception as exc:
|
||||
self.log.error("Failed to parse mount information: %s", exc)
|
||||
return
|
||||
|
||||
self.log.info("Extracted a total of %d mount entries", len(self.results))
|
||||
121
src/mvt/android/modules/androidqf/root_binaries.py
Normal file
121
src/mvt/android/modules/androidqf/root_binaries.py
Normal file
@@ -0,0 +1,121 @@
|
||||
# Mobile Verification Toolkit (MVT)
|
||||
# Copyright (c) 2021-2023 The MVT Authors.
|
||||
# Use of this software is governed by the MVT License 1.1 that can be found at
|
||||
# https://license.mvt.re/1.1/
|
||||
|
||||
import json
|
||||
import logging
|
||||
from typing import Optional
|
||||
|
||||
from .base import AndroidQFModule
|
||||
|
||||
|
||||
class RootBinaries(AndroidQFModule):
|
||||
"""This module analyzes root_binaries.json for root binaries found by androidqf."""
|
||||
|
||||
def __init__(
|
||||
self,
|
||||
file_path: Optional[str] = None,
|
||||
target_path: Optional[str] = None,
|
||||
results_path: Optional[str] = None,
|
||||
module_options: Optional[dict] = None,
|
||||
log: logging.Logger = logging.getLogger(__name__),
|
||||
results: Optional[list] = None,
|
||||
) -> None:
|
||||
super().__init__(
|
||||
file_path=file_path,
|
||||
target_path=target_path,
|
||||
results_path=results_path,
|
||||
module_options=module_options,
|
||||
log=log,
|
||||
results=results,
|
||||
)
|
||||
|
||||
def serialize(self, record: dict) -> dict:
|
||||
return {
|
||||
"timestamp": record.get("timestamp"),
|
||||
"module": self.__class__.__name__,
|
||||
"event": "root_binary_found",
|
||||
"data": f"Root binary found: {record['path']} (binary: {record['binary_name']})",
|
||||
}
|
||||
|
||||
def check_indicators(self) -> None:
|
||||
"""Check for indicators of device rooting."""
|
||||
if not self.results:
|
||||
return
|
||||
|
||||
# All found root binaries are considered indicators of rooting
|
||||
for result in self.results:
|
||||
self.log.warning(
|
||||
'Found root binary "%s" at path "%s"',
|
||||
result["binary_name"],
|
||||
result["path"],
|
||||
)
|
||||
self.detected.append(result)
|
||||
|
||||
if self.detected:
|
||||
self.log.warning(
|
||||
"Device shows signs of rooting with %d root binaries found",
|
||||
len(self.detected),
|
||||
)
|
||||
|
||||
def run(self) -> None:
|
||||
"""Run the root binaries analysis."""
|
||||
root_binaries_files = self._get_files_by_pattern("*/root_binaries.json")
|
||||
|
||||
if not root_binaries_files:
|
||||
self.log.info("No root_binaries.json file found")
|
||||
return
|
||||
|
||||
rawdata = self._get_file_content(root_binaries_files[0]).decode(
|
||||
"utf-8", errors="ignore"
|
||||
)
|
||||
|
||||
try:
|
||||
root_binary_paths = json.loads(rawdata)
|
||||
except json.JSONDecodeError as e:
|
||||
self.log.error("Failed to parse root_binaries.json: %s", e)
|
||||
return
|
||||
|
||||
if not isinstance(root_binary_paths, list):
|
||||
self.log.error("Expected root_binaries.json to contain a list of paths")
|
||||
return
|
||||
|
||||
# Known root binary names that might be found and their descriptions
|
||||
# This maps the binary name to a human-readable description
|
||||
known_root_binaries = {
|
||||
"su": "SuperUser binary",
|
||||
"busybox": "BusyBox utilities",
|
||||
"supersu": "SuperSU root management",
|
||||
"Superuser.apk": "Superuser app",
|
||||
"KingoUser.apk": "KingRoot app",
|
||||
"SuperSu.apk": "SuperSU app",
|
||||
"magisk": "Magisk root framework",
|
||||
"magiskhide": "Magisk hide utility",
|
||||
"magiskinit": "Magisk init binary",
|
||||
"magiskpolicy": "Magisk policy binary",
|
||||
}
|
||||
|
||||
for path in root_binary_paths:
|
||||
if not path or not isinstance(path, str):
|
||||
continue
|
||||
|
||||
# Extract binary name from path
|
||||
binary_name = path.split("/")[-1].lower()
|
||||
|
||||
# Check if this matches a known root binary by exact name match
|
||||
description = "Unknown root binary"
|
||||
for known_binary in known_root_binaries:
|
||||
if binary_name == known_binary.lower():
|
||||
description = known_root_binaries[known_binary]
|
||||
break
|
||||
|
||||
result = {
|
||||
"path": path.strip(),
|
||||
"binary_name": binary_name,
|
||||
"description": description,
|
||||
}
|
||||
|
||||
self.results.append(result)
|
||||
|
||||
self.log.info("Found %d root binaries", len(self.results))
|
||||
@@ -19,7 +19,13 @@ from .base import AndroidQFModule
|
||||
|
||||
|
||||
class SMS(AndroidQFModule):
|
||||
"""This module analyse SMS file in backup"""
|
||||
"""
|
||||
This module analyse SMS file in backup
|
||||
|
||||
XXX: We should also de-duplicate this AQF module, but first we
|
||||
need to add tests for loading encrypted SMS backups through the backup
|
||||
sub-module.
|
||||
"""
|
||||
|
||||
def __init__(
|
||||
self,
|
||||
|
||||
@@ -37,10 +37,7 @@ class BackupExtraction(MVTModule):
|
||||
self.tar = None
|
||||
self.files = []
|
||||
|
||||
def from_folder(self, backup_path: Optional[str], files: List[str]) -> None:
|
||||
"""
|
||||
Get all the files and list them
|
||||
"""
|
||||
def from_dir(self, backup_path: Optional[str], files: List[str]) -> None:
|
||||
self.backup_path = backup_path
|
||||
self.files = files
|
||||
|
||||
@@ -58,14 +55,16 @@ class BackupExtraction(MVTModule):
|
||||
return fnmatch.filter(self.files, pattern)
|
||||
|
||||
def _get_file_content(self, file_path: str) -> bytes:
|
||||
if self.ab:
|
||||
if self.tar:
|
||||
try:
|
||||
member = self.tar.getmember(file_path)
|
||||
except KeyError:
|
||||
return None
|
||||
handle = self.tar.extractfile(member)
|
||||
else:
|
||||
elif self.backup_path:
|
||||
handle = open(os.path.join(self.backup_path, file_path), "rb")
|
||||
else:
|
||||
raise ValueError("No backup path or tar file provided")
|
||||
|
||||
data = handle.read()
|
||||
handle.close()
|
||||
|
||||
@@ -3,10 +3,11 @@
|
||||
# Use of this software is governed by the MVT License 1.1 that can be found at
|
||||
# https://license.mvt.re/1.1/
|
||||
|
||||
import os
|
||||
|
||||
from rich.prompt import Prompt
|
||||
|
||||
from mvt.common.config import settings
|
||||
|
||||
MVT_ANDROID_BACKUP_PASSWORD = "MVT_ANDROID_BACKUP_PASSWORD"
|
||||
|
||||
|
||||
@@ -16,24 +17,24 @@ def cli_load_android_backup_password(log, backup_password):
|
||||
|
||||
Used in MVT CLI command parsers.
|
||||
"""
|
||||
password_from_env = os.environ.get(MVT_ANDROID_BACKUP_PASSWORD, None)
|
||||
password_from_env_or_config = settings.ANDROID_BACKUP_PASSWORD
|
||||
if backup_password:
|
||||
log.info(
|
||||
"Your password may be visible in the process table because it "
|
||||
"was supplied on the command line!"
|
||||
)
|
||||
if password_from_env:
|
||||
if password_from_env_or_config:
|
||||
log.info(
|
||||
"Ignoring %s environment variable, using --backup-password argument instead",
|
||||
MVT_ANDROID_BACKUP_PASSWORD,
|
||||
"MVT_ANDROID_BACKUP_PASSWORD",
|
||||
)
|
||||
return backup_password
|
||||
elif password_from_env:
|
||||
elif password_from_env_or_config:
|
||||
log.info(
|
||||
"Using backup password from %s environment variable",
|
||||
MVT_ANDROID_BACKUP_PASSWORD,
|
||||
"Using backup password from %s environment variable or config file",
|
||||
"MVT_ANDROID_BACKUP_PASSWORD",
|
||||
)
|
||||
return password_from_env
|
||||
return password_from_env_or_config
|
||||
|
||||
|
||||
def prompt_or_load_android_backup_password(log, module_options):
|
||||
|
||||
@@ -50,13 +50,13 @@ class SMS(BackupExtraction):
|
||||
def run(self) -> None:
|
||||
sms_path = "apps/com.android.providers.telephony/d_f/*_sms_backup"
|
||||
for file in self._get_files_by_pattern(sms_path):
|
||||
self.log.info("Processing SMS backup file at %s", file)
|
||||
self.log.debug("Processing SMS backup file at %s", file)
|
||||
data = self._get_file_content(file)
|
||||
self.results.extend(parse_sms_file(data))
|
||||
|
||||
mms_path = "apps/com.android.providers.telephony/d_f/*_mms_backup"
|
||||
for file in self._get_files_by_pattern(mms_path):
|
||||
self.log.info("Processing MMS backup file at %s", file)
|
||||
self.log.debug("Processing MMS backup file at %s", file)
|
||||
data = self._get_file_content(file)
|
||||
self.results.extend(parse_sms_file(data))
|
||||
|
||||
|
||||
@@ -3,28 +3,32 @@
|
||||
# Use of this software is governed by the MVT License 1.1 that can be found at
|
||||
# https://license.mvt.re/1.1/
|
||||
|
||||
from .accessibility import Accessibility
|
||||
from .activities import Activities
|
||||
from .appops import Appops
|
||||
from .battery_daily import BatteryDaily
|
||||
from .battery_history import BatteryHistory
|
||||
from .dbinfo import DBInfo
|
||||
from .getprop import Getprop
|
||||
from .packages import Packages
|
||||
from .platform_compat import PlatformCompat
|
||||
from .receivers import Receivers
|
||||
from .adb_state import DumpsysADBState
|
||||
from .dumpsys_accessibility import DumpsysAccessibility
|
||||
from .dumpsys_activities import DumpsysActivities
|
||||
from .dumpsys_appops import DumpsysAppops
|
||||
from .dumpsys_battery_daily import DumpsysBatteryDaily
|
||||
from .dumpsys_battery_history import DumpsysBatteryHistory
|
||||
from .dumpsys_dbinfo import DumpsysDBInfo
|
||||
from .dumpsys_getprop import DumpsysGetProp
|
||||
from .dumpsys_packages import DumpsysPackages
|
||||
from .dumpsys_platform_compat import DumpsysPlatformCompat
|
||||
from .dumpsys_receivers import DumpsysReceivers
|
||||
from .dumpsys_adb_state import DumpsysADBState
|
||||
from .fs_timestamps import BugReportTimestamps
|
||||
from .tombstones import Tombstones
|
||||
|
||||
BUGREPORT_MODULES = [
|
||||
Accessibility,
|
||||
Activities,
|
||||
Appops,
|
||||
BatteryDaily,
|
||||
BatteryHistory,
|
||||
DBInfo,
|
||||
Getprop,
|
||||
Packages,
|
||||
PlatformCompat,
|
||||
Receivers,
|
||||
DumpsysAccessibility,
|
||||
DumpsysActivities,
|
||||
DumpsysAppops,
|
||||
DumpsysBatteryDaily,
|
||||
DumpsysBatteryHistory,
|
||||
DumpsysDBInfo,
|
||||
DumpsysGetProp,
|
||||
DumpsysPackages,
|
||||
DumpsysPlatformCompat,
|
||||
DumpsysReceivers,
|
||||
DumpsysADBState,
|
||||
BugReportTimestamps,
|
||||
Tombstones,
|
||||
]
|
||||
|
||||
@@ -2,10 +2,11 @@
|
||||
# Copyright (c) 2021-2023 The MVT Authors.
|
||||
# See the file 'LICENSE' for usage and copying permissions, or find a copy at
|
||||
# https://github.com/mvt-project/mvt/blob/main/LICENSE
|
||||
|
||||
import datetime
|
||||
import fnmatch
|
||||
import logging
|
||||
import os
|
||||
|
||||
from typing import List, Optional
|
||||
from zipfile import ZipFile
|
||||
|
||||
@@ -38,9 +39,7 @@ class BugReportModule(MVTModule):
|
||||
self.extract_files: List[str] = []
|
||||
self.zip_files: List[str] = []
|
||||
|
||||
def from_folder(
|
||||
self, extract_path: Optional[str], extract_files: List[str]
|
||||
) -> None:
|
||||
def from_dir(self, extract_path: str, extract_files: List[str]) -> None:
|
||||
self.extract_path = extract_path
|
||||
self.extract_files = extract_files
|
||||
|
||||
@@ -91,3 +90,11 @@ class BugReportModule(MVTModule):
|
||||
return None
|
||||
|
||||
return self._get_file_content(dumpstate_logs[0])
|
||||
|
||||
def _get_file_modification_time(self, file_path: str) -> dict:
|
||||
if self.zip_archive:
|
||||
file_timetuple = self.zip_archive.getinfo(file_path).date_time
|
||||
return datetime.datetime(*file_timetuple)
|
||||
else:
|
||||
file_stat = os.stat(os.path.join(self.extract_path, file_path))
|
||||
return datetime.datetime.fromtimestamp(file_stat.st_mtime)
|
||||
|
||||
@@ -11,7 +11,7 @@ from mvt.android.artifacts.dumpsys_accessibility import DumpsysAccessibilityArti
|
||||
from .base import BugReportModule
|
||||
|
||||
|
||||
class Accessibility(DumpsysAccessibilityArtifact, BugReportModule):
|
||||
class DumpsysAccessibility(DumpsysAccessibilityArtifact, BugReportModule):
|
||||
"""This module extracts stats on accessibility."""
|
||||
|
||||
def __init__(
|
||||
@@ -13,7 +13,7 @@ from mvt.android.artifacts.dumpsys_package_activities import (
|
||||
from .base import BugReportModule
|
||||
|
||||
|
||||
class Activities(DumpsysPackageActivitiesArtifact, BugReportModule):
|
||||
class DumpsysActivities(DumpsysPackageActivitiesArtifact, BugReportModule):
|
||||
"""This module extracts details on receivers for risky activities."""
|
||||
|
||||
def __init__(
|
||||
@@ -11,7 +11,7 @@ from mvt.android.artifacts.dumpsys_appops import DumpsysAppopsArtifact
|
||||
from .base import BugReportModule
|
||||
|
||||
|
||||
class Appops(DumpsysAppopsArtifact, BugReportModule):
|
||||
class DumpsysAppops(DumpsysAppopsArtifact, BugReportModule):
|
||||
"""This module extracts information on package from App-Ops Manager."""
|
||||
|
||||
def __init__(
|
||||
@@ -11,7 +11,7 @@ from mvt.android.artifacts.dumpsys_battery_daily import DumpsysBatteryDailyArtif
|
||||
from .base import BugReportModule
|
||||
|
||||
|
||||
class BatteryDaily(DumpsysBatteryDailyArtifact, BugReportModule):
|
||||
class DumpsysBatteryDaily(DumpsysBatteryDailyArtifact, BugReportModule):
|
||||
"""This module extracts records from battery daily updates."""
|
||||
|
||||
def __init__(
|
||||
@@ -11,7 +11,7 @@ from mvt.android.artifacts.dumpsys_battery_history import DumpsysBatteryHistoryA
|
||||
from .base import BugReportModule
|
||||
|
||||
|
||||
class BatteryHistory(DumpsysBatteryHistoryArtifact, BugReportModule):
|
||||
class DumpsysBatteryHistory(DumpsysBatteryHistoryArtifact, BugReportModule):
|
||||
"""This module extracts records from battery daily updates."""
|
||||
|
||||
def __init__(
|
||||
@@ -11,7 +11,7 @@ from mvt.android.artifacts.dumpsys_dbinfo import DumpsysDBInfoArtifact
|
||||
from .base import BugReportModule
|
||||
|
||||
|
||||
class DBInfo(DumpsysDBInfoArtifact, BugReportModule):
|
||||
class DumpsysDBInfo(DumpsysDBInfoArtifact, BugReportModule):
|
||||
"""This module extracts records from battery daily updates."""
|
||||
|
||||
slug = "dbinfo"
|
||||
@@ -11,7 +11,7 @@ from mvt.android.artifacts.getprop import GetProp as GetPropArtifact
|
||||
from .base import BugReportModule
|
||||
|
||||
|
||||
class Getprop(GetPropArtifact, BugReportModule):
|
||||
class DumpsysGetProp(GetPropArtifact, BugReportModule):
|
||||
"""This module extracts device properties from getprop command."""
|
||||
|
||||
def __init__(
|
||||
@@ -12,7 +12,7 @@ from mvt.android.utils import DANGEROUS_PERMISSIONS, DANGEROUS_PERMISSIONS_THRES
|
||||
from .base import BugReportModule
|
||||
|
||||
|
||||
class Packages(DumpsysPackagesArtifact, BugReportModule):
|
||||
class DumpsysPackages(DumpsysPackagesArtifact, BugReportModule):
|
||||
"""This module extracts details on receivers for risky activities."""
|
||||
|
||||
def __init__(
|
||||
@@ -11,7 +11,7 @@ from mvt.android.artifacts.dumpsys_platform_compat import DumpsysPlatformCompatA
|
||||
from mvt.android.modules.bugreport.base import BugReportModule
|
||||
|
||||
|
||||
class PlatformCompat(DumpsysPlatformCompatArtifact, BugReportModule):
|
||||
class DumpsysPlatformCompat(DumpsysPlatformCompatArtifact, BugReportModule):
|
||||
"""This module extracts details on uninstalled apps."""
|
||||
|
||||
def __init__(
|
||||
@@ -11,7 +11,7 @@ from mvt.android.artifacts.dumpsys_receivers import DumpsysReceiversArtifact
|
||||
from .base import BugReportModule
|
||||
|
||||
|
||||
class Receivers(DumpsysReceiversArtifact, BugReportModule):
|
||||
class DumpsysReceivers(DumpsysReceiversArtifact, BugReportModule):
|
||||
"""This module extracts details on receivers for risky activities."""
|
||||
|
||||
def __init__(
|
||||
@@ -6,15 +6,15 @@
|
||||
import logging
|
||||
from typing import Optional
|
||||
|
||||
from mvt.android.artifacts.dumpsys_dbinfo import DumpsysDBInfoArtifact
|
||||
|
||||
from .base import AndroidExtraction
|
||||
from mvt.common.utils import convert_datetime_to_iso
|
||||
from .base import BugReportModule
|
||||
from mvt.android.artifacts.file_timestamps import FileTimestampsArtifact
|
||||
|
||||
|
||||
class DumpsysDBInfo(DumpsysDBInfoArtifact, AndroidExtraction):
|
||||
class BugReportTimestamps(FileTimestampsArtifact, BugReportModule):
|
||||
"""This module extracts records from battery daily updates."""
|
||||
|
||||
slug = "dumpsys_dbinfo"
|
||||
slug = "bugreport_timestamps"
|
||||
|
||||
def __init__(
|
||||
self,
|
||||
@@ -35,13 +35,21 @@ class DumpsysDBInfo(DumpsysDBInfoArtifact, AndroidExtraction):
|
||||
)
|
||||
|
||||
def run(self) -> None:
|
||||
self._adb_connect()
|
||||
output = self._adb_command("dumpsys dbinfo")
|
||||
self._adb_disconnect()
|
||||
filesystem_files = self._get_files_by_pattern("FS/*")
|
||||
|
||||
self.parse(output)
|
||||
self.results = []
|
||||
for file in filesystem_files:
|
||||
# Only the modification time is available in the zip file metadata.
|
||||
# The timezone is the local timezone of the machine the phone.
|
||||
modification_time = self._get_file_modification_time(file)
|
||||
self.results.append(
|
||||
{
|
||||
"path": file,
|
||||
"modified_time": convert_datetime_to_iso(modification_time),
|
||||
}
|
||||
)
|
||||
|
||||
self.log.info(
|
||||
"Extracted a total of %d records from database information",
|
||||
"Extracted a total of %d filesystem timestamps from bugreport.",
|
||||
len(self.results),
|
||||
)
|
||||
64
src/mvt/android/modules/bugreport/tombstones.py
Normal file
64
src/mvt/android/modules/bugreport/tombstones.py
Normal file
@@ -0,0 +1,64 @@
|
||||
# Mobile Verification Toolkit (MVT)
|
||||
# Copyright (c) 2021-2023 The MVT Authors.
|
||||
# Use of this software is governed by the MVT License 1.1 that can be found at
|
||||
# https://license.mvt.re/1.1/
|
||||
|
||||
import logging
|
||||
from typing import Optional
|
||||
|
||||
from mvt.android.artifacts.tombstone_crashes import TombstoneCrashArtifact
|
||||
from .base import BugReportModule
|
||||
|
||||
|
||||
class Tombstones(TombstoneCrashArtifact, BugReportModule):
|
||||
"""This module extracts records from battery daily updates."""
|
||||
|
||||
slug = "tombstones"
|
||||
|
||||
def __init__(
|
||||
self,
|
||||
file_path: Optional[str] = None,
|
||||
target_path: Optional[str] = None,
|
||||
results_path: Optional[str] = None,
|
||||
module_options: Optional[dict] = None,
|
||||
log: logging.Logger = logging.getLogger(__name__),
|
||||
results: Optional[list] = None,
|
||||
) -> None:
|
||||
super().__init__(
|
||||
file_path=file_path,
|
||||
target_path=target_path,
|
||||
results_path=results_path,
|
||||
module_options=module_options,
|
||||
log=log,
|
||||
results=results,
|
||||
)
|
||||
|
||||
def run(self) -> None:
|
||||
tombstone_files = self._get_files_by_pattern("*/tombstone_*")
|
||||
if not tombstone_files:
|
||||
self.log.error(
|
||||
"Unable to find any tombstone files. "
|
||||
"Did you provide a valid bugreport archive?"
|
||||
)
|
||||
return
|
||||
|
||||
for tombstone_file in sorted(tombstone_files):
|
||||
tombstone_filename = tombstone_file.split("/")[-1]
|
||||
modification_time = self._get_file_modification_time(tombstone_file)
|
||||
tombstone_data = self._get_file_content(tombstone_file)
|
||||
|
||||
try:
|
||||
if tombstone_file.endswith(".pb"):
|
||||
self.parse_protobuf(
|
||||
tombstone_filename, modification_time, tombstone_data
|
||||
)
|
||||
else:
|
||||
self.parse(tombstone_filename, modification_time, tombstone_data)
|
||||
except ValueError as e:
|
||||
# Catch any exceptions raised during parsing or validation.
|
||||
self.log.error(f"Error parsing tombstone file {tombstone_file}: {e}")
|
||||
|
||||
self.log.info(
|
||||
"Extracted a total of %d tombstone files",
|
||||
len(self.results),
|
||||
)
|
||||
@@ -231,6 +231,7 @@ def parse_sms_file(data):
|
||||
entry.pop("mms_body")
|
||||
|
||||
body = entry.get("body", None)
|
||||
message_links = None
|
||||
if body:
|
||||
message_links = check_for_links(entry["body"])
|
||||
|
||||
|
||||
0
src/mvt/android/parsers/proto/__init__.py
Normal file
0
src/mvt/android/parsers/proto/__init__.py
Normal file
195
src/mvt/android/parsers/proto/tombstone.proto
Normal file
195
src/mvt/android/parsers/proto/tombstone.proto
Normal file
@@ -0,0 +1,195 @@
|
||||
// tombstone.proto file from Android source
|
||||
// Src: https://android.googlesource.com/platform/system/core/+/refs/heads/main/debuggerd/proto/tombstone.proto
|
||||
//
|
||||
// Protobuf definition for Android tombstones.
|
||||
//
|
||||
// An app can get hold of these for any `REASON_CRASH_NATIVE` instance of
|
||||
// `android.app.ApplicationExitInfo`.
|
||||
//
|
||||
// https://developer.android.com/reference/android/app/ApplicationExitInfo#getTraceInputStream()
|
||||
//
|
||||
syntax = "proto3";
|
||||
option java_package = "com.android.server.os";
|
||||
option java_outer_classname = "TombstoneProtos";
|
||||
// NOTE TO OEMS:
|
||||
// If you add custom fields to this proto, do not use numbers in the reserved range.
|
||||
message CrashDetail {
|
||||
bytes name = 1;
|
||||
bytes data = 2;
|
||||
reserved 3 to 999;
|
||||
}
|
||||
message StackHistoryBufferEntry {
|
||||
BacktraceFrame addr = 1;
|
||||
uint64 fp = 2;
|
||||
uint64 tag = 3;
|
||||
reserved 4 to 999;
|
||||
}
|
||||
message StackHistoryBuffer {
|
||||
uint64 tid = 1;
|
||||
repeated StackHistoryBufferEntry entries = 2;
|
||||
reserved 3 to 999;
|
||||
}
|
||||
message Tombstone {
|
||||
Architecture arch = 1;
|
||||
Architecture guest_arch = 24;
|
||||
string build_fingerprint = 2;
|
||||
string revision = 3;
|
||||
string timestamp = 4;
|
||||
uint32 pid = 5;
|
||||
uint32 tid = 6;
|
||||
uint32 uid = 7;
|
||||
string selinux_label = 8;
|
||||
repeated string command_line = 9;
|
||||
// Process uptime in seconds.
|
||||
uint32 process_uptime = 20;
|
||||
Signal signal_info = 10;
|
||||
string abort_message = 14;
|
||||
repeated CrashDetail crash_details = 21;
|
||||
repeated Cause causes = 15;
|
||||
map<uint32, Thread> threads = 16;
|
||||
map<uint32, Thread> guest_threads = 25;
|
||||
repeated MemoryMapping memory_mappings = 17;
|
||||
repeated LogBuffer log_buffers = 18;
|
||||
repeated FD open_fds = 19;
|
||||
uint32 page_size = 22;
|
||||
bool has_been_16kb_mode = 23;
|
||||
StackHistoryBuffer stack_history_buffer = 26;
|
||||
reserved 27 to 999;
|
||||
}
|
||||
enum Architecture {
|
||||
ARM32 = 0;
|
||||
ARM64 = 1;
|
||||
X86 = 2;
|
||||
X86_64 = 3;
|
||||
RISCV64 = 4;
|
||||
NONE = 5;
|
||||
reserved 6 to 999;
|
||||
}
|
||||
message Signal {
|
||||
int32 number = 1;
|
||||
string name = 2;
|
||||
int32 code = 3;
|
||||
string code_name = 4;
|
||||
bool has_sender = 5;
|
||||
int32 sender_uid = 6;
|
||||
int32 sender_pid = 7;
|
||||
bool has_fault_address = 8;
|
||||
uint64 fault_address = 9;
|
||||
// Note, may or may not contain the dump of the actual memory contents. Currently, on arm64, we
|
||||
// only include metadata, and not the contents.
|
||||
MemoryDump fault_adjacent_metadata = 10;
|
||||
reserved 11 to 999;
|
||||
}
|
||||
message HeapObject {
|
||||
uint64 address = 1;
|
||||
uint64 size = 2;
|
||||
uint64 allocation_tid = 3;
|
||||
repeated BacktraceFrame allocation_backtrace = 4;
|
||||
uint64 deallocation_tid = 5;
|
||||
repeated BacktraceFrame deallocation_backtrace = 6;
|
||||
}
|
||||
message MemoryError {
|
||||
enum Tool {
|
||||
GWP_ASAN = 0;
|
||||
SCUDO = 1;
|
||||
reserved 2 to 999;
|
||||
}
|
||||
Tool tool = 1;
|
||||
enum Type {
|
||||
UNKNOWN = 0;
|
||||
USE_AFTER_FREE = 1;
|
||||
DOUBLE_FREE = 2;
|
||||
INVALID_FREE = 3;
|
||||
BUFFER_OVERFLOW = 4;
|
||||
BUFFER_UNDERFLOW = 5;
|
||||
reserved 6 to 999;
|
||||
}
|
||||
Type type = 2;
|
||||
oneof location {
|
||||
HeapObject heap = 3;
|
||||
}
|
||||
reserved 4 to 999;
|
||||
}
|
||||
message Cause {
|
||||
string human_readable = 1;
|
||||
oneof details {
|
||||
MemoryError memory_error = 2;
|
||||
}
|
||||
reserved 3 to 999;
|
||||
}
|
||||
message Register {
|
||||
string name = 1;
|
||||
uint64 u64 = 2;
|
||||
reserved 3 to 999;
|
||||
}
|
||||
message Thread {
|
||||
int32 id = 1;
|
||||
string name = 2;
|
||||
repeated Register registers = 3;
|
||||
repeated string backtrace_note = 7;
|
||||
repeated string unreadable_elf_files = 9;
|
||||
repeated BacktraceFrame current_backtrace = 4;
|
||||
repeated MemoryDump memory_dump = 5;
|
||||
int64 tagged_addr_ctrl = 6;
|
||||
int64 pac_enabled_keys = 8;
|
||||
reserved 10 to 999;
|
||||
}
|
||||
message BacktraceFrame {
|
||||
uint64 rel_pc = 1;
|
||||
uint64 pc = 2;
|
||||
uint64 sp = 3;
|
||||
string function_name = 4;
|
||||
uint64 function_offset = 5;
|
||||
string file_name = 6;
|
||||
uint64 file_map_offset = 7;
|
||||
string build_id = 8;
|
||||
reserved 9 to 999;
|
||||
}
|
||||
message ArmMTEMetadata {
|
||||
// One memory tag per granule (e.g. every 16 bytes) of regular memory.
|
||||
bytes memory_tags = 1;
|
||||
reserved 2 to 999;
|
||||
}
|
||||
message MemoryDump {
|
||||
string register_name = 1;
|
||||
string mapping_name = 2;
|
||||
uint64 begin_address = 3;
|
||||
bytes memory = 4;
|
||||
oneof metadata {
|
||||
ArmMTEMetadata arm_mte_metadata = 6;
|
||||
}
|
||||
reserved 5, 7 to 999;
|
||||
}
|
||||
message MemoryMapping {
|
||||
uint64 begin_address = 1;
|
||||
uint64 end_address = 2;
|
||||
uint64 offset = 3;
|
||||
bool read = 4;
|
||||
bool write = 5;
|
||||
bool execute = 6;
|
||||
string mapping_name = 7;
|
||||
string build_id = 8;
|
||||
uint64 load_bias = 9;
|
||||
reserved 10 to 999;
|
||||
}
|
||||
message FD {
|
||||
int32 fd = 1;
|
||||
string path = 2;
|
||||
string owner = 3;
|
||||
uint64 tag = 4;
|
||||
reserved 5 to 999;
|
||||
}
|
||||
message LogBuffer {
|
||||
string name = 1;
|
||||
repeated LogMessage logs = 2;
|
||||
reserved 3 to 999;
|
||||
}
|
||||
message LogMessage {
|
||||
string timestamp = 1;
|
||||
uint32 pid = 2;
|
||||
uint32 tid = 3;
|
||||
uint32 priority = 4;
|
||||
string tag = 5;
|
||||
string message = 6;
|
||||
reserved 7 to 999;
|
||||
}
|
||||
208
src/mvt/android/parsers/proto/tombstone.py
Normal file
208
src/mvt/android/parsers/proto/tombstone.py
Normal file
@@ -0,0 +1,208 @@
|
||||
# Generated by the protocol buffer compiler. DO NOT EDIT!
|
||||
# sources: tombstone.proto
|
||||
# plugin: python-betterproto
|
||||
from dataclasses import dataclass
|
||||
from typing import Dict, List
|
||||
|
||||
import betterproto
|
||||
|
||||
|
||||
class Architecture(betterproto.Enum):
|
||||
ARM32 = 0
|
||||
ARM64 = 1
|
||||
X86 = 2
|
||||
X86_64 = 3
|
||||
RISCV64 = 4
|
||||
NONE = 5
|
||||
|
||||
|
||||
class MemoryErrorTool(betterproto.Enum):
|
||||
GWP_ASAN = 0
|
||||
SCUDO = 1
|
||||
|
||||
|
||||
class MemoryErrorType(betterproto.Enum):
|
||||
UNKNOWN = 0
|
||||
USE_AFTER_FREE = 1
|
||||
DOUBLE_FREE = 2
|
||||
INVALID_FREE = 3
|
||||
BUFFER_OVERFLOW = 4
|
||||
BUFFER_UNDERFLOW = 5
|
||||
|
||||
|
||||
@dataclass
|
||||
class CrashDetail(betterproto.Message):
|
||||
"""
|
||||
NOTE TO OEMS: If you add custom fields to this proto, do not use numbers in
|
||||
the reserved range.
|
||||
"""
|
||||
|
||||
name: bytes = betterproto.bytes_field(1)
|
||||
data: bytes = betterproto.bytes_field(2)
|
||||
|
||||
|
||||
@dataclass
|
||||
class StackHistoryBufferEntry(betterproto.Message):
|
||||
addr: "BacktraceFrame" = betterproto.message_field(1)
|
||||
fp: int = betterproto.uint64_field(2)
|
||||
tag: int = betterproto.uint64_field(3)
|
||||
|
||||
|
||||
@dataclass
|
||||
class StackHistoryBuffer(betterproto.Message):
|
||||
tid: int = betterproto.uint64_field(1)
|
||||
entries: List["StackHistoryBufferEntry"] = betterproto.message_field(2)
|
||||
|
||||
|
||||
@dataclass
|
||||
class Tombstone(betterproto.Message):
|
||||
arch: "Architecture" = betterproto.enum_field(1)
|
||||
guest_arch: "Architecture" = betterproto.enum_field(24)
|
||||
build_fingerprint: str = betterproto.string_field(2)
|
||||
revision: str = betterproto.string_field(3)
|
||||
timestamp: str = betterproto.string_field(4)
|
||||
pid: int = betterproto.uint32_field(5)
|
||||
tid: int = betterproto.uint32_field(6)
|
||||
uid: int = betterproto.uint32_field(7)
|
||||
selinux_label: str = betterproto.string_field(8)
|
||||
command_line: List[str] = betterproto.string_field(9)
|
||||
# Process uptime in seconds.
|
||||
process_uptime: int = betterproto.uint32_field(20)
|
||||
signal_info: "Signal" = betterproto.message_field(10)
|
||||
abort_message: str = betterproto.string_field(14)
|
||||
crash_details: List["CrashDetail"] = betterproto.message_field(21)
|
||||
causes: List["Cause"] = betterproto.message_field(15)
|
||||
threads: Dict[int, "Thread"] = betterproto.map_field(
|
||||
16, betterproto.TYPE_UINT32, betterproto.TYPE_MESSAGE
|
||||
)
|
||||
guest_threads: Dict[int, "Thread"] = betterproto.map_field(
|
||||
25, betterproto.TYPE_UINT32, betterproto.TYPE_MESSAGE
|
||||
)
|
||||
memory_mappings: List["MemoryMapping"] = betterproto.message_field(17)
|
||||
log_buffers: List["LogBuffer"] = betterproto.message_field(18)
|
||||
open_fds: List["FD"] = betterproto.message_field(19)
|
||||
page_size: int = betterproto.uint32_field(22)
|
||||
has_been_16kb_mode: bool = betterproto.bool_field(23)
|
||||
stack_history_buffer: "StackHistoryBuffer" = betterproto.message_field(26)
|
||||
|
||||
|
||||
@dataclass
|
||||
class Signal(betterproto.Message):
|
||||
number: int = betterproto.int32_field(1)
|
||||
name: str = betterproto.string_field(2)
|
||||
code: int = betterproto.int32_field(3)
|
||||
code_name: str = betterproto.string_field(4)
|
||||
has_sender: bool = betterproto.bool_field(5)
|
||||
sender_uid: int = betterproto.int32_field(6)
|
||||
sender_pid: int = betterproto.int32_field(7)
|
||||
has_fault_address: bool = betterproto.bool_field(8)
|
||||
fault_address: int = betterproto.uint64_field(9)
|
||||
# Note, may or may not contain the dump of the actual memory contents.
|
||||
# Currently, on arm64, we only include metadata, and not the contents.
|
||||
fault_adjacent_metadata: "MemoryDump" = betterproto.message_field(10)
|
||||
|
||||
|
||||
@dataclass
|
||||
class HeapObject(betterproto.Message):
|
||||
address: int = betterproto.uint64_field(1)
|
||||
size: int = betterproto.uint64_field(2)
|
||||
allocation_tid: int = betterproto.uint64_field(3)
|
||||
allocation_backtrace: List["BacktraceFrame"] = betterproto.message_field(4)
|
||||
deallocation_tid: int = betterproto.uint64_field(5)
|
||||
deallocation_backtrace: List["BacktraceFrame"] = betterproto.message_field(6)
|
||||
|
||||
|
||||
@dataclass
|
||||
class MemoryError(betterproto.Message):
|
||||
tool: "MemoryErrorTool" = betterproto.enum_field(1)
|
||||
type: "MemoryErrorType" = betterproto.enum_field(2)
|
||||
heap: "HeapObject" = betterproto.message_field(3, group="location")
|
||||
|
||||
|
||||
@dataclass
|
||||
class Cause(betterproto.Message):
|
||||
human_readable: str = betterproto.string_field(1)
|
||||
memory_error: "MemoryError" = betterproto.message_field(2, group="details")
|
||||
|
||||
|
||||
@dataclass
|
||||
class Register(betterproto.Message):
|
||||
name: str = betterproto.string_field(1)
|
||||
u64: int = betterproto.uint64_field(2)
|
||||
|
||||
|
||||
@dataclass
|
||||
class Thread(betterproto.Message):
|
||||
id: int = betterproto.int32_field(1)
|
||||
name: str = betterproto.string_field(2)
|
||||
registers: List["Register"] = betterproto.message_field(3)
|
||||
backtrace_note: List[str] = betterproto.string_field(7)
|
||||
unreadable_elf_files: List[str] = betterproto.string_field(9)
|
||||
current_backtrace: List["BacktraceFrame"] = betterproto.message_field(4)
|
||||
memory_dump: List["MemoryDump"] = betterproto.message_field(5)
|
||||
tagged_addr_ctrl: int = betterproto.int64_field(6)
|
||||
pac_enabled_keys: int = betterproto.int64_field(8)
|
||||
|
||||
|
||||
@dataclass
|
||||
class BacktraceFrame(betterproto.Message):
|
||||
rel_pc: int = betterproto.uint64_field(1)
|
||||
pc: int = betterproto.uint64_field(2)
|
||||
sp: int = betterproto.uint64_field(3)
|
||||
function_name: str = betterproto.string_field(4)
|
||||
function_offset: int = betterproto.uint64_field(5)
|
||||
file_name: str = betterproto.string_field(6)
|
||||
file_map_offset: int = betterproto.uint64_field(7)
|
||||
build_id: str = betterproto.string_field(8)
|
||||
|
||||
|
||||
@dataclass
|
||||
class ArmMTEMetadata(betterproto.Message):
|
||||
# One memory tag per granule (e.g. every 16 bytes) of regular memory.
|
||||
memory_tags: bytes = betterproto.bytes_field(1)
|
||||
|
||||
|
||||
@dataclass
|
||||
class MemoryDump(betterproto.Message):
|
||||
register_name: str = betterproto.string_field(1)
|
||||
mapping_name: str = betterproto.string_field(2)
|
||||
begin_address: int = betterproto.uint64_field(3)
|
||||
memory: bytes = betterproto.bytes_field(4)
|
||||
arm_mte_metadata: "ArmMTEMetadata" = betterproto.message_field(6, group="metadata")
|
||||
|
||||
|
||||
@dataclass
|
||||
class MemoryMapping(betterproto.Message):
|
||||
begin_address: int = betterproto.uint64_field(1)
|
||||
end_address: int = betterproto.uint64_field(2)
|
||||
offset: int = betterproto.uint64_field(3)
|
||||
read: bool = betterproto.bool_field(4)
|
||||
write: bool = betterproto.bool_field(5)
|
||||
execute: bool = betterproto.bool_field(6)
|
||||
mapping_name: str = betterproto.string_field(7)
|
||||
build_id: str = betterproto.string_field(8)
|
||||
load_bias: int = betterproto.uint64_field(9)
|
||||
|
||||
|
||||
@dataclass
|
||||
class FD(betterproto.Message):
|
||||
fd: int = betterproto.int32_field(1)
|
||||
path: str = betterproto.string_field(2)
|
||||
owner: str = betterproto.string_field(3)
|
||||
tag: int = betterproto.uint64_field(4)
|
||||
|
||||
|
||||
@dataclass
|
||||
class LogBuffer(betterproto.Message):
|
||||
name: str = betterproto.string_field(1)
|
||||
logs: List["LogMessage"] = betterproto.message_field(2)
|
||||
|
||||
|
||||
@dataclass
|
||||
class LogMessage(betterproto.Message):
|
||||
timestamp: str = betterproto.string_field(1)
|
||||
pid: int = betterproto.uint32_field(2)
|
||||
tid: int = betterproto.uint32_field(3)
|
||||
priority: int = betterproto.uint32_field(4)
|
||||
tag: str = betterproto.string_field(5)
|
||||
message: str = betterproto.string_field(6)
|
||||
@@ -22,6 +22,10 @@ class CmdCheckIOCS(Command):
|
||||
module_name: Optional[str] = None,
|
||||
serial: Optional[str] = None,
|
||||
module_options: Optional[dict] = None,
|
||||
hashes: Optional[bool] = False,
|
||||
sub_command: Optional[bool] = False,
|
||||
disable_version_check: bool = False,
|
||||
disable_indicator_check: bool = False,
|
||||
) -> None:
|
||||
super().__init__(
|
||||
target_path=target_path,
|
||||
@@ -30,7 +34,11 @@ class CmdCheckIOCS(Command):
|
||||
module_name=module_name,
|
||||
serial=serial,
|
||||
module_options=module_options,
|
||||
hashes=hashes,
|
||||
sub_command=sub_command,
|
||||
log=log,
|
||||
disable_version_check=disable_version_check,
|
||||
disable_indicator_check=disable_indicator_check,
|
||||
)
|
||||
|
||||
self.name = "check-iocs"
|
||||
@@ -65,6 +73,10 @@ class CmdCheckIOCS(Command):
|
||||
m = iocs_module.from_json(
|
||||
file_path, log=logging.getLogger(iocs_module.__module__)
|
||||
)
|
||||
if not m:
|
||||
log.warning("No result from this module, skipping it")
|
||||
continue
|
||||
|
||||
if self.iocs.total_ioc_count > 0:
|
||||
m.indicators = self.iocs
|
||||
m.indicators.log = m.log
|
||||
|
||||
@@ -17,6 +17,7 @@ from mvt.common.utils import (
|
||||
generate_hashes_from_path,
|
||||
get_sha256_from_file_path,
|
||||
)
|
||||
from mvt.common.config import settings
|
||||
from mvt.common.version import MVT_VERSION
|
||||
|
||||
|
||||
@@ -26,11 +27,15 @@ class Command:
|
||||
target_path: Optional[str] = None,
|
||||
results_path: Optional[str] = None,
|
||||
ioc_files: Optional[list] = None,
|
||||
iocs: Optional[Indicators] = None,
|
||||
module_name: Optional[str] = None,
|
||||
serial: Optional[str] = None,
|
||||
module_options: Optional[dict] = None,
|
||||
hashes: bool = False,
|
||||
hashes: Optional[bool] = False,
|
||||
sub_command: Optional[bool] = False,
|
||||
log: logging.Logger = logging.getLogger(__name__),
|
||||
disable_version_check: bool = False,
|
||||
disable_indicator_check: bool = False,
|
||||
) -> None:
|
||||
self.name = ""
|
||||
self.modules = []
|
||||
@@ -41,6 +46,9 @@ class Command:
|
||||
self.module_name = module_name
|
||||
self.serial = serial
|
||||
self.log = log
|
||||
self.sub_command = sub_command
|
||||
self.disable_version_check = disable_version_check
|
||||
self.disable_indicator_check = disable_indicator_check
|
||||
|
||||
# This dictionary can contain options that will be passed down from
|
||||
# the Command to all modules. This can for example be used to pass
|
||||
@@ -59,8 +67,12 @@ class Command:
|
||||
# Load IOCs
|
||||
self._create_storage()
|
||||
self._setup_logging()
|
||||
self.iocs = Indicators(log=log)
|
||||
self.iocs.load_indicators_files(self.ioc_files)
|
||||
|
||||
if iocs is not None:
|
||||
self.iocs = iocs
|
||||
else:
|
||||
self.iocs = Indicators(self.log)
|
||||
self.iocs.load_indicators_files(self.ioc_files)
|
||||
|
||||
def _create_storage(self) -> None:
|
||||
if self.results_path and not os.path.exists(self.results_path):
|
||||
@@ -81,7 +93,7 @@ class Command:
|
||||
os.path.join(self.results_path, "command.log")
|
||||
)
|
||||
formatter = logging.Formatter(
|
||||
"%(asctime)s - %(name)s - " "%(levelname)s - %(message)s"
|
||||
"%(asctime)s - %(name)s - %(levelname)s - %(message)s"
|
||||
)
|
||||
file_handler.setLevel(logging.DEBUG)
|
||||
file_handler.setFormatter(formatter)
|
||||
@@ -100,15 +112,25 @@ class Command:
|
||||
if not self.results_path:
|
||||
return
|
||||
|
||||
# We use local timestamps in the timeline on Android as many
|
||||
# logs do not contain timezone information.
|
||||
if type(self).__name__.startswith("CmdAndroid"):
|
||||
is_utc = False
|
||||
else:
|
||||
is_utc = True
|
||||
|
||||
if len(self.timeline) > 0:
|
||||
save_timeline(
|
||||
self.timeline, os.path.join(self.results_path, "timeline.csv")
|
||||
self.timeline,
|
||||
os.path.join(self.results_path, "timeline.csv"),
|
||||
is_utc=is_utc,
|
||||
)
|
||||
|
||||
if len(self.timeline_detected) > 0:
|
||||
save_timeline(
|
||||
self.timeline_detected,
|
||||
os.path.join(self.results_path, "timeline_detected.csv"),
|
||||
is_utc=is_utc,
|
||||
)
|
||||
|
||||
def _store_info(self) -> None:
|
||||
@@ -132,7 +154,7 @@ class Command:
|
||||
if ioc_file_path and ioc_file_path not in info["ioc_files"]:
|
||||
info["ioc_files"].append(ioc_file_path)
|
||||
|
||||
if self.target_path and (os.environ.get("MVT_HASH_FILES") or self.hashes):
|
||||
if self.target_path and (settings.HASH_FILES or self.hashes):
|
||||
self.generate_hashes()
|
||||
|
||||
info["hashes"] = self.hash_values
|
||||
@@ -141,7 +163,7 @@ class Command:
|
||||
with open(info_path, "w+", encoding="utf-8") as handle:
|
||||
json.dump(info, handle, indent=4)
|
||||
|
||||
if self.target_path and (os.environ.get("MVT_HASH_FILES") or self.hashes):
|
||||
if self.target_path and (settings.HASH_FILES or self.hashes):
|
||||
info_hash = get_sha256_from_file_path(info_path)
|
||||
self.log.info('Reference hash of the info.json file: "%s"', info_hash)
|
||||
|
||||
@@ -236,6 +258,10 @@ class Command:
|
||||
except NotImplementedError:
|
||||
pass
|
||||
|
||||
# We only store the timeline from the parent/main command
|
||||
if self.sub_command:
|
||||
return
|
||||
|
||||
self._store_timeline()
|
||||
self._store_info()
|
||||
|
||||
|
||||
127
src/mvt/common/config.py
Normal file
127
src/mvt/common/config.py
Normal file
@@ -0,0 +1,127 @@
|
||||
import os
|
||||
import yaml
|
||||
import json
|
||||
import uuid
|
||||
|
||||
from typing import Tuple, Type, Optional
|
||||
from appdirs import user_config_dir
|
||||
from pydantic import AnyHttpUrl, BaseModel, Field
|
||||
from pydantic_settings import (
|
||||
BaseSettings,
|
||||
InitSettingsSource,
|
||||
PydanticBaseSettingsSource,
|
||||
SettingsConfigDict,
|
||||
YamlConfigSettingsSource,
|
||||
)
|
||||
|
||||
MVT_CONFIG_FOLDER = user_config_dir("mvt")
|
||||
MVT_CONFIG_PATH = os.path.join(MVT_CONFIG_FOLDER, "config.yaml")
|
||||
|
||||
|
||||
class TelemetrySettings(BaseModel):
|
||||
"""
|
||||
Settings used by the Telemetry module.
|
||||
"""
|
||||
|
||||
ENABLED: bool = Field(True, description="Flag for telemetry collection")
|
||||
ENDPOINT: AnyHttpUrl = Field(
|
||||
"https://t.mvt.re/events", description="Telemetry collection endpoint"
|
||||
)
|
||||
DEVICE_ID: str | None = Field(
|
||||
default=None,
|
||||
required=True,
|
||||
description="Anonymous Unique ID for use in telemetry",
|
||||
)
|
||||
|
||||
|
||||
class MVTSettings(BaseSettings):
|
||||
model_config = SettingsConfigDict(
|
||||
env_prefix="MVT_",
|
||||
env_nested_delimiter="_",
|
||||
extra="ignore",
|
||||
nested_model_default_partial_updates=True,
|
||||
)
|
||||
# Flag to enable or disable loading of environment variables.
|
||||
load_env: bool = Field(True, exclude=True)
|
||||
|
||||
# General settings
|
||||
PYPI_UPDATE_URL: AnyHttpUrl = Field(
|
||||
"https://pypi.org/pypi/mvt/json",
|
||||
validate_default=False,
|
||||
)
|
||||
NETWORK_ACCESS_ALLOWED: bool = True
|
||||
NETWORK_TIMEOUT: int = 15
|
||||
|
||||
# Command default settings, all can be specified by MVT_ prefixed environment variables too.
|
||||
IOS_BACKUP_PASSWORD: Optional[str] = Field(
|
||||
None, description="Default password to use to decrypt iOS backups"
|
||||
)
|
||||
ANDROID_BACKUP_PASSWORD: Optional[str] = Field(
|
||||
None, description="Default password to use to decrypt Android backups"
|
||||
)
|
||||
STIX2: Optional[str] = Field(
|
||||
None, description="List of directories where STIX2 files are stored"
|
||||
)
|
||||
VT_API_KEY: Optional[str] = Field(
|
||||
None, description="API key to use for VirusTotal lookups"
|
||||
)
|
||||
PROFILE: bool = Field(False, description="Profile the execution of MVT modules")
|
||||
HASH_FILES: bool = Field(False, description="Should MVT hash output files")
|
||||
|
||||
# Telemetry settings
|
||||
TELEMETRY: TelemetrySettings = TelemetrySettings(include=True)
|
||||
|
||||
@classmethod
|
||||
def settings_customise_sources(
|
||||
cls,
|
||||
settings_cls: Type[BaseSettings],
|
||||
init_settings: InitSettingsSource,
|
||||
env_settings: PydanticBaseSettingsSource,
|
||||
dotenv_settings: PydanticBaseSettingsSource,
|
||||
file_secret_settings: PydanticBaseSettingsSource,
|
||||
) -> Tuple[PydanticBaseSettingsSource, ...]:
|
||||
sources = (
|
||||
YamlConfigSettingsSource(settings_cls, MVT_CONFIG_PATH),
|
||||
init_settings,
|
||||
)
|
||||
# Load env variables if enabled
|
||||
if init_settings.init_kwargs.get("load_env", True):
|
||||
sources = (env_settings,) + sources
|
||||
return sources
|
||||
|
||||
def save_settings(
|
||||
self,
|
||||
) -> None:
|
||||
"""
|
||||
Save the current settings to a file.
|
||||
"""
|
||||
if not os.path.isdir(MVT_CONFIG_FOLDER):
|
||||
os.makedirs(MVT_CONFIG_FOLDER)
|
||||
|
||||
# Dump the settings to the YAML file
|
||||
model_serializable = json.loads(self.model_dump_json(exclude_defaults=True))
|
||||
with open(MVT_CONFIG_PATH, "w") as config_file:
|
||||
config_file.write(yaml.dump(model_serializable, default_flow_style=False))
|
||||
|
||||
@classmethod
|
||||
def initialise(cls) -> "MVTSettings":
|
||||
"""
|
||||
Initialise the settings file.
|
||||
|
||||
We first initialise the settings (without env variable) and then persist
|
||||
them to file. This way we can update the config file with the default values.
|
||||
|
||||
Afterwards we load the settings again, this time including the env variables.
|
||||
"""
|
||||
# Set invalid env prefix to avoid loading env variables.
|
||||
settings = MVTSettings(load_env=False)
|
||||
if not settings.TELEMETRY.DEVICE_ID:
|
||||
settings.TELEMETRY.DEVICE_ID = str(uuid.uuid4())
|
||||
settings.save_settings()
|
||||
|
||||
# Load the settings again with any ENV variables.
|
||||
settings = MVTSettings(load_env=True)
|
||||
return settings
|
||||
|
||||
|
||||
settings = MVTSettings.initialise()
|
||||
@@ -15,6 +15,8 @@ HELP_MSG_HASHES = "Generate hashes of all the files analyzed"
|
||||
HELP_MSG_VERBOSE = "Verbose mode"
|
||||
HELP_MSG_CHECK_IOCS = "Compare stored JSON results to provided indicators"
|
||||
HELP_MSG_STIX2 = "Download public STIX2 indicators"
|
||||
HELP_MSG_DISABLE_UPDATE_CHECK = "Disable MVT version update check"
|
||||
HELP_MSG_DISABLE_INDICATOR_UPDATE_CHECK = "Disable indicators update check"
|
||||
|
||||
# IOS Specific
|
||||
HELP_MSG_DECRYPT_BACKUP = "Decrypt an encrypted iTunes backup"
|
||||
|
||||
@@ -14,6 +14,7 @@ import ahocorasick
|
||||
from appdirs import user_data_dir
|
||||
|
||||
from .url import URL
|
||||
from .config import settings
|
||||
|
||||
MVT_DATA_FOLDER = user_data_dir("mvt")
|
||||
MVT_INDICATORS_FOLDER = os.path.join(MVT_DATA_FOLDER, "indicators")
|
||||
@@ -41,12 +42,12 @@ class Indicators:
|
||||
|
||||
def _check_stix2_env_variable(self) -> None:
|
||||
"""
|
||||
Checks if a variable MVT_STIX2 contains path to a STIX file. Also recursively searches through dirs in MVT_STIX2
|
||||
Checks if MVT_STIX2 setting or environment variable contains path to a STIX file. Also recursively searches through dirs in MVT_STIX2
|
||||
"""
|
||||
if "MVT_STIX2" not in os.environ:
|
||||
if not settings.STIX2:
|
||||
return
|
||||
|
||||
paths = os.environ["MVT_STIX2"].split(":")
|
||||
paths = settings.STIX2.split(":")
|
||||
for path in paths:
|
||||
if os.path.isfile(path) and path.lower().endswith(".stix2"):
|
||||
self.parse_stix2(path)
|
||||
@@ -383,8 +384,7 @@ class Indicators:
|
||||
for ioc in self.get_iocs("urls"):
|
||||
if ioc["value"] == url:
|
||||
self.log.warning(
|
||||
"Found a known suspicious URL %s "
|
||||
'matching indicator "%s" from "%s"',
|
||||
'Found a known suspicious URL %s matching indicator "%s" from "%s"',
|
||||
url,
|
||||
ioc["value"],
|
||||
ioc["name"],
|
||||
|
||||
@@ -12,74 +12,85 @@ from .updates import IndicatorsUpdates, MVTUpdates
|
||||
from .version import MVT_VERSION
|
||||
|
||||
|
||||
def check_updates() -> None:
|
||||
def check_updates(
|
||||
disable_version_check: bool = False, disable_indicator_check: bool = False
|
||||
) -> None:
|
||||
log = logging.getLogger("mvt")
|
||||
|
||||
# First we check for MVT version updates.
|
||||
try:
|
||||
mvt_updates = MVTUpdates()
|
||||
latest_version = mvt_updates.check()
|
||||
except (requests.exceptions.ConnectionError, requests.exceptions.Timeout):
|
||||
rich_print(
|
||||
"\t\t[bold]Note: Could not check for MVT updates.[/bold] "
|
||||
"You may be working offline. Please update MVT regularly."
|
||||
)
|
||||
except Exception as e:
|
||||
log.error("Error encountered when trying to check latest MVT version: %s", e)
|
||||
else:
|
||||
if latest_version:
|
||||
if not disable_version_check:
|
||||
try:
|
||||
mvt_updates = MVTUpdates()
|
||||
latest_version = mvt_updates.check()
|
||||
except (requests.exceptions.ConnectionError, requests.exceptions.Timeout):
|
||||
rich_print(
|
||||
f"\t\t[bold]Version {latest_version} is available! "
|
||||
"Upgrade mvt with `pip3 install -U mvt`[/bold]"
|
||||
"\t\t[bold]Note: Could not check for MVT updates.[/bold] "
|
||||
"You may be working offline. Please update MVT regularly."
|
||||
)
|
||||
|
||||
# Then we check for indicators files updates.
|
||||
ioc_updates = IndicatorsUpdates()
|
||||
|
||||
# Before proceeding, we check if we have downloaded an indicators index.
|
||||
# If not, there's no point in proceeding with the updates check.
|
||||
if ioc_updates.get_latest_update() == 0:
|
||||
rich_print(
|
||||
"\t\t[bold]You have not yet downloaded any indicators, check "
|
||||
"the `download-iocs` command![/bold]"
|
||||
)
|
||||
return
|
||||
|
||||
# We only perform this check at a fixed frequency, in order to not
|
||||
# overburden the user with too many lookups if the command is being run
|
||||
# multiple times.
|
||||
should_check, hours = ioc_updates.should_check()
|
||||
if not should_check:
|
||||
rich_print(
|
||||
f"\t\tIndicators updates checked recently, next automatic check "
|
||||
f"in {int(hours)} hours"
|
||||
)
|
||||
return
|
||||
|
||||
try:
|
||||
ioc_to_update = ioc_updates.check()
|
||||
except (requests.exceptions.ConnectionError, requests.exceptions.Timeout):
|
||||
rich_print(
|
||||
"\t\t[bold]Note: Could not check for indicator updates.[/bold] "
|
||||
"You may be working offline. Please update MVT indicators regularly."
|
||||
)
|
||||
except Exception as e:
|
||||
log.error("Error encountered when trying to check latest MVT indicators: %s", e)
|
||||
else:
|
||||
if ioc_to_update:
|
||||
rich_print(
|
||||
"\t\t[bold]There are updates to your indicators files! "
|
||||
"Run the `download-iocs` command to update![/bold]"
|
||||
except Exception as e:
|
||||
log.error(
|
||||
"Error encountered when trying to check latest MVT version: %s", e
|
||||
)
|
||||
else:
|
||||
rich_print("\t\tYour indicators files seem to be up to date.")
|
||||
if latest_version:
|
||||
rich_print(
|
||||
f"\t\t[bold]Version {latest_version} is available! "
|
||||
"Upgrade mvt with `pip3 install -U mvt` or with `pipx upgrade mvt`[/bold]"
|
||||
)
|
||||
|
||||
# Then we check for indicators files updates.
|
||||
if not disable_indicator_check:
|
||||
ioc_updates = IndicatorsUpdates()
|
||||
|
||||
# Before proceeding, we check if we have downloaded an indicators index.
|
||||
# If not, there's no point in proceeding with the updates check.
|
||||
if ioc_updates.get_latest_update() == 0:
|
||||
rich_print(
|
||||
"\t\t[bold]You have not yet downloaded any indicators, check "
|
||||
"the `download-iocs` command![/bold]"
|
||||
)
|
||||
return
|
||||
|
||||
# We only perform this check at a fixed frequency, in order to not
|
||||
# overburden the user with too many lookups if the command is being run
|
||||
# multiple times.
|
||||
should_check, hours = ioc_updates.should_check()
|
||||
if not should_check:
|
||||
rich_print(
|
||||
f"\t\tIndicators updates checked recently, next automatic check "
|
||||
f"in {int(hours)} hours"
|
||||
)
|
||||
return
|
||||
|
||||
try:
|
||||
ioc_to_update = ioc_updates.check()
|
||||
except (requests.exceptions.ConnectionError, requests.exceptions.Timeout):
|
||||
rich_print(
|
||||
"\t\t[bold]Note: Could not check for indicator updates.[/bold] "
|
||||
"You may be working offline. Please update MVT indicators regularly."
|
||||
)
|
||||
except Exception as e:
|
||||
log.error(
|
||||
"Error encountered when trying to check latest MVT indicators: %s", e
|
||||
)
|
||||
else:
|
||||
if ioc_to_update:
|
||||
rich_print(
|
||||
"\t\t[bold]There are updates to your indicators files! "
|
||||
"Run the `download-iocs` command to update![/bold]"
|
||||
)
|
||||
else:
|
||||
rich_print("\t\tYour indicators files seem to be up to date.")
|
||||
|
||||
|
||||
def logo() -> None:
|
||||
def logo(
|
||||
disable_version_check: bool = False, disable_indicator_check: bool = False
|
||||
) -> None:
|
||||
rich_print("\n")
|
||||
rich_print("\t[bold]MVT[/bold] - Mobile Verification Toolkit")
|
||||
rich_print("\t\thttps://mvt.re")
|
||||
rich_print(f"\t\tVersion: {MVT_VERSION}")
|
||||
|
||||
check_updates()
|
||||
check_updates(disable_version_check, disable_indicator_check)
|
||||
|
||||
rich_print("\n")
|
||||
|
||||
@@ -69,10 +69,14 @@ class MVTModule:
|
||||
@classmethod
|
||||
def from_json(cls, json_path: str, log: logging.Logger):
|
||||
with open(json_path, "r", encoding="utf-8") as handle:
|
||||
results = json.load(handle)
|
||||
if log:
|
||||
log.info('Loaded %d results from "%s"', len(results), json_path)
|
||||
return cls(results=results, log=log)
|
||||
try:
|
||||
results = json.load(handle)
|
||||
if log:
|
||||
log.info('Loaded %d results from "%s"', len(results), json_path)
|
||||
return cls(results=results, log=log)
|
||||
except json.decoder.JSONDecodeError as err:
|
||||
log.error('Error to decode the json "%s" file: "%s"', json_path, err)
|
||||
return None
|
||||
|
||||
@classmethod
|
||||
def get_slug(cls) -> str:
|
||||
@@ -227,7 +231,7 @@ def run_module(module: MVTModule) -> None:
|
||||
module.save_to_json()
|
||||
|
||||
|
||||
def save_timeline(timeline: list, timeline_path: str) -> None:
|
||||
def save_timeline(timeline: list, timeline_path: str, is_utc: bool = True) -> None:
|
||||
"""Save the timeline in a csv file.
|
||||
|
||||
:param timeline: List of records to order and store
|
||||
@@ -238,7 +242,12 @@ def save_timeline(timeline: list, timeline_path: str) -> None:
|
||||
csvoutput = csv.writer(
|
||||
handle, delimiter=",", quotechar='"', quoting=csv.QUOTE_ALL, escapechar="\\"
|
||||
)
|
||||
csvoutput.writerow(["UTC Timestamp", "Plugin", "Event", "Description"])
|
||||
|
||||
if is_utc:
|
||||
timestamp_header = "UTC Timestamp"
|
||||
else:
|
||||
timestamp_header = "Device Local Timestamp"
|
||||
csvoutput.writerow([timestamp_header, "Plugin", "Event", "Description"])
|
||||
|
||||
for event in sorted(
|
||||
timeline, key=lambda x: x["timestamp"] if x["timestamp"] is not None else ""
|
||||
|
||||
113
src/mvt/common/telemetry.py
Normal file
113
src/mvt/common/telemetry.py
Normal file
@@ -0,0 +1,113 @@
|
||||
# Mobile Verification Toolkit (MVT)
|
||||
# Copyright (c) 2021-2023 The MVT Authors.
|
||||
# Use of this software is governed by the MVT License 1.1 that can be found at
|
||||
# https://license.mvt.re/1.1/
|
||||
|
||||
import os
|
||||
import sys
|
||||
import platform
|
||||
import requests
|
||||
import json
|
||||
import logging
|
||||
import threading
|
||||
|
||||
from mvt.common.config import settings
|
||||
from mvt.common.version import MVT_VERSION
|
||||
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
class Telemetry(object):
|
||||
"""
|
||||
MVT collects anonymous telemetry to understand how MVT is used.
|
||||
|
||||
This data is helpful to prioritize features, identify platforms and versions. It
|
||||
will also how many users are using custom indicators, modules and packages.
|
||||
"""
|
||||
|
||||
def __init__(self):
|
||||
self.endpoint = settings.TELEMETRY.ENDPOINT
|
||||
self.device_id = settings.TELEMETRY.DEVICE_ID
|
||||
|
||||
def is_telemetry_enabled(self):
|
||||
return settings.TELEMETRY.ENABLED
|
||||
|
||||
@staticmethod
|
||||
def _installation_type():
|
||||
"""Check if MVT is installed via pip, docker or source."""
|
||||
if "site-packages" in __file__:
|
||||
return "pypi"
|
||||
elif os.environ.get("MVT_DOCKER_IMAGE", None):
|
||||
return "docker"
|
||||
else:
|
||||
return "source"
|
||||
|
||||
def _get_device_properties(self):
|
||||
return {
|
||||
"os_type": platform.system(),
|
||||
"os_version": platform.platform(),
|
||||
"python_version": f"{platform.python_version()}/{platform.python_implementation()}",
|
||||
"mvt_version": MVT_VERSION,
|
||||
"mvt_installation_type": self._installation_type(),
|
||||
"mvt_package_name": __package__,
|
||||
"mvt_command": os.path.basename(sys.argv[0]),
|
||||
"telemetry_version": "0.0.1",
|
||||
}
|
||||
|
||||
def _build_event(self, event_name, event_properties):
|
||||
return {
|
||||
"event": event_name,
|
||||
"distinct_id": self.device_id,
|
||||
"properties": {
|
||||
**self._get_device_properties(),
|
||||
**event_properties,
|
||||
},
|
||||
}
|
||||
|
||||
def _send_event(self, event):
|
||||
if not self.is_telemetry_enabled():
|
||||
# Telemetry is disabled. Do not send any data.
|
||||
return
|
||||
|
||||
event_json = json.dumps(event)
|
||||
|
||||
try:
|
||||
telemetry_thread = threading.Thread(
|
||||
target=self._send_event_thread, args=(event_json,)
|
||||
)
|
||||
telemetry_thread.start()
|
||||
except Exception as e:
|
||||
logger.debug(f"Failed to send telemetry data in a thread: {e}")
|
||||
|
||||
def _send_event_thread(self, event):
|
||||
try:
|
||||
response = requests.post(
|
||||
self.endpoint,
|
||||
data=json.dumps(event),
|
||||
timeout=5,
|
||||
headers={
|
||||
"Content-Type": "application/json",
|
||||
"User-Agent": f"mvt/{MVT_VERSION}",
|
||||
},
|
||||
)
|
||||
response.raise_for_status()
|
||||
except requests.RequestException as e:
|
||||
logger.debug(f"Failed to send telemetry data: {e}")
|
||||
|
||||
def send_cli_command_event(self, command_name):
|
||||
event = self._build_event(
|
||||
event_name="run_mvt_cli_command",
|
||||
event_properties={"cli_command_name": command_name},
|
||||
)
|
||||
self._send_event(event)
|
||||
|
||||
def send_module_detections_event(self, module_name, detections):
|
||||
event = self._build_event(
|
||||
event_name="module_detections",
|
||||
event_properties={"module_name": module_name, "detections": detections},
|
||||
)
|
||||
self._send_event(event)
|
||||
|
||||
|
||||
telemetry = Telemetry()
|
||||
@@ -14,6 +14,7 @@ from packaging import version
|
||||
|
||||
from .indicators import MVT_DATA_FOLDER, MVT_INDICATORS_FOLDER
|
||||
from .version import MVT_VERSION
|
||||
from .config import settings
|
||||
|
||||
log = logging.getLogger(__name__)
|
||||
|
||||
@@ -23,7 +24,11 @@ INDICATORS_CHECK_FREQUENCY = 12
|
||||
|
||||
class MVTUpdates:
|
||||
def check(self) -> str:
|
||||
res = requests.get("https://pypi.org/pypi/mvt/json", timeout=15)
|
||||
try:
|
||||
res = requests.get(settings.PYPI_UPDATE_URL, timeout=5)
|
||||
except requests.exceptions.RequestException as e:
|
||||
log.error("Failed to check for updates, skipping updates: %s", e)
|
||||
return ""
|
||||
data = res.json()
|
||||
latest_version = data.get("info", {}).get("version", "")
|
||||
|
||||
@@ -92,7 +97,12 @@ class IndicatorsUpdates:
|
||||
url = self.github_raw_url.format(
|
||||
self.index_owner, self.index_repo, self.index_branch, self.index_path
|
||||
)
|
||||
res = requests.get(url, timeout=15)
|
||||
try:
|
||||
res = requests.get(url, timeout=5)
|
||||
except requests.exceptions.RequestException as e:
|
||||
log.error("Failed to retrieve indicators index from %s: %s", url, e)
|
||||
return None
|
||||
|
||||
if res.status_code != 200:
|
||||
log.error(
|
||||
"Failed to retrieve indicators index located at %s (error %d)",
|
||||
@@ -104,7 +114,12 @@ class IndicatorsUpdates:
|
||||
return yaml.safe_load(res.content)
|
||||
|
||||
def download_remote_ioc(self, ioc_url: str) -> Optional[str]:
|
||||
res = requests.get(ioc_url, timeout=15)
|
||||
try:
|
||||
res = requests.get(ioc_url, timeout=15)
|
||||
except requests.exceptions.RequestException as e:
|
||||
log.error("Failed to download indicators file from %s: %s", ioc_url, e)
|
||||
return None
|
||||
|
||||
if res.status_code != 200:
|
||||
log.error(
|
||||
"Failed to download indicators file from %s (error %d)",
|
||||
@@ -170,7 +185,12 @@ class IndicatorsUpdates:
|
||||
file_commit_url = (
|
||||
f"https://api.github.com/repos/{owner}/{repo}/commits?path={path}"
|
||||
)
|
||||
res = requests.get(file_commit_url, timeout=15)
|
||||
try:
|
||||
res = requests.get(file_commit_url, timeout=5)
|
||||
except requests.exceptions.RequestException as e:
|
||||
log.error("Failed to get details about file %s: %s", file_commit_url, e)
|
||||
return -1
|
||||
|
||||
if res.status_code != 200:
|
||||
log.error(
|
||||
"Failed to get details about file %s (error %d)",
|
||||
|
||||
@@ -10,9 +10,32 @@ import json
|
||||
import logging
|
||||
import os
|
||||
import re
|
||||
import click
|
||||
from typing import Any, Iterator, Union
|
||||
|
||||
from rich.logging import RichHandler
|
||||
from mvt.common.telemetry import telemetry
|
||||
from mvt.common.config import settings
|
||||
|
||||
|
||||
class CommandWrapperGroup(click.Group):
|
||||
"""Allow hooks to run before and after MVT CLI commands"""
|
||||
|
||||
def add_command(self, cmd, name=None):
|
||||
click.Group.add_command(self, cmd, name=name)
|
||||
cmd.invoke = self.build_command_invoke(cmd.invoke)
|
||||
|
||||
def build_command_invoke(self, original_invoke):
|
||||
def command_invoke(ctx):
|
||||
"""Invoke the Click command"""
|
||||
|
||||
# Run telemetry before the command
|
||||
telemetry.send_cli_command_event(ctx.command.name)
|
||||
|
||||
# Run the original command
|
||||
original_invoke(ctx)
|
||||
|
||||
return command_invoke
|
||||
|
||||
|
||||
class CustomJSONEncoder(json.JSONEncoder):
|
||||
@@ -256,7 +279,7 @@ def set_verbose_logging(verbose: bool = False):
|
||||
|
||||
def exec_or_profile(module, globals, locals):
|
||||
"""Hook for profiling MVT modules"""
|
||||
if int(os.environ.get("MVT_PROFILE", False)):
|
||||
if settings.PROFILE:
|
||||
cProfile.runctx(module, globals, locals)
|
||||
else:
|
||||
exec(module, globals, locals)
|
||||
|
||||
@@ -3,4 +3,4 @@
|
||||
# Use of this software is governed by the MVT License 1.1 that can be found at
|
||||
# https://license.mvt.re/1.1/
|
||||
|
||||
MVT_VERSION = "2.5.4"
|
||||
MVT_VERSION = "2.6.1"
|
||||
|
||||
@@ -18,6 +18,7 @@ from mvt.common.utils import (
|
||||
generate_hashes_from_path,
|
||||
init_logging,
|
||||
set_verbose_logging,
|
||||
CommandWrapperGroup,
|
||||
)
|
||||
from mvt.common.help import (
|
||||
HELP_MSG_VERSION,
|
||||
@@ -37,6 +38,8 @@ from mvt.common.help import (
|
||||
HELP_MSG_CHECK_IOCS,
|
||||
HELP_MSG_STIX2,
|
||||
HELP_MSG_CHECK_IOS_BACKUP,
|
||||
HELP_MSG_DISABLE_UPDATE_CHECK,
|
||||
HELP_MSG_DISABLE_INDICATOR_UPDATE_CHECK,
|
||||
)
|
||||
from .cmd_check_backup import CmdIOSCheckBackup
|
||||
from .cmd_check_fs import CmdIOSCheckFS
|
||||
@@ -53,12 +56,37 @@ MVT_IOS_BACKUP_PASSWORD = "MVT_IOS_BACKUP_PASSWORD"
|
||||
CONTEXT_SETTINGS = dict(help_option_names=["-h", "--help"])
|
||||
|
||||
|
||||
def _get_disable_flags(ctx):
|
||||
"""Helper function to safely get disable flags from context."""
|
||||
if ctx.obj is None:
|
||||
return False, False
|
||||
return (
|
||||
ctx.obj.get("disable_version_check", False),
|
||||
ctx.obj.get("disable_indicator_check", False),
|
||||
)
|
||||
|
||||
|
||||
# ==============================================================================
|
||||
# Main
|
||||
# ==============================================================================
|
||||
@click.group(invoke_without_command=False)
|
||||
def cli():
|
||||
logo()
|
||||
@click.group(invoke_without_command=False, cls=CommandWrapperGroup)
|
||||
@click.option(
|
||||
"--disable-update-check", is_flag=True, help=HELP_MSG_DISABLE_UPDATE_CHECK
|
||||
)
|
||||
@click.option(
|
||||
"--disable-indicator-update-check",
|
||||
is_flag=True,
|
||||
help=HELP_MSG_DISABLE_INDICATOR_UPDATE_CHECK,
|
||||
)
|
||||
@click.pass_context
|
||||
def cli(ctx, disable_update_check, disable_indicator_update_check):
|
||||
ctx.ensure_object(dict)
|
||||
ctx.obj["disable_version_check"] = disable_update_check
|
||||
ctx.obj["disable_indicator_check"] = disable_indicator_update_check
|
||||
logo(
|
||||
disable_version_check=disable_update_check,
|
||||
disable_indicator_check=disable_indicator_update_check,
|
||||
)
|
||||
|
||||
|
||||
# ==============================================================================
|
||||
@@ -100,7 +128,7 @@ def decrypt_backup(ctx, destination, password, key_file, hashes, backup_path):
|
||||
if key_file:
|
||||
if MVT_IOS_BACKUP_PASSWORD in os.environ:
|
||||
log.info(
|
||||
"Ignoring %s environment variable, using --key-file" "'%s' instead",
|
||||
"Ignoring %s environment variable, using --key-file'%s' instead",
|
||||
MVT_IOS_BACKUP_PASSWORD,
|
||||
key_file,
|
||||
)
|
||||
@@ -114,7 +142,7 @@ def decrypt_backup(ctx, destination, password, key_file, hashes, backup_path):
|
||||
|
||||
if MVT_IOS_BACKUP_PASSWORD in os.environ:
|
||||
log.info(
|
||||
"Ignoring %s environment variable, using --password" "argument instead",
|
||||
"Ignoring %s environment variable, using --passwordargument instead",
|
||||
MVT_IOS_BACKUP_PASSWORD,
|
||||
)
|
||||
|
||||
@@ -168,8 +196,7 @@ def extract_key(password, key_file, backup_path):
|
||||
|
||||
if MVT_IOS_BACKUP_PASSWORD in os.environ:
|
||||
log.info(
|
||||
"Ignoring %s environment variable, using --password "
|
||||
"argument instead",
|
||||
"Ignoring %s environment variable, using --password argument instead",
|
||||
MVT_IOS_BACKUP_PASSWORD,
|
||||
)
|
||||
elif MVT_IOS_BACKUP_PASSWORD in os.environ:
|
||||
@@ -220,6 +247,8 @@ def check_backup(
|
||||
module_name=module,
|
||||
module_options=module_options,
|
||||
hashes=hashes,
|
||||
disable_version_check=_get_disable_flags(ctx)[0],
|
||||
disable_indicator_check=_get_disable_flags(ctx)[1],
|
||||
)
|
||||
|
||||
if list_modules:
|
||||
@@ -267,6 +296,8 @@ def check_fs(ctx, iocs, output, fast, list_modules, module, hashes, verbose, dum
|
||||
module_name=module,
|
||||
module_options=module_options,
|
||||
hashes=hashes,
|
||||
disable_version_check=_get_disable_flags(ctx)[0],
|
||||
disable_indicator_check=_get_disable_flags(ctx)[1],
|
||||
)
|
||||
|
||||
if list_modules:
|
||||
@@ -301,7 +332,13 @@ def check_fs(ctx, iocs, output, fast, list_modules, module, hashes, verbose, dum
|
||||
@click.argument("FOLDER", type=click.Path(exists=True))
|
||||
@click.pass_context
|
||||
def check_iocs(ctx, iocs, list_modules, module, folder):
|
||||
cmd = CmdCheckIOCS(target_path=folder, ioc_files=iocs, module_name=module)
|
||||
cmd = CmdCheckIOCS(
|
||||
target_path=folder,
|
||||
ioc_files=iocs,
|
||||
module_name=module,
|
||||
disable_version_check=_get_disable_flags(ctx)[0],
|
||||
disable_indicator_check=_get_disable_flags(ctx)[1],
|
||||
)
|
||||
cmd.modules = BACKUP_MODULES + FS_MODULES + MIXED_MODULES
|
||||
|
||||
if list_modules:
|
||||
|
||||
@@ -7,6 +7,7 @@ import logging
|
||||
from typing import Optional
|
||||
|
||||
from mvt.common.command import Command
|
||||
from mvt.common.indicators import Indicators
|
||||
|
||||
from .modules.backup import BACKUP_MODULES
|
||||
from .modules.mixed import MIXED_MODULES
|
||||
@@ -20,20 +21,28 @@ class CmdIOSCheckBackup(Command):
|
||||
target_path: Optional[str] = None,
|
||||
results_path: Optional[str] = None,
|
||||
ioc_files: Optional[list] = None,
|
||||
iocs: Optional[Indicators] = None,
|
||||
module_name: Optional[str] = None,
|
||||
serial: Optional[str] = None,
|
||||
module_options: Optional[dict] = None,
|
||||
hashes: bool = False,
|
||||
sub_command: bool = False,
|
||||
disable_version_check: bool = False,
|
||||
disable_indicator_check: bool = False,
|
||||
) -> None:
|
||||
super().__init__(
|
||||
target_path=target_path,
|
||||
results_path=results_path,
|
||||
ioc_files=ioc_files,
|
||||
iocs=iocs,
|
||||
module_name=module_name,
|
||||
serial=serial,
|
||||
module_options=module_options,
|
||||
hashes=hashes,
|
||||
sub_command=sub_command,
|
||||
log=log,
|
||||
disable_version_check=disable_version_check,
|
||||
disable_indicator_check=disable_indicator_check,
|
||||
)
|
||||
|
||||
self.name = "check-backup"
|
||||
|
||||
@@ -7,6 +7,7 @@ import logging
|
||||
from typing import Optional
|
||||
|
||||
from mvt.common.command import Command
|
||||
from mvt.common.indicators import Indicators
|
||||
|
||||
from .modules.fs import FS_MODULES
|
||||
from .modules.mixed import MIXED_MODULES
|
||||
@@ -20,20 +21,27 @@ class CmdIOSCheckFS(Command):
|
||||
target_path: Optional[str] = None,
|
||||
results_path: Optional[str] = None,
|
||||
ioc_files: Optional[list] = None,
|
||||
iocs: Optional[Indicators] = None,
|
||||
module_name: Optional[str] = None,
|
||||
serial: Optional[str] = None,
|
||||
module_options: Optional[dict] = None,
|
||||
hashes: bool = False,
|
||||
sub_command: bool = False,
|
||||
disable_version_check: bool = False,
|
||||
disable_indicator_check: bool = False,
|
||||
) -> None:
|
||||
super().__init__(
|
||||
target_path=target_path,
|
||||
results_path=results_path,
|
||||
ioc_files=ioc_files,
|
||||
iocs=iocs,
|
||||
module_name=module_name,
|
||||
serial=serial,
|
||||
module_options=module_options,
|
||||
hashes=hashes,
|
||||
sub_command=sub_command,
|
||||
log=log,
|
||||
disable_version_check=disable_version_check,
|
||||
disable_indicator_check=disable_indicator_check,
|
||||
)
|
||||
|
||||
self.name = "check-fs"
|
||||
|
||||
@@ -891,6 +891,14 @@
|
||||
"version": "15.8.2",
|
||||
"build": "19H384"
|
||||
},
|
||||
{
|
||||
"version": "15.8.4",
|
||||
"build": "19H390"
|
||||
},
|
||||
{
|
||||
"version": "15.8.5",
|
||||
"build": "19H394"
|
||||
},
|
||||
{
|
||||
"build": "20A362",
|
||||
"version": "16.0"
|
||||
@@ -992,6 +1000,14 @@
|
||||
"version": "16.7.8",
|
||||
"build": "20H343"
|
||||
},
|
||||
{
|
||||
"version": "16.7.11",
|
||||
"build": "20H360"
|
||||
},
|
||||
{
|
||||
"version": "16.7.12",
|
||||
"build": "20H364"
|
||||
},
|
||||
{
|
||||
"version": "17.0",
|
||||
"build": "21A327"
|
||||
@@ -1076,6 +1092,10 @@
|
||||
"version": "17.6.1",
|
||||
"build": "21G101"
|
||||
},
|
||||
{
|
||||
"version": "17.7.7",
|
||||
"build": "21H433"
|
||||
},
|
||||
{
|
||||
"version": "18",
|
||||
"build": "22A3354"
|
||||
@@ -1095,5 +1115,61 @@
|
||||
{
|
||||
"version": "18.2",
|
||||
"build": "22C152"
|
||||
},
|
||||
{
|
||||
"version": "18.2.1",
|
||||
"build": "22C161"
|
||||
},
|
||||
{
|
||||
"version": "18.3",
|
||||
"build": "22D63"
|
||||
},
|
||||
{
|
||||
"version": "18.3.1",
|
||||
"build": "22D72"
|
||||
},
|
||||
{
|
||||
"version": "18.4",
|
||||
"build": "22E240"
|
||||
},
|
||||
{
|
||||
"version": "18.4.1",
|
||||
"build": "22E252"
|
||||
},
|
||||
{
|
||||
"version": "18.5",
|
||||
"build": "22F76"
|
||||
},
|
||||
{
|
||||
"version": "18.6",
|
||||
"build": "22G86"
|
||||
},
|
||||
{
|
||||
"version": "18.6.1",
|
||||
"build": "22G90"
|
||||
},
|
||||
{
|
||||
"version": "18.6.2",
|
||||
"build": "22G100"
|
||||
},
|
||||
{
|
||||
"version": "18.7",
|
||||
"build": "22H20"
|
||||
},
|
||||
{
|
||||
"version": "18.7.2",
|
||||
"build": "22H124"
|
||||
},
|
||||
{
|
||||
"version": "26",
|
||||
"build": "23A341"
|
||||
},
|
||||
{
|
||||
"version": "26.0.1",
|
||||
"build": "23A355"
|
||||
},
|
||||
{
|
||||
"version": "26.1",
|
||||
"build": "23B85"
|
||||
}
|
||||
]
|
||||
@@ -41,7 +41,7 @@ class BackupInfo(IOSExtraction):
|
||||
info_path = os.path.join(self.target_path, "Info.plist")
|
||||
if not os.path.exists(info_path):
|
||||
raise DatabaseNotFoundError(
|
||||
"No Info.plist at backup path, unable to extract device " "information"
|
||||
"No Info.plist at backup path, unable to extract device information"
|
||||
)
|
||||
|
||||
with open(info_path, "rb") as handle:
|
||||
|
||||
@@ -110,8 +110,7 @@ class Manifest(IOSExtraction):
|
||||
ioc = self.indicators.check_url(part)
|
||||
if ioc:
|
||||
self.log.warning(
|
||||
'Found mention of domain "%s" in a backup file with '
|
||||
"path: %s",
|
||||
'Found mention of domain "%s" in a backup file with path: %s',
|
||||
ioc["value"],
|
||||
rel_path,
|
||||
)
|
||||
|
||||
@@ -74,7 +74,7 @@ class IOSExtraction(MVTModule):
|
||||
|
||||
if not shutil.which("sqlite3"):
|
||||
raise DatabaseCorruptedError(
|
||||
"failed to recover without sqlite3 binary: please install " "sqlite3!"
|
||||
"failed to recover without sqlite3 binary: please install sqlite3!"
|
||||
)
|
||||
if '"' in file_path:
|
||||
raise DatabaseCorruptedError(
|
||||
|
||||
@@ -43,6 +43,8 @@ class GlobalPreferences(IOSExtraction):
|
||||
self.log.warning("Lockdown mode enabled")
|
||||
else:
|
||||
self.log.warning("Lockdown mode disabled")
|
||||
return
|
||||
self.log.warning("Lockdown mode disabled")
|
||||
|
||||
def process_file(self, file_path: str) -> None:
|
||||
with open(file_path, "rb") as handle:
|
||||
|
||||
@@ -95,14 +95,17 @@ class SafariBrowserState(IOSExtraction):
|
||||
)
|
||||
except sqlite3.OperationalError:
|
||||
# Old version iOS <12 likely
|
||||
cur.execute(
|
||||
try:
|
||||
cur.execute(
|
||||
"""
|
||||
SELECT
|
||||
title, url, user_visible_url, last_viewed_time, session_data
|
||||
FROM tabs
|
||||
ORDER BY last_viewed_time;
|
||||
"""
|
||||
SELECT
|
||||
title, url, user_visible_url, last_viewed_time, session_data
|
||||
FROM tabs
|
||||
ORDER BY last_viewed_time;
|
||||
"""
|
||||
)
|
||||
)
|
||||
except sqlite3.OperationalError as e:
|
||||
self.log.error(f"Error executing query: {e}")
|
||||
|
||||
for row in cur:
|
||||
session_entries = []
|
||||
|
||||
@@ -43,7 +43,7 @@ class SMS(IOSExtraction):
|
||||
|
||||
def serialize(self, record: dict) -> Union[dict, list]:
|
||||
text = record["text"].replace("\n", "\\n")
|
||||
sms_data = f"{record['service']}: {record['guid']} \"{text}\" from {record['phone_number']} ({record['account']})"
|
||||
sms_data = f'{record["service"]}: {record["guid"]} "{text}" from {record["phone_number"]} ({record["account"]})'
|
||||
records = [
|
||||
{
|
||||
"timestamp": record["isodate"],
|
||||
|
||||
@@ -116,13 +116,16 @@ class TCC(IOSExtraction):
|
||||
)
|
||||
db_version = "v2"
|
||||
except sqlite3.OperationalError:
|
||||
cur.execute(
|
||||
"""SELECT
|
||||
service, client, client_type, allowed,
|
||||
prompt_count
|
||||
FROM access;"""
|
||||
)
|
||||
db_version = "v1"
|
||||
try:
|
||||
cur.execute(
|
||||
"""SELECT
|
||||
service, client, client_type, allowed,
|
||||
prompt_count
|
||||
FROM access;"""
|
||||
)
|
||||
db_version = "v1"
|
||||
except sqlite3.OperationalError as e:
|
||||
self.log.error(f"Error parsing TCC database: {e}")
|
||||
|
||||
for row in cur:
|
||||
service = row[0]
|
||||
|
||||
@@ -100,7 +100,7 @@ class WebkitSessionResourceLog(IOSExtraction):
|
||||
redirect_path += ", ".join(source_domains)
|
||||
redirect_path += " -> "
|
||||
|
||||
redirect_path += f"ORIGIN: \"{entry['origin']}\""
|
||||
redirect_path += f'ORIGIN: "{entry["origin"]}"'
|
||||
|
||||
if len(destination_domains) > 0:
|
||||
redirect_path += " -> "
|
||||
@@ -127,6 +127,24 @@ class WebkitSessionResourceLog(IOSExtraction):
|
||||
browsing_stats = file_plist["browsingStatistics"]
|
||||
|
||||
for item in browsing_stats:
|
||||
most_recent_interaction, last_seen = None, None
|
||||
if "mostRecentUserInteraction" in item:
|
||||
try:
|
||||
most_recent_interaction = convert_datetime_to_iso(
|
||||
item["mostRecentUserInteraction"]
|
||||
)
|
||||
except Exception:
|
||||
self.log.error(
|
||||
f'Error converting date of Safari resource"most recent interaction": {item["mostRecentUserInteraction"]}'
|
||||
)
|
||||
if "lastSeen" in item:
|
||||
try:
|
||||
last_seen = convert_datetime_to_iso(item["lastSeen"])
|
||||
except Exception:
|
||||
self.log.error(
|
||||
f'Error converting date of Safari resource"last seen": {item["lastSeen"]}'
|
||||
)
|
||||
|
||||
items.append(
|
||||
{
|
||||
"origin": item.get("PrevalentResourceOrigin", ""),
|
||||
@@ -139,10 +157,8 @@ class WebkitSessionResourceLog(IOSExtraction):
|
||||
"subresourceUnderTopFrameOrigins", ""
|
||||
),
|
||||
"user_interaction": item.get("hadUserInteraction"),
|
||||
"most_recent_interaction": convert_datetime_to_iso(
|
||||
item["mostRecentUserInteraction"]
|
||||
),
|
||||
"last_seen": convert_datetime_to_iso(item["lastSeen"]),
|
||||
"most_recent_interaction": most_recent_interaction,
|
||||
"last_seen": last_seen,
|
||||
}
|
||||
)
|
||||
|
||||
|
||||
@@ -38,44 +38,70 @@ class NetBase(IOSExtraction):
|
||||
|
||||
def _extract_net_data(self):
|
||||
conn = sqlite3.connect(self.file_path)
|
||||
conn.row_factory = sqlite3.Row
|
||||
cur = conn.cursor()
|
||||
cur.execute(
|
||||
try:
|
||||
cur.execute(
|
||||
"""
|
||||
SELECT
|
||||
ZPROCESS.ZFIRSTTIMESTAMP,
|
||||
ZPROCESS.ZTIMESTAMP,
|
||||
ZPROCESS.ZPROCNAME,
|
||||
ZPROCESS.ZBUNDLENAME,
|
||||
ZPROCESS.Z_PK AS ZPROCESS_PK,
|
||||
ZLIVEUSAGE.ZWIFIIN,
|
||||
ZLIVEUSAGE.ZWIFIOUT,
|
||||
ZLIVEUSAGE.ZWWANIN,
|
||||
ZLIVEUSAGE.ZWWANOUT,
|
||||
ZLIVEUSAGE.Z_PK AS ZLIVEUSAGE_PK,
|
||||
ZLIVEUSAGE.ZHASPROCESS,
|
||||
ZLIVEUSAGE.ZTIMESTAMP AS ZL_TIMESTAMP
|
||||
FROM ZLIVEUSAGE
|
||||
LEFT JOIN ZPROCESS ON ZLIVEUSAGE.ZHASPROCESS = ZPROCESS.Z_PK
|
||||
UNION
|
||||
SELECT ZFIRSTTIMESTAMP, ZTIMESTAMP, ZPROCNAME, ZBUNDLENAME, Z_PK,
|
||||
NULL, NULL, NULL, NULL, NULL, NULL, NULL
|
||||
FROM ZPROCESS WHERE Z_PK NOT IN
|
||||
(SELECT ZHASPROCESS FROM ZLIVEUSAGE);
|
||||
"""
|
||||
SELECT
|
||||
ZPROCESS.ZFIRSTTIMESTAMP,
|
||||
ZPROCESS.ZTIMESTAMP,
|
||||
ZPROCESS.ZPROCNAME,
|
||||
ZPROCESS.ZBUNDLENAME,
|
||||
ZPROCESS.Z_PK,
|
||||
ZLIVEUSAGE.ZWIFIIN,
|
||||
ZLIVEUSAGE.ZWIFIOUT,
|
||||
ZLIVEUSAGE.ZWWANIN,
|
||||
ZLIVEUSAGE.ZWWANOUT,
|
||||
ZLIVEUSAGE.Z_PK,
|
||||
ZLIVEUSAGE.ZHASPROCESS,
|
||||
ZLIVEUSAGE.ZTIMESTAMP
|
||||
FROM ZLIVEUSAGE
|
||||
LEFT JOIN ZPROCESS ON ZLIVEUSAGE.ZHASPROCESS = ZPROCESS.Z_PK
|
||||
UNION
|
||||
SELECT ZFIRSTTIMESTAMP, ZTIMESTAMP, ZPROCNAME, ZBUNDLENAME, Z_PK,
|
||||
NULL, NULL, NULL, NULL, NULL, NULL, NULL
|
||||
FROM ZPROCESS WHERE Z_PK NOT IN
|
||||
(SELECT ZHASPROCESS FROM ZLIVEUSAGE);
|
||||
"""
|
||||
)
|
||||
)
|
||||
except sqlite3.OperationalError:
|
||||
# Recent phones don't have ZWIFIIN and ZWIFIOUT columns
|
||||
cur.execute(
|
||||
"""
|
||||
SELECT
|
||||
ZPROCESS.ZFIRSTTIMESTAMP,
|
||||
ZPROCESS.ZTIMESTAMP,
|
||||
ZPROCESS.ZPROCNAME,
|
||||
ZPROCESS.ZBUNDLENAME,
|
||||
ZPROCESS.Z_PK AS ZPROCESS_PK,
|
||||
ZLIVEUSAGE.ZWWANIN,
|
||||
ZLIVEUSAGE.ZWWANOUT,
|
||||
ZLIVEUSAGE.Z_PK AS ZLIVEUSAGE_PK,
|
||||
ZLIVEUSAGE.ZHASPROCESS,
|
||||
ZLIVEUSAGE.ZTIMESTAMP AS ZL_TIMESTAMP
|
||||
FROM ZLIVEUSAGE
|
||||
LEFT JOIN ZPROCESS ON ZLIVEUSAGE.ZHASPROCESS = ZPROCESS.Z_PK
|
||||
UNION
|
||||
SELECT ZFIRSTTIMESTAMP, ZTIMESTAMP, ZPROCNAME, ZBUNDLENAME, Z_PK,
|
||||
NULL, NULL, NULL, NULL, NULL
|
||||
FROM ZPROCESS WHERE Z_PK NOT IN
|
||||
(SELECT ZHASPROCESS FROM ZLIVEUSAGE);
|
||||
"""
|
||||
)
|
||||
|
||||
for row in cur:
|
||||
# ZPROCESS records can be missing after the JOIN.
|
||||
# Handle NULL timestamps.
|
||||
if row[0] and row[1]:
|
||||
first_isodate = convert_mactime_to_iso(row[0])
|
||||
isodate = convert_mactime_to_iso(row[1])
|
||||
if row["ZFIRSTTIMESTAMP"] and row["ZTIMESTAMP"]:
|
||||
first_isodate = convert_mactime_to_iso(row["ZFIRSTTIMESTAMP"])
|
||||
isodate = convert_mactime_to_iso(row["ZTIMESTAMP"])
|
||||
else:
|
||||
first_isodate = row[0]
|
||||
isodate = row[1]
|
||||
first_isodate = row["ZFIRSTTIMESTAMP"]
|
||||
isodate = row["ZTIMESTAMP"]
|
||||
|
||||
if row[11]:
|
||||
live_timestamp = convert_mactime_to_iso(row[11])
|
||||
if row["ZL_TIMESTAMP"]:
|
||||
live_timestamp = convert_mactime_to_iso(row["ZL_TIMESTAMP"])
|
||||
else:
|
||||
live_timestamp = ""
|
||||
|
||||
@@ -83,16 +109,18 @@ class NetBase(IOSExtraction):
|
||||
{
|
||||
"first_isodate": first_isodate,
|
||||
"isodate": isodate,
|
||||
"proc_name": row[2],
|
||||
"bundle_id": row[3],
|
||||
"proc_id": row[4],
|
||||
"wifi_in": row[5],
|
||||
"wifi_out": row[6],
|
||||
"wwan_in": row[7],
|
||||
"wwan_out": row[8],
|
||||
"live_id": row[9],
|
||||
"live_proc_id": row[10],
|
||||
"live_isodate": live_timestamp if row[11] else first_isodate,
|
||||
"proc_name": row["ZPROCNAME"],
|
||||
"bundle_id": row["ZBUNDLENAME"],
|
||||
"proc_id": row["ZPROCESS_PK"],
|
||||
"wifi_in": row["ZWIFIIN"] if "ZWIFIIN" in row.keys() else None,
|
||||
"wifi_out": row["ZWIFIOUT"] if "ZWIFIOUT" in row.keys() else None,
|
||||
"wwan_in": row["ZWWANIN"],
|
||||
"wwan_out": row["ZWWANOUT"],
|
||||
"live_id": row["ZLIVEUSAGE_PK"],
|
||||
"live_proc_id": row["ZHASPROCESS"],
|
||||
"live_isodate": live_timestamp
|
||||
if row["ZL_TIMESTAMP"]
|
||||
else first_isodate,
|
||||
}
|
||||
)
|
||||
|
||||
@@ -108,8 +136,6 @@ class NetBase(IOSExtraction):
|
||||
)
|
||||
record_data_usage = (
|
||||
record_data + " "
|
||||
f"WIFI IN: {record['wifi_in']}, "
|
||||
f"WIFI OUT: {record['wifi_out']} - "
|
||||
f"WWAN IN: {record['wwan_in']}, "
|
||||
f"WWAN OUT: {record['wwan_out']}"
|
||||
)
|
||||
|
||||
Some files were not shown because too many files have changed in this diff Show More
Reference in New Issue
Block a user