From bd81a940ff3e3ceb5a53b443f771fa73760a2192 Mon Sep 17 00:00:00 2001 From: BigBodyCobain <43977454+BigBodyCobain@users.noreply.github.com> Date: Sat, 6 Jun 2026 20:10:59 -0600 Subject: [PATCH] Follow up on #375 review: dedupe live-data route and harden serializers. Align full /api/live-data with slow-tier orjson options, remove dead main.py duplicate, cap slow batches to pool size, cancel queued work on timeout, and stop retrying HTTP 4xx/5xx. Co-authored-by: Cursor --- backend/main.py | 16 ----- backend/routers/data.py | 6 +- backend/services/data_fetcher.py | 19 +++++- backend/services/fetchers/_store.py | 6 +- backend/services/fetchers/retry.py | 11 +++- .../tests/data/duplicate_routes_baseline.json | 4 -- .../test_issue_375_production_hardening.py | 65 +++++++++++++++++++ 7 files changed, 100 insertions(+), 27 deletions(-) diff --git a/backend/main.py b/backend/main.py index 8549b6b..8695b54 100644 --- a/backend/main.py +++ b/backend/main.py @@ -3848,22 +3848,6 @@ async def update_layers(update: LayerUpdate, request: Request): return {"status": "ok"} -@app.get("/api/live-data") -@limiter.limit("120/minute") -async def live_data(request: Request): - etag = _current_etag(prefix="live|full|") - if request.headers.get("if-none-match") == etag: - return Response(status_code=304, headers={"ETag": etag, "Cache-Control": "no-cache"}) - from services.fetchers._store import get_latest_data_deepcopy_snapshot - - payload = get_latest_data_deepcopy_snapshot() - return Response( - content=orjson.dumps(_sanitize_payload(payload)), - media_type="application/json", - headers={"ETag": etag, "Cache-Control": "no-cache"}, - ) - - def _etag_response(request: Request, payload: dict, prefix: str = "", default=None): """Serialize once, use data version for ETag, return 304 or full response. diff --git a/backend/routers/data.py b/backend/routers/data.py index 82c3971..06fbece 100644 --- a/backend/routers/data.py +++ b/backend/routers/data.py @@ -561,7 +561,11 @@ async def live_data(request: Request): payload = get_latest_data_deepcopy_snapshot() return Response( - content=orjson.dumps(_sanitize_payload(payload)), + content=orjson.dumps( + _sanitize_payload(payload), + default=str, + option=orjson.OPT_NON_STR_KEYS, + ), media_type="application/json", headers={"ETag": etag, "Cache-Control": "no-cache"}, ) diff --git a/backend/services/data_fetcher.py b/backend/services/data_fetcher.py index 459c5f8..a569051 100644 --- a/backend/services/data_fetcher.py +++ b/backend/services/data_fetcher.py @@ -347,7 +347,14 @@ def _run_task_with_health_on_executor( record_success(task_name, duration_s=duration) if duration > _SLOW_FETCH_S: - logger.warning("task slow: %s took %.2f}s", task_name, duration) + logger.warning("task slow: %s took %.2fs", task_name, duration) + except concurrent.futures.TimeoutError: + future.cancel() + duration = time.perf_counter() - start + from services.fetch_health import record_failure + + record_failure(task_name, error=TimeoutError(f"{task_name} timed out"), duration_s=duration) + logger.error("task timed out: %s (%.2fs)", task_name, duration) except Exception as e: duration = time.perf_counter() - start from services.fetch_health import record_failure @@ -368,7 +375,8 @@ def _run_tasks(label: str, funcs: list, *, max_concurrency: int | None = None): max_concurrency = _STARTUP_HEAVY_CONCURRENCY else: max_concurrency = len(funcs) - max_concurrency = max(1, min(max_concurrency, len(funcs))) + pool_workers = getattr(executor, "_max_workers", len(funcs)) + max_concurrency = max(1, min(max_concurrency, len(funcs), pool_workers)) remaining_funcs = list(funcs) while remaining_funcs: @@ -390,6 +398,13 @@ def _drain_task_futures(label: str, futures: dict): record_success(name, duration_s=duration) if duration > _SLOW_FETCH_S: logger.warning(f"{label} task slow: {name} took {duration:.2f}s") + except concurrent.futures.TimeoutError: + future.cancel() + duration = time.perf_counter() - start + from services.fetch_health import record_failure + + record_failure(name, error=TimeoutError(f"{name} timed out"), duration_s=duration) + logger.error("%s task timed out: %s (%.2fs)", label, name, duration) except Exception as e: duration = time.perf_counter() - start from services.fetch_health import record_failure diff --git a/backend/services/fetchers/_store.py b/backend/services/fetchers/_store.py index 36a39b9..b376634 100644 --- a/backend/services/fetchers/_store.py +++ b/backend/services/fetchers/_store.py @@ -230,12 +230,14 @@ _active_layers_version: int = 0 def bump_active_layers_version() -> None: """Increment the active-layer version when frontend toggles change response shape.""" global _active_layers_version - _active_layers_version += 1 + with _data_lock: + _active_layers_version += 1 def get_active_layers_version() -> int: """Return the current active-layer version (for ETag generation).""" - return _active_layers_version + with _data_lock: + return _active_layers_version def get_latest_data_subset(*keys: str) -> DashboardData: diff --git a/backend/services/fetchers/retry.py b/backend/services/fetchers/retry.py index 73e2ad0..d61c6ad 100644 --- a/backend/services/fetchers/retry.py +++ b/backend/services/fetchers/retry.py @@ -11,15 +11,20 @@ import random import logging import functools import requests +from requests.exceptions import ChunkedEncodingError, ConnectionError as RequestsConnectionError +from requests.exceptions import Timeout as RequestsTimeout logger = logging.getLogger(__name__) -# Only retry on transient network/OS errors — not on parse errors, key errors, etc. +# Only retry on transient network/OS errors — not parse/key errors or HTTP 4xx/5xx. +# requests.HTTPError (from raise_for_status) is intentionally excluded. TRANSIENT_ERRORS = ( TimeoutError, ConnectionError, OSError, - requests.RequestException, + RequestsConnectionError, + RequestsTimeout, + ChunkedEncodingError, ) @@ -43,6 +48,8 @@ def with_retry(max_retries: int = 3, base_delay: float = 2.0, max_delay: float = for attempt in range(1 + max_retries): try: return func(*args, **kwargs) + except requests.HTTPError: + raise except TRANSIENT_ERRORS as exc: last_exc = exc if attempt < max_retries: diff --git a/backend/tests/data/duplicate_routes_baseline.json b/backend/tests/data/duplicate_routes_baseline.json index 0b89b1a..fe0d732 100644 --- a/backend/tests/data/duplicate_routes_baseline.json +++ b/backend/tests/data/duplicate_routes_baseline.json @@ -38,10 +38,6 @@ "main", "routers.health" ], - "GET /api/live-data": [ - "main", - "routers.data" - ], "GET /api/live-data/fast": [ "main", "routers.data" diff --git a/backend/tests/test_issue_375_production_hardening.py b/backend/tests/test_issue_375_production_hardening.py index 2bac5e9..47574a6 100644 --- a/backend/tests/test_issue_375_production_hardening.py +++ b/backend/tests/test_issue_375_production_hardening.py @@ -65,3 +65,68 @@ class TestLiveDataFullEndpoint: r2 = client.get("/api/live-data", headers={"If-None-Match": etag}) assert r2.status_code == 304 assert r2.headers.get("etag") == etag + + def test_live_data_serializes_non_json_native_values(self, client): + from datetime import datetime, timezone + + from services.fetchers import _store + + with _store._data_lock: + prior = _store.latest_data.get("gdelt") + _store.latest_data["gdelt"] = [ + {"observed": datetime(2026, 1, 1, tzinfo=timezone.utc)}, + ] + try: + r = client.get("/api/live-data") + assert r.status_code == 200 + assert "2026-01-01" in r.text + finally: + with _store._data_lock: + _store.latest_data["gdelt"] = prior + + +class TestSlowTaskConcurrency: + def test_run_tasks_caps_batch_size_to_executor_workers(self, monkeypatch): + from unittest.mock import MagicMock + + import services.data_fetcher as df + + class _FakeExecutor: + _max_workers = 2 + + def submit(self, func): + return MagicMock() + + mock_executor = _FakeExecutor() + monkeypatch.setattr(df, "_executor_for_task_label", lambda _label: mock_executor) + monkeypatch.setattr(df, "_SLOW_FETCH_CONCURRENCY", 8) + + batch_sizes = [] + + def _capture_drain(_label, futures): + batch_sizes.append(len(futures)) + + monkeypatch.setattr(df, "_drain_task_futures", _capture_drain) + + jobs = [lambda: None for _ in range(5)] + df._run_tasks("slow-tier-test", jobs) + + assert batch_sizes == [2, 2, 1] + + +class TestFetcherRetryScope: + def test_http_error_is_not_retried(self, monkeypatch): + import requests + + from services.fetchers.retry import with_retry + + attempts = {"n": 0} + + @with_retry(max_retries=2, base_delay=0.01) + def _raises_http(): + attempts["n"] += 1 + raise requests.HTTPError("403 Client Error") + + with pytest.raises(requests.HTTPError): + _raises_http() + assert attempts["n"] == 1