mirror of
https://github.com/mvt-project/mvt.git
synced 2026-02-12 16:42:45 +00:00
Merge pull request #592 from mvt-project/feature/config-file
Reworking handling of config options
This commit is contained in:
@@ -32,6 +32,8 @@ dependencies = [
|
||||
"cryptography >=42.0.5",
|
||||
"pyyaml >=6.0",
|
||||
"pyahocorasick >= 2.0.0",
|
||||
"pydantic >= 2.10.0",
|
||||
"pydantic-settings >= 2.7.0",
|
||||
]
|
||||
requires-python = ">= 3.8"
|
||||
|
||||
|
||||
@@ -3,10 +3,11 @@
|
||||
# Use of this software is governed by the MVT License 1.1 that can be found at
|
||||
# https://license.mvt.re/1.1/
|
||||
|
||||
import os
|
||||
|
||||
from rich.prompt import Prompt
|
||||
|
||||
from mvt.common.config import settings
|
||||
|
||||
MVT_ANDROID_BACKUP_PASSWORD = "MVT_ANDROID_BACKUP_PASSWORD"
|
||||
|
||||
|
||||
@@ -16,24 +17,24 @@ def cli_load_android_backup_password(log, backup_password):
|
||||
|
||||
Used in MVT CLI command parsers.
|
||||
"""
|
||||
password_from_env = os.environ.get(MVT_ANDROID_BACKUP_PASSWORD, None)
|
||||
password_from_env_or_config = settings.ANDROID_BACKUP_PASSWORD
|
||||
if backup_password:
|
||||
log.info(
|
||||
"Your password may be visible in the process table because it "
|
||||
"was supplied on the command line!"
|
||||
)
|
||||
if password_from_env:
|
||||
if password_from_env_or_config:
|
||||
log.info(
|
||||
"Ignoring %s environment variable, using --backup-password argument instead",
|
||||
MVT_ANDROID_BACKUP_PASSWORD,
|
||||
"MVT_ANDROID_BACKUP_PASSWORD",
|
||||
)
|
||||
return backup_password
|
||||
elif password_from_env:
|
||||
elif password_from_env_or_config:
|
||||
log.info(
|
||||
"Using backup password from %s environment variable",
|
||||
MVT_ANDROID_BACKUP_PASSWORD,
|
||||
"Using backup password from %s environment variable or config file",
|
||||
"MVT_ANDROID_BACKUP_PASSWORD",
|
||||
)
|
||||
return password_from_env
|
||||
return password_from_env_or_config
|
||||
|
||||
|
||||
def prompt_or_load_android_backup_password(log, module_options):
|
||||
|
||||
@@ -17,6 +17,7 @@ from mvt.common.utils import (
|
||||
generate_hashes_from_path,
|
||||
get_sha256_from_file_path,
|
||||
)
|
||||
from mvt.common.config import settings
|
||||
from mvt.common.version import MVT_VERSION
|
||||
|
||||
|
||||
@@ -132,7 +133,7 @@ class Command:
|
||||
if ioc_file_path and ioc_file_path not in info["ioc_files"]:
|
||||
info["ioc_files"].append(ioc_file_path)
|
||||
|
||||
if self.target_path and (os.environ.get("MVT_HASH_FILES") or self.hashes):
|
||||
if self.target_path and (settings.HASH_FILES or self.hashes):
|
||||
self.generate_hashes()
|
||||
|
||||
info["hashes"] = self.hash_values
|
||||
@@ -141,7 +142,7 @@ class Command:
|
||||
with open(info_path, "w+", encoding="utf-8") as handle:
|
||||
json.dump(info, handle, indent=4)
|
||||
|
||||
if self.target_path and (os.environ.get("MVT_HASH_FILES") or self.hashes):
|
||||
if self.target_path and (settings.HASH_FILES or self.hashes):
|
||||
info_hash = get_sha256_from_file_path(info_path)
|
||||
self.log.info('Reference hash of the info.json file: "%s"', info_hash)
|
||||
|
||||
|
||||
105
src/mvt/common/config.py
Normal file
105
src/mvt/common/config.py
Normal file
@@ -0,0 +1,105 @@
|
||||
import os
|
||||
import yaml
|
||||
import json
|
||||
|
||||
from typing import Tuple, Type, Optional
|
||||
from appdirs import user_config_dir
|
||||
from pydantic import AnyHttpUrl, Field
|
||||
from pydantic_settings import (
|
||||
BaseSettings,
|
||||
InitSettingsSource,
|
||||
PydanticBaseSettingsSource,
|
||||
SettingsConfigDict,
|
||||
YamlConfigSettingsSource,
|
||||
)
|
||||
|
||||
MVT_CONFIG_FOLDER = user_config_dir("mvt")
|
||||
MVT_CONFIG_PATH = os.path.join(MVT_CONFIG_FOLDER, "config.yaml")
|
||||
|
||||
|
||||
class MVTSettings(BaseSettings):
|
||||
model_config = SettingsConfigDict(
|
||||
env_prefix="MVT_",
|
||||
env_nested_delimiter="_",
|
||||
extra="ignore",
|
||||
nested_model_default_partial_updates=True,
|
||||
)
|
||||
# Allow to decided if want to load environment variables
|
||||
load_env: bool = Field(True, exclude=True)
|
||||
|
||||
# General settings
|
||||
PYPI_UPDATE_URL: AnyHttpUrl = Field(
|
||||
"https://pypi.org/pypi/mvt/json",
|
||||
validate_default=False,
|
||||
)
|
||||
NETWORK_ACCESS_ALLOWED: bool = True
|
||||
NETWORK_TIMEOUT: int = 15
|
||||
|
||||
# Command default settings, all can be specified by MVT_ prefixed environment variables too.
|
||||
IOS_BACKUP_PASSWORD: Optional[str] = Field(
|
||||
None, description="Default password to use to decrypt iOS backups"
|
||||
)
|
||||
ANDROID_BACKUP_PASSWORD: Optional[str] = Field(
|
||||
None, description="Default password to use to decrypt Android backups"
|
||||
)
|
||||
STIX2: Optional[str] = Field(
|
||||
None, description="List of directories where STIX2 files are stored"
|
||||
)
|
||||
VT_API_KEY: Optional[str] = Field(
|
||||
None, description="API key to use for VirusTotal lookups"
|
||||
)
|
||||
PROFILE: bool = Field(False, description="Profile the execution of MVT modules")
|
||||
HASH_FILES: bool = Field(False, description="Should MVT hash output files")
|
||||
|
||||
@classmethod
|
||||
def settings_customise_sources(
|
||||
cls,
|
||||
settings_cls: Type[BaseSettings],
|
||||
init_settings: InitSettingsSource,
|
||||
env_settings: PydanticBaseSettingsSource,
|
||||
dotenv_settings: PydanticBaseSettingsSource,
|
||||
file_secret_settings: PydanticBaseSettingsSource,
|
||||
) -> Tuple[PydanticBaseSettingsSource, ...]:
|
||||
sources = (
|
||||
YamlConfigSettingsSource(settings_cls, MVT_CONFIG_PATH),
|
||||
init_settings,
|
||||
)
|
||||
# Load env variables if enabled
|
||||
if init_settings.init_kwargs.get("load_env", True):
|
||||
sources = (env_settings,) + sources
|
||||
return sources
|
||||
|
||||
def save_settings(
|
||||
self,
|
||||
) -> None:
|
||||
"""
|
||||
Save the current settings to a file.
|
||||
"""
|
||||
if not os.path.isdir(MVT_CONFIG_FOLDER):
|
||||
os.makedirs(MVT_CONFIG_FOLDER)
|
||||
|
||||
# Dump the settings to the YAML file
|
||||
model_serializable = json.loads(self.model_dump_json(exclude_defaults=True))
|
||||
with open(MVT_CONFIG_PATH, "w") as config_file:
|
||||
config_file.write(yaml.dump(model_serializable, default_flow_style=False))
|
||||
|
||||
@classmethod
|
||||
def initialise(cls) -> "MVTSettings":
|
||||
"""
|
||||
Initialise the settings file.
|
||||
|
||||
We first initialise the settings (without env variable) and then persist
|
||||
them to file. This way we can update the config file with the default values.
|
||||
|
||||
Afterwards we load the settings again, this time including the env variables.
|
||||
"""
|
||||
# Set invalid env prefix to avoid loading env variables.
|
||||
settings = MVTSettings(load_env=False)
|
||||
settings.save_settings()
|
||||
|
||||
# Load the settings again with any ENV variables.
|
||||
settings = MVTSettings(load_env=True)
|
||||
return settings
|
||||
|
||||
|
||||
settings = MVTSettings.initialise()
|
||||
@@ -14,6 +14,7 @@ import ahocorasick
|
||||
from appdirs import user_data_dir
|
||||
|
||||
from .url import URL
|
||||
from .config import settings
|
||||
|
||||
MVT_DATA_FOLDER = user_data_dir("mvt")
|
||||
MVT_INDICATORS_FOLDER = os.path.join(MVT_DATA_FOLDER, "indicators")
|
||||
@@ -41,12 +42,12 @@ class Indicators:
|
||||
|
||||
def _check_stix2_env_variable(self) -> None:
|
||||
"""
|
||||
Checks if a variable MVT_STIX2 contains path to a STIX file. Also recursively searches through dirs in MVT_STIX2
|
||||
Checks if MVT_STIX2 setting or environment variable contains path to a STIX file. Also recursively searches through dirs in MVT_STIX2
|
||||
"""
|
||||
if "MVT_STIX2" not in os.environ:
|
||||
if not settings.STIX2:
|
||||
return
|
||||
|
||||
paths = os.environ["MVT_STIX2"].split(":")
|
||||
paths = settings.STIX2.split(":")
|
||||
for path in paths:
|
||||
if os.path.isfile(path) and path.lower().endswith(".stix2"):
|
||||
self.parse_stix2(path)
|
||||
|
||||
@@ -14,6 +14,7 @@ from packaging import version
|
||||
|
||||
from .indicators import MVT_DATA_FOLDER, MVT_INDICATORS_FOLDER
|
||||
from .version import MVT_VERSION
|
||||
from .config import settings
|
||||
|
||||
log = logging.getLogger(__name__)
|
||||
|
||||
@@ -23,7 +24,7 @@ INDICATORS_CHECK_FREQUENCY = 12
|
||||
|
||||
class MVTUpdates:
|
||||
def check(self) -> str:
|
||||
res = requests.get("https://pypi.org/pypi/mvt/json", timeout=15)
|
||||
res = requests.get(settings.PYPI_UPDATE_URL, timeout=15)
|
||||
data = res.json()
|
||||
latest_version = data.get("info", {}).get("version", "")
|
||||
|
||||
|
||||
@@ -13,6 +13,7 @@ import re
|
||||
from typing import Any, Iterator, Union
|
||||
|
||||
from rich.logging import RichHandler
|
||||
from mvt.common.config import settings
|
||||
|
||||
|
||||
class CustomJSONEncoder(json.JSONEncoder):
|
||||
@@ -256,7 +257,7 @@ def set_verbose_logging(verbose: bool = False):
|
||||
|
||||
def exec_or_profile(module, globals, locals):
|
||||
"""Hook for profiling MVT modules"""
|
||||
if int(os.environ.get("MVT_PROFILE", False)):
|
||||
if settings.PROFILE:
|
||||
cProfile.runctx(module, globals, locals)
|
||||
else:
|
||||
exec(module, globals, locals)
|
||||
|
||||
@@ -6,6 +6,8 @@
|
||||
import logging
|
||||
import os
|
||||
|
||||
|
||||
from mvt.common.config import settings
|
||||
from mvt.common.indicators import Indicators
|
||||
from ..utils import get_artifact_folder
|
||||
|
||||
@@ -100,6 +102,8 @@ class TestIndicators:
|
||||
|
||||
def test_env_stix(self, indicator_file):
|
||||
os.environ["MVT_STIX2"] = indicator_file
|
||||
settings.__init__() # Reset settings
|
||||
|
||||
ind = Indicators(log=logging)
|
||||
ind.load_indicators_files([], load_default=False)
|
||||
assert ind.total_ioc_count == 9
|
||||
|
||||
@@ -8,6 +8,7 @@ import os
|
||||
from click.testing import CliRunner
|
||||
|
||||
from mvt.android.cli import check_androidqf
|
||||
from mvt.common.config import settings
|
||||
|
||||
from .utils import get_artifact_folder
|
||||
|
||||
@@ -56,6 +57,8 @@ class TestCheckAndroidqfCommand:
|
||||
)
|
||||
|
||||
os.environ["MVT_ANDROID_BACKUP_PASSWORD"] = TEST_BACKUP_PASSWORD
|
||||
settings.__init__() # Reset settings
|
||||
|
||||
runner = CliRunner()
|
||||
path = os.path.join(get_artifact_folder(), "androidqf_encrypted")
|
||||
result = runner.invoke(check_androidqf, [path])
|
||||
@@ -63,3 +66,4 @@ class TestCheckAndroidqfCommand:
|
||||
assert prompt_mock.call_count == 0
|
||||
assert result.exit_code == 0
|
||||
del os.environ["MVT_ANDROID_BACKUP_PASSWORD"]
|
||||
settings.__init__() # Reset settings
|
||||
|
||||
@@ -9,6 +9,7 @@ import os
|
||||
from click.testing import CliRunner
|
||||
|
||||
from mvt.android.cli import check_backup
|
||||
from mvt.common.config import settings
|
||||
|
||||
from .utils import get_artifact_folder
|
||||
|
||||
@@ -63,6 +64,8 @@ class TestCheckAndroidBackupCommand:
|
||||
)
|
||||
|
||||
os.environ["MVT_ANDROID_BACKUP_PASSWORD"] = TEST_BACKUP_PASSWORD
|
||||
settings.__init__() # Reset settings
|
||||
|
||||
runner = CliRunner()
|
||||
path = os.path.join(get_artifact_folder(), "androidqf_encrypted/backup.ab")
|
||||
result = runner.invoke(check_backup, [path])
|
||||
@@ -70,3 +73,4 @@ class TestCheckAndroidBackupCommand:
|
||||
assert prompt_mock.call_count == 0
|
||||
assert result.exit_code == 0
|
||||
del os.environ["MVT_ANDROID_BACKUP_PASSWORD"]
|
||||
settings.__init__() # Reset settings
|
||||
|
||||
Reference in New Issue
Block a user