mirror of
https://github.com/mvt-project/mvt.git
synced 2026-02-15 10:02:43 +00:00
Compare commits
26 Commits
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
c3acc95e9e | ||
|
|
90d05336da | ||
|
|
5513e6e9e3 | ||
|
|
38116f8405 | ||
|
|
59b069f006 | ||
|
|
28e1348aa7 | ||
|
|
034338d1f4 | ||
|
|
09d5eabf2f | ||
|
|
a425d6c511 | ||
|
|
f8897a4f8c | ||
|
|
86eae68bdb | ||
|
|
d2bf348b03 | ||
|
|
25c6c03075 | ||
|
|
cf88740f6a | ||
|
|
eb4810b0ad | ||
|
|
cce9159eda | ||
|
|
e1211991aa | ||
|
|
8ae9ca328c | ||
|
|
0e2eb51732 | ||
|
|
b35cd4bc73 | ||
|
|
1b4f99a31d | ||
|
|
e4e1716729 | ||
|
|
083bc12351 | ||
|
|
cf6d392460 | ||
|
|
95205d8e17 | ||
|
|
38bb583a9e |
@@ -41,6 +41,6 @@ export MVT_STIX2="/home/user/IOC1.stix2:/home/user/IOC2.stix2"
|
||||
- [Predator from Cytrox](https://citizenlab.ca/2021/12/pegasus-vs-predator-dissidents-doubly-infected-iphone-reveals-cytrox-mercenary-spyware/) ([STIX2](https://raw.githubusercontent.com/AmnestyTech/investigations/master/2021-12-16_cytrox/cytrox.stix2))
|
||||
- [This repository](https://github.com/Te-k/stalkerware-indicators) contains IOCs for Android stalkerware including [a STIX MVT-compatible file](https://raw.githubusercontent.com/Te-k/stalkerware-indicators/master/stalkerware.stix2).
|
||||
|
||||
You can automaticallly download the latest public indicator files with the command `mvt-ios download-iocs` or `mvt-android download-iocs`.
|
||||
You can automaticallly download the latest public indicator files with the command `mvt-ios download-iocs` or `mvt-android download-iocs`. These commands download the list of indicators listed [here](https://github.com/mvt-project/mvt/blob/main/public_indicators.json) and store them in the [appdir](https://pypi.org/project/appdirs/) folder. They are then loaded automatically by mvt.
|
||||
|
||||
Please [open an issue](https://github.com/mvt-project/mvt/issues/) to suggest new sources of STIX-formatted IOCs.
|
||||
|
||||
@@ -139,7 +139,7 @@ def check_adb(ctx, iocs, output, fast, list_modules, module, serial):
|
||||
|
||||
m = adb_module(output_folder=output, fast_mode=fast,
|
||||
log=logging.getLogger(adb_module.__module__))
|
||||
if indicators.ioc_count:
|
||||
if indicators.total_ioc_count:
|
||||
m.indicators = indicators
|
||||
m.indicators.log = m.log
|
||||
if serial:
|
||||
@@ -190,7 +190,7 @@ def check_backup(ctx, iocs, output, backup_path, serial):
|
||||
for module in BACKUP_MODULES:
|
||||
m = module(base_folder=backup_path, output_folder=output,
|
||||
log=logging.getLogger(module.__module__))
|
||||
if indicators.ioc_count:
|
||||
if indicators.total_ioc_count:
|
||||
m.indicators = indicators
|
||||
m.indicators.log = m.log
|
||||
if serial:
|
||||
@@ -199,6 +199,72 @@ def check_backup(ctx, iocs, output, backup_path, serial):
|
||||
run_module(m)
|
||||
|
||||
|
||||
#==============================================================================
|
||||
# Command: check-iocs
|
||||
#==============================================================================
|
||||
@cli.command("check-iocs", help="Compare stored JSON results to provided indicators")
|
||||
@click.option("--iocs", "-i", type=click.Path(exists=True), multiple=True,
|
||||
default=[], help=HELP_MSG_IOC)
|
||||
@click.option("--list-modules", "-l", is_flag=True, help=HELP_MSG_LIST_MODULES)
|
||||
@click.option("--module", "-m", help=HELP_MSG_MODULE)
|
||||
@click.argument("FOLDER", type=click.Path(exists=True))
|
||||
@click.pass_context
|
||||
def check_iocs(ctx, iocs, list_modules, module, folder):
|
||||
all_modules = []
|
||||
for entry in BACKUP_MODULES + ADB_MODULES:
|
||||
if entry not in all_modules:
|
||||
all_modules.append(entry)
|
||||
|
||||
if list_modules:
|
||||
log.info("Following is the list of available check-iocs modules:")
|
||||
for iocs_module in all_modules:
|
||||
log.info(" - %s", iocs_module.__name__)
|
||||
|
||||
return
|
||||
|
||||
log.info("Checking stored results against provided indicators...")
|
||||
|
||||
indicators = Indicators(log=log)
|
||||
indicators.load_indicators_files(iocs)
|
||||
|
||||
total_detections = 0
|
||||
for file_name in os.listdir(folder):
|
||||
name_only, ext = os.path.splitext(file_name)
|
||||
file_path = os.path.join(folder, file_name)
|
||||
|
||||
# TODO: Skipping processing of result files that are not json.
|
||||
# We might want to revisit this eventually.
|
||||
if ext != ".json":
|
||||
continue
|
||||
|
||||
for iocs_module in all_modules:
|
||||
if module and iocs_module.__name__ != module:
|
||||
continue
|
||||
|
||||
if iocs_module().get_slug() != name_only:
|
||||
continue
|
||||
|
||||
log.info("Loading results from \"%s\" with module %s", file_name,
|
||||
iocs_module.__name__)
|
||||
|
||||
m = iocs_module.from_json(file_path,
|
||||
log=logging.getLogger(iocs_module.__module__))
|
||||
if indicators.total_ioc_count > 0:
|
||||
m.indicators = indicators
|
||||
m.indicators.log = m.log
|
||||
|
||||
try:
|
||||
m.check_indicators()
|
||||
except NotImplementedError:
|
||||
continue
|
||||
else:
|
||||
total_detections += len(m.detected)
|
||||
|
||||
if total_detections > 0:
|
||||
log.warning("The check of the results produced %d detections!",
|
||||
total_detections)
|
||||
|
||||
|
||||
#==============================================================================
|
||||
# Command: download-iocs
|
||||
#==============================================================================
|
||||
|
||||
@@ -11,14 +11,16 @@ from .dumpsys_packages import DumpsysPackages
|
||||
from .dumpsys_procstats import DumpsysProcstats
|
||||
from .dumpsys_receivers import DumpsysReceivers
|
||||
from .files import Files
|
||||
from .getprop import Getprop
|
||||
from .logcat import Logcat
|
||||
from .packages import Packages
|
||||
from .processes import Processes
|
||||
from .rootbinaries import RootBinaries
|
||||
from .root_binaries import RootBinaries
|
||||
from .settings import Settings
|
||||
from .sms import SMS
|
||||
from .whatsapp import Whatsapp
|
||||
|
||||
ADB_MODULES = [ChromeHistory, SMS, Whatsapp, Processes,
|
||||
ADB_MODULES = [ChromeHistory, SMS, Whatsapp, Processes, Getprop, Settings,
|
||||
DumpsysAccessibility, DumpsysBatterystats, DumpsysProcstats,
|
||||
DumpsysPackages, DumpsysReceivers, DumpsysFull,
|
||||
Packages, RootBinaries, Logcat, Files]
|
||||
|
||||
@@ -15,7 +15,7 @@ from adb_shell.adb_device import AdbDeviceTcp, AdbDeviceUsb
|
||||
from adb_shell.auth.keygen import keygen, write_public_keyfile
|
||||
from adb_shell.auth.sign_pythonrsa import PythonRSASigner
|
||||
from adb_shell.exceptions import (AdbCommandFailureException, DeviceAuthError,
|
||||
UsbReadFailedError)
|
||||
UsbDeviceNotFoundError, UsbReadFailedError)
|
||||
from usb1 import USBErrorAccess, USBErrorBusy
|
||||
|
||||
from mvt.common.module import InsufficientPrivileges, MVTModule
|
||||
@@ -65,7 +65,11 @@ class AndroidExtraction(MVTModule):
|
||||
# If no serial was specified or if the serial does not seem to be
|
||||
# a HOST:PORT definition, we use the USB transport.
|
||||
if not self.serial or ":" not in self.serial:
|
||||
self.device = AdbDeviceUsb(serial=self.serial)
|
||||
try:
|
||||
self.device = AdbDeviceUsb(serial=self.serial)
|
||||
except UsbDeviceNotFoundError:
|
||||
log.critical("No device found. Make sure it is connected and unlocked.")
|
||||
sys.exit(-1)
|
||||
# Otherwise we try to use the TCP transport.
|
||||
else:
|
||||
addr = self.serial.split(":")
|
||||
|
||||
@@ -9,10 +9,11 @@ from .base import AndroidExtraction
|
||||
|
||||
log = logging.getLogger(__name__)
|
||||
|
||||
ACTION_NEW_OUTGOING_SMS = "android.provider.Telephony.NEW_OUTGOING_SMS"
|
||||
ACTION_SMS_RECEIVED = "android.provider.Telephony.SMS_RECEIVED"
|
||||
ACTION_DATA_SMS_RECEIVED = "android.intent.action.DATA_SMS_RECEIVED"
|
||||
ACTION_PHONE_STATE = "android.intent.action.PHONE_STATE"
|
||||
INTENT_NEW_OUTGOING_SMS = "android.provider.Telephony.NEW_OUTGOING_SMS"
|
||||
INTENT_SMS_RECEIVED = "android.provider.Telephony.SMS_RECEIVED"
|
||||
INTENT_DATA_SMS_RECEIVED = "android.intent.action.DATA_SMS_RECEIVED"
|
||||
INTENT_PHONE_STATE = "android.intent.action.PHONE_STATE"
|
||||
INTENT_NEW_OUTGOING_CALL = "android.intent.action.NEW_OUTGOING_CALL"
|
||||
|
||||
|
||||
class DumpsysReceivers(AndroidExtraction):
|
||||
@@ -24,6 +25,24 @@ class DumpsysReceivers(AndroidExtraction):
|
||||
output_folder=output_folder, fast_mode=fast_mode,
|
||||
log=log, results=results)
|
||||
|
||||
def check_indicators(self):
|
||||
for result in self.results:
|
||||
if result["activity"] == INTENT_NEW_OUTGOING_SMS:
|
||||
self.log.info("Found a receiver to intercept outgoing SMS messages: \"%s\"",
|
||||
result["receiver"])
|
||||
elif result["activity"] == INTENT_SMS_RECEIVED:
|
||||
self.log.info("Found a receiver to intercept incoming SMS messages: \"%s\"",
|
||||
result["receiver"])
|
||||
elif result["activity"] == INTENT_DATA_SMS_RECEIVED:
|
||||
self.log.info("Found a receiver to intercept incoming data SMS message: \"%s\"",
|
||||
result["receiver"])
|
||||
elif result["activity"] == INTENT_PHONE_STATE:
|
||||
self.log.info("Found a receiver monitoring telephony state/incoming calls: \"%s\"",
|
||||
result["receiver"])
|
||||
elif result["activity"] == INTENT_NEW_OUTGOING_CALL:
|
||||
self.log.info("Found a receiver monitoring outgoing calls: \"%s\"",
|
||||
result["receiver"])
|
||||
|
||||
def run(self):
|
||||
self._adb_connect()
|
||||
|
||||
@@ -34,17 +53,20 @@ class DumpsysReceivers(AndroidExtraction):
|
||||
activity = None
|
||||
for line in output.split("\n"):
|
||||
# Find activity block markers.
|
||||
if line.strip().startswith(ACTION_NEW_OUTGOING_SMS):
|
||||
activity = ACTION_NEW_OUTGOING_SMS
|
||||
if line.strip().startswith(INTENT_NEW_OUTGOING_SMS):
|
||||
activity = INTENT_NEW_OUTGOING_SMS
|
||||
continue
|
||||
elif line.strip().startswith(ACTION_SMS_RECEIVED):
|
||||
activity = ACTION_SMS_RECEIVED
|
||||
elif line.strip().startswith(INTENT_SMS_RECEIVED):
|
||||
activity = INTENT_SMS_RECEIVED
|
||||
continue
|
||||
elif line.strip().startswith(ACTION_PHONE_STATE):
|
||||
activity = ACTION_PHONE_STATE
|
||||
elif line.strip().startswith(INTENT_PHONE_STATE):
|
||||
activity = INTENT_PHONE_STATE
|
||||
continue
|
||||
elif line.strip().startswith(ACTION_DATA_SMS_RECEIVED):
|
||||
activity = ACTION_DATA_SMS_RECEIVED
|
||||
elif line.strip().startswith(INTENT_DATA_SMS_RECEIVED):
|
||||
activity = INTENT_DATA_SMS_RECEIVED
|
||||
continue
|
||||
elif line.strip().startswith(INTENT_NEW_OUTGOING_CALL):
|
||||
activity = INTENT_NEW_OUTGOING_CALL
|
||||
continue
|
||||
|
||||
# If we are not in an activity block yet, skip.
|
||||
@@ -65,19 +87,6 @@ class DumpsysReceivers(AndroidExtraction):
|
||||
if package_name == "com.google.android.gms":
|
||||
continue
|
||||
|
||||
if activity == ACTION_NEW_OUTGOING_SMS:
|
||||
self.log.info("Found a receiver to intercept outgoing SMS messages: \"%s\"",
|
||||
receiver)
|
||||
elif activity == ACTION_SMS_RECEIVED:
|
||||
self.log.info("Found a receiver to intercept incoming SMS messages: \"%s\"",
|
||||
receiver)
|
||||
elif activity == ACTION_DATA_SMS_RECEIVED:
|
||||
self.log.info("Found a receiver to intercept incoming data SMS message: \"%s\"",
|
||||
receiver)
|
||||
elif activity == ACTION_PHONE_STATE:
|
||||
self.log.info("Found a receiver monitoring telephony state: \"%s\"",
|
||||
receiver)
|
||||
|
||||
self.results.append({
|
||||
"activity": activity,
|
||||
"package_name": package_name,
|
||||
|
||||
@@ -22,30 +22,16 @@ class Files(AndroidExtraction):
|
||||
super().__init__(file_path=file_path, base_folder=base_folder,
|
||||
output_folder=output_folder, fast_mode=fast_mode,
|
||||
log=log, results=results)
|
||||
self.full_find = None
|
||||
self.full_find = False
|
||||
|
||||
def find_path(self, file_path):
|
||||
"""Checks if Android system supports full find command output"""
|
||||
# Check find command params on first run
|
||||
# Run find command with correct args and parse results.
|
||||
|
||||
# Check that full file printf options are suppported on first run.
|
||||
if self.full_find is None:
|
||||
output = self._adb_command("find '/' -maxdepth 1 -printf '%T@ %m %s %u %g %p\n' 2> /dev/null")
|
||||
if not (output or output.strip().splitlines()):
|
||||
# Full find command failed to generate output, fallback to basic file arguments
|
||||
self.full_find = False
|
||||
else:
|
||||
self.full_find = True
|
||||
|
||||
found_files = []
|
||||
if self.full_find is True:
|
||||
# Run full file command and collect additonal file information.
|
||||
def find_files(self, file_path):
|
||||
if self.full_find:
|
||||
output = self._adb_command(f"find '{file_path}' -printf '%T@ %m %s %u %g %p\n' 2> /dev/null")
|
||||
|
||||
for file_line in output.splitlines():
|
||||
[unix_timestamp, mode, size, owner, group, full_path] = file_line.rstrip().split(" ", 5)
|
||||
mod_time = convert_timestamp_to_iso(datetime.datetime.utcfromtimestamp(int(float(unix_timestamp))))
|
||||
found_files.append({
|
||||
self.results.append({
|
||||
"path": full_path,
|
||||
"modified_time": mod_time,
|
||||
"mode": mode,
|
||||
@@ -56,14 +42,9 @@ class Files(AndroidExtraction):
|
||||
"group": group,
|
||||
})
|
||||
else:
|
||||
# Run a basic listing of file paths.
|
||||
output = self._adb_command(f"find '{file_path}' 2> /dev/null")
|
||||
for file_line in output.splitlines():
|
||||
found_files.append({
|
||||
"path": file_line.rstrip()
|
||||
})
|
||||
|
||||
return found_files
|
||||
self.results.append({"path": file_line.rstrip()})
|
||||
|
||||
def serialize(self, record):
|
||||
if "modified_time" in record:
|
||||
@@ -85,39 +66,32 @@ class Files(AndroidExtraction):
|
||||
def check_indicators(self):
|
||||
"""Check file list for known suspicious files or suspicious properties"""
|
||||
self.check_suspicious()
|
||||
|
||||
if not self.indicators:
|
||||
return
|
||||
|
||||
for result in self.results:
|
||||
if self.indicators.check_file_name(result["path"]):
|
||||
self.log.warning("Found a known suspicous filename at path: \"%s\"", result["path"])
|
||||
self.detected.append(result)
|
||||
|
||||
if self.indicators.check_file_path(result["path"]):
|
||||
self.log.warning("Found a known suspicous file at path: \"%s\"", result["path"])
|
||||
self.detected.append(result)
|
||||
|
||||
def run(self):
|
||||
self._adb_connect()
|
||||
found_file_paths = []
|
||||
|
||||
DATA_PATHS = ["/data/local/tmp/", "/sdcard/", "/tmp/"]
|
||||
for path in DATA_PATHS:
|
||||
file_info = self.find_path(path)
|
||||
found_file_paths.extend(file_info)
|
||||
output = self._adb_command("find '/' -maxdepth 1 -printf '%T@ %m %s %u %g %p\n' 2> /dev/null")
|
||||
if output or output.strip().splitlines():
|
||||
self.full_find = True
|
||||
|
||||
# Store results
|
||||
self.results.extend(found_file_paths)
|
||||
self.log.info("Found %s files in primary Android data directories.", len(found_file_paths))
|
||||
for data_path in ["/data/local/tmp/", "/sdcard/", "/tmp/"]:
|
||||
self.find_files(data_path)
|
||||
|
||||
self.log.info("Found %s files in primary Android data directories", len(self.results))
|
||||
|
||||
if self.fast_mode:
|
||||
self.log.info("Flag --fast was enabled: skipping full file listing")
|
||||
else:
|
||||
self.log.info("Flag --fast was not enabled: processing full file listing. "
|
||||
"This may take a while...")
|
||||
output = self.find_path("/")
|
||||
if output and self.output_folder:
|
||||
self.results.extend(output)
|
||||
log.info("List of visible files stored in files.json")
|
||||
self.log.info("Processing full file listing. This may take a while...")
|
||||
self.find_files("/")
|
||||
self.log.info("Found %s total files", len(self.results))
|
||||
|
||||
self._adb_disconnect()
|
||||
|
||||
46
mvt/android/modules/adb/getprop.py
Normal file
46
mvt/android/modules/adb/getprop.py
Normal file
@@ -0,0 +1,46 @@
|
||||
# Mobile Verification Toolkit (MVT)
|
||||
# Copyright (c) 2021 The MVT Project Authors.
|
||||
# Use of this software is governed by the MVT License 1.1 that can be found at
|
||||
# https://license.mvt.re/1.1/
|
||||
|
||||
import logging
|
||||
import os
|
||||
import re
|
||||
|
||||
from .base import AndroidExtraction
|
||||
|
||||
log = logging.getLogger(__name__)
|
||||
|
||||
|
||||
class Getprop(AndroidExtraction):
|
||||
"""This module extracts device properties from getprop command."""
|
||||
|
||||
def __init__(self, file_path=None, base_folder=None, output_folder=None,
|
||||
serial=None, fast_mode=False, log=None, results=[]):
|
||||
super().__init__(file_path=file_path, base_folder=base_folder,
|
||||
output_folder=output_folder, fast_mode=fast_mode,
|
||||
log=log, results=results)
|
||||
|
||||
self.results = {} if not results else results
|
||||
|
||||
def run(self):
|
||||
self._adb_connect()
|
||||
|
||||
rxp = re.compile("\\[(.+?)\\]: \\[(.+?)\\]")
|
||||
out = self._adb_command("getprop")
|
||||
for line in out.splitlines():
|
||||
line = line.strip()
|
||||
if line == "":
|
||||
continue
|
||||
|
||||
matches = re.findall(rxp, line)
|
||||
if not matches or len(matches[0]) != 2:
|
||||
continue
|
||||
|
||||
key = matches[0][0]
|
||||
value = matches[0][1]
|
||||
self.results[key] = value
|
||||
|
||||
self._adb_disconnect()
|
||||
|
||||
self.log.info("Extracted %d Android system properties", len(self.results))
|
||||
@@ -8,6 +8,9 @@ import os
|
||||
|
||||
import pkg_resources
|
||||
|
||||
from mvt.android.lookups.koodous import koodous_lookup
|
||||
from mvt.android.lookups.virustotal import virustotal_lookup
|
||||
|
||||
from .base import AndroidExtraction
|
||||
|
||||
log = logging.getLogger(__name__)
|
||||
@@ -42,9 +45,6 @@ class Packages(AndroidExtraction):
|
||||
return records
|
||||
|
||||
def check_indicators(self):
|
||||
if not self.indicators:
|
||||
return
|
||||
|
||||
root_packages_path = os.path.join("..", "..", "data", "root_packages.txt")
|
||||
root_packages_string = pkg_resources.resource_string(__name__, root_packages_path)
|
||||
root_packages = root_packages_string.decode("utf-8").split("\n")
|
||||
@@ -55,16 +55,19 @@ class Packages(AndroidExtraction):
|
||||
self.log.warning("Found an installed package related to rooting/jailbreaking: \"%s\"",
|
||||
result["package_name"])
|
||||
self.detected.append(result)
|
||||
if result["package_name"] in self.indicators.ioc_app_ids:
|
||||
self.log.warning("Found a malicious package name: \"%s\"",
|
||||
result["package_name"])
|
||||
continue
|
||||
|
||||
ioc = self.indicators.check_app_id(result.get("package_name"))
|
||||
if ioc:
|
||||
result["matched_indicators"] = ioc
|
||||
self.detected.append(result)
|
||||
for file in result["files"]:
|
||||
if file["sha256"] in self.indicators.ioc_files_sha256:
|
||||
self.log.warning("Found a malicious APK: \"%s\" %s",
|
||||
result["package_name"],
|
||||
file["sha256"])
|
||||
self.detected.append(result)
|
||||
continue
|
||||
|
||||
for package_file in result["files"]:
|
||||
ioc = self.indicators.check_file_hash(package_file["sha256"])
|
||||
if ioc:
|
||||
result["matched_indicators"] = ioc
|
||||
self.detected.append(result)
|
||||
|
||||
def _get_files_for_package(self, package_name):
|
||||
output = self._adb_command(f"pm path {package_name}")
|
||||
@@ -95,6 +98,9 @@ class Packages(AndroidExtraction):
|
||||
self._adb_connect()
|
||||
|
||||
packages = self._adb_command("pm list packages -U -u -i -f")
|
||||
if packages.strip() == "Error: Unknown option: -U":
|
||||
packages = self._adb_command("pm list packages -u -i -f")
|
||||
|
||||
for line in packages.split("\n"):
|
||||
line = line.strip()
|
||||
if not line.startswith("package:"):
|
||||
@@ -154,13 +160,19 @@ class Packages(AndroidExtraction):
|
||||
if result["package_name"] == package_name:
|
||||
self.results[i][cmd["field"]] = True
|
||||
|
||||
packages_to_lookup = []
|
||||
for result in self.results:
|
||||
if result["system"]:
|
||||
continue
|
||||
|
||||
packages_to_lookup.append(result)
|
||||
self.log.info("Found non-system package with name \"%s\" installed by \"%s\" on %s",
|
||||
result["package_name"], result["installer"], result["timestamp"])
|
||||
|
||||
if not self.fast_mode:
|
||||
virustotal_lookup(packages_to_lookup)
|
||||
koodous_lookup(packages_to_lookup)
|
||||
|
||||
self.log.info("Extracted at total of %d installed package names",
|
||||
len(self.results))
|
||||
|
||||
|
||||
106
mvt/android/modules/adb/settings.py
Normal file
106
mvt/android/modules/adb/settings.py
Normal file
@@ -0,0 +1,106 @@
|
||||
# Mobile Verification Toolkit (MVT)
|
||||
# Copyright (c) 2021 The MVT Project Authors.
|
||||
# Use of this software is governed by the MVT License 1.1 that can be found at
|
||||
# https://license.mvt.re/1.1/
|
||||
|
||||
import logging
|
||||
import os
|
||||
import re
|
||||
|
||||
from .base import AndroidExtraction
|
||||
|
||||
log = logging.getLogger(__name__)
|
||||
|
||||
|
||||
ANDROID_DANGEROUS_SETTINGS = [
|
||||
{
|
||||
"description": "disabled Google Play Services apps verification",
|
||||
"key": "verifier_verify_adb_installs",
|
||||
"safe_value": "1",
|
||||
},
|
||||
{
|
||||
"description": "disabled Google Play Protect",
|
||||
"key": "package_verifier_enable",
|
||||
"safe_value": "1",
|
||||
},
|
||||
{
|
||||
"description": "disabled Google Play Protect",
|
||||
"key": "package_verifier_user_consent",
|
||||
"safe_value": "1",
|
||||
},
|
||||
{
|
||||
"description": "disabled Google Play Protect",
|
||||
"key": "upload_apk_enable",
|
||||
"safe_value": "1",
|
||||
},
|
||||
{
|
||||
"description": "enabled installation of non-market apps",
|
||||
"key": "install_non_market_apps",
|
||||
"safe_value": "0",
|
||||
},
|
||||
{
|
||||
"description": "disabled confirmation of adb apps installation",
|
||||
"key": "adb_install_need_confirm",
|
||||
"safe_value": "1",
|
||||
},
|
||||
{
|
||||
"description": "disabled sharing of security reports",
|
||||
"key": "send_security_reports",
|
||||
"safe_value": "1",
|
||||
},
|
||||
{
|
||||
"description": "disabled sharing of crash logs with manufacturer",
|
||||
"key": "samsung_errorlog_agree",
|
||||
"safe_value": "1",
|
||||
},
|
||||
{
|
||||
"description": "disabled applications errors reports",
|
||||
"key": "send_action_app_error",
|
||||
"safe_value": "1",
|
||||
},
|
||||
]
|
||||
|
||||
class Settings(AndroidExtraction):
|
||||
"""This module extracts Android system settings."""
|
||||
|
||||
def __init__(self, file_path=None, base_folder=None, output_folder=None,
|
||||
serial=None, fast_mode=False, log=None, results=[]):
|
||||
super().__init__(file_path=file_path, base_folder=base_folder,
|
||||
output_folder=output_folder, fast_mode=fast_mode,
|
||||
log=log, results=results)
|
||||
|
||||
self.results = {} if not results else results
|
||||
|
||||
def check_indicators(self):
|
||||
for namespace, settings in self.results.items():
|
||||
for key, value in settings.items():
|
||||
for danger in ANDROID_DANGEROUS_SETTINGS:
|
||||
# Check if one of the dangerous settings is using an unsafe
|
||||
# value (different than the one specified).
|
||||
if danger["key"] == key and danger["safe_value"] != value:
|
||||
self.log.warning("Found suspicious setting \"%s = %s\" (%s)",
|
||||
key, value, danger["description"])
|
||||
break
|
||||
|
||||
def run(self):
|
||||
self._adb_connect()
|
||||
|
||||
for namespace in ["system", "secure", "global"]:
|
||||
out = self._adb_command(f"cmd settings list {namespace}")
|
||||
if not out:
|
||||
continue
|
||||
|
||||
self.results[namespace] = {}
|
||||
|
||||
for line in out.splitlines():
|
||||
line = line.strip()
|
||||
if line == "":
|
||||
continue
|
||||
|
||||
fields = line.split("=", 1)
|
||||
try:
|
||||
self.results[namespace][fields[0]] = fields[1]
|
||||
except IndexError:
|
||||
continue
|
||||
|
||||
self._adb_disconnect()
|
||||
@@ -21,23 +21,12 @@ class Indicators:
|
||||
def __init__(self, log=None):
|
||||
self.data_dir = user_data_dir("mvt")
|
||||
self.log = log
|
||||
self.ioc_domains = []
|
||||
self.ioc_processes = []
|
||||
self.ioc_emails = []
|
||||
self.ioc_files = []
|
||||
self.ioc_files_sha256 = []
|
||||
self.ioc_app_ids = []
|
||||
self.ios_profile_ids = []
|
||||
self.ioc_count = 0
|
||||
|
||||
def _add_indicator(self, ioc, iocs_list):
|
||||
if ioc not in iocs_list:
|
||||
iocs_list.append(ioc)
|
||||
self.ioc_count += 1
|
||||
self.total_ioc_count = 0
|
||||
|
||||
def _load_downloaded_indicators(self):
|
||||
if not os.path.isdir(self.data_dir):
|
||||
return False
|
||||
return
|
||||
|
||||
for f in os.listdir(self.data_dir):
|
||||
if f.lower().endswith(".stix2"):
|
||||
@@ -48,7 +37,7 @@ class Indicators:
|
||||
Checks if a variable MVT_STIX2 contains path to STIX Files.
|
||||
"""
|
||||
if "MVT_STIX2" not in os.environ:
|
||||
return False
|
||||
return
|
||||
|
||||
paths = os.environ["MVT_STIX2"].split(":")
|
||||
for path in paths:
|
||||
@@ -57,6 +46,102 @@ class Indicators:
|
||||
else:
|
||||
self.log.info("Invalid STIX2 path %s in MVT_STIX2 environment variable", path)
|
||||
|
||||
def _generate_indicators_file(self):
|
||||
return {
|
||||
"name": "",
|
||||
"description": "",
|
||||
"file_name": "",
|
||||
"file_path": "",
|
||||
"domains": [],
|
||||
"processes": [],
|
||||
"emails": [],
|
||||
"file_names": [],
|
||||
"file_paths": [],
|
||||
"files_sha256": [],
|
||||
"app_ids": [],
|
||||
"ios_profile_ids": [],
|
||||
"count": 0,
|
||||
}
|
||||
|
||||
def _add_indicator(self, ioc, ioc_file, iocs_list):
|
||||
if ioc not in iocs_list:
|
||||
iocs_list.append(ioc)
|
||||
ioc_file["count"] += 1
|
||||
self.total_ioc_count += 1
|
||||
|
||||
def parse_stix2(self, file_path):
|
||||
"""Extract indicators from a STIX2 file.
|
||||
|
||||
:param file_path: Path to the STIX2 file to parse
|
||||
:type file_path: str
|
||||
|
||||
"""
|
||||
self.log.info("Parsing STIX2 indicators file at path %s", file_path)
|
||||
|
||||
ioc_file = self._generate_indicators_file()
|
||||
ioc_file["file_path"] = file_path
|
||||
ioc_file["file_name"] = os.path.basename(file_path)
|
||||
|
||||
with open(file_path, "r") as handle:
|
||||
try:
|
||||
data = json.load(handle)
|
||||
except json.decoder.JSONDecodeError:
|
||||
self.log.critical("Unable to parse STIX2 indicator file. The file is malformed or in the wrong format.")
|
||||
return
|
||||
|
||||
for entry in data.get("objects", []):
|
||||
entry_type = entry.get("type", "")
|
||||
if entry_type == "malware":
|
||||
ioc_file["name"] = entry.get("name", "") or ioc_file["file_name"]
|
||||
ioc_file["description"] = entry.get("description", "") or ioc_file["file_name"]
|
||||
continue
|
||||
|
||||
if entry_type != "indicator":
|
||||
continue
|
||||
|
||||
key, value = entry.get("pattern", "").strip("[]").split("=")
|
||||
value = value.strip("'")
|
||||
|
||||
if key == "domain-name:value":
|
||||
# We force domain names to lower case.
|
||||
self._add_indicator(ioc=value.lower(),
|
||||
ioc_file=ioc_file,
|
||||
iocs_list=ioc_file["domains"])
|
||||
elif key == "process:name":
|
||||
self._add_indicator(ioc=value,
|
||||
ioc_file=ioc_file,
|
||||
iocs_list=ioc_file["processes"])
|
||||
elif key == "email-addr:value":
|
||||
# We force email addresses to lower case.
|
||||
self._add_indicator(ioc=value.lower(),
|
||||
ioc_file=ioc_file,
|
||||
iocs_list=ioc_file["emails"])
|
||||
elif key == "file:name":
|
||||
self._add_indicator(ioc=value,
|
||||
ioc_file=ioc_file,
|
||||
iocs_list=ioc_file["file_names"])
|
||||
elif key == "file:path":
|
||||
self._add_indicator(ioc=value,
|
||||
ioc_file=ioc_file,
|
||||
iocs_list=ioc_file["file_paths"])
|
||||
elif key == "file:hashes.sha256":
|
||||
self._add_indicator(ioc=value,
|
||||
ioc_file=ioc_file,
|
||||
iocs_list=ioc_file["files_sha256"])
|
||||
elif key == "app:id":
|
||||
self._add_indicator(ioc=value,
|
||||
ioc_file=ioc_file,
|
||||
iocs_list=ioc_file["app_ids"])
|
||||
elif key == "configuration-profile:id":
|
||||
self._add_indicator(ioc=value,
|
||||
ioc_file=ioc_file,
|
||||
iocs_list=ioc_file["ios_profile_ids"])
|
||||
|
||||
self.log.info("Loaded %d indicators from \"%s\" indicators file",
|
||||
ioc_file["count"], ioc_file["name"])
|
||||
|
||||
self.ioc_files.append(ioc_file)
|
||||
|
||||
def load_indicators_files(self, files, load_default=True):
|
||||
"""
|
||||
Load a list of indicators files.
|
||||
@@ -67,71 +152,34 @@ class Indicators:
|
||||
else:
|
||||
self.log.warning("This indicators file %s does not exist", file_path)
|
||||
|
||||
# Load downloaded indicators and any indicators from env variable
|
||||
# Load downloaded indicators and any indicators from env variable.
|
||||
if load_default:
|
||||
self._load_downloaded_indicators()
|
||||
|
||||
self._check_stix2_env_variable()
|
||||
self.log.info("Loaded a total of %d unique indicators", self.ioc_count)
|
||||
self.log.info("Loaded a total of %d unique indicators", self.total_ioc_count)
|
||||
|
||||
def parse_stix2(self, file_path):
|
||||
"""Extract indicators from a STIX2 file.
|
||||
def get_iocs(self, ioc_type):
|
||||
for ioc_file in self.ioc_files:
|
||||
for ioc in ioc_file.get(ioc_type, []):
|
||||
yield {
|
||||
"value": ioc,
|
||||
"type": ioc_type,
|
||||
"name": ioc_file["name"]
|
||||
}
|
||||
|
||||
:param file_path: Path to the STIX2 file to parse
|
||||
:type file_path: str
|
||||
|
||||
"""
|
||||
self.log.info("Parsing STIX2 indicators file at path %s", file_path)
|
||||
with open(file_path, "r") as handle:
|
||||
try:
|
||||
data = json.load(handle)
|
||||
except json.decoder.JSONDecodeError:
|
||||
self.log.critical("Unable to parse STIX2 indicator file. The file is malformed or in the wrong format.")
|
||||
return
|
||||
|
||||
for entry in data.get("objects", []):
|
||||
if entry.get("type", "") != "indicator":
|
||||
continue
|
||||
|
||||
key, value = entry.get("pattern", "").strip("[]").split("=")
|
||||
value = value.strip("'")
|
||||
|
||||
if key == "domain-name:value":
|
||||
# We force domain names to lower case.
|
||||
self._add_indicator(ioc=value.lower(),
|
||||
iocs_list=self.ioc_domains)
|
||||
elif key == "process:name":
|
||||
self._add_indicator(ioc=value,
|
||||
iocs_list=self.ioc_processes)
|
||||
elif key == "email-addr:value":
|
||||
# We force email addresses to lower case.
|
||||
self._add_indicator(ioc=value.lower(),
|
||||
iocs_list=self.ioc_emails)
|
||||
elif key == "file:name":
|
||||
self._add_indicator(ioc=value,
|
||||
iocs_list=self.ioc_files)
|
||||
elif key == "app:id":
|
||||
self._add_indicator(ioc=value,
|
||||
iocs_list=self.ioc_app_ids)
|
||||
elif key == "configuration-profile:id":
|
||||
self._add_indicator(ioc=value,
|
||||
iocs_list=self.ios_profile_ids)
|
||||
elif key == "file:hashes.sha256":
|
||||
self._add_indicator(ioc=value,
|
||||
iocs_list=self.ioc_files_sha256)
|
||||
|
||||
def check_domain(self, url) -> bool:
|
||||
def check_domain(self, url):
|
||||
"""Check if a given URL matches any of the provided domain indicators.
|
||||
|
||||
:param url: URL to match against domain indicators
|
||||
:type url: str
|
||||
:returns: True if the URL matched an indicator, otherwise False
|
||||
:rtype: bool
|
||||
:returns: Indicator details if matched, otherwise None
|
||||
|
||||
"""
|
||||
# TODO: If the IOC domain contains a subdomain, it is not currently
|
||||
# being matched.
|
||||
# being matched.
|
||||
if not url:
|
||||
return False
|
||||
return None
|
||||
|
||||
try:
|
||||
# First we use the provided URL.
|
||||
@@ -159,174 +207,206 @@ class Indicators:
|
||||
except Exception:
|
||||
# If URL parsing failed, we just try to do a simple substring
|
||||
# match.
|
||||
for ioc in self.ioc_domains:
|
||||
if ioc.lower() in url:
|
||||
self.log.warning("Maybe found a known suspicious domain: %s", url)
|
||||
return True
|
||||
for ioc in self.get_iocs("domains"):
|
||||
if ioc["value"].lower() in url:
|
||||
self.log.warning("Maybe found a known suspicious domain %s matching indicators from \"%s\"",
|
||||
url, ioc["name"])
|
||||
return ioc
|
||||
|
||||
# If nothing matched, we can quit here.
|
||||
return False
|
||||
return None
|
||||
|
||||
# If all parsing worked, we start walking through available domain indicators.
|
||||
for ioc in self.ioc_domains:
|
||||
for ioc in self.get_iocs("domains"):
|
||||
# First we check the full domain.
|
||||
if final_url.domain.lower() == ioc:
|
||||
if final_url.domain.lower() == ioc["value"]:
|
||||
if orig_url.is_shortened and orig_url.url != final_url.url:
|
||||
self.log.warning("Found a known suspicious domain %s shortened as %s",
|
||||
final_url.url, orig_url.url)
|
||||
self.log.warning("Found a known suspicious domain %s shortened as %s matching indicators from \"%s\"",
|
||||
final_url.url, orig_url.url, ioc["name"])
|
||||
else:
|
||||
self.log.warning("Found a known suspicious domain: %s", final_url.url)
|
||||
self.log.warning("Found a known suspicious domain %s matching indicators from \"%s\"",
|
||||
final_url.url, ioc["name"])
|
||||
|
||||
return True
|
||||
return ioc
|
||||
|
||||
# Then we just check the top level domain.
|
||||
if final_url.top_level.lower() == ioc:
|
||||
if final_url.top_level.lower() == ioc["value"]:
|
||||
if orig_url.is_shortened and orig_url.url != final_url.url:
|
||||
self.log.warning("Found a sub-domain matching a known suspicious top level %s shortened as %s",
|
||||
final_url.url, orig_url.url)
|
||||
self.log.warning("Found a sub-domain with suspicious top level %s shortened as %s matching indicators from \"%s\"",
|
||||
final_url.url, orig_url.url, ioc["name"])
|
||||
else:
|
||||
self.log.warning("Found a sub-domain matching a known suspicious top level: %s", final_url.url)
|
||||
self.log.warning("Found a sub-domain with a suspicious top level %s matching indicators from \"%s\"",
|
||||
final_url.url, ioc["name"])
|
||||
|
||||
return True
|
||||
return ioc
|
||||
|
||||
return False
|
||||
|
||||
def check_domains(self, urls) -> bool:
|
||||
def check_domains(self, urls):
|
||||
"""Check a list of URLs against the provided list of domain indicators.
|
||||
|
||||
:param urls: List of URLs to check against domain indicators
|
||||
:type urls: list
|
||||
:returns: True if any URL matched an indicator, otherwise False
|
||||
:rtype: bool
|
||||
:returns: Indicator details if matched, otherwise None
|
||||
|
||||
"""
|
||||
if not urls:
|
||||
return False
|
||||
return None
|
||||
|
||||
for url in urls:
|
||||
if self.check_domain(url):
|
||||
return True
|
||||
check = self.check_domain(url)
|
||||
if check:
|
||||
return check
|
||||
|
||||
return False
|
||||
|
||||
def check_process(self, process) -> bool:
|
||||
def check_process(self, process):
|
||||
"""Check the provided process name against the list of process
|
||||
indicators.
|
||||
|
||||
:param process: Process name to check against process indicators
|
||||
:type process: str
|
||||
:returns: True if process matched an indicator, otherwise False
|
||||
:rtype: bool
|
||||
:returns: Indicator details if matched, otherwise None
|
||||
|
||||
"""
|
||||
if not process:
|
||||
return False
|
||||
return None
|
||||
|
||||
proc_name = os.path.basename(process)
|
||||
if proc_name in self.ioc_processes:
|
||||
self.log.warning("Found a known suspicious process name \"%s\"", process)
|
||||
return True
|
||||
for ioc in self.get_iocs("processes"):
|
||||
if proc_name == ioc["value"]:
|
||||
self.log.warning("Found a known suspicious process name \"%s\" matching indicators from \"%s\"",
|
||||
process, ioc["name"])
|
||||
return ioc
|
||||
|
||||
if len(proc_name) == 16:
|
||||
for bad_proc in self.ioc_processes:
|
||||
if bad_proc.startswith(proc_name):
|
||||
self.log.warning("Found a truncated known suspicious process name \"%s\"", process)
|
||||
return True
|
||||
if len(proc_name) == 16:
|
||||
if ioc["value"].startswith(proc_name):
|
||||
self.log.warning("Found a truncated known suspicious process name \"%s\" matching indicators from \"%s\"",
|
||||
process, ioc["name"])
|
||||
return ioc
|
||||
|
||||
return False
|
||||
|
||||
def check_processes(self, processes) -> bool:
|
||||
def check_processes(self, processes):
|
||||
"""Check the provided list of processes against the list of
|
||||
process indicators.
|
||||
|
||||
:param processes: List of processes to check against process indicators
|
||||
:type processes: list
|
||||
:returns: True if process matched an indicator, otherwise False
|
||||
:rtype: bool
|
||||
:returns: Indicator details if matched, otherwise None
|
||||
|
||||
"""
|
||||
if not processes:
|
||||
return False
|
||||
return None
|
||||
|
||||
for process in processes:
|
||||
if self.check_process(process):
|
||||
return True
|
||||
check = self.check_process(process)
|
||||
if check:
|
||||
return check
|
||||
|
||||
return False
|
||||
|
||||
def check_email(self, email) -> bool:
|
||||
def check_email(self, email):
|
||||
"""Check the provided email against the list of email indicators.
|
||||
|
||||
:param email: Email address to check against email indicators
|
||||
:type email: str
|
||||
:returns: True if email address matched an indicator, otherwise False
|
||||
:rtype: bool
|
||||
:returns: Indicator details if matched, otherwise None
|
||||
|
||||
"""
|
||||
if not email:
|
||||
return False
|
||||
return None
|
||||
|
||||
if email.lower() in self.ioc_emails:
|
||||
self.log.warning("Found a known suspicious email address: \"%s\"", email)
|
||||
return True
|
||||
for ioc in self.get_iocs("emails"):
|
||||
if email.lower() == ioc["value"].lower():
|
||||
self.log.warning("Found a known suspicious email address \"%s\" matching indicators from \"%s\"",
|
||||
email, ioc["name"])
|
||||
return ioc
|
||||
|
||||
return False
|
||||
def check_file_name(self, file_name):
|
||||
"""Check the provided file name against the list of file indicators.
|
||||
|
||||
def check_file_name(self, file_path) -> bool:
|
||||
"""Check the provided file path against the list of file indicators.
|
||||
:param file_name: File name to check against file
|
||||
indicators
|
||||
:type file_name: str
|
||||
:returns: Indicator details if matched, otherwise None
|
||||
|
||||
"""
|
||||
if not file_name:
|
||||
return None
|
||||
|
||||
for ioc in self.get_iocs("file_names"):
|
||||
if ioc["value"] == file_name:
|
||||
self.log.warning("Found a known suspicious file name \"%s\" matching indicators from \"%s\"",
|
||||
file_name, ioc["name"])
|
||||
return ioc
|
||||
|
||||
def check_file_path(self, file_path):
|
||||
"""Check the provided file path against the list of file indicators (both path and name).
|
||||
|
||||
:param file_path: File path or file name to check against file
|
||||
indicators
|
||||
:type file_path: str
|
||||
:returns: True if the file path matched an indicator, otherwise False
|
||||
:rtype: bool
|
||||
:returns: Indicator details if matched, otherwise None
|
||||
|
||||
"""
|
||||
if not file_path:
|
||||
return False
|
||||
return None
|
||||
|
||||
file_name = os.path.basename(file_path)
|
||||
if file_name in self.ioc_files:
|
||||
return True
|
||||
ioc = self.check_file_name(os.path.basename(file_path))
|
||||
if ioc:
|
||||
return ioc
|
||||
|
||||
return False
|
||||
|
||||
# TODO: The difference between check_file_name() and check_file_path()
|
||||
# needs to be more explicit and clear. Probably, the two should just
|
||||
# be combined into one function.
|
||||
def check_file_path(self, file_path) -> bool:
|
||||
"""Check the provided file path against the list of file indicators.
|
||||
|
||||
:param file_path: File path or file name to check against file
|
||||
indicators
|
||||
:type file_path: str
|
||||
:returns: True if the file path matched an indicator, otherwise False
|
||||
:rtype: bool
|
||||
|
||||
"""
|
||||
if not file_path:
|
||||
return False
|
||||
|
||||
for ioc_file in self.ioc_files:
|
||||
for ioc in self.get_iocs("file_paths"):
|
||||
# Strip any trailing slash from indicator paths to match directories.
|
||||
if file_path.startswith(ioc_file.rstrip("/")):
|
||||
return True
|
||||
if file_path.startswith(ioc["value"].rstrip("/")):
|
||||
self.log.warning("Found a known suspicious file path \"%s\" matching indicators form \"%s\"",
|
||||
file_path, ioc["name"])
|
||||
return ioc
|
||||
|
||||
return False
|
||||
|
||||
def check_profile(self, profile_uuid) -> bool:
|
||||
def check_profile(self, profile_uuid):
|
||||
"""Check the provided configuration profile UUID against the list of indicators.
|
||||
|
||||
:param profile_uuid: Profile UUID to check against configuration profile indicators
|
||||
:type profile_uuid: str
|
||||
:returns: True if the UUID in indicator list, otherwise False
|
||||
:rtype: bool
|
||||
:returns: Indicator details if matched, otherwise None
|
||||
|
||||
"""
|
||||
if profile_uuid in self.ios_profile_ids:
|
||||
return True
|
||||
if not profile_uuid:
|
||||
return None
|
||||
|
||||
return False
|
||||
for ioc in self.get_iocs("ios_profile_ids"):
|
||||
if profile_uuid in ioc["value"]:
|
||||
self.log.warning("Found a known suspicious profile ID \"%s\" matching indicators from \"%s\"",
|
||||
profile_uuid, ioc["name"])
|
||||
return ioc
|
||||
|
||||
def check_file_hash(self, file_hash):
|
||||
"""Check the provided SHA256 file hash against the list of indicators.
|
||||
|
||||
:param file_hash: SHA256 hash to check
|
||||
:type file_hash: str
|
||||
:returns: Indicator details if matched, otherwise None
|
||||
|
||||
"""
|
||||
if not file_hash:
|
||||
return None
|
||||
|
||||
for ioc in self.get_iocs("files_sha256"):
|
||||
if file_hash.lower() == ioc["value"].lower():
|
||||
self.log.warning("Found a known suspicious file with hash \"%s\" matching indicators from \"%s\"",
|
||||
file_hash, ioc["name"])
|
||||
return ioc
|
||||
|
||||
def check_app_id(self, app_id):
|
||||
"""Check the provided app identifier (typically an Android package name)
|
||||
against the list of indicators.
|
||||
|
||||
:param app_id: App ID to check against the list of indicators
|
||||
:type app_id: str
|
||||
:returns: Indicator details if matched, otherwise None
|
||||
|
||||
"""
|
||||
if not app_id:
|
||||
return None
|
||||
|
||||
for ioc in self.get_iocs("app_ids"):
|
||||
if app_id.lower() == ioc["value"].lower():
|
||||
self.log.warning("Found a known suspicious app with ID \"%s\" matching indicators from \"%s\"",
|
||||
app_id, ioc["name"])
|
||||
return ioc
|
||||
|
||||
|
||||
def download_indicators_files(log):
|
||||
@@ -358,4 +438,5 @@ def download_indicators_files(log):
|
||||
# Write file to disk. This will overwrite any older version of the STIX2 file.
|
||||
with io.open(ioc_path, "w") as f:
|
||||
f.write(res.text)
|
||||
|
||||
log.info("Saved indicator file to '%s'", os.path.basename(ioc_path))
|
||||
|
||||
@@ -6,7 +6,7 @@
|
||||
import requests
|
||||
from packaging import version
|
||||
|
||||
MVT_VERSION = "1.4.4"
|
||||
MVT_VERSION = "1.4.8"
|
||||
|
||||
|
||||
def check_for_updates():
|
||||
|
||||
@@ -168,7 +168,7 @@ def check_backup(ctx, iocs, output, fast, backup_path, list_modules, module):
|
||||
m = backup_module(base_folder=backup_path, output_folder=output, fast_mode=fast,
|
||||
log=logging.getLogger(backup_module.__module__))
|
||||
m.is_backup = True
|
||||
if indicators.ioc_count:
|
||||
if indicators.total_ioc_count > 0:
|
||||
m.indicators = indicators
|
||||
m.indicators.log = m.log
|
||||
|
||||
@@ -182,6 +182,10 @@ def check_backup(ctx, iocs, output, fast, backup_path, list_modules, module):
|
||||
if len(timeline_detected) > 0:
|
||||
save_timeline(timeline_detected, os.path.join(output, "timeline_detected.csv"))
|
||||
|
||||
if len(timeline_detected) > 0:
|
||||
log.warning("The analysis of the backup produced %d detections!",
|
||||
len(timeline_detected))
|
||||
|
||||
|
||||
#==============================================================================
|
||||
# Command: check-fs
|
||||
@@ -225,7 +229,7 @@ def check_fs(ctx, iocs, output, fast, dump_path, list_modules, module):
|
||||
log=logging.getLogger(fs_module.__module__))
|
||||
|
||||
m.is_fs_dump = True
|
||||
if indicators.ioc_count:
|
||||
if indicators.total_ioc_count > 0:
|
||||
m.indicators = indicators
|
||||
m.indicators.log = m.log
|
||||
|
||||
@@ -239,20 +243,23 @@ def check_fs(ctx, iocs, output, fast, dump_path, list_modules, module):
|
||||
if len(timeline_detected) > 0:
|
||||
save_timeline(timeline_detected, os.path.join(output, "timeline_detected.csv"))
|
||||
|
||||
if len(timeline_detected) > 0:
|
||||
log.warning("The analysis of the filesystem produced %d detections!",
|
||||
len(timeline_detected))
|
||||
|
||||
#==============================================================================
|
||||
# Command: check-iocs
|
||||
#==============================================================================
|
||||
@cli.command("check-iocs", help="Compare stored JSON results to provided indicators")
|
||||
@click.option("--iocs", "-i", type=click.Path(exists=True), multiple=True,
|
||||
default=[], required=True, help=HELP_MSG_IOC)
|
||||
default=[], help=HELP_MSG_IOC)
|
||||
@click.option("--list-modules", "-l", is_flag=True, help=HELP_MSG_LIST_MODULES)
|
||||
@click.option("--module", "-m", help=HELP_MSG_MODULE)
|
||||
@click.argument("FOLDER", type=click.Path(exists=True))
|
||||
@click.pass_context
|
||||
def check_iocs(ctx, iocs, list_modules, module, folder):
|
||||
all_modules = []
|
||||
for entry in BACKUP_MODULES + FS_MODULES:
|
||||
for entry in BACKUP_MODULES + FS_MODULES + MIXED_MODULES:
|
||||
if entry not in all_modules:
|
||||
all_modules.append(entry)
|
||||
|
||||
@@ -268,6 +275,7 @@ def check_iocs(ctx, iocs, list_modules, module, folder):
|
||||
indicators = Indicators(log=log)
|
||||
indicators.load_indicators_files(iocs)
|
||||
|
||||
total_detections = 0
|
||||
for file_name in os.listdir(folder):
|
||||
name_only, ext = os.path.splitext(file_name)
|
||||
file_path = os.path.join(folder, file_name)
|
||||
@@ -284,7 +292,7 @@ def check_iocs(ctx, iocs, list_modules, module, folder):
|
||||
|
||||
m = iocs_module.from_json(file_path,
|
||||
log=logging.getLogger(iocs_module.__module__))
|
||||
if indicators.ioc_count:
|
||||
if indicators.total_ioc_count > 0:
|
||||
m.indicators = indicators
|
||||
m.indicators.log = m.log
|
||||
|
||||
@@ -292,6 +300,12 @@ def check_iocs(ctx, iocs, list_modules, module, folder):
|
||||
m.check_indicators()
|
||||
except NotImplementedError:
|
||||
continue
|
||||
else:
|
||||
total_detections += len(m.detected)
|
||||
|
||||
if total_detections > 0:
|
||||
log.warning("The check of the results produced %d detections!",
|
||||
total_detections)
|
||||
|
||||
|
||||
#==============================================================================
|
||||
|
||||
@@ -1,4 +1,3 @@
|
||||
|
||||
# Mobile Verification Toolkit (MVT)
|
||||
# Copyright (c) 2021 The MVT Project Authors.
|
||||
# Use of this software is governed by the MVT License 1.1 that can be found at
|
||||
@@ -46,8 +45,10 @@ class ConfigurationProfiles(IOSExtraction):
|
||||
payload_content = result["plist"]["PayloadContent"][0]
|
||||
|
||||
# Alert on any known malicious configuration profiles in the indicator list.
|
||||
if self.indicators.check_profile(result["plist"]["PayloadUUID"]):
|
||||
ioc = self.indicators.check_profile(result["plist"]["PayloadUUID"])
|
||||
if ioc:
|
||||
self.log.warning(f"Found a known malicious configuration profile \"{result['plist']['PayloadDisplayName']}\" with UUID '{result['plist']['PayloadUUID']}'.")
|
||||
result["matched_indicator"] = ioc
|
||||
self.detected.append(result)
|
||||
continue
|
||||
|
||||
@@ -76,6 +77,9 @@ class ConfigurationProfiles(IOSExtraction):
|
||||
|
||||
if "SignerCerts" in conf_plist:
|
||||
conf_plist["SignerCerts"] = [b64encode(x) for x in conf_plist["SignerCerts"]]
|
||||
if "OTAProfileStub" in conf_plist:
|
||||
if "SignerCerts" in conf_plist["OTAProfileStub"]:
|
||||
conf_plist["OTAProfileStub"]["SignerCerts"] = [b64encode(x) for x in conf_plist["OTAProfileStub"]["SignerCerts"]]
|
||||
if "PushTokenDataSentToServerKey" in conf_plist:
|
||||
conf_plist["PushTokenDataSentToServerKey"] = b64encode(conf_plist["PushTokenDataSentToServerKey"])
|
||||
if "LastPushTokenHash" in conf_plist:
|
||||
|
||||
@@ -72,9 +72,7 @@ class Manifest(IOSExtraction):
|
||||
return
|
||||
|
||||
for result in self.results:
|
||||
if "relative_path" not in result:
|
||||
continue
|
||||
if not result["relative_path"]:
|
||||
if not result.get("relative_path"):
|
||||
continue
|
||||
|
||||
if result["domain"]:
|
||||
@@ -83,16 +81,15 @@ class Manifest(IOSExtraction):
|
||||
self.detected.append(result)
|
||||
continue
|
||||
|
||||
if self.indicators.check_file_name(result["relative_path"]):
|
||||
self.log.warning("Found a known malicious file at path: %s", result["relative_path"])
|
||||
if self.indicators.check_file_path("/" + result["relative_path"]):
|
||||
self.detected.append(result)
|
||||
continue
|
||||
|
||||
relPath = result["relative_path"].lower()
|
||||
for ioc in self.indicators.ioc_domains:
|
||||
if ioc.lower() in relPath:
|
||||
rel_path = result["relative_path"].lower()
|
||||
for ioc in self.indicators.get_iocs("domains"):
|
||||
if ioc["value"].lower() in rel_path:
|
||||
self.log.warning("Found mention of domain \"%s\" in a backup file with path: %s",
|
||||
ioc, relPath)
|
||||
ioc["value"], rel_path)
|
||||
self.detected.append(result)
|
||||
|
||||
def run(self):
|
||||
|
||||
@@ -37,20 +37,24 @@ class Analytics(IOSExtraction):
|
||||
return
|
||||
|
||||
for result in self.results:
|
||||
for ioc in self.indicators.ioc_processes:
|
||||
for key in result.keys():
|
||||
if ioc == result[key]:
|
||||
self.log.warning("Found mention of a malicious process \"%s\" in %s file at %s",
|
||||
ioc, result["artifact"], result["timestamp"])
|
||||
self.detected.append(result)
|
||||
break
|
||||
for ioc in self.indicators.ioc_domains:
|
||||
for key in result.keys():
|
||||
if ioc in str(result[key]):
|
||||
self.log.warning("Found mention of a malicious domain \"%s\" in %s file at %s",
|
||||
ioc, result["artifact"], result["timestamp"])
|
||||
self.detected.append(result)
|
||||
break
|
||||
for value in result.values():
|
||||
if not isinstance(value, str):
|
||||
continue
|
||||
|
||||
ioc = self.indicators.check_process(value)
|
||||
if ioc:
|
||||
self.log.warning("Found mention of a malicious process \"%s\" in %s file at %s",
|
||||
value, result["artifact"], result["timestamp"])
|
||||
result["matched_indicator"] = ioc
|
||||
self.detected.append(result)
|
||||
continue
|
||||
|
||||
ioc = self.indicators.check_domain(value)
|
||||
if ioc:
|
||||
self.log.warning("Found mention of a malicious domain \"%s\" in %s file at %s",
|
||||
value, result["artifact"], result["timestamp"])
|
||||
result["matched_indicator"] = ioc
|
||||
self.detected.append(result)
|
||||
|
||||
def _extract_analytics_data(self):
|
||||
artifact = self.file_path.split("/")[-1]
|
||||
@@ -101,6 +105,7 @@ class Analytics(IOSExtraction):
|
||||
timestamp = ""
|
||||
data = plistlib.loads(row[1])
|
||||
data["timestamp"] = timestamp
|
||||
|
||||
data["artifact"] = artifact
|
||||
|
||||
self.results.append(data)
|
||||
|
||||
@@ -34,13 +34,15 @@ class CacheFiles(IOSExtraction):
|
||||
return
|
||||
|
||||
self.detected = {}
|
||||
for key, items in self.results.items():
|
||||
for item in items:
|
||||
if self.indicators.check_domain(item["url"]):
|
||||
for key, values in self.results.items():
|
||||
for value in values:
|
||||
ioc = self.indicators.check_domain(value["url"])
|
||||
if ioc:
|
||||
value["matched_indicator"] = ioc
|
||||
if key not in self.detected:
|
||||
self.detected[key] = [item, ]
|
||||
self.detected[key] = [value, ]
|
||||
else:
|
||||
self.detected[key].append(item)
|
||||
self.detected[key].append(value)
|
||||
|
||||
def _process_cache_file(self, file_path):
|
||||
self.log.info("Processing cache file at path: %s", file_path)
|
||||
|
||||
@@ -37,23 +37,25 @@ class Filesystem(IOSExtraction):
|
||||
return
|
||||
|
||||
for result in self.results:
|
||||
if self.indicators.check_file(result["path"]):
|
||||
self.log.warning("Found a known malicious file name at path: %s", result["path"])
|
||||
if "path" not in result:
|
||||
continue
|
||||
|
||||
ioc = self.indicators.check_file_path(result["path"])
|
||||
if ioc:
|
||||
result["matched_indicator"] = ioc
|
||||
self.detected.append(result)
|
||||
|
||||
if self.indicators.check_file_path(result["path"]):
|
||||
self.log.warning("Found a known malicious file path at path: %s", result["path"])
|
||||
self.detected.append(result)
|
||||
|
||||
# If we are instructed to run fast, we skip this.
|
||||
# If we are instructed to run fast, we skip the rest.
|
||||
if self.fast_mode:
|
||||
self.log.info("Flag --fast was enabled: skipping extended search for suspicious files/processes")
|
||||
else:
|
||||
for ioc in self.indicators.ioc_processes:
|
||||
parts = result["path"].split("/")
|
||||
if ioc in parts:
|
||||
self.log.warning("Found a known malicious file/process at path: %s", result["path"])
|
||||
self.detected.append(result)
|
||||
continue
|
||||
|
||||
for ioc in self.indicators.get_iocs("processes"):
|
||||
parts = result["path"].split("/")
|
||||
if ioc["value"] in parts:
|
||||
self.log.warning("Found known suspicious process name mentioned in file at path \"%s\" matching indicators from \"%s\"",
|
||||
result["path"], ioc["name"])
|
||||
result["matched_indicator"] = ioc
|
||||
self.detected.append(result)
|
||||
|
||||
def run(self):
|
||||
for root, dirs, files in os.walk(self.base_folder):
|
||||
|
||||
@@ -37,7 +37,12 @@ class SafariFavicon(IOSExtraction):
|
||||
return
|
||||
|
||||
for result in self.results:
|
||||
if self.indicators.check_domain(result["url"]) or self.indicators.check_domain(result["icon_url"]):
|
||||
ioc = self.indicators.check_domain(result["url"])
|
||||
if not ioc:
|
||||
ioc = self.indicators.check_domain(result["icon_url"])
|
||||
|
||||
if ioc:
|
||||
result["matched_indicator"] = ioc
|
||||
self.detected.append(result)
|
||||
|
||||
def _process_favicon_db(self, file_path):
|
||||
|
||||
@@ -34,12 +34,20 @@ class ShutdownLog(IOSExtraction):
|
||||
return
|
||||
|
||||
for result in self.results:
|
||||
for ioc in self.indicators.ioc_processes:
|
||||
ioc = self.indicators.check_file_path(result["client"])
|
||||
if ioc:
|
||||
result["matched_indicator"] = ioc
|
||||
self.detected.append(result)
|
||||
continue
|
||||
|
||||
for ioc in self.indicators.get_iocs("processes"):
|
||||
parts = result["client"].split("/")
|
||||
if ioc in parts:
|
||||
self.log.warning("Found mention of a known malicious process \"%s\" in shutdown.log",
|
||||
ioc)
|
||||
result["matched_indicator"] = ioc
|
||||
self.detected.append(result)
|
||||
continue
|
||||
|
||||
def process_shutdownlog(self, content):
|
||||
current_processes = []
|
||||
|
||||
@@ -18,9 +18,11 @@ class WebkitBase(IOSExtraction):
|
||||
if not self.indicators:
|
||||
return
|
||||
|
||||
for item in self.results:
|
||||
if self.indicators.check_domain(item["url"]):
|
||||
self.detected.append(item)
|
||||
for result in self.results:
|
||||
ioc = self.indicators.check_domain(result["url"])
|
||||
if ioc:
|
||||
result["matched_indicator"] = ioc
|
||||
self.detected.append(result)
|
||||
|
||||
def _process_webkit_folder(self, root_paths):
|
||||
for found_path in self._get_fs_files_from_patterns(root_paths):
|
||||
|
||||
@@ -42,7 +42,12 @@ class ChromeFavicon(IOSExtraction):
|
||||
return
|
||||
|
||||
for result in self.results:
|
||||
if self.indicators.check_domain(result["url"]) or self.indicators.check_domain(result["icon_url"]):
|
||||
ioc = self.indicators.check_domain(result["url"])
|
||||
if not ioc:
|
||||
ioc = self.indicators.check_domain(result["icon_url"])
|
||||
|
||||
if ioc:
|
||||
result["matched_indicator"] = ioc
|
||||
self.detected.append(result)
|
||||
|
||||
def run(self):
|
||||
|
||||
@@ -41,7 +41,9 @@ class ChromeHistory(IOSExtraction):
|
||||
return
|
||||
|
||||
for result in self.results:
|
||||
if self.indicators.check_domain(result["url"]):
|
||||
ioc = self.indicators.check_domain(result["url"])
|
||||
if ioc:
|
||||
result["matched_indicator"] = ioc
|
||||
self.detected.append(result)
|
||||
|
||||
def run(self):
|
||||
|
||||
@@ -40,8 +40,12 @@ class FirefoxFavicon(IOSExtraction):
|
||||
return
|
||||
|
||||
for result in self.results:
|
||||
if (self.indicators.check_domain(result.get("url", "")) or
|
||||
self.indicators.check_domain(result.get("history_url", ""))):
|
||||
ioc = self.indicators.check_domain(result.get("url", ""))
|
||||
if not ioc:
|
||||
ioc = self.indicators.check_domain(result.get("history_url", ""))
|
||||
|
||||
if ioc:
|
||||
result["matched_indicator"] = ioc
|
||||
self.detected.append(result)
|
||||
|
||||
def run(self):
|
||||
|
||||
@@ -44,7 +44,9 @@ class FirefoxHistory(IOSExtraction):
|
||||
return
|
||||
|
||||
for result in self.results:
|
||||
if self.indicators.check_domain(result["url"]):
|
||||
ioc = self.indicators.check_domain(result["url"])
|
||||
if ioc:
|
||||
result["matched_indicator"] = ioc
|
||||
self.detected.append(result)
|
||||
|
||||
def run(self):
|
||||
|
||||
@@ -43,7 +43,9 @@ class IDStatusCache(IOSExtraction):
|
||||
for result in self.results:
|
||||
if result.get("user", "").startswith("mailto:"):
|
||||
email = result["user"][7:].strip("'")
|
||||
if self.indicators.check_email(email):
|
||||
ioc = self.indicators.check_email(email)
|
||||
if ioc:
|
||||
result["matched_indicator"] = ioc
|
||||
self.detected.append(result)
|
||||
continue
|
||||
|
||||
|
||||
@@ -41,13 +41,13 @@ class LocationdClients(IOSExtraction):
|
||||
|
||||
def serialize(self, record):
|
||||
records = []
|
||||
for ts in self.timestamps:
|
||||
if ts in record.keys():
|
||||
for timestamp in self.timestamps:
|
||||
if timestamp in record.keys():
|
||||
records.append({
|
||||
"timestamp": record[ts],
|
||||
"timestamp": record[timestamp],
|
||||
"module": self.__class__.__name__,
|
||||
"event": ts,
|
||||
"data": f"{ts} from {record['package']}"
|
||||
"event": timestamp,
|
||||
"data": f"{timestamp} from {record['package']}"
|
||||
})
|
||||
|
||||
return records
|
||||
@@ -60,8 +60,40 @@ class LocationdClients(IOSExtraction):
|
||||
parts = result["package"].split("/")
|
||||
proc_name = parts[len(parts)-1]
|
||||
|
||||
if self.indicators.check_process(proc_name):
|
||||
ioc = self.indicators.check_process(proc_name)
|
||||
if ioc:
|
||||
self.log.warning("Found a suspicious process name in LocationD entry %s",
|
||||
result["package"])
|
||||
result["matched_indicator"] = ioc
|
||||
self.detected.append(result)
|
||||
continue
|
||||
|
||||
if "BundlePath" in result:
|
||||
ioc = self.indicators.check_file_path(result["BundlePath"])
|
||||
if ioc:
|
||||
self.log.warning("Found a suspicious file path in Location D: %s",
|
||||
result["BundlePath"])
|
||||
result["matched_indicator"] = ioc
|
||||
self.detected.append(result)
|
||||
continue
|
||||
|
||||
if "Executable" in result:
|
||||
ioc = self.indicators.check_file_path(result["Executable"])
|
||||
if ioc:
|
||||
self.log.warning("Found a suspicious file path in Location D: %s",
|
||||
result["Executable"])
|
||||
result["matched_indicator"] = ioc
|
||||
self.detected.append(result)
|
||||
continue
|
||||
|
||||
if "Registered" in result:
|
||||
ioc = self.indicators.check_file_path(result["Registered"])
|
||||
if ioc:
|
||||
self.log.warning("Found a suspicious file path in Location D: %s",
|
||||
result["Registered"])
|
||||
result["matched_indicator"] = ioc
|
||||
self.detected.append(result)
|
||||
continue
|
||||
|
||||
def _extract_locationd_entries(self, file_path):
|
||||
with open(file_path, "rb") as handle:
|
||||
|
||||
@@ -41,7 +41,9 @@ class OSAnalyticsADDaily(IOSExtraction):
|
||||
return
|
||||
|
||||
for result in self.results:
|
||||
if self.indicators.check_process(result["package"]):
|
||||
ioc = self.indicators.check_process(result["package"])
|
||||
if ioc:
|
||||
result["matched_indicator"] = ioc
|
||||
self.detected.append(result)
|
||||
|
||||
def run(self):
|
||||
|
||||
@@ -44,18 +44,25 @@ class SafariBrowserState(IOSExtraction):
|
||||
return
|
||||
|
||||
for result in self.results:
|
||||
if "tab_url" in result and self.indicators.check_domain(result["tab_url"]):
|
||||
self.detected.append(result)
|
||||
continue
|
||||
if "tab_url" in result:
|
||||
ioc = self.indicators.check_domain(result["tab_url"])
|
||||
if ioc:
|
||||
result["matched_indicator"] = ioc
|
||||
self.detected.append(result)
|
||||
continue
|
||||
|
||||
if "session_data" not in result:
|
||||
continue
|
||||
|
||||
for session_entry in result["session_data"]:
|
||||
if "entry_url" in session_entry and self.indicators.check_domain(session_entry["entry_url"]):
|
||||
self.detected.append(result)
|
||||
if "entry_url" in session_entry:
|
||||
ioc = self.indicators.check_domain(session_entry["entry_url"])
|
||||
if ioc:
|
||||
result["matched_indicator"] = ioc
|
||||
self.detected.append(result)
|
||||
|
||||
def _process_browser_state_db(self, db_path):
|
||||
self._recover_sqlite_db_if_needed(db_path)
|
||||
conn = sqlite3.connect(db_path)
|
||||
|
||||
cur = conn.cursor()
|
||||
@@ -86,8 +93,12 @@ class SafariBrowserState(IOSExtraction):
|
||||
if row[4]:
|
||||
# Skip a 4 byte header before the plist content.
|
||||
session_plist = row[4][4:]
|
||||
session_data = plistlib.load(io.BytesIO(session_plist))
|
||||
session_data = keys_bytes_to_string(session_data)
|
||||
session_data = {}
|
||||
try:
|
||||
session_data = plistlib.load(io.BytesIO(session_plist))
|
||||
session_data = keys_bytes_to_string(session_data)
|
||||
except plistlib.InvalidFileException:
|
||||
pass
|
||||
|
||||
if "SessionHistoryEntries" in session_data.get("SessionHistory", {}):
|
||||
for session_entry in session_data["SessionHistory"].get("SessionHistoryEntries"):
|
||||
@@ -108,7 +119,6 @@ class SafariBrowserState(IOSExtraction):
|
||||
})
|
||||
|
||||
def run(self):
|
||||
|
||||
if self.is_backup:
|
||||
for backup_file in self._get_backup_files_from_manifest(relative_path=SAFARI_BROWSER_STATE_BACKUP_RELPATH):
|
||||
self.file_path = self._get_backup_file_from_id(backup_file["file_id"])
|
||||
|
||||
@@ -80,7 +80,9 @@ class SafariHistory(IOSExtraction):
|
||||
return
|
||||
|
||||
for result in self.results:
|
||||
if self.indicators.check_domain(result["url"]):
|
||||
ioc = self.indicators.check_domain(result["url"])
|
||||
if ioc:
|
||||
result["matched_indicator"] = ioc
|
||||
self.detected.append(result)
|
||||
|
||||
def _process_history_db(self, history_path):
|
||||
|
||||
@@ -54,9 +54,11 @@ class Shortcuts(IOSExtraction):
|
||||
if not self.indicators:
|
||||
return
|
||||
|
||||
for action in self.results:
|
||||
if self.indicators.check_domains(action["action_urls"]):
|
||||
self.detected.append(action)
|
||||
for result in self.results:
|
||||
ioc = self.indicators.check_domains(result["action_urls"])
|
||||
if ioc:
|
||||
result["matched_indicator"] = ioc
|
||||
self.detected.append(result)
|
||||
|
||||
def run(self):
|
||||
self._find_ios_database(backup_ids=SHORTCUT_BACKUP_IDS,
|
||||
|
||||
@@ -41,10 +41,12 @@ class SMS(IOSExtraction):
|
||||
if not self.indicators:
|
||||
return
|
||||
|
||||
for message in self.results:
|
||||
message_links = check_for_links(message.get("text", ""))
|
||||
if self.indicators.check_domains(message_links):
|
||||
self.detected.append(message)
|
||||
for result in self.results:
|
||||
message_links = check_for_links(result.get("text", ""))
|
||||
ioc = self.indicators.check_domains(message_links)
|
||||
if ioc:
|
||||
result["matched_indicator"] = ioc
|
||||
self.detected.append(result)
|
||||
|
||||
def run(self):
|
||||
self._find_ios_database(backup_ids=SMS_BACKUP_IDS,
|
||||
|
||||
@@ -66,6 +66,16 @@ class TCC(IOSExtraction):
|
||||
"data": msg
|
||||
}
|
||||
|
||||
def check_indicators(self):
|
||||
if not self.indicators:
|
||||
return
|
||||
|
||||
for result in self.results:
|
||||
ioc = self.indicators.check_process(result["client"])
|
||||
if ioc:
|
||||
result["matched_indicator"] = ioc
|
||||
self.detected.append(result)
|
||||
|
||||
def process_db(self, file_path):
|
||||
conn = sqlite3.connect(file_path)
|
||||
cur = conn.cursor()
|
||||
|
||||
@@ -28,7 +28,7 @@ class WebkitResourceLoadStatistics(IOSExtraction):
|
||||
output_folder=output_folder, fast_mode=fast_mode,
|
||||
log=log, results=results)
|
||||
|
||||
self.results = {}
|
||||
self.results = {} if not results else results
|
||||
|
||||
def check_indicators(self):
|
||||
if not self.indicators:
|
||||
@@ -37,7 +37,9 @@ class WebkitResourceLoadStatistics(IOSExtraction):
|
||||
self.detected = {}
|
||||
for key, items in self.results.items():
|
||||
for item in items:
|
||||
if self.indicators.check_domain(item["registrable_domain"]):
|
||||
ioc = self.indicators.check_domain(item["registrable_domain"])
|
||||
if ioc:
|
||||
item["matched_indicator"] = ioc
|
||||
if key not in self.detected:
|
||||
self.detected[key] = [item, ]
|
||||
else:
|
||||
|
||||
@@ -35,7 +35,7 @@ class WebkitSessionResourceLog(IOSExtraction):
|
||||
output_folder=output_folder, fast_mode=fast_mode,
|
||||
log=log, results=results)
|
||||
|
||||
self.results = {}
|
||||
self.results = {} if not results else results
|
||||
|
||||
@staticmethod
|
||||
def _extract_domains(entries):
|
||||
@@ -66,7 +66,9 @@ class WebkitSessionResourceLog(IOSExtraction):
|
||||
|
||||
all_origins = set([entry["origin"]] + source_domains + destination_domains)
|
||||
|
||||
if self.indicators.check_domains(all_origins):
|
||||
ioc = self.indicators.check_domains(all_origins)
|
||||
if ioc:
|
||||
entry["matched_indicator"] = ioc
|
||||
self.detected.append(entry)
|
||||
|
||||
redirect_path = ""
|
||||
|
||||
@@ -35,6 +35,7 @@ class Whatsapp(IOSExtraction):
|
||||
links_text = ""
|
||||
if record["links"]:
|
||||
links_text = " - Embedded links: " + ", ".join(record["links"])
|
||||
|
||||
return {
|
||||
"timestamp": record.get("isodate"),
|
||||
"module": self.__class__.__name__,
|
||||
@@ -46,9 +47,11 @@ class Whatsapp(IOSExtraction):
|
||||
if not self.indicators:
|
||||
return
|
||||
|
||||
for message in self.results:
|
||||
if self.indicators.check_domains(message["links"]):
|
||||
self.detected.append(message)
|
||||
for result in self.results:
|
||||
ioc = self.indicators.check_domains(result.get("links", []))
|
||||
if ioc:
|
||||
result["matched_indicator"] = ioc
|
||||
self.detected.append(result)
|
||||
|
||||
def run(self):
|
||||
self._find_ios_database(backup_ids=WHATSAPP_BACKUP_IDS,
|
||||
@@ -83,14 +86,15 @@ class Whatsapp(IOSExtraction):
|
||||
message["isodate"] = convert_timestamp_to_iso(convert_mactime_to_unix(message.get("ZMESSAGEDATE")))
|
||||
message["ZTEXT"] = message["ZTEXT"] if message["ZTEXT"] else ""
|
||||
|
||||
# Extract links from the WhatsApp message. URLs can be stored in multiple fields/columns. Check each of them!
|
||||
# Extract links from the WhatsApp message. URLs can be stored in multiple fields/columns.
|
||||
# Check each of them!
|
||||
message_links = []
|
||||
fields_with_links = ["ZTEXT", "ZMATCHEDTEXT", "ZMEDIAURL", "ZCONTENT1", "ZCONTENT2"]
|
||||
for field in fields_with_links:
|
||||
if message.get(field):
|
||||
message_links.extend(check_for_links(message.get(field, "")))
|
||||
|
||||
# Remove WhatsApp internal media URLs
|
||||
# Remove WhatsApp internal media URLs.
|
||||
filtered_links = []
|
||||
for link in message_links:
|
||||
if not (link.startswith("https://mmg-fna.whatsapp.net/") or link.startswith("https://mmg.whatsapp.net/")):
|
||||
|
||||
@@ -237,5 +237,7 @@ class NetBase(IOSExtraction):
|
||||
if not result["proc_id"]:
|
||||
continue
|
||||
|
||||
if self.indicators.check_process(proc_name):
|
||||
ioc = self.indicators.check_process(proc_name)
|
||||
if ioc:
|
||||
result["matched_indicator"] = ioc
|
||||
self.detected.append(result)
|
||||
|
||||
@@ -236,6 +236,7 @@ IPHONE_IOS_VERSIONS = [
|
||||
{"build": "19B81", "version": "15.1.1"},
|
||||
{"build": "19C56", "version": "15.2"},
|
||||
{"build": "19C63", "version": "15.2.1"},
|
||||
{"build": "19D50", "version": "15.3"},
|
||||
]
|
||||
|
||||
|
||||
|
||||
Binary file not shown.
Binary file not shown.
@@ -13,11 +13,11 @@ class TestIndicators:
|
||||
def test_parse_stix2(self, indicator_file):
|
||||
ind = Indicators(log=logging)
|
||||
ind.load_indicators_files([indicator_file], load_default=False)
|
||||
assert ind.ioc_count == 4
|
||||
assert len(ind.ioc_domains) == 1
|
||||
assert len(ind.ioc_emails) == 1
|
||||
assert len(ind.ioc_files) == 1
|
||||
assert len(ind.ioc_processes) == 1
|
||||
assert ind.ioc_files[0]["count"] == 4
|
||||
assert len(ind.ioc_files[0]["domains"]) == 1
|
||||
assert len(ind.ioc_files[0]["emails"]) == 1
|
||||
assert len(ind.ioc_files[0]["file_names"]) == 1
|
||||
assert len(ind.ioc_files[0]["processes"]) == 1
|
||||
|
||||
def test_check_domain(self, indicator_file):
|
||||
ind = Indicators(log=logging)
|
||||
@@ -28,5 +28,5 @@ class TestIndicators:
|
||||
def test_env_stix(self, indicator_file):
|
||||
os.environ["MVT_STIX2"] = indicator_file
|
||||
ind = Indicators(log=logging)
|
||||
ind.load_indicators_files([indicator_file], load_default=False)
|
||||
assert ind.ioc_count == 4
|
||||
ind.load_indicators_files([], load_default=False)
|
||||
assert ind.total_ioc_count == 4
|
||||
|
||||
@@ -25,7 +25,7 @@ class TestDatausageModule:
|
||||
ind = Indicators(log=logging)
|
||||
ind.parse_stix2(indicator_file)
|
||||
# Adds a file that exists in the manifest.
|
||||
ind.ioc_processes[0] = "CumulativeUsageTracker"
|
||||
ind.ioc_files[0]["processes"].append("CumulativeUsageTracker")
|
||||
m.indicators = ind
|
||||
run_module(m)
|
||||
assert len(m.detected) == 2
|
||||
|
||||
@@ -24,8 +24,7 @@ class TestManifestModule:
|
||||
m = Manifest(base_folder=get_backup_folder(), log=logging, results=[])
|
||||
ind = Indicators(log=logging)
|
||||
ind.parse_stix2(indicator_file)
|
||||
# Adds a file that exists in the manifest
|
||||
ind.ioc_files[0] = "com.apple.CoreBrightness.plist"
|
||||
ind.ioc_files[0]["file_names"].append("com.apple.CoreBrightness.plist")
|
||||
m.indicators = ind
|
||||
run_module(m)
|
||||
assert len(m.detected) == 1
|
||||
|
||||
36
tests/ios/test_safari_browserstate.py
Normal file
36
tests/ios/test_safari_browserstate.py
Normal file
@@ -0,0 +1,36 @@
|
||||
# Mobile Verification Toolkit (MVT)
|
||||
# Copyright (c) 2021 The MVT Project Authors.
|
||||
# Use of this software is governed by the MVT License 1.1 that can be found at
|
||||
# https://license.mvt.re/1.1/
|
||||
|
||||
import logging
|
||||
|
||||
from mvt.common.indicators import Indicators
|
||||
from mvt.common.module import run_module
|
||||
from mvt.ios.modules.mixed.safari_browserstate import SafariBrowserState
|
||||
|
||||
from ..utils import get_backup_folder
|
||||
|
||||
|
||||
class TestSafariBrowserStateModule:
|
||||
def test_parsing(self):
|
||||
m = SafariBrowserState(base_folder=get_backup_folder(), log=logging, results=[])
|
||||
m.is_backup = True
|
||||
run_module(m)
|
||||
assert m.file_path != None
|
||||
assert len(m.results) == 1
|
||||
assert len(m.timeline) == 1
|
||||
assert len(m.detected) == 0
|
||||
|
||||
def test_detection(self, indicator_file):
|
||||
m = SafariBrowserState(base_folder=get_backup_folder(), log=logging, results=[])
|
||||
m.is_backup = True
|
||||
ind = Indicators(log=logging)
|
||||
ind.parse_stix2(indicator_file)
|
||||
# Adds a file that exists in the manifest.
|
||||
ind.ioc_files[0]["domains"].append("en.wikipedia.org")
|
||||
m.indicators = ind
|
||||
run_module(m)
|
||||
assert len(m.detected) == 1
|
||||
assert len(m.results) == 1
|
||||
assert m.results[0]["tab_url"] == "https://en.wikipedia.org/wiki/NSO_Group"
|
||||
@@ -5,27 +5,32 @@
|
||||
|
||||
import logging
|
||||
|
||||
from mvt.common.indicators import Indicators
|
||||
from mvt.common.module import run_module
|
||||
from mvt.ios.modules.mixed.tcc import TCC
|
||||
|
||||
from ..utils import get_backup_folder
|
||||
|
||||
|
||||
class TestManifestModule:
|
||||
def test_manifest(self):
|
||||
class TestTCCtModule:
|
||||
def test_tcc(self):
|
||||
m = TCC(base_folder=get_backup_folder(), log=logging, results=[])
|
||||
run_module(m)
|
||||
assert len(m.results) == 11
|
||||
assert len(m.timeline) == 11
|
||||
assert len(m.detected) == 0
|
||||
assert m.results[0]["service"] == "kTCCServiceUbiquity"
|
||||
assert m.results[0]["client"] == "com.apple.Preferences"
|
||||
assert m.results[0]["auth_value"] == "allowed"
|
||||
|
||||
def test_manifest_2(self):
|
||||
def test_tcc_detection(self, indicator_file):
|
||||
m = TCC(base_folder=get_backup_folder(), log=logging, results=[])
|
||||
ind = Indicators(log=logging)
|
||||
ind.parse_stix2(indicator_file)
|
||||
m.indicators = ind
|
||||
run_module(m)
|
||||
assert len(m.results) == 11
|
||||
assert len(m.timeline) == 11
|
||||
assert len(m.detected) == 0
|
||||
assert m.results[0]["service"] == "kTCCServiceUbiquity"
|
||||
assert m.results[0]["auth_value"] == "allowed"
|
||||
assert len(m.detected) == 1
|
||||
assert m.detected[0]["service"] == "kTCCServiceLiverpool"
|
||||
assert m.detected[0]["client"] == "Launch"
|
||||
|
||||
Reference in New Issue
Block a user