This commit is contained in:
Janik Besendorf
2025-11-07 18:52:31 +01:00
parent cc7781e255
commit 6d1d499c4e
21 changed files with 142 additions and 72 deletions

View File

@@ -16,8 +16,9 @@ class DumpsysAccessibilityArtifact(AndroidArtifact):
for result in self.results:
ioc_match = self.indicators.check_app_id(result["package_name"])
if ioc_match:
result["matched_indicator"] = ioc_match.ioc
self.alertstore.critical(ioc_match.message, "", result)
self.alertstore.critical(
ioc_match.message, "", result, matched_indicator=ioc_match.ioc
)
continue
def parse(self, content: str) -> None:

View File

@@ -4,6 +4,7 @@
# https://license.mvt.re/1.1/
from datetime import datetime
from typing import Any
from mvt.common.module_types import ModuleAtomicResult, ModuleSerializedResult
from mvt.common.utils import convert_datetime_to_iso
@@ -44,8 +45,9 @@ class DumpsysAppopsArtifact(AndroidArtifact):
if self.indicators:
ioc_match = self.indicators.check_app_id(result.get("package_name"))
if ioc_match:
result["matched_indicator"] = ioc_match.ioc
self.alertstore.critical(ioc_match.message, "", result)
self.alertstore.critical(
ioc_match.message, "", result, matched_indicator=ioc_match.ioc
)
continue
# We use a placeholder entry to create a basic alert even without permission entries.
@@ -83,9 +85,9 @@ class DumpsysAppopsArtifact(AndroidArtifact):
def parse(self, output: str) -> None:
# self.results: List[Dict[str, Any]] = []
perm = {}
package = {}
entry = {}
perm: dict[str, Any] = {}
package: dict[str, Any] = {}
entry: dict[str, Any] = {}
uid = None
in_packages = False

View File

@@ -3,6 +3,7 @@
# Use of this software is governed by the MVT License 1.1 that can be found at
# https://license.mvt.re/1.1/
from typing import Any
from mvt.common.module_types import ModuleAtomicResult, ModuleSerializedResult
@@ -30,13 +31,14 @@ class DumpsysBatteryDailyArtifact(AndroidArtifact):
for result in self.results:
ioc_match = self.indicators.check_app_id(result["package_name"])
if ioc_match:
result["matched_indicator"] = ioc_match.ioc
self.alertstore.critical(ioc_match.message, "", result)
self.alertstore.critical(
ioc_match.message, "", result, matched_indicator=ioc_match.ioc
)
continue
def parse(self, output: str) -> None:
daily = None
daily_updates = []
daily_updates: list[dict[str, Any]] = []
for line in output.splitlines():
if line.startswith(" Daily from "):
if len(daily_updates) > 0:

View File

@@ -18,8 +18,9 @@ class DumpsysBatteryHistoryArtifact(AndroidArtifact):
for result in self.results:
ioc_match = self.indicators.check_app_id(result["package_name"])
if ioc_match:
result["matched_indicator"] = ioc_match.ioc
self.alertstore.critical(ioc_match.message, "", result)
self.alertstore.critical(
ioc_match.message, "", result, matched_indicator=ioc_match.ioc
)
continue
def parse(self, data: str) -> None:

View File

@@ -22,8 +22,9 @@ class DumpsysDBInfoArtifact(AndroidArtifact):
for part in path.split("/"):
ioc_match = self.indicators.check_app_id(part)
if ioc_match:
result["matched_indicator"] = ioc_match.ioc
self.alertstore.critical(ioc_match.message, "", result)
self.alertstore.critical(
ioc_match.message, "", result, matched_indicator=ioc_match.ioc
)
continue
def parse(self, output: str) -> None:

View File

@@ -14,8 +14,9 @@ class DumpsysPackageActivitiesArtifact(AndroidArtifact):
for activity in self.results:
ioc_match = self.indicators.check_app_id(activity["package_name"])
if ioc_match:
activity["matched_indicator"] = ioc_match.ioc
self.alertstore.critical(ioc_match.message, "", activity)
self.alertstore.critical(
ioc_match.message, "", activity, matched_indicator=ioc_match.ioc
)
continue
def parse(self, content: str):

View File

@@ -30,8 +30,9 @@ class DumpsysPackagesArtifact(AndroidArtifact):
ioc_match = self.indicators.check_app_id(result.get("package_name", ""))
if ioc_match:
result["matched_indicator"] = ioc_match.ioc
self.alertstore.critical(ioc_match.message, "", result)
self.alertstore.critical(
ioc_match.message, "", result, matched_indicator=ioc_match.ioc
)
self.alertstore.log_latest()
def serialize(self, record: ModuleAtomicResult) -> ModuleSerializedResult:
@@ -62,15 +63,15 @@ class DumpsysPackagesArtifact(AndroidArtifact):
"""
Parse one entry of a dumpsys package information
"""
details = {
details: Dict[str, Any] = {
"uid": "",
"version_name": "",
"version_code": "",
"timestamp": "",
"first_install_time": "",
"last_update_time": "",
"permissions": [],
"requested_permissions": [],
"permissions": list(),
"requested_permissions": list(),
}
in_install_permissions = False
in_runtime_permissions = False
@@ -148,7 +149,7 @@ class DumpsysPackagesArtifact(AndroidArtifact):
results = []
package_name = None
package = {}
lines = []
lines: list[str] = []
for line in output.splitlines():
if line.startswith(" Package ["):
if len(lines) > 0:

View File

@@ -18,8 +18,9 @@ class DumpsysPlatformCompatArtifact(AndroidArtifact):
for result in self.results:
ioc_match = self.indicators.check_app_id(result["package_name"])
if ioc_match:
result["matched_indicator"] = ioc_match.ioc
self.alertstore.critical(ioc_match.message, "", result)
self.alertstore.critical(
ioc_match.message, "", result, matched_indicator=ioc_match.ioc
)
continue
def parse(self, data: str) -> None:

View File

@@ -52,12 +52,16 @@ class DumpsysReceiversArtifact(AndroidArtifact):
ioc_match = self.indicators.check_app_id(receiver["package_name"])
if ioc_match:
receiver["matched_indicator"] = ioc_match.ioc
self.alertstore.critical(ioc_match.message, "", {intent: receiver})
self.alertstore.critical(
ioc_match.message,
"",
{intent: receiver},
matched_indicator=ioc_match.ioc,
)
continue
def parse(self, output: str) -> None:
self.results = {}
self.results: dict[str, list[dict[str, str]]] = {}
in_receiver_resolver_table = False
in_non_data_actions = False

View File

@@ -39,10 +39,10 @@ class GetProp(AndroidArtifact):
if not matches or len(matches[0]) != 2:
continue
entry = {"name": matches[0][0], "value": matches[0][1]}
self.results.append(entry)
prop_entry = {"name": matches[0][0], "value": matches[0][1]}
self.results.append(prop_entry)
def get_device_timezone(self) -> str:
def get_device_timezone(self) -> str | None:
"""
Get the device timezone from the getprop results
@@ -70,5 +70,6 @@ class GetProp(AndroidArtifact):
result.get("name", "")
)
if ioc_match:
result["matched_indicator"] = ioc_match.ioc
self.alertstore.critical(ioc_match.message, "", result)
self.alertstore.critical(
ioc_match.message, "", result, matched_indicator=ioc_match.ioc
)

View File

@@ -179,19 +179,19 @@ class Mounts(AndroidArtifact):
# Check if any mount points match indicators
ioc = self.indicators.check_file_path(mount.get("mount_point", ""))
if ioc:
mount["matched_indicator"] = ioc
self.alertstore.critical(
f"Mount point matches indicator: {mount.get('mount_point', '')}",
"",
mount,
matched_indicator=ioc,
)
# Check device paths for indicators
ioc = self.indicators.check_file_path(mount.get("device", ""))
if ioc:
mount["matched_indicator"] = ioc
self.alertstore.critical(
f"Device path matches indicator: {mount.get('device', '')}",
"",
mount,
matched_indicator=ioc,
)

View File

@@ -60,11 +60,13 @@ class Processes(AndroidArtifact):
ioc_match = self.indicators.check_app_id(proc_name)
if ioc_match:
result["matched_indicator"] = ioc_match.ioc
self.alertstore.critical(ioc_match.message, "", result)
self.alertstore.critical(
ioc_match.message, "", result, matched_indicator=ioc_match.ioc
)
continue
ioc_match = self.indicators.check_process(proc_name)
if ioc_match:
result["matched_indicator"] = ioc_match.ioc
self.alertstore.critical(ioc_match.message, "", result)
self.alertstore.critical(
ioc_match.message, "", result, matched_indicator=ioc_match.ioc
)

View File

@@ -95,16 +95,19 @@ class TombstoneCrashArtifact(AndroidArtifact):
for result in self.results:
ioc_match = self.indicators.check_process(result["process_name"])
if ioc_match:
result["matched_indicator"] = ioc_match.ioc
self.alertstore.critical(ioc_match.message, "", result)
self.alertstore.critical(
ioc_match.message, "", result, matched_indicator=ioc_match.ioc
)
continue
if result.get("command_line", []):
command_name = result.get("command_line")[0].split("/")[-1]
command_name = result["command_line"][0]
ioc_match = self.indicators.check_process(command_name)
if ioc_match:
result["matched_indicator"] = ioc_match.ioc
self.alertstore.critical(ioc_match.message, "", result)
self.alertstore.critical(
ioc_match.message, "", result, matched_indicator=ioc_match.ioc
)
continue
SUSPICIOUS_UIDS = [

View File

@@ -58,8 +58,9 @@ class ChromeHistory(AndroidExtraction):
for result in self.results:
ioc_match = self.indicators.check_url(result["url"])
if ioc_match:
result["matched_indicator"] = ioc_match.ioc
self.alertstore.critical(ioc_match.message, "", result)
self.alertstore.critical(
ioc_match.message, "", result, matched_indicator=ioc_match.ioc
)
def _parse_db(self, db_path: str) -> None:
"""Parse a Chrome History database file.

View File

@@ -96,14 +96,16 @@ class Packages(AndroidExtraction):
ioc_match = self.indicators.check_app_id(result["package_name"])
if ioc_match:
result["matched_indicator"] = ioc_match.ioc
self.alertstore.critical(ioc_match.message, "", result)
self.alertstore.critical(
ioc_match.message, "", result, matched_indicator=ioc_match.ioc
)
for package_file in result.get("files", []):
ioc_match = self.indicators.check_file_hash(package_file["sha256"])
if ioc_match:
result["matched_indicator"] = ioc_match.ioc
self.alertstore.critical(ioc_match.message, "", result)
self.alertstore.critical(
ioc_match.message, "", result, matched_indicator=ioc_match.ioc
)
# @staticmethod
# def check_virustotal(packages: list) -> None:

View File

@@ -92,8 +92,9 @@ class SMS(AndroidExtraction):
ioc_match = self.indicators.check_urls(message_links)
if ioc_match:
message["matched_indicator"] = ioc_match.ioc
self.alertstore.critical(ioc_match.message, "", message)
self.alertstore.critical(
ioc_match.message, "", message, matched_indicator=ioc_match.ioc
)
def _parse_db(self, db_path: str) -> None:
"""Parse an Android bugle_db SMS database file.

View File

@@ -89,8 +89,9 @@ class AQFFiles(AndroidQFModule):
for result in self.results:
ioc_match = self.indicators.check_file_path(result["path"])
if ioc_match:
result["matched_indicator"] = ioc_match.ioc
self.alertstore.critical(ioc_match.message, "", result)
self.alertstore.critical(
ioc_match.message, "", result, matched_indicator=ioc_match.ioc
)
self.alertstore.log_latest()
continue
@@ -113,8 +114,9 @@ class AQFFiles(AndroidQFModule):
ioc_match = self.indicators.check_file_hash(result.get("sha256"))
if ioc_match:
result["matched_indicator"] = ioc_match.ioc
self.alertstore.critical(ioc_match.message, "", result)
self.alertstore.critical(
ioc_match.message, "", result, matched_indicator=ioc_match.ioc
)
# TODO: adds SHA1 and MD5 when available in MVT

View File

@@ -100,15 +100,17 @@ class AQFPackages(AndroidQFModule):
ioc_match = self.indicators.check_app_id(result.get("name"))
if ioc_match:
result["matched_indicator"] = ioc_match.ioc
self.alertstore.critical(ioc_match.message, "", result)
self.alertstore.critical(
ioc_match.message, "", result, matched_indicator=ioc_match.ioc
)
self.alertstore.log_latest()
for package_file in result.get("files", []):
ioc_match = self.indicators.check_file_hash(package_file["sha256"])
if ioc_match:
result["matched_indicator"] = ioc_match.ioc
self.alertstore.critical(ioc_match.message, "", result)
self.alertstore.critical(
ioc_match.message, "", result, matched_indicator=ioc_match.ioc
)
self.alertstore.log_latest()
if "certificate" not in package_file:
@@ -121,8 +123,12 @@ class AQFPackages(AndroidQFModule):
certificate_hash
)
if ioc_match:
result["matched_indicator"] = ioc_match.ioc
self.alertstore.critical(ioc_match.message, "", result)
self.alertstore.critical(
ioc_match.message,
"",
result,
matched_indicator=ioc_match.ioc,
)
self.alertstore.log_latest()
break

View File

@@ -55,8 +55,9 @@ class SMS(AndroidQFModule):
ioc_match = self.indicators.check_domains(message.get("links", []))
if ioc_match:
message["matched_indicator"] = ioc_match.ioc
self.alertstore.critical(ioc_match.message, "", message)
self.alertstore.critical(
ioc_match.message, "", message, matched_indicator=ioc_match.ioc
)
def parse_backup(self, data):
header = parse_ab_header(data)

View File

@@ -4,7 +4,7 @@
# https://license.mvt.re/1.1/
import logging
from typing import Optional
from typing import Any, Optional
from mvt.android.modules.backup.base import BackupModule
from mvt.android.parsers.backup import parse_sms_file
@@ -30,7 +30,7 @@ class SMS(BackupModule):
log=log,
results=results,
)
self.results = []
self.results: list[dict[str, Any]] = []
def check_indicators(self) -> None:
if not self.indicators:
@@ -46,8 +46,9 @@ class SMS(BackupModule):
ioc_match = self.indicators.check_urls(message_links)
if ioc_match:
message["matched_indicator"] = ioc_match.ioc
self.alertstore.critical(ioc_match.message, "", message)
self.alertstore.critical(
ioc_match.message, "", message, matched_indicator=ioc_match.ioc
)
continue
def run(self) -> None:

View File

@@ -29,6 +29,7 @@ class Alert:
message: str
event_time: str
event: ModuleAtomicResult
matched_indicator: Optional[Any] = None
class AlertStore:
@@ -60,7 +61,7 @@ class AlertStore:
# Check if it has a get_slug method (MVT modules have this)
if hasattr(obj, "get_slug") and callable(obj.get_slug):
try:
return obj.get_slug()
return str(obj.get_slug())
except Exception:
pass
@@ -81,7 +82,13 @@ class AlertStore:
for alert in alerts:
self.add(alert)
def info(self, message: str, event_time: str, event: ModuleAtomicResult):
def info(
self,
message: str,
event_time: str,
event: ModuleAtomicResult,
matched_indicator: Optional[Any] = None,
):
self.add(
Alert(
level=AlertLevel.INFORMATIONAL,
@@ -89,10 +96,17 @@ class AlertStore:
message=message,
event_time=event_time,
event=event,
matched_indicator=matched_indicator,
)
)
def low(self, message: str, event_time: str, event: ModuleAtomicResult):
def low(
self,
message: str,
event_time: str,
event: ModuleAtomicResult,
matched_indicator: Optional[Any] = None,
):
self.add(
Alert(
level=AlertLevel.LOW,
@@ -100,10 +114,17 @@ class AlertStore:
message=message,
event_time=event_time,
event=event,
matched_indicator=matched_indicator,
)
)
def medium(self, message: str, event_time: str, event: ModuleAtomicResult):
def medium(
self,
message: str,
event_time: str,
event: ModuleAtomicResult,
matched_indicator: Optional[Any] = None,
):
self.add(
Alert(
level=AlertLevel.MEDIUM,
@@ -111,10 +132,17 @@ class AlertStore:
message=message,
event_time=event_time,
event=event,
matched_indicator=matched_indicator,
)
)
def high(self, message: str, event_time: str, event: ModuleAtomicResult):
def high(
self,
message: str,
event_time: str,
event: ModuleAtomicResult,
matched_indicator: Optional[Any] = None,
):
self.add(
Alert(
level=AlertLevel.HIGH,
@@ -122,10 +150,17 @@ class AlertStore:
message=message,
event_time=event_time,
event=event,
matched_indicator=matched_indicator,
)
)
def critical(self, message: str, event_time: str, event: ModuleAtomicResult):
def critical(
self,
message: str,
event_time: str,
event: ModuleAtomicResult,
matched_indicator: Optional[Any] = None,
):
self.add(
Alert(
level=AlertLevel.CRITICAL,
@@ -133,6 +168,7 @@ class AlertStore:
message=message,
event_time=event_time,
event=event,
matched_indicator=matched_indicator,
)
)