From b35cd4bc734b2e77e7dadd179ad6453c21827501 Mon Sep 17 00:00:00 2001 From: Nex Date: Fri, 21 Jan 2022 16:26:58 +0100 Subject: [PATCH] Added support for context-aware indicators. This way when a detection is logged, the user can know which STIX2 file was matched by the module --- mvt/android/cli.py | 4 +- mvt/common/indicators.py | 244 ++++++++++++++++++----------- mvt/ios/cli.py | 10 +- mvt/ios/modules/backup/manifest.py | 13 +- mvt/ios/modules/fs/analytics.py | 29 ++-- mvt/ios/modules/fs/cache_files.py | 10 +- mvt/ios/modules/fs/filesystem.py | 19 +-- mvt/ios/modules/fs/shutdownlog.py | 4 +- mvt/ios/modules/mixed/whatsapp.py | 8 +- tests/common/test_indicators.py | 14 +- tests/ios/test_datausage.py | 2 +- tests/ios/test_manifest.py | 3 +- 12 files changed, 209 insertions(+), 151 deletions(-) diff --git a/mvt/android/cli.py b/mvt/android/cli.py index 0daad48..1881a03 100644 --- a/mvt/android/cli.py +++ b/mvt/android/cli.py @@ -139,7 +139,7 @@ def check_adb(ctx, iocs, output, fast, list_modules, module, serial): m = adb_module(output_folder=output, fast_mode=fast, log=logging.getLogger(adb_module.__module__)) - if indicators.ioc_count: + if indicators.total_ioc_count: m.indicators = indicators m.indicators.log = m.log if serial: @@ -190,7 +190,7 @@ def check_backup(ctx, iocs, output, backup_path, serial): for module in BACKUP_MODULES: m = module(base_folder=backup_path, output_folder=output, log=logging.getLogger(module.__module__)) - if indicators.ioc_count: + if indicators.total_ioc_count: m.indicators = indicators m.indicators.log = m.log if serial: diff --git a/mvt/common/indicators.py b/mvt/common/indicators.py index 05d09d0..4286670 100644 --- a/mvt/common/indicators.py +++ b/mvt/common/indicators.py @@ -21,20 +21,8 @@ class Indicators: def __init__(self, log=None): self.data_dir = user_data_dir("mvt") self.log = log - self.ioc_domains = [] - self.ioc_processes = [] - self.ioc_emails = [] self.ioc_files = [] - self.ioc_file_paths = [] - self.ioc_files_sha256 = [] - self.ioc_app_ids = [] - self.ios_profile_ids = [] - self.ioc_count = 0 - - def _add_indicator(self, ioc, iocs_list): - if ioc not in iocs_list: - iocs_list.append(ioc) - self.ioc_count += 1 + self.total_ioc_count = 0 def _load_downloaded_indicators(self): if not os.path.isdir(self.data_dir): @@ -58,6 +46,102 @@ class Indicators: else: self.log.info("Invalid STIX2 path %s in MVT_STIX2 environment variable", path) + def _generate_indicators_file(self): + return { + "name": "", + "description": "", + "file_name": "", + "file_path": "", + "domains": [], + "processes": [], + "emails": [], + "file_names": [], + "file_paths": [], + "files_sha256": [], + "app_ids": [], + "ios_profile_ids": [], + "count": 0, + } + + def _add_indicator(self, ioc, ioc_file, iocs_list): + if ioc not in iocs_list: + iocs_list.append(ioc) + ioc_file["count"] += 1 + self.total_ioc_count += 1 + + def parse_stix2(self, file_path): + """Extract indicators from a STIX2 file. + + :param file_path: Path to the STIX2 file to parse + :type file_path: str + + """ + self.log.info("Parsing STIX2 indicators file at path %s", file_path) + + ioc_file = self._generate_indicators_file() + ioc_file["file_path"] = file_path + ioc_file["file_name"] = os.path.basename(file_path) + + with open(file_path, "r") as handle: + try: + data = json.load(handle) + except json.decoder.JSONDecodeError: + self.log.critical("Unable to parse STIX2 indicator file. The file is malformed or in the wrong format.") + return + + for entry in data.get("objects", []): + entry_type = entry.get("type", "") + if entry_type == "malware": + ioc_file["name"] = entry.get("name", "") or ioc_file["file_name"] + ioc_file["description"] = entry.get("description", "") or ioc_file["file_name"] + continue + + if entry_type != "indicator": + continue + + key, value = entry.get("pattern", "").strip("[]").split("=") + value = value.strip("'") + + if key == "domain-name:value": + # We force domain names to lower case. + self._add_indicator(ioc=value.lower(), + ioc_file=ioc_file, + iocs_list=ioc_file["domains"]) + elif key == "process:name": + self._add_indicator(ioc=value, + ioc_file=ioc_file, + iocs_list=ioc_file["processes"]) + elif key == "email-addr:value": + # We force email addresses to lower case. + self._add_indicator(ioc=value.lower(), + ioc_file=ioc_file, + iocs_list=ioc_file["emails"]) + elif key == "file:name": + self._add_indicator(ioc=value, + ioc_file=ioc_file, + iocs_list=ioc_file["file_names"]) + elif key == "file:path": + self._add_indicator(ioc=value, + ioc_file=ioc_file, + iocs_list=ioc_file["file_paths"]) + elif key == "file:hashes.sha256": + self._add_indicator(ioc=value, + ioc_file=ioc_file, + iocs_list=ioc_file["files_sha256"]) + elif key == "app:id": + self._add_indicator(ioc=value, + ioc_file=ioc_file, + iocs_list=ioc_file["app_ids"]) + elif key == "configuration-profile:id": + self._add_indicator(ioc=value, + ioc_file=ioc_file, + iocs_list=ioc_file["ios_profile_ids"]) + + self.log.info("Loaded %d indicators from \"%s\" indicators file", + ioc_file["count"], ioc_file["name"]) + + self.ioc_files.append(ioc_file) + def load_indicators_files(self, files, load_default=True): """ Load a list of indicators files. @@ -68,60 +152,20 @@ class Indicators: else: self.log.warning("This indicators file %s does not exist", file_path) - # Load downloaded indicators and any indicators from env variable + # Load downloaded indicators and any indicators from env variable. if load_default: self._load_downloaded_indicators() + self._check_stix2_env_variable() - self.log.info("Loaded a total of %d unique indicators", self.ioc_count) + self.log.info("Loaded a total of %d unique indicators", self.total_ioc_count) - def parse_stix2(self, file_path): - """Extract indicators from a STIX2 file. - - :param file_path: Path to the STIX2 file to parse - :type file_path: str - - """ - self.log.info("Parsing STIX2 indicators file at path %s", file_path) - with open(file_path, "r") as handle: - try: - data = json.load(handle) - except json.decoder.JSONDecodeError: - self.log.critical("Unable to parse STIX2 indicator file. The file is malformed or in the wrong format.") - return - - for entry in data.get("objects", []): - if entry.get("type", "") != "indicator": - continue - - key, value = entry.get("pattern", "").strip("[]").split("=") - value = value.strip("'") - - if key == "domain-name:value": - # We force domain names to lower case. - self._add_indicator(ioc=value.lower(), - iocs_list=self.ioc_domains) - elif key == "process:name": - self._add_indicator(ioc=value, - iocs_list=self.ioc_processes) - elif key == "email-addr:value": - # We force email addresses to lower case. - self._add_indicator(ioc=value.lower(), - iocs_list=self.ioc_emails) - elif key == "file:name": - self._add_indicator(ioc=value, - iocs_list=self.ioc_files) - elif key == "file:path": - self._add_indicator(ioc=value, - iocs_list=self.ioc_file_paths) - elif key == "app:id": - self._add_indicator(ioc=value, - iocs_list=self.ioc_app_ids) - elif key == "configuration-profile:id": - self._add_indicator(ioc=value, - iocs_list=self.ios_profile_ids) - elif key == "file:hashes.sha256": - self._add_indicator(ioc=value, - iocs_list=self.ioc_files_sha256) + def get_iocs(self, ioc_type): + for ioc_file in self.ioc_files: + for ioc in ioc_file.get(ioc_type, []): + yield { + "value": ioc, + "name": ioc_file["name"] + } def check_domain(self, url) -> bool: """Check if a given URL matches any of the provided domain indicators. @@ -133,7 +177,7 @@ class Indicators: """ # TODO: If the IOC domain contains a subdomain, it is not currently - # being matched. + # being matched. if not url: return False @@ -163,33 +207,36 @@ class Indicators: except Exception: # If URL parsing failed, we just try to do a simple substring # match. - for ioc in self.ioc_domains: - if ioc.lower() in url: - self.log.warning("Maybe found a known suspicious domain: %s", url) + for ioc in self.get_iocs("domains"): + if ioc["value"].lower() in url: + self.log.warning("Maybe found a known suspicious domain %s matching indicators from \"%s\"", + url, ioc["name"]) return True # If nothing matched, we can quit here. return False # If all parsing worked, we start walking through available domain indicators. - for ioc in self.ioc_domains: + for ioc in self.get_iocs("domains"): # First we check the full domain. - if final_url.domain.lower() == ioc: + if final_url.domain.lower() == ioc["value"]: if orig_url.is_shortened and orig_url.url != final_url.url: - self.log.warning("Found a known suspicious domain %s shortened as %s", - final_url.url, orig_url.url) + self.log.warning("Found a known suspicious domain %s shortened as %s matching indicators from \"%s\"", + final_url.url, orig_url.url, ioc["name"]) else: - self.log.warning("Found a known suspicious domain: %s", final_url.url) + self.log.warning("Found a known suspicious domain %s matching indicators from \"%s\"", + final_url.url, ioc["name"]) return True # Then we just check the top level domain. - if final_url.top_level.lower() == ioc: + if final_url.top_level.lower() == ioc["value"]: if orig_url.is_shortened and orig_url.url != final_url.url: - self.log.warning("Found a sub-domain matching a known suspicious top level %s shortened as %s", - final_url.url, orig_url.url) + self.log.warning("Found a sub-domain with suspicious top level %s shortened as %s matching indicators from \"%s\"", + final_url.url, orig_url.url, ioc["name"]) else: - self.log.warning("Found a sub-domain matching a known suspicious top level: %s", final_url.url) + self.log.warning("Found a sub-domain with a suspicious top level %s matching indicators from \"%s\"", + final_url.url, ioc["name"]) return True @@ -227,14 +274,16 @@ class Indicators: return False proc_name = os.path.basename(process) - if proc_name in self.ioc_processes: - self.log.warning("Found a known suspicious process name \"%s\"", process) - return True + for ioc in self.get_iocs("processes"): + if proc_name == ioc["value"]: + self.log.warning("Found a known suspicious process name \"%s\" matching indicators from \"%s\"", + process, ioc["name"]) + return True - if len(proc_name) == 16: - for bad_proc in self.ioc_processes: - if bad_proc.startswith(proc_name): - self.log.warning("Found a truncated known suspicious process name \"%s\"", process) + if len(proc_name) == 16: + if ioc["value"].startswith(proc_name): + self.log.warning("Found a truncated known suspicious process name \"%s\" matching indicators from \"%s\"", + process, ioc["name"]) return True return False @@ -270,9 +319,11 @@ class Indicators: if not email: return False - if email.lower() in self.ioc_emails: - self.log.warning("Found a known suspicious email address: \"%s\"", email) - return True + for ioc in self.get_iocs("emails"): + if email.lower() == ioc["value"].lower(): + self.log.warning("Found a known suspicious email address \"%s\" matching indicators from \"%s\"", + email, ioc["name"]) + return True return False @@ -289,8 +340,11 @@ class Indicators: if not file_name: return False - if file_name in self.ioc_files: - return True + for ioc in self.get_iocs("file_names"): + if ioc["value"] == file_name: + self.log.warning("Found a known suspicious file name \"%s\" matching indicators from \"%s\"", + file_name, ioc["name"]) + return True return False @@ -310,9 +364,11 @@ class Indicators: if self.check_file_name(os.path.basename(file_path)): return True - for ioc_file in self.ioc_file_paths: + for ioc in self.get_iocs("file_paths"): # Strip any trailing slash from indicator paths to match directories. - if file_path.startswith(ioc_file.rstrip("/")): + if file_path.startswith(ioc["value"].rstrip("/")): + self.log.warning("Found a known suspicious file path \"%s\" matching indicators form \"%s\"", + file_path, ioc["name"]) return True return False @@ -326,8 +382,11 @@ class Indicators: :rtype: bool """ - if profile_uuid in self.ios_profile_ids: - return True + for ioc in self.get_iocs("ios_profile_ids"): + if profile_uuid in ioc["value"]: + self.log.warning("Found a known suspicious profile ID \"%s\" matching indicators from \"%s\"", + profile_uuid, ioc["name"]) + return True return False @@ -361,4 +420,5 @@ def download_indicators_files(log): # Write file to disk. This will overwrite any older version of the STIX2 file. with io.open(ioc_path, "w") as f: f.write(res.text) + log.info("Saved indicator file to '%s'", os.path.basename(ioc_path)) diff --git a/mvt/ios/cli.py b/mvt/ios/cli.py index cef65a2..18c4866 100644 --- a/mvt/ios/cli.py +++ b/mvt/ios/cli.py @@ -168,7 +168,7 @@ def check_backup(ctx, iocs, output, fast, backup_path, list_modules, module): m = backup_module(base_folder=backup_path, output_folder=output, fast_mode=fast, log=logging.getLogger(backup_module.__module__)) m.is_backup = True - if indicators.ioc_count: + if indicators.total_ioc_count > 0: m.indicators = indicators m.indicators.log = m.log @@ -225,7 +225,7 @@ def check_fs(ctx, iocs, output, fast, dump_path, list_modules, module): log=logging.getLogger(fs_module.__module__)) m.is_fs_dump = True - if indicators.ioc_count: + if indicators.total_ioc_count > 0: m.indicators = indicators m.indicators.log = m.log @@ -245,14 +245,14 @@ def check_fs(ctx, iocs, output, fast, dump_path, list_modules, module): #============================================================================== @cli.command("check-iocs", help="Compare stored JSON results to provided indicators") @click.option("--iocs", "-i", type=click.Path(exists=True), multiple=True, - default=[], required=True, help=HELP_MSG_IOC) + default=[], help=HELP_MSG_IOC) @click.option("--list-modules", "-l", is_flag=True, help=HELP_MSG_LIST_MODULES) @click.option("--module", "-m", help=HELP_MSG_MODULE) @click.argument("FOLDER", type=click.Path(exists=True)) @click.pass_context def check_iocs(ctx, iocs, list_modules, module, folder): all_modules = [] - for entry in BACKUP_MODULES + FS_MODULES: + for entry in BACKUP_MODULES + FS_MODULES + MIXED_MODULES: if entry not in all_modules: all_modules.append(entry) @@ -284,7 +284,7 @@ def check_iocs(ctx, iocs, list_modules, module, folder): m = iocs_module.from_json(file_path, log=logging.getLogger(iocs_module.__module__)) - if indicators.ioc_count: + if indicators.total_ioc_count > 0: m.indicators = indicators m.indicators.log = m.log diff --git a/mvt/ios/modules/backup/manifest.py b/mvt/ios/modules/backup/manifest.py index 41352d0..1d71a7f 100644 --- a/mvt/ios/modules/backup/manifest.py +++ b/mvt/ios/modules/backup/manifest.py @@ -72,9 +72,7 @@ class Manifest(IOSExtraction): return for result in self.results: - if "relative_path" not in result: - continue - if not result["relative_path"]: + if not result.get("relative_path"): continue if result["domain"]: @@ -84,15 +82,14 @@ class Manifest(IOSExtraction): continue if self.indicators.check_file_path("/" + result["relative_path"]): - self.log.warning("Found a known malicious file at path: %s", result["relative_path"]) self.detected.append(result) continue - relPath = result["relative_path"].lower() - for ioc in self.indicators.ioc_domains: - if ioc.lower() in relPath: + rel_path = result["relative_path"].lower() + for ioc in self.indicators.get_iocs("domains"): + if ioc["value"].lower() in rel_path: self.log.warning("Found mention of domain \"%s\" in a backup file with path: %s", - ioc, relPath) + ioc["value"], rel_path) self.detected.append(result) def run(self): diff --git a/mvt/ios/modules/fs/analytics.py b/mvt/ios/modules/fs/analytics.py index 7567f9d..f399a1d 100644 --- a/mvt/ios/modules/fs/analytics.py +++ b/mvt/ios/modules/fs/analytics.py @@ -37,20 +37,20 @@ class Analytics(IOSExtraction): return for result in self.results: - for ioc in self.indicators.ioc_processes: - for key in result.keys(): - if ioc == result[key]: - self.log.warning("Found mention of a malicious process \"%s\" in %s file at %s", - ioc, result["artifact"], result["timestamp"]) - self.detected.append(result) - break - for ioc in self.indicators.ioc_domains: - for key in result.keys(): - if ioc in str(result[key]): - self.log.warning("Found mention of a malicious domain \"%s\" in %s file at %s", - ioc, result["artifact"], result["timestamp"]) - self.detected.append(result) - break + for value in result.values(): + if not isinstance(value, str): + continue + + if self.indicators.check_process(value): + self.log.warning("Found mention of a malicious process \"%s\" in %s file at %s", + value, result["artifact"], result["timestamp"]) + self.detected.append(result) + continue + + if self.indicators.check_domain(value): + self.log.warning("Found mention of a malicious domain \"%s\" in %s file at %s", + value, result["artifact"], result["timestamp"]) + self.detected.append(result) def _extract_analytics_data(self): artifact = self.file_path.split("/")[-1] @@ -101,6 +101,7 @@ class Analytics(IOSExtraction): timestamp = "" data = plistlib.loads(row[1]) data["timestamp"] = timestamp + data["artifact"] = artifact self.results.append(data) diff --git a/mvt/ios/modules/fs/cache_files.py b/mvt/ios/modules/fs/cache_files.py index eaa5e8d..522360d 100644 --- a/mvt/ios/modules/fs/cache_files.py +++ b/mvt/ios/modules/fs/cache_files.py @@ -34,13 +34,13 @@ class CacheFiles(IOSExtraction): return self.detected = {} - for key, items in self.results.items(): - for item in items: - if self.indicators.check_domain(item["url"]): + for key, values in self.results.items(): + for value in values: + if self.indicators.check_domain(value["url"]): if key not in self.detected: - self.detected[key] = [item, ] + self.detected[key] = [value, ] else: - self.detected[key].append(item) + self.detected[key].append(value) def _process_cache_file(self, file_path): self.log.info("Processing cache file at path: %s", file_path) diff --git a/mvt/ios/modules/fs/filesystem.py b/mvt/ios/modules/fs/filesystem.py index c68ee82..c81c897 100644 --- a/mvt/ios/modules/fs/filesystem.py +++ b/mvt/ios/modules/fs/filesystem.py @@ -37,19 +37,20 @@ class Filesystem(IOSExtraction): return for result in self.results: + if "path" not in result: + continue + if self.indicators.check_file_path(result["path"]): - self.log.warning("Found a known malicious file path at path: %s", result["path"]) self.detected.append(result) - # If we are instructed to run fast, we skip this. + # If we are instructed to run fast, we skip the rest. if self.fast_mode: - self.log.info("Flag --fast was enabled: skipping extended search for suspicious files/processes") - else: - for ioc in self.indicators.ioc_processes: - parts = result["path"].split("/") - if ioc in parts: - self.log.warning("Found a known malicious file/process at path: %s", result["path"]) - self.detected.append(result) + continue + + for ioc in ioc_file.get_iocs("processes"): + parts = result["path"].split("/") + if ioc in parts: + self.detected.append(result) def run(self): for root, dirs, files in os.walk(self.base_folder): diff --git a/mvt/ios/modules/fs/shutdownlog.py b/mvt/ios/modules/fs/shutdownlog.py index 1893e26..5efcdb6 100644 --- a/mvt/ios/modules/fs/shutdownlog.py +++ b/mvt/ios/modules/fs/shutdownlog.py @@ -35,12 +35,10 @@ class ShutdownLog(IOSExtraction): for result in self.results: if self.indicators.check_file_path(result["client"]): - self.log.warning("Found mention of a known malicious file \"%s\" in shutdown.log", - result["client"]) self.detected.append(result) continue - for ioc in self.indicators.ioc_processes: + for ioc in self.indicators.get_iocs("processes"): parts = result["client"].split("/") if ioc in parts: self.log.warning("Found mention of a known malicious process \"%s\" in shutdown.log", diff --git a/mvt/ios/modules/mixed/whatsapp.py b/mvt/ios/modules/mixed/whatsapp.py index ed729a6..42c5c7b 100644 --- a/mvt/ios/modules/mixed/whatsapp.py +++ b/mvt/ios/modules/mixed/whatsapp.py @@ -35,6 +35,7 @@ class Whatsapp(IOSExtraction): links_text = "" if record["links"]: links_text = " - Embedded links: " + ", ".join(record["links"]) + return { "timestamp": record.get("isodate"), "module": self.__class__.__name__, @@ -47,7 +48,7 @@ class Whatsapp(IOSExtraction): return for message in self.results: - if self.indicators.check_domains(message["links"]): + if self.indicators.check_domains(message.get("links", [])): self.detected.append(message) def run(self): @@ -83,14 +84,15 @@ class Whatsapp(IOSExtraction): message["isodate"] = convert_timestamp_to_iso(convert_mactime_to_unix(message.get("ZMESSAGEDATE"))) message["ZTEXT"] = message["ZTEXT"] if message["ZTEXT"] else "" - # Extract links from the WhatsApp message. URLs can be stored in multiple fields/columns. Check each of them! + # Extract links from the WhatsApp message. URLs can be stored in multiple fields/columns. + # Check each of them! message_links = [] fields_with_links = ["ZTEXT", "ZMATCHEDTEXT", "ZMEDIAURL", "ZCONTENT1", "ZCONTENT2"] for field in fields_with_links: if message.get(field): message_links.extend(check_for_links(message.get(field, ""))) - # Remove WhatsApp internal media URLs + # Remove WhatsApp internal media URLs. filtered_links = [] for link in message_links: if not (link.startswith("https://mmg-fna.whatsapp.net/") or link.startswith("https://mmg.whatsapp.net/")): diff --git a/tests/common/test_indicators.py b/tests/common/test_indicators.py index 4e95405..6ee516f 100644 --- a/tests/common/test_indicators.py +++ b/tests/common/test_indicators.py @@ -13,11 +13,11 @@ class TestIndicators: def test_parse_stix2(self, indicator_file): ind = Indicators(log=logging) ind.load_indicators_files([indicator_file], load_default=False) - assert ind.ioc_count == 4 - assert len(ind.ioc_domains) == 1 - assert len(ind.ioc_emails) == 1 - assert len(ind.ioc_files) == 1 - assert len(ind.ioc_processes) == 1 + assert ind.ioc_files[0]["count"] == 4 + assert len(ind.ioc_files[0]["domains"]) == 1 + assert len(ind.ioc_files[0]["emails"]) == 1 + assert len(ind.ioc_files[0]["file_names"]) == 1 + assert len(ind.ioc_files[0]["processes"]) == 1 def test_check_domain(self, indicator_file): ind = Indicators(log=logging) @@ -28,5 +28,5 @@ class TestIndicators: def test_env_stix(self, indicator_file): os.environ["MVT_STIX2"] = indicator_file ind = Indicators(log=logging) - ind.load_indicators_files([indicator_file], load_default=False) - assert ind.ioc_count == 4 + ind.load_indicators_files([], load_default=False) + assert ind.total_ioc_count == 4 diff --git a/tests/ios/test_datausage.py b/tests/ios/test_datausage.py index 6053d65..df214a4 100644 --- a/tests/ios/test_datausage.py +++ b/tests/ios/test_datausage.py @@ -25,7 +25,7 @@ class TestDatausageModule: ind = Indicators(log=logging) ind.parse_stix2(indicator_file) # Adds a file that exists in the manifest. - ind.ioc_processes[0] = "CumulativeUsageTracker" + ind.ioc_files[0]["processes"].append("CumulativeUsageTracker") m.indicators = ind run_module(m) assert len(m.detected) == 2 diff --git a/tests/ios/test_manifest.py b/tests/ios/test_manifest.py index 749eefc..a4bdfb9 100644 --- a/tests/ios/test_manifest.py +++ b/tests/ios/test_manifest.py @@ -24,8 +24,7 @@ class TestManifestModule: m = Manifest(base_folder=get_backup_folder(), log=logging, results=[]) ind = Indicators(log=logging) ind.parse_stix2(indicator_file) - # Adds a file that exists in the manifest - ind.ioc_files[0] = "com.apple.CoreBrightness.plist" + ind.ioc_files[0]["file_names"].append("com.apple.CoreBrightness.plist") m.indicators = ind run_module(m) assert len(m.detected) == 1