mirror of
https://github.com/mvt-project/mvt.git
synced 2026-06-10 00:43:53 +02:00
c782d79974
* Run bugreport and backup modules during check-androidqf Adding support to automatically run ADB backup and bugreport modules automatically when running the check-androidqf command. This is a first step to deduplicate the code for Android modules. * Deduplicate modules which are run by the sub-commands. * Raise the proper NoAndroidQFBackup exception when a back-up isn't found * Remove check-adb command and update docs * Remove check-apk code and old dependencies * Major refactor to add structured alerting and typed indicators This commit makes a structural change to MVT by changing binary detected/not detected logic into a structured multi-level system of alerts. This gives far more power to extend MVT and manage alerts. This commit also begins the process of adding proper typing for key objects used in MVT including Indicators, IndicatorMatches, and ModuleResults. This will also be keep to programmatically using the output of MVT. * Fix up, remove ADB module base * Rework old detections tracking into stuctured alert levels * Quote STIX path in log line * Fix profile events log line * Close open archive (zip/tar) file handles * Fix root_binaries and mounts modules to use alertstore * Update tests to use alertstore instead of detected attribute * Fix alertstore method calls - use high() instead of warning() * Fix remaining test errors - Add log_latest() call in root_binaries to log each alert - Fix UnboundLocalError in cmd_check_androidqf by initializing bugreport variable - Remove incorrect backup.close() call since load_backup() returns bytes - Remove duplicate from_ab method in cmd_check_backup that was using old attributes * Log alerts on add * Remove slug from alertstore calls * update alerts.py * update alerts.py * move indicator_match to alert object * . 
* - Remove timeline_detected and route to alertstore * fix typing for mypy * Remove unused type imports * Fix check_receiver_prefix and check_android_property_name - check_receiver_prefix() used dict syntax (ioc["value"]) on Indicator dataclass objects from get_iocs(). Changed to ioc.value/ioc.name. - check_receiver_prefix() returned raw ioc instead of IndicatorMatch. Now returns IndicatorMatch with descriptive message. - Fixed return type annotations on both methods to Optional[IndicatorMatch]. - Removed unused Union import. * Fix residual self.detected usage in packages and dumpsys_receivers These modules still used self.detected.append() which no longer exists after the alertstore migration. Converted to alertstore calls: - packages.py: ROOT_PACKAGES detection → alertstore.high() - dumpsys_receivers.py: receiver IOC match → alertstore.critical() * Fix SMS module alertstore.high() call passing slug as message The first argument was self.get_slug() (module slug) instead of a human-readable message. The module is already auto-detected via AlertStore._get_calling_module(). Also removed redundant log_latest(). * Apply suggestions from code review Fix JSON serialization in `module.save_to_json` and fix argument order in iOS alertstore calls. 
Co-authored-by: tes <tesitura@users.noreply.github.com> * Remove unsupported ADB modules * Fail removed check-adb command * Fix alert serialization and logging * Close sqlite connections in iOS modules * Fix DEBUG messages not reaching handlers, save_to_json for dictionary results and TypeError on mixed event_time types in safary_history * add matched_indicator via alertstore instead of directly modifying json objects * Alert on battery daily uninstall and downgrade * Lower alert severity to medium for suspicious items * Switch version to 2026.4.28 CalVer --------- Co-authored-by: Donncha Ó Cearbhaill <donncha.ocearbhaill@amnesty.org> Co-authored-by: tes <tesitura@users.noreply.github.com> Co-authored-by: Janik Besendorf <janik.besendorf@reporter-ohne-grenzen.de>
786 lines
28 KiB
Python
786 lines
28 KiB
Python
# Mobile Verification Toolkit (MVT)
|
|
# Copyright (c) 2021-2023 The MVT Authors.
|
|
# Use of this software is governed by the MVT License 1.1 that can be found at
|
|
# https://license.mvt.re/1.1/
|
|
|
|
import glob
|
|
import json
|
|
import logging
|
|
import os
|
|
from dataclasses import dataclass
|
|
from functools import lru_cache
|
|
from typing import Any, Dict, Iterator, List, Optional
|
|
|
|
import ahocorasick
|
|
from appdirs import user_data_dir
|
|
|
|
from .config import settings
|
|
from .url import URL
|
|
|
|
# Base data directory for MVT, as returned by appdirs' user_data_dir("mvt").
MVT_DATA_FOLDER = user_data_dir("mvt")
# Folder scanned by Indicators._load_downloaded_indicators() for ".stix2" files.
MVT_INDICATORS_FOLDER = os.path.join(MVT_DATA_FOLDER, "indicators")

# Module-level logger; also the default ``log`` argument of Indicators.
logger = logging.getLogger(__name__)
|
|
|
|
|
|
@dataclass
class Indicator:
    """A single indicator of compromise (IOC) value together with its origin."""

    # The indicator value itself, as stored in a collection list.
    value: str
    # Key of the collection list the value was read from (the ``ioc_type``
    # passed to Indicators.get_iocs(), e.g. "domains").
    type: str
    # Name of the collection the indicator belongs to.
    name: str
    # Base name of the STIX2 file the collection was parsed from.
    stix2_file_name: str
|
|
|
|
|
|
@dataclass
class IndicatorMatch:
    """Result returned by the ``check_*`` methods when an artifact matches."""

    # The indicator that matched.
    ioc: Indicator
    # Human-readable description of the match.
    message: str
|
|
|
|
|
|
class Indicators:
|
|
"""This class is used to parse indicators from a STIX2 file and provide
|
|
functions to compare extracted artifacts to the indicators.
|
|
"""
|
|
|
|
def __init__(self, log=logger) -> None:
    """Create an empty indicator store.

    :param log: Logger used for every message emitted by this instance;
                defaults to this module's logger

    """
    # Running total of unique indicators added across all collections.
    self.total_ioc_count = 0
    # One dictionary per parsed collection (see _new_collection()).
    self.ioc_collections: List[Dict[str, Any]] = []
    self.log = log
|
|
|
|
def _load_downloaded_indicators(self) -> None:
    """Parse every ``.stix2`` file located directly in the indicators folder.

    Nothing happens when ``MVT_INDICATORS_FOLDER`` does not exist. The
    extension check is case-insensitive and sub-folders are not searched.
    """
    if os.path.isdir(MVT_INDICATORS_FOLDER):
        stix2_names = (
            entry
            for entry in os.listdir(MVT_INDICATORS_FOLDER)
            if entry.lower().endswith(".stix2")
        )
        for stix2_name in stix2_names:
            self.parse_stix2(os.path.join(MVT_INDICATORS_FOLDER, stix2_name))
|
|
|
|
def _check_stix2_env_variable(self) -> None:
    """Load indicators from the paths listed in the ``STIX2`` setting.

    The setting holds one or more paths separated by ``:``. A path to a
    ``.stix2`` file is parsed directly; a directory is searched recursively
    for ``*.stix2`` files; anything else is reported as an error.
    """
    configured = settings.STIX2
    if not configured:
        return

    for candidate in configured.split(":"):
        if os.path.isfile(candidate) and candidate.lower().endswith(".stix2"):
            stix2_files = [candidate]
        elif os.path.isdir(candidate):
            stix2_files = glob.glob(
                os.path.join(candidate, "**", "*.stix2"), recursive=True
            )
        else:
            self.log.error(
                "Path specified with env MVT_STIX2 is not a valid path: '%s'",
                candidate,
            )
            continue

        for stix2_file in stix2_files:
            self.parse_stix2(stix2_file)
|
|
|
|
def _new_collection(
|
|
self,
|
|
cid: Optional[str] = None,
|
|
name: Optional[str] = None,
|
|
description: Optional[str] = None,
|
|
file_name: Optional[str] = None,
|
|
file_path: Optional[str] = None,
|
|
) -> dict:
|
|
return {
|
|
"id": cid,
|
|
"name": name,
|
|
"description": description,
|
|
"stix2_file_name": file_name,
|
|
"stix2_file_path": file_path,
|
|
"domains": [],
|
|
"processes": [],
|
|
"emails": [],
|
|
"file_names": [],
|
|
"file_paths": [],
|
|
"files_md5": [],
|
|
"files_sha1": [],
|
|
"files_sha256": [],
|
|
"app_cert_hashes": [],
|
|
"app_ids": [],
|
|
"ios_profile_ids": [],
|
|
"android_property_names": [],
|
|
"urls": [],
|
|
"count": 0,
|
|
}
|
|
|
|
def _add_indicator(self, ioc: str, ioc_coll: dict, ioc_coll_list: list) -> None:
|
|
ioc = ioc.replace("'", "").strip()
|
|
if ioc not in ioc_coll_list:
|
|
ioc_coll_list.append(ioc)
|
|
ioc_coll["count"] += 1
|
|
self.total_ioc_count += 1
|
|
|
|
def _process_indicator(self, indicator: dict, collection: dict) -> None:
|
|
key, value = indicator.get("pattern", "").strip("[]").split("=")
|
|
key = key.strip()
|
|
|
|
# Normalize hash algorithm keys so that both the STIX2-spec-compliant
|
|
# form (e.g. file:hashes.'SHA-256', which requires quotes around
|
|
# algorithm names that contain hyphens) and the non-standard lowercase
|
|
# form (e.g. file:hashes.sha256) are accepted. Strip single quotes and
|
|
# hyphens from the algorithm name only, then lowercase it.
|
|
for sep in ("hashes.", "cert."):
|
|
if sep in key:
|
|
prefix, _, algo = key.partition(sep)
|
|
key = prefix + sep + algo.replace("'", "").replace("-", "").lower()
|
|
break
|
|
|
|
if key == "domain-name:value":
|
|
# We force domain names to lower case.
|
|
self._add_indicator(
|
|
ioc=value.lower(),
|
|
ioc_coll=collection,
|
|
ioc_coll_list=collection["domains"],
|
|
)
|
|
elif key == "ipv4-addr:value":
|
|
# We treat IP addresses as simple domains here to ease checks.
|
|
self._add_indicator(
|
|
ioc=value.strip(),
|
|
ioc_coll=collection,
|
|
ioc_coll_list=collection["domains"],
|
|
)
|
|
elif key == "process:name":
|
|
self._add_indicator(
|
|
ioc=value, ioc_coll=collection, ioc_coll_list=collection["processes"]
|
|
)
|
|
elif key == "email-addr:value":
|
|
# We force email addresses to lower case.
|
|
self._add_indicator(
|
|
ioc=value.lower(),
|
|
ioc_coll=collection,
|
|
ioc_coll_list=collection["emails"],
|
|
)
|
|
elif key == "file:name":
|
|
self._add_indicator(
|
|
ioc=value, ioc_coll=collection, ioc_coll_list=collection["file_names"]
|
|
)
|
|
elif key == "file:path":
|
|
self._add_indicator(
|
|
ioc=value, ioc_coll=collection, ioc_coll_list=collection["file_paths"]
|
|
)
|
|
elif key == "file:hashes.md5":
|
|
self._add_indicator(
|
|
ioc=value, ioc_coll=collection, ioc_coll_list=collection["files_md5"]
|
|
)
|
|
elif key == "file:hashes.sha1":
|
|
self._add_indicator(
|
|
ioc=value, ioc_coll=collection, ioc_coll_list=collection["files_sha1"]
|
|
)
|
|
elif key == "file:hashes.sha256":
|
|
self._add_indicator(
|
|
ioc=value, ioc_coll=collection, ioc_coll_list=collection["files_sha256"]
|
|
)
|
|
elif key == "app:cert.md5":
|
|
self._add_indicator(
|
|
ioc=value,
|
|
ioc_coll=collection,
|
|
ioc_coll_list=collection["app_cert_hashes"],
|
|
)
|
|
elif key == "app:cert.sha1":
|
|
self._add_indicator(
|
|
ioc=value,
|
|
ioc_coll=collection,
|
|
ioc_coll_list=collection["app_cert_hashes"],
|
|
)
|
|
elif key == "app:cert.sha256":
|
|
self._add_indicator(
|
|
ioc=value,
|
|
ioc_coll=collection,
|
|
ioc_coll_list=collection["app_cert_hashes"],
|
|
)
|
|
elif key == "app:id":
|
|
self._add_indicator(
|
|
ioc=value, ioc_coll=collection, ioc_coll_list=collection["app_ids"]
|
|
)
|
|
elif key == "configuration-profile:id":
|
|
self._add_indicator(
|
|
ioc=value,
|
|
ioc_coll=collection,
|
|
ioc_coll_list=collection["ios_profile_ids"],
|
|
)
|
|
elif key == "android-property:name":
|
|
self._add_indicator(
|
|
ioc=value,
|
|
ioc_coll=collection,
|
|
ioc_coll_list=collection["android_property_names"],
|
|
)
|
|
elif key == "url:value":
|
|
self._add_indicator(
|
|
ioc=value,
|
|
ioc_coll=collection,
|
|
ioc_coll_list=collection["urls"],
|
|
)
|
|
else:
|
|
self.log.debug("Can't add indicator %s, type %s not supported", value, key)
|
|
|
|
def parse_stix2(self, file_path: str) -> None:
    """Extract indicators from a STIX2 file.

    Every ``malware`` and ``report`` object becomes a collection. Each
    ``indicator`` object is stored in the report that lists it in
    ``object_refs`` or, failing that, in the malware it is related to
    through a ``relationship`` object. Indicators linked to neither go to
    a default collection, which is only kept when it is not empty.

    A file that is not valid UTF-8 JSON, or whose top level is not a JSON
    object, is reported with a warning and ignored.

    :param file_path: Path to the STIX2 file to parse
    :type file_path: str

    """
    self.log.info("Parsing STIX2 indicators file at path '%s'", file_path)

    try:
        with open(file_path, "r", encoding="utf-8") as handle:
            data = json.load(handle)
    except (json.JSONDecodeError, UnicodeDecodeError):
        # Undecodable bytes are as much a "corrupted file" as invalid JSON.
        data = None

    if not isinstance(data, dict):
        self.log.warning(
            "Unable to parse STIX2 indicator file. "
            "The file is corrupted or in the wrong format!"
        )
        return

    malware: Dict[str, Dict[str, str]] = {}
    indicators = []
    relationships = []
    reports = []
    for entry in data.get("objects", []):
        entry_type = entry.get("type", "")
        # Consider both malware and reports as collections
        if entry_type == "malware":
            malware[entry["id"]] = {
                "name": entry["name"],
                "description": entry.get("description", ""),
            }
        elif entry_type == "report":
            reports.append(entry)
        elif entry_type == "indicator":
            indicators.append(entry)
        elif entry_type == "relationship":
            relationships.append(entry)

    file_name = os.path.basename(file_path)

    collections = []
    for mal_id, mal_values in malware.items():
        collections.append(
            self._new_collection(
                mal_id,
                mal_values.get("name"),
                mal_values.get("description"),
                file_name,
                file_path,
            )
        )

    for report in reports:
        collections.append(
            self._new_collection(
                report["id"],
                report.get("name", ""),
                report.get("description", ""),
                file_name,
                file_path,
            )
        )

    # Adds a default collection
    default_collection = self._new_collection(
        "0",
        "Default collection",
        "Collection with IOCs unrelated to malware or reports",
        file_name,
        file_path,
    )

    # Build the lookup tables once instead of rescanning every report and
    # relationship for each indicator.
    # On duplicate ids the first collection wins.
    collection_by_id: Dict[Any, dict] = {}
    for collection in collections:
        collection_by_id.setdefault(collection["id"], collection)

    # When several reports reference an indicator, the last report wins.
    report_by_indicator: Dict[Any, Any] = {}
    for report in reports:
        for ref in report.get("object_refs", []):
            report_by_indicator[ref] = report["id"]

    # Only relationships pointing at a known malware definition count; the
    # first such relationship of an indicator wins.
    malware_by_indicator: Dict[Any, Any] = {}
    for relationship in relationships:
        target_ref = relationship.get("target_ref")
        if target_ref in malware:
            malware_by_indicator.setdefault(
                relationship.get("source_ref"), target_ref
            )

    # We loop through all indicators.
    for indicator in indicators:
        indicator_id = indicator["id"]

        # Reports take precedence over malware relationships.
        collection_id = report_by_indicator.get(indicator_id)
        if collection_id is None:
            collection_id = malware_by_indicator.get(indicator_id)

        if collection_id is None:
            # Adds to the default collection
            self._process_indicator(indicator, default_collection)
            continue

        collection = collection_by_id.get(collection_id)
        if collection is not None:
            self._process_indicator(indicator, collection)

    for coll in collections:
        self.log.debug(
            'Extracted %d indicators for collection with name "%s"',
            coll["count"],
            coll["name"],
        )

    self.ioc_collections.extend(collections)
    if default_collection["count"] > 0:
        # Adds the default collection only if there are some IOCs in it
        self.ioc_collections.append(default_collection)
|
|
|
|
def load_indicators_files(
    self, files: list, load_default: Optional[bool] = True
) -> None:
    """Load a list of indicators files.

    :param files: Paths of STIX2 files to parse; a path that is not an
                  existing file is logged as an error and skipped
    :type files: list
    :param load_default: Whether to also load the downloaded indicators
    :type load_default: bool

    """
    for candidate in files:
        if not os.path.isfile(candidate):
            self.log.error("No indicators file exists at path %s", candidate)
            continue
        self.parse_stix2(candidate)

    # Downloaded indicators are optional; the STIX2 setting is always checked.
    if load_default:
        self._load_downloaded_indicators()
    self._check_stix2_env_variable()

    self.log.info("Loaded a total of %d unique indicators", self.total_ioc_count)
|
|
|
|
def get_iocs(self, ioc_type: str) -> Iterator[Indicator]:
    """Iterate lazily over all loaded indicators of one type.

    :param ioc_type: Key of the collection list to read, e.g. ``"domains"``
    :type ioc_type: str
    :returns: One Indicator per stored value, collection by collection

    """
    for collection in self.ioc_collections:
        yield from (
            Indicator(
                value=value,
                type=ioc_type,
                name=collection["name"],
                stix2_file_name=collection["stix2_file_name"],
            )
            for value in collection.get(ioc_type, [])
        )
|
|
|
|
def get_ioc_matcher(
    self, ioc_type: Optional[str] = None, ioc_list: Optional[List[Indicator]] = None
) -> ahocorasick.Automaton:
    """
    Build an Aho-Corasick automaton from a list of iocs (i.e indicators)
    Returns an Aho-Corasick automaton

    This data-structure and algorithm allows for fast matching of a large number
    of match strings (i.e IOCs) against a large body of text. This will also
    match strings containing the IOC, so it is important to confirm the
    match is a valid IOC before using it.

    for _, ioc in domains_automaton.iter(url.domain.lower()):
        if ioc.value == url.domain.lower():
            print(ioc)

    Automatons requested by ``ioc_type`` are cached on the instance to avoid
    rebuilding them every time we call a function such as check_url(). The
    cache is keyed on the total indicator count, so loading more indicators
    triggers a rebuild. Automatons built from an explicit ``ioc_list`` are
    never cached.

    :param ioc_type: Indicator type whose loaded values are used
    :param ioc_list: Explicit indicators, used only when ``ioc_type`` is not
                     given
    :returns: The automaton, with each word mapped to its Indicator
    :raises ValueError: If neither argument is provided

    """
    if not ioc_type and not ioc_list:
        raise ValueError("Must provide either ioc_type or ioc_list")

    cache = None
    iocs: Iterator[Indicator]
    if ioc_type:
        # The cache lives on the instance rather than in functools.lru_cache:
        # lru_cache would hash ``ioc_list`` (failing for lists) and would
        # keep the instance alive for the lifetime of the cache.
        cache = getattr(self, "_ioc_matcher_cache", None)
        if cache is None:
            cache = self._ioc_matcher_cache = {}

        cached = cache.get(ioc_type)
        if cached is not None and cached[0] == self.total_ioc_count:
            return cached[1]

        iocs = self.get_iocs(ioc_type)
    else:
        iocs = iter(ioc_list)

    automaton = ahocorasick.Automaton()
    for ioc in iocs:
        automaton.add_word(ioc.value, ioc)
    automaton.make_automaton()

    if cache is not None:
        cache[ioc_type] = (self.total_ioc_count, automaton)
    return automaton
|
|
|
|
@lru_cache()
def check_url(self, url: str) -> Optional[IndicatorMatch]:
    """Check if a given URL matches any of the provided URL or domain indicators.

    The URL is first compared to the URL indicators. Otherwise its domain,
    then its top level domain, are compared to the domain indicators.
    Shortened URLs are resolved first, which involves a network request.

    :param url: URL to match against URL and domain indicators
    :type url: str
    :returns: Indicator details if matched, otherwise None

    """
    if not url or not isinstance(url, str):
        return None

    # An exact hit on a URL indicator takes precedence over domain checks.
    url_hit = next((ioc for ioc in self.get_iocs("urls") if ioc.value == url), None)
    if url_hit is not None:
        return IndicatorMatch(
            ioc=url_hit,
            message=f'Found a known suspicious URL {url} matching indicator "{url_hit.value}" from "{url_hit.name}"',
        )

    # Aho-Corasick automaton built from the domain indicators.
    matcher = self.get_ioc_matcher("domains")
    try:
        parsed = URL(url)

        if not parsed.check_if_shortened():
            # Not shortened: the provided URL is the one to inspect.
            resolved = parsed
        else:
            # Retrieve the actual URL with an HTTP HEAD request.
            target = parsed.unshorten()

            self.log.debug("Found a shortened URL %s -> %s", url, target)
            if target is None:
                self.log.warning("Unable to unshorten URL %s", url)
                return None

            resolved = URL(target)
            if resolved.check_if_shortened():
                # Nested shorteners are followed recursively.
                self.log.debug(
                    "Original URL %s appears to shorten another "
                    "shortened URL %s ... checking!",
                    parsed.url,
                    resolved.url,
                )
                return self.check_url(resolved.url)
    except Exception:
        # URL parsing failed: fall back to a simple substring match.
        for _, ioc in matcher.iter(url):
            if ioc.value.lower() in url:
                return IndicatorMatch(
                    ioc=ioc,
                    message=f'Maybe found a known suspicious domain {url} matching indicator "{ioc.value}" from "{ioc.name}"',
                )

        # If nothing matched, we can quit here.
        return None

    # Parsing worked: compare the full domain first. The automaton also
    # reports indicators merely contained in the text, hence the equality
    # test.
    for _, ioc in matcher.iter(resolved.domain.lower()):
        if resolved.domain.lower() != ioc.value:
            continue

        if parsed.is_shortened and parsed.url != resolved.url:
            message = f'Found a known suspicious domain {resolved.url} shortened as {parsed.url} matching indicator "{ioc.value}" from "{ioc.name}"'
        else:
            message = f'Found a known suspicious domain {resolved.url} matching indicator "{ioc.value}" from "{ioc.name}"'
        return IndicatorMatch(ioc=ioc, message=message)

    # Then compare only the top level domain.
    for _, ioc in matcher.iter(resolved.top_level.lower()):
        if resolved.top_level.lower() != ioc.value:
            continue

        if parsed.is_shortened and parsed.url != resolved.url:
            message = f'Found a sub-domain with suspicious top level {resolved.url} shortened as {parsed.url} matching indicator "{ioc.value}" from "{ioc.name}"'
        else:
            message = f'Found a sub-domain with a suspicious top level {resolved.url} matching indicator "{ioc.value}" from "{ioc.name}"'
        return IndicatorMatch(ioc=ioc, message=message)

    return None
|
|
|
|
def check_urls(self, urls: list) -> Optional[IndicatorMatch]:
    """Check a list of URLs against the provided list of domain indicators.

    URLs are checked in order and checking stops at the first match.

    :param urls: List of URLs to check against domain indicators
    :type urls: list
    :returns: Indicator details of the first match, otherwise None

    """
    if not urls:
        return None

    return next((hit for hit in map(self.check_url, urls) if hit), None)
|
|
|
|
def check_process(self, process: str) -> Optional[IndicatorMatch]:
    """Check the provided process name against the list of process
    indicators.

    Only the base name of ``process`` is compared. A base name of exactly
    16 characters is also treated as a possibly truncated name and matches
    any indicator that starts with it.

    :param process: Process name to check against process indicators
    :type process: str
    :returns: Indicator details if matched, otherwise None

    """
    if not process:
        return None

    proc_name = os.path.basename(process)
    maybe_truncated = len(proc_name) == 16

    for ioc in self.get_iocs("processes"):
        if ioc.value == proc_name:
            return IndicatorMatch(
                ioc=ioc,
                message=f'Found a known suspicious process name "{process}" matching indicators from "{ioc.name}"',
            )

        if maybe_truncated and ioc.value.startswith(proc_name):
            return IndicatorMatch(
                ioc=ioc,
                message=f'Found a truncated known suspicious process name "{process}" matching indicators from "{ioc.name}"',
            )

    return None
|
|
|
|
def check_processes(self, processes: list) -> Optional[IndicatorMatch]:
    """Check the provided list of processes against the list of
    process indicators.

    Processes are checked in order and checking stops at the first match.

    :param processes: List of processes to check against process indicators
    :type processes: list
    :returns: Indicator details of the first match, otherwise None

    """
    if not processes:
        return None

    return next((hit for hit in map(self.check_process, processes) if hit), None)
|
|
|
|
def check_email(self, email: str) -> Optional[IndicatorMatch]:
    """Check the provided email against the list of email indicators.

    The comparison is case-insensitive.

    :param email: Email address to check against email indicators
    :type email: str
    :returns: Indicator details if matched, otherwise None

    """
    if not email:
        return None

    hit = next(
        (
            ioc
            for ioc in self.get_iocs("emails")
            if email.lower() == ioc.value.lower()
        ),
        None,
    )
    if hit is None:
        return None

    return IndicatorMatch(
        ioc=hit,
        message=f'Found a known suspicious email address "{email}" matching indicators from "{hit.name}"',
    )
|
|
|
|
def check_file_name(self, file_name: str) -> Optional[IndicatorMatch]:
    """Check the provided file name against the list of file indicators.

    The name must be exactly equal to an indicator (case-sensitive).

    :param file_name: File name to check against file
        indicators
    :type file_name: str
    :returns: Indicator details if matched, otherwise None

    """
    if not file_name:
        return None

    hit = next(
        (ioc for ioc in self.get_iocs("file_names") if ioc.value == file_name),
        None,
    )
    if hit is None:
        return None

    return IndicatorMatch(
        ioc=hit,
        message=f'Found a known suspicious file name "{file_name}" matching indicators from "{hit.name}"',
    )
|
|
|
|
def check_file_path(self, file_path: str) -> Optional[IndicatorMatch]:
    """Check the provided file path against the list of file indicators
    (both path and name).

    The base name is checked against the file name indicators first. The
    full path then matches any file path indicator it starts with.

    :param file_path: File path or file name to check against file
        indicators
    :type file_path: str
    :returns: Indicator details if matched, otherwise None

    """
    if not file_path:
        return None

    ioc_match = self.check_file_name(os.path.basename(file_path))
    if ioc_match:
        return ioc_match

    for ioc in self.get_iocs("file_paths"):
        # Strip any trailing slash from indicator paths to match
        # directories.
        prefix = ioc.value.rstrip("/")

        # An indicator made only of slashes reduces to an empty prefix,
        # which every path starts with; never report that as a match.
        if prefix and file_path.startswith(prefix):
            return IndicatorMatch(
                ioc=ioc,
                message=f'Found a known suspicious file path "{file_path}" matching indicators from "{ioc.name}"',
            )

    return None
|
|
|
|
def check_file_path_process(self, file_path: str) -> Optional[IndicatorMatch]:
    """Check the provided file path contains a process name from the
    list of indicators

    A process indicator matches when it is equal to one of the
    ``/``-separated components of the path.

    :param file_path: File path or file name to check against process
        indicators
    :type file_path: str
    :returns: Indicator details if matched, otherwise None

    """
    if not file_path:
        return None

    # Split once rather than for every indicator; a set makes each
    # membership test constant time.
    path_parts = set(file_path.split("/"))

    for ioc in self.get_iocs("processes"):
        if ioc.value in path_parts:
            return IndicatorMatch(
                ioc=ioc,
                message=f'Found known suspicious process name mentioned in file at path "{file_path}" matching indicators from "{ioc.name}"',
            )

    return None
|
|
|
|
def check_profile(self, profile_uuid: str) -> Optional[IndicatorMatch]:
    """Check the provided configuration profile UUID against the list of
    indicators.

    :param profile_uuid: Profile UUID to check against configuration profile
        indicators
    :type profile_uuid: str
    :returns: Indicator details if matched, otherwise None

    """
    if not profile_uuid:
        return None

    # NOTE(review): this is a containment test (the UUID may be any
    # substring of the indicator value), not an equality test - confirm
    # this is intended.
    hit = next(
        (
            ioc
            for ioc in self.get_iocs("ios_profile_ids")
            if profile_uuid in ioc.value
        ),
        None,
    )
    if hit is None:
        return None

    return IndicatorMatch(
        ioc=hit,
        message=f'Found a known suspicious profile ID "{profile_uuid}" matching indicators from "{hit.name}"',
    )
|
|
|
|
def check_file_hash(self, file_hash: str) -> Optional[IndicatorMatch]:
    """Check the provided file hash against the list of indicators.

    The hash type is derived from the length of ``file_hash`` and the
    comparison is case-insensitive.

    :param file_hash: hash to check
    :type file_hash: str
    :returns: Indicator details if matched, otherwise None

    """
    if not file_hash:
        return None

    # 32 characters select the MD5 list and 40 the SHA1 list; any other
    # length is looked up in the SHA256 list.
    hash_type = {32: "md5", 40: "sha1"}.get(len(file_hash), "sha256")

    hit = next(
        (
            ioc
            for ioc in self.get_iocs("files_" + hash_type)
            if file_hash.lower() == ioc.value.lower()
        ),
        None,
    )
    if hit is None:
        return None

    return IndicatorMatch(
        ioc=hit,
        message=f'Found a known suspicious file with hash "{file_hash}" matching indicators from "{hit.name}"',
    )
|
|
|
|
def check_app_certificate_hash(self, cert_hash: str) -> Optional[IndicatorMatch]:
    """Check the provided cert hash against the list of indicators.

    The comparison is case-insensitive. Certificate hashes of every
    algorithm are kept in a single list, so no hash type is needed.

    :param cert_hash: hash to check
    :type cert_hash: str
    :returns: Indicator details if matched, otherwise None

    """
    if not cert_hash:
        return None

    for ioc in self.get_iocs("app_cert_hashes"):
        if cert_hash.lower() == ioc.value.lower():
            return IndicatorMatch(
                ioc=ioc,
                message=f'Found a known suspicious app certificate with hash "{cert_hash}" matching indicators from "{ioc.name}"',
            )

    return None
|
|
|
|
def check_app_id(self, app_id: str) -> Optional[IndicatorMatch]:
    """Check the provided app identifier (typically an Android package name)
    against the list of indicators.

    The comparison is case-insensitive.

    :param app_id: App ID to check against the list of indicators
    :type app_id: str
    :returns: Indicator details if matched, otherwise None

    """
    if not app_id:
        return None

    hit = next(
        (
            ioc
            for ioc in self.get_iocs("app_ids")
            if app_id.lower() == ioc.value.lower()
        ),
        None,
    )
    if hit is None:
        return None

    return IndicatorMatch(
        ioc=hit,
        message=f'Found a known suspicious app with ID "{app_id}" matching indicators from "{hit.name}"',
    )
|
|
|
|
def check_receiver_prefix(
    self, receiver_name: str
) -> Optional[IndicatorMatch]:
    """Check the provided receiver name against the list of indicators.
    An IoC match is detected when the indicator is a substring of the
    receiver name (case-insensitive).

    :param receiver_name: Receiver name to check against app ID indicators
    :type receiver_name: str
    :returns: IndicatorMatch if matched, otherwise None

    """
    if not receiver_name:
        return None

    hit = next(
        (
            ioc
            for ioc in self.get_iocs("app_ids")
            if ioc.value.lower() in receiver_name.lower()
        ),
        None,
    )
    if hit is None:
        return None

    return IndicatorMatch(
        ioc=hit,
        message=f'Found a known suspicious receiver with name "{receiver_name}" matching indicators from "{hit.name}"',
    )
|
|
|
|
def check_android_property_name(
    self, property_name: str
) -> Optional[IndicatorMatch]:
    """Check the android property name against the list of indicators.

    The comparison is case-insensitive.

    :param property_name: Name of the Android property
    :type property_name: str
    :returns: Indicator details if matched, otherwise None

    """
    if property_name is None:
        return None

    hit = next(
        (
            ioc
            for ioc in self.get_iocs("android_property_names")
            if property_name.lower() == ioc.value.lower()
        ),
        None,
    )
    if hit is None:
        return None

    return IndicatorMatch(
        ioc=hit,
        message=f'Found a known suspicious Android property "{property_name}" matching indicators from "{hit.name}"',
    )
|
|
|
|
def check_domain(self, url: str) -> Optional[IndicatorMatch]:
    """
    Deprecated alias of check_url(), kept for compatibility.

    :param url: URL to check
    :type url: str
    :returns: Whatever check_url() returns for ``url``
    """
    match = self.check_url(url)
    return match
|
|
|
|
def check_domains(self, urls: list) -> Optional[IndicatorMatch]:
    """
    Deprecated alias of check_urls(), kept for compatibility.

    :param urls: List of URLs to check
    :type urls: list
    :returns: Whatever check_urls() returns for ``urls``
    """
    match = self.check_urls(urls)
    return match
|