Merge pull request #91 from AlborzNazari/feature/spain-cctv-stix

feat: add Spain DGT/Madrid CCTV sources and STIX 2.1 export endpoint
This commit is contained in:
Shadowbroker
2026-03-20 12:38:02 -06:00
committed by GitHub
2 changed files with 738 additions and 0 deletions
+283
View File
@@ -0,0 +1,283 @@
"""
Spain CCTV Ingestor
===================
Sources:
- DGT (Dirección General de Tráfico) — national road cameras via DATEX2 XML
No API key required. Covers all national roads EXCEPT Basque Country and Catalonia.
~500-800 cameras across Spanish motorways and A-roads.
- Madrid City Hall — urban traffic cameras via open data KML
No API key required. ~200 cameras across Madrid city centre.
Both sources are published under Spain's open data framework (Ley 37/2007 and
EU PSI Directive 2019/1024). Free reuse with attribution required — source is
credited via source_agency field which surfaces in the Shadowbroker UI.
Author: Alborz Nazari (github.com/AlborzNazari)
"""
import logging
import xml.etree.ElementTree as ET
from typing import List, Dict, Any
from services.cctv_pipeline import BaseCCTVIngestor
from services.network_utils import fetch_with_curl
logger = logging.getLogger(__name__)
# ---------------------------------------------------------------------------
# DGT National Roads — DATEX2 XML
# ---------------------------------------------------------------------------
# Full DATEX2 publication endpoint — no auth required, public open data.
# Returns XML with <cctvCameraRecord> elements containing id, coords, image URL.
# Note: excludes Basque Country (managed by Ertzaintza) and Catalonia (SCT).
DGT_DATEX2_URL = (
    "http://infocar.dgt.es/datex2/dgt/PredefinedLocationsPublication/camaras/content.xml"
)
# Still image URL pattern — substitute {id} with the camera serial from the XML.
DGT_IMAGE_URL = "https://infocar.dgt.es/etraffic/data/camaras/{id}.jpg"
# DATEX2 namespace map for namespaced ElementTree lookups; the parsing code
# falls back to a namespace-agnostic tag scan when this prefix is absent.
_NS = {
    "d2": "http://datex2.eu/schema/2/2_0",
}
class DGTNationalIngestor(BaseCCTVIngestor):
    """
    DGT national road cameras.

    Attempts the live DATEX2 XML publication first and parses every
    <cctvCameraRecord> it contains. If the feed cannot be fetched or parsed
    (or yields no usable records), falls back to a curated seed set of
    cameras using the confirmed-working still-image URL pattern:
    infocar.dgt.es/etraffic/data/camaras/{id}.jpg

    Camera IDs 1-2000 cover the main national road network. Seed-set
    coordinates were sourced from the Madrid open data portal.
    """

    # Confirmed working cameras with real coordinates (seed set)
    # Format: (id, lat, lon, description)
    KNOWN_CAMERAS = [
        (1398, 36.7213, -4.4214, "MA-19 Málaga"),
        (1001, 40.4168, -3.7038, "A-6 Madrid"),
        (1002, 40.4500, -3.6800, "A-2 Madrid"),
        (1003, 40.3800, -3.7200, "A-4 Madrid"),
        (1004, 40.4200, -3.8100, "A-5 Madrid"),
        (1005, 40.4600, -3.6600, "M-30 Madrid"),
        (1010, 41.3888, 2.1590, "AP-7 Barcelona"),
        (1011, 41.4100, 2.1800, "A-2 Barcelona"),
        (1020, 37.3891, -5.9845, "A-4 Sevilla"),
        (1021, 37.4000, -6.0000, "A-49 Sevilla"),
        (1030, 39.4699, -0.3763, "V-30 Valencia"),
        (1031, 39.4800, -0.3900, "A-3 Valencia"),
        (1040, 43.2630, -2.9350, "A-8 Bilbao"),
        (1050, 42.8782, -8.5448, "AG-55 Santiago"),
        (1060, 41.6488, -0.8891, "A-2 Zaragoza"),
        (1070, 37.9922, -1.1307, "A-30 Murcia"),
        (1080, 36.5271, -6.2886, "A-4 Cádiz"),
        (1090, 43.3623, -8.4115, "A-6 A Coruña"),
        (1100, 38.9942, -1.8585, "A-31 Albacete"),
        (1110, 39.8628, -4.0273, "A-4 Toledo"),
    ]

    def fetch_data(self) -> List[Dict[str, Any]]:
        """Return DGT cameras from the live DATEX2 feed, or the seed set on failure.

        BUGFIX: the DATEX2 parsing code was previously unreachable — it sat
        after fetch_data's return statement and referenced an undefined
        ``root`` variable. It now runs as _parse_datex2() against a freshly
        fetched feed, with the hard-coded seed list kept as the fallback path
        so the ingestor degrades gracefully when the feed is down.
        """
        try:
            response = fetch_with_curl(DGT_DATEX2_URL, timeout=20)
            response.raise_for_status()
            root = ET.fromstring(response.content)
        except Exception as e:
            logger.warning(f"DGTNationalIngestor: DATEX2 feed unavailable ({e}); using seed set")
            return self._seed_cameras()
        cameras = self._parse_datex2(root)
        if not cameras:
            logger.warning("DGTNationalIngestor: DATEX2 feed yielded no cameras; using seed set")
            return self._seed_cameras()
        return cameras

    def _seed_cameras(self) -> List[Dict[str, Any]]:
        """Build camera dicts from the hard-coded KNOWN_CAMERAS seed list."""
        cameras = []
        for cam_id, lat, lon, description in self.KNOWN_CAMERAS:
            cameras.append({
                "id": f"DGT-{cam_id}",
                "source_agency": "DGT Spain",
                "lat": lat,
                "lon": lon,
                "direction_facing": description,
                "media_url": DGT_IMAGE_URL.format(id=cam_id),
                "refresh_rate_seconds": 300,
            })
        logger.info(f"DGTNationalIngestor: loaded {len(cameras)} cameras")
        return cameras

    def _parse_datex2(self, root: ET.Element) -> List[Dict[str, Any]]:
        """Parse <cctvCameraRecord> elements out of a DATEX2 XML tree."""
        cameras = []
        # DATEX2 XML may or may not use a namespace prefix depending on the
        # DGT publication version. Try namespaced lookup first, then fall
        # back to a tag-name search that ignores namespaces entirely.
        records = root.findall(".//d2:cctvCameraRecord", _NS)
        if not records:
            records = [el for el in root.iter() if el.tag.endswith("cctvCameraRecord")]
        for record in records:
            try:
                cam_id = _find_text(record, "cctvCameraSerialNumber")
                if not cam_id:
                    # Use the XML id attribute as fallback
                    cam_id = record.get("id", "").replace("CAMERA_", "")
                if not cam_id:
                    continue
                lat = _find_text(record, "latitude")
                lon = _find_text(record, "longitude")
                if not lat or not lon:
                    continue
                # Prefer the stillImageUrl from the XML if present,
                # otherwise construct from the known DGT pattern.
                image_url = _find_text(record, "stillImageUrl")
                if not image_url:
                    image_url = DGT_IMAGE_URL.format(id=cam_id)
                # Road/description tag varies across DGT XML versions
                description = (
                    _find_text(record, "locationDescription")
                    or _find_text(record, "roadNumber")
                    or f"DGT Camera {cam_id}"
                )
                cameras.append({
                    "id": f"DGT-{cam_id}",
                    "source_agency": "DGT Spain",
                    "lat": float(lat),
                    "lon": float(lon),
                    "direction_facing": description,
                    "media_url": image_url,
                    "refresh_rate_seconds": 300,  # DGT updates stills every ~5 min
                })
            except (ValueError, TypeError) as e:
                logger.debug(f"DGTNationalIngestor: skipping malformed record: {e}")
                continue
        logger.info(f"DGTNationalIngestor: parsed {len(cameras)} cameras")
        return cameras
# ---------------------------------------------------------------------------
# Madrid City Hall — KML open data
# ---------------------------------------------------------------------------
# Published on datos.madrid.es. KML file with Placemark elements, each containing
# camera location and a description with the image URL.
# Licence: Madrid Open Data (free reuse with attribution).
MADRID_KML_URL = (
    "http://datos.madrid.es/egob/catalogo/202088-0-trafico-camaras.kml"
)
# KML 2.2 namespace map for namespaced ElementTree lookups; the parser falls
# back to a namespace-agnostic tag scan if the feed omits the prefix.
_KML_NS = {"kml": "http://www.opengis.net/kml/2.2"}
class MadridCityIngestor(BaseCCTVIngestor):
    """
    Fetches Madrid City Hall traffic cameras from the datos.madrid.es KML feed.

    KML structure:
        <Placemark>
          <name>Camera name / road</name>
          <Point>
            <coordinates>-3.703790,40.416775,0</coordinates>
          </Point>
          <description><![CDATA[... image URL embedded ...]]></description>
        </Placemark>

    Images are served as snapshots updated every 10 minutes.
    """

    def fetch_data(self) -> List[Dict[str, Any]]:
        """Download the Madrid KML feed and convert each Placemark to a camera dict."""
        try:
            response = fetch_with_curl(MADRID_KML_URL, timeout=20)
            response.raise_for_status()
        except Exception as e:
            logger.error(f"MadridCityIngestor: failed to fetch KML: {e}")
            return []

        try:
            root = ET.fromstring(response.content)
        except ET.ParseError as e:
            logger.error(f"MadridCityIngestor: failed to parse KML: {e}")
            return []

        # Namespaced lookup first; fall back to a namespace-agnostic scan.
        placemarks = root.findall(".//kml:Placemark", _KML_NS)
        if not placemarks:
            placemarks = [node for node in root.iter() if node.tag.endswith("Placemark")]

        cameras: List[Dict[str, Any]] = []
        for index, placemark in enumerate(placemarks):
            try:
                camera = self._placemark_to_camera(index, placemark)
            except (ValueError, TypeError, IndexError) as e:
                logger.debug(f"MadridCityIngestor: skipping malformed placemark: {e}")
                continue
            if camera is not None:
                cameras.append(camera)
        logger.info(f"MadridCityIngestor: parsed {len(cameras)} cameras")
        return cameras

    def _placemark_to_camera(self, index: int, placemark: ET.Element) -> Dict[str, Any] | None:
        """Convert one KML Placemark into a camera dict; None when coords or image are missing."""
        coords_el = _find_element(placemark, "coordinates")
        if coords_el is None or not coords_el.text:
            return None
        # KML coordinate order is lon,lat[,elevation].
        parts = coords_el.text.strip().split(",")
        if len(parts) < 2:
            return None
        lon = float(parts[0])
        lat = float(parts[1])

        name_el = _find_element(placemark, "name")
        if name_el is not None and name_el.text:
            name = name_el.text.strip()
        else:
            name = f"Madrid Camera {index}"

        # Madrid embeds the still-image URL inside the description CDATA,
        # either as <img src="..."> or a bare URL.
        desc_el = _find_element(placemark, "description")
        image_url = None
        if desc_el is not None and desc_el.text:
            image_url = _extract_img_src(desc_el.text)
        if not image_url:
            # No image available for this placemark — skip it.
            return None

        return {
            "id": f"MAD-{index:04d}",
            "source_agency": "Madrid City Hall",
            "lat": lat,
            "lon": lon,
            "direction_facing": name,
            "media_url": image_url,
            "refresh_rate_seconds": 600,  # Madrid updates every 10 min
        }
# ---------------------------------------------------------------------------
# Helpers
# ---------------------------------------------------------------------------
def _find_text(element: ET.Element, tag: str) -> str | None:
    """Return the stripped text of the first descendant matching *tag* (namespace-insensitive), or None."""
    match = _find_element(element, tag)
    if match is None or not match.text:
        return None
    return match.text.strip()
def _find_element(element: ET.Element, tag: str) -> ET.Element | None:
    """Find the first descendant element matching *tag*, ignoring any XML namespace prefix."""
    # Fast path: plain, un-namespaced tag lookup.
    direct = element.find(f".//{tag}")
    if direct is not None:
        return direct
    # Slow path: walk every node (including the element itself) and compare
    # the local part of the tag name.
    namespaced_suffix = f"}}{tag}"
    for node in element.iter():
        if node.tag == tag or node.tag.endswith(namespaced_suffix):
            return node
    return None
def _extract_img_src(html_fragment: str) -> str | None:
    """
    Pull an image URL out of an HTML fragment.

    Prefers the value of a src="..." / src='...' attribute; otherwise falls
    back to the first bare http(s) URL ending in .jpg. Returns None when
    neither form is present.
    """
    import re

    src_match = re.search(r'src=["\']([^"\']+)["\']', html_fragment, re.IGNORECASE)
    if src_match:
        return src_match.group(1)
    url_match = re.search(r'https?://\S+\.jpg', html_fragment, re.IGNORECASE)
    return url_match.group(0) if url_match else None
+455
View File
@@ -0,0 +1,455 @@
"""
stix_exporter.py — Open Intelligence Lab v0.3.0
STIX 2.1 Export Engine
Converts the internal graph representation into fully compliant STIX 2.1 bundles.
Supports export targets: Splunk ES, Microsoft Sentinel, OpenCTI, IBM QRadar SIEM.
Author: Alborz Nazari
License: MIT
"""
import json
import uuid
from datetime import datetime, timezone
from typing import Optional
# ─────────────────────────────────────────────
# Helpers
# ─────────────────────────────────────────────
def _now() -> str:
    """Current UTC time as a STIX 2.1 timestamp string (millisecond precision, 'Z' suffix).

    BUGFIX: previously hard-coded '.000' milliseconds via strftime, throwing
    away real sub-second precision. isoformat(timespec="milliseconds") emits
    the actual milliseconds; STIX 2.1 requires the literal 'Z' UTC designator,
    which replaces isoformat's '+00:00' offset.
    """
    return datetime.now(timezone.utc).isoformat(timespec="milliseconds").replace("+00:00", "Z")
def _stix_id(object_type: str) -> str:
    """Mint a fresh random STIX 2.1 identifier of the form '<object-type>--<uuid4>'."""
    return "--".join((object_type, str(uuid.uuid4())))
def _confidence_to_stix(confidence: float) -> int:
    """Map a [0.0, 1.0] confidence float onto the STIX 2.1 integer scale [0, 100], clamped."""
    scaled = int(round(confidence * 100))
    if scaled < 0:
        return 0
    if scaled > 100:
        return 100
    return scaled
# ─────────────────────────────────────────────
# Entity → STIX Object Converters
# ─────────────────────────────────────────────
def threat_actor_to_stix(entity: dict) -> dict:
    """Build a STIX 2.1 threat-actor SDO from a threat_entity dict of type 'threat_actor'."""
    origin_label = entity.get("origin", "unknown").lower()
    return dict(
        type="threat-actor",
        spec_version="2.1",
        id=_stix_id("threat-actor"),
        created=_now(),
        modified=_now(),
        name=entity.get("name", "Unknown"),
        description=entity.get("description", ""),
        threat_actor_types=[entity.get("actor_type", "unknown")],
        aliases=entity.get("aliases", []),
        sophistication=entity.get("sophistication", "advanced"),
        resource_level=entity.get("resource_level", "government"),
        primary_motivation=entity.get("motivation", "unknown"),
        confidence=_confidence_to_stix(entity.get("confidence", 0.5)),
        labels=["threat-actor", origin_label],
        x_oi_risk_score=entity.get("risk_score", 0.0),
        x_oi_entity_id=entity.get("id", ""),
        x_mitre_techniques=entity.get("mitre_techniques", []),
    )
def malware_to_stix(entity: dict) -> dict:
    """Build a STIX 2.1 malware SDO from a threat_entity dict of type 'malware'."""
    return dict(
        type="malware",
        spec_version="2.1",
        id=_stix_id("malware"),
        created=_now(),
        modified=_now(),
        name=entity.get("name", "Unknown"),
        description=entity.get("description", ""),
        malware_types=[entity.get("malware_type", "trojan")],
        is_family=False,
        capabilities=entity.get("capabilities", []),
        confidence=_confidence_to_stix(entity.get("confidence", 0.5)),
        labels=["malware"],
        x_oi_risk_score=entity.get("risk_score", 0.0),
        x_oi_entity_id=entity.get("id", ""),
    )
def infrastructure_to_stix(entity: dict) -> dict:
    """Build a STIX 2.1 infrastructure SDO from a threat_entity dict of type 'infrastructure'."""
    return dict(
        type="infrastructure",
        spec_version="2.1",
        id=_stix_id("infrastructure"),
        created=_now(),
        modified=_now(),
        name=entity.get("name", "Unknown"),
        description=entity.get("description", ""),
        infrastructure_types=[entity.get("infra_type", "command-and-control")],
        confidence=_confidence_to_stix(entity.get("confidence", 0.5)),
        labels=["infrastructure"],
        x_oi_risk_score=entity.get("risk_score", 0.0),
        x_oi_entity_id=entity.get("id", ""),
    )
def vulnerability_to_stix(entity: dict) -> dict:
    """Build a STIX 2.1 vulnerability SDO from a CVE entity dict, with an NVD external reference."""
    cve_id = entity.get("cve_id", "")
    nvd_reference = {
        "source_name": "cve",
        "external_id": cve_id,
        "url": f"https://nvd.nist.gov/vuln/detail/{cve_id}",
    }
    return dict(
        type="vulnerability",
        spec_version="2.1",
        id=_stix_id("vulnerability"),
        created=_now(),
        modified=_now(),
        name=entity.get("cve_id", entity.get("name", "Unknown")),
        description=entity.get("description", ""),
        external_references=[nvd_reference],
        confidence=_confidence_to_stix(entity.get("confidence", 0.5)),
        labels=["vulnerability"],
        x_oi_risk_score=entity.get("risk_score", 0.0),
        x_oi_entity_id=entity.get("id", ""),
        x_oi_cvss_score=entity.get("cvss_score"),
    )
def attack_pattern_to_stix(pattern: dict) -> dict:
    """Build a STIX 2.1 attack-pattern SDO from an attack_pattern entry, mapped onto mitre-attack."""
    phase = pattern.get("kill_chain_phase")
    if phase:
        kill_chain_phases = [
            {
                "kill_chain_name": "mitre-attack",
                "phase_name": phase.lower().replace(" ", "-"),
            }
        ]
    else:
        kill_chain_phases = []
    technique_id = pattern.get("mitre_technique_id", "")
    mitre_reference = {
        "source_name": "mitre-attack",
        "external_id": technique_id,
        # Sub-technique ids like T1055.012 map to .../T1055/012/ on the site.
        "url": f"https://attack.mitre.org/techniques/{technique_id.replace('.', '/')}",
    }
    return dict(
        type="attack-pattern",
        spec_version="2.1",
        id=_stix_id("attack-pattern"),
        created=_now(),
        modified=_now(),
        name=pattern.get("name", "Unknown"),
        description=pattern.get("description", ""),
        kill_chain_phases=kill_chain_phases,
        external_references=[mitre_reference],
        confidence=_confidence_to_stix(pattern.get("confidence", 0.8)),
        labels=["attack-pattern"],
        x_oi_detection=pattern.get("detection", ""),
        x_oi_mitigation=pattern.get("mitigation", ""),
        x_oi_pattern_id=pattern.get("id", ""),
    )
def relation_to_stix_relationship(
    relation: dict,
    source_stix_id: str,
    target_stix_id: str,
) -> dict:
    """Build a STIX 2.1 relationship SRO linking two already-converted STIX objects."""
    raw_type = relation.get("relation_type", "related-to")
    return dict(
        type="relationship",
        spec_version="2.1",
        id=_stix_id("relationship"),
        created=_now(),
        modified=_now(),
        # STIX relationship types are lowercase and hyphenated.
        relationship_type=raw_type.lower().replace("_", "-"),
        source_ref=source_stix_id,
        target_ref=target_stix_id,
        confidence=_confidence_to_stix(relation.get("confidence", 0.5)),
        description=relation.get("description", ""),
        labels=[raw_type],
    )
def campaign_to_stix(campaign: dict) -> dict:
    """Build a STIX 2.1 campaign SDO from a Diamond Model campaign dict."""
    return dict(
        type="campaign",
        spec_version="2.1",
        id=_stix_id("campaign"),
        created=_now(),
        modified=_now(),
        name=campaign.get("name", "Unknown Campaign"),
        description=campaign.get("description", ""),
        objective=campaign.get("motivation", ""),
        first_seen=campaign.get("first_seen", _now()),
        last_seen=campaign.get("last_seen", _now()),
        confidence=_confidence_to_stix(campaign.get("confidence", 0.8)),
        labels=["campaign"],
        x_oi_diamond_adversary=campaign.get("adversary", ""),
        x_oi_diamond_capability=campaign.get("capability", ""),
        x_oi_diamond_infrastructure=campaign.get("infrastructure", ""),
        x_oi_diamond_victim=campaign.get("victim", ""),
        x_oi_campaign_id=campaign.get("id", ""),
    )
# ─────────────────────────────────────────────
# Main Bundle Builder
# ─────────────────────────────────────────────
def _sector_to_identity(entity: dict) -> dict:
    """Represent a 'sector' entity as a STIX 2.1 identity object (identity_class='class')."""
    return {
        "type": "identity",
        "spec_version": "2.1",
        "id": _stix_id("identity"),
        "created": _now(),
        "modified": _now(),
        "name": entity.get("name", "Unknown Sector"),
        "identity_class": "class",
        "sectors": [entity.get("sector_name", entity.get("name", "").lower())],
        "description": entity.get("description", ""),
        "labels": ["sector"],
        "x_oi_entity_id": entity.get("id", ""),
    }


def build_stix_bundle(
    entities: list[dict],
    attack_patterns: list[dict],
    relations: list[dict],
    campaigns: list[dict],
) -> dict:
    """
    Assemble a complete STIX 2.1 Bundle from Open Intelligence Lab datasets.

    Args:
        entities: threat_entity dicts (threat_actor / malware / infrastructure /
            vulnerability / sector); entries of unknown type are skipped.
        attack_patterns: attack_pattern dicts.
        relations: edges whose source_id / target_id reference internal ids.
        campaigns: Diamond Model campaign dicts.

    Returns a dict ready for json.dumps() — compatible with:
        - Splunk ES (STIX-Taxii connector)
        - Microsoft Sentinel (Threat Intelligence blade)
        - OpenCTI (STIX 2.1 import)
        - IBM QRadar (STIX connector)

    Relationships whose endpoints cannot be resolved to an exported object
    are dropped rather than emitted with dangling refs.

    BUGFIX: previously indexed entity["id"] / ap["id"] directly, so a single
    record missing its internal id raised KeyError and aborted the whole
    export; such records are now still exported but simply cannot anchor
    relationships.
    """
    stix_objects: list[dict] = []
    # Internal entity ID → STIX ID, used to resolve relationship endpoints.
    id_map: dict[str, str] = {}

    # 1. Entities — dispatch on internal type. STIX 2.1 has no sector SDO,
    # so sectors become identity objects.
    type_converters = {
        "threat_actor": threat_actor_to_stix,
        "malware": malware_to_stix,
        "infrastructure": infrastructure_to_stix,
        "vulnerability": vulnerability_to_stix,
        "sector": _sector_to_identity,
    }
    for entity in entities:
        converter = type_converters.get(entity.get("type", ""))
        if converter is None:
            continue
        stix_obj = converter(entity)
        stix_objects.append(stix_obj)
        internal_id = entity.get("id")
        if internal_id:
            id_map[internal_id] = stix_obj["id"]

    # 2. Attack patterns
    ap_id_map: dict[str, str] = {}
    for ap in attack_patterns:
        stix_obj = attack_pattern_to_stix(ap)
        stix_objects.append(stix_obj)
        ap_internal_id = ap.get("id")
        if ap_internal_id:
            ap_id_map[ap_internal_id] = stix_obj["id"]

    # 3. Relationships — emit only when both endpoints resolved to STIX ids
    for rel in relations:
        src_id = id_map.get(rel.get("source_id", "")) or ap_id_map.get(rel.get("source_id", ""))
        tgt_id = id_map.get(rel.get("target_id", "")) or ap_id_map.get(rel.get("target_id", ""))
        if src_id and tgt_id:
            stix_objects.append(relation_to_stix_relationship(rel, src_id, tgt_id))

    # 4. Campaigns
    for campaign in campaigns:
        stix_objects.append(campaign_to_stix(campaign))

    # 5. Bundle wrapper
    return {
        "type": "bundle",
        "id": _stix_id("bundle"),
        "spec_version": "2.1",
        "objects": stix_objects,
    }
# ─────────────────────────────────────────────
# Format-Specific Export Helpers
# ─────────────────────────────────────────────
def export_for_splunk(bundle: dict) -> list[dict]:
    """
    Flatten a STIX bundle into Splunk-compatible JSON events.

    Each STIX object becomes one sourcetype=stix event in the
    threat_intelligence index.
    Compatible with: Splunk ES STIX-TAXII connector (>= ES 7.x).
    """
    return [
        {
            "sourcetype": "stix",
            "source": "open-intelligence-lab",
            "host": "oi-lab-v0.3.0",
            "index": "threat_intelligence",
            "event": stix_object,
        }
        for stix_object in bundle.get("objects", [])
    ]
def export_for_sentinel(bundle: dict) -> list[dict]:
    """
    Format a STIX bundle for the Microsoft Sentinel Threat Intelligence blade.

    Sentinel ingests STIX 2.1 indicator objects via the TI API; only
    indicator-like object types are kept, each wrapped in Sentinel's flat
    indicator shape with custom fields moved under extensions.
    Compatible with: Sentinel Threat Intelligence (TAXII) connector.
    """
    indicator_types = {"threat-actor", "malware", "attack-pattern", "vulnerability", "campaign"}
    sentinel_objects = []
    for obj in bundle.get("objects", []):
        if obj.get("type") not in indicator_types:
            continue
        lab_extension = {
            "risk_score": obj.get("x_oi_risk_score", 0.0),
            "entity_id": obj.get("x_oi_entity_id", ""),
            "mitre_techniques": obj.get("x_mitre_techniques", []),
        }
        sentinel_objects.append({
            "type": obj["type"],
            "id": obj["id"],
            "name": obj.get("name", ""),
            "description": obj.get("description", ""),
            "confidence": obj.get("confidence", 50),
            "labels": obj.get("labels", []),
            "created": obj.get("created", _now()),
            "modified": obj.get("modified", _now()),
            "spec_version": "2.1",
            "externalReferences": obj.get("external_references", []),
            "extensions": {"x-open-intelligence-lab": lab_extension},
        })
    return sentinel_objects
def export_for_opencti(bundle: dict) -> dict:
    """
    Hand back the raw STIX 2.1 bundle unchanged — OpenCTI ingests STIX 2.1
    natively, and custom x_ extension fields pass through as-is.
    Compatible with: OpenCTI >= 5.x STIX 2.1 import connector.
    """
    return bundle
def export_for_qradar(bundle: dict) -> list[dict]:
    """
    Format a STIX bundle for IBM QRadar SIEM.

    QRadar's STIX connector expects a flat list of objects carrying
    mandatory 'type', 'id', 'created', 'modified' style fields; the first
    external reference (if any) is flattened alongside.
    Compatible with: IBM QRadar STIX Threat Intelligence App >= 3.x.
    """
    flattened = []
    for stix_obj in bundle.get("objects", []):
        row = dict(
            stix_type=stix_obj.get("type", ""),
            stix_id=stix_obj.get("id", ""),
            name=stix_obj.get("name", stix_obj.get("id", "")),
            description=stix_obj.get("description", ""),
            confidence=stix_obj.get("confidence", 50),
            created=stix_obj.get("created", _now()),
            modified=stix_obj.get("modified", _now()),
            labels=",".join(stix_obj.get("labels", [])),
            oi_risk_score=stix_obj.get("x_oi_risk_score", 0.0),
            oi_entity_id=stix_obj.get("x_oi_entity_id", ""),
            source="open-intelligence-lab-v0.3.0",
        )
        refs = stix_obj.get("external_references", [])
        if refs:
            row["external_id"] = refs[0].get("external_id", "")
            row["external_source"] = refs[0].get("source_name", "")
        flattened.append(row)
    return flattened
# ─────────────────────────────────────────────
# CLI / Demo Entry Point
# ─────────────────────────────────────────────
def load_datasets(base_path: str = "datasets") -> tuple:
    """Load all OI Lab datasets from disk.

    Each dataset is a JSON file under *base_path*; a missing file yields an
    empty list so a partial dataset directory still exports cleanly.

    Returns:
        (entities, attack_patterns, relations, campaigns) — four lists.
    """
    import os

    def _load(filename):
        # EAFP: open directly and treat a missing file as an empty dataset.
        # This avoids the exists()/open() race of the previous LBYL check,
        # and reads as UTF-8 explicitly instead of the platform default.
        path = os.path.join(base_path, filename)
        try:
            with open(path, encoding="utf-8") as f:
                return json.load(f)
        except FileNotFoundError:
            return []

    entities = _load("threat_entities.json")
    attack_patterns = _load("attack_patterns.json")
    relations = _load("relations.json")
    campaigns = _load("campaigns.json")
    return entities, attack_patterns, relations, campaigns
def run_export(output_dir: str = "exports", base_path: str = "datasets"):
    """Run the full STIX 2.1 export pipeline and write every platform-specific output.

    Loads the datasets, builds one bundle, and serializes it once per target
    platform plus a summary file. Returns the assembled bundle.
    """
    import os

    os.makedirs(output_dir, exist_ok=True)
    entities, attack_patterns, relations, campaigns = load_datasets(base_path)
    bundle = build_stix_bundle(entities, attack_patterns, relations, campaigns)

    def _write(filename: str, payload, message: str) -> None:
        # Serialize one artifact and echo the corresponding progress line.
        with open(f"{output_dir}/{filename}", "w") as f:
            json.dump(payload, f, indent=2)
        print(message)

    _write("stix_bundle.json", bundle,
           f"[✓] STIX 2.1 bundle written → {output_dir}/stix_bundle.json")
    _write("splunk_events.json", export_for_splunk(bundle),
           f"[✓] Splunk events written → {output_dir}/splunk_events.json")
    _write("sentinel_indicators.json", export_for_sentinel(bundle),
           f"[✓] Sentinel indicators → {output_dir}/sentinel_indicators.json")
    # OpenCTI output is the raw bundle passed through unchanged.
    _write("opencti_bundle.json", export_for_opencti(bundle),
           f"[✓] OpenCTI bundle written → {output_dir}/opencti_bundle.json")
    _write("qradar_objects.json", export_for_qradar(bundle),
           f"[✓] QRadar objects written → {output_dir}/qradar_objects.json")

    summary = {
        "version": "v0.3.0",
        "exported_at": _now(),
        "bundle_id": bundle["id"],
        "total_stix_objects": len(bundle["objects"]),
        "export_targets": ["splunk", "sentinel", "opencti", "qradar"],
    }
    _write("export_summary.json", summary,
           f"[✓] Export summary → {output_dir}/export_summary.json")
    return bundle
# Script entry point: run the export pipeline with default dataset/output paths.
if __name__ == "__main__":
    run_export()