From b1f0a2de06c987babeddf2cb6d6fbb84a677fc9b Mon Sep 17 00:00:00 2001 From: Janik Besendorf Date: Fri, 7 Nov 2025 18:22:08 +0100 Subject: [PATCH] update alerts.py --- src/mvt/common/alerts.py | 64 +++++++++++++++++++++++++++------------- 1 file changed, 44 insertions(+), 20 deletions(-) diff --git a/src/mvt/common/alerts.py b/src/mvt/common/alerts.py index e3831ab..0c596cf 100644 --- a/src/mvt/common/alerts.py +++ b/src/mvt/common/alerts.py @@ -4,6 +4,7 @@ # https://license.mvt.re/1.1/ import csv +import inspect import logging from dataclasses import asdict, dataclass from enum import Enum @@ -35,6 +36,39 @@ class AlertStore: self.__alerts: List[Alert] = [] self.__log = log + def _get_calling_module(self) -> str: + """ + Automatically detect the calling MVT module and return its slug. + + Walks up the call stack to find the first frame that belongs to an MVT module + (artifact or extraction module) and extracts its slug. + + :return: Module slug string + """ + frame = inspect.currentframe() + try: + # Walk up the call stack + while frame is not None: + frame = frame.f_back + if frame is None: + break + + # Get the 'self' object from the frame's local variables + frame_locals = frame.f_locals + if "self" in frame_locals: + obj = frame_locals["self"] + # Check if it has a get_slug method (MVT modules have this) + if hasattr(obj, "get_slug") and callable(obj.get_slug): + try: + return obj.get_slug() + except: + pass + + # Fallback: return "unknown" if we can't find the module + return "unknown" + finally: + del frame + @property def alerts(self) -> List[Alert]: return self.__alerts @@ -47,65 +81,55 @@ class AlertStore: for alert in alerts: self.add(alert) - def info( - self, module: str, message: str, event_time: str, event: ModuleAtomicResult - ): + def info(self, message: str, event_time: str, event: ModuleAtomicResult): self.add( Alert( level=AlertLevel.INFORMATIONAL, - module=module, + module=self._get_calling_module(), message=message, event_time=event_time, event=event, ) ) - def low( - self, module: str, message: str, event_time: str, event: ModuleAtomicResult - ): + def low(self, message: str, event_time: str, event: ModuleAtomicResult): self.add( Alert( level=AlertLevel.LOW, - module=module, + module=self._get_calling_module(), message=message, event_time=event_time, event=event, ) ) - def medium( - self, module: str, message: str, event_time: str, event: ModuleAtomicResult - ): + def medium(self, message: str, event_time: str, event: ModuleAtomicResult): self.add( Alert( level=AlertLevel.MEDIUM, - module=module, + module=self._get_calling_module(), message=message, event_time=event_time, event=event, ) ) - def high( - self, module: str, message: str, event_time: str, event: ModuleAtomicResult - ): + def high(self, message: str, event_time: str, event: ModuleAtomicResult): self.add( Alert( level=AlertLevel.HIGH, - module=module, + module=self._get_calling_module(), message=message, event_time=event_time, event=event, ) ) - def critical( - self, module: str, message: str, event_time: str, event: ModuleAtomicResult - ): + def critical(self, message: str, event_time: str, event: ModuleAtomicResult): self.add( Alert( level=AlertLevel.CRITICAL, - module=module, + module=self._get_calling_module(), message=message, event_time=event_time, event=event,