diff --git a/src/mvt/android/cmd_check_androidqf.py b/src/mvt/android/cmd_check_androidqf.py index 63837a3..41ad802 100644 --- a/src/mvt/android/cmd_check_androidqf.py +++ b/src/mvt/android/cmd_check_androidqf.py @@ -13,7 +13,7 @@ from typing import List, Optional from mvt.android.artifacts.getprop import GetProp from mvt.android.cmd_check_intrusion_logs import CmdAndroidCheckIntrusionLogs -from mvt.android.cmd_check_backup import CmdAndroidCheckBackup +from mvt.android.cmd_check_backup import CmdAndroidCheckBackup, InvalidAndroidBackup from mvt.android.cmd_check_bugreport import CmdAndroidCheckBugreport from mvt.common.command import Command from mvt.common.indicators import Indicators @@ -246,7 +246,14 @@ class CmdAndroidCheckAndroidQF(Command): sub_command=True, custom_modules=self.custom_modules, ) - cmd.from_ab(backup) + try: + cmd.from_ab(backup) + except InvalidAndroidBackup as exc: + self.log.warning( + "Skipping backup modules as backup.ab is malformed: %s", exc + ) + return False + cmd.run() self.timeline.extend(cmd.timeline) diff --git a/src/mvt/android/cmd_check_backup.py b/src/mvt/android/cmd_check_backup.py index 6c106f5..b75bb34 100644 --- a/src/mvt/android/cmd_check_backup.py +++ b/src/mvt/android/cmd_check_backup.py @@ -28,6 +28,10 @@ from .modules.backup import BACKUP_MODULES log = logging.getLogger(__name__) +class InvalidAndroidBackup(Exception): + pass + + class CmdAndroidCheckBackup(Command): def __init__( self, @@ -72,6 +76,10 @@ class CmdAndroidCheckBackup(Command): self.__type = "ab" header = parse_ab_header(ab_file_bytes) if not header["backup"]: + if self.sub_command: + raise InvalidAndroidBackup( + "Invalid backup format, file should be in .ab format" + ) log.critical("Invalid backup format, file should be in .ab format") sys.exit(1) @@ -87,12 +95,25 @@ class CmdAndroidCheckBackup(Command): log.critical("Invalid backup password") sys.exit(1) except AndroidBackupParsingError as exc: + if self.sub_command: + raise InvalidAndroidBackup( + f"Impossible to parse this backup file: {exc}" + ) from exc log.critical("Impossible to parse this backup file: %s", exc) log.critical("Please use Android Backup Extractor (ABE) instead") sys.exit(1) dbytes = io.BytesIO(tardata) - self.__tar = tarfile.open(fileobj=dbytes) + try: + self.__tar = tarfile.open(fileobj=dbytes) + except tarfile.TarError as exc: + if self.sub_command: + raise InvalidAndroidBackup( + f"Impossible to parse this backup file: {exc}" + ) from exc + log.critical("Impossible to parse this backup file: %s", exc) + log.critical("Please use Android Backup Extractor (ABE) instead") + sys.exit(1) for member in self.__tar: self.__files.append(member.name) diff --git a/src/mvt/android/modules/androidqf/sms.py b/src/mvt/android/modules/androidqf/sms.py index bcbd226..64aaab6 100644 --- a/src/mvt/android/modules/androidqf/sms.py +++ b/src/mvt/android/modules/androidqf/sms.py @@ -58,7 +58,7 @@ class SMS(AndroidQFModule): def parse_backup(self, data): header = parse_ab_header(data) if not header["backup"]: - self.log.critical("Invalid backup format, backup.ab was not analysed") + self.log.warning("Invalid backup format, backup.ab was not analysed") return password = None @@ -76,7 +76,7 @@ class SMS(AndroidQFModule): self.log.critical("Invalid backup password") return except AndroidBackupParsingError: - self.log.critical( + self.log.warning( "Impossible to parse this backup file, please use" " Android Backup Extractor instead" ) diff --git a/src/mvt/android/parsers/backup.py b/src/mvt/android/parsers/backup.py index c81ecd0..8c9bb5c 100644 --- a/src/mvt/android/parsers/backup.py +++ b/src/mvt/android/parsers/backup.py @@ -48,13 +48,16 @@ def parse_ab_header(data): 'encryption': "none", 'version': 4} """ if data.startswith(b"ANDROID BACKUP"): - [_, version, is_compressed, encryption, _] = data.split(b"\n", 4) - return { - "backup": True, - "compression": (is_compressed == b"1"), - "version": int(version), - "encryption": encryption.decode("utf-8"), - } + try: + [_, version, is_compressed, encryption, _] = data.split(b"\n", 4) + return { + "backup": True, + "compression": (is_compressed == b"1"), + "version": int(version), + "encryption": encryption.decode("utf-8"), + } + except (UnicodeDecodeError, ValueError): + pass return {"backup": False, "compression": None, "version": None, "encryption": None} diff --git a/tests/android/test_backup_parser.py b/tests/android/test_backup_parser.py index 5bd5b99..6e5be72 100644 --- a/tests/android/test_backup_parser.py +++ b/tests/android/test_backup_parser.py @@ -5,12 +5,24 @@ import hashlib -from mvt.android.parsers.backup import parse_backup_file, parse_tar_for_sms +from mvt.android.parsers.backup import ( + parse_ab_header, + parse_backup_file, + parse_tar_for_sms, +) from ..utils import get_artifact class TestBackupParsing: + def test_parse_incomplete_header(self): + assert parse_ab_header(b"ANDROID BACKUP\n") == { + "backup": False, + "compression": None, + "version": None, + "encryption": None, + } + def test_parsing_noencryption(self): file = get_artifact("android_backup/backup.ab") with open(file, "rb") as f: diff --git a/tests/test_check_android_androidqf.py b/tests/test_check_android_androidqf.py index c6e4221..c4c3410 100644 --- a/tests/test_check_android_androidqf.py +++ b/tests/test_check_android_androidqf.py @@ -3,7 +3,9 @@ # Use of this software is governed by the MVT License 1.1 that can be found at # https://license.mvt.re/1.1/ +import logging import os +import shutil from click.testing import CliRunner @@ -68,3 +70,18 @@ class TestCheckAndroidqfCommand: assert result.exit_code == 0 del os.environ["MVT_ANDROID_BACKUP_PASSWORD"] settings.__init__() # Reset settings + + def test_check_malformed_backup_skips_backup_modules(self, tmp_path, caplog): + path = tmp_path / "androidqf" + shutil.copytree(os.path.join(get_artifact_folder(), "androidqf"), path) + (path / "backup.ab").write_bytes(b"") + + runner = CliRunner() + with caplog.at_level(logging.WARNING): + result = runner.invoke(check_androidqf, [str(path)]) + + assert result.exit_code == 0 + assert "Skipping backup modules as backup.ab is malformed" in caplog.text + assert not any( + record.levelname in {"CRITICAL", "FATAL"} for record in caplog.records + )