Remove slug from alertstore calls

This commit is contained in:
Janik Besendorf
2025-11-07 18:20:36 +01:00
parent d4b970c7c0
commit d259ab4810
21 changed files with 44 additions and 82 deletions

View File

@@ -17,7 +17,7 @@ class DumpsysAccessibilityArtifact(AndroidArtifact):
ioc_match = self.indicators.check_app_id(result["package_name"])
if ioc_match:
result["matched_indicator"] = ioc_match.ioc
self.alertstore.critical(self.get_slug(), ioc_match.message, "", result)
self.alertstore.critical(ioc_match.message, "", result)
continue
def parse(self, content: str) -> None:

View File

@@ -5,12 +5,11 @@
from datetime import datetime
from mvt.common.utils import convert_datetime_to_iso
from mvt.common.module_types import ModuleAtomicResult, ModuleSerializedResult
from mvt.common.utils import convert_datetime_to_iso
from .artifact import AndroidArtifact
RISKY_PERMISSIONS = ["REQUEST_INSTALL_PACKAGES"]
RISKY_PACKAGES = ["com.android.shell"]
@@ -46,9 +45,7 @@ class DumpsysAppopsArtifact(AndroidArtifact):
ioc_match = self.indicators.check_app_id(result.get("package_name"))
if ioc_match:
result["matched_indicator"] = ioc_match.ioc
self.alertstore.critical(
self.get_slug(), ioc_match.message, "", result
)
self.alertstore.critical(ioc_match.message, "", result)
continue
# We use a placeholder entry to create a basic alert even without permission entries.
@@ -66,7 +63,6 @@ class DumpsysAppopsArtifact(AndroidArtifact):
cleaned_result = result.copy()
cleaned_result["permissions"] = [perm]
self.alertstore.medium(
self.get_slug(),
f"Package '{result['package_name']}' had risky permission '{perm['name']}' set to '{entry['access']}' at {entry['timestamp']}",
entry["timestamp"],
cleaned_result,
@@ -80,7 +76,6 @@ class DumpsysAppopsArtifact(AndroidArtifact):
cleaned_result = result.copy()
cleaned_result["permissions"] = [perm]
self.alertstore.medium(
self.get_slug(),
f"Risky package '{result['package_name']}' had '{perm['name']}' permission set to '{entry['access']}' at {entry['timestamp']}",
entry["timestamp"],
cleaned_result,

View File

@@ -4,8 +4,9 @@
# https://license.mvt.re/1.1/
from mvt.common.module_types import ModuleAtomicResult, ModuleSerializedResult
from .artifact import AndroidArtifact
from mvt.common.module_types import ModuleSerializedResult, ModuleAtomicResult
class DumpsysBatteryDailyArtifact(AndroidArtifact):
@@ -30,7 +31,7 @@ class DumpsysBatteryDailyArtifact(AndroidArtifact):
ioc_match = self.indicators.check_app_id(result["package_name"])
if ioc_match:
result["matched_indicator"] = ioc_match.ioc
self.alertstore.critical(self.get_slug(), ioc_match.message, "", result)
self.alertstore.critical(ioc_match.message, "", result)
continue
def parse(self, output: str) -> None:

View File

@@ -19,7 +19,7 @@ class DumpsysBatteryHistoryArtifact(AndroidArtifact):
ioc_match = self.indicators.check_app_id(result["package_name"])
if ioc_match:
result["matched_indicator"] = ioc_match.ioc
self.alertstore.critical(self.get_slug(), ioc_match.message, "", result)
self.alertstore.critical(ioc_match.message, "", result)
continue
def parse(self, data: str) -> None:

View File

@@ -23,9 +23,7 @@ class DumpsysDBInfoArtifact(AndroidArtifact):
ioc_match = self.indicators.check_app_id(part)
if ioc_match:
result["matched_indicator"] = ioc_match.ioc
self.alertstore.critical(
self.get_slug(), ioc_match.message, "", result
)
self.alertstore.critical(ioc_match.message, "", result)
continue
def parse(self, output: str) -> None:

View File

@@ -15,9 +15,7 @@ class DumpsysPackageActivitiesArtifact(AndroidArtifact):
ioc_match = self.indicators.check_app_id(activity["package_name"])
if ioc_match:
activity["matched_indicator"] = ioc_match.ioc
self.alertstore.critical(
self.get_slug(), ioc_match.message, "", activity
)
self.alertstore.critical(ioc_match.message, "", activity)
continue
def parse(self, content: str):

View File

@@ -7,9 +7,9 @@ import re
from typing import Any, Dict, List
from mvt.android.utils import ROOT_PACKAGES
from mvt.common.module_types import ModuleAtomicResult, ModuleSerializedResult
from .artifact import AndroidArtifact
from mvt.common.module_types import ModuleAtomicResult, ModuleSerializedResult
class DumpsysPackagesArtifact(AndroidArtifact):
@@ -18,7 +18,6 @@ class DumpsysPackagesArtifact(AndroidArtifact):
# XXX: De-duplication Package detections
if result["package_name"] in ROOT_PACKAGES:
self.alertstore.medium(
self.get_slug(),
f'Found an installed package related to rooting/jailbreaking: "{result["package_name"]}"',
"",
result,
@@ -32,7 +31,7 @@ class DumpsysPackagesArtifact(AndroidArtifact):
ioc_match = self.indicators.check_app_id(result.get("package_name", ""))
if ioc_match:
result["matched_indicator"] = ioc_match.ioc
self.alertstore.critical(self.get_slug(), ioc_match.message, "", result)
self.alertstore.critical(ioc_match.message, "", result)
self.alertstore.log_latest()
def serialize(self, record: ModuleAtomicResult) -> ModuleSerializedResult:

View File

@@ -19,7 +19,7 @@ class DumpsysPlatformCompatArtifact(AndroidArtifact):
ioc_match = self.indicators.check_app_id(result["package_name"])
if ioc_match:
result["matched_indicator"] = ioc_match.ioc
self.alertstore.critical(self.get_slug(), ioc_match.message, "", result)
self.alertstore.critical(ioc_match.message, "", result)
continue
def parse(self, data: str) -> None:

View File

@@ -53,9 +53,7 @@ class DumpsysReceiversArtifact(AndroidArtifact):
ioc_match = self.indicators.check_app_id(receiver["package_name"])
if ioc_match:
receiver["matched_indicator"] = ioc_match.ioc
self.alertstore.critical(
self.get_slug(), ioc_match.message, "", {intent: receiver}
)
self.alertstore.critical(ioc_match.message, "", {intent: receiver})
continue
def parse(self, output: str) -> None:

View File

@@ -60,7 +60,7 @@ class GetProp(AndroidArtifact):
if entry["name"] == "ro.build.version.security_patch":
warning_message = warn_android_patch_level(entry["value"], self.log)
self.alertstore.medium(self.get_slug(), warning_message, "", entry)
self.alertstore.medium(warning_message, "", entry)
if not self.indicators:
return
@@ -71,4 +71,4 @@ class GetProp(AndroidArtifact):
)
if ioc_match:
result["matched_indicator"] = ioc_match.ioc
self.alertstore.critical(self.get_slug(), ioc_match.message, "", result)
self.alertstore.critical(ioc_match.message, "", result)

View File

@@ -134,14 +134,12 @@ class Mounts(AndroidArtifact):
system_rw_mounts.append(mount)
if mount_point == "/system":
self.alertstore.high(
self.get_slug(),
"Root detected /system partition is mounted as read-write (rw)",
"",
mount,
)
else:
self.alertstore.high(
self.get_slug(),
f"System partition {mount_point} is mounted as read-write (rw). This may indicate system modifications.",
"",
mount,
@@ -157,7 +155,6 @@ class Mounts(AndroidArtifact):
continue
suspicious_mounts.append(mount)
self.alertstore.high(
self.get_slug(),
f"Suspicious mount options found for {mount_point}: {', '.join(suspicious_opts)}",
"",
mount,
@@ -184,7 +181,6 @@ class Mounts(AndroidArtifact):
if ioc:
mount["matched_indicator"] = ioc
self.alertstore.critical(
self.get_slug(),
f"Mount point matches indicator: {mount.get('mount_point', '')}",
"",
mount,
@@ -195,7 +191,6 @@ class Mounts(AndroidArtifact):
if ioc:
mount["matched_indicator"] = ioc
self.alertstore.critical(
self.get_slug(),
f"Device path matches indicator: {mount.get('device', '')}",
"",
mount,

View File

@@ -61,10 +61,10 @@ class Processes(AndroidArtifact):
ioc_match = self.indicators.check_app_id(proc_name)
if ioc_match:
result["matched_indicator"] = ioc_match.ioc
self.alertstore.critical(self.get_slug(), ioc_match.message, "", result)
self.alertstore.critical(ioc_match.message, "", result)
continue
ioc_match = self.indicators.check_process(proc_name)
if ioc_match:
result["matched_indicator"] = ioc_match.ioc
self.alertstore.critical(self.get_slug(), ioc_match.message, "", result)
self.alertstore.critical(ioc_match.message, "", result)

View File

@@ -6,15 +6,15 @@
import datetime
from typing import List, Optional
import pydantic
import betterproto
import pydantic
from dateutil import parser
from mvt.common.utils import convert_datetime_to_iso
from mvt.common.module_types import ModuleAtomicResult, ModuleSerializedResult
from mvt.android.parsers.proto.tombstone import Tombstone
from .artifact import AndroidArtifact
from mvt.common.module_types import ModuleAtomicResult, ModuleSerializedResult
from mvt.common.utils import convert_datetime_to_iso
from .artifact import AndroidArtifact
TOMBSTONE_DELIMITER = "*** *** *** *** *** *** *** *** *** *** *** *** *** *** *** ***"
@@ -96,7 +96,7 @@ class TombstoneCrashArtifact(AndroidArtifact):
ioc_match = self.indicators.check_process(result["process_name"])
if ioc_match:
result["matched_indicator"] = ioc_match.ioc
self.alertstore.critical(self.get_slug(), ioc_match.message, "", result)
self.alertstore.critical(ioc_match.message, "", result)
continue
if result.get("command_line", []):
@@ -104,9 +104,7 @@ class TombstoneCrashArtifact(AndroidArtifact):
ioc_match = self.indicators.check_process(command_name)
if ioc_match:
result["matched_indicator"] = ioc_match.ioc
self.alertstore.critical(
self.get_slug(), ioc_match.message, "", result
)
self.alertstore.critical(ioc_match.message, "", result)
continue
SUSPICIOUS_UIDS = [
@@ -116,7 +114,6 @@ class TombstoneCrashArtifact(AndroidArtifact):
]
if result["uid"] in SUSPICIOUS_UIDS:
self.alertstore.medium(
self.get_slug(),
(
f"Potentially suspicious crash in process '{result['process_name']}' "
f"running as UID '{result['uid']}' in tombstone '{result['file_name']}' at {result['timestamp']}"

View File

@@ -8,12 +8,12 @@ import os
import sqlite3
from typing import Optional
from mvt.common.utils import convert_chrometime_to_datetime, convert_datetime_to_iso
from mvt.common.module_types import (
ModuleAtomicResult,
ModuleSerializedResult,
ModuleResults,
ModuleSerializedResult,
)
from mvt.common.utils import convert_chrometime_to_datetime, convert_datetime_to_iso
from .base import AndroidExtraction
@@ -59,7 +59,7 @@ class ChromeHistory(AndroidExtraction):
ioc_match = self.indicators.check_url(result["url"])
if ioc_match:
result["matched_indicator"] = ioc_match.ioc
self.alertstore.critical(self.get_slug(), ioc_match.message, "", result)
self.alertstore.critical(ioc_match.message, "", result)
def _parse_db(self, db_path: str) -> None:
"""Parse a Chrome History database file.

View File

@@ -94,18 +94,16 @@ class Packages(AndroidExtraction):
if not self.indicators:
continue
ioc_match = self.indicators.check_app_id(result.get("package_name"))
ioc_match = self.indicators.check_app_id(result["package_name"])
if ioc_match:
result["matched_indicator"] = ioc_match.ioc
self.alertstore.critical(self.get_slug(), ioc_match.message, "", result)
self.alertstore.critical(ioc_match.message, "", result)
for package_file in result.get("files", []):
ioc_match = self.indicators.check_file_hash(package_file["sha256"])
if ioc_match:
result["matched_indicator"] = ioc_match.ioc
self.alertstore.critical(
self.get_slug(), ioc_match.message, "", result
)
self.alertstore.critical(ioc_match.message, "", result)
# @staticmethod
# def check_virustotal(packages: list) -> None:

View File

@@ -10,12 +10,12 @@ from typing import Optional
from mvt.android.parsers.backup import AndroidBackupParsingError, parse_tar_for_sms
from mvt.common.module import InsufficientPrivileges
from mvt.common.utils import check_for_links, convert_unix_to_iso
from mvt.common.module_types import (
ModuleAtomicResult,
ModuleResults,
ModuleSerializedResult,
)
from mvt.common.utils import check_for_links, convert_unix_to_iso
from .base import AndroidExtraction
@@ -93,9 +93,7 @@ class SMS(AndroidExtraction):
ioc_match = self.indicators.check_urls(message_links)
if ioc_match:
message["matched_indicator"] = ioc_match.ioc
self.alertstore.critical(
self.get_slug(), ioc_match.message, "", message
)
self.alertstore.critical(ioc_match.message, "", message)
def _parse_db(self, db_path: str) -> None:
"""Parse an Android bugle_db SMS database file.

View File

@@ -15,8 +15,8 @@ from typing import Optional
from mvt.android.modules.androidqf.base import AndroidQFModule
from mvt.common.module_types import (
ModuleResults,
ModuleAtomicResult,
ModuleResults,
ModuleSerializedResult,
)
from mvt.common.utils import convert_datetime_to_iso
@@ -90,7 +90,7 @@ class AQFFiles(AndroidQFModule):
ioc_match = self.indicators.check_file_path(result["path"])
if ioc_match:
result["matched_indicator"] = ioc_match.ioc
self.alertstore.critical(self.get_slug(), ioc_match.message, "", result)
self.alertstore.critical(ioc_match.message, "", result)
self.alertstore.log_latest()
continue
@@ -105,16 +105,16 @@ class AQFFiles(AndroidQFModule):
file_type = "executable "
msg = f'Found {file_type}file at suspicious path "{result["path"]}"'
self.alertstore.high(self.get_slug(), msg, "", result)
self.alertstore.high(msg, "", result)
self.alertstore.log_latest()
if result.get("sha256", "") == "":
continue
ioc_match = self.indicators.check_file_hash(result["sha256"])
ioc_match = self.indicators.check_file_hash(result.get("sha256"))
if ioc_match:
result["matched_indicator"] = ioc_match.ioc
self.alertstore.critical(self.get_slug(), ioc_match.message, "", result)
self.alertstore.critical(ioc_match.message, "", result)
# TODO: adds SHA1 and MD5 when available in MVT

View File

@@ -11,13 +11,13 @@ from mvt.android.utils import (
BROWSER_INSTALLERS,
PLAY_STORE_INSTALLERS,
ROOT_PACKAGES,
THIRD_PARTY_STORE_INSTALLERS,
SECURITY_PACKAGES,
SYSTEM_UPDATE_PACKAGES,
THIRD_PARTY_STORE_INSTALLERS,
)
from mvt.common.module_types import ModuleResults
from .base import AndroidQFModule
from mvt.common.module_types import ModuleResults
class AQFPackages(AndroidQFModule):
@@ -45,7 +45,6 @@ class AQFPackages(AndroidQFModule):
for result in self.results:
if result["name"] in ROOT_PACKAGES:
self.alertstore.medium(
self.get_slug(),
f'Found an installed package related to rooting/jailbreaking: "{result["name"]}"',
"",
result,
@@ -56,7 +55,6 @@ class AQFPackages(AndroidQFModule):
# Detections for apps installed via unusual methods.
if result["installer"] in THIRD_PARTY_STORE_INSTALLERS:
self.alertstore.info(
self.get_slug(),
f'Found a package installed via a third party store (installer="{result["installer"]}"): "{result["name"]}"',
"",
result,
@@ -64,7 +62,6 @@ class AQFPackages(AndroidQFModule):
self.alertstore.log_latest()
elif result["installer"] in BROWSER_INSTALLERS:
self.alertstore.medium(
self.get_slug(),
f'Found a package installed via a browser (installer="{result["installer"]}"): "{result["name"]}"',
"",
result,
@@ -72,7 +69,6 @@ class AQFPackages(AndroidQFModule):
self.alertstore.log_latest()
elif result["installer"] == "null" and result["system"] is False:
self.alertstore.high(
self.get_slug(),
f'Found a non-system package installed via adb or another method: "{result["name"]}"',
"",
result,
@@ -85,7 +81,6 @@ class AQFPackages(AndroidQFModule):
package_disabled = result.get("disabled", None)
if result["name"] in SECURITY_PACKAGES and package_disabled:
self.alertstore.high(
self.get_slug(),
f'Security package "{result["name"]}" disabled on the phone',
"",
result,
@@ -94,7 +89,6 @@ class AQFPackages(AndroidQFModule):
if result["name"] in SYSTEM_UPDATE_PACKAGES and package_disabled:
self.alertstore.high(
self.get_slug(),
f'System OTA update package "{result["name"]}" disabled on the phone',
"",
result,
@@ -107,16 +101,14 @@ class AQFPackages(AndroidQFModule):
ioc_match = self.indicators.check_app_id(result.get("name"))
if ioc_match:
result["matched_indicator"] = ioc_match.ioc
self.alertstore.critical(self.get_slug(), ioc_match.message, "", result)
self.alertstore.critical(ioc_match.message, "", result)
self.alertstore.log_latest()
for package_file in result.get("files", []):
ioc_match = self.indicators.check_file_hash(package_file["sha256"])
if ioc_match:
result["matched_indicator"] = ioc_match.ioc
self.alertstore.critical(
self.get_slug(), ioc_match.message, "", result
)
self.alertstore.critical(ioc_match.message, "", result)
self.alertstore.log_latest()
if "certificate" not in package_file:
@@ -130,9 +122,7 @@ class AQFPackages(AndroidQFModule):
)
if ioc_match:
result["matched_indicator"] = ioc_match.ioc
self.alertstore.critical(
self.get_slug(), ioc_match.message, "", result
)
self.alertstore.critical(ioc_match.message, "", result)
self.alertstore.log_latest()
break

View File

@@ -47,7 +47,6 @@ class RootBinaries(AndroidQFModule):
# All found root binaries are considered indicators of rooting
for result in self.results:
self.alertstore.high(
self.get_slug(),
f'Found root binary "{result["binary_name"]}" at path "{result["path"]}"',
"",
result,

View File

@@ -56,9 +56,7 @@ class SMS(AndroidQFModule):
ioc_match = self.indicators.check_domains(message.get("links", []))
if ioc_match:
message["matched_indicator"] = ioc_match.ioc
self.alertstore.critical(
self.get_slug(), ioc_match.message, "", message
)
self.alertstore.critical(ioc_match.message, "", message)
def parse_backup(self, data):
header = parse_ab_header(data)

View File

@@ -8,8 +8,8 @@ from typing import Optional
from mvt.android.modules.backup.base import BackupModule
from mvt.android.parsers.backup import parse_sms_file
from mvt.common.utils import check_for_links
from mvt.common.module_types import ModuleResults
from mvt.common.utils import check_for_links
class SMS(BackupModule):
@@ -47,9 +47,7 @@ class SMS(BackupModule):
ioc_match = self.indicators.check_urls(message_links)
if ioc_match:
message["matched_indicator"] = ioc_match.ioc
self.alertstore.critical(
self.get_slug(), ioc_match.message, "", message
)
self.alertstore.critical(ioc_match.message, "", message)
continue
def run(self) -> None: