diff --git a/backend/.env.example b/backend/.env.example index 545c4b0..1e6a24b 100644 --- a/backend/.env.example +++ b/backend/.env.example @@ -11,6 +11,13 @@ AIS_API_KEY= # https://aisstream.io/ — free tier WebSocket key # ── Optional ─────────────────────────────────────────────────── +# AISHub REST fallback. Used when stream.aisstream.io is unreachable +# (e.g. their cert expires or server goes offline). Free tier requires +# registration at https://www.aishub.net/api. Poll cadence defaults to +# 20 min to stay courteous; tunable via AISHUB_POLL_INTERVAL_MINUTES. +# AISHUB_USERNAME= +# AISHUB_POLL_INTERVAL_MINUTES=20 + # Override allowed CORS origins (comma-separated). Defaults to localhost + LAN auto-detect. # CORS_ORIGINS=http://192.168.1.50:3000,https://my-domain.com diff --git a/backend/services/data_fetcher.py b/backend/services/data_fetcher.py index bdbefb0..65bb8f7 100644 --- a/backend/services/data_fetcher.py +++ b/backend/services/data_fetcher.py @@ -777,6 +777,26 @@ def start_scheduler(): misfire_grace_time=60, ) + # AISHub REST fallback — slow polling when the AISStream WebSocket + # primary is offline. Configurable interval via + # AISHUB_POLL_INTERVAL_MINUTES env (default 20 min). Operator must + # set AISHUB_USERNAME to opt in. The fetcher is gated internally on + # the primary being disconnected, so this job is cheap when the + # WebSocket is healthy (early-returns after a status check). + from services.fetchers.aishub_fallback import ( + aishub_poll_interval_minutes, + fetch_aishub_vessels, + ) + _aishub_interval = aishub_poll_interval_minutes() + _scheduler.add_job( + lambda: _run_task_with_health(fetch_aishub_vessels, "fetch_aishub_vessels"), + "interval", + minutes=_aishub_interval, + id="aishub_fallback", + max_instances=1, + misfire_grace_time=120, + ) + # Route database — bulk refresh from vrs-standing-data.adsb.lol every 5 # days. Replaces the legacy /api/0/routeset POST (blocked under our UA, # and broken upstream). Airline schedules change on a quarterly cycle, diff --git a/backend/services/fetchers/aishub_fallback.py b/backend/services/fetchers/aishub_fallback.py new file mode 100644 index 0000000..b8d51b0 --- /dev/null +++ b/backend/services/fetchers/aishub_fallback.py @@ -0,0 +1,290 @@ +"""AISHub REST fallback for ship tracking when AISStream is unreachable. + +Background +---------- +On 2026-05-23 ``stream.aisstream.io`` (the primary live AIS WebSocket feed) +went fully offline. Backend's only ship signal vanished. This module polls +``data.aishub.net``'s free REST API on a slow cadence (default 20 min) when +the WebSocket primary is disconnected, so the ships layer doesn't go fully +dark during upstream outages. + +Why 20 minutes +-------------- +AISHub's free tier is rate-limited and explicitly asks consumers to be +courteous. 20 minutes is well inside their limits, gives ships time to +move enough to look "alive" on the map, and won't drain their service. +Configurable via the ``AISHUB_POLL_INTERVAL_MINUTES`` env var (clamped to +[1, 360]). + +Why slow vs primary +------------------- +This is degraded mode, not a replacement. A ship at 20 knots moves about +6 nautical miles in 20 minutes — visible on the map but coarser than the +real-time WebSocket signal. When AISStream comes back online, the +WebSocket data will overwrite these records via the same ``_vessels`` +dict and ``source`` will flip from ``"aishub"`` back to upstream-live. + +Opt-in +------ +Operator must set ``AISHUB_USERNAME`` (free registration at +https://www.aishub.net/api). If unset, this fetcher is a no-op. +""" + +from __future__ import annotations + +import json +import logging +import os +import time +from typing import Any + +from services.network_utils import fetch_with_curl + +logger = logging.getLogger(__name__) + + +AISHUB_URL = "https://data.aishub.net/ws.php" + + +def aishub_username() -> str: + return str(os.environ.get("AISHUB_USERNAME", "")).strip() + + +def aishub_fallback_enabled() -> bool: + """Returns True only when the operator has registered with AISHub and + set ``AISHUB_USERNAME``. The presence of the username is the opt-in.""" + return bool(aishub_username()) + + +def aishub_poll_interval_minutes() -> int: + """Default 20 minutes. Clamped to [1, 360] so a hostile or + misconfigured env var can't either hammer the upstream or silence the + fallback for a day.""" + raw = os.environ.get("AISHUB_POLL_INTERVAL_MINUTES", "20") + try: + value = int(str(raw).strip()) + except (TypeError, ValueError): + value = 20 + return max(1, min(360, value)) + + +def _should_run_fallback() -> bool: + """Only run when the primary WebSocket is disconnected. Avoids stomping + over fresher live data when AISStream is healthy. + + Returns False if: + * AISHub isn't configured (no username) + * AISStream primary is currently connected (recent vessel messages) + + Returns True only when AIS is configured-but-down. The + ``proxy_spawn_count > 0`` guard means "the primary has at least tried + to run" — if the user set AISHUB_USERNAME but not AIS_API_KEY at all, + AISHub will still serve as a primary on its own slow cadence. + """ + if not aishub_fallback_enabled(): + return False + try: + from services.ais_stream import ais_proxy_status + status = ais_proxy_status() or {} + except Exception: + return True # ais_stream not importable? still try AISHub. + # If the WebSocket primary is connected, skip the fallback — fresher + # data is already flowing. + if status.get("connected") is True: + return False + return True + + +def _parse_aishub_response(payload: str) -> list[dict]: + """Parse the AISHub JSON response into a list of vessel records. + + Successful response shape:: + + [ + {"ERROR": false, "USERNAME": "...", "FORMAT": "1", "RECORDS": N}, + [{"MMSI": ..., "LATITUDE": ..., "LONGITUDE": ..., ...}, ...] + ] + + Error response shape:: + + [{"ERROR": true, "ERROR_MESSAGE": "..."}] + + Empty payload (e.g. silent rate-limit drop) returns ``[]``. + """ + if not payload or not payload.strip(): + return [] + try: + data = json.loads(payload) + except json.JSONDecodeError as e: + logger.warning("AISHub: response is not JSON: %s", e) + return [] + if not isinstance(data, list) or not data: + return [] + header = data[0] if isinstance(data[0], dict) else {} + if header.get("ERROR") is True: + logger.warning( + "AISHub: upstream error: %s", + header.get("ERROR_MESSAGE", ""), + ) + return [] + if len(data) < 2 or not isinstance(data[1], list): + return [] + return [row for row in data[1] if isinstance(row, dict)] + + +def _normalize_record(row: dict) -> dict | None: + """Map an AISHub vessel record to our internal vessel schema. + + Returns None when the record can't be used (no MMSI, bad position, + sentinel "not available" lat/lng). + """ + try: + mmsi = int(row.get("MMSI") or 0) + except (TypeError, ValueError): + return None + if not mmsi: + return None + try: + lat = float(row.get("LATITUDE")) + lng = float(row.get("LONGITUDE")) + except (TypeError, ValueError): + return None + # AIS uses 91/181 as "no position available" sentinels. + if abs(lat) > 90 or abs(lng) > 180: + return None + if lat == 91.0 or lng == 181.0: + return None + # SOG raw 102.3 is "speed not available"; sanitize to 0. + try: + sog_raw = float(row.get("SOG") or 0) + except (TypeError, ValueError): + sog_raw = 0.0 + sog = 0.0 if sog_raw >= 102.2 else sog_raw + try: + cog = float(row.get("COG") or 0) + except (TypeError, ValueError): + cog = 0.0 + try: + heading_raw = int(row.get("HEADING") or 511) + except (TypeError, ValueError): + heading_raw = 511 + # AIS heading sentinel 511 = "not available" — fall back to COG. + heading = heading_raw if heading_raw != 511 else cog + try: + ais_type = int(row.get("TYPE") or 0) + except (TypeError, ValueError): + ais_type = 0 + return { + "mmsi": mmsi, + "lat": lat, + "lng": lng, + "sog": sog, + "cog": cog, + "heading": heading, + "name": str(row.get("NAME") or "").strip() or "UNKNOWN", + "callsign": str(row.get("CALLSIGN") or "").strip(), + "destination": str(row.get("DEST") or "").strip().replace("@", "") or "", + "imo": int(row.get("IMO") or 0), + "ais_type_code": ais_type, + } + + +def fetch_aishub_vessels() -> int: + """Poll AISHub and merge vessels into the shared ``_vessels`` store. + + Returns the number of vessels updated (0 on skip, error, or no data). + Designed to be called by the APScheduler tier — see + ``data_fetcher.py`` for the 20-minute interval job that wraps this. + """ + if not _should_run_fallback(): + logger.debug("AISHub fallback skipped: primary connected or not configured") + return 0 + + username = aishub_username() + url = ( + f"{AISHUB_URL}?username={username}&format=1&output=json" + f"&compress=0" + ) + + try: + response = fetch_with_curl(url, timeout=30) + except Exception as e: + logger.warning("AISHub fetch failed: %s", e) + return 0 + + if not response or response.status_code != 200: + logger.warning( + "AISHub HTTP %s", + getattr(response, "status_code", "None"), + ) + return 0 + + rows = _parse_aishub_response(getattr(response, "text", "") or "") + if not rows: + return 0 + + # Inline imports to avoid a circular dependency at module load time + # (ais_stream imports lots of things and is loaded by main.py). + from services.ais_stream import ( + _vessels, + _vessels_lock, + _record_vessel_trail_locked, + classify_vessel, + get_country_from_mmsi, + ) + + now = time.time() + count = 0 + with _vessels_lock: + for row in rows: + normalized = _normalize_record(row) + if normalized is None: + continue + mmsi = normalized["mmsi"] + vessel = _vessels.setdefault(mmsi, {"mmsi": mmsi}) + # Don't overwrite fresher live data: if the WebSocket pushed an + # update for this MMSI more recently than now-1s (race during + # the brief reconnection window) keep the live one. + last = float(vessel.get("_updated") or 0) + if last > now - 1: + continue + vessel.update( + { + "lat": normalized["lat"], + "lng": normalized["lng"], + "sog": normalized["sog"], + "cog": normalized["cog"], + "heading": normalized["heading"], + "_updated": now, + "source": "aishub", + } + ) + if normalized["name"] and normalized["name"] != "UNKNOWN": + vessel["name"] = normalized["name"] + if normalized["callsign"]: + vessel["callsign"] = normalized["callsign"] + if normalized["destination"]: + vessel["destination"] = normalized["destination"] + if normalized["imo"]: + vessel["imo"] = normalized["imo"] + if normalized["ais_type_code"]: + vessel["ais_type_code"] = normalized["ais_type_code"] + vessel["type"] = classify_vessel(normalized["ais_type_code"], mmsi) + if not vessel.get("country"): + vessel["country"] = get_country_from_mmsi(mmsi) + _record_vessel_trail_locked( + mmsi, + normalized["lat"], + normalized["lng"], + normalized["sog"], + now, + ) + count += 1 + + if count: + logger.info( + "AISHub fallback: merged %d vessels (poll interval %d min)", + count, + aishub_poll_interval_minutes(), + ) + return count diff --git a/backend/tests/test_aishub_fallback.py b/backend/tests/test_aishub_fallback.py new file mode 100644 index 0000000..4cf1083 --- /dev/null +++ b/backend/tests/test_aishub_fallback.py @@ -0,0 +1,432 @@ +"""AISHub REST fallback for ship tracking. + +Background +---------- +When ``stream.aisstream.io`` (the WebSocket primary) is unreachable, the +ships layer goes empty. ``aishub_fallback.py`` polls ``data.aishub.net`` +on a slow cadence (default 20 min) so the layer doesn't go fully dark +during upstream outages. + +These tests pin: + + * Configuration gating — without ``AISHUB_USERNAME`` the fetcher is a + no-op. The username's presence is the opt-in. + * Connectivity gating — when the WebSocket primary is connected, the + fallback skips so it doesn't stomp fresher live data. + * Response parsing — successful, error, and empty AISHub payloads. + * Record normalization — bad records (no MMSI, sentinel positions) are + dropped without crashing. + * Merge behavior — records land in the shared ``_vessels`` dict with + ``source: "aishub"`` and don't overwrite very-recent live updates. + * Poll interval clamping — env var overrides honored within [1, 360]. +""" + +from __future__ import annotations + +import json +import os +import time + +import pytest + + +# --------------------------------------------------------------------------- +# Configuration / gating +# --------------------------------------------------------------------------- + + +class TestGating: + def test_no_username_means_disabled(self, monkeypatch): + from services.fetchers.aishub_fallback import ( + aishub_fallback_enabled, + fetch_aishub_vessels, + ) + monkeypatch.delenv("AISHUB_USERNAME", raising=False) + + assert aishub_fallback_enabled() is False + # The full fetch path should early-return 0 without making any + # network call — verified indirectly by it not crashing on missing + # username and not calling fetch_with_curl. + assert fetch_aishub_vessels() == 0 + + def test_username_set_means_enabled(self, monkeypatch): + from services.fetchers.aishub_fallback import aishub_fallback_enabled + monkeypatch.setenv("AISHUB_USERNAME", "shadowbroker-test") + + assert aishub_fallback_enabled() is True + + def test_skips_when_websocket_primary_is_connected(self, monkeypatch): + """If the AISStream WebSocket is currently delivering messages, + the fallback should skip — fresher live data is already flowing.""" + from services.fetchers import aishub_fallback + from services import ais_stream as ais + + monkeypatch.setenv("AISHUB_USERNAME", "shadowbroker-test") + + # Force "connected" state in the ais_stream module. + with ais._vessels_lock: + ais._last_msg_at = time.time() - 5 # 5s ago — well inside 60s + ais._proxy_spawn_count = 1 + # Sanity check the gate: + assert ais.ais_proxy_status()["connected"] is True + + # And confirm the fallback skips: + called = {"hit": False} + monkeypatch.setattr( + aishub_fallback, + "fetch_with_curl", + lambda *a, **kw: (_ for _ in ()).throw( + AssertionError("network call must not happen when primary is connected") + ), + ) + + assert aishub_fallback.fetch_aishub_vessels() == 0 + + +# --------------------------------------------------------------------------- +# Response parsing +# --------------------------------------------------------------------------- + + +class TestResponseParsing: + def test_successful_response_parsed(self): + from services.fetchers.aishub_fallback import _parse_aishub_response + + payload = json.dumps([ + {"ERROR": False, "USERNAME": "test", "FORMAT": "1", "RECORDS": 2}, + [ + {"MMSI": 123, "LATITUDE": 40.0, "LONGITUDE": -73.0}, + {"MMSI": 456, "LATITUDE": 51.5, "LONGITUDE": -0.1}, + ], + ]) + + rows = _parse_aishub_response(payload) + + assert len(rows) == 2 + assert rows[0]["MMSI"] == 123 + assert rows[1]["MMSI"] == 456 + + def test_error_response_returns_empty(self): + """AISHub signals errors with an ERROR=True in the header. We log + and treat as no data.""" + from services.fetchers.aishub_fallback import _parse_aishub_response + + payload = json.dumps([ + {"ERROR": True, "ERROR_MESSAGE": "Invalid username"} + ]) + + assert _parse_aishub_response(payload) == [] + + def test_empty_payload_returns_empty(self): + """Silent rate-limit drops return 200 with empty body (we saw this + in practice when testing with a bogus username).""" + from services.fetchers.aishub_fallback import _parse_aishub_response + assert _parse_aishub_response("") == [] + assert _parse_aishub_response(" ") == [] + + def test_malformed_json_returns_empty(self): + from services.fetchers.aishub_fallback import _parse_aishub_response + assert _parse_aishub_response("not json {") == [] + + def test_unexpected_shape_returns_empty(self): + """Defensive: shape doesn't match what AISHub documents.""" + from services.fetchers.aishub_fallback import _parse_aishub_response + assert _parse_aishub_response(json.dumps({"unexpected": "object"})) == [] + assert _parse_aishub_response(json.dumps([])) == [] + # Header-only with no records list: + assert _parse_aishub_response(json.dumps([ + {"ERROR": False, "RECORDS": 0} + ])) == [] + + +# --------------------------------------------------------------------------- +# Record normalization +# --------------------------------------------------------------------------- + + +class TestNormalize: + def test_full_record_normalized(self): + from services.fetchers.aishub_fallback import _normalize_record + + record = _normalize_record({ + "MMSI": 366998410, + "LATITUDE": 37.8, + "LONGITUDE": -122.4, + "COG": 280, + "SOG": 12.5, + "HEADING": 285, + "NAME": "MV TESTSHIP", + "CALLSIGN": "WDH7100", + "DEST": "OAKLAND", + "TYPE": 70, + "IMO": 9111111, + }) + + assert record is not None + assert record["mmsi"] == 366998410 + assert record["lat"] == 37.8 + assert record["lng"] == -122.4 + assert record["sog"] == 12.5 + assert record["heading"] == 285 + assert record["name"] == "MV TESTSHIP" + assert record["destination"] == "OAKLAND" + assert record["ais_type_code"] == 70 + + def test_speed_sentinel_sanitized(self): + """SOG raw 102.3+ kn = "speed not available" in the AIS spec. + Sanitize to 0 so it doesn't look like a 200-knot ship.""" + from services.fetchers.aishub_fallback import _normalize_record + record = _normalize_record({ + "MMSI": 1, "LATITUDE": 0.5, "LONGITUDE": 0.5, + "SOG": 102.3, "COG": 0, + }) + assert record["sog"] == 0.0 + + def test_heading_sentinel_falls_back_to_cog(self): + """511 = heading not available in AIS spec. Use COG instead.""" + from services.fetchers.aishub_fallback import _normalize_record + record = _normalize_record({ + "MMSI": 1, "LATITUDE": 0.5, "LONGITUDE": 0.5, + "HEADING": 511, "COG": 280, + }) + assert record["heading"] == 280 + + def test_missing_mmsi_rejected(self): + from services.fetchers.aishub_fallback import _normalize_record + assert _normalize_record({"LATITUDE": 0.5, "LONGITUDE": 0.5}) is None + assert _normalize_record({"MMSI": 0, "LATITUDE": 0.5, "LONGITUDE": 0.5}) is None + + def test_no_position_rejected(self): + from services.fetchers.aishub_fallback import _normalize_record + assert _normalize_record({"MMSI": 1}) is None + assert _normalize_record({"MMSI": 1, "LATITUDE": 0.5}) is None + assert _normalize_record({"MMSI": 1, "LONGITUDE": 0.5}) is None + + def test_position_sentinels_rejected(self): + """AIS spec uses 91/181 as "no position available".""" + from services.fetchers.aishub_fallback import _normalize_record + assert _normalize_record({ + "MMSI": 1, "LATITUDE": 91.0, "LONGITUDE": 0.0 + }) is None + assert _normalize_record({ + "MMSI": 1, "LATITUDE": 0.0, "LONGITUDE": 181.0 + }) is None + + def test_out_of_range_rejected(self): + from services.fetchers.aishub_fallback import _normalize_record + assert _normalize_record({ + "MMSI": 1, "LATITUDE": 95.0, "LONGITUDE": 0.0 + }) is None + assert _normalize_record({ + "MMSI": 1, "LATITUDE": 0.0, "LONGITUDE": 200.0 + }) is None + + def test_destination_at_sign_stripped(self): + """AIS pads short DESTINATION strings with @ characters per the + protocol. Strip them so the UI doesn't render "OAKLAND@@@@@".""" + from services.fetchers.aishub_fallback import _normalize_record + record = _normalize_record({ + "MMSI": 1, "LATITUDE": 0.5, "LONGITUDE": 0.5, + "DEST": "OAKLAND@@@", + }) + assert record["destination"] == "OAKLAND" + + +# --------------------------------------------------------------------------- +# Poll interval clamping +# --------------------------------------------------------------------------- + + +class TestPollInterval: + def test_default_is_twenty_minutes(self, monkeypatch): + from services.fetchers.aishub_fallback import aishub_poll_interval_minutes + monkeypatch.delenv("AISHUB_POLL_INTERVAL_MINUTES", raising=False) + assert aishub_poll_interval_minutes() == 20 + + def test_env_override_honored(self, monkeypatch): + from services.fetchers.aishub_fallback import aishub_poll_interval_minutes + monkeypatch.setenv("AISHUB_POLL_INTERVAL_MINUTES", "45") + assert aishub_poll_interval_minutes() == 45 + + def test_clamp_lower_bound(self, monkeypatch): + """A 0 or negative env var would hammer the upstream — clamp.""" + from services.fetchers.aishub_fallback import aishub_poll_interval_minutes + monkeypatch.setenv("AISHUB_POLL_INTERVAL_MINUTES", "0") + assert aishub_poll_interval_minutes() == 1 + monkeypatch.setenv("AISHUB_POLL_INTERVAL_MINUTES", "-5") + assert aishub_poll_interval_minutes() == 1 + + def test_clamp_upper_bound(self, monkeypatch): + """A 99999 env var would silence the fallback effectively forever.""" + from services.fetchers.aishub_fallback import aishub_poll_interval_minutes + monkeypatch.setenv("AISHUB_POLL_INTERVAL_MINUTES", "99999") + assert aishub_poll_interval_minutes() == 360 + + def test_malformed_env_defaults(self, monkeypatch): + from services.fetchers.aishub_fallback import aishub_poll_interval_minutes + monkeypatch.setenv("AISHUB_POLL_INTERVAL_MINUTES", "twenty") + assert aishub_poll_interval_minutes() == 20 + + +# --------------------------------------------------------------------------- +# End-to-end fetch + merge into _vessels store +# --------------------------------------------------------------------------- + + +class TestFetchAndMerge: + def _force_primary_disconnected(self): + """Set ais_stream module state so the gate allows the fallback.""" + from services import ais_stream as ais + with ais._vessels_lock: + # Far in the past → connected = false; spawn_count > 0 → primary + # has at least tried so the gate engages. + ais._last_msg_at = time.time() - 3600 + ais._proxy_spawn_count = 5 + ais._vessels.clear() + + def test_vessels_merged_with_source_tag(self, monkeypatch): + """Happy path: AISHub returns 2 ships, both land in ``_vessels`` + with ``source: 'aishub'``.""" + from services.fetchers import aishub_fallback + from services import ais_stream as ais + + monkeypatch.setenv("AISHUB_USERNAME", "test-user") + self._force_primary_disconnected() + + payload = json.dumps([ + {"ERROR": False, "USERNAME": "test-user", "FORMAT": "1", "RECORDS": 2}, + [ + { + "MMSI": 111111111, + "LATITUDE": 40.0, + "LONGITUDE": -73.0, + "SOG": 12.0, + "COG": 270, + "HEADING": 275, + "NAME": "SHIP A", + "TYPE": 70, + }, + { + "MMSI": 222222222, + "LATITUDE": 51.5, + "LONGITUDE": -0.1, + "SOG": 8.0, + "COG": 90, + "HEADING": 92, + "NAME": "SHIP B", + "TYPE": 60, + }, + ], + ]) + + class FakeResp: + status_code = 200 + text = payload + + monkeypatch.setattr( + aishub_fallback, "fetch_with_curl", lambda *a, **kw: FakeResp() + ) + + count = aishub_fallback.fetch_aishub_vessels() + + assert count == 2 + with ais._vessels_lock: + v1 = ais._vessels.get(111111111) + v2 = ais._vessels.get(222222222) + assert v1 is not None + assert v1["source"] == "aishub" + assert v1["lat"] == 40.0 + assert v1["name"] == "SHIP A" + assert v2 is not None + assert v2["source"] == "aishub" + assert v2["type"] == "passenger" # AIS type 60 → passenger + + def test_does_not_overwrite_fresh_live_data(self, monkeypatch): + """If the WebSocket pushed an update for an MMSI 0.5s ago and the + AISHub poll completes in that window, we should NOT clobber the + fresher live data.""" + from services.fetchers import aishub_fallback + from services import ais_stream as ais + + monkeypatch.setenv("AISHUB_USERNAME", "test-user") + self._force_primary_disconnected() + + # Pre-seed _vessels with a "very fresh" live record. + fresh_ts = time.time() + with ais._vessels_lock: + ais._vessels[111111111] = { + "mmsi": 111111111, + "lat": 12.34, + "lng": 56.78, + "source": "aisstream", + "_updated": fresh_ts, + } + + payload = json.dumps([ + {"ERROR": False, "USERNAME": "test-user", "FORMAT": "1", "RECORDS": 1}, + [ + { + "MMSI": 111111111, + "LATITUDE": 99.0, # bogus to make the test obvious + "LONGITUDE": 99.0, + "NAME": "STALE", + "SOG": 0, + "COG": 0, + "TYPE": 0, + }, + ], + ]) + + class FakeResp: + status_code = 200 + text = payload + + monkeypatch.setattr( + aishub_fallback, "fetch_with_curl", lambda *a, **kw: FakeResp() + ) + + # Note: 99.0/99.0 also exceeds the 91/181 sentinel guard and + # would be filtered. Pick a valid-but-bogus position instead. + payload = json.dumps([ + {"ERROR": False, "USERNAME": "test-user", "FORMAT": "1", "RECORDS": 1}, + [ + { + "MMSI": 111111111, + "LATITUDE": 0.0, # different from the live 12.34 + "LONGITUDE": 0.0, + "NAME": "STALE", + "SOG": 0, + "COG": 0, + "TYPE": 0, + }, + ], + ]) + monkeypatch.setattr( + aishub_fallback, "fetch_with_curl", + lambda *a, **kw: type("R", (), {"status_code": 200, "text": payload})(), + ) + + aishub_fallback.fetch_aishub_vessels() + + with ais._vessels_lock: + v = ais._vessels.get(111111111) + # Live data wins — position should still be 12.34 / 56.78. + assert v["lat"] == 12.34 + assert v["lng"] == 56.78 + assert v["source"] == "aisstream" + + def test_http_failure_returns_zero(self, monkeypatch): + from services.fetchers import aishub_fallback + + monkeypatch.setenv("AISHUB_USERNAME", "test-user") + self._force_primary_disconnected() + + class FailResp: + status_code = 503 + text = "" + + monkeypatch.setattr( + aishub_fallback, "fetch_with_curl", lambda *a, **kw: FailResp() + ) + + assert aishub_fallback.fetch_aishub_vessels() == 0