diff --git a/Makefile b/Makefile index 2f6a0f0..3e23501 100644 --- a/Makefile +++ b/Makefile @@ -11,3 +11,6 @@ upload: test-upload: python3 -m twine upload --repository testpypi dist/* + +pylint: + pylint --rcfile=setup.cfg mvt diff --git a/mvt/android/cli.py b/mvt/android/cli.py index 0bd6e0b..0adb88a 100644 --- a/mvt/android/cli.py +++ b/mvt/android/cli.py @@ -156,14 +156,13 @@ def check_bugreport(ctx, iocs, output, list_modules, module, bugreport_path): # Command: check-backup #============================================================================== @cli.command("check-backup", help="Check an Android Backup") -@click.option("--serial", "-s", type=str, help=HELP_MSG_SERIAL) @click.option("--iocs", "-i", type=click.Path(exists=True), multiple=True, default=[], help=HELP_MSG_IOC) @click.option("--output", "-o", type=click.Path(exists=False), help=HELP_MSG_OUTPUT) @click.option("--list-modules", "-l", is_flag=True, help=HELP_MSG_LIST_MODULES) @click.argument("BACKUP_PATH", type=click.Path(exists=True)) @click.pass_context -def check_backup(ctx, serial, iocs, output, list_modules, backup_path): +def check_backup(ctx, iocs, output, list_modules, backup_path): cmd = CmdAndroidCheckBackup(target_path=backup_path, results_path=output, ioc_files=iocs) diff --git a/mvt/android/cmd_check_adb.py b/mvt/android/cmd_check_adb.py index 0d4bc44..8527fc8 100644 --- a/mvt/android/cmd_check_adb.py +++ b/mvt/android/cmd_check_adb.py @@ -14,12 +14,12 @@ log = logging.getLogger(__name__) class CmdAndroidCheckADB(Command): - name = "check-adb" - modules = ADB_MODULES - def __init__(self, target_path: str = None, results_path: str = None, ioc_files: list = [], module_name: str = None, serial: str = None, fast_mode: bool = False): super().__init__(target_path=target_path, results_path=results_path, ioc_files=ioc_files, module_name=module_name, serial=serial, fast_mode=fast_mode, log=log) + + self.name = "check-adb" + self.modules = ADB_MODULES diff --git a/mvt/android/cmd_check_backup.py b/mvt/android/cmd_check_backup.py index 4194828..039008f 100644 --- a/mvt/android/cmd_check_backup.py +++ b/mvt/android/cmd_check_backup.py @@ -25,9 +25,6 @@ log = logging.getLogger(__name__) class CmdAndroidCheckBackup(Command): - name = "check-backup" - modules = BACKUP_MODULES - def __init__(self, target_path: str = None, results_path: str = None, ioc_files: list = [], module_name: str = None, serial: str = None, fast_mode: bool = False): @@ -35,6 +32,9 @@ class CmdAndroidCheckBackup(Command): ioc_files=ioc_files, module_name=module_name, serial=serial, fast_mode=fast_mode, log=log) + self.name = "check-backup" + self.modules = BACKUP_MODULES + self.backup_type = None self.backup_archive = None self.backup_files = [] diff --git a/mvt/android/cmd_check_bugreport.py b/mvt/android/cmd_check_bugreport.py index 1c478d7..52cf99f 100644 --- a/mvt/android/cmd_check_bugreport.py +++ b/mvt/android/cmd_check_bugreport.py @@ -18,9 +18,6 @@ log = logging.getLogger(__name__) class CmdAndroidCheckBugreport(Command): - name = "check-bugreport" - modules = BUGREPORT_MODULES - def __init__(self, target_path: str = None, results_path: str = None, ioc_files: list = [], module_name: str = None, serial: str = None, fast_mode: bool = False): @@ -28,6 +25,9 @@ class CmdAndroidCheckBugreport(Command): ioc_files=ioc_files, module_name=module_name, serial=serial, fast_mode=fast_mode, log=log) + self.name = "check-bugreport" + self.modules = BUGREPORT_MODULES + self.bugreport_format = None self.bugreport_archive = None self.bugreport_files = [] @@ -41,7 +41,7 @@ class CmdAndroidCheckBugreport(Command): elif os.path.isdir(self.target_path): self.bugreport_format = "dir" parent_path = Path(self.target_path).absolute().as_posix() - for root, subdirs, subfiles in os.walk(os.path.abspath(self.target_path)): + for root, _, subfiles in os.walk(os.path.abspath(self.target_path)): for file_name in subfiles: self.bugreport_files.append(os.path.relpath(os.path.join(root, file_name), parent_path)) diff --git a/mvt/android/modules/adb/base.py b/mvt/android/modules/adb/base.py index b90bec7..6804e6c 100644 --- a/mvt/android/modules/adb/base.py +++ b/mvt/android/modules/adb/base.py @@ -25,8 +25,6 @@ from mvt.android.parsers.backup import (InvalidBackupPassword, parse_ab_header, parse_backup_file) from mvt.common.module import InsufficientPrivileges, MVTModule -log = logging.getLogger(__name__) - ADB_KEY_PATH = os.path.expanduser("~/.android/adbkey") ADB_PUB_KEY_PATH = os.path.expanduser("~/.android/adbkey.pub") @@ -75,7 +73,7 @@ class AndroidExtraction(MVTModule): try: self.device = AdbDeviceUsb(serial=self.serial) except UsbDeviceNotFoundError: - log.critical("No device found. Make sure it is connected and unlocked.") + self.log.critical("No device found. Make sure it is connected and unlocked.") sys.exit(-1) # Otherwise we try to use the TCP transport. else: @@ -90,18 +88,18 @@ class AndroidExtraction(MVTModule): try: self.device.connect(rsa_keys=[signer], auth_timeout_s=5) except (USBErrorBusy, USBErrorAccess): - log.critical("Device is busy, maybe run `adb kill-server` and try again.") + self.log.critical("Device is busy, maybe run `adb kill-server` and try again.") sys.exit(-1) except DeviceAuthError: - log.error("You need to authorize this computer on the Android device. Retrying in 5 seconds...") + self.log.error("You need to authorize this computer on the Android device. Retrying in 5 seconds...") time.sleep(5) except UsbReadFailedError: - log.error("Unable to connect to the device over USB. Try to unplug, plug the device and start again.") + self.log.error("Unable to connect to the device over USB. Try to unplug, plug the device and start again.") sys.exit(-1) except OSError as e: if e.errno == 113 and self.serial: - log.critical("Unable to connect to the device %s: did you specify the correct IP addres?", - self.serial) + self.log.critical("Unable to connect to the device %s: did you specify the correct IP addres?", + self.serial) sys.exit(-1) else: break @@ -112,7 +110,7 @@ class AndroidExtraction(MVTModule): def _adb_reconnect(self) -> None: """Reconnect to device using adb.""" - log.info("Reconnecting ...") + self.log.info("Reconnecting ...") self._adb_disconnect() self._adb_connect() @@ -197,15 +195,15 @@ class AndroidExtraction(MVTModule): new_remote_path = f"/sdcard/{tmp_filename}" # We copy the file from the data folder to /sdcard/. - cp = self._adb_command_as_root(f"cp {remote_path} {new_remote_path}") - if cp.startswith("cp: ") and "No such file or directory" in cp: + cp_output = self._adb_command_as_root(f"cp {remote_path} {new_remote_path}") + if cp_output.startswith("cp: ") and "No such file or directory" in cp_output: raise Exception(f"Unable to process file {remote_path}: File not found") - elif cp.startswith("cp: ") and "Permission denied" in cp: + if cp_output.startswith("cp: ") and "Permission denied" in cp_output: raise Exception(f"Unable to process file {remote_path}: Permission denied") # We download from /sdcard/ to the local temporary file. # If it doesn't work now, don't try again (retry_root=False) - self._adb_download(new_remote_path, local_path, retry_root=False) + self._adb_download(new_remote_path, local_path, progress_callback, retry_root=False) # Delete the copy on /sdcard/. self._adb_command(f"rm -rf {new_remote_path}") @@ -235,10 +233,10 @@ class AndroidExtraction(MVTModule): new_remote_path = f"/sdcard/Download/{local_name}" # We copy the file from the data folder to /sdcard/. - cp = self._adb_command_as_root(f"cp {remote_path} {new_remote_path}") - if cp.startswith("cp: ") and "No such file or directory" in cp: + cp_output = self._adb_command_as_root(f"cp {remote_path} {new_remote_path}") + if cp_output.startswith("cp: ") and "No such file or directory" in cp_output: raise Exception(f"Unable to process file {remote_path}: File not found") - elif cp.startswith("cp: ") and "Permission denied" in cp: + if cp_output.startswith("cp: ") and "Permission denied" in cp_output: raise Exception(f"Unable to process file {remote_path}: Permission denied") # We download from /sdcard/ to the local temporary file. @@ -258,19 +256,18 @@ class AndroidExtraction(MVTModule): self.log.warning("Please check phone and accept Android backup prompt. You may need to set a backup password. \a") # TODO: Base64 encoding as temporary fix to avoid byte-mangling over the shell transport... - backup_output_b64 = self._adb_command("/system/bin/bu backup -nocompress '{}' | base64".format( - package_name)) + backup_output_b64 = self._adb_command(f"/system/bin/bu backup -nocompress '{package_name}' | base64") backup_output = base64.b64decode(backup_output_b64) header = parse_ab_header(backup_output) if not header["backup"]: self.log.error("Extracting SMS via Android backup failed. No valid backup data found.") - return + return None if header["encryption"] == "none": return parse_backup_file(backup_output, password=None) - for password_retry in range(0, 3): + for _ in range(0, 3): backup_password = Prompt.ask("Enter backup password", password=True) try: decrypted_backup_tar = parse_backup_file(backup_output, backup_password) @@ -280,6 +277,8 @@ class AndroidExtraction(MVTModule): self.log.warn("All attempts to decrypt backup with password failed!") + return None + def run(self) -> None: """Run the main procedure.""" raise NotImplementedError diff --git a/mvt/android/modules/adb/chrome_history.py b/mvt/android/modules/adb/chrome_history.py index 95506a3..ccaef8f 100644 --- a/mvt/android/modules/adb/chrome_history.py +++ b/mvt/android/modules/adb/chrome_history.py @@ -26,7 +26,7 @@ class ChromeHistory(AndroidExtraction): results_path=results_path, fast_mode=fast_mode, log=log, results=results) - def serialize(self, record: dict) -> None: + def serialize(self, record: dict) -> dict | list: return { "timestamp": record["isodate"], "module": self.__class__.__name__, diff --git a/mvt/android/modules/adb/dumpsys_appops.py b/mvt/android/modules/adb/dumpsys_appops.py index 211eabd..ab70b8f 100644 --- a/mvt/android/modules/adb/dumpsys_appops.py +++ b/mvt/android/modules/adb/dumpsys_appops.py @@ -23,7 +23,7 @@ class DumpsysAppOps(AndroidExtraction): results_path=results_path, fast_mode=fast_mode, log=log, results=results) - def serialize(self, record: dict) -> None: + def serialize(self, record: dict) -> dict | list: records = [] for perm in record["permissions"]: if "entries" not in perm: diff --git a/mvt/android/modules/adb/dumpsys_battery_daily.py b/mvt/android/modules/adb/dumpsys_battery_daily.py index 8ac197b..5884903 100644 --- a/mvt/android/modules/adb/dumpsys_battery_daily.py +++ b/mvt/android/modules/adb/dumpsys_battery_daily.py @@ -21,7 +21,7 @@ class DumpsysBatteryDaily(AndroidExtraction): results_path=results_path, fast_mode=fast_mode, log=log, results=results) - def serialize(self, record: dict) -> None: + def serialize(self, record: dict) -> dict | list: return { "timestamp": record["from"], "module": self.__class__.__name__, diff --git a/mvt/android/modules/adb/dumpsys_receivers.py b/mvt/android/modules/adb/dumpsys_receivers.py index a3df117..17b51f5 100644 --- a/mvt/android/modules/adb/dumpsys_receivers.py +++ b/mvt/android/modules/adb/dumpsys_receivers.py @@ -51,11 +51,11 @@ class DumpsysReceivers(AndroidExtraction): self.log.info("Found a receiver monitoring outgoing calls: \"%s\"", receiver["receiver"]) - ioc = self.indicators.check_app_id(receiver["package_name"]) - if ioc: - receiver["matched_indicator"] = ioc - self.detected.append({intent: receiver}) - continue + ioc = self.indicators.check_app_id(receiver["package_name"]) + if ioc: + receiver["matched_indicator"] = ioc + self.detected.append({intent: receiver}) + continue def run(self) -> None: self._adb_connect() diff --git a/mvt/android/modules/adb/files.py b/mvt/android/modules/adb/files.py index 27c2eec..bb88fb6 100644 --- a/mvt/android/modules/adb/files.py +++ b/mvt/android/modules/adb/files.py @@ -34,7 +34,7 @@ class Files(AndroidExtraction): log=log, results=results) self.full_find = False - def serialize(self, record: dict) -> None: + def serialize(self, record: dict) -> dict | list: if "modified_time" in record: return { "timestamp": record["modified_time"], @@ -43,6 +43,8 @@ class Files(AndroidExtraction): "data": record["path"], } + return None + def check_indicators(self) -> None: for result in self.results: if result.get("is_suid"): diff --git a/mvt/android/modules/adb/packages.py b/mvt/android/modules/adb/packages.py index 0d924bf..a2436a0 100644 --- a/mvt/android/modules/adb/packages.py +++ b/mvt/android/modules/adb/packages.py @@ -78,7 +78,7 @@ class Packages(AndroidExtraction): results_path=results_path, fast_mode=fast_mode, log=log, results=results) - def serialize(self, record: dict) -> None: + def serialize(self, record: dict) -> dict | list: records = [] timestamps = [ @@ -87,11 +87,11 @@ class Packages(AndroidExtraction): {"event": "package_last_update", "timestamp": record["last_update_time"]}, ] - for ts in timestamps: + for timestamp in timestamps: records.append({ - "timestamp": ts["timestamp"], + "timestamp": timestamp["timestamp"], "module": self.__class__.__name__, - "event": ts["event"], + "event": timestamp["event"], "data": f"{record['package_name']} (system: {record['system']}, third party: {record['third_party']})", }) @@ -222,10 +222,10 @@ class Packages(AndroidExtraction): for file_path in output.splitlines(): file_path = file_path.strip() - md5 = self._adb_command(f"md5sum {file_path}").split(" ")[0] - sha1 = self._adb_command(f"sha1sum {file_path}").split(" ")[0] - sha256 = self._adb_command(f"sha256sum {file_path}").split(" ")[0] - sha512 = self._adb_command(f"sha512sum {file_path}").split(" ")[0] + md5 = self._adb_command(f"md5sum {file_path}").split(" ", maxsplit=1)[0] + sha1 = self._adb_command(f"sha1sum {file_path}").split(" ", maxsplit=1)[0] + sha256 = self._adb_command(f"sha256sum {file_path}").split(" ", maxsplit=1)[0] + sha512 = self._adb_command(f"sha512sum {file_path}").split(" ", maxsplit=1)[0] package_files.append({ "path": file_path, diff --git a/mvt/android/modules/adb/settings.py b/mvt/android/modules/adb/settings.py index 6c532b2..7e8a188 100644 --- a/mvt/android/modules/adb/settings.py +++ b/mvt/android/modules/adb/settings.py @@ -70,7 +70,7 @@ class Settings(AndroidExtraction): self.results = {} if not results else results def check_indicators(self) -> None: - for namespace, settings in self.results.items(): + for _, settings in self.results.items(): for key, value in settings.items(): for danger in ANDROID_DANGEROUS_SETTINGS: # Check if one of the dangerous settings is using an unsafe diff --git a/mvt/android/modules/adb/sms.py b/mvt/android/modules/adb/sms.py index 7ce3f9a..74b29d7 100644 --- a/mvt/android/modules/adb/sms.py +++ b/mvt/android/modules/adb/sms.py @@ -52,7 +52,9 @@ class SMS(AndroidExtraction): results_path=results_path, fast_mode=fast_mode, log=log, results=results) - def serialize(self, record: dict) -> None: + self.sms_db_type = 0 + + def serialize(self, record: dict) -> dict | list: body = record["body"].replace("\n", "\\n") return { "timestamp": record["isodate"], @@ -83,9 +85,9 @@ class SMS(AndroidExtraction): conn = sqlite3.connect(db_path) cur = conn.cursor() - if self.SMS_DB_TYPE == 1: + if self.sms_db_type == 1: cur.execute(SMS_BUGLE_QUERY) - elif self.SMS_DB_TYPE == 2: + elif self.sms_db_type == 2: cur.execute(SMS_MMSMS_QUERY) names = [description[0] for description in cur.description] @@ -133,10 +135,10 @@ class SMS(AndroidExtraction): def run(self) -> None: try: if (self._adb_check_file_exists(os.path.join("/", SMS_BUGLE_PATH))): - self.SMS_DB_TYPE = 1 + self.sms_db_type = 1 self._adb_process_file(os.path.join("/", SMS_BUGLE_PATH), self._parse_db) elif (self._adb_check_file_exists(os.path.join("/", SMS_MMSSMS_PATH))): - self.SMS_DB_TYPE = 2 + self.sms_db_type = 2 self._adb_process_file(os.path.join("/", SMS_MMSSMS_PATH), self._parse_db) return except InsufficientPrivileges: diff --git a/mvt/android/modules/adb/whatsapp.py b/mvt/android/modules/adb/whatsapp.py index ea59446..1445201 100644 --- a/mvt/android/modules/adb/whatsapp.py +++ b/mvt/android/modules/adb/whatsapp.py @@ -26,7 +26,7 @@ class Whatsapp(AndroidExtraction): results_path=results_path, fast_mode=fast_mode, log=log, results=results) - def serialize(self, record: dict) -> None: + def serialize(self, record: dict) -> dict | list: text = record["data"].replace("\n", "\\n") return { "timestamp": record["isodate"], @@ -74,8 +74,9 @@ class Whatsapp(AndroidExtraction): # If we find links in the messages or if they are empty we add them to the list. if check_for_links(message["data"]) or message["data"].strip() == "": - if (message.get('thumb_image') is not None): + if message.get('thumb_image'): message['thumb_image'] = base64.b64encode(message['thumb_image']) + messages.append(message) cur.close() diff --git a/mvt/android/modules/backup/base.py b/mvt/android/modules/backup/base.py index 67256da..de50cb5 100644 --- a/mvt/android/modules/backup/base.py +++ b/mvt/android/modules/backup/base.py @@ -4,6 +4,7 @@ # https://license.mvt.re/1.1/ import fnmatch +import logging import os from tarfile import TarFile @@ -12,7 +13,19 @@ from mvt.common.module import MVTModule class BackupExtraction(MVTModule): """This class provides a base for all backup extractios modules""" - ab = None + + def __init__(self, file_path: str = None, target_path: str = None, + results_path: str = None, fast_mode: bool = False, + log: logging.Logger = logging.getLogger(__name__), + results: list = []) -> None: + super().__init__(file_path=file_path, target_path=target_path, + results_path=results_path, fast_mode=fast_mode, + log=log, results=results) + + self.ab = None + self.backup_path = None + self.tar = None + self.files = [] def from_folder(self, backup_path: str, files: list) -> None: """ diff --git a/mvt/android/modules/bugreport/appops.py b/mvt/android/modules/bugreport/appops.py index f219d72..1fad632 100644 --- a/mvt/android/modules/bugreport/appops.py +++ b/mvt/android/modules/bugreport/appops.py @@ -21,7 +21,7 @@ class Appops(BugReportModule): results_path=results_path, fast_mode=fast_mode, log=log, results=results) - def serialize(self, record: dict) -> None: + def serialize(self, record: dict) -> dict | list: records = [] for perm in record["permissions"]: if "entries" not in perm: diff --git a/mvt/android/modules/bugreport/base.py b/mvt/android/modules/bugreport/base.py index d7476d3..08e7204 100644 --- a/mvt/android/modules/bugreport/base.py +++ b/mvt/android/modules/bugreport/base.py @@ -4,6 +4,7 @@ # https://github.com/mvt-project/mvt/blob/main/LICENSE import fnmatch +import logging import os from zipfile import ZipFile @@ -13,7 +14,18 @@ from mvt.common.module import MVTModule class BugReportModule(MVTModule): """This class provides a base for all Android Bug Report modules.""" - zip_archive = None + def __init__(self, file_path: str = None, target_path: str = None, + results_path: str = None, fast_mode: bool = False, + log: logging.Logger = logging.getLogger(__name__), + results: list = []) -> None: + super().__init__(file_path=file_path, target_path=target_path, + results_path=results_path, fast_mode=fast_mode, + log=log, results=results) + + self.zip_archive = None + self.extract_path = None + self.extract_files = [] + self.zip_files = [] def from_folder(self, extract_path: str, extract_files: str) -> None: self.extract_path = extract_path diff --git a/mvt/android/modules/bugreport/battery_daily.py b/mvt/android/modules/bugreport/battery_daily.py index 750ff19..cd42a4c 100644 --- a/mvt/android/modules/bugreport/battery_daily.py +++ b/mvt/android/modules/bugreport/battery_daily.py @@ -21,7 +21,7 @@ class BatteryDaily(BugReportModule): results_path=results_path, fast_mode=fast_mode, log=log, results=results) - def serialize(self, record: dict) -> None: + def serialize(self, record: dict) -> dict | list: return { "timestamp": record["from"], "module": self.__class__.__name__, diff --git a/mvt/android/modules/bugreport/packages.py b/mvt/android/modules/bugreport/packages.py index 88b521e..7742d2e 100644 --- a/mvt/android/modules/bugreport/packages.py +++ b/mvt/android/modules/bugreport/packages.py @@ -24,7 +24,7 @@ class Packages(BugReportModule): results_path=results_path, fast_mode=fast_mode, log=log, results=results) - def serialize(self, record: dict) -> None: + def serialize(self, record: dict) -> dict | list: records = [] timestamps = [ @@ -33,11 +33,11 @@ class Packages(BugReportModule): {"event": "package_last_update", "timestamp": record["last_update_time"]}, ] - for ts in timestamps: + for timestamp in timestamps: records.append({ - "timestamp": ts["timestamp"], + "timestamp": timestamp["timestamp"], "module": self.__class__.__name__, - "event": ts["event"], + "event": timestamp["event"], "data": f"Install or update of package {record['package_name']}", }) diff --git a/mvt/android/modules/bugreport/receivers.py b/mvt/android/modules/bugreport/receivers.py index 73e3469..360f2b8 100644 --- a/mvt/android/modules/bugreport/receivers.py +++ b/mvt/android/modules/bugreport/receivers.py @@ -51,11 +51,11 @@ class Receivers(BugReportModule): self.log.info("Found a receiver monitoring outgoing calls: \"%s\"", receiver["receiver"]) - ioc = self.indicators.check_app_id(receiver["package_name"]) - if ioc: - receiver["matched_indicator"] = ioc - self.detected.append({intent: receiver}) - continue + ioc = self.indicators.check_app_id(receiver["package_name"]) + if ioc: + receiver["matched_indicator"] = ioc + self.detected.append({intent: receiver}) + continue def run(self) -> None: content = self._get_dumpstate_file() diff --git a/mvt/android/parsers/backup.py b/mvt/android/parsers/backup.py index 8060f29..1d38b32 100644 --- a/mvt/android/parsers/backup.py +++ b/mvt/android/parsers/backup.py @@ -49,7 +49,7 @@ def parse_ab_header(data): 'encryption': "none", 'version': 4} """ if data.startswith(b"ANDROID BACKUP"): - [magic_header, version, is_compressed, encryption, tar_data] = data.split(b"\n", 4) + [_, version, is_compressed, encryption, _] = data.split(b"\n", 4) return { "backup": True, "compression": (is_compressed == b"1"), @@ -149,7 +149,7 @@ def parse_backup_file(data, password=None): if not data.startswith(b"ANDROID BACKUP"): raise AndroidBackupParsingError("Invalid file header") - [magic_header, version, is_compressed, encryption_algo, tar_data] = data.split(b"\n", 4) + [_, version, is_compressed, encryption_algo, tar_data] = data.split(b"\n", 4) version = int(version) is_compressed = int(is_compressed) @@ -171,13 +171,14 @@ def parse_tar_for_sms(data): Returns an array of SMS """ dbytes = io.BytesIO(data) - tar = tarfile.open(fileobj=dbytes) + res = [] - for member in tar.getmembers(): - if member.name.startswith("apps/com.android.providers.telephony/d_f/") and \ - (member.name.endswith("_sms_backup") or member.name.endswith("_mms_backup")): - dhandler = tar.extractfile(member) - res.extend(parse_sms_file(dhandler.read())) + with tarfile.open(fileobj=dbytes) as tar: + for member in tar.getmembers(): + if member.name.startswith("apps/com.android.providers.telephony/d_f/") and \ + (member.name.endswith("_sms_backup") or member.name.endswith("_mms_backup")): + dhandler = tar.extractfile(member) + res.extend(parse_sms_file(dhandler.read())) return res @@ -192,7 +193,7 @@ def parse_sms_file(data): json_data = json.loads(data) for entry in json_data: - # Adapt MMS format to SMS format + # Adapt MMS format to SMS format. if "mms_body" in entry: entry["body"] = entry["mms_body"] entry.pop("mms_body") diff --git a/mvt/android/parsers/dumpsys.py b/mvt/android/parsers/dumpsys.py index 3ff40be..6e16237 100644 --- a/mvt/android/parsers/dumpsys.py +++ b/mvt/android/parsers/dumpsys.py @@ -213,14 +213,14 @@ def parse_dumpsys_dbinfo(output: str) -> list: matches = rxp_no_pid.findall(line) if not matches: continue - else: - match = matches[0] - results.append({ - "isodate": match[0], - "action": match[1], - "sql": match[2], - "path": pool, - }) + + match = matches[0] + results.append({ + "isodate": match[0], + "action": match[1], + "sql": match[2], + "path": pool, + }) else: match = matches[0] results.append({ diff --git a/mvt/common/cmd_check_iocs.py b/mvt/common/cmd_check_iocs.py index 245ca71..5718253 100644 --- a/mvt/common/cmd_check_iocs.py +++ b/mvt/common/cmd_check_iocs.py @@ -13,9 +13,6 @@ log = logging.getLogger(__name__) class CmdCheckIOCS(Command): - name = "check-iocs" - modules = [] - def __init__(self, target_path: str = None, results_path: str = None, ioc_files: list = [], module_name: str = None, serial: str = None, fast_mode: bool = False): @@ -23,6 +20,8 @@ class CmdCheckIOCS(Command): ioc_files=ioc_files, module_name=module_name, serial=serial, fast_mode=fast_mode, log=log) + self.name = "check-iocs" + def run(self) -> None: all_modules = [] for entry in self.modules: @@ -33,7 +32,7 @@ class CmdCheckIOCS(Command): total_detections = 0 for file_name in os.listdir(self.target_path): - name_only, ext = os.path.splitext(file_name) + name_only, _ = os.path.splitext(file_name) file_path = os.path.join(self.target_path, file_name) for iocs_module in all_modules: diff --git a/mvt/common/command.py b/mvt/common/command.py index fae9741..40f6865 100644 --- a/mvt/common/command.py +++ b/mvt/common/command.py @@ -17,13 +17,14 @@ from mvt.common.utils import convert_timestamp_to_iso from mvt.common.version import MVT_VERSION -class Command(object): +class Command: def __init__(self, target_path: str = None, results_path: str = None, ioc_files: list = [], module_name: str = None, serial: str = None, fast_mode: bool = False, log: logging.Logger = logging.getLogger(__name__)): self.name = "" + self.modules = [] self.target_path = target_path self.results_path = results_path @@ -56,11 +57,11 @@ class Command(object): if not self.results_path: return - fh = logging.FileHandler(os.path.join(self.results_path, "command.log")) + file_handler = logging.FileHandler(os.path.join(self.results_path, "command.log")) formatter = logging.Formatter("%(asctime)s - %(name)s - %(levelname)s - %(message)s") - fh.setLevel(logging.DEBUG) - fh.setFormatter(formatter) - logger.addHandler(fh) + file_handler.setLevel(logging.DEBUG) + file_handler.setFormatter(formatter) + logger.addHandler(file_handler) def _store_timeline(self) -> None: if not self.results_path: @@ -99,23 +100,23 @@ class Command(object): # enough. if self.target_path and os.environ.get("MVT_HASH_FILES"): if os.path.isfile(self.target_path): - h = hashlib.sha256() + sha256 = hashlib.sha256() with open(self.target_path, "rb") as handle: - h.update(handle.read()) + sha256.update(handle.read()) info["hashes"].append({ "file_path": self.target_path, - "sha256": h.hexdigest(), + "sha256": sha256.hexdigest(), }) elif os.path.isdir(self.target_path): - for (root, dirs, files) in os.walk(self.target_path): + for (root, _, files) in os.walk(self.target_path): for file in files: file_path = os.path.join(root, file) - h = hashlib.sha256() + sha256 = hashlib.sha256() try: with open(file_path, "rb") as handle: - h.update(handle.read()) + sha256.update(handle.read()) except FileNotFoundError: self.log.error("Failed to hash the file %s: might be a symlink", file_path) continue @@ -125,10 +126,10 @@ class Command(object): info["hashes"].append({ "file_path": file_path, - "sha256": h.hexdigest(), + "sha256": sha256.hexdigest(), }) - with open(os.path.join(self.results_path, "info.json"), "w+") as handle: + with open(os.path.join(self.results_path, "info.json"), "w+", encoding="utf-8") as handle: json.dump(info, handle, indent=4) def list_modules(self) -> None: diff --git a/mvt/common/indicators.py b/mvt/common/indicators.py index 3ddb29d..51ac456 100644 --- a/mvt/common/indicators.py +++ b/mvt/common/indicators.py @@ -74,6 +74,44 @@ class Indicators: ioc_coll["count"] += 1 self.total_ioc_count += 1 + def _process_indicator(self, indicator: dict, collection: dict) -> None: + key, value = indicator.get("pattern", "").strip("[]").split("=") + + if key == "domain-name:value": + # We force domain names to lower case. + self._add_indicator(ioc=value.lower(), + ioc_coll=collection, + ioc_coll_list=collection["domains"]) + elif key == "process:name": + self._add_indicator(ioc=value, + ioc_coll=collection, + ioc_coll_list=collection["processes"]) + elif key == "email-addr:value": + # We force email addresses to lower case. + self._add_indicator(ioc=value.lower(), + ioc_coll=collection, + ioc_coll_list=collection["emails"]) + elif key == "file:name": + self._add_indicator(ioc=value, + ioc_coll=collection, + ioc_coll_list=collection["file_names"]) + elif key == "file:path": + self._add_indicator(ioc=value, + ioc_coll=collection, + ioc_coll_list=collection["file_paths"]) + elif key == "file:hashes.sha256": + self._add_indicator(ioc=value, + ioc_coll=collection, + ioc_coll_list=collection["files_sha256"]) + elif key == "app:id": + self._add_indicator(ioc=value, + ioc_coll=collection, + ioc_coll_list=collection["app_ids"]) + elif key == "configuration-profile:id": + self._add_indicator(ioc=value, + ioc_coll=collection, + ioc_coll_list=collection["ios_profile_ids"]) + def parse_stix2(self, file_path: str) -> None: """Extract indicators from a STIX2 file. @@ -132,47 +170,9 @@ class Indicators: # Now we look for the correct collection matching the malware ID we # got from the relationship. for collection in collections: - if collection["id"] != malware_id: - continue - - key, value = indicator.get("pattern", "").strip("[]").split("=") - - if key == "domain-name:value": - # We force domain names to lower case. - self._add_indicator(ioc=value.lower(), - ioc_coll=collection, - ioc_coll_list=collection["domains"]) - elif key == "process:name": - self._add_indicator(ioc=value, - ioc_coll=collection, - ioc_coll_list=collection["processes"]) - elif key == "email-addr:value": - # We force email addresses to lower case. - self._add_indicator(ioc=value.lower(), - ioc_coll=collection, - ioc_coll_list=collection["emails"]) - elif key == "file:name": - self._add_indicator(ioc=value, - ioc_coll=collection, - ioc_coll_list=collection["file_names"]) - elif key == "file:path": - self._add_indicator(ioc=value, - ioc_coll=collection, - ioc_coll_list=collection["file_paths"]) - elif key == "file:hashes.sha256": - self._add_indicator(ioc=value, - ioc_coll=collection, - ioc_coll_list=collection["files_sha256"]) - elif key == "app:id": - self._add_indicator(ioc=value, - ioc_coll=collection, - ioc_coll_list=collection["app_ids"]) - elif key == "configuration-profile:id": - self._add_indicator(ioc=value, - ioc_coll=collection, - ioc_coll_list=collection["ios_profile_ids"]) - - break + if collection["id"] == malware_id: + self._process_indicator(indicator, collection) + break for coll in collections: self.log.info("Extracted %d indicators for collection with name \"%s\"", @@ -198,7 +198,7 @@ class Indicators: self._check_stix2_env_variable() self.log.info("Loaded a total of %d unique indicators", self.total_ioc_count) - def get_iocs(self, ioc_type: str) -> dict: + def get_iocs(self, ioc_type: str) -> dict | None: for ioc_collection in self.ioc_collections: for ioc in ioc_collection.get(ioc_type, []): yield { @@ -208,7 +208,7 @@ class Indicators: "stix2_file_name": ioc_collection["stix2_file_name"], } - def check_domain(self, url: str) -> dict: + def check_domain(self, url: str) -> dict | None: """Check if a given URL matches any of the provided domain indicators. :param url: URL to match against domain indicators @@ -280,7 +280,9 @@ class Indicators: return ioc - def check_domains(self, urls: list) -> dict: + return None + + def check_domains(self, urls: list) -> dict | None: """Check a list of URLs against the provided list of domain indicators. :param urls: List of URLs to check against domain indicators @@ -296,7 +298,9 @@ class Indicators: if check: return check - def check_process(self, process: str) -> dict: + return None + + def check_process(self, process: str) -> dict | None: """Check the provided process name against the list of process indicators. @@ -321,7 +325,9 @@ class Indicators: process, ioc["name"]) return ioc - def check_processes(self, processes: list) -> dict: + return None + + def check_processes(self, processes: list) -> dict | None: """Check the provided list of processes against the list of process indicators. @@ -338,7 +344,9 @@ class Indicators: if check: return check - def check_email(self, email: str) -> dict: + return None + + def check_email(self, email: str) -> dict | None: """Check the provided email against the list of email indicators. :param email: Email address to check against email indicators @@ -355,7 +363,9 @@ class Indicators: email, ioc["name"]) return ioc - def check_file_name(self, file_name: str) -> dict: + return None + + def check_file_name(self, file_name: str) -> dict | None: """Check the provided file name against the list of file indicators. :param file_name: File name to check against file @@ -373,7 +383,9 @@ class Indicators: file_name, ioc["name"]) return ioc - def check_file_path(self, file_path: str) -> dict: + return None + + def check_file_path(self, file_path: str) -> dict | None: """Check the provided file path against the list of file indicators (both path and name). :param file_path: File path or file name to check against file @@ -396,7 +408,9 @@ class Indicators: file_path, ioc["name"]) return ioc - def check_profile(self, profile_uuid: str) -> dict: + return None + + def check_profile(self, profile_uuid: str) -> dict | None: """Check the provided configuration profile UUID against the list of indicators. :param profile_uuid: Profile UUID to check against configuration profile indicators @@ -413,7 +427,9 @@ class Indicators: profile_uuid, ioc["name"]) return ioc - def check_file_hash(self, file_hash: str) -> dict: + return None + + def check_file_hash(self, file_hash: str) -> dict | None: """Check the provided SHA256 file hash against the list of indicators. :param file_hash: SHA256 hash to check @@ -430,7 +446,9 @@ class Indicators: file_hash, ioc["name"]) return ioc - def check_app_id(self, app_id: str) -> dict: + return None + + def check_app_id(self, app_id: str) -> dict | None: """Check the provided app identifier (typically an Android package name) against the list of indicators. @@ -447,3 +465,5 @@ class Indicators: self.log.warning("Found a known suspicious app with ID \"%s\" matching indicators from \"%s\"", app_id, ioc["name"]) return ioc + + return None diff --git a/mvt/common/module.py b/mvt/common/module.py index f72bbea..0533d80 100644 --- a/mvt/common/module.py +++ b/mvt/common/module.py @@ -24,7 +24,7 @@ class InsufficientPrivileges(Exception): pass -class MVTModule(object): +class MVTModule: """This class provides a base for all extraction modules.""" enabled = True @@ -106,7 +106,7 @@ class MVTModule(object): with open(detected_json_path, "w", encoding="utf-8") as handle: json.dump(self.detected, handle, indent=4, default=str) - def serialize(self, record: dict) -> None: + def serialize(self, record: dict) -> dict | list: raise NotImplementedError @staticmethod @@ -126,7 +126,7 @@ class MVTModule(object): for result in self.results: record = self.serialize(result) if record: - if type(record) == list: + if isinstance(record, list): self.timeline.extend(record) else: self.timeline.append(record) @@ -134,7 +134,7 @@ class MVTModule(object): for detected in self.detected: record = self.serialize(detected) if record: - if type(record) == list: + if isinstance(record, list): self.timeline_detected.extend(record) else: self.timeline_detected.append(record) @@ -173,7 +173,6 @@ def run_module(module: Callable) -> None: except NotImplementedError: module.log.info("The %s module does not support checking for indicators", module.__class__.__name__) - pass else: if module.indicators and not module.detected: module.log.info("The %s module produced no detections!", diff --git a/mvt/common/options.py b/mvt/common/options.py index 36f8ac1..3e46b9b 100644 --- a/mvt/common/options.py +++ b/mvt/common/options.py @@ -13,27 +13,21 @@ class MutuallyExclusiveOption(Option): def __init__(self, *args, **kwargs): self.mutually_exclusive = set(kwargs.pop("mutually_exclusive", [])) - help = kwargs.get("help", "") + help_msg = kwargs.get("help", "") if self.mutually_exclusive: ex_str = ", ".join(self.mutually_exclusive) - kwargs["help"] = help + ( + kwargs["help"] = help_msg + ( " NOTE: This argument is mutually exclusive with " "arguments: [" + ex_str + "]." ) - super(MutuallyExclusiveOption, self).__init__(*args, **kwargs) + + super().__init__(*args, **kwargs) def handle_parse_result(self, ctx, opts, args): if self.mutually_exclusive.intersection(opts) and self.name in opts: raise UsageError( - "Illegal usage: `{}` is mutually exclusive with " - "arguments `{}`.".format( - self.name, - ", ".join(self.mutually_exclusive) - ) + f"Illegal usage: `{self.name}` is mutually exclusive with " + f"arguments `{', '.join(self.mutually_exclusive)}`." ) - return super(MutuallyExclusiveOption, self).handle_parse_result( - ctx, - opts, - args - ) + return super().handle_parse_result(ctx, opts, args) diff --git a/mvt/common/updates.py b/mvt/common/updates.py index 6a2efb4..96e9c7c 100644 --- a/mvt/common/updates.py +++ b/mvt/common/updates.py @@ -55,7 +55,7 @@ class IndicatorsUpdates: if not os.path.exists(self.latest_check_path): return 0 - with open(self.latest_check_path, "r") as handle: + with open(self.latest_check_path, "r", encoding="utf-8") as handle: data = handle.read().strip() if data: return int(data) @@ -64,14 +64,14 @@ class IndicatorsUpdates: def set_latest_check(self) -> None: timestamp = int(datetime.utcnow().timestamp()) - with open(self.latest_check_path, "w") as handle: + with open(self.latest_check_path, "w", encoding="utf-8") as handle: handle.write(str(timestamp)) def get_latest_update(self) -> int: if not os.path.exists(self.latest_update_path): return 0 - with open(self.latest_update_path, "r") as handle: + with open(self.latest_update_path, "r", encoding="utf-8") as handle: data = handle.read().strip() if data: return int(data) @@ -80,7 +80,7 @@ class IndicatorsUpdates: def set_latest_update(self) -> None: timestamp = int(datetime.utcnow().timestamp()) - with open(self.latest_update_path, "w") as handle: + with open(self.latest_update_path, "w", encoding="utf-8") as handle: handle.write(str(timestamp)) def get_remote_index(self) -> dict: @@ -145,23 +145,25 @@ class IndicatorsUpdates: self.set_latest_update() def _get_remote_file_latest_commit(self, owner: str, repo: str, - branch: str, path: str) -> bool: - file_commit_url = f"https://api.github.com/repos/{self.index_owner}/{self.index_repo}/commits?path={self.index_path}" + branch: str, path: str) -> int: + # TODO: The branch is currently not taken into consideration. + # How do we specify which branch to look up to the API? + file_commit_url = f"https://api.github.com/repos/{owner}/{repo}/commits?path={path}" res = requests.get(file_commit_url) if res.status_code != 200: log.error("Failed to get details about file %s (error %d)", file_commit_url, res.status_code) - return False + return -1 details = res.json() if len(details) == 0: - return False + return -1 latest_commit = details[0] latest_commit_date = latest_commit.get("commit", {}).get("author", {}).get("date", None) if not latest_commit_date: log.error("Failed to retrieve date of latest update to indicators index file") - return False + return -1 latest_commit_dt = datetime.strptime(latest_commit_date, '%Y-%m-%dT%H:%M:%SZ') latest_commit_ts = int(latest_commit_dt.timestamp()) diff --git a/mvt/common/url.py b/mvt/common/url.py index a5cb764..312572b 100644 --- a/mvt/common/url.py +++ b/mvt/common/url.py @@ -256,7 +256,7 @@ SHORTENER_DOMAINS = [ class URL: def __init__(self, url: str) -> None: - if type(url) == bytes: + if isinstance(url, bytes): url = url.decode() self.url = url @@ -315,3 +315,5 @@ class URL: res = requests.head(self.url) if str(res.status_code).startswith("30"): return res.headers["Location"] + + return "" diff --git a/mvt/common/utils.py b/mvt/common/utils.py index 19e82f0..0313769 100644 --- a/mvt/common/utils.py +++ b/mvt/common/utils.py @@ -23,7 +23,7 @@ def convert_mactime_to_unix(timestamp, from_2001: bool = True): # This is to fix formats in case of, for example, SMS messages database # timestamp format. - if type(timestamp) == int and len(str(timestamp)) == 18: + if isinstance(timestamp, int) and len(str(timestamp)) == 18: timestamp = int(str(timestamp)[:9]) # MacTime counts from 2001-01-01. @@ -106,8 +106,8 @@ def keys_bytes_to_string(obj) -> str: if isinstance(obj, (tuple, list, set)): value = [keys_bytes_to_string(x) for x in obj] return value - else: - return obj + + return obj for key, value in obj.items(): if isinstance(key, bytes): diff --git a/mvt/common/virustotal.py b/mvt/common/virustotal.py index 16cb9ac..0bac588 100644 --- a/mvt/common/virustotal.py +++ b/mvt/common/virustotal.py @@ -35,11 +35,12 @@ def virustotal_lookup(file_hash: str): if res.status_code == 200: report = res.json() return report["data"] - elif res.status_code == 404: + + if res.status_code == 404: log.info("Could not find results for file with hash %s", file_hash) elif res.status_code == 429: raise VTQuotaExceeded("You have exceeded the quota for your VirusTotal API key") else: - raise Exception("Unexpected response from VirusTotal: %s", res.status_code) + raise Exception(f"Unexpected response from VirusTotal: {res.status_code}") return None diff --git a/mvt/ios/cli.py b/mvt/ios/cli.py index 1087b65..32511c1 100644 --- a/mvt/ios/cli.py +++ b/mvt/ios/cli.py @@ -71,7 +71,7 @@ def decrypt_backup(ctx, destination, password, key_file, backup_path): if key_file: if MVT_IOS_BACKUP_PASSWORD in os.environ: - log.info("Ignoring environment variable, using --key-file '%s' instead", + log.info("Ignoring %s environment variable, using --key-file '%s' instead", MVT_IOS_BACKUP_PASSWORD, key_file) backup.decrypt_with_key_file(key_file) diff --git a/mvt/ios/cmd_check_backup.py b/mvt/ios/cmd_check_backup.py index c3036ae..4cf2355 100644 --- a/mvt/ios/cmd_check_backup.py +++ b/mvt/ios/cmd_check_backup.py @@ -15,9 +15,6 @@ log = logging.getLogger(__name__) class CmdIOSCheckBackup(Command): - name = "check-backup" - modules = BACKUP_MODULES + MIXED_MODULES - def __init__(self, target_path: str = None, results_path: str = None, ioc_files: list = [], module_name: str = None, serial: str = None, fast_mode: bool = False): @@ -25,5 +22,8 @@ class CmdIOSCheckBackup(Command): ioc_files=ioc_files, module_name=module_name, serial=serial, fast_mode=fast_mode, log=log) + self.name = "check-backup" + self.modules = BACKUP_MODULES + MIXED_MODULES + def module_init(self, module): module.is_backup = True diff --git a/mvt/ios/cmd_check_fs.py b/mvt/ios/cmd_check_fs.py index 021d65c..503660c 100644 --- a/mvt/ios/cmd_check_fs.py +++ b/mvt/ios/cmd_check_fs.py @@ -15,9 +15,6 @@ log = logging.getLogger(__name__) class CmdIOSCheckFS(Command): - name = "check-fs" - modules = FS_MODULES + MIXED_MODULES - def __init__(self, target_path: str = None, results_path: str = None, ioc_files: list = [], module_name: str = None, serial: str = None, fast_mode: bool = False): @@ -25,5 +22,8 @@ class CmdIOSCheckFS(Command): ioc_files=ioc_files, module_name=module_name, serial=serial, fast_mode=fast_mode, log=log) + self.name = "check-fs" + self.modules = FS_MODULES + MIXED_MODULES + def module_init(self, module): module.is_fs_dump = True diff --git a/mvt/ios/modules/backup/configuration_profiles.py b/mvt/ios/modules/backup/configuration_profiles.py index f97457c..3d2932b 100644 --- a/mvt/ios/modules/backup/configuration_profiles.py +++ b/mvt/ios/modules/backup/configuration_profiles.py @@ -26,9 +26,9 @@ class ConfigurationProfiles(IOSExtraction): results_path=results_path, fast_mode=fast_mode, log=log, results=results) - def serialize(self, record: dict) -> None: + def serialize(self, record: dict) -> dict | list: if not record["install_date"]: - return + return {} payload_name = record['plist'].get('PayloadDisplayName') payload_description = record['plist'].get('PayloadDescription') @@ -106,12 +106,12 @@ class ConfigurationProfiles(IOSExtraction): conf_plist["LastPushTokenHash"] = b64encode(conf_plist["LastPushTokenHash"]) if "PayloadContent" in conf_plist: - for x in range(len(conf_plist["PayloadContent"])): - if "PERSISTENT_REF" in conf_plist["PayloadContent"][x]: - conf_plist["PayloadContent"][x]["PERSISTENT_REF"] = b64encode(conf_plist["PayloadContent"][x]["PERSISTENT_REF"]) + for content_entry in range(len(conf_plist["PayloadContent"])): + if "PERSISTENT_REF" in conf_plist["PayloadContent"][content_entry]: + conf_plist["PayloadContent"][content_entry]["PERSISTENT_REF"] = b64encode(conf_plist["PayloadContent"][content_entry]["PERSISTENT_REF"]) - if "IdentityPersistentRef" in conf_plist["PayloadContent"][x]: - conf_plist["PayloadContent"][x]["IdentityPersistentRef"] = b64encode(conf_plist["PayloadContent"][x]["IdentityPersistentRef"]) + if "IdentityPersistentRef" in conf_plist["PayloadContent"][content_entry]: + conf_plist["PayloadContent"][content_entry]["IdentityPersistentRef"] = b64encode(conf_plist["PayloadContent"][content_entry]["IdentityPersistentRef"]) self.results.append({ "file_id": conf_file["file_id"], diff --git a/mvt/ios/modules/backup/manifest.py b/mvt/ios/modules/backup/manifest.py index 3bfac33..d2c9304 100644 --- a/mvt/ios/modules/backup/manifest.py +++ b/mvt/ios/modules/backup/manifest.py @@ -46,23 +46,24 @@ class Manifest(IOSExtraction): """ if isinstance(timestamp_or_unix_time_int, datetime.datetime): return convert_timestamp_to_iso(timestamp_or_unix_time_int) - else: - timestamp = datetime.datetime.utcfromtimestamp(timestamp_or_unix_time_int) - return convert_timestamp_to_iso(timestamp) - def serialize(self, record: dict) -> None: + timestamp = datetime.datetime.utcfromtimestamp(timestamp_or_unix_time_int) + return convert_timestamp_to_iso(timestamp) + + def serialize(self, record: dict) -> []: records = [] if "modified" not in record or "status_changed" not in record: - return - for ts in set([record["created"], record["modified"], record["status_changed"]]): + return records + + for timestamp in set([record["created"], record["modified"], record["status_changed"]]): macb = "" - macb += "M" if ts == record["modified"] else "-" + macb += "M" if timestamp == record["modified"] else "-" macb += "-" - macb += "C" if ts == record["status_changed"] else "-" - macb += "B" if ts == record["created"] else "-" + macb += "C" if timestamp == record["status_changed"] else "-" + macb += "B" if timestamp == record["created"] else "-" records.append({ - "timestamp": ts, + "timestamp": timestamp, "module": self.__class__.__name__, "event": macb, "data": f"{record['relative_path']} - {record['domain']}" @@ -136,7 +137,6 @@ class Manifest(IOSExtraction): except Exception: self.log.exception("Error reading manifest file metadata for file with ID %s and relative path %s", file_data["fileID"], file_data["relativePath"]) - pass self.results.append(cleaned_metadata) diff --git a/mvt/ios/modules/backup/profile_events.py b/mvt/ios/modules/backup/profile_events.py index 358ce24..8451ff6 100644 --- a/mvt/ios/modules/backup/profile_events.py +++ b/mvt/ios/modules/backup/profile_events.py @@ -27,7 +27,7 @@ class ProfileEvents(IOSExtraction): results_path=results_path, fast_mode=fast_mode, log=log, results=results) - def serialize(self, record: dict) -> None: + def serialize(self, record: dict) -> dict | list: return { "timestamp": record.get("timestamp"), "module": self.__class__.__name__, diff --git a/mvt/ios/modules/base.py b/mvt/ios/modules/base.py index b70c814..6c6a3d4 100644 --- a/mvt/ios/modules/base.py +++ b/mvt/ios/modules/base.py @@ -94,7 +94,7 @@ class IOSExtraction(MVTModule): elif domain: cur.execute(f"{base_sql} domain = ?;", (domain,)) except Exception as e: - raise DatabaseCorruptedError("failed to query Manifest.db: %s", e) + raise DatabaseCorruptedError(f"failed to query Manifest.db: {e}") for row in cur: yield { diff --git a/mvt/ios/modules/fs/analytics.py b/mvt/ios/modules/fs/analytics.py index 8c946d0..83c0587 100644 --- a/mvt/ios/modules/fs/analytics.py +++ b/mvt/ios/modules/fs/analytics.py @@ -27,7 +27,7 @@ class Analytics(IOSExtraction): results_path=results_path, fast_mode=fast_mode, log=log, results=results) - def serialize(self, record: dict) -> None: + def serialize(self, record: dict) -> dict | list: return { "timestamp": record["isodate"], "module": self.__class__.__name__, diff --git a/mvt/ios/modules/fs/analytics_ios_versions.py b/mvt/ios/modules/fs/analytics_ios_versions.py index a72cc53..6d3c74e 100644 --- a/mvt/ios/modules/fs/analytics_ios_versions.py +++ b/mvt/ios/modules/fs/analytics_ios_versions.py @@ -25,7 +25,7 @@ class AnalyticsIOSVersions(IOSExtraction): results_path=results_path, fast_mode=fast_mode, log=log, results=results) - def serialize(self, record: dict) -> None: + def serialize(self, record: dict) -> dict | list: return { "timestamp": record["isodate"], "module": self.__class__.__name__, @@ -45,25 +45,25 @@ class AnalyticsIOSVersions(IOSExtraction): if not build: continue - ts = result.get("isodate", None) - if not ts: + isodate = result.get("isodate", None) + if not isodate: continue if build not in builds.keys(): - builds[build] = ts + builds[build] = isodate continue - result_dt = datetime.strptime(ts, dt_format) + result_dt = datetime.strptime(isodate, dt_format) cur_dt = datetime.strptime(builds[build], dt_format) if result_dt < cur_dt: - builds[build] = ts + builds[build] = isodate - for build, ts in builds.items(): + for build, isodate in builds.items(): version = find_version_by_build(build) self.results.append({ - "isodate": ts, + "isodate": isodate, "build": build, "version": version, }) diff --git a/mvt/ios/modules/fs/cache_files.py b/mvt/ios/modules/fs/cache_files.py index 5bcf5be..9723076 100644 --- a/mvt/ios/modules/fs/cache_files.py +++ b/mvt/ios/modules/fs/cache_files.py @@ -20,7 +20,7 @@ class CacheFiles(IOSExtraction): results_path=results_path, fast_mode=fast_mode, log=log, results=results) - def serialize(self, record: dict) -> None: + def serialize(self, record: dict) -> dict | list: records = [] for item in self.results[record]: records.append({ @@ -74,7 +74,7 @@ class CacheFiles(IOSExtraction): def run(self) -> None: self.results = {} - for root, dirs, files in os.walk(self.target_path): + for root, _, files in os.walk(self.target_path): for file_name in files: if file_name != "Cache.db": continue diff --git a/mvt/ios/modules/fs/filesystem.py b/mvt/ios/modules/fs/filesystem.py index bf04420..9e06496 100644 --- a/mvt/ios/modules/fs/filesystem.py +++ b/mvt/ios/modules/fs/filesystem.py @@ -27,7 +27,7 @@ class Filesystem(IOSExtraction): results_path=results_path, fast_mode=fast_mode, log=log, results=results) - def serialize(self, record: dict) -> None: + def serialize(self, record: dict) -> dict | list: return { "timestamp": record["modified"], "module": self.__class__.__name__, diff --git a/mvt/ios/modules/fs/safari_favicon.py b/mvt/ios/modules/fs/safari_favicon.py index 241b3fc..4bb4809 100644 --- a/mvt/ios/modules/fs/safari_favicon.py +++ b/mvt/ios/modules/fs/safari_favicon.py @@ -27,7 +27,7 @@ class SafariFavicon(IOSExtraction): results_path=results_path, fast_mode=fast_mode, log=log, results=results) - def serialize(self, record: dict) -> None: + def serialize(self, record: dict) -> dict | list: return { "timestamp": record["isodate"], "module": self.__class__.__name__, diff --git a/mvt/ios/modules/fs/shutdownlog.py b/mvt/ios/modules/fs/shutdownlog.py index cd5d2a4..a2b541c 100644 --- a/mvt/ios/modules/fs/shutdownlog.py +++ b/mvt/ios/modules/fs/shutdownlog.py @@ -25,7 +25,7 @@ class ShutdownLog(IOSExtraction): results_path=results_path, fast_mode=fast_mode, log=log, results=results) - def serialize(self, record: dict) -> None: + def serialize(self, record: dict) -> dict | list: return { "timestamp": record["isodate"], "module": self.__class__.__name__, diff --git a/mvt/ios/modules/fs/version_history.py b/mvt/ios/modules/fs/version_history.py index afe1b54..ad838be 100644 --- a/mvt/ios/modules/fs/version_history.py +++ b/mvt/ios/modules/fs/version_history.py @@ -27,7 +27,7 @@ class IOSVersionHistory(IOSExtraction): results_path=results_path, fast_mode=fast_mode, log=log, results=results) - def serialize(self, record: dict) -> None: + def serialize(self, record: dict) -> dict | list: return { "timestamp": record["isodate"], "module": self.__class__.__name__, diff --git a/mvt/ios/modules/fs/webkit_indexeddb.py b/mvt/ios/modules/fs/webkit_indexeddb.py index 1e2333c..45e903a 100644 --- a/mvt/ios/modules/fs/webkit_indexeddb.py +++ b/mvt/ios/modules/fs/webkit_indexeddb.py @@ -29,7 +29,7 @@ class WebkitIndexedDB(WebkitBase): results_path=results_path, fast_mode=fast_mode, log=log, results=results) - def serialize(self, record: dict) -> None: + def serialize(self, record: dict) -> dict | list: return { "timestamp": record["isodate"], "module": self.__class__.__name__, diff --git a/mvt/ios/modules/fs/webkit_localstorage.py b/mvt/ios/modules/fs/webkit_localstorage.py index f921df6..30c03ff 100644 --- a/mvt/ios/modules/fs/webkit_localstorage.py +++ b/mvt/ios/modules/fs/webkit_localstorage.py @@ -27,7 +27,7 @@ class WebkitLocalStorage(WebkitBase): results_path=results_path, fast_mode=fast_mode, log=log, results=results) - def serialize(self, record: dict) -> None: + def serialize(self, record: dict) -> dict | list: return { "timestamp": record["isodate"], "module": self.__class__.__name__, diff --git a/mvt/ios/modules/mixed/calls.py b/mvt/ios/modules/mixed/calls.py index 423990d..e0293d7 100644 --- a/mvt/ios/modules/mixed/calls.py +++ b/mvt/ios/modules/mixed/calls.py @@ -29,7 +29,7 @@ class Calls(IOSExtraction): results_path=results_path, fast_mode=fast_mode, log=log, results=results) - def serialize(self, record: dict) -> None: + def serialize(self, record: dict) -> dict | list: return { "timestamp": record["isodate"], "module": self.__class__.__name__, diff --git a/mvt/ios/modules/mixed/chrome_favicon.py b/mvt/ios/modules/mixed/chrome_favicon.py index 44be403..4575cf5 100644 --- a/mvt/ios/modules/mixed/chrome_favicon.py +++ b/mvt/ios/modules/mixed/chrome_favicon.py @@ -32,7 +32,7 @@ class ChromeFavicon(IOSExtraction): results_path=results_path, fast_mode=fast_mode, log=log, results=results) - def serialize(self, record: dict) -> None: + def serialize(self, record: dict) -> dict | list: return { "timestamp": record["isodate"], "module": self.__class__.__name__, diff --git a/mvt/ios/modules/mixed/chrome_history.py b/mvt/ios/modules/mixed/chrome_history.py index 78c3f0d..cbda552 100644 --- a/mvt/ios/modules/mixed/chrome_history.py +++ b/mvt/ios/modules/mixed/chrome_history.py @@ -31,7 +31,7 @@ class ChromeHistory(IOSExtraction): results_path=results_path, fast_mode=fast_mode, log=log, results=results) - def serialize(self, record: dict) -> None: + def serialize(self, record: dict) -> dict | list: return { "timestamp": record["isodate"], "module": self.__class__.__name__, diff --git a/mvt/ios/modules/mixed/firefox_favicon.py b/mvt/ios/modules/mixed/firefox_favicon.py index 2ca83b0..3bb3f87 100644 --- a/mvt/ios/modules/mixed/firefox_favicon.py +++ b/mvt/ios/modules/mixed/firefox_favicon.py @@ -30,7 +30,7 @@ class FirefoxFavicon(IOSExtraction): results_path=results_path, fast_mode=fast_mode, log=log, results=results) - def serialize(self, record: dict) -> None: + def serialize(self, record: dict) -> dict | list: return { "timestamp": record["isodate"], "module": self.__class__.__name__, diff --git a/mvt/ios/modules/mixed/firefox_history.py b/mvt/ios/modules/mixed/firefox_history.py index 9895dd6..b106052 100644 --- a/mvt/ios/modules/mixed/firefox_history.py +++ b/mvt/ios/modules/mixed/firefox_history.py @@ -34,7 +34,7 @@ class FirefoxHistory(IOSExtraction): results_path=results_path, fast_mode=fast_mode, log=log, results=results) - def serialize(self, record: dict) -> None: + def serialize(self, record: dict) -> dict | list: return { "timestamp": record["isodate"], "module": self.__class__.__name__, diff --git a/mvt/ios/modules/mixed/idstatuscache.py b/mvt/ios/modules/mixed/idstatuscache.py index 10d65f4..e616933 100644 --- a/mvt/ios/modules/mixed/idstatuscache.py +++ b/mvt/ios/modules/mixed/idstatuscache.py @@ -31,7 +31,7 @@ class IDStatusCache(IOSExtraction): results_path=results_path, fast_mode=fast_mode, log=log, results=results) - def serialize(self, record: dict) -> None: + def serialize(self, record: dict) -> dict | list: return { "timestamp": record["isodate"], "module": self.__class__.__name__, diff --git a/mvt/ios/modules/mixed/interactionc.py b/mvt/ios/modules/mixed/interactionc.py index 2d0c4cc..1253fbe 100644 --- a/mvt/ios/modules/mixed/interactionc.py +++ b/mvt/ios/modules/mixed/interactionc.py @@ -42,27 +42,27 @@ class InteractionC(IOSExtraction): "last_outgoing_recipient_date", ] - def serialize(self, record: dict) -> None: + def serialize(self, record: dict) -> dict | list: records = [] processed = [] - for ts in self.timestamps: + for timestamp in self.timestamps: # Check if the record has the current timestamp. - if ts not in record or not record[ts]: + if timestamp not in record or not record[timestamp]: continue # Check if the timestamp was already processed. - if record[ts] in processed: + if record[timestamp] in processed: continue records.append({ - "timestamp": record[ts], + "timestamp": record[timestamp], "module": self.__class__.__name__, - "event": ts, + "event": timestamp, "data": f"[{record['bundle_id']}] {record['account']} - from {record['sender_display_name']} " f"({record['sender_identifier']}) to {record['recipient_display_name']} " f"({record['recipient_identifier']}): {record['content']}" }) - processed.append(record[ts]) + processed.append(record[timestamp]) return records diff --git a/mvt/ios/modules/mixed/locationd.py b/mvt/ios/modules/mixed/locationd.py index adb30ed..1cbf0d1 100644 --- a/mvt/ios/modules/mixed/locationd.py +++ b/mvt/ios/modules/mixed/locationd.py @@ -42,7 +42,7 @@ class LocationdClients(IOSExtraction): "BeaconRegionTimeStopped", ] - def serialize(self, record: dict) -> None: + def serialize(self, record: dict) -> dict | list: records = [] for timestamp in self.timestamps: if timestamp in record.keys(): @@ -102,12 +102,12 @@ class LocationdClients(IOSExtraction): with open(file_path, "rb") as handle: file_plist = plistlib.load(handle) - for key, values in file_plist.items(): + for key, _ in file_plist.items(): result = file_plist[key] result["package"] = key - for ts in self.timestamps: - if ts in result.keys(): - result[ts] = convert_timestamp_to_iso(convert_mactime_to_unix(result[ts])) + for timestamp in self.timestamps: + if timestamp in result.keys(): + result[timestamp] = convert_timestamp_to_iso(convert_mactime_to_unix(result[timestamp])) self.results.append(result) diff --git a/mvt/ios/modules/mixed/osanalytics_addaily.py b/mvt/ios/modules/mixed/osanalytics_addaily.py index 7efeeaf..3e2a089 100644 --- a/mvt/ios/modules/mixed/osanalytics_addaily.py +++ b/mvt/ios/modules/mixed/osanalytics_addaily.py @@ -29,7 +29,7 @@ class OSAnalyticsADDaily(IOSExtraction): results_path=results_path, fast_mode=fast_mode, log=log, results=results) - def serialize(self, record: dict) -> None: + def serialize(self, record: dict) -> dict | list: record_data = f"{record['package']} WIFI IN: {record['wifi_in']}, WIFI OUT: {record['wifi_out']} - " \ f"WWAN IN: {record['wwan_in']}, WWAN OUT: {record['wwan_out']}" return { diff --git a/mvt/ios/modules/mixed/safari_browserstate.py b/mvt/ios/modules/mixed/safari_browserstate.py index 4273697..d90a6b5 100644 --- a/mvt/ios/modules/mixed/safari_browserstate.py +++ b/mvt/ios/modules/mixed/safari_browserstate.py @@ -34,7 +34,7 @@ class SafariBrowserState(IOSExtraction): self._session_history_count = 0 - def serialize(self, record: dict) -> None: + def serialize(self, record: dict) -> dict | list: return { "timestamp": record["last_viewed_timestamp"], "module": self.__class__.__name__, diff --git a/mvt/ios/modules/mixed/safari_history.py b/mvt/ios/modules/mixed/safari_history.py index 3bc026c..878b1cd 100644 --- a/mvt/ios/modules/mixed/safari_history.py +++ b/mvt/ios/modules/mixed/safari_history.py @@ -34,7 +34,7 @@ class SafariHistory(IOSExtraction): results_path=results_path, fast_mode=fast_mode, log=log, results=results) - def serialize(self, record: dict) -> None: + def serialize(self, record: dict) -> dict | list: return { "timestamp": record["isodate"], "module": self.__class__.__name__, diff --git a/mvt/ios/modules/mixed/shortcuts.py b/mvt/ios/modules/mixed/shortcuts.py index dfa5bd0..0f751d8 100644 --- a/mvt/ios/modules/mixed/shortcuts.py +++ b/mvt/ios/modules/mixed/shortcuts.py @@ -33,13 +33,14 @@ class Shortcuts(IOSExtraction): results_path=results_path, fast_mode=fast_mode, log=log, results=results) - def serialize(self, record: dict) -> None: + def serialize(self, record: dict) -> dict | list: found_urls = "" if record["action_urls"]: - found_urls = "- URLs in actions: {}".format(", ".join(record["action_urls"])) + found_urls = f"- URLs in actions: {', '.join(record['action_urls'])}" + desc = "" if record["description"]: - desc = record["description"].decode('utf-8', errors='ignore') + desc = record["description"].decode("utf-8", errors="ignore") return [{ "timestamp": record["isodate"], diff --git a/mvt/ios/modules/mixed/sms.py b/mvt/ios/modules/mixed/sms.py index fd95407..1c82224 100644 --- a/mvt/ios/modules/mixed/sms.py +++ b/mvt/ios/modules/mixed/sms.py @@ -31,7 +31,7 @@ class SMS(IOSExtraction): results_path=results_path, fast_mode=fast_mode, log=log, results=results) - def serialize(self, record: dict) -> None: + def serialize(self, record: dict) -> dict | list: text = record["text"].replace("\n", "\\n") return { "timestamp": record["isodate"], diff --git a/mvt/ios/modules/mixed/sms_attachments.py b/mvt/ios/modules/mixed/sms_attachments.py index a9e475c..be927d3 100644 --- a/mvt/ios/modules/mixed/sms_attachments.py +++ b/mvt/ios/modules/mixed/sms_attachments.py @@ -30,7 +30,7 @@ class SMSAttachments(IOSExtraction): results_path=results_path, fast_mode=fast_mode, log=log, results=results) - def serialize(self, record: dict) -> None: + def serialize(self, record: dict) -> dict | list: return { "timestamp": record["isodate"], "module": self.__class__.__name__, diff --git a/mvt/ios/modules/mixed/tcc.py b/mvt/ios/modules/mixed/tcc.py index c42e505..3cd7b52 100644 --- a/mvt/ios/modules/mixed/tcc.py +++ b/mvt/ios/modules/mixed/tcc.py @@ -56,12 +56,13 @@ class TCC(IOSExtraction): results_path=results_path, fast_mode=fast_mode, log=log, results=results) - def serialize(self, record: dict) -> None: + def serialize(self, record: dict) -> dict | list: if "last_modified" in record: if "allowed_value" in record: msg = f"Access to {record['service']} by {record['client']} {record['allowed_value']}" else: msg = f"Access to {record['service']} by {record['client']} {record['auth_value']}" + return { "timestamp": record["last_modified"], "module": self.__class__.__name__, @@ -69,6 +70,8 @@ class TCC(IOSExtraction): "data": msg } + return {} + def check_indicators(self) -> None: if not self.indicators: return diff --git a/mvt/ios/modules/mixed/webkit_session_resource_log.py b/mvt/ios/modules/mixed/webkit_session_resource_log.py index 9b58ad1..8646234 100644 --- a/mvt/ios/modules/mixed/webkit_session_resource_log.py +++ b/mvt/ios/modules/mixed/webkit_session_resource_log.py @@ -58,7 +58,7 @@ class WebkitSessionResourceLog(IOSExtraction): if not self.indicators: return - for key, entries in self.results.items(): + for _, entries in self.results.items(): for entry in entries: source_domains = self._extract_domains(entry["redirect_source"]) destination_domains = self._extract_domains(entry["redirect_destination"]) diff --git a/mvt/ios/modules/mixed/whatsapp.py b/mvt/ios/modules/mixed/whatsapp.py index 6a67e70..ffbebc6 100644 --- a/mvt/ios/modules/mixed/whatsapp.py +++ b/mvt/ios/modules/mixed/whatsapp.py @@ -30,7 +30,7 @@ class Whatsapp(IOSExtraction): results_path=results_path, fast_mode=fast_mode, log=log, results=results) - def serialize(self, record: dict) -> None: + def serialize(self, record: dict) -> dict | list: text = record.get("ZTEXT", "").replace("\n", "\\n") links_text = "" if record["links"]: diff --git a/mvt/ios/modules/net_base.py b/mvt/ios/modules/net_base.py index 4b0f355..4906c2b 100644 --- a/mvt/ios/modules/net_base.py +++ b/mvt/ios/modules/net_base.py @@ -81,7 +81,7 @@ class NetBase(IOSExtraction): self.log.info("Extracted information on %d processes", len(self.results)) - def serialize(self, record: dict) -> None: + def serialize(self, record: dict) -> dict | list: record_data = f"{record['proc_name']} (Bundle ID: {record['bundle_id']}, ID: {record['proc_id']})" record_data_usage = record_data + f" WIFI IN: {record['wifi_in']}, WIFI OUT: {record['wifi_out']} - " \ f"WWAN IN: {record['wwan_in']}, WWAN OUT: {record['wwan_out']}" @@ -134,7 +134,7 @@ class NetBase(IOSExtraction): except PermissionError: continue - files.append([posix_path.name, posix_path.__str__()]) + files.append([posix_path.name, str(posix_path)]) for proc in self.results: if not proc["bundle_id"]: @@ -150,7 +150,7 @@ class NetBase(IOSExtraction): self.log.debug("Located at %s", binary_path) else: msg = f"Could not find the binary associated with the process with name {proc['proc_name']}" - if (proc["proc_name"] is None): + if not proc["proc_name"]: msg = f"Found process entry with empty 'proc_name': {proc['live_proc_id']} at {proc['live_isodate']}" elif len(proc["proc_name"]) == 16: msg = msg + " (However, the process name might have been truncated in the database)" @@ -209,7 +209,7 @@ class NetBase(IOSExtraction): # Set default DataUsage keys. result = {key: None for key in self.results[0].keys()} result["first_isodate"] = result["isodate"] = result["live_isodate"] = proc["prev_proc_first"] - result["proc_name"] = "MISSING [follows {}]".format(proc["prev_proc_name"]) + result["proc_name"] = f"MISSING [follows {proc['prev_proc_name']}]" result["proc_id"] = result["live_proc_id"] = proc["proc_id"] result["bundle_id"] = None diff --git a/mvt/ios/versions.py b/mvt/ios/versions.py index 81e932b..6329e07 100644 --- a/mvt/ios/versions.py +++ b/mvt/ios/versions.py @@ -247,10 +247,12 @@ IPHONE_IOS_VERSIONS = [ def get_device_desc_from_id(identifier: str, devices_list: list = IPHONE_MODELS) -> str: - for model in IPHONE_MODELS: + for model in devices_list: if identifier == model["identifier"]: return model["description"] + return "" + def find_version_by_build(build: str) -> str: build = build.upper() @@ -258,6 +260,8 @@ def find_version_by_build(build: str) -> str: if build == version["build"]: return version["version"] + return "" + def latest_ios_version() -> str: return IPHONE_IOS_VERSIONS[-1] diff --git a/setup.cfg b/setup.cfg index 9703e75..5575fee 100644 --- a/setup.cfg +++ b/setup.cfg @@ -43,7 +43,7 @@ console_scripts = mvt-android = mvt.android:cli [flake8] -max-complexit = 10 +max-complexity = 10 max-line-length = 1000 ignore = C901, @@ -52,3 +52,34 @@ ignore = E127, W503, E226 + +[pylint] +score = no +reports = no +output-format = colorized + +max-locals = 25 +max-args = 10 + +good-names = i,e,m + +min-similarity-lines=10 +ignore-comments=yes +ignore-docstrings=yes +ignore-imports=yes + +ignored-argument-names=args|kwargs + +# https://pylint.pycqa.org/en/stable/technical_reference/features.html +disable = + too-many-instance-attributes, + broad-except, + abstract-method, + dangerous-default-value, + #duplicate-code, + line-too-long, + too-few-public-methods, + missing-docstring, + missing-module-docstring, + missing-class-docstring, + missing-function-docstring,