mirror of
https://github.com/BigBodyCobain/Shadowbroker.git
synced 2026-05-28 18:11:31 +02:00
5e0b2c037e
On 2026-05-23, stream.aisstream.io went fully offline (TCP timeouts on port
443). The backend kept respawning the node WebSocket proxy every few
seconds with nothing arriving. From the operator's POV the ships layer
silently went empty — no banner, no log surfacing, no way to tell whether
it was their config / network / viewport filter / upstream.
Backend:
* ais_proxy_status() now also returns:
- connected (bool): true when a vessel message arrived in last 60s
- last_msg_age_seconds (int | None)
- proxy_spawn_count (int): proxy respawns — sustained growth without
connected means upstream is dead
* /api/health escalates top status to "degraded" when AIS_API_KEY is set
but the proxy is currently disconnected. Existing degraded_tls signal
preserved.
Frontend:
* useAisUpstreamHealth hook polls /api/health every 30s, derives the
outage state. Defensively only reports outage once spawn_count > 0 so
operators who haven't opted in don't see the banner.
* AisUpstreamBanner component renders a dismissible amber notice
"Ship data temporarily unavailable — AISStream upstream is offline"
mounted on the main app shell.
7 backend tests pin the status-shape contract and the /api/health
escalation behavior in both with-key and without-key configurations.
Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
859 lines
28 KiB
Python
859 lines
28 KiB
Python
"""
|
|
AIS Stream WebSocket client for real-time maritime vessel tracking.
|
|
Connects to aisstream.io and maintains a live dictionary of global vessel positions.
|
|
"""
|
|
|
|
import asyncio
|
|
import json
|
|
import logging
|
|
import threading
|
|
import time
|
|
from datetime import datetime, timezone
|
|
import os
|
|
|
|
logger = logging.getLogger(__name__)
|
|
|
|
AIS_WS_URL = "wss://stream.aisstream.io/v0/stream"
|
|
API_KEY = os.environ.get("AIS_API_KEY", "")
|
|
|
|
|
|
def _env_truthy(name: str) -> bool:
|
|
return str(os.getenv(name, "")).strip().lower() in {"1", "true", "yes", "on"}
|
|
|
|
|
|
def ais_stream_proxy_enabled() -> bool:
|
|
"""Return whether the external Node AIS proxy may be started."""
|
|
setting = str(os.getenv("SHADOWBROKER_ENABLE_AIS_STREAM_PROXY", "")).strip().lower()
|
|
if setting:
|
|
return _env_truthy("SHADOWBROKER_ENABLE_AIS_STREAM_PROXY")
|
|
return True
|
|
|
|
|
|
# AIS vessel type code classification
|
|
# See: https://coast.noaa.gov/data/marinecadastre/ais/VesselTypeCodes2018.pdf
|
|
def classify_vessel(ais_type: int, mmsi: int) -> str:
|
|
"""Classify a vessel by its AIS type code into a rendering category."""
|
|
if 80 <= ais_type <= 89:
|
|
return "tanker" # Oil/Chemical/Gas tankers → RED
|
|
if 70 <= ais_type <= 79:
|
|
return "cargo" # Cargo ships, container vessels → RED
|
|
if 60 <= ais_type <= 69:
|
|
return "passenger" # Cruise ships, ferries → GRAY
|
|
if ais_type in (36, 37):
|
|
return "yacht" # Sailing/Pleasure craft → DARK BLUE
|
|
if ais_type == 35:
|
|
return "military_vessel" # Military → YELLOW
|
|
# MMSI-based military detection: military MMSIs often start with certain prefixes
|
|
mmsi_str = str(mmsi)
|
|
if mmsi_str.startswith("3380") or mmsi_str.startswith("3381"):
|
|
return "military_vessel" # US Navy
|
|
if ais_type in (30, 31, 32, 33, 34):
|
|
return "other" # Fishing, towing, dredging, diving, etc.
|
|
if ais_type in (50, 51, 52, 53, 54, 55, 56, 57, 58, 59):
|
|
return "other" # Pilot, SAR, tug, port tender, etc.
|
|
return "unknown" # Not yet classified — will update when ShipStaticData arrives
|
|
|
|
|
|
# MMSI Maritime Identification Digit (MID) → Country mapping
|
|
# First 3 digits of MMSI (for 9-digit MMSIs) encode the flag state
|
|
MID_COUNTRY = {
|
|
201: "Albania",
|
|
202: "Andorra",
|
|
203: "Austria",
|
|
204: "Portugal",
|
|
205: "Belgium",
|
|
206: "Belarus",
|
|
207: "Bulgaria",
|
|
208: "Vatican",
|
|
209: "Cyprus",
|
|
210: "Cyprus",
|
|
211: "Germany",
|
|
212: "Cyprus",
|
|
213: "Georgia",
|
|
214: "Moldova",
|
|
215: "Malta",
|
|
216: "Armenia",
|
|
218: "Germany",
|
|
219: "Denmark",
|
|
220: "Denmark",
|
|
224: "Spain",
|
|
225: "Spain",
|
|
226: "France",
|
|
227: "France",
|
|
228: "France",
|
|
229: "Malta",
|
|
230: "Finland",
|
|
231: "Faroe Islands",
|
|
232: "United Kingdom",
|
|
233: "United Kingdom",
|
|
234: "United Kingdom",
|
|
235: "United Kingdom",
|
|
236: "Gibraltar",
|
|
237: "Greece",
|
|
238: "Croatia",
|
|
239: "Greece",
|
|
240: "Greece",
|
|
241: "Greece",
|
|
242: "Morocco",
|
|
243: "Hungary",
|
|
244: "Netherlands",
|
|
245: "Netherlands",
|
|
246: "Netherlands",
|
|
247: "Italy",
|
|
248: "Malta",
|
|
249: "Malta",
|
|
250: "Ireland",
|
|
251: "Iceland",
|
|
252: "Liechtenstein",
|
|
253: "Luxembourg",
|
|
254: "Monaco",
|
|
255: "Portugal",
|
|
256: "Malta",
|
|
257: "Norway",
|
|
258: "Norway",
|
|
259: "Norway",
|
|
261: "Poland",
|
|
263: "Portugal",
|
|
264: "Romania",
|
|
265: "Sweden",
|
|
266: "Sweden",
|
|
267: "Slovakia",
|
|
268: "San Marino",
|
|
269: "Switzerland",
|
|
270: "Czech Republic",
|
|
271: "Turkey",
|
|
272: "Ukraine",
|
|
273: "Russia",
|
|
274: "North Macedonia",
|
|
275: "Latvia",
|
|
276: "Estonia",
|
|
277: "Lithuania",
|
|
278: "Slovenia",
|
|
301: "Anguilla",
|
|
303: "Alaska",
|
|
304: "Antigua",
|
|
305: "Antigua",
|
|
306: "Netherlands Antilles",
|
|
307: "Aruba",
|
|
308: "Bahamas",
|
|
309: "Bahamas",
|
|
310: "Bermuda",
|
|
311: "Bahamas",
|
|
312: "Belize",
|
|
314: "Barbados",
|
|
316: "Canada",
|
|
319: "Cayman Islands",
|
|
321: "Costa Rica",
|
|
323: "Cuba",
|
|
325: "Dominica",
|
|
327: "Dominican Republic",
|
|
329: "Guadeloupe",
|
|
330: "Grenada",
|
|
331: "Greenland",
|
|
332: "Guatemala",
|
|
334: "Honduras",
|
|
336: "Haiti",
|
|
338: "United States",
|
|
339: "Jamaica",
|
|
341: "Saint Kitts",
|
|
343: "Saint Lucia",
|
|
345: "Mexico",
|
|
347: "Martinique",
|
|
348: "Montserrat",
|
|
350: "Nicaragua",
|
|
351: "Panama",
|
|
352: "Panama",
|
|
353: "Panama",
|
|
354: "Panama",
|
|
355: "Panama",
|
|
356: "Panama",
|
|
357: "Panama",
|
|
358: "Puerto Rico",
|
|
359: "El Salvador",
|
|
361: "Saint Pierre",
|
|
362: "Trinidad",
|
|
364: "Turks and Caicos",
|
|
366: "United States",
|
|
367: "United States",
|
|
368: "United States",
|
|
369: "United States",
|
|
370: "Panama",
|
|
371: "Panama",
|
|
372: "Panama",
|
|
373: "Panama",
|
|
374: "Panama",
|
|
375: "Saint Vincent",
|
|
376: "Saint Vincent",
|
|
377: "Saint Vincent",
|
|
378: "British Virgin Islands",
|
|
379: "US Virgin Islands",
|
|
401: "Afghanistan",
|
|
403: "Saudi Arabia",
|
|
405: "Bangladesh",
|
|
408: "Bahrain",
|
|
410: "Bhutan",
|
|
412: "China",
|
|
413: "China",
|
|
414: "China",
|
|
416: "Taiwan",
|
|
417: "Sri Lanka",
|
|
419: "India",
|
|
422: "Iran",
|
|
423: "Azerbaijan",
|
|
425: "Iraq",
|
|
428: "Israel",
|
|
431: "Japan",
|
|
432: "Japan",
|
|
434: "Turkmenistan",
|
|
436: "Kazakhstan",
|
|
437: "Uzbekistan",
|
|
438: "Jordan",
|
|
440: "South Korea",
|
|
441: "South Korea",
|
|
443: "Palestine",
|
|
445: "North Korea",
|
|
447: "Kuwait",
|
|
450: "Lebanon",
|
|
451: "Kyrgyzstan",
|
|
453: "Macao",
|
|
455: "Maldives",
|
|
457: "Mongolia",
|
|
459: "Nepal",
|
|
461: "Oman",
|
|
463: "Pakistan",
|
|
466: "Qatar",
|
|
468: "Syria",
|
|
470: "UAE",
|
|
472: "Tajikistan",
|
|
473: "Yemen",
|
|
475: "Tonga",
|
|
477: "Hong Kong",
|
|
478: "Bosnia",
|
|
501: "Antarctica",
|
|
503: "Australia",
|
|
506: "Myanmar",
|
|
508: "Brunei",
|
|
510: "Micronesia",
|
|
511: "Palau",
|
|
512: "New Zealand",
|
|
514: "Cambodia",
|
|
515: "Cambodia",
|
|
516: "Christmas Island",
|
|
518: "Cook Islands",
|
|
520: "Fiji",
|
|
523: "Cocos Islands",
|
|
525: "Indonesia",
|
|
529: "Kiribati",
|
|
531: "Laos",
|
|
533: "Malaysia",
|
|
536: "Northern Mariana Islands",
|
|
538: "Marshall Islands",
|
|
540: "New Caledonia",
|
|
542: "Niue",
|
|
544: "Nauru",
|
|
546: "French Polynesia",
|
|
548: "Philippines",
|
|
553: "Papua New Guinea",
|
|
555: "Pitcairn",
|
|
557: "Solomon Islands",
|
|
559: "American Samoa",
|
|
561: "Samoa",
|
|
563: "Singapore",
|
|
564: "Singapore",
|
|
565: "Singapore",
|
|
566: "Singapore",
|
|
567: "Thailand",
|
|
570: "Tonga",
|
|
572: "Tuvalu",
|
|
574: "Vietnam",
|
|
576: "Vanuatu",
|
|
577: "Vanuatu",
|
|
578: "Wallis and Futuna",
|
|
601: "South Africa",
|
|
603: "Angola",
|
|
605: "Algeria",
|
|
607: "Benin",
|
|
609: "Botswana",
|
|
610: "Burundi",
|
|
611: "Cameroon",
|
|
612: "Cape Verde",
|
|
613: "Central African Republic",
|
|
615: "Congo",
|
|
616: "Comoros",
|
|
617: "DR Congo",
|
|
618: "Ivory Coast",
|
|
619: "Djibouti",
|
|
620: "Egypt",
|
|
621: "Equatorial Guinea",
|
|
622: "Ethiopia",
|
|
624: "Eritrea",
|
|
625: "Gabon",
|
|
626: "Gambia",
|
|
627: "Ghana",
|
|
629: "Guinea",
|
|
630: "Guinea-Bissau",
|
|
631: "Kenya",
|
|
632: "Lesotho",
|
|
633: "Liberia",
|
|
634: "Liberia",
|
|
635: "Liberia",
|
|
636: "Liberia",
|
|
637: "Libya",
|
|
642: "Madagascar",
|
|
644: "Malawi",
|
|
645: "Mali",
|
|
647: "Mauritania",
|
|
649: "Mauritius",
|
|
650: "Mozambique",
|
|
654: "Namibia",
|
|
655: "Niger",
|
|
656: "Nigeria",
|
|
657: "Guinea",
|
|
659: "Rwanda",
|
|
660: "Senegal",
|
|
661: "Sierra Leone",
|
|
662: "Somalia",
|
|
663: "South Africa",
|
|
664: "Sudan",
|
|
667: "Tanzania",
|
|
668: "Togo",
|
|
669: "Tunisia",
|
|
670: "Uganda",
|
|
671: "Egypt",
|
|
672: "Tanzania",
|
|
674: "Zambia",
|
|
675: "Zimbabwe",
|
|
676: "Comoros",
|
|
677: "Tanzania",
|
|
}
|
|
|
|
|
|
def get_country_from_mmsi(mmsi: int) -> str:
|
|
"""Look up flag state from MMSI Maritime Identification Digit."""
|
|
mmsi_str = str(mmsi)
|
|
if len(mmsi_str) == 9:
|
|
mid = int(mmsi_str[:3])
|
|
return MID_COUNTRY.get(mid, "UNKNOWN")
|
|
return "UNKNOWN"
|
|
|
|
|
|
# Global vessel store: MMSI → vessel dict
|
|
_vessels: dict[int, dict] = {}
|
|
_vessel_trails: dict[int, dict] = {}
|
|
_vessels_lock = threading.Lock()
|
|
_ws_thread: threading.Thread | None = None
|
|
_ws_running = False
|
|
_proxy_process = None
|
|
# Issue #258: latest status snapshot emitted by ais_proxy.js. Populated when
|
|
# the proxy reports e.g. {"__ais_proxy_status": {"degraded_tls": true}} on
|
|
# stdout, which it does when it falls back to the SPKI-pinned insecure-date
|
|
# path during an upstream cert outage. Surfaced via ais_proxy_status() for
|
|
# /api/health.
|
|
_proxy_status: dict = {}
|
|
# Upstream-connectivity telemetry (added when stream.aisstream.io went fully
|
|
# offline on 2026-05-23). ``_last_msg_at`` is the unix timestamp of the most
|
|
# recent vessel message received from the proxy. ``_proxy_spawn_count`` is
|
|
# how many times we've started the node proxy; combined with no recent
|
|
# messages it tells us the proxy is respawning in a tight loop because the
|
|
# upstream is unreachable. Surfaced via ais_proxy_status() so the operator
|
|
# can see "AIS is dead" instead of guessing whether it's their map filter,
|
|
# their api key, or upstream.
|
|
_last_msg_at: float = 0.0
|
|
_proxy_spawn_count: int = 0
|
|
_VESSEL_TRAIL_INTERVAL_S = 120
|
|
_VESSEL_TRAIL_MAX_POINTS = 240
|
|
|
|
|
|
# How stale "last vessel message" can be before we consider the stream
|
|
# disconnected. AISStream typically pushes multiple messages/sec, so a 60s
|
|
# gap means something's wrong upstream or in transit.
|
|
_AIS_CONNECTED_FRESHNESS_S = 60
|
|
|
|
|
|
def ais_proxy_status() -> dict:
|
|
"""Return a copy of the latest ais_proxy.js status + connectivity health.
|
|
|
|
Fields:
|
|
* ``degraded_tls`` (bool, issue #258) — true when the proxy is using
|
|
SPKI-pinned fallback because AISStream's cert expired.
|
|
* ``connected`` (bool) — true when we received a vessel message in
|
|
the last ``_AIS_CONNECTED_FRESHNESS_S`` seconds.
|
|
* ``last_msg_age_seconds`` (int | None) — seconds since the last
|
|
vessel message; None if we've never received one.
|
|
* ``proxy_spawn_count`` (int) — how many times we've spawned the
|
|
node proxy. Sustained increases here without ``connected`` means
|
|
we're respawning in a tight loop because upstream is dead.
|
|
|
|
Returns an empty dict when called before the AIS subsystem starts
|
|
(e.g. during tests or when no API key is set).
|
|
"""
|
|
with _vessels_lock:
|
|
status = dict(_proxy_status)
|
|
last = _last_msg_at
|
|
spawns = _proxy_spawn_count
|
|
|
|
now = time.time()
|
|
if last > 0:
|
|
last_age = int(now - last)
|
|
status["last_msg_age_seconds"] = last_age
|
|
status["connected"] = last_age <= _AIS_CONNECTED_FRESHNESS_S
|
|
else:
|
|
status["last_msg_age_seconds"] = None
|
|
status["connected"] = False
|
|
status["proxy_spawn_count"] = spawns
|
|
return status
|
|
|
|
import os
|
|
|
|
CACHE_FILE = os.path.join(os.path.dirname(__file__), "ais_cache.json")
|
|
|
|
|
|
def _record_vessel_trail_locked(mmsi: int, lat, lng, sog=0, now_ts: float | None = None) -> None:
|
|
"""Append a sampled AIS trail point. Caller must hold _vessels_lock."""
|
|
if lat is None or lng is None:
|
|
return
|
|
try:
|
|
lat_f = float(lat)
|
|
lng_f = float(lng)
|
|
except (TypeError, ValueError):
|
|
return
|
|
if abs(lat_f) > 90 or abs(lng_f) > 180 or (lat_f == 0 and lng_f == 0):
|
|
return
|
|
now = now_ts or time.time()
|
|
trail_data = _vessel_trails.setdefault(int(mmsi), {"points": [], "last_seen": now})
|
|
point = [round(lat_f, 5), round(lng_f, 5), round(float(sog or 0), 1), round(now)]
|
|
last_point_ts = trail_data["points"][-1][3] if trail_data["points"] else 0
|
|
if now - last_point_ts < _VESSEL_TRAIL_INTERVAL_S:
|
|
trail_data["last_seen"] = now
|
|
return
|
|
if (
|
|
trail_data["points"]
|
|
and trail_data["points"][-1][0] == point[0]
|
|
and trail_data["points"][-1][1] == point[1]
|
|
):
|
|
trail_data["last_seen"] = now
|
|
return
|
|
trail_data["points"].append(point)
|
|
trail_data["last_seen"] = now
|
|
if len(trail_data["points"]) > _VESSEL_TRAIL_MAX_POINTS:
|
|
trail_data["points"] = trail_data["points"][-_VESSEL_TRAIL_MAX_POINTS:]
|
|
|
|
|
|
def get_vessel_trail(mmsi: int) -> list:
|
|
"""Return the accumulated trail for a single vessel without expanding live payloads."""
|
|
try:
|
|
key = int(mmsi)
|
|
except (TypeError, ValueError):
|
|
return []
|
|
with _vessels_lock:
|
|
points = _vessel_trails.get(key, {}).get("points", [])
|
|
return [list(point) for point in points]
|
|
|
|
|
|
def _save_cache():
|
|
"""Save vessel data to disk for persistence across restarts."""
|
|
try:
|
|
with _vessels_lock:
|
|
# Convert int keys to strings for JSON
|
|
data = {str(k): v for k, v in _vessels.items()}
|
|
with open(CACHE_FILE, "w") as f:
|
|
json.dump(data, f)
|
|
logger.info(f"AIS cache saved: {len(data)} vessels")
|
|
except (IOError, OSError) as e:
|
|
logger.error(f"Failed to save AIS cache: {e}")
|
|
|
|
|
|
def _load_cache():
|
|
"""Load vessel data from disk on startup."""
|
|
global _vessels
|
|
if not os.path.exists(CACHE_FILE):
|
|
return
|
|
try:
|
|
with open(CACHE_FILE, "r") as f:
|
|
data = json.load(f)
|
|
now = time.time()
|
|
stale_cutoff = now - 3600 # Accept vessels up to 1 hour old on restart
|
|
loaded = 0
|
|
with _vessels_lock:
|
|
for k, v in data.items():
|
|
if v.get("_updated", 0) > stale_cutoff:
|
|
_vessels[int(k)] = v
|
|
loaded += 1
|
|
logger.info(f"AIS cache loaded: {loaded} vessels from disk")
|
|
except (IOError, OSError, json.JSONDecodeError, ValueError) as e:
|
|
logger.error(f"Failed to load AIS cache: {e}")
|
|
|
|
|
|
def prune_stale_vessels():
|
|
"""Remove vessels not updated in the last 15 minutes. Safe to call from a scheduler."""
|
|
now = time.time()
|
|
stale_cutoff = now - 900
|
|
with _vessels_lock:
|
|
stale_keys = [k for k, v in _vessels.items() if v.get("_updated", 0) < stale_cutoff]
|
|
for k in stale_keys:
|
|
del _vessels[k]
|
|
_vessel_trails.pop(k, None)
|
|
if stale_keys:
|
|
logger.info(f"AIS pruned {len(stale_keys)} stale vessels")
|
|
|
|
|
|
def get_ais_vessels() -> list[dict]:
|
|
"""Return a snapshot of tracked AIS vessels, pruning stale."""
|
|
prune_stale_vessels()
|
|
|
|
with _vessels_lock:
|
|
result = []
|
|
for mmsi, v in _vessels.items():
|
|
v_type = v.get("type", "unknown")
|
|
# Skip vessels without valid position
|
|
if not v.get("lat") or not v.get("lng"):
|
|
continue
|
|
|
|
# Sanitize speed: AIS 102.3 kn = "speed not available"
|
|
sog = v.get("sog", 0)
|
|
if sog >= 102.2:
|
|
sog = 0
|
|
|
|
result.append(
|
|
{
|
|
"mmsi": mmsi,
|
|
"name": v.get("name", "UNKNOWN"),
|
|
"type": v_type,
|
|
"lat": round(v.get("lat", 0), 5),
|
|
"lng": round(v.get("lng", 0), 5),
|
|
"heading": v.get("heading", 0),
|
|
"sog": round(sog, 1),
|
|
"cog": round(v.get("cog", 0), 1),
|
|
"callsign": v.get("callsign", ""),
|
|
"destination": v.get("destination", "") or "UNKNOWN",
|
|
"imo": v.get("imo", 0),
|
|
"country": get_country_from_mmsi(mmsi),
|
|
}
|
|
)
|
|
return result
|
|
|
|
|
|
def ingest_ais_catcher(msgs: list[dict]) -> int:
|
|
"""Ingest decoded AIS messages from AIS-catcher HTTP feed.
|
|
Returns number of vessels updated."""
|
|
count = 0
|
|
now = time.time()
|
|
with _vessels_lock:
|
|
for msg in msgs:
|
|
mmsi = msg.get("mmsi")
|
|
if not mmsi or not isinstance(mmsi, int):
|
|
continue
|
|
|
|
vessel = _vessels.setdefault(mmsi, {"mmsi": mmsi})
|
|
msg_type = msg.get("type", 0)
|
|
|
|
# Position reports (types 1, 2, 3 = Class A; 18, 19 = Class B)
|
|
if msg_type in (1, 2, 3, 18, 19):
|
|
lat = msg.get("lat")
|
|
lon = msg.get("lon")
|
|
if lat is not None and lon is not None and lat != 91.0 and lon != 181.0:
|
|
vessel["lat"] = lat
|
|
vessel["lng"] = lon
|
|
# AIS raw value 1023 (102.3 kn) = "speed not available"
|
|
raw_speed = msg.get("speed", 0)
|
|
vessel["sog"] = 0 if raw_speed >= 102.2 else raw_speed
|
|
vessel["cog"] = msg.get("course", 0)
|
|
heading = msg.get("heading", 511)
|
|
vessel["heading"] = heading if heading != 511 else vessel.get("cog", 0)
|
|
vessel["_updated"] = now
|
|
_record_vessel_trail_locked(mmsi, lat, lon, vessel["sog"], now)
|
|
if msg.get("shipname"):
|
|
vessel["name"] = msg["shipname"].strip()
|
|
count += 1
|
|
|
|
# Static data (type 5 = Class A static; 24 = Class B static)
|
|
elif msg_type in (5, 24):
|
|
if msg.get("shipname"):
|
|
vessel["name"] = msg["shipname"].strip()
|
|
if msg.get("callsign"):
|
|
vessel["callsign"] = msg["callsign"].strip()
|
|
if msg.get("imo"):
|
|
vessel["imo"] = msg["imo"]
|
|
if msg.get("destination"):
|
|
vessel["destination"] = msg["destination"].strip().replace("@", "")
|
|
ship_type = msg.get("shiptype", 0)
|
|
if ship_type:
|
|
vessel["ais_type_code"] = ship_type
|
|
vessel["type"] = classify_vessel(ship_type, mmsi)
|
|
vessel["_updated"] = now
|
|
|
|
# Ensure country is set from MMSI MID
|
|
if "country" not in vessel:
|
|
vessel["country"] = get_country_from_mmsi(mmsi)
|
|
|
|
# Ensure name exists
|
|
if "name" not in vessel:
|
|
vessel["name"] = msg.get("shipname", "UNKNOWN") or "UNKNOWN"
|
|
|
|
return count
|
|
|
|
|
|
def _ais_stream_loop():
|
|
"""Main loop: spawn node proxy and process messages from stdout."""
|
|
global _proxy_process
|
|
import subprocess
|
|
import os
|
|
|
|
proxy_script = os.path.join(os.path.dirname(os.path.dirname(__file__)), "ais_proxy.js")
|
|
backoff = 1 # Exponential backoff starting at 1 second
|
|
|
|
if not API_KEY:
|
|
logger.info("AIS_API_KEY not set — ship tracking disabled. Set AIS_API_KEY to enable.")
|
|
return
|
|
|
|
while _ws_running:
|
|
try:
|
|
logger.info("Starting Node.js AIS Stream Proxy...")
|
|
proxy_env = os.environ.copy()
|
|
proxy_env["AIS_API_KEY"] = API_KEY
|
|
popen_kwargs = {}
|
|
if os.name == "nt":
|
|
popen_kwargs["creationflags"] = (
|
|
getattr(subprocess, "CREATE_NO_WINDOW", 0)
|
|
| getattr(subprocess, "CREATE_NEW_PROCESS_GROUP", 0)
|
|
)
|
|
process = subprocess.Popen(
|
|
["node", proxy_script],
|
|
stdin=subprocess.PIPE,
|
|
stdout=subprocess.PIPE,
|
|
stderr=subprocess.PIPE,
|
|
text=True,
|
|
bufsize=1,
|
|
env=proxy_env,
|
|
**popen_kwargs,
|
|
)
|
|
global _proxy_spawn_count
|
|
with _vessels_lock:
|
|
_proxy_process = process
|
|
_proxy_spawn_count += 1
|
|
|
|
# Drain stderr in a background thread to prevent deadlock
|
|
import threading
|
|
|
|
def _drain_stderr():
|
|
for errline in iter(process.stderr.readline, ""):
|
|
errline = errline.strip()
|
|
if errline:
|
|
logger.warning(f"AIS proxy stderr: {errline}")
|
|
|
|
threading.Thread(target=_drain_stderr, daemon=True).start()
|
|
|
|
logger.info("AIS Stream proxy started — receiving vessel data")
|
|
|
|
msg_count = 0
|
|
ok_streak = 0 # Track consecutive successful messages for backoff reset
|
|
last_log_time = time.time()
|
|
for raw_msg in iter(process.stdout.readline, ""):
|
|
if not _ws_running:
|
|
process.terminate()
|
|
break
|
|
|
|
raw_msg = raw_msg.strip()
|
|
if not raw_msg:
|
|
continue
|
|
|
|
try:
|
|
data = json.loads(raw_msg)
|
|
except json.JSONDecodeError:
|
|
continue
|
|
|
|
if "error" in data:
|
|
logger.error(f"AIS Stream error: {data['error']}")
|
|
continue
|
|
|
|
# Issue #258: ais_proxy.js emits status markers (e.g.
|
|
# {"__ais_proxy_status": {"degraded_tls": true}}) when the
|
|
# SPKI-pinned fallback is in use. We snapshot the latest
|
|
# status so the backend can expose it on /api/health.
|
|
if isinstance(data, dict) and "__ais_proxy_status" in data:
|
|
status = data.get("__ais_proxy_status") or {}
|
|
if isinstance(status, dict):
|
|
with _vessels_lock:
|
|
_proxy_status.clear()
|
|
_proxy_status.update(status)
|
|
continue
|
|
|
|
msg_type = data.get("MessageType", "")
|
|
metadata = data.get("MetaData", {})
|
|
message = data.get("Message", {})
|
|
|
|
mmsi = metadata.get("MMSI", 0)
|
|
if not mmsi:
|
|
continue
|
|
|
|
# Telemetry: stamp the timestamp of the most recent real
|
|
# vessel message. ais_proxy_status() reads this to decide
|
|
# whether the stream is currently "connected" — i.e. has
|
|
# any data flowed in the last 60s.
|
|
global _last_msg_at
|
|
with _vessels_lock:
|
|
_last_msg_at = time.time()
|
|
if mmsi not in _vessels:
|
|
_vessels[mmsi] = {"_updated": _last_msg_at}
|
|
vessel = _vessels[mmsi]
|
|
|
|
# Update position from PositionReport or StandardClassBPositionReport
|
|
if msg_type in ("PositionReport", "StandardClassBPositionReport"):
|
|
report = message.get(msg_type, {})
|
|
lat = report.get("Latitude", metadata.get("latitude", 0))
|
|
lng = report.get("Longitude", metadata.get("longitude", 0))
|
|
|
|
# Skip invalid positions
|
|
if lat == 0 and lng == 0:
|
|
continue
|
|
if abs(lat) > 90 or abs(lng) > 180:
|
|
continue
|
|
|
|
with _vessels_lock:
|
|
vessel["lat"] = lat
|
|
vessel["lng"] = lng
|
|
# AIS raw value 1023 (102.3 kn) = "speed not available"
|
|
raw_sog = report.get("Sog", 0)
|
|
vessel["sog"] = 0 if raw_sog >= 102.2 else raw_sog
|
|
vessel["cog"] = report.get("Cog", 0)
|
|
heading = report.get("TrueHeading", 511)
|
|
vessel["heading"] = heading if heading != 511 else report.get("Cog", 0)
|
|
now_ts = time.time()
|
|
vessel["_updated"] = now_ts
|
|
_record_vessel_trail_locked(mmsi, lat, lng, vessel["sog"], now_ts)
|
|
# Use metadata name if we don't have one yet
|
|
if not vessel.get("name") or vessel["name"] == "UNKNOWN":
|
|
vessel["name"] = (
|
|
metadata.get("ShipName", "UNKNOWN").strip() or "UNKNOWN"
|
|
)
|
|
|
|
# Update static data from ShipStaticData
|
|
elif msg_type == "ShipStaticData":
|
|
static = message.get("ShipStaticData", {})
|
|
ais_type = static.get("Type", 0)
|
|
|
|
with _vessels_lock:
|
|
vessel["name"] = (
|
|
static.get("Name", "") or metadata.get("ShipName", "UNKNOWN")
|
|
).strip() or "UNKNOWN"
|
|
vessel["callsign"] = (static.get("CallSign", "") or "").strip()
|
|
vessel["imo"] = static.get("ImoNumber", 0)
|
|
vessel["destination"] = (
|
|
(static.get("Destination", "") or "").strip().replace("@", "")
|
|
)
|
|
vessel["ais_type_code"] = ais_type
|
|
vessel["type"] = classify_vessel(ais_type, mmsi)
|
|
vessel["_updated"] = time.time()
|
|
|
|
msg_count += 1
|
|
ok_streak += 1
|
|
|
|
# Reset backoff after 200 consecutive successful messages
|
|
if ok_streak >= 200 and backoff > 1:
|
|
backoff = 1
|
|
ok_streak = 0
|
|
|
|
# Periodic logging + cache save (time-based instead of count-based to avoid lock in hot loop)
|
|
now = time.time()
|
|
if now - last_log_time >= 60:
|
|
with _vessels_lock:
|
|
count = len(_vessels)
|
|
logger.info(
|
|
f"AIS Stream: processed {msg_count} messages, tracking {count} vessels"
|
|
)
|
|
_save_cache()
|
|
last_log_time = now
|
|
|
|
except (ConnectionError, TimeoutError, OSError, ValueError, KeyError) as e:
|
|
logger.error(f"AIS proxy connection error: {e}")
|
|
if _ws_running:
|
|
logger.info(f"Restarting AIS proxy in {backoff}s (exponential backoff)...")
|
|
time.sleep(backoff)
|
|
backoff = min(backoff * 2, 60) # Double up to 60s max
|
|
continue
|
|
|
|
|
|
def _run_ais_loop():
|
|
"""Thread target: run the AIS loop."""
|
|
global _ws_running, _ws_thread, _proxy_process
|
|
try:
|
|
_ais_stream_loop()
|
|
except Exception as e:
|
|
logger.error(f"AIS Stream thread crashed: {e}")
|
|
finally:
|
|
with _vessels_lock:
|
|
_ws_running = False
|
|
_ws_thread = None
|
|
_proxy_process = None
|
|
|
|
|
|
def start_ais_stream():
|
|
"""Start the AIS WebSocket stream in a background thread."""
|
|
global _ws_thread, _ws_running
|
|
|
|
# Always load cached vessel data first so the ships layer can paint even
|
|
# when live streaming is disabled or the upstream is unavailable.
|
|
_load_cache()
|
|
|
|
if not API_KEY:
|
|
logger.info("AIS_API_KEY not set — ship tracking disabled. Set AIS_API_KEY to enable.")
|
|
return
|
|
|
|
if not ais_stream_proxy_enabled():
|
|
logger.info(
|
|
"AIS live stream proxy disabled for this runtime; using cached AIS vessels. "
|
|
"Set SHADOWBROKER_ENABLE_AIS_STREAM_PROXY=1 to opt in."
|
|
)
|
|
return
|
|
|
|
with _vessels_lock:
|
|
if _ws_running:
|
|
logger.info("AIS Stream already running")
|
|
return
|
|
_ws_running = True
|
|
existing_thread = _ws_thread
|
|
if existing_thread and existing_thread.is_alive():
|
|
logger.info("AIS Stream already running")
|
|
return
|
|
|
|
_ws_thread = threading.Thread(target=_run_ais_loop, daemon=True, name="ais-stream")
|
|
_ws_thread.start()
|
|
logger.info("AIS Stream background thread started")
|
|
|
|
|
|
def stop_ais_stream():
|
|
"""Stop the AIS WebSocket stream and save cache."""
|
|
global _ws_running, _ws_thread, _proxy_process
|
|
with _vessels_lock:
|
|
_ws_running = False
|
|
_ws_thread = None
|
|
proc = _proxy_process
|
|
_proxy_process = None
|
|
|
|
if proc and proc.stdin:
|
|
try:
|
|
proc.stdin.close()
|
|
except Exception:
|
|
pass
|
|
|
|
_save_cache() # Save on shutdown
|
|
logger.info("AIS Stream stopping...")
|
|
|
|
|
|
def update_ais_bbox(south: float, west: float, north: float, east: float):
|
|
"""Dynamically update the AIS stream bounding box via proxy stdin."""
|
|
with _vessels_lock:
|
|
proc = _proxy_process
|
|
if not proc or not proc.stdin:
|
|
return
|
|
|
|
try:
|
|
cmd = json.dumps({"type": "update_bbox", "bboxes": [[[south, west], [north, east]]]})
|
|
proc.stdin.write(cmd + "\n")
|
|
proc.stdin.flush()
|
|
logger.info(
|
|
f"Updated AIS bounding box to: S:{south:.2f} W:{west:.2f} N:{north:.2f} E:{east:.2f}"
|
|
)
|
|
except Exception as e:
|
|
logger.error(f"Failed to update AIS bbox: {e}")
|