From 1b03002a00c7c8ed439c04160eac266db10c16fe Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Donncha=20=C3=93=20Cearbhaill?= Date: Sun, 16 Feb 2025 00:10:44 +0100 Subject: [PATCH] Major refactor to add structured alerting and typed indicators This commit makes a structural change to MVT by changing binary detected/not detected logic into a structured multi-level system of alerts. This gives far more power to extend MVT and manage alerts. This commit also begins the process of adding proper typing for key objects used in MVT including Indicators, IndicatorMatches, and ModuleResults. This will also be keep to programmatically using the output of MVT. --- src/mvt/android/cli.py | 30 +-- src/mvt/android/cmd_check_androidqf.py | 9 +- src/mvt/common/alerts.py | 181 +++++++++++++++ src/mvt/common/command.py | 123 +++++++---- src/mvt/common/indicators.py | 267 +++++++++-------------- src/mvt/common/log.py | 65 ++++++ src/mvt/common/module.py | 70 ++++-- src/mvt/common/module_types.py | 29 +++ src/mvt/common/utils.py | 9 +- src/mvt/ios/cli.py | 17 +- tests/android_androidqf/test_packages.py | 7 +- 11 files changed, 544 insertions(+), 263 deletions(-) create mode 100644 src/mvt/common/alerts.py create mode 100644 src/mvt/common/log.py create mode 100644 src/mvt/common/module_types.py diff --git a/src/mvt/android/cli.py b/src/mvt/android/cli.py index 5d23411..ebdeeff 100644 --- a/src/mvt/android/cli.py +++ b/src/mvt/android/cli.py @@ -37,6 +37,7 @@ from .cmd_check_bugreport import CmdAndroidCheckBugreport from .modules.backup import BACKUP_MODULES from .modules.backup.helpers import cli_load_android_backup_password from .modules.bugreport import BUGREPORT_MODULES +from .modules.androidqf import ANDROIDQF_MODULES init_logging() log = logging.getLogger("mvt") @@ -109,12 +110,8 @@ def check_bugreport(ctx, iocs, output, list_modules, module, verbose, bugreport_ log.info("Checking Android bug report at path: %s", bugreport_path) cmd.run() - - if cmd.detected_count > 0: - log.warning( - "The analysis of the Android bug report produced %d detections!", - cmd.detected_count, - ) + cmd.show_alerts_brief() + cmd.show_support_message() # ============================================================================== @@ -171,12 +168,8 @@ def check_backup( log.info("Checking Android backup at path: %s", backup_path) cmd.run() - - if cmd.detected_count > 0: - log.warning( - "The analysis of the Android backup produced %d detections!", - cmd.detected_count, - ) + cmd.show_alerts_brief() + cmd.show_support_message() # ============================================================================== @@ -235,12 +228,9 @@ def check_androidqf( log.info("Checking AndroidQF acquisition at path: %s", androidqf_path) cmd.run() - - if cmd.detected_count > 0: - log.warning( - "The analysis of the AndroidQF acquisition produced %d detections!", - cmd.detected_count, - ) + cmd.show_alerts_brief() + cmd.show_disable_adb_warning() + cmd.show_support_message() # ============================================================================== @@ -261,13 +251,15 @@ def check_androidqf( @click.pass_context def check_iocs(ctx, iocs, list_modules, module, folder): cmd = CmdCheckIOCS(target_path=folder, ioc_files=iocs, module_name=module) - cmd.modules = BACKUP_MODULES + BUGREPORT_MODULES + cmd.modules = BACKUP_MODULES + BUGREPORT_MODULES + ANDROIDQF_MODULES if list_modules: cmd.list_modules() return cmd.run() + cmd.show_alerts_brief() + cmd.show_support_message() # ============================================================================== diff --git a/src/mvt/android/cmd_check_androidqf.py b/src/mvt/android/cmd_check_androidqf.py index 580b5e2..e0f49ab 100644 --- a/src/mvt/android/cmd_check_androidqf.py +++ b/src/mvt/android/cmd_check_androidqf.py @@ -67,6 +67,9 @@ class CmdAndroidCheckAndroidQF(Command): self.__files: List[str] = [] def init(self): + if not self.target_path: + raise NoAndroidQFTargetPath + if os.path.isdir(self.target_path): self.__format = "dir" parent_path = Path(self.target_path).absolute().parent.as_posix() @@ -154,9 +157,8 @@ class CmdAndroidCheckAndroidQF(Command): cmd.from_zip(bugreport) cmd.run() - self.detected_count += cmd.detected_count self.timeline.extend(cmd.timeline) - self.timeline_detected.extend(cmd.timeline_detected) + self.alertstore.extend(cmd.alertstore.alerts) def run_backup_cmd(self) -> bool: try: @@ -179,9 +181,8 @@ class CmdAndroidCheckAndroidQF(Command): cmd.from_ab(backup) cmd.run() - self.detected_count += cmd.detected_count self.timeline.extend(cmd.timeline) - self.timeline_detected.extend(cmd.timeline_detected) + self.alertstore.extend(cmd.alertstore.alerts) def finish(self) -> None: """ diff --git a/src/mvt/common/alerts.py b/src/mvt/common/alerts.py new file mode 100644 index 0000000..635520d --- /dev/null +++ b/src/mvt/common/alerts.py @@ -0,0 +1,181 @@ +# Mobile Verification Toolkit (MVT) +# Copyright (c) 2021-2025 The MVT Authors. +# Use of this software is governed by the MVT License 1.1 that can be found at +# https://license.mvt.re/1.1/ + +import csv +import logging +from enum import Enum +from dataclasses import dataclass, asdict +from typing import List, Dict, Any, Optional + +from .log import INFO_ALERT, LOW_ALERT, HIGH_ALERT, CRITICAL_ALERT, MEDIUM_ALERT +from .module_types import ModuleAtomicResult + + +class AlertLevel(Enum): + INFORMATIONAL = 0 + LOW = 10 + MEDIUM = 20 + HIGH = 30 + CRITICAL = 40 + + +@dataclass +class Alert: + level: AlertLevel + module: str + message: str + event_time: str + event: ModuleAtomicResult + + +class AlertStore: + def __init__(self, log: Optional[logging.Logger] = None) -> None: + self.__alerts: List[Alert] = [] + self.__log = log + + @property + def alerts(self) -> List[Alert]: + return self.__alerts + + def add(self, alert: Alert) -> None: + self.__alerts.append(alert) + + def extend(self, alerts: List[Alert]) -> None: + self.__alerts.extend(alerts) + + def info( + self, module: str, message: str, event_time: str, event: ModuleAtomicResult + ): + self.add( + Alert( + level=AlertLevel.INFORMATIONAL, + module=module, + message=message, + event_time=event_time, + event=event, + ) + ) + + def low( + self, module: str, message: str, event_time: str, event: ModuleAtomicResult + ): + self.add( + Alert( + level=AlertLevel.LOW, + module=module, + message=message, + event_time=event_time, + event=event, + ) + ) + + def medium( + self, module: str, message: str, event_time: str, event: ModuleAtomicResult + ): + self.add( + Alert( + level=AlertLevel.MEDIUM, + module=module, + message=message, + event_time=event_time, + event=event, + ) + ) + + def high( + self, module: str, message: str, event_time: str, event: ModuleAtomicResult + ): + self.add( + Alert( + level=AlertLevel.HIGH, + module=module, + message=message, + event_time=event_time, + event=event, + ) + ) + + def critical( + self, module: str, message: str, event_time: str, event: ModuleAtomicResult + ): + self.add( + Alert( + level=AlertLevel.CRITICAL, + module=module, + message=message, + event_time=event_time, + event=event, + ) + ) + + def log(self, alert: Alert) -> None: + if not self.__log: + return + + if not alert.message: + return + + if alert.level == AlertLevel.INFORMATIONAL: + self.__log.log(INFO_ALERT, alert.message) + elif alert.level == AlertLevel.LOW: + self.__log.log(LOW_ALERT, alert.message) + elif alert.level == AlertLevel.MEDIUM: + self.__log.log(MEDIUM_ALERT, alert.message) + elif alert.level == AlertLevel.HIGH: + self.__log.log(HIGH_ALERT, alert.message) + elif alert.level == AlertLevel.CRITICAL: + self.__log.log(CRITICAL_ALERT, alert.message) + + def log_latest(self) -> None: + self.log(self.__alerts[-1]) + + def count(self, level: AlertLevel) -> int: + count = 0 + for alert in self.__alerts: + if alert.level == level: + count += 1 + + return count + + def as_json(self) -> List[Dict[str, Any]]: + alerts = [] + for alert in self.__alerts: + alert_dict = asdict(alert) + # This is required because an Enum is not JSON serializable. + alert_dict["level"] = alert.level.name + alerts.append(alert_dict) + + return alerts + + def save_timeline(self, timeline_path: str) -> None: + with open(timeline_path, "a+", encoding="utf-8") as handle: + csvoutput = csv.writer( + handle, + delimiter=",", + quotechar='"', + quoting=csv.QUOTE_ALL, + escapechar="\\", + ) + csvoutput.writerow(["Event Time", "Module", "Message", "Event"]) + + timed_alerts = [] + for alert in self.alerts: + if not alert.event_time: + continue + + timed_alerts.append(asdict(alert)) + + for event in sorted( + timed_alerts, + key=lambda x: x["event_time"] if x["event_time"] is not None else "", + ): + csvoutput.writerow( + [ + event.get("event_time"), + event.get("module"), + event.get("message"), + event.get("event"), + ] + ) diff --git a/src/mvt/common/command.py b/src/mvt/common/command.py index b6d7aaf..920f3a0 100644 --- a/src/mvt/common/command.py +++ b/src/mvt/common/command.py @@ -9,16 +9,20 @@ import os import sys from datetime import datetime from typing import Optional +from rich.console import Console +from rich.panel import Panel +from rich.text import Text -from mvt.common.indicators import Indicators -from mvt.common.module import MVTModule, run_module, save_timeline -from mvt.common.utils import ( +from .indicators import Indicators +from .module import MVTModule, run_module, save_timeline +from .utils import ( convert_datetime_to_iso, generate_hashes_from_path, get_sha256_from_file_path, ) -from mvt.common.config import settings -from mvt.common.version import MVT_VERSION +from .config import settings +from .alerts import AlertStore, AlertLevel +from .version import MVT_VERSION class Command: @@ -70,12 +74,14 @@ class Command: self.iocs = Indicators(self.log) self.iocs.load_indicators_files(self.ioc_files) + self.alertstore = AlertStore() + def _create_storage(self) -> None: if self.results_path and not os.path.exists(self.results_path): try: os.makedirs(self.results_path) except Exception as exc: - self.log.critical( + self.log.fatal( "Unable to create output folder %s: %s", self.results_path, exc ) sys.exit(1) @@ -94,14 +100,14 @@ class Command: file_handler.setLevel(logging.DEBUG) file_handler.setFormatter(formatter) - # MVT can be run in a loop - # Old file handlers stick around in subsequent loops - # Remove any existing logging.FileHandler instances + # MVT can be run in a loop. + # Old file handlers stick around in subsequent loops. + # Remove any existing logging.FileHandler instances. for handler in logger.handlers: if isinstance(handler, logging.FileHandler): logger.removeHandler(handler) - # And finally add the new one + # And finally add the new one. logger.addHandler(file_handler) def _store_timeline(self) -> None: @@ -122,12 +128,24 @@ class Command: is_utc=is_utc, ) - if len(self.timeline_detected) > 0: - save_timeline( - self.timeline_detected, - os.path.join(self.results_path, "timeline_detected.csv"), - is_utc=is_utc, - ) + def _store_alerts(self) -> None: + if not self.results_path: + return + + alerts = self.alertstore.as_json() + if not alerts: + return + + alerts_path = os.path.join(self.results_path, "alerts.json") + with open(alerts_path, "w+", encoding="utf-8") as handle: + json.dump(alerts, handle, indent=4) + + def _store_alerts_timeline(self) -> None: + if not self.results_path: + return + + alerts_timeline_path = os.path.join(self.results_path, "alerts_timeline.csv") + self.alertstore.save_timeline(alerts_timeline_path) def _store_info(self) -> None: if not self.results_path: @@ -187,26 +205,54 @@ class Command: def finish(self) -> None: raise NotImplementedError - def _show_disable_adb_warning(self) -> None: - """Warn if ADB is enabled""" - if type(self).__name__ in ["CmdAndroidCheckADB", "CmdAndroidCheckAndroidQF"]: - self.log.info( - "Please disable Developer Options and ADB (Android Debug Bridge) on the device once finished with the acquisition. " - "ADB is a powerful tool which can allow unauthorized access to the device." - ) + def show_alerts_brief(self) -> None: + console = Console() + + message = Text() + for i, level in enumerate(AlertLevel): + message.append( + f"MVT produced {self.alertstore.count(level)} {level.name} alerts." + ) + if i < len(AlertLevel) - 1: + message.append("\n") + + panel = Panel( + message, title="ALERTS", style="sandy_brown", border_style="sandy_brown" + ) + console.print("") + console.print(panel) + + def show_disable_adb_warning(self) -> None: + console = Console() + message = Text( + "Please disable Developer Options and ADB (Android Debug Bridge) on the device once finished with the acquisition. " + "ADB is a powerful tool which can allow unauthorized access to the device." + ) + panel = Panel(message, title="NOTE", style="yellow", border_style="yellow") + console.print("") + console.print(panel) + + def show_support_message(self) -> None: + console = Console() + message = Text() - def _show_support_message(self) -> None: support_message = "Please seek reputable expert help if you have serious concerns about a possible spyware attack. Such support is available to human rights defenders and civil society through Amnesty International's Security Lab at https://securitylab.amnesty.org/get-help/?c=mvt" - if self.detected_count == 0: - self.log.info( - f"[bold]NOTE:[/bold] Using MVT with public indicators of compromise (IOCs) [bold]WILL NOT[/bold] automatically detect advanced attacks.\n\n{support_message}", - extra={"markup": True}, + if ( + self.alertstore.count(AlertLevel.HIGH) > 0 + or self.alertstore.count(AlertLevel.CRITICAL) > 0 + ): + message.append( + f"MVT produced HIGH or CRITICAL alerts. Only expert review can confirm if the detected indicators are signs of an attack.\n\n{support_message}", ) + panel = Panel(message, title="WARNING", style="red", border_style="red") else: - self.log.warning( - f"[bold]NOTE: Detected indicators of compromise[/bold]. Only expert review can confirm if the detected indicators are signs of an attack.\n\n{support_message}", - extra={"markup": True}, + message.append( + f"The lack of severe alerts does not equate to a clean bill of health.\n\n{support_message}", ) + panel = Panel(message, title="NOTE", style="yellow", border_style="yellow") + + console.print("") + console.print(panel) def run(self) -> None: try: @@ -218,6 +264,11 @@ class Command: if self.module_name and module.__name__ != self.module_name: continue + if not module.enabled and not ( + self.module_name and module.__name__ == self.module_name + ): + continue + # FIXME: do we need the logger here module_logger = logging.getLogger(module.__module__) @@ -243,11 +294,8 @@ class Command: run_module(m) self.executed.append(m) - - self.detected_count += len(m.detected) - self.timeline.extend(m.timeline) - self.timeline_detected.extend(m.timeline_detected) + self.alertstore.extend(m.alertstore.alerts) try: self.finish() @@ -259,7 +307,6 @@ class Command: return self._store_timeline() + self._store_alerts_timeline() + self._store_alerts() self._store_info() - - self._show_disable_adb_warning() - self._show_support_message() diff --git a/src/mvt/common/indicators.py b/src/mvt/common/indicators.py index e23a996..adb4483 100644 --- a/src/mvt/common/indicators.py +++ b/src/mvt/common/indicators.py @@ -8,7 +8,8 @@ import json import logging import os from functools import lru_cache -from typing import Any, Dict, Iterator, List, Optional, Union +from typing import Any, Dict, Iterator, List, Optional +from dataclasses import dataclass import ahocorasick from appdirs import user_data_dir @@ -22,6 +23,20 @@ MVT_INDICATORS_FOLDER = os.path.join(MVT_DATA_FOLDER, "indicators") logger = logging.getLogger(__name__) +@dataclass +class Indicator: + value: str + type: str + name: str + stix2_file_name: str + + +@dataclass +class IndicatorMatch: + ioc: Indicator + message: str + + class Indicators: """This class is used to parse indicators from a STIX2 file and provide functions to compare extracted artifacts to the indicators. @@ -203,7 +218,7 @@ class Indicators: try: data = json.load(handle) except json.decoder.JSONDecodeError: - self.log.critical( + self.log.warning( "Unable to parse STIX2 indicator file. " "The file is corrupted or in the wrong format!" ) @@ -314,7 +329,7 @@ class Indicators: if os.path.isfile(file_path): self.parse_stix2(file_path) else: - self.log.warning("No indicators file exists at path %s", file_path) + self.log.error("No indicators file exists at path %s", file_path) # Load downloaded indicators and any indicators from env variable. if load_default: @@ -323,15 +338,15 @@ class Indicators: self._check_stix2_env_variable() self.log.info("Loaded a total of %d unique indicators", self.total_ioc_count) - def get_iocs(self, ioc_type: str) -> Iterator[Dict[str, Any]]: + def get_iocs(self, ioc_type: str) -> Iterator[Indicator]: for ioc_collection in self.ioc_collections: for ioc in ioc_collection.get(ioc_type, []): - yield { - "value": ioc, - "type": ioc_type, - "name": ioc_collection["name"], - "stix2_file_name": ioc_collection["stix2_file_name"], - } + yield Indicator( + value=ioc, + type=ioc_type, + name=ioc_collection["name"], + stix2_file_name=ioc_collection["stix2_file_name"], + ) @lru_cache() def get_ioc_matcher( @@ -362,12 +377,12 @@ class Indicators: raise ValueError("Must provide either ioc_type or ioc_list") for ioc in iocs: - automaton.add_word(ioc["value"], ioc) + automaton.add_word(ioc.value, ioc) automaton.make_automaton() return automaton @lru_cache() - def check_url(self, url: str) -> Union[dict, None]: + def check_url(self, url: str) -> Optional[IndicatorMatch]: """Check if a given URL matches any of the provided domain indicators. :param url: URL to match against domain indicators @@ -375,21 +390,16 @@ class Indicators: :returns: Indicator details if matched, otherwise None """ - if not url: - return None - if not isinstance(url, str): + if not url or not isinstance(url, str): return None # Check the URL first for ioc in self.get_iocs("urls"): - if ioc["value"] == url: - self.log.warning( - 'Found a known suspicious URL %s matching indicator "%s" from "%s"', - url, - ioc["value"], - ioc["name"], + if ioc.value == url: + return IndicatorMatch( + ioc=ioc, + message=f'Found a known suspicious URL {url} matching indicator "{ioc.value}" from "{ioc.name}"', ) - return ioc # Then check the domain # Create an Aho-Corasick automaton from the list of urls @@ -426,71 +436,41 @@ class Indicators: except Exception: # If URL parsing failed, we just try to do a simple substring # match. - for idx, ioc in domain_matcher.iter(url): - if ioc["value"].lower() in url: - self.log.warning( - "Maybe found a known suspicious domain %s " - 'matching indicator "%s" from "%s"', - url, - ioc["value"], - ioc["name"], + for _, ioc in domain_matcher.iter(url): + if ioc.value.lower() in url: + return IndicatorMatch( + ioc=ioc, + message=f'Maybe found a known suspicious domain {url} matching indicator "{ioc.value}" from "{ioc.name}"', ) - return ioc # If nothing matched, we can quit here. return None # If all parsing worked, we start walking through available domain # indicators. - for idx, ioc in domain_matcher.iter(final_url.domain.lower()): + for _, ioc in domain_matcher.iter(final_url.domain.lower()): # First we check the full domain. - if final_url.domain.lower() == ioc["value"]: + if final_url.domain.lower() == ioc.value: if orig_url.is_shortened and orig_url.url != final_url.url: - self.log.warning( - "Found a known suspicious domain %s " - 'shortened as %s matching indicator "%s" from "%s"', - final_url.url, - orig_url.url, - ioc["value"], - ioc["name"], - ) + message = f'Found a known suspicious domain {final_url.url} shortened as {orig_url.url} matching indicator "{ioc.value}" from "{ioc.name}"' else: - self.log.warning( - "Found a known suspicious domain %s " - 'matching indicator "%s" from "%s"', - final_url.url, - ioc["value"], - ioc["name"], - ) - return ioc + message = f'Found a known suspicious domain {final_url.url} matching indicator "{ioc.value}" from "{ioc.name}"' + + return IndicatorMatch(ioc=ioc, message=message) # Then we just check the top level domain. - for idx, ioc in domain_matcher.iter(final_url.top_level.lower()): - if final_url.top_level.lower() == ioc["value"]: + for _, ioc in domain_matcher.iter(final_url.top_level.lower()): + if final_url.top_level.lower() == ioc.value: if orig_url.is_shortened and orig_url.url != final_url.url: - self.log.warning( - "Found a sub-domain with suspicious top " - "level %s shortened as %s matching " - 'indicator "%s" from "%s"', - final_url.url, - orig_url.url, - ioc["value"], - ioc["name"], - ) + message = f'Found a sub-domain with suspicious top level {final_url.url} shortened as {orig_url.url} matching indicator "{ioc.value}" from "{ioc.name}"' else: - self.log.warning( - "Found a sub-domain with a suspicious top " - 'level %s matching indicator "%s" from "%s"', - final_url.url, - ioc["value"], - ioc["name"], - ) + message = f'Found a sub-domain with a suspicious top level {final_url.url} matching indicator "{ioc.value}" from "{ioc.name}"' - return ioc + return IndicatorMatch(ioc=ioc, message=message) return None - def check_urls(self, urls: list) -> Union[dict, None]: + def check_urls(self, urls: list) -> Optional[IndicatorMatch]: """Check a list of URLs against the provided list of domain indicators. :param urls: List of URLs to check against domain indicators @@ -508,7 +488,7 @@ class Indicators: return None - def check_process(self, process: str) -> Union[dict, None]: + def check_process(self, process: str) -> Optional[IndicatorMatch]: """Check the provided process name against the list of process indicators. @@ -522,28 +502,22 @@ class Indicators: proc_name = os.path.basename(process) for ioc in self.get_iocs("processes"): - if proc_name == ioc["value"]: - self.log.warning( - 'Found a known suspicious process name "%s" ' - 'matching indicators from "%s"', - process, - ioc["name"], + if proc_name == ioc.value: + return IndicatorMatch( + ioc=ioc, + message=f'Found a known suspicious process name "{process}" matching indicators from "{ioc.name}"', ) - return ioc if len(proc_name) == 16: - if ioc["value"].startswith(proc_name): - self.log.warning( - "Found a truncated known suspicious " - 'process name "%s" matching indicators from "%s"', - process, - ioc["name"], + if ioc.value.startswith(proc_name): + return IndicatorMatch( + ioc=ioc, + message=f'Found a truncated known suspicious process name "{process}" matching indicators from "{ioc.name}"', ) - return ioc return None - def check_processes(self, processes: list) -> Union[dict, None]: + def check_processes(self, processes: list) -> Optional[IndicatorMatch]: """Check the provided list of processes against the list of process indicators. @@ -562,7 +536,7 @@ class Indicators: return None - def check_email(self, email: str) -> Union[dict, None]: + def check_email(self, email: str) -> Optional[IndicatorMatch]: """Check the provided email against the list of email indicators. :param email: Email address to check against email indicators @@ -574,18 +548,15 @@ class Indicators: return None for ioc in self.get_iocs("emails"): - if email.lower() == ioc["value"].lower(): - self.log.warning( - 'Found a known suspicious email address "%s" ' - 'matching indicators from "%s"', - email, - ioc["name"], + if email.lower() == ioc.value.lower(): + return IndicatorMatch( + ioc=ioc, + message=f'Found a known suspicious email address "{email}" matching indicators from "{ioc.name}"', ) - return ioc return None - def check_file_name(self, file_name: str) -> Union[dict, None]: + def check_file_name(self, file_name: str) -> Optional[IndicatorMatch]: """Check the provided file name against the list of file indicators. :param file_name: File name to check against file @@ -598,18 +569,15 @@ class Indicators: return None for ioc in self.get_iocs("file_names"): - if ioc["value"] == file_name: - self.log.warning( - 'Found a known suspicious file name "%s" ' - 'matching indicators from "%s"', - file_name, - ioc["name"], + if ioc.value == file_name: + return IndicatorMatch( + ioc=ioc, + message=f'Found a known suspicious file name "{file_name}" matching indicators from "{ioc.name}"', ) - return ioc return None - def check_file_path(self, file_path: str) -> Union[dict, None]: + def check_file_path(self, file_path: str) -> Optional[IndicatorMatch]: """Check the provided file path against the list of file indicators (both path and name). @@ -629,18 +597,15 @@ class Indicators: for ioc in self.get_iocs("file_paths"): # Strip any trailing slash from indicator paths to match # directories. - if file_path.startswith(ioc["value"].rstrip("/")): - self.log.warning( - 'Found a known suspicious file path "%s" ' - 'matching indicators form "%s"', - file_path, - ioc["name"], + if file_path.startswith(ioc.value.rstrip("/")): + return IndicatorMatch( + ioc=ioc, + message=f'Found a known suspicious file path "{file_path}" matching indicators form "{ioc.name}"', ) - return ioc return None - def check_file_path_process(self, file_path: str) -> Optional[Dict[str, Any]]: + def check_file_path_process(self, file_path: str) -> Optional[IndicatorMatch]: """Check the provided file path contains a process name from the list of indicators @@ -655,18 +620,15 @@ class Indicators: for ioc in self.get_iocs("processes"): parts = file_path.split("/") - if ioc["value"] in parts: - self.log.warning( - "Found known suspicious process name mentioned in file at " - 'path "%s" matching indicators from "%s"', - file_path, - ioc["name"], + if ioc.value in parts: + return IndicatorMatch( + ioc=ioc, + message=f'Found known suspicious process name mentioned in file at path "{file_path}" matching indicators from "{ioc.name}"', ) - return ioc return None - def check_profile(self, profile_uuid: str) -> Union[dict, None]: + def check_profile(self, profile_uuid: str) -> Optional[IndicatorMatch]: """Check the provided configuration profile UUID against the list of indicators. @@ -680,18 +642,15 @@ class Indicators: return None for ioc in self.get_iocs("ios_profile_ids"): - if profile_uuid in ioc["value"]: - self.log.warning( - 'Found a known suspicious profile ID "%s" ' - 'matching indicators from "%s"', - profile_uuid, - ioc["name"], + if profile_uuid in ioc.value: + return IndicatorMatch( + ioc=ioc, + message=f'Found a known suspicious profile ID "{profile_uuid}" matching indicators from "{ioc.name}"', ) - return ioc return None - def check_file_hash(self, file_hash: str) -> Union[dict, None]: + def check_file_hash(self, file_hash: str) -> Optional[IndicatorMatch]: """Check the provided file hash against the list of indicators. :param file_hash: hash to check @@ -710,18 +669,15 @@ class Indicators: hash_type = "sha256" for ioc in self.get_iocs("files_" + hash_type): - if file_hash.lower() == ioc["value"].lower(): - self.log.warning( - 'Found a known suspicious file with hash "%s" ' - 'matching indicators from "%s"', - file_hash, - ioc["name"], + if file_hash.lower() == ioc.value.lower(): + return IndicatorMatch( + ioc=ioc, + message=f'Found a known suspicious file with hash "{file_hash}" matching indicators from "{ioc.name}"', ) - return ioc return None - def check_app_certificate_hash(self, cert_hash: str) -> Union[dict, None]: + def check_app_certificate_hash(self, cert_hash: str) -> Optional[IndicatorMatch]: """Check the provided cert hash against the list of indicators. :param cert_hash: hash to check @@ -733,18 +689,15 @@ class Indicators: return None for ioc in self.get_iocs("app_cert_hashes"): - if cert_hash.lower() == ioc["value"].lower(): - self.log.warning( - 'Found a known suspicious app certfificate with hash "%s" ' - 'matching indicators from "%s"', - cert_hash, - ioc["name"], + if cert_hash.lower() == ioc.value.lower(): + return IndicatorMatch( + ioc=ioc, + message=f'Found a known suspicious app certfificate with hash "{cert_hash}" matching indicators from "{ioc.name}"', ) - return ioc return None - def check_app_id(self, app_id: str) -> Union[dict, None]: + def check_app_id(self, app_id: str) -> Optional[IndicatorMatch]: """Check the provided app identifier (typically an Android package name) against the list of indicators. @@ -757,18 +710,17 @@ class Indicators: return None for ioc in self.get_iocs("app_ids"): - if app_id.lower() == ioc["value"].lower(): - self.log.warning( - 'Found a known suspicious app with ID "%s" ' - 'matching indicators from "%s"', - app_id, - ioc["name"], + if app_id.lower() == ioc.value.lower(): + return IndicatorMatch( + ioc=ioc, + message=f'Found a known suspicious app with ID "{app_id}" matching indicators from "{ioc.name}"', ) - return ioc return None - def check_android_property_name(self, property_name: str) -> Optional[dict]: + def check_android_property_name( + self, property_name: str + ) -> Optional[IndicatorMatch]: """Check the android property name against the list of indicators. :param property_name: Name of the Android property @@ -780,24 +732,21 @@ class Indicators: return None for ioc in self.get_iocs("android_property_names"): - if property_name.lower() == ioc["value"].lower(): - self.log.warning( - 'Found a known suspicious Android property "%s" ' - 'matching indicators from "%s"', - property_name, - ioc["name"], + if property_name.lower() == ioc.value.lower(): + return IndicatorMatch( + ioc=ioc, + message=f'Found a known suspicious Android property "{property_name}" matching indicators from "{ioc.name}"', ) - return ioc return None - def check_domain(self, url: str) -> Union[dict, None]: + def check_domain(self, url: str) -> Optional[IndicatorMatch]: """ Renamed check_url now, kept for compatibility """ return self.check_url(url) - def check_domains(self, urls: list) -> Union[dict, None]: + def check_domains(self, urls: list) -> Optional[IndicatorMatch]: """ Renamed check_domains, kept for compatibility """ diff --git a/src/mvt/common/log.py b/src/mvt/common/log.py new file mode 100644 index 0000000..498b13d --- /dev/null +++ b/src/mvt/common/log.py @@ -0,0 +1,65 @@ +# Mobile Verification Toolkit (MVT) +# Copyright (c) 2021-2025 The MVT Authors. +# Use of this software is governed by the MVT License 1.1 that can be found at +# https://license.mvt.re/1.1/ + +import logging +from rich.console import Console +from rich.logging import RichHandler +from typing import Optional + +INFO = logging.INFO +DEBUG = logging.DEBUG +ERROR = logging.ERROR +FATAL = logging.CRITICAL +WARNING = logging.WARNING + +INFO_ALERT = 25 +LOW_ALERT = 35 +MEDIUM_ALERT = 45 +HIGH_ALERT = 55 +CRITICAL_ALERT = 65 + +logging.addLevelName(INFO_ALERT, "INFO") +logging.addLevelName(LOW_ALERT, "LOW") +logging.addLevelName(MEDIUM_ALERT, "MEDIUM") +logging.addLevelName(HIGH_ALERT, "HIGH") +logging.addLevelName(CRITICAL_ALERT, "CRITICAL") + + +class MVTLogHandler(RichHandler): + def __init__(self, console: Optional[Console] = None, level: int = logging.DEBUG): + super().__init__(console=console, level=level) + + def __add_prefix_space(self, level: str) -> str: + max_length = len("CRITICAL ALERT") + space = max_length - len(level) + return f"{level}{' ' * space}" + + def emit(self, record: logging.LogRecord): + try: + msg = rf"[grey50]\[{record.name}][/] {self.format(record)}" + + if record.levelno == ERROR: + msg = f"[bold red]{self.__add_prefix_space('ERROR')}[/bold red] {msg}" + elif record.levelno == FATAL: + msg = f"[bold red]{self.__add_prefix_space('FATAL')}[/bold red] {msg}" + elif record.levelno == WARNING: + msg = f"[yellow]{self.__add_prefix_space('WARNING')}[/yellow] {msg}" + elif record.levelno == INFO_ALERT: + msg = f"[blue]{self.__add_prefix_space('INFO ALERT')}[/blue] {msg}" + elif record.levelno == LOW_ALERT: + msg = f"[yellow]{self.__add_prefix_space('LOW ALERT')}[/yellow] {msg}" + elif record.levelno == MEDIUM_ALERT: + msg = f"[sandy_brown]{self.__add_prefix_space('MEDIUM ALERT')}[/sandy_brown] {msg}" + elif record.levelno == HIGH_ALERT: + msg = f"[red]{self.__add_prefix_space('HIGH ALERT')}[/red] {msg}" + elif record.levelno == CRITICAL_ALERT: + msg = f"[bold red]{self.__add_prefix_space('CRITICAL ALERT')}[/bold red] {msg}" + else: + msg = f"{self.__add_prefix_space('')} {msg}" + + self.console.print(msg) + + except Exception: + self.handleError(record) diff --git a/src/mvt/common/module.py b/src/mvt/common/module.py index 1468cf4..e57a9fc 100644 --- a/src/mvt/common/module.py +++ b/src/mvt/common/module.py @@ -8,9 +8,18 @@ import json import logging import os import re -from typing import Any, Dict, List, Optional, Union +from dataclasses import asdict, is_dataclass +from typing import Any, Dict, Optional from .utils import CustomJSONEncoder, exec_or_profile +from .indicators import Indicators +from .alerts import AlertStore +from .module_types import ( + ModuleResults, + ModuleTimeline, + ModuleSerializedResult, + ModuleAtomicResult, +) class DatabaseNotFoundError(Exception): @@ -28,7 +37,7 @@ class InsufficientPrivileges(Exception): class MVTModule: """This class provides a base for all extraction modules.""" - enabled = True + enabled: bool = True slug: Optional[str] = None def __init__( @@ -38,7 +47,7 @@ class MVTModule: results_path: Optional[str] = None, module_options: Optional[Dict[str, Any]] = None, log: logging.Logger = logging.getLogger(__name__), - results: Union[List[Dict[str, Any]], Dict[str, Any], None] = None, + results: ModuleResults = [], ) -> None: """Initialize module. @@ -46,7 +55,7 @@ class MVTModule: :type file_path: str :param target_path: Path to the target folder (backup or filesystem dump) - :type file_path: str + :type target_path: str :param results_path: Folder where results will be stored :type results_path: str :param fast_mode: Flag to enable or disable slow modules @@ -55,16 +64,21 @@ class MVTModule: :param results: Provided list of results entries :type results: list """ - self.file_path = file_path - self.target_path = target_path - self.results_path = results_path - self.module_options = module_options if module_options else {} + self.file_path: Optional[str] = file_path + self.target_path: Optional[str] = target_path + self.results_path: Optional[str] = results_path + self.module_options: Optional[Dict[str, Any]] = ( + module_options if module_options else {} + ) + self.log = log - self.indicators = None - self.results = results if results else [] - self.detected: List[Dict[str, Any]] = [] - self.timeline: List[Dict[str, str]] = [] - self.timeline_detected: List[Dict[str, str]] = [] + self.indicators: Optional[Indicators] = None + self.alertstore: AlertStore = AlertStore(log=log) + + self.results: ModuleResults = results if results else [] + self.detected: ModuleResults = [] + self.timeline: ModuleTimeline = [] + self.timeline_detected: ModuleTimeline = [] @classmethod def from_json(cls, json_path: str, log: logging.Logger): @@ -72,11 +86,11 @@ class MVTModule: results = json.load(handle) if log: log.info('Loaded %d results from "%s"', len(results), json_path) + return cls(results=results, log=log) @classmethod def get_slug(cls) -> str: - """Use the module's class name to retrieve a slug""" if cls.slug: return cls.slug @@ -84,26 +98,26 @@ class MVTModule: return re.sub("([a-z0-9])([A-Z])", r"\1_\2", sub).lower() def check_indicators(self) -> None: - """Check the results of this module against a provided list of - indicators. - - - """ raise NotImplementedError def save_to_json(self) -> None: - """Save the collected results to a json file.""" if not self.results_path: return name = self.get_slug() if self.results: + converted_results = [ + asdict(result) if is_dataclass(result) else result + for result in self.results + ] results_file_name = f"{name}.json" results_json_path = os.path.join(self.results_path, results_file_name) with open(results_json_path, "w", encoding="utf-8") as handle: try: - json.dump(self.results, handle, indent=4, cls=CustomJSONEncoder) + json.dump( + converted_results, handle, indent=4, cls=CustomJSONEncoder + ) except Exception as exc: self.log.error( "Unable to store results of module %s to file %s: %s", @@ -118,7 +132,7 @@ class MVTModule: with open(detected_json_path, "w", encoding="utf-8") as handle: json.dump(self.detected, handle, indent=4, cls=CustomJSONEncoder) - def serialize(self, record: dict) -> Union[dict, list, None]: + def serialize(self, result: ModuleAtomicResult) -> ModuleSerializedResult: raise NotImplementedError @staticmethod @@ -130,13 +144,21 @@ class MVTModule: """ timeline_set = set() for record in timeline: - timeline_set.add(json.dumps(record, sort_keys=True)) + timeline_set.add( + json.dumps( + asdict(record) if is_dataclass(record) else record, sort_keys=True + ) + ) + return [json.loads(record) for record in timeline_set] def to_timeline(self) -> None: """Convert results into a timeline.""" + if not self.results: + return + for result in self.results: - record = self.serialize(result) + record: ModuleSerializedResult = self.serialize(result) if record: if isinstance(record, list): self.timeline.extend(record) diff --git a/src/mvt/common/module_types.py b/src/mvt/common/module_types.py new file mode 100644 index 0000000..f433482 --- /dev/null +++ b/src/mvt/common/module_types.py @@ -0,0 +1,29 @@ +# Mobile Verification Toolkit (MVT) +# Copyright (c) 2021-2025 The MVT Authors. +# Use of this software is governed by the MVT License 1.1 that can be found at +# https://license.mvt.re/1.1/ + +from .indicators import Indicator +from dataclasses import dataclass +from typing import List, Union, Optional + + +@dataclass +class ModuleAtomicResult: + timestamp: Optional[str] + matched_indicator: Optional[Indicator] + + +ModuleResults = List[ModuleAtomicResult] + + +@dataclass +class ModuleAtomicTimeline: + timestamp: str + module: str + event: str + data: str + + +ModuleTimeline = List[ModuleAtomicTimeline] +ModuleSerializedResult = Union[ModuleAtomicTimeline, ModuleTimeline] diff --git a/src/mvt/common/utils.py b/src/mvt/common/utils.py index 3d054f5..500fd85 100644 --- a/src/mvt/common/utils.py +++ b/src/mvt/common/utils.py @@ -12,7 +12,7 @@ import os import re from typing import Any, Iterator, Union -from rich.logging import RichHandler +from .log import MVTLogHandler from mvt.common.config import settings @@ -234,11 +234,10 @@ def init_logging(verbose: bool = False): """ Initialise logging for the MVT module """ - # Setup logging using Rich. log = logging.getLogger("mvt") - log.setLevel(logging.DEBUG) - consoleHandler = RichHandler(show_path=False, log_time_format="%X") - consoleHandler.setFormatter(logging.Formatter("[%(name)s] %(message)s")) + log.setLevel(logging.INFO) + consoleHandler = MVTLogHandler() + consoleHandler.setFormatter(logging.Formatter("%(message)s")) if verbose: consoleHandler.setLevel(logging.DEBUG) else: diff --git a/src/mvt/ios/cli.py b/src/mvt/ios/cli.py index 1d06c96..6ffd106 100644 --- a/src/mvt/ios/cli.py +++ b/src/mvt/ios/cli.py @@ -228,11 +228,8 @@ def check_backup( log.info("Checking iTunes backup located at: %s", backup_path) cmd.run() - - if cmd.detected_count > 0: - log.warning( - "The analysis of the backup produced %d detections!", cmd.detected_count - ) + cmd.show_alerts_brief() + cmd.show_support_message() # ============================================================================== @@ -275,12 +272,8 @@ def check_fs(ctx, iocs, output, fast, list_modules, module, hashes, verbose, dum log.info("Checking iOS filesystem located at: %s", dump_path) cmd.run() - - if cmd.detected_count > 0: - log.warning( - "The analysis of the iOS filesystem produced %d detections!", - cmd.detected_count, - ) + cmd.show_alerts_brief() + cmd.show_support_message() # ============================================================================== @@ -308,6 +301,8 @@ def check_iocs(ctx, iocs, list_modules, module, folder): return cmd.run() + cmd.show_alerts_brief() + cmd.show_support_message() # ============================================================================== diff --git a/tests/android_androidqf/test_packages.py b/tests/android_androidqf/test_packages.py index 966d8a6..fe6332a 100644 --- a/tests/android_androidqf/test_packages.py +++ b/tests/android_androidqf/test_packages.py @@ -91,7 +91,8 @@ class TestAndroidqfPackages: assert len(possible_detected_app) == 1 assert possible_detected_app[0]["name"] == "com.malware.blah" assert ( - possible_detected_app[0]["matched_indicator"]["value"] == "com.malware.blah" + possible_detected_app[0]["matched_indicator"].ioc.value + == "com.malware.blah" ) def test_packages_ioc_sha256(self, module, indicators_factory): @@ -109,7 +110,7 @@ class TestAndroidqfPackages: assert len(possible_detected_app) == 1 assert possible_detected_app[0]["name"] == "com.malware.muahaha" assert ( - possible_detected_app[0]["matched_indicator"]["value"] + possible_detected_app[0]["matched_indicator"].ioc.value == "31037a27af59d4914906c01ad14a318eee2f3e31d48da8954dca62a99174e3fa" ) @@ -128,6 +129,6 @@ class TestAndroidqfPackages: assert len(possible_detected_app) == 1 assert possible_detected_app[0]["name"] == "com.malware.muahaha" assert ( - possible_detected_app[0]["matched_indicator"]["value"] + possible_detected_app[0]["matched_indicator"].ioc.value == "c7e56178748be1441370416d4c10e34817ea0c961eb636c8e9d98e0fd79bf730" )