update alerts.py

This commit is contained in:
Janik Besendorf
2025-11-07 18:22:08 +01:00
parent d259ab4810
commit b1f0a2de06

View File

@@ -4,6 +4,7 @@
# https://license.mvt.re/1.1/
import csv
import inspect
import logging
from dataclasses import asdict, dataclass
from enum import Enum
@@ -35,6 +36,39 @@ class AlertStore:
self.__alerts: List[Alert] = []
self.__log = log
def _get_calling_module(self) -> str:
"""
Automatically detect the calling MVT module and return its slug.
Walks up the call stack to find the first frame that belongs to an MVT module
(artifact or extraction module) and extracts its slug.
:return: Module slug string
"""
frame = inspect.currentframe()
try:
# Walk up the call stack
while frame is not None:
frame = frame.f_back
if frame is None:
break
# Get the 'self' object from the frame's local variables
frame_locals = frame.f_locals
if "self" in frame_locals:
obj = frame_locals["self"]
# Check if it has a get_slug method (MVT modules have this)
if hasattr(obj, "get_slug") and callable(obj.get_slug):
try:
return obj.get_slug()
except:
pass
# Fallback: return "unknown" if we can't find the module
return "unknown"
finally:
del frame
@property
def alerts(self) -> List[Alert]:
return self.__alerts
@@ -47,65 +81,55 @@ class AlertStore:
for alert in alerts:
self.add(alert)
def info(
self, module: str, message: str, event_time: str, event: ModuleAtomicResult
):
def info(self, message: str, event_time: str, event: ModuleAtomicResult):
self.add(
Alert(
level=AlertLevel.INFORMATIONAL,
module=module,
module=self._get_calling_module(),
message=message,
event_time=event_time,
event=event,
)
)
def low(
self, module: str, message: str, event_time: str, event: ModuleAtomicResult
):
def low(self, message: str, event_time: str, event: ModuleAtomicResult):
self.add(
Alert(
level=AlertLevel.LOW,
module=module,
module=self._get_calling_module(),
message=message,
event_time=event_time,
event=event,
)
)
def medium(
self, module: str, message: str, event_time: str, event: ModuleAtomicResult
):
def medium(self, message: str, event_time: str, event: ModuleAtomicResult):
self.add(
Alert(
level=AlertLevel.MEDIUM,
module=module,
module=self._get_calling_module(),
message=message,
event_time=event_time,
event=event,
)
)
def high(
self, module: str, message: str, event_time: str, event: ModuleAtomicResult
):
def high(self, message: str, event_time: str, event: ModuleAtomicResult):
self.add(
Alert(
level=AlertLevel.HIGH,
module=module,
module=self._get_calling_module(),
message=message,
event_time=event_time,
event=event,
)
)
def critical(
self, module: str, message: str, event_time: str, event: ModuleAtomicResult
):
def critical(self, message: str, event_time: str, event: ModuleAtomicResult):
self.add(
Alert(
level=AlertLevel.CRITICAL,
module=module,
module=self._get_calling_module(),
message=message,
event_time=event_time,
event=event,