Compare commits

...

50 Commits

Author SHA1 Message Date
Nex
cce9159eda Adding indicator to matched results 2022-01-23 15:01:49 +01:00
Nex
e1211991aa Bumped version 2022-01-23 14:17:43 +01:00
Nex
8ae9ca328c Added log line at the end to highlight number of detections 2022-01-21 16:50:32 +01:00
Nex
0e2eb51732 Fixed checking of indicators in filesystem module 2022-01-21 16:30:34 +01:00
Nex
b35cd4bc73 Added support for context-aware indicators.
This way when a detection is logged, the user can know which STIX2
file was matched by the module
2022-01-21 16:26:58 +01:00
Nex
1b4f99a31d Trying to catch missing argument error (ref: #211) 2022-01-21 12:20:22 +01:00
tek
e4e1716729 Bumped version 2022-01-20 15:28:42 +01:00
tek
083bc12351 Merge branch 'feature/check-file-path' 2022-01-20 15:19:37 +01:00
tek
cf6d392460 Adds more details on the download-iocs command 2022-01-20 13:29:50 +01:00
tek
95205d8e17 Adds indicators check to iOS TCC module 2022-01-18 17:12:20 +01:00
Nex
1460828c30 Uniforming style in test units 2022-01-18 16:33:13 +01:00
Nex
fa84b3f296 Revert "Testing with slightly older version of iOSbackup"
This reverts commit e1efaa5467.
2022-01-18 16:32:22 +01:00
Nex
e1efaa5467 Testing with slightly older version of iOSbackup 2022-01-18 16:27:14 +01:00
Nex
696d42fc6e Disabling tests for 3.7 due to iOSbackup requirements of >= 3.8 2022-01-18 16:22:29 +01:00
Nex
a0e1662726 Somehow mysteriously with >= pip doesn't find the version, with == does 2022-01-18 16:16:03 +01:00
Nex
51645bdbc0 Adding pip install for deps 2022-01-18 16:10:59 +01:00
Nex
bb1b108fd7 Cleaning build workflow 2022-01-18 16:09:01 +01:00
Nex
92f9dcb8a5 Tring to fix build 2022-01-18 16:08:14 +01:00
Nex
a6fd5fe1f3 Bumped version 2022-01-18 16:06:14 +01:00
Nex
3e0ef20fcd . 2022-01-18 16:05:01 +01:00
Nex
01f3acde2e Merge branch 'main' of github.com:mvt-project/mvt 2022-01-18 16:00:52 +01:00
Nex
b697874f56 Conforming the test files 2022-01-18 16:00:03 +01:00
Donncha Ó Cearbhaill
41d699f457 Add PyTest to Github actions 2022-01-18 15:59:16 +01:00
Donncha Ó Cearbhaill
6fcd40f6b6 Fix use of global list instance as self.results variable 2022-01-18 15:53:05 +01:00
tek
38bb583a9e Improves management of file path indicators 2022-01-18 15:50:31 +01:00
Donncha Ó Cearbhaill
48ec2d8fa8 Merge branch 'main' into tests 2022-01-18 15:30:40 +01:00
tek
798805c583 Improves Shortcut output 2022-01-18 13:06:35 +01:00
Nex
24be9e9570 Use default list of indicators files now that some default ones are automatically loaded 2022-01-14 16:26:14 +01:00
Nex
adbd95c559 Dots 2022-01-14 02:01:59 +01:00
Nex
8a707c288a Bumped version 2022-01-14 01:53:10 +01:00
Nex
4c906ad52e Renamed download iocs function 2022-01-14 01:52:57 +01:00
Nex
a2f8030cce Added new iOS versions 2022-01-14 01:41:48 +01:00
Nex
737007afdb Bumped version 2022-01-12 16:18:13 +01:00
Nex
33efeda90a Added TODO note 2022-01-12 16:10:15 +01:00
Nex
146f2ae57d Renaming check function for consistency 2022-01-12 16:02:13 +01:00
Nex
11bc916854 Sorted imports 2022-01-11 16:02:44 +01:00
Nex
3084876f31 Removing unused imports, fixing conditions, new lines 2022-01-11 16:02:01 +01:00
Nex
f63cb585b2 Shortened command to download-iocs 2022-01-11 15:59:01 +01:00
Nex
637aebcd89 Small cleanup 2022-01-11 15:53:10 +01:00
Nex
16a0de3af4 Added new module to highlight installed accessibility services 2022-01-11 15:16:26 +01:00
tek
15fbedccc9 Fixes a minor bug in WebkitResourceLoadStatistics 2022-01-10 18:09:31 +01:00
tek
e0514b20dd Catches exception in Shortcuts module if the table does not exist 2022-01-10 16:58:12 +01:00
Donncha Ó Cearbhaill
b2e9f0361b Fix repeated results due to global results[] variable 2022-01-07 18:24:24 +01:00
Donncha Ó Cearbhaill
e85c70c603 Generate stix2 for each test run 2022-01-07 17:51:21 +01:00
Donncha Ó Cearbhaill
3f8dade610 Move backup binary artifact to seperate folder 2022-01-07 17:08:46 +01:00
Donncha Ó Cearbhaill
54963b0b59 Update test PR to work with latest code, fix flake8 2022-01-07 17:03:53 +01:00
tek
513e2cc704 First test structure 2022-01-07 16:41:19 +01:00
tek
28d57e7178 Add command to download latest public indicators
Squashed commit of the following:

commit c0d9e8d5d188c13e7e5ec0612e99bfb7e25f47d4
Author: Donncha Ó Cearbhaill <donncha.ocearbhaill@amnesty.org>
Date:   Fri Jan 7 16:05:12 2022 +0100

    Update name of indicators JSON file

commit f719e49c5f942cef64931ecf422b6a6e7b8c9f17
Author: Donncha Ó Cearbhaill <donncha.ocearbhaill@amnesty.org>
Date:   Fri Jan 7 15:38:03 2022 +0100

    Do not set indicators option on module if no indicators were loaded

commit a289eb8de936f7d74c6c787cbb8daf5c5bec015c
Author: Donncha Ó Cearbhaill <donncha.ocearbhaill@amnesty.org>
Date:   Fri Jan 7 14:43:00 2022 +0100

    Simplify code for loading IoCs

commit 0804563415ee80d76c13d3b38ffe639fa14caa14
Author: Donncha Ó Cearbhaill <donncha.ocearbhaill@amnesty.org>
Date:   Fri Jan 7 13:43:47 2022 +0100

    Add metadata to IoC entries

commit 97d0e893c1a0736c4931363ff40f09a030b90cf6
Author: tek <tek@randhome.io>
Date:   Fri Dec 17 16:43:09 2021 +0100

    Implements automated loading of indicators

commit c381e14df92ae4d7d846a1c97bcf6639cc526082
Author: tek <tek@randhome.io>
Date:   Fri Dec 17 12:41:15 2021 +0100

    Improves download-indicators

commit b938e02ddfd0b916fd883f510b467491a4a84e5f
Author: tek <tek@randhome.io>
Date:   Fri Dec 17 01:44:26 2021 +0100

    Adds download-indicators for mvt-ios and mvt-android
2022-01-07 16:38:04 +01:00
Nex
dc8eeb618e Merge pull request #229 from NicolaiSoeborg/patch-1
Bump adb read timeout
2021-12-31 11:59:40 +01:00
Nicolai Søborg
c282d4341d Bump adb read timeout
Some adb commands (like `dumpsys`) are very slow and the default timeout is "only" 10s. 
A timeout of 200 seconds is chosen completely at random - works on my phone 🤷

Fixes https://github.com/mvt-project/mvt/issues/113
Fixes https://github.com/mvt-project/mvt/issues/228
2021-12-28 13:56:04 +01:00
56 changed files with 862 additions and 311 deletions

View File

@@ -16,7 +16,8 @@ jobs:
strategy:
fail-fast: false
matrix:
python-version: [3.7, 3.8, 3.9]
# python-version: [3.7, 3.8, 3.9]
python-version: [3.8, 3.9]
steps:
- uses: actions/checkout@v2
@@ -27,8 +28,9 @@ jobs:
- name: Install dependencies
run: |
python -m pip install --upgrade pip
python -m pip install flake8 pytest safety
python -m pip install flake8 pytest safety stix2
if [ -f requirements.txt ]; then pip install -r requirements.txt; fi
python -m pip install .
- name: Lint with flake8
run: |
# stop the build if there are Python syntax errors or undefined names
@@ -37,7 +39,5 @@ jobs:
flake8 . --count --exit-zero --max-complexity=10 --max-line-length=127 --statistics
- name: Safety checks
run: safety check
# - name: Test with pytest
# run: |
# pytest
- name: Test with pytest
run: pytest

View File

@@ -41,4 +41,6 @@ export MVT_STIX2="/home/user/IOC1.stix2:/home/user/IOC2.stix2"
- [Predator from Cytrox](https://citizenlab.ca/2021/12/pegasus-vs-predator-dissidents-doubly-infected-iphone-reveals-cytrox-mercenary-spyware/) ([STIX2](https://raw.githubusercontent.com/AmnestyTech/investigations/master/2021-12-16_cytrox/cytrox.stix2))
- [This repository](https://github.com/Te-k/stalkerware-indicators) contains IOCs for Android stalkerware including [a STIX MVT-compatible file](https://raw.githubusercontent.com/Te-k/stalkerware-indicators/master/stalkerware.stix2).
You can automaticallly download the latest public indicator files with the command `mvt-ios download-iocs` or `mvt-android download-iocs`. These commands download the list of indicators listed [here](https://github.com/mvt-project/mvt/blob/main/public_indicators.json) and store them in the [appdir](https://pypi.org/project/appdirs/) folder. They are then loaded automatically by mvt.
Please [open an issue](https://github.com/mvt-project/mvt/issues/) to suggest new sources of STIX-formatted IOCs.

View File

@@ -9,10 +9,10 @@ import os
import click
from rich.logging import RichHandler
from mvt.common.help import HELP_MSG_MODULE, HELP_MSG_IOC
from mvt.common.help import HELP_MSG_FAST, HELP_MSG_OUTPUT, HELP_MSG_LIST_MODULES
from mvt.common.help import HELP_MSG_SERIAL
from mvt.common.indicators import Indicators, IndicatorsFileBadFormat
from mvt.common.help import (HELP_MSG_FAST, HELP_MSG_IOC,
HELP_MSG_LIST_MODULES, HELP_MSG_MODULE,
HELP_MSG_OUTPUT, HELP_MSG_SERIAL)
from mvt.common.indicators import Indicators, download_indicators_files
from mvt.common.logo import logo
from mvt.common.module import run_module, save_timeline
@@ -129,13 +129,7 @@ def check_adb(ctx, iocs, output, fast, list_modules, module, serial):
ctx.exit(1)
indicators = Indicators(log=log)
for ioc_path in iocs:
try:
indicators.parse_stix2(ioc_path)
except IndicatorsFileBadFormat as e:
log.critical(e)
ctx.exit(1)
log.info("Loaded a total of %d indicators", indicators.ioc_count)
indicators.load_indicators_files(iocs)
timeline = []
timeline_detected = []
@@ -145,13 +139,12 @@ def check_adb(ctx, iocs, output, fast, list_modules, module, serial):
m = adb_module(output_folder=output, fast_mode=fast,
log=logging.getLogger(adb_module.__module__))
if indicators.total_ioc_count:
m.indicators = indicators
m.indicators.log = m.log
if serial:
m.serial = serial
if indicators.ioc_count > 0:
indicators.log = m.log
m.indicators = indicators
run_module(m)
timeline.extend(m.timeline)
timeline_detected.extend(m.timeline_detected)
@@ -184,13 +177,7 @@ def check_backup(ctx, iocs, output, backup_path, serial):
ctx.exit(1)
indicators = Indicators(log=log)
for ioc_path in iocs:
try:
indicators.parse_stix2(ioc_path)
except IndicatorsFileBadFormat as e:
log.critical(e)
ctx.exit(1)
log.info("Loaded a total of %d indicators", indicators.ioc_count)
indicators.load_indicators_files(iocs)
if os.path.isfile(backup_path):
log.critical("The path you specified is a not a folder!")
@@ -203,12 +190,18 @@ def check_backup(ctx, iocs, output, backup_path, serial):
for module in BACKUP_MODULES:
m = module(base_folder=backup_path, output_folder=output,
log=logging.getLogger(module.__module__))
if indicators.total_ioc_count:
m.indicators = indicators
m.indicators.log = m.log
if serial:
m.serial = serial
if indicators.ioc_count > 0:
indicators.log = m.log
m.indicators = indicators
run_module(m)
#==============================================================================
# Command: download-iocs
#==============================================================================
@cli.command("download-iocs", help="Download public STIX2 indicators")
def download_indicators():
download_indicators_files(log)

View File

@@ -4,6 +4,7 @@
# https://license.mvt.re/1.1/
from .chrome_history import ChromeHistory
from .dumpsys_accessibility import DumpsysAccessibility
from .dumpsys_batterystats import DumpsysBatterystats
from .dumpsys_full import DumpsysFull
from .dumpsys_packages import DumpsysPackages
@@ -18,6 +19,6 @@ from .sms import SMS
from .whatsapp import Whatsapp
ADB_MODULES = [ChromeHistory, SMS, Whatsapp, Processes,
DumpsysBatterystats, DumpsysProcstats,
DumpsysAccessibility, DumpsysBatterystats, DumpsysProcstats,
DumpsysPackages, DumpsysReceivers, DumpsysFull,
Packages, RootBinaries, Logcat, Files]

View File

@@ -112,7 +112,7 @@ class AndroidExtraction(MVTModule):
:returns: Output of command
"""
return self.device.shell(command)
return self.device.shell(command, read_timeout_s=200.0)
def _adb_check_if_root(self):
"""Check if we have a `su` binary on the Android device.

View File

@@ -0,0 +1,53 @@
# Mobile Verification Toolkit (MVT)
# Copyright (c) 2021 The MVT Project Authors.
# Use of this software is governed by the MVT License 1.1 that can be found at
# https://license.mvt.re/1.1/
import io
import logging
import os
from .base import AndroidExtraction
log = logging.getLogger(__name__)
class DumpsysAccessibility(AndroidExtraction):
"""This module extracts stats on accessibility."""
def __init__(self, file_path=None, base_folder=None, output_folder=None,
serial=None, fast_mode=False, log=None, results=[]):
super().__init__(file_path=file_path, base_folder=base_folder,
output_folder=output_folder, fast_mode=fast_mode,
log=log, results=results)
def run(self):
self._adb_connect()
stats = self._adb_command("dumpsys accessibility")
in_services = False
for line in stats.split("\n"):
if line.strip().startswith("installed services:"):
in_services = True
continue
if not in_services:
continue
if line.strip() == "}":
break
service = line.split(":")[1].strip()
log.info("Found installed accessibility service \"%s\"", service)
if self.output_folder:
acc_path = os.path.join(self.output_folder,
"dumpsys_accessibility.txt")
with io.open(acc_path, "w", encoding="utf-8") as handle:
handle.write(stats)
log.info("Records from dumpsys accessibility stored at %s",
acc_path)
self._adb_disconnect()

View File

@@ -3,12 +3,11 @@
# Use of this software is governed by the MVT License 1.1 that can be found at
# https://license.mvt.re/1.1/
import logging
import os
import stat
import datetime
import logging
import stat
from mvt.common.utils import check_for_links, convert_timestamp_to_iso
from mvt.common.utils import convert_timestamp_to_iso
from .base import AndroidExtraction
@@ -31,8 +30,8 @@ class Files(AndroidExtraction):
# Run find command with correct args and parse results.
# Check that full file printf options are suppported on first run.
if self.full_find == None:
output = self._adb_command(f"find '/' -maxdepth 1 -printf '%T@ %m %s %u %g %p\n' 2> /dev/null")
if self.full_find is None:
output = self._adb_command("find '/' -maxdepth 1 -printf '%T@ %m %s %u %g %p\n' 2> /dev/null")
if not (output or output.strip().splitlines()):
# Full find command failed to generate output, fallback to basic file arguments
self.full_find = False
@@ -40,7 +39,7 @@ class Files(AndroidExtraction):
self.full_find = True
found_files = []
if self.full_find == True:
if self.full_find is True:
# Run full file command and collect additonal file information.
output = self._adb_command(f"find '{file_path}' -printf '%T@ %m %s %u %g %p\n' 2> /dev/null")
for file_line in output.splitlines():
@@ -90,10 +89,6 @@ class Files(AndroidExtraction):
return
for result in self.results:
if self.indicators.check_filename(result["path"]):
self.log.warning("Found a known suspicous filename at path: \"%s\"", result["path"])
self.detected.append(result)
if self.indicators.check_file_path(result["path"]):
self.log.warning("Found a known suspicous file at path: \"%s\"", result["path"])
self.detected.append(result)

View File

@@ -95,6 +95,9 @@ class Packages(AndroidExtraction):
self._adb_connect()
packages = self._adb_command("pm list packages -U -u -i -f")
if packages.strip() == "Error: Unknown option: -U":
packages = self._adb_command("pm list packages -u -i -f")
for line in packages.split("\n"):
line = line.strip()
if not line.startswith("package:"):

View File

@@ -3,51 +3,71 @@
# Use of this software is governed by the MVT License 1.1 that can be found at
# https://license.mvt.re/1.1/
import io
import json
import os
import requests
from appdirs import user_data_dir
from .url import URL
class IndicatorsFileBadFormat(Exception):
pass
class Indicators:
"""This class is used to parse indicators from a STIX2 file and provide
functions to compare extracted artifacts to the indicators.
"""
def __init__(self, log=None):
self.data_dir = user_data_dir("mvt")
self.log = log
self.ioc_domains = []
self.ioc_processes = []
self.ioc_emails = []
self.ioc_files = []
self.ioc_files_sha256 = []
self.ioc_app_ids = []
self.ios_profile_ids = []
self.ioc_count = 0
self._check_env_variable()
self.total_ioc_count = 0
def _add_indicator(self, ioc, iocs_list):
def _load_downloaded_indicators(self):
if not os.path.isdir(self.data_dir):
return
for f in os.listdir(self.data_dir):
if f.lower().endswith(".stix2"):
self.parse_stix2(os.path.join(self.data_dir, f))
def _check_stix2_env_variable(self):
"""
Checks if a variable MVT_STIX2 contains path to STIX Files.
"""
if "MVT_STIX2" not in os.environ:
return
paths = os.environ["MVT_STIX2"].split(":")
for path in paths:
if os.path.isfile(path):
self.parse_stix2(path)
else:
self.log.info("Invalid STIX2 path %s in MVT_STIX2 environment variable", path)
def _generate_indicators_file(self):
return {
"name": "",
"description": "",
"file_name": "",
"file_path": "",
"domains": [],
"processes": [],
"emails": [],
"file_names": [],
"file_paths": [],
"files_sha256": [],
"app_ids": [],
"ios_profile_ids": [],
"count": 0,
}
def _add_indicator(self, ioc, ioc_file, iocs_list):
if ioc not in iocs_list:
iocs_list.append(ioc)
self.ioc_count += 1
def _check_env_variable(self):
"""
Checks if a variable MVT_STIX2 contains path to STIX Files
"""
if "MVT_STIX2" in os.environ:
paths = os.environ["MVT_STIX2"].split(":")
for path in paths:
if os.path.isfile(path):
self.parse_stix2(path)
else:
self.log.info("Invalid STIX2 path %s in MVT_STIX2 environment variable", path)
ioc_file["count"] += 1
self.total_ioc_count += 1
def parse_stix2(self, file_path):
"""Extract indicators from a STIX2 file.
@@ -56,17 +76,27 @@ class Indicators:
:type file_path: str
"""
self.log.info("Parsing STIX2 indicators file at path %s",
file_path)
self.log.info("Parsing STIX2 indicators file at path %s", file_path)
ioc_file = self._generate_indicators_file()
ioc_file["file_path"] = file_path
ioc_file["file_name"] = os.path.basename(file_path)
with open(file_path, "r") as handle:
try:
data = json.load(handle)
except json.decoder.JSONDecodeError:
raise IndicatorsFileBadFormat("Unable to parse STIX2 indicators file, the file seems malformed or in the wrong format")
self.log.critical("Unable to parse STIX2 indicator file. The file is malformed or in the wrong format.")
return
for entry in data.get("objects", []):
if entry.get("type", "") != "indicator":
entry_type = entry.get("type", "")
if entry_type == "malware":
ioc_file["name"] = entry.get("name", "") or ioc_file["file_name"]
ioc_file["description"] = entry.get("description", "") or ioc_file["file_name"]
continue
if entry_type != "indicator":
continue
key, value = entry.get("pattern", "").strip("[]").split("=")
@@ -75,28 +105,70 @@ class Indicators:
if key == "domain-name:value":
# We force domain names to lower case.
self._add_indicator(ioc=value.lower(),
iocs_list=self.ioc_domains)
ioc_file=ioc_file,
iocs_list=ioc_file["domains"])
elif key == "process:name":
self._add_indicator(ioc=value,
iocs_list=self.ioc_processes)
ioc_file=ioc_file,
iocs_list=ioc_file["processes"])
elif key == "email-addr:value":
# We force email addresses to lower case.
self._add_indicator(ioc=value.lower(),
iocs_list=self.ioc_emails)
ioc_file=ioc_file,
iocs_list=ioc_file["emails"])
elif key == "file:name":
self._add_indicator(ioc=value,
iocs_list=self.ioc_files)
elif key == "app:id":
ioc_file=ioc_file,
iocs_list=ioc_file["file_names"])
elif key == "file:path":
self._add_indicator(ioc=value,
iocs_list=self.ioc_app_ids)
elif key == "configuration-profile:id":
self._add_indicator(ioc=value,
iocs_list=self.ios_profile_ids)
ioc_file=ioc_file,
iocs_list=ioc_file["file_paths"])
elif key == "file:hashes.sha256":
self._add_indicator(ioc=value,
iocs_list=self.ioc_files_sha256)
ioc_file=ioc_file,
iocs_list=ioc_file["files_sha256"])
elif key == "app:id":
self._add_indicator(ioc=value,
ioc_file=ioc_file,
iocs_list=ioc_file["app_ids"])
elif key == "configuration-profile:id":
self._add_indicator(ioc=value,
ioc_file=ioc_file,
iocs_list=ioc_file["ios_profile_ids"])
def check_domain(self, url) -> bool:
self.log.info("Loaded %d indicators from \"%s\" indicators file",
ioc_file["count"], ioc_file["name"])
self.ioc_files.append(ioc_file)
def load_indicators_files(self, files, load_default=True):
"""
Load a list of indicators files.
"""
for file_path in files:
if os.path.isfile(file_path):
self.parse_stix2(file_path)
else:
self.log.warning("This indicators file %s does not exist", file_path)
# Load downloaded indicators and any indicators from env variable.
if load_default:
self._load_downloaded_indicators()
self._check_stix2_env_variable()
self.log.info("Loaded a total of %d unique indicators", self.total_ioc_count)
def get_iocs(self, ioc_type):
for ioc_file in self.ioc_files:
for ioc in ioc_file.get(ioc_type, []):
yield {
"value": ioc,
"type": ioc_type,
"name": ioc_file["name"]
}
def check_domain(self, url):
"""Check if a given URL matches any of the provided domain indicators.
:param url: URL to match against domain indicators
@@ -106,9 +178,9 @@ class Indicators:
"""
# TODO: If the IOC domain contains a subdomain, it is not currently
# being matched.
# being matched.
if not url:
return False
return None
try:
# First we use the provided URL.
@@ -136,39 +208,40 @@ class Indicators:
except Exception:
# If URL parsing failed, we just try to do a simple substring
# match.
for ioc in self.ioc_domains:
if ioc.lower() in url:
self.log.warning("Maybe found a known suspicious domain: %s", url)
return True
for ioc in self.get_iocs("domains"):
if ioc["value"].lower() in url:
self.log.warning("Maybe found a known suspicious domain %s matching indicators from \"%s\"",
url, ioc["name"])
return ioc
# If nothing matched, we can quit here.
return False
return None
# If all parsing worked, we start walking through available domain indicators.
for ioc in self.ioc_domains:
for ioc in self.get_iocs("domains"):
# First we check the full domain.
if final_url.domain.lower() == ioc:
if final_url.domain.lower() == ioc["value"]:
if orig_url.is_shortened and orig_url.url != final_url.url:
self.log.warning("Found a known suspicious domain %s shortened as %s",
final_url.url, orig_url.url)
self.log.warning("Found a known suspicious domain %s shortened as %s matching indicators from \"%s\"",
final_url.url, orig_url.url, ioc["name"])
else:
self.log.warning("Found a known suspicious domain: %s", final_url.url)
self.log.warning("Found a known suspicious domain %s matching indicators from \"%s\"",
final_url.url, ioc["name"])
return True
return ioc
# Then we just check the top level domain.
if final_url.top_level.lower() == ioc:
if final_url.top_level.lower() == ioc["value"]:
if orig_url.is_shortened and orig_url.url != final_url.url:
self.log.warning("Found a sub-domain matching a known suspicious top level %s shortened as %s",
final_url.url, orig_url.url)
self.log.warning("Found a sub-domain with suspicious top level %s shortened as %s matching indicators from \"%s\"",
final_url.url, orig_url.url, ioc["name"])
else:
self.log.warning("Found a sub-domain matching a known suspicious top level: %s", final_url.url)
self.log.warning("Found a sub-domain with a suspicious top level %s matching indicators from \"%s\"",
final_url.url, ioc["name"])
return True
return ioc
return False
def check_domains(self, urls) -> bool:
def check_domains(self, urls):
"""Check a list of URLs against the provided list of domain indicators.
:param urls: List of URLs to check against domain indicators
@@ -178,15 +251,14 @@ class Indicators:
"""
if not urls:
return False
return None
for url in urls:
if self.check_domain(url):
return True
check = self.check_domain(url)
if check:
return check
return False
def check_process(self, process) -> bool:
def check_process(self, process):
"""Check the provided process name against the list of process
indicators.
@@ -197,22 +269,22 @@ class Indicators:
"""
if not process:
return False
return None
proc_name = os.path.basename(process)
if proc_name in self.ioc_processes:
self.log.warning("Found a known suspicious process name \"%s\"", process)
return True
for ioc in self.get_iocs("processes"):
if proc_name == ioc["value"]:
self.log.warning("Found a known suspicious process name \"%s\" matching indicators from \"%s\"",
process, ioc["name"])
return ioc
if len(proc_name) == 16:
for bad_proc in self.ioc_processes:
if bad_proc.startswith(proc_name):
self.log.warning("Found a truncated known suspicious process name \"%s\"", process)
return True
if len(proc_name) == 16:
if ioc["value"].startswith(proc_name):
self.log.warning("Found a truncated known suspicious process name \"%s\" matching indicators from \"%s\"",
process, ioc["name"])
return ioc
return False
def check_processes(self, processes) -> bool:
def check_processes(self, processes):
"""Check the provided list of processes against the list of
process indicators.
@@ -223,15 +295,14 @@ class Indicators:
"""
if not processes:
return False
return None
for process in processes:
if self.check_process(process):
return True
check = self.check_process(process)
if check:
return check
return False
def check_email(self, email) -> bool:
def check_email(self, email):
"""Check the provided email against the list of email indicators.
:param email: Email address to check against email indicators
@@ -241,16 +312,35 @@ class Indicators:
"""
if not email:
return False
return None
if email.lower() in self.ioc_emails:
self.log.warning("Found a known suspicious email address: \"%s\"", email)
return True
for ioc in self.get_iocs("emails"):
if email.lower() == ioc["value"].lower():
self.log.warning("Found a known suspicious email address \"%s\" matching indicators from \"%s\"",
email, ioc["name"])
return ioc
return False
def check_file_name(self, file_name):
"""Check the provided file name against the list of file indicators.
def check_filename(self, file_path) -> bool:
"""Check the provided file path against the list of file indicators.
:param file_name: File name to check against file
indicators
:type file_name: str
:returns: True if the file name matched an indicator, otherwise False
:rtype: bool
"""
if not file_name:
return None
for ioc in self.get_iocs("file_names"):
if ioc["value"] == file_name:
self.log.warning("Found a known suspicious file name \"%s\" matching indicators from \"%s\"",
file_name, ioc["name"])
return ioc
def check_file_path(self, file_path):
"""Check the provided file path against the list of file indicators (both path and name).
:param file_path: File path or file name to check against file
indicators
@@ -260,34 +350,20 @@ class Indicators:
"""
if not file_path:
return False
return None
file_name = os.path.basename(file_path)
if file_name in self.ioc_files:
return True
ioc = self.check_file_name(os.path.basename(file_path))
if ioc:
return ioc
return False
def check_file_path(self, file_path) -> bool:
"""Check the provided file path against the list of file indicators.
:param file_path: File path or file name to check against file
indicators
:type file_path: str
:returns: True if the file path matched an indicator, otherwise False
:rtype: bool
"""
if not file_path:
return False
for ioc_file in self.ioc_files:
for ioc in self.get_iocs("file_paths"):
# Strip any trailing slash from indicator paths to match directories.
if file_path.startswith(ioc_file.rstrip("/")):
return True
return False
if file_path.startswith(ioc["value"].rstrip("/")):
self.log.warning("Found a known suspicious file path \"%s\" matching indicators form \"%s\"",
file_path, ioc["name"])
return ioc
def check_profile(self, profile_uuid) -> bool:
def check_profile(self, profile_uuid):
"""Check the provided configuration profile UUID against the list of indicators.
:param profile_uuid: Profile UUID to check against configuration profile indicators
@@ -296,7 +372,41 @@ class Indicators:
:rtype: bool
"""
if profile_uuid in self.ios_profile_ids:
return True
for ioc in self.get_iocs("ios_profile_ids"):
if profile_uuid in ioc["value"]:
self.log.warning("Found a known suspicious profile ID \"%s\" matching indicators from \"%s\"",
profile_uuid, ioc["name"])
return ioc
return False
def download_indicators_files(log):
"""
Download indicators from repo into MVT app data directory.
"""
data_dir = user_data_dir("mvt")
if not os.path.isdir(data_dir):
os.makedirs(data_dir, exist_ok=True)
# Download latest list of indicators from the MVT repo.
res = requests.get("https://github.com/mvt-project/mvt/raw/main/public_indicators.json")
if res.status_code != 200:
log.warning("Unable to find retrieve list of indicators from the MVT repository.")
return
for ioc_entry in res.json():
ioc_url = ioc_entry["stix2_url"]
log.info("Downloading indicator file '%s' from '%s'", ioc_entry["name"], ioc_url)
res = requests.get(ioc_url)
if res.status_code != 200:
log.warning("Could not find indicator file '%s'", ioc_url)
continue
clean_file_name = ioc_url.lstrip("https://").replace("/", "_")
ioc_path = os.path.join(data_dir, clean_file_name)
# Write file to disk. This will overwrite any older version of the STIX2 file.
with io.open(ioc_path, "w") as f:
f.write(res.text)
log.info("Saved indicator file to '%s'", os.path.basename(ioc_path))

View File

@@ -30,7 +30,7 @@ class MVTModule(object):
slug = None
def __init__(self, file_path=None, base_folder=None, output_folder=None,
fast_mode=False, log=None, results=[]):
fast_mode=False, log=None, results=None):
"""Initialize module.
:param file_path: Path to the module's database file, if there is any
@@ -51,7 +51,7 @@ class MVTModule(object):
self.fast_mode = fast_mode
self.log = log
self.indicators = None
self.results = results
self.results = results if results else []
self.detected = []
self.timeline = []
self.timeline_detected = []

View File

@@ -6,7 +6,7 @@
import requests
from packaging import version
MVT_VERSION = "1.4.1"
MVT_VERSION = "1.4.7"
def check_for_updates():

View File

@@ -10,10 +10,10 @@ import click
from rich.logging import RichHandler
from rich.prompt import Prompt
from mvt.common.help import HELP_MSG_MODULE, HELP_MSG_IOC
from mvt.common.help import HELP_MSG_FAST, HELP_MSG_OUTPUT
from mvt.common.help import HELP_MSG_LIST_MODULES
from mvt.common.indicators import Indicators, IndicatorsFileBadFormat
from mvt.common.help import (HELP_MSG_FAST, HELP_MSG_IOC,
HELP_MSG_LIST_MODULES, HELP_MSG_MODULE,
HELP_MSG_OUTPUT)
from mvt.common.indicators import Indicators, download_indicators_files
from mvt.common.logo import logo
from mvt.common.module import run_module, save_timeline
from mvt.common.options import MutuallyExclusiveOption
@@ -157,13 +157,7 @@ def check_backup(ctx, iocs, output, fast, backup_path, list_modules, module):
ctx.exit(1)
indicators = Indicators(log=log)
for ioc_path in iocs:
try:
indicators.parse_stix2(ioc_path)
except IndicatorsFileBadFormat as e:
log.critical(e)
ctx.exit(1)
log.info("Loaded a total of %d indicators", indicators.ioc_count)
indicators.load_indicators_files(iocs)
timeline = []
timeline_detected = []
@@ -174,8 +168,7 @@ def check_backup(ctx, iocs, output, fast, backup_path, list_modules, module):
m = backup_module(base_folder=backup_path, output_folder=output, fast_mode=fast,
log=logging.getLogger(backup_module.__module__))
m.is_backup = True
if indicators.ioc_count > 0:
if indicators.total_ioc_count > 0:
m.indicators = indicators
m.indicators.log = m.log
@@ -189,6 +182,10 @@ def check_backup(ctx, iocs, output, fast, backup_path, list_modules, module):
if len(timeline_detected) > 0:
save_timeline(timeline_detected, os.path.join(output, "timeline_detected.csv"))
if len(timeline_detected) > 0:
log.warning("The analysis of the backup produced %d detections!",
len(timeline_detected))
#==============================================================================
# Command: check-fs
@@ -220,13 +217,7 @@ def check_fs(ctx, iocs, output, fast, dump_path, list_modules, module):
ctx.exit(1)
indicators = Indicators(log=log)
for ioc_path in iocs:
try:
indicators.parse_stix2(ioc_path)
except IndicatorsFileBadFormat as e:
log.critical(e)
ctx.exit(1)
log.info("Loaded a total of %d indicators", indicators.ioc_count)
indicators.load_indicators_files(iocs)
timeline = []
timeline_detected = []
@@ -238,8 +229,7 @@ def check_fs(ctx, iocs, output, fast, dump_path, list_modules, module):
log=logging.getLogger(fs_module.__module__))
m.is_fs_dump = True
if iocs:
if indicators.total_ioc_count > 0:
m.indicators = indicators
m.indicators.log = m.log
@@ -253,20 +243,23 @@ def check_fs(ctx, iocs, output, fast, dump_path, list_modules, module):
if len(timeline_detected) > 0:
save_timeline(timeline_detected, os.path.join(output, "timeline_detected.csv"))
if len(timeline_detected) > 0:
log.warning("The analysis of the filesystem produced %d detections!",
len(timeline_detected))
#==============================================================================
# Command: check-iocs
#==============================================================================
@cli.command("check-iocs", help="Compare stored JSON results to provided indicators")
@click.option("--iocs", "-i", type=click.Path(exists=True), multiple=True,
default=[], required=True, help=HELP_MSG_IOC)
default=[], help=HELP_MSG_IOC)
@click.option("--list-modules", "-l", is_flag=True, help=HELP_MSG_LIST_MODULES)
@click.option("--module", "-m", help=HELP_MSG_MODULE)
@click.argument("FOLDER", type=click.Path(exists=True))
@click.pass_context
def check_iocs(ctx, iocs, list_modules, module, folder):
all_modules = []
for entry in BACKUP_MODULES + FS_MODULES:
for entry in BACKUP_MODULES + FS_MODULES + MIXED_MODULES:
if entry not in all_modules:
all_modules.append(entry)
@@ -280,14 +273,9 @@ def check_iocs(ctx, iocs, list_modules, module, folder):
log.info("Checking stored results against provided indicators...")
indicators = Indicators(log=log)
for ioc_path in iocs:
try:
indicators.parse_stix2(ioc_path)
except IndicatorsFileBadFormat as e:
log.critical(e)
ctx.exit(1)
log.info("Loaded a total of %d indicators", indicators.ioc_count)
indicators.load_indicators_files(iocs)
total_detections = 0
for file_name in os.listdir(folder):
name_only, ext = os.path.splitext(file_name)
file_path = os.path.join(folder, file_name)
@@ -304,11 +292,25 @@ def check_iocs(ctx, iocs, list_modules, module, folder):
m = iocs_module.from_json(file_path,
log=logging.getLogger(iocs_module.__module__))
m.indicators = indicators
m.indicators.log = m.log
if indicators.total_ioc_count > 0:
m.indicators = indicators
m.indicators.log = m.log
try:
m.check_indicators()
except NotImplementedError:
continue
else:
total_detections += len(m.detected)
if total_detections > 0:
log.warning("The check of the results produced %d detections!",
total_detections)
#==============================================================================
# Command: download-iocs
#==============================================================================
@cli.command("download-iocs", help="Download public STIX2 indicators")
def download_iocs():
download_indicators_files(log)

View File

@@ -7,6 +7,7 @@
import os
import plistlib
from base64 import b64encode
from mvt.common.utils import convert_timestamp_to_iso
from ..base import IOSExtraction
@@ -45,8 +46,10 @@ class ConfigurationProfiles(IOSExtraction):
payload_content = result["plist"]["PayloadContent"][0]
# Alert on any known malicious configuration profiles in the indicator list.
if self.indicators.check_profile(result["plist"]["PayloadUUID"]):
ioc = self.indicators.check_profile(result["plist"]["PayloadUUID"])
if ioc:
self.log.warning(f"Found a known malicious configuration profile \"{result['plist']['PayloadDisplayName']}\" with UUID '{result['plist']['PayloadUUID']}'.")
result["matched_indicator"] = ioc
self.detected.append(result)
continue
@@ -70,7 +73,7 @@ class ConfigurationProfiles(IOSExtraction):
with open(conf_file_path, "rb") as handle:
try:
conf_plist = plistlib.load(handle)
except:
except Exception:
conf_plist = {}
if "SignerCerts" in conf_plist:

View File

@@ -72,9 +72,7 @@ class Manifest(IOSExtraction):
return
for result in self.results:
if "relative_path" not in result:
continue
if not result["relative_path"]:
if not result.get("relative_path"):
continue
if result["domain"]:
@@ -83,16 +81,15 @@ class Manifest(IOSExtraction):
self.detected.append(result)
continue
if self.indicators.check_filename(result["relative_path"]):
self.log.warning("Found a known malicious file at path: %s", result["relative_path"])
if self.indicators.check_file_path("/" + result["relative_path"]):
self.detected.append(result)
continue
relPath = result["relative_path"].lower()
for ioc in self.indicators.ioc_domains:
if ioc.lower() in relPath:
rel_path = result["relative_path"].lower()
for ioc in self.indicators.get_iocs("domains"):
if ioc["value"].lower() in rel_path:
self.log.warning("Found mention of domain \"%s\" in a backup file with path: %s",
ioc, relPath)
ioc["value"], rel_path)
self.detected.append(result)
def run(self):

View File

@@ -37,20 +37,24 @@ class Analytics(IOSExtraction):
return
for result in self.results:
for ioc in self.indicators.ioc_processes:
for key in result.keys():
if ioc == result[key]:
self.log.warning("Found mention of a malicious process \"%s\" in %s file at %s",
ioc, result["artifact"], result["timestamp"])
self.detected.append(result)
break
for ioc in self.indicators.ioc_domains:
for key in result.keys():
if ioc in str(result[key]):
self.log.warning("Found mention of a malicious domain \"%s\" in %s file at %s",
ioc, result["artifact"], result["timestamp"])
self.detected.append(result)
break
for value in result.values():
if not isinstance(value, str):
continue
ioc = self.indicators.check_process(value)
if ioc:
self.log.warning("Found mention of a malicious process \"%s\" in %s file at %s",
value, result["artifact"], result["timestamp"])
result["matched_indicator"] = ioc
self.detected.append(result)
continue
ioc = self.indicators.check_domain(value)
if ioc:
self.log.warning("Found mention of a malicious domain \"%s\" in %s file at %s",
value, result["artifact"], result["timestamp"])
result["matched_indicator"] = ioc
self.detected.append(result)
def _extract_analytics_data(self):
artifact = self.file_path.split("/")[-1]
@@ -101,6 +105,7 @@ class Analytics(IOSExtraction):
timestamp = ""
data = plistlib.loads(row[1])
data["timestamp"] = timestamp
data["artifact"] = artifact
self.results.append(data)

View File

@@ -34,13 +34,15 @@ class CacheFiles(IOSExtraction):
return
self.detected = {}
for key, items in self.results.items():
for item in items:
if self.indicators.check_domain(item["url"]):
for key, values in self.results.items():
for value in values:
ioc = self.indicators.check_domain(value["url"])
if ioc:
value["matched_indicator"] = ioc
if key not in self.detected:
self.detected[key] = [item, ]
self.detected[key] = [value, ]
else:
self.detected[key].append(item)
self.detected[key].append(value)
def _process_cache_file(self, file_path):
self.log.info("Processing cache file at path: %s", file_path)

View File

@@ -37,23 +37,25 @@ class Filesystem(IOSExtraction):
return
for result in self.results:
if self.indicators.check_file(result["path"]):
self.log.warning("Found a known malicious file name at path: %s", result["path"])
if "path" not in result:
continue
ioc = self.indicators.check_file_path(result["path"])
if ioc:
result["matched_indicator"] = ioc
self.detected.append(result)
if self.indicators.check_file_path(result["path"]):
self.log.warning("Found a known malicious file path at path: %s", result["path"])
self.detected.append(result)
# If we are instructed to run fast, we skip this.
# If we are instructed to run fast, we skip the rest.
if self.fast_mode:
self.log.info("Flag --fast was enabled: skipping extended search for suspicious files/processes")
else:
for ioc in self.indicators.ioc_processes:
parts = result["path"].split("/")
if ioc in parts:
self.log.warning("Found a known malicious file/process at path: %s", result["path"])
self.detected.append(result)
continue
for ioc in self.indicators.get_iocs("processes"):
parts = result["path"].split("/")
if ioc["value"] in parts:
self.log.warning("Found known suspicious process name mentioned in file at path \"%s\" matching indicators from \"%s\"",
result["path"], ioc["name"])
result["matched_indicator"] = ioc
self.detected.append(result)
def run(self):
for root, dirs, files in os.walk(self.base_folder):

View File

@@ -37,7 +37,12 @@ class SafariFavicon(IOSExtraction):
return
for result in self.results:
if self.indicators.check_domain(result["url"]) or self.indicators.check_domain(result["icon_url"]):
ioc = self.indicators.check_domain(result["url"])
if not ioc:
ioc = self.indicators.check_domain(result["icon_url"])
if ioc:
result["matched_indicator"] = ioc
self.detected.append(result)
def _process_favicon_db(self, file_path):

View File

@@ -34,12 +34,20 @@ class ShutdownLog(IOSExtraction):
return
for result in self.results:
for ioc in self.indicators.ioc_processes:
ioc = self.indicators.check_file_path(result["client"])
if ioc:
result["matched_indicator"] = ioc
self.detected.append(result)
continue
for ioc in self.indicators.get_iocs("processes"):
parts = result["client"].split("/")
if ioc in parts:
self.log.warning("Found mention of a known malicious process \"%s\" in shutdown.log",
ioc)
result["matched_indicator"] = ioc
self.detected.append(result)
continue
def process_shutdownlog(self, content):
current_processes = []

View File

@@ -18,9 +18,11 @@ class WebkitBase(IOSExtraction):
if not self.indicators:
return
for item in self.results:
if self.indicators.check_domain(item["url"]):
self.detected.append(item)
for result in self.results:
ioc = self.indicators.check_domain(result["url"])
if ioc:
result["matched_indicator"] = ioc
self.detected.append(result)
def _process_webkit_folder(self, root_paths):
for found_path in self._get_fs_files_from_patterns(root_paths):

View File

@@ -16,13 +16,13 @@ from .net_datausage import Datausage
from .osanalytics_addaily import OSAnalyticsADDaily
from .safari_browserstate import SafariBrowserState
from .safari_history import SafariHistory
from .shortcuts import Shortcuts
from .sms import SMS
from .sms_attachments import SMSAttachments
from .tcc import TCC
from .webkit_resource_load_statistics import WebkitResourceLoadStatistics
from .webkit_session_resource_log import WebkitSessionResourceLog
from .whatsapp import Whatsapp
from .shortcuts import Shortcuts
MIXED_MODULES = [Calls, ChromeFavicon, ChromeHistory, Contacts, FirefoxFavicon,
FirefoxHistory, IDStatusCache, InteractionC, LocationdClients,

View File

@@ -42,7 +42,12 @@ class ChromeFavicon(IOSExtraction):
return
for result in self.results:
if self.indicators.check_domain(result["url"]) or self.indicators.check_domain(result["icon_url"]):
ioc = self.indicators.check_domain(result["url"])
if not ioc:
ioc = self.indicators.check_domain(result["icon_url"])
if ioc:
result["matched_indicator"] = ioc
self.detected.append(result)
def run(self):

View File

@@ -41,7 +41,9 @@ class ChromeHistory(IOSExtraction):
return
for result in self.results:
if self.indicators.check_domain(result["url"]):
ioc = self.indicators.check_domain(result["url"])
if ioc:
result["matched_indicator"] = ioc
self.detected.append(result)
def run(self):

View File

@@ -40,8 +40,12 @@ class FirefoxFavicon(IOSExtraction):
return
for result in self.results:
if (self.indicators.check_domain(result.get("url", "")) or
self.indicators.check_domain(result.get("history_url", ""))):
ioc = self.indicators.check_domain(result.get("url", ""))
if not ioc:
ioc = self.indicators.check_domain(result.get("history_url", ""))
if ioc:
result["matched_indicator"] = ioc
self.detected.append(result)
def run(self):

View File

@@ -44,7 +44,9 @@ class FirefoxHistory(IOSExtraction):
return
for result in self.results:
if self.indicators.check_domain(result["url"]):
ioc = self.indicators.check_domain(result["url"])
if ioc:
result["matched_indicator"] = ioc
self.detected.append(result)
def run(self):

View File

@@ -43,7 +43,9 @@ class IDStatusCache(IOSExtraction):
for result in self.results:
if result.get("user", "").startswith("mailto:"):
email = result["user"][7:].strip("'")
if self.indicators.check_email(email):
ioc = self.indicators.check_email(email)
if ioc:
result["matched_indicator"] = ioc
self.detected.append(result)
continue

View File

@@ -41,13 +41,13 @@ class LocationdClients(IOSExtraction):
def serialize(self, record):
records = []
for ts in self.timestamps:
if ts in record.keys():
for timestamp in self.timestamps:
if timestamp in record.keys():
records.append({
"timestamp": record[ts],
"timestamp": record[timestamp],
"module": self.__class__.__name__,
"event": ts,
"data": f"{ts} from {record['package']}"
"event": timestamp,
"data": f"{timestamp} from {record['package']}"
})
return records
@@ -60,8 +60,40 @@ class LocationdClients(IOSExtraction):
parts = result["package"].split("/")
proc_name = parts[len(parts)-1]
if self.indicators.check_process(proc_name):
ioc = self.indicators.check_process(proc_name)
if ioc:
self.log.warning("Found a suspicious process name in LocationD entry %s",
result["package"])
result["matched_indicator"] = ioc
self.detected.append(result)
continue
if "BundlePath" in result:
ioc = self.indicators.check_file_path(result["BundlePath"])
if ioc:
self.log.warning("Found a suspicious file path in Location D: %s",
result["BundlePath"])
result["matched_indicator"] = ioc
self.detected.append(result)
continue
if "Executable" in result:
ioc = self.indicators.check_file_path(result["Executable"])
if ioc:
self.log.warning("Found a suspicious file path in Location D: %s",
result["Executable"])
result["matched_indicator"] = ioc
self.detected.append(result)
continue
if "Registered" in result:
ioc = self.indicators.check_file_path(result["Registered"])
if ioc:
self.log.warning("Found a suspicious file path in Location D: %s",
result["Registered"])
result["matched_indicator"] = ioc
self.detected.append(result)
continue
def _extract_locationd_entries(self, file_path):
with open(file_path, "rb") as handle:

View File

@@ -41,7 +41,9 @@ class OSAnalyticsADDaily(IOSExtraction):
return
for result in self.results:
if self.indicators.check_process(result["package"]):
ioc = self.indicators.check_process(result["package"])
if ioc:
result["matched_indicator"] = ioc
self.detected.append(result)
def run(self):

View File

@@ -44,16 +44,22 @@ class SafariBrowserState(IOSExtraction):
return
for result in self.results:
if "tab_url" in result and self.indicators.check_domain(result["tab_url"]):
self.detected.append(result)
continue
if "tab_url" in result:
ioc = self.indicators.check_domain(result["tab_url"])
if ioc:
result["matched_indicator"] = ioc
self.detected.append(result)
continue
if "session_data" not in result:
continue
for session_entry in result["session_data"]:
if "entry_url" in session_entry and self.indicators.check_domain(session_entry["entry_url"]):
self.detected.append(result)
if "entry_url" in session_entry:
ioc = self.indicators.check_domain(session_entry["entry_url"])
if ioc:
result["matched_indicator"] = ioc
self.detected.append(result)
def _process_browser_state_db(self, db_path):
conn = sqlite3.connect(db_path)

View File

@@ -80,7 +80,9 @@ class SafariHistory(IOSExtraction):
return
for result in self.results:
if self.indicators.check_domain(result["url"]):
ioc = self.indicators.check_domain(result["url"])
if ioc:
result["matched_indicator"] = ioc
self.detected.append(result)
def _process_history_db(self, history_path):

View File

@@ -3,12 +3,13 @@
# Use of this software is governed by the MVT License 1.1 that can be found at
# https://license.mvt.re/1.1/
import sqlite3
import io
import plistlib
import itertools
import plistlib
import sqlite3
from mvt.common.utils import check_for_links, convert_mactime_to_unix, convert_timestamp_to_iso
from mvt.common.utils import (check_for_links, convert_mactime_to_unix,
convert_timestamp_to_iso)
from ..base import IOSExtraction
@@ -33,21 +34,31 @@ class Shortcuts(IOSExtraction):
found_urls = ""
if record["action_urls"]:
found_urls = "- URLs in actions: {}".format(", ".join(record["action_urls"]))
desc = ""
if record["description"]:
desc = record["description"].decode('utf-8', errors='ignore')
return {
return [{
"timestamp": record["isodate"],
"module": self.__class__.__name__,
"event": "shortcut",
"data": f"iOS Shortcut '{record['shortcut_name']}': {record['description']} {found_urls}"
}
"event": "shortcut_created",
"data": f"iOS Shortcut '{record['shortcut_name'].decode('utf-8')}': {desc} {found_urls}"
}, {
"timestamp": record["modified_date"],
"module": self.__class__.__name__,
"event": "shortcut_modified",
"data": f"iOS Shortcut '{record['shortcut_name'].decode('utf-8')}': {desc} {found_urls}"
}]
def check_indicators(self):
if not self.indicators:
return
for action in self.results:
if self.indicators.check_domains(action["action_urls"]):
self.detected.append(action)
for result in self.results:
ioc = self.indicators.check_domains(result["action_urls"])
if ioc:
result["matched_indicator"] = ioc
self.detected.append(result)
def run(self):
self._find_ios_database(backup_ids=SHORTCUT_BACKUP_IDS,
@@ -57,17 +68,25 @@ class Shortcuts(IOSExtraction):
conn = sqlite3.connect(self.file_path)
conn.text_factory = bytes
cur = conn.cursor()
cur.execute("""
SELECT
ZSHORTCUT.Z_PK as "shortcut_id",
ZSHORTCUT.ZNAME as "shortcut_name",
ZSHORTCUT.ZCREATIONDATE as "created_date",
ZSHORTCUT.ZMODIFICATIONDATE as "modified_date",
ZSHORTCUT.ZACTIONSDESCRIPTION as "description",
ZSHORTCUTACTIONS.ZDATA as "action_data"
FROM ZSHORTCUT
LEFT JOIN ZSHORTCUTACTIONS ON ZSHORTCUTACTIONS.ZSHORTCUT == ZSHORTCUT.Z_PK;
""")
try:
cur.execute("""
SELECT
ZSHORTCUT.Z_PK as "shortcut_id",
ZSHORTCUT.ZNAME as "shortcut_name",
ZSHORTCUT.ZCREATIONDATE as "created_date",
ZSHORTCUT.ZMODIFICATIONDATE as "modified_date",
ZSHORTCUT.ZACTIONSDESCRIPTION as "description",
ZSHORTCUTACTIONS.ZDATA as "action_data"
FROM ZSHORTCUT
LEFT JOIN ZSHORTCUTACTIONS ON ZSHORTCUTACTIONS.ZSHORTCUT == ZSHORTCUT.Z_PK;
""")
except sqlite3.OperationalError:
# Table ZSHORTCUT does not exist
self.log.info("Invalid shortcut database format, skipping...")
cur.close()
conn.close()
return
names = [description[0] for description in cur.description]
for item in cur:
@@ -83,14 +102,13 @@ class Shortcuts(IOSExtraction):
action["identifier"] = action_entry["WFWorkflowActionIdentifier"]
action["parameters"] = action_entry["WFWorkflowActionParameters"]
# URLs might be in multiple fields, do a simple regex search across the parameters
# URLs might be in multiple fields, do a simple regex search across the parameters.
extracted_urls = check_for_links(str(action["parameters"]))
# Remove quoting characters that may have been captured by the regex
# Remove quoting characters that may have been captured by the regex.
action["urls"] = [url.rstrip("',") for url in extracted_urls]
actions.append(action)
# pprint.pprint(actions)
shortcut["isodate"] = convert_timestamp_to_iso(convert_mactime_to_unix(shortcut.pop("created_date")))
shortcut["modified_date"] = convert_timestamp_to_iso(convert_mactime_to_unix(shortcut["modified_date"]))
shortcut["parsed_actions"] = len(actions)

View File

@@ -41,10 +41,12 @@ class SMS(IOSExtraction):
if not self.indicators:
return
for message in self.results:
message_links = check_for_links(message.get("text", ""))
if self.indicators.check_domains(message_links):
self.detected.append(message)
for result in self.results:
message_links = check_for_links(result.get("text", ""))
ioc = self.indicators.check_domains(message_links)
if ioc:
result["matched_indicator"] = ioc
self.detected.append(result)
def run(self):
self._find_ios_database(backup_ids=SMS_BACKUP_IDS,

View File

@@ -66,6 +66,16 @@ class TCC(IOSExtraction):
"data": msg
}
def check_indicators(self):
if not self.indicators:
return
for result in self.results:
ioc = self.indicators.check_process(result["client"])
if ioc:
result["matched_indicator"] = ioc
self.detected.append(result)
def process_db(self, file_path):
conn = sqlite3.connect(file_path)
cur = conn.cursor()

View File

@@ -37,7 +37,9 @@ class WebkitResourceLoadStatistics(IOSExtraction):
self.detected = {}
for key, items in self.results.items():
for item in items:
if self.indicators.check_domain(item["registrable_domain"]):
ioc = self.indicators.check_domain(item["registrable_domain"])
if ioc:
item["matched_indicator"] = ioc
if key not in self.detected:
self.detected[key] = [item, ]
else:
@@ -77,7 +79,8 @@ class WebkitResourceLoadStatistics(IOSExtraction):
for backup_file in self._get_backup_files_from_manifest(relative_path=WEBKIT_RESOURCELOADSTATICS_BACKUP_RELPATH):
db_path = self._get_backup_file_from_id(backup_file["file_id"])
key = f"{backup_file['domain']}/{WEBKIT_RESOURCELOADSTATICS_BACKUP_RELPATH}"
self._process_observations_db(db_path=db_path, key=key)
if db_path:
self._process_observations_db(db_path=db_path, key=key)
except Exception as e:
self.log.info("Unable to search for WebKit observations.db: %s", e)
elif self.is_fs_dump:

View File

@@ -66,7 +66,9 @@ class WebkitSessionResourceLog(IOSExtraction):
all_origins = set([entry["origin"]] + source_domains + destination_domains)
if self.indicators.check_domains(all_origins):
ioc = self.indicators.check_domains(all_origins)
if ioc:
entry["matched_indicator"] = ioc
self.detected.append(entry)
redirect_path = ""

View File

@@ -35,6 +35,7 @@ class Whatsapp(IOSExtraction):
links_text = ""
if record["links"]:
links_text = " - Embedded links: " + ", ".join(record["links"])
return {
"timestamp": record.get("isodate"),
"module": self.__class__.__name__,
@@ -46,9 +47,11 @@ class Whatsapp(IOSExtraction):
if not self.indicators:
return
for message in self.results:
if self.indicators.check_domains(message["links"]):
self.detected.append(message)
for result in self.results:
ioc = self.indicators.check_domains(result.get("links", []))
if ioc:
result["matched_indicator"] = ioc
self.detected.append(result)
def run(self):
self._find_ios_database(backup_ids=WHATSAPP_BACKUP_IDS,
@@ -83,14 +86,15 @@ class Whatsapp(IOSExtraction):
message["isodate"] = convert_timestamp_to_iso(convert_mactime_to_unix(message.get("ZMESSAGEDATE")))
message["ZTEXT"] = message["ZTEXT"] if message["ZTEXT"] else ""
# Extract links from the WhatsApp message. URLs can be stored in multiple fields/columns. Check each of them!
# Extract links from the WhatsApp message. URLs can be stored in multiple fields/columns.
# Check each of them!
message_links = []
fields_with_links = ["ZTEXT", "ZMATCHEDTEXT", "ZMEDIAURL", "ZCONTENT1", "ZCONTENT2"]
for field in fields_with_links:
if message.get(field):
message_links.extend(check_for_links(message.get(field, "")))
# Remove WhatsApp internal media URLs
# Remove WhatsApp internal media URLs.
filtered_links = []
for link in message_links:
if not (link.startswith("https://mmg-fna.whatsapp.net/") or link.startswith("https://mmg.whatsapp.net/")):

View File

@@ -237,5 +237,7 @@ class NetBase(IOSExtraction):
if not result["proc_id"]:
continue
if self.indicators.check_process(proc_name):
ioc = self.indicators.check_process(proc_name)
if ioc:
result["matched_indicator"] = ioc
self.detected.append(result)

View File

@@ -234,6 +234,8 @@ IPHONE_IOS_VERSIONS = [
{"build": "19A404", "version": "15.0.2"},
{"build": "19B74", "version": "15.1"},
{"build": "19B81", "version": "15.1.1"},
{"build": "19C56", "version": "15.2"},
{"build": "19C63", "version": "15.2.1"},
]

14
public_indicators.json Normal file
View File

@@ -0,0 +1,14 @@
[
{
"name": "NSO Group Pegasus Indicators of Compromise",
"source": "Amnesty International",
"reference": "https://www.amnesty.org/en/latest/research/2021/07/forensic-methodology-report-how-to-catch-nso-groups-pegasus/",
"stix2_url": "https://raw.githubusercontent.com/AmnestyTech/investigations/master/2021-07-18_nso/pegasus.stix2"
},
{
"name": "Cytrox Predator Spyware Indicators of Compromise",
"source": "Meta, Amnesty International, Citizen Lab",
"reference": "https://citizenlab.ca/2021/12/pegasus-vs-predator-dissidents-doubly-infected-iphone-reveals-cytrox-mercenary-spyware/",
"stix2_url": "https://raw.githubusercontent.com/AmnestyTech/investigations/master/2021-12-16_cytrox/cytrox.stix2"
}
]

View File

@@ -23,6 +23,7 @@ requires = (
"requests>=2.26.0",
"simplejson>=3.17.5",
"packaging>=21.0",
"appdirs>=1.4.4",
# iOS dependencies:
"iOSbackup>=0.9.921",
# Android dependencies:

0
tests/__init__.py Normal file
View File

1
tests/artifacts/.gitignore vendored Normal file
View File

@@ -0,0 +1 @@
test.stix2

View File

@@ -0,0 +1,50 @@
# Mobile Verification Toolkit (MVT)
# Copyright (c) 2021 The MVT Project Authors.
# Use of this software is governed by the MVT License 1.1 that can be found at
# https://license.mvt.re/1.1/
import os
from stix2.v21 import Bundle, Indicator, Malware, Relationship
def generate_test_stix_file(file_path):
if os.path.isfile(file_path):
os.remove(file_path)
domains = ["example.org"]
processes = ["Launch"]
emails = ["foobar@example.org"]
filenames = ["/var/foobar/txt"]
res = []
malware = Malware(name="TestMalware", is_family=False, description="")
res.append(malware)
for d in domains:
i = Indicator(indicator_types=["malicious-activity"], pattern="[domain-name:value='{}']".format(d), pattern_type="stix")
res.append(i)
res.append(Relationship(i, "indicates", malware))
for p in processes:
i = Indicator(indicator_types=["malicious-activity"], pattern="[process:name='{}']".format(p), pattern_type="stix")
res.append(i)
res.append(Relationship(i, "indicates", malware))
for f in filenames:
i = Indicator(indicator_types=["malicious-activity"], pattern="[file:name='{}']".format(f), pattern_type="stix")
res.append(i)
res.append(Relationship(i, "indicates", malware))
for e in emails:
i = Indicator(indicator_types=["malicious-activity"], pattern="[email-addr:value='{}']".format(e), pattern_type="stix")
res.append(i)
res.append(Relationship(i, "indicates", malware))
bundle = Bundle(objects=res)
with open(file_path, "w+") as f:
f.write(bundle.serialize(pretty=True))
if __name__ == "__main__":
generate_test_stix_file("test.stix2")
print("test.stix2 file created")

Binary file not shown.

Binary file not shown.

0
tests/common/__init__.py Normal file
View File

View File

@@ -0,0 +1,32 @@
# Mobile Verification Toolkit (MVT)
# Copyright (c) 2021 The MVT Project Authors.
# Use of this software is governed by the MVT License 1.1 that can be found at
# https://license.mvt.re/1.1/
import logging
import os
from mvt.common.indicators import Indicators
class TestIndicators:
def test_parse_stix2(self, indicator_file):
ind = Indicators(log=logging)
ind.load_indicators_files([indicator_file], load_default=False)
assert ind.ioc_files[0]["count"] == 4
assert len(ind.ioc_files[0]["domains"]) == 1
assert len(ind.ioc_files[0]["emails"]) == 1
assert len(ind.ioc_files[0]["file_names"]) == 1
assert len(ind.ioc_files[0]["processes"]) == 1
def test_check_domain(self, indicator_file):
ind = Indicators(log=logging)
ind.load_indicators_files([indicator_file], load_default=False)
assert ind.check_domain("https://www.example.org/foobar")
assert ind.check_domain("http://example.org:8080/toto")
def test_env_stix(self, indicator_file):
os.environ["MVT_STIX2"] = indicator_file
ind = Indicators(log=logging)
ind.load_indicators_files([], load_default=False)
assert ind.total_ioc_count == 4

26
tests/conftest.py Normal file
View File

@@ -0,0 +1,26 @@
# Mobile Verification Toolkit (MVT)
# Copyright (c) 2021 The MVT Project Authors.
# Use of this software is governed by the MVT License 1.1 that can be found at
# https://license.mvt.re/1.1/
import os
import pytest
from .artifacts.generate_stix import generate_test_stix_file
@pytest.fixture(scope="session", autouse=True)
def indicator_file(request, tmp_path_factory):
indicator_dir = tmp_path_factory.mktemp("indicators")
stix_path = indicator_dir / "indicators.stix2"
generate_test_stix_file(stix_path)
return str(stix_path)
@pytest.fixture(scope="session", autouse=True)
def clean_test_env(request, tmp_path_factory):
try:
del os.environ["MVT_STIX2"]
except KeyError:
pass

0
tests/ios/__init__.py Normal file
View File

View File

@@ -0,0 +1,19 @@
# Mobile Verification Toolkit (MVT)
# Copyright (c) 2021 The MVT Project Authors.
# Use of this software is governed by the MVT License 1.1 that can be found at
# https://license.mvt.re/1.1/
import logging
from mvt.common.module import run_module
from mvt.ios.modules.backup.backup_info import BackupInfo
from ..utils import get_backup_folder
class TestBackupInfoModule:
def test_manifest(self):
m = BackupInfo(base_folder=get_backup_folder(), log=logging)
run_module(m)
assert m.results["Build Version"] == "18C66"
assert m.results["IMEI"] == "42"

View File

@@ -0,0 +1,31 @@
# Mobile Verification Toolkit (MVT)
# Copyright (c) 2021 The MVT Project Authors.
# Use of this software is governed by the MVT License 1.1 that can be found at
# https://license.mvt.re/1.1/
import logging
from mvt.common.indicators import Indicators
from mvt.common.module import run_module
from mvt.ios.modules.mixed.net_datausage import Datausage
from ..utils import get_backup_folder
class TestDatausageModule:
def test_datausage(self):
m = Datausage(base_folder=get_backup_folder(), log=logging, results=[])
run_module(m)
assert len(m.results) == 42
assert len(m.timeline) == 60
assert len(m.detected) == 0
def test_detection(self, indicator_file):
m = Datausage(base_folder=get_backup_folder(), log=logging, results=[])
ind = Indicators(log=logging)
ind.parse_stix2(indicator_file)
# Adds a file that exists in the manifest.
ind.ioc_files[0]["processes"].append("CumulativeUsageTracker")
m.indicators = ind
run_module(m)
assert len(m.detected) == 2

View File

@@ -0,0 +1,30 @@
# Mobile Verification Toolkit (MVT)
# Copyright (c) 2021 The MVT Project Authors.
# Use of this software is governed by the MVT License 1.1 that can be found at
# https://license.mvt.re/1.1/
import logging
from mvt.common.indicators import Indicators
from mvt.common.module import run_module
from mvt.ios.modules.backup.manifest import Manifest
from ..utils import get_backup_folder
class TestManifestModule:
def test_manifest(self):
m = Manifest(base_folder=get_backup_folder(), log=logging, results=[])
run_module(m)
assert len(m.results) == 3721
assert len(m.timeline) == 5881
assert len(m.detected) == 0
def test_detection(self, indicator_file):
m = Manifest(base_folder=get_backup_folder(), log=logging, results=[])
ind = Indicators(log=logging)
ind.parse_stix2(indicator_file)
ind.ioc_files[0]["file_names"].append("com.apple.CoreBrightness.plist")
m.indicators = ind
run_module(m)
assert len(m.detected) == 1

36
tests/ios/test_tcc.py Normal file
View File

@@ -0,0 +1,36 @@
# Mobile Verification Toolkit (MVT)
# Copyright (c) 2021 The MVT Project Authors.
# Use of this software is governed by the MVT License 1.1 that can be found at
# https://license.mvt.re/1.1/
import logging
from mvt.common.indicators import Indicators
from mvt.common.module import run_module
from mvt.ios.modules.mixed.tcc import TCC
from ..utils import get_backup_folder
class TestTCCtModule:
def test_tcc(self):
m = TCC(base_folder=get_backup_folder(), log=logging, results=[])
run_module(m)
assert len(m.results) == 11
assert len(m.timeline) == 11
assert len(m.detected) == 0
assert m.results[0]["service"] == "kTCCServiceUbiquity"
assert m.results[0]["client"] == "com.apple.Preferences"
assert m.results[0]["auth_value"] == "allowed"
def test_tcc_detection(self, indicator_file):
m = TCC(base_folder=get_backup_folder(), log=logging, results=[])
ind = Indicators(log=logging)
ind.parse_stix2(indicator_file)
m.indicators = ind
run_module(m)
assert len(m.results) == 11
assert len(m.timeline) == 11
assert len(m.detected) == 1
assert m.detected[0]["service"] == "kTCCServiceLiverpool"
assert m.detected[0]["client"] == "Launch"

28
tests/utils.py Normal file
View File

@@ -0,0 +1,28 @@
# Mobile Verification Toolkit (MVT)
# Copyright (c) 2021 The MVT Project Authors.
# Use of this software is governed by the MVT License 1.1 that can be found at
# https://license.mvt.re/1.1/
import os
def get_artifact(fname):
"""
Return the artifact path in the artifact folder
"""
fpath = os.path.join(get_artifact_folder(), fname)
if os.path.isfile(fpath):
return fpath
return
def get_artifact_folder():
return os.path.join(os.path.dirname(__file__), "artifacts")
def get_backup_folder():
return os.path.join(os.path.dirname(__file__), "artifacts", "ios_backup")
def get_indicator_file():
print("PYTEST env", os.getenv("PYTEST_CURRENT_TEST"))