From bdaaf1543458f9a1cef2ed136042f08a65b2c093 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Donncha=20=C3=93=20Cearbhaill?= Date: Thu, 17 Feb 2022 18:17:38 +0100 Subject: [PATCH 01/10] Add initial implementation of SMS extraction using ADB --- mvt/android/modules/adb/sms.py | 94 ++++++++++++++++++++++++++++------ 1 file changed, 77 insertions(+), 17 deletions(-) diff --git a/mvt/android/modules/adb/sms.py b/mvt/android/modules/adb/sms.py index 7ba0233..3614144 100644 --- a/mvt/android/modules/adb/sms.py +++ b/mvt/android/modules/adb/sms.py @@ -6,8 +6,15 @@ import logging import os import sqlite3 +import tarfile +import io +import zlib +import base64 +import json +import datetime from mvt.common.utils import check_for_links, convert_timestamp_to_iso +from mvt.common.module import InsufficientPrivileges from .base import AndroidExtraction @@ -16,11 +23,11 @@ log = logging.getLogger(__name__) SMS_BUGLE_PATH = "data/data/com.google.android.apps.messaging/databases/bugle_db" SMS_BUGLE_QUERY = """ SELECT - ppl.normalized_destination AS number, + ppl.normalized_destination AS address, p.timestamp AS timestamp, CASE WHEN m.sender_id IN (SELECT _id FROM participants WHERE contact_id=-1) -THEN 2 ELSE 1 END incoming, p.text AS text +THEN 2 ELSE 1 END incoming, p.text AS body FROM messages m, conversations c, parts p, participants ppl, conversation_participants cp WHERE (m.conversation_id = c._id) @@ -32,10 +39,10 @@ WHERE (m.conversation_id = c._id) SMS_MMSSMS_PATH = "data/data/com.android.providers.telephony/databases/mmssms.db" SMS_MMSMS_QUERY = """ SELECT - address AS number, + address AS address, date_sent AS timestamp, type as incoming, - body AS text + body AS body FROM sms; """ @@ -50,12 +57,12 @@ class SMS(AndroidExtraction): log=log, results=results) def serialize(self, record): - text = record["text"].replace("\n", "\\n") + body = record["body"].replace("\n", "\\n") return { "timestamp": record["isodate"], "module": self.__class__.__name__, "event": f"sms_{record['direction']}", - "data": f"{record['number']}: \"{text}\"" + "data": f"{record['address']}: \"{body}\"" } def check_indicators(self): @@ -63,10 +70,10 @@ class SMS(AndroidExtraction): return for message in self.results: - if "text" not in message: + if "body" not in message: continue - message_links = check_for_links(message["text"]) + message_links = check_for_links(message["body"]) if self.indicators.check_domains(message_links): self.detected.append(message) @@ -96,7 +103,7 @@ class SMS(AndroidExtraction): # If we find links in the messages or if they are empty we add # them to the list of results. - if check_for_links(message["text"]) or message["text"].strip() == "": + if check_for_links(message["body"]) or message["body"].strip() == "": self.results.append(message) cur.close() @@ -104,12 +111,65 @@ class SMS(AndroidExtraction): log.info("Extracted a total of %d SMS messages containing links", len(self.results)) + def _extract_sms_from_backup_tar(self, tar_data): + # Extract data from generated tar file + tar_bytes = io.BytesIO(tar_data) + tar = tarfile.open(fileobj=tar_bytes, mode='r') + for member in tar.getmembers(): + if not member.name.endswith("_sms_backup"): + continue + + self.log.debug("Extracting SMS messages from backup file %s", member.name) + sms_part_zlib = zlib.decompress(tar.extractfile(member).read()) + json_data = json.loads(sms_part_zlib) + + # TODO: Copied from SMS module. Refactor to avoid duplication + for message in json_data: + utc_timestamp = datetime.datetime.utcfromtimestamp(int(message["date"]) / 1000) + message["isodate"] = convert_timestamp_to_iso(utc_timestamp) + message["direction"] = ("sent" if int(message["date_sent"]) else "received") + + message_links = check_for_links(message["body"]) + if message_links or message["body"].strip() == "": + self.results.append(message) + + log.info("Extracted a total of %d SMS messages containing links", len(self.results)) + + def _extract_sms_adb(self): + """Use the Android backup command to extract SMS data from the native SMS app + + It is crucial to use the under-documented "-nocompress" flag to disable the non-standard Java compression + algorithim. This module only supports an unencrypted ADB backup. + """ + Run ADB command to create a backup of SMS app + self.log.warning("Please check phone and accept Android backup prompt. Do not set an encryption password. \a") + + # TODO: Base64 encoding as temporary fix to avoid byte-mangling over the shell transport... + backup_output_b64 = self._adb_command("/system/bin/bu backup -nocompress com.android.providers.telephony | base64") + backup_output = base64.b64decode(backup_output_b64) + if not backup_output.startswith(b"ANDROID BACKUP"): + self.log.error("Extracting SMS via Android backup failed. No valid backup data found.") + return + + [magic_header, version, is_compressed, encryption, tar_data] = backup_output.split(b"\n", 4) + if encryption != b"none" or int(is_compressed): + self.log.error("The backup is encrypted or compressed and cannot be parsed. " + "[version: %s, encryption: %s, compression: %s]", version, encryption, is_compressed) + return + + self._extract_sms_from_backup_tar(tar_data) + def run(self): - if (self._adb_check_file_exists(os.path.join("/", SMS_BUGLE_PATH))): - self.SMS_DB_TYPE = 1 - self._adb_process_file(os.path.join("/", SMS_BUGLE_PATH), self._parse_db) - elif (self._adb_check_file_exists(os.path.join("/", SMS_MMSSMS_PATH))): - self.SMS_DB_TYPE = 2 - self._adb_process_file(os.path.join("/", SMS_MMSSMS_PATH), self._parse_db) - else: - self.log.error("No SMS database found") + try: + if (self._adb_check_file_exists(os.path.join("/", SMS_BUGLE_PATH))): + self.SMS_DB_TYPE = 1 + self._adb_process_file(os.path.join("/", SMS_BUGLE_PATH), self._parse_db) + elif (self._adb_check_file_exists(os.path.join("/", SMS_MMSSMS_PATH))): + self.SMS_DB_TYPE = 2 + self._adb_process_file(os.path.join("/", SMS_MMSSMS_PATH), self._parse_db) + return + except InsufficientPrivileges: + pass + + self.log.warn("No SMS database found. Trying extraction of SMS data using Android backup feature.") + self._extract_sms_adb() From cd0e7d987984b391af0a6dd373ba4258e2d95a2f Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Donncha=20=C3=93=20Cearbhaill?= Date: Fri, 18 Feb 2022 15:09:08 +0100 Subject: [PATCH 02/10] Fix syntax error with broken comment --- mvt/android/modules/adb/sms.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/mvt/android/modules/adb/sms.py b/mvt/android/modules/adb/sms.py index 3614144..0798f50 100644 --- a/mvt/android/modules/adb/sms.py +++ b/mvt/android/modules/adb/sms.py @@ -141,9 +141,9 @@ class SMS(AndroidExtraction): It is crucial to use the under-documented "-nocompress" flag to disable the non-standard Java compression algorithim. This module only supports an unencrypted ADB backup. """ - Run ADB command to create a backup of SMS app self.log.warning("Please check phone and accept Android backup prompt. Do not set an encryption password. \a") + # Run ADB command to create a backup of SMS app # TODO: Base64 encoding as temporary fix to avoid byte-mangling over the shell transport... backup_output_b64 = self._adb_command("/system/bin/bu backup -nocompress com.android.providers.telephony | base64") backup_output = base64.b64decode(backup_output_b64) From 8eb30e3a0296918988c56762a09763fb5c99c9ed Mon Sep 17 00:00:00 2001 From: tek Date: Wed, 23 Feb 2022 15:07:13 +0100 Subject: [PATCH 03/10] Improves android backup parsing for check-backup and check-adb --- mvt/android/cli.py | 8 - mvt/android/modules/adb/sms.py | 57 ++--- mvt/android/modules/backup/sms.py | 62 ++++-- mvt/android/parsers/backup.py | 195 ++++++++++++++++++ setup.py | 1 + tests/android/__init__.py | 0 tests/android/test_backup_module.py | 29 +++ tests/android/test_backup_parser.py | 50 +++++ .../d_f/000000_sms_backup | 2 + tests/artifacts/android_backup/backup.ab | Bin 0 -> 5144 bytes tests/artifacts/android_backup/backup2.ab | Bin 0 -> 5653 bytes tests/ios/test_backup_info.py | 4 +- tests/ios/test_datausage.py | 6 +- tests/ios/test_manifest.py | 6 +- tests/ios/test_safari_browserstate.py | 6 +- tests/ios/test_tcc.py | 6 +- tests/utils.py | 6 +- 17 files changed, 357 insertions(+), 81 deletions(-) create mode 100644 mvt/android/parsers/backup.py create mode 100644 tests/android/__init__.py create mode 100644 tests/android/test_backup_module.py create mode 100644 tests/android/test_backup_parser.py create mode 100644 tests/artifacts/android_backup/apps/com.android.providers.telephony/d_f/000000_sms_backup create mode 100644 tests/artifacts/android_backup/backup.ab create mode 100644 tests/artifacts/android_backup/backup2.ab diff --git a/mvt/android/cli.py b/mvt/android/cli.py index e070edb..9e7d220 100644 --- a/mvt/android/cli.py +++ b/mvt/android/cli.py @@ -256,14 +256,6 @@ def check_backup(ctx, iocs, output, backup_path, serial): indicators = Indicators(log=log) indicators.load_indicators_files(iocs) - if os.path.isfile(backup_path): - log.critical("The path you specified is a not a folder!") - - if os.path.basename(backup_path) == "backup.ab": - log.info("You can use ABE (https://github.com/nelenkov/android-backup-extractor) " - "to extract 'backup.ab' files!") - ctx.exit(1) - for module in BACKUP_MODULES: m = module(base_folder=backup_path, output_folder=output, log=logging.getLogger(module.__module__)) diff --git a/mvt/android/modules/adb/sms.py b/mvt/android/modules/adb/sms.py index 0798f50..b8f9333 100644 --- a/mvt/android/modules/adb/sms.py +++ b/mvt/android/modules/adb/sms.py @@ -6,14 +6,11 @@ import logging import os import sqlite3 -import tarfile -import io -import zlib import base64 -import json -import datetime +import getpass from mvt.common.utils import check_for_links, convert_timestamp_to_iso +from mvt.android.parsers.backup import parse_ab_header, parse_sms_backup, InvalidBackupPassword from mvt.common.module import InsufficientPrivileges from .base import AndroidExtraction @@ -73,6 +70,7 @@ class SMS(AndroidExtraction): if "body" not in message: continue + # FIXME: check links exported from the body previously message_links = check_for_links(message["body"]) if self.indicators.check_domains(message_links): self.detected.append(message) @@ -86,9 +84,9 @@ class SMS(AndroidExtraction): conn = sqlite3.connect(db_path) cur = conn.cursor() - if (self.SMS_DB_TYPE == 1): + if self.SMS_DB_TYPE == 1: cur.execute(SMS_BUGLE_QUERY) - elif (self.SMS_DB_TYPE == 2): + elif self.SMS_DB_TYPE == 2: cur.execute(SMS_MMSMS_QUERY) names = [description[0] for description in cur.description] @@ -111,29 +109,6 @@ class SMS(AndroidExtraction): log.info("Extracted a total of %d SMS messages containing links", len(self.results)) - def _extract_sms_from_backup_tar(self, tar_data): - # Extract data from generated tar file - tar_bytes = io.BytesIO(tar_data) - tar = tarfile.open(fileobj=tar_bytes, mode='r') - for member in tar.getmembers(): - if not member.name.endswith("_sms_backup"): - continue - - self.log.debug("Extracting SMS messages from backup file %s", member.name) - sms_part_zlib = zlib.decompress(tar.extractfile(member).read()) - json_data = json.loads(sms_part_zlib) - - # TODO: Copied from SMS module. Refactor to avoid duplication - for message in json_data: - utc_timestamp = datetime.datetime.utcfromtimestamp(int(message["date"]) / 1000) - message["isodate"] = convert_timestamp_to_iso(utc_timestamp) - message["direction"] = ("sent" if int(message["date_sent"]) else "received") - - message_links = check_for_links(message["body"]) - if message_links or message["body"].strip() == "": - self.results.append(message) - - log.info("Extracted a total of %d SMS messages containing links", len(self.results)) def _extract_sms_adb(self): """Use the Android backup command to extract SMS data from the native SMS app @@ -141,23 +116,33 @@ class SMS(AndroidExtraction): It is crucial to use the under-documented "-nocompress" flag to disable the non-standard Java compression algorithim. This module only supports an unencrypted ADB backup. """ + # Run ADB command to create a backup of SMS app self.log.warning("Please check phone and accept Android backup prompt. Do not set an encryption password. \a") # Run ADB command to create a backup of SMS app # TODO: Base64 encoding as temporary fix to avoid byte-mangling over the shell transport... backup_output_b64 = self._adb_command("/system/bin/bu backup -nocompress com.android.providers.telephony | base64") backup_output = base64.b64decode(backup_output_b64) - if not backup_output.startswith(b"ANDROID BACKUP"): + header = parse_ab_header(backup_output) + if not header["backup"]: self.log.error("Extracting SMS via Android backup failed. No valid backup data found.") return - [magic_header, version, is_compressed, encryption, tar_data] = backup_output.split(b"\n", 4) - if encryption != b"none" or int(is_compressed): - self.log.error("The backup is encrypted or compressed and cannot be parsed. " - "[version: %s, encryption: %s, compression: %s]", version, encryption, is_compressed) + if header["compression"]: + self.log.error("The backup is compressed and cannot be parsed, quitting...") return - self._extract_sms_from_backup_tar(tar_data) + pwd = None + if header["encryption"] != "none": + pwd = getpass.getpass(prompt="Backup Password: ", stream=None) + + + try: + self.results = parse_sms_backup(backup_output, password=pwd) + except InvalidBackupPassword: + self.info.log("Invalid backup password") + return + log.info("Extracted a total of %d SMS messages containing links", len(self.results)) def run(self): try: diff --git a/mvt/android/modules/backup/sms.py b/mvt/android/modules/backup/sms.py index 4ab2b1f..12d3384 100644 --- a/mvt/android/modules/backup/sms.py +++ b/mvt/android/modules/backup/sms.py @@ -3,16 +3,15 @@ # Use of this software is governed by the MVT License 1.1 that can be found at # https://license.mvt.re/1.1/ -import json import os -import zlib +import getpass from mvt.common.module import MVTModule from mvt.common.utils import check_for_links +from mvt.android.parsers.backup import parse_sms_file, parse_sms_backup, parse_ab_header, InvalidBackupPassword class SMS(MVTModule): - def __init__(self, file_path=None, base_folder=None, output_folder=None, fast_mode=False, log=None, results=[]): super().__init__(file_path=file_path, base_folder=base_folder, @@ -35,30 +34,49 @@ class SMS(MVTModule): self.log.info("Processing SMS backup file at %s", file_path) with open(file_path, "rb") as handle: - data = zlib.decompress(handle.read()) - json_data = json.loads(data) + data = handle.read() - for entry in json_data: - message_links = check_for_links(entry["body"]) - - # If we find links in the messages or if they are empty we add them to the list. - if message_links or entry["body"].strip() == "": - self.results.append(entry) + self.results = parse_sms_file(data) def run(self): - app_folder = os.path.join(self.base_folder, - "apps", - "com.android.providers.telephony", - "d_f") - if not os.path.exists(app_folder): - raise FileNotFoundError("Unable to find the SMS backup folder") + # FIXME: this should be done in the Module code if there are other modules on backups + if os.path.isfile(self.base_folder): + # ab file + with open(self.base_folder, "rb") as handle: + data = handle.read() + header = parse_ab_header(data) + if not header["backup"]: + self.log.info("Not a valid Android Backup file, quitting...") + return + if header["compression"]: + self.log.info("MVT does not support compressed backups, either regenerate the backup with the -nocompress option or use ANdroid Backup Extractor to convert it to a tar file") + self.log.info("Quitting...") + return + pwd = None + if header["encryption"] != "none": + pwd = getpass.getpass(prompt="Backup Password: ", stream=None) - for file_name in os.listdir(app_folder): - if not file_name.endswith("_sms_backup"): - continue + try: + messages = parse_sms_backup(data, password=pwd) + except InvalidBackupPassword: + self.log.info("Invalid password, impossible de decrypt the backup, quitting...") + return + self.results = messages + else: + app_folder = os.path.join(self.base_folder, + "apps", + "com.android.providers.telephony", + "d_f") + if not os.path.exists(app_folder): + self.log.info("Unable to find the SMS backup folder") + return - file_path = os.path.join(app_folder, file_name) - self._process_sms_file(file_path) + for file_name in os.listdir(app_folder): + if not file_name.endswith("_sms_backup"): + continue + + file_path = os.path.join(app_folder, file_name) + self._process_sms_file(file_path) self.log.info("Extracted a total of %d SMS messages containing links", len(self.results)) diff --git a/mvt/android/parsers/backup.py b/mvt/android/parsers/backup.py new file mode 100644 index 0000000..9a83b84 --- /dev/null +++ b/mvt/android/parsers/backup.py @@ -0,0 +1,195 @@ +# Mobile Verification Toolkit (MVT) +# Copyright (c) 2021-2022 The MVT Project Authors. +# Use of this software is governed by the MVT License 1.1 that can be found at +# https://license.mvt.re/1.1/ + +import io +import zlib +import json +import tarfile +import hashlib +import datetime +import pyaes +from mvt.common.utils import check_for_links, convert_timestamp_to_iso + + +PBKDF2_KEY_SIZE = 32 + + +class AndroidBackupParseError(Exception): + """Exception raised file parsing an android backup file""" + + +class AndroidBackupNotImplemented(AndroidBackupParseError): + pass + + +class InvalidBackupPassword(AndroidBackupParseError): + pass + + +def decrypt_master_key_blob(key, aes_iv, cipher_text): + """ + Decrypt the master key blob with AES + From : https://github.com/FloatingOctothorpe/dump_android_backup + """ + + aes = pyaes.AESModeOfOperationCBC(key, aes_iv) + + plain_text = b'' + while len(plain_text) < len(cipher_text): + offset = len(plain_text) + plain_text += aes.decrypt(cipher_text[offset:(offset + 16)]) + + blob = io.BytesIO(plain_text) + master_iv_length = ord(blob.read(1)) + master_iv = blob.read(master_iv_length) + master_key_length = ord(blob.read(1)) + master_key = blob.read(master_key_length) + master_key_checksum_length = ord(blob.read(1)) + master_key_checksum = blob.read(master_key_checksum_length) + + return master_iv, master_key, master_key_checksum + + +def to_utf8_bytes(input_bytes): + """Emulate bytes being converted into a "UTF8 byte array" + For more info see the Bouncy Castle Crypto package Strings.toUTF8ByteArray + method: + https://github.com/bcgit/bc-java/blob/master/core/src/main/java/org/bouncycastle/util/Strings.java#L142 + From https://github.com/FloatingOctothorpe/dump_android_backup + """ + output = [] + for byte in input_bytes: + if byte < ord(b'\x80'): + output.append(byte) + else: + output.append(ord('\xef') | (byte >> 12)) + output.append(ord('\xbc') | ((byte >> 6) & ord('\x3f'))) + output.append(ord('\x80') | (byte & ord('\x3f'))) + return bytes(output) + + +def parse_sms_backup(data, password=None): + """ + Parse a backup file and returns SMS in it + """ + tardata = parse_backup_file(data, password=password) + return parse_tar_for_sms(tardata) + + +def parse_ab_header(data): + """ + Parse the header of an Android Backup file + Returns a dict {'backup': True, 'compression': False, + 'encryption': "none", 'version': 4} + """ + if data.startswith(b"ANDROID BACKUP"): + [magic_header, version, is_compressed, encryption, tar_data] = data.split(b"\n", 4) + return { + "backup": True, + "compression": (is_compressed == b"1"), + "version": int(version), + "encryption": encryption.decode("utf-8") + } + return { + "backup": False, + "compression": None, + "version": None, + "encryption": None + } + + +def parse_backup_file(data, password=None): + """ + Parse an ab file, returns a tar file + Inspired by https://github.com/FloatingOctothorpe/dump_android_backup + """ + if not data.startswith(b"ANDROID BACKUP"): + raise AndroidBackupParseError("Invalid file header") + + [magic_header, version, is_compressed, encryption, tar_data] = data.split(b"\n", 4) + version = int(version) + is_compressed = int(is_compressed) + if is_compressed == 1: + raise AndroidBackupNotImplemented("Compression is not implemented") + + if encryption != b"none": + if encryption != b"AES-256": + raise AndroidBackupNotImplemented("Encryption Algorithm not implemented") + if password is None: + raise InvalidBackupPassword() + [user_salt, checksum_salt, pbkdf2_rounds, user_iv, master_key_blob, encrypted_data] = tar_data.split(b"\n", 5) + user_salt = bytes.fromhex(user_salt.decode("utf-8")) + checksum_salt = bytes.fromhex(checksum_salt.decode("utf-8")) + pbkdf2_rounds = int(pbkdf2_rounds) + user_iv = bytes.fromhex(user_iv.decode("utf-8")) + master_key_blob = bytes.fromhex(master_key_blob.decode("utf-8")) + + key = hashlib.pbkdf2_hmac('sha1', + password.encode('utf-8'), + user_salt, + pbkdf2_rounds, + PBKDF2_KEY_SIZE) + try: + [master_iv, master_key, master_key_checksum] = decrypt_master_key_blob(key, user_iv, master_key_blob) + except TypeError: + raise InvalidBackupPassword() + + if version > 1: + hmac_mk = to_utf8_bytes(master_key) + else: + hmac_mk = master_key + + calculated_checksum = hashlib.pbkdf2_hmac('sha1', + hmac_mk, + checksum_salt, + pbkdf2_rounds, + PBKDF2_KEY_SIZE) + + if master_key_checksum != calculated_checksum: + raise InvalidBackupPassword() + + decrypter = pyaes.Decrypter(pyaes.AESModeOfOperationCBC(master_key, master_iv)) + tar_data = decrypter.feed(encrypted_data) + + return tar_data + + +def parse_tar_for_sms(data): + """ + Extract SMS from a tar backup archive + Returns an array of SMS + """ + dbytes = io.BytesIO(data) + tar = tarfile.open(fileobj=dbytes) + try: + member = tar.getmember("apps/com.android.providers.telephony/d_f/000000_sms_backup") + except KeyError: + return [] + + dhandler = tar.extractfile(member) + return parse_sms_file(dhandler.read()) + + +def parse_sms_file(data): + """ + Parse an SMS file extracted from a folder + Returns a list of SMS entries + """ + res = [] + data = zlib.decompress(data) + json_data = json.loads(data) + + for entry in json_data: + message_links = check_for_links(entry["body"]) + utc_timestamp = datetime.datetime.utcfromtimestamp(int(entry["date"]) / 1000) + entry["isodate"] = convert_timestamp_to_iso(utc_timestamp) + entry["direction"] = ("sent" if int(entry["date_sent"]) else "received") + + # If we find links in the messages or if they are empty we add them to the list. + if message_links or entry["body"].strip() == "": + entry["links"] = message_links + res.append(entry) + + return res diff --git a/setup.py b/setup.py index fe71b9e..ea00db6 100755 --- a/setup.py +++ b/setup.py @@ -29,6 +29,7 @@ requires = ( # Android dependencies: "adb-shell>=0.4.2", "libusb1>=2.0.1", + "pyaes>=1.6.1" ) diff --git a/tests/android/__init__.py b/tests/android/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/tests/android/test_backup_module.py b/tests/android/test_backup_module.py new file mode 100644 index 0000000..347e4e3 --- /dev/null +++ b/tests/android/test_backup_module.py @@ -0,0 +1,29 @@ +# Mobile Verification Toolkit (MVT) +# Copyright (c) 2021-2022 The MVT Project Authors. +# Use of this software is governed by the MVT License 1.1 that can be found at +# https://license.mvt.re/1.1/ + +import logging +import os + +from mvt.android.modules.backup.sms import SMS +from mvt.common.module import run_module + +from ..utils import get_android_backup_folder + + +class TestBackupModule: + def test_module_folder(self): + fpath = get_android_backup_folder() + mod = SMS(base_folder=fpath, log=logging) + run_module(mod) + assert len(mod.results) == 1 + assert len(mod.results[0]["links"]) == 1 + assert mod.results[0]["links"][0] == "https://google.com/" + + def test_module_file(self): + fpath = os.path.join(get_android_backup_folder(), "backup.ab") + mod = SMS(base_folder=fpath, log=logging) + run_module(mod) + assert len(mod.results) == 1 + assert len(mod.results[0]["links"]) == 1 diff --git a/tests/android/test_backup_parser.py b/tests/android/test_backup_parser.py new file mode 100644 index 0000000..ca754cb --- /dev/null +++ b/tests/android/test_backup_parser.py @@ -0,0 +1,50 @@ +# Mobile Verification Toolkit (MVT) +# Copyright (c) 2021-2022 The MVT Project Authors. +# Use of this software is governed by the MVT License 1.1 that can be found at +# https://license.mvt.re/1.1/ + +import logging +import hashlib + +from mvt.android.parsers.backup import parse_backup_file, parse_tar_for_sms + +from ..utils import get_artifact + + +class TestBackupParsing: + def test_parsing_noencryption(self): + file = get_artifact("android_backup/backup.ab") + with open(file, "rb") as f: + data = f.read() + ddata = parse_backup_file(data) + + m = hashlib.sha256() + m.update(ddata) + assert m.hexdigest() == "0799b583788908f06bccb854608cede375041ee878722703a39182edeb008324" + sms = parse_tar_for_sms(ddata) + assert isinstance(sms, list) == True + assert len(sms) == 1 + assert len(sms[0]["links"]) == 1 + assert sms[0]["links"][0] == "https://google.com/" + + def test_parsing_encryption(self): + file = get_artifact("android_backup/backup2.ab") + with open(file, "rb") as f: + data = f.read() + ddata = parse_backup_file(data, password="123456") + + m = hashlib.sha256() + m.update(ddata) + assert m.hexdigest() == "f365ace1effbc4902c6aeba241ca61544f8a96ad456c1861808ea87b7dd03896" + sms = parse_tar_for_sms(ddata) + assert isinstance(sms, list) == True + assert len(sms) == 1 + assert len(sms[0]["links"]) == 1 + assert sms[0]["links"][0] == "https://google.com/" + + + + + + + diff --git a/tests/artifacts/android_backup/apps/com.android.providers.telephony/d_f/000000_sms_backup b/tests/artifacts/android_backup/apps/com.android.providers.telephony/d_f/000000_sms_backup new file mode 100644 index 0000000..14d85a2 --- /dev/null +++ b/tests/artifacts/android_backup/apps/com.android.providers.telephony/d_f/000000_sms_backup @@ -0,0 +1,2 @@ +xEN] +0 JZku;wC[xwS|ʇ"|2کG0<%90G01괎9'd8#L*Khtd5w#<)RH!!~2zs*!yUwHk|!#Wp2&Ky-Ry! zaf<#IUuz!xe>cdZ^KAae%atJ6`gPX@sKa5_+v(~0F67kQolbu+*oj_TlWdFAEhuTN!4^qIwfPvvxA~PPD zsOA)Ga$=Rc=>^pvB56$8v8jeVpoN-nUW|z*wUV=-{$!|XFV$L!+?66%C#C@;UUG3Y zrDCnshBCI*Yp5ZF7N}d*?5fL@l9y6@YM3G5Gz$L4sM4&OMlFeILZB59AOPw{6VPfZ zVldrg1HjuFnox}ep=NF+CdZ%wO}%xj3s$t(9wQs4p_!&6S+SfZ09dudI$iIz)#OVg zF2zOXQs9E9QT@zKELftVoXOgt-uEun6$kjL4r<+lUA(gAR6|rtS#9nq20S}QEFgBK zTpJ@amOwFfGvKQLY_=zf1ZSLE0bfuMK&Y~CGgf+nLC&3B_mO&6w5UdEv0wuN1Lj1u zT8=;&SkM(^wGLm&RGLw^VDA!l7h17dCE3IQ>w}W-TA=7F_hmk{vRolBJg$ofgbz## zhQz4kIN>YK~0 z0m#KW_Bkg-lR@LNsnF2c(I=()p_%nyMXIJsoCRnZdXkt9V4CW?lm;?EcL}K41SGh~ z!Pt&K8(@dl*L`oHaSp!40@hR4s&NjUO;po05BTbR0HHV4h3GW}<3K$qN=|6u1B7cX zr4v^lqBxU*O^5%*fFoK2pe2^XsyKw8WA~00?+L(C1A9hshN#Z;RB|&=S0E{+9JK}{ zML2_s8e1tva|3T@Ho#JM*#w{q`2xYZO2vV4UB&vj`BY8EaRxq8Mu%t9K+~y#3zPQ+ z0Z=On(v$*bp(>3WsIy1*#y4%Cy4JwFKKGolYDni!6v55NwZ-ZTC#j|-O|8^vrE%O z>h8S@=eNBg0X;O8%5AXW<-G`6t-h2Dg`>xw|YW=Z;k9>B`4bSc0cmGw-d~o3OFW-Io{5v4{r+{ zF+xK%Gc+n0CMz5gRQA19e4zWEUDFyDk6j|QFm!P*W$!mA4n=OlNg|h1g zsu!~u^mz}b(_YIK`J{$!f3)Lia;4!VfR0b2)q3AuyC{e2^tii6%?REa1WJ)ICmRih zLxlir7q*dnY!9;(5bWoJFkR~H(JFS2`RPz&zV6gkc&1u_&7WGcFHKnN+W)pn-w@@U zvpG#6YFnT1F{`oFo?0oZtS~2M;|1>~kLNt)OJZ=u<6t2CDT=}vA_ZySDyc?rBXN~U7Ri0``7V;`Q(&UWt631@nBrG)_OCgAh7 ze*=|q&Mv#f2mmq0qt6{<;_uRDFj*~|P|0g2^9kT~Bn=2r8AyDFCU76fZ7i5>EgZ`@ zmWI&4cUxo8s-8rEj~ByrBlTFQTXP1Ds)EXtK+O0@_p4S!pJ2!MGO;+E zY^JeFjAiEQrhDYUE#cb!_+rgI0BocRqFp>^df4?2MoK*Dt0SCVXUSTX8AXlGM^Xom zte$V-w0?nxi_qV0KJ3TzWckrn1QdGI65tN33t}x%?UlkuPK0-Nti717ZQGOV$JYQ; zD&=EP9KLx}6w{^d2-~w?wEZ4G>~wa6gE>m=Hu*_d?En}BN#G7@xEz6n^tRC%ty+6Y z#2hJcfFfCAbJ=s{_6YfL%zY0e&BN>xRhDd4>6P4kAH_!i9mUfCLZC5+-hcXv^)@jNVqmJa0C_k0x3kg}IOz|63HL&XE z@Xo?-oyJ5%Bf( zFrUNIvUuSP9ncx<<}+Bfy|6tGo0y9gzoLLQjD1!(vRP%z7w05er-*h*1-aUYj01*W#c_`L3_A%-}+?xq#<{~gH)nw zj#JM?HY+$3rMyG=;EN6a4O{0DRIk_^^x8g724(%l%Ii}I_gEB57*gms^mDZ@N|Mz5 z0=2316+Io=2dM2Z69o4p3hZh}&0Z(``hFKW_NP|GJzy$DPOh%&)W-tQs2uT(+!3=) z!j^rF3%*E3AKw3rLm(H0)dzC9-lW|)mlL%iqSXJdQC`xPIIPzYZQcvl9pzivwocjc z_57z)r2KHB?!%yNk`+ARP8?woDP3$Z;q!uo5L?sA_3p@wjCs+YCFBo{7v^}ZGtK9? zfT8yq`g2W#^oGwovU^be@0wH$b+&$S%A~E$pCMlK>7)GyfKjA`orodMDuvpdO@Sqq zCV&9$=(Jh&l;&F3R%g*$y?vaXvcKQ@R<6dN`Ay?R0fzmACRXS@UD5+=WU6H{Q}vzg zBpG@Y#orqnWt8lVNr76YTq8+r^3eJ#4^_@6FX0}W3bkbQ;e#$=U1QX&NSjw4>n1lU zYU}kQ!Doa~jsc<{fj`4DTYPBUM@MTxhUaqK__01FK(qymq$EW8-%6(UB0ViZ`-{4^ zS@*2V>XL^xFpBqPFhlC5?QiK z4WG*B@w_%=e_9bg(jO|CdB%Yf$}-Mh45d?x4vTmJl<%m~yR(R=A!w}@rZ1s4HTbK^@W3O;FpE^< zNHT4)9+?eTUnPGUJpipL4RT-j4Y1bW*g4n+Bs?o}zh>_NMXRM~>#97OoU+Q!gFI3! z$QE|hQ@X7NxDY^X4Jn%hlQJXKTjzhsQsE;+`jxBz^B^mrD&_s{CBZ|f#tLZPU!!`Q zW1&?Lu6C$M;sV)~)vP0L@p{9$1MLF;VmnHCD>2+qbZ(P(YbkM@>_2BW^t8l>xPJ)^ z7A8KqzFpo4vp!vPB(N>o!E|M?-+GQju%ta8YD+YB(MFb){a1ll>7|bG_UXa$fXpZq zS5SXTIhEUMa@0gSZu>Z1Wve*MeUI&8jjVCN@F-_$Mt!Urvcu$IzWSevf{yyU(;0OHoG~^I{vjr&g*8`@}%43 zGN8ErktZo6U6o3?g=9y6l{OMLQqJY=QV%7}V zOr@Cxw_RQ=`o-uH!y|t;=S!{Bsb{v8M%HeDN5lQv)F=xI{z7al3W7{zW&QNc@1U}8 zEiI7du@O?zh<{m-=YIHps*~;>b~)u*TTS>LD#=WikOxLS>;}^VSNiHCpS=F zuit7}vrsEtWBDz;W{R>JU>jE|!(l0RtGAiF=hUY%ojlJn(P)ADc|94YS_pm=Z^U3(e?nF!2hFYhUxj zctZJC?zTWkoGs+{;W_k&Ot2Q%(rKO~P~UU2>hQvnTHT|{8{_V_u>oO=VK5t}UQf2~ zB7)D&6$`VCMY@e>B&z~c^O;5TWh9noK`7IJ$K*kJc6Ah2F^s$kNz9!mgY?(H&9`&4 z$>Zp$9Zzs{pkERZDyeych{$7{$(;_|qf8kr2l;7CQe&4gv6Vn0T)!@a#}n)9!$+WB zp;(25@SZGpB9=qw53)BK#A+ENisDY4(0y@wEMX>Af~=v*(R36MHX>py8KoYVvqq1h z5D8pKIg`hm40b2TD3;=xa3pkaQp9$xor$K&W$TJtx}~O9G1d*7GvR`Pa($jBkIHuI zCZf5T5+tx=N z_V#ST#3y>J%Gs$i&qwGX=>%&EMm)2yu`q(>n=ZeuM*$K}c>s+2vb z8<5JC9e=U!0D#cj^rdLgJbk#hZ!BlZ560i_vVm&{a4%lXBXi1_@AvSdb zr*O6wsZ;2_kInPY>ZOZvlbl0QtJ4c{2FI9$b3Bx~7;3D)LGkxtK5U*QlAV)uP$*X$ z&+_i?x}Va!7)(O#0$ry2WpsoNHG}wa(=-FzF|Fq?4vq(rh!LIn^i}SP8-A<~Ip{RP zApNpVQ<6Y=YLpuh5a`>PGS;_;}@n{fGQ7bur8D@Mslm!+xczB_P8i zw6|(C;G+^e^o`Rl-71seH+-f-fuw``CLUF59>gXAllyD zLQFU{j1{uwe0(L_C-l3JOy6NcnqO{=qanFZ!${vpvhtieXI5vRZ8`|-C_L|j=Yc|t zXtOc878kbCf0N0jkv~jKx^*H6TI#N9MuVJvxxJJ*HJS=>f%s|{1$A4Sqq=6`k_RC6 zEgjcUy>nVGm+!TTFjQX8R^&GchhFcYOQ&irI( zEIZB08BmdM?ff~;5RD$n=ILIZp;^|0<(RAE5&&$$Z*rU5>p~gwi5^+?eha<_xUl&&R2o=XCqiYd)^Oq+A>xw-$8uvH zi5`=SSRA@3TfTo@H`BvZ14~^CI^sH^f)EetT*b$Vh7eYkT>xroF*DODANuc8qy^Vg zE}0Xjdwp(U(Z7T(v zF|I`KaUu9mhCKk#vf?&_SJwm|eOx!~KlbDA^7$&)tnSY*2_23+NSQj&)IpRt}$ zoC$hw=f0*^!VN{M%jjB~!kxpF2Jm#qGHp5sMtG2_pm#W(sn8o1eor6K&0x zFBEyyi#~(!*}*VPQ`anrCn>;fx)UAFPBwbQ=3}-wW9S|?CokF@BF$NU19>BWG+ivl zsnLTA!bwy@#SX8H?6uiI2Hs_G%cOv(`NO;~cH$2F|K;V`^r6mr>!DQR45v@nK+};i z_Eu(npqP>NNE;$bvu9%tNPE;ZJL7hxwWL#of_`)XUqkhi2RyCbd|t~XrKlmmjJcUt z7z<_C#l9l^o12NvcAE>)m=Q5X{G84P&j42IsYR)5z~owG5xhtSJ6Q^Tf$6;qCr+Vj z>dH5C%zUsj{(%(FT*}rWjj=8{Ajo&o#B=OR`Bb1&Q8h@GwGDj3`d1yr`;e;;_$E>I&Nu#FhV2| zl{mjuSd+qGND)-Kn}<6l2|n}Fo~d*fr{MWm7b&g*@Y2$T$J_#f3#(oG(}3!^7!ZFI z6w>qacXC?E$H{qt&2|61(J?N(4z#jGh9>~k{z`CgTzEi%?QE0P!^8aNlF339)ayGg z>p#H!ok)$k#~D0JxT0$3dbh&XQ_+>_tqFNT_<-r@h_WUOjp!8D4Sr{A52TV>9v0_Z zce8+oz`~U5dm8Sr8X9O7E1ET`JOg5B;sp($hT1!Kola+C^F#MZ%7*7csg!i(I87g& zOj3)>HTh<9NU_>jun*W+)zCJhFn|G;NF04r$1|DEnQ(e zw905A4nSJ|-t`b9wtm@9si@`#B)2*dJ_~zKn-1w56}c5gGg;F-STOARg)OU-AMDtU z#Mfi(P(+>mJwsxSPq(at$iB%FXNfu{CXSXC2Z~nNojomCkc}jW#hatvbSipZfIw{E zT^Rdkv;-QtXa-#5?TARo%JG&ST+&aT=<{2 zvKseOmU&b?Z#s(SvP7Btmy=L!+0+^gr!(E;7)khFI;X26GM)h)0(CrUn*-+MUP58CeCrO9l=DiyA zeZxQ(>o?L}iR<~zs9Dww(3`TL0vi){8!RwY{6a(4?&*~U(M#E(N0Zf%AxM^kdaF@6 z;XkcJjj}n9jEwDjp@o5!rDyr@(C4Y#_nK9qsGi%v17K;*w*0XEcPkJ|y0i?jd7N@Q zG-KOe<+-VW7DSC5aZniWWLvPeXi?JUS;5tpC*`n)YXEt2 z`!MBJ=qU>h&_RyUQQlG=_y@vF!eh#Ws|aP*o73hlR??wLC2#AnHvhcV90n$&r|SqY z#zP@EtBHFtF#J&2Sn+T=f^)zzdCs%Di0eEfMf;xuDvocNQy1l#QdTK4=Az! z+UpFl=`H6XyE$It0JYM-$;d8dXjiI+C|b zPTV-mSvT6ohyHEA#G!^s-*g|qaaVFE=c5{_&`O^`5Eo&f!NVajQy#YchzjwxMT1{J vw1zVpAE8iN_mC!tp4FrXE;pxi Date: Wed, 23 Feb 2022 16:18:45 +0100 Subject: [PATCH 04/10] Adds partial compression support in Android Backup parsing --- mvt/android/modules/adb/sms.py | 12 ++++++------ mvt/android/modules/backup/sms.py | 12 +++++++----- mvt/android/parsers/backup.py | 16 ++++++++++------ tests/android/test_backup_parser.py | 18 +++++++++++++----- tests/artifacts/android_backup/backup3.ab | Bin 0 -> 1393 bytes 5 files changed, 36 insertions(+), 22 deletions(-) create mode 100644 tests/artifacts/android_backup/backup3.ab diff --git a/mvt/android/modules/adb/sms.py b/mvt/android/modules/adb/sms.py index b8f9333..10000db 100644 --- a/mvt/android/modules/adb/sms.py +++ b/mvt/android/modules/adb/sms.py @@ -10,7 +10,7 @@ import base64 import getpass from mvt.common.utils import check_for_links, convert_timestamp_to_iso -from mvt.android.parsers.backup import parse_ab_header, parse_sms_backup, InvalidBackupPassword +from mvt.android.parsers.backup import parse_ab_header, parse_sms_backup, InvalidBackupPassword, AndroidBackupParsingError from mvt.common.module import InsufficientPrivileges from .base import AndroidExtraction @@ -128,10 +128,6 @@ class SMS(AndroidExtraction): self.log.error("Extracting SMS via Android backup failed. No valid backup data found.") return - if header["compression"]: - self.log.error("The backup is compressed and cannot be parsed, quitting...") - return - pwd = None if header["encryption"] != "none": pwd = getpass.getpass(prompt="Backup Password: ", stream=None) @@ -140,8 +136,12 @@ class SMS(AndroidExtraction): try: self.results = parse_sms_backup(backup_output, password=pwd) except InvalidBackupPassword: - self.info.log("Invalid backup password") + self.log.info("Invalid backup password") return + except AndroidBackupParsingError: + self.log.info("Impossible to read SMS from the Android Backup, please extract the SMS and try extracting it with Android Backup Extractor") + return + log.info("Extracted a total of %d SMS messages containing links", len(self.results)) def run(self): diff --git a/mvt/android/modules/backup/sms.py b/mvt/android/modules/backup/sms.py index 12d3384..71247d4 100644 --- a/mvt/android/modules/backup/sms.py +++ b/mvt/android/modules/backup/sms.py @@ -8,7 +8,7 @@ import getpass from mvt.common.module import MVTModule from mvt.common.utils import check_for_links -from mvt.android.parsers.backup import parse_sms_file, parse_sms_backup, parse_ab_header, InvalidBackupPassword +from mvt.android.parsers.backup import parse_sms_file, parse_sms_backup, parse_ab_header, InvalidBackupPassword, AndroidBackupParsingError class SMS(MVTModule): @@ -48,10 +48,7 @@ class SMS(MVTModule): if not header["backup"]: self.log.info("Not a valid Android Backup file, quitting...") return - if header["compression"]: - self.log.info("MVT does not support compressed backups, either regenerate the backup with the -nocompress option or use ANdroid Backup Extractor to convert it to a tar file") - self.log.info("Quitting...") - return + pwd = None if header["encryption"] != "none": pwd = getpass.getpass(prompt="Backup Password: ", stream=None) @@ -61,6 +58,11 @@ class SMS(MVTModule): except InvalidBackupPassword: self.log.info("Invalid password, impossible de decrypt the backup, quitting...") return + except AndroidBackupParsingError: + self.log.info("Impossible to extract data from this Android Backup, please regenerate the backup using the -nocompress option or extract it using Android Backup Extractor instead.") + self.log.info("Quitting...") + return + self.results = messages else: app_folder = os.path.join(self.base_folder, diff --git a/mvt/android/parsers/backup.py b/mvt/android/parsers/backup.py index 9a83b84..180e287 100644 --- a/mvt/android/parsers/backup.py +++ b/mvt/android/parsers/backup.py @@ -16,15 +16,15 @@ from mvt.common.utils import check_for_links, convert_timestamp_to_iso PBKDF2_KEY_SIZE = 32 -class AndroidBackupParseError(Exception): +class AndroidBackupParsingError(Exception): """Exception raised file parsing an android backup file""" -class AndroidBackupNotImplemented(AndroidBackupParseError): +class AndroidBackupNotImplemented(AndroidBackupParsingError): pass -class InvalidBackupPassword(AndroidBackupParseError): +class InvalidBackupPassword(AndroidBackupParsingError): pass @@ -106,13 +106,11 @@ def parse_backup_file(data, password=None): Inspired by https://github.com/FloatingOctothorpe/dump_android_backup """ if not data.startswith(b"ANDROID BACKUP"): - raise AndroidBackupParseError("Invalid file header") + raise AndroidBackupParsingError("Invalid file header") [magic_header, version, is_compressed, encryption, tar_data] = data.split(b"\n", 4) version = int(version) is_compressed = int(is_compressed) - if is_compressed == 1: - raise AndroidBackupNotImplemented("Compression is not implemented") if encryption != b"none": if encryption != b"AES-256": @@ -153,6 +151,12 @@ def parse_backup_file(data, password=None): decrypter = pyaes.Decrypter(pyaes.AESModeOfOperationCBC(master_key, master_iv)) tar_data = decrypter.feed(encrypted_data) + if is_compressed == 1: + try: + tar_data = zlib.decompress(tar_data) + except zlib.error: + raise AndroidBackupParsingError("Impossible to decompress the file") + return tar_data diff --git a/tests/android/test_backup_parser.py b/tests/android/test_backup_parser.py index ca754cb..766875c 100644 --- a/tests/android/test_backup_parser.py +++ b/tests/android/test_backup_parser.py @@ -42,9 +42,17 @@ class TestBackupParsing: assert len(sms[0]["links"]) == 1 assert sms[0]["links"][0] == "https://google.com/" + def test_parsing_compression(self): + file = get_artifact("android_backup/backup3.ab") + with open(file, "rb") as f: + data = f.read() + ddata = parse_backup_file(data) - - - - - + m = hashlib.sha256() + m.update(ddata) + assert m.hexdigest() == "33e73df2ede9798dcb3a85c06200ee41c8f52dd2f2e50ffafcceb0407bc13e3a" + sms = parse_tar_for_sms(ddata) + assert isinstance(sms, list) == True + assert len(sms) == 1 + assert len(sms[0]["links"]) == 1 + assert sms[0]["links"][0] == "https://google.com/" diff --git a/tests/artifacts/android_backup/backup3.ab b/tests/artifacts/android_backup/backup3.ab new file mode 100644 index 0000000000000000000000000000000000000000..6ab7bc9ff05efa9790fc24cc3efed106bb1a25d5 GIT binary patch literal 1393 zcmV-%1&;bbPDD~qNkkw*K|@PbPzp5)F$!*PZecy*R;3Vk*J%mOL3ZkKj6chv!4I~N*5}~4+LP67iaW$c@81D**cnCfw5<4 zx9)-~v;XJdxr-EkPRjI~qdA^)l!biv%Xe3s5;+oLoI-`D<645I(x|KU=JJvOZwwppB z7oFhXKp%c~OU+}W#@xO|mBGLWbXB7ed@pg&3B4XwyiCRx5SQ7bvR?9~|$ z85KE?i7aIsD41|SYrzvAR3u66wG>6vI^{BGb50WvL(-d9=_pOmQhm$0qWsp8JlBH7 z7Gvy@hBGU^`9e7dt!?Cv8G^S-@tBfncP5>BRZ|2zm2>jEm|a-dgCs^o~ZZ!E-1j8{C9TE?1g9?Z~vY zFr{Ner`jwx;-cu5DJqqL#*`L@)($ek_ag$e8J*x0M{xs=cC;OmH^S(#bBUp+3ct+v zCf;LZNhZU})L&yvO&Fy4n1baf9@GO+as~*d7H*~1LEJ)2>ZPDJJ)Wz8BYHw>XsVgb zun9r6#{ftIp|MP0PYft*UPi8^3)BsrlygbeB9fw9Kt)Ke)@r2#tc3+VFnp1Su0Xy( zu)b0Cpj_V&KlhNE45S@v~fBWFE!`t`V|J>f&HlJL3^uXg^-g3u_`}RJ3 z)AJwiKlS^MU%d4Gov)w#`o#CY?w2D!9NKcjoAl9|O?O{YZhZAqdgYrfYrZ1B_2h>K z%jfsp{@JGOTdzHS@77<|u3vcL@U|m=p4fZrg+C5HS&zSW-_dkn-}>;#YwHglx{F`_ z?$rF+cYofz@aWq=UH9$H;hovlab?F}00030{~86OU=)m800000|NjF3#3t;$eha*e literal 0 HcmV?d00001 From 86c79075ff51a3b84cefbe81e1b20d29ccc8a65f Mon Sep 17 00:00:00 2001 From: tek Date: Fri, 4 Mar 2022 10:10:56 +0100 Subject: [PATCH 05/10] Reorganise code for backup modules --- mvt/android/cli.py | 48 +++++++++++++++++++++ mvt/android/modules/backup/base.py | 46 ++++++++++++++++++++ mvt/android/modules/backup/sms.py | 66 ++++------------------------- tests/android/test_backup_module.py | 53 ++++++++++++++++++++++- 4 files changed, 154 insertions(+), 59 deletions(-) create mode 100644 mvt/android/modules/backup/base.py diff --git a/mvt/android/cli.py b/mvt/android/cli.py index 9e7d220..38e2d04 100644 --- a/mvt/android/cli.py +++ b/mvt/android/cli.py @@ -5,6 +5,9 @@ import logging import os +import getpass +import io +import tarfile from pathlib import Path from zipfile import ZipFile @@ -17,6 +20,8 @@ from mvt.common.help import (HELP_MSG_FAST, HELP_MSG_IOC, from mvt.common.indicators import Indicators, download_indicators_files from mvt.common.logo import logo from mvt.common.module import run_module, save_timeline +from mvt.android.parsers.backup import parse_ab_header, parse_backup_file +from mvt.android.parsers.backup import InvalidBackupPassword, AndroidBackupParsingError from .download_apks import DownloadAPKs from .lookups.koodous import koodous_lookup @@ -246,6 +251,44 @@ def check_bugreport(ctx, iocs, output, list_modules, module, bugreport_path): def check_backup(ctx, iocs, output, backup_path, serial): log.info("Checking ADB backup located at: %s", backup_path) + if os.path.isfile(backup_path): + # AB File + backup_type = "ab" + with open(backup_path, "rb") as handle: + data = handle.read() + header = parse_ab_header(data) + if not header["backup"]: + log.critical("Invalid backup format, file should be in .ab format") + ctx.exit(1) + password = None + if header["encryption"] != "none": + password = getpass.getpass(prompt="Backup Password: ", stream=None) + try: + tardata = parse_backup_file(data, password=password) + except InvalidBackupPassword: + log.critical("Invalid backup password") + ctx.exit(1) + except AndroidBackupParsingError: + log.critical("Impossible to parse this backup file, please use Android Backup Extractor instead") + ctx.exit(1) + + dbytes = io.BytesIO(tardata) + tar = tarfile.open(fileobj=dbytes) + files = [] + for member in tar: + files.append(member.name) + + elif os.path.isdir(backup_path): + backup_type = "folder" + backup_path = Path(backup_path).absolute().as_posix() + files = [] + for root, subdirs, subfiles in os.walk(os.path.abspath(backup_path)): + for fname in subfiles: + files.append(os.path.relpath(os.path.join(root, fname), backup_path)) + else: + log.critical("Invalid backup path, path should be a folder or an Android Backup (.ab) file") + ctx.exit(1) + if output and not os.path.exists(output): try: os.makedirs(output) @@ -265,6 +308,11 @@ def check_backup(ctx, iocs, output, backup_path, serial): if serial: m.serial = serial + if backup_type == "folder": + m.from_folder(backup_path, files) + else: + m.from_ab(backup_path, tar, files) + run_module(m) diff --git a/mvt/android/modules/backup/base.py b/mvt/android/modules/backup/base.py new file mode 100644 index 0000000..8af5dc8 --- /dev/null +++ b/mvt/android/modules/backup/base.py @@ -0,0 +1,46 @@ +# Mobile Verification Toolkit (MVT) +# Copyright (c) 2021-2022 The MVT Project Authors. +# Use of this software is governed by the MVT License 1.1 that can be found at +# https://license.mvt.re/1.1/ + +import os +import fnmatch + +from mvt.common.module import MVTModule + + +class BackupExtraction(MVTModule): + """This class provides a base for all backup extractios modules""" + ab = None + + def from_folder(self, backup_path, files): + """ + Get all the files and list them + """ + self.backup_path = backup_path + self.files = files + + def from_ab(self, file_path, tar, files): + """ + Extract the files + """ + self.ab = file_path + self.tar = tar + self.files = files + + def _get_files_by_pattern(self, pattern): + return fnmatch.filter(self.files, pattern) + + def _get_file_content(self, file_path): + if self.ab: + try: + member = self.tar.getmember(file_path) + except KeyError: + return None + handle = self.tar.extractfile(member) + else: + handle = open(os.path.join(self.backup_path, file_path), "rb") + + data = handle.read() + handle.close() + return data diff --git a/mvt/android/modules/backup/sms.py b/mvt/android/modules/backup/sms.py index 71247d4..2f09ccf 100644 --- a/mvt/android/modules/backup/sms.py +++ b/mvt/android/modules/backup/sms.py @@ -3,20 +3,18 @@ # Use of this software is governed by the MVT License 1.1 that can be found at # https://license.mvt.re/1.1/ -import os -import getpass - -from mvt.common.module import MVTModule +from mvt.android.modules.backup.base import BackupExtraction from mvt.common.utils import check_for_links -from mvt.android.parsers.backup import parse_sms_file, parse_sms_backup, parse_ab_header, InvalidBackupPassword, AndroidBackupParsingError +from mvt.android.parsers.backup import parse_sms_file -class SMS(MVTModule): +class SMS(BackupExtraction): def __init__(self, file_path=None, base_folder=None, output_folder=None, fast_mode=False, log=None, results=[]): super().__init__(file_path=file_path, base_folder=base_folder, output_folder=output_folder, fast_mode=fast_mode, log=log, results=results) + self.results = [] def check_indicators(self): if not self.indicators: @@ -26,59 +24,13 @@ class SMS(MVTModule): if "body" not in message: continue - message_links = check_for_links(message["body"]) - if self.indicators.check_domains(message_links): + if self.indicators.check_domains(message["links"]): self.detected.append(message) - def _process_sms_file(self, file_path): - self.log.info("Processing SMS backup file at %s", file_path) - - with open(file_path, "rb") as handle: - data = handle.read() - - self.results = parse_sms_file(data) - def run(self): - # FIXME: this should be done in the Module code if there are other modules on backups - if os.path.isfile(self.base_folder): - # ab file - with open(self.base_folder, "rb") as handle: - data = handle.read() - header = parse_ab_header(data) - if not header["backup"]: - self.log.info("Not a valid Android Backup file, quitting...") - return - - pwd = None - if header["encryption"] != "none": - pwd = getpass.getpass(prompt="Backup Password: ", stream=None) - - try: - messages = parse_sms_backup(data, password=pwd) - except InvalidBackupPassword: - self.log.info("Invalid password, impossible de decrypt the backup, quitting...") - return - except AndroidBackupParsingError: - self.log.info("Impossible to extract data from this Android Backup, please regenerate the backup using the -nocompress option or extract it using Android Backup Extractor instead.") - self.log.info("Quitting...") - return - - self.results = messages - else: - app_folder = os.path.join(self.base_folder, - "apps", - "com.android.providers.telephony", - "d_f") - if not os.path.exists(app_folder): - self.log.info("Unable to find the SMS backup folder") - return - - for file_name in os.listdir(app_folder): - if not file_name.endswith("_sms_backup"): - continue - - file_path = os.path.join(app_folder, file_name) - self._process_sms_file(file_path) - + for file in self._get_files_by_pattern("apps/com.android.providers.telephony/d_f/*_sms_backup"): + self.log.info("Processing SMS backup file at %s", file) + data = self._get_file_content(file) + self.results.extend(parse_sms_file(data)) self.log.info("Extracted a total of %d SMS messages containing links", len(self.results)) diff --git a/tests/android/test_backup_module.py b/tests/android/test_backup_module.py index 347e4e3..7324931 100644 --- a/tests/android/test_backup_module.py +++ b/tests/android/test_backup_module.py @@ -5,17 +5,25 @@ import logging import os +import tarfile +import io from mvt.android.modules.backup.sms import SMS from mvt.common.module import run_module +from mvt.android.parsers.backup import parse_backup_file from ..utils import get_android_backup_folder class TestBackupModule: def test_module_folder(self): - fpath = get_android_backup_folder() - mod = SMS(base_folder=fpath, log=logging) + backup_path = get_android_backup_folder() + mod = SMS(base_folder=backup_path, log=logging) + files = [] + for root, subdirs, subfiles in os.walk(os.path.abspath(backup_path)): + for fname in subfiles: + files.append(os.path.relpath(os.path.join(root, fname), backup_path)) + mod.from_folder(backup_path, files) run_module(mod) assert len(mod.results) == 1 assert len(mod.results[0]["links"]) == 1 @@ -24,6 +32,47 @@ class TestBackupModule: def test_module_file(self): fpath = os.path.join(get_android_backup_folder(), "backup.ab") mod = SMS(base_folder=fpath, log=logging) + with open(fpath, "rb") as f: + data = f.read() + tardata = parse_backup_file(data) + dbytes = io.BytesIO(tardata) + tar = tarfile.open(fileobj=dbytes) + files = [] + for member in tar: + files.append(member.name) + mod.from_ab(fpath, tar, files) + run_module(mod) + assert len(mod.results) == 1 + assert len(mod.results[0]["links"]) == 1 + + def test_module_file2(self): + fpath = os.path.join(get_android_backup_folder(), "backup2.ab") + mod = SMS(base_folder=fpath, log=logging) + with open(fpath, "rb") as f: + data = f.read() + tardata = parse_backup_file(data, password="123456") + dbytes = io.BytesIO(tardata) + tar = tarfile.open(fileobj=dbytes) + files = [] + for member in tar: + files.append(member.name) + mod.from_ab(fpath, tar, files) + run_module(mod) + assert len(mod.results) == 1 + assert len(mod.results[0]["links"]) == 1 + + def test_module_file3(self): + fpath = os.path.join(get_android_backup_folder(), "backup3.ab") + mod = SMS(base_folder=fpath, log=logging) + with open(fpath, "rb") as f: + data = f.read() + tardata = parse_backup_file(data) + dbytes = io.BytesIO(tardata) + tar = tarfile.open(fileobj=dbytes) + files = [] + for member in tar: + files.append(member.name) + mod.from_ab(fpath, tar, files) run_module(mod) assert len(mod.results) == 1 assert len(mod.results[0]["links"]) == 1 From 6cc67f3c1df6f2b610afe27a18780f4e14e10217 Mon Sep 17 00:00:00 2001 From: tek Date: Fri, 4 Mar 2022 12:34:54 +0100 Subject: [PATCH 06/10] Fixes testing issue --- tests/ios/test_sms.py | 6 +++--- tests/test_check_backup.py | 4 ++-- 2 files changed, 5 insertions(+), 5 deletions(-) diff --git a/tests/ios/test_sms.py b/tests/ios/test_sms.py index 16adf3b..860f1dc 100644 --- a/tests/ios/test_sms.py +++ b/tests/ios/test_sms.py @@ -9,19 +9,19 @@ from mvt.common.indicators import Indicators from mvt.common.module import run_module from mvt.ios.modules.mixed.sms import SMS -from ..utils import get_backup_folder +from ..utils import get_ios_backup_folder class TestSMSModule: def test_sms(self): - m = SMS(base_folder=get_backup_folder(), log=logging, results=[]) + m = SMS(base_folder=get_ios_backup_folder(), log=logging, results=[]) run_module(m) assert len(m.results) == 1 assert len(m.timeline) == 1 assert len(m.detected) == 0 def test_detection(self, indicator_file): - m = SMS(base_folder=get_backup_folder(), log=logging, results=[]) + m = SMS(base_folder=get_ios_backup_folder(), log=logging, results=[]) ind = Indicators(log=logging) ind.parse_stix2(indicator_file) # Adds a file that exists in the manifest. diff --git a/tests/test_check_backup.py b/tests/test_check_backup.py index e71ef4b..c099ddd 100644 --- a/tests/test_check_backup.py +++ b/tests/test_check_backup.py @@ -7,12 +7,12 @@ from click.testing import CliRunner from mvt.ios.cli import check_backup -from .utils import get_backup_folder +from .utils import get_ios_backup_folder class TestCheckBackupCommand: def test_check(self): runner = CliRunner() - path = get_backup_folder() + path = get_ios_backup_folder() result = runner.invoke(check_backup, [path]) assert result.exit_code == 0 From a4d08f8f35df7501d069704d0a9582fa3a2ba3aa Mon Sep 17 00:00:00 2001 From: tek Date: Fri, 4 Mar 2022 15:05:10 +0100 Subject: [PATCH 07/10] Replaces pyaes with cryptography and reorganize backup parser code --- mvt/android/parsers/backup.py | 79 +++++++++++++++-------------------- setup.py | 2 +- 2 files changed, 34 insertions(+), 47 deletions(-) diff --git a/mvt/android/parsers/backup.py b/mvt/android/parsers/backup.py index 180e287..ee396d6 100644 --- a/mvt/android/parsers/backup.py +++ b/mvt/android/parsers/backup.py @@ -7,9 +7,11 @@ import io import zlib import json import tarfile -import hashlib import datetime -import pyaes +from cryptography.hazmat.primitives import hashes +from cryptography.hazmat.primitives.kdf.pbkdf2 import PBKDF2HMAC +from cryptography.hazmat.primitives.ciphers import Cipher, algorithms, modes +from cryptography.hazmat.primitives import padding from mvt.common.utils import check_for_links, convert_timestamp_to_iso @@ -28,37 +30,7 @@ class InvalidBackupPassword(AndroidBackupParsingError): pass -def decrypt_master_key_blob(key, aes_iv, cipher_text): - """ - Decrypt the master key blob with AES - From : https://github.com/FloatingOctothorpe/dump_android_backup - """ - - aes = pyaes.AESModeOfOperationCBC(key, aes_iv) - - plain_text = b'' - while len(plain_text) < len(cipher_text): - offset = len(plain_text) - plain_text += aes.decrypt(cipher_text[offset:(offset + 16)]) - - blob = io.BytesIO(plain_text) - master_iv_length = ord(blob.read(1)) - master_iv = blob.read(master_iv_length) - master_key_length = ord(blob.read(1)) - master_key = blob.read(master_key_length) - master_key_checksum_length = ord(blob.read(1)) - master_key_checksum = blob.read(master_key_checksum_length) - - return master_iv, master_key, master_key_checksum - - def to_utf8_bytes(input_bytes): - """Emulate bytes being converted into a "UTF8 byte array" - For more info see the Bouncy Castle Crypto package Strings.toUTF8ByteArray - method: - https://github.com/bcgit/bc-java/blob/master/core/src/main/java/org/bouncycastle/util/Strings.java#L142 - From https://github.com/FloatingOctothorpe/dump_android_backup - """ output = [] for byte in input_bytes: if byte < ord(b'\x80'): @@ -103,7 +75,6 @@ def parse_ab_header(data): def parse_backup_file(data, password=None): """ Parse an ab file, returns a tar file - Inspired by https://github.com/FloatingOctothorpe/dump_android_backup """ if not data.startswith(b"ANDROID BACKUP"): raise AndroidBackupParsingError("Invalid file header") @@ -124,13 +95,25 @@ def parse_backup_file(data, password=None): user_iv = bytes.fromhex(user_iv.decode("utf-8")) master_key_blob = bytes.fromhex(master_key_blob.decode("utf-8")) - key = hashlib.pbkdf2_hmac('sha1', - password.encode('utf-8'), - user_salt, - pbkdf2_rounds, - PBKDF2_KEY_SIZE) + kdf = PBKDF2HMAC( + algorithm=hashes.SHA1(), + length=32, + salt=user_salt, + iterations=pbkdf2_rounds) + key = kdf.derive(password.encode("utf-8")) + + cipher = Cipher(algorithms.AES(key), modes.CBC(user_iv)) + decryptor = cipher.decryptor() try: - [master_iv, master_key, master_key_checksum] = decrypt_master_key_blob(key, user_iv, master_key_blob) + plain_text = decryptor.update(master_key_blob) + decryptor.finalize() + + blob = io.BytesIO(plain_text) + master_iv_length = ord(blob.read(1)) + master_iv = blob.read(master_iv_length) + master_key_length = ord(blob.read(1)) + master_key = blob.read(master_key_length) + master_key_checksum_length = ord(blob.read(1)) + master_key_checksum = blob.read(master_key_checksum_length) except TypeError: raise InvalidBackupPassword() @@ -139,17 +122,21 @@ def parse_backup_file(data, password=None): else: hmac_mk = master_key - calculated_checksum = hashlib.pbkdf2_hmac('sha1', - hmac_mk, - checksum_salt, - pbkdf2_rounds, - PBKDF2_KEY_SIZE) + kdf = PBKDF2HMAC( + algorithm=hashes.SHA1(), + length=32, + salt=checksum_salt, + iterations=pbkdf2_rounds) + calculated_checksum = kdf.derive(hmac_mk) if master_key_checksum != calculated_checksum: raise InvalidBackupPassword() - decrypter = pyaes.Decrypter(pyaes.AESModeOfOperationCBC(master_key, master_iv)) - tar_data = decrypter.feed(encrypted_data) + cipher = Cipher(algorithms.AES(master_key), modes.CBC(master_iv)) + decryptor = cipher.decryptor() + tar_data = decryptor.update(encrypted_data) + decryptor.finalize() + unpadder = padding.PKCS7(128).unpadder() + tar_data = data = unpadder.update(tar_data) if is_compressed == 1: try: diff --git a/setup.py b/setup.py index ea00db6..6a39db0 100755 --- a/setup.py +++ b/setup.py @@ -29,7 +29,7 @@ requires = ( # Android dependencies: "adb-shell>=0.4.2", "libusb1>=2.0.1", - "pyaes>=1.6.1" + "cryptography>=36.0.1" ) From b44c67e699dd96bce838f07bf7c6457ea59a0b08 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Donncha=20=C3=93=20Cearbhaill?= Date: Fri, 4 Mar 2022 17:04:32 +0100 Subject: [PATCH 08/10] Refactor some of the decryption code --- mvt/android/parsers/backup.py | 135 ++++++++++++++++++++-------------- 1 file changed, 78 insertions(+), 57 deletions(-) diff --git a/mvt/android/parsers/backup.py b/mvt/android/parsers/backup.py index ee396d6..c9c2a1d 100644 --- a/mvt/android/parsers/backup.py +++ b/mvt/android/parsers/backup.py @@ -64,6 +64,7 @@ def parse_ab_header(data): "version": int(version), "encryption": encryption.decode("utf-8") } + return { "backup": False, "compression": None, @@ -71,6 +72,78 @@ def parse_ab_header(data): "encryption": None } +def decrypt_master_key(password, user_salt, user_iv, pbkdf2_rounds, master_key_blob, format_version, checksum_salt): + """Generate AES key from user password uisng PBKDF2 + + The backup master key is extracted from the master key blog after decryption. + """ + # Derive key from password using PBKDF2 + kdf = PBKDF2HMAC(algorithm=hashes.SHA1(), length=32, salt=user_salt, iterations=pbkdf2_rounds) + key = kdf.derive(password.encode("utf-8")) + + # Decrypt master key blob + cipher = Cipher(algorithms.AES(key), modes.CBC(user_iv)) + decryptor = cipher.decryptor() + try: + decryted_master_key_blob = decryptor.update(master_key_blob) + decryptor.finalize() + except TypeError: + raise InvalidBackupPassword() + + # Extract key and IV from decrypted blob. + key_blob = io.BytesIO(decryted_master_key_blob) + master_iv_length = ord(key_blob.read(1)) + master_iv = key_blob.read(master_iv_length) + + master_key_length = ord(key_blob.read(1)) + master_key = key_blob.read(master_key_length) + + master_key_checksum_length = ord(key_blob.read(1)) + master_key_checksum = key_blob.read(master_key_checksum_length) + + # Handle quirky encoding of master key bytes in Android original Java crypto code + if format_version > 1: + hmac_mk = to_utf8_bytes(master_key) + else: + hmac_mk = master_key + + # Derive checksum to confirm successful backup decryption. + kdf = PBKDF2HMAC(algorithm=hashes.SHA1(), length=32, salt=checksum_salt, iterations=pbkdf2_rounds) + calculated_checksum = kdf.derive(hmac_mk) + + if master_key_checksum != calculated_checksum: + raise InvalidBackupPassword() + + return master_key, master_iv + +def decrypt_backup_data(encrypted_backup, password, encryption_algo, format_version): + """ + Generate encryption keyffrom password and do decryption + """ + if encryption_algo != b"AES-256": + raise AndroidBackupNotImplemented("Encryption Algorithm not implemented") + + if password is None: + raise InvalidBackupPassword() + + [user_salt, checksum_salt, pbkdf2_rounds, user_iv, master_key_blob, encrypted_data] = encrypted_backup.split(b"\n", 5) + user_salt = bytes.fromhex(user_salt.decode("utf-8")) + checksum_salt = bytes.fromhex(checksum_salt.decode("utf-8")) + pbkdf2_rounds = int(pbkdf2_rounds) + user_iv = bytes.fromhex(user_iv.decode("utf-8")) + master_key_blob = bytes.fromhex(master_key_blob.decode("utf-8")) + + # Derive decryption master key from password + master_key, master_iv = decrypt_master_key(password=password, user_salt=user_salt, user_iv=user_iv, + pbkdf2_rounds=pbkdf2_rounds, master_key_blob=master_key_blob, + format_version=format_version, checksum_salt=checksum_salt) + + # Decrypt and unpad backup data using derivied key + cipher = Cipher(algorithms.AES(master_key), modes.CBC(master_iv)) + decryptor = cipher.decryptor() + decrypted_tar = decryptor.update(encrypted_data) + decryptor.finalize() + + unpadder = padding.PKCS7(128).unpadder() + return unpadder.update(decrypted_tar) def parse_backup_file(data, password=None): """ @@ -79,70 +152,18 @@ def parse_backup_file(data, password=None): if not data.startswith(b"ANDROID BACKUP"): raise AndroidBackupParsingError("Invalid file header") - [magic_header, version, is_compressed, encryption, tar_data] = data.split(b"\n", 4) + [magic_header, version, is_compressed, encryption_algo, tar_data] = data.split(b"\n", 4) version = int(version) is_compressed = int(is_compressed) - if encryption != b"none": - if encryption != b"AES-256": - raise AndroidBackupNotImplemented("Encryption Algorithm not implemented") - if password is None: - raise InvalidBackupPassword() - [user_salt, checksum_salt, pbkdf2_rounds, user_iv, master_key_blob, encrypted_data] = tar_data.split(b"\n", 5) - user_salt = bytes.fromhex(user_salt.decode("utf-8")) - checksum_salt = bytes.fromhex(checksum_salt.decode("utf-8")) - pbkdf2_rounds = int(pbkdf2_rounds) - user_iv = bytes.fromhex(user_iv.decode("utf-8")) - master_key_blob = bytes.fromhex(master_key_blob.decode("utf-8")) + if encryption_algo != b"none": + tar_data = decrypt_backup_data(tar_data, password, encryption_algo, format_version=version) - kdf = PBKDF2HMAC( - algorithm=hashes.SHA1(), - length=32, - salt=user_salt, - iterations=pbkdf2_rounds) - key = kdf.derive(password.encode("utf-8")) - - cipher = Cipher(algorithms.AES(key), modes.CBC(user_iv)) - decryptor = cipher.decryptor() - try: - plain_text = decryptor.update(master_key_blob) + decryptor.finalize() - - blob = io.BytesIO(plain_text) - master_iv_length = ord(blob.read(1)) - master_iv = blob.read(master_iv_length) - master_key_length = ord(blob.read(1)) - master_key = blob.read(master_key_length) - master_key_checksum_length = ord(blob.read(1)) - master_key_checksum = blob.read(master_key_checksum_length) - except TypeError: - raise InvalidBackupPassword() - - if version > 1: - hmac_mk = to_utf8_bytes(master_key) - else: - hmac_mk = master_key - - kdf = PBKDF2HMAC( - algorithm=hashes.SHA1(), - length=32, - salt=checksum_salt, - iterations=pbkdf2_rounds) - calculated_checksum = kdf.derive(hmac_mk) - - if master_key_checksum != calculated_checksum: - raise InvalidBackupPassword() - - cipher = Cipher(algorithms.AES(master_key), modes.CBC(master_iv)) - decryptor = cipher.decryptor() - tar_data = decryptor.update(encrypted_data) + decryptor.finalize() - unpadder = padding.PKCS7(128).unpadder() - tar_data = data = unpadder.update(tar_data) - - if is_compressed == 1: + if is_compressed: try: tar_data = zlib.decompress(tar_data) except zlib.error: - raise AndroidBackupParsingError("Impossible to decompress the file") + raise AndroidBackupParsingError("Impossible to decompress the backup file") return tar_data From be511dcb515b74f0f6973852b811a052ccf9bdaa Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Donncha=20=C3=93=20Cearbhaill?= Date: Fri, 4 Mar 2022 17:06:10 +0100 Subject: [PATCH 09/10] Refactor SMS ADB code to use backup functions --- mvt/android/modules/adb/base.py | 28 ++++++++++++++++++++++++++++ mvt/android/modules/adb/sms.py | 25 +++---------------------- mvt/android/parsers/backup.py | 8 -------- 3 files changed, 31 insertions(+), 30 deletions(-) diff --git a/mvt/android/modules/adb/base.py b/mvt/android/modules/adb/base.py index 2ad9b87..6d4bfb8 100644 --- a/mvt/android/modules/adb/base.py +++ b/mvt/android/modules/adb/base.py @@ -19,6 +19,7 @@ from adb_shell.exceptions import (AdbCommandFailureException, DeviceAuthError, from usb1 import USBErrorAccess, USBErrorBusy from mvt.common.module import InsufficientPrivileges, MVTModule +from mvt.android.parsers.backup import parse_ab_header, parse_backup_file log = logging.getLogger(__name__) @@ -243,6 +244,33 @@ class AndroidExtraction(MVTModule): # Disconnect from the device. self._adb_disconnect() + def _generate_backup(package_name): + # Run ADB command to create a backup of SMS app + self.log.warning("Please check phone and accept Android backup prompt. You may need to set a backup password. \a") + + # Run ADB command to create a backup of SMS app + # TODO: Base64 encoding as temporary fix to avoid byte-mangling over the shell transport... + backup_output_b64 = self._adb_command("/system/bin/bu backup -nocompress '{}' | base64".format(package_name)) + backup_output = base64.b64decode(backup_output_b64) + header = parse_ab_header(backup_output) + if not header["backup"]: + self.log.error("Extracting SMS via Android backup failed. No valid backup data found.") + return + + if header["encryption"] == "none": + return parse_backup_file(backup_output, password=None) + + # Backup encrypted. Request password from user. + while password_retry in range(0, 3): + backup_password = getpass.getpass(prompt="Backup Password: ", stream=None) + try: + decrypted_backup_tar = parse_backup_file(backup_output, backup_password) + return decrypted_backup_tar + except InvalidBackupPassword: + self.log.info("Invalid backup password") + + self.log.warn("All attempts to decrypt backup with password failed!") + def run(self): """Run the main procedure.""" raise NotImplementedError diff --git a/mvt/android/modules/adb/sms.py b/mvt/android/modules/adb/sms.py index 10000db..d323cd1 100644 --- a/mvt/android/modules/adb/sms.py +++ b/mvt/android/modules/adb/sms.py @@ -10,7 +10,7 @@ import base64 import getpass from mvt.common.utils import check_for_links, convert_timestamp_to_iso -from mvt.android.parsers.backup import parse_ab_header, parse_sms_backup, InvalidBackupPassword, AndroidBackupParsingError +from mvt.android.parsers.backup import parse_tar_for_sms, AndroidBackupParsingError from mvt.common.module import InsufficientPrivileges from .base import AndroidExtraction @@ -116,28 +116,9 @@ class SMS(AndroidExtraction): It is crucial to use the under-documented "-nocompress" flag to disable the non-standard Java compression algorithim. This module only supports an unencrypted ADB backup. """ - # Run ADB command to create a backup of SMS app - self.log.warning("Please check phone and accept Android backup prompt. Do not set an encryption password. \a") - - # Run ADB command to create a backup of SMS app - # TODO: Base64 encoding as temporary fix to avoid byte-mangling over the shell transport... - backup_output_b64 = self._adb_command("/system/bin/bu backup -nocompress com.android.providers.telephony | base64") - backup_output = base64.b64decode(backup_output_b64) - header = parse_ab_header(backup_output) - if not header["backup"]: - self.log.error("Extracting SMS via Android backup failed. No valid backup data found.") - return - - pwd = None - if header["encryption"] != "none": - pwd = getpass.getpass(prompt="Backup Password: ", stream=None) - - + backup_tar = self._generate_backup("com.android.providers.telephony") try: - self.results = parse_sms_backup(backup_output, password=pwd) - except InvalidBackupPassword: - self.log.info("Invalid backup password") - return + self.results = parse_tar_for_sms(backup_tar) except AndroidBackupParsingError: self.log.info("Impossible to read SMS from the Android Backup, please extract the SMS and try extracting it with Android Backup Extractor") return diff --git a/mvt/android/parsers/backup.py b/mvt/android/parsers/backup.py index c9c2a1d..85b1865 100644 --- a/mvt/android/parsers/backup.py +++ b/mvt/android/parsers/backup.py @@ -42,14 +42,6 @@ def to_utf8_bytes(input_bytes): return bytes(output) -def parse_sms_backup(data, password=None): - """ - Parse a backup file and returns SMS in it - """ - tardata = parse_backup_file(data, password=password) - return parse_tar_for_sms(tardata) - - def parse_ab_header(data): """ Parse the header of an Android Backup file From ac26aa964a9c619f6313adfbb317fb43e5068ccd Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Donncha=20=C3=93=20Cearbhaill?= Date: Fri, 4 Mar 2022 17:24:26 +0100 Subject: [PATCH 10/10] Fix exception with bad password --- mvt/android/modules/adb/base.py | 9 ++++++--- mvt/android/modules/adb/sms.py | 3 +++ mvt/android/parsers/backup.py | 22 +++++++++++----------- 3 files changed, 20 insertions(+), 14 deletions(-) diff --git a/mvt/android/modules/adb/base.py b/mvt/android/modules/adb/base.py index 6d4bfb8..9214ede 100644 --- a/mvt/android/modules/adb/base.py +++ b/mvt/android/modules/adb/base.py @@ -10,6 +10,8 @@ import string import sys import tempfile import time +import base64 +import getpass from adb_shell.adb_device import AdbDeviceTcp, AdbDeviceUsb from adb_shell.auth.keygen import keygen, write_public_keyfile @@ -19,7 +21,7 @@ from adb_shell.exceptions import (AdbCommandFailureException, DeviceAuthError, from usb1 import USBErrorAccess, USBErrorBusy from mvt.common.module import InsufficientPrivileges, MVTModule -from mvt.android.parsers.backup import parse_ab_header, parse_backup_file +from mvt.android.parsers.backup import parse_ab_header, parse_backup_file, InvalidBackupPassword log = logging.getLogger(__name__) @@ -244,7 +246,7 @@ class AndroidExtraction(MVTModule): # Disconnect from the device. self._adb_disconnect() - def _generate_backup(package_name): + def _generate_backup(self, package_name): # Run ADB command to create a backup of SMS app self.log.warning("Please check phone and accept Android backup prompt. You may need to set a backup password. \a") @@ -253,6 +255,7 @@ class AndroidExtraction(MVTModule): backup_output_b64 = self._adb_command("/system/bin/bu backup -nocompress '{}' | base64".format(package_name)) backup_output = base64.b64decode(backup_output_b64) header = parse_ab_header(backup_output) + if not header["backup"]: self.log.error("Extracting SMS via Android backup failed. No valid backup data found.") return @@ -261,7 +264,7 @@ class AndroidExtraction(MVTModule): return parse_backup_file(backup_output, password=None) # Backup encrypted. Request password from user. - while password_retry in range(0, 3): + for password_retry in range(0, 3): backup_password = getpass.getpass(prompt="Backup Password: ", stream=None) try: decrypted_backup_tar = parse_backup_file(backup_output, backup_password) diff --git a/mvt/android/modules/adb/sms.py b/mvt/android/modules/adb/sms.py index d323cd1..bd3d9bb 100644 --- a/mvt/android/modules/adb/sms.py +++ b/mvt/android/modules/adb/sms.py @@ -117,6 +117,9 @@ class SMS(AndroidExtraction): algorithim. This module only supports an unencrypted ADB backup. """ backup_tar = self._generate_backup("com.android.providers.telephony") + if not backup_tar: + return + try: self.results = parse_tar_for_sms(backup_tar) except AndroidBackupParsingError: diff --git a/mvt/android/parsers/backup.py b/mvt/android/parsers/backup.py index 85b1865..3014625 100644 --- a/mvt/android/parsers/backup.py +++ b/mvt/android/parsers/backup.py @@ -78,20 +78,20 @@ def decrypt_master_key(password, user_salt, user_iv, pbkdf2_rounds, master_key_b decryptor = cipher.decryptor() try: decryted_master_key_blob = decryptor.update(master_key_blob) + decryptor.finalize() + + # Extract key and IV from decrypted blob. + key_blob = io.BytesIO(decryted_master_key_blob) + master_iv_length = ord(key_blob.read(1)) + master_iv = key_blob.read(master_iv_length) + + master_key_length = ord(key_blob.read(1)) + master_key = key_blob.read(master_key_length) + + master_key_checksum_length = ord(key_blob.read(1)) + master_key_checksum = key_blob.read(master_key_checksum_length) except TypeError: raise InvalidBackupPassword() - # Extract key and IV from decrypted blob. - key_blob = io.BytesIO(decryted_master_key_blob) - master_iv_length = ord(key_blob.read(1)) - master_iv = key_blob.read(master_iv_length) - - master_key_length = ord(key_blob.read(1)) - master_key = key_blob.read(master_key_length) - - master_key_checksum_length = ord(key_blob.read(1)) - master_key_checksum = key_blob.read(master_key_checksum_length) - # Handle quirky encoding of master key bytes in Android original Java crypto code if format_version > 1: hmac_mk = to_utf8_bytes(master_key)