mirror of
https://github.com/mvt-project/mvt.git
synced 2026-07-03 19:47:53 +02:00
Add custom module loading (#816)
This commit is contained in:
@@ -60,6 +60,11 @@ For alternative installation options and known issues, please refer to the [docu
|
||||
|
||||
MVT provides two commands `mvt-ios` and `mvt-android`. [Check out the documentation to learn how to use them!](https://docs.mvt.re/)
|
||||
|
||||
Module-running `check-*` commands can load custom Python modules with
|
||||
`--load-module PATH` or from a folder set in `MVT_CUSTOM_MODULES`. See the
|
||||
[development documentation](https://docs.mvt.re/en/latest/development/) for
|
||||
details.
|
||||
|
||||
|
||||
## License
|
||||
|
||||
|
||||
@@ -39,6 +39,82 @@ Selecting a single module also runs its transitive dependencies. If a dependency
|
||||
is unavailable or the dependency graph contains a cycle, the command logs a
|
||||
warning and does not run any modules.
|
||||
|
||||
## Custom modules
|
||||
|
||||
Module-running `check-*` commands can load custom modules from Python files that
|
||||
are not installed as part of MVT. Load one file with:
|
||||
|
||||
```bash
|
||||
mvt-ios check-backup --load-module ./example_module.py --output ./out ./backup
|
||||
```
|
||||
|
||||
You can also load a folder. MVT loads non-hidden top-level `*.py` files in
|
||||
sorted order and skips `__init__.py`:
|
||||
|
||||
```bash
|
||||
mvt-ios check-fs --load-module ./custom_modules ./filesystem-dump
|
||||
```
|
||||
|
||||
Set `MVT_CUSTOM_MODULES` to load a folder for every module-running command. This
|
||||
folder is loaded before any `--load-module` path:
|
||||
|
||||
```bash
|
||||
MVT_CUSTOM_MODULES=./custom_modules mvt-android check-bugreport ./bugreport.zip
|
||||
```
|
||||
|
||||
Custom modules are normal `MVTModule` subclasses:
|
||||
|
||||
```python
|
||||
from mvt.common.module import MVTModule
|
||||
|
||||
|
||||
class ExampleCustomModule(MVTModule):
|
||||
supported_commands = (("ios", "check-backup"), ("ios", "check-fs"))
|
||||
slug = "example_custom_module"
|
||||
|
||||
def run(self):
|
||||
self.results = [{"message": "custom module ran"}]
|
||||
|
||||
def check_indicators(self):
|
||||
pass
|
||||
|
||||
def serialize(self, result):
|
||||
return None
|
||||
```
|
||||
|
||||
Use `supported_commands` to restrict a module to specific platform/command
|
||||
pairs. Missing or empty `supported_commands` means the module is available to
|
||||
all commands, which keeps older modules compatible. Supported pairs are:
|
||||
|
||||
```python
|
||||
("ios", "check-backup")
|
||||
("ios", "check-fs")
|
||||
("ios", "check-iocs")
|
||||
("android", "check-backup")
|
||||
("android", "check-bugreport")
|
||||
("android", "check-androidqf")
|
||||
("android", "check-intrusion-logs")
|
||||
("android", "check-iocs")
|
||||
```
|
||||
|
||||
Custom modules can depend on existing MVT module classes. Dependencies are
|
||||
resolved with the same ordering logic as built-in modules, and custom modules
|
||||
are appended after built-ins before ordering:
|
||||
|
||||
```python
|
||||
from mvt.common.module import MVTModule
|
||||
from mvt.ios.modules.backup.manifest import Manifest
|
||||
|
||||
|
||||
class DependentCustomModule(MVTModule):
|
||||
supported_commands = (("ios", "check-backup"),)
|
||||
dependencies = (Manifest,)
|
||||
|
||||
def run(self):
|
||||
manifest_results = self.get_dependency_results(Manifest)
|
||||
self.results = [{"manifest_entries": len(manifest_results)}]
|
||||
```
|
||||
|
||||
## Profiling
|
||||
|
||||
Some MVT modules extract and process significant amounts of data during the analysis process or while checking results against known indicators. Care must be
|
||||
|
||||
+69
-2
@@ -22,6 +22,7 @@ from mvt.common.help import (
|
||||
HELP_MSG_HASHES,
|
||||
HELP_MSG_IOC,
|
||||
HELP_MSG_LIST_MODULES,
|
||||
HELP_MSG_LOAD_MODULE,
|
||||
HELP_MSG_MODULE,
|
||||
HELP_MSG_NONINTERACTIVE,
|
||||
HELP_MSG_OUTPUT,
|
||||
@@ -30,6 +31,7 @@ from mvt.common.help import (
|
||||
HELP_MSG_VERSION,
|
||||
)
|
||||
from mvt.common.logo import logo
|
||||
from mvt.common.module_loader import CustomModuleLoadError, load_custom_modules
|
||||
from mvt.common.updates import IndicatorsUpdates
|
||||
from mvt.common.utils import init_logging, set_verbose_logging
|
||||
|
||||
@@ -59,6 +61,13 @@ def _get_disable_flags(ctx):
|
||||
)
|
||||
|
||||
|
||||
def _load_custom_modules(load_module):
|
||||
try:
|
||||
return load_custom_modules(load_module)
|
||||
except CustomModuleLoadError as exc:
|
||||
raise click.ClickException(str(exc)) from exc
|
||||
|
||||
|
||||
# ==============================================================================
|
||||
# Main
|
||||
# ==============================================================================
|
||||
@@ -119,11 +128,28 @@ def check_adb(ctx):
|
||||
@click.option("--output", "-o", type=click.Path(exists=False), help=HELP_MSG_OUTPUT)
|
||||
@click.option("--list-modules", "-l", is_flag=True, help=HELP_MSG_LIST_MODULES)
|
||||
@click.option("--module", "-m", help=HELP_MSG_MODULE)
|
||||
@click.option(
|
||||
"--load-module",
|
||||
type=click.Path(exists=True),
|
||||
multiple=True,
|
||||
default=[],
|
||||
help=HELP_MSG_LOAD_MODULE,
|
||||
)
|
||||
@click.option("--verbose", "-v", is_flag=True, help=HELP_MSG_VERBOSE)
|
||||
@click.argument("BUGREPORT_PATH", type=click.Path(exists=True))
|
||||
@click.pass_context
|
||||
def check_bugreport(ctx, iocs, output, list_modules, module, verbose, bugreport_path):
|
||||
def check_bugreport(
|
||||
ctx,
|
||||
iocs,
|
||||
output,
|
||||
list_modules,
|
||||
module,
|
||||
load_module,
|
||||
verbose,
|
||||
bugreport_path,
|
||||
):
|
||||
set_verbose_logging(verbose)
|
||||
custom_modules = _load_custom_modules(load_module)
|
||||
# Always generate hashes as bug reports are small.
|
||||
cmd = CmdAndroidCheckBugreport(
|
||||
target_path=bugreport_path,
|
||||
@@ -133,6 +159,7 @@ def check_bugreport(ctx, iocs, output, list_modules, module, verbose, bugreport_
|
||||
hashes=True,
|
||||
disable_version_check=_get_disable_flags(ctx)[0],
|
||||
disable_indicator_check=_get_disable_flags(ctx)[1],
|
||||
custom_modules=custom_modules,
|
||||
)
|
||||
|
||||
if list_modules:
|
||||
@@ -164,6 +191,13 @@ def check_bugreport(ctx, iocs, output, list_modules, module, verbose, bugreport_
|
||||
)
|
||||
@click.option("--output", "-o", type=click.Path(exists=False), help=HELP_MSG_OUTPUT)
|
||||
@click.option("--list-modules", "-l", is_flag=True, help=HELP_MSG_LIST_MODULES)
|
||||
@click.option(
|
||||
"--load-module",
|
||||
type=click.Path(exists=True),
|
||||
multiple=True,
|
||||
default=[],
|
||||
help=HELP_MSG_LOAD_MODULE,
|
||||
)
|
||||
@click.option("--non-interactive", "-n", is_flag=True, help=HELP_MSG_NONINTERACTIVE)
|
||||
@click.option("--backup-password", "-p", help=HELP_MSG_ANDROID_BACKUP_PASSWORD)
|
||||
@click.option("--verbose", "-v", is_flag=True, help=HELP_MSG_VERBOSE)
|
||||
@@ -174,12 +208,14 @@ def check_backup(
|
||||
iocs,
|
||||
output,
|
||||
list_modules,
|
||||
load_module,
|
||||
non_interactive,
|
||||
backup_password,
|
||||
verbose,
|
||||
backup_path,
|
||||
):
|
||||
set_verbose_logging(verbose)
|
||||
custom_modules = _load_custom_modules(load_module)
|
||||
|
||||
# Always generate hashes as backups are generally small.
|
||||
cmd = CmdAndroidCheckBackup(
|
||||
@@ -193,6 +229,7 @@ def check_backup(
|
||||
},
|
||||
disable_version_check=_get_disable_flags(ctx)[0],
|
||||
disable_indicator_check=_get_disable_flags(ctx)[1],
|
||||
custom_modules=custom_modules,
|
||||
)
|
||||
|
||||
if list_modules:
|
||||
@@ -223,6 +260,13 @@ def check_backup(
|
||||
@click.option("--output", "-o", type=click.Path(exists=False), help=HELP_MSG_OUTPUT)
|
||||
@click.option("--list-modules", "-l", is_flag=True, help=HELP_MSG_LIST_MODULES)
|
||||
@click.option("--module", "-m", help=HELP_MSG_MODULE)
|
||||
@click.option(
|
||||
"--load-module",
|
||||
type=click.Path(exists=True),
|
||||
multiple=True,
|
||||
default=[],
|
||||
help=HELP_MSG_LOAD_MODULE,
|
||||
)
|
||||
@click.option("--hashes", "-H", is_flag=True, help=HELP_MSG_HASHES)
|
||||
@click.option("--non-interactive", "-n", is_flag=True, help=HELP_MSG_NONINTERACTIVE)
|
||||
@click.option("--backup-password", "-p", help=HELP_MSG_ANDROID_BACKUP_PASSWORD)
|
||||
@@ -235,6 +279,7 @@ def check_androidqf(
|
||||
output,
|
||||
list_modules,
|
||||
module,
|
||||
load_module,
|
||||
hashes,
|
||||
non_interactive,
|
||||
backup_password,
|
||||
@@ -242,6 +287,7 @@ def check_androidqf(
|
||||
androidqf_path,
|
||||
):
|
||||
set_verbose_logging(verbose)
|
||||
custom_modules = _load_custom_modules(load_module)
|
||||
|
||||
cmd = CmdAndroidCheckAndroidQF(
|
||||
target_path=androidqf_path,
|
||||
@@ -255,6 +301,7 @@ def check_androidqf(
|
||||
},
|
||||
disable_version_check=_get_disable_flags(ctx)[0],
|
||||
disable_indicator_check=_get_disable_flags(ctx)[1],
|
||||
custom_modules=custom_modules,
|
||||
)
|
||||
|
||||
if list_modules:
|
||||
@@ -288,6 +335,13 @@ def check_androidqf(
|
||||
@click.option("--output", "-o", type=click.Path(exists=False), help=HELP_MSG_OUTPUT)
|
||||
@click.option("--list-modules", "-l", is_flag=True, help=HELP_MSG_LIST_MODULES)
|
||||
@click.option("--module", "-m", help=HELP_MSG_MODULE)
|
||||
@click.option(
|
||||
"--load-module",
|
||||
type=click.Path(exists=True),
|
||||
multiple=True,
|
||||
default=[],
|
||||
help=HELP_MSG_LOAD_MODULE,
|
||||
)
|
||||
@click.option(
|
||||
"--timezone",
|
||||
"-t",
|
||||
@@ -307,11 +361,13 @@ def check_intrusion_logs(
|
||||
output,
|
||||
list_modules,
|
||||
module,
|
||||
load_module,
|
||||
timezone,
|
||||
verbose,
|
||||
logs_path,
|
||||
):
|
||||
set_verbose_logging(verbose)
|
||||
custom_modules = _load_custom_modules(load_module)
|
||||
|
||||
module_options = {}
|
||||
if timezone:
|
||||
@@ -325,6 +381,7 @@ def check_intrusion_logs(
|
||||
module_options=module_options if module_options else None,
|
||||
disable_version_check=_get_disable_flags(ctx)[0],
|
||||
disable_indicator_check=_get_disable_flags(ctx)[1],
|
||||
custom_modules=custom_modules,
|
||||
)
|
||||
|
||||
if list_modules:
|
||||
@@ -352,15 +409,25 @@ def check_intrusion_logs(
|
||||
)
|
||||
@click.option("--list-modules", "-l", is_flag=True, help=HELP_MSG_LIST_MODULES)
|
||||
@click.option("--module", "-m", help=HELP_MSG_MODULE)
|
||||
@click.option(
|
||||
"--load-module",
|
||||
type=click.Path(exists=True),
|
||||
multiple=True,
|
||||
default=[],
|
||||
help=HELP_MSG_LOAD_MODULE,
|
||||
)
|
||||
@click.argument("FOLDER", type=click.Path(exists=True))
|
||||
@click.pass_context
|
||||
def check_iocs(ctx, iocs, list_modules, module, folder):
|
||||
def check_iocs(ctx, iocs, list_modules, module, load_module, folder):
|
||||
custom_modules = _load_custom_modules(load_module)
|
||||
cmd = CmdCheckIOCS(
|
||||
target_path=folder,
|
||||
ioc_files=iocs,
|
||||
module_name=module,
|
||||
disable_version_check=_get_disable_flags(ctx)[0],
|
||||
disable_indicator_check=_get_disable_flags(ctx)[1],
|
||||
custom_modules=custom_modules,
|
||||
platform="android",
|
||||
)
|
||||
cmd.modules = (
|
||||
BACKUP_MODULES + BUGREPORT_MODULES + ANDROIDQF_MODULES + INTRUSION_LOGS_MODULES
|
||||
|
||||
@@ -17,6 +17,7 @@ from mvt.android.cmd_check_backup import CmdAndroidCheckBackup, InvalidAndroidBa
|
||||
from mvt.android.cmd_check_bugreport import CmdAndroidCheckBugreport
|
||||
from mvt.common.command import Command
|
||||
from mvt.common.indicators import Indicators
|
||||
from mvt.common.module import MVTModule
|
||||
|
||||
from .modules.androidqf import ANDROIDQF_MODULES
|
||||
from .modules.androidqf.base import AndroidQFModule
|
||||
@@ -50,6 +51,7 @@ class CmdAndroidCheckAndroidQF(Command):
|
||||
sub_command: Optional[bool] = False,
|
||||
disable_version_check: bool = False,
|
||||
disable_indicator_check: bool = False,
|
||||
custom_modules: Optional[list[type[MVTModule]]] = None,
|
||||
) -> None:
|
||||
super().__init__(
|
||||
target_path=target_path,
|
||||
@@ -64,8 +66,10 @@ class CmdAndroidCheckAndroidQF(Command):
|
||||
log=log,
|
||||
disable_version_check=disable_version_check,
|
||||
disable_indicator_check=disable_indicator_check,
|
||||
custom_modules=custom_modules,
|
||||
)
|
||||
|
||||
self.platform = "android"
|
||||
self.name = "check-androidqf"
|
||||
self.modules = ANDROIDQF_MODULES
|
||||
|
||||
@@ -210,6 +214,7 @@ class CmdAndroidCheckAndroidQF(Command):
|
||||
module_options=self.module_options,
|
||||
hashes=self.hashes,
|
||||
sub_command=True,
|
||||
custom_modules=self.custom_modules,
|
||||
)
|
||||
cmd.from_zip(bugreport)
|
||||
cmd.run()
|
||||
@@ -239,6 +244,7 @@ class CmdAndroidCheckAndroidQF(Command):
|
||||
module_options=self.module_options,
|
||||
hashes=self.hashes,
|
||||
sub_command=True,
|
||||
custom_modules=self.custom_modules,
|
||||
)
|
||||
try:
|
||||
cmd.from_ab(backup)
|
||||
@@ -318,6 +324,7 @@ class CmdAndroidCheckAndroidQF(Command):
|
||||
module_options=adv_module_options,
|
||||
hashes=self.hashes,
|
||||
sub_command=True,
|
||||
custom_modules=self.custom_modules,
|
||||
)
|
||||
cmd.run()
|
||||
|
||||
|
||||
@@ -21,6 +21,7 @@ from mvt.android.parsers.backup import (
|
||||
)
|
||||
from mvt.common.command import Command
|
||||
from mvt.common.indicators import Indicators
|
||||
from mvt.common.module import MVTModule
|
||||
|
||||
from .modules.backup import BACKUP_MODULES
|
||||
|
||||
@@ -45,6 +46,7 @@ class CmdAndroidCheckBackup(Command):
|
||||
sub_command: Optional[bool] = False,
|
||||
disable_version_check: bool = False,
|
||||
disable_indicator_check: bool = False,
|
||||
custom_modules: Optional[list[type[MVTModule]]] = None,
|
||||
) -> None:
|
||||
super().__init__(
|
||||
target_path=target_path,
|
||||
@@ -59,8 +61,10 @@ class CmdAndroidCheckBackup(Command):
|
||||
log=log,
|
||||
disable_version_check=disable_version_check,
|
||||
disable_indicator_check=disable_indicator_check,
|
||||
custom_modules=custom_modules,
|
||||
)
|
||||
|
||||
self.platform = "android"
|
||||
self.name = "check-backup"
|
||||
self.modules = BACKUP_MODULES
|
||||
|
||||
|
||||
@@ -12,6 +12,7 @@ from zipfile import ZipFile
|
||||
from mvt.android.modules.bugreport.base import BugReportModule
|
||||
from mvt.common.command import Command
|
||||
from mvt.common.indicators import Indicators
|
||||
from mvt.common.module import MVTModule
|
||||
|
||||
from .modules.bugreport import BUGREPORT_MODULES
|
||||
|
||||
@@ -32,6 +33,7 @@ class CmdAndroidCheckBugreport(Command):
|
||||
sub_command: Optional[bool] = False,
|
||||
disable_version_check: bool = False,
|
||||
disable_indicator_check: bool = False,
|
||||
custom_modules: Optional[list[type[MVTModule]]] = None,
|
||||
) -> None:
|
||||
super().__init__(
|
||||
target_path=target_path,
|
||||
@@ -46,8 +48,10 @@ class CmdAndroidCheckBugreport(Command):
|
||||
log=log,
|
||||
disable_version_check=disable_version_check,
|
||||
disable_indicator_check=disable_indicator_check,
|
||||
custom_modules=custom_modules,
|
||||
)
|
||||
|
||||
self.platform = "android"
|
||||
self.name = "check-bugreport"
|
||||
self.modules = BUGREPORT_MODULES
|
||||
|
||||
|
||||
@@ -9,6 +9,7 @@ from typing import Optional
|
||||
|
||||
from mvt.common.command import Command
|
||||
from mvt.common.indicators import Indicators
|
||||
from mvt.common.module import MVTModule
|
||||
|
||||
from .modules.intrusion_logs import (
|
||||
INTRUSION_LOGS_MODULES,
|
||||
@@ -35,6 +36,7 @@ class CmdAndroidCheckIntrusionLogs(Command):
|
||||
sub_command: Optional[bool] = False,
|
||||
disable_version_check: bool = False,
|
||||
disable_indicator_check: bool = False,
|
||||
custom_modules: Optional[list[type[MVTModule]]] = None,
|
||||
) -> None:
|
||||
super().__init__(
|
||||
target_path=target_path,
|
||||
@@ -49,8 +51,10 @@ class CmdAndroidCheckIntrusionLogs(Command):
|
||||
log=log,
|
||||
disable_version_check=disable_version_check,
|
||||
disable_indicator_check=disable_indicator_check,
|
||||
custom_modules=custom_modules,
|
||||
)
|
||||
|
||||
self.platform = "android"
|
||||
self.name = "check-intrusion-logs"
|
||||
self.modules = INTRUSION_LOGS_MODULES
|
||||
self._all_events: dict[str, list[dict]] = {}
|
||||
|
||||
@@ -8,6 +8,7 @@ import os
|
||||
from typing import Optional
|
||||
|
||||
from mvt.common.command import Command
|
||||
from mvt.common.module import MVTModule
|
||||
from mvt.common.utils import exec_or_profile
|
||||
|
||||
log = logging.getLogger(__name__)
|
||||
@@ -26,6 +27,8 @@ class CmdCheckIOCS(Command):
|
||||
sub_command: Optional[bool] = False,
|
||||
disable_version_check: bool = False,
|
||||
disable_indicator_check: bool = False,
|
||||
custom_modules: Optional[list[type[MVTModule]]] = None,
|
||||
platform: str = "",
|
||||
) -> None:
|
||||
super().__init__(
|
||||
target_path=target_path,
|
||||
@@ -39,14 +42,16 @@ class CmdCheckIOCS(Command):
|
||||
log=log,
|
||||
disable_version_check=disable_version_check,
|
||||
disable_indicator_check=disable_indicator_check,
|
||||
custom_modules=custom_modules,
|
||||
)
|
||||
|
||||
self.platform = platform
|
||||
self.name = "check-iocs"
|
||||
|
||||
def run(self) -> None:
|
||||
assert self.target_path is not None
|
||||
all_modules = []
|
||||
for entry in self.modules:
|
||||
for entry in self._available_modules():
|
||||
if entry not in all_modules:
|
||||
all_modules.append(entry)
|
||||
|
||||
|
||||
@@ -19,6 +19,7 @@ from .alerts import AlertLevel, AlertStore
|
||||
from .config import settings
|
||||
from .indicators import Indicators
|
||||
from .module import EncryptedBackupError, MVTModule, run_module, save_timeline
|
||||
from .module_loader import module_supports_command
|
||||
from .module_types import ModuleTimeline
|
||||
from .utils import (
|
||||
CustomJSONEncoder,
|
||||
@@ -44,9 +45,12 @@ class Command:
|
||||
log: logging.Logger = logging.getLogger(__name__),
|
||||
disable_version_check: bool = False,
|
||||
disable_indicator_check: bool = False,
|
||||
custom_modules: Optional[list[type[MVTModule]]] = None,
|
||||
) -> None:
|
||||
self.name = ""
|
||||
self.platform = ""
|
||||
self.modules: list[type[MVTModule]] = []
|
||||
self.custom_modules = custom_modules if custom_modules else []
|
||||
|
||||
self.target_path = target_path
|
||||
self.results_path = results_path
|
||||
@@ -199,9 +203,24 @@ class Command:
|
||||
|
||||
def list_modules(self) -> None:
|
||||
self.log.info("Following is the list of available %s modules:", self.name)
|
||||
for module in self.modules:
|
||||
for module in self._available_modules():
|
||||
self.log.info(" - %s", module.__name__)
|
||||
|
||||
def _available_modules(self) -> list[type[MVTModule]]:
|
||||
modules = list(self.modules)
|
||||
modules.extend(
|
||||
module
|
||||
for module in self.custom_modules
|
||||
if module_supports_command(module, self.platform, self.name)
|
||||
)
|
||||
|
||||
deduplicated = []
|
||||
for module in modules:
|
||||
if module not in deduplicated:
|
||||
deduplicated.append(module)
|
||||
|
||||
return deduplicated
|
||||
|
||||
def init(self) -> None:
|
||||
raise NotImplementedError
|
||||
|
||||
@@ -262,14 +281,15 @@ class Command:
|
||||
|
||||
def _ordered_modules(self) -> Optional[list[type[MVTModule]]]:
|
||||
"""Return enabled modules in stable topological order."""
|
||||
module_indexes = {module: index for index, module in enumerate(self.modules)}
|
||||
modules = self._available_modules()
|
||||
module_indexes = {module: index for index, module in enumerate(modules)}
|
||||
|
||||
if self.module_name:
|
||||
selected = [
|
||||
module for module in self.modules if module.__name__ == self.module_name
|
||||
module for module in modules if module.__name__ == self.module_name
|
||||
]
|
||||
else:
|
||||
selected = [module for module in self.modules if module.enabled]
|
||||
selected = [module for module in modules if module.enabled]
|
||||
|
||||
required = set(selected)
|
||||
pending = list(selected)
|
||||
|
||||
@@ -10,6 +10,10 @@ HELP_MSG_IOC = "Path to indicators file (can be invoked multiple time)"
|
||||
HELP_MSG_FAST = "Avoid running time/resource consuming features"
|
||||
HELP_MSG_LIST_MODULES = "Print list of available modules and exit"
|
||||
HELP_MSG_MODULE = "Name of a single module you would like to run instead of all"
|
||||
HELP_MSG_LOAD_MODULE = (
|
||||
"Load custom MVT module(s) from a Python file or folder "
|
||||
"(can be invoked multiple times)"
|
||||
)
|
||||
HELP_MSG_NONINTERACTIVE = "Don't ask interactive questions during processing"
|
||||
HELP_MSG_HASHES = "Generate hashes of all the files analyzed"
|
||||
HELP_MSG_VERBOSE = "Verbose mode"
|
||||
|
||||
@@ -44,6 +44,7 @@ class MVTModule:
|
||||
enabled: bool = True
|
||||
slug: Optional[str] = None
|
||||
dependencies: Sequence[type["MVTModule"]] = ()
|
||||
supported_commands: Sequence[tuple[str, str]] = ()
|
||||
|
||||
def __init__(
|
||||
self,
|
||||
|
||||
@@ -0,0 +1,127 @@
|
||||
# Mobile Verification Toolkit (MVT)
|
||||
# Copyright (c) 2021-2026 The MVT Authors.
|
||||
# Use of this software is governed by the MVT License 1.1 that can be found at
|
||||
# https://license.mvt.re/1.1/
|
||||
|
||||
import hashlib
|
||||
import importlib.util
|
||||
import inspect
|
||||
import os
|
||||
import sys
|
||||
from pathlib import Path
|
||||
from types import ModuleType
|
||||
from typing import Iterable, Optional
|
||||
|
||||
from .module import MVTModule
|
||||
|
||||
MVT_CUSTOM_MODULES_ENV = "MVT_CUSTOM_MODULES"
|
||||
|
||||
|
||||
class CustomModuleLoadError(Exception):
|
||||
pass
|
||||
|
||||
|
||||
def _module_name_for_path(path: Path) -> str:
|
||||
digest = hashlib.sha256(str(path).encode("utf-8")).hexdigest()[:16]
|
||||
return f"_mvt_custom_module_{path.stem}_{digest}"
|
||||
|
||||
|
||||
def _iter_module_files(path: Path) -> Iterable[Path]:
|
||||
if path.is_file():
|
||||
if path.suffix != ".py":
|
||||
raise CustomModuleLoadError(f"Custom module file is not a Python file: {path}")
|
||||
yield path
|
||||
return
|
||||
|
||||
if path.is_dir():
|
||||
for child in sorted(path.iterdir()):
|
||||
if child.name.startswith("."):
|
||||
continue
|
||||
if child.name == "__init__.py":
|
||||
continue
|
||||
if child.is_file() and child.suffix == ".py":
|
||||
yield child
|
||||
return
|
||||
|
||||
raise CustomModuleLoadError(f"Custom module path does not exist: {path}")
|
||||
|
||||
|
||||
def _load_python_file(path: Path) -> ModuleType:
|
||||
module_name = _module_name_for_path(path)
|
||||
spec = importlib.util.spec_from_file_location(module_name, path)
|
||||
if spec is None or spec.loader is None:
|
||||
raise CustomModuleLoadError(f"Unable to load custom module file: {path}")
|
||||
|
||||
module = importlib.util.module_from_spec(spec)
|
||||
sys.modules[module_name] = module
|
||||
try:
|
||||
spec.loader.exec_module(module)
|
||||
except Exception as exc:
|
||||
raise CustomModuleLoadError(f"Unable to import custom module {path}: {exc}") from exc
|
||||
|
||||
return module
|
||||
|
||||
|
||||
def discover_mvt_modules(module: ModuleType) -> list[type[MVTModule]]:
|
||||
modules = []
|
||||
for _, obj in inspect.getmembers(module, inspect.isclass):
|
||||
if obj is MVTModule:
|
||||
continue
|
||||
if obj.__module__ != module.__name__:
|
||||
continue
|
||||
if not issubclass(obj, MVTModule):
|
||||
continue
|
||||
modules.append(obj)
|
||||
|
||||
return modules
|
||||
|
||||
|
||||
def load_custom_modules_from_path(path: str) -> list[type[MVTModule]]:
|
||||
custom_modules: list[type[MVTModule]] = []
|
||||
seen: set[tuple[str, str]] = set()
|
||||
resolved_path = Path(path).expanduser().resolve()
|
||||
|
||||
for module_file in _iter_module_files(resolved_path):
|
||||
loaded_module = _load_python_file(module_file)
|
||||
for module_class in discover_mvt_modules(loaded_module):
|
||||
key = (str(module_file), module_class.__qualname__)
|
||||
if key in seen:
|
||||
continue
|
||||
seen.add(key)
|
||||
custom_modules.append(module_class)
|
||||
|
||||
return custom_modules
|
||||
|
||||
|
||||
def load_custom_modules(paths: Optional[Iterable[str]] = None) -> list[type[MVTModule]]:
|
||||
search_paths: list[str] = []
|
||||
env_path = os.environ.get(MVT_CUSTOM_MODULES_ENV)
|
||||
if env_path:
|
||||
search_paths.append(env_path)
|
||||
if paths:
|
||||
search_paths.extend(paths)
|
||||
|
||||
custom_modules: list[type[MVTModule]] = []
|
||||
seen: set[tuple[str, str]] = set()
|
||||
for path in search_paths:
|
||||
for module_class in load_custom_modules_from_path(path):
|
||||
source = Path(inspect.getfile(module_class)).resolve()
|
||||
key = (str(source), module_class.__qualname__)
|
||||
if key in seen:
|
||||
continue
|
||||
seen.add(key)
|
||||
custom_modules.append(module_class)
|
||||
|
||||
return custom_modules
|
||||
|
||||
|
||||
def module_supports_command(
|
||||
module_class: type[MVTModule],
|
||||
platform: str,
|
||||
command: str,
|
||||
) -> bool:
|
||||
supported_commands = getattr(module_class, "supported_commands", None)
|
||||
if not supported_commands:
|
||||
return True
|
||||
|
||||
return (platform, command) in {tuple(entry) for entry in supported_commands}
|
||||
+60
-3
@@ -31,6 +31,7 @@ from mvt.common.help import (
|
||||
HELP_MSG_OUTPUT,
|
||||
HELP_MSG_FAST,
|
||||
HELP_MSG_LIST_MODULES,
|
||||
HELP_MSG_LOAD_MODULE,
|
||||
HELP_MSG_MODULE,
|
||||
HELP_MSG_VERBOSE,
|
||||
HELP_MSG_CHECK_FS,
|
||||
@@ -40,6 +41,7 @@ from mvt.common.help import (
|
||||
HELP_MSG_DISABLE_UPDATE_CHECK,
|
||||
HELP_MSG_DISABLE_INDICATOR_UPDATE_CHECK,
|
||||
)
|
||||
from mvt.common.module_loader import CustomModuleLoadError, load_custom_modules
|
||||
from .cmd_check_backup import CmdIOSCheckBackup
|
||||
from .cmd_check_fs import CmdIOSCheckFS
|
||||
from .decrypt import DecryptBackup
|
||||
@@ -65,6 +67,13 @@ def _get_disable_flags(ctx):
|
||||
)
|
||||
|
||||
|
||||
def _load_custom_modules(load_module):
|
||||
try:
|
||||
return load_custom_modules(load_module)
|
||||
except CustomModuleLoadError as exc:
|
||||
raise click.ClickException(str(exc)) from exc
|
||||
|
||||
|
||||
# ==============================================================================
|
||||
# Main
|
||||
# ==============================================================================
|
||||
@@ -229,15 +238,32 @@ def extract_key(password, key_file, backup_path):
|
||||
@click.option("--fast", "-f", is_flag=True, help=HELP_MSG_FAST)
|
||||
@click.option("--list-modules", "-l", is_flag=True, help=HELP_MSG_LIST_MODULES)
|
||||
@click.option("--module", "-m", help=HELP_MSG_MODULE)
|
||||
@click.option(
|
||||
"--load-module",
|
||||
type=click.Path(exists=True),
|
||||
multiple=True,
|
||||
default=[],
|
||||
help=HELP_MSG_LOAD_MODULE,
|
||||
)
|
||||
@click.option("--hashes", "-H", is_flag=True, help=HELP_MSG_HASHES)
|
||||
@click.option("--verbose", "-v", is_flag=True, help=HELP_MSG_VERBOSE)
|
||||
@click.argument("BACKUP_PATH", type=click.Path(exists=True))
|
||||
@click.pass_context
|
||||
def check_backup(
|
||||
ctx, iocs, output, fast, list_modules, module, hashes, verbose, backup_path
|
||||
ctx,
|
||||
iocs,
|
||||
output,
|
||||
fast,
|
||||
list_modules,
|
||||
module,
|
||||
load_module,
|
||||
hashes,
|
||||
verbose,
|
||||
backup_path,
|
||||
):
|
||||
set_verbose_logging(verbose)
|
||||
module_options = {"fast_mode": fast}
|
||||
custom_modules = _load_custom_modules(load_module)
|
||||
|
||||
cmd = CmdIOSCheckBackup(
|
||||
target_path=backup_path,
|
||||
@@ -248,6 +274,7 @@ def check_backup(
|
||||
hashes=hashes,
|
||||
disable_version_check=_get_disable_flags(ctx)[0],
|
||||
disable_indicator_check=_get_disable_flags(ctx)[1],
|
||||
custom_modules=custom_modules,
|
||||
)
|
||||
|
||||
if list_modules:
|
||||
@@ -277,13 +304,32 @@ def check_backup(
|
||||
@click.option("--fast", "-f", is_flag=True, help=HELP_MSG_FAST)
|
||||
@click.option("--list-modules", "-l", is_flag=True, help=HELP_MSG_LIST_MODULES)
|
||||
@click.option("--module", "-m", help=HELP_MSG_MODULE)
|
||||
@click.option(
|
||||
"--load-module",
|
||||
type=click.Path(exists=True),
|
||||
multiple=True,
|
||||
default=[],
|
||||
help=HELP_MSG_LOAD_MODULE,
|
||||
)
|
||||
@click.option("--hashes", "-H", is_flag=True, help=HELP_MSG_HASHES)
|
||||
@click.option("--verbose", "-v", is_flag=True, help=HELP_MSG_VERBOSE)
|
||||
@click.argument("DUMP_PATH", type=click.Path(exists=True))
|
||||
@click.pass_context
|
||||
def check_fs(ctx, iocs, output, fast, list_modules, module, hashes, verbose, dump_path):
|
||||
def check_fs(
|
||||
ctx,
|
||||
iocs,
|
||||
output,
|
||||
fast,
|
||||
list_modules,
|
||||
module,
|
||||
load_module,
|
||||
hashes,
|
||||
verbose,
|
||||
dump_path,
|
||||
):
|
||||
set_verbose_logging(verbose)
|
||||
module_options = {"fast_mode": fast}
|
||||
custom_modules = _load_custom_modules(load_module)
|
||||
|
||||
cmd = CmdIOSCheckFS(
|
||||
target_path=dump_path,
|
||||
@@ -294,6 +340,7 @@ def check_fs(ctx, iocs, output, fast, list_modules, module, hashes, verbose, dum
|
||||
hashes=hashes,
|
||||
disable_version_check=_get_disable_flags(ctx)[0],
|
||||
disable_indicator_check=_get_disable_flags(ctx)[1],
|
||||
custom_modules=custom_modules,
|
||||
)
|
||||
|
||||
if list_modules:
|
||||
@@ -321,15 +368,25 @@ def check_fs(ctx, iocs, output, fast, list_modules, module, hashes, verbose, dum
|
||||
)
|
||||
@click.option("--list-modules", "-l", is_flag=True, help=HELP_MSG_LIST_MODULES)
|
||||
@click.option("--module", "-m", help=HELP_MSG_MODULE)
|
||||
@click.option(
|
||||
"--load-module",
|
||||
type=click.Path(exists=True),
|
||||
multiple=True,
|
||||
default=[],
|
||||
help=HELP_MSG_LOAD_MODULE,
|
||||
)
|
||||
@click.argument("FOLDER", type=click.Path(exists=True))
|
||||
@click.pass_context
|
||||
def check_iocs(ctx, iocs, list_modules, module, folder):
|
||||
def check_iocs(ctx, iocs, list_modules, module, load_module, folder):
|
||||
custom_modules = _load_custom_modules(load_module)
|
||||
cmd = CmdCheckIOCS(
|
||||
target_path=folder,
|
||||
ioc_files=iocs,
|
||||
module_name=module,
|
||||
disable_version_check=_get_disable_flags(ctx)[0],
|
||||
disable_indicator_check=_get_disable_flags(ctx)[1],
|
||||
custom_modules=custom_modules,
|
||||
platform="ios",
|
||||
)
|
||||
cmd.modules = BACKUP_MODULES + FS_MODULES + MIXED_MODULES
|
||||
|
||||
|
||||
@@ -8,6 +8,7 @@ from typing import Optional
|
||||
|
||||
from mvt.common.command import Command
|
||||
from mvt.common.indicators import Indicators
|
||||
from mvt.common.module import MVTModule
|
||||
|
||||
from .modules.backup import BACKUP_MODULES
|
||||
from .modules.mixed import MIXED_MODULES
|
||||
@@ -29,6 +30,7 @@ class CmdIOSCheckBackup(Command):
|
||||
sub_command: bool = False,
|
||||
disable_version_check: bool = False,
|
||||
disable_indicator_check: bool = False,
|
||||
custom_modules: Optional[list[type[MVTModule]]] = None,
|
||||
) -> None:
|
||||
super().__init__(
|
||||
target_path=target_path,
|
||||
@@ -43,8 +45,10 @@ class CmdIOSCheckBackup(Command):
|
||||
log=log,
|
||||
disable_version_check=disable_version_check,
|
||||
disable_indicator_check=disable_indicator_check,
|
||||
custom_modules=custom_modules,
|
||||
)
|
||||
|
||||
self.platform = "ios"
|
||||
self.name = "check-backup"
|
||||
self.modules = BACKUP_MODULES + MIXED_MODULES
|
||||
|
||||
|
||||
@@ -8,6 +8,7 @@ from typing import Optional
|
||||
|
||||
from mvt.common.command import Command
|
||||
from mvt.common.indicators import Indicators
|
||||
from mvt.common.module import MVTModule
|
||||
|
||||
from .modules.fs import FS_MODULES
|
||||
from .modules.mixed import MIXED_MODULES
|
||||
@@ -29,6 +30,7 @@ class CmdIOSCheckFS(Command):
|
||||
sub_command: bool = False,
|
||||
disable_version_check: bool = False,
|
||||
disable_indicator_check: bool = False,
|
||||
custom_modules: Optional[list[type[MVTModule]]] = None,
|
||||
) -> None:
|
||||
super().__init__(
|
||||
target_path=target_path,
|
||||
@@ -42,8 +44,10 @@ class CmdIOSCheckFS(Command):
|
||||
log=log,
|
||||
disable_version_check=disable_version_check,
|
||||
disable_indicator_check=disable_indicator_check,
|
||||
custom_modules=custom_modules,
|
||||
)
|
||||
|
||||
self.platform = "ios"
|
||||
self.name = "check-fs"
|
||||
self.modules = FS_MODULES + MIXED_MODULES
|
||||
|
||||
|
||||
@@ -42,6 +42,19 @@ class IndependentModule(RecordingModule):
|
||||
pass
|
||||
|
||||
|
||||
class CustomIOSBackupModule(RecordingModule):
|
||||
supported_commands = (("ios", "check-backup"),)
|
||||
|
||||
|
||||
class CustomIOSFSModule(RecordingModule):
|
||||
supported_commands = (("ios", "check-fs"),)
|
||||
|
||||
|
||||
class CustomDependsOnBuiltin(RecordingModule):
|
||||
supported_commands = (("ios", "check-backup"),)
|
||||
dependencies = (FirstModule,)
|
||||
|
||||
|
||||
class RecordingCommand(Command):
|
||||
def init(self):
|
||||
self.initialized = True
|
||||
@@ -132,3 +145,46 @@ class TestCommand:
|
||||
assert RecordingModule.run_order == []
|
||||
assert not hasattr(cmd, "initialized")
|
||||
assert "depends on unavailable module UnavailableModule" in caplog.text
|
||||
|
||||
def test_custom_modules_are_filtered_before_ordering(self):
|
||||
cmd = RecordingCommand()
|
||||
cmd.platform = "ios"
|
||||
cmd.name = "check-backup"
|
||||
cmd.modules = [FirstModule]
|
||||
cmd.custom_modules = [CustomIOSBackupModule, CustomIOSFSModule]
|
||||
|
||||
assert [module.__name__ for module in cmd._ordered_modules()] == [
|
||||
"FirstModule",
|
||||
"CustomIOSBackupModule",
|
||||
]
|
||||
|
||||
def test_selected_custom_module_runs(self):
|
||||
cmd = RecordingCommand(module_name="CustomIOSBackupModule")
|
||||
cmd.platform = "ios"
|
||||
cmd.name = "check-backup"
|
||||
cmd.custom_modules = [CustomIOSBackupModule]
|
||||
|
||||
cmd.run()
|
||||
|
||||
assert RecordingModule.run_order == ["CustomIOSBackupModule"]
|
||||
|
||||
def test_selected_unsupported_custom_module_does_not_run(self):
|
||||
cmd = RecordingCommand(module_name="CustomIOSFSModule")
|
||||
cmd.platform = "ios"
|
||||
cmd.name = "check-backup"
|
||||
cmd.custom_modules = [CustomIOSFSModule]
|
||||
|
||||
cmd.run()
|
||||
|
||||
assert RecordingModule.run_order == []
|
||||
|
||||
def test_custom_module_dependencies_use_topological_order(self):
|
||||
cmd = RecordingCommand(module_name="CustomDependsOnBuiltin")
|
||||
cmd.platform = "ios"
|
||||
cmd.name = "check-backup"
|
||||
cmd.modules = [SecondModule, FirstModule]
|
||||
cmd.custom_modules = [CustomDependsOnBuiltin]
|
||||
|
||||
cmd.run()
|
||||
|
||||
assert RecordingModule.run_order == ["FirstModule", "CustomDependsOnBuiltin"]
|
||||
|
||||
@@ -0,0 +1,145 @@
|
||||
import pytest
|
||||
|
||||
from mvt.common.module import MVTModule
|
||||
from mvt.common.module_loader import (
|
||||
CustomModuleLoadError,
|
||||
load_custom_modules,
|
||||
load_custom_modules_from_path,
|
||||
module_supports_command,
|
||||
)
|
||||
|
||||
|
||||
MODULE_TEMPLATE = """
|
||||
from mvt.common.module import MVTModule
|
||||
|
||||
|
||||
class {name}(MVTModule):
|
||||
supported_commands = {supported_commands!r}
|
||||
|
||||
def run(self):
|
||||
pass
|
||||
|
||||
def check_indicators(self):
|
||||
pass
|
||||
|
||||
def serialize(self, result):
|
||||
return None
|
||||
"""
|
||||
|
||||
|
||||
def _write_module(path, name, supported_commands=()):
|
||||
path.write_text(
|
||||
MODULE_TEMPLATE.format(
|
||||
name=name,
|
||||
supported_commands=supported_commands,
|
||||
),
|
||||
encoding="utf-8",
|
||||
)
|
||||
return path
|
||||
|
||||
|
||||
def test_load_custom_modules_from_python_file(tmp_path):
|
||||
module_path = _write_module(tmp_path / "custom.py", "FileModule")
|
||||
|
||||
modules = load_custom_modules_from_path(str(module_path))
|
||||
|
||||
assert [module.__name__ for module in modules] == ["FileModule"]
|
||||
assert issubclass(modules[0], MVTModule)
|
||||
|
||||
|
||||
def test_load_custom_modules_from_folder_in_sorted_order(tmp_path):
|
||||
_write_module(tmp_path / "b_module.py", "BModule")
|
||||
_write_module(tmp_path / "a_module.py", "AModule")
|
||||
_write_module(tmp_path / ".hidden.py", "HiddenModule")
|
||||
_write_module(tmp_path / "__init__.py", "InitModule")
|
||||
nested = tmp_path / "nested"
|
||||
nested.mkdir()
|
||||
_write_module(nested / "nested_module.py", "NestedModule")
|
||||
|
||||
modules = load_custom_modules_from_path(str(tmp_path))
|
||||
|
||||
assert [module.__name__ for module in modules] == ["AModule", "BModule"]
|
||||
|
||||
|
||||
def test_discovery_ignores_imported_base_and_unrelated_classes(tmp_path):
|
||||
module_path = tmp_path / "custom.py"
|
||||
module_path.write_text(
|
||||
"""
|
||||
from mvt.common.module import MVTModule
|
||||
|
||||
|
||||
class Unrelated:
|
||||
pass
|
||||
|
||||
|
||||
class DiscoveredModule(MVTModule):
|
||||
def run(self):
|
||||
pass
|
||||
|
||||
def check_indicators(self):
|
||||
pass
|
||||
|
||||
def serialize(self, result):
|
||||
return None
|
||||
""",
|
||||
encoding="utf-8",
|
||||
)
|
||||
|
||||
modules = load_custom_modules_from_path(str(module_path))
|
||||
|
||||
assert [module.__name__ for module in modules] == ["DiscoveredModule"]
|
||||
|
||||
|
||||
def test_load_custom_modules_deduplicates_same_class(tmp_path):
|
||||
module_path = _write_module(tmp_path / "custom.py", "DuplicateModule")
|
||||
|
||||
modules = load_custom_modules([str(module_path), str(module_path)])
|
||||
|
||||
assert [module.__name__ for module in modules] == ["DuplicateModule"]
|
||||
|
||||
|
||||
def test_load_custom_modules_raises_for_missing_path(tmp_path):
|
||||
with pytest.raises(CustomModuleLoadError, match="does not exist"):
|
||||
load_custom_modules_from_path(str(tmp_path / "missing.py"))
|
||||
|
||||
|
||||
def test_load_custom_modules_raises_for_import_error(tmp_path):
|
||||
module_path = tmp_path / "broken.py"
|
||||
module_path.write_text("raise RuntimeError('broken import')", encoding="utf-8")
|
||||
|
||||
with pytest.raises(CustomModuleLoadError, match="broken import"):
|
||||
load_custom_modules_from_path(str(module_path))
|
||||
|
||||
|
||||
def test_load_custom_modules_loads_env_folder_first(tmp_path, monkeypatch):
|
||||
env_folder = tmp_path / "env"
|
||||
env_folder.mkdir()
|
||||
cli_folder = tmp_path / "cli"
|
||||
cli_folder.mkdir()
|
||||
_write_module(env_folder / "env_module.py", "EnvModule")
|
||||
_write_module(cli_folder / "cli_module.py", "CliModule")
|
||||
monkeypatch.setenv("MVT_CUSTOM_MODULES", str(env_folder))
|
||||
|
||||
modules = load_custom_modules([str(cli_folder)])
|
||||
|
||||
assert [module.__name__ for module in modules] == ["EnvModule", "CliModule"]
|
||||
|
||||
|
||||
def test_module_supports_command_defaults_to_all_commands(tmp_path):
|
||||
module_path = _write_module(tmp_path / "custom.py", "DefaultModule")
|
||||
module = load_custom_modules_from_path(str(module_path))[0]
|
||||
|
||||
assert module_supports_command(module, "ios", "check-backup")
|
||||
assert module_supports_command(module, "android", "check-bugreport")
|
||||
|
||||
|
||||
def test_module_supports_command_honors_supported_commands(tmp_path):
|
||||
module_path = _write_module(
|
||||
tmp_path / "custom.py",
|
||||
"SpecificModule",
|
||||
(("ios", "check-backup"),),
|
||||
)
|
||||
module = load_custom_modules_from_path(str(module_path))[0]
|
||||
|
||||
assert module_supports_command(module, "ios", "check-backup")
|
||||
assert not module_supports_command(module, "ios", "check-fs")
|
||||
@@ -0,0 +1,193 @@
|
||||
from click.testing import CliRunner
|
||||
|
||||
from mvt.android.cli import check_bugreport
|
||||
from mvt.android.cmd_check_androidqf import CmdAndroidCheckAndroidQF
|
||||
from mvt.android.cmd_check_backup import CmdAndroidCheckBackup
|
||||
from mvt.android.cmd_check_bugreport import CmdAndroidCheckBugreport
|
||||
from mvt.android.cmd_check_intrusion_logs import CmdAndroidCheckIntrusionLogs
|
||||
from mvt.common.module import MVTModule
|
||||
from mvt.ios.cli import check_backup, check_fs
|
||||
|
||||
|
||||
CUSTOM_MODULE = """
|
||||
from mvt.common.module import MVTModule
|
||||
|
||||
|
||||
class {name}(MVTModule):
|
||||
supported_commands = {supported_commands!r}
|
||||
slug = "{slug}"
|
||||
|
||||
def run(self):
|
||||
self.results = [{{"message": "custom module ran"}}]
|
||||
|
||||
def check_indicators(self):
|
||||
pass
|
||||
|
||||
def serialize(self, result):
|
||||
return None
|
||||
"""
|
||||
|
||||
|
||||
def _write_custom_module(path, name, supported_commands, slug=None):
|
||||
path.write_text(
|
||||
CUSTOM_MODULE.format(
|
||||
name=name,
|
||||
supported_commands=supported_commands,
|
||||
slug=slug or name.lower(),
|
||||
),
|
||||
encoding="utf-8",
|
||||
)
|
||||
return path
|
||||
|
||||
|
||||
def test_load_module_appears_only_for_supported_cli_command(tmp_path):
|
||||
module_path = _write_custom_module(
|
||||
tmp_path / "custom.py",
|
||||
"IOSBackupOnlyModule",
|
||||
(("ios", "check-backup"),),
|
||||
)
|
||||
|
||||
backup_result = CliRunner().invoke(
|
||||
check_backup,
|
||||
["--list-modules", "--load-module", str(module_path), str(tmp_path)],
|
||||
)
|
||||
fs_result = CliRunner().invoke(
|
||||
check_fs,
|
||||
["--list-modules", "--load-module", str(module_path), str(tmp_path)],
|
||||
)
|
||||
|
||||
assert backup_result.exit_code == 0
|
||||
assert "IOSBackupOnlyModule" in backup_result.output
|
||||
assert fs_result.exit_code == 0
|
||||
assert "IOSBackupOnlyModule" not in fs_result.output
|
||||
|
||||
|
||||
def test_module_option_runs_supported_custom_module(tmp_path):
|
||||
module_path = _write_custom_module(
|
||||
tmp_path / "custom.py",
|
||||
"CustomRunModule",
|
||||
(("ios", "check-backup"),),
|
||||
slug="custom_run_module",
|
||||
)
|
||||
output_path = tmp_path / "out"
|
||||
|
||||
result = CliRunner().invoke(
|
||||
check_backup,
|
||||
[
|
||||
"--module",
|
||||
"CustomRunModule",
|
||||
"--load-module",
|
||||
str(module_path),
|
||||
"--output",
|
||||
str(output_path),
|
||||
str(tmp_path),
|
||||
],
|
||||
)
|
||||
|
||||
assert result.exit_code == 0
|
||||
assert (output_path / "custom_run_module.json").exists()
|
||||
|
||||
|
||||
def test_custom_modules_load_from_environment_without_cli_flag(tmp_path, monkeypatch):
|
||||
custom_modules_path = tmp_path / "custom_modules"
|
||||
custom_modules_path.mkdir()
|
||||
_write_custom_module(
|
||||
custom_modules_path / "env_module.py",
|
||||
"EnvBugreportModule",
|
||||
(("android", "check-bugreport"),),
|
||||
)
|
||||
monkeypatch.setenv("MVT_CUSTOM_MODULES", str(custom_modules_path))
|
||||
|
||||
result = CliRunner().invoke(check_bugreport, ["--list-modules", str(tmp_path)])
|
||||
|
||||
assert result.exit_code == 0
|
||||
assert "EnvBugreportModule" in result.output
|
||||
|
||||
|
||||
class NestedBugreportModule(MVTModule):
|
||||
supported_commands = (("android", "check-bugreport"),)
|
||||
|
||||
|
||||
class NestedBackupModule(MVTModule):
|
||||
supported_commands = (("android", "check-backup"),)
|
||||
|
||||
|
||||
class NestedIntrusionLogsModule(MVTModule):
|
||||
supported_commands = (("android", "check-intrusion-logs"),)
|
||||
|
||||
|
||||
class NestedAndroidQFModule(MVTModule):
|
||||
supported_commands = (("android", "check-androidqf"),)
|
||||
|
||||
|
||||
class DummyZip:
|
||||
def close(self):
|
||||
pass
|
||||
|
||||
|
||||
def test_androidqf_propagates_custom_modules_to_nested_commands(tmp_path, monkeypatch):
|
||||
records = {}
|
||||
custom_modules = [
|
||||
NestedBugreportModule,
|
||||
NestedBackupModule,
|
||||
NestedIntrusionLogsModule,
|
||||
NestedAndroidQFModule,
|
||||
]
|
||||
cmd = CmdAndroidCheckAndroidQF(
|
||||
target_path=str(tmp_path),
|
||||
custom_modules=custom_modules,
|
||||
)
|
||||
|
||||
def record_available(name):
|
||||
def _record(command):
|
||||
records[name] = [
|
||||
module.__name__
|
||||
for module in command._available_modules()
|
||||
if module.__name__.startswith("Nested")
|
||||
]
|
||||
|
||||
return _record
|
||||
|
||||
monkeypatch.setattr(cmd, "load_bugreport", lambda: DummyZip())
|
||||
monkeypatch.setattr(
|
||||
CmdAndroidCheckBugreport,
|
||||
"from_zip",
|
||||
lambda self, bugreport: None,
|
||||
)
|
||||
monkeypatch.setattr(
|
||||
CmdAndroidCheckBugreport,
|
||||
"run",
|
||||
record_available("bugreport"),
|
||||
)
|
||||
|
||||
monkeypatch.setattr(cmd, "load_backup", lambda: b"")
|
||||
monkeypatch.setattr(CmdAndroidCheckBackup, "from_ab", lambda self, backup: None)
|
||||
monkeypatch.setattr(
|
||||
CmdAndroidCheckBackup,
|
||||
"run",
|
||||
record_available("backup"),
|
||||
)
|
||||
|
||||
intrusion_logs_path = tmp_path / "intrusion_logs"
|
||||
intrusion_logs_path.mkdir()
|
||||
setattr(cmd, "_CmdAndroidCheckAndroidQF__format", "dir")
|
||||
setattr(
|
||||
cmd,
|
||||
"_CmdAndroidCheckAndroidQF__files",
|
||||
["androidqf/intrusion_logs/security.txt"],
|
||||
)
|
||||
monkeypatch.setattr(cmd, "_read_device_timezone", lambda: None)
|
||||
monkeypatch.setattr(
|
||||
CmdAndroidCheckIntrusionLogs,
|
||||
"run",
|
||||
record_available("intrusion_logs"),
|
||||
)
|
||||
|
||||
assert cmd.run_bugreport_cmd()
|
||||
assert cmd.run_backup_cmd()
|
||||
assert cmd.run_intrusion_logs_cmd()
|
||||
assert records == {
|
||||
"bugreport": ["NestedBugreportModule"],
|
||||
"backup": ["NestedBackupModule"],
|
||||
"intrusion_logs": ["NestedIntrusionLogsModule"],
|
||||
}
|
||||
Reference in New Issue
Block a user