diff --git a/mvt/common/indicators.py b/mvt/common/indicators.py index 63c4582..e764a86 100644 --- a/mvt/common/indicators.py +++ b/mvt/common/indicators.py @@ -7,7 +7,9 @@ import json import logging import os from typing import Any, Dict, Iterator, List, Optional, Union +from functools import lru_cache +import ahocorasick from appdirs import user_data_dir from .url import URL @@ -241,6 +243,40 @@ class Indicators: "stix2_file_name": ioc_collection["stix2_file_name"], } + @lru_cache() + def get_ioc_matcher( + self, ioc_type: Optional[str] = None, ioc_list: Optional[list] = None + ) -> ahocorasick.Automaton: + """ + Build an Aho-Corasick automaton from a list of iocs (i.e indicators) + Returns an Aho-Corasick automaton + + This data-structue and algorithim allows for fast matching of a large number + of match strings (i.e IOCs) against a large body of text. This will also + match strings containing the IOC, so it is important to confirm the + match is a valid IOC before using it. + + for _, ioc in domains_automaton.iter(url.domain.lower()): + if ioc.value == url.domain.lower(): + print(ioc) + + We use an LRU cache to avoid rebuilding the automaton every time we call a + function such as check_domain(). + """ + automaton = ahocorasick.Automaton() + if ioc_type: + iocs = self.get_iocs(ioc_type) + elif ioc_list: + iocs = ioc_list + else: + raise ValueError("Must provide either ioc_tyxpe or ioc_list") + + for ioc in iocs: + automaton.add_word(ioc["value"], ioc) + automaton.make_automaton() + return automaton + + @lru_cache() def check_domain(self, url: str) -> Union[dict, None]: """Check if a given URL matches any of the provided domain indicators. @@ -254,6 +290,9 @@ class Indicators: if not isinstance(url, str): return None + # Create an Aho-Corasick automaton from the list of domains + domain_matcher = self.get_ioc_matcher("domains") + try: # First we use the provided URL. orig_url = URL(url) @@ -265,6 +304,7 @@ class Indicators: self.log.debug("Found a shortened URL %s -> %s", url, unshortened) if unshortened is None: + self.log.warning("Unable to unshorten URL %s", url) return None # Now we check for any nested URL shorteners. @@ -285,12 +325,13 @@ class Indicators: except Exception: # If URL parsing failed, we just try to do a simple substring # match. - for ioc in self.get_iocs("domains"): + for idx, ioc in domain_matcher.iter(url): if ioc["value"].lower() in url: self.log.warning( "Maybe found a known suspicious domain %s " - 'matching indicators from "%s"', + 'matching indicator "%s" from "%s"', url, + ioc["value"], ioc["name"], ) return ioc @@ -300,43 +341,47 @@ class Indicators: # If all parsing worked, we start walking through available domain # indicators. - for ioc in self.get_iocs("domains"): + for idx, ioc in domain_matcher.iter(final_url.domain.lower()): # First we check the full domain. if final_url.domain.lower() == ioc["value"]: if orig_url.is_shortened and orig_url.url != final_url.url: self.log.warning( "Found a known suspicious domain %s " - 'shortened as %s matching indicators from "%s"', + 'shortened as %s matching indicator "%s" from "%s"', final_url.url, orig_url.url, + ioc["value"], ioc["name"], ) else: self.log.warning( "Found a known suspicious domain %s " - 'matching indicators from "%s"', + 'matching indicator "%s" from "%s"', final_url.url, + ioc["value"], ioc["name"], ) - return ioc - # Then we just check the top level domain. + # Then we just check the top level domain. + for idx, ioc in domain_matcher.iter(final_url.top_level.lower()): if final_url.top_level.lower() == ioc["value"]: if orig_url.is_shortened and orig_url.url != final_url.url: self.log.warning( "Found a sub-domain with suspicious top " "level %s shortened as %s matching " - 'indicators from "%s"', + 'indicator "%s" from "%s"', final_url.url, orig_url.url, + ioc["value"], ioc["name"], ) else: self.log.warning( "Found a sub-domain with a suspicious top " - 'level %s matching indicators from "%s"', + 'level %s matching indicator "%s" from "%s"', final_url.url, + ioc["value"], ioc["name"], ) diff --git a/setup.cfg b/setup.cfg index 80edd89..f2a3139 100644 --- a/setup.cfg +++ b/setup.cfg @@ -33,6 +33,7 @@ install_requires = libusb1 >=3.0.0 cryptography >=38.0.1 pyyaml >=6.0 + pyahocorasick >= 2.0.0 [options.packages.find] where = ./