Warn on unknown intrusion log event types

This commit is contained in:
Janik Besendorf
2026-05-12 11:47:19 +02:00
parent f405d38e68
commit 9b6cf82b34
4 changed files with 69 additions and 4 deletions
+16 -1
View File
@@ -10,7 +10,10 @@ from typing import Optional
from mvt.common.command import Command
from mvt.common.indicators import Indicators
from .modules.intrusion_logs import INTRUSION_LOGS_MODULES
from .modules.intrusion_logs import (
INTRUSION_LOGS_MODULES,
KNOWN_INTRUSION_LOG_EVENT_TYPES,
)
from .modules.intrusion_logs.base import IntrusionLogsModule
log = logging.getLogger(__name__)
@@ -95,4 +98,16 @@ class CmdAndroidCheckIntrusionLogs(Command):
len(all_events),
)
unknown_event_types = sorted(
event_type
for event_type in all_events
if event_type not in KNOWN_INTRUSION_LOG_EVENT_TYPES
)
if unknown_event_types:
self.log.warning(
"Found unknown intrusion logging event type(s): %s. "
"Please open an issue on GitHub so MVT can add support for them.",
", ".join(unknown_event_types),
)
return all_events
@@ -12,3 +12,9 @@ INTRUSION_LOGS_MODULES = [
ConnectEvent,
SecurityEvent,
]
KNOWN_INTRUSION_LOG_EVENT_TYPES = {
"connect_event",
"dns_event",
"security_event",
}
@@ -744,3 +744,15 @@ class SecurityEvent(IntrusionLogsModule):
"name", event_type
)
self.log.info(" - %s: %d", event_name, count)
unknown_event_types = sorted(
event_type
for event_type in self.event_type_counts
if event_type not in SECURITY_EVENT_TAGS
)
if unknown_event_types:
self.log.warning(
"Found unknown intrusion logging security event type(s): %s. "
"Please open an issue on GitHub so MVT can add support for them.",
", ".join(unknown_event_types),
)
+35 -3
View File
@@ -4,6 +4,7 @@
# https://license.mvt.re/1.1/
import json
import logging
from click.testing import CliRunner
@@ -46,7 +47,32 @@ def test_load_all_events_preserves_unknown_top_level_event(tmp_path):
}
def test_check_intrusion_logs_parses_core_and_unknown_security_events(tmp_path):
def test_check_intrusion_logs_warns_about_unknown_top_level_event_type(
tmp_path, caplog
):
_write_ndjson(
tmp_path / "intrusion.txt",
[
{
"future_event": {
"event_time": 1_700_000_000_000,
"field": "value",
}
}
],
)
with caplog.at_level(logging.WARNING):
cmd = CmdAndroidCheckIntrusionLogs(target_path=str(tmp_path))
cmd.run()
assert "Found unknown intrusion logging event type(s): future_event" in caplog.text
assert "Please open an issue on GitHub" in caplog.text
def test_check_intrusion_logs_parses_core_and_unknown_security_events(
tmp_path, caplog
):
_write_ndjson(
tmp_path / "intrusion.txt",
[
@@ -87,8 +113,9 @@ def test_check_intrusion_logs_parses_core_and_unknown_security_events(tmp_path):
],
)
cmd = CmdAndroidCheckIntrusionLogs(target_path=str(tmp_path))
cmd.run()
with caplog.at_level(logging.WARNING):
cmd = CmdAndroidCheckIntrusionLogs(target_path=str(tmp_path))
cmd.run()
assert [module.__class__.__name__ for module in cmd.executed] == [
"DnsEvent",
@@ -109,6 +136,11 @@ def test_check_intrusion_logs_parses_core_and_unknown_security_events(tmp_path):
assert len(future_timeline_events) == 1
assert "future_google_event" in future_timeline_events[0]["data"]
assert "field" in future_timeline_events[0]["data"]
assert (
"Found unknown intrusion logging security event type(s): future_google_event"
in caplog.text
)
assert "Please open an issue on GitHub" in caplog.text
def test_check_intrusion_logs_cli_lists_modules(tmp_path):