Files
anoracleofra-code 81c90a9faf fix: stop AIS proxy crash-loop when API key is not set
Exit early from _ais_stream_loop() if AIS_API_KEY is empty instead of
endlessly spawning the Node proxy which immediately prints FATAL and
exits. This was flooding docker logs with hundreds of lines per minute.
2026-03-26 10:53:30 -06:00

702 lines
21 KiB
Python

"""
AIS Stream WebSocket client for real-time maritime vessel tracking.
Connects to aisstream.io and maintains a live dictionary of global vessel positions.
"""
import asyncio
import json
import logging
import threading
import time
from datetime import datetime, timezone
import os
logger = logging.getLogger(__name__)
AIS_WS_URL = "wss://stream.aisstream.io/v0/stream"
API_KEY = os.environ.get("AIS_API_KEY", "")
# AIS vessel type code classification
# See: https://coast.noaa.gov/data/marinecadastre/ais/VesselTypeCodes2018.pdf
def classify_vessel(ais_type: int, mmsi: int) -> str:
"""Classify a vessel by its AIS type code into a rendering category."""
if 80 <= ais_type <= 89:
return "tanker" # Oil/Chemical/Gas tankers → RED
if 70 <= ais_type <= 79:
return "cargo" # Cargo ships, container vessels → RED
if 60 <= ais_type <= 69:
return "passenger" # Cruise ships, ferries → GRAY
if ais_type in (36, 37):
return "yacht" # Sailing/Pleasure craft → DARK BLUE
if ais_type == 35:
return "military_vessel" # Military → YELLOW
# MMSI-based military detection: military MMSIs often start with certain prefixes
mmsi_str = str(mmsi)
if mmsi_str.startswith("3380") or mmsi_str.startswith("3381"):
return "military_vessel" # US Navy
if ais_type in (30, 31, 32, 33, 34):
return "other" # Fishing, towing, dredging, diving, etc.
if ais_type in (50, 51, 52, 53, 54, 55, 56, 57, 58, 59):
return "other" # Pilot, SAR, tug, port tender, etc.
return "unknown" # Not yet classified — will update when ShipStaticData arrives
# MMSI Maritime Identification Digit (MID) → Country mapping
# First 3 digits of MMSI (for 9-digit MMSIs) encode the flag state
MID_COUNTRY = {
201: "Albania",
202: "Andorra",
203: "Austria",
204: "Portugal",
205: "Belgium",
206: "Belarus",
207: "Bulgaria",
208: "Vatican",
209: "Cyprus",
210: "Cyprus",
211: "Germany",
212: "Cyprus",
213: "Georgia",
214: "Moldova",
215: "Malta",
216: "Armenia",
218: "Germany",
219: "Denmark",
220: "Denmark",
224: "Spain",
225: "Spain",
226: "France",
227: "France",
228: "France",
229: "Malta",
230: "Finland",
231: "Faroe Islands",
232: "United Kingdom",
233: "United Kingdom",
234: "United Kingdom",
235: "United Kingdom",
236: "Gibraltar",
237: "Greece",
238: "Croatia",
239: "Greece",
240: "Greece",
241: "Greece",
242: "Morocco",
243: "Hungary",
244: "Netherlands",
245: "Netherlands",
246: "Netherlands",
247: "Italy",
248: "Malta",
249: "Malta",
250: "Ireland",
251: "Iceland",
252: "Liechtenstein",
253: "Luxembourg",
254: "Monaco",
255: "Portugal",
256: "Malta",
257: "Norway",
258: "Norway",
259: "Norway",
261: "Poland",
263: "Portugal",
264: "Romania",
265: "Sweden",
266: "Sweden",
267: "Slovakia",
268: "San Marino",
269: "Switzerland",
270: "Czech Republic",
271: "Turkey",
272: "Ukraine",
273: "Russia",
274: "North Macedonia",
275: "Latvia",
276: "Estonia",
277: "Lithuania",
278: "Slovenia",
301: "Anguilla",
303: "Alaska",
304: "Antigua",
305: "Antigua",
306: "Netherlands Antilles",
307: "Aruba",
308: "Bahamas",
309: "Bahamas",
310: "Bermuda",
311: "Bahamas",
312: "Belize",
314: "Barbados",
316: "Canada",
319: "Cayman Islands",
321: "Costa Rica",
323: "Cuba",
325: "Dominica",
327: "Dominican Republic",
329: "Guadeloupe",
330: "Grenada",
331: "Greenland",
332: "Guatemala",
334: "Honduras",
336: "Haiti",
338: "United States",
339: "Jamaica",
341: "Saint Kitts",
343: "Saint Lucia",
345: "Mexico",
347: "Martinique",
348: "Montserrat",
350: "Nicaragua",
351: "Panama",
352: "Panama",
353: "Panama",
354: "Panama",
355: "Panama",
356: "Panama",
357: "Panama",
358: "Puerto Rico",
359: "El Salvador",
361: "Saint Pierre",
362: "Trinidad",
364: "Turks and Caicos",
366: "United States",
367: "United States",
368: "United States",
369: "United States",
370: "Panama",
371: "Panama",
372: "Panama",
373: "Panama",
374: "Panama",
375: "Saint Vincent",
376: "Saint Vincent",
377: "Saint Vincent",
378: "British Virgin Islands",
379: "US Virgin Islands",
401: "Afghanistan",
403: "Saudi Arabia",
405: "Bangladesh",
408: "Bahrain",
410: "Bhutan",
412: "China",
413: "China",
414: "China",
416: "Taiwan",
417: "Sri Lanka",
419: "India",
422: "Iran",
423: "Azerbaijan",
425: "Iraq",
428: "Israel",
431: "Japan",
432: "Japan",
434: "Turkmenistan",
436: "Kazakhstan",
437: "Uzbekistan",
438: "Jordan",
440: "South Korea",
441: "South Korea",
443: "Palestine",
445: "North Korea",
447: "Kuwait",
450: "Lebanon",
451: "Kyrgyzstan",
453: "Macao",
455: "Maldives",
457: "Mongolia",
459: "Nepal",
461: "Oman",
463: "Pakistan",
466: "Qatar",
468: "Syria",
470: "UAE",
472: "Tajikistan",
473: "Yemen",
475: "Tonga",
477: "Hong Kong",
478: "Bosnia",
501: "Antarctica",
503: "Australia",
506: "Myanmar",
508: "Brunei",
510: "Micronesia",
511: "Palau",
512: "New Zealand",
514: "Cambodia",
515: "Cambodia",
516: "Christmas Island",
518: "Cook Islands",
520: "Fiji",
523: "Cocos Islands",
525: "Indonesia",
529: "Kiribati",
531: "Laos",
533: "Malaysia",
536: "Northern Mariana Islands",
538: "Marshall Islands",
540: "New Caledonia",
542: "Niue",
544: "Nauru",
546: "French Polynesia",
548: "Philippines",
553: "Papua New Guinea",
555: "Pitcairn",
557: "Solomon Islands",
559: "American Samoa",
561: "Samoa",
563: "Singapore",
564: "Singapore",
565: "Singapore",
566: "Singapore",
567: "Thailand",
570: "Tonga",
572: "Tuvalu",
574: "Vietnam",
576: "Vanuatu",
577: "Vanuatu",
578: "Wallis and Futuna",
601: "South Africa",
603: "Angola",
605: "Algeria",
607: "Benin",
609: "Botswana",
610: "Burundi",
611: "Cameroon",
612: "Cape Verde",
613: "Central African Republic",
615: "Congo",
616: "Comoros",
617: "DR Congo",
618: "Ivory Coast",
619: "Djibouti",
620: "Egypt",
621: "Equatorial Guinea",
622: "Ethiopia",
624: "Eritrea",
625: "Gabon",
626: "Gambia",
627: "Ghana",
629: "Guinea",
630: "Guinea-Bissau",
631: "Kenya",
632: "Lesotho",
633: "Liberia",
634: "Liberia",
635: "Liberia",
636: "Liberia",
637: "Libya",
642: "Madagascar",
644: "Malawi",
645: "Mali",
647: "Mauritania",
649: "Mauritius",
650: "Mozambique",
654: "Namibia",
655: "Niger",
656: "Nigeria",
657: "Guinea",
659: "Rwanda",
660: "Senegal",
661: "Sierra Leone",
662: "Somalia",
663: "South Africa",
664: "Sudan",
667: "Tanzania",
668: "Togo",
669: "Tunisia",
670: "Uganda",
671: "Egypt",
672: "Tanzania",
674: "Zambia",
675: "Zimbabwe",
676: "Comoros",
677: "Tanzania",
}
def get_country_from_mmsi(mmsi: int) -> str:
"""Look up flag state from MMSI Maritime Identification Digit."""
mmsi_str = str(mmsi)
if len(mmsi_str) == 9:
mid = int(mmsi_str[:3])
return MID_COUNTRY.get(mid, "UNKNOWN")
return "UNKNOWN"
# Global vessel store: MMSI → vessel dict
_vessels: dict[int, dict] = {}
_vessels_lock = threading.Lock()
_ws_thread: threading.Thread | None = None
_ws_running = False
_proxy_process = None
import os
CACHE_FILE = os.path.join(os.path.dirname(__file__), "ais_cache.json")
def _save_cache():
"""Save vessel data to disk for persistence across restarts."""
try:
with _vessels_lock:
# Convert int keys to strings for JSON
data = {str(k): v for k, v in _vessels.items()}
with open(CACHE_FILE, "w") as f:
json.dump(data, f)
logger.info(f"AIS cache saved: {len(data)} vessels")
except (IOError, OSError) as e:
logger.error(f"Failed to save AIS cache: {e}")
def _load_cache():
"""Load vessel data from disk on startup."""
global _vessels
if not os.path.exists(CACHE_FILE):
return
try:
with open(CACHE_FILE, "r") as f:
data = json.load(f)
now = time.time()
stale_cutoff = now - 3600 # Accept vessels up to 1 hour old on restart
loaded = 0
with _vessels_lock:
for k, v in data.items():
if v.get("_updated", 0) > stale_cutoff:
_vessels[int(k)] = v
loaded += 1
logger.info(f"AIS cache loaded: {loaded} vessels from disk")
except (IOError, OSError, json.JSONDecodeError, ValueError) as e:
logger.error(f"Failed to load AIS cache: {e}")
def prune_stale_vessels():
"""Remove vessels not updated in the last 15 minutes. Safe to call from a scheduler."""
now = time.time()
stale_cutoff = now - 900
with _vessels_lock:
stale_keys = [k for k, v in _vessels.items() if v.get("_updated", 0) < stale_cutoff]
for k in stale_keys:
del _vessels[k]
if stale_keys:
logger.info(f"AIS pruned {len(stale_keys)} stale vessels")
def get_ais_vessels() -> list[dict]:
"""Return a snapshot of tracked AIS vessels, pruning stale."""
prune_stale_vessels()
with _vessels_lock:
result = []
for mmsi, v in _vessels.items():
v_type = v.get("type", "unknown")
# Skip vessels without valid position
if not v.get("lat") or not v.get("lng"):
continue
# Sanitize speed: AIS 102.3 kn = "speed not available"
sog = v.get("sog", 0)
if sog >= 102.2:
sog = 0
result.append(
{
"mmsi": mmsi,
"name": v.get("name", "UNKNOWN"),
"type": v_type,
"lat": round(v.get("lat", 0), 5),
"lng": round(v.get("lng", 0), 5),
"heading": v.get("heading", 0),
"sog": round(sog, 1),
"cog": round(v.get("cog", 0), 1),
"callsign": v.get("callsign", ""),
"destination": v.get("destination", "") or "UNKNOWN",
"imo": v.get("imo", 0),
"country": get_country_from_mmsi(mmsi),
}
)
return result
def ingest_ais_catcher(msgs: list[dict]) -> int:
"""Ingest decoded AIS messages from AIS-catcher HTTP feed.
Returns number of vessels updated."""
count = 0
now = time.time()
with _vessels_lock:
for msg in msgs:
mmsi = msg.get("mmsi")
if not mmsi or not isinstance(mmsi, int):
continue
vessel = _vessels.setdefault(mmsi, {"mmsi": mmsi})
msg_type = msg.get("type", 0)
# Position reports (types 1, 2, 3 = Class A; 18, 19 = Class B)
if msg_type in (1, 2, 3, 18, 19):
lat = msg.get("lat")
lon = msg.get("lon")
if lat is not None and lon is not None and lat != 91.0 and lon != 181.0:
vessel["lat"] = lat
vessel["lng"] = lon
# AIS raw value 1023 (102.3 kn) = "speed not available"
raw_speed = msg.get("speed", 0)
vessel["sog"] = 0 if raw_speed >= 102.2 else raw_speed
vessel["cog"] = msg.get("course", 0)
heading = msg.get("heading", 511)
vessel["heading"] = heading if heading != 511 else vessel.get("cog", 0)
vessel["_updated"] = now
if msg.get("shipname"):
vessel["name"] = msg["shipname"].strip()
count += 1
# Static data (type 5 = Class A static; 24 = Class B static)
elif msg_type in (5, 24):
if msg.get("shipname"):
vessel["name"] = msg["shipname"].strip()
if msg.get("callsign"):
vessel["callsign"] = msg["callsign"].strip()
if msg.get("imo"):
vessel["imo"] = msg["imo"]
if msg.get("destination"):
vessel["destination"] = msg["destination"].strip().replace("@", "")
ship_type = msg.get("shiptype", 0)
if ship_type:
vessel["ais_type_code"] = ship_type
vessel["type"] = classify_vessel(ship_type, mmsi)
vessel["_updated"] = now
# Ensure country is set from MMSI MID
if "country" not in vessel:
vessel["country"] = get_country_from_mmsi(mmsi)
# Ensure name exists
if "name" not in vessel:
vessel["name"] = msg.get("shipname", "UNKNOWN") or "UNKNOWN"
return count
def _ais_stream_loop():
"""Main loop: spawn node proxy and process messages from stdout."""
global _proxy_process
import subprocess
import os
proxy_script = os.path.join(os.path.dirname(os.path.dirname(__file__)), "ais_proxy.js")
backoff = 1 # Exponential backoff starting at 1 second
if not API_KEY:
logger.info("AIS_API_KEY not set — ship tracking disabled. Set AIS_API_KEY to enable.")
return
while _ws_running:
try:
logger.info("Starting Node.js AIS Stream Proxy...")
proxy_env = os.environ.copy()
proxy_env["AIS_API_KEY"] = API_KEY
process = subprocess.Popen(
["node", proxy_script],
stdin=subprocess.PIPE,
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
text=True,
bufsize=1,
env=proxy_env,
)
with _vessels_lock:
_proxy_process = process
# Drain stderr in a background thread to prevent deadlock
import threading
def _drain_stderr():
for errline in iter(process.stderr.readline, ""):
errline = errline.strip()
if errline:
logger.warning(f"AIS proxy stderr: {errline}")
threading.Thread(target=_drain_stderr, daemon=True).start()
logger.info("AIS Stream proxy started — receiving vessel data")
msg_count = 0
ok_streak = 0 # Track consecutive successful messages for backoff reset
last_log_time = time.time()
for raw_msg in iter(process.stdout.readline, ""):
if not _ws_running:
process.terminate()
break
raw_msg = raw_msg.strip()
if not raw_msg:
continue
try:
data = json.loads(raw_msg)
except json.JSONDecodeError:
continue
if "error" in data:
logger.error(f"AIS Stream error: {data['error']}")
continue
msg_type = data.get("MessageType", "")
metadata = data.get("MetaData", {})
message = data.get("Message", {})
mmsi = metadata.get("MMSI", 0)
if not mmsi:
continue
with _vessels_lock:
if mmsi not in _vessels:
_vessels[mmsi] = {"_updated": time.time()}
vessel = _vessels[mmsi]
# Update position from PositionReport or StandardClassBPositionReport
if msg_type in ("PositionReport", "StandardClassBPositionReport"):
report = message.get(msg_type, {})
lat = report.get("Latitude", metadata.get("latitude", 0))
lng = report.get("Longitude", metadata.get("longitude", 0))
# Skip invalid positions
if lat == 0 and lng == 0:
continue
if abs(lat) > 90 or abs(lng) > 180:
continue
with _vessels_lock:
vessel["lat"] = lat
vessel["lng"] = lng
# AIS raw value 1023 (102.3 kn) = "speed not available"
raw_sog = report.get("Sog", 0)
vessel["sog"] = 0 if raw_sog >= 102.2 else raw_sog
vessel["cog"] = report.get("Cog", 0)
heading = report.get("TrueHeading", 511)
vessel["heading"] = heading if heading != 511 else report.get("Cog", 0)
vessel["_updated"] = time.time()
# Use metadata name if we don't have one yet
if not vessel.get("name") or vessel["name"] == "UNKNOWN":
vessel["name"] = (
metadata.get("ShipName", "UNKNOWN").strip() or "UNKNOWN"
)
# Update static data from ShipStaticData
elif msg_type == "ShipStaticData":
static = message.get("ShipStaticData", {})
ais_type = static.get("Type", 0)
with _vessels_lock:
vessel["name"] = (
static.get("Name", "") or metadata.get("ShipName", "UNKNOWN")
).strip() or "UNKNOWN"
vessel["callsign"] = (static.get("CallSign", "") or "").strip()
vessel["imo"] = static.get("ImoNumber", 0)
vessel["destination"] = (
(static.get("Destination", "") or "").strip().replace("@", "")
)
vessel["ais_type_code"] = ais_type
vessel["type"] = classify_vessel(ais_type, mmsi)
vessel["_updated"] = time.time()
msg_count += 1
ok_streak += 1
# Reset backoff after 200 consecutive successful messages
if ok_streak >= 200 and backoff > 1:
backoff = 1
ok_streak = 0
# Periodic logging + cache save (time-based instead of count-based to avoid lock in hot loop)
now = time.time()
if now - last_log_time >= 60:
with _vessels_lock:
count = len(_vessels)
logger.info(
f"AIS Stream: processed {msg_count} messages, tracking {count} vessels"
)
_save_cache()
last_log_time = now
except (ConnectionError, TimeoutError, OSError, ValueError, KeyError) as e:
logger.error(f"AIS proxy connection error: {e}")
if _ws_running:
logger.info(f"Restarting AIS proxy in {backoff}s (exponential backoff)...")
time.sleep(backoff)
backoff = min(backoff * 2, 60) # Double up to 60s max
continue
def _run_ais_loop():
"""Thread target: run the AIS loop."""
global _ws_running, _ws_thread, _proxy_process
try:
_ais_stream_loop()
except Exception as e:
logger.error(f"AIS Stream thread crashed: {e}")
finally:
with _vessels_lock:
_ws_running = False
_ws_thread = None
_proxy_process = None
def start_ais_stream():
"""Start the AIS WebSocket stream in a background thread."""
global _ws_thread, _ws_running
with _vessels_lock:
if _ws_running:
logger.info("AIS Stream already running")
return
_ws_running = True
existing_thread = _ws_thread
if existing_thread and existing_thread.is_alive():
logger.info("AIS Stream already running")
return
# Load cached vessel data from disk
_load_cache()
_ws_thread = threading.Thread(target=_run_ais_loop, daemon=True, name="ais-stream")
_ws_thread.start()
logger.info("AIS Stream background thread started")
def stop_ais_stream():
"""Stop the AIS WebSocket stream and save cache."""
global _ws_running, _ws_thread, _proxy_process
with _vessels_lock:
_ws_running = False
_ws_thread = None
proc = _proxy_process
_proxy_process = None
if proc and proc.stdin:
try:
proc.stdin.close()
except Exception:
pass
_save_cache() # Save on shutdown
logger.info("AIS Stream stopping...")
def update_ais_bbox(south: float, west: float, north: float, east: float):
"""Dynamically update the AIS stream bounding box via proxy stdin."""
with _vessels_lock:
proc = _proxy_process
if not proc or not proc.stdin:
return
try:
cmd = json.dumps({"type": "update_bbox", "bboxes": [[[south, west], [north, east]]]})
proc.stdin.write(cmd + "\n")
proc.stdin.flush()
logger.info(
f"Updated AIS bounding box to: S:{south:.2f} W:{west:.2f} N:{north:.2f} E:{east:.2f}"
)
except Exception as e:
logger.error(f"Failed to update AIS bbox: {e}")