mirror of
https://github.com/mvt-project/mvt.git
synced 2026-02-14 17:42:46 +00:00
Compare commits
82 Commits
refactor/s
...
auto/add-n
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
c88a6b46f1 | ||
|
|
61947d17af | ||
|
|
7173e02a6f | ||
|
|
8f34902bed | ||
|
|
939bec82ff | ||
|
|
b183ca33b5 | ||
|
|
a2c9e0c6cf | ||
|
|
4bfad1f87d | ||
|
|
c3dc3d96d5 | ||
|
|
afab222f93 | ||
|
|
5a1166c416 | ||
|
|
dd3d665bea | ||
|
|
5c3b92aeee | ||
|
|
d7e058af43 | ||
|
|
cdbaad94cc | ||
|
|
981371bd8b | ||
|
|
c7d00978c6 | ||
|
|
339a1d0712 | ||
|
|
7009cddc8c | ||
|
|
9b4d10139c | ||
|
|
b795ea3129 | ||
|
|
5be5ffbf49 | ||
|
|
2701490501 | ||
|
|
779842567d | ||
|
|
d3cc8cf590 | ||
|
|
b8a42eaf8f | ||
|
|
62b880fbff | ||
|
|
0778d448df | ||
|
|
f020655a1a | ||
|
|
91c34e6664 | ||
|
|
b4a8dd226a | ||
|
|
88213e12c9 | ||
|
|
f75b8e186a | ||
|
|
5babc1fcf3 | ||
|
|
b723ebf28e | ||
|
|
616e870212 | ||
|
|
847b0e087b | ||
|
|
86a0772eb2 | ||
|
|
7d0be9db4f | ||
|
|
4e120b2640 | ||
|
|
dbe9e5db9b | ||
|
|
0b00398729 | ||
|
|
87034d2c7a | ||
|
|
595a2f6536 | ||
|
|
8ead44a31e | ||
|
|
5c19d02a73 | ||
|
|
14ebc9ee4e | ||
|
|
de53cc07f8 | ||
|
|
22e066fc4a | ||
|
|
242052b8ec | ||
|
|
1df61b5bbf | ||
|
|
b691de2cc0 | ||
|
|
10915f250c | ||
|
|
c60cef4009 | ||
|
|
dda798df8e | ||
|
|
ffe6ad2014 | ||
|
|
a125b20fc5 | ||
|
|
49108e67e2 | ||
|
|
883b450601 | ||
|
|
ce813568ff | ||
|
|
93303f181a | ||
|
|
bee453a090 | ||
|
|
42106aa4d6 | ||
|
|
95076c8f71 | ||
|
|
c9ac12f336 | ||
|
|
486e3e7e9b | ||
|
|
be1fc3bd8b | ||
|
|
4757cff262 | ||
|
|
61f51caf31 | ||
|
|
511063fd0e | ||
|
|
88bc5672cb | ||
|
|
0fce0acf7a | ||
|
|
61f95d07d3 | ||
|
|
3dedd169c4 | ||
|
|
e34e03d3a3 | ||
|
|
34374699ce | ||
|
|
cf5aa7c89f | ||
|
|
2766739512 | ||
|
|
9c84afb4b0 | ||
|
|
80fc8bd879 | ||
|
|
ca41f7f106 | ||
|
|
55ddd86ad5 |
11
.github/dependabot.yml
vendored
Normal file
11
.github/dependabot.yml
vendored
Normal file
@@ -0,0 +1,11 @@
|
||||
# To get started with Dependabot version updates, you'll need to specify which
|
||||
# package ecosystems to update and where the package manifests are located.
|
||||
# Please see the documentation for all configuration options:
|
||||
# https://docs.github.com/code-security/dependabot/dependabot-version-updates/configuration-options-for-the-dependabot.yml-file
|
||||
|
||||
version: 2
|
||||
updates:
|
||||
- package-ecosystem: "pip" # See documentation for possible values
|
||||
directory: "/" # Location of package manifests
|
||||
schedule:
|
||||
interval: "weekly"
|
||||
4
.github/workflows/tests.yml
vendored
4
.github/workflows/tests.yml
vendored
@@ -12,7 +12,7 @@ jobs:
|
||||
strategy:
|
||||
fail-fast: false
|
||||
matrix:
|
||||
python-version: ['3.8', '3.9', '3.10'] # , '3.11']
|
||||
python-version: ['3.10', '3.11', '3.12', '3.13', '3.14']
|
||||
|
||||
steps:
|
||||
- uses: actions/checkout@v4
|
||||
@@ -35,4 +35,4 @@ jobs:
|
||||
if: github.event_name == 'pull_request'
|
||||
with:
|
||||
pytest-coverage-path: ./pytest-coverage.txt
|
||||
junitxml-path: ./pytest.xml
|
||||
junitxml-path: ./pytest.xml
|
||||
|
||||
1
.github/workflows/update-ios-data.yml
vendored
1
.github/workflows/update-ios-data.yml
vendored
@@ -21,6 +21,7 @@ jobs:
|
||||
title: '[auto] Update iOS releases and versions'
|
||||
commit-message: Add new iOS versions and build numbers
|
||||
branch: auto/add-new-ios-releases
|
||||
draft: true
|
||||
body: |
|
||||
This is an automated pull request to update the iOS releases and version numbers.
|
||||
add-paths: |
|
||||
|
||||
@@ -103,7 +103,7 @@ RUN git clone https://github.com/libimobiledevice/usbmuxd && cd usbmuxd \
|
||||
|
||||
|
||||
# Create main image
|
||||
FROM ubuntu:22.04 as main
|
||||
FROM ubuntu:24.04 as main
|
||||
|
||||
LABEL org.opencontainers.image.url="https://mvt.re"
|
||||
LABEL org.opencontainers.image.documentation="https://docs.mvt.re"
|
||||
@@ -135,8 +135,7 @@ COPY --from=build-usbmuxd /build /
|
||||
COPY . mvt/
|
||||
RUN apt-get update \
|
||||
&& apt-get install -y git python3-pip \
|
||||
&& PIP_NO_CACHE_DIR=1 pip3 install --upgrade pip \
|
||||
&& PIP_NO_CACHE_DIR=1 pip3 install ./mvt \
|
||||
&& PIP_NO_CACHE_DIR=1 pip3 install --break-system-packages ./mvt \
|
||||
&& apt-get remove -y python3-pip git && apt-get autoremove -y \
|
||||
&& rm -rf /var/lib/apt/lists/* \
|
||||
&& rm -rf mvt
|
||||
|
||||
9
Makefile
9
Makefile
@@ -1,14 +1,9 @@
|
||||
PWD = $(shell pwd)
|
||||
|
||||
autofix:
|
||||
ruff format .
|
||||
ruff check --fix .
|
||||
|
||||
check: ruff mypy
|
||||
|
||||
ruff:
|
||||
ruff format --check .
|
||||
ruff check -q .
|
||||
ruff check .
|
||||
|
||||
mypy:
|
||||
mypy
|
||||
@@ -23,7 +18,7 @@ install:
|
||||
python3 -m pip install --upgrade -e .
|
||||
|
||||
test-requirements:
|
||||
python3 -m pip install --upgrade -r test-requirements.txt
|
||||
python3 -m pip install --upgrade --group dev
|
||||
|
||||
generate-proto-parsers:
|
||||
# Generate python parsers for protobuf files
|
||||
|
||||
59
SECURITY.md
59
SECURITY.md
@@ -2,4 +2,61 @@
|
||||
|
||||
Thank you for your interest in reporting security issues and vulnerabilities! Security research is of utmost importance and we take all reports seriously. If you discover an issue please report it to us right away!
|
||||
|
||||
Please DO NOT file a public issue, instead send your report privately to *nex [at] nex [dot] sx*. You can also write PGP-encrypted emails to [this key](https://keybase.io/nex/pgp_keys.asc?fingerprint=05216f3b86848a303c2fe37dd166f1667359d880).
|
||||
Please DO NOT file a public issue, instead send your report privately to the MVT maintainers at Amnesty International via `security [at] amnesty [dot] tech`.
|
||||
|
||||
You can also write PGP-encrypted emails to key `CFBF9698DCA8EB2A80F48ADEA035A030FA04ED13`. The corresponding PGP public key is lited below.
|
||||
|
||||
```
|
||||
-----BEGIN PGP PUBLIC KEY BLOCK-----
|
||||
|
||||
mQINBGlFPwsBEADQ+d7SeHrFPYv3wPOjWs2oMpp0DPdfIyGbg+iYWOC36FegZhKY
|
||||
+WeK96GqJWt8wD6kwFUVwQI795WZrjSd1q4a7wR+kj/h7xlRB6ZfVICA6O5DOOm6
|
||||
GNMvqy7ESm8g1XZDpb2u1BXmSS9X8f6rjB0e86kYsF1mB5/2USTM63jgDs0GGTkZ
|
||||
Q1z4Mq4gYyqH32b3gvXkbb68LeQmONUIM3cgmec9q8/pNc1l7fcoLWhOVADRj17Q
|
||||
plisa/EUf/SYqdtk9w7EHGggNenKNwVM235mkPcMqmE72bTpjT6XCxvZY3ByG5yi
|
||||
7L+tHJU45ZuXtt62EvX03azxThVfSmH/WbRk8lH8+CW8XMmiWZphG4ydPWqgVKCB
|
||||
2UOXm+6CQnKA+7Dt1AeK2t5ciATrv9LvwgSxk5WKc3288XFLA6eGMrTdQygYlLjJ
|
||||
+42RSdK/7fCt/qk4q13oUw8ZTVcCia98uZFi704XuuYTH6NrntIB7j/0oucIS4Y9
|
||||
cTWNO5LBerez4v8VI4YHcYESPeIWGFkXhvJzo0VMg1zidBLtiPoGF2JKZGwaK7/p
|
||||
yY1xALskLp4H+5OY4eB1kf8kl4vGsEK8xA/NNzOiapVmwBXpvVvmXIQJE2k+olNf
|
||||
sAuyB8+aO1Ws7tFYt3D+olC7iaprOdK7uA4GCgmYYhq6QQPg+cxfczgHfwARAQAB
|
||||
tD1TZWN1cml0eSBMYWIgYXQgQW1uZXN0eSBJbnRlcm5hdGlvbmFsIDxzZWN1cml0
|
||||
eUBhbW5lc3R5LnRlY2g+iQJRBBMBCAA7FiEEz7+WmNyo6yqA9IreoDWgMPoE7RMF
|
||||
AmlFPwsCGwMFCwkIBwICIgIGFQoJCAsCBBYCAwECHgcCF4AACgkQoDWgMPoE7RNr
|
||||
2w//a88uP90uSN6lgeIwKsHr1ri27QIBbzCV6hLN/gZBFR2uaiOn/xfFDbnR0Cjo
|
||||
5nMCJCT1k4nrPbMTlfmWLCD+YKELBzVqWlw4J2SOg3nznPl2JrL8QBKjwts0sF+h
|
||||
QbRWDsT54wBZnl6ZJJ79eLShNTokBbKnQ7071dMrENr5e2P2sClQXyiIc51ga4FM
|
||||
fHyhsx+GsrdiZNd2AH8912ljW1GuEi3epTO7KMZprmr37mjpZSUToiV59Yhl1Gbo
|
||||
2pixkYJqi62DG02/gTpCjq9NH3cEMxcxjh4E7yCA8ggLG6+IN6woIvPIdOsnQ+Yj
|
||||
d3H4rMNBjPSKoL+bdHILkCnp5HokcbVjNY3QAyOAF4qWhk4GtgpTshwxUmb4Tbay
|
||||
tWLJC2bzjuUBxLkGzMVFfU3B96sVS4Fi0sBaEMBtHskl2f45X8LJhSq//Lw/2L/8
|
||||
34uP/RxDSn+DPvj/yqMpekdCcmeFSTX1A19xkPcc0rVhMRde4VL338R86vzh0gMI
|
||||
1LySDAhXZyVWzrQ5s3n6N3EvCaHCn3qu7ieyFJifCSR7gZqevCEznMQRVpkMTzUt
|
||||
rk13Z6NOOb4IlTW7HFoY3omJG8Z5jV4kMIE7n6nb0qpNYQiG+YvjenQ3VrMoISyh
|
||||
lpS2De8+oOtwrxBVX3+qKWvQqzufeE3416kw2Z+5mxH7bx25Ag0EaUU/CwEQALyZ
|
||||
b+kwLN1yHObTm2yDBEn5HbCT3H1GremvPNmbAaTnfrjUngoKa8MuWWzbX5ptgmZR
|
||||
UpYY/ylOYcgGydz58vUNrPlhIZT9UhmiifPgZLEXyd0uFpr/NsbRajHMkK10iEZf
|
||||
h5bHNobiB7pGCu4Uj9e1cMiIZ4yEaYeyXYUoNHf6ISP39mJhHy6ov5yIpm9q0wzm
|
||||
tGUQPupxGXmEZlOPr3lxqXQ3Ekdv6cWDY5r/oOq71QJ/HUQ13QUuGFIbhnMbT8zd
|
||||
zaS6f/v772YKsWPc4NNUhtlf25VnQ4FuUtjCe3p6iYP4OVD8gJm0GvXyvyTuiQbL
|
||||
CSk/378JiNT7nZzYXxrWchMwvEoMIU55+/UaBc50HI5xvDQ858CX7PYGiimcdsO1
|
||||
EkQzhVxRfjlILfWrC2lgt+H5qhTn4Fah250Xe1PnLjXGHVUQnY/f3MFeiWQgf92b
|
||||
02+MfvOeC5OKttP1z5lcx6RFWCIa1E/u8Nj7YrH9hk0ZBRAnBaeAncDFY8dfX2zX
|
||||
VMoc0dV16gM7RrZ6i7D3CG3eLLkQlX0jbW9dzTuG/3f098EWB1p8vOfS/RbNCBRX
|
||||
jqGiqacL/aFF3Ci3nQ4O5tSv1XipbgrUhvXnwm9pxrLPS/45iaO59WN4RRGWLLQ7
|
||||
LHmeBxoa9avv0SdBYUL+eBxY46GXb/j5VLzHYhSnABEBAAGJAjYEGAEIACAWIQTP
|
||||
v5aY3KjrKoD0it6gNaAw+gTtEwUCaUU/CwIbDAAKCRCgNaAw+gTtEyvsEACnyFFD
|
||||
alOZTrrJTXNnUejuiExLh+qTO3T91p5bte597jpwCZnYGwkxEfffsqqhlY6ftEOf
|
||||
d5tNWE5isai4v8XCbplWomz4KBpepxcn2b+9o5dSyr1vohEFuCJziZDsta1J2DX5
|
||||
IE9U48kTgLDfdIBhuOyHNRkvXRHP2OVLCaiw4d9q+hlrraR8pehHt2BJSxh+QZoe
|
||||
n0iHvIZCBIUA45zLEGmXFpNTGeEf2dKPp3xOkAXOhAMPptE0V1itkF3R7kEW4aFO
|
||||
SZo8L3C1aWSz/gQ4/vvW5t1IJxirNMUgTMQFvqEkAwX3fm6GCxlgRSvTTRXdcrS8
|
||||
6qyFdH1nkCNsavPahN3N2RGGIlWtODEMTO1Hjy0kZtTYdW+JH9sendliCoJES+yN
|
||||
DjM125SgdAgrqlSYm/g8n9knWpxZv1QM6jU/sVz1J+l6/ixugL2i+CAL2d6uv4tT
|
||||
QmXnu7Ei4/2kHBUu3Lf59MNgmLHm6F7AhOWErszSeoJKsp+3yA1oTT/npz67sRzY
|
||||
VVyxz4NBIollna59a1lz0RhlWzNKqNB27jhylyM4ltdzHB7r4VMAVJyttozmIIOC
|
||||
35ucYxl5BHLuapaRSaYHdUId1LOccYyaOOFF/PSyCu9dKzXk7zEz2HNcIboWSkAE
|
||||
8ZDExMYM4WVpVCOj+frdsaBvzItHacRWuijtkw==
|
||||
=JAXX
|
||||
-----END PGP PUBLIC KEY BLOCK-----
|
||||
```
|
||||
|
||||
@@ -1,26 +1,56 @@
|
||||
# Deprecation of ADB command in MVT
|
||||
# Check over ADB
|
||||
|
||||
In order to check an Android device over the [Android Debug Bridge (adb)](https://developer.android.com/studio/command-line/adb) you will first need to install [Android SDK Platform Tools](https://developer.android.com/studio/releases/platform-tools). If you have installed [Android Studio](https://developer.android.com/studio/) you should already have access to `adb` and other utilities.
|
||||
|
||||
While many Linux distributions already package Android Platform Tools (for example `android-platform-tools-base` on Debian), it is preferable to install the most recent version from the official website. Packaged versions might be outdated and incompatible with most recent Android handsets.
|
||||
|
||||
Next you will need to enable debugging on the Android device you are testing. [Please follow the official instructions on how to do so.](https://developer.android.com/studio/command-line/adb)
|
||||
|
||||
## Connecting over USB
|
||||
|
||||
The easiest way to check the device is over a USB transport. You will need to have USB debugging enabled and the device plugged into your computer. If everything is configured appropriately you should see your device when launching the command `adb devices`.
|
||||
|
||||
Now you can try launching MVT with:
|
||||
|
||||
```bash
|
||||
mvt-android check-adb --output /path/to/results
|
||||
```
|
||||
|
||||
!!! warning
|
||||
The `check-adb` command is deprecated and will be removed in a future release.
|
||||
Whenever possible, prefer acquiring device data using the AndroidQF project (https://github.com/mvt-project/androidqf/) and then analyze those acquisitions with MVT.
|
||||
|
||||
The `mvt-android check-adb` command has been deprecated and removed from MVT.
|
||||
Running `mvt-android check-adb` will also emit a runtime deprecation warning advising you to migrate to AndroidQF.
|
||||
|
||||
The ability to analyze Android devices over ADB (`mvt-android check-adb`) has been removed from MVT due to several technical and forensic limitations.
|
||||
If you have previously started an adb daemon MVT will alert you and require you to kill it with `adb kill-server` and relaunch the command.
|
||||
|
||||
## Reasons for Deprecation
|
||||
!!! warning
|
||||
MVT relies on the Python library [adb-shell](https://pypi.org/project/adb-shell/) to connect to an Android device, which relies on libusb for the USB transport. Because of known driver issues, Windows users [are recommended](https://github.com/JeffLIrion/adb_shell/issues/118) to install appropriate drivers using [Zadig](https://zadig.akeo.ie/). Alternatively, an easier option might be to use the TCP transport and connect over Wi-Fi as describe next.
|
||||
|
||||
1. **Inconsistent Data Collection Across Devices**
|
||||
Android devices vary significantly in their system architecture, security policies, and available diagnostic logs. This inconsistency makes it difficult to ensure that MVT can reliably collect necessary forensic data across all devices.
|
||||
## Connecting over Wi-FI
|
||||
|
||||
2. **Incomplete Forensic Data Acquisition**
|
||||
The `check-adb` command did not retrieve a full forensic snapshot of all available data on the device. For example, critical logs such as the **full bugreport** were not systematically collected, leading to potential gaps in forensic analysis. This can be a serious problem in scenarios where the analyst only had one time access to the Android device.
|
||||
When connecting to the device over USB is not possible or not working properly, an alternative option is to connect over the network. In order to do so, first launch an adb daemon at a fixed port number:
|
||||
|
||||
4. **Code Duplication and Difficulty Ensuring Consistent Behavior Across Sources**
|
||||
Similar forensic data such as "dumpsys" logs were being loaded and parsed by MVT's ADB, AndroidQF and Bugreport commands. Multiple modules were needed to handle each source format which created duplication leading to inconsistent
|
||||
behavior and difficulties in maintaining the code base.
|
||||
```bash
|
||||
adb tcpip 5555
|
||||
```
|
||||
|
||||
5. **Alignment with iOS Workflow**
|
||||
MVT’s forensic workflow for iOS relies on pre-extracted artifacts, such as iTunes backups or filesystem dumps, rather than preforming commands or interactions directly on a live device. Removing the ADB functionality ensures a more consistent methodology across both Android and iOS mobile forensic.
|
||||
Then you can specify the IP address of the phone with the adb port number to MVT like so:
|
||||
|
||||
## Alternative: Using AndroidQF for Forensic Data Collection
|
||||
```bash
|
||||
mvt-android check-adb --serial 192.168.1.20:5555 --output /path/to/results
|
||||
```
|
||||
|
||||
To replace the deprecated ADB-based approach, forensic analysts should use [AndroidQF](https://github.com/mvt-project/androidqf) for comprehensive data collection, followed by MVT for forensic analysis. The workflow is outlined in the MVT [Android methodology](./methodology.md)
|
||||
Where `192.168.1.20` is the correct IP address of your device.
|
||||
|
||||
!!! warning
|
||||
The `check-adb` workflow shown above is deprecated. If you can acquire an AndroidQF acquisition from the device (recommended), use the AndroidQF project to create that acquisition: https://github.com/mvt-project/androidqf/
|
||||
|
||||
AndroidQF acquisitions provide a more stable, reproducible analysis surface and are the preferred workflow going forward.
|
||||
|
||||
## MVT modules requiring root privileges
|
||||
|
||||
!!! warning
|
||||
Deprecated: many `mvt-android check-adb` workflows are deprecated and will be removed in a future release. Whenever possible, prefer acquiring an AndroidQF acquisition using the AndroidQF project (https://github.com/mvt-project/androidqf/).
|
||||
|
||||
Of the currently available `mvt-android check-adb` modules a handful require root privileges to function correctly. This is because certain files, such as browser history and SMS messages databases are not accessible with user privileges through adb. These modules are to be considered OPTIONALLY available in case the device was already jailbroken. **Do NOT jailbreak your own device unless you are sure of what you are doing!** Jailbreaking your phone exposes it to considerable security risks!
|
||||
|
||||
@@ -1,53 +1,23 @@
|
||||
# Methodology for Android forensic
|
||||
|
||||
Unfortunately Android devices provide fewer complete forensically useful datasources than their iOS cousins. Unlike iOS, the Android backup feature only provides a limited about of relevant data.
|
||||
|
||||
Android diagnostic logs such as *bugreport files* can be inconsistent in format and structure across different Android versions and device vendors. The limited diagnostic information available makes it difficult to triage potential compromises, and because of this `mvt-android` capabilities are limited as well.
|
||||
Unfortunately Android devices provide much less observability than their iOS cousins. Android stores very little diagnostic information useful to triage potential compromises, and because of this `mvt-android` capabilities are limited as well.
|
||||
|
||||
However, not all is lost.
|
||||
|
||||
## Check Android devices with AndroidQF and MVT
|
||||
## Check installed Apps
|
||||
|
||||
The [AndroidQF](https://github.com/mvt-project/androidqf) tool can be used to collect a wide range of forensic artifacts from an Android device including an Android backup, a bugreport file, and a range of system logs. MVT natively supports analyzing the generated AndroidQF output for signs of device compromise.
|
||||
Because malware attacks over Android typically take the form of malicious or backdoored apps, the very first thing you might want to do is to extract and verify all installed Android packages and triage quickly if there are any which stand out as malicious or which might be atypical.
|
||||
|
||||
### Why Use AndroidQF?
|
||||
While it is out of the scope of this documentation to dwell into details on how to analyze Android apps, MVT does allow to easily and automatically extract information about installed apps, download copies of them, and quickly look them up on services such as [VirusTotal](https://www.virustotal.com).
|
||||
|
||||
- **Complete and raw data extraction**
|
||||
AndroidQF collects full forensic artifacts using an on-device forensic collection agent, ensuring that no crucial data is overlooked. The data collection does not depended on the shell environment or utilities available on the device.
|
||||
|
||||
- **Consistent and standardized output**
|
||||
By collecting a predefined and complete set of forensic files, AndroidQF ensures consistency in data acquisition across different Android devices.
|
||||
|
||||
- **Future-proof analysis**
|
||||
Since the full forensic artifacts are preserved, analysts can extract new evidence or apply updated analysis techniques without requiring access to the original device.
|
||||
|
||||
- **Cross-platform tool without dependencies**
|
||||
AndroidQF is a standalone Go binary which can be used to remotely collect data from an Android device without the device owner needing to install MVT or a Python environment.
|
||||
|
||||
### Workflow for Android Forensic Analysis with AndroidQF
|
||||
|
||||
With AndroidQF the analysis process is split into a separate data collection and data analysis stages.
|
||||
|
||||
1. **Extract Data Using AndroidQF**
|
||||
Deploy the AndroidQF forensic collector to acquire all relevant forensic artifacts from the Android device.
|
||||
|
||||
2. **Analyze Extracted Data with MVT**
|
||||
Use the `mvt-android check-androidqf` command to perform forensic analysis on the extracted artifacts.
|
||||
|
||||
By separating artifact collection from forensic analysis, this approach ensures a more reliable and scalable methodology for Android forensic investigations.
|
||||
|
||||
For more information, refer to the [AndroidQF project documentation](https://github.com/mvt-project/androidqf).
|
||||
!!! info "Using VirusTotal"
|
||||
Please note that in order to use VirusTotal lookups you are required to provide your own API key through the `MVT_VT_API_KEY` environment variable. You should also note that VirusTotal enforces strict API usage. Be mindful that MVT might consume your hourly search quota.
|
||||
|
||||
## Check the device over Android Debug Bridge
|
||||
|
||||
The ability to analyze Android devices over ADB (`mvt-android check-adb`) has been removed from MVT.
|
||||
Some additional diagnostic information can be extracted from the phone using the [Android Debug Bridge (adb)](https://developer.android.com/studio/command-line/adb). `mvt-android` allows to automatically extract information including [dumpsys](https://developer.android.com/studio/command-line/dumpsys) results, details on installed packages (without download), running processes, presence of root binaries and packages, and more.
|
||||
|
||||
See the [Android ADB documentation](./adb.md) for more information.
|
||||
|
||||
## Check an Android Backup (SMS messages)
|
||||
|
||||
Although Android backups are becoming deprecated, it is still possible to generate one. Unfortunately, because apps these days typically favor backup over the cloud, the amount of data available is limited.
|
||||
|
||||
The `mvt-android check-androidqf` command will automatically check an Android backup and SMS messages if an SMS backup is included in the AndroidQF extraction.
|
||||
|
||||
The `mvt-android check-backup` command can also be used directly with an Android backup file.
|
||||
Although Android backups are becoming deprecated, it is still possible to generate one. Unfortunately, because apps these days typically favor backup over the cloud, the amount of data available is limited. Currently, `mvt-android check-backup` only supports checking SMS messages containing links.
|
||||
|
||||
@@ -31,4 +31,21 @@ Test if the image was created successfully:
|
||||
docker run -it mvt
|
||||
```
|
||||
|
||||
If a prompt is spawned successfully, you can close it with `exit`.
|
||||
If a prompt is spawned successfully, you can close it with `exit`.
|
||||
|
||||
|
||||
## Docker usage with Android devices
|
||||
|
||||
If you wish to use MVT to test an Android device you will need to enable the container's access to the host's USB devices. You can do so by enabling the `--privileged` flag and mounting the USB bus device as a volume:
|
||||
|
||||
```bash
|
||||
docker run -it --privileged -v /dev/bus/usb:/dev/bus/usb mvt
|
||||
```
|
||||
|
||||
**Please note:** the `--privileged` parameter is generally regarded as a security risk. If you want to learn more about this check out [this explainer on container escapes](https://blog.trailofbits.com/2019/07/19/understanding-docker-container-escapes/) as it gives access to the whole system.
|
||||
|
||||
Recent versions of Docker provide a `--device` parameter allowing to specify a precise USB device without enabling `--privileged`:
|
||||
|
||||
```bash
|
||||
docker run -it --device=/dev/<your_usb_port> mvt
|
||||
```
|
||||
|
||||
@@ -1,5 +1,5 @@
|
||||
mkdocs==1.6.1
|
||||
mkdocs-autorefs==1.2.0
|
||||
mkdocs-material==9.5.42
|
||||
mkdocs-autorefs==1.4.3
|
||||
mkdocs-material==9.6.20
|
||||
mkdocs-material-extensions==1.3.1
|
||||
mkdocstrings==0.23.0
|
||||
mkdocstrings==0.30.1
|
||||
@@ -1,13 +1,11 @@
|
||||
[project]
|
||||
name = "mvt"
|
||||
dynamic = ["version"]
|
||||
authors = [
|
||||
{name = "Claudio Guarnieri", email = "nex@nex.sx"}
|
||||
]
|
||||
authors = [{ name = "Claudio Guarnieri", email = "nex@nex.sx" }]
|
||||
maintainers = [
|
||||
{name = "Etienne Maynier", email = "tek@randhome.io"},
|
||||
{name = "Donncha Ó Cearbhaill", email = "donncha.ocearbhaill@amnesty.org"},
|
||||
{name = "Rory Flynn", email = "rory.flynn@amnesty.org"}
|
||||
{ name = "Etienne Maynier", email = "tek@randhome.io" },
|
||||
{ name = "Donncha Ó Cearbhaill", email = "donncha.ocearbhaill@amnesty.org" },
|
||||
{ name = "Rory Flynn", email = "rory.flynn@amnesty.org" },
|
||||
]
|
||||
description = "Mobile Verification Toolkit"
|
||||
readme = "README.md"
|
||||
@@ -16,46 +14,61 @@ classifiers = [
|
||||
"Development Status :: 5 - Production/Stable",
|
||||
"Intended Audience :: Information Technology",
|
||||
"Operating System :: OS Independent",
|
||||
"Programming Language :: Python"
|
||||
"Programming Language :: Python",
|
||||
]
|
||||
dependencies = [
|
||||
"click >=8.1.3",
|
||||
"rich >=12.6.0",
|
||||
"tld >=0.12.6",
|
||||
"requests >=2.28.1",
|
||||
"simplejson >=3.17.6",
|
||||
"packaging >=21.3",
|
||||
"appdirs >=1.4.4",
|
||||
"iOSbackup >=0.9.923",
|
||||
"cryptography >=42.0.5",
|
||||
"pyyaml >=6.0",
|
||||
"pyahocorasick >= 2.0.0",
|
||||
"betterproto >=1.2.0",
|
||||
"pydantic >= 2.10.0",
|
||||
"pydantic-settings >= 2.7.0",
|
||||
'backports.zoneinfo; python_version < "3.9"',
|
||||
"click==8.3.0",
|
||||
"rich==14.1.0",
|
||||
"tld==0.13.1",
|
||||
"requests==2.32.5",
|
||||
"simplejson==3.20.2",
|
||||
"packaging==25.0",
|
||||
"appdirs==1.4.4",
|
||||
"iOSbackup==0.9.925",
|
||||
"adb-shell[usb]==0.4.4",
|
||||
"libusb1==3.3.1",
|
||||
"cryptography==46.0.3",
|
||||
"PyYAML>=6.0.2",
|
||||
"pyahocorasick==2.2.0",
|
||||
"betterproto==1.2.5",
|
||||
"pydantic==2.12.3",
|
||||
"pydantic-settings==2.10.1",
|
||||
"NSKeyedUnArchiver==1.5.2",
|
||||
"python-dateutil==2.9.0.post0",
|
||||
"tzdata==2025.2",
|
||||
]
|
||||
requires-python = ">= 3.8"
|
||||
requires-python = ">= 3.10"
|
||||
|
||||
[project.urls]
|
||||
homepage = "https://docs.mvt.re/en/latest/"
|
||||
repository = "https://github.com/mvt-project/mvt"
|
||||
|
||||
[project.scripts]
|
||||
mvt-ios = "mvt.ios:cli"
|
||||
mvt-android = "mvt.android:cli"
|
||||
mvt-ios = "mvt.ios:cli"
|
||||
mvt-android = "mvt.android:cli"
|
||||
|
||||
[dependency-groups]
|
||||
dev = [
|
||||
"requests>=2.31.0",
|
||||
"pytest>=7.4.3",
|
||||
"pytest-cov>=4.1.0",
|
||||
"pytest-github-actions-annotate-failures>=0.2.0",
|
||||
"pytest-mock>=3.14.0",
|
||||
"stix2>=3.0.1",
|
||||
"ruff>=0.1.6",
|
||||
"mypy>=1.7.1",
|
||||
"betterproto[compiler]",
|
||||
]
|
||||
|
||||
[build-system]
|
||||
requires = ["setuptools>=61.0"]
|
||||
build-backend = "setuptools.build_meta"
|
||||
|
||||
[tool.coverage.run]
|
||||
omit = [
|
||||
"tests/*",
|
||||
]
|
||||
omit = ["tests/*"]
|
||||
|
||||
[tool.coverage.html]
|
||||
directory= "htmlcov"
|
||||
directory = "htmlcov"
|
||||
|
||||
[tool.mypy]
|
||||
install_types = true
|
||||
@@ -65,15 +78,13 @@ packages = "src"
|
||||
|
||||
[tool.pytest.ini_options]
|
||||
addopts = "-ra -q --cov=mvt --cov-report html --junitxml=pytest.xml --cov-report=term-missing:skip-covered"
|
||||
testpaths = [
|
||||
"tests"
|
||||
]
|
||||
testpaths = ["tests"]
|
||||
|
||||
[tool.ruff.lint]
|
||||
select = ["C90", "E", "F", "W"] # flake8 default set
|
||||
[tool.ruff]
|
||||
select = ["C90", "E", "F", "W"] # flake8 default set
|
||||
ignore = [
|
||||
"E501", # don't enforce line length violations
|
||||
"C901", # complex-structure
|
||||
"E501", # don't enforce line length violations
|
||||
"C901", # complex-structure
|
||||
|
||||
# These were previously ignored but don't seem to be required:
|
||||
# "E265", # no-space-after-block-comment
|
||||
@@ -84,15 +95,15 @@ ignore = [
|
||||
# "E203", # whitespace-before-punctuation
|
||||
]
|
||||
|
||||
[tool.ruff.lint.per-file-ignores]
|
||||
"__init__.py" = ["F401"] # unused-import
|
||||
[tool.ruff.per-file-ignores]
|
||||
"__init__.py" = ["F401"] # unused-import
|
||||
|
||||
[tool.ruff.lint.mccabe]
|
||||
[tool.ruff.mccabe]
|
||||
max-complexity = 10
|
||||
|
||||
[tool.setuptools]
|
||||
include-package-data = true
|
||||
package-dir = {"" = "src"}
|
||||
package-dir = { "" = "src" }
|
||||
|
||||
[tool.setuptools.packages.find]
|
||||
where = ["src"]
|
||||
@@ -101,4 +112,4 @@ where = ["src"]
|
||||
mvt = ["ios/data/*.json"]
|
||||
|
||||
[tool.setuptools.dynamic]
|
||||
version = {attr = "mvt.common.version.MVT_VERSION"}
|
||||
version = { attr = "mvt.common.version.MVT_VERSION" }
|
||||
|
||||
@@ -14,10 +14,10 @@ class DumpsysAccessibilityArtifact(AndroidArtifact):
|
||||
return
|
||||
|
||||
for result in self.results:
|
||||
ioc_match = self.indicators.check_app_id(result["package_name"])
|
||||
if ioc_match:
|
||||
result["matched_indicator"] = ioc_match.ioc
|
||||
self.alertstore.critical(self.get_slug(), ioc_match.message, "", result)
|
||||
ioc = self.indicators.check_app_id(result["package_name"])
|
||||
if ioc:
|
||||
result["matched_indicator"] = ioc
|
||||
self.detected.append(result)
|
||||
continue
|
||||
|
||||
def parse(self, content: str) -> None:
|
||||
|
||||
@@ -4,6 +4,7 @@
|
||||
# https://license.mvt.re/1.1/
|
||||
|
||||
import base64
|
||||
import binascii
|
||||
import hashlib
|
||||
|
||||
from .artifact import AndroidArtifact
|
||||
@@ -89,11 +90,16 @@ class DumpsysADBArtifact(AndroidArtifact):
|
||||
else:
|
||||
key_base64, user = user_key, b""
|
||||
|
||||
key_raw = base64.b64decode(key_base64)
|
||||
key_fingerprint = hashlib.md5(key_raw).hexdigest().upper()
|
||||
key_fingerprint_colon = ":".join(
|
||||
[key_fingerprint[i : i + 2] for i in range(0, len(key_fingerprint), 2)]
|
||||
)
|
||||
try:
|
||||
key_raw = base64.b64decode(key_base64)
|
||||
key_fingerprint = hashlib.md5(key_raw).hexdigest().upper()
|
||||
key_fingerprint_colon = ":".join(
|
||||
[key_fingerprint[i : i + 2] for i in range(0, len(key_fingerprint), 2)]
|
||||
)
|
||||
except binascii.Error:
|
||||
# Impossible to parse base64
|
||||
key_fingerprint_colon = ""
|
||||
|
||||
return {
|
||||
"user": user.decode("utf-8"),
|
||||
"fingerprint": key_fingerprint_colon,
|
||||
|
||||
@@ -4,9 +4,9 @@
|
||||
# https://license.mvt.re/1.1/
|
||||
|
||||
from datetime import datetime
|
||||
from typing import Any, Dict, List, Union
|
||||
|
||||
from mvt.common.utils import convert_datetime_to_iso
|
||||
from mvt.common.module_types import ModuleAtomicResult, ModuleSerializedResult
|
||||
|
||||
from .artifact import AndroidArtifact
|
||||
|
||||
@@ -20,9 +20,9 @@ class DumpsysAppopsArtifact(AndroidArtifact):
|
||||
Parser for dumpsys app ops info
|
||||
"""
|
||||
|
||||
def serialize(self, result: ModuleAtomicResult) -> ModuleSerializedResult:
|
||||
def serialize(self, record: dict) -> Union[dict, list]:
|
||||
records = []
|
||||
for perm in result["permissions"]:
|
||||
for perm in record["permissions"]:
|
||||
if "entries" not in perm:
|
||||
continue
|
||||
|
||||
@@ -33,7 +33,7 @@ class DumpsysAppopsArtifact(AndroidArtifact):
|
||||
"timestamp": entry["timestamp"],
|
||||
"module": self.__class__.__name__,
|
||||
"event": entry["access"],
|
||||
"data": f"{result['package_name']} access to "
|
||||
"data": f"{record['package_name']} access to "
|
||||
f"{perm['name']}: {entry['access']}",
|
||||
}
|
||||
)
|
||||
@@ -43,51 +43,48 @@ class DumpsysAppopsArtifact(AndroidArtifact):
|
||||
def check_indicators(self) -> None:
|
||||
for result in self.results:
|
||||
if self.indicators:
|
||||
ioc_match = self.indicators.check_app_id(result.get("package_name"))
|
||||
if ioc_match:
|
||||
result["matched_indicator"] = ioc_match.ioc
|
||||
self.alertstore.critical(
|
||||
self.get_slug(), ioc_match.message, "", result
|
||||
)
|
||||
ioc = self.indicators.check_app_id(result.get("package_name"))
|
||||
if ioc:
|
||||
result["matched_indicator"] = ioc
|
||||
self.detected.append(result)
|
||||
continue
|
||||
|
||||
# We use a placeholder entry to create a basic alert even without permission entries.
|
||||
placeholder_entry = {"access": "Unknown", "timestamp": ""}
|
||||
|
||||
detected_permissions = []
|
||||
for perm in result["permissions"]:
|
||||
if (
|
||||
perm["name"] in RISKY_PERMISSIONS
|
||||
# and perm["access"] == "allow"
|
||||
):
|
||||
for entry in sorted(
|
||||
perm["entries"] or [placeholder_entry],
|
||||
key=lambda x: x["timestamp"],
|
||||
):
|
||||
cleaned_result = result.copy()
|
||||
cleaned_result["permissions"] = [perm]
|
||||
self.alertstore.medium(
|
||||
self.get_slug(),
|
||||
f"Package '{result['package_name']}' had risky permission '{perm['name']}' set to '{entry['access']}' at {entry['timestamp']}",
|
||||
detected_permissions.append(perm)
|
||||
for entry in sorted(perm["entries"], key=lambda x: x["timestamp"]):
|
||||
self.log.warning(
|
||||
"Package '%s' had risky permission '%s' set to '%s' at %s",
|
||||
result["package_name"],
|
||||
perm["name"],
|
||||
entry["access"],
|
||||
entry["timestamp"],
|
||||
cleaned_result,
|
||||
)
|
||||
|
||||
elif result["package_name"] in RISKY_PACKAGES:
|
||||
for entry in sorted(
|
||||
perm["entries"] or [placeholder_entry],
|
||||
key=lambda x: x["timestamp"],
|
||||
):
|
||||
cleaned_result = result.copy()
|
||||
cleaned_result["permissions"] = [perm]
|
||||
self.alertstore.medium(
|
||||
self.get_slug(),
|
||||
f"Risky package '{result['package_name']}' had '{perm['name']}' permission set to '{entry['access']}' at {entry['timestamp']}",
|
||||
detected_permissions.append(perm)
|
||||
for entry in sorted(perm["entries"], key=lambda x: x["timestamp"]):
|
||||
self.log.warning(
|
||||
"Risky package '%s' had '%s' permission set to '%s' at %s",
|
||||
result["package_name"],
|
||||
perm["name"],
|
||||
entry["access"],
|
||||
entry["timestamp"],
|
||||
cleaned_result,
|
||||
)
|
||||
|
||||
if detected_permissions:
|
||||
# We clean the result to only include the risky permission, otherwise the timeline
|
||||
# will be polluted with all the other irrelevant permissions
|
||||
cleaned_result = result.copy()
|
||||
cleaned_result["permissions"] = detected_permissions
|
||||
self.detected.append(cleaned_result)
|
||||
|
||||
def parse(self, output: str) -> None:
|
||||
# self.results: List[Dict[str, Any]] = []
|
||||
self.results: List[Dict[str, Any]] = []
|
||||
perm = {}
|
||||
package = {}
|
||||
entry = {}
|
||||
|
||||
@@ -3,9 +3,9 @@
|
||||
# Use of this software is governed by the MVT License 1.1 that can be found at
|
||||
# https://license.mvt.re/1.1/
|
||||
|
||||
from typing import Union
|
||||
|
||||
from .artifact import AndroidArtifact
|
||||
from mvt.common.module_types import ModuleSerializedResult, ModuleAtomicResult
|
||||
|
||||
|
||||
class DumpsysBatteryDailyArtifact(AndroidArtifact):
|
||||
@@ -13,7 +13,7 @@ class DumpsysBatteryDailyArtifact(AndroidArtifact):
|
||||
Parser for dumpsys battery daily updates.
|
||||
"""
|
||||
|
||||
def serialize(self, record: ModuleAtomicResult) -> ModuleSerializedResult:
|
||||
def serialize(self, record: dict) -> Union[dict, list]:
|
||||
return {
|
||||
"timestamp": record["from"],
|
||||
"module": self.__class__.__name__,
|
||||
@@ -27,10 +27,10 @@ class DumpsysBatteryDailyArtifact(AndroidArtifact):
|
||||
return
|
||||
|
||||
for result in self.results:
|
||||
ioc_match = self.indicators.check_app_id(result["package_name"])
|
||||
if ioc_match:
|
||||
result["matched_indicator"] = ioc_match.ioc
|
||||
self.alertstore.critical(self.get_slug(), ioc_match.message, "", result)
|
||||
ioc = self.indicators.check_app_id(result["package_name"])
|
||||
if ioc:
|
||||
result["matched_indicator"] = ioc
|
||||
self.detected.append(result)
|
||||
continue
|
||||
|
||||
def parse(self, output: str) -> None:
|
||||
|
||||
@@ -16,10 +16,10 @@ class DumpsysBatteryHistoryArtifact(AndroidArtifact):
|
||||
return
|
||||
|
||||
for result in self.results:
|
||||
ioc_match = self.indicators.check_app_id(result["package_name"])
|
||||
if ioc_match:
|
||||
result["matched_indicator"] = ioc_match.ioc
|
||||
self.alertstore.critical(self.get_slug(), ioc_match.message, "", result)
|
||||
ioc = self.indicators.check_app_id(result["package_name"])
|
||||
if ioc:
|
||||
result["matched_indicator"] = ioc
|
||||
self.detected.append(result)
|
||||
continue
|
||||
|
||||
def parse(self, data: str) -> None:
|
||||
|
||||
@@ -20,12 +20,10 @@ class DumpsysDBInfoArtifact(AndroidArtifact):
|
||||
for result in self.results:
|
||||
path = result.get("path", "")
|
||||
for part in path.split("/"):
|
||||
ioc_match = self.indicators.check_app_id(part)
|
||||
if ioc_match:
|
||||
result["matched_indicator"] = ioc_match.ioc
|
||||
self.alertstore.critical(
|
||||
self.get_slug(), ioc_match.message, "", result
|
||||
)
|
||||
ioc = self.indicators.check_app_id(part)
|
||||
if ioc:
|
||||
result["matched_indicator"] = ioc
|
||||
self.detected.append(result)
|
||||
continue
|
||||
|
||||
def parse(self, output: str) -> None:
|
||||
|
||||
@@ -12,12 +12,10 @@ class DumpsysPackageActivitiesArtifact(AndroidArtifact):
|
||||
return
|
||||
|
||||
for activity in self.results:
|
||||
ioc_match = self.indicators.check_app_id(activity["package_name"])
|
||||
if ioc_match:
|
||||
activity["matched_indicator"] = ioc_match.ioc
|
||||
self.alertstore.critical(
|
||||
self.get_slug(), ioc_match.message, "", activity
|
||||
)
|
||||
ioc = self.indicators.check_app_id(activity["package_name"])
|
||||
if ioc:
|
||||
activity["matched_indicator"] = ioc
|
||||
self.detected.append(activity)
|
||||
continue
|
||||
|
||||
def parse(self, content: str):
|
||||
|
||||
@@ -4,39 +4,35 @@
|
||||
# https://license.mvt.re/1.1/
|
||||
|
||||
import re
|
||||
from typing import Any, Dict, List
|
||||
from typing import Any, Dict, List, Union
|
||||
|
||||
from mvt.android.utils import ROOT_PACKAGES
|
||||
|
||||
from .artifact import AndroidArtifact
|
||||
from mvt.common.module_types import ModuleAtomicResult, ModuleSerializedResult
|
||||
|
||||
|
||||
class DumpsysPackagesArtifact(AndroidArtifact):
|
||||
def check_indicators(self) -> None:
|
||||
for result in self.results:
|
||||
# XXX: De-duplication Package detections
|
||||
if result["package_name"] in ROOT_PACKAGES:
|
||||
self.alertstore.medium(
|
||||
self.get_slug(),
|
||||
f'Found an installed package related to rooting/jailbreaking: "{result["package_name"]}"',
|
||||
"",
|
||||
result,
|
||||
self.log.warning(
|
||||
'Found an installed package related to rooting/jailbreaking: "%s"',
|
||||
result["package_name"],
|
||||
)
|
||||
self.alertstore.log_latest()
|
||||
self.detected.append(result)
|
||||
continue
|
||||
|
||||
if not self.indicators:
|
||||
continue
|
||||
|
||||
ioc_match = self.indicators.check_app_id(result.get("package_name", ""))
|
||||
if ioc_match:
|
||||
result["matched_indicator"] = ioc_match.ioc
|
||||
self.alertstore.critical(self.get_slug(), ioc_match.message, "", result)
|
||||
self.alertstore.log_latest()
|
||||
ioc = self.indicators.check_app_id(result.get("package_name", ""))
|
||||
if ioc:
|
||||
result["matched_indicator"] = ioc
|
||||
self.detected.append(result)
|
||||
|
||||
def serialize(self, record: ModuleAtomicResult) -> ModuleSerializedResult:
|
||||
def serialize(self, record: dict) -> Union[dict, list]:
|
||||
records = []
|
||||
|
||||
timestamps = [
|
||||
{"event": "package_install", "timestamp": record["timestamp"]},
|
||||
{
|
||||
|
||||
@@ -16,10 +16,10 @@ class DumpsysPlatformCompatArtifact(AndroidArtifact):
|
||||
return
|
||||
|
||||
for result in self.results:
|
||||
ioc_match = self.indicators.check_app_id(result["package_name"])
|
||||
if ioc_match:
|
||||
result["matched_indicator"] = ioc_match.ioc
|
||||
self.alertstore.critical(self.get_slug(), ioc_match.message, "", result)
|
||||
ioc = self.indicators.check_app_id(result["package_name"])
|
||||
if ioc:
|
||||
result["matched_indicator"] = ioc
|
||||
self.detected.append(result)
|
||||
continue
|
||||
|
||||
def parse(self, data: str) -> None:
|
||||
|
||||
@@ -50,12 +50,10 @@ class DumpsysReceiversArtifact(AndroidArtifact):
|
||||
if not self.indicators:
|
||||
continue
|
||||
|
||||
ioc_match = self.indicators.check_app_id(receiver["package_name"])
|
||||
if ioc_match:
|
||||
receiver["matched_indicator"] = ioc_match.ioc
|
||||
self.alertstore.critical(
|
||||
self.get_slug(), ioc_match.message, "", {intent: receiver}
|
||||
)
|
||||
ioc = self.indicators.check_app_id(receiver["package_name"])
|
||||
if ioc:
|
||||
receiver["matched_indicator"] = ioc
|
||||
self.detected.append({intent: receiver})
|
||||
continue
|
||||
|
||||
def parse(self, output: str) -> None:
|
||||
|
||||
@@ -2,13 +2,13 @@
|
||||
# Copyright (c) 2021-2023 The MVT Authors.
|
||||
# Use of this software is governed by the MVT License 1.1 that can be found at
|
||||
# https://license.mvt.re/1.1/
|
||||
from typing import Union
|
||||
|
||||
from .artifact import AndroidArtifact
|
||||
from mvt.common.module_types import ModuleAtomicResult, ModuleSerializedResult
|
||||
|
||||
|
||||
class FileTimestampsArtifact(AndroidArtifact):
|
||||
def serialize(self, record: ModuleAtomicResult) -> ModuleSerializedResult:
|
||||
def serialize(self, record: dict) -> Union[dict, list]:
|
||||
records = []
|
||||
|
||||
for ts in set(
|
||||
|
||||
@@ -59,16 +59,13 @@ class GetProp(AndroidArtifact):
|
||||
self.log.info("%s: %s", entry["name"], entry["value"])
|
||||
|
||||
if entry["name"] == "ro.build.version.security_patch":
|
||||
warning_message = warn_android_patch_level(entry["value"], self.log)
|
||||
self.alertstore.medium(self.get_slug(), warning_message, "", entry)
|
||||
warn_android_patch_level(entry["value"], self.log)
|
||||
|
||||
if not self.indicators:
|
||||
return
|
||||
|
||||
for result in self.results:
|
||||
ioc_match = self.indicators.check_android_property_name(
|
||||
result.get("name", "")
|
||||
)
|
||||
if ioc_match:
|
||||
result["matched_indicator"] = ioc_match.ioc
|
||||
self.alertstore.critical(self.get_slug(), ioc_match.message, "", result)
|
||||
ioc = self.indicators.check_android_property_name(result.get("name", ""))
|
||||
if ioc:
|
||||
result["matched_indicator"] = ioc
|
||||
self.detected.append(result)
|
||||
|
||||
186
src/mvt/android/artifacts/mounts.py
Normal file
186
src/mvt/android/artifacts/mounts.py
Normal file
@@ -0,0 +1,186 @@
|
||||
# Mobile Verification Toolkit (MVT)
# Copyright (c) 2021-2023 The MVT Authors.
# Use of this software is governed by the MVT License 1.1 that can be found at
# https://license.mvt.re/1.1/

from typing import Any, Optional

from .artifact import AndroidArtifact

# System partitions that are normally mounted read-only on a healthy device.
SUSPICIOUS_MOUNT_POINTS = [
    "/system",
    "/vendor",
    "/product",
    "/system_ext",
]

# Mount options that are unusual on system partitions and may indicate tampering.
SUSPICIOUS_OPTIONS = [
    "rw",
    "remount",
    "noatime",
    "nodiratime",
]

# Partitions where "noatime" is expected and should not be flagged.
ALLOWLIST_NOATIME = [
    "/system_dlkm",
    "/system_ext",
    "/product",
    "/vendor",
    "/vendor_dlkm",
]


class Mounts(AndroidArtifact):
    """
    This artifact parses mount information from /proc/mounts or similar mount data.
    It can detect potentially suspicious mount configurations that may indicate
    a rooted or compromised device.
    """

    def parse(self, entry: str) -> None:
        """
        Parse mount information from the provided entry.

        Both common formats are supported:
            /proc/mounts style:
                /dev/block/bootdevice/by-name/system /system ext4 ro,seclabel,relatime 0 0
            `mount` command style:
                /dev/block/dm-12 on / type ext4 (ro,seclabel,noatime)

        Populates ``self.results`` with one dict per successfully parsed line;
        lines that match neither format are skipped.
        """
        self.results: list[dict[str, Any]] = []

        for line in entry.splitlines():
            line = line.strip()
            if not line:
                continue

            parsed = self._parse_mount_line(line)
            if parsed is None:
                continue

            device, mount_point, filesystem_type, mount_options = parsed

            # Normalize the option string into a list of individual options.
            options_list = [
                opt.strip() for opt in mount_options.split(",") if opt.strip()
            ]

            # A partition counts as "system" if it is (or lives under) one of
            # the partitions that should normally be read-only.
            is_system_partition = mount_point in SUSPICIOUS_MOUNT_POINTS or any(
                mount_point.startswith(sp) for sp in SUSPICIOUS_MOUNT_POINTS
            )

            self.results.append(
                {
                    "device": device,
                    "mount_point": mount_point,
                    "filesystem_type": filesystem_type,
                    "mount_options": mount_options,
                    "options_list": options_list,
                    "is_system_partition": is_system_partition,
                    "is_read_write": "rw" in options_list,
                }
            )

    @staticmethod
    def _parse_mount_line(line: str) -> Optional[tuple[str, str, str, str]]:
        """
        Parse a single mount line in either supported format.

        Returns a ``(device, mount_point, filesystem_type, mount_options)``
        tuple, or ``None`` if the line cannot be parsed.
        """
        try:
            if " on " in line and " type " in line:
                # `mount` command format: device on mount_point type fstype (options)
                device_part, rest = line.split(" on ", 1)
                device = device_part.strip()

                mount_part, fs_part = rest.split(" type ", 1)
                mount_point = mount_part.strip()

                fs_part = fs_part.strip()
                if "(" in fs_part and fs_part.endswith(")"):
                    paren_idx = fs_part.find("(")
                    filesystem_type = fs_part[:paren_idx].strip()
                    mount_options = fs_part[paren_idx + 1 : -1].strip()
                else:
                    # No options in parentheses, just the filesystem type.
                    filesystem_type = fs_part
                    mount_options = ""
            else:
                # /proc/mounts format: device mount_point fstype options dump pass
                fields = line.split()
                if len(fields) < 4:
                    return None
                device, mount_point, filesystem_type, mount_options = fields[:4]
        except ValueError:
            # Malformed line: give up on it rather than aborting the parse.
            return None

        if not device or not mount_point or not filesystem_type:
            return None

        return device, mount_point, filesystem_type, mount_options

    def check_indicators(self) -> None:
        """
        Check for suspicious mount configurations that may indicate root access
        or other security concerns.
        """
        for mount in self.results:
            mount_point = mount["mount_point"]
            options = mount["options_list"]

            # System partitions mounted read-write are a strong rooting signal.
            if mount["is_system_partition"] and mount["is_read_write"]:
                if mount_point == "/system":
                    self.log.warning(
                        "Root detected /system partition is mounted as read-write (rw). "
                    )
                else:
                    self.log.warning(
                        "System partition %s is mounted as read-write (rw). This may indicate system modifications.",
                        mount_point,
                    )

            # Flag other suspicious options on system partitions, except
            # "noatime" on partitions where it is expected.
            suspicious_opts = [opt for opt in options if opt in SUSPICIOUS_OPTIONS]
            if suspicious_opts and mount["is_system_partition"]:
                if (
                    "noatime" in mount["mount_options"]
                    and mount["mount_point"] in ALLOWLIST_NOATIME
                ):
                    continue
                self.log.warning(
                    "Suspicious mount options found for %s: %s",
                    mount_point,
                    ", ".join(suspicious_opts),
                )

            # Log interesting mount information about user-data partitions.
            if mount_point == "/data" or mount_point.startswith("/sdcard"):
                self.log.info(
                    "Data partition: %s mounted as %s with options: %s",
                    mount_point,
                    mount["filesystem_type"],
                    mount["mount_options"],
                )

        self.log.info("Parsed %d mount entries", len(self.results))

        # Check indicators if available.
        if not self.indicators:
            return

        for mount in self.results:
            # Match either the mount point or the device path against file-path
            # indicators; short-circuit so a mount is only detected once even
            # if both fields match.
            ioc = self.indicators.check_file_path(
                mount.get("mount_point", "")
            ) or self.indicators.check_file_path(mount.get("device", ""))
            if ioc:
                mount["matched_indicator"] = ioc
                self.detected.append(mount)
||||
@@ -58,13 +58,13 @@ class Processes(AndroidArtifact):
|
||||
if result["proc_name"] == "gatekeeperd":
|
||||
continue
|
||||
|
||||
ioc_match = self.indicators.check_app_id(proc_name)
|
||||
if ioc_match:
|
||||
result["matched_indicator"] = ioc_match.ioc
|
||||
self.alertstore.critical(self.get_slug(), ioc_match.message, "", result)
|
||||
ioc = self.indicators.check_app_id(proc_name)
|
||||
if ioc:
|
||||
result["matched_indicator"] = ioc
|
||||
self.detected.append(result)
|
||||
continue
|
||||
|
||||
ioc_match = self.indicators.check_process(proc_name)
|
||||
if ioc_match:
|
||||
result["matched_indicator"] = ioc_match.ioc
|
||||
self.alertstore.critical(self.get_slug(), ioc_match.message, "", result)
|
||||
ioc = self.indicators.check_process(proc_name)
|
||||
if ioc:
|
||||
result["matched_indicator"] = ioc
|
||||
self.detected.append(result)
|
||||
|
||||
@@ -51,11 +51,6 @@ ANDROID_DANGEROUS_SETTINGS = [
|
||||
"key": "send_action_app_error",
|
||||
"safe_value": "1",
|
||||
},
|
||||
{
|
||||
"description": "enabled installation of non Google Play apps",
|
||||
"key": "install_non_market_apps",
|
||||
"safe_value": "0",
|
||||
},
|
||||
{
|
||||
"description": "enabled accessibility services",
|
||||
"key": "accessibility_enabled",
|
||||
|
||||
@@ -4,13 +4,13 @@
|
||||
# https://license.mvt.re/1.1/
|
||||
|
||||
import datetime
|
||||
from typing import List, Optional
|
||||
from typing import List, Optional, Union
|
||||
|
||||
import pydantic
|
||||
import betterproto
|
||||
from dateutil import parser
|
||||
|
||||
from mvt.common.utils import convert_datetime_to_iso
|
||||
from mvt.common.module_types import ModuleAtomicResult, ModuleSerializedResult
|
||||
from mvt.android.parsers.proto.tombstone import Tombstone
|
||||
from .artifact import AndroidArtifact
|
||||
|
||||
@@ -53,7 +53,7 @@ class TombstoneCrashResult(pydantic.BaseModel):
|
||||
file_name: str
|
||||
file_timestamp: str # We store the timestamp as a string to avoid timezone issues
|
||||
build_fingerprint: str
|
||||
revision: int
|
||||
revision: str
|
||||
arch: Optional[str] = None
|
||||
timestamp: str # We store the timestamp as a string to avoid timezone issues
|
||||
process_uptime: Optional[int] = None
|
||||
@@ -63,20 +63,20 @@ class TombstoneCrashResult(pydantic.BaseModel):
|
||||
process_name: Optional[str] = None
|
||||
binary_path: Optional[str] = None
|
||||
selinux_label: Optional[str] = None
|
||||
uid: Optional[int] = None
|
||||
uid: int
|
||||
signal_info: SignalInfo
|
||||
cause: Optional[str] = None
|
||||
extra: Optional[str] = None
|
||||
|
||||
|
||||
class TombstoneCrashArtifact(AndroidArtifact):
|
||||
""" "
|
||||
"""
|
||||
Parser for Android tombstone crash files.
|
||||
|
||||
This parser can parse both text and protobuf tombstone crash files.
|
||||
"""
|
||||
|
||||
def serialize(self, record: ModuleAtomicResult) -> ModuleSerializedResult:
|
||||
def serialize(self, record: dict) -> Union[dict, list]:
|
||||
return {
|
||||
"timestamp": record["timestamp"],
|
||||
"module": self.__class__.__name__,
|
||||
@@ -92,20 +92,18 @@ class TombstoneCrashArtifact(AndroidArtifact):
|
||||
return
|
||||
|
||||
for result in self.results:
|
||||
ioc_match = self.indicators.check_process(result["process_name"])
|
||||
if ioc_match:
|
||||
result["matched_indicator"] = ioc_match.ioc
|
||||
self.alertstore.critical(self.get_slug(), ioc_match.message, "", result)
|
||||
ioc = self.indicators.check_process(result["process_name"])
|
||||
if ioc:
|
||||
result["matched_indicator"] = ioc
|
||||
self.detected.append(result)
|
||||
continue
|
||||
|
||||
if result.get("command_line", []):
|
||||
command_name = result.get("command_line")[0].split("/")[-1]
|
||||
ioc_match = self.indicators.check_process(command_name)
|
||||
if ioc_match:
|
||||
result["matched_indicator"] = ioc_match.ioc
|
||||
self.alertstore.critical(
|
||||
self.get_slug(), ioc_match.message, "", result
|
||||
)
|
||||
ioc = self.indicators.check_process(command_name)
|
||||
if ioc:
|
||||
result["matched_indicator"] = ioc
|
||||
self.detected.append(result)
|
||||
continue
|
||||
|
||||
SUSPICIOUS_UIDS = [
|
||||
@@ -114,24 +112,20 @@ class TombstoneCrashArtifact(AndroidArtifact):
|
||||
2000, # shell
|
||||
]
|
||||
if result["uid"] in SUSPICIOUS_UIDS:
|
||||
self.alertstore.medium(
|
||||
self.get_slug(),
|
||||
(
|
||||
f"Potentially suspicious crash in process '{result['process_name']}' "
|
||||
f"running as UID '{result['uid']}' in tombstone '{result['file_name']}' at {result['timestamp']}"
|
||||
),
|
||||
"",
|
||||
result,
|
||||
self.log.warning(
|
||||
f"Potentially suspicious crash in process '{result['process_name']}' "
|
||||
f"running as UID '{result['uid']}' in tombstone '{result['file_name']}' at {result['timestamp']}"
|
||||
)
|
||||
self.detected.append(result)
|
||||
|
||||
def parse_protobuf(
|
||||
self, file_name: str, file_timestamp: datetime.datetime, data: bytes
|
||||
) -> None:
|
||||
"""
|
||||
Parse Android tombstone crash files from a protobuf object.
|
||||
"""
|
||||
"""Parse Android tombstone crash files from a protobuf object."""
|
||||
tombstone_pb = Tombstone().parse(data)
|
||||
tombstone_dict = tombstone_pb.to_dict(betterproto.Casing.SNAKE)
|
||||
tombstone_dict = tombstone_pb.to_dict(
|
||||
betterproto.Casing.SNAKE, include_default_values=True
|
||||
)
|
||||
|
||||
# Add some extra metadata
|
||||
tombstone_dict["timestamp"] = self._parse_timestamp_string(
|
||||
@@ -148,21 +142,23 @@ class TombstoneCrashArtifact(AndroidArtifact):
|
||||
def parse(
|
||||
self, file_name: str, file_timestamp: datetime.datetime, content: bytes
|
||||
) -> None:
|
||||
"""
|
||||
Parse text Android tombstone crash files.
|
||||
"""
|
||||
|
||||
# Split the tombstone file into a dictonary
|
||||
"""Parse text Android tombstone crash files."""
|
||||
tombstone_dict = {
|
||||
"file_name": file_name,
|
||||
"file_timestamp": convert_datetime_to_iso(file_timestamp),
|
||||
}
|
||||
lines = content.decode("utf-8").splitlines()
|
||||
for line in lines:
|
||||
for line_num, line in enumerate(lines, 1):
|
||||
if not line.strip() or TOMBSTONE_DELIMITER in line:
|
||||
continue
|
||||
for key, destination_key in TOMBSTONE_TEXT_KEY_MAPPINGS.items():
|
||||
self._parse_tombstone_line(line, key, destination_key, tombstone_dict)
|
||||
try:
|
||||
for key, destination_key in TOMBSTONE_TEXT_KEY_MAPPINGS.items():
|
||||
if self._parse_tombstone_line(
|
||||
line, key, destination_key, tombstone_dict
|
||||
):
|
||||
break
|
||||
except Exception as e:
|
||||
raise ValueError(f"Error parsing line {line_num}: {str(e)}")
|
||||
|
||||
# Validate the tombstone and add it to the results
|
||||
tombstone = TombstoneCrashResult.model_validate(tombstone_dict)
|
||||
@@ -172,7 +168,7 @@ class TombstoneCrashArtifact(AndroidArtifact):
|
||||
self, line: str, key: str, destination_key: str, tombstone: dict
|
||||
) -> bool:
|
||||
if not line.startswith(f"{key}"):
|
||||
return None
|
||||
return False
|
||||
|
||||
if key == "pid":
|
||||
return self._load_pid_line(line, tombstone)
|
||||
@@ -191,7 +187,7 @@ class TombstoneCrashArtifact(AndroidArtifact):
|
||||
raise ValueError(f"Expected key {key}, got {line_key}")
|
||||
|
||||
value_clean = value.strip().strip("'")
|
||||
if destination_key in ["uid", "revision"]:
|
||||
if destination_key == "uid":
|
||||
tombstone[destination_key] = int(value_clean)
|
||||
elif destination_key == "process_uptime":
|
||||
# eg. "Process uptime: 40s"
|
||||
@@ -204,51 +200,50 @@ class TombstoneCrashArtifact(AndroidArtifact):
|
||||
return True
|
||||
|
||||
def _load_pid_line(self, line: str, tombstone: dict) -> bool:
|
||||
pid_part, tid_part, name_part = [part.strip() for part in line.split(",")]
|
||||
try:
|
||||
parts = line.split(" >>> ") if " >>> " in line else line.split(">>>")
|
||||
process_info = parts[0]
|
||||
|
||||
pid_key, pid_value = pid_part.split(":", 1)
|
||||
if pid_key != "pid":
|
||||
raise ValueError(f"Expected key pid, got {pid_key}")
|
||||
pid_value = int(pid_value.strip())
|
||||
# Parse pid, tid, name from process info
|
||||
info_parts = [p.strip() for p in process_info.split(",")]
|
||||
for info in info_parts:
|
||||
key, value = info.split(":", 1)
|
||||
key = key.strip()
|
||||
value = value.strip()
|
||||
|
||||
tid_key, tid_value = tid_part.split(":", 1)
|
||||
if tid_key != "tid":
|
||||
raise ValueError(f"Expected key tid, got {tid_key}")
|
||||
tid_value = int(tid_value.strip())
|
||||
if key == "pid":
|
||||
tombstone["pid"] = int(value)
|
||||
elif key == "tid":
|
||||
tombstone["tid"] = int(value)
|
||||
elif key == "name":
|
||||
tombstone["process_name"] = value
|
||||
|
||||
name_key, name_value = name_part.split(":", 1)
|
||||
if name_key != "name":
|
||||
raise ValueError(f"Expected key name, got {name_key}")
|
||||
name_value = name_value.strip()
|
||||
process_name, binary_path = self._parse_process_name(name_value, tombstone)
|
||||
# Extract binary path if it exists
|
||||
if len(parts) > 1:
|
||||
tombstone["binary_path"] = parts[1].strip().rstrip(" <")
|
||||
|
||||
tombstone["pid"] = pid_value
|
||||
tombstone["tid"] = tid_value
|
||||
tombstone["process_name"] = process_name
|
||||
tombstone["binary_path"] = binary_path
|
||||
return True
|
||||
return True
|
||||
|
||||
def _parse_process_name(self, process_name_part, tombstone: dict) -> bool:
|
||||
process_name, process_path = process_name_part.split(">>>")
|
||||
process_name = process_name.strip()
|
||||
binary_path = process_path.strip().split(" ")[0]
|
||||
return process_name, binary_path
|
||||
except Exception as e:
|
||||
raise ValueError(f"Failed to parse PID line: {str(e)}")
|
||||
|
||||
def _load_signal_line(self, line: str, tombstone: dict) -> bool:
|
||||
signal, code, _ = [part.strip() for part in line.split(",", 2)]
|
||||
signal = signal.split("signal ")[1]
|
||||
signal_code, signal_name = signal.split(" ")
|
||||
signal_name = signal_name.strip("()")
|
||||
signal_part, code_part = map(str.strip, line.split(",")[:2])
|
||||
|
||||
code_part = code.split("code ")[1]
|
||||
code_number, code_name = code_part.split(" ")
|
||||
code_name = code_name.strip("()")
|
||||
def parse_part(part: str, prefix: str) -> tuple[int, str]:
|
||||
match = part.split(prefix)[1]
|
||||
number = int(match.split()[0])
|
||||
name = match.split("(")[1].split(")")[0] if "(" in match else "UNKNOWN"
|
||||
return number, name
|
||||
|
||||
signal_number, signal_name = parse_part(signal_part, "signal ")
|
||||
code_number, code_name = parse_part(code_part, "code ")
|
||||
|
||||
tombstone["signal_info"] = {
|
||||
"code": int(code_number),
|
||||
"code": code_number,
|
||||
"code_name": code_name,
|
||||
"name": signal_name,
|
||||
"number": int(signal_code),
|
||||
"number": signal_number,
|
||||
}
|
||||
return True
|
||||
|
||||
@@ -259,13 +254,7 @@ class TombstoneCrashArtifact(AndroidArtifact):
|
||||
|
||||
@staticmethod
|
||||
def _parse_timestamp_string(timestamp: str) -> str:
|
||||
timestamp_date, timezone = timestamp.split("+")
|
||||
# Truncate microseconds before parsing
|
||||
timestamp_without_micro = timestamp_date.split(".")[0] + "+" + timezone
|
||||
timestamp_parsed = datetime.datetime.strptime(
|
||||
timestamp_without_micro, "%Y-%m-%d %H:%M:%S%z"
|
||||
)
|
||||
|
||||
timestamp_parsed = parser.parse(timestamp)
|
||||
# HACK: Swap the local timestamp to UTC, so keep the original time and avoid timezone conversion.
|
||||
local_timestamp = timestamp_parsed.replace(tzinfo=datetime.timezone.utc)
|
||||
return convert_datetime_to_iso(local_timestamp)
|
||||
|
||||
@@ -9,35 +9,45 @@ import click
|
||||
|
||||
from mvt.common.cmd_check_iocs import CmdCheckIOCS
|
||||
from mvt.common.help import (
|
||||
HELP_MSG_VERSION,
|
||||
HELP_MSG_OUTPUT,
|
||||
HELP_MSG_VERBOSE,
|
||||
HELP_MSG_ANDROID_BACKUP_PASSWORD,
|
||||
HELP_MSG_APK_OUTPUT,
|
||||
HELP_MSG_APKS_FROM_FILE,
|
||||
HELP_MSG_CHECK_ADB,
|
||||
HELP_MSG_CHECK_ANDROID_BACKUP,
|
||||
HELP_MSG_CHECK_ANDROIDQF,
|
||||
HELP_MSG_CHECK_BUGREPORT,
|
||||
HELP_MSG_CHECK_IOCS,
|
||||
HELP_MSG_DISABLE_INDICATOR_UPDATE_CHECK,
|
||||
HELP_MSG_DISABLE_UPDATE_CHECK,
|
||||
HELP_MSG_DOWNLOAD_ALL_APKS,
|
||||
HELP_MSG_DOWNLOAD_APKS,
|
||||
HELP_MSG_FAST,
|
||||
HELP_MSG_HASHES,
|
||||
HELP_MSG_IOC,
|
||||
HELP_MSG_LIST_MODULES,
|
||||
HELP_MSG_MODULE,
|
||||
HELP_MSG_NONINTERACTIVE,
|
||||
HELP_MSG_ANDROID_BACKUP_PASSWORD,
|
||||
HELP_MSG_CHECK_ADB_REMOVED,
|
||||
HELP_MSG_CHECK_ADB_REMOVED_DESCRIPTION,
|
||||
HELP_MSG_CHECK_BUGREPORT,
|
||||
HELP_MSG_CHECK_ANDROID_BACKUP,
|
||||
HELP_MSG_CHECK_ANDROIDQF,
|
||||
HELP_MSG_HASHES,
|
||||
HELP_MSG_CHECK_IOCS,
|
||||
HELP_MSG_OUTPUT,
|
||||
HELP_MSG_SERIAL,
|
||||
HELP_MSG_STIX2,
|
||||
HELP_MSG_VERBOSE,
|
||||
HELP_MSG_VERSION,
|
||||
HELP_MSG_VIRUS_TOTAL,
|
||||
)
|
||||
from mvt.common.logo import logo
|
||||
from mvt.common.updates import IndicatorsUpdates
|
||||
from mvt.common.utils import init_logging, set_verbose_logging
|
||||
|
||||
|
||||
from .cmd_check_adb import CmdAndroidCheckADB
|
||||
from .cmd_check_androidqf import CmdAndroidCheckAndroidQF
|
||||
from .cmd_check_backup import CmdAndroidCheckBackup
|
||||
from .cmd_check_bugreport import CmdAndroidCheckBugreport
|
||||
from .cmd_download_apks import DownloadAPKs
|
||||
from .modules.adb import ADB_MODULES
|
||||
from .modules.adb.packages import Packages
|
||||
from .modules.backup import BACKUP_MODULES
|
||||
from .modules.backup.helpers import cli_load_android_backup_password
|
||||
from .modules.bugreport import BUGREPORT_MODULES
|
||||
from .modules.androidqf import ANDROIDQF_MODULES
|
||||
|
||||
init_logging()
|
||||
log = logging.getLogger("mvt")
|
||||
@@ -45,12 +55,37 @@ log = logging.getLogger("mvt")
|
||||
CONTEXT_SETTINGS = dict(help_option_names=["-h", "--help"])
|
||||
|
||||
|
||||
def _get_disable_flags(ctx):
|
||||
"""Helper function to safely get disable flags from context."""
|
||||
if ctx.obj is None:
|
||||
return False, False
|
||||
return (
|
||||
ctx.obj.get("disable_version_check", False),
|
||||
ctx.obj.get("disable_indicator_check", False),
|
||||
)
|
||||
|
||||
|
||||
# ==============================================================================
|
||||
# Main
|
||||
# ==============================================================================
|
||||
@click.group(invoke_without_command=False)
|
||||
def cli():
|
||||
logo()
|
||||
@click.option(
|
||||
"--disable-update-check", is_flag=True, help=HELP_MSG_DISABLE_UPDATE_CHECK
|
||||
)
|
||||
@click.option(
|
||||
"--disable-indicator-update-check",
|
||||
is_flag=True,
|
||||
help=HELP_MSG_DISABLE_INDICATOR_UPDATE_CHECK,
|
||||
)
|
||||
@click.pass_context
|
||||
def cli(ctx, disable_update_check, disable_indicator_update_check):
|
||||
ctx.ensure_object(dict)
|
||||
ctx.obj["disable_version_check"] = disable_update_check
|
||||
ctx.obj["disable_indicator_check"] = disable_indicator_update_check
|
||||
logo(
|
||||
disable_version_check=disable_update_check,
|
||||
disable_indicator_check=disable_indicator_update_check,
|
||||
)
|
||||
|
||||
|
||||
# ==============================================================================
|
||||
@@ -62,14 +97,124 @@ def version():
|
||||
|
||||
|
||||
# ==============================================================================
|
||||
# Command: check-adb (removed)
|
||||
# Command: download-apks
|
||||
# ==============================================================================
|
||||
@cli.command(
|
||||
"check-adb", context_settings=CONTEXT_SETTINGS, help=HELP_MSG_CHECK_ADB_REMOVED
|
||||
"download-apks", context_settings=CONTEXT_SETTINGS, help=HELP_MSG_DOWNLOAD_APKS
|
||||
)
|
||||
@click.option("--serial", "-s", type=str, help=HELP_MSG_SERIAL)
|
||||
@click.option("--all-apks", "-a", is_flag=True, help=HELP_MSG_DOWNLOAD_ALL_APKS)
|
||||
@click.option("--virustotal", "-V", is_flag=True, help=HELP_MSG_VIRUS_TOTAL)
|
||||
@click.option("--output", "-o", type=click.Path(exists=False), help=HELP_MSG_APK_OUTPUT)
|
||||
@click.option(
|
||||
"--from-file", "-f", type=click.Path(exists=True), help=HELP_MSG_APKS_FROM_FILE
|
||||
)
|
||||
@click.option("--verbose", "-v", is_flag=True, help=HELP_MSG_VERBOSE)
|
||||
@click.pass_context
|
||||
def check_adb(ctx):
|
||||
log.error(HELP_MSG_CHECK_ADB_REMOVED_DESCRIPTION)
|
||||
def download_apks(ctx, all_apks, virustotal, output, from_file, serial, verbose):
|
||||
set_verbose_logging(verbose)
|
||||
try:
|
||||
if from_file:
|
||||
download = DownloadAPKs.from_json(from_file)
|
||||
else:
|
||||
# TODO: Do we actually want to be able to run without storing any
|
||||
# file?
|
||||
if not output:
|
||||
log.critical("You need to specify an output folder with --output!")
|
||||
ctx.exit(1)
|
||||
|
||||
download = DownloadAPKs(results_path=output, all_apks=all_apks)
|
||||
if serial:
|
||||
download.serial = serial
|
||||
download.run()
|
||||
|
||||
packages_to_lookup = []
|
||||
if all_apks:
|
||||
packages_to_lookup = download.packages
|
||||
else:
|
||||
for package in download.packages:
|
||||
if not package.get("system", False):
|
||||
packages_to_lookup.append(package)
|
||||
|
||||
if len(packages_to_lookup) == 0:
|
||||
return
|
||||
|
||||
if virustotal:
|
||||
m = Packages()
|
||||
m.check_virustotal(packages_to_lookup)
|
||||
except KeyboardInterrupt:
|
||||
print("")
|
||||
ctx.exit(1)
|
||||
|
||||
|
||||
# ==============================================================================
|
||||
# Command: check-adb
|
||||
# ==============================================================================
|
||||
@cli.command("check-adb", context_settings=CONTEXT_SETTINGS, help=HELP_MSG_CHECK_ADB)
|
||||
@click.option("--serial", "-s", type=str, help=HELP_MSG_SERIAL)
|
||||
@click.option(
|
||||
"--iocs",
|
||||
"-i",
|
||||
type=click.Path(exists=True),
|
||||
multiple=True,
|
||||
default=[],
|
||||
help=HELP_MSG_IOC,
|
||||
)
|
||||
@click.option("--output", "-o", type=click.Path(exists=False), help=HELP_MSG_OUTPUT)
|
||||
@click.option("--fast", "-f", is_flag=True, help=HELP_MSG_FAST)
|
||||
@click.option("--list-modules", "-l", is_flag=True, help=HELP_MSG_LIST_MODULES)
|
||||
@click.option("--module", "-m", help=HELP_MSG_MODULE)
|
||||
@click.option("--non-interactive", "-n", is_flag=True, help=HELP_MSG_NONINTERACTIVE)
|
||||
@click.option("--backup-password", "-p", help=HELP_MSG_ANDROID_BACKUP_PASSWORD)
|
||||
@click.option("--verbose", "-v", is_flag=True, help=HELP_MSG_VERBOSE)
|
||||
@click.pass_context
|
||||
def check_adb(
|
||||
ctx,
|
||||
serial,
|
||||
iocs,
|
||||
output,
|
||||
fast,
|
||||
list_modules,
|
||||
module,
|
||||
non_interactive,
|
||||
backup_password,
|
||||
verbose,
|
||||
):
|
||||
set_verbose_logging(verbose)
|
||||
module_options = {
|
||||
"fast_mode": fast,
|
||||
"interactive": not non_interactive,
|
||||
"backup_password": cli_load_android_backup_password(log, backup_password),
|
||||
}
|
||||
|
||||
cmd = CmdAndroidCheckADB(
|
||||
results_path=output,
|
||||
ioc_files=iocs,
|
||||
module_name=module,
|
||||
serial=serial,
|
||||
module_options=module_options,
|
||||
disable_version_check=_get_disable_flags(ctx)[0],
|
||||
disable_indicator_check=_get_disable_flags(ctx)[1],
|
||||
)
|
||||
|
||||
if list_modules:
|
||||
cmd.list_modules()
|
||||
return
|
||||
|
||||
log.warning(
|
||||
"DEPRECATION: The 'check-adb' command is deprecated and may be removed in a future release. "
|
||||
"Prefer acquiring device data using the AndroidQF project (https://github.com/mvt-project/androidqf/) and analyzing that acquisition with MVT."
|
||||
)
|
||||
|
||||
log.info("Checking Android device over debug bridge")
|
||||
|
||||
cmd.run()
|
||||
|
||||
if cmd.detected_count > 0:
|
||||
log.warning(
|
||||
"The analysis of the Android device produced %d detections!",
|
||||
cmd.detected_count,
|
||||
)
|
||||
|
||||
|
||||
# ==============================================================================
|
||||
@@ -101,6 +246,8 @@ def check_bugreport(ctx, iocs, output, list_modules, module, verbose, bugreport_
|
||||
ioc_files=iocs,
|
||||
module_name=module,
|
||||
hashes=True,
|
||||
disable_version_check=_get_disable_flags(ctx)[0],
|
||||
disable_indicator_check=_get_disable_flags(ctx)[1],
|
||||
)
|
||||
|
||||
if list_modules:
|
||||
@@ -110,8 +257,12 @@ def check_bugreport(ctx, iocs, output, list_modules, module, verbose, bugreport_
|
||||
log.info("Checking Android bug report at path: %s", bugreport_path)
|
||||
|
||||
cmd.run()
|
||||
cmd.show_alerts_brief()
|
||||
cmd.show_support_message()
|
||||
|
||||
if cmd.detected_count > 0:
|
||||
log.warning(
|
||||
"The analysis of the Android bug report produced %d detections!",
|
||||
cmd.detected_count,
|
||||
)
|
||||
|
||||
|
||||
# ==============================================================================
|
||||
@@ -159,6 +310,8 @@ def check_backup(
|
||||
"interactive": not non_interactive,
|
||||
"backup_password": cli_load_android_backup_password(log, backup_password),
|
||||
},
|
||||
disable_version_check=_get_disable_flags(ctx)[0],
|
||||
disable_indicator_check=_get_disable_flags(ctx)[1],
|
||||
)
|
||||
|
||||
if list_modules:
|
||||
@@ -168,8 +321,12 @@ def check_backup(
|
||||
log.info("Checking Android backup at path: %s", backup_path)
|
||||
|
||||
cmd.run()
|
||||
cmd.show_alerts_brief()
|
||||
cmd.show_support_message()
|
||||
|
||||
if cmd.detected_count > 0:
|
||||
log.warning(
|
||||
"The analysis of the Android backup produced %d detections!",
|
||||
cmd.detected_count,
|
||||
)
|
||||
|
||||
|
||||
# ==============================================================================
|
||||
@@ -219,6 +376,8 @@ def check_androidqf(
|
||||
"interactive": not non_interactive,
|
||||
"backup_password": cli_load_android_backup_password(log, backup_password),
|
||||
},
|
||||
disable_version_check=_get_disable_flags(ctx)[0],
|
||||
disable_indicator_check=_get_disable_flags(ctx)[1],
|
||||
)
|
||||
|
||||
if list_modules:
|
||||
@@ -228,9 +387,12 @@ def check_androidqf(
|
||||
log.info("Checking AndroidQF acquisition at path: %s", androidqf_path)
|
||||
|
||||
cmd.run()
|
||||
cmd.show_alerts_brief()
|
||||
cmd.show_disable_adb_warning()
|
||||
cmd.show_support_message()
|
||||
|
||||
if cmd.detected_count > 0:
|
||||
log.warning(
|
||||
"The analysis of the AndroidQF acquisition produced %d detections!",
|
||||
cmd.detected_count,
|
||||
)
|
||||
|
||||
|
||||
# ==============================================================================
|
||||
@@ -250,16 +412,20 @@ def check_androidqf(
|
||||
@click.argument("FOLDER", type=click.Path(exists=True))
|
||||
@click.pass_context
|
||||
def check_iocs(ctx, iocs, list_modules, module, folder):
|
||||
cmd = CmdCheckIOCS(target_path=folder, ioc_files=iocs, module_name=module)
|
||||
cmd.modules = BACKUP_MODULES + BUGREPORT_MODULES + ANDROIDQF_MODULES
|
||||
cmd = CmdCheckIOCS(
|
||||
target_path=folder,
|
||||
ioc_files=iocs,
|
||||
module_name=module,
|
||||
disable_version_check=_get_disable_flags(ctx)[0],
|
||||
disable_indicator_check=_get_disable_flags(ctx)[1],
|
||||
)
|
||||
cmd.modules = BACKUP_MODULES + ADB_MODULES + BUGREPORT_MODULES
|
||||
|
||||
if list_modules:
|
||||
cmd.list_modules()
|
||||
return
|
||||
|
||||
cmd.run()
|
||||
cmd.show_alerts_brief()
|
||||
cmd.show_support_message()
|
||||
|
||||
|
||||
# ==============================================================================
|
||||
|
||||
48
src/mvt/android/cmd_check_adb.py
Normal file
48
src/mvt/android/cmd_check_adb.py
Normal file
@@ -0,0 +1,48 @@
|
||||
# Mobile Verification Toolkit (MVT)
|
||||
# Copyright (c) 2021-2023 The MVT Authors.
|
||||
# Use of this software is governed by the MVT License 1.1 that can be found at
|
||||
# https://license.mvt.re/1.1/
|
||||
|
||||
import logging
|
||||
from typing import Optional
|
||||
|
||||
from mvt.common.command import Command
|
||||
from mvt.common.indicators import Indicators
|
||||
|
||||
from .modules.adb import ADB_MODULES
|
||||
|
||||
log = logging.getLogger(__name__)
|
||||
|
||||
|
||||
class CmdAndroidCheckADB(Command):
|
||||
def __init__(
|
||||
self,
|
||||
target_path: Optional[str] = None,
|
||||
results_path: Optional[str] = None,
|
||||
ioc_files: Optional[list] = None,
|
||||
iocs: Optional[Indicators] = None,
|
||||
module_name: Optional[str] = None,
|
||||
serial: Optional[str] = None,
|
||||
module_options: Optional[dict] = None,
|
||||
hashes: Optional[bool] = False,
|
||||
sub_command: Optional[bool] = False,
|
||||
disable_version_check: bool = False,
|
||||
disable_indicator_check: bool = False,
|
||||
) -> None:
|
||||
super().__init__(
|
||||
target_path=target_path,
|
||||
results_path=results_path,
|
||||
ioc_files=ioc_files,
|
||||
iocs=iocs,
|
||||
module_name=module_name,
|
||||
serial=serial,
|
||||
module_options=module_options,
|
||||
hashes=hashes,
|
||||
sub_command=sub_command,
|
||||
log=log,
|
||||
disable_version_check=disable_version_check,
|
||||
disable_indicator_check=disable_indicator_check,
|
||||
)
|
||||
|
||||
self.name = "check-adb"
|
||||
self.modules = ADB_MODULES
|
||||
@@ -9,12 +9,11 @@ import zipfile
|
||||
from pathlib import Path
|
||||
from typing import List, Optional
|
||||
|
||||
from mvt.android.cmd_check_backup import CmdAndroidCheckBackup
|
||||
from mvt.android.cmd_check_bugreport import CmdAndroidCheckBugreport
|
||||
from mvt.common.command import Command
|
||||
from mvt.common.indicators import Indicators
|
||||
|
||||
from mvt.android.cmd_check_bugreport import CmdAndroidCheckBugreport
|
||||
from mvt.android.cmd_check_backup import CmdAndroidCheckBackup
|
||||
|
||||
from .modules.androidqf import ANDROIDQF_MODULES
|
||||
from .modules.androidqf.base import AndroidQFModule
|
||||
|
||||
@@ -45,6 +44,8 @@ class CmdAndroidCheckAndroidQF(Command):
|
||||
module_options: Optional[dict] = None,
|
||||
hashes: Optional[bool] = False,
|
||||
sub_command: Optional[bool] = False,
|
||||
disable_version_check: bool = False,
|
||||
disable_indicator_check: bool = False,
|
||||
) -> None:
|
||||
super().__init__(
|
||||
target_path=target_path,
|
||||
@@ -57,6 +58,8 @@ class CmdAndroidCheckAndroidQF(Command):
|
||||
hashes=hashes,
|
||||
sub_command=sub_command,
|
||||
log=log,
|
||||
disable_version_check=disable_version_check,
|
||||
disable_indicator_check=disable_indicator_check,
|
||||
)
|
||||
|
||||
self.name = "check-androidqf"
|
||||
@@ -67,9 +70,6 @@ class CmdAndroidCheckAndroidQF(Command):
|
||||
self.__files: List[str] = []
|
||||
|
||||
def init(self):
|
||||
if not self.target_path:
|
||||
raise NoAndroidQFTargetPath
|
||||
|
||||
if os.path.isdir(self.target_path):
|
||||
self.__format = "dir"
|
||||
parent_path = Path(self.target_path).absolute().parent.as_posix()
|
||||
@@ -157,11 +157,9 @@ class CmdAndroidCheckAndroidQF(Command):
|
||||
cmd.from_zip(bugreport)
|
||||
cmd.run()
|
||||
|
||||
self.detected_count += cmd.detected_count
|
||||
self.timeline.extend(cmd.timeline)
|
||||
self.alertstore.extend(cmd.alertstore.alerts)
|
||||
finally:
|
||||
if bugreport:
|
||||
bugreport.close()
|
||||
self.timeline_detected.extend(cmd.timeline_detected)
|
||||
|
||||
def run_backup_cmd(self) -> bool:
|
||||
try:
|
||||
@@ -184,12 +182,9 @@ class CmdAndroidCheckAndroidQF(Command):
|
||||
cmd.from_ab(backup)
|
||||
cmd.run()
|
||||
|
||||
self.detected_count += cmd.detected_count
|
||||
self.timeline.extend(cmd.timeline)
|
||||
self.alertstore.extend(cmd.alertstore.alerts)
|
||||
finally:
|
||||
if backup:
|
||||
backup.close()
|
||||
|
||||
self.timeline_detected.extend(cmd.timeline_detected)
|
||||
|
||||
def finish(self) -> None:
|
||||
"""
|
||||
|
||||
@@ -11,7 +11,7 @@ import tarfile
|
||||
from pathlib import Path
|
||||
from typing import List, Optional
|
||||
|
||||
from mvt.android.modules.backup.base import BackupModule
|
||||
from mvt.android.modules.backup.base import BackupExtraction
|
||||
from mvt.android.modules.backup.helpers import prompt_or_load_android_backup_password
|
||||
from mvt.android.parsers.backup import (
|
||||
AndroidBackupParsingError,
|
||||
@@ -39,6 +39,8 @@ class CmdAndroidCheckBackup(Command):
|
||||
module_options: Optional[dict] = None,
|
||||
hashes: Optional[bool] = False,
|
||||
sub_command: Optional[bool] = False,
|
||||
disable_version_check: bool = False,
|
||||
disable_indicator_check: bool = False,
|
||||
) -> None:
|
||||
super().__init__(
|
||||
target_path=target_path,
|
||||
@@ -51,17 +53,19 @@ class CmdAndroidCheckBackup(Command):
|
||||
hashes=hashes,
|
||||
sub_command=sub_command,
|
||||
log=log,
|
||||
disable_version_check=disable_version_check,
|
||||
disable_indicator_check=disable_indicator_check,
|
||||
)
|
||||
|
||||
self.name = "check-backup"
|
||||
self.modules = BACKUP_MODULES
|
||||
|
||||
self.__type: str = ""
|
||||
self.__tar: Optional[tarfile.TarFile] = None
|
||||
self.__files: List[str] = []
|
||||
self.backup_type: str = ""
|
||||
self.backup_archive: Optional[tarfile.TarFile] = None
|
||||
self.backup_files: List[str] = []
|
||||
|
||||
def from_ab(self, ab_file_bytes: bytes) -> None:
|
||||
self.__type = "ab"
|
||||
self.backup_type = "ab"
|
||||
header = parse_ab_header(ab_file_bytes)
|
||||
if not header["backup"]:
|
||||
log.critical("Invalid backup format, file should be in .ab format")
|
||||
@@ -84,26 +88,26 @@ class CmdAndroidCheckBackup(Command):
|
||||
sys.exit(1)
|
||||
|
||||
dbytes = io.BytesIO(tardata)
|
||||
self.__tar = tarfile.open(fileobj=dbytes)
|
||||
for member in self.__tar:
|
||||
self.__files.append(member.name)
|
||||
self.backup_archive = tarfile.open(fileobj=dbytes)
|
||||
for member in self.backup_archive:
|
||||
self.backup_files.append(member.name)
|
||||
|
||||
def init(self) -> None:
|
||||
if not self.target_path:
|
||||
return
|
||||
|
||||
if os.path.isfile(self.target_path):
|
||||
self.__type = "ab"
|
||||
self.backup_type = "ab"
|
||||
with open(self.target_path, "rb") as handle:
|
||||
ab_file_bytes = handle.read()
|
||||
self.from_ab(ab_file_bytes)
|
||||
|
||||
elif os.path.isdir(self.target_path):
|
||||
self.__type = "folder"
|
||||
self.backup_type = "folder"
|
||||
self.target_path = Path(self.target_path).absolute().as_posix()
|
||||
for root, subdirs, subfiles in os.walk(os.path.abspath(self.target_path)):
|
||||
for fname in subfiles:
|
||||
self.__files.append(
|
||||
self.backup_files.append(
|
||||
os.path.relpath(os.path.join(root, fname), self.target_path)
|
||||
)
|
||||
else:
|
||||
@@ -113,12 +117,8 @@ class CmdAndroidCheckBackup(Command):
|
||||
)
|
||||
sys.exit(1)
|
||||
|
||||
def module_init(self, module: BackupModule) -> None: # type: ignore[override]
|
||||
if self.__type == "folder":
|
||||
module.from_dir(self.target_path, self.__files)
|
||||
def module_init(self, module: BackupExtraction) -> None: # type: ignore[override]
|
||||
if self.backup_type == "folder":
|
||||
module.from_dir(self.target_path, self.backup_files)
|
||||
else:
|
||||
module.from_ab(self.target_path, self.__tar, self.__files)
|
||||
|
||||
def finish(self) -> None:
|
||||
if self.__tar:
|
||||
self.__tar.close()
|
||||
module.from_ab(self.target_path, self.backup_archive, self.backup_files)
|
||||
|
||||
@@ -30,6 +30,8 @@ class CmdAndroidCheckBugreport(Command):
|
||||
module_options: Optional[dict] = None,
|
||||
hashes: Optional[bool] = False,
|
||||
sub_command: Optional[bool] = False,
|
||||
disable_version_check: bool = False,
|
||||
disable_indicator_check: bool = False,
|
||||
) -> None:
|
||||
super().__init__(
|
||||
target_path=target_path,
|
||||
@@ -42,6 +44,8 @@ class CmdAndroidCheckBugreport(Command):
|
||||
hashes=hashes,
|
||||
sub_command=sub_command,
|
||||
log=log,
|
||||
disable_version_check=disable_version_check,
|
||||
disable_indicator_check=disable_indicator_check,
|
||||
)
|
||||
|
||||
self.name = "check-bugreport"
|
||||
|
||||
184
src/mvt/android/cmd_download_apks.py
Normal file
184
src/mvt/android/cmd_download_apks.py
Normal file
@@ -0,0 +1,184 @@
|
||||
# Mobile Verification Toolkit (MVT)
|
||||
# Copyright (c) 2021-2023 The MVT Authors.
|
||||
# Use of this software is governed by the MVT License 1.1 that can be found at
|
||||
# https://license.mvt.re/1.1/
|
||||
|
||||
import json
|
||||
import logging
|
||||
import os
|
||||
from typing import Callable, Optional, Union
|
||||
|
||||
from rich.progress import track
|
||||
|
||||
from mvt.common.module import InsufficientPrivileges
|
||||
|
||||
from .modules.adb.base import AndroidExtraction
|
||||
from .modules.adb.packages import Packages
|
||||
|
||||
log = logging.getLogger(__name__)
|
||||
|
||||
|
||||
class DownloadAPKs(AndroidExtraction):
|
||||
"""DownloadAPKs is the main class operating the download of APKs
|
||||
from the device.
|
||||
"""
|
||||
|
||||
def __init__(
|
||||
self,
|
||||
results_path: Optional[str] = None,
|
||||
all_apks: bool = False,
|
||||
packages: Optional[list] = None,
|
||||
) -> None:
|
||||
"""Initialize module.
|
||||
:param results_path: Path to the folder where data should be stored
|
||||
:param all_apks: Boolean indicating whether to download all packages
|
||||
or filter known-goods
|
||||
:param packages: Provided list of packages, typically for JSON checks
|
||||
"""
|
||||
super().__init__(results_path=results_path, log=log)
|
||||
|
||||
self.packages = packages
|
||||
self.all_apks = all_apks
|
||||
self.results_path_apks = None
|
||||
|
||||
@classmethod
|
||||
def from_json(cls, json_path: str) -> Callable:
|
||||
"""Initialize this class from an existing apks.json file.
|
||||
|
||||
:param json_path: Path to the apks.json file to parse.
|
||||
|
||||
"""
|
||||
with open(json_path, "r", encoding="utf-8") as handle:
|
||||
packages = json.load(handle)
|
||||
return cls(packages=packages)
|
||||
|
||||
def pull_package_file(
|
||||
self, package_name: str, remote_path: str
|
||||
) -> Union[str, None]:
|
||||
"""Pull files related to specific package from the device.
|
||||
|
||||
:param package_name: Name of the package to download
|
||||
:param remote_path: Path to the file to download
|
||||
:returns: Path to the local copy
|
||||
|
||||
"""
|
||||
log.info("Downloading %s ...", remote_path)
|
||||
|
||||
file_name = ""
|
||||
if "==/" in remote_path:
|
||||
file_name = "_" + remote_path.split("==/")[1].replace(".apk", "")
|
||||
|
||||
local_path = os.path.join(
|
||||
self.results_path_apks, f"{package_name}{file_name}.apk"
|
||||
)
|
||||
name_counter = 0
|
||||
while True:
|
||||
if not os.path.exists(local_path):
|
||||
break
|
||||
|
||||
name_counter += 1
|
||||
local_path = os.path.join(
|
||||
self.results_path_apks, f"{package_name}{file_name}_{name_counter}.apk"
|
||||
)
|
||||
|
||||
try:
|
||||
self._adb_download(remote_path, local_path)
|
||||
except InsufficientPrivileges:
|
||||
log.error(
|
||||
"Unable to pull package file from %s: insufficient privileges, "
|
||||
"it might be a system app",
|
||||
remote_path,
|
||||
)
|
||||
self._adb_reconnect()
|
||||
return None
|
||||
except Exception as exc:
|
||||
log.exception("Failed to pull package file from %s: %s", remote_path, exc)
|
||||
self._adb_reconnect()
|
||||
return None
|
||||
|
||||
return local_path
|
||||
|
||||
def get_packages(self) -> None:
|
||||
"""Use the Packages adb module to retrieve the list of packages.
|
||||
We reuse the same extraction logic to then download the APKs.
|
||||
"""
|
||||
self.log.info("Retrieving list of installed packages...")
|
||||
|
||||
m = Packages()
|
||||
m.log = self.log
|
||||
m.serial = self.serial
|
||||
m.run()
|
||||
|
||||
self.packages = m.results
|
||||
|
||||
def pull_packages(self) -> None:
|
||||
"""Download all files of all selected packages from the device."""
|
||||
log.info(
|
||||
"Starting extraction of installed APKs at folder %s", self.results_path
|
||||
)
|
||||
|
||||
# If the user provided the flag --all-apks we select all packages.
|
||||
packages_selection = []
|
||||
if self.all_apks:
|
||||
log.info("Selected all %d available packages", len(self.packages))
|
||||
packages_selection = self.packages
|
||||
else:
|
||||
# Otherwise we loop through the packages and get only those that
|
||||
# are not marked as system.
|
||||
for package in self.packages:
|
||||
if not package.get("system", False):
|
||||
packages_selection.append(package)
|
||||
|
||||
log.info(
|
||||
'Selected only %d packages which are not marked as "system"',
|
||||
len(packages_selection),
|
||||
)
|
||||
|
||||
if len(packages_selection) == 0:
|
||||
log.info("No packages were selected for download")
|
||||
return
|
||||
|
||||
log.info("Downloading packages from device. This might take some time ...")
|
||||
|
||||
self.results_path_apks = os.path.join(self.results_path, "apks")
|
||||
if not os.path.exists(self.results_path_apks):
|
||||
os.makedirs(self.results_path_apks, exist_ok=True)
|
||||
|
||||
for i in track(
|
||||
range(len(packages_selection)),
|
||||
description=f"Downloading {len(packages_selection)} packages...",
|
||||
):
|
||||
package = packages_selection[i]
|
||||
|
||||
log.info(
|
||||
"[%d/%d] Package: %s",
|
||||
i,
|
||||
len(packages_selection),
|
||||
package["package_name"],
|
||||
)
|
||||
|
||||
# Sometimes the package path contains multiple lines for multiple
|
||||
# apks. We loop through each line and download each file.
|
||||
for package_file in package["files"]:
|
||||
device_path = package_file["path"]
|
||||
local_path = self.pull_package_file(
|
||||
package["package_name"], device_path
|
||||
)
|
||||
if not local_path:
|
||||
continue
|
||||
|
||||
package_file["local_path"] = local_path
|
||||
|
||||
log.info("Download of selected packages completed")
|
||||
|
||||
def save_json(self) -> None:
|
||||
json_path = os.path.join(self.results_path, "apks.json")
|
||||
with open(json_path, "w", encoding="utf-8") as handle:
|
||||
json.dump(self.packages, handle, indent=4)
|
||||
|
||||
def run(self) -> None:
|
||||
self.get_packages()
|
||||
self._adb_connect()
|
||||
self.pull_packages()
|
||||
self.save_json()
|
||||
self._adb_disconnect()
|
||||
355
src/mvt/android/modules/adb/base.py
Normal file
355
src/mvt/android/modules/adb/base.py
Normal file
@@ -0,0 +1,355 @@
|
||||
# Mobile Verification Toolkit (MVT)
|
||||
# Copyright (c) 2021-2023 The MVT Authors.
|
||||
# Use of this software is governed by the MVT License 1.1 that can be found at
|
||||
# https://license.mvt.re/1.1/
|
||||
|
||||
import base64
|
||||
import logging
|
||||
import os
|
||||
import random
|
||||
import string
|
||||
import sys
|
||||
import tempfile
|
||||
import time
|
||||
from typing import Callable, Optional
|
||||
|
||||
from adb_shell.adb_device import AdbDeviceTcp, AdbDeviceUsb
|
||||
from adb_shell.auth.keygen import keygen, write_public_keyfile
|
||||
from adb_shell.auth.sign_pythonrsa import PythonRSASigner
|
||||
from adb_shell.exceptions import (
|
||||
AdbCommandFailureException,
|
||||
DeviceAuthError,
|
||||
UsbDeviceNotFoundError,
|
||||
UsbReadFailedError,
|
||||
)
|
||||
from usb1 import USBErrorAccess, USBErrorBusy
|
||||
|
||||
from mvt.android.modules.backup.helpers import prompt_or_load_android_backup_password
|
||||
from mvt.android.parsers.backup import (
|
||||
InvalidBackupPassword,
|
||||
parse_ab_header,
|
||||
parse_backup_file,
|
||||
)
|
||||
from mvt.common.module import InsufficientPrivileges, MVTModule
|
||||
|
||||
ADB_KEY_PATH = os.path.expanduser("~/.android/adbkey")
|
||||
ADB_PUB_KEY_PATH = os.path.expanduser("~/.android/adbkey.pub")
|
||||
|
||||
|
||||
class AndroidExtraction(MVTModule):
|
||||
"""This class provides a base for all Android extraction modules."""
|
||||
|
||||
def __init__(
|
||||
self,
|
||||
file_path: Optional[str] = None,
|
||||
target_path: Optional[str] = None,
|
||||
results_path: Optional[str] = None,
|
||||
module_options: Optional[dict] = None,
|
||||
log: logging.Logger = logging.getLogger(__name__),
|
||||
results: Optional[list] = None,
|
||||
) -> None:
|
||||
super().__init__(
|
||||
file_path=file_path,
|
||||
target_path=target_path,
|
||||
results_path=results_path,
|
||||
module_options=module_options,
|
||||
log=log,
|
||||
results=results,
|
||||
)
|
||||
|
||||
self.device = None
|
||||
self.serial = None
|
||||
|
||||
@staticmethod
|
||||
def _adb_check_keys() -> None:
|
||||
"""Make sure Android adb keys exist."""
|
||||
if not os.path.isdir(os.path.dirname(ADB_KEY_PATH)):
|
||||
os.makedirs(os.path.dirname(ADB_KEY_PATH))
|
||||
|
||||
if not os.path.exists(ADB_KEY_PATH):
|
||||
keygen(ADB_KEY_PATH)
|
||||
|
||||
if not os.path.exists(ADB_PUB_KEY_PATH):
|
||||
write_public_keyfile(ADB_KEY_PATH, ADB_PUB_KEY_PATH)
|
||||
|
||||
def _adb_connect(self) -> None:
|
||||
"""Connect to the device over adb."""
|
||||
self._adb_check_keys()
|
||||
|
||||
with open(ADB_KEY_PATH, "rb") as handle:
|
||||
priv_key = handle.read()
|
||||
|
||||
with open(ADB_PUB_KEY_PATH, "rb") as handle:
|
||||
pub_key = handle.read()
|
||||
|
||||
signer = PythonRSASigner(pub_key, priv_key)
|
||||
|
||||
# If no serial was specified or if the serial does not seem to be
|
||||
# a HOST:PORT definition, we use the USB transport.
|
||||
if not self.serial or ":" not in self.serial:
|
||||
try:
|
||||
self.device = AdbDeviceUsb(serial=self.serial)
|
||||
except UsbDeviceNotFoundError:
|
||||
self.log.critical(
|
||||
"No device found. Make sure it is connected and unlocked."
|
||||
)
|
||||
sys.exit(-1)
|
||||
# Otherwise we try to use the TCP transport.
|
||||
else:
|
||||
addr = self.serial.split(":")
|
||||
if len(addr) < 2:
|
||||
raise ValueError(
|
||||
"TCP serial number must follow the format: `address:port`"
|
||||
)
|
||||
|
||||
self.device = AdbDeviceTcp(
|
||||
addr[0], int(addr[1]), default_transport_timeout_s=30.0
|
||||
)
|
||||
|
||||
while True:
|
||||
try:
|
||||
self.device.connect(rsa_keys=[signer], auth_timeout_s=5)
|
||||
except (USBErrorBusy, USBErrorAccess):
|
||||
self.log.critical(
|
||||
"Device is busy, maybe run `adb kill-server` and try again."
|
||||
)
|
||||
sys.exit(-1)
|
||||
except DeviceAuthError:
|
||||
self.log.error(
|
||||
"You need to authorize this computer on the Android device. "
|
||||
"Retrying in 5 seconds..."
|
||||
)
|
||||
time.sleep(5)
|
||||
except UsbReadFailedError:
|
||||
self.log.error(
|
||||
"Unable to connect to the device over USB. "
|
||||
"Try to unplug, plug the device and start again."
|
||||
)
|
||||
sys.exit(-1)
|
||||
except OSError as exc:
|
||||
if exc.errno == 113 and self.serial:
|
||||
self.log.critical(
|
||||
"Unable to connect to the device %s: "
|
||||
"did you specify the correct IP address?",
|
||||
self.serial,
|
||||
)
|
||||
sys.exit(-1)
|
||||
else:
|
||||
break
|
||||
|
||||
def _adb_disconnect(self) -> None:
|
||||
"""Close adb connection to the device."""
|
||||
self.device.close()
|
||||
|
||||
def _adb_reconnect(self) -> None:
|
||||
"""Reconnect to device using adb."""
|
||||
self.log.info("Reconnecting ...")
|
||||
self._adb_disconnect()
|
||||
self._adb_connect()
|
||||
|
||||
def _adb_command(self, command: str, decode: bool = True) -> str:
|
||||
"""Execute an adb shell command.
|
||||
|
||||
:param command: Shell command to execute
|
||||
:returns: Output of command
|
||||
|
||||
"""
|
||||
return self.device.shell(command, read_timeout_s=200.0, decode=decode)
|
||||
|
||||
def _adb_check_if_root(self) -> bool:
|
||||
"""Check if we have a `su` binary on the Android device.
|
||||
|
||||
|
||||
:returns: Boolean indicating whether a `su` binary is present or not
|
||||
|
||||
"""
|
||||
result = self._adb_command("command -v su && su -c true")
|
||||
return bool(result) and "Permission denied" not in result
|
||||
|
||||
def _adb_root_or_die(self) -> None:
|
||||
"""Check if we have a `su` binary, otherwise raise an Exception."""
|
||||
if not self._adb_check_if_root():
|
||||
raise InsufficientPrivileges(
|
||||
"This module is optionally available "
|
||||
"in case the device is already rooted."
|
||||
" Do NOT root your own device!"
|
||||
)
|
||||
|
||||
def _adb_command_as_root(self, command):
|
||||
"""Execute an adb shell command.
|
||||
|
||||
:param command: Shell command to execute as root
|
||||
:returns: Output of command
|
||||
|
||||
"""
|
||||
return self._adb_command(f"su -c {command}")
|
||||
|
||||
def _adb_check_file_exists(self, file: str) -> bool:
|
||||
"""Verify that a file exists.
|
||||
|
||||
:param file: Path of the file
|
||||
:returns: Boolean indicating whether the file exists or not
|
||||
|
||||
"""
|
||||
|
||||
# TODO: Need to support checking files without root privileges as well.
|
||||
|
||||
# Check if we have root, if not raise an Exception.
|
||||
self._adb_root_or_die()
|
||||
|
||||
return bool(self._adb_command_as_root(f"[ ! -f {file} ] || echo 1"))
|
||||
|
||||
def _adb_download(
|
||||
self,
|
||||
remote_path: str,
|
||||
local_path: str,
|
||||
progress_callback: Optional[Callable] = None,
|
||||
retry_root: Optional[bool] = True,
|
||||
) -> None:
|
||||
"""Download a file form the device.
|
||||
|
||||
:param remote_path: Path to download from the device
|
||||
:param local_path: Path to where to locally store the copy of the file
|
||||
:param progress_callback: Callback for download progress bar
|
||||
(Default value = None)
|
||||
:param retry_root: Default value = True)
|
||||
|
||||
"""
|
||||
try:
|
||||
self.device.pull(remote_path, local_path, progress_callback)
|
||||
except AdbCommandFailureException as exc:
|
||||
if retry_root:
|
||||
self._adb_download_root(remote_path, local_path, progress_callback)
|
||||
else:
|
||||
raise Exception(
|
||||
f"Unable to download file {remote_path}: {exc}"
|
||||
) from exc
|
||||
|
||||
def _adb_download_root(
|
||||
self,
|
||||
remote_path: str,
|
||||
local_path: str,
|
||||
progress_callback: Optional[Callable] = None,
|
||||
) -> None:
|
||||
try:
|
||||
# Check if we have root, if not raise an Exception.
|
||||
self._adb_root_or_die()
|
||||
|
||||
# We generate a random temporary filename.
|
||||
allowed_chars = (
|
||||
string.ascii_uppercase + string.ascii_lowercase + string.digits
|
||||
)
|
||||
tmp_filename = "tmp_" + "".join(random.choices(allowed_chars, k=10))
|
||||
|
||||
# We create a temporary local file.
|
||||
new_remote_path = f"/sdcard/{tmp_filename}"
|
||||
|
||||
# We copy the file from the data folder to /sdcard/.
|
||||
cp_output = self._adb_command_as_root(f"cp {remote_path} {new_remote_path}")
|
||||
if (
|
||||
cp_output.startswith("cp: ")
|
||||
and "No such file or directory" in cp_output
|
||||
):
|
||||
raise Exception(f"Unable to process file {remote_path}: File not found")
|
||||
if cp_output.startswith("cp: ") and "Permission denied" in cp_output:
|
||||
raise Exception(
|
||||
f"Unable to process file {remote_path}: Permission denied"
|
||||
)
|
||||
|
||||
# We download from /sdcard/ to the local temporary file.
|
||||
# If it doesn't work now, don't try again (retry_root=False)
|
||||
self._adb_download(
|
||||
new_remote_path, local_path, progress_callback, retry_root=False
|
||||
)
|
||||
|
||||
# Delete the copy on /sdcard/.
|
||||
self._adb_command(f"rm -rf {new_remote_path}")
|
||||
|
||||
except AdbCommandFailureException as exc:
|
||||
raise Exception(f"Unable to download file {remote_path}: {exc}") from exc
|
||||
|
||||
def _adb_process_file(self, remote_path: str, process_routine: Callable) -> None:
|
||||
"""Download a local copy of a file which is only accessible as root.
|
||||
This is a wrapper around process_routine.
|
||||
|
||||
:param remote_path: Path of the file on the device to process
|
||||
:param process_routine: Function to be called on the local copy of the
|
||||
downloaded file
|
||||
|
||||
"""
|
||||
# Connect to the device over adb.
|
||||
# Check if we have root, if not raise an Exception.
|
||||
self._adb_root_or_die()
|
||||
|
||||
# We create a temporary local file.
|
||||
tmp = tempfile.NamedTemporaryFile()
|
||||
local_path = tmp.name
|
||||
local_name = os.path.basename(tmp.name)
|
||||
new_remote_path = f"/sdcard/Download/{local_name}"
|
||||
|
||||
# We copy the file from the data folder to /sdcard/.
|
||||
cp_output = self._adb_command_as_root(f"cp {remote_path} {new_remote_path}")
|
||||
if cp_output.startswith("cp: ") and "No such file or directory" in cp_output:
|
||||
raise Exception(f"Unable to process file {remote_path}: File not found")
|
||||
if cp_output.startswith("cp: ") and "Permission denied" in cp_output:
|
||||
raise Exception(f"Unable to process file {remote_path}: Permission denied")
|
||||
|
||||
# We download from /sdcard/ to the local temporary file.
|
||||
self._adb_download(new_remote_path, local_path)
|
||||
|
||||
# Launch the provided process routine!
|
||||
process_routine(local_path)
|
||||
|
||||
# Delete the local copy.
|
||||
tmp.close()
|
||||
# Delete the copy on /sdcard/.
|
||||
self._adb_command(f"rm -f {new_remote_path}")
|
||||
|
||||
def _generate_backup(self, package_name: str) -> bytes:
|
||||
self.log.info(
|
||||
"Please check phone and accept Android backup prompt. "
|
||||
"You may need to set a backup password. \a"
|
||||
)
|
||||
|
||||
if self.module_options.get("backup_password", None):
|
||||
self.log.warning(
|
||||
"Backup password already set from command line or environment "
|
||||
"variable. You should use the same password if enabling encryption!"
|
||||
)
|
||||
|
||||
# TODO: Base64 encoding as temporary fix to avoid byte-mangling over
|
||||
# the shell transport...
|
||||
cmd = f"/system/bin/bu backup -nocompress '{package_name}' | base64"
|
||||
backup_output_b64 = self._adb_command(cmd)
|
||||
backup_output = base64.b64decode(backup_output_b64)
|
||||
header = parse_ab_header(backup_output)
|
||||
|
||||
if not header["backup"]:
|
||||
self.log.error(
|
||||
"Extracting SMS via Android backup failed. No valid backup data found."
|
||||
)
|
||||
return None
|
||||
|
||||
if header["encryption"] == "none":
|
||||
return parse_backup_file(backup_output, password=None)
|
||||
|
||||
for _ in range(0, 3):
|
||||
backup_password = prompt_or_load_android_backup_password(
|
||||
self.log, self.module_options
|
||||
)
|
||||
if not backup_password:
|
||||
# Fail as no backup password loaded for this encrypted backup
|
||||
self.log.critical("No backup password provided.")
|
||||
try:
|
||||
decrypted_backup_tar = parse_backup_file(backup_output, backup_password)
|
||||
return decrypted_backup_tar
|
||||
except InvalidBackupPassword:
|
||||
self.log.error("You provided the wrong password! Please try again...")
|
||||
|
||||
self.log.error("All attempts to decrypt backup with password failed!")
|
||||
|
||||
return None
|
||||
|
||||
def run(self) -> None:
|
||||
"""Run the main procedure."""
|
||||
raise NotImplementedError
|
||||
@@ -6,14 +6,9 @@
|
||||
import logging
|
||||
import os
|
||||
import sqlite3
|
||||
from typing import Optional
|
||||
from typing import Optional, Union
|
||||
|
||||
from mvt.common.utils import convert_chrometime_to_datetime, convert_datetime_to_iso
|
||||
from mvt.common.module_types import (
|
||||
ModuleAtomicResult,
|
||||
ModuleSerializedResult,
|
||||
ModuleResults,
|
||||
)
|
||||
|
||||
from .base import AndroidExtraction
|
||||
|
||||
@@ -30,7 +25,7 @@ class ChromeHistory(AndroidExtraction):
|
||||
results_path: Optional[str] = None,
|
||||
module_options: Optional[dict] = None,
|
||||
log: logging.Logger = logging.getLogger(__name__),
|
||||
results: ModuleResults = [],
|
||||
results: Optional[list] = None,
|
||||
) -> None:
|
||||
super().__init__(
|
||||
file_path=file_path,
|
||||
@@ -42,7 +37,7 @@ class ChromeHistory(AndroidExtraction):
|
||||
)
|
||||
self.results = []
|
||||
|
||||
def serialize(self, record: ModuleAtomicResult) -> ModuleSerializedResult:
|
||||
def serialize(self, record: dict) -> Union[dict, list]:
|
||||
return {
|
||||
"timestamp": record["isodate"],
|
||||
"module": self.__class__.__name__,
|
||||
@@ -56,10 +51,9 @@ class ChromeHistory(AndroidExtraction):
|
||||
return
|
||||
|
||||
for result in self.results:
|
||||
ioc_match = self.indicators.check_url(result["url"])
|
||||
if ioc_match:
|
||||
result["matched_indicator"] = ioc_match.ioc
|
||||
self.alertstore.critical(self.get_slug(), ioc_match.message, "", result)
|
||||
if self.indicators.check_url(result["url"]):
|
||||
self.detected.append(result)
|
||||
continue
|
||||
|
||||
def _parse_db(self, db_path: str) -> None:
|
||||
"""Parse a Chrome History database file.
|
||||
|
||||
@@ -8,7 +8,6 @@ import os
|
||||
from typing import Optional
|
||||
|
||||
from .base import AndroidExtraction
|
||||
from mvt.common.module_types import ModuleResults
|
||||
|
||||
|
||||
class DumpsysFull(AndroidExtraction):
|
||||
@@ -21,7 +20,7 @@ class DumpsysFull(AndroidExtraction):
|
||||
results_path: Optional[str] = None,
|
||||
module_options: Optional[dict] = None,
|
||||
log: logging.Logger = logging.getLogger(__name__),
|
||||
results: ModuleResults = [],
|
||||
results: Optional[list] = None,
|
||||
) -> None:
|
||||
super().__init__(
|
||||
file_path=file_path,
|
||||
|
||||
@@ -9,7 +9,6 @@ import stat
|
||||
from typing import Optional, Union
|
||||
|
||||
from mvt.common.utils import convert_unix_to_iso
|
||||
from mvt.common.module_types import ModuleResults
|
||||
|
||||
from .base import AndroidExtraction
|
||||
|
||||
@@ -33,7 +32,7 @@ class Files(AndroidExtraction):
|
||||
results_path: Optional[str] = None,
|
||||
module_options: Optional[dict] = None,
|
||||
log: logging.Logger = logging.getLogger(__name__),
|
||||
results: ModuleResults = [],
|
||||
results: Optional[list] = None,
|
||||
) -> None:
|
||||
super().__init__(
|
||||
file_path=file_path,
|
||||
|
||||
@@ -9,7 +9,6 @@ from typing import Optional
|
||||
from mvt.android.artifacts.getprop import GetProp as GetPropArtifact
|
||||
|
||||
from .base import AndroidExtraction
|
||||
from mvt.common.module_types import ModuleResults
|
||||
|
||||
|
||||
class Getprop(GetPropArtifact, AndroidExtraction):
|
||||
@@ -22,7 +21,7 @@ class Getprop(GetPropArtifact, AndroidExtraction):
|
||||
results_path: Optional[str] = None,
|
||||
module_options: Optional[dict] = None,
|
||||
log: logging.Logger = logging.getLogger(__name__),
|
||||
results: ModuleResults = [],
|
||||
results: Optional[list] = None,
|
||||
) -> None:
|
||||
super().__init__(
|
||||
file_path=file_path,
|
||||
|
||||
@@ -8,7 +8,6 @@ import os
|
||||
from typing import Optional
|
||||
|
||||
from .base import AndroidExtraction
|
||||
from mvt.common.module_types import ModuleResults
|
||||
|
||||
|
||||
class Logcat(AndroidExtraction):
|
||||
@@ -21,7 +20,7 @@ class Logcat(AndroidExtraction):
|
||||
results_path: Optional[str] = None,
|
||||
module_options: Optional[dict] = None,
|
||||
log: logging.Logger = logging.getLogger(__name__),
|
||||
results: ModuleResults = [],
|
||||
results: Optional[list] = None,
|
||||
) -> None:
|
||||
super().__init__(
|
||||
file_path=file_path,
|
||||
|
||||
@@ -4,7 +4,12 @@
|
||||
# https://license.mvt.re/1.1/
|
||||
|
||||
import logging
|
||||
from typing import Optional
|
||||
from typing import Optional, Union
|
||||
|
||||
from rich.console import Console
|
||||
from rich.progress import track
|
||||
from rich.table import Table
|
||||
from rich.text import Text
|
||||
|
||||
from mvt.android.artifacts.dumpsys_packages import DumpsysPackagesArtifact
|
||||
from mvt.android.utils import (
|
||||
@@ -14,11 +19,7 @@ from mvt.android.utils import (
|
||||
SECURITY_PACKAGES,
|
||||
SYSTEM_UPDATE_PACKAGES,
|
||||
)
|
||||
from mvt.common.module_types import (
|
||||
ModuleAtomicResult,
|
||||
ModuleResults,
|
||||
ModuleSerializedResult,
|
||||
)
|
||||
from mvt.common.virustotal import VTNoKey, VTQuotaExceeded, virustotal_lookup
|
||||
|
||||
from .base import AndroidExtraction
|
||||
|
||||
@@ -33,7 +34,7 @@ class Packages(AndroidExtraction):
|
||||
results_path: Optional[str] = None,
|
||||
module_options: Optional[dict] = None,
|
||||
log: logging.Logger = logging.getLogger(__name__),
|
||||
results: ModuleResults = [],
|
||||
results: Optional[list] = None,
|
||||
) -> None:
|
||||
super().__init__(
|
||||
file_path=file_path,
|
||||
@@ -45,7 +46,7 @@ class Packages(AndroidExtraction):
|
||||
)
|
||||
self._user_needed = False
|
||||
|
||||
def serialize(self, record: ModuleAtomicResult) -> ModuleSerializedResult:
|
||||
def serialize(self, record: dict) -> Union[dict, list]:
|
||||
records = []
|
||||
|
||||
timestamps = [
|
||||
@@ -94,71 +95,76 @@ class Packages(AndroidExtraction):
|
||||
if not self.indicators:
|
||||
continue
|
||||
|
||||
ioc_match = self.indicators.check_app_id(result.get("package_name"))
|
||||
if ioc_match:
|
||||
result["matched_indicator"] = ioc_match.ioc
|
||||
self.alertstore.critical(self.get_slug(), ioc_match.message, "", result)
|
||||
ioc = self.indicators.check_app_id(result.get("package_name"))
|
||||
if ioc:
|
||||
result["matched_indicator"] = ioc
|
||||
self.detected.append(result)
|
||||
continue
|
||||
|
||||
for package_file in result.get("files", []):
|
||||
ioc_match = self.indicators.check_file_hash(package_file["sha256"])
|
||||
if ioc_match:
|
||||
result["matched_indicator"] = ioc_match.ioc
|
||||
self.alertstore.critical(
|
||||
self.get_slug(), ioc_match.message, "", result
|
||||
ioc = self.indicators.check_file_hash(package_file["sha256"])
|
||||
if ioc:
|
||||
result["matched_indicator"] = ioc
|
||||
self.detected.append(result)
|
||||
|
||||
def check_virustotal(self, packages: list) -> None:
|
||||
hashes = []
|
||||
for package in packages:
|
||||
for file in package.get("files", []):
|
||||
if file["sha256"] not in hashes:
|
||||
hashes.append(file["sha256"])
|
||||
|
||||
total_hashes = len(hashes)
|
||||
detections = {}
|
||||
|
||||
progress_desc = f"Looking up {total_hashes} files..."
|
||||
for i in track(range(total_hashes), description=progress_desc):
|
||||
try:
|
||||
results = virustotal_lookup(hashes[i])
|
||||
except VTNoKey:
|
||||
return
|
||||
except VTQuotaExceeded as exc:
|
||||
print("Unable to continue: %s", exc)
|
||||
break
|
||||
|
||||
if not results:
|
||||
continue
|
||||
|
||||
positives = results["attributes"]["last_analysis_stats"]["malicious"]
|
||||
total = len(results["attributes"]["last_analysis_results"])
|
||||
|
||||
detections[hashes[i]] = f"{positives}/{total}"
|
||||
|
||||
table = Table(title="VirusTotal Packages Detections")
|
||||
table.add_column("Package name")
|
||||
table.add_column("File path")
|
||||
table.add_column("Detections")
|
||||
|
||||
for package in packages:
|
||||
for file in package.get("files", []):
|
||||
if "package_name" in package:
|
||||
row = [package["package_name"], file["path"]]
|
||||
elif "name" in package:
|
||||
row = [package["name"], file["path"]]
|
||||
else:
|
||||
self.log.error(
|
||||
f"Package {package} has no name or package_name. packages.json or apks.json is malformed"
|
||||
)
|
||||
continue
|
||||
if file["sha256"] in detections:
|
||||
detection = detections[file["sha256"]]
|
||||
positives = detection.split("/")[0]
|
||||
if int(positives) > 0:
|
||||
row.append(Text(detection, "red bold"))
|
||||
else:
|
||||
row.append(detection)
|
||||
else:
|
||||
row.append("not found")
|
||||
|
||||
# @staticmethod
|
||||
# def check_virustotal(packages: list) -> None:
|
||||
# hashes = []
|
||||
# for package in packages:
|
||||
# for file in package.get("files", []):
|
||||
# if file["sha256"] not in hashes:
|
||||
# hashes.append(file["sha256"])
|
||||
table.add_row(*row)
|
||||
|
||||
# total_hashes = len(hashes)
|
||||
# detections = {}
|
||||
|
||||
# progress_desc = f"Looking up {total_hashes} files..."
|
||||
# for i in track(range(total_hashes), description=progress_desc):
|
||||
# try:
|
||||
# results = virustotal_lookup(hashes[i])
|
||||
# except VTNoKey:
|
||||
# return
|
||||
# except VTQuotaExceeded as exc:
|
||||
# print("Unable to continue: %s", exc)
|
||||
# break
|
||||
|
||||
# if not results:
|
||||
# continue
|
||||
|
||||
# positives = results["attributes"]["last_analysis_stats"]["malicious"]
|
||||
# total = len(results["attributes"]["last_analysis_results"])
|
||||
|
||||
# detections[hashes[i]] = f"{positives}/{total}"
|
||||
|
||||
# table = Table(title="VirusTotal Packages Detections")
|
||||
# table.add_column("Package name")
|
||||
# table.add_column("File path")
|
||||
# table.add_column("Detections")
|
||||
|
||||
# for package in packages:
|
||||
# for file in package.get("files", []):
|
||||
# row = [package["package_name"], file["path"]]
|
||||
|
||||
# if file["sha256"] in detections:
|
||||
# detection = detections[file["sha256"]]
|
||||
# positives = detection.split("/")[0]
|
||||
# if int(positives) > 0:
|
||||
# row.append(Text(detection, "red bold"))
|
||||
# else:
|
||||
# row.append(detection)
|
||||
# else:
|
||||
# row.append("not found")
|
||||
|
||||
# table.add_row(*row)
|
||||
|
||||
# console = Console()
|
||||
# console.print(table)
|
||||
console = Console()
|
||||
console.print(table)
|
||||
|
||||
@staticmethod
|
||||
def parse_package_for_details(output: str) -> dict:
|
||||
|
||||
@@ -9,7 +9,6 @@ from typing import Optional
|
||||
from mvt.android.artifacts.processes import Processes as ProcessesArtifact
|
||||
|
||||
from .base import AndroidExtraction
|
||||
from mvt.common.module_types import ModuleResults
|
||||
|
||||
|
||||
class Processes(ProcessesArtifact, AndroidExtraction):
|
||||
@@ -22,7 +21,7 @@ class Processes(ProcessesArtifact, AndroidExtraction):
|
||||
results_path: Optional[str] = None,
|
||||
module_options: Optional[dict] = None,
|
||||
log: logging.Logger = logging.getLogger(__name__),
|
||||
results: ModuleResults = [],
|
||||
results: Optional[list] = None,
|
||||
) -> None:
|
||||
super().__init__(
|
||||
file_path=file_path,
|
||||
|
||||
@@ -7,7 +7,6 @@ import logging
|
||||
from typing import Optional
|
||||
|
||||
from .base import AndroidExtraction
|
||||
from mvt.common.module_types import ModuleResults
|
||||
|
||||
|
||||
class RootBinaries(AndroidExtraction):
|
||||
@@ -20,7 +19,7 @@ class RootBinaries(AndroidExtraction):
|
||||
results_path: Optional[str] = None,
|
||||
module_options: Optional[dict] = None,
|
||||
log: logging.Logger = logging.getLogger(__name__),
|
||||
results: ModuleResults = [],
|
||||
results: Optional[list] = None,
|
||||
) -> None:
|
||||
super().__init__(
|
||||
file_path=file_path,
|
||||
|
||||
@@ -7,7 +7,6 @@ import logging
|
||||
from typing import Optional
|
||||
|
||||
from .base import AndroidExtraction
|
||||
from mvt.common.module_types import ModuleResults
|
||||
|
||||
|
||||
class SELinuxStatus(AndroidExtraction):
|
||||
@@ -22,7 +21,7 @@ class SELinuxStatus(AndroidExtraction):
|
||||
results_path: Optional[str] = None,
|
||||
module_options: Optional[dict] = None,
|
||||
log: logging.Logger = logging.getLogger(__name__),
|
||||
results: ModuleResults = [],
|
||||
results: Optional[list] = None,
|
||||
) -> None:
|
||||
super().__init__(
|
||||
file_path=file_path,
|
||||
|
||||
@@ -7,7 +7,6 @@ import logging
|
||||
from typing import Optional
|
||||
|
||||
from mvt.android.artifacts.settings import Settings as SettingsArtifact
|
||||
from mvt.common.module_types import ModuleResults
|
||||
|
||||
from .base import AndroidExtraction
|
||||
|
||||
@@ -22,7 +21,7 @@ class Settings(SettingsArtifact, AndroidExtraction):
|
||||
results_path: Optional[str] = None,
|
||||
module_options: Optional[dict] = None,
|
||||
log: logging.Logger = logging.getLogger(__name__),
|
||||
results: ModuleResults = [],
|
||||
results: Optional[list] = None,
|
||||
) -> None:
|
||||
super().__init__(
|
||||
file_path=file_path,
|
||||
|
||||
@@ -6,16 +6,11 @@
|
||||
import logging
|
||||
import os
|
||||
import sqlite3
|
||||
from typing import Optional
|
||||
from typing import Optional, Union
|
||||
|
||||
from mvt.android.parsers.backup import AndroidBackupParsingError, parse_tar_for_sms
|
||||
from mvt.common.module import InsufficientPrivileges
|
||||
from mvt.common.utils import check_for_links, convert_unix_to_iso
|
||||
from mvt.common.module_types import (
|
||||
ModuleAtomicResult,
|
||||
ModuleResults,
|
||||
ModuleSerializedResult,
|
||||
)
|
||||
|
||||
from .base import AndroidExtraction
|
||||
|
||||
@@ -56,7 +51,7 @@ class SMS(AndroidExtraction):
|
||||
results_path: Optional[str] = None,
|
||||
module_options: Optional[dict] = None,
|
||||
log: logging.Logger = logging.getLogger(__name__),
|
||||
results: ModuleResults = [],
|
||||
results: Optional[list] = None,
|
||||
) -> None:
|
||||
super().__init__(
|
||||
file_path=file_path,
|
||||
@@ -69,7 +64,7 @@ class SMS(AndroidExtraction):
|
||||
|
||||
self.sms_db_type = 0
|
||||
|
||||
def serialize(self, record: ModuleAtomicResult) -> ModuleSerializedResult:
|
||||
def serialize(self, record: dict) -> Union[dict, list]:
|
||||
body = record["body"].replace("\n", "\\n")
|
||||
return {
|
||||
"timestamp": record["isodate"],
|
||||
@@ -90,12 +85,9 @@ class SMS(AndroidExtraction):
|
||||
if message_links == []:
|
||||
message_links = check_for_links(message["body"])
|
||||
|
||||
ioc_match = self.indicators.check_urls(message_links)
|
||||
if ioc_match:
|
||||
message["matched_indicator"] = ioc_match.ioc
|
||||
self.alertstore.critical(
|
||||
self.get_slug(), ioc_match.message, "", message
|
||||
)
|
||||
if self.indicators.check_urls(message_links):
|
||||
self.detected.append(message)
|
||||
continue
|
||||
|
||||
def _parse_db(self, db_path: str) -> None:
|
||||
"""Parse an Android bugle_db SMS database file.
|
||||
|
||||
@@ -7,16 +7,11 @@ import base64
|
||||
import logging
|
||||
import os
|
||||
import sqlite3
|
||||
from typing import Optional
|
||||
from typing import Optional, Union
|
||||
|
||||
from mvt.common.utils import check_for_links, convert_unix_to_iso
|
||||
|
||||
from .base import AndroidExtraction
|
||||
from mvt.common.module_types import (
|
||||
ModuleAtomicResult,
|
||||
ModuleSerializedResult,
|
||||
ModuleResults,
|
||||
)
|
||||
|
||||
WHATSAPP_PATH = "data/data/com.whatsapp/databases/msgstore.db"
|
||||
|
||||
@@ -31,7 +26,7 @@ class Whatsapp(AndroidExtraction):
|
||||
results_path: Optional[str] = None,
|
||||
module_options: Optional[dict] = None,
|
||||
log: logging.Logger = logging.getLogger(__name__),
|
||||
results: ModuleResults = [],
|
||||
results: Optional[list] = None,
|
||||
) -> None:
|
||||
super().__init__(
|
||||
file_path=file_path,
|
||||
@@ -42,7 +37,7 @@ class Whatsapp(AndroidExtraction):
|
||||
results=results,
|
||||
)
|
||||
|
||||
def serialize(self, record: ModuleAtomicResult) -> ModuleSerializedResult:
|
||||
def serialize(self, record: dict) -> Union[dict, list]:
|
||||
text = record["data"].replace("\n", "\\n")
|
||||
return {
|
||||
"timestamp": record["isodate"],
|
||||
|
||||
@@ -3,11 +3,13 @@
|
||||
# Use of this software is governed by the MVT License 1.1 that can be found at
|
||||
# https://license.mvt.re/1.1/
|
||||
|
||||
from .aqf_files import AQFFiles
|
||||
from .aqf_getprop import AQFGetProp
|
||||
from .aqf_packages import AQFPackages
|
||||
from .aqf_processes import AQFProcesses
|
||||
from .aqf_settings import AQFSettings
|
||||
from .aqf_files import AQFFiles
|
||||
from .mounts import Mounts
|
||||
from .root_binaries import RootBinaries
|
||||
from .sms import SMS
|
||||
|
||||
ANDROIDQF_MODULES = [
|
||||
@@ -17,4 +19,6 @@ ANDROIDQF_MODULES = [
|
||||
AQFSettings,
|
||||
AQFFiles,
|
||||
SMS,
|
||||
RootBinaries,
|
||||
Mounts,
|
||||
]
|
||||
|
||||
@@ -10,15 +10,10 @@ import logging
|
||||
try:
|
||||
import zoneinfo
|
||||
except ImportError:
|
||||
from backports import zoneinfo # type: ignore
|
||||
from typing import Optional
|
||||
from backports import zoneinfo
|
||||
from typing import Optional, Union
|
||||
|
||||
from mvt.android.modules.androidqf.base import AndroidQFModule
|
||||
from mvt.common.module_types import (
|
||||
ModuleResults,
|
||||
ModuleAtomicResult,
|
||||
ModuleSerializedResult,
|
||||
)
|
||||
from mvt.common.utils import convert_datetime_to_iso
|
||||
|
||||
SUSPICIOUS_PATHS = [
|
||||
@@ -41,7 +36,7 @@ class AQFFiles(AndroidQFModule):
|
||||
results_path: Optional[str] = None,
|
||||
module_options: Optional[dict] = None,
|
||||
log: logging.Logger = logging.getLogger(__name__),
|
||||
results: ModuleResults = [],
|
||||
results: Optional[list] = None,
|
||||
) -> None:
|
||||
super().__init__(
|
||||
file_path=file_path,
|
||||
@@ -52,7 +47,7 @@ class AQFFiles(AndroidQFModule):
|
||||
results=results,
|
||||
)
|
||||
|
||||
def serialize(self, record: ModuleAtomicResult) -> ModuleSerializedResult:
|
||||
def serialize(self, record: dict) -> Union[dict, list]:
|
||||
records = []
|
||||
|
||||
for ts in set(
|
||||
@@ -87,11 +82,10 @@ class AQFFiles(AndroidQFModule):
|
||||
return
|
||||
|
||||
for result in self.results:
|
||||
ioc_match = self.indicators.check_file_path(result["path"])
|
||||
if ioc_match:
|
||||
result["matched_indicator"] = ioc_match.ioc
|
||||
self.alertstore.critical(self.get_slug(), ioc_match.message, "", result)
|
||||
self.alertstore.log_latest()
|
||||
ioc = self.indicators.check_file_path(result["path"])
|
||||
if ioc:
|
||||
result["matched_indicator"] = ioc
|
||||
self.detected.append(result)
|
||||
continue
|
||||
|
||||
# NOTE: Update with final path used for Android collector.
|
||||
@@ -104,17 +98,20 @@ class AQFFiles(AndroidQFModule):
|
||||
if self.file_is_executable(result["mode"]):
|
||||
file_type = "executable "
|
||||
|
||||
msg = f'Found {file_type}file at suspicious path "{result["path"]}"'
|
||||
self.alertstore.high(self.get_slug(), msg, "", result)
|
||||
self.alertstore.log_latest()
|
||||
self.log.warning(
|
||||
'Found %sfile at suspicious path "%s".',
|
||||
file_type,
|
||||
result["path"],
|
||||
)
|
||||
self.detected.append(result)
|
||||
|
||||
if result.get("sha256", "") == "":
|
||||
continue
|
||||
|
||||
ioc_match = self.indicators.check_file_hash(result["sha256"])
|
||||
if ioc_match:
|
||||
result["matched_indicator"] = ioc_match.ioc
|
||||
self.alertstore.critical(self.get_slug(), ioc_match.message, "", result)
|
||||
ioc = self.indicators.check_file_hash(result["sha256"])
|
||||
if ioc:
|
||||
result["matched_indicator"] = ioc
|
||||
self.detected.append(result)
|
||||
|
||||
# TODO: adds SHA1 and MD5 when available in MVT
|
||||
|
||||
|
||||
@@ -9,7 +9,6 @@ from typing import Optional
|
||||
from mvt.android.artifacts.getprop import GetProp as GetPropArtifact
|
||||
|
||||
from .base import AndroidQFModule
|
||||
from mvt.common.module_types import ModuleResults
|
||||
|
||||
|
||||
class AQFGetProp(GetPropArtifact, AndroidQFModule):
|
||||
@@ -22,7 +21,7 @@ class AQFGetProp(GetPropArtifact, AndroidQFModule):
|
||||
results_path: Optional[str] = None,
|
||||
module_options: Optional[dict] = None,
|
||||
log: logging.Logger = logging.getLogger(__name__),
|
||||
results: ModuleResults = [],
|
||||
results: Optional[list] = None,
|
||||
) -> None:
|
||||
super().__init__(
|
||||
file_path=file_path,
|
||||
|
||||
@@ -9,7 +9,6 @@ import logging
|
||||
from typing import Optional
|
||||
|
||||
from mvt.common.utils import convert_datetime_to_iso
|
||||
from mvt.common.module_types import ModuleResults
|
||||
from .base import AndroidQFModule
|
||||
from mvt.android.artifacts.file_timestamps import FileTimestampsArtifact
|
||||
|
||||
@@ -26,7 +25,7 @@ class AQFLogTimestamps(FileTimestampsArtifact, AndroidQFModule):
|
||||
results_path: Optional[str] = None,
|
||||
module_options: Optional[dict] = None,
|
||||
log: logging.Logger = logging.getLogger(__name__),
|
||||
results: ModuleResults = [],
|
||||
results: Optional[list] = None,
|
||||
) -> None:
|
||||
super().__init__(
|
||||
file_path=file_path,
|
||||
|
||||
@@ -17,7 +17,6 @@ from mvt.android.utils import (
|
||||
)
|
||||
|
||||
from .base import AndroidQFModule
|
||||
from mvt.common.module_types import ModuleResults
|
||||
|
||||
|
||||
class AQFPackages(AndroidQFModule):
|
||||
@@ -30,7 +29,7 @@ class AQFPackages(AndroidQFModule):
|
||||
results_path: Optional[str] = None,
|
||||
module_options: Optional[dict] = None,
|
||||
log: logging.Logger = logging.getLogger(__name__),
|
||||
results: ModuleResults = [],
|
||||
results: Optional[list] = None,
|
||||
) -> None:
|
||||
super().__init__(
|
||||
file_path=file_path,
|
||||
@@ -44,98 +43,79 @@ class AQFPackages(AndroidQFModule):
|
||||
def check_indicators(self) -> None:
|
||||
for result in self.results:
|
||||
if result["name"] in ROOT_PACKAGES:
|
||||
self.alertstore.medium(
|
||||
self.get_slug(),
|
||||
f'Found an installed package related to rooting/jailbreaking: "{result["name"]}"',
|
||||
"",
|
||||
result,
|
||||
self.log.warning(
|
||||
'Found an installed package related to rooting/jailbreaking: "%s"',
|
||||
result["name"],
|
||||
)
|
||||
self.alertstore.log_latest()
|
||||
self.detected.append(result)
|
||||
continue
|
||||
|
||||
# Detections for apps installed via unusual methods.
|
||||
# Detections for apps installed via unusual methods
|
||||
if result["installer"] in THIRD_PARTY_STORE_INSTALLERS:
|
||||
self.alertstore.info(
|
||||
self.get_slug(),
|
||||
f'Found a package installed via a third party store (installer="{result["installer"]}"): "{result["name"]}"',
|
||||
"",
|
||||
result,
|
||||
self.log.warning(
|
||||
'Found a package installed via a third party store (installer="%s"): "%s"',
|
||||
result["installer"],
|
||||
result["name"],
|
||||
)
|
||||
self.alertstore.log_latest()
|
||||
elif result["installer"] in BROWSER_INSTALLERS:
|
||||
self.alertstore.medium(
|
||||
self.get_slug(),
|
||||
f'Found a package installed via a browser (installer="{result["installer"]}"): "{result["name"]}"',
|
||||
"",
|
||||
result,
|
||||
self.log.warning(
|
||||
'Found a package installed via a browser (installer="%s"): "%s"',
|
||||
result["installer"],
|
||||
result["name"],
|
||||
)
|
||||
self.alertstore.log_latest()
|
||||
self.detected.append(result)
|
||||
elif result["installer"] == "null" and result["system"] is False:
|
||||
self.alertstore.high(
|
||||
self.get_slug(),
|
||||
f'Found a non-system package installed via adb or another method: "{result["name"]}"',
|
||||
"",
|
||||
result,
|
||||
self.log.warning(
|
||||
'Found a non-system package installed via adb or another method: "%s"',
|
||||
result["name"],
|
||||
)
|
||||
self.alertstore.log_latest()
|
||||
self.detected.append(result)
|
||||
elif result["installer"] in PLAY_STORE_INSTALLERS:
|
||||
pass
|
||||
|
||||
# Check for disabled security or software update packages.
|
||||
# Check for disabled security or software update packages
|
||||
package_disabled = result.get("disabled", None)
|
||||
if result["name"] in SECURITY_PACKAGES and package_disabled:
|
||||
self.alertstore.high(
|
||||
self.get_slug(),
|
||||
f'Security package "{result["name"]}" disabled on the phone',
|
||||
"",
|
||||
result,
|
||||
self.log.warning(
|
||||
'Security package "%s" disabled on the phone', result["name"]
|
||||
)
|
||||
self.alertstore.log_latest()
|
||||
|
||||
if result["name"] in SYSTEM_UPDATE_PACKAGES and package_disabled:
|
||||
self.alertstore.high(
|
||||
self.get_slug(),
|
||||
f'System OTA update package "{result["name"]}" disabled on the phone',
|
||||
"",
|
||||
result,
|
||||
self.log.warning(
|
||||
'System OTA update package "%s" disabled on the phone',
|
||||
result["name"],
|
||||
)
|
||||
self.alertstore.log_latest()
|
||||
|
||||
if not self.indicators:
|
||||
continue
|
||||
|
||||
ioc_match = self.indicators.check_app_id(result.get("name"))
|
||||
if ioc_match:
|
||||
result["matched_indicator"] = ioc_match.ioc
|
||||
self.alertstore.critical(self.get_slug(), ioc_match.message, "", result)
|
||||
self.alertstore.log_latest()
|
||||
ioc = self.indicators.check_app_id(result.get("name"))
|
||||
if ioc:
|
||||
result["matched_indicator"] = ioc
|
||||
self.detected.append(result)
|
||||
|
||||
for package_file in result.get("files", []):
|
||||
ioc_match = self.indicators.check_file_hash(package_file["sha256"])
|
||||
if ioc_match:
|
||||
result["matched_indicator"] = ioc_match.ioc
|
||||
self.alertstore.critical(
|
||||
self.get_slug(), ioc_match.message, "", result
|
||||
)
|
||||
self.alertstore.log_latest()
|
||||
ioc = self.indicators.check_file_hash(package_file["sha256"])
|
||||
if ioc:
|
||||
result["matched_indicator"] = ioc
|
||||
self.detected.append(result)
|
||||
|
||||
if "certificate" not in package_file:
|
||||
continue
|
||||
|
||||
# The keys generated by AndroidQF have a leading uppercase character.
|
||||
# The keys generated by AndroidQF have a leading uppercase character
|
||||
for hash_type in ["Md5", "Sha1", "Sha256"]:
|
||||
certificate_hash = package_file["certificate"][hash_type]
|
||||
ioc_match = self.indicators.check_app_certificate_hash(
|
||||
certificate_hash
|
||||
)
|
||||
if ioc_match:
|
||||
result["matched_indicator"] = ioc_match.ioc
|
||||
self.alertstore.critical(
|
||||
self.get_slug(), ioc_match.message, "", result
|
||||
)
|
||||
self.alertstore.log_latest()
|
||||
ioc = self.indicators.check_app_certificate_hash(certificate_hash)
|
||||
if ioc:
|
||||
result["matched_indicator"] = ioc
|
||||
self.detected.append(result)
|
||||
break
|
||||
|
||||
# Deduplicate the detected packages
|
||||
dedupe_detected_dict = {str(item): item for item in self.detected}
|
||||
self.detected = list(dedupe_detected_dict.values())
|
||||
|
||||
def run(self) -> None:
|
||||
packages = self._get_files_by_pattern("*/packages.json")
|
||||
if not packages:
|
||||
|
||||
@@ -9,7 +9,6 @@ from typing import Optional
|
||||
from mvt.android.artifacts.processes import Processes as ProcessesArtifact
|
||||
|
||||
from .base import AndroidQFModule
|
||||
from mvt.common.module_types import ModuleResults
|
||||
|
||||
|
||||
class AQFProcesses(ProcessesArtifact, AndroidQFModule):
|
||||
@@ -22,7 +21,7 @@ class AQFProcesses(ProcessesArtifact, AndroidQFModule):
|
||||
results_path: Optional[str] = None,
|
||||
module_options: Optional[dict] = None,
|
||||
log: logging.Logger = logging.getLogger(__name__),
|
||||
results: ModuleResults = [],
|
||||
results: Optional[list] = None,
|
||||
) -> None:
|
||||
super().__init__(
|
||||
file_path=file_path,
|
||||
|
||||
@@ -9,7 +9,6 @@ from typing import Optional
|
||||
from mvt.android.artifacts.settings import Settings as SettingsArtifact
|
||||
|
||||
from .base import AndroidQFModule
|
||||
from mvt.common.module_types import ModuleResults
|
||||
|
||||
|
||||
class AQFSettings(SettingsArtifact, AndroidQFModule):
|
||||
@@ -22,7 +21,7 @@ class AQFSettings(SettingsArtifact, AndroidQFModule):
|
||||
results_path: Optional[str] = None,
|
||||
module_options: Optional[dict] = None,
|
||||
log: logging.Logger = logging.getLogger(__name__),
|
||||
results: ModuleResults = [],
|
||||
results: Optional[list] = None,
|
||||
) -> None:
|
||||
super().__init__(
|
||||
file_path=file_path,
|
||||
|
||||
@@ -7,10 +7,9 @@ import fnmatch
|
||||
import logging
|
||||
import os
|
||||
import zipfile
|
||||
from typing import List, Optional
|
||||
from typing import Any, Dict, List, Optional, Union
|
||||
|
||||
from mvt.common.module import MVTModule
|
||||
from mvt.common.module_types import ModuleResults
|
||||
|
||||
|
||||
class AndroidQFModule(MVTModule):
|
||||
@@ -23,7 +22,7 @@ class AndroidQFModule(MVTModule):
|
||||
results_path: Optional[str] = None,
|
||||
module_options: Optional[dict] = None,
|
||||
log: logging.Logger = logging.getLogger(__name__),
|
||||
results: ModuleResults = [],
|
||||
results: Union[List[Dict[str, Any]], Dict[str, Any], None] = None,
|
||||
) -> None:
|
||||
super().__init__(
|
||||
file_path=file_path,
|
||||
|
||||
74
src/mvt/android/modules/androidqf/mounts.py
Normal file
74
src/mvt/android/modules/androidqf/mounts.py
Normal file
@@ -0,0 +1,74 @@
|
||||
# Mobile Verification Toolkit (MVT)
|
||||
# Copyright (c) 2021-2023 The MVT Authors.
|
||||
# Use of this software is governed by the MVT License 1.1 that can be found at
|
||||
# https://license.mvt.re/1.1/
|
||||
|
||||
import logging
|
||||
import json
|
||||
from typing import Optional
|
||||
|
||||
from mvt.android.artifacts.mounts import Mounts as MountsArtifact
|
||||
|
||||
from .base import AndroidQFModule
|
||||
|
||||
|
||||
class Mounts(MountsArtifact, AndroidQFModule):
|
||||
"""This module extracts and analyzes mount information from AndroidQF acquisitions."""
|
||||
|
||||
def __init__(
|
||||
self,
|
||||
file_path: Optional[str] = None,
|
||||
target_path: Optional[str] = None,
|
||||
results_path: Optional[str] = None,
|
||||
module_options: Optional[dict] = None,
|
||||
log: logging.Logger = logging.getLogger(__name__),
|
||||
results: Optional[list] = None,
|
||||
) -> None:
|
||||
super().__init__(
|
||||
file_path=file_path,
|
||||
target_path=target_path,
|
||||
results_path=results_path,
|
||||
module_options=module_options,
|
||||
log=log,
|
||||
results=results,
|
||||
)
|
||||
self.results = []
|
||||
|
||||
def run(self) -> None:
|
||||
"""
|
||||
Run the mounts analysis module.
|
||||
|
||||
This module looks for mount information files collected by androidqf
|
||||
and analyzes them for suspicious configurations, particularly focusing
|
||||
on detecting root access indicators like /system mounted as read-write.
|
||||
"""
|
||||
mount_files = self._get_files_by_pattern("*/mounts.json")
|
||||
|
||||
if not mount_files:
|
||||
self.log.info("No mount information file found")
|
||||
return
|
||||
|
||||
self.log.info("Found mount information file: %s", mount_files[0])
|
||||
|
||||
try:
|
||||
data = self._get_file_content(mount_files[0]).decode(
|
||||
"utf-8", errors="replace"
|
||||
)
|
||||
except Exception as exc:
|
||||
self.log.error("Failed to read mount information file: %s", exc)
|
||||
return
|
||||
|
||||
# Parse the mount data
|
||||
try:
|
||||
json_data = json.loads(data)
|
||||
|
||||
if isinstance(json_data, list):
|
||||
# AndroidQF format: array of strings like
|
||||
# "/dev/block/dm-12 on / type ext4 (ro,seclabel,noatime)"
|
||||
mount_content = "\n".join(json_data)
|
||||
self.parse(mount_content)
|
||||
except Exception as exc:
|
||||
self.log.error("Failed to parse mount information: %s", exc)
|
||||
return
|
||||
|
||||
self.log.info("Extracted a total of %d mount entries", len(self.results))
|
||||
121
src/mvt/android/modules/androidqf/root_binaries.py
Normal file
121
src/mvt/android/modules/androidqf/root_binaries.py
Normal file
@@ -0,0 +1,121 @@
|
||||
# Mobile Verification Toolkit (MVT)
|
||||
# Copyright (c) 2021-2023 The MVT Authors.
|
||||
# Use of this software is governed by the MVT License 1.1 that can be found at
|
||||
# https://license.mvt.re/1.1/
|
||||
|
||||
import json
|
||||
import logging
|
||||
from typing import Optional
|
||||
|
||||
from .base import AndroidQFModule
|
||||
|
||||
|
||||
class RootBinaries(AndroidQFModule):
|
||||
"""This module analyzes root_binaries.json for root binaries found by androidqf."""
|
||||
|
||||
def __init__(
|
||||
self,
|
||||
file_path: Optional[str] = None,
|
||||
target_path: Optional[str] = None,
|
||||
results_path: Optional[str] = None,
|
||||
module_options: Optional[dict] = None,
|
||||
log: logging.Logger = logging.getLogger(__name__),
|
||||
results: Optional[list] = None,
|
||||
) -> None:
|
||||
super().__init__(
|
||||
file_path=file_path,
|
||||
target_path=target_path,
|
||||
results_path=results_path,
|
||||
module_options=module_options,
|
||||
log=log,
|
||||
results=results,
|
||||
)
|
||||
|
||||
def serialize(self, record: dict) -> dict:
|
||||
return {
|
||||
"timestamp": record.get("timestamp"),
|
||||
"module": self.__class__.__name__,
|
||||
"event": "root_binary_found",
|
||||
"data": f"Root binary found: {record['path']} (binary: {record['binary_name']})",
|
||||
}
|
||||
|
||||
def check_indicators(self) -> None:
|
||||
"""Check for indicators of device rooting."""
|
||||
if not self.results:
|
||||
return
|
||||
|
||||
# All found root binaries are considered indicators of rooting
|
||||
for result in self.results:
|
||||
self.log.warning(
|
||||
'Found root binary "%s" at path "%s"',
|
||||
result["binary_name"],
|
||||
result["path"],
|
||||
)
|
||||
self.detected.append(result)
|
||||
|
||||
if self.detected:
|
||||
self.log.warning(
|
||||
"Device shows signs of rooting with %d root binaries found",
|
||||
len(self.detected),
|
||||
)
|
||||
|
||||
def run(self) -> None:
|
||||
"""Run the root binaries analysis."""
|
||||
root_binaries_files = self._get_files_by_pattern("*/root_binaries.json")
|
||||
|
||||
if not root_binaries_files:
|
||||
self.log.info("No root_binaries.json file found")
|
||||
return
|
||||
|
||||
rawdata = self._get_file_content(root_binaries_files[0]).decode(
|
||||
"utf-8", errors="ignore"
|
||||
)
|
||||
|
||||
try:
|
||||
root_binary_paths = json.loads(rawdata)
|
||||
except json.JSONDecodeError as e:
|
||||
self.log.error("Failed to parse root_binaries.json: %s", e)
|
||||
return
|
||||
|
||||
if not isinstance(root_binary_paths, list):
|
||||
self.log.error("Expected root_binaries.json to contain a list of paths")
|
||||
return
|
||||
|
||||
# Known root binary names that might be found and their descriptions
|
||||
# This maps the binary name to a human-readable description
|
||||
known_root_binaries = {
|
||||
"su": "SuperUser binary",
|
||||
"busybox": "BusyBox utilities",
|
||||
"supersu": "SuperSU root management",
|
||||
"Superuser.apk": "Superuser app",
|
||||
"KingoUser.apk": "KingRoot app",
|
||||
"SuperSu.apk": "SuperSU app",
|
||||
"magisk": "Magisk root framework",
|
||||
"magiskhide": "Magisk hide utility",
|
||||
"magiskinit": "Magisk init binary",
|
||||
"magiskpolicy": "Magisk policy binary",
|
||||
}
|
||||
|
||||
for path in root_binary_paths:
|
||||
if not path or not isinstance(path, str):
|
||||
continue
|
||||
|
||||
# Extract binary name from path
|
||||
binary_name = path.split("/")[-1].lower()
|
||||
|
||||
# Check if this matches a known root binary by exact name match
|
||||
description = "Unknown root binary"
|
||||
for known_binary in known_root_binaries:
|
||||
if binary_name == known_binary.lower():
|
||||
description = known_root_binaries[known_binary]
|
||||
break
|
||||
|
||||
result = {
|
||||
"path": path.strip(),
|
||||
"binary_name": binary_name,
|
||||
"description": description,
|
||||
}
|
||||
|
||||
self.results.append(result)
|
||||
|
||||
self.log.info("Found %d root binaries", len(self.results))
|
||||
@@ -53,12 +53,8 @@ class SMS(AndroidQFModule):
|
||||
if "body" not in message:
|
||||
continue
|
||||
|
||||
ioc_match = self.indicators.check_domains(message.get("links", []))
|
||||
if ioc_match:
|
||||
message["matched_indicator"] = ioc_match.ioc
|
||||
self.alertstore.critical(
|
||||
self.get_slug(), ioc_match.message, "", message
|
||||
)
|
||||
if self.indicators.check_domains(message.get("links", [])):
|
||||
self.detected.append(message)
|
||||
|
||||
def parse_backup(self, data):
|
||||
header = parse_ab_header(data)
|
||||
|
||||
@@ -9,10 +9,10 @@ import os
|
||||
from tarfile import TarFile
|
||||
from typing import List, Optional
|
||||
|
||||
from mvt.common.module import MVTModule, ModuleResults
|
||||
from mvt.common.module import MVTModule
|
||||
|
||||
|
||||
class BackupModule(MVTModule):
|
||||
class BackupExtraction(MVTModule):
|
||||
"""This class provides a base for all backup extractios modules"""
|
||||
|
||||
def __init__(
|
||||
@@ -22,7 +22,7 @@ class BackupModule(MVTModule):
|
||||
results_path: Optional[str] = None,
|
||||
module_options: Optional[dict] = None,
|
||||
log: logging.Logger = logging.getLogger(__name__),
|
||||
results: ModuleResults = [],
|
||||
results: Optional[list] = None,
|
||||
) -> None:
|
||||
super().__init__(
|
||||
file_path=file_path,
|
||||
|
||||
@@ -6,13 +6,12 @@
|
||||
import logging
|
||||
from typing import Optional
|
||||
|
||||
from mvt.android.modules.backup.base import BackupModule
|
||||
from mvt.android.modules.backup.base import BackupExtraction
|
||||
from mvt.android.parsers.backup import parse_sms_file
|
||||
from mvt.common.utils import check_for_links
|
||||
from mvt.common.module_types import ModuleResults
|
||||
|
||||
|
||||
class SMS(BackupModule):
|
||||
class SMS(BackupExtraction):
|
||||
def __init__(
|
||||
self,
|
||||
file_path: Optional[str] = None,
|
||||
@@ -20,7 +19,7 @@ class SMS(BackupModule):
|
||||
results_path: Optional[str] = None,
|
||||
module_options: Optional[dict] = None,
|
||||
log: logging.Logger = logging.getLogger(__name__),
|
||||
results: ModuleResults = [],
|
||||
results: Optional[list] = None,
|
||||
) -> None:
|
||||
super().__init__(
|
||||
file_path=file_path,
|
||||
@@ -44,12 +43,8 @@ class SMS(BackupModule):
|
||||
if message_links == []:
|
||||
message_links = check_for_links(message.get("text", ""))
|
||||
|
||||
ioc_match = self.indicators.check_urls(message_links)
|
||||
if ioc_match:
|
||||
message["matched_indicator"] = ioc_match.ioc
|
||||
self.alertstore.critical(
|
||||
self.get_slug(), ioc_match.message, "", message
|
||||
)
|
||||
if self.indicators.check_urls(message_links):
|
||||
self.detected.append(message)
|
||||
continue
|
||||
|
||||
def run(self) -> None:
|
||||
|
||||
@@ -10,7 +10,7 @@ import os
|
||||
from typing import List, Optional
|
||||
from zipfile import ZipFile
|
||||
|
||||
from mvt.common.module import MVTModule, ModuleResults
|
||||
from mvt.common.module import MVTModule
|
||||
|
||||
|
||||
class BugReportModule(MVTModule):
|
||||
@@ -23,7 +23,7 @@ class BugReportModule(MVTModule):
|
||||
results_path: Optional[str] = None,
|
||||
module_options: Optional[dict] = None,
|
||||
log: logging.Logger = logging.getLogger(__name__),
|
||||
results: ModuleResults = [],
|
||||
results: Optional[list] = None,
|
||||
) -> None:
|
||||
super().__init__(
|
||||
file_path=file_path,
|
||||
|
||||
@@ -7,7 +7,6 @@ import logging
|
||||
from typing import Optional
|
||||
|
||||
from mvt.android.artifacts.dumpsys_accessibility import DumpsysAccessibilityArtifact
|
||||
from mvt.common.module_types import ModuleResults
|
||||
|
||||
from .base import BugReportModule
|
||||
|
||||
@@ -22,7 +21,7 @@ class DumpsysAccessibility(DumpsysAccessibilityArtifact, BugReportModule):
|
||||
results_path: Optional[str] = None,
|
||||
module_options: Optional[dict] = None,
|
||||
log: logging.Logger = logging.getLogger(__name__),
|
||||
results: ModuleResults = [],
|
||||
results: Optional[list] = None,
|
||||
) -> None:
|
||||
super().__init__(
|
||||
file_path=file_path,
|
||||
|
||||
@@ -9,7 +9,6 @@ from typing import Optional
|
||||
from mvt.android.artifacts.dumpsys_package_activities import (
|
||||
DumpsysPackageActivitiesArtifact,
|
||||
)
|
||||
from mvt.common.module_types import ModuleResults
|
||||
|
||||
from .base import BugReportModule
|
||||
|
||||
@@ -24,7 +23,7 @@ class DumpsysActivities(DumpsysPackageActivitiesArtifact, BugReportModule):
|
||||
results_path: Optional[str] = None,
|
||||
module_options: Optional[dict] = None,
|
||||
log: logging.Logger = logging.getLogger(__name__),
|
||||
results: ModuleResults = [],
|
||||
results: Optional[list] = None,
|
||||
) -> None:
|
||||
super().__init__(
|
||||
file_path=file_path,
|
||||
|
||||
@@ -7,7 +7,6 @@ import logging
|
||||
from typing import Optional
|
||||
|
||||
from mvt.android.artifacts.dumpsys_adb import DumpsysADBArtifact
|
||||
from mvt.common.module_types import ModuleResults
|
||||
|
||||
from .base import BugReportModule
|
||||
|
||||
@@ -22,7 +21,7 @@ class DumpsysADBState(DumpsysADBArtifact, BugReportModule):
|
||||
results_path: Optional[str] = None,
|
||||
module_options: Optional[dict] = None,
|
||||
log: logging.Logger = logging.getLogger(__name__),
|
||||
results: ModuleResults = [],
|
||||
results: Optional[list] = None,
|
||||
) -> None:
|
||||
super().__init__(
|
||||
file_path=file_path,
|
||||
|
||||
@@ -7,7 +7,6 @@ import logging
|
||||
from typing import Optional
|
||||
|
||||
from mvt.android.artifacts.dumpsys_appops import DumpsysAppopsArtifact
|
||||
from mvt.common.module_types import ModuleResults
|
||||
|
||||
from .base import BugReportModule
|
||||
|
||||
@@ -22,7 +21,7 @@ class DumpsysAppops(DumpsysAppopsArtifact, BugReportModule):
|
||||
results_path: Optional[str] = None,
|
||||
module_options: Optional[dict] = None,
|
||||
log: logging.Logger = logging.getLogger(__name__),
|
||||
results: ModuleResults = [],
|
||||
results: Optional[list] = None,
|
||||
) -> None:
|
||||
super().__init__(
|
||||
file_path=file_path,
|
||||
|
||||
@@ -7,7 +7,6 @@ import logging
|
||||
from typing import Optional
|
||||
|
||||
from mvt.android.artifacts.dumpsys_battery_daily import DumpsysBatteryDailyArtifact
|
||||
from mvt.common.module_types import ModuleResults
|
||||
|
||||
from .base import BugReportModule
|
||||
|
||||
@@ -22,7 +21,7 @@ class DumpsysBatteryDaily(DumpsysBatteryDailyArtifact, BugReportModule):
|
||||
results_path: Optional[str] = None,
|
||||
module_options: Optional[dict] = None,
|
||||
log: logging.Logger = logging.getLogger(__name__),
|
||||
results: ModuleResults = [],
|
||||
results: Optional[list] = None,
|
||||
) -> None:
|
||||
super().__init__(
|
||||
file_path=file_path,
|
||||
|
||||
@@ -7,7 +7,6 @@ import logging
|
||||
from typing import Optional
|
||||
|
||||
from mvt.android.artifacts.dumpsys_battery_history import DumpsysBatteryHistoryArtifact
|
||||
from mvt.common.module_types import ModuleResults
|
||||
|
||||
from .base import BugReportModule
|
||||
|
||||
@@ -22,7 +21,7 @@ class DumpsysBatteryHistory(DumpsysBatteryHistoryArtifact, BugReportModule):
|
||||
results_path: Optional[str] = None,
|
||||
module_options: Optional[dict] = None,
|
||||
log: logging.Logger = logging.getLogger(__name__),
|
||||
results: ModuleResults = [],
|
||||
results: Optional[list] = None,
|
||||
) -> None:
|
||||
super().__init__(
|
||||
file_path=file_path,
|
||||
|
||||
@@ -7,7 +7,6 @@ import logging
|
||||
from typing import Optional
|
||||
|
||||
from mvt.android.artifacts.dumpsys_dbinfo import DumpsysDBInfoArtifact
|
||||
from mvt.common.module_types import ModuleResults
|
||||
|
||||
from .base import BugReportModule
|
||||
|
||||
@@ -24,7 +23,7 @@ class DumpsysDBInfo(DumpsysDBInfoArtifact, BugReportModule):
|
||||
results_path: Optional[str] = None,
|
||||
module_options: Optional[dict] = None,
|
||||
log: logging.Logger = logging.getLogger(__name__),
|
||||
results: ModuleResults = [],
|
||||
results: Optional[list] = None,
|
||||
) -> None:
|
||||
super().__init__(
|
||||
file_path=file_path,
|
||||
|
||||
@@ -7,7 +7,6 @@ import logging
|
||||
from typing import Optional
|
||||
|
||||
from mvt.android.artifacts.getprop import GetProp as GetPropArtifact
|
||||
from mvt.common.module_types import ModuleResults
|
||||
|
||||
from .base import BugReportModule
|
||||
|
||||
@@ -22,7 +21,7 @@ class DumpsysGetProp(GetPropArtifact, BugReportModule):
|
||||
results_path: Optional[str] = None,
|
||||
module_options: Optional[dict] = None,
|
||||
log: logging.Logger = logging.getLogger(__name__),
|
||||
results: ModuleResults = [],
|
||||
results: Optional[list] = None,
|
||||
) -> None:
|
||||
super().__init__(
|
||||
file_path=file_path,
|
||||
|
||||
@@ -6,7 +6,6 @@
|
||||
import logging
|
||||
from typing import Optional
|
||||
|
||||
from mvt.common.module_types import ModuleResults
|
||||
from mvt.android.artifacts.dumpsys_packages import DumpsysPackagesArtifact
|
||||
from mvt.android.utils import DANGEROUS_PERMISSIONS, DANGEROUS_PERMISSIONS_THRESHOLD
|
||||
|
||||
@@ -23,7 +22,7 @@ class DumpsysPackages(DumpsysPackagesArtifact, BugReportModule):
|
||||
results_path: Optional[str] = None,
|
||||
module_options: Optional[dict] = None,
|
||||
log: logging.Logger = logging.getLogger(__name__),
|
||||
results: ModuleResults = [],
|
||||
results: Optional[list] = None,
|
||||
) -> None:
|
||||
super().__init__(
|
||||
file_path=file_path,
|
||||
|
||||
@@ -9,7 +9,6 @@ from typing import Optional
|
||||
from mvt.android.artifacts.dumpsys_platform_compat import DumpsysPlatformCompatArtifact
|
||||
|
||||
from mvt.android.modules.bugreport.base import BugReportModule
|
||||
from mvt.common.module_types import ModuleResults
|
||||
|
||||
|
||||
class DumpsysPlatformCompat(DumpsysPlatformCompatArtifact, BugReportModule):
|
||||
@@ -22,7 +21,7 @@ class DumpsysPlatformCompat(DumpsysPlatformCompatArtifact, BugReportModule):
|
||||
results_path: Optional[str] = None,
|
||||
module_options: Optional[dict] = None,
|
||||
log: logging.Logger = logging.getLogger(__name__),
|
||||
results: ModuleResults = [],
|
||||
results: Optional[list] = None,
|
||||
) -> None:
|
||||
super().__init__(
|
||||
file_path=file_path,
|
||||
|
||||
@@ -7,7 +7,6 @@ import logging
|
||||
from typing import Optional
|
||||
|
||||
from mvt.android.artifacts.dumpsys_receivers import DumpsysReceiversArtifact
|
||||
from mvt.common.module_types import ModuleResults
|
||||
|
||||
from .base import BugReportModule
|
||||
|
||||
@@ -22,7 +21,7 @@ class DumpsysReceivers(DumpsysReceiversArtifact, BugReportModule):
|
||||
results_path: Optional[str] = None,
|
||||
module_options: Optional[dict] = None,
|
||||
log: logging.Logger = logging.getLogger(__name__),
|
||||
results: ModuleResults = [],
|
||||
results: Optional[list] = None,
|
||||
) -> None:
|
||||
super().__init__(
|
||||
file_path=file_path,
|
||||
@@ -35,6 +34,20 @@ class DumpsysReceivers(DumpsysReceiversArtifact, BugReportModule):
|
||||
|
||||
self.results = results if results else {}
|
||||
|
||||
def check_indicators(self) -> None:
|
||||
for result in self.results:
|
||||
if self.indicators:
|
||||
receiver_name = self.results[result][0]["receiver"]
|
||||
|
||||
# return IoC if the stix2 process name a substring of the receiver name
|
||||
ioc = self.indicators.check_receiver_prefix(receiver_name)
|
||||
if ioc:
|
||||
self.results[result][0]["matched_indicator"] = ioc
|
||||
self.detected.append(result)
|
||||
continue
|
||||
|
||||
|
||||
|
||||
def run(self) -> None:
|
||||
content = self._get_dumpstate_file()
|
||||
if not content:
|
||||
|
||||
@@ -8,7 +8,6 @@ from typing import Optional
|
||||
|
||||
from mvt.common.utils import convert_datetime_to_iso
|
||||
from .base import BugReportModule
|
||||
from mvt.common.module_types import ModuleResults
|
||||
from mvt.android.artifacts.file_timestamps import FileTimestampsArtifact
|
||||
|
||||
|
||||
@@ -24,7 +23,7 @@ class BugReportTimestamps(FileTimestampsArtifact, BugReportModule):
|
||||
results_path: Optional[str] = None,
|
||||
module_options: Optional[dict] = None,
|
||||
log: logging.Logger = logging.getLogger(__name__),
|
||||
results: ModuleResults = [],
|
||||
results: Optional[list] = None,
|
||||
) -> None:
|
||||
super().__init__(
|
||||
file_path=file_path,
|
||||
|
||||
@@ -7,7 +7,6 @@ import logging
|
||||
from typing import Optional
|
||||
|
||||
from mvt.android.artifacts.tombstone_crashes import TombstoneCrashArtifact
|
||||
from mvt.common.module_types import ModuleResults
|
||||
from .base import BugReportModule
|
||||
|
||||
|
||||
@@ -23,7 +22,7 @@ class Tombstones(TombstoneCrashArtifact, BugReportModule):
|
||||
results_path: Optional[str] = None,
|
||||
module_options: Optional[dict] = None,
|
||||
log: logging.Logger = logging.getLogger(__name__),
|
||||
results: ModuleResults = [],
|
||||
results: Optional[list] = None,
|
||||
) -> None:
|
||||
super().__init__(
|
||||
file_path=file_path,
|
||||
|
||||
@@ -231,6 +231,7 @@ def parse_sms_file(data):
|
||||
entry.pop("mms_body")
|
||||
|
||||
body = entry.get("body", None)
|
||||
message_links = None
|
||||
if body:
|
||||
message_links = check_for_links(entry["body"])
|
||||
|
||||
|
||||
@@ -6,16 +6,16 @@ from datetime import datetime, timedelta
|
||||
from typing import List
|
||||
|
||||
|
||||
def warn_android_patch_level(patch_level: str, log) -> str:
|
||||
def warn_android_patch_level(patch_level: str, log) -> bool:
|
||||
"""Alert if Android patch level out-of-date"""
|
||||
patch_date = datetime.strptime(patch_level, "%Y-%m-%d")
|
||||
if (datetime.now() - patch_date) > timedelta(days=6 * 31):
|
||||
warning_message = (
|
||||
f"This phone has not received security updates "
|
||||
f"for more than six months (last update: {patch_level}).",
|
||||
log.warning(
|
||||
"This phone has not received security updates "
|
||||
"for more than six months (last update: %s)",
|
||||
patch_level,
|
||||
)
|
||||
return warning_message
|
||||
return True
|
||||
|
||||
return False
|
||||
|
||||
|
||||
@@ -1,181 +0,0 @@
|
||||
# Mobile Verification Toolkit (MVT)
|
||||
# Copyright (c) 2021-2025 The MVT Authors.
|
||||
# Use of this software is governed by the MVT License 1.1 that can be found at
|
||||
# https://license.mvt.re/1.1/
|
||||
|
||||
import csv
|
||||
import logging
|
||||
from enum import Enum
|
||||
from dataclasses import dataclass, asdict
|
||||
from typing import List, Dict, Any, Optional
|
||||
|
||||
from .log import INFO_ALERT, LOW_ALERT, HIGH_ALERT, CRITICAL_ALERT, MEDIUM_ALERT
|
||||
from .module_types import ModuleAtomicResult
|
||||
|
||||
|
||||
class AlertLevel(Enum):
|
||||
INFORMATIONAL = 0
|
||||
LOW = 10
|
||||
MEDIUM = 20
|
||||
HIGH = 30
|
||||
CRITICAL = 40
|
||||
|
||||
|
||||
@dataclass
|
||||
class Alert:
|
||||
level: AlertLevel
|
||||
module: str
|
||||
message: str
|
||||
event_time: str
|
||||
event: ModuleAtomicResult
|
||||
|
||||
|
||||
class AlertStore:
|
||||
def __init__(self, log: Optional[logging.Logger] = None) -> None:
|
||||
self.__alerts: List[Alert] = []
|
||||
self.__log = log
|
||||
|
||||
@property
|
||||
def alerts(self) -> List[Alert]:
|
||||
return self.__alerts
|
||||
|
||||
def add(self, alert: Alert) -> None:
|
||||
self.__alerts.append(alert)
|
||||
|
||||
def extend(self, alerts: List[Alert]) -> None:
|
||||
self.__alerts.extend(alerts)
|
||||
|
||||
def info(
|
||||
self, module: str, message: str, event_time: str, event: ModuleAtomicResult
|
||||
):
|
||||
self.add(
|
||||
Alert(
|
||||
level=AlertLevel.INFORMATIONAL,
|
||||
module=module,
|
||||
message=message,
|
||||
event_time=event_time,
|
||||
event=event,
|
||||
)
|
||||
)
|
||||
|
||||
def low(
|
||||
self, module: str, message: str, event_time: str, event: ModuleAtomicResult
|
||||
):
|
||||
self.add(
|
||||
Alert(
|
||||
level=AlertLevel.LOW,
|
||||
module=module,
|
||||
message=message,
|
||||
event_time=event_time,
|
||||
event=event,
|
||||
)
|
||||
)
|
||||
|
||||
def medium(
|
||||
self, module: str, message: str, event_time: str, event: ModuleAtomicResult
|
||||
):
|
||||
self.add(
|
||||
Alert(
|
||||
level=AlertLevel.MEDIUM,
|
||||
module=module,
|
||||
message=message,
|
||||
event_time=event_time,
|
||||
event=event,
|
||||
)
|
||||
)
|
||||
|
||||
def high(
|
||||
self, module: str, message: str, event_time: str, event: ModuleAtomicResult
|
||||
):
|
||||
self.add(
|
||||
Alert(
|
||||
level=AlertLevel.HIGH,
|
||||
module=module,
|
||||
message=message,
|
||||
event_time=event_time,
|
||||
event=event,
|
||||
)
|
||||
)
|
||||
|
||||
def critical(
|
||||
self, module: str, message: str, event_time: str, event: ModuleAtomicResult
|
||||
):
|
||||
self.add(
|
||||
Alert(
|
||||
level=AlertLevel.CRITICAL,
|
||||
module=module,
|
||||
message=message,
|
||||
event_time=event_time,
|
||||
event=event,
|
||||
)
|
||||
)
|
||||
|
||||
def log(self, alert: Alert) -> None:
|
||||
if not self.__log:
|
||||
return
|
||||
|
||||
if not alert.message:
|
||||
return
|
||||
|
||||
if alert.level == AlertLevel.INFORMATIONAL:
|
||||
self.__log.log(INFO_ALERT, alert.message)
|
||||
elif alert.level == AlertLevel.LOW:
|
||||
self.__log.log(LOW_ALERT, alert.message)
|
||||
elif alert.level == AlertLevel.MEDIUM:
|
||||
self.__log.log(MEDIUM_ALERT, alert.message)
|
||||
elif alert.level == AlertLevel.HIGH:
|
||||
self.__log.log(HIGH_ALERT, alert.message)
|
||||
elif alert.level == AlertLevel.CRITICAL:
|
||||
self.__log.log(CRITICAL_ALERT, alert.message)
|
||||
|
||||
def log_latest(self) -> None:
|
||||
self.log(self.__alerts[-1])
|
||||
|
||||
def count(self, level: AlertLevel) -> int:
|
||||
count = 0
|
||||
for alert in self.__alerts:
|
||||
if alert.level == level:
|
||||
count += 1
|
||||
|
||||
return count
|
||||
|
||||
def as_json(self) -> List[Dict[str, Any]]:
|
||||
alerts = []
|
||||
for alert in self.__alerts:
|
||||
alert_dict = asdict(alert)
|
||||
# This is required because an Enum is not JSON serializable.
|
||||
alert_dict["level"] = alert.level.name
|
||||
alerts.append(alert_dict)
|
||||
|
||||
return alerts
|
||||
|
||||
def save_timeline(self, timeline_path: str) -> None:
|
||||
with open(timeline_path, "a+", encoding="utf-8") as handle:
|
||||
csvoutput = csv.writer(
|
||||
handle,
|
||||
delimiter=",",
|
||||
quotechar='"',
|
||||
quoting=csv.QUOTE_ALL,
|
||||
escapechar="\\",
|
||||
)
|
||||
csvoutput.writerow(["Event Time", "Module", "Message", "Event"])
|
||||
|
||||
timed_alerts = []
|
||||
for alert in self.alerts:
|
||||
if not alert.event_time:
|
||||
continue
|
||||
|
||||
timed_alerts.append(asdict(alert))
|
||||
|
||||
for event in sorted(
|
||||
timed_alerts,
|
||||
key=lambda x: x["event_time"] if x["event_time"] is not None else "",
|
||||
):
|
||||
csvoutput.writerow(
|
||||
[
|
||||
event.get("event_time"),
|
||||
event.get("module"),
|
||||
event.get("message"),
|
||||
event.get("event"),
|
||||
]
|
||||
)
|
||||
@@ -2,11 +2,27 @@
|
||||
# Copyright (c) 2021-2023 The MVT Authors.
|
||||
# Use of this software is governed by the MVT License 1.1 that can be found at
|
||||
# https://license.mvt.re/1.1/
|
||||
from .module import MVTModule
|
||||
|
||||
|
||||
class Artifact(MVTModule):
|
||||
"""Base class for artifacts.
|
||||
|
||||
XXX: Inheriting from MVTModule to have the same signature as other modules. Not sure if this is a good idea.
|
||||
class Artifact:
|
||||
"""
|
||||
Main artifact class
|
||||
"""
|
||||
|
||||
def __init__(self, *args, **kwargs):
|
||||
self.results = []
|
||||
self.detected = []
|
||||
self.indicators = None
|
||||
super().__init__(*args, **kwargs)
|
||||
|
||||
def parse(self, entry: str):
|
||||
"""
|
||||
Parse the artifact, adds the parsed information to self.results
|
||||
"""
|
||||
raise NotImplementedError
|
||||
|
||||
def check_indicators(self) -> None:
|
||||
"""Check the results of this module against a provided list of
|
||||
indicators coming from self.indicators
|
||||
"""
|
||||
raise NotImplementedError
|
||||
|
||||
@@ -24,6 +24,8 @@ class CmdCheckIOCS(Command):
|
||||
module_options: Optional[dict] = None,
|
||||
hashes: Optional[bool] = False,
|
||||
sub_command: Optional[bool] = False,
|
||||
disable_version_check: bool = False,
|
||||
disable_indicator_check: bool = False,
|
||||
) -> None:
|
||||
super().__init__(
|
||||
target_path=target_path,
|
||||
@@ -35,6 +37,8 @@ class CmdCheckIOCS(Command):
|
||||
hashes=hashes,
|
||||
sub_command=sub_command,
|
||||
log=log,
|
||||
disable_version_check=disable_version_check,
|
||||
disable_indicator_check=disable_indicator_check,
|
||||
)
|
||||
|
||||
self.name = "check-iocs"
|
||||
@@ -69,6 +73,10 @@ class CmdCheckIOCS(Command):
|
||||
m = iocs_module.from_json(
|
||||
file_path, log=logging.getLogger(iocs_module.__module__)
|
||||
)
|
||||
if not m:
|
||||
log.warning("No result from this module, skipping it")
|
||||
continue
|
||||
|
||||
if self.iocs.total_ioc_count > 0:
|
||||
m.indicators = self.iocs
|
||||
m.indicators.log = m.log
|
||||
@@ -78,7 +86,7 @@ class CmdCheckIOCS(Command):
|
||||
except NotImplementedError:
|
||||
continue
|
||||
else:
|
||||
total_detections += len(m.alertstore.alerts)
|
||||
total_detections += len(m.detected)
|
||||
|
||||
if total_detections > 0:
|
||||
log.warning(
|
||||
|
||||
@@ -9,20 +9,16 @@ import os
|
||||
import sys
|
||||
from datetime import datetime
|
||||
from typing import Optional
|
||||
from rich.console import Console
|
||||
from rich.panel import Panel
|
||||
from rich.text import Text
|
||||
|
||||
from .indicators import Indicators
|
||||
from .module import MVTModule, run_module, save_timeline
|
||||
from .utils import (
|
||||
from mvt.common.indicators import Indicators
|
||||
from mvt.common.module import MVTModule, run_module, save_timeline
|
||||
from mvt.common.utils import (
|
||||
convert_datetime_to_iso,
|
||||
generate_hashes_from_path,
|
||||
get_sha256_from_file_path,
|
||||
)
|
||||
from .config import settings
|
||||
from .alerts import AlertStore, AlertLevel
|
||||
from .version import MVT_VERSION
|
||||
from mvt.common.config import settings
|
||||
from mvt.common.version import MVT_VERSION
|
||||
|
||||
|
||||
class Command:
|
||||
@@ -38,6 +34,8 @@ class Command:
|
||||
hashes: Optional[bool] = False,
|
||||
sub_command: Optional[bool] = False,
|
||||
log: logging.Logger = logging.getLogger(__name__),
|
||||
disable_version_check: bool = False,
|
||||
disable_indicator_check: bool = False,
|
||||
) -> None:
|
||||
self.name = ""
|
||||
self.modules = []
|
||||
@@ -49,6 +47,8 @@ class Command:
|
||||
self.serial = serial
|
||||
self.log = log
|
||||
self.sub_command = sub_command
|
||||
self.disable_version_check = disable_version_check
|
||||
self.disable_indicator_check = disable_indicator_check
|
||||
|
||||
# This dictionary can contain options that will be passed down from
|
||||
# the Command to all modules. This can for example be used to pass
|
||||
@@ -58,9 +58,11 @@ class Command:
|
||||
# This list will contain all executed modules.
|
||||
# We can use this to reference e.g. self.executed[0].results.
|
||||
self.executed = []
|
||||
self.detected_count = 0
|
||||
self.hashes = hashes
|
||||
self.hash_values = []
|
||||
self.timeline = []
|
||||
self.timeline_detected = []
|
||||
|
||||
# Load IOCs
|
||||
self._create_storage()
|
||||
@@ -72,14 +74,12 @@ class Command:
|
||||
self.iocs = Indicators(self.log)
|
||||
self.iocs.load_indicators_files(self.ioc_files)
|
||||
|
||||
self.alertstore = AlertStore()
|
||||
|
||||
def _create_storage(self) -> None:
|
||||
if self.results_path and not os.path.exists(self.results_path):
|
||||
try:
|
||||
os.makedirs(self.results_path)
|
||||
except Exception as exc:
|
||||
self.log.fatal(
|
||||
self.log.critical(
|
||||
"Unable to create output folder %s: %s", self.results_path, exc
|
||||
)
|
||||
sys.exit(1)
|
||||
@@ -98,14 +98,14 @@ class Command:
|
||||
file_handler.setLevel(logging.DEBUG)
|
||||
file_handler.setFormatter(formatter)
|
||||
|
||||
# MVT can be run in a loop.
|
||||
# Old file handlers stick around in subsequent loops.
|
||||
# Remove any existing logging.FileHandler instances.
|
||||
# MVT can be run in a loop
|
||||
# Old file handlers stick around in subsequent loops
|
||||
# Remove any existing logging.FileHandler instances
|
||||
for handler in logger.handlers:
|
||||
if isinstance(handler, logging.FileHandler):
|
||||
logger.removeHandler(handler)
|
||||
|
||||
# And finally add the new one.
|
||||
# And finally add the new one
|
||||
logger.addHandler(file_handler)
|
||||
|
||||
def _store_timeline(self) -> None:
|
||||
@@ -126,24 +126,12 @@ class Command:
|
||||
is_utc=is_utc,
|
||||
)
|
||||
|
||||
def _store_alerts(self) -> None:
|
||||
if not self.results_path:
|
||||
return
|
||||
|
||||
alerts = self.alertstore.as_json()
|
||||
if not alerts:
|
||||
return
|
||||
|
||||
alerts_path = os.path.join(self.results_path, "alerts.json")
|
||||
with open(alerts_path, "w+", encoding="utf-8") as handle:
|
||||
json.dump(alerts, handle, indent=4)
|
||||
|
||||
def _store_alerts_timeline(self) -> None:
|
||||
if not self.results_path:
|
||||
return
|
||||
|
||||
alerts_timeline_path = os.path.join(self.results_path, "alerts_timeline.csv")
|
||||
self.alertstore.save_timeline(alerts_timeline_path)
|
||||
if len(self.timeline_detected) > 0:
|
||||
save_timeline(
|
||||
self.timeline_detected,
|
||||
os.path.join(self.results_path, "timeline_detected.csv"),
|
||||
is_utc=is_utc,
|
||||
)
|
||||
|
||||
def _store_info(self) -> None:
|
||||
if not self.results_path:
|
||||
@@ -203,54 +191,26 @@ class Command:
|
||||
def finish(self) -> None:
|
||||
raise NotImplementedError
|
||||
|
||||
def show_alerts_brief(self) -> None:
|
||||
console = Console()
|
||||
|
||||
message = Text()
|
||||
for i, level in enumerate(AlertLevel):
|
||||
message.append(
|
||||
f"MVT produced {self.alertstore.count(level)} {level.name} alerts."
|
||||
def _show_disable_adb_warning(self) -> None:
|
||||
"""Warn if ADB is enabled"""
|
||||
if type(self).__name__ in ["CmdAndroidCheckADB", "CmdAndroidCheckAndroidQF"]:
|
||||
self.log.info(
|
||||
"Please disable Developer Options and ADB (Android Debug Bridge) on the device once finished with the acquisition. "
|
||||
"ADB is a powerful tool which can allow unauthorized access to the device."
|
||||
)
|
||||
if i < len(AlertLevel) - 1:
|
||||
message.append("\n")
|
||||
|
||||
panel = Panel(
|
||||
message, title="ALERTS", style="sandy_brown", border_style="sandy_brown"
|
||||
)
|
||||
console.print("")
|
||||
console.print(panel)
|
||||
|
||||
def show_disable_adb_warning(self) -> None:
|
||||
console = Console()
|
||||
message = Text(
|
||||
"Please disable Developer Options and ADB (Android Debug Bridge) on the device once finished with the acquisition. "
|
||||
"ADB is a powerful tool which can allow unauthorized access to the device."
|
||||
)
|
||||
panel = Panel(message, title="NOTE", style="yellow", border_style="yellow")
|
||||
console.print("")
|
||||
console.print(panel)
|
||||
|
||||
def show_support_message(self) -> None:
|
||||
console = Console()
|
||||
message = Text()
|
||||
|
||||
def _show_support_message(self) -> None:
|
||||
support_message = "Please seek reputable expert help if you have serious concerns about a possible spyware attack. Such support is available to human rights defenders and civil society through Amnesty International's Security Lab at https://securitylab.amnesty.org/get-help/?c=mvt"
|
||||
if (
|
||||
self.alertstore.count(AlertLevel.HIGH) > 0
|
||||
or self.alertstore.count(AlertLevel.CRITICAL) > 0
|
||||
):
|
||||
message.append(
|
||||
f"MVT produced HIGH or CRITICAL alerts. Only expert review can confirm if the detected indicators are signs of an attack.\n\n{support_message}",
|
||||
if self.detected_count == 0:
|
||||
self.log.info(
|
||||
f"[bold]NOTE:[/bold] Using MVT with public indicators of compromise (IOCs) [bold]WILL NOT[/bold] automatically detect advanced attacks.\n\n{support_message}",
|
||||
extra={"markup": True},
|
||||
)
|
||||
panel = Panel(message, title="WARNING", style="red", border_style="red")
|
||||
else:
|
||||
message.append(
|
||||
f"The lack of severe alerts does not equate to a clean bill of health.\n\n{support_message}",
|
||||
self.log.warning(
|
||||
f"[bold]NOTE: Detected indicators of compromise[/bold]. Only expert review can confirm if the detected indicators are signs of an attack.\n\n{support_message}",
|
||||
extra={"markup": True},
|
||||
)
|
||||
panel = Panel(message, title="NOTE", style="yellow", border_style="yellow")
|
||||
|
||||
console.print("")
|
||||
console.print(panel)
|
||||
|
||||
def run(self) -> None:
|
||||
try:
|
||||
@@ -262,11 +222,6 @@ class Command:
|
||||
if self.module_name and module.__name__ != self.module_name:
|
||||
continue
|
||||
|
||||
if not module.enabled and not (
|
||||
self.module_name and module.__name__ == self.module_name
|
||||
):
|
||||
continue
|
||||
|
||||
# FIXME: do we need the logger here
|
||||
module_logger = logging.getLogger(module.__module__)
|
||||
|
||||
@@ -292,8 +247,11 @@ class Command:
|
||||
run_module(m)
|
||||
|
||||
self.executed.append(m)
|
||||
|
||||
self.detected_count += len(m.detected)
|
||||
|
||||
self.timeline.extend(m.timeline)
|
||||
self.alertstore.extend(m.alertstore.alerts)
|
||||
self.timeline_detected.extend(m.timeline_detected)
|
||||
|
||||
try:
|
||||
self.finish()
|
||||
@@ -305,6 +263,7 @@ class Command:
|
||||
return
|
||||
|
||||
self._store_timeline()
|
||||
self._store_alerts_timeline()
|
||||
self._store_alerts()
|
||||
self._store_info()
|
||||
|
||||
self._show_disable_adb_warning()
|
||||
self._show_support_message()
|
||||
|
||||
@@ -15,6 +15,8 @@ HELP_MSG_HASHES = "Generate hashes of all the files analyzed"
|
||||
HELP_MSG_VERBOSE = "Verbose mode"
|
||||
HELP_MSG_CHECK_IOCS = "Compare stored JSON results to provided indicators"
|
||||
HELP_MSG_STIX2 = "Download public STIX2 indicators"
|
||||
HELP_MSG_DISABLE_UPDATE_CHECK = "Disable MVT version update check"
|
||||
HELP_MSG_DISABLE_INDICATOR_UPDATE_CHECK = "Disable indicators update check"
|
||||
|
||||
# IOS Specific
|
||||
HELP_MSG_DECRYPT_BACKUP = "Decrypt an encrypted iTunes backup"
|
||||
@@ -33,15 +35,19 @@ HELP_MSG_CHECK_IOS_BACKUP = "Extract artifacts from an iTunes backup"
|
||||
HELP_MSG_CHECK_FS = "Extract artifacts from a full filesystem dump"
|
||||
|
||||
# Android Specific
|
||||
HELP_MSG_SERIAL = "Specify a device serial number or HOST:PORT connection string"
|
||||
HELP_MSG_DOWNLOAD_APKS = "Download all or only non-system installed APKs"
|
||||
HELP_MSG_ANDROID_BACKUP_PASSWORD = "The backup password to use for an Android backup"
|
||||
HELP_MSG_CHECK_ADB_REMOVED = "REMOVED: Check an Android device over ADB"
|
||||
HELP_MSG_CHECK_ADB_REMOVED_DESCRIPTION = (
|
||||
"The 'mvt-android check-adb' command has been removed from MVT. "
|
||||
"Use AndroidQF to collect full forensic artifacts from an Android device. \n\n"
|
||||
"The 'mvt-android check-androidqf' command in MVT can be used to fully analyze "
|
||||
"forensic data collected with AndroidQF. Minimal checks can also be performed "
|
||||
"on an Android bugreport using the 'mvt-android check-bugreport' command."
|
||||
HELP_MSG_DOWNLOAD_ALL_APKS = (
|
||||
"Extract all packages installed on the phone, including system packages"
|
||||
)
|
||||
HELP_MSG_VIRUS_TOTAL = "Check packages on VirusTotal"
|
||||
HELP_MSG_APK_OUTPUT = "Specify a path to a folder where you want to store the APKs"
|
||||
HELP_MSG_APKS_FROM_FILE = (
|
||||
"Instead of acquiring APKs from a phone, load an existing packages.json file for "
|
||||
"lookups (mainly for debug purposes)"
|
||||
)
|
||||
HELP_MSG_CHECK_ADB = "Deprecated: Check an Android device over ADB. Prefer using the external AndroidQF project (https://github.com/mvt-project/androidqf) to acquire AndroidQF images for analysis."
|
||||
HELP_MSG_CHECK_BUGREPORT = "Check an Android Bug Report"
|
||||
HELP_MSG_CHECK_ANDROID_BACKUP = "Check an Android Backup"
|
||||
HELP_MSG_CHECK_ANDROIDQF = "Check data collected with AndroidQF"
|
||||
|
||||
@@ -8,8 +8,7 @@ import json
|
||||
import logging
|
||||
import os
|
||||
from functools import lru_cache
|
||||
from typing import Any, Dict, Iterator, List, Optional
|
||||
from dataclasses import dataclass
|
||||
from typing import Any, Dict, Iterator, List, Optional, Union
|
||||
|
||||
import ahocorasick
|
||||
from appdirs import user_data_dir
|
||||
@@ -23,20 +22,6 @@ MVT_INDICATORS_FOLDER = os.path.join(MVT_DATA_FOLDER, "indicators")
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
@dataclass
|
||||
class Indicator:
|
||||
value: str
|
||||
type: str
|
||||
name: str
|
||||
stix2_file_name: str
|
||||
|
||||
|
||||
@dataclass
|
||||
class IndicatorMatch:
|
||||
ioc: Indicator
|
||||
message: str
|
||||
|
||||
|
||||
class Indicators:
|
||||
"""This class is used to parse indicators from a STIX2 file and provide
|
||||
functions to compare extracted artifacts to the indicators.
|
||||
@@ -73,7 +58,7 @@ class Indicators:
|
||||
self.parse_stix2(file)
|
||||
else:
|
||||
self.log.error(
|
||||
"Path specified with env MVT_STIX2 is not a valid path: '%s'", path
|
||||
"Path specified with env MVT_STIX2 is not a valid path: %s", path
|
||||
)
|
||||
|
||||
def _new_collection(
|
||||
@@ -212,13 +197,13 @@ class Indicators:
|
||||
:type file_path: str
|
||||
|
||||
"""
|
||||
self.log.info("Parsing STIX2 indicators file at path '%s'", file_path)
|
||||
self.log.info("Parsing STIX2 indicators file at path %s", file_path)
|
||||
|
||||
with open(file_path, "r", encoding="utf-8") as handle:
|
||||
try:
|
||||
data = json.load(handle)
|
||||
except json.decoder.JSONDecodeError:
|
||||
self.log.warning(
|
||||
self.log.critical(
|
||||
"Unable to parse STIX2 indicator file. "
|
||||
"The file is corrupted or in the wrong format!"
|
||||
)
|
||||
@@ -329,7 +314,7 @@ class Indicators:
|
||||
if os.path.isfile(file_path):
|
||||
self.parse_stix2(file_path)
|
||||
else:
|
||||
self.log.error("No indicators file exists at path %s", file_path)
|
||||
self.log.warning("No indicators file exists at path %s", file_path)
|
||||
|
||||
# Load downloaded indicators and any indicators from env variable.
|
||||
if load_default:
|
||||
@@ -338,15 +323,15 @@ class Indicators:
|
||||
self._check_stix2_env_variable()
|
||||
self.log.info("Loaded a total of %d unique indicators", self.total_ioc_count)
|
||||
|
||||
def get_iocs(self, ioc_type: str) -> Iterator[Indicator]:
|
||||
def get_iocs(self, ioc_type: str) -> Iterator[Dict[str, Any]]:
|
||||
for ioc_collection in self.ioc_collections:
|
||||
for ioc in ioc_collection.get(ioc_type, []):
|
||||
yield Indicator(
|
||||
value=ioc,
|
||||
type=ioc_type,
|
||||
name=ioc_collection["name"],
|
||||
stix2_file_name=ioc_collection["stix2_file_name"],
|
||||
)
|
||||
yield {
|
||||
"value": ioc,
|
||||
"type": ioc_type,
|
||||
"name": ioc_collection["name"],
|
||||
"stix2_file_name": ioc_collection["stix2_file_name"],
|
||||
}
|
||||
|
||||
@lru_cache()
|
||||
def get_ioc_matcher(
|
||||
@@ -377,12 +362,12 @@ class Indicators:
|
||||
raise ValueError("Must provide either ioc_type or ioc_list")
|
||||
|
||||
for ioc in iocs:
|
||||
automaton.add_word(ioc.value, ioc)
|
||||
automaton.add_word(ioc["value"], ioc)
|
||||
automaton.make_automaton()
|
||||
return automaton
|
||||
|
||||
@lru_cache()
|
||||
def check_url(self, url: str) -> Optional[IndicatorMatch]:
|
||||
def check_url(self, url: str) -> Union[dict, None]:
|
||||
"""Check if a given URL matches any of the provided domain indicators.
|
||||
|
||||
:param url: URL to match against domain indicators
|
||||
@@ -390,16 +375,21 @@ class Indicators:
|
||||
:returns: Indicator details if matched, otherwise None
|
||||
|
||||
"""
|
||||
if not url or not isinstance(url, str):
|
||||
if not url:
|
||||
return None
|
||||
if not isinstance(url, str):
|
||||
return None
|
||||
|
||||
# Check the URL first
|
||||
for ioc in self.get_iocs("urls"):
|
||||
if ioc.value == url:
|
||||
return IndicatorMatch(
|
||||
ioc=ioc,
|
||||
message=f'Found a known suspicious URL {url} matching indicator "{ioc.value}" from "{ioc.name}"',
|
||||
if ioc["value"] == url:
|
||||
self.log.warning(
|
||||
'Found a known suspicious URL %s matching indicator "%s" from "%s"',
|
||||
url,
|
||||
ioc["value"],
|
||||
ioc["name"],
|
||||
)
|
||||
return ioc
|
||||
|
||||
# Then check the domain
|
||||
# Create an Aho-Corasick automaton from the list of urls
|
||||
@@ -436,41 +426,71 @@ class Indicators:
|
||||
except Exception:
|
||||
# If URL parsing failed, we just try to do a simple substring
|
||||
# match.
|
||||
for _, ioc in domain_matcher.iter(url):
|
||||
if ioc.value.lower() in url:
|
||||
return IndicatorMatch(
|
||||
ioc=ioc,
|
||||
message=f'Maybe found a known suspicious domain {url} matching indicator "{ioc.value}" from "{ioc.name}"',
|
||||
for idx, ioc in domain_matcher.iter(url):
|
||||
if ioc["value"].lower() in url:
|
||||
self.log.warning(
|
||||
"Maybe found a known suspicious domain %s "
|
||||
'matching indicator "%s" from "%s"',
|
||||
url,
|
||||
ioc["value"],
|
||||
ioc["name"],
|
||||
)
|
||||
return ioc
|
||||
|
||||
# If nothing matched, we can quit here.
|
||||
return None
|
||||
|
||||
# If all parsing worked, we start walking through available domain
|
||||
# indicators.
|
||||
for _, ioc in domain_matcher.iter(final_url.domain.lower()):
|
||||
for idx, ioc in domain_matcher.iter(final_url.domain.lower()):
|
||||
# First we check the full domain.
|
||||
if final_url.domain.lower() == ioc.value:
|
||||
if final_url.domain.lower() == ioc["value"]:
|
||||
if orig_url.is_shortened and orig_url.url != final_url.url:
|
||||
message = f'Found a known suspicious domain {final_url.url} shortened as {orig_url.url} matching indicator "{ioc.value}" from "{ioc.name}"'
|
||||
self.log.warning(
|
||||
"Found a known suspicious domain %s "
|
||||
'shortened as %s matching indicator "%s" from "%s"',
|
||||
final_url.url,
|
||||
orig_url.url,
|
||||
ioc["value"],
|
||||
ioc["name"],
|
||||
)
|
||||
else:
|
||||
message = f'Found a known suspicious domain {final_url.url} matching indicator "{ioc.value}" from "{ioc.name}"'
|
||||
|
||||
return IndicatorMatch(ioc=ioc, message=message)
|
||||
self.log.warning(
|
||||
"Found a known suspicious domain %s "
|
||||
'matching indicator "%s" from "%s"',
|
||||
final_url.url,
|
||||
ioc["value"],
|
||||
ioc["name"],
|
||||
)
|
||||
return ioc
|
||||
|
||||
# Then we just check the top level domain.
|
||||
for _, ioc in domain_matcher.iter(final_url.top_level.lower()):
|
||||
if final_url.top_level.lower() == ioc.value:
|
||||
for idx, ioc in domain_matcher.iter(final_url.top_level.lower()):
|
||||
if final_url.top_level.lower() == ioc["value"]:
|
||||
if orig_url.is_shortened and orig_url.url != final_url.url:
|
||||
message = f'Found a sub-domain with suspicious top level {final_url.url} shortened as {orig_url.url} matching indicator "{ioc.value}" from "{ioc.name}"'
|
||||
self.log.warning(
|
||||
"Found a sub-domain with suspicious top "
|
||||
"level %s shortened as %s matching "
|
||||
'indicator "%s" from "%s"',
|
||||
final_url.url,
|
||||
orig_url.url,
|
||||
ioc["value"],
|
||||
ioc["name"],
|
||||
)
|
||||
else:
|
||||
message = f'Found a sub-domain with a suspicious top level {final_url.url} matching indicator "{ioc.value}" from "{ioc.name}"'
|
||||
self.log.warning(
|
||||
"Found a sub-domain with a suspicious top "
|
||||
'level %s matching indicator "%s" from "%s"',
|
||||
final_url.url,
|
||||
ioc["value"],
|
||||
ioc["name"],
|
||||
)
|
||||
|
||||
return IndicatorMatch(ioc=ioc, message=message)
|
||||
return ioc
|
||||
|
||||
return None
|
||||
|
||||
def check_urls(self, urls: list) -> Optional[IndicatorMatch]:
|
||||
def check_urls(self, urls: list) -> Union[dict, None]:
|
||||
"""Check a list of URLs against the provided list of domain indicators.
|
||||
|
||||
:param urls: List of URLs to check against domain indicators
|
||||
@@ -488,7 +508,7 @@ class Indicators:
|
||||
|
||||
return None
|
||||
|
||||
def check_process(self, process: str) -> Optional[IndicatorMatch]:
|
||||
def check_process(self, process: str) -> Union[dict, None]:
|
||||
"""Check the provided process name against the list of process
|
||||
indicators.
|
||||
|
||||
@@ -502,22 +522,28 @@ class Indicators:
|
||||
|
||||
proc_name = os.path.basename(process)
|
||||
for ioc in self.get_iocs("processes"):
|
||||
if proc_name == ioc.value:
|
||||
return IndicatorMatch(
|
||||
ioc=ioc,
|
||||
message=f'Found a known suspicious process name "{process}" matching indicators from "{ioc.name}"',
|
||||
if proc_name == ioc["value"]:
|
||||
self.log.warning(
|
||||
'Found a known suspicious process name "%s" '
|
||||
'matching indicators from "%s"',
|
||||
process,
|
||||
ioc["name"],
|
||||
)
|
||||
return ioc
|
||||
|
||||
if len(proc_name) == 16:
|
||||
if ioc.value.startswith(proc_name):
|
||||
return IndicatorMatch(
|
||||
ioc=ioc,
|
||||
message=f'Found a truncated known suspicious process name "{process}" matching indicators from "{ioc.name}"',
|
||||
if ioc["value"].startswith(proc_name):
|
||||
self.log.warning(
|
||||
"Found a truncated known suspicious "
|
||||
'process name "%s" matching indicators from "%s"',
|
||||
process,
|
||||
ioc["name"],
|
||||
)
|
||||
return ioc
|
||||
|
||||
return None
|
||||
|
||||
def check_processes(self, processes: list) -> Optional[IndicatorMatch]:
|
||||
def check_processes(self, processes: list) -> Union[dict, None]:
|
||||
"""Check the provided list of processes against the list of
|
||||
process indicators.
|
||||
|
||||
@@ -536,7 +562,7 @@ class Indicators:
|
||||
|
||||
return None
|
||||
|
||||
def check_email(self, email: str) -> Optional[IndicatorMatch]:
|
||||
def check_email(self, email: str) -> Union[dict, None]:
|
||||
"""Check the provided email against the list of email indicators.
|
||||
|
||||
:param email: Email address to check against email indicators
|
||||
@@ -548,15 +574,18 @@ class Indicators:
|
||||
return None
|
||||
|
||||
for ioc in self.get_iocs("emails"):
|
||||
if email.lower() == ioc.value.lower():
|
||||
return IndicatorMatch(
|
||||
ioc=ioc,
|
||||
message=f'Found a known suspicious email address "{email}" matching indicators from "{ioc.name}"',
|
||||
if email.lower() == ioc["value"].lower():
|
||||
self.log.warning(
|
||||
'Found a known suspicious email address "%s" '
|
||||
'matching indicators from "%s"',
|
||||
email,
|
||||
ioc["name"],
|
||||
)
|
||||
return ioc
|
||||
|
||||
return None
|
||||
|
||||
def check_file_name(self, file_name: str) -> Optional[IndicatorMatch]:
|
||||
def check_file_name(self, file_name: str) -> Union[dict, None]:
|
||||
"""Check the provided file name against the list of file indicators.
|
||||
|
||||
:param file_name: File name to check against file
|
||||
@@ -569,15 +598,18 @@ class Indicators:
|
||||
return None
|
||||
|
||||
for ioc in self.get_iocs("file_names"):
|
||||
if ioc.value == file_name:
|
||||
return IndicatorMatch(
|
||||
ioc=ioc,
|
||||
message=f'Found a known suspicious file name "{file_name}" matching indicators from "{ioc.name}"',
|
||||
if ioc["value"] == file_name:
|
||||
self.log.warning(
|
||||
'Found a known suspicious file name "%s" '
|
||||
'matching indicators from "%s"',
|
||||
file_name,
|
||||
ioc["name"],
|
||||
)
|
||||
return ioc
|
||||
|
||||
return None
|
||||
|
||||
def check_file_path(self, file_path: str) -> Optional[IndicatorMatch]:
|
||||
def check_file_path(self, file_path: str) -> Union[dict, None]:
|
||||
"""Check the provided file path against the list of file indicators
|
||||
(both path and name).
|
||||
|
||||
@@ -590,22 +622,25 @@ class Indicators:
|
||||
if not file_path:
|
||||
return None
|
||||
|
||||
ioc_match = self.check_file_name(os.path.basename(file_path))
|
||||
if ioc_match:
|
||||
return ioc_match
|
||||
ioc = self.check_file_name(os.path.basename(file_path))
|
||||
if ioc:
|
||||
return ioc
|
||||
|
||||
for ioc in self.get_iocs("file_paths"):
|
||||
# Strip any trailing slash from indicator paths to match
|
||||
# directories.
|
||||
if file_path.startswith(ioc.value.rstrip("/")):
|
||||
return IndicatorMatch(
|
||||
ioc=ioc,
|
||||
message=f'Found a known suspicious file path "{file_path}" matching indicators form "{ioc.name}"',
|
||||
if file_path.startswith(ioc["value"].rstrip("/")):
|
||||
self.log.warning(
|
||||
'Found a known suspicious file path "%s" '
|
||||
'matching indicators form "%s"',
|
||||
file_path,
|
||||
ioc["name"],
|
||||
)
|
||||
return ioc
|
||||
|
||||
return None
|
||||
|
||||
def check_file_path_process(self, file_path: str) -> Optional[IndicatorMatch]:
|
||||
def check_file_path_process(self, file_path: str) -> Optional[Dict[str, Any]]:
|
||||
"""Check the provided file path contains a process name from the
|
||||
list of indicators
|
||||
|
||||
@@ -620,15 +655,18 @@ class Indicators:
|
||||
|
||||
for ioc in self.get_iocs("processes"):
|
||||
parts = file_path.split("/")
|
||||
if ioc.value in parts:
|
||||
return IndicatorMatch(
|
||||
ioc=ioc,
|
||||
message=f'Found known suspicious process name mentioned in file at path "{file_path}" matching indicators from "{ioc.name}"',
|
||||
if ioc["value"] in parts:
|
||||
self.log.warning(
|
||||
"Found known suspicious process name mentioned in file at "
|
||||
'path "%s" matching indicators from "%s"',
|
||||
file_path,
|
||||
ioc["name"],
|
||||
)
|
||||
return ioc
|
||||
|
||||
return None
|
||||
|
||||
def check_profile(self, profile_uuid: str) -> Optional[IndicatorMatch]:
|
||||
def check_profile(self, profile_uuid: str) -> Union[dict, None]:
|
||||
"""Check the provided configuration profile UUID against the list of
|
||||
indicators.
|
||||
|
||||
@@ -642,15 +680,18 @@ class Indicators:
|
||||
return None
|
||||
|
||||
for ioc in self.get_iocs("ios_profile_ids"):
|
||||
if profile_uuid in ioc.value:
|
||||
return IndicatorMatch(
|
||||
ioc=ioc,
|
||||
message=f'Found a known suspicious profile ID "{profile_uuid}" matching indicators from "{ioc.name}"',
|
||||
if profile_uuid in ioc["value"]:
|
||||
self.log.warning(
|
||||
'Found a known suspicious profile ID "%s" '
|
||||
'matching indicators from "%s"',
|
||||
profile_uuid,
|
||||
ioc["name"],
|
||||
)
|
||||
return ioc
|
||||
|
||||
return None
|
||||
|
||||
def check_file_hash(self, file_hash: str) -> Optional[IndicatorMatch]:
|
||||
def check_file_hash(self, file_hash: str) -> Union[dict, None]:
|
||||
"""Check the provided file hash against the list of indicators.
|
||||
|
||||
:param file_hash: hash to check
|
||||
@@ -669,15 +710,18 @@ class Indicators:
|
||||
hash_type = "sha256"
|
||||
|
||||
for ioc in self.get_iocs("files_" + hash_type):
|
||||
if file_hash.lower() == ioc.value.lower():
|
||||
return IndicatorMatch(
|
||||
ioc=ioc,
|
||||
message=f'Found a known suspicious file with hash "{file_hash}" matching indicators from "{ioc.name}"',
|
||||
if file_hash.lower() == ioc["value"].lower():
|
||||
self.log.warning(
|
||||
'Found a known suspicious file with hash "%s" '
|
||||
'matching indicators from "%s"',
|
||||
file_hash,
|
||||
ioc["name"],
|
||||
)
|
||||
return ioc
|
||||
|
||||
return None
|
||||
|
||||
def check_app_certificate_hash(self, cert_hash: str) -> Optional[IndicatorMatch]:
|
||||
def check_app_certificate_hash(self, cert_hash: str) -> Union[dict, None]:
|
||||
"""Check the provided cert hash against the list of indicators.
|
||||
|
||||
:param cert_hash: hash to check
|
||||
@@ -689,15 +733,18 @@ class Indicators:
|
||||
return None
|
||||
|
||||
for ioc in self.get_iocs("app_cert_hashes"):
|
||||
if cert_hash.lower() == ioc.value.lower():
|
||||
return IndicatorMatch(
|
||||
ioc=ioc,
|
||||
message=f'Found a known suspicious app certfificate with hash "{cert_hash}" matching indicators from "{ioc.name}"',
|
||||
if cert_hash.lower() == ioc["value"].lower():
|
||||
self.log.warning(
|
||||
'Found a known suspicious app certfificate with hash "%s" '
|
||||
'matching indicators from "%s"',
|
||||
cert_hash,
|
||||
ioc["name"],
|
||||
)
|
||||
return ioc
|
||||
|
||||
return None
|
||||
|
||||
def check_app_id(self, app_id: str) -> Optional[IndicatorMatch]:
|
||||
def check_app_id(self, app_id: str) -> Union[dict, None]:
|
||||
"""Check the provided app identifier (typically an Android package name)
|
||||
against the list of indicators.
|
||||
|
||||
@@ -710,17 +757,42 @@ class Indicators:
|
||||
return None
|
||||
|
||||
for ioc in self.get_iocs("app_ids"):
|
||||
if app_id.lower() == ioc.value.lower():
|
||||
return IndicatorMatch(
|
||||
ioc=ioc,
|
||||
message=f'Found a known suspicious app with ID "{app_id}" matching indicators from "{ioc.name}"',
|
||||
if app_id.lower() == ioc["value"].lower():
|
||||
self.log.warning(
|
||||
'Found a known suspicious app with ID "%s" '
|
||||
'matching indicators from "%s"',
|
||||
app_id,
|
||||
ioc["name"],
|
||||
)
|
||||
return ioc
|
||||
|
||||
return None
|
||||
|
||||
def check_android_property_name(
|
||||
self, property_name: str
|
||||
) -> Optional[IndicatorMatch]:
|
||||
|
||||
def check_receiver_prefix(self, receiver_name: str) -> Union[dict, None]:
|
||||
"""Check the provided receiver name against the list of indicators.
|
||||
An IoC match is detected when a substring of the receiver matches the indicator
|
||||
:param app_id: App ID to check against the list of indicators
|
||||
:type app_id: str
|
||||
:returns: Indicator details if matched, otherwise None
|
||||
|
||||
"""
|
||||
if not receiver_name:
|
||||
return None
|
||||
|
||||
for ioc in self.get_iocs("app_ids"):
|
||||
if ioc["value"].lower() in receiver_name.lower():
|
||||
self.log.warning(
|
||||
'Found a known suspicious receiver with name "%s" '
|
||||
'matching indicators from "%s"',
|
||||
receiver_name,
|
||||
ioc["name"],
|
||||
)
|
||||
return ioc
|
||||
|
||||
return None
|
||||
|
||||
def check_android_property_name(self, property_name: str) -> Optional[dict]:
|
||||
"""Check the android property name against the list of indicators.
|
||||
|
||||
:param property_name: Name of the Android property
|
||||
@@ -732,21 +804,24 @@ class Indicators:
|
||||
return None
|
||||
|
||||
for ioc in self.get_iocs("android_property_names"):
|
||||
if property_name.lower() == ioc.value.lower():
|
||||
return IndicatorMatch(
|
||||
ioc=ioc,
|
||||
message=f'Found a known suspicious Android property "{property_name}" matching indicators from "{ioc.name}"',
|
||||
if property_name.lower() == ioc["value"].lower():
|
||||
self.log.warning(
|
||||
'Found a known suspicious Android property "%s" '
|
||||
'matching indicators from "%s"',
|
||||
property_name,
|
||||
ioc["name"],
|
||||
)
|
||||
return ioc
|
||||
|
||||
return None
|
||||
|
||||
def check_domain(self, url: str) -> Optional[IndicatorMatch]:
|
||||
def check_domain(self, url: str) -> Union[dict, None]:
|
||||
"""
|
||||
Renamed check_url now, kept for compatibility
|
||||
"""
|
||||
return self.check_url(url)
|
||||
|
||||
def check_domains(self, urls: list) -> Optional[IndicatorMatch]:
|
||||
def check_domains(self, urls: list) -> Union[dict, None]:
|
||||
"""
|
||||
Renamed check_domains, kept for compatibility
|
||||
"""
|
||||
|
||||
@@ -1,65 +0,0 @@
|
||||
# Mobile Verification Toolkit (MVT)
|
||||
# Copyright (c) 2021-2025 The MVT Authors.
|
||||
# Use of this software is governed by the MVT License 1.1 that can be found at
|
||||
# https://license.mvt.re/1.1/
|
||||
|
||||
import logging
|
||||
from rich.console import Console
|
||||
from rich.logging import RichHandler
|
||||
from typing import Optional
|
||||
|
||||
INFO = logging.INFO
|
||||
DEBUG = logging.DEBUG
|
||||
ERROR = logging.ERROR
|
||||
FATAL = logging.CRITICAL
|
||||
WARNING = logging.WARNING
|
||||
|
||||
INFO_ALERT = 25
|
||||
LOW_ALERT = 35
|
||||
MEDIUM_ALERT = 45
|
||||
HIGH_ALERT = 55
|
||||
CRITICAL_ALERT = 65
|
||||
|
||||
logging.addLevelName(INFO_ALERT, "INFO")
|
||||
logging.addLevelName(LOW_ALERT, "LOW")
|
||||
logging.addLevelName(MEDIUM_ALERT, "MEDIUM")
|
||||
logging.addLevelName(HIGH_ALERT, "HIGH")
|
||||
logging.addLevelName(CRITICAL_ALERT, "CRITICAL")
|
||||
|
||||
|
||||
class MVTLogHandler(RichHandler):
|
||||
def __init__(self, console: Optional[Console] = None, level: int = logging.DEBUG):
|
||||
super().__init__(console=console, level=level)
|
||||
|
||||
def __add_prefix_space(self, level: str) -> str:
|
||||
max_length = len("CRITICAL ALERT")
|
||||
space = max_length - len(level)
|
||||
return f"{level}{' ' * space}"
|
||||
|
||||
def emit(self, record: logging.LogRecord):
|
||||
try:
|
||||
msg = rf"[grey50]\[{record.name}][/] {self.format(record)}"
|
||||
|
||||
if record.levelno == ERROR:
|
||||
msg = f"[bold red]{self.__add_prefix_space('ERROR')}[/bold red] {msg}"
|
||||
elif record.levelno == FATAL:
|
||||
msg = f"[bold red]{self.__add_prefix_space('FATAL')}[/bold red] {msg}"
|
||||
elif record.levelno == WARNING:
|
||||
msg = f"[yellow]{self.__add_prefix_space('WARNING')}[/yellow] {msg}"
|
||||
elif record.levelno == INFO_ALERT:
|
||||
msg = f"[blue]{self.__add_prefix_space('INFO ALERT')}[/blue] {msg}"
|
||||
elif record.levelno == LOW_ALERT:
|
||||
msg = f"[yellow]{self.__add_prefix_space('LOW ALERT')}[/yellow] {msg}"
|
||||
elif record.levelno == MEDIUM_ALERT:
|
||||
msg = f"[sandy_brown]{self.__add_prefix_space('MEDIUM ALERT')}[/sandy_brown] {msg}"
|
||||
elif record.levelno == HIGH_ALERT:
|
||||
msg = f"[red]{self.__add_prefix_space('HIGH ALERT')}[/red] {msg}"
|
||||
elif record.levelno == CRITICAL_ALERT:
|
||||
msg = f"[bold red]{self.__add_prefix_space('CRITICAL ALERT')}[/bold red] {msg}"
|
||||
else:
|
||||
msg = f"{self.__add_prefix_space('')} {msg}"
|
||||
|
||||
self.console.print(msg)
|
||||
|
||||
except Exception:
|
||||
self.handleError(record)
|
||||
@@ -12,74 +12,85 @@ from .updates import IndicatorsUpdates, MVTUpdates
|
||||
from .version import MVT_VERSION
|
||||
|
||||
|
||||
def check_updates() -> None:
|
||||
def check_updates(
|
||||
disable_version_check: bool = False, disable_indicator_check: bool = False
|
||||
) -> None:
|
||||
log = logging.getLogger("mvt")
|
||||
|
||||
# First we check for MVT version updates.
|
||||
try:
|
||||
mvt_updates = MVTUpdates()
|
||||
latest_version = mvt_updates.check()
|
||||
except (requests.exceptions.ConnectionError, requests.exceptions.Timeout):
|
||||
rich_print(
|
||||
"\t\t[bold]Note: Could not check for MVT updates.[/bold] "
|
||||
"You may be working offline. Please update MVT regularly."
|
||||
)
|
||||
except Exception as e:
|
||||
log.error("Error encountered when trying to check latest MVT version: %s", e)
|
||||
else:
|
||||
if latest_version:
|
||||
if not disable_version_check:
|
||||
try:
|
||||
mvt_updates = MVTUpdates()
|
||||
latest_version = mvt_updates.check()
|
||||
except (requests.exceptions.ConnectionError, requests.exceptions.Timeout):
|
||||
rich_print(
|
||||
f"\t\t[bold]Version {latest_version} is available! "
|
||||
"Upgrade mvt with `pip3 install -U mvt`[/bold]"
|
||||
"\t\t[bold]Note: Could not check for MVT updates.[/bold] "
|
||||
"You may be working offline. Please update MVT regularly."
|
||||
)
|
||||
|
||||
# Then we check for indicators files updates.
|
||||
ioc_updates = IndicatorsUpdates()
|
||||
|
||||
# Before proceeding, we check if we have downloaded an indicators index.
|
||||
# If not, there's no point in proceeding with the updates check.
|
||||
if ioc_updates.get_latest_update() == 0:
|
||||
rich_print(
|
||||
"\t\t[bold]You have not yet downloaded any indicators, check "
|
||||
"the `download-iocs` command![/bold]"
|
||||
)
|
||||
return
|
||||
|
||||
# We only perform this check at a fixed frequency, in order to not
|
||||
# overburden the user with too many lookups if the command is being run
|
||||
# multiple times.
|
||||
should_check, hours = ioc_updates.should_check()
|
||||
if not should_check:
|
||||
rich_print(
|
||||
f"\t\tIndicators updates checked recently, next automatic check "
|
||||
f"in {int(hours)} hours"
|
||||
)
|
||||
return
|
||||
|
||||
try:
|
||||
ioc_to_update = ioc_updates.check()
|
||||
except (requests.exceptions.ConnectionError, requests.exceptions.Timeout):
|
||||
rich_print(
|
||||
"\t\t[bold]Note: Could not check for indicator updates.[/bold] "
|
||||
"You may be working offline. Please update MVT indicators regularly."
|
||||
)
|
||||
except Exception as e:
|
||||
log.error("Error encountered when trying to check latest MVT indicators: %s", e)
|
||||
else:
|
||||
if ioc_to_update:
|
||||
rich_print(
|
||||
"\t\t[bold]There are updates to your indicators files! "
|
||||
"Run the `download-iocs` command to update![/bold]"
|
||||
except Exception as e:
|
||||
log.error(
|
||||
"Error encountered when trying to check latest MVT version: %s", e
|
||||
)
|
||||
else:
|
||||
rich_print("\t\tYour indicators files seem to be up to date.")
|
||||
if latest_version:
|
||||
rich_print(
|
||||
f"\t\t[bold]Version {latest_version} is available! "
|
||||
"Upgrade mvt with `pip3 install -U mvt` or with `pipx upgrade mvt`[/bold]"
|
||||
)
|
||||
|
||||
# Then we check for indicators files updates.
|
||||
if not disable_indicator_check:
|
||||
ioc_updates = IndicatorsUpdates()
|
||||
|
||||
# Before proceeding, we check if we have downloaded an indicators index.
|
||||
# If not, there's no point in proceeding with the updates check.
|
||||
if ioc_updates.get_latest_update() == 0:
|
||||
rich_print(
|
||||
"\t\t[bold]You have not yet downloaded any indicators, check "
|
||||
"the `download-iocs` command![/bold]"
|
||||
)
|
||||
return
|
||||
|
||||
# We only perform this check at a fixed frequency, in order to not
|
||||
# overburden the user with too many lookups if the command is being run
|
||||
# multiple times.
|
||||
should_check, hours = ioc_updates.should_check()
|
||||
if not should_check:
|
||||
rich_print(
|
||||
f"\t\tIndicators updates checked recently, next automatic check "
|
||||
f"in {int(hours)} hours"
|
||||
)
|
||||
return
|
||||
|
||||
try:
|
||||
ioc_to_update = ioc_updates.check()
|
||||
except (requests.exceptions.ConnectionError, requests.exceptions.Timeout):
|
||||
rich_print(
|
||||
"\t\t[bold]Note: Could not check for indicator updates.[/bold] "
|
||||
"You may be working offline. Please update MVT indicators regularly."
|
||||
)
|
||||
except Exception as e:
|
||||
log.error(
|
||||
"Error encountered when trying to check latest MVT indicators: %s", e
|
||||
)
|
||||
else:
|
||||
if ioc_to_update:
|
||||
rich_print(
|
||||
"\t\t[bold]There are updates to your indicators files! "
|
||||
"Run the `download-iocs` command to update![/bold]"
|
||||
)
|
||||
else:
|
||||
rich_print("\t\tYour indicators files seem to be up to date.")
|
||||
|
||||
|
||||
def logo() -> None:
|
||||
def logo(
|
||||
disable_version_check: bool = False, disable_indicator_check: bool = False
|
||||
) -> None:
|
||||
rich_print("\n")
|
||||
rich_print("\t[bold]MVT[/bold] - Mobile Verification Toolkit")
|
||||
rich_print("\t\thttps://mvt.re")
|
||||
rich_print(f"\t\tVersion: {MVT_VERSION}")
|
||||
|
||||
check_updates()
|
||||
check_updates(disable_version_check, disable_indicator_check)
|
||||
|
||||
rich_print("\n")
|
||||
|
||||
@@ -8,18 +8,9 @@ import json
|
||||
import logging
|
||||
import os
|
||||
import re
|
||||
from dataclasses import asdict, is_dataclass
|
||||
from typing import Any, Dict, Optional
|
||||
from typing import Any, Dict, List, Optional, Union
|
||||
|
||||
from .utils import CustomJSONEncoder, exec_or_profile
|
||||
from .indicators import Indicators
|
||||
from .alerts import AlertStore
|
||||
from .module_types import (
|
||||
ModuleResults,
|
||||
ModuleTimeline,
|
||||
ModuleSerializedResult,
|
||||
ModuleAtomicResult,
|
||||
)
|
||||
|
||||
|
||||
class DatabaseNotFoundError(Exception):
|
||||
@@ -37,7 +28,7 @@ class InsufficientPrivileges(Exception):
|
||||
class MVTModule:
|
||||
"""This class provides a base for all extraction modules."""
|
||||
|
||||
enabled: bool = True
|
||||
enabled = True
|
||||
slug: Optional[str] = None
|
||||
|
||||
def __init__(
|
||||
@@ -47,7 +38,7 @@ class MVTModule:
|
||||
results_path: Optional[str] = None,
|
||||
module_options: Optional[Dict[str, Any]] = None,
|
||||
log: logging.Logger = logging.getLogger(__name__),
|
||||
results: ModuleResults = [],
|
||||
results: Union[List[Dict[str, Any]], Dict[str, Any], None] = None,
|
||||
) -> None:
|
||||
"""Initialize module.
|
||||
|
||||
@@ -55,7 +46,7 @@ class MVTModule:
|
||||
:type file_path: str
|
||||
:param target_path: Path to the target folder (backup or filesystem
|
||||
dump)
|
||||
:type target_path: str
|
||||
:type file_path: str
|
||||
:param results_path: Folder where results will be stored
|
||||
:type results_path: str
|
||||
:param fast_mode: Flag to enable or disable slow modules
|
||||
@@ -64,32 +55,32 @@ class MVTModule:
|
||||
:param results: Provided list of results entries
|
||||
:type results: list
|
||||
"""
|
||||
self.file_path: Optional[str] = file_path
|
||||
self.target_path: Optional[str] = target_path
|
||||
self.results_path: Optional[str] = results_path
|
||||
self.module_options: Optional[Dict[str, Any]] = (
|
||||
module_options if module_options else {}
|
||||
)
|
||||
|
||||
self.file_path = file_path
|
||||
self.target_path = target_path
|
||||
self.results_path = results_path
|
||||
self.module_options = module_options if module_options else {}
|
||||
self.log = log
|
||||
self.indicators: Optional[Indicators] = None
|
||||
self.alertstore: AlertStore = AlertStore(log=log)
|
||||
|
||||
self.results: ModuleResults = results if results else []
|
||||
self.timeline: ModuleTimeline = []
|
||||
self.timeline_detected: ModuleTimeline = []
|
||||
self.indicators = None
|
||||
self.results = results if results else []
|
||||
self.detected: List[Dict[str, Any]] = []
|
||||
self.timeline: List[Dict[str, str]] = []
|
||||
self.timeline_detected: List[Dict[str, str]] = []
|
||||
|
||||
@classmethod
|
||||
def from_json(cls, json_path: str, log: logging.Logger):
|
||||
with open(json_path, "r", encoding="utf-8") as handle:
|
||||
results = json.load(handle)
|
||||
if log:
|
||||
log.info('Loaded %d results from "%s"', len(results), json_path)
|
||||
|
||||
return cls(results=results, log=log)
|
||||
try:
|
||||
results = json.load(handle)
|
||||
if log:
|
||||
log.info('Loaded %d results from "%s"', len(results), json_path)
|
||||
return cls(results=results, log=log)
|
||||
except json.decoder.JSONDecodeError as err:
|
||||
log.error('Error to decode the json "%s" file: "%s"', json_path, err)
|
||||
return None
|
||||
|
||||
@classmethod
|
||||
def get_slug(cls) -> str:
|
||||
"""Use the module's class name to retrieve a slug"""
|
||||
if cls.slug:
|
||||
return cls.slug
|
||||
|
||||
@@ -97,26 +88,26 @@ class MVTModule:
|
||||
return re.sub("([a-z0-9])([A-Z])", r"\1_\2", sub).lower()
|
||||
|
||||
def check_indicators(self) -> None:
|
||||
"""Check the results of this module against a provided list of
|
||||
indicators.
|
||||
|
||||
|
||||
"""
|
||||
raise NotImplementedError
|
||||
|
||||
def save_to_json(self) -> None:
|
||||
"""Save the collected results to a json file."""
|
||||
if not self.results_path:
|
||||
return
|
||||
|
||||
name = self.get_slug()
|
||||
|
||||
if self.results:
|
||||
converted_results = [
|
||||
asdict(result) if is_dataclass(result) else result
|
||||
for result in self.results
|
||||
]
|
||||
results_file_name = f"{name}.json"
|
||||
results_json_path = os.path.join(self.results_path, results_file_name)
|
||||
with open(results_json_path, "w", encoding="utf-8") as handle:
|
||||
try:
|
||||
json.dump(
|
||||
converted_results, handle, indent=4, cls=CustomJSONEncoder
|
||||
)
|
||||
json.dump(self.results, handle, indent=4, cls=CustomJSONEncoder)
|
||||
except Exception as exc:
|
||||
self.log.error(
|
||||
"Unable to store results of module %s to file %s: %s",
|
||||
@@ -125,15 +116,13 @@ class MVTModule:
|
||||
exc,
|
||||
)
|
||||
|
||||
if self.alertstore.alerts:
|
||||
if self.detected:
|
||||
detected_file_name = f"{name}_detected.json"
|
||||
detected_json_path = os.path.join(self.results_path, detected_file_name)
|
||||
with open(detected_json_path, "w", encoding="utf-8") as handle:
|
||||
json.dump(
|
||||
self.alertstore.alerts, handle, indent=4, cls=CustomJSONEncoder
|
||||
)
|
||||
json.dump(self.detected, handle, indent=4, cls=CustomJSONEncoder)
|
||||
|
||||
def serialize(self, result: ModuleAtomicResult) -> ModuleSerializedResult:
|
||||
def serialize(self, record: dict) -> Union[dict, list, None]:
|
||||
raise NotImplementedError
|
||||
|
||||
@staticmethod
|
||||
@@ -145,38 +134,30 @@ class MVTModule:
|
||||
"""
|
||||
timeline_set = set()
|
||||
for record in timeline:
|
||||
timeline_set.add(
|
||||
json.dumps(
|
||||
asdict(record) if is_dataclass(record) else record, sort_keys=True
|
||||
)
|
||||
)
|
||||
|
||||
timeline_set.add(json.dumps(record, sort_keys=True))
|
||||
return [json.loads(record) for record in timeline_set]
|
||||
|
||||
def to_timeline(self) -> None:
|
||||
"""Convert results into a timeline."""
|
||||
if not self.results:
|
||||
return
|
||||
|
||||
for result in self.results:
|
||||
record: ModuleSerializedResult = self.serialize(result)
|
||||
record = self.serialize(result)
|
||||
if record:
|
||||
if isinstance(record, list):
|
||||
self.timeline.extend(record)
|
||||
else:
|
||||
self.timeline.append(record)
|
||||
|
||||
# for detected in self.alertstore.alerts:
|
||||
# record = self.serialize(detected)
|
||||
# if record:
|
||||
# if isinstance(record, list):
|
||||
# self.timeline_detected.extend(record)
|
||||
# else:
|
||||
# self.timeline_detected.append(record)
|
||||
for detected in self.detected:
|
||||
record = self.serialize(detected)
|
||||
if record:
|
||||
if isinstance(record, list):
|
||||
self.timeline_detected.extend(record)
|
||||
else:
|
||||
self.timeline_detected.append(record)
|
||||
|
||||
# De-duplicate timeline entries.
|
||||
self.timeline = self._deduplicate_timeline(self.timeline)
|
||||
# self.timeline_detected = self._deduplicate_timeline(self.timeline_detected)
|
||||
self.timeline_detected = self._deduplicate_timeline(self.timeline_detected)
|
||||
|
||||
def run(self) -> None:
|
||||
"""Run the main module procedure."""
|
||||
@@ -231,7 +212,7 @@ def run_module(module: MVTModule) -> None:
|
||||
)
|
||||
|
||||
else:
|
||||
if module.indicators and not module.alertstore.alerts:
|
||||
if module.indicators and not module.detected:
|
||||
module.log.info(
|
||||
"The %s module produced no detections!", module.__class__.__name__
|
||||
)
|
||||
|
||||
@@ -1,29 +0,0 @@
|
||||
# Mobile Verification Toolkit (MVT)
|
||||
# Copyright (c) 2021-2025 The MVT Authors.
|
||||
# Use of this software is governed by the MVT License 1.1 that can be found at
|
||||
# https://license.mvt.re/1.1/
|
||||
|
||||
from .indicators import Indicator
|
||||
from dataclasses import dataclass
|
||||
from typing import List, Union, Optional
|
||||
|
||||
|
||||
@dataclass
|
||||
class ModuleAtomicResult:
|
||||
timestamp: Optional[str]
|
||||
matched_indicator: Optional[Indicator]
|
||||
|
||||
|
||||
ModuleResults = List[ModuleAtomicResult]
|
||||
|
||||
|
||||
@dataclass
|
||||
class ModuleAtomicTimeline:
|
||||
timestamp: str
|
||||
module: str
|
||||
event: str
|
||||
data: str
|
||||
|
||||
|
||||
ModuleTimeline = List[ModuleAtomicTimeline]
|
||||
ModuleSerializedResult = Union[ModuleAtomicTimeline, ModuleTimeline]
|
||||
@@ -24,7 +24,11 @@ INDICATORS_CHECK_FREQUENCY = 12
|
||||
|
||||
class MVTUpdates:
|
||||
def check(self) -> str:
|
||||
res = requests.get(settings.PYPI_UPDATE_URL, timeout=15)
|
||||
try:
|
||||
res = requests.get(settings.PYPI_UPDATE_URL, timeout=5)
|
||||
except requests.exceptions.RequestException as e:
|
||||
log.error("Failed to check for updates, skipping updates: %s", e)
|
||||
return ""
|
||||
data = res.json()
|
||||
latest_version = data.get("info", {}).get("version", "")
|
||||
|
||||
@@ -93,7 +97,12 @@ class IndicatorsUpdates:
|
||||
url = self.github_raw_url.format(
|
||||
self.index_owner, self.index_repo, self.index_branch, self.index_path
|
||||
)
|
||||
res = requests.get(url, timeout=15)
|
||||
try:
|
||||
res = requests.get(url, timeout=5)
|
||||
except requests.exceptions.RequestException as e:
|
||||
log.error("Failed to retrieve indicators index from %s: %s", url, e)
|
||||
return None
|
||||
|
||||
if res.status_code != 200:
|
||||
log.error(
|
||||
"Failed to retrieve indicators index located at %s (error %d)",
|
||||
@@ -105,7 +114,12 @@ class IndicatorsUpdates:
|
||||
return yaml.safe_load(res.content)
|
||||
|
||||
def download_remote_ioc(self, ioc_url: str) -> Optional[str]:
|
||||
res = requests.get(ioc_url, timeout=15)
|
||||
try:
|
||||
res = requests.get(ioc_url, timeout=15)
|
||||
except requests.exceptions.RequestException as e:
|
||||
log.error("Failed to download indicators file from %s: %s", ioc_url, e)
|
||||
return None
|
||||
|
||||
if res.status_code != 200:
|
||||
log.error(
|
||||
"Failed to download indicators file from %s (error %d)",
|
||||
@@ -171,7 +185,12 @@ class IndicatorsUpdates:
|
||||
file_commit_url = (
|
||||
f"https://api.github.com/repos/{owner}/{repo}/commits?path={path}"
|
||||
)
|
||||
res = requests.get(file_commit_url, timeout=15)
|
||||
try:
|
||||
res = requests.get(file_commit_url, timeout=5)
|
||||
except requests.exceptions.RequestException as e:
|
||||
log.error("Failed to get details about file %s: %s", file_commit_url, e)
|
||||
return -1
|
||||
|
||||
if res.status_code != 200:
|
||||
log.error(
|
||||
"Failed to get details about file %s (error %d)",
|
||||
|
||||
@@ -12,7 +12,7 @@ import os
|
||||
import re
|
||||
from typing import Any, Iterator, Union
|
||||
|
||||
from .log import MVTLogHandler
|
||||
from rich.logging import RichHandler
|
||||
from mvt.common.config import settings
|
||||
|
||||
|
||||
@@ -234,10 +234,11 @@ def init_logging(verbose: bool = False):
|
||||
"""
|
||||
Initialise logging for the MVT module
|
||||
"""
|
||||
# Setup logging using Rich.
|
||||
log = logging.getLogger("mvt")
|
||||
log.setLevel(logging.INFO)
|
||||
consoleHandler = MVTLogHandler()
|
||||
consoleHandler.setFormatter(logging.Formatter("%(message)s"))
|
||||
log.setLevel(logging.DEBUG)
|
||||
consoleHandler = RichHandler(show_path=False, log_time_format="%X")
|
||||
consoleHandler.setFormatter(logging.Formatter("[%(name)s] %(message)s"))
|
||||
if verbose:
|
||||
consoleHandler.setLevel(logging.DEBUG)
|
||||
else:
|
||||
|
||||
@@ -3,4 +3,4 @@
|
||||
# Use of this software is governed by the MVT License 1.1 that can be found at
|
||||
# https://license.mvt.re/1.1/
|
||||
|
||||
MVT_VERSION = "3.0.0"
|
||||
MVT_VERSION = "2.7.0"
|
||||
|
||||
52
src/mvt/common/virustotal.py
Normal file
52
src/mvt/common/virustotal.py
Normal file
@@ -0,0 +1,52 @@
|
||||
# Mobile Verification Toolkit (MVT)
|
||||
# Copyright (c) 2021-2023 The MVT Authors.
|
||||
# Use of this software is governed by the MVT License 1.1 that can be found at
|
||||
# https://license.mvt.re/1.1/
|
||||
|
||||
import logging
|
||||
import os
|
||||
|
||||
import requests
|
||||
|
||||
log = logging.getLogger(__name__)
|
||||
|
||||
MVT_VT_API_KEY = "MVT_VT_API_KEY"
|
||||
|
||||
|
||||
class VTNoKey(Exception):
|
||||
pass
|
||||
|
||||
|
||||
class VTQuotaExceeded(Exception):
|
||||
pass
|
||||
|
||||
|
||||
def virustotal_lookup(file_hash: str):
|
||||
if MVT_VT_API_KEY not in os.environ:
|
||||
raise VTNoKey(
|
||||
"No VirusTotal API key provided: to use VirusTotal "
|
||||
"lookups please provide your API key with "
|
||||
"`export MVT_VT_API_KEY=<key>`"
|
||||
)
|
||||
|
||||
headers = {
|
||||
"User-Agent": "VirusTotal",
|
||||
"Content-Type": "application/json",
|
||||
"x-apikey": os.environ[MVT_VT_API_KEY],
|
||||
}
|
||||
res = requests.get(
|
||||
f"https://www.virustotal.com/api/v3/files/{file_hash}", headers=headers
|
||||
)
|
||||
|
||||
if res.status_code == 200:
|
||||
report = res.json()
|
||||
return report["data"]
|
||||
|
||||
if res.status_code == 404:
|
||||
log.info("Could not find results for file with hash %s", file_hash)
|
||||
elif res.status_code == 429:
|
||||
raise VTQuotaExceeded("You have exceeded the quota for your VirusTotal API key")
|
||||
else:
|
||||
raise Exception(f"Unexpected response from VirusTotal: {res.status_code}")
|
||||
|
||||
return None
|
||||
@@ -37,6 +37,8 @@ from mvt.common.help import (
|
||||
HELP_MSG_CHECK_IOCS,
|
||||
HELP_MSG_STIX2,
|
||||
HELP_MSG_CHECK_IOS_BACKUP,
|
||||
HELP_MSG_DISABLE_UPDATE_CHECK,
|
||||
HELP_MSG_DISABLE_INDICATOR_UPDATE_CHECK,
|
||||
)
|
||||
from .cmd_check_backup import CmdIOSCheckBackup
|
||||
from .cmd_check_fs import CmdIOSCheckFS
|
||||
@@ -53,12 +55,37 @@ MVT_IOS_BACKUP_PASSWORD = "MVT_IOS_BACKUP_PASSWORD"
|
||||
CONTEXT_SETTINGS = dict(help_option_names=["-h", "--help"])
|
||||
|
||||
|
||||
def _get_disable_flags(ctx):
|
||||
"""Helper function to safely get disable flags from context."""
|
||||
if ctx.obj is None:
|
||||
return False, False
|
||||
return (
|
||||
ctx.obj.get("disable_version_check", False),
|
||||
ctx.obj.get("disable_indicator_check", False),
|
||||
)
|
||||
|
||||
|
||||
# ==============================================================================
|
||||
# Main
|
||||
# ==============================================================================
|
||||
@click.group(invoke_without_command=False)
|
||||
def cli():
|
||||
logo()
|
||||
@click.option(
|
||||
"--disable-update-check", is_flag=True, help=HELP_MSG_DISABLE_UPDATE_CHECK
|
||||
)
|
||||
@click.option(
|
||||
"--disable-indicator-update-check",
|
||||
is_flag=True,
|
||||
help=HELP_MSG_DISABLE_INDICATOR_UPDATE_CHECK,
|
||||
)
|
||||
@click.pass_context
|
||||
def cli(ctx, disable_update_check, disable_indicator_update_check):
|
||||
ctx.ensure_object(dict)
|
||||
ctx.obj["disable_version_check"] = disable_update_check
|
||||
ctx.obj["disable_indicator_check"] = disable_indicator_update_check
|
||||
logo(
|
||||
disable_version_check=disable_update_check,
|
||||
disable_indicator_check=disable_indicator_update_check,
|
||||
)
|
||||
|
||||
|
||||
# ==============================================================================
|
||||
@@ -219,6 +246,8 @@ def check_backup(
|
||||
module_name=module,
|
||||
module_options=module_options,
|
||||
hashes=hashes,
|
||||
disable_version_check=_get_disable_flags(ctx)[0],
|
||||
disable_indicator_check=_get_disable_flags(ctx)[1],
|
||||
)
|
||||
|
||||
if list_modules:
|
||||
@@ -228,8 +257,11 @@ def check_backup(
|
||||
log.info("Checking iTunes backup located at: %s", backup_path)
|
||||
|
||||
cmd.run()
|
||||
cmd.show_alerts_brief()
|
||||
cmd.show_support_message()
|
||||
|
||||
if cmd.detected_count > 0:
|
||||
log.warning(
|
||||
"The analysis of the backup produced %d detections!", cmd.detected_count
|
||||
)
|
||||
|
||||
|
||||
# ==============================================================================
|
||||
@@ -263,6 +295,8 @@ def check_fs(ctx, iocs, output, fast, list_modules, module, hashes, verbose, dum
|
||||
module_name=module,
|
||||
module_options=module_options,
|
||||
hashes=hashes,
|
||||
disable_version_check=_get_disable_flags(ctx)[0],
|
||||
disable_indicator_check=_get_disable_flags(ctx)[1],
|
||||
)
|
||||
|
||||
if list_modules:
|
||||
@@ -272,8 +306,12 @@ def check_fs(ctx, iocs, output, fast, list_modules, module, hashes, verbose, dum
|
||||
log.info("Checking iOS filesystem located at: %s", dump_path)
|
||||
|
||||
cmd.run()
|
||||
cmd.show_alerts_brief()
|
||||
cmd.show_support_message()
|
||||
|
||||
if cmd.detected_count > 0:
|
||||
log.warning(
|
||||
"The analysis of the iOS filesystem produced %d detections!",
|
||||
cmd.detected_count,
|
||||
)
|
||||
|
||||
|
||||
# ==============================================================================
|
||||
@@ -293,7 +331,13 @@ def check_fs(ctx, iocs, output, fast, list_modules, module, hashes, verbose, dum
|
||||
@click.argument("FOLDER", type=click.Path(exists=True))
|
||||
@click.pass_context
|
||||
def check_iocs(ctx, iocs, list_modules, module, folder):
|
||||
cmd = CmdCheckIOCS(target_path=folder, ioc_files=iocs, module_name=module)
|
||||
cmd = CmdCheckIOCS(
|
||||
target_path=folder,
|
||||
ioc_files=iocs,
|
||||
module_name=module,
|
||||
disable_version_check=_get_disable_flags(ctx)[0],
|
||||
disable_indicator_check=_get_disable_flags(ctx)[1],
|
||||
)
|
||||
cmd.modules = BACKUP_MODULES + FS_MODULES + MIXED_MODULES
|
||||
|
||||
if list_modules:
|
||||
@@ -301,8 +345,6 @@ def check_iocs(ctx, iocs, list_modules, module, folder):
|
||||
return
|
||||
|
||||
cmd.run()
|
||||
cmd.show_alerts_brief()
|
||||
cmd.show_support_message()
|
||||
|
||||
|
||||
# ==============================================================================
|
||||
|
||||
@@ -27,6 +27,8 @@ class CmdIOSCheckBackup(Command):
|
||||
module_options: Optional[dict] = None,
|
||||
hashes: bool = False,
|
||||
sub_command: bool = False,
|
||||
disable_version_check: bool = False,
|
||||
disable_indicator_check: bool = False,
|
||||
) -> None:
|
||||
super().__init__(
|
||||
target_path=target_path,
|
||||
@@ -39,6 +41,8 @@ class CmdIOSCheckBackup(Command):
|
||||
hashes=hashes,
|
||||
sub_command=sub_command,
|
||||
log=log,
|
||||
disable_version_check=disable_version_check,
|
||||
disable_indicator_check=disable_indicator_check,
|
||||
)
|
||||
|
||||
self.name = "check-backup"
|
||||
|
||||
@@ -27,6 +27,8 @@ class CmdIOSCheckFS(Command):
|
||||
module_options: Optional[dict] = None,
|
||||
hashes: bool = False,
|
||||
sub_command: bool = False,
|
||||
disable_version_check: bool = False,
|
||||
disable_indicator_check: bool = False,
|
||||
) -> None:
|
||||
super().__init__(
|
||||
target_path=target_path,
|
||||
@@ -34,11 +36,12 @@ class CmdIOSCheckFS(Command):
|
||||
ioc_files=ioc_files,
|
||||
iocs=iocs,
|
||||
module_name=module_name,
|
||||
serial=serial,
|
||||
module_options=module_options,
|
||||
hashes=hashes,
|
||||
sub_command=sub_command,
|
||||
log=log,
|
||||
disable_version_check=disable_version_check,
|
||||
disable_indicator_check=disable_indicator_check,
|
||||
)
|
||||
|
||||
self.name = "check-fs"
|
||||
|
||||
@@ -194,5 +194,41 @@
|
||||
{
|
||||
"identifier": "iPhone16,2",
|
||||
"description": "iPhone 15 Pro Max"
|
||||
},
|
||||
{
|
||||
"identifier": "iPhone17,1",
|
||||
"description": "iPhone 16 Pro"
|
||||
},
|
||||
{
|
||||
"identifier": "iPhone17,2",
|
||||
"description": "iPhone 16 Pro Max"
|
||||
},
|
||||
{
|
||||
"identifier": "iPhone17,3",
|
||||
"description": "iPhone 16"
|
||||
},
|
||||
{
|
||||
"identifier": "iPhone17,4",
|
||||
"description": "iPhone 16 Plus"
|
||||
},
|
||||
{
|
||||
"identifier": "iPhone17,5",
|
||||
"description": "iPhone 16e"
|
||||
},
|
||||
{
|
||||
"identifier": "iPhone18,1",
|
||||
"description": "iPhone 17 Pro"
|
||||
},
|
||||
{
|
||||
"identifier": "iPhone18,2",
|
||||
"description": "iPhone 17 Pro Max"
|
||||
},
|
||||
{
|
||||
"identifier": "iPhone18,3",
|
||||
"description": "iPhone 17"
|
||||
},
|
||||
{
|
||||
"identifier": "iPhone18,4",
|
||||
"description": "iPhone Air"
|
||||
}
|
||||
]
|
||||
|
||||
@@ -631,6 +631,10 @@
|
||||
"build": "16H81",
|
||||
"version": "12.5.7"
|
||||
},
|
||||
{
|
||||
"version": "12.5.8",
|
||||
"build": "16H88"
|
||||
},
|
||||
{
|
||||
"build": "17A577",
|
||||
"version": "13.0"
|
||||
@@ -891,6 +895,18 @@
|
||||
"version": "15.8.2",
|
||||
"build": "19H384"
|
||||
},
|
||||
{
|
||||
"version": "15.8.4",
|
||||
"build": "19H390"
|
||||
},
|
||||
{
|
||||
"version": "15.8.5",
|
||||
"build": "19H394"
|
||||
},
|
||||
{
|
||||
"version": "15.8.6",
|
||||
"build": "19H402"
|
||||
},
|
||||
{
|
||||
"build": "20A362",
|
||||
"version": "16.0"
|
||||
@@ -992,6 +1008,18 @@
|
||||
"version": "16.7.8",
|
||||
"build": "20H343"
|
||||
},
|
||||
{
|
||||
"version": "16.7.11",
|
||||
"build": "20H360"
|
||||
},
|
||||
{
|
||||
"version": "16.7.12",
|
||||
"build": "20H364"
|
||||
},
|
||||
{
|
||||
"version": "16.7.14",
|
||||
"build": "20H370"
|
||||
},
|
||||
{
|
||||
"version": "17.0",
|
||||
"build": "21A327"
|
||||
@@ -1076,6 +1104,10 @@
|
||||
"version": "17.6.1",
|
||||
"build": "21G101"
|
||||
},
|
||||
{
|
||||
"version": "17.7.7",
|
||||
"build": "21H433"
|
||||
},
|
||||
{
|
||||
"version": "18",
|
||||
"build": "22A3354"
|
||||
@@ -1103,5 +1135,77 @@
|
||||
{
|
||||
"version": "18.3",
|
||||
"build": "22D63"
|
||||
},
|
||||
{
|
||||
"version": "18.3.1",
|
||||
"build": "22D72"
|
||||
},
|
||||
{
|
||||
"version": "18.4",
|
||||
"build": "22E240"
|
||||
},
|
||||
{
|
||||
"version": "18.4.1",
|
||||
"build": "22E252"
|
||||
},
|
||||
{
|
||||
"version": "18.5",
|
||||
"build": "22F76"
|
||||
},
|
||||
{
|
||||
"version": "18.6",
|
||||
"build": "22G86"
|
||||
},
|
||||
{
|
||||
"version": "18.6.1",
|
||||
"build": "22G90"
|
||||
},
|
||||
{
|
||||
"version": "18.6.2",
|
||||
"build": "22G100"
|
||||
},
|
||||
{
|
||||
"version": "18.7",
|
||||
"build": "22H20"
|
||||
},
|
||||
{
|
||||
"version": "18.7.2",
|
||||
"build": "22H124"
|
||||
},
|
||||
{
|
||||
"version": "18.7.3",
|
||||
"build": "22H217"
|
||||
},
|
||||
{
|
||||
"version": "18.7.4",
|
||||
"build": "22H218"
|
||||
},
|
||||
{
|
||||
"version": "18.7.5",
|
||||
"build": "22H311"
|
||||
},
|
||||
{
|
||||
"version": "26",
|
||||
"build": "23A341"
|
||||
},
|
||||
{
|
||||
"version": "26.0.1",
|
||||
"build": "23A355"
|
||||
},
|
||||
{
|
||||
"version": "26.1",
|
||||
"build": "23B85"
|
||||
},
|
||||
{
|
||||
"version": "26.2",
|
||||
"build": "23C55"
|
||||
},
|
||||
{
|
||||
"version": "26.2.1",
|
||||
"build": "23C71"
|
||||
},
|
||||
{
|
||||
"version": "26.3",
|
||||
"build": "23D127"
|
||||
}
|
||||
]
|
||||
@@ -9,7 +9,6 @@ import plistlib
|
||||
from typing import Optional
|
||||
|
||||
from mvt.common.module import DatabaseNotFoundError
|
||||
from mvt.common.module_types import ModuleResults
|
||||
from mvt.ios.versions import get_device_desc_from_id, is_ios_version_outdated
|
||||
|
||||
from ..base import IOSExtraction
|
||||
@@ -25,7 +24,7 @@ class BackupInfo(IOSExtraction):
|
||||
results_path: Optional[str] = None,
|
||||
module_options: Optional[dict] = None,
|
||||
log: logging.Logger = logging.getLogger(__name__),
|
||||
results: ModuleResults = [],
|
||||
results: Optional[list] = None,
|
||||
) -> None:
|
||||
super().__init__(
|
||||
file_path=file_path,
|
||||
|
||||
@@ -7,14 +7,9 @@ import logging
|
||||
import os
|
||||
import plistlib
|
||||
from base64 import b64encode
|
||||
from typing import Optional
|
||||
from typing import Optional, Union
|
||||
|
||||
from mvt.common.utils import convert_datetime_to_iso
|
||||
from mvt.common.module_types import (
|
||||
ModuleAtomicResult,
|
||||
ModuleSerializedResult,
|
||||
ModuleResults,
|
||||
)
|
||||
|
||||
from ..base import IOSExtraction
|
||||
|
||||
@@ -33,7 +28,7 @@ class ConfigurationProfiles(IOSExtraction):
|
||||
results_path: Optional[str] = None,
|
||||
module_options: Optional[dict] = None,
|
||||
log: logging.Logger = logging.getLogger(__name__),
|
||||
results: ModuleResults = [],
|
||||
results: Optional[list] = None,
|
||||
) -> None:
|
||||
super().__init__(
|
||||
file_path=file_path,
|
||||
@@ -44,7 +39,7 @@ class ConfigurationProfiles(IOSExtraction):
|
||||
results=results,
|
||||
)
|
||||
|
||||
def serialize(self, record: ModuleAtomicResult) -> ModuleSerializedResult:
|
||||
def serialize(self, record: dict) -> Union[dict, list]:
|
||||
if not record["install_date"]:
|
||||
return {}
|
||||
|
||||
@@ -68,28 +63,28 @@ class ConfigurationProfiles(IOSExtraction):
|
||||
|
||||
# Alert on any known malicious configuration profiles in the
|
||||
# indicator list.
|
||||
ioc_match = self.indicators.check_profile(
|
||||
result["plist"]["PayloadUUID"]
|
||||
)
|
||||
if ioc_match:
|
||||
warning_message = (
|
||||
f'Found a known malicious configuration profile "{result["plist"]["PayloadDisplayName"]}" with UUID "{result["plist"]["PayloadUUID"]}"',
|
||||
ioc = self.indicators.check_profile(result["plist"]["PayloadUUID"])
|
||||
if ioc:
|
||||
self.log.warning(
|
||||
"Found a known malicious configuration "
|
||||
'profile "%s" with UUID %s',
|
||||
result["plist"]["PayloadDisplayName"],
|
||||
result["plist"]["PayloadUUID"],
|
||||
)
|
||||
result["matched_indicator"] = ioc_match.ioc
|
||||
self.alertstore.critical(
|
||||
self.get_slug(), warning_message, "", result
|
||||
)
|
||||
self.alertstore.log_latest()
|
||||
result["matched_indicator"] = ioc
|
||||
self.detected.append(result)
|
||||
continue
|
||||
|
||||
# Highlight suspicious configuration profiles which may be used
|
||||
# to hide notifications.
|
||||
if payload_content["PayloadType"] in ["com.apple.notificationsettings"]:
|
||||
warning_message = (
|
||||
f'Found a potentially suspicious configuration profile "{result["plist"]["PayloadDisplayName"]}" with payload type {payload_content["PayloadType"]}',
|
||||
self.log.warning(
|
||||
"Found a potentially suspicious configuration profile "
|
||||
'"%s" with payload type %s',
|
||||
result["plist"]["PayloadDisplayName"],
|
||||
payload_content["PayloadType"],
|
||||
)
|
||||
self.alertstore.medum(self.get_slug(), warning_message, "", result)
|
||||
self.alertstore.log_latest()
|
||||
self.detected.append(result)
|
||||
continue
|
||||
|
||||
def run(self) -> None:
|
||||
|
||||
@@ -13,11 +13,6 @@ from typing import Optional
|
||||
from mvt.common.module import DatabaseNotFoundError
|
||||
from mvt.common.url import URL
|
||||
from mvt.common.utils import convert_datetime_to_iso, convert_unix_to_iso
|
||||
from mvt.common.module_types import (
|
||||
ModuleResults,
|
||||
ModuleAtomicResult,
|
||||
ModuleSerializedResult,
|
||||
)
|
||||
|
||||
from ..base import IOSExtraction
|
||||
|
||||
@@ -32,7 +27,7 @@ class Manifest(IOSExtraction):
|
||||
results_path: Optional[str] = None,
|
||||
module_options: Optional[dict] = None,
|
||||
log: logging.Logger = logging.getLogger(__name__),
|
||||
results: ModuleResults = [],
|
||||
results: Optional[list] = None,
|
||||
) -> None:
|
||||
super().__init__(
|
||||
file_path=file_path,
|
||||
@@ -65,7 +60,7 @@ class Manifest(IOSExtraction):
|
||||
|
||||
return convert_unix_to_iso(timestamp_or_unix_time_int)
|
||||
|
||||
def serialize(self, record: ModuleAtomicResult) -> ModuleSerializedResult:
|
||||
def serialize(self, record: dict) -> []:
|
||||
records = []
|
||||
if "modified" not in record or "status_changed" not in record:
|
||||
return records
|
||||
@@ -100,10 +95,8 @@ class Manifest(IOSExtraction):
|
||||
if not self.indicators:
|
||||
continue
|
||||
|
||||
ioc_match = self.indicators.check_file_path("/" + result["relative_path"])
|
||||
if ioc_match:
|
||||
result["matched_indicator"] = ioc_match.ioc
|
||||
self.alertstore.high(self.get_slug(), ioc_match.message, "", result)
|
||||
if self.indicators.check_file_path("/" + result["relative_path"]):
|
||||
self.detected.append(result)
|
||||
continue
|
||||
|
||||
rel_path = result["relative_path"].lower()
|
||||
@@ -114,15 +107,15 @@ class Manifest(IOSExtraction):
|
||||
except Exception:
|
||||
continue
|
||||
|
||||
ioc_match = self.indicators.check_url(part)
|
||||
if ioc_match:
|
||||
result["matched_indicator"] = ioc_match.ioc
|
||||
self.alertstore.high(
|
||||
self.get_slug(),
|
||||
f'Found mention of domain "{ioc_match.ioc.value}" in a backup file with path: {rel_path}',
|
||||
"",
|
||||
result,
|
||||
ioc = self.indicators.check_url(part)
|
||||
if ioc:
|
||||
self.log.warning(
|
||||
'Found mention of domain "%s" in a backup file with path: %s',
|
||||
ioc["value"],
|
||||
rel_path,
|
||||
)
|
||||
result["matched_indicator"] = ioc
|
||||
self.detected.append(result)
|
||||
|
||||
def run(self) -> None:
|
||||
manifest_db_path = os.path.join(self.target_path, "Manifest.db")
|
||||
|
||||
@@ -5,14 +5,9 @@
|
||||
|
||||
import logging
|
||||
import plistlib
|
||||
from typing import Optional
|
||||
from typing import Optional, Union
|
||||
|
||||
from mvt.common.utils import convert_datetime_to_iso
|
||||
from mvt.common.module_types import (
|
||||
ModuleAtomicResult,
|
||||
ModuleResults,
|
||||
ModuleSerializedResult,
|
||||
)
|
||||
|
||||
from ..base import IOSExtraction
|
||||
|
||||
@@ -34,7 +29,7 @@ class ProfileEvents(IOSExtraction):
|
||||
results_path: Optional[str] = None,
|
||||
module_options: Optional[dict] = None,
|
||||
log: logging.Logger = logging.getLogger(__name__),
|
||||
results: ModuleResults = [],
|
||||
results: Optional[list] = None,
|
||||
) -> None:
|
||||
super().__init__(
|
||||
file_path=file_path,
|
||||
@@ -45,7 +40,7 @@ class ProfileEvents(IOSExtraction):
|
||||
results=results,
|
||||
)
|
||||
|
||||
def serialize(self, record: ModuleAtomicResult) -> ModuleSerializedResult:
|
||||
def serialize(self, record: dict) -> Union[dict, list]:
|
||||
return {
|
||||
"timestamp": record.get("timestamp"),
|
||||
"module": self.__class__.__name__,
|
||||
@@ -56,27 +51,20 @@ class ProfileEvents(IOSExtraction):
|
||||
}
|
||||
|
||||
def check_indicators(self) -> None:
|
||||
for result in self.results:
|
||||
message = f'On {result.get("timestamp")} process "{result.get("process")}" started operation "{result.get("operation")}" of profile "{result.get("profile_id")}"'
|
||||
self.alertstore.low(
|
||||
self.get_slug(), message, result.get("timestamp"), result
|
||||
)
|
||||
self.alertstore.log_latest()
|
||||
|
||||
if not self.indicators:
|
||||
return
|
||||
|
||||
for result in self.results:
|
||||
ioc_match = self.indicators.check_process(result.get("process"))
|
||||
if ioc_match:
|
||||
result["matched_indicator"] = ioc_match.ioc
|
||||
self.alertstore.critical(self.get_slug(), ioc_match.message, "", result)
|
||||
ioc = self.indicators.check_process(result.get("process"))
|
||||
if ioc:
|
||||
result["matched_indicator"] = ioc
|
||||
self.detected.append(result)
|
||||
continue
|
||||
|
||||
ioc_match = self.indicators.check_profile(result.get("profile_id"))
|
||||
if ioc_match:
|
||||
result["matched_indicator"] = ioc_match.ioc
|
||||
self.alertstore.critical(self.get_slug(), ioc_match.message, "", result)
|
||||
ioc = self.indicators.check_profile(result.get("profile_id"))
|
||||
if ioc:
|
||||
result["matched_indicator"] = ioc
|
||||
self.detected.append(result)
|
||||
|
||||
@staticmethod
|
||||
def parse_profile_events(file_data: bytes) -> list:
|
||||
@@ -121,4 +109,13 @@ class ProfileEvents(IOSExtraction):
|
||||
with open(events_file_path, "rb") as handle:
|
||||
self.results.extend(self.parse_profile_events(handle.read()))
|
||||
|
||||
for result in self.results:
|
||||
self.log.info(
|
||||
'On %s process "%s" started operation "%s" of profile "%s"',
|
||||
result.get("timestamp"),
|
||||
result.get("process"),
|
||||
result.get("operation"),
|
||||
result.get("profile_id"),
|
||||
)
|
||||
|
||||
self.log.info("Extracted %d profile events", len(self.results))
|
||||
|
||||
@@ -11,8 +11,7 @@ import sqlite3
|
||||
import subprocess
|
||||
from typing import Iterator, Optional, Union
|
||||
|
||||
from mvt.common.module import DatabaseCorruptedError, DatabaseNotFoundError
|
||||
from mvt.common.module import MVTModule, ModuleResults
|
||||
from mvt.common.module import DatabaseCorruptedError, DatabaseNotFoundError, MVTModule
|
||||
|
||||
|
||||
class IOSExtraction(MVTModule):
|
||||
@@ -26,7 +25,7 @@ class IOSExtraction(MVTModule):
|
||||
results_path: Optional[str] = None,
|
||||
module_options: Optional[dict] = None,
|
||||
log: logging.Logger = logging.getLogger(__name__),
|
||||
results: ModuleResults = [],
|
||||
results: Optional[list] = None,
|
||||
) -> None:
|
||||
super().__init__(
|
||||
file_path=file_path,
|
||||
|
||||
@@ -7,14 +7,9 @@ import copy
|
||||
import logging
|
||||
import plistlib
|
||||
import sqlite3
|
||||
from typing import Optional
|
||||
from typing import Optional, Union
|
||||
|
||||
from mvt.common.utils import convert_mactime_to_iso
|
||||
from mvt.common.module_types import (
|
||||
ModuleResults,
|
||||
ModuleSerializedResult,
|
||||
ModuleAtomicResult,
|
||||
)
|
||||
|
||||
from ..base import IOSExtraction
|
||||
|
||||
@@ -34,7 +29,7 @@ class Analytics(IOSExtraction):
|
||||
results_path: Optional[str] = None,
|
||||
module_options: Optional[dict] = None,
|
||||
log: logging.Logger = logging.getLogger(__name__),
|
||||
results: ModuleResults = [],
|
||||
results: Optional[list] = None,
|
||||
) -> None:
|
||||
super().__init__(
|
||||
file_path=file_path,
|
||||
@@ -45,7 +40,7 @@ class Analytics(IOSExtraction):
|
||||
results=results,
|
||||
)
|
||||
|
||||
def serialize(self, record: ModuleAtomicResult) -> ModuleSerializedResult:
|
||||
def serialize(self, record: dict) -> Union[dict, list]:
|
||||
return {
|
||||
"timestamp": record["isodate"],
|
||||
"module": self.__class__.__name__,
|
||||
@@ -62,26 +57,24 @@ class Analytics(IOSExtraction):
|
||||
if not isinstance(value, str):
|
||||
continue
|
||||
|
||||
ioc_match = self.indicators.check_process(value)
|
||||
if ioc_match:
|
||||
warning_message = (
|
||||
f'Found mention of a malicious process "{value}" in {result["artifact"]} file at {result["isodate"]}',
|
||||
ioc = self.indicators.check_process(value)
|
||||
if ioc:
|
||||
self.log.warning(
|
||||
'Found mention of a malicious process "%s" in %s file at %s',
|
||||
value,
|
||||
result["artifact"],
|
||||
result["isodate"],
|
||||
)
|
||||
new_result = copy.copy(result)
|
||||
new_result["matched_indicator"] = ioc_match.ioc
|
||||
self.alertstore.critical(
|
||||
self.get_slug(), warning_message, "", new_result
|
||||
)
|
||||
self.alertstore.log_latest()
|
||||
new_result["matched_indicator"] = ioc
|
||||
self.detected.append(new_result)
|
||||
continue
|
||||
|
||||
ioc_match = self.indicators.check_url(value)
|
||||
if ioc_match:
|
||||
ioc = self.indicators.check_url(value)
|
||||
if ioc:
|
||||
new_result = copy.copy(result)
|
||||
result["matched_indicator"] = ioc_match.ioc
|
||||
self.alertstore.critical(
|
||||
self.get_slug(), ioc_match.message, "", new_result
|
||||
)
|
||||
new_result["matched_indicator"] = ioc
|
||||
self.detected.append(new_result)
|
||||
|
||||
def _extract_analytics_data(self):
|
||||
artifact = self.file_path.split("/")[-1]
|
||||
|
||||
Some files were not shown because too many files have changed in this diff Show More
Reference in New Issue
Block a user