Compare commits

..

1 Commits

Author SHA1 Message Date
Donncha Ó Cearbhaill
95b2f04db6 WIP for Triangulation post-processing module 2023-06-28 21:46:18 +02:00
327 changed files with 6641 additions and 23690 deletions

View File

@@ -1,11 +0,0 @@
# To get started with Dependabot version updates, you'll need to specify which
# package ecosystems to update and where the package manifests are located.
# Please see the documentation for all configuration options:
# https://docs.github.com/code-security/dependabot/dependabot-version-updates/configuration-options-for-the-dependabot.yml-file
version: 2
updates:
- package-ecosystem: "pip" # See documentation for possible values
directory: "/" # Location of package manifests
schedule:
interval: "weekly"

View File

@@ -1,19 +0,0 @@
name: Add issue to project
on:
issues:
types:
- opened
- reopened
jobs:
add-to-project:
name: Add issue to project
runs-on: ubuntu-latest
steps:
- uses: actions/add-to-project@v0.5.0
with:
# You can target a project in a different organization
# to the issue
project-url: https://github.com/orgs/mvt-project/projects/1
github-token: ${{ secrets.ADD_TO_PROJECT_PAT }}

View File

@@ -1,23 +0,0 @@
name: Mypy
on: workflow_dispatch
jobs:
mypy_py3:
name: Mypy check
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v4
- name: Setup Python
uses: actions/setup-python@v4
with:
python-version: 3.9
cache: 'pip'
- name: Checkout
uses: actions/checkout@master
- name: Install Dependencies
run: |
pip install mypy
- name: mypy
run: |
make mypy

View File

@@ -1,61 +0,0 @@
#
name: Create and publish a Docker image
# Configures this workflow to run every time a release is published.
on:
workflow_dispatch:
release:
types: [published]
# Defines two custom environment variables for the workflow. These are used for the Container registry domain, and a name for the Docker image that this workflow builds.
env:
REGISTRY: ghcr.io
IMAGE_NAME: ${{ github.repository }}
# There is a single job in this workflow. It's configured to run on the latest available version of Ubuntu.
jobs:
build-and-push-image:
runs-on: ubuntu-latest
# Sets the permissions granted to the `GITHUB_TOKEN` for the actions in this job.
permissions:
contents: read
packages: write
attestations: write
id-token: write
#
steps:
- name: Checkout repository
uses: actions/checkout@v4
# Uses the `docker/login-action` action to log in to the Container registry registry using the account and password that will publish the packages. Once published, the packages are scoped to the account defined here.
- name: Log in to the Container registry
uses: docker/login-action@65b78e6e13532edd9afa3aa52ac7964289d1a9c1
with:
registry: ${{ env.REGISTRY }}
username: ${{ github.actor }}
password: ${{ secrets.GITHUB_TOKEN }}
# This step uses [docker/metadata-action](https://github.com/docker/metadata-action#about) to extract tags and labels that will be applied to the specified image. The `id` "meta" allows the output of this step to be referenced in a subsequent step. The `images` value provides the base name for the tags and labels.
- name: Extract metadata (tags, labels) for Docker
id: meta
uses: docker/metadata-action@9ec57ed1fcdbf14dcef7dfbe97b2010124a938b7
with:
images: ${{ env.REGISTRY }}/${{ env.IMAGE_NAME }}
# This step uses the `docker/build-push-action` action to build the image, based on your repository's `Dockerfile`. If the build succeeds, it pushes the image to GitHub Packages.
# It uses the `context` parameter to define the build's context as the set of files located in the specified path. For more information, see "[Usage](https://github.com/docker/build-push-action#usage)" in the README of the `docker/build-push-action` repository.
# It uses the `tags` and `labels` parameters to tag and label the image with the output from the "meta" step.
- name: Build and push Docker image
id: push
uses: docker/build-push-action@f2a1d5e99d037542a71f64918e516c093c6f3fc4
with:
context: .
push: true
tags: ${{ steps.meta.outputs.tags }}
labels: ${{ steps.meta.outputs.labels }}
# This step generates an artifact attestation for the image, which is an unforgeable statement about where and how it was built. It increases supply chain security for people who consume the image. For more information, see "[AUTOTITLE](/actions/security-guides/using-artifact-attestations-to-establish-provenance-for-builds)."
- name: Generate artifact attestation
uses: actions/attest-build-provenance@v1
with:
subject-name: ${{ env.REGISTRY }}/${{ env.IMAGE_NAME}}
subject-digest: ${{ steps.push.outputs.digest }}
push-to-registry: true

43
.github/workflows/python-package.yml vendored Normal file
View File

@@ -0,0 +1,43 @@
# This workflow will install Python dependencies, run tests and lint with a variety of Python versions
# For more information see: https://help.github.com/actions/language-and-framework-guides/using-python-with-github-actions
name: CI
on:
push:
branches: [ main ]
pull_request:
branches: [ main ]
jobs:
build:
runs-on: ubuntu-latest
strategy:
fail-fast: false
matrix:
python-version: ['3.8', '3.9', '3.10']
steps:
- uses: actions/checkout@v2
- name: Set up Python ${{ matrix.python-version }}
uses: actions/setup-python@v2
with:
python-version: ${{ matrix.python-version }}
- name: Install dependencies
run: |
python -m pip install --upgrade setuptools
python -m pip install --upgrade pip
python -m pip install flake8 pytest safety stix2 pytest-mock
if [ -f requirements.txt ]; then pip install -r requirements.txt; fi
python -m pip install .
- name: Lint with flake8
run: |
# stop the build if there are Python syntax errors or undefined names
flake8 . --count --select=E9,F63,F7,F82 --show-source --statistics
# exit-zero treats all errors as warnings. The GitHub editor is 127 chars wide
flake8 . --count --exit-zero --max-complexity=10 --max-line-length=127 --statistics
- name: Safety checks
run: safety check
- name: Test with pytest
run: pytest

View File

@@ -1,22 +1,16 @@
name: Ruff
on:
push:
branches: [ main ]
pull_request:
branches: [ main ]
on: [push]
jobs:
ruff_py3:
name: Ruff syntax check
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v4
- name: Setup Python
uses: actions/setup-python@v4
uses: actions/setup-python@v1
with:
python-version: 3.9
cache: 'pip'
architecture: x64
- name: Checkout
uses: actions/checkout@master
- name: Install Dependencies
@@ -24,4 +18,4 @@ jobs:
pip install ruff
- name: ruff
run: |
make ruff
ruff check .

View File

@@ -2,8 +2,8 @@
Python script to download the Apple RSS feed and parse it.
"""
import json
import os
import json
import urllib.request
from xml.dom.minidom import parseString
@@ -12,7 +12,7 @@ from packaging import version
def download_apple_rss(feed_url):
with urllib.request.urlopen(feed_url) as f:
rss_feed = f.read().decode("utf-8")
rss_feed = f.read().decode('utf-8')
print("Downloaded RSS feed from Apple.")
return rss_feed
@@ -27,34 +27,24 @@ def parse_latest_ios_versions(rss_feed_text):
continue
import re
build_match = re.match(
r"iOS (?P<version>[\d\.]+) (?P<beta>beta )?(\S*)?\((?P<build>.*)\)", title
)
build_match = re.match(r"iOS (?P<version>[\d\.]+) (?P<beta>beta )?(\S*)?\((?P<build>.*)\)", title)
if not build_match:
print("Could not parse iOS build:", title)
continue
# Handle iOS beta releases
release_info = build_match.groupdict()
release_beta = release_info.pop("beta")
if release_beta:
if release_info["beta"]:
print("Skipping beta release:", title)
continue
# Some iOS releases have multiple build number for different hardware models.
# We will split these into separate entries and record each build number.
build_list = release_info.pop("build")
build_variants = build_list.split(" | ")
for build_number in build_variants:
release_info["build"] = build_number
latest_ios_versions.append(release_info)
release_info.pop("beta")
latest_ios_versions.append(release_info)
return latest_ios_versions
def update_mvt(mvt_checkout_path, latest_ios_versions):
version_path = os.path.join(mvt_checkout_path, "src/mvt/ios/data/ios_versions.json")
version_path = os.path.join(mvt_checkout_path, "mvt/ios/data/ios_versions.json")
with open(version_path, "r") as version_file:
current_versions = json.load(version_file)
@@ -72,22 +62,16 @@ def update_mvt(mvt_checkout_path, latest_ios_versions):
print("No new iOS versions found.")
else:
print("Found {} new iOS versions.".format(new_entry_count))
new_version_list = sorted(
current_versions, key=lambda x: version.Version(x["version"])
)
new_version_list = sorted(current_versions, key=lambda x: version.Version(x["version"]))
with open(version_path, "w") as version_file:
json.dump(new_version_list, version_file, indent=4)
def main():
print("Downloading RSS feed...")
mvt_checkout_path = os.path.abspath(
os.path.join(os.path.dirname(__file__), "../../../")
)
mvt_checkout_path = os.path.abspath(os.path.join(os.path.dirname(__file__), "../../../"))
rss_feed = download_apple_rss(
"https://developer.apple.com/news/releases/rss/releases.rss"
)
rss_feed = download_apple_rss("https://developer.apple.com/news/releases/rss/releases.rss")
latest_ios_version = parse_latest_ios_versions(rss_feed)
update_mvt(mvt_checkout_path, latest_ios_version)

View File

@@ -1,38 +0,0 @@
name: Tests
on:
push:
branches: [ main ]
pull_request:
branches: [ main ]
jobs:
build:
name: Run Python Tests
runs-on: ubuntu-latest
strategy:
fail-fast: false
matrix:
python-version: ['3.10', '3.11', '3.12', '3.13']
steps:
- uses: actions/checkout@v4
- name: Set up Python ${{ matrix.python-version }}
uses: actions/setup-python@v4
with:
python-version: ${{ matrix.python-version }}
- name: Install Python dependencies
run: |
make install
make test-requirements
- name: Test with pytest
run: |
set -o pipefail
make test-ci | tee pytest-coverage.txt
- name: Pytest coverage comment
continue-on-error: true # Workflows running on a fork can't post comments
uses: MishaKav/pytest-coverage-comment@main
if: github.event_name == 'pull_request'
with:
pytest-coverage-path: ./pytest-coverage.txt
junitxml-path: ./pytest.xml

View File

@@ -21,7 +21,6 @@ jobs:
title: '[auto] Update iOS releases and versions'
commit-message: Add new iOS versions and build numbers
branch: auto/add-new-ios-releases
draft: true
body: |
This is an automated pull request to update the iOS releases and version numbers.
add-paths: |

2
.gitignore vendored
View File

@@ -50,8 +50,6 @@ coverage.xml
*.py,cover
.hypothesis/
.pytest_cache/
pytest-coverage.txt
pytest.xml
# Translations
*.mo

View File

@@ -5,15 +5,11 @@
# Required
version: 2
build:
os: "ubuntu-22.04"
tools:
python: "3.11"
mkdocs:
configuration: mkdocs.yml
# Optionally set the version of Python and requirements required to build your docs
python:
version: 3.7
install:
- requirements: docs/requirements.txt

View File

@@ -1,11 +0,0 @@
# Safety Security and License Configuration file
# We recommend checking this file into your source control in the root of your Python project
# If this file is named .safety-policy.yml and is in the same directory where you run `safety check` it will be used by default.
# Otherwise, you can use the flag `safety check --policy-file <path-to-this-file>` to specify a custom location and name for the file.
# To validate and review your policy file, run the validate command: `safety validate policy_file --path <path-to-this-file>`
security: # configuration for the `safety check` command
ignore-vulnerabilities: # Here you can list multiple specific vulnerabilities you want to ignore (optionally for a time period)
67599: # Example vulnerability ID
reason: disputed, inapplicable
70612:
reason: disputed, inapplicable

View File

@@ -1,65 +1,19 @@
# Contributing to Mobile Verification Toolkit (MVT)
# Contributing
We greatly appreciate contributions to MVT!
Your involvement, whether through identifying issues, improving functionality, or enhancing documentation, is very much appreciated. To ensure smooth collaboration and a welcoming environment, we've outlined some key guidelines for contributing below.
## Getting started
Contributing to an open-source project like MVT might seem overwhelming at first, but we're here to support you!
Whether you're a technologist, a frontline human rights defender, a field researcher, or someone new to consensual spyware forensics, there are many ways to make meaningful contributions.
Here's how you can get started:
1. **Explore the codebase:**
- Browse the repository to get familar with MVT. Many MVT modules are simple in functionality and easy to understand.
- Look for `TODO:` or `FIXME:` comments in the code for areas that need attention.
2. **Check Github issues:**
- Look for issues tagged with ["help wanted"](https://github.com/mvt-project/mvt/issues?q=is%3Aissue+is%3Aopen+label%3A%22help+wanted%22) or ["good first issue"](https://github.com/mvt-project/mvt/issues?q=is%3Aissue+is%3Aopen+label%3A%22good+first+issue%22) to find tasks that are beginner-friendly or where input from the community would be helpful.
3. **Ask for guidance:**
- If you're unsure where to start, feel free to open a [discussion](https://github.com/mvt-project/mvt/discussions) or comment on an issue.
## How to contribute:
1. **Report issues:**
- Found a bug? Please check existing issues to see if it's already reported. If not, open a new issue. Mobile operating systems and databases are constantly evolving, an new errors may appear spontaniously in new app versions.
**Please provide as much information as possible about the prodblem including: any error messages, steps to reproduce the problem, and any logs or screenshots that can help.**
Thank you for your interest in contributing to Mobile Verification Toolkit (MVT)! Your help is very much appreciated.
2. **Suggest features:**
- If you have an idea for new functionality, create a feature request issue and describe your proposal.
## Where to start
3. **Submit code:**
- Fork the repository and create a new branch for your changes.
- Ensure your changes align with the code style guidelines (see below).
- Open a pull request (PR) with a clear description of your changes and link it to any relevant issues.
Starting to contribute to a somewhat complex project like MVT might seem intimidating. Unless you have specific ideas of new functionality you would like to submit, some good starting points are searching for `TODO:` and `FIXME:` comments throughout the code. Alternatively you can check if any GitHub issues existed marked with the ["help wanted"](https://github.com/mvt-project/mvt/issues?q=is%3Aissue+is%3Aopen+label%3A%22help+wanted%22) tag.
4. **Documentation contributions:**
- Improving documentation is just as valuable as contributing code! If you notice gaps or inaccuracies in the documentation, feel free to submit changes or suggest updates.
## Code style
Please follow these code style guidelines for consistency and readability:
- **Indentation**: use 4 spaces per tab.
- **Quotes**: Use double quotes (`"`) by default. Use single quotes (`'`) for nested strings instead of escaping (`\"`), or when using f-formatting.
- **Maximum line length**:
- Aim for lines no longer than 80 characters.
- Exceptions are allowed for long log lines or strings, which may extend up to 100 characters.
- Wrap lines that exceed 100 characters.
When contributing code to
Follow [PEP 8 guidelines](https://peps.python.org/pep-0008/) for indentation and overall Python code style. All MVT code is automatically linted with [Ruff](https://github.com/astral-sh/ruff) before merging.
- **Indentation**: we use 4-spaces tabs.
Please check your code before opening a pull request by running `make ruff`
- **Quotes**: we use double quotes (`"`) as a default. Single quotes (`'`) can be favored with nested strings instead of escaping (`\"`), or when using f-formatting.
## Community and support
We aim to create a supportive and collaborative environment for all contributors. If you run into any challenges, feel free to reach out through the discussions or issues section of the repository.
Your contributions, big or small, help improve MVT and are always appreciated.
- **Maximum line length**: we strongly encourage to respect a 80 characters long lines and to follow [PEP8 indentation guidelines](https://peps.python.org/pep-0008/#indentation) when having to wrap. However, if breaking at 80 is not possible or is detrimental to the readability of the code, exceptions are tolerated. For example, long log lines, or long strings can be extended to 100 characters long. Please hard wrap anything beyond 100 characters.

View File

@@ -1,158 +1,79 @@
# Base image for building libraries
# ---------------------------------
FROM ubuntu:22.04 as build-base
FROM ubuntu:22.04
ARG DEBIAN_FRONTEND=noninteractive
# Ref. https://github.com/mvt-project/mvt
# Install build tools and dependencies
RUN apt-get update \
&& apt-get install -y \
LABEL url="https://mvt.re"
LABEL vcs-url="https://github.com/mvt-project/mvt"
LABEL description="MVT is a forensic tool to look for signs of infection in smartphone devices."
ENV PIP_NO_CACHE_DIR=1
ENV DEBIAN_FRONTEND=noninteractive
# Fixing major OS dependencies
# ----------------------------
RUN apt update \
&& apt install -y python3 python3-pip libusb-1.0-0-dev wget unzip default-jre-headless adb \
# Install build tools for libimobiledevice
# ----------------------------------------
build-essential \
checkinstall \
git \
autoconf \
automake \
libtool-bin \
pkg-config \
libcurl4-openssl-dev \
libusb-1.0-0-dev \
libplist-dev \
libusbmuxd-dev \
libssl-dev \
udev \
&& rm -rf /var/lib/apt/lists/*
sqlite3 \
pkg-config \
# libplist
# Clean up
# --------
FROM build-base as build-libplist
# Build
RUN git clone https://github.com/libimobiledevice/libplist && cd libplist \
&& ./autogen.sh && make -j "$(nproc)" && make install DESTDIR=/build \
&& cd .. && rm -rf libplist
&& apt-get clean \
&& rm -rf /var/lib/apt/lists/* /var/cache/apt
# libimobiledevice-glue
# ---------------------
FROM build-base as build-libimobiledevice-glue
# Build libimobiledevice
# ----------------------
RUN git clone https://github.com/libimobiledevice/libplist \
&& git clone https://github.com/libimobiledevice/libimobiledevice-glue \
&& git clone https://github.com/libimobiledevice/libusbmuxd \
&& git clone https://github.com/libimobiledevice/libimobiledevice \
&& git clone https://github.com/libimobiledevice/usbmuxd \
# Install dependencies
COPY --from=build-libplist /build /
&& cd libplist && ./autogen.sh && make && make install && ldconfig \
# Build
RUN git clone https://github.com/libimobiledevice/libimobiledevice-glue && cd libimobiledevice-glue \
&& ./autogen.sh && make -j "$(nproc)" && make install DESTDIR=/build \
&& cd .. && rm -rf libimobiledevice-glue
&& cd ../libimobiledevice-glue && PKG_CONFIG_PATH=/usr/local/lib/pkgconfig ./autogen.sh --prefix=/usr && make && make install && ldconfig \
&& cd ../libusbmuxd && PKG_CONFIG_PATH=/usr/local/lib/pkgconfig ./autogen.sh && make && make install && ldconfig \
# libtatsu
# --------
FROM build-base as build-libtatsu
&& cd ../libimobiledevice && PKG_CONFIG_PATH=/usr/local/lib/pkgconfig ./autogen.sh --enable-debug && make && make install && ldconfig \
# Install dependencies
COPY --from=build-libplist /build /
&& cd ../usbmuxd && PKG_CONFIG_PATH=/usr/local/lib/pkgconfig ./autogen.sh --prefix=/usr --sysconfdir=/etc --localstatedir=/var --runstatedir=/run && make && make install \
# Build
RUN git clone https://github.com/libimobiledevice/libtatsu && cd libtatsu \
&& ./autogen.sh && make -j "$(nproc)" && make install DESTDIR=/build \
&& cd .. && rm -rf libtatsu
# Clean up.
&& cd .. && rm -rf libplist libimobiledevice-glue libusbmuxd libimobiledevice usbmuxd
# libusbmuxd
# ----------
FROM build-base as build-libusbmuxd
# Install dependencies
COPY --from=build-libplist /build /
COPY --from=build-libimobiledevice-glue /build /
# Build
RUN git clone https://github.com/libimobiledevice/libusbmuxd && cd libusbmuxd \
&& ./autogen.sh && make -j "$(nproc)" && make install DESTDIR=/build \
&& cd .. && rm -rf libusbmuxd
# libimobiledevice
# ----------------
FROM build-base as build-libimobiledevice
# Install dependencies
COPY --from=build-libplist /build /
COPY --from=build-libtatsu /build /
COPY --from=build-libimobiledevice-glue /build /
COPY --from=build-libusbmuxd /build /
# Build
RUN git clone https://github.com/libimobiledevice/libimobiledevice && cd libimobiledevice \
&& ./autogen.sh --enable-debug && make -j "$(nproc)" && make install DESTDIR=/build \
&& cd .. && rm -rf libimobiledevice
# usbmuxd
# -------
FROM build-base as build-usbmuxd
# Install dependencies
COPY --from=build-libplist /build /
COPY --from=build-libimobiledevice-glue /build /
COPY --from=build-libusbmuxd /build /
COPY --from=build-libimobiledevice /build /
# Build
RUN git clone https://github.com/libimobiledevice/usbmuxd && cd usbmuxd \
&& ./autogen.sh --sysconfdir=/etc --localstatedir=/var --runstatedir=/run && make -j "$(nproc)" && make install DESTDIR=/build \
&& cd .. && rm -rf usbmuxd && mv /build/lib /build/usr/lib
# Create main image
FROM ubuntu:24.04 as main
LABEL org.opencontainers.image.url="https://mvt.re"
LABEL org.opencontainers.image.documentation="https://docs.mvt.re"
LABEL org.opencontainers.image.source="https://github.com/mvt-project/mvt"
LABEL org.opencontainers.image.title="Mobile Verification Toolkit"
LABEL org.opencontainers.image.description="MVT is a forensic tool to look for signs of infection in smartphone devices."
LABEL org.opencontainers.image.licenses="MVT License 1.1"
LABEL org.opencontainers.image.base.name=docker.io/library/ubuntu:22.04
# Install runtime dependencies
ARG DEBIAN_FRONTEND=noninteractive
RUN apt-get update \
&& apt-get install -y \
adb \
default-jre-headless \
libcurl4 \
libssl3 \
libusb-1.0-0 \
python3 \
sqlite3
COPY --from=build-libplist /build /
COPY --from=build-libimobiledevice-glue /build /
COPY --from=build-libtatsu /build /
COPY --from=build-libusbmuxd /build /
COPY --from=build-libimobiledevice /build /
COPY --from=build-usbmuxd /build /
# Install mvt using the locally checked out source
COPY . mvt/
RUN apt-get update \
&& apt-get install -y git python3-pip \
&& PIP_NO_CACHE_DIR=1 pip3 install --break-system-packages ./mvt \
&& apt-get remove -y python3-pip git && apt-get autoremove -y \
&& rm -rf /var/lib/apt/lists/* \
&& rm -rf mvt
# Installing MVT
# --------------
RUN pip3 install mvt
# Installing ABE
ADD --checksum=sha256:a20e07f8b2ea47620aff0267f230c3f1f495f097081fd709eec51cf2a2e11632 \
https://github.com/nelenkov/android-backup-extractor/releases/download/master-20221109063121-8fdfc5e/abe.jar /opt/abe/abe.jar
# --------------
RUN mkdir /opt/abe \
&& wget https://github.com/nelenkov/android-backup-extractor/releases/download/20210709062403-4c55371/abe.jar -O /opt/abe/abe.jar \
# Create alias for abe
RUN echo 'alias abe="java -jar /opt/abe/abe.jar"' >> ~/.bashrc
&& echo 'alias abe="java -jar /opt/abe/abe.jar"' >> ~/.bashrc
# Generate adb key folder
RUN echo 'if [ ! -f /root/.android/adbkey ]; then adb keygen /root/.android/adbkey 2&>1 > /dev/null; fi' >> ~/.bashrc
RUN mkdir /root/.android
# Generate adb key folder
# ------------------------------
RUN mkdir /root/.android && adb keygen /root/.android/adbkey
# Setup investigations environment
# --------------------------------
RUN mkdir /home/cases
WORKDIR /home/cases
WORKDIR /home/cases
RUN echo 'echo "Mobile Verification Toolkit @ Docker\n------------------------------------\n\nYou can find information about how to use this image for Android (https://github.com/mvt-project/mvt/tree/master/docs/android) and iOS (https://github.com/mvt-project/mvt/tree/master/docs/ios) in the official docs of the project.\n"' >> ~/.bashrc \
&& echo 'echo "Note that to perform the debug via USB you might need to give the Docker image access to the USB using \"docker run -it --privileged -v /dev/bus/usb:/dev/bus/usb mvt\" or, preferably, the \"--device=\" parameter.\n"' >> ~/.bashrc

View File

@@ -1,36 +0,0 @@
# Create main image
FROM python:3.10.14-alpine3.20 as main
LABEL org.opencontainers.image.url="https://mvt.re"
LABEL org.opencontainers.image.documentation="https://docs.mvt.re"
LABEL org.opencontainers.image.source="https://github.com/mvt-project/mvt"
LABEL org.opencontainers.image.title="Mobile Verification Toolkit (Android)"
LABEL org.opencontainers.image.description="MVT is a forensic tool to look for signs of infection in smartphone devices."
LABEL org.opencontainers.image.licenses="MVT License 1.1"
LABEL org.opencontainers.image.base.name=docker.io/library/python:3.10.14-alpine3.20
# Install runtime dependencies
RUN apk add --no-cache \
android-tools \
git \
libusb \
openjdk11-jre-headless \
sqlite
# Install mvt
COPY ./ mvt
RUN apk add --no-cache --virtual .build-deps gcc musl-dev \
&& PIP_NO_CACHE_DIR=1 pip3 install ./mvt \
&& apk del .build-deps gcc musl-dev && rm -rf ./mvt
# Installing ABE
ADD --checksum=sha256:a20e07f8b2ea47620aff0267f230c3f1f495f097081fd709eec51cf2a2e11632 \
https://github.com/nelenkov/android-backup-extractor/releases/download/master-20221109063121-8fdfc5e/abe.jar /opt/abe/abe.jar
# Create alias for abe
RUN echo 'alias abe="java -jar /opt/abe/abe.jar"' >> ~/.bashrc
# Generate adb key folder
RUN echo 'if [ ! -f /root/.android/adbkey ]; then adb keygen /root/.android/adbkey 2&>1 > /dev/null; fi' >> ~/.bashrc
RUN mkdir /root/.android
ENTRYPOINT [ "/usr/local/bin/mvt-android" ]

View File

@@ -1,137 +0,0 @@
# Base image for building libraries
# ---------------------------------
FROM ubuntu:22.04 as build-base
ARG DEBIAN_FRONTEND=noninteractive
# Install build tools and dependencies
RUN apt-get update \
&& apt-get install -y \
build-essential \
git \
autoconf \
automake \
libtool-bin \
pkg-config \
libcurl4-openssl-dev \
libusb-1.0-0-dev \
libssl-dev \
udev \
&& rm -rf /var/lib/apt/lists/*
# libplist
# --------
FROM build-base as build-libplist
# Build
RUN git clone https://github.com/libimobiledevice/libplist && cd libplist \
&& ./autogen.sh && make -j "$(nproc)" && make install DESTDIR=/build \
&& cd .. && rm -rf libplist
# libimobiledevice-glue
# ---------------------
FROM build-base as build-libimobiledevice-glue
# Install dependencies
COPY --from=build-libplist /build /
# Build
RUN git clone https://github.com/libimobiledevice/libimobiledevice-glue && cd libimobiledevice-glue \
&& ./autogen.sh && make -j "$(nproc)" && make install DESTDIR=/build \
&& cd .. && rm -rf libimobiledevice-glue
# libtatsu
# --------
FROM build-base as build-libtatsu
# Install dependencies
COPY --from=build-libplist /build /
# Build
RUN git clone https://github.com/libimobiledevice/libtatsu && cd libtatsu \
&& ./autogen.sh && make -j "$(nproc)" && make install DESTDIR=/build \
&& cd .. && rm -rf libtatsu
# libusbmuxd
# ----------
FROM build-base as build-libusbmuxd
# Install dependencies
COPY --from=build-libplist /build /
COPY --from=build-libimobiledevice-glue /build /
# Build
RUN git clone https://github.com/libimobiledevice/libusbmuxd && cd libusbmuxd \
&& ./autogen.sh && make -j "$(nproc)" && make install DESTDIR=/build \
&& cd .. && rm -rf libusbmuxd
# libimobiledevice
# ----------------
FROM build-base as build-libimobiledevice
# Install dependencies
COPY --from=build-libplist /build /
COPY --from=build-libtatsu /build /
COPY --from=build-libimobiledevice-glue /build /
COPY --from=build-libusbmuxd /build /
# Build
RUN git clone https://github.com/libimobiledevice/libimobiledevice && cd libimobiledevice \
&& ./autogen.sh --enable-debug && make -j "$(nproc)" && make install DESTDIR=/build \
&& cd .. && rm -rf libimobiledevice
# usbmuxd
# -------
FROM build-base as build-usbmuxd
# Install dependencies
COPY --from=build-libplist /build /
COPY --from=build-libimobiledevice-glue /build /
COPY --from=build-libusbmuxd /build /
COPY --from=build-libimobiledevice /build /
# Build
RUN git clone https://github.com/libimobiledevice/usbmuxd && cd usbmuxd \
&& ./autogen.sh --sysconfdir=/etc --localstatedir=/var --runstatedir=/run && make -j "$(nproc)" && make install DESTDIR=/build \
&& cd .. && rm -rf usbmuxd && mv /build/lib /build/usr/lib
# Main image
# ----------
FROM python:3.10.14-alpine3.20 as main
LABEL org.opencontainers.image.url="https://mvt.re"
LABEL org.opencontainers.image.documentation="https://docs.mvt.re"
LABEL org.opencontainers.image.source="https://github.com/mvt-project/mvt"
LABEL org.opencontainers.image.title="Mobile Verification Toolkit (iOS)"
LABEL org.opencontainers.image.description="MVT is a forensic tool to look for signs of infection in smartphone devices."
LABEL org.opencontainers.image.licenses="MVT License 1.1"
LABEL org.opencontainers.image.base.name=docker.io/library/python:3.10.14-alpine3.20
# Install runtime dependencies
RUN apk add --no-cache \
gcompat \
libcurl \
libssl3 \
libusb \
sqlite
COPY --from=build-libplist /build /
COPY --from=build-libimobiledevice-glue /build /
COPY --from=build-libtatsu /build /
COPY --from=build-libusbmuxd /build /
COPY --from=build-libimobiledevice /build /
COPY --from=build-usbmuxd /build /
# Install mvt using the locally checked out source
COPY ./ mvt
RUN apk add --no-cache --virtual .build-deps git gcc musl-dev \
&& PIP_NO_CACHE_DIR=1 pip3 install ./mvt \
&& apk del .build-deps git gcc musl-dev && rm -rf ./mvt
ENTRYPOINT [ "/usr/local/bin/mvt-ios" ]

View File

@@ -1,44 +1,21 @@
PWD = $(shell pwd)
autofix:
ruff format .
ruff check --fix .
check: ruff mypy
ruff:
ruff format --check .
check:
flake8
pytest -q
ruff check -q .
mypy:
mypy
test:
python3 -m pytest
test-ci:
python3 -m pytest -v
install:
python3 -m pip install --upgrade -e .
test-requirements:
python3 -m pip install --upgrade -r test-requirements.txt
generate-proto-parsers:
# Generate python parsers for protobuf files
PROTO_FILES=$$(find src/mvt/android/parsers/proto/ -iname "*.proto"); \
protoc -Isrc/mvt/android/parsers/proto/ --python_betterproto_out=src/mvt/android/parsers/proto/ $$PROTO_FILES
clean:
rm -rf $(PWD)/build $(PWD)/dist $(PWD)/src/mvt.egg-info
rm -rf $(PWD)/build $(PWD)/dist $(PWD)/mvt.egg-info
dist:
python3 -m pip install --upgrade build
python3 -m build
python3 setup.py sdist bdist_wheel
upload:
python3 -m twine upload dist/*
test-upload:
python3 -m twine upload --repository testpypi dist/*
pylint:
pylint --rcfile=setup.cfg mvt

View File

@@ -6,33 +6,19 @@
[![](https://img.shields.io/pypi/v/mvt)](https://pypi.org/project/mvt/)
[![Documentation Status](https://readthedocs.org/projects/mvt/badge/?version=latest)](https://docs.mvt.re/en/latest/?badge=latest)
[![CI](https://github.com/mvt-project/mvt/actions/workflows/tests.yml/badge.svg)](https://github.com/mvt-project/mvt/actions/workflows/tests.yml)
[![CI](https://github.com/mvt-project/mvt/actions/workflows/python-package.yml/badge.svg)](https://github.com/mvt-project/mvt/actions/workflows/python-package.yml)
[![Downloads](https://pepy.tech/badge/mvt)](https://pepy.tech/project/mvt)
Mobile Verification Toolkit (MVT) is a collection of utilities to simplify and automate the process of gathering forensic traces helpful to identify a potential compromise of Android and iOS devices.
It has been developed and released by the [Amnesty International Security Lab](https://securitylab.amnesty.org) in July 2021 in the context of the [Pegasus Project](https://forbiddenstories.org/about-the-pegasus-project/) along with [a technical forensic methodology](https://www.amnesty.org/en/latest/research/2021/07/forensic-methodology-report-how-to-catch-nso-groups-pegasus/). It continues to be maintained by Amnesty International and other contributors.
It has been developed and released by the [Amnesty International Security Lab](https://www.amnesty.org/en/tech/) in July 2021 in the context of the [Pegasus project](https://forbiddenstories.org/about-the-pegasus-project/) along with [a technical forensic methodology and forensic evidence](https://www.amnesty.org/en/latest/research/2021/07/forensic-methodology-report-how-to-catch-nso-groups-pegasus/).
> **Note**
> MVT is a forensic research tool intended for technologists and investigators. It requires understanding digital forensics and using command-line tools. This is not intended for end-user self-assessment. If you are concerned with the security of your device please seek reputable expert assistance.
>
*Warning*: MVT is a forensic research tool intended for technologists and investigators. Using it requires understanding the basics of forensic analysis and using command-line tools. This is not intended for end-user self-assessment. If you are concerned with the security of your device please seek expert assistance.
### Indicators of Compromise
MVT supports using public [indicators of compromise (IOCs)](https://github.com/mvt-project/mvt-indicators) to scan mobile devices for potential traces of targeting or infection by known spyware campaigns. This includes IOCs published by [Amnesty International](https://github.com/AmnestyTech/investigations/) and other research groups.
> **Warning**
> Public indicators of compromise are insufficient to determine that a device is "clean", and not targeted with a particular spyware tool. Reliance on public indicators alone can miss recent forensic traces and give a false sense of security.
>
> Reliable and comprehensive digital forensic support and triage requires access to non-public indicators, research and threat intelligence.
>
>Such support is available to civil society through [Amnesty International's Security Lab](https://securitylab.amnesty.org/get-help/?c=mvt_docs) or through our forensic partnership with [Access Nows Digital Security Helpline](https://www.accessnow.org/help/).
More information about using indicators of compromise with MVT is available in the [documentation](https://docs.mvt.re/en/latest/iocs/).
## Installation
MVT can be installed from sources or from [PyPI](https://pypi.org/project/mvt/) (you will need some dependencies, check the [documentation](https://docs.mvt.re/en/latest/install/)):
MVT can be installed from sources or from [PyPi](https://pypi.org/project/mvt/) (you will need some dependencies, check the [documentation](https://docs.mvt.re/en/latest/install/)):
```
pip3 install mvt

14
dev/mvt-android Executable file
View File

@@ -0,0 +1,14 @@
#!/usr/bin/env python3
# Mobile Verification Toolkit (MVT)
# Copyright (c) 2021-2022 Claudio Guarnieri.
# Use of this software is governed by the MVT License 1.1 that can be found at
# https://license.mvt.re/1.1/
import os
import sys
sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
from mvt import android
android.cli()

14
dev/mvt-ios Executable file
View File

@@ -0,0 +1,14 @@
#!/usr/bin/env python3
# Mobile Verification Toolkit (MVT)
# Copyright (c) 2021-2022 Claudio Guarnieri.
# Use of this software is governed by the MVT License 1.1 that can be found at
# https://license.mvt.re/1.1/
import os
import sys
sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
from mvt import ios
ios.cli()

View File

@@ -35,11 +35,7 @@ $ mvt-android check-backup --output /path/to/results/ /path/to/backup.ab
INFO [mvt.android.modules.backup.sms] Extracted a total of 64 SMS messages
```
If the backup is encrypted, MVT will prompt you to enter the password. A backup password can also be provided with the `--backup-password` command line option or through the `MVT_ANDROID_BACKUP_PASSWORD` environment variable. The same options can also be used to when analysing an encrypted backup collected through AndroidQF in the `mvt-android check-androidqf` command:
```bash
$ mvt-android check-backup --backup-password "password123" --output /path/to/results/ /path/to/backup.ab
```
If the backup is encrypted, MVT will prompt you to enter the password.
Through the `--iocs` argument you can specify a [STIX2](https://oasis-open.github.io/cti-documentation/stix/intro) file defining a list of malicious indicators to check against the records extracted from the backup by MVT. Any matches will be highlighted in the terminal output.

View File

@@ -1,6 +1,6 @@
# Downloading APKs from an Android phone
MVT allows you to attempt to download all available installed packages (APKs) from a device in order to further inspect them and potentially identify any which might be malicious in nature.
MVT allows to attempt to download all available installed packages (APKs) in order to further inspect them and potentially identify any which might be malicious in nature.
You can do so by launching the following command:

View File

@@ -1,43 +0,0 @@
# Command Completion
MVT utilizes the [Click](https://click.palletsprojects.com/en/stable/) library for creating its command line interface.
Click provides tab completion support for Bash (version 4.4 and up), Zsh, and Fish.
To enable it, you need to manually register a special function with your shell, which varies depending on the shell you are using.
The following describes how to generate the command completion scripts and add them to your shell configuration.
> **Note: You will need to start a new shell for the changes to take effect.**
### For Bash
```bash
# Generates bash completion scripts
echo "$(_MVT_IOS_COMPLETE=bash_source mvt-ios)" > ~/.mvt-ios-complete.bash &&
echo "$(_MVT_ANDROID_COMPLETE=bash_source mvt-android)" > ~/.mvt-android-complete.bash
```
Add the following to `~/.bashrc`:
```bash
# source mvt completion scripts
. ~/.mvt-ios-complete.bash && . ~/.mvt-android-complete.bash
```
### For Zsh
```bash
# Generates zsh completion scripts
echo "$(_MVT_IOS_COMPLETE=zsh_source mvt-ios)" > ~/.mvt-ios-complete.zsh &&
echo "$(_MVT_ANDROID_COMPLETE=zsh_source mvt-android)" > ~/.mvt-android-complete.zsh
```
Add the following to `~/.zshrc`:
```bash
# source mvt completion scripts
. ~/.mvt-ios-complete.zsh && . ~/.mvt-android-complete.zsh
```
For more information, visit the official [Click Docs](https://click.palletsprojects.com/en/stable/shell-completion/#enabling-completion).

View File

@@ -1,27 +0,0 @@
# Development
The Mobile Verification Toolkit team welcomes contributions of new forensic modules or other contributions which help improve the software.
## Testing
MVT uses `pytest` for unit and integration tests. Code style consistency is maintained with `flake8`, `ruff` and `black`. All can
be run automatically with:
```bash
make check
```
Run these tests before making new commits or opening pull requests.
## Profiling
Some MVT modules extract and process significant amounts of data during the analysis process or while checking results against known indicators. Care must be
take to avoid inefficient code paths as we add new modules.
MVT modules can be profiled with Python built-in `cProfile` by setting the `MVT_PROFILE` environment variable.
```bash
MVT_PROFILE=1 dev/mvt-ios check-backup test_backup
```
Open an issue or PR if you are encountering significant performance issues when analyzing a device with MVT.

View File

@@ -2,22 +2,7 @@ Using Docker simplifies having all the required dependencies and tools (includin
Install Docker following the [official documentation](https://docs.docker.com/get-docker/).
Once Docker is installed, you can run MVT by downloading a prebuilt MVT Docker image, or by building a Docker image yourself from the MVT source repo.
### Using the prebuilt Docker image
```bash
docker pull ghcr.io/mvt-project/mvt
```
You can then run the Docker container with:
```
docker run -it ghcr.io/mvt-project/mvt
```
### Build and run Docker image from source
Once installed, you can clone MVT's repository and build its Docker image:
```bash
git clone https://github.com/mvt-project/mvt.git
@@ -33,9 +18,6 @@ docker run -it mvt
If a prompt is spawned successfully, you can close it with `exit`.
## Docker usage with Android devices
If you wish to use MVT to test an Android device you will need to enable the container's access to the host's USB devices. You can do so by enabling the `--privileged` flag and mounting the USB bus device as a volume:
```bash

View File

@@ -6,9 +6,6 @@
Mobile Verification Toolkit (MVT) is a tool to facilitate the [consensual forensic analysis](introduction.md#consensual-forensics) of Android and iOS devices, for the purpose of identifying traces of compromise.
It has been developed and released by the [Amnesty International Security Lab](https://securitylab.amnesty.org) in July 2021 in the context of the [Pegasus Project](https://forbiddenstories.org/about-the-pegasus-project/) along with [a technical forensic methodology](https://www.amnesty.org/en/latest/research/2021/07/forensic-methodology-report-how-to-catch-nso-groups-pegasus/). It continues to be maintained by Amnesty International and other contributors.
In this documentation you will find instructions on how to install and run the `mvt-ios` and `mvt-android` commands, and guidance on how to interpret the extracted results.
## Resources

View File

@@ -7,27 +7,11 @@ Before proceeding, please note that MVT requires Python 3.6+ to run. While it sh
First install some basic dependencies that will be necessary to build all required tools:
```bash
sudo apt install python3 python3-venv python3-pip sqlite3 libusb-1.0-0
sudo apt install python3 python3-pip libusb-1.0-0 sqlite3
```
*libusb-1.0-0* is not required if you intend to only use `mvt-ios` and not `mvt-android`.
(Recommended) Set up `pipx`
For Ubuntu 23.04 or above:
```bash
sudo apt install pipx
pipx ensurepath
```
For Ubuntu 22.04 or below:
```
python3 -m pip install --user pipx
python3 -m pipx ensurepath
```
Other distributions: check for a `pipx` or `python-pipx` via your package manager.
When working with Android devices you should additionally install [Android SDK Platform Tools](https://developer.android.com/studio/releases/platform-tools). If you prefer to install a package made available by your distribution of choice, please make sure the version is recent to ensure compatibility with modern Android devices.
## Dependencies on macOS
@@ -37,7 +21,7 @@ Running MVT on macOS requires Xcode and [homebrew](https://brew.sh) to be instal
In order to install dependencies use:
```bash
brew install python3 pipx libusb sqlite3
brew install python3 libusb sqlite3
```
*libusb* is not required if you intend to only use `mvt-ios` and not `mvt-android`.
@@ -58,47 +42,24 @@ It is recommended to try installing and running MVT from [Windows Subsystem Linu
## Installing MVT
### Installing from PyPI with pipx (recommended)
1. Install `pipx` following the instructions above for your OS/distribution. Make sure to run `pipx ensurepath` and open a new terminal window.
2. ```bash
pipx install mvt
```
If you haven't done so, you can add this to your `.bashrc` or `.zshrc` file in order to add locally installed Pypi binaries to your `$PATH`:
You now should have the `mvt-ios` and `mvt-android` utilities installed. If you run into problems with these commands not being found, ensure you have run `pipx ensurepath` and opened a new terminal window.
### Installing from PyPI directly into a virtual environment
You can use `pipenv`, `poetry` etc. for your virtual environment, but the provided example is with the built-in `venv` tool:
1. Create the virtual environment in a folder in the current directory named `env`:
```bash
python3 -m venv env
export PATH=$PATH:~/.local/bin
```
2. Activate the virtual environment:
Then you can install MVT directly from [pypi](https://pypi.org/project/mvt/)
```bash
source env/bin/activate
pip3 install mvt
```
3. Install `mvt` into the virtual environment:
```bash
pip install mvt
```
The `mvt-ios` and `mvt-android` utilities should now be available as commands whenever the virtual environment is active.
### Installing from git source with pipx
If you want to have the latest features in development, you can install MVT directly from the source code in git.
If you want to have the latest features in development, you can install MVT directly from the source code. If you installed MVT previously from pypi, you should first uninstall it using `pip3 uninstall mvt` and then install from the source code:
```bash
pipx install --force git+https://github.com/mvt-project/mvt.git
git clone https://github.com/mvt-project/mvt.git
cd mvt
pip3 install .
```
You now should have the `mvt-ios` and `mvt-android` utilities installed.
**Notes:**
1. The `--force` flag is necessary to force the reinstallation of the package.
2. To revert to using a PyPI version, it will be necessary to `pipx uninstall mvt` first.
## Setting up command completions
See ["Command completions"](command_completion.md)

View File

@@ -12,20 +12,6 @@ Mobile Verification Toolkit (MVT) is a collection of utilities designed to facil
MVT is a forensic research tool intended for technologists and investigators. Using it requires understanding the basics of forensic analysis and using command-line tools. MVT is not intended for end-user self-assessment. If you are concerned with the security of your device please seek expert assistance.
## Indicators of Compromise
MVT supports using [indicators of compromise (IOCs)](https://github.com/mvt-project/mvt-indicators) to scan mobile devices for potential traces of targeting or infection by known spyware campaigns. This includes IOCs published by [Amnesty International](https://github.com/AmnestyTech/investigations/) and other research groups.
!!! warning
Public indicators of compromise are insufficient to determine that a device is "clean", and not targeted with a particular spyware tool. Reliance on public indicators alone can miss recent forensic traces and give a false sense of security.
Reliable and comprehensive digital forensic support and triage requires access to non-public indicators, research and threat intelligence.
Such support is available to civil society through [Amnesty International's Security Lab](https://securitylab.amnesty.org/get-help/?c=mvt_docs) or [Access Nows Digital Security Helpline](https://www.accessnow.org/help/).
More information about using indicators of compromise with MVT is available in the [documentation](iocs.md).
## Consensual Forensics
While MVT is capable of extracting and processing various types of very personal records typically found on a mobile phone (such as calls history, SMS and WhatsApp messages, etc.), this is intended to help identify potential attack vectors such as malicious SMS messages leading to exploitation.

View File

@@ -34,13 +34,6 @@ It is also possible to load STIX2 files automatically from the environment varia
export MVT_STIX2="/home/user/IOC1.stix2:/home/user/IOC2.stix2"
```
## STIX2 Support
So far MVT implements only a subset of [STIX2 specifications](https://docs.oasis-open.org/cti/stix/v2.1/csprd01/stix-v2.1-csprd01.html):
* It only supports checks for one value (such as `[domain-name:value='DOMAIN']`) and not boolean expressions over multiple comparisons
* It only supports the following types: `domain-name:value`, `process:name`, `email-addr:value`, `file:name`, `file:path`, `file:hashes.md5`, `file:hashes.sha1`, `file:hashes.sha256`, `app:id`, `configuration-profile:id`, `android-property:name`, `url:value` (but each type will only be checked by a module if it is relevant to the type of data obtained)
## Known repositories of STIX2 IOCs
- The [Amnesty International investigations repository](https://github.com/AmnestyTech/investigations) contains STIX-formatted IOCs for:
@@ -50,9 +43,6 @@ So far MVT implements only a subset of [STIX2 specifications](https://docs.oasis
- [This repository](https://github.com/Te-k/stalkerware-indicators) contains IOCs for Android stalkerware including [a STIX MVT-compatible file](https://raw.githubusercontent.com/Te-k/stalkerware-indicators/master/generated/stalkerware.stix2).
- We are also maintaining [a list of IOCs](https://github.com/mvt-project/mvt-indicators) in STIX format from public spyware campaigns.
You can automaticallly download the latest public indicator files with the command `mvt-ios download-iocs` or `mvt-android download-iocs`. These commands download the list of indicators from the [mvt-indicators](https://github.com/mvt-project/mvt-indicators/blob/main/indicators.yaml) repository and store them in the [appdir](https://pypi.org/project/appdirs/) folder. They are then loaded automatically by MVT.
You can automaticallly download the latest public indicator files with the command `mvt-ios download-iocs` or `mvt-android download-iocs`. These commands download the list of indicators listed [here](https://github.com/mvt-project/mvt/blob/main/public_indicators.json) and store them in the [appdir](https://pypi.org/project/appdirs/) folder. They are then loaded automatically by MVT.
Please [open an issue](https://github.com/mvt-project/mvt/issues/) to suggest new sources of STIX-formatted IOCs.

View File

@@ -10,7 +10,7 @@ To do that:
4. If you want to have a more accurate detection, ensure that the encrypted backup option is activated and choose a secure password for the backup.
5. Start the backup and wait for it to finish (this may take up to 30 minutes).
![](../../img/macos-backup.jpg)
![](../../../img/macos-backup.jpg)
_Source: [Apple Support](https://support.apple.com/en-us/HT211229)_
Once the backup is done, find its location and copy it to a place where it can be analyzed by MVT. On Windows, the backup can be stored either in `%USERPROFILE%\Apple\MobileSync\` or `%USERPROFILE%\AppData\Roaming\Apple Computer\MobileSync\`. On macOS, the backup is stored in `~/Library/Application Support/MobileSync/`.
@@ -25,13 +25,13 @@ On more recent MacOS versions, this feature is included in Finder. To do a backu
4. In the General tab, select `Back up all the data on your iPhone to this Mac` from the options under the Backups section.
5. Check the box that says `Encrypt local backup`. If it is your first time selecting this option, you may need to enter a password to encrypt the backup.
![](../../img/macos-backup2.png)
![](../../../img/macos-backup2.png)
_Source: [Apple Support](https://support.apple.com/en-us/HT211229)_
6. Click `Back Up Now` to start the back-up process.
7. The encrypted backup for your iPhone should now start. Once the process finishes, you can check the backup by opening `Finder`, clicking on the `General` tab, then click on `Manage Backups`. Now you should see a list of your backups like the image below:
![](../../img/macos-backups.png)
![](../../../img/macos-backups.png)
_Source: [Apple Support](https://support.apple.com/en-us/HT211229)_
If your backup has a lock next to it like in the image above, then the backup is encrypted. You should also see the date and time when the encrypted backup was created. The backup files are stored in `~/Library/Application Support/MobileSync/`.

View File

@@ -45,10 +45,10 @@ Once the idevice tools are available you can check if everything works fine by c
ideviceinfo
```
This should show many details on the connected iOS device. If you are connecting the device to your laptop for the first time, it will require to unlock and enter the PIN code on the mobile device. If it complains that no device is connected and the mobile device is indeed plugged in through the USB cable, you might need to do this first, although typically the pairing is automatically done when connecting the device:
This should some many details on the connected iOS device. If you are connecting the device to your laptop for the first time, it will require to unlock and enter the PIN code on the mobile device. If it complains that no device is connected and the mobile device is indeed plugged in through the USB cable, you might need to do this first, although typically the pairing is automatically done when connecting the device:
```bash
sudo usbmuxd -f -v
sudo usbmuxd -f -d
idevicepair pair
```

View File

@@ -142,16 +142,6 @@ If indicators are provided through the command-line, they are checked against th
---
### `global_preferences.json`
!!! info "Availability"
Backup: :material-check:
Full filesystem dump: :material-check:
This JSON file is created by mvt-ios' `GlobalPreferences` module. The module extracts records from a Plist file located at */private/var/mobile/Library/Preferences/.GlobalPreferences.plist*, which contains a system preferences including if Lockdown Mode is enabled.
---
### `id_status_cache.json`
!!! info "Availability"

View File

@@ -1,5 +1,5 @@
mkdocs==1.6.1
mkdocs-autorefs==1.4.2
mkdocs-material==9.6.14
mkdocs-material-extensions==1.3.1
mkdocstrings==0.29.1
mkdocs==1.2.3
mkdocs-autorefs
mkdocs-material
mkdocs-material-extensions
mkdocstrings

View File

@@ -1,14 +1,14 @@
site_name: Mobile Verification Toolkit
repo_url: https://github.com/mvt-project/mvt
edit_uri: edit/main/docs/
copyright: Copyright &copy; 2021-2023 MVT Project Developers
copyright: Copyright &copy; 2021-2022 MVT Project Developers
site_description: Mobile Verification Toolkit Documentation
markdown_extensions:
- attr_list
- admonition
- pymdownx.emoji:
emoji_index: !!python/name:material.extensions.emoji.twemoji
emoji_generator: !!python/name:material.extensions.emoji.to_svg
emoji_index: !!python/name:materialx.emoji.twemoji
emoji_generator: !!python/name:materialx.emoji.to_svg
- pymdownx.superfences
- pymdownx.inlinehilite
- pymdownx.highlight:
@@ -46,5 +46,4 @@ nav:
- Check an Android Backup (SMS messages): "android/backup.md"
- Download APKs: "android/download_apks.md"
- Indicators of Compromise: "iocs.md"
- Development: "development.md"
- License: "license.md"

View File

@@ -1,4 +1,4 @@
# Mobile Verification Toolkit (MVT)
# Copyright (c) 2021-2023 The MVT Authors.
# Copyright (c) 2021-2023 Claudio Guarnieri.
# Use of this software is governed by the MVT License 1.1 that can be found at
# https://license.mvt.re/1.1/

View File

@@ -1,5 +1,5 @@
# Mobile Verification Toolkit (MVT)
# Copyright (c) 2021-2023 The MVT Authors.
# Copyright (c) 2021-2023 Claudio Guarnieri.
# Use of this software is governed by the MVT License 1.1 that can be found at
# https://license.mvt.re/1.1/

264
mvt/android/cli.py Normal file
View File

@@ -0,0 +1,264 @@
# Mobile Verification Toolkit (MVT)
# Copyright (c) 2021-2023 Claudio Guarnieri.
# Use of this software is governed by the MVT License 1.1 that can be found at
# https://license.mvt.re/1.1/
import logging
import click
from mvt.common.cmd_check_iocs import CmdCheckIOCS
from mvt.common.help import (HELP_MSG_FAST, HELP_MSG_HASHES, HELP_MSG_IOC,
HELP_MSG_LIST_MODULES, HELP_MSG_MODULE,
HELP_MSG_OUTPUT, HELP_MSG_SERIAL,
HELP_MSG_VERBOSE)
from mvt.common.logo import logo
from mvt.common.updates import IndicatorsUpdates
from mvt.common.utils import init_logging, set_verbose_logging
from .cmd_check_adb import CmdAndroidCheckADB
from .cmd_check_androidqf import CmdAndroidCheckAndroidQF
from .cmd_check_backup import CmdAndroidCheckBackup
from .cmd_check_bugreport import CmdAndroidCheckBugreport
from .cmd_download_apks import DownloadAPKs
from .modules.adb import ADB_MODULES
from .modules.adb.packages import Packages
from .modules.backup import BACKUP_MODULES
from .modules.bugreport import BUGREPORT_MODULES
init_logging()
log = logging.getLogger("mvt")
CONTEXT_SETTINGS = dict(help_option_names=['-h', '--help'])
#==============================================================================
# Main
#==============================================================================
@click.group(invoke_without_command=False)
def cli():
logo()
#==============================================================================
# Command: version
#==============================================================================
@cli.command("version", help="Show the currently installed version of MVT")
def version():
return
#==============================================================================
# Command: download-apks
#==============================================================================
@cli.command("download-apks", help="Download all or only non-system installed APKs",
context_settings=CONTEXT_SETTINGS)
@click.option("--serial", "-s", type=str, help=HELP_MSG_SERIAL)
@click.option("--all-apks", "-a", is_flag=True,
help="Extract all packages installed on the phone, including system packages")
@click.option("--virustotal", "-v", is_flag=True, help="Check packages on VirusTotal")
@click.option("--output", "-o", type=click.Path(exists=False),
help="Specify a path to a folder where you want to store the APKs")
@click.option("--from-file", "-f", type=click.Path(exists=True),
help="Instead of acquiring from phone, load an existing packages.json file for "
"lookups (mainly for debug purposes)")
@click.option("--verbose", "-v", is_flag=True, help=HELP_MSG_VERBOSE)
@click.pass_context
def download_apks(ctx, all_apks, virustotal, output, from_file, serial, verbose):
set_verbose_logging(verbose)
try:
if from_file:
download = DownloadAPKs.from_json(from_file)
else:
# TODO: Do we actually want to be able to run without storing any
# file?
if not output:
log.critical("You need to specify an output folder with --output!")
ctx.exit(1)
download = DownloadAPKs(results_path=output, all_apks=all_apks)
if serial:
download.serial = serial
download.run()
packages_to_lookup = []
if all_apks:
packages_to_lookup = download.packages
else:
for package in download.packages:
if not package.get("system", False):
packages_to_lookup.append(package)
if len(packages_to_lookup) == 0:
return
if virustotal:
m = Packages()
m.check_virustotal(packages_to_lookup)
except KeyboardInterrupt:
print("")
ctx.exit(1)
#==============================================================================
# Command: check-adb
#==============================================================================
@cli.command("check-adb", help="Check an Android device over adb",
context_settings=CONTEXT_SETTINGS)
@click.option("--serial", "-s", type=str, help=HELP_MSG_SERIAL)
@click.option("--iocs", "-i", type=click.Path(exists=True), multiple=True,
default=[], help=HELP_MSG_IOC)
@click.option("--output", "-o", type=click.Path(exists=False),
help=HELP_MSG_OUTPUT)
@click.option("--fast", "-f", is_flag=True, help=HELP_MSG_FAST)
@click.option("--list-modules", "-l", is_flag=True, help=HELP_MSG_LIST_MODULES)
@click.option("--module", "-m", help=HELP_MSG_MODULE)
@click.option("--verbose", "-v", is_flag=True, help=HELP_MSG_VERBOSE)
@click.pass_context
def check_adb(ctx, serial, iocs, output, fast, list_modules, module, verbose):
set_verbose_logging(verbose)
cmd = CmdAndroidCheckADB(results_path=output, ioc_files=iocs,
module_name=module, serial=serial, fast_mode=fast)
if list_modules:
cmd.list_modules()
return
log.info("Checking Android device over debug bridge")
cmd.run()
if cmd.detected_count > 0:
log.warning("The analysis of the Android device produced %d detections!",
cmd.detected_count)
#==============================================================================
# Command: check-bugreport
#==============================================================================
@cli.command("check-bugreport", help="Check an Android Bug Report",
context_settings=CONTEXT_SETTINGS)
@click.option("--iocs", "-i", type=click.Path(exists=True), multiple=True,
default=[], help=HELP_MSG_IOC)
@click.option("--output", "-o", type=click.Path(exists=False),
help=HELP_MSG_OUTPUT)
@click.option("--list-modules", "-l", is_flag=True, help=HELP_MSG_LIST_MODULES)
@click.option("--module", "-m", help=HELP_MSG_MODULE)
@click.option("--verbose", "-v", is_flag=True, help=HELP_MSG_VERBOSE)
@click.argument("BUGREPORT_PATH", type=click.Path(exists=True))
@click.pass_context
def check_bugreport(ctx, iocs, output, list_modules, module, verbose, bugreport_path):
set_verbose_logging(verbose)
# Always generate hashes as bug reports are small.
cmd = CmdAndroidCheckBugreport(target_path=bugreport_path,
results_path=output, ioc_files=iocs,
module_name=module, hashes=True)
if list_modules:
cmd.list_modules()
return
log.info("Checking Android bug report at path: %s", bugreport_path)
cmd.run()
if cmd.detected_count > 0:
log.warning("The analysis of the Android bug report produced %d detections!",
cmd.detected_count)
#==============================================================================
# Command: check-backup
#==============================================================================
@cli.command("check-backup", help="Check an Android Backup",
context_settings=CONTEXT_SETTINGS)
@click.option("--iocs", "-i", type=click.Path(exists=True), multiple=True,
default=[], help=HELP_MSG_IOC)
@click.option("--output", "-o", type=click.Path(exists=False),
help=HELP_MSG_OUTPUT)
@click.option("--list-modules", "-l", is_flag=True, help=HELP_MSG_LIST_MODULES)
@click.option("--verbose", "-v", is_flag=True, help=HELP_MSG_VERBOSE)
@click.argument("BACKUP_PATH", type=click.Path(exists=True))
@click.pass_context
def check_backup(ctx, iocs, output, list_modules, verbose, backup_path):
set_verbose_logging(verbose)
# Always generate hashes as backups are generally small.
cmd = CmdAndroidCheckBackup(target_path=backup_path, results_path=output,
ioc_files=iocs, hashes=True)
if list_modules:
cmd.list_modules()
return
log.info("Checking Android backup at path: %s", backup_path)
cmd.run()
if cmd.detected_count > 0:
log.warning("The analysis of the Android backup produced %d detections!",
cmd.detected_count)
#==============================================================================
# Command: check-androidqf
#==============================================================================
@cli.command("check-androidqf", help="Check data collected with AndroidQF",
context_settings=CONTEXT_SETTINGS)
@click.option("--iocs", "-i", type=click.Path(exists=True), multiple=True,
default=[], help=HELP_MSG_IOC)
@click.option("--output", "-o", type=click.Path(exists=False),
help=HELP_MSG_OUTPUT)
@click.option("--list-modules", "-l", is_flag=True, help=HELP_MSG_LIST_MODULES)
@click.option("--module", "-m", help=HELP_MSG_MODULE)
@click.option("--hashes", "-H", is_flag=True, help=HELP_MSG_HASHES)
@click.option("--verbose", "-v", is_flag=True, help=HELP_MSG_VERBOSE)
@click.argument("ANDROIDQF_PATH", type=click.Path(exists=True))
@click.pass_context
def check_androidqf(ctx, iocs, output, list_modules, module, hashes, verbose, androidqf_path):
set_verbose_logging(verbose)
cmd = CmdAndroidCheckAndroidQF(target_path=androidqf_path,
results_path=output, ioc_files=iocs,
module_name=module, hashes=hashes)
if list_modules:
cmd.list_modules()
return
log.info("Checking AndroidQF acquisition at path: %s", androidqf_path)
cmd.run()
if cmd.detected_count > 0:
log.warning("The analysis of the AndroidQF acquisition produced %d detections!",
cmd.detected_count)
#==============================================================================
# Command: check-iocs
#==============================================================================
@cli.command("check-iocs", help="Compare stored JSON results to provided indicators",
context_settings=CONTEXT_SETTINGS)
@click.option("--iocs", "-i", type=click.Path(exists=True), multiple=True,
default=[], help=HELP_MSG_IOC)
@click.option("--list-modules", "-l", is_flag=True, help=HELP_MSG_LIST_MODULES)
@click.option("--module", "-m", help=HELP_MSG_MODULE)
@click.argument("FOLDER", type=click.Path(exists=True))
@click.pass_context
def check_iocs(ctx, iocs, list_modules, module, folder):
cmd = CmdCheckIOCS(target_path=folder, ioc_files=iocs, module_name=module)
cmd.modules = BACKUP_MODULES + ADB_MODULES + BUGREPORT_MODULES
if list_modules:
cmd.list_modules()
return
cmd.run()
#==============================================================================
# Command: download-iocs
#==============================================================================
@cli.command("download-iocs", help="Download public STIX2 indicators",
context_settings=CONTEXT_SETTINGS)
def download_indicators():
ioc_updates = IndicatorsUpdates()
ioc_updates.update()

View File

@@ -1,5 +1,5 @@
# Mobile Verification Toolkit (MVT)
# Copyright (c) 2021-2023 The MVT Authors.
# Copyright (c) 2021-2023 Claudio Guarnieri.
# Use of this software is governed by the MVT License 1.1 that can be found at
# https://license.mvt.re/1.1/
@@ -14,6 +14,7 @@ log = logging.getLogger(__name__)
class CmdAndroidCheckADB(Command):
def __init__(
self,
target_path: Optional[str] = None,
@@ -21,17 +22,11 @@ class CmdAndroidCheckADB(Command):
ioc_files: Optional[list] = None,
module_name: Optional[str] = None,
serial: Optional[str] = None,
module_options: Optional[dict] = None,
fast_mode: Optional[bool] = False,
) -> None:
super().__init__(
target_path=target_path,
results_path=results_path,
ioc_files=ioc_files,
module_name=module_name,
serial=serial,
module_options=module_options,
log=log,
)
super().__init__(target_path=target_path, results_path=results_path,
ioc_files=ioc_files, module_name=module_name,
serial=serial, fast_mode=fast_mode, log=log)
self.name = "check-adb"
self.modules = ADB_MODULES

View File

@@ -0,0 +1,34 @@
# Mobile Verification Toolkit (MVT)
# Copyright (c) 2021-2023 Claudio Guarnieri.
# Use of this software is governed by the MVT License 1.1 that can be found at
# https://license.mvt.re/1.1/
import logging
from typing import Optional
from mvt.common.command import Command
from .modules.androidqf import ANDROIDQF_MODULES
log = logging.getLogger(__name__)
class CmdAndroidCheckAndroidQF(Command):
def __init__(
self,
target_path: Optional[str] = None,
results_path: Optional[str] = None,
ioc_files: Optional[list] = None,
module_name: Optional[str] = None,
serial: Optional[str] = None,
fast_mode: Optional[bool] = False,
hashes: Optional[bool] = False,
) -> None:
super().__init__(target_path=target_path, results_path=results_path,
ioc_files=ioc_files, module_name=module_name,
serial=serial, fast_mode=fast_mode, hashes=hashes,
log=log)
self.name = "check-androidqf"
self.modules = ANDROIDQF_MODULES

View File

@@ -1,5 +1,5 @@
# Mobile Verification Toolkit (MVT)
# Copyright (c) 2021-2023 The MVT Authors.
# Copyright (c) 2021-2023 Claudio Guarnieri.
# Use of this software is governed by the MVT License 1.1 that can be found at
# https://license.mvt.re/1.1/
@@ -11,14 +11,12 @@ import tarfile
from pathlib import Path
from typing import List, Optional
from rich.prompt import Prompt
from mvt.android.modules.backup.base import BackupExtraction
from mvt.android.modules.backup.helpers import prompt_or_load_android_backup_password
from mvt.android.parsers.backup import (
AndroidBackupParsingError,
InvalidBackupPassword,
parse_ab_header,
parse_backup_file,
)
from mvt.android.parsers.backup import (AndroidBackupParsingError,
InvalidBackupPassword, parse_ab_header,
parse_backup_file)
from mvt.common.command import Command
from .modules.backup import BACKUP_MODULES
@@ -27,6 +25,7 @@ log = logging.getLogger(__name__)
class CmdAndroidCheckBackup(Command):
def __init__(
self,
target_path: Optional[str] = None,
@@ -34,19 +33,13 @@ class CmdAndroidCheckBackup(Command):
ioc_files: Optional[list] = None,
module_name: Optional[str] = None,
serial: Optional[str] = None,
module_options: Optional[dict] = None,
hashes: bool = False,
fast_mode: Optional[bool] = False,
hashes: Optional[bool] = False,
) -> None:
super().__init__(
target_path=target_path,
results_path=results_path,
ioc_files=ioc_files,
module_name=module_name,
serial=serial,
module_options=module_options,
hashes=hashes,
log=log,
)
super().__init__(target_path=target_path, results_path=results_path,
ioc_files=ioc_files, module_name=module_name,
serial=serial, fast_mode=fast_mode, hashes=hashes,
log=log)
self.name = "check-backup"
self.modules = BACKUP_MODULES
@@ -71,12 +64,7 @@ class CmdAndroidCheckBackup(Command):
password = None
if header["encryption"] != "none":
password = prompt_or_load_android_backup_password(
log, self.module_options
)
if not password:
log.critical("No backup password provided.")
sys.exit(1)
password = Prompt.ask("Enter backup password", password=True)
try:
tardata = parse_backup_file(data, password=password)
except InvalidBackupPassword:
@@ -97,18 +85,16 @@ class CmdAndroidCheckBackup(Command):
self.target_path = Path(self.target_path).absolute().as_posix()
for root, subdirs, subfiles in os.walk(os.path.abspath(self.target_path)):
for fname in subfiles:
self.backup_files.append(
os.path.relpath(os.path.join(root, fname), self.target_path)
)
self.backup_files.append(os.path.relpath(os.path.join(root, fname),
self.target_path))
else:
log.critical(
"Invalid backup path, path should be a folder or an "
"Android Backup (.ab) file"
)
log.critical("Invalid backup path, path should be a folder or an "
"Android Backup (.ab) file")
sys.exit(1)
def module_init(self, module: BackupExtraction) -> None: # type: ignore[override]
if self.backup_type == "folder":
module.from_folder(self.target_path, self.backup_files)
else:
module.from_ab(self.target_path, self.backup_archive, self.backup_files)
module.from_ab(self.target_path, self.backup_archive,
self.backup_files)

View File

@@ -1,5 +1,5 @@
# Mobile Verification Toolkit (MVT)
# Copyright (c) 2021-2023 The MVT Authors.
# Copyright (c) 2021-2023 Claudio Guarnieri.
# Use of this software is governed by the MVT License 1.1 that can be found at
# https://license.mvt.re/1.1/
@@ -18,6 +18,7 @@ log = logging.getLogger(__name__)
class CmdAndroidCheckBugreport(Command):
def __init__(
self,
target_path: Optional[str] = None,
@@ -25,19 +26,13 @@ class CmdAndroidCheckBugreport(Command):
ioc_files: Optional[list] = None,
module_name: Optional[str] = None,
serial: Optional[str] = None,
module_options: Optional[dict] = None,
hashes: bool = False,
fast_mode: Optional[bool] = False,
hashes: Optional[bool] = False,
) -> None:
super().__init__(
target_path=target_path,
results_path=results_path,
ioc_files=ioc_files,
module_name=module_name,
serial=serial,
module_options=module_options,
hashes=hashes,
log=log,
)
super().__init__(target_path=target_path, results_path=results_path,
ioc_files=ioc_files, module_name=module_name,
serial=serial, fast_mode=fast_mode, hashes=hashes,
log=log)
self.name = "check-bugreport"
self.modules = BUGREPORT_MODULES
@@ -60,9 +55,8 @@ class CmdAndroidCheckBugreport(Command):
parent_path = Path(self.target_path).absolute().as_posix()
for root, _, subfiles in os.walk(os.path.abspath(self.target_path)):
for file_name in subfiles:
file_path = os.path.relpath(
os.path.join(root, file_name), parent_path
)
file_path = os.path.relpath(os.path.join(root, file_name),
parent_path)
self.bugreport_files.append(file_path)
def module_init(self, module: BugReportModule) -> None: # type: ignore[override]

View File

@@ -1,12 +1,12 @@
# Mobile Verification Toolkit (MVT)
# Copyright (c) 2021-2023 The MVT Authors.
# Copyright (c) 2021-2023 Claudio Guarnieri.
# Use of this software is governed by the MVT License 1.1 that can be found at
# https://license.mvt.re/1.1/
import json
import logging
import os
from typing import Callable, Optional, Union
from typing import Callable, Optional
from rich.progress import track
@@ -26,7 +26,7 @@ class DownloadAPKs(AndroidExtraction):
def __init__(
self,
results_path: Optional[str] = None,
all_apks: bool = False,
all_apks: Optional[bool] = False,
packages: Optional[list] = None,
) -> None:
"""Initialize module.
@@ -52,9 +52,7 @@ class DownloadAPKs(AndroidExtraction):
packages = json.load(handle)
return cls(packages=packages)
def pull_package_file(
self, package_name: str, remote_path: str
) -> Union[str, None]:
def pull_package_file(self, package_name: str, remote_path: str) -> None:
"""Pull files related to specific package from the device.
:param package_name: Name of the package to download
@@ -68,31 +66,27 @@ class DownloadAPKs(AndroidExtraction):
if "==/" in remote_path:
file_name = "_" + remote_path.split("==/")[1].replace(".apk", "")
local_path = os.path.join(
self.results_path_apks, f"{package_name}{file_name}.apk"
)
local_path = os.path.join(self.results_path_apks,
f"{package_name}{file_name}.apk")
name_counter = 0
while True:
if not os.path.exists(local_path):
break
name_counter += 1
local_path = os.path.join(
self.results_path_apks, f"{package_name}{file_name}_{name_counter}.apk"
)
local_path = os.path.join(self.results_path_apks,
f"{package_name}{file_name}_{name_counter}.apk")
try:
self._adb_download(remote_path, local_path)
except InsufficientPrivileges:
log.error(
"Unable to pull package file from %s: insufficient privileges, "
"it might be a system app",
remote_path,
)
log.error("Unable to pull package file from %s: insufficient privileges, "
"it might be a system app", remote_path)
self._adb_reconnect()
return None
except Exception as exc:
log.exception("Failed to pull package file from %s: %s", remote_path, exc)
log.exception("Failed to pull package file from %s: %s",
remote_path, exc)
self._adb_reconnect()
return None
@@ -112,10 +106,10 @@ class DownloadAPKs(AndroidExtraction):
self.packages = m.results
def pull_packages(self) -> None:
"""Download all files of all selected packages from the device."""
log.info(
"Starting extraction of installed APKs at folder %s", self.results_path
)
"""Download all files of all selected packages from the device.
"""
log.info("Starting extraction of installed APKs at folder %s",
self.results_path)
# If the user provided the flag --all-apks we select all packages.
packages_selection = []
@@ -129,10 +123,8 @@ class DownloadAPKs(AndroidExtraction):
if not package.get("system", False):
packages_selection.append(package)
log.info(
'Selected only %d packages which are not marked as "system"',
len(packages_selection),
)
log.info("Selected only %d packages which are not marked as \"system\"",
len(packages_selection))
if len(packages_selection) == 0:
log.info("No packages were selected for download")
@@ -144,26 +136,19 @@ class DownloadAPKs(AndroidExtraction):
if not os.path.exists(self.results_path_apks):
os.makedirs(self.results_path_apks, exist_ok=True)
for i in track(
range(len(packages_selection)),
description=f"Downloading {len(packages_selection)} packages...",
):
for i in track(range(len(packages_selection)),
description=f"Downloading {len(packages_selection)} packages..."):
package = packages_selection[i]
log.info(
"[%d/%d] Package: %s",
i,
len(packages_selection),
package["package_name"],
)
log.info("[%d/%d] Package: %s", i, len(packages_selection),
package["package_name"])
# Sometimes the package path contains multiple lines for multiple
# apks. We loop through each line and download each file.
for package_file in package["files"]:
device_path = package_file["path"]
local_path = self.pull_package_file(
package["package_name"], device_path
)
local_path = self.pull_package_file(package["package_name"],
device_path)
if not local_path:
continue

View File

@@ -1,4 +1,4 @@
# Mobile Verification Toolkit (MVT)
# Copyright (c) 2021-2023 The MVT Authors.
# Copyright (c) 2021-2023 Claudio Guarnieri.
# Use of this software is governed by the MVT License 1.1 that can be found at
# https://license.mvt.re/1.1/

View File

@@ -1,5 +1,5 @@
# Mobile Verification Toolkit (MVT)
# Copyright (c) 2021-2023 The MVT Authors.
# Copyright (c) 2021-2023 Claudio Guarnieri.
# Use of this software is governed by the MVT License 1.1 that can be found at
# https://license.mvt.re/1.1/
@@ -10,7 +10,6 @@ from .dumpsys_appops import DumpsysAppOps
from .dumpsys_battery_daily import DumpsysBatteryDaily
from .dumpsys_battery_history import DumpsysBatteryHistory
from .dumpsys_dbinfo import DumpsysDBInfo
from .dumpsys_adbstate import DumpsysADBState
from .dumpsys_full import DumpsysFull
from .dumpsys_receivers import DumpsysReceivers
from .files import Files
@@ -24,25 +23,8 @@ from .settings import Settings
from .sms import SMS
from .whatsapp import Whatsapp
ADB_MODULES = [
ChromeHistory,
SMS,
Whatsapp,
Processes,
Getprop,
Settings,
SELinuxStatus,
DumpsysBatteryHistory,
DumpsysBatteryDaily,
DumpsysReceivers,
DumpsysActivities,
DumpsysAccessibility,
DumpsysDBInfo,
DumpsysADBState,
DumpsysFull,
DumpsysAppOps,
Packages,
Logcat,
RootBinaries,
Files,
]
ADB_MODULES = [ChromeHistory, SMS, Whatsapp, Processes, Getprop, Settings,
SELinuxStatus, DumpsysBatteryHistory, DumpsysBatteryDaily,
DumpsysReceivers, DumpsysActivities, DumpsysAccessibility,
DumpsysDBInfo, DumpsysFull, DumpsysAppOps, Packages, Logcat,
RootBinaries, Files]

View File

@@ -1,5 +1,5 @@
# Mobile Verification Toolkit (MVT)
# Copyright (c) 2021-2023 The MVT Authors.
# Copyright (c) 2021-2023 Claudio Guarnieri.
# Use of this software is governed by the MVT License 1.1 that can be found at
# https://license.mvt.re/1.1/
@@ -16,20 +16,13 @@ from typing import Callable, Optional
from adb_shell.adb_device import AdbDeviceTcp, AdbDeviceUsb
from adb_shell.auth.keygen import keygen, write_public_keyfile
from adb_shell.auth.sign_pythonrsa import PythonRSASigner
from adb_shell.exceptions import (
AdbCommandFailureException,
DeviceAuthError,
UsbDeviceNotFoundError,
UsbReadFailedError,
)
from adb_shell.exceptions import (AdbCommandFailureException, DeviceAuthError,
UsbDeviceNotFoundError, UsbReadFailedError)
from rich.prompt import Prompt
from usb1 import USBErrorAccess, USBErrorBusy
from mvt.android.modules.backup.helpers import prompt_or_load_android_backup_password
from mvt.android.parsers.backup import (
InvalidBackupPassword,
parse_ab_header,
parse_backup_file,
)
from mvt.android.parsers.backup import (InvalidBackupPassword, parse_ab_header,
parse_backup_file)
from mvt.common.module import InsufficientPrivileges, MVTModule
ADB_KEY_PATH = os.path.expanduser("~/.android/adbkey")
@@ -44,18 +37,13 @@ class AndroidExtraction(MVTModule):
file_path: Optional[str] = None,
target_path: Optional[str] = None,
results_path: Optional[str] = None,
module_options: Optional[dict] = None,
fast_mode: Optional[bool] = False,
log: logging.Logger = logging.getLogger(__name__),
results: Optional[list] = None,
results: Optional[list] = None
) -> None:
super().__init__(
file_path=file_path,
target_path=target_path,
results_path=results_path,
module_options=module_options,
log=log,
results=results,
)
super().__init__(file_path=file_path, target_path=target_path,
results_path=results_path, fast_mode=fast_mode,
log=log, results=results)
self.device = None
self.serial = None
@@ -90,49 +78,36 @@ class AndroidExtraction(MVTModule):
try:
self.device = AdbDeviceUsb(serial=self.serial)
except UsbDeviceNotFoundError:
self.log.critical(
"No device found. Make sure it is connected and unlocked."
)
self.log.critical("No device found. Make sure it is connected and unlocked.")
sys.exit(-1)
# Otherwise we try to use the TCP transport.
else:
addr = self.serial.split(":")
if len(addr) < 2:
raise ValueError(
"TCP serial number must follow the format: `address:port`"
)
raise ValueError("TCP serial number must follow the format: `address:port`")
self.device = AdbDeviceTcp(
addr[0], int(addr[1]), default_transport_timeout_s=30.0
)
self.device = AdbDeviceTcp(addr[0], int(addr[1]),
default_transport_timeout_s=30.)
while True:
try:
self.device.connect(rsa_keys=[signer], auth_timeout_s=5)
except (USBErrorBusy, USBErrorAccess):
self.log.critical(
"Device is busy, maybe run `adb kill-server` and try again."
)
self.log.critical("Device is busy, maybe run `adb kill-server` and try again.")
sys.exit(-1)
except DeviceAuthError:
self.log.error(
"You need to authorize this computer on the Android device. "
"Retrying in 5 seconds..."
)
self.log.error("You need to authorize this computer on the Android device. "
"Retrying in 5 seconds...")
time.sleep(5)
except UsbReadFailedError:
self.log.error(
"Unable to connect to the device over USB. "
"Try to unplug, plug the device and start again."
)
self.log.error("Unable to connect to the device over USB. "
"Try to unplug, plug the device and start again.")
sys.exit(-1)
except OSError as exc:
if exc.errno == 113 and self.serial:
self.log.critical(
"Unable to connect to the device %s: "
"did you specify the correct IP address?",
self.serial,
)
self.log.critical("Unable to connect to the device %s: "
"did you specify the correct IP address?",
self.serial)
sys.exit(-1)
else:
break
@@ -147,14 +122,14 @@ class AndroidExtraction(MVTModule):
self._adb_disconnect()
self._adb_connect()
def _adb_command(self, command: str, decode: bool = True) -> str:
def _adb_command(self, command: str) -> str:
"""Execute an adb shell command.
:param command: Shell command to execute
:returns: Output of command
"""
return self.device.shell(command, read_timeout_s=200.0, decode=decode)
return self.device.shell(command, read_timeout_s=200.0)
def _adb_check_if_root(self) -> bool:
"""Check if we have a `su` binary on the Android device.
@@ -169,11 +144,9 @@ class AndroidExtraction(MVTModule):
def _adb_root_or_die(self) -> None:
"""Check if we have a `su` binary, otherwise raise an Exception."""
if not self._adb_check_if_root():
raise InsufficientPrivileges(
"This module is optionally available "
"in case the device is already rooted."
" Do NOT root your own device!"
)
raise InsufficientPrivileges("This module is optionally available "
"in case the device is already rooted."
" Do NOT root your own device!")
def _adb_command_as_root(self, command):
"""Execute an adb shell command.
@@ -204,7 +177,7 @@ class AndroidExtraction(MVTModule):
remote_path: str,
local_path: str,
progress_callback: Optional[Callable] = None,
retry_root: Optional[bool] = True,
retry_root: Optional[bool] = True
) -> None:
"""Download a file form the device.
@@ -219,48 +192,41 @@ class AndroidExtraction(MVTModule):
self.device.pull(remote_path, local_path, progress_callback)
except AdbCommandFailureException as exc:
if retry_root:
self._adb_download_root(remote_path, local_path, progress_callback)
self._adb_download_root(remote_path, local_path,
progress_callback)
else:
raise Exception(
f"Unable to download file {remote_path}: {exc}"
) from exc
raise Exception(f"Unable to download file {remote_path}: {exc}") from exc
def _adb_download_root(
self,
remote_path: str,
local_path: str,
progress_callback: Optional[Callable] = None,
progress_callback: Optional[Callable] = None
) -> None:
try:
# Check if we have root, if not raise an Exception.
self._adb_root_or_die()
# We generate a random temporary filename.
allowed_chars = (
string.ascii_uppercase + string.ascii_lowercase + string.digits
)
tmp_filename = "tmp_" + "".join(random.choices(allowed_chars, k=10))
allowed_chars = (string.ascii_uppercase
+ string.ascii_lowercase
+ string.digits)
tmp_filename = "tmp_" + ''.join(random.choices(allowed_chars, k=10))
# We create a temporary local file.
new_remote_path = f"/sdcard/{tmp_filename}"
# We copy the file from the data folder to /sdcard/.
cp_output = self._adb_command_as_root(f"cp {remote_path} {new_remote_path}")
if (
cp_output.startswith("cp: ")
and "No such file or directory" in cp_output
):
if cp_output.startswith("cp: ") and "No such file or directory" in cp_output:
raise Exception(f"Unable to process file {remote_path}: File not found")
if cp_output.startswith("cp: ") and "Permission denied" in cp_output:
raise Exception(
f"Unable to process file {remote_path}: Permission denied"
)
raise Exception(f"Unable to process file {remote_path}: Permission denied")
# We download from /sdcard/ to the local temporary file.
# If it doesn't work now, don't try again (retry_root=False)
self._adb_download(
new_remote_path, local_path, progress_callback, retry_root=False
)
self._adb_download(new_remote_path, local_path, progress_callback,
retry_root=False)
# Delete the copy on /sdcard/.
self._adb_command(f"rm -rf {new_remote_path}")
@@ -268,7 +234,8 @@ class AndroidExtraction(MVTModule):
except AdbCommandFailureException as exc:
raise Exception(f"Unable to download file {remote_path}: {exc}") from exc
def _adb_process_file(self, remote_path: str, process_routine: Callable) -> None:
def _adb_process_file(self, remote_path: str,
process_routine: Callable) -> None:
"""Download a local copy of a file which is only accessible as root.
This is a wrapper around process_routine.
@@ -306,16 +273,8 @@ class AndroidExtraction(MVTModule):
self._adb_command(f"rm -f {new_remote_path}")
def _generate_backup(self, package_name: str) -> bytes:
self.log.info(
"Please check phone and accept Android backup prompt. "
"You may need to set a backup password. \a"
)
if self.module_options.get("backup_password", None):
self.log.warning(
"Backup password already set from command line or environment "
"variable. You should use the same password if enabling encryption!"
)
self.log.info("Please check phone and accept Android backup prompt. "
"You may need to set a backup password. \a")
# TODO: Base64 encoding as temporary fix to avoid byte-mangling over
# the shell transport...
@@ -325,23 +284,19 @@ class AndroidExtraction(MVTModule):
header = parse_ab_header(backup_output)
if not header["backup"]:
self.log.error(
"Extracting SMS via Android backup failed. No valid backup data found."
)
self.log.error("Extracting SMS via Android backup failed. "
"No valid backup data found.")
return None
if header["encryption"] == "none":
return parse_backup_file(backup_output, password=None)
for _ in range(0, 3):
backup_password = prompt_or_load_android_backup_password(
self.log, self.module_options
)
if not backup_password:
# Fail as no backup password loaded for this encrypted backup
self.log.critical("No backup password provided.")
backup_password = Prompt.ask("Enter backup password",
password=True)
try:
decrypted_backup_tar = parse_backup_file(backup_output, backup_password)
decrypted_backup_tar = parse_backup_file(backup_output,
backup_password)
return decrypted_backup_tar
except InvalidBackupPassword:
self.log.error("You provided the wrong password! Please try again...")

View File

@@ -1,5 +1,5 @@
# Mobile Verification Toolkit (MVT)
# Copyright (c) 2021-2023 The MVT Authors.
# Copyright (c) 2021-2023 Claudio Guarnieri.
# Use of this software is governed by the MVT License 1.1 that can be found at
# https://license.mvt.re/1.1/
@@ -8,7 +8,8 @@ import os
import sqlite3
from typing import Optional, Union
from mvt.common.utils import convert_chrometime_to_datetime, convert_datetime_to_iso
from mvt.common.utils import (convert_chrometime_to_datetime,
convert_datetime_to_iso)
from .base import AndroidExtraction
@@ -23,18 +24,13 @@ class ChromeHistory(AndroidExtraction):
file_path: Optional[str] = None,
target_path: Optional[str] = None,
results_path: Optional[str] = None,
module_options: Optional[dict] = None,
fast_mode: Optional[bool] = False,
log: logging.Logger = logging.getLogger(__name__),
results: Optional[list] = None,
results: Optional[list] = None
) -> None:
super().__init__(
file_path=file_path,
target_path=target_path,
results_path=results_path,
module_options=module_options,
log=log,
results=results,
)
super().__init__(file_path=file_path, target_path=target_path,
results_path=results_path, fast_mode=fast_mode,
log=log, results=results)
self.results = []
def serialize(self, record: dict) -> Union[dict, list]:
@@ -43,7 +39,7 @@ class ChromeHistory(AndroidExtraction):
"module": self.__class__.__name__,
"event": "visit",
"data": f"{record['id']} - {record['url']} (visit ID: {record['visit_id']}, "
f"redirect source: {record['redirect_source']})",
f"redirect source: {record['redirect_source']})"
}
def check_indicators(self) -> None:
@@ -51,9 +47,8 @@ class ChromeHistory(AndroidExtraction):
return
for result in self.results:
if self.indicators.check_url(result["url"]):
if self.indicators.check_domain(result["url"]):
self.detected.append(result)
continue
def _parse_db(self, db_path: str) -> None:
"""Parse a Chrome History database file.
@@ -64,8 +59,7 @@ class ChromeHistory(AndroidExtraction):
assert isinstance(self.results, list) # assert results type for mypy
conn = sqlite3.connect(db_path)
cur = conn.cursor()
cur.execute(
"""
cur.execute("""
SELECT
urls.id,
urls.url,
@@ -75,35 +69,31 @@ class ChromeHistory(AndroidExtraction):
FROM urls
JOIN visits ON visits.url = urls.id
ORDER BY visits.visit_time;
"""
)
""")
for item in cur:
self.results.append(
{
"id": item[0],
"url": item[1],
"visit_id": item[2],
"timestamp": item[3],
"isodate": convert_datetime_to_iso(
convert_chrometime_to_datetime(item[3])
),
"redirect_source": item[4],
}
)
self.results.append({
"id": item[0],
"url": item[1],
"visit_id": item[2],
"timestamp": item[3],
"isodate": convert_datetime_to_iso(
convert_chrometime_to_datetime(item[3])),
"redirect_source": item[4],
})
cur.close()
conn.close()
self.log.info("Extracted a total of %d history items", len(self.results))
self.log.info("Extracted a total of %d history items",
len(self.results))
def run(self) -> None:
self._adb_connect()
try:
self._adb_process_file(
os.path.join("/", CHROME_HISTORY_PATH), self._parse_db
)
self._adb_process_file(os.path.join("/", CHROME_HISTORY_PATH),
self._parse_db)
except Exception as exc:
self.log.error(exc)

View File

@@ -0,0 +1,53 @@
# Mobile Verification Toolkit (MVT)
# Copyright (c) 2021-2023 Claudio Guarnieri.
# Use of this software is governed by the MVT License 1.1 that can be found at
# https://license.mvt.re/1.1/
import logging
from typing import Optional
from mvt.android.parsers import parse_dumpsys_accessibility
from .base import AndroidExtraction
class DumpsysAccessibility(AndroidExtraction):
"""This module extracts stats on accessibility."""
def __init__(
self,
file_path: Optional[str] = None,
target_path: Optional[str] = None,
results_path: Optional[str] = None,
fast_mode: Optional[bool] = False,
log: logging.Logger = logging.getLogger(__name__),
results: Optional[list] = None
) -> None:
super().__init__(file_path=file_path, target_path=target_path,
results_path=results_path, fast_mode=fast_mode,
log=log, results=results)
def check_indicators(self) -> None:
if not self.indicators:
return
for result in self.results:
ioc = self.indicators.check_app_id(result["package_name"])
if ioc:
result["matched_indicator"] = ioc
self.detected.append(result)
continue
def run(self) -> None:
self._adb_connect()
output = self._adb_command("dumpsys accessibility")
self._adb_disconnect()
self.results = parse_dumpsys_accessibility(output)
for result in self.results:
self.log.info("Found installed accessibility service \"%s\"",
result.get("service"))
self.log.info("Identified a total of %d accessibility services",
len(self.results))

View File

@@ -0,0 +1,51 @@
# Mobile Verification Toolkit (MVT)
# Copyright (c) 2021-2023 Claudio Guarnieri.
# Use of this software is governed by the MVT License 1.1 that can be found at
# https://license.mvt.re/1.1/
import logging
from typing import Optional
from mvt.android.parsers import parse_dumpsys_activity_resolver_table
from .base import AndroidExtraction
class DumpsysActivities(AndroidExtraction):
"""This module extracts details on receivers for risky activities."""
def __init__(
self,
file_path: Optional[str] = None,
target_path: Optional[str] = None,
results_path: Optional[str] = None,
fast_mode: Optional[bool] = False,
log: logging.Logger = logging.getLogger(__name__),
results: Optional[list] = None
) -> None:
super().__init__(file_path=file_path, target_path=target_path,
results_path=results_path, fast_mode=fast_mode,
log=log, results=results)
self.results = results if results else {}
def check_indicators(self) -> None:
if not self.indicators:
return
for intent, activities in self.results.items():
for activity in activities:
ioc = self.indicators.check_app_id(activity["package_name"])
if ioc:
activity["matched_indicator"] = ioc
self.detected.append({intent: activity})
continue
def run(self) -> None:
self._adb_connect()
output = self._adb_command("dumpsys package")
self._adb_disconnect()
self.results = parse_dumpsys_activity_resolver_table(output)
self.log.info("Extracted activities for %d intents", len(self.results))

View File

@@ -0,0 +1,73 @@
# Mobile Verification Toolkit (MVT)
# Copyright (c) 2021-2023 Claudio Guarnieri.
# Use of this software is governed by the MVT License 1.1 that can be found at
# https://license.mvt.re/1.1/
import logging
from typing import Optional, Union
from mvt.android.parsers.dumpsys import parse_dumpsys_appops
from .base import AndroidExtraction
class DumpsysAppOps(AndroidExtraction):
"""This module extracts records from App-op Manager."""
slug = "dumpsys_appops"
def __init__(
self,
file_path: Optional[str] = None,
target_path: Optional[str] = None,
results_path: Optional[str] = None,
fast_mode: Optional[bool] = False,
log: logging.Logger = logging.getLogger(__name__),
results: Optional[list] = None
) -> None:
super().__init__(file_path=file_path, target_path=target_path,
results_path=results_path, fast_mode=fast_mode,
log=log, results=results)
def serialize(self, record: dict) -> Union[dict, list]:
records = []
for perm in record["permissions"]:
if "entries" not in perm:
continue
for entry in perm["entries"]:
if "timestamp" in entry:
records.append({
"timestamp": entry["timestamp"],
"module": self.__class__.__name__,
"event": entry["access"],
"data": f"{record['package_name']} access to "
f"{perm['name']}: {entry['access']}",
})
return records
def check_indicators(self) -> None:
for result in self.results:
if self.indicators:
ioc = self.indicators.check_app_id(result.get("package_name"))
if ioc:
result["matched_indicator"] = ioc
self.detected.append(result)
continue
for perm in result["permissions"]:
if (perm["name"] == "REQUEST_INSTALL_PACKAGES"
and perm["access"] == "allow"):
self.log.info("Package %s with REQUEST_INSTALL_PACKAGES "
"permission", result["package_name"])
def run(self) -> None:
self._adb_connect()
output = self._adb_command("dumpsys appops")
self._adb_disconnect()
self.results = parse_dumpsys_appops(output)
self.log.info("Extracted a total of %d records from app-ops manager",
len(self.results))

View File

@@ -0,0 +1,58 @@
# Mobile Verification Toolkit (MVT)
# Copyright (c) 2021-2023 Claudio Guarnieri.
# Use of this software is governed by the MVT License 1.1 that can be found at
# https://license.mvt.re/1.1/
import logging
from typing import Optional, Union
from mvt.android.parsers import parse_dumpsys_battery_daily
from .base import AndroidExtraction
class DumpsysBatteryDaily(AndroidExtraction):
"""This module extracts records from battery daily updates."""
def __init__(
self,
file_path: Optional[str] = None,
target_path: Optional[str] = None,
results_path: Optional[str] = None,
fast_mode: Optional[bool] = False,
log: logging.Logger = logging.getLogger(__name__),
results: Optional[list] = None
) -> None:
super().__init__(file_path=file_path, target_path=target_path,
results_path=results_path, fast_mode=fast_mode,
log=log, results=results)
def serialize(self, record: dict) -> Union[dict, list]:
return {
"timestamp": record["from"],
"module": self.__class__.__name__,
"event": "battery_daily",
"data": f"Recorded update of package {record['package_name']} "
f"with vers {record['vers']}"
}
def check_indicators(self) -> None:
if not self.indicators:
return
for result in self.results:
ioc = self.indicators.check_app_id(result["package_name"])
if ioc:
result["matched_indicator"] = ioc
self.detected.append(result)
continue
def run(self) -> None:
self._adb_connect()
output = self._adb_command("dumpsys batterystats --daily")
self._adb_disconnect()
self.results = parse_dumpsys_battery_daily(output)
self.log.info("Extracted %d records from battery daily stats",
len(self.results))

View File

@@ -0,0 +1,49 @@
# Mobile Verification Toolkit (MVT)
# Copyright (c) 2021-2023 Claudio Guarnieri.
# Use of this software is governed by the MVT License 1.1 that can be found at
# https://license.mvt.re/1.1/
import logging
from typing import Optional
from mvt.android.parsers import parse_dumpsys_battery_history
from .base import AndroidExtraction
class DumpsysBatteryHistory(AndroidExtraction):
"""This module extracts records from battery history events."""
def __init__(
self,
file_path: Optional[str] = None,
target_path: Optional[str] = None,
results_path: Optional[str] = None,
fast_mode: Optional[bool] = False,
log: logging.Logger = logging.getLogger(__name__),
results: Optional[list] = None
) -> None:
super().__init__(file_path=file_path, target_path=target_path,
results_path=results_path, fast_mode=fast_mode,
log=log, results=results)
def check_indicators(self) -> None:
if not self.indicators:
return
for result in self.results:
ioc = self.indicators.check_app_id(result["package_name"])
if ioc:
result["matched_indicator"] = ioc
self.detected.append(result)
continue
def run(self) -> None:
self._adb_connect()
output = self._adb_command("dumpsys batterystats --history")
self._adb_disconnect()
self.results = parse_dumpsys_battery_history(output)
self.log.info("Extracted %d records from battery history",
len(self.results))

View File

@@ -0,0 +1,53 @@
# Mobile Verification Toolkit (MVT)
# Copyright (c) 2021-2023 Claudio Guarnieri.
# Use of this software is governed by the MVT License 1.1 that can be found at
# https://license.mvt.re/1.1/
import logging
from typing import Optional
from mvt.android.parsers import parse_dumpsys_dbinfo
from .base import AndroidExtraction
class DumpsysDBInfo(AndroidExtraction):
"""This module extracts records from battery daily updates."""
slug = "dumpsys_dbinfo"
def __init__(
self,
file_path: Optional[str] = None,
target_path: Optional[str] = None,
results_path: Optional[str] = None,
fast_mode: Optional[bool] = False,
log: logging.Logger = logging.getLogger(__name__),
results: Optional[list] = None
) -> None:
super().__init__(file_path=file_path, target_path=target_path,
results_path=results_path, fast_mode=fast_mode,
log=log, results=results)
def check_indicators(self) -> None:
if not self.indicators:
return
for result in self.results:
path = result.get("path", "")
for part in path.split("/"):
ioc = self.indicators.check_app_id(part)
if ioc:
result["matched_indicator"] = ioc
self.detected.append(result)
continue
def run(self) -> None:
self._adb_connect()
output = self._adb_command("dumpsys dbinfo")
self._adb_disconnect()
self.results = parse_dumpsys_dbinfo(output)
self.log.info("Extracted a total of %d records from database information",
len(self.results))

View File

@@ -1,5 +1,5 @@
# Mobile Verification Toolkit (MVT)
# Copyright (c) 2021-2023 The MVT Authors.
# Copyright (c) 2021-2023 Claudio Guarnieri.
# Use of this software is governed by the MVT License 1.1 that can be found at
# https://license.mvt.re/1.1/
@@ -18,18 +18,13 @@ class DumpsysFull(AndroidExtraction):
file_path: Optional[str] = None,
target_path: Optional[str] = None,
results_path: Optional[str] = None,
module_options: Optional[dict] = None,
fast_mode: Optional[bool] = False,
log: logging.Logger = logging.getLogger(__name__),
results: Optional[list] = None,
results: Optional[list] = None
) -> None:
super().__init__(
file_path=file_path,
target_path=target_path,
results_path=results_path,
module_options=module_options,
log=log,
results=results,
)
super().__init__(file_path=file_path, target_path=target_path,
results_path=results_path, fast_mode=fast_mode,
log=log, results=results)
def run(self) -> None:
self._adb_connect()

View File

@@ -0,0 +1,73 @@
# Mobile Verification Toolkit (MVT)
# Copyright (c) 2021-2023 Claudio Guarnieri.
# Use of this software is governed by the MVT License 1.1 that can be found at
# https://license.mvt.re/1.1/
import logging
from typing import Optional
from mvt.android.parsers import parse_dumpsys_receiver_resolver_table
from .base import AndroidExtraction
INTENT_NEW_OUTGOING_SMS = "android.provider.Telephony.NEW_OUTGOING_SMS"
INTENT_SMS_RECEIVED = "android.provider.Telephony.SMS_RECEIVED"
INTENT_DATA_SMS_RECEIVED = "android.intent.action.DATA_SMS_RECEIVED"
INTENT_PHONE_STATE = "android.intent.action.PHONE_STATE"
INTENT_NEW_OUTGOING_CALL = "android.intent.action.NEW_OUTGOING_CALL"
class DumpsysReceivers(AndroidExtraction):
"""This module extracts details on receivers for risky activities."""
def __init__(
self,
file_path: Optional[str] = None,
target_path: Optional[str] = None,
results_path: Optional[str] = None,
fast_mode: Optional[bool] = False,
log: logging.Logger = logging.getLogger(__name__),
results: Optional[list] = None
) -> None:
super().__init__(file_path=file_path, target_path=target_path,
results_path=results_path, fast_mode=fast_mode,
log=log, results=results)
self.results = results if results else {}
def check_indicators(self) -> None:
if not self.indicators:
return
for intent, receivers in self.results.items():
for receiver in receivers:
if intent == INTENT_NEW_OUTGOING_SMS:
self.log.info("Found a receiver to intercept outgoing SMS messages: \"%s\"",
receiver["receiver"])
elif intent == INTENT_SMS_RECEIVED:
self.log.info("Found a receiver to intercept incoming SMS messages: \"%s\"",
receiver["receiver"])
elif intent == INTENT_DATA_SMS_RECEIVED:
self.log.info("Found a receiver to intercept incoming data SMS message: \"%s\"",
receiver["receiver"])
elif intent == INTENT_PHONE_STATE:
self.log.info("Found a receiver monitoring "
"telephony state/incoming calls: \"%s\"",
receiver["receiver"])
elif intent == INTENT_NEW_OUTGOING_CALL:
self.log.info("Found a receiver monitoring outgoing calls: \"%s\"",
receiver["receiver"])
ioc = self.indicators.check_app_id(receiver["package_name"])
if ioc:
receiver["matched_indicator"] = ioc
self.detected.append({intent: receiver})
continue
def run(self) -> None:
self._adb_connect()
output = self._adb_command("dumpsys package")
self.results = parse_dumpsys_receiver_resolver_table(output)
self._adb_disconnect()

View File

@@ -1,5 +1,5 @@
# Mobile Verification Toolkit (MVT)
# Copyright (c) 2021-2023 The MVT Authors.
# Copyright (c) 2021-2023 Claudio Guarnieri.
# Use of this software is governed by the MVT License 1.1 that can be found at
# https://license.mvt.re/1.1/
@@ -30,18 +30,13 @@ class Files(AndroidExtraction):
file_path: Optional[str] = None,
target_path: Optional[str] = None,
results_path: Optional[str] = None,
module_options: Optional[dict] = None,
fast_mode: Optional[bool] = False,
log: logging.Logger = logging.getLogger(__name__),
results: Optional[list] = None,
results: Optional[list] = None
) -> None:
super().__init__(
file_path=file_path,
target_path=target_path,
results_path=results_path,
module_options=module_options,
log=log,
results=results,
)
super().__init__(file_path=file_path, target_path=target_path,
results_path=results_path, fast_mode=fast_mode,
log=log, results=results)
self.full_find = False
def serialize(self, record: dict) -> Union[dict, list, None]:
@@ -58,15 +53,12 @@ class Files(AndroidExtraction):
def check_indicators(self) -> None:
for result in self.results:
if result.get("is_suid"):
self.log.warning(
'Found an SUID file in a non-standard directory "%s".',
result["path"],
)
self.log.warning("Found an SUID file in a non-standard directory \"%s\".",
result["path"])
if self.indicators and self.indicators.check_file_path(result["path"]):
self.log.warning(
'Found a known suspicous file at path: "%s"', result["path"]
)
self.log.warning("Found a known suspicous file at path: \"%s\"",
result["path"])
self.detected.append(result)
def backup_file(self, file_path: str) -> None:
@@ -81,13 +73,13 @@ class Files(AndroidExtraction):
local_file_path = os.path.join(local_files_folder, local_file_name)
try:
self._adb_download(remote_path=file_path, local_path=local_file_path)
self._adb_download(remote_path=file_path,
local_path=local_file_path)
except Exception:
pass
else:
self.log.info(
"Downloaded file %s to local copy at %s", file_path, local_file_path
)
self.log.info("Downloaded file %s to local copy at %s",
file_path, local_file_path)
def find_files(self, folder: str) -> None:
assert isinstance(self.results, list)
@@ -100,21 +92,20 @@ class Files(AndroidExtraction):
if len(file_line) < 6:
self.log.info("Skipping invalid file info - %s", file_line.rstrip())
continue
[unix_timestamp, mode, size, owner, group, full_path] = file_info
[unix_timestamp, mode, size,
owner, group, full_path] = file_info
mod_time = convert_unix_to_iso(unix_timestamp)
self.results.append(
{
"path": full_path,
"modified_time": mod_time,
"mode": mode,
"is_suid": (int(mode, 8) & stat.S_ISUID) == 2048,
"is_sgid": (int(mode, 8) & stat.S_ISGID) == 1024,
"size": size,
"owner": owner,
"group": group,
}
)
self.results.append({
"path": full_path,
"modified_time": mod_time,
"mode": mode,
"is_suid": (int(mode, 8) & stat.S_ISUID) == 2048,
"is_sgid": (int(mode, 8) & stat.S_ISGID) == 1024,
"size": size,
"owner": owner,
"group": group,
})
else:
output = self._adb_command(f"find '{folder}' -type f 2> /dev/null")
for file_line in output.splitlines():
@@ -132,20 +123,18 @@ class Files(AndroidExtraction):
self.find_files(tmp_folder)
for entry in self.results:
self.log.info("Found file in tmp folder at path %s", entry.get("path"))
self.log.info("Found file in tmp folder at path %s",
entry.get("path"))
self.backup_file(entry.get("path"))
for media_folder in ANDROID_MEDIA_FOLDERS:
self.find_files(media_folder)
self.log.info(
"Found %s files in primary Android tmp and media folders", len(self.results)
)
self.log.info("Found %s files in primary Android tmp and media folders",
len(self.results))
if self.module_options.get("fast_mode", None):
self.log.info(
"The `fast_mode` option was enabled: skipping full file listing"
)
if self.fast_mode:
self.log.info("Flag --fast was enabled: skipping full file listing")
else:
self.log.info("Processing full file listing. This may take a while...")
self.find_files("/")

View File

@@ -0,0 +1,61 @@
# Mobile Verification Toolkit (MVT)
# Copyright (c) 2021-2023 Claudio Guarnieri.
# Use of this software is governed by the MVT License 1.1 that can be found at
# https://license.mvt.re/1.1/
import logging
from datetime import datetime, timedelta
from typing import Optional
from mvt.android.parsers import parse_getprop
from .base import AndroidExtraction
class Getprop(AndroidExtraction):
"""This module extracts device properties from getprop command."""
def __init__(
self,
file_path: Optional[str] = None,
target_path: Optional[str] = None,
results_path: Optional[str] = None,
fast_mode: Optional[bool] = False,
log: logging.Logger = logging.getLogger(__name__),
results: Optional[list] = None
) -> None:
super().__init__(file_path=file_path, target_path=target_path,
results_path=results_path, fast_mode=fast_mode,
log=log, results=results)
self.results = {} if not results else results
def check_indicators(self) -> None:
if not self.indicators:
return
for result in self.results:
ioc = self.indicators.check_android_property_name(result.get("name", ""))
if ioc:
result["matched_indicator"] = ioc
self.detected.append(result)
def run(self) -> None:
self._adb_connect()
output = self._adb_command("getprop")
self._adb_disconnect()
self.results = parse_getprop(output)
# Alert if phone is outdated.
for entry in self.results:
if entry.get("name", "") != "ro.build.version.security_patch":
continue
patch_date = datetime.strptime(entry["value"], "%Y-%m-%d")
if (datetime.now() - patch_date) > timedelta(days=6*30):
self.log.warning("This phone has not received security updates "
"for more than six months (last update: %s)",
entry["value"])
self.log.info("Extracted %d Android system properties",
len(self.results))

View File

@@ -1,5 +1,5 @@
# Mobile Verification Toolkit (MVT)
# Copyright (c) 2021-2023 The MVT Authors.
# Copyright (c) 2021-2023 Claudio Guarnieri.
# Use of this software is governed by the MVT License 1.1 that can be found at
# https://license.mvt.re/1.1/
@@ -18,40 +18,37 @@ class Logcat(AndroidExtraction):
file_path: Optional[str] = None,
target_path: Optional[str] = None,
results_path: Optional[str] = None,
module_options: Optional[dict] = None,
fast_mode: Optional[bool] = False,
log: logging.Logger = logging.getLogger(__name__),
results: Optional[list] = None,
results: Optional[list] = None
) -> None:
super().__init__(
file_path=file_path,
target_path=target_path,
results_path=results_path,
module_options=module_options,
log=log,
results=results,
)
super().__init__(file_path=file_path, target_path=target_path,
results_path=results_path, fast_mode=fast_mode,
log=log, results=results)
def run(self) -> None:
self._adb_connect()
# Get the current logcat.
output = self._adb_command('logcat -d -b all "*:V"')
output = self._adb_command("logcat -d -b all \"*:V\"")
# Get the locat prior to last reboot.
last_output = self._adb_command('logcat -L -b all "*:V"')
last_output = self._adb_command("logcat -L -b all \"*:V\"")
if self.results_path:
logcat_path = os.path.join(self.results_path, "logcat.txt")
logcat_path = os.path.join(self.results_path,
"logcat.txt")
with open(logcat_path, "w", encoding="utf-8") as handle:
handle.write(output)
self.log.info("Current logcat logs stored at %s", logcat_path)
self.log.info("Current logcat logs stored at %s",
logcat_path)
logcat_last_path = os.path.join(self.results_path, "logcat_last.txt")
logcat_last_path = os.path.join(self.results_path,
"logcat_last.txt")
with open(logcat_last_path, "w", encoding="utf-8") as handle:
handle.write(last_output)
self.log.info(
"Logcat logs prior to last reboot stored at %s", logcat_last_path
)
self.log.info("Logcat logs prior to last reboot stored at %s",
logcat_last_path)
self._adb_disconnect()

View File

@@ -1,28 +1,89 @@
# Mobile Verification Toolkit (MVT)
# Copyright (c) 2021-2023 The MVT Authors.
# Copyright (c) 2021-2023 Claudio Guarnieri.
# Use of this software is governed by the MVT License 1.1 that can be found at
# https://license.mvt.re/1.1/
import logging
from typing import Optional, Union
from typing import List, Optional, Union
from rich.console import Console
from rich.progress import track
from rich.table import Table
from rich.text import Text
from mvt.android.artifacts.dumpsys_packages import DumpsysPackagesArtifact
from mvt.android.utils import (
DANGEROUS_PERMISSIONS,
DANGEROUS_PERMISSIONS_THRESHOLD,
ROOT_PACKAGES,
SECURITY_PACKAGES,
SYSTEM_UPDATE_PACKAGES,
)
from mvt.android.parsers.dumpsys import parse_dumpsys_package_for_details
from mvt.common.virustotal import VTNoKey, VTQuotaExceeded, virustotal_lookup
from .base import AndroidExtraction
DANGEROUS_PERMISSIONS_THRESHOLD = 10
DANGEROUS_PERMISSIONS = [
"android.permission.ACCESS_COARSE_LOCATION",
"android.permission.ACCESS_FINE_LOCATION",
"android.permission.AUTHENTICATE_ACCOUNTS",
"android.permission.CAMERA",
"android.permission.DISABLE_KEYGUARD",
"android.permission.PROCESS_OUTGOING_CALLS",
"android.permission.READ_CALENDAR",
"android.permission.READ_CALL_LOG",
"android.permission.READ_CONTACTS",
"android.permission.READ_PHONE_STATE",
"android.permission.READ_SMS",
"android.permission.RECEIVE_MMS",
"android.permission.RECEIVE_SMS",
"android.permission.RECEIVE_WAP_PUSH",
"android.permission.RECORD_AUDIO",
"android.permission.SEND_SMS",
"android.permission.SYSTEM_ALERT_WINDOW",
"android.permission.USE_CREDENTIALS",
"android.permission.USE_SIP",
"com.android.browser.permission.READ_HISTORY_BOOKMARKS",
]
ROOT_PACKAGES: List[str] = [
"com.noshufou.android.su",
"com.noshufou.android.su.elite",
"eu.chainfire.supersu",
"com.koushikdutta.superuser",
"com.thirdparty.superuser",
"com.yellowes.su",
"com.koushikdutta.rommanager",
"com.koushikdutta.rommanager.license",
"com.dimonvideo.luckypatcher",
"com.chelpus.lackypatch",
"com.ramdroid.appquarantine",
"com.ramdroid.appquarantinepro",
"com.devadvance.rootcloak",
"com.devadvance.rootcloakplus",
"de.robv.android.xposed.installer",
"com.saurik.substrate",
"com.zachspong.temprootremovejb",
"com.amphoras.hidemyroot",
"com.amphoras.hidemyrootadfree",
"com.formyhm.hiderootPremium",
"com.formyhm.hideroot",
"me.phh.superuser",
"eu.chainfire.supersu.pro",
"com.kingouser.com",
"com.topjohnwu.magisk",
]
SECURITY_PACKAGES = [
"com.policydm",
"com.samsung.android.app.omcagent",
"com.samsung.android.securitylogagent",
"com.sec.android.soagent",
]
SYSTEM_UPDATE_PACKAGES = [
"com.android.updater",
"com.google.android.gms",
"com.huawei.android.hwouc",
"com.lge.lgdmsclient",
"com.motorola.ccc.ota",
"com.oneplus.opbackup",
"com.oppo.ota",
"com.transsion.systemupdate",
"com.wssyncmldm",
]
class Packages(AndroidExtraction):
"""This module extracts the list of installed packages."""
@@ -32,65 +93,59 @@ class Packages(AndroidExtraction):
file_path: Optional[str] = None,
target_path: Optional[str] = None,
results_path: Optional[str] = None,
module_options: Optional[dict] = None,
fast_mode: Optional[bool] = False,
log: logging.Logger = logging.getLogger(__name__),
results: Optional[list] = None,
results: Optional[list] = None
) -> None:
super().__init__(
file_path=file_path,
target_path=target_path,
results_path=results_path,
module_options=module_options,
log=log,
results=results,
)
self._user_needed = False
super().__init__(file_path=file_path, target_path=target_path,
results_path=results_path, fast_mode=fast_mode,
log=log, results=results)
def serialize(self, record: dict) -> Union[dict, list]:
records = []
timestamps = [
{"event": "package_install", "timestamp": record["timestamp"]},
{
"event": "package_install",
"timestamp": record["timestamp"]
},
{
"event": "package_first_install",
"timestamp": record["first_install_time"],
"timestamp": record["first_install_time"]
},
{
"event": "package_last_update",
"timestamp": record["last_update_time"]
},
{"event": "package_last_update", "timestamp": record["last_update_time"]},
]
for timestamp in timestamps:
records.append(
{
"timestamp": timestamp["timestamp"],
"module": self.__class__.__name__,
"event": timestamp["event"],
"data": f"{record['package_name']} (system: {record['system']},"
f" third party: {record['third_party']})",
}
)
records.append({
"timestamp": timestamp["timestamp"],
"module": self.__class__.__name__,
"event": timestamp["event"],
"data": f"{record['package_name']} (system: {record['system']},"
f" third party: {record['third_party']})",
})
return records
def check_indicators(self) -> None:
for result in self.results:
if result["package_name"] in ROOT_PACKAGES:
self.log.warning(
'Found an installed package related to rooting/jailbreaking: "%s"',
result["package_name"],
)
self.log.warning("Found an installed package related to "
"rooting/jailbreaking: \"%s\"",
result["package_name"])
self.detected.append(result)
continue
if result["package_name"] in SECURITY_PACKAGES and result["disabled"]:
self.log.warning(
'Found a security package disabled: "%s"', result["package_name"]
)
self.log.warning("Found a security package disabled: \"%s\"",
result["package_name"])
if result["package_name"] in SYSTEM_UPDATE_PACKAGES and result["disabled"]:
self.log.warning(
'System OTA update package "%s" disabled on the phone',
result["package_name"],
)
self.log.warning("System OTA update package \"%s\" disabled on the phone",
result["package_name"])
if not self.indicators:
continue
@@ -172,15 +227,10 @@ class Packages(AndroidExtraction):
if line.strip() == "Packages:":
in_packages = True
return DumpsysPackagesArtifact.parse_dumpsys_package_for_details(
"\n".join(lines)
)
return parse_dumpsys_package_for_details("\n".join(lines))
def _get_files_for_package(self, package_name: str) -> list:
command = f"pm path {package_name}"
if self._user_needed:
command += " --user 0"
output = self._adb_command(command)
output = self._adb_command(f"pm path {package_name}")
output = output.strip().replace("package:", "")
if not output:
return []
@@ -189,24 +239,22 @@ class Packages(AndroidExtraction):
for file_path in output.splitlines():
file_path = file_path.strip()
md5 = self._adb_command(f"md5sum {file_path}").split(" ", maxsplit=1)[0]
sha1 = self._adb_command(f"sha1sum {file_path}").split(" ", maxsplit=1)[0]
sha256 = self._adb_command(f"sha256sum {file_path}").split(" ", maxsplit=1)[
0
]
sha512 = self._adb_command(f"sha512sum {file_path}").split(" ", maxsplit=1)[
0
]
md5 = self._adb_command(
f"md5sum {file_path}").split(" ", maxsplit=1)[0]
sha1 = self._adb_command(
f"sha1sum {file_path}").split(" ", maxsplit=1)[0]
sha256 = self._adb_command(
f"sha256sum {file_path}").split(" ", maxsplit=1)[0]
sha512 = self._adb_command(
f"sha512sum {file_path}").split(" ", maxsplit=1)[0]
package_files.append(
{
"path": file_path,
"md5": md5,
"sha1": sha1,
"sha256": sha256,
"sha512": sha512,
}
)
package_files.append({
"path": file_path,
"md5": md5,
"sha1": sha1,
"sha256": sha256,
"sha512": sha512,
})
return package_files
@@ -214,9 +262,6 @@ class Packages(AndroidExtraction):
self._adb_connect()
packages = self._adb_command("pm list packages -u -i -f")
if "java.lang.SecurityException" in packages or packages.strip() == "":
self._user_needed = True
packages = self._adb_command("pm list packages -u -i -f --user 0")
for line in packages.splitlines():
line = line.strip()
@@ -245,7 +290,8 @@ class Packages(AndroidExtraction):
"files": package_files,
}
dumpsys_package = self._adb_command(f"dumpsys package {package_name}")
dumpsys_package = self._adb_command(
f"dumpsys package {package_name}")
package_details = self.parse_package_for_details(dumpsys_package)
new_package.update(package_details)
@@ -257,10 +303,7 @@ class Packages(AndroidExtraction):
{"field": "third_party", "arg": "-3"},
]
for cmd in cmds:
command = f"pm list packages {cmd['arg']}"
if self._user_needed:
command += " --user 0"
output = self._adb_command(command)
output = self._adb_command(f"pm list packages {cmd['arg']}")
for line in output.splitlines():
line = line.strip()
if not line.startswith("package:"):
@@ -281,12 +324,10 @@ class Packages(AndroidExtraction):
dangerous_permissions_count += 1
if dangerous_permissions_count >= DANGEROUS_PERMISSIONS_THRESHOLD:
self.log.info(
'Third-party package "%s" requested %d '
"potentially dangerous permissions",
result["package_name"],
dangerous_permissions_count,
)
self.log.info("Third-party package \"%s\" requested %d "
"potentially dangerous permissions",
result["package_name"],
dangerous_permissions_count)
packages_to_lookup = []
for result in self.results:
@@ -294,18 +335,14 @@ class Packages(AndroidExtraction):
continue
packages_to_lookup.append(result)
self.log.info(
'Found non-system package with name "%s" installed by "%s" on %s',
result["package_name"],
result["installer"],
result["timestamp"],
)
self.log.info("Found non-system package with name \"%s\" installed by \"%s\" on %s",
result["package_name"], result["installer"],
result["timestamp"])
if not self.module_options.get("fast_mode", None):
if not self.fast_mode:
self.check_virustotal(packages_to_lookup)
self.log.info(
"Extracted at total of %d installed package names", len(self.results)
)
self.log.info("Extracted at total of %d installed package names",
len(self.results))
self._adb_disconnect()

View File

@@ -0,0 +1,86 @@
# Mobile Verification Toolkit (MVT)
# Copyright (c) 2021-2023 Claudio Guarnieri.
# Use of this software is governed by the MVT License 1.1 that can be found at
# https://license.mvt.re/1.1/
import logging
from typing import Optional
from .base import AndroidExtraction
class Processes(AndroidExtraction):
"""This module extracts details on running processes."""
def __init__(
self,
file_path: Optional[str] = None,
target_path: Optional[str] = None,
results_path: Optional[str] = None,
fast_mode: Optional[bool] = False,
log: logging.Logger = logging.getLogger(__name__),
results: Optional[list] = None
) -> None:
super().__init__(file_path=file_path, target_path=target_path,
results_path=results_path, fast_mode=fast_mode,
log=log, results=results)
def check_indicators(self) -> None:
if not self.indicators:
return
for result in self.results:
proc_name = result.get("proc_name", "")
if not proc_name:
continue
# Skipping this process because of false positives.
if result["proc_name"] == "gatekeeperd":
continue
ioc = self.indicators.check_app_id(proc_name)
if ioc:
result["matched_indicator"] = ioc
self.detected.append(result)
continue
ioc = self.indicators.check_process(proc_name)
if ioc:
result["matched_indicator"] = ioc
self.detected.append(result)
def run(self) -> None:
self._adb_connect()
output = self._adb_command("ps -A")
for line in output.splitlines()[1:]:
line = line.strip()
if line == "":
continue
fields = line.split()
proc = {
"user": fields[0],
"pid": fields[1],
"parent_pid": fields[2],
"vsize": fields[3],
"rss": fields[4],
}
# Sometimes WCHAN is empty, so we need to re-align output fields.
if len(fields) == 8:
proc["wchan"] = ""
proc["pc"] = fields[5]
proc["name"] = fields[7]
elif len(fields) == 9:
proc["wchan"] = fields[5]
proc["pc"] = fields[6]
proc["name"] = fields[8]
self.results.append(proc)
self._adb_disconnect()
self.log.info("Extracted records on a total of %d processes",
len(self.results))

View File

@@ -1,5 +1,5 @@
# Mobile Verification Toolkit (MVT)
# Copyright (c) 2021-2023 The MVT Authors.
# Copyright (c) 2021-2023 Claudio Guarnieri.
# Use of this software is governed by the MVT License 1.1 that can be found at
# https://license.mvt.re/1.1/
@@ -17,23 +17,13 @@ class RootBinaries(AndroidExtraction):
file_path: Optional[str] = None,
target_path: Optional[str] = None,
results_path: Optional[str] = None,
module_options: Optional[dict] = None,
fast_mode: Optional[bool] = False,
log: logging.Logger = logging.getLogger(__name__),
results: Optional[list] = None,
results: Optional[list] = None
) -> None:
super().__init__(
file_path=file_path,
target_path=target_path,
results_path=results_path,
module_options=module_options,
log=log,
results=results,
)
def check_indicators(self) -> None:
for root_binary in self.results:
self.detected.append(root_binary)
self.log.warning('Found root binary "%s"', root_binary)
super().__init__(file_path=file_path, target_path=target_path,
results_path=results_path, fast_mode=fast_mode,
log=log, results=results)
def run(self) -> None:
root_binaries = [
@@ -65,6 +55,7 @@ class RootBinaries(AndroidExtraction):
if "which: not found" in output:
continue
self.results.append(root_binary)
self.detected.append(root_binary)
self.log.warning("Found root binary \"%s\"", root_binary)
self._adb_disconnect()

View File

@@ -1,5 +1,5 @@
# Mobile Verification Toolkit (MVT)
# Copyright (c) 2021-2023 The MVT Authors.
# Copyright (c) 2021-2023 Claudio Guarnieri.
# Use of this software is governed by the MVT License 1.1 that can be found at
# https://license.mvt.re/1.1/
@@ -19,18 +19,13 @@ class SELinuxStatus(AndroidExtraction):
file_path: Optional[str] = None,
target_path: Optional[str] = None,
results_path: Optional[str] = None,
module_options: Optional[dict] = None,
fast_mode: Optional[bool] = False,
log: logging.Logger = logging.getLogger(__name__),
results: Optional[list] = None,
results: Optional[list] = None
) -> None:
super().__init__(
file_path=file_path,
target_path=target_path,
results_path=results_path,
module_options=module_options,
log=log,
results=results,
)
super().__init__(file_path=file_path, target_path=target_path,
results_path=results_path, fast_mode=fast_mode,
log=log, results=results)
self.results = {} if not results else results
@@ -45,4 +40,4 @@ class SELinuxStatus(AndroidExtraction):
if status == "enforcing":
self.log.info("SELinux is being regularly enforced")
else:
self.log.warning('SELinux status is "%s"!', status)
self.log.warning("SELinux status is \"%s\"!", status)

View File

@@ -1,9 +1,12 @@
# Mobile Verification Toolkit (MVT)
# Copyright (c) 2021-2023 The MVT Authors.
# Copyright (c) 2021-2023 Claudio Guarnieri.
# Use of this software is governed by the MVT License 1.1 that can be found at
# https://license.mvt.re/1.1/
from .artifact import AndroidArtifact
import logging
from typing import Optional
from .base import AndroidExtraction
ANDROID_DANGEROUS_SETTINGS = [
{
@@ -16,11 +19,6 @@ ANDROID_DANGEROUS_SETTINGS = [
"key": "package_verifier_enable",
"safe_value": "1",
},
{
"description": "disabled APK package verification",
"key": "package_verifier_state",
"safe_value": "1",
},
{
"description": "disabled Google Play Protect",
"key": "package_verifier_user_consent",
@@ -52,26 +50,61 @@ ANDROID_DANGEROUS_SETTINGS = [
"safe_value": "1",
},
{
"description": "enabled accessibility services",
"key": "accessibility_enabled",
"description": "enabled installation of non Google Play apps",
"key": "install_non_market_apps",
"safe_value": "0",
},
}
]
class Settings(AndroidArtifact):
class Settings(AndroidExtraction):
"""This module extracts Android system settings."""
def __init__(
self,
file_path: Optional[str] = None,
target_path: Optional[str] = None,
results_path: Optional[str] = None,
fast_mode: Optional[bool] = False,
log: logging.Logger = logging.getLogger(__name__),
results: Optional[list] = None
) -> None:
super().__init__(file_path=file_path, target_path=target_path,
results_path=results_path, fast_mode=fast_mode,
log=log, results=results)
self.results = {} if not results else results
def check_indicators(self) -> None:
for namespace, settings in self.results.items():
for _, settings in self.results.items():
for key, value in settings.items():
for danger in ANDROID_DANGEROUS_SETTINGS:
# Check if one of the dangerous settings is using an unsafe
# value (different than the one specified).
if danger["key"] == key and danger["safe_value"] != value:
self.log.warning(
'Found suspicious "%s" setting "%s = %s" (%s)',
namespace,
key,
value,
danger["description"],
)
self.log.warning("Found suspicious setting \"%s = %s\" (%s)",
key, value, danger["description"])
break
def run(self) -> None:
self._adb_connect()
for namespace in ["system", "secure", "global"]:
out = self._adb_command(f"cmd settings list {namespace}")
if not out:
continue
self.results[namespace] = {}
for line in out.splitlines():
line = line.strip()
if line == "":
continue
fields = line.split("=", 1)
try:
self.results[namespace][fields[0]] = fields[1]
except IndexError:
continue
self._adb_disconnect()

View File

@@ -1,5 +1,5 @@
# Mobile Verification Toolkit (MVT)
# Copyright (c) 2021-2023 The MVT Authors.
# Copyright (c) 2021-2023 Claudio Guarnieri.
# Use of this software is governed by the MVT License 1.1 that can be found at
# https://license.mvt.re/1.1/
@@ -8,7 +8,8 @@ import os
import sqlite3
from typing import Optional, Union
from mvt.android.parsers.backup import AndroidBackupParsingError, parse_tar_for_sms
from mvt.android.parsers.backup import (AndroidBackupParsingError,
parse_tar_for_sms)
from mvt.common.module import InsufficientPrivileges
from mvt.common.utils import check_for_links, convert_unix_to_iso
@@ -49,18 +50,13 @@ class SMS(AndroidExtraction):
file_path: Optional[str] = None,
target_path: Optional[str] = None,
results_path: Optional[str] = None,
module_options: Optional[dict] = None,
fast_mode: Optional[bool] = False,
log: logging.Logger = logging.getLogger(__name__),
results: Optional[list] = None,
results: Optional[list] = None
) -> None:
super().__init__(
file_path=file_path,
target_path=target_path,
results_path=results_path,
module_options=module_options,
log=log,
results=results,
)
super().__init__(file_path=file_path, target_path=target_path,
results_path=results_path, fast_mode=fast_mode,
log=log, results=results)
self.sms_db_type = 0
@@ -70,7 +66,7 @@ class SMS(AndroidExtraction):
"timestamp": record["isodate"],
"module": self.__class__.__name__,
"event": f"sms_{record['direction']}",
"data": f'{record.get("address", "unknown source")}: "{body}"',
"data": f"{record.get('address', 'unknown source')}: \"{body}\""
}
def check_indicators(self) -> None:
@@ -85,9 +81,8 @@ class SMS(AndroidExtraction):
if message_links == []:
message_links = check_for_links(message["body"])
if self.indicators.check_urls(message_links):
if self.indicators.check_domains(message_links):
self.detected.append(message)
continue
def _parse_db(self, db_path: str) -> None:
"""Parse an Android bugle_db SMS database file.
@@ -110,21 +105,20 @@ class SMS(AndroidExtraction):
for index, value in enumerate(item):
message[names[index]] = value
message["direction"] = "received" if message["incoming"] == 1 else "sent"
message["direction"] = ("received" if message["incoming"] == 1 else "sent")
message["isodate"] = convert_unix_to_iso(message["timestamp"])
# Extract links in the message body
body = message.get("body", None)
if body:
links = check_for_links(message["body"])
message["links"] = links
links = check_for_links(message["body"])
message["links"] = links
self.results.append(message)
cur.close()
conn.close()
self.log.info("Extracted a total of %d SMS messages", len(self.results))
self.log.info("Extracted a total of %d SMS messages",
len(self.results))
def _extract_sms_adb(self) -> None:
"""Use the Android backup command to extract SMS data from the native
@@ -141,14 +135,13 @@ class SMS(AndroidExtraction):
try:
self.results = parse_tar_for_sms(backup_tar)
except AndroidBackupParsingError:
self.log.info(
"Impossible to read SMS from the Android Backup, "
"please extract the SMS and try extracting it with "
"Android Backup Extractor"
)
self.log.info("Impossible to read SMS from the Android Backup, "
"please extract the SMS and try extracting it with "
"Android Backup Extractor")
return
self.log.info("Extracted a total of %d SMS messages", len(self.results))
self.log.info("Extracted a total of %d SMS messages",
len(self.results))
def run(self) -> None:
self._adb_connect()
@@ -156,24 +149,20 @@ class SMS(AndroidExtraction):
try:
if self._adb_check_file_exists(os.path.join("/", SMS_BUGLE_PATH)):
self.sms_db_type = 1
self._adb_process_file(
os.path.join("/", SMS_BUGLE_PATH), self._parse_db
)
self._adb_process_file(os.path.join("/", SMS_BUGLE_PATH),
self._parse_db)
elif self._adb_check_file_exists(os.path.join("/", SMS_MMSSMS_PATH)):
self.sms_db_type = 2
self._adb_process_file(
os.path.join("/", SMS_MMSSMS_PATH), self._parse_db
)
self._adb_process_file(os.path.join("/", SMS_MMSSMS_PATH),
self._parse_db)
self._adb_disconnect()
return
except InsufficientPrivileges:
pass
self.log.info(
"No SMS database found. Trying extraction of SMS data "
"using Android backup feature."
)
self.log.info("No SMS database found. Trying extraction of SMS data "
"using Android backup feature.")
self._extract_sms_adb()
self._adb_disconnect()

View File

@@ -1,5 +1,5 @@
# Mobile Verification Toolkit (MVT)
# Copyright (c) 2021-2023 The MVT Authors.
# Copyright (c) 2021-2023 Claudio Guarnieri.
# Use of this software is governed by the MVT License 1.1 that can be found at
# https://license.mvt.re/1.1/
@@ -24,18 +24,13 @@ class Whatsapp(AndroidExtraction):
file_path: Optional[str] = None,
target_path: Optional[str] = None,
results_path: Optional[str] = None,
module_options: Optional[dict] = None,
fast_mode: Optional[bool] = False,
log: logging.Logger = logging.getLogger(__name__),
results: Optional[list] = None,
results: Optional[list] = None
) -> None:
super().__init__(
file_path=file_path,
target_path=target_path,
results_path=results_path,
module_options=module_options,
log=log,
results=results,
)
super().__init__(file_path=file_path, target_path=target_path,
results_path=results_path, fast_mode=fast_mode,
log=log, results=results)
def serialize(self, record: dict) -> Union[dict, list]:
text = record["data"].replace("\n", "\\n")
@@ -43,7 +38,7 @@ class Whatsapp(AndroidExtraction):
"timestamp": record["isodate"],
"module": self.__class__.__name__,
"event": f"whatsapp_msg_{record['direction']}",
"data": f'"{text}"',
"data": f"\"{text}\""
}
def check_indicators(self) -> None:
@@ -55,9 +50,8 @@ class Whatsapp(AndroidExtraction):
continue
message_links = check_for_links(message["data"])
if self.indicators.check_urls(message_links):
if self.indicators.check_domains(message_links):
self.detected.append(message)
continue
def _parse_db(self, db_path: str) -> None:
"""Parse an Android msgstore.db WhatsApp database file.
@@ -67,11 +61,9 @@ class Whatsapp(AndroidExtraction):
"""
conn = sqlite3.connect(db_path)
cur = conn.cursor()
cur.execute(
"""
cur.execute("""
SELECT * FROM messages;
"""
)
""")
names = [description[0] for description in cur.description]
messages = []
@@ -83,30 +75,32 @@ class Whatsapp(AndroidExtraction):
if not message["data"]:
continue
message["direction"] = "send" if message["key_from_me"] == 1 else "received"
message["direction"] = ("send" if message["key_from_me"] == 1 else "received")
message["isodate"] = convert_unix_to_iso(message["timestamp"])
# If we find links in the messages or if they are empty we add them
# to the list.
if check_for_links(message["data"]) or message["data"].strip() == "":
if (check_for_links(message["data"])
or message["data"].strip() == ""):
if message.get("thumb_image"):
message["thumb_image"] = base64.b64encode(message["thumb_image"])
message["thumb_image"] = base64.b64encode(
message["thumb_image"])
messages.append(message)
cur.close()
conn.close()
self.log.info(
"Extracted a total of %d WhatsApp messages containing links", len(messages)
)
self.log.info("Extracted a total of %d WhatsApp messages containing links",
len(messages))
self.results = messages
def run(self) -> None:
self._adb_connect()
try:
self._adb_process_file(os.path.join("/", WHATSAPP_PATH), self._parse_db)
self._adb_process_file(os.path.join("/", WHATSAPP_PATH),
self._parse_db)
except Exception as exc:
self.log.error(exc)

View File

@@ -0,0 +1,18 @@
# Mobile Verification Toolkit (MVT)
# Copyright (c) 2021-2023 Claudio Guarnieri.
# Use of this software is governed by the MVT License 1.1 that can be found at
# https://license.mvt.re/1.1/
from .dumpsys_accessibility import DumpsysAccessibility
from .dumpsys_activities import DumpsysActivities
from .dumpsys_appops import DumpsysAppops
from .dumpsys_packages import DumpsysPackages
from .dumpsys_receivers import DumpsysReceivers
from .getprop import Getprop
from .processes import Processes
from .settings import Settings
from .sms import SMS
ANDROIDQF_MODULES = [DumpsysActivities, DumpsysReceivers, DumpsysAccessibility,
DumpsysAppops, Processes, Getprop, Settings, SMS,
DumpsysPackages]

View File

@@ -0,0 +1,38 @@
# Mobile Verification Toolkit (MVT)
# Copyright (c) 2021-2023 Claudio Guarnieri.
# Use of this software is governed by the MVT License 1.1 that can be found at
# https://license.mvt.re/1.1/
import fnmatch
import logging
import os
from typing import Any, Dict, List, Optional, Union
from mvt.common.module import MVTModule
class AndroidQFModule(MVTModule):
"""This class provides a base for all Android Data analysis modules."""
def __init__(
self,
file_path: Optional[str] = None,
target_path: Optional[str] = None,
results_path: Optional[str] = None,
fast_mode: Optional[bool] = False,
log: logging.Logger = logging.getLogger(__name__),
results: Union[List[Dict[str, Any]], Dict[str, Any], None] = None
) -> None:
super().__init__(file_path=file_path, target_path=target_path,
results_path=results_path, fast_mode=fast_mode,
log=log, results=results)
self._path = target_path
self._files = []
for root, dirs, files in os.walk(target_path):
for name in files:
self._files.append(os.path.join(root, name))
def _get_files_by_pattern(self, pattern):
return fnmatch.filter(self._files, pattern)

View File

@@ -0,0 +1,68 @@
# Mobile Verification Toolkit (MVT)
# Copyright (c) 2021-2023 Claudio Guarnieri.
# Use of this software is governed by the MVT License 1.1 that can be found at
# https://license.mvt.re/1.1/
import logging
from typing import Optional
from mvt.android.parsers import parse_dumpsys_accessibility
from .base import AndroidQFModule
class DumpsysAccessibility(AndroidQFModule):
"""This module analyse dumpsys accessbility"""
def __init__(
self,
file_path: Optional[str] = None,
target_path: Optional[str] = None,
results_path: Optional[str] = None,
fast_mode: Optional[bool] = False,
log: logging.Logger = logging.getLogger(__name__),
results: Optional[list] = None
) -> None:
super().__init__(file_path=file_path, target_path=target_path,
results_path=results_path, fast_mode=fast_mode,
log=log, results=results)
def check_indicators(self) -> None:
if not self.indicators:
return
for result in self.results:
ioc = self.indicators.check_app_id(result["package_name"])
if ioc:
result["matched_indicator"] = ioc
self.detected.append(result)
def run(self) -> None:
dumpsys_file = self._get_files_by_pattern("*/dumpsys.txt")
if not dumpsys_file:
return
lines = []
in_accessibility = False
with open(dumpsys_file[0]) as handle:
for line in handle:
if line.strip().startswith("DUMP OF SERVICE accessibility:"):
in_accessibility = True
continue
if not in_accessibility:
continue
if line.strip().startswith("-------------------------------------------------------------------------------"): # pylint: disable=line-too-long
break
lines.append(line.rstrip())
self.results = parse_dumpsys_accessibility("\n".join(lines))
for result in self.results:
self.log.info("Found installed accessibility service \"%s\"",
result.get("service"))
self.log.info("Identified a total of %d accessibility services",
len(self.results))

View File

@@ -0,0 +1,66 @@
# Mobile Verification Toolkit (MVT)
# Copyright (c) 2021-2023 Claudio Guarnieri.
# Use of this software is governed by the MVT License 1.1 that can be found at
# https://license.mvt.re/1.1/
import logging
from typing import Optional
from mvt.android.parsers import parse_dumpsys_activity_resolver_table
from .base import AndroidQFModule
class DumpsysActivities(AndroidQFModule):
"""This module extracts details on receivers for risky activities."""
def __init__(
self,
file_path: Optional[str] = None,
target_path: Optional[str] = None,
results_path: Optional[str] = None,
fast_mode: Optional[bool] = False,
log: logging.Logger = logging.getLogger(__name__),
results: Optional[list] = None
) -> None:
super().__init__(file_path=file_path, target_path=target_path,
results_path=results_path, fast_mode=fast_mode,
log=log, results=results)
self.results = results if results else {}
def check_indicators(self) -> None:
if not self.indicators:
return
for intent, activities in self.results.items():
for activity in activities:
ioc = self.indicators.check_app_id(activity["package_name"])
if ioc:
activity["matched_indicator"] = ioc
self.detected.append({intent: activity})
def run(self) -> None:
dumpsys_file = self._get_files_by_pattern("*/dumpsys.txt")
if not dumpsys_file:
return
lines = []
in_package = False
with open(dumpsys_file[0]) as handle:
for line in handle:
if line.strip() == "DUMP OF SERVICE package:":
in_package = True
continue
if not in_package:
continue
if line.strip().startswith("------------------------------------------------------------------------------"): # pylint: disable=line-too-long
break
lines.append(line.rstrip())
self.results = parse_dumpsys_activity_resolver_table("\n".join(lines))
self.log.info("Extracted activities for %d intents", len(self.results))

View File

@@ -0,0 +1,83 @@
# Mobile Verification Toolkit (MVT)
# Copyright (c) 2021-2023 Claudio Guarnieri.
# Use of this software is governed by the MVT License 1.1 that can be found at
# https://license.mvt.re/1.1/
import logging
from typing import Optional, Union
from mvt.android.parsers import parse_dumpsys_appops
from .base import AndroidQFModule
class DumpsysAppops(AndroidQFModule):
def __init__(
self,
file_path: Optional[str] = None,
target_path: Optional[str] = None,
results_path: Optional[str] = None,
fast_mode: Optional[bool] = False,
log: logging.Logger = logging.getLogger(__name__),
results: Optional[list] = None
) -> None:
super().__init__(file_path=file_path, target_path=target_path,
results_path=results_path, fast_mode=fast_mode,
log=log, results=results)
def serialize(self, record: dict) -> Union[dict, list]:
records = []
for perm in record["permissions"]:
if "entries" not in perm:
continue
for entry in perm["entries"]:
if "timestamp" in entry:
records.append({
"timestamp": entry["timestamp"],
"module": self.__class__.__name__,
"event": entry["access"],
"data": f"{record['package_name']} access to "
f"{perm['name']} : {entry['access']}",
})
return records
def check_indicators(self) -> None:
for result in self.results:
if self.indicators:
ioc = self.indicators.check_app_id(result.get("package_name"))
if ioc:
result["matched_indicator"] = ioc
self.detected.append(result)
continue
for perm in result["permissions"]:
if (perm["name"] == "REQUEST_INSTALL_PACKAGES"
and perm["access"] == "allow"):
self.log.info("Package %s with REQUEST_INSTALL_PACKAGES permission",
result["package_name"])
def run(self) -> None:
dumpsys_file = self._get_files_by_pattern("*/dumpsys.txt")
if not dumpsys_file:
return
lines = []
in_package = False
with open(dumpsys_file[0]) as handle:
for line in handle:
if line.startswith("DUMP OF SERVICE appops:"):
in_package = True
continue
if in_package:
if line.startswith("-------------------------------------------------------------------------------"): # pylint: disable=line-too-long
break
lines.append(line.rstrip())
self.results = parse_dumpsys_appops("\n".join(lines))
self.log.info("Identified %d applications in AppOps Manager",
len(self.results))

View File

@@ -0,0 +1,106 @@
# Mobile Verification Toolkit (MVT)
# Copyright (c) 2021-2023 Claudio Guarnieri.
# Use of this software is governed by the MVT License 1.1 that can be found at
# https://license.mvt.re/1.1/
import logging
from typing import Any, Dict, List, Optional, Union
from mvt.android.modules.adb.packages import (DANGEROUS_PERMISSIONS,
DANGEROUS_PERMISSIONS_THRESHOLD,
ROOT_PACKAGES)
from mvt.android.parsers.dumpsys import parse_dumpsys_packages
from .base import AndroidQFModule
class DumpsysPackages(AndroidQFModule):
"""This module analyse dumpsys packages"""
def __init__(
self,
file_path: Optional[str] = None,
target_path: Optional[str] = None,
results_path: Optional[str] = None,
fast_mode: Optional[bool] = False,
log: logging.Logger = logging.getLogger(__name__),
results: Optional[List[Dict[str, Any]]] = None
) -> None:
super().__init__(file_path=file_path, target_path=target_path,
results_path=results_path, fast_mode=fast_mode,
log=log, results=results)
def serialize(self, record: dict) -> Union[dict, list]:
entries = []
for entry in ["timestamp", "first_install_time", "last_update_time"]:
if entry in record:
entries.append({
"timestamp": record[entry],
"module": self.__class__.__name__,
"event": entry,
"data": f"Package {record['package_name']} "
f"({record['uid']})",
})
return entries
def check_indicators(self) -> None:
for result in self.results:
if result["package_name"] in ROOT_PACKAGES:
self.log.warning("Found an installed package related to "
"rooting/jailbreaking: \"%s\"",
result["package_name"])
self.detected.append(result)
continue
if not self.indicators:
continue
ioc = self.indicators.check_app_id(result.get("package_name", ""))
if ioc:
result["matched_indicator"] = ioc
self.detected.append(result)
def run(self) -> None:
dumpsys_file = self._get_files_by_pattern("*/dumpsys.txt")
if len(dumpsys_file) != 1:
self.log.info("Dumpsys file not found")
return
with open(dumpsys_file[0]) as handle:
data = handle.read().split("\n")
package = []
in_service = False
in_package_list = False
for line in data:
if line.strip().startswith("DUMP OF SERVICE package:"):
in_service = True
continue
if in_service and line.startswith("Packages:"):
in_package_list = True
continue
if not in_service or not in_package_list:
continue
if line.strip() == "":
break
package.append(line)
self.results = parse_dumpsys_packages("\n".join(package))
for result in self.results:
dangerous_permissions_count = 0
for perm in result["permissions"]:
if perm["name"] in DANGEROUS_PERMISSIONS:
dangerous_permissions_count += 1
if dangerous_permissions_count >= DANGEROUS_PERMISSIONS_THRESHOLD:
self.log.info("Found package \"%s\" requested %d potentially dangerous permissions",
result["package_name"],
dangerous_permissions_count)
self.log.info("Extracted details on %d packages", len(self.results))

View File

@@ -0,0 +1,86 @@
# Mobile Verification Toolkit (MVT)
# Copyright (c) 2021-2023 Claudio Guarnieri.
# Use of this software is governed by the MVT License 1.1 that can be found at
# https://license.mvt.re/1.1/
import logging
from typing import Any, Dict, List, Optional, Union
from mvt.android.modules.adb.dumpsys_receivers import (
INTENT_DATA_SMS_RECEIVED, INTENT_NEW_OUTGOING_CALL,
INTENT_NEW_OUTGOING_SMS, INTENT_PHONE_STATE, INTENT_SMS_RECEIVED)
from mvt.android.parsers import parse_dumpsys_receiver_resolver_table
from .base import AndroidQFModule
class DumpsysReceivers(AndroidQFModule):
"""This module analyse dumpsys receivers"""
def __init__(
self,
file_path: Optional[str] = None,
target_path: Optional[str] = None,
results_path: Optional[str] = None,
fast_mode: Optional[bool] = False,
log: logging.Logger = logging.getLogger(__name__),
results: Union[List[Any], Dict[str, Any], None] = None
) -> None:
super().__init__(file_path=file_path, target_path=target_path,
results_path=results_path, fast_mode=fast_mode,
log=log, results=results)
self.results = results if results else {}
def check_indicators(self) -> None:
if not self.indicators:
return
for intent, receivers in self.results.items():
for receiver in receivers:
if intent == INTENT_NEW_OUTGOING_SMS:
self.log.info("Found a receiver to intercept outgoing SMS messages: \"%s\"",
receiver["receiver"])
elif intent == INTENT_SMS_RECEIVED:
self.log.info("Found a receiver to intercept incoming SMS messages: \"%s\"",
receiver["receiver"])
elif intent == INTENT_DATA_SMS_RECEIVED:
self.log.info("Found a receiver to intercept incoming data SMS message: \"%s\"",
receiver["receiver"])
elif intent == INTENT_PHONE_STATE:
self.log.info("Found a receiver monitoring "
"telephony state/incoming calls: \"%s\"",
receiver["receiver"])
elif intent == INTENT_NEW_OUTGOING_CALL:
self.log.info("Found a receiver monitoring outgoing calls: \"%s\"",
receiver["receiver"])
ioc = self.indicators.check_app_id(receiver["package_name"])
if ioc:
receiver["matched_indicator"] = ioc
self.detected.append({intent: receiver})
def run(self) -> None:
dumpsys_file = self._get_files_by_pattern("*/dumpsys.txt")
if not dumpsys_file:
return
in_receivers = False
lines = []
with open(dumpsys_file[0]) as handle:
for line in handle:
if line.strip() == "DUMP OF SERVICE package:":
in_receivers = True
continue
if not in_receivers:
continue
if line.strip().startswith("------------------------------------------------------------------------------"): # pylint: disable=line-too-long
break
lines.append(line.rstrip())
self.results = parse_dumpsys_receiver_resolver_table("\n".join(lines))
self.log.info("Extracted receivers for %d intents", len(self.results))

View File

@@ -0,0 +1,76 @@
# Mobile Verification Toolkit (MVT)
# Copyright (c) 2021-2023 Claudio Guarnieri.
# Use of this software is governed by the MVT License 1.1 that can be found at
# https://license.mvt.re/1.1/
import logging
from datetime import datetime, timedelta
from typing import Optional
from mvt.android.parsers.getprop import parse_getprop
from .base import AndroidQFModule
INTERESTING_PROPERTIES = [
"gsm.sim.operator.alpha",
"gsm.sim.operator.iso-country",
"persist.sys.timezone",
"ro.boot.serialno",
"ro.build.version.sdk",
"ro.build.version.security_patch",
"ro.product.cpu.abi",
"ro.product.locale",
"ro.product.vendor.manufacturer",
"ro.product.vendor.model",
"ro.product.vendor.name"
]
class Getprop(AndroidQFModule):
"""This module extracts data from get properties."""
def __init__(
self,
file_path: Optional[str] = None,
target_path: Optional[str] = None,
results_path: Optional[str] = None,
fast_mode: Optional[bool] = False,
log: logging.Logger = logging.getLogger(__name__),
results: Optional[list] = None
) -> None:
super().__init__(file_path=file_path, target_path=target_path,
results_path=results_path, fast_mode=fast_mode,
log=log, results=results)
self.results = []
def check_indicators(self) -> None:
if not self.indicators:
return
for result in self.results:
ioc = self.indicators.check_android_property_name(result.get("name", ""))
if ioc:
result["matched_indicator"] = ioc
self.detected.append(result)
def run(self) -> None:
getprop_files = self._get_files_by_pattern("*/getprop.txt")
if not getprop_files:
self.log.info("getprop.txt file not found")
return
with open(getprop_files[0]) as f:
data = f.read()
self.results = parse_getprop(data)
for entry in self.results:
if entry["name"] in INTERESTING_PROPERTIES:
self.log.info("%s: %s", entry["name"], entry["value"])
if entry["name"] == "ro.build.version.security_patch":
last_patch = datetime.strptime(entry["value"], "%Y-%m-%d")
if (datetime.now() - last_patch) > timedelta(days=6*31):
self.log.warning("This phone has not received security "
"updates for more than six months "
"(last update: %s)", entry["value"])
self.log.info("Extracted a total of %d properties", len(self.results))

View File

@@ -0,0 +1,92 @@
# Mobile Verification Toolkit (MVT)
# Copyright (c) 2021-2023 Claudio Guarnieri.
# Use of this software is governed by the MVT License 1.1 that can be found at
# https://license.mvt.re/1.1/
import logging
from typing import Optional
from .base import AndroidQFModule
class Processes(AndroidQFModule):
"""This module analyse running processes"""
def __init__(
self,
file_path: Optional[str] = None,
target_path: Optional[str] = None,
results_path: Optional[str] = None,
fast_mode: Optional[bool] = False,
log: logging.Logger = logging.getLogger(__name__),
results: Optional[list] = None
) -> None:
super().__init__(file_path=file_path, target_path=target_path,
results_path=results_path, fast_mode=fast_mode,
log=log, results=results)
def check_indicators(self) -> None:
if not self.indicators:
return
for result in self.results:
proc_name = result.get("proc_name", "")
if not proc_name:
continue
# Skipping this process because of false positives.
if result["proc_name"] == "gatekeeperd":
continue
ioc = self.indicators.check_app_id(proc_name)
if ioc:
result["matched_indicator"] = ioc
self.detected.append(result)
continue
ioc = self.indicators.check_process(proc_name)
if ioc:
result["matched_indicator"] = ioc
self.detected.append(result)
def _parse_ps(self, data):
for line in data.split("\n")[1:]:
proc = line.split()
# Sometimes WCHAN is empty.
if len(proc) == 8:
proc = proc[:5] + [''] + proc[5:]
# Sometimes there is the security label.
if proc[0].startswith("u:r"):
label = proc[0]
proc = proc[1:]
else:
label = ""
# Sometimes there is no WCHAN.
if len(proc) < 9:
proc = proc[:5] + [""] + proc[5:]
self.results.append({
"user": proc[0],
"pid": int(proc[1]),
"ppid": int(proc[2]),
"virtual_memory_size": int(proc[3]),
"resident_set_size": int(proc[4]),
"wchan": proc[5],
"aprocress": proc[6],
"stat": proc[7],
"proc_name": proc[8].strip("[]"),
"label": label,
})
def run(self) -> None:
ps_files = self._get_files_by_pattern("*/ps.txt")
if not ps_files:
return
with open(ps_files[0]) as handle:
self._parse_ps(handle.read())
self.log.info("Identified %d running processes", len(self.results))

View File

@@ -0,0 +1,58 @@
# Mobile Verification Toolkit (MVT)
# Copyright (c) 2021-2023 Claudio Guarnieri.
# Use of this software is governed by the MVT License 1.1 that can be found at
# https://license.mvt.re/1.1/
import logging
from typing import Optional
from mvt.android.modules.adb.settings import ANDROID_DANGEROUS_SETTINGS
from .base import AndroidQFModule
class Settings(AndroidQFModule):
"""This module analyse setting files"""
def __init__(
self,
file_path: Optional[str] = None,
target_path: Optional[str] = None,
results_path: Optional[str] = None,
fast_mode: Optional[bool] = False,
log: logging.Logger = logging.getLogger(__name__),
results: Optional[list] = None
) -> None:
super().__init__(file_path=file_path, target_path=target_path,
results_path=results_path, fast_mode=fast_mode,
log=log, results=results)
self.results = {}
def run(self) -> None:
for setting_file in self._get_files_by_pattern("*/settings_*.txt"):
namespace = setting_file[setting_file.rfind("_")+1:-4]
self.results[namespace] = {}
with open(setting_file) as handle:
for line in handle:
line = line.strip()
try:
key, value = line.split("=", 1)
except ValueError:
continue
try:
self.results[namespace][key] = value
except IndexError:
continue
for danger in ANDROID_DANGEROUS_SETTINGS:
if (danger["key"] == key
and danger["safe_value"] != value):
self.log.warning("Found suspicious setting \"%s = %s\" (%s)",
key, value, danger["description"])
break
self.log.info("Identified %d settings",
sum([len(val) for val in self.results.values()]))

View File

@@ -1,19 +1,15 @@
# Mobile Verification Toolkit (MVT)
# Copyright (c) 2021-2023 The MVT Authors.
# Copyright (c) 2021-2023 Claudio Guarnieri.
# Use of this software is governed by the MVT License 1.1 that can be found at
# https://license.mvt.re/1.1
import getpass
import logging
from typing import Optional
from mvt.android.modules.backup.helpers import prompt_or_load_android_backup_password
from mvt.android.parsers.backup import (
AndroidBackupParsingError,
InvalidBackupPassword,
parse_ab_header,
parse_backup_file,
parse_tar_for_sms,
)
from mvt.android.parsers.backup import (AndroidBackupParsingError,
InvalidBackupPassword, parse_ab_header,
parse_backup_file, parse_tar_for_sms)
from .base import AndroidQFModule
@@ -26,18 +22,13 @@ class SMS(AndroidQFModule):
file_path: Optional[str] = None,
target_path: Optional[str] = None,
results_path: Optional[str] = None,
module_options: Optional[dict] = None,
fast_mode: Optional[bool] = False,
log: logging.Logger = logging.getLogger(__name__),
results: Optional[list] = None,
results: Optional[list] = None
) -> None:
super().__init__(
file_path=file_path,
target_path=target_path,
results_path=results_path,
module_options=module_options,
log=log,
results=results,
)
super().__init__(file_path=file_path, target_path=target_path,
results_path=results_path, fast_mode=fast_mode,
log=log, results=results)
def check_indicators(self) -> None:
if not self.indicators:
@@ -58,23 +49,15 @@ class SMS(AndroidQFModule):
password = None
if header["encryption"] != "none":
password = prompt_or_load_android_backup_password(
self.log, self.module_options
)
if not password:
self.log.critical("No backup password provided.")
return
password = getpass.getpass(prompt="Backup Password: ", stream=None)
try:
tardata = parse_backup_file(data, password=password)
except InvalidBackupPassword:
self.log.critical("Invalid backup password")
return
except AndroidBackupParsingError:
self.log.critical(
"Impossible to parse this backup file, please use"
" Android Backup Extractor instead"
)
self.log.critical("Impossible to parse this backup file, please use"
" Android Backup Extractor instead")
return
if not tardata:
@@ -83,11 +66,9 @@ class SMS(AndroidQFModule):
try:
self.results = parse_tar_for_sms(tardata)
except AndroidBackupParsingError:
self.log.info(
"Impossible to read SMS from the Android Backup, "
"please extract the SMS and try extracting it with "
"Android Backup Extractor"
)
self.log.info("Impossible to read SMS from the Android Backup, "
"please extract the SMS and try extracting it with "
"Android Backup Extractor")
return
def run(self) -> None:
@@ -96,5 +77,9 @@ class SMS(AndroidQFModule):
self.log.info("No backup data found")
return
self.parse_backup(self._get_file_content(files[0]))
self.log.info("Identified %d SMS in backup data", len(self.results))
with open(files[0], "rb") as handle:
data = handle.read()
self.parse_backup(data)
self.log.info("Identified %d SMS in backup data",
len(self.results))

View File

@@ -1,5 +1,5 @@
# Mobile Verification Toolkit (MVT)
# Copyright (c) 2021-2023 The MVT Authors.
# Copyright (c) 2021-2023 Claudio Guarnieri.
# Use of this software is governed by the MVT License 1.1 that can be found at
# https://license.mvt.re/1.1/

View File

@@ -1,5 +1,5 @@
# Mobile Verification Toolkit (MVT)
# Copyright (c) 2021-2023 The MVT Authors.
# Copyright (c) 2021-2023 Claudio Guarnieri.
# Use of this software is governed by the MVT License 1.1 that can be found at
# https://license.mvt.re/1.1/
@@ -20,18 +20,13 @@ class BackupExtraction(MVTModule):
file_path: Optional[str] = None,
target_path: Optional[str] = None,
results_path: Optional[str] = None,
module_options: Optional[dict] = None,
fast_mode: Optional[bool] = False,
log: logging.Logger = logging.getLogger(__name__),
results: Optional[list] = None,
results: Optional[list] = None
) -> None:
super().__init__(
file_path=file_path,
target_path=target_path,
results_path=results_path,
module_options=module_options,
log=log,
results=results,
)
super().__init__(file_path=file_path, target_path=target_path,
results_path=results_path, fast_mode=fast_mode,
log=log, results=results)
self.ab = None
self.backup_path = None
self.tar = None
@@ -44,9 +39,7 @@ class BackupExtraction(MVTModule):
self.backup_path = backup_path
self.files = files
def from_ab(
self, file_path: Optional[str], tar: Optional[TarFile], files: List[str]
) -> None:
def from_ab(self, file_path: Optional[str], tar: Optional[TarFile], files: List[str]) -> None:
"""
Extract the files
"""

View File

@@ -1,5 +1,5 @@
# Mobile Verification Toolkit (MVT)
# Copyright (c) 2021-2023 The MVT Authors.
# Copyright (c) 2021-2023 Claudio Guarnieri.
# Use of this software is governed by the MVT License 1.1 that can be found at
# https://license.mvt.re/1.1/
@@ -12,23 +12,19 @@ from mvt.common.utils import check_for_links
class SMS(BackupExtraction):
def __init__(
self,
file_path: Optional[str] = None,
target_path: Optional[str] = None,
results_path: Optional[str] = None,
module_options: Optional[dict] = None,
fast_mode: Optional[bool] = False,
log: logging.Logger = logging.getLogger(__name__),
results: Optional[list] = None,
results: Optional[list] = None
) -> None:
super().__init__(
file_path=file_path,
target_path=target_path,
results_path=results_path,
module_options=module_options,
log=log,
results=results,
)
super().__init__(file_path=file_path, target_path=target_path,
results_path=results_path, fast_mode=fast_mode,
log=log, results=results)
self.results = []
def check_indicators(self) -> None:
@@ -43,9 +39,8 @@ class SMS(BackupExtraction):
if message_links == []:
message_links = check_for_links(message.get("text", ""))
if self.indicators.check_urls(message_links):
if self.indicators.check_domains(message_links):
self.detected.append(message)
continue
def run(self) -> None:
sms_path = "apps/com.android.providers.telephony/d_f/*_sms_backup"
@@ -60,4 +55,5 @@ class SMS(BackupExtraction):
data = self._get_file_content(file)
self.results.extend(parse_sms_file(data))
self.log.info("Extracted a total of %d SMS & MMS messages", len(self.results))
self.log.info("Extracted a total of %d SMS & MMS messages",
len(self.results))

View File

@@ -1,5 +1,5 @@
# Mobile Verification Toolkit (MVT)
# Copyright (c) 2021-2023 The MVT Authors.
# Copyright (c) 2021-2023 Claudio Guarnieri.
# Use of this software is governed by the MVT License 1.1 that can be found at
# https://license.mvt.re/1.1/
@@ -11,24 +11,7 @@ from .battery_history import BatteryHistory
from .dbinfo import DBInfo
from .getprop import Getprop
from .packages import Packages
from .platform_compat import PlatformCompat
from .receivers import Receivers
from .adb_state import DumpsysADBState
from .fs_timestamps import BugReportTimestamps
from .tombstones import Tombstones
BUGREPORT_MODULES = [
Accessibility,
Activities,
Appops,
BatteryDaily,
BatteryHistory,
DBInfo,
Getprop,
Packages,
PlatformCompat,
Receivers,
DumpsysADBState,
BugReportTimestamps,
Tombstones,
]
BUGREPORT_MODULES = [Accessibility, Activities, Appops, BatteryDaily,
BatteryHistory, DBInfo, Getprop, Packages, Receivers]

View File

@@ -0,0 +1,69 @@
# Mobile Verification Toolkit (MVT)
# Copyright (c) 2021-2023 Claudio Guarnieri.
# Use of this software is governed by the MVT License 1.1 that can be found at
# https://license.mvt.re/1.1/
import logging
from typing import Optional
from mvt.android.parsers import parse_dumpsys_accessibility
from .base import BugReportModule
class Accessibility(BugReportModule):
"""This module extracts stats on accessibility."""
def __init__(
self,
file_path: Optional[str] = None,
target_path: Optional[str] = None,
results_path: Optional[str] = None,
fast_mode: Optional[bool] = False,
log: logging.Logger = logging.getLogger(__name__),
results: Optional[list] = None
) -> None:
super().__init__(file_path=file_path, target_path=target_path,
results_path=results_path, fast_mode=fast_mode,
log=log, results=results)
def check_indicators(self) -> None:
if not self.indicators:
return
for result in self.results:
ioc = self.indicators.check_app_id(result["package_name"])
if ioc:
result["matched_indicator"] = ioc
self.detected.append(result)
continue
def run(self) -> None:
content = self._get_dumpstate_file()
if not content:
self.log.error("Unable to find dumpstate file. "
"Did you provide a valid bug report archive?")
return
lines = []
in_accessibility = False
for line in content.decode(errors="ignore").splitlines():
if line.strip() == "DUMP OF SERVICE accessibility:":
in_accessibility = True
continue
if not in_accessibility:
continue
if line.strip().startswith("------------------------------------------------------------------------------"): # pylint: disable=line-too-long
break
lines.append(line)
self.results = parse_dumpsys_accessibility("\n".join(lines))
for result in self.results:
self.log.info("Found installed accessibility service \"%s\"",
result.get("service"))
self.log.info("Identified a total of %d accessibility services",
len(self.results))

View File

@@ -0,0 +1,68 @@
# Mobile Verification Toolkit (MVT)
# Copyright (c) 2021-2023 Claudio Guarnieri.
# Use of this software is governed by the MVT License 1.1 that can be found at
# https://license.mvt.re/1.1/
import logging
from typing import Optional
from mvt.android.parsers import parse_dumpsys_activity_resolver_table
from .base import BugReportModule
class Activities(BugReportModule):
"""This module extracts details on receivers for risky activities."""
def __init__(
self,
file_path: Optional[str] = None,
target_path: Optional[str] = None,
results_path: Optional[str] = None,
fast_mode: Optional[bool] = False,
log: logging.Logger = logging.getLogger(__name__),
results: Optional[list] = None
) -> None:
super().__init__(file_path=file_path, target_path=target_path,
results_path=results_path, fast_mode=fast_mode,
log=log, results=results)
self.results = results if results else {}
def check_indicators(self) -> None:
if not self.indicators:
return
for intent, activities in self.results.items():
for activity in activities:
ioc = self.indicators.check_app_id(activity["package_name"])
if ioc:
activity["matched_indicator"] = ioc
self.detected.append({intent: activity})
continue
def run(self) -> None:
content = self._get_dumpstate_file()
if not content:
self.log.error("Unable to find dumpstate file. "
"Did you provide a valid bug report archive?")
return
lines = []
in_package = False
for line in content.decode(errors="ignore").splitlines():
if line.strip() == "DUMP OF SERVICE package:":
in_package = True
continue
if not in_package:
continue
if line.strip().startswith("------------------------------------------------------------------------------"): # pylint: disable=line-too-long
break
lines.append(line)
self.results = parse_dumpsys_activity_resolver_table("\n".join(lines))
self.log.info("Extracted activities for %d intents", len(self.results))

View File

@@ -0,0 +1,88 @@
# Mobile Verification Toolkit (MVT)
# Copyright (c) 2021-2023 Claudio Guarnieri.
# Use of this software is governed by the MVT License 1.1 that can be found at
# https://license.mvt.re/1.1/
import logging
from typing import Optional, Union
from mvt.android.parsers import parse_dumpsys_appops
from .base import BugReportModule
class Appops(BugReportModule):
"""This module extracts information on package from App-Ops Manager."""
def __init__(
self,
file_path: Optional[str] = None,
target_path: Optional[str] = None,
results_path: Optional[str] = None,
fast_mode: Optional[bool] = False,
log: logging.Logger = logging.getLogger(__name__),
results: Optional[list] = None
) -> None:
super().__init__(file_path=file_path, target_path=target_path,
results_path=results_path, fast_mode=fast_mode,
log=log, results=results)
def serialize(self, record: dict) -> Union[dict, list]:
records = []
for perm in record["permissions"]:
if "entries" not in perm:
continue
for entry in perm["entries"]:
if "timestamp" in entry:
records.append({
"timestamp": entry["timestamp"],
"module": self.__class__.__name__,
"event": entry["access"],
"data": f"{record['package_name']} access to "
f"{perm['name']}: {entry['access']}",
})
return records
def check_indicators(self) -> None:
for result in self.results:
if self.indicators:
ioc = self.indicators.check_app_id(result.get("package_name"))
if ioc:
result["matched_indicator"] = ioc
self.detected.append(result)
continue
for perm in result["permissions"]:
if (perm["name"] == "REQUEST_INSTALL_PACKAGES"
and perm["access"] == "allow"):
self.log.info("Package %s with REQUEST_INSTALL_PACKAGES permission",
result["package_name"])
def run(self) -> None:
content = self._get_dumpstate_file()
if not content:
self.log.error("Unable to find dumpstate file. "
"Did you provide a valid bug report archive?")
return
lines = []
in_appops = False
for line in content.decode(errors="ignore").splitlines():
if line.strip() == "DUMP OF SERVICE appops:":
in_appops = True
continue
if not in_appops:
continue
if line.strip().startswith("------------------------------------------------------------------------------"): # pylint: disable=line-too-long
break
lines.append(line)
self.results = parse_dumpsys_appops("\n".join(lines))
self.log.info("Identified a total of %d packages in App-Ops Manager",
len(self.results))

View File

@@ -1,12 +1,11 @@
# Mobile Verification Toolkit (MVT)
# Copyright (c) 2021-2023 The MVT Authors.
# Copyright (c) 2021-2023 Claudio Guarnieri.
# See the file 'LICENSE' for usage and copying permissions, or find a copy at
# https://github.com/mvt-project/mvt/blob/main/LICENSE
import datetime
import fnmatch
import logging
import os
from typing import List, Optional
from zipfile import ZipFile
@@ -21,27 +20,20 @@ class BugReportModule(MVTModule):
file_path: Optional[str] = None,
target_path: Optional[str] = None,
results_path: Optional[str] = None,
module_options: Optional[dict] = None,
fast_mode: Optional[bool] = False,
log: logging.Logger = logging.getLogger(__name__),
results: Optional[list] = None,
results: Optional[list] = None
) -> None:
super().__init__(
file_path=file_path,
target_path=target_path,
results_path=results_path,
module_options=module_options,
log=log,
results=results,
)
super().__init__(file_path=file_path, target_path=target_path,
results_path=results_path, fast_mode=fast_mode,
log=log, results=results)
self.zip_archive: Optional[ZipFile] = None
self.extract_path: Optional[str] = None
self.extract_files: List[str] = []
self.zip_files: List[str] = []
def from_folder(
self, extract_path: Optional[str], extract_files: List[str]
) -> None:
def from_folder(self, extract_path: Optional[str], extract_files: List[str]) -> None:
self.extract_path = extract_path
self.extract_files = extract_files
@@ -93,10 +85,4 @@ class BugReportModule(MVTModule):
return self._get_file_content(dumpstate_logs[0])
def _get_file_modification_time(self, file_path: str) -> dict:
if self.zip_archive:
file_timetuple = self.zip_archive.getinfo(file_path).date_time
return datetime.datetime(*file_timetuple)
else:
file_stat = os.stat(os.path.join(self.extract_path, file_path))
return datetime.datetime.fromtimestamp(file_stat.st_mtime)
return None

View File

@@ -0,0 +1,84 @@
# Mobile Verification Toolkit (MVT)
# Copyright (c) 2021-2023 Claudio Guarnieri.
# Use of this software is governed by the MVT License 1.1 that can be found at
# https://license.mvt.re/1.1/
import logging
from typing import Optional, Union
from mvt.android.parsers import parse_dumpsys_battery_daily
from .base import BugReportModule
class BatteryDaily(BugReportModule):
"""This module extracts records from battery daily updates."""
def __init__(
self,
file_path: Optional[str] = None,
target_path: Optional[str] = None,
results_path: Optional[str] = None,
fast_mode: Optional[bool] = False,
log: logging.Logger = logging.getLogger(__name__),
results: Optional[list] = None
) -> None:
super().__init__(file_path=file_path, target_path=target_path,
results_path=results_path, fast_mode=fast_mode,
log=log, results=results)
def serialize(self, record: dict) -> Union[dict, list]:
return {
"timestamp": record["from"],
"module": self.__class__.__name__,
"event": "battery_daily",
"data": f"Recorded update of package {record['package_name']} "
f"with vers {record['vers']}"
}
def check_indicators(self) -> None:
if not self.indicators:
return
for result in self.results:
ioc = self.indicators.check_app_id(result["package_name"])
if ioc:
result["matched_indicator"] = ioc
self.detected.append(result)
continue
def run(self) -> None:
content = self._get_dumpstate_file()
if not content:
self.log.error("Unable to find dumpstate file. "
"Did you provide a valid bug report archive?")
return
lines = []
in_batterystats = False
in_daily = False
for line in content.decode(errors="ignore").splitlines():
if line.strip() == "DUMP OF SERVICE batterystats:":
in_batterystats = True
continue
if not in_batterystats:
continue
if line.strip() == "Daily stats:":
lines.append(line)
in_daily = True
continue
if not in_daily:
continue
if line.strip() == "":
break
lines.append(line)
self.results = parse_dumpsys_battery_daily("\n".join(lines))
self.log.info("Extracted a total of %d battery daily stats",
len(self.results))

View File

@@ -0,0 +1,67 @@
# Mobile Verification Toolkit (MVT)
# Copyright (c) 2021-2023 Claudio Guarnieri.
# Use of this software is governed by the MVT License 1.1 that can be found at
# https://license.mvt.re/1.1/
import logging
from typing import Optional
from mvt.android.parsers import parse_dumpsys_battery_history
from .base import BugReportModule
class BatteryHistory(BugReportModule):
"""This module extracts records from battery daily updates."""
def __init__(
self,
file_path: Optional[str] = None,
target_path: Optional[str] = None,
results_path: Optional[str] = None,
fast_mode: Optional[bool] = False,
log: logging.Logger = logging.getLogger(__name__),
results: Optional[list] = None
) -> None:
super().__init__(file_path=file_path, target_path=target_path,
results_path=results_path, fast_mode=fast_mode,
log=log, results=results)
def check_indicators(self) -> None:
if not self.indicators:
return
for result in self.results:
ioc = self.indicators.check_app_id(result["package_name"])
if ioc:
result["matched_indicator"] = ioc
self.detected.append(result)
continue
def run(self) -> None:
content = self._get_dumpstate_file()
if not content:
self.log.error("Unable to find dumpstate file. "
"Did you provide a valid bug report archive?")
return
lines = []
in_history = False
for line in content.decode(errors="ignore").splitlines():
if line.strip().startswith("Battery History "):
lines.append(line)
in_history = True
continue
if not in_history:
continue
if line.strip() == "":
break
lines.append(line)
self.results = parse_dumpsys_battery_history("\n".join(lines))
self.log.info("Extracted a total of %d battery history records",
len(self.results))

View File

@@ -0,0 +1,70 @@
# Mobile Verification Toolkit (MVT)
# Copyright (c) 2021-2023 Claudio Guarnieri.
# Use of this software is governed by the MVT License 1.1 that can be found at
# https://license.mvt.re/1.1/
import logging
from typing import Optional
from mvt.android.parsers import parse_dumpsys_dbinfo
from .base import BugReportModule
class DBInfo(BugReportModule):
"""This module extracts records from battery daily updates."""
slug = "dbinfo"
def __init__(
self,
file_path: Optional[str] = None,
target_path: Optional[str] = None,
results_path: Optional[str] = None,
fast_mode: Optional[bool] = False,
log: logging.Logger = logging.getLogger(__name__),
results: Optional[list] = None
) -> None:
super().__init__(file_path=file_path, target_path=target_path,
results_path=results_path, fast_mode=fast_mode,
log=log, results=results)
def check_indicators(self) -> None:
if not self.indicators:
return
for result in self.results:
path = result.get("path", "")
for part in path.split("/"):
ioc = self.indicators.check_app_id(part)
if ioc:
result["matched_indicator"] = ioc
self.detected.append(result)
continue
def run(self) -> None:
content = self._get_dumpstate_file()
if not content:
self.log.error("Unable to find dumpstate file. "
"Did you provide a valid bug report archive?")
return
in_dbinfo = False
lines = []
for line in content.decode(errors="ignore").splitlines():
if line.strip() == "DUMP OF SERVICE dbinfo:":
in_dbinfo = True
continue
if not in_dbinfo:
continue
if line.strip().startswith("------------------------------------------------------------------------------"): # pylint: disable=line-too-long
break
lines.append(line)
self.results = parse_dumpsys_dbinfo("\n".join(lines))
self.log.info("Extracted a total of %d database connection pool records",
len(self.results))

View File

@@ -0,0 +1,69 @@
# Mobile Verification Toolkit (MVT)
# Copyright (c) 2021-2023 Claudio Guarnieri.
# Use of this software is governed by the MVT License 1.1 that can be found at
# https://license.mvt.re/1.1/
import logging
from datetime import datetime, timedelta
from typing import Optional
from mvt.android.parsers import parse_getprop
from .base import BugReportModule
class Getprop(BugReportModule):
"""This module extracts device properties from getprop command."""
def __init__(
self,
file_path: Optional[str] = None,
target_path: Optional[str] = None,
results_path: Optional[str] = None,
fast_mode: Optional[bool] = False,
log: logging.Logger = logging.getLogger(__name__),
results: Optional[list] = None
) -> None:
super().__init__(file_path=file_path, target_path=target_path,
results_path=results_path, fast_mode=fast_mode,
log=log, results=results)
self.results = {} if not results else results
def run(self) -> None:
content = self._get_dumpstate_file()
if not content:
self.log.error("Unable to find dumpstate file. "
"Did you provide a valid bug report archive?")
return
lines = []
in_getprop = False
for line in content.decode(errors="ignore").splitlines():
if line.strip().startswith("------ SYSTEM PROPERTIES"):
in_getprop = True
continue
if not in_getprop:
continue
if line.strip() == "------":
break
lines.append(line)
self.results = parse_getprop("\n".join(lines))
# Alert if phone is outdated.
for entry in self.results:
if entry["name"] == "ro.build.version.security_patch":
security_patch = entry["value"]
patch_date = datetime.strptime(security_patch, "%Y-%m-%d")
if (datetime.now() - patch_date) > timedelta(days=6*30):
self.log.warning("This phone has not received security updates "
"for more than six months (last update: %s)",
security_patch)
self.log.info("Extracted %d Android system properties",
len(self.results))

View File

@@ -0,0 +1,122 @@
# Mobile Verification Toolkit (MVT)
# Copyright (c) 2021-2023 Claudio Guarnieri.
# Use of this software is governed by the MVT License 1.1 that can be found at
# https://license.mvt.re/1.1/
import logging
from typing import Optional, Union
from mvt.android.modules.adb.packages import (DANGEROUS_PERMISSIONS,
DANGEROUS_PERMISSIONS_THRESHOLD,
ROOT_PACKAGES)
from mvt.android.parsers.dumpsys import parse_dumpsys_packages
from .base import BugReportModule
class Packages(BugReportModule):
"""This module extracts details on receivers for risky activities."""
def __init__(
self,
file_path: Optional[str] = None,
target_path: Optional[str] = None,
results_path: Optional[str] = None,
fast_mode: Optional[bool] = False,
log: logging.Logger = logging.getLogger(__name__),
results: Optional[list] = None
) -> None:
super().__init__(file_path=file_path, target_path=target_path,
results_path=results_path, fast_mode=fast_mode,
log=log, results=results)
def serialize(self, record: dict) -> Union[dict, list]:
records = []
timestamps = [
{
"event": "package_install",
"timestamp": record["timestamp"]
},
{
"event": "package_first_install",
"timestamp": record["first_install_time"]
},
{
"event": "package_last_update",
"timestamp": record["last_update_time"]
},
]
for timestamp in timestamps:
records.append({
"timestamp": timestamp["timestamp"],
"module": self.__class__.__name__,
"event": timestamp["event"],
"data": f"Install or update of package {record['package_name']}",
})
return records
def check_indicators(self) -> None:
for result in self.results:
if result["package_name"] in ROOT_PACKAGES:
self.log.warning("Found an installed package related to "
"rooting/jailbreaking: \"%s\"",
result["package_name"])
self.detected.append(result)
continue
if not self.indicators:
continue
ioc = self.indicators.check_app_id(result.get("package_name"))
if ioc:
result["matched_indicator"] = ioc
self.detected.append(result)
continue
def run(self) -> None:
content = self._get_dumpstate_file()
if not content:
self.log.error("Unable to find dumpstate file. "
"Did you provide a valid bug report archive?")
return
in_package = False
in_packages_list = False
lines = []
for line in content.decode(errors="ignore").splitlines():
if line.strip() == "DUMP OF SERVICE package:":
in_package = True
continue
if not in_package:
continue
if line.strip() == "Packages:":
in_packages_list = True
continue
if not in_packages_list:
continue
if line.strip() == "":
break
lines.append(line)
self.results = parse_dumpsys_packages("\n".join(lines))
for result in self.results:
dangerous_permissions_count = 0
for perm in result["permissions"]:
if perm["name"] in DANGEROUS_PERMISSIONS:
dangerous_permissions_count += 1
if dangerous_permissions_count >= DANGEROUS_PERMISSIONS_THRESHOLD:
self.log.info("Found package \"%s\" requested %d potentially dangerous permissions",
result["package_name"],
dangerous_permissions_count)
self.log.info("Extracted details on %d packages", len(self.results))

View File

@@ -0,0 +1,91 @@
# Mobile Verification Toolkit (MVT)
# Copyright (c) 2021-2023 Claudio Guarnieri.
# Use of this software is governed by the MVT License 1.1 that can be found at
# https://license.mvt.re/1.1/
import logging
from typing import Optional
from mvt.android.parsers import parse_dumpsys_receiver_resolver_table
from .base import BugReportModule
INTENT_NEW_OUTGOING_SMS = "android.provider.Telephony.NEW_OUTGOING_SMS"
INTENT_SMS_RECEIVED = "android.provider.Telephony.SMS_RECEIVED"
INTENT_DATA_SMS_RECEIVED = "android.intent.action.DATA_SMS_RECEIVED"
INTENT_PHONE_STATE = "android.intent.action.PHONE_STATE"
INTENT_NEW_OUTGOING_CALL = "android.intent.action.NEW_OUTGOING_CALL"
class Receivers(BugReportModule):
"""This module extracts details on receivers for risky activities."""
def __init__(
self,
file_path: Optional[str] = None,
target_path: Optional[str] = None,
results_path: Optional[str] = None,
fast_mode: Optional[bool] = False,
log: logging.Logger = logging.getLogger(__name__),
results: Optional[list] = None
) -> None:
super().__init__(file_path=file_path, target_path=target_path,
results_path=results_path, fast_mode=fast_mode,
log=log, results=results)
self.results = results if results else {}
def check_indicators(self) -> None:
if not self.indicators:
return
for intent, receivers in self.results.items():
for receiver in receivers:
if intent == INTENT_NEW_OUTGOING_SMS:
self.log.info("Found a receiver to intercept outgoing SMS messages: \"%s\"",
receiver["receiver"])
elif intent == INTENT_SMS_RECEIVED:
self.log.info("Found a receiver to intercept incoming SMS messages: \"%s\"",
receiver["receiver"])
elif intent == INTENT_DATA_SMS_RECEIVED:
self.log.info("Found a receiver to intercept incoming data SMS message: \"%s\"",
receiver["receiver"])
elif intent == INTENT_PHONE_STATE:
self.log.info("Found a receiver monitoring "
"telephony state/incoming calls: \"%s\"",
receiver["receiver"])
elif intent == INTENT_NEW_OUTGOING_CALL:
self.log.info("Found a receiver monitoring outgoing calls: \"%s\"",
receiver["receiver"])
ioc = self.indicators.check_app_id(receiver["package_name"])
if ioc:
receiver["matched_indicator"] = ioc
self.detected.append({intent: receiver})
continue
def run(self) -> None:
content = self._get_dumpstate_file()
if not content:
self.log.error("Unable to find dumpstate file. "
"Did you provide a valid bug report archive?")
return
in_receivers = False
lines = []
for line in content.decode(errors="ignore").splitlines():
if line.strip() == "DUMP OF SERVICE package:":
in_receivers = True
continue
if not in_receivers:
continue
if line.strip().startswith("------------------------------------------------------------------------------"): # pylint: disable=line-too-long
break
lines.append(line)
self.results = parse_dumpsys_receiver_resolver_table("\n".join(lines))
self.log.info("Extracted receivers for %d intents", len(self.results))

View File

@@ -0,0 +1,11 @@
# Mobile Verification Toolkit (MVT)
# Copyright (c) 2021-2023 Claudio Guarnieri.
# Use of this software is governed by the MVT License 1.1 that can be found at
# https://license.mvt.re/1.1/
from .dumpsys import (parse_dumpsys_accessibility,
parse_dumpsys_activity_resolver_table,
parse_dumpsys_appops, parse_dumpsys_battery_daily,
parse_dumpsys_battery_history, parse_dumpsys_dbinfo,
parse_dumpsys_receiver_resolver_table)
from .getprop import parse_getprop

View File

@@ -1,5 +1,5 @@
# Mobile Verification Toolkit (MVT)
# Copyright (c) 2021-2023 The MVT Authors.
# Copyright (c) 2021-2023 Claudio Guarnieri.
# Use of this software is governed by the MVT License 1.1 that can be found at
# https://license.mvt.re/1.1/
@@ -31,16 +31,15 @@ class InvalidBackupPassword(AndroidBackupParsingError):
# TODO: Need to clean all the following code and conform it to the coding style.
def to_utf8_bytes(input_bytes):
output = []
for byte in input_bytes:
if byte < ord(b"\x80"):
if byte < ord(b'\x80'):
output.append(byte)
else:
output.append(ord("\xef") | (byte >> 12))
output.append(ord("\xbc") | ((byte >> 6) & ord("\x3f")))
output.append(ord("\x80") | (byte & ord("\x3f")))
output.append(ord('\xef') | (byte >> 12))
output.append(ord('\xbc') | ((byte >> 6) & ord('\x3f')))
output.append(ord('\x80') | (byte & ord('\x3f')))
return bytes(output)
@@ -56,38 +55,33 @@ def parse_ab_header(data):
"backup": True,
"compression": (is_compressed == b"1"),
"version": int(version),
"encryption": encryption.decode("utf-8"),
"encryption": encryption.decode("utf-8")
}
return {"backup": False, "compression": None, "version": None, "encryption": None}
return {
"backup": False,
"compression": None,
"version": None,
"encryption": None
}
def decrypt_master_key(
password,
user_salt,
user_iv,
pbkdf2_rounds,
master_key_blob,
format_version,
checksum_salt,
):
def decrypt_master_key(password, user_salt, user_iv, pbkdf2_rounds,
master_key_blob, format_version, checksum_salt):
"""Generate AES key from user password uisng PBKDF2
The backup master key is extracted from the master key blog after decryption.
"""
# Derive key from password using PBKDF2.
kdf = PBKDF2HMAC(
algorithm=hashes.SHA1(), length=32, salt=user_salt, iterations=pbkdf2_rounds
)
kdf = PBKDF2HMAC(algorithm=hashes.SHA1(), length=32, salt=user_salt,
iterations=pbkdf2_rounds)
key = kdf.derive(password.encode("utf-8"))
# Decrypt master key blob.
cipher = Cipher(algorithms.AES(key), modes.CBC(user_iv))
decryptor = cipher.decryptor()
try:
decryted_master_key_blob = (
decryptor.update(master_key_blob) + decryptor.finalize()
)
decryted_master_key_blob = decryptor.update(master_key_blob) + decryptor.finalize()
# Extract key and IV from decrypted blob.
key_blob = io.BytesIO(decryted_master_key_blob)
@@ -109,9 +103,8 @@ def decrypt_master_key(
hmac_mk = master_key
# Derive checksum to confirm successful backup decryption.
kdf = PBKDF2HMAC(
algorithm=hashes.SHA1(), length=32, salt=checksum_salt, iterations=pbkdf2_rounds
)
kdf = PBKDF2HMAC(algorithm=hashes.SHA1(), length=32, salt=checksum_salt,
iterations=pbkdf2_rounds)
calculated_checksum = kdf.derive(hmac_mk)
if master_key_checksum != calculated_checksum:
@@ -120,7 +113,8 @@ def decrypt_master_key(
return master_key, master_iv
def decrypt_backup_data(encrypted_backup, password, encryption_algo, format_version):
def decrypt_backup_data(encrypted_backup, password, encryption_algo,
format_version):
"""
Generate encryption keyffrom password and do decryption
@@ -131,14 +125,8 @@ def decrypt_backup_data(encrypted_backup, password, encryption_algo, format_vers
if password is None:
raise InvalidBackupPassword()
[
user_salt,
checksum_salt,
pbkdf2_rounds,
user_iv,
master_key_blob,
encrypted_data,
] = encrypted_backup.split(b"\n", 5)
[user_salt, checksum_salt, pbkdf2_rounds, user_iv,
master_key_blob, encrypted_data] = encrypted_backup.split(b"\n", 5)
user_salt = bytes.fromhex(user_salt.decode("utf-8"))
checksum_salt = bytes.fromhex(checksum_salt.decode("utf-8"))
@@ -147,15 +135,13 @@ def decrypt_backup_data(encrypted_backup, password, encryption_algo, format_vers
master_key_blob = bytes.fromhex(master_key_blob.decode("utf-8"))
# Derive decryption master key from password.
master_key, master_iv = decrypt_master_key(
password=password,
user_salt=user_salt,
user_iv=user_iv,
pbkdf2_rounds=pbkdf2_rounds,
master_key_blob=master_key_blob,
format_version=format_version,
checksum_salt=checksum_salt,
)
master_key, master_iv = decrypt_master_key(password=password,
user_salt=user_salt,
user_iv=user_iv,
pbkdf2_rounds=pbkdf2_rounds,
master_key_blob=master_key_blob,
format_version=format_version,
checksum_salt=checksum_salt)
# Decrypt and unpad backup data using derivied key.
cipher = Cipher(algorithms.AES(master_key), modes.CBC(master_iv))
@@ -174,23 +160,21 @@ def parse_backup_file(data, password=None):
if not data.startswith(b"ANDROID BACKUP"):
raise AndroidBackupParsingError("Invalid file header")
[_, version, is_compressed, encryption_algo, tar_data] = data.split(b"\n", 4)
[_, version, is_compressed,
encryption_algo, tar_data] = data.split(b"\n", 4)
version = int(version)
is_compressed = int(is_compressed)
if encryption_algo != b"none":
tar_data = decrypt_backup_data(
tar_data, password, encryption_algo, format_version=version
)
tar_data = decrypt_backup_data(tar_data, password, encryption_algo,
format_version=version)
if is_compressed:
try:
tar_data = zlib.decompress(tar_data)
except zlib.error as exc:
raise AndroidBackupParsingError(
"Impossible to decompress the backup file"
) from exc
raise AndroidBackupParsingError("Impossible to decompress the backup file") from exc
return tar_data
@@ -205,10 +189,9 @@ def parse_tar_for_sms(data):
res = []
with tarfile.open(fileobj=dbytes) as tar:
for member in tar.getmembers():
if member.name.startswith("apps/com.android.providers.telephony/d_f/") and (
member.name.endswith("_sms_backup")
or member.name.endswith("_mms_backup")
):
if (member.name.startswith("apps/com.android.providers.telephony/d_f/")
and (member.name.endswith("_sms_backup")
or member.name.endswith("_mms_backup"))):
dhandler = tar.extractfile(member)
res.extend(parse_sms_file(dhandler.read()))
@@ -230,13 +213,10 @@ def parse_sms_file(data):
entry["body"] = entry["mms_body"]
entry.pop("mms_body")
body = entry.get("body", None)
message_links = None
if body:
message_links = check_for_links(entry["body"])
message_links = check_for_links(entry["body"])
entry["isodate"] = convert_unix_to_iso(int(entry["date"]) / 1000)
entry["direction"] = "sent" if int(entry["date_sent"]) else "received"
entry["direction"] = ("sent" if int(entry["date_sent"]) else "received")
# Extract links from the body
if message_links or entry["body"].strip() == "":

View File

@@ -0,0 +1,521 @@
# Mobile Verification Toolkit (MVT)
# Copyright (c) 2021-2023 Claudio Guarnieri.
# Use of this software is governed by the MVT License 1.1 that can be found at
# https://license.mvt.re/1.1/
import re
from datetime import datetime
from typing import Any, Dict, List
from mvt.common.utils import convert_datetime_to_iso
def parse_dumpsys_accessibility(output: str) -> List[Dict[str, str]]:
results = []
in_services = False
for line in output.splitlines():
if line.strip().startswith("installed services:"):
in_services = True
continue
if not in_services:
continue
if line.strip() == "}":
break
service = line.split(":")[1].strip()
results.append({
"package_name": service.split("/")[0],
"service": service,
})
return results
def parse_dumpsys_activity_resolver_table(output: str) -> Dict[str, Any]:
results = {}
in_activity_resolver_table = False
in_non_data_actions = False
intent = None
for line in output.splitlines():
if line.startswith("Activity Resolver Table:"):
in_activity_resolver_table = True
continue
if not in_activity_resolver_table:
continue
if line.startswith(" Non-Data Actions:"):
in_non_data_actions = True
continue
if not in_non_data_actions:
continue
# If we hit an empty line, the Non-Data Actions section should be
# finished.
if line.strip() == "":
break
# We detect the action name.
if (line.startswith(" " * 6) and not line.startswith(" " * 8)
and ":" in line):
intent = line.strip().replace(":", "")
results[intent] = []
continue
# If we are not in an intent block yet, skip.
if not intent:
continue
# If we are in a block but the line does not start with 8 spaces
# it means the block ended a new one started, so we reset and
# continue.
if not line.startswith(" " * 8):
intent = None
continue
# If we got this far, we are processing receivers for the
# activities we are interested in.
activity = line.strip().split(" ")[1]
package_name = activity.split("/")[0]
results[intent].append({
"package_name": package_name,
"activity": activity,
})
return results
def parse_dumpsys_battery_daily(output: str) -> list:
results = []
daily = None
daily_updates = []
for line in output.splitlines():
if line.startswith(" Daily from "):
if len(daily_updates) > 0:
results.extend(daily_updates)
daily_updates = []
timeframe = line[13:].strip()
date_from, date_to = timeframe.strip(":").split(" to ", 1)
daily = {"from": date_from[0:10], "to": date_to[0:10]}
continue
if not daily:
continue
if not line.strip().startswith("Update "):
continue
line = line.strip().replace("Update ", "")
package_name, vers = line.split(" ", 1)
vers_nr = vers.split("=", 1)[1]
already_seen = False
for update in daily_updates:
if (package_name == update["package_name"]
and vers_nr == update["vers"]):
already_seen = True
break
if not already_seen:
daily_updates.append({
"action": "update",
"from": daily["from"],
"to": daily["to"],
"package_name": package_name,
"vers": vers_nr,
})
if len(daily_updates) > 0:
results.extend(daily_updates)
return results
def parse_dumpsys_battery_history(output: str) -> List[Dict[str, Any]]:
results = []
for line in output.splitlines():
if line.startswith("Battery History "):
continue
if line.strip() == "":
break
time_elapsed = line.strip().split(" ", 1)[0]
event = ""
if line.find("+job") > 0:
event = "start_job"
uid = line[line.find("+job")+5:line.find(":")]
service = line[line.find(":")+1:].strip('"')
package_name = service.split("/")[0]
elif line.find("-job") > 0:
event = "end_job"
uid = line[line.find("-job")+5:line.find(":")]
service = line[line.find(":")+1:].strip('"')
package_name = service.split("/")[0]
elif line.find("+running +wake_lock=") > 0:
uid = line[line.find("+running +wake_lock=")+21:line.find(":")]
event = "wake"
service = line[line.find("*walarm*:")+9:].split(" ")[0].strip('"').strip()
if service == "" or "/" not in service:
continue
package_name = service.split("/")[0]
elif (line.find("+top=") > 0) or (line.find("-top") > 0):
if line.find("+top=") > 0:
event = "start_top"
top_pos = line.find("+top=")
else:
event = "end_top"
top_pos = line.find("-top=")
colon_pos = top_pos+line[top_pos:].find(":")
uid = line[top_pos+5:colon_pos]
service = ""
package_name = line[colon_pos+1:].strip('"')
else:
continue
results.append({
"time_elapsed": time_elapsed,
"event": event,
"uid": uid,
"package_name": package_name,
"service": service,
})
return results
def parse_dumpsys_dbinfo(output: str) -> List[Dict[str, Any]]:
results = []
rxp = re.compile(r'.*\[([0-9]{4}-[0-9]{2}-[0-9]{2} [0-9]{2}:[0-9]{2}:[0-9]{2}\.[0-9]{3})\].*\[Pid:\((\d+)\)\](\w+).*sql\=\"(.+?)\"') # pylint: disable=line-too-long
rxp_no_pid = re.compile(r'.*\[([0-9]{4}-[0-9]{2}-[0-9]{2} [0-9]{2}:[0-9]{2}:[0-9]{2}\.[0-9]{3})\][ ]{1}(\w+).*sql\=\"(.+?)\"') # pylint: disable=line-too-long
pool = None
in_operations = False
for line in output.splitlines():
if line.startswith("Connection pool for "):
pool = line.replace("Connection pool for ", "").rstrip(":")
if not pool:
continue
if line.strip() == "Most recently executed operations:":
in_operations = True
continue
if not in_operations:
continue
if not line.startswith(" "):
in_operations = False
pool = None
continue
matches = rxp.findall(line)
if not matches:
matches = rxp_no_pid.findall(line)
if not matches:
continue
match = matches[0]
results.append({
"isodate": match[0],
"action": match[1],
"sql": match[2],
"path": pool,
})
else:
match = matches[0]
results.append({
"isodate": match[0],
"pid": match[1],
"action": match[2],
"sql": match[3],
"path": pool,
})
return results
def parse_dumpsys_receiver_resolver_table(output: str) -> Dict[str, Any]:
results = {}
in_receiver_resolver_table = False
in_non_data_actions = False
intent = None
for line in output.splitlines():
if line.startswith("Receiver Resolver Table:"):
in_receiver_resolver_table = True
continue
if not in_receiver_resolver_table:
continue
if line.startswith(" Non-Data Actions:"):
in_non_data_actions = True
continue
if not in_non_data_actions:
continue
# If we hit an empty line, the Non-Data Actions section should be
# finished.
if line.strip() == "":
break
# We detect the action name.
if (line.startswith(" " * 6) and not line.startswith(" " * 8)
and ":" in line):
intent = line.strip().replace(":", "")
results[intent] = []
continue
# If we are not in an intent block yet, skip.
if not intent:
continue
# If we are in a block but the line does not start with 8 spaces
# it means the block ended a new one started, so we reset and
# continue.
if not line.startswith(" " * 8):
intent = None
continue
# If we got this far, we are processing receivers for the
# activities we are interested in.
receiver = line.strip().split(" ")[1]
package_name = receiver.split("/")[0]
results[intent].append({
"package_name": package_name,
"receiver": receiver,
})
return results
def parse_dumpsys_appops(output: str) -> List[Dict[str, Any]]:
results = []
perm = {}
package = {}
entry = {}
uid = None
in_packages = False
for line in output.splitlines():
if line.startswith(" Uid 0:"):
in_packages = True
if not in_packages:
continue
if line.startswith(" Uid "):
uid = line[6:-1]
continue
if line.startswith(" Package "):
if entry:
perm["entries"].append(entry)
entry = {}
if package:
if perm:
package["permissions"].append(perm)
perm = {}
results.append(package)
package = {
"package_name": line[12:-1],
"permissions": [],
"uid": uid,
}
continue
if line.startswith(" ") and line[6] != " ":
if entry:
perm["entries"].append(entry)
entry = {}
if perm:
package["permissions"].append(perm)
perm = {}
perm["name"] = line.split()[0]
perm["entries"] = []
if len(line.split()) > 1:
perm["access"] = line.split()[1][1:-2]
continue
if line.startswith(" "):
# Permission entry like:
# Reject: [fg-s]2021-05-19 22:02:52.054 (-314d1h25m2s33ms)
if entry:
perm["entries"].append(entry)
entry = {}
entry["access"] = line.split(":")[0].strip()
entry["type"] = line[line.find("[")+1:line.find("]")]
try:
entry["timestamp"] = convert_datetime_to_iso(
datetime.strptime(
line[line.find("]")+1:line.find("(")].strip(),
"%Y-%m-%d %H:%M:%S.%f"))
except ValueError:
# Invalid date format
pass
if line.strip() == "":
break
if entry:
perm["entries"].append(entry)
if perm:
package["permissions"].append(perm)
if package:
results.append(package)
return results
def parse_dumpsys_package_for_details(output: str) -> Dict[str, Any]:
"""
Parse one entry of a dumpsys package information
"""
details = {
"uid": "",
"version_name": "",
"version_code": "",
"timestamp": "",
"first_install_time": "",
"last_update_time": "",
"permissions": [],
"requested_permissions": [],
}
in_install_permissions = False
in_runtime_permissions = False
in_declared_permissions = False
in_requested_permissions = True
for line in output.splitlines():
if in_install_permissions:
if line.startswith(" " * 4) and not line.startswith(" " * 6):
in_install_permissions = False
else:
lineinfo = line.strip().split(":")
permission = lineinfo[0]
granted = None
if "granted=" in lineinfo[1]:
granted = ("granted=true" in lineinfo[1])
details["permissions"].append({
"name": permission,
"granted": granted,
"type": "install"
})
if in_runtime_permissions:
if not line.startswith(" " * 8):
in_runtime_permissions = False
else:
lineinfo = line.strip().split(":")
permission = lineinfo[0]
granted = None
if "granted=" in lineinfo[1]:
granted = ("granted=true" in lineinfo[1])
details["permissions"].append({
"name": permission,
"granted": granted,
"type": "runtime"
})
if in_declared_permissions:
if not line.startswith(" " * 6):
in_declared_permissions = False
else:
permission = line.strip().split(":")[0]
details["permissions"].append({
"name": permission,
"type": "declared"
})
if in_requested_permissions:
if not line.startswith(" " * 6):
in_requested_permissions = False
else:
details["requested_permissions"].append(line.strip())
if line.strip().startswith("userId="):
details["uid"] = line.split("=")[1].strip()
elif line.strip().startswith("versionName="):
details["version_name"] = line.split("=")[1].strip()
elif line.strip().startswith("versionCode="):
details["version_code"] = line.split("=", 1)[1].strip()
elif line.strip().startswith("timeStamp="):
details["timestamp"] = line.split("=")[1].strip()
elif line.strip().startswith("firstInstallTime="):
details["first_install_time"] = line.split("=")[1].strip()
elif line.strip().startswith("lastUpdateTime="):
details["last_update_time"] = line.split("=")[1].strip()
elif line.strip() == "install permissions:":
in_install_permissions = True
elif line.strip() == "runtime permissions:":
in_runtime_permissions = True
elif line.strip() == "declared permissions:":
in_declared_permissions = True
elif line.strip() == "requested permissions:":
in_requested_permissions = True
return details
def parse_dumpsys_packages(output: str) -> List[Dict[str, Any]]:
"""
Parse the dumpsys package service data
"""
pkg_rxp = re.compile(r" Package \[(.+?)\].*")
results = []
package_name = None
package = {}
lines = []
for line in output.splitlines():
if line.startswith(" Package ["):
if len(lines) > 0:
details = parse_dumpsys_package_for_details("\n".join(lines))
package.update(details)
results.append(package)
lines = []
package = {}
matches = pkg_rxp.findall(line)
if not matches:
continue
package_name = matches[0]
package["package_name"] = package_name
continue
if not package_name:
continue
lines.append(line)
if len(lines) > 0:
details = parse_dumpsys_package_for_details("\n".join(lines))
package.update(details)
results.append(package)
return results

View File

@@ -0,0 +1,29 @@
# Mobile Verification Toolkit (MVT)
# Copyright (c) 2021-2023 Claudio Guarnieri.
# Use of this software is governed by the MVT License 1.1 that can be found at
# https://license.mvt.re/1.1/
import re
from typing import Dict, List
def parse_getprop(output: str) -> List[Dict[str, str]]:
results = []
rxp = re.compile(r"\[(.+?)\]: \[(.+?)\]")
for line in output.splitlines():
line = line.strip()
if line == "":
continue
matches = re.findall(rxp, line)
if not matches or len(matches[0]) != 2:
continue
entry = {
"name": matches[0][0],
"value": matches[0][1]
}
results.append(entry)
return results

View File

@@ -1,4 +1,4 @@
# Mobile Verification Toolkit (MVT)
# Copyright (c) 2021-2023 The MVT Authors.
# Copyright (c) 2021-2023 Claudio Guarnieri.
# Use of this software is governed by the MVT License 1.1 that can be found at
# https://license.mvt.re/1.1/

View File

@@ -1,5 +1,5 @@
# Mobile Verification Toolkit (MVT)
# Copyright (c) 2021-2023 The MVT Authors.
# Copyright (c) 2021-2023 Claudio Guarnieri.
# Use of this software is governed by the MVT License 1.1 that can be found at
# https://license.mvt.re/1.1/
@@ -7,13 +7,14 @@ import logging
import os
from typing import Optional
from mvt.common.module import PostAnalysisModule
from mvt.common.command import Command
from mvt.common.utils import exec_or_profile
log = logging.getLogger(__name__)
class CmdCheckIOCS(Command):
def __init__(
self,
target_path: Optional[str] = None,
@@ -21,23 +22,18 @@ class CmdCheckIOCS(Command):
ioc_files: Optional[list] = None,
module_name: Optional[str] = None,
serial: Optional[str] = None,
module_options: Optional[dict] = None,
fast_mode: Optional[bool] = False,
) -> None:
super().__init__(
target_path=target_path,
results_path=results_path,
ioc_files=ioc_files,
module_name=module_name,
serial=serial,
module_options=module_options,
log=log,
)
super().__init__(target_path=target_path, results_path=results_path,
ioc_files=ioc_files, module_name=module_name,
serial=serial, fast_mode=fast_mode, log=log)
self.name = "check-iocs"
def run(self) -> None:
assert self.target_path is not None
all_modules = []
post_modules = []
for entry in self.modules:
if entry not in all_modules:
all_modules.append(entry)
@@ -49,38 +45,42 @@ class CmdCheckIOCS(Command):
name_only, _ = os.path.splitext(file_name)
file_path = os.path.join(self.target_path, file_name)
for iocs_module in all_modules:
if self.module_name and iocs_module.__name__ != self.module_name:
for module in all_modules:
if self.module_name and module.__name__ != self.module_name:
continue
if iocs_module.get_slug() != name_only:
# Handle post-analysis modules at the end
if issubclass(module, PostAnalysisModule) and module not in post_modules:
post_modules.append(module)
continue
log.info(
'Loading results from "%s" with module %s',
file_name,
iocs_module.__name__,
)
m = iocs_module.from_json(
file_path, log=logging.getLogger(iocs_module.__module__)
)
if not m:
log.warning("No result from this module, skipping it")
# Skip if the current result file does not match the module name
if module().get_slug() != name_only:
continue
log.info("Loading results from \"%s\" with module %s",
file_name, module.__name__)
m = module.from_json(file_path,
log=logging.getLogger(module.__module__))
if self.iocs.total_ioc_count > 0:
m.indicators = self.iocs
m.indicators.log = m.log
try:
exec_or_profile("m.check_indicators()", globals(), locals())
m.check_indicators()
except NotImplementedError:
continue
else:
total_detections += len(m.detected)
# Run post-analysis modules at end
for post_module in post_modules:
m = post_module.from_results(self.target_path, log=log)
m.run()
total_detections += len(m.detected)
if total_detections > 0:
log.warning(
"The check of the results produced %d detections!", total_detections
)
log.warning("The check of the results produced %d detections!",
total_detections)

View File

@@ -1,5 +1,5 @@
# Mobile Verification Toolkit (MVT)
# Copyright (c) 2021-2023 The MVT Authors.
# Copyright (c) 2021-2023 Claudio Guarnieri.
# Use of this software is governed by the MVT License 1.1 that can be found at
# https://license.mvt.re/1.1/
@@ -12,16 +12,14 @@ from typing import Optional
from mvt.common.indicators import Indicators
from mvt.common.module import MVTModule, run_module, save_timeline
from mvt.common.utils import (
convert_datetime_to_iso,
generate_hashes_from_path,
get_sha256_from_file_path,
)
from mvt.common.config import settings
from mvt.common.utils import (convert_datetime_to_iso,
generate_hashes_from_path,
get_sha256_from_file_path)
from mvt.common.version import MVT_VERSION
class Command:
def __init__(
self,
target_path: Optional[str] = None,
@@ -29,25 +27,22 @@ class Command:
ioc_files: Optional[list] = None,
module_name: Optional[str] = None,
serial: Optional[str] = None,
module_options: Optional[dict] = None,
hashes: bool = False,
fast_mode: Optional[bool] = False,
hashes: Optional[bool] = False,
log: logging.Logger = logging.getLogger(__name__),
) -> None:
self.name = ""
self.modules = []
self.modules_post = []
self.target_path = target_path
self.results_path = results_path
self.ioc_files = ioc_files if ioc_files else []
self.module_name = module_name
self.serial = serial
self.fast_mode = fast_mode
self.log = log
# This dictionary can contain options that will be passed down from
# the Command to all modules. This can for example be used to pass
# down a password to decrypt a backup or flags which are need by some modules.
self.module_options = module_options if module_options else {}
# This list will contain all executed modules.
# We can use this to reference e.g. self.executed[0].results.
self.executed = []
@@ -68,9 +63,8 @@ class Command:
try:
os.makedirs(self.results_path)
except Exception as exc:
self.log.critical(
"Unable to create output folder %s: %s", self.results_path, exc
)
self.log.critical("Unable to create output folder %s: %s",
self.results_path, exc)
sys.exit(1)
def _setup_logging(self):
@@ -78,49 +72,26 @@ class Command:
return
logger = logging.getLogger("mvt")
file_handler = logging.FileHandler(
os.path.join(self.results_path, "command.log")
)
formatter = logging.Formatter(
"%(asctime)s - %(name)s - %(levelname)s - %(message)s"
)
file_handler = logging.FileHandler(os.path.join(self.results_path,
"command.log"))
formatter = logging.Formatter("%(asctime)s - %(name)s - "
"%(levelname)s - %(message)s")
file_handler.setLevel(logging.DEBUG)
file_handler.setFormatter(formatter)
# MVT can be run in a loop
# Old file handlers stick around in subsequent loops
# Remove any existing logging.FileHandler instances
for handler in logger.handlers:
if isinstance(handler, logging.FileHandler):
logger.removeHandler(handler)
# And finally add the new one
logger.addHandler(file_handler)
def _store_timeline(self) -> None:
if not self.results_path:
return
# We use local timestamps in the timeline on Android as many
# logs do not contain timezone information.
if type(self).__name__.startswith("CmdAndroid"):
is_utc = False
else:
is_utc = True
if len(self.timeline) > 0:
save_timeline(
self.timeline,
os.path.join(self.results_path, "timeline.csv"),
is_utc=is_utc,
)
save_timeline(self.timeline,
os.path.join(self.results_path, "timeline.csv"))
if len(self.timeline_detected) > 0:
save_timeline(
self.timeline_detected,
os.path.join(self.results_path, "timeline_detected.csv"),
is_utc=is_utc,
)
save_timeline(self.timeline_detected,
os.path.join(self.results_path,
"timeline_detected.csv"))
def _store_info(self) -> None:
if not self.results_path:
@@ -143,7 +114,7 @@ class Command:
if ioc_file_path and ioc_file_path not in info["ioc_files"]:
info["ioc_files"].append(ioc_file_path)
if self.target_path and (settings.HASH_FILES or self.hashes):
if self.target_path and (os.environ.get("MVT_HASH_FILES") or self.hashes):
self.generate_hashes()
info["hashes"] = self.hash_values
@@ -152,9 +123,9 @@ class Command:
with open(info_path, "w+", encoding="utf-8") as handle:
json.dump(info, handle, indent=4)
if self.target_path and (settings.HASH_FILES or self.hashes):
if self.target_path and (os.environ.get("MVT_HASH_FILES") or self.hashes):
info_hash = get_sha256_from_file_path(info_path)
self.log.info('Reference hash of the info.json file: "%s"', info_hash)
self.log.info("Reference hash of the info.json file: \"%s\"", info_hash)
def generate_hashes(self) -> None:
"""
@@ -167,8 +138,9 @@ class Command:
self.hash_values.append(file)
def list_modules(self) -> None:
self.log.info("Following is the list of available %s modules:", self.name)
for module in self.modules:
self.log.info("Following is the list of available %s modules:",
self.name)
for module in (self.modules + self.modules_post):
self.log.info(" - %s", module.__name__)
def init(self) -> None:
@@ -180,28 +152,8 @@ class Command:
def finish(self) -> None:
raise NotImplementedError
def _show_disable_adb_warning(self) -> None:
"""Warn if ADB is enabled"""
if type(self).__name__ in ["CmdAndroidCheckADB", "CmdAndroidCheckAndroidQF"]:
self.log.info(
"Please disable Developer Options and ADB (Android Debug Bridge) on the device once finished with the acquisition. "
"ADB is a powerful tool which can allow unauthorized access to the device."
)
def _show_support_message(self) -> None:
support_message = "Please seek reputable expert help if you have serious concerns about a possible spyware attack. Such support is available to human rights defenders and civil society through Amnesty International's Security Lab at https://securitylab.amnesty.org/get-help/?c=mvt"
if self.detected_count == 0:
self.log.info(
f"[bold]NOTE:[/bold] Using MVT with public indicators of compromise (IOCs) [bold]WILL NOT[/bold] automatically detect advanced attacks.\n\n{support_message}",
extra={"markup": True},
)
else:
self.log.warning(
f"[bold]NOTE: Detected indicators of compromise[/bold]. Only expert review can confirm if the detected indicators are signs of an attack.\n\n{support_message}",
extra={"markup": True},
)
def run(self) -> None:
try:
self.init()
except NotImplementedError:
@@ -211,15 +163,13 @@ class Command:
if self.module_name and module.__name__ != self.module_name:
continue
# FIXME: do we need the logger here
# FIXME: do we need the logger here
module_logger = logging.getLogger(module.__module__)
m = module(
target_path=self.target_path,
results_path=self.results_path,
module_options=self.module_options,
log=module_logger,
)
m = module(target_path=self.target_path,
results_path=self.results_path,
fast_mode=self.fast_mode,
log=module_logger)
if self.iocs.total_ioc_count:
m.indicators = self.iocs
@@ -249,6 +199,3 @@ class Command:
self._store_timeline()
self._store_info()
self._show_disable_adb_warning()
self._show_support_message()

16
mvt/common/help.py Normal file
View File

@@ -0,0 +1,16 @@
# Mobile Verification Toolkit (MVT)
# Copyright (c) 2021-2023 Claudio Guarnieri.
# Use of this software is governed by the MVT License 1.1 that can be found at
# https://license.mvt.re/1.1/
# Help messages of repeating options.
HELP_MSG_OUTPUT = "Specify a path to a folder where you want to store JSON results"
HELP_MSG_IOC = "Path to indicators file (can be invoked multiple time)"
HELP_MSG_FAST = "Avoid running time/resource consuming features"
HELP_MSG_LIST_MODULES = "Print list of available modules and exit"
HELP_MSG_MODULE = "Name of a single module you would like to run instead of all"
HELP_MSG_HASHES = "Generate hashes of all the files analyzed"
HELP_MSG_VERBOSE = "Verbose mode"
# Android-specific.
HELP_MSG_SERIAL = "Specify a device serial number or HOST:PORT connection string"

553
mvt/common/indicators.py Normal file
View File

@@ -0,0 +1,553 @@
# Mobile Verification Toolkit (MVT)
# Copyright (c) 2021-2023 Claudio Guarnieri.
# Use of this software is governed by the MVT License 1.1 that can be found at
# https://license.mvt.re/1.1/
import json
import logging
import os
from typing import Any, Dict, Iterator, List, Optional, Union
from appdirs import user_data_dir
from .url import URL
MVT_DATA_FOLDER = user_data_dir("mvt")
MVT_INDICATORS_FOLDER = os.path.join(MVT_DATA_FOLDER, "indicators")
logger = logging.getLogger(__name__)
class Indicators:
"""This class is used to parse indicators from a STIX2 file and provide
functions to compare extracted artifacts to the indicators.
"""
def __init__(self, log=logger) -> None:
self.log = log
self.ioc_collections: List[Dict[str, Any]] = []
self.total_ioc_count = 0
def _load_downloaded_indicators(self) -> None:
if not os.path.isdir(MVT_INDICATORS_FOLDER):
return
for ioc_file_name in os.listdir(MVT_INDICATORS_FOLDER):
if ioc_file_name.lower().endswith(".stix2"):
self.parse_stix2(os.path.join(MVT_INDICATORS_FOLDER,
ioc_file_name))
def _check_stix2_env_variable(self) -> None:
"""
Checks if a variable MVT_STIX2 contains path to a STIX files.
"""
if "MVT_STIX2" not in os.environ:
return
paths = os.environ["MVT_STIX2"].split(":")
for path in paths:
if os.path.isfile(path):
self.parse_stix2(path)
else:
self.log.error("Path specified with env MVT_STIX2 is not a valid file: %s",
path)
def _new_collection(
self,
cid: Optional[str] = None,
name: Optional[str] = None,
description: Optional[str] = None,
file_name: Optional[str] = None,
file_path: Optional[str] = None
) -> dict:
return {
"id": cid,
"name": name,
"description": description,
"stix2_file_name": file_name,
"stix2_file_path": file_path,
"domains": [],
"processes": [],
"emails": [],
"file_names": [],
"file_paths": [],
"files_sha256": [],
"app_ids": [],
"ios_profile_ids": [],
"android_property_names": [],
"count": 0,
}
def _add_indicator(self, ioc: str, ioc_coll: dict,
ioc_coll_list: list) -> None:
ioc = ioc.strip("'")
if ioc not in ioc_coll_list:
ioc_coll_list.append(ioc)
ioc_coll["count"] += 1
self.total_ioc_count += 1
def _process_indicator(self, indicator: dict, collection: dict) -> None:
key, value = indicator.get("pattern", "").strip("[]").split("=")
if key == "domain-name:value":
# We force domain names to lower case.
self._add_indicator(ioc=value.lower(),
ioc_coll=collection,
ioc_coll_list=collection["domains"])
elif key == "process:name":
self._add_indicator(ioc=value,
ioc_coll=collection,
ioc_coll_list=collection["processes"])
elif key == "email-addr:value":
# We force email addresses to lower case.
self._add_indicator(ioc=value.lower(),
ioc_coll=collection,
ioc_coll_list=collection["emails"])
elif key == "file:name":
self._add_indicator(ioc=value,
ioc_coll=collection,
ioc_coll_list=collection["file_names"])
elif key == "file:path":
self._add_indicator(ioc=value,
ioc_coll=collection,
ioc_coll_list=collection["file_paths"])
elif key == "file:hashes.sha256":
self._add_indicator(ioc=value,
ioc_coll=collection,
ioc_coll_list=collection["files_sha256"])
elif key == "app:id":
self._add_indicator(ioc=value,
ioc_coll=collection,
ioc_coll_list=collection["app_ids"])
elif key == "configuration-profile:id":
self._add_indicator(ioc=value,
ioc_coll=collection,
ioc_coll_list=collection["ios_profile_ids"])
elif key == "android-property:name":
self._add_indicator(ioc=value,
ioc_coll=collection,
ioc_coll_list=collection["android_property_names"])
def parse_stix2(self, file_path: str) -> None:
"""Extract indicators from a STIX2 file.
:param file_path: Path to the STIX2 file to parse
:type file_path: str
"""
self.log.info("Parsing STIX2 indicators file at path %s", file_path)
with open(file_path, "r", encoding="utf-8") as handle:
try:
data = json.load(handle)
except json.decoder.JSONDecodeError:
self.log.critical("Unable to parse STIX2 indicator file. "
"The file is corrupted or in the wrong format!")
return
malware = {}
indicators = []
relationships = []
for entry in data.get("objects", []):
entry_type = entry.get("type", "")
if entry_type == "malware":
malware[entry["id"]] = {
"name": entry["name"],
"description": entry.get("description", ""),
}
elif entry_type == "indicator":
indicators.append(entry)
elif entry_type == "relationship":
relationships.append(entry)
collections = []
for mal_id, mal_values in malware.items():
collection = self._new_collection(mal_id, mal_values.get("name"),
mal_values.get("description"),
os.path.basename(file_path),
file_path)
collections.append(collection)
# We loop through all indicators.
for indicator in indicators:
malware_id = None
# We loop through all relationships and find the one pertinent to
# the current indicator.
for relationship in relationships:
if relationship["source_ref"] != indicator["id"]:
continue
# Look for a malware definition with the correct identifier.
if relationship["target_ref"] in malware.keys():
malware_id = relationship["target_ref"]
break
# Now we look for the correct collection matching the malware ID we
# got from the relationship.
for collection in collections:
if collection["id"] == malware_id:
self._process_indicator(indicator, collection)
break
for coll in collections:
self.log.info("Extracted %d indicators for collection with name \"%s\"",
coll["count"], coll["name"])
self.ioc_collections.extend(collections)
def load_indicators_files(self, files: list,
load_default: Optional[bool] = True) -> None:
"""
Load a list of indicators files.
"""
for file_path in files:
if os.path.isfile(file_path):
self.parse_stix2(file_path)
else:
self.log.warning("No indicators file exists at path %s",
file_path)
# Load downloaded indicators and any indicators from env variable.
if load_default:
self._load_downloaded_indicators()
self._check_stix2_env_variable()
self.log.info("Loaded a total of %d unique indicators",
self.total_ioc_count)
def get_iocs(self, ioc_type: str) -> Iterator[Dict[str, Any]]:
for ioc_collection in self.ioc_collections:
for ioc in ioc_collection.get(ioc_type, []):
yield {
"value": ioc,
"type": ioc_type,
"name": ioc_collection["name"],
"stix2_file_name": ioc_collection["stix2_file_name"],
}
def check_domain(self, url: str) -> Union[dict, None]:
"""Check if a given URL matches any of the provided domain indicators.
:param url: URL to match against domain indicators
:type url: str
:returns: Indicator details if matched, otherwise None
"""
if not url:
return None
if not isinstance(url, str):
return None
try:
# First we use the provided URL.
orig_url = URL(url)
if orig_url.check_if_shortened():
# If it is, we try to retrieve the actual URL making an
# HTTP HEAD request.
unshortened = orig_url.unshorten()
self.log.debug("Found a shortened URL %s -> %s",
url, unshortened)
if unshortened is None:
return None
# Now we check for any nested URL shorteners.
dest_url = URL(unshortened)
if dest_url.check_if_shortened():
self.log.debug("Original URL %s appears to shorten another "
"shortened URL %s ... checking!",
orig_url.url, dest_url.url)
return self.check_domain(dest_url.url)
final_url = dest_url
else:
# If it's not shortened, we just use the original URL object.
final_url = orig_url
except Exception:
# If URL parsing failed, we just try to do a simple substring
# match.
for ioc in self.get_iocs("domains"):
if ioc["value"].lower() in url:
self.log.warning("Maybe found a known suspicious domain %s "
"matching indicators from \"%s\"",
url, ioc["name"])
return ioc
# If nothing matched, we can quit here.
return None
# If all parsing worked, we start walking through available domain
# indicators.
for ioc in self.get_iocs("domains"):
# First we check the full domain.
if final_url.domain.lower() == ioc["value"]:
if orig_url.is_shortened and orig_url.url != final_url.url:
self.log.warning("Found a known suspicious domain %s "
"shortened as %s matching indicators from \"%s\"",
final_url.url, orig_url.url, ioc["name"])
else:
self.log.warning("Found a known suspicious domain %s "
"matching indicators from \"%s\"",
final_url.url, ioc["name"])
return ioc
# Then we just check the top level domain.
if final_url.top_level.lower() == ioc["value"]:
if orig_url.is_shortened and orig_url.url != final_url.url:
self.log.warning("Found a sub-domain with suspicious top "
"level %s shortened as %s matching "
"indicators from \"%s\"", final_url.url,
orig_url.url, ioc["name"])
else:
self.log.warning("Found a sub-domain with a suspicious top "
"level %s matching indicators from \"%s\"",
final_url.url, ioc["name"])
return ioc
return None
def check_domains(self, urls: list) -> Union[dict, None]:
"""Check a list of URLs against the provided list of domain indicators.
:param urls: List of URLs to check against domain indicators
:type urls: list
:returns: Indicator details if matched, otherwise None
"""
if not urls:
return None
for url in urls:
check = self.check_domain(url)
if check:
return check
return None
def check_process(self, process: str) -> Union[dict, None]:
"""Check the provided process name against the list of process
indicators.
:param process: Process name to check against process indicators
:type process: str
:returns: Indicator details if matched, otherwise None
"""
if not process:
return None
proc_name = os.path.basename(process)
for ioc in self.get_iocs("processes"):
if proc_name == ioc["value"]:
self.log.warning("Found a known suspicious process name \"%s\" "
"matching indicators from \"%s\"",
process, ioc["name"])
return ioc
if len(proc_name) == 16:
if ioc["value"].startswith(proc_name):
self.log.warning("Found a truncated known suspicious "
"process name \"%s\" matching indicators from \"%s\"",
process, ioc["name"])
return ioc
return None
def check_processes(self, processes: list) -> Union[dict, None]:
"""Check the provided list of processes against the list of
process indicators.
:param processes: List of processes to check against process indicators
:type processes: list
:returns: Indicator details if matched, otherwise None
"""
if not processes:
return None
for process in processes:
check = self.check_process(process)
if check:
return check
return None
def check_email(self, email: str) -> Union[dict, None]:
"""Check the provided email against the list of email indicators.
:param email: Email address to check against email indicators
:type email: str
:returns: Indicator details if matched, otherwise None
"""
if not email:
return None
for ioc in self.get_iocs("emails"):
if email.lower() == ioc["value"].lower():
self.log.warning("Found a known suspicious email address \"%s\" "
"matching indicators from \"%s\"",
email, ioc["name"])
return ioc
return None
def check_file_name(self, file_name: str) -> Union[dict, None]:
"""Check the provided file name against the list of file indicators.
:param file_name: File name to check against file
indicators
:type file_name: str
:returns: Indicator details if matched, otherwise None
"""
if not file_name:
return None
for ioc in self.get_iocs("file_names"):
if ioc["value"] == file_name:
self.log.warning("Found a known suspicious file name \"%s\" "
"matching indicators from \"%s\"",
file_name, ioc["name"])
return ioc
return None
def check_file_path(self, file_path: str) -> Union[dict, None]:
"""Check the provided file path against the list of file indicators
(both path and name).
:param file_path: File path or file name to check against file
indicators
:type file_path: str
:returns: Indicator details if matched, otherwise None
"""
if not file_path:
return None
ioc = self.check_file_name(os.path.basename(file_path))
if ioc:
return ioc
for ioc in self.get_iocs("file_paths"):
# Strip any trailing slash from indicator paths to match
# directories.
if file_path.startswith(ioc["value"].rstrip("/")):
self.log.warning("Found a known suspicious file path \"%s\" "
"matching indicators form \"%s\"",
file_path, ioc["name"])
return ioc
return None
def check_file_path_process(self, file_path: str) -> Optional[Dict[str, Any]]:
"""Check the provided file path contains a process name from the
list of indicators
:param file_path: File path or file name to check against file
indicators
:type file_path: str
:returns: Indicator details if matched, otherwise None
"""
if not file_path:
return None
for ioc in self.get_iocs("processes"):
parts = file_path.split("/")
if ioc["value"] in parts:
self.log.warning("Found known suspicious process name mentioned in file at "
"path \"%s\" matching indicators from \"%s\"",
file_path, ioc["name"])
return ioc
return None
def check_profile(self, profile_uuid: str) -> Union[dict, None]:
"""Check the provided configuration profile UUID against the list of
indicators.
:param profile_uuid: Profile UUID to check against configuration profile
indicators
:type profile_uuid: str
:returns: Indicator details if matched, otherwise None
"""
if not profile_uuid:
return None
for ioc in self.get_iocs("ios_profile_ids"):
if profile_uuid in ioc["value"]:
self.log.warning("Found a known suspicious profile ID \"%s\" "
"matching indicators from \"%s\"",
profile_uuid, ioc["name"])
return ioc
return None
def check_file_hash(self, file_hash: str) -> Union[dict, None]:
"""Check the provided SHA256 file hash against the list of indicators.
:param file_hash: SHA256 hash to check
:type file_hash: str
:returns: Indicator details if matched, otherwise None
"""
if not file_hash:
return None
for ioc in self.get_iocs("files_sha256"):
if file_hash.lower() == ioc["value"].lower():
self.log.warning("Found a known suspicious file with hash \"%s\" "
"matching indicators from \"%s\"",
file_hash, ioc["name"])
return ioc
return None
def check_app_id(self, app_id: str) -> Union[dict, None]:
"""Check the provided app identifier (typically an Android package name)
against the list of indicators.
:param app_id: App ID to check against the list of indicators
:type app_id: str
:returns: Indicator details if matched, otherwise None
"""
if not app_id:
return None
for ioc in self.get_iocs("app_ids"):
if app_id.lower() == ioc["value"].lower():
self.log.warning("Found a known suspicious app with ID \"%s\" "
"matching indicators from \"%s\"", app_id,
ioc["name"])
return ioc
return None
def check_android_property_name(self, property_name: str) -> Optional[dict]:
"""Check the android property name against the list of indicators.
:param property_name: Name of the Android property
:type property_name: str
:returns: Indicator details if matched, otherwise None
"""
if property_name is None:
return None
for ioc in self.get_iocs("android_property_names"):
if property_name.lower() == ioc["value"].lower():
self.log.warning("Found a known suspicious Android property \"%s\" "
"matching indicators from \"%s\"", property_name,
ioc["name"])
return ioc
return None

63
mvt/common/logo.py Normal file
View File

@@ -0,0 +1,63 @@
# Mobile Verification Toolkit (MVT)
# Copyright (c) 2021-2023 Claudio Guarnieri.
# Use of this software is governed by the MVT License 1.1 that can be found at
# https://license.mvt.re/1.1/
from rich import print as rich_print
from .updates import IndicatorsUpdates, MVTUpdates
from .version import MVT_VERSION
def check_updates() -> None:
# First we check for MVT version udpates.
mvt_updates = MVTUpdates()
try:
latest_version = mvt_updates.check()
except Exception:
pass
else:
if latest_version:
rich_print(f"\t\t[bold]Version {latest_version} is available! "
"Upgrade mvt with `pip3 install -U mvt`[/bold]")
# Then we check for indicators files updates.
ioc_updates = IndicatorsUpdates()
# Before proceeding, we check if we have downloaded an indicators index.
# If not, there's no point in proceeding with the updates check.
if ioc_updates.get_latest_update() == 0:
rich_print("\t\t[bold]You have not yet downloaded any indicators, check "
"the `download-iocs` command![/bold]")
return
# We only perform this check at a fixed frequency, in order to not
# overburden the user with too many lookups if the command is being run
# multiple times.
should_check, hours = ioc_updates.should_check()
if not should_check:
rich_print(f"\t\tIndicators updates checked recently, next automatic check "
f"in {int(hours)} hours")
return
try:
ioc_to_update = ioc_updates.check()
except Exception:
pass
else:
if ioc_to_update:
rich_print("\t\t[bold]There are updates to your indicators files! "
"Run the `download-iocs` command to update![/bold]")
else:
rich_print("\t\tYour indicators files seem to be up to date.")
def logo() -> None:
rich_print("\n")
rich_print("\t[bold]MVT[/bold] - Mobile Verification Toolkit")
rich_print("\t\thttps://mvt.re")
rich_print(f"\t\tVersion: {MVT_VERSION}")
check_updates()
rich_print("\n")

View File

@@ -1,16 +1,16 @@
# Mobile Verification Toolkit (MVT)
# Copyright (c) 2021-2023 The MVT Authors.
# Copyright (c) 2021-2023 Claudio Guarnieri.
# Use of this software is governed by the MVT License 1.1 that can be found at
# https://license.mvt.re/1.1/
import csv
import json
import logging
import os
import re
import glob
from typing import Any, Dict, List, Optional, Union
from .utils import CustomJSONEncoder, exec_or_profile
import simplejson as json
class DatabaseNotFoundError(Exception):
@@ -36,9 +36,9 @@ class MVTModule:
file_path: Optional[str] = None,
target_path: Optional[str] = None,
results_path: Optional[str] = None,
module_options: Optional[Dict[str, Any]] = None,
fast_mode: Optional[bool] = False,
log: logging.Logger = logging.getLogger(__name__),
results: Union[List[Dict[str, Any]], Dict[str, Any], None] = None,
results: Union[List[Dict[str, Any]], Dict[str, Any], None] = None
) -> None:
"""Initialize module.
@@ -58,7 +58,7 @@ class MVTModule:
self.file_path = file_path
self.target_path = target_path
self.results_path = results_path
self.module_options = module_options if module_options else {}
self.fast_mode = fast_mode
self.log = log
self.indicators = None
self.results = results if results else []
@@ -69,22 +69,18 @@ class MVTModule:
@classmethod
def from_json(cls, json_path: str, log: logging.Logger):
with open(json_path, "r", encoding="utf-8") as handle:
try:
results = json.load(handle)
if log:
log.info('Loaded %d results from "%s"', len(results), json_path)
return cls(results=results, log=log)
except json.decoder.JSONDecodeError as err:
log.error('Error to decode the json "%s" file: "%s"', json_path, err)
return None
results = json.load(handle)
if log:
log.info("Loaded %d results from \"%s\"",
len(results), json_path)
return cls(results=results, log=log)
@classmethod
def get_slug(cls) -> str:
def get_slug(self) -> str:
"""Use the module's class name to retrieve a slug"""
if cls.slug:
return cls.slug
if self.slug:
return self.slug
sub = re.sub("(.)([A-Z][a-z]+)", r"\1_\2", cls.__name__)
sub = re.sub("(.)([A-Z][a-z]+)", r"\1_\2", self.__class__.__name__)
return re.sub("([a-z0-9])([A-Z])", r"\1_\2", sub).lower()
def check_indicators(self) -> None:
@@ -104,23 +100,22 @@ class MVTModule:
if self.results:
results_file_name = f"{name}.json"
results_json_path = os.path.join(self.results_path, results_file_name)
results_json_path = os.path.join(self.results_path,
results_file_name)
with open(results_json_path, "w", encoding="utf-8") as handle:
try:
json.dump(self.results, handle, indent=4, cls=CustomJSONEncoder)
json.dump(self.results, handle, indent=4, default=str)
except Exception as exc:
self.log.error(
"Unable to store results of module %s to file %s: %s",
self.__class__.__name__,
results_file_name,
exc,
)
self.log.error("Unable to store results of module %s to file %s: %s",
self.__class__.__name__, results_file_name,
exc)
if self.detected:
detected_file_name = f"{name}_detected.json"
detected_json_path = os.path.join(self.results_path, detected_file_name)
detected_json_path = os.path.join(self.results_path,
detected_file_name)
with open(detected_json_path, "w", encoding="utf-8") as handle:
json.dump(self.detected, handle, indent=4, cls=CustomJSONEncoder)
json.dump(self.detected, handle, indent=4, default=str)
def serialize(self, record: dict) -> Union[dict, list, None]:
raise NotImplementedError
@@ -157,7 +152,8 @@ class MVTModule:
# De-duplicate timeline entries.
self.timeline = self._deduplicate_timeline(self.timeline)
self.timeline_detected = self._deduplicate_timeline(self.timeline_detected)
self.timeline_detected = self._deduplicate_timeline(
self.timeline_detected)
def run(self) -> None:
"""Run the main module procedure."""
@@ -168,70 +164,49 @@ def run_module(module: MVTModule) -> None:
module.log.info("Running module %s...", module.__class__.__name__)
try:
exec_or_profile("module.run()", globals(), locals())
module.run()
except NotImplementedError:
module.log.exception(
"The run() procedure of module %s was not implemented yet!",
module.__class__.__name__,
)
module.log.exception("The run() procedure of module %s was not implemented yet!",
module.__class__.__name__)
except InsufficientPrivileges as exc:
module.log.info(
"Insufficient privileges for module %s: %s", module.__class__.__name__, exc
)
module.log.info("Insufficient privileges for module %s: %s",
module.__class__.__name__, exc)
except DatabaseNotFoundError as exc:
module.log.info(
"There might be no data to extract by module %s: %s",
module.__class__.__name__,
exc,
)
module.log.info("There might be no data to extract by module %s: %s",
module.__class__.__name__, exc)
except DatabaseCorruptedError as exc:
module.log.error(
"The %s module database seems to be corrupted: %s",
module.__class__.__name__,
exc,
)
module.log.error("The %s module database seems to be corrupted: %s",
module.__class__.__name__, exc)
except Exception as exc:
module.log.exception(
"Error in running extraction from module %s: %s",
module.__class__.__name__,
exc,
)
module.log.exception("Error in running extraction from module %s: %s",
module.__class__.__name__, exc)
else:
try:
exec_or_profile("module.check_indicators()", globals(), locals())
module.check_indicators()
except NotImplementedError:
module.log.info(
"The %s module does not support checking for indicators",
module.__class__.__name__,
)
module.log.info("The %s module does not support checking for indicators",
module.__class__.__name__)
except Exception as exc:
module.log.exception(
"Error when checking indicators from module %s: %s",
module.__class__.__name__,
exc,
)
module.log.exception("Error when checking indicators from module %s: %s",
module.__class__.__name__, exc)
else:
if module.indicators and not module.detected:
module.log.info(
"The %s module produced no detections!", module.__class__.__name__
)
module.log.info("The %s module produced no detections!",
module.__class__.__name__)
try:
module.to_timeline()
except NotImplementedError:
pass
except Exception as exc:
module.log.exception(
"Error when serializing data from module %s: %s",
module.__class__.__name__,
exc,
)
module.log.exception("Error when serializing data from module %s: %s",
module.__class__.__name__, exc)
module.save_to_json()
def save_timeline(timeline: list, timeline_path: str, is_utc: bool = True) -> None:
def save_timeline(timeline: list, timeline_path: str) -> None:
"""Save the timeline in a csv file.
:param timeline: List of records to order and store
@@ -239,24 +214,63 @@ def save_timeline(timeline: list, timeline_path: str, is_utc: bool = True) -> No
"""
with open(timeline_path, "a+", encoding="utf-8") as handle:
csvoutput = csv.writer(
handle, delimiter=",", quotechar='"', quoting=csv.QUOTE_ALL, escapechar="\\"
)
csvoutput = csv.writer(handle, delimiter=",", quotechar="\"",
quoting=csv.QUOTE_ALL, escapechar='\\')
csvoutput.writerow(["UTC Timestamp", "Plugin", "Event", "Description"])
if is_utc:
timestamp_header = "UTC Timestamp"
else:
timestamp_header = "Device Local Timestamp"
csvoutput.writerow([timestamp_header, "Plugin", "Event", "Description"])
for event in sorted(timeline, key=lambda x: x["timestamp"]
if x["timestamp"] is not None else ""):
csvoutput.writerow([
event.get("timestamp"),
event.get("module"),
event.get("event"),
event.get("data"),
])
for event in sorted(
timeline, key=lambda x: x["timestamp"] if x["timestamp"] is not None else ""
):
csvoutput.writerow(
[
event.get("timestamp"),
event.get("module"),
event.get("event"),
event.get("data"),
]
)
class PostAnalysisModule(MVTModule):
"""
Base module for implementing post-processing rules against the output of
multiple MVT modules
"""
@classmethod
def from_results(cls, results_path: str, log: logging.Logger):
results = cls.load_results(results_path, log=log)
return cls(results=results, log=log)
@classmethod
def load_results(cls, results_path: str, log: logging.Logger):
"""Load the results from a directory of json file."""
# TODO: Move this to run once before loading all post-processing modules
module_results = {}
for json_path in glob.glob(os.path.join(results_path, "*.json")):
module_name, _ = os.path.splitext(os.path.basename(json_path))
with open(json_path, "r", encoding="utf-8") as handle:
try:
module_results[module_name] = json.load(handle)
except Exception as exc:
log.error("Unable to load results from file %s: %s",
json_path, exc)
if not module_results:
log.error("Did not find any MVT results at %s", results_path)
return module_results
def load_timeline(self):
"""Load timeline from CSV file"""
timeline = []
timeline_path = os.path.join(self.results_path, "timeline.csv")
with open(timeline_path, "r", encoding="utf-8") as handle:
csvinput = csv.reader(handle, delimiter=",", quotechar="\"",
quoting=csv.QUOTE_ALL, escapechar='\\')
for row in csvinput:
if row[0] == "UTC Timestamp":
continue
timeline.append({
"timestamp": row[0],
"module": row[1],
"event": row[2],
"data": row[3],
})
return timeline

Some files were not shown because too many files have changed in this diff Show More