From 1dc1ee2238c425eab77f95007b053c91cd1eec05 Mon Sep 17 00:00:00 2001 From: tek Date: Fri, 7 Apr 2023 15:07:45 +0200 Subject: [PATCH] Improves Indicator object --- mvt/common/indicators.py | 24 ++++++++++++++---------- 1 file changed, 14 insertions(+), 10 deletions(-) diff --git a/mvt/common/indicators.py b/mvt/common/indicators.py index 6cf3056..a933835 100644 --- a/mvt/common/indicators.py +++ b/mvt/common/indicators.py @@ -15,13 +15,15 @@ from .url import URL MVT_DATA_FOLDER = user_data_dir("mvt") MVT_INDICATORS_FOLDER = os.path.join(MVT_DATA_FOLDER, "indicators") +logger = logging.getLogger(__name__) + class Indicators: """This class is used to parse indicators from a STIX2 file and provide functions to compare extracted artifacts to the indicators. """ - def __init__(self, log=logging.Logger) -> None: + def __init__(self, log=logger) -> None: self.log = log self.ioc_collections: List[Dict[str, Any]] = [] self.total_ioc_count = 0 @@ -215,7 +217,7 @@ class Indicators: self.log.info("Loaded a total of %d unique indicators", self.total_ioc_count) - def get_iocs(self, ioc_type: str) -> Union[Iterator[Dict[str, Any]], None]: + def get_iocs(self, ioc_type: str) -> Iterator[Dict[str, Any]]: for ioc_collection in self.ioc_collections: for ioc in ioc_collection.get(ioc_type, []): yield { @@ -233,8 +235,6 @@ class Indicators: :returns: Indicator details if matched, otherwise None """ - # TODO: If the IOC domain contains a subdomain, it is not currently - # being matched. if not url: return None if not isinstance(url, str): @@ -249,15 +249,17 @@ class Indicators: # HTTP HEAD request. unshortened = orig_url.unshorten() - # self.log.info("Found a shortened URL %s -> %s", - # url, unshortened) + self.log.debug("Found a shortened URL %s -> %s", + url, unshortened) + if unshortened is None: + return None # Now we check for any nested URL shorteners. dest_url = URL(unshortened) if dest_url.check_if_shortened(): - # self.log.info("Original URL %s appears to shorten another " - # "shortened URL %s ... checking!", - # orig_url.url, dest_url.url) + self.log.debug("Original URL %s appears to shorten another " + "shortened URL %s ... checking!", + orig_url.url, dest_url.url) return self.check_domain(dest_url.url) final_url = dest_url @@ -444,7 +446,7 @@ class Indicators: return None - def check_file_path_process(self, file_path: str) -> Union[dict, None]: + def check_file_path_process(self, file_path: str) -> Optional[Dict[str, Any]]: """Check the provided file path contains a process name from the list of indicators @@ -465,6 +467,8 @@ class Indicators: file_path, ioc["name"]) return ioc + return None + def check_profile(self, profile_uuid: str) -> Union[dict, None]: """Check the provided configuration profile UUID against the list of indicators.