Harden production checklist: dedupe live-data routes and align serializers.

Pin Mathieu's data-path checklist in docs and PR template, remove dead main.py fast/slow handlers, unify orjson via _live_data_json_bytes, and bound LiveUAMap Playwright defaults.

Co-authored-by: Cursor <cursoragent@cursor.com>
This commit is contained in:
BigBodyCobain
2026-06-06 20:16:18 -06:00
parent bd81a940ff
commit 079ff7b737
6 changed files with 51 additions and 437 deletions
+13
View File
@@ -0,0 +1,13 @@
## Summary
<!-- What changed and why (1–3 bullets). -->
## Test plan
- [ ] <!-- How you verified the change -->
## Production hardening (data path / fetchers / unattended deploys only)
If this PR touches the data path, fetchers, or live-data APIs, walk through [docs/production-hardening.md](../docs/production-hardening.md) and note any N/A items here.
- [ ] Checklist reviewed (or N/A — explain why)
-420
View File
@@ -3848,426 +3848,6 @@ async def update_layers(update: LayerUpdate, request: Request):
return {"status": "ok"}
def _etag_response(request: Request, payload: dict, prefix: str = "", default=None):
    """Answer with a version-based ETag: 304 on a match, otherwise the JSON body.

    The ETag comes from ``_current_etag(prefix)`` instead of a hash of the
    payload, so a request whose ``If-None-Match`` header already equals it is
    answered before anything is serialized.

    Args:
        request: Incoming request; only its ``if-none-match`` header is read.
        payload: Data to serialize when the client copy is stale.
        prefix: Passed through to ``_current_etag``.
        default: Passed through as the serializer's ``default`` hook.
    """
    etag = _current_etag(prefix)
    cache_headers = {"ETag": etag, "Cache-Control": "no-cache"}

    client_etag = request.headers.get("if-none-match")
    if client_etag == etag:
        return Response(status_code=304, headers=cache_headers)

    # Non-finite floats are nulled by _json_safe first; allow_nan=False is the
    # backstop so an invalid JSON token can never be emitted.
    body = json_mod.dumps(_json_safe(payload), default=default, allow_nan=False)
    return Response(content=body, media_type="application/json", headers=cache_headers)
def _current_etag(prefix: str = "") -> str:
    """Return ``<prefix>v<data version>-l<active layers version>`` for use as an ETag."""
    from services.fetchers._store import get_active_layers_version, get_data_version

    data_version = get_data_version()
    layers_version = get_active_layers_version()
    return f"{prefix}v{data_version}-l{layers_version}"
def _json_safe(value):
"""Recursively replace non-finite floats with None so responses stay valid JSON."""
if isinstance(value, float):
return value if math.isfinite(value) else None
if isinstance(value, dict):
# Snapshot mutable mappings first so background fetcher updates do not
# invalidate iteration while we serialize a response.
return {k: _json_safe(v) for k, v in list(value.items())}
if isinstance(value, list):
return [_json_safe(v) for v in list(value)]
if isinstance(value, tuple):
return [_json_safe(v) for v in list(value)]
return value
def _sanitize_payload(value):
"""Thread-safe snapshot with NaN→None. Cheaper than _json_safe: only deep-
copies dicts (for thread safety) and replaces non-finite floats. Lists are
shallow-copied â orjson handles the leaf serialisation natively."""
if isinstance(value, float):
return value if math.isfinite(value) else None
if isinstance(value, dict):
return {k: _sanitize_payload(v) for k, v in list(value.items())}
if isinstance(value, (list, tuple)):
return list(value)
return value
def _bbox_filter(
items: list, s: float, w: float, n: float, e: float, lat_key: str = "lat", lng_key: str = "lng"
) -> list:
"""Filter a list of dicts to those within the bounding box (with 20% padding).
Handles antimeridian crossing (e.g. w=170, e=-170)."""
pad_lat = (n - s) * 0.2
pad_lng = (e - w) * 0.2 if e > w else ((e + 360 - w) * 0.2)
s2, n2 = s - pad_lat, n + pad_lat
w2, e2 = w - pad_lng, e + pad_lng
crosses_antimeridian = w2 > e2
out = []
for item in items:
lat = item.get(lat_key)
lng = item.get(lng_key)
if lat is None or lng is None:
out.append(item) # Keep items without coords (don't filter them out)
continue
if not (s2 <= lat <= n2):
continue
if crosses_antimeridian:
if lng >= w2 or lng <= e2:
out.append(item)
else:
if w2 <= lng <= e2:
out.append(item)
return out
def _bbox_filter_geojson_points(items: list, s: float, w: float, n: float, e: float) -> list:
"""Filter GeoJSON Point features to a padded bounding box."""
pad_lat = (n - s) * 0.2
pad_lng = (e - w) * 0.2 if e > w else ((e + 360 - w) * 0.2)
s2, n2 = s - pad_lat, n + pad_lat
w2, e2 = w - pad_lng, e + pad_lng
crosses_antimeridian = w2 > e2
out = []
for item in items:
geometry = item.get("geometry") if isinstance(item, dict) else None
coords = geometry.get("coordinates") if isinstance(geometry, dict) else None
if not isinstance(coords, (list, tuple)) or len(coords) < 2:
out.append(item)
continue
lng, lat = coords[0], coords[1]
if lat is None or lng is None:
out.append(item)
continue
if not (s2 <= lat <= n2):
continue
if crosses_antimeridian:
if lng >= w2 or lng <= e2:
out.append(item)
else:
if w2 <= lng <= e2:
out.append(item)
return out
def _bbox_spans(s: float | None, w: float | None, n: float | None, e: float | None) -> tuple[float, float]:
if None in (s, w, n, e):
return 180.0, 360.0
lat_span = max(0.0, float(n) - float(s))
lng_span = float(e) - float(w)
if lng_span < 0:
lng_span += 360.0
if lng_span == 0 and w == -180 and e == 180:
lng_span = 360.0
return lat_span, max(0.0, lng_span)
def _downsample_points(items: list, max_items: int) -> list:
if max_items <= 0 or len(items) <= max_items:
return items
step = len(items) / float(max_items)
return [items[min(len(items) - 1, int(i * step))] for i in range(max_items)]
def _world_and_continental_scale(
    has_bbox: bool, s: float | None, w: float | None, n: float | None, e: float | None
) -> tuple[bool, bool]:
    """Classify a viewport as ``(world_scale, continental_scale)``.

    World scale: no bbox at all, or a box spanning at least 300 degrees of
    longitude or 120 degrees of latitude. Continental scale: a bbox that is
    not world scale but spans at least 120 degrees of longitude or 55 degrees
    of latitude.
    """
    lat_span, lng_span = _bbox_spans(s, w, n, e)

    spans_world = lng_span >= 300 or lat_span >= 120
    spans_continent = lng_span >= 120 or lat_span >= 55

    world_scale = (not has_bbox) or spans_world
    continental_scale = has_bbox and not world_scale and spans_continent
    return world_scale, continental_scale
def _filter_sigint_by_layers(items: list, active_layers: dict[str, bool]) -> list:
allow_aprs = bool(active_layers.get("sigint_aprs", True))
allow_mesh = bool(active_layers.get("sigint_meshtastic", True))
if allow_aprs and allow_mesh:
return items
allowed_sources: set[str] = {"js8call"}
if allow_aprs:
allowed_sources.add("aprs")
if allow_mesh:
allowed_sources.update({"meshtastic", "meshtastic-map"})
return [item for item in items if str(item.get("source") or "").lower() in allowed_sources]
def _sigint_totals_for_items(items: list) -> dict[str, int]:
totals = {
"total": len(items),
"meshtastic": 0,
"meshtastic_live": 0,
"meshtastic_map": 0,
"aprs": 0,
"js8call": 0,
}
for item in items:
source = str(item.get("source") or "").lower()
if source == "meshtastic":
totals["meshtastic"] += 1
if bool(item.get("from_api")):
totals["meshtastic_map"] += 1
else:
totals["meshtastic_live"] += 1
elif source == "aprs":
totals["aprs"] += 1
elif source == "js8call":
totals["js8call"] += 1
return totals
def _cap_startup_items(items: list | None, max_items: int) -> list:
if not items:
return []
if len(items) <= max_items:
return items
return items[:max_items]
def _cap_fast_startup_payload(payload: dict) -> dict:
    """Build the trimmed payload used for the first dashboard paint.

    The full fast payload can hold tens of thousands of AIS, ADS-B, SIGINT
    and CCTV records, and shipping all of it at startup delays the first map
    render behind serialization, proxy and network pressure. This variant
    keeps only a leading slice of each heavy layer and sends no CCTV at all;
    the next regular poll replaces it with the full dataset.

    Returns a shallow copy of *payload* with the capped layers and
    ``startup_payload`` set to True. The input dict is not modified.
    """
    capped = dict(payload)

    # Assignment order is kept as-is because it fixes the position of any key
    # that the incoming payload did not already contain.
    for key, limit in (
        ("commercial_flights", 800),
        ("private_flights", 300),
        ("private_jets", 150),
        ("ships", 1500),
    ):
        capped[key] = _cap_startup_items(capped.get(key), limit)
    capped["cctv"] = []
    for key, limit in (("sigint", 500), ("trains", 100)):
        capped[key] = _cap_startup_items(capped.get(key), limit)

    capped["startup_payload"] = True
    return capped
def _cap_fast_dashboard_payload(payload: dict) -> dict:
    """Return a shallow copy of *payload* with its heavy layers downsampled.

    Each listed layer is reduced with ``_downsample_points`` to the given
    ceiling; a missing or empty layer becomes an empty list. The input dict
    is not modified.
    """
    layer_ceilings = (
        ("commercial_flights", 6000),
        ("private_flights", 1500),
        ("private_jets", 1500),
        ("ships", 8000),
        ("cctv", 2500),
        ("sigint", 5000),
    )
    capped = dict(payload)
    for key, ceiling in layer_ceilings:
        capped[key] = _downsample_points(capped.get(key) or [], ceiling)
    return capped
@app.get("/api/live-data/fast")
@limiter.limit("120/minute")
async def live_data_fast(
    request: Request,
    # bbox params accepted for backward compat but no longer used for filtering —
    # all cached data is returned and the frontend culls off-screen entities via MapLibre.
    s: float = Query(None, description="South bound (ignored)", ge=-90, le=90),
    w: float = Query(None, description="West bound (ignored)", ge=-180, le=180),
    n: float = Query(None, description="North bound (ignored)", ge=-90, le=90),
    e: float = Query(None, description="East bound (ignored)", ge=-180, le=180),
    initial: bool = Query(False, description="Return a capped startup payload for first paint"),
):
    """Serve the fast-moving dashboard layers (flights, ships, SIGINT, ...).

    A request whose ``If-None-Match`` header equals the current ETag gets a
    bare 304 before any data is read. Otherwise each layer is included only
    when its toggle in ``active_layers`` is on (missing toggles count as on),
    and the payload is trimmed with ``_cap_fast_startup_payload`` when
    ``initial`` is set or ``_cap_fast_dashboard_payload`` when it is not.

    The ``s``/``w``/``n``/``e`` parameters are validated but otherwise unused.
    """
    # Startup and full responses differ, so they get distinct ETag namespaces.
    etag = _current_etag(prefix="fast|initial|" if initial else "fast|full|")
    if request.headers.get("if-none-match") == etag:
        return Response(status_code=304, headers={"ETag": etag, "Cache-Control": "no-cache"})

    from services.fetchers._store import (
        active_layers,
        get_latest_data_subset_refs,
        get_source_timestamps_snapshot,
    )

    d = get_latest_data_subset_refs(
        "last_updated",
        "commercial_flights",
        "military_flights",
        "private_flights",
        "private_jets",
        "tracked_flights",
        "ships",
        "cctv",
        "uavs",
        "liveuamap",
        "gps_jamming",
        "satellites",
        "satellite_source",
        "sigint",
        "sigint_totals",
        "trains",
    )
    freshness = get_source_timestamps_snapshot()

    # Ships are sent if any one of the ship sub-layers is on.
    ships_enabled = any(
        active_layers.get(key, True)
        for key in (
            "ships_military",
            "ships_cargo",
            "ships_civilian",
            "ships_passenger",
            "ships_tracked_yachts",
        )
    )
    # Counted before the cctv layer toggle is applied below.
    cctv_total = len(d.get("cctv") or [])
    sigint_items = _filter_sigint_by_layers(d.get("sigint") or [], active_layers)
    sigint_totals = _sigint_totals_for_items(sigint_items)

    payload = {
        "commercial_flights": (d.get("commercial_flights") or []) if active_layers.get("flights", True) else [],
        "military_flights": (d.get("military_flights") or []) if active_layers.get("military", True) else [],
        "private_flights": (d.get("private_flights") or []) if active_layers.get("private", True) else [],
        "private_jets": (d.get("private_jets") or []) if active_layers.get("jets", True) else [],
        "tracked_flights": (d.get("tracked_flights") or []) if active_layers.get("tracked", True) else [],
        "ships": (d.get("ships") or []) if ships_enabled else [],
        "cctv": (d.get("cctv") or []) if active_layers.get("cctv", True) else [],
        "uavs": (d.get("uavs") or []) if active_layers.get("military", True) else [],
        "liveuamap": (d.get("liveuamap") or []) if active_layers.get("global_incidents", True) else [],
        "gps_jamming": (d.get("gps_jamming") or []) if active_layers.get("gps_jamming", True) else [],
        "satellites": (d.get("satellites") or []) if active_layers.get("satellites", True) else [],
        "satellite_source": d.get("satellite_source", "none"),
        "sigint": sigint_items
        if (active_layers.get("sigint_meshtastic", True) or active_layers.get("sigint_aprs", True))
        else [],
        "sigint_totals": sigint_totals,
        "cctv_total": cctv_total,
        "trains": (d.get("trains") or []) if active_layers.get("trains", True) else [],
        "freshness": freshness,
    }
    if initial:
        payload = _cap_fast_startup_payload(payload)
    else:
        payload = _cap_fast_dashboard_payload(payload)

    return Response(
        # Same defensive options as the slow endpoint: a cached value orjson
        # cannot encode natively is stringified and non-str dict keys are
        # accepted, instead of the request failing on a serialization error.
        content=orjson.dumps(
            _sanitize_payload(payload),
            default=str,
            option=orjson.OPT_NON_STR_KEYS,
        ),
        media_type="application/json",
        headers={"ETag": etag, "Cache-Control": "no-cache"},
    )
@app.get("/api/live-data/slow")
@limiter.limit("60/minute")
async def live_data_slow(
    request: Request,
    # bbox params accepted for backward compat but no longer used for filtering.
    s: float = Query(None, description="South bound (ignored)", ge=-90, le=90),
    w: float = Query(None, description="West bound (ignored)", ge=-180, le=180),
    n: float = Query(None, description="North bound (ignored)", ge=-90, le=90),
    e: float = Query(None, description="East bound (ignored)", ge=-180, le=180),
):
    """Serve the slow-moving dashboard layers (news, markets, hazards, ...).

    A request whose ``If-None-Match`` header equals the current ETag gets a
    bare 304 before any data is read. Otherwise layer-gated entries are
    included only when their toggle in ``active_layers`` is on (missing
    toggles count as on); ungated entries are always sent.

    The ``s``/``w``/``n``/``e`` parameters are validated but otherwise unused.
    """
    etag = _current_etag(prefix="slow|full|")
    cache_headers = {"ETag": etag, "Cache-Control": "no-cache"}
    if request.headers.get("if-none-match") == etag:
        return Response(status_code=304, headers=cache_headers)

    from services.fetchers._store import (
        active_layers,
        get_latest_data_subset_refs,
        get_source_timestamps_snapshot,
    )

    wanted_keys = (
        "last_updated",
        "news",
        "stocks",
        "financial_source",
        "oil",
        "weather",
        "traffic",
        "earthquakes",
        "frontlines",
        "gdelt",
        "airports",
        "kiwisdr",
        "satnogs_stations",
        "satnogs_observations",
        "tinygs_satellites",
        "space_weather",
        "internet_outages",
        "firms_fires",
        "datacenters",
        "military_bases",
        "power_plants",
        "viirs_change_nodes",
        "scanners",
        "weather_alerts",
        "ukraine_alerts",
        "air_quality",
        "volcanoes",
        "fishing_activity",
        "psk_reporter",
        "crowdthreat",
        "correlations",
        "threat_level",
        "trending_markets",
        "fimi",
        "uap_sightings",
        "wastewater",
        "sar_scenes",
        "sar_anomalies",
        "sar_aoi_coverage",
    )
    d = get_latest_data_subset_refs(*wanted_keys)
    freshness = get_source_timestamps_snapshot()

    def layer_on(layer):
        """True unless the named layer has been switched off."""
        return active_layers.get(layer, True)

    def gated(key, layer=None):
        """Cached list for *key* (never None), or [] when its layer is off."""
        if layer_on(layer or key):
            return d.get(key) or []
        return []

    payload = {
        "last_updated": d.get("last_updated"),
        "threat_level": d.get("threat_level"),
        "trending_markets": d.get("trending_markets", []),
        "fimi": d.get("fimi", {}),
        "news": d.get("news", []),
        "stocks": d.get("stocks", {}),
        "financial_source": d.get("financial_source", ""),
        "oil": d.get("oil", {}),
        "weather": d.get("weather"),
        "traffic": d.get("traffic", []),
        "earthquakes": gated("earthquakes"),
        # Frontlines are passed through untouched, and become None when off.
        "frontlines": d.get("frontlines") if layer_on("ukraine_frontline") else None,
        "gdelt": gated("gdelt", "global_incidents"),
        "airports": d.get("airports") or [],
        "kiwisdr": gated("kiwisdr"),
        "satnogs_stations": gated("satnogs_stations", "satnogs"),
        # Totals are counted from the cache regardless of the layer toggle.
        "satnogs_total": len(d.get("satnogs_stations") or []),
        "satnogs_observations": gated("satnogs_observations", "satnogs"),
        "tinygs_satellites": gated("tinygs_satellites", "tinygs"),
        "tinygs_total": len(d.get("tinygs_satellites") or []),
        "psk_reporter": gated("psk_reporter"),
        "space_weather": d.get("space_weather"),
        "internet_outages": gated("internet_outages"),
        "firms_fires": gated("firms_fires", "firms"),
        "datacenters": gated("datacenters"),
        "military_bases": gated("military_bases"),
        "power_plants": gated("power_plants"),
        "viirs_change_nodes": gated("viirs_change_nodes", "viirs_nightlights"),
        "scanners": gated("scanners"),
        # These two use a dict default rather than ``or []``, so a stored None
        # is passed through as None.
        "weather_alerts": d.get("weather_alerts", []) if layer_on("weather_alerts") else [],
        "ukraine_alerts": d.get("ukraine_alerts", []) if layer_on("ukraine_alerts") else [],
        "air_quality": gated("air_quality"),
        "volcanoes": gated("volcanoes"),
        "fishing_activity": gated("fishing_activity"),
        "crowdthreat": gated("crowdthreat"),
        "correlations": gated("correlations"),
        "uap_sightings": gated("uap_sightings"),
        "wastewater": gated("wastewater"),
        "sar_scenes": gated("sar_scenes", "sar"),
        "sar_anomalies": gated("sar_anomalies", "sar"),
        "sar_aoi_coverage": gated("sar_aoi_coverage", "sar"),
        "freshness": freshness,
    }

    body = orjson.dumps(
        _sanitize_payload(payload),
        default=str,
        option=orjson.OPT_NON_STR_KEYS,
    )
    return Response(content=body, media_type="application/json", headers=cache_headers)
@app.get("/api/oracle/region-intel")
@limiter.limit("30/minute")
async def oracle_region_intel(
+17 -9
View File
@@ -211,6 +211,15 @@ def _sanitize_payload(value):
return value
def _live_data_json_bytes(payload: dict) -> bytes:
    """Encode a dashboard payload to JSON bytes with one shared set of orjson options.

    The payload is first passed through ``_sanitize_payload``. ``default=str``
    stringifies any value orjson cannot encode natively, and
    ``OPT_NON_STR_KEYS`` accepts dict keys that are not strings.
    """
    sanitized = _sanitize_payload(payload)
    return orjson.dumps(sanitized, default=str, option=orjson.OPT_NON_STR_KEYS)
def _bbox_filter(items: list, s: float, w: float, n: float, e: float,
lat_key: str = "lat", lng_key: str = "lng") -> list:
pad_lat = (n - s) * 0.2
@@ -561,11 +570,7 @@ async def live_data(request: Request):
payload = get_latest_data_deepcopy_snapshot()
return Response(
content=orjson.dumps(
_sanitize_payload(payload),
default=str,
option=orjson.OPT_NON_STR_KEYS,
),
content=_live_data_json_bytes(payload),
media_type="application/json",
headers={"ETag": etag, "Cache-Control": "no-cache"},
)
@@ -663,7 +668,7 @@ async def bootstrap_critical(request: Request):
"bootstrap_payload": True,
}
return Response(
content=orjson.dumps(_sanitize_payload(payload), default=str, option=orjson.OPT_NON_STR_KEYS),
content=_live_data_json_bytes(payload),
media_type="application/json",
headers={"ETag": etag, "Cache-Control": "no-cache"},
)
@@ -725,8 +730,11 @@ async def live_data_fast(
# to the pre-#288 implementation.
if _has_full_bbox(s, w, n, e):
payload = _apply_bbox_to_payload(payload, _FAST_BBOX_HEAVY_KEYS, s, w, n, e)
return Response(content=orjson.dumps(_sanitize_payload(payload)), media_type="application/json",
headers={"ETag": etag, "Cache-Control": "no-cache"})
return Response(
content=_live_data_json_bytes(payload),
media_type="application/json",
headers={"ETag": etag, "Cache-Control": "no-cache"},
)
@router.get("/api/live-data/slow")
@@ -800,7 +808,7 @@ async def live_data_slow(
if _has_full_bbox(s, w, n, e):
payload = _apply_bbox_to_payload(payload, _SLOW_BBOX_HEAVY_KEYS, s, w, n, e)
return Response(
content=orjson.dumps(_sanitize_payload(payload), default=str, option=orjson.OPT_NON_STR_KEYS),
content=_live_data_json_bytes(payload),
media_type="application/json",
headers={"ETag": etag, "Cache-Control": "no-cache"},
)
+3
View File
@@ -39,6 +39,9 @@ def fetch_liveuamap():
viewport={"width": 1920, "height": 1080},
color_scheme="dark",
)
# Bound navigation and script evaluation so a stuck region cannot hang the slow pool.
context.set_default_navigation_timeout(60_000)
context.set_default_timeout(30_000)
page = context.new_page()
stealth_sync(page)
@@ -38,14 +38,6 @@
"main",
"routers.health"
],
"GET /api/live-data/fast": [
"main",
"routers.data"
],
"GET /api/live-data/slow": [
"main",
"routers.data"
],
"GET /api/mesh/channels": [
"main",
"routers.mesh_public"
@@ -66,6 +66,24 @@ class TestLiveDataFullEndpoint:
assert r2.status_code == 304
assert r2.headers.get("etag") == etag
def test_live_data_fast_serializes_non_json_native_values(self, client):
    """A datetime stored in the ``sigint`` layer must still be served by the fast endpoint."""
    from datetime import datetime, timezone

    from services.fetchers import _store

    observed_at = datetime(2026, 1, 1, tzinfo=timezone.utc)
    with _store._data_lock:
        saved_sigint = _store.latest_data.get("sigint")
        _store.latest_data["sigint"] = [{"source": "aprs", "observed": observed_at}]

    try:
        response = client.get("/api/live-data/fast")
        assert response.status_code == 200
        assert "2026-01-01" in response.text
    finally:
        # Put the previous value back whether or not the assertions passed.
        with _store._data_lock:
            _store.latest_data["sigint"] = saved_sigint
def test_live_data_serializes_non_json_native_values(self, client):
from datetime import datetime, timezone