Rework old detections tracking into stuctured alert levels

This commit is contained in:
Donncha Ó Cearbhaill
2025-02-19 23:46:03 +01:00
parent ca0bc46f11
commit 2d547662f8
126 changed files with 1132 additions and 761 deletions
@@ -14,10 +14,10 @@ class DumpsysAccessibilityArtifact(AndroidArtifact):
return
for result in self.results:
ioc = self.indicators.check_app_id(result["package_name"])
if ioc:
result["matched_indicator"] = ioc
self.detected.append(result)
ioc_match = self.indicators.check_app_id(result["package_name"])
if ioc_match:
result["matched_indicator"] = ioc_match.ioc
self.alertstore.critical(self.get_slug(), ioc_match.message, "", result)
continue
def parse(self, content: str) -> None:
+34 -31
View File
@@ -4,9 +4,9 @@
# https://license.mvt.re/1.1/
from datetime import datetime
from typing import Any, Dict, List, Union
from mvt.common.utils import convert_datetime_to_iso
from mvt.common.module_types import ModuleAtomicResult, ModuleSerializedResult
from .artifact import AndroidArtifact
@@ -20,9 +20,9 @@ class DumpsysAppopsArtifact(AndroidArtifact):
Parser for dumpsys app ops info
"""
def serialize(self, record: dict) -> Union[dict, list]:
def serialize(self, result: ModuleAtomicResult) -> ModuleSerializedResult:
records = []
for perm in record["permissions"]:
for perm in result["permissions"]:
if "entries" not in perm:
continue
@@ -33,7 +33,7 @@ class DumpsysAppopsArtifact(AndroidArtifact):
"timestamp": entry["timestamp"],
"module": self.__class__.__name__,
"event": entry["access"],
"data": f"{record['package_name']} access to "
"data": f"{result['package_name']} access to "
f"{perm['name']}: {entry['access']}",
}
)
@@ -43,48 +43,51 @@ class DumpsysAppopsArtifact(AndroidArtifact):
def check_indicators(self) -> None:
for result in self.results:
if self.indicators:
ioc = self.indicators.check_app_id(result.get("package_name"))
if ioc:
result["matched_indicator"] = ioc
self.detected.append(result)
ioc_match = self.indicators.check_app_id(result.get("package_name"))
if ioc_match:
result["matched_indicator"] = ioc_match.ioc
self.alertstore.critical(
self.get_slug(), ioc_match.message, "", result
)
continue
detected_permissions = []
# We use a placeholder entry to create a basic alert even without permission entries.
placeholder_entry = {"access": "Unknown", "timestamp": ""}
for perm in result["permissions"]:
if (
perm["name"] in RISKY_PERMISSIONS
# and perm["access"] == "allow"
):
detected_permissions.append(perm)
for entry in sorted(perm["entries"], key=lambda x: x["timestamp"]):
self.log.warning(
"Package '%s' had risky permission '%s' set to '%s' at %s",
result["package_name"],
perm["name"],
entry["access"],
for entry in sorted(
perm["entries"] or [placeholder_entry],
key=lambda x: x["timestamp"],
):
cleaned_result = result.copy()
cleaned_result["permissions"] = [perm]
self.alertstore.medium(
self.get_slug(),
f"Package '{result['package_name']}' had risky permission '{perm['name']}' set to '{entry['access']}' at {entry['timestamp']}",
entry["timestamp"],
cleaned_result,
)
elif result["package_name"] in RISKY_PACKAGES:
detected_permissions.append(perm)
for entry in sorted(perm["entries"], key=lambda x: x["timestamp"]):
self.log.warning(
"Risky package '%s' had '%s' permission set to '%s' at %s",
result["package_name"],
perm["name"],
entry["access"],
for entry in sorted(
perm["entries"] or [placeholder_entry],
key=lambda x: x["timestamp"],
):
cleaned_result = result.copy()
cleaned_result["permissions"] = [perm]
self.alertstore.medium(
self.get_slug(),
f"Risky package '{result['package_name']}' had '{perm['name']}' permission set to '{entry['access']}' at {entry['timestamp']}",
entry["timestamp"],
cleaned_result,
)
if detected_permissions:
# We clean the result to only include the risky permission, otherwise the timeline
# will be polluted with all the other irrelevant permissions
cleaned_result = result.copy()
cleaned_result["permissions"] = detected_permissions
self.detected.append(cleaned_result)
def parse(self, output: str) -> None:
self.results: List[Dict[str, Any]] = []
# self.results: List[Dict[str, Any]] = []
perm = {}
package = {}
entry = {}
@@ -3,9 +3,9 @@
# Use of this software is governed by the MVT License 1.1 that can be found at
# https://license.mvt.re/1.1/
from typing import Union
from .artifact import AndroidArtifact
from mvt.common.module_types import ModuleSerializedResult, ModuleAtomicResult
class DumpsysBatteryDailyArtifact(AndroidArtifact):
@@ -13,7 +13,7 @@ class DumpsysBatteryDailyArtifact(AndroidArtifact):
Parser for dumpsys dattery daily updates.
"""
def serialize(self, record: dict) -> Union[dict, list]:
def serialize(self, record: ModuleAtomicResult) -> ModuleSerializedResult:
return {
"timestamp": record["from"],
"module": self.__class__.__name__,
@@ -27,10 +27,10 @@ class DumpsysBatteryDailyArtifact(AndroidArtifact):
return
for result in self.results:
ioc = self.indicators.check_app_id(result["package_name"])
if ioc:
result["matched_indicator"] = ioc
self.detected.append(result)
ioc_match = self.indicators.check_app_id(result["package_name"])
if ioc_match:
result["matched_indicator"] = ioc_match.ioc
self.alertstore.critical(self.get_slug(), ioc_match.message, "", result)
continue
def parse(self, output: str) -> None:
@@ -16,10 +16,10 @@ class DumpsysBatteryHistoryArtifact(AndroidArtifact):
return
for result in self.results:
ioc = self.indicators.check_app_id(result["package_name"])
if ioc:
result["matched_indicator"] = ioc
self.detected.append(result)
ioc_match = self.indicators.check_app_id(result["package_name"])
if ioc_match:
result["matched_indicator"] = ioc_match.ioc
self.alertstore.critical(self.get_slug(), ioc_match.message, "", result)
continue
def parse(self, data: str) -> None:
+6 -4
View File
@@ -20,10 +20,12 @@ class DumpsysDBInfoArtifact(AndroidArtifact):
for result in self.results:
path = result.get("path", "")
for part in path.split("/"):
ioc = self.indicators.check_app_id(part)
if ioc:
result["matched_indicator"] = ioc
self.detected.append(result)
ioc_match = self.indicators.check_app_id(part)
if ioc_match:
result["matched_indicator"] = ioc_match.ioc
self.alertstore.critical(
self.get_slug(), ioc_match.message, "", result
)
continue
def parse(self, output: str) -> None:
@@ -12,10 +12,12 @@ class DumpsysPackageActivitiesArtifact(AndroidArtifact):
return
for activity in self.results:
ioc = self.indicators.check_app_id(activity["package_name"])
if ioc:
activity["matched_indicator"] = ioc
self.detected.append(activity)
ioc_match = self.indicators.check_app_id(activity["package_name"])
if ioc_match:
activity["matched_indicator"] = ioc_match.ioc
self.alertstore.critical(
self.get_slug(), ioc_match.message, "", activity
)
continue
def parse(self, content: str):
+15 -11
View File
@@ -4,35 +4,39 @@
# https://license.mvt.re/1.1/
import re
from typing import Any, Dict, List, Union
from typing import Any, Dict, List
from mvt.android.utils import ROOT_PACKAGES
from .artifact import AndroidArtifact
from mvt.common.module_types import ModuleAtomicResult, ModuleSerializedResult
class DumpsysPackagesArtifact(AndroidArtifact):
def check_indicators(self) -> None:
for result in self.results:
# XXX: De-duplication Package detections
if result["package_name"] in ROOT_PACKAGES:
self.log.warning(
'Found an installed package related to rooting/jailbreaking: "%s"',
result["package_name"],
self.alertstore.medium(
self.get_slug(),
f'Found an installed package related to rooting/jailbreaking: "{result["package_name"]}"',
"",
result,
)
self.detected.append(result)
self.alertstore.log_latest()
continue
if not self.indicators:
continue
ioc = self.indicators.check_app_id(result.get("package_name", ""))
if ioc:
result["matched_indicator"] = ioc
self.detected.append(result)
ioc_match = self.indicators.check_app_id(result.get("package_name", ""))
if ioc_match:
result["matched_indicator"] = ioc_match.ioc
self.alertstore.critical(self.get_slug(), ioc_match.message, "", result)
self.alertstore.log_latest()
def serialize(self, record: dict) -> Union[dict, list]:
def serialize(self, record: ModuleAtomicResult) -> ModuleSerializedResult:
records = []
timestamps = [
{"event": "package_install", "timestamp": record["timestamp"]},
{
@@ -16,10 +16,10 @@ class DumpsysPlatformCompatArtifact(AndroidArtifact):
return
for result in self.results:
ioc = self.indicators.check_app_id(result["package_name"])
if ioc:
result["matched_indicator"] = ioc
self.detected.append(result)
ioc_match = self.indicators.check_app_id(result["package_name"])
if ioc_match:
result["matched_indicator"] = ioc_match.ioc
self.alertstore.critical(self.get_slug(), ioc_match.message, "", result)
continue
def parse(self, data: str) -> None:
@@ -50,10 +50,12 @@ class DumpsysReceiversArtifact(AndroidArtifact):
if not self.indicators:
continue
ioc = self.indicators.check_app_id(receiver["package_name"])
if ioc:
receiver["matched_indicator"] = ioc
self.detected.append({intent: receiver})
ioc_match = self.indicators.check_app_id(receiver["package_name"])
if ioc_match:
receiver["matched_indicator"] = ioc_match.ioc
self.alertstore.critical(
self.get_slug(), ioc_match.message, "", {intent: receiver}
)
continue
def parse(self, output: str) -> None:
+2 -2
View File
@@ -2,13 +2,13 @@
# Copyright (c) 2021-2023 The MVT Authors.
# Use of this software is governed by the MVT License 1.1 that can be found at
# https://license.mvt.re/1.1/
from typing import Union
from .artifact import AndroidArtifact
from mvt.common.module_types import ModuleAtomicResult, ModuleSerializedResult
class FileTimestampsArtifact(AndroidArtifact):
def serialize(self, record: dict) -> Union[dict, list]:
def serialize(self, record: ModuleAtomicResult) -> ModuleSerializedResult:
records = []
for ts in set(
+8 -5
View File
@@ -59,13 +59,16 @@ class GetProp(AndroidArtifact):
self.log.info("%s: %s", entry["name"], entry["value"])
if entry["name"] == "ro.build.version.security_patch":
warn_android_patch_level(entry["value"], self.log)
warning_message = warn_android_patch_level(entry["value"], self.log)
self.alertstore.medium(self.get_slug(), warning_message, "", entry)
if not self.indicators:
return
for result in self.results:
ioc = self.indicators.check_android_property_name(result.get("name", ""))
if ioc:
result["matched_indicator"] = ioc
self.detected.append(result)
ioc_match = self.indicators.check_android_property_name(
result.get("name", "")
)
if ioc_match:
result["matched_indicator"] = ioc_match.ioc
self.alertstore.critical(self.get_slug(), ioc_match.message, "", result)
+8 -8
View File
@@ -58,13 +58,13 @@ class Processes(AndroidArtifact):
if result["proc_name"] == "gatekeeperd":
continue
ioc = self.indicators.check_app_id(proc_name)
if ioc:
result["matched_indicator"] = ioc
self.detected.append(result)
ioc_match = self.indicators.check_app_id(proc_name)
if ioc_match:
result["matched_indicator"] = ioc_match.ioc
self.alertstore.critical(self.get_slug(), ioc_match.message, "", result)
continue
ioc = self.indicators.check_process(proc_name)
if ioc:
result["matched_indicator"] = ioc
self.detected.append(result)
ioc_match = self.indicators.check_process(proc_name)
if ioc_match:
result["matched_indicator"] = ioc_match.ioc
self.alertstore.critical(self.get_slug(), ioc_match.message, "", result)
+21 -14
View File
@@ -4,12 +4,13 @@
# https://license.mvt.re/1.1/
import datetime
from typing import List, Optional, Union
from typing import List, Optional
import pydantic
import betterproto
from mvt.common.utils import convert_datetime_to_iso
from mvt.common.module_types import ModuleAtomicResult, ModuleSerializedResult
from mvt.android.parsers.proto.tombstone import Tombstone
from .artifact import AndroidArtifact
@@ -75,7 +76,7 @@ class TombstoneCrashArtifact(AndroidArtifact):
This parser can parse both text and protobuf tombstone crash files.
"""
def serialize(self, record: dict) -> Union[dict, list]:
def serialize(self, record: ModuleAtomicResult) -> ModuleSerializedResult:
return {
"timestamp": record["timestamp"],
"module": self.__class__.__name__,
@@ -91,18 +92,20 @@ class TombstoneCrashArtifact(AndroidArtifact):
return
for result in self.results:
ioc = self.indicators.check_process(result["process_name"])
if ioc:
result["matched_indicator"] = ioc
self.detected.append(result)
ioc_match = self.indicators.check_process(result["process_name"])
if ioc_match:
result["matched_indicator"] = ioc_match.ioc
self.alertstore.critical(self.get_slug(), ioc_match.message, "", result)
continue
if result.get("command_line", []):
command_name = result.get("command_line")[0].split("/")[-1]
ioc = self.indicators.check_process(command_name)
if ioc:
result["matched_indicator"] = ioc
self.detected.append(result)
ioc_match = self.indicators.check_process(command_name)
if ioc_match:
result["matched_indicator"] = ioc_match.ioc
self.alertstore.critical(
self.get_slug(), ioc_match.message, "", result
)
continue
SUSPICIOUS_UIDS = [
@@ -111,11 +114,15 @@ class TombstoneCrashArtifact(AndroidArtifact):
2000, # shell
]
if result["uid"] in SUSPICIOUS_UIDS:
self.log.warning(
f"Potentially suspicious crash in process '{result['process_name']}' "
f"running as UID '{result['uid']}' in tombstone '{result['file_name']}' at {result['timestamp']}"
self.alertstore.medium(
self.get_slug(),
(
f"Potentially suspicious crash in process '{result['process_name']}' "
f"running as UID '{result['uid']}' in tombstone '{result['file_name']}' at {result['timestamp']}"
),
"",
result,
)
self.detected.append(result)
def parse_protobuf(
self, file_name: str, file_timestamp: datetime.datetime, data: bytes
+2 -2
View File
@@ -11,7 +11,7 @@ import tarfile
from pathlib import Path
from typing import List, Optional
from mvt.android.modules.backup.base import BackupExtraction
from mvt.android.modules.backup.base import BackupModule
from mvt.android.modules.backup.helpers import prompt_or_load_android_backup_password
from mvt.android.parsers.backup import (
AndroidBackupParsingError,
@@ -113,7 +113,7 @@ class CmdAndroidCheckBackup(Command):
)
sys.exit(1)
def module_init(self, module: BackupExtraction) -> None: # type: ignore[override]
def module_init(self, module: BackupModule) -> None: # type: ignore[override]
if self.backup_type == "folder":
module.from_dir(self.target_path, self.backup_files)
else:
+12 -6
View File
@@ -6,9 +6,14 @@
import logging
import os
import sqlite3
from typing import Optional, Union
from typing import Optional
from mvt.common.utils import convert_chrometime_to_datetime, convert_datetime_to_iso
from mvt.common.module_types import (
ModuleAtomicResult,
ModuleSerializedResult,
ModuleResults,
)
from .base import AndroidExtraction
@@ -25,7 +30,7 @@ class ChromeHistory(AndroidExtraction):
results_path: Optional[str] = None,
module_options: Optional[dict] = None,
log: logging.Logger = logging.getLogger(__name__),
results: Optional[list] = None,
results: ModuleResults = [],
) -> None:
super().__init__(
file_path=file_path,
@@ -37,7 +42,7 @@ class ChromeHistory(AndroidExtraction):
)
self.results = []
def serialize(self, record: dict) -> Union[dict, list]:
def serialize(self, record: ModuleAtomicResult) -> ModuleSerializedResult:
return {
"timestamp": record["isodate"],
"module": self.__class__.__name__,
@@ -51,9 +56,10 @@ class ChromeHistory(AndroidExtraction):
return
for result in self.results:
if self.indicators.check_url(result["url"]):
self.detected.append(result)
continue
ioc_match = self.indicators.check_url(result["url"])
if ioc_match:
result["matched_indicator"] = ioc_match.ioc
self.alertstore.critical(self.get_slug(), ioc_match.message, "", result)
def _parse_db(self, db_path: str) -> None:
"""Parse a Chrome History database file.
+2 -1
View File
@@ -8,6 +8,7 @@ import os
from typing import Optional
from .base import AndroidExtraction
from mvt.common.module_types import ModuleResults
class DumpsysFull(AndroidExtraction):
@@ -20,7 +21,7 @@ class DumpsysFull(AndroidExtraction):
results_path: Optional[str] = None,
module_options: Optional[dict] = None,
log: logging.Logger = logging.getLogger(__name__),
results: Optional[list] = None,
results: ModuleResults = [],
) -> None:
super().__init__(
file_path=file_path,
+2 -1
View File
@@ -9,6 +9,7 @@ import stat
from typing import Optional, Union
from mvt.common.utils import convert_unix_to_iso
from mvt.common.module_types import ModuleResults
from .base import AndroidExtraction
@@ -32,7 +33,7 @@ class Files(AndroidExtraction):
results_path: Optional[str] = None,
module_options: Optional[dict] = None,
log: logging.Logger = logging.getLogger(__name__),
results: Optional[list] = None,
results: ModuleResults = [],
) -> None:
super().__init__(
file_path=file_path,
+2 -1
View File
@@ -9,6 +9,7 @@ from typing import Optional
from mvt.android.artifacts.getprop import GetProp as GetPropArtifact
from .base import AndroidExtraction
from mvt.common.module_types import ModuleResults
class Getprop(GetPropArtifact, AndroidExtraction):
@@ -21,7 +22,7 @@ class Getprop(GetPropArtifact, AndroidExtraction):
results_path: Optional[str] = None,
module_options: Optional[dict] = None,
log: logging.Logger = logging.getLogger(__name__),
results: Optional[list] = None,
results: ModuleResults = [],
) -> None:
super().__init__(
file_path=file_path,
+2 -1
View File
@@ -8,6 +8,7 @@ import os
from typing import Optional
from .base import AndroidExtraction
from mvt.common.module_types import ModuleResults
class Logcat(AndroidExtraction):
@@ -20,7 +21,7 @@ class Logcat(AndroidExtraction):
results_path: Optional[str] = None,
module_options: Optional[dict] = None,
log: logging.Logger = logging.getLogger(__name__),
results: Optional[list] = None,
results: ModuleResults = [],
) -> None:
super().__init__(
file_path=file_path,
+60 -60
View File
@@ -4,12 +4,7 @@
# https://license.mvt.re/1.1/
import logging
from typing import Optional, Union
from rich.console import Console
from rich.progress import track
from rich.table import Table
from rich.text import Text
from typing import Optional
from mvt.android.artifacts.dumpsys_packages import DumpsysPackagesArtifact
from mvt.android.utils import (
@@ -19,7 +14,11 @@ from mvt.android.utils import (
SECURITY_PACKAGES,
SYSTEM_UPDATE_PACKAGES,
)
from mvt.common.virustotal import VTNoKey, VTQuotaExceeded, virustotal_lookup
from mvt.common.module_types import (
ModuleAtomicResult,
ModuleResults,
ModuleSerializedResult,
)
from .base import AndroidExtraction
@@ -34,7 +33,7 @@ class Packages(AndroidExtraction):
results_path: Optional[str] = None,
module_options: Optional[dict] = None,
log: logging.Logger = logging.getLogger(__name__),
results: Optional[list] = None,
results: ModuleResults = [],
) -> None:
super().__init__(
file_path=file_path,
@@ -46,7 +45,7 @@ class Packages(AndroidExtraction):
)
self._user_needed = False
def serialize(self, record: dict) -> Union[dict, list]:
def serialize(self, record: ModuleAtomicResult) -> ModuleSerializedResult:
records = []
timestamps = [
@@ -95,70 +94,71 @@ class Packages(AndroidExtraction):
if not self.indicators:
continue
ioc = self.indicators.check_app_id(result.get("package_name"))
if ioc:
result["matched_indicator"] = ioc
self.detected.append(result)
continue
ioc_match = self.indicators.check_app_id(result.get("package_name"))
if ioc_match:
result["matched_indicator"] = ioc_match.ioc
self.alertstore.critical(self.get_slug(), ioc_match.message, "", result)
for package_file in result.get("files", []):
ioc = self.indicators.check_file_hash(package_file["sha256"])
if ioc:
result["matched_indicator"] = ioc
self.detected.append(result)
ioc_match = self.indicators.check_file_hash(package_file["sha256"])
if ioc_match:
result["matched_indicator"] = ioc_match.ioc
self.alertstore.critical(
self.get_slug(), ioc_match.message, "", result
)
@staticmethod
def check_virustotal(packages: list) -> None:
hashes = []
for package in packages:
for file in package.get("files", []):
if file["sha256"] not in hashes:
hashes.append(file["sha256"])
# @staticmethod
# def check_virustotal(packages: list) -> None:
# hashes = []
# for package in packages:
# for file in package.get("files", []):
# if file["sha256"] not in hashes:
# hashes.append(file["sha256"])
total_hashes = len(hashes)
detections = {}
# total_hashes = len(hashes)
# detections = {}
progress_desc = f"Looking up {total_hashes} files..."
for i in track(range(total_hashes), description=progress_desc):
try:
results = virustotal_lookup(hashes[i])
except VTNoKey:
return
except VTQuotaExceeded as exc:
print("Unable to continue: %s", exc)
break
# progress_desc = f"Looking up {total_hashes} files..."
# for i in track(range(total_hashes), description=progress_desc):
# try:
# results = virustotal_lookup(hashes[i])
# except VTNoKey:
# return
# except VTQuotaExceeded as exc:
# print("Unable to continue: %s", exc)
# break
if not results:
continue
# if not results:
# continue
positives = results["attributes"]["last_analysis_stats"]["malicious"]
total = len(results["attributes"]["last_analysis_results"])
# positives = results["attributes"]["last_analysis_stats"]["malicious"]
# total = len(results["attributes"]["last_analysis_results"])
detections[hashes[i]] = f"{positives}/{total}"
# detections[hashes[i]] = f"{positives}/{total}"
table = Table(title="VirusTotal Packages Detections")
table.add_column("Package name")
table.add_column("File path")
table.add_column("Detections")
# table = Table(title="VirusTotal Packages Detections")
# table.add_column("Package name")
# table.add_column("File path")
# table.add_column("Detections")
for package in packages:
for file in package.get("files", []):
row = [package["package_name"], file["path"]]
# for package in packages:
# for file in package.get("files", []):
# row = [package["package_name"], file["path"]]
if file["sha256"] in detections:
detection = detections[file["sha256"]]
positives = detection.split("/")[0]
if int(positives) > 0:
row.append(Text(detection, "red bold"))
else:
row.append(detection)
else:
row.append("not found")
# if file["sha256"] in detections:
# detection = detections[file["sha256"]]
# positives = detection.split("/")[0]
# if int(positives) > 0:
# row.append(Text(detection, "red bold"))
# else:
# row.append(detection)
# else:
# row.append("not found")
table.add_row(*row)
# table.add_row(*row)
console = Console()
console.print(table)
# console = Console()
# console.print(table)
@staticmethod
def parse_package_for_details(output: str) -> dict:
+2 -1
View File
@@ -9,6 +9,7 @@ from typing import Optional
from mvt.android.artifacts.processes import Processes as ProcessesArtifact
from .base import AndroidExtraction
from mvt.common.module_types import ModuleResults
class Processes(ProcessesArtifact, AndroidExtraction):
@@ -21,7 +22,7 @@ class Processes(ProcessesArtifact, AndroidExtraction):
results_path: Optional[str] = None,
module_options: Optional[dict] = None,
log: logging.Logger = logging.getLogger(__name__),
results: Optional[list] = None,
results: ModuleResults = [],
) -> None:
super().__init__(
file_path=file_path,
+2 -1
View File
@@ -7,6 +7,7 @@ import logging
from typing import Optional
from .base import AndroidExtraction
from mvt.common.module_types import ModuleResults
class RootBinaries(AndroidExtraction):
@@ -19,7 +20,7 @@ class RootBinaries(AndroidExtraction):
results_path: Optional[str] = None,
module_options: Optional[dict] = None,
log: logging.Logger = logging.getLogger(__name__),
results: Optional[list] = None,
results: ModuleResults = [],
) -> None:
super().__init__(
file_path=file_path,
@@ -7,6 +7,7 @@ import logging
from typing import Optional
from .base import AndroidExtraction
from mvt.common.module_types import ModuleResults
class SELinuxStatus(AndroidExtraction):
@@ -21,7 +22,7 @@ class SELinuxStatus(AndroidExtraction):
results_path: Optional[str] = None,
module_options: Optional[dict] = None,
log: logging.Logger = logging.getLogger(__name__),
results: Optional[list] = None,
results: ModuleResults = [],
) -> None:
super().__init__(
file_path=file_path,
+2 -1
View File
@@ -7,6 +7,7 @@ import logging
from typing import Optional
from mvt.android.artifacts.settings import Settings as SettingsArtifact
from mvt.common.module_types import ModuleResults
from .base import AndroidExtraction
@@ -21,7 +22,7 @@ class Settings(SettingsArtifact, AndroidExtraction):
results_path: Optional[str] = None,
module_options: Optional[dict] = None,
log: logging.Logger = logging.getLogger(__name__),
results: Optional[list] = None,
results: ModuleResults = [],
) -> None:
super().__init__(
file_path=file_path,
+14 -6
View File
@@ -6,11 +6,16 @@
import logging
import os
import sqlite3
from typing import Optional, Union
from typing import Optional
from mvt.android.parsers.backup import AndroidBackupParsingError, parse_tar_for_sms
from mvt.common.module import InsufficientPrivileges
from mvt.common.utils import check_for_links, convert_unix_to_iso
from mvt.common.module_types import (
ModuleAtomicResult,
ModuleResults,
ModuleSerializedResult,
)
from .base import AndroidExtraction
@@ -51,7 +56,7 @@ class SMS(AndroidExtraction):
results_path: Optional[str] = None,
module_options: Optional[dict] = None,
log: logging.Logger = logging.getLogger(__name__),
results: Optional[list] = None,
results: ModuleResults = [],
) -> None:
super().__init__(
file_path=file_path,
@@ -64,7 +69,7 @@ class SMS(AndroidExtraction):
self.sms_db_type = 0
def serialize(self, record: dict) -> Union[dict, list]:
def serialize(self, record: ModuleAtomicResult) -> ModuleSerializedResult:
body = record["body"].replace("\n", "\\n")
return {
"timestamp": record["isodate"],
@@ -85,9 +90,12 @@ class SMS(AndroidExtraction):
if message_links == []:
message_links = check_for_links(message["body"])
if self.indicators.check_urls(message_links):
self.detected.append(message)
continue
ioc_match = self.indicators.check_urls(message_links)
if ioc_match:
message["matched_indicator"] = ioc_match.ioc
self.alertstore.critical(
self.get_slug(), ioc_match.message, "", message
)
def _parse_db(self, db_path: str) -> None:
"""Parse an Android bugle_db SMS database file.
+8 -3
View File
@@ -7,11 +7,16 @@ import base64
import logging
import os
import sqlite3
from typing import Optional, Union
from typing import Optional
from mvt.common.utils import check_for_links, convert_unix_to_iso
from .base import AndroidExtraction
from mvt.common.module_types import (
ModuleAtomicResult,
ModuleSerializedResult,
ModuleResults,
)
WHATSAPP_PATH = "data/data/com.whatsapp/databases/msgstore.db"
@@ -26,7 +31,7 @@ class Whatsapp(AndroidExtraction):
results_path: Optional[str] = None,
module_options: Optional[dict] = None,
log: logging.Logger = logging.getLogger(__name__),
results: Optional[list] = None,
results: ModuleResults = [],
) -> None:
super().__init__(
file_path=file_path,
@@ -37,7 +42,7 @@ class Whatsapp(AndroidExtraction):
results=results,
)
def serialize(self, record: dict) -> Union[dict, list]:
def serialize(self, record: ModuleAtomicResult) -> ModuleSerializedResult:
text = record["data"].replace("\n", "\\n")
return {
"timestamp": record["isodate"],
+21 -18
View File
@@ -10,10 +10,15 @@ import logging
try:
import zoneinfo
except ImportError:
from backports import zoneinfo
from typing import Optional, Union
from backports import zoneinfo # type: ignore
from typing import Optional
from mvt.android.modules.androidqf.base import AndroidQFModule
from mvt.common.module_types import (
ModuleResults,
ModuleAtomicResult,
ModuleSerializedResult,
)
from mvt.common.utils import convert_datetime_to_iso
SUSPICIOUS_PATHS = [
@@ -36,7 +41,7 @@ class AQFFiles(AndroidQFModule):
results_path: Optional[str] = None,
module_options: Optional[dict] = None,
log: logging.Logger = logging.getLogger(__name__),
results: Optional[list] = None,
results: ModuleResults = [],
) -> None:
super().__init__(
file_path=file_path,
@@ -47,7 +52,7 @@ class AQFFiles(AndroidQFModule):
results=results,
)
def serialize(self, record: dict) -> Union[dict, list]:
def serialize(self, record: ModuleAtomicResult) -> ModuleSerializedResult:
records = []
for ts in set(
@@ -82,10 +87,11 @@ class AQFFiles(AndroidQFModule):
return
for result in self.results:
ioc = self.indicators.check_file_path(result["path"])
if ioc:
result["matched_indicator"] = ioc
self.detected.append(result)
ioc_match = self.indicators.check_file_path(result["path"])
if ioc_match:
result["matched_indicator"] = ioc_match.ioc
self.alertstore.critical(self.get_slug(), ioc_match.message, "", result)
self.alertstore.log_latest()
continue
# NOTE: Update with final path used for Android collector.
@@ -98,20 +104,17 @@ class AQFFiles(AndroidQFModule):
if self.file_is_executable(result["mode"]):
file_type = "executable "
self.log.warning(
'Found %sfile at suspicious path "%s".',
file_type,
result["path"],
)
self.detected.append(result)
msg = f'Found {file_type}file at suspicious path "{result["path"]}"'
self.alertstore.high(self.get_slug(), msg, "", result)
self.alertstore.log_latest()
if result.get("sha256", "") == "":
continue
ioc = self.indicators.check_file_hash(result["sha256"])
if ioc:
result["matched_indicator"] = ioc
self.detected.append(result)
ioc_match = self.indicators.check_file_hash(result["sha256"])
if ioc_match:
result["matched_indicator"] = ioc_match.ioc
self.alertstore.critical(self.get_slug(), ioc_match.message, "", result)
# TODO: adds SHA1 and MD5 when available in MVT
@@ -9,6 +9,7 @@ from typing import Optional
from mvt.android.artifacts.getprop import GetProp as GetPropArtifact
from .base import AndroidQFModule
from mvt.common.module_types import ModuleResults
class AQFGetProp(GetPropArtifact, AndroidQFModule):
@@ -21,7 +22,7 @@ class AQFGetProp(GetPropArtifact, AndroidQFModule):
results_path: Optional[str] = None,
module_options: Optional[dict] = None,
log: logging.Logger = logging.getLogger(__name__),
results: Optional[list] = None,
results: ModuleResults = [],
) -> None:
super().__init__(
file_path=file_path,
@@ -9,6 +9,7 @@ import logging
from typing import Optional
from mvt.common.utils import convert_datetime_to_iso
from mvt.common.module_types import ModuleResults
from .base import AndroidQFModule
from mvt.android.artifacts.file_timestamps import FileTimestampsArtifact
@@ -25,7 +26,7 @@ class AQFLogTimestamps(FileTimestampsArtifact, AndroidQFModule):
results_path: Optional[str] = None,
module_options: Optional[dict] = None,
log: logging.Logger = logging.getLogger(__name__),
results: Optional[list] = None,
results: ModuleResults = [],
) -> None:
super().__init__(
file_path=file_path,
@@ -17,6 +17,7 @@ from mvt.android.utils import (
)
from .base import AndroidQFModule
from mvt.common.module_types import ModuleResults
class AQFPackages(AndroidQFModule):
@@ -29,7 +30,7 @@ class AQFPackages(AndroidQFModule):
results_path: Optional[str] = None,
module_options: Optional[dict] = None,
log: logging.Logger = logging.getLogger(__name__),
results: Optional[list] = None,
results: ModuleResults = [],
) -> None:
super().__init__(
file_path=file_path,
@@ -43,79 +44,98 @@ class AQFPackages(AndroidQFModule):
def check_indicators(self) -> None:
for result in self.results:
if result["name"] in ROOT_PACKAGES:
self.log.warning(
'Found an installed package related to rooting/jailbreaking: "%s"',
result["name"],
self.alertstore.medium(
self.get_slug(),
f'Found an installed package related to rooting/jailbreaking: "{result["name"]}"',
"",
result,
)
self.detected.append(result)
self.alertstore.log_latest()
continue
# Detections for apps installed via unusual methods
# Detections for apps installed via unusual methods.
if result["installer"] in THIRD_PARTY_STORE_INSTALLERS:
self.log.warning(
'Found a package installed via a third party store (installer="%s"): "%s"',
result["installer"],
result["name"],
self.alertstore.info(
self.get_slug(),
f'Found a package installed via a third party store (installer="{result["installer"]}"): "{result["name"]}"',
"",
result,
)
self.alertstore.log_latest()
elif result["installer"] in BROWSER_INSTALLERS:
self.log.warning(
'Found a package installed via a browser (installer="%s"): "%s"',
result["installer"],
result["name"],
self.alertstore.medium(
self.get_slug(),
f'Found a package installed via a browser (installer="{result["installer"]}"): "{result["name"]}"',
"",
result,
)
self.detected.append(result)
self.alertstore.log_latest()
elif result["installer"] == "null" and result["system"] is False:
self.log.warning(
'Found a non-system package installed via adb or another method: "%s"',
result["name"],
self.alertstore.high(
self.get_slug(),
f'Found a non-system package installed via adb or another method: "{result["name"]}"',
"",
result,
)
self.detected.append(result)
self.alertstore.log_latest()
elif result["installer"] in PLAY_STORE_INSTALLERS:
pass
# Check for disabled security or software update packages
# Check for disabled security or software update packages.
package_disabled = result.get("disabled", None)
if result["name"] in SECURITY_PACKAGES and package_disabled:
self.log.warning(
'Security package "%s" disabled on the phone', result["name"]
self.alertstore.high(
self.get_slug(),
f'Security package "{result["name"]}" disabled on the phone',
"",
result,
)
self.alertstore.log_latest()
if result["name"] in SYSTEM_UPDATE_PACKAGES and package_disabled:
self.log.warning(
'System OTA update package "%s" disabled on the phone',
result["name"],
self.alertstore.high(
self.get_slug(),
f'System OTA update package "{result["name"]}" disabled on the phone',
"",
result,
)
self.alertstore.log_latest()
if not self.indicators:
continue
ioc = self.indicators.check_app_id(result.get("name"))
if ioc:
result["matched_indicator"] = ioc
self.detected.append(result)
ioc_match = self.indicators.check_app_id(result.get("name"))
if ioc_match:
result["matched_indicator"] = ioc_match.ioc
self.alertstore.critical(self.get_slug(), ioc_match.message, "", result)
self.alertstore.log_latest()
for package_file in result.get("files", []):
ioc = self.indicators.check_file_hash(package_file["sha256"])
if ioc:
result["matched_indicator"] = ioc
self.detected.append(result)
ioc_match = self.indicators.check_file_hash(package_file["sha256"])
if ioc_match:
result["matched_indicator"] = ioc_match.ioc
self.alertstore.critical(
self.get_slug(), ioc_match.message, "", result
)
self.alertstore.log_latest()
if "certificate" not in package_file:
continue
# The keys generated by AndroidQF have a leading uppercase character
# The keys generated by AndroidQF have a leading uppercase character.
for hash_type in ["Md5", "Sha1", "Sha256"]:
certificate_hash = package_file["certificate"][hash_type]
ioc = self.indicators.check_app_certificate_hash(certificate_hash)
if ioc:
result["matched_indicator"] = ioc
self.detected.append(result)
ioc_match = self.indicators.check_app_certificate_hash(
certificate_hash
)
if ioc_match:
result["matched_indicator"] = ioc_match.ioc
self.alertstore.critical(
self.get_slug(), ioc_match.message, "", result
)
self.alertstore.log_latest()
break
# Deduplicate the detected packages
dedupe_detected_dict = {str(item): item for item in self.detected}
self.detected = list(dedupe_detected_dict.values())
def run(self) -> None:
packages = self._get_files_by_pattern("*/packages.json")
if not packages:
@@ -9,6 +9,7 @@ from typing import Optional
from mvt.android.artifacts.processes import Processes as ProcessesArtifact
from .base import AndroidQFModule
from mvt.common.module_types import ModuleResults
class AQFProcesses(ProcessesArtifact, AndroidQFModule):
@@ -21,7 +22,7 @@ class AQFProcesses(ProcessesArtifact, AndroidQFModule):
results_path: Optional[str] = None,
module_options: Optional[dict] = None,
log: logging.Logger = logging.getLogger(__name__),
results: Optional[list] = None,
results: ModuleResults = [],
) -> None:
super().__init__(
file_path=file_path,
@@ -9,6 +9,7 @@ from typing import Optional
from mvt.android.artifacts.settings import Settings as SettingsArtifact
from .base import AndroidQFModule
from mvt.common.module_types import ModuleResults
class AQFSettings(SettingsArtifact, AndroidQFModule):
@@ -21,7 +22,7 @@ class AQFSettings(SettingsArtifact, AndroidQFModule):
results_path: Optional[str] = None,
module_options: Optional[dict] = None,
log: logging.Logger = logging.getLogger(__name__),
results: Optional[list] = None,
results: ModuleResults = [],
) -> None:
super().__init__(
file_path=file_path,
+3 -2
View File
@@ -7,9 +7,10 @@ import fnmatch
import logging
import os
import zipfile
from typing import Any, Dict, List, Optional, Union
from typing import List, Optional
from mvt.common.module import MVTModule
from mvt.common.module_types import ModuleResults
class AndroidQFModule(MVTModule):
@@ -22,7 +23,7 @@ class AndroidQFModule(MVTModule):
results_path: Optional[str] = None,
module_options: Optional[dict] = None,
log: logging.Logger = logging.getLogger(__name__),
results: Union[List[Dict[str, Any]], Dict[str, Any], None] = None,
results: ModuleResults = [],
) -> None:
super().__init__(
file_path=file_path,
+6 -2
View File
@@ -53,8 +53,12 @@ class SMS(AndroidQFModule):
if "body" not in message:
continue
if self.indicators.check_domains(message.get("links", [])):
self.detected.append(message)
ioc_match = self.indicators.check_domains(message.get("links", []))
if ioc_match:
message["matched_indicator"] = ioc_match.ioc
self.alertstore.critical(
self.get_slug(), ioc_match.message, "", message
)
def parse_backup(self, data):
header = parse_ab_header(data)
+3 -3
View File
@@ -9,10 +9,10 @@ import os
from tarfile import TarFile
from typing import List, Optional
from mvt.common.module import MVTModule
from mvt.common.module import MVTModule, ModuleResults
class BackupExtraction(MVTModule):
class BackupModule(MVTModule):
"""This class provides a base for all backup extractios modules"""
def __init__(
@@ -22,7 +22,7 @@ class BackupExtraction(MVTModule):
results_path: Optional[str] = None,
module_options: Optional[dict] = None,
log: logging.Logger = logging.getLogger(__name__),
results: Optional[list] = None,
results: ModuleResults = [],
) -> None:
super().__init__(
file_path=file_path,
+10 -5
View File
@@ -6,12 +6,13 @@
import logging
from typing import Optional
from mvt.android.modules.backup.base import BackupExtraction
from mvt.android.modules.backup.base import BackupModule
from mvt.android.parsers.backup import parse_sms_file
from mvt.common.utils import check_for_links
from mvt.common.module_types import ModuleResults
class SMS(BackupExtraction):
class SMS(BackupModule):
def __init__(
self,
file_path: Optional[str] = None,
@@ -19,7 +20,7 @@ class SMS(BackupExtraction):
results_path: Optional[str] = None,
module_options: Optional[dict] = None,
log: logging.Logger = logging.getLogger(__name__),
results: Optional[list] = None,
results: ModuleResults = [],
) -> None:
super().__init__(
file_path=file_path,
@@ -43,8 +44,12 @@ class SMS(BackupExtraction):
if message_links == []:
message_links = check_for_links(message.get("text", ""))
if self.indicators.check_urls(message_links):
self.detected.append(message)
ioc_match = self.indicators.check_urls(message_links)
if ioc_match:
message["matched_indicator"] = ioc_match.ioc
self.alertstore.critical(
self.get_slug(), ioc_match.message, "", message
)
continue
def run(self) -> None:
+2 -2
View File
@@ -10,7 +10,7 @@ import os
from typing import List, Optional
from zipfile import ZipFile
from mvt.common.module import MVTModule
from mvt.common.module import MVTModule, ModuleResults
class BugReportModule(MVTModule):
@@ -23,7 +23,7 @@ class BugReportModule(MVTModule):
results_path: Optional[str] = None,
module_options: Optional[dict] = None,
log: logging.Logger = logging.getLogger(__name__),
results: Optional[list] = None,
results: ModuleResults = [],
) -> None:
super().__init__(
file_path=file_path,
@@ -7,6 +7,7 @@ import logging
from typing import Optional
from mvt.android.artifacts.dumpsys_accessibility import DumpsysAccessibilityArtifact
from mvt.common.module_types import ModuleResults
from .base import BugReportModule
@@ -21,7 +22,7 @@ class DumpsysAccessibility(DumpsysAccessibilityArtifact, BugReportModule):
results_path: Optional[str] = None,
module_options: Optional[dict] = None,
log: logging.Logger = logging.getLogger(__name__),
results: Optional[list] = None,
results: ModuleResults = [],
) -> None:
super().__init__(
file_path=file_path,
@@ -9,6 +9,7 @@ from typing import Optional
from mvt.android.artifacts.dumpsys_package_activities import (
DumpsysPackageActivitiesArtifact,
)
from mvt.common.module_types import ModuleResults
from .base import BugReportModule
@@ -23,7 +24,7 @@ class DumpsysActivities(DumpsysPackageActivitiesArtifact, BugReportModule):
results_path: Optional[str] = None,
module_options: Optional[dict] = None,
log: logging.Logger = logging.getLogger(__name__),
results: Optional[list] = None,
results: ModuleResults = [],
) -> None:
super().__init__(
file_path=file_path,
@@ -7,6 +7,7 @@ import logging
from typing import Optional
from mvt.android.artifacts.dumpsys_adb import DumpsysADBArtifact
from mvt.common.module_types import ModuleResults
from .base import BugReportModule
@@ -21,7 +22,7 @@ class DumpsysADBState(DumpsysADBArtifact, BugReportModule):
results_path: Optional[str] = None,
module_options: Optional[dict] = None,
log: logging.Logger = logging.getLogger(__name__),
results: Optional[list] = None,
results: ModuleResults = [],
) -> None:
super().__init__(
file_path=file_path,
@@ -7,6 +7,7 @@ import logging
from typing import Optional
from mvt.android.artifacts.dumpsys_appops import DumpsysAppopsArtifact
from mvt.common.module_types import ModuleResults
from .base import BugReportModule
@@ -21,7 +22,7 @@ class DumpsysAppops(DumpsysAppopsArtifact, BugReportModule):
results_path: Optional[str] = None,
module_options: Optional[dict] = None,
log: logging.Logger = logging.getLogger(__name__),
results: Optional[list] = None,
results: ModuleResults = [],
) -> None:
super().__init__(
file_path=file_path,
@@ -7,6 +7,7 @@ import logging
from typing import Optional
from mvt.android.artifacts.dumpsys_battery_daily import DumpsysBatteryDailyArtifact
from mvt.common.module_types import ModuleResults
from .base import BugReportModule
@@ -21,7 +22,7 @@ class DumpsysBatteryDaily(DumpsysBatteryDailyArtifact, BugReportModule):
results_path: Optional[str] = None,
module_options: Optional[dict] = None,
log: logging.Logger = logging.getLogger(__name__),
results: Optional[list] = None,
results: ModuleResults = [],
) -> None:
super().__init__(
file_path=file_path,
@@ -7,6 +7,7 @@ import logging
from typing import Optional
from mvt.android.artifacts.dumpsys_battery_history import DumpsysBatteryHistoryArtifact
from mvt.common.module_types import ModuleResults
from .base import BugReportModule
@@ -21,7 +22,7 @@ class DumpsysBatteryHistory(DumpsysBatteryHistoryArtifact, BugReportModule):
results_path: Optional[str] = None,
module_options: Optional[dict] = None,
log: logging.Logger = logging.getLogger(__name__),
results: Optional[list] = None,
results: ModuleResults = [],
) -> None:
super().__init__(
file_path=file_path,
@@ -7,6 +7,7 @@ import logging
from typing import Optional
from mvt.android.artifacts.dumpsys_dbinfo import DumpsysDBInfoArtifact
from mvt.common.module_types import ModuleResults
from .base import BugReportModule
@@ -23,7 +24,7 @@ class DumpsysDBInfo(DumpsysDBInfoArtifact, BugReportModule):
results_path: Optional[str] = None,
module_options: Optional[dict] = None,
log: logging.Logger = logging.getLogger(__name__),
results: Optional[list] = None,
results: ModuleResults = [],
) -> None:
super().__init__(
file_path=file_path,
@@ -7,6 +7,7 @@ import logging
from typing import Optional
from mvt.android.artifacts.getprop import GetProp as GetPropArtifact
from mvt.common.module_types import ModuleResults
from .base import BugReportModule
@@ -21,7 +22,7 @@ class DumpsysGetProp(GetPropArtifact, BugReportModule):
results_path: Optional[str] = None,
module_options: Optional[dict] = None,
log: logging.Logger = logging.getLogger(__name__),
results: Optional[list] = None,
results: ModuleResults = [],
) -> None:
super().__init__(
file_path=file_path,
@@ -6,6 +6,7 @@
import logging
from typing import Optional
from mvt.common.module_types import ModuleResults
from mvt.android.artifacts.dumpsys_packages import DumpsysPackagesArtifact
from mvt.android.utils import DANGEROUS_PERMISSIONS, DANGEROUS_PERMISSIONS_THRESHOLD
@@ -22,7 +23,7 @@ class DumpsysPackages(DumpsysPackagesArtifact, BugReportModule):
results_path: Optional[str] = None,
module_options: Optional[dict] = None,
log: logging.Logger = logging.getLogger(__name__),
results: Optional[list] = None,
results: ModuleResults = [],
) -> None:
super().__init__(
file_path=file_path,
@@ -9,6 +9,7 @@ from typing import Optional
from mvt.android.artifacts.dumpsys_platform_compat import DumpsysPlatformCompatArtifact
from mvt.android.modules.bugreport.base import BugReportModule
from mvt.common.module_types import ModuleResults
class DumpsysPlatformCompat(DumpsysPlatformCompatArtifact, BugReportModule):
@@ -21,7 +22,7 @@ class DumpsysPlatformCompat(DumpsysPlatformCompatArtifact, BugReportModule):
results_path: Optional[str] = None,
module_options: Optional[dict] = None,
log: logging.Logger = logging.getLogger(__name__),
results: Optional[list] = None,
results: ModuleResults = [],
) -> None:
super().__init__(
file_path=file_path,
@@ -7,6 +7,7 @@ import logging
from typing import Optional
from mvt.android.artifacts.dumpsys_receivers import DumpsysReceiversArtifact
from mvt.common.module_types import ModuleResults
from .base import BugReportModule
@@ -21,7 +22,7 @@ class DumpsysReceivers(DumpsysReceiversArtifact, BugReportModule):
results_path: Optional[str] = None,
module_options: Optional[dict] = None,
log: logging.Logger = logging.getLogger(__name__),
results: Optional[list] = None,
results: ModuleResults = [],
) -> None:
super().__init__(
file_path=file_path,
@@ -8,6 +8,7 @@ from typing import Optional
from mvt.common.utils import convert_datetime_to_iso
from .base import BugReportModule
from mvt.common.module_types import ModuleResults
from mvt.android.artifacts.file_timestamps import FileTimestampsArtifact
@@ -23,7 +24,7 @@ class BugReportTimestamps(FileTimestampsArtifact, BugReportModule):
results_path: Optional[str] = None,
module_options: Optional[dict] = None,
log: logging.Logger = logging.getLogger(__name__),
results: Optional[list] = None,
results: ModuleResults = [],
) -> None:
super().__init__(
file_path=file_path,
@@ -7,6 +7,7 @@ import logging
from typing import Optional
from mvt.android.artifacts.tombstone_crashes import TombstoneCrashArtifact
from mvt.common.module_types import ModuleResults
from .base import BugReportModule
@@ -22,7 +23,7 @@ class Tombstones(TombstoneCrashArtifact, BugReportModule):
results_path: Optional[str] = None,
module_options: Optional[dict] = None,
log: logging.Logger = logging.getLogger(__name__),
results: Optional[list] = None,
results: ModuleResults = [],
) -> None:
super().__init__(
file_path=file_path,
+5 -5
View File
@@ -6,16 +6,16 @@ from datetime import datetime, timedelta
from typing import List
def warn_android_patch_level(patch_level: str, log) -> bool:
def warn_android_patch_level(patch_level: str, log) -> str:
"""Alert if Android patch level out-of-date"""
patch_date = datetime.strptime(patch_level, "%Y-%m-%d")
if (datetime.now() - patch_date) > timedelta(days=6 * 31):
log.warning(
"This phone has not received security updates "
"for more than six months (last update: %s)",
warning_message = (
f"This phone has not received security updates "
f"for more than six months (last update: {patch_level}).",
patch_level,
)
return True
return warning_message
return False
+5 -21
View File
@@ -2,27 +2,11 @@
# Copyright (c) 2021-2023 The MVT Authors.
# Use of this software is governed by the MVT License 1.1 that can be found at
# https://license.mvt.re/1.1/
from .module import MVTModule
class Artifact:
class Artifact(MVTModule):
"""Base class for artifacts.
XXX: Inheriting from MVTModule to have the same signature as other modules. Not sure if this is a good idea.
"""
Main artifact class
"""
def __init__(self, *args, **kwargs):
self.results = []
self.detected = []
self.indicators = None
super().__init__(*args, **kwargs)
def parse(self, entry: str):
"""
Parse the artifact, adds the parsed information to self.results
"""
raise NotImplementedError
def check_indicators(self) -> None:
"""Check the results of this module against a provided list of
indicators coming from self.indicators
"""
raise NotImplementedError
+1 -1
View File
@@ -78,7 +78,7 @@ class CmdCheckIOCS(Command):
except NotImplementedError:
continue
else:
total_detections += len(m.detected)
total_detections += len(m.alertstore.alerts)
if total_detections > 0:
log.warning(
-2
View File
@@ -58,11 +58,9 @@ class Command:
# This list will contain all executed modules.
# We can use this to reference e.g. self.executed[0].results.
self.executed = []
self.detected_count = 0
self.hashes = hashes
self.hash_values = []
self.timeline = []
self.timeline_detected = []
# Load IOCs
self._create_storage()
+3 -3
View File
@@ -590,9 +590,9 @@ class Indicators:
if not file_path:
return None
ioc = self.check_file_name(os.path.basename(file_path))
if ioc:
return ioc
ioc_match = self.check_file_name(os.path.basename(file_path))
if ioc_match:
return ioc_match
for ioc in self.get_iocs("file_paths"):
# Strip any trailing slash from indicator paths to match
+13 -12
View File
@@ -76,7 +76,6 @@ class MVTModule:
self.alertstore: AlertStore = AlertStore(log=log)
self.results: ModuleResults = results if results else []
self.detected: ModuleResults = []
self.timeline: ModuleTimeline = []
self.timeline_detected: ModuleTimeline = []
@@ -126,11 +125,13 @@ class MVTModule:
exc,
)
if self.detected:
if self.alertstore.alerts:
detected_file_name = f"{name}_detected.json"
detected_json_path = os.path.join(self.results_path, detected_file_name)
with open(detected_json_path, "w", encoding="utf-8") as handle:
json.dump(self.detected, handle, indent=4, cls=CustomJSONEncoder)
json.dump(
self.alertstore.alerts, handle, indent=4, cls=CustomJSONEncoder
)
def serialize(self, result: ModuleAtomicResult) -> ModuleSerializedResult:
raise NotImplementedError
@@ -165,17 +166,17 @@ class MVTModule:
else:
self.timeline.append(record)
for detected in self.detected:
record = self.serialize(detected)
if record:
if isinstance(record, list):
self.timeline_detected.extend(record)
else:
self.timeline_detected.append(record)
# for detected in self.alertstore.alerts:
# record = self.serialize(detected)
# if record:
# if isinstance(record, list):
# self.timeline_detected.extend(record)
# else:
# self.timeline_detected.append(record)
# De-duplicate timeline entries.
self.timeline = self._deduplicate_timeline(self.timeline)
self.timeline_detected = self._deduplicate_timeline(self.timeline_detected)
# self.timeline_detected = self._deduplicate_timeline(self.timeline_detected)
def run(self) -> None:
"""Run the main module procedure."""
@@ -230,7 +231,7 @@ def run_module(module: MVTModule) -> None:
)
else:
if module.indicators and not module.detected:
if module.indicators and not module.alertstore.alerts:
module.log.info(
"The %s module produced no detections!", module.__class__.__name__
)
+2 -1
View File
@@ -9,6 +9,7 @@ import plistlib
from typing import Optional
from mvt.common.module import DatabaseNotFoundError
from mvt.common.module_types import ModuleResults
from mvt.ios.versions import get_device_desc_from_id, is_ios_version_outdated
from ..base import IOSExtraction
@@ -24,7 +25,7 @@ class BackupInfo(IOSExtraction):
results_path: Optional[str] = None,
module_options: Optional[dict] = None,
log: logging.Logger = logging.getLogger(__name__),
results: Optional[list] = None,
results: ModuleResults = [],
) -> None:
super().__init__(
file_path=file_path,
@@ -7,9 +7,14 @@ import logging
import os
import plistlib
from base64 import b64encode
from typing import Optional, Union
from typing import Optional
from mvt.common.utils import convert_datetime_to_iso
from mvt.common.module_types import (
ModuleAtomicResult,
ModuleSerializedResult,
ModuleResults,
)
from ..base import IOSExtraction
@@ -28,7 +33,7 @@ class ConfigurationProfiles(IOSExtraction):
results_path: Optional[str] = None,
module_options: Optional[dict] = None,
log: logging.Logger = logging.getLogger(__name__),
results: Optional[list] = None,
results: ModuleResults = [],
) -> None:
super().__init__(
file_path=file_path,
@@ -39,7 +44,7 @@ class ConfigurationProfiles(IOSExtraction):
results=results,
)
def serialize(self, record: dict) -> Union[dict, list]:
def serialize(self, record: ModuleAtomicResult) -> ModuleSerializedResult:
if not record["install_date"]:
return {}
@@ -63,28 +68,28 @@ class ConfigurationProfiles(IOSExtraction):
# Alert on any known malicious configuration profiles in the
# indicator list.
ioc = self.indicators.check_profile(result["plist"]["PayloadUUID"])
if ioc:
self.log.warning(
"Found a known malicious configuration "
'profile "%s" with UUID %s',
result["plist"]["PayloadDisplayName"],
result["plist"]["PayloadUUID"],
ioc_match = self.indicators.check_profile(
result["plist"]["PayloadUUID"]
)
if ioc_match:
warning_message = (
f'Found a known malicious configuration profile "{result["plist"]["PayloadDisplayName"]}" with UUID "{result["plist"]["PayloadUUID"]}"',
)
result["matched_indicator"] = ioc
self.detected.append(result)
result["matched_indicator"] = ioc_match.ioc
self.alertstore.critical(
self.get_slug(), warning_message, "", result
)
self.alertstore.log_latest()
continue
# Highlight suspicious configuration profiles which may be used
# to hide notifications.
if payload_content["PayloadType"] in ["com.apple.notificationsettings"]:
self.log.warning(
"Found a potentially suspicious configuration profile "
'"%s" with payload type %s',
result["plist"]["PayloadDisplayName"],
payload_content["PayloadType"],
warning_message = (
f'Found a potentially suspicious configuration profile "{result["plist"]["PayloadDisplayName"]}" with payload type {payload_content["PayloadType"]}',
)
self.detected.append(result)
self.alertstore.medum(self.get_slug(), warning_message, "", result)
self.alertstore.log_latest()
continue
def run(self) -> None:
+19 -12
View File
@@ -13,6 +13,11 @@ from typing import Optional
from mvt.common.module import DatabaseNotFoundError
from mvt.common.url import URL
from mvt.common.utils import convert_datetime_to_iso, convert_unix_to_iso
from mvt.common.module_types import (
ModuleResults,
ModuleAtomicResult,
ModuleSerializedResult,
)
from ..base import IOSExtraction
@@ -27,7 +32,7 @@ class Manifest(IOSExtraction):
results_path: Optional[str] = None,
module_options: Optional[dict] = None,
log: logging.Logger = logging.getLogger(__name__),
results: Optional[list] = None,
results: ModuleResults = [],
) -> None:
super().__init__(
file_path=file_path,
@@ -60,7 +65,7 @@ class Manifest(IOSExtraction):
return convert_unix_to_iso(timestamp_or_unix_time_int)
def serialize(self, record: dict) -> []:
def serialize(self, record: ModuleAtomicResult) -> ModuleSerializedResult:
records = []
if "modified" not in record or "status_changed" not in record:
return records
@@ -95,8 +100,10 @@ class Manifest(IOSExtraction):
if not self.indicators:
continue
if self.indicators.check_file_path("/" + result["relative_path"]):
self.detected.append(result)
ioc_match = self.indicators.check_file_path("/" + result["relative_path"])
if ioc_match:
result["matched_indicator"] = ioc_match.ioc
self.alertstore.high(self.get_slug(), ioc_match.message, "", result)
continue
rel_path = result["relative_path"].lower()
@@ -107,15 +114,15 @@ class Manifest(IOSExtraction):
except Exception:
continue
ioc = self.indicators.check_url(part)
if ioc:
self.log.warning(
'Found mention of domain "%s" in a backup file with path: %s',
ioc["value"],
rel_path,
ioc_match = self.indicators.check_url(part)
if ioc_match:
result["matched_indicator"] = ioc_match.ioc
self.alertstore.high(
self.get_slug(),
f'Found mention of domain "{ioc_match.ioc.value}" in a backup file with path: {rel_path}',
"",
result,
)
result["matched_indicator"] = ioc
self.detected.append(result)
def run(self) -> None:
manifest_db_path = os.path.join(self.target_path, "Manifest.db")
+23 -20
View File
@@ -5,9 +5,14 @@
import logging
import plistlib
from typing import Optional, Union
from typing import Optional
from mvt.common.utils import convert_datetime_to_iso
from mvt.common.module_types import (
ModuleAtomicResult,
ModuleResults,
ModuleSerializedResult,
)
from ..base import IOSExtraction
@@ -29,7 +34,7 @@ class ProfileEvents(IOSExtraction):
results_path: Optional[str] = None,
module_options: Optional[dict] = None,
log: logging.Logger = logging.getLogger(__name__),
results: Optional[list] = None,
results: ModuleResults = [],
) -> None:
super().__init__(
file_path=file_path,
@@ -40,7 +45,7 @@ class ProfileEvents(IOSExtraction):
results=results,
)
def serialize(self, record: dict) -> Union[dict, list]:
def serialize(self, record: ModuleAtomicResult) -> ModuleSerializedResult:
return {
"timestamp": record.get("timestamp"),
"module": self.__class__.__name__,
@@ -51,20 +56,27 @@ class ProfileEvents(IOSExtraction):
}
def check_indicators(self) -> None:
for result in self.results:
message = f'On {result.get("timestamp")} process "{result.get("timestamp")}" started operation "{result.get("operation")}" of profile "{result.get("profile_id")}"'
self.alertstore.low(
self.get_slug(), message, result.get("timestamp"), result
)
self.alertstore.log_latest()
if not self.indicators:
return
for result in self.results:
ioc = self.indicators.check_process(result.get("process"))
if ioc:
result["matched_indicator"] = ioc
self.detected.append(result)
ioc_match = self.indicators.check_process(result.get("process"))
if ioc_match:
result["matched_indicator"] = ioc_match.ioc
self.alertstore.critical(self.get_slug(), ioc_match.message, "", result)
continue
ioc = self.indicators.check_profile(result.get("profile_id"))
if ioc:
result["matched_indicator"] = ioc
self.detected.append(result)
ioc_match = self.indicators.check_profile(result.get("profile_id"))
if ioc_match:
result["matched_indicator"] = ioc_match.ioc
self.alertstore.critical(self.get_slug(), ioc_match.message, "", result)
@staticmethod
def parse_profile_events(file_data: bytes) -> list:
@@ -109,13 +121,4 @@ class ProfileEvents(IOSExtraction):
with open(events_file_path, "rb") as handle:
self.results.extend(self.parse_profile_events(handle.read()))
for result in self.results:
self.log.info(
'On %s process "%s" started operation "%s" of profile "%s"',
result.get("timestamp"),
result.get("process"),
result.get("operation"),
result.get("profile_id"),
)
self.log.info("Extracted %d profile events", len(self.results))
+3 -2
View File
@@ -11,7 +11,8 @@ import sqlite3
import subprocess
from typing import Iterator, Optional, Union
from mvt.common.module import DatabaseCorruptedError, DatabaseNotFoundError, MVTModule
from mvt.common.module import DatabaseCorruptedError, DatabaseNotFoundError
from mvt.common.module import MVTModule, ModuleResults
class IOSExtraction(MVTModule):
@@ -25,7 +26,7 @@ class IOSExtraction(MVTModule):
results_path: Optional[str] = None,
module_options: Optional[dict] = None,
log: logging.Logger = logging.getLogger(__name__),
results: Optional[list] = None,
results: ModuleResults = [],
) -> None:
super().__init__(
file_path=file_path,
+23 -16
View File
@@ -7,9 +7,14 @@ import copy
import logging
import plistlib
import sqlite3
from typing import Optional, Union
from typing import Optional
from mvt.common.utils import convert_mactime_to_iso
from mvt.common.module_types import (
ModuleResults,
ModuleSerializedResult,
ModuleAtomicResult,
)
from ..base import IOSExtraction
@@ -29,7 +34,7 @@ class Analytics(IOSExtraction):
results_path: Optional[str] = None,
module_options: Optional[dict] = None,
log: logging.Logger = logging.getLogger(__name__),
results: Optional[list] = None,
results: ModuleResults = [],
) -> None:
super().__init__(
file_path=file_path,
@@ -40,7 +45,7 @@ class Analytics(IOSExtraction):
results=results,
)
def serialize(self, record: dict) -> Union[dict, list]:
def serialize(self, record: ModuleAtomicResult) -> ModuleSerializedResult:
return {
"timestamp": record["isodate"],
"module": self.__class__.__name__,
@@ -57,24 +62,26 @@ class Analytics(IOSExtraction):
if not isinstance(value, str):
continue
ioc = self.indicators.check_process(value)
if ioc:
self.log.warning(
'Found mention of a malicious process "%s" in %s file at %s',
value,
result["artifact"],
result["isodate"],
ioc_match = self.indicators.check_process(value)
if ioc_match:
warning_message = (
f'Found mention of a malicious process "{value}" in {result["artifact"]} file at {result["isodate"]}',
)
new_result = copy.copy(result)
new_result["matched_indicator"] = ioc
self.detected.append(new_result)
new_result["matched_indicator"] = ioc_match.ioc
self.alertstore.critical(
self.get_slug(), warning_message, "", new_result
)
self.alertstore.log_latest()
continue
ioc = self.indicators.check_url(value)
if ioc:
ioc_match = self.indicators.check_url(value)
if ioc_match:
new_result = copy.copy(result)
new_result["matched_indicator"] = ioc
self.detected.append(new_result)
result["matched_indicator"] = ioc_match.ioc
self.alertstore.critical(
self.get_slug(), ioc_match.message, "", new_result
)
def _extract_analytics_data(self):
artifact = self.file_path.split("/")[-1]
@@ -5,9 +5,14 @@
import logging
from datetime import datetime
from typing import Optional, Union
from typing import Optional
from mvt.ios.versions import find_version_by_build
from mvt.common.module_types import (
ModuleAtomicResult,
ModuleSerializedResult,
ModuleResults,
)
from ..base import IOSExtraction
from .analytics import Analytics
@@ -25,7 +30,7 @@ class AnalyticsIOSVersions(IOSExtraction):
results_path: Optional[str] = None,
module_options: Optional[dict] = None,
log: logging.Logger = logging.getLogger(__name__),
results: Optional[list] = None,
results: ModuleResults = [],
) -> None:
super().__init__(
file_path=file_path,
@@ -36,7 +41,7 @@ class AnalyticsIOSVersions(IOSExtraction):
results=results,
)
def serialize(self, record: dict) -> Union[dict, list]:
def serialize(self, record: ModuleAtomicResult) -> ModuleSerializedResult:
return {
"timestamp": record["isodate"],
"module": self.__class__.__name__,
+16 -10
View File
@@ -6,8 +6,13 @@
import logging
import os
import sqlite3
from typing import Optional, Union
from typing import Optional
from mvt.common.module_types import (
ModuleAtomicResult,
ModuleSerializedResult,
ModuleResults,
)
from ..base import IOSExtraction
@@ -19,7 +24,7 @@ class CacheFiles(IOSExtraction):
results_path: Optional[str] = None,
module_options: Optional[dict] = None,
log: logging.Logger = logging.getLogger(__name__),
results: Optional[list] = None,
results: ModuleResults = [],
) -> None:
super().__init__(
file_path=file_path,
@@ -30,7 +35,7 @@ class CacheFiles(IOSExtraction):
results=results,
)
def serialize(self, record: dict) -> Union[dict, list]:
def serialize(self, record: ModuleAtomicResult) -> ModuleSerializedResult:
records = []
for item in self.results[record]:
records.append(
@@ -48,18 +53,19 @@ class CacheFiles(IOSExtraction):
if not self.indicators:
return
self.detected = {}
self.alertstore.alerts = {}
for key, values in self.results.items():
for value in values:
ioc = self.indicators.check_url(value["url"])
if ioc:
value["matched_indicator"] = ioc
if key not in self.detected:
self.detected[key] = [
ioc_match = self.indicators.check_url(value["url"])
if ioc_match:
value["matched_indicator"] = ioc_match.ioc
# XXX: Finish converting this method
if key not in self.alertstore.alerts:
self.alertstore.alerts[key] = [
value,
]
else:
self.detected[key].append(value)
self.alertstore.alerts[key].append(value)
def _process_cache_file(self, file_path):
self.log.info("Processing cache file at path: %s", file_path)
+16 -11
View File
@@ -5,9 +5,14 @@
import logging
import os
from typing import Optional, Union
from typing import Optional
from mvt.common.utils import convert_unix_to_iso
from mvt.common.module_types import (
ModuleAtomicResult,
ModuleSerializedResult,
ModuleResults,
)
from ..base import IOSExtraction
@@ -24,7 +29,7 @@ class Filesystem(IOSExtraction):
results_path: Optional[str] = None,
module_options: Optional[dict] = None,
log: logging.Logger = logging.getLogger(__name__),
results: Optional[list] = None,
results: ModuleResults = [],
) -> None:
super().__init__(
file_path=file_path,
@@ -35,7 +40,7 @@ class Filesystem(IOSExtraction):
results=results,
)
def serialize(self, record: dict) -> Union[dict, list]:
def serialize(self, record: ModuleAtomicResult) -> ModuleSerializedResult:
return {
"timestamp": record["modified"],
"module": self.__class__.__name__,
@@ -51,19 +56,19 @@ class Filesystem(IOSExtraction):
if "path" not in result:
continue
ioc = self.indicators.check_file_path(result["path"])
if ioc:
result["matched_indicator"] = ioc
self.detected.append(result)
ioc_match = self.indicators.check_file_path(result["path"])
if ioc_match:
self.alertstore.high(self.get_slug(), ioc_match.message, "", result)
self.alertstore.log_latest()
# If we are instructed to run fast, we skip the rest.
if self.module_options.get("fast_mode", None):
continue
ioc = self.indicators.check_file_path_process(result["path"])
if ioc:
result["matched_indicator"] = ioc
self.detected.append(result)
ioc_match = self.indicators.check_file_path_process(result["path"])
if ioc_match:
self.alertstore.high(self.get_slug(), ioc_match.message, "", result)
self.alertstore.log_latest()
def run(self) -> None:
for root, dirs, files in os.walk(self.target_path):
+2 -1
View File
@@ -7,6 +7,7 @@ import logging
import sqlite3
from typing import Optional
from mvt.common.module_types import ModuleResults
from ..net_base import NetBase
NETUSAGE_ROOT_PATHS = [
@@ -29,7 +30,7 @@ class Netusage(NetBase):
results_path: Optional[str] = None,
module_options: Optional[dict] = None,
log: logging.Logger = logging.getLogger(__name__),
results: Optional[list] = None,
results: ModuleResults = [],
) -> None:
super().__init__(
file_path=file_path,
+14 -9
View File
@@ -4,9 +4,14 @@
# https://license.mvt.re/1.1/
import logging
from typing import Optional, Union
from typing import Optional
from mvt.common.utils import convert_mactime_to_iso
from mvt.common.module_types import (
ModuleResults,
ModuleSerializedResult,
ModuleAtomicResult,
)
from ..base import IOSExtraction
@@ -26,7 +31,7 @@ class SafariFavicon(IOSExtraction):
results_path: Optional[str] = None,
module_options: Optional[dict] = None,
log: logging.Logger = logging.getLogger(__name__),
results: Optional[list] = None,
results: ModuleResults = [],
) -> None:
super().__init__(
file_path=file_path,
@@ -37,7 +42,7 @@ class SafariFavicon(IOSExtraction):
results=results,
)
def serialize(self, record: dict) -> Union[dict, list]:
def serialize(self, record: ModuleAtomicResult) -> ModuleSerializedResult:
return {
"timestamp": record["isodate"],
"module": self.__class__.__name__,
@@ -51,13 +56,13 @@ class SafariFavicon(IOSExtraction):
return
for result in self.results:
ioc = self.indicators.check_url(result["url"])
if not ioc:
ioc = self.indicators.check_url(result["icon_url"])
ioc_match = self.indicators.check_url(result["url"])
if not ioc_match:
ioc_match = self.indicators.check_url(result["icon_url"])
if ioc:
result["matched_indicator"] = ioc
self.detected.append(result)
if ioc_match:
self.alertstore.critical(self.get_slug(), ioc_match.message, "", result)
self.alertstore.log_latest()
def _process_favicon_db(self, file_path):
conn = self._open_sqlite_db(file_path)
+20 -14
View File
@@ -4,9 +4,14 @@
# https://license.mvt.re/1.1/
import logging
from typing import Optional, Union
from typing import Optional
from mvt.common.utils import convert_mactime_to_iso
from mvt.common.module_types import (
ModuleAtomicResult,
ModuleResults,
ModuleSerializedResult,
)
from ..base import IOSExtraction
@@ -25,7 +30,7 @@ class ShutdownLog(IOSExtraction):
results_path: Optional[str] = None,
module_options: Optional[dict] = None,
log: logging.Logger = logging.getLogger(__name__),
results: Optional[list] = None,
results: ModuleResults = [],
) -> None:
super().__init__(
file_path=file_path,
@@ -36,7 +41,7 @@ class ShutdownLog(IOSExtraction):
results=results,
)
def serialize(self, record: dict) -> Union[dict, list]:
def serialize(self, record: ModuleAtomicResult) -> ModuleSerializedResult:
return {
"timestamp": record["isodate"],
"module": self.__class__.__name__,
@@ -50,22 +55,23 @@ class ShutdownLog(IOSExtraction):
return
for result in self.results:
ioc = self.indicators.check_file_path(result["client"])
if ioc:
result["matched_indicator"] = ioc
self.detected.append(result)
ioc_match = self.indicators.check_file_path(result["client"])
if ioc_match:
self.alertstore.critical(self.get_slug(), ioc_match.message, "", result)
self.alertstore.log_latest()
continue
for ioc in self.indicators.get_iocs("processes"):
parts = result["client"].split("/")
if ioc in parts:
self.log.warning(
'Found mention of a known malicious process "%s" in '
"shutdown.log",
ioc,
)
if ioc.value in parts:
result["matched_indicator"] = ioc
self.detected.append(result)
self.alertstore.critical(
self.get_slug(),
f'Found mention of a known malicious process "{ioc.value}" in shutdown.log',
"",
result,
)
self.alertstore.log_latest()
continue
def process_shutdownlog(self, content):
+8 -3
View File
@@ -6,9 +6,14 @@
import datetime
import json
import logging
from typing import Optional, Union
from typing import Optional
from mvt.common.utils import convert_datetime_to_iso
from mvt.common.module_types import (
ModuleAtomicResult,
ModuleResults,
ModuleSerializedResult,
)
from ..base import IOSExtraction
@@ -27,7 +32,7 @@ class IOSVersionHistory(IOSExtraction):
results_path: Optional[str] = None,
module_options: Optional[dict] = None,
log: logging.Logger = logging.getLogger(__name__),
results: Optional[list] = None,
results: ModuleResults = [],
) -> None:
super().__init__(
file_path=file_path,
@@ -38,7 +43,7 @@ class IOSVersionHistory(IOSExtraction):
results=results,
)
def serialize(self, record: dict) -> Union[dict, list]:
def serialize(self, record: ModuleAtomicResult) -> ModuleSerializedResult:
return {
"timestamp": record["isodate"],
"module": self.__class__.__name__,
+5 -4
View File
@@ -18,10 +18,11 @@ class WebkitBase(IOSExtraction):
return
for result in self.results:
ioc = self.indicators.check_url(result["url"])
if ioc:
result["matched_indicator"] = ioc
self.detected.append(result)
ioc_match = self.indicators.check_url(result["url"])
if ioc_match:
result["matched_indicator"] = ioc_match.ioc
self.alertstore.critical(self.get_slug(), ioc_match.message, "", result)
continue
def _process_webkit_folder(self, root_paths):
for found_path in self._get_fs_files_from_patterns(root_paths):
+8 -3
View File
@@ -4,8 +4,13 @@
# https://license.mvt.re/1.1/
import logging
from typing import Optional, Union
from typing import Optional
from mvt.common.module_types import (
ModuleAtomicResult,
ModuleResults,
ModuleSerializedResult,
)
from .webkit_base import WebkitBase
WEBKIT_INDEXEDDB_ROOT_PATHS = [
@@ -29,7 +34,7 @@ class WebkitIndexedDB(WebkitBase):
results_path: Optional[str] = None,
module_options: Optional[dict] = None,
log: logging.Logger = logging.getLogger(__name__),
results: Optional[list] = None,
results: ModuleResults = [],
) -> None:
super().__init__(
file_path=file_path,
@@ -40,7 +45,7 @@ class WebkitIndexedDB(WebkitBase):
results=results,
)
def serialize(self, record: dict) -> Union[dict, list]:
def serialize(self, record: ModuleAtomicResult) -> ModuleSerializedResult:
return {
"timestamp": record["isodate"],
"module": self.__class__.__name__,
@@ -4,8 +4,13 @@
# https://license.mvt.re/1.1/
import logging
from typing import Optional, Union
from typing import Optional
from mvt.common.module_types import (
ModuleResults,
ModuleSerializedResult,
ModuleAtomicResult,
)
from .webkit_base import WebkitBase
WEBKIT_LOCALSTORAGE_ROOT_PATHS = [
@@ -27,7 +32,7 @@ class WebkitLocalStorage(WebkitBase):
results_path: Optional[str] = None,
module_options: Optional[dict] = None,
log: logging.Logger = logging.getLogger(__name__),
results: Optional[list] = None,
results: ModuleResults = [],
) -> None:
super().__init__(
file_path=file_path,
@@ -38,7 +43,7 @@ class WebkitLocalStorage(WebkitBase):
results=results,
)
def serialize(self, record: dict) -> Union[dict, list]:
def serialize(self, record: ModuleAtomicResult) -> ModuleSerializedResult:
return {
"timestamp": record["isodate"],
"module": self.__class__.__name__,
@@ -6,6 +6,7 @@
import logging
from typing import Optional
from mvt.common.module_types import ModuleResults
from .webkit_base import WebkitBase
WEBKIT_SAFARIVIEWSERVICE_ROOT_PATHS = [
@@ -27,7 +28,7 @@ class WebkitSafariViewService(WebkitBase):
results_path: Optional[str] = None,
module_options: Optional[dict] = None,
log: logging.Logger = logging.getLogger(__name__),
results: Optional[list] = None,
results: ModuleResults = [],
) -> None:
super().__init__(
file_path=file_path,
+36 -24
View File
@@ -8,11 +8,13 @@ import logging
import os
import plistlib
from datetime import datetime, timezone
from typing import Any, Dict, Optional, Union
from typing import Any, Dict, Optional
from mvt.common.module import DatabaseNotFoundError
from mvt.common.utils import convert_datetime_to_iso
from mvt.ios.modules.base import IOSExtraction
from mvt.common.module import ModuleResults, ModuleAtomicResult, ModuleSerializedResult
APPLICATIONS_DB_PATH = [
"private/var/containers/Bundle/Application/*/iTunesMetadata.plist"
@@ -35,7 +37,7 @@ class Applications(IOSExtraction):
results_path: Optional[str] = None,
module_options: Optional[dict] = None,
log: logging.Logger = logging.getLogger(__name__),
results: Optional[list] = None,
results: ModuleResults = [],
) -> None:
super().__init__(
file_path=file_path,
@@ -46,7 +48,7 @@ class Applications(IOSExtraction):
results=results,
)
def serialize(self, record: dict) -> Union[dict, list]:
def serialize(self, record: ModuleAtomicResult) -> ModuleSerializedResult:
if "isodate" in record:
return {
"timestamp": record["isodate"],
@@ -60,41 +62,51 @@ class Applications(IOSExtraction):
for result in self.results:
if self.indicators:
if "softwareVersionBundleId" not in result:
self.log.warning(
"Suspicious application identified without softwareVersionBundleId"
self.alertstore.high(
self.get_slug(),
"Suspicious application identified without softwareVersionBundleId",
"",
result,
)
self.detected.append(result)
continue
ioc = self.indicators.check_process(result["softwareVersionBundleId"])
if ioc:
self.log.warning(
"Malicious application %s identified",
result["softwareVersionBundleId"],
ioc_match = self.indicators.check_process(
result["softwareVersionBundleId"]
)
if ioc_match:
result["matched_indicator"] = ioc_match.ioc
self.alertstore.critical(
self.get_slug(),
f"Malicious application {result['softwareVersionBundleId']} identified",
"",
result,
)
result["matched_indicator"] = ioc
self.detected.append(result)
continue
ioc = self.indicators.check_app_id(result["softwareVersionBundleId"])
if ioc:
self.log.warning(
"Malicious application %s identified",
result["softwareVersionBundleId"],
ioc_match = self.indicators.check_app_id(
result["softwareVersionBundleId"]
)
if ioc_match:
result["matched_indicator"] = ioc_match.ioc
self.alertstore.critical(
self.get_slug(),
f"Malicious application {result['softwareVersionBundleId']} identified",
"",
result,
)
result["matched_indicator"] = ioc
self.detected.append(result)
continue
# Some apps installed from apple store with sourceApp "com.apple.AppStore.ProductPageExtension"
if (
result.get("sourceApp", "com.apple.AppStore")
not in KNOWN_APP_INSTALLERS
):
self.log.warning(
"Suspicious app not installed from the App Store or MDM: %s",
result["softwareVersionBundleId"],
self.alertstore.medium(
self.get_slug(),
f"Suspicious app not installed from the App Store or MDM: {result['softwareVersionBundleId']}",
"",
result,
)
self.detected.append(result)
def _parse_itunes_timestamp(self, entry: Dict[str, Any]) -> None:
"""
+20 -10
View File
@@ -4,9 +4,14 @@
# https://license.mvt.re/1.1/
import logging
from typing import Optional, Union
from typing import Optional
from mvt.common.utils import convert_mactime_to_iso
from mvt.common.module_types import (
ModuleResults,
ModuleSerializedResult,
ModuleAtomicResult,
)
from ..base import IOSExtraction
@@ -26,7 +31,7 @@ class Calendar(IOSExtraction):
results_path: Optional[str] = None,
module_options: Optional[dict] = None,
log: logging.Logger = logging.getLogger(__name__),
results: Optional[list] = None,
results: ModuleResults = [],
) -> None:
super().__init__(
file_path=file_path,
@@ -44,7 +49,7 @@ class Calendar(IOSExtraction):
"participant_last_modified",
]
def serialize(self, record: dict) -> Union[dict, list]:
def serialize(self, record: ModuleAtomicResult) -> ModuleSerializedResult:
records = []
for timestamp in self.timestamps:
if timestamp not in record or not record[timestamp]:
@@ -64,18 +69,23 @@ class Calendar(IOSExtraction):
def check_indicators(self) -> None:
for result in self.results:
if result["participant_email"] and self.indicators:
ioc = self.indicators.check_email(result["participant_email"])
if ioc:
result["matched_indicator"] = ioc
self.detected.append(result)
ioc_match = self.indicators.check_email(result["participant_email"])
if ioc_match:
result["matched_indicator"] = ioc_match.ioc
self.alertstore.critical(
self.get_slug(), ioc_match.message, "", result
)
continue
# Custom check for Quadream exploit
if result["summary"] == "Meeting" and result["description"] == "Notes":
self.log.warning(
"Potential Quadream exploit event identified: %s", result["uuid"]
self.alertstore.high(
self.get_slug(),
f"Potential Quadream exploit event identified: {result['uuid']}",
"",
result,
)
self.detected.append(result)
self.alertstore.log_latest()
def _parse_calendar_db(self):
"""
+3 -2
View File
@@ -4,9 +4,10 @@
# https://license.mvt.re/1.1/
import logging
from typing import Optional, Union
from typing import Optional
from mvt.common.utils import convert_mactime_to_iso
from mvt.common.module_types import ModuleAtomicResult, ModuleSerializedResult
from ..base import IOSExtraction
@@ -37,7 +38,7 @@ class Calls(IOSExtraction):
results=results,
)
def serialize(self, record: dict) -> Union[dict, list]:
def serialize(self, record: ModuleAtomicResult) -> ModuleSerializedResult:
return {
"timestamp": record["isodate"],
"module": self.__class__.__name__,
+15 -9
View File
@@ -4,9 +4,14 @@
# https://license.mvt.re/1.1/
import logging
from typing import Optional, Union
from typing import Optional
from mvt.common.utils import convert_chrometime_to_datetime, convert_datetime_to_iso
from mvt.common.module_types import (
ModuleResults,
ModuleSerializedResult,
ModuleAtomicResult,
)
from ..base import IOSExtraction
@@ -27,7 +32,7 @@ class ChromeFavicon(IOSExtraction):
results_path: Optional[str] = None,
module_options: Optional[dict] = None,
log: logging.Logger = logging.getLogger(__name__),
results: Optional[list] = None,
results: ModuleResults = [],
) -> None:
super().__init__(
file_path=file_path,
@@ -38,7 +43,7 @@ class ChromeFavicon(IOSExtraction):
results=results,
)
def serialize(self, record: dict) -> Union[dict, list]:
def serialize(self, record: ModuleAtomicResult) -> ModuleSerializedResult:
return {
"timestamp": record["isodate"],
"module": self.__class__.__name__,
@@ -51,12 +56,13 @@ class ChromeFavicon(IOSExtraction):
return
for result in self.results:
ioc = self.indicators.check_url(result["url"])
if not ioc:
ioc = self.indicators.check_url(result["icon_url"])
if ioc:
result["matched_indicator"] = ioc
self.detected.append(result)
ioc_match = self.indicators.check_url(result["url"])
if not ioc_match:
ioc_match = self.indicators.check_url(result["icon_url"])
if ioc_match:
result["matched_indicator"] = ioc_match.ioc
self.alertstore.critical(self.get_slug(), ioc_match.message, "", result)
continue
def run(self) -> None:
+12 -7
View File
@@ -4,9 +4,14 @@
# https://license.mvt.re/1.1/
import logging
from typing import Optional, Union
from typing import Optional
from mvt.common.utils import convert_chrometime_to_datetime, convert_datetime_to_iso
from mvt.common.module_types import (
ModuleResults,
ModuleAtomicResult,
ModuleSerializedResult,
)
from ..base import IOSExtraction
@@ -29,7 +34,7 @@ class ChromeHistory(IOSExtraction):
results_path: Optional[str] = None,
module_options: Optional[dict] = None,
log: logging.Logger = logging.getLogger(__name__),
results: Optional[list] = None,
results: ModuleResults = [],
) -> None:
super().__init__(
file_path=file_path,
@@ -40,7 +45,7 @@ class ChromeHistory(IOSExtraction):
results=results,
)
def serialize(self, record: dict) -> Union[dict, list]:
def serialize(self, record: ModuleAtomicResult) -> ModuleSerializedResult:
return {
"timestamp": record["isodate"],
"module": self.__class__.__name__,
@@ -55,10 +60,10 @@ class ChromeHistory(IOSExtraction):
return
for result in self.results:
ioc = self.indicators.check_url(result["url"])
if ioc:
result["matched_indicator"] = ioc
self.detected.append(result)
ioc_match = self.indicators.check_url(result["url"])
if ioc_match:
result["matched_indicator"] = ioc_match.ioc
self.alertstore.critical(self.get_slug(), ioc_match.message, "", result)
def run(self) -> None:
self._find_ios_database(
+2 -1
View File
@@ -7,6 +7,7 @@ import logging
import sqlite3
from typing import Optional
from mvt.common.module_types import ModuleResults
from ..base import IOSExtraction
CONTACTS_BACKUP_IDS = [
@@ -27,7 +28,7 @@ class Contacts(IOSExtraction):
results_path: Optional[str] = None,
module_options: Optional[dict] = None,
log: logging.Logger = logging.getLogger(__name__),
results: Optional[list] = None,
results: ModuleResults = [],
) -> None:
super().__init__(
file_path=file_path,
+14 -9
View File
@@ -4,9 +4,14 @@
# https://license.mvt.re/1.1/
import logging
from typing import Optional, Union
from typing import Optional
from mvt.common.utils import convert_unix_to_iso
from mvt.common.module_types import (
ModuleResults,
ModuleSerializedResult,
ModuleAtomicResult,
)
from ..base import IOSExtraction
@@ -28,7 +33,7 @@ class FirefoxFavicon(IOSExtraction):
results_path: Optional[str] = None,
module_options: Optional[dict] = None,
log: logging.Logger = logging.getLogger(__name__),
results: Optional[list] = None,
results: ModuleResults = [],
) -> None:
super().__init__(
file_path=file_path,
@@ -39,7 +44,7 @@ class FirefoxFavicon(IOSExtraction):
results=results,
)
def serialize(self, record: dict) -> Union[dict, list]:
def serialize(self, record: ModuleAtomicResult) -> ModuleSerializedResult:
return {
"timestamp": record["isodate"],
"module": self.__class__.__name__,
@@ -53,13 +58,13 @@ class FirefoxFavicon(IOSExtraction):
return
for result in self.results:
ioc = self.indicators.check_url(result.get("url", ""))
if not ioc:
ioc = self.indicators.check_url(result.get("history_url", ""))
ioc_match = self.indicators.check_url(result.get("url", ""))
if not ioc_match:
ioc_match = self.indicators.check_url(result.get("history_url", ""))
if ioc:
result["matched_indicator"] = ioc
self.detected.append(result)
if ioc_match:
result["matched_indicator"] = ioc_match.ioc
self.alertstore.critical(self.get_slug(), ioc_match.message, "", result)
def run(self) -> None:
self._find_ios_database(
+12 -7
View File
@@ -4,9 +4,14 @@
# https://license.mvt.re/1.1/
import logging
from typing import Optional, Union
from typing import Optional
from mvt.common.utils import convert_unix_to_iso
from mvt.common.module_types import (
ModuleResults,
ModuleSerializedResult,
ModuleAtomicResult,
)
from ..base import IOSExtraction
@@ -32,7 +37,7 @@ class FirefoxHistory(IOSExtraction):
results_path: Optional[str] = None,
module_options: Optional[dict] = None,
log: logging.Logger = logging.getLogger(__name__),
results: Optional[list] = None,
results: ModuleResults = [],
) -> None:
super().__init__(
file_path=file_path,
@@ -43,7 +48,7 @@ class FirefoxHistory(IOSExtraction):
results=results,
)
def serialize(self, record: dict) -> Union[dict, list]:
def serialize(self, record: ModuleAtomicResult) -> ModuleSerializedResult:
return {
"timestamp": record["isodate"],
"module": self.__class__.__name__,
@@ -56,10 +61,10 @@ class FirefoxHistory(IOSExtraction):
return
for result in self.results:
ioc = self.indicators.check_url(result["url"])
if ioc:
result["matched_indicator"] = ioc
self.detected.append(result)
ioc_match = self.indicators.check_url(result["url"])
if ioc_match:
result["matched_indicator"] = ioc_match.ioc
self.alertstore.critical(self.get_slug(), ioc_match.message, "", result)
def run(self) -> None:
self._find_ios_database(
@@ -7,6 +7,7 @@ import logging
import plistlib
from typing import Optional
from mvt.common.module_types import ModuleResults
from ..base import IOSExtraction
GLOBAL_PREFERENCES_BACKUP_IDS = ["0dc926a1810f7aee4e8f38793ed788701f93bf9d"]
@@ -25,7 +26,7 @@ class GlobalPreferences(IOSExtraction):
results_path: Optional[str] = None,
module_options: Optional[dict] = None,
log: logging.Logger = logging.getLogger(__name__),
results: Optional[list] = None,
results: ModuleResults = [],
) -> None:
super().__init__(
file_path=file_path,
@@ -40,9 +41,15 @@ class GlobalPreferences(IOSExtraction):
for entry in self.results:
if entry["entry"] == "LDMGlobalEnabled":
if entry["value"]:
self.log.warning("Lockdown mode enabled")
self.alertstore.info(
self.get_slug(), "Lockdown mode enabled", "", None
)
else:
self.log.warning("Lockdown mode disabled")
self.alertstore.low(
self.get_slug(), "Lockdown mode disabled", "", None
)
self.alertstore.log_latest()
continue
def process_file(self, file_path: str) -> None:
with open(file_path, "rb") as handle:
+19 -11
View File
@@ -6,9 +6,14 @@
import collections
import logging
import plistlib
from typing import Optional, Union
from typing import Optional
from mvt.common.utils import convert_mactime_to_iso
from mvt.common.module_types import (
ModuleAtomicResult,
ModuleResults,
ModuleSerializedResult,
)
from ..base import IOSExtraction
@@ -31,7 +36,7 @@ class IDStatusCache(IOSExtraction):
results_path: Optional[str] = None,
module_options: Optional[dict] = None,
log: logging.Logger = logging.getLogger(__name__),
results: Optional[list] = None,
results: ModuleResults = [],
) -> None:
super().__init__(
file_path=file_path,
@@ -42,7 +47,7 @@ class IDStatusCache(IOSExtraction):
results=results,
)
def serialize(self, record: dict) -> Union[dict, list]:
def serialize(self, record: ModuleAtomicResult) -> ModuleSerializedResult:
return {
"timestamp": record["isodate"],
"module": self.__class__.__name__,
@@ -58,18 +63,21 @@ class IDStatusCache(IOSExtraction):
for result in self.results:
if result.get("user", "").startswith("mailto:"):
email = result["user"][7:].strip("'")
ioc = self.indicators.check_email(email)
if ioc:
result["matched_indicator"] = ioc
self.detected.append(result)
ioc_match = self.indicators.check_email(email)
if ioc_match:
result["matched_indicator"] = ioc_match.ioc
self.alertstore.critical(
self.get_slug(), ioc_match.message, "", result
)
continue
if "\\x00\\x00" in result.get("user", ""):
self.log.warning(
"Found an ID Status Cache entry with suspicious patterns: %s",
result.get("user"),
self.alertstore.high(
self.get_slug(),
f"Found an ID Status Cache entry with suspicious patterns: {result.get('user')}",
"",
result,
)
self.detected.append(result)
def _extract_idstatuscache_entries(self, file_path):
with open(file_path, "rb") as handle:
+8 -3
View File
@@ -5,9 +5,14 @@
import logging
import sqlite3
from typing import Optional, Union
from typing import Optional
from mvt.common.utils import convert_mactime_to_iso
from mvt.common.module_types import (
ModuleAtomicResult,
ModuleSerializedResult,
ModuleResults,
)
from ..base import IOSExtraction
@@ -223,7 +228,7 @@ class InteractionC(IOSExtraction):
results_path: Optional[str] = None,
module_options: Optional[dict] = None,
log: logging.Logger = logging.getLogger(__name__),
results: Optional[list] = None,
results: ModuleResults = [],
) -> None:
super().__init__(
file_path=file_path,
@@ -247,7 +252,7 @@ class InteractionC(IOSExtraction):
"last_outgoing_recipient_date",
]
def serialize(self, record: dict) -> Union[dict, list]:
def serialize(self, record: ModuleAtomicResult) -> ModuleSerializedResult:
records = []
processed = []
for timestamp in self.timestamps:
+54 -37
View File
@@ -6,9 +6,14 @@
import base64
import logging
import plistlib
from typing import Optional, Union
from typing import Optional
from mvt.common.utils import convert_mactime_to_iso
from mvt.common.module_types import (
ModuleAtomicResult,
ModuleSerializedResult,
ModuleResults,
)
from ..base import IOSExtraction
@@ -31,7 +36,7 @@ class LocationdClients(IOSExtraction):
results_path: Optional[str] = None,
module_options: Optional[dict] = None,
log: logging.Logger = logging.getLogger(__name__),
results: Optional[list] = None,
results: ModuleResults = [],
) -> None:
super().__init__(
file_path=file_path,
@@ -54,7 +59,7 @@ class LocationdClients(IOSExtraction):
"BeaconRegionTimeStopped",
]
def serialize(self, record: dict) -> Union[dict, list]:
def serialize(self, record: ModuleAtomicResult) -> ModuleSerializedResult:
records = []
for timestamp in self.timestamps:
if timestamp in record.keys():
@@ -77,59 +82,71 @@ class LocationdClients(IOSExtraction):
parts = result["package"].split("/")
proc_name = parts[len(parts) - 1]
ioc = self.indicators.check_process(proc_name)
if ioc:
self.log.warning(
"Found a suspicious process name in LocationD entry %s",
result["package"],
ioc_match = self.indicators.check_process(proc_name)
if ioc_match:
result["matched_indicator"] = ioc_match.ioc
self.alertstore.high(
self.get_slug(),
f"Found a suspicious process name in LocationD entry {result['package']}",
"",
result,
)
result["matched_indicator"] = ioc
self.detected.append(result)
self.alertstore.log_latest()
continue
if "BundleId" in result:
ioc = self.indicators.check_process(result["BundleId"])
if ioc:
self.log.warning(
"Found a suspicious process name in LocationD entry %s",
result["package"],
ioc_match = self.indicators.check_process(result["BundleId"])
if ioc_match:
result["matched_indicator"] = ioc_match.ioc
self.alertstore.high(
self.get_slug(),
f"Found a suspicious process name in LocationD entry {result['package']}",
"",
result,
)
result["matched_indicator"] = ioc
self.alertstore.log_latest()
if "BundlePath" in result:
ioc = self.indicators.check_file_path(result["BundlePath"])
if ioc:
self.log.warning(
"Found a suspicious file path in Location D: %s",
result["BundlePath"],
ioc_match = self.indicators.check_file_path(result["BundlePath"])
if ioc_match:
result["matched_indicator"] = ioc_match.ioc
self.alertstore.high(
self.get_slug(),
f"Found a suspicious file path in LocationD entry {result['BundlePath']}",
"",
result,
)
result["matched_indicator"] = ioc
self.detected.append(result)
self.alertstore.log_latest()
continue
if "Executable" in result:
ioc = self.indicators.check_file_path(result["Executable"])
if ioc:
self.log.warning(
"Found a suspicious file path in Location D: %s",
result["Executable"],
ioc_match = self.indicators.check_file_path(result["Executable"])
if ioc_match:
result["matched_indicator"] = ioc_match.ioc
self.alertstore.high(
self.get_slug(),
f"Found a suspicious file path in LocationD entry {result['Executable']}",
"",
result,
)
result["matched_indicator"] = ioc
self.detected.append(result)
self.alertstore.log_latest()
continue
if "Registered" in result:
# Sometimes registered is a bool
if isinstance(result["Registered"], bool):
continue
ioc = self.indicators.check_file_path(result["Registered"])
if ioc:
self.log.warning(
"Found a suspicious file path in Location D: %s",
result["Registered"],
ioc_match = self.indicators.check_file_path(result["Registered"])
if ioc_match:
result["matched_indicator"] = ioc_match.ioc
self.alertstore.high(
self.get_slug(),
f"Found a suspicious file path in LocationD entry {result['Registered']}",
"",
result,
)
result["matched_indicator"] = ioc
self.detected.append(result)
self.alertstore.log_latest()
continue
def _extract_locationd_entries(self, file_path):
+2 -1
View File
@@ -6,6 +6,7 @@
import logging
from typing import Optional
from mvt.common.module_types import ModuleResults
from ..net_base import NetBase
DATAUSAGE_BACKUP_IDS = [
@@ -30,7 +31,7 @@ class Datausage(NetBase):
results_path: Optional[str] = None,
module_options: Optional[dict] = None,
log: logging.Logger = logging.getLogger(__name__),
results: Optional[list] = None,
results: ModuleResults = [],
) -> None:
super().__init__(
file_path=file_path,
@@ -5,9 +5,14 @@
import logging
import plistlib
from typing import Optional, Union
from typing import Optional
from mvt.common.utils import convert_datetime_to_iso
from mvt.common.module_types import (
ModuleResults,
ModuleAtomicResult,
ModuleSerializedResult,
)
from ..base import IOSExtraction
@@ -30,7 +35,7 @@ class OSAnalyticsADDaily(IOSExtraction):
results_path: Optional[str] = None,
module_options: Optional[dict] = None,
log: logging.Logger = logging.getLogger(__name__),
results: Optional[list] = None,
results: ModuleResults = [],
) -> None:
super().__init__(
file_path=file_path,
@@ -41,7 +46,7 @@ class OSAnalyticsADDaily(IOSExtraction):
results=results,
)
def serialize(self, record: dict) -> Union[dict, list]:
def serialize(self, record: ModuleAtomicResult) -> ModuleSerializedResult:
return {
"timestamp": record["ts"],
"module": self.__class__.__name__,
@@ -57,10 +62,10 @@ class OSAnalyticsADDaily(IOSExtraction):
return
for result in self.results:
ioc = self.indicators.check_process(result["package"])
if ioc:
result["matched_indicator"] = ioc
self.detected.append(result)
ioc_match = self.indicators.check_process(result["package"])
if ioc_match:
result["matched_indicator"] = ioc_match.ioc
self.alertstore.critical(self.get_slug(), ioc_match.message, "", result)
def run(self) -> None:
self._find_ios_database(
@@ -8,9 +8,14 @@ import logging
import os
import plistlib
import sqlite3
from typing import Optional, Union
from typing import Optional
from mvt.common.utils import convert_mactime_to_iso, keys_bytes_to_string
from mvt.common.module_types import (
ModuleResults,
ModuleSerializedResult,
ModuleAtomicResult,
)
from ..base import IOSExtraction
@@ -31,7 +36,7 @@ class SafariBrowserState(IOSExtraction):
results_path: Optional[str] = None,
module_options: Optional[dict] = None,
log: logging.Logger = logging.getLogger(__name__),
results: Optional[list] = None,
results: ModuleResults = [],
) -> None:
super().__init__(
file_path=file_path,
@@ -44,7 +49,7 @@ class SafariBrowserState(IOSExtraction):
self._session_history_count = 0
def serialize(self, record: dict) -> Union[dict, list]:
def serialize(self, record: ModuleAtomicResult) -> ModuleSerializedResult:
return {
"timestamp": record["last_viewed_timestamp"],
"module": self.__class__.__name__,
@@ -58,10 +63,12 @@ class SafariBrowserState(IOSExtraction):
for result in self.results:
if "tab_url" in result:
ioc = self.indicators.check_url(result["tab_url"])
if ioc:
result["matched_indicator"] = ioc
self.detected.append(result)
ioc_match = self.indicators.check_url(result["tab_url"])
if ioc_match:
result["matched_indicator"] = ioc_match.ioc
self.alertstore.critical(
self.get_slug(), ioc_match.message, "", result
)
continue
if "session_data" not in result:
@@ -69,10 +76,12 @@ class SafariBrowserState(IOSExtraction):
for session_entry in result["session_data"]:
if "entry_url" in session_entry:
ioc = self.indicators.check_url(session_entry["entry_url"])
if ioc:
result["matched_indicator"] = ioc
self.detected.append(result)
ioc_match = self.indicators.check_url(session_entry["entry_url"])
if ioc_match:
result["matched_indicator"] = ioc_match.ioc
self.alertstore.critical(
self.get_slug(), ioc_match.message, "", result
)
def _process_browser_state_db(self, db_path):
self._recover_sqlite_db_if_needed(db_path)
+17 -10
View File
@@ -5,10 +5,15 @@
import logging
import os
from typing import Optional, Union
from typing import Optional
from mvt.common.url import URL
from mvt.common.utils import convert_mactime_to_datetime, convert_mactime_to_iso
from mvt.common.module_types import (
ModuleResults,
ModuleAtomicResult,
ModuleSerializedResult,
)
from ..base import IOSExtraction
@@ -33,7 +38,7 @@ class SafariHistory(IOSExtraction):
results_path: Optional[str] = None,
module_options: Optional[dict] = None,
log: logging.Logger = logging.getLogger(__name__),
results: Optional[list] = None,
results: ModuleResults = [],
) -> None:
super().__init__(
file_path=file_path,
@@ -44,7 +49,7 @@ class SafariHistory(IOSExtraction):
results=results,
)
def serialize(self, record: dict) -> Union[dict, list]:
def serialize(self, record: ModuleAtomicResult) -> ModuleSerializedResult:
return {
"timestamp": record["isodate"],
"module": self.__class__.__name__,
@@ -95,9 +100,11 @@ class SafariHistory(IOSExtraction):
elapsed_ms = elapsed_time.microseconds / 1000
if elapsed_time.seconds == 0:
self.log.warning(
"Redirect took less than a second! (%d milliseconds)",
elapsed_ms,
self.alertstore.medium(
self.get_slug(),
f"Redirect took less than a second! ({elapsed_ms} milliseconds)",
result["timestamp"],
result,
)
def check_indicators(self) -> None:
@@ -107,10 +114,10 @@ class SafariHistory(IOSExtraction):
return
for result in self.results:
ioc = self.indicators.check_url(result["url"])
if ioc:
result["matched_indicator"] = ioc
self.detected.append(result)
ioc_match = self.indicators.check_url(result["url"])
if ioc_match:
result["matched_indicator"] = ioc_match.ioc
self.alertstore.critical(self.get_slug(), ioc_match.message, "", result)
def _process_history_db(self, history_path):
self._recover_sqlite_db_if_needed(history_path)
+12 -7
View File
@@ -8,9 +8,14 @@ import itertools
import logging
import plistlib
import sqlite3
from typing import Optional, Union
from typing import Optional
from mvt.common.utils import check_for_links, convert_mactime_to_iso
from mvt.common.module_types import (
ModuleAtomicResult,
ModuleResults,
ModuleSerializedResult,
)
from ..base import IOSExtraction
@@ -32,7 +37,7 @@ class Shortcuts(IOSExtraction):
results_path: Optional[str] = None,
module_options: Optional[dict] = None,
log: logging.Logger = logging.getLogger(__name__),
results: Optional[list] = None,
results: ModuleResults = [],
) -> None:
super().__init__(
file_path=file_path,
@@ -43,7 +48,7 @@ class Shortcuts(IOSExtraction):
results=results,
)
def serialize(self, record: dict) -> Union[dict, list]:
def serialize(self, record: ModuleAtomicResult) -> ModuleSerializedResult:
found_urls = ""
if record["action_urls"]:
found_urls = f"- URLs in actions: {', '.join(record['action_urls'])}"
@@ -72,10 +77,10 @@ class Shortcuts(IOSExtraction):
return
for result in self.results:
ioc = self.indicators.check_urls(result["action_urls"])
if ioc:
result["matched_indicator"] = ioc
self.detected.append(result)
ioc_match = self.indicators.check_urls(result["action_urls"])
if ioc_match:
result["matched_indicator"] = ioc_match.ioc
self.alertstore.critical(self.get_slug(), ioc_match.message, "", result)
def run(self) -> None:
self._find_ios_database(
+17 -9
View File
@@ -6,9 +6,14 @@
import logging
import sqlite3
from base64 import b64encode
from typing import Optional, Union
from typing import Optional
from mvt.common.utils import check_for_links, convert_mactime_to_iso
from mvt.common.module_types import (
ModuleAtomicResult,
ModuleResults,
ModuleSerializedResult,
)
from ..base import IOSExtraction
@@ -30,7 +35,7 @@ class SMS(IOSExtraction):
results_path: Optional[str] = None,
module_options: Optional[dict] = None,
log: logging.Logger = logging.getLogger(__name__),
results: Optional[list] = None,
results: ModuleResults = [],
) -> None:
super().__init__(
file_path=file_path,
@@ -41,7 +46,7 @@ class SMS(IOSExtraction):
results=results,
)
def serialize(self, record: dict) -> Union[dict, list]:
def serialize(self, record: ModuleAtomicResult) -> ModuleSerializedResult:
text = record["text"].replace("\n", "\\n")
sms_data = f'{record["service"]}: {record["guid"]} "{text}" from {record["phone_number"]} ({record["account"]})'
records = [
@@ -71,10 +76,13 @@ class SMS(IOSExtraction):
if message.get("text", "").startswith(alert_old) or message.get(
"text", ""
).startswith(alert_new):
self.log.warning(
"Apple warning about state-sponsored attack received on the %s",
self.alertstore.high(
self.get_slug(),
f"Apple warning about state-sponsored attack received on the {message['isodate']}",
message["isodate"],
message,
)
self.alertstore.log_latest()
if not self.indicators:
return
@@ -84,10 +92,10 @@ class SMS(IOSExtraction):
# Making sure not link was ignored
if message_links == []:
message_links = check_for_links(result.get("text", ""))
ioc = self.indicators.check_urls(message_links)
if ioc:
result["matched_indicator"] = ioc
self.detected.append(result)
ioc_match = self.indicators.check_urls(message_links)
if ioc_match:
result["matched_indicator"] = ioc_match.ioc
self.alertstore.critical(self.get_slug(), ioc_match.message, "", result)
def run(self) -> None:
self._find_ios_database(backup_ids=SMS_BACKUP_IDS, root_paths=SMS_ROOT_PATHS)
+19 -11
View File
@@ -5,9 +5,14 @@
import logging
from base64 import b64encode
from typing import Optional, Union
from typing import Optional
from mvt.common.utils import convert_mactime_to_iso
from mvt.common.module_types import (
ModuleAtomicResult,
ModuleSerializedResult,
ModuleResults,
)
from ..base import IOSExtraction
@@ -29,7 +34,7 @@ class SMSAttachments(IOSExtraction):
results_path: Optional[str] = None,
module_options: Optional[dict] = None,
log: logging.Logger = logging.getLogger(__name__),
results: Optional[list] = None,
results: ModuleResults = [],
) -> None:
super().__init__(
file_path=file_path,
@@ -40,7 +45,7 @@ class SMSAttachments(IOSExtraction):
results=results,
)
def serialize(self, record: dict) -> Union[dict, list]:
def serialize(self, record: ModuleAtomicResult) -> ModuleSerializedResult:
return {
"timestamp": record["isodate"],
"module": self.__class__.__name__,
@@ -56,22 +61,25 @@ class SMSAttachments(IOSExtraction):
def check_indicators(self) -> None:
for attachment in self.results:
# Check for known malicious filenames.
if self.indicators and self.indicators.check_file_path(
attachment["filename"]
):
self.detected.append(attachment)
if self.indicators:
ioc_match = self.indicators.check_file_path(attachment["filename"])
if ioc_match:
attachment["matched_indicator"] = ioc_match.ioc
self.alertstore.high(
self.get_slug(), ioc_match.message, "", attachment
)
if (
attachment["filename"].startswith("/var/tmp/")
and attachment["filename"].endswith("-1")
and attachment["direction"] == "received"
):
self.log.warning(
"Suspicious iMessage attachment %s on %s",
attachment["filename"],
self.alertstore.medium(
self.get_slug(),
f"Suspicious iMessage attachment {attachment['filename']} on {attachment['isodate']}",
attachment["isodate"],
attachment,
)
self.detected.append(attachment)
def run(self) -> None:
self._find_ios_database(backup_ids=SMS_BACKUP_IDS, root_paths=SMS_ROOT_PATHS)
+12 -7
View File
@@ -5,9 +5,14 @@
import logging
import sqlite3
from typing import Optional, Union
from typing import Optional
from mvt.common.utils import convert_unix_to_iso
from mvt.common.module_types import (
ModuleAtomicResult,
ModuleSerializedResult,
ModuleResults,
)
from ..base import IOSExtraction
@@ -51,7 +56,7 @@ class TCC(IOSExtraction):
results_path: Optional[str] = None,
module_options: Optional[dict] = None,
log: logging.Logger = logging.getLogger(__name__),
results: Optional[list] = None,
results: ModuleResults = [],
) -> None:
super().__init__(
file_path=file_path,
@@ -62,7 +67,7 @@ class TCC(IOSExtraction):
results=results,
)
def serialize(self, record: dict) -> Union[dict, list]:
def serialize(self, record: ModuleAtomicResult) -> ModuleSerializedResult:
if "last_modified" in record:
if "allowed_value" in record:
msg = (
@@ -89,10 +94,10 @@ class TCC(IOSExtraction):
return
for result in self.results:
ioc = self.indicators.check_process(result["client"])
if ioc:
result["matched_indicator"] = ioc
self.detected.append(result)
ioc_match = self.indicators.check_process(result["client"])
if ioc_match:
result["matched_indicator"] = ioc_match.ioc
self.alertstore.critical(self.get_slug(), ioc_match.message, "", result)
def process_db(self, file_path):
conn = self._open_sqlite_db(file_path)
@@ -6,9 +6,14 @@
import logging
import os
import sqlite3
from typing import Optional, Union
from typing import Optional
from mvt.common.utils import convert_unix_to_iso
from mvt.common.module_types import (
ModuleAtomicResult,
ModuleSerializedResult,
ModuleResults,
)
from ..base import IOSExtraction
@@ -32,7 +37,7 @@ class WebkitResourceLoadStatistics(IOSExtraction):
results_path: Optional[str] = None,
module_options: Optional[dict] = None,
log: logging.Logger = logging.getLogger(__name__),
results: Optional[list] = None,
results: ModuleResults = [],
) -> None:
super().__init__(
file_path=file_path,
@@ -45,7 +50,7 @@ class WebkitResourceLoadStatistics(IOSExtraction):
self.results = [] if not results else results
def serialize(self, record: dict) -> Union[dict, list]:
def serialize(self, record: ModuleAtomicResult) -> ModuleSerializedResult:
msg = f"Webkit resource loaded from {record['registrable_domain']}"
if record["domain"] != "":
msg += f" by app in domain {record['domain']}"
@@ -60,12 +65,12 @@ class WebkitResourceLoadStatistics(IOSExtraction):
if not self.indicators:
return
self.detected = []
for result in self.results:
ioc = self.indicators.check_url(result["registrable_domain"])
if ioc:
result["matched_indicator"] = ioc
self.detected.append(result)
ioc_match = self.indicators.check_url(result["registrable_domain"])
if ioc_match:
result["matched_indicator"] = ioc_match.ioc
self.alertstore.critical(self.get_slug(), ioc_match.message, "", result)
self.alertstore.log_latest()
def _process_observations_db(self, db_path: str, domain: str, path: str) -> None:
self.log.info(
@@ -9,6 +9,7 @@ import plistlib
from typing import Optional
from mvt.common.utils import convert_datetime_to_iso
from mvt.common.module_types import ModuleResults
from ..base import IOSExtraction
@@ -38,7 +39,7 @@ class WebkitSessionResourceLog(IOSExtraction):
results_path: Optional[str] = None,
module_options: Optional[dict] = None,
log: logging.Logger = logging.getLogger(__name__),
results: Optional[list] = None,
results: ModuleResults = [],
) -> None:
super().__init__(
file_path=file_path,
@@ -86,10 +87,12 @@ class WebkitSessionResourceLog(IOSExtraction):
[entry["origin"]] + source_domains + destination_domains
)
ioc = self.indicators.check_urls(all_origins)
if ioc:
entry["matched_indicator"] = ioc
self.detected.append(entry)
ioc_match = self.indicators.check_urls(all_origins)
if ioc_match:
entry["matched_indicator"] = ioc_match.ioc
self.alertstore.critical(
self.get_slug(), ioc_match.message, "", entry
)
redirect_path = ""
if len(source_domains) > 0:
@@ -110,9 +113,11 @@ class WebkitSessionResourceLog(IOSExtraction):
redirect_path += ", ".join(destination_domains)
self.log.warning(
"Found HTTP redirect between suspicious domains: %s",
redirect_path,
self.alertstore.high(
self.get_slug(),
f"Found HTTP redirect between suspicious domains: {redirect_path}",
"",
entry,
)
def _extract_browsing_stats(self, log_path):
+12 -7
View File
@@ -4,9 +4,14 @@
# https://license.mvt.re/1.1/
import logging
from typing import Optional, Union
from typing import Optional
from mvt.common.utils import check_for_links, convert_mactime_to_iso
from mvt.common.module_types import (
ModuleAtomicResult,
ModuleResults,
ModuleSerializedResult,
)
from ..base import IOSExtraction
@@ -28,7 +33,7 @@ class Whatsapp(IOSExtraction):
results_path: Optional[str] = None,
module_options: Optional[dict] = None,
log: logging.Logger = logging.getLogger(__name__),
results: Optional[list] = None,
results: ModuleResults = [],
) -> None:
super().__init__(
file_path=file_path,
@@ -39,7 +44,7 @@ class Whatsapp(IOSExtraction):
results=results,
)
def serialize(self, record: dict) -> Union[dict, list]:
def serialize(self, record: ModuleAtomicResult) -> ModuleSerializedResult:
text = record.get("ZTEXT", "").replace("\n", "\\n")
links_text = ""
if record.get("links"):
@@ -57,10 +62,10 @@ class Whatsapp(IOSExtraction):
return
for result in self.results:
ioc = self.indicators.check_urls(result.get("links", []))
if ioc:
result["matched_indicator"] = ioc
self.detected.append(result)
ioc_match = self.indicators.check_urls(result.get("links", []))
if ioc_match:
result["matched_indicator"] = ioc_match.ioc
self.alertstore.critical(self.get_slug(), ioc_match.message, "", result)
def run(self) -> None:
self._find_ios_database(
+34 -16
View File
@@ -7,11 +7,16 @@ import logging
import operator
import sqlite3
from pathlib import Path
from typing import Optional, Union
from typing import Optional
from mvt.common.utils import convert_mactime_to_iso
from .base import IOSExtraction
from mvt.common.module_types import (
ModuleAtomicResult,
ModuleSerializedResult,
ModuleResults,
)
class NetBase(IOSExtraction):
@@ -25,7 +30,7 @@ class NetBase(IOSExtraction):
results_path: Optional[str] = None,
module_options: Optional[dict] = None,
log: logging.Logger = logging.getLogger(__name__),
results: Optional[list] = None,
results: ModuleResults = [],
) -> None:
super().__init__(
file_path=file_path,
@@ -129,7 +134,7 @@ class NetBase(IOSExtraction):
self.log.info("Extracted information on %d processes", len(self.results))
def serialize(self, record: dict) -> Union[dict, list]:
def serialize(self, record: ModuleAtomicResult) -> ModuleSerializedResult:
record_data = (
f"{record['proc_name']} (Bundle ID: {record['bundle_id']},"
f" ID: {record['proc_id']})"
@@ -232,7 +237,10 @@ class NetBase(IOSExtraction):
"been truncated in the database)"
)
self.log.warning(msg)
self.alertstore.medium(
self.get_slug(), msg, proc["live_isodate"], proc
)
if not proc["live_proc_id"]:
self.log.info(
"Found process entry in ZPROCESS but not in ZLIVEUSAGE: %s at %s",
@@ -251,16 +259,23 @@ class NetBase(IOSExtraction):
# Avoid duplicate warnings for same process.
if result["live_proc_id"] not in missing_process_cache:
missing_process_cache.add(result["live_proc_id"])
self.log.warning(
"Found manipulated process entry %s. Entry on %s",
result["live_proc_id"],
self.alertstore.high(
self.get_slug(),
f"Found manipulated process entry {result['live_proc_id']}. Entry on {result['live_isodate']}",
result["live_isodate"],
result,
)
self.alertstore.log_latest()
# Set manipulated proc timestamp so it appears in timeline.
result["first_isodate"] = result["isodate"] = result["live_isodate"]
result["proc_name"] = "MANIPULATED [process record deleted]"
self.detected.append(result)
self.alertstore.high(
self.get_slug(),
f"Found manipulated process entry {result['live_proc_id']}/",
result["first_isodate"],
result,
)
def find_deleted(self):
"""Identify process which may have been deleted from the DataUsage
@@ -278,12 +293,13 @@ class NetBase(IOSExtraction):
for proc_id in range(min(all_proc_id), max(all_proc_id)):
if proc_id not in all_proc_id:
previous_proc = results_by_proc[last_proc_id]
self.log.info(
'Missing process %d. Previous process at "%s" (%s)',
proc_id,
self.alertstore.low(
self.get_slug(),
f'Missing process {proc_id}. Previous process at "{previous_proc["first_isodate"]}" ({previous_proc["proc_name"]})',
previous_proc["first_isodate"],
previous_proc["proc_name"],
previous_proc,
)
self.alertstore.log_latest()
missing_procs[proc_id] = {
"proc_id": proc_id,
@@ -333,7 +349,9 @@ class NetBase(IOSExtraction):
if not result["proc_id"]:
continue
ioc = self.indicators.check_process(proc_name)
if ioc:
result["matched_indicator"] = ioc
self.detected.append(result)
ioc_match = self.indicators.check_process(proc_name)
if ioc_match:
result["matched_indicator"] = ioc_match.ioc
self.alertstore.critical(
self.get_slug(), ioc_match.message, result["first_isodate"], result
)
@@ -49,6 +49,6 @@ class TestDumpsysAccessibilityArtifact:
ind.parse_stix2(indicator_file)
ind.ioc_collections[0]["app_ids"].append("com.sec.android.app.camera")
da.indicators = ind
assert len(da.detected) == 0
assert len(da.alertstore.alerts) == 0
da.check_indicators()
assert len(da.detected) == 1
assert len(da.alertstore.alerts) == 1
@@ -42,22 +42,24 @@ class TestDumpsysAppopsArtifact:
ind.parse_stix2(indicator_file)
ind.ioc_collections[0]["app_ids"].append("com.facebook.katana")
da.indicators = ind
assert len(da.detected) == 0
assert len(da.alertstore.alerts) == 0
da.check_indicators()
detected_by_ioc = [
detected for detected in da.detected if detected.get("matched_indicator")
alert
for alert in da.alertstore.alerts
if "matched_indicator" in alert.event
]
detected_by_permission_heuristic = [
detected
for detected in da.detected
alert
for alert in da.alertstore.alerts
if all(
[
perm["name"] == "REQUEST_INSTALL_PACKAGES"
for perm in detected["permissions"]
for perm in alert.event["permissions"]
]
)
]
assert len(da.detected) == 3
assert len(da.alertstore.alerts) == 3
assert len(detected_by_ioc) == 1
assert len(detected_by_permission_heuristic) == 2
@@ -32,6 +32,6 @@ class TestDumpsysBatteryDailyArtifact:
ind.parse_stix2(indicator_file)
ind.ioc_collections[0]["app_ids"].append("com.facebook.system")
dba.indicators = ind
assert len(dba.detected) == 0
assert len(dba.alertstore.alerts) == 0
dba.check_indicators()
assert len(dba.detected) == 1
assert len(dba.alertstore.alerts) == 1

Some files were not shown because too many files have changed in this diff Show More