From a935347aed954c78a56f753facadf51131ab0d76 Mon Sep 17 00:00:00 2001 From: Nex Date: Fri, 12 Aug 2022 19:14:05 +0200 Subject: [PATCH] Trying to enforce line lengths at 80/100 --- mvt/android/cli.py | 17 +++-- mvt/android/cmd_check_adb.py | 4 +- mvt/android/cmd_check_backup.py | 13 ++-- mvt/android/cmd_check_bugreport.py | 8 ++- mvt/android/cmd_download_apks.py | 3 +- mvt/android/modules/adb/base.py | 57 ++++++++++----- mvt/android/modules/adb/dumpsys_appops.py | 10 +-- .../modules/adb/dumpsys_battery_daily.py | 3 +- .../modules/adb/dumpsys_battery_history.py | 3 +- mvt/android/modules/adb/dumpsys_receivers.py | 15 ++-- mvt/android/modules/adb/getprop.py | 8 ++- mvt/android/modules/adb/sms.py | 22 +++--- mvt/android/modules/adb/whatsapp.py | 14 ++-- mvt/android/modules/backup/sms.py | 10 +-- .../modules/bugreport/accessibility.py | 9 ++- mvt/android/modules/bugreport/activities.py | 3 +- mvt/android/modules/bugreport/appops.py | 12 ++-- .../modules/bugreport/battery_daily.py | 6 +- .../modules/bugreport/battery_history.py | 3 +- mvt/android/modules/bugreport/dbinfo.py | 3 +- mvt/android/modules/bugreport/getprop.py | 11 +-- mvt/android/modules/bugreport/packages.py | 6 +- mvt/android/modules/bugreport/receivers.py | 18 +++-- mvt/android/parsers/backup.py | 43 +++++++---- mvt/android/parsers/dumpsys.py | 9 ++- mvt/common/cmd_check_iocs.py | 4 +- mvt/common/command.py | 22 +++--- mvt/common/indicators.py | 71 ++++++++++++------- mvt/common/logo.py | 12 ++-- mvt/common/module.py | 16 +++-- mvt/common/updates.py | 3 +- mvt/common/virustotal.py | 10 ++- mvt/ios/cmd_check_backup.py | 4 +- mvt/ios/cmd_check_fs.py | 4 +- setup.cfg | 12 ++-- 35 files changed, 303 insertions(+), 165 deletions(-) diff --git a/mvt/android/cli.py b/mvt/android/cli.py index 0adb88a..edd1ba7 100644 --- a/mvt/android/cli.py +++ b/mvt/android/cli.py @@ -58,14 +58,16 @@ def version(): @click.option("--output", "-o", type=click.Path(exists=False), help="Specify a path to a folder where you want to store the APKs") @click.option("--from-file", "-f", type=click.Path(exists=True), - help="Instead of acquiring from phone, load an existing packages.json file for lookups (mainly for debug purposes)") + help="Instead of acquiring from phone, load an existing packages.json file for " + "lookups (mainly for debug purposes)") @click.pass_context def download_apks(ctx, all_apks, virustotal, output, from_file, serial): try: if from_file: download = DownloadAPKs.from_json(from_file) else: - # TODO: Do we actually want to be able to run without storing any file? + # TODO: Do we actually want to be able to run without storing any + # file? if not output: log.critical("You need to specify an output folder with --output!") ctx.exit(1) @@ -130,14 +132,16 @@ def check_adb(ctx, serial, iocs, output, fast, list_modules, module): @cli.command("check-bugreport", help="Check an Android Bug Report") @click.option("--iocs", "-i", type=click.Path(exists=True), multiple=True, default=[], help=HELP_MSG_IOC) -@click.option("--output", "-o", type=click.Path(exists=False), help=HELP_MSG_OUTPUT) +@click.option("--output", "-o", type=click.Path(exists=False), + help=HELP_MSG_OUTPUT) @click.option("--list-modules", "-l", is_flag=True, help=HELP_MSG_LIST_MODULES) @click.option("--module", "-m", help=HELP_MSG_MODULE) @click.argument("BUGREPORT_PATH", type=click.Path(exists=True)) @click.pass_context def check_bugreport(ctx, iocs, output, list_modules, module, bugreport_path): - cmd = CmdAndroidCheckBugreport(target_path=bugreport_path, results_path=output, - ioc_files=iocs, module_name=module) + cmd = CmdAndroidCheckBugreport(target_path=bugreport_path, + results_path=output, ioc_files=iocs, + module_name=module) if list_modules: cmd.list_modules() @@ -158,7 +162,8 @@ def check_bugreport(ctx, iocs, output, list_modules, module, bugreport_path): @cli.command("check-backup", help="Check an Android Backup") @click.option("--iocs", "-i", type=click.Path(exists=True), multiple=True, default=[], help=HELP_MSG_IOC) -@click.option("--output", "-o", type=click.Path(exists=False), help=HELP_MSG_OUTPUT) +@click.option("--output", "-o", type=click.Path(exists=False), + help=HELP_MSG_OUTPUT) @click.option("--list-modules", "-l", is_flag=True, help=HELP_MSG_LIST_MODULES) @click.argument("BACKUP_PATH", type=click.Path(exists=True)) @click.pass_context diff --git a/mvt/android/cmd_check_adb.py b/mvt/android/cmd_check_adb.py index 8527fc8..3b6003a 100644 --- a/mvt/android/cmd_check_adb.py +++ b/mvt/android/cmd_check_adb.py @@ -15,8 +15,8 @@ log = logging.getLogger(__name__) class CmdAndroidCheckADB(Command): def __init__(self, target_path: str = None, results_path: str = None, - ioc_files: list = [], module_name: str = None, serial: str = None, - fast_mode: bool = False): + ioc_files: list = [], module_name: str = None, + serial: str = None, fast_mode: bool = False): super().__init__(target_path=target_path, results_path=results_path, ioc_files=ioc_files, module_name=module_name, serial=serial, fast_mode=fast_mode, log=log) diff --git a/mvt/android/cmd_check_backup.py b/mvt/android/cmd_check_backup.py index 039008f..c0a200b 100644 --- a/mvt/android/cmd_check_backup.py +++ b/mvt/android/cmd_check_backup.py @@ -26,8 +26,8 @@ log = logging.getLogger(__name__) class CmdAndroidCheckBackup(Command): def __init__(self, target_path: str = None, results_path: str = None, - ioc_files: list = [], module_name: str = None, serial: str = None, - fast_mode: bool = False): + ioc_files: list = [], module_name: str = None, + serial: str = None, fast_mode: bool = False): super().__init__(target_path=target_path, results_path=results_path, ioc_files=ioc_files, module_name=module_name, serial=serial, fast_mode=fast_mode, log=log) @@ -73,13 +73,16 @@ class CmdAndroidCheckBackup(Command): self.target_path = Path(self.target_path).absolute().as_posix() for root, subdirs, subfiles in os.walk(os.path.abspath(self.target_path)): for fname in subfiles: - self.backup_files.append(os.path.relpath(os.path.join(root, fname), self.target_path)) + self.backup_files.append(os.path.relpath(os.path.join(root, fname), + self.target_path)) else: - log.critical("Invalid backup path, path should be a folder or an Android Backup (.ab) file") + log.critical("Invalid backup path, path should be a folder or an " + "Android Backup (.ab) file") sys.exit(1) def module_init(self, module: Callable) -> None: if self.backup_type == "folder": module.from_folder(self.target_path, self.backup_files) else: - module.from_ab(self.target_path, self.backup_archive, self.backup_files) + module.from_ab(self.target_path, self.backup_archive, + self.backup_files) diff --git a/mvt/android/cmd_check_bugreport.py b/mvt/android/cmd_check_bugreport.py index 52cf99f..eb266bb 100644 --- a/mvt/android/cmd_check_bugreport.py +++ b/mvt/android/cmd_check_bugreport.py @@ -19,8 +19,8 @@ log = logging.getLogger(__name__) class CmdAndroidCheckBugreport(Command): def __init__(self, target_path: str = None, results_path: str = None, - ioc_files: list = [], module_name: str = None, serial: str = None, - fast_mode: bool = False): + ioc_files: list = [], module_name: str = None, + serial: str = None, fast_mode: bool = False): super().__init__(target_path=target_path, results_path=results_path, ioc_files=ioc_files, module_name=module_name, serial=serial, fast_mode=fast_mode, log=log) @@ -43,7 +43,9 @@ class CmdAndroidCheckBugreport(Command): parent_path = Path(self.target_path).absolute().as_posix() for root, _, subfiles in os.walk(os.path.abspath(self.target_path)): for file_name in subfiles: - self.bugreport_files.append(os.path.relpath(os.path.join(root, file_name), parent_path)) + file_path = os.path.relpath(os.path.join(root, file_name), + parent_path) + self.bugreport_files.append(file_path) def module_init(self, module: Callable) -> None: if self.bugreport_format == "zip": diff --git a/mvt/android/cmd_download_apks.py b/mvt/android/cmd_download_apks.py index 64e3eb8..3c20c36 100644 --- a/mvt/android/cmd_download_apks.py +++ b/mvt/android/cmd_download_apks.py @@ -78,7 +78,8 @@ class DownloadAPKs(AndroidExtraction): try: self._adb_download(remote_path, local_path) except InsufficientPrivileges: - log.error("Unable to pull package file from %s: insufficient privileges, it might be a system app", + log.error("Unable to pull package file from %s: insufficient " + "privileges, it might be a system app", remote_path) self._adb_reconnect() return None diff --git a/mvt/android/modules/adb/base.py b/mvt/android/modules/adb/base.py index 6804e6c..5721d36 100644 --- a/mvt/android/modules/adb/base.py +++ b/mvt/android/modules/adb/base.py @@ -73,13 +73,15 @@ class AndroidExtraction(MVTModule): try: self.device = AdbDeviceUsb(serial=self.serial) except UsbDeviceNotFoundError: - self.log.critical("No device found. Make sure it is connected and unlocked.") + self.log.critical("No device found. Make sure it is connected " + "and unlocked.") sys.exit(-1) # Otherwise we try to use the TCP transport. else: addr = self.serial.split(":") if len(addr) < 2: - raise ValueError("TCP serial number must follow the format: `address:port`") + raise ValueError("TCP serial number must follow the format: " + "`address:port`") self.device = AdbDeviceTcp(addr[0], int(addr[1]), default_transport_timeout_s=30.) @@ -88,17 +90,21 @@ class AndroidExtraction(MVTModule): try: self.device.connect(rsa_keys=[signer], auth_timeout_s=5) except (USBErrorBusy, USBErrorAccess): - self.log.critical("Device is busy, maybe run `adb kill-server` and try again.") + self.log.critical("Device is busy, maybe run `adb kill-server` " + "and try again.") sys.exit(-1) except DeviceAuthError: - self.log.error("You need to authorize this computer on the Android device. Retrying in 5 seconds...") + self.log.error("You need to authorize this computer on the " + "Android device. Retrying in 5 seconds...") time.sleep(5) except UsbReadFailedError: - self.log.error("Unable to connect to the device over USB. Try to unplug, plug the device and start again.") + self.log.error("Unable to connect to the device over USB. " + "Try to unplug, plug the device and start again.") sys.exit(-1) except OSError as e: if e.errno == 113 and self.serial: - self.log.critical("Unable to connect to the device %s: did you specify the correct IP addres?", + self.log.critical("Unable to connect to the device %s: " + "did you specify the correct IP addres?", self.serial) sys.exit(-1) else: @@ -135,7 +141,9 @@ class AndroidExtraction(MVTModule): def _adb_root_or_die(self) -> None: """Check if we have a `su` binary, otherwise raise an Exception.""" if not self._adb_check_if_root(): - raise InsufficientPrivileges("This module is optionally available in case the device is already rooted. Do NOT root your own device!") + raise InsufficientPrivileges("This module is optionally available " + "in case the device is already rooted." + " Do NOT root your own device!") def _adb_command_as_root(self, command): """Execute an adb shell command. @@ -170,7 +178,8 @@ class AndroidExtraction(MVTModule): :param remote_path: Path to download from the device :param local_path: Path to where to locally store the copy of the file - :param progress_callback: Callback for download progress bar (Default value = None) + :param progress_callback: Callback for download progress bar + (Default value = None) :param retry_root: Default value = True) """ @@ -178,7 +187,8 @@ class AndroidExtraction(MVTModule): self.device.pull(remote_path, local_path, progress_callback) except AdbCommandFailureException as e: if retry_root: - self._adb_download_root(remote_path, local_path, progress_callback) + self._adb_download_root(remote_path, local_path, + progress_callback) else: raise Exception(f"Unable to download file {remote_path}: {e}") @@ -189,7 +199,10 @@ class AndroidExtraction(MVTModule): self._adb_root_or_die() # We generate a random temporary filename. - tmp_filename = "tmp_" + ''.join(random.choices(string.ascii_uppercase + string.ascii_lowercase + string.digits, k=10)) + allowed_chars = (string.ascii_uppercase + + string.ascii_lowercase + + string.digits) + tmp_filename = "tmp_" + ''.join(random.choices(allowed_chars, k=10)) # We create a temporary local file. new_remote_path = f"/sdcard/{tmp_filename}" @@ -203,7 +216,8 @@ class AndroidExtraction(MVTModule): # We download from /sdcard/ to the local temporary file. # If it doesn't work now, don't try again (retry_root=False) - self._adb_download(new_remote_path, local_path, progress_callback, retry_root=False) + self._adb_download(new_remote_path, local_path, progress_callback, + retry_root=False) # Delete the copy on /sdcard/. self._adb_command(f"rm -rf {new_remote_path}") @@ -253,27 +267,34 @@ class AndroidExtraction(MVTModule): self._adb_disconnect() def _generate_backup(self, package_name: str) -> bytes: - self.log.warning("Please check phone and accept Android backup prompt. You may need to set a backup password. \a") + self.log.warning("Please check phone and accept Android backup prompt. " + "You may need to set a backup password. \a") - # TODO: Base64 encoding as temporary fix to avoid byte-mangling over the shell transport... - backup_output_b64 = self._adb_command(f"/system/bin/bu backup -nocompress '{package_name}' | base64") + # TODO: Base64 encoding as temporary fix to avoid byte-mangling over + # the shell transport... + cmd = f"/system/bin/bu backup -nocompress '{package_name}' | base64" + backup_output_b64 = self._adb_command(cmd) backup_output = base64.b64decode(backup_output_b64) header = parse_ab_header(backup_output) if not header["backup"]: - self.log.error("Extracting SMS via Android backup failed. No valid backup data found.") + self.log.error("Extracting SMS via Android backup failed. " + "No valid backup data found.") return None if header["encryption"] == "none": return parse_backup_file(backup_output, password=None) for _ in range(0, 3): - backup_password = Prompt.ask("Enter backup password", password=True) + backup_password = Prompt.ask("Enter backup password", + password=True) try: - decrypted_backup_tar = parse_backup_file(backup_output, backup_password) + decrypted_backup_tar = parse_backup_file(backup_output, + backup_password) return decrypted_backup_tar except InvalidBackupPassword: - self.log.error("You provided the wrong password! Please try again...") + self.log.error("You provided the wrong password! " + "Please try again...") self.log.warn("All attempts to decrypt backup with password failed!") diff --git a/mvt/android/modules/adb/dumpsys_appops.py b/mvt/android/modules/adb/dumpsys_appops.py index e8b8b17..88a576c 100644 --- a/mvt/android/modules/adb/dumpsys_appops.py +++ b/mvt/android/modules/adb/dumpsys_appops.py @@ -36,7 +36,8 @@ class DumpsysAppOps(AndroidExtraction): "timestamp": entry["timestamp"], "module": self.__class__.__name__, "event": entry["access"], - "data": f"{record['package_name']} access to {perm['name']}: {entry['access']}", + "data": f"{record['package_name']} access to " + f"{perm['name']}: {entry['access']}", }) return records @@ -51,9 +52,10 @@ class DumpsysAppOps(AndroidExtraction): continue for perm in result["permissions"]: - if perm["name"] == "REQUEST_INSTALL_PACKAGES" and perm["access"] == "allow": - self.log.info("Package %s with REQUEST_INSTALL_PACKAGES permission", - result["package_name"]) + if (perm["name"] == "REQUEST_INSTALL_PACKAGES" + and perm["access"] == "allow"): + self.log.info("Package %s with REQUEST_INSTALL_PACKAGES " + "permission", result["package_name"]) def run(self) -> None: self._adb_connect() diff --git a/mvt/android/modules/adb/dumpsys_battery_daily.py b/mvt/android/modules/adb/dumpsys_battery_daily.py index 55d28d3..3eacb92 100644 --- a/mvt/android/modules/adb/dumpsys_battery_daily.py +++ b/mvt/android/modules/adb/dumpsys_battery_daily.py @@ -27,7 +27,8 @@ class DumpsysBatteryDaily(AndroidExtraction): "timestamp": record["from"], "module": self.__class__.__name__, "event": "battery_daily", - "data": f"Recorded update of package {record['package_name']} with vers {record['vers']}" + "data": f"Recorded update of package {record['package_name']} " + f"with vers {record['vers']}" } def check_indicators(self) -> None: diff --git a/mvt/android/modules/adb/dumpsys_battery_history.py b/mvt/android/modules/adb/dumpsys_battery_history.py index 459a4bf..c849ac9 100644 --- a/mvt/android/modules/adb/dumpsys_battery_history.py +++ b/mvt/android/modules/adb/dumpsys_battery_history.py @@ -39,4 +39,5 @@ class DumpsysBatteryHistory(AndroidExtraction): self.results = parse_dumpsys_battery_history(output) - self.log.info("Extracted %d records from battery history", len(self.results)) + self.log.info("Extracted %d records from battery history", + len(self.results)) diff --git a/mvt/android/modules/adb/dumpsys_receivers.py b/mvt/android/modules/adb/dumpsys_receivers.py index 17b51f5..f4e426a 100644 --- a/mvt/android/modules/adb/dumpsys_receivers.py +++ b/mvt/android/modules/adb/dumpsys_receivers.py @@ -36,19 +36,24 @@ class DumpsysReceivers(AndroidExtraction): for intent, receivers in self.results.items(): for receiver in receivers: if intent == INTENT_NEW_OUTGOING_SMS: - self.log.info("Found a receiver to intercept outgoing SMS messages: \"%s\"", + self.log.info("Found a receiver to intercept " + "outgoing SMS messages: \"%s\"", receiver["receiver"]) elif intent == INTENT_SMS_RECEIVED: - self.log.info("Found a receiver to intercept incoming SMS messages: \"%s\"", + self.log.info("Found a receiver to intercept " + "incoming SMS messages: \"%s\"", receiver["receiver"]) elif intent == INTENT_DATA_SMS_RECEIVED: - self.log.info("Found a receiver to intercept incoming data SMS message: \"%s\"", + self.log.info("Found a receiver to intercept " + "incoming data SMS message: \"%s\"", receiver["receiver"]) elif intent == INTENT_PHONE_STATE: - self.log.info("Found a receiver monitoring telephony state/incoming calls: \"%s\"", + self.log.info("Found a receiver monitoring " + "telephony state/incoming calls: \"%s\"", receiver["receiver"]) elif intent == INTENT_NEW_OUTGOING_CALL: - self.log.info("Found a receiver monitoring outgoing calls: \"%s\"", + self.log.info("Found a receiver monitoring " + "outgoing calls: \"%s\"", receiver["receiver"]) ioc = self.indicators.check_app_id(receiver["package_name"]) diff --git a/mvt/android/modules/adb/getprop.py b/mvt/android/modules/adb/getprop.py index 61a0c47..d4bc1de 100644 --- a/mvt/android/modules/adb/getprop.py +++ b/mvt/android/modules/adb/getprop.py @@ -36,7 +36,9 @@ class Getprop(AndroidExtraction): if security_patch: patch_date = datetime.strptime(security_patch, "%Y-%m-%d") if (datetime.now() - patch_date) > timedelta(days=6*30): - self.log.warning("This phone has not received security updates for more than " - "six months (last update: %s)", security_patch) + self.log.warning("This phone has not received security updates " + "for more than six months (last update: %s)", + security_patch) - self.log.info("Extracted %d Android system properties", len(self.results)) + self.log.info("Extracted %d Android system properties", + len(self.results)) diff --git a/mvt/android/modules/adb/sms.py b/mvt/android/modules/adb/sms.py index 4f3f93c..59e757d 100644 --- a/mvt/android/modules/adb/sms.py +++ b/mvt/android/modules/adb/sms.py @@ -109,11 +109,12 @@ class SMS(AndroidExtraction): cur.close() conn.close() - self.log.info("Extracted a total of %d SMS messages containing links", len(self.results)) + self.log.info("Extracted a total of %d SMS messages containing links", + len(self.results)) def _extract_sms_adb(self) -> None: - """Use the Android backup command to extract SMS data from the native SMS - app. + """Use the Android backup command to extract SMS data from the native + SMS app. It is crucial to use the under-documented "-nocompress" flag to disable the non-standard Java compression algorithm. This module only supports @@ -126,8 +127,9 @@ class SMS(AndroidExtraction): try: self.results = parse_tar_for_sms(backup_tar) except AndroidBackupParsingError: - self.log.info("Impossible to read SMS from the Android Backup, please extract " - "the SMS and try extracting it with Android Backup Extractor") + self.log.info("Impossible to read SMS from the Android Backup, " + "please extract the SMS and try extracting it with " + "Android Backup Extractor") return self.log.info("Extracted a total of %d SMS messages containing links", @@ -137,14 +139,16 @@ class SMS(AndroidExtraction): try: if (self._adb_check_file_exists(os.path.join("/", SMS_BUGLE_PATH))): self.sms_db_type = 1 - self._adb_process_file(os.path.join("/", SMS_BUGLE_PATH), self._parse_db) + self._adb_process_file(os.path.join("/", SMS_BUGLE_PATH), + self._parse_db) elif (self._adb_check_file_exists(os.path.join("/", SMS_MMSSMS_PATH))): self.sms_db_type = 2 - self._adb_process_file(os.path.join("/", SMS_MMSSMS_PATH), self._parse_db) + self._adb_process_file(os.path.join("/", SMS_MMSSMS_PATH), + self._parse_db) return except InsufficientPrivileges: pass - self.log.warn("No SMS database found. Trying extraction of SMS data using " - "Android backup feature.") + self.log.warn("No SMS database found. Trying extraction of SMS data " + "using Android backup feature.") self._extract_sms_adb() diff --git a/mvt/android/modules/adb/whatsapp.py b/mvt/android/modules/adb/whatsapp.py index a715749..ec8e5ee 100644 --- a/mvt/android/modules/adb/whatsapp.py +++ b/mvt/android/modules/adb/whatsapp.py @@ -73,22 +73,24 @@ class Whatsapp(AndroidExtraction): message["direction"] = ("send" if message["key_from_me"] == 1 else "received") message["isodate"] = convert_timestamp_to_iso(message["timestamp"]) - # If we find links in the messages or if they are empty we add them to the list. + # If we find links in the messages or if they are empty we add them + # to the list. if check_for_links(message["data"]) or message["data"].strip() == "": - if message.get('thumb_image'): - message['thumb_image'] = base64.b64encode(message['thumb_image']) + if message.get("thumb_image"): + message["thumb_image"] = base64.b64encode(message["thumb_image"]) messages.append(message) cur.close() conn.close() - self.log.info("Extracted a total of %d WhatsApp messages containing links", - len(messages)) + self.log.info("Extracted a total of %d WhatsApp messages " + "containing links", len(messages)) self.results = messages def run(self) -> None: try: - self._adb_process_file(os.path.join("/", WHATSAPP_PATH), self._parse_db) + self._adb_process_file(os.path.join("/", WHATSAPP_PATH), + self._parse_db) except Exception as e: self.log.error(e) diff --git a/mvt/android/modules/backup/sms.py b/mvt/android/modules/backup/sms.py index a8b03f5..5cc8aca 100644 --- a/mvt/android/modules/backup/sms.py +++ b/mvt/android/modules/backup/sms.py @@ -31,15 +31,17 @@ class SMS(BackupExtraction): self.detected.append(message) def run(self) -> None: - for file in self._get_files_by_pattern("apps/com.android.providers.telephony/d_f/*_sms_backup"): + sms_path = "apps/com.android.providers.telephony/d_f/*_sms_backup" + for file in self._get_files_by_pattern(sms_path): self.log.info("Processing SMS backup file at %s", file) data = self._get_file_content(file) self.results.extend(parse_sms_file(data)) - for file in self._get_files_by_pattern("apps/com.android.providers.telephony/d_f/*_mms_backup"): + mms_path = "apps/com.android.providers.telephony/d_f/*_mms_backup" + for file in self._get_files_by_pattern(mms_path): self.log.info("Processing MMS backup file at %s", file) data = self._get_file_content(file) self.results.extend(parse_sms_file(data)) - self.log.info("Extracted a total of %d SMS & MMS messages containing links", - len(self.results)) + self.log.info("Extracted a total of %d SMS & MMS messages " + "containing links", len(self.results)) diff --git a/mvt/android/modules/bugreport/accessibility.py b/mvt/android/modules/bugreport/accessibility.py index 5697c06..7171e36 100644 --- a/mvt/android/modules/bugreport/accessibility.py +++ b/mvt/android/modules/bugreport/accessibility.py @@ -35,7 +35,8 @@ class Accessibility(BugReportModule): def run(self) -> None: content = self._get_dumpstate_file() if not content: - self.log.error("Unable to find dumpstate file. Did you provide a valid bug report archive?") + self.log.error("Unable to find dumpstate file. Did you provide a " + "valid bug report archive?") return lines = [] @@ -55,6 +56,8 @@ class Accessibility(BugReportModule): self.results = parse_dumpsys_accessibility("\n".join(lines)) for result in self.results: - self.log.info("Found installed accessibility service \"%s\"", result.get("service")) + self.log.info("Found installed accessibility service \"%s\"", + result.get("service")) - self.log.info("Identified a total of %d accessibility services", len(self.results)) + self.log.info("Identified a total of %d accessibility services", + len(self.results)) diff --git a/mvt/android/modules/bugreport/activities.py b/mvt/android/modules/bugreport/activities.py index 3fec673..de0c730 100644 --- a/mvt/android/modules/bugreport/activities.py +++ b/mvt/android/modules/bugreport/activities.py @@ -38,7 +38,8 @@ class Activities(BugReportModule): def run(self) -> None: content = self._get_dumpstate_file() if not content: - self.log.error("Unable to find dumpstate file. Did you provide a valid bug report archive?") + self.log.error("Unable to find dumpstate file. Did you provide a " + "valid bug report archive?") return lines = [] diff --git a/mvt/android/modules/bugreport/appops.py b/mvt/android/modules/bugreport/appops.py index e9d8edf..df411df 100644 --- a/mvt/android/modules/bugreport/appops.py +++ b/mvt/android/modules/bugreport/appops.py @@ -34,7 +34,8 @@ class Appops(BugReportModule): "timestamp": entry["timestamp"], "module": self.__class__.__name__, "event": entry["access"], - "data": f"{record['package_name']} access to {perm['name']}: {entry['access']}", + "data": f"{record['package_name']} access to " + f"{perm['name']}: {entry['access']}", }) return records @@ -49,13 +50,16 @@ class Appops(BugReportModule): continue for perm in result["permissions"]: - if perm["name"] == "REQUEST_INSTALL_PACKAGES" and perm["access"] == "allow": - self.log.info("Package %s with REQUEST_INSTALL_PACKAGES permission", result["package_name"]) + if (perm["name"] == "REQUEST_INSTALL_PACKAGES" + and perm["access"] == "allow"): + self.log.info("Package %s with REQUEST_INSTALL_PACKAGES permission", + result["package_name"]) def run(self) -> None: content = self._get_dumpstate_file() if not content: - self.log.error("Unable to find dumpstate file. Did you provide a valid bug report archive?") + self.log.error("Unable to find dumpstate file. Did you provide a " + "valid bug report archive?") return lines = [] diff --git a/mvt/android/modules/bugreport/battery_daily.py b/mvt/android/modules/bugreport/battery_daily.py index c22a2fd..173cb16 100644 --- a/mvt/android/modules/bugreport/battery_daily.py +++ b/mvt/android/modules/bugreport/battery_daily.py @@ -27,7 +27,8 @@ class BatteryDaily(BugReportModule): "timestamp": record["from"], "module": self.__class__.__name__, "event": "battery_daily", - "data": f"Recorded update of package {record['package_name']} with vers {record['vers']}" + "data": f"Recorded update of package {record['package_name']} " + f"with vers {record['vers']}" } def check_indicators(self) -> None: @@ -44,7 +45,8 @@ class BatteryDaily(BugReportModule): def run(self) -> None: content = self._get_dumpstate_file() if not content: - self.log.error("Unable to find dumpstate file. Did you provide a valid bug report archive?") + self.log.error("Unable to find dumpstate file. Did you provide a " + "valid bug report archive?") return lines = [] diff --git a/mvt/android/modules/bugreport/battery_history.py b/mvt/android/modules/bugreport/battery_history.py index e431e1d..ad243d9 100644 --- a/mvt/android/modules/bugreport/battery_history.py +++ b/mvt/android/modules/bugreport/battery_history.py @@ -35,7 +35,8 @@ class BatteryHistory(BugReportModule): def run(self) -> None: content = self._get_dumpstate_file() if not content: - self.log.error("Unable to find dumpstate file. Did you provide a valid bug report archive?") + self.log.error("Unable to find dumpstate file. Did you provide " + "a valid bug report archive?") return lines = [] diff --git a/mvt/android/modules/bugreport/dbinfo.py b/mvt/android/modules/bugreport/dbinfo.py index e15a83f..0047926 100644 --- a/mvt/android/modules/bugreport/dbinfo.py +++ b/mvt/android/modules/bugreport/dbinfo.py @@ -39,7 +39,8 @@ class DBInfo(BugReportModule): def run(self) -> None: content = self._get_dumpstate_file() if not content: - self.log.error("Unable to find dumpstate file. Did you provide a valid bug report archive?") + self.log.error("Unable to find dumpstate file. Did you provide a " + "valid bug report archive?") return in_dbinfo = False diff --git a/mvt/android/modules/bugreport/getprop.py b/mvt/android/modules/bugreport/getprop.py index f08b069..92f65e3 100644 --- a/mvt/android/modules/bugreport/getprop.py +++ b/mvt/android/modules/bugreport/getprop.py @@ -27,7 +27,8 @@ class Getprop(BugReportModule): def run(self) -> None: content = self._get_dumpstate_file() if not content: - self.log.error("Unable to find dumpstate file. Did you provide a valid bug report archive?") + self.log.error("Unable to find dumpstate file. Did you provide a " + "valid bug report archive?") return lines = [] @@ -52,7 +53,9 @@ class Getprop(BugReportModule): if security_patch: patch_date = datetime.strptime(security_patch, "%Y-%m-%d") if (datetime.now() - patch_date) > timedelta(days=6*30): - self.log.warning("This phone has not received security updates for more than " - "six months (last update: %s)", security_patch) + self.log.warning("This phone has not received security updates " + "for more than six months (last update: %s)", + security_patch) - self.log.info("Extracted %d Android system properties", len(self.results)) + self.log.info("Extracted %d Android system properties", + len(self.results)) diff --git a/mvt/android/modules/bugreport/packages.py b/mvt/android/modules/bugreport/packages.py index 3299a50..2e2267e 100644 --- a/mvt/android/modules/bugreport/packages.py +++ b/mvt/android/modules/bugreport/packages.py @@ -47,7 +47,8 @@ class Packages(BugReportModule): def check_indicators(self) -> None: for result in self.results: if result["package_name"] in ROOT_PACKAGES: - self.log.warning("Found an installed package related to rooting/jailbreaking: \"%s\"", + self.log.warning("Found an installed package related to " + "rooting/jailbreaking: \"%s\"", result["package_name"]) self.detected.append(result) continue @@ -147,7 +148,8 @@ class Packages(BugReportModule): def run(self) -> None: content = self._get_dumpstate_file() if not content: - self.log.error("Unable to find dumpstate file. Did you provide a valid bug report archive?") + self.log.error("Unable to find dumpstate file. Did you provide a " + "valid bug report archive?") return in_package = False diff --git a/mvt/android/modules/bugreport/receivers.py b/mvt/android/modules/bugreport/receivers.py index 360f2b8..034911b 100644 --- a/mvt/android/modules/bugreport/receivers.py +++ b/mvt/android/modules/bugreport/receivers.py @@ -36,19 +36,24 @@ class Receivers(BugReportModule): for intent, receivers in self.results.items(): for receiver in receivers: if intent == INTENT_NEW_OUTGOING_SMS: - self.log.info("Found a receiver to intercept outgoing SMS messages: \"%s\"", + self.log.info("Found a receiver to intercept " + "outgoing SMS messages: \"%s\"", receiver["receiver"]) elif intent == INTENT_SMS_RECEIVED: - self.log.info("Found a receiver to intercept incoming SMS messages: \"%s\"", + self.log.info("Found a receiver to intercept " + "incoming SMS messages: \"%s\"", receiver["receiver"]) elif intent == INTENT_DATA_SMS_RECEIVED: - self.log.info("Found a receiver to intercept incoming data SMS message: \"%s\"", + self.log.info("Found a receiver to intercept " + "incoming data SMS message: \"%s\"", receiver["receiver"]) elif intent == INTENT_PHONE_STATE: - self.log.info("Found a receiver monitoring telephony state/incoming calls: \"%s\"", + self.log.info("Found a receiver monitoring " + "telephony state/incoming calls: \"%s\"", receiver["receiver"]) elif intent == INTENT_NEW_OUTGOING_CALL: - self.log.info("Found a receiver monitoring outgoing calls: \"%s\"", + self.log.info("Found a receiver monitoring " + "outgoing calls: \"%s\"", receiver["receiver"]) ioc = self.indicators.check_app_id(receiver["package_name"]) @@ -60,7 +65,8 @@ class Receivers(BugReportModule): def run(self) -> None: content = self._get_dumpstate_file() if not content: - self.log.error("Unable to find dumpstate file. Did you provide a valid bug report archive?") + self.log.error("Unable to find dumpstate file. Did you provide a " + "valid bug report archive?") return in_receivers = False diff --git a/mvt/android/parsers/backup.py b/mvt/android/parsers/backup.py index 1d38b32..a1f65fe 100644 --- a/mvt/android/parsers/backup.py +++ b/mvt/android/parsers/backup.py @@ -30,6 +30,8 @@ class InvalidBackupPassword(AndroidBackupParsingError): pass +# TODO: Need to clean all the following code and conform it to the coding style. + def to_utf8_bytes(input_bytes): output = [] for byte in input_bytes: @@ -65,13 +67,15 @@ def parse_ab_header(data): } -def decrypt_master_key(password, user_salt, user_iv, pbkdf2_rounds, master_key_blob, format_version, checksum_salt): +def decrypt_master_key(password, user_salt, user_iv, pbkdf2_rounds, + master_key_blob, format_version, checksum_salt): """Generate AES key from user password uisng PBKDF2 The backup master key is extracted from the master key blog after decryption. """ # Derive key from password using PBKDF2. - kdf = PBKDF2HMAC(algorithm=hashes.SHA1(), length=32, salt=user_salt, iterations=pbkdf2_rounds) + kdf = PBKDF2HMAC(algorithm=hashes.SHA1(), length=32, salt=user_salt, + iterations=pbkdf2_rounds) key = kdf.derive(password.encode("utf-8")) # Decrypt master key blob. @@ -100,7 +104,8 @@ def decrypt_master_key(password, user_salt, user_iv, pbkdf2_rounds, master_key_b hmac_mk = master_key # Derive checksum to confirm successful backup decryption. - kdf = PBKDF2HMAC(algorithm=hashes.SHA1(), length=32, salt=checksum_salt, iterations=pbkdf2_rounds) + kdf = PBKDF2HMAC(algorithm=hashes.SHA1(), length=32, salt=checksum_salt, + iterations=pbkdf2_rounds) calculated_checksum = kdf.derive(hmac_mk) if master_key_checksum != calculated_checksum: @@ -109,7 +114,8 @@ def decrypt_master_key(password, user_salt, user_iv, pbkdf2_rounds, master_key_b return master_key, master_iv -def decrypt_backup_data(encrypted_backup, password, encryption_algo, format_version): +def decrypt_backup_data(encrypted_backup, password, encryption_algo, + format_version): """ Generate encryption keyffrom password and do decryption @@ -120,7 +126,9 @@ def decrypt_backup_data(encrypted_backup, password, encryption_algo, format_vers if password is None: raise InvalidBackupPassword() - [user_salt, checksum_salt, pbkdf2_rounds, user_iv, master_key_blob, encrypted_data] = encrypted_backup.split(b"\n", 5) + [user_salt, checksum_salt, pbkdf2_rounds, user_iv, + master_key_blob, encrypted_data] = encrypted_backup.split(b"\n", 5) + user_salt = bytes.fromhex(user_salt.decode("utf-8")) checksum_salt = bytes.fromhex(checksum_salt.decode("utf-8")) pbkdf2_rounds = int(pbkdf2_rounds) @@ -128,9 +136,13 @@ def decrypt_backup_data(encrypted_backup, password, encryption_algo, format_vers master_key_blob = bytes.fromhex(master_key_blob.decode("utf-8")) # Derive decryption master key from password. - master_key, master_iv = decrypt_master_key(password=password, user_salt=user_salt, user_iv=user_iv, - pbkdf2_rounds=pbkdf2_rounds, master_key_blob=master_key_blob, - format_version=format_version, checksum_salt=checksum_salt) + master_key, master_iv = decrypt_master_key(password=password, + user_salt=user_salt, + user_iv=user_iv, + pbkdf2_rounds=pbkdf2_rounds, + master_key_blob=master_key_blob, + format_version=format_version, + checksum_salt=checksum_salt) # Decrypt and unpad backup data using derivied key. cipher = Cipher(algorithms.AES(master_key), modes.CBC(master_iv)) @@ -149,12 +161,15 @@ def parse_backup_file(data, password=None): if not data.startswith(b"ANDROID BACKUP"): raise AndroidBackupParsingError("Invalid file header") - [_, version, is_compressed, encryption_algo, tar_data] = data.split(b"\n", 4) + [_, version, is_compressed, + encryption_algo, tar_data] = data.split(b"\n", 4) + version = int(version) is_compressed = int(is_compressed) if encryption_algo != b"none": - tar_data = decrypt_backup_data(tar_data, password, encryption_algo, format_version=version) + tar_data = decrypt_backup_data(tar_data, password, encryption_algo, + format_version=version) if is_compressed: try: @@ -175,8 +190,9 @@ def parse_tar_for_sms(data): res = [] with tarfile.open(fileobj=dbytes) as tar: for member in tar.getmembers(): - if member.name.startswith("apps/com.android.providers.telephony/d_f/") and \ - (member.name.endswith("_sms_backup") or member.name.endswith("_mms_backup")): + if (member.name.startswith("apps/com.android.providers.telephony/d_f/") + and (member.name.endswith("_sms_backup") + or member.name.endswith("_mms_backup"))): dhandler = tar.extractfile(member) res.extend(parse_sms_file(dhandler.read())) @@ -204,7 +220,8 @@ def parse_sms_file(data): entry["isodate"] = convert_timestamp_to_iso(utc_timestamp) entry["direction"] = ("sent" if int(entry["date_sent"]) else "received") - # If we find links in the messages or if they are empty we add them to the list. + # If we find links in the messages or if they are empty we add them to + # the list. if message_links or entry["body"].strip() == "": entry["links"] = message_links res.append(entry) diff --git a/mvt/android/parsers/dumpsys.py b/mvt/android/parsers/dumpsys.py index 6e16237..4e5dd5e 100644 --- a/mvt/android/parsers/dumpsys.py +++ b/mvt/android/parsers/dumpsys.py @@ -61,7 +61,8 @@ def parse_dumpsys_activity_resolver_table(output: str) -> dict: break # We detect the action name. - if line.startswith(" " * 6) and not line.startswith(" " * 8) and ":" in line: + if (line.startswith(" " * 6) and not line.startswith(" " * 8) + and ":" in line): intent = line.strip().replace(":", "") results[intent] = [] continue @@ -117,7 +118,8 @@ def parse_dumpsys_battery_daily(output: str) -> list: already_seen = False for update in daily_updates: - if package_name == update["package_name"] and vers_nr == update["vers"]: + if (package_name == update["package_name"] + and vers_nr == update["vers"]): already_seen = True break @@ -261,7 +263,8 @@ def parse_dumpsys_receiver_resolver_table(output: str) -> dict: break # We detect the action name. - if line.startswith(" " * 6) and not line.startswith(" " * 8) and ":" in line: + if (line.startswith(" " * 6) and not line.startswith(" " * 8) + and ":" in line): intent = line.strip().replace(":", "") results[intent] = [] continue diff --git a/mvt/common/cmd_check_iocs.py b/mvt/common/cmd_check_iocs.py index 5718253..f8d44a4 100644 --- a/mvt/common/cmd_check_iocs.py +++ b/mvt/common/cmd_check_iocs.py @@ -14,8 +14,8 @@ log = logging.getLogger(__name__) class CmdCheckIOCS(Command): def __init__(self, target_path: str = None, results_path: str = None, - ioc_files: list = [], module_name: str = None, serial: str = None, - fast_mode: bool = False): + ioc_files: list = [], module_name: str = None, + serial: str = None, fast_mode: bool = False): super().__init__(target_path=target_path, results_path=results_path, ioc_files=ioc_files, module_name=module_name, serial=serial, fast_mode=fast_mode, log=log) diff --git a/mvt/common/command.py b/mvt/common/command.py index 40f6865..518ab46 100644 --- a/mvt/common/command.py +++ b/mvt/common/command.py @@ -20,8 +20,8 @@ from mvt.common.version import MVT_VERSION class Command: def __init__(self, target_path: str = None, results_path: str = None, - ioc_files: list = [], module_name: str = None, serial: str = None, - fast_mode: bool = False, + ioc_files: list = [], module_name: str = None, + serial: str = None, fast_mode: bool = False, log: logging.Logger = logging.getLogger(__name__)): self.name = "" self.modules = [] @@ -57,7 +57,8 @@ class Command: if not self.results_path: return - file_handler = logging.FileHandler(os.path.join(self.results_path, "command.log")) + file_handler = logging.FileHandler(os.path.join(self.results_path, + "command.log")) formatter = logging.Formatter("%(asctime)s - %(name)s - %(levelname)s - %(message)s") file_handler.setLevel(logging.DEBUG) file_handler.setFormatter(formatter) @@ -73,7 +74,8 @@ class Command: if len(self.timeline_detected) > 0: save_timeline(self.timeline_detected, - os.path.join(self.results_path, "timeline_detected.csv")) + os.path.join(self.results_path, + "timeline_detected.csv")) def _store_info(self) -> None: if not self.results_path: @@ -118,10 +120,12 @@ class Command: with open(file_path, "rb") as handle: sha256.update(handle.read()) except FileNotFoundError: - self.log.error("Failed to hash the file %s: might be a symlink", file_path) + self.log.error("Failed to hash the file %s: might be a symlink", + file_path) continue except PermissionError: - self.log.error("Failed to hash the file %s: permission denied", file_path) + self.log.error("Failed to hash the file %s: permission denied", + file_path) continue info["hashes"].append({ @@ -129,11 +133,13 @@ class Command: "sha256": sha256.hexdigest(), }) - with open(os.path.join(self.results_path, "info.json"), "w+", encoding="utf-8") as handle: + info_path = os.path.join(self.results_path, "info.json") + with open(info_path, "w+", encoding="utf-8") as handle: json.dump(info, handle, indent=4) def list_modules(self) -> None: - self.log.info("Following is the list of available %s modules:", self.name) + self.log.info("Following is the list of available %s modules:", + self.name) for module in self.modules: self.log.info(" - %s", module.__name__) diff --git a/mvt/common/indicators.py b/mvt/common/indicators.py index 9911e68..1de6227 100644 --- a/mvt/common/indicators.py +++ b/mvt/common/indicators.py @@ -32,7 +32,8 @@ class Indicators: for ioc_file_name in os.listdir(MVT_INDICATORS_FOLDER): if ioc_file_name.lower().endswith(".stix2"): - self.parse_stix2(os.path.join(MVT_INDICATORS_FOLDER, ioc_file_name)) + self.parse_stix2(os.path.join(MVT_INDICATORS_FOLDER, + ioc_file_name)) def _check_stix2_env_variable(self) -> None: """ @@ -49,8 +50,9 @@ class Indicators: self.log.error("Path specified with env MVT_STIX2 is not a valid file: %s", path) - def _new_collection(self, cid: str = "", name: str = "", description: str = "", - file_name: str = "", file_path: str = "") -> dict: + def _new_collection(self, cid: str = "", name: str = "", + description: str = "", file_name: str = "", + file_path: str = "") -> dict: return { "id": cid, "name": name, @@ -68,7 +70,8 @@ class Indicators: "count": 0, } - def _add_indicator(self, ioc: str, ioc_coll: dict, ioc_coll_list: list) -> None: + def _add_indicator(self, ioc: str, ioc_coll: dict, + ioc_coll_list: list) -> None: ioc = ioc.strip("'") if ioc not in ioc_coll_list: ioc_coll_list.append(ioc) @@ -181,7 +184,8 @@ class Indicators: self.ioc_collections.extend(collections) - def load_indicators_files(self, files: list, load_default: bool = True) -> None: + def load_indicators_files(self, files: list, + load_default: bool = True) -> None: """ Load a list of indicators files. """ @@ -197,7 +201,8 @@ class Indicators: self._load_downloaded_indicators() self._check_stix2_env_variable() - self.log.info("Loaded a total of %d unique indicators", self.total_ioc_count) + self.log.info("Loaded a total of %d unique indicators", + self.total_ioc_count) def get_iocs(self, ioc_type: str) -> Union[dict, None]: for ioc_collection in self.ioc_collections: @@ -237,7 +242,8 @@ class Indicators: # Now we check for any nested URL shorteners. dest_url = URL(unshortened) if dest_url.check_if_shortened(): - # self.log.info("Original URL %s appears to shorten another shortened URL %s ... checking!", + # self.log.info("Original URL %s appears to shorten another " + # "shortened URL %s ... checking!", # orig_url.url, dest_url.url) return self.check_domain(dest_url.url) @@ -250,7 +256,8 @@ class Indicators: # match. for ioc in self.get_iocs("domains"): if ioc["value"].lower() in url: - self.log.warning("Maybe found a known suspicious domain %s matching indicators from \"%s\"", + self.log.warning("Maybe found a known suspicious domain %s " + "matching indicators from \"%s\"", url, ioc["name"]) return ioc @@ -262,10 +269,12 @@ class Indicators: # First we check the full domain. if final_url.domain.lower() == ioc["value"]: if orig_url.is_shortened and orig_url.url != final_url.url: - self.log.warning("Found a known suspicious domain %s shortened as %s matching indicators from \"%s\"", + self.log.warning("Found a known suspicious domain %s shortened as %s matching " + "indicators from \"%s\"", final_url.url, orig_url.url, ioc["name"]) else: - self.log.warning("Found a known suspicious domain %s matching indicators from \"%s\"", + self.log.warning("Found a known suspicious domain %s " + "matching indicators from \"%s\"", final_url.url, ioc["name"]) return ioc @@ -273,10 +282,12 @@ class Indicators: # Then we just check the top level domain. if final_url.top_level.lower() == ioc["value"]: if orig_url.is_shortened and orig_url.url != final_url.url: - self.log.warning("Found a sub-domain with suspicious top level %s shortened as %s matching indicators from \"%s\"", + self.log.warning("Found a sub-domain with suspicious top level %s shortened " + "as %s matching indicators from \"%s\"", final_url.url, orig_url.url, ioc["name"]) else: - self.log.warning("Found a sub-domain with a suspicious top level %s matching indicators from \"%s\"", + self.log.warning("Found a sub-domain with a suspicious top level %s matching " + "indicators from \"%s\"", final_url.url, ioc["name"]) return ioc @@ -316,13 +327,15 @@ class Indicators: proc_name = os.path.basename(process) for ioc in self.get_iocs("processes"): if proc_name == ioc["value"]: - self.log.warning("Found a known suspicious process name \"%s\" matching indicators from \"%s\"", + self.log.warning("Found a known suspicious process name \"%s\" " + "matching indicators from \"%s\"", process, ioc["name"]) return ioc if len(proc_name) == 16: if ioc["value"].startswith(proc_name): - self.log.warning("Found a truncated known suspicious process name \"%s\" matching indicators from \"%s\"", + self.log.warning("Found a truncated known suspicious process name \"%s\" " + "matching indicators from \"%s\"", process, ioc["name"]) return ioc @@ -360,7 +373,8 @@ class Indicators: for ioc in self.get_iocs("emails"): if email.lower() == ioc["value"].lower(): - self.log.warning("Found a known suspicious email address \"%s\" matching indicators from \"%s\"", + self.log.warning("Found a known suspicious email address \"%s\"" + " matching indicators from \"%s\"", email, ioc["name"]) return ioc @@ -380,14 +394,16 @@ class Indicators: for ioc in self.get_iocs("file_names"): if ioc["value"] == file_name: - self.log.warning("Found a known suspicious file name \"%s\" matching indicators from \"%s\"", + self.log.warning("Found a known suspicious file name \"%s\" " + "matching indicators from \"%s\"", file_name, ioc["name"]) return ioc return None def check_file_path(self, file_path: str) -> Union[dict, None]: - """Check the provided file path against the list of file indicators (both path and name). + """Check the provided file path against the list of file indicators + (both path and name). :param file_path: File path or file name to check against file indicators @@ -403,18 +419,22 @@ class Indicators: return ioc for ioc in self.get_iocs("file_paths"): - # Strip any trailing slash from indicator paths to match directories. + # Strip any trailing slash from indicator paths to match + # directories. if file_path.startswith(ioc["value"].rstrip("/")): - self.log.warning("Found a known suspicious file path \"%s\" matching indicators form \"%s\"", + self.log.warning("Found a known suspicious file path \"%s\" " + "matching indicators form \"%s\"", file_path, ioc["name"]) return ioc return None def check_profile(self, profile_uuid: str) -> Union[dict, None]: - """Check the provided configuration profile UUID against the list of indicators. + """Check the provided configuration profile UUID against the list of + indicators. - :param profile_uuid: Profile UUID to check against configuration profile indicators + :param profile_uuid: Profile UUID to check against configuration profile + indicators :type profile_uuid: str :returns: Indicator details if matched, otherwise None @@ -424,7 +444,8 @@ class Indicators: for ioc in self.get_iocs("ios_profile_ids"): if profile_uuid in ioc["value"]: - self.log.warning("Found a known suspicious profile ID \"%s\" matching indicators from \"%s\"", + self.log.warning("Found a known suspicious profile ID \"%s\" " + "matching indicators from \"%s\"", profile_uuid, ioc["name"]) return ioc @@ -443,7 +464,8 @@ class Indicators: for ioc in self.get_iocs("files_sha256"): if file_hash.lower() == ioc["value"].lower(): - self.log.warning("Found a known suspicious file with hash \"%s\" matching indicators from \"%s\"", + self.log.warning("Found a known suspicious file with hash \"%s\" matching " + "indicators from \"%s\"", file_hash, ioc["name"]) return ioc @@ -463,7 +485,8 @@ class Indicators: for ioc in self.get_iocs("app_ids"): if app_id.lower() == ioc["value"].lower(): - self.log.warning("Found a known suspicious app with ID \"%s\" matching indicators from \"%s\"", + self.log.warning("Found a known suspicious app with ID \"%s\" matching " + "indicators from \"%s\"", app_id, ioc["name"]) return ioc diff --git a/mvt/common/logo.py b/mvt/common/logo.py index 9505ecc..d7e4664 100644 --- a/mvt/common/logo.py +++ b/mvt/common/logo.py @@ -18,7 +18,8 @@ def check_updates() -> None: pass else: if latest_version: - print(f"\t\t[bold]Version {latest_version} is available! Upgrade mvt![/bold]") + print(f"\t\t[bold]Version {latest_version} is available! " + "Upgrade mvt![/bold]") # Then we check for indicators files updates. ioc_updates = IndicatorsUpdates() @@ -26,7 +27,8 @@ def check_updates() -> None: # Before proceeding, we check if we have downloaded an indicators index. # If not, there's no point in proceeding with the updates check. if ioc_updates.get_latest_update() == 0: - print("\t\t[bold]You have not yet downloaded any indicators, check the `download-iocs` command![/bold]") + print("\t\t[bold]You have not yet downloaded any indicators, check " + "the `download-iocs` command![/bold]") return # We only perform this check at a fixed frequency, in order to not @@ -34,7 +36,8 @@ def check_updates() -> None: # multiple times. should_check, hours = ioc_updates.should_check() if not should_check: - print(f"\t\tIndicators updates checked recently, next automatic check in {int(hours)} hours") + print(f"\t\tIndicators updates checked recently, next automatic check " + f"in {int(hours)} hours") return try: @@ -43,7 +46,8 @@ def check_updates() -> None: pass else: if ioc_to_update: - print("\t\t[bold]There are updates to your indicators files! Run the `download-iocs` command to update![/bold]") + print("\t\t[bold]There are updates to your indicators files! " + "Run the `download-iocs` command to update![/bold]") else: print("\t\tYour indicators files seem to be up to date.") diff --git a/mvt/common/module.py b/mvt/common/module.py index 673e51f..754ece6 100644 --- a/mvt/common/module.py +++ b/mvt/common/module.py @@ -37,7 +37,8 @@ class MVTModule: :param file_path: Path to the module's database file, if there is any :type file_path: str - :param target_path: Path to the target folder (backup or filesystem dump) + :param target_path: Path to the target folder (backup or filesystem + dump) :type file_path: str :param results_path: Folder where results will be stored :type results_path: str @@ -92,7 +93,8 @@ class MVTModule: if self.results: results_file_name = f"{name}.json" - results_json_path = os.path.join(self.results_path, results_file_name) + results_json_path = os.path.join(self.results_path, + results_file_name) with open(results_json_path, "w", encoding="utf-8") as handle: try: json.dump(self.results, handle, indent=4, default=str) @@ -102,7 +104,8 @@ class MVTModule: if self.detected: detected_file_name = f"{name}_detected.json" - detected_json_path = os.path.join(self.results_path, detected_file_name) + detected_json_path = os.path.join(self.results_path, + detected_file_name) with open(detected_json_path, "w", encoding="utf-8") as handle: json.dump(self.detected, handle, indent=4, default=str) @@ -157,7 +160,8 @@ def run_module(module: Callable) -> None: module.log.exception("The run() procedure of module %s was not implemented yet!", module.__class__.__name__) except InsufficientPrivileges as e: - module.log.info("Insufficient privileges for module %s: %s", module.__class__.__name__, e) + module.log.info("Insufficient privileges for module %s: %s", + module.__class__.__name__, e) except DatabaseNotFoundError as e: module.log.info("There might be no data to extract by module %s: %s", module.__class__.__name__, e) @@ -197,7 +201,9 @@ def save_timeline(timeline: list, timeline_path: str) -> None: csvoutput = csv.writer(handle, delimiter=",", quotechar="\"", quoting=csv.QUOTE_ALL) csvoutput.writerow(["UTC Timestamp", "Plugin", "Event", "Description"]) - for event in sorted(timeline, key=lambda x: x["timestamp"] if x["timestamp"] is not None else ""): + + for event in sorted(timeline, key=lambda x: x["timestamp"] + if x["timestamp"] is not None else ""): csvoutput.writerow([ event.get("timestamp"), event.get("module"), diff --git a/mvt/common/updates.py b/mvt/common/updates.py index 96e9c7c..8aa3e7e 100644 --- a/mvt/common/updates.py +++ b/mvt/common/updates.py @@ -165,7 +165,8 @@ class IndicatorsUpdates: log.error("Failed to retrieve date of latest update to indicators index file") return -1 - latest_commit_dt = datetime.strptime(latest_commit_date, '%Y-%m-%dT%H:%M:%SZ') + latest_commit_dt = datetime.strptime(latest_commit_date, + '%Y-%m-%dT%H:%M:%SZ') latest_commit_ts = int(latest_commit_dt.timestamp()) return latest_commit_ts diff --git a/mvt/common/virustotal.py b/mvt/common/virustotal.py index 0bac588..a85950f 100644 --- a/mvt/common/virustotal.py +++ b/mvt/common/virustotal.py @@ -23,14 +23,17 @@ class VTQuotaExceeded(Exception): def virustotal_lookup(file_hash: str): if MVT_VT_API_KEY not in os.environ: - raise VTNoKey("No VirusTotal API key provided: to use VirusTotal lookups please provide your API key with `export MVT_VT_API_KEY=`") + raise VTNoKey("No VirusTotal API key provided: to use VirusTotal " + "lookups please provide your API key with " + "`export MVT_VT_API_KEY=`") headers = { "User-Agent": "VirusTotal", "Content-Type": "application/json", "x-apikey": os.environ[MVT_VT_API_KEY], } - res = requests.get(f"https://www.virustotal.com/api/v3/files/{file_hash}", headers=headers) + res = requests.get(f"https://www.virustotal.com/api/v3/files/{file_hash}", + headers=headers) if res.status_code == 200: report = res.json() @@ -39,7 +42,8 @@ def virustotal_lookup(file_hash: str): if res.status_code == 404: log.info("Could not find results for file with hash %s", file_hash) elif res.status_code == 429: - raise VTQuotaExceeded("You have exceeded the quota for your VirusTotal API key") + raise VTQuotaExceeded("You have exceeded the quota for your " + "VirusTotal API key") else: raise Exception(f"Unexpected response from VirusTotal: {res.status_code}") diff --git a/mvt/ios/cmd_check_backup.py b/mvt/ios/cmd_check_backup.py index 4cf2355..5428400 100644 --- a/mvt/ios/cmd_check_backup.py +++ b/mvt/ios/cmd_check_backup.py @@ -16,8 +16,8 @@ log = logging.getLogger(__name__) class CmdIOSCheckBackup(Command): def __init__(self, target_path: str = None, results_path: str = None, - ioc_files: list = [], module_name: str = None, serial: str = None, - fast_mode: bool = False): + ioc_files: list = [], module_name: str = None, + serial: str = None, fast_mode: bool = False): super().__init__(target_path=target_path, results_path=results_path, ioc_files=ioc_files, module_name=module_name, serial=serial, fast_mode=fast_mode, log=log) diff --git a/mvt/ios/cmd_check_fs.py b/mvt/ios/cmd_check_fs.py index 503660c..0eba37d 100644 --- a/mvt/ios/cmd_check_fs.py +++ b/mvt/ios/cmd_check_fs.py @@ -16,8 +16,8 @@ log = logging.getLogger(__name__) class CmdIOSCheckFS(Command): def __init__(self, target_path: str = None, results_path: str = None, - ioc_files: list = [], module_name: str = None, serial: str = None, - fast_mode: bool = False): + ioc_files: list = [], module_name: str = None, + serial: str = None, fast_mode: bool = False): super().__init__(target_path=target_path, results_path=results_path, ioc_files=ioc_files, module_name=module_name, serial=serial, fast_mode=fast_mode, log=log) diff --git a/setup.cfg b/setup.cfg index 5575fee..3603f31 100644 --- a/setup.cfg +++ b/setup.cfg @@ -63,10 +63,10 @@ max-args = 10 good-names = i,e,m -min-similarity-lines=10 -ignore-comments=yes -ignore-docstrings=yes -ignore-imports=yes +min-similarity-lines = 10 +ignore-comments = yes +ignore-docstrings = yes +ignore-imports = yes ignored-argument-names=args|kwargs @@ -76,10 +76,10 @@ disable = broad-except, abstract-method, dangerous-default-value, - #duplicate-code, - line-too-long, too-few-public-methods, missing-docstring, missing-module-docstring, missing-class-docstring, missing-function-docstring, + #duplicate-code, + #line-too-long,