diff --git a/.env.example b/.env.example index cb36053..ad72720 100644 --- a/.env.example +++ b/.env.example @@ -62,6 +62,9 @@ ADMIN_KEY= # Free MAP_KEY from https://firms.modaps.eosdis.nasa.gov/ # FIRMS_MAP_KEY= +# Airframes.io ACARS/VDL datalink (plane dossier messages). Dashboard → API Key. +# AIRFRAMES_API_KEY= + # Ukraine air raid alerts — free token from https://alerts.in.ua/ # ALERTS_IN_UA_TOKEN= diff --git a/DATA-ATTRIBUTION.md b/DATA-ATTRIBUTION.md index c54317f..9e8ea90 100644 --- a/DATA-ATTRIBUTION.md +++ b/DATA-ATTRIBUTION.md @@ -34,6 +34,7 @@ These sources have their own terms; consult each link before redistributing. | Source | URL | License / Terms | Notes | |---|---|---|---| | OpenSky Network | https://opensky-network.org | OpenSky API terms | Commercial and private aircraft tracking | +| Airframes.io | https://airframes.io | Airframes API terms | Optional ACARS/VDL datalink messages in aircraft dossiers | | CelesTrak | https://celestrak.org | Public domain / no restrictions | Satellite TLE data | | USGS Earthquake Hazards | https://earthquake.usgs.gov | Public domain (US Federal) | Seismic events | | NASA FIRMS | https://firms.modaps.eosdis.nasa.gov | NASA Open Data | Fire/thermal anomalies (VIIRS) | diff --git a/backend/.env.example b/backend/.env.example index a339ad3..a82958c 100644 --- a/backend/.env.example +++ b/backend/.env.example @@ -120,6 +120,9 @@ AIS_API_KEY= # https://aisstream.io/ — free tier WebSocket key # Free MAP_KEY from https://firms.modaps.eosdis.nasa.gov/map/#d:24hrs;@0.0,0.0,3.0z # FIRMS_MAP_KEY= +# Airframes.io ACARS/VDL datalink (plane dossier messages). Dashboard → API Key. +# AIRFRAMES_API_KEY= + # Ukraine frontline mirror (GitHub). Default follows cyterat/deepstate-map-data@main. # Pin an immutable commit SHA so ingest cannot silently change if main is force-pushed (#362). # Example (verify on GitHub before use): main @ b479954e94696bc5622c7818fd20a64a699f4fe8 diff --git a/backend/routers/data.py b/backend/routers/data.py index 831cc25..4f2d581 100644 --- a/backend/routers/data.py +++ b/backend/routers/data.py @@ -396,6 +396,52 @@ async def get_selected_ship_trail(mmsi: int, request: Request): # noqa: ARG001 return {"id": mmsi, "trail": get_vessel_trail(mmsi)} +@router.get("/api/aviation/datalink/status") +@limiter.limit("60/minute") +async def aviation_datalink_status(request: Request): # noqa: ARG001 + from services.fetchers.airframes import get_datalink_status + + return get_datalink_status() + + +@router.get("/api/aviation/datalink/messages") +@limiter.limit("240/minute") +async def aviation_datalink_messages( + request: Request, # noqa: ARG001 + icao24: str = Query("", description="ICAO24 hex for the aircraft"), + registration: str = Query("", description="Tail / registration number"), + callsign: str = Query("", description="Optional callsign filter"), + live: bool = Query( + False, + description="When true, fetch from Airframes if cache has no messages (slower)", + ), +): + from services.fetchers.airframes import lookup_datalink_messages + + return lookup_datalink_messages( + icao24=icao24, + registration=registration, + callsign=callsign, + allow_live=live, + ) + + +@router.get("/api/sigint/meshtastic/status") +@limiter.limit("120/minute") +async def meshtastic_map_status(request: Request): # noqa: ARG001 + from services.fetchers.meshtastic_map import get_meshtastic_map_status + + return get_meshtastic_map_status() + + +@router.post("/api/sigint/meshtastic/scan", dependencies=[Depends(require_local_operator)]) +@limiter.limit("3/hour") +async def meshtastic_planet_scan(request: Request): # noqa: ARG001 + from services.fetchers.meshtastic_map import start_meshtastic_planet_scan + + return start_meshtastic_planet_scan() + + @router.post("/api/viewport") @limiter.limit("60/minute") async def update_viewport(vp: ViewportUpdate, request: Request): # noqa: ARG001 diff --git a/backend/services/api_settings.py b/backend/services/api_settings.py index e3e70e1..7f88616 100644 --- a/backend/services/api_settings.py +++ b/backend/services/api_settings.py @@ -6,6 +6,7 @@ Keys are stored in the backend .env file and loaded via python-dotenv. import os import re import tempfile +import threading from pathlib import Path # Path to the backend .env file @@ -87,6 +88,15 @@ API_REGISTRY = [ "url": "https://firms.modaps.eosdis.nasa.gov/api/area/", "required": False, }, + { + "id": "airframes_api_key", + "env_key": "AIRFRAMES_API_KEY", + "name": "Airframes.io — API Key", + "description": "ACARS/VDL datalink for plane dossiers. ShadowBroker bulk-ingests the global Airframes firehose (up to 100 messages per API call, one call every 2s, refill every 15 minutes) and indexes by tail/ICAO. Opening a dossier with no cache queues a single-plane lookup. Get a key at app.airframes.io → Dashboard → API Key.", + "category": "Aviation", + "url": "https://app.airframes.io/user/dashboard", + "required": False, + }, { "id": "celestrak", "env_key": None, @@ -366,6 +376,17 @@ def save_api_keys(updates: dict[str, str]) -> dict: flights.opensky_client.expires_at = 0 except Exception: pass + if "AIRFRAMES_API_KEY" in clean: + try: + from services.fetchers.airframes import sync_airframes_messages + + threading.Thread( + target=lambda: sync_airframes_messages(force=True), + daemon=True, + name="airframes-initial-sync", + ).start() + except Exception: + pass try: from services.config import get_settings diff --git a/backend/services/data_fetcher.py b/backend/services/data_fetcher.py index 16c71d0..816ed7f 100644 --- a/backend/services/data_fetcher.py +++ b/backend/services/data_fetcher.py @@ -77,6 +77,7 @@ from services.fetchers.infrastructure import ( # noqa: F401 fetch_psk_reporter, ) from services.fetchers.road_corridor_sat import fetch_road_corridor_trends # noqa: F401 +from services.fetchers.airframes import sync_airframes_messages # noqa: F401 from services.fetchers.geo import ( # noqa: F401 fetch_ships, fetch_airports, @@ -1263,6 +1264,17 @@ def start_scheduler(): next_run_time=datetime.utcnow() + timedelta(minutes=5), # first snapshot 5m after startup ) + _airframes_interval_m = max(5, int(os.environ.get("AIRFRAMES_SYNC_INTERVAL_MINUTES", "15"))) + _scheduler.add_job( + lambda: _run_task_with_health(sync_airframes_messages, "sync_airframes_messages"), + "interval", + minutes=_airframes_interval_m, + id="airframes_datalink", + max_instances=1, + misfire_grace_time=120, + next_run_time=datetime.utcnow() + timedelta(seconds=90), + ) + _scheduler.start() logger.info("Scheduler started.") diff --git a/backend/services/fetchers/airframes.py b/backend/services/fetchers/airframes.py new file mode 100644 index 0000000..9d1f3cc --- /dev/null +++ b/backend/services/fetchers/airframes.py @@ -0,0 +1,605 @@ +"""Airframes.io ACARS/VDL datalink ingest — staggered queue cache for plane dossiers.""" + +from __future__ import annotations + +import json +import logging +import os +import re +import threading +import time +from collections import deque +from datetime import datetime, timedelta, timezone +from pathlib import Path +from typing import Any + +import requests + +logger = logging.getLogger("services.airframes") + +API_BASE = os.environ.get("AIRFRAMES_API_BASE", "https://api.airframes.io/v1").rstrip("/") +SYNC_INTERVAL_MINUTES = max(5, int(os.environ.get("AIRFRAMES_SYNC_INTERVAL_MINUTES", "15"))) +MAX_BULK_PAGES_PER_CYCLE = max(1, int(os.environ.get("AIRFRAMES_MAX_PAGES_PER_SYNC", "28"))) +MESSAGES_PER_AIRCRAFT = max(5, int(os.environ.get("AIRFRAMES_MESSAGES_PER_AIRCRAFT", "40"))) +RETENTION_HOURS = max(6, int(os.environ.get("AIRFRAMES_RETENTION_HOURS", "48"))) +# 2s between calls => 30/min, safely under Airframes 60/min cap. +REQUEST_PAUSE_S = float(os.environ.get("AIRFRAMES_REQUEST_PAUSE_S", "2.0")) +PRIORITY_LOOKBACK_HOURS = max(6, int(os.environ.get("AIRFRAMES_PRIORITY_LOOKBACK_HOURS", "48"))) +FETCH_TIMEOUT_S = max(5, int(os.environ.get("AIRFRAMES_FETCH_TIMEOUT_S", "20"))) + +_DATA_DIR = Path(os.environ.get("SB_DATA_DIR", str(Path(__file__).resolve().parents[2] / "data"))) +if not _DATA_DIR.is_absolute(): + _DATA_DIR = Path(__file__).resolve().parents[2] / _DATA_DIR +_CACHE_PATH = _DATA_DIR / "airframes_datalink_cache.json" + +_lock = threading.Lock() +_queue_lock = threading.Lock() +_worker_guard = threading.Lock() +_queue: deque[dict[str, Any]] = deque() +_queued_aircraft_keys: set[str] = set() +_bulk_cursor: dict[str, Any] = {"since_iso": "", "before_id": None, "pages": 0} +_worker_started = False +_cache_loaded = False +_save_timer: threading.Timer | None = None +_save_timer_lock = threading.Lock() +_api_key_known_configured: bool | None = None +_cache: dict[str, Any] = { + "last_sync_at": None, + "last_success_at": None, + "last_error": None, + "pages_fetched": 0, + "messages_ingested": 0, + "bulk_pages_this_cycle": 0, + "ticks_processed": 0, + "by_icao": {}, + "by_tail": {}, + "by_callsign": {}, +} + + +def _utc_now() -> datetime: + return datetime.now(timezone.utc) + + +def _iso(dt: datetime) -> str: + return dt.astimezone(timezone.utc).replace(microsecond=0).isoformat().replace("+00:00", "Z") + + +def _parse_ts(value: str | None) -> datetime | None: + if not value: + return None + try: + cleaned = value.replace("Z", "+00:00") + return datetime.fromisoformat(cleaned).astimezone(timezone.utc) + except ValueError: + return None + + +def api_key_configured() -> bool: + global _api_key_known_configured + if os.environ.get("AIRFRAMES_API_KEY", "").strip(): + _api_key_known_configured = True + return True + if _api_key_known_configured is False: + return False + from services.api_settings import load_persisted_api_keys_into_environ + + load_persisted_api_keys_into_environ() + _api_key_known_configured = bool(os.environ.get("AIRFRAMES_API_KEY", "").strip()) + return _api_key_known_configured + + +def _norm_hex(value: str | None) -> str: + return (value or "").strip().lower() + + +def _norm_tail(value: str | None) -> str: + return re.sub(r"[^A-Z0-9]", "", (value or "").strip().upper()) + + +def _norm_callsign(value: str | None) -> str: + return re.sub(r"\s+", "", (value or "").strip().upper()) + + +def _aircraft_queue_key(entry: dict[str, str]) -> str: + return f"{entry.get('icao24', '')}|{entry.get('registration', '')}|{entry.get('callsign', '')}" + + +def _tail_lookup_keys(value: str | None) -> list[str]: + tail = _norm_tail(value) + if not tail: + return [] + keys = [tail] + raw = (value or "").strip().upper() + if raw and raw not in keys: + keys.append(raw) + return keys + + +def _load_cache_if_cold() -> None: + global _cache, _cache_loaded + if _cache_loaded: + return + loaded: dict[str, Any] | None = None + if _CACHE_PATH.exists(): + try: + with _CACHE_PATH.open(encoding="utf-8") as handle: + parsed = json.load(handle) + if isinstance(parsed, dict): + loaded = parsed + except (OSError, json.JSONDecodeError, ValueError) as exc: + logger.warning("Failed to load Airframes cache: %s", exc) + with _lock: + if _cache_loaded: + return + if loaded: + _cache.update(loaded) + _cache.setdefault("by_callsign", {}) + _cache_loaded = True + + +def _persist_cache_now() -> None: + with _lock: + snapshot = json.dumps(_cache, indent=2, ensure_ascii=False) + "\n" + _DATA_DIR.mkdir(parents=True, exist_ok=True) + tmp = _CACHE_PATH.with_suffix(".tmp") + tmp.write_text(snapshot, encoding="utf-8") + tmp.replace(_CACHE_PATH) + + +def _schedule_cache_persist() -> None: + global _save_timer + + def _flush() -> None: + global _save_timer + try: + _persist_cache_now() + except OSError as exc: + logger.warning("Failed to save Airframes cache: %s", exc) + finally: + with _save_timer_lock: + _save_timer = None + + with _save_timer_lock: + if _save_timer is not None: + _save_timer.cancel() + _save_timer = threading.Timer(0.75, _flush) + _save_timer.daemon = True + _save_timer.start() + + +def _save_cache() -> None: + _schedule_cache_persist() + + +def _compact_message(raw: dict[str, Any]) -> dict[str, Any] | None: + text = (raw.get("text") or raw.get("data") or "").strip() + if not text: + return None + msg_id = raw.get("id") + if msg_id is None: + return None + return { + "id": int(msg_id), + "timestamp": raw.get("timestamp") or raw.get("createdAt") or "", + "label": str(raw.get("label") or "").strip(), + "text": text[:500], + "source_type": str(raw.get("sourceType") or raw.get("source") or "").strip(), + "tail": _norm_tail(raw.get("tail")), + "flight_number": _norm_callsign(raw.get("flightNumber")), + "from_hex": _norm_hex(raw.get("fromHex")), + "to_hex": _norm_hex(raw.get("toHex")), + } + + +def _bucket_key(store: dict[str, list], key: str, message: dict[str, Any]) -> None: + if not key: + return + bucket = store.setdefault(key, []) + if any(existing.get("id") == message["id"] for existing in bucket): + return + bucket.append(message) + bucket.sort(key=lambda item: item.get("timestamp") or "", reverse=True) + del bucket[MESSAGES_PER_AIRCRAFT:] + + +def _index_message(compact: dict[str, Any]) -> None: + for hex_code in (compact.get("from_hex"), compact.get("to_hex")): + if hex_code: + _bucket_key(_cache["by_icao"], hex_code, compact) + for tail_key in _tail_lookup_keys(compact.get("tail")): + _bucket_key(_cache["by_tail"], tail_key, compact) + callsign = compact.get("flight_number") + if callsign: + _bucket_key(_cache["by_callsign"], callsign, compact) + + +def _prune_store(store: dict[str, list]) -> None: + cutoff = _utc_now() - timedelta(hours=RETENTION_HOURS) + for key in list(store.keys()): + kept = [] + for message in store.get(key, []): + ts = _parse_ts(message.get("timestamp")) + if ts is None or ts >= cutoff: + kept.append(message) + if kept: + store[key] = kept[:MESSAGES_PER_AIRCRAFT] + else: + del store[key] + + +def _ingest_message(message: dict[str, Any]) -> bool: + compact = _compact_message(message) + if not compact: + return False + _index_message(compact) + return True + + +def _ingest_messages_batch(raw_messages: list[dict[str, Any]]) -> int: + if not raw_messages: + return 0 + ingested = 0 + with _lock: + _cache.setdefault("by_callsign", {}) + for raw in raw_messages: + if _ingest_message(raw): + ingested += 1 + if ingested: + _cache["messages_ingested"] = int(_cache.get("messages_ingested", 0)) + ingested + _cache["last_success_at"] = _iso(_utc_now()) + _save_cache() + return ingested + + +def _fetch_messages(*, api_key: str, params: dict[str, Any]) -> list[dict[str, Any]]: + response = requests.get( + f"{API_BASE}/messages", + headers={"Authorization": f"Bearer {api_key}"}, + params=params, + timeout=FETCH_TIMEOUT_S, + ) + if response.status_code == 404: + logger.debug("Airframes messages 404 for params=%s", params) + return [] + if response.status_code == 429: + retry_after = int(response.headers.get("Retry-After", "60")) + raise RuntimeError(f"rate_limited:{retry_after}") + response.raise_for_status() + payload = response.json() + if not isinstance(payload, list): + return [] + return [item for item in payload if isinstance(item, dict)] + + +def _refill_queue(*, since_iso: str, force: bool = False) -> int: + """Queue bulk global ingest only — each bulk call returns up to 100 messages + across many aircraft. Per-plane calls happen only on dossier cache miss.""" + global _bulk_cursor, _queued_aircraft_keys + + with _queue_lock: + if force: + _queue.clear() + _queued_aircraft_keys = set() + _bulk_cursor = {"since_iso": since_iso, "before_id": None, "pages": 0} + + added = 0 + has_bulk = any(item.get("type") == "bulk" for item in _queue) + if not has_bulk: + _bulk_cursor["since_iso"] = since_iso + _queue.append({"type": "bulk", "since_iso": since_iso, "before_id": None}) + added += 1 + + with _lock: + _cache["bulk_pages_this_cycle"] = 0 + _save_cache() + + return added + + +def _prioritize_aircraft_scan(entry: dict[str, str]) -> bool: + """Jump this aircraft to the front of the queue — next API tick (~2s).""" + key = _aircraft_queue_key(entry) + if key.replace("|", "").strip() == "": + return False + + item = {"type": "aircraft", **entry} + with _queue_lock: + kept: deque[dict[str, Any]] = deque() + for queued in _queue: + if queued.get("type") == "aircraft" and _aircraft_queue_key(queued) == key: + continue + kept.append(queued) + _queue.clear() + _queue.extend(kept) + _queued_aircraft_keys.discard(key) + _queued_aircraft_keys.add(key) + _queue.appendleft(item) + return True + + +def _enqueue_bulk_page(*, since_iso: str, before_id: int | None = None) -> None: + with _queue_lock: + _queue.append({"type": "bulk", "since_iso": since_iso, "before_id": before_id}) + + +def _process_aircraft_item(api_key: str, entry: dict[str, str]) -> int: + since_iso = _iso(_utc_now() - timedelta(hours=PRIORITY_LOOKBACK_HOURS)) + params: dict[str, Any] = { + "since": since_iso, + "limit": 100, + "exclude_errors": "1", + } + if entry.get("icao24"): + params["icao"] = entry["icao24"] + elif entry.get("registration"): + params["text"] = entry["registration"] + elif entry.get("callsign"): + params["text"] = entry["callsign"] + else: + return 0 + + try: + batch = _fetch_messages(api_key=api_key, params=params) + except Exception as exc: + logger.debug("Airframes aircraft fetch failed for %s: %s", entry, exc) + with _lock: + _cache["last_error"] = str(exc)[:240] + _save_cache() + return 0 + + return _ingest_messages_batch(batch) + + +def _process_bulk_item(api_key: str, item: dict[str, Any]) -> int: + global _bulk_cursor + + params: dict[str, Any] = { + "since": item["since_iso"], + "limit": 100, + "exclude_errors": "1", + } + before_id = item.get("before_id") + if before_id is not None: + params["before_id"] = before_id + + try: + batch = _fetch_messages(api_key=api_key, params=params) + except Exception as exc: + logger.debug("Airframes bulk fetch failed: %s", exc) + with _lock: + _cache["last_error"] = str(exc)[:240] + _save_cache() + return 0 + + ingested = _ingest_messages_batch(batch) + + with _lock: + _bulk_cursor["pages"] = int(_bulk_cursor.get("pages", 0)) + 1 + _cache["pages_fetched"] = int(_cache.get("pages_fetched", 0)) + 1 + _save_cache() + + if ( + batch + and len(batch) >= 100 + and _bulk_cursor.get("pages", 0) < MAX_BULK_PAGES_PER_CYCLE + ): + ids = [int(row["id"]) for row in batch if row.get("id") is not None] + if ids: + next_before = min(ids) + if before_id is None or next_before < before_id: + _enqueue_bulk_page(since_iso=item["since_iso"], before_id=next_before) + + return ingested + + +def _process_one_staggered_tick() -> int: + """Process exactly one queued Airframes API call. Used by the background worker.""" + if not api_key_configured(): + return 0 + + api_key = os.environ.get("AIRFRAMES_API_KEY", "").strip() + with _queue_lock: + if not _queue: + return 0 + item = _queue.popleft() + + if item.get("type") == "aircraft": + key = _aircraft_queue_key(item) + with _queue_lock: + _queued_aircraft_keys.discard(key) + ingested = _process_aircraft_item(api_key, item) + elif item.get("type") == "bulk": + ingested = _process_bulk_item(api_key, item) + else: + ingested = 0 + + with _lock: + _cache["ticks_processed"] = int(_cache.get("ticks_processed", 0)) + 1 + if int(_cache.get("ticks_processed", 0)) % 25 == 0: + for store_key in ("by_icao", "by_tail", "by_callsign"): + _prune_store(_cache[store_key]) + _save_cache() + + return ingested + + +def _stagger_worker_loop() -> None: + while True: + time.sleep(REQUEST_PAUSE_S) + try: + _process_one_staggered_tick() + except Exception as exc: + logger.error("Airframes stagger worker tick failed: %s", exc) + + +def _ensure_stagger_worker() -> None: + global _worker_started + if _worker_started: + return + with _worker_guard: + if _worker_started: + return + _worker_started = True + threading.Thread( + target=_stagger_worker_loop, + daemon=True, + name="airframes-stagger", + ).start() + logger.info( + "Airframes stagger worker started (bulk ingest: 1 call / %.1fs, up to %s msgs/call, refill every %sm)", + REQUEST_PAUSE_S, + 100, + SYNC_INTERVAL_MINUTES, + ) + + +def sync_airframes_messages(*, force: bool = False) -> dict[str, Any]: + """Queue staggered Airframes fetches — one API call every REQUEST_PAUSE_S.""" + if not api_key_configured(): + return {"ok": False, "skipped": True, "reason": "AIRFRAMES_API_KEY not configured"} + + started = _utc_now() + _load_cache_if_cold() + + with _lock: + _cache.setdefault("by_callsign", {}) + last_sync_at = _parse_ts(_cache.get("last_sync_at")) + if ( + not force + and last_sync_at is not None + and started - last_sync_at < timedelta(minutes=SYNC_INTERVAL_MINUTES - 1) + ): + return {"ok": True, "skipped": True, "reason": "sync_interval_not_elapsed"} + + if _cache.get("last_success_at"): + since_dt = _parse_ts(_cache.get("last_success_at")) or ( + started - timedelta(minutes=SYNC_INTERVAL_MINUTES) + ) + since_dt -= timedelta(minutes=2) + else: + since_dt = started - timedelta(hours=PRIORITY_LOOKBACK_HOURS) + since_iso = _iso(since_dt) + _cache["last_sync_at"] = _iso(started) + _cache["last_error"] = None + _save_cache() + + queued = _refill_queue(since_iso=since_iso, force=force) + _ensure_stagger_worker() + + with _queue_lock: + queue_depth = len(_queue) + + logger.info( + "Airframes cycle queued: added=%s depth=%s interval=%.1fs", + queued, + queue_depth, + REQUEST_PAUSE_S, + ) + return { + "ok": True, + "queued": queued, + "queue_depth": queue_depth, + "request_interval_s": REQUEST_PAUSE_S, + "sync_interval_minutes": SYNC_INTERVAL_MINUTES, + } + + +def _lookup_from_cache( + *, + hex_key: str, + tail_keys: list[str], + callsign_key: str, +) -> tuple[list[dict[str, Any]], str | None]: + _load_cache_if_cold() + with _lock: + _cache.setdefault("by_callsign", {}) + merged: dict[int, dict[str, Any]] = {} + if hex_key: + for message in _cache.get("by_icao", {}).get(hex_key, []): + merged[message["id"]] = message + for tail_key in tail_keys: + for message in _cache.get("by_tail", {}).get(tail_key, []): + merged[message["id"]] = message + if callsign_key: + for message in _cache.get("by_callsign", {}).get(callsign_key, []): + merged[message["id"]] = message + last_success_at = _cache.get("last_success_at") + + messages = sorted(merged.values(), key=lambda item: item.get("timestamp") or "", reverse=True) + return messages[:MESSAGES_PER_AIRCRAFT], last_success_at + + +def get_datalink_status() -> dict[str, Any]: + configured = api_key_configured() + _load_cache_if_cold() + with _queue_lock: + queue_depth = len(_queue) + with _lock: + return { + "configured": configured, + "sync_interval_minutes": SYNC_INTERVAL_MINUTES, + "request_interval_s": REQUEST_PAUSE_S, + "last_sync_at": _cache.get("last_sync_at"), + "last_success_at": _cache.get("last_success_at"), + "last_error": _cache.get("last_error"), + "pages_fetched": _cache.get("pages_fetched", 0), + "messages_ingested": _cache.get("messages_ingested", 0), + "bulk_pages_this_cycle": int(_bulk_cursor.get("pages", 0)), + "bulk_pages_per_cycle": MAX_BULK_PAGES_PER_CYCLE, + "messages_per_bulk_call": 100, + "queue_depth": queue_depth, + "ticks_processed": _cache.get("ticks_processed", 0), + "icao_keys": len(_cache.get("by_icao", {})), + "tail_keys": len(_cache.get("by_tail", {})), + "callsign_keys": len(_cache.get("by_callsign", {})), + } + + +def lookup_datalink_messages( + *, + icao24: str = "", + registration: str = "", + callsign: str = "", + allow_live: bool = False, +) -> dict[str, Any]: + configured = bool(os.environ.get("AIRFRAMES_API_KEY", "").strip()) or api_key_configured() + if not configured: + return { + "configured": False, + "messages": [], + "hint": "Add AIRFRAMES_API_KEY in Settings → API Keys to enable ACARS datalink.", + } + + hex_key = _norm_hex(icao24) + tail_keys = _tail_lookup_keys(registration) + callsign_key = _norm_callsign(callsign) + + messages, last_success_at = _lookup_from_cache( + hex_key=hex_key, + tail_keys=tail_keys, + callsign_key=callsign_key, + ) + + queued_refresh = False + if hex_key or tail_keys or callsign_key: + queued_refresh = _prioritize_aircraft_scan( + { + "icao24": hex_key, + "registration": _norm_tail(registration), + "callsign": callsign_key, + } + ) + if queued_refresh: + _ensure_stagger_worker() + + return { + "configured": True, + "messages": messages, + "last_success_at": last_success_at, + "queued_refresh": queued_refresh, + "priority_scan": queued_refresh, + } + + +_load_cache_if_cold() diff --git a/backend/services/fetchers/meshtastic_map.py b/backend/services/fetchers/meshtastic_map.py index 4614bd9..2681f07 100644 --- a/backend/services/fetchers/meshtastic_map.py +++ b/backend/services/fetchers/meshtastic_map.py @@ -12,9 +12,11 @@ Polling interval deliberately kept low (4h) to be respectful to the service. import json import logging +import threading import time from datetime import datetime, timezone, timedelta from pathlib import Path +from typing import Any import requests @@ -30,9 +32,6 @@ _MAX_AGE_HOURS = 24 # discard nodes not seen within this window # one-person hobby service, so we prefer stale data over hammering it. _CACHE_TRUST_HOURS = 20 -# Track when we last fetched so the frontend can show staleness -_last_fetch_ts: float = 0.0 - def _parse_node(node: dict) -> dict | None: """Convert an API node into a slim signal-like dict.""" @@ -132,7 +131,43 @@ def _save_cache(nodes: list[dict], fetch_ts: float): logger.warning(f"Failed to save meshtastic cache: {e}") -def fetch_meshtastic_nodes(): +# Track when we last fetched so the frontend can show staleness +_last_fetch_ts: float = 0.0 +_scan_lock = threading.Lock() +_scan_in_progress = False + + +def get_meshtastic_map_status() -> dict[str, Any]: + from services.fetchers._store import get_latest_data_subset_refs + + snap = get_latest_data_subset_refs("meshtastic_map_nodes", "meshtastic_map_fetched_at") + nodes = snap.get("meshtastic_map_nodes") or [] + fetched_at = snap.get("meshtastic_map_fetched_at") + return { + "node_count": len(nodes) if isinstance(nodes, list) else 0, + "fetched_at": fetched_at, + "scan_in_progress": _scan_in_progress, + } + + +def start_meshtastic_planet_scan() -> dict[str, Any]: + if not _scan_lock.acquire(blocking=False): + return {"ok": False, "status": "scan already in progress"} + + def _run() -> None: + global _scan_in_progress + try: + _scan_in_progress = True + fetch_meshtastic_nodes(force=True) + finally: + _scan_in_progress = False + _scan_lock.release() + + threading.Thread(target=_run, daemon=True, name="meshtastic-planet-scan").start() + return {"ok": True, "status": "scanning"} + + +def fetch_meshtastic_nodes(*, force: bool = False): """Fetch global Meshtastic node positions from Liam Cottle's map API. Stores processed nodes in latest_data["meshtastic_map_nodes"]. @@ -140,39 +175,40 @@ def fetch_meshtastic_nodes(): """ from services.fetchers._store import is_any_active - if not is_any_active("sigint_meshtastic"): + if not force and not is_any_active("sigint_meshtastic"): return global _last_fetch_ts # Trust a recent cache on disk — avoids hammering the upstream HTTP API # when every install polls on roughly the same cadence. - try: - if _CACHE_FILE.exists(): - mtime = _CACHE_FILE.stat().st_mtime - if time.time() - mtime < _CACHE_TRUST_HOURS * 3600: - # If memory is empty (cold start), hydrate from cache and skip fetch. - with _data_lock: - has_memory = bool(latest_data.get("meshtastic_map_nodes")) - if not has_memory: - cached = _load_cache() - if cached: - with _data_lock: - latest_data["meshtastic_map_nodes"] = cached - latest_data["meshtastic_map_fetched_at"] = mtime - _mark_fresh("meshtastic_map") + if not force: + try: + if _CACHE_FILE.exists(): + mtime = _CACHE_FILE.stat().st_mtime + if time.time() - mtime < _CACHE_TRUST_HOURS * 3600: + # If memory is empty (cold start), hydrate from cache and skip fetch. + with _data_lock: + has_memory = bool(latest_data.get("meshtastic_map_nodes")) + if not has_memory: + cached = _load_cache() + if cached: + with _data_lock: + latest_data["meshtastic_map_nodes"] = cached + latest_data["meshtastic_map_fetched_at"] = mtime + _mark_fresh("meshtastic_map") + logger.info( + "Meshtastic map: cache fresh (<%.0fh), skipping network fetch", + _CACHE_TRUST_HOURS, + ) + return + else: logger.info( "Meshtastic map: cache fresh (<%.0fh), skipping network fetch", _CACHE_TRUST_HOURS, ) return - else: - logger.info( - "Meshtastic map: cache fresh (<%.0fh), skipping network fetch", - _CACHE_TRUST_HOURS, - ) - return - except Exception as e: - logger.debug(f"Meshtastic cache freshness check failed: {e}") + except Exception as e: + logger.debug(f"Meshtastic cache freshness check failed: {e}") # Build a polite User-Agent. Historically this included the operator # callsign so meshtastic.org could rate-limit per-install; that's still diff --git a/backend/tests/test_airframes_datalink.py b/backend/tests/test_airframes_datalink.py new file mode 100644 index 0000000..ac64ca2 --- /dev/null +++ b/backend/tests/test_airframes_datalink.py @@ -0,0 +1,177 @@ +import json +from unittest.mock import patch + +import pytest + + +@pytest.fixture +def airframes_env(tmp_path, monkeypatch): + from services.fetchers import airframes + + cache_path = tmp_path / "airframes_datalink_cache.json" + monkeypatch.setattr(airframes, "_CACHE_PATH", cache_path) + monkeypatch.setattr(airframes, "_DATA_DIR", tmp_path) + airframes._cache = { + "last_sync_at": None, + "last_success_at": None, + "last_error": None, + "pages_fetched": 0, + "messages_ingested": 0, + "priority_aircraft_synced": 0, + "bulk_pages_this_cycle": 0, + "ticks_processed": 0, + "by_icao": {}, + "by_tail": {}, + "by_callsign": {}, + } + airframes._queue.clear() + airframes._queued_aircraft_keys.clear() + airframes._bulk_cursor = {"since_iso": "", "before_id": None, "pages": 0} + airframes._cache_loaded = True + airframes._api_key_known_configured = True + monkeypatch.setenv("AIRFRAMES_API_KEY", "test-key") + return airframes + + +def test_sync_skips_without_api_key(airframes_env, monkeypatch): + monkeypatch.delenv("AIRFRAMES_API_KEY", raising=False) + airframes_env._api_key_known_configured = None + result = airframes_env.sync_airframes_messages(force=True) + assert result["ok"] is False + assert result["skipped"] is True + + +@patch("services.fetchers.airframes.requests.get") +def test_sync_ingests_messages(mock_get, airframes_env): + from datetime import datetime, timezone + + recent = datetime.now(timezone.utc).replace(microsecond=0).isoformat().replace("+00:00", "Z") + mock_get.return_value.status_code = 200 + mock_get.return_value.headers = {} + mock_get.return_value.json.return_value = [ + { + "id": 101, + "timestamp": recent, + "label": "H1", + "text": "ETA 1432", + "sourceType": "acars", + "fromHex": "A022B9", + "tail": "9H-TJZ", + "flightNumber": "CXI3SY", + } + ] + + result = airframes_env.sync_airframes_messages(force=True) + assert result["ok"] is True + assert result["queued"] >= 1 + + ingested = airframes_env._process_one_staggered_tick() + assert ingested == 1 + + lookup = airframes_env.lookup_datalink_messages( + icao24="a022b9", + registration="9H-TJZ", + callsign="CXI3SY", + allow_live=False, + ) + assert lookup["configured"] is True + assert len(lookup["messages"]) == 1 + assert lookup["messages"][0]["text"] == "ETA 1432" + + +def test_lookup_queues_priority_scan_on_every_open(airframes_env): + lookup = airframes_env.lookup_datalink_messages(icao24="abc123", allow_live=False) + assert lookup["configured"] is True + assert lookup["messages"] == [] + assert lookup["queued_refresh"] is True + assert lookup["priority_scan"] is True + with airframes_env._queue_lock: + assert airframes_env._queue[0]["type"] == "aircraft" + assert airframes_env._queue[0]["icao24"] == "abc123" + + +def test_priority_scan_jumps_ahead_of_bulk(airframes_env): + airframes_env._refill_queue(since_iso="2026-01-01T00:00:00Z", force=True) + with airframes_env._queue_lock: + assert airframes_env._queue[0]["type"] == "bulk" + + airframes_env.lookup_datalink_messages(icao24="deadbeef", allow_live=False) + + with airframes_env._queue_lock: + assert airframes_env._queue[0]["type"] == "aircraft" + assert airframes_env._queue[0]["icao24"] == "deadbeef" + + +def test_lookup_still_queues_when_cache_hit(airframes_env): + from datetime import datetime, timezone + + recent = datetime.now(timezone.utc).replace(microsecond=0).isoformat().replace("+00:00", "Z") + airframes_env._ingest_message( + { + "id": 404, + "timestamp": recent, + "text": "CACHED MSG", + "fromHex": "a022b9", + } + ) + lookup = airframes_env.lookup_datalink_messages(icao24="a022b9", allow_live=False) + assert len(lookup["messages"]) == 1 + assert lookup["priority_scan"] is True + with airframes_env._queue_lock: + assert airframes_env._queue[0]["icao24"] == "a022b9" + + +def test_lookup_unconfigured_shows_hint(airframes_env, monkeypatch): + monkeypatch.delenv("AIRFRAMES_API_KEY", raising=False) + airframes_env._api_key_known_configured = None + lookup = airframes_env.lookup_datalink_messages(icao24="abc123") + assert lookup["configured"] is False + assert lookup["messages"] == [] + assert "AIRFRAMES_API_KEY" in lookup["hint"] + + +def test_lookup_indexes_to_hex_and_callsign(airframes_env): + from datetime import datetime, timezone + + recent = datetime.now(timezone.utc).replace(microsecond=0).isoformat().replace("+00:00", "Z") + airframes_env._ingest_message( + { + "id": 202, + "timestamp": recent, + "text": "DESCENT TO FL100", + "fromHex": "ABCDEF", + "toHex": "a022b9", + "flightNumber": "RCH123", + } + ) + + by_icao = airframes_env.lookup_datalink_messages(icao24="a022b9", allow_live=False) + assert len(by_icao["messages"]) == 1 + + by_callsign = airframes_env.lookup_datalink_messages(callsign="RCH123", allow_live=False) + assert len(by_callsign["messages"]) == 1 + + +def test_tail_lookup_normalizes_dashes(airframes_env): + from datetime import datetime, timezone + + recent = datetime.now(timezone.utc).replace(microsecond=0).isoformat().replace("+00:00", "Z") + airframes_env._ingest_message( + { + "id": 303, + "timestamp": recent, + "text": "ON GROUND", + "tail": "9H-TJZ", + } + ) + + lookup = airframes_env.lookup_datalink_messages(registration="9HTJZ", allow_live=False) + assert len(lookup["messages"]) == 1 + + +def test_api_registry_includes_airframes_key(): + from services.api_settings import API_REGISTRY, ALLOWED_ENV_KEYS + + entry = next(item for item in API_REGISTRY if item["id"] == "airframes_api_key") + assert entry["env_key"] == "AIRFRAMES_API_KEY" + assert "AIRFRAMES_API_KEY" in ALLOWED_ENV_KEYS diff --git a/docker-compose.yml b/docker-compose.yml index 3225087..388dbcf 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -25,6 +25,7 @@ services: - WINDY_API_KEY=${WINDY_API_KEY:-} - ADMIN_KEY=${ADMIN_KEY:-} - FINNHUB_API_KEY=${FINNHUB_API_KEY:-} + - AIRFRAMES_API_KEY=${AIRFRAMES_API_KEY:-} # Override allowed CORS origins (comma-separated). Auto-detects LAN IPs if empty. - CORS_ORIGINS=${CORS_ORIGINS:-} # Private Infonet bootstrap seeds. Seeds are discovery hints, not fixed roots. diff --git a/frontend/src/components/DatalinkMessagesBlock.tsx b/frontend/src/components/DatalinkMessagesBlock.tsx new file mode 100644 index 0000000..07774d1 --- /dev/null +++ b/frontend/src/components/DatalinkMessagesBlock.tsx @@ -0,0 +1,178 @@ +'use client'; + +import { useCallback, useEffect, useRef, useState } from 'react'; +import { API_BASE } from '@/lib/api'; + +type DatalinkMessage = { + id: number; + timestamp?: string; + label?: string; + text?: string; + source_type?: string; +}; + +const PRIORITY_POLL_MS = 3_000; +const PRIORITY_POLL_MAX_MS = 45_000; + +function formatDatalinkTime(value?: string): string { + if (!value) return '--:--'; + try { + const date = new Date(value); + if (Number.isNaN(date.getTime())) return value.slice(11, 16) || value; + return date.toLocaleTimeString([], { hour: '2-digit', minute: '2-digit', hour12: false }); + } catch { + return '--:--'; + } +} + +export default function DatalinkMessagesBlock({ + icao24, + registration, + callsign, +}: { + icao24?: string; + registration?: string; + callsign?: string; +}) { + const [loading, setLoading] = useState(true); + const [configured, setConfigured] = useState(null); + const [messages, setMessages] = useState([]); + const [hint, setHint] = useState(null); + const [loadError, setLoadError] = useState(null); + const [priorityScanning, setPriorityScanning] = useState(false); + const pollUntilRef = useRef(0); + + const buildParams = useCallback(() => { + const params = new URLSearchParams(); + if (icao24) params.set('icao24', icao24); + if (registration) params.set('registration', registration); + if (callsign) params.set('callsign', callsign); + return params; + }, [icao24, registration, callsign]); + + const fetchDatalink = useCallback( + async (opts?: { showLoading?: boolean }) => { + const params = buildParams(); + if ([...params.keys()].length === 0) { + setLoading(false); + return; + } + + if (opts?.showLoading) { + setLoading(true); + setLoadError(null); + } + + try { + const res = await fetch(`${API_BASE}/api/aviation/datalink/messages?${params.toString()}`, { + cache: 'no-store', + }); + if (!res.ok) throw new Error(`datalink ${res.status}`); + const json = await res.json(); + setConfigured(Boolean(json.configured)); + setMessages(Array.isArray(json.messages) ? json.messages : []); + setHint(typeof json.hint === 'string' ? json.hint : null); + setLoadError(null); + if (json.priority_scan || json.queued_refresh) { + setPriorityScanning(true); + pollUntilRef.current = Date.now() + PRIORITY_POLL_MAX_MS; + } + if (Array.isArray(json.messages) && json.messages.length > 0) { + setPriorityScanning(false); + } + } catch { + if (opts?.showLoading) { + setConfigured(null); + setMessages([]); + setLoadError('Could not reach ACARS cache. Try again in a moment.'); + } + } finally { + if (opts?.showLoading) setLoading(false); + } + }, + [buildParams], + ); + + useEffect(() => { + pollUntilRef.current = 0; + setPriorityScanning(false); + void fetchDatalink({ showLoading: true }); + }, [fetchDatalink]); + + useEffect(() => { + if (!priorityScanning) return; + + const intervalId = window.setInterval(() => { + if (Date.now() > pollUntilRef.current) { + setPriorityScanning(false); + return; + } + void fetchDatalink(); + }, PRIORITY_POLL_MS); + + return () => window.clearInterval(intervalId); + }, [priorityScanning, fetchDatalink]); + + if (loading) { + return ( +
+ DATALINK (AIRFRAMES) + Loading ACARS cache… +
+ ); + } + + if (loadError) { + return ( +
+ DATALINK (AIRFRAMES) +

{loadError}

+
+ ); + } + + if (configured === false) { + return ( +
+ DATALINK (AIRFRAMES) +

+ {hint || 'Add your Airframes API key in Settings → API Keys to enable ACARS datalink on plane dossiers.'} +

+
+ ); + } + + if (!messages.length) { + return ( +
+ DATALINK (AIRFRAMES) +

+ {priorityScanning + ? 'Priority scan queued for this aircraft (~2s)…' + : 'No recent ACARS/VDL messages for this aircraft.'} +

+
+ ); + } + + return ( +
+ DATALINK (AIRFRAMES) + {priorityScanning ? ( +

Refreshing this aircraft…

+ ) : null} +
+ {messages.map((message) => ( +
+
+ {formatDatalinkTime(message.timestamp)} + {message.label ? {message.label} : null} + {message.source_type ? {message.source_type} : null} +
+
{message.text}
+
+ ))} +
+
+ ); +} diff --git a/frontend/src/components/NewsFeed.tsx b/frontend/src/components/NewsFeed.tsx index 49d2800..c6c2257 100644 --- a/frontend/src/components/NewsFeed.tsx +++ b/frontend/src/components/NewsFeed.tsx @@ -11,6 +11,7 @@ import { fetchWikipediaSummary } from '@/lib/wikimediaClient'; import type { SelectedEntity, RegionDossier, FimiData } from "@/types/dashboard"; import { useDataKeys } from '@/hooks/useDataStore'; import { API_BASE } from '@/lib/api'; +import DatalinkMessagesBlock from '@/components/DatalinkMessagesBlock'; import { lookupShodanHost } from '@/lib/shodanClient'; import type { ShodanHost } from '@/types/shodan'; @@ -909,6 +910,11 @@ function NewsFeedInner({ selectedEntity, regionDossier, regionDossierLoading, gt {flight.squawk}{flight.squawk === '7700' ? ' ⚠ EMERGENCY' : flight.squawk === '7600' ? ' COMMS LOST' : ''} )} + {flight.alert_link && (
@@ -1162,6 +1168,11 @@ function NewsFeedInner({ selectedEntity, regionDossier, regionDossierLoading, gt {flight.squawk}{flight.squawk === '7700' ? ' ⚠ EMERGENCY' : flight.squawk === '7600' ? ' COMMS LOST' : ''}
)} +
ROUTE {flight.origin_name !== "UNKNOWN" ? `[${flight.origin_name}] → [${flight.dest_name}]` : "UNKNOWN"} diff --git a/frontend/src/components/WorldviewLeftPanel.tsx b/frontend/src/components/WorldviewLeftPanel.tsx index 1adc863..0f38f9e 100644 --- a/frontend/src/components/WorldviewLeftPanel.tsx +++ b/frontend/src/components/WorldviewLeftPanel.tsx @@ -44,6 +44,7 @@ import { Radar, MapPin, Truck, + RefreshCw, } from 'lucide-react'; import RoadCorridorLayerControls from '@/components/RoadCorridorLayerControls'; import { API_BASE } from '@/lib/api'; @@ -764,7 +765,10 @@ const WorldviewLeftPanel = React.memo(function WorldviewLeftPanel({ const { theme, toggleTheme, hudColor, cycleHudColor } = useTheme(); const [gibsPlaying, setGibsPlaying] = useState(false); const [potusEnabled, setPotusEnabled] = useState(true); + const [meshtasticScanning, setMeshtasticScanning] = useState(false); + const [meshtasticScanMessage, setMeshtasticScanMessage] = useState(''); const gibsIntervalRef = useRef | null>(null); + const meshtasticScanPollRef = useRef | null>(null); // SAR mode chooser — prompts the first time the user enables the SAR // layer, remembers the choice, and auto-detects server-side Mode B. @@ -986,6 +990,67 @@ const WorldviewLeftPanel = React.memo(function WorldviewLeftPanel({ return { meshtasticCount: mesh, aprsCount: aprs }; }, [data?.sigint, data?.sigint_totals]); + const stopMeshtasticScanPoll = useCallback(() => { + if (meshtasticScanPollRef.current) { + clearInterval(meshtasticScanPollRef.current); + meshtasticScanPollRef.current = null; + } + }, []); + + useEffect(() => () => stopMeshtasticScanPoll(), [stopMeshtasticScanPoll]); + + const scanMeshtasticPlanet = useCallback( + async (event: React.MouseEvent) => { + event.stopPropagation(); + if (meshtasticScanning) return; + + setMeshtasticScanning(true); + setMeshtasticScanMessage('Starting global node scan…'); + stopMeshtasticScanPoll(); + + try { + const res = await fetch(`${API_BASE}/api/sigint/meshtastic/scan`, { method: 'POST' }); + const json = await res.json().catch(() => ({})); + if (!res.ok || json.ok === false) { + setMeshtasticScanMessage( + typeof json.status === 'string' ? json.status : 'Meshtastic scan could not start.', + ); + setMeshtasticScanning(false); + return; + } + + setMeshtasticScanMessage('Scanning planet (~90s)…'); + let polls = 0; + meshtasticScanPollRef.current = setInterval(async () => { + polls += 1; + try { + const statusRes = await fetch(`${API_BASE}/api/sigint/meshtastic/status`); + if (!statusRes.ok) return; + const status = await statusRes.json(); + if (!status.scan_in_progress) { + stopMeshtasticScanPoll(); + setMeshtasticScanning(false); + const count = Number(status.node_count || 0); + setMeshtasticScanMessage( + count > 0 ? `Scan complete — ${count.toLocaleString()} nodes on map.` : 'Scan complete.', + ); + } else if (polls >= 40) { + stopMeshtasticScanPoll(); + setMeshtasticScanning(false); + setMeshtasticScanMessage('Scan still running in background. Count will update when finished.'); + } + } catch { + // keep polling until timeout + } + }, 3000); + } catch { + setMeshtasticScanning(false); + setMeshtasticScanMessage('Meshtastic scan request failed.'); + } + }, + [meshtasticScanning, stopMeshtasticScanPoll], + ); + const cctvCount = Number(data?.cctv_total || data?.cctv?.length || 0); const satnogsCount = Number(data?.satnogs_total || data?.satnogs_stations?.length || 0); const tinygsCount = Number(data?.tinygs_total || data?.tinygs_satellites?.length || 0); @@ -1926,6 +1991,31 @@ const WorldviewLeftPanel = React.memo(function WorldviewLeftPanel({ )}
+ {active && layer.id === 'sigint_meshtastic' && ( +
e.stopPropagation()} + > + + {meshtasticScanMessage ? ( + + {meshtasticScanMessage} + + ) : null} +
+ )} {/* GIBS Imagery inline controls */} {active && layer.id === 'gibs_imagery' &&