mirror of
https://github.com/BigBodyCobain/Shadowbroker.git
synced 2026-04-27 13:05:59 +02:00
81c90a9faf
Exit early from _ais_stream_loop() if AIS_API_KEY is empty instead of endlessly spawning the Node proxy which immediately prints FATAL and exits. This was flooding docker logs with hundreds of lines per minute.
702 lines
21 KiB
Python
702 lines
21 KiB
Python
"""
|
|
AIS Stream WebSocket client for real-time maritime vessel tracking.
|
|
Connects to aisstream.io and maintains a live dictionary of global vessel positions.
|
|
"""
|
|
|
|
import asyncio
|
|
import json
|
|
import logging
|
|
import threading
|
|
import time
|
|
from datetime import datetime, timezone
|
|
import os
|
|
|
|
logger = logging.getLogger(__name__)
|
|
|
|
AIS_WS_URL = "wss://stream.aisstream.io/v0/stream"
|
|
API_KEY = os.environ.get("AIS_API_KEY", "")
|
|
|
|
|
|
# AIS vessel type code classification
|
|
# See: https://coast.noaa.gov/data/marinecadastre/ais/VesselTypeCodes2018.pdf
|
|
def classify_vessel(ais_type: int, mmsi: int) -> str:
|
|
"""Classify a vessel by its AIS type code into a rendering category."""
|
|
if 80 <= ais_type <= 89:
|
|
return "tanker" # Oil/Chemical/Gas tankers → RED
|
|
if 70 <= ais_type <= 79:
|
|
return "cargo" # Cargo ships, container vessels → RED
|
|
if 60 <= ais_type <= 69:
|
|
return "passenger" # Cruise ships, ferries → GRAY
|
|
if ais_type in (36, 37):
|
|
return "yacht" # Sailing/Pleasure craft → DARK BLUE
|
|
if ais_type == 35:
|
|
return "military_vessel" # Military → YELLOW
|
|
# MMSI-based military detection: military MMSIs often start with certain prefixes
|
|
mmsi_str = str(mmsi)
|
|
if mmsi_str.startswith("3380") or mmsi_str.startswith("3381"):
|
|
return "military_vessel" # US Navy
|
|
if ais_type in (30, 31, 32, 33, 34):
|
|
return "other" # Fishing, towing, dredging, diving, etc.
|
|
if ais_type in (50, 51, 52, 53, 54, 55, 56, 57, 58, 59):
|
|
return "other" # Pilot, SAR, tug, port tender, etc.
|
|
return "unknown" # Not yet classified — will update when ShipStaticData arrives
|
|
|
|
|
|
# MMSI Maritime Identification Digit (MID) → Country mapping
|
|
# First 3 digits of MMSI (for 9-digit MMSIs) encode the flag state
|
|
MID_COUNTRY = {
|
|
201: "Albania",
|
|
202: "Andorra",
|
|
203: "Austria",
|
|
204: "Portugal",
|
|
205: "Belgium",
|
|
206: "Belarus",
|
|
207: "Bulgaria",
|
|
208: "Vatican",
|
|
209: "Cyprus",
|
|
210: "Cyprus",
|
|
211: "Germany",
|
|
212: "Cyprus",
|
|
213: "Georgia",
|
|
214: "Moldova",
|
|
215: "Malta",
|
|
216: "Armenia",
|
|
218: "Germany",
|
|
219: "Denmark",
|
|
220: "Denmark",
|
|
224: "Spain",
|
|
225: "Spain",
|
|
226: "France",
|
|
227: "France",
|
|
228: "France",
|
|
229: "Malta",
|
|
230: "Finland",
|
|
231: "Faroe Islands",
|
|
232: "United Kingdom",
|
|
233: "United Kingdom",
|
|
234: "United Kingdom",
|
|
235: "United Kingdom",
|
|
236: "Gibraltar",
|
|
237: "Greece",
|
|
238: "Croatia",
|
|
239: "Greece",
|
|
240: "Greece",
|
|
241: "Greece",
|
|
242: "Morocco",
|
|
243: "Hungary",
|
|
244: "Netherlands",
|
|
245: "Netherlands",
|
|
246: "Netherlands",
|
|
247: "Italy",
|
|
248: "Malta",
|
|
249: "Malta",
|
|
250: "Ireland",
|
|
251: "Iceland",
|
|
252: "Liechtenstein",
|
|
253: "Luxembourg",
|
|
254: "Monaco",
|
|
255: "Portugal",
|
|
256: "Malta",
|
|
257: "Norway",
|
|
258: "Norway",
|
|
259: "Norway",
|
|
261: "Poland",
|
|
263: "Portugal",
|
|
264: "Romania",
|
|
265: "Sweden",
|
|
266: "Sweden",
|
|
267: "Slovakia",
|
|
268: "San Marino",
|
|
269: "Switzerland",
|
|
270: "Czech Republic",
|
|
271: "Turkey",
|
|
272: "Ukraine",
|
|
273: "Russia",
|
|
274: "North Macedonia",
|
|
275: "Latvia",
|
|
276: "Estonia",
|
|
277: "Lithuania",
|
|
278: "Slovenia",
|
|
301: "Anguilla",
|
|
303: "Alaska",
|
|
304: "Antigua",
|
|
305: "Antigua",
|
|
306: "Netherlands Antilles",
|
|
307: "Aruba",
|
|
308: "Bahamas",
|
|
309: "Bahamas",
|
|
310: "Bermuda",
|
|
311: "Bahamas",
|
|
312: "Belize",
|
|
314: "Barbados",
|
|
316: "Canada",
|
|
319: "Cayman Islands",
|
|
321: "Costa Rica",
|
|
323: "Cuba",
|
|
325: "Dominica",
|
|
327: "Dominican Republic",
|
|
329: "Guadeloupe",
|
|
330: "Grenada",
|
|
331: "Greenland",
|
|
332: "Guatemala",
|
|
334: "Honduras",
|
|
336: "Haiti",
|
|
338: "United States",
|
|
339: "Jamaica",
|
|
341: "Saint Kitts",
|
|
343: "Saint Lucia",
|
|
345: "Mexico",
|
|
347: "Martinique",
|
|
348: "Montserrat",
|
|
350: "Nicaragua",
|
|
351: "Panama",
|
|
352: "Panama",
|
|
353: "Panama",
|
|
354: "Panama",
|
|
355: "Panama",
|
|
356: "Panama",
|
|
357: "Panama",
|
|
358: "Puerto Rico",
|
|
359: "El Salvador",
|
|
361: "Saint Pierre",
|
|
362: "Trinidad",
|
|
364: "Turks and Caicos",
|
|
366: "United States",
|
|
367: "United States",
|
|
368: "United States",
|
|
369: "United States",
|
|
370: "Panama",
|
|
371: "Panama",
|
|
372: "Panama",
|
|
373: "Panama",
|
|
374: "Panama",
|
|
375: "Saint Vincent",
|
|
376: "Saint Vincent",
|
|
377: "Saint Vincent",
|
|
378: "British Virgin Islands",
|
|
379: "US Virgin Islands",
|
|
401: "Afghanistan",
|
|
403: "Saudi Arabia",
|
|
405: "Bangladesh",
|
|
408: "Bahrain",
|
|
410: "Bhutan",
|
|
412: "China",
|
|
413: "China",
|
|
414: "China",
|
|
416: "Taiwan",
|
|
417: "Sri Lanka",
|
|
419: "India",
|
|
422: "Iran",
|
|
423: "Azerbaijan",
|
|
425: "Iraq",
|
|
428: "Israel",
|
|
431: "Japan",
|
|
432: "Japan",
|
|
434: "Turkmenistan",
|
|
436: "Kazakhstan",
|
|
437: "Uzbekistan",
|
|
438: "Jordan",
|
|
440: "South Korea",
|
|
441: "South Korea",
|
|
443: "Palestine",
|
|
445: "North Korea",
|
|
447: "Kuwait",
|
|
450: "Lebanon",
|
|
451: "Kyrgyzstan",
|
|
453: "Macao",
|
|
455: "Maldives",
|
|
457: "Mongolia",
|
|
459: "Nepal",
|
|
461: "Oman",
|
|
463: "Pakistan",
|
|
466: "Qatar",
|
|
468: "Syria",
|
|
470: "UAE",
|
|
472: "Tajikistan",
|
|
473: "Yemen",
|
|
475: "Tonga",
|
|
477: "Hong Kong",
|
|
478: "Bosnia",
|
|
501: "Antarctica",
|
|
503: "Australia",
|
|
506: "Myanmar",
|
|
508: "Brunei",
|
|
510: "Micronesia",
|
|
511: "Palau",
|
|
512: "New Zealand",
|
|
514: "Cambodia",
|
|
515: "Cambodia",
|
|
516: "Christmas Island",
|
|
518: "Cook Islands",
|
|
520: "Fiji",
|
|
523: "Cocos Islands",
|
|
525: "Indonesia",
|
|
529: "Kiribati",
|
|
531: "Laos",
|
|
533: "Malaysia",
|
|
536: "Northern Mariana Islands",
|
|
538: "Marshall Islands",
|
|
540: "New Caledonia",
|
|
542: "Niue",
|
|
544: "Nauru",
|
|
546: "French Polynesia",
|
|
548: "Philippines",
|
|
553: "Papua New Guinea",
|
|
555: "Pitcairn",
|
|
557: "Solomon Islands",
|
|
559: "American Samoa",
|
|
561: "Samoa",
|
|
563: "Singapore",
|
|
564: "Singapore",
|
|
565: "Singapore",
|
|
566: "Singapore",
|
|
567: "Thailand",
|
|
570: "Tonga",
|
|
572: "Tuvalu",
|
|
574: "Vietnam",
|
|
576: "Vanuatu",
|
|
577: "Vanuatu",
|
|
578: "Wallis and Futuna",
|
|
601: "South Africa",
|
|
603: "Angola",
|
|
605: "Algeria",
|
|
607: "Benin",
|
|
609: "Botswana",
|
|
610: "Burundi",
|
|
611: "Cameroon",
|
|
612: "Cape Verde",
|
|
613: "Central African Republic",
|
|
615: "Congo",
|
|
616: "Comoros",
|
|
617: "DR Congo",
|
|
618: "Ivory Coast",
|
|
619: "Djibouti",
|
|
620: "Egypt",
|
|
621: "Equatorial Guinea",
|
|
622: "Ethiopia",
|
|
624: "Eritrea",
|
|
625: "Gabon",
|
|
626: "Gambia",
|
|
627: "Ghana",
|
|
629: "Guinea",
|
|
630: "Guinea-Bissau",
|
|
631: "Kenya",
|
|
632: "Lesotho",
|
|
633: "Liberia",
|
|
634: "Liberia",
|
|
635: "Liberia",
|
|
636: "Liberia",
|
|
637: "Libya",
|
|
642: "Madagascar",
|
|
644: "Malawi",
|
|
645: "Mali",
|
|
647: "Mauritania",
|
|
649: "Mauritius",
|
|
650: "Mozambique",
|
|
654: "Namibia",
|
|
655: "Niger",
|
|
656: "Nigeria",
|
|
657: "Guinea",
|
|
659: "Rwanda",
|
|
660: "Senegal",
|
|
661: "Sierra Leone",
|
|
662: "Somalia",
|
|
663: "South Africa",
|
|
664: "Sudan",
|
|
667: "Tanzania",
|
|
668: "Togo",
|
|
669: "Tunisia",
|
|
670: "Uganda",
|
|
671: "Egypt",
|
|
672: "Tanzania",
|
|
674: "Zambia",
|
|
675: "Zimbabwe",
|
|
676: "Comoros",
|
|
677: "Tanzania",
|
|
}
|
|
|
|
|
|
def get_country_from_mmsi(mmsi: int) -> str:
|
|
"""Look up flag state from MMSI Maritime Identification Digit."""
|
|
mmsi_str = str(mmsi)
|
|
if len(mmsi_str) == 9:
|
|
mid = int(mmsi_str[:3])
|
|
return MID_COUNTRY.get(mid, "UNKNOWN")
|
|
return "UNKNOWN"
|
|
|
|
|
|
# Global vessel store: MMSI → vessel dict
|
|
_vessels: dict[int, dict] = {}
|
|
_vessels_lock = threading.Lock()
|
|
_ws_thread: threading.Thread | None = None
|
|
_ws_running = False
|
|
_proxy_process = None
|
|
|
|
import os
|
|
|
|
CACHE_FILE = os.path.join(os.path.dirname(__file__), "ais_cache.json")
|
|
|
|
|
|
def _save_cache():
|
|
"""Save vessel data to disk for persistence across restarts."""
|
|
try:
|
|
with _vessels_lock:
|
|
# Convert int keys to strings for JSON
|
|
data = {str(k): v for k, v in _vessels.items()}
|
|
with open(CACHE_FILE, "w") as f:
|
|
json.dump(data, f)
|
|
logger.info(f"AIS cache saved: {len(data)} vessels")
|
|
except (IOError, OSError) as e:
|
|
logger.error(f"Failed to save AIS cache: {e}")
|
|
|
|
|
|
def _load_cache():
|
|
"""Load vessel data from disk on startup."""
|
|
global _vessels
|
|
if not os.path.exists(CACHE_FILE):
|
|
return
|
|
try:
|
|
with open(CACHE_FILE, "r") as f:
|
|
data = json.load(f)
|
|
now = time.time()
|
|
stale_cutoff = now - 3600 # Accept vessels up to 1 hour old on restart
|
|
loaded = 0
|
|
with _vessels_lock:
|
|
for k, v in data.items():
|
|
if v.get("_updated", 0) > stale_cutoff:
|
|
_vessels[int(k)] = v
|
|
loaded += 1
|
|
logger.info(f"AIS cache loaded: {loaded} vessels from disk")
|
|
except (IOError, OSError, json.JSONDecodeError, ValueError) as e:
|
|
logger.error(f"Failed to load AIS cache: {e}")
|
|
|
|
|
|
def prune_stale_vessels():
|
|
"""Remove vessels not updated in the last 15 minutes. Safe to call from a scheduler."""
|
|
now = time.time()
|
|
stale_cutoff = now - 900
|
|
with _vessels_lock:
|
|
stale_keys = [k for k, v in _vessels.items() if v.get("_updated", 0) < stale_cutoff]
|
|
for k in stale_keys:
|
|
del _vessels[k]
|
|
if stale_keys:
|
|
logger.info(f"AIS pruned {len(stale_keys)} stale vessels")
|
|
|
|
|
|
def get_ais_vessels() -> list[dict]:
|
|
"""Return a snapshot of tracked AIS vessels, pruning stale."""
|
|
prune_stale_vessels()
|
|
|
|
with _vessels_lock:
|
|
result = []
|
|
for mmsi, v in _vessels.items():
|
|
v_type = v.get("type", "unknown")
|
|
# Skip vessels without valid position
|
|
if not v.get("lat") or not v.get("lng"):
|
|
continue
|
|
|
|
# Sanitize speed: AIS 102.3 kn = "speed not available"
|
|
sog = v.get("sog", 0)
|
|
if sog >= 102.2:
|
|
sog = 0
|
|
|
|
result.append(
|
|
{
|
|
"mmsi": mmsi,
|
|
"name": v.get("name", "UNKNOWN"),
|
|
"type": v_type,
|
|
"lat": round(v.get("lat", 0), 5),
|
|
"lng": round(v.get("lng", 0), 5),
|
|
"heading": v.get("heading", 0),
|
|
"sog": round(sog, 1),
|
|
"cog": round(v.get("cog", 0), 1),
|
|
"callsign": v.get("callsign", ""),
|
|
"destination": v.get("destination", "") or "UNKNOWN",
|
|
"imo": v.get("imo", 0),
|
|
"country": get_country_from_mmsi(mmsi),
|
|
}
|
|
)
|
|
return result
|
|
|
|
|
|
def ingest_ais_catcher(msgs: list[dict]) -> int:
|
|
"""Ingest decoded AIS messages from AIS-catcher HTTP feed.
|
|
Returns number of vessels updated."""
|
|
count = 0
|
|
now = time.time()
|
|
with _vessels_lock:
|
|
for msg in msgs:
|
|
mmsi = msg.get("mmsi")
|
|
if not mmsi or not isinstance(mmsi, int):
|
|
continue
|
|
|
|
vessel = _vessels.setdefault(mmsi, {"mmsi": mmsi})
|
|
msg_type = msg.get("type", 0)
|
|
|
|
# Position reports (types 1, 2, 3 = Class A; 18, 19 = Class B)
|
|
if msg_type in (1, 2, 3, 18, 19):
|
|
lat = msg.get("lat")
|
|
lon = msg.get("lon")
|
|
if lat is not None and lon is not None and lat != 91.0 and lon != 181.0:
|
|
vessel["lat"] = lat
|
|
vessel["lng"] = lon
|
|
# AIS raw value 1023 (102.3 kn) = "speed not available"
|
|
raw_speed = msg.get("speed", 0)
|
|
vessel["sog"] = 0 if raw_speed >= 102.2 else raw_speed
|
|
vessel["cog"] = msg.get("course", 0)
|
|
heading = msg.get("heading", 511)
|
|
vessel["heading"] = heading if heading != 511 else vessel.get("cog", 0)
|
|
vessel["_updated"] = now
|
|
if msg.get("shipname"):
|
|
vessel["name"] = msg["shipname"].strip()
|
|
count += 1
|
|
|
|
# Static data (type 5 = Class A static; 24 = Class B static)
|
|
elif msg_type in (5, 24):
|
|
if msg.get("shipname"):
|
|
vessel["name"] = msg["shipname"].strip()
|
|
if msg.get("callsign"):
|
|
vessel["callsign"] = msg["callsign"].strip()
|
|
if msg.get("imo"):
|
|
vessel["imo"] = msg["imo"]
|
|
if msg.get("destination"):
|
|
vessel["destination"] = msg["destination"].strip().replace("@", "")
|
|
ship_type = msg.get("shiptype", 0)
|
|
if ship_type:
|
|
vessel["ais_type_code"] = ship_type
|
|
vessel["type"] = classify_vessel(ship_type, mmsi)
|
|
vessel["_updated"] = now
|
|
|
|
# Ensure country is set from MMSI MID
|
|
if "country" not in vessel:
|
|
vessel["country"] = get_country_from_mmsi(mmsi)
|
|
|
|
# Ensure name exists
|
|
if "name" not in vessel:
|
|
vessel["name"] = msg.get("shipname", "UNKNOWN") or "UNKNOWN"
|
|
|
|
return count
|
|
|
|
|
|
def _ais_stream_loop():
|
|
"""Main loop: spawn node proxy and process messages from stdout."""
|
|
global _proxy_process
|
|
import subprocess
|
|
import os
|
|
|
|
proxy_script = os.path.join(os.path.dirname(os.path.dirname(__file__)), "ais_proxy.js")
|
|
backoff = 1 # Exponential backoff starting at 1 second
|
|
|
|
if not API_KEY:
|
|
logger.info("AIS_API_KEY not set — ship tracking disabled. Set AIS_API_KEY to enable.")
|
|
return
|
|
|
|
while _ws_running:
|
|
try:
|
|
logger.info("Starting Node.js AIS Stream Proxy...")
|
|
proxy_env = os.environ.copy()
|
|
proxy_env["AIS_API_KEY"] = API_KEY
|
|
process = subprocess.Popen(
|
|
["node", proxy_script],
|
|
stdin=subprocess.PIPE,
|
|
stdout=subprocess.PIPE,
|
|
stderr=subprocess.PIPE,
|
|
text=True,
|
|
bufsize=1,
|
|
env=proxy_env,
|
|
)
|
|
with _vessels_lock:
|
|
_proxy_process = process
|
|
|
|
# Drain stderr in a background thread to prevent deadlock
|
|
import threading
|
|
|
|
def _drain_stderr():
|
|
for errline in iter(process.stderr.readline, ""):
|
|
errline = errline.strip()
|
|
if errline:
|
|
logger.warning(f"AIS proxy stderr: {errline}")
|
|
|
|
threading.Thread(target=_drain_stderr, daemon=True).start()
|
|
|
|
logger.info("AIS Stream proxy started — receiving vessel data")
|
|
|
|
msg_count = 0
|
|
ok_streak = 0 # Track consecutive successful messages for backoff reset
|
|
last_log_time = time.time()
|
|
for raw_msg in iter(process.stdout.readline, ""):
|
|
if not _ws_running:
|
|
process.terminate()
|
|
break
|
|
|
|
raw_msg = raw_msg.strip()
|
|
if not raw_msg:
|
|
continue
|
|
|
|
try:
|
|
data = json.loads(raw_msg)
|
|
except json.JSONDecodeError:
|
|
continue
|
|
|
|
if "error" in data:
|
|
logger.error(f"AIS Stream error: {data['error']}")
|
|
continue
|
|
|
|
msg_type = data.get("MessageType", "")
|
|
metadata = data.get("MetaData", {})
|
|
message = data.get("Message", {})
|
|
|
|
mmsi = metadata.get("MMSI", 0)
|
|
if not mmsi:
|
|
continue
|
|
|
|
with _vessels_lock:
|
|
if mmsi not in _vessels:
|
|
_vessels[mmsi] = {"_updated": time.time()}
|
|
vessel = _vessels[mmsi]
|
|
|
|
# Update position from PositionReport or StandardClassBPositionReport
|
|
if msg_type in ("PositionReport", "StandardClassBPositionReport"):
|
|
report = message.get(msg_type, {})
|
|
lat = report.get("Latitude", metadata.get("latitude", 0))
|
|
lng = report.get("Longitude", metadata.get("longitude", 0))
|
|
|
|
# Skip invalid positions
|
|
if lat == 0 and lng == 0:
|
|
continue
|
|
if abs(lat) > 90 or abs(lng) > 180:
|
|
continue
|
|
|
|
with _vessels_lock:
|
|
vessel["lat"] = lat
|
|
vessel["lng"] = lng
|
|
# AIS raw value 1023 (102.3 kn) = "speed not available"
|
|
raw_sog = report.get("Sog", 0)
|
|
vessel["sog"] = 0 if raw_sog >= 102.2 else raw_sog
|
|
vessel["cog"] = report.get("Cog", 0)
|
|
heading = report.get("TrueHeading", 511)
|
|
vessel["heading"] = heading if heading != 511 else report.get("Cog", 0)
|
|
vessel["_updated"] = time.time()
|
|
# Use metadata name if we don't have one yet
|
|
if not vessel.get("name") or vessel["name"] == "UNKNOWN":
|
|
vessel["name"] = (
|
|
metadata.get("ShipName", "UNKNOWN").strip() or "UNKNOWN"
|
|
)
|
|
|
|
# Update static data from ShipStaticData
|
|
elif msg_type == "ShipStaticData":
|
|
static = message.get("ShipStaticData", {})
|
|
ais_type = static.get("Type", 0)
|
|
|
|
with _vessels_lock:
|
|
vessel["name"] = (
|
|
static.get("Name", "") or metadata.get("ShipName", "UNKNOWN")
|
|
).strip() or "UNKNOWN"
|
|
vessel["callsign"] = (static.get("CallSign", "") or "").strip()
|
|
vessel["imo"] = static.get("ImoNumber", 0)
|
|
vessel["destination"] = (
|
|
(static.get("Destination", "") or "").strip().replace("@", "")
|
|
)
|
|
vessel["ais_type_code"] = ais_type
|
|
vessel["type"] = classify_vessel(ais_type, mmsi)
|
|
vessel["_updated"] = time.time()
|
|
|
|
msg_count += 1
|
|
ok_streak += 1
|
|
|
|
# Reset backoff after 200 consecutive successful messages
|
|
if ok_streak >= 200 and backoff > 1:
|
|
backoff = 1
|
|
ok_streak = 0
|
|
|
|
# Periodic logging + cache save (time-based instead of count-based to avoid lock in hot loop)
|
|
now = time.time()
|
|
if now - last_log_time >= 60:
|
|
with _vessels_lock:
|
|
count = len(_vessels)
|
|
logger.info(
|
|
f"AIS Stream: processed {msg_count} messages, tracking {count} vessels"
|
|
)
|
|
_save_cache()
|
|
last_log_time = now
|
|
|
|
except (ConnectionError, TimeoutError, OSError, ValueError, KeyError) as e:
|
|
logger.error(f"AIS proxy connection error: {e}")
|
|
if _ws_running:
|
|
logger.info(f"Restarting AIS proxy in {backoff}s (exponential backoff)...")
|
|
time.sleep(backoff)
|
|
backoff = min(backoff * 2, 60) # Double up to 60s max
|
|
continue
|
|
|
|
|
|
def _run_ais_loop():
|
|
"""Thread target: run the AIS loop."""
|
|
global _ws_running, _ws_thread, _proxy_process
|
|
try:
|
|
_ais_stream_loop()
|
|
except Exception as e:
|
|
logger.error(f"AIS Stream thread crashed: {e}")
|
|
finally:
|
|
with _vessels_lock:
|
|
_ws_running = False
|
|
_ws_thread = None
|
|
_proxy_process = None
|
|
|
|
|
|
def start_ais_stream():
|
|
"""Start the AIS WebSocket stream in a background thread."""
|
|
global _ws_thread, _ws_running
|
|
with _vessels_lock:
|
|
if _ws_running:
|
|
logger.info("AIS Stream already running")
|
|
return
|
|
_ws_running = True
|
|
existing_thread = _ws_thread
|
|
if existing_thread and existing_thread.is_alive():
|
|
logger.info("AIS Stream already running")
|
|
return
|
|
|
|
# Load cached vessel data from disk
|
|
_load_cache()
|
|
|
|
_ws_thread = threading.Thread(target=_run_ais_loop, daemon=True, name="ais-stream")
|
|
_ws_thread.start()
|
|
logger.info("AIS Stream background thread started")
|
|
|
|
|
|
def stop_ais_stream():
|
|
"""Stop the AIS WebSocket stream and save cache."""
|
|
global _ws_running, _ws_thread, _proxy_process
|
|
with _vessels_lock:
|
|
_ws_running = False
|
|
_ws_thread = None
|
|
proc = _proxy_process
|
|
_proxy_process = None
|
|
|
|
if proc and proc.stdin:
|
|
try:
|
|
proc.stdin.close()
|
|
except Exception:
|
|
pass
|
|
|
|
_save_cache() # Save on shutdown
|
|
logger.info("AIS Stream stopping...")
|
|
|
|
|
|
def update_ais_bbox(south: float, west: float, north: float, east: float):
|
|
"""Dynamically update the AIS stream bounding box via proxy stdin."""
|
|
with _vessels_lock:
|
|
proc = _proxy_process
|
|
if not proc or not proc.stdin:
|
|
return
|
|
|
|
try:
|
|
cmd = json.dumps({"type": "update_bbox", "bboxes": [[[south, west], [north, east]]]})
|
|
proc.stdin.write(cmd + "\n")
|
|
proc.stdin.flush()
|
|
logger.info(
|
|
f"Updated AIS bounding box to: S:{south:.2f} W:{west:.2f} N:{north:.2f} E:{east:.2f}"
|
|
)
|
|
except Exception as e:
|
|
logger.error(f"Failed to update AIS bbox: {e}")
|