From 154e6dab153271dbd1cfd7a5e247ef475a3090d7 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Donncha=20=C3=93=20Cearbhaill?= Date: Tue, 24 Dec 2024 23:30:18 +0000 Subject: [PATCH 1/6] Add config file parser for MVT --- src/mvt/common/config.py | 91 ++++++++++++++++++++++++++++++++++++++++ 1 file changed, 91 insertions(+) create mode 100644 src/mvt/common/config.py diff --git a/src/mvt/common/config.py b/src/mvt/common/config.py new file mode 100644 index 0000000..4341d87 --- /dev/null +++ b/src/mvt/common/config.py @@ -0,0 +1,91 @@ +import os +import uuid +import yaml +import json + +from typing import Tuple, Type +from appdirs import user_config_dir +from pydantic import AnyHttpUrl, Field +from pydantic_settings import ( + BaseSettings, + InitSettingsSource, + PydanticBaseSettingsSource, + SettingsConfigDict, + YamlConfigSettingsSource, +) + +MVT_CONFIG_FOLDER = user_config_dir("mvt") +MVT_CONFIG_PATH = os.path.join(MVT_CONFIG_FOLDER, "config.yaml") + + +class MVTSettings(BaseSettings): + model_config = SettingsConfigDict( + env_prefix="MVT_", env_nested_delimiter="_", extra="ignore" + ) + # Allow to decided if want to load environment variables + load_env: bool = Field(True, exclude=True) + + # General settings + PYPI_UPDATE_URL: AnyHttpUrl = Field( + "https://pypi.org/pypi/mvt/json", + validate_default=False, + ) + INDICATORS_UPDATE_URL: AnyHttpUrl = Field( + default="https://raw.githubusercontent.com/mvt-project/mvt-indicators/main/indicators.yaml", + validate_default=False, + ) + NETWORK_ACCESS_ALLOWED: bool = True + NETWORK_TIMEOUT: int = 15 + + @classmethod + def settings_customise_sources( + cls, + settings_cls: Type[BaseSettings], + init_settings: InitSettingsSource, + env_settings: PydanticBaseSettingsSource, + dotenv_settings: PydanticBaseSettingsSource, + file_secret_settings: PydanticBaseSettingsSource, + ) -> Tuple[PydanticBaseSettingsSource, ...]: + sources = ( + YamlConfigSettingsSource(settings_cls, MVT_CONFIG_PATH), + init_settings, + ) + # Load env variables if enabled + if init_settings.init_kwargs.get("load_env", True): + sources = (env_settings,) + sources + return sources + + def save_settings( + self, + ) -> None: + """ + Save the current settings to a file. + """ + if not os.path.isdir(MVT_CONFIG_FOLDER): + os.makedirs(MVT_CONFIG_FOLDER) + + # Dump the settings to the YAML file + model_serializable = json.loads(self.model_dump_json(exclude_defaults=True)) + with open(MVT_CONFIG_PATH, "w") as config_file: + config_file.write(yaml.dump(model_serializable, default_flow_style=False)) + + @classmethod + def initialise(cls) -> "MVTSettings": + """ + Initialise the settings file. + + We first initialise the settings (without env variable) and then persist + them to file. This way we can update the config file with the default values. + + Afterwards we load the settings again, this time including the env variables. + """ + # Set invalid env prefix to avoid loading env variables. + settings = MVTSettings(load_env=False) + settings.save_settings() + + # Load the settings again with any ENV variables. + settings = MVTSettings(load_env=True) + return settings + + +settings = MVTSettings.initialise() From 28c0c86c4efea090e642f36ed2a7d44be77adfa0 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Donncha=20=C3=93=20Cearbhaill?= Date: Wed, 25 Dec 2024 00:09:29 +0000 Subject: [PATCH 2/6] Update MVT code to use config file rather than raw env variables --- src/mvt/common/config.py | 1 - 1 file changed, 1 deletion(-) diff --git a/src/mvt/common/config.py b/src/mvt/common/config.py index 4341d87..8a116e2 100644 --- a/src/mvt/common/config.py +++ b/src/mvt/common/config.py @@ -1,5 +1,4 @@ import os -import uuid import yaml import json From f4425865c0e51de5872afd08cf1ec737a80b57a5 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Donncha=20=C3=93=20Cearbhaill?= Date: Wed, 25 Dec 2024 00:14:14 +0000 Subject: [PATCH 3/6] Add missed modules using updated settings module --- src/mvt/android/modules/backup/helpers.py | 17 +++++++-------- src/mvt/common/command.py | 5 +++-- src/mvt/common/config.py | 25 ++++++++++++++++++----- src/mvt/common/indicators.py | 7 ++++--- src/mvt/common/updates.py | 3 ++- src/mvt/common/utils.py | 2 +- tests/common/test_indicators.py | 4 ++++ tests/test_check_android_androidqf.py | 4 ++++ tests/test_check_android_backup.py | 4 ++++ 9 files changed, 51 insertions(+), 20 deletions(-) diff --git a/src/mvt/android/modules/backup/helpers.py b/src/mvt/android/modules/backup/helpers.py index 5ddb80d..3e48078 100644 --- a/src/mvt/android/modules/backup/helpers.py +++ b/src/mvt/android/modules/backup/helpers.py @@ -3,10 +3,11 @@ # Use of this software is governed by the MVT License 1.1 that can be found at # https://license.mvt.re/1.1/ -import os from rich.prompt import Prompt +from mvt.common.config import settings + MVT_ANDROID_BACKUP_PASSWORD = "MVT_ANDROID_BACKUP_PASSWORD" @@ -16,24 +17,24 @@ def cli_load_android_backup_password(log, backup_password): Used in MVT CLI command parsers. """ - password_from_env = os.environ.get(MVT_ANDROID_BACKUP_PASSWORD, None) + password_from_env_or_config = settings.ANDROID_BACKUP_PASSWORD if backup_password: log.info( "Your password may be visible in the process table because it " "was supplied on the command line!" ) - if password_from_env: + if password_from_env_or_config: log.info( "Ignoring %s environment variable, using --backup-password argument instead", - MVT_ANDROID_BACKUP_PASSWORD, + "MVT_ANDROID_BACKUP_PASSWORD", ) return backup_password - elif password_from_env: + elif password_from_env_or_config: log.info( - "Using backup password from %s environment variable", - MVT_ANDROID_BACKUP_PASSWORD, + "Using backup password from %s environment variable or config file", + "MVT_ANDROID_BACKUP_PASSWORD", ) - return password_from_env + return password_from_env_or_config def prompt_or_load_android_backup_password(log, module_options): diff --git a/src/mvt/common/command.py b/src/mvt/common/command.py index b44a56b..527c21a 100644 --- a/src/mvt/common/command.py +++ b/src/mvt/common/command.py @@ -17,6 +17,7 @@ from mvt.common.utils import ( generate_hashes_from_path, get_sha256_from_file_path, ) +from mvt.common.config import settings from mvt.common.version import MVT_VERSION @@ -132,7 +133,7 @@ class Command: if ioc_file_path and ioc_file_path not in info["ioc_files"]: info["ioc_files"].append(ioc_file_path) - if self.target_path and (os.environ.get("MVT_HASH_FILES") or self.hashes): + if self.target_path and (settings.HASH_FILES or self.hashes): self.generate_hashes() info["hashes"] = self.hash_values @@ -141,7 +142,7 @@ class Command: with open(info_path, "w+", encoding="utf-8") as handle: json.dump(info, handle, indent=4) - if self.target_path and (os.environ.get("MVT_HASH_FILES") or self.hashes): + if self.target_path and (settings.HASH_FILES or self.hashes): info_hash = get_sha256_from_file_path(info_path) self.log.info('Reference hash of the info.json file: "%s"', info_hash) diff --git a/src/mvt/common/config.py b/src/mvt/common/config.py index 8a116e2..b2fcf6d 100644 --- a/src/mvt/common/config.py +++ b/src/mvt/common/config.py @@ -19,7 +19,10 @@ MVT_CONFIG_PATH = os.path.join(MVT_CONFIG_FOLDER, "config.yaml") class MVTSettings(BaseSettings): model_config = SettingsConfigDict( - env_prefix="MVT_", env_nested_delimiter="_", extra="ignore" + env_prefix="MVT_", + env_nested_delimiter="_", + extra="ignore", + nested_model_default_partial_updates=True, ) # Allow to decided if want to load environment variables load_env: bool = Field(True, exclude=True) @@ -29,13 +32,25 @@ class MVTSettings(BaseSettings): "https://pypi.org/pypi/mvt/json", validate_default=False, ) - INDICATORS_UPDATE_URL: AnyHttpUrl = Field( - default="https://raw.githubusercontent.com/mvt-project/mvt-indicators/main/indicators.yaml", - validate_default=False, - ) NETWORK_ACCESS_ALLOWED: bool = True NETWORK_TIMEOUT: int = 15 + # Command default settings, all can be specified by MVT_ prefixed environment variables too. + IOS_BACKUP_PASSWORD: str | None = Field( + None, description="Default password to use to decrypt iOS backups" + ) + ANDROID_BACKUP_PASSWORD: str | None = Field( + None, description="Default password to use to decrypt Android backups" + ) + STIX2: str | None = Field( + None, description="List of directories where STIX2 files are stored" + ) + VT_API_KEY: str | None = Field( + None, description="API key to use for VirusTotal lookups" + ) + PROFILE: bool = Field(False, description="Profile the execution of MVT modules") + HASH_FILES: bool = Field(False, description="Should MVT hash output files") + @classmethod def settings_customise_sources( cls, diff --git a/src/mvt/common/indicators.py b/src/mvt/common/indicators.py index a73938d..6bcce2e 100644 --- a/src/mvt/common/indicators.py +++ b/src/mvt/common/indicators.py @@ -14,6 +14,7 @@ import ahocorasick from appdirs import user_data_dir from .url import URL +from .config import settings MVT_DATA_FOLDER = user_data_dir("mvt") MVT_INDICATORS_FOLDER = os.path.join(MVT_DATA_FOLDER, "indicators") @@ -41,12 +42,12 @@ class Indicators: def _check_stix2_env_variable(self) -> None: """ - Checks if a variable MVT_STIX2 contains path to a STIX file. Also recursively searches through dirs in MVT_STIX2 + Checks if MVT_STIX2 setting or environment variable contains path to a STIX file. Also recursively searches through dirs in MVT_STIX2 """ - if "MVT_STIX2" not in os.environ: + if not settings.STIX2: return - paths = os.environ["MVT_STIX2"].split(":") + paths = settings.STIX2.split(":") for path in paths: if os.path.isfile(path) and path.lower().endswith(".stix2"): self.parse_stix2(path) diff --git a/src/mvt/common/updates.py b/src/mvt/common/updates.py index c82a3f8..e782b91 100644 --- a/src/mvt/common/updates.py +++ b/src/mvt/common/updates.py @@ -14,6 +14,7 @@ from packaging import version from .indicators import MVT_DATA_FOLDER, MVT_INDICATORS_FOLDER from .version import MVT_VERSION +from .config import settings log = logging.getLogger(__name__) @@ -23,7 +24,7 @@ INDICATORS_CHECK_FREQUENCY = 12 class MVTUpdates: def check(self) -> str: - res = requests.get("https://pypi.org/pypi/mvt/json", timeout=15) + res = requests.get(settings.PYPI_UPDATE_URL, timeout=15) data = res.json() latest_version = data.get("info", {}).get("version", "") diff --git a/src/mvt/common/utils.py b/src/mvt/common/utils.py index 0a64521..b7f5f83 100644 --- a/src/mvt/common/utils.py +++ b/src/mvt/common/utils.py @@ -256,7 +256,7 @@ def set_verbose_logging(verbose: bool = False): def exec_or_profile(module, globals, locals): """Hook for profiling MVT modules""" - if int(os.environ.get("MVT_PROFILE", False)): + if settings.PROFILE: cProfile.runctx(module, globals, locals) else: exec(module, globals, locals) diff --git a/tests/common/test_indicators.py b/tests/common/test_indicators.py index 9a687c0..efc24f7 100644 --- a/tests/common/test_indicators.py +++ b/tests/common/test_indicators.py @@ -6,6 +6,8 @@ import logging import os + +from mvt.common.config import settings from mvt.common.indicators import Indicators from ..utils import get_artifact_folder @@ -100,6 +102,8 @@ class TestIndicators: def test_env_stix(self, indicator_file): os.environ["MVT_STIX2"] = indicator_file + settings.__init__() # Reset settings + ind = Indicators(log=logging) ind.load_indicators_files([], load_default=False) assert ind.total_ioc_count == 9 diff --git a/tests/test_check_android_androidqf.py b/tests/test_check_android_androidqf.py index 29b4b99..167b5b7 100644 --- a/tests/test_check_android_androidqf.py +++ b/tests/test_check_android_androidqf.py @@ -8,6 +8,7 @@ import os from click.testing import CliRunner from mvt.android.cli import check_androidqf +from mvt.common.config import settings from .utils import get_artifact_folder @@ -56,6 +57,8 @@ class TestCheckAndroidqfCommand: ) os.environ["MVT_ANDROID_BACKUP_PASSWORD"] = TEST_BACKUP_PASSWORD + settings.__init__() # Reset settings + runner = CliRunner() path = os.path.join(get_artifact_folder(), "androidqf_encrypted") result = runner.invoke(check_androidqf, [path]) @@ -63,3 +66,4 @@ class TestCheckAndroidqfCommand: assert prompt_mock.call_count == 0 assert result.exit_code == 0 del os.environ["MVT_ANDROID_BACKUP_PASSWORD"] + settings.__init__() # Reset settings diff --git a/tests/test_check_android_backup.py b/tests/test_check_android_backup.py index 7ccd8ec..71c0586 100644 --- a/tests/test_check_android_backup.py +++ b/tests/test_check_android_backup.py @@ -9,6 +9,7 @@ import os from click.testing import CliRunner from mvt.android.cli import check_backup +from mvt.common.config import settings from .utils import get_artifact_folder @@ -63,6 +64,8 @@ class TestCheckAndroidBackupCommand: ) os.environ["MVT_ANDROID_BACKUP_PASSWORD"] = TEST_BACKUP_PASSWORD + settings.__init__() # Reset settings + runner = CliRunner() path = os.path.join(get_artifact_folder(), "androidqf_encrypted/backup.ab") result = runner.invoke(check_backup, [path]) @@ -70,3 +73,4 @@ class TestCheckAndroidBackupCommand: assert prompt_mock.call_count == 0 assert result.exit_code == 0 del os.environ["MVT_ANDROID_BACKUP_PASSWORD"] + settings.__init__() # Reset settings From 0f1eec39719e796b37be8971cff4cf45a2405846 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Donncha=20=C3=93=20Cearbhaill?= Date: Wed, 25 Dec 2024 00:21:42 +0000 Subject: [PATCH 4/6] Add Pydantic dependencies --- pyproject.toml | 2 ++ 1 file changed, 2 insertions(+) diff --git a/pyproject.toml b/pyproject.toml index b2faba7..63c3c7c 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -32,6 +32,8 @@ dependencies = [ "cryptography >=42.0.5", "pyyaml >=6.0", "pyahocorasick >= 2.0.0", + "pydantic >= 2.10.0", + "pydantic-settings >= 2.7.0", ] requires-python = ">= 3.8" From 52e854b8b7302ddc65c759b68344ea41ba39e45a Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Donncha=20=C3=93=20Cearbhaill?= Date: Wed, 25 Dec 2024 00:23:36 +0000 Subject: [PATCH 5/6] Add missing import --- src/mvt/common/utils.py | 1 + 1 file changed, 1 insertion(+) diff --git a/src/mvt/common/utils.py b/src/mvt/common/utils.py index b7f5f83..3d054f5 100644 --- a/src/mvt/common/utils.py +++ b/src/mvt/common/utils.py @@ -13,6 +13,7 @@ import re from typing import Any, Iterator, Union from rich.logging import RichHandler +from mvt.common.config import settings class CustomJSONEncoder(json.JSONEncoder): From 458195a0abe9fcab78565076083c5dbef92d281f Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Donncha=20=C3=93=20Cearbhaill?= Date: Wed, 25 Dec 2024 00:28:02 +0000 Subject: [PATCH 6/6] Fix optional typing syntax for Python 3.8 --- src/mvt/common/config.py | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/src/mvt/common/config.py b/src/mvt/common/config.py index b2fcf6d..d2e4e20 100644 --- a/src/mvt/common/config.py +++ b/src/mvt/common/config.py @@ -2,7 +2,7 @@ import os import yaml import json -from typing import Tuple, Type +from typing import Tuple, Type, Optional from appdirs import user_config_dir from pydantic import AnyHttpUrl, Field from pydantic_settings import ( @@ -36,16 +36,16 @@ class MVTSettings(BaseSettings): NETWORK_TIMEOUT: int = 15 # Command default settings, all can be specified by MVT_ prefixed environment variables too. - IOS_BACKUP_PASSWORD: str | None = Field( + IOS_BACKUP_PASSWORD: Optional[str] = Field( None, description="Default password to use to decrypt iOS backups" ) - ANDROID_BACKUP_PASSWORD: str | None = Field( + ANDROID_BACKUP_PASSWORD: Optional[str] = Field( None, description="Default password to use to decrypt Android backups" ) - STIX2: str | None = Field( + STIX2: Optional[str] = Field( None, description="List of directories where STIX2 files are stored" ) - VT_API_KEY: str | None = Field( + VT_API_KEY: Optional[str] = Field( None, description="API key to use for VirusTotal lookups" ) PROFILE: bool = Field(False, description="Profile the execution of MVT modules")