""" stix_exporter.py — Open Intelligence Lab v0.3.0 STIX 2.1 Export Engine Converts the internal graph representation into fully compliant STIX 2.1 bundles. Supports export targets: Splunk ES, Microsoft Sentinel, OpenCTI, IBM QRadar SIEM. Author: Alborz Nazari License: MIT """ import json import uuid from datetime import datetime, timezone from typing import Optional # ───────────────────────────────────────────── # Helpers # ───────────────────────────────────────────── def _now() -> str: return datetime.now(timezone.utc).strftime("%Y-%m-%dT%H:%M:%S.000Z") def _stix_id(object_type: str) -> str: return f"{object_type}--{uuid.uuid4()}" def _confidence_to_stix(confidence: float) -> int: """Map [0.0, 1.0] float to STIX 2.1 integer confidence [0, 100].""" return min(100, max(0, int(round(confidence * 100)))) # ───────────────────────────────────────────── # Entity → STIX Object Converters # ───────────────────────────────────────────── def threat_actor_to_stix(entity: dict) -> dict: """Convert a threat_entity of type 'threat_actor' to a STIX 2.1 threat-actor object.""" return { "type": "threat-actor", "spec_version": "2.1", "id": _stix_id("threat-actor"), "created": _now(), "modified": _now(), "name": entity.get("name", "Unknown"), "description": entity.get("description", ""), "threat_actor_types": [entity.get("actor_type", "unknown")], "aliases": entity.get("aliases", []), "sophistication": entity.get("sophistication", "advanced"), "resource_level": entity.get("resource_level", "government"), "primary_motivation": entity.get("motivation", "unknown"), "confidence": _confidence_to_stix(entity.get("confidence", 0.5)), "labels": ["threat-actor", entity.get("origin", "unknown").lower()], "x_oi_risk_score": entity.get("risk_score", 0.0), "x_oi_entity_id": entity.get("id", ""), "x_mitre_techniques": entity.get("mitre_techniques", []), } def malware_to_stix(entity: dict) -> dict: """Convert a threat_entity of type 'malware' to a STIX 2.1 malware object.""" return { "type": "malware", "spec_version": "2.1", "id": _stix_id("malware"), "created": _now(), "modified": _now(), "name": entity.get("name", "Unknown"), "description": entity.get("description", ""), "malware_types": [entity.get("malware_type", "trojan")], "is_family": False, "capabilities": entity.get("capabilities", []), "confidence": _confidence_to_stix(entity.get("confidence", 0.5)), "labels": ["malware"], "x_oi_risk_score": entity.get("risk_score", 0.0), "x_oi_entity_id": entity.get("id", ""), } def infrastructure_to_stix(entity: dict) -> dict: """Convert a threat_entity of type 'infrastructure' to a STIX 2.1 infrastructure object.""" return { "type": "infrastructure", "spec_version": "2.1", "id": _stix_id("infrastructure"), "created": _now(), "modified": _now(), "name": entity.get("name", "Unknown"), "description": entity.get("description", ""), "infrastructure_types": [entity.get("infra_type", "command-and-control")], "confidence": _confidence_to_stix(entity.get("confidence", 0.5)), "labels": ["infrastructure"], "x_oi_risk_score": entity.get("risk_score", 0.0), "x_oi_entity_id": entity.get("id", ""), } def vulnerability_to_stix(entity: dict) -> dict: """Convert a CVE entity to a STIX 2.1 vulnerability object.""" return { "type": "vulnerability", "spec_version": "2.1", "id": _stix_id("vulnerability"), "created": _now(), "modified": _now(), "name": entity.get("cve_id", entity.get("name", "Unknown")), "description": entity.get("description", ""), "external_references": [ { "source_name": "cve", "external_id": entity.get("cve_id", ""), "url": f"https://nvd.nist.gov/vuln/detail/{entity.get('cve_id', '')}", } ], "confidence": _confidence_to_stix(entity.get("confidence", 0.5)), "labels": ["vulnerability"], "x_oi_risk_score": entity.get("risk_score", 0.0), "x_oi_entity_id": entity.get("id", ""), "x_oi_cvss_score": entity.get("cvss_score", None), } def attack_pattern_to_stix(pattern: dict) -> dict: """Convert an attack_pattern entry to a STIX 2.1 attack-pattern object.""" kill_chain_phases = [] if pattern.get("kill_chain_phase"): kill_chain_phases = [ { "kill_chain_name": "mitre-attack", "phase_name": pattern["kill_chain_phase"].lower().replace(" ", "-"), } ] return { "type": "attack-pattern", "spec_version": "2.1", "id": _stix_id("attack-pattern"), "created": _now(), "modified": _now(), "name": pattern.get("name", "Unknown"), "description": pattern.get("description", ""), "kill_chain_phases": kill_chain_phases, "external_references": [ { "source_name": "mitre-attack", "external_id": pattern.get("mitre_technique_id", ""), "url": f"https://attack.mitre.org/techniques/{pattern.get('mitre_technique_id', '').replace('.', '/')}", } ], "confidence": _confidence_to_stix(pattern.get("confidence", 0.8)), "labels": ["attack-pattern"], "x_oi_detection": pattern.get("detection", ""), "x_oi_mitigation": pattern.get("mitigation", ""), "x_oi_pattern_id": pattern.get("id", ""), } def relation_to_stix_relationship( relation: dict, source_stix_id: str, target_stix_id: str, ) -> dict: """Convert a relation edge to a STIX 2.1 relationship object.""" return { "type": "relationship", "spec_version": "2.1", "id": _stix_id("relationship"), "created": _now(), "modified": _now(), "relationship_type": relation.get("relation_type", "related-to").lower().replace("_", "-"), "source_ref": source_stix_id, "target_ref": target_stix_id, "confidence": _confidence_to_stix(relation.get("confidence", 0.5)), "description": relation.get("description", ""), "labels": [relation.get("relation_type", "related-to")], } def campaign_to_stix(campaign: dict) -> dict: """Convert a campaign (Diamond Model) to a STIX 2.1 campaign object.""" return { "type": "campaign", "spec_version": "2.1", "id": _stix_id("campaign"), "created": _now(), "modified": _now(), "name": campaign.get("name", "Unknown Campaign"), "description": campaign.get("description", ""), "objective": campaign.get("motivation", ""), "first_seen": campaign.get("first_seen", _now()), "last_seen": campaign.get("last_seen", _now()), "confidence": _confidence_to_stix(campaign.get("confidence", 0.8)), "labels": ["campaign"], "x_oi_diamond_adversary": campaign.get("adversary", ""), "x_oi_diamond_capability": campaign.get("capability", ""), "x_oi_diamond_infrastructure": campaign.get("infrastructure", ""), "x_oi_diamond_victim": campaign.get("victim", ""), "x_oi_campaign_id": campaign.get("id", ""), } # ───────────────────────────────────────────── # Main Bundle Builder # ───────────────────────────────────────────── def build_stix_bundle( entities: list[dict], attack_patterns: list[dict], relations: list[dict], campaigns: list[dict], ) -> dict: """ Assemble a complete STIX 2.1 Bundle from Open Intelligence Lab datasets. Returns a dict ready for json.dumps() — compatible with: - Splunk ES (STIX-Taxii connector) - Microsoft Sentinel (Threat Intelligence blade) - OpenCTI (STIX 2.1 import) - IBM QRadar (STIX connector) """ stix_objects = [] # Track internal ID → STIX ID for relationship resolution id_map: dict[str, str] = {} # 1. Entities type_converters = { "threat_actor": threat_actor_to_stix, "malware": malware_to_stix, "infrastructure": infrastructure_to_stix, "vulnerability": vulnerability_to_stix, "sector": None, # STIX 2.1 uses identity for sectors } for entity in entities: etype = entity.get("type", "") converter = type_converters.get(etype) if converter: stix_obj = converter(entity) stix_objects.append(stix_obj) id_map[entity["id"]] = stix_obj["id"] elif etype == "sector": # Represent sectors as STIX identity objects identity = { "type": "identity", "spec_version": "2.1", "id": _stix_id("identity"), "created": _now(), "modified": _now(), "name": entity.get("name", "Unknown Sector"), "identity_class": "class", "sectors": [entity.get("sector_name", entity.get("name", "").lower())], "description": entity.get("description", ""), "labels": ["sector"], "x_oi_entity_id": entity.get("id", ""), } stix_objects.append(identity) id_map[entity["id"]] = identity["id"] # 2. Attack patterns ap_id_map: dict[str, str] = {} for ap in attack_patterns: stix_obj = attack_pattern_to_stix(ap) stix_objects.append(stix_obj) ap_id_map[ap["id"]] = stix_obj["id"] # 3. Relationships for rel in relations: src_id = id_map.get(rel.get("source_id", "")) or ap_id_map.get(rel.get("source_id", "")) tgt_id = id_map.get(rel.get("target_id", "")) or ap_id_map.get(rel.get("target_id", "")) if src_id and tgt_id: rel_obj = relation_to_stix_relationship(rel, src_id, tgt_id) stix_objects.append(rel_obj) # 4. Campaigns for campaign in campaigns: stix_obj = campaign_to_stix(campaign) stix_objects.append(stix_obj) # 5. Bundle wrapper bundle = { "type": "bundle", "id": _stix_id("bundle"), "spec_version": "2.1", "objects": stix_objects, } return bundle # ───────────────────────────────────────────── # Format-Specific Export Helpers # ───────────────────────────────────────────── def export_for_splunk(bundle: dict) -> list[dict]: """ Flatten STIX bundle into Splunk-compatible JSON events. Each STIX object becomes a Splunk sourcetype=stix event. Compatible with: Splunk ES STIX-TAXII connector (>= ES 7.x). """ events = [] for obj in bundle.get("objects", []): event = { "sourcetype": "stix", "source": "open-intelligence-lab", "host": "oi-lab-v0.3.0", "index": "threat_intelligence", "event": obj, } events.append(event) return events def export_for_sentinel(bundle: dict) -> list[dict]: """ Format STIX bundle for Microsoft Sentinel Threat Intelligence blade. Sentinel ingests STIX 2.1 indicator objects via the TI API. Filters to indicator-type objects; wraps others as custom observations. Compatible with: Sentinel Threat Intelligence (TAXII) connector. """ sentinel_objects = [] indicator_types = {"threat-actor", "malware", "attack-pattern", "vulnerability", "campaign"} for obj in bundle.get("objects", []): if obj.get("type") in indicator_types: # Sentinel expects a flat indicator wrapper sentinel_objects.append({ "type": obj["type"], "id": obj["id"], "name": obj.get("name", ""), "description": obj.get("description", ""), "confidence": obj.get("confidence", 50), "labels": obj.get("labels", []), "created": obj.get("created", _now()), "modified": obj.get("modified", _now()), "spec_version": "2.1", "externalReferences": obj.get("external_references", []), "extensions": { "x-open-intelligence-lab": { "risk_score": obj.get("x_oi_risk_score", 0.0), "entity_id": obj.get("x_oi_entity_id", ""), "mitre_techniques": obj.get("x_mitre_techniques", []), } }, }) return sentinel_objects def export_for_opencti(bundle: dict) -> dict: """ Return the raw STIX 2.1 bundle — OpenCTI natively ingests STIX 2.1. Custom x_ extension fields are preserved as-is (OpenCTI passes them through). Compatible with: OpenCTI >= 5.x STIX 2.1 import connector. """ return bundle def export_for_qradar(bundle: dict) -> list[dict]: """ Format STIX bundle for IBM QRadar SIEM. QRadar STIX connector expects a flat list of STIX objects with mandatory 'type', 'id', 'created', 'modified' fields. Compatible with: IBM QRadar STIX Threat Intelligence App >= 3.x. """ qradar_objects = [] for obj in bundle.get("objects", []): flat = { "stix_type": obj.get("type", ""), "stix_id": obj.get("id", ""), "name": obj.get("name", obj.get("id", "")), "description": obj.get("description", ""), "confidence": obj.get("confidence", 50), "created": obj.get("created", _now()), "modified": obj.get("modified", _now()), "labels": ",".join(obj.get("labels", [])), "oi_risk_score": obj.get("x_oi_risk_score", 0.0), "oi_entity_id": obj.get("x_oi_entity_id", ""), "source": "open-intelligence-lab-v0.3.0", } # Flatten external references ext_refs = obj.get("external_references", []) if ext_refs: flat["external_id"] = ext_refs[0].get("external_id", "") flat["external_source"] = ext_refs[0].get("source_name", "") qradar_objects.append(flat) return qradar_objects # ───────────────────────────────────────────── # CLI / Demo Entry Point # ───────────────────────────────────────────── def load_datasets(base_path: str = "datasets") -> tuple: """Load all OI Lab datasets from disk.""" import os def _load(filename): path = os.path.join(base_path, filename) if os.path.exists(path): with open(path) as f: return json.load(f) return [] entities = _load("threat_entities.json") attack_patterns = _load("attack_patterns.json") relations = _load("relations.json") campaigns = _load("campaigns.json") return entities, attack_patterns, relations, campaigns def run_export(output_dir: str = "exports", base_path: str = "datasets"): """Run full STIX 2.1 export pipeline and write all platform-specific outputs.""" import os os.makedirs(output_dir, exist_ok=True) entities, attack_patterns, relations, campaigns = load_datasets(base_path) bundle = build_stix_bundle(entities, attack_patterns, relations, campaigns) # Raw STIX 2.1 bundle with open(f"{output_dir}/stix_bundle.json", "w") as f: json.dump(bundle, f, indent=2) print(f"[✓] STIX 2.1 bundle written → {output_dir}/stix_bundle.json") # Splunk with open(f"{output_dir}/splunk_events.json", "w") as f: json.dump(export_for_splunk(bundle), f, indent=2) print(f"[✓] Splunk events written → {output_dir}/splunk_events.json") # Sentinel with open(f"{output_dir}/sentinel_indicators.json", "w") as f: json.dump(export_for_sentinel(bundle), f, indent=2) print(f"[✓] Sentinel indicators → {output_dir}/sentinel_indicators.json") # OpenCTI (same as raw bundle) with open(f"{output_dir}/opencti_bundle.json", "w") as f: json.dump(export_for_opencti(bundle), f, indent=2) print(f"[✓] OpenCTI bundle written → {output_dir}/opencti_bundle.json") # QRadar with open(f"{output_dir}/qradar_objects.json", "w") as f: json.dump(export_for_qradar(bundle), f, indent=2) print(f"[✓] QRadar objects written → {output_dir}/qradar_objects.json") summary = { "version": "v0.3.0", "exported_at": _now(), "bundle_id": bundle["id"], "total_stix_objects": len(bundle["objects"]), "export_targets": ["splunk", "sentinel", "opencti", "qradar"], } with open(f"{output_dir}/export_summary.json", "w") as f: json.dump(summary, f, indent=2) print(f"[✓] Export summary → {output_dir}/export_summary.json") return bundle if __name__ == "__main__": run_export()