diff --git a/mvt/android/cli.py b/mvt/android/cli.py index 9e7d220..38e2d04 100644 --- a/mvt/android/cli.py +++ b/mvt/android/cli.py @@ -5,6 +5,9 @@ import logging import os +import getpass +import io +import tarfile from pathlib import Path from zipfile import ZipFile @@ -17,6 +20,8 @@ from mvt.common.help import (HELP_MSG_FAST, HELP_MSG_IOC, from mvt.common.indicators import Indicators, download_indicators_files from mvt.common.logo import logo from mvt.common.module import run_module, save_timeline +from mvt.android.parsers.backup import parse_ab_header, parse_backup_file +from mvt.android.parsers.backup import InvalidBackupPassword, AndroidBackupParsingError from .download_apks import DownloadAPKs from .lookups.koodous import koodous_lookup @@ -246,6 +251,44 @@ def check_bugreport(ctx, iocs, output, list_modules, module, bugreport_path): def check_backup(ctx, iocs, output, backup_path, serial): log.info("Checking ADB backup located at: %s", backup_path) + if os.path.isfile(backup_path): + # AB File + backup_type = "ab" + with open(backup_path, "rb") as handle: + data = handle.read() + header = parse_ab_header(data) + if not header["backup"]: + log.critical("Invalid backup format, file should be in .ab format") + ctx.exit(1) + password = None + if header["encryption"] != "none": + password = getpass.getpass(prompt="Backup Password: ", stream=None) + try: + tardata = parse_backup_file(data, password=password) + except InvalidBackupPassword: + log.critical("Invalid backup password") + ctx.exit(1) + except AndroidBackupParsingError: + log.critical("Impossible to parse this backup file, please use Android Backup Extractor instead") + ctx.exit(1) + + dbytes = io.BytesIO(tardata) + tar = tarfile.open(fileobj=dbytes) + files = [] + for member in tar: + files.append(member.name) + + elif os.path.isdir(backup_path): + backup_type = "folder" + backup_path = Path(backup_path).absolute().as_posix() + files = [] + for root, subdirs, subfiles in os.walk(os.path.abspath(backup_path)): + for fname in subfiles: + files.append(os.path.relpath(os.path.join(root, fname), backup_path)) + else: + log.critical("Invalid backup path, path should be a folder or an Android Backup (.ab) file") + ctx.exit(1) + if output and not os.path.exists(output): try: os.makedirs(output) @@ -265,6 +308,11 @@ def check_backup(ctx, iocs, output, backup_path, serial): if serial: m.serial = serial + if backup_type == "folder": + m.from_folder(backup_path, files) + else: + m.from_ab(backup_path, tar, files) + run_module(m) diff --git a/mvt/android/modules/backup/base.py b/mvt/android/modules/backup/base.py new file mode 100644 index 0000000..8af5dc8 --- /dev/null +++ b/mvt/android/modules/backup/base.py @@ -0,0 +1,46 @@ +# Mobile Verification Toolkit (MVT) +# Copyright (c) 2021-2022 The MVT Project Authors. +# Use of this software is governed by the MVT License 1.1 that can be found at +# https://license.mvt.re/1.1/ + +import os +import fnmatch + +from mvt.common.module import MVTModule + + +class BackupExtraction(MVTModule): + """This class provides a base for all backup extractios modules""" + ab = None + + def from_folder(self, backup_path, files): + """ + Get all the files and list them + """ + self.backup_path = backup_path + self.files = files + + def from_ab(self, file_path, tar, files): + """ + Extract the files + """ + self.ab = file_path + self.tar = tar + self.files = files + + def _get_files_by_pattern(self, pattern): + return fnmatch.filter(self.files, pattern) + + def _get_file_content(self, file_path): + if self.ab: + try: + member = self.tar.getmember(file_path) + except KeyError: + return None + handle = self.tar.extractfile(member) + else: + handle = open(os.path.join(self.backup_path, file_path), "rb") + + data = handle.read() + handle.close() + return data diff --git a/mvt/android/modules/backup/sms.py b/mvt/android/modules/backup/sms.py index 71247d4..2f09ccf 100644 --- a/mvt/android/modules/backup/sms.py +++ b/mvt/android/modules/backup/sms.py @@ -3,20 +3,18 @@ # Use of this software is governed by the MVT License 1.1 that can be found at # https://license.mvt.re/1.1/ -import os -import getpass - -from mvt.common.module import MVTModule +from mvt.android.modules.backup.base import BackupExtraction from mvt.common.utils import check_for_links -from mvt.android.parsers.backup import parse_sms_file, parse_sms_backup, parse_ab_header, InvalidBackupPassword, AndroidBackupParsingError +from mvt.android.parsers.backup import parse_sms_file -class SMS(MVTModule): +class SMS(BackupExtraction): def __init__(self, file_path=None, base_folder=None, output_folder=None, fast_mode=False, log=None, results=[]): super().__init__(file_path=file_path, base_folder=base_folder, output_folder=output_folder, fast_mode=fast_mode, log=log, results=results) + self.results = [] def check_indicators(self): if not self.indicators: @@ -26,59 +24,13 @@ class SMS(MVTModule): if "body" not in message: continue - message_links = check_for_links(message["body"]) - if self.indicators.check_domains(message_links): + if self.indicators.check_domains(message["links"]): self.detected.append(message) - def _process_sms_file(self, file_path): - self.log.info("Processing SMS backup file at %s", file_path) - - with open(file_path, "rb") as handle: - data = handle.read() - - self.results = parse_sms_file(data) - def run(self): - # FIXME: this should be done in the Module code if there are other modules on backups - if os.path.isfile(self.base_folder): - # ab file - with open(self.base_folder, "rb") as handle: - data = handle.read() - header = parse_ab_header(data) - if not header["backup"]: - self.log.info("Not a valid Android Backup file, quitting...") - return - - pwd = None - if header["encryption"] != "none": - pwd = getpass.getpass(prompt="Backup Password: ", stream=None) - - try: - messages = parse_sms_backup(data, password=pwd) - except InvalidBackupPassword: - self.log.info("Invalid password, impossible de decrypt the backup, quitting...") - return - except AndroidBackupParsingError: - self.log.info("Impossible to extract data from this Android Backup, please regenerate the backup using the -nocompress option or extract it using Android Backup Extractor instead.") - self.log.info("Quitting...") - return - - self.results = messages - else: - app_folder = os.path.join(self.base_folder, - "apps", - "com.android.providers.telephony", - "d_f") - if not os.path.exists(app_folder): - self.log.info("Unable to find the SMS backup folder") - return - - for file_name in os.listdir(app_folder): - if not file_name.endswith("_sms_backup"): - continue - - file_path = os.path.join(app_folder, file_name) - self._process_sms_file(file_path) - + for file in self._get_files_by_pattern("apps/com.android.providers.telephony/d_f/*_sms_backup"): + self.log.info("Processing SMS backup file at %s", file) + data = self._get_file_content(file) + self.results.extend(parse_sms_file(data)) self.log.info("Extracted a total of %d SMS messages containing links", len(self.results)) diff --git a/tests/android/test_backup_module.py b/tests/android/test_backup_module.py index 347e4e3..7324931 100644 --- a/tests/android/test_backup_module.py +++ b/tests/android/test_backup_module.py @@ -5,17 +5,25 @@ import logging import os +import tarfile +import io from mvt.android.modules.backup.sms import SMS from mvt.common.module import run_module +from mvt.android.parsers.backup import parse_backup_file from ..utils import get_android_backup_folder class TestBackupModule: def test_module_folder(self): - fpath = get_android_backup_folder() - mod = SMS(base_folder=fpath, log=logging) + backup_path = get_android_backup_folder() + mod = SMS(base_folder=backup_path, log=logging) + files = [] + for root, subdirs, subfiles in os.walk(os.path.abspath(backup_path)): + for fname in subfiles: + files.append(os.path.relpath(os.path.join(root, fname), backup_path)) + mod.from_folder(backup_path, files) run_module(mod) assert len(mod.results) == 1 assert len(mod.results[0]["links"]) == 1 @@ -24,6 +32,47 @@ class TestBackupModule: def test_module_file(self): fpath = os.path.join(get_android_backup_folder(), "backup.ab") mod = SMS(base_folder=fpath, log=logging) + with open(fpath, "rb") as f: + data = f.read() + tardata = parse_backup_file(data) + dbytes = io.BytesIO(tardata) + tar = tarfile.open(fileobj=dbytes) + files = [] + for member in tar: + files.append(member.name) + mod.from_ab(fpath, tar, files) + run_module(mod) + assert len(mod.results) == 1 + assert len(mod.results[0]["links"]) == 1 + + def test_module_file2(self): + fpath = os.path.join(get_android_backup_folder(), "backup2.ab") + mod = SMS(base_folder=fpath, log=logging) + with open(fpath, "rb") as f: + data = f.read() + tardata = parse_backup_file(data, password="123456") + dbytes = io.BytesIO(tardata) + tar = tarfile.open(fileobj=dbytes) + files = [] + for member in tar: + files.append(member.name) + mod.from_ab(fpath, tar, files) + run_module(mod) + assert len(mod.results) == 1 + assert len(mod.results[0]["links"]) == 1 + + def test_module_file3(self): + fpath = os.path.join(get_android_backup_folder(), "backup3.ab") + mod = SMS(base_folder=fpath, log=logging) + with open(fpath, "rb") as f: + data = f.read() + tardata = parse_backup_file(data) + dbytes = io.BytesIO(tardata) + tar = tarfile.open(fileobj=dbytes) + files = [] + for member in tar: + files.append(member.name) + mod.from_ab(fpath, tar, files) run_module(mod) assert len(mod.results) == 1 assert len(mod.results[0]["links"]) == 1