Follow up on #375 review: dedupe live-data route and harden serializers.

Align full /api/live-data with slow-tier orjson options, remove dead main.py duplicate, cap slow batches to pool size, cancel queued work on timeout, and stop retrying HTTP 4xx/5xx.

Co-authored-by: Cursor <cursoragent@cursor.com>
This commit is contained in:
BigBodyCobain
2026-06-06 20:10:59 -06:00
parent 9a0a9a116a
commit bd81a940ff
7 changed files with 100 additions and 27 deletions
-16
View File
@@ -3848,22 +3848,6 @@ async def update_layers(update: LayerUpdate, request: Request):
return {"status": "ok"}
@app.get("/api/live-data")
@limiter.limit("120/minute")
async def live_data(request: Request):
etag = _current_etag(prefix="live|full|")
if request.headers.get("if-none-match") == etag:
return Response(status_code=304, headers={"ETag": etag, "Cache-Control": "no-cache"})
from services.fetchers._store import get_latest_data_deepcopy_snapshot
payload = get_latest_data_deepcopy_snapshot()
return Response(
content=orjson.dumps(_sanitize_payload(payload)),
media_type="application/json",
headers={"ETag": etag, "Cache-Control": "no-cache"},
)
def _etag_response(request: Request, payload: dict, prefix: str = "", default=None):
"""Serialize once, use data version for ETag, return 304 or full response.
+5 -1
View File
@@ -561,7 +561,11 @@ async def live_data(request: Request):
payload = get_latest_data_deepcopy_snapshot()
return Response(
content=orjson.dumps(_sanitize_payload(payload)),
content=orjson.dumps(
_sanitize_payload(payload),
default=str,
option=orjson.OPT_NON_STR_KEYS,
),
media_type="application/json",
headers={"ETag": etag, "Cache-Control": "no-cache"},
)
+17 -2
View File
@@ -347,7 +347,14 @@ def _run_task_with_health_on_executor(
record_success(task_name, duration_s=duration)
if duration > _SLOW_FETCH_S:
logger.warning("task slow: %s took %.2f}s", task_name, duration)
logger.warning("task slow: %s took %.2fs", task_name, duration)
except concurrent.futures.TimeoutError:
future.cancel()
duration = time.perf_counter() - start
from services.fetch_health import record_failure
record_failure(task_name, error=TimeoutError(f"{task_name} timed out"), duration_s=duration)
logger.error("task timed out: %s (%.2fs)", task_name, duration)
except Exception as e:
duration = time.perf_counter() - start
from services.fetch_health import record_failure
@@ -368,7 +375,8 @@ def _run_tasks(label: str, funcs: list, *, max_concurrency: int | None = None):
max_concurrency = _STARTUP_HEAVY_CONCURRENCY
else:
max_concurrency = len(funcs)
max_concurrency = max(1, min(max_concurrency, len(funcs)))
pool_workers = getattr(executor, "_max_workers", len(funcs))
max_concurrency = max(1, min(max_concurrency, len(funcs), pool_workers))
remaining_funcs = list(funcs)
while remaining_funcs:
@@ -390,6 +398,13 @@ def _drain_task_futures(label: str, futures: dict):
record_success(name, duration_s=duration)
if duration > _SLOW_FETCH_S:
logger.warning(f"{label} task slow: {name} took {duration:.2f}s")
except concurrent.futures.TimeoutError:
future.cancel()
duration = time.perf_counter() - start
from services.fetch_health import record_failure
record_failure(name, error=TimeoutError(f"{name} timed out"), duration_s=duration)
logger.error("%s task timed out: %s (%.2fs)", label, name, duration)
except Exception as e:
duration = time.perf_counter() - start
from services.fetch_health import record_failure
+4 -2
View File
@@ -230,12 +230,14 @@ _active_layers_version: int = 0
def bump_active_layers_version() -> None:
"""Increment the active-layer version when frontend toggles change response shape."""
global _active_layers_version
_active_layers_version += 1
with _data_lock:
_active_layers_version += 1
def get_active_layers_version() -> int:
"""Return the current active-layer version (for ETag generation)."""
return _active_layers_version
with _data_lock:
return _active_layers_version
def get_latest_data_subset(*keys: str) -> DashboardData:
+9 -2
View File
@@ -11,15 +11,20 @@ import random
import logging
import functools
import requests
from requests.exceptions import ChunkedEncodingError, ConnectionError as RequestsConnectionError
from requests.exceptions import Timeout as RequestsTimeout
logger = logging.getLogger(__name__)
# Only retry on transient network/OS errors — not on parse errors, key errors, etc.
# Only retry on transient network/OS errors — not parse/key errors or HTTP 4xx/5xx.
# requests.HTTPError (from raise_for_status) is intentionally excluded.
TRANSIENT_ERRORS = (
TimeoutError,
ConnectionError,
OSError,
requests.RequestException,
RequestsConnectionError,
RequestsTimeout,
ChunkedEncodingError,
)
@@ -43,6 +48,8 @@ def with_retry(max_retries: int = 3, base_delay: float = 2.0, max_delay: float =
for attempt in range(1 + max_retries):
try:
return func(*args, **kwargs)
except requests.HTTPError:
raise
except TRANSIENT_ERRORS as exc:
last_exc = exc
if attempt < max_retries:
@@ -38,10 +38,6 @@
"main",
"routers.health"
],
"GET /api/live-data": [
"main",
"routers.data"
],
"GET /api/live-data/fast": [
"main",
"routers.data"
@@ -65,3 +65,68 @@ class TestLiveDataFullEndpoint:
r2 = client.get("/api/live-data", headers={"If-None-Match": etag})
assert r2.status_code == 304
assert r2.headers.get("etag") == etag
def test_live_data_serializes_non_json_native_values(self, client):
from datetime import datetime, timezone
from services.fetchers import _store
with _store._data_lock:
prior = _store.latest_data.get("gdelt")
_store.latest_data["gdelt"] = [
{"observed": datetime(2026, 1, 1, tzinfo=timezone.utc)},
]
try:
r = client.get("/api/live-data")
assert r.status_code == 200
assert "2026-01-01" in r.text
finally:
with _store._data_lock:
_store.latest_data["gdelt"] = prior
class TestSlowTaskConcurrency:
def test_run_tasks_caps_batch_size_to_executor_workers(self, monkeypatch):
from unittest.mock import MagicMock
import services.data_fetcher as df
class _FakeExecutor:
_max_workers = 2
def submit(self, func):
return MagicMock()
mock_executor = _FakeExecutor()
monkeypatch.setattr(df, "_executor_for_task_label", lambda _label: mock_executor)
monkeypatch.setattr(df, "_SLOW_FETCH_CONCURRENCY", 8)
batch_sizes = []
def _capture_drain(_label, futures):
batch_sizes.append(len(futures))
monkeypatch.setattr(df, "_drain_task_futures", _capture_drain)
jobs = [lambda: None for _ in range(5)]
df._run_tasks("slow-tier-test", jobs)
assert batch_sizes == [2, 2, 1]
class TestFetcherRetryScope:
def test_http_error_is_not_retried(self, monkeypatch):
import requests
from services.fetchers.retry import with_retry
attempts = {"n": 0}
@with_retry(max_retries=2, base_delay=0.01)
def _raises_http():
attempts["n"] += 1
raise requests.HTTPError("403 Client Error")
with pytest.raises(requests.HTTPError):
_raises_http()
assert attempts["n"] == 1