Improves STIX2 support and testing (#523)

* Improves STIX2 support and testing

* Adds documentation on STIX2 support in MVT

---------

Co-authored-by: Donncha Ó Cearbhaill <donncha.ocearbhaill@amnesty.org>
This commit is contained in:
Tek
2024-10-16 16:47:10 +02:00
committed by GitHub
parent 821943a859
commit 052c4e207b
26 changed files with 9648 additions and 66 deletions

View File

@@ -34,6 +34,13 @@ It is also possible to load STIX2 files automatically from the environment varia
export MVT_STIX2="/home/user/IOC1.stix2:/home/user/IOC2.stix2"
```
## STIX2 Support
So far MVT implements only a subset of [STIX2 specifications](https://docs.oasis-open.org/cti/stix/v2.1/csprd01/stix-v2.1-csprd01.html):
* It only supports checks for one value (such as `[domain-name:value='DOMAIN']`) and not boolean expressions over multiple comparisons
* It only supports the following types: `domain-name:value`, `process:name`, `email-addr:value`, `file:name`, `file:path`, `file:hashes.md5`, `file:hashes.sha1`, `file:hashes.sha256`, `app:id`, `configuration-profile:id`, `android-property:name`, `url:value` (but each type will only be checked by a module if it is relevant to the type of data obtained)
## Known repositories of STIX2 IOCs
- The [Amnesty International investigations repository](https://github.com/AmnestyTech/investigations) contains STIX-formatted IOCs for:
@@ -46,3 +53,6 @@ export MVT_STIX2="/home/user/IOC1.stix2:/home/user/IOC2.stix2"
You can automatically download the latest public indicator files with the command `mvt-ios download-iocs` or `mvt-android download-iocs`. These commands download the list of indicators from the [mvt-indicators](https://github.com/mvt-project/mvt-indicators/blob/main/indicators.yaml) repository and store them in the [appdir](https://pypi.org/project/appdirs/) folder. They are then loaded automatically by MVT.
Please [open an issue](https://github.com/mvt-project/mvt/issues/) to suggest new sources of STIX-formatted IOCs.

View File

@@ -51,8 +51,9 @@ class ChromeHistory(AndroidExtraction):
return
for result in self.results:
if self.indicators.check_domain(result["url"]):
if self.indicators.check_url(result["url"]):
self.detected.append(result)
continue
def _parse_db(self, db_path: str) -> None:
"""Parse a Chrome History database file.

View File

@@ -85,8 +85,9 @@ class SMS(AndroidExtraction):
if message_links == []:
message_links = check_for_links(message["body"])
if self.indicators.check_domains(message_links):
if self.indicators.check_urls(message_links):
self.detected.append(message)
continue
def _parse_db(self, db_path: str) -> None:
"""Parse an Android bugle_db SMS database file.

View File

@@ -55,8 +55,9 @@ class Whatsapp(AndroidExtraction):
continue
message_links = check_for_links(message["data"])
if self.indicators.check_domains(message_links):
if self.indicators.check_urls(message_links):
self.detected.append(message)
continue
def _parse_db(self, db_path: str) -> None:
"""Parse an Android msgstore.db WhatsApp database file.

View File

@@ -43,8 +43,9 @@ class SMS(BackupExtraction):
if message_links == []:
message_links = check_for_links(message.get("text", ""))
if self.indicators.check_domains(message_links):
if self.indicators.check_urls(message_links):
self.detected.append(message)
continue
def run(self) -> None:
sms_path = "apps/com.android.providers.telephony/d_f/*_sms_backup"

View File

@@ -79,15 +79,18 @@ class Indicators:
"emails": [],
"file_names": [],
"file_paths": [],
"files_md5": [],
"files_sha1": [],
"files_sha256": [],
"app_ids": [],
"ios_profile_ids": [],
"android_property_names": [],
"urls": [],
"count": 0,
}
def _add_indicator(self, ioc: str, ioc_coll: dict, ioc_coll_list: list) -> None:
ioc = ioc.strip("'")
ioc = ioc.replace("'", "").strip()
if ioc not in ioc_coll_list:
ioc_coll_list.append(ioc)
ioc_coll["count"] += 1
@@ -95,6 +98,7 @@ class Indicators:
def _process_indicator(self, indicator: dict, collection: dict) -> None:
key, value = indicator.get("pattern", "").strip("[]").split("=")
key = key.strip()
if key == "domain-name:value":
# We force domain names to lower case.
@@ -122,6 +126,14 @@ class Indicators:
self._add_indicator(
ioc=value, ioc_coll=collection, ioc_coll_list=collection["file_paths"]
)
elif key == "file:hashes.md5":
self._add_indicator(
ioc=value, ioc_coll=collection, ioc_coll_list=collection["files_md5"]
)
elif key == "file:hashes.sha1":
self._add_indicator(
ioc=value, ioc_coll=collection, ioc_coll_list=collection["files_sha1"]
)
elif key == "file:hashes.sha256":
self._add_indicator(
ioc=value, ioc_coll=collection, ioc_coll_list=collection["files_sha256"]
@@ -143,6 +155,14 @@ class Indicators:
ioc_coll=collection,
ioc_coll_list=collection["android_property_names"],
)
elif key == "url:value":
self._add_indicator(
ioc=value,
ioc_coll=collection,
ioc_coll_list=collection["urls"],
)
else:
self.log.debug("Can't add indicator %s, type %s not supported", value, key)
def parse_stix2(self, file_path: str) -> None:
"""Extract indicators from a STIX2 file.
@@ -166,13 +186,17 @@ class Indicators:
malware = {}
indicators = []
relationships = []
reports = []
for entry in data.get("objects", []):
entry_type = entry.get("type", "")
# Consider both malware and reports as collections
if entry_type == "malware":
malware[entry["id"]] = {
"name": entry["name"],
"description": entry.get("description", ""),
}
elif entry_type == "report":
reports.append(entry)
elif entry_type == "indicator":
indicators.append(entry)
elif entry_type == "relationship":
@@ -189,27 +213,58 @@ class Indicators:
)
collections.append(collection)
for report in reports:
collection = self._new_collection(
report["id"],
report.get("name", ""),
report.get("description", ""),
os.path.basename(file_path),
file_path,
)
collections.append(collection)
# Adds a default collection
default_collection = self._new_collection(
"0",
"Default collection",
"Collection with IOCs unrelated to malware or reports",
os.path.basename(file_path),
file_path,
)
# We loop through all indicators.
for indicator in indicators:
malware_id = None
# We loop through all relationships and find the one pertinent to
# the current indicator.
for relationship in relationships:
if relationship["source_ref"] != indicator["id"]:
continue
# We loop through reports first to see if the indicator is in the refs
for report in reports:
for ref in report.get("object_refs", []):
if ref == indicator["id"]:
malware_id = report["id"]
break
# Look for a malware definition with the correct identifier.
if relationship["target_ref"] in malware.keys():
malware_id = relationship["target_ref"]
break
if malware_id is None:
# We loop through all relationships and find the one pertinent to
# the current indicator.
for relationship in relationships:
if relationship["source_ref"] != indicator["id"]:
continue
# Now we look for the correct collection matching the malware ID we
# got from the relationship.
for collection in collections:
if collection["id"] == malware_id:
self._process_indicator(indicator, collection)
break
# Look for a malware definition with the correct identifier.
if relationship["target_ref"] in malware.keys():
malware_id = relationship["target_ref"]
break
if malware_id is not None:
# Now we look for the correct collection matching the malware ID we
# got from the relationship.
for collection in collections:
if collection["id"] == malware_id:
self._process_indicator(indicator, collection)
break
else:
# Adds to the default collection
self._process_indicator(indicator, default_collection)
for coll in collections:
self.log.debug(
@@ -219,6 +274,9 @@ class Indicators:
)
self.ioc_collections.extend(collections)
if default_collection["count"] > 0:
# Adds the default collection only if there are some IOCs in it
self.ioc_collections.append(default_collection)
def load_indicators_files(
self, files: list, load_default: Optional[bool] = True
@@ -257,7 +315,7 @@ class Indicators:
Build an Aho-Corasick automaton from a list of iocs (i.e indicators)
Returns an Aho-Corasick automaton
This data-structue and algorithim allows for fast matching of a large number
This data-structure and algorithm allows for fast matching of a large number
of match strings (i.e IOCs) against a large body of text. This will also
match strings containing the IOC, so it is important to confirm the
match is a valid IOC before using it.
@@ -267,7 +325,7 @@ class Indicators:
print(ioc)
We use an LRU cache to avoid rebuilding the automaton every time we call a
function such as check_domain().
function such as check_url().
"""
automaton = ahocorasick.Automaton()
if ioc_type:
@@ -275,7 +333,7 @@ class Indicators:
elif ioc_list:
iocs = ioc_list
else:
raise ValueError("Must provide either ioc_tyxpe or ioc_list")
raise ValueError("Must provide either ioc_type or ioc_list")
for ioc in iocs:
automaton.add_word(ioc["value"], ioc)
@@ -283,7 +341,7 @@ class Indicators:
return automaton
@lru_cache()
def check_domain(self, url: str) -> Union[dict, None]:
def check_url(self, url: str) -> Union[dict, None]:
"""Check if a given URL matches any of the provided URL or domain indicators.
:param url: URL to match against domain indicators
@@ -296,9 +354,21 @@ class Indicators:
if not isinstance(url, str):
return None
# Create an Aho-Corasick automaton from the list of domains
domain_matcher = self.get_ioc_matcher("domains")
# Check the URL first
for ioc in self.get_iocs("urls"):
if ioc["value"] == url:
self.log.warning(
"Found a known suspicious URL %s "
'matching indicator "%s" from "%s"',
url,
ioc["value"],
ioc["name"],
)
return ioc
# Then check the domain
# Create an Aho-Corasick automaton from the list of domains
domain_matcher = self.get_ioc_matcher("domains")
try:
# First we use the provided URL.
orig_url = URL(url)
@@ -322,7 +392,7 @@ class Indicators:
orig_url.url,
dest_url.url,
)
return self.check_domain(dest_url.url)
return self.check_url(dest_url.url)
final_url = dest_url
else:
@@ -395,7 +465,7 @@ class Indicators:
return None
def check_domains(self, urls: list) -> Union[dict, None]:
def check_urls(self, urls: list) -> Union[dict, None]:
"""Check a list of URLs against the provided URL and domain indicators.
:param urls: List of URLs to check against domain indicators
@@ -407,7 +477,7 @@ class Indicators:
return None
for url in urls:
check = self.check_domain(url)
check = self.check_url(url)
if check:
return check
@@ -597,9 +667,9 @@ class Indicators:
return None
def check_file_hash(self, file_hash: str) -> Union[dict, None]:
"""Check the provided SHA256 file hash against the list of indicators.
"""Check the provided file hash against the list of indicators.
:param file_hash: SHA256 hash to check
:param file_hash: hash to check
:type file_hash: str
:returns: Indicator details if matched, otherwise None
@@ -607,7 +677,14 @@ class Indicators:
if not file_hash:
return None
for ioc in self.get_iocs("files_sha256"):
if len(file_hash) == 32:
hash_type = "md5"
elif len(file_hash) == 40:
hash_type = "sha1"
else:
hash_type = "sha256"
for ioc in self.get_iocs("files_" + hash_type):
if file_hash.lower() == ioc["value"].lower():
self.log.warning(
'Found a known suspicious file with hash "%s" '
@@ -665,3 +742,15 @@ class Indicators:
return ioc
return None
def check_domain(self, url: str) -> Union[dict, None]:
    """
    Former name of check_url(), kept for compatibility.

    The call is forwarded unchanged to check_url().

    :param url: URL to check
    :type url: str
    :returns: Whatever check_url() returns for the given URL

    """
    return self.check_url(url)
def check_domains(self, urls: list) -> Union[dict, None]:
    """
    Former name of check_urls(), kept for compatibility.

    The call is forwarded unchanged to check_urls().

    :param urls: List of URLs to check
    :type urls: list
    :returns: Whatever check_urls() returns for the given list

    """
    return self.check_urls(urls)

View File

@@ -107,7 +107,7 @@ class Manifest(IOSExtraction):
except Exception:
continue
ioc = self.indicators.check_domain(part)
ioc = self.indicators.check_url(part)
if ioc:
self.log.warning(
'Found mention of domain "%s" in a backup file with '

View File

@@ -70,14 +70,8 @@ class Analytics(IOSExtraction):
self.detected.append(new_result)
continue
ioc = self.indicators.check_domain(value)
ioc = self.indicators.check_url(value)
if ioc:
self.log.warning(
'Found mention of a malicious domain "%s" in %s file at %s',
value,
result["artifact"],
result["isodate"],
)
new_result = copy.copy(result)
new_result["matched_indicator"] = ioc
self.detected.append(new_result)

View File

@@ -51,7 +51,7 @@ class CacheFiles(IOSExtraction):
self.detected = {}
for key, values in self.results.items():
for value in values:
ioc = self.indicators.check_domain(value["url"])
ioc = self.indicators.check_url(value["url"])
if ioc:
value["matched_indicator"] = ioc
if key not in self.detected:

View File

@@ -51,9 +51,9 @@ class SafariFavicon(IOSExtraction):
return
for result in self.results:
ioc = self.indicators.check_domain(result["url"])
ioc = self.indicators.check_url(result["url"])
if not ioc:
ioc = self.indicators.check_domain(result["icon_url"])
ioc = self.indicators.check_url(result["icon_url"])
if ioc:
result["matched_indicator"] = ioc

View File

@@ -18,7 +18,7 @@ class WebkitBase(IOSExtraction):
return
for result in self.results:
ioc = self.indicators.check_domain(result["url"])
ioc = self.indicators.check_url(result["url"])
if ioc:
result["matched_indicator"] = ioc
self.detected.append(result)

View File

@@ -51,13 +51,13 @@ class ChromeFavicon(IOSExtraction):
return
for result in self.results:
ioc = self.indicators.check_domain(result["url"])
ioc = self.indicators.check_url(result["url"])
if not ioc:
ioc = self.indicators.check_domain(result["icon_url"])
ioc = self.indicators.check_url(result["icon_url"])
if ioc:
result["matched_indicator"] = ioc
self.detected.append(result)
continue
def run(self) -> None:
self._find_ios_database(

View File

@@ -55,7 +55,7 @@ class ChromeHistory(IOSExtraction):
return
for result in self.results:
ioc = self.indicators.check_domain(result["url"])
ioc = self.indicators.check_url(result["url"])
if ioc:
result["matched_indicator"] = ioc
self.detected.append(result)

View File

@@ -53,9 +53,9 @@ class FirefoxFavicon(IOSExtraction):
return
for result in self.results:
ioc = self.indicators.check_domain(result.get("url", ""))
ioc = self.indicators.check_url(result.get("url", ""))
if not ioc:
ioc = self.indicators.check_domain(result.get("history_url", ""))
ioc = self.indicators.check_url(result.get("history_url", ""))
if ioc:
result["matched_indicator"] = ioc

View File

@@ -56,7 +56,7 @@ class FirefoxHistory(IOSExtraction):
return
for result in self.results:
ioc = self.indicators.check_domain(result["url"])
ioc = self.indicators.check_url(result["url"])
if ioc:
result["matched_indicator"] = ioc
self.detected.append(result)

View File

@@ -58,7 +58,7 @@ class SafariBrowserState(IOSExtraction):
for result in self.results:
if "tab_url" in result:
ioc = self.indicators.check_domain(result["tab_url"])
ioc = self.indicators.check_url(result["tab_url"])
if ioc:
result["matched_indicator"] = ioc
self.detected.append(result)
@@ -69,7 +69,7 @@ class SafariBrowserState(IOSExtraction):
for session_entry in result["session_data"]:
if "entry_url" in session_entry:
ioc = self.indicators.check_domain(session_entry["entry_url"])
ioc = self.indicators.check_url(session_entry["entry_url"])
if ioc:
result["matched_indicator"] = ioc
self.detected.append(result)

View File

@@ -107,7 +107,7 @@ class SafariHistory(IOSExtraction):
return
for result in self.results:
ioc = self.indicators.check_domain(result["url"])
ioc = self.indicators.check_url(result["url"])
if ioc:
result["matched_indicator"] = ioc
self.detected.append(result)

View File

@@ -72,7 +72,7 @@ class Shortcuts(IOSExtraction):
return
for result in self.results:
ioc = self.indicators.check_domains(result["action_urls"])
ioc = self.indicators.check_urls(result["action_urls"])
if ioc:
result["matched_indicator"] = ioc
self.detected.append(result)

View File

@@ -84,7 +84,7 @@ class SMS(IOSExtraction):
# Making sure no link was ignored
if message_links == []:
message_links = check_for_links(result.get("text", ""))
ioc = self.indicators.check_domains(message_links)
ioc = self.indicators.check_urls(message_links)
if ioc:
result["matched_indicator"] = ioc
self.detected.append(result)

View File

@@ -62,7 +62,7 @@ class WebkitResourceLoadStatistics(IOSExtraction):
self.detected = []
for result in self.results:
ioc = self.indicators.check_domain(result["registrable_domain"])
ioc = self.indicators.check_url(result["registrable_domain"])
if ioc:
result["matched_indicator"] = ioc
self.detected.append(result)

View File

@@ -86,7 +86,7 @@ class WebkitSessionResourceLog(IOSExtraction):
[entry["origin"]] + source_domains + destination_domains
)
ioc = self.indicators.check_domains(all_origins)
ioc = self.indicators.check_urls(all_origins)
if ioc:
entry["matched_indicator"] = ioc
self.detected.append(entry)

View File

@@ -57,7 +57,7 @@ class Whatsapp(IOSExtraction):
return
for result in self.results:
ioc = self.indicators.check_domains(result.get("links", []))
ioc = self.indicators.check_urls(result.get("links", []))
if ioc:
result["matched_indicator"] = ioc
self.detected.append(result)

View File

@@ -17,6 +17,9 @@ def generate_test_stix_file(file_path):
emails = ["foobar@example.org"]
filenames = ["/var/foobar/txt"]
android_property = ["sys.foobar"]
sha256 = ["570cd76bf49cf52e0cb347a68bdcf0590b2eaece134e1b1eba7e8d66261bdbe6"]
sha1 = ["da0611a300a9ce9aa7a09d1212f203fca5856794"]
urls = ["http://example.com/thisisbad"]
res = []
malware = Malware(name="TestMalware", is_family=False, description="")
@@ -66,6 +69,33 @@ def generate_test_stix_file(file_path):
res.append(i)
res.append(Relationship(i, "indicates", malware))
for h in sha256:
i = Indicator(
indicator_types=["malicious-activity"],
pattern="[file:hashes.sha256='{}']".format(h),
pattern_type="stix",
)
res.append(i)
res.append(Relationship(i, "indicates", malware))
for h in sha1:
i = Indicator(
indicator_types=["malicious-activity"],
pattern="[file:hashes.sha1='{}']".format(h),
pattern_type="stix",
)
res.append(i)
res.append(Relationship(i, "indicates", malware))
for u in urls:
i = Indicator(
indicator_types=["malicious-activity"],
pattern="[url:value='{}']".format(u),
pattern_type="stix",
)
res.append(i)
res.append(Relationship(i, "indicates", malware))
bundle = Bundle(objects=res)
with open(file_path, "w+", encoding="utf-8") as f:
f.write(bundle.serialize(pretty=True))

File diff suppressed because it is too large Load Diff

File diff suppressed because it is too large Load Diff

View File

@@ -7,26 +7,86 @@ import logging
import os
from mvt.common.indicators import Indicators
from ..utils import get_artifact_folder
class TestIndicators:
def test_parse_stix2(self, indicator_file):
ind = Indicators(log=logging)
ind.load_indicators_files([indicator_file], load_default=False)
assert ind.ioc_collections[0]["count"] == 5
assert len(ind.ioc_collections) == 1
assert ind.ioc_collections[0]["count"] == 8
assert len(ind.ioc_collections[0]["domains"]) == 1
assert len(ind.ioc_collections[0]["emails"]) == 1
assert len(ind.ioc_collections[0]["file_names"]) == 1
assert len(ind.ioc_collections[0]["processes"]) == 1
assert len(ind.ioc_collections[0]["android_property_names"]) == 1
assert len(ind.ioc_collections[0]["files_sha256"]) == 1
assert len(ind.ioc_collections[0]["files_sha1"]) == 1
assert len(ind.ioc_collections[0]["urls"]) == 1
def test_check_domain(self, indicator_file):
def test_parse_stix2_amnesty(self):
    """
    Load the cytrox.stix2 artifact and check the number of IOCs parsed
    for each indicator type.

    STIX2 file from
    https://github.com/AmnestyTech/investigations/blob/master/2021-12-16_cytrox/cytrox.stix2
    """
    ind = Indicators(log=logging)
    file = os.path.join(get_artifact_folder(), "stix2", "cytrox.stix2")
    ind.load_indicators_files([file], load_default=False)
    # The whole file is expected to end up in a single collection.
    assert len(ind.ioc_collections) == 1
    # 343 = 336 domains + 6 file paths + 1 iOS profile ID (asserted below).
    assert ind.ioc_collections[0]["count"] == 343
    assert len(ind.ioc_collections[0]["domains"]) == 336
    assert len(ind.ioc_collections[0]["emails"]) == 0
    assert len(ind.ioc_collections[0]["file_names"]) == 0
    assert len(ind.ioc_collections[0]["file_paths"]) == 6
    assert len(ind.ioc_collections[0]["ios_profile_ids"]) == 1
    assert len(ind.ioc_collections[0]["processes"]) == 0
    assert len(ind.ioc_collections[0]["android_property_names"]) == 0
    assert len(ind.ioc_collections[0]["urls"]) == 0
def test_parse_stix2_otx(self):
    """
    Load the OTX Pulse artifact and check the number of IOCs parsed
    for each indicator type.

    STIX2 file from OTX Pulse
    https://otx.alienvault.com/pulse/638cd3ee5e5f019f84f9e0ea
    """
    ind = Indicators(log=logging)
    file = os.path.join(
        get_artifact_folder(), "stix2", "638cd3ee5e5f019f84f9e0ea.json"
    )
    ind.load_indicators_files([file], load_default=False)
    # The whole file is expected to end up in a single collection.
    assert len(ind.ioc_collections) == 1
    # 69 = 15 domains + 54 URLs (asserted below).
    assert ind.ioc_collections[0]["count"] == 69
    assert len(ind.ioc_collections[0]["domains"]) == 15
    assert len(ind.ioc_collections[0]["emails"]) == 0
    assert len(ind.ioc_collections[0]["file_names"]) == 0
    assert len(ind.ioc_collections[0]["processes"]) == 0
    assert len(ind.ioc_collections[0]["android_property_names"]) == 0
    assert len(ind.ioc_collections[0]["urls"]) == 54
def test_check_url(self, indicator_file):
ind = Indicators(log=logging)
ind.load_indicators_files([indicator_file], load_default=False)
assert ind.check_domain(42) is None
assert ind.check_domain("https://www.example.org/foobar")
assert ind.check_domain("http://example.org:8080/toto")
assert ind.check_domain("https://github.com") is None
assert ind.check_url(42) is None
assert ind.check_url("http://example.com/thisisbad")
assert ind.check_url("http://example.com/thisisgood") is None
assert ind.check_url("https://www.example.org/foobar")
assert ind.check_url("http://example.org:8080/toto")
assert ind.check_url("https://github.com") is None
assert ind.check_url("https://example.com/") is None
def test_check_file_hash(self, indicator_file):
    """
    Check that check_file_hash() returns None for one hash and a truthy
    result for two others, one 64 and one 40 hex characters long.
    """
    ind = Indicators(log=logging)
    ind.load_indicators_files([indicator_file], load_default=False)
    # This hash is expected not to match any indicator.
    assert (
        ind.check_file_hash(
            "003764fd74bf13cff9bf1ddd870cbf593b23e2b584ba4465114023870ea6fbef"
        )
        is None
    )
    # Same value as the SHA-256 listed in generate_test_stix_file
    # (presumably the source of the indicator_file fixture; confirm).
    assert ind.check_file_hash(
        "570cd76bf49cf52e0cb347a68bdcf0590b2eaece134e1b1eba7e8d66261bdbe6"
    )
    # Same value as the SHA-1 listed in generate_test_stix_file.
    assert ind.check_file_hash("da0611a300a9ce9aa7a09d1212f203fca5856794")
def test_check_android_property(self, indicator_file):
ind = Indicators(log=logging)
@@ -38,4 +98,4 @@ class TestIndicators:
os.environ["MVT_STIX2"] = indicator_file
ind = Indicators(log=logging)
ind.load_indicators_files([], load_default=False)
assert ind.total_ioc_count == 5
assert ind.total_ioc_count == 8