Compare commits

...

32 Commits

Author SHA1 Message Date
Donncha Ó Cearbhaill
e2516f284b Bump version number 2023-06-29 17:03:26 +02:00
Donncha Ó Cearbhaill
17963f83d6 Fix URL to indicator repo in docs 2023-06-29 16:49:20 +02:00
Donncha Ó Cearbhaill
4f0c9c6077 Update README with information on indicators of compromise and path ways for forensic support 2023-06-29 16:48:56 +02:00
Donncha Ó Cearbhaill
27bd5f03a8 Merge pull request #359 from mvt-project/optimise-domain-checking
Optimise domain checking performance
2023-06-29 14:56:50 +02:00
Donncha Ó Cearbhaill
3babbadc1d Add docs for the profiling feature 2023-06-29 14:55:09 +02:00
Donncha Ó Cearbhaill
41db117168 Improve performance when checking URLs and domains
Some MVT modules such as the WhatsApp module can be very slow as it was taking a naive approach to look for IOCs. The code was checking URLs (potentially more than 100k) against
1000's of IOC domains resulting in a quadratic run-time with hundreds of millions of comparisons as the number of IOCs increases.

This commit add an Aho-Corasick library which allows the efficient search in a string (the URL in this case) for all matches in set of keys (the IOCs). This data structure is perfect for this use case.

A quick measurement shows a 80% performance improvement for a WhatsApp database with 100k entries. The slow path is now the time spent fetching and expanding short URLs found in the database. This
can also be sped up significantly by fetching each URL asynchronously. This would require reworking modules to split the URL expansion from the IOC check so I will implement in a separate PR.
2023-06-29 14:14:44 +02:00
Donncha Ó Cearbhaill
2b01ed7179 Add optional profiling for MVT modules 2023-06-29 13:31:13 +02:00
Donncha Ó Cearbhaill
78d493b17e Merge pull request #356 from mvt-project/auto/add-new-ios-releases
[auto] Update iOS releases and versions
2023-06-22 11:06:45 +02:00
DonnchaC
473c80009b Add new iOS versions and build numbers 2023-06-22 00:17:52 +00:00
tek
a1481683e3 Adapts linter workflow to black 2023-06-14 01:05:14 +02:00
Nex
bdd36a9179 Merge pull request #349 from mvt-project/code-cleanup
Linted code using isort + autoflake + black
2023-06-08 21:12:34 +02:00
Nex
e1677639c4 Linted code using isort + autoflake + black, fixed wrong use of Optional[bool] 2023-06-01 23:40:26 +02:00
tek
c2d740ed36 Handle better some empty database issues in iOS backups 2023-05-25 00:24:34 +02:00
tek
d0e24c6369 Fixes a bug in the applications module 2023-05-24 12:04:03 +02:00
tek
a1994079b1 Sort imports 2023-05-24 12:03:49 +02:00
Donncha Ó Cearbhaill
289b7efdeb Add missing iOS build numbers 2023-05-21 17:11:07 +01:00
Donncha Ó Cearbhaill
166a63e14c Merge pull request #347 from mvt-project/auto/add-new-ios-releases
[auto] Update iOS releases and versions
2023-05-21 17:54:25 +02:00
DonnchaC
1b933fdb12 Add new iOS versions and build numbers 2023-05-21 15:53:45 +00:00
Donncha Ó Cearbhaill
0c0ff7012b Set branch number for auto-generated pull request 2023-05-21 16:52:47 +01:00
Donncha Ó Cearbhaill
f9b0d07a81 Don't include information beta's in the version JSON 2023-05-21 16:49:14 +01:00
Donncha Ó Cearbhaill
d14bcdd05f Update title used in auto PR for new iOS versions 2023-05-21 16:47:56 +01:00
Donncha Ó Cearbhaill
e026bb0a76 Fix path to script in workflow 2023-05-21 16:44:17 +01:00
Donncha Ó Cearbhaill
253b4f031a Allow workflow to be triggered manually 2023-05-21 16:42:54 +01:00
Donncha Ó Cearbhaill
ec14297643 Merge pull request #345 from mvt-project/feature/auto-update-version-info
Add workflow to auto-update iOS builds and version numbers
2023-05-21 17:38:46 +02:00
Donncha Ó Cearbhaill
3142d86edd Fix path to include version JSON files in built package 2023-05-21 16:37:36 +01:00
Donncha Ó Cearbhaill
c18998d771 Add version 16.5 to resolve merge conflict from main 2023-05-21 16:26:12 +01:00
Donncha Ó Cearbhaill
22fd794fb8 Fix python style and setup.cfg syntax 2023-05-21 16:15:49 +01:00
Donncha Ó Cearbhaill
27c5c76dc2 Add script and worker to auto-update build and version info 2023-05-21 16:09:50 +01:00
Donncha Ó Cearbhaill
fafbac3545 Fix sorting of version numbers 2023-05-20 21:49:27 +01:00
Donncha Ó Cearbhaill
bbfaadd297 Load iOS device and build information from a JSON file. 2023-05-20 21:24:14 +01:00
tek
85abed55b6 Merge branch 'main' of github.com:mvt-project/mvt 2023-05-20 00:14:01 +02:00
tek
2fbd7607ef Adds latest iOS version 2023-05-20 00:11:16 +02:00
150 changed files with 5107 additions and 2556 deletions

11
.github/workflows/black.yml vendored Normal file
View File

@@ -0,0 +1,11 @@
name: Black
on: [push]
jobs:
black:
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v3
- uses: psf/black@stable
with:
options: "--check"

View File

@@ -0,0 +1,89 @@
"""
Python script to download the Apple RSS feed and parse it.
"""
import json
import os
import urllib.request
from xml.dom.minidom import parseString
from packaging import version
def download_apple_rss(feed_url):
with urllib.request.urlopen(feed_url) as f:
rss_feed = f.read().decode("utf-8")
print("Downloaded RSS feed from Apple.")
return rss_feed
def parse_latest_ios_versions(rss_feed_text):
latest_ios_versions = []
parsed_feed = parseString(rss_feed_text)
for item in parsed_feed.getElementsByTagName("item"):
title = item.getElementsByTagName("title")[0].firstChild.data
if not title.startswith("iOS"):
continue
import re
build_match = re.match(
r"iOS (?P<version>[\d\.]+) (?P<beta>beta )?(\S*)?\((?P<build>.*)\)", title
)
if not build_match:
print("Could not parse iOS build:", title)
continue
release_info = build_match.groupdict()
if release_info["beta"]:
print("Skipping beta release:", title)
continue
release_info.pop("beta")
latest_ios_versions.append(release_info)
return latest_ios_versions
def update_mvt(mvt_checkout_path, latest_ios_versions):
version_path = os.path.join(mvt_checkout_path, "mvt/ios/data/ios_versions.json")
with open(version_path, "r") as version_file:
current_versions = json.load(version_file)
new_entry_count = 0
for new_version in latest_ios_versions:
for current_version in current_versions:
if new_version["build"] == current_version["build"]:
break
else:
# New version that does not exist in current data
current_versions.append(new_version)
new_entry_count += 1
if not new_entry_count:
print("No new iOS versions found.")
else:
print("Found {} new iOS versions.".format(new_entry_count))
new_version_list = sorted(
current_versions, key=lambda x: version.Version(x["version"])
)
with open(version_path, "w") as version_file:
json.dump(new_version_list, version_file, indent=4)
def main():
print("Downloading RSS feed...")
mvt_checkout_path = os.path.abspath(
os.path.join(os.path.dirname(__file__), "../../../")
)
rss_feed = download_apple_rss(
"https://developer.apple.com/news/releases/rss/releases.rss"
)
latest_ios_version = parse_latest_ios_versions(rss_feed)
update_mvt(mvt_checkout_path, latest_ios_version)
if __name__ == "__main__":
main()

29
.github/workflows/update-ios-data.yml vendored Normal file
View File

@@ -0,0 +1,29 @@
name: Update iOS releases and version numbers
run-name: ${{ github.actor }} is finding the latest iOS release version and build numbers
on:
workflow_dispatch:
schedule:
# * is a special character in YAML so you have to quote this string
- cron: '0 */6 * * *'
jobs:
update-ios-version:
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v3
- uses: actions/setup-python@v4
- name: Run script to fetch latest iOS releases from Apple RSS feed.
run: python3 .github/workflows/scripts/update-ios-releases.py
- name: Create Pull Request
uses: peter-evans/create-pull-request@v5
with:
title: '[auto] Update iOS releases and versions'
commit-message: Add new iOS versions and build numbers
branch: auto/add-new-ios-releases
body: |
This is an automated pull request to update the iOS releases and version numbers.
add-paths: |
*.json
labels: |
automated pr

View File

@@ -4,6 +4,7 @@ check:
flake8
pytest -q
ruff check -q .
black --check .
clean:
rm -rf $(PWD)/build $(PWD)/dist $(PWD)/mvt.egg-info

View File

@@ -11,10 +11,24 @@
Mobile Verification Toolkit (MVT) is a collection of utilities to simplify and automate the process of gathering forensic traces helpful to identify a potential compromise of Android and iOS devices.
It has been developed and released by the [Amnesty International Security Lab](https://www.amnesty.org/en/tech/) in July 2021 in the context of the [Pegasus project](https://forbiddenstories.org/about-the-pegasus-project/) along with [a technical forensic methodology and forensic evidence](https://www.amnesty.org/en/latest/research/2021/07/forensic-methodology-report-how-to-catch-nso-groups-pegasus/).
It has been developed and released by the [Amnesty International Security Lab](https://www.amnesty.org/en/tech/) in July 2021 in the context of the [Pegasus Project](https://forbiddenstories.org/about-the-pegasus-project/) along with [a technical forensic methodology](https://www.amnesty.org/en/latest/research/2021/07/forensic-methodology-report-how-to-catch-nso-groups-pegasus/). It continues to be maintained by Amnesty International and other contributors.
*Warning*: MVT is a forensic research tool intended for technologists and investigators. Using it requires understanding the basics of forensic analysis and using command-line tools. This is not intended for end-user self-assessment. If you are concerned with the security of your device please seek expert assistance.
> **Note**
> MVT is a forensic research tool intended for technologists and investigators. It requires understanding digital forensics and using command-line tools. This is not intended for end-user self-assessment. If you are concerned with the security of your device please seek reputable expert assistance.
>
### Indicators of Compromise
MVT supports using public [indicators of compromise (IOCs)](https://github.com/mvt-project/mvt-indicators) to scan mobile devices for potential traces of targeting or infection by known spyware campaigns. This includes IOCs published by [Amnesty International](https://github.com/AmnestyTech/investigations/) and other research groups.
> **Warning**
> Public indicators of compromise are insufficient to determine that a device is "clean", and not targeted with a particular spyware tool. Reliance on public indicators alone can miss recent forensic traces and give a false sense of security.
>
> Reliable and comprehensive digital forensic support and triage requires access to non-public indicators, research and threat intelligence.
>
>Such support is available to civil society through [Amnesty International's Security Lab](https://www.amnesty.org/en/tech/) or through our forensic partnership with [Access Nows Digital Security Helpline](https://www.accessnow.org/help/).
More information about using indicators of compromise with MVT is available in the [documentation](https://docs.mvt.re/en/latest/iocs/).
## Installation

27
docs/development.md Normal file
View File

@@ -0,0 +1,27 @@
# Development
The Mobile Verification Toolkit team welcomes contributions of new forensic modules or other contributions which help improve the software.
## Testing
MVT uses `pytest` for unit and integration tests. Code style consistency is maintained with `flake8`, `ruff` and `black`. All can
be run automatically with:
```bash
make check
```
Run these tests before making new commits or opening pull requests.
## Profiling
Some MVT modules extract and process significant amounts of data during the analysis process or while checking results against known indicators. Care must be
take to avoid inefficient code paths as we add new modules.
MVT modules can be profiled with Python built-in `cProfile` by setting the `MVT_PROFILE` environment variable.
```bash
MVT_PROFILE=1 dev/mvt-ios check-backup test_backup
```
Open an issue or PR if you are encountering significant performance issues when analyzing a device with MVT.

View File

@@ -43,6 +43,6 @@ export MVT_STIX2="/home/user/IOC1.stix2:/home/user/IOC2.stix2"
- [This repository](https://github.com/Te-k/stalkerware-indicators) contains IOCs for Android stalkerware including [a STIX MVT-compatible file](https://raw.githubusercontent.com/Te-k/stalkerware-indicators/master/generated/stalkerware.stix2).
- We are also maintaining [a list of IOCs](https://github.com/mvt-project/mvt-indicators) in STIX format from public spyware campaigns.
You can automaticallly download the latest public indicator files with the command `mvt-ios download-iocs` or `mvt-android download-iocs`. These commands download the list of indicators listed [here](https://github.com/mvt-project/mvt/blob/main/public_indicators.json) and store them in the [appdir](https://pypi.org/project/appdirs/) folder. They are then loaded automatically by MVT.
You can automaticallly download the latest public indicator files with the command `mvt-ios download-iocs` or `mvt-android download-iocs`. These commands download the list of indicators from the [mvt-indicators](https://github.com/mvt-project/mvt-indicators/blob/main/indicators.yaml) repository and store them in the [appdir](https://pypi.org/project/appdirs/) folder. They are then loaded automatically by MVT.
Please [open an issue](https://github.com/mvt-project/mvt/issues/) to suggest new sources of STIX-formatted IOCs.

View File

@@ -1,7 +1,7 @@
site_name: Mobile Verification Toolkit
repo_url: https://github.com/mvt-project/mvt
edit_uri: edit/main/docs/
copyright: Copyright &copy; 2021-2022 MVT Project Developers
copyright: Copyright &copy; 2021-2023 MVT Project Developers
site_description: Mobile Verification Toolkit Documentation
markdown_extensions:
- attr_list
@@ -46,4 +46,5 @@ nav:
- Check an Android Backup (SMS messages): "android/backup.md"
- Download APKs: "android/download_apks.md"
- Indicators of Compromise: "iocs.md"
- Development: "development.md"
- License: "license.md"

View File

@@ -8,10 +8,16 @@ import logging
import click
from mvt.common.cmd_check_iocs import CmdCheckIOCS
from mvt.common.help import (HELP_MSG_FAST, HELP_MSG_HASHES, HELP_MSG_IOC,
HELP_MSG_LIST_MODULES, HELP_MSG_MODULE,
HELP_MSG_OUTPUT, HELP_MSG_SERIAL,
HELP_MSG_VERBOSE)
from mvt.common.help import (
HELP_MSG_FAST,
HELP_MSG_HASHES,
HELP_MSG_IOC,
HELP_MSG_LIST_MODULES,
HELP_MSG_MODULE,
HELP_MSG_OUTPUT,
HELP_MSG_SERIAL,
HELP_MSG_VERBOSE,
)
from mvt.common.logo import logo
from mvt.common.updates import IndicatorsUpdates
from mvt.common.utils import init_logging, set_verbose_logging
@@ -28,39 +34,54 @@ from .modules.bugreport import BUGREPORT_MODULES
init_logging()
log = logging.getLogger("mvt")
CONTEXT_SETTINGS = dict(help_option_names=['-h', '--help'])
CONTEXT_SETTINGS = dict(help_option_names=["-h", "--help"])
#==============================================================================
# ==============================================================================
# Main
#==============================================================================
# ==============================================================================
@click.group(invoke_without_command=False)
def cli():
logo()
#==============================================================================
# ==============================================================================
# Command: version
#==============================================================================
# ==============================================================================
@cli.command("version", help="Show the currently installed version of MVT")
def version():
return
#==============================================================================
# ==============================================================================
# Command: download-apks
#==============================================================================
@cli.command("download-apks", help="Download all or only non-system installed APKs",
context_settings=CONTEXT_SETTINGS)
# ==============================================================================
@cli.command(
"download-apks",
help="Download all or only non-system installed APKs",
context_settings=CONTEXT_SETTINGS,
)
@click.option("--serial", "-s", type=str, help=HELP_MSG_SERIAL)
@click.option("--all-apks", "-a", is_flag=True,
help="Extract all packages installed on the phone, including system packages")
@click.option(
"--all-apks",
"-a",
is_flag=True,
help="Extract all packages installed on the phone, including system packages",
)
@click.option("--virustotal", "-v", is_flag=True, help="Check packages on VirusTotal")
@click.option("--output", "-o", type=click.Path(exists=False),
help="Specify a path to a folder where you want to store the APKs")
@click.option("--from-file", "-f", type=click.Path(exists=True),
help="Instead of acquiring from phone, load an existing packages.json file for "
"lookups (mainly for debug purposes)")
@click.option(
"--output",
"-o",
type=click.Path(exists=False),
help="Specify a path to a folder where you want to store the APKs",
)
@click.option(
"--from-file",
"-f",
type=click.Path(exists=True),
help="Instead of acquiring from phone, load an existing packages.json file for "
"lookups (mainly for debug purposes)",
)
@click.option("--verbose", "-v", is_flag=True, help=HELP_MSG_VERBOSE)
@click.pass_context
def download_apks(ctx, all_apks, virustotal, output, from_file, serial, verbose):
@@ -99,16 +120,24 @@ def download_apks(ctx, all_apks, virustotal, output, from_file, serial, verbose)
ctx.exit(1)
#==============================================================================
# ==============================================================================
# Command: check-adb
#==============================================================================
@cli.command("check-adb", help="Check an Android device over adb",
context_settings=CONTEXT_SETTINGS)
# ==============================================================================
@cli.command(
"check-adb",
help="Check an Android device over adb",
context_settings=CONTEXT_SETTINGS,
)
@click.option("--serial", "-s", type=str, help=HELP_MSG_SERIAL)
@click.option("--iocs", "-i", type=click.Path(exists=True), multiple=True,
default=[], help=HELP_MSG_IOC)
@click.option("--output", "-o", type=click.Path(exists=False),
help=HELP_MSG_OUTPUT)
@click.option(
"--iocs",
"-i",
type=click.Path(exists=True),
multiple=True,
default=[],
help=HELP_MSG_IOC,
)
@click.option("--output", "-o", type=click.Path(exists=False), help=HELP_MSG_OUTPUT)
@click.option("--fast", "-f", is_flag=True, help=HELP_MSG_FAST)
@click.option("--list-modules", "-l", is_flag=True, help=HELP_MSG_LIST_MODULES)
@click.option("--module", "-m", help=HELP_MSG_MODULE)
@@ -116,8 +145,13 @@ def download_apks(ctx, all_apks, virustotal, output, from_file, serial, verbose)
@click.pass_context
def check_adb(ctx, serial, iocs, output, fast, list_modules, module, verbose):
set_verbose_logging(verbose)
cmd = CmdAndroidCheckADB(results_path=output, ioc_files=iocs,
module_name=module, serial=serial, fast_mode=fast)
cmd = CmdAndroidCheckADB(
results_path=output,
ioc_files=iocs,
module_name=module,
serial=serial,
fast_mode=fast,
)
if list_modules:
cmd.list_modules()
@@ -128,19 +162,29 @@ def check_adb(ctx, serial, iocs, output, fast, list_modules, module, verbose):
cmd.run()
if cmd.detected_count > 0:
log.warning("The analysis of the Android device produced %d detections!",
cmd.detected_count)
log.warning(
"The analysis of the Android device produced %d detections!",
cmd.detected_count,
)
#==============================================================================
# ==============================================================================
# Command: check-bugreport
#==============================================================================
@cli.command("check-bugreport", help="Check an Android Bug Report",
context_settings=CONTEXT_SETTINGS)
@click.option("--iocs", "-i", type=click.Path(exists=True), multiple=True,
default=[], help=HELP_MSG_IOC)
@click.option("--output", "-o", type=click.Path(exists=False),
help=HELP_MSG_OUTPUT)
# ==============================================================================
@cli.command(
"check-bugreport",
help="Check an Android Bug Report",
context_settings=CONTEXT_SETTINGS,
)
@click.option(
"--iocs",
"-i",
type=click.Path(exists=True),
multiple=True,
default=[],
help=HELP_MSG_IOC,
)
@click.option("--output", "-o", type=click.Path(exists=False), help=HELP_MSG_OUTPUT)
@click.option("--list-modules", "-l", is_flag=True, help=HELP_MSG_LIST_MODULES)
@click.option("--module", "-m", help=HELP_MSG_MODULE)
@click.option("--verbose", "-v", is_flag=True, help=HELP_MSG_VERBOSE)
@@ -148,10 +192,14 @@ def check_adb(ctx, serial, iocs, output, fast, list_modules, module, verbose):
@click.pass_context
def check_bugreport(ctx, iocs, output, list_modules, module, verbose, bugreport_path):
set_verbose_logging(verbose)
# Always generate hashes as bug reports are small.
cmd = CmdAndroidCheckBugreport(target_path=bugreport_path,
results_path=output, ioc_files=iocs,
module_name=module, hashes=True)
# Always generate hashes as bug reports are small.
cmd = CmdAndroidCheckBugreport(
target_path=bugreport_path,
results_path=output,
ioc_files=iocs,
module_name=module,
hashes=True,
)
if list_modules:
cmd.list_modules()
@@ -162,19 +210,27 @@ def check_bugreport(ctx, iocs, output, list_modules, module, verbose, bugreport_
cmd.run()
if cmd.detected_count > 0:
log.warning("The analysis of the Android bug report produced %d detections!",
cmd.detected_count)
log.warning(
"The analysis of the Android bug report produced %d detections!",
cmd.detected_count,
)
#==============================================================================
# ==============================================================================
# Command: check-backup
#==============================================================================
@cli.command("check-backup", help="Check an Android Backup",
context_settings=CONTEXT_SETTINGS)
@click.option("--iocs", "-i", type=click.Path(exists=True), multiple=True,
default=[], help=HELP_MSG_IOC)
@click.option("--output", "-o", type=click.Path(exists=False),
help=HELP_MSG_OUTPUT)
# ==============================================================================
@cli.command(
"check-backup", help="Check an Android Backup", context_settings=CONTEXT_SETTINGS
)
@click.option(
"--iocs",
"-i",
type=click.Path(exists=True),
multiple=True,
default=[],
help=HELP_MSG_IOC,
)
@click.option("--output", "-o", type=click.Path(exists=False), help=HELP_MSG_OUTPUT)
@click.option("--list-modules", "-l", is_flag=True, help=HELP_MSG_LIST_MODULES)
@click.option("--verbose", "-v", is_flag=True, help=HELP_MSG_VERBOSE)
@click.argument("BACKUP_PATH", type=click.Path(exists=True))
@@ -182,8 +238,9 @@ def check_bugreport(ctx, iocs, output, list_modules, module, verbose, bugreport_
def check_backup(ctx, iocs, output, list_modules, verbose, backup_path):
set_verbose_logging(verbose)
# Always generate hashes as backups are generally small.
cmd = CmdAndroidCheckBackup(target_path=backup_path, results_path=output,
ioc_files=iocs, hashes=True)
cmd = CmdAndroidCheckBackup(
target_path=backup_path, results_path=output, ioc_files=iocs, hashes=True
)
if list_modules:
cmd.list_modules()
@@ -194,30 +251,46 @@ def check_backup(ctx, iocs, output, list_modules, verbose, backup_path):
cmd.run()
if cmd.detected_count > 0:
log.warning("The analysis of the Android backup produced %d detections!",
cmd.detected_count)
log.warning(
"The analysis of the Android backup produced %d detections!",
cmd.detected_count,
)
#==============================================================================
# ==============================================================================
# Command: check-androidqf
#==============================================================================
@cli.command("check-androidqf", help="Check data collected with AndroidQF",
context_settings=CONTEXT_SETTINGS)
@click.option("--iocs", "-i", type=click.Path(exists=True), multiple=True,
default=[], help=HELP_MSG_IOC)
@click.option("--output", "-o", type=click.Path(exists=False),
help=HELP_MSG_OUTPUT)
# ==============================================================================
@cli.command(
"check-androidqf",
help="Check data collected with AndroidQF",
context_settings=CONTEXT_SETTINGS,
)
@click.option(
"--iocs",
"-i",
type=click.Path(exists=True),
multiple=True,
default=[],
help=HELP_MSG_IOC,
)
@click.option("--output", "-o", type=click.Path(exists=False), help=HELP_MSG_OUTPUT)
@click.option("--list-modules", "-l", is_flag=True, help=HELP_MSG_LIST_MODULES)
@click.option("--module", "-m", help=HELP_MSG_MODULE)
@click.option("--hashes", "-H", is_flag=True, help=HELP_MSG_HASHES)
@click.option("--verbose", "-v", is_flag=True, help=HELP_MSG_VERBOSE)
@click.argument("ANDROIDQF_PATH", type=click.Path(exists=True))
@click.pass_context
def check_androidqf(ctx, iocs, output, list_modules, module, hashes, verbose, androidqf_path):
def check_androidqf(
ctx, iocs, output, list_modules, module, hashes, verbose, androidqf_path
):
set_verbose_logging(verbose)
cmd = CmdAndroidCheckAndroidQF(target_path=androidqf_path,
results_path=output, ioc_files=iocs,
module_name=module, hashes=hashes)
cmd = CmdAndroidCheckAndroidQF(
target_path=androidqf_path,
results_path=output,
ioc_files=iocs,
module_name=module,
hashes=hashes,
)
if list_modules:
cmd.list_modules()
@@ -228,17 +301,28 @@ def check_androidqf(ctx, iocs, output, list_modules, module, hashes, verbose, an
cmd.run()
if cmd.detected_count > 0:
log.warning("The analysis of the AndroidQF acquisition produced %d detections!",
cmd.detected_count)
log.warning(
"The analysis of the AndroidQF acquisition produced %d detections!",
cmd.detected_count,
)
#==============================================================================
# ==============================================================================
# Command: check-iocs
#==============================================================================
@cli.command("check-iocs", help="Compare stored JSON results to provided indicators",
context_settings=CONTEXT_SETTINGS)
@click.option("--iocs", "-i", type=click.Path(exists=True), multiple=True,
default=[], help=HELP_MSG_IOC)
# ==============================================================================
@cli.command(
"check-iocs",
help="Compare stored JSON results to provided indicators",
context_settings=CONTEXT_SETTINGS,
)
@click.option(
"--iocs",
"-i",
type=click.Path(exists=True),
multiple=True,
default=[],
help=HELP_MSG_IOC,
)
@click.option("--list-modules", "-l", is_flag=True, help=HELP_MSG_LIST_MODULES)
@click.option("--module", "-m", help=HELP_MSG_MODULE)
@click.argument("FOLDER", type=click.Path(exists=True))
@@ -254,11 +338,14 @@ def check_iocs(ctx, iocs, list_modules, module, folder):
cmd.run()
#==============================================================================
# ==============================================================================
# Command: download-iocs
#==============================================================================
@cli.command("download-iocs", help="Download public STIX2 indicators",
context_settings=CONTEXT_SETTINGS)
# ==============================================================================
@cli.command(
"download-iocs",
help="Download public STIX2 indicators",
context_settings=CONTEXT_SETTINGS,
)
def download_indicators():
ioc_updates = IndicatorsUpdates()
ioc_updates.update()

View File

@@ -14,7 +14,6 @@ log = logging.getLogger(__name__)
class CmdAndroidCheckADB(Command):
def __init__(
self,
target_path: Optional[str] = None,
@@ -22,11 +21,17 @@ class CmdAndroidCheckADB(Command):
ioc_files: Optional[list] = None,
module_name: Optional[str] = None,
serial: Optional[str] = None,
fast_mode: Optional[bool] = False,
fast_mode: bool = False,
) -> None:
super().__init__(target_path=target_path, results_path=results_path,
ioc_files=ioc_files, module_name=module_name,
serial=serial, fast_mode=fast_mode, log=log)
super().__init__(
target_path=target_path,
results_path=results_path,
ioc_files=ioc_files,
module_name=module_name,
serial=serial,
fast_mode=fast_mode,
log=log,
)
self.name = "check-adb"
self.modules = ADB_MODULES

View File

@@ -14,7 +14,6 @@ log = logging.getLogger(__name__)
class CmdAndroidCheckAndroidQF(Command):
def __init__(
self,
target_path: Optional[str] = None,
@@ -22,13 +21,19 @@ class CmdAndroidCheckAndroidQF(Command):
ioc_files: Optional[list] = None,
module_name: Optional[str] = None,
serial: Optional[str] = None,
fast_mode: Optional[bool] = False,
hashes: Optional[bool] = False,
fast_mode: bool = False,
hashes: bool = False,
) -> None:
super().__init__(target_path=target_path, results_path=results_path,
ioc_files=ioc_files, module_name=module_name,
serial=serial, fast_mode=fast_mode, hashes=hashes,
log=log)
super().__init__(
target_path=target_path,
results_path=results_path,
ioc_files=ioc_files,
module_name=module_name,
serial=serial,
fast_mode=fast_mode,
hashes=hashes,
log=log,
)
self.name = "check-androidqf"
self.modules = ANDROIDQF_MODULES

View File

@@ -14,9 +14,12 @@ from typing import List, Optional
from rich.prompt import Prompt
from mvt.android.modules.backup.base import BackupExtraction
from mvt.android.parsers.backup import (AndroidBackupParsingError,
InvalidBackupPassword, parse_ab_header,
parse_backup_file)
from mvt.android.parsers.backup import (
AndroidBackupParsingError,
InvalidBackupPassword,
parse_ab_header,
parse_backup_file,
)
from mvt.common.command import Command
from .modules.backup import BACKUP_MODULES
@@ -25,7 +28,6 @@ log = logging.getLogger(__name__)
class CmdAndroidCheckBackup(Command):
def __init__(
self,
target_path: Optional[str] = None,
@@ -33,13 +35,19 @@ class CmdAndroidCheckBackup(Command):
ioc_files: Optional[list] = None,
module_name: Optional[str] = None,
serial: Optional[str] = None,
fast_mode: Optional[bool] = False,
hashes: Optional[bool] = False,
fast_mode: bool = False,
hashes: bool = False,
) -> None:
super().__init__(target_path=target_path, results_path=results_path,
ioc_files=ioc_files, module_name=module_name,
serial=serial, fast_mode=fast_mode, hashes=hashes,
log=log)
super().__init__(
target_path=target_path,
results_path=results_path,
ioc_files=ioc_files,
module_name=module_name,
serial=serial,
fast_mode=fast_mode,
hashes=hashes,
log=log,
)
self.name = "check-backup"
self.modules = BACKUP_MODULES
@@ -85,16 +93,18 @@ class CmdAndroidCheckBackup(Command):
self.target_path = Path(self.target_path).absolute().as_posix()
for root, subdirs, subfiles in os.walk(os.path.abspath(self.target_path)):
for fname in subfiles:
self.backup_files.append(os.path.relpath(os.path.join(root, fname),
self.target_path))
self.backup_files.append(
os.path.relpath(os.path.join(root, fname), self.target_path)
)
else:
log.critical("Invalid backup path, path should be a folder or an "
"Android Backup (.ab) file")
log.critical(
"Invalid backup path, path should be a folder or an "
"Android Backup (.ab) file"
)
sys.exit(1)
def module_init(self, module: BackupExtraction) -> None: # type: ignore[override]
if self.backup_type == "folder":
module.from_folder(self.target_path, self.backup_files)
else:
module.from_ab(self.target_path, self.backup_archive,
self.backup_files)
module.from_ab(self.target_path, self.backup_archive, self.backup_files)

View File

@@ -18,7 +18,6 @@ log = logging.getLogger(__name__)
class CmdAndroidCheckBugreport(Command):
def __init__(
self,
target_path: Optional[str] = None,
@@ -26,13 +25,19 @@ class CmdAndroidCheckBugreport(Command):
ioc_files: Optional[list] = None,
module_name: Optional[str] = None,
serial: Optional[str] = None,
fast_mode: Optional[bool] = False,
hashes: Optional[bool] = False,
fast_mode: bool = False,
hashes: bool = False,
) -> None:
super().__init__(target_path=target_path, results_path=results_path,
ioc_files=ioc_files, module_name=module_name,
serial=serial, fast_mode=fast_mode, hashes=hashes,
log=log)
super().__init__(
target_path=target_path,
results_path=results_path,
ioc_files=ioc_files,
module_name=module_name,
serial=serial,
fast_mode=fast_mode,
hashes=hashes,
log=log,
)
self.name = "check-bugreport"
self.modules = BUGREPORT_MODULES
@@ -55,8 +60,9 @@ class CmdAndroidCheckBugreport(Command):
parent_path = Path(self.target_path).absolute().as_posix()
for root, _, subfiles in os.walk(os.path.abspath(self.target_path)):
for file_name in subfiles:
file_path = os.path.relpath(os.path.join(root, file_name),
parent_path)
file_path = os.path.relpath(
os.path.join(root, file_name), parent_path
)
self.bugreport_files.append(file_path)
def module_init(self, module: BugReportModule) -> None: # type: ignore[override]

View File

@@ -26,7 +26,7 @@ class DownloadAPKs(AndroidExtraction):
def __init__(
self,
results_path: Optional[str] = None,
all_apks: Optional[bool] = False,
all_apks: bool = False,
packages: Optional[list] = None,
) -> None:
"""Initialize module.
@@ -66,27 +66,31 @@ class DownloadAPKs(AndroidExtraction):
if "==/" in remote_path:
file_name = "_" + remote_path.split("==/")[1].replace(".apk", "")
local_path = os.path.join(self.results_path_apks,
f"{package_name}{file_name}.apk")
local_path = os.path.join(
self.results_path_apks, f"{package_name}{file_name}.apk"
)
name_counter = 0
while True:
if not os.path.exists(local_path):
break
name_counter += 1
local_path = os.path.join(self.results_path_apks,
f"{package_name}{file_name}_{name_counter}.apk")
local_path = os.path.join(
self.results_path_apks, f"{package_name}{file_name}_{name_counter}.apk"
)
try:
self._adb_download(remote_path, local_path)
except InsufficientPrivileges:
log.error("Unable to pull package file from %s: insufficient privileges, "
"it might be a system app", remote_path)
log.error(
"Unable to pull package file from %s: insufficient privileges, "
"it might be a system app",
remote_path,
)
self._adb_reconnect()
return None
except Exception as exc:
log.exception("Failed to pull package file from %s: %s",
remote_path, exc)
log.exception("Failed to pull package file from %s: %s", remote_path, exc)
self._adb_reconnect()
return None
@@ -106,10 +110,10 @@ class DownloadAPKs(AndroidExtraction):
self.packages = m.results
def pull_packages(self) -> None:
"""Download all files of all selected packages from the device.
"""
log.info("Starting extraction of installed APKs at folder %s",
self.results_path)
"""Download all files of all selected packages from the device."""
log.info(
"Starting extraction of installed APKs at folder %s", self.results_path
)
# If the user provided the flag --all-apks we select all packages.
packages_selection = []
@@ -123,8 +127,10 @@ class DownloadAPKs(AndroidExtraction):
if not package.get("system", False):
packages_selection.append(package)
log.info("Selected only %d packages which are not marked as \"system\"",
len(packages_selection))
log.info(
'Selected only %d packages which are not marked as "system"',
len(packages_selection),
)
if len(packages_selection) == 0:
log.info("No packages were selected for download")
@@ -136,19 +142,26 @@ class DownloadAPKs(AndroidExtraction):
if not os.path.exists(self.results_path_apks):
os.makedirs(self.results_path_apks, exist_ok=True)
for i in track(range(len(packages_selection)),
description=f"Downloading {len(packages_selection)} packages..."):
for i in track(
range(len(packages_selection)),
description=f"Downloading {len(packages_selection)} packages...",
):
package = packages_selection[i]
log.info("[%d/%d] Package: %s", i, len(packages_selection),
package["package_name"])
log.info(
"[%d/%d] Package: %s",
i,
len(packages_selection),
package["package_name"],
)
# Sometimes the package path contains multiple lines for multiple
# apks. We loop through each line and download each file.
for package_file in package["files"]:
device_path = package_file["path"]
local_path = self.pull_package_file(package["package_name"],
device_path)
local_path = self.pull_package_file(
package["package_name"], device_path
)
if not local_path:
continue

View File

@@ -23,8 +23,24 @@ from .settings import Settings
from .sms import SMS
from .whatsapp import Whatsapp
ADB_MODULES = [ChromeHistory, SMS, Whatsapp, Processes, Getprop, Settings,
SELinuxStatus, DumpsysBatteryHistory, DumpsysBatteryDaily,
DumpsysReceivers, DumpsysActivities, DumpsysAccessibility,
DumpsysDBInfo, DumpsysFull, DumpsysAppOps, Packages, Logcat,
RootBinaries, Files]
ADB_MODULES = [
ChromeHistory,
SMS,
Whatsapp,
Processes,
Getprop,
Settings,
SELinuxStatus,
DumpsysBatteryHistory,
DumpsysBatteryDaily,
DumpsysReceivers,
DumpsysActivities,
DumpsysAccessibility,
DumpsysDBInfo,
DumpsysFull,
DumpsysAppOps,
Packages,
Logcat,
RootBinaries,
Files,
]

View File

@@ -16,13 +16,20 @@ from typing import Callable, Optional
from adb_shell.adb_device import AdbDeviceTcp, AdbDeviceUsb
from adb_shell.auth.keygen import keygen, write_public_keyfile
from adb_shell.auth.sign_pythonrsa import PythonRSASigner
from adb_shell.exceptions import (AdbCommandFailureException, DeviceAuthError,
UsbDeviceNotFoundError, UsbReadFailedError)
from adb_shell.exceptions import (
AdbCommandFailureException,
DeviceAuthError,
UsbDeviceNotFoundError,
UsbReadFailedError,
)
from rich.prompt import Prompt
from usb1 import USBErrorAccess, USBErrorBusy
from mvt.android.parsers.backup import (InvalidBackupPassword, parse_ab_header,
parse_backup_file)
from mvt.android.parsers.backup import (
InvalidBackupPassword,
parse_ab_header,
parse_backup_file,
)
from mvt.common.module import InsufficientPrivileges, MVTModule
ADB_KEY_PATH = os.path.expanduser("~/.android/adbkey")
@@ -37,13 +44,18 @@ class AndroidExtraction(MVTModule):
file_path: Optional[str] = None,
target_path: Optional[str] = None,
results_path: Optional[str] = None,
fast_mode: Optional[bool] = False,
fast_mode: bool = False,
log: logging.Logger = logging.getLogger(__name__),
results: Optional[list] = None
results: Optional[list] = None,
) -> None:
super().__init__(file_path=file_path, target_path=target_path,
results_path=results_path, fast_mode=fast_mode,
log=log, results=results)
super().__init__(
file_path=file_path,
target_path=target_path,
results_path=results_path,
fast_mode=fast_mode,
log=log,
results=results,
)
self.device = None
self.serial = None
@@ -78,36 +90,49 @@ class AndroidExtraction(MVTModule):
try:
self.device = AdbDeviceUsb(serial=self.serial)
except UsbDeviceNotFoundError:
self.log.critical("No device found. Make sure it is connected and unlocked.")
self.log.critical(
"No device found. Make sure it is connected and unlocked."
)
sys.exit(-1)
# Otherwise we try to use the TCP transport.
else:
addr = self.serial.split(":")
if len(addr) < 2:
raise ValueError("TCP serial number must follow the format: `address:port`")
raise ValueError(
"TCP serial number must follow the format: `address:port`"
)
self.device = AdbDeviceTcp(addr[0], int(addr[1]),
default_transport_timeout_s=30.)
self.device = AdbDeviceTcp(
addr[0], int(addr[1]), default_transport_timeout_s=30.0
)
while True:
try:
self.device.connect(rsa_keys=[signer], auth_timeout_s=5)
except (USBErrorBusy, USBErrorAccess):
self.log.critical("Device is busy, maybe run `adb kill-server` and try again.")
self.log.critical(
"Device is busy, maybe run `adb kill-server` and try again."
)
sys.exit(-1)
except DeviceAuthError:
self.log.error("You need to authorize this computer on the Android device. "
"Retrying in 5 seconds...")
self.log.error(
"You need to authorize this computer on the Android device. "
"Retrying in 5 seconds..."
)
time.sleep(5)
except UsbReadFailedError:
self.log.error("Unable to connect to the device over USB. "
"Try to unplug, plug the device and start again.")
self.log.error(
"Unable to connect to the device over USB. "
"Try to unplug, plug the device and start again."
)
sys.exit(-1)
except OSError as exc:
if exc.errno == 113 and self.serial:
self.log.critical("Unable to connect to the device %s: "
"did you specify the correct IP address?",
self.serial)
self.log.critical(
"Unable to connect to the device %s: "
"did you specify the correct IP address?",
self.serial,
)
sys.exit(-1)
else:
break
@@ -144,9 +169,11 @@ class AndroidExtraction(MVTModule):
def _adb_root_or_die(self) -> None:
"""Check if we have a `su` binary, otherwise raise an Exception."""
if not self._adb_check_if_root():
raise InsufficientPrivileges("This module is optionally available "
"in case the device is already rooted."
" Do NOT root your own device!")
raise InsufficientPrivileges(
"This module is optionally available "
"in case the device is already rooted."
" Do NOT root your own device!"
)
def _adb_command_as_root(self, command):
"""Execute an adb shell command.
@@ -177,7 +204,7 @@ class AndroidExtraction(MVTModule):
remote_path: str,
local_path: str,
progress_callback: Optional[Callable] = None,
retry_root: Optional[bool] = True
retry_root: Optional[bool] = True,
) -> None:
"""Download a file form the device.
@@ -192,41 +219,48 @@ class AndroidExtraction(MVTModule):
self.device.pull(remote_path, local_path, progress_callback)
except AdbCommandFailureException as exc:
if retry_root:
self._adb_download_root(remote_path, local_path,
progress_callback)
self._adb_download_root(remote_path, local_path, progress_callback)
else:
raise Exception(f"Unable to download file {remote_path}: {exc}") from exc
raise Exception(
f"Unable to download file {remote_path}: {exc}"
) from exc
def _adb_download_root(
self,
remote_path: str,
local_path: str,
progress_callback: Optional[Callable] = None
progress_callback: Optional[Callable] = None,
) -> None:
try:
# Check if we have root, if not raise an Exception.
self._adb_root_or_die()
# We generate a random temporary filename.
allowed_chars = (string.ascii_uppercase
+ string.ascii_lowercase
+ string.digits)
tmp_filename = "tmp_" + ''.join(random.choices(allowed_chars, k=10))
allowed_chars = (
string.ascii_uppercase + string.ascii_lowercase + string.digits
)
tmp_filename = "tmp_" + "".join(random.choices(allowed_chars, k=10))
# We create a temporary local file.
new_remote_path = f"/sdcard/{tmp_filename}"
# We copy the file from the data folder to /sdcard/.
cp_output = self._adb_command_as_root(f"cp {remote_path} {new_remote_path}")
if cp_output.startswith("cp: ") and "No such file or directory" in cp_output:
if (
cp_output.startswith("cp: ")
and "No such file or directory" in cp_output
):
raise Exception(f"Unable to process file {remote_path}: File not found")
if cp_output.startswith("cp: ") and "Permission denied" in cp_output:
raise Exception(f"Unable to process file {remote_path}: Permission denied")
raise Exception(
f"Unable to process file {remote_path}: Permission denied"
)
# We download from /sdcard/ to the local temporary file.
# If it doesn't work now, don't try again (retry_root=False)
self._adb_download(new_remote_path, local_path, progress_callback,
retry_root=False)
self._adb_download(
new_remote_path, local_path, progress_callback, retry_root=False
)
# Delete the copy on /sdcard/.
self._adb_command(f"rm -rf {new_remote_path}")
@@ -234,8 +268,7 @@ class AndroidExtraction(MVTModule):
except AdbCommandFailureException as exc:
raise Exception(f"Unable to download file {remote_path}: {exc}") from exc
def _adb_process_file(self, remote_path: str,
process_routine: Callable) -> None:
def _adb_process_file(self, remote_path: str, process_routine: Callable) -> None:
"""Download a local copy of a file which is only accessible as root.
This is a wrapper around process_routine.
@@ -273,8 +306,10 @@ class AndroidExtraction(MVTModule):
self._adb_command(f"rm -f {new_remote_path}")
def _generate_backup(self, package_name: str) -> bytes:
self.log.info("Please check phone and accept Android backup prompt. "
"You may need to set a backup password. \a")
self.log.info(
"Please check phone and accept Android backup prompt. "
"You may need to set a backup password. \a"
)
# TODO: Base64 encoding as temporary fix to avoid byte-mangling over
# the shell transport...
@@ -284,19 +319,19 @@ class AndroidExtraction(MVTModule):
header = parse_ab_header(backup_output)
if not header["backup"]:
self.log.error("Extracting SMS via Android backup failed. "
"No valid backup data found.")
self.log.error(
"Extracting SMS via Android backup failed. "
"No valid backup data found."
)
return None
if header["encryption"] == "none":
return parse_backup_file(backup_output, password=None)
for _ in range(0, 3):
backup_password = Prompt.ask("Enter backup password",
password=True)
backup_password = Prompt.ask("Enter backup password", password=True)
try:
decrypted_backup_tar = parse_backup_file(backup_output,
backup_password)
decrypted_backup_tar = parse_backup_file(backup_output, backup_password)
return decrypted_backup_tar
except InvalidBackupPassword:
self.log.error("You provided the wrong password! Please try again...")

View File

@@ -8,8 +8,7 @@ import os
import sqlite3
from typing import Optional, Union
from mvt.common.utils import (convert_chrometime_to_datetime,
convert_datetime_to_iso)
from mvt.common.utils import convert_chrometime_to_datetime, convert_datetime_to_iso
from .base import AndroidExtraction
@@ -24,13 +23,18 @@ class ChromeHistory(AndroidExtraction):
file_path: Optional[str] = None,
target_path: Optional[str] = None,
results_path: Optional[str] = None,
fast_mode: Optional[bool] = False,
fast_mode: bool = False,
log: logging.Logger = logging.getLogger(__name__),
results: Optional[list] = None
results: Optional[list] = None,
) -> None:
super().__init__(file_path=file_path, target_path=target_path,
results_path=results_path, fast_mode=fast_mode,
log=log, results=results)
super().__init__(
file_path=file_path,
target_path=target_path,
results_path=results_path,
fast_mode=fast_mode,
log=log,
results=results,
)
self.results = []
def serialize(self, record: dict) -> Union[dict, list]:
@@ -39,7 +43,7 @@ class ChromeHistory(AndroidExtraction):
"module": self.__class__.__name__,
"event": "visit",
"data": f"{record['id']} - {record['url']} (visit ID: {record['visit_id']}, "
f"redirect source: {record['redirect_source']})"
f"redirect source: {record['redirect_source']})",
}
def check_indicators(self) -> None:
@@ -59,7 +63,8 @@ class ChromeHistory(AndroidExtraction):
assert isinstance(self.results, list) # assert results type for mypy
conn = sqlite3.connect(db_path)
cur = conn.cursor()
cur.execute("""
cur.execute(
"""
SELECT
urls.id,
urls.url,
@@ -69,31 +74,35 @@ class ChromeHistory(AndroidExtraction):
FROM urls
JOIN visits ON visits.url = urls.id
ORDER BY visits.visit_time;
""")
"""
)
for item in cur:
self.results.append({
"id": item[0],
"url": item[1],
"visit_id": item[2],
"timestamp": item[3],
"isodate": convert_datetime_to_iso(
convert_chrometime_to_datetime(item[3])),
"redirect_source": item[4],
})
self.results.append(
{
"id": item[0],
"url": item[1],
"visit_id": item[2],
"timestamp": item[3],
"isodate": convert_datetime_to_iso(
convert_chrometime_to_datetime(item[3])
),
"redirect_source": item[4],
}
)
cur.close()
conn.close()
self.log.info("Extracted a total of %d history items",
len(self.results))
self.log.info("Extracted a total of %d history items", len(self.results))
def run(self) -> None:
self._adb_connect()
try:
self._adb_process_file(os.path.join("/", CHROME_HISTORY_PATH),
self._parse_db)
self._adb_process_file(
os.path.join("/", CHROME_HISTORY_PATH), self._parse_db
)
except Exception as exc:
self.log.error(exc)

View File

@@ -19,13 +19,18 @@ class DumpsysAccessibility(AndroidExtraction):
file_path: Optional[str] = None,
target_path: Optional[str] = None,
results_path: Optional[str] = None,
fast_mode: Optional[bool] = False,
fast_mode: bool = False,
log: logging.Logger = logging.getLogger(__name__),
results: Optional[list] = None
results: Optional[list] = None,
) -> None:
super().__init__(file_path=file_path, target_path=target_path,
results_path=results_path, fast_mode=fast_mode,
log=log, results=results)
super().__init__(
file_path=file_path,
target_path=target_path,
results_path=results_path,
fast_mode=fast_mode,
log=log,
results=results,
)
def check_indicators(self) -> None:
if not self.indicators:
@@ -46,8 +51,10 @@ class DumpsysAccessibility(AndroidExtraction):
self.results = parse_dumpsys_accessibility(output)
for result in self.results:
self.log.info("Found installed accessibility service \"%s\"",
result.get("service"))
self.log.info(
'Found installed accessibility service "%s"', result.get("service")
)
self.log.info("Identified a total of %d accessibility services",
len(self.results))
self.log.info(
"Identified a total of %d accessibility services", len(self.results)
)

View File

@@ -19,13 +19,18 @@ class DumpsysActivities(AndroidExtraction):
file_path: Optional[str] = None,
target_path: Optional[str] = None,
results_path: Optional[str] = None,
fast_mode: Optional[bool] = False,
fast_mode: bool = False,
log: logging.Logger = logging.getLogger(__name__),
results: Optional[list] = None
results: Optional[list] = None,
) -> None:
super().__init__(file_path=file_path, target_path=target_path,
results_path=results_path, fast_mode=fast_mode,
log=log, results=results)
super().__init__(
file_path=file_path,
target_path=target_path,
results_path=results_path,
fast_mode=fast_mode,
log=log,
results=results,
)
self.results = results if results else {}

View File

@@ -21,13 +21,18 @@ class DumpsysAppOps(AndroidExtraction):
file_path: Optional[str] = None,
target_path: Optional[str] = None,
results_path: Optional[str] = None,
fast_mode: Optional[bool] = False,
fast_mode: bool = False,
log: logging.Logger = logging.getLogger(__name__),
results: Optional[list] = None
results: Optional[list] = None,
) -> None:
super().__init__(file_path=file_path, target_path=target_path,
results_path=results_path, fast_mode=fast_mode,
log=log, results=results)
super().__init__(
file_path=file_path,
target_path=target_path,
results_path=results_path,
fast_mode=fast_mode,
log=log,
results=results,
)
def serialize(self, record: dict) -> Union[dict, list]:
records = []
@@ -37,13 +42,15 @@ class DumpsysAppOps(AndroidExtraction):
for entry in perm["entries"]:
if "timestamp" in entry:
records.append({
"timestamp": entry["timestamp"],
"module": self.__class__.__name__,
"event": entry["access"],
"data": f"{record['package_name']} access to "
f"{perm['name']}: {entry['access']}",
})
records.append(
{
"timestamp": entry["timestamp"],
"module": self.__class__.__name__,
"event": entry["access"],
"data": f"{record['package_name']} access to "
f"{perm['name']}: {entry['access']}",
}
)
return records
@@ -57,10 +64,14 @@ class DumpsysAppOps(AndroidExtraction):
continue
for perm in result["permissions"]:
if (perm["name"] == "REQUEST_INSTALL_PACKAGES"
and perm["access"] == "allow"):
self.log.info("Package %s with REQUEST_INSTALL_PACKAGES "
"permission", result["package_name"])
if (
perm["name"] == "REQUEST_INSTALL_PACKAGES"
and perm["access"] == "allow"
):
self.log.info(
"Package %s with REQUEST_INSTALL_PACKAGES " "permission",
result["package_name"],
)
def run(self) -> None:
self._adb_connect()
@@ -69,5 +80,6 @@ class DumpsysAppOps(AndroidExtraction):
self.results = parse_dumpsys_appops(output)
self.log.info("Extracted a total of %d records from app-ops manager",
len(self.results))
self.log.info(
"Extracted a total of %d records from app-ops manager", len(self.results)
)

View File

@@ -19,13 +19,18 @@ class DumpsysBatteryDaily(AndroidExtraction):
file_path: Optional[str] = None,
target_path: Optional[str] = None,
results_path: Optional[str] = None,
fast_mode: Optional[bool] = False,
fast_mode: bool = False,
log: logging.Logger = logging.getLogger(__name__),
results: Optional[list] = None
results: Optional[list] = None,
) -> None:
super().__init__(file_path=file_path, target_path=target_path,
results_path=results_path, fast_mode=fast_mode,
log=log, results=results)
super().__init__(
file_path=file_path,
target_path=target_path,
results_path=results_path,
fast_mode=fast_mode,
log=log,
results=results,
)
def serialize(self, record: dict) -> Union[dict, list]:
return {
@@ -33,7 +38,7 @@ class DumpsysBatteryDaily(AndroidExtraction):
"module": self.__class__.__name__,
"event": "battery_daily",
"data": f"Recorded update of package {record['package_name']} "
f"with vers {record['vers']}"
f"with vers {record['vers']}",
}
def check_indicators(self) -> None:
@@ -54,5 +59,6 @@ class DumpsysBatteryDaily(AndroidExtraction):
self.results = parse_dumpsys_battery_daily(output)
self.log.info("Extracted %d records from battery daily stats",
len(self.results))
self.log.info(
"Extracted %d records from battery daily stats", len(self.results)
)

View File

@@ -19,13 +19,18 @@ class DumpsysBatteryHistory(AndroidExtraction):
file_path: Optional[str] = None,
target_path: Optional[str] = None,
results_path: Optional[str] = None,
fast_mode: Optional[bool] = False,
fast_mode: bool = False,
log: logging.Logger = logging.getLogger(__name__),
results: Optional[list] = None
results: Optional[list] = None,
) -> None:
super().__init__(file_path=file_path, target_path=target_path,
results_path=results_path, fast_mode=fast_mode,
log=log, results=results)
super().__init__(
file_path=file_path,
target_path=target_path,
results_path=results_path,
fast_mode=fast_mode,
log=log,
results=results,
)
def check_indicators(self) -> None:
if not self.indicators:
@@ -45,5 +50,4 @@ class DumpsysBatteryHistory(AndroidExtraction):
self.results = parse_dumpsys_battery_history(output)
self.log.info("Extracted %d records from battery history",
len(self.results))
self.log.info("Extracted %d records from battery history", len(self.results))

View File

@@ -21,13 +21,18 @@ class DumpsysDBInfo(AndroidExtraction):
file_path: Optional[str] = None,
target_path: Optional[str] = None,
results_path: Optional[str] = None,
fast_mode: Optional[bool] = False,
fast_mode: bool = False,
log: logging.Logger = logging.getLogger(__name__),
results: Optional[list] = None
results: Optional[list] = None,
) -> None:
super().__init__(file_path=file_path, target_path=target_path,
results_path=results_path, fast_mode=fast_mode,
log=log, results=results)
super().__init__(
file_path=file_path,
target_path=target_path,
results_path=results_path,
fast_mode=fast_mode,
log=log,
results=results,
)
def check_indicators(self) -> None:
if not self.indicators:
@@ -49,5 +54,7 @@ class DumpsysDBInfo(AndroidExtraction):
self.results = parse_dumpsys_dbinfo(output)
self.log.info("Extracted a total of %d records from database information",
len(self.results))
self.log.info(
"Extracted a total of %d records from database information",
len(self.results),
)

View File

@@ -18,13 +18,18 @@ class DumpsysFull(AndroidExtraction):
file_path: Optional[str] = None,
target_path: Optional[str] = None,
results_path: Optional[str] = None,
fast_mode: Optional[bool] = False,
fast_mode: bool = False,
log: logging.Logger = logging.getLogger(__name__),
results: Optional[list] = None
results: Optional[list] = None,
) -> None:
super().__init__(file_path=file_path, target_path=target_path,
results_path=results_path, fast_mode=fast_mode,
log=log, results=results)
super().__init__(
file_path=file_path,
target_path=target_path,
results_path=results_path,
fast_mode=fast_mode,
log=log,
results=results,
)
def run(self) -> None:
self._adb_connect()

View File

@@ -25,13 +25,18 @@ class DumpsysReceivers(AndroidExtraction):
file_path: Optional[str] = None,
target_path: Optional[str] = None,
results_path: Optional[str] = None,
fast_mode: Optional[bool] = False,
fast_mode: bool = False,
log: logging.Logger = logging.getLogger(__name__),
results: Optional[list] = None
results: Optional[list] = None,
) -> None:
super().__init__(file_path=file_path, target_path=target_path,
results_path=results_path, fast_mode=fast_mode,
log=log, results=results)
super().__init__(
file_path=file_path,
target_path=target_path,
results_path=results_path,
fast_mode=fast_mode,
log=log,
results=results,
)
self.results = results if results else {}
@@ -42,21 +47,31 @@ class DumpsysReceivers(AndroidExtraction):
for intent, receivers in self.results.items():
for receiver in receivers:
if intent == INTENT_NEW_OUTGOING_SMS:
self.log.info("Found a receiver to intercept outgoing SMS messages: \"%s\"",
receiver["receiver"])
self.log.info(
'Found a receiver to intercept outgoing SMS messages: "%s"',
receiver["receiver"],
)
elif intent == INTENT_SMS_RECEIVED:
self.log.info("Found a receiver to intercept incoming SMS messages: \"%s\"",
receiver["receiver"])
self.log.info(
'Found a receiver to intercept incoming SMS messages: "%s"',
receiver["receiver"],
)
elif intent == INTENT_DATA_SMS_RECEIVED:
self.log.info("Found a receiver to intercept incoming data SMS message: \"%s\"",
receiver["receiver"])
self.log.info(
'Found a receiver to intercept incoming data SMS message: "%s"',
receiver["receiver"],
)
elif intent == INTENT_PHONE_STATE:
self.log.info("Found a receiver monitoring "
"telephony state/incoming calls: \"%s\"",
receiver["receiver"])
self.log.info(
"Found a receiver monitoring "
'telephony state/incoming calls: "%s"',
receiver["receiver"],
)
elif intent == INTENT_NEW_OUTGOING_CALL:
self.log.info("Found a receiver monitoring outgoing calls: \"%s\"",
receiver["receiver"])
self.log.info(
'Found a receiver monitoring outgoing calls: "%s"',
receiver["receiver"],
)
ioc = self.indicators.check_app_id(receiver["package_name"])
if ioc:

View File

@@ -30,13 +30,18 @@ class Files(AndroidExtraction):
file_path: Optional[str] = None,
target_path: Optional[str] = None,
results_path: Optional[str] = None,
fast_mode: Optional[bool] = False,
fast_mode: bool = False,
log: logging.Logger = logging.getLogger(__name__),
results: Optional[list] = None
results: Optional[list] = None,
) -> None:
super().__init__(file_path=file_path, target_path=target_path,
results_path=results_path, fast_mode=fast_mode,
log=log, results=results)
super().__init__(
file_path=file_path,
target_path=target_path,
results_path=results_path,
fast_mode=fast_mode,
log=log,
results=results,
)
self.full_find = False
def serialize(self, record: dict) -> Union[dict, list, None]:
@@ -53,12 +58,15 @@ class Files(AndroidExtraction):
def check_indicators(self) -> None:
for result in self.results:
if result.get("is_suid"):
self.log.warning("Found an SUID file in a non-standard directory \"%s\".",
result["path"])
self.log.warning(
'Found an SUID file in a non-standard directory "%s".',
result["path"],
)
if self.indicators and self.indicators.check_file_path(result["path"]):
self.log.warning("Found a known suspicous file at path: \"%s\"",
result["path"])
self.log.warning(
'Found a known suspicous file at path: "%s"', result["path"]
)
self.detected.append(result)
def backup_file(self, file_path: str) -> None:
@@ -73,13 +81,13 @@ class Files(AndroidExtraction):
local_file_path = os.path.join(local_files_folder, local_file_name)
try:
self._adb_download(remote_path=file_path,
local_path=local_file_path)
self._adb_download(remote_path=file_path, local_path=local_file_path)
except Exception:
pass
else:
self.log.info("Downloaded file %s to local copy at %s",
file_path, local_file_path)
self.log.info(
"Downloaded file %s to local copy at %s", file_path, local_file_path
)
def find_files(self, folder: str) -> None:
assert isinstance(self.results, list)
@@ -92,20 +100,21 @@ class Files(AndroidExtraction):
if len(file_line) < 6:
self.log.info("Skipping invalid file info - %s", file_line.rstrip())
continue
[unix_timestamp, mode, size,
owner, group, full_path] = file_info
[unix_timestamp, mode, size, owner, group, full_path] = file_info
mod_time = convert_unix_to_iso(unix_timestamp)
self.results.append({
"path": full_path,
"modified_time": mod_time,
"mode": mode,
"is_suid": (int(mode, 8) & stat.S_ISUID) == 2048,
"is_sgid": (int(mode, 8) & stat.S_ISGID) == 1024,
"size": size,
"owner": owner,
"group": group,
})
self.results.append(
{
"path": full_path,
"modified_time": mod_time,
"mode": mode,
"is_suid": (int(mode, 8) & stat.S_ISUID) == 2048,
"is_sgid": (int(mode, 8) & stat.S_ISGID) == 1024,
"size": size,
"owner": owner,
"group": group,
}
)
else:
output = self._adb_command(f"find '{folder}' -type f 2> /dev/null")
for file_line in output.splitlines():
@@ -123,15 +132,15 @@ class Files(AndroidExtraction):
self.find_files(tmp_folder)
for entry in self.results:
self.log.info("Found file in tmp folder at path %s",
entry.get("path"))
self.log.info("Found file in tmp folder at path %s", entry.get("path"))
self.backup_file(entry.get("path"))
for media_folder in ANDROID_MEDIA_FOLDERS:
self.find_files(media_folder)
self.log.info("Found %s files in primary Android tmp and media folders",
len(self.results))
self.log.info(
"Found %s files in primary Android tmp and media folders", len(self.results)
)
if self.fast_mode:
self.log.info("Flag --fast was enabled: skipping full file listing")

View File

@@ -20,13 +20,18 @@ class Getprop(AndroidExtraction):
file_path: Optional[str] = None,
target_path: Optional[str] = None,
results_path: Optional[str] = None,
fast_mode: Optional[bool] = False,
fast_mode: bool = False,
log: logging.Logger = logging.getLogger(__name__),
results: Optional[list] = None
results: Optional[list] = None,
) -> None:
super().__init__(file_path=file_path, target_path=target_path,
results_path=results_path, fast_mode=fast_mode,
log=log, results=results)
super().__init__(
file_path=file_path,
target_path=target_path,
results_path=results_path,
fast_mode=fast_mode,
log=log,
results=results,
)
self.results = {} if not results else results
@@ -52,10 +57,11 @@ class Getprop(AndroidExtraction):
if entry.get("name", "") != "ro.build.version.security_patch":
continue
patch_date = datetime.strptime(entry["value"], "%Y-%m-%d")
if (datetime.now() - patch_date) > timedelta(days=6*30):
self.log.warning("This phone has not received security updates "
"for more than six months (last update: %s)",
entry["value"])
if (datetime.now() - patch_date) > timedelta(days=6 * 30):
self.log.warning(
"This phone has not received security updates "
"for more than six months (last update: %s)",
entry["value"],
)
self.log.info("Extracted %d Android system properties",
len(self.results))
self.log.info("Extracted %d Android system properties", len(self.results))

View File

@@ -18,37 +18,40 @@ class Logcat(AndroidExtraction):
file_path: Optional[str] = None,
target_path: Optional[str] = None,
results_path: Optional[str] = None,
fast_mode: Optional[bool] = False,
fast_mode: bool = False,
log: logging.Logger = logging.getLogger(__name__),
results: Optional[list] = None
results: Optional[list] = None,
) -> None:
super().__init__(file_path=file_path, target_path=target_path,
results_path=results_path, fast_mode=fast_mode,
log=log, results=results)
super().__init__(
file_path=file_path,
target_path=target_path,
results_path=results_path,
fast_mode=fast_mode,
log=log,
results=results,
)
def run(self) -> None:
self._adb_connect()
# Get the current logcat.
output = self._adb_command("logcat -d -b all \"*:V\"")
output = self._adb_command('logcat -d -b all "*:V"')
# Get the locat prior to last reboot.
last_output = self._adb_command("logcat -L -b all \"*:V\"")
last_output = self._adb_command('logcat -L -b all "*:V"')
if self.results_path:
logcat_path = os.path.join(self.results_path,
"logcat.txt")
logcat_path = os.path.join(self.results_path, "logcat.txt")
with open(logcat_path, "w", encoding="utf-8") as handle:
handle.write(output)
self.log.info("Current logcat logs stored at %s",
logcat_path)
self.log.info("Current logcat logs stored at %s", logcat_path)
logcat_last_path = os.path.join(self.results_path,
"logcat_last.txt")
logcat_last_path = os.path.join(self.results_path, "logcat_last.txt")
with open(logcat_last_path, "w", encoding="utf-8") as handle:
handle.write(last_output)
self.log.info("Logcat logs prior to last reboot stored at %s",
logcat_last_path)
self.log.info(
"Logcat logs prior to last reboot stored at %s", logcat_last_path
)
self._adb_disconnect()

View File

@@ -93,59 +93,65 @@ class Packages(AndroidExtraction):
file_path: Optional[str] = None,
target_path: Optional[str] = None,
results_path: Optional[str] = None,
fast_mode: Optional[bool] = False,
fast_mode: bool = False,
log: logging.Logger = logging.getLogger(__name__),
results: Optional[list] = None
results: Optional[list] = None,
) -> None:
super().__init__(file_path=file_path, target_path=target_path,
results_path=results_path, fast_mode=fast_mode,
log=log, results=results)
super().__init__(
file_path=file_path,
target_path=target_path,
results_path=results_path,
fast_mode=fast_mode,
log=log,
results=results,
)
def serialize(self, record: dict) -> Union[dict, list]:
records = []
timestamps = [
{
"event": "package_install",
"timestamp": record["timestamp"]
},
{"event": "package_install", "timestamp": record["timestamp"]},
{
"event": "package_first_install",
"timestamp": record["first_install_time"]
},
{
"event": "package_last_update",
"timestamp": record["last_update_time"]
"timestamp": record["first_install_time"],
},
{"event": "package_last_update", "timestamp": record["last_update_time"]},
]
for timestamp in timestamps:
records.append({
"timestamp": timestamp["timestamp"],
"module": self.__class__.__name__,
"event": timestamp["event"],
"data": f"{record['package_name']} (system: {record['system']},"
f" third party: {record['third_party']})",
})
records.append(
{
"timestamp": timestamp["timestamp"],
"module": self.__class__.__name__,
"event": timestamp["event"],
"data": f"{record['package_name']} (system: {record['system']},"
f" third party: {record['third_party']})",
}
)
return records
def check_indicators(self) -> None:
for result in self.results:
if result["package_name"] in ROOT_PACKAGES:
self.log.warning("Found an installed package related to "
"rooting/jailbreaking: \"%s\"",
result["package_name"])
self.log.warning(
"Found an installed package related to "
'rooting/jailbreaking: "%s"',
result["package_name"],
)
self.detected.append(result)
continue
if result["package_name"] in SECURITY_PACKAGES and result["disabled"]:
self.log.warning("Found a security package disabled: \"%s\"",
result["package_name"])
self.log.warning(
'Found a security package disabled: "%s"', result["package_name"]
)
if result["package_name"] in SYSTEM_UPDATE_PACKAGES and result["disabled"]:
self.log.warning("System OTA update package \"%s\" disabled on the phone",
result["package_name"])
self.log.warning(
'System OTA update package "%s" disabled on the phone',
result["package_name"],
)
if not self.indicators:
continue
@@ -239,22 +245,24 @@ class Packages(AndroidExtraction):
for file_path in output.splitlines():
file_path = file_path.strip()
md5 = self._adb_command(
f"md5sum {file_path}").split(" ", maxsplit=1)[0]
sha1 = self._adb_command(
f"sha1sum {file_path}").split(" ", maxsplit=1)[0]
sha256 = self._adb_command(
f"sha256sum {file_path}").split(" ", maxsplit=1)[0]
sha512 = self._adb_command(
f"sha512sum {file_path}").split(" ", maxsplit=1)[0]
md5 = self._adb_command(f"md5sum {file_path}").split(" ", maxsplit=1)[0]
sha1 = self._adb_command(f"sha1sum {file_path}").split(" ", maxsplit=1)[0]
sha256 = self._adb_command(f"sha256sum {file_path}").split(" ", maxsplit=1)[
0
]
sha512 = self._adb_command(f"sha512sum {file_path}").split(" ", maxsplit=1)[
0
]
package_files.append({
"path": file_path,
"md5": md5,
"sha1": sha1,
"sha256": sha256,
"sha512": sha512,
})
package_files.append(
{
"path": file_path,
"md5": md5,
"sha1": sha1,
"sha256": sha256,
"sha512": sha512,
}
)
return package_files
@@ -290,8 +298,7 @@ class Packages(AndroidExtraction):
"files": package_files,
}
dumpsys_package = self._adb_command(
f"dumpsys package {package_name}")
dumpsys_package = self._adb_command(f"dumpsys package {package_name}")
package_details = self.parse_package_for_details(dumpsys_package)
new_package.update(package_details)
@@ -324,10 +331,12 @@ class Packages(AndroidExtraction):
dangerous_permissions_count += 1
if dangerous_permissions_count >= DANGEROUS_PERMISSIONS_THRESHOLD:
self.log.info("Third-party package \"%s\" requested %d "
"potentially dangerous permissions",
result["package_name"],
dangerous_permissions_count)
self.log.info(
'Third-party package "%s" requested %d '
"potentially dangerous permissions",
result["package_name"],
dangerous_permissions_count,
)
packages_to_lookup = []
for result in self.results:
@@ -335,14 +344,18 @@ class Packages(AndroidExtraction):
continue
packages_to_lookup.append(result)
self.log.info("Found non-system package with name \"%s\" installed by \"%s\" on %s",
result["package_name"], result["installer"],
result["timestamp"])
self.log.info(
'Found non-system package with name "%s" installed by "%s" on %s',
result["package_name"],
result["installer"],
result["timestamp"],
)
if not self.fast_mode:
self.check_virustotal(packages_to_lookup)
self.log.info("Extracted at total of %d installed package names",
len(self.results))
self.log.info(
"Extracted at total of %d installed package names", len(self.results)
)
self._adb_disconnect()

View File

@@ -17,13 +17,18 @@ class Processes(AndroidExtraction):
file_path: Optional[str] = None,
target_path: Optional[str] = None,
results_path: Optional[str] = None,
fast_mode: Optional[bool] = False,
fast_mode: bool = False,
log: logging.Logger = logging.getLogger(__name__),
results: Optional[list] = None
results: Optional[list] = None,
) -> None:
super().__init__(file_path=file_path, target_path=target_path,
results_path=results_path, fast_mode=fast_mode,
log=log, results=results)
super().__init__(
file_path=file_path,
target_path=target_path,
results_path=results_path,
fast_mode=fast_mode,
log=log,
results=results,
)
def check_indicators(self) -> None:
if not self.indicators:
@@ -82,5 +87,4 @@ class Processes(AndroidExtraction):
self._adb_disconnect()
self.log.info("Extracted records on a total of %d processes",
len(self.results))
self.log.info("Extracted records on a total of %d processes", len(self.results))

View File

@@ -17,13 +17,18 @@ class RootBinaries(AndroidExtraction):
file_path: Optional[str] = None,
target_path: Optional[str] = None,
results_path: Optional[str] = None,
fast_mode: Optional[bool] = False,
fast_mode: bool = False,
log: logging.Logger = logging.getLogger(__name__),
results: Optional[list] = None
results: Optional[list] = None,
) -> None:
super().__init__(file_path=file_path, target_path=target_path,
results_path=results_path, fast_mode=fast_mode,
log=log, results=results)
super().__init__(
file_path=file_path,
target_path=target_path,
results_path=results_path,
fast_mode=fast_mode,
log=log,
results=results,
)
def run(self) -> None:
root_binaries = [
@@ -56,6 +61,6 @@ class RootBinaries(AndroidExtraction):
continue
self.detected.append(root_binary)
self.log.warning("Found root binary \"%s\"", root_binary)
self.log.warning('Found root binary "%s"', root_binary)
self._adb_disconnect()

View File

@@ -19,13 +19,18 @@ class SELinuxStatus(AndroidExtraction):
file_path: Optional[str] = None,
target_path: Optional[str] = None,
results_path: Optional[str] = None,
fast_mode: Optional[bool] = False,
fast_mode: bool = False,
log: logging.Logger = logging.getLogger(__name__),
results: Optional[list] = None
results: Optional[list] = None,
) -> None:
super().__init__(file_path=file_path, target_path=target_path,
results_path=results_path, fast_mode=fast_mode,
log=log, results=results)
super().__init__(
file_path=file_path,
target_path=target_path,
results_path=results_path,
fast_mode=fast_mode,
log=log,
results=results,
)
self.results = {} if not results else results
@@ -40,4 +45,4 @@ class SELinuxStatus(AndroidExtraction):
if status == "enforcing":
self.log.info("SELinux is being regularly enforced")
else:
self.log.warning("SELinux status is \"%s\"!", status)
self.log.warning('SELinux status is "%s"!', status)

View File

@@ -53,7 +53,7 @@ ANDROID_DANGEROUS_SETTINGS = [
"description": "enabled installation of non Google Play apps",
"key": "install_non_market_apps",
"safe_value": "0",
}
},
]
@@ -65,13 +65,18 @@ class Settings(AndroidExtraction):
file_path: Optional[str] = None,
target_path: Optional[str] = None,
results_path: Optional[str] = None,
fast_mode: Optional[bool] = False,
fast_mode: bool = False,
log: logging.Logger = logging.getLogger(__name__),
results: Optional[list] = None
results: Optional[list] = None,
) -> None:
super().__init__(file_path=file_path, target_path=target_path,
results_path=results_path, fast_mode=fast_mode,
log=log, results=results)
super().__init__(
file_path=file_path,
target_path=target_path,
results_path=results_path,
fast_mode=fast_mode,
log=log,
results=results,
)
self.results = {} if not results else results
@@ -82,8 +87,12 @@ class Settings(AndroidExtraction):
# Check if one of the dangerous settings is using an unsafe
# value (different than the one specified).
if danger["key"] == key and danger["safe_value"] != value:
self.log.warning("Found suspicious setting \"%s = %s\" (%s)",
key, value, danger["description"])
self.log.warning(
'Found suspicious setting "%s = %s" (%s)',
key,
value,
danger["description"],
)
break
def run(self) -> None:

View File

@@ -8,8 +8,7 @@ import os
import sqlite3
from typing import Optional, Union
from mvt.android.parsers.backup import (AndroidBackupParsingError,
parse_tar_for_sms)
from mvt.android.parsers.backup import AndroidBackupParsingError, parse_tar_for_sms
from mvt.common.module import InsufficientPrivileges
from mvt.common.utils import check_for_links, convert_unix_to_iso
@@ -50,13 +49,18 @@ class SMS(AndroidExtraction):
file_path: Optional[str] = None,
target_path: Optional[str] = None,
results_path: Optional[str] = None,
fast_mode: Optional[bool] = False,
fast_mode: bool = False,
log: logging.Logger = logging.getLogger(__name__),
results: Optional[list] = None
results: Optional[list] = None,
) -> None:
super().__init__(file_path=file_path, target_path=target_path,
results_path=results_path, fast_mode=fast_mode,
log=log, results=results)
super().__init__(
file_path=file_path,
target_path=target_path,
results_path=results_path,
fast_mode=fast_mode,
log=log,
results=results,
)
self.sms_db_type = 0
@@ -66,7 +70,7 @@ class SMS(AndroidExtraction):
"timestamp": record["isodate"],
"module": self.__class__.__name__,
"event": f"sms_{record['direction']}",
"data": f"{record.get('address', 'unknown source')}: \"{body}\""
"data": f"{record.get('address', 'unknown source')}: \"{body}\"",
}
def check_indicators(self) -> None:
@@ -105,7 +109,7 @@ class SMS(AndroidExtraction):
for index, value in enumerate(item):
message[names[index]] = value
message["direction"] = ("received" if message["incoming"] == 1 else "sent")
message["direction"] = "received" if message["incoming"] == 1 else "sent"
message["isodate"] = convert_unix_to_iso(message["timestamp"])
# Extract links in the message body
@@ -117,8 +121,7 @@ class SMS(AndroidExtraction):
cur.close()
conn.close()
self.log.info("Extracted a total of %d SMS messages",
len(self.results))
self.log.info("Extracted a total of %d SMS messages", len(self.results))
def _extract_sms_adb(self) -> None:
"""Use the Android backup command to extract SMS data from the native
@@ -135,13 +138,14 @@ class SMS(AndroidExtraction):
try:
self.results = parse_tar_for_sms(backup_tar)
except AndroidBackupParsingError:
self.log.info("Impossible to read SMS from the Android Backup, "
"please extract the SMS and try extracting it with "
"Android Backup Extractor")
self.log.info(
"Impossible to read SMS from the Android Backup, "
"please extract the SMS and try extracting it with "
"Android Backup Extractor"
)
return
self.log.info("Extracted a total of %d SMS messages",
len(self.results))
self.log.info("Extracted a total of %d SMS messages", len(self.results))
def run(self) -> None:
self._adb_connect()
@@ -149,20 +153,24 @@ class SMS(AndroidExtraction):
try:
if self._adb_check_file_exists(os.path.join("/", SMS_BUGLE_PATH)):
self.sms_db_type = 1
self._adb_process_file(os.path.join("/", SMS_BUGLE_PATH),
self._parse_db)
self._adb_process_file(
os.path.join("/", SMS_BUGLE_PATH), self._parse_db
)
elif self._adb_check_file_exists(os.path.join("/", SMS_MMSSMS_PATH)):
self.sms_db_type = 2
self._adb_process_file(os.path.join("/", SMS_MMSSMS_PATH),
self._parse_db)
self._adb_process_file(
os.path.join("/", SMS_MMSSMS_PATH), self._parse_db
)
self._adb_disconnect()
return
except InsufficientPrivileges:
pass
self.log.info("No SMS database found. Trying extraction of SMS data "
"using Android backup feature.")
self.log.info(
"No SMS database found. Trying extraction of SMS data "
"using Android backup feature."
)
self._extract_sms_adb()
self._adb_disconnect()

View File

@@ -24,13 +24,18 @@ class Whatsapp(AndroidExtraction):
file_path: Optional[str] = None,
target_path: Optional[str] = None,
results_path: Optional[str] = None,
fast_mode: Optional[bool] = False,
fast_mode: bool = False,
log: logging.Logger = logging.getLogger(__name__),
results: Optional[list] = None
results: Optional[list] = None,
) -> None:
super().__init__(file_path=file_path, target_path=target_path,
results_path=results_path, fast_mode=fast_mode,
log=log, results=results)
super().__init__(
file_path=file_path,
target_path=target_path,
results_path=results_path,
fast_mode=fast_mode,
log=log,
results=results,
)
def serialize(self, record: dict) -> Union[dict, list]:
text = record["data"].replace("\n", "\\n")
@@ -38,7 +43,7 @@ class Whatsapp(AndroidExtraction):
"timestamp": record["isodate"],
"module": self.__class__.__name__,
"event": f"whatsapp_msg_{record['direction']}",
"data": f"\"{text}\""
"data": f'"{text}"',
}
def check_indicators(self) -> None:
@@ -61,9 +66,11 @@ class Whatsapp(AndroidExtraction):
"""
conn = sqlite3.connect(db_path)
cur = conn.cursor()
cur.execute("""
cur.execute(
"""
SELECT * FROM messages;
""")
"""
)
names = [description[0] for description in cur.description]
messages = []
@@ -75,32 +82,30 @@ class Whatsapp(AndroidExtraction):
if not message["data"]:
continue
message["direction"] = ("send" if message["key_from_me"] == 1 else "received")
message["direction"] = "send" if message["key_from_me"] == 1 else "received"
message["isodate"] = convert_unix_to_iso(message["timestamp"])
# If we find links in the messages or if they are empty we add them
# to the list.
if (check_for_links(message["data"])
or message["data"].strip() == ""):
if check_for_links(message["data"]) or message["data"].strip() == "":
if message.get("thumb_image"):
message["thumb_image"] = base64.b64encode(
message["thumb_image"])
message["thumb_image"] = base64.b64encode(message["thumb_image"])
messages.append(message)
cur.close()
conn.close()
self.log.info("Extracted a total of %d WhatsApp messages containing links",
len(messages))
self.log.info(
"Extracted a total of %d WhatsApp messages containing links", len(messages)
)
self.results = messages
def run(self) -> None:
self._adb_connect()
try:
self._adb_process_file(os.path.join("/", WHATSAPP_PATH),
self._parse_db)
self._adb_process_file(os.path.join("/", WHATSAPP_PATH), self._parse_db)
except Exception as exc:
self.log.error(exc)

View File

@@ -13,6 +13,14 @@ from .processes import Processes
from .settings import Settings
from .sms import SMS
ANDROIDQF_MODULES = [DumpsysActivities, DumpsysReceivers, DumpsysAccessibility,
DumpsysAppops, Processes, Getprop, Settings, SMS,
DumpsysPackages]
ANDROIDQF_MODULES = [
DumpsysActivities,
DumpsysReceivers,
DumpsysAccessibility,
DumpsysAppops,
Processes,
Getprop,
Settings,
SMS,
DumpsysPackages,
]

View File

@@ -19,13 +19,18 @@ class AndroidQFModule(MVTModule):
file_path: Optional[str] = None,
target_path: Optional[str] = None,
results_path: Optional[str] = None,
fast_mode: Optional[bool] = False,
fast_mode: bool = False,
log: logging.Logger = logging.getLogger(__name__),
results: Union[List[Dict[str, Any]], Dict[str, Any], None] = None
results: Union[List[Dict[str, Any]], Dict[str, Any], None] = None,
) -> None:
super().__init__(file_path=file_path, target_path=target_path,
results_path=results_path, fast_mode=fast_mode,
log=log, results=results)
super().__init__(
file_path=file_path,
target_path=target_path,
results_path=results_path,
fast_mode=fast_mode,
log=log,
results=results,
)
self._path = target_path
self._files = []

View File

@@ -19,13 +19,18 @@ class DumpsysAccessibility(AndroidQFModule):
file_path: Optional[str] = None,
target_path: Optional[str] = None,
results_path: Optional[str] = None,
fast_mode: Optional[bool] = False,
fast_mode: bool = False,
log: logging.Logger = logging.getLogger(__name__),
results: Optional[list] = None
results: Optional[list] = None,
) -> None:
super().__init__(file_path=file_path, target_path=target_path,
results_path=results_path, fast_mode=fast_mode,
log=log, results=results)
super().__init__(
file_path=file_path,
target_path=target_path,
results_path=results_path,
fast_mode=fast_mode,
log=log,
results=results,
)
def check_indicators(self) -> None:
if not self.indicators:
@@ -53,7 +58,9 @@ class DumpsysAccessibility(AndroidQFModule):
if not in_accessibility:
continue
if line.strip().startswith("-------------------------------------------------------------------------------"): # pylint: disable=line-too-long
if line.strip().startswith(
"-------------------------------------------------------------------------------"
): # pylint: disable=line-too-long
break
lines.append(line.rstrip())
@@ -61,8 +68,10 @@ class DumpsysAccessibility(AndroidQFModule):
self.results = parse_dumpsys_accessibility("\n".join(lines))
for result in self.results:
self.log.info("Found installed accessibility service \"%s\"",
result.get("service"))
self.log.info(
'Found installed accessibility service "%s"', result.get("service")
)
self.log.info("Identified a total of %d accessibility services",
len(self.results))
self.log.info(
"Identified a total of %d accessibility services", len(self.results)
)

View File

@@ -19,13 +19,18 @@ class DumpsysActivities(AndroidQFModule):
file_path: Optional[str] = None,
target_path: Optional[str] = None,
results_path: Optional[str] = None,
fast_mode: Optional[bool] = False,
fast_mode: bool = False,
log: logging.Logger = logging.getLogger(__name__),
results: Optional[list] = None
results: Optional[list] = None,
) -> None:
super().__init__(file_path=file_path, target_path=target_path,
results_path=results_path, fast_mode=fast_mode,
log=log, results=results)
super().__init__(
file_path=file_path,
target_path=target_path,
results_path=results_path,
fast_mode=fast_mode,
log=log,
results=results,
)
self.results = results if results else {}
@@ -56,7 +61,9 @@ class DumpsysActivities(AndroidQFModule):
if not in_package:
continue
if line.strip().startswith("------------------------------------------------------------------------------"): # pylint: disable=line-too-long
if line.strip().startswith(
"------------------------------------------------------------------------------"
): # pylint: disable=line-too-long
break
lines.append(line.rstrip())

View File

@@ -12,19 +12,23 @@ from .base import AndroidQFModule
class DumpsysAppops(AndroidQFModule):
def __init__(
self,
file_path: Optional[str] = None,
target_path: Optional[str] = None,
results_path: Optional[str] = None,
fast_mode: Optional[bool] = False,
fast_mode: bool = False,
log: logging.Logger = logging.getLogger(__name__),
results: Optional[list] = None
results: Optional[list] = None,
) -> None:
super().__init__(file_path=file_path, target_path=target_path,
results_path=results_path, fast_mode=fast_mode,
log=log, results=results)
super().__init__(
file_path=file_path,
target_path=target_path,
results_path=results_path,
fast_mode=fast_mode,
log=log,
results=results,
)
def serialize(self, record: dict) -> Union[dict, list]:
records = []
@@ -34,13 +38,15 @@ class DumpsysAppops(AndroidQFModule):
for entry in perm["entries"]:
if "timestamp" in entry:
records.append({
"timestamp": entry["timestamp"],
"module": self.__class__.__name__,
"event": entry["access"],
"data": f"{record['package_name']} access to "
f"{perm['name']} : {entry['access']}",
})
records.append(
{
"timestamp": entry["timestamp"],
"module": self.__class__.__name__,
"event": entry["access"],
"data": f"{record['package_name']} access to "
f"{perm['name']} : {entry['access']}",
}
)
return records
@@ -54,10 +60,14 @@ class DumpsysAppops(AndroidQFModule):
continue
for perm in result["permissions"]:
if (perm["name"] == "REQUEST_INSTALL_PACKAGES"
and perm["access"] == "allow"):
self.log.info("Package %s with REQUEST_INSTALL_PACKAGES permission",
result["package_name"])
if (
perm["name"] == "REQUEST_INSTALL_PACKAGES"
and perm["access"] == "allow"
):
self.log.info(
"Package %s with REQUEST_INSTALL_PACKAGES permission",
result["package_name"],
)
def run(self) -> None:
dumpsys_file = self._get_files_by_pattern("*/dumpsys.txt")
@@ -73,11 +83,12 @@ class DumpsysAppops(AndroidQFModule):
continue
if in_package:
if line.startswith("-------------------------------------------------------------------------------"): # pylint: disable=line-too-long
if line.startswith(
"-------------------------------------------------------------------------------"
): # pylint: disable=line-too-long
break
lines.append(line.rstrip())
self.results = parse_dumpsys_appops("\n".join(lines))
self.log.info("Identified %d applications in AppOps Manager",
len(self.results))
self.log.info("Identified %d applications in AppOps Manager", len(self.results))

View File

@@ -6,9 +6,11 @@
import logging
from typing import Any, Dict, List, Optional, Union
from mvt.android.modules.adb.packages import (DANGEROUS_PERMISSIONS,
DANGEROUS_PERMISSIONS_THRESHOLD,
ROOT_PACKAGES)
from mvt.android.modules.adb.packages import (
DANGEROUS_PERMISSIONS,
DANGEROUS_PERMISSIONS_THRESHOLD,
ROOT_PACKAGES,
)
from mvt.android.parsers.dumpsys import parse_dumpsys_packages
from .base import AndroidQFModule
@@ -22,34 +24,43 @@ class DumpsysPackages(AndroidQFModule):
file_path: Optional[str] = None,
target_path: Optional[str] = None,
results_path: Optional[str] = None,
fast_mode: Optional[bool] = False,
fast_mode: bool = False,
log: logging.Logger = logging.getLogger(__name__),
results: Optional[List[Dict[str, Any]]] = None
results: Optional[List[Dict[str, Any]]] = None,
) -> None:
super().__init__(file_path=file_path, target_path=target_path,
results_path=results_path, fast_mode=fast_mode,
log=log, results=results)
super().__init__(
file_path=file_path,
target_path=target_path,
results_path=results_path,
fast_mode=fast_mode,
log=log,
results=results,
)
def serialize(self, record: dict) -> Union[dict, list]:
entries = []
for entry in ["timestamp", "first_install_time", "last_update_time"]:
if entry in record:
entries.append({
"timestamp": record[entry],
"module": self.__class__.__name__,
"event": entry,
"data": f"Package {record['package_name']} "
f"({record['uid']})",
})
entries.append(
{
"timestamp": record[entry],
"module": self.__class__.__name__,
"event": entry,
"data": f"Package {record['package_name']} "
f"({record['uid']})",
}
)
return entries
def check_indicators(self) -> None:
for result in self.results:
if result["package_name"] in ROOT_PACKAGES:
self.log.warning("Found an installed package related to "
"rooting/jailbreaking: \"%s\"",
result["package_name"])
self.log.warning(
"Found an installed package related to "
'rooting/jailbreaking: "%s"',
result["package_name"],
)
self.detected.append(result)
continue
@@ -99,8 +110,10 @@ class DumpsysPackages(AndroidQFModule):
dangerous_permissions_count += 1
if dangerous_permissions_count >= DANGEROUS_PERMISSIONS_THRESHOLD:
self.log.info("Found package \"%s\" requested %d potentially dangerous permissions",
result["package_name"],
dangerous_permissions_count)
self.log.info(
'Found package "%s" requested %d potentially dangerous permissions',
result["package_name"],
dangerous_permissions_count,
)
self.log.info("Extracted details on %d packages", len(self.results))

View File

@@ -7,8 +7,12 @@ import logging
from typing import Any, Dict, List, Optional, Union
from mvt.android.modules.adb.dumpsys_receivers import (
INTENT_DATA_SMS_RECEIVED, INTENT_NEW_OUTGOING_CALL,
INTENT_NEW_OUTGOING_SMS, INTENT_PHONE_STATE, INTENT_SMS_RECEIVED)
INTENT_DATA_SMS_RECEIVED,
INTENT_NEW_OUTGOING_CALL,
INTENT_NEW_OUTGOING_SMS,
INTENT_PHONE_STATE,
INTENT_SMS_RECEIVED,
)
from mvt.android.parsers import parse_dumpsys_receiver_resolver_table
from .base import AndroidQFModule
@@ -22,13 +26,18 @@ class DumpsysReceivers(AndroidQFModule):
file_path: Optional[str] = None,
target_path: Optional[str] = None,
results_path: Optional[str] = None,
fast_mode: Optional[bool] = False,
fast_mode: bool = False,
log: logging.Logger = logging.getLogger(__name__),
results: Union[List[Any], Dict[str, Any], None] = None
results: Union[List[Any], Dict[str, Any], None] = None,
) -> None:
super().__init__(file_path=file_path, target_path=target_path,
results_path=results_path, fast_mode=fast_mode,
log=log, results=results)
super().__init__(
file_path=file_path,
target_path=target_path,
results_path=results_path,
fast_mode=fast_mode,
log=log,
results=results,
)
self.results = results if results else {}
@@ -39,21 +48,31 @@ class DumpsysReceivers(AndroidQFModule):
for intent, receivers in self.results.items():
for receiver in receivers:
if intent == INTENT_NEW_OUTGOING_SMS:
self.log.info("Found a receiver to intercept outgoing SMS messages: \"%s\"",
receiver["receiver"])
self.log.info(
'Found a receiver to intercept outgoing SMS messages: "%s"',
receiver["receiver"],
)
elif intent == INTENT_SMS_RECEIVED:
self.log.info("Found a receiver to intercept incoming SMS messages: \"%s\"",
receiver["receiver"])
self.log.info(
'Found a receiver to intercept incoming SMS messages: "%s"',
receiver["receiver"],
)
elif intent == INTENT_DATA_SMS_RECEIVED:
self.log.info("Found a receiver to intercept incoming data SMS message: \"%s\"",
receiver["receiver"])
self.log.info(
'Found a receiver to intercept incoming data SMS message: "%s"',
receiver["receiver"],
)
elif intent == INTENT_PHONE_STATE:
self.log.info("Found a receiver monitoring "
"telephony state/incoming calls: \"%s\"",
receiver["receiver"])
self.log.info(
"Found a receiver monitoring "
'telephony state/incoming calls: "%s"',
receiver["receiver"],
)
elif intent == INTENT_NEW_OUTGOING_CALL:
self.log.info("Found a receiver monitoring outgoing calls: \"%s\"",
receiver["receiver"])
self.log.info(
'Found a receiver monitoring outgoing calls: "%s"',
receiver["receiver"],
)
ioc = self.indicators.check_app_id(receiver["package_name"])
if ioc:
@@ -76,7 +95,9 @@ class DumpsysReceivers(AndroidQFModule):
if not in_receivers:
continue
if line.strip().startswith("------------------------------------------------------------------------------"): # pylint: disable=line-too-long
if line.strip().startswith(
"------------------------------------------------------------------------------"
): # pylint: disable=line-too-long
break
lines.append(line.rstrip())

View File

@@ -22,7 +22,7 @@ INTERESTING_PROPERTIES = [
"ro.product.locale",
"ro.product.vendor.manufacturer",
"ro.product.vendor.model",
"ro.product.vendor.name"
"ro.product.vendor.name",
]
@@ -34,13 +34,18 @@ class Getprop(AndroidQFModule):
file_path: Optional[str] = None,
target_path: Optional[str] = None,
results_path: Optional[str] = None,
fast_mode: Optional[bool] = False,
fast_mode: bool = False,
log: logging.Logger = logging.getLogger(__name__),
results: Optional[list] = None
results: Optional[list] = None,
) -> None:
super().__init__(file_path=file_path, target_path=target_path,
results_path=results_path, fast_mode=fast_mode,
log=log, results=results)
super().__init__(
file_path=file_path,
target_path=target_path,
results_path=results_path,
fast_mode=fast_mode,
log=log,
results=results,
)
self.results = []
def check_indicators(self) -> None:
@@ -68,9 +73,12 @@ class Getprop(AndroidQFModule):
self.log.info("%s: %s", entry["name"], entry["value"])
if entry["name"] == "ro.build.version.security_patch":
last_patch = datetime.strptime(entry["value"], "%Y-%m-%d")
if (datetime.now() - last_patch) > timedelta(days=6*31):
self.log.warning("This phone has not received security "
"updates for more than six months "
"(last update: %s)", entry["value"])
if (datetime.now() - last_patch) > timedelta(days=6 * 31):
self.log.warning(
"This phone has not received security "
"updates for more than six months "
"(last update: %s)",
entry["value"],
)
self.log.info("Extracted a total of %d properties", len(self.results))

View File

@@ -17,13 +17,18 @@ class Processes(AndroidQFModule):
file_path: Optional[str] = None,
target_path: Optional[str] = None,
results_path: Optional[str] = None,
fast_mode: Optional[bool] = False,
fast_mode: bool = False,
log: logging.Logger = logging.getLogger(__name__),
results: Optional[list] = None
results: Optional[list] = None,
) -> None:
super().__init__(file_path=file_path, target_path=target_path,
results_path=results_path, fast_mode=fast_mode,
log=log, results=results)
super().__init__(
file_path=file_path,
target_path=target_path,
results_path=results_path,
fast_mode=fast_mode,
log=log,
results=results,
)
def check_indicators(self) -> None:
if not self.indicators:
@@ -55,7 +60,7 @@ class Processes(AndroidQFModule):
# Sometimes WCHAN is empty.
if len(proc) == 8:
proc = proc[:5] + [''] + proc[5:]
proc = proc[:5] + [""] + proc[5:]
# Sometimes there is the security label.
if proc[0].startswith("u:r"):
@@ -68,18 +73,20 @@ class Processes(AndroidQFModule):
if len(proc) < 9:
proc = proc[:5] + [""] + proc[5:]
self.results.append({
"user": proc[0],
"pid": int(proc[1]),
"ppid": int(proc[2]),
"virtual_memory_size": int(proc[3]),
"resident_set_size": int(proc[4]),
"wchan": proc[5],
"aprocress": proc[6],
"stat": proc[7],
"proc_name": proc[8].strip("[]"),
"label": label,
})
self.results.append(
{
"user": proc[0],
"pid": int(proc[1]),
"ppid": int(proc[2]),
"virtual_memory_size": int(proc[3]),
"resident_set_size": int(proc[4]),
"wchan": proc[5],
"aprocress": proc[6],
"stat": proc[7],
"proc_name": proc[8].strip("[]"),
"label": label,
}
)
def run(self) -> None:
ps_files = self._get_files_by_pattern("*/ps.txt")

View File

@@ -19,18 +19,23 @@ class Settings(AndroidQFModule):
file_path: Optional[str] = None,
target_path: Optional[str] = None,
results_path: Optional[str] = None,
fast_mode: Optional[bool] = False,
fast_mode: bool = False,
log: logging.Logger = logging.getLogger(__name__),
results: Optional[list] = None
results: Optional[list] = None,
) -> None:
super().__init__(file_path=file_path, target_path=target_path,
results_path=results_path, fast_mode=fast_mode,
log=log, results=results)
super().__init__(
file_path=file_path,
target_path=target_path,
results_path=results_path,
fast_mode=fast_mode,
log=log,
results=results,
)
self.results = {}
def run(self) -> None:
for setting_file in self._get_files_by_pattern("*/settings_*.txt"):
namespace = setting_file[setting_file.rfind("_")+1:-4]
namespace = setting_file[setting_file.rfind("_") + 1 : -4]
self.results[namespace] = {}
@@ -48,11 +53,15 @@ class Settings(AndroidQFModule):
continue
for danger in ANDROID_DANGEROUS_SETTINGS:
if (danger["key"] == key
and danger["safe_value"] != value):
self.log.warning("Found suspicious setting \"%s = %s\" (%s)",
key, value, danger["description"])
if danger["key"] == key and danger["safe_value"] != value:
self.log.warning(
'Found suspicious setting "%s = %s" (%s)',
key,
value,
danger["description"],
)
break
self.log.info("Identified %d settings",
sum([len(val) for val in self.results.values()]))
self.log.info(
"Identified %d settings", sum([len(val) for val in self.results.values()])
)

View File

@@ -7,9 +7,13 @@ import getpass
import logging
from typing import Optional
from mvt.android.parsers.backup import (AndroidBackupParsingError,
InvalidBackupPassword, parse_ab_header,
parse_backup_file, parse_tar_for_sms)
from mvt.android.parsers.backup import (
AndroidBackupParsingError,
InvalidBackupPassword,
parse_ab_header,
parse_backup_file,
parse_tar_for_sms,
)
from .base import AndroidQFModule
@@ -22,13 +26,18 @@ class SMS(AndroidQFModule):
file_path: Optional[str] = None,
target_path: Optional[str] = None,
results_path: Optional[str] = None,
fast_mode: Optional[bool] = False,
fast_mode: bool = False,
log: logging.Logger = logging.getLogger(__name__),
results: Optional[list] = None
results: Optional[list] = None,
) -> None:
super().__init__(file_path=file_path, target_path=target_path,
results_path=results_path, fast_mode=fast_mode,
log=log, results=results)
super().__init__(
file_path=file_path,
target_path=target_path,
results_path=results_path,
fast_mode=fast_mode,
log=log,
results=results,
)
def check_indicators(self) -> None:
if not self.indicators:
@@ -56,8 +65,10 @@ class SMS(AndroidQFModule):
self.log.critical("Invalid backup password")
return
except AndroidBackupParsingError:
self.log.critical("Impossible to parse this backup file, please use"
" Android Backup Extractor instead")
self.log.critical(
"Impossible to parse this backup file, please use"
" Android Backup Extractor instead"
)
return
if not tardata:
@@ -66,9 +77,11 @@ class SMS(AndroidQFModule):
try:
self.results = parse_tar_for_sms(tardata)
except AndroidBackupParsingError:
self.log.info("Impossible to read SMS from the Android Backup, "
"please extract the SMS and try extracting it with "
"Android Backup Extractor")
self.log.info(
"Impossible to read SMS from the Android Backup, "
"please extract the SMS and try extracting it with "
"Android Backup Extractor"
)
return
def run(self) -> None:
@@ -81,5 +94,4 @@ class SMS(AndroidQFModule):
data = handle.read()
self.parse_backup(data)
self.log.info("Identified %d SMS in backup data",
len(self.results))
self.log.info("Identified %d SMS in backup data", len(self.results))

View File

@@ -20,13 +20,18 @@ class BackupExtraction(MVTModule):
file_path: Optional[str] = None,
target_path: Optional[str] = None,
results_path: Optional[str] = None,
fast_mode: Optional[bool] = False,
fast_mode: bool = False,
log: logging.Logger = logging.getLogger(__name__),
results: Optional[list] = None
results: Optional[list] = None,
) -> None:
super().__init__(file_path=file_path, target_path=target_path,
results_path=results_path, fast_mode=fast_mode,
log=log, results=results)
super().__init__(
file_path=file_path,
target_path=target_path,
results_path=results_path,
fast_mode=fast_mode,
log=log,
results=results,
)
self.ab = None
self.backup_path = None
self.tar = None
@@ -39,7 +44,9 @@ class BackupExtraction(MVTModule):
self.backup_path = backup_path
self.files = files
def from_ab(self, file_path: Optional[str], tar: Optional[TarFile], files: List[str]) -> None:
def from_ab(
self, file_path: Optional[str], tar: Optional[TarFile], files: List[str]
) -> None:
"""
Extract the files
"""

View File

@@ -12,19 +12,23 @@ from mvt.common.utils import check_for_links
class SMS(BackupExtraction):
def __init__(
self,
file_path: Optional[str] = None,
target_path: Optional[str] = None,
results_path: Optional[str] = None,
fast_mode: Optional[bool] = False,
fast_mode: bool = False,
log: logging.Logger = logging.getLogger(__name__),
results: Optional[list] = None
results: Optional[list] = None,
) -> None:
super().__init__(file_path=file_path, target_path=target_path,
results_path=results_path, fast_mode=fast_mode,
log=log, results=results)
super().__init__(
file_path=file_path,
target_path=target_path,
results_path=results_path,
fast_mode=fast_mode,
log=log,
results=results,
)
self.results = []
def check_indicators(self) -> None:
@@ -55,5 +59,4 @@ class SMS(BackupExtraction):
data = self._get_file_content(file)
self.results.extend(parse_sms_file(data))
self.log.info("Extracted a total of %d SMS & MMS messages",
len(self.results))
self.log.info("Extracted a total of %d SMS & MMS messages", len(self.results))

View File

@@ -13,5 +13,14 @@ from .getprop import Getprop
from .packages import Packages
from .receivers import Receivers
BUGREPORT_MODULES = [Accessibility, Activities, Appops, BatteryDaily,
BatteryHistory, DBInfo, Getprop, Packages, Receivers]
BUGREPORT_MODULES = [
Accessibility,
Activities,
Appops,
BatteryDaily,
BatteryHistory,
DBInfo,
Getprop,
Packages,
Receivers,
]

View File

@@ -19,13 +19,18 @@ class Accessibility(BugReportModule):
file_path: Optional[str] = None,
target_path: Optional[str] = None,
results_path: Optional[str] = None,
fast_mode: Optional[bool] = False,
fast_mode: bool = False,
log: logging.Logger = logging.getLogger(__name__),
results: Optional[list] = None
results: Optional[list] = None,
) -> None:
super().__init__(file_path=file_path, target_path=target_path,
results_path=results_path, fast_mode=fast_mode,
log=log, results=results)
super().__init__(
file_path=file_path,
target_path=target_path,
results_path=results_path,
fast_mode=fast_mode,
log=log,
results=results,
)
def check_indicators(self) -> None:
if not self.indicators:
@@ -41,8 +46,10 @@ class Accessibility(BugReportModule):
def run(self) -> None:
content = self._get_dumpstate_file()
if not content:
self.log.error("Unable to find dumpstate file. "
"Did you provide a valid bug report archive?")
self.log.error(
"Unable to find dumpstate file. "
"Did you provide a valid bug report archive?"
)
return
lines = []
@@ -55,15 +62,19 @@ class Accessibility(BugReportModule):
if not in_accessibility:
continue
if line.strip().startswith("------------------------------------------------------------------------------"): # pylint: disable=line-too-long
if line.strip().startswith(
"------------------------------------------------------------------------------"
): # pylint: disable=line-too-long
break
lines.append(line)
self.results = parse_dumpsys_accessibility("\n".join(lines))
for result in self.results:
self.log.info("Found installed accessibility service \"%s\"",
result.get("service"))
self.log.info(
'Found installed accessibility service "%s"', result.get("service")
)
self.log.info("Identified a total of %d accessibility services",
len(self.results))
self.log.info(
"Identified a total of %d accessibility services", len(self.results)
)

View File

@@ -19,13 +19,18 @@ class Activities(BugReportModule):
file_path: Optional[str] = None,
target_path: Optional[str] = None,
results_path: Optional[str] = None,
fast_mode: Optional[bool] = False,
fast_mode: bool = False,
log: logging.Logger = logging.getLogger(__name__),
results: Optional[list] = None
results: Optional[list] = None,
) -> None:
super().__init__(file_path=file_path, target_path=target_path,
results_path=results_path, fast_mode=fast_mode,
log=log, results=results)
super().__init__(
file_path=file_path,
target_path=target_path,
results_path=results_path,
fast_mode=fast_mode,
log=log,
results=results,
)
self.results = results if results else {}
@@ -44,8 +49,10 @@ class Activities(BugReportModule):
def run(self) -> None:
content = self._get_dumpstate_file()
if not content:
self.log.error("Unable to find dumpstate file. "
"Did you provide a valid bug report archive?")
self.log.error(
"Unable to find dumpstate file. "
"Did you provide a valid bug report archive?"
)
return
lines = []
@@ -58,7 +65,9 @@ class Activities(BugReportModule):
if not in_package:
continue
if line.strip().startswith("------------------------------------------------------------------------------"): # pylint: disable=line-too-long
if line.strip().startswith(
"------------------------------------------------------------------------------"
): # pylint: disable=line-too-long
break
lines.append(line)

View File

@@ -19,13 +19,18 @@ class Appops(BugReportModule):
file_path: Optional[str] = None,
target_path: Optional[str] = None,
results_path: Optional[str] = None,
fast_mode: Optional[bool] = False,
fast_mode: bool = False,
log: logging.Logger = logging.getLogger(__name__),
results: Optional[list] = None
results: Optional[list] = None,
) -> None:
super().__init__(file_path=file_path, target_path=target_path,
results_path=results_path, fast_mode=fast_mode,
log=log, results=results)
super().__init__(
file_path=file_path,
target_path=target_path,
results_path=results_path,
fast_mode=fast_mode,
log=log,
results=results,
)
def serialize(self, record: dict) -> Union[dict, list]:
records = []
@@ -35,13 +40,15 @@ class Appops(BugReportModule):
for entry in perm["entries"]:
if "timestamp" in entry:
records.append({
"timestamp": entry["timestamp"],
"module": self.__class__.__name__,
"event": entry["access"],
"data": f"{record['package_name']} access to "
f"{perm['name']}: {entry['access']}",
})
records.append(
{
"timestamp": entry["timestamp"],
"module": self.__class__.__name__,
"event": entry["access"],
"data": f"{record['package_name']} access to "
f"{perm['name']}: {entry['access']}",
}
)
return records
@@ -55,16 +62,22 @@ class Appops(BugReportModule):
continue
for perm in result["permissions"]:
if (perm["name"] == "REQUEST_INSTALL_PACKAGES"
and perm["access"] == "allow"):
self.log.info("Package %s with REQUEST_INSTALL_PACKAGES permission",
result["package_name"])
if (
perm["name"] == "REQUEST_INSTALL_PACKAGES"
and perm["access"] == "allow"
):
self.log.info(
"Package %s with REQUEST_INSTALL_PACKAGES permission",
result["package_name"],
)
def run(self) -> None:
content = self._get_dumpstate_file()
if not content:
self.log.error("Unable to find dumpstate file. "
"Did you provide a valid bug report archive?")
self.log.error(
"Unable to find dumpstate file. "
"Did you provide a valid bug report archive?"
)
return
lines = []
@@ -77,12 +90,15 @@ class Appops(BugReportModule):
if not in_appops:
continue
if line.strip().startswith("------------------------------------------------------------------------------"): # pylint: disable=line-too-long
if line.strip().startswith(
"------------------------------------------------------------------------------"
): # pylint: disable=line-too-long
break
lines.append(line)
self.results = parse_dumpsys_appops("\n".join(lines))
self.log.info("Identified a total of %d packages in App-Ops Manager",
len(self.results))
self.log.info(
"Identified a total of %d packages in App-Ops Manager", len(self.results)
)

View File

@@ -20,20 +20,27 @@ class BugReportModule(MVTModule):
file_path: Optional[str] = None,
target_path: Optional[str] = None,
results_path: Optional[str] = None,
fast_mode: Optional[bool] = False,
fast_mode: bool = False,
log: logging.Logger = logging.getLogger(__name__),
results: Optional[list] = None
results: Optional[list] = None,
) -> None:
super().__init__(file_path=file_path, target_path=target_path,
results_path=results_path, fast_mode=fast_mode,
log=log, results=results)
super().__init__(
file_path=file_path,
target_path=target_path,
results_path=results_path,
fast_mode=fast_mode,
log=log,
results=results,
)
self.zip_archive: Optional[ZipFile] = None
self.extract_path: Optional[str] = None
self.extract_files: List[str] = []
self.zip_files: List[str] = []
def from_folder(self, extract_path: Optional[str], extract_files: List[str]) -> None:
def from_folder(
self, extract_path: Optional[str], extract_files: List[str]
) -> None:
self.extract_path = extract_path
self.extract_files = extract_files

View File

@@ -19,13 +19,18 @@ class BatteryDaily(BugReportModule):
file_path: Optional[str] = None,
target_path: Optional[str] = None,
results_path: Optional[str] = None,
fast_mode: Optional[bool] = False,
fast_mode: bool = False,
log: logging.Logger = logging.getLogger(__name__),
results: Optional[list] = None
results: Optional[list] = None,
) -> None:
super().__init__(file_path=file_path, target_path=target_path,
results_path=results_path, fast_mode=fast_mode,
log=log, results=results)
super().__init__(
file_path=file_path,
target_path=target_path,
results_path=results_path,
fast_mode=fast_mode,
log=log,
results=results,
)
def serialize(self, record: dict) -> Union[dict, list]:
return {
@@ -33,7 +38,7 @@ class BatteryDaily(BugReportModule):
"module": self.__class__.__name__,
"event": "battery_daily",
"data": f"Recorded update of package {record['package_name']} "
f"with vers {record['vers']}"
f"with vers {record['vers']}",
}
def check_indicators(self) -> None:
@@ -50,8 +55,10 @@ class BatteryDaily(BugReportModule):
def run(self) -> None:
content = self._get_dumpstate_file()
if not content:
self.log.error("Unable to find dumpstate file. "
"Did you provide a valid bug report archive?")
self.log.error(
"Unable to find dumpstate file. "
"Did you provide a valid bug report archive?"
)
return
lines = []
@@ -80,5 +87,4 @@ class BatteryDaily(BugReportModule):
self.results = parse_dumpsys_battery_daily("\n".join(lines))
self.log.info("Extracted a total of %d battery daily stats",
len(self.results))
self.log.info("Extracted a total of %d battery daily stats", len(self.results))

View File

@@ -19,13 +19,18 @@ class BatteryHistory(BugReportModule):
file_path: Optional[str] = None,
target_path: Optional[str] = None,
results_path: Optional[str] = None,
fast_mode: Optional[bool] = False,
fast_mode: bool = False,
log: logging.Logger = logging.getLogger(__name__),
results: Optional[list] = None
results: Optional[list] = None,
) -> None:
super().__init__(file_path=file_path, target_path=target_path,
results_path=results_path, fast_mode=fast_mode,
log=log, results=results)
super().__init__(
file_path=file_path,
target_path=target_path,
results_path=results_path,
fast_mode=fast_mode,
log=log,
results=results,
)
def check_indicators(self) -> None:
if not self.indicators:
@@ -41,8 +46,10 @@ class BatteryHistory(BugReportModule):
def run(self) -> None:
content = self._get_dumpstate_file()
if not content:
self.log.error("Unable to find dumpstate file. "
"Did you provide a valid bug report archive?")
self.log.error(
"Unable to find dumpstate file. "
"Did you provide a valid bug report archive?"
)
return
lines = []
@@ -63,5 +70,6 @@ class BatteryHistory(BugReportModule):
self.results = parse_dumpsys_battery_history("\n".join(lines))
self.log.info("Extracted a total of %d battery history records",
len(self.results))
self.log.info(
"Extracted a total of %d battery history records", len(self.results)
)

View File

@@ -21,13 +21,18 @@ class DBInfo(BugReportModule):
file_path: Optional[str] = None,
target_path: Optional[str] = None,
results_path: Optional[str] = None,
fast_mode: Optional[bool] = False,
fast_mode: bool = False,
log: logging.Logger = logging.getLogger(__name__),
results: Optional[list] = None
results: Optional[list] = None,
) -> None:
super().__init__(file_path=file_path, target_path=target_path,
results_path=results_path, fast_mode=fast_mode,
log=log, results=results)
super().__init__(
file_path=file_path,
target_path=target_path,
results_path=results_path,
fast_mode=fast_mode,
log=log,
results=results,
)
def check_indicators(self) -> None:
if not self.indicators:
@@ -45,8 +50,10 @@ class DBInfo(BugReportModule):
def run(self) -> None:
content = self._get_dumpstate_file()
if not content:
self.log.error("Unable to find dumpstate file. "
"Did you provide a valid bug report archive?")
self.log.error(
"Unable to find dumpstate file. "
"Did you provide a valid bug report archive?"
)
return
in_dbinfo = False
@@ -59,12 +66,16 @@ class DBInfo(BugReportModule):
if not in_dbinfo:
continue
if line.strip().startswith("------------------------------------------------------------------------------"): # pylint: disable=line-too-long
if line.strip().startswith(
"------------------------------------------------------------------------------"
): # pylint: disable=line-too-long
break
lines.append(line)
self.results = parse_dumpsys_dbinfo("\n".join(lines))
self.log.info("Extracted a total of %d database connection pool records",
len(self.results))
self.log.info(
"Extracted a total of %d database connection pool records",
len(self.results),
)

View File

@@ -20,21 +20,28 @@ class Getprop(BugReportModule):
file_path: Optional[str] = None,
target_path: Optional[str] = None,
results_path: Optional[str] = None,
fast_mode: Optional[bool] = False,
fast_mode: bool = False,
log: logging.Logger = logging.getLogger(__name__),
results: Optional[list] = None
results: Optional[list] = None,
) -> None:
super().__init__(file_path=file_path, target_path=target_path,
results_path=results_path, fast_mode=fast_mode,
log=log, results=results)
super().__init__(
file_path=file_path,
target_path=target_path,
results_path=results_path,
fast_mode=fast_mode,
log=log,
results=results,
)
self.results = {} if not results else results
def run(self) -> None:
content = self._get_dumpstate_file()
if not content:
self.log.error("Unable to find dumpstate file. "
"Did you provide a valid bug report archive?")
self.log.error(
"Unable to find dumpstate file. "
"Did you provide a valid bug report archive?"
)
return
lines = []
@@ -60,10 +67,11 @@ class Getprop(BugReportModule):
if entry["name"] == "ro.build.version.security_patch":
security_patch = entry["value"]
patch_date = datetime.strptime(security_patch, "%Y-%m-%d")
if (datetime.now() - patch_date) > timedelta(days=6*30):
self.log.warning("This phone has not received security updates "
"for more than six months (last update: %s)",
security_patch)
if (datetime.now() - patch_date) > timedelta(days=6 * 30):
self.log.warning(
"This phone has not received security updates "
"for more than six months (last update: %s)",
security_patch,
)
self.log.info("Extracted %d Android system properties",
len(self.results))
self.log.info("Extracted %d Android system properties", len(self.results))

View File

@@ -6,9 +6,11 @@
import logging
from typing import Optional, Union
from mvt.android.modules.adb.packages import (DANGEROUS_PERMISSIONS,
DANGEROUS_PERMISSIONS_THRESHOLD,
ROOT_PACKAGES)
from mvt.android.modules.adb.packages import (
DANGEROUS_PERMISSIONS,
DANGEROUS_PERMISSIONS_THRESHOLD,
ROOT_PACKAGES,
)
from mvt.android.parsers.dumpsys import parse_dumpsys_packages
from .base import BugReportModule
@@ -22,48 +24,51 @@ class Packages(BugReportModule):
file_path: Optional[str] = None,
target_path: Optional[str] = None,
results_path: Optional[str] = None,
fast_mode: Optional[bool] = False,
fast_mode: bool = False,
log: logging.Logger = logging.getLogger(__name__),
results: Optional[list] = None
results: Optional[list] = None,
) -> None:
super().__init__(file_path=file_path, target_path=target_path,
results_path=results_path, fast_mode=fast_mode,
log=log, results=results)
super().__init__(
file_path=file_path,
target_path=target_path,
results_path=results_path,
fast_mode=fast_mode,
log=log,
results=results,
)
def serialize(self, record: dict) -> Union[dict, list]:
records = []
timestamps = [
{
"event": "package_install",
"timestamp": record["timestamp"]
},
{"event": "package_install", "timestamp": record["timestamp"]},
{
"event": "package_first_install",
"timestamp": record["first_install_time"]
},
{
"event": "package_last_update",
"timestamp": record["last_update_time"]
"timestamp": record["first_install_time"],
},
{"event": "package_last_update", "timestamp": record["last_update_time"]},
]
for timestamp in timestamps:
records.append({
"timestamp": timestamp["timestamp"],
"module": self.__class__.__name__,
"event": timestamp["event"],
"data": f"Install or update of package {record['package_name']}",
})
records.append(
{
"timestamp": timestamp["timestamp"],
"module": self.__class__.__name__,
"event": timestamp["event"],
"data": f"Install or update of package {record['package_name']}",
}
)
return records
def check_indicators(self) -> None:
for result in self.results:
if result["package_name"] in ROOT_PACKAGES:
self.log.warning("Found an installed package related to "
"rooting/jailbreaking: \"%s\"",
result["package_name"])
self.log.warning(
"Found an installed package related to "
'rooting/jailbreaking: "%s"',
result["package_name"],
)
self.detected.append(result)
continue
@@ -79,8 +84,10 @@ class Packages(BugReportModule):
def run(self) -> None:
content = self._get_dumpstate_file()
if not content:
self.log.error("Unable to find dumpstate file. "
"Did you provide a valid bug report archive?")
self.log.error(
"Unable to find dumpstate file. "
"Did you provide a valid bug report archive?"
)
return
in_package = False
@@ -115,8 +122,10 @@ class Packages(BugReportModule):
dangerous_permissions_count += 1
if dangerous_permissions_count >= DANGEROUS_PERMISSIONS_THRESHOLD:
self.log.info("Found package \"%s\" requested %d potentially dangerous permissions",
result["package_name"],
dangerous_permissions_count)
self.log.info(
'Found package "%s" requested %d potentially dangerous permissions',
result["package_name"],
dangerous_permissions_count,
)
self.log.info("Extracted details on %d packages", len(self.results))

View File

@@ -25,13 +25,18 @@ class Receivers(BugReportModule):
file_path: Optional[str] = None,
target_path: Optional[str] = None,
results_path: Optional[str] = None,
fast_mode: Optional[bool] = False,
fast_mode: bool = False,
log: logging.Logger = logging.getLogger(__name__),
results: Optional[list] = None
results: Optional[list] = None,
) -> None:
super().__init__(file_path=file_path, target_path=target_path,
results_path=results_path, fast_mode=fast_mode,
log=log, results=results)
super().__init__(
file_path=file_path,
target_path=target_path,
results_path=results_path,
fast_mode=fast_mode,
log=log,
results=results,
)
self.results = results if results else {}
@@ -42,21 +47,31 @@ class Receivers(BugReportModule):
for intent, receivers in self.results.items():
for receiver in receivers:
if intent == INTENT_NEW_OUTGOING_SMS:
self.log.info("Found a receiver to intercept outgoing SMS messages: \"%s\"",
receiver["receiver"])
self.log.info(
'Found a receiver to intercept outgoing SMS messages: "%s"',
receiver["receiver"],
)
elif intent == INTENT_SMS_RECEIVED:
self.log.info("Found a receiver to intercept incoming SMS messages: \"%s\"",
receiver["receiver"])
self.log.info(
'Found a receiver to intercept incoming SMS messages: "%s"',
receiver["receiver"],
)
elif intent == INTENT_DATA_SMS_RECEIVED:
self.log.info("Found a receiver to intercept incoming data SMS message: \"%s\"",
receiver["receiver"])
self.log.info(
'Found a receiver to intercept incoming data SMS message: "%s"',
receiver["receiver"],
)
elif intent == INTENT_PHONE_STATE:
self.log.info("Found a receiver monitoring "
"telephony state/incoming calls: \"%s\"",
receiver["receiver"])
self.log.info(
"Found a receiver monitoring "
'telephony state/incoming calls: "%s"',
receiver["receiver"],
)
elif intent == INTENT_NEW_OUTGOING_CALL:
self.log.info("Found a receiver monitoring outgoing calls: \"%s\"",
receiver["receiver"])
self.log.info(
'Found a receiver monitoring outgoing calls: "%s"',
receiver["receiver"],
)
ioc = self.indicators.check_app_id(receiver["package_name"])
if ioc:
@@ -67,8 +82,10 @@ class Receivers(BugReportModule):
def run(self) -> None:
content = self._get_dumpstate_file()
if not content:
self.log.error("Unable to find dumpstate file. "
"Did you provide a valid bug report archive?")
self.log.error(
"Unable to find dumpstate file. "
"Did you provide a valid bug report archive?"
)
return
in_receivers = False
@@ -81,7 +98,9 @@ class Receivers(BugReportModule):
if not in_receivers:
continue
if line.strip().startswith("------------------------------------------------------------------------------"): # pylint: disable=line-too-long
if line.strip().startswith(
"------------------------------------------------------------------------------"
): # pylint: disable=line-too-long
break
lines.append(line)

View File

@@ -3,9 +3,13 @@
# Use of this software is governed by the MVT License 1.1 that can be found at
# https://license.mvt.re/1.1/
from .dumpsys import (parse_dumpsys_accessibility,
parse_dumpsys_activity_resolver_table,
parse_dumpsys_appops, parse_dumpsys_battery_daily,
parse_dumpsys_battery_history, parse_dumpsys_dbinfo,
parse_dumpsys_receiver_resolver_table)
from .dumpsys import (
parse_dumpsys_accessibility,
parse_dumpsys_activity_resolver_table,
parse_dumpsys_appops,
parse_dumpsys_battery_daily,
parse_dumpsys_battery_history,
parse_dumpsys_dbinfo,
parse_dumpsys_receiver_resolver_table,
)
from .getprop import parse_getprop

View File

@@ -31,15 +31,16 @@ class InvalidBackupPassword(AndroidBackupParsingError):
# TODO: Need to clean all the following code and conform it to the coding style.
def to_utf8_bytes(input_bytes):
output = []
for byte in input_bytes:
if byte < ord(b'\x80'):
if byte < ord(b"\x80"):
output.append(byte)
else:
output.append(ord('\xef') | (byte >> 12))
output.append(ord('\xbc') | ((byte >> 6) & ord('\x3f')))
output.append(ord('\x80') | (byte & ord('\x3f')))
output.append(ord("\xef") | (byte >> 12))
output.append(ord("\xbc") | ((byte >> 6) & ord("\x3f")))
output.append(ord("\x80") | (byte & ord("\x3f")))
return bytes(output)
@@ -55,33 +56,38 @@ def parse_ab_header(data):
"backup": True,
"compression": (is_compressed == b"1"),
"version": int(version),
"encryption": encryption.decode("utf-8")
"encryption": encryption.decode("utf-8"),
}
return {
"backup": False,
"compression": None,
"version": None,
"encryption": None
}
return {"backup": False, "compression": None, "version": None, "encryption": None}
def decrypt_master_key(password, user_salt, user_iv, pbkdf2_rounds,
master_key_blob, format_version, checksum_salt):
def decrypt_master_key(
password,
user_salt,
user_iv,
pbkdf2_rounds,
master_key_blob,
format_version,
checksum_salt,
):
"""Generate AES key from user password uisng PBKDF2
The backup master key is extracted from the master key blog after decryption.
"""
# Derive key from password using PBKDF2.
kdf = PBKDF2HMAC(algorithm=hashes.SHA1(), length=32, salt=user_salt,
iterations=pbkdf2_rounds)
kdf = PBKDF2HMAC(
algorithm=hashes.SHA1(), length=32, salt=user_salt, iterations=pbkdf2_rounds
)
key = kdf.derive(password.encode("utf-8"))
# Decrypt master key blob.
cipher = Cipher(algorithms.AES(key), modes.CBC(user_iv))
decryptor = cipher.decryptor()
try:
decryted_master_key_blob = decryptor.update(master_key_blob) + decryptor.finalize()
decryted_master_key_blob = (
decryptor.update(master_key_blob) + decryptor.finalize()
)
# Extract key and IV from decrypted blob.
key_blob = io.BytesIO(decryted_master_key_blob)
@@ -103,8 +109,9 @@ def decrypt_master_key(password, user_salt, user_iv, pbkdf2_rounds,
hmac_mk = master_key
# Derive checksum to confirm successful backup decryption.
kdf = PBKDF2HMAC(algorithm=hashes.SHA1(), length=32, salt=checksum_salt,
iterations=pbkdf2_rounds)
kdf = PBKDF2HMAC(
algorithm=hashes.SHA1(), length=32, salt=checksum_salt, iterations=pbkdf2_rounds
)
calculated_checksum = kdf.derive(hmac_mk)
if master_key_checksum != calculated_checksum:
@@ -113,8 +120,7 @@ def decrypt_master_key(password, user_salt, user_iv, pbkdf2_rounds,
return master_key, master_iv
def decrypt_backup_data(encrypted_backup, password, encryption_algo,
format_version):
def decrypt_backup_data(encrypted_backup, password, encryption_algo, format_version):
"""
Generate encryption keyffrom password and do decryption
@@ -125,8 +131,14 @@ def decrypt_backup_data(encrypted_backup, password, encryption_algo,
if password is None:
raise InvalidBackupPassword()
[user_salt, checksum_salt, pbkdf2_rounds, user_iv,
master_key_blob, encrypted_data] = encrypted_backup.split(b"\n", 5)
[
user_salt,
checksum_salt,
pbkdf2_rounds,
user_iv,
master_key_blob,
encrypted_data,
] = encrypted_backup.split(b"\n", 5)
user_salt = bytes.fromhex(user_salt.decode("utf-8"))
checksum_salt = bytes.fromhex(checksum_salt.decode("utf-8"))
@@ -135,13 +147,15 @@ def decrypt_backup_data(encrypted_backup, password, encryption_algo,
master_key_blob = bytes.fromhex(master_key_blob.decode("utf-8"))
# Derive decryption master key from password.
master_key, master_iv = decrypt_master_key(password=password,
user_salt=user_salt,
user_iv=user_iv,
pbkdf2_rounds=pbkdf2_rounds,
master_key_blob=master_key_blob,
format_version=format_version,
checksum_salt=checksum_salt)
master_key, master_iv = decrypt_master_key(
password=password,
user_salt=user_salt,
user_iv=user_iv,
pbkdf2_rounds=pbkdf2_rounds,
master_key_blob=master_key_blob,
format_version=format_version,
checksum_salt=checksum_salt,
)
# Decrypt and unpad backup data using derivied key.
cipher = Cipher(algorithms.AES(master_key), modes.CBC(master_iv))
@@ -160,21 +174,23 @@ def parse_backup_file(data, password=None):
if not data.startswith(b"ANDROID BACKUP"):
raise AndroidBackupParsingError("Invalid file header")
[_, version, is_compressed,
encryption_algo, tar_data] = data.split(b"\n", 4)
[_, version, is_compressed, encryption_algo, tar_data] = data.split(b"\n", 4)
version = int(version)
is_compressed = int(is_compressed)
if encryption_algo != b"none":
tar_data = decrypt_backup_data(tar_data, password, encryption_algo,
format_version=version)
tar_data = decrypt_backup_data(
tar_data, password, encryption_algo, format_version=version
)
if is_compressed:
try:
tar_data = zlib.decompress(tar_data)
except zlib.error as exc:
raise AndroidBackupParsingError("Impossible to decompress the backup file") from exc
raise AndroidBackupParsingError(
"Impossible to decompress the backup file"
) from exc
return tar_data
@@ -189,9 +205,10 @@ def parse_tar_for_sms(data):
res = []
with tarfile.open(fileobj=dbytes) as tar:
for member in tar.getmembers():
if (member.name.startswith("apps/com.android.providers.telephony/d_f/")
and (member.name.endswith("_sms_backup")
or member.name.endswith("_mms_backup"))):
if member.name.startswith("apps/com.android.providers.telephony/d_f/") and (
member.name.endswith("_sms_backup")
or member.name.endswith("_mms_backup")
):
dhandler = tar.extractfile(member)
res.extend(parse_sms_file(dhandler.read()))
@@ -216,7 +233,7 @@ def parse_sms_file(data):
message_links = check_for_links(entry["body"])
entry["isodate"] = convert_unix_to_iso(int(entry["date"]) / 1000)
entry["direction"] = ("sent" if int(entry["date_sent"]) else "received")
entry["direction"] = "sent" if int(entry["date_sent"]) else "received"
# Extract links from the body
if message_links or entry["body"].strip() == "":

View File

@@ -27,10 +27,12 @@ def parse_dumpsys_accessibility(output: str) -> List[Dict[str, str]]:
service = line.split(":")[1].strip()
results.append({
"package_name": service.split("/")[0],
"service": service,
})
results.append(
{
"package_name": service.split("/")[0],
"service": service,
}
)
return results
@@ -62,8 +64,7 @@ def parse_dumpsys_activity_resolver_table(output: str) -> Dict[str, Any]:
break
# We detect the action name.
if (line.startswith(" " * 6) and not line.startswith(" " * 8)
and ":" in line):
if line.startswith(" " * 6) and not line.startswith(" " * 8) and ":" in line:
intent = line.strip().replace(":", "")
results[intent] = []
continue
@@ -84,10 +85,12 @@ def parse_dumpsys_activity_resolver_table(output: str) -> Dict[str, Any]:
activity = line.strip().split(" ")[1]
package_name = activity.split("/")[0]
results[intent].append({
"package_name": package_name,
"activity": activity,
})
results[intent].append(
{
"package_name": package_name,
"activity": activity,
}
)
return results
@@ -119,19 +122,20 @@ def parse_dumpsys_battery_daily(output: str) -> list:
already_seen = False
for update in daily_updates:
if (package_name == update["package_name"]
and vers_nr == update["vers"]):
if package_name == update["package_name"] and vers_nr == update["vers"]:
already_seen = True
break
if not already_seen:
daily_updates.append({
"action": "update",
"from": daily["from"],
"to": daily["to"],
"package_name": package_name,
"vers": vers_nr,
})
daily_updates.append(
{
"action": "update",
"from": daily["from"],
"to": daily["to"],
"package_name": package_name,
"vers": vers_nr,
}
)
if len(daily_updates) > 0:
results.extend(daily_updates)
@@ -154,18 +158,20 @@ def parse_dumpsys_battery_history(output: str) -> List[Dict[str, Any]]:
event = ""
if line.find("+job") > 0:
event = "start_job"
uid = line[line.find("+job")+5:line.find(":")]
service = line[line.find(":")+1:].strip('"')
uid = line[line.find("+job") + 5 : line.find(":")]
service = line[line.find(":") + 1 :].strip('"')
package_name = service.split("/")[0]
elif line.find("-job") > 0:
event = "end_job"
uid = line[line.find("-job")+5:line.find(":")]
service = line[line.find(":")+1:].strip('"')
uid = line[line.find("-job") + 5 : line.find(":")]
service = line[line.find(":") + 1 :].strip('"')
package_name = service.split("/")[0]
elif line.find("+running +wake_lock=") > 0:
uid = line[line.find("+running +wake_lock=")+21:line.find(":")]
uid = line[line.find("+running +wake_lock=") + 21 : line.find(":")]
event = "wake"
service = line[line.find("*walarm*:")+9:].split(" ")[0].strip('"').strip()
service = (
line[line.find("*walarm*:") + 9 :].split(" ")[0].strip('"').strip()
)
if service == "" or "/" not in service:
continue
@@ -177,20 +183,22 @@ def parse_dumpsys_battery_history(output: str) -> List[Dict[str, Any]]:
else:
event = "end_top"
top_pos = line.find("-top=")
colon_pos = top_pos+line[top_pos:].find(":")
uid = line[top_pos+5:colon_pos]
colon_pos = top_pos + line[top_pos:].find(":")
uid = line[top_pos + 5 : colon_pos]
service = ""
package_name = line[colon_pos+1:].strip('"')
package_name = line[colon_pos + 1 :].strip('"')
else:
continue
results.append({
"time_elapsed": time_elapsed,
"event": event,
"uid": uid,
"package_name": package_name,
"service": service,
})
results.append(
{
"time_elapsed": time_elapsed,
"event": event,
"uid": uid,
"package_name": package_name,
"service": service,
}
)
return results
@@ -198,8 +206,12 @@ def parse_dumpsys_battery_history(output: str) -> List[Dict[str, Any]]:
def parse_dumpsys_dbinfo(output: str) -> List[Dict[str, Any]]:
results = []
rxp = re.compile(r'.*\[([0-9]{4}-[0-9]{2}-[0-9]{2} [0-9]{2}:[0-9]{2}:[0-9]{2}\.[0-9]{3})\].*\[Pid:\((\d+)\)\](\w+).*sql\=\"(.+?)\"') # pylint: disable=line-too-long
rxp_no_pid = re.compile(r'.*\[([0-9]{4}-[0-9]{2}-[0-9]{2} [0-9]{2}:[0-9]{2}:[0-9]{2}\.[0-9]{3})\][ ]{1}(\w+).*sql\=\"(.+?)\"') # pylint: disable=line-too-long
rxp = re.compile(
r".*\[([0-9]{4}-[0-9]{2}-[0-9]{2} [0-9]{2}:[0-9]{2}:[0-9]{2}\.[0-9]{3})\].*\[Pid:\((\d+)\)\](\w+).*sql\=\"(.+?)\""
) # pylint: disable=line-too-long
rxp_no_pid = re.compile(
r".*\[([0-9]{4}-[0-9]{2}-[0-9]{2} [0-9]{2}:[0-9]{2}:[0-9]{2}\.[0-9]{3})\][ ]{1}(\w+).*sql\=\"(.+?)\""
) # pylint: disable=line-too-long
pool = None
in_operations = False
@@ -229,21 +241,25 @@ def parse_dumpsys_dbinfo(output: str) -> List[Dict[str, Any]]:
continue
match = matches[0]
results.append({
"isodate": match[0],
"action": match[1],
"sql": match[2],
"path": pool,
})
results.append(
{
"isodate": match[0],
"action": match[1],
"sql": match[2],
"path": pool,
}
)
else:
match = matches[0]
results.append({
"isodate": match[0],
"pid": match[1],
"action": match[2],
"sql": match[3],
"path": pool,
})
results.append(
{
"isodate": match[0],
"pid": match[1],
"action": match[2],
"sql": match[3],
"path": pool,
}
)
return results
@@ -275,8 +291,7 @@ def parse_dumpsys_receiver_resolver_table(output: str) -> Dict[str, Any]:
break
# We detect the action name.
if (line.startswith(" " * 6) and not line.startswith(" " * 8)
and ":" in line):
if line.startswith(" " * 6) and not line.startswith(" " * 8) and ":" in line:
intent = line.strip().replace(":", "")
results[intent] = []
continue
@@ -297,10 +312,12 @@ def parse_dumpsys_receiver_resolver_table(output: str) -> Dict[str, Any]:
receiver = line.strip().split(" ")[1]
package_name = receiver.split("/")[0]
results[intent].append({
"package_name": package_name,
"receiver": receiver,
})
results[intent].append(
{
"package_name": package_name,
"receiver": receiver,
}
)
return results
@@ -366,13 +383,15 @@ def parse_dumpsys_appops(output: str) -> List[Dict[str, Any]]:
entry = {}
entry["access"] = line.split(":")[0].strip()
entry["type"] = line[line.find("[")+1:line.find("]")]
entry["type"] = line[line.find("[") + 1 : line.find("]")]
try:
entry["timestamp"] = convert_datetime_to_iso(
datetime.strptime(
line[line.find("]")+1:line.find("(")].strip(),
"%Y-%m-%d %H:%M:%S.%f"))
line[line.find("]") + 1 : line.find("(")].strip(),
"%Y-%m-%d %H:%M:%S.%f",
)
)
except ValueError:
# Invalid date format
pass
@@ -418,13 +437,11 @@ def parse_dumpsys_package_for_details(output: str) -> Dict[str, Any]:
permission = lineinfo[0]
granted = None
if "granted=" in lineinfo[1]:
granted = ("granted=true" in lineinfo[1])
granted = "granted=true" in lineinfo[1]
details["permissions"].append({
"name": permission,
"granted": granted,
"type": "install"
})
details["permissions"].append(
{"name": permission, "granted": granted, "type": "install"}
)
if in_runtime_permissions:
if not line.startswith(" " * 8):
@@ -434,23 +451,18 @@ def parse_dumpsys_package_for_details(output: str) -> Dict[str, Any]:
permission = lineinfo[0]
granted = None
if "granted=" in lineinfo[1]:
granted = ("granted=true" in lineinfo[1])
granted = "granted=true" in lineinfo[1]
details["permissions"].append({
"name": permission,
"granted": granted,
"type": "runtime"
})
details["permissions"].append(
{"name": permission, "granted": granted, "type": "runtime"}
)
if in_declared_permissions:
if not line.startswith(" " * 6):
in_declared_permissions = False
else:
permission = line.strip().split(":")[0]
details["permissions"].append({
"name": permission,
"type": "declared"
})
details["permissions"].append({"name": permission, "type": "declared"})
if in_requested_permissions:
if not line.startswith(" " * 6):
in_requested_permissions = False

View File

@@ -20,10 +20,7 @@ def parse_getprop(output: str) -> List[Dict[str, str]]:
if not matches or len(matches[0]) != 2:
continue
entry = {
"name": matches[0][0],
"value": matches[0][1]
}
entry = {"name": matches[0][0], "value": matches[0][1]}
results.append(entry)
return results

View File

@@ -8,12 +8,12 @@ import os
from typing import Optional
from mvt.common.command import Command
from mvt.common.utils import exec_or_profile
log = logging.getLogger(__name__)
class CmdCheckIOCS(Command):
def __init__(
self,
target_path: Optional[str] = None,
@@ -21,11 +21,17 @@ class CmdCheckIOCS(Command):
ioc_files: Optional[list] = None,
module_name: Optional[str] = None,
serial: Optional[str] = None,
fast_mode: Optional[bool] = False,
fast_mode: bool = False,
) -> None:
super().__init__(target_path=target_path, results_path=results_path,
ioc_files=ioc_files, module_name=module_name,
serial=serial, fast_mode=fast_mode, log=log)
super().__init__(
target_path=target_path,
results_path=results_path,
ioc_files=ioc_files,
module_name=module_name,
serial=serial,
fast_mode=fast_mode,
log=log,
)
self.name = "check-iocs"
@@ -50,22 +56,27 @@ class CmdCheckIOCS(Command):
if iocs_module().get_slug() != name_only:
continue
log.info("Loading results from \"%s\" with module %s",
file_name, iocs_module.__name__)
log.info(
'Loading results from "%s" with module %s',
file_name,
iocs_module.__name__,
)
m = iocs_module.from_json(file_path,
log=logging.getLogger(iocs_module.__module__))
m = iocs_module.from_json(
file_path, log=logging.getLogger(iocs_module.__module__)
)
if self.iocs.total_ioc_count > 0:
m.indicators = self.iocs
m.indicators.log = m.log
try:
m.check_indicators()
exec_or_profile("m.check_indicators()", globals(), locals())
except NotImplementedError:
continue
else:
total_detections += len(m.detected)
if total_detections > 0:
log.warning("The check of the results produced %d detections!",
total_detections)
log.warning(
"The check of the results produced %d detections!", total_detections
)

View File

@@ -12,14 +12,15 @@ from typing import Optional
from mvt.common.indicators import Indicators
from mvt.common.module import MVTModule, run_module, save_timeline
from mvt.common.utils import (convert_datetime_to_iso,
generate_hashes_from_path,
get_sha256_from_file_path)
from mvt.common.utils import (
convert_datetime_to_iso,
generate_hashes_from_path,
get_sha256_from_file_path,
)
from mvt.common.version import MVT_VERSION
class Command:
def __init__(
self,
target_path: Optional[str] = None,
@@ -27,8 +28,8 @@ class Command:
ioc_files: Optional[list] = None,
module_name: Optional[str] = None,
serial: Optional[str] = None,
fast_mode: Optional[bool] = False,
hashes: Optional[bool] = False,
fast_mode: bool = False,
hashes: bool = False,
log: logging.Logger = logging.getLogger(__name__),
) -> None:
self.name = ""
@@ -62,8 +63,9 @@ class Command:
try:
os.makedirs(self.results_path)
except Exception as exc:
self.log.critical("Unable to create output folder %s: %s",
self.results_path, exc)
self.log.critical(
"Unable to create output folder %s: %s", self.results_path, exc
)
sys.exit(1)
def _setup_logging(self):
@@ -71,10 +73,12 @@ class Command:
return
logger = logging.getLogger("mvt")
file_handler = logging.FileHandler(os.path.join(self.results_path,
"command.log"))
formatter = logging.Formatter("%(asctime)s - %(name)s - "
"%(levelname)s - %(message)s")
file_handler = logging.FileHandler(
os.path.join(self.results_path, "command.log")
)
formatter = logging.Formatter(
"%(asctime)s - %(name)s - " "%(levelname)s - %(message)s"
)
file_handler.setLevel(logging.DEBUG)
file_handler.setFormatter(formatter)
logger.addHandler(file_handler)
@@ -84,13 +88,15 @@ class Command:
return
if len(self.timeline) > 0:
save_timeline(self.timeline,
os.path.join(self.results_path, "timeline.csv"))
save_timeline(
self.timeline, os.path.join(self.results_path, "timeline.csv")
)
if len(self.timeline_detected) > 0:
save_timeline(self.timeline_detected,
os.path.join(self.results_path,
"timeline_detected.csv"))
save_timeline(
self.timeline_detected,
os.path.join(self.results_path, "timeline_detected.csv"),
)
def _store_info(self) -> None:
if not self.results_path:
@@ -124,7 +130,7 @@ class Command:
if self.target_path and (os.environ.get("MVT_HASH_FILES") or self.hashes):
info_hash = get_sha256_from_file_path(info_path)
self.log.info("Reference hash of the info.json file: \"%s\"", info_hash)
self.log.info('Reference hash of the info.json file: "%s"', info_hash)
def generate_hashes(self) -> None:
"""
@@ -137,8 +143,7 @@ class Command:
self.hash_values.append(file)
def list_modules(self) -> None:
self.log.info("Following is the list of available %s modules:",
self.name)
self.log.info("Following is the list of available %s modules:", self.name)
for module in self.modules:
self.log.info(" - %s", module.__name__)
@@ -152,7 +157,6 @@ class Command:
raise NotImplementedError
def run(self) -> None:
try:
self.init()
except NotImplementedError:
@@ -162,13 +166,15 @@ class Command:
if self.module_name and module.__name__ != self.module_name:
continue
# FIXME: do we need the logger here
# FIXME: do we need the logger here
module_logger = logging.getLogger(module.__module__)
m = module(target_path=self.target_path,
results_path=self.results_path,
fast_mode=self.fast_mode,
log=module_logger)
m = module(
target_path=self.target_path,
results_path=self.results_path,
fast_mode=self.fast_mode,
log=module_logger,
)
if self.iocs.total_ioc_count:
m.indicators = self.iocs

View File

@@ -7,7 +7,9 @@ import json
import logging
import os
from typing import Any, Dict, Iterator, List, Optional, Union
from functools import lru_cache
import ahocorasick
from appdirs import user_data_dir
from .url import URL
@@ -34,8 +36,7 @@ class Indicators:
for ioc_file_name in os.listdir(MVT_INDICATORS_FOLDER):
if ioc_file_name.lower().endswith(".stix2"):
self.parse_stix2(os.path.join(MVT_INDICATORS_FOLDER,
ioc_file_name))
self.parse_stix2(os.path.join(MVT_INDICATORS_FOLDER, ioc_file_name))
def _check_stix2_env_variable(self) -> None:
"""
@@ -49,8 +50,9 @@ class Indicators:
if os.path.isfile(path):
self.parse_stix2(path)
else:
self.log.error("Path specified with env MVT_STIX2 is not a valid file: %s",
path)
self.log.error(
"Path specified with env MVT_STIX2 is not a valid file: %s", path
)
def _new_collection(
self,
@@ -58,7 +60,7 @@ class Indicators:
name: Optional[str] = None,
description: Optional[str] = None,
file_name: Optional[str] = None,
file_path: Optional[str] = None
file_path: Optional[str] = None,
) -> dict:
return {
"id": cid,
@@ -78,8 +80,7 @@ class Indicators:
"count": 0,
}
def _add_indicator(self, ioc: str, ioc_coll: dict,
ioc_coll_list: list) -> None:
def _add_indicator(self, ioc: str, ioc_coll: dict, ioc_coll_list: list) -> None:
ioc = ioc.strip("'")
if ioc not in ioc_coll_list:
ioc_coll_list.append(ioc)
@@ -91,43 +92,51 @@ class Indicators:
if key == "domain-name:value":
# We force domain names to lower case.
self._add_indicator(ioc=value.lower(),
ioc_coll=collection,
ioc_coll_list=collection["domains"])
self._add_indicator(
ioc=value.lower(),
ioc_coll=collection,
ioc_coll_list=collection["domains"],
)
elif key == "process:name":
self._add_indicator(ioc=value,
ioc_coll=collection,
ioc_coll_list=collection["processes"])
self._add_indicator(
ioc=value, ioc_coll=collection, ioc_coll_list=collection["processes"]
)
elif key == "email-addr:value":
# We force email addresses to lower case.
self._add_indicator(ioc=value.lower(),
ioc_coll=collection,
ioc_coll_list=collection["emails"])
self._add_indicator(
ioc=value.lower(),
ioc_coll=collection,
ioc_coll_list=collection["emails"],
)
elif key == "file:name":
self._add_indicator(ioc=value,
ioc_coll=collection,
ioc_coll_list=collection["file_names"])
self._add_indicator(
ioc=value, ioc_coll=collection, ioc_coll_list=collection["file_names"]
)
elif key == "file:path":
self._add_indicator(ioc=value,
ioc_coll=collection,
ioc_coll_list=collection["file_paths"])
self._add_indicator(
ioc=value, ioc_coll=collection, ioc_coll_list=collection["file_paths"]
)
elif key == "file:hashes.sha256":
self._add_indicator(ioc=value,
ioc_coll=collection,
ioc_coll_list=collection["files_sha256"])
self._add_indicator(
ioc=value, ioc_coll=collection, ioc_coll_list=collection["files_sha256"]
)
elif key == "app:id":
self._add_indicator(ioc=value,
ioc_coll=collection,
ioc_coll_list=collection["app_ids"])
self._add_indicator(
ioc=value, ioc_coll=collection, ioc_coll_list=collection["app_ids"]
)
elif key == "configuration-profile:id":
self._add_indicator(ioc=value,
ioc_coll=collection,
ioc_coll_list=collection["ios_profile_ids"])
self._add_indicator(
ioc=value,
ioc_coll=collection,
ioc_coll_list=collection["ios_profile_ids"],
)
elif key == "android-property:name":
self._add_indicator(ioc=value,
ioc_coll=collection,
ioc_coll_list=collection["android_property_names"])
self._add_indicator(
ioc=value,
ioc_coll=collection,
ioc_coll_list=collection["android_property_names"],
)
def parse_stix2(self, file_path: str) -> None:
"""Extract indicators from a STIX2 file.
@@ -142,8 +151,10 @@ class Indicators:
try:
data = json.load(handle)
except json.decoder.JSONDecodeError:
self.log.critical("Unable to parse STIX2 indicator file. "
"The file is corrupted or in the wrong format!")
self.log.critical(
"Unable to parse STIX2 indicator file. "
"The file is corrupted or in the wrong format!"
)
return
malware = {}
@@ -163,10 +174,13 @@ class Indicators:
collections = []
for mal_id, mal_values in malware.items():
collection = self._new_collection(mal_id, mal_values.get("name"),
mal_values.get("description"),
os.path.basename(file_path),
file_path)
collection = self._new_collection(
mal_id,
mal_values.get("name"),
mal_values.get("description"),
os.path.basename(file_path),
file_path,
)
collections.append(collection)
# We loop through all indicators.
@@ -192,13 +206,17 @@ class Indicators:
break
for coll in collections:
self.log.info("Extracted %d indicators for collection with name \"%s\"",
coll["count"], coll["name"])
self.log.info(
'Extracted %d indicators for collection with name "%s"',
coll["count"],
coll["name"],
)
self.ioc_collections.extend(collections)
def load_indicators_files(self, files: list,
load_default: Optional[bool] = True) -> None:
def load_indicators_files(
self, files: list, load_default: Optional[bool] = True
) -> None:
"""
Load a list of indicators files.
"""
@@ -206,16 +224,14 @@ class Indicators:
if os.path.isfile(file_path):
self.parse_stix2(file_path)
else:
self.log.warning("No indicators file exists at path %s",
file_path)
self.log.warning("No indicators file exists at path %s", file_path)
# Load downloaded indicators and any indicators from env variable.
if load_default:
self._load_downloaded_indicators()
self._check_stix2_env_variable()
self.log.info("Loaded a total of %d unique indicators",
self.total_ioc_count)
self.log.info("Loaded a total of %d unique indicators", self.total_ioc_count)
def get_iocs(self, ioc_type: str) -> Iterator[Dict[str, Any]]:
for ioc_collection in self.ioc_collections:
@@ -227,6 +243,40 @@ class Indicators:
"stix2_file_name": ioc_collection["stix2_file_name"],
}
@lru_cache()
def get_ioc_matcher(
self, ioc_type: Optional[str] = None, ioc_list: Optional[list] = None
) -> ahocorasick.Automaton:
"""
Build an Aho-Corasick automaton from a list of iocs (i.e indicators)
Returns an Aho-Corasick automaton
This data-structue and algorithim allows for fast matching of a large number
of match strings (i.e IOCs) against a large body of text. This will also
match strings containing the IOC, so it is important to confirm the
match is a valid IOC before using it.
for _, ioc in domains_automaton.iter(url.domain.lower()):
if ioc.value == url.domain.lower():
print(ioc)
We use an LRU cache to avoid rebuilding the automaton every time we call a
function such as check_domain().
"""
automaton = ahocorasick.Automaton()
if ioc_type:
iocs = self.get_iocs(ioc_type)
elif ioc_list:
iocs = ioc_list
else:
raise ValueError("Must provide either ioc_tyxpe or ioc_list")
for ioc in iocs:
automaton.add_word(ioc["value"], ioc)
automaton.make_automaton()
return automaton
@lru_cache()
def check_domain(self, url: str) -> Union[dict, None]:
"""Check if a given URL matches any of the provided domain indicators.
@@ -240,6 +290,9 @@ class Indicators:
if not isinstance(url, str):
return None
# Create an Aho-Corasick automaton from the list of domains
domain_matcher = self.get_ioc_matcher("domains")
try:
# First we use the provided URL.
orig_url = URL(url)
@@ -249,17 +302,20 @@ class Indicators:
# HTTP HEAD request.
unshortened = orig_url.unshorten()
self.log.debug("Found a shortened URL %s -> %s",
url, unshortened)
self.log.debug("Found a shortened URL %s -> %s", url, unshortened)
if unshortened is None:
self.log.warning("Unable to unshorten URL %s", url)
return None
# Now we check for any nested URL shorteners.
dest_url = URL(unshortened)
if dest_url.check_if_shortened():
self.log.debug("Original URL %s appears to shorten another "
"shortened URL %s ... checking!",
orig_url.url, dest_url.url)
self.log.debug(
"Original URL %s appears to shorten another "
"shortened URL %s ... checking!",
orig_url.url,
dest_url.url,
)
return self.check_domain(dest_url.url)
final_url = dest_url
@@ -269,11 +325,15 @@ class Indicators:
except Exception:
# If URL parsing failed, we just try to do a simple substring
# match.
for ioc in self.get_iocs("domains"):
for idx, ioc in domain_matcher.iter(url):
if ioc["value"].lower() in url:
self.log.warning("Maybe found a known suspicious domain %s "
"matching indicators from \"%s\"",
url, ioc["name"])
self.log.warning(
"Maybe found a known suspicious domain %s "
'matching indicator "%s" from "%s"',
url,
ioc["value"],
ioc["name"],
)
return ioc
# If nothing matched, we can quit here.
@@ -281,31 +341,49 @@ class Indicators:
# If all parsing worked, we start walking through available domain
# indicators.
for ioc in self.get_iocs("domains"):
for idx, ioc in domain_matcher.iter(final_url.domain.lower()):
# First we check the full domain.
if final_url.domain.lower() == ioc["value"]:
if orig_url.is_shortened and orig_url.url != final_url.url:
self.log.warning("Found a known suspicious domain %s "
"shortened as %s matching indicators from \"%s\"",
final_url.url, orig_url.url, ioc["name"])
self.log.warning(
"Found a known suspicious domain %s "
'shortened as %s matching indicator "%s" from "%s"',
final_url.url,
orig_url.url,
ioc["value"],
ioc["name"],
)
else:
self.log.warning("Found a known suspicious domain %s "
"matching indicators from \"%s\"",
final_url.url, ioc["name"])
self.log.warning(
"Found a known suspicious domain %s "
'matching indicator "%s" from "%s"',
final_url.url,
ioc["value"],
ioc["name"],
)
return ioc
# Then we just check the top level domain.
# Then we just check the top level domain.
for idx, ioc in domain_matcher.iter(final_url.top_level.lower()):
if final_url.top_level.lower() == ioc["value"]:
if orig_url.is_shortened and orig_url.url != final_url.url:
self.log.warning("Found a sub-domain with suspicious top "
"level %s shortened as %s matching "
"indicators from \"%s\"", final_url.url,
orig_url.url, ioc["name"])
self.log.warning(
"Found a sub-domain with suspicious top "
"level %s shortened as %s matching "
'indicator "%s" from "%s"',
final_url.url,
orig_url.url,
ioc["value"],
ioc["name"],
)
else:
self.log.warning("Found a sub-domain with a suspicious top "
"level %s matching indicators from \"%s\"",
final_url.url, ioc["name"])
self.log.warning(
"Found a sub-domain with a suspicious top "
'level %s matching indicator "%s" from "%s"',
final_url.url,
ioc["value"],
ioc["name"],
)
return ioc
@@ -344,16 +422,22 @@ class Indicators:
proc_name = os.path.basename(process)
for ioc in self.get_iocs("processes"):
if proc_name == ioc["value"]:
self.log.warning("Found a known suspicious process name \"%s\" "
"matching indicators from \"%s\"",
process, ioc["name"])
self.log.warning(
'Found a known suspicious process name "%s" '
'matching indicators from "%s"',
process,
ioc["name"],
)
return ioc
if len(proc_name) == 16:
if ioc["value"].startswith(proc_name):
self.log.warning("Found a truncated known suspicious "
"process name \"%s\" matching indicators from \"%s\"",
process, ioc["name"])
self.log.warning(
"Found a truncated known suspicious "
'process name "%s" matching indicators from "%s"',
process,
ioc["name"],
)
return ioc
return None
@@ -390,9 +474,12 @@ class Indicators:
for ioc in self.get_iocs("emails"):
if email.lower() == ioc["value"].lower():
self.log.warning("Found a known suspicious email address \"%s\" "
"matching indicators from \"%s\"",
email, ioc["name"])
self.log.warning(
'Found a known suspicious email address "%s" '
'matching indicators from "%s"',
email,
ioc["name"],
)
return ioc
return None
@@ -411,9 +498,12 @@ class Indicators:
for ioc in self.get_iocs("file_names"):
if ioc["value"] == file_name:
self.log.warning("Found a known suspicious file name \"%s\" "
"matching indicators from \"%s\"",
file_name, ioc["name"])
self.log.warning(
'Found a known suspicious file name "%s" '
'matching indicators from "%s"',
file_name,
ioc["name"],
)
return ioc
return None
@@ -439,9 +529,12 @@ class Indicators:
# Strip any trailing slash from indicator paths to match
# directories.
if file_path.startswith(ioc["value"].rstrip("/")):
self.log.warning("Found a known suspicious file path \"%s\" "
"matching indicators form \"%s\"",
file_path, ioc["name"])
self.log.warning(
'Found a known suspicious file path "%s" '
'matching indicators form "%s"',
file_path,
ioc["name"],
)
return ioc
return None
@@ -462,9 +555,12 @@ class Indicators:
for ioc in self.get_iocs("processes"):
parts = file_path.split("/")
if ioc["value"] in parts:
self.log.warning("Found known suspicious process name mentioned in file at "
"path \"%s\" matching indicators from \"%s\"",
file_path, ioc["name"])
self.log.warning(
"Found known suspicious process name mentioned in file at "
'path "%s" matching indicators from "%s"',
file_path,
ioc["name"],
)
return ioc
return None
@@ -484,9 +580,12 @@ class Indicators:
for ioc in self.get_iocs("ios_profile_ids"):
if profile_uuid in ioc["value"]:
self.log.warning("Found a known suspicious profile ID \"%s\" "
"matching indicators from \"%s\"",
profile_uuid, ioc["name"])
self.log.warning(
'Found a known suspicious profile ID "%s" '
'matching indicators from "%s"',
profile_uuid,
ioc["name"],
)
return ioc
return None
@@ -504,9 +603,12 @@ class Indicators:
for ioc in self.get_iocs("files_sha256"):
if file_hash.lower() == ioc["value"].lower():
self.log.warning("Found a known suspicious file with hash \"%s\" "
"matching indicators from \"%s\"",
file_hash, ioc["name"])
self.log.warning(
'Found a known suspicious file with hash "%s" '
'matching indicators from "%s"',
file_hash,
ioc["name"],
)
return ioc
return None
@@ -525,9 +627,12 @@ class Indicators:
for ioc in self.get_iocs("app_ids"):
if app_id.lower() == ioc["value"].lower():
self.log.warning("Found a known suspicious app with ID \"%s\" "
"matching indicators from \"%s\"", app_id,
ioc["name"])
self.log.warning(
'Found a known suspicious app with ID "%s" '
'matching indicators from "%s"',
app_id,
ioc["name"],
)
return ioc
return None
@@ -545,9 +650,12 @@ class Indicators:
for ioc in self.get_iocs("android_property_names"):
if property_name.lower() == ioc["value"].lower():
self.log.warning("Found a known suspicious Android property \"%s\" "
"matching indicators from \"%s\"", property_name,
ioc["name"])
self.log.warning(
'Found a known suspicious Android property "%s" '
'matching indicators from "%s"',
property_name,
ioc["name"],
)
return ioc
return None

View File

@@ -18,8 +18,10 @@ def check_updates() -> None:
pass
else:
if latest_version:
rich_print(f"\t\t[bold]Version {latest_version} is available! "
"Upgrade mvt with `pip3 install -U mvt`[/bold]")
rich_print(
f"\t\t[bold]Version {latest_version} is available! "
"Upgrade mvt with `pip3 install -U mvt`[/bold]"
)
# Then we check for indicators files updates.
ioc_updates = IndicatorsUpdates()
@@ -27,8 +29,10 @@ def check_updates() -> None:
# Before proceeding, we check if we have downloaded an indicators index.
# If not, there's no point in proceeding with the updates check.
if ioc_updates.get_latest_update() == 0:
rich_print("\t\t[bold]You have not yet downloaded any indicators, check "
"the `download-iocs` command![/bold]")
rich_print(
"\t\t[bold]You have not yet downloaded any indicators, check "
"the `download-iocs` command![/bold]"
)
return
# We only perform this check at a fixed frequency, in order to not
@@ -36,8 +40,10 @@ def check_updates() -> None:
# multiple times.
should_check, hours = ioc_updates.should_check()
if not should_check:
rich_print(f"\t\tIndicators updates checked recently, next automatic check "
f"in {int(hours)} hours")
rich_print(
f"\t\tIndicators updates checked recently, next automatic check "
f"in {int(hours)} hours"
)
return
try:
@@ -46,8 +52,10 @@ def check_updates() -> None:
pass
else:
if ioc_to_update:
rich_print("\t\t[bold]There are updates to your indicators files! "
"Run the `download-iocs` command to update![/bold]")
rich_print(
"\t\t[bold]There are updates to your indicators files! "
"Run the `download-iocs` command to update![/bold]"
)
else:
rich_print("\t\tYour indicators files seem to be up to date.")

View File

@@ -11,6 +11,8 @@ from typing import Any, Dict, List, Optional, Union
import simplejson as json
from .utils import exec_or_profile
class DatabaseNotFoundError(Exception):
pass
@@ -35,9 +37,9 @@ class MVTModule:
file_path: Optional[str] = None,
target_path: Optional[str] = None,
results_path: Optional[str] = None,
fast_mode: Optional[bool] = False,
fast_mode: bool = False,
log: logging.Logger = logging.getLogger(__name__),
results: Union[List[Dict[str, Any]], Dict[str, Any], None] = None
results: Union[List[Dict[str, Any]], Dict[str, Any], None] = None,
) -> None:
"""Initialize module.
@@ -70,8 +72,7 @@ class MVTModule:
with open(json_path, "r", encoding="utf-8") as handle:
results = json.load(handle)
if log:
log.info("Loaded %d results from \"%s\"",
len(results), json_path)
log.info('Loaded %d results from "%s"', len(results), json_path)
return cls(results=results, log=log)
def get_slug(self) -> str:
@@ -99,20 +100,21 @@ class MVTModule:
if self.results:
results_file_name = f"{name}.json"
results_json_path = os.path.join(self.results_path,
results_file_name)
results_json_path = os.path.join(self.results_path, results_file_name)
with open(results_json_path, "w", encoding="utf-8") as handle:
try:
json.dump(self.results, handle, indent=4, default=str)
except Exception as exc:
self.log.error("Unable to store results of module %s to file %s: %s",
self.__class__.__name__, results_file_name,
exc)
self.log.error(
"Unable to store results of module %s to file %s: %s",
self.__class__.__name__,
results_file_name,
exc,
)
if self.detected:
detected_file_name = f"{name}_detected.json"
detected_json_path = os.path.join(self.results_path,
detected_file_name)
detected_json_path = os.path.join(self.results_path, detected_file_name)
with open(detected_json_path, "w", encoding="utf-8") as handle:
json.dump(self.detected, handle, indent=4, default=str)
@@ -151,8 +153,7 @@ class MVTModule:
# De-duplicate timeline entries.
self.timeline = self._deduplicate_timeline(self.timeline)
self.timeline_detected = self._deduplicate_timeline(
self.timeline_detected)
self.timeline_detected = self._deduplicate_timeline(self.timeline_detected)
def run(self) -> None:
"""Run the main module procedure."""
@@ -163,44 +164,65 @@ def run_module(module: MVTModule) -> None:
module.log.info("Running module %s...", module.__class__.__name__)
try:
module.run()
exec_or_profile("module.run()", globals(), locals())
except NotImplementedError:
module.log.exception("The run() procedure of module %s was not implemented yet!",
module.__class__.__name__)
module.log.exception(
"The run() procedure of module %s was not implemented yet!",
module.__class__.__name__,
)
except InsufficientPrivileges as exc:
module.log.info("Insufficient privileges for module %s: %s",
module.__class__.__name__, exc)
module.log.info(
"Insufficient privileges for module %s: %s", module.__class__.__name__, exc
)
except DatabaseNotFoundError as exc:
module.log.info("There might be no data to extract by module %s: %s",
module.__class__.__name__, exc)
module.log.info(
"There might be no data to extract by module %s: %s",
module.__class__.__name__,
exc,
)
except DatabaseCorruptedError as exc:
module.log.error("The %s module database seems to be corrupted: %s",
module.__class__.__name__, exc)
module.log.error(
"The %s module database seems to be corrupted: %s",
module.__class__.__name__,
exc,
)
except Exception as exc:
module.log.exception("Error in running extraction from module %s: %s",
module.__class__.__name__, exc)
module.log.exception(
"Error in running extraction from module %s: %s",
module.__class__.__name__,
exc,
)
else:
try:
module.check_indicators()
exec_or_profile("module.check_indicators()", globals(), locals())
except NotImplementedError:
module.log.info("The %s module does not support checking for indicators",
module.__class__.__name__)
module.log.info(
"The %s module does not support checking for indicators",
module.__class__.__name__,
)
except Exception as exc:
module.log.exception("Error when checking indicators from module %s: %s",
module.__class__.__name__, exc)
module.log.exception(
"Error when checking indicators from module %s: %s",
module.__class__.__name__,
exc,
)
else:
if module.indicators and not module.detected:
module.log.info("The %s module produced no detections!",
module.__class__.__name__)
module.log.info(
"The %s module produced no detections!", module.__class__.__name__
)
try:
module.to_timeline()
except NotImplementedError:
pass
except Exception as exc:
module.log.exception("Error when serializing data from module %s: %s",
module.__class__.__name__, exc)
module.log.exception(
"Error when serializing data from module %s: %s",
module.__class__.__name__,
exc,
)
module.save_to_json()
@@ -213,15 +235,19 @@ def save_timeline(timeline: list, timeline_path: str) -> None:
"""
with open(timeline_path, "a+", encoding="utf-8") as handle:
csvoutput = csv.writer(handle, delimiter=",", quotechar="\"",
quoting=csv.QUOTE_ALL, escapechar='\\')
csvoutput = csv.writer(
handle, delimiter=",", quotechar='"', quoting=csv.QUOTE_ALL, escapechar="\\"
)
csvoutput.writerow(["UTC Timestamp", "Plugin", "Event", "Description"])
for event in sorted(timeline, key=lambda x: x["timestamp"]
if x["timestamp"] is not None else ""):
csvoutput.writerow([
event.get("timestamp"),
event.get("module"),
event.get("event"),
event.get("data"),
])
for event in sorted(
timeline, key=lambda x: x["timestamp"] if x["timestamp"] is not None else ""
):
csvoutput.writerow(
[
event.get("timestamp"),
event.get("module"),
event.get("event"),
event.get("data"),
]
)

View File

@@ -16,8 +16,10 @@ class MutuallyExclusiveOption(Option):
help_msg = kwargs.get("help", "")
if self.mutually_exclusive:
ex_str = ", ".join(self.mutually_exclusive)
kwargs["help"] = (f"{help_msg} NOTE: This argument is mutually exclusive with arguments"
f"[{ex_str}].")
kwargs["help"] = (
f"{help_msg} NOTE: This argument is mutually exclusive with arguments"
f"[{ex_str}]."
)
super().__init__(*args, **kwargs)

View File

@@ -22,7 +22,6 @@ INDICATORS_CHECK_FREQUENCY = 12
class MVTUpdates:
def check(self) -> str:
res = requests.get("https://pypi.org/pypi/mvt/json")
data = res.json()
@@ -35,7 +34,6 @@ class MVTUpdates:
class IndicatorsUpdates:
def __init__(self) -> None:
self.github_raw_url = "https://raw.githubusercontent.com/{}/{}/{}/{}"
@@ -47,10 +45,12 @@ class IndicatorsUpdates:
if not os.path.exists(MVT_DATA_FOLDER):
os.makedirs(MVT_DATA_FOLDER)
self.latest_update_path = os.path.join(MVT_DATA_FOLDER,
"latest_indicators_update")
self.latest_check_path = os.path.join(MVT_DATA_FOLDER,
"latest_indicators_check")
self.latest_update_path = os.path.join(
MVT_DATA_FOLDER, "latest_indicators_update"
)
self.latest_check_path = os.path.join(
MVT_DATA_FOLDER, "latest_indicators_check"
)
def get_latest_check(self) -> int:
if not os.path.exists(self.latest_check_path):
@@ -85,12 +85,16 @@ class IndicatorsUpdates:
handle.write(str(timestamp))
def get_remote_index(self) -> Optional[dict]:
url = self.github_raw_url.format(self.index_owner, self.index_repo,
self.index_branch, self.index_path)
url = self.github_raw_url.format(
self.index_owner, self.index_repo, self.index_branch, self.index_path
)
res = requests.get(url)
if res.status_code != 200:
log.error("Failed to retrieve indicators index located at %s (error %d)",
url, res.status_code)
log.error(
"Failed to retrieve indicators index located at %s (error %d)",
url,
res.status_code,
)
return None
return yaml.safe_load(res.content)
@@ -98,8 +102,11 @@ class IndicatorsUpdates:
def download_remote_ioc(self, ioc_url: str) -> Optional[str]:
res = requests.get(ioc_url)
if res.status_code != 200:
log.error("Failed to download indicators file from %s (error %d)",
ioc_url, res.status_code)
log.error(
"Failed to download indicators file from %s (error %d)",
ioc_url,
res.status_code,
)
return None
clean_file_name = ioc_url.lstrip("https://").replace("/", "_")
@@ -135,28 +142,37 @@ class IndicatorsUpdates:
ioc_url = ioc.get("download_url", "")
if not ioc_url:
log.error("Could not find a way to download indicator file for %s",
ioc.get("name"))
log.error(
"Could not find a way to download indicator file for %s",
ioc.get("name"),
)
continue
ioc_local_path = self.download_remote_ioc(ioc_url)
if not ioc_local_path:
continue
log.info("Downloaded indicators \"%s\" to %s",
ioc.get("name"), ioc_local_path)
log.info(
'Downloaded indicators "%s" to %s', ioc.get("name"), ioc_local_path
)
self.set_latest_update()
def _get_remote_file_latest_commit(self, owner: str, repo: str,
branch: str, path: str) -> int:
def _get_remote_file_latest_commit(
self, owner: str, repo: str, branch: str, path: str
) -> int:
# TODO: The branch is currently not taken into consideration.
# How do we specify which branch to look up to the API?
file_commit_url = f"https://api.github.com/repos/{owner}/{repo}/commits?path={path}"
file_commit_url = (
f"https://api.github.com/repos/{owner}/{repo}/commits?path={path}"
)
res = requests.get(file_commit_url)
if res.status_code != 200:
log.error("Failed to get details about file %s (error %d)",
file_commit_url, res.status_code)
log.error(
"Failed to get details about file %s (error %d)",
file_commit_url,
res.status_code,
)
return -1
details = res.json()
@@ -164,13 +180,16 @@ class IndicatorsUpdates:
return -1
latest_commit = details[0]
latest_commit_date = latest_commit.get("commit", {}).get("author", {}).get("date", None)
latest_commit_date = (
latest_commit.get("commit", {}).get("author", {}).get("date", None)
)
if not latest_commit_date:
log.error("Failed to retrieve date of latest update to indicators index file")
log.error(
"Failed to retrieve date of latest update to indicators index file"
)
return -1
latest_commit_dt = datetime.strptime(latest_commit_date,
'%Y-%m-%dT%H:%M:%SZ')
latest_commit_dt = datetime.strptime(latest_commit_date, "%Y-%m-%dT%H:%M:%SZ")
latest_commit_ts = int(latest_commit_dt.timestamp())
return latest_commit_ts
@@ -192,10 +211,9 @@ class IndicatorsUpdates:
self.set_latest_check()
latest_update = self.get_latest_update()
latest_commit_ts = self._get_remote_file_latest_commit(self.index_owner,
self.index_repo,
self.index_branch,
self.index_path)
latest_commit_ts = self._get_remote_file_latest_commit(
self.index_owner, self.index_repo, self.index_branch, self.index_path
)
if latest_update < latest_commit_ts:
return True
@@ -214,10 +232,9 @@ class IndicatorsUpdates:
branch = github.get("branch", "main")
path = github.get("path", "")
file_latest_commit_ts = self._get_remote_file_latest_commit(owner,
repo,
branch,
path)
file_latest_commit_ts = self._get_remote_file_latest_commit(
owner, repo, branch, path
)
if latest_update < file_latest_commit_ts:
return True

View File

@@ -254,7 +254,6 @@ SHORTENER_DOMAINS = [
class URL:
def __init__(self, url: str) -> None:
if isinstance(url, bytes):
url = url.decode()
@@ -273,9 +272,11 @@ class URL:
:rtype: str
"""
return get_tld(self.url,
as_object=True,
fix_protocol=True).parsed_url.netloc.lower().lstrip("www.")
return (
get_tld(self.url, as_object=True, fix_protocol=True)
.parsed_url.netloc.lower()
.lstrip("www.")
)
def get_top_level(self) -> str:
"""Get only the top-level domain from a URL.
@@ -286,9 +287,7 @@ class URL:
:rtype: str
"""
return get_tld(self.url,
as_object=True,
fix_protocol=True).fld.lower()
return get_tld(self.url, as_object=True, fix_protocol=True).fld.lower()
def check_if_shortened(self) -> bool:
"""Check if the URL is among list of shortener services.

View File

@@ -8,6 +8,7 @@ import hashlib
import logging
import os
import re
import cProfile
from typing import Any, Iterator, Union
from rich.logging import RichHandler
@@ -42,7 +43,7 @@ def convert_datetime_to_iso(date_time: datetime.datetime) -> str:
def convert_unix_to_utc_datetime(
timestamp: Union[int, float, str]
timestamp: Union[int, float, str]
) -> datetime.datetime:
"""Converts a unix epoch timestamp to UTC datetime.
@@ -69,8 +70,7 @@ def convert_unix_to_iso(timestamp: Union[int, float, str]) -> str:
return ""
def convert_mactime_to_datetime(timestamp: Union[int, float],
from_2001: bool = True):
def convert_mactime_to_datetime(timestamp: Union[int, float], from_2001: bool = True):
"""Converts Mac Standard Time to a datetime.
:param timestamp: MacTime timestamp (either int or float).
@@ -111,8 +111,7 @@ def convert_mactime_to_iso(timestamp: int, from_2001: bool = True):
"""
return convert_datetime_to_iso(
convert_mactime_to_datetime(timestamp, from_2001))
return convert_datetime_to_iso(convert_mactime_to_datetime(timestamp, from_2001))
def check_for_links(text: str) -> list:
@@ -185,18 +184,20 @@ def generate_hashes_from_path(path: str, log) -> Iterator[dict]:
hash_value = get_sha256_from_file_path(path)
yield {"file_path": path, "sha256": hash_value}
elif os.path.isdir(path):
for (root, _, files) in os.walk(path):
for root, _, files in os.walk(path):
for file in files:
file_path = os.path.join(root, file)
try:
sha256 = get_sha256_from_file_path(file_path)
except FileNotFoundError:
log.error("Failed to hash the file %s: might be a symlink",
file_path)
log.error(
"Failed to hash the file %s: might be a symlink", file_path
)
continue
except PermissionError:
log.error("Failed to hash the file %s: permission denied",
file_path)
log.error(
"Failed to hash the file %s: permission denied", file_path
)
continue
yield {"file_path": file_path, "sha256": sha256}
@@ -225,3 +226,11 @@ def set_verbose_logging(verbose: bool = False):
handler.setLevel(logging.DEBUG)
else:
handler.setLevel(logging.INFO)
def exec_or_profile(module, globals, locals):
"""Hook for profiling MVT modules"""
if int(os.environ.get("MVT_PROFILE", False)):
cProfile.runctx(module, globals, locals)
else:
exec(module, globals, locals)

View File

@@ -3,4 +3,4 @@
# Use of this software is governed by the MVT License 1.1 that can be found at
# https://license.mvt.re/1.1/
MVT_VERSION = "2.2.6"
MVT_VERSION = "2.3.0"

View File

@@ -23,17 +23,20 @@ class VTQuotaExceeded(Exception):
def virustotal_lookup(file_hash: str):
if MVT_VT_API_KEY not in os.environ:
raise VTNoKey("No VirusTotal API key provided: to use VirusTotal "
"lookups please provide your API key with "
"`export MVT_VT_API_KEY=<key>`")
raise VTNoKey(
"No VirusTotal API key provided: to use VirusTotal "
"lookups please provide your API key with "
"`export MVT_VT_API_KEY=<key>`"
)
headers = {
"User-Agent": "VirusTotal",
"Content-Type": "application/json",
"x-apikey": os.environ[MVT_VT_API_KEY],
}
res = requests.get(f"https://www.virustotal.com/api/v3/files/{file_hash}",
headers=headers)
res = requests.get(
f"https://www.virustotal.com/api/v3/files/{file_hash}", headers=headers
)
if res.status_code == 200:
report = res.json()

View File

@@ -11,14 +11,23 @@ import click
from rich.prompt import Prompt
from mvt.common.cmd_check_iocs import CmdCheckIOCS
from mvt.common.help import (HELP_MSG_FAST, HELP_MSG_HASHES, HELP_MSG_IOC,
HELP_MSG_LIST_MODULES, HELP_MSG_MODULE,
HELP_MSG_OUTPUT, HELP_MSG_VERBOSE)
from mvt.common.help import (
HELP_MSG_FAST,
HELP_MSG_HASHES,
HELP_MSG_IOC,
HELP_MSG_LIST_MODULES,
HELP_MSG_MODULE,
HELP_MSG_OUTPUT,
HELP_MSG_VERBOSE,
)
from mvt.common.logo import logo
from mvt.common.options import MutuallyExclusiveOption
from mvt.common.updates import IndicatorsUpdates
from mvt.common.utils import (generate_hashes_from_path, init_logging,
set_verbose_logging)
from mvt.common.utils import (
generate_hashes_from_path,
init_logging,
set_verbose_logging,
)
from .cmd_check_backup import CmdIOSCheckBackup
from .cmd_check_fs import CmdIOSCheckFS
@@ -32,41 +41,55 @@ log = logging.getLogger("mvt")
# Set this environment variable to a password if needed.
MVT_IOS_BACKUP_PASSWORD = "MVT_IOS_BACKUP_PASSWORD"
CONTEXT_SETTINGS = dict(help_option_names=['-h', '--help'])
CONTEXT_SETTINGS = dict(help_option_names=["-h", "--help"])
#==============================================================================
# ==============================================================================
# Main
#==============================================================================
# ==============================================================================
@click.group(invoke_without_command=False)
def cli():
logo()
#==============================================================================
# ==============================================================================
# Command: version
#==============================================================================
# ==============================================================================
@cli.command("version", help="Show the currently installed version of MVT")
def version():
return
#==============================================================================
# ==============================================================================
# Command: decrypt-backup
#==============================================================================
@cli.command("decrypt-backup", help="Decrypt an encrypted iTunes backup",
context_settings=CONTEXT_SETTINGS)
@click.option("--destination", "-d", required=True,
help="Path to the folder where to store the decrypted backup")
@click.option("--password", "-p", cls=MutuallyExclusiveOption,
help="Password to use to decrypt the backup (or, set "
f"{MVT_IOS_BACKUP_PASSWORD} environment variable)",
mutually_exclusive=["key_file"])
@click.option("--key-file", "-k", cls=MutuallyExclusiveOption,
type=click.Path(exists=True),
help="File containing raw encryption key to use to decrypt "
"the backup",
mutually_exclusive=["password"])
# ==============================================================================
@cli.command(
"decrypt-backup",
help="Decrypt an encrypted iTunes backup",
context_settings=CONTEXT_SETTINGS,
)
@click.option(
"--destination",
"-d",
required=True,
help="Path to the folder where to store the decrypted backup",
)
@click.option(
"--password",
"-p",
cls=MutuallyExclusiveOption,
help="Password to use to decrypt the backup (or, set "
f"{MVT_IOS_BACKUP_PASSWORD} environment variable)",
mutually_exclusive=["key_file"],
)
@click.option(
"--key-file",
"-k",
cls=MutuallyExclusiveOption,
type=click.Path(exists=True),
help="File containing raw encryption key to use to decrypt " "the backup",
mutually_exclusive=["password"],
)
@click.option("--hashes", "-H", is_flag=True, help=HELP_MSG_HASHES)
@click.argument("BACKUP_PATH", type=click.Path(exists=True))
@click.pass_context
@@ -75,22 +98,28 @@ def decrypt_backup(ctx, destination, password, key_file, hashes, backup_path):
if key_file:
if MVT_IOS_BACKUP_PASSWORD in os.environ:
log.info("Ignoring %s environment variable, using --key-file"
"'%s' instead", MVT_IOS_BACKUP_PASSWORD, key_file)
log.info(
"Ignoring %s environment variable, using --key-file" "'%s' instead",
MVT_IOS_BACKUP_PASSWORD,
key_file,
)
backup.decrypt_with_key_file(key_file)
elif password:
log.info("Your password may be visible in the process table because it "
"was supplied on the command line!")
log.info(
"Your password may be visible in the process table because it "
"was supplied on the command line!"
)
if MVT_IOS_BACKUP_PASSWORD in os.environ:
log.info("Ignoring %s environment variable, using --password"
"argument instead", MVT_IOS_BACKUP_PASSWORD)
log.info(
"Ignoring %s environment variable, using --password" "argument instead",
MVT_IOS_BACKUP_PASSWORD,
)
backup.decrypt_with_password(password)
elif MVT_IOS_BACKUP_PASSWORD in os.environ:
log.info("Using password from %s environment variable",
MVT_IOS_BACKUP_PASSWORD)
log.info("Using password from %s environment variable", MVT_IOS_BACKUP_PASSWORD)
backup.decrypt_with_password(os.environ[MVT_IOS_BACKUP_PASSWORD])
else:
sekrit = Prompt.ask("Enter backup password", password=True)
@@ -112,33 +141,45 @@ def decrypt_backup(ctx, destination, password, key_file, hashes, backup_path):
json.dump(info, handle, indent=4)
#==============================================================================
# ==============================================================================
# Command: extract-key
#==============================================================================
@cli.command("extract-key", help="Extract decryption key from an iTunes backup",
context_settings=CONTEXT_SETTINGS)
@click.option("--password", "-p",
help="Password to use to decrypt the backup (or, set "
f"{MVT_IOS_BACKUP_PASSWORD} environment variable)")
@click.option("--key-file", "-k",
help="Key file to be written (if unset, will print to STDOUT)",
required=False,
type=click.Path(exists=False, file_okay=True, dir_okay=False,
writable=True))
# ==============================================================================
@cli.command(
"extract-key",
help="Extract decryption key from an iTunes backup",
context_settings=CONTEXT_SETTINGS,
)
@click.option(
"--password",
"-p",
help="Password to use to decrypt the backup (or, set "
f"{MVT_IOS_BACKUP_PASSWORD} environment variable)",
)
@click.option(
"--key-file",
"-k",
help="Key file to be written (if unset, will print to STDOUT)",
required=False,
type=click.Path(exists=False, file_okay=True, dir_okay=False, writable=True),
)
@click.argument("BACKUP_PATH", type=click.Path(exists=True))
def extract_key(password, key_file, backup_path):
backup = DecryptBackup(backup_path)
if password:
log.info("Your password may be visible in the process table because it "
"was supplied on the command line!")
log.info(
"Your password may be visible in the process table because it "
"was supplied on the command line!"
)
if MVT_IOS_BACKUP_PASSWORD in os.environ:
log.info("Ignoring %s environment variable, using --password "
"argument instead", MVT_IOS_BACKUP_PASSWORD)
log.info(
"Ignoring %s environment variable, using --password "
"argument instead",
MVT_IOS_BACKUP_PASSWORD,
)
elif MVT_IOS_BACKUP_PASSWORD in os.environ:
log.info("Using password from %s environment variable",
MVT_IOS_BACKUP_PASSWORD)
log.info("Using password from %s environment variable", MVT_IOS_BACKUP_PASSWORD)
password = os.environ[MVT_IOS_BACKUP_PASSWORD]
else:
password = Prompt.ask("Enter backup password", password=True)
@@ -150,15 +191,23 @@ def extract_key(password, key_file, backup_path):
backup.write_key(key_file)
#==============================================================================
# ==============================================================================
# Command: check-backup
#==============================================================================
@cli.command("check-backup", help="Extract artifacts from an iTunes backup",
context_settings=CONTEXT_SETTINGS)
@click.option("--iocs", "-i", type=click.Path(exists=True), multiple=True,
default=[], help=HELP_MSG_IOC)
@click.option("--output", "-o", type=click.Path(exists=False),
help=HELP_MSG_OUTPUT)
# ==============================================================================
@cli.command(
"check-backup",
help="Extract artifacts from an iTunes backup",
context_settings=CONTEXT_SETTINGS,
)
@click.option(
"--iocs",
"-i",
type=click.Path(exists=True),
multiple=True,
default=[],
help=HELP_MSG_IOC,
)
@click.option("--output", "-o", type=click.Path(exists=False), help=HELP_MSG_OUTPUT)
@click.option("--fast", "-f", is_flag=True, help=HELP_MSG_FAST)
@click.option("--list-modules", "-l", is_flag=True, help=HELP_MSG_LIST_MODULES)
@click.option("--module", "-m", help=HELP_MSG_MODULE)
@@ -166,12 +215,19 @@ def extract_key(password, key_file, backup_path):
@click.option("--verbose", "-v", is_flag=True, help=HELP_MSG_VERBOSE)
@click.argument("BACKUP_PATH", type=click.Path(exists=True))
@click.pass_context
def check_backup(ctx, iocs, output, fast, list_modules, module, hashes, verbose, backup_path):
def check_backup(
ctx, iocs, output, fast, list_modules, module, hashes, verbose, backup_path
):
set_verbose_logging(verbose)
cmd = CmdIOSCheckBackup(target_path=backup_path, results_path=output,
ioc_files=iocs, module_name=module, fast_mode=fast,
hashes=hashes)
cmd = CmdIOSCheckBackup(
target_path=backup_path,
results_path=output,
ioc_files=iocs,
module_name=module,
fast_mode=fast,
hashes=hashes,
)
if list_modules:
cmd.list_modules()
@@ -182,19 +238,28 @@ def check_backup(ctx, iocs, output, fast, list_modules, module, hashes, verbose,
cmd.run()
if cmd.detected_count > 0:
log.warning("The analysis of the backup produced %d detections!",
cmd.detected_count)
log.warning(
"The analysis of the backup produced %d detections!", cmd.detected_count
)
#==============================================================================
# ==============================================================================
# Command: check-fs
#==============================================================================
@cli.command("check-fs", help="Extract artifacts from a full filesystem dump",
context_settings=CONTEXT_SETTINGS)
@click.option("--iocs", "-i", type=click.Path(exists=True), multiple=True,
default=[], help=HELP_MSG_IOC)
@click.option("--output", "-o", type=click.Path(exists=False),
help=HELP_MSG_OUTPUT)
# ==============================================================================
@cli.command(
"check-fs",
help="Extract artifacts from a full filesystem dump",
context_settings=CONTEXT_SETTINGS,
)
@click.option(
"--iocs",
"-i",
type=click.Path(exists=True),
multiple=True,
default=[],
help=HELP_MSG_IOC,
)
@click.option("--output", "-o", type=click.Path(exists=False), help=HELP_MSG_OUTPUT)
@click.option("--fast", "-f", is_flag=True, help=HELP_MSG_FAST)
@click.option("--list-modules", "-l", is_flag=True, help=HELP_MSG_LIST_MODULES)
@click.option("--module", "-m", help=HELP_MSG_MODULE)
@@ -204,9 +269,14 @@ def check_backup(ctx, iocs, output, fast, list_modules, module, hashes, verbose,
@click.pass_context
def check_fs(ctx, iocs, output, fast, list_modules, module, hashes, verbose, dump_path):
set_verbose_logging(verbose)
cmd = CmdIOSCheckFS(target_path=dump_path, results_path=output,
ioc_files=iocs, module_name=module, fast_mode=fast,
hashes=hashes)
cmd = CmdIOSCheckFS(
target_path=dump_path,
results_path=output,
ioc_files=iocs,
module_name=module,
fast_mode=fast,
hashes=hashes,
)
if list_modules:
cmd.list_modules()
@@ -217,17 +287,28 @@ def check_fs(ctx, iocs, output, fast, list_modules, module, hashes, verbose, dum
cmd.run()
if cmd.detected_count > 0:
log.warning("The analysis of the iOS filesystem produced %d detections!",
cmd.detected_count)
log.warning(
"The analysis of the iOS filesystem produced %d detections!",
cmd.detected_count,
)
#==============================================================================
# ==============================================================================
# Command: check-iocs
#==============================================================================
@cli.command("check-iocs", help="Compare stored JSON results to provided indicators",
context_settings=CONTEXT_SETTINGS)
@click.option("--iocs", "-i", type=click.Path(exists=True), multiple=True,
default=[], help=HELP_MSG_IOC)
# ==============================================================================
@cli.command(
"check-iocs",
help="Compare stored JSON results to provided indicators",
context_settings=CONTEXT_SETTINGS,
)
@click.option(
"--iocs",
"-i",
type=click.Path(exists=True),
multiple=True,
default=[],
help=HELP_MSG_IOC,
)
@click.option("--list-modules", "-l", is_flag=True, help=HELP_MSG_LIST_MODULES)
@click.option("--module", "-m", help=HELP_MSG_MODULE)
@click.argument("FOLDER", type=click.Path(exists=True))
@@ -243,11 +324,14 @@ def check_iocs(ctx, iocs, list_modules, module, folder):
cmd.run()
#==============================================================================
# ==============================================================================
# Command: download-iocs
#==============================================================================
@cli.command("download-iocs", help="Download public STIX2 indicators",
context_settings=CONTEXT_SETTINGS)
# ==============================================================================
@cli.command(
"download-iocs",
help="Download public STIX2 indicators",
context_settings=CONTEXT_SETTINGS,
)
def download_iocs():
ioc_updates = IndicatorsUpdates()
ioc_updates.update()

View File

@@ -15,7 +15,6 @@ log = logging.getLogger(__name__)
class CmdIOSCheckBackup(Command):
def __init__(
self,
target_path: Optional[str] = None,
@@ -23,13 +22,19 @@ class CmdIOSCheckBackup(Command):
ioc_files: Optional[list] = None,
module_name: Optional[str] = None,
serial: Optional[str] = None,
fast_mode: Optional[bool] = False,
hashes: Optional[bool] = False,
fast_mode: bool = False,
hashes: bool = False,
) -> None:
super().__init__(target_path=target_path, results_path=results_path,
ioc_files=ioc_files, module_name=module_name,
serial=serial, fast_mode=fast_mode, hashes=hashes,
log=log)
super().__init__(
target_path=target_path,
results_path=results_path,
ioc_files=ioc_files,
module_name=module_name,
serial=serial,
fast_mode=fast_mode,
hashes=hashes,
log=log,
)
self.name = "check-backup"
self.modules = BACKUP_MODULES + MIXED_MODULES

View File

@@ -15,7 +15,6 @@ log = logging.getLogger(__name__)
class CmdIOSCheckFS(Command):
def __init__(
self,
target_path: Optional[str] = None,
@@ -23,13 +22,19 @@ class CmdIOSCheckFS(Command):
ioc_files: Optional[list] = None,
module_name: Optional[str] = None,
serial: Optional[str] = None,
fast_mode: Optional[bool] = False,
hashes: Optional[bool] = False,
fast_mode: bool = False,
hashes: bool = False,
) -> None:
super().__init__(target_path=target_path, results_path=results_path,
ioc_files=ioc_files, module_name=module_name,
serial=serial, fast_mode=fast_mode, hashes=hashes,
log=log)
super().__init__(
target_path=target_path,
results_path=results_path,
ioc_files=ioc_files,
module_name=module_name,
serial=serial,
fast_mode=fast_mode,
hashes=hashes,
log=log,
)
self.name = "check-fs"
self.modules = FS_MODULES + MIXED_MODULES

View File

@@ -0,0 +1,166 @@
[
{
"identifier": "iPhone4,1",
"description": "iPhone 4S"
},
{
"identifier": "iPhone5,1",
"description": "iPhone 5"
},
{
"identifier": "iPhone5,2",
"description": "iPhone 5"
},
{
"identifier": "iPhone5,3",
"description": "iPhone 5c"
},
{
"identifier": "iPhone5,4",
"description": "iPhone 5c"
},
{
"identifier": "iPhone6,1",
"description": "iPhone 5s"
},
{
"identifier": "iPhone6,2",
"description": "iPhone 5s"
},
{
"identifier": "iPhone7,1",
"description": "iPhone 6 Plus"
},
{
"identifier": "iPhone7,2",
"description": "iPhone 6"
},
{
"identifier": "iPhone8,1",
"description": "iPhone 6s"
},
{
"identifier": "iPhone8,2",
"description": "iPhone 6s Plus"
},
{
"identifier": "iPhone8,4",
"description": "iPhone SE (1st generation)"
},
{
"identifier": "iPhone9,1",
"description": "iPhone 7"
},
{
"identifier": "iPhone9,2",
"description": "iPhone 7 Plus"
},
{
"identifier": "iPhone9,3",
"description": "iPhone 7"
},
{
"identifier": "iPhone9,4",
"description": "iPhone 7 Plus"
},
{
"identifier": "iPhone10,1",
"description": "iPhone 8"
},
{
"identifier": "iPhone10,2",
"description": "iPhone 8 Plus"
},
{
"identifier": "iPhone10,3",
"description": "iPhone X"
},
{
"identifier": "iPhone10,4",
"description": "iPhone 8"
},
{
"identifier": "iPhone10,5",
"description": "iPhone 8 Plus"
},
{
"identifier": "iPhone10,6",
"description": "iPhone X"
},
{
"identifier": "iPhone11,2",
"description": "iPhone XS"
},
{
"identifier": "iPhone11,4",
"description": "iPhone XS Max"
},
{
"identifier": "iPhone11,6",
"description": "iPhone XS Max"
},
{
"identifier": "iPhone11,8",
"description": "iPhone XR"
},
{
"identifier": "iPhone12,1",
"description": "iPhone 11"
},
{
"identifier": "iPhone12,3",
"description": "iPhone 11 Pro"
},
{
"identifier": "iPhone12,5",
"description": "iPhone 11 Pro Max"
},
{
"identifier": "iPhone12,8",
"description": "iPhone SE (2nd generation)"
},
{
"identifier": "iPhone13,1",
"description": "iPhone 12 mini"
},
{
"identifier": "iPhone13,2",
"description": "iPhone 12"
},
{
"identifier": "iPhone13,3",
"description": "iPhone 12 Pro"
},
{
"identifier": "iPhone13,4",
"description": "iPhone 12 Pro Max"
},
{
"identifier": "iPhone14,4",
"description": "iPhone 13 Mini"
},
{
"identifier": "iPhone14,5",
"description": "iPhone 13"
},
{
"identifier": "iPhone14,2",
"description": "iPhone 13 Pro"
},
{
"identifier": "iPhone14,3",
"description": "iPhone 13 Pro Max"
},
{
"identifier": "iPhone14,8",
"decription": "iPhone 14 Plus"
},
{
"identifier": "iPhone15,2",
"description": "iPhone 14 Pro"
},
{
"identifier": "iPhone15,3",
"description": "iPhone 14 Pro Max"
}
]

View File

@@ -0,0 +1,931 @@
[
{
"build": "1A543a",
"version": "1.0"
},
{
"build": "1C25",
"version": "1.0.1"
},
{
"build": "1C28",
"version": "1.0.2"
},
{
"build": "3A109a",
"version": "1.1.1"
},
{
"build": "3B48b",
"version": "1.1.2"
},
{
"build": "4A93",
"version": "1.1.3"
},
{
"build": "4A102",
"version": "1.1.4"
},
{
"build": "5A347",
"version": "2.0"
},
{
"build": "5B108",
"version": "2.0.1"
},
{
"build": "5C1",
"version": "2.0.2"
},
{
"build": "5F136",
"version": "2.1"
},
{
"build": "5G77",
"version": "2.2"
},
{
"build": "5H11",
"version": "2.2.1"
},
{
"build": "7A341",
"version": "3.0"
},
{
"build": "7A400",
"version": "3.0.1"
},
{
"build": "7C144",
"version": "3.1"
},
{
"build": "7D11",
"version": "3.1.2"
},
{
"build": "7E18",
"version": "3.1.3"
},
{
"build": "8A293",
"version": "4.0"
},
{
"build": "8A306",
"version": "4.0.1"
},
{
"build": "8B117",
"version": "4.1"
},
{
"build": "8C148",
"version": "4.2"
},
{
"build": "8C148a",
"version": "4.2.1"
},
{
"build": "8C148",
"version": "4.2.1"
},
{
"build": "8E401",
"version": "4.2.8"
},
{
"build": "8E501",
"version": "4.2.9"
},
{
"build": "8E600",
"version": "4.2.10"
},
{
"build": "8F190",
"version": "4.3"
},
{
"build": "8J2",
"version": "4.3.3"
},
{
"build": "8K2",
"version": "4.3.4"
},
{
"build": "8L1",
"version": "4.3.5"
},
{
"build": "9A334",
"version": "5.0"
},
{
"build": "9A405",
"version": "5.0.1"
},
{
"build": "9A406",
"version": "5.0.1"
},
{
"build": "9B176",
"version": "5.1"
},
{
"build": "9B179",
"version": "5.1"
},
{
"build": "9B206",
"version": "5.1.1"
},
{
"build": "9B208",
"version": "5.1.1"
},
{
"build": "10A403",
"version": "6.0"
},
{
"build": "10A405",
"version": "6.0"
},
{
"build": "10A523",
"version": "6.0.1"
},
{
"build": "10A525",
"version": "6.0.1"
},
{
"build": "10A551",
"version": "6.0.2"
},
{
"build": "10B141",
"version": "6.1"
},
{
"build": "10B144",
"version": "6.1"
},
{
"build": "10B142",
"version": "6.1"
},
{
"build": "10B143",
"version": "6.1"
},
{
"build": "10B145",
"version": "6.1.1"
},
{
"build": "10B146",
"version": "6.1.2"
},
{
"build": "10B329",
"version": "6.1.3"
},
{
"build": "10B350",
"version": "6.1.4"
},
{
"build": "10B500",
"version": "6.1.6"
},
{
"build": "11B511",
"version": "7.0.3"
},
{
"build": "11B554a",
"version": "7.0.4"
},
{
"build": "11B601",
"version": "7.0.5"
},
{
"build": "11B651",
"version": "7.0.6"
},
{
"build": "11D169",
"version": "7.1"
},
{
"build": "11D167",
"version": "7.1"
},
{
"build": "11D201",
"version": "7.1.1"
},
{
"build": "11D257",
"version": "7.1.2"
},
{
"build": "12A365",
"version": "8.0"
},
{
"build": "12A366",
"version": "8.0"
},
{
"build": "12A402",
"version": "8.0.1"
},
{
"build": "12A405",
"version": "8.0.2"
},
{
"build": "12B411",
"version": "8.1"
},
{
"build": "12B435",
"version": "8.1.1"
},
{
"build": "12B436",
"version": "8.1.1"
},
{
"build": "12B440",
"version": "8.1.2"
},
{
"build": "12B466",
"version": "8.1.3"
},
{
"build": "12D508",
"version": "8.2"
},
{
"build": "12F70",
"version": "8.3"
},
{
"build": "12H143",
"version": "8.4"
},
{
"build": "12H321",
"version": "8.4.1"
},
{
"build": "13A344",
"version": "9.0"
},
{
"build": "13A342",
"version": "9.0"
},
{
"build": "13A343",
"version": "9.0"
},
{
"build": "13A404",
"version": "9.0.1"
},
{
"build": "13A405",
"version": "9.0.1"
},
{
"build": "13A452",
"version": "9.0.2"
},
{
"build": "13B143",
"version": "9.1"
},
{
"build": "13C75",
"version": "9.2"
},
{
"build": "13D15",
"version": "9.2.1"
},
{
"build": "13D20",
"version": "9.2.1"
},
{
"build": "13E237",
"version": "9.3"
},
{
"build": "13E233",
"version": "9.3"
},
{
"build": "13E234",
"version": "9.3"
},
{
"build": "13E238",
"version": "9.3.1"
},
{
"build": "13F69",
"version": "9.3.2"
},
{
"build": "13G34",
"version": "9.3.3"
},
{
"build": "13G35",
"version": "9.3.4"
},
{
"build": "13G36",
"version": "9.3.5"
},
{
"build": "13G37",
"version": "9.3.6"
},
{
"build": "14A403",
"version": "10.0.1"
},
{
"build": "14A456",
"version": "10.0.2"
},
{
"build": "14A551",
"version": "10.0.3"
},
{
"build": "14B72",
"version": "10.1"
},
{
"build": "14B72c",
"version": "10.1"
},
{
"build": "14B150",
"version": "10.1.1"
},
{
"build": "14B100",
"version": "10.1.1"
},
{
"build": "14C92",
"version": "10.2"
},
{
"build": "14D27",
"version": "10.2.1"
},
{
"build": "14E277",
"version": "10.3"
},
{
"build": "14E304",
"version": "10.3.1"
},
{
"build": "14F89",
"version": "10.3.2"
},
{
"build": "14G60",
"version": "10.3.3"
},
{
"build": "14G61",
"version": "10.3.4"
},
{
"build": "15A372",
"version": "11.0"
},
{
"build": "15A402",
"version": "11.0.1"
},
{
"version": "11.0.1",
"build": "15A403"
},
{
"build": "15A421",
"version": "11.0.2"
},
{
"build": "15A432",
"version": "11.0.3"
},
{
"build": "15B93",
"version": "11.1"
},
{
"build": "15B150",
"version": "11.1.1"
},
{
"build": "15B202",
"version": "11.1.2"
},
{
"build": "15C114",
"version": "11.2"
},
{
"build": "15C153",
"version": "11.2.1"
},
{
"build": "15C202",
"version": "11.2.2"
},
{
"build": "15D60",
"version": "11.2.5"
},
{
"build": "15D100",
"version": "11.2.6"
},
{
"build": "15E216",
"version": "11.3"
},
{
"build": "15E302",
"version": "11.3.1"
},
{
"build": "15F79",
"version": "11.4"
},
{
"build": "15G77",
"version": "11.4.1"
},
{
"build": "16A366",
"version": "12.0"
},
{
"build": "16A367",
"version": "12.0"
},
{
"build": "16A404",
"version": "12.0.1"
},
{
"build": "16A405",
"version": "12.0.1"
},
{
"build": "16B92",
"version": "12.1"
},
{
"build": "16B94",
"version": "12.1"
},
{
"build": "16B93",
"version": "12.1"
},
{
"build": "16C50",
"version": "12.1.1"
},
{
"build": "16C104",
"version": "12.1.2"
},
{
"build": "16C101",
"version": "12.1.2"
},
{
"build": "16D39",
"version": "12.1.3"
},
{
"build": "16D40",
"version": "12.1.3"
},
{
"build": "16D57",
"version": "12.1.4"
},
{
"build": "16E227",
"version": "12.2"
},
{
"build": "16F156",
"version": "12.3"
},
{
"build": "16F203",
"version": "12.3.1"
},
{
"build": "16F250",
"version": "12.3.2"
},
{
"build": "16G77",
"version": "12.4"
},
{
"build": "16G102",
"version": "12.4.1"
},
{
"build": "16G114",
"version": "12.4.2"
},
{
"build": "16G130",
"version": "12.4.3"
},
{
"build": "16G140",
"version": "12.4.4"
},
{
"build": "16G161",
"version": "12.4.5"
},
{
"build": "16G183",
"version": "12.4.6"
},
{
"build": "16G192",
"version": "12.4.7"
},
{
"build": "16G201",
"version": "12.4.8"
},
{
"build": "16H5",
"version": "12.4.9"
},
{
"build": "16H20",
"version": "12.5"
},
{
"build": "16H22",
"version": "12.5.1"
},
{
"build": "16H30",
"version": "12.5.2"
},
{
"build": "16H41",
"version": "12.5.3"
},
{
"build": "16H50",
"version": "12.5.4"
},
{
"build": "16H62",
"version": "12.5.5"
},
{
"build": "16H71",
"version": "12.5.6"
},
{
"build": "16H81",
"version": "12.5.7"
},
{
"build": "17A577",
"version": "13.0"
},
{
"build": "17A844",
"version": "13.1"
},
{
"build": "17A854",
"version": "13.1.1"
},
{
"build": "17A860",
"version": "13.1.2"
},
{
"build": "17A861",
"version": "13.1.2"
},
{
"build": "17A878",
"version": "13.1.3"
},
{
"build": "17B84",
"version": "13.2"
},
{
"build": "17B102",
"version": "13.2.2"
},
{
"build": "17B111",
"version": "13.2.3"
},
{
"build": "17C54",
"version": "13.3"
},
{
"build": "17D50",
"version": "13.3.1"
},
{
"build": "17E255",
"version": "13.4"
},
{
"build": "17E8255",
"version": "13.4"
},
{
"build": "17E262",
"version": "13.4.1"
},
{
"build": "17E8258",
"version": "13.4.1"
},
{
"build": "17F75",
"version": "13.5"
},
{
"build": "17F80",
"version": "13.5.1"
},
{
"build": "17G68",
"version": "13.6"
},
{
"build": "17G80",
"version": "13.6.1"
},
{
"build": "17H35",
"version": "13.7"
},
{
"build": "18A373",
"version": "14.0"
},
{
"build": "18A393",
"version": "14.0.1"
},
{
"build": "18A8395",
"version": "14.1"
},
{
"build": "18B92",
"version": "14.2"
},
{
"version": "14.2",
"build": "18B111"
},
{
"version": "14.2.1",
"build": "18B121"
},
{
"build": "18C66",
"version": "14.3"
},
{
"build": "18D52",
"version": "14.4"
},
{
"build": "18D61",
"version": "14.4.1"
},
{
"build": "18D70",
"version": "14.4.2"
},
{
"build": "18E199",
"version": "14.5"
},
{
"build": "18E212",
"version": "14.5.1"
},
{
"build": "18F72",
"version": "14.6"
},
{
"build": "18G69",
"version": "14.7"
},
{
"build": "18G82",
"version": "14.7.1"
},
{
"build": "18H17",
"version": "14.8"
},
{
"build": "18H107",
"version": "14.8.1"
},
{
"build": "19A341",
"version": "15.0"
},
{
"build": "19A346",
"version": "15.0"
},
{
"build": "19A348",
"version": "15.0.1"
},
{
"build": "19A404",
"version": "15.0.2"
},
{
"build": "19B74",
"version": "15.1"
},
{
"build": "19B81",
"version": "15.1.1"
},
{
"build": "19C56",
"version": "15.2"
},
{
"build": "19C63",
"version": "15.2.1"
},
{
"build": "19D50",
"version": "15.3"
},
{
"build": "19D52",
"version": "15.3.1"
},
{
"build": "19E241",
"version": "15.4"
},
{
"build": "19E258",
"version": "15.4.1"
},
{
"build": "19F77",
"version": "15.5"
},
{
"build": "19G71",
"version": "15.6"
},
{
"build": "19G82",
"version": "15.6.1"
},
{
"build": "19H12",
"version": "15.7"
},
{
"build": "19H117",
"version": "15.7.1"
},
{
"build": "19H218",
"version": "15.7.2"
},
{
"version": "15.7.3",
"build": "19H307"
},
{
"version": "15.7.4",
"build": "19H321"
},
{
"version": "15.7.5",
"build": "19H332"
},
{
"version": "15.7.6",
"build": "19H349"
},
{
"version": "15.7.7",
"build": "19H357"
},
{
"build": "20A362",
"version": "16.0"
},
{
"build": "20A371",
"version": "16.0.1"
},
{
"build": "20A380",
"version": "16.0.2"
},
{
"build": "20A392",
"version": "16.0.3"
},
{
"build": "20B82",
"version": "16.1"
},
{
"build": "20B101",
"version": "16.1.1"
},
{
"build": "20B110",
"version": "16.1.2"
},
{
"build": "20C65",
"version": "16.2"
},
{
"build": "20D47",
"version": "16.3"
},
{
"build": "20D67",
"version": "16.3.1"
},
{
"build": "20E247",
"version": "16.4"
},
{
"build": "20E252",
"version": "16.4.1"
},
{
"version": "16.5",
"beta": null,
"build": "20F66"
},
{
"version": "16.5.1",
"build": "20F75"
}
]

View File

@@ -55,13 +55,19 @@ class DecryptBackup:
log.critical("The backup does not seem encrypted!")
return False
def _process_file(self, relative_path: str, domain: str, item,
file_id: str, item_folder: str) -> None:
self._backup.getFileDecryptedCopy(manifestEntry=item,
targetName=file_id,
targetFolder=item_folder)
log.info("Decrypted file %s [%s] to %s/%s", relative_path, domain,
item_folder, file_id)
def _process_file(
self, relative_path: str, domain: str, item, file_id: str, item_folder: str
) -> None:
self._backup.getFileDecryptedCopy(
manifestEntry=item, targetName=file_id, targetFolder=item_folder
)
log.info(
"Decrypted file %s [%s] to %s/%s",
relative_path,
domain,
item_folder,
file_id,
)
def process_backup(self) -> None:
if not os.path.exists(self.dest_path):
@@ -83,11 +89,12 @@ class DecryptBackup:
# This may be a partial backup. Skip files from the manifest
# which do not exist locally.
source_file_path = os.path.join(self.backup_path, file_id[0:2],
file_id)
source_file_path = os.path.join(self.backup_path, file_id[0:2], file_id)
if not os.path.exists(source_file_path):
log.debug("Skipping file %s. File not found in encrypted backup directory.",
source_file_path)
log.debug(
"Skipping file %s. File not found in encrypted backup directory.",
source_file_path,
)
continue
item_folder = os.path.join(self.dest_path, file_id[0:2])
@@ -99,10 +106,10 @@ class DecryptBackup:
# Add manifest plist to both keys to handle this.
item["manifest"] = item["file"]
pool.apply_async(self._process_file, args=(relative_path,
domain, item,
file_id,
item_folder))
pool.apply_async(
self._process_file,
args=(relative_path, domain, item, file_id, item_folder),
)
except Exception as exc:
log.error("Failed to decrypt file %s: %s", relative_path, exc)
@@ -112,10 +119,8 @@ class DecryptBackup:
# Copying over the root plist files as well.
for file_name in os.listdir(self.backup_path):
if file_name.endswith(".plist"):
log.info("Copied plist file %s to %s",
file_name, self.dest_path)
shutil.copy(os.path.join(self.backup_path, file_name),
self.dest_path)
log.info("Copied plist file %s to %s", file_name, self.dest_path)
shutil.copy(os.path.join(self.backup_path, file_name), self.dest_path)
def decrypt_with_password(self, password: str) -> None:
"""Decrypts an encrypted iOS backup.
@@ -123,22 +128,26 @@ class DecryptBackup:
:param password: Password to use to decrypt the original backup
"""
log.info("Decrypting iOS backup at path %s with password",
self.backup_path)
log.info("Decrypting iOS backup at path %s with password", self.backup_path)
if not os.path.exists(os.path.join(self.backup_path, "Manifest.plist")):
possible = glob.glob(os.path.join(
self.backup_path, "*", "Manifest.plist"))
possible = glob.glob(os.path.join(self.backup_path, "*", "Manifest.plist"))
if len(possible) == 1:
newpath = os.path.dirname(possible[0])
log.warning("No Manifest.plist in %s, using %s instead.",
self.backup_path, newpath)
log.warning(
"No Manifest.plist in %s, using %s instead.",
self.backup_path,
newpath,
)
self.backup_path = newpath
elif len(possible) > 1:
log.critical("No Manifest.plist in %s, and %d Manifest.plist files in subdirs. "
"Please choose one!",
self.backup_path, len(possible))
log.critical(
"No Manifest.plist in %s, and %d Manifest.plist files in subdirs. "
"Please choose one!",
self.backup_path,
len(possible),
)
return
# Before proceeding, we check whether the backup is indeed encrypted.
@@ -146,23 +155,33 @@ class DecryptBackup:
return
try:
self._backup = iOSbackup(udid=os.path.basename(self.backup_path),
cleartextpassword=password,
backuproot=os.path.dirname(self.backup_path))
self._backup = iOSbackup(
udid=os.path.basename(self.backup_path),
cleartextpassword=password,
backuproot=os.path.dirname(self.backup_path),
)
except Exception as exc:
if (isinstance(exc, KeyError)
and len(exc.args) > 0
and exc.args[0] == b"KEY"):
if (
isinstance(exc, KeyError)
and len(exc.args) > 0
and exc.args[0] == b"KEY"
):
log.critical("Failed to decrypt backup. Password is probably wrong.")
elif (isinstance(exc, FileNotFoundError)
and os.path.basename(exc.filename) == "Manifest.plist"):
log.critical("Failed to find a valid backup at %s. "
"Did you point to the right backup path?",
self.backup_path)
elif (
isinstance(exc, FileNotFoundError)
and os.path.basename(exc.filename) == "Manifest.plist"
):
log.critical(
"Failed to find a valid backup at %s. "
"Did you point to the right backup path?",
self.backup_path,
)
else:
log.exception(exc)
log.critical("Failed to decrypt backup. Did you provide the correct password? "
"Did you point to the right backup path?")
log.critical(
"Failed to decrypt backup. Did you provide the correct password? "
"Did you point to the right backup path?"
)
def decrypt_with_key_file(self, key_file: str) -> None:
"""Decrypts an encrypted iOS backup using a key file.
@@ -170,8 +189,11 @@ class DecryptBackup:
:param key_file: File to read the key bytes to decrypt the backup
"""
log.info("Decrypting iOS backup at path %s with key file %s",
self.backup_path, key_file)
log.info(
"Decrypting iOS backup at path %s with key file %s",
self.backup_path,
key_file,
)
# Before proceeding, we check whether the backup is indeed encrypted.
if not self.is_encrypted(self.backup_path):
@@ -182,17 +204,23 @@ class DecryptBackup:
# Key should be 64 hex encoded characters (32 raw bytes)
if len(key_bytes) != 64:
log.critical("Invalid key from key file. Did you provide the correct key file?")
log.critical(
"Invalid key from key file. Did you provide the correct key file?"
)
return
try:
key_bytes_raw = binascii.unhexlify(key_bytes)
self._backup = iOSbackup(udid=os.path.basename(self.backup_path),
derivedkey=key_bytes_raw,
backuproot=os.path.dirname(self.backup_path))
self._backup = iOSbackup(
udid=os.path.basename(self.backup_path),
derivedkey=key_bytes_raw,
backuproot=os.path.dirname(self.backup_path),
)
except Exception as exc:
log.exception(exc)
log.critical("Failed to decrypt backup. Did you provide the correct key file?")
log.critical(
"Failed to decrypt backup. Did you provide the correct key file?"
)
def get_key(self) -> None:
"""Retrieve and prints the encryption key."""
@@ -200,8 +228,11 @@ class DecryptBackup:
return
self._decryption_key = self._backup.getDecryptionKey()
log.info("Derived decryption key for backup at path %s is: \"%s\"",
self.backup_path, self._decryption_key)
log.info(
'Derived decryption key for backup at path %s is: "%s"',
self.backup_path,
self._decryption_key,
)
def write_key(self, key_path: str) -> None:
"""Save extracted key to file.
@@ -214,13 +245,15 @@ class DecryptBackup:
return
try:
with open(key_path, 'w', encoding="utf-8") as handle:
with open(key_path, "w", encoding="utf-8") as handle:
handle.write(self._decryption_key)
except Exception as exc:
log.exception(exc)
log.critical("Failed to write key to file: %s", key_path)
return
else:
log.info("Wrote decryption key to file: %s. This file is "
"equivalent to a plaintext password. Keep it safe!",
key_path)
log.info(
"Wrote decryption key to file: %s. This file is "
"equivalent to a plaintext password. Keep it safe!",
key_path,
)

View File

@@ -22,31 +22,51 @@ class BackupInfo(IOSExtraction):
file_path: Optional[str] = None,
target_path: Optional[str] = None,
results_path: Optional[str] = None,
fast_mode: Optional[bool] = False,
fast_mode: bool = False,
log: logging.Logger = logging.getLogger(__name__),
results: Optional[list] = None
results: Optional[list] = None,
) -> None:
super().__init__(file_path=file_path, target_path=target_path,
results_path=results_path, fast_mode=fast_mode,
log=log, results=results)
super().__init__(
file_path=file_path,
target_path=target_path,
results_path=results_path,
fast_mode=fast_mode,
log=log,
results=results,
)
self.results = {}
def run(self) -> None:
info_path = os.path.join(self.target_path, "Info.plist")
if not os.path.exists(info_path):
raise DatabaseNotFoundError("No Info.plist at backup path, unable to extract device "
"information")
raise DatabaseNotFoundError(
"No Info.plist at backup path, unable to extract device " "information"
)
with open(info_path, "rb") as handle:
info = plistlib.load(handle)
fields = ["Build Version", "Device Name", "Display Name",
"GUID", "ICCID", "IMEI", "MEID", "Installed Applications",
"Last Backup Date", "Phone Number", "Product Name",
"Product Type", "Product Version", "Serial Number",
"Target Identifier", "Target Type", "Unique Identifier",
"iTunes Version"]
fields = [
"Build Version",
"Device Name",
"Display Name",
"GUID",
"ICCID",
"IMEI",
"MEID",
"Installed Applications",
"Last Backup Date",
"Phone Number",
"Product Name",
"Product Type",
"Product Version",
"Serial Number",
"Target Identifier",
"Target Type",
"Unique Identifier",
"iTunes Version",
]
for field in fields:
value = info.get(field, None)

View File

@@ -13,7 +13,9 @@ from mvt.common.utils import convert_datetime_to_iso
from ..base import IOSExtraction
CONF_PROFILES_DOMAIN = "SysSharedContainerDomain-systemgroup.com.apple.configurationprofiles"
CONF_PROFILES_DOMAIN = (
"SysSharedContainerDomain-systemgroup.com.apple.configurationprofiles"
)
class ConfigurationProfiles(IOSExtraction):
@@ -24,26 +26,31 @@ class ConfigurationProfiles(IOSExtraction):
file_path: Optional[str] = None,
target_path: Optional[str] = None,
results_path: Optional[str] = None,
fast_mode: Optional[bool] = False,
fast_mode: bool = False,
log: logging.Logger = logging.getLogger(__name__),
results: Optional[list] = None
results: Optional[list] = None,
) -> None:
super().__init__(file_path=file_path, target_path=target_path,
results_path=results_path, fast_mode=fast_mode,
log=log, results=results)
super().__init__(
file_path=file_path,
target_path=target_path,
results_path=results_path,
fast_mode=fast_mode,
log=log,
results=results,
)
def serialize(self, record: dict) -> Union[dict, list]:
if not record["install_date"]:
return {}
payload_name = record['plist'].get('PayloadDisplayName')
payload_description = record['plist'].get('PayloadDescription')
payload_name = record["plist"].get("PayloadDisplayName")
payload_description = record["plist"].get("PayloadDescription")
return {
"timestamp": record["install_date"],
"module": self.__class__.__name__,
"event": "configuration_profile_install",
"data": f"{record['plist']['PayloadType']} installed: {record['plist']['PayloadUUID']} "
f"- {payload_name}: {payload_description}"
f"- {payload_name}: {payload_description}",
}
def check_indicators(self) -> None:
@@ -58,10 +65,12 @@ class ConfigurationProfiles(IOSExtraction):
# indicator list.
ioc = self.indicators.check_profile(result["plist"]["PayloadUUID"])
if ioc:
self.log.warning("Found a known malicious configuration "
"profile \"%s\" with UUID %s",
result['plist']['PayloadDisplayName'],
result['plist']['PayloadUUID'])
self.log.warning(
"Found a known malicious configuration "
'profile "%s" with UUID %s',
result["plist"]["PayloadDisplayName"],
result["plist"]["PayloadUUID"],
)
result["matched_indicator"] = ioc
self.detected.append(result)
continue
@@ -69,22 +78,26 @@ class ConfigurationProfiles(IOSExtraction):
# Highlight suspicious configuration profiles which may be used
# to hide notifications.
if payload_content["PayloadType"] in ["com.apple.notificationsettings"]:
self.log.warning("Found a potentially suspicious configuration profile "
"\"%s\" with payload type %s",
result['plist']['PayloadDisplayName'],
payload_content['PayloadType'])
self.log.warning(
"Found a potentially suspicious configuration profile "
'"%s" with payload type %s',
result["plist"]["PayloadDisplayName"],
payload_content["PayloadType"],
)
self.detected.append(result)
continue
def run(self) -> None:
for conf_file in self._get_backup_files_from_manifest(
domain=CONF_PROFILES_DOMAIN):
domain=CONF_PROFILES_DOMAIN
):
conf_rel_path = conf_file["relative_path"]
# Filter out all configuration files that are not configuration
# profiles.
if not conf_rel_path or not os.path.basename(
conf_rel_path).startswith("profile-"):
if not conf_rel_path or not os.path.basename(conf_rel_path).startswith(
"profile-"
):
continue
conf_file_path = self._get_backup_file_from_id(conf_file["file_id"])
@@ -100,37 +113,75 @@ class ConfigurationProfiles(IOSExtraction):
# TODO: Tidy up the following code hell.
if "SignerCerts" in conf_plist:
conf_plist["SignerCerts"] = [b64encode(x) for x in conf_plist["SignerCerts"]]
conf_plist["SignerCerts"] = [
b64encode(x) for x in conf_plist["SignerCerts"]
]
if "OTAProfileStub" in conf_plist:
if "SignerCerts" in conf_plist["OTAProfileStub"]:
conf_plist["OTAProfileStub"]["SignerCerts"] = [b64encode(x) for x in conf_plist["OTAProfileStub"]["SignerCerts"]]
conf_plist["OTAProfileStub"]["SignerCerts"] = [
b64encode(x)
for x in conf_plist["OTAProfileStub"]["SignerCerts"]
]
if "PayloadContent" in conf_plist["OTAProfileStub"]:
if "EnrollmentIdentityPersistentID" in conf_plist["OTAProfileStub"]["PayloadContent"]:
conf_plist["OTAProfileStub"]["PayloadContent"]["EnrollmentIdentityPersistentID"] = b64encode(conf_plist["OTAProfileStub"]["PayloadContent"]["EnrollmentIdentityPersistentID"])
if (
"EnrollmentIdentityPersistentID"
in conf_plist["OTAProfileStub"]["PayloadContent"]
):
conf_plist["OTAProfileStub"]["PayloadContent"][
"EnrollmentIdentityPersistentID"
] = b64encode(
conf_plist["OTAProfileStub"]["PayloadContent"][
"EnrollmentIdentityPersistentID"
]
)
if "PushTokenDataSentToServerKey" in conf_plist:
conf_plist["PushTokenDataSentToServerKey"] = b64encode(conf_plist["PushTokenDataSentToServerKey"])
conf_plist["PushTokenDataSentToServerKey"] = b64encode(
conf_plist["PushTokenDataSentToServerKey"]
)
if "LastPushTokenHash" in conf_plist:
conf_plist["LastPushTokenHash"] = b64encode(conf_plist["LastPushTokenHash"])
conf_plist["LastPushTokenHash"] = b64encode(
conf_plist["LastPushTokenHash"]
)
if "PayloadContent" in conf_plist:
for content_entry in range(len(conf_plist["PayloadContent"])):
if "PERSISTENT_REF" in conf_plist["PayloadContent"][content_entry]:
conf_plist["PayloadContent"][content_entry]["PERSISTENT_REF"] = b64encode(conf_plist["PayloadContent"][content_entry]["PERSISTENT_REF"])
conf_plist["PayloadContent"][content_entry][
"PERSISTENT_REF"
] = b64encode(
conf_plist["PayloadContent"][content_entry][
"PERSISTENT_REF"
]
)
if "IdentityPersistentRef" in conf_plist["PayloadContent"][content_entry]:
conf_plist["PayloadContent"][content_entry]["IdentityPersistentRef"] = b64encode(conf_plist["PayloadContent"][content_entry]["IdentityPersistentRef"])
if (
"IdentityPersistentRef"
in conf_plist["PayloadContent"][content_entry]
):
conf_plist["PayloadContent"][content_entry][
"IdentityPersistentRef"
] = b64encode(
conf_plist["PayloadContent"][content_entry][
"IdentityPersistentRef"
]
)
self.results.append({
"file_id": conf_file["file_id"],
"relative_path": conf_file["relative_path"],
"domain": conf_file["domain"],
"plist": conf_plist,
"install_date": convert_datetime_to_iso(conf_plist.get("InstallDate")),
})
self.results.append(
{
"file_id": conf_file["file_id"],
"relative_path": conf_file["relative_path"],
"domain": conf_file["domain"],
"plist": conf_plist,
"install_date": convert_datetime_to_iso(
conf_plist.get("InstallDate")
),
}
)
self.log.info("Extracted details about %d configuration profiles",
len(self.results))
self.log.info(
"Extracted details about %d configuration profiles", len(self.results)
)

View File

@@ -26,13 +26,18 @@ class Manifest(IOSExtraction):
file_path: Optional[str] = None,
target_path: Optional[str] = None,
results_path: Optional[str] = None,
fast_mode: Optional[bool] = False,
fast_mode: bool = False,
log: logging.Logger = logging.getLogger(__name__),
results: Optional[list] = None
results: Optional[list] = None,
) -> None:
super().__init__(file_path=file_path, target_path=target_path,
results_path=results_path, fast_mode=fast_mode,
log=log, results=results)
super().__init__(
file_path=file_path,
target_path=target_path,
results_path=results_path,
fast_mode=fast_mode,
log=log,
results=results,
)
def _get_key(self, dictionary, key):
"""Unserialized plist objects can have keys which are str or byte types
@@ -42,8 +47,7 @@ class Manifest(IOSExtraction):
:param key:
"""
return (dictionary.get(key.encode("utf-8"), None)
or dictionary.get(key, None))
return dictionary.get(key.encode("utf-8"), None) or dictionary.get(key, None)
@staticmethod
def _convert_timestamp(timestamp_or_unix_time_int):
@@ -62,20 +66,23 @@ class Manifest(IOSExtraction):
if "modified" not in record or "status_changed" not in record:
return records
for timestamp in set([record["created"], record["modified"],
record["status_changed"]]):
for timestamp in set(
[record["created"], record["modified"], record["status_changed"]]
):
macb = ""
macb += "M" if timestamp == record["modified"] else "-"
macb += "-"
macb += "C" if timestamp == record["status_changed"] else "-"
macb += "B" if timestamp == record["created"] else "-"
records.append({
"timestamp": timestamp,
"module": self.__class__.__name__,
"event": macb,
"data": f"{record['relative_path']} - {record['domain']}"
})
records.append(
{
"timestamp": timestamp,
"module": self.__class__.__name__,
"event": macb,
"data": f"{record['relative_path']} - {record['domain']}",
}
)
return records
@@ -85,10 +92,15 @@ class Manifest(IOSExtraction):
continue
if result["domain"]:
if (os.path.basename(result["relative_path"]) == "com.apple.CrashReporter.plist"
and result["domain"] == "RootDomain"):
self.log.warning("Found a potentially suspicious "
"\"com.apple.CrashReporter.plist\" file created in RootDomain")
if (
os.path.basename(result["relative_path"])
== "com.apple.CrashReporter.plist"
and result["domain"] == "RootDomain"
):
self.log.warning(
"Found a potentially suspicious "
'"com.apple.CrashReporter.plist" file created in RootDomain'
)
self.detected.append(result)
continue
@@ -109,8 +121,12 @@ class Manifest(IOSExtraction):
ioc = self.indicators.check_domain(part)
if ioc:
self.log.warning("Found mention of domain \"%s\" in a backup file with "
"path: %s", ioc["value"], rel_path)
self.log.warning(
'Found mention of domain "%s" in a backup file with '
"path: %s",
ioc["value"],
rel_path,
)
result["matched_indicator"] = ioc
self.detected.append(result)
@@ -119,8 +135,7 @@ class Manifest(IOSExtraction):
if not os.path.isfile(manifest_db_path):
raise DatabaseNotFoundError("unable to find backup's Manifest.db")
self.log.info("Found Manifest.db database at path: %s",
manifest_db_path)
self.log.info("Found Manifest.db database at path: %s", manifest_db_path)
conn = sqlite3.connect(manifest_db_path)
cur = conn.cursor()
@@ -148,27 +163,33 @@ class Manifest(IOSExtraction):
birth = self._get_key(file_metadata, "Birth")
last_modified = self._get_key(file_metadata, "LastModified")
last_status_change = self._get_key(file_metadata,
"LastStatusChange")
last_status_change = self._get_key(
file_metadata, "LastStatusChange"
)
cleaned_metadata.update({
"created": self._convert_timestamp(birth),
"modified": self._convert_timestamp(last_modified),
"status_changed": self._convert_timestamp(last_status_change),
"mode": oct(self._get_key(file_metadata, "Mode")),
"owner": self._get_key(file_metadata, "UserID"),
"size": self._get_key(file_metadata, "Size"),
})
cleaned_metadata.update(
{
"created": self._convert_timestamp(birth),
"modified": self._convert_timestamp(last_modified),
"status_changed": self._convert_timestamp(
last_status_change
),
"mode": oct(self._get_key(file_metadata, "Mode")),
"owner": self._get_key(file_metadata, "UserID"),
"size": self._get_key(file_metadata, "Size"),
}
)
except Exception:
self.log.exception("Error reading manifest file metadata for file with ID %s "
"and relative path %s",
file_data["fileID"],
file_data["relativePath"])
self.log.exception(
"Error reading manifest file metadata for file with ID %s "
"and relative path %s",
file_data["fileID"],
file_data["relativePath"],
)
self.results.append(cleaned_metadata)
cur.close()
conn.close()
self.log.info("Extracted a total of %d file metadata items",
len(self.results))
self.log.info("Extracted a total of %d file metadata items", len(self.results))

View File

@@ -21,18 +21,24 @@ class ProfileEvents(IOSExtraction):
"""
def __init__(
self,
file_path: Optional[str] = None,
target_path: Optional[str] = None,
results_path: Optional[str] = None,
fast_mode: Optional[bool] = False,
fast_mode: bool = False,
log: logging.Logger = logging.getLogger(__name__),
results: Optional[list] = None
results: Optional[list] = None,
) -> None:
super().__init__(file_path=file_path, target_path=target_path,
results_path=results_path, fast_mode=fast_mode,
log=log, results=results)
super().__init__(
file_path=file_path,
target_path=target_path,
results_path=results_path,
fast_mode=fast_mode,
log=log,
results=results,
)
def serialize(self, record: dict) -> Union[dict, list]:
return {
@@ -40,8 +46,8 @@ class ProfileEvents(IOSExtraction):
"module": self.__class__.__name__,
"event": "profile_operation",
"data": f"Process {record.get('process')} started operation "
f"{record.get('operation')} of profile "
f"{record.get('profile_id')}"
f"{record.get('operation')} of profile "
f"{record.get('profile_id')}",
}
def check_indicators(self) -> None:
@@ -92,21 +98,24 @@ class ProfileEvents(IOSExtraction):
def run(self) -> None:
for events_file in self._get_backup_files_from_manifest(
relative_path=CONF_PROFILES_EVENTS_RELPATH):
events_file_path = self._get_backup_file_from_id(
events_file["file_id"])
relative_path=CONF_PROFILES_EVENTS_RELPATH
):
events_file_path = self._get_backup_file_from_id(events_file["file_id"])
if not events_file_path:
continue
self.log.info("Found MCProfileEvents.plist file at %s",
events_file_path)
self.log.info("Found MCProfileEvents.plist file at %s", events_file_path)
with open(events_file_path, "rb") as handle:
self.results.extend(self.parse_profile_events(handle.read()))
for result in self.results:
self.log.info("On %s process \"%s\" started operation \"%s\" of profile \"%s\"",
result.get("timestamp"), result.get("process"),
result.get("operation"), result.get("profile_id"))
self.log.info(
'On %s process "%s" started operation "%s" of profile "%s"',
result.get("timestamp"),
result.get("process"),
result.get("operation"),
result.get("profile_id"),
)
self.log.info("Extracted %d profile events", len(self.results))

View File

@@ -11,8 +11,7 @@ import sqlite3
import subprocess
from typing import Iterator, Optional, Union
from mvt.common.module import (DatabaseCorruptedError, DatabaseNotFoundError,
MVTModule)
from mvt.common.module import DatabaseCorruptedError, DatabaseNotFoundError, MVTModule
class IOSExtraction(MVTModule):
@@ -24,19 +23,25 @@ class IOSExtraction(MVTModule):
file_path: Optional[str] = None,
target_path: Optional[str] = None,
results_path: Optional[str] = None,
fast_mode: Optional[bool] = False,
fast_mode: bool = False,
log: logging.Logger = logging.getLogger(__name__),
results: Optional[list] = None
results: Optional[list] = None,
) -> None:
super().__init__(file_path=file_path, target_path=target_path,
results_path=results_path, fast_mode=fast_mode,
log=log, results=results)
super().__init__(
file_path=file_path,
target_path=target_path,
results_path=results_path,
fast_mode=fast_mode,
log=log,
results=results,
)
self.is_backup = False
self.is_fs_dump = False
def _recover_sqlite_db_if_needed(self, file_path: str,
forced: Optional[bool] = False) -> None:
def _recover_sqlite_db_if_needed(
self, file_path: str, forced: bool = False
) -> None:
"""Tries to recover a malformed database by running a .clone command.
:param file_path: Path to the malformed database file.
@@ -59,30 +64,35 @@ class IOSExtraction(MVTModule):
if not recover:
return
self.log.info("Database at path %s is malformed. Trying to recover...",
file_path)
self.log.info(
"Database at path %s is malformed. Trying to recover...", file_path
)
if not shutil.which("sqlite3"):
raise DatabaseCorruptedError("failed to recover without sqlite3 binary: please install "
"sqlite3!")
raise DatabaseCorruptedError(
"failed to recover without sqlite3 binary: please install " "sqlite3!"
)
if '"' in file_path:
raise DatabaseCorruptedError(f"database at path '{file_path}' is corrupted. unable to "
"recover because it has a quotation mark (\") in its name")
raise DatabaseCorruptedError(
f"database at path '{file_path}' is corrupted. unable to "
'recover because it has a quotation mark (") in its name'
)
bak_path = f"{file_path}.bak"
shutil.move(file_path, bak_path)
ret = subprocess.call(["sqlite3", bak_path, f".clone \"{file_path}\""],
stdout=subprocess.PIPE, stderr=subprocess.PIPE)
ret = subprocess.call(
["sqlite3", bak_path, f'.clone "{file_path}"'],
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
)
if ret != 0:
raise DatabaseCorruptedError("failed to recover database")
self.log.info("Database at path %s recovered successfully!", file_path)
def _get_backup_files_from_manifest(
self,
relative_path: Optional[str] = None,
domain: Optional[str] = None
self, relative_path: Optional[str] = None, domain: Optional[str] = None
) -> Iterator[dict]:
"""Locate files from Manifest.db.
@@ -102,16 +112,19 @@ class IOSExtraction(MVTModule):
conn = sqlite3.connect(manifest_db_path)
cur = conn.cursor()
if relative_path and domain:
cur.execute(f"{base_sql} relativePath = ? AND domain = ?;",
(relative_path, domain))
cur.execute(
f"{base_sql} relativePath = ? AND domain = ?;",
(relative_path, domain),
)
else:
if relative_path:
if "*" in relative_path:
cur.execute(f"{base_sql} relativePath LIKE ?;",
(relative_path.replace("*", "%"),))
cur.execute(
f"{base_sql} relativePath LIKE ?;",
(relative_path.replace("*", "%"),),
)
else:
cur.execute(f"{base_sql} relativePath = ?;",
(relative_path,))
cur.execute(f"{base_sql} relativePath = ?;", (relative_path,))
elif domain:
cur.execute(f"{base_sql} domain = ?;", (domain,))
except Exception as exc:
@@ -133,17 +146,14 @@ class IOSExtraction(MVTModule):
def _get_fs_files_from_patterns(self, root_paths: list) -> Iterator[str]:
for root_path in root_paths:
for found_path in glob.glob(os.path.join(self.target_path,
root_path)):
for found_path in glob.glob(os.path.join(self.target_path, root_path)):
if not os.path.exists(found_path):
continue
yield found_path
def _find_ios_database(
self,
backup_ids: Optional[list] = None,
root_paths: Optional[list] = None
self, backup_ids: Optional[list] = None, root_paths: Optional[list] = None
) -> None:
"""Try to locate a module's database file from either an iTunes
backup or a full filesystem dump. This is intended only for

View File

@@ -15,6 +15,16 @@ from .webkit_indexeddb import WebkitIndexedDB
from .webkit_localstorage import WebkitLocalStorage
from .webkit_safariviewservice import WebkitSafariViewService
FS_MODULES = [CacheFiles, Filesystem, Netusage, Analytics, AnalyticsIOSVersions,
SafariFavicon, ShutdownLog, IOSVersionHistory, WebkitIndexedDB,
WebkitLocalStorage, WebkitSafariViewService]
FS_MODULES = [
CacheFiles,
Filesystem,
Netusage,
Analytics,
AnalyticsIOSVersions,
SafariFavicon,
ShutdownLog,
IOSVersionHistory,
WebkitIndexedDB,
WebkitLocalStorage,
WebkitSafariViewService,
]

View File

@@ -27,13 +27,18 @@ class Analytics(IOSExtraction):
file_path: Optional[str] = None,
target_path: Optional[str] = None,
results_path: Optional[str] = None,
fast_mode: Optional[bool] = False,
fast_mode: bool = False,
log: logging.Logger = logging.getLogger(__name__),
results: Optional[list] = None
results: Optional[list] = None,
) -> None:
super().__init__(file_path=file_path, target_path=target_path,
results_path=results_path, fast_mode=fast_mode,
log=log, results=results)
super().__init__(
file_path=file_path,
target_path=target_path,
results_path=results_path,
fast_mode=fast_mode,
log=log,
results=results,
)
def serialize(self, record: dict) -> Union[dict, list]:
return {
@@ -54,9 +59,12 @@ class Analytics(IOSExtraction):
ioc = self.indicators.check_process(value)
if ioc:
self.log.warning("Found mention of a malicious process \"%s\" in %s file at %s",
value, result["artifact"],
result["isodate"])
self.log.warning(
'Found mention of a malicious process "%s" in %s file at %s',
value,
result["artifact"],
result["isodate"],
)
new_result = copy.copy(result)
new_result["matched_indicator"] = ioc
self.detected.append(new_result)
@@ -64,9 +72,12 @@ class Analytics(IOSExtraction):
ioc = self.indicators.check_domain(value)
if ioc:
self.log.warning("Found mention of a malicious domain \"%s\" in %s file at %s",
value, result["artifact"],
result["isodate"])
self.log.warning(
'Found mention of a malicious domain "%s" in %s file at %s',
value,
result["artifact"],
result["isodate"],
)
new_result = copy.copy(result)
new_result["matched_indicator"] = ioc
self.detected.append(new_result)
@@ -78,7 +89,8 @@ class Analytics(IOSExtraction):
cur = conn.cursor()
try:
cur.execute("""
cur.execute(
"""
SELECT
timestamp,
data
@@ -93,9 +105,11 @@ class Analytics(IOSExtraction):
timestamp,
data
FROM all_events;
""")
"""
)
except sqlite3.OperationalError:
cur.execute("""
cur.execute(
"""
SELECT
timestamp,
data
@@ -105,7 +119,8 @@ class Analytics(IOSExtraction):
timestamp,
data
FROM soft_failures;
""")
"""
)
for row in cur:
if row[0] and row[1]:
@@ -131,14 +146,14 @@ class Analytics(IOSExtraction):
def process_analytics_dbs(self):
for file_path in self._get_fs_files_from_patterns(ANALYTICS_DB_PATH):
self.file_path = file_path
self.log.info("Found Analytics database file at path: %s",
file_path)
self.log.info("Found Analytics database file at path: %s", file_path)
self._extract_analytics_data()
def run(self) -> None:
self.process_analytics_dbs()
self.log.info("Extracted %d records from analytics databases",
len(self.results))
self.log.info(
"Extracted %d records from analytics databases", len(self.results)
)
self.results = sorted(self.results, key=lambda entry: entry["isodate"])

View File

@@ -23,13 +23,18 @@ class AnalyticsIOSVersions(IOSExtraction):
file_path: Optional[str] = None,
target_path: Optional[str] = None,
results_path: Optional[str] = None,
fast_mode: Optional[bool] = False,
fast_mode: bool = False,
log: logging.Logger = logging.getLogger(__name__),
results: Optional[list] = None
results: Optional[list] = None,
) -> None:
super().__init__(file_path=file_path, target_path=target_path,
results_path=results_path, fast_mode=fast_mode,
log=log, results=results)
super().__init__(
file_path=file_path,
target_path=target_path,
results_path=results_path,
fast_mode=fast_mode,
log=log,
results=results,
)
def serialize(self, record: dict) -> Union[dict, list]:
return {
@@ -68,13 +73,19 @@ class AnalyticsIOSVersions(IOSExtraction):
for build, isodate in builds.items():
version = find_version_by_build(build)
self.results.append({
"isodate": isodate,
"build": build,
"version": version,
})
self.results.append(
{
"isodate": isodate,
"build": build,
"version": version,
}
)
self.results = sorted(self.results, key=lambda entry: entry["isodate"])
for result in self.results:
self.log.info("iOS version %s (%s) first appeared on %s",
result["version"], result["build"], result["isodate"])
self.log.info(
"iOS version %s (%s) first appeared on %s",
result["version"],
result["build"],
result["isodate"],
)

View File

@@ -12,29 +12,35 @@ from ..base import IOSExtraction
class CacheFiles(IOSExtraction):
def __init__(
self,
file_path: Optional[str] = None,
target_path: Optional[str] = None,
results_path: Optional[str] = None,
fast_mode: Optional[bool] = False,
fast_mode: bool = False,
log: logging.Logger = logging.getLogger(__name__),
results: Optional[list] = None
results: Optional[list] = None,
) -> None:
super().__init__(file_path=file_path, target_path=target_path,
results_path=results_path, fast_mode=fast_mode,
log=log, results=results)
super().__init__(
file_path=file_path,
target_path=target_path,
results_path=results_path,
fast_mode=fast_mode,
log=log,
results=results,
)
def serialize(self, record: dict) -> Union[dict, list]:
records = []
for item in self.results[record]:
records.append({
"timestamp": item["isodate"],
"module": self.__class__.__name__,
"event": "cache_response",
"data": f"{record} recorded visit to URL {item['url']}"
})
records.append(
{
"timestamp": item["isodate"],
"module": self.__class__.__name__,
"event": "cache_response",
"data": f"{record} recorded visit to URL {item['url']}",
}
)
return records
@@ -49,7 +55,9 @@ class CacheFiles(IOSExtraction):
if ioc:
value["matched_indicator"] = ioc
if key not in self.detected:
self.detected[key] = [value, ]
self.detected[key] = [
value,
]
else:
self.detected[key].append(value)
@@ -69,14 +77,16 @@ class CacheFiles(IOSExtraction):
self.results[key_name] = []
for row in cur:
self.results[key_name].append({
"entry_id": row[0],
"version": row[1],
"hash_value": row[2],
"storage_policy": row[3],
"url": row[4],
"isodate": row[5],
})
self.results[key_name].append(
{
"entry_id": row[0],
"version": row[1],
"hash_value": row[2],
"storage_policy": row[3],
"url": row[4],
"isodate": row[5],
}
)
def run(self) -> None:
self.results = {}

View File

@@ -22,13 +22,18 @@ class Filesystem(IOSExtraction):
file_path: Optional[str] = None,
target_path: Optional[str] = None,
results_path: Optional[str] = None,
fast_mode: Optional[bool] = False,
fast_mode: bool = False,
log: logging.Logger = logging.getLogger(__name__),
results: Optional[list] = None
results: Optional[list] = None,
) -> None:
super().__init__(file_path=file_path, target_path=target_path,
results_path=results_path, fast_mode=fast_mode,
log=log, results=results)
super().__init__(
file_path=file_path,
target_path=target_path,
results_path=results_path,
fast_mode=fast_mode,
log=log,
results=results,
)
def serialize(self, record: dict) -> Union[dict, list]:
return {
@@ -67,8 +72,7 @@ class Filesystem(IOSExtraction):
dir_path = os.path.join(root, dir_name)
result = {
"path": os.path.relpath(dir_path, self.target_path),
"modified": convert_unix_to_iso(
os.stat(dir_path).st_mtime),
"modified": convert_unix_to_iso(os.stat(dir_path).st_mtime),
}
except Exception:
continue
@@ -80,8 +84,7 @@ class Filesystem(IOSExtraction):
file_path = os.path.join(root, file_name)
result = {
"path": os.path.relpath(file_path, self.target_path),
"modified": convert_unix_to_iso(
os.stat(file_path).st_mtime),
"modified": convert_unix_to_iso(os.stat(file_path).st_mtime),
}
except Exception:
continue

View File

@@ -11,7 +11,7 @@ from ..net_base import NetBase
NETUSAGE_ROOT_PATHS = [
"private/var/networkd/netusage.sqlite",
"private/var/networkd/db/netusage.sqlite"
"private/var/networkd/db/netusage.sqlite",
]
@@ -27,13 +27,18 @@ class Netusage(NetBase):
file_path: Optional[str] = None,
target_path: Optional[str] = None,
results_path: Optional[str] = None,
fast_mode: Optional[bool] = False,
fast_mode: bool = False,
log: logging.Logger = logging.getLogger(__name__),
results: Optional[list] = None
results: Optional[list] = None,
) -> None:
super().__init__(file_path=file_path, target_path=target_path,
results_path=results_path, fast_mode=fast_mode,
log=log, results=results)
super().__init__(
file_path=file_path,
target_path=target_path,
results_path=results_path,
fast_mode=fast_mode,
log=log,
results=results,
)
def run(self) -> None:
for netusage_path in self._get_fs_files_from_patterns(NETUSAGE_ROOT_PATHS):
@@ -42,8 +47,11 @@ class Netusage(NetBase):
try:
self._extract_net_data()
except sqlite3.OperationalError as exc:
self.log.info("Skipping this NetUsage database because "
"it seems empty or malformed: %s", exc)
self.log.info(
"Skipping this NetUsage database because "
"it seems empty or malformed: %s",
exc,
)
continue
self._find_suspicious_processes()

View File

@@ -25,13 +25,18 @@ class SafariFavicon(IOSExtraction):
file_path: Optional[str] = None,
target_path: Optional[str] = None,
results_path: Optional[str] = None,
fast_mode: Optional[bool] = False,
fast_mode: bool = False,
log: logging.Logger = logging.getLogger(__name__),
results: Optional[list] = None
results: Optional[list] = None,
) -> None:
super().__init__(file_path=file_path, target_path=target_path,
results_path=results_path, fast_mode=fast_mode,
log=log, results=results)
super().__init__(
file_path=file_path,
target_path=target_path,
results_path=results_path,
fast_mode=fast_mode,
log=log,
results=results,
)
def serialize(self, record: dict) -> Union[dict, list]:
return {
@@ -39,7 +44,7 @@ class SafariFavicon(IOSExtraction):
"module": self.__class__.__name__,
"event": "safari_favicon",
"data": f"Safari favicon from {record['url']} with icon URL "
f"{record['icon_url']} ({record['type']})",
f"{record['icon_url']} ({record['type']})",
}
def check_indicators(self) -> None:
@@ -60,7 +65,8 @@ class SafariFavicon(IOSExtraction):
# Fetch valid icon cache.
cur = conn.cursor()
cur.execute("""
cur.execute(
"""
SELECT
page_url.url,
icon_info.url,
@@ -68,47 +74,52 @@ class SafariFavicon(IOSExtraction):
FROM page_url
JOIN icon_info ON page_url.uuid = icon_info.uuid
ORDER BY icon_info.timestamp;
""")
"""
)
for row in cur:
self.results.append({
"url": row[0],
"icon_url": row[1],
"timestamp": row[2],
"isodate": convert_mactime_to_iso(row[2]),
"type": "valid",
"safari_favicon_db_path": file_path,
})
self.results.append(
{
"url": row[0],
"icon_url": row[1],
"timestamp": row[2],
"isodate": convert_mactime_to_iso(row[2]),
"type": "valid",
"safari_favicon_db_path": file_path,
}
)
# Fetch icons from the rejected icons table.
cur.execute("""
cur.execute(
"""
SELECT
page_url,
icon_url,
timestamp
FROM rejected_resources ORDER BY timestamp;
""")
"""
)
for row in cur:
self.results.append({
"url": row[0],
"icon_url": row[1],
"timestamp": row[2],
"isodate": convert_mactime_to_iso(row[2]),
"type": "rejected",
"safari_favicon_db_path": file_path,
})
self.results.append(
{
"url": row[0],
"icon_url": row[1],
"timestamp": row[2],
"isodate": convert_mactime_to_iso(row[2]),
"type": "rejected",
"safari_favicon_db_path": file_path,
}
)
cur.close()
conn.close()
def run(self) -> None:
for file_path in self._get_fs_files_from_patterns(SAFARI_FAVICON_ROOT_PATHS):
self.log.info("Found Safari favicon cache database at path: %s",
file_path)
self.log.info("Found Safari favicon cache database at path: %s", file_path)
self._process_favicon_db(file_path)
self.log.info("Extracted a total of %d favicon records",
len(self.results))
self.log.info("Extracted a total of %d favicon records", len(self.results))
self.results = sorted(self.results, key=lambda x: x["isodate"])

View File

@@ -23,13 +23,18 @@ class ShutdownLog(IOSExtraction):
file_path: Optional[str] = None,
target_path: Optional[str] = None,
results_path: Optional[str] = None,
fast_mode: Optional[bool] = False,
fast_mode: bool = False,
log: logging.Logger = logging.getLogger(__name__),
results: Optional[list] = None
results: Optional[list] = None,
) -> None:
super().__init__(file_path=file_path, target_path=target_path,
results_path=results_path, fast_mode=fast_mode,
log=log, results=results)
super().__init__(
file_path=file_path,
target_path=target_path,
results_path=results_path,
fast_mode=fast_mode,
log=log,
results=results,
)
def serialize(self, record: dict) -> Union[dict, list]:
return {
@@ -37,7 +42,7 @@ class ShutdownLog(IOSExtraction):
"module": self.__class__.__name__,
"event": "shutdown",
"data": f"Client {record['client']} with PID {record['pid']} "
"was running when the device was shut down",
"was running when the device was shut down",
}
def check_indicators(self) -> None:
@@ -54,8 +59,11 @@ class ShutdownLog(IOSExtraction):
for ioc in self.indicators.get_iocs("processes"):
parts = result["client"].split("/")
if ioc in parts:
self.log.warning("Found mention of a known malicious process \"%s\" in "
"shutdown.log", ioc)
self.log.warning(
'Found mention of a known malicious process "%s" in '
"shutdown.log",
ioc,
)
result["matched_indicator"] = ioc
self.detected.append(result)
continue
@@ -66,28 +74,32 @@ class ShutdownLog(IOSExtraction):
line = line.strip()
if line.startswith("remaining client pid:"):
current_processes.append({
"pid": line[line.find("pid: ")+5:line.find(" (")],
"client": line[line.find("(")+1:line.find(")")],
})
current_processes.append(
{
"pid": line[line.find("pid: ") + 5 : line.find(" (")],
"client": line[line.find("(") + 1 : line.find(")")],
}
)
elif line.startswith("SIGTERM: "):
try:
mac_timestamp = int(line[line.find("[")+1:line.find("]")])
mac_timestamp = int(line[line.find("[") + 1 : line.find("]")])
except ValueError:
try:
start = line.find(" @") + 2
mac_timestamp = int(line[start:start+10])
mac_timestamp = int(line[start : start + 10])
except Exception:
mac_timestamp = 0
isodate = convert_mactime_to_iso(mac_timestamp, from_2001=False)
for current_process in current_processes:
self.results.append({
"isodate": isodate,
"pid": current_process["pid"],
"client": current_process["client"],
})
self.results.append(
{
"isodate": isodate,
"pid": current_process["pid"],
"client": current_process["client"],
}
)
current_processes = []

View File

@@ -25,13 +25,18 @@ class IOSVersionHistory(IOSExtraction):
file_path: Optional[str] = None,
target_path: Optional[str] = None,
results_path: Optional[str] = None,
fast_mode: Optional[bool] = False,
fast_mode: bool = False,
log: logging.Logger = logging.getLogger(__name__),
results: Optional[list] = None
results: Optional[list] = None,
) -> None:
super().__init__(file_path=file_path, target_path=target_path,
results_path=results_path, fast_mode=fast_mode,
log=log, results=results)
super().__init__(
file_path=file_path,
target_path=target_path,
results_path=results_path,
fast_mode=fast_mode,
log=log,
results=results,
)
def serialize(self, record: dict) -> Union[dict, list]:
return {
@@ -46,12 +51,15 @@ class IOSVersionHistory(IOSExtraction):
with open(found_path, "r", encoding="utf-8") as analytics_log:
log_line = json.loads(analytics_log.readline().strip())
timestamp = datetime.datetime.strptime(log_line["timestamp"],
"%Y-%m-%d %H:%M:%S.%f %z")
timestamp = datetime.datetime.strptime(
log_line["timestamp"], "%Y-%m-%d %H:%M:%S.%f %z"
)
timestamp_utc = timestamp.astimezone(datetime.timezone.utc)
self.results.append({
"isodate": convert_datetime_to_iso(timestamp_utc),
"os_version": log_line["os_version"],
})
self.results.append(
{
"isodate": convert_datetime_to_iso(timestamp_utc),
"os_version": log_line["os_version"],
}
)
self.results = sorted(self.results, key=lambda entry: entry["isodate"])

View File

@@ -35,8 +35,10 @@ class WebkitBase(IOSExtraction):
name = name.replace("https_", "https://")
url = name.split("_")[0]
self.results.append({
"folder": key,
"url": url,
"isodate": convert_unix_to_iso(os.stat(found_path).st_mtime),
})
self.results.append(
{
"folder": key,
"url": url,
"isodate": convert_unix_to_iso(os.stat(found_path).st_mtime),
}
)

View File

@@ -27,13 +27,18 @@ class WebkitIndexedDB(WebkitBase):
file_path: Optional[str] = None,
target_path: Optional[str] = None,
results_path: Optional[str] = None,
fast_mode: Optional[bool] = False,
fast_mode: bool = False,
log: logging.Logger = logging.getLogger(__name__),
results: Optional[list] = None
results: Optional[list] = None,
) -> None:
super().__init__(file_path=file_path, target_path=target_path,
results_path=results_path, fast_mode=fast_mode,
log=log, results=results)
super().__init__(
file_path=file_path,
target_path=target_path,
results_path=results_path,
fast_mode=fast_mode,
log=log,
results=results,
)
def serialize(self, record: dict) -> Union[dict, list]:
return {
@@ -41,10 +46,11 @@ class WebkitIndexedDB(WebkitBase):
"module": self.__class__.__name__,
"event": "webkit_indexeddb",
"data": f"IndexedDB folder {record['folder']} containing "
f"file for URL {record['url']}",
f"file for URL {record['url']}",
}
def run(self) -> None:
self._process_webkit_folder(WEBKIT_INDEXEDDB_ROOT_PATHS)
self.log.info("Extracted a total of %d WebKit IndexedDB records",
len(self.results))
self.log.info(
"Extracted a total of %d WebKit IndexedDB records", len(self.results)
)

View File

@@ -25,13 +25,18 @@ class WebkitLocalStorage(WebkitBase):
file_path: Optional[str] = None,
target_path: Optional[str] = None,
results_path: Optional[str] = None,
fast_mode: Optional[bool] = False,
fast_mode: bool = False,
log: logging.Logger = logging.getLogger(__name__),
results: Optional[list] = None
results: Optional[list] = None,
) -> None:
super().__init__(file_path=file_path, target_path=target_path,
results_path=results_path, fast_mode=fast_mode,
log=log, results=results)
super().__init__(
file_path=file_path,
target_path=target_path,
results_path=results_path,
fast_mode=fast_mode,
log=log,
results=results,
)
def serialize(self, record: dict) -> Union[dict, list]:
return {
@@ -39,10 +44,12 @@ class WebkitLocalStorage(WebkitBase):
"module": self.__class__.__name__,
"event": "webkit_local_storage",
"data": f"WebKit Local Storage folder {record['folder']} "
f"containing file for URL {record['url']}",
f"containing file for URL {record['url']}",
}
def run(self) -> None:
self._process_webkit_folder(WEBKIT_LOCALSTORAGE_ROOT_PATHS)
self.log.info("Extracted a total of %d records from WebKit Local Storages",
len(self.results))
self.log.info(
"Extracted a total of %d records from WebKit Local Storages",
len(self.results),
)

View File

@@ -25,15 +25,22 @@ class WebkitSafariViewService(WebkitBase):
file_path: Optional[str] = None,
target_path: Optional[str] = None,
results_path: Optional[str] = None,
fast_mode: Optional[bool] = False,
fast_mode: bool = False,
log: logging.Logger = logging.getLogger(__name__),
results: Optional[list] = None
results: Optional[list] = None,
) -> None:
super().__init__(file_path=file_path, target_path=target_path,
results_path=results_path, fast_mode=fast_mode,
log=log, results=results)
super().__init__(
file_path=file_path,
target_path=target_path,
results_path=results_path,
fast_mode=fast_mode,
log=log,
results=results,
)
def run(self) -> None:
self._process_webkit_folder(WEBKIT_SAFARIVIEWSERVICE_ROOT_PATHS)
self.log.info("Extracted a total of %d records from WebKit SafariViewService WebsiteData",
len(self.results))
self.log.info(
"Extracted a total of %d records from WebKit SafariViewService WebsiteData",
len(self.results),
)

View File

@@ -26,9 +26,27 @@ from .webkit_resource_load_statistics import WebkitResourceLoadStatistics
from .webkit_session_resource_log import WebkitSessionResourceLog
from .whatsapp import Whatsapp
MIXED_MODULES = [Calls, ChromeFavicon, ChromeHistory, Contacts, FirefoxFavicon,
FirefoxHistory, IDStatusCache, InteractionC, LocationdClients,
OSAnalyticsADDaily, Datausage, SafariBrowserState, SafariHistory,
TCC, SMS, SMSAttachments, WebkitResourceLoadStatistics,
WebkitSessionResourceLog, Whatsapp, Shortcuts, Applications,
Calendar]
MIXED_MODULES = [
Calls,
ChromeFavicon,
ChromeHistory,
Contacts,
FirefoxFavicon,
FirefoxHistory,
IDStatusCache,
InteractionC,
LocationdClients,
OSAnalyticsADDaily,
Datausage,
SafariBrowserState,
SafariHistory,
TCC,
SMS,
SMSAttachments,
WebkitResourceLoadStatistics,
WebkitSessionResourceLog,
Whatsapp,
Shortcuts,
Applications,
Calendar,
]

View File

@@ -21,18 +21,24 @@ APPLICATIONS_DB_PATH = [
class Applications(IOSExtraction):
"""Extract information from accounts installed on the phone."""
def __init__(
self,
file_path: Optional[str] = None,
target_path: Optional[str] = None,
results_path: Optional[str] = None,
fast_mode: Optional[bool] = False,
fast_mode: bool = False,
log: logging.Logger = logging.getLogger(__name__),
results: Optional[list] = None
results: Optional[list] = None,
) -> None:
super().__init__(file_path=file_path, target_path=target_path,
results_path=results_path, fast_mode=fast_mode,
log=log, results=results)
super().__init__(
file_path=file_path,
target_path=target_path,
results_path=results_path,
fast_mode=fast_mode,
log=log,
results=results,
)
def serialize(self, record: dict) -> Union[dict, list]:
if "isodate" in record:
@@ -40,39 +46,62 @@ class Applications(IOSExtraction):
"timestamp": record["isodate"],
"module": self.__class__.__name__,
"event": "app_installed",
"data": f"App {record.get('name', '')} version {record.get('bundleShortVersionString', '')} from {record.get('artistName', '')} installed from {record.get('sourceApp', '')}"
"data": f"App {record.get('name', '')} version {record.get('bundleShortVersionString', '')} from {record.get('artistName', '')} installed from {record.get('sourceApp', '')}",
}
return []
def check_indicators(self) -> None:
for result in self.results:
if self.indicators:
if "softwareVersionBundleId" not in result:
self.log.warning(
"Suspicious application identified without softwareVersionBundleId"
)
self.detected.append(result)
continue
ioc = self.indicators.check_process(result["softwareVersionBundleId"])
if ioc:
self.log.warning("Malicious application %s identified", result["softwareVersionBundleId"])
self.log.warning(
"Malicious application %s identified",
result["softwareVersionBundleId"],
)
result["matched_indicator"] = ioc
self.detected.append(result)
continue
ioc = self.indicators.check_app_id(result["softwareVersionBundleId"])
if ioc:
self.log.warning("Malicious application %s identified", result["softwareVersionBundleId"])
self.log.warning(
"Malicious application %s identified",
result["softwareVersionBundleId"],
)
result["matched_indicator"] = ioc
self.detected.append(result)
continue
if result.get("sourceApp", "com.apple.AppStore") not in ["com.apple.AppStore", "com.apple.dmd", "dmd"]:
self.log.warning("Suspicious app not installed from the App Store or MDM: %s", result["softwareVersionBundleId"])
if result.get("sourceApp", "com.apple.AppStore") not in [
"com.apple.AppStore",
"com.apple.dmd",
"dmd",
]:
self.log.warning(
"Suspicious app not installed from the App Store or MDM: %s",
result["softwareVersionBundleId"],
)
self.detected.append(result)
def _parse_itunes_timestamp(self, entry: Dict[str, Any]) -> None:
"""
Parse the iTunes metadata info
"""
if entry.get("com.apple.iTunesStore.downloadInfo", {}).get("purchaseDate", None):
if entry.get("com.apple.iTunesStore.downloadInfo", {}).get(
"purchaseDate", None
):
timestamp = datetime.strptime(
entry["com.apple.iTunesStore.downloadInfo"]["purchaseDate"],
"%Y-%m-%dT%H:%M:%SZ")
"%Y-%m-%dT%H:%M:%SZ",
)
timestamp_utc = timestamp.astimezone(timezone.utc)
entry["isodate"] = convert_datetime_to_iso(timestamp_utc)
@@ -119,5 +148,4 @@ class Applications(IOSExtraction):
for file_path in self._get_fs_files_from_patterns(APPLICATIONS_DB_PATH):
self._parse_itunes_metadata(file_path)
self.log.info("Extracted a total of %d applications",
len(self.results))
self.log.info("Extracted a total of %d applications", len(self.results))

Some files were not shown because too many files have changed in this diff Show More