Improves Indicator object

This commit is contained in:
tek
2023-04-07 15:07:45 +02:00
parent a2cbaacfce
commit 1dc1ee2238
+14 -10
View File
@@ -15,13 +15,15 @@ from .url import URL
MVT_DATA_FOLDER = user_data_dir("mvt")
MVT_INDICATORS_FOLDER = os.path.join(MVT_DATA_FOLDER, "indicators")
logger = logging.getLogger(__name__)
class Indicators:
"""This class is used to parse indicators from a STIX2 file and provide
functions to compare extracted artifacts to the indicators.
"""
def __init__(self, log=logging.Logger) -> None:
def __init__(self, log=logger) -> None:
self.log = log
self.ioc_collections: List[Dict[str, Any]] = []
self.total_ioc_count = 0
@@ -215,7 +217,7 @@ class Indicators:
self.log.info("Loaded a total of %d unique indicators",
self.total_ioc_count)
def get_iocs(self, ioc_type: str) -> Union[Iterator[Dict[str, Any]], None]:
def get_iocs(self, ioc_type: str) -> Iterator[Dict[str, Any]]:
for ioc_collection in self.ioc_collections:
for ioc in ioc_collection.get(ioc_type, []):
yield {
@@ -233,8 +235,6 @@ class Indicators:
:returns: Indicator details if matched, otherwise None
"""
# TODO: If the IOC domain contains a subdomain, it is not currently
# being matched.
if not url:
return None
if not isinstance(url, str):
@@ -249,15 +249,17 @@ class Indicators:
# HTTP HEAD request.
unshortened = orig_url.unshorten()
# self.log.info("Found a shortened URL %s -> %s",
# url, unshortened)
self.log.debug("Found a shortened URL %s -> %s",
url, unshortened)
if unshortened is None:
return None
# Now we check for any nested URL shorteners.
dest_url = URL(unshortened)
if dest_url.check_if_shortened():
# self.log.info("Original URL %s appears to shorten another "
# "shortened URL %s ... checking!",
# orig_url.url, dest_url.url)
self.log.debug("Original URL %s appears to shorten another "
"shortened URL %s ... checking!",
orig_url.url, dest_url.url)
return self.check_domain(dest_url.url)
final_url = dest_url
@@ -444,7 +446,7 @@ class Indicators:
return None
def check_file_path_process(self, file_path: str) -> Union[dict, None]:
def check_file_path_process(self, file_path: str) -> Optional[Dict[str, Any]]:
"""Check the provided file path contains a process name from the
list of indicators
@@ -465,6 +467,8 @@ class Indicators:
file_path, ioc["name"])
return ioc
return None
def check_profile(self, profile_uuid: str) -> Union[dict, None]:
"""Check the provided configuration profile UUID against the list of
indicators.