Major refactor to add structured alerting and typed indicators

This commit makes a structural change to MVT by changing binary
detected/not detected logic into a structured multi-level system
of alerts. This gives far more power to extend MVT and manage
alerts.

This commit also begins the process of adding proper typing for
key objects used in MVT including Indicators, IndicatorMatches,
and ModuleResults. This will also be keep to programmatically using
the output of MVT.
This commit is contained in:
Donncha Ó Cearbhaill
2025-02-16 00:10:44 +01:00
parent 6bac787cb5
commit 1b03002a00
11 changed files with 544 additions and 263 deletions
+11 -19
View File
@@ -37,6 +37,7 @@ from .cmd_check_bugreport import CmdAndroidCheckBugreport
from .modules.backup import BACKUP_MODULES
from .modules.backup.helpers import cli_load_android_backup_password
from .modules.bugreport import BUGREPORT_MODULES
from .modules.androidqf import ANDROIDQF_MODULES
init_logging()
log = logging.getLogger("mvt")
@@ -109,12 +110,8 @@ def check_bugreport(ctx, iocs, output, list_modules, module, verbose, bugreport_
log.info("Checking Android bug report at path: %s", bugreport_path)
cmd.run()
if cmd.detected_count > 0:
log.warning(
"The analysis of the Android bug report produced %d detections!",
cmd.detected_count,
)
cmd.show_alerts_brief()
cmd.show_support_message()
# ==============================================================================
@@ -171,12 +168,8 @@ def check_backup(
log.info("Checking Android backup at path: %s", backup_path)
cmd.run()
if cmd.detected_count > 0:
log.warning(
"The analysis of the Android backup produced %d detections!",
cmd.detected_count,
)
cmd.show_alerts_brief()
cmd.show_support_message()
# ==============================================================================
@@ -235,12 +228,9 @@ def check_androidqf(
log.info("Checking AndroidQF acquisition at path: %s", androidqf_path)
cmd.run()
if cmd.detected_count > 0:
log.warning(
"The analysis of the AndroidQF acquisition produced %d detections!",
cmd.detected_count,
)
cmd.show_alerts_brief()
cmd.show_disable_adb_warning()
cmd.show_support_message()
# ==============================================================================
@@ -261,13 +251,15 @@ def check_androidqf(
@click.pass_context
def check_iocs(ctx, iocs, list_modules, module, folder):
cmd = CmdCheckIOCS(target_path=folder, ioc_files=iocs, module_name=module)
cmd.modules = BACKUP_MODULES + BUGREPORT_MODULES
cmd.modules = BACKUP_MODULES + BUGREPORT_MODULES + ANDROIDQF_MODULES
if list_modules:
cmd.list_modules()
return
cmd.run()
cmd.show_alerts_brief()
cmd.show_support_message()
# ==============================================================================
+5 -4
View File
@@ -67,6 +67,9 @@ class CmdAndroidCheckAndroidQF(Command):
self.__files: List[str] = []
def init(self):
if not self.target_path:
raise NoAndroidQFTargetPath
if os.path.isdir(self.target_path):
self.__format = "dir"
parent_path = Path(self.target_path).absolute().parent.as_posix()
@@ -154,9 +157,8 @@ class CmdAndroidCheckAndroidQF(Command):
cmd.from_zip(bugreport)
cmd.run()
self.detected_count += cmd.detected_count
self.timeline.extend(cmd.timeline)
self.timeline_detected.extend(cmd.timeline_detected)
self.alertstore.extend(cmd.alertstore.alerts)
def run_backup_cmd(self) -> bool:
try:
@@ -179,9 +181,8 @@ class CmdAndroidCheckAndroidQF(Command):
cmd.from_ab(backup)
cmd.run()
self.detected_count += cmd.detected_count
self.timeline.extend(cmd.timeline)
self.timeline_detected.extend(cmd.timeline_detected)
self.alertstore.extend(cmd.alertstore.alerts)
def finish(self) -> None:
"""
+181
View File
@@ -0,0 +1,181 @@
# Mobile Verification Toolkit (MVT)
# Copyright (c) 2021-2025 The MVT Authors.
# Use of this software is governed by the MVT License 1.1 that can be found at
# https://license.mvt.re/1.1/
import csv
import logging
from enum import Enum
from dataclasses import dataclass, asdict
from typing import List, Dict, Any, Optional
from .log import INFO_ALERT, LOW_ALERT, HIGH_ALERT, CRITICAL_ALERT, MEDIUM_ALERT
from .module_types import ModuleAtomicResult
class AlertLevel(Enum):
INFORMATIONAL = 0
LOW = 10
MEDIUM = 20
HIGH = 30
CRITICAL = 40
@dataclass
class Alert:
level: AlertLevel
module: str
message: str
event_time: str
event: ModuleAtomicResult
class AlertStore:
def __init__(self, log: Optional[logging.Logger] = None) -> None:
self.__alerts: List[Alert] = []
self.__log = log
@property
def alerts(self) -> List[Alert]:
return self.__alerts
def add(self, alert: Alert) -> None:
self.__alerts.append(alert)
def extend(self, alerts: List[Alert]) -> None:
self.__alerts.extend(alerts)
def info(
self, module: str, message: str, event_time: str, event: ModuleAtomicResult
):
self.add(
Alert(
level=AlertLevel.INFORMATIONAL,
module=module,
message=message,
event_time=event_time,
event=event,
)
)
def low(
self, module: str, message: str, event_time: str, event: ModuleAtomicResult
):
self.add(
Alert(
level=AlertLevel.LOW,
module=module,
message=message,
event_time=event_time,
event=event,
)
)
def medium(
self, module: str, message: str, event_time: str, event: ModuleAtomicResult
):
self.add(
Alert(
level=AlertLevel.MEDIUM,
module=module,
message=message,
event_time=event_time,
event=event,
)
)
def high(
self, module: str, message: str, event_time: str, event: ModuleAtomicResult
):
self.add(
Alert(
level=AlertLevel.HIGH,
module=module,
message=message,
event_time=event_time,
event=event,
)
)
def critical(
self, module: str, message: str, event_time: str, event: ModuleAtomicResult
):
self.add(
Alert(
level=AlertLevel.CRITICAL,
module=module,
message=message,
event_time=event_time,
event=event,
)
)
def log(self, alert: Alert) -> None:
if not self.__log:
return
if not alert.message:
return
if alert.level == AlertLevel.INFORMATIONAL:
self.__log.log(INFO_ALERT, alert.message)
elif alert.level == AlertLevel.LOW:
self.__log.log(LOW_ALERT, alert.message)
elif alert.level == AlertLevel.MEDIUM:
self.__log.log(MEDIUM_ALERT, alert.message)
elif alert.level == AlertLevel.HIGH:
self.__log.log(HIGH_ALERT, alert.message)
elif alert.level == AlertLevel.CRITICAL:
self.__log.log(CRITICAL_ALERT, alert.message)
def log_latest(self) -> None:
self.log(self.__alerts[-1])
def count(self, level: AlertLevel) -> int:
count = 0
for alert in self.__alerts:
if alert.level == level:
count += 1
return count
def as_json(self) -> List[Dict[str, Any]]:
alerts = []
for alert in self.__alerts:
alert_dict = asdict(alert)
# This is required because an Enum is not JSON serializable.
alert_dict["level"] = alert.level.name
alerts.append(alert_dict)
return alerts
def save_timeline(self, timeline_path: str) -> None:
with open(timeline_path, "a+", encoding="utf-8") as handle:
csvoutput = csv.writer(
handle,
delimiter=",",
quotechar='"',
quoting=csv.QUOTE_ALL,
escapechar="\\",
)
csvoutput.writerow(["Event Time", "Module", "Message", "Event"])
timed_alerts = []
for alert in self.alerts:
if not alert.event_time:
continue
timed_alerts.append(asdict(alert))
for event in sorted(
timed_alerts,
key=lambda x: x["event_time"] if x["event_time"] is not None else "",
):
csvoutput.writerow(
[
event.get("event_time"),
event.get("module"),
event.get("message"),
event.get("event"),
]
)
+85 -38
View File
@@ -9,16 +9,20 @@ import os
import sys
from datetime import datetime
from typing import Optional
from rich.console import Console
from rich.panel import Panel
from rich.text import Text
from mvt.common.indicators import Indicators
from mvt.common.module import MVTModule, run_module, save_timeline
from mvt.common.utils import (
from .indicators import Indicators
from .module import MVTModule, run_module, save_timeline
from .utils import (
convert_datetime_to_iso,
generate_hashes_from_path,
get_sha256_from_file_path,
)
from mvt.common.config import settings
from mvt.common.version import MVT_VERSION
from .config import settings
from .alerts import AlertStore, AlertLevel
from .version import MVT_VERSION
class Command:
@@ -70,12 +74,14 @@ class Command:
self.iocs = Indicators(self.log)
self.iocs.load_indicators_files(self.ioc_files)
self.alertstore = AlertStore()
def _create_storage(self) -> None:
if self.results_path and not os.path.exists(self.results_path):
try:
os.makedirs(self.results_path)
except Exception as exc:
self.log.critical(
self.log.fatal(
"Unable to create output folder %s: %s", self.results_path, exc
)
sys.exit(1)
@@ -94,14 +100,14 @@ class Command:
file_handler.setLevel(logging.DEBUG)
file_handler.setFormatter(formatter)
# MVT can be run in a loop
# Old file handlers stick around in subsequent loops
# Remove any existing logging.FileHandler instances
# MVT can be run in a loop.
# Old file handlers stick around in subsequent loops.
# Remove any existing logging.FileHandler instances.
for handler in logger.handlers:
if isinstance(handler, logging.FileHandler):
logger.removeHandler(handler)
# And finally add the new one
# And finally add the new one.
logger.addHandler(file_handler)
def _store_timeline(self) -> None:
@@ -122,12 +128,24 @@ class Command:
is_utc=is_utc,
)
if len(self.timeline_detected) > 0:
save_timeline(
self.timeline_detected,
os.path.join(self.results_path, "timeline_detected.csv"),
is_utc=is_utc,
)
def _store_alerts(self) -> None:
if not self.results_path:
return
alerts = self.alertstore.as_json()
if not alerts:
return
alerts_path = os.path.join(self.results_path, "alerts.json")
with open(alerts_path, "w+", encoding="utf-8") as handle:
json.dump(alerts, handle, indent=4)
def _store_alerts_timeline(self) -> None:
if not self.results_path:
return
alerts_timeline_path = os.path.join(self.results_path, "alerts_timeline.csv")
self.alertstore.save_timeline(alerts_timeline_path)
def _store_info(self) -> None:
if not self.results_path:
@@ -187,26 +205,54 @@ class Command:
def finish(self) -> None:
raise NotImplementedError
def _show_disable_adb_warning(self) -> None:
"""Warn if ADB is enabled"""
if type(self).__name__ in ["CmdAndroidCheckADB", "CmdAndroidCheckAndroidQF"]:
self.log.info(
"Please disable Developer Options and ADB (Android Debug Bridge) on the device once finished with the acquisition. "
"ADB is a powerful tool which can allow unauthorized access to the device."
)
def show_alerts_brief(self) -> None:
console = Console()
message = Text()
for i, level in enumerate(AlertLevel):
message.append(
f"MVT produced {self.alertstore.count(level)} {level.name} alerts."
)
if i < len(AlertLevel) - 1:
message.append("\n")
panel = Panel(
message, title="ALERTS", style="sandy_brown", border_style="sandy_brown"
)
console.print("")
console.print(panel)
def show_disable_adb_warning(self) -> None:
console = Console()
message = Text(
"Please disable Developer Options and ADB (Android Debug Bridge) on the device once finished with the acquisition. "
"ADB is a powerful tool which can allow unauthorized access to the device."
)
panel = Panel(message, title="NOTE", style="yellow", border_style="yellow")
console.print("")
console.print(panel)
def show_support_message(self) -> None:
console = Console()
message = Text()
def _show_support_message(self) -> None:
support_message = "Please seek reputable expert help if you have serious concerns about a possible spyware attack. Such support is available to human rights defenders and civil society through Amnesty International's Security Lab at https://securitylab.amnesty.org/get-help/?c=mvt"
if self.detected_count == 0:
self.log.info(
f"[bold]NOTE:[/bold] Using MVT with public indicators of compromise (IOCs) [bold]WILL NOT[/bold] automatically detect advanced attacks.\n\n{support_message}",
extra={"markup": True},
if (
self.alertstore.count(AlertLevel.HIGH) > 0
or self.alertstore.count(AlertLevel.CRITICAL) > 0
):
message.append(
f"MVT produced HIGH or CRITICAL alerts. Only expert review can confirm if the detected indicators are signs of an attack.\n\n{support_message}",
)
panel = Panel(message, title="WARNING", style="red", border_style="red")
else:
self.log.warning(
f"[bold]NOTE: Detected indicators of compromise[/bold]. Only expert review can confirm if the detected indicators are signs of an attack.\n\n{support_message}",
extra={"markup": True},
message.append(
f"The lack of severe alerts does not equate to a clean bill of health.\n\n{support_message}",
)
panel = Panel(message, title="NOTE", style="yellow", border_style="yellow")
console.print("")
console.print(panel)
def run(self) -> None:
try:
@@ -218,6 +264,11 @@ class Command:
if self.module_name and module.__name__ != self.module_name:
continue
if not module.enabled and not (
self.module_name and module.__name__ == self.module_name
):
continue
# FIXME: do we need the logger here
module_logger = logging.getLogger(module.__module__)
@@ -243,11 +294,8 @@ class Command:
run_module(m)
self.executed.append(m)
self.detected_count += len(m.detected)
self.timeline.extend(m.timeline)
self.timeline_detected.extend(m.timeline_detected)
self.alertstore.extend(m.alertstore.alerts)
try:
self.finish()
@@ -259,7 +307,6 @@ class Command:
return
self._store_timeline()
self._store_alerts_timeline()
self._store_alerts()
self._store_info()
self._show_disable_adb_warning()
self._show_support_message()
+108 -159
View File
@@ -8,7 +8,8 @@ import json
import logging
import os
from functools import lru_cache
from typing import Any, Dict, Iterator, List, Optional, Union
from typing import Any, Dict, Iterator, List, Optional
from dataclasses import dataclass
import ahocorasick
from appdirs import user_data_dir
@@ -22,6 +23,20 @@ MVT_INDICATORS_FOLDER = os.path.join(MVT_DATA_FOLDER, "indicators")
logger = logging.getLogger(__name__)
@dataclass
class Indicator:
value: str
type: str
name: str
stix2_file_name: str
@dataclass
class IndicatorMatch:
ioc: Indicator
message: str
class Indicators:
"""This class is used to parse indicators from a STIX2 file and provide
functions to compare extracted artifacts to the indicators.
@@ -203,7 +218,7 @@ class Indicators:
try:
data = json.load(handle)
except json.decoder.JSONDecodeError:
self.log.critical(
self.log.warning(
"Unable to parse STIX2 indicator file. "
"The file is corrupted or in the wrong format!"
)
@@ -314,7 +329,7 @@ class Indicators:
if os.path.isfile(file_path):
self.parse_stix2(file_path)
else:
self.log.warning("No indicators file exists at path %s", file_path)
self.log.error("No indicators file exists at path %s", file_path)
# Load downloaded indicators and any indicators from env variable.
if load_default:
@@ -323,15 +338,15 @@ class Indicators:
self._check_stix2_env_variable()
self.log.info("Loaded a total of %d unique indicators", self.total_ioc_count)
def get_iocs(self, ioc_type: str) -> Iterator[Dict[str, Any]]:
def get_iocs(self, ioc_type: str) -> Iterator[Indicator]:
for ioc_collection in self.ioc_collections:
for ioc in ioc_collection.get(ioc_type, []):
yield {
"value": ioc,
"type": ioc_type,
"name": ioc_collection["name"],
"stix2_file_name": ioc_collection["stix2_file_name"],
}
yield Indicator(
value=ioc,
type=ioc_type,
name=ioc_collection["name"],
stix2_file_name=ioc_collection["stix2_file_name"],
)
@lru_cache()
def get_ioc_matcher(
@@ -362,12 +377,12 @@ class Indicators:
raise ValueError("Must provide either ioc_type or ioc_list")
for ioc in iocs:
automaton.add_word(ioc["value"], ioc)
automaton.add_word(ioc.value, ioc)
automaton.make_automaton()
return automaton
@lru_cache()
def check_url(self, url: str) -> Union[dict, None]:
def check_url(self, url: str) -> Optional[IndicatorMatch]:
"""Check if a given URL matches any of the provided domain indicators.
:param url: URL to match against domain indicators
@@ -375,21 +390,16 @@ class Indicators:
:returns: Indicator details if matched, otherwise None
"""
if not url:
return None
if not isinstance(url, str):
if not url or not isinstance(url, str):
return None
# Check the URL first
for ioc in self.get_iocs("urls"):
if ioc["value"] == url:
self.log.warning(
'Found a known suspicious URL %s matching indicator "%s" from "%s"',
url,
ioc["value"],
ioc["name"],
if ioc.value == url:
return IndicatorMatch(
ioc=ioc,
message=f'Found a known suspicious URL {url} matching indicator "{ioc.value}" from "{ioc.name}"',
)
return ioc
# Then check the domain
# Create an Aho-Corasick automaton from the list of urls
@@ -426,71 +436,41 @@ class Indicators:
except Exception:
# If URL parsing failed, we just try to do a simple substring
# match.
for idx, ioc in domain_matcher.iter(url):
if ioc["value"].lower() in url:
self.log.warning(
"Maybe found a known suspicious domain %s "
'matching indicator "%s" from "%s"',
url,
ioc["value"],
ioc["name"],
for _, ioc in domain_matcher.iter(url):
if ioc.value.lower() in url:
return IndicatorMatch(
ioc=ioc,
message=f'Maybe found a known suspicious domain {url} matching indicator "{ioc.value}" from "{ioc.name}"',
)
return ioc
# If nothing matched, we can quit here.
return None
# If all parsing worked, we start walking through available domain
# indicators.
for idx, ioc in domain_matcher.iter(final_url.domain.lower()):
for _, ioc in domain_matcher.iter(final_url.domain.lower()):
# First we check the full domain.
if final_url.domain.lower() == ioc["value"]:
if final_url.domain.lower() == ioc.value:
if orig_url.is_shortened and orig_url.url != final_url.url:
self.log.warning(
"Found a known suspicious domain %s "
'shortened as %s matching indicator "%s" from "%s"',
final_url.url,
orig_url.url,
ioc["value"],
ioc["name"],
)
message = f'Found a known suspicious domain {final_url.url} shortened as {orig_url.url} matching indicator "{ioc.value}" from "{ioc.name}"'
else:
self.log.warning(
"Found a known suspicious domain %s "
'matching indicator "%s" from "%s"',
final_url.url,
ioc["value"],
ioc["name"],
)
return ioc
message = f'Found a known suspicious domain {final_url.url} matching indicator "{ioc.value}" from "{ioc.name}"'
return IndicatorMatch(ioc=ioc, message=message)
# Then we just check the top level domain.
for idx, ioc in domain_matcher.iter(final_url.top_level.lower()):
if final_url.top_level.lower() == ioc["value"]:
for _, ioc in domain_matcher.iter(final_url.top_level.lower()):
if final_url.top_level.lower() == ioc.value:
if orig_url.is_shortened and orig_url.url != final_url.url:
self.log.warning(
"Found a sub-domain with suspicious top "
"level %s shortened as %s matching "
'indicator "%s" from "%s"',
final_url.url,
orig_url.url,
ioc["value"],
ioc["name"],
)
message = f'Found a sub-domain with suspicious top level {final_url.url} shortened as {orig_url.url} matching indicator "{ioc.value}" from "{ioc.name}"'
else:
self.log.warning(
"Found a sub-domain with a suspicious top "
'level %s matching indicator "%s" from "%s"',
final_url.url,
ioc["value"],
ioc["name"],
)
message = f'Found a sub-domain with a suspicious top level {final_url.url} matching indicator "{ioc.value}" from "{ioc.name}"'
return ioc
return IndicatorMatch(ioc=ioc, message=message)
return None
def check_urls(self, urls: list) -> Union[dict, None]:
def check_urls(self, urls: list) -> Optional[IndicatorMatch]:
"""Check a list of URLs against the provided list of domain indicators.
:param urls: List of URLs to check against domain indicators
@@ -508,7 +488,7 @@ class Indicators:
return None
def check_process(self, process: str) -> Union[dict, None]:
def check_process(self, process: str) -> Optional[IndicatorMatch]:
"""Check the provided process name against the list of process
indicators.
@@ -522,28 +502,22 @@ class Indicators:
proc_name = os.path.basename(process)
for ioc in self.get_iocs("processes"):
if proc_name == ioc["value"]:
self.log.warning(
'Found a known suspicious process name "%s" '
'matching indicators from "%s"',
process,
ioc["name"],
if proc_name == ioc.value:
return IndicatorMatch(
ioc=ioc,
message=f'Found a known suspicious process name "{process}" matching indicators from "{ioc.name}"',
)
return ioc
if len(proc_name) == 16:
if ioc["value"].startswith(proc_name):
self.log.warning(
"Found a truncated known suspicious "
'process name "%s" matching indicators from "%s"',
process,
ioc["name"],
if ioc.value.startswith(proc_name):
return IndicatorMatch(
ioc=ioc,
message=f'Found a truncated known suspicious process name "{process}" matching indicators from "{ioc.name}"',
)
return ioc
return None
def check_processes(self, processes: list) -> Union[dict, None]:
def check_processes(self, processes: list) -> Optional[IndicatorMatch]:
"""Check the provided list of processes against the list of
process indicators.
@@ -562,7 +536,7 @@ class Indicators:
return None
def check_email(self, email: str) -> Union[dict, None]:
def check_email(self, email: str) -> Optional[IndicatorMatch]:
"""Check the provided email against the list of email indicators.
:param email: Email address to check against email indicators
@@ -574,18 +548,15 @@ class Indicators:
return None
for ioc in self.get_iocs("emails"):
if email.lower() == ioc["value"].lower():
self.log.warning(
'Found a known suspicious email address "%s" '
'matching indicators from "%s"',
email,
ioc["name"],
if email.lower() == ioc.value.lower():
return IndicatorMatch(
ioc=ioc,
message=f'Found a known suspicious email address "{email}" matching indicators from "{ioc.name}"',
)
return ioc
return None
def check_file_name(self, file_name: str) -> Union[dict, None]:
def check_file_name(self, file_name: str) -> Optional[IndicatorMatch]:
"""Check the provided file name against the list of file indicators.
:param file_name: File name to check against file
@@ -598,18 +569,15 @@ class Indicators:
return None
for ioc in self.get_iocs("file_names"):
if ioc["value"] == file_name:
self.log.warning(
'Found a known suspicious file name "%s" '
'matching indicators from "%s"',
file_name,
ioc["name"],
if ioc.value == file_name:
return IndicatorMatch(
ioc=ioc,
message=f'Found a known suspicious file name "{file_name}" matching indicators from "{ioc.name}"',
)
return ioc
return None
def check_file_path(self, file_path: str) -> Union[dict, None]:
def check_file_path(self, file_path: str) -> Optional[IndicatorMatch]:
"""Check the provided file path against the list of file indicators
(both path and name).
@@ -629,18 +597,15 @@ class Indicators:
for ioc in self.get_iocs("file_paths"):
# Strip any trailing slash from indicator paths to match
# directories.
if file_path.startswith(ioc["value"].rstrip("/")):
self.log.warning(
'Found a known suspicious file path "%s" '
'matching indicators form "%s"',
file_path,
ioc["name"],
if file_path.startswith(ioc.value.rstrip("/")):
return IndicatorMatch(
ioc=ioc,
message=f'Found a known suspicious file path "{file_path}" matching indicators form "{ioc.name}"',
)
return ioc
return None
def check_file_path_process(self, file_path: str) -> Optional[Dict[str, Any]]:
def check_file_path_process(self, file_path: str) -> Optional[IndicatorMatch]:
"""Check the provided file path contains a process name from the
list of indicators
@@ -655,18 +620,15 @@ class Indicators:
for ioc in self.get_iocs("processes"):
parts = file_path.split("/")
if ioc["value"] in parts:
self.log.warning(
"Found known suspicious process name mentioned in file at "
'path "%s" matching indicators from "%s"',
file_path,
ioc["name"],
if ioc.value in parts:
return IndicatorMatch(
ioc=ioc,
message=f'Found known suspicious process name mentioned in file at path "{file_path}" matching indicators from "{ioc.name}"',
)
return ioc
return None
def check_profile(self, profile_uuid: str) -> Union[dict, None]:
def check_profile(self, profile_uuid: str) -> Optional[IndicatorMatch]:
"""Check the provided configuration profile UUID against the list of
indicators.
@@ -680,18 +642,15 @@ class Indicators:
return None
for ioc in self.get_iocs("ios_profile_ids"):
if profile_uuid in ioc["value"]:
self.log.warning(
'Found a known suspicious profile ID "%s" '
'matching indicators from "%s"',
profile_uuid,
ioc["name"],
if profile_uuid in ioc.value:
return IndicatorMatch(
ioc=ioc,
message=f'Found a known suspicious profile ID "{profile_uuid}" matching indicators from "{ioc.name}"',
)
return ioc
return None
def check_file_hash(self, file_hash: str) -> Union[dict, None]:
def check_file_hash(self, file_hash: str) -> Optional[IndicatorMatch]:
"""Check the provided file hash against the list of indicators.
:param file_hash: hash to check
@@ -710,18 +669,15 @@ class Indicators:
hash_type = "sha256"
for ioc in self.get_iocs("files_" + hash_type):
if file_hash.lower() == ioc["value"].lower():
self.log.warning(
'Found a known suspicious file with hash "%s" '
'matching indicators from "%s"',
file_hash,
ioc["name"],
if file_hash.lower() == ioc.value.lower():
return IndicatorMatch(
ioc=ioc,
message=f'Found a known suspicious file with hash "{file_hash}" matching indicators from "{ioc.name}"',
)
return ioc
return None
def check_app_certificate_hash(self, cert_hash: str) -> Union[dict, None]:
def check_app_certificate_hash(self, cert_hash: str) -> Optional[IndicatorMatch]:
"""Check the provided cert hash against the list of indicators.
:param cert_hash: hash to check
@@ -733,18 +689,15 @@ class Indicators:
return None
for ioc in self.get_iocs("app_cert_hashes"):
if cert_hash.lower() == ioc["value"].lower():
self.log.warning(
'Found a known suspicious app certfificate with hash "%s" '
'matching indicators from "%s"',
cert_hash,
ioc["name"],
if cert_hash.lower() == ioc.value.lower():
return IndicatorMatch(
ioc=ioc,
message=f'Found a known suspicious app certfificate with hash "{cert_hash}" matching indicators from "{ioc.name}"',
)
return ioc
return None
def check_app_id(self, app_id: str) -> Union[dict, None]:
def check_app_id(self, app_id: str) -> Optional[IndicatorMatch]:
"""Check the provided app identifier (typically an Android package name)
against the list of indicators.
@@ -757,18 +710,17 @@ class Indicators:
return None
for ioc in self.get_iocs("app_ids"):
if app_id.lower() == ioc["value"].lower():
self.log.warning(
'Found a known suspicious app with ID "%s" '
'matching indicators from "%s"',
app_id,
ioc["name"],
if app_id.lower() == ioc.value.lower():
return IndicatorMatch(
ioc=ioc,
message=f'Found a known suspicious app with ID "{app_id}" matching indicators from "{ioc.name}"',
)
return ioc
return None
def check_android_property_name(self, property_name: str) -> Optional[dict]:
def check_android_property_name(
self, property_name: str
) -> Optional[IndicatorMatch]:
"""Check the android property name against the list of indicators.
:param property_name: Name of the Android property
@@ -780,24 +732,21 @@ class Indicators:
return None
for ioc in self.get_iocs("android_property_names"):
if property_name.lower() == ioc["value"].lower():
self.log.warning(
'Found a known suspicious Android property "%s" '
'matching indicators from "%s"',
property_name,
ioc["name"],
if property_name.lower() == ioc.value.lower():
return IndicatorMatch(
ioc=ioc,
message=f'Found a known suspicious Android property "{property_name}" matching indicators from "{ioc.name}"',
)
return ioc
return None
def check_domain(self, url: str) -> Union[dict, None]:
def check_domain(self, url: str) -> Optional[IndicatorMatch]:
"""
Renamed check_url now, kept for compatibility
"""
return self.check_url(url)
def check_domains(self, urls: list) -> Union[dict, None]:
def check_domains(self, urls: list) -> Optional[IndicatorMatch]:
"""
Renamed check_domains, kept for compatibility
"""
+65
View File
@@ -0,0 +1,65 @@
# Mobile Verification Toolkit (MVT)
# Copyright (c) 2021-2025 The MVT Authors.
# Use of this software is governed by the MVT License 1.1 that can be found at
# https://license.mvt.re/1.1/
import logging
from rich.console import Console
from rich.logging import RichHandler
from typing import Optional
INFO = logging.INFO
DEBUG = logging.DEBUG
ERROR = logging.ERROR
FATAL = logging.CRITICAL
WARNING = logging.WARNING
INFO_ALERT = 25
LOW_ALERT = 35
MEDIUM_ALERT = 45
HIGH_ALERT = 55
CRITICAL_ALERT = 65
logging.addLevelName(INFO_ALERT, "INFO")
logging.addLevelName(LOW_ALERT, "LOW")
logging.addLevelName(MEDIUM_ALERT, "MEDIUM")
logging.addLevelName(HIGH_ALERT, "HIGH")
logging.addLevelName(CRITICAL_ALERT, "CRITICAL")
class MVTLogHandler(RichHandler):
def __init__(self, console: Optional[Console] = None, level: int = logging.DEBUG):
super().__init__(console=console, level=level)
def __add_prefix_space(self, level: str) -> str:
max_length = len("CRITICAL ALERT")
space = max_length - len(level)
return f"{level}{' ' * space}"
def emit(self, record: logging.LogRecord):
try:
msg = rf"[grey50]\[{record.name}][/] {self.format(record)}"
if record.levelno == ERROR:
msg = f"[bold red]{self.__add_prefix_space('ERROR')}[/bold red] {msg}"
elif record.levelno == FATAL:
msg = f"[bold red]{self.__add_prefix_space('FATAL')}[/bold red] {msg}"
elif record.levelno == WARNING:
msg = f"[yellow]{self.__add_prefix_space('WARNING')}[/yellow] {msg}"
elif record.levelno == INFO_ALERT:
msg = f"[blue]{self.__add_prefix_space('INFO ALERT')}[/blue] {msg}"
elif record.levelno == LOW_ALERT:
msg = f"[yellow]{self.__add_prefix_space('LOW ALERT')}[/yellow] {msg}"
elif record.levelno == MEDIUM_ALERT:
msg = f"[sandy_brown]{self.__add_prefix_space('MEDIUM ALERT')}[/sandy_brown] {msg}"
elif record.levelno == HIGH_ALERT:
msg = f"[red]{self.__add_prefix_space('HIGH ALERT')}[/red] {msg}"
elif record.levelno == CRITICAL_ALERT:
msg = f"[bold red]{self.__add_prefix_space('CRITICAL ALERT')}[/bold red] {msg}"
else:
msg = f"{self.__add_prefix_space('')} {msg}"
self.console.print(msg)
except Exception:
self.handleError(record)
+46 -24
View File
@@ -8,9 +8,18 @@ import json
import logging
import os
import re
from typing import Any, Dict, List, Optional, Union
from dataclasses import asdict, is_dataclass
from typing import Any, Dict, Optional
from .utils import CustomJSONEncoder, exec_or_profile
from .indicators import Indicators
from .alerts import AlertStore
from .module_types import (
ModuleResults,
ModuleTimeline,
ModuleSerializedResult,
ModuleAtomicResult,
)
class DatabaseNotFoundError(Exception):
@@ -28,7 +37,7 @@ class InsufficientPrivileges(Exception):
class MVTModule:
"""This class provides a base for all extraction modules."""
enabled = True
enabled: bool = True
slug: Optional[str] = None
def __init__(
@@ -38,7 +47,7 @@ class MVTModule:
results_path: Optional[str] = None,
module_options: Optional[Dict[str, Any]] = None,
log: logging.Logger = logging.getLogger(__name__),
results: Union[List[Dict[str, Any]], Dict[str, Any], None] = None,
results: ModuleResults = [],
) -> None:
"""Initialize module.
@@ -46,7 +55,7 @@ class MVTModule:
:type file_path: str
:param target_path: Path to the target folder (backup or filesystem
dump)
:type file_path: str
:type target_path: str
:param results_path: Folder where results will be stored
:type results_path: str
:param fast_mode: Flag to enable or disable slow modules
@@ -55,16 +64,21 @@ class MVTModule:
:param results: Provided list of results entries
:type results: list
"""
self.file_path = file_path
self.target_path = target_path
self.results_path = results_path
self.module_options = module_options if module_options else {}
self.file_path: Optional[str] = file_path
self.target_path: Optional[str] = target_path
self.results_path: Optional[str] = results_path
self.module_options: Optional[Dict[str, Any]] = (
module_options if module_options else {}
)
self.log = log
self.indicators = None
self.results = results if results else []
self.detected: List[Dict[str, Any]] = []
self.timeline: List[Dict[str, str]] = []
self.timeline_detected: List[Dict[str, str]] = []
self.indicators: Optional[Indicators] = None
self.alertstore: AlertStore = AlertStore(log=log)
self.results: ModuleResults = results if results else []
self.detected: ModuleResults = []
self.timeline: ModuleTimeline = []
self.timeline_detected: ModuleTimeline = []
@classmethod
def from_json(cls, json_path: str, log: logging.Logger):
@@ -72,11 +86,11 @@ class MVTModule:
results = json.load(handle)
if log:
log.info('Loaded %d results from "%s"', len(results), json_path)
return cls(results=results, log=log)
@classmethod
def get_slug(cls) -> str:
"""Use the module's class name to retrieve a slug"""
if cls.slug:
return cls.slug
@@ -84,26 +98,26 @@ class MVTModule:
return re.sub("([a-z0-9])([A-Z])", r"\1_\2", sub).lower()
def check_indicators(self) -> None:
"""Check the results of this module against a provided list of
indicators.
"""
raise NotImplementedError
def save_to_json(self) -> None:
"""Save the collected results to a json file."""
if not self.results_path:
return
name = self.get_slug()
if self.results:
converted_results = [
asdict(result) if is_dataclass(result) else result
for result in self.results
]
results_file_name = f"{name}.json"
results_json_path = os.path.join(self.results_path, results_file_name)
with open(results_json_path, "w", encoding="utf-8") as handle:
try:
json.dump(self.results, handle, indent=4, cls=CustomJSONEncoder)
json.dump(
converted_results, handle, indent=4, cls=CustomJSONEncoder
)
except Exception as exc:
self.log.error(
"Unable to store results of module %s to file %s: %s",
@@ -118,7 +132,7 @@ class MVTModule:
with open(detected_json_path, "w", encoding="utf-8") as handle:
json.dump(self.detected, handle, indent=4, cls=CustomJSONEncoder)
def serialize(self, record: dict) -> Union[dict, list, None]:
def serialize(self, result: ModuleAtomicResult) -> ModuleSerializedResult:
raise NotImplementedError
@staticmethod
@@ -130,13 +144,21 @@ class MVTModule:
"""
timeline_set = set()
for record in timeline:
timeline_set.add(json.dumps(record, sort_keys=True))
timeline_set.add(
json.dumps(
asdict(record) if is_dataclass(record) else record, sort_keys=True
)
)
return [json.loads(record) for record in timeline_set]
def to_timeline(self) -> None:
"""Convert results into a timeline."""
if not self.results:
return
for result in self.results:
record = self.serialize(result)
record: ModuleSerializedResult = self.serialize(result)
if record:
if isinstance(record, list):
self.timeline.extend(record)
+29
View File
@@ -0,0 +1,29 @@
# Mobile Verification Toolkit (MVT)
# Copyright (c) 2021-2025 The MVT Authors.
# Use of this software is governed by the MVT License 1.1 that can be found at
# https://license.mvt.re/1.1/
from .indicators import Indicator
from dataclasses import dataclass
from typing import List, Union, Optional
@dataclass
class ModuleAtomicResult:
timestamp: Optional[str]
matched_indicator: Optional[Indicator]
ModuleResults = List[ModuleAtomicResult]
@dataclass
class ModuleAtomicTimeline:
timestamp: str
module: str
event: str
data: str
ModuleTimeline = List[ModuleAtomicTimeline]
ModuleSerializedResult = Union[ModuleAtomicTimeline, ModuleTimeline]
+4 -5
View File
@@ -12,7 +12,7 @@ import os
import re
from typing import Any, Iterator, Union
from rich.logging import RichHandler
from .log import MVTLogHandler
from mvt.common.config import settings
@@ -234,11 +234,10 @@ def init_logging(verbose: bool = False):
"""
Initialise logging for the MVT module
"""
# Setup logging using Rich.
log = logging.getLogger("mvt")
log.setLevel(logging.DEBUG)
consoleHandler = RichHandler(show_path=False, log_time_format="%X")
consoleHandler.setFormatter(logging.Formatter("[%(name)s] %(message)s"))
log.setLevel(logging.INFO)
consoleHandler = MVTLogHandler()
consoleHandler.setFormatter(logging.Formatter("%(message)s"))
if verbose:
consoleHandler.setLevel(logging.DEBUG)
else:
+6 -11
View File
@@ -228,11 +228,8 @@ def check_backup(
log.info("Checking iTunes backup located at: %s", backup_path)
cmd.run()
if cmd.detected_count > 0:
log.warning(
"The analysis of the backup produced %d detections!", cmd.detected_count
)
cmd.show_alerts_brief()
cmd.show_support_message()
# ==============================================================================
@@ -275,12 +272,8 @@ def check_fs(ctx, iocs, output, fast, list_modules, module, hashes, verbose, dum
log.info("Checking iOS filesystem located at: %s", dump_path)
cmd.run()
if cmd.detected_count > 0:
log.warning(
"The analysis of the iOS filesystem produced %d detections!",
cmd.detected_count,
)
cmd.show_alerts_brief()
cmd.show_support_message()
# ==============================================================================
@@ -308,6 +301,8 @@ def check_iocs(ctx, iocs, list_modules, module, folder):
return
cmd.run()
cmd.show_alerts_brief()
cmd.show_support_message()
# ==============================================================================
+4 -3
View File
@@ -91,7 +91,8 @@ class TestAndroidqfPackages:
assert len(possible_detected_app) == 1
assert possible_detected_app[0]["name"] == "com.malware.blah"
assert (
possible_detected_app[0]["matched_indicator"]["value"] == "com.malware.blah"
possible_detected_app[0]["matched_indicator"].ioc.value
== "com.malware.blah"
)
def test_packages_ioc_sha256(self, module, indicators_factory):
@@ -109,7 +110,7 @@ class TestAndroidqfPackages:
assert len(possible_detected_app) == 1
assert possible_detected_app[0]["name"] == "com.malware.muahaha"
assert (
possible_detected_app[0]["matched_indicator"]["value"]
possible_detected_app[0]["matched_indicator"].ioc.value
== "31037a27af59d4914906c01ad14a318eee2f3e31d48da8954dca62a99174e3fa"
)
@@ -128,6 +129,6 @@ class TestAndroidqfPackages:
assert len(possible_detected_app) == 1
assert possible_detected_app[0]["name"] == "com.malware.muahaha"
assert (
possible_detected_app[0]["matched_indicator"]["value"]
possible_detected_app[0]["matched_indicator"].ioc.value
== "c7e56178748be1441370416d4c10e34817ea0c961eb636c8e9d98e0fd79bf730"
)