mirror of
https://github.com/mvt-project/mvt.git
synced 2026-07-02 19:25:48 +02:00
intrusion_logs: alert on certificate events and run heuristics without IOCs (#811)
* intrusion_logs: alert on certificate events and run heuristics without IOCs SecurityEvent.check_indicators() returned early when no indicator set was loaded, so none of its heuristic alerts (key integrity, wipe failure, crypto self-test, certificate events) reached the alert store on a default run. On top of that, cert_authority_installed and cert_validation_failure only emitted log.warning and never alerted even when indicators were present. Run the heuristic alerts independently of the loaded indicators (matching the accessibility fix in #807) and surface the two certificate events through the alert store at medium severity. A successfully installed root CA and a certificate validation failure are interception/MITM-relevant signals that belong in the alert report. Adds regression tests for both certificate events and for heuristics firing with no indicators loaded. * intrusion_logs: gate certificate authority install alert on success Failed install attempts log a warning instead of raising the "Certificate authority installed" alert. Add a regression test covering success encoded as bool and as int. --------- Co-authored-by: John Kavanagh <668351+kavanista@users.noreply.github.com> Co-authored-by: besendorf <janik@besendorf.org>
This commit is contained in:
@@ -303,10 +303,15 @@ class SecurityEvent(IntrusionLogsModule):
|
||||
|
||||
def check_indicators(self) -> None:
|
||||
"""Check security events against indicators of compromise."""
|
||||
if not self.indicators:
|
||||
return
|
||||
|
||||
for result in self.results:
|
||||
# Heuristic alerts are intrinsic to the event, so they run even
|
||||
# when no indicator set is loaded.
|
||||
self._check_security_heuristics(result)
|
||||
|
||||
# The remaining checks match events against loaded indicators.
|
||||
if not self.indicators:
|
||||
continue
|
||||
|
||||
# Check app process start events for suspicious package names
|
||||
if "app_process_start" in result:
|
||||
process_info = result["app_process_start"]
|
||||
@@ -390,60 +395,73 @@ class SecurityEvent(IntrusionLogsModule):
|
||||
matched_indicator=ioc.ioc,
|
||||
)
|
||||
|
||||
# Flag failed cryptographic operations as potentially suspicious
|
||||
if "key_generated" in result:
|
||||
if not result["key_generated"].get("success", True):
|
||||
self.log.warning(
|
||||
"Failed key generation detected for key_id: %s",
|
||||
result["key_generated"].get("key_id", "unknown"),
|
||||
)
|
||||
|
||||
# Flag certificate validation failures
|
||||
if "cert_validation_failure" in result:
|
||||
def _check_security_heuristics(self, result: dict) -> None:
|
||||
"""Raise alerts for events that are intrinsically suspicious,
|
||||
independent of any loaded indicators."""
|
||||
# Flag failed cryptographic operations as potentially suspicious
|
||||
if "key_generated" in result:
|
||||
if not result["key_generated"].get("success", True):
|
||||
self.log.warning(
|
||||
"Certificate validation failure detected: %s",
|
||||
result.get("cert_validation_failure"),
|
||||
"Failed key generation detected for key_id: %s",
|
||||
result["key_generated"].get("key_id", "unknown"),
|
||||
)
|
||||
|
||||
# Flag key integrity violations
|
||||
if "key_integrity_violation" in result:
|
||||
# Flag certificate validation failures
|
||||
if "cert_validation_failure" in result:
|
||||
self.alertstore.medium(
|
||||
"Certificate validation failure detected: "
|
||||
f"{result.get('cert_validation_failure')}",
|
||||
result.get("timestamp") or "",
|
||||
result,
|
||||
)
|
||||
|
||||
# Flag key integrity violations
|
||||
if "key_integrity_violation" in result:
|
||||
self.alertstore.medium(
|
||||
f"Key integrity violation detected: {result.get('key_integrity_violation')}",
|
||||
result.get("timestamp") or "",
|
||||
result,
|
||||
)
|
||||
|
||||
# Flag successful certificate authority installations (potential
|
||||
# MITM); a missing success field is treated as installed
|
||||
if "cert_authority_installed" in result:
|
||||
cert_info = result["cert_authority_installed"]
|
||||
if cert_info.get("success", True):
|
||||
self.alertstore.medium(
|
||||
f"Key integrity violation detected: {result.get('key_integrity_violation')}",
|
||||
"Certificate authority installed: "
|
||||
f"{cert_info.get('subject', 'unknown')}",
|
||||
result.get("timestamp") or "",
|
||||
result,
|
||||
)
|
||||
|
||||
# Flag certificate authority installations (potential MITM)
|
||||
if "cert_authority_installed" in result:
|
||||
cert_info = result["cert_authority_installed"]
|
||||
else:
|
||||
self.log.warning(
|
||||
"Certificate authority installed: %s (success: %s)",
|
||||
"Failed certificate authority install attempt: %s",
|
||||
cert_info.get("subject", "unknown"),
|
||||
cert_info.get("success", "unknown"),
|
||||
)
|
||||
|
||||
# Flag wipe failures
|
||||
if "wipe_failure" in result:
|
||||
# Flag wipe failures
|
||||
if "wipe_failure" in result:
|
||||
self.alertstore.medium(
|
||||
"Device wipe failure detected",
|
||||
result.get("timestamp") or "",
|
||||
result,
|
||||
)
|
||||
|
||||
# Flag crypto self test failures
|
||||
if "crypto_self_test_completed" in result:
|
||||
test_result = result["crypto_self_test_completed"]
|
||||
if isinstance(test_result, dict):
|
||||
success = test_result.get("success", True)
|
||||
else:
|
||||
success = test_result == 1
|
||||
if not success:
|
||||
self.alertstore.medium(
|
||||
"Device wipe failure detected",
|
||||
"Cryptographic self test failed",
|
||||
result.get("timestamp") or "",
|
||||
result,
|
||||
)
|
||||
|
||||
# Flag crypto self test failures
|
||||
if "crypto_self_test_completed" in result:
|
||||
test_result = result["crypto_self_test_completed"]
|
||||
if isinstance(test_result, dict):
|
||||
success = test_result.get("success", True)
|
||||
else:
|
||||
success = test_result == 1
|
||||
if not success:
|
||||
self.alertstore.medium(
|
||||
"Cryptographic self test failed",
|
||||
result.get("timestamp") or "",
|
||||
result,
|
||||
)
|
||||
|
||||
def serialize(self, record: dict) -> Union[dict, list]:
|
||||
"""Serialize a security event record for timeline output."""
|
||||
# Determine the event sub-type
|
||||
|
||||
@@ -6,12 +6,14 @@
|
||||
import json
|
||||
import logging
|
||||
|
||||
import pytest
|
||||
from click.testing import CliRunner
|
||||
|
||||
from mvt.android.cli import check_intrusion_logs
|
||||
from mvt.android.cmd_check_intrusion_logs import CmdAndroidCheckIntrusionLogs
|
||||
from mvt.android.modules.intrusion_logs.base import IntrusionLogsModule
|
||||
from mvt.android.modules.intrusion_logs.security_event import SecurityEvent
|
||||
from mvt.common.alerts import AlertLevel
|
||||
|
||||
|
||||
def _write_ndjson(path, records):
|
||||
@@ -204,3 +206,82 @@ def test_check_intrusion_logs_cli_lists_modules(tmp_path):
|
||||
assert "DnsEvent" in result.output
|
||||
assert "ConnectEvent" in result.output
|
||||
assert "SecurityEvent" in result.output
|
||||
|
||||
|
||||
def _run_security_heuristics(results):
|
||||
# No indicators loaded: heuristic alerts must still fire.
|
||||
module = SecurityEvent(results=results)
|
||||
module.check_indicators()
|
||||
return module.alertstore.alerts
|
||||
|
||||
|
||||
def test_cert_authority_installed_raises_medium_alert_without_indicators():
|
||||
alerts = _run_security_heuristics(
|
||||
[
|
||||
{
|
||||
"timestamp": "2024-01-01 00:00:00.000",
|
||||
"cert_authority_installed": {
|
||||
"subject": "CN=Unexpected Root CA",
|
||||
"success": True,
|
||||
},
|
||||
}
|
||||
]
|
||||
)
|
||||
|
||||
assert len(alerts) == 1
|
||||
assert alerts[0].level == AlertLevel.MEDIUM
|
||||
assert "Certificate authority installed" in alerts[0].message
|
||||
assert "Unexpected Root CA" in alerts[0].message
|
||||
|
||||
|
||||
# Exported logs encode success as a JSON bool, raw SecurityLog as int 0/1.
|
||||
@pytest.mark.parametrize("success", [False, 0])
|
||||
def test_failed_cert_authority_install_does_not_alert(success, caplog):
|
||||
with caplog.at_level(logging.WARNING):
|
||||
alerts = _run_security_heuristics(
|
||||
[
|
||||
{
|
||||
"timestamp": "2024-01-01 00:00:00.000",
|
||||
"cert_authority_installed": {
|
||||
"subject": "CN=Unexpected Root CA",
|
||||
"success": success,
|
||||
},
|
||||
}
|
||||
]
|
||||
)
|
||||
|
||||
assert alerts == []
|
||||
assert "Failed certificate authority install attempt" in caplog.text
|
||||
assert "Unexpected Root CA" in caplog.text
|
||||
|
||||
|
||||
def test_cert_validation_failure_raises_medium_alert_without_indicators():
|
||||
alerts = _run_security_heuristics(
|
||||
[
|
||||
{
|
||||
"timestamp": "2024-01-01 00:00:00.000",
|
||||
"cert_validation_failure": "chain validation failed",
|
||||
}
|
||||
]
|
||||
)
|
||||
|
||||
assert len(alerts) == 1
|
||||
assert alerts[0].level == AlertLevel.MEDIUM
|
||||
assert "Certificate validation failure" in alerts[0].message
|
||||
|
||||
|
||||
def test_security_heuristics_fire_when_no_indicators_loaded():
|
||||
# check_indicators() previously returned early with no indicators loaded,
|
||||
# so none of the heuristic alerts fired on a default run.
|
||||
alerts = _run_security_heuristics(
|
||||
[
|
||||
{"timestamp": "2024-01-01 00:00:00.000", "wipe_failure": {"reason": "x"}},
|
||||
{
|
||||
"timestamp": "2024-01-01 00:00:00.000",
|
||||
"key_integrity_violation": {"key_id": "k1"},
|
||||
},
|
||||
]
|
||||
)
|
||||
|
||||
assert len(alerts) == 2
|
||||
assert all(alert.level == AlertLevel.MEDIUM for alert in alerts)
|
||||
|
||||
Reference in New Issue
Block a user