diff --git a/.github/pull_request_template.md b/.github/pull_request_template.md new file mode 100644 index 0000000..a93e070 --- /dev/null +++ b/.github/pull_request_template.md @@ -0,0 +1,13 @@ +## Summary + + + +## Test plan + +- [ ] + +## Production hardening (data path / fetchers / unattended deploys only) + +If this PR touches the data path, fetchers, or live-data APIs, walk through [docs/production-hardening.md](../docs/production-hardening.md) and note any N/A items here. + +- [ ] Checklist reviewed (or N/A — explain why) diff --git a/backend/main.py b/backend/main.py index 8695b54..96ed16e 100644 --- a/backend/main.py +++ b/backend/main.py @@ -3848,426 +3848,6 @@ async def update_layers(update: LayerUpdate, request: Request): return {"status": "ok"} -def _etag_response(request: Request, payload: dict, prefix: str = "", default=None): - """Serialize once, use data version for ETag, return 304 or full response. - - Uses a monotonic version counter instead of MD5-hashing the full payload. - The 304 fast path avoids serialization entirely. - """ - etag = _current_etag(prefix) - if request.headers.get("if-none-match") == etag: - return Response(status_code=304, headers={"ETag": etag, "Cache-Control": "no-cache"}) - content = json_mod.dumps(_json_safe(payload), default=default, allow_nan=False) - return Response( - content=content, - media_type="application/json", - headers={"ETag": etag, "Cache-Control": "no-cache"}, - ) - - -def _current_etag(prefix: str = "") -> str: - from services.fetchers._store import get_active_layers_version, get_data_version - - return f"{prefix}v{get_data_version()}-l{get_active_layers_version()}" - - -def _json_safe(value): - """Recursively replace non-finite floats with None so responses stay valid JSON.""" - if isinstance(value, float): - return value if math.isfinite(value) else None - if isinstance(value, dict): - # Snapshot mutable mappings first so background fetcher updates do not - # invalidate iteration while we serialize a response. - return {k: _json_safe(v) for k, v in list(value.items())} - if isinstance(value, list): - return [_json_safe(v) for v in list(value)] - if isinstance(value, tuple): - return [_json_safe(v) for v in list(value)] - return value - - -def _sanitize_payload(value): - """Thread-safe snapshot with NaN→None. Cheaper than _json_safe: only deep- - copies dicts (for thread safety) and replaces non-finite floats. Lists are - shallow-copied — orjson handles the leaf serialisation natively.""" - if isinstance(value, float): - return value if math.isfinite(value) else None - if isinstance(value, dict): - return {k: _sanitize_payload(v) for k, v in list(value.items())} - if isinstance(value, (list, tuple)): - return list(value) - return value - - -def _bbox_filter( - items: list, s: float, w: float, n: float, e: float, lat_key: str = "lat", lng_key: str = "lng" -) -> list: - """Filter a list of dicts to those within the bounding box (with 20% padding). - Handles antimeridian crossing (e.g. w=170, e=-170).""" - pad_lat = (n - s) * 0.2 - pad_lng = (e - w) * 0.2 if e > w else ((e + 360 - w) * 0.2) - s2, n2 = s - pad_lat, n + pad_lat - w2, e2 = w - pad_lng, e + pad_lng - crosses_antimeridian = w2 > e2 - out = [] - for item in items: - lat = item.get(lat_key) - lng = item.get(lng_key) - if lat is None or lng is None: - out.append(item) # Keep items without coords (don't filter them out) - continue - if not (s2 <= lat <= n2): - continue - if crosses_antimeridian: - if lng >= w2 or lng <= e2: - out.append(item) - else: - if w2 <= lng <= e2: - out.append(item) - return out - - -def _bbox_filter_geojson_points(items: list, s: float, w: float, n: float, e: float) -> list: - """Filter GeoJSON Point features to a padded bounding box.""" - pad_lat = (n - s) * 0.2 - pad_lng = (e - w) * 0.2 if e > w else ((e + 360 - w) * 0.2) - s2, n2 = s - pad_lat, n + pad_lat - w2, e2 = w - pad_lng, e + pad_lng - crosses_antimeridian = w2 > e2 - out = [] - for item in items: - geometry = item.get("geometry") if isinstance(item, dict) else None - coords = geometry.get("coordinates") if isinstance(geometry, dict) else None - if not isinstance(coords, (list, tuple)) or len(coords) < 2: - out.append(item) - continue - lng, lat = coords[0], coords[1] - if lat is None or lng is None: - out.append(item) - continue - if not (s2 <= lat <= n2): - continue - if crosses_antimeridian: - if lng >= w2 or lng <= e2: - out.append(item) - else: - if w2 <= lng <= e2: - out.append(item) - return out - - -def _bbox_spans(s: float | None, w: float | None, n: float | None, e: float | None) -> tuple[float, float]: - if None in (s, w, n, e): - return 180.0, 360.0 - lat_span = max(0.0, float(n) - float(s)) - lng_span = float(e) - float(w) - if lng_span < 0: - lng_span += 360.0 - if lng_span == 0 and w == -180 and e == 180: - lng_span = 360.0 - return lat_span, max(0.0, lng_span) - - -def _downsample_points(items: list, max_items: int) -> list: - if max_items <= 0 or len(items) <= max_items: - return items - step = len(items) / float(max_items) - return [items[min(len(items) - 1, int(i * step))] for i in range(max_items)] - - -def _world_and_continental_scale( - has_bbox: bool, s: float | None, w: float | None, n: float | None, e: float | None -) -> tuple[bool, bool]: - lat_span, lng_span = _bbox_spans(s, w, n, e) - world_scale = (not has_bbox) or lng_span >= 300 or lat_span >= 120 - continental_scale = has_bbox and not world_scale and (lng_span >= 120 or lat_span >= 55) - return world_scale, continental_scale - - -def _filter_sigint_by_layers(items: list, active_layers: dict[str, bool]) -> list: - allow_aprs = bool(active_layers.get("sigint_aprs", True)) - allow_mesh = bool(active_layers.get("sigint_meshtastic", True)) - if allow_aprs and allow_mesh: - return items - - allowed_sources: set[str] = {"js8call"} - if allow_aprs: - allowed_sources.add("aprs") - if allow_mesh: - allowed_sources.update({"meshtastic", "meshtastic-map"}) - return [item for item in items if str(item.get("source") or "").lower() in allowed_sources] - - -def _sigint_totals_for_items(items: list) -> dict[str, int]: - totals = { - "total": len(items), - "meshtastic": 0, - "meshtastic_live": 0, - "meshtastic_map": 0, - "aprs": 0, - "js8call": 0, - } - for item in items: - source = str(item.get("source") or "").lower() - if source == "meshtastic": - totals["meshtastic"] += 1 - if bool(item.get("from_api")): - totals["meshtastic_map"] += 1 - else: - totals["meshtastic_live"] += 1 - elif source == "aprs": - totals["aprs"] += 1 - elif source == "js8call": - totals["js8call"] += 1 - return totals - - -def _cap_startup_items(items: list | None, max_items: int) -> list: - if not items: - return [] - if len(items) <= max_items: - return items - return items[:max_items] - - -def _cap_fast_startup_payload(payload: dict) -> dict: - """Trim high-volume layers for the first dashboard paint. - - The full fast payload can legitimately contain tens of thousands of AIS, - ADS-B, SIGINT, and CCTV records. Returning all of that during app startup - blocks the first map render behind serialization/proxy/network pressure. - This startup payload paints representative live data immediately; the next - normal poll replaces it with the full dataset. - """ - capped = dict(payload) - capped["commercial_flights"] = _cap_startup_items(capped.get("commercial_flights"), 800) - capped["private_flights"] = _cap_startup_items(capped.get("private_flights"), 300) - capped["private_jets"] = _cap_startup_items(capped.get("private_jets"), 150) - capped["ships"] = _cap_startup_items(capped.get("ships"), 1500) - capped["cctv"] = [] - capped["sigint"] = _cap_startup_items(capped.get("sigint"), 500) - capped["trains"] = _cap_startup_items(capped.get("trains"), 100) - capped["startup_payload"] = True - return capped - - -def _cap_fast_dashboard_payload(payload: dict) -> dict: - capped = dict(payload) - capped["commercial_flights"] = _downsample_points(capped.get("commercial_flights") or [], 6000) - capped["private_flights"] = _downsample_points(capped.get("private_flights") or [], 1500) - capped["private_jets"] = _downsample_points(capped.get("private_jets") or [], 1500) - capped["ships"] = _downsample_points(capped.get("ships") or [], 8000) - capped["cctv"] = _downsample_points(capped.get("cctv") or [], 2500) - capped["sigint"] = _downsample_points(capped.get("sigint") or [], 5000) - return capped - - -@app.get("/api/live-data/fast") -@limiter.limit("120/minute") -async def live_data_fast( - request: Request, - # bbox params accepted for backward compat but no longer used for filtering — - # all cached data is returned and the frontend culls off-screen entities via MapLibre. - s: float = Query(None, description="South bound (ignored)", ge=-90, le=90), - w: float = Query(None, description="West bound (ignored)", ge=-180, le=180), - n: float = Query(None, description="North bound (ignored)", ge=-90, le=90), - e: float = Query(None, description="East bound (ignored)", ge=-180, le=180), - initial: bool = Query(False, description="Return a capped startup payload for first paint"), -): - etag = _current_etag(prefix="fast|initial|" if initial else "fast|full|") - if request.headers.get("if-none-match") == etag: - return Response(status_code=304, headers={"ETag": etag, "Cache-Control": "no-cache"}) - - from services.fetchers._store import ( - active_layers, - get_latest_data_subset_refs, - get_source_timestamps_snapshot, - ) - - d = get_latest_data_subset_refs( - "last_updated", - "commercial_flights", - "military_flights", - "private_flights", - "private_jets", - "tracked_flights", - "ships", - "cctv", - "uavs", - "liveuamap", - "gps_jamming", - "satellites", - "satellite_source", - "sigint", - "sigint_totals", - "trains", - ) - freshness = get_source_timestamps_snapshot() - - ships_enabled = any( - active_layers.get(key, True) - for key in ( - "ships_military", - "ships_cargo", - "ships_civilian", - "ships_passenger", - "ships_tracked_yachts", - ) - ) - cctv_total = len(d.get("cctv") or []) - sigint_items = _filter_sigint_by_layers(d.get("sigint") or [], active_layers) - sigint_totals = _sigint_totals_for_items(sigint_items) - - payload = { - "commercial_flights": (d.get("commercial_flights") or []) if active_layers.get("flights", True) else [], - "military_flights": (d.get("military_flights") or []) if active_layers.get("military", True) else [], - "private_flights": (d.get("private_flights") or []) if active_layers.get("private", True) else [], - "private_jets": (d.get("private_jets") or []) if active_layers.get("jets", True) else [], - "tracked_flights": (d.get("tracked_flights") or []) if active_layers.get("tracked", True) else [], - "ships": (d.get("ships") or []) if ships_enabled else [], - "cctv": (d.get("cctv") or []) if active_layers.get("cctv", True) else [], - "uavs": (d.get("uavs") or []) if active_layers.get("military", True) else [], - "liveuamap": (d.get("liveuamap") or []) if active_layers.get("global_incidents", True) else [], - "gps_jamming": (d.get("gps_jamming") or []) if active_layers.get("gps_jamming", True) else [], - "satellites": (d.get("satellites") or []) if active_layers.get("satellites", True) else [], - "satellite_source": d.get("satellite_source", "none"), - "sigint": sigint_items - if (active_layers.get("sigint_meshtastic", True) or active_layers.get("sigint_aprs", True)) - else [], - "sigint_totals": sigint_totals, - "cctv_total": cctv_total, - "trains": (d.get("trains") or []) if active_layers.get("trains", True) else [], - "freshness": freshness, - } - if initial: - payload = _cap_fast_startup_payload(payload) - else: - payload = _cap_fast_dashboard_payload(payload) - return Response( - content=orjson.dumps(_sanitize_payload(payload)), - media_type="application/json", - headers={"ETag": etag, "Cache-Control": "no-cache"}, - ) - - -@app.get("/api/live-data/slow") -@limiter.limit("60/minute") -async def live_data_slow( - request: Request, - # bbox params accepted for backward compat but no longer used for filtering. - s: float = Query(None, description="South bound (ignored)", ge=-90, le=90), - w: float = Query(None, description="West bound (ignored)", ge=-180, le=180), - n: float = Query(None, description="North bound (ignored)", ge=-90, le=90), - e: float = Query(None, description="East bound (ignored)", ge=-180, le=180), -): - etag = _current_etag(prefix="slow|full|") - if request.headers.get("if-none-match") == etag: - return Response(status_code=304, headers={"ETag": etag, "Cache-Control": "no-cache"}) - - from services.fetchers._store import ( - active_layers, - get_latest_data_subset_refs, - get_source_timestamps_snapshot, - ) - - d = get_latest_data_subset_refs( - "last_updated", - "news", - "stocks", - "financial_source", - "oil", - "weather", - "traffic", - "earthquakes", - "frontlines", - "gdelt", - "airports", - "kiwisdr", - "satnogs_stations", - "satnogs_observations", - "tinygs_satellites", - "space_weather", - "internet_outages", - "firms_fires", - "datacenters", - "military_bases", - "power_plants", - "viirs_change_nodes", - "scanners", - "weather_alerts", - "ukraine_alerts", - "air_quality", - "volcanoes", - "fishing_activity", - "psk_reporter", - "crowdthreat", - "correlations", - "threat_level", - "trending_markets", - "fimi", - "uap_sightings", - "wastewater", - "sar_scenes", - "sar_anomalies", - "sar_aoi_coverage", - ) - freshness = get_source_timestamps_snapshot() - - payload = { - "last_updated": d.get("last_updated"), - "threat_level": d.get("threat_level"), - "trending_markets": d.get("trending_markets", []), - "fimi": d.get("fimi", {}), - "news": d.get("news", []), - "stocks": d.get("stocks", {}), - "financial_source": d.get("financial_source", ""), - "oil": d.get("oil", {}), - "weather": d.get("weather"), - "traffic": d.get("traffic", []), - "earthquakes": (d.get("earthquakes") or []) if active_layers.get("earthquakes", True) else [], - "frontlines": d.get("frontlines") if active_layers.get("ukraine_frontline", True) else None, - "gdelt": (d.get("gdelt") or []) if active_layers.get("global_incidents", True) else [], - "airports": d.get("airports") or [], - "kiwisdr": (d.get("kiwisdr") or []) if active_layers.get("kiwisdr", True) else [], - "satnogs_stations": (d.get("satnogs_stations") or []) if active_layers.get("satnogs", True) else [], - "satnogs_total": len(d.get("satnogs_stations") or []), - "satnogs_observations": (d.get("satnogs_observations") or []) if active_layers.get("satnogs", True) else [], - "tinygs_satellites": (d.get("tinygs_satellites") or []) if active_layers.get("tinygs", True) else [], - "tinygs_total": len(d.get("tinygs_satellites") or []), - "psk_reporter": (d.get("psk_reporter") or []) if active_layers.get("psk_reporter", True) else [], - "space_weather": d.get("space_weather"), - "internet_outages": (d.get("internet_outages") or []) if active_layers.get("internet_outages", True) else [], - "firms_fires": (d.get("firms_fires") or []) if active_layers.get("firms", True) else [], - "datacenters": (d.get("datacenters") or []) if active_layers.get("datacenters", True) else [], - "military_bases": (d.get("military_bases") or []) if active_layers.get("military_bases", True) else [], - "power_plants": (d.get("power_plants") or []) if active_layers.get("power_plants", True) else [], - "viirs_change_nodes": (d.get("viirs_change_nodes") or []) if active_layers.get("viirs_nightlights", True) else [], - "scanners": (d.get("scanners") or []) if active_layers.get("scanners", True) else [], - "weather_alerts": d.get("weather_alerts", []) if active_layers.get("weather_alerts", True) else [], - "ukraine_alerts": d.get("ukraine_alerts", []) if active_layers.get("ukraine_alerts", True) else [], - "air_quality": (d.get("air_quality") or []) if active_layers.get("air_quality", True) else [], - "volcanoes": (d.get("volcanoes") or []) if active_layers.get("volcanoes", True) else [], - "fishing_activity": (d.get("fishing_activity") or []) if active_layers.get("fishing_activity", True) else [], - "crowdthreat": (d.get("crowdthreat") or []) if active_layers.get("crowdthreat", True) else [], - "correlations": (d.get("correlations") or []) if active_layers.get("correlations", True) else [], - "uap_sightings": (d.get("uap_sightings") or []) if active_layers.get("uap_sightings", True) else [], - "wastewater": (d.get("wastewater") or []) if active_layers.get("wastewater", True) else [], - "sar_scenes": (d.get("sar_scenes") or []) if active_layers.get("sar", True) else [], - "sar_anomalies": (d.get("sar_anomalies") or []) if active_layers.get("sar", True) else [], - "sar_aoi_coverage": (d.get("sar_aoi_coverage") or []) if active_layers.get("sar", True) else [], - "freshness": freshness, - } - return Response( - content=orjson.dumps( - _sanitize_payload(payload), - default=str, - option=orjson.OPT_NON_STR_KEYS, - ), - media_type="application/json", - headers={"ETag": etag, "Cache-Control": "no-cache"}, - ) - - @app.get("/api/oracle/region-intel") @limiter.limit("30/minute") async def oracle_region_intel( diff --git a/backend/routers/data.py b/backend/routers/data.py index 06fbece..92e10ae 100644 --- a/backend/routers/data.py +++ b/backend/routers/data.py @@ -211,6 +211,15 @@ def _sanitize_payload(value): return value +def _live_data_json_bytes(payload: dict) -> bytes: + """Serialize dashboard payloads with the same defensive orjson options everywhere.""" + return orjson.dumps( + _sanitize_payload(payload), + default=str, + option=orjson.OPT_NON_STR_KEYS, + ) + + def _bbox_filter(items: list, s: float, w: float, n: float, e: float, lat_key: str = "lat", lng_key: str = "lng") -> list: pad_lat = (n - s) * 0.2 @@ -561,11 +570,7 @@ async def live_data(request: Request): payload = get_latest_data_deepcopy_snapshot() return Response( - content=orjson.dumps( - _sanitize_payload(payload), - default=str, - option=orjson.OPT_NON_STR_KEYS, - ), + content=_live_data_json_bytes(payload), media_type="application/json", headers={"ETag": etag, "Cache-Control": "no-cache"}, ) @@ -663,7 +668,7 @@ async def bootstrap_critical(request: Request): "bootstrap_payload": True, } return Response( - content=orjson.dumps(_sanitize_payload(payload), default=str, option=orjson.OPT_NON_STR_KEYS), + content=_live_data_json_bytes(payload), media_type="application/json", headers={"ETag": etag, "Cache-Control": "no-cache"}, ) @@ -725,8 +730,11 @@ async def live_data_fast( # to the pre-#288 implementation. if _has_full_bbox(s, w, n, e): payload = _apply_bbox_to_payload(payload, _FAST_BBOX_HEAVY_KEYS, s, w, n, e) - return Response(content=orjson.dumps(_sanitize_payload(payload)), media_type="application/json", - headers={"ETag": etag, "Cache-Control": "no-cache"}) + return Response( + content=_live_data_json_bytes(payload), + media_type="application/json", + headers={"ETag": etag, "Cache-Control": "no-cache"}, + ) @router.get("/api/live-data/slow") @@ -800,7 +808,7 @@ async def live_data_slow( if _has_full_bbox(s, w, n, e): payload = _apply_bbox_to_payload(payload, _SLOW_BBOX_HEAVY_KEYS, s, w, n, e) return Response( - content=orjson.dumps(_sanitize_payload(payload), default=str, option=orjson.OPT_NON_STR_KEYS), + content=_live_data_json_bytes(payload), media_type="application/json", headers={"ETag": etag, "Cache-Control": "no-cache"}, ) diff --git a/backend/services/liveuamap_scraper.py b/backend/services/liveuamap_scraper.py index 541061b..2d408bb 100644 --- a/backend/services/liveuamap_scraper.py +++ b/backend/services/liveuamap_scraper.py @@ -39,6 +39,9 @@ def fetch_liveuamap(): viewport={"width": 1920, "height": 1080}, color_scheme="dark", ) + # Bound navigation and script evaluation so a stuck region cannot hang the slow pool. + context.set_default_navigation_timeout(60_000) + context.set_default_timeout(30_000) page = context.new_page() stealth_sync(page) diff --git a/backend/tests/data/duplicate_routes_baseline.json b/backend/tests/data/duplicate_routes_baseline.json index fe0d732..17ff17d 100644 --- a/backend/tests/data/duplicate_routes_baseline.json +++ b/backend/tests/data/duplicate_routes_baseline.json @@ -38,14 +38,6 @@ "main", "routers.health" ], - "GET /api/live-data/fast": [ - "main", - "routers.data" - ], - "GET /api/live-data/slow": [ - "main", - "routers.data" - ], "GET /api/mesh/channels": [ "main", "routers.mesh_public" diff --git a/backend/tests/test_issue_375_production_hardening.py b/backend/tests/test_issue_375_production_hardening.py index 47574a6..8d655ec 100644 --- a/backend/tests/test_issue_375_production_hardening.py +++ b/backend/tests/test_issue_375_production_hardening.py @@ -66,6 +66,24 @@ class TestLiveDataFullEndpoint: assert r2.status_code == 304 assert r2.headers.get("etag") == etag + def test_live_data_fast_serializes_non_json_native_values(self, client): + from datetime import datetime, timezone + + from services.fetchers import _store + + with _store._data_lock: + prior = _store.latest_data.get("sigint") + _store.latest_data["sigint"] = [ + {"source": "aprs", "observed": datetime(2026, 1, 1, tzinfo=timezone.utc)}, + ] + try: + r = client.get("/api/live-data/fast") + assert r.status_code == 200 + assert "2026-01-01" in r.text + finally: + with _store._data_lock: + _store.latest_data["sigint"] = prior + def test_live_data_serializes_non_json_native_values(self, client): from datetime import datetime, timezone