diff --git a/README.md b/README.md index 04db0ab..cb651a1 100644 --- a/README.md +++ b/README.md @@ -60,6 +60,11 @@ For alternative installation options and known issues, please refer to the [docu MVT provides two commands `mvt-ios` and `mvt-android`. [Check out the documentation to learn how to use them!](https://docs.mvt.re/) +Module-running `check-*` commands can load custom Python modules with +`--load-module PATH` or from a folder set in `MVT_CUSTOM_MODULES`. See the +[development documentation](https://docs.mvt.re/en/latest/development/) for +details. + ## License diff --git a/docs/development.md b/docs/development.md index 6a15114..93a612f 100644 --- a/docs/development.md +++ b/docs/development.md @@ -39,6 +39,82 @@ Selecting a single module also runs its transitive dependencies. If a dependency is unavailable or the dependency graph contains a cycle, the command logs a warning and does not run any modules. +## Custom modules + +Module-running `check-*` commands can load custom modules from Python files that +are not installed as part of MVT. Load one file with: + +```bash +mvt-ios check-backup --load-module ./example_module.py --output ./out ./backup +``` + +You can also load a folder. MVT loads non-hidden top-level `*.py` files in +sorted order and skips `__init__.py`: + +```bash +mvt-ios check-fs --load-module ./custom_modules ./filesystem-dump +``` + +Set `MVT_CUSTOM_MODULES` to load a folder for every module-running command. This +folder is loaded before any `--load-module` path: + +```bash +MVT_CUSTOM_MODULES=./custom_modules mvt-android check-bugreport ./bugreport.zip +``` + +Custom modules are normal `MVTModule` subclasses: + +```python +from mvt.common.module import MVTModule + + +class ExampleCustomModule(MVTModule): + supported_commands = (("ios", "check-backup"), ("ios", "check-fs")) + slug = "example_custom_module" + + def run(self): + self.results = [{"message": "custom module ran"}] + + def check_indicators(self): + pass + + def serialize(self, result): + return None +``` + +Use `supported_commands` to restrict a module to specific platform/command +pairs. Missing or empty `supported_commands` means the module is available to +all commands, which keeps older modules compatible. Supported pairs are: + +```python +("ios", "check-backup") +("ios", "check-fs") +("ios", "check-iocs") +("android", "check-backup") +("android", "check-bugreport") +("android", "check-androidqf") +("android", "check-intrusion-logs") +("android", "check-iocs") +``` + +Custom modules can depend on existing MVT module classes. Dependencies are +resolved with the same ordering logic as built-in modules, and custom modules +are appended after built-ins before ordering: + +```python +from mvt.common.module import MVTModule +from mvt.ios.modules.backup.manifest import Manifest + + +class DependentCustomModule(MVTModule): + supported_commands = (("ios", "check-backup"),) + dependencies = (Manifest,) + + def run(self): + manifest_results = self.get_dependency_results(Manifest) + self.results = [{"manifest_entries": len(manifest_results)}] +``` + ## Profiling Some MVT modules extract and process significant amounts of data during the analysis process or while checking results against known indicators. Care must be diff --git a/src/mvt/android/cli.py b/src/mvt/android/cli.py index cdb3b3e..364f4ef 100644 --- a/src/mvt/android/cli.py +++ b/src/mvt/android/cli.py @@ -22,6 +22,7 @@ from mvt.common.help import ( HELP_MSG_HASHES, HELP_MSG_IOC, HELP_MSG_LIST_MODULES, + HELP_MSG_LOAD_MODULE, HELP_MSG_MODULE, HELP_MSG_NONINTERACTIVE, HELP_MSG_OUTPUT, @@ -30,6 +31,7 @@ from mvt.common.help import ( HELP_MSG_VERSION, ) from mvt.common.logo import logo +from mvt.common.module_loader import CustomModuleLoadError, load_custom_modules from mvt.common.updates import IndicatorsUpdates from mvt.common.utils import init_logging, set_verbose_logging @@ -59,6 +61,13 @@ def _get_disable_flags(ctx): ) +def _load_custom_modules(load_module): + try: + return load_custom_modules(load_module) + except CustomModuleLoadError as exc: + raise click.ClickException(str(exc)) from exc + + # ============================================================================== # Main # ============================================================================== @@ -119,11 +128,28 @@ def check_adb(ctx): @click.option("--output", "-o", type=click.Path(exists=False), help=HELP_MSG_OUTPUT) @click.option("--list-modules", "-l", is_flag=True, help=HELP_MSG_LIST_MODULES) @click.option("--module", "-m", help=HELP_MSG_MODULE) +@click.option( + "--load-module", + type=click.Path(exists=True), + multiple=True, + default=[], + help=HELP_MSG_LOAD_MODULE, +) @click.option("--verbose", "-v", is_flag=True, help=HELP_MSG_VERBOSE) @click.argument("BUGREPORT_PATH", type=click.Path(exists=True)) @click.pass_context -def check_bugreport(ctx, iocs, output, list_modules, module, verbose, bugreport_path): +def check_bugreport( + ctx, + iocs, + output, + list_modules, + module, + load_module, + verbose, + bugreport_path, +): set_verbose_logging(verbose) + custom_modules = _load_custom_modules(load_module) # Always generate hashes as bug reports are small. cmd = CmdAndroidCheckBugreport( target_path=bugreport_path, @@ -133,6 +159,7 @@ def check_bugreport(ctx, iocs, output, list_modules, module, verbose, bugreport_ hashes=True, disable_version_check=_get_disable_flags(ctx)[0], disable_indicator_check=_get_disable_flags(ctx)[1], + custom_modules=custom_modules, ) if list_modules: @@ -164,6 +191,13 @@ def check_bugreport(ctx, iocs, output, list_modules, module, verbose, bugreport_ ) @click.option("--output", "-o", type=click.Path(exists=False), help=HELP_MSG_OUTPUT) @click.option("--list-modules", "-l", is_flag=True, help=HELP_MSG_LIST_MODULES) +@click.option( + "--load-module", + type=click.Path(exists=True), + multiple=True, + default=[], + help=HELP_MSG_LOAD_MODULE, +) @click.option("--non-interactive", "-n", is_flag=True, help=HELP_MSG_NONINTERACTIVE) @click.option("--backup-password", "-p", help=HELP_MSG_ANDROID_BACKUP_PASSWORD) @click.option("--verbose", "-v", is_flag=True, help=HELP_MSG_VERBOSE) @@ -174,12 +208,14 @@ def check_backup( iocs, output, list_modules, + load_module, non_interactive, backup_password, verbose, backup_path, ): set_verbose_logging(verbose) + custom_modules = _load_custom_modules(load_module) # Always generate hashes as backups are generally small. cmd = CmdAndroidCheckBackup( @@ -193,6 +229,7 @@ def check_backup( }, disable_version_check=_get_disable_flags(ctx)[0], disable_indicator_check=_get_disable_flags(ctx)[1], + custom_modules=custom_modules, ) if list_modules: @@ -223,6 +260,13 @@ def check_backup( @click.option("--output", "-o", type=click.Path(exists=False), help=HELP_MSG_OUTPUT) @click.option("--list-modules", "-l", is_flag=True, help=HELP_MSG_LIST_MODULES) @click.option("--module", "-m", help=HELP_MSG_MODULE) +@click.option( + "--load-module", + type=click.Path(exists=True), + multiple=True, + default=[], + help=HELP_MSG_LOAD_MODULE, +) @click.option("--hashes", "-H", is_flag=True, help=HELP_MSG_HASHES) @click.option("--non-interactive", "-n", is_flag=True, help=HELP_MSG_NONINTERACTIVE) @click.option("--backup-password", "-p", help=HELP_MSG_ANDROID_BACKUP_PASSWORD) @@ -235,6 +279,7 @@ def check_androidqf( output, list_modules, module, + load_module, hashes, non_interactive, backup_password, @@ -242,6 +287,7 @@ def check_androidqf( androidqf_path, ): set_verbose_logging(verbose) + custom_modules = _load_custom_modules(load_module) cmd = CmdAndroidCheckAndroidQF( target_path=androidqf_path, @@ -255,6 +301,7 @@ def check_androidqf( }, disable_version_check=_get_disable_flags(ctx)[0], disable_indicator_check=_get_disable_flags(ctx)[1], + custom_modules=custom_modules, ) if list_modules: @@ -288,6 +335,13 @@ def check_androidqf( @click.option("--output", "-o", type=click.Path(exists=False), help=HELP_MSG_OUTPUT) @click.option("--list-modules", "-l", is_flag=True, help=HELP_MSG_LIST_MODULES) @click.option("--module", "-m", help=HELP_MSG_MODULE) +@click.option( + "--load-module", + type=click.Path(exists=True), + multiple=True, + default=[], + help=HELP_MSG_LOAD_MODULE, +) @click.option( "--timezone", "-t", @@ -307,11 +361,13 @@ def check_intrusion_logs( output, list_modules, module, + load_module, timezone, verbose, logs_path, ): set_verbose_logging(verbose) + custom_modules = _load_custom_modules(load_module) module_options = {} if timezone: @@ -325,6 +381,7 @@ def check_intrusion_logs( module_options=module_options if module_options else None, disable_version_check=_get_disable_flags(ctx)[0], disable_indicator_check=_get_disable_flags(ctx)[1], + custom_modules=custom_modules, ) if list_modules: @@ -352,15 +409,25 @@ def check_intrusion_logs( ) @click.option("--list-modules", "-l", is_flag=True, help=HELP_MSG_LIST_MODULES) @click.option("--module", "-m", help=HELP_MSG_MODULE) +@click.option( + "--load-module", + type=click.Path(exists=True), + multiple=True, + default=[], + help=HELP_MSG_LOAD_MODULE, +) @click.argument("FOLDER", type=click.Path(exists=True)) @click.pass_context -def check_iocs(ctx, iocs, list_modules, module, folder): +def check_iocs(ctx, iocs, list_modules, module, load_module, folder): + custom_modules = _load_custom_modules(load_module) cmd = CmdCheckIOCS( target_path=folder, ioc_files=iocs, module_name=module, disable_version_check=_get_disable_flags(ctx)[0], disable_indicator_check=_get_disable_flags(ctx)[1], + custom_modules=custom_modules, + platform="android", ) cmd.modules = ( BACKUP_MODULES + BUGREPORT_MODULES + ANDROIDQF_MODULES + INTRUSION_LOGS_MODULES diff --git a/src/mvt/android/cmd_check_androidqf.py b/src/mvt/android/cmd_check_androidqf.py index 04c981b..41ad802 100644 --- a/src/mvt/android/cmd_check_androidqf.py +++ b/src/mvt/android/cmd_check_androidqf.py @@ -17,6 +17,7 @@ from mvt.android.cmd_check_backup import CmdAndroidCheckBackup, InvalidAndroidBa from mvt.android.cmd_check_bugreport import CmdAndroidCheckBugreport from mvt.common.command import Command from mvt.common.indicators import Indicators +from mvt.common.module import MVTModule from .modules.androidqf import ANDROIDQF_MODULES from .modules.androidqf.base import AndroidQFModule @@ -50,6 +51,7 @@ class CmdAndroidCheckAndroidQF(Command): sub_command: Optional[bool] = False, disable_version_check: bool = False, disable_indicator_check: bool = False, + custom_modules: Optional[list[type[MVTModule]]] = None, ) -> None: super().__init__( target_path=target_path, @@ -64,8 +66,10 @@ class CmdAndroidCheckAndroidQF(Command): log=log, disable_version_check=disable_version_check, disable_indicator_check=disable_indicator_check, + custom_modules=custom_modules, ) + self.platform = "android" self.name = "check-androidqf" self.modules = ANDROIDQF_MODULES @@ -210,6 +214,7 @@ class CmdAndroidCheckAndroidQF(Command): module_options=self.module_options, hashes=self.hashes, sub_command=True, + custom_modules=self.custom_modules, ) cmd.from_zip(bugreport) cmd.run() @@ -239,6 +244,7 @@ class CmdAndroidCheckAndroidQF(Command): module_options=self.module_options, hashes=self.hashes, sub_command=True, + custom_modules=self.custom_modules, ) try: cmd.from_ab(backup) @@ -318,6 +324,7 @@ class CmdAndroidCheckAndroidQF(Command): module_options=adv_module_options, hashes=self.hashes, sub_command=True, + custom_modules=self.custom_modules, ) cmd.run() diff --git a/src/mvt/android/cmd_check_backup.py b/src/mvt/android/cmd_check_backup.py index 3f91f07..b75bb34 100644 --- a/src/mvt/android/cmd_check_backup.py +++ b/src/mvt/android/cmd_check_backup.py @@ -21,6 +21,7 @@ from mvt.android.parsers.backup import ( ) from mvt.common.command import Command from mvt.common.indicators import Indicators +from mvt.common.module import MVTModule from .modules.backup import BACKUP_MODULES @@ -45,6 +46,7 @@ class CmdAndroidCheckBackup(Command): sub_command: Optional[bool] = False, disable_version_check: bool = False, disable_indicator_check: bool = False, + custom_modules: Optional[list[type[MVTModule]]] = None, ) -> None: super().__init__( target_path=target_path, @@ -59,8 +61,10 @@ class CmdAndroidCheckBackup(Command): log=log, disable_version_check=disable_version_check, disable_indicator_check=disable_indicator_check, + custom_modules=custom_modules, ) + self.platform = "android" self.name = "check-backup" self.modules = BACKUP_MODULES diff --git a/src/mvt/android/cmd_check_bugreport.py b/src/mvt/android/cmd_check_bugreport.py index b6f1367..1c03c6d 100644 --- a/src/mvt/android/cmd_check_bugreport.py +++ b/src/mvt/android/cmd_check_bugreport.py @@ -12,6 +12,7 @@ from zipfile import ZipFile from mvt.android.modules.bugreport.base import BugReportModule from mvt.common.command import Command from mvt.common.indicators import Indicators +from mvt.common.module import MVTModule from .modules.bugreport import BUGREPORT_MODULES @@ -32,6 +33,7 @@ class CmdAndroidCheckBugreport(Command): sub_command: Optional[bool] = False, disable_version_check: bool = False, disable_indicator_check: bool = False, + custom_modules: Optional[list[type[MVTModule]]] = None, ) -> None: super().__init__( target_path=target_path, @@ -46,8 +48,10 @@ class CmdAndroidCheckBugreport(Command): log=log, disable_version_check=disable_version_check, disable_indicator_check=disable_indicator_check, + custom_modules=custom_modules, ) + self.platform = "android" self.name = "check-bugreport" self.modules = BUGREPORT_MODULES diff --git a/src/mvt/android/cmd_check_intrusion_logs.py b/src/mvt/android/cmd_check_intrusion_logs.py index 8541f9a..95f2089 100644 --- a/src/mvt/android/cmd_check_intrusion_logs.py +++ b/src/mvt/android/cmd_check_intrusion_logs.py @@ -9,6 +9,7 @@ from typing import Optional from mvt.common.command import Command from mvt.common.indicators import Indicators +from mvt.common.module import MVTModule from .modules.intrusion_logs import ( INTRUSION_LOGS_MODULES, @@ -35,6 +36,7 @@ class CmdAndroidCheckIntrusionLogs(Command): sub_command: Optional[bool] = False, disable_version_check: bool = False, disable_indicator_check: bool = False, + custom_modules: Optional[list[type[MVTModule]]] = None, ) -> None: super().__init__( target_path=target_path, @@ -49,8 +51,10 @@ class CmdAndroidCheckIntrusionLogs(Command): log=log, disable_version_check=disable_version_check, disable_indicator_check=disable_indicator_check, + custom_modules=custom_modules, ) + self.platform = "android" self.name = "check-intrusion-logs" self.modules = INTRUSION_LOGS_MODULES self._all_events: dict[str, list[dict]] = {} diff --git a/src/mvt/common/cmd_check_iocs.py b/src/mvt/common/cmd_check_iocs.py index 35700ea..c227659 100644 --- a/src/mvt/common/cmd_check_iocs.py +++ b/src/mvt/common/cmd_check_iocs.py @@ -8,6 +8,7 @@ import os from typing import Optional from mvt.common.command import Command +from mvt.common.module import MVTModule from mvt.common.utils import exec_or_profile log = logging.getLogger(__name__) @@ -26,6 +27,8 @@ class CmdCheckIOCS(Command): sub_command: Optional[bool] = False, disable_version_check: bool = False, disable_indicator_check: bool = False, + custom_modules: Optional[list[type[MVTModule]]] = None, + platform: str = "", ) -> None: super().__init__( target_path=target_path, @@ -39,14 +42,16 @@ class CmdCheckIOCS(Command): log=log, disable_version_check=disable_version_check, disable_indicator_check=disable_indicator_check, + custom_modules=custom_modules, ) + self.platform = platform self.name = "check-iocs" def run(self) -> None: assert self.target_path is not None all_modules = [] - for entry in self.modules: + for entry in self._available_modules(): if entry not in all_modules: all_modules.append(entry) diff --git a/src/mvt/common/command.py b/src/mvt/common/command.py index 35daaeb..8d21aa3 100644 --- a/src/mvt/common/command.py +++ b/src/mvt/common/command.py @@ -19,6 +19,7 @@ from .alerts import AlertLevel, AlertStore from .config import settings from .indicators import Indicators from .module import EncryptedBackupError, MVTModule, run_module, save_timeline +from .module_loader import module_supports_command from .module_types import ModuleTimeline from .utils import ( CustomJSONEncoder, @@ -44,9 +45,12 @@ class Command: log: logging.Logger = logging.getLogger(__name__), disable_version_check: bool = False, disable_indicator_check: bool = False, + custom_modules: Optional[list[type[MVTModule]]] = None, ) -> None: self.name = "" + self.platform = "" self.modules: list[type[MVTModule]] = [] + self.custom_modules = custom_modules if custom_modules else [] self.target_path = target_path self.results_path = results_path @@ -199,9 +203,24 @@ class Command: def list_modules(self) -> None: self.log.info("Following is the list of available %s modules:", self.name) - for module in self.modules: + for module in self._available_modules(): self.log.info(" - %s", module.__name__) + def _available_modules(self) -> list[type[MVTModule]]: + modules = list(self.modules) + modules.extend( + module + for module in self.custom_modules + if module_supports_command(module, self.platform, self.name) + ) + + deduplicated = [] + for module in modules: + if module not in deduplicated: + deduplicated.append(module) + + return deduplicated + def init(self) -> None: raise NotImplementedError @@ -262,14 +281,15 @@ class Command: def _ordered_modules(self) -> Optional[list[type[MVTModule]]]: """Return enabled modules in stable topological order.""" - module_indexes = {module: index for index, module in enumerate(self.modules)} + modules = self._available_modules() + module_indexes = {module: index for index, module in enumerate(modules)} if self.module_name: selected = [ - module for module in self.modules if module.__name__ == self.module_name + module for module in modules if module.__name__ == self.module_name ] else: - selected = [module for module in self.modules if module.enabled] + selected = [module for module in modules if module.enabled] required = set(selected) pending = list(selected) diff --git a/src/mvt/common/help.py b/src/mvt/common/help.py index 535a059..4cba131 100644 --- a/src/mvt/common/help.py +++ b/src/mvt/common/help.py @@ -10,6 +10,10 @@ HELP_MSG_IOC = "Path to indicators file (can be invoked multiple time)" HELP_MSG_FAST = "Avoid running time/resource consuming features" HELP_MSG_LIST_MODULES = "Print list of available modules and exit" HELP_MSG_MODULE = "Name of a single module you would like to run instead of all" +HELP_MSG_LOAD_MODULE = ( + "Load custom MVT module(s) from a Python file or folder " + "(can be invoked multiple times)" +) HELP_MSG_NONINTERACTIVE = "Don't ask interactive questions during processing" HELP_MSG_HASHES = "Generate hashes of all the files analyzed" HELP_MSG_VERBOSE = "Verbose mode" diff --git a/src/mvt/common/module.py b/src/mvt/common/module.py index 6e231be..14c5fe5 100644 --- a/src/mvt/common/module.py +++ b/src/mvt/common/module.py @@ -44,6 +44,7 @@ class MVTModule: enabled: bool = True slug: Optional[str] = None dependencies: Sequence[type["MVTModule"]] = () + supported_commands: Sequence[tuple[str, str]] = () def __init__( self, diff --git a/src/mvt/common/module_loader.py b/src/mvt/common/module_loader.py new file mode 100644 index 0000000..890ed0f --- /dev/null +++ b/src/mvt/common/module_loader.py @@ -0,0 +1,127 @@ +# Mobile Verification Toolkit (MVT) +# Copyright (c) 2021-2026 The MVT Authors. +# Use of this software is governed by the MVT License 1.1 that can be found at +# https://license.mvt.re/1.1/ + +import hashlib +import importlib.util +import inspect +import os +import sys +from pathlib import Path +from types import ModuleType +from typing import Iterable, Optional + +from .module import MVTModule + +MVT_CUSTOM_MODULES_ENV = "MVT_CUSTOM_MODULES" + + +class CustomModuleLoadError(Exception): + pass + + +def _module_name_for_path(path: Path) -> str: + digest = hashlib.sha256(str(path).encode("utf-8")).hexdigest()[:16] + return f"_mvt_custom_module_{path.stem}_{digest}" + + +def _iter_module_files(path: Path) -> Iterable[Path]: + if path.is_file(): + if path.suffix != ".py": + raise CustomModuleLoadError(f"Custom module file is not a Python file: {path}") + yield path + return + + if path.is_dir(): + for child in sorted(path.iterdir()): + if child.name.startswith("."): + continue + if child.name == "__init__.py": + continue + if child.is_file() and child.suffix == ".py": + yield child + return + + raise CustomModuleLoadError(f"Custom module path does not exist: {path}") + + +def _load_python_file(path: Path) -> ModuleType: + module_name = _module_name_for_path(path) + spec = importlib.util.spec_from_file_location(module_name, path) + if spec is None or spec.loader is None: + raise CustomModuleLoadError(f"Unable to load custom module file: {path}") + + module = importlib.util.module_from_spec(spec) + sys.modules[module_name] = module + try: + spec.loader.exec_module(module) + except Exception as exc: + raise CustomModuleLoadError(f"Unable to import custom module {path}: {exc}") from exc + + return module + + +def discover_mvt_modules(module: ModuleType) -> list[type[MVTModule]]: + modules = [] + for _, obj in inspect.getmembers(module, inspect.isclass): + if obj is MVTModule: + continue + if obj.__module__ != module.__name__: + continue + if not issubclass(obj, MVTModule): + continue + modules.append(obj) + + return modules + + +def load_custom_modules_from_path(path: str) -> list[type[MVTModule]]: + custom_modules: list[type[MVTModule]] = [] + seen: set[tuple[str, str]] = set() + resolved_path = Path(path).expanduser().resolve() + + for module_file in _iter_module_files(resolved_path): + loaded_module = _load_python_file(module_file) + for module_class in discover_mvt_modules(loaded_module): + key = (str(module_file), module_class.__qualname__) + if key in seen: + continue + seen.add(key) + custom_modules.append(module_class) + + return custom_modules + + +def load_custom_modules(paths: Optional[Iterable[str]] = None) -> list[type[MVTModule]]: + search_paths: list[str] = [] + env_path = os.environ.get(MVT_CUSTOM_MODULES_ENV) + if env_path: + search_paths.append(env_path) + if paths: + search_paths.extend(paths) + + custom_modules: list[type[MVTModule]] = [] + seen: set[tuple[str, str]] = set() + for path in search_paths: + for module_class in load_custom_modules_from_path(path): + source = Path(inspect.getfile(module_class)).resolve() + key = (str(source), module_class.__qualname__) + if key in seen: + continue + seen.add(key) + custom_modules.append(module_class) + + return custom_modules + + +def module_supports_command( + module_class: type[MVTModule], + platform: str, + command: str, +) -> bool: + supported_commands = getattr(module_class, "supported_commands", None) + if not supported_commands: + return True + + return (platform, command) in {tuple(entry) for entry in supported_commands} diff --git a/src/mvt/ios/cli.py b/src/mvt/ios/cli.py index 446a0e6..d9f634c 100644 --- a/src/mvt/ios/cli.py +++ b/src/mvt/ios/cli.py @@ -31,6 +31,7 @@ from mvt.common.help import ( HELP_MSG_OUTPUT, HELP_MSG_FAST, HELP_MSG_LIST_MODULES, + HELP_MSG_LOAD_MODULE, HELP_MSG_MODULE, HELP_MSG_VERBOSE, HELP_MSG_CHECK_FS, @@ -40,6 +41,7 @@ from mvt.common.help import ( HELP_MSG_DISABLE_UPDATE_CHECK, HELP_MSG_DISABLE_INDICATOR_UPDATE_CHECK, ) +from mvt.common.module_loader import CustomModuleLoadError, load_custom_modules from .cmd_check_backup import CmdIOSCheckBackup from .cmd_check_fs import CmdIOSCheckFS from .decrypt import DecryptBackup @@ -65,6 +67,13 @@ def _get_disable_flags(ctx): ) +def _load_custom_modules(load_module): + try: + return load_custom_modules(load_module) + except CustomModuleLoadError as exc: + raise click.ClickException(str(exc)) from exc + + # ============================================================================== # Main # ============================================================================== @@ -229,15 +238,32 @@ def extract_key(password, key_file, backup_path): @click.option("--fast", "-f", is_flag=True, help=HELP_MSG_FAST) @click.option("--list-modules", "-l", is_flag=True, help=HELP_MSG_LIST_MODULES) @click.option("--module", "-m", help=HELP_MSG_MODULE) +@click.option( + "--load-module", + type=click.Path(exists=True), + multiple=True, + default=[], + help=HELP_MSG_LOAD_MODULE, +) @click.option("--hashes", "-H", is_flag=True, help=HELP_MSG_HASHES) @click.option("--verbose", "-v", is_flag=True, help=HELP_MSG_VERBOSE) @click.argument("BACKUP_PATH", type=click.Path(exists=True)) @click.pass_context def check_backup( - ctx, iocs, output, fast, list_modules, module, hashes, verbose, backup_path + ctx, + iocs, + output, + fast, + list_modules, + module, + load_module, + hashes, + verbose, + backup_path, ): set_verbose_logging(verbose) module_options = {"fast_mode": fast} + custom_modules = _load_custom_modules(load_module) cmd = CmdIOSCheckBackup( target_path=backup_path, @@ -248,6 +274,7 @@ def check_backup( hashes=hashes, disable_version_check=_get_disable_flags(ctx)[0], disable_indicator_check=_get_disable_flags(ctx)[1], + custom_modules=custom_modules, ) if list_modules: @@ -277,13 +304,32 @@ def check_backup( @click.option("--fast", "-f", is_flag=True, help=HELP_MSG_FAST) @click.option("--list-modules", "-l", is_flag=True, help=HELP_MSG_LIST_MODULES) @click.option("--module", "-m", help=HELP_MSG_MODULE) +@click.option( + "--load-module", + type=click.Path(exists=True), + multiple=True, + default=[], + help=HELP_MSG_LOAD_MODULE, +) @click.option("--hashes", "-H", is_flag=True, help=HELP_MSG_HASHES) @click.option("--verbose", "-v", is_flag=True, help=HELP_MSG_VERBOSE) @click.argument("DUMP_PATH", type=click.Path(exists=True)) @click.pass_context -def check_fs(ctx, iocs, output, fast, list_modules, module, hashes, verbose, dump_path): +def check_fs( + ctx, + iocs, + output, + fast, + list_modules, + module, + load_module, + hashes, + verbose, + dump_path, +): set_verbose_logging(verbose) module_options = {"fast_mode": fast} + custom_modules = _load_custom_modules(load_module) cmd = CmdIOSCheckFS( target_path=dump_path, @@ -294,6 +340,7 @@ def check_fs(ctx, iocs, output, fast, list_modules, module, hashes, verbose, dum hashes=hashes, disable_version_check=_get_disable_flags(ctx)[0], disable_indicator_check=_get_disable_flags(ctx)[1], + custom_modules=custom_modules, ) if list_modules: @@ -321,15 +368,25 @@ def check_fs(ctx, iocs, output, fast, list_modules, module, hashes, verbose, dum ) @click.option("--list-modules", "-l", is_flag=True, help=HELP_MSG_LIST_MODULES) @click.option("--module", "-m", help=HELP_MSG_MODULE) +@click.option( + "--load-module", + type=click.Path(exists=True), + multiple=True, + default=[], + help=HELP_MSG_LOAD_MODULE, +) @click.argument("FOLDER", type=click.Path(exists=True)) @click.pass_context -def check_iocs(ctx, iocs, list_modules, module, folder): +def check_iocs(ctx, iocs, list_modules, module, load_module, folder): + custom_modules = _load_custom_modules(load_module) cmd = CmdCheckIOCS( target_path=folder, ioc_files=iocs, module_name=module, disable_version_check=_get_disable_flags(ctx)[0], disable_indicator_check=_get_disable_flags(ctx)[1], + custom_modules=custom_modules, + platform="ios", ) cmd.modules = BACKUP_MODULES + FS_MODULES + MIXED_MODULES diff --git a/src/mvt/ios/cmd_check_backup.py b/src/mvt/ios/cmd_check_backup.py index 9200964..4264a7f 100644 --- a/src/mvt/ios/cmd_check_backup.py +++ b/src/mvt/ios/cmd_check_backup.py @@ -8,6 +8,7 @@ from typing import Optional from mvt.common.command import Command from mvt.common.indicators import Indicators +from mvt.common.module import MVTModule from .modules.backup import BACKUP_MODULES from .modules.mixed import MIXED_MODULES @@ -29,6 +30,7 @@ class CmdIOSCheckBackup(Command): sub_command: bool = False, disable_version_check: bool = False, disable_indicator_check: bool = False, + custom_modules: Optional[list[type[MVTModule]]] = None, ) -> None: super().__init__( target_path=target_path, @@ -43,8 +45,10 @@ class CmdIOSCheckBackup(Command): log=log, disable_version_check=disable_version_check, disable_indicator_check=disable_indicator_check, + custom_modules=custom_modules, ) + self.platform = "ios" self.name = "check-backup" self.modules = BACKUP_MODULES + MIXED_MODULES diff --git a/src/mvt/ios/cmd_check_fs.py b/src/mvt/ios/cmd_check_fs.py index 78325ba..e76146e 100644 --- a/src/mvt/ios/cmd_check_fs.py +++ b/src/mvt/ios/cmd_check_fs.py @@ -8,6 +8,7 @@ from typing import Optional from mvt.common.command import Command from mvt.common.indicators import Indicators +from mvt.common.module import MVTModule from .modules.fs import FS_MODULES from .modules.mixed import MIXED_MODULES @@ -29,6 +30,7 @@ class CmdIOSCheckFS(Command): sub_command: bool = False, disable_version_check: bool = False, disable_indicator_check: bool = False, + custom_modules: Optional[list[type[MVTModule]]] = None, ) -> None: super().__init__( target_path=target_path, @@ -42,8 +44,10 @@ class CmdIOSCheckFS(Command): log=log, disable_version_check=disable_version_check, disable_indicator_check=disable_indicator_check, + custom_modules=custom_modules, ) + self.platform = "ios" self.name = "check-fs" self.modules = FS_MODULES + MIXED_MODULES diff --git a/tests/common/test_command.py b/tests/common/test_command.py index 8a19441..865f9f1 100644 --- a/tests/common/test_command.py +++ b/tests/common/test_command.py @@ -42,6 +42,19 @@ class IndependentModule(RecordingModule): pass +class CustomIOSBackupModule(RecordingModule): + supported_commands = (("ios", "check-backup"),) + + +class CustomIOSFSModule(RecordingModule): + supported_commands = (("ios", "check-fs"),) + + +class CustomDependsOnBuiltin(RecordingModule): + supported_commands = (("ios", "check-backup"),) + dependencies = (FirstModule,) + + class RecordingCommand(Command): def init(self): self.initialized = True @@ -132,3 +145,46 @@ class TestCommand: assert RecordingModule.run_order == [] assert not hasattr(cmd, "initialized") assert "depends on unavailable module UnavailableModule" in caplog.text + + def test_custom_modules_are_filtered_before_ordering(self): + cmd = RecordingCommand() + cmd.platform = "ios" + cmd.name = "check-backup" + cmd.modules = [FirstModule] + cmd.custom_modules = [CustomIOSBackupModule, CustomIOSFSModule] + + assert [module.__name__ for module in cmd._ordered_modules()] == [ + "FirstModule", + "CustomIOSBackupModule", + ] + + def test_selected_custom_module_runs(self): + cmd = RecordingCommand(module_name="CustomIOSBackupModule") + cmd.platform = "ios" + cmd.name = "check-backup" + cmd.custom_modules = [CustomIOSBackupModule] + + cmd.run() + + assert RecordingModule.run_order == ["CustomIOSBackupModule"] + + def test_selected_unsupported_custom_module_does_not_run(self): + cmd = RecordingCommand(module_name="CustomIOSFSModule") + cmd.platform = "ios" + cmd.name = "check-backup" + cmd.custom_modules = [CustomIOSFSModule] + + cmd.run() + + assert RecordingModule.run_order == [] + + def test_custom_module_dependencies_use_topological_order(self): + cmd = RecordingCommand(module_name="CustomDependsOnBuiltin") + cmd.platform = "ios" + cmd.name = "check-backup" + cmd.modules = [SecondModule, FirstModule] + cmd.custom_modules = [CustomDependsOnBuiltin] + + cmd.run() + + assert RecordingModule.run_order == ["FirstModule", "CustomDependsOnBuiltin"] diff --git a/tests/common/test_module_loader.py b/tests/common/test_module_loader.py new file mode 100644 index 0000000..ce13427 --- /dev/null +++ b/tests/common/test_module_loader.py @@ -0,0 +1,145 @@ +import pytest + +from mvt.common.module import MVTModule +from mvt.common.module_loader import ( + CustomModuleLoadError, + load_custom_modules, + load_custom_modules_from_path, + module_supports_command, +) + + +MODULE_TEMPLATE = """ +from mvt.common.module import MVTModule + + +class {name}(MVTModule): + supported_commands = {supported_commands!r} + + def run(self): + pass + + def check_indicators(self): + pass + + def serialize(self, result): + return None +""" + + +def _write_module(path, name, supported_commands=()): + path.write_text( + MODULE_TEMPLATE.format( + name=name, + supported_commands=supported_commands, + ), + encoding="utf-8", + ) + return path + + +def test_load_custom_modules_from_python_file(tmp_path): + module_path = _write_module(tmp_path / "custom.py", "FileModule") + + modules = load_custom_modules_from_path(str(module_path)) + + assert [module.__name__ for module in modules] == ["FileModule"] + assert issubclass(modules[0], MVTModule) + + +def test_load_custom_modules_from_folder_in_sorted_order(tmp_path): + _write_module(tmp_path / "b_module.py", "BModule") + _write_module(tmp_path / "a_module.py", "AModule") + _write_module(tmp_path / ".hidden.py", "HiddenModule") + _write_module(tmp_path / "__init__.py", "InitModule") + nested = tmp_path / "nested" + nested.mkdir() + _write_module(nested / "nested_module.py", "NestedModule") + + modules = load_custom_modules_from_path(str(tmp_path)) + + assert [module.__name__ for module in modules] == ["AModule", "BModule"] + + +def test_discovery_ignores_imported_base_and_unrelated_classes(tmp_path): + module_path = tmp_path / "custom.py" + module_path.write_text( + """ +from mvt.common.module import MVTModule + + +class Unrelated: + pass + + +class DiscoveredModule(MVTModule): + def run(self): + pass + + def check_indicators(self): + pass + + def serialize(self, result): + return None +""", + encoding="utf-8", + ) + + modules = load_custom_modules_from_path(str(module_path)) + + assert [module.__name__ for module in modules] == ["DiscoveredModule"] + + +def test_load_custom_modules_deduplicates_same_class(tmp_path): + module_path = _write_module(tmp_path / "custom.py", "DuplicateModule") + + modules = load_custom_modules([str(module_path), str(module_path)]) + + assert [module.__name__ for module in modules] == ["DuplicateModule"] + + +def test_load_custom_modules_raises_for_missing_path(tmp_path): + with pytest.raises(CustomModuleLoadError, match="does not exist"): + load_custom_modules_from_path(str(tmp_path / "missing.py")) + + +def test_load_custom_modules_raises_for_import_error(tmp_path): + module_path = tmp_path / "broken.py" + module_path.write_text("raise RuntimeError('broken import')", encoding="utf-8") + + with pytest.raises(CustomModuleLoadError, match="broken import"): + load_custom_modules_from_path(str(module_path)) + + +def test_load_custom_modules_loads_env_folder_first(tmp_path, monkeypatch): + env_folder = tmp_path / "env" + env_folder.mkdir() + cli_folder = tmp_path / "cli" + cli_folder.mkdir() + _write_module(env_folder / "env_module.py", "EnvModule") + _write_module(cli_folder / "cli_module.py", "CliModule") + monkeypatch.setenv("MVT_CUSTOM_MODULES", str(env_folder)) + + modules = load_custom_modules([str(cli_folder)]) + + assert [module.__name__ for module in modules] == ["EnvModule", "CliModule"] + + +def test_module_supports_command_defaults_to_all_commands(tmp_path): + module_path = _write_module(tmp_path / "custom.py", "DefaultModule") + module = load_custom_modules_from_path(str(module_path))[0] + + assert module_supports_command(module, "ios", "check-backup") + assert module_supports_command(module, "android", "check-bugreport") + + +def test_module_supports_command_honors_supported_commands(tmp_path): + module_path = _write_module( + tmp_path / "custom.py", + "SpecificModule", + (("ios", "check-backup"),), + ) + module = load_custom_modules_from_path(str(module_path))[0] + + assert module_supports_command(module, "ios", "check-backup") + assert not module_supports_command(module, "ios", "check-fs") diff --git a/tests/test_custom_modules.py b/tests/test_custom_modules.py new file mode 100644 index 0000000..dcbbbc9 --- /dev/null +++ b/tests/test_custom_modules.py @@ -0,0 +1,193 @@ +from click.testing import CliRunner + +from mvt.android.cli import check_bugreport +from mvt.android.cmd_check_androidqf import CmdAndroidCheckAndroidQF +from mvt.android.cmd_check_backup import CmdAndroidCheckBackup +from mvt.android.cmd_check_bugreport import CmdAndroidCheckBugreport +from mvt.android.cmd_check_intrusion_logs import CmdAndroidCheckIntrusionLogs +from mvt.common.module import MVTModule +from mvt.ios.cli import check_backup, check_fs + + +CUSTOM_MODULE = """ +from mvt.common.module import MVTModule + + +class {name}(MVTModule): + supported_commands = {supported_commands!r} + slug = "{slug}" + + def run(self): + self.results = [{{"message": "custom module ran"}}] + + def check_indicators(self): + pass + + def serialize(self, result): + return None +""" + + +def _write_custom_module(path, name, supported_commands, slug=None): + path.write_text( + CUSTOM_MODULE.format( + name=name, + supported_commands=supported_commands, + slug=slug or name.lower(), + ), + encoding="utf-8", + ) + return path + + +def test_load_module_appears_only_for_supported_cli_command(tmp_path): + module_path = _write_custom_module( + tmp_path / "custom.py", + "IOSBackupOnlyModule", + (("ios", "check-backup"),), + ) + + backup_result = CliRunner().invoke( + check_backup, + ["--list-modules", "--load-module", str(module_path), str(tmp_path)], + ) + fs_result = CliRunner().invoke( + check_fs, + ["--list-modules", "--load-module", str(module_path), str(tmp_path)], + ) + + assert backup_result.exit_code == 0 + assert "IOSBackupOnlyModule" in backup_result.output + assert fs_result.exit_code == 0 + assert "IOSBackupOnlyModule" not in fs_result.output + + +def test_module_option_runs_supported_custom_module(tmp_path): + module_path = _write_custom_module( + tmp_path / "custom.py", + "CustomRunModule", + (("ios", "check-backup"),), + slug="custom_run_module", + ) + output_path = tmp_path / "out" + + result = CliRunner().invoke( + check_backup, + [ + "--module", + "CustomRunModule", + "--load-module", + str(module_path), + "--output", + str(output_path), + str(tmp_path), + ], + ) + + assert result.exit_code == 0 + assert (output_path / "custom_run_module.json").exists() + + +def test_custom_modules_load_from_environment_without_cli_flag(tmp_path, monkeypatch): + custom_modules_path = tmp_path / "custom_modules" + custom_modules_path.mkdir() + _write_custom_module( + custom_modules_path / "env_module.py", + "EnvBugreportModule", + (("android", "check-bugreport"),), + ) + monkeypatch.setenv("MVT_CUSTOM_MODULES", str(custom_modules_path)) + + result = CliRunner().invoke(check_bugreport, ["--list-modules", str(tmp_path)]) + + assert result.exit_code == 0 + assert "EnvBugreportModule" in result.output + + +class NestedBugreportModule(MVTModule): + supported_commands = (("android", "check-bugreport"),) + + +class NestedBackupModule(MVTModule): + supported_commands = (("android", "check-backup"),) + + +class NestedIntrusionLogsModule(MVTModule): + supported_commands = (("android", "check-intrusion-logs"),) + + +class NestedAndroidQFModule(MVTModule): + supported_commands = (("android", "check-androidqf"),) + + +class DummyZip: + def close(self): + pass + + +def test_androidqf_propagates_custom_modules_to_nested_commands(tmp_path, monkeypatch): + records = {} + custom_modules = [ + NestedBugreportModule, + NestedBackupModule, + NestedIntrusionLogsModule, + NestedAndroidQFModule, + ] + cmd = CmdAndroidCheckAndroidQF( + target_path=str(tmp_path), + custom_modules=custom_modules, + ) + + def record_available(name): + def _record(command): + records[name] = [ + module.__name__ + for module in command._available_modules() + if module.__name__.startswith("Nested") + ] + + return _record + + monkeypatch.setattr(cmd, "load_bugreport", lambda: DummyZip()) + monkeypatch.setattr( + CmdAndroidCheckBugreport, + "from_zip", + lambda self, bugreport: None, + ) + monkeypatch.setattr( + CmdAndroidCheckBugreport, + "run", + record_available("bugreport"), + ) + + monkeypatch.setattr(cmd, "load_backup", lambda: b"") + monkeypatch.setattr(CmdAndroidCheckBackup, "from_ab", lambda self, backup: None) + monkeypatch.setattr( + CmdAndroidCheckBackup, + "run", + record_available("backup"), + ) + + intrusion_logs_path = tmp_path / "intrusion_logs" + intrusion_logs_path.mkdir() + setattr(cmd, "_CmdAndroidCheckAndroidQF__format", "dir") + setattr( + cmd, + "_CmdAndroidCheckAndroidQF__files", + ["androidqf/intrusion_logs/security.txt"], + ) + monkeypatch.setattr(cmd, "_read_device_timezone", lambda: None) + monkeypatch.setattr( + CmdAndroidCheckIntrusionLogs, + "run", + record_available("intrusion_logs"), + ) + + assert cmd.run_bugreport_cmd() + assert cmd.run_backup_cmd() + assert cmd.run_intrusion_logs_cmd() + assert records == { + "bugreport": ["NestedBugreportModule"], + "backup": ["NestedBackupModule"], + "intrusion_logs": ["NestedIntrusionLogsModule"], + }