mirror of
https://github.com/mvt-project/mvt.git
synced 2026-02-16 02:12:46 +00:00
Compare commits
95 Commits
refactor/s
...
v2
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
088a3f453a | ||
|
|
6a76191155 | ||
|
|
7173e02a6f | ||
|
|
c779009550 | ||
|
|
8f34902bed | ||
|
|
939bec82ff | ||
|
|
b183ca33b5 | ||
|
|
a2c9e0c6cf | ||
|
|
4bfad1f87d | ||
|
|
c3dc3d96d5 | ||
|
|
afab222f93 | ||
|
|
5a1166c416 | ||
|
|
dd3d665bea | ||
|
|
5c3b92aeee | ||
|
|
d7e058af43 | ||
|
|
cdbaad94cc | ||
|
|
801c464492 | ||
|
|
6d1d499c4e | ||
|
|
cc7781e255 | ||
|
|
c6837a455a | ||
|
|
b1f0a2de06 | ||
|
|
d259ab4810 | ||
|
|
d4b970c7c0 | ||
|
|
4b6a101cc7 | ||
|
|
5b1f4df7a4 | ||
|
|
301582d7dd | ||
|
|
af8c56675b | ||
|
|
2302e74a86 | ||
|
|
981371bd8b | ||
|
|
c7d00978c6 | ||
|
|
339a1d0712 | ||
|
|
7009cddc8c | ||
|
|
9b4d10139c | ||
|
|
b795ea3129 | ||
|
|
5be5ffbf49 | ||
|
|
2701490501 | ||
|
|
779842567d | ||
|
|
d3cc8cf590 | ||
|
|
b8a42eaf8f | ||
|
|
62b880fbff | ||
|
|
0778d448df | ||
|
|
f020655a1a | ||
|
|
91c34e6664 | ||
|
|
b4a8dd226a | ||
|
|
88213e12c9 | ||
|
|
f75b8e186a | ||
|
|
5babc1fcf3 | ||
|
|
b723ebf28e | ||
|
|
616e870212 | ||
|
|
847b0e087b | ||
|
|
86a0772eb2 | ||
|
|
7d0be9db4f | ||
|
|
4e120b2640 | ||
|
|
dbe9e5db9b | ||
|
|
0b00398729 | ||
|
|
87034d2c7a | ||
|
|
595a2f6536 | ||
|
|
8ead44a31e | ||
|
|
5c19d02a73 | ||
|
|
14ebc9ee4e | ||
|
|
de53cc07f8 | ||
|
|
22e066fc4a | ||
|
|
242052b8ec | ||
|
|
1df61b5bbf | ||
|
|
b691de2cc0 | ||
|
|
10915f250c | ||
|
|
c60cef4009 | ||
|
|
dda798df8e | ||
|
|
ffe6ad2014 | ||
|
|
a125b20fc5 | ||
|
|
49108e67e2 | ||
|
|
883b450601 | ||
|
|
ce813568ff | ||
|
|
93303f181a | ||
|
|
bee453a090 | ||
|
|
42106aa4d6 | ||
|
|
95076c8f71 | ||
|
|
c9ac12f336 | ||
|
|
486e3e7e9b | ||
|
|
be1fc3bd8b | ||
|
|
4757cff262 | ||
|
|
61f51caf31 | ||
|
|
511063fd0e | ||
|
|
88bc5672cb | ||
|
|
0fce0acf7a | ||
|
|
61f95d07d3 | ||
|
|
3dedd169c4 | ||
|
|
e34e03d3a3 | ||
|
|
34374699ce | ||
|
|
cf5aa7c89f | ||
|
|
2766739512 | ||
|
|
9c84afb4b0 | ||
|
|
80fc8bd879 | ||
|
|
ca41f7f106 | ||
|
|
55ddd86ad5 |
11
.github/dependabot.yml
vendored
Normal file
11
.github/dependabot.yml
vendored
Normal file
@@ -0,0 +1,11 @@
|
||||
# To get started with Dependabot version updates, you'll need to specify which
|
||||
# package ecosystems to update and where the package manifests are located.
|
||||
# Please see the documentation for all configuration options:
|
||||
# https://docs.github.com/code-security/dependabot/dependabot-version-updates/configuration-options-for-the-dependabot.yml-file
|
||||
|
||||
version: 2
|
||||
updates:
|
||||
- package-ecosystem: "pip" # See documentation for possible values
|
||||
directory: "/" # Location of package manifests
|
||||
schedule:
|
||||
interval: "weekly"
|
||||
4
.github/workflows/tests.yml
vendored
4
.github/workflows/tests.yml
vendored
@@ -12,7 +12,7 @@ jobs:
|
||||
strategy:
|
||||
fail-fast: false
|
||||
matrix:
|
||||
python-version: ['3.8', '3.9', '3.10'] # , '3.11']
|
||||
python-version: ['3.10', '3.11', '3.12', '3.13', '3.14']
|
||||
|
||||
steps:
|
||||
- uses: actions/checkout@v4
|
||||
@@ -35,4 +35,4 @@ jobs:
|
||||
if: github.event_name == 'pull_request'
|
||||
with:
|
||||
pytest-coverage-path: ./pytest-coverage.txt
|
||||
junitxml-path: ./pytest.xml
|
||||
junitxml-path: ./pytest.xml
|
||||
|
||||
1
.github/workflows/update-ios-data.yml
vendored
1
.github/workflows/update-ios-data.yml
vendored
@@ -21,6 +21,7 @@ jobs:
|
||||
title: '[auto] Update iOS releases and versions'
|
||||
commit-message: Add new iOS versions and build numbers
|
||||
branch: auto/add-new-ios-releases
|
||||
draft: true
|
||||
body: |
|
||||
This is an automated pull request to update the iOS releases and version numbers.
|
||||
add-paths: |
|
||||
|
||||
@@ -103,7 +103,7 @@ RUN git clone https://github.com/libimobiledevice/usbmuxd && cd usbmuxd \
|
||||
|
||||
|
||||
# Create main image
|
||||
FROM ubuntu:22.04 as main
|
||||
FROM ubuntu:24.04 as main
|
||||
|
||||
LABEL org.opencontainers.image.url="https://mvt.re"
|
||||
LABEL org.opencontainers.image.documentation="https://docs.mvt.re"
|
||||
@@ -135,8 +135,7 @@ COPY --from=build-usbmuxd /build /
|
||||
COPY . mvt/
|
||||
RUN apt-get update \
|
||||
&& apt-get install -y git python3-pip \
|
||||
&& PIP_NO_CACHE_DIR=1 pip3 install --upgrade pip \
|
||||
&& PIP_NO_CACHE_DIR=1 pip3 install ./mvt \
|
||||
&& PIP_NO_CACHE_DIR=1 pip3 install --break-system-packages ./mvt \
|
||||
&& apt-get remove -y python3-pip git && apt-get autoremove -y \
|
||||
&& rm -rf /var/lib/apt/lists/* \
|
||||
&& rm -rf mvt
|
||||
|
||||
9
Makefile
9
Makefile
@@ -1,14 +1,9 @@
|
||||
PWD = $(shell pwd)
|
||||
|
||||
autofix:
|
||||
ruff format .
|
||||
ruff check --fix .
|
||||
|
||||
check: ruff mypy
|
||||
|
||||
ruff:
|
||||
ruff format --check .
|
||||
ruff check -q .
|
||||
ruff check .
|
||||
|
||||
mypy:
|
||||
mypy
|
||||
@@ -23,7 +18,7 @@ install:
|
||||
python3 -m pip install --upgrade -e .
|
||||
|
||||
test-requirements:
|
||||
python3 -m pip install --upgrade -r test-requirements.txt
|
||||
python3 -m pip install --upgrade --group dev
|
||||
|
||||
generate-proto-parsers:
|
||||
# Generate python parsers for protobuf files
|
||||
|
||||
59
SECURITY.md
59
SECURITY.md
@@ -2,4 +2,61 @@
|
||||
|
||||
Thank you for your interest in reporting security issues and vulnerabilities! Security research is of utmost importance and we take all reports seriously. If you discover an issue please report it to us right away!
|
||||
|
||||
Please DO NOT file a public issue, instead send your report privately to *nex [at] nex [dot] sx*. You can also write PGP-encrypted emails to [this key](https://keybase.io/nex/pgp_keys.asc?fingerprint=05216f3b86848a303c2fe37dd166f1667359d880).
|
||||
Please DO NOT file a public issue, instead send your report privately to the MVT maintainers at Amnesty International via `security [at] amnesty [dot] tech`.
|
||||
|
||||
You can also write PGP-encrypted emails to key `CFBF9698DCA8EB2A80F48ADEA035A030FA04ED13`. The corresponding PGP public key is lited below.
|
||||
|
||||
```
|
||||
-----BEGIN PGP PUBLIC KEY BLOCK-----
|
||||
|
||||
mQINBGlFPwsBEADQ+d7SeHrFPYv3wPOjWs2oMpp0DPdfIyGbg+iYWOC36FegZhKY
|
||||
+WeK96GqJWt8wD6kwFUVwQI795WZrjSd1q4a7wR+kj/h7xlRB6ZfVICA6O5DOOm6
|
||||
GNMvqy7ESm8g1XZDpb2u1BXmSS9X8f6rjB0e86kYsF1mB5/2USTM63jgDs0GGTkZ
|
||||
Q1z4Mq4gYyqH32b3gvXkbb68LeQmONUIM3cgmec9q8/pNc1l7fcoLWhOVADRj17Q
|
||||
plisa/EUf/SYqdtk9w7EHGggNenKNwVM235mkPcMqmE72bTpjT6XCxvZY3ByG5yi
|
||||
7L+tHJU45ZuXtt62EvX03azxThVfSmH/WbRk8lH8+CW8XMmiWZphG4ydPWqgVKCB
|
||||
2UOXm+6CQnKA+7Dt1AeK2t5ciATrv9LvwgSxk5WKc3288XFLA6eGMrTdQygYlLjJ
|
||||
+42RSdK/7fCt/qk4q13oUw8ZTVcCia98uZFi704XuuYTH6NrntIB7j/0oucIS4Y9
|
||||
cTWNO5LBerez4v8VI4YHcYESPeIWGFkXhvJzo0VMg1zidBLtiPoGF2JKZGwaK7/p
|
||||
yY1xALskLp4H+5OY4eB1kf8kl4vGsEK8xA/NNzOiapVmwBXpvVvmXIQJE2k+olNf
|
||||
sAuyB8+aO1Ws7tFYt3D+olC7iaprOdK7uA4GCgmYYhq6QQPg+cxfczgHfwARAQAB
|
||||
tD1TZWN1cml0eSBMYWIgYXQgQW1uZXN0eSBJbnRlcm5hdGlvbmFsIDxzZWN1cml0
|
||||
eUBhbW5lc3R5LnRlY2g+iQJRBBMBCAA7FiEEz7+WmNyo6yqA9IreoDWgMPoE7RMF
|
||||
AmlFPwsCGwMFCwkIBwICIgIGFQoJCAsCBBYCAwECHgcCF4AACgkQoDWgMPoE7RNr
|
||||
2w//a88uP90uSN6lgeIwKsHr1ri27QIBbzCV6hLN/gZBFR2uaiOn/xfFDbnR0Cjo
|
||||
5nMCJCT1k4nrPbMTlfmWLCD+YKELBzVqWlw4J2SOg3nznPl2JrL8QBKjwts0sF+h
|
||||
QbRWDsT54wBZnl6ZJJ79eLShNTokBbKnQ7071dMrENr5e2P2sClQXyiIc51ga4FM
|
||||
fHyhsx+GsrdiZNd2AH8912ljW1GuEi3epTO7KMZprmr37mjpZSUToiV59Yhl1Gbo
|
||||
2pixkYJqi62DG02/gTpCjq9NH3cEMxcxjh4E7yCA8ggLG6+IN6woIvPIdOsnQ+Yj
|
||||
d3H4rMNBjPSKoL+bdHILkCnp5HokcbVjNY3QAyOAF4qWhk4GtgpTshwxUmb4Tbay
|
||||
tWLJC2bzjuUBxLkGzMVFfU3B96sVS4Fi0sBaEMBtHskl2f45X8LJhSq//Lw/2L/8
|
||||
34uP/RxDSn+DPvj/yqMpekdCcmeFSTX1A19xkPcc0rVhMRde4VL338R86vzh0gMI
|
||||
1LySDAhXZyVWzrQ5s3n6N3EvCaHCn3qu7ieyFJifCSR7gZqevCEznMQRVpkMTzUt
|
||||
rk13Z6NOOb4IlTW7HFoY3omJG8Z5jV4kMIE7n6nb0qpNYQiG+YvjenQ3VrMoISyh
|
||||
lpS2De8+oOtwrxBVX3+qKWvQqzufeE3416kw2Z+5mxH7bx25Ag0EaUU/CwEQALyZ
|
||||
b+kwLN1yHObTm2yDBEn5HbCT3H1GremvPNmbAaTnfrjUngoKa8MuWWzbX5ptgmZR
|
||||
UpYY/ylOYcgGydz58vUNrPlhIZT9UhmiifPgZLEXyd0uFpr/NsbRajHMkK10iEZf
|
||||
h5bHNobiB7pGCu4Uj9e1cMiIZ4yEaYeyXYUoNHf6ISP39mJhHy6ov5yIpm9q0wzm
|
||||
tGUQPupxGXmEZlOPr3lxqXQ3Ekdv6cWDY5r/oOq71QJ/HUQ13QUuGFIbhnMbT8zd
|
||||
zaS6f/v772YKsWPc4NNUhtlf25VnQ4FuUtjCe3p6iYP4OVD8gJm0GvXyvyTuiQbL
|
||||
CSk/378JiNT7nZzYXxrWchMwvEoMIU55+/UaBc50HI5xvDQ858CX7PYGiimcdsO1
|
||||
EkQzhVxRfjlILfWrC2lgt+H5qhTn4Fah250Xe1PnLjXGHVUQnY/f3MFeiWQgf92b
|
||||
02+MfvOeC5OKttP1z5lcx6RFWCIa1E/u8Nj7YrH9hk0ZBRAnBaeAncDFY8dfX2zX
|
||||
VMoc0dV16gM7RrZ6i7D3CG3eLLkQlX0jbW9dzTuG/3f098EWB1p8vOfS/RbNCBRX
|
||||
jqGiqacL/aFF3Ci3nQ4O5tSv1XipbgrUhvXnwm9pxrLPS/45iaO59WN4RRGWLLQ7
|
||||
LHmeBxoa9avv0SdBYUL+eBxY46GXb/j5VLzHYhSnABEBAAGJAjYEGAEIACAWIQTP
|
||||
v5aY3KjrKoD0it6gNaAw+gTtEwUCaUU/CwIbDAAKCRCgNaAw+gTtEyvsEACnyFFD
|
||||
alOZTrrJTXNnUejuiExLh+qTO3T91p5bte597jpwCZnYGwkxEfffsqqhlY6ftEOf
|
||||
d5tNWE5isai4v8XCbplWomz4KBpepxcn2b+9o5dSyr1vohEFuCJziZDsta1J2DX5
|
||||
IE9U48kTgLDfdIBhuOyHNRkvXRHP2OVLCaiw4d9q+hlrraR8pehHt2BJSxh+QZoe
|
||||
n0iHvIZCBIUA45zLEGmXFpNTGeEf2dKPp3xOkAXOhAMPptE0V1itkF3R7kEW4aFO
|
||||
SZo8L3C1aWSz/gQ4/vvW5t1IJxirNMUgTMQFvqEkAwX3fm6GCxlgRSvTTRXdcrS8
|
||||
6qyFdH1nkCNsavPahN3N2RGGIlWtODEMTO1Hjy0kZtTYdW+JH9sendliCoJES+yN
|
||||
DjM125SgdAgrqlSYm/g8n9knWpxZv1QM6jU/sVz1J+l6/ixugL2i+CAL2d6uv4tT
|
||||
QmXnu7Ei4/2kHBUu3Lf59MNgmLHm6F7AhOWErszSeoJKsp+3yA1oTT/npz67sRzY
|
||||
VVyxz4NBIollna59a1lz0RhlWzNKqNB27jhylyM4ltdzHB7r4VMAVJyttozmIIOC
|
||||
35ucYxl5BHLuapaRSaYHdUId1LOccYyaOOFF/PSyCu9dKzXk7zEz2HNcIboWSkAE
|
||||
8ZDExMYM4WVpVCOj+frdsaBvzItHacRWuijtkw==
|
||||
=JAXX
|
||||
-----END PGP PUBLIC KEY BLOCK-----
|
||||
```
|
||||
|
||||
@@ -1,4 +1,28 @@
|
||||
# Deprecation of ADB command in MVT
|
||||
# Check over ADB
|
||||
|
||||
In order to check an Android device over the [Android Debug Bridge (adb)](https://developer.android.com/studio/command-line/adb) you will first need to install [Android SDK Platform Tools](https://developer.android.com/studio/releases/platform-tools). If you have installed [Android Studio](https://developer.android.com/studio/) you should already have access to `adb` and other utilities.
|
||||
|
||||
While many Linux distributions already package Android Platform Tools (for example `android-platform-tools-base` on Debian), it is preferable to install the most recent version from the official website. Packaged versions might be outdated and incompatible with most recent Android handsets.
|
||||
|
||||
Next you will need to enable debugging on the Android device you are testing. [Please follow the official instructions on how to do so.](https://developer.android.com/studio/command-line/adb)
|
||||
|
||||
## Connecting over USB
|
||||
|
||||
The easiest way to check the device is over a USB transport. You will need to have USB debugging enabled and the device plugged into your computer. If everything is configured appropriately you should see your device when launching the command `adb devices`.
|
||||
|
||||
Now you can try launching MVT with:
|
||||
|
||||
```bash
|
||||
mvt-android check-adb --output /path/to/results
|
||||
```
|
||||
|
||||
!!! warning
|
||||
The `check-adb` command is deprecated and will be removed in a future release.
|
||||
Whenever possible, prefer acquiring device data using the AndroidQF project (https://github.com/mvt-project/androidqf/) and then analyze those acquisitions with MVT.
|
||||
|
||||
Running `mvt-android check-adb` will also emit a runtime deprecation warning advising you to migrate to AndroidQF.
|
||||
|
||||
If you have previously started an adb daemon MVT will alert you and require you to kill it with `adb kill-server` and relaunch the command.
|
||||
|
||||
!!! warning
|
||||
|
||||
|
||||
@@ -1,5 +1,5 @@
|
||||
mkdocs==1.6.1
|
||||
mkdocs-autorefs==1.2.0
|
||||
mkdocs-material==9.5.42
|
||||
mkdocs-autorefs==1.4.3
|
||||
mkdocs-material==9.6.20
|
||||
mkdocs-material-extensions==1.3.1
|
||||
mkdocstrings==0.23.0
|
||||
mkdocstrings==0.30.1
|
||||
@@ -1,13 +1,11 @@
|
||||
[project]
|
||||
name = "mvt"
|
||||
dynamic = ["version"]
|
||||
authors = [
|
||||
{name = "Claudio Guarnieri", email = "nex@nex.sx"}
|
||||
]
|
||||
authors = [{ name = "Claudio Guarnieri", email = "nex@nex.sx" }]
|
||||
maintainers = [
|
||||
{name = "Etienne Maynier", email = "tek@randhome.io"},
|
||||
{name = "Donncha Ó Cearbhaill", email = "donncha.ocearbhaill@amnesty.org"},
|
||||
{name = "Rory Flynn", email = "rory.flynn@amnesty.org"}
|
||||
{ name = "Etienne Maynier", email = "tek@randhome.io" },
|
||||
{ name = "Donncha Ó Cearbhaill", email = "donncha.ocearbhaill@amnesty.org" },
|
||||
{ name = "Rory Flynn", email = "rory.flynn@amnesty.org" },
|
||||
]
|
||||
description = "Mobile Verification Toolkit"
|
||||
readme = "README.md"
|
||||
@@ -16,46 +14,61 @@ classifiers = [
|
||||
"Development Status :: 5 - Production/Stable",
|
||||
"Intended Audience :: Information Technology",
|
||||
"Operating System :: OS Independent",
|
||||
"Programming Language :: Python"
|
||||
"Programming Language :: Python",
|
||||
]
|
||||
dependencies = [
|
||||
"click >=8.1.3",
|
||||
"rich >=12.6.0",
|
||||
"tld >=0.12.6",
|
||||
"requests >=2.28.1",
|
||||
"simplejson >=3.17.6",
|
||||
"packaging >=21.3",
|
||||
"appdirs >=1.4.4",
|
||||
"iOSbackup >=0.9.923",
|
||||
"cryptography >=42.0.5",
|
||||
"pyyaml >=6.0",
|
||||
"pyahocorasick >= 2.0.0",
|
||||
"betterproto >=1.2.0",
|
||||
"pydantic >= 2.10.0",
|
||||
"pydantic-settings >= 2.7.0",
|
||||
'backports.zoneinfo; python_version < "3.9"',
|
||||
"click==8.3.0",
|
||||
"rich==14.1.0",
|
||||
"tld==0.13.1",
|
||||
"requests==2.32.5",
|
||||
"simplejson==3.20.2",
|
||||
"packaging==25.0",
|
||||
"appdirs==1.4.4",
|
||||
"iOSbackup==0.9.925",
|
||||
"adb-shell[usb]==0.4.4",
|
||||
"libusb1==3.3.1",
|
||||
"cryptography==46.0.3",
|
||||
"PyYAML>=6.0.2",
|
||||
"pyahocorasick==2.2.0",
|
||||
"betterproto==1.2.5",
|
||||
"pydantic==2.12.3",
|
||||
"pydantic-settings==2.10.1",
|
||||
"NSKeyedUnArchiver==1.5.2",
|
||||
"python-dateutil==2.9.0.post0",
|
||||
"tzdata==2025.2",
|
||||
]
|
||||
requires-python = ">= 3.8"
|
||||
requires-python = ">= 3.10"
|
||||
|
||||
[project.urls]
|
||||
homepage = "https://docs.mvt.re/en/latest/"
|
||||
repository = "https://github.com/mvt-project/mvt"
|
||||
|
||||
[project.scripts]
|
||||
mvt-ios = "mvt.ios:cli"
|
||||
mvt-android = "mvt.android:cli"
|
||||
mvt-ios = "mvt.ios:cli"
|
||||
mvt-android = "mvt.android:cli"
|
||||
|
||||
[dependency-groups]
|
||||
dev = [
|
||||
"requests>=2.31.0",
|
||||
"pytest>=7.4.3",
|
||||
"pytest-cov>=4.1.0",
|
||||
"pytest-github-actions-annotate-failures>=0.2.0",
|
||||
"pytest-mock>=3.14.0",
|
||||
"stix2>=3.0.1",
|
||||
"ruff>=0.1.6",
|
||||
"mypy>=1.7.1",
|
||||
"betterproto[compiler]",
|
||||
]
|
||||
|
||||
[build-system]
|
||||
requires = ["setuptools>=61.0"]
|
||||
build-backend = "setuptools.build_meta"
|
||||
|
||||
[tool.coverage.run]
|
||||
omit = [
|
||||
"tests/*",
|
||||
]
|
||||
omit = ["tests/*"]
|
||||
|
||||
[tool.coverage.html]
|
||||
directory= "htmlcov"
|
||||
directory = "htmlcov"
|
||||
|
||||
[tool.mypy]
|
||||
install_types = true
|
||||
@@ -65,15 +78,13 @@ packages = "src"
|
||||
|
||||
[tool.pytest.ini_options]
|
||||
addopts = "-ra -q --cov=mvt --cov-report html --junitxml=pytest.xml --cov-report=term-missing:skip-covered"
|
||||
testpaths = [
|
||||
"tests"
|
||||
]
|
||||
testpaths = ["tests"]
|
||||
|
||||
[tool.ruff.lint]
|
||||
select = ["C90", "E", "F", "W"] # flake8 default set
|
||||
[tool.ruff]
|
||||
select = ["C90", "E", "F", "W"] # flake8 default set
|
||||
ignore = [
|
||||
"E501", # don't enforce line length violations
|
||||
"C901", # complex-structure
|
||||
"E501", # don't enforce line length violations
|
||||
"C901", # complex-structure
|
||||
|
||||
# These were previously ignored but don't seem to be required:
|
||||
# "E265", # no-space-after-block-comment
|
||||
@@ -84,15 +95,15 @@ ignore = [
|
||||
# "E203", # whitespace-before-punctuation
|
||||
]
|
||||
|
||||
[tool.ruff.lint.per-file-ignores]
|
||||
"__init__.py" = ["F401"] # unused-import
|
||||
[tool.ruff.per-file-ignores]
|
||||
"__init__.py" = ["F401"] # unused-import
|
||||
|
||||
[tool.ruff.lint.mccabe]
|
||||
[tool.ruff.mccabe]
|
||||
max-complexity = 10
|
||||
|
||||
[tool.setuptools]
|
||||
include-package-data = true
|
||||
package-dir = {"" = "src"}
|
||||
package-dir = { "" = "src" }
|
||||
|
||||
[tool.setuptools.packages.find]
|
||||
where = ["src"]
|
||||
@@ -101,4 +112,4 @@ where = ["src"]
|
||||
mvt = ["ios/data/*.json"]
|
||||
|
||||
[tool.setuptools.dynamic]
|
||||
version = {attr = "mvt.common.version.MVT_VERSION"}
|
||||
version = { attr = "mvt.common.version.MVT_VERSION" }
|
||||
|
||||
@@ -20,23 +20,39 @@ class AndroidArtifact(Artifact):
|
||||
:param binary: whether the dumpsys should be pared as binary or not (bool)
|
||||
:return: section extracted (string or bytes)
|
||||
"""
|
||||
lines = []
|
||||
in_section = False
|
||||
delimiter = "------------------------------------------------------------------------------"
|
||||
delimiter_str = "------------------------------------------------------------------------------"
|
||||
delimiter_bytes = b"------------------------------------------------------------------------------"
|
||||
|
||||
if binary:
|
||||
delimiter = delimiter.encode("utf-8")
|
||||
lines_bytes = []
|
||||
for line in dumpsys.splitlines(): # type: ignore[union-attr]
|
||||
if line.strip() == separator: # type: ignore[arg-type]
|
||||
in_section = True
|
||||
continue
|
||||
|
||||
for line in dumpsys.splitlines():
|
||||
if line.strip() == separator:
|
||||
in_section = True
|
||||
continue
|
||||
if not in_section:
|
||||
continue
|
||||
|
||||
if not in_section:
|
||||
continue
|
||||
if line.strip().startswith(delimiter_bytes): # type: ignore[arg-type]
|
||||
break
|
||||
|
||||
if line.strip().startswith(delimiter):
|
||||
break
|
||||
lines_bytes.append(line) # type: ignore[arg-type]
|
||||
|
||||
lines.append(line)
|
||||
return b"\n".join(lines_bytes) # type: ignore[return-value,arg-type]
|
||||
else:
|
||||
lines_str = []
|
||||
for line in dumpsys.splitlines(): # type: ignore[union-attr]
|
||||
if line.strip() == separator: # type: ignore[arg-type]
|
||||
in_section = True
|
||||
continue
|
||||
|
||||
return b"\n".join(lines) if binary else "\n".join(lines)
|
||||
if not in_section:
|
||||
continue
|
||||
|
||||
if line.strip().startswith(delimiter_str): # type: ignore[arg-type]
|
||||
break
|
||||
|
||||
lines_str.append(line) # type: ignore[arg-type]
|
||||
|
||||
return "\n".join(lines_str) # type: ignore[return-value,arg-type]
|
||||
|
||||
@@ -16,8 +16,9 @@ class DumpsysAccessibilityArtifact(AndroidArtifact):
|
||||
for result in self.results:
|
||||
ioc_match = self.indicators.check_app_id(result["package_name"])
|
||||
if ioc_match:
|
||||
result["matched_indicator"] = ioc_match.ioc
|
||||
self.alertstore.critical(self.get_slug(), ioc_match.message, "", result)
|
||||
self.alertstore.critical(
|
||||
ioc_match.message, "", result, matched_indicator=ioc_match.ioc
|
||||
)
|
||||
continue
|
||||
|
||||
def parse(self, content: str) -> None:
|
||||
|
||||
@@ -4,6 +4,7 @@
|
||||
# https://license.mvt.re/1.1/
|
||||
|
||||
import base64
|
||||
import binascii
|
||||
import hashlib
|
||||
|
||||
from .artifact import AndroidArtifact
|
||||
@@ -83,17 +84,22 @@ class DumpsysADBArtifact(AndroidArtifact):
|
||||
return keystore
|
||||
|
||||
@staticmethod
|
||||
def calculate_key_info(user_key: bytes) -> str:
|
||||
def calculate_key_info(user_key: bytes) -> dict:
|
||||
if b" " in user_key:
|
||||
key_base64, user = user_key.split(b" ", 1)
|
||||
else:
|
||||
key_base64, user = user_key, b""
|
||||
|
||||
key_raw = base64.b64decode(key_base64)
|
||||
key_fingerprint = hashlib.md5(key_raw).hexdigest().upper()
|
||||
key_fingerprint_colon = ":".join(
|
||||
[key_fingerprint[i : i + 2] for i in range(0, len(key_fingerprint), 2)]
|
||||
)
|
||||
try:
|
||||
key_raw = base64.b64decode(key_base64)
|
||||
key_fingerprint = hashlib.md5(key_raw).hexdigest().upper()
|
||||
key_fingerprint_colon = ":".join(
|
||||
[key_fingerprint[i : i + 2] for i in range(0, len(key_fingerprint), 2)]
|
||||
)
|
||||
except binascii.Error:
|
||||
# Impossible to parse base64
|
||||
key_fingerprint_colon = ""
|
||||
|
||||
return {
|
||||
"user": user.decode("utf-8"),
|
||||
"fingerprint": key_fingerprint_colon,
|
||||
|
||||
@@ -4,13 +4,13 @@
|
||||
# https://license.mvt.re/1.1/
|
||||
|
||||
from datetime import datetime
|
||||
from typing import Any
|
||||
|
||||
from mvt.common.utils import convert_datetime_to_iso
|
||||
from mvt.common.module_types import ModuleAtomicResult, ModuleSerializedResult
|
||||
from mvt.common.utils import convert_datetime_to_iso
|
||||
|
||||
from .artifact import AndroidArtifact
|
||||
|
||||
|
||||
RISKY_PERMISSIONS = ["REQUEST_INSTALL_PACKAGES"]
|
||||
RISKY_PACKAGES = ["com.android.shell"]
|
||||
|
||||
@@ -45,9 +45,8 @@ class DumpsysAppopsArtifact(AndroidArtifact):
|
||||
if self.indicators:
|
||||
ioc_match = self.indicators.check_app_id(result.get("package_name"))
|
||||
if ioc_match:
|
||||
result["matched_indicator"] = ioc_match.ioc
|
||||
self.alertstore.critical(
|
||||
self.get_slug(), ioc_match.message, "", result
|
||||
ioc_match.message, "", result, matched_indicator=ioc_match.ioc
|
||||
)
|
||||
continue
|
||||
|
||||
@@ -66,7 +65,6 @@ class DumpsysAppopsArtifact(AndroidArtifact):
|
||||
cleaned_result = result.copy()
|
||||
cleaned_result["permissions"] = [perm]
|
||||
self.alertstore.medium(
|
||||
self.get_slug(),
|
||||
f"Package '{result['package_name']}' had risky permission '{perm['name']}' set to '{entry['access']}' at {entry['timestamp']}",
|
||||
entry["timestamp"],
|
||||
cleaned_result,
|
||||
@@ -80,7 +78,6 @@ class DumpsysAppopsArtifact(AndroidArtifact):
|
||||
cleaned_result = result.copy()
|
||||
cleaned_result["permissions"] = [perm]
|
||||
self.alertstore.medium(
|
||||
self.get_slug(),
|
||||
f"Risky package '{result['package_name']}' had '{perm['name']}' permission set to '{entry['access']}' at {entry['timestamp']}",
|
||||
entry["timestamp"],
|
||||
cleaned_result,
|
||||
@@ -88,9 +85,9 @@ class DumpsysAppopsArtifact(AndroidArtifact):
|
||||
|
||||
def parse(self, output: str) -> None:
|
||||
# self.results: List[Dict[str, Any]] = []
|
||||
perm = {}
|
||||
package = {}
|
||||
entry = {}
|
||||
perm: dict[str, Any] = {}
|
||||
package: dict[str, Any] = {}
|
||||
entry: dict[str, Any] = {}
|
||||
uid = None
|
||||
in_packages = False
|
||||
|
||||
|
||||
@@ -3,9 +3,11 @@
|
||||
# Use of this software is governed by the MVT License 1.1 that can be found at
|
||||
# https://license.mvt.re/1.1/
|
||||
|
||||
from typing import Any
|
||||
|
||||
from mvt.common.module_types import ModuleAtomicResult, ModuleSerializedResult
|
||||
|
||||
from .artifact import AndroidArtifact
|
||||
from mvt.common.module_types import ModuleSerializedResult, ModuleAtomicResult
|
||||
|
||||
|
||||
class DumpsysBatteryDailyArtifact(AndroidArtifact):
|
||||
@@ -29,13 +31,14 @@ class DumpsysBatteryDailyArtifact(AndroidArtifact):
|
||||
for result in self.results:
|
||||
ioc_match = self.indicators.check_app_id(result["package_name"])
|
||||
if ioc_match:
|
||||
result["matched_indicator"] = ioc_match.ioc
|
||||
self.alertstore.critical(self.get_slug(), ioc_match.message, "", result)
|
||||
self.alertstore.critical(
|
||||
ioc_match.message, "", result, matched_indicator=ioc_match.ioc
|
||||
)
|
||||
continue
|
||||
|
||||
def parse(self, output: str) -> None:
|
||||
daily = None
|
||||
daily_updates = []
|
||||
daily_updates: list[dict[str, Any]] = []
|
||||
for line in output.splitlines():
|
||||
if line.startswith(" Daily from "):
|
||||
if len(daily_updates) > 0:
|
||||
|
||||
@@ -18,8 +18,9 @@ class DumpsysBatteryHistoryArtifact(AndroidArtifact):
|
||||
for result in self.results:
|
||||
ioc_match = self.indicators.check_app_id(result["package_name"])
|
||||
if ioc_match:
|
||||
result["matched_indicator"] = ioc_match.ioc
|
||||
self.alertstore.critical(self.get_slug(), ioc_match.message, "", result)
|
||||
self.alertstore.critical(
|
||||
ioc_match.message, "", result, matched_indicator=ioc_match.ioc
|
||||
)
|
||||
continue
|
||||
|
||||
def parse(self, data: str) -> None:
|
||||
|
||||
@@ -22,9 +22,8 @@ class DumpsysDBInfoArtifact(AndroidArtifact):
|
||||
for part in path.split("/"):
|
||||
ioc_match = self.indicators.check_app_id(part)
|
||||
if ioc_match:
|
||||
result["matched_indicator"] = ioc_match.ioc
|
||||
self.alertstore.critical(
|
||||
self.get_slug(), ioc_match.message, "", result
|
||||
ioc_match.message, "", result, matched_indicator=ioc_match.ioc
|
||||
)
|
||||
continue
|
||||
|
||||
|
||||
@@ -14,9 +14,8 @@ class DumpsysPackageActivitiesArtifact(AndroidArtifact):
|
||||
for activity in self.results:
|
||||
ioc_match = self.indicators.check_app_id(activity["package_name"])
|
||||
if ioc_match:
|
||||
activity["matched_indicator"] = ioc_match.ioc
|
||||
self.alertstore.critical(
|
||||
self.get_slug(), ioc_match.message, "", activity
|
||||
ioc_match.message, "", activity, matched_indicator=ioc_match.ioc
|
||||
)
|
||||
continue
|
||||
|
||||
|
||||
@@ -7,9 +7,9 @@ import re
|
||||
from typing import Any, Dict, List
|
||||
|
||||
from mvt.android.utils import ROOT_PACKAGES
|
||||
from mvt.common.module_types import ModuleAtomicResult, ModuleSerializedResult
|
||||
|
||||
from .artifact import AndroidArtifact
|
||||
from mvt.common.module_types import ModuleAtomicResult, ModuleSerializedResult
|
||||
|
||||
|
||||
class DumpsysPackagesArtifact(AndroidArtifact):
|
||||
@@ -18,7 +18,6 @@ class DumpsysPackagesArtifact(AndroidArtifact):
|
||||
# XXX: De-duplication Package detections
|
||||
if result["package_name"] in ROOT_PACKAGES:
|
||||
self.alertstore.medium(
|
||||
self.get_slug(),
|
||||
f'Found an installed package related to rooting/jailbreaking: "{result["package_name"]}"',
|
||||
"",
|
||||
result,
|
||||
@@ -31,8 +30,9 @@ class DumpsysPackagesArtifact(AndroidArtifact):
|
||||
|
||||
ioc_match = self.indicators.check_app_id(result.get("package_name", ""))
|
||||
if ioc_match:
|
||||
result["matched_indicator"] = ioc_match.ioc
|
||||
self.alertstore.critical(self.get_slug(), ioc_match.message, "", result)
|
||||
self.alertstore.critical(
|
||||
ioc_match.message, "", result, matched_indicator=ioc_match.ioc
|
||||
)
|
||||
self.alertstore.log_latest()
|
||||
|
||||
def serialize(self, record: ModuleAtomicResult) -> ModuleSerializedResult:
|
||||
@@ -63,15 +63,15 @@ class DumpsysPackagesArtifact(AndroidArtifact):
|
||||
"""
|
||||
Parse one entry of a dumpsys package information
|
||||
"""
|
||||
details = {
|
||||
details: Dict[str, Any] = {
|
||||
"uid": "",
|
||||
"version_name": "",
|
||||
"version_code": "",
|
||||
"timestamp": "",
|
||||
"first_install_time": "",
|
||||
"last_update_time": "",
|
||||
"permissions": [],
|
||||
"requested_permissions": [],
|
||||
"permissions": list(),
|
||||
"requested_permissions": list(),
|
||||
}
|
||||
in_install_permissions = False
|
||||
in_runtime_permissions = False
|
||||
@@ -149,7 +149,7 @@ class DumpsysPackagesArtifact(AndroidArtifact):
|
||||
results = []
|
||||
package_name = None
|
||||
package = {}
|
||||
lines = []
|
||||
lines: list[str] = []
|
||||
for line in output.splitlines():
|
||||
if line.startswith(" Package ["):
|
||||
if len(lines) > 0:
|
||||
|
||||
@@ -18,8 +18,9 @@ class DumpsysPlatformCompatArtifact(AndroidArtifact):
|
||||
for result in self.results:
|
||||
ioc_match = self.indicators.check_app_id(result["package_name"])
|
||||
if ioc_match:
|
||||
result["matched_indicator"] = ioc_match.ioc
|
||||
self.alertstore.critical(self.get_slug(), ioc_match.message, "", result)
|
||||
self.alertstore.critical(
|
||||
ioc_match.message, "", result, matched_indicator=ioc_match.ioc
|
||||
)
|
||||
continue
|
||||
|
||||
def parse(self, data: str) -> None:
|
||||
|
||||
@@ -52,14 +52,16 @@ class DumpsysReceiversArtifact(AndroidArtifact):
|
||||
|
||||
ioc_match = self.indicators.check_app_id(receiver["package_name"])
|
||||
if ioc_match:
|
||||
receiver["matched_indicator"] = ioc_match.ioc
|
||||
self.alertstore.critical(
|
||||
self.get_slug(), ioc_match.message, "", {intent: receiver}
|
||||
ioc_match.message,
|
||||
"",
|
||||
{intent: receiver},
|
||||
matched_indicator=ioc_match.ioc,
|
||||
)
|
||||
continue
|
||||
|
||||
def parse(self, output: str) -> None:
|
||||
self.results = {}
|
||||
self.results: dict[str, list[dict[str, str]]] = {}
|
||||
|
||||
in_receiver_resolver_table = False
|
||||
in_non_data_actions = False
|
||||
|
||||
@@ -39,10 +39,10 @@ class GetProp(AndroidArtifact):
|
||||
if not matches or len(matches[0]) != 2:
|
||||
continue
|
||||
|
||||
entry = {"name": matches[0][0], "value": matches[0][1]}
|
||||
self.results.append(entry)
|
||||
prop_entry = {"name": matches[0][0], "value": matches[0][1]}
|
||||
self.results.append(prop_entry)
|
||||
|
||||
def get_device_timezone(self) -> str:
|
||||
def get_device_timezone(self) -> str | None:
|
||||
"""
|
||||
Get the device timezone from the getprop results
|
||||
|
||||
@@ -60,7 +60,8 @@ class GetProp(AndroidArtifact):
|
||||
|
||||
if entry["name"] == "ro.build.version.security_patch":
|
||||
warning_message = warn_android_patch_level(entry["value"], self.log)
|
||||
self.alertstore.medium(self.get_slug(), warning_message, "", entry)
|
||||
if isinstance(warning_message, str):
|
||||
self.alertstore.medium(warning_message, "", entry)
|
||||
|
||||
if not self.indicators:
|
||||
return
|
||||
@@ -70,5 +71,6 @@ class GetProp(AndroidArtifact):
|
||||
result.get("name", "")
|
||||
)
|
||||
if ioc_match:
|
||||
result["matched_indicator"] = ioc_match.ioc
|
||||
self.alertstore.critical(self.get_slug(), ioc_match.message, "", result)
|
||||
self.alertstore.critical(
|
||||
ioc_match.message, "", result, matched_indicator=ioc_match.ioc
|
||||
)
|
||||
|
||||
197
src/mvt/android/artifacts/mounts.py
Normal file
197
src/mvt/android/artifacts/mounts.py
Normal file
@@ -0,0 +1,197 @@
|
||||
# Mobile Verification Toolkit (MVT)
|
||||
# Copyright (c) 2021-2023 The MVT Authors.
|
||||
# Use of this software is governed by the MVT License 1.1 that can be found at
|
||||
# https://license.mvt.re/1.1/
|
||||
|
||||
from typing import Any
|
||||
|
||||
from .artifact import AndroidArtifact
|
||||
|
||||
SUSPICIOUS_MOUNT_POINTS = [
|
||||
"/system",
|
||||
"/vendor",
|
||||
"/product",
|
||||
"/system_ext",
|
||||
]
|
||||
|
||||
SUSPICIOUS_OPTIONS = [
|
||||
"rw",
|
||||
"remount",
|
||||
"noatime",
|
||||
"nodiratime",
|
||||
]
|
||||
|
||||
ALLOWLIST_NOATIME = [
|
||||
"/system_dlkm",
|
||||
"/system_ext",
|
||||
"/product",
|
||||
"/vendor",
|
||||
"/vendor_dlkm",
|
||||
]
|
||||
|
||||
|
||||
class Mounts(AndroidArtifact):
|
||||
"""
|
||||
This artifact parses mount information from /proc/mounts or similar mount data.
|
||||
It can detect potentially suspicious mount configurations that may indicate
|
||||
a rooted or compromised device.
|
||||
"""
|
||||
|
||||
def parse(self, entry: str) -> None:
|
||||
"""
|
||||
Parse mount information from the provided entry.
|
||||
|
||||
Examples:
|
||||
/dev/block/bootdevice/by-name/system /system ext4 ro,seclabel,relatime 0 0
|
||||
/dev/block/dm-12 on / type ext4 (ro,seclabel,noatime)
|
||||
"""
|
||||
self.results: list[dict[str, Any]] = []
|
||||
|
||||
for line in entry.splitlines():
|
||||
line = line.strip()
|
||||
if not line:
|
||||
continue
|
||||
|
||||
device = None
|
||||
mount_point = None
|
||||
filesystem_type = None
|
||||
mount_options = ""
|
||||
|
||||
if " on " in line and " type " in line:
|
||||
try:
|
||||
# Format: device on mount_point type filesystem_type (options)
|
||||
device_part, rest = line.split(" on ", 1)
|
||||
device = device_part.strip()
|
||||
|
||||
# Split by 'type' to get mount_point and filesystem info
|
||||
mount_part, fs_part = rest.split(" type ", 1)
|
||||
mount_point = mount_part.strip()
|
||||
|
||||
# Parse filesystem and options
|
||||
if "(" in fs_part and fs_part.endswith(")"):
|
||||
# Format: filesystem_type (options)
|
||||
fs_and_opts = fs_part.strip()
|
||||
paren_idx = fs_and_opts.find("(")
|
||||
filesystem_type = fs_and_opts[:paren_idx].strip()
|
||||
mount_options = fs_and_opts[paren_idx + 1 : -1].strip()
|
||||
else:
|
||||
# No options in parentheses, just filesystem type
|
||||
filesystem_type = fs_part.strip()
|
||||
mount_options = ""
|
||||
|
||||
# Skip if we don't have essential info
|
||||
if not device or not mount_point or not filesystem_type:
|
||||
continue
|
||||
|
||||
# Parse options into list
|
||||
options_list = (
|
||||
[opt.strip() for opt in mount_options.split(",") if opt.strip()]
|
||||
if mount_options
|
||||
else []
|
||||
)
|
||||
|
||||
# Check if it's a system partition
|
||||
is_system_partition = mount_point in SUSPICIOUS_MOUNT_POINTS or any(
|
||||
mount_point.startswith(sp) for sp in SUSPICIOUS_MOUNT_POINTS
|
||||
)
|
||||
|
||||
# Check if it's mounted read-write
|
||||
is_read_write = "rw" in options_list
|
||||
|
||||
mount_entry = {
|
||||
"device": device,
|
||||
"mount_point": mount_point,
|
||||
"filesystem_type": filesystem_type,
|
||||
"mount_options": mount_options,
|
||||
"options_list": options_list,
|
||||
"is_system_partition": is_system_partition,
|
||||
"is_read_write": is_read_write,
|
||||
}
|
||||
|
||||
self.results.append(mount_entry)
|
||||
|
||||
except ValueError:
|
||||
# If parsing fails, skip this line
|
||||
continue
|
||||
else:
|
||||
# Skip lines that don't match expected format
|
||||
continue
|
||||
|
||||
def check_indicators(self) -> None:
|
||||
"""
|
||||
Check for suspicious mount configurations that may indicate root access
|
||||
or other security concerns.
|
||||
"""
|
||||
system_rw_mounts = []
|
||||
suspicious_mounts = []
|
||||
|
||||
for mount in self.results:
|
||||
mount_point = mount["mount_point"]
|
||||
options = mount["options_list"]
|
||||
|
||||
# Check for system partitions mounted as read-write
|
||||
if mount["is_system_partition"] and mount["is_read_write"]:
|
||||
system_rw_mounts.append(mount)
|
||||
if mount_point == "/system":
|
||||
self.alertstore.high(
|
||||
"Root detected /system partition is mounted as read-write (rw)",
|
||||
"",
|
||||
mount,
|
||||
)
|
||||
else:
|
||||
self.alertstore.high(
|
||||
f"System partition {mount_point} is mounted as read-write (rw). This may indicate system modifications.",
|
||||
"",
|
||||
mount,
|
||||
)
|
||||
|
||||
# Check for other suspicious mount options
|
||||
suspicious_opts = [opt for opt in options if opt in SUSPICIOUS_OPTIONS]
|
||||
if suspicious_opts and mount["is_system_partition"]:
|
||||
if (
|
||||
"noatime" in mount["mount_options"]
|
||||
and mount["mount_point"] in ALLOWLIST_NOATIME
|
||||
):
|
||||
continue
|
||||
suspicious_mounts.append(mount)
|
||||
self.alertstore.high(
|
||||
f"Suspicious mount options found for {mount_point}: {', '.join(suspicious_opts)}",
|
||||
"",
|
||||
mount,
|
||||
)
|
||||
|
||||
# Log interesting mount information
|
||||
if mount_point == "/data" or mount_point.startswith("/sdcard"):
|
||||
self.log.info(
|
||||
"Data partition: %s mounted as %s with options: %s",
|
||||
mount_point,
|
||||
mount["filesystem_type"],
|
||||
mount["mount_options"],
|
||||
)
|
||||
|
||||
self.log.info("Parsed %d mount entries", len(self.results))
|
||||
|
||||
# Check indicators if available
|
||||
if not self.indicators:
|
||||
return
|
||||
|
||||
for mount in self.results:
|
||||
# Check if any mount points match indicators
|
||||
ioc = self.indicators.check_file_path(mount.get("mount_point", ""))
|
||||
if ioc:
|
||||
self.alertstore.critical(
|
||||
f"Mount point matches indicator: {mount.get('mount_point', '')}",
|
||||
"",
|
||||
mount,
|
||||
matched_indicator=ioc,
|
||||
)
|
||||
|
||||
# Check device paths for indicators
|
||||
ioc = self.indicators.check_file_path(mount.get("device", ""))
|
||||
if ioc:
|
||||
self.alertstore.critical(
|
||||
f"Device path matches indicator: {mount.get('device', '')}",
|
||||
"",
|
||||
mount,
|
||||
matched_indicator=ioc,
|
||||
)
|
||||
@@ -60,11 +60,13 @@ class Processes(AndroidArtifact):
|
||||
|
||||
ioc_match = self.indicators.check_app_id(proc_name)
|
||||
if ioc_match:
|
||||
result["matched_indicator"] = ioc_match.ioc
|
||||
self.alertstore.critical(self.get_slug(), ioc_match.message, "", result)
|
||||
self.alertstore.critical(
|
||||
ioc_match.message, "", result, matched_indicator=ioc_match.ioc
|
||||
)
|
||||
continue
|
||||
|
||||
ioc_match = self.indicators.check_process(proc_name)
|
||||
if ioc_match:
|
||||
result["matched_indicator"] = ioc_match.ioc
|
||||
self.alertstore.critical(self.get_slug(), ioc_match.message, "", result)
|
||||
self.alertstore.critical(
|
||||
ioc_match.message, "", result, matched_indicator=ioc_match.ioc
|
||||
)
|
||||
|
||||
@@ -51,11 +51,6 @@ ANDROID_DANGEROUS_SETTINGS = [
|
||||
"key": "send_action_app_error",
|
||||
"safe_value": "1",
|
||||
},
|
||||
{
|
||||
"description": "enabled installation of non Google Play apps",
|
||||
"key": "install_non_market_apps",
|
||||
"safe_value": "0",
|
||||
},
|
||||
{
|
||||
"description": "enabled accessibility services",
|
||||
"key": "accessibility_enabled",
|
||||
|
||||
@@ -6,14 +6,15 @@
|
||||
import datetime
|
||||
from typing import List, Optional
|
||||
|
||||
import pydantic
|
||||
import betterproto
|
||||
import pydantic
|
||||
from dateutil import parser
|
||||
|
||||
from mvt.common.utils import convert_datetime_to_iso
|
||||
from mvt.common.module_types import ModuleAtomicResult, ModuleSerializedResult
|
||||
from mvt.android.parsers.proto.tombstone import Tombstone
|
||||
from .artifact import AndroidArtifact
|
||||
from mvt.common.module_types import ModuleAtomicResult, ModuleSerializedResult
|
||||
from mvt.common.utils import convert_datetime_to_iso
|
||||
|
||||
from .artifact import AndroidArtifact
|
||||
|
||||
TOMBSTONE_DELIMITER = "*** *** *** *** *** *** *** *** *** *** *** *** *** *** *** ***"
|
||||
|
||||
@@ -53,7 +54,7 @@ class TombstoneCrashResult(pydantic.BaseModel):
|
||||
file_name: str
|
||||
file_timestamp: str # We store the timestamp as a string to avoid timezone issues
|
||||
build_fingerprint: str
|
||||
revision: int
|
||||
revision: str
|
||||
arch: Optional[str] = None
|
||||
timestamp: str # We store the timestamp as a string to avoid timezone issues
|
||||
process_uptime: Optional[int] = None
|
||||
@@ -63,14 +64,14 @@ class TombstoneCrashResult(pydantic.BaseModel):
|
||||
process_name: Optional[str] = None
|
||||
binary_path: Optional[str] = None
|
||||
selinux_label: Optional[str] = None
|
||||
uid: Optional[int] = None
|
||||
uid: int
|
||||
signal_info: SignalInfo
|
||||
cause: Optional[str] = None
|
||||
extra: Optional[str] = None
|
||||
|
||||
|
||||
class TombstoneCrashArtifact(AndroidArtifact):
|
||||
""" "
|
||||
"""
|
||||
Parser for Android tombstone crash files.
|
||||
|
||||
This parser can parse both text and protobuf tombstone crash files.
|
||||
@@ -94,17 +95,18 @@ class TombstoneCrashArtifact(AndroidArtifact):
|
||||
for result in self.results:
|
||||
ioc_match = self.indicators.check_process(result["process_name"])
|
||||
if ioc_match:
|
||||
result["matched_indicator"] = ioc_match.ioc
|
||||
self.alertstore.critical(self.get_slug(), ioc_match.message, "", result)
|
||||
self.alertstore.critical(
|
||||
ioc_match.message, "", result, matched_indicator=ioc_match.ioc
|
||||
)
|
||||
continue
|
||||
|
||||
if result.get("command_line", []):
|
||||
command_name = result.get("command_line")[0].split("/")[-1]
|
||||
command_name = result["command_line"][0]
|
||||
ioc_match = self.indicators.check_process(command_name)
|
||||
if ioc_match:
|
||||
result["matched_indicator"] = ioc_match.ioc
|
||||
self.alertstore.critical(
|
||||
self.get_slug(), ioc_match.message, "", result
|
||||
ioc_match.message, "", result, matched_indicator=ioc_match.ioc
|
||||
)
|
||||
continue
|
||||
|
||||
@@ -115,7 +117,6 @@ class TombstoneCrashArtifact(AndroidArtifact):
|
||||
]
|
||||
if result["uid"] in SUSPICIOUS_UIDS:
|
||||
self.alertstore.medium(
|
||||
self.get_slug(),
|
||||
(
|
||||
f"Potentially suspicious crash in process '{result['process_name']}' "
|
||||
f"running as UID '{result['uid']}' in tombstone '{result['file_name']}' at {result['timestamp']}"
|
||||
@@ -127,11 +128,11 @@ class TombstoneCrashArtifact(AndroidArtifact):
|
||||
def parse_protobuf(
|
||||
self, file_name: str, file_timestamp: datetime.datetime, data: bytes
|
||||
) -> None:
|
||||
"""
|
||||
Parse Android tombstone crash files from a protobuf object.
|
||||
"""
|
||||
"""Parse Android tombstone crash files from a protobuf object."""
|
||||
tombstone_pb = Tombstone().parse(data)
|
||||
tombstone_dict = tombstone_pb.to_dict(betterproto.Casing.SNAKE)
|
||||
tombstone_dict = tombstone_pb.to_dict(
|
||||
betterproto.Casing.SNAKE, include_default_values=True
|
||||
)
|
||||
|
||||
# Add some extra metadata
|
||||
tombstone_dict["timestamp"] = self._parse_timestamp_string(
|
||||
@@ -148,21 +149,23 @@ class TombstoneCrashArtifact(AndroidArtifact):
|
||||
def parse(
|
||||
self, file_name: str, file_timestamp: datetime.datetime, content: bytes
|
||||
) -> None:
|
||||
"""
|
||||
Parse text Android tombstone crash files.
|
||||
"""
|
||||
|
||||
# Split the tombstone file into a dictonary
|
||||
"""Parse text Android tombstone crash files."""
|
||||
tombstone_dict = {
|
||||
"file_name": file_name,
|
||||
"file_timestamp": convert_datetime_to_iso(file_timestamp),
|
||||
}
|
||||
lines = content.decode("utf-8").splitlines()
|
||||
for line in lines:
|
||||
for line_num, line in enumerate(lines, 1):
|
||||
if not line.strip() or TOMBSTONE_DELIMITER in line:
|
||||
continue
|
||||
for key, destination_key in TOMBSTONE_TEXT_KEY_MAPPINGS.items():
|
||||
self._parse_tombstone_line(line, key, destination_key, tombstone_dict)
|
||||
try:
|
||||
for key, destination_key in TOMBSTONE_TEXT_KEY_MAPPINGS.items():
|
||||
if self._parse_tombstone_line(
|
||||
line, key, destination_key, tombstone_dict
|
||||
):
|
||||
break
|
||||
except Exception as e:
|
||||
raise ValueError(f"Error parsing line {line_num}: {str(e)}")
|
||||
|
||||
# Validate the tombstone and add it to the results
|
||||
tombstone = TombstoneCrashResult.model_validate(tombstone_dict)
|
||||
@@ -172,7 +175,7 @@ class TombstoneCrashArtifact(AndroidArtifact):
|
||||
self, line: str, key: str, destination_key: str, tombstone: dict
|
||||
) -> bool:
|
||||
if not line.startswith(f"{key}"):
|
||||
return None
|
||||
return False
|
||||
|
||||
if key == "pid":
|
||||
return self._load_pid_line(line, tombstone)
|
||||
@@ -191,7 +194,7 @@ class TombstoneCrashArtifact(AndroidArtifact):
|
||||
raise ValueError(f"Expected key {key}, got {line_key}")
|
||||
|
||||
value_clean = value.strip().strip("'")
|
||||
if destination_key in ["uid", "revision"]:
|
||||
if destination_key == "uid":
|
||||
tombstone[destination_key] = int(value_clean)
|
||||
elif destination_key == "process_uptime":
|
||||
# eg. "Process uptime: 40s"
|
||||
@@ -204,51 +207,50 @@ class TombstoneCrashArtifact(AndroidArtifact):
|
||||
return True
|
||||
|
||||
def _load_pid_line(self, line: str, tombstone: dict) -> bool:
|
||||
pid_part, tid_part, name_part = [part.strip() for part in line.split(",")]
|
||||
try:
|
||||
parts = line.split(" >>> ") if " >>> " in line else line.split(">>>")
|
||||
process_info = parts[0]
|
||||
|
||||
pid_key, pid_value = pid_part.split(":", 1)
|
||||
if pid_key != "pid":
|
||||
raise ValueError(f"Expected key pid, got {pid_key}")
|
||||
pid_value = int(pid_value.strip())
|
||||
# Parse pid, tid, name from process info
|
||||
info_parts = [p.strip() for p in process_info.split(",")]
|
||||
for info in info_parts:
|
||||
key, value = info.split(":", 1)
|
||||
key = key.strip()
|
||||
value = value.strip()
|
||||
|
||||
tid_key, tid_value = tid_part.split(":", 1)
|
||||
if tid_key != "tid":
|
||||
raise ValueError(f"Expected key tid, got {tid_key}")
|
||||
tid_value = int(tid_value.strip())
|
||||
if key == "pid":
|
||||
tombstone["pid"] = int(value)
|
||||
elif key == "tid":
|
||||
tombstone["tid"] = int(value)
|
||||
elif key == "name":
|
||||
tombstone["process_name"] = value
|
||||
|
||||
name_key, name_value = name_part.split(":", 1)
|
||||
if name_key != "name":
|
||||
raise ValueError(f"Expected key name, got {name_key}")
|
||||
name_value = name_value.strip()
|
||||
process_name, binary_path = self._parse_process_name(name_value, tombstone)
|
||||
# Extract binary path if it exists
|
||||
if len(parts) > 1:
|
||||
tombstone["binary_path"] = parts[1].strip().rstrip(" <")
|
||||
|
||||
tombstone["pid"] = pid_value
|
||||
tombstone["tid"] = tid_value
|
||||
tombstone["process_name"] = process_name
|
||||
tombstone["binary_path"] = binary_path
|
||||
return True
|
||||
return True
|
||||
|
||||
def _parse_process_name(self, process_name_part, tombstone: dict) -> bool:
|
||||
process_name, process_path = process_name_part.split(">>>")
|
||||
process_name = process_name.strip()
|
||||
binary_path = process_path.strip().split(" ")[0]
|
||||
return process_name, binary_path
|
||||
except Exception as e:
|
||||
raise ValueError(f"Failed to parse PID line: {str(e)}")
|
||||
|
||||
def _load_signal_line(self, line: str, tombstone: dict) -> bool:
|
||||
signal, code, _ = [part.strip() for part in line.split(",", 2)]
|
||||
signal = signal.split("signal ")[1]
|
||||
signal_code, signal_name = signal.split(" ")
|
||||
signal_name = signal_name.strip("()")
|
||||
signal_part, code_part = map(str.strip, line.split(",")[:2])
|
||||
|
||||
code_part = code.split("code ")[1]
|
||||
code_number, code_name = code_part.split(" ")
|
||||
code_name = code_name.strip("()")
|
||||
def parse_part(part: str, prefix: str) -> tuple[int, str]:
|
||||
match = part.split(prefix)[1]
|
||||
number = int(match.split()[0])
|
||||
name = match.split("(")[1].split(")")[0] if "(" in match else "UNKNOWN"
|
||||
return number, name
|
||||
|
||||
signal_number, signal_name = parse_part(signal_part, "signal ")
|
||||
code_number, code_name = parse_part(code_part, "code ")
|
||||
|
||||
tombstone["signal_info"] = {
|
||||
"code": int(code_number),
|
||||
"code": code_number,
|
||||
"code_name": code_name,
|
||||
"name": signal_name,
|
||||
"number": int(signal_code),
|
||||
"number": signal_number,
|
||||
}
|
||||
return True
|
||||
|
||||
@@ -259,13 +261,7 @@ class TombstoneCrashArtifact(AndroidArtifact):
|
||||
|
||||
@staticmethod
|
||||
def _parse_timestamp_string(timestamp: str) -> str:
|
||||
timestamp_date, timezone = timestamp.split("+")
|
||||
# Truncate microseconds before parsing
|
||||
timestamp_without_micro = timestamp_date.split(".")[0] + "+" + timezone
|
||||
timestamp_parsed = datetime.datetime.strptime(
|
||||
timestamp_without_micro, "%Y-%m-%d %H:%M:%S%z"
|
||||
)
|
||||
|
||||
timestamp_parsed = parser.parse(timestamp)
|
||||
# HACK: Swap the local timestamp to UTC, so keep the original time and avoid timezone conversion.
|
||||
local_timestamp = timestamp_parsed.replace(tzinfo=datetime.timezone.utc)
|
||||
return convert_datetime_to_iso(local_timestamp)
|
||||
|
||||
@@ -9,35 +9,36 @@ import click
|
||||
|
||||
from mvt.common.cmd_check_iocs import CmdCheckIOCS
|
||||
from mvt.common.help import (
|
||||
HELP_MSG_VERSION,
|
||||
HELP_MSG_OUTPUT,
|
||||
HELP_MSG_VERBOSE,
|
||||
HELP_MSG_ANDROID_BACKUP_PASSWORD,
|
||||
HELP_MSG_CHECK_ADB_REMOVED,
|
||||
HELP_MSG_CHECK_ADB_REMOVED_DESCRIPTION,
|
||||
HELP_MSG_CHECK_ANDROID_BACKUP,
|
||||
HELP_MSG_CHECK_ANDROIDQF,
|
||||
HELP_MSG_CHECK_BUGREPORT,
|
||||
HELP_MSG_CHECK_IOCS,
|
||||
HELP_MSG_DISABLE_INDICATOR_UPDATE_CHECK,
|
||||
HELP_MSG_DISABLE_UPDATE_CHECK,
|
||||
HELP_MSG_HASHES,
|
||||
HELP_MSG_IOC,
|
||||
HELP_MSG_LIST_MODULES,
|
||||
HELP_MSG_MODULE,
|
||||
HELP_MSG_NONINTERACTIVE,
|
||||
HELP_MSG_ANDROID_BACKUP_PASSWORD,
|
||||
HELP_MSG_CHECK_ADB_REMOVED,
|
||||
HELP_MSG_CHECK_ADB_REMOVED_DESCRIPTION,
|
||||
HELP_MSG_CHECK_BUGREPORT,
|
||||
HELP_MSG_CHECK_ANDROID_BACKUP,
|
||||
HELP_MSG_CHECK_ANDROIDQF,
|
||||
HELP_MSG_HASHES,
|
||||
HELP_MSG_CHECK_IOCS,
|
||||
HELP_MSG_OUTPUT,
|
||||
HELP_MSG_STIX2,
|
||||
HELP_MSG_VERBOSE,
|
||||
HELP_MSG_VERSION,
|
||||
)
|
||||
from mvt.common.logo import logo
|
||||
from mvt.common.updates import IndicatorsUpdates
|
||||
from mvt.common.utils import init_logging, set_verbose_logging
|
||||
|
||||
|
||||
from .cmd_check_androidqf import CmdAndroidCheckAndroidQF
|
||||
from .cmd_check_backup import CmdAndroidCheckBackup
|
||||
from .cmd_check_bugreport import CmdAndroidCheckBugreport
|
||||
from .modules.androidqf import ANDROIDQF_MODULES
|
||||
from .modules.backup import BACKUP_MODULES
|
||||
from .modules.backup.helpers import cli_load_android_backup_password
|
||||
from .modules.bugreport import BUGREPORT_MODULES
|
||||
from .modules.androidqf import ANDROIDQF_MODULES
|
||||
|
||||
init_logging()
|
||||
log = logging.getLogger("mvt")
|
||||
@@ -45,12 +46,37 @@ log = logging.getLogger("mvt")
|
||||
CONTEXT_SETTINGS = dict(help_option_names=["-h", "--help"])
|
||||
|
||||
|
||||
def _get_disable_flags(ctx):
|
||||
"""Helper function to safely get disable flags from context."""
|
||||
if ctx.obj is None:
|
||||
return False, False
|
||||
return (
|
||||
ctx.obj.get("disable_version_check", False),
|
||||
ctx.obj.get("disable_indicator_check", False),
|
||||
)
|
||||
|
||||
|
||||
# ==============================================================================
|
||||
# Main
|
||||
# ==============================================================================
|
||||
@click.group(invoke_without_command=False)
|
||||
def cli():
|
||||
logo()
|
||||
@click.option(
|
||||
"--disable-update-check", is_flag=True, help=HELP_MSG_DISABLE_UPDATE_CHECK
|
||||
)
|
||||
@click.option(
|
||||
"--disable-indicator-update-check",
|
||||
is_flag=True,
|
||||
help=HELP_MSG_DISABLE_INDICATOR_UPDATE_CHECK,
|
||||
)
|
||||
@click.pass_context
|
||||
def cli(ctx, disable_update_check, disable_indicator_update_check):
|
||||
ctx.ensure_object(dict)
|
||||
ctx.obj["disable_version_check"] = disable_update_check
|
||||
ctx.obj["disable_indicator_check"] = disable_indicator_update_check
|
||||
logo(
|
||||
disable_version_check=disable_update_check,
|
||||
disable_indicator_check=disable_indicator_update_check,
|
||||
)
|
||||
|
||||
|
||||
# ==============================================================================
|
||||
@@ -101,6 +127,8 @@ def check_bugreport(ctx, iocs, output, list_modules, module, verbose, bugreport_
|
||||
ioc_files=iocs,
|
||||
module_name=module,
|
||||
hashes=True,
|
||||
disable_version_check=_get_disable_flags(ctx)[0],
|
||||
disable_indicator_check=_get_disable_flags(ctx)[1],
|
||||
)
|
||||
|
||||
if list_modules:
|
||||
@@ -159,6 +187,8 @@ def check_backup(
|
||||
"interactive": not non_interactive,
|
||||
"backup_password": cli_load_android_backup_password(log, backup_password),
|
||||
},
|
||||
disable_version_check=_get_disable_flags(ctx)[0],
|
||||
disable_indicator_check=_get_disable_flags(ctx)[1],
|
||||
)
|
||||
|
||||
if list_modules:
|
||||
@@ -219,6 +249,8 @@ def check_androidqf(
|
||||
"interactive": not non_interactive,
|
||||
"backup_password": cli_load_android_backup_password(log, backup_password),
|
||||
},
|
||||
disable_version_check=_get_disable_flags(ctx)[0],
|
||||
disable_indicator_check=_get_disable_flags(ctx)[1],
|
||||
)
|
||||
|
||||
if list_modules:
|
||||
|
||||
@@ -9,12 +9,11 @@ import zipfile
|
||||
from pathlib import Path
|
||||
from typing import List, Optional
|
||||
|
||||
from mvt.android.cmd_check_backup import CmdAndroidCheckBackup
|
||||
from mvt.android.cmd_check_bugreport import CmdAndroidCheckBugreport
|
||||
from mvt.common.command import Command
|
||||
from mvt.common.indicators import Indicators
|
||||
|
||||
from mvt.android.cmd_check_bugreport import CmdAndroidCheckBugreport
|
||||
from mvt.android.cmd_check_backup import CmdAndroidCheckBackup
|
||||
|
||||
from .modules.androidqf import ANDROIDQF_MODULES
|
||||
from .modules.androidqf.base import AndroidQFModule
|
||||
|
||||
@@ -45,6 +44,8 @@ class CmdAndroidCheckAndroidQF(Command):
|
||||
module_options: Optional[dict] = None,
|
||||
hashes: Optional[bool] = False,
|
||||
sub_command: Optional[bool] = False,
|
||||
disable_version_check: bool = False,
|
||||
disable_indicator_check: bool = False,
|
||||
) -> None:
|
||||
super().__init__(
|
||||
target_path=target_path,
|
||||
@@ -57,6 +58,8 @@ class CmdAndroidCheckAndroidQF(Command):
|
||||
hashes=hashes,
|
||||
sub_command=sub_command,
|
||||
log=log,
|
||||
disable_version_check=disable_version_check,
|
||||
disable_indicator_check=disable_indicator_check,
|
||||
)
|
||||
|
||||
self.name = "check-androidqf"
|
||||
@@ -137,6 +140,7 @@ class CmdAndroidCheckAndroidQF(Command):
|
||||
raise NoAndroidQFBackup
|
||||
|
||||
def run_bugreport_cmd(self) -> bool:
|
||||
bugreport = None
|
||||
try:
|
||||
bugreport = self.load_bugreport()
|
||||
except NoAndroidQFBugReport:
|
||||
@@ -163,6 +167,8 @@ class CmdAndroidCheckAndroidQF(Command):
|
||||
if bugreport:
|
||||
bugreport.close()
|
||||
|
||||
return True
|
||||
|
||||
def run_backup_cmd(self) -> bool:
|
||||
try:
|
||||
backup = self.load_backup()
|
||||
@@ -171,25 +177,22 @@ class CmdAndroidCheckAndroidQF(Command):
|
||||
"Skipping backup modules as no backup.ab found in AndroidQF data."
|
||||
)
|
||||
return False
|
||||
else:
|
||||
cmd = CmdAndroidCheckBackup(
|
||||
target_path=None,
|
||||
results_path=self.results_path,
|
||||
ioc_files=self.ioc_files,
|
||||
iocs=self.iocs,
|
||||
module_options=self.module_options,
|
||||
hashes=self.hashes,
|
||||
sub_command=True,
|
||||
)
|
||||
cmd.from_ab(backup)
|
||||
cmd.run()
|
||||
|
||||
self.timeline.extend(cmd.timeline)
|
||||
self.alertstore.extend(cmd.alertstore.alerts)
|
||||
finally:
|
||||
if backup:
|
||||
backup.close()
|
||||
cmd = CmdAndroidCheckBackup(
|
||||
target_path=None,
|
||||
results_path=self.results_path,
|
||||
ioc_files=self.ioc_files,
|
||||
iocs=self.iocs,
|
||||
module_options=self.module_options,
|
||||
hashes=self.hashes,
|
||||
sub_command=True,
|
||||
)
|
||||
cmd.from_ab(backup)
|
||||
cmd.run()
|
||||
|
||||
self.timeline.extend(cmd.timeline)
|
||||
self.alertstore.extend(cmd.alertstore.alerts)
|
||||
return True
|
||||
|
||||
def finish(self) -> None:
|
||||
"""
|
||||
|
||||
@@ -39,6 +39,8 @@ class CmdAndroidCheckBackup(Command):
|
||||
module_options: Optional[dict] = None,
|
||||
hashes: Optional[bool] = False,
|
||||
sub_command: Optional[bool] = False,
|
||||
disable_version_check: bool = False,
|
||||
disable_indicator_check: bool = False,
|
||||
) -> None:
|
||||
super().__init__(
|
||||
target_path=target_path,
|
||||
@@ -51,6 +53,8 @@ class CmdAndroidCheckBackup(Command):
|
||||
hashes=hashes,
|
||||
sub_command=sub_command,
|
||||
log=log,
|
||||
disable_version_check=disable_version_check,
|
||||
disable_indicator_check=disable_indicator_check,
|
||||
)
|
||||
|
||||
self.name = "check-backup"
|
||||
@@ -89,22 +93,28 @@ class CmdAndroidCheckBackup(Command):
|
||||
self.__files.append(member.name)
|
||||
|
||||
def init(self) -> None:
|
||||
if not self.target_path:
|
||||
if not self.target_path: # type: ignore[has-type]
|
||||
return
|
||||
|
||||
if os.path.isfile(self.target_path):
|
||||
# Type guard: we know it's not None here after the check above
|
||||
assert self.target_path is not None # type: ignore[has-type]
|
||||
# Use a different local variable name to avoid any scoping issues
|
||||
backup_path: str = self.target_path # type: ignore[has-type]
|
||||
|
||||
if os.path.isfile(backup_path):
|
||||
self.__type = "ab"
|
||||
with open(self.target_path, "rb") as handle:
|
||||
with open(backup_path, "rb") as handle:
|
||||
ab_file_bytes = handle.read()
|
||||
self.from_ab(ab_file_bytes)
|
||||
|
||||
elif os.path.isdir(self.target_path):
|
||||
elif os.path.isdir(backup_path):
|
||||
self.__type = "folder"
|
||||
self.target_path = Path(self.target_path).absolute().as_posix()
|
||||
for root, subdirs, subfiles in os.walk(os.path.abspath(self.target_path)):
|
||||
backup_path = Path(backup_path).absolute().as_posix()
|
||||
self.target_path = backup_path
|
||||
for root, subdirs, subfiles in os.walk(os.path.abspath(backup_path)):
|
||||
for fname in subfiles:
|
||||
self.__files.append(
|
||||
os.path.relpath(os.path.join(root, fname), self.target_path)
|
||||
os.path.relpath(os.path.join(root, fname), backup_path)
|
||||
)
|
||||
else:
|
||||
log.critical(
|
||||
|
||||
@@ -30,6 +30,8 @@ class CmdAndroidCheckBugreport(Command):
|
||||
module_options: Optional[dict] = None,
|
||||
hashes: Optional[bool] = False,
|
||||
sub_command: Optional[bool] = False,
|
||||
disable_version_check: bool = False,
|
||||
disable_indicator_check: bool = False,
|
||||
) -> None:
|
||||
super().__init__(
|
||||
target_path=target_path,
|
||||
@@ -42,6 +44,8 @@ class CmdAndroidCheckBugreport(Command):
|
||||
hashes=hashes,
|
||||
sub_command=sub_command,
|
||||
log=log,
|
||||
disable_version_check=disable_version_check,
|
||||
disable_indicator_check=disable_indicator_check,
|
||||
)
|
||||
|
||||
self.name = "check-bugreport"
|
||||
@@ -92,6 +96,8 @@ class CmdAndroidCheckBugreport(Command):
|
||||
if self.__format == "zip":
|
||||
module.from_zip(self.__zip, self.__files)
|
||||
else:
|
||||
if not self.target_path:
|
||||
raise ValueError("target_path is not set")
|
||||
module.from_dir(self.target_path, self.__files)
|
||||
|
||||
def finish(self) -> None:
|
||||
|
||||
@@ -8,12 +8,12 @@ import os
|
||||
import sqlite3
|
||||
from typing import Optional
|
||||
|
||||
from mvt.common.utils import convert_chrometime_to_datetime, convert_datetime_to_iso
|
||||
from mvt.common.module_types import (
|
||||
ModuleAtomicResult,
|
||||
ModuleSerializedResult,
|
||||
ModuleResults,
|
||||
ModuleSerializedResult,
|
||||
)
|
||||
from mvt.common.utils import convert_chrometime_to_datetime, convert_datetime_to_iso
|
||||
|
||||
from .base import AndroidExtraction
|
||||
|
||||
@@ -40,7 +40,7 @@ class ChromeHistory(AndroidExtraction):
|
||||
log=log,
|
||||
results=results,
|
||||
)
|
||||
self.results = []
|
||||
self.results: list = []
|
||||
|
||||
def serialize(self, record: ModuleAtomicResult) -> ModuleSerializedResult:
|
||||
return {
|
||||
@@ -58,8 +58,9 @@ class ChromeHistory(AndroidExtraction):
|
||||
for result in self.results:
|
||||
ioc_match = self.indicators.check_url(result["url"])
|
||||
if ioc_match:
|
||||
result["matched_indicator"] = ioc_match.ioc
|
||||
self.alertstore.critical(self.get_slug(), ioc_match.message, "", result)
|
||||
self.alertstore.critical(
|
||||
ioc_match.message, "", result, matched_indicator=ioc_match.ioc
|
||||
)
|
||||
|
||||
def _parse_db(self, db_path: str) -> None:
|
||||
"""Parse a Chrome History database file.
|
||||
|
||||
@@ -8,8 +8,8 @@ import os
|
||||
import stat
|
||||
from typing import Optional, Union
|
||||
|
||||
from mvt.common.utils import convert_unix_to_iso
|
||||
from mvt.common.module_types import ModuleResults
|
||||
from mvt.common.utils import convert_unix_to_iso
|
||||
|
||||
from .base import AndroidExtraction
|
||||
|
||||
@@ -64,11 +64,15 @@ class Files(AndroidExtraction):
|
||||
result["path"],
|
||||
)
|
||||
|
||||
if self.indicators and self.indicators.check_file_path(result["path"]):
|
||||
self.log.warning(
|
||||
'Found a known suspicous file at path: "%s"', result["path"]
|
||||
)
|
||||
self.detected.append(result)
|
||||
if self.indicators:
|
||||
ioc_match = self.indicators.check_file_path(result["path"])
|
||||
if ioc_match:
|
||||
self.alertstore.critical(
|
||||
f'Found a known suspicious file at path: "{result["path"]}"',
|
||||
"",
|
||||
result,
|
||||
matched_indicator=ioc_match,
|
||||
)
|
||||
|
||||
def backup_file(self, file_path: str) -> None:
|
||||
if not self.results_path:
|
||||
|
||||
@@ -94,17 +94,17 @@ class Packages(AndroidExtraction):
|
||||
if not self.indicators:
|
||||
continue
|
||||
|
||||
ioc_match = self.indicators.check_app_id(result.get("package_name"))
|
||||
ioc_match = self.indicators.check_app_id(result["package_name"])
|
||||
if ioc_match:
|
||||
result["matched_indicator"] = ioc_match.ioc
|
||||
self.alertstore.critical(self.get_slug(), ioc_match.message, "", result)
|
||||
self.alertstore.critical(
|
||||
ioc_match.message, "", result, matched_indicator=ioc_match.ioc
|
||||
)
|
||||
|
||||
for package_file in result.get("files", []):
|
||||
ioc_match = self.indicators.check_file_hash(package_file["sha256"])
|
||||
if ioc_match:
|
||||
result["matched_indicator"] = ioc_match.ioc
|
||||
self.alertstore.critical(
|
||||
self.get_slug(), ioc_match.message, "", result
|
||||
ioc_match.message, "", result, matched_indicator=ioc_match.ioc
|
||||
)
|
||||
|
||||
# @staticmethod
|
||||
|
||||
@@ -6,9 +6,10 @@
|
||||
import logging
|
||||
from typing import Optional
|
||||
|
||||
from .base import AndroidExtraction
|
||||
from mvt.common.module_types import ModuleResults
|
||||
|
||||
from .base import AndroidExtraction
|
||||
|
||||
|
||||
class RootBinaries(AndroidExtraction):
|
||||
"""This module extracts the list of installed packages."""
|
||||
@@ -33,8 +34,11 @@ class RootBinaries(AndroidExtraction):
|
||||
|
||||
def check_indicators(self) -> None:
|
||||
for root_binary in self.results:
|
||||
self.detected.append(root_binary)
|
||||
self.log.warning('Found root binary "%s"', root_binary)
|
||||
self.alertstore.high(
|
||||
f'Found root binary "{root_binary}"',
|
||||
"",
|
||||
root_binary,
|
||||
)
|
||||
|
||||
def run(self) -> None:
|
||||
root_binaries = [
|
||||
|
||||
@@ -6,9 +6,10 @@
|
||||
import logging
|
||||
from typing import Optional
|
||||
|
||||
from .base import AndroidExtraction
|
||||
from mvt.common.module_types import ModuleResults
|
||||
|
||||
from .base import AndroidExtraction
|
||||
|
||||
|
||||
class SELinuxStatus(AndroidExtraction):
|
||||
"""This module checks if SELinux is being enforced."""
|
||||
@@ -33,7 +34,7 @@ class SELinuxStatus(AndroidExtraction):
|
||||
results=results,
|
||||
)
|
||||
|
||||
self.results = {} if not results else results
|
||||
self.results: dict = {}
|
||||
|
||||
def run(self) -> None:
|
||||
self._adb_connect()
|
||||
|
||||
@@ -10,12 +10,12 @@ from typing import Optional
|
||||
|
||||
from mvt.android.parsers.backup import AndroidBackupParsingError, parse_tar_for_sms
|
||||
from mvt.common.module import InsufficientPrivileges
|
||||
from mvt.common.utils import check_for_links, convert_unix_to_iso
|
||||
from mvt.common.module_types import (
|
||||
ModuleAtomicResult,
|
||||
ModuleResults,
|
||||
ModuleSerializedResult,
|
||||
)
|
||||
from mvt.common.utils import check_for_links, convert_unix_to_iso
|
||||
|
||||
from .base import AndroidExtraction
|
||||
|
||||
@@ -92,9 +92,8 @@ class SMS(AndroidExtraction):
|
||||
|
||||
ioc_match = self.indicators.check_urls(message_links)
|
||||
if ioc_match:
|
||||
message["matched_indicator"] = ioc_match.ioc
|
||||
self.alertstore.critical(
|
||||
self.get_slug(), ioc_match.message, "", message
|
||||
ioc_match.message, "", message, matched_indicator=ioc_match.ioc
|
||||
)
|
||||
|
||||
def _parse_db(self, db_path: str) -> None:
|
||||
|
||||
@@ -9,14 +9,14 @@ import os
|
||||
import sqlite3
|
||||
from typing import Optional
|
||||
|
||||
from mvt.common.module_types import (
|
||||
ModuleAtomicResult,
|
||||
ModuleResults,
|
||||
ModuleSerializedResult,
|
||||
)
|
||||
from mvt.common.utils import check_for_links, convert_unix_to_iso
|
||||
|
||||
from .base import AndroidExtraction
|
||||
from mvt.common.module_types import (
|
||||
ModuleAtomicResult,
|
||||
ModuleSerializedResult,
|
||||
ModuleResults,
|
||||
)
|
||||
|
||||
WHATSAPP_PATH = "data/data/com.whatsapp/databases/msgstore.db"
|
||||
|
||||
@@ -60,8 +60,11 @@ class Whatsapp(AndroidExtraction):
|
||||
continue
|
||||
|
||||
message_links = check_for_links(message["data"])
|
||||
if self.indicators.check_urls(message_links):
|
||||
self.detected.append(message)
|
||||
ioc_match = self.indicators.check_urls(message_links)
|
||||
if ioc_match:
|
||||
self.alertstore.critical(
|
||||
ioc_match.message, "", message, matched_indicator=ioc_match.ioc
|
||||
)
|
||||
continue
|
||||
|
||||
def _parse_db(self, db_path: str) -> None:
|
||||
|
||||
@@ -3,11 +3,13 @@
|
||||
# Use of this software is governed by the MVT License 1.1 that can be found at
|
||||
# https://license.mvt.re/1.1/
|
||||
|
||||
from .aqf_files import AQFFiles
|
||||
from .aqf_getprop import AQFGetProp
|
||||
from .aqf_packages import AQFPackages
|
||||
from .aqf_processes import AQFProcesses
|
||||
from .aqf_settings import AQFSettings
|
||||
from .aqf_files import AQFFiles
|
||||
from .mounts import Mounts
|
||||
from .root_binaries import RootBinaries
|
||||
from .sms import SMS
|
||||
|
||||
ANDROIDQF_MODULES = [
|
||||
@@ -17,4 +19,6 @@ ANDROIDQF_MODULES = [
|
||||
AQFSettings,
|
||||
AQFFiles,
|
||||
SMS,
|
||||
RootBinaries,
|
||||
Mounts,
|
||||
]
|
||||
|
||||
@@ -15,8 +15,8 @@ from typing import Optional
|
||||
|
||||
from mvt.android.modules.androidqf.base import AndroidQFModule
|
||||
from mvt.common.module_types import (
|
||||
ModuleResults,
|
||||
ModuleAtomicResult,
|
||||
ModuleResults,
|
||||
ModuleSerializedResult,
|
||||
)
|
||||
from mvt.common.utils import convert_datetime_to_iso
|
||||
@@ -89,8 +89,9 @@ class AQFFiles(AndroidQFModule):
|
||||
for result in self.results:
|
||||
ioc_match = self.indicators.check_file_path(result["path"])
|
||||
if ioc_match:
|
||||
result["matched_indicator"] = ioc_match.ioc
|
||||
self.alertstore.critical(self.get_slug(), ioc_match.message, "", result)
|
||||
self.alertstore.critical(
|
||||
ioc_match.message, "", result, matched_indicator=ioc_match.ioc
|
||||
)
|
||||
self.alertstore.log_latest()
|
||||
continue
|
||||
|
||||
@@ -105,16 +106,17 @@ class AQFFiles(AndroidQFModule):
|
||||
file_type = "executable "
|
||||
|
||||
msg = f'Found {file_type}file at suspicious path "{result["path"]}"'
|
||||
self.alertstore.high(self.get_slug(), msg, "", result)
|
||||
self.alertstore.high(msg, "", result)
|
||||
self.alertstore.log_latest()
|
||||
|
||||
if result.get("sha256", "") == "":
|
||||
continue
|
||||
|
||||
ioc_match = self.indicators.check_file_hash(result["sha256"])
|
||||
ioc_match = self.indicators.check_file_hash(result.get("sha256") or "")
|
||||
if ioc_match:
|
||||
result["matched_indicator"] = ioc_match.ioc
|
||||
self.alertstore.critical(self.get_slug(), ioc_match.message, "", result)
|
||||
self.alertstore.critical(
|
||||
ioc_match.message, "", result, matched_indicator=ioc_match.ioc
|
||||
)
|
||||
|
||||
# TODO: adds SHA1 and MD5 when available in MVT
|
||||
|
||||
|
||||
@@ -7,9 +7,9 @@ import logging
|
||||
from typing import Optional
|
||||
|
||||
from mvt.android.artifacts.getprop import GetProp as GetPropArtifact
|
||||
from mvt.common.module_types import ModuleResults
|
||||
|
||||
from .base import AndroidQFModule
|
||||
from mvt.common.module_types import ModuleResults
|
||||
|
||||
|
||||
class AQFGetProp(GetPropArtifact, AndroidQFModule):
|
||||
@@ -32,7 +32,7 @@ class AQFGetProp(GetPropArtifact, AndroidQFModule):
|
||||
log=log,
|
||||
results=results,
|
||||
)
|
||||
self.results = []
|
||||
self.results: list = []
|
||||
|
||||
def run(self) -> None:
|
||||
getprop_files = self._get_files_by_pattern("*/getprop.txt")
|
||||
|
||||
@@ -3,15 +3,16 @@
|
||||
# Use of this software is governed by the MVT License 1.1 that can be found at
|
||||
# https://license.mvt.re/1.1/
|
||||
|
||||
import os
|
||||
import datetime
|
||||
import logging
|
||||
import os
|
||||
from typing import Optional
|
||||
|
||||
from mvt.common.utils import convert_datetime_to_iso
|
||||
from mvt.common.module_types import ModuleResults
|
||||
from .base import AndroidQFModule
|
||||
from mvt.android.artifacts.file_timestamps import FileTimestampsArtifact
|
||||
from mvt.common.module_types import ModuleResults
|
||||
from mvt.common.utils import convert_datetime_to_iso
|
||||
|
||||
from .base import AndroidQFModule
|
||||
|
||||
|
||||
class AQFLogTimestamps(FileTimestampsArtifact, AndroidQFModule):
|
||||
@@ -37,11 +38,13 @@ class AQFLogTimestamps(FileTimestampsArtifact, AndroidQFModule):
|
||||
results=results,
|
||||
)
|
||||
|
||||
def _get_file_modification_time(self, file_path: str) -> dict:
|
||||
def _get_file_modification_time(self, file_path: str) -> datetime.datetime:
|
||||
if self.archive:
|
||||
file_timetuple = self.archive.getinfo(file_path).date_time
|
||||
return datetime.datetime(*file_timetuple)
|
||||
else:
|
||||
if not self.parent_path:
|
||||
raise ValueError("parent_path is not set")
|
||||
file_stat = os.stat(os.path.join(self.parent_path, file_path))
|
||||
return datetime.datetime.fromtimestamp(file_stat.st_mtime)
|
||||
|
||||
|
||||
@@ -11,13 +11,13 @@ from mvt.android.utils import (
|
||||
BROWSER_INSTALLERS,
|
||||
PLAY_STORE_INSTALLERS,
|
||||
ROOT_PACKAGES,
|
||||
THIRD_PARTY_STORE_INSTALLERS,
|
||||
SECURITY_PACKAGES,
|
||||
SYSTEM_UPDATE_PACKAGES,
|
||||
THIRD_PARTY_STORE_INSTALLERS,
|
||||
)
|
||||
from mvt.common.module_types import ModuleResults
|
||||
|
||||
from .base import AndroidQFModule
|
||||
from mvt.common.module_types import ModuleResults
|
||||
|
||||
|
||||
class AQFPackages(AndroidQFModule):
|
||||
@@ -45,7 +45,6 @@ class AQFPackages(AndroidQFModule):
|
||||
for result in self.results:
|
||||
if result["name"] in ROOT_PACKAGES:
|
||||
self.alertstore.medium(
|
||||
self.get_slug(),
|
||||
f'Found an installed package related to rooting/jailbreaking: "{result["name"]}"',
|
||||
"",
|
||||
result,
|
||||
@@ -56,7 +55,6 @@ class AQFPackages(AndroidQFModule):
|
||||
# Detections for apps installed via unusual methods.
|
||||
if result["installer"] in THIRD_PARTY_STORE_INSTALLERS:
|
||||
self.alertstore.info(
|
||||
self.get_slug(),
|
||||
f'Found a package installed via a third party store (installer="{result["installer"]}"): "{result["name"]}"',
|
||||
"",
|
||||
result,
|
||||
@@ -64,7 +62,6 @@ class AQFPackages(AndroidQFModule):
|
||||
self.alertstore.log_latest()
|
||||
elif result["installer"] in BROWSER_INSTALLERS:
|
||||
self.alertstore.medium(
|
||||
self.get_slug(),
|
||||
f'Found a package installed via a browser (installer="{result["installer"]}"): "{result["name"]}"',
|
||||
"",
|
||||
result,
|
||||
@@ -72,7 +69,6 @@ class AQFPackages(AndroidQFModule):
|
||||
self.alertstore.log_latest()
|
||||
elif result["installer"] == "null" and result["system"] is False:
|
||||
self.alertstore.high(
|
||||
self.get_slug(),
|
||||
f'Found a non-system package installed via adb or another method: "{result["name"]}"',
|
||||
"",
|
||||
result,
|
||||
@@ -85,7 +81,6 @@ class AQFPackages(AndroidQFModule):
|
||||
package_disabled = result.get("disabled", None)
|
||||
if result["name"] in SECURITY_PACKAGES and package_disabled:
|
||||
self.alertstore.high(
|
||||
self.get_slug(),
|
||||
f'Security package "{result["name"]}" disabled on the phone',
|
||||
"",
|
||||
result,
|
||||
@@ -94,7 +89,6 @@ class AQFPackages(AndroidQFModule):
|
||||
|
||||
if result["name"] in SYSTEM_UPDATE_PACKAGES and package_disabled:
|
||||
self.alertstore.high(
|
||||
self.get_slug(),
|
||||
f'System OTA update package "{result["name"]}" disabled on the phone',
|
||||
"",
|
||||
result,
|
||||
@@ -104,18 +98,20 @@ class AQFPackages(AndroidQFModule):
|
||||
if not self.indicators:
|
||||
continue
|
||||
|
||||
ioc_match = self.indicators.check_app_id(result.get("name"))
|
||||
ioc_match = self.indicators.check_app_id(result.get("name") or "")
|
||||
if ioc_match:
|
||||
result["matched_indicator"] = ioc_match.ioc
|
||||
self.alertstore.critical(self.get_slug(), ioc_match.message, "", result)
|
||||
self.alertstore.critical(
|
||||
ioc_match.message, "", result, matched_indicator=ioc_match.ioc
|
||||
)
|
||||
self.alertstore.log_latest()
|
||||
|
||||
for package_file in result.get("files", []):
|
||||
ioc_match = self.indicators.check_file_hash(package_file["sha256"])
|
||||
ioc_match = self.indicators.check_file_hash(
|
||||
package_file.get("sha256") or ""
|
||||
)
|
||||
if ioc_match:
|
||||
result["matched_indicator"] = ioc_match.ioc
|
||||
self.alertstore.critical(
|
||||
self.get_slug(), ioc_match.message, "", result
|
||||
ioc_match.message, "", result, matched_indicator=ioc_match.ioc
|
||||
)
|
||||
self.alertstore.log_latest()
|
||||
|
||||
@@ -129,9 +125,11 @@ class AQFPackages(AndroidQFModule):
|
||||
certificate_hash
|
||||
)
|
||||
if ioc_match:
|
||||
result["matched_indicator"] = ioc_match.ioc
|
||||
self.alertstore.critical(
|
||||
self.get_slug(), ioc_match.message, "", result
|
||||
ioc_match.message,
|
||||
"",
|
||||
result,
|
||||
matched_indicator=ioc_match.ioc,
|
||||
)
|
||||
self.alertstore.log_latest()
|
||||
break
|
||||
|
||||
@@ -7,9 +7,9 @@ import logging
|
||||
from typing import Optional
|
||||
|
||||
from mvt.android.artifacts.settings import Settings as SettingsArtifact
|
||||
from mvt.common.module_types import ModuleResults
|
||||
|
||||
from .base import AndroidQFModule
|
||||
from mvt.common.module_types import ModuleResults
|
||||
|
||||
|
||||
class AQFSettings(SettingsArtifact, AndroidQFModule):
|
||||
@@ -32,7 +32,7 @@ class AQFSettings(SettingsArtifact, AndroidQFModule):
|
||||
log=log,
|
||||
results=results,
|
||||
)
|
||||
self.results = {}
|
||||
self.results: dict = {}
|
||||
|
||||
def run(self) -> None:
|
||||
for setting_file in self._get_files_by_pattern("*/settings_*.txt"):
|
||||
|
||||
@@ -33,8 +33,8 @@ class AndroidQFModule(MVTModule):
|
||||
log=log,
|
||||
results=results,
|
||||
)
|
||||
self.parent_path = None
|
||||
self._path: str = target_path
|
||||
self.parent_path: Optional[str] = None
|
||||
self._path: Optional[str] = target_path
|
||||
self.files: List[str] = []
|
||||
self.archive: Optional[zipfile.ZipFile] = None
|
||||
|
||||
|
||||
74
src/mvt/android/modules/androidqf/mounts.py
Normal file
74
src/mvt/android/modules/androidqf/mounts.py
Normal file
@@ -0,0 +1,74 @@
|
||||
# Mobile Verification Toolkit (MVT)
|
||||
# Copyright (c) 2021-2023 The MVT Authors.
|
||||
# Use of this software is governed by the MVT License 1.1 that can be found at
|
||||
# https://license.mvt.re/1.1/
|
||||
|
||||
import json
|
||||
import logging
|
||||
from typing import Optional
|
||||
|
||||
from mvt.android.artifacts.mounts import Mounts as MountsArtifact
|
||||
|
||||
from .base import AndroidQFModule
|
||||
|
||||
|
||||
class Mounts(MountsArtifact, AndroidQFModule):
|
||||
"""This module extracts and analyzes mount information from AndroidQF acquisitions."""
|
||||
|
||||
def __init__(
|
||||
self,
|
||||
file_path: Optional[str] = None,
|
||||
target_path: Optional[str] = None,
|
||||
results_path: Optional[str] = None,
|
||||
module_options: Optional[dict] = None,
|
||||
log: logging.Logger = logging.getLogger(__name__),
|
||||
results: Optional[list] = None,
|
||||
) -> None:
|
||||
super().__init__(
|
||||
file_path=file_path,
|
||||
target_path=target_path,
|
||||
results_path=results_path,
|
||||
module_options=module_options,
|
||||
log=log,
|
||||
results=results,
|
||||
)
|
||||
self.results: list = []
|
||||
|
||||
def run(self) -> None:
|
||||
"""
|
||||
Run the mounts analysis module.
|
||||
|
||||
This module looks for mount information files collected by androidqf
|
||||
and analyzes them for suspicious configurations, particularly focusing
|
||||
on detecting root access indicators like /system mounted as read-write.
|
||||
"""
|
||||
mount_files = self._get_files_by_pattern("*/mounts.json")
|
||||
|
||||
if not mount_files:
|
||||
self.log.info("No mount information file found")
|
||||
return
|
||||
|
||||
self.log.info("Found mount information file: %s", mount_files[0])
|
||||
|
||||
try:
|
||||
data = self._get_file_content(mount_files[0]).decode(
|
||||
"utf-8", errors="replace"
|
||||
)
|
||||
except Exception as exc:
|
||||
self.log.error("Failed to read mount information file: %s", exc)
|
||||
return
|
||||
|
||||
# Parse the mount data
|
||||
try:
|
||||
json_data = json.loads(data)
|
||||
|
||||
if isinstance(json_data, list):
|
||||
# AndroidQF format: array of strings like
|
||||
# "/dev/block/dm-12 on / type ext4 (ro,seclabel,noatime)"
|
||||
mount_content = "\n".join(json_data)
|
||||
self.parse(mount_content)
|
||||
except Exception as exc:
|
||||
self.log.error("Failed to parse mount information: %s", exc)
|
||||
return
|
||||
|
||||
self.log.info("Extracted a total of %d mount entries", len(self.results))
|
||||
121
src/mvt/android/modules/androidqf/root_binaries.py
Normal file
121
src/mvt/android/modules/androidqf/root_binaries.py
Normal file
@@ -0,0 +1,121 @@
|
||||
# Mobile Verification Toolkit (MVT)
|
||||
# Copyright (c) 2021-2023 The MVT Authors.
|
||||
# Use of this software is governed by the MVT License 1.1 that can be found at
|
||||
# https://license.mvt.re/1.1/
|
||||
|
||||
import json
|
||||
import logging
|
||||
from typing import Optional
|
||||
|
||||
from .base import AndroidQFModule
|
||||
|
||||
|
||||
class RootBinaries(AndroidQFModule):
|
||||
"""This module analyzes root_binaries.json for root binaries found by androidqf."""
|
||||
|
||||
def __init__(
|
||||
self,
|
||||
file_path: Optional[str] = None,
|
||||
target_path: Optional[str] = None,
|
||||
results_path: Optional[str] = None,
|
||||
module_options: Optional[dict] = None,
|
||||
log: logging.Logger = logging.getLogger(__name__),
|
||||
results: Optional[list] = None,
|
||||
) -> None:
|
||||
super().__init__(
|
||||
file_path=file_path,
|
||||
target_path=target_path,
|
||||
results_path=results_path,
|
||||
module_options=module_options,
|
||||
log=log,
|
||||
results=results,
|
||||
)
|
||||
|
||||
def serialize(self, record: dict) -> dict:
|
||||
return {
|
||||
"timestamp": record.get("timestamp"),
|
||||
"module": self.__class__.__name__,
|
||||
"event": "root_binary_found",
|
||||
"data": f"Root binary found: {record['path']} (binary: {record['binary_name']})",
|
||||
}
|
||||
|
||||
def check_indicators(self) -> None:
|
||||
"""Check for indicators of device rooting."""
|
||||
if not self.results:
|
||||
return
|
||||
|
||||
# All found root binaries are considered indicators of rooting
|
||||
for result in self.results:
|
||||
self.alertstore.high(
|
||||
f'Found root binary "{result["binary_name"]}" at path "{result["path"]}"',
|
||||
"",
|
||||
result,
|
||||
)
|
||||
self.alertstore.log_latest()
|
||||
|
||||
if self.results:
|
||||
self.log.warning(
|
||||
"Device shows signs of rooting with %d root binaries found",
|
||||
len(self.results),
|
||||
)
|
||||
|
||||
def run(self) -> None:
|
||||
"""Run the root binaries analysis."""
|
||||
root_binaries_files = self._get_files_by_pattern("*/root_binaries.json")
|
||||
|
||||
if not root_binaries_files:
|
||||
self.log.info("No root_binaries.json file found")
|
||||
return
|
||||
|
||||
rawdata = self._get_file_content(root_binaries_files[0]).decode(
|
||||
"utf-8", errors="ignore"
|
||||
)
|
||||
|
||||
try:
|
||||
root_binary_paths = json.loads(rawdata)
|
||||
except json.JSONDecodeError as e:
|
||||
self.log.error("Failed to parse root_binaries.json: %s", e)
|
||||
return
|
||||
|
||||
if not isinstance(root_binary_paths, list):
|
||||
self.log.error("Expected root_binaries.json to contain a list of paths")
|
||||
return
|
||||
|
||||
# Known root binary names that might be found and their descriptions
|
||||
# This maps the binary name to a human-readable description
|
||||
known_root_binaries = {
|
||||
"su": "SuperUser binary",
|
||||
"busybox": "BusyBox utilities",
|
||||
"supersu": "SuperSU root management",
|
||||
"Superuser.apk": "Superuser app",
|
||||
"KingoUser.apk": "KingRoot app",
|
||||
"SuperSu.apk": "SuperSU app",
|
||||
"magisk": "Magisk root framework",
|
||||
"magiskhide": "Magisk hide utility",
|
||||
"magiskinit": "Magisk init binary",
|
||||
"magiskpolicy": "Magisk policy binary",
|
||||
}
|
||||
|
||||
for path in root_binary_paths:
|
||||
if not path or not isinstance(path, str):
|
||||
continue
|
||||
|
||||
# Extract binary name from path
|
||||
binary_name = path.split("/")[-1].lower()
|
||||
|
||||
# Check if this matches a known root binary by exact name match
|
||||
description = "Unknown root binary"
|
||||
for known_binary in known_root_binaries:
|
||||
if binary_name == known_binary.lower():
|
||||
description = known_root_binaries[known_binary]
|
||||
break
|
||||
|
||||
result = {
|
||||
"path": path.strip(),
|
||||
"binary_name": binary_name,
|
||||
"description": description,
|
||||
}
|
||||
|
||||
self.results.append(result)
|
||||
|
||||
self.log.info("Found %d root binaries", len(self.results))
|
||||
@@ -55,9 +55,8 @@ class SMS(AndroidQFModule):
|
||||
|
||||
ioc_match = self.indicators.check_domains(message.get("links", []))
|
||||
if ioc_match:
|
||||
message["matched_indicator"] = ioc_match.ioc
|
||||
self.alertstore.critical(
|
||||
self.get_slug(), ioc_match.message, "", message
|
||||
ioc_match.message, "", message, matched_indicator=ioc_match.ioc
|
||||
)
|
||||
|
||||
def parse_backup(self, data):
|
||||
|
||||
@@ -9,7 +9,7 @@ import os
|
||||
from tarfile import TarFile
|
||||
from typing import List, Optional
|
||||
|
||||
from mvt.common.module import MVTModule, ModuleResults
|
||||
from mvt.common.module import ModuleResults, MVTModule
|
||||
|
||||
|
||||
class BackupModule(MVTModule):
|
||||
@@ -32,10 +32,10 @@ class BackupModule(MVTModule):
|
||||
log=log,
|
||||
results=results,
|
||||
)
|
||||
self.ab = None
|
||||
self.backup_path = None
|
||||
self.tar = None
|
||||
self.files = []
|
||||
self.ab: Optional[str] = None
|
||||
self.backup_path: Optional[str] = None
|
||||
self.tar: Optional[TarFile] = None
|
||||
self.files: list = []
|
||||
|
||||
def from_dir(self, backup_path: Optional[str], files: List[str]) -> None:
|
||||
self.backup_path = backup_path
|
||||
@@ -55,12 +55,15 @@ class BackupModule(MVTModule):
|
||||
return fnmatch.filter(self.files, pattern)
|
||||
|
||||
def _get_file_content(self, file_path: str) -> bytes:
|
||||
handle = None
|
||||
if self.tar:
|
||||
try:
|
||||
member = self.tar.getmember(file_path)
|
||||
handle = self.tar.extractfile(member)
|
||||
if not handle:
|
||||
raise ValueError(f"Could not extract file: {file_path}")
|
||||
except KeyError:
|
||||
return None
|
||||
handle = self.tar.extractfile(member)
|
||||
raise FileNotFoundError(f"File not found in tar: {file_path}")
|
||||
elif self.backup_path:
|
||||
handle = open(os.path.join(self.backup_path, file_path), "rb")
|
||||
else:
|
||||
|
||||
@@ -4,12 +4,12 @@
|
||||
# https://license.mvt.re/1.1/
|
||||
|
||||
import logging
|
||||
from typing import Optional
|
||||
from typing import Any, Optional
|
||||
|
||||
from mvt.android.modules.backup.base import BackupModule
|
||||
from mvt.android.parsers.backup import parse_sms_file
|
||||
from mvt.common.utils import check_for_links
|
||||
from mvt.common.module_types import ModuleResults
|
||||
from mvt.common.utils import check_for_links
|
||||
|
||||
|
||||
class SMS(BackupModule):
|
||||
@@ -30,7 +30,7 @@ class SMS(BackupModule):
|
||||
log=log,
|
||||
results=results,
|
||||
)
|
||||
self.results = []
|
||||
self.results: list[dict[str, Any]] = []
|
||||
|
||||
def check_indicators(self) -> None:
|
||||
if not self.indicators:
|
||||
@@ -46,9 +46,8 @@ class SMS(BackupModule):
|
||||
|
||||
ioc_match = self.indicators.check_urls(message_links)
|
||||
if ioc_match:
|
||||
message["matched_indicator"] = ioc_match.ioc
|
||||
self.alertstore.critical(
|
||||
self.get_slug(), ioc_match.message, "", message
|
||||
ioc_match.message, "", message, matched_indicator=ioc_match.ioc
|
||||
)
|
||||
continue
|
||||
|
||||
|
||||
@@ -6,11 +6,10 @@ import datetime
|
||||
import fnmatch
|
||||
import logging
|
||||
import os
|
||||
|
||||
from typing import List, Optional
|
||||
from zipfile import ZipFile
|
||||
|
||||
from mvt.common.module import MVTModule, ModuleResults
|
||||
from mvt.common.module import ModuleResults, MVTModule
|
||||
|
||||
|
||||
class BugReportModule(MVTModule):
|
||||
@@ -69,6 +68,8 @@ class BugReportModule(MVTModule):
|
||||
if self.zip_archive:
|
||||
handle = self.zip_archive.open(file_path)
|
||||
else:
|
||||
if not self.extract_path:
|
||||
raise ValueError("extract_path is not set")
|
||||
handle = open(os.path.join(self.extract_path, file_path), "rb")
|
||||
|
||||
data = handle.read()
|
||||
@@ -76,7 +77,7 @@ class BugReportModule(MVTModule):
|
||||
|
||||
return data
|
||||
|
||||
def _get_dumpstate_file(self) -> bytes:
|
||||
def _get_dumpstate_file(self) -> Optional[bytes]:
|
||||
main = self._get_files_by_pattern("main_entry.txt")
|
||||
if main:
|
||||
main_content = self._get_file_content(main[0])
|
||||
@@ -91,10 +92,12 @@ class BugReportModule(MVTModule):
|
||||
|
||||
return self._get_file_content(dumpstate_logs[0])
|
||||
|
||||
def _get_file_modification_time(self, file_path: str) -> dict:
|
||||
def _get_file_modification_time(self, file_path: str) -> datetime.datetime:
|
||||
if self.zip_archive:
|
||||
file_timetuple = self.zip_archive.getinfo(file_path).date_time
|
||||
return datetime.datetime(*file_timetuple)
|
||||
else:
|
||||
if not self.extract_path:
|
||||
raise ValueError("extract_path is not set")
|
||||
file_stat = os.stat(os.path.join(self.extract_path, file_path))
|
||||
return datetime.datetime.fromtimestamp(file_stat.st_mtime)
|
||||
|
||||
@@ -6,9 +6,9 @@
|
||||
import logging
|
||||
from typing import Optional
|
||||
|
||||
from mvt.common.module_types import ModuleResults
|
||||
from mvt.android.artifacts.dumpsys_packages import DumpsysPackagesArtifact
|
||||
from mvt.android.utils import DANGEROUS_PERMISSIONS, DANGEROUS_PERMISSIONS_THRESHOLD
|
||||
from mvt.common.module_types import ModuleResults
|
||||
|
||||
from .base import BugReportModule
|
||||
|
||||
@@ -43,8 +43,9 @@ class DumpsysPackages(DumpsysPackagesArtifact, BugReportModule):
|
||||
)
|
||||
return
|
||||
|
||||
data = data.decode("utf-8", errors="replace")
|
||||
content = self.extract_dumpsys_section(data, "DUMP OF SERVICE package:")
|
||||
content = self.extract_dumpsys_section(
|
||||
data.decode("utf-8", errors="replace"), "DUMP OF SERVICE package:"
|
||||
)
|
||||
self.parse(content)
|
||||
|
||||
for result in self.results:
|
||||
|
||||
@@ -35,6 +35,20 @@ class DumpsysReceivers(DumpsysReceiversArtifact, BugReportModule):
|
||||
|
||||
self.results = results if results else {}
|
||||
|
||||
def check_indicators(self) -> None:
|
||||
for result in self.results:
|
||||
if self.indicators:
|
||||
receiver_name = self.results[result][0]["receiver"]
|
||||
|
||||
# return IoC if the stix2 process name a substring of the receiver name
|
||||
ioc = self.indicators.check_receiver_prefix(receiver_name)
|
||||
if ioc:
|
||||
self.results[result][0]["matched_indicator"] = ioc
|
||||
self.detected.append(result)
|
||||
continue
|
||||
|
||||
|
||||
|
||||
def run(self) -> None:
|
||||
content = self._get_dumpstate_file()
|
||||
if not content:
|
||||
|
||||
@@ -231,6 +231,7 @@ def parse_sms_file(data):
|
||||
entry.pop("mms_body")
|
||||
|
||||
body = entry.get("body", None)
|
||||
message_links = None
|
||||
if body:
|
||||
message_links = check_for_links(entry["body"])
|
||||
|
||||
|
||||
@@ -6,14 +6,13 @@ from datetime import datetime, timedelta
|
||||
from typing import List
|
||||
|
||||
|
||||
def warn_android_patch_level(patch_level: str, log) -> str:
|
||||
def warn_android_patch_level(patch_level: str, log) -> str | bool:
|
||||
"""Alert if Android patch level out-of-date"""
|
||||
patch_date = datetime.strptime(patch_level, "%Y-%m-%d")
|
||||
if (datetime.now() - patch_date) > timedelta(days=6 * 31):
|
||||
warning_message = (
|
||||
f"This phone has not received security updates "
|
||||
f"for more than six months (last update: {patch_level}).",
|
||||
patch_level,
|
||||
f"for more than six months (last update: {patch_level})."
|
||||
)
|
||||
return warning_message
|
||||
|
||||
|
||||
@@ -4,12 +4,13 @@
|
||||
# https://license.mvt.re/1.1/
|
||||
|
||||
import csv
|
||||
import inspect
|
||||
import logging
|
||||
from dataclasses import asdict, dataclass
|
||||
from enum import Enum
|
||||
from dataclasses import dataclass, asdict
|
||||
from typing import List, Dict, Any, Optional
|
||||
from typing import Any, Dict, List, Optional
|
||||
|
||||
from .log import INFO_ALERT, LOW_ALERT, HIGH_ALERT, CRITICAL_ALERT, MEDIUM_ALERT
|
||||
from .log import CRITICAL_ALERT, HIGH_ALERT, INFO_ALERT, LOW_ALERT, MEDIUM_ALERT
|
||||
from .module_types import ModuleAtomicResult
|
||||
|
||||
|
||||
@@ -28,6 +29,7 @@ class Alert:
|
||||
message: str
|
||||
event_time: str
|
||||
event: ModuleAtomicResult
|
||||
matched_indicator: Optional[Any] = None
|
||||
|
||||
|
||||
class AlertStore:
|
||||
@@ -35,78 +37,138 @@ class AlertStore:
|
||||
self.__alerts: List[Alert] = []
|
||||
self.__log = log
|
||||
|
||||
def _get_calling_module(self) -> str:
|
||||
"""
|
||||
Automatically detect the calling MVT module and return its slug.
|
||||
|
||||
Walks up the call stack to find the first frame that belongs to an MVT module
|
||||
(artifact or extraction module) and extracts its slug.
|
||||
|
||||
:return: Module slug string
|
||||
"""
|
||||
frame = inspect.currentframe()
|
||||
try:
|
||||
# Walk up the call stack
|
||||
while frame is not None:
|
||||
frame = frame.f_back
|
||||
if frame is None:
|
||||
break
|
||||
|
||||
# Get the 'self' object from the frame's local variables
|
||||
frame_locals = frame.f_locals
|
||||
if "self" in frame_locals:
|
||||
obj = frame_locals["self"]
|
||||
# Check if it has a get_slug method (MVT modules have this)
|
||||
if hasattr(obj, "get_slug") and callable(obj.get_slug):
|
||||
try:
|
||||
return str(obj.get_slug())
|
||||
except Exception:
|
||||
pass
|
||||
|
||||
# Fallback: return "unknown" if we can't find the module
|
||||
return "unknown"
|
||||
finally:
|
||||
del frame
|
||||
|
||||
@property
|
||||
def alerts(self) -> List[Alert]:
|
||||
return self.__alerts
|
||||
|
||||
def add(self, alert: Alert) -> None:
|
||||
self.__alerts.append(alert)
|
||||
self.log(alert)
|
||||
|
||||
def extend(self, alerts: List[Alert]) -> None:
|
||||
self.__alerts.extend(alerts)
|
||||
for alert in alerts:
|
||||
self.add(alert)
|
||||
|
||||
def info(
|
||||
self, module: str, message: str, event_time: str, event: ModuleAtomicResult
|
||||
self,
|
||||
message: str,
|
||||
event_time: str,
|
||||
event: ModuleAtomicResult,
|
||||
matched_indicator: Optional[Any] = None,
|
||||
):
|
||||
self.add(
|
||||
Alert(
|
||||
level=AlertLevel.INFORMATIONAL,
|
||||
module=module,
|
||||
module=self._get_calling_module(),
|
||||
message=message,
|
||||
event_time=event_time,
|
||||
event=event,
|
||||
matched_indicator=matched_indicator,
|
||||
)
|
||||
)
|
||||
|
||||
def low(
|
||||
self, module: str, message: str, event_time: str, event: ModuleAtomicResult
|
||||
self,
|
||||
message: str,
|
||||
event_time: str,
|
||||
event: ModuleAtomicResult,
|
||||
matched_indicator: Optional[Any] = None,
|
||||
):
|
||||
self.add(
|
||||
Alert(
|
||||
level=AlertLevel.LOW,
|
||||
module=module,
|
||||
module=self._get_calling_module(),
|
||||
message=message,
|
||||
event_time=event_time,
|
||||
event=event,
|
||||
matched_indicator=matched_indicator,
|
||||
)
|
||||
)
|
||||
|
||||
def medium(
|
||||
self, module: str, message: str, event_time: str, event: ModuleAtomicResult
|
||||
self,
|
||||
message: str,
|
||||
event_time: str,
|
||||
event: ModuleAtomicResult,
|
||||
matched_indicator: Optional[Any] = None,
|
||||
):
|
||||
self.add(
|
||||
Alert(
|
||||
level=AlertLevel.MEDIUM,
|
||||
module=module,
|
||||
module=self._get_calling_module(),
|
||||
message=message,
|
||||
event_time=event_time,
|
||||
event=event,
|
||||
matched_indicator=matched_indicator,
|
||||
)
|
||||
)
|
||||
|
||||
def high(
|
||||
self, module: str, message: str, event_time: str, event: ModuleAtomicResult
|
||||
self,
|
||||
message: str,
|
||||
event_time: str,
|
||||
event: ModuleAtomicResult,
|
||||
matched_indicator: Optional[Any] = None,
|
||||
):
|
||||
self.add(
|
||||
Alert(
|
||||
level=AlertLevel.HIGH,
|
||||
module=module,
|
||||
module=self._get_calling_module(),
|
||||
message=message,
|
||||
event_time=event_time,
|
||||
event=event,
|
||||
matched_indicator=matched_indicator,
|
||||
)
|
||||
)
|
||||
|
||||
def critical(
|
||||
self, module: str, message: str, event_time: str, event: ModuleAtomicResult
|
||||
self,
|
||||
message: str,
|
||||
event_time: str,
|
||||
event: ModuleAtomicResult,
|
||||
matched_indicator: Optional[Any] = None,
|
||||
):
|
||||
self.add(
|
||||
Alert(
|
||||
level=AlertLevel.CRITICAL,
|
||||
module=module,
|
||||
module=self._get_calling_module(),
|
||||
message=message,
|
||||
event_time=event_time,
|
||||
event=event,
|
||||
matched_indicator=matched_indicator,
|
||||
)
|
||||
)
|
||||
|
||||
|
||||
@@ -24,6 +24,8 @@ class CmdCheckIOCS(Command):
|
||||
module_options: Optional[dict] = None,
|
||||
hashes: Optional[bool] = False,
|
||||
sub_command: Optional[bool] = False,
|
||||
disable_version_check: bool = False,
|
||||
disable_indicator_check: bool = False,
|
||||
) -> None:
|
||||
super().__init__(
|
||||
target_path=target_path,
|
||||
@@ -35,6 +37,8 @@ class CmdCheckIOCS(Command):
|
||||
hashes=hashes,
|
||||
sub_command=sub_command,
|
||||
log=log,
|
||||
disable_version_check=disable_version_check,
|
||||
disable_indicator_check=disable_indicator_check,
|
||||
)
|
||||
|
||||
self.name = "check-iocs"
|
||||
@@ -69,6 +73,10 @@ class CmdCheckIOCS(Command):
|
||||
m = iocs_module.from_json(
|
||||
file_path, log=logging.getLogger(iocs_module.__module__)
|
||||
)
|
||||
if not m:
|
||||
log.warning("No result from this module, skipping it")
|
||||
continue
|
||||
|
||||
if self.iocs.total_ioc_count > 0:
|
||||
m.indicators = self.iocs
|
||||
m.indicators.log = m.log
|
||||
|
||||
@@ -8,11 +8,14 @@ import logging
|
||||
import os
|
||||
import sys
|
||||
from datetime import datetime
|
||||
from typing import Optional
|
||||
from typing import Any, Optional
|
||||
|
||||
from rich.console import Console
|
||||
from rich.panel import Panel
|
||||
from rich.text import Text
|
||||
|
||||
from .alerts import AlertLevel, AlertStore
|
||||
from .config import settings
|
||||
from .indicators import Indicators
|
||||
from .module import MVTModule, run_module, save_timeline
|
||||
from .utils import (
|
||||
@@ -20,8 +23,6 @@ from .utils import (
|
||||
generate_hashes_from_path,
|
||||
get_sha256_from_file_path,
|
||||
)
|
||||
from .config import settings
|
||||
from .alerts import AlertStore, AlertLevel
|
||||
from .version import MVT_VERSION
|
||||
|
||||
|
||||
@@ -38,9 +39,11 @@ class Command:
|
||||
hashes: Optional[bool] = False,
|
||||
sub_command: Optional[bool] = False,
|
||||
log: logging.Logger = logging.getLogger(__name__),
|
||||
disable_version_check: bool = False,
|
||||
disable_indicator_check: bool = False,
|
||||
) -> None:
|
||||
self.name = ""
|
||||
self.modules = []
|
||||
self.modules: list[Any] = []
|
||||
|
||||
self.target_path = target_path
|
||||
self.results_path = results_path
|
||||
@@ -49,6 +52,8 @@ class Command:
|
||||
self.serial = serial
|
||||
self.log = log
|
||||
self.sub_command = sub_command
|
||||
self.disable_version_check = disable_version_check
|
||||
self.disable_indicator_check = disable_indicator_check
|
||||
|
||||
# This dictionary can contain options that will be passed down from
|
||||
# the Command to all modules. This can for example be used to pass
|
||||
@@ -57,10 +62,10 @@ class Command:
|
||||
|
||||
# This list will contain all executed modules.
|
||||
# We can use this to reference e.g. self.executed[0].results.
|
||||
self.executed = []
|
||||
self.executed: list[Any] = []
|
||||
self.hashes = hashes
|
||||
self.hash_values = []
|
||||
self.timeline = []
|
||||
self.hash_values: list[dict[str, Any]] = []
|
||||
self.timeline: list[dict[str, Any]] = []
|
||||
|
||||
# Load IOCs
|
||||
self._create_storage()
|
||||
@@ -149,11 +154,11 @@ class Command:
|
||||
if not self.results_path:
|
||||
return
|
||||
|
||||
target_path = None
|
||||
target_path: Optional[str] = None
|
||||
if self.target_path:
|
||||
target_path = os.path.abspath(self.target_path)
|
||||
|
||||
info = {
|
||||
info: dict[str, Any] = {
|
||||
"target_path": target_path,
|
||||
"mvt_version": MVT_VERSION,
|
||||
"date": convert_datetime_to_iso(datetime.now()),
|
||||
|
||||
@@ -1,13 +1,12 @@
|
||||
import os
|
||||
import yaml
|
||||
import json
|
||||
import os
|
||||
from typing import Optional, Tuple, Type
|
||||
|
||||
from typing import Tuple, Type, Optional
|
||||
import yaml
|
||||
from appdirs import user_config_dir
|
||||
from pydantic import AnyHttpUrl, Field
|
||||
from pydantic import Field
|
||||
from pydantic_settings import (
|
||||
BaseSettings,
|
||||
InitSettingsSource,
|
||||
PydanticBaseSettingsSource,
|
||||
SettingsConfigDict,
|
||||
YamlConfigSettingsSource,
|
||||
@@ -22,51 +21,51 @@ class MVTSettings(BaseSettings):
|
||||
env_prefix="MVT_",
|
||||
env_nested_delimiter="_",
|
||||
extra="ignore",
|
||||
nested_model_default_partial_updates=True,
|
||||
)
|
||||
# Allow to decided if want to load environment variables
|
||||
load_env: bool = Field(True, exclude=True)
|
||||
|
||||
# General settings
|
||||
PYPI_UPDATE_URL: AnyHttpUrl = Field(
|
||||
"https://pypi.org/pypi/mvt/json",
|
||||
validate_default=False,
|
||||
PYPI_UPDATE_URL: str = Field(
|
||||
default="https://pypi.org/pypi/mvt/json",
|
||||
)
|
||||
NETWORK_ACCESS_ALLOWED: bool = True
|
||||
NETWORK_TIMEOUT: int = 15
|
||||
|
||||
# Command default settings, all can be specified by MVT_ prefixed environment variables too.
|
||||
IOS_BACKUP_PASSWORD: Optional[str] = Field(
|
||||
None, description="Default password to use to decrypt iOS backups"
|
||||
default=None, description="Default password to use to decrypt iOS backups"
|
||||
)
|
||||
ANDROID_BACKUP_PASSWORD: Optional[str] = Field(
|
||||
None, description="Default password to use to decrypt Android backups"
|
||||
default=None, description="Default password to use to decrypt Android backups"
|
||||
)
|
||||
STIX2: Optional[str] = Field(
|
||||
None, description="List of directories where STIX2 files are stored"
|
||||
default=None, description="List of directories where STIX2 files are stored"
|
||||
)
|
||||
VT_API_KEY: Optional[str] = Field(
|
||||
None, description="API key to use for VirusTotal lookups"
|
||||
default=None, description="API key to use for VirusTotal lookups"
|
||||
)
|
||||
PROFILE: bool = Field(False, description="Profile the execution of MVT modules")
|
||||
HASH_FILES: bool = Field(False, description="Should MVT hash output files")
|
||||
PROFILE: bool = Field(
|
||||
default=False, description="Profile the execution of MVT modules"
|
||||
)
|
||||
HASH_FILES: bool = Field(default=False, description="Should MVT hash output files")
|
||||
|
||||
@classmethod
|
||||
def settings_customise_sources(
|
||||
cls,
|
||||
settings_cls: Type[BaseSettings],
|
||||
init_settings: InitSettingsSource,
|
||||
init_settings: PydanticBaseSettingsSource,
|
||||
env_settings: PydanticBaseSettingsSource,
|
||||
dotenv_settings: PydanticBaseSettingsSource,
|
||||
file_secret_settings: PydanticBaseSettingsSource,
|
||||
) -> Tuple[PydanticBaseSettingsSource, ...]:
|
||||
sources = (
|
||||
YamlConfigSettingsSource(settings_cls, MVT_CONFIG_PATH),
|
||||
yaml_source = YamlConfigSettingsSource(settings_cls, MVT_CONFIG_PATH)
|
||||
sources: Tuple[PydanticBaseSettingsSource, ...] = (
|
||||
yaml_source,
|
||||
init_settings,
|
||||
)
|
||||
# Load env variables if enabled
|
||||
if init_settings.init_kwargs.get("load_env", True):
|
||||
sources = (env_settings,) + sources
|
||||
# Always load env variables by default
|
||||
sources = (env_settings,) + sources
|
||||
return sources
|
||||
|
||||
def save_settings(
|
||||
@@ -94,11 +93,11 @@ class MVTSettings(BaseSettings):
|
||||
Afterwards we load the settings again, this time including the env variables.
|
||||
"""
|
||||
# Set invalid env prefix to avoid loading env variables.
|
||||
settings = MVTSettings(load_env=False)
|
||||
settings = cls(load_env=False)
|
||||
settings.save_settings()
|
||||
|
||||
# Load the settings again with any ENV variables.
|
||||
settings = MVTSettings(load_env=True)
|
||||
settings = cls(load_env=True)
|
||||
return settings
|
||||
|
||||
|
||||
|
||||
@@ -15,6 +15,8 @@ HELP_MSG_HASHES = "Generate hashes of all the files analyzed"
|
||||
HELP_MSG_VERBOSE = "Verbose mode"
|
||||
HELP_MSG_CHECK_IOCS = "Compare stored JSON results to provided indicators"
|
||||
HELP_MSG_STIX2 = "Download public STIX2 indicators"
|
||||
HELP_MSG_DISABLE_UPDATE_CHECK = "Disable MVT version update check"
|
||||
HELP_MSG_DISABLE_INDICATOR_UPDATE_CHECK = "Disable indicators update check"
|
||||
|
||||
# IOS Specific
|
||||
HELP_MSG_DECRYPT_BACKUP = "Decrypt an encrypted iTunes backup"
|
||||
|
||||
@@ -7,15 +7,15 @@ import glob
|
||||
import json
|
||||
import logging
|
||||
import os
|
||||
from functools import lru_cache
|
||||
from typing import Any, Dict, Iterator, List, Optional
|
||||
from dataclasses import dataclass
|
||||
from functools import lru_cache
|
||||
from typing import Any, Dict, Iterator, List, Optional, Union
|
||||
|
||||
import ahocorasick
|
||||
from appdirs import user_data_dir
|
||||
|
||||
from .url import URL
|
||||
from .config import settings
|
||||
from .url import URL
|
||||
|
||||
MVT_DATA_FOLDER = user_data_dir("mvt")
|
||||
MVT_INDICATORS_FOLDER = os.path.join(MVT_DATA_FOLDER, "indicators")
|
||||
@@ -68,7 +68,7 @@ class Indicators:
|
||||
self.parse_stix2(path)
|
||||
elif os.path.isdir(path):
|
||||
for file in glob.glob(
|
||||
os.path.join(path, "**", "*.stix2", recursive=True)
|
||||
os.path.join(path, "**", "*.stix2"), recursive=True
|
||||
):
|
||||
self.parse_stix2(file)
|
||||
else:
|
||||
@@ -350,7 +350,7 @@ class Indicators:
|
||||
|
||||
@lru_cache()
|
||||
def get_ioc_matcher(
|
||||
self, ioc_type: Optional[str] = None, ioc_list: Optional[list] = None
|
||||
self, ioc_type: Optional[str] = None, ioc_list: Optional[List[Indicator]] = None
|
||||
) -> ahocorasick.Automaton:
|
||||
"""
|
||||
Build an Aho-Corasick automaton from a list of iocs (i.e indicators)
|
||||
@@ -370,9 +370,9 @@ class Indicators:
|
||||
"""
|
||||
automaton = ahocorasick.Automaton()
|
||||
if ioc_type:
|
||||
iocs = self.get_iocs(ioc_type)
|
||||
iocs: Iterator[Indicator] = self.get_iocs(ioc_type)
|
||||
elif ioc_list:
|
||||
iocs = ioc_list
|
||||
iocs = iter(ioc_list)
|
||||
else:
|
||||
raise ValueError("Must provide either ioc_type or ioc_list")
|
||||
|
||||
@@ -718,9 +718,30 @@ class Indicators:
|
||||
|
||||
return None
|
||||
|
||||
def check_android_property_name(
|
||||
self, property_name: str
|
||||
) -> Optional[IndicatorMatch]:
|
||||
def check_receiver_prefix(self, receiver_name: str) -> Union[dict, None]:
|
||||
"""Check the provided receiver name against the list of indicators.
|
||||
An IoC match is detected when a substring of the receiver matches the indicator
|
||||
:param app_id: App ID to check against the list of indicators
|
||||
:type app_id: str
|
||||
:returns: Indicator details if matched, otherwise None
|
||||
|
||||
"""
|
||||
if not receiver_name:
|
||||
return None
|
||||
|
||||
for ioc in self.get_iocs("app_ids"):
|
||||
if ioc["value"].lower() in receiver_name.lower():
|
||||
self.log.warning(
|
||||
'Found a known suspicious receiver with name "%s" '
|
||||
'matching indicators from "%s"',
|
||||
receiver_name,
|
||||
ioc["name"],
|
||||
)
|
||||
return ioc
|
||||
|
||||
return None
|
||||
|
||||
def check_android_property_name(self, property_name: str) -> Optional[dict]:
|
||||
"""Check the android property name against the list of indicators.
|
||||
|
||||
:param property_name: Name of the Android property
|
||||
|
||||
@@ -12,74 +12,85 @@ from .updates import IndicatorsUpdates, MVTUpdates
|
||||
from .version import MVT_VERSION
|
||||
|
||||
|
||||
def check_updates() -> None:
|
||||
def check_updates(
|
||||
disable_version_check: bool = False, disable_indicator_check: bool = False
|
||||
) -> None:
|
||||
log = logging.getLogger("mvt")
|
||||
|
||||
# First we check for MVT version updates.
|
||||
try:
|
||||
mvt_updates = MVTUpdates()
|
||||
latest_version = mvt_updates.check()
|
||||
except (requests.exceptions.ConnectionError, requests.exceptions.Timeout):
|
||||
rich_print(
|
||||
"\t\t[bold]Note: Could not check for MVT updates.[/bold] "
|
||||
"You may be working offline. Please update MVT regularly."
|
||||
)
|
||||
except Exception as e:
|
||||
log.error("Error encountered when trying to check latest MVT version: %s", e)
|
||||
else:
|
||||
if latest_version:
|
||||
if not disable_version_check:
|
||||
try:
|
||||
mvt_updates = MVTUpdates()
|
||||
latest_version = mvt_updates.check()
|
||||
except (requests.exceptions.ConnectionError, requests.exceptions.Timeout):
|
||||
rich_print(
|
||||
f"\t\t[bold]Version {latest_version} is available! "
|
||||
"Upgrade mvt with `pip3 install -U mvt`[/bold]"
|
||||
"\t\t[bold]Note: Could not check for MVT updates.[/bold] "
|
||||
"You may be working offline. Please update MVT regularly."
|
||||
)
|
||||
|
||||
# Then we check for indicators files updates.
|
||||
ioc_updates = IndicatorsUpdates()
|
||||
|
||||
# Before proceeding, we check if we have downloaded an indicators index.
|
||||
# If not, there's no point in proceeding with the updates check.
|
||||
if ioc_updates.get_latest_update() == 0:
|
||||
rich_print(
|
||||
"\t\t[bold]You have not yet downloaded any indicators, check "
|
||||
"the `download-iocs` command![/bold]"
|
||||
)
|
||||
return
|
||||
|
||||
# We only perform this check at a fixed frequency, in order to not
|
||||
# overburden the user with too many lookups if the command is being run
|
||||
# multiple times.
|
||||
should_check, hours = ioc_updates.should_check()
|
||||
if not should_check:
|
||||
rich_print(
|
||||
f"\t\tIndicators updates checked recently, next automatic check "
|
||||
f"in {int(hours)} hours"
|
||||
)
|
||||
return
|
||||
|
||||
try:
|
||||
ioc_to_update = ioc_updates.check()
|
||||
except (requests.exceptions.ConnectionError, requests.exceptions.Timeout):
|
||||
rich_print(
|
||||
"\t\t[bold]Note: Could not check for indicator updates.[/bold] "
|
||||
"You may be working offline. Please update MVT indicators regularly."
|
||||
)
|
||||
except Exception as e:
|
||||
log.error("Error encountered when trying to check latest MVT indicators: %s", e)
|
||||
else:
|
||||
if ioc_to_update:
|
||||
rich_print(
|
||||
"\t\t[bold]There are updates to your indicators files! "
|
||||
"Run the `download-iocs` command to update![/bold]"
|
||||
except Exception as e:
|
||||
log.error(
|
||||
"Error encountered when trying to check latest MVT version: %s", e
|
||||
)
|
||||
else:
|
||||
rich_print("\t\tYour indicators files seem to be up to date.")
|
||||
if latest_version:
|
||||
rich_print(
|
||||
f"\t\t[bold]Version {latest_version} is available! "
|
||||
"Upgrade mvt with `pip3 install -U mvt` or with `pipx upgrade mvt`[/bold]"
|
||||
)
|
||||
|
||||
# Then we check for indicators files updates.
|
||||
if not disable_indicator_check:
|
||||
ioc_updates = IndicatorsUpdates()
|
||||
|
||||
# Before proceeding, we check if we have downloaded an indicators index.
|
||||
# If not, there's no point in proceeding with the updates check.
|
||||
if ioc_updates.get_latest_update() == 0:
|
||||
rich_print(
|
||||
"\t\t[bold]You have not yet downloaded any indicators, check "
|
||||
"the `download-iocs` command![/bold]"
|
||||
)
|
||||
return
|
||||
|
||||
# We only perform this check at a fixed frequency, in order to not
|
||||
# overburden the user with too many lookups if the command is being run
|
||||
# multiple times.
|
||||
should_check, hours = ioc_updates.should_check()
|
||||
if not should_check:
|
||||
rich_print(
|
||||
f"\t\tIndicators updates checked recently, next automatic check "
|
||||
f"in {int(hours)} hours"
|
||||
)
|
||||
return
|
||||
|
||||
try:
|
||||
ioc_to_update = ioc_updates.check()
|
||||
except (requests.exceptions.ConnectionError, requests.exceptions.Timeout):
|
||||
rich_print(
|
||||
"\t\t[bold]Note: Could not check for indicator updates.[/bold] "
|
||||
"You may be working offline. Please update MVT indicators regularly."
|
||||
)
|
||||
except Exception as e:
|
||||
log.error(
|
||||
"Error encountered when trying to check latest MVT indicators: %s", e
|
||||
)
|
||||
else:
|
||||
if ioc_to_update:
|
||||
rich_print(
|
||||
"\t\t[bold]There are updates to your indicators files! "
|
||||
"Run the `download-iocs` command to update![/bold]"
|
||||
)
|
||||
else:
|
||||
rich_print("\t\tYour indicators files seem to be up to date.")
|
||||
|
||||
|
||||
def logo() -> None:
|
||||
def logo(
|
||||
disable_version_check: bool = False, disable_indicator_check: bool = False
|
||||
) -> None:
|
||||
rich_print("\n")
|
||||
rich_print("\t[bold]MVT[/bold] - Mobile Verification Toolkit")
|
||||
rich_print("\t\thttps://mvt.re")
|
||||
rich_print(f"\t\tVersion: {MVT_VERSION}")
|
||||
|
||||
check_updates()
|
||||
check_updates(disable_version_check, disable_indicator_check)
|
||||
|
||||
rich_print("\n")
|
||||
|
||||
@@ -11,15 +11,15 @@ import re
|
||||
from dataclasses import asdict, is_dataclass
|
||||
from typing import Any, Dict, Optional
|
||||
|
||||
from .utils import CustomJSONEncoder, exec_or_profile
|
||||
from .indicators import Indicators
|
||||
from .alerts import AlertStore
|
||||
from .indicators import Indicators
|
||||
from .module_types import (
|
||||
ModuleResults,
|
||||
ModuleTimeline,
|
||||
ModuleSerializedResult,
|
||||
ModuleAtomicResult,
|
||||
ModuleResults,
|
||||
ModuleSerializedResult,
|
||||
ModuleTimeline,
|
||||
)
|
||||
from .utils import CustomJSONEncoder, exec_or_profile
|
||||
|
||||
|
||||
class DatabaseNotFoundError(Exception):
|
||||
@@ -77,7 +77,6 @@ class MVTModule:
|
||||
|
||||
self.results: ModuleResults = results if results else []
|
||||
self.timeline: ModuleTimeline = []
|
||||
self.timeline_detected: ModuleTimeline = []
|
||||
|
||||
@classmethod
|
||||
def from_json(cls, json_path: str, log: logging.Logger):
|
||||
@@ -147,7 +146,10 @@ class MVTModule:
|
||||
for record in timeline:
|
||||
timeline_set.add(
|
||||
json.dumps(
|
||||
asdict(record) if is_dataclass(record) else record, sort_keys=True
|
||||
asdict(record)
|
||||
if is_dataclass(record) and not isinstance(record, type)
|
||||
else record,
|
||||
sort_keys=True,
|
||||
)
|
||||
)
|
||||
|
||||
@@ -162,21 +164,12 @@ class MVTModule:
|
||||
record: ModuleSerializedResult = self.serialize(result)
|
||||
if record:
|
||||
if isinstance(record, list):
|
||||
self.timeline.extend(record)
|
||||
self.timeline.extend(record) # type: ignore[arg-type]
|
||||
else:
|
||||
self.timeline.append(record)
|
||||
|
||||
# for detected in self.alertstore.alerts:
|
||||
# record = self.serialize(detected)
|
||||
# if record:
|
||||
# if isinstance(record, list):
|
||||
# self.timeline_detected.extend(record)
|
||||
# else:
|
||||
# self.timeline_detected.append(record)
|
||||
self.timeline.append(record) # type: ignore[arg-type]
|
||||
|
||||
# De-duplicate timeline entries.
|
||||
self.timeline = self._deduplicate_timeline(self.timeline)
|
||||
# self.timeline_detected = self._deduplicate_timeline(self.timeline_detected)
|
||||
|
||||
def run(self) -> None:
|
||||
"""Run the main module procedure."""
|
||||
|
||||
@@ -3,15 +3,17 @@
|
||||
# Use of this software is governed by the MVT License 1.1 that can be found at
|
||||
# https://license.mvt.re/1.1/
|
||||
|
||||
from .indicators import Indicator
|
||||
from dataclasses import dataclass
|
||||
from typing import List, Union, Optional
|
||||
from typing import Any, Dict, List, Union
|
||||
|
||||
|
||||
@dataclass
|
||||
class ModuleAtomicResult:
|
||||
timestamp: Optional[str]
|
||||
matched_indicator: Optional[Indicator]
|
||||
# ModuleAtomicResult is a flexible dictionary that can contain any data.
|
||||
# Common fields include:
|
||||
# - timestamp: Optional[str] - timestamp string
|
||||
# - isodate: Optional[str] - ISO formatted date string
|
||||
# - matched_indicator: Optional[Indicator] - indicator that matched this result
|
||||
# - Any other module-specific fields
|
||||
ModuleAtomicResult = Dict[str, Any]
|
||||
|
||||
|
||||
ModuleResults = List[ModuleAtomicResult]
|
||||
@@ -26,4 +28,7 @@ class ModuleAtomicTimeline:
|
||||
|
||||
|
||||
ModuleTimeline = List[ModuleAtomicTimeline]
|
||||
ModuleSerializedResult = Union[ModuleAtomicTimeline, ModuleTimeline]
|
||||
# ModuleSerializedResult can be a proper timeline object or a plain dict for compatibility
|
||||
ModuleSerializedResult = Union[
|
||||
ModuleAtomicTimeline, ModuleTimeline, Dict[str, Any], List[Dict[str, Any]]
|
||||
]
|
||||
|
||||
@@ -12,9 +12,9 @@ import requests
|
||||
import yaml
|
||||
from packaging import version
|
||||
|
||||
from .config import settings
|
||||
from .indicators import MVT_DATA_FOLDER, MVT_INDICATORS_FOLDER
|
||||
from .version import MVT_VERSION
|
||||
from .config import settings
|
||||
|
||||
log = logging.getLogger(__name__)
|
||||
|
||||
@@ -24,7 +24,11 @@ INDICATORS_CHECK_FREQUENCY = 12
|
||||
|
||||
class MVTUpdates:
|
||||
def check(self) -> str:
|
||||
res = requests.get(settings.PYPI_UPDATE_URL, timeout=15)
|
||||
try:
|
||||
res = requests.get(str(settings.PYPI_UPDATE_URL), timeout=5)
|
||||
except requests.exceptions.RequestException as e:
|
||||
log.error("Failed to check for updates, skipping updates: %s", e)
|
||||
return ""
|
||||
data = res.json()
|
||||
latest_version = data.get("info", {}).get("version", "")
|
||||
|
||||
@@ -93,7 +97,12 @@ class IndicatorsUpdates:
|
||||
url = self.github_raw_url.format(
|
||||
self.index_owner, self.index_repo, self.index_branch, self.index_path
|
||||
)
|
||||
res = requests.get(url, timeout=15)
|
||||
try:
|
||||
res = requests.get(url, timeout=5)
|
||||
except requests.exceptions.RequestException as e:
|
||||
log.error("Failed to retrieve indicators index from %s: %s", url, e)
|
||||
return None
|
||||
|
||||
if res.status_code != 200:
|
||||
log.error(
|
||||
"Failed to retrieve indicators index located at %s (error %d)",
|
||||
@@ -105,7 +114,12 @@ class IndicatorsUpdates:
|
||||
return yaml.safe_load(res.content)
|
||||
|
||||
def download_remote_ioc(self, ioc_url: str) -> Optional[str]:
|
||||
res = requests.get(ioc_url, timeout=15)
|
||||
try:
|
||||
res = requests.get(ioc_url, timeout=15)
|
||||
except requests.exceptions.RequestException as e:
|
||||
log.error("Failed to download indicators file from %s: %s", ioc_url, e)
|
||||
return None
|
||||
|
||||
if res.status_code != 200:
|
||||
log.error(
|
||||
"Failed to download indicators file from %s (error %d)",
|
||||
@@ -171,7 +185,12 @@ class IndicatorsUpdates:
|
||||
file_commit_url = (
|
||||
f"https://api.github.com/repos/{owner}/{repo}/commits?path={path}"
|
||||
)
|
||||
res = requests.get(file_commit_url, timeout=15)
|
||||
try:
|
||||
res = requests.get(file_commit_url, timeout=5)
|
||||
except requests.exceptions.RequestException as e:
|
||||
log.error("Failed to get details about file %s: %s", file_commit_url, e)
|
||||
return -1
|
||||
|
||||
if res.status_code != 200:
|
||||
log.error(
|
||||
"Failed to get details about file %s (error %d)",
|
||||
|
||||
@@ -338,11 +338,12 @@ class URL:
|
||||
:rtype: str
|
||||
|
||||
"""
|
||||
return (
|
||||
get_tld(self.url, as_object=True, fix_protocol=True)
|
||||
.parsed_url.netloc.lower()
|
||||
.lstrip("www.")
|
||||
)
|
||||
tld_obj = get_tld(self.url, as_object=True, fix_protocol=True)
|
||||
if isinstance(tld_obj, str):
|
||||
return tld_obj
|
||||
if tld_obj is None:
|
||||
return ""
|
||||
return tld_obj.parsed_url.netloc.lower().lstrip("www.")
|
||||
|
||||
def get_top_level(self) -> str:
|
||||
"""Get only the top-level domain from a URL.
|
||||
@@ -351,7 +352,12 @@ class URL:
|
||||
:rtype: str
|
||||
|
||||
"""
|
||||
return get_tld(self.url, as_object=True, fix_protocol=True).fld.lower()
|
||||
tld_obj = get_tld(self.url, as_object=True, fix_protocol=True)
|
||||
if isinstance(tld_obj, str):
|
||||
return tld_obj
|
||||
if tld_obj is None:
|
||||
return ""
|
||||
return tld_obj.fld.lower()
|
||||
|
||||
def check_if_shortened(self) -> bool:
|
||||
"""Check if the URL is among list of shortener services.
|
||||
|
||||
@@ -37,6 +37,8 @@ from mvt.common.help import (
|
||||
HELP_MSG_CHECK_IOCS,
|
||||
HELP_MSG_STIX2,
|
||||
HELP_MSG_CHECK_IOS_BACKUP,
|
||||
HELP_MSG_DISABLE_UPDATE_CHECK,
|
||||
HELP_MSG_DISABLE_INDICATOR_UPDATE_CHECK,
|
||||
)
|
||||
from .cmd_check_backup import CmdIOSCheckBackup
|
||||
from .cmd_check_fs import CmdIOSCheckFS
|
||||
@@ -53,12 +55,37 @@ MVT_IOS_BACKUP_PASSWORD = "MVT_IOS_BACKUP_PASSWORD"
|
||||
CONTEXT_SETTINGS = dict(help_option_names=["-h", "--help"])
|
||||
|
||||
|
||||
def _get_disable_flags(ctx):
|
||||
"""Helper function to safely get disable flags from context."""
|
||||
if ctx.obj is None:
|
||||
return False, False
|
||||
return (
|
||||
ctx.obj.get("disable_version_check", False),
|
||||
ctx.obj.get("disable_indicator_check", False),
|
||||
)
|
||||
|
||||
|
||||
# ==============================================================================
|
||||
# Main
|
||||
# ==============================================================================
|
||||
@click.group(invoke_without_command=False)
|
||||
def cli():
|
||||
logo()
|
||||
@click.option(
|
||||
"--disable-update-check", is_flag=True, help=HELP_MSG_DISABLE_UPDATE_CHECK
|
||||
)
|
||||
@click.option(
|
||||
"--disable-indicator-update-check",
|
||||
is_flag=True,
|
||||
help=HELP_MSG_DISABLE_INDICATOR_UPDATE_CHECK,
|
||||
)
|
||||
@click.pass_context
|
||||
def cli(ctx, disable_update_check, disable_indicator_update_check):
|
||||
ctx.ensure_object(dict)
|
||||
ctx.obj["disable_version_check"] = disable_update_check
|
||||
ctx.obj["disable_indicator_check"] = disable_indicator_update_check
|
||||
logo(
|
||||
disable_version_check=disable_update_check,
|
||||
disable_indicator_check=disable_indicator_update_check,
|
||||
)
|
||||
|
||||
|
||||
# ==============================================================================
|
||||
@@ -219,6 +246,8 @@ def check_backup(
|
||||
module_name=module,
|
||||
module_options=module_options,
|
||||
hashes=hashes,
|
||||
disable_version_check=_get_disable_flags(ctx)[0],
|
||||
disable_indicator_check=_get_disable_flags(ctx)[1],
|
||||
)
|
||||
|
||||
if list_modules:
|
||||
@@ -263,6 +292,8 @@ def check_fs(ctx, iocs, output, fast, list_modules, module, hashes, verbose, dum
|
||||
module_name=module,
|
||||
module_options=module_options,
|
||||
hashes=hashes,
|
||||
disable_version_check=_get_disable_flags(ctx)[0],
|
||||
disable_indicator_check=_get_disable_flags(ctx)[1],
|
||||
)
|
||||
|
||||
if list_modules:
|
||||
@@ -293,7 +324,13 @@ def check_fs(ctx, iocs, output, fast, list_modules, module, hashes, verbose, dum
|
||||
@click.argument("FOLDER", type=click.Path(exists=True))
|
||||
@click.pass_context
|
||||
def check_iocs(ctx, iocs, list_modules, module, folder):
|
||||
cmd = CmdCheckIOCS(target_path=folder, ioc_files=iocs, module_name=module)
|
||||
cmd = CmdCheckIOCS(
|
||||
target_path=folder,
|
||||
ioc_files=iocs,
|
||||
module_name=module,
|
||||
disable_version_check=_get_disable_flags(ctx)[0],
|
||||
disable_indicator_check=_get_disable_flags(ctx)[1],
|
||||
)
|
||||
cmd.modules = BACKUP_MODULES + FS_MODULES + MIXED_MODULES
|
||||
|
||||
if list_modules:
|
||||
|
||||
@@ -27,6 +27,8 @@ class CmdIOSCheckBackup(Command):
|
||||
module_options: Optional[dict] = None,
|
||||
hashes: bool = False,
|
||||
sub_command: bool = False,
|
||||
disable_version_check: bool = False,
|
||||
disable_indicator_check: bool = False,
|
||||
) -> None:
|
||||
super().__init__(
|
||||
target_path=target_path,
|
||||
@@ -39,6 +41,8 @@ class CmdIOSCheckBackup(Command):
|
||||
hashes=hashes,
|
||||
sub_command=sub_command,
|
||||
log=log,
|
||||
disable_version_check=disable_version_check,
|
||||
disable_indicator_check=disable_indicator_check,
|
||||
)
|
||||
|
||||
self.name = "check-backup"
|
||||
|
||||
@@ -27,6 +27,8 @@ class CmdIOSCheckFS(Command):
|
||||
module_options: Optional[dict] = None,
|
||||
hashes: bool = False,
|
||||
sub_command: bool = False,
|
||||
disable_version_check: bool = False,
|
||||
disable_indicator_check: bool = False,
|
||||
) -> None:
|
||||
super().__init__(
|
||||
target_path=target_path,
|
||||
@@ -34,11 +36,12 @@ class CmdIOSCheckFS(Command):
|
||||
ioc_files=ioc_files,
|
||||
iocs=iocs,
|
||||
module_name=module_name,
|
||||
serial=serial,
|
||||
module_options=module_options,
|
||||
hashes=hashes,
|
||||
sub_command=sub_command,
|
||||
log=log,
|
||||
disable_version_check=disable_version_check,
|
||||
disable_indicator_check=disable_indicator_check,
|
||||
)
|
||||
|
||||
self.name = "check-fs"
|
||||
|
||||
@@ -194,5 +194,41 @@
|
||||
{
|
||||
"identifier": "iPhone16,2",
|
||||
"description": "iPhone 15 Pro Max"
|
||||
},
|
||||
{
|
||||
"identifier": "iPhone17,1",
|
||||
"description": "iPhone 16 Pro"
|
||||
},
|
||||
{
|
||||
"identifier": "iPhone17,2",
|
||||
"description": "iPhone 16 Pro Max"
|
||||
},
|
||||
{
|
||||
"identifier": "iPhone17,3",
|
||||
"description": "iPhone 16"
|
||||
},
|
||||
{
|
||||
"identifier": "iPhone17,4",
|
||||
"description": "iPhone 16 Plus"
|
||||
},
|
||||
{
|
||||
"identifier": "iPhone17,5",
|
||||
"description": "iPhone 16e"
|
||||
},
|
||||
{
|
||||
"identifier": "iPhone18,1",
|
||||
"description": "iPhone 17 Pro"
|
||||
},
|
||||
{
|
||||
"identifier": "iPhone18,2",
|
||||
"description": "iPhone 17 Pro Max"
|
||||
},
|
||||
{
|
||||
"identifier": "iPhone18,3",
|
||||
"description": "iPhone 17"
|
||||
},
|
||||
{
|
||||
"identifier": "iPhone18,4",
|
||||
"description": "iPhone Air"
|
||||
}
|
||||
]
|
||||
|
||||
@@ -891,6 +891,14 @@
|
||||
"version": "15.8.2",
|
||||
"build": "19H384"
|
||||
},
|
||||
{
|
||||
"version": "15.8.4",
|
||||
"build": "19H390"
|
||||
},
|
||||
{
|
||||
"version": "15.8.5",
|
||||
"build": "19H394"
|
||||
},
|
||||
{
|
||||
"build": "20A362",
|
||||
"version": "16.0"
|
||||
@@ -992,6 +1000,14 @@
|
||||
"version": "16.7.8",
|
||||
"build": "20H343"
|
||||
},
|
||||
{
|
||||
"version": "16.7.11",
|
||||
"build": "20H360"
|
||||
},
|
||||
{
|
||||
"version": "16.7.12",
|
||||
"build": "20H364"
|
||||
},
|
||||
{
|
||||
"version": "17.0",
|
||||
"build": "21A327"
|
||||
@@ -1076,6 +1092,10 @@
|
||||
"version": "17.6.1",
|
||||
"build": "21G101"
|
||||
},
|
||||
{
|
||||
"version": "17.7.7",
|
||||
"build": "21H433"
|
||||
},
|
||||
{
|
||||
"version": "18",
|
||||
"build": "22A3354"
|
||||
@@ -1103,5 +1123,61 @@
|
||||
{
|
||||
"version": "18.3",
|
||||
"build": "22D63"
|
||||
},
|
||||
{
|
||||
"version": "18.3.1",
|
||||
"build": "22D72"
|
||||
},
|
||||
{
|
||||
"version": "18.4",
|
||||
"build": "22E240"
|
||||
},
|
||||
{
|
||||
"version": "18.4.1",
|
||||
"build": "22E252"
|
||||
},
|
||||
{
|
||||
"version": "18.5",
|
||||
"build": "22F76"
|
||||
},
|
||||
{
|
||||
"version": "18.6",
|
||||
"build": "22G86"
|
||||
},
|
||||
{
|
||||
"version": "18.6.1",
|
||||
"build": "22G90"
|
||||
},
|
||||
{
|
||||
"version": "18.6.2",
|
||||
"build": "22G100"
|
||||
},
|
||||
{
|
||||
"version": "18.7",
|
||||
"build": "22H20"
|
||||
},
|
||||
{
|
||||
"version": "18.7.2",
|
||||
"build": "22H124"
|
||||
},
|
||||
{
|
||||
"version": "18.7.3",
|
||||
"build": "22H217"
|
||||
},
|
||||
{
|
||||
"version": "26",
|
||||
"build": "23A341"
|
||||
},
|
||||
{
|
||||
"version": "26.0.1",
|
||||
"build": "23A355"
|
||||
},
|
||||
{
|
||||
"version": "26.1",
|
||||
"build": "23B85"
|
||||
},
|
||||
{
|
||||
"version": "26.2",
|
||||
"build": "23C55"
|
||||
}
|
||||
]
|
||||
@@ -58,6 +58,7 @@ class DecryptBackup:
|
||||
def _process_file(
|
||||
self, relative_path: str, domain: str, item, file_id: str, item_folder: str
|
||||
) -> None:
|
||||
assert self._backup is not None
|
||||
self._backup.getFileDecryptedCopy(
|
||||
manifestEntry=item, targetName=file_id, targetFolder=item_folder
|
||||
)
|
||||
@@ -70,6 +71,9 @@ class DecryptBackup:
|
||||
)
|
||||
|
||||
def process_backup(self) -> None:
|
||||
assert self._backup is not None
|
||||
assert self.dest_path is not None
|
||||
|
||||
if not os.path.exists(self.dest_path):
|
||||
os.makedirs(self.dest_path)
|
||||
|
||||
@@ -97,7 +101,7 @@ class DecryptBackup:
|
||||
)
|
||||
continue
|
||||
|
||||
item_folder = os.path.join(self.dest_path, file_id[0:2])
|
||||
item_folder = os.path.join(self.dest_path, file_id[0:2]) # type: ignore[arg-type]
|
||||
if not os.path.exists(item_folder):
|
||||
os.makedirs(item_folder)
|
||||
|
||||
|
||||
@@ -36,9 +36,11 @@ class BackupInfo(IOSExtraction):
|
||||
results=results,
|
||||
)
|
||||
|
||||
self.results = {}
|
||||
self.results: dict = {}
|
||||
|
||||
def run(self) -> None:
|
||||
if not self.target_path:
|
||||
raise DatabaseNotFoundError("target_path is not set")
|
||||
info_path = os.path.join(self.target_path, "Info.plist")
|
||||
if not os.path.exists(info_path):
|
||||
raise DatabaseNotFoundError(
|
||||
|
||||
@@ -9,12 +9,12 @@ import plistlib
|
||||
from base64 import b64encode
|
||||
from typing import Optional
|
||||
|
||||
from mvt.common.utils import convert_datetime_to_iso
|
||||
from mvt.common.module_types import (
|
||||
ModuleAtomicResult,
|
||||
ModuleSerializedResult,
|
||||
ModuleResults,
|
||||
ModuleSerializedResult,
|
||||
)
|
||||
from mvt.common.utils import convert_datetime_to_iso
|
||||
|
||||
from ..base import IOSExtraction
|
||||
|
||||
@@ -72,12 +72,10 @@ class ConfigurationProfiles(IOSExtraction):
|
||||
result["plist"]["PayloadUUID"]
|
||||
)
|
||||
if ioc_match:
|
||||
warning_message = (
|
||||
f'Found a known malicious configuration profile "{result["plist"]["PayloadDisplayName"]}" with UUID "{result["plist"]["PayloadUUID"]}"',
|
||||
)
|
||||
warning_message = f'Found a known malicious configuration profile "{result["plist"]["PayloadDisplayName"]}" with UUID "{result["plist"]["PayloadUUID"]}"'
|
||||
result["matched_indicator"] = ioc_match.ioc
|
||||
self.alertstore.critical(
|
||||
self.get_slug(), warning_message, "", result
|
||||
warning_message, "", result, matched_indicator=ioc_match.ioc
|
||||
)
|
||||
self.alertstore.log_latest()
|
||||
continue
|
||||
@@ -85,10 +83,8 @@ class ConfigurationProfiles(IOSExtraction):
|
||||
# Highlight suspicious configuration profiles which may be used
|
||||
# to hide notifications.
|
||||
if payload_content["PayloadType"] in ["com.apple.notificationsettings"]:
|
||||
warning_message = (
|
||||
f'Found a potentially suspicious configuration profile "{result["plist"]["PayloadDisplayName"]}" with payload type {payload_content["PayloadType"]}',
|
||||
)
|
||||
self.alertstore.medum(self.get_slug(), warning_message, "", result)
|
||||
warning_message = f'Found a potentially suspicious configuration profile "{result["plist"]["PayloadDisplayName"]}" with payload type {payload_content["PayloadType"]}'
|
||||
self.alertstore.medium(warning_message, "", result)
|
||||
self.alertstore.log_latest()
|
||||
continue
|
||||
|
||||
|
||||
@@ -11,13 +11,13 @@ import plistlib
|
||||
from typing import Optional
|
||||
|
||||
from mvt.common.module import DatabaseNotFoundError
|
||||
from mvt.common.url import URL
|
||||
from mvt.common.utils import convert_datetime_to_iso, convert_unix_to_iso
|
||||
from mvt.common.module_types import (
|
||||
ModuleResults,
|
||||
ModuleAtomicResult,
|
||||
ModuleResults,
|
||||
ModuleSerializedResult,
|
||||
)
|
||||
from mvt.common.url import URL
|
||||
from mvt.common.utils import convert_datetime_to_iso, convert_unix_to_iso
|
||||
|
||||
from ..base import IOSExtraction
|
||||
|
||||
@@ -66,7 +66,7 @@ class Manifest(IOSExtraction):
|
||||
return convert_unix_to_iso(timestamp_or_unix_time_int)
|
||||
|
||||
def serialize(self, record: ModuleAtomicResult) -> ModuleSerializedResult:
|
||||
records = []
|
||||
records: list = []
|
||||
if "modified" not in record or "status_changed" not in record:
|
||||
return records
|
||||
|
||||
@@ -103,7 +103,9 @@ class Manifest(IOSExtraction):
|
||||
ioc_match = self.indicators.check_file_path("/" + result["relative_path"])
|
||||
if ioc_match:
|
||||
result["matched_indicator"] = ioc_match.ioc
|
||||
self.alertstore.high(self.get_slug(), ioc_match.message, "", result)
|
||||
self.alertstore.high(
|
||||
ioc_match.message, "", result, matched_indicator=ioc_match.ioc
|
||||
)
|
||||
continue
|
||||
|
||||
rel_path = result["relative_path"].lower()
|
||||
@@ -118,13 +120,15 @@ class Manifest(IOSExtraction):
|
||||
if ioc_match:
|
||||
result["matched_indicator"] = ioc_match.ioc
|
||||
self.alertstore.high(
|
||||
self.get_slug(),
|
||||
f'Found mention of domain "{ioc_match.ioc.value}" in a backup file with path: {rel_path}',
|
||||
"",
|
||||
result,
|
||||
matched_indicator=ioc_match.ioc,
|
||||
)
|
||||
|
||||
def run(self) -> None:
|
||||
if not self.target_path:
|
||||
raise DatabaseNotFoundError("target_path is not set")
|
||||
manifest_db_path = os.path.join(self.target_path, "Manifest.db")
|
||||
if not os.path.isfile(manifest_db_path):
|
||||
raise DatabaseNotFoundError("unable to find backup's Manifest.db")
|
||||
|
||||
@@ -7,12 +7,12 @@ import logging
|
||||
import plistlib
|
||||
from typing import Optional
|
||||
|
||||
from mvt.common.utils import convert_datetime_to_iso
|
||||
from mvt.common.module_types import (
|
||||
ModuleAtomicResult,
|
||||
ModuleResults,
|
||||
ModuleSerializedResult,
|
||||
)
|
||||
from mvt.common.utils import convert_datetime_to_iso
|
||||
|
||||
from ..base import IOSExtraction
|
||||
|
||||
@@ -58,29 +58,31 @@ class ProfileEvents(IOSExtraction):
|
||||
def check_indicators(self) -> None:
|
||||
for result in self.results:
|
||||
message = f'On {result.get("timestamp")} process "{result.get("process")}" started operation "{result.get("operation")}" of profile "{result.get("profile_id")}"'
|
||||
self.alertstore.low(
|
||||
self.get_slug(), message, result.get("timestamp"), result
|
||||
)
|
||||
self.alertstore.low(message, result.get("timestamp") or "", result)
|
||||
self.alertstore.log_latest()
|
||||
|
||||
if not self.indicators:
|
||||
return
|
||||
|
||||
for result in self.results:
|
||||
ioc_match = self.indicators.check_process(result.get("process"))
|
||||
ioc_match = self.indicators.check_process(result.get("process") or "")
|
||||
if ioc_match:
|
||||
result["matched_indicator"] = ioc_match.ioc
|
||||
self.alertstore.critical(self.get_slug(), ioc_match.message, "", result)
|
||||
self.alertstore.critical(
|
||||
ioc_match.message, "", result, matched_indicator=ioc_match.ioc
|
||||
)
|
||||
continue
|
||||
|
||||
ioc_match = self.indicators.check_profile(result.get("profile_id"))
|
||||
ioc_match = self.indicators.check_profile(result.get("profile_id") or "")
|
||||
if ioc_match:
|
||||
result["matched_indicator"] = ioc_match.ioc
|
||||
self.alertstore.critical(self.get_slug(), ioc_match.message, "", result)
|
||||
self.alertstore.critical(
|
||||
ioc_match.message, "", result, matched_indicator=ioc_match.ioc
|
||||
)
|
||||
|
||||
@staticmethod
|
||||
def parse_profile_events(file_data: bytes) -> list:
|
||||
results = []
|
||||
results: list = []
|
||||
|
||||
events_plist = plistlib.loads(file_data)
|
||||
|
||||
|
||||
@@ -11,8 +11,12 @@ import sqlite3
|
||||
import subprocess
|
||||
from typing import Iterator, Optional, Union
|
||||
|
||||
from mvt.common.module import DatabaseCorruptedError, DatabaseNotFoundError
|
||||
from mvt.common.module import MVTModule, ModuleResults
|
||||
from mvt.common.module import (
|
||||
DatabaseCorruptedError,
|
||||
DatabaseNotFoundError,
|
||||
ModuleResults,
|
||||
MVTModule,
|
||||
)
|
||||
|
||||
|
||||
class IOSExtraction(MVTModule):
|
||||
@@ -110,6 +114,8 @@ class IOSExtraction(MVTModule):
|
||||
(Default value = None)
|
||||
|
||||
"""
|
||||
if not self.target_path:
|
||||
raise DatabaseNotFoundError("target_path is not set")
|
||||
manifest_db_path = os.path.join(self.target_path, "Manifest.db")
|
||||
if not os.path.exists(manifest_db_path):
|
||||
raise DatabaseNotFoundError("unable to find backup's Manifest.db")
|
||||
@@ -146,6 +152,8 @@ class IOSExtraction(MVTModule):
|
||||
}
|
||||
|
||||
def _get_backup_file_from_id(self, file_id: str) -> Union[str, None]:
|
||||
if not self.target_path:
|
||||
return None
|
||||
file_path = os.path.join(self.target_path, file_id[0:2], file_id)
|
||||
if os.path.exists(file_path):
|
||||
return file_path
|
||||
@@ -153,6 +161,8 @@ class IOSExtraction(MVTModule):
|
||||
return None
|
||||
|
||||
def _get_fs_files_from_patterns(self, root_paths: list) -> Iterator[str]:
|
||||
if not self.target_path:
|
||||
return
|
||||
for root_path in root_paths:
|
||||
for found_path in glob.glob(os.path.join(self.target_path, root_path)):
|
||||
if not os.path.exists(found_path):
|
||||
@@ -174,9 +184,10 @@ class IOSExtraction(MVTModule):
|
||||
:param backup_ids: Default value = None)
|
||||
|
||||
"""
|
||||
file_path = None
|
||||
file_path: Optional[str] = None
|
||||
# First we check if the was an explicit file path specified.
|
||||
if not self.file_path:
|
||||
# Type narrowing: we know self.file_path is None here, work with local file_path
|
||||
# If not, we first try with backups.
|
||||
# We construct the path to the file according to the iTunes backup
|
||||
# folder structure, if we have a valid ID.
|
||||
@@ -198,8 +209,9 @@ class IOSExtraction(MVTModule):
|
||||
|
||||
# If we do not find any, we fail.
|
||||
if file_path:
|
||||
self.file_path = file_path
|
||||
self.file_path = file_path # type: str
|
||||
else:
|
||||
raise DatabaseNotFoundError("unable to find the module's database file")
|
||||
|
||||
assert self.file_path is not None
|
||||
self._recover_sqlite_db_if_needed(self.file_path)
|
||||
|
||||
@@ -9,12 +9,12 @@ import plistlib
|
||||
import sqlite3
|
||||
from typing import Optional
|
||||
|
||||
from mvt.common.utils import convert_mactime_to_iso
|
||||
from mvt.common.module_types import (
|
||||
ModuleAtomicResult,
|
||||
ModuleResults,
|
||||
ModuleSerializedResult,
|
||||
ModuleAtomicResult,
|
||||
)
|
||||
from mvt.common.utils import convert_mactime_to_iso
|
||||
|
||||
from ..base import IOSExtraction
|
||||
|
||||
@@ -44,6 +44,7 @@ class Analytics(IOSExtraction):
|
||||
log=log,
|
||||
results=results,
|
||||
)
|
||||
self.results: list = []
|
||||
|
||||
def serialize(self, record: ModuleAtomicResult) -> ModuleSerializedResult:
|
||||
return {
|
||||
@@ -64,13 +65,11 @@ class Analytics(IOSExtraction):
|
||||
|
||||
ioc_match = self.indicators.check_process(value)
|
||||
if ioc_match:
|
||||
warning_message = (
|
||||
f'Found mention of a malicious process "{value}" in {result["artifact"]} file at {result["isodate"]}',
|
||||
)
|
||||
warning_message = f'Found mention of a malicious process "{value}" in {result["artifact"]} file at {result["isodate"]}'
|
||||
new_result = copy.copy(result)
|
||||
new_result["matched_indicator"] = ioc_match.ioc
|
||||
self.alertstore.critical(
|
||||
self.get_slug(), warning_message, "", new_result
|
||||
warning_message, "", new_result, matched_indicator=ioc_match.ioc
|
||||
)
|
||||
self.alertstore.log_latest()
|
||||
continue
|
||||
@@ -80,7 +79,10 @@ class Analytics(IOSExtraction):
|
||||
new_result = copy.copy(result)
|
||||
result["matched_indicator"] = ioc_match.ioc
|
||||
self.alertstore.critical(
|
||||
self.get_slug(), ioc_match.message, "", new_result
|
||||
ioc_match.message,
|
||||
"",
|
||||
new_result,
|
||||
matched_indicator=ioc_match.ioc,
|
||||
)
|
||||
|
||||
def _extract_analytics_data(self):
|
||||
|
||||
@@ -10,9 +10,10 @@ from typing import Optional
|
||||
|
||||
from mvt.common.module_types import (
|
||||
ModuleAtomicResult,
|
||||
ModuleSerializedResult,
|
||||
ModuleResults,
|
||||
ModuleSerializedResult,
|
||||
)
|
||||
|
||||
from ..base import IOSExtraction
|
||||
|
||||
|
||||
@@ -95,7 +96,7 @@ class CacheFiles(IOSExtraction):
|
||||
)
|
||||
|
||||
def run(self) -> None:
|
||||
self.results = {}
|
||||
self.results: dict = {}
|
||||
for root, _, files in os.walk(self.target_path):
|
||||
for file_name in files:
|
||||
if file_name != "Cache.db":
|
||||
|
||||
@@ -6,12 +6,12 @@
|
||||
import logging
|
||||
from typing import Optional
|
||||
|
||||
from mvt.common.utils import convert_mactime_to_iso
|
||||
from mvt.common.module_types import (
|
||||
ModuleAtomicResult,
|
||||
ModuleResults,
|
||||
ModuleSerializedResult,
|
||||
ModuleAtomicResult,
|
||||
)
|
||||
from mvt.common.utils import convert_mactime_to_iso
|
||||
|
||||
from ..base import IOSExtraction
|
||||
|
||||
@@ -41,6 +41,7 @@ class SafariFavicon(IOSExtraction):
|
||||
log=log,
|
||||
results=results,
|
||||
)
|
||||
self.results: list = []
|
||||
|
||||
def serialize(self, record: ModuleAtomicResult) -> ModuleSerializedResult:
|
||||
return {
|
||||
@@ -61,7 +62,9 @@ class SafariFavicon(IOSExtraction):
|
||||
ioc_match = self.indicators.check_url(result["icon_url"])
|
||||
|
||||
if ioc_match:
|
||||
self.alertstore.critical(self.get_slug(), ioc_match.message, "", result)
|
||||
self.alertstore.critical(
|
||||
ioc_match.message, "", result, matched_indicator=ioc_match.ioc
|
||||
)
|
||||
self.alertstore.log_latest()
|
||||
|
||||
def _process_favicon_db(self, file_path):
|
||||
|
||||
@@ -6,12 +6,12 @@
|
||||
import logging
|
||||
from typing import Optional
|
||||
|
||||
from mvt.common.utils import convert_mactime_to_iso
|
||||
from mvt.common.module_types import (
|
||||
ModuleAtomicResult,
|
||||
ModuleResults,
|
||||
ModuleSerializedResult,
|
||||
)
|
||||
from mvt.common.utils import convert_mactime_to_iso
|
||||
|
||||
from ..base import IOSExtraction
|
||||
|
||||
@@ -57,7 +57,9 @@ class ShutdownLog(IOSExtraction):
|
||||
for result in self.results:
|
||||
ioc_match = self.indicators.check_file_path(result["client"])
|
||||
if ioc_match:
|
||||
self.alertstore.critical(self.get_slug(), ioc_match.message, "", result)
|
||||
self.alertstore.critical(
|
||||
ioc_match.message, "", result, matched_indicator=ioc_match.ioc
|
||||
)
|
||||
self.alertstore.log_latest()
|
||||
continue
|
||||
|
||||
@@ -66,10 +68,10 @@ class ShutdownLog(IOSExtraction):
|
||||
if ioc.value in parts:
|
||||
result["matched_indicator"] = ioc
|
||||
self.alertstore.critical(
|
||||
self.get_slug(),
|
||||
f'Found mention of a known malicious process "{ioc.value}" in shutdown.log',
|
||||
"",
|
||||
result,
|
||||
matched_indicator=ioc,
|
||||
)
|
||||
self.alertstore.log_latest()
|
||||
continue
|
||||
@@ -135,5 +137,8 @@ class ShutdownLog(IOSExtraction):
|
||||
def run(self) -> None:
|
||||
self._find_ios_database(root_paths=SHUTDOWN_LOG_PATH)
|
||||
self.log.info("Found shutdown log at path: %s", self.file_path)
|
||||
|
||||
if not self.file_path:
|
||||
return
|
||||
with open(self.file_path, "r", encoding="utf-8") as handle:
|
||||
self.process_shutdownlog(handle.read())
|
||||
|
||||
@@ -8,12 +8,12 @@ import json
|
||||
import logging
|
||||
from typing import Optional
|
||||
|
||||
from mvt.common.utils import convert_datetime_to_iso
|
||||
from mvt.common.module_types import (
|
||||
ModuleAtomicResult,
|
||||
ModuleResults,
|
||||
ModuleSerializedResult,
|
||||
)
|
||||
from mvt.common.utils import convert_datetime_to_iso
|
||||
|
||||
from ..base import IOSExtraction
|
||||
|
||||
@@ -42,6 +42,7 @@ class IOSVersionHistory(IOSExtraction):
|
||||
log=log,
|
||||
results=results,
|
||||
)
|
||||
self.results: list = []
|
||||
|
||||
def serialize(self, record: ModuleAtomicResult) -> ModuleSerializedResult:
|
||||
return {
|
||||
|
||||
@@ -21,7 +21,9 @@ class WebkitBase(IOSExtraction):
|
||||
ioc_match = self.indicators.check_url(result["url"])
|
||||
if ioc_match:
|
||||
result["matched_indicator"] = ioc_match.ioc
|
||||
self.alertstore.critical(self.get_slug(), ioc_match.message, "", result)
|
||||
self.alertstore.critical(
|
||||
ioc_match.message, "", result, matched_indicator=ioc_match.ioc
|
||||
)
|
||||
continue
|
||||
|
||||
def _process_webkit_folder(self, root_paths):
|
||||
|
||||
@@ -11,10 +11,14 @@ from datetime import datetime, timezone
|
||||
from typing import Any, Dict, Optional
|
||||
|
||||
from mvt.common.module import DatabaseNotFoundError
|
||||
from mvt.common.module_types import (
|
||||
ModuleAtomicResult,
|
||||
ModuleResults,
|
||||
ModuleSerializedResult,
|
||||
)
|
||||
from mvt.common.utils import convert_datetime_to_iso
|
||||
from mvt.ios.modules.base import IOSExtraction
|
||||
from mvt.common.module import ModuleResults, ModuleAtomicResult, ModuleSerializedResult
|
||||
|
||||
from ..base import IOSExtraction
|
||||
|
||||
APPLICATIONS_DB_PATH = [
|
||||
"private/var/containers/Bundle/Application/*/iTunesMetadata.plist"
|
||||
@@ -63,7 +67,6 @@ class Applications(IOSExtraction):
|
||||
if self.indicators:
|
||||
if "softwareVersionBundleId" not in result:
|
||||
self.alertstore.high(
|
||||
self.get_slug(),
|
||||
"Suspicious application identified without softwareVersionBundleId",
|
||||
"",
|
||||
result,
|
||||
@@ -76,10 +79,10 @@ class Applications(IOSExtraction):
|
||||
if ioc_match:
|
||||
result["matched_indicator"] = ioc_match.ioc
|
||||
self.alertstore.critical(
|
||||
self.get_slug(),
|
||||
f"Malicious application {result['softwareVersionBundleId']} identified",
|
||||
"",
|
||||
result,
|
||||
matched_indicator=ioc_match.ioc,
|
||||
)
|
||||
continue
|
||||
|
||||
@@ -89,10 +92,10 @@ class Applications(IOSExtraction):
|
||||
if ioc_match:
|
||||
result["matched_indicator"] = ioc_match.ioc
|
||||
self.alertstore.critical(
|
||||
self.get_slug(),
|
||||
f"Malicious application {result['softwareVersionBundleId']} identified",
|
||||
"",
|
||||
result,
|
||||
matched_indicator=ioc_match.ioc,
|
||||
)
|
||||
continue
|
||||
|
||||
@@ -102,7 +105,6 @@ class Applications(IOSExtraction):
|
||||
not in KNOWN_APP_INSTALLERS
|
||||
):
|
||||
self.alertstore.medium(
|
||||
self.get_slug(),
|
||||
f"Suspicious app not installed from the App Store or MDM: {result['softwareVersionBundleId']}",
|
||||
"",
|
||||
result,
|
||||
@@ -157,6 +159,8 @@ class Applications(IOSExtraction):
|
||||
|
||||
def run(self) -> None:
|
||||
if self.is_backup:
|
||||
if not self.target_path:
|
||||
return
|
||||
plist_path = os.path.join(self.target_path, "Info.plist")
|
||||
if not os.path.isfile(plist_path):
|
||||
raise DatabaseNotFoundError("Impossible to find Info.plist file")
|
||||
|
||||
@@ -6,12 +6,12 @@
|
||||
import logging
|
||||
from typing import Optional
|
||||
|
||||
from mvt.common.utils import convert_mactime_to_iso
|
||||
from mvt.common.module_types import (
|
||||
ModuleAtomicResult,
|
||||
ModuleResults,
|
||||
ModuleSerializedResult,
|
||||
ModuleAtomicResult,
|
||||
)
|
||||
from mvt.common.utils import convert_mactime_to_iso
|
||||
|
||||
from ..base import IOSExtraction
|
||||
|
||||
@@ -73,14 +73,13 @@ class Calendar(IOSExtraction):
|
||||
if ioc_match:
|
||||
result["matched_indicator"] = ioc_match.ioc
|
||||
self.alertstore.critical(
|
||||
self.get_slug(), ioc_match.message, "", result
|
||||
ioc_match.message, "", result, matched_indicator=ioc_match.ioc
|
||||
)
|
||||
continue
|
||||
|
||||
# Custom check for Quadream exploit
|
||||
if result["summary"] == "Meeting" and result["description"] == "Notes":
|
||||
self.alertstore.high(
|
||||
self.get_slug(),
|
||||
f"Potential Quadream exploit event identified: {result['uuid']}",
|
||||
"",
|
||||
result,
|
||||
|
||||
@@ -6,8 +6,8 @@
|
||||
import logging
|
||||
from typing import Optional
|
||||
|
||||
from mvt.common.utils import convert_mactime_to_iso
|
||||
from mvt.common.module_types import ModuleAtomicResult, ModuleSerializedResult
|
||||
from mvt.common.utils import convert_mactime_to_iso
|
||||
|
||||
from ..base import IOSExtraction
|
||||
|
||||
@@ -22,9 +22,9 @@ class Calls(IOSExtraction):
|
||||
|
||||
def __init__(
|
||||
self,
|
||||
file_path: str = None,
|
||||
target_path: str = None,
|
||||
results_path: str = None,
|
||||
file_path: Optional[str] = None,
|
||||
target_path: Optional[str] = None,
|
||||
results_path: Optional[str] = None,
|
||||
module_options: Optional[dict] = None,
|
||||
log: logging.Logger = logging.getLogger(__name__),
|
||||
results: list = [],
|
||||
@@ -53,6 +53,8 @@ class Calls(IOSExtraction):
|
||||
)
|
||||
self.log.info("Found Calls database at path: %s", self.file_path)
|
||||
|
||||
if not self.file_path:
|
||||
return
|
||||
conn = self._open_sqlite_db(self.file_path)
|
||||
cur = conn.cursor()
|
||||
cur.execute(
|
||||
|
||||
@@ -6,12 +6,12 @@
|
||||
import logging
|
||||
from typing import Optional
|
||||
|
||||
from mvt.common.utils import convert_chrometime_to_datetime, convert_datetime_to_iso
|
||||
from mvt.common.module_types import (
|
||||
ModuleAtomicResult,
|
||||
ModuleResults,
|
||||
ModuleSerializedResult,
|
||||
ModuleAtomicResult,
|
||||
)
|
||||
from mvt.common.utils import convert_chrometime_to_datetime, convert_datetime_to_iso
|
||||
|
||||
from ..base import IOSExtraction
|
||||
|
||||
@@ -62,7 +62,9 @@ class ChromeFavicon(IOSExtraction):
|
||||
|
||||
if ioc_match:
|
||||
result["matched_indicator"] = ioc_match.ioc
|
||||
self.alertstore.critical(self.get_slug(), ioc_match.message, "", result)
|
||||
self.alertstore.critical(
|
||||
ioc_match.message, "", result, matched_indicator=ioc_match.ioc
|
||||
)
|
||||
continue
|
||||
|
||||
def run(self) -> None:
|
||||
@@ -71,6 +73,8 @@ class ChromeFavicon(IOSExtraction):
|
||||
)
|
||||
self.log.info("Found Chrome favicon cache database at path: %s", self.file_path)
|
||||
|
||||
if not self.file_path:
|
||||
return
|
||||
conn = self._open_sqlite_db(self.file_path)
|
||||
|
||||
# Fetch icon cache
|
||||
|
||||
@@ -6,12 +6,12 @@
|
||||
import logging
|
||||
from typing import Optional
|
||||
|
||||
from mvt.common.utils import convert_chrometime_to_datetime, convert_datetime_to_iso
|
||||
from mvt.common.module_types import (
|
||||
ModuleResults,
|
||||
ModuleAtomicResult,
|
||||
ModuleResults,
|
||||
ModuleSerializedResult,
|
||||
)
|
||||
from mvt.common.utils import convert_chrometime_to_datetime, convert_datetime_to_iso
|
||||
|
||||
from ..base import IOSExtraction
|
||||
|
||||
@@ -63,7 +63,9 @@ class ChromeHistory(IOSExtraction):
|
||||
ioc_match = self.indicators.check_url(result["url"])
|
||||
if ioc_match:
|
||||
result["matched_indicator"] = ioc_match.ioc
|
||||
self.alertstore.critical(self.get_slug(), ioc_match.message, "", result)
|
||||
self.alertstore.critical(
|
||||
ioc_match.message, "", result, matched_indicator=ioc_match.ioc
|
||||
)
|
||||
|
||||
def run(self) -> None:
|
||||
self._find_ios_database(
|
||||
@@ -71,6 +73,8 @@ class ChromeHistory(IOSExtraction):
|
||||
)
|
||||
self.log.info("Found Chrome history database at path: %s", self.file_path)
|
||||
|
||||
if not self.file_path:
|
||||
return
|
||||
conn = self._open_sqlite_db(self.file_path)
|
||||
cur = conn.cursor()
|
||||
cur.execute(
|
||||
|
||||
@@ -8,6 +8,7 @@ import sqlite3
|
||||
from typing import Optional
|
||||
|
||||
from mvt.common.module_types import ModuleResults
|
||||
|
||||
from ..base import IOSExtraction
|
||||
|
||||
CONTACTS_BACKUP_IDS = [
|
||||
@@ -45,6 +46,8 @@ class Contacts(IOSExtraction):
|
||||
)
|
||||
self.log.info("Found Contacts database at path: %s", self.file_path)
|
||||
|
||||
if not self.file_path:
|
||||
return
|
||||
conn = self._open_sqlite_db(self.file_path)
|
||||
cur = conn.cursor()
|
||||
try:
|
||||
|
||||
@@ -6,12 +6,12 @@
|
||||
import logging
|
||||
from typing import Optional
|
||||
|
||||
from mvt.common.utils import convert_unix_to_iso
|
||||
from mvt.common.module_types import (
|
||||
ModuleAtomicResult,
|
||||
ModuleResults,
|
||||
ModuleSerializedResult,
|
||||
ModuleAtomicResult,
|
||||
)
|
||||
from mvt.common.utils import convert_unix_to_iso
|
||||
|
||||
from ..base import IOSExtraction
|
||||
|
||||
@@ -64,7 +64,9 @@ class FirefoxFavicon(IOSExtraction):
|
||||
|
||||
if ioc_match:
|
||||
result["matched_indicator"] = ioc_match.ioc
|
||||
self.alertstore.critical(self.get_slug(), ioc_match.message, "", result)
|
||||
self.alertstore.critical(
|
||||
ioc_match.message, "", result, matched_indicator=ioc_match.ioc
|
||||
)
|
||||
|
||||
def run(self) -> None:
|
||||
self._find_ios_database(
|
||||
@@ -72,6 +74,8 @@ class FirefoxFavicon(IOSExtraction):
|
||||
)
|
||||
self.log.info("Found Firefox favicon database at path: %s", self.file_path)
|
||||
|
||||
if not self.file_path:
|
||||
return
|
||||
conn = self._open_sqlite_db(self.file_path)
|
||||
cur = conn.cursor()
|
||||
cur.execute(
|
||||
|
||||
@@ -6,12 +6,12 @@
|
||||
import logging
|
||||
from typing import Optional
|
||||
|
||||
from mvt.common.utils import convert_unix_to_iso
|
||||
from mvt.common.module_types import (
|
||||
ModuleAtomicResult,
|
||||
ModuleResults,
|
||||
ModuleSerializedResult,
|
||||
ModuleAtomicResult,
|
||||
)
|
||||
from mvt.common.utils import convert_unix_to_iso
|
||||
|
||||
from ..base import IOSExtraction
|
||||
|
||||
@@ -64,7 +64,9 @@ class FirefoxHistory(IOSExtraction):
|
||||
ioc_match = self.indicators.check_url(result["url"])
|
||||
if ioc_match:
|
||||
result["matched_indicator"] = ioc_match.ioc
|
||||
self.alertstore.critical(self.get_slug(), ioc_match.message, "", result)
|
||||
self.alertstore.critical(
|
||||
ioc_match.message, "", result, matched_indicator=ioc_match.ioc
|
||||
)
|
||||
|
||||
def run(self) -> None:
|
||||
self._find_ios_database(
|
||||
@@ -72,6 +74,8 @@ class FirefoxHistory(IOSExtraction):
|
||||
)
|
||||
self.log.info("Found Firefox history database at path: %s", self.file_path)
|
||||
|
||||
if not self.file_path:
|
||||
return
|
||||
conn = self._open_sqlite_db(self.file_path)
|
||||
cur = conn.cursor()
|
||||
cur.execute(
|
||||
|
||||
@@ -8,6 +8,7 @@ import plistlib
|
||||
from typing import Optional
|
||||
|
||||
from mvt.common.module_types import ModuleResults
|
||||
|
||||
from ..base import IOSExtraction
|
||||
|
||||
GLOBAL_PREFERENCES_BACKUP_IDS = ["0dc926a1810f7aee4e8f38793ed788701f93bf9d"]
|
||||
@@ -41,14 +42,9 @@ class GlobalPreferences(IOSExtraction):
|
||||
for entry in self.results:
|
||||
if entry["entry"] == "LDMGlobalEnabled":
|
||||
if entry["value"]:
|
||||
self.alertstore.info(
|
||||
self.get_slug(), "Lockdown mode enabled", "", None
|
||||
)
|
||||
self.alertstore.info("Lockdown mode enabled", "", entry)
|
||||
else:
|
||||
self.alertstore.low(
|
||||
self.get_slug(), "Lockdown mode disabled", "", None
|
||||
)
|
||||
self.alertstore.log_latest()
|
||||
self.alertstore.low("Lockdown mode disabled", "", entry)
|
||||
continue
|
||||
|
||||
def process_file(self, file_path: str) -> None:
|
||||
@@ -65,6 +61,8 @@ class GlobalPreferences(IOSExtraction):
|
||||
)
|
||||
self.log.info("Found Global Preference database at path: %s", self.file_path)
|
||||
|
||||
if not self.file_path:
|
||||
return
|
||||
self.process_file(self.file_path)
|
||||
|
||||
self.log.info("Extracted a total of %d Global Preferences", len(self.results))
|
||||
|
||||
@@ -8,12 +8,12 @@ import logging
|
||||
import plistlib
|
||||
from typing import Optional
|
||||
|
||||
from mvt.common.utils import convert_mactime_to_iso
|
||||
from mvt.common.module_types import (
|
||||
ModuleAtomicResult,
|
||||
ModuleResults,
|
||||
ModuleSerializedResult,
|
||||
)
|
||||
from mvt.common.utils import convert_mactime_to_iso
|
||||
|
||||
from ..base import IOSExtraction
|
||||
|
||||
@@ -67,13 +67,12 @@ class IDStatusCache(IOSExtraction):
|
||||
if ioc_match:
|
||||
result["matched_indicator"] = ioc_match.ioc
|
||||
self.alertstore.critical(
|
||||
self.get_slug(), ioc_match.message, "", result
|
||||
ioc_match.message, "", result, matched_indicator=ioc_match.ioc
|
||||
)
|
||||
continue
|
||||
|
||||
if "\\x00\\x00" in result.get("user", ""):
|
||||
self.alertstore.high(
|
||||
self.get_slug(),
|
||||
f"Found an ID Status Cache entry with suspicious patterns: {result.get('user')}",
|
||||
"",
|
||||
result,
|
||||
|
||||
@@ -7,12 +7,12 @@ import logging
|
||||
import sqlite3
|
||||
from typing import Optional
|
||||
|
||||
from mvt.common.utils import convert_mactime_to_iso
|
||||
from mvt.common.module_types import (
|
||||
ModuleAtomicResult,
|
||||
ModuleSerializedResult,
|
||||
ModuleResults,
|
||||
ModuleSerializedResult,
|
||||
)
|
||||
from mvt.common.utils import convert_mactime_to_iso
|
||||
|
||||
from ..base import IOSExtraction
|
||||
|
||||
@@ -285,6 +285,8 @@ class InteractionC(IOSExtraction):
|
||||
)
|
||||
self.log.info("Found InteractionC database at path: %s", self.file_path)
|
||||
|
||||
if not self.file_path:
|
||||
return
|
||||
conn = self._open_sqlite_db(self.file_path)
|
||||
cur = conn.cursor()
|
||||
|
||||
|
||||
@@ -8,12 +8,12 @@ import logging
|
||||
import plistlib
|
||||
from typing import Optional
|
||||
|
||||
from mvt.common.utils import convert_mactime_to_iso
|
||||
from mvt.common.module_types import (
|
||||
ModuleAtomicResult,
|
||||
ModuleSerializedResult,
|
||||
ModuleResults,
|
||||
ModuleSerializedResult,
|
||||
)
|
||||
from mvt.common.utils import convert_mactime_to_iso
|
||||
|
||||
from ..base import IOSExtraction
|
||||
|
||||
@@ -86,7 +86,6 @@ class LocationdClients(IOSExtraction):
|
||||
if ioc_match:
|
||||
result["matched_indicator"] = ioc_match.ioc
|
||||
self.alertstore.high(
|
||||
self.get_slug(),
|
||||
f"Found a suspicious process name in LocationD entry {result['package']}",
|
||||
"",
|
||||
result,
|
||||
@@ -99,7 +98,6 @@ class LocationdClients(IOSExtraction):
|
||||
if ioc_match:
|
||||
result["matched_indicator"] = ioc_match.ioc
|
||||
self.alertstore.high(
|
||||
self.get_slug(),
|
||||
f"Found a suspicious process name in LocationD entry {result['package']}",
|
||||
"",
|
||||
result,
|
||||
@@ -111,8 +109,7 @@ class LocationdClients(IOSExtraction):
|
||||
if ioc_match:
|
||||
result["matched_indicator"] = ioc_match.ioc
|
||||
self.alertstore.high(
|
||||
self.get_slug(),
|
||||
f"Found a suspicious file path in LocationD entry {result['BundlePath']}",
|
||||
f"Found a known malicious domain in LocationD entry {result['package']}",
|
||||
"",
|
||||
result,
|
||||
)
|
||||
@@ -124,7 +121,6 @@ class LocationdClients(IOSExtraction):
|
||||
if ioc_match:
|
||||
result["matched_indicator"] = ioc_match.ioc
|
||||
self.alertstore.high(
|
||||
self.get_slug(),
|
||||
f"Found a suspicious file path in LocationD entry {result['Executable']}",
|
||||
"",
|
||||
result,
|
||||
@@ -141,7 +137,6 @@ class LocationdClients(IOSExtraction):
|
||||
if ioc_match:
|
||||
result["matched_indicator"] = ioc_match.ioc
|
||||
self.alertstore.high(
|
||||
self.get_slug(),
|
||||
f"Found a suspicious file path in LocationD entry {result['Registered']}",
|
||||
"",
|
||||
result,
|
||||
|
||||
@@ -7,12 +7,12 @@ import logging
|
||||
import plistlib
|
||||
from typing import Optional
|
||||
|
||||
from mvt.common.utils import convert_datetime_to_iso
|
||||
from mvt.common.module_types import (
|
||||
ModuleResults,
|
||||
ModuleAtomicResult,
|
||||
ModuleResults,
|
||||
ModuleSerializedResult,
|
||||
)
|
||||
from mvt.common.utils import convert_datetime_to_iso
|
||||
|
||||
from ..base import IOSExtraction
|
||||
|
||||
@@ -65,7 +65,9 @@ class OSAnalyticsADDaily(IOSExtraction):
|
||||
ioc_match = self.indicators.check_process(result["package"])
|
||||
if ioc_match:
|
||||
result["matched_indicator"] = ioc_match.ioc
|
||||
self.alertstore.critical(self.get_slug(), ioc_match.message, "", result)
|
||||
self.alertstore.critical(
|
||||
ioc_match.message, "", result, matched_indicator=ioc_match.ioc
|
||||
)
|
||||
|
||||
def run(self) -> None:
|
||||
self._find_ios_database(
|
||||
@@ -76,6 +78,8 @@ class OSAnalyticsADDaily(IOSExtraction):
|
||||
"Found com.apple.osanalytics.addaily plist at path: %s", self.file_path
|
||||
)
|
||||
|
||||
if not self.file_path:
|
||||
return
|
||||
with open(self.file_path, "rb") as handle:
|
||||
file_plist = plistlib.load(handle)
|
||||
|
||||
|
||||
@@ -10,12 +10,12 @@ import plistlib
|
||||
import sqlite3
|
||||
from typing import Optional
|
||||
|
||||
from mvt.common.utils import convert_mactime_to_iso, keys_bytes_to_string
|
||||
from mvt.common.module_types import (
|
||||
ModuleAtomicResult,
|
||||
ModuleResults,
|
||||
ModuleSerializedResult,
|
||||
ModuleAtomicResult,
|
||||
)
|
||||
from mvt.common.utils import convert_mactime_to_iso, keys_bytes_to_string
|
||||
|
||||
from ..base import IOSExtraction
|
||||
|
||||
@@ -67,7 +67,7 @@ class SafariBrowserState(IOSExtraction):
|
||||
if ioc_match:
|
||||
result["matched_indicator"] = ioc_match.ioc
|
||||
self.alertstore.critical(
|
||||
self.get_slug(), ioc_match.message, "", result
|
||||
ioc_match.message, "", result, matched_indicator=ioc_match.ioc
|
||||
)
|
||||
continue
|
||||
|
||||
@@ -80,7 +80,10 @@ class SafariBrowserState(IOSExtraction):
|
||||
if ioc_match:
|
||||
result["matched_indicator"] = ioc_match.ioc
|
||||
self.alertstore.critical(
|
||||
self.get_slug(), ioc_match.message, "", result
|
||||
ioc_match.message,
|
||||
"",
|
||||
result,
|
||||
matched_indicator=ioc_match.ioc,
|
||||
)
|
||||
|
||||
def _process_browser_state_db(self, db_path):
|
||||
@@ -104,14 +107,17 @@ class SafariBrowserState(IOSExtraction):
|
||||
)
|
||||
except sqlite3.OperationalError:
|
||||
# Old version iOS <12 likely
|
||||
cur.execute(
|
||||
try:
|
||||
cur.execute(
|
||||
"""
|
||||
SELECT
|
||||
title, url, user_visible_url, last_viewed_time, session_data
|
||||
FROM tabs
|
||||
ORDER BY last_viewed_time;
|
||||
"""
|
||||
SELECT
|
||||
title, url, user_visible_url, last_viewed_time, session_data
|
||||
FROM tabs
|
||||
ORDER BY last_viewed_time;
|
||||
"""
|
||||
)
|
||||
)
|
||||
except sqlite3.OperationalError as e:
|
||||
self.log.error(f"Error executing query: {e}")
|
||||
|
||||
for row in cur:
|
||||
session_entries = []
|
||||
|
||||
@@ -7,13 +7,13 @@ import logging
|
||||
import os
|
||||
from typing import Optional
|
||||
|
||||
from mvt.common.url import URL
|
||||
from mvt.common.utils import convert_mactime_to_datetime, convert_mactime_to_iso
|
||||
from mvt.common.module_types import (
|
||||
ModuleResults,
|
||||
ModuleAtomicResult,
|
||||
ModuleResults,
|
||||
ModuleSerializedResult,
|
||||
)
|
||||
from mvt.common.url import URL
|
||||
from mvt.common.utils import convert_mactime_to_datetime, convert_mactime_to_iso
|
||||
|
||||
from ..base import IOSExtraction
|
||||
|
||||
@@ -117,7 +117,9 @@ class SafariHistory(IOSExtraction):
|
||||
ioc_match = self.indicators.check_url(result["url"])
|
||||
if ioc_match:
|
||||
result["matched_indicator"] = ioc_match.ioc
|
||||
self.alertstore.critical(self.get_slug(), ioc_match.message, "", result)
|
||||
self.alertstore.critical(
|
||||
ioc_match.message, "", result, matched_indicator=ioc_match.ioc
|
||||
)
|
||||
|
||||
def _process_history_db(self, history_path):
|
||||
self._recover_sqlite_db_if_needed(history_path)
|
||||
|
||||
@@ -10,12 +10,12 @@ import plistlib
|
||||
import sqlite3
|
||||
from typing import Optional
|
||||
|
||||
from mvt.common.utils import check_for_links, convert_mactime_to_iso
|
||||
from mvt.common.module_types import (
|
||||
ModuleAtomicResult,
|
||||
ModuleResults,
|
||||
ModuleSerializedResult,
|
||||
)
|
||||
from mvt.common.utils import check_for_links, convert_mactime_to_iso
|
||||
|
||||
from ..base import IOSExtraction
|
||||
|
||||
@@ -80,7 +80,9 @@ class Shortcuts(IOSExtraction):
|
||||
ioc_match = self.indicators.check_urls(result["action_urls"])
|
||||
if ioc_match:
|
||||
result["matched_indicator"] = ioc_match.ioc
|
||||
self.alertstore.critical(self.get_slug(), ioc_match.message, "", result)
|
||||
self.alertstore.critical(
|
||||
ioc_match.message, "", result, matched_indicator=ioc_match.ioc
|
||||
)
|
||||
|
||||
def run(self) -> None:
|
||||
self._find_ios_database(
|
||||
@@ -88,6 +90,8 @@ class Shortcuts(IOSExtraction):
|
||||
)
|
||||
self.log.info("Found Shortcuts database at path: %s", self.file_path)
|
||||
|
||||
if not self.file_path:
|
||||
return
|
||||
conn = self._open_sqlite_db(self.file_path)
|
||||
conn.text_factory = bytes
|
||||
cur = conn.cursor()
|
||||
|
||||
@@ -8,12 +8,12 @@ import sqlite3
|
||||
from base64 import b64encode
|
||||
from typing import Optional
|
||||
|
||||
from mvt.common.utils import check_for_links, convert_mactime_to_iso
|
||||
from mvt.common.module_types import (
|
||||
ModuleAtomicResult,
|
||||
ModuleResults,
|
||||
ModuleSerializedResult,
|
||||
)
|
||||
from mvt.common.utils import check_for_links, convert_mactime_to_iso
|
||||
|
||||
from ..base import IOSExtraction
|
||||
|
||||
@@ -95,12 +95,17 @@ class SMS(IOSExtraction):
|
||||
ioc_match = self.indicators.check_urls(message_links)
|
||||
if ioc_match:
|
||||
result["matched_indicator"] = ioc_match.ioc
|
||||
self.alertstore.critical(self.get_slug(), ioc_match.message, "", result)
|
||||
self.alertstore.critical(
|
||||
ioc_match.message, "", result, matched_indicator=ioc_match.ioc
|
||||
)
|
||||
|
||||
def run(self) -> None:
|
||||
self._find_ios_database(backup_ids=SMS_BACKUP_IDS, root_paths=SMS_ROOT_PATHS)
|
||||
self.log.info("Found SMS database at path: %s", self.file_path)
|
||||
|
||||
if not self.file_path:
|
||||
return
|
||||
|
||||
try:
|
||||
conn = self._open_sqlite_db(self.file_path)
|
||||
cur = conn.cursor()
|
||||
@@ -118,6 +123,7 @@ class SMS(IOSExtraction):
|
||||
except sqlite3.DatabaseError as exc:
|
||||
conn.close()
|
||||
if "database disk image is malformed" in str(exc):
|
||||
assert self.file_path is not None
|
||||
self._recover_sqlite_db_if_needed(self.file_path, forced=True)
|
||||
conn = self._open_sqlite_db(self.file_path)
|
||||
cur = conn.cursor()
|
||||
|
||||
@@ -7,12 +7,12 @@ import logging
|
||||
from base64 import b64encode
|
||||
from typing import Optional
|
||||
|
||||
from mvt.common.utils import convert_mactime_to_iso
|
||||
from mvt.common.module_types import (
|
||||
ModuleAtomicResult,
|
||||
ModuleSerializedResult,
|
||||
ModuleResults,
|
||||
ModuleSerializedResult,
|
||||
)
|
||||
from mvt.common.utils import convert_mactime_to_iso
|
||||
|
||||
from ..base import IOSExtraction
|
||||
|
||||
@@ -65,9 +65,7 @@ class SMSAttachments(IOSExtraction):
|
||||
ioc_match = self.indicators.check_file_path(attachment["filename"])
|
||||
if ioc_match:
|
||||
attachment["matched_indicator"] = ioc_match.ioc
|
||||
self.alertstore.high(
|
||||
self.get_slug(), ioc_match.message, "", attachment
|
||||
)
|
||||
self.alertstore.high(ioc_match.message, "", attachment)
|
||||
|
||||
if (
|
||||
attachment["filename"].startswith("/var/tmp/")
|
||||
@@ -85,6 +83,8 @@ class SMSAttachments(IOSExtraction):
|
||||
self._find_ios_database(backup_ids=SMS_BACKUP_IDS, root_paths=SMS_ROOT_PATHS)
|
||||
self.log.info("Found SMS database at path: %s", self.file_path)
|
||||
|
||||
if not self.file_path:
|
||||
return
|
||||
conn = self._open_sqlite_db(self.file_path)
|
||||
cur = conn.cursor()
|
||||
cur.execute(
|
||||
|
||||
@@ -7,12 +7,12 @@ import logging
|
||||
import sqlite3
|
||||
from typing import Optional
|
||||
|
||||
from mvt.common.utils import convert_unix_to_iso
|
||||
from mvt.common.module_types import (
|
||||
ModuleAtomicResult,
|
||||
ModuleSerializedResult,
|
||||
ModuleResults,
|
||||
ModuleSerializedResult,
|
||||
)
|
||||
from mvt.common.utils import convert_unix_to_iso
|
||||
|
||||
from ..base import IOSExtraction
|
||||
|
||||
@@ -96,8 +96,9 @@ class TCC(IOSExtraction):
|
||||
for result in self.results:
|
||||
ioc_match = self.indicators.check_process(result["client"])
|
||||
if ioc_match:
|
||||
result["matched_indicator"] = ioc_match.ioc
|
||||
self.alertstore.critical(self.get_slug(), ioc_match.message, "", result)
|
||||
self.alertstore.critical(
|
||||
ioc_match.message, "", result, matched_indicator=ioc_match.ioc
|
||||
)
|
||||
|
||||
def process_db(self, file_path):
|
||||
conn = self._open_sqlite_db(file_path)
|
||||
@@ -121,13 +122,16 @@ class TCC(IOSExtraction):
|
||||
)
|
||||
db_version = "v2"
|
||||
except sqlite3.OperationalError:
|
||||
cur.execute(
|
||||
"""SELECT
|
||||
service, client, client_type, allowed,
|
||||
prompt_count
|
||||
FROM access;"""
|
||||
)
|
||||
db_version = "v1"
|
||||
try:
|
||||
cur.execute(
|
||||
"""SELECT
|
||||
service, client, client_type, allowed,
|
||||
prompt_count
|
||||
FROM access;"""
|
||||
)
|
||||
db_version = "v1"
|
||||
except sqlite3.OperationalError as e:
|
||||
self.log.error(f"Error parsing TCC database: {e}")
|
||||
|
||||
for row in cur:
|
||||
service = row[0]
|
||||
|
||||
Some files were not shown because too many files have changed in this diff Show More
Reference in New Issue
Block a user