diff --git a/src/mvt/android/cli.py b/src/mvt/android/cli.py index 5d23411..ebdeeff 100644 --- a/src/mvt/android/cli.py +++ b/src/mvt/android/cli.py @@ -37,6 +37,7 @@ from .cmd_check_bugreport import CmdAndroidCheckBugreport from .modules.backup import BACKUP_MODULES from .modules.backup.helpers import cli_load_android_backup_password from .modules.bugreport import BUGREPORT_MODULES +from .modules.androidqf import ANDROIDQF_MODULES init_logging() log = logging.getLogger("mvt") @@ -109,12 +110,8 @@ def check_bugreport(ctx, iocs, output, list_modules, module, verbose, bugreport_ log.info("Checking Android bug report at path: %s", bugreport_path) cmd.run() - - if cmd.detected_count > 0: - log.warning( - "The analysis of the Android bug report produced %d detections!", - cmd.detected_count, - ) + cmd.show_alerts_brief() + cmd.show_support_message() # ============================================================================== @@ -171,12 +168,8 @@ def check_backup( log.info("Checking Android backup at path: %s", backup_path) cmd.run() - - if cmd.detected_count > 0: - log.warning( - "The analysis of the Android backup produced %d detections!", - cmd.detected_count, - ) + cmd.show_alerts_brief() + cmd.show_support_message() # ============================================================================== @@ -235,12 +228,9 @@ def check_androidqf( log.info("Checking AndroidQF acquisition at path: %s", androidqf_path) cmd.run() - - if cmd.detected_count > 0: - log.warning( - "The analysis of the AndroidQF acquisition produced %d detections!", - cmd.detected_count, - ) + cmd.show_alerts_brief() + cmd.show_disable_adb_warning() + cmd.show_support_message() # ============================================================================== @@ -261,13 +251,15 @@ def check_androidqf( @click.pass_context def check_iocs(ctx, iocs, list_modules, module, folder): cmd = CmdCheckIOCS(target_path=folder, ioc_files=iocs, module_name=module) - cmd.modules = BACKUP_MODULES + BUGREPORT_MODULES + cmd.modules = BACKUP_MODULES + BUGREPORT_MODULES + ANDROIDQF_MODULES if list_modules: cmd.list_modules() return cmd.run() + cmd.show_alerts_brief() + cmd.show_support_message() # ============================================================================== diff --git a/src/mvt/android/cmd_check_androidqf.py b/src/mvt/android/cmd_check_androidqf.py index 580b5e2..e0f49ab 100644 --- a/src/mvt/android/cmd_check_androidqf.py +++ b/src/mvt/android/cmd_check_androidqf.py @@ -67,6 +67,9 @@ class CmdAndroidCheckAndroidQF(Command): self.__files: List[str] = [] def init(self): + if not self.target_path: + raise NoAndroidQFTargetPath + if os.path.isdir(self.target_path): self.__format = "dir" parent_path = Path(self.target_path).absolute().parent.as_posix() @@ -154,9 +157,8 @@ class CmdAndroidCheckAndroidQF(Command): cmd.from_zip(bugreport) cmd.run() - self.detected_count += cmd.detected_count self.timeline.extend(cmd.timeline) - self.timeline_detected.extend(cmd.timeline_detected) + self.alertstore.extend(cmd.alertstore.alerts) def run_backup_cmd(self) -> bool: try: @@ -179,9 +181,8 @@ class CmdAndroidCheckAndroidQF(Command): cmd.from_ab(backup) cmd.run() - self.detected_count += cmd.detected_count self.timeline.extend(cmd.timeline) - self.timeline_detected.extend(cmd.timeline_detected) + self.alertstore.extend(cmd.alertstore.alerts) def finish(self) -> None: """ diff --git a/src/mvt/common/alerts.py b/src/mvt/common/alerts.py new file mode 100644 index 0000000..635520d --- /dev/null +++ b/src/mvt/common/alerts.py @@ -0,0 +1,181 @@ +# Mobile Verification Toolkit (MVT) +# Copyright (c) 2021-2025 The MVT Authors. +# Use of this software is governed by the MVT License 1.1 that can be found at +# https://license.mvt.re/1.1/ + +import csv +import logging +from enum import Enum +from dataclasses import dataclass, asdict +from typing import List, Dict, Any, Optional + +from .log import INFO_ALERT, LOW_ALERT, HIGH_ALERT, CRITICAL_ALERT, MEDIUM_ALERT +from .module_types import ModuleAtomicResult + + +class AlertLevel(Enum): + INFORMATIONAL = 0 + LOW = 10 + MEDIUM = 20 + HIGH = 30 + CRITICAL = 40 + + +@dataclass +class Alert: + level: AlertLevel + module: str + message: str + event_time: str + event: ModuleAtomicResult + + +class AlertStore: + def __init__(self, log: Optional[logging.Logger] = None) -> None: + self.__alerts: List[Alert] = [] + self.__log = log + + @property + def alerts(self) -> List[Alert]: + return self.__alerts + + def add(self, alert: Alert) -> None: + self.__alerts.append(alert) + + def extend(self, alerts: List[Alert]) -> None: + self.__alerts.extend(alerts) + + def info( + self, module: str, message: str, event_time: str, event: ModuleAtomicResult + ): + self.add( + Alert( + level=AlertLevel.INFORMATIONAL, + module=module, + message=message, + event_time=event_time, + event=event, + ) + ) + + def low( + self, module: str, message: str, event_time: str, event: ModuleAtomicResult + ): + self.add( + Alert( + level=AlertLevel.LOW, + module=module, + message=message, + event_time=event_time, + event=event, + ) + ) + + def medium( + self, module: str, message: str, event_time: str, event: ModuleAtomicResult + ): + self.add( + Alert( + level=AlertLevel.MEDIUM, + module=module, + message=message, + event_time=event_time, + event=event, + ) + ) + + def high( + self, module: str, message: str, event_time: str, event: ModuleAtomicResult + ): + self.add( + Alert( + level=AlertLevel.HIGH, + module=module, + message=message, + event_time=event_time, + event=event, + ) + ) + + def critical( + self, module: str, message: str, event_time: str, event: ModuleAtomicResult + ): + self.add( + Alert( + level=AlertLevel.CRITICAL, + module=module, + message=message, + event_time=event_time, + event=event, + ) + ) + + def log(self, alert: Alert) -> None: + if not self.__log: + return + + if not alert.message: + return + + if alert.level == AlertLevel.INFORMATIONAL: + self.__log.log(INFO_ALERT, alert.message) + elif alert.level == AlertLevel.LOW: + self.__log.log(LOW_ALERT, alert.message) + elif alert.level == AlertLevel.MEDIUM: + self.__log.log(MEDIUM_ALERT, alert.message) + elif alert.level == AlertLevel.HIGH: + self.__log.log(HIGH_ALERT, alert.message) + elif alert.level == AlertLevel.CRITICAL: + self.__log.log(CRITICAL_ALERT, alert.message) + + def log_latest(self) -> None: + self.log(self.__alerts[-1]) + + def count(self, level: AlertLevel) -> int: + count = 0 + for alert in self.__alerts: + if alert.level == level: + count += 1 + + return count + + def as_json(self) -> List[Dict[str, Any]]: + alerts = [] + for alert in self.__alerts: + alert_dict = asdict(alert) + # This is required because an Enum is not JSON serializable. + alert_dict["level"] = alert.level.name + alerts.append(alert_dict) + + return alerts + + def save_timeline(self, timeline_path: str) -> None: + with open(timeline_path, "a+", encoding="utf-8") as handle: + csvoutput = csv.writer( + handle, + delimiter=",", + quotechar='"', + quoting=csv.QUOTE_ALL, + escapechar="\\", + ) + csvoutput.writerow(["Event Time", "Module", "Message", "Event"]) + + timed_alerts = [] + for alert in self.alerts: + if not alert.event_time: + continue + + timed_alerts.append(asdict(alert)) + + for event in sorted( + timed_alerts, + key=lambda x: x["event_time"] if x["event_time"] is not None else "", + ): + csvoutput.writerow( + [ + event.get("event_time"), + event.get("module"), + event.get("message"), + event.get("event"), + ] + ) diff --git a/src/mvt/common/command.py b/src/mvt/common/command.py index b6d7aaf..920f3a0 100644 --- a/src/mvt/common/command.py +++ b/src/mvt/common/command.py @@ -9,16 +9,20 @@ import os import sys from datetime import datetime from typing import Optional +from rich.console import Console +from rich.panel import Panel +from rich.text import Text -from mvt.common.indicators import Indicators -from mvt.common.module import MVTModule, run_module, save_timeline -from mvt.common.utils import ( +from .indicators import Indicators +from .module import MVTModule, run_module, save_timeline +from .utils import ( convert_datetime_to_iso, generate_hashes_from_path, get_sha256_from_file_path, ) -from mvt.common.config import settings -from mvt.common.version import MVT_VERSION +from .config import settings +from .alerts import AlertStore, AlertLevel +from .version import MVT_VERSION class Command: @@ -70,12 +74,14 @@ class Command: self.iocs = Indicators(self.log) self.iocs.load_indicators_files(self.ioc_files) + self.alertstore = AlertStore() + def _create_storage(self) -> None: if self.results_path and not os.path.exists(self.results_path): try: os.makedirs(self.results_path) except Exception as exc: - self.log.critical( + self.log.fatal( "Unable to create output folder %s: %s", self.results_path, exc ) sys.exit(1) @@ -94,14 +100,14 @@ class Command: file_handler.setLevel(logging.DEBUG) file_handler.setFormatter(formatter) - # MVT can be run in a loop - # Old file handlers stick around in subsequent loops - # Remove any existing logging.FileHandler instances + # MVT can be run in a loop. + # Old file handlers stick around in subsequent loops. + # Remove any existing logging.FileHandler instances. for handler in logger.handlers: if isinstance(handler, logging.FileHandler): logger.removeHandler(handler) - # And finally add the new one + # And finally add the new one. logger.addHandler(file_handler) def _store_timeline(self) -> None: @@ -122,12 +128,24 @@ class Command: is_utc=is_utc, ) - if len(self.timeline_detected) > 0: - save_timeline( - self.timeline_detected, - os.path.join(self.results_path, "timeline_detected.csv"), - is_utc=is_utc, - ) + def _store_alerts(self) -> None: + if not self.results_path: + return + + alerts = self.alertstore.as_json() + if not alerts: + return + + alerts_path = os.path.join(self.results_path, "alerts.json") + with open(alerts_path, "w+", encoding="utf-8") as handle: + json.dump(alerts, handle, indent=4) + + def _store_alerts_timeline(self) -> None: + if not self.results_path: + return + + alerts_timeline_path = os.path.join(self.results_path, "alerts_timeline.csv") + self.alertstore.save_timeline(alerts_timeline_path) def _store_info(self) -> None: if not self.results_path: @@ -187,26 +205,54 @@ class Command: def finish(self) -> None: raise NotImplementedError - def _show_disable_adb_warning(self) -> None: - """Warn if ADB is enabled""" - if type(self).__name__ in ["CmdAndroidCheckADB", "CmdAndroidCheckAndroidQF"]: - self.log.info( - "Please disable Developer Options and ADB (Android Debug Bridge) on the device once finished with the acquisition. " - "ADB is a powerful tool which can allow unauthorized access to the device." - ) + def show_alerts_brief(self) -> None: + console = Console() + + message = Text() + for i, level in enumerate(AlertLevel): + message.append( + f"MVT produced {self.alertstore.count(level)} {level.name} alerts." + ) + if i < len(AlertLevel) - 1: + message.append("\n") + + panel = Panel( + message, title="ALERTS", style="sandy_brown", border_style="sandy_brown" + ) + console.print("") + console.print(panel) + + def show_disable_adb_warning(self) -> None: + console = Console() + message = Text( + "Please disable Developer Options and ADB (Android Debug Bridge) on the device once finished with the acquisition. " + "ADB is a powerful tool which can allow unauthorized access to the device." + ) + panel = Panel(message, title="NOTE", style="yellow", border_style="yellow") + console.print("") + console.print(panel) + + def show_support_message(self) -> None: + console = Console() + message = Text() - def _show_support_message(self) -> None: support_message = "Please seek reputable expert help if you have serious concerns about a possible spyware attack. Such support is available to human rights defenders and civil society through Amnesty International's Security Lab at https://securitylab.amnesty.org/get-help/?c=mvt" - if self.detected_count == 0: - self.log.info( - f"[bold]NOTE:[/bold] Using MVT with public indicators of compromise (IOCs) [bold]WILL NOT[/bold] automatically detect advanced attacks.\n\n{support_message}", - extra={"markup": True}, + if ( + self.alertstore.count(AlertLevel.HIGH) > 0 + or self.alertstore.count(AlertLevel.CRITICAL) > 0 + ): + message.append( + f"MVT produced HIGH or CRITICAL alerts. Only expert review can confirm if the detected indicators are signs of an attack.\n\n{support_message}", ) + panel = Panel(message, title="WARNING", style="red", border_style="red") else: - self.log.warning( - f"[bold]NOTE: Detected indicators of compromise[/bold]. Only expert review can confirm if the detected indicators are signs of an attack.\n\n{support_message}", - extra={"markup": True}, + message.append( + f"The lack of severe alerts does not equate to a clean bill of health.\n\n{support_message}", ) + panel = Panel(message, title="NOTE", style="yellow", border_style="yellow") + + console.print("") + console.print(panel) def run(self) -> None: try: @@ -218,6 +264,11 @@ class Command: if self.module_name and module.__name__ != self.module_name: continue + if not module.enabled and not ( + self.module_name and module.__name__ == self.module_name + ): + continue + # FIXME: do we need the logger here module_logger = logging.getLogger(module.__module__) @@ -243,11 +294,8 @@ class Command: run_module(m) self.executed.append(m) - - self.detected_count += len(m.detected) - self.timeline.extend(m.timeline) - self.timeline_detected.extend(m.timeline_detected) + self.alertstore.extend(m.alertstore.alerts) try: self.finish() @@ -259,7 +307,6 @@ class Command: return self._store_timeline() + self._store_alerts_timeline() + self._store_alerts() self._store_info() - - self._show_disable_adb_warning() - self._show_support_message() diff --git a/src/mvt/common/indicators.py b/src/mvt/common/indicators.py index e23a996..adb4483 100644 --- a/src/mvt/common/indicators.py +++ b/src/mvt/common/indicators.py @@ -8,7 +8,8 @@ import json import logging import os from functools import lru_cache -from typing import Any, Dict, Iterator, List, Optional, Union +from typing import Any, Dict, Iterator, List, Optional +from dataclasses import dataclass import ahocorasick from appdirs import user_data_dir @@ -22,6 +23,20 @@ MVT_INDICATORS_FOLDER = os.path.join(MVT_DATA_FOLDER, "indicators") logger = logging.getLogger(__name__) +@dataclass +class Indicator: + value: str + type: str + name: str + stix2_file_name: str + + +@dataclass +class IndicatorMatch: + ioc: Indicator + message: str + + class Indicators: """This class is used to parse indicators from a STIX2 file and provide functions to compare extracted artifacts to the indicators. @@ -203,7 +218,7 @@ class Indicators: try: data = json.load(handle) except json.decoder.JSONDecodeError: - self.log.critical( + self.log.warning( "Unable to parse STIX2 indicator file. " "The file is corrupted or in the wrong format!" ) @@ -314,7 +329,7 @@ class Indicators: if os.path.isfile(file_path): self.parse_stix2(file_path) else: - self.log.warning("No indicators file exists at path %s", file_path) + self.log.error("No indicators file exists at path %s", file_path) # Load downloaded indicators and any indicators from env variable. if load_default: @@ -323,15 +338,15 @@ class Indicators: self._check_stix2_env_variable() self.log.info("Loaded a total of %d unique indicators", self.total_ioc_count) - def get_iocs(self, ioc_type: str) -> Iterator[Dict[str, Any]]: + def get_iocs(self, ioc_type: str) -> Iterator[Indicator]: for ioc_collection in self.ioc_collections: for ioc in ioc_collection.get(ioc_type, []): - yield { - "value": ioc, - "type": ioc_type, - "name": ioc_collection["name"], - "stix2_file_name": ioc_collection["stix2_file_name"], - } + yield Indicator( + value=ioc, + type=ioc_type, + name=ioc_collection["name"], + stix2_file_name=ioc_collection["stix2_file_name"], + ) @lru_cache() def get_ioc_matcher( @@ -362,12 +377,12 @@ class Indicators: raise ValueError("Must provide either ioc_type or ioc_list") for ioc in iocs: - automaton.add_word(ioc["value"], ioc) + automaton.add_word(ioc.value, ioc) automaton.make_automaton() return automaton @lru_cache() - def check_url(self, url: str) -> Union[dict, None]: + def check_url(self, url: str) -> Optional[IndicatorMatch]: """Check if a given URL matches any of the provided domain indicators. :param url: URL to match against domain indicators @@ -375,21 +390,16 @@ class Indicators: :returns: Indicator details if matched, otherwise None """ - if not url: - return None - if not isinstance(url, str): + if not url or not isinstance(url, str): return None # Check the URL first for ioc in self.get_iocs("urls"): - if ioc["value"] == url: - self.log.warning( - 'Found a known suspicious URL %s matching indicator "%s" from "%s"', - url, - ioc["value"], - ioc["name"], + if ioc.value == url: + return IndicatorMatch( + ioc=ioc, + message=f'Found a known suspicious URL {url} matching indicator "{ioc.value}" from "{ioc.name}"', ) - return ioc # Then check the domain # Create an Aho-Corasick automaton from the list of urls @@ -426,71 +436,41 @@ class Indicators: except Exception: # If URL parsing failed, we just try to do a simple substring # match. - for idx, ioc in domain_matcher.iter(url): - if ioc["value"].lower() in url: - self.log.warning( - "Maybe found a known suspicious domain %s " - 'matching indicator "%s" from "%s"', - url, - ioc["value"], - ioc["name"], + for _, ioc in domain_matcher.iter(url): + if ioc.value.lower() in url: + return IndicatorMatch( + ioc=ioc, + message=f'Maybe found a known suspicious domain {url} matching indicator "{ioc.value}" from "{ioc.name}"', ) - return ioc # If nothing matched, we can quit here. return None # If all parsing worked, we start walking through available domain # indicators. - for idx, ioc in domain_matcher.iter(final_url.domain.lower()): + for _, ioc in domain_matcher.iter(final_url.domain.lower()): # First we check the full domain. - if final_url.domain.lower() == ioc["value"]: + if final_url.domain.lower() == ioc.value: if orig_url.is_shortened and orig_url.url != final_url.url: - self.log.warning( - "Found a known suspicious domain %s " - 'shortened as %s matching indicator "%s" from "%s"', - final_url.url, - orig_url.url, - ioc["value"], - ioc["name"], - ) + message = f'Found a known suspicious domain {final_url.url} shortened as {orig_url.url} matching indicator "{ioc.value}" from "{ioc.name}"' else: - self.log.warning( - "Found a known suspicious domain %s " - 'matching indicator "%s" from "%s"', - final_url.url, - ioc["value"], - ioc["name"], - ) - return ioc + message = f'Found a known suspicious domain {final_url.url} matching indicator "{ioc.value}" from "{ioc.name}"' + + return IndicatorMatch(ioc=ioc, message=message) # Then we just check the top level domain. - for idx, ioc in domain_matcher.iter(final_url.top_level.lower()): - if final_url.top_level.lower() == ioc["value"]: + for _, ioc in domain_matcher.iter(final_url.top_level.lower()): + if final_url.top_level.lower() == ioc.value: if orig_url.is_shortened and orig_url.url != final_url.url: - self.log.warning( - "Found a sub-domain with suspicious top " - "level %s shortened as %s matching " - 'indicator "%s" from "%s"', - final_url.url, - orig_url.url, - ioc["value"], - ioc["name"], - ) + message = f'Found a sub-domain with suspicious top level {final_url.url} shortened as {orig_url.url} matching indicator "{ioc.value}" from "{ioc.name}"' else: - self.log.warning( - "Found a sub-domain with a suspicious top " - 'level %s matching indicator "%s" from "%s"', - final_url.url, - ioc["value"], - ioc["name"], - ) + message = f'Found a sub-domain with a suspicious top level {final_url.url} matching indicator "{ioc.value}" from "{ioc.name}"' - return ioc + return IndicatorMatch(ioc=ioc, message=message) return None - def check_urls(self, urls: list) -> Union[dict, None]: + def check_urls(self, urls: list) -> Optional[IndicatorMatch]: """Check a list of URLs against the provided list of domain indicators. :param urls: List of URLs to check against domain indicators @@ -508,7 +488,7 @@ class Indicators: return None - def check_process(self, process: str) -> Union[dict, None]: + def check_process(self, process: str) -> Optional[IndicatorMatch]: """Check the provided process name against the list of process indicators. @@ -522,28 +502,22 @@ class Indicators: proc_name = os.path.basename(process) for ioc in self.get_iocs("processes"): - if proc_name == ioc["value"]: - self.log.warning( - 'Found a known suspicious process name "%s" ' - 'matching indicators from "%s"', - process, - ioc["name"], + if proc_name == ioc.value: + return IndicatorMatch( + ioc=ioc, + message=f'Found a known suspicious process name "{process}" matching indicators from "{ioc.name}"', ) - return ioc if len(proc_name) == 16: - if ioc["value"].startswith(proc_name): - self.log.warning( - "Found a truncated known suspicious " - 'process name "%s" matching indicators from "%s"', - process, - ioc["name"], + if ioc.value.startswith(proc_name): + return IndicatorMatch( + ioc=ioc, + message=f'Found a truncated known suspicious process name "{process}" matching indicators from "{ioc.name}"', ) - return ioc return None - def check_processes(self, processes: list) -> Union[dict, None]: + def check_processes(self, processes: list) -> Optional[IndicatorMatch]: """Check the provided list of processes against the list of process indicators. @@ -562,7 +536,7 @@ class Indicators: return None - def check_email(self, email: str) -> Union[dict, None]: + def check_email(self, email: str) -> Optional[IndicatorMatch]: """Check the provided email against the list of email indicators. :param email: Email address to check against email indicators @@ -574,18 +548,15 @@ class Indicators: return None for ioc in self.get_iocs("emails"): - if email.lower() == ioc["value"].lower(): - self.log.warning( - 'Found a known suspicious email address "%s" ' - 'matching indicators from "%s"', - email, - ioc["name"], + if email.lower() == ioc.value.lower(): + return IndicatorMatch( + ioc=ioc, + message=f'Found a known suspicious email address "{email}" matching indicators from "{ioc.name}"', ) - return ioc return None - def check_file_name(self, file_name: str) -> Union[dict, None]: + def check_file_name(self, file_name: str) -> Optional[IndicatorMatch]: """Check the provided file name against the list of file indicators. :param file_name: File name to check against file @@ -598,18 +569,15 @@ class Indicators: return None for ioc in self.get_iocs("file_names"): - if ioc["value"] == file_name: - self.log.warning( - 'Found a known suspicious file name "%s" ' - 'matching indicators from "%s"', - file_name, - ioc["name"], + if ioc.value == file_name: + return IndicatorMatch( + ioc=ioc, + message=f'Found a known suspicious file name "{file_name}" matching indicators from "{ioc.name}"', ) - return ioc return None - def check_file_path(self, file_path: str) -> Union[dict, None]: + def check_file_path(self, file_path: str) -> Optional[IndicatorMatch]: """Check the provided file path against the list of file indicators (both path and name). @@ -629,18 +597,15 @@ class Indicators: for ioc in self.get_iocs("file_paths"): # Strip any trailing slash from indicator paths to match # directories. - if file_path.startswith(ioc["value"].rstrip("/")): - self.log.warning( - 'Found a known suspicious file path "%s" ' - 'matching indicators form "%s"', - file_path, - ioc["name"], + if file_path.startswith(ioc.value.rstrip("/")): + return IndicatorMatch( + ioc=ioc, + message=f'Found a known suspicious file path "{file_path}" matching indicators form "{ioc.name}"', ) - return ioc return None - def check_file_path_process(self, file_path: str) -> Optional[Dict[str, Any]]: + def check_file_path_process(self, file_path: str) -> Optional[IndicatorMatch]: """Check the provided file path contains a process name from the list of indicators @@ -655,18 +620,15 @@ class Indicators: for ioc in self.get_iocs("processes"): parts = file_path.split("/") - if ioc["value"] in parts: - self.log.warning( - "Found known suspicious process name mentioned in file at " - 'path "%s" matching indicators from "%s"', - file_path, - ioc["name"], + if ioc.value in parts: + return IndicatorMatch( + ioc=ioc, + message=f'Found known suspicious process name mentioned in file at path "{file_path}" matching indicators from "{ioc.name}"', ) - return ioc return None - def check_profile(self, profile_uuid: str) -> Union[dict, None]: + def check_profile(self, profile_uuid: str) -> Optional[IndicatorMatch]: """Check the provided configuration profile UUID against the list of indicators. @@ -680,18 +642,15 @@ class Indicators: return None for ioc in self.get_iocs("ios_profile_ids"): - if profile_uuid in ioc["value"]: - self.log.warning( - 'Found a known suspicious profile ID "%s" ' - 'matching indicators from "%s"', - profile_uuid, - ioc["name"], + if profile_uuid in ioc.value: + return IndicatorMatch( + ioc=ioc, + message=f'Found a known suspicious profile ID "{profile_uuid}" matching indicators from "{ioc.name}"', ) - return ioc return None - def check_file_hash(self, file_hash: str) -> Union[dict, None]: + def check_file_hash(self, file_hash: str) -> Optional[IndicatorMatch]: """Check the provided file hash against the list of indicators. :param file_hash: hash to check @@ -710,18 +669,15 @@ class Indicators: hash_type = "sha256" for ioc in self.get_iocs("files_" + hash_type): - if file_hash.lower() == ioc["value"].lower(): - self.log.warning( - 'Found a known suspicious file with hash "%s" ' - 'matching indicators from "%s"', - file_hash, - ioc["name"], + if file_hash.lower() == ioc.value.lower(): + return IndicatorMatch( + ioc=ioc, + message=f'Found a known suspicious file with hash "{file_hash}" matching indicators from "{ioc.name}"', ) - return ioc return None - def check_app_certificate_hash(self, cert_hash: str) -> Union[dict, None]: + def check_app_certificate_hash(self, cert_hash: str) -> Optional[IndicatorMatch]: """Check the provided cert hash against the list of indicators. :param cert_hash: hash to check @@ -733,18 +689,15 @@ class Indicators: return None for ioc in self.get_iocs("app_cert_hashes"): - if cert_hash.lower() == ioc["value"].lower(): - self.log.warning( - 'Found a known suspicious app certfificate with hash "%s" ' - 'matching indicators from "%s"', - cert_hash, - ioc["name"], + if cert_hash.lower() == ioc.value.lower(): + return IndicatorMatch( + ioc=ioc, + message=f'Found a known suspicious app certfificate with hash "{cert_hash}" matching indicators from "{ioc.name}"', ) - return ioc return None - def check_app_id(self, app_id: str) -> Union[dict, None]: + def check_app_id(self, app_id: str) -> Optional[IndicatorMatch]: """Check the provided app identifier (typically an Android package name) against the list of indicators. @@ -757,18 +710,17 @@ class Indicators: return None for ioc in self.get_iocs("app_ids"): - if app_id.lower() == ioc["value"].lower(): - self.log.warning( - 'Found a known suspicious app with ID "%s" ' - 'matching indicators from "%s"', - app_id, - ioc["name"], + if app_id.lower() == ioc.value.lower(): + return IndicatorMatch( + ioc=ioc, + message=f'Found a known suspicious app with ID "{app_id}" matching indicators from "{ioc.name}"', ) - return ioc return None - def check_android_property_name(self, property_name: str) -> Optional[dict]: + def check_android_property_name( + self, property_name: str + ) -> Optional[IndicatorMatch]: """Check the android property name against the list of indicators. :param property_name: Name of the Android property @@ -780,24 +732,21 @@ class Indicators: return None for ioc in self.get_iocs("android_property_names"): - if property_name.lower() == ioc["value"].lower(): - self.log.warning( - 'Found a known suspicious Android property "%s" ' - 'matching indicators from "%s"', - property_name, - ioc["name"], + if property_name.lower() == ioc.value.lower(): + return IndicatorMatch( + ioc=ioc, + message=f'Found a known suspicious Android property "{property_name}" matching indicators from "{ioc.name}"', ) - return ioc return None - def check_domain(self, url: str) -> Union[dict, None]: + def check_domain(self, url: str) -> Optional[IndicatorMatch]: """ Renamed check_url now, kept for compatibility """ return self.check_url(url) - def check_domains(self, urls: list) -> Union[dict, None]: + def check_domains(self, urls: list) -> Optional[IndicatorMatch]: """ Renamed check_domains, kept for compatibility """ diff --git a/src/mvt/common/log.py b/src/mvt/common/log.py new file mode 100644 index 0000000..498b13d --- /dev/null +++ b/src/mvt/common/log.py @@ -0,0 +1,65 @@ +# Mobile Verification Toolkit (MVT) +# Copyright (c) 2021-2025 The MVT Authors. +# Use of this software is governed by the MVT License 1.1 that can be found at +# https://license.mvt.re/1.1/ + +import logging +from rich.console import Console +from rich.logging import RichHandler +from typing import Optional + +INFO = logging.INFO +DEBUG = logging.DEBUG +ERROR = logging.ERROR +FATAL = logging.CRITICAL +WARNING = logging.WARNING + +INFO_ALERT = 25 +LOW_ALERT = 35 +MEDIUM_ALERT = 45 +HIGH_ALERT = 55 +CRITICAL_ALERT = 65 + +logging.addLevelName(INFO_ALERT, "INFO") +logging.addLevelName(LOW_ALERT, "LOW") +logging.addLevelName(MEDIUM_ALERT, "MEDIUM") +logging.addLevelName(HIGH_ALERT, "HIGH") +logging.addLevelName(CRITICAL_ALERT, "CRITICAL") + + +class MVTLogHandler(RichHandler): + def __init__(self, console: Optional[Console] = None, level: int = logging.DEBUG): + super().__init__(console=console, level=level) + + def __add_prefix_space(self, level: str) -> str: + max_length = len("CRITICAL ALERT") + space = max_length - len(level) + return f"{level}{' ' * space}" + + def emit(self, record: logging.LogRecord): + try: + msg = rf"[grey50]\[{record.name}][/] {self.format(record)}" + + if record.levelno == ERROR: + msg = f"[bold red]{self.__add_prefix_space('ERROR')}[/bold red] {msg}" + elif record.levelno == FATAL: + msg = f"[bold red]{self.__add_prefix_space('FATAL')}[/bold red] {msg}" + elif record.levelno == WARNING: + msg = f"[yellow]{self.__add_prefix_space('WARNING')}[/yellow] {msg}" + elif record.levelno == INFO_ALERT: + msg = f"[blue]{self.__add_prefix_space('INFO ALERT')}[/blue] {msg}" + elif record.levelno == LOW_ALERT: + msg = f"[yellow]{self.__add_prefix_space('LOW ALERT')}[/yellow] {msg}" + elif record.levelno == MEDIUM_ALERT: + msg = f"[sandy_brown]{self.__add_prefix_space('MEDIUM ALERT')}[/sandy_brown] {msg}" + elif record.levelno == HIGH_ALERT: + msg = f"[red]{self.__add_prefix_space('HIGH ALERT')}[/red] {msg}" + elif record.levelno == CRITICAL_ALERT: + msg = f"[bold red]{self.__add_prefix_space('CRITICAL ALERT')}[/bold red] {msg}" + else: + msg = f"{self.__add_prefix_space('')} {msg}" + + self.console.print(msg) + + except Exception: + self.handleError(record) diff --git a/src/mvt/common/module.py b/src/mvt/common/module.py index 1468cf4..e57a9fc 100644 --- a/src/mvt/common/module.py +++ b/src/mvt/common/module.py @@ -8,9 +8,18 @@ import json import logging import os import re -from typing import Any, Dict, List, Optional, Union +from dataclasses import asdict, is_dataclass +from typing import Any, Dict, Optional from .utils import CustomJSONEncoder, exec_or_profile +from .indicators import Indicators +from .alerts import AlertStore +from .module_types import ( + ModuleResults, + ModuleTimeline, + ModuleSerializedResult, + ModuleAtomicResult, +) class DatabaseNotFoundError(Exception): @@ -28,7 +37,7 @@ class InsufficientPrivileges(Exception): class MVTModule: """This class provides a base for all extraction modules.""" - enabled = True + enabled: bool = True slug: Optional[str] = None def __init__( @@ -38,7 +47,7 @@ class MVTModule: results_path: Optional[str] = None, module_options: Optional[Dict[str, Any]] = None, log: logging.Logger = logging.getLogger(__name__), - results: Union[List[Dict[str, Any]], Dict[str, Any], None] = None, + results: ModuleResults = [], ) -> None: """Initialize module. @@ -46,7 +55,7 @@ class MVTModule: :type file_path: str :param target_path: Path to the target folder (backup or filesystem dump) - :type file_path: str + :type target_path: str :param results_path: Folder where results will be stored :type results_path: str :param fast_mode: Flag to enable or disable slow modules @@ -55,16 +64,21 @@ class MVTModule: :param results: Provided list of results entries :type results: list """ - self.file_path = file_path - self.target_path = target_path - self.results_path = results_path - self.module_options = module_options if module_options else {} + self.file_path: Optional[str] = file_path + self.target_path: Optional[str] = target_path + self.results_path: Optional[str] = results_path + self.module_options: Optional[Dict[str, Any]] = ( + module_options if module_options else {} + ) + self.log = log - self.indicators = None - self.results = results if results else [] - self.detected: List[Dict[str, Any]] = [] - self.timeline: List[Dict[str, str]] = [] - self.timeline_detected: List[Dict[str, str]] = [] + self.indicators: Optional[Indicators] = None + self.alertstore: AlertStore = AlertStore(log=log) + + self.results: ModuleResults = results if results else [] + self.detected: ModuleResults = [] + self.timeline: ModuleTimeline = [] + self.timeline_detected: ModuleTimeline = [] @classmethod def from_json(cls, json_path: str, log: logging.Logger): @@ -72,11 +86,11 @@ class MVTModule: results = json.load(handle) if log: log.info('Loaded %d results from "%s"', len(results), json_path) + return cls(results=results, log=log) @classmethod def get_slug(cls) -> str: - """Use the module's class name to retrieve a slug""" if cls.slug: return cls.slug @@ -84,26 +98,26 @@ class MVTModule: return re.sub("([a-z0-9])([A-Z])", r"\1_\2", sub).lower() def check_indicators(self) -> None: - """Check the results of this module against a provided list of - indicators. - - - """ raise NotImplementedError def save_to_json(self) -> None: - """Save the collected results to a json file.""" if not self.results_path: return name = self.get_slug() if self.results: + converted_results = [ + asdict(result) if is_dataclass(result) else result + for result in self.results + ] results_file_name = f"{name}.json" results_json_path = os.path.join(self.results_path, results_file_name) with open(results_json_path, "w", encoding="utf-8") as handle: try: - json.dump(self.results, handle, indent=4, cls=CustomJSONEncoder) + json.dump( + converted_results, handle, indent=4, cls=CustomJSONEncoder + ) except Exception as exc: self.log.error( "Unable to store results of module %s to file %s: %s", @@ -118,7 +132,7 @@ class MVTModule: with open(detected_json_path, "w", encoding="utf-8") as handle: json.dump(self.detected, handle, indent=4, cls=CustomJSONEncoder) - def serialize(self, record: dict) -> Union[dict, list, None]: + def serialize(self, result: ModuleAtomicResult) -> ModuleSerializedResult: raise NotImplementedError @staticmethod @@ -130,13 +144,21 @@ class MVTModule: """ timeline_set = set() for record in timeline: - timeline_set.add(json.dumps(record, sort_keys=True)) + timeline_set.add( + json.dumps( + asdict(record) if is_dataclass(record) else record, sort_keys=True + ) + ) + return [json.loads(record) for record in timeline_set] def to_timeline(self) -> None: """Convert results into a timeline.""" + if not self.results: + return + for result in self.results: - record = self.serialize(result) + record: ModuleSerializedResult = self.serialize(result) if record: if isinstance(record, list): self.timeline.extend(record) diff --git a/src/mvt/common/module_types.py b/src/mvt/common/module_types.py new file mode 100644 index 0000000..f433482 --- /dev/null +++ b/src/mvt/common/module_types.py @@ -0,0 +1,29 @@ +# Mobile Verification Toolkit (MVT) +# Copyright (c) 2021-2025 The MVT Authors. +# Use of this software is governed by the MVT License 1.1 that can be found at +# https://license.mvt.re/1.1/ + +from .indicators import Indicator +from dataclasses import dataclass +from typing import List, Union, Optional + + +@dataclass +class ModuleAtomicResult: + timestamp: Optional[str] + matched_indicator: Optional[Indicator] + + +ModuleResults = List[ModuleAtomicResult] + + +@dataclass +class ModuleAtomicTimeline: + timestamp: str + module: str + event: str + data: str + + +ModuleTimeline = List[ModuleAtomicTimeline] +ModuleSerializedResult = Union[ModuleAtomicTimeline, ModuleTimeline] diff --git a/src/mvt/common/utils.py b/src/mvt/common/utils.py index 3d054f5..500fd85 100644 --- a/src/mvt/common/utils.py +++ b/src/mvt/common/utils.py @@ -12,7 +12,7 @@ import os import re from typing import Any, Iterator, Union -from rich.logging import RichHandler +from .log import MVTLogHandler from mvt.common.config import settings @@ -234,11 +234,10 @@ def init_logging(verbose: bool = False): """ Initialise logging for the MVT module """ - # Setup logging using Rich. log = logging.getLogger("mvt") - log.setLevel(logging.DEBUG) - consoleHandler = RichHandler(show_path=False, log_time_format="%X") - consoleHandler.setFormatter(logging.Formatter("[%(name)s] %(message)s")) + log.setLevel(logging.INFO) + consoleHandler = MVTLogHandler() + consoleHandler.setFormatter(logging.Formatter("%(message)s")) if verbose: consoleHandler.setLevel(logging.DEBUG) else: diff --git a/src/mvt/ios/cli.py b/src/mvt/ios/cli.py index 1d06c96..6ffd106 100644 --- a/src/mvt/ios/cli.py +++ b/src/mvt/ios/cli.py @@ -228,11 +228,8 @@ def check_backup( log.info("Checking iTunes backup located at: %s", backup_path) cmd.run() - - if cmd.detected_count > 0: - log.warning( - "The analysis of the backup produced %d detections!", cmd.detected_count - ) + cmd.show_alerts_brief() + cmd.show_support_message() # ============================================================================== @@ -275,12 +272,8 @@ def check_fs(ctx, iocs, output, fast, list_modules, module, hashes, verbose, dum log.info("Checking iOS filesystem located at: %s", dump_path) cmd.run() - - if cmd.detected_count > 0: - log.warning( - "The analysis of the iOS filesystem produced %d detections!", - cmd.detected_count, - ) + cmd.show_alerts_brief() + cmd.show_support_message() # ============================================================================== @@ -308,6 +301,8 @@ def check_iocs(ctx, iocs, list_modules, module, folder): return cmd.run() + cmd.show_alerts_brief() + cmd.show_support_message() # ============================================================================== diff --git a/tests/android_androidqf/test_packages.py b/tests/android_androidqf/test_packages.py index 966d8a6..fe6332a 100644 --- a/tests/android_androidqf/test_packages.py +++ b/tests/android_androidqf/test_packages.py @@ -91,7 +91,8 @@ class TestAndroidqfPackages: assert len(possible_detected_app) == 1 assert possible_detected_app[0]["name"] == "com.malware.blah" assert ( - possible_detected_app[0]["matched_indicator"]["value"] == "com.malware.blah" + possible_detected_app[0]["matched_indicator"].ioc.value + == "com.malware.blah" ) def test_packages_ioc_sha256(self, module, indicators_factory): @@ -109,7 +110,7 @@ class TestAndroidqfPackages: assert len(possible_detected_app) == 1 assert possible_detected_app[0]["name"] == "com.malware.muahaha" assert ( - possible_detected_app[0]["matched_indicator"]["value"] + possible_detected_app[0]["matched_indicator"].ioc.value == "31037a27af59d4914906c01ad14a318eee2f3e31d48da8954dca62a99174e3fa" ) @@ -128,6 +129,6 @@ class TestAndroidqfPackages: assert len(possible_detected_app) == 1 assert possible_detected_app[0]["name"] == "com.malware.muahaha" assert ( - possible_detected_app[0]["matched_indicator"]["value"] + possible_detected_app[0]["matched_indicator"].ioc.value == "c7e56178748be1441370416d4c10e34817ea0c961eb636c8e9d98e0fd79bf730" )