Compare commits

..

21 Commits

Author SHA1 Message Date
Nex
71e270fdf8 Bumped version 2021-08-16 14:56:46 +02:00
Nex
8125f1ba14 Updated docs with new modules 2021-08-16 11:12:57 +02:00
Nex
96e4a9a4a4 Overhaul of mvt-ios modules 2021-08-16 10:50:35 +02:00
Nex
24d7187303 Fixed variable name 2021-08-15 20:02:17 +02:00
Nex
6af6c52f60 Renamed function for consistency 2021-08-15 20:01:33 +02:00
Nex
fdaf2fc760 Fixed WebkitSessionResourceLog module, still needs testing 2021-08-15 20:00:29 +02:00
Nex
fda621672d Renamed webkit helper function 2021-08-15 19:50:55 +02:00
Nex
ce6cc771b4 Replaced leftover dicts 2021-08-15 19:20:41 +02:00
Nex
e1e4476bee Standardizing Manifest results structure 2021-08-15 19:07:45 +02:00
Nex
9582778adf Getting rid of dict() 2021-08-15 19:05:15 +02:00
Nex
5e6e4fa8d0 Added modules to extract details on configuration profiles from backup 2021-08-15 18:53:02 +02:00
Nex
9e5a412fe2 Creating helper function to locate files in Manifest.db 2021-08-15 17:39:14 +02:00
Nex
763cb6e06c DeviceInfo module is now BackupInfo and only running on backups 2021-08-15 13:16:00 +02:00
Nex
cbdbf41e1e Restructured modules folders 2021-08-15 13:14:18 +02:00
Nex
cf630f7c2b Fixed unused imports 2021-08-14 18:56:33 +02:00
Nex
3d6e01179a Fixed typo 2021-08-14 18:52:00 +02:00
Nex
8260bda308 Got rid of biplist, using standard plistlib 2021-08-14 18:50:11 +02:00
Nex
30e00e0707 Added module to extract information on device 2021-08-14 18:39:46 +02:00
Nex
88e2576334 Copying plist files too when decrypting a backup 2021-08-14 18:25:41 +02:00
Nex
076930c2c9 Added newline 2021-08-14 18:06:30 +02:00
Nex
8a91e64bb9 Catching gracefully if indicators file parse fails 2021-08-12 20:17:37 +02:00
50 changed files with 792 additions and 534 deletions

View File

@@ -4,6 +4,16 @@ In this page you can find a (reasonably) up-to-date breakdown of the files creat
## Records extracted by `check-fs` or `check-backup`
### `backup_info.json`
!!! info "Availabiliy"
Backup: :material-check:
Full filesystem dump: :material-close:
This JSON file is created by mvt-ios' `BackupInfo` module. The module extracts some details about the backup and the device, such as name, phone number, IMEI, product type and version.
---
### `cache_files.json`
!!! info "Availability"
@@ -50,6 +60,16 @@ If indicators a provided through the command-line, they are checked against the
---
### `configuration_profiles.json`
!!! info "Availability"
Backup: :material-check:
Full filesystem dump: :material-close:
This JSON file is created by mvt-ios' `ConfigurationProfiles` module. The module extracts details about iOS configuration profiles that have been installed on the device. These should include both default iOS as well as third-party profiles.
---
### `contacts.json`
!!! info "Availability"
@@ -150,6 +170,16 @@ If indicators are provided through the command-line, they are checked against th
---
### `profile_events.json`
!!! info "Availability"
Backup: :material-check:
Full filesystem dump: :material-close:
This JSON file is created by mvt-ios' `ProfileEvents` module. The module extracts a timeline of configuration profile operations. For example, it should indicate when a new profile was installed from the Settings app, or when one was removed.
---
### `safari_browser_state.json`
!!! info "Availability"
@@ -242,6 +272,18 @@ If indicators are provided through the command-line, they are checked against th
---
### `webkit_resource_load_statistics.json`
!!! info "Availability"
Backup: :material-check:
Full filesystem dump: :material-check:
This JSON file is created by mvt-ios `WebkitResourceLoadStatistics` module. The module extracts records from available WebKit ResourceLoadStatistics *observations.db* SQLite3 databases. These records should indicate domain names contacted by apps, including a timestamp.
If indicators are provided through the command-line, they are checked against the extracted domain names. Any matches are stored in *webkit_resource_load_statistics_detected.json*.
---
### `webkit_safari_view_service.json`
!!! info "Availability"

View File

@@ -3,7 +3,6 @@
# Use of this software is governed by the MVT License 1.1 that can be found at
# https://license.mvt.re/1.1/
import argparse
import logging
import os
import sys

View File

@@ -52,14 +52,14 @@ class ChromeHistory(AndroidExtraction):
""")
for item in cur:
self.results.append(dict(
id=item[0],
url=item[1],
visit_id=item[2],
timestamp=item[3],
isodate=convert_timestamp_to_iso(convert_chrometime_to_unix(item[3])),
redirect_source=item[4],
))
self.results.append({
"id": item[0],
"url": item[1],
"visit_id": item[2],
"timestamp": item[3],
"isodate": convert_timestamp_to_iso(convert_chrometime_to_unix[item[3]]),
"redirect_source": item[4],
})
cur.close()
conn.close()

View File

@@ -76,18 +76,18 @@ class Packages(AndroidExtraction):
first_install = dumpsys[1].split("=")[1].strip()
last_update = dumpsys[2].split("=")[1].strip()
self.results.append(dict(
package_name=package_name,
file_name=file_name,
installer=installer,
timestamp=timestamp,
first_install_time=first_install,
last_update_time=last_update,
uid=uid,
disabled=False,
system=False,
third_party=False,
))
self.results.append({
"package_name": package_name,
"file_name": file_name,
"installer": installer,
"timestamp": timestamp,
"first_install_time": first_install,
"last_update_time": last_update,
"uid": uid,
"disabled": False,
"system": False,
"third_party": False,
})
cmds = [
{"field": "disabled", "arg": "-d"},

View File

@@ -29,13 +29,13 @@ class Processes(AndroidExtraction):
continue
fields = line.split()
proc = dict(
user=fields[0],
pid=fields[1],
parent_pid=fields[2],
vsize=fields[3],
rss=fields[4],
)
proc = {
"user": fields[0],
"pid": fields[1],
"parent_pid": fields[2],
"vsize": fields[3],
"rss": fields[4],
}
# Sometimes WCHAN is empty, so we need to re-align output fields.
if len(fields) == 8:

View File

@@ -84,7 +84,7 @@ class SMS(AndroidExtraction):
names = [description[0] for description in cur.description]
for item in cur:
message = dict()
message = {}
for index, value in enumerate(item):
message[names[index]] = value

View File

@@ -59,7 +59,7 @@ class Whatsapp(AndroidExtraction):
messages = []
for item in cur:
message = dict()
message = {}
for index, value in enumerate(item):
message[names[index]] = value

View File

@@ -8,7 +8,7 @@ import os
import zlib
from mvt.common.module import MVTModule
from mvt.common.utils import check_for_links, convert_timestamp_to_iso
from mvt.common.utils import check_for_links
class SMS(MVTModule):

View File

@@ -9,6 +9,9 @@ import os
from .url import URL
class IndicatorsFileBadFormat(Exception):
pass
class Indicators:
"""This class is used to parse indicators from a STIX2 file and provide
functions to compare extracted artifacts to the indicators.
@@ -17,7 +20,10 @@ class Indicators:
def __init__(self, file_path, log=None):
self.file_path = file_path
with open(self.file_path, "r") as handle:
self.data = json.load(handle)
try:
self.data = json.load(handle)
except json.decoder.JSONDecodeError:
raise IndicatorsFileBadFormat("Unable to parse STIX2 indicators file, the file seems malformed or in the wrong format")
self.log = log
self.ioc_domains = []
@@ -109,10 +115,10 @@ class Indicators:
# Then we just check the top level domain.
if final_url.top_level.lower() == ioc:
if orig_url.is_shortened and orig_url.url != final_url.url:
self.log.warning("Found a sub-domain matching a suspicious top level %s shortened as %s",
self.log.warning("Found a sub-domain matching a known suspicious top level %s shortened as %s",
final_url.url, orig_url.url)
else:
self.log.warning("Found a sub-domain matching a suspicious top level: %s", final_url.url)
self.log.warning("Found a sub-domain matching a known suspicious top level: %s", final_url.url)
return True

View File

@@ -6,7 +6,6 @@
import csv
import glob
import io
import logging
import os
import re
@@ -67,13 +66,6 @@ class MVTModule(object):
sub = re.sub("(.)([A-Z][a-z]+)", r"\1_\2", self.__class__.__name__)
return re.sub("([a-z0-9])([A-Z])", r"\1_\2", sub).lower()
def _find_paths(self, root_paths):
for root_path in root_paths:
for found_path in glob.glob(os.path.join(self.base_folder, root_path)):
if not os.path.exists(found_path):
continue
yield found_path
def load_indicators(self, file_path):
self.indicators = Indicators(file_path, self.log)
@@ -93,9 +85,9 @@ class MVTModule(object):
if self.results:
results_file_name = f"{name}.json"
results_json_path = os.path.join(self.output_folder, results_file_name)
with open(results_json_path, "w") as handle:
with io.open(results_json_path, "w", encoding="utf-8") as handle:
try:
json.dump(self.results, handle, indent=4)
json.dump(self.results, handle, indent=4, default=str)
except Exception as e:
self.log.error("Unable to store results of module %s to file %s: %s",
self.__class__.__name__, results_file_name, e)
@@ -103,8 +95,8 @@ class MVTModule(object):
if self.detected:
detected_file_name = f"{name}_detected.json"
detected_json_path = os.path.join(self.output_folder, detected_file_name)
with open(detected_json_path, "w") as handle:
json.dump(self.detected, handle, indent=4)
with io.open(detected_json_path, "w", encoding="utf-8") as handle:
json.dump(self.detected, handle, indent=4, default=str)
def serialize(self, record):
raise NotImplementedError
@@ -194,8 +186,8 @@ def save_timeline(timeline, timeline_path):
csvoutput.writerow(["UTC Timestamp", "Plugin", "Event", "Description"])
for event in sorted(timeline, key=lambda x: x["timestamp"] if x["timestamp"] is not None else ""):
csvoutput.writerow([
event["timestamp"],
event["module"],
event["event"],
event["data"],
event.get("timestamp"),
event.get("module"),
event.get("event"),
event.get("data"),
])

View File

@@ -5,7 +5,7 @@
# From: https://gist.github.com/stanchan/bce1c2d030c76fe9223b5ff6ad0f03db
from click import Option, UsageError, command, option
from click import Option, UsageError
class MutuallyExclusiveOption(Option):

View File

@@ -5,7 +5,6 @@
import datetime
import hashlib
import os
import re

View File

@@ -5,18 +5,19 @@
import logging
import os
import tarfile
import click
from rich.logging import RichHandler
from rich.prompt import Prompt
from mvt.common.indicators import Indicators
from mvt.common.indicators import Indicators, IndicatorsFileBadFormat
from mvt.common.module import run_module, save_timeline
from mvt.common.options import MutuallyExclusiveOption
from .decrypt import DecryptBackup
from .modules.fs import BACKUP_MODULES, FS_MODULES
from .modules.backup import BACKUP_MODULES
from .modules.fs import FS_MODULES
from .modules.mixed import MIXED_MODULES
# Setup logging using Rich.
LOG_FORMAT = "[%(name)s] %(message)s"
@@ -79,6 +80,7 @@ def decrypt_backup(ctx, destination, password, key_file, backup_path):
if not backup.can_process():
ctx.exit(1)
backup.process_backup()
@@ -129,7 +131,7 @@ def extract_key(password, backup_path, key_file):
def check_backup(ctx, iocs, output, fast, backup_path, list_modules, module):
if list_modules:
log.info("Following is the list of available check-backup modules:")
for backup_module in BACKUP_MODULES:
for backup_module in BACKUP_MODULES + MIXED_MODULES:
log.info(" - %s", backup_module.__name__)
return
@@ -146,11 +148,15 @@ def check_backup(ctx, iocs, output, fast, backup_path, list_modules, module):
if iocs:
# Pre-load indicators for performance reasons.
log.info("Loading indicators from provided file at: %s", iocs)
indicators = Indicators(iocs)
try:
indicators = Indicators(iocs)
except IndicatorsFileBadFormat as e:
log.critical(e)
ctx.exit(1)
timeline = []
timeline_detected = []
for backup_module in BACKUP_MODULES:
for backup_module in BACKUP_MODULES + MIXED_MODULES:
if module and backup_module.__name__ != module:
continue
@@ -187,7 +193,7 @@ def check_backup(ctx, iocs, output, fast, backup_path, list_modules, module):
def check_fs(ctx, iocs, output, fast, dump_path, list_modules, module):
if list_modules:
log.info("Following is the list of available check-fs modules:")
for fs_module in FS_MODULES:
for fs_module in FS_MODULES + MIXED_MODULES:
log.info(" - %s", fs_module.__name__)
return
@@ -204,11 +210,15 @@ def check_fs(ctx, iocs, output, fast, dump_path, list_modules, module):
if iocs:
# Pre-load indicators for performance reasons.
log.info("Loading indicators from provided file at: %s", iocs)
indicators = Indicators(iocs)
try:
indicators = Indicators(iocs)
except IndicatorsFileBadFormat as e:
log.critical(e)
ctx.exit(1)
timeline = []
timeline_detected = []
for fs_module in FS_MODULES:
for fs_module in FS_MODULES + MIXED_MODULES:
if module and fs_module.__name__ != module:
continue
@@ -241,7 +251,8 @@ def check_fs(ctx, iocs, output, fast, dump_path, list_modules, module):
@click.option("--list-modules", "-l", is_flag=True, help="Print list of available modules and exit")
@click.option("--module", "-m", help="Name of a single module you would like to run instead of all")
@click.argument("FOLDER", type=click.Path(exists=True))
def check_iocs(iocs, list_modules, module, folder):
@click.pass_context
def check_iocs(ctx, iocs, list_modules, module, folder):
all_modules = []
for entry in BACKUP_MODULES + FS_MODULES:
if entry not in all_modules:
@@ -258,7 +269,12 @@ def check_iocs(iocs, list_modules, module, folder):
# Pre-load indicators for performance reasons.
log.info("Loading indicators from provided file at: %s", iocs)
indicators = Indicators(iocs)
try:
indicators = Indicators(iocs)
except IndicatorsFileBadFormat as e:
log.critical(e)
ctx.exit(1)
for file_name in os.listdir(folder):
name_only, ext = os.path.splitext(file_name)

View File

@@ -8,7 +8,6 @@ import glob
import logging
import os
import shutil
import sqlite3
from iOSbackup import iOSbackup
@@ -71,6 +70,13 @@ class DecryptBackup:
except Exception as e:
log.error("Failed to decrypt file %s: %s", relative_path, e)
# Copying over the root plist files as well.
for file_name in os.listdir(self.backup_path):
if file_name.endswith(".plist"):
log.info("Copied plist file %s to %s", file_name, self.dest_path)
shutil.copy(os.path.join(self.backup_path, file_name),
self.dest_path)
def decrypt_with_password(self, password):
"""Decrypts an encrypted iOS backup.
:param password: Password to use to decrypt the original backup

View File

@@ -0,0 +1,11 @@
# Mobile Verification Toolkit (MVT)
# Copyright (c) 2021 The MVT Project Authors.
# Use of this software is governed by the MVT License 1.1 that can be found at
# https://license.mvt.re/1.1/
from .backup_info import BackupInfo
from .configuration_profiles import ConfigurationProfiles
from .manifest import Manifest
from .profile_events import ProfileEvents
BACKUP_MODULES = [BackupInfo, ConfigurationProfiles, Manifest, ProfileEvents]

View File

@@ -0,0 +1,43 @@
# Mobile Verification Toolkit (MVT)
# Copyright (c) 2021 The MVT Project Authors.
# Use of this software is governed by the MVT License 1.1 that can be found at
# https://license.mvt.re/1.1/
import os
import plistlib
from mvt.common.module import DatabaseNotFoundError
from ..base import IOSExtraction
class BackupInfo(IOSExtraction):
"""This module extracts information about the device and the backup."""
def __init__(self, file_path=None, base_folder=None, output_folder=None,
fast_mode=False, log=None, results=[]):
super().__init__(file_path=file_path, base_folder=base_folder,
output_folder=output_folder, fast_mode=fast_mode,
log=log, results=results)
self.results = {}
def run(self):
info_path = os.path.join(self.base_folder, "Info.plist")
if not os.path.exists(info_path):
raise DatabaseNotFoundError("No Info.plist at backup path, unable to extract device information")
with open(info_path, "rb") as handle:
info = plistlib.load(handle)
fields = ["Build Version", "Device Name", "Display Name", "GUID",
"GUID", "ICCID", "IMEI", "MEID", "Installed Applications",
"Last Backup Data", "Phone Number", "Product Name",
"Product Type", "Product Version", "Serial Number",
"Target Identifier", "Target Type", "Unique Identifier",
"iTunes Version"]
for field in fields:
value = info.get(field, None)
self.log.info("%s: %s", field, value)
self.results[field] = value

View File

@@ -0,0 +1,43 @@
# Mobile Verification Toolkit (MVT)
# Copyright (c) 2021 The MVT Project Authors.
# Use of this software is governed by the MVT License 1.1 that can be found at
# https://license.mvt.re/1.1/
import os
import plistlib
from base64 import b64encode
from ..base import IOSExtraction
CONF_PROFILES_DOMAIN = "SysSharedContainerDomain-systemgroup.com.apple.configurationprofiles"
class ConfigurationProfiles(IOSExtraction):
"""This module extracts the full plist data from configuration profiles.
"""
def __init__(self, file_path=None, base_folder=None, output_folder=None,
fast_mode=False, log=None, results=[]):
super().__init__(file_path=file_path, base_folder=base_folder,
output_folder=output_folder, fast_mode=fast_mode,
log=log, results=results)
def run(self):
for conf_file in self._get_backup_files_from_manifest(domain=CONF_PROFILES_DOMAIN):
conf_file_path = self._get_backup_file_from_id(conf_file["file_id"])
if not conf_file_path:
continue
with open(conf_file_path, "rb") as handle:
conf_plist = plistlib.load(handle)
if "SignerCerts" in conf_plist:
conf_plist["SignerCerts"] = [b64encode(x) for x in conf_plist["SignerCerts"]]
self.results.append({
"file_id": conf_file["file_id"],
"relative_path": conf_file["relative_path"],
"domain": conf_file["domain"],
"plist": conf_plist,
})
self.log.info("Extracted details about %d configuration profiles", len(self.results))

View File

@@ -6,14 +6,13 @@
import datetime
import io
import os
import plistlib
import sqlite3
import biplist
from mvt.common.module import DatabaseNotFoundError
from mvt.common.utils import convert_timestamp_to_iso
from .base import IOSExtraction
from ..base import IOSExtraction
class Manifest(IOSExtraction):
@@ -26,15 +25,14 @@ class Manifest(IOSExtraction):
log=log, results=results)
def _get_key(self, dictionary, key):
"""
Unserialized plist objects can have keys which are str or byte types
"""Unserialized plist objects can have keys which are str or byte types
This is a helper to try fetch a key as both a byte or string type.
"""
return dictionary.get(key.encode("utf-8"), None) or dictionary.get(key, None)
def _convert_timestamp(self, timestamp_or_unix_time_int):
"""Older iOS versions stored the manifest times as unix timestamps."""
"""Older iOS versions stored the manifest times as unix timestamps.
"""
if isinstance(timestamp_or_unix_time_int, datetime.datetime):
return convert_timestamp_to_iso(timestamp_or_unix_time_int)
else:
@@ -43,20 +41,20 @@ class Manifest(IOSExtraction):
def serialize(self, record):
records = []
if "modified" not in record or "statusChanged" not in record:
if "modified" not in record or "status_changed" not in record:
return
for ts in set([record["created"], record["modified"], record["statusChanged"]]):
for ts in set([record["created"], record["modified"], record["status_changed"]]):
macb = ""
macb += "M" if ts == record["modified"] else "-"
macb += "-"
macb += "C" if ts == record["statusChanged"] else "-"
macb += "C" if ts == record["status_changed"] else "-"
macb += "B" if ts == record["created"] else "-"
records.append({
"timestamp": ts,
"module": self.__class__.__name__,
"event": macb,
"data": f"{record['relativePath']} - {record['domain']}"
"data": f"{record['relative_path']} - {record['domain']}"
})
return records
@@ -66,23 +64,23 @@ class Manifest(IOSExtraction):
return
for result in self.results:
if not "relativePath" in result:
if not "relative_path" in result:
continue
if not result["relativePath"]:
if not result["relative_path"]:
continue
if result["domain"]:
if os.path.basename(result["relativePath"]) == "com.apple.CrashReporter.plist" and result["domain"] == "RootDomain":
if os.path.basename(result["relative_path"]) == "com.apple.CrashReporter.plist" and result["domain"] == "RootDomain":
self.log.warning("Found a potentially suspicious \"com.apple.CrashReporter.plist\" file created in RootDomain")
self.detected.append(result)
continue
if self.indicators.check_file(result["relativePath"]):
self.log.warning("Found a known malicious file at path: %s", result["relativePath"])
if self.indicators.check_file(result["relative_path"]):
self.log.warning("Found a known malicious file at path: %s", result["relative_path"])
self.detected.append(result)
continue
relPath = result["relativePath"].lower()
relPath = result["relative_path"].lower()
for ioc in self.indicators.ioc_domains:
if ioc.lower() in relPath:
self.log.warning("Found mention of domain \"%s\" in a backup file with path: %s",
@@ -103,26 +101,26 @@ class Manifest(IOSExtraction):
names = [description[0] for description in cur.description]
for file_entry in cur:
file_data = dict()
file_data = {}
for index, value in enumerate(file_entry):
file_data[names[index]] = value
cleaned_metadata = {
"fileID": file_data["fileID"],
"file_id": file_data["fileID"],
"domain": file_data["domain"],
"relativePath": file_data["relativePath"],
"relative_path": file_data["relativePath"],
"flags": file_data["flags"],
"created": "",
}
if file_data["file"]:
try:
file_plist = biplist.readPlist(io.BytesIO(file_data["file"]))
file_plist = plistlib.load(io.BytesIO(file_data["file"]))
file_metadata = self._get_key(file_plist, "$objects")[1]
cleaned_metadata.update({
"created": self._convert_timestamp(self._get_key(file_metadata, "Birth")),
"modified": self._convert_timestamp(self._get_key(file_metadata, "LastModified")),
"statusChanged": self._convert_timestamp(self._get_key(file_metadata, "LastStatusChange")),
"status_changed": self._convert_timestamp(self._get_key(file_metadata, "LastStatusChange")),
"mode": oct(self._get_key(file_metadata, "Mode")),
"owner": self._get_key(file_metadata, "UserID"),
"size": self._get_key(file_metadata, "Size"),

View File

@@ -0,0 +1,59 @@
# Mobile Verification Toolkit (MVT)
# Copyright (c) 2021 The MVT Project Authors.
# Use of this software is governed by the MVT License 1.1 that can be found at
# https://license.mvt.re/1.1/
import plistlib
from datetime import datetime
from mvt.common.utils import convert_timestamp_to_iso
from ..base import IOSExtraction
CONF_PROFILES_EVENTS_RELPATH = "Library/ConfigurationProfiles/MCProfileEvents.plist"
class ProfileEvents(IOSExtraction):
"""This module extracts events related to the installation of configuration
profiles.
"""
def __init__(self, file_path=None, base_folder=None, output_folder=None,
fast_mode=False, log=None, results=[]):
super().__init__(file_path=file_path, base_folder=base_folder,
output_folder=output_folder, fast_mode=fast_mode,
log=log, results=results)
def serialize(self, record):
return {
"timestamp": record.get("timestamp"),
"module": self.__class__.__name__,
"event": "profile_operation",
"data": f"Process {record.get('process')} started operation {record.get('operation')} of profile {record.get('profile_id')}"
}
def run(self):
for events_file in self._get_backup_files_from_manifest(relative_path=CONF_PROFILES_EVENTS_RELPATH):
events_file_path = self._get_backup_file_from_id(events_file["file_id"])
if not events_file_path:
continue
with open(events_file_path, "rb") as handle:
events_plist = plistlib.load(handle)
if "ProfileEvents" not in events_plist:
continue
for event in events_plist["ProfileEvents"]:
key = list(event.keys())[0]
self.log.info("On %s process \"%s\" started operation \"%s\" of profile \"%s\"",
event[key].get("timestamp"), event[key].get("process"),
event[key].get("operation"), key)
self.results.append({
"profile_id": key,
"timestamp": convert_timestamp_to_iso(event[key].get("timestamp")),
"operation": event[key].get("operation"),
"process": event[key].get("process"),
})
self.log.info("Extracted %d profile events", len(self.results))

View File

@@ -4,7 +4,6 @@
# https://license.mvt.re/1.1/
import glob
import io
import os
import shutil
import sqlite3
@@ -27,8 +26,11 @@ class IOSExtraction(MVTModule):
self.is_fs_dump = False
self.is_sysdiagnose = False
def _is_database_malformed(self, file_path):
# Check if the database is malformed.
def _recover_sqlite_db_if_needed(self, file_path):
"""Tries to recover a malformed database by running a .clone command.
:param file_path: Path to the malformed database file.
"""
# TODO: Find a better solution.
conn = sqlite3.connect(file_path)
cur = conn.cursor()
@@ -41,19 +43,11 @@ class IOSExtraction(MVTModule):
finally:
conn.close()
return recover
def _recover_database(self, file_path):
"""Tries to recover a malformed database by running a .clone command.
:param file_path: Path to the malformed database file.
"""
# TODO: Find a better solution.
if not recover:
return
self.log.info("Database at path %s is malformed. Trying to recover...", file_path)
if not os.path.exists(file_path):
return
if not shutil.which("sqlite3"):
raise DatabaseCorruptedError("Unable to recover without sqlite3 binary. Please install sqlite3!")
if '"' in file_path:
@@ -69,9 +63,59 @@ class IOSExtraction(MVTModule):
self.log.info("Database at path %s recovered successfully!", file_path)
def _get_backup_files_from_manifest(self, relative_path=None, domain=None):
"""Locate files from Manifest.db.
:param relative_path: Relative path to use as filter from Manifest.db.
:param domain: Domain to use as filter from Manifest.db.
"""
manifest_db_path = os.path.join(self.base_folder, "Manifest.db")
if not os.path.exists(manifest_db_path):
raise Exception("Unable to find backup's Manifest.db")
base_sql = "SELECT fileID, domain, relativePath FROM Files WHERE "
try:
conn = sqlite3.connect(manifest_db_path)
cur = conn.cursor()
if relative_path and domain:
cur.execute(f"{base_sql} relativePath = ? AND domain = ?;",
(relative_path, domain))
else:
if relative_path:
cur.execute(f"{base_sql} relativePath = ?;", (relative_path,))
elif domain:
cur.execute(f"{base_sql} domain = ?;", (domain,))
except Exception as e:
raise Exception("Query to Manifest.db failed: %s", e)
for row in cur:
yield {
"file_id": row[0],
"domain": row[1],
"relative_path": row[2],
}
def _get_backup_file_from_id(self, file_id):
file_path = os.path.join(self.base_folder, file_id[0:2], file_id)
if os.path.exists(file_path):
return file_path
return None
def _get_fs_files_from_patterns(self, root_paths):
for root_path in root_paths:
for found_path in glob.glob(os.path.join(self.base_folder, root_path)):
if not os.path.exists(found_path):
continue
yield found_path
def _find_ios_database(self, backup_ids=None, root_paths=[]):
"""Try to locate the module's database file from either an iTunes
backup or a full filesystem dump.
"""Try to locate a module's database file from either an iTunes
backup or a full filesystem dump. This is intended only for
modules that expect to work with a single SQLite database.
If a module requires to process multiple databases or files,
you should use the helper functions above.
:param backup_id: iTunes backup database file's ID (or hash).
:param root_paths: Glob patterns for files to seek in filesystem dump.
"""
@@ -83,9 +127,8 @@ class IOSExtraction(MVTModule):
# folder structure, if we have a valid ID.
if backup_ids:
for backup_id in backup_ids:
file_path = os.path.join(self.base_folder, backup_id[0:2], backup_id)
# If we found the correct backup file, then we stop searching.
if os.path.exists(file_path):
file_path = self._get_backup_file_from_id(backup_id)
if file_path:
break
# If this file does not exist we might be processing a full
@@ -93,15 +136,9 @@ class IOSExtraction(MVTModule):
if not file_path or not os.path.exists(file_path):
# We reset the file_path.
file_path = None
for root_path in root_paths:
for found_path in glob.glob(os.path.join(self.base_folder, root_path)):
# If we find a valid path, we set file_path.
if os.path.exists(found_path):
file_path = found_path
break
# Otherwise, we reset the file_path again.
file_path = None
for found_path in self._get_fs_files_from_patterns(root_paths):
file_path = found_path
break
# If we do not find any, we fail.
if file_path:
@@ -109,5 +146,4 @@ class IOSExtraction(MVTModule):
else:
raise DatabaseNotFoundError("Unable to find the module's database file")
if self._is_database_malformed(self.file_path):
self._recover_database(self.file_path)
self._recover_sqlite_db_if_needed(self.file_path)

View File

@@ -4,41 +4,13 @@
# https://license.mvt.re/1.1/
from .cache_files import CacheFiles
from .calls import Calls
from .chrome_favicon import ChromeFavicon
from .chrome_history import ChromeHistory
from .contacts import Contacts
from .filesystem import Filesystem
from .firefox_favicon import FirefoxFavicon
from .firefox_history import FirefoxHistory
from .idstatuscache import IDStatusCache
from .interactionc import InteractionC
from .locationd import LocationdClients
from .manifest import Manifest
from .net_datausage import Datausage
from .net_netusage import Netusage
from .safari_browserstate import SafariBrowserState
from .safari_favicon import SafariFavicon
from .safari_history import SafariHistory
from .sms import SMS
from .sms_attachments import SMSAttachments
from .version_history import IOSVersionHistory
from .webkit_indexeddb import WebkitIndexedDB
from .webkit_localstorage import WebkitLocalStorage
from .webkit_resource_load_statistics import WebkitResourceLoadStatistics
from .webkit_safariviewservice import WebkitSafariViewService
from .webkit_session_resource_log import WebkitSessionResourceLog
from .whatsapp import Whatsapp
BACKUP_MODULES = [SafariBrowserState, SafariHistory, Datausage, SMS, SMSAttachments,
ChromeHistory, ChromeFavicon, WebkitSessionResourceLog,
WebkitResourceLoadStatistics, Calls, IDStatusCache, LocationdClients,
InteractionC, FirefoxHistory, FirefoxFavicon, Contacts, Manifest, Whatsapp]
FS_MODULES = [IOSVersionHistory, SafariHistory, SafariFavicon, SafariBrowserState,
WebkitIndexedDB, WebkitLocalStorage, WebkitSafariViewService,
WebkitResourceLoadStatistics, WebkitSessionResourceLog,
Datausage, Netusage, ChromeHistory,
ChromeFavicon, Calls, IDStatusCache, SMS, SMSAttachments,
LocationdClients, InteractionC, FirefoxHistory, FirefoxFavicon,
Contacts, CacheFiles, Whatsapp, Filesystem]
FS_MODULES = [CacheFiles, Filesystem, Netusage, SafariFavicon, IOSVersionHistory,
WebkitIndexedDB, WebkitLocalStorage, WebkitSafariViewService,]

View File

@@ -6,7 +6,7 @@
import os
import sqlite3
from .base import IOSExtraction
from ..base import IOSExtraction
class CacheFiles(IOSExtraction):
@@ -58,14 +58,14 @@ class CacheFiles(IOSExtraction):
self.results[key_name] = []
for row in cur:
self.results[key_name].append(dict(
entry_id=row[0],
version=row[1],
hash_value=row[2],
storage_policy=row[3],
url=row[4],
isodate=row[5],
))
self.results[key_name].append({
"entry_id": row[0],
"version": row[1],
"hash_value": row[2],
"storage_policy": row[3],
"url": row[4],
"isodate": row[5],
})
def run(self):
self.results = {}

View File

@@ -8,7 +8,7 @@ import os
from mvt.common.utils import convert_timestamp_to_iso
from .base import IOSExtraction
from ..base import IOSExtraction
class Filesystem(IOSExtraction):
@@ -25,7 +25,7 @@ class Filesystem(IOSExtraction):
return {
"timestamp": record["modified"],
"module": self.__class__.__name__,
"event": f"file_modified",
"event": "file_modified",
"data": record["file_path"],
}

View File

@@ -3,7 +3,9 @@
# Use of this software is governed by the MVT License 1.1 that can be found at
# https://license.mvt.re/1.1/
from .net_base import NetBase
import sqlite3
from ..net_base import NetBase
NETUSAGE_ROOT_PATHS = [
"private/var/networkd/netusage.sqlite",
@@ -21,8 +23,13 @@ class Netusage(NetBase):
log=log, results=results)
def run(self):
self._find_ios_database(root_paths=NETUSAGE_ROOT_PATHS)
self.log.info("Found NetUsage database at path: %s", self.file_path)
for netusage_path in self._get_fs_files_from_patterns(NETUSAGE_ROOT_PATHS):
self.file_path = netusage_path
self.log.info("Found NetUsage database at path: %s", self.file_path)
try:
self._extract_net_data()
except sqlite3.OperationalError as e:
self.log.info("Skipping this NetUsage database because it seems empty or malformed: %s", e)
continue
self._extract_net_data()
self._find_suspicious_processes()

View File

@@ -7,7 +7,7 @@ import sqlite3
from mvt.common.utils import convert_mactime_to_unix, convert_timestamp_to_iso
from .base import IOSExtraction
from ..base import IOSExtraction
SAFARI_FAVICON_ROOT_PATHS = [
"private/var/mobile/Library/Image Cache/Favicons/Favicons.db",
@@ -39,50 +39,57 @@ class SafariFavicon(IOSExtraction):
if self.indicators.check_domain(result["url"]) or self.indicators.check_domain(result["icon_url"]):
self.detected.append(result)
def run(self):
self._find_ios_database(root_paths=SAFARI_FAVICON_ROOT_PATHS)
self.log.info("Found Safari favicon cache database at path: %s", self.file_path)
conn = sqlite3.connect(self.file_path)
def _process_favicon_db(self, file_path):
conn = sqlite3.connect(file_path)
# Fetch valid icon cache.
cur = conn.cursor()
cur.execute("""SELECT
cur.execute("""
SELECT
page_url.url,
icon_info.url,
icon_info.timestamp
FROM page_url
JOIN icon_info ON page_url.uuid = icon_info.uuid
ORDER BY icon_info.timestamp;""")
ORDER BY icon_info.timestamp;
""")
items = []
for item in cur:
items.append(dict(
url=item[0],
icon_url=item[1],
timestamp=item[2],
isodate=convert_timestamp_to_iso(convert_mactime_to_unix(item[2])),
type="valid",
))
for row in cur:
self.results.append({
"url": row[0],
"icon_url": row[1],
"timestamp": row[2],
"isodate": convert_timestamp_to_iso(convert_mactime_to_unix(row[2])),
"type": "valid",
"safari_favicon_db_path": file_path,
})
# Fetch icons from the rejected icons table.
cur.execute("""SELECT
cur.execute("""
SELECT
page_url,
icon_url,
timestamp
FROM rejected_resources ORDER BY timestamp;""")
FROM rejected_resources ORDER BY timestamp;
""")
for item in cur:
items.append(dict(
url=item[0],
icon_url=item[1],
timestamp=item[2],
isodate=convert_timestamp_to_iso(convert_mactime_to_unix(item[2])),
type="rejected",
))
for row in cur:
self.results.append({
"url": row[0],
"icon_url": row[1],
"timestamp": row[2],
"isodate": convert_timestamp_to_iso(convert_mactime_to_unix(row[2])),
"type": "rejected",
"safari_favicon_db_path": file_path,
})
cur.close()
conn.close()
self.log.info("Extracted a total of %d favicon records", len(items))
self.results = sorted(items, key=lambda item: item["isodate"])
def run(self):
for file_path in self._get_fs_files_from_patterns(SAFARI_FAVICON_ROOT_PATHS):
self.log.info("Found Safari favicon cache database at path: %s", file_path)
self._process_favicon_db(file_path)
self.log.info("Extracted a total of %d favicon records", len(self.results))
self.results = sorted(self.results, key=lambda x: x["isodate"])

View File

@@ -8,7 +8,7 @@ import json
from mvt.common.utils import convert_timestamp_to_iso
from .base import IOSExtraction
from ..base import IOSExtraction
IOS_ANALYTICS_JOURNAL_PATHS = [
"private/var/db/analyticsd/Analytics-Journal-*.ips",
@@ -32,7 +32,7 @@ class IOSVersionHistory(IOSExtraction):
}
def run(self):
for found_path in self._find_paths(IOS_ANALYTICS_JOURNAL_PATHS):
for found_path in self._get_fs_files_from_patterns(IOS_ANALYTICS_JOURNAL_PATHS):
with open(found_path, "r") as analytics_log:
log_line = json.loads(analytics_log.readline().strip())

View File

@@ -8,7 +8,7 @@ import os
from mvt.common.utils import convert_timestamp_to_iso
from .base import IOSExtraction
from ..base import IOSExtraction
class WebkitBase(IOSExtraction):
@@ -22,8 +22,8 @@ class WebkitBase(IOSExtraction):
if self.indicators.check_domain(item["url"]):
self.detected.append(item)
def _database_from_path(self, root_paths):
for found_path in self._find_paths(root_paths):
def _process_webkit_folder(self, root_paths):
for found_path in self._get_fs_files_from_patterns(root_paths):
key = os.path.relpath(found_path, self.base_folder)
for name in os.listdir(found_path):
@@ -34,8 +34,8 @@ class WebkitBase(IOSExtraction):
name = name.replace("https_", "https://")
url = name.split("_")[0]
self.results.append(dict(
folder=key,
url=url,
isodate=convert_timestamp_to_iso(datetime.datetime.utcfromtimestamp(os.stat(found_path).st_mtime)),
))
self.results.append({
"folder": key,
"url": url,
"isodate": convert_timestamp_to_iso(datetime.datetime.utcfromtimestamp(os.stat(found_path).st_mtime)),
})

View File

@@ -30,6 +30,6 @@ class WebkitIndexedDB(WebkitBase):
}
def run(self):
self._database_from_path(WEBKIT_INDEXEDDB_ROOT_PATHS)
self._process_webkit_folder(WEBKIT_INDEXEDDB_ROOT_PATHS)
self.log.info("Extracted a total of %d WebKit IndexedDB records",
len(self.results))

View File

@@ -28,6 +28,6 @@ class WebkitLocalStorage(WebkitBase):
}
def run(self):
self._database_from_path(WEBKIT_LOCALSTORAGE_ROOT_PATHS)
self._process_webkit_folder(WEBKIT_LOCALSTORAGE_ROOT_PATHS)
self.log.info("Extracted a total of %d records from WebKit Local Storages",
len(self.results))

View File

@@ -20,4 +20,6 @@ class WebkitSafariViewService(WebkitBase):
log=log, results=results)
def run(self):
self._database_from_path(WEBKIT_SAFARIVIEWSERVICE_ROOT_PATHS)
self._process_webkit_folder(WEBKIT_SAFARIVIEWSERVICE_ROOT_PATHS)
self.log.info("Extracted a total of %d records from WebKit SafariViewService WebsiteData",
len(self.results))

View File

@@ -0,0 +1,27 @@
# Mobile Verification Toolkit (MVT)
# Copyright (c) 2021 The MVT Project Authors.
# Use of this software is governed by the MVT License 1.1 that can be found at
# https://license.mvt.re/1.1/
from .calls import Calls
from .chrome_favicon import ChromeFavicon
from .chrome_history import ChromeHistory
from .contacts import Contacts
from .firefox_favicon import FirefoxFavicon
from .firefox_history import FirefoxHistory
from .idstatuscache import IDStatusCache
from .interactionc import InteractionC
from .locationd import LocationdClients
from .net_datausage import Datausage
from .safari_browserstate import SafariBrowserState
from .safari_history import SafariHistory
from .sms import SMS
from .sms_attachments import SMSAttachments
from .webkit_resource_load_statistics import WebkitResourceLoadStatistics
from .webkit_session_resource_log import WebkitSessionResourceLog
from .whatsapp import Whatsapp
MIXED_MODULES = [Calls, ChromeFavicon, ChromeHistory, Contacts, FirefoxFavicon,
FirefoxHistory, IDStatusCache, InteractionC, LocationdClients,
Datausage, SafariBrowserState, SafariHistory, SMS, SMSAttachments,
WebkitResourceLoadStatistics, WebkitSessionResourceLog, Whatsapp,]

View File

@@ -7,7 +7,7 @@ import sqlite3
from mvt.common.utils import convert_mactime_to_unix, convert_timestamp_to_iso
from .base import IOSExtraction
from ..base import IOSExtraction
CALLS_BACKUP_IDS = [
"5a4935c78a5255723f707230a451d79c540d2741",
@@ -34,7 +34,8 @@ class Calls(IOSExtraction):
}
def run(self):
self._find_ios_database(backup_ids=CALLS_BACKUP_IDS, root_paths=CALLS_ROOT_PATHS)
self._find_ios_database(backup_ids=CALLS_BACKUP_IDS,
root_paths=CALLS_ROOT_PATHS)
self.log.info("Found Calls database at path: %s", self.file_path)
conn = sqlite3.connect(self.file_path)
@@ -42,17 +43,17 @@ class Calls(IOSExtraction):
cur.execute("""
SELECT
ZDATE, ZDURATION, ZLOCATION, ZADDRESS, ZSERVICE_PROVIDER
FROM ZCALLRECORD;
FROM ZCALLRECORD;
""")
names = [description[0] for description in cur.description]
for entry in cur:
for row in cur:
self.results.append({
"isodate": convert_timestamp_to_iso(convert_mactime_to_unix(entry[0])),
"duration": entry[1],
"location": entry[2],
"number": entry[3].decode("utf-8") if entry[3] and entry[3] is bytes else entry[3],
"provider": entry[4]
"isodate": convert_timestamp_to_iso(convert_mactime_to_unix(row[0])),
"duration": row[1],
"location": row[2],
"number": row[3].decode("utf-8") if row[3] and row[3] is bytes else row[3],
"provider": row[4]
})
cur.close()

View File

@@ -8,7 +8,7 @@ import sqlite3
from mvt.common.utils import (convert_chrometime_to_unix,
convert_timestamp_to_iso)
from .base import IOSExtraction
from ..base import IOSExtraction
CHROME_FAVICON_BACKUP_IDS = [
"55680ab883d0fdcffd94f959b1632e5fbbb18c5b"
@@ -45,14 +45,16 @@ class ChromeFavicon(IOSExtraction):
self.detected.append(result)
def run(self):
self._find_ios_database(backup_ids=CHROME_FAVICON_BACKUP_IDS, root_paths=CHROME_FAVICON_ROOT_PATHS)
self._find_ios_database(backup_ids=CHROME_FAVICON_BACKUP_IDS,
root_paths=CHROME_FAVICON_ROOT_PATHS)
self.log.info("Found Chrome favicon cache database at path: %s", self.file_path)
conn = sqlite3.connect(self.file_path)
# Fetch icon cache
cur = conn.cursor()
cur.execute("""SELECT
cur.execute("""
SELECT
icon_mapping.page_url,
favicons.url,
favicon_bitmaps.last_updated,
@@ -60,20 +62,21 @@ class ChromeFavicon(IOSExtraction):
FROM icon_mapping
JOIN favicon_bitmaps ON icon_mapping.icon_id = favicon_bitmaps.icon_id
JOIN favicons ON icon_mapping.icon_id = favicons.id
ORDER BY icon_mapping.id;""")
ORDER BY icon_mapping.id;
""")
items = []
for item in cur:
last_timestamp = int(item[2]) or int(item[3])
items.append(dict(
url=item[0],
icon_url=item[1],
timestamp=last_timestamp,
isodate=convert_timestamp_to_iso(convert_chrometime_to_unix(last_timestamp)),
))
records = []
for row in cur:
last_timestamp = int(row[2]) or int(row[3])
records.append({
"url": row[0],
"icon_url": row[1],
"timestamp": last_timestamp,
"isodate": convert_timestamp_to_iso(convert_chrometime_to_unix(last_timestamp)),
})
cur.close()
conn.close()
self.log.info("Extracted a total of %d favicon records", len(items))
self.results = sorted(items, key=lambda item: item["isodate"])
self.log.info("Extracted a total of %d favicon records", len(records))
self.results = sorted(records, key=lambda row: row["isodate"])

View File

@@ -8,7 +8,7 @@ import sqlite3
from mvt.common.utils import (convert_chrometime_to_unix,
convert_timestamp_to_iso)
from .base import IOSExtraction
from ..base import IOSExtraction
CHROME_HISTORY_BACKUP_IDS = [
"faf971ce92c3ac508c018dce1bef2a8b8e9838f1",
@@ -45,7 +45,8 @@ class ChromeHistory(IOSExtraction):
self.detected.append(result)
def run(self):
self._find_ios_database(backup_ids=CHROME_HISTORY_BACKUP_IDS, root_paths=CHROME_HISTORY_ROOT_PATHS)
self._find_ios_database(backup_ids=CHROME_HISTORY_BACKUP_IDS,
root_paths=CHROME_HISTORY_ROOT_PATHS)
self.log.info("Found Chrome history database at path: %s", self.file_path)
conn = sqlite3.connect(self.file_path)
@@ -63,14 +64,14 @@ class ChromeHistory(IOSExtraction):
""")
for item in cur:
self.results.append(dict(
id=item[0],
url=item[1],
visit_id=item[2],
timestamp=item[3],
isodate=convert_timestamp_to_iso(convert_chrometime_to_unix(item[3])),
redirect_source=item[4],
))
self.results.append({
"id": item[0],
"url": item[1],
"visit_id": item[2],
"timestamp": item[3],
"isodate": convert_timestamp_to_iso(convert_chrometime_to_unix(item[3])),
"redirect_source": item[4],
})
cur.close()
conn.close()

View File

@@ -5,7 +5,7 @@
import sqlite3
from .base import IOSExtraction
from ..base import IOSExtraction
CONTACTS_BACKUP_IDS = [
"31bb7ba8914766d4ba40d6dfb6113c8b614be442",
@@ -39,9 +39,9 @@ class Contacts(IOSExtraction):
""")
names = [description[0] for description in cur.description]
for entry in cur:
new_contact = dict()
for index, value in enumerate(entry):
for row in cur:
new_contact = {}
for index, value in enumerate(row):
new_contact[names[index]] = value
self.results.append(new_contact)
@@ -49,4 +49,5 @@ class Contacts(IOSExtraction):
cur.close()
conn.close()
self.log.info("Extracted a total of %d contacts from the address book", len(self.results))
self.log.info("Extracted a total of %d contacts from the address book",
len(self.results))

View File

@@ -6,10 +6,9 @@
import sqlite3
from datetime import datetime
from mvt.common.url import URL
from mvt.common.utils import convert_mactime_to_unix, convert_timestamp_to_iso
from mvt.common.utils import convert_timestamp_to_iso
from .base import IOSExtraction
from ..base import IOSExtraction
FIREFOX_HISTORY_BACKUP_IDS = [
"2e57c396a35b0d1bcbc624725002d98bd61d142b",
@@ -40,11 +39,13 @@ class FirefoxFavicon(IOSExtraction):
return
for result in self.results:
if self.indicators.check_domain(result["url"]) or self.indicators.check_domain(result["history_url"]):
if (self.indicators.check_domain(result.get("url", "")) or
self.indicators.check_domain(result.get("history_url", ""))):
self.detected.append(result)
def run(self):
self._find_ios_database(backup_ids=FIREFOX_HISTORY_BACKUP_IDS, root_paths=FIREFOX_HISTORY_ROOT_PATHS)
self._find_ios_database(backup_ids=FIREFOX_HISTORY_BACKUP_IDS,
root_paths=FIREFOX_HISTORY_ROOT_PATHS)
self.log.info("Found Firefox favicon database at path: %s", self.file_path)
conn = sqlite3.connect(self.file_path)
@@ -65,16 +66,16 @@ class FirefoxFavicon(IOSExtraction):
""")
for item in cur:
self.results.append(dict(
id=item[0],
url=item[1],
width=item[2],
height=item[3],
type=item[4],
isodate=convert_timestamp_to_iso(datetime.utcfromtimestamp(item[5])),
history_id=item[6],
history_url=item[7]
))
self.results.append({
"id": item[0],
"url": item[1],
"width": item[2],
"height": item[3],
"type": item[4],
"isodate": convert_timestamp_to_iso(datetime.utcfromtimestamp(item[5])),
"history_id": item[6],
"history_url": item[7]
})
cur.close()
conn.close()

View File

@@ -6,10 +6,9 @@
import sqlite3
from datetime import datetime
from mvt.common.url import URL
from mvt.common.utils import convert_mactime_to_unix, convert_timestamp_to_iso
from mvt.common.utils import convert_timestamp_to_iso
from .base import IOSExtraction
from ..base import IOSExtraction
FIREFOX_HISTORY_BACKUP_IDS = [
"2e57c396a35b0d1bcbc624725002d98bd61d142b",
@@ -62,15 +61,15 @@ class FirefoxHistory(IOSExtraction):
WHERE visits.siteID = history.id;
""")
for item in cur:
self.results.append(dict(
id=item[0],
isodate=convert_timestamp_to_iso(datetime.utcfromtimestamp(item[1])),
url=item[2],
title=item[3],
i1000000s_local=item[4],
type=item[5]
))
for row in cur:
self.results.append({
"id": row[0],
"isodate": convert_timestamp_to_iso(datetime.utcfromtimestamp(row[1])),
"url": row[2],
"title": row[3],
"i1000000s_local": row[4],
"type": row[5]
})
cur.close()
conn.close()

View File

@@ -4,14 +4,11 @@
# https://license.mvt.re/1.1/
import collections
import glob
import os
import biplist
import plistlib
from mvt.common.utils import convert_mactime_to_unix, convert_timestamp_to_iso
from .base import IOSExtraction
from ..base import IOSExtraction
IDSTATUSCACHE_BACKUP_IDS = [
"6b97989189901ceaa4e5be9b7f05fb584120e27b",
@@ -42,22 +39,24 @@ class IDStatusCache(IOSExtraction):
return
for result in self.results:
if result["user"].startswith("mailto:"):
if result.get("user", "").startswith("mailto:"):
email = result["user"][7:].strip("'")
if self.indicators.check_email(email):
self.detected.append(result)
continue
if "\\x00\\x00" in result["user"]:
if "\\x00\\x00" in result.get("user", ""):
self.log.warning("Found an ID Status Cache entry with suspicious patterns: %s",
result["user"])
result.get("user"))
self.detected.append(result)
def run(self):
self._find_ios_database(backup_ids=IDSTATUSCACHE_BACKUP_IDS, root_paths=IDSTATUSCACHE_ROOT_PATHS)
self._find_ios_database(backup_ids=IDSTATUSCACHE_BACKUP_IDS,
root_paths=IDSTATUSCACHE_ROOT_PATHS)
self.log.info("Found IDStatusCache plist at path: %s", self.file_path)
file_plist = biplist.readPlist(self.file_path)
with open(self.file_path, "rb") as handle:
file_plist = plistlib.load(handle)
id_status_cache_entries = []
for app in file_plist:
@@ -80,7 +79,7 @@ class IDStatusCache(IOSExtraction):
entry_counter = collections.Counter([entry["user"] for entry in id_status_cache_entries])
for entry in id_status_cache_entries:
# Add total count of occurrences to the status cache entry
# Add total count of occurrences to the status cache entry.
entry["occurrences"] = entry_counter[entry["user"]]
self.results.append(entry)

View File

@@ -4,11 +4,10 @@
# https://license.mvt.re/1.1/
import sqlite3
from base64 import b64encode
from mvt.common.utils import convert_mactime_to_unix, convert_timestamp_to_iso
from .base import IOSExtraction
from ..base import IOSExtraction
INTERACTIONC_BACKUP_IDS = [
"1f5a521220a3ad80ebfdc196978df8e7a2e49dee",
@@ -117,55 +116,56 @@ class InteractionC(IOSExtraction):
ZINTERACTIONS.ZGROUPNAME,
ZINTERACTIONS.ZDERIVEDINTENTIDENTIFIER,
ZINTERACTIONS.Z_PK
FROM ZINTERACTIONS
LEFT JOIN ZCONTACTS ON ZINTERACTIONS.ZSENDER = ZCONTACTS.Z_PK
LEFT JOIN Z_1INTERACTIONS ON ZINTERACTIONS.Z_PK == Z_1INTERACTIONS.Z_3INTERACTIONS
LEFT JOIN ZATTACHMENT ON Z_1INTERACTIONS.Z_1ATTACHMENTS == ZATTACHMENT.Z_PK
LEFT JOIN Z_2INTERACTIONRECIPIENT ON ZINTERACTIONS.Z_PK== Z_2INTERACTIONRECIPIENT.Z_3INTERACTIONRECIPIENT
LEFT JOIN ZCONTACTS RECEIPIENTCONACT ON Z_2INTERACTIONRECIPIENT.Z_2RECIPIENTS== RECEIPIENTCONACT.Z_PK;
FROM ZINTERACTIONS
LEFT JOIN ZCONTACTS ON ZINTERACTIONS.ZSENDER = ZCONTACTS.Z_PK
LEFT JOIN Z_1INTERACTIONS ON ZINTERACTIONS.Z_PK == Z_1INTERACTIONS.Z_3INTERACTIONS
LEFT JOIN ZATTACHMENT ON Z_1INTERACTIONS.Z_1ATTACHMENTS == ZATTACHMENT.Z_PK
LEFT JOIN Z_2INTERACTIONRECIPIENT ON ZINTERACTIONS.Z_PK== Z_2INTERACTIONRECIPIENT.Z_3INTERACTIONRECIPIENT
LEFT JOIN ZCONTACTS RECEIPIENTCONACT ON Z_2INTERACTIONRECIPIENT.Z_2RECIPIENTS== RECEIPIENTCONACT.Z_PK;
""")
names = [description[0] for description in cur.description]
for item in cur:
for row in cur:
self.results.append({
"start_date": convert_timestamp_to_iso(convert_mactime_to_unix(item[0])),
"end_date": convert_timestamp_to_iso(convert_mactime_to_unix(item[1])),
"bundle_id": item[2],
"account": item[3],
"target_bundle_id": item[4],
"direction": item[5],
"sender_display_name": item[6],
"sender_identifier": item[7],
"sender_personid": item[8],
"recipient_display_name": item[9],
"recipient_identifier": item[10],
"recipient_personid": item[11],
"recipient_count": item[12],
"domain_identifier": item[13],
"is_response": item[14],
"content": item[15],
"uti": item[16],
"content_url": item[17],
"size": item[18],
"photo_local_id": item[19],
"attachment_id": item[20],
"cloud_id": item[21],
"incoming_recipient_count": item[22],
"incoming_sender_count": item[23],
"outgoing_recipient_count": item[24],
"interactions_creation_date": convert_timestamp_to_iso(convert_mactime_to_unix(item[25])) if item[25] else None,
"contacts_creation_date": convert_timestamp_to_iso(convert_mactime_to_unix(item[26])) if item[26] else None,
"first_incoming_recipient_date": convert_timestamp_to_iso(convert_mactime_to_unix(item[27])) if item[27] else None,
"first_incoming_sender_date": convert_timestamp_to_iso(convert_mactime_to_unix(item[28])) if item[28] else None,
"first_outgoing_recipient_date": convert_timestamp_to_iso(convert_mactime_to_unix(item[29])) if item[29] else None,
"last_incoming_sender_date": convert_timestamp_to_iso(convert_mactime_to_unix(item[30])) if item[30] else None,
"last_incoming_recipient_date": convert_timestamp_to_iso(convert_mactime_to_unix(item[31])) if item[31] else None,
"last_outgoing_recipient_date": convert_timestamp_to_iso(convert_mactime_to_unix(item[32])) if item[32] else None,
"custom_id": item[33],
"location_uuid": item[35],
"group_name": item[36],
"derivied_intent_id": item[37],
"table_id": item[38]
"start_date": convert_timestamp_to_iso(convert_mactime_to_unix(row[0])),
"end_date": convert_timestamp_to_iso(convert_mactime_to_unix(row[1])),
"bundle_id": row[2],
"account": row[3],
"target_bundle_id": row[4],
"direction": row[5],
"sender_display_name": row[6],
"sender_identifier": row[7],
"sender_personid": row[8],
"recipient_display_name": row[9],
"recipient_identifier": row[10],
"recipient_personid": row[11],
"recipient_count": row[12],
"domain_identifier": row[13],
"is_response": row[14],
"content": row[15],
"uti": row[16],
"content_url": row[17],
"size": row[18],
"photo_local_id": row[19],
"attachment_id": row[20],
"cloud_id": row[21],
"incoming_recipient_count": row[22],
"incoming_sender_count": row[23],
"outgoing_recipient_count": row[24],
"interactions_creation_date": convert_timestamp_to_iso(convert_mactime_to_unix(row[25])) if row[25] else None,
"contacts_creation_date": convert_timestamp_to_iso(convert_mactime_to_unix(row[26])) if row[26] else None,
"first_incoming_recipient_date": convert_timestamp_to_iso(convert_mactime_to_unix(row[27])) if row[27] else None,
"first_incoming_sender_date": convert_timestamp_to_iso(convert_mactime_to_unix(row[28])) if row[28] else None,
"first_outgoing_recipient_date": convert_timestamp_to_iso(convert_mactime_to_unix(row[29])) if row[29] else None,
"last_incoming_sender_date": convert_timestamp_to_iso(convert_mactime_to_unix(row[30])) if row[30] else None,
"last_incoming_recipient_date": convert_timestamp_to_iso(convert_mactime_to_unix(row[31])) if row[31] else None,
"last_outgoing_recipient_date": convert_timestamp_to_iso(convert_mactime_to_unix(row[32])) if row[32] else None,
"custom_id": row[33],
"location_uuid": row[35],
"group_name": row[36],
"derivied_intent_id": row[37],
"table_id": row[38]
})
cur.close()

View File

@@ -3,14 +3,11 @@
# Use of this software is governed by the MVT License 1.1 that can be found at
# https://license.mvt.re/1.1/
import glob
import os
import biplist
import plistlib
from mvt.common.utils import convert_mactime_to_unix, convert_timestamp_to_iso
from .base import IOSExtraction
from ..base import IOSExtraction
LOCATIOND_BACKUP_IDS = [
"a690d7769cce8904ca2b67320b107c8fe5f79412",
@@ -27,6 +24,7 @@ class LocationdClients(IOSExtraction):
super().__init__(file_path=file_path, base_folder=base_folder,
output_folder=output_folder, fast_mode=fast_mode,
log=log, results=results)
self.timestamps = [
"ConsumptionPeriodBegin",
"ReceivingLocationInformationTimeStopped",
@@ -53,9 +51,12 @@ class LocationdClients(IOSExtraction):
return records
def run(self):
self._find_ios_database(backup_ids=LOCATIOND_BACKUP_IDS, root_paths=LOCATIOND_ROOT_PATHS)
self._find_ios_database(backup_ids=LOCATIOND_BACKUP_IDS,
root_paths=LOCATIOND_ROOT_PATHS)
self.log.info("Found Locationd Clients plist at path: %s", self.file_path)
file_plist = biplist.readPlist(self.file_path)
with open(self.file_path, "rb") as handle:
file_plist = plistlib.load(handle)
for app in file_plist:
if file_plist[app] is dict:

View File

@@ -3,7 +3,7 @@
# Use of this software is governed by the MVT License 1.1 that can be found at
# https://license.mvt.re/1.1/
from .net_base import NetBase
from ..net_base import NetBase
DATAUSAGE_BACKUP_IDS = [
"0d609c54856a9bb2d56729df1d68f2958a88426b",
@@ -23,7 +23,8 @@ class Datausage(NetBase):
log=log, results=results)
def run(self):
self._find_ios_database(backup_ids=DATAUSAGE_BACKUP_IDS, root_paths=DATAUSAGE_ROOT_PATHS)
self._find_ios_database(backup_ids=DATAUSAGE_BACKUP_IDS,
root_paths=DATAUSAGE_ROOT_PATHS)
self.log.info("Found DataUsage database at path: %s", self.file_path)
self._extract_net_data()

View File

@@ -4,18 +4,19 @@
# https://license.mvt.re/1.1/
import io
import os
import plistlib
import sqlite3
import biplist
from mvt.common.utils import (convert_mactime_to_unix,
convert_timestamp_to_iso, keys_bytes_to_string)
from .base import IOSExtraction
from ..base import IOSExtraction
SAFARI_BROWSER_STATE_BACKUP_IDS = [
"3a47b0981ed7c10f3e2800aa66bac96a3b5db28e",
]
SAFARI_BROWSER_STATE_BACKUP_RELPATH = "Library/Safari/BrowserState.db"
SAFARI_BROWSER_STATE_ROOT_PATHS = [
"private/var/mobile/Library/Safari/BrowserState.db",
"private/var/mobile/Containers/Data/Application/*/Library/Safari/BrowserState.db",
@@ -30,6 +31,8 @@ class SafariBrowserState(IOSExtraction):
output_folder=output_folder, fast_mode=fast_mode,
log=log, results=results)
self._session_history_count = 0
def serialize(self, record):
return {
"timestamp": record["last_viewed_timestamp"],
@@ -54,16 +57,12 @@ class SafariBrowserState(IOSExtraction):
if "entry_url" in session_entry and self.indicators.check_domain(session_entry["entry_url"]):
self.detected.append(result)
def run(self):
self._find_ios_database(backup_ids=SAFARI_BROWSER_STATE_BACKUP_IDS,
root_paths=SAFARI_BROWSER_STATE_ROOT_PATHS)
self.log.info("Found Safari browser state database at path: %s", self.file_path)
def _process_browser_state_db(self, db_path):
conn = sqlite3.connect(db_path)
conn = sqlite3.connect(self.file_path)
# Fetch valid icon cache.
cur = conn.cursor()
cur.execute("""SELECT
cur.execute("""
SELECT
tabs.title,
tabs.url,
tabs.user_visible_url,
@@ -71,34 +70,43 @@ class SafariBrowserState(IOSExtraction):
tab_sessions.session_data
FROM tabs
JOIN tab_sessions ON tabs.uuid = tab_sessions.tab_uuid
ORDER BY tabs.last_viewed_time;""")
ORDER BY tabs.last_viewed_time;
""")
session_history_count = 0
for item in cur:
for row in cur:
session_entries = []
if item[4]:
if row[4]:
# Skip a 4 byte header before the plist content.
session_plist = item[4][4:]
session_data = biplist.readPlist(io.BytesIO(session_plist))
session_plist = row[4][4:]
session_data = plistlib.load(io.BytesIO(session_plist))
session_data = keys_bytes_to_string(session_data)
if "SessionHistoryEntries" in session_data["SessionHistory"]:
for session_entry in session_data["SessionHistory"]["SessionHistoryEntries"]:
session_history_count += 1
session_entries.append(dict(
entry_title=session_entry["SessionHistoryEntryOriginalURL"],
entry_url=session_entry["SessionHistoryEntryURL"],
data_length=len(session_entry["SessionHistoryEntryData"]) if "SessionHistoryEntryData" in session_entry else 0,
))
if "SessionHistoryEntries" in session_data.get("SessionHistory", {}):
for session_entry in session_data["SessionHistory"].get("SessionHistoryEntries"):
self._session_history_count += 1
session_entries.append({
"entry_title": session_entry.get("SessionHistoryEntryOriginalURL"),
"entry_url": session_entry.get("SessionHistoryEntryURL"),
"data_length": len(session_entry.get("SessionHistoryEntryData")) if "SessionHistoryEntryData" in session_entry else 0,
})
self.results.append(dict(
tab_title=item[0],
tab_url=item[1],
tab_visible_url=item[2],
last_viewed_timestamp=convert_timestamp_to_iso(convert_mactime_to_unix(item[3])),
session_data=session_entries,
))
self.results.append({
"tab_title": row[0],
"tab_url": row[1],
"tab_visible_url": row[2],
"last_viewed_timestamp": convert_timestamp_to_iso(convert_mactime_to_unix(row[3])),
"session_data": session_entries,
"safari_browser_state_db": os.path.relpath(db_path, self.base_folder),
})
def run(self):
# TODO: Is there really only one BrowserState.db in a device?
self._find_ios_database(backup_ids=SAFARI_BROWSER_STATE_BACKUP_IDS,
root_paths=SAFARI_BROWSER_STATE_ROOT_PATHS)
self.log.info("Found Safari browser state database at path: %s", self.file_path)
self._process_browser_state_db(self.file_path)
self.log.info("Extracted a total of %d tab records and %d session history entries",
len(self.results), session_history_count)
len(self.results), self._session_history_count)

View File

@@ -3,17 +3,15 @@
# Use of this software is governed by the MVT License 1.1 that can be found at
# https://license.mvt.re/1.1/
import os
import sqlite3
from mvt.common.url import URL
from mvt.common.utils import convert_mactime_to_unix, convert_timestamp_to_iso
from .base import IOSExtraction
from ..base import IOSExtraction
SAFARI_HISTORY_BACKUP_IDS = [
"e74113c185fd8297e140cfcf9c99436c5cc06b57",
"1a0e7afc19d307da602ccdcece51af33afe92c53",
]
SAFARI_HISTORY_BACKUP_RELPATH = "Library/Safari/History.db"
SAFARI_HISTORY_ROOT_PATHS = [
"private/var/mobile/Library/Safari/History.db",
"private/var/mobile/Containers/Data/Application/*/Library/Safari/History.db",
@@ -81,11 +79,8 @@ class SafariHistory(IOSExtraction):
if self.indicators.check_domain(result["url"]):
self.detected.append(result)
def run(self):
self._find_ios_database(backup_ids=SAFARI_HISTORY_BACKUP_IDS, root_paths=SAFARI_HISTORY_ROOT_PATHS)
self.log.info("Found Safari history database at path: %s", self.file_path)
conn = sqlite3.connect(self.file_path)
def _process_history_db(self, history_path):
conn = sqlite3.connect(history_path)
cur = conn.cursor()
cur.execute("""
SELECT
@@ -100,20 +95,33 @@ class SafariHistory(IOSExtraction):
ORDER BY history_visits.visit_time;
""")
items = []
for item in cur:
items.append(dict(
id=item[0],
url=item[1],
visit_id=item[2],
timestamp=item[3],
isodate=convert_timestamp_to_iso(convert_mactime_to_unix(item[3])),
redirect_source=item[4],
redirect_destination=item[5]
))
for row in cur:
self.results.append({
"id": row[0],
"url": row[1],
"visit_id": row[2],
"timestamp": row[3],
"isodate": convert_timestamp_to_iso(convert_mactime_to_unix(row[3])),
"redirect_source": row[4],
"redirect_destination": row[5],
"safari_history_db": os.path.relpath(history_path, self.base_folder),
})
cur.close()
conn.close()
self.log.info("Extracted a total of %d history items", len(items))
self.results = items
def run(self):
if self.is_backup:
for history_file in self._get_backup_files_from_manifest(relative_path=SAFARI_HISTORY_BACKUP_RELPATH):
history_path = self._get_backup_file_from_id(history_file["file_id"])
if not history_path:
continue
self.log.info("Found Safari history database at path: %s", history_path)
self._process_history_db(history_path)
elif self.is_fs_dump:
for history_path in self._get_fs_files_from_patterns(SAFARI_HISTORY_ROOT_PATHS):
self.log.info("Found Safari history database at path: %s", history_path)
self._process_history_db(history_path)
self.log.info("Extracted a total of %d history records", len(self.results))

View File

@@ -9,7 +9,7 @@ from base64 import b64encode
from mvt.common.utils import (check_for_links, convert_mactime_to_unix,
convert_timestamp_to_iso)
from .base import IOSExtraction
from ..base import IOSExtraction
SMS_BACKUP_IDS = [
"3d0d7e5fb2ce288813306e4d4636395e047a3d28",
@@ -41,15 +41,13 @@ class SMS(IOSExtraction):
return
for message in self.results:
if not "text" in message:
continue
message_links = check_for_links(message["text"])
message_links = check_for_links(message.get("text", ""))
if self.indicators.check_domains(message_links):
self.detected.append(message)
def run(self):
self._find_ios_database(backup_ids=SMS_BACKUP_IDS, root_paths=SMS_ROOT_PATHS)
self._find_ios_database(backup_ids=SMS_BACKUP_IDS,
root_paths=SMS_ROOT_PATHS)
self.log.info("Found SMS database at path: %s", self.file_path)
conn = sqlite3.connect(self.file_path)
@@ -64,7 +62,7 @@ class SMS(IOSExtraction):
names = [description[0] for description in cur.description]
for item in cur:
message = dict()
message = {}
for index, value in enumerate(item):
# We base64 escape some of the attributes that could contain
# binary data.
@@ -78,17 +76,17 @@ class SMS(IOSExtraction):
# We convert Mac's ridiculous timestamp format.
message["isodate"] = convert_timestamp_to_iso(convert_mactime_to_unix(message["date"]))
message["direction"] = ("sent" if message["is_from_me"] == 1 else "received")
message["direction"] = ("sent" if message.get("is_from_me", 0) == 1 else "received")
# Sometimes "text" is None instead of empty string.
if message["text"] is None:
if not message.get("text", None):
message["text"] = ""
# Extract links from the SMS message.
message_links = check_for_links(message["text"])
message_links = check_for_links(message.get("text", ""))
# If we find links in the messages or if they are empty we add them to the list.
if message_links or message["text"].strip() == "":
if message_links or message.get("text", "").strip() == "":
self.results.append(message)
cur.close()

View File

@@ -6,10 +6,9 @@
import sqlite3
from base64 import b64encode
from mvt.common.utils import (check_for_links, convert_mactime_to_unix,
convert_timestamp_to_iso)
from mvt.common.utils import convert_mactime_to_unix, convert_timestamp_to_iso
from .base import IOSExtraction
from ..base import IOSExtraction
SMS_BACKUP_IDS = [
"3d0d7e5fb2ce288813306e4d4636395e047a3d28",
@@ -37,7 +36,8 @@ class SMSAttachments(IOSExtraction):
}
def run(self):
self._find_ios_database(backup_ids=SMS_BACKUP_IDS, root_paths=SMS_ROOT_PATHS)
self._find_ios_database(backup_ids=SMS_BACKUP_IDS,
root_paths=SMS_ROOT_PATHS)
self.log.info("Found SMS database at path: %s", self.file_path)
conn = sqlite3.connect(self.file_path)
@@ -51,19 +51,20 @@ class SMSAttachments(IOSExtraction):
FROM attachment
LEFT JOIN message_attachment_join ON message_attachment_join.attachment_id = attachment.ROWID
LEFT JOIN message ON message.ROWID = message_attachment_join.message_id
LEFT JOIN handle ON handle.ROWID = message.handle_id
LEFT JOIN handle ON handle.ROWID = message.handle_id;
""")
names = [description[0] for description in cur.description]
for item in cur:
attachment = dict()
attachment = {}
for index, value in enumerate(item):
if (names[index] in ["user_info", "sticker_user_info", "attribution_info",
"ck_server_change_token_blob", "sr_ck_server_change_token_blob"]) and value:
if (names[index] in ["user_info", "sticker_user_info",
"attribution_info",
"ck_server_change_token_blob",
"sr_ck_server_change_token_blob"]) and value:
value = b64encode(value).decode()
attachment[names[index]] = value
# We convert Mac's ridiculous timestamp format.
attachment["isodate"] = convert_timestamp_to_iso(convert_mactime_to_unix(attachment["created_date"]))
attachment["start_date"] = convert_timestamp_to_iso(convert_mactime_to_unix(attachment["start_date"]))
attachment["direction"] = ("sent" if attachment["is_outgoing"] == 1 else "received")

View File

@@ -7,9 +7,9 @@ import datetime
import os
import sqlite3
from mvt.common.utils import convert_mactime_to_unix, convert_timestamp_to_iso
from mvt.common.utils import convert_timestamp_to_iso
from .base import IOSExtraction
from ..base import IOSExtraction
WEBKIT_RESOURCELOADSTATICS_BACKUP_RELPATH = "Library/WebKit/WebsiteData/ResourceLoadStatistics/observations.db"
WEBKIT_RESOURCELOADSTATICS_ROOT_PATHS = [
@@ -28,6 +28,8 @@ class WebkitResourceLoadStatistics(IOSExtraction):
output_folder=output_folder, fast_mode=fast_mode,
log=log, results=results)
self.results = {}
def check_indicators(self):
if not self.indicators:
return
@@ -44,8 +46,7 @@ class WebkitResourceLoadStatistics(IOSExtraction):
def _process_observations_db(self, db_path, key):
self.log.info("Found WebKit ResourceLoadStatistics observations.db file at path %s", db_path)
if self._is_database_malformed(db_path):
self._recover_database(db_path)
self._recover_sqlite_db_if_needed(db_path)
conn = sqlite3.connect(db_path)
cur = conn.cursor()
@@ -59,40 +60,26 @@ class WebkitResourceLoadStatistics(IOSExtraction):
self.results[key] = []
for row in cur:
self.results[key].append(dict(
domain_id=row[0],
registrable_domain=row[1],
last_seen=row[2],
had_user_interaction=bool(row[3]),
# TODO: Fix isodate.
last_seen_isodate=convert_timestamp_to_iso(datetime.datetime.utcfromtimestamp(int(row[2]))),
))
self.results[key].append({
"domain_id": row[0],
"registrable_domain": row[1],
"last_seen": row[2],
"had_user_interaction": bool(row[3]),
"last_seen_isodate": convert_timestamp_to_iso(datetime.datetime.utcfromtimestamp(int(row[2]))),
})
if len(self.results[key]) > 0:
self.log.info("Extracted a total of %d records from %s", len(self.results[key]), db_path)
def run(self):
self.results = {}
if self.is_backup:
manifest_db_path = os.path.join(self.base_folder, "Manifest.db")
if not os.path.exists(manifest_db_path):
self.log.info("Unable to search for WebKit observations.db files in backup because of missing Manifest.db")
return
try:
conn = sqlite3.connect(manifest_db_path)
cur = conn.cursor()
cur.execute("SELECT fileID, domain FROM Files WHERE relativePath = ?;", (WEBKIT_RESOURCELOADSTATICS_BACKUP_RELPATH,))
for backup_file in self._get_backup_files_from_manifest(relative_path=WEBKIT_RESOURCELOADSTATICS_BACKUP_RELPATH):
db_path = os.path.join(self.base_folder, backup_file["file_id"][0:2], backup_file["file_id"])
key = f"{backup_file['domain']}/{WEBKIT_RESOURCELOADSTATICS_BACKUP_RELPATH}"
self._process_observations_db(db_path=db_path, key=key)
except Exception as e:
self.log.error("Unable to search for WebKit observations.db files in backup because of failed query to Manifest.db: %s", e)
for row in cur:
file_id = row[0]
domain = row[1]
db_path = os.path.join(self.base_folder, file_id[0:2], file_id)
if os.path.exists(db_path):
self._process_observations_db(db_path=db_path, key=f"{domain}/{WEBKIT_RESOURCELOADSTATICS_BACKUP_RELPATH}")
self.log.info("Unable to search for WebKit observations.db: %s", e)
elif self.is_fs_dump:
for db_path in self._find_paths(WEBKIT_RESOURCELOADSTATICS_ROOT_PATHS):
for db_path in self._get_fs_files_from_patterns(WEBKIT_RESOURCELOADSTATICS_ROOT_PATHS):
self._process_observations_db(db_path=db_path, key=os.path.relpath(db_path, self.base_folder))

View File

@@ -5,17 +5,16 @@
import glob
import os
import biplist
import plistlib
from mvt.common.utils import convert_timestamp_to_iso
from .base import IOSExtraction
from ..base import IOSExtraction
WEBKIT_SESSION_RESOURCE_LOG_BACKUP_IDS = [
"a500ee38053454a02e990957be8a251935e28d3f",
]
WEBKIT_SESSION_RESOURCE_LOG_BACKUP_RELPATH = "Library/WebKit/WebsiteData/ResourceLoadStatistics/full_browsing_session_resourceLog.plist"
WEBKIT_SESSION_RESOURCE_LOG_ROOT_PATHS = [
"private/var/mobile/Containers/Data/Application/*/SystemData/com.apple.SafariViewService/Library/WebKit/WebsiteData/full_browsing_session_resourceLog.plist",
"private/var/mobile/Containers/Data/Application/*/Library/WebKit/WebsiteData/ResourceLoadStatistics/full_browsing_session_resourceLog.plist",
@@ -33,28 +32,7 @@ class WebkitSessionResourceLog(IOSExtraction):
output_folder=output_folder, fast_mode=fast_mode,
log=log, results=results)
def _extract_browsing_stats(self, file_path):
items = []
file_plist = biplist.readPlist(file_path)
if "browsingStatistics" not in file_plist:
return items
browsing_stats = file_plist["browsingStatistics"]
for item in browsing_stats:
items.append(dict(
origin=item.get("PrevalentResourceOrigin", ""),
redirect_source=item.get("topFrameUniqueRedirectsFrom", ""),
redirect_destination=item.get("topFrameUniqueRedirectsTo", ""),
subframe_under_origin=item.get("subframeUnderTopFrameOrigins", ""),
subresource_under_origin=item.get("subresourceUnderTopFrameOrigins", ""),
user_interaction=item.get("hadUserInteraction"),
most_recent_interaction=convert_timestamp_to_iso(item["mostRecentUserInteraction"]),
last_seen=convert_timestamp_to_iso(item["lastSeen"]),
))
return items
self.results = {}
@staticmethod
def _extract_domains(entries):
@@ -109,32 +87,41 @@ class WebkitSessionResourceLog(IOSExtraction):
self.log.warning("Found HTTP redirect between suspicious domains: %s", redirect_path)
def _find_paths(self, root_paths):
results = {}
for root_path in root_paths:
for found_path in glob.glob(os.path.join(self.base_folder, root_path)):
if not os.path.exists(found_path):
continue
def _extract_browsing_stats(self, log_path):
items = []
key = os.path.relpath(found_path, self.base_folder)
if key not in results:
results[key] = []
with open(log_path, "rb") as handle:
file_plist = plistlib.load(handle)
return results
if "browsingStatistics" not in file_plist:
return items
browsing_stats = file_plist["browsingStatistics"]
for item in browsing_stats:
items.append({
"origin": item.get("PrevalentResourceOrigin", ""),
"redirect_source": item.get("topFrameUniqueRedirectsFrom", ""),
"redirect_destination": item.get("topFrameUniqueRedirectsTo", ""),
"subframe_under_origin": item.get("subframeUnderTopFrameOrigins", ""),
"subresource_under_origin": item.get("subresourceUnderTopFrameOrigins", ""),
"user_interaction": item.get("hadUserInteraction"),
"most_recent_interaction": convert_timestamp_to_iso(item["mostRecentUserInteraction"]),
"last_seen": convert_timestamp_to_iso(item["lastSeen"]),
})
return items
def run(self):
self.results = {}
if self.is_backup:
for log_path in self._get_backup_files_from_manifest(relative_path=WEBKIT_SESSION_RESOURCE_LOG_BACKUP_RELPATH):
self.log.info("Found Safari browsing session resource log at path: %s", log_path)
self.results[log_path] = self._extract_browsing_stats(log_path)
elif self.is_fs_dump:
for log_path in self._get_fs_files_from_patterns(WEBKIT_SESSION_RESOURCE_LOG_ROOT_PATHS):
self.log.info("Found Safari browsing session resource log at path: %s", log_path)
key = os.path.relpath(log_path, self.base_folder)
self.results[key] = self._extract_browsing_stats(log_path)
try:
self._find_ios_database(backup_ids=WEBKIT_SESSION_RESOURCE_LOG_BACKUP_IDS)
except FileNotFoundError:
pass
else:
if self.file_path:
self.results[self.file_path] = self._extract_browsing_stats(self.file_path)
return
self.results = self._find_paths(root_paths=WEBKIT_SESSION_RESOURCE_LOG_ROOT_PATHS)
for log_file in self.results.keys():
self.log.info("Found Safari browsing session resource log at path: %s", log_file)
self.results[log_file] = self._extract_browsing_stats(os.path.join(self.base_folder, log_file))
self.log.info("Extracted records from %d Safari browsing session resource logs",
len(self.results))

View File

@@ -9,7 +9,7 @@ import sqlite3
from mvt.common.utils import (check_for_links, convert_mactime_to_unix,
convert_timestamp_to_iso)
from .base import IOSExtraction
from ..base import IOSExtraction
log = logging.getLogger(__name__)
@@ -30,12 +30,12 @@ class Whatsapp(IOSExtraction):
log=log, results=results)
def serialize(self, record):
text = record["ZTEXT"].replace("\n", "\\n")
text = record.get("ZTEXT", "").replace("\n", "\\n")
return {
"timestamp": record["isodate"],
"timestamp": record.get("isodate"),
"module": self.__class__.__name__,
"event": "message",
"data": f"{text} from {record['ZFROMJID']}"
"data": f"{text} from {record.get('ZFROMJID', 'Unknown')}",
}
def check_indicators(self):
@@ -43,16 +43,13 @@ class Whatsapp(IOSExtraction):
return
for message in self.results:
if not "ZTEXT" in message:
continue
message_links = check_for_links(message["ZTEXT"])
message_links = check_for_links(message.get("ZTEXT", ""))
if self.indicators.check_domains(message_links):
self.detected.append(message)
def run(self):
self._find_ios_database(backup_ids=WHATSAPP_BACKUP_IDS, root_paths=WHATSAPP_ROOT_PATHS)
self._find_ios_database(backup_ids=WHATSAPP_BACKUP_IDS,
root_paths=WHATSAPP_ROOT_PATHS)
log.info("Found WhatsApp database at path: %s", self.file_path)
conn = sqlite3.connect(self.file_path)
@@ -61,15 +58,15 @@ class Whatsapp(IOSExtraction):
names = [description[0] for description in cur.description]
for message in cur:
new_message = dict()
new_message = {}
for index, value in enumerate(message):
new_message[names[index]] = value
if not new_message["ZTEXT"]:
if not new_message.get("ZTEXT", None):
continue
# We convert Mac's silly timestamp again.
new_message["isodate"] = convert_timestamp_to_iso(convert_mactime_to_unix(new_message["ZMESSAGEDATE"]))
new_message["isodate"] = convert_timestamp_to_iso(convert_mactime_to_unix(new_message.get("ZMESSAGEDATE")))
# Extract links from the WhatsApp message.
message_links = check_for_links(new_message["ZTEXT"])

View File

@@ -24,7 +24,8 @@ class NetBase(IOSExtraction):
def _extract_net_data(self):
conn = sqlite3.connect(self.file_path)
cur = conn.cursor()
cur.execute("""SELECT
cur.execute("""
SELECT
ZPROCESS.ZFIRSTTIMESTAMP,
ZPROCESS.ZTIMESTAMP,
ZPROCESS.ZPROCNAME,
@@ -38,43 +39,42 @@ class NetBase(IOSExtraction):
ZLIVEUSAGE.ZHASPROCESS,
ZLIVEUSAGE.ZTIMESTAMP
FROM ZLIVEUSAGE
LEFT JOIN ZPROCESS ON ZLIVEUSAGE.ZHASPROCESS = ZPROCESS.Z_PK;""")
LEFT JOIN ZPROCESS ON ZLIVEUSAGE.ZHASPROCESS = ZPROCESS.Z_PK;
""")
items = []
for item in cur:
for row in cur:
# ZPROCESS records can be missing after the JOIN. Handle NULL timestamps.
if item[0] and item[1]:
first_isodate = convert_timestamp_to_iso(convert_mactime_to_unix(item[0]))
isodate = convert_timestamp_to_iso(convert_mactime_to_unix(item[1]))
if row[0] and row[1]:
first_isodate = convert_timestamp_to_iso(convert_mactime_to_unix(row[0]))
isodate = convert_timestamp_to_iso(convert_mactime_to_unix(row[1]))
else:
first_isodate = item[0]
isodate = item[1]
first_isodate = row[0]
isodate = row[1]
if item[11]:
live_timestamp = convert_timestamp_to_iso(convert_mactime_to_unix(item[11]))
if row[11]:
live_timestamp = convert_timestamp_to_iso(convert_mactime_to_unix(row[11]))
else:
live_timestamp = ""
items.append(dict(
first_isodate=first_isodate,
isodate=isodate,
proc_name=item[2],
bundle_id=item[3],
proc_id=item[4],
wifi_in=item[5],
wifi_out=item[6],
wwan_in=item[7],
wwan_out=item[8],
live_id=item[9],
live_proc_id=item[10],
live_isodate=live_timestamp,
))
self.results.append({
"first_isodate": first_isodate,
"isodate": isodate,
"proc_name": row[2],
"bundle_id": row[3],
"proc_id": row[4],
"wifi_in": row[5],
"wifi_out": row[6],
"wwan_in": row[7],
"wwan_out": row[8],
"live_id": row[9],
"live_proc_id": row[10],
"live_isodate": live_timestamp,
})
cur.close()
conn.close()
self.log.info("Extracted information on %d processes", len(items))
self.results = items
self.log.info("Extracted information on %d processes", len(self.results))
def serialize(self, record):
record_data = f"{record['proc_name']} (Bundle ID: {record['bundle_id']}, ID: {record['proc_id']})"
@@ -104,6 +104,7 @@ class NetBase(IOSExtraction):
"data": record_data,
}
])
return records
def _find_suspicious_processes(self):

View File

@@ -8,7 +8,7 @@ import os
from setuptools import find_packages, setup
__package_name__ = "mvt"
__version__ = "1.1.0"
__version__ = "1.2.0"
__description__ = "Mobile Verification Toolkit"
this_directory = os.path.abspath(os.path.dirname(__file__))
@@ -25,7 +25,6 @@ requires = (
"requests>=2.26.0",
"simplejson>=3.17.3",
# iOS dependencies:
"biplist>=1.0.3",
"iOSbackup>=0.9.912",
# Android dependencies:
"adb-shell>=0.4.0",