From c779009550865f03ce8b75a6ac71a045e30e6c05 Mon Sep 17 00:00:00 2001 From: Janik Besendorf Date: Sat, 20 Dec 2025 09:50:55 +0100 Subject: [PATCH] fix typing for mypy --- src/mvt/android/artifacts/artifact.py | 42 +++++++++++++------ src/mvt/android/artifacts/dumpsys_adb.py | 2 +- src/mvt/android/artifacts/getprop.py | 3 +- src/mvt/android/cmd_check_androidqf.py | 2 + src/mvt/android/cmd_check_backup.py | 22 ++++++---- src/mvt/android/cmd_check_bugreport.py | 2 + src/mvt/android/modules/adb/chrome_history.py | 2 +- src/mvt/android/modules/adb/selinux_status.py | 5 ++- .../android/modules/androidqf/aqf_files.py | 2 +- .../android/modules/androidqf/aqf_getprop.py | 4 +- .../modules/androidqf/aqf_log_timestamps.py | 13 +++--- .../android/modules/androidqf/aqf_packages.py | 6 ++- .../android/modules/androidqf/aqf_settings.py | 4 +- src/mvt/android/modules/androidqf/base.py | 4 +- src/mvt/android/modules/androidqf/mounts.py | 4 +- src/mvt/android/modules/backup/base.py | 17 ++++---- src/mvt/android/modules/bugreport/base.py | 11 +++-- .../modules/bugreport/dumpsys_packages.py | 7 ++-- src/mvt/common/command.py | 2 +- src/mvt/common/config.py | 42 +++++++++---------- src/mvt/common/indicators.py | 12 +++--- src/mvt/common/module.py | 9 ++-- src/mvt/common/module_types.py | 20 +++++---- src/mvt/common/updates.py | 4 +- src/mvt/common/url.py | 18 +++++--- src/mvt/ios/decrypt.py | 6 ++- src/mvt/ios/modules/backup/backup_info.py | 4 +- .../modules/backup/configuration_profiles.py | 16 +++---- src/mvt/ios/modules/backup/manifest.py | 16 ++++--- src/mvt/ios/modules/backup/profile_events.py | 20 +++++---- src/mvt/ios/modules/base.py | 20 +++++++-- src/mvt/ios/modules/fs/analytics.py | 16 +++---- src/mvt/ios/modules/fs/cache_files.py | 5 ++- src/mvt/ios/modules/fs/safari_favicon.py | 9 ++-- src/mvt/ios/modules/fs/shutdownlog.py | 11 +++-- src/mvt/ios/modules/fs/version_history.py | 3 +- src/mvt/ios/modules/fs/webkit_base.py | 4 +- src/mvt/ios/modules/mixed/applications.py | 16 ++++--- src/mvt/ios/modules/mixed/calendar.py | 7 ++-- src/mvt/ios/modules/mixed/calls.py | 10 +++-- src/mvt/ios/modules/mixed/chrome_favicon.py | 10 +++-- src/mvt/ios/modules/mixed/chrome_history.py | 10 +++-- src/mvt/ios/modules/mixed/contacts.py | 3 ++ src/mvt/ios/modules/mixed/firefox_favicon.py | 10 +++-- src/mvt/ios/modules/mixed/firefox_history.py | 10 +++-- .../ios/modules/mixed/global_preferences.py | 6 ++- src/mvt/ios/modules/mixed/idstatuscache.py | 5 +-- src/mvt/ios/modules/mixed/interactionc.py | 6 ++- src/mvt/ios/modules/mixed/locationd.py | 11 ++--- .../ios/modules/mixed/osanalytics_addaily.py | 10 +++-- .../ios/modules/mixed/safari_browserstate.py | 11 +++-- src/mvt/ios/modules/mixed/safari_history.py | 10 +++-- src/mvt/ios/modules/mixed/shortcuts.py | 8 +++- src/mvt/ios/modules/mixed/sms.py | 10 ++++- src/mvt/ios/modules/mixed/sms_attachments.py | 10 ++--- .../mixed/webkit_resource_load_statistics.py | 8 ++-- .../mixed/webkit_session_resource_log.py | 13 +++--- src/mvt/ios/modules/mixed/whatsapp.py | 12 ++++-- src/mvt/ios/versions.py | 6 ++- 59 files changed, 366 insertions(+), 225 deletions(-) diff --git a/src/mvt/android/artifacts/artifact.py b/src/mvt/android/artifacts/artifact.py index a5df7b7..94dddde 100644 --- a/src/mvt/android/artifacts/artifact.py +++ b/src/mvt/android/artifacts/artifact.py @@ -20,23 +20,39 @@ class AndroidArtifact(Artifact): :param binary: whether the dumpsys should be pared as binary or not (bool) :return: section extracted (string or bytes) """ - lines = [] in_section = False - delimiter = "------------------------------------------------------------------------------" + delimiter_str = "------------------------------------------------------------------------------" + delimiter_bytes = b"------------------------------------------------------------------------------" + if binary: - delimiter = delimiter.encode("utf-8") + lines_bytes = [] + for line in dumpsys.splitlines(): # type: ignore[union-attr] + if line.strip() == separator: # type: ignore[arg-type] + in_section = True + continue - for line in dumpsys.splitlines(): - if line.strip() == separator: - in_section = True - continue + if not in_section: + continue - if not in_section: - continue + if line.strip().startswith(delimiter_bytes): # type: ignore[arg-type] + break - if line.strip().startswith(delimiter): - break + lines_bytes.append(line) # type: ignore[arg-type] - lines.append(line) + return b"\n".join(lines_bytes) # type: ignore[return-value,arg-type] + else: + lines_str = [] + for line in dumpsys.splitlines(): # type: ignore[union-attr] + if line.strip() == separator: # type: ignore[arg-type] + in_section = True + continue - return b"\n".join(lines) if binary else "\n".join(lines) + if not in_section: + continue + + if line.strip().startswith(delimiter_str): # type: ignore[arg-type] + break + + lines_str.append(line) # type: ignore[arg-type] + + return "\n".join(lines_str) # type: ignore[return-value,arg-type] diff --git a/src/mvt/android/artifacts/dumpsys_adb.py b/src/mvt/android/artifacts/dumpsys_adb.py index 2bc9abe..4ed1669 100644 --- a/src/mvt/android/artifacts/dumpsys_adb.py +++ b/src/mvt/android/artifacts/dumpsys_adb.py @@ -84,7 +84,7 @@ class DumpsysADBArtifact(AndroidArtifact): return keystore @staticmethod - def calculate_key_info(user_key: bytes) -> str: + def calculate_key_info(user_key: bytes) -> dict: if b" " in user_key: key_base64, user = user_key.split(b" ", 1) else: diff --git a/src/mvt/android/artifacts/getprop.py b/src/mvt/android/artifacts/getprop.py index 71e4993..debcfc4 100644 --- a/src/mvt/android/artifacts/getprop.py +++ b/src/mvt/android/artifacts/getprop.py @@ -60,7 +60,8 @@ class GetProp(AndroidArtifact): if entry["name"] == "ro.build.version.security_patch": warning_message = warn_android_patch_level(entry["value"], self.log) - self.alertstore.medium(warning_message, "", entry) + if isinstance(warning_message, str): + self.alertstore.medium(warning_message, "", entry) if not self.indicators: return diff --git a/src/mvt/android/cmd_check_androidqf.py b/src/mvt/android/cmd_check_androidqf.py index 0427b1d..ff96a24 100644 --- a/src/mvt/android/cmd_check_androidqf.py +++ b/src/mvt/android/cmd_check_androidqf.py @@ -167,6 +167,8 @@ class CmdAndroidCheckAndroidQF(Command): if bugreport: bugreport.close() + return True + def run_backup_cmd(self) -> bool: try: backup = self.load_backup() diff --git a/src/mvt/android/cmd_check_backup.py b/src/mvt/android/cmd_check_backup.py index 8fc4d71..c6fe922 100644 --- a/src/mvt/android/cmd_check_backup.py +++ b/src/mvt/android/cmd_check_backup.py @@ -9,7 +9,7 @@ import os import sys import tarfile from pathlib import Path -from typing import List, Optional +from typing import List, Optional, cast from mvt.android.modules.backup.base import BackupModule from mvt.android.modules.backup.helpers import prompt_or_load_android_backup_password @@ -93,22 +93,28 @@ class CmdAndroidCheckBackup(Command): self.__files.append(member.name) def init(self) -> None: - if not self.target_path: + if not self.target_path: # type: ignore[has-type] return - if os.path.isfile(self.target_path): + # Type guard: we know it's not None here after the check above + assert self.target_path is not None # type: ignore[has-type] + # Use a different local variable name to avoid any scoping issues + backup_path: str = self.target_path # type: ignore[has-type] + + if os.path.isfile(backup_path): self.__type = "ab" - with open(self.target_path, "rb") as handle: + with open(backup_path, "rb") as handle: ab_file_bytes = handle.read() self.from_ab(ab_file_bytes) - elif os.path.isdir(self.target_path): + elif os.path.isdir(backup_path): self.__type = "folder" - self.target_path = Path(self.target_path).absolute().as_posix() - for root, subdirs, subfiles in os.walk(os.path.abspath(self.target_path)): + backup_path = Path(backup_path).absolute().as_posix() + self.target_path = backup_path + for root, subdirs, subfiles in os.walk(os.path.abspath(backup_path)): for fname in subfiles: self.__files.append( - os.path.relpath(os.path.join(root, fname), self.target_path) + os.path.relpath(os.path.join(root, fname), backup_path) ) else: log.critical( diff --git a/src/mvt/android/cmd_check_bugreport.py b/src/mvt/android/cmd_check_bugreport.py index 7cc827f..b6f1367 100644 --- a/src/mvt/android/cmd_check_bugreport.py +++ b/src/mvt/android/cmd_check_bugreport.py @@ -96,6 +96,8 @@ class CmdAndroidCheckBugreport(Command): if self.__format == "zip": module.from_zip(self.__zip, self.__files) else: + if not self.target_path: + raise ValueError("target_path is not set") module.from_dir(self.target_path, self.__files) def finish(self) -> None: diff --git a/src/mvt/android/modules/adb/chrome_history.py b/src/mvt/android/modules/adb/chrome_history.py index 2be1d33..16a5164 100644 --- a/src/mvt/android/modules/adb/chrome_history.py +++ b/src/mvt/android/modules/adb/chrome_history.py @@ -40,7 +40,7 @@ class ChromeHistory(AndroidExtraction): log=log, results=results, ) - self.results = [] + self.results: list = [] def serialize(self, record: ModuleAtomicResult) -> ModuleSerializedResult: return { diff --git a/src/mvt/android/modules/adb/selinux_status.py b/src/mvt/android/modules/adb/selinux_status.py index 39925c7..1f15dbc 100644 --- a/src/mvt/android/modules/adb/selinux_status.py +++ b/src/mvt/android/modules/adb/selinux_status.py @@ -6,9 +6,10 @@ import logging from typing import Optional -from .base import AndroidExtraction from mvt.common.module_types import ModuleResults +from .base import AndroidExtraction + class SELinuxStatus(AndroidExtraction): """This module checks if SELinux is being enforced.""" @@ -33,7 +34,7 @@ class SELinuxStatus(AndroidExtraction): results=results, ) - self.results = {} if not results else results + self.results: dict = {} def run(self) -> None: self._adb_connect() diff --git a/src/mvt/android/modules/androidqf/aqf_files.py b/src/mvt/android/modules/androidqf/aqf_files.py index 35a848d..941de9f 100644 --- a/src/mvt/android/modules/androidqf/aqf_files.py +++ b/src/mvt/android/modules/androidqf/aqf_files.py @@ -112,7 +112,7 @@ class AQFFiles(AndroidQFModule): if result.get("sha256", "") == "": continue - ioc_match = self.indicators.check_file_hash(result.get("sha256")) + ioc_match = self.indicators.check_file_hash(result.get("sha256") or "") if ioc_match: self.alertstore.critical( ioc_match.message, "", result, matched_indicator=ioc_match.ioc diff --git a/src/mvt/android/modules/androidqf/aqf_getprop.py b/src/mvt/android/modules/androidqf/aqf_getprop.py index 68dca90..f41029b 100644 --- a/src/mvt/android/modules/androidqf/aqf_getprop.py +++ b/src/mvt/android/modules/androidqf/aqf_getprop.py @@ -7,9 +7,9 @@ import logging from typing import Optional from mvt.android.artifacts.getprop import GetProp as GetPropArtifact +from mvt.common.module_types import ModuleResults from .base import AndroidQFModule -from mvt.common.module_types import ModuleResults class AQFGetProp(GetPropArtifact, AndroidQFModule): @@ -32,7 +32,7 @@ class AQFGetProp(GetPropArtifact, AndroidQFModule): log=log, results=results, ) - self.results = [] + self.results: list = [] def run(self) -> None: getprop_files = self._get_files_by_pattern("*/getprop.txt") diff --git a/src/mvt/android/modules/androidqf/aqf_log_timestamps.py b/src/mvt/android/modules/androidqf/aqf_log_timestamps.py index fc46804..1070d1b 100644 --- a/src/mvt/android/modules/androidqf/aqf_log_timestamps.py +++ b/src/mvt/android/modules/androidqf/aqf_log_timestamps.py @@ -3,15 +3,16 @@ # Use of this software is governed by the MVT License 1.1 that can be found at # https://license.mvt.re/1.1/ -import os import datetime import logging +import os from typing import Optional -from mvt.common.utils import convert_datetime_to_iso -from mvt.common.module_types import ModuleResults -from .base import AndroidQFModule from mvt.android.artifacts.file_timestamps import FileTimestampsArtifact +from mvt.common.module_types import ModuleResults +from mvt.common.utils import convert_datetime_to_iso + +from .base import AndroidQFModule class AQFLogTimestamps(FileTimestampsArtifact, AndroidQFModule): @@ -37,11 +38,13 @@ class AQFLogTimestamps(FileTimestampsArtifact, AndroidQFModule): results=results, ) - def _get_file_modification_time(self, file_path: str) -> dict: + def _get_file_modification_time(self, file_path: str) -> datetime.datetime: if self.archive: file_timetuple = self.archive.getinfo(file_path).date_time return datetime.datetime(*file_timetuple) else: + if not self.parent_path: + raise ValueError("parent_path is not set") file_stat = os.stat(os.path.join(self.parent_path, file_path)) return datetime.datetime.fromtimestamp(file_stat.st_mtime) diff --git a/src/mvt/android/modules/androidqf/aqf_packages.py b/src/mvt/android/modules/androidqf/aqf_packages.py index 34ed0cf..6fa4ef5 100644 --- a/src/mvt/android/modules/androidqf/aqf_packages.py +++ b/src/mvt/android/modules/androidqf/aqf_packages.py @@ -98,7 +98,7 @@ class AQFPackages(AndroidQFModule): if not self.indicators: continue - ioc_match = self.indicators.check_app_id(result.get("name")) + ioc_match = self.indicators.check_app_id(result.get("name") or "") if ioc_match: self.alertstore.critical( ioc_match.message, "", result, matched_indicator=ioc_match.ioc @@ -106,7 +106,9 @@ class AQFPackages(AndroidQFModule): self.alertstore.log_latest() for package_file in result.get("files", []): - ioc_match = self.indicators.check_file_hash(package_file["sha256"]) + ioc_match = self.indicators.check_file_hash( + package_file.get("sha256") or "" + ) if ioc_match: self.alertstore.critical( ioc_match.message, "", result, matched_indicator=ioc_match.ioc diff --git a/src/mvt/android/modules/androidqf/aqf_settings.py b/src/mvt/android/modules/androidqf/aqf_settings.py index 1dc3b6e..2974932 100644 --- a/src/mvt/android/modules/androidqf/aqf_settings.py +++ b/src/mvt/android/modules/androidqf/aqf_settings.py @@ -7,9 +7,9 @@ import logging from typing import Optional from mvt.android.artifacts.settings import Settings as SettingsArtifact +from mvt.common.module_types import ModuleResults from .base import AndroidQFModule -from mvt.common.module_types import ModuleResults class AQFSettings(SettingsArtifact, AndroidQFModule): @@ -32,7 +32,7 @@ class AQFSettings(SettingsArtifact, AndroidQFModule): log=log, results=results, ) - self.results = {} + self.results: dict = {} def run(self) -> None: for setting_file in self._get_files_by_pattern("*/settings_*.txt"): diff --git a/src/mvt/android/modules/androidqf/base.py b/src/mvt/android/modules/androidqf/base.py index a784158..be898fc 100644 --- a/src/mvt/android/modules/androidqf/base.py +++ b/src/mvt/android/modules/androidqf/base.py @@ -33,8 +33,8 @@ class AndroidQFModule(MVTModule): log=log, results=results, ) - self.parent_path = None - self._path: str = target_path + self.parent_path: Optional[str] = None + self._path: Optional[str] = target_path self.files: List[str] = [] self.archive: Optional[zipfile.ZipFile] = None diff --git a/src/mvt/android/modules/androidqf/mounts.py b/src/mvt/android/modules/androidqf/mounts.py index 1a5ba5c..fb1274a 100644 --- a/src/mvt/android/modules/androidqf/mounts.py +++ b/src/mvt/android/modules/androidqf/mounts.py @@ -3,8 +3,8 @@ # Use of this software is governed by the MVT License 1.1 that can be found at # https://license.mvt.re/1.1/ -import logging import json +import logging from typing import Optional from mvt.android.artifacts.mounts import Mounts as MountsArtifact @@ -32,7 +32,7 @@ class Mounts(MountsArtifact, AndroidQFModule): log=log, results=results, ) - self.results = [] + self.results: list = [] def run(self) -> None: """ diff --git a/src/mvt/android/modules/backup/base.py b/src/mvt/android/modules/backup/base.py index 8da8a20..d49a945 100644 --- a/src/mvt/android/modules/backup/base.py +++ b/src/mvt/android/modules/backup/base.py @@ -9,7 +9,7 @@ import os from tarfile import TarFile from typing import List, Optional -from mvt.common.module import MVTModule, ModuleResults +from mvt.common.module import ModuleResults, MVTModule class BackupModule(MVTModule): @@ -32,10 +32,10 @@ class BackupModule(MVTModule): log=log, results=results, ) - self.ab = None - self.backup_path = None - self.tar = None - self.files = [] + self.ab: Optional[str] = None + self.backup_path: Optional[str] = None + self.tar: Optional[TarFile] = None + self.files: list = [] def from_dir(self, backup_path: Optional[str], files: List[str]) -> None: self.backup_path = backup_path @@ -55,12 +55,15 @@ class BackupModule(MVTModule): return fnmatch.filter(self.files, pattern) def _get_file_content(self, file_path: str) -> bytes: + handle = None if self.tar: try: member = self.tar.getmember(file_path) + handle = self.tar.extractfile(member) + if not handle: + raise ValueError(f"Could not extract file: {file_path}") except KeyError: - return None - handle = self.tar.extractfile(member) + raise FileNotFoundError(f"File not found in tar: {file_path}") elif self.backup_path: handle = open(os.path.join(self.backup_path, file_path), "rb") else: diff --git a/src/mvt/android/modules/bugreport/base.py b/src/mvt/android/modules/bugreport/base.py index 025e2c1..1240aed 100644 --- a/src/mvt/android/modules/bugreport/base.py +++ b/src/mvt/android/modules/bugreport/base.py @@ -6,11 +6,10 @@ import datetime import fnmatch import logging import os - from typing import List, Optional from zipfile import ZipFile -from mvt.common.module import MVTModule, ModuleResults +from mvt.common.module import ModuleResults, MVTModule class BugReportModule(MVTModule): @@ -69,6 +68,8 @@ class BugReportModule(MVTModule): if self.zip_archive: handle = self.zip_archive.open(file_path) else: + if not self.extract_path: + raise ValueError("extract_path is not set") handle = open(os.path.join(self.extract_path, file_path), "rb") data = handle.read() @@ -76,7 +77,7 @@ class BugReportModule(MVTModule): return data - def _get_dumpstate_file(self) -> bytes: + def _get_dumpstate_file(self) -> Optional[bytes]: main = self._get_files_by_pattern("main_entry.txt") if main: main_content = self._get_file_content(main[0]) @@ -91,10 +92,12 @@ class BugReportModule(MVTModule): return self._get_file_content(dumpstate_logs[0]) - def _get_file_modification_time(self, file_path: str) -> dict: + def _get_file_modification_time(self, file_path: str) -> datetime.datetime: if self.zip_archive: file_timetuple = self.zip_archive.getinfo(file_path).date_time return datetime.datetime(*file_timetuple) else: + if not self.extract_path: + raise ValueError("extract_path is not set") file_stat = os.stat(os.path.join(self.extract_path, file_path)) return datetime.datetime.fromtimestamp(file_stat.st_mtime) diff --git a/src/mvt/android/modules/bugreport/dumpsys_packages.py b/src/mvt/android/modules/bugreport/dumpsys_packages.py index 0fb4713..6bc5d27 100644 --- a/src/mvt/android/modules/bugreport/dumpsys_packages.py +++ b/src/mvt/android/modules/bugreport/dumpsys_packages.py @@ -6,9 +6,9 @@ import logging from typing import Optional -from mvt.common.module_types import ModuleResults from mvt.android.artifacts.dumpsys_packages import DumpsysPackagesArtifact from mvt.android.utils import DANGEROUS_PERMISSIONS, DANGEROUS_PERMISSIONS_THRESHOLD +from mvt.common.module_types import ModuleResults from .base import BugReportModule @@ -43,8 +43,9 @@ class DumpsysPackages(DumpsysPackagesArtifact, BugReportModule): ) return - data = data.decode("utf-8", errors="replace") - content = self.extract_dumpsys_section(data, "DUMP OF SERVICE package:") + content = self.extract_dumpsys_section( + data.decode("utf-8", errors="replace"), "DUMP OF SERVICE package:" + ) self.parse(content) for result in self.results: diff --git a/src/mvt/common/command.py b/src/mvt/common/command.py index e796391..c2b3664 100644 --- a/src/mvt/common/command.py +++ b/src/mvt/common/command.py @@ -154,7 +154,7 @@ class Command: if not self.results_path: return - target_path = None + target_path: Optional[str] = None if self.target_path: target_path = os.path.abspath(self.target_path) diff --git a/src/mvt/common/config.py b/src/mvt/common/config.py index d2e4e20..cecaac5 100644 --- a/src/mvt/common/config.py +++ b/src/mvt/common/config.py @@ -1,8 +1,8 @@ -import os -import yaml import json +import os +from typing import Optional, Tuple, Type -from typing import Tuple, Type, Optional +import yaml from appdirs import user_config_dir from pydantic import AnyHttpUrl, Field from pydantic_settings import ( @@ -22,51 +22,51 @@ class MVTSettings(BaseSettings): env_prefix="MVT_", env_nested_delimiter="_", extra="ignore", - nested_model_default_partial_updates=True, ) # Allow to decided if want to load environment variables load_env: bool = Field(True, exclude=True) # General settings - PYPI_UPDATE_URL: AnyHttpUrl = Field( - "https://pypi.org/pypi/mvt/json", - validate_default=False, + PYPI_UPDATE_URL: str = Field( + default="https://pypi.org/pypi/mvt/json", ) NETWORK_ACCESS_ALLOWED: bool = True NETWORK_TIMEOUT: int = 15 # Command default settings, all can be specified by MVT_ prefixed environment variables too. IOS_BACKUP_PASSWORD: Optional[str] = Field( - None, description="Default password to use to decrypt iOS backups" + default=None, description="Default password to use to decrypt iOS backups" ) ANDROID_BACKUP_PASSWORD: Optional[str] = Field( - None, description="Default password to use to decrypt Android backups" + default=None, description="Default password to use to decrypt Android backups" ) STIX2: Optional[str] = Field( - None, description="List of directories where STIX2 files are stored" + default=None, description="List of directories where STIX2 files are stored" ) VT_API_KEY: Optional[str] = Field( - None, description="API key to use for VirusTotal lookups" + default=None, description="API key to use for VirusTotal lookups" ) - PROFILE: bool = Field(False, description="Profile the execution of MVT modules") - HASH_FILES: bool = Field(False, description="Should MVT hash output files") + PROFILE: bool = Field( + default=False, description="Profile the execution of MVT modules" + ) + HASH_FILES: bool = Field(default=False, description="Should MVT hash output files") @classmethod def settings_customise_sources( cls, settings_cls: Type[BaseSettings], - init_settings: InitSettingsSource, + init_settings: PydanticBaseSettingsSource, env_settings: PydanticBaseSettingsSource, dotenv_settings: PydanticBaseSettingsSource, file_secret_settings: PydanticBaseSettingsSource, ) -> Tuple[PydanticBaseSettingsSource, ...]: - sources = ( - YamlConfigSettingsSource(settings_cls, MVT_CONFIG_PATH), + yaml_source = YamlConfigSettingsSource(settings_cls, MVT_CONFIG_PATH) + sources: Tuple[PydanticBaseSettingsSource, ...] = ( + yaml_source, init_settings, ) - # Load env variables if enabled - if init_settings.init_kwargs.get("load_env", True): - sources = (env_settings,) + sources + # Always load env variables by default + sources = (env_settings,) + sources return sources def save_settings( @@ -94,11 +94,11 @@ class MVTSettings(BaseSettings): Afterwards we load the settings again, this time including the env variables. """ # Set invalid env prefix to avoid loading env variables. - settings = MVTSettings(load_env=False) + settings = cls(load_env=False) settings.save_settings() # Load the settings again with any ENV variables. - settings = MVTSettings(load_env=True) + settings = cls(load_env=True) return settings diff --git a/src/mvt/common/indicators.py b/src/mvt/common/indicators.py index c34418d..d176688 100644 --- a/src/mvt/common/indicators.py +++ b/src/mvt/common/indicators.py @@ -7,15 +7,15 @@ import glob import json import logging import os +from dataclasses import dataclass from functools import lru_cache from typing import Any, Dict, Iterator, List, Optional -from dataclasses import dataclass import ahocorasick from appdirs import user_data_dir -from .url import URL from .config import settings +from .url import URL MVT_DATA_FOLDER = user_data_dir("mvt") MVT_INDICATORS_FOLDER = os.path.join(MVT_DATA_FOLDER, "indicators") @@ -68,7 +68,7 @@ class Indicators: self.parse_stix2(path) elif os.path.isdir(path): for file in glob.glob( - os.path.join(path, "**", "*.stix2", recursive=True) + os.path.join(path, "**", "*.stix2"), recursive=True ): self.parse_stix2(file) else: @@ -350,7 +350,7 @@ class Indicators: @lru_cache() def get_ioc_matcher( - self, ioc_type: Optional[str] = None, ioc_list: Optional[list] = None + self, ioc_type: Optional[str] = None, ioc_list: Optional[List[Indicator]] = None ) -> ahocorasick.Automaton: """ Build an Aho-Corasick automaton from a list of iocs (i.e indicators) @@ -370,9 +370,9 @@ class Indicators: """ automaton = ahocorasick.Automaton() if ioc_type: - iocs = self.get_iocs(ioc_type) + iocs: Iterator[Indicator] = self.get_iocs(ioc_type) elif ioc_list: - iocs = ioc_list + iocs = iter(ioc_list) else: raise ValueError("Must provide either ioc_type or ioc_list") diff --git a/src/mvt/common/module.py b/src/mvt/common/module.py index d5a7628..ce8652f 100644 --- a/src/mvt/common/module.py +++ b/src/mvt/common/module.py @@ -146,7 +146,10 @@ class MVTModule: for record in timeline: timeline_set.add( json.dumps( - asdict(record) if is_dataclass(record) else record, sort_keys=True + asdict(record) + if is_dataclass(record) and not isinstance(record, type) + else record, + sort_keys=True, ) ) @@ -161,9 +164,9 @@ class MVTModule: record: ModuleSerializedResult = self.serialize(result) if record: if isinstance(record, list): - self.timeline.extend(record) + self.timeline.extend(record) # type: ignore[arg-type] else: - self.timeline.append(record) + self.timeline.append(record) # type: ignore[arg-type] # De-duplicate timeline entries. self.timeline = self._deduplicate_timeline(self.timeline) diff --git a/src/mvt/common/module_types.py b/src/mvt/common/module_types.py index f433482..9b92454 100644 --- a/src/mvt/common/module_types.py +++ b/src/mvt/common/module_types.py @@ -3,15 +3,18 @@ # Use of this software is governed by the MVT License 1.1 that can be found at # https://license.mvt.re/1.1/ -from .indicators import Indicator from dataclasses import dataclass -from typing import List, Union, Optional +from typing import Any, Dict, List, Optional, Union +from .indicators import Indicator -@dataclass -class ModuleAtomicResult: - timestamp: Optional[str] - matched_indicator: Optional[Indicator] +# ModuleAtomicResult is a flexible dictionary that can contain any data. +# Common fields include: +# - timestamp: Optional[str] - timestamp string +# - isodate: Optional[str] - ISO formatted date string +# - matched_indicator: Optional[Indicator] - indicator that matched this result +# - Any other module-specific fields +ModuleAtomicResult = Dict[str, Any] ModuleResults = List[ModuleAtomicResult] @@ -26,4 +29,7 @@ class ModuleAtomicTimeline: ModuleTimeline = List[ModuleAtomicTimeline] -ModuleSerializedResult = Union[ModuleAtomicTimeline, ModuleTimeline] +# ModuleSerializedResult can be a proper timeline object or a plain dict for compatibility +ModuleSerializedResult = Union[ + ModuleAtomicTimeline, ModuleTimeline, Dict[str, Any], List[Dict[str, Any]] +] diff --git a/src/mvt/common/updates.py b/src/mvt/common/updates.py index c9c380b..c95f9ce 100644 --- a/src/mvt/common/updates.py +++ b/src/mvt/common/updates.py @@ -12,9 +12,9 @@ import requests import yaml from packaging import version +from .config import settings from .indicators import MVT_DATA_FOLDER, MVT_INDICATORS_FOLDER from .version import MVT_VERSION -from .config import settings log = logging.getLogger(__name__) @@ -25,7 +25,7 @@ INDICATORS_CHECK_FREQUENCY = 12 class MVTUpdates: def check(self) -> str: try: - res = requests.get(settings.PYPI_UPDATE_URL, timeout=5) + res = requests.get(str(settings.PYPI_UPDATE_URL), timeout=5) except requests.exceptions.RequestException as e: log.error("Failed to check for updates, skipping updates: %s", e) return "" diff --git a/src/mvt/common/url.py b/src/mvt/common/url.py index 40240c5..a83b53b 100644 --- a/src/mvt/common/url.py +++ b/src/mvt/common/url.py @@ -338,11 +338,12 @@ class URL: :rtype: str """ - return ( - get_tld(self.url, as_object=True, fix_protocol=True) - .parsed_url.netloc.lower() - .lstrip("www.") - ) + tld_obj = get_tld(self.url, as_object=True, fix_protocol=True) + if isinstance(tld_obj, str): + return tld_obj + if tld_obj is None: + return "" + return tld_obj.parsed_url.netloc.lower().lstrip("www.") def get_top_level(self) -> str: """Get only the top-level domain from a URL. @@ -351,7 +352,12 @@ class URL: :rtype: str """ - return get_tld(self.url, as_object=True, fix_protocol=True).fld.lower() + tld_obj = get_tld(self.url, as_object=True, fix_protocol=True) + if isinstance(tld_obj, str): + return tld_obj + if tld_obj is None: + return "" + return tld_obj.fld.lower() def check_if_shortened(self) -> bool: """Check if the URL is among list of shortener services. diff --git a/src/mvt/ios/decrypt.py b/src/mvt/ios/decrypt.py index ffb2cb7..6077254 100644 --- a/src/mvt/ios/decrypt.py +++ b/src/mvt/ios/decrypt.py @@ -58,6 +58,7 @@ class DecryptBackup: def _process_file( self, relative_path: str, domain: str, item, file_id: str, item_folder: str ) -> None: + assert self._backup is not None self._backup.getFileDecryptedCopy( manifestEntry=item, targetName=file_id, targetFolder=item_folder ) @@ -70,6 +71,9 @@ class DecryptBackup: ) def process_backup(self) -> None: + assert self._backup is not None + assert self.dest_path is not None + if not os.path.exists(self.dest_path): os.makedirs(self.dest_path) @@ -97,7 +101,7 @@ class DecryptBackup: ) continue - item_folder = os.path.join(self.dest_path, file_id[0:2]) + item_folder = os.path.join(self.dest_path, file_id[0:2]) # type: ignore[arg-type] if not os.path.exists(item_folder): os.makedirs(item_folder) diff --git a/src/mvt/ios/modules/backup/backup_info.py b/src/mvt/ios/modules/backup/backup_info.py index 07baa2f..9e07200 100644 --- a/src/mvt/ios/modules/backup/backup_info.py +++ b/src/mvt/ios/modules/backup/backup_info.py @@ -36,9 +36,11 @@ class BackupInfo(IOSExtraction): results=results, ) - self.results = {} + self.results: dict = {} def run(self) -> None: + if not self.target_path: + raise DatabaseNotFoundError("target_path is not set") info_path = os.path.join(self.target_path, "Info.plist") if not os.path.exists(info_path): raise DatabaseNotFoundError( diff --git a/src/mvt/ios/modules/backup/configuration_profiles.py b/src/mvt/ios/modules/backup/configuration_profiles.py index 306ea4f..7aa635b 100644 --- a/src/mvt/ios/modules/backup/configuration_profiles.py +++ b/src/mvt/ios/modules/backup/configuration_profiles.py @@ -9,12 +9,12 @@ import plistlib from base64 import b64encode from typing import Optional -from mvt.common.utils import convert_datetime_to_iso from mvt.common.module_types import ( ModuleAtomicResult, - ModuleSerializedResult, ModuleResults, + ModuleSerializedResult, ) +from mvt.common.utils import convert_datetime_to_iso from ..base import IOSExtraction @@ -72,12 +72,10 @@ class ConfigurationProfiles(IOSExtraction): result["plist"]["PayloadUUID"] ) if ioc_match: - warning_message = ( - f'Found a known malicious configuration profile "{result["plist"]["PayloadDisplayName"]}" with UUID "{result["plist"]["PayloadUUID"]}"', - ) + warning_message = f'Found a known malicious configuration profile "{result["plist"]["PayloadDisplayName"]}" with UUID "{result["plist"]["PayloadUUID"]}"' result["matched_indicator"] = ioc_match.ioc self.alertstore.critical( - self.get_slug(), warning_message, "", result + warning_message, "", result, matched_indicator=ioc_match.ioc ) self.alertstore.log_latest() continue @@ -85,10 +83,8 @@ class ConfigurationProfiles(IOSExtraction): # Highlight suspicious configuration profiles which may be used # to hide notifications. if payload_content["PayloadType"] in ["com.apple.notificationsettings"]: - warning_message = ( - f'Found a potentially suspicious configuration profile "{result["plist"]["PayloadDisplayName"]}" with payload type {payload_content["PayloadType"]}', - ) - self.alertstore.medum(self.get_slug(), warning_message, "", result) + warning_message = f'Found a potentially suspicious configuration profile "{result["plist"]["PayloadDisplayName"]}" with payload type {payload_content["PayloadType"]}' + self.alertstore.medium(warning_message, "", result) self.alertstore.log_latest() continue diff --git a/src/mvt/ios/modules/backup/manifest.py b/src/mvt/ios/modules/backup/manifest.py index 099a682..4673b25 100644 --- a/src/mvt/ios/modules/backup/manifest.py +++ b/src/mvt/ios/modules/backup/manifest.py @@ -11,13 +11,13 @@ import plistlib from typing import Optional from mvt.common.module import DatabaseNotFoundError -from mvt.common.url import URL -from mvt.common.utils import convert_datetime_to_iso, convert_unix_to_iso from mvt.common.module_types import ( - ModuleResults, ModuleAtomicResult, + ModuleResults, ModuleSerializedResult, ) +from mvt.common.url import URL +from mvt.common.utils import convert_datetime_to_iso, convert_unix_to_iso from ..base import IOSExtraction @@ -66,7 +66,7 @@ class Manifest(IOSExtraction): return convert_unix_to_iso(timestamp_or_unix_time_int) def serialize(self, record: ModuleAtomicResult) -> ModuleSerializedResult: - records = [] + records: list = [] if "modified" not in record or "status_changed" not in record: return records @@ -103,7 +103,9 @@ class Manifest(IOSExtraction): ioc_match = self.indicators.check_file_path("/" + result["relative_path"]) if ioc_match: result["matched_indicator"] = ioc_match.ioc - self.alertstore.high(self.get_slug(), ioc_match.message, "", result) + self.alertstore.high( + ioc_match.message, "", result, matched_indicator=ioc_match.ioc + ) continue rel_path = result["relative_path"].lower() @@ -118,13 +120,15 @@ class Manifest(IOSExtraction): if ioc_match: result["matched_indicator"] = ioc_match.ioc self.alertstore.high( - self.get_slug(), f'Found mention of domain "{ioc_match.ioc.value}" in a backup file with path: {rel_path}', "", result, + matched_indicator=ioc_match.ioc, ) def run(self) -> None: + if not self.target_path: + raise DatabaseNotFoundError("target_path is not set") manifest_db_path = os.path.join(self.target_path, "Manifest.db") if not os.path.isfile(manifest_db_path): raise DatabaseNotFoundError("unable to find backup's Manifest.db") diff --git a/src/mvt/ios/modules/backup/profile_events.py b/src/mvt/ios/modules/backup/profile_events.py index 98c87b1..83d82af 100644 --- a/src/mvt/ios/modules/backup/profile_events.py +++ b/src/mvt/ios/modules/backup/profile_events.py @@ -7,12 +7,12 @@ import logging import plistlib from typing import Optional -from mvt.common.utils import convert_datetime_to_iso from mvt.common.module_types import ( ModuleAtomicResult, ModuleResults, ModuleSerializedResult, ) +from mvt.common.utils import convert_datetime_to_iso from ..base import IOSExtraction @@ -58,29 +58,31 @@ class ProfileEvents(IOSExtraction): def check_indicators(self) -> None: for result in self.results: message = f'On {result.get("timestamp")} process "{result.get("process")}" started operation "{result.get("operation")}" of profile "{result.get("profile_id")}"' - self.alertstore.low( - self.get_slug(), message, result.get("timestamp"), result - ) + self.alertstore.low(message, result.get("timestamp") or "", result) self.alertstore.log_latest() if not self.indicators: return for result in self.results: - ioc_match = self.indicators.check_process(result.get("process")) + ioc_match = self.indicators.check_process(result.get("process") or "") if ioc_match: result["matched_indicator"] = ioc_match.ioc - self.alertstore.critical(self.get_slug(), ioc_match.message, "", result) + self.alertstore.critical( + ioc_match.message, "", result, matched_indicator=ioc_match.ioc + ) continue - ioc_match = self.indicators.check_profile(result.get("profile_id")) + ioc_match = self.indicators.check_profile(result.get("profile_id") or "") if ioc_match: result["matched_indicator"] = ioc_match.ioc - self.alertstore.critical(self.get_slug(), ioc_match.message, "", result) + self.alertstore.critical( + ioc_match.message, "", result, matched_indicator=ioc_match.ioc + ) @staticmethod def parse_profile_events(file_data: bytes) -> list: - results = [] + results: list = [] events_plist = plistlib.loads(file_data) diff --git a/src/mvt/ios/modules/base.py b/src/mvt/ios/modules/base.py index 1a4861b..7226032 100644 --- a/src/mvt/ios/modules/base.py +++ b/src/mvt/ios/modules/base.py @@ -11,8 +11,12 @@ import sqlite3 import subprocess from typing import Iterator, Optional, Union -from mvt.common.module import DatabaseCorruptedError, DatabaseNotFoundError -from mvt.common.module import MVTModule, ModuleResults +from mvt.common.module import ( + DatabaseCorruptedError, + DatabaseNotFoundError, + ModuleResults, + MVTModule, +) class IOSExtraction(MVTModule): @@ -110,6 +114,8 @@ class IOSExtraction(MVTModule): (Default value = None) """ + if not self.target_path: + raise DatabaseNotFoundError("target_path is not set") manifest_db_path = os.path.join(self.target_path, "Manifest.db") if not os.path.exists(manifest_db_path): raise DatabaseNotFoundError("unable to find backup's Manifest.db") @@ -146,6 +152,8 @@ class IOSExtraction(MVTModule): } def _get_backup_file_from_id(self, file_id: str) -> Union[str, None]: + if not self.target_path: + return None file_path = os.path.join(self.target_path, file_id[0:2], file_id) if os.path.exists(file_path): return file_path @@ -153,6 +161,8 @@ class IOSExtraction(MVTModule): return None def _get_fs_files_from_patterns(self, root_paths: list) -> Iterator[str]: + if not self.target_path: + return for root_path in root_paths: for found_path in glob.glob(os.path.join(self.target_path, root_path)): if not os.path.exists(found_path): @@ -174,9 +184,10 @@ class IOSExtraction(MVTModule): :param backup_ids: Default value = None) """ - file_path = None + file_path: Optional[str] = None # First we check if the was an explicit file path specified. if not self.file_path: + # Type narrowing: we know self.file_path is None here, work with local file_path # If not, we first try with backups. # We construct the path to the file according to the iTunes backup # folder structure, if we have a valid ID. @@ -198,8 +209,9 @@ class IOSExtraction(MVTModule): # If we do not find any, we fail. if file_path: - self.file_path = file_path + self.file_path = file_path # type: str else: raise DatabaseNotFoundError("unable to find the module's database file") + assert self.file_path is not None self._recover_sqlite_db_if_needed(self.file_path) diff --git a/src/mvt/ios/modules/fs/analytics.py b/src/mvt/ios/modules/fs/analytics.py index b5cba0d..1d6d961 100644 --- a/src/mvt/ios/modules/fs/analytics.py +++ b/src/mvt/ios/modules/fs/analytics.py @@ -9,12 +9,12 @@ import plistlib import sqlite3 from typing import Optional -from mvt.common.utils import convert_mactime_to_iso from mvt.common.module_types import ( + ModuleAtomicResult, ModuleResults, ModuleSerializedResult, - ModuleAtomicResult, ) +from mvt.common.utils import convert_mactime_to_iso from ..base import IOSExtraction @@ -44,6 +44,7 @@ class Analytics(IOSExtraction): log=log, results=results, ) + self.results: list = [] def serialize(self, record: ModuleAtomicResult) -> ModuleSerializedResult: return { @@ -64,13 +65,11 @@ class Analytics(IOSExtraction): ioc_match = self.indicators.check_process(value) if ioc_match: - warning_message = ( - f'Found mention of a malicious process "{value}" in {result["artifact"]} file at {result["isodate"]}', - ) + warning_message = f'Found mention of a malicious process "{value}" in {result["artifact"]} file at {result["isodate"]}' new_result = copy.copy(result) new_result["matched_indicator"] = ioc_match.ioc self.alertstore.critical( - self.get_slug(), warning_message, "", new_result + warning_message, "", new_result, matched_indicator=ioc_match.ioc ) self.alertstore.log_latest() continue @@ -80,7 +79,10 @@ class Analytics(IOSExtraction): new_result = copy.copy(result) result["matched_indicator"] = ioc_match.ioc self.alertstore.critical( - self.get_slug(), ioc_match.message, "", new_result + ioc_match.message, + "", + new_result, + matched_indicator=ioc_match.ioc, ) def _extract_analytics_data(self): diff --git a/src/mvt/ios/modules/fs/cache_files.py b/src/mvt/ios/modules/fs/cache_files.py index fa34b08..a8fdfe6 100644 --- a/src/mvt/ios/modules/fs/cache_files.py +++ b/src/mvt/ios/modules/fs/cache_files.py @@ -10,9 +10,10 @@ from typing import Optional from mvt.common.module_types import ( ModuleAtomicResult, - ModuleSerializedResult, ModuleResults, + ModuleSerializedResult, ) + from ..base import IOSExtraction @@ -95,7 +96,7 @@ class CacheFiles(IOSExtraction): ) def run(self) -> None: - self.results = {} + self.results: dict = {} for root, _, files in os.walk(self.target_path): for file_name in files: if file_name != "Cache.db": diff --git a/src/mvt/ios/modules/fs/safari_favicon.py b/src/mvt/ios/modules/fs/safari_favicon.py index c7579fb..cffb26b 100644 --- a/src/mvt/ios/modules/fs/safari_favicon.py +++ b/src/mvt/ios/modules/fs/safari_favicon.py @@ -6,12 +6,12 @@ import logging from typing import Optional -from mvt.common.utils import convert_mactime_to_iso from mvt.common.module_types import ( + ModuleAtomicResult, ModuleResults, ModuleSerializedResult, - ModuleAtomicResult, ) +from mvt.common.utils import convert_mactime_to_iso from ..base import IOSExtraction @@ -41,6 +41,7 @@ class SafariFavicon(IOSExtraction): log=log, results=results, ) + self.results: list = [] def serialize(self, record: ModuleAtomicResult) -> ModuleSerializedResult: return { @@ -61,7 +62,9 @@ class SafariFavicon(IOSExtraction): ioc_match = self.indicators.check_url(result["icon_url"]) if ioc_match: - self.alertstore.critical(self.get_slug(), ioc_match.message, "", result) + self.alertstore.critical( + ioc_match.message, "", result, matched_indicator=ioc_match.ioc + ) self.alertstore.log_latest() def _process_favicon_db(self, file_path): diff --git a/src/mvt/ios/modules/fs/shutdownlog.py b/src/mvt/ios/modules/fs/shutdownlog.py index ad8fcd9..2426171 100644 --- a/src/mvt/ios/modules/fs/shutdownlog.py +++ b/src/mvt/ios/modules/fs/shutdownlog.py @@ -6,12 +6,12 @@ import logging from typing import Optional -from mvt.common.utils import convert_mactime_to_iso from mvt.common.module_types import ( ModuleAtomicResult, ModuleResults, ModuleSerializedResult, ) +from mvt.common.utils import convert_mactime_to_iso from ..base import IOSExtraction @@ -57,7 +57,9 @@ class ShutdownLog(IOSExtraction): for result in self.results: ioc_match = self.indicators.check_file_path(result["client"]) if ioc_match: - self.alertstore.critical(self.get_slug(), ioc_match.message, "", result) + self.alertstore.critical( + ioc_match.message, "", result, matched_indicator=ioc_match.ioc + ) self.alertstore.log_latest() continue @@ -66,10 +68,10 @@ class ShutdownLog(IOSExtraction): if ioc.value in parts: result["matched_indicator"] = ioc self.alertstore.critical( - self.get_slug(), f'Found mention of a known malicious process "{ioc.value}" in shutdown.log', "", result, + matched_indicator=ioc, ) self.alertstore.log_latest() continue @@ -135,5 +137,8 @@ class ShutdownLog(IOSExtraction): def run(self) -> None: self._find_ios_database(root_paths=SHUTDOWN_LOG_PATH) self.log.info("Found shutdown log at path: %s", self.file_path) + + if not self.file_path: + return with open(self.file_path, "r", encoding="utf-8") as handle: self.process_shutdownlog(handle.read()) diff --git a/src/mvt/ios/modules/fs/version_history.py b/src/mvt/ios/modules/fs/version_history.py index 8c0af0f..bd5515c 100644 --- a/src/mvt/ios/modules/fs/version_history.py +++ b/src/mvt/ios/modules/fs/version_history.py @@ -8,12 +8,12 @@ import json import logging from typing import Optional -from mvt.common.utils import convert_datetime_to_iso from mvt.common.module_types import ( ModuleAtomicResult, ModuleResults, ModuleSerializedResult, ) +from mvt.common.utils import convert_datetime_to_iso from ..base import IOSExtraction @@ -42,6 +42,7 @@ class IOSVersionHistory(IOSExtraction): log=log, results=results, ) + self.results: list = [] def serialize(self, record: ModuleAtomicResult) -> ModuleSerializedResult: return { diff --git a/src/mvt/ios/modules/fs/webkit_base.py b/src/mvt/ios/modules/fs/webkit_base.py index 00d4c19..6d72d8e 100644 --- a/src/mvt/ios/modules/fs/webkit_base.py +++ b/src/mvt/ios/modules/fs/webkit_base.py @@ -21,7 +21,9 @@ class WebkitBase(IOSExtraction): ioc_match = self.indicators.check_url(result["url"]) if ioc_match: result["matched_indicator"] = ioc_match.ioc - self.alertstore.critical(self.get_slug(), ioc_match.message, "", result) + self.alertstore.critical( + ioc_match.message, "", result, matched_indicator=ioc_match.ioc + ) continue def _process_webkit_folder(self, root_paths): diff --git a/src/mvt/ios/modules/mixed/applications.py b/src/mvt/ios/modules/mixed/applications.py index 558430a..bbbab76 100644 --- a/src/mvt/ios/modules/mixed/applications.py +++ b/src/mvt/ios/modules/mixed/applications.py @@ -11,10 +11,14 @@ from datetime import datetime, timezone from typing import Any, Dict, Optional from mvt.common.module import DatabaseNotFoundError +from mvt.common.module_types import ( + ModuleAtomicResult, + ModuleResults, + ModuleSerializedResult, +) from mvt.common.utils import convert_datetime_to_iso -from mvt.ios.modules.base import IOSExtraction -from mvt.common.module import ModuleResults, ModuleAtomicResult, ModuleSerializedResult +from ..base import IOSExtraction APPLICATIONS_DB_PATH = [ "private/var/containers/Bundle/Application/*/iTunesMetadata.plist" @@ -63,7 +67,6 @@ class Applications(IOSExtraction): if self.indicators: if "softwareVersionBundleId" not in result: self.alertstore.high( - self.get_slug(), "Suspicious application identified without softwareVersionBundleId", "", result, @@ -76,10 +79,10 @@ class Applications(IOSExtraction): if ioc_match: result["matched_indicator"] = ioc_match.ioc self.alertstore.critical( - self.get_slug(), f"Malicious application {result['softwareVersionBundleId']} identified", "", result, + matched_indicator=ioc_match.ioc, ) continue @@ -89,10 +92,10 @@ class Applications(IOSExtraction): if ioc_match: result["matched_indicator"] = ioc_match.ioc self.alertstore.critical( - self.get_slug(), f"Malicious application {result['softwareVersionBundleId']} identified", "", result, + matched_indicator=ioc_match.ioc, ) continue @@ -102,7 +105,6 @@ class Applications(IOSExtraction): not in KNOWN_APP_INSTALLERS ): self.alertstore.medium( - self.get_slug(), f"Suspicious app not installed from the App Store or MDM: {result['softwareVersionBundleId']}", "", result, @@ -157,6 +159,8 @@ class Applications(IOSExtraction): def run(self) -> None: if self.is_backup: + if not self.target_path: + return plist_path = os.path.join(self.target_path, "Info.plist") if not os.path.isfile(plist_path): raise DatabaseNotFoundError("Impossible to find Info.plist file") diff --git a/src/mvt/ios/modules/mixed/calendar.py b/src/mvt/ios/modules/mixed/calendar.py index 6c11186..75a811d 100644 --- a/src/mvt/ios/modules/mixed/calendar.py +++ b/src/mvt/ios/modules/mixed/calendar.py @@ -6,12 +6,12 @@ import logging from typing import Optional -from mvt.common.utils import convert_mactime_to_iso from mvt.common.module_types import ( + ModuleAtomicResult, ModuleResults, ModuleSerializedResult, - ModuleAtomicResult, ) +from mvt.common.utils import convert_mactime_to_iso from ..base import IOSExtraction @@ -73,14 +73,13 @@ class Calendar(IOSExtraction): if ioc_match: result["matched_indicator"] = ioc_match.ioc self.alertstore.critical( - self.get_slug(), ioc_match.message, "", result + ioc_match.message, "", result, matched_indicator=ioc_match.ioc ) continue # Custom check for Quadream exploit if result["summary"] == "Meeting" and result["description"] == "Notes": self.alertstore.high( - self.get_slug(), f"Potential Quadream exploit event identified: {result['uuid']}", "", result, diff --git a/src/mvt/ios/modules/mixed/calls.py b/src/mvt/ios/modules/mixed/calls.py index 82debba..4d411bf 100644 --- a/src/mvt/ios/modules/mixed/calls.py +++ b/src/mvt/ios/modules/mixed/calls.py @@ -6,8 +6,8 @@ import logging from typing import Optional -from mvt.common.utils import convert_mactime_to_iso from mvt.common.module_types import ModuleAtomicResult, ModuleSerializedResult +from mvt.common.utils import convert_mactime_to_iso from ..base import IOSExtraction @@ -22,9 +22,9 @@ class Calls(IOSExtraction): def __init__( self, - file_path: str = None, - target_path: str = None, - results_path: str = None, + file_path: Optional[str] = None, + target_path: Optional[str] = None, + results_path: Optional[str] = None, module_options: Optional[dict] = None, log: logging.Logger = logging.getLogger(__name__), results: list = [], @@ -53,6 +53,8 @@ class Calls(IOSExtraction): ) self.log.info("Found Calls database at path: %s", self.file_path) + if not self.file_path: + return conn = self._open_sqlite_db(self.file_path) cur = conn.cursor() cur.execute( diff --git a/src/mvt/ios/modules/mixed/chrome_favicon.py b/src/mvt/ios/modules/mixed/chrome_favicon.py index 00d0b6c..cc44a68 100644 --- a/src/mvt/ios/modules/mixed/chrome_favicon.py +++ b/src/mvt/ios/modules/mixed/chrome_favicon.py @@ -6,12 +6,12 @@ import logging from typing import Optional -from mvt.common.utils import convert_chrometime_to_datetime, convert_datetime_to_iso from mvt.common.module_types import ( + ModuleAtomicResult, ModuleResults, ModuleSerializedResult, - ModuleAtomicResult, ) +from mvt.common.utils import convert_chrometime_to_datetime, convert_datetime_to_iso from ..base import IOSExtraction @@ -62,7 +62,9 @@ class ChromeFavicon(IOSExtraction): if ioc_match: result["matched_indicator"] = ioc_match.ioc - self.alertstore.critical(self.get_slug(), ioc_match.message, "", result) + self.alertstore.critical( + ioc_match.message, "", result, matched_indicator=ioc_match.ioc + ) continue def run(self) -> None: @@ -71,6 +73,8 @@ class ChromeFavicon(IOSExtraction): ) self.log.info("Found Chrome favicon cache database at path: %s", self.file_path) + if not self.file_path: + return conn = self._open_sqlite_db(self.file_path) # Fetch icon cache diff --git a/src/mvt/ios/modules/mixed/chrome_history.py b/src/mvt/ios/modules/mixed/chrome_history.py index 012b703..fa3b88f 100644 --- a/src/mvt/ios/modules/mixed/chrome_history.py +++ b/src/mvt/ios/modules/mixed/chrome_history.py @@ -6,12 +6,12 @@ import logging from typing import Optional -from mvt.common.utils import convert_chrometime_to_datetime, convert_datetime_to_iso from mvt.common.module_types import ( - ModuleResults, ModuleAtomicResult, + ModuleResults, ModuleSerializedResult, ) +from mvt.common.utils import convert_chrometime_to_datetime, convert_datetime_to_iso from ..base import IOSExtraction @@ -63,7 +63,9 @@ class ChromeHistory(IOSExtraction): ioc_match = self.indicators.check_url(result["url"]) if ioc_match: result["matched_indicator"] = ioc_match.ioc - self.alertstore.critical(self.get_slug(), ioc_match.message, "", result) + self.alertstore.critical( + ioc_match.message, "", result, matched_indicator=ioc_match.ioc + ) def run(self) -> None: self._find_ios_database( @@ -71,6 +73,8 @@ class ChromeHistory(IOSExtraction): ) self.log.info("Found Chrome history database at path: %s", self.file_path) + if not self.file_path: + return conn = self._open_sqlite_db(self.file_path) cur = conn.cursor() cur.execute( diff --git a/src/mvt/ios/modules/mixed/contacts.py b/src/mvt/ios/modules/mixed/contacts.py index 2bd5bee..1966ac1 100644 --- a/src/mvt/ios/modules/mixed/contacts.py +++ b/src/mvt/ios/modules/mixed/contacts.py @@ -8,6 +8,7 @@ import sqlite3 from typing import Optional from mvt.common.module_types import ModuleResults + from ..base import IOSExtraction CONTACTS_BACKUP_IDS = [ @@ -45,6 +46,8 @@ class Contacts(IOSExtraction): ) self.log.info("Found Contacts database at path: %s", self.file_path) + if not self.file_path: + return conn = self._open_sqlite_db(self.file_path) cur = conn.cursor() try: diff --git a/src/mvt/ios/modules/mixed/firefox_favicon.py b/src/mvt/ios/modules/mixed/firefox_favicon.py index 0489114..3357a59 100644 --- a/src/mvt/ios/modules/mixed/firefox_favicon.py +++ b/src/mvt/ios/modules/mixed/firefox_favicon.py @@ -6,12 +6,12 @@ import logging from typing import Optional -from mvt.common.utils import convert_unix_to_iso from mvt.common.module_types import ( + ModuleAtomicResult, ModuleResults, ModuleSerializedResult, - ModuleAtomicResult, ) +from mvt.common.utils import convert_unix_to_iso from ..base import IOSExtraction @@ -64,7 +64,9 @@ class FirefoxFavicon(IOSExtraction): if ioc_match: result["matched_indicator"] = ioc_match.ioc - self.alertstore.critical(self.get_slug(), ioc_match.message, "", result) + self.alertstore.critical( + ioc_match.message, "", result, matched_indicator=ioc_match.ioc + ) def run(self) -> None: self._find_ios_database( @@ -72,6 +74,8 @@ class FirefoxFavicon(IOSExtraction): ) self.log.info("Found Firefox favicon database at path: %s", self.file_path) + if not self.file_path: + return conn = self._open_sqlite_db(self.file_path) cur = conn.cursor() cur.execute( diff --git a/src/mvt/ios/modules/mixed/firefox_history.py b/src/mvt/ios/modules/mixed/firefox_history.py index 67adda5..d61f19c 100644 --- a/src/mvt/ios/modules/mixed/firefox_history.py +++ b/src/mvt/ios/modules/mixed/firefox_history.py @@ -6,12 +6,12 @@ import logging from typing import Optional -from mvt.common.utils import convert_unix_to_iso from mvt.common.module_types import ( + ModuleAtomicResult, ModuleResults, ModuleSerializedResult, - ModuleAtomicResult, ) +from mvt.common.utils import convert_unix_to_iso from ..base import IOSExtraction @@ -64,7 +64,9 @@ class FirefoxHistory(IOSExtraction): ioc_match = self.indicators.check_url(result["url"]) if ioc_match: result["matched_indicator"] = ioc_match.ioc - self.alertstore.critical(self.get_slug(), ioc_match.message, "", result) + self.alertstore.critical( + ioc_match.message, "", result, matched_indicator=ioc_match.ioc + ) def run(self) -> None: self._find_ios_database( @@ -72,6 +74,8 @@ class FirefoxHistory(IOSExtraction): ) self.log.info("Found Firefox history database at path: %s", self.file_path) + if not self.file_path: + return conn = self._open_sqlite_db(self.file_path) cur = conn.cursor() cur.execute( diff --git a/src/mvt/ios/modules/mixed/global_preferences.py b/src/mvt/ios/modules/mixed/global_preferences.py index 9f11db9..04b11bc 100644 --- a/src/mvt/ios/modules/mixed/global_preferences.py +++ b/src/mvt/ios/modules/mixed/global_preferences.py @@ -42,9 +42,9 @@ class GlobalPreferences(IOSExtraction): for entry in self.results: if entry["entry"] == "LDMGlobalEnabled": if entry["value"]: - self.alertstore.info("Lockdown mode enabled", "", None) + self.alertstore.info("Lockdown mode enabled", "", entry) else: - self.alertstore.low("Lockdown mode disabled", "", None) + self.alertstore.low("Lockdown mode disabled", "", entry) continue def process_file(self, file_path: str) -> None: @@ -61,6 +61,8 @@ class GlobalPreferences(IOSExtraction): ) self.log.info("Found Global Preference database at path: %s", self.file_path) + if not self.file_path: + return self.process_file(self.file_path) self.log.info("Extracted a total of %d Global Preferences", len(self.results)) diff --git a/src/mvt/ios/modules/mixed/idstatuscache.py b/src/mvt/ios/modules/mixed/idstatuscache.py index 1d836fc..b1e06dd 100644 --- a/src/mvt/ios/modules/mixed/idstatuscache.py +++ b/src/mvt/ios/modules/mixed/idstatuscache.py @@ -8,12 +8,12 @@ import logging import plistlib from typing import Optional -from mvt.common.utils import convert_mactime_to_iso from mvt.common.module_types import ( ModuleAtomicResult, ModuleResults, ModuleSerializedResult, ) +from mvt.common.utils import convert_mactime_to_iso from ..base import IOSExtraction @@ -67,13 +67,12 @@ class IDStatusCache(IOSExtraction): if ioc_match: result["matched_indicator"] = ioc_match.ioc self.alertstore.critical( - self.get_slug(), ioc_match.message, "", result + ioc_match.message, "", result, matched_indicator=ioc_match.ioc ) continue if "\\x00\\x00" in result.get("user", ""): self.alertstore.high( - self.get_slug(), f"Found an ID Status Cache entry with suspicious patterns: {result.get('user')}", "", result, diff --git a/src/mvt/ios/modules/mixed/interactionc.py b/src/mvt/ios/modules/mixed/interactionc.py index 8eadda4..078ac6a 100644 --- a/src/mvt/ios/modules/mixed/interactionc.py +++ b/src/mvt/ios/modules/mixed/interactionc.py @@ -7,12 +7,12 @@ import logging import sqlite3 from typing import Optional -from mvt.common.utils import convert_mactime_to_iso from mvt.common.module_types import ( ModuleAtomicResult, - ModuleSerializedResult, ModuleResults, + ModuleSerializedResult, ) +from mvt.common.utils import convert_mactime_to_iso from ..base import IOSExtraction @@ -285,6 +285,8 @@ class InteractionC(IOSExtraction): ) self.log.info("Found InteractionC database at path: %s", self.file_path) + if not self.file_path: + return conn = self._open_sqlite_db(self.file_path) cur = conn.cursor() diff --git a/src/mvt/ios/modules/mixed/locationd.py b/src/mvt/ios/modules/mixed/locationd.py index 35a1c49..b16c0a1 100644 --- a/src/mvt/ios/modules/mixed/locationd.py +++ b/src/mvt/ios/modules/mixed/locationd.py @@ -8,12 +8,12 @@ import logging import plistlib from typing import Optional -from mvt.common.utils import convert_mactime_to_iso from mvt.common.module_types import ( ModuleAtomicResult, - ModuleSerializedResult, ModuleResults, + ModuleSerializedResult, ) +from mvt.common.utils import convert_mactime_to_iso from ..base import IOSExtraction @@ -86,7 +86,6 @@ class LocationdClients(IOSExtraction): if ioc_match: result["matched_indicator"] = ioc_match.ioc self.alertstore.high( - self.get_slug(), f"Found a suspicious process name in LocationD entry {result['package']}", "", result, @@ -99,7 +98,6 @@ class LocationdClients(IOSExtraction): if ioc_match: result["matched_indicator"] = ioc_match.ioc self.alertstore.high( - self.get_slug(), f"Found a suspicious process name in LocationD entry {result['package']}", "", result, @@ -111,8 +109,7 @@ class LocationdClients(IOSExtraction): if ioc_match: result["matched_indicator"] = ioc_match.ioc self.alertstore.high( - self.get_slug(), - f"Found a suspicious file path in LocationD entry {result['BundlePath']}", + f"Found a known malicious domain in LocationD entry {result['package']}", "", result, ) @@ -124,7 +121,6 @@ class LocationdClients(IOSExtraction): if ioc_match: result["matched_indicator"] = ioc_match.ioc self.alertstore.high( - self.get_slug(), f"Found a suspicious file path in LocationD entry {result['Executable']}", "", result, @@ -141,7 +137,6 @@ class LocationdClients(IOSExtraction): if ioc_match: result["matched_indicator"] = ioc_match.ioc self.alertstore.high( - self.get_slug(), f"Found a suspicious file path in LocationD entry {result['Registered']}", "", result, diff --git a/src/mvt/ios/modules/mixed/osanalytics_addaily.py b/src/mvt/ios/modules/mixed/osanalytics_addaily.py index 1238d3c..7191975 100644 --- a/src/mvt/ios/modules/mixed/osanalytics_addaily.py +++ b/src/mvt/ios/modules/mixed/osanalytics_addaily.py @@ -7,12 +7,12 @@ import logging import plistlib from typing import Optional -from mvt.common.utils import convert_datetime_to_iso from mvt.common.module_types import ( - ModuleResults, ModuleAtomicResult, + ModuleResults, ModuleSerializedResult, ) +from mvt.common.utils import convert_datetime_to_iso from ..base import IOSExtraction @@ -65,7 +65,9 @@ class OSAnalyticsADDaily(IOSExtraction): ioc_match = self.indicators.check_process(result["package"]) if ioc_match: result["matched_indicator"] = ioc_match.ioc - self.alertstore.critical(self.get_slug(), ioc_match.message, "", result) + self.alertstore.critical( + ioc_match.message, "", result, matched_indicator=ioc_match.ioc + ) def run(self) -> None: self._find_ios_database( @@ -76,6 +78,8 @@ class OSAnalyticsADDaily(IOSExtraction): "Found com.apple.osanalytics.addaily plist at path: %s", self.file_path ) + if not self.file_path: + return with open(self.file_path, "rb") as handle: file_plist = plistlib.load(handle) diff --git a/src/mvt/ios/modules/mixed/safari_browserstate.py b/src/mvt/ios/modules/mixed/safari_browserstate.py index cbeeb24..021f690 100644 --- a/src/mvt/ios/modules/mixed/safari_browserstate.py +++ b/src/mvt/ios/modules/mixed/safari_browserstate.py @@ -10,12 +10,12 @@ import plistlib import sqlite3 from typing import Optional -from mvt.common.utils import convert_mactime_to_iso, keys_bytes_to_string from mvt.common.module_types import ( + ModuleAtomicResult, ModuleResults, ModuleSerializedResult, - ModuleAtomicResult, ) +from mvt.common.utils import convert_mactime_to_iso, keys_bytes_to_string from ..base import IOSExtraction @@ -67,7 +67,7 @@ class SafariBrowserState(IOSExtraction): if ioc_match: result["matched_indicator"] = ioc_match.ioc self.alertstore.critical( - self.get_slug(), ioc_match.message, "", result + ioc_match.message, "", result, matched_indicator=ioc_match.ioc ) continue @@ -80,7 +80,10 @@ class SafariBrowserState(IOSExtraction): if ioc_match: result["matched_indicator"] = ioc_match.ioc self.alertstore.critical( - self.get_slug(), ioc_match.message, "", result + ioc_match.message, + "", + result, + matched_indicator=ioc_match.ioc, ) def _process_browser_state_db(self, db_path): diff --git a/src/mvt/ios/modules/mixed/safari_history.py b/src/mvt/ios/modules/mixed/safari_history.py index 213cab6..8b5dd8c 100644 --- a/src/mvt/ios/modules/mixed/safari_history.py +++ b/src/mvt/ios/modules/mixed/safari_history.py @@ -7,13 +7,13 @@ import logging import os from typing import Optional -from mvt.common.url import URL -from mvt.common.utils import convert_mactime_to_datetime, convert_mactime_to_iso from mvt.common.module_types import ( - ModuleResults, ModuleAtomicResult, + ModuleResults, ModuleSerializedResult, ) +from mvt.common.url import URL +from mvt.common.utils import convert_mactime_to_datetime, convert_mactime_to_iso from ..base import IOSExtraction @@ -117,7 +117,9 @@ class SafariHistory(IOSExtraction): ioc_match = self.indicators.check_url(result["url"]) if ioc_match: result["matched_indicator"] = ioc_match.ioc - self.alertstore.critical(self.get_slug(), ioc_match.message, "", result) + self.alertstore.critical( + ioc_match.message, "", result, matched_indicator=ioc_match.ioc + ) def _process_history_db(self, history_path): self._recover_sqlite_db_if_needed(history_path) diff --git a/src/mvt/ios/modules/mixed/shortcuts.py b/src/mvt/ios/modules/mixed/shortcuts.py index 38735a0..762769f 100644 --- a/src/mvt/ios/modules/mixed/shortcuts.py +++ b/src/mvt/ios/modules/mixed/shortcuts.py @@ -10,12 +10,12 @@ import plistlib import sqlite3 from typing import Optional -from mvt.common.utils import check_for_links, convert_mactime_to_iso from mvt.common.module_types import ( ModuleAtomicResult, ModuleResults, ModuleSerializedResult, ) +from mvt.common.utils import check_for_links, convert_mactime_to_iso from ..base import IOSExtraction @@ -80,7 +80,9 @@ class Shortcuts(IOSExtraction): ioc_match = self.indicators.check_urls(result["action_urls"]) if ioc_match: result["matched_indicator"] = ioc_match.ioc - self.alertstore.critical(self.get_slug(), ioc_match.message, "", result) + self.alertstore.critical( + ioc_match.message, "", result, matched_indicator=ioc_match.ioc + ) def run(self) -> None: self._find_ios_database( @@ -88,6 +90,8 @@ class Shortcuts(IOSExtraction): ) self.log.info("Found Shortcuts database at path: %s", self.file_path) + if not self.file_path: + return conn = self._open_sqlite_db(self.file_path) conn.text_factory = bytes cur = conn.cursor() diff --git a/src/mvt/ios/modules/mixed/sms.py b/src/mvt/ios/modules/mixed/sms.py index 5f64941..5005ff5 100644 --- a/src/mvt/ios/modules/mixed/sms.py +++ b/src/mvt/ios/modules/mixed/sms.py @@ -8,12 +8,12 @@ import sqlite3 from base64 import b64encode from typing import Optional -from mvt.common.utils import check_for_links, convert_mactime_to_iso from mvt.common.module_types import ( ModuleAtomicResult, ModuleResults, ModuleSerializedResult, ) +from mvt.common.utils import check_for_links, convert_mactime_to_iso from ..base import IOSExtraction @@ -95,12 +95,17 @@ class SMS(IOSExtraction): ioc_match = self.indicators.check_urls(message_links) if ioc_match: result["matched_indicator"] = ioc_match.ioc - self.alertstore.critical(self.get_slug(), ioc_match.message, "", result) + self.alertstore.critical( + ioc_match.message, "", result, matched_indicator=ioc_match.ioc + ) def run(self) -> None: self._find_ios_database(backup_ids=SMS_BACKUP_IDS, root_paths=SMS_ROOT_PATHS) self.log.info("Found SMS database at path: %s", self.file_path) + if not self.file_path: + return + try: conn = self._open_sqlite_db(self.file_path) cur = conn.cursor() @@ -118,6 +123,7 @@ class SMS(IOSExtraction): except sqlite3.DatabaseError as exc: conn.close() if "database disk image is malformed" in str(exc): + assert self.file_path is not None self._recover_sqlite_db_if_needed(self.file_path, forced=True) conn = self._open_sqlite_db(self.file_path) cur = conn.cursor() diff --git a/src/mvt/ios/modules/mixed/sms_attachments.py b/src/mvt/ios/modules/mixed/sms_attachments.py index 0b9a6e3..53943a8 100644 --- a/src/mvt/ios/modules/mixed/sms_attachments.py +++ b/src/mvt/ios/modules/mixed/sms_attachments.py @@ -7,12 +7,12 @@ import logging from base64 import b64encode from typing import Optional -from mvt.common.utils import convert_mactime_to_iso from mvt.common.module_types import ( ModuleAtomicResult, - ModuleSerializedResult, ModuleResults, + ModuleSerializedResult, ) +from mvt.common.utils import convert_mactime_to_iso from ..base import IOSExtraction @@ -65,9 +65,7 @@ class SMSAttachments(IOSExtraction): ioc_match = self.indicators.check_file_path(attachment["filename"]) if ioc_match: attachment["matched_indicator"] = ioc_match.ioc - self.alertstore.high( - self.get_slug(), ioc_match.message, "", attachment - ) + self.alertstore.high(ioc_match.message, "", attachment) if ( attachment["filename"].startswith("/var/tmp/") @@ -85,6 +83,8 @@ class SMSAttachments(IOSExtraction): self._find_ios_database(backup_ids=SMS_BACKUP_IDS, root_paths=SMS_ROOT_PATHS) self.log.info("Found SMS database at path: %s", self.file_path) + if not self.file_path: + return conn = self._open_sqlite_db(self.file_path) cur = conn.cursor() cur.execute( diff --git a/src/mvt/ios/modules/mixed/webkit_resource_load_statistics.py b/src/mvt/ios/modules/mixed/webkit_resource_load_statistics.py index 4de6df6..cae6a03 100644 --- a/src/mvt/ios/modules/mixed/webkit_resource_load_statistics.py +++ b/src/mvt/ios/modules/mixed/webkit_resource_load_statistics.py @@ -8,12 +8,12 @@ import os import sqlite3 from typing import Optional -from mvt.common.utils import convert_unix_to_iso from mvt.common.module_types import ( ModuleAtomicResult, - ModuleSerializedResult, ModuleResults, + ModuleSerializedResult, ) +from mvt.common.utils import convert_unix_to_iso from ..base import IOSExtraction @@ -69,7 +69,9 @@ class WebkitResourceLoadStatistics(IOSExtraction): ioc_match = self.indicators.check_url(result["registrable_domain"]) if ioc_match: result["matched_indicator"] = ioc_match.ioc - self.alertstore.critical(self.get_slug(), ioc_match.message, "", result) + self.alertstore.critical( + ioc_match.message, "", result, matched_indicator=ioc_match.ioc + ) self.alertstore.log_latest() def _process_observations_db(self, db_path: str, domain: str, path: str) -> None: diff --git a/src/mvt/ios/modules/mixed/webkit_session_resource_log.py b/src/mvt/ios/modules/mixed/webkit_session_resource_log.py index 8c88c66..27d320a 100644 --- a/src/mvt/ios/modules/mixed/webkit_session_resource_log.py +++ b/src/mvt/ios/modules/mixed/webkit_session_resource_log.py @@ -8,8 +8,8 @@ import os import plistlib from typing import Optional -from mvt.common.utils import convert_datetime_to_iso from mvt.common.module_types import ModuleResults +from mvt.common.utils import convert_datetime_to_iso from ..base import IOSExtraction @@ -50,7 +50,7 @@ class WebkitSessionResourceLog(IOSExtraction): results=results, ) - self.results = {} if not results else results + self.results: dict = {} @staticmethod def _extract_domains(entries): @@ -83,15 +83,15 @@ class WebkitSessionResourceLog(IOSExtraction): # subresource_domains = self._extract_domains( # entry["subresource_under_origin"]) - all_origins = set( - [entry["origin"]] + source_domains + destination_domains + all_origins = list( + set([entry["origin"]] + source_domains + destination_domains) ) ioc_match = self.indicators.check_urls(all_origins) if ioc_match: entry["matched_indicator"] = ioc_match.ioc self.alertstore.critical( - self.get_slug(), ioc_match.message, "", entry + ioc_match.message, "", entry, matched_indicator=ioc_match.ioc ) redirect_path = "" @@ -114,7 +114,6 @@ class WebkitSessionResourceLog(IOSExtraction): redirect_path += ", ".join(destination_domains) self.alertstore.high( - self.get_slug(), f"Found HTTP redirect between suspicious domains: {redirect_path}", "", entry, @@ -190,6 +189,8 @@ class WebkitSessionResourceLog(IOSExtraction): self.log.info( "Found Safari browsing session resource log at path: %s", log_path ) + if not self.target_path: + continue key = os.path.relpath(log_path, self.target_path) self.results[key] = self._extract_browsing_stats(log_path) diff --git a/src/mvt/ios/modules/mixed/whatsapp.py b/src/mvt/ios/modules/mixed/whatsapp.py index 4fdeba3..c1c0408 100644 --- a/src/mvt/ios/modules/mixed/whatsapp.py +++ b/src/mvt/ios/modules/mixed/whatsapp.py @@ -6,12 +6,12 @@ import logging from typing import Optional -from mvt.common.utils import check_for_links, convert_mactime_to_iso from mvt.common.module_types import ( ModuleAtomicResult, ModuleResults, ModuleSerializedResult, ) +from mvt.common.utils import check_for_links, convert_mactime_to_iso from ..base import IOSExtraction @@ -65,7 +65,9 @@ class Whatsapp(IOSExtraction): ioc_match = self.indicators.check_urls(result.get("links", [])) if ioc_match: result["matched_indicator"] = ioc_match.ioc - self.alertstore.critical(self.get_slug(), ioc_match.message, "", result) + self.alertstore.critical( + ioc_match.message, "", result, matched_indicator=ioc_match.ioc + ) def run(self) -> None: self._find_ios_database( @@ -73,6 +75,8 @@ class Whatsapp(IOSExtraction): ) self.log.info("Found WhatsApp database at path: %s", self.file_path) + if not self.file_path: + return conn = self._open_sqlite_db(self.file_path) cur = conn.cursor() @@ -102,7 +106,9 @@ class Whatsapp(IOSExtraction): for index, value in enumerate(message_row): message[names[index]] = value - message["isodate"] = convert_mactime_to_iso(message.get("ZMESSAGEDATE")) + message["isodate"] = convert_mactime_to_iso( + message.get("ZMESSAGEDATE") or 0 + ) message["ZTEXT"] = message["ZTEXT"] if message["ZTEXT"] else "" # Extract links from the WhatsApp message. URLs can be stored in diff --git a/src/mvt/ios/versions.py b/src/mvt/ios/versions.py index bae3bd0..1ce1800 100644 --- a/src/mvt/ios/versions.py +++ b/src/mvt/ios/versions.py @@ -9,8 +9,10 @@ from typing import Dict, Optional import packaging -IPHONE_MODELS = json.loads(pkgutil.get_data("mvt", "ios/data/ios_models.json")) -IPHONE_IOS_VERSIONS = json.loads(pkgutil.get_data("mvt", "ios/data/ios_versions.json")) +IPHONE_MODELS = json.loads(pkgutil.get_data("mvt", "ios/data/ios_models.json") or b"[]") +IPHONE_IOS_VERSIONS = json.loads( + pkgutil.get_data("mvt", "ios/data/ios_versions.json") or b"[]" +) def get_device_desc_from_id(identifier: str, devices_list: list = IPHONE_MODELS) -> str: