Add custom module loading

This commit is contained in:
Janik Besendorf
2026-06-17 20:18:28 +02:00
parent b9f13b8146
commit 26bece59ee
18 changed files with 793 additions and 10 deletions
+5
View File
@@ -60,6 +60,11 @@ For alternative installation options and known issues, please refer to the [docu
MVT provides two commands `mvt-ios` and `mvt-android`. [Check out the documentation to learn how to use them!](https://docs.mvt.re/)
Module-running `check-*` commands can load custom Python modules with
`--load-module PATH` or from a folder set in `MVT_CUSTOM_MODULES`. See the
[development documentation](https://docs.mvt.re/en/latest/development/) for
details.
## License
+76
View File
@@ -39,6 +39,82 @@ Selecting a single module also runs its transitive dependencies. If a dependency
is unavailable or the dependency graph contains a cycle, the command logs a
warning and does not run any modules.
## Custom modules
Module-running `check-*` commands can load custom modules from Python files that
are not installed as part of MVT. Load one file with:
```bash
mvt-ios check-backup --load-module ./example_module.py --output ./out ./backup
```
You can also load a folder. MVT loads non-hidden top-level `*.py` files in
sorted order and skips `__init__.py`:
```bash
mvt-ios check-fs --load-module ./custom_modules ./filesystem-dump
```
Set `MVT_CUSTOM_MODULES` to load a folder for every module-running command. This
folder is loaded before any `--load-module` path:
```bash
MVT_CUSTOM_MODULES=./custom_modules mvt-android check-bugreport ./bugreport.zip
```
Custom modules are normal `MVTModule` subclasses:
```python
from mvt.common.module import MVTModule
class ExampleCustomModule(MVTModule):
supported_commands = (("ios", "check-backup"), ("ios", "check-fs"))
slug = "example_custom_module"
def run(self):
self.results = [{"message": "custom module ran"}]
def check_indicators(self):
pass
def serialize(self, result):
return None
```
Use `supported_commands` to restrict a module to specific platform/command
pairs. Missing or empty `supported_commands` means the module is available to
all commands, which keeps older modules compatible. Supported pairs are:
```python
("ios", "check-backup")
("ios", "check-fs")
("ios", "check-iocs")
("android", "check-backup")
("android", "check-bugreport")
("android", "check-androidqf")
("android", "check-intrusion-logs")
("android", "check-iocs")
```
Custom modules can depend on existing MVT module classes. Dependencies are
resolved with the same ordering logic as built-in modules, and custom modules
are appended after built-ins before ordering:
```python
from mvt.common.module import MVTModule
from mvt.ios.modules.backup.manifest import Manifest
class DependentCustomModule(MVTModule):
supported_commands = (("ios", "check-backup"),)
dependencies = (Manifest,)
def run(self):
manifest_results = self.get_dependency_results(Manifest)
self.results = [{"manifest_entries": len(manifest_results)}]
```
## Profiling
Some MVT modules extract and process significant amounts of data during the analysis process or while checking results against known indicators. Care must be
+69 -2
View File
@@ -22,6 +22,7 @@ from mvt.common.help import (
HELP_MSG_HASHES,
HELP_MSG_IOC,
HELP_MSG_LIST_MODULES,
HELP_MSG_LOAD_MODULE,
HELP_MSG_MODULE,
HELP_MSG_NONINTERACTIVE,
HELP_MSG_OUTPUT,
@@ -30,6 +31,7 @@ from mvt.common.help import (
HELP_MSG_VERSION,
)
from mvt.common.logo import logo
from mvt.common.module_loader import CustomModuleLoadError, load_custom_modules
from mvt.common.updates import IndicatorsUpdates
from mvt.common.utils import init_logging, set_verbose_logging
@@ -59,6 +61,13 @@ def _get_disable_flags(ctx):
)
def _load_custom_modules(load_module):
try:
return load_custom_modules(load_module)
except CustomModuleLoadError as exc:
raise click.ClickException(str(exc)) from exc
# ==============================================================================
# Main
# ==============================================================================
@@ -119,11 +128,28 @@ def check_adb(ctx):
@click.option("--output", "-o", type=click.Path(exists=False), help=HELP_MSG_OUTPUT)
@click.option("--list-modules", "-l", is_flag=True, help=HELP_MSG_LIST_MODULES)
@click.option("--module", "-m", help=HELP_MSG_MODULE)
@click.option(
"--load-module",
type=click.Path(exists=True),
multiple=True,
default=[],
help=HELP_MSG_LOAD_MODULE,
)
@click.option("--verbose", "-v", is_flag=True, help=HELP_MSG_VERBOSE)
@click.argument("BUGREPORT_PATH", type=click.Path(exists=True))
@click.pass_context
def check_bugreport(ctx, iocs, output, list_modules, module, verbose, bugreport_path):
def check_bugreport(
ctx,
iocs,
output,
list_modules,
module,
load_module,
verbose,
bugreport_path,
):
set_verbose_logging(verbose)
custom_modules = _load_custom_modules(load_module)
# Always generate hashes as bug reports are small.
cmd = CmdAndroidCheckBugreport(
target_path=bugreport_path,
@@ -133,6 +159,7 @@ def check_bugreport(ctx, iocs, output, list_modules, module, verbose, bugreport_
hashes=True,
disable_version_check=_get_disable_flags(ctx)[0],
disable_indicator_check=_get_disable_flags(ctx)[1],
custom_modules=custom_modules,
)
if list_modules:
@@ -164,6 +191,13 @@ def check_bugreport(ctx, iocs, output, list_modules, module, verbose, bugreport_
)
@click.option("--output", "-o", type=click.Path(exists=False), help=HELP_MSG_OUTPUT)
@click.option("--list-modules", "-l", is_flag=True, help=HELP_MSG_LIST_MODULES)
@click.option(
"--load-module",
type=click.Path(exists=True),
multiple=True,
default=[],
help=HELP_MSG_LOAD_MODULE,
)
@click.option("--non-interactive", "-n", is_flag=True, help=HELP_MSG_NONINTERACTIVE)
@click.option("--backup-password", "-p", help=HELP_MSG_ANDROID_BACKUP_PASSWORD)
@click.option("--verbose", "-v", is_flag=True, help=HELP_MSG_VERBOSE)
@@ -174,12 +208,14 @@ def check_backup(
iocs,
output,
list_modules,
load_module,
non_interactive,
backup_password,
verbose,
backup_path,
):
set_verbose_logging(verbose)
custom_modules = _load_custom_modules(load_module)
# Always generate hashes as backups are generally small.
cmd = CmdAndroidCheckBackup(
@@ -193,6 +229,7 @@ def check_backup(
},
disable_version_check=_get_disable_flags(ctx)[0],
disable_indicator_check=_get_disable_flags(ctx)[1],
custom_modules=custom_modules,
)
if list_modules:
@@ -223,6 +260,13 @@ def check_backup(
@click.option("--output", "-o", type=click.Path(exists=False), help=HELP_MSG_OUTPUT)
@click.option("--list-modules", "-l", is_flag=True, help=HELP_MSG_LIST_MODULES)
@click.option("--module", "-m", help=HELP_MSG_MODULE)
@click.option(
"--load-module",
type=click.Path(exists=True),
multiple=True,
default=[],
help=HELP_MSG_LOAD_MODULE,
)
@click.option("--hashes", "-H", is_flag=True, help=HELP_MSG_HASHES)
@click.option("--non-interactive", "-n", is_flag=True, help=HELP_MSG_NONINTERACTIVE)
@click.option("--backup-password", "-p", help=HELP_MSG_ANDROID_BACKUP_PASSWORD)
@@ -235,6 +279,7 @@ def check_androidqf(
output,
list_modules,
module,
load_module,
hashes,
non_interactive,
backup_password,
@@ -242,6 +287,7 @@ def check_androidqf(
androidqf_path,
):
set_verbose_logging(verbose)
custom_modules = _load_custom_modules(load_module)
cmd = CmdAndroidCheckAndroidQF(
target_path=androidqf_path,
@@ -255,6 +301,7 @@ def check_androidqf(
},
disable_version_check=_get_disable_flags(ctx)[0],
disable_indicator_check=_get_disable_flags(ctx)[1],
custom_modules=custom_modules,
)
if list_modules:
@@ -288,6 +335,13 @@ def check_androidqf(
@click.option("--output", "-o", type=click.Path(exists=False), help=HELP_MSG_OUTPUT)
@click.option("--list-modules", "-l", is_flag=True, help=HELP_MSG_LIST_MODULES)
@click.option("--module", "-m", help=HELP_MSG_MODULE)
@click.option(
"--load-module",
type=click.Path(exists=True),
multiple=True,
default=[],
help=HELP_MSG_LOAD_MODULE,
)
@click.option(
"--timezone",
"-t",
@@ -307,11 +361,13 @@ def check_intrusion_logs(
output,
list_modules,
module,
load_module,
timezone,
verbose,
logs_path,
):
set_verbose_logging(verbose)
custom_modules = _load_custom_modules(load_module)
module_options = {}
if timezone:
@@ -325,6 +381,7 @@ def check_intrusion_logs(
module_options=module_options if module_options else None,
disable_version_check=_get_disable_flags(ctx)[0],
disable_indicator_check=_get_disable_flags(ctx)[1],
custom_modules=custom_modules,
)
if list_modules:
@@ -352,15 +409,25 @@ def check_intrusion_logs(
)
@click.option("--list-modules", "-l", is_flag=True, help=HELP_MSG_LIST_MODULES)
@click.option("--module", "-m", help=HELP_MSG_MODULE)
@click.option(
"--load-module",
type=click.Path(exists=True),
multiple=True,
default=[],
help=HELP_MSG_LOAD_MODULE,
)
@click.argument("FOLDER", type=click.Path(exists=True))
@click.pass_context
def check_iocs(ctx, iocs, list_modules, module, folder):
def check_iocs(ctx, iocs, list_modules, module, load_module, folder):
custom_modules = _load_custom_modules(load_module)
cmd = CmdCheckIOCS(
target_path=folder,
ioc_files=iocs,
module_name=module,
disable_version_check=_get_disable_flags(ctx)[0],
disable_indicator_check=_get_disable_flags(ctx)[1],
custom_modules=custom_modules,
platform="android",
)
cmd.modules = (
BACKUP_MODULES + BUGREPORT_MODULES + ANDROIDQF_MODULES + INTRUSION_LOGS_MODULES
+7
View File
@@ -17,6 +17,7 @@ from mvt.android.cmd_check_backup import CmdAndroidCheckBackup
from mvt.android.cmd_check_bugreport import CmdAndroidCheckBugreport
from mvt.common.command import Command
from mvt.common.indicators import Indicators
from mvt.common.module import MVTModule
from .modules.androidqf import ANDROIDQF_MODULES
from .modules.androidqf.base import AndroidQFModule
@@ -50,6 +51,7 @@ class CmdAndroidCheckAndroidQF(Command):
sub_command: Optional[bool] = False,
disable_version_check: bool = False,
disable_indicator_check: bool = False,
custom_modules: Optional[list[type[MVTModule]]] = None,
) -> None:
super().__init__(
target_path=target_path,
@@ -64,8 +66,10 @@ class CmdAndroidCheckAndroidQF(Command):
log=log,
disable_version_check=disable_version_check,
disable_indicator_check=disable_indicator_check,
custom_modules=custom_modules,
)
self.platform = "android"
self.name = "check-androidqf"
self.modules = ANDROIDQF_MODULES
@@ -210,6 +214,7 @@ class CmdAndroidCheckAndroidQF(Command):
module_options=self.module_options,
hashes=self.hashes,
sub_command=True,
custom_modules=self.custom_modules,
)
cmd.from_zip(bugreport)
cmd.run()
@@ -239,6 +244,7 @@ class CmdAndroidCheckAndroidQF(Command):
module_options=self.module_options,
hashes=self.hashes,
sub_command=True,
custom_modules=self.custom_modules,
)
cmd.from_ab(backup)
cmd.run()
@@ -311,6 +317,7 @@ class CmdAndroidCheckAndroidQF(Command):
module_options=adv_module_options,
hashes=self.hashes,
sub_command=True,
custom_modules=self.custom_modules,
)
cmd.run()
+4
View File
@@ -21,6 +21,7 @@ from mvt.android.parsers.backup import (
)
from mvt.common.command import Command
from mvt.common.indicators import Indicators
from mvt.common.module import MVTModule
from .modules.backup import BACKUP_MODULES
@@ -41,6 +42,7 @@ class CmdAndroidCheckBackup(Command):
sub_command: Optional[bool] = False,
disable_version_check: bool = False,
disable_indicator_check: bool = False,
custom_modules: Optional[list[type[MVTModule]]] = None,
) -> None:
super().__init__(
target_path=target_path,
@@ -55,8 +57,10 @@ class CmdAndroidCheckBackup(Command):
log=log,
disable_version_check=disable_version_check,
disable_indicator_check=disable_indicator_check,
custom_modules=custom_modules,
)
self.platform = "android"
self.name = "check-backup"
self.modules = BACKUP_MODULES
+4
View File
@@ -12,6 +12,7 @@ from zipfile import ZipFile
from mvt.android.modules.bugreport.base import BugReportModule
from mvt.common.command import Command
from mvt.common.indicators import Indicators
from mvt.common.module import MVTModule
from .modules.bugreport import BUGREPORT_MODULES
@@ -32,6 +33,7 @@ class CmdAndroidCheckBugreport(Command):
sub_command: Optional[bool] = False,
disable_version_check: bool = False,
disable_indicator_check: bool = False,
custom_modules: Optional[list[type[MVTModule]]] = None,
) -> None:
super().__init__(
target_path=target_path,
@@ -46,8 +48,10 @@ class CmdAndroidCheckBugreport(Command):
log=log,
disable_version_check=disable_version_check,
disable_indicator_check=disable_indicator_check,
custom_modules=custom_modules,
)
self.platform = "android"
self.name = "check-bugreport"
self.modules = BUGREPORT_MODULES
@@ -9,6 +9,7 @@ from typing import Optional
from mvt.common.command import Command
from mvt.common.indicators import Indicators
from mvt.common.module import MVTModule
from .modules.intrusion_logs import (
INTRUSION_LOGS_MODULES,
@@ -35,6 +36,7 @@ class CmdAndroidCheckIntrusionLogs(Command):
sub_command: Optional[bool] = False,
disable_version_check: bool = False,
disable_indicator_check: bool = False,
custom_modules: Optional[list[type[MVTModule]]] = None,
) -> None:
super().__init__(
target_path=target_path,
@@ -49,8 +51,10 @@ class CmdAndroidCheckIntrusionLogs(Command):
log=log,
disable_version_check=disable_version_check,
disable_indicator_check=disable_indicator_check,
custom_modules=custom_modules,
)
self.platform = "android"
self.name = "check-intrusion-logs"
self.modules = INTRUSION_LOGS_MODULES
self._all_events: dict[str, list[dict]] = {}
+6 -1
View File
@@ -8,6 +8,7 @@ import os
from typing import Optional
from mvt.common.command import Command
from mvt.common.module import MVTModule
from mvt.common.utils import exec_or_profile
log = logging.getLogger(__name__)
@@ -26,6 +27,8 @@ class CmdCheckIOCS(Command):
sub_command: Optional[bool] = False,
disable_version_check: bool = False,
disable_indicator_check: bool = False,
custom_modules: Optional[list[type[MVTModule]]] = None,
platform: str = "",
) -> None:
super().__init__(
target_path=target_path,
@@ -39,14 +42,16 @@ class CmdCheckIOCS(Command):
log=log,
disable_version_check=disable_version_check,
disable_indicator_check=disable_indicator_check,
custom_modules=custom_modules,
)
self.platform = platform
self.name = "check-iocs"
def run(self) -> None:
assert self.target_path is not None
all_modules = []
for entry in self.modules:
for entry in self._available_modules():
if entry not in all_modules:
all_modules.append(entry)
+24 -4
View File
@@ -19,6 +19,7 @@ from .alerts import AlertLevel, AlertStore
from .config import settings
from .indicators import Indicators
from .module import EncryptedBackupError, MVTModule, run_module, save_timeline
from .module_loader import module_supports_command
from .module_types import ModuleTimeline
from .utils import (
CustomJSONEncoder,
@@ -44,9 +45,12 @@ class Command:
log: logging.Logger = logging.getLogger(__name__),
disable_version_check: bool = False,
disable_indicator_check: bool = False,
custom_modules: Optional[list[type[MVTModule]]] = None,
) -> None:
self.name = ""
self.platform = ""
self.modules: list[type[MVTModule]] = []
self.custom_modules = custom_modules if custom_modules else []
self.target_path = target_path
self.results_path = results_path
@@ -199,9 +203,24 @@ class Command:
def list_modules(self) -> None:
self.log.info("Following is the list of available %s modules:", self.name)
for module in self.modules:
for module in self._available_modules():
self.log.info(" - %s", module.__name__)
def _available_modules(self) -> list[type[MVTModule]]:
modules = list(self.modules)
modules.extend(
module
for module in self.custom_modules
if module_supports_command(module, self.platform, self.name)
)
deduplicated = []
for module in modules:
if module not in deduplicated:
deduplicated.append(module)
return deduplicated
def init(self) -> None:
raise NotImplementedError
@@ -262,14 +281,15 @@ class Command:
def _ordered_modules(self) -> Optional[list[type[MVTModule]]]:
"""Return enabled modules in stable topological order."""
module_indexes = {module: index for index, module in enumerate(self.modules)}
modules = self._available_modules()
module_indexes = {module: index for index, module in enumerate(modules)}
if self.module_name:
selected = [
module for module in self.modules if module.__name__ == self.module_name
module for module in modules if module.__name__ == self.module_name
]
else:
selected = [module for module in self.modules if module.enabled]
selected = [module for module in modules if module.enabled]
required = set(selected)
pending = list(selected)
+4
View File
@@ -10,6 +10,10 @@ HELP_MSG_IOC = "Path to indicators file (can be invoked multiple time)"
HELP_MSG_FAST = "Avoid running time/resource consuming features"
HELP_MSG_LIST_MODULES = "Print list of available modules and exit"
HELP_MSG_MODULE = "Name of a single module you would like to run instead of all"
HELP_MSG_LOAD_MODULE = (
"Load custom MVT module(s) from a Python file or folder "
"(can be invoked multiple times)"
)
HELP_MSG_NONINTERACTIVE = "Don't ask interactive questions during processing"
HELP_MSG_HASHES = "Generate hashes of all the files analyzed"
HELP_MSG_VERBOSE = "Verbose mode"
+1
View File
@@ -44,6 +44,7 @@ class MVTModule:
enabled: bool = True
slug: Optional[str] = None
dependencies: Sequence[type["MVTModule"]] = ()
supported_commands: Sequence[tuple[str, str]] = ()
def __init__(
self,
+127
View File
@@ -0,0 +1,127 @@
# Mobile Verification Toolkit (MVT)
# Copyright (c) 2021-2026 The MVT Authors.
# Use of this software is governed by the MVT License 1.1 that can be found at
# https://license.mvt.re/1.1/
import hashlib
import importlib.util
import inspect
import os
import sys
from pathlib import Path
from types import ModuleType
from typing import Iterable, Optional
from .module import MVTModule
MVT_CUSTOM_MODULES_ENV = "MVT_CUSTOM_MODULES"
class CustomModuleLoadError(Exception):
pass
def _module_name_for_path(path: Path) -> str:
digest = hashlib.sha256(str(path).encode("utf-8")).hexdigest()[:16]
return f"_mvt_custom_module_{path.stem}_{digest}"
def _iter_module_files(path: Path) -> Iterable[Path]:
if path.is_file():
if path.suffix != ".py":
raise CustomModuleLoadError(f"Custom module file is not a Python file: {path}")
yield path
return
if path.is_dir():
for child in sorted(path.iterdir()):
if child.name.startswith("."):
continue
if child.name == "__init__.py":
continue
if child.is_file() and child.suffix == ".py":
yield child
return
raise CustomModuleLoadError(f"Custom module path does not exist: {path}")
def _load_python_file(path: Path) -> ModuleType:
module_name = _module_name_for_path(path)
spec = importlib.util.spec_from_file_location(module_name, path)
if spec is None or spec.loader is None:
raise CustomModuleLoadError(f"Unable to load custom module file: {path}")
module = importlib.util.module_from_spec(spec)
sys.modules[module_name] = module
try:
spec.loader.exec_module(module)
except Exception as exc:
raise CustomModuleLoadError(f"Unable to import custom module {path}: {exc}") from exc
return module
def discover_mvt_modules(module: ModuleType) -> list[type[MVTModule]]:
modules = []
for _, obj in inspect.getmembers(module, inspect.isclass):
if obj is MVTModule:
continue
if obj.__module__ != module.__name__:
continue
if not issubclass(obj, MVTModule):
continue
modules.append(obj)
return modules
def load_custom_modules_from_path(path: str) -> list[type[MVTModule]]:
custom_modules: list[type[MVTModule]] = []
seen: set[tuple[str, str]] = set()
resolved_path = Path(path).expanduser().resolve()
for module_file in _iter_module_files(resolved_path):
loaded_module = _load_python_file(module_file)
for module_class in discover_mvt_modules(loaded_module):
key = (str(module_file), module_class.__qualname__)
if key in seen:
continue
seen.add(key)
custom_modules.append(module_class)
return custom_modules
def load_custom_modules(paths: Optional[Iterable[str]] = None) -> list[type[MVTModule]]:
search_paths: list[str] = []
env_path = os.environ.get(MVT_CUSTOM_MODULES_ENV)
if env_path:
search_paths.append(env_path)
if paths:
search_paths.extend(paths)
custom_modules: list[type[MVTModule]] = []
seen: set[tuple[str, str]] = set()
for path in search_paths:
for module_class in load_custom_modules_from_path(path):
source = Path(inspect.getfile(module_class)).resolve()
key = (str(source), module_class.__qualname__)
if key in seen:
continue
seen.add(key)
custom_modules.append(module_class)
return custom_modules
def module_supports_command(
module_class: type[MVTModule],
platform: str,
command: str,
) -> bool:
supported_commands = getattr(module_class, "supported_commands", None)
if not supported_commands:
return True
return (platform, command) in {tuple(entry) for entry in supported_commands}
+60 -3
View File
@@ -31,6 +31,7 @@ from mvt.common.help import (
HELP_MSG_OUTPUT,
HELP_MSG_FAST,
HELP_MSG_LIST_MODULES,
HELP_MSG_LOAD_MODULE,
HELP_MSG_MODULE,
HELP_MSG_VERBOSE,
HELP_MSG_CHECK_FS,
@@ -40,6 +41,7 @@ from mvt.common.help import (
HELP_MSG_DISABLE_UPDATE_CHECK,
HELP_MSG_DISABLE_INDICATOR_UPDATE_CHECK,
)
from mvt.common.module_loader import CustomModuleLoadError, load_custom_modules
from .cmd_check_backup import CmdIOSCheckBackup
from .cmd_check_fs import CmdIOSCheckFS
from .decrypt import DecryptBackup
@@ -65,6 +67,13 @@ def _get_disable_flags(ctx):
)
def _load_custom_modules(load_module):
try:
return load_custom_modules(load_module)
except CustomModuleLoadError as exc:
raise click.ClickException(str(exc)) from exc
# ==============================================================================
# Main
# ==============================================================================
@@ -229,15 +238,32 @@ def extract_key(password, key_file, backup_path):
@click.option("--fast", "-f", is_flag=True, help=HELP_MSG_FAST)
@click.option("--list-modules", "-l", is_flag=True, help=HELP_MSG_LIST_MODULES)
@click.option("--module", "-m", help=HELP_MSG_MODULE)
@click.option(
"--load-module",
type=click.Path(exists=True),
multiple=True,
default=[],
help=HELP_MSG_LOAD_MODULE,
)
@click.option("--hashes", "-H", is_flag=True, help=HELP_MSG_HASHES)
@click.option("--verbose", "-v", is_flag=True, help=HELP_MSG_VERBOSE)
@click.argument("BACKUP_PATH", type=click.Path(exists=True))
@click.pass_context
def check_backup(
ctx, iocs, output, fast, list_modules, module, hashes, verbose, backup_path
ctx,
iocs,
output,
fast,
list_modules,
module,
load_module,
hashes,
verbose,
backup_path,
):
set_verbose_logging(verbose)
module_options = {"fast_mode": fast}
custom_modules = _load_custom_modules(load_module)
cmd = CmdIOSCheckBackup(
target_path=backup_path,
@@ -248,6 +274,7 @@ def check_backup(
hashes=hashes,
disable_version_check=_get_disable_flags(ctx)[0],
disable_indicator_check=_get_disable_flags(ctx)[1],
custom_modules=custom_modules,
)
if list_modules:
@@ -277,13 +304,32 @@ def check_backup(
@click.option("--fast", "-f", is_flag=True, help=HELP_MSG_FAST)
@click.option("--list-modules", "-l", is_flag=True, help=HELP_MSG_LIST_MODULES)
@click.option("--module", "-m", help=HELP_MSG_MODULE)
@click.option(
"--load-module",
type=click.Path(exists=True),
multiple=True,
default=[],
help=HELP_MSG_LOAD_MODULE,
)
@click.option("--hashes", "-H", is_flag=True, help=HELP_MSG_HASHES)
@click.option("--verbose", "-v", is_flag=True, help=HELP_MSG_VERBOSE)
@click.argument("DUMP_PATH", type=click.Path(exists=True))
@click.pass_context
def check_fs(ctx, iocs, output, fast, list_modules, module, hashes, verbose, dump_path):
def check_fs(
ctx,
iocs,
output,
fast,
list_modules,
module,
load_module,
hashes,
verbose,
dump_path,
):
set_verbose_logging(verbose)
module_options = {"fast_mode": fast}
custom_modules = _load_custom_modules(load_module)
cmd = CmdIOSCheckFS(
target_path=dump_path,
@@ -294,6 +340,7 @@ def check_fs(ctx, iocs, output, fast, list_modules, module, hashes, verbose, dum
hashes=hashes,
disable_version_check=_get_disable_flags(ctx)[0],
disable_indicator_check=_get_disable_flags(ctx)[1],
custom_modules=custom_modules,
)
if list_modules:
@@ -321,15 +368,25 @@ def check_fs(ctx, iocs, output, fast, list_modules, module, hashes, verbose, dum
)
@click.option("--list-modules", "-l", is_flag=True, help=HELP_MSG_LIST_MODULES)
@click.option("--module", "-m", help=HELP_MSG_MODULE)
@click.option(
"--load-module",
type=click.Path(exists=True),
multiple=True,
default=[],
help=HELP_MSG_LOAD_MODULE,
)
@click.argument("FOLDER", type=click.Path(exists=True))
@click.pass_context
def check_iocs(ctx, iocs, list_modules, module, folder):
def check_iocs(ctx, iocs, list_modules, module, load_module, folder):
custom_modules = _load_custom_modules(load_module)
cmd = CmdCheckIOCS(
target_path=folder,
ioc_files=iocs,
module_name=module,
disable_version_check=_get_disable_flags(ctx)[0],
disable_indicator_check=_get_disable_flags(ctx)[1],
custom_modules=custom_modules,
platform="ios",
)
cmd.modules = BACKUP_MODULES + FS_MODULES + MIXED_MODULES
+4
View File
@@ -8,6 +8,7 @@ from typing import Optional
from mvt.common.command import Command
from mvt.common.indicators import Indicators
from mvt.common.module import MVTModule
from .modules.backup import BACKUP_MODULES
from .modules.mixed import MIXED_MODULES
@@ -29,6 +30,7 @@ class CmdIOSCheckBackup(Command):
sub_command: bool = False,
disable_version_check: bool = False,
disable_indicator_check: bool = False,
custom_modules: Optional[list[type[MVTModule]]] = None,
) -> None:
super().__init__(
target_path=target_path,
@@ -43,8 +45,10 @@ class CmdIOSCheckBackup(Command):
log=log,
disable_version_check=disable_version_check,
disable_indicator_check=disable_indicator_check,
custom_modules=custom_modules,
)
self.platform = "ios"
self.name = "check-backup"
self.modules = BACKUP_MODULES + MIXED_MODULES
+4
View File
@@ -8,6 +8,7 @@ from typing import Optional
from mvt.common.command import Command
from mvt.common.indicators import Indicators
from mvt.common.module import MVTModule
from .modules.fs import FS_MODULES
from .modules.mixed import MIXED_MODULES
@@ -29,6 +30,7 @@ class CmdIOSCheckFS(Command):
sub_command: bool = False,
disable_version_check: bool = False,
disable_indicator_check: bool = False,
custom_modules: Optional[list[type[MVTModule]]] = None,
) -> None:
super().__init__(
target_path=target_path,
@@ -42,8 +44,10 @@ class CmdIOSCheckFS(Command):
log=log,
disable_version_check=disable_version_check,
disable_indicator_check=disable_indicator_check,
custom_modules=custom_modules,
)
self.platform = "ios"
self.name = "check-fs"
self.modules = FS_MODULES + MIXED_MODULES
+56
View File
@@ -42,6 +42,19 @@ class IndependentModule(RecordingModule):
pass
class CustomIOSBackupModule(RecordingModule):
supported_commands = (("ios", "check-backup"),)
class CustomIOSFSModule(RecordingModule):
supported_commands = (("ios", "check-fs"),)
class CustomDependsOnBuiltin(RecordingModule):
supported_commands = (("ios", "check-backup"),)
dependencies = (FirstModule,)
class RecordingCommand(Command):
def init(self):
self.initialized = True
@@ -132,3 +145,46 @@ class TestCommand:
assert RecordingModule.run_order == []
assert not hasattr(cmd, "initialized")
assert "depends on unavailable module UnavailableModule" in caplog.text
def test_custom_modules_are_filtered_before_ordering(self):
cmd = RecordingCommand()
cmd.platform = "ios"
cmd.name = "check-backup"
cmd.modules = [FirstModule]
cmd.custom_modules = [CustomIOSBackupModule, CustomIOSFSModule]
assert [module.__name__ for module in cmd._ordered_modules()] == [
"FirstModule",
"CustomIOSBackupModule",
]
def test_selected_custom_module_runs(self):
cmd = RecordingCommand(module_name="CustomIOSBackupModule")
cmd.platform = "ios"
cmd.name = "check-backup"
cmd.custom_modules = [CustomIOSBackupModule]
cmd.run()
assert RecordingModule.run_order == ["CustomIOSBackupModule"]
def test_selected_unsupported_custom_module_does_not_run(self):
cmd = RecordingCommand(module_name="CustomIOSFSModule")
cmd.platform = "ios"
cmd.name = "check-backup"
cmd.custom_modules = [CustomIOSFSModule]
cmd.run()
assert RecordingModule.run_order == []
def test_custom_module_dependencies_use_topological_order(self):
cmd = RecordingCommand(module_name="CustomDependsOnBuiltin")
cmd.platform = "ios"
cmd.name = "check-backup"
cmd.modules = [SecondModule, FirstModule]
cmd.custom_modules = [CustomDependsOnBuiltin]
cmd.run()
assert RecordingModule.run_order == ["FirstModule", "CustomDependsOnBuiltin"]
+145
View File
@@ -0,0 +1,145 @@
import pytest
from mvt.common.module import MVTModule
from mvt.common.module_loader import (
CustomModuleLoadError,
load_custom_modules,
load_custom_modules_from_path,
module_supports_command,
)
MODULE_TEMPLATE = """
from mvt.common.module import MVTModule
class {name}(MVTModule):
supported_commands = {supported_commands!r}
def run(self):
pass
def check_indicators(self):
pass
def serialize(self, result):
return None
"""
def _write_module(path, name, supported_commands=()):
path.write_text(
MODULE_TEMPLATE.format(
name=name,
supported_commands=supported_commands,
),
encoding="utf-8",
)
return path
def test_load_custom_modules_from_python_file(tmp_path):
module_path = _write_module(tmp_path / "custom.py", "FileModule")
modules = load_custom_modules_from_path(str(module_path))
assert [module.__name__ for module in modules] == ["FileModule"]
assert issubclass(modules[0], MVTModule)
def test_load_custom_modules_from_folder_in_sorted_order(tmp_path):
_write_module(tmp_path / "b_module.py", "BModule")
_write_module(tmp_path / "a_module.py", "AModule")
_write_module(tmp_path / ".hidden.py", "HiddenModule")
_write_module(tmp_path / "__init__.py", "InitModule")
nested = tmp_path / "nested"
nested.mkdir()
_write_module(nested / "nested_module.py", "NestedModule")
modules = load_custom_modules_from_path(str(tmp_path))
assert [module.__name__ for module in modules] == ["AModule", "BModule"]
def test_discovery_ignores_imported_base_and_unrelated_classes(tmp_path):
module_path = tmp_path / "custom.py"
module_path.write_text(
"""
from mvt.common.module import MVTModule
class Unrelated:
pass
class DiscoveredModule(MVTModule):
def run(self):
pass
def check_indicators(self):
pass
def serialize(self, result):
return None
""",
encoding="utf-8",
)
modules = load_custom_modules_from_path(str(module_path))
assert [module.__name__ for module in modules] == ["DiscoveredModule"]
def test_load_custom_modules_deduplicates_same_class(tmp_path):
module_path = _write_module(tmp_path / "custom.py", "DuplicateModule")
modules = load_custom_modules([str(module_path), str(module_path)])
assert [module.__name__ for module in modules] == ["DuplicateModule"]
def test_load_custom_modules_raises_for_missing_path(tmp_path):
with pytest.raises(CustomModuleLoadError, match="does not exist"):
load_custom_modules_from_path(str(tmp_path / "missing.py"))
def test_load_custom_modules_raises_for_import_error(tmp_path):
module_path = tmp_path / "broken.py"
module_path.write_text("raise RuntimeError('broken import')", encoding="utf-8")
with pytest.raises(CustomModuleLoadError, match="broken import"):
load_custom_modules_from_path(str(module_path))
def test_load_custom_modules_loads_env_folder_first(tmp_path, monkeypatch):
env_folder = tmp_path / "env"
env_folder.mkdir()
cli_folder = tmp_path / "cli"
cli_folder.mkdir()
_write_module(env_folder / "env_module.py", "EnvModule")
_write_module(cli_folder / "cli_module.py", "CliModule")
monkeypatch.setenv("MVT_CUSTOM_MODULES", str(env_folder))
modules = load_custom_modules([str(cli_folder)])
assert [module.__name__ for module in modules] == ["EnvModule", "CliModule"]
def test_module_supports_command_defaults_to_all_commands(tmp_path):
module_path = _write_module(tmp_path / "custom.py", "DefaultModule")
module = load_custom_modules_from_path(str(module_path))[0]
assert module_supports_command(module, "ios", "check-backup")
assert module_supports_command(module, "android", "check-bugreport")
def test_module_supports_command_honors_supported_commands(tmp_path):
module_path = _write_module(
tmp_path / "custom.py",
"SpecificModule",
(("ios", "check-backup"),),
)
module = load_custom_modules_from_path(str(module_path))[0]
assert module_supports_command(module, "ios", "check-backup")
assert not module_supports_command(module, "ios", "check-fs")
+193
View File
@@ -0,0 +1,193 @@
from click.testing import CliRunner
from mvt.android.cli import check_bugreport
from mvt.android.cmd_check_androidqf import CmdAndroidCheckAndroidQF
from mvt.android.cmd_check_backup import CmdAndroidCheckBackup
from mvt.android.cmd_check_bugreport import CmdAndroidCheckBugreport
from mvt.android.cmd_check_intrusion_logs import CmdAndroidCheckIntrusionLogs
from mvt.common.module import MVTModule
from mvt.ios.cli import check_backup, check_fs
CUSTOM_MODULE = """
from mvt.common.module import MVTModule
class {name}(MVTModule):
supported_commands = {supported_commands!r}
slug = "{slug}"
def run(self):
self.results = [{{"message": "custom module ran"}}]
def check_indicators(self):
pass
def serialize(self, result):
return None
"""
def _write_custom_module(path, name, supported_commands, slug=None):
path.write_text(
CUSTOM_MODULE.format(
name=name,
supported_commands=supported_commands,
slug=slug or name.lower(),
),
encoding="utf-8",
)
return path
def test_load_module_appears_only_for_supported_cli_command(tmp_path):
module_path = _write_custom_module(
tmp_path / "custom.py",
"IOSBackupOnlyModule",
(("ios", "check-backup"),),
)
backup_result = CliRunner().invoke(
check_backup,
["--list-modules", "--load-module", str(module_path), str(tmp_path)],
)
fs_result = CliRunner().invoke(
check_fs,
["--list-modules", "--load-module", str(module_path), str(tmp_path)],
)
assert backup_result.exit_code == 0
assert "IOSBackupOnlyModule" in backup_result.output
assert fs_result.exit_code == 0
assert "IOSBackupOnlyModule" not in fs_result.output
def test_module_option_runs_supported_custom_module(tmp_path):
module_path = _write_custom_module(
tmp_path / "custom.py",
"CustomRunModule",
(("ios", "check-backup"),),
slug="custom_run_module",
)
output_path = tmp_path / "out"
result = CliRunner().invoke(
check_backup,
[
"--module",
"CustomRunModule",
"--load-module",
str(module_path),
"--output",
str(output_path),
str(tmp_path),
],
)
assert result.exit_code == 0
assert (output_path / "custom_run_module.json").exists()
def test_custom_modules_load_from_environment_without_cli_flag(tmp_path, monkeypatch):
custom_modules_path = tmp_path / "custom_modules"
custom_modules_path.mkdir()
_write_custom_module(
custom_modules_path / "env_module.py",
"EnvBugreportModule",
(("android", "check-bugreport"),),
)
monkeypatch.setenv("MVT_CUSTOM_MODULES", str(custom_modules_path))
result = CliRunner().invoke(check_bugreport, ["--list-modules", str(tmp_path)])
assert result.exit_code == 0
assert "EnvBugreportModule" in result.output
class NestedBugreportModule(MVTModule):
supported_commands = (("android", "check-bugreport"),)
class NestedBackupModule(MVTModule):
supported_commands = (("android", "check-backup"),)
class NestedIntrusionLogsModule(MVTModule):
supported_commands = (("android", "check-intrusion-logs"),)
class NestedAndroidQFModule(MVTModule):
supported_commands = (("android", "check-androidqf"),)
class DummyZip:
def close(self):
pass
def test_androidqf_propagates_custom_modules_to_nested_commands(tmp_path, monkeypatch):
records = {}
custom_modules = [
NestedBugreportModule,
NestedBackupModule,
NestedIntrusionLogsModule,
NestedAndroidQFModule,
]
cmd = CmdAndroidCheckAndroidQF(
target_path=str(tmp_path),
custom_modules=custom_modules,
)
def record_available(name):
def _record(command):
records[name] = [
module.__name__
for module in command._available_modules()
if module.__name__.startswith("Nested")
]
return _record
monkeypatch.setattr(cmd, "load_bugreport", lambda: DummyZip())
monkeypatch.setattr(
CmdAndroidCheckBugreport,
"from_zip",
lambda self, bugreport: None,
)
monkeypatch.setattr(
CmdAndroidCheckBugreport,
"run",
record_available("bugreport"),
)
monkeypatch.setattr(cmd, "load_backup", lambda: b"")
monkeypatch.setattr(CmdAndroidCheckBackup, "from_ab", lambda self, backup: None)
monkeypatch.setattr(
CmdAndroidCheckBackup,
"run",
record_available("backup"),
)
intrusion_logs_path = tmp_path / "intrusion_logs"
intrusion_logs_path.mkdir()
setattr(cmd, "_CmdAndroidCheckAndroidQF__format", "dir")
setattr(
cmd,
"_CmdAndroidCheckAndroidQF__files",
["androidqf/intrusion_logs/security.txt"],
)
monkeypatch.setattr(cmd, "_read_device_timezone", lambda: None)
monkeypatch.setattr(
CmdAndroidCheckIntrusionLogs,
"run",
record_available("intrusion_logs"),
)
assert cmd.run_bugreport_cmd()
assert cmd.run_backup_cmd()
assert cmd.run_intrusion_logs_cmd()
assert records == {
"bugreport": ["NestedBugreportModule"],
"backup": ["NestedBackupModule"],
"intrusion_logs": ["NestedIntrusionLogsModule"],
}