diff --git a/mvt/android/cli.py b/mvt/android/cli.py index 790fc6e..01db812 100644 --- a/mvt/android/cli.py +++ b/mvt/android/cli.py @@ -5,6 +5,8 @@ import logging import os +from pathlib import Path +from zipfile import ZipFile import click from rich.logging import RichHandler @@ -21,6 +23,7 @@ from .lookups.koodous import koodous_lookup from .lookups.virustotal import virustotal_lookup from .modules.adb import ADB_MODULES from .modules.backup import BACKUP_MODULES +from .modules.bugreport import BUGREPORT_MODULES # Setup logging using Rich. LOG_FORMAT = "[%(name)s] %(message)s" @@ -46,7 +49,7 @@ def version(): #============================================================================== -# Download APKs +# Command: download-apks #============================================================================== @cli.command("download-apks", help="Download all or non-safelisted installed APKs installed on the device") @click.option("--serial", "-s", type=str, help=HELP_MSG_SERIAL) @@ -99,7 +102,7 @@ def download_apks(ctx, all_apks, virustotal, koodous, all_checks, output, from_f #============================================================================== -# Checks through ADB +# Command: check-adb #============================================================================== @cli.command("check-adb", help="Check an Android device over adb") @click.option("--serial", "-s", type=str, help=HELP_MSG_SERIAL) @@ -157,7 +160,81 @@ def check_adb(ctx, iocs, output, fast, list_modules, module, serial): #============================================================================== -# Check ADB backup +# Command: check-bugreport +#============================================================================== +@cli.command("check-bugreport", help="Check an Android Bug Report") +@click.option("--iocs", "-i", type=click.Path(exists=True), multiple=True, + default=[], help=HELP_MSG_IOC) +@click.option("--output", "-o", type=click.Path(exists=False), help=HELP_MSG_OUTPUT) +@click.option("--list-modules", "-l", is_flag=True, help=HELP_MSG_LIST_MODULES) +@click.option("--module", "-m", help=HELP_MSG_MODULE) +@click.argument("BUGREPORT_PATH", type=click.Path(exists=True)) +@click.pass_context +def check_bugreport(ctx, iocs, output, list_modules, module, bugreport_path): + if list_modules: + log.info("Following is the list of available check-bugreport modules:") + for adb_module in BUGREPORT_MODULES: + log.info(" - %s", adb_module.__name__) + + return + + log.info("Checking an Android Bug Report located at: %s", bugreport_path) + + if output and not os.path.exists(output): + try: + os.makedirs(output) + except Exception as e: + log.critical("Unable to create output folder %s: %s", output, e) + ctx.exit(1) + + indicators = Indicators(log=log) + indicators.load_indicators_files(iocs) + + if os.path.isfile(bugreport_path): + bugreport_format = "zip" + zip_archive = ZipFile(bugreport_path) + zip_files = [] + for file_name in zip_archive.namelist(): + zip_files.append(file_name) + elif os.path.isdir(bugreport_path): + bugreport_format = "dir" + folder_files = [] + parent_path = Path(bugreport_path).absolute().as_posix() + for root, subdirs, subfiles in os.walk(os.path.abspath(bugreport_path)): + for file_name in subfiles: + folder_files.append(os.path.relpath(os.path.join(root, file_name), parent_path)) + + timeline = [] + timeline_detected = [] + for bugreport_module in BUGREPORT_MODULES: + if module and bugreport_module.__name__ != module: + continue + + m = bugreport_module(base_folder=bugreport_path, output_folder=output, + log=logging.getLogger(bugreport_module.__module__)) + + if bugreport_format == "zip": + m.from_zip(zip_archive, zip_files) + else: + m.from_folder(bugreport_path, folder_files) + + if indicators.total_ioc_count: + m.indicators = indicators + m.indicators.log = m.log + + run_module(m) + timeline.extend(m.timeline) + timeline_detected.extend(m.timeline_detected) + + if output: + if len(timeline) > 0: + save_timeline(timeline, os.path.join(output, "timeline.csv")) + if len(timeline_detected) > 0: + save_timeline(timeline_detected, os.path.join(output, "timeline_detected.csv")) + + +#============================================================================== +# Command: check-backup #============================================================================== @cli.command("check-backup", help="Check an Android Backup") @click.option("--serial", "-s", type=str, help=HELP_MSG_SERIAL) diff --git a/mvt/android/modules/adb/dumpsys_accessibility.py b/mvt/android/modules/adb/dumpsys_accessibility.py index e3bae4e..511f310 100644 --- a/mvt/android/modules/adb/dumpsys_accessibility.py +++ b/mvt/android/modules/adb/dumpsys_accessibility.py @@ -47,7 +47,6 @@ class DumpsysAccessibility(AndroidExtraction): break service = line.split(":")[1].strip() - log.info("Found installed accessibility service \"%s\"", service) results.append({ "package_name": service.split("/")[0], @@ -62,6 +61,9 @@ class DumpsysAccessibility(AndroidExtraction): output = self._adb_command("dumpsys accessibility") self.results = self.parse_accessibility(output) + for result in self.results: + log.info("Found installed accessibility service \"%s\"", result.get("service")) + self.log.info("Identified a total of %d accessibility services", len(self.results)) self._adb_disconnect() diff --git a/mvt/android/modules/adb/dumpsys_battery_daily.py b/mvt/android/modules/adb/dumpsys_battery_daily.py index 934f87c..c1abc7d 100644 --- a/mvt/android/modules/adb/dumpsys_battery_daily.py +++ b/mvt/android/modules/adb/dumpsys_battery_daily.py @@ -39,23 +39,22 @@ class DumpsysBatteryDaily(AndroidExtraction): continue @staticmethod - def parse_battery_history(output): + def parse_battery_daily(output): results = [] daily = None daily_updates = [] for line in output.split("\n")[1:]: if line.startswith(" Daily from "): + if len(daily_updates) > 0: + results.extend(daily_updates) + daily_updates = [] + timeframe = line[13:].strip() date_from, date_to = timeframe.strip(":").split(" to ", 1) daily = {"from": date_from[0:10], "to": date_to[0:10]} - - if not daily: continue - if line.strip() == "": - results.extend(daily_updates) - daily = None - daily_updates = [] + if not daily: continue if not line.strip().startswith("Update "): @@ -86,7 +85,7 @@ class DumpsysBatteryDaily(AndroidExtraction): self._adb_connect() output = self._adb_command("dumpsys batterystats --daily") - self.results = self.parse_battery_history(output) + self.results = self.parse_battery_daily(output) self.log.info("Extracted %d records from battery daily stats", len(self.results)) diff --git a/mvt/android/modules/adb/packages.py b/mvt/android/modules/adb/packages.py index 3875e71..cb356de 100644 --- a/mvt/android/modules/adb/packages.py +++ b/mvt/android/modules/adb/packages.py @@ -39,6 +39,7 @@ DANGEROUS_PERMISSIONS = [ "com.android.browser.permission.READ_HISTORY_BOOKMARKS", ] + class Packages(AndroidExtraction): """This module extracts the list of installed packages.""" diff --git a/mvt/android/modules/bugreport/__init__.py b/mvt/android/modules/bugreport/__init__.py new file mode 100644 index 0000000..d7f3943 --- /dev/null +++ b/mvt/android/modules/bugreport/__init__.py @@ -0,0 +1,15 @@ +# Mobile Verification Toolkit (MVT) +# Copyright (c) 2021-2022 The MVT Project Authors. +# Use of this software is governed by the MVT License 1.1 that can be found at +# https://license.mvt.re/1.1/ + +from .accessibility import Accessibility +from .activities import Activities +from .battery_daily import BatteryDaily +from .battery_history import BatteryHistory +from .dbinfo import DBInfo +from .packages import Packages +from .receivers import Receivers + +BUGREPORT_MODULES = [Accessibility, Activities, BatteryDaily, BatteryHistory, + DBInfo, Packages, Receivers] diff --git a/mvt/android/modules/bugreport/accessibility.py b/mvt/android/modules/bugreport/accessibility.py new file mode 100644 index 0000000..e0a96d0 --- /dev/null +++ b/mvt/android/modules/bugreport/accessibility.py @@ -0,0 +1,63 @@ +# Mobile Verification Toolkit (MVT) +# Copyright (c) 2021-2022 The MVT Project Authors. +# Use of this software is governed by the MVT License 1.1 that can be found at +# https://license.mvt.re/1.1/ + +import logging + +from mvt.android.modules.adb.dumpsys_accessibility import DumpsysAccessibility + +from .base import BugReportModule + +log = logging.getLogger(__name__) + + +class Accessibility(BugReportModule): + """This module extracts stats on accessibility.""" + + def __init__(self, file_path=None, base_folder=None, output_folder=None, + serial=None, fast_mode=False, log=None, results=[]): + super().__init__(file_path=file_path, base_folder=base_folder, + output_folder=output_folder, fast_mode=fast_mode, + log=log, results=results) + + def check_indicators(self): + if not self.indicators: + return + + for result in self.results: + ioc = self.indicators.check_app_id(result["package_name"]) + if ioc: + result["matched_indicator"] = ioc + self.detected.append(result) + continue + + def run(self): + dumpstate_files = self._get_files_by_pattern("dumpstate-*") + if not dumpstate_files: + return + + content = self._get_file_content(dumpstate_files[0]) + if not content: + return + + lines = [] + in_accessibility = False + for line in content.decode().split("\n"): + if line.strip() == "DUMP OF SERVICE accessibility:": + in_accessibility = True + continue + + if not in_accessibility: + continue + + if line.strip() == "------------------------------------------------------------------------------": + break + + lines.append(line) + + self.results = DumpsysAccessibility.parse_accessibility("\n".join(lines)) + for result in self.results: + log.info("Found installed accessibility service \"%s\"", result.get("service")) + + self.log.info("Identified a total of %d accessibility services", len(self.results)) diff --git a/mvt/android/modules/bugreport/activities.py b/mvt/android/modules/bugreport/activities.py new file mode 100644 index 0000000..e092c65 --- /dev/null +++ b/mvt/android/modules/bugreport/activities.py @@ -0,0 +1,62 @@ +# Mobile Verification Toolkit (MVT) +# Copyright (c) 2021-2022 The MVT Project Authors. +# Use of this software is governed by the MVT License 1.1 that can be found at +# https://license.mvt.re/1.1/ + +import logging + +from mvt.android.modules.adb.dumpsys_activities import DumpsysActivities + +from .base import BugReportModule + +log = logging.getLogger(__name__) + + +class Activities(BugReportModule): + """This module extracts details on receivers for risky activities.""" + + def __init__(self, file_path=None, base_folder=None, output_folder=None, + serial=None, fast_mode=False, log=None, results=[]): + super().__init__(file_path=file_path, base_folder=base_folder, + output_folder=output_folder, fast_mode=fast_mode, + log=log, results=results) + + self.results = results if results else {} + + def check_indicators(self): + if not self.indicators: + return + + for intent, activities in self.results.items(): + for activity in activities: + ioc = self.indicators.check_app_id(activity["package_name"]) + if ioc: + activity["matched_indicator"] = ioc + self.detected.append({intent: activity}) + continue + + def run(self): + dumpstate_files = self._get_files_by_pattern("dumpstate-*") + if not dumpstate_files: + return + + content = self._get_file_content(dumpstate_files[0]) + if not content: + return + + lines = [] + in_package = False + for line in content.decode().split("\n"): + if line.strip() == "DUMP OF SERVICE package:": + in_package = True + continue + + if not in_package: + continue + + if line.strip() == "------------------------------------------------------------------------------": + break + + lines.append(line) + + self.results = DumpsysActivities.parse_activity_resolver_table("\n".join(lines)) diff --git a/mvt/android/modules/bugreport/base.py b/mvt/android/modules/bugreport/base.py new file mode 100644 index 0000000..94909a1 --- /dev/null +++ b/mvt/android/modules/bugreport/base.py @@ -0,0 +1,47 @@ +# Mobile Verification Toolkit (MVT) +# Copyright (c) 2021-2022 Claudio Guarnieri. +# See the file 'LICENSE' for usage and copying permissions, or find a copy at +# https://github.com/mvt-project/mvt/blob/main/LICENSE + +import fnmatch +import logging +import os + +from mvt.common.module import MVTModule + +log = logging.getLogger(__name__) + + +class BugReportModule(MVTModule): + """This class provides a base for all Android Bug Report modules.""" + + zip_archive = None + + def from_folder(self, extract_path, extract_files): + self.extract_path = extract_path + self.extract_files = extract_files + + def from_zip(self, zip_archive, zip_files): + self.zip_archive = zip_archive + self.zip_files = zip_files + + def _get_files_by_pattern(self, pattern): + file_names = [] + if self.zip_archive: + for zip_file in self.zip_files: + file_names.append(zip_file) + else: + file_names = self.extract_files + + return fnmatch.filter(file_names, pattern) + + def _get_file_content(self, file_path): + if self.zip_archive: + handle = self.zip_archive.open(file_path) + else: + handle = open(os.path.join(self.extract_path, file_path), "rb") + + data = handle.read() + handle.close() + + return data diff --git a/mvt/android/modules/bugreport/battery_daily.py b/mvt/android/modules/bugreport/battery_daily.py new file mode 100644 index 0000000..c330cb2 --- /dev/null +++ b/mvt/android/modules/bugreport/battery_daily.py @@ -0,0 +1,76 @@ +# Mobile Verification Toolkit (MVT) +# Copyright (c) 2021-2022 The MVT Project Authors. +# Use of this software is governed by the MVT License 1.1 that can be found at +# https://license.mvt.re/1.1/ + +import logging + +from mvt.android.modules.adb.dumpsys_battery_daily import DumpsysBatteryDaily + +from .base import BugReportModule + +log = logging.getLogger(__name__) + + +class BatteryDaily(BugReportModule): + """This module extracts records from battery daily updates.""" + + def __init__(self, file_path=None, base_folder=None, output_folder=None, + serial=None, fast_mode=False, log=None, results=[]): + super().__init__(file_path=file_path, base_folder=base_folder, + output_folder=output_folder, fast_mode=fast_mode, + log=log, results=results) + + def serialize(self, record): + return { + "timestamp": record["from"], + "module": self.__class__.__name__, + "event": "battery_daily", + "data": f"Recorded update of package {record['package_name']} with vers {record['vers']}" + } + + def check_indicators(self): + if not self.indicators: + return + + for result in self.results: + ioc = self.indicators.check_app_id(result["package_name"]) + if ioc: + result["matched_indicator"] = ioc + self.detected.append(result) + continue + + def run(self): + dumpstate_files = self._get_files_by_pattern("dumpstate-*") + if not dumpstate_files: + return + + content = self._get_file_content(dumpstate_files[0]) + if not content: + return + + lines = [] + in_batterystats = False + in_daily = False + for line in content.decode().split("\n"): + if line.strip() == "DUMP OF SERVICE batterystats:": + in_batterystats = True + continue + + if not in_batterystats: + continue + + if line.strip() == "Daily stats:": + lines.append(line) + in_daily = True + continue + + if not in_daily: + continue + + if line.strip() == "": + break + + lines.append(line) + + self.results = DumpsysBatteryDaily.parse_battery_daily("\n".join(lines)) diff --git a/mvt/android/modules/bugreport/battery_history.py b/mvt/android/modules/bugreport/battery_history.py new file mode 100644 index 0000000..c0e9697 --- /dev/null +++ b/mvt/android/modules/bugreport/battery_history.py @@ -0,0 +1,69 @@ +# Mobile Verification Toolkit (MVT) +# Copyright (c) 2021-2022 The MVT Project Authors. +# Use of this software is governed by the MVT License 1.1 that can be found at +# https://license.mvt.re/1.1/ + +import logging + +from mvt.android.modules.adb.dumpsys_battery_history import \ + DumpsysBatteryHistory + +from .base import BugReportModule + +log = logging.getLogger(__name__) + + +class BatteryHistory(BugReportModule): + """This module extracts records from battery daily updates.""" + + def __init__(self, file_path=None, base_folder=None, output_folder=None, + serial=None, fast_mode=False, log=None, results=[]): + super().__init__(file_path=file_path, base_folder=base_folder, + output_folder=output_folder, fast_mode=fast_mode, + log=log, results=results) + + def check_indicators(self): + if not self.indicators: + return + + for result in self.results: + ioc = self.indicators.check_app_id(result["package_name"]) + if ioc: + result["matched_indicator"] = ioc + self.detected.append(result) + continue + + def run(self): + dumpstate_files = self._get_files_by_pattern("dumpstate-*") + if not dumpstate_files: + return + + content = self._get_file_content(dumpstate_files[0]) + if not content: + return + + lines = [] + in_batterystats = False + in_history = False + for line in content.decode().split("\n"): + if line.strip() == "********** Print latest newbatterystats **********": + in_batterystats = True + continue + + if not in_batterystats: + continue + + if line.strip().startswith("Battery History "): + lines.append(line) + in_history = True + continue + + if not in_history: + continue + + if line.strip() == "": + break + + lines.append(line) + + self.results = DumpsysBatteryHistory.parse_battery_history("\n".join(lines)) diff --git a/mvt/android/modules/bugreport/dbinfo.py b/mvt/android/modules/bugreport/dbinfo.py new file mode 100644 index 0000000..c125b52 --- /dev/null +++ b/mvt/android/modules/bugreport/dbinfo.py @@ -0,0 +1,63 @@ +# Mobile Verification Toolkit (MVT) +# Copyright (c) 2021-2022 The MVT Project Authors. +# Use of this software is governed by the MVT License 1.1 that can be found at +# https://license.mvt.re/1.1/ + +import logging + +from mvt.android.modules.adb.dumpsys_dbinfo import DumpsysDBInfo + +from .base import BugReportModule + +log = logging.getLogger(__name__) + + +class DBInfo(BugReportModule): + """This module extracts records from battery daily updates.""" + + slug = "dbinfo" + + def __init__(self, file_path=None, base_folder=None, output_folder=None, + serial=None, fast_mode=False, log=None, results=[]): + super().__init__(file_path=file_path, base_folder=base_folder, + output_folder=output_folder, fast_mode=fast_mode, + log=log, results=results) + + def check_indicators(self): + if not self.indicators: + return + + for result in self.results: + path = result.get("path", "") + for part in path.split("/"): + ioc = self.indicators.check_app_id(part) + if ioc: + result["matched_indicator"] = ioc + self.detected.append(result) + continue + + def run(self): + dumpstate_files = self._get_files_by_pattern("dumpstate-*") + if not dumpstate_files: + return + + content = self._get_file_content(dumpstate_files[0]) + if not content: + return + + in_dbinfo = False + lines = [] + for line in content.decode().split("\n"): + if line.strip() == "DUMP OF SERVICE dbinfo:": + in_dbinfo = True + continue + + if not in_dbinfo: + continue + + if line.strip() == "------------------------------------------------------------------------------": + break + + lines.append(line) + + self.results = DumpsysDBInfo.parse_dbinfo("\n".join(lines)) diff --git a/mvt/android/modules/bugreport/packages.py b/mvt/android/modules/bugreport/packages.py new file mode 100644 index 0000000..d1c1f22 --- /dev/null +++ b/mvt/android/modules/bugreport/packages.py @@ -0,0 +1,118 @@ +# Mobile Verification Toolkit (MVT) +# Copyright (c) 2021-2022 The MVT Project Authors. +# Use of this software is governed by the MVT License 1.1 that can be found at +# https://license.mvt.re/1.1/ + +import logging +import re + +from mvt.android.modules.adb.packages import Packages as PCK + +from .base import BugReportModule + +log = logging.getLogger(__name__) + + +class Packages(BugReportModule): + """This module extracts details on receivers for risky activities.""" + + def __init__(self, file_path=None, base_folder=None, output_folder=None, + serial=None, fast_mode=False, log=None, results=[]): + super().__init__(file_path=file_path, base_folder=base_folder, + output_folder=output_folder, fast_mode=fast_mode, + log=log, results=results) + + def serialize(self, record): + records = [] + + timestamps = [ + {"event": "package_install", "timestamp": record["timestamp"]}, + {"event": "package_first_install", "timestamp": record["first_install_time"]}, + {"event": "package_last_update", "timestamp": record["last_update_time"]}, + ] + + for ts in timestamps: + records.append({ + "timestamp": ts["timestamp"], + "module": self.__class__.__name__, + "event": ts["event"], + "data": f"Install or update of package {record['package_name']}", + }) + + return records + + def check_indicators(self): + if not self.indicators: + return + + for result in self.results: + ioc = self.indicators.check_app_id(result["package_name"]) + if ioc: + result["matched_indicator"] = ioc + self.detected.append(result) + continue + + @staticmethod + def parse_packages_list(output): + pkg_rxp = re.compile(r" Package \[(.+?)\].*") + + results = [] + package_name = None + package = {} + lines = [] + for line in output.split("\n"): + if line.startswith(" Package ["): + if len(lines) > 0: + details = PCK.parse_package_for_details("\n".join(lines)) + package.update(details) + results.append(package) + package = {} + + matches = pkg_rxp.findall(line) + if not matches: + continue + + package_name = matches[0] + package["package_name"] = package_name + continue + + if not package_name: + continue + + lines.append(line) + + return results + + def run(self): + dumpstate_files = self._get_files_by_pattern("dumpstate-*") + if not dumpstate_files: + return + + content = self._get_file_content(dumpstate_files[0]) + if not content: + return + + in_package = False + in_packages_list = False + lines = [] + for line in content.decode().split("\n"): + if line.strip() == "DUMP OF SERVICE package:": + in_package = True + continue + + if not in_package: + continue + + if line.strip() == "Packages:": + in_packages_list = True + continue + + if not in_packages_list: + continue + + if line.strip() == "": + break + + lines.append(line) + + self.results = self.parse_packages_list("\n".join(lines)) diff --git a/mvt/android/modules/bugreport/receivers.py b/mvt/android/modules/bugreport/receivers.py new file mode 100644 index 0000000..4954c9a --- /dev/null +++ b/mvt/android/modules/bugreport/receivers.py @@ -0,0 +1,84 @@ +# Mobile Verification Toolkit (MVT) +# Copyright (c) 2021-2022 The MVT Project Authors. +# Use of this software is governed by the MVT License 1.1 that can be found at +# https://license.mvt.re/1.1/ + +import logging + +from mvt.android.modules.adb.dumpsys_receivers import DumpsysReceivers + +from .base import BugReportModule + +log = logging.getLogger(__name__) + +INTENT_NEW_OUTGOING_SMS = "android.provider.Telephony.NEW_OUTGOING_SMS" +INTENT_SMS_RECEIVED = "android.provider.Telephony.SMS_RECEIVED" +INTENT_DATA_SMS_RECEIVED = "android.intent.action.DATA_SMS_RECEIVED" +INTENT_PHONE_STATE = "android.intent.action.PHONE_STATE" +INTENT_NEW_OUTGOING_CALL = "android.intent.action.NEW_OUTGOING_CALL" + + +class Receivers(BugReportModule): + """This module extracts details on receivers for risky activities.""" + + def __init__(self, file_path=None, base_folder=None, output_folder=None, + serial=None, fast_mode=False, log=None, results=[]): + super().__init__(file_path=file_path, base_folder=base_folder, + output_folder=output_folder, fast_mode=fast_mode, + log=log, results=results) + + self.results = results if results else {} + + def check_indicators(self): + if not self.indicators: + return + + for intent, receivers in self.results.items(): + for receiver in receivers: + if intent == INTENT_NEW_OUTGOING_SMS: + self.log.info("Found a receiver to intercept outgoing SMS messages: \"%s\"", + receiver["receiver"]) + elif intent == INTENT_SMS_RECEIVED: + self.log.info("Found a receiver to intercept incoming SMS messages: \"%s\"", + receiver["receiver"]) + elif intent == INTENT_DATA_SMS_RECEIVED: + self.log.info("Found a receiver to intercept incoming data SMS message: \"%s\"", + receiver["receiver"]) + elif intent == INTENT_PHONE_STATE: + self.log.info("Found a receiver monitoring telephony state/incoming calls: \"%s\"", + receiver["receiver"]) + elif intent == INTENT_NEW_OUTGOING_CALL: + self.log.info("Found a receiver monitoring outgoing calls: \"%s\"", + receiver["receiver"]) + + ioc = self.indicators.check_app_id(receiver["package_name"]) + if ioc: + receiver["matched_indicator"] = ioc + self.detected.append({intent: receiver}) + continue + + def run(self): + dumpstate_files = self._get_files_by_pattern("dumpstate-*") + if not dumpstate_files: + return + + content = self._get_file_content(dumpstate_files[0]) + if not content: + return + + in_receivers = False + lines = [] + for line in content.decode().split("\n"): + if line.strip() == "DUMP OF SERVICE package:": + in_receivers = True + continue + + if not in_receivers: + continue + + if line.strip() == "------------------------------------------------------------------------------": + break + + lines.append(line) + + self.results = DumpsysReceivers.parse_receiver_resolver_table("\n".join(lines)) diff --git a/mvt/common/indicators.py b/mvt/common/indicators.py index 3bda7e2..339b35d 100644 --- a/mvt/common/indicators.py +++ b/mvt/common/indicators.py @@ -20,7 +20,7 @@ class Indicators: def __init__(self, log=None): self.data_dir = user_data_dir("mvt") self.log = log - self.ioc_files = [] + self.ioc_collections = [] self.total_ioc_count = 0 def _load_downloaded_indicators(self): @@ -33,7 +33,7 @@ class Indicators: def _check_stix2_env_variable(self): """ - Checks if a variable MVT_STIX2 contains path to STIX Files. + Checks if a variable MVT_STIX2 contains path to a STIX files. """ if "MVT_STIX2" not in os.environ: return @@ -43,14 +43,17 @@ class Indicators: if os.path.isfile(path): self.parse_stix2(path) else: - self.log.info("Invalid STIX2 path %s in MVT_STIX2 environment variable", path) + self.log.error("Path specified with env MVT_STIX2 is not a valid file: %s", + path) - def _generate_indicators_file(self): + def _new_collection(self, cid="", name="", description="", file_name="", + file_path=""): return { - "name": "", - "description": "", - "file_name": "", - "file_path": "", + "id": cid, + "name": name, + "description": description, + "stix2_file_name": file_name, + "stix2_file_path": file_path, "domains": [], "processes": [], "emails": [], @@ -62,10 +65,11 @@ class Indicators: "count": 0, } - def _add_indicator(self, ioc, ioc_file, iocs_list): - if ioc not in iocs_list: - iocs_list.append(ioc) - ioc_file["count"] += 1 + def _add_indicator(self, ioc, ioc_coll, ioc_coll_list): + ioc = ioc.strip("'") + if ioc not in ioc_coll_list: + ioc_coll_list.append(ioc) + ioc_coll["count"] += 1 self.total_ioc_count += 1 def parse_stix2(self, file_path): @@ -77,69 +81,106 @@ class Indicators: """ self.log.info("Parsing STIX2 indicators file at path %s", file_path) - ioc_file = self._generate_indicators_file() - ioc_file["file_path"] = file_path - ioc_file["file_name"] = os.path.basename(file_path) - with open(file_path, "r", encoding="utf-8") as handle: try: data = json.load(handle) except json.decoder.JSONDecodeError: - self.log.critical("Unable to parse STIX2 indicator file. The file is malformed or in the wrong format.") + self.log.critical("Unable to parse STIX2 indicator file. " \ + "The file is corrupted or in the wrong format!") return + malware = {} + indicators = [] + relationships = [] for entry in data.get("objects", []): entry_type = entry.get("type", "") if entry_type == "malware": - ioc_file["name"] = entry.get("name", "") or ioc_file["file_name"] - ioc_file["description"] = entry.get("description", "") or ioc_file["file_name"] - continue + malware[entry["id"]] = { + "name": entry["name"], + "description": entry["description"], + } + elif entry_type == "indicator": + indicators.append(entry) + elif entry_type == "relationship": + relationships.append(entry) - if entry_type != "indicator": - continue + collections = [] + for mal_id, mal_values in malware.items(): + collection = self._new_collection(mal_id, mal_values.get("name"), + mal_values.get("description"), + os.path.basename(file_path), + file_path) + collections.append(collection) - key, value = entry.get("pattern", "").strip("[]").split("=") - value = value.strip("'") + # We loop through all indicators. + for indicator in indicators: + malware_id = None + malware_name = None + malware_description = None - if key == "domain-name:value": - # We force domain names to lower case. - self._add_indicator(ioc=value.lower(), - ioc_file=ioc_file, - iocs_list=ioc_file["domains"]) - elif key == "process:name": - self._add_indicator(ioc=value, - ioc_file=ioc_file, - iocs_list=ioc_file["processes"]) - elif key == "email-addr:value": - # We force email addresses to lower case. - self._add_indicator(ioc=value.lower(), - ioc_file=ioc_file, - iocs_list=ioc_file["emails"]) - elif key == "file:name": - self._add_indicator(ioc=value, - ioc_file=ioc_file, - iocs_list=ioc_file["file_names"]) - elif key == "file:path": - self._add_indicator(ioc=value, - ioc_file=ioc_file, - iocs_list=ioc_file["file_paths"]) - elif key == "file:hashes.sha256": - self._add_indicator(ioc=value, - ioc_file=ioc_file, - iocs_list=ioc_file["files_sha256"]) - elif key == "app:id": - self._add_indicator(ioc=value, - ioc_file=ioc_file, - iocs_list=ioc_file["app_ids"]) - elif key == "configuration-profile:id": - self._add_indicator(ioc=value, - ioc_file=ioc_file, - iocs_list=ioc_file["ios_profile_ids"]) + # We loop through all relationships and find the one pertinent to + # the current indicator. + for relationship in relationships: + if relationship["source_ref"] != indicator["id"]: + continue - self.log.info("Loaded %d indicators from \"%s\" indicators file", - ioc_file["count"], ioc_file["name"]) + # Look for a malware definition with the correct identifier. + if relationship["target_ref"] in malware.keys(): + malware_id = relationship["target_ref"] + malware_name = malware[relationship["target_ref"]].get("name", "") + malware_description = malware[relationship["target_ref"]].get("description", "") + break - self.ioc_files.append(ioc_file) + # Now we look for the correct collection matching the malware ID we + # got from the relationship. + for collection in collections: + if collection["id"] != malware_id: + continue + + key, value = indicator.get("pattern", "").strip("[]").split("=") + + if key == "domain-name:value": + # We force domain names to lower case. + self._add_indicator(ioc=value.lower(), + ioc_coll=collection, + ioc_coll_list=collection["domains"]) + elif key == "process:name": + self._add_indicator(ioc=value, + ioc_coll=collection, + ioc_coll_list=collection["processes"]) + elif key == "email-addr:value": + # We force email addresses to lower case. + self._add_indicator(ioc=value.lower(), + ioc_coll=collection, + ioc_coll_list=collection["emails"]) + elif key == "file:name": + self._add_indicator(ioc=value, + ioc_coll=collection, + ioc_coll_list=collection["file_names"]) + elif key == "file:path": + self._add_indicator(ioc=value, + ioc_coll=collection, + ioc_coll_list=collection["file_paths"]) + elif key == "file:hashes.sha256": + self._add_indicator(ioc=value, + ioc_coll=collection, + ioc_coll_list=collection["files_sha256"]) + elif key == "app:id": + self._add_indicator(ioc=value, + ioc_coll=collection, + ioc_coll_list=collection["app_ids"]) + elif key == "configuration-profile:id": + self._add_indicator(ioc=value, + ioc_coll=collection, + ioc_coll_list=collection["ios_profile_ids"]) + + break + + for coll in collections: + self.log.info("Extracted %d indicators for collection with name \"%s\"", + coll["count"], coll["name"]) + + self.ioc_collections.extend(collections) def load_indicators_files(self, files, load_default=True): """ @@ -149,7 +190,8 @@ class Indicators: if os.path.isfile(file_path): self.parse_stix2(file_path) else: - self.log.warning("This indicators file %s does not exist", file_path) + self.log.warning("No indicators file exists at path %s", + file_path) # Load downloaded indicators and any indicators from env variable. if load_default: @@ -159,12 +201,13 @@ class Indicators: self.log.info("Loaded a total of %d unique indicators", self.total_ioc_count) def get_iocs(self, ioc_type): - for ioc_file in self.ioc_files: - for ioc in ioc_file.get(ioc_type, []): + for ioc_collection in self.ioc_collections: + for ioc in ioc_collection.get(ioc_type, []): yield { "value": ioc, "type": ioc_type, - "name": ioc_file["name"] + "name": ioc_collection["name"], + "stix2_file_name": ioc_collection["stix2_file_name"], } def check_domain(self, url): @@ -424,18 +467,18 @@ def download_indicators_files(log): for ioc_entry in res.json(): ioc_url = ioc_entry["stix2_url"] - log.info("Downloading indicator file '%s' from '%s'", ioc_entry["name"], ioc_url) + log.info("Downloading indicator file %s from %s", ioc_entry["name"], ioc_url) res = requests.get(ioc_url) if res.status_code != 200: - log.warning("Could not find indicator file '%s'", ioc_url) + log.warning("Could not find indicator file %s", ioc_url) continue clean_file_name = ioc_url.lstrip("https://").replace("/", "_") ioc_path = os.path.join(data_dir, clean_file_name) # Write file to disk. This will overwrite any older version of the STIX2 file. - with open(ioc_path, "w", encoding="utf-8") as f: - f.write(res.text) + with open(ioc_path, "w", encoding="utf-8") as handle: + handle.write(res.text) - log.info("Saved indicator file to '%s'", os.path.basename(ioc_path)) + log.info("Saved indicator file to %s", os.path.basename(ioc_path)) diff --git a/mvt/common/utils.py b/mvt/common/utils.py index 30eb1c4..202323a 100644 --- a/mvt/common/utils.py +++ b/mvt/common/utils.py @@ -73,7 +73,7 @@ def check_for_links(text): :returns: Search results. """ - return re.findall("(?Phttps?://[^\s]+)", text, re.IGNORECASE) + return re.findall(r"(?Phttps?://[^\s]+)", text, re.IGNORECASE) def get_sha256_from_file_path(file_path):