diff --git a/src/mvt/common/indicators.py b/src/mvt/common/indicators.py index d30094f..aec98f9 100644 --- a/src/mvt/common/indicators.py +++ b/src/mvt/common/indicators.py @@ -100,6 +100,17 @@ class Indicators: key, value = indicator.get("pattern", "").strip("[]").split("=") key = key.strip() + # Normalize hash algorithm keys so that both the STIX2-spec-compliant + # form (e.g. file:hashes.'SHA-256', which requires quotes around + # algorithm names that contain hyphens) and the non-standard lowercase + # form (e.g. file:hashes.sha256) are accepted. Strip single quotes and + # hyphens from the algorithm name only, then lowercase it. + for sep in ("hashes.", "cert."): + if sep in key: + prefix, _, algo = key.partition(sep) + key = prefix + sep + algo.replace("'", "").replace("-", "").lower() + break + if key == "domain-name:value": # We force domain names to lower case. self._add_indicator( diff --git a/tests/artifacts/generate_stix.py b/tests/artifacts/generate_stix.py index 174f0dd..7fd1b74 100644 --- a/tests/artifacts/generate_stix.py +++ b/tests/artifacts/generate_stix.py @@ -82,7 +82,7 @@ def generate_test_stix_file(file_path): for h in sha256: i = Indicator( indicator_types=["malicious-activity"], - pattern="[file:hashes.sha256='{}']".format(h), + pattern="[file:hashes.'SHA-256'='{}']".format(h), pattern_type="stix", ) res.append(i) @@ -91,7 +91,7 @@ def generate_test_stix_file(file_path): for h in sha1: i = Indicator( indicator_types=["malicious-activity"], - pattern="[file:hashes.sha1='{}']".format(h), + pattern="[file:hashes.'SHA-1'='{}']".format(h), pattern_type="stix", ) res.append(i) diff --git a/tests/common/test_indicators.py b/tests/common/test_indicators.py index efc24f7..00c7276 100644 --- a/tests/common/test_indicators.py +++ b/tests/common/test_indicators.py @@ -94,6 +94,78 @@ class TestIndicators: ) assert ind.check_file_hash("da0611a300a9ce9aa7a09d1212f203fca5856794") + def test_parse_stix2_hash_key_variants(self, tmp_path): + """STIX2 spec requires single-quoted algorithm names that contain hyphens, + e.g. file:hashes.'SHA-256'. Verify MVT accepts both spec-compliant and + non-standard lowercase spellings for MD5, SHA-1 and SHA-256.""" + import json + + sha256_hash = "570cd76bf49cf52e0cb347a68bdcf0590b2eaece134e1b1eba7e8d66261bdbe6" + sha1_hash = "da0611a300a9ce9aa7a09d1212f203fca5856794" + md5_hash = "d41d8cd98f00b204e9800998ecf8427e" + + variants = [ + # (pattern_key, expected_bucket) + ("file:hashes.'SHA-256'", "files_sha256"), + ("file:hashes.SHA-256", "files_sha256"), + ("file:hashes.SHA256", "files_sha256"), + ("file:hashes.sha256", "files_sha256"), + ("file:hashes.'SHA-1'", "files_sha1"), + ("file:hashes.SHA-1", "files_sha1"), + ("file:hashes.SHA1", "files_sha1"), + ("file:hashes.sha1", "files_sha1"), + ("file:hashes.MD5", "files_md5"), + ("file:hashes.'MD5'", "files_md5"), + ("file:hashes.md5", "files_md5"), + ] + + hash_for = { + "files_sha256": sha256_hash, + "files_sha1": sha1_hash, + "files_md5": md5_hash, + } + + for pattern_key, bucket in variants: + h = hash_for[bucket] + stix = { + "type": "bundle", + "id": "bundle--test", + "objects": [ + { + "type": "malware", + "id": "malware--test", + "name": "TestMalware", + "is_family": False, + }, + { + "type": "indicator", + "id": "indicator--test", + "indicator_types": ["malicious-activity"], + "pattern": f"[{pattern_key}='{h}']", + "pattern_type": "stix", + "valid_from": "2024-01-01T00:00:00Z", + }, + { + "type": "relationship", + "id": "relationship--test", + "relationship_type": "indicates", + "source_ref": "indicator--test", + "target_ref": "malware--test", + }, + ], + } + stix_file = tmp_path / "test.stix2" + stix_file.write_text(json.dumps(stix)) + + ind = Indicators(log=logging) + ind.load_indicators_files([str(stix_file)], load_default=False) + assert len(ind.ioc_collections[0][bucket]) == 1, ( + f"Pattern key '{pattern_key}' was not parsed into '{bucket}'" + ) + assert ind.check_file_hash(h) is not None, ( + f"check_file_hash failed for pattern key '{pattern_key}'" + ) + def test_check_android_property(self, indicator_file): ind = Indicators(log=logging) ind.load_indicators_files([indicator_file], load_default=False)