From 95b2f04db68750e46b7ade99e85416cf1ce90550 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Donncha=20=C3=93=20Cearbhaill?= Date: Wed, 28 Jun 2023 21:46:18 +0200 Subject: [PATCH] WIP for Triangulation post-processing module --- mvt/common/cmd_check_iocs.py | 27 +++-- mvt/common/command.py | 3 +- mvt/common/module.py | 49 +++++++++ mvt/ios/cli.py | 3 +- mvt/ios/modules/backup/manifest.py | 1 + mvt/ios/modules/post_analysis/__init__.py | 3 + .../post_analysis/attachment_deletion.py | 101 ++++++++++++++++++ 7 files changed, 179 insertions(+), 8 deletions(-) create mode 100644 mvt/ios/modules/post_analysis/__init__.py create mode 100644 mvt/ios/modules/post_analysis/attachment_deletion.py diff --git a/mvt/common/cmd_check_iocs.py b/mvt/common/cmd_check_iocs.py index a654202..9f68620 100644 --- a/mvt/common/cmd_check_iocs.py +++ b/mvt/common/cmd_check_iocs.py @@ -7,6 +7,7 @@ import logging import os from typing import Optional +from mvt.common.module import PostAnalysisModule from mvt.common.command import Command log = logging.getLogger(__name__) @@ -32,6 +33,7 @@ class CmdCheckIOCS(Command): def run(self) -> None: assert self.target_path is not None all_modules = [] + post_modules = [] for entry in self.modules: if entry not in all_modules: all_modules.append(entry) @@ -43,18 +45,24 @@ class CmdCheckIOCS(Command): name_only, _ = os.path.splitext(file_name) file_path = os.path.join(self.target_path, file_name) - for iocs_module in all_modules: - if self.module_name and iocs_module.__name__ != self.module_name: + for module in all_modules: + if self.module_name and module.__name__ != self.module_name: continue - if iocs_module().get_slug() != name_only: + # Handle post-analysis modules at the end + if issubclass(module, PostAnalysisModule) and module not in post_modules: + post_modules.append(module) + continue + + # Skip if the current result file does not match the module name + if module().get_slug() != name_only: continue log.info("Loading results from \"%s\" with module %s", - file_name, iocs_module.__name__) + file_name, module.__name__) - m = iocs_module.from_json(file_path, - log=logging.getLogger(iocs_module.__module__)) + m = module.from_json(file_path, + log=logging.getLogger(module.__module__)) if self.iocs.total_ioc_count > 0: m.indicators = self.iocs m.indicators.log = m.log @@ -66,6 +74,13 @@ class CmdCheckIOCS(Command): else: total_detections += len(m.detected) + # Run post-analysis modules at end + for post_module in post_modules: + m = post_module.from_results(self.target_path, log=log) + m.run() + total_detections += len(m.detected) + + if total_detections > 0: log.warning("The check of the results produced %d detections!", total_detections) diff --git a/mvt/common/command.py b/mvt/common/command.py index 3791501..8a3196a 100644 --- a/mvt/common/command.py +++ b/mvt/common/command.py @@ -33,6 +33,7 @@ class Command: ) -> None: self.name = "" self.modules = [] + self.modules_post = [] self.target_path = target_path self.results_path = results_path @@ -139,7 +140,7 @@ class Command: def list_modules(self) -> None: self.log.info("Following is the list of available %s modules:", self.name) - for module in self.modules: + for module in (self.modules + self.modules_post): self.log.info(" - %s", module.__name__) def init(self) -> None: diff --git a/mvt/common/module.py b/mvt/common/module.py index a692e94..cb5de61 100644 --- a/mvt/common/module.py +++ b/mvt/common/module.py @@ -7,6 +7,7 @@ import csv import logging import os import re +import glob from typing import Any, Dict, List, Optional, Union import simplejson as json @@ -225,3 +226,51 @@ def save_timeline(timeline: list, timeline_path: str) -> None: event.get("event"), event.get("data"), ]) + + +class PostAnalysisModule(MVTModule): + """ + Base module for implementing post-processing rules against the output of + multiple MVT modules + """ + @classmethod + def from_results(cls, results_path: str, log: logging.Logger): + results = cls.load_results(results_path, log=log) + return cls(results=results, log=log) + + @classmethod + def load_results(cls, results_path: str, log: logging.Logger): + """Load the results from a directory of json file.""" + # TODO: Move this to run once before loading all post-processing modules + module_results = {} + for json_path in glob.glob(os.path.join(results_path, "*.json")): + module_name, _ = os.path.splitext(os.path.basename(json_path)) + with open(json_path, "r", encoding="utf-8") as handle: + try: + module_results[module_name] = json.load(handle) + except Exception as exc: + log.error("Unable to load results from file %s: %s", + json_path, exc) + + if not module_results: + log.error("Did not find any MVT results at %s", results_path) + + return module_results + + def load_timeline(self): + """Load timeline from CSV file""" + timeline = [] + timeline_path = os.path.join(self.results_path, "timeline.csv") + with open(timeline_path, "r", encoding="utf-8") as handle: + csvinput = csv.reader(handle, delimiter=",", quotechar="\"", + quoting=csv.QUOTE_ALL, escapechar='\\') + for row in csvinput: + if row[0] == "UTC Timestamp": + continue + timeline.append({ + "timestamp": row[0], + "module": row[1], + "event": row[2], + "data": row[3], + }) + return timeline \ No newline at end of file diff --git a/mvt/ios/cli.py b/mvt/ios/cli.py index f7daf0a..b803669 100644 --- a/mvt/ios/cli.py +++ b/mvt/ios/cli.py @@ -26,6 +26,7 @@ from .decrypt import DecryptBackup from .modules.backup import BACKUP_MODULES from .modules.fs import FS_MODULES from .modules.mixed import MIXED_MODULES +from .modules.post_analysis import POST_ANALYSIS_MODULES init_logging() log = logging.getLogger("mvt") @@ -234,7 +235,7 @@ def check_fs(ctx, iocs, output, fast, list_modules, module, hashes, verbose, dum @click.pass_context def check_iocs(ctx, iocs, list_modules, module, folder): cmd = CmdCheckIOCS(target_path=folder, ioc_files=iocs, module_name=module) - cmd.modules = BACKUP_MODULES + FS_MODULES + MIXED_MODULES + cmd.modules = BACKUP_MODULES + FS_MODULES + MIXED_MODULES + POST_ANALYSIS_MODULES if list_modules: cmd.list_modules() diff --git a/mvt/ios/modules/backup/manifest.py b/mvt/ios/modules/backup/manifest.py index 5c4b5ea..d592468 100644 --- a/mvt/ios/modules/backup/manifest.py +++ b/mvt/ios/modules/backup/manifest.py @@ -158,6 +158,7 @@ class Manifest(IOSExtraction): "mode": oct(self._get_key(file_metadata, "Mode")), "owner": self._get_key(file_metadata, "UserID"), "size": self._get_key(file_metadata, "Size"), + "type": "file" if file_data["flags"] == 1 else "directory", }) except Exception: self.log.exception("Error reading manifest file metadata for file with ID %s " diff --git a/mvt/ios/modules/post_analysis/__init__.py b/mvt/ios/modules/post_analysis/__init__.py new file mode 100644 index 0000000..04bd961 --- /dev/null +++ b/mvt/ios/modules/post_analysis/__init__.py @@ -0,0 +1,3 @@ +from .attachment_deletion import PostAttachmentDeletion + +POST_ANALYSIS_MODULES = [PostAttachmentDeletion] \ No newline at end of file diff --git a/mvt/ios/modules/post_analysis/attachment_deletion.py b/mvt/ios/modules/post_analysis/attachment_deletion.py new file mode 100644 index 0000000..ab7a42c --- /dev/null +++ b/mvt/ios/modules/post_analysis/attachment_deletion.py @@ -0,0 +1,101 @@ +import logging +import datetime +from typing import Optional + +from mvt.common.module import PostAnalysisModule + + +class PostAttachmentDeletion(PostAnalysisModule): + """ + Heuristic detection for attachment deletion in a cert time period. + + + This module implements a hueuristic detection for a multiple iOS SMS attachmemt being deleted + in a short period of time. This is a similar concept to the following script used + by Kaspersky Labs to detect infections with the Triangulation iOS malware: + https://github.com/KasperskyLab/triangle_check/blob/main/triangle_check/__init__.py + """ + def __init__( + self, + file_path: Optional[str] = None, + target_path: Optional[str] = None, + results_path: Optional[str] = None, + fast_mode: Optional[bool] = False, + log: logging.Logger = logging.getLogger(__name__), + results: Optional[list] = None + ) -> None: + super().__init__(file_path=file_path, target_path=target_path, + results_path=results_path, fast_mode=fast_mode, + log=log, results=results) + + self.required_modules = ["manifest"] + + + def load_locationd_events(self): + locationd_clients = self.results["locationd_clients"] + locations_stopped_event = [event for event in locationd_clients if "LocationTimeStopped" in event] + return locations_stopped_event + + def run(self) -> None: + """ + Run the post-processing module. + + The logical is to look for all SMS attachment directories which were recently created + shortly before their last modified time, but which have no contained files. + """ + for module in self.required_modules: + if module not in self.results: + raise Exception(f"Required module {module} was not found in results. Did you run the required modules?") + + locationd_events = [] + locationd_client_iocs = [ + "com.apple.locationd.bundle-/System/Library/LocationBundles/IonosphereHarvest.bundle", + "com.apple.locationd.bundle-/System/Library/LocationBundles/WRMLinkSelection.bundle" + ] + for event in self.load_locationd_events(): + for ioc in locationd_client_iocs: + if ioc in event["Registered"]: + locationd_events.append(event) + print(event) + + + + # Filter the relevant events from the manifest: + events_by_time = {} + sms_files = [event for event in self.results["manifest"] if event["relative_path"].startswith("Library/SMS/Attachments/")] + attachment_folders = {} + for record in sorted(sms_files, key=lambda x: x["relative_path"]): + num_path_segments = record["relative_path"].count('/') + # Skip entries with a full-path + # if not (num_path_segments == 3 or num_path_segments == 4): + # continue + + attachment_root = "/".join(record["relative_path"].split('/', 5)[:5]) + attachment_folder = attachment_folders.get(attachment_root, []) + attachment_folder.append(record) + attachment_folders[attachment_root] = attachment_folder + + # Look for directories containing no files, which had a short lifespan + for key, items in attachment_folders.items(): + has_files = any([item["flags"] == 1 for item in items]) + if has_files: + continue + + for item in sorted(items, key=lambda x: x["created"]): + # item_created = datetime.datetime.strptime(item["created"], "%Y-%m-%d %H:%M:%S.%f") + item_modified = datetime.datetime.strptime(item["modified"], "%Y-%m-%d %H:%M:%S.%f") # M + status_changed = datetime.datetime.strptime(item["status_changed"], "%Y-%m-%d %H:%M:%S.%f") # C + + # self.append_timeline(fs_stat['LastModified'], ('M', relativePath)) + # self.append_timeline(fs_stat['LastStatusChange'], ('C', relativePath)) + # self.append_timeline(fs_stat['Birth'], ('B', relativePath)) + + + # Skip items which were created and modified at the same time, likely never had files. + # print(item["relative_path"], status_changed, item_modified) + if item_modified == status_changed: + print("changed == modified", item["relative_path"], status_changed, item_modified) + continue + + if (item_modified - status_changed): # < datetime.timedelta(minutes=10): + self.log.info(f"Possible attachment deletion. Attachment folder '{key}' with no files, created and modified within 10 minutes. '{item['relative_path']}' created {item_created}, modified {item_modified})")