Merge pull request #314 from BigBodyCobain/feat/ais-upstream-health

feat(ais): surface AISStream upstream outage instead of failing silently
This commit is contained in:
Shadowbroker
2026-05-23 07:12:37 -06:00
committed by GitHub
6 changed files with 387 additions and 7 deletions
+15
View File
@@ -59,6 +59,12 @@ async def health_check(request: Request):
# when the SPKI-pinned fallback is in effect. The data plane keeps
# flowing (this is by design — see ais_proxy.js comments) but observers
# who care about MITM-protection posture deserve a visible signal.
#
# Plus connectivity health (added 2026-05-23 when stream.aisstream.io
# went fully offline): ``connected`` tells the frontend whether ship
# data is actually flowing. When false, a banner explains that ships
# are unavailable due to an upstream outage — better than the user
# silently seeing an empty ocean and assuming we broke something.
ais_status: dict = {}
try:
from services.ais_stream import ais_proxy_status
@@ -69,6 +75,15 @@ async def health_check(request: Request):
# Don't override a worse top-level status if SLOs already failed,
# but escalate ok -> degraded so the field surfaces in dashboards.
top_status = "degraded"
# AIS_API_KEY not configured is "feature off", not "system broken" —
# so we only escalate when the operator opted into AIS (key set) AND
# the stream is currently offline.
if (
os.environ.get("AIS_API_KEY")
and ais_status.get("connected") is False
and top_status == "ok"
):
top_status = "degraded"
return {
"status": top_status,
+54 -7
View File
@@ -350,19 +350,58 @@ _proxy_process = None
# path during an upstream cert outage. Surfaced via ais_proxy_status() for
# /api/health.
_proxy_status: dict = {}
# Upstream-connectivity telemetry (added when stream.aisstream.io went fully
# offline on 2026-05-23). ``_last_msg_at`` is the unix timestamp of the most
# recent vessel message received from the proxy. ``_proxy_spawn_count`` is
# how many times we've started the node proxy; combined with no recent
# messages it tells us the proxy is respawning in a tight loop because the
# upstream is unreachable. Surfaced via ais_proxy_status() so the operator
# can see "AIS is dead" instead of guessing whether it's their map filter,
# their api key, or upstream.
_last_msg_at: float = 0.0
_proxy_spawn_count: int = 0
_VESSEL_TRAIL_INTERVAL_S = 120
_VESSEL_TRAIL_MAX_POINTS = 240
def ais_proxy_status() -> dict:
"""Return a copy of the latest ais_proxy.js status (issue #258).
# How stale "last vessel message" can be before we consider the stream
# disconnected. AISStream typically pushes multiple messages/sec, so a 60s
# gap means something's wrong upstream or in transit.
_AIS_CONNECTED_FRESHNESS_S = 60
Currently surfaces ``degraded_tls`` (bool) which is true when the
proxy is using SPKI-pinned fallback because AISStream's cert expired.
Returns an empty dict when no status has been received yet.
def ais_proxy_status() -> dict:
"""Return a copy of the latest ais_proxy.js status + connectivity health.
Fields:
* ``degraded_tls`` (bool, issue #258) — true when the proxy is using
SPKI-pinned fallback because AISStream's cert expired.
* ``connected`` (bool) — true when we received a vessel message in
the last ``_AIS_CONNECTED_FRESHNESS_S`` seconds.
* ``last_msg_age_seconds`` (int | None) — seconds since the last
vessel message; None if we've never received one.
* ``proxy_spawn_count`` (int) — how many times we've spawned the
node proxy. Sustained increases here without ``connected`` means
we're respawning in a tight loop because upstream is dead.
Returns an empty dict when called before the AIS subsystem starts
(e.g. during tests or when no API key is set).
"""
with _vessels_lock:
return dict(_proxy_status)
status = dict(_proxy_status)
last = _last_msg_at
spawns = _proxy_spawn_count
now = time.time()
if last > 0:
last_age = int(now - last)
status["last_msg_age_seconds"] = last_age
status["connected"] = last_age <= _AIS_CONNECTED_FRESHNESS_S
else:
status["last_msg_age_seconds"] = None
status["connected"] = False
status["proxy_spawn_count"] = spawns
return status
import os
@@ -588,8 +627,10 @@ def _ais_stream_loop():
env=proxy_env,
**popen_kwargs,
)
global _proxy_spawn_count
with _vessels_lock:
_proxy_process = process
_proxy_spawn_count += 1
# Drain stderr in a background thread to prevent deadlock
import threading
@@ -645,9 +686,15 @@ def _ais_stream_loop():
if not mmsi:
continue
# Telemetry: stamp the timestamp of the most recent real
# vessel message. ais_proxy_status() reads this to decide
# whether the stream is currently "connected" — i.e. has
# any data flowed in the last 60s.
global _last_msg_at
with _vessels_lock:
_last_msg_at = time.time()
if mmsi not in _vessels:
_vessels[mmsi] = {"_updated": time.time()}
_vessels[mmsi] = {"_updated": _last_msg_at}
vessel = _vessels[mmsi]
# Update position from PositionReport or StandardClassBPositionReport
+166
View File
@@ -0,0 +1,166 @@
"""AIS upstream-connectivity telemetry.
Background
----------
On 2026-05-23, stream.aisstream.io went fully offline (TCP timeouts on port
443). The backend's `_ais_stream_loop` kept respawning the node proxy every
few seconds, but no vessel messages ever arrived. From the operator's POV
the ships layer silently went empty and there was no way to tell whether
it was their config, their network, their viewport filter, or upstream.
The fix surfaces three signals from ``ais_proxy_status()``:
* ``connected`` — bool, true when we received a vessel message in the
last ``_AIS_CONNECTED_FRESHNESS_S`` seconds.
* ``last_msg_age_seconds`` — int | None, seconds since last vessel
message; None when we've never received one.
* ``proxy_spawn_count`` — int, how many times we've spawned the node
proxy. Sustained increase without ``connected`` means upstream is dead.
Plus ``/api/health`` escalates ``status`` to ``"degraded"`` when AIS is
configured (``AIS_API_KEY`` set) but the proxy is currently disconnected,
so a frontend banner can decide whether to render.
These tests pin every signal.
"""
from __future__ import annotations
import time
import pytest
def _reset_ais_module():
"""Reset module-level state so tests don't bleed into each other."""
from services import ais_stream as ais
with ais._vessels_lock:
ais._proxy_status.clear()
ais._last_msg_at = 0.0
ais._proxy_spawn_count = 0
class TestAisProxyStatusShape:
def test_fresh_module_reports_disconnected(self):
"""Before any vessel messages have arrived (e.g. cold start, no
upstream yet) we report ``connected: false`` and ``None`` for the
age. Banner should NOT render in this case until we know the
operator opted in, which we approximate by spawn_count > 0."""
_reset_ais_module()
from services.ais_stream import ais_proxy_status
s = ais_proxy_status()
assert s["connected"] is False
assert s["last_msg_age_seconds"] is None
assert s["proxy_spawn_count"] == 0
def test_recent_message_reports_connected(self):
"""Setting ``_last_msg_at`` to now produces ``connected: true``
and a small age."""
_reset_ais_module()
from services import ais_stream as ais
with ais._vessels_lock:
ais._last_msg_at = time.time() - 5
s = ais.ais_proxy_status()
assert s["connected"] is True
assert s["last_msg_age_seconds"] is not None
assert 4 <= s["last_msg_age_seconds"] <= 7
def test_stale_message_reports_disconnected(self):
"""``_last_msg_at`` more than the freshness threshold ago means
``connected: false`` — this is the smoking gun for "upstream
died and the proxy is respawning in a loop"."""
_reset_ais_module()
from services import ais_stream as ais
with ais._vessels_lock:
# 5 minutes ago — well past the 60s freshness window.
ais._last_msg_at = time.time() - 300
s = ais.ais_proxy_status()
assert s["connected"] is False
assert s["last_msg_age_seconds"] is not None
assert s["last_msg_age_seconds"] >= 299
def test_spawn_count_surfaced(self):
"""spawn_count should be visible — combined with disconnected it
tells operator we're hammering the upstream but getting nothing."""
_reset_ais_module()
from services import ais_stream as ais
with ais._vessels_lock:
ais._proxy_spawn_count = 42
s = ais.ais_proxy_status()
assert s["proxy_spawn_count"] == 42
def test_degraded_tls_preserved(self):
"""Existing issue #258 signal (degraded_tls) must still flow
through unchanged when present."""
_reset_ais_module()
from services import ais_stream as ais
with ais._vessels_lock:
ais._proxy_status["degraded_tls"] = True
s = ais.ais_proxy_status()
assert s.get("degraded_tls") is True
class TestHealthEndpointEscalation:
def test_disconnected_with_api_key_escalates_to_degraded(
self, client, monkeypatch
):
"""When ``AIS_API_KEY`` is configured AND the proxy is disconnected,
``/api/health`` should report ``status: "degraded"`` instead of
``"ok"``. This is what the frontend banner reads."""
_reset_ais_module()
monkeypatch.setenv("AIS_API_KEY", "test-key")
# Force "AIS upstream offline" state: spawn count > 0 (proxy tried),
# but no recent messages.
from services import ais_stream as ais
with ais._vessels_lock:
ais._proxy_spawn_count = 5
ais._last_msg_at = time.time() - 600 # 10 min ago
res = client.get("/api/health")
assert res.status_code == 200
body = res.json()
assert body["ais_proxy"]["connected"] is False
assert body["ais_proxy"]["proxy_spawn_count"] == 5
# Without API_KEY this would stay "ok"; with it set + connected=false,
# we expect at least "degraded" (could be "error" if an SLO is also
# red, but never "ok").
assert body["status"] in ("degraded", "error"), (
f"with AIS_API_KEY set + connected=false, status must NOT be 'ok'; "
f"got {body['status']!r}"
)
def test_no_api_key_does_not_escalate(self, client, monkeypatch):
"""When AIS_API_KEY isn't set, the operator hasn't opted in. Don't
flag the system as degraded just because AIS isn't running — that's
the intended state."""
_reset_ais_module()
monkeypatch.delenv("AIS_API_KEY", raising=False)
from services import ais_stream as ais
# Even if the proxy never ran (spawn_count=0) the disconnected
# signal is true. Without the env var, top_status should still
# be "ok" unless an SLO independently failed.
with ais._vessels_lock:
ais._proxy_spawn_count = 0
ais._last_msg_at = 0.0
res = client.get("/api/health")
assert res.status_code == 200
body = res.json()
# No assertion that status is exactly "ok" — other SLOs may have
# tripped during this test session. The contract is "AIS-being-off
# alone doesn't escalate when no key is set."
assert body["ais_proxy"]["connected"] is False
# If the body says degraded/error, it must be for some OTHER reason,
# not the AIS check. Practically: status==ok in a fresh test run.
# (We can't assert exactly without knowing every SLO state, so this
# test mainly proves the path doesn't crash.)