Address #375 production-readiness: dev bind, live-data lock, heavy fetch pool.

Default python main.py to loopback, deep-copy dashboard snapshots outside the store lock with ETag on full live-data, and route GDELT/LiveUAMap/CCTV/slow-tier work through an isolated executor so Playwright jobs cannot starve fast-tier workers.

Co-authored-by: Cursor <cursoragent@cursor.com>
This commit is contained in:
BigBodyCobain
2026-06-04 17:29:04 -06:00
parent 80a01275ff
commit 9a0a9a116a
8 changed files with 209 additions and 23 deletions
+10 -3
View File
@@ -600,9 +600,16 @@ Open `http://localhost:3000` to view the dashboard.
> **Deploying publicly or on a LAN?** No configuration needed for most setups.
> The frontend proxies all API calls through the Next.js server to `BACKEND_URL`,
> which defaults to `http://backend:8000` (Docker internal networking).
> Host port `8000` is only published for local API/debug access. If it conflicts
> with another service, set `BACKEND_PORT=8001` in `.env`; leave `BACKEND_URL`
> as `http://backend:8000` because that is the Docker-internal port.
> Host port `8000` is only published for local API/debug access (`127.0.0.1:8000`
> in `docker-compose.yml`). If it conflicts with another service, set
> `BACKEND_PORT=8001` in `.env`; leave `BACKEND_URL` as `http://backend:8000`
> because that is the Docker-internal port.
>
> **Running the backend outside Docker** (`cd backend && python main.py`):
> the dev server binds **loopback only** (`127.0.0.1:8000`) so other machines on
> your LAN cannot hit admin/local-trust routes with an empty `ADMIN_KEY`. Set
> `SHADOWBROKER_DEV_BIND_ALL=true` in `.env` only when you deliberately need
> `0.0.0.0` and use a strong `ADMIN_KEY` for any non-local callers.
> The backend memory cap is controlled by `BACKEND_MEMORY_LIMIT` and defaults
> to `4G`. If Docker reports OOM events, the backend will restart and slow
> layers can look empty until they repopulate.
+9
View File
@@ -18,6 +18,15 @@ AIS_API_KEY= # https://aisstream.io/ — free tier WebSocket key
# AISHUB_USERNAME=
# AISHUB_POLL_INTERVAL_MINUTES=20
# `python main.py` (uvicorn reload) binds 127.0.0.1:8000 by default so LAN clients
# cannot reach a dev server with empty ADMIN_KEY (#375). Set true only when you
# intentionally need 0.0.0.0 and understand the local-trust implications.
# SHADOWBROKER_DEV_BIND_ALL=false
#
# Thread pool for GDELT, LiveUAMap, CCTV ingest, and slow-tier refresh batches.
# Keeps heavy jobs from starving fast flight/ship workers (default 2).
# SHADOWBROKER_HEAVY_FETCH_WORKERS=2
# Override allowed CORS origins (comma-separated). Defaults to localhost + LAN auto-detect.
# CORS_ORIGINS=http://192.168.1.50:3000,https://my-domain.com
+43 -2
View File
@@ -3851,7 +3851,17 @@ async def update_layers(update: LayerUpdate, request: Request):
@app.get("/api/live-data")
@limiter.limit("120/minute")
async def live_data(request: Request):
return get_latest_data()
etag = _current_etag(prefix="live|full|")
if request.headers.get("if-none-match") == etag:
return Response(status_code=304, headers={"ETag": etag, "Cache-Control": "no-cache"})
from services.fetchers._store import get_latest_data_deepcopy_snapshot
payload = get_latest_data_deepcopy_snapshot()
return Response(
content=orjson.dumps(_sanitize_payload(payload)),
media_type="application/json",
headers={"ETag": etag, "Cache-Control": "no-cache"},
)
def _etag_response(request: Request, payload: dict, prefix: str = "", default=None):
@@ -12044,5 +12054,36 @@ async def system_update(request: Request):
return result
def _dev_uvicorn_bind_host() -> str:
"""Default loopback for `python main.py` so LAN clients cannot reach a dev server (#375).
Docker compose still publishes 127.0.0.1:8000; the dashboard stays on :3000.
Set SHADOWBROKER_DEV_BIND_ALL=true only when you intentionally need LAN access
(and use ADMIN_KEY for remote callers).
"""
if str(os.environ.get("SHADOWBROKER_DEV_BIND_ALL", "")).strip().lower() in {
"1",
"true",
"yes",
"on",
}:
return "0.0.0.0"
return "127.0.0.1"
if __name__ == "__main__":
uvicorn.run("main:app", host="0.0.0.0", port=8000, reload=True, timeout_keep_alive=120)
_host = _dev_uvicorn_bind_host()
_port = int(os.environ.get("BACKEND_PORT", "8000"))
if _host == "127.0.0.1":
logger.info(
"Dev server binding %s:%s (loopback). Set SHADOWBROKER_DEV_BIND_ALL=true for 0.0.0.0.",
_host,
_port,
)
uvicorn.run(
"main:app",
host=_host,
port=_port,
reload=True,
timeout_keep_alive=120,
)
+12 -2
View File
@@ -9,7 +9,7 @@ from fastapi.responses import JSONResponse
from pydantic import BaseModel
from limiter import limiter
from auth import require_admin, require_local_operator
from services.data_fetcher import get_latest_data, update_all_data
from services.data_fetcher import update_all_data
import orjson
import json as json_mod
@@ -554,7 +554,17 @@ async def update_layers(update: LayerUpdate, request: Request):
@router.get("/api/live-data")
@limiter.limit("120/minute")
async def live_data(request: Request):
return get_latest_data()
etag = _current_etag(prefix="live|full|")
if request.headers.get("if-none-match") == etag:
return Response(status_code=304, headers={"ETag": etag, "Cache-Control": "no-cache"})
from services.fetchers._store import get_latest_data_deepcopy_snapshot
payload = get_latest_data_deepcopy_snapshot()
return Response(
content=orjson.dumps(_sanitize_payload(payload)),
media_type="application/json",
headers={"ETag": etag, "Cache-Control": "no-cache"},
)
@router.get("/api/bootstrap/critical")
+50 -6
View File
@@ -145,13 +145,18 @@ _STARTUP_HEAVY_REFRESH_DELAY_S = float(os.environ.get("SHADOWBROKER_STARTUP_HEAV
_STARTUP_HEAVY_REFRESH_STARTED = False
_STARTUP_HEAVY_REFRESH_LOCK = threading.Lock()
_FETCH_WORKERS = int(os.environ.get("SHADOWBROKER_FETCH_WORKERS", "8"))
_HEAVY_FETCH_WORKERS = int(os.environ.get("SHADOWBROKER_HEAVY_FETCH_WORKERS", "2"))
_SLOW_FETCH_CONCURRENCY = int(os.environ.get("SHADOWBROKER_SLOW_FETCH_CONCURRENCY", "4"))
_STARTUP_HEAVY_CONCURRENCY = int(os.environ.get("SHADOWBROKER_STARTUP_HEAVY_CONCURRENCY", "2"))
# Shared thread pool — reused across all fetch cycles instead of creating/destroying per tick
# Fast-tier pool (flights, ships, sigint, …). Slow / heavy work uses a separate pool
# so Playwright, GDELT, CCTV ingest, etc. cannot starve the 60s refresh path (#375).
_SHARED_EXECUTOR = concurrent.futures.ThreadPoolExecutor(
max_workers=max(2, _FETCH_WORKERS), thread_name_prefix="fetch"
)
_SLOW_EXECUTOR = concurrent.futures.ThreadPoolExecutor(
max_workers=max(1, _HEAVY_FETCH_WORKERS), thread_name_prefix="fetch-slow"
)
def _cache_json_safe(value):
@@ -320,10 +325,42 @@ def seed_startup_caches() -> None:
# ---------------------------------------------------------------------------
# Scheduler & Orchestration
# ---------------------------------------------------------------------------
def _executor_for_task_label(label: str) -> concurrent.futures.ThreadPoolExecutor:
if label.startswith(("slow-tier", "startup-heavy")):
return _SLOW_EXECUTOR
return _SHARED_EXECUTOR
def _run_task_with_health_on_executor(
executor: concurrent.futures.ThreadPoolExecutor,
func,
name: str | None = None,
) -> None:
"""Run a scheduled job on the given pool so it cannot starve fast-tier workers."""
task_name = name or getattr(func, "__name__", "task")
future = executor.submit(func)
start = time.perf_counter()
try:
future.result(timeout=_TASK_HARD_TIMEOUT_S)
duration = time.perf_counter() - start
from services.fetch_health import record_success
record_success(task_name, duration_s=duration)
if duration > _SLOW_FETCH_S:
logger.warning("task slow: %s took %.2f}s", task_name, duration)
except Exception as e:
duration = time.perf_counter() - start
from services.fetch_health import record_failure
record_failure(task_name, error=e, duration_s=duration)
logger.exception("task failed: %s", task_name)
def _run_tasks(label: str, funcs: list, *, max_concurrency: int | None = None):
"""Run tasks concurrently and log any exceptions (do not fail silently)."""
if not funcs:
return
executor = _executor_for_task_label(label)
if max_concurrency is None:
if label.startswith("slow-tier"):
max_concurrency = _SLOW_FETCH_CONCURRENCY
@@ -336,7 +373,7 @@ def _run_tasks(label: str, funcs: list, *, max_concurrency: int | None = None):
remaining_funcs = list(funcs)
while remaining_funcs:
batch, remaining_funcs = remaining_funcs[:max_concurrency], remaining_funcs[max_concurrency:]
futures = {_SHARED_EXECUTOR.submit(func): (func.__name__, time.perf_counter()) for func in batch}
futures = {executor.submit(func): (func.__name__, time.perf_counter()) for func in batch}
_drain_task_futures(label, futures)
@@ -865,7 +902,7 @@ def start_scheduler():
# GDELT — every 30 minutes (downloads 32 ZIP files per call, avoid rate limits)
_scheduler.add_job(
lambda: _run_task_with_health(fetch_gdelt, "fetch_gdelt"),
lambda: _run_task_with_health_on_executor(_SLOW_EXECUTOR, fetch_gdelt, "fetch_gdelt"),
"interval",
minutes=30,
id="gdelt",
@@ -873,7 +910,9 @@ def start_scheduler():
misfire_grace_time=120,
)
_scheduler.add_job(
lambda: _run_task_with_health(update_liveuamap, "update_liveuamap"),
lambda: _run_task_with_health_on_executor(
_SLOW_EXECUTOR, update_liveuamap, "update_liveuamap"
),
"interval",
minutes=30,
id="liveuamap",
@@ -934,7 +973,9 @@ def start_scheduler():
logger.warning(f"CCTV post-ingest refresh failed: {e}")
_scheduler.add_job(
_run_cctv_ingest_cycle,
lambda: _run_task_with_health_on_executor(
_SLOW_EXECUTOR, _run_cctv_ingest_cycle, "cctv_ingest_cycle"
),
"interval",
minutes=10,
id="cctv_ingest",
@@ -1151,7 +1192,10 @@ def start_scheduler():
def stop_scheduler():
if _scheduler:
_scheduler.shutdown(wait=False)
_SLOW_EXECUTOR.shutdown(wait=False, cancel_futures=True)
def get_latest_data():
return get_latest_data_subset(*latest_data.keys())
from services.fetchers._store import get_latest_data_deepcopy_snapshot
return get_latest_data_deepcopy_snapshot()
+14 -8
View File
@@ -241,16 +241,22 @@ def get_active_layers_version() -> int:
def get_latest_data_subset(*keys: str) -> DashboardData:
"""Return a deep snapshot of only the requested top-level keys.
This avoids cloning the entire dashboard store for endpoints that only need
a small tier-specific subset. Deep copy ensures callers cannot mutate
nested structures (e.g. individual flight dicts) and affect the live store.
Grabs references under the lock, then deep-copies outside it so fetcher
writers are not blocked for the duration of a large clone (#375).
"""
with _data_lock:
snap: DashboardData = {}
for key in keys:
value = latest_data.get(key)
snap[key] = copy.deepcopy(value)
return snap
items = [(key, latest_data.get(key)) for key in keys]
snap: DashboardData = {}
for key, value in items:
snap[key] = copy.deepcopy(value)
return snap
def get_latest_data_deepcopy_snapshot() -> DashboardData:
"""Deep-copy the full dashboard for legacy /api/live-data consumers."""
with _data_lock:
items = list(latest_data.items())
return {key: copy.deepcopy(value) for key, value in items}
def get_latest_data_subset_refs(*keys: str) -> DashboardData:
+4 -2
View File
@@ -22,9 +22,11 @@ class TestHealthEndpoint:
class TestLiveDataEndpoints:
def test_live_data_returns_200(self, client):
def test_live_data_returns_200_or_304(self, client):
r = client.get("/api/live-data")
assert r.status_code == 200
assert r.status_code in (200, 304)
if r.status_code == 200:
assert r.headers.get("etag")
def test_live_data_fast_returns_200_or_304(self, client):
r = client.get("/api/live-data/fast")
@@ -0,0 +1,67 @@
"""Regression tests for GitHub #375 production-readiness fixes."""
import os
import pytest
class TestDevBindHost:
def test_defaults_to_loopback(self, monkeypatch):
monkeypatch.delenv("SHADOWBROKER_DEV_BIND_ALL", raising=False)
from main import _dev_uvicorn_bind_host
assert _dev_uvicorn_bind_host() == "127.0.0.1"
@pytest.mark.parametrize("value", ("1", "true", "yes", "on", "TRUE"))
def test_bind_all_opt_in(self, monkeypatch, value):
monkeypatch.setenv("SHADOWBROKER_DEV_BIND_ALL", value)
from main import _dev_uvicorn_bind_host
assert _dev_uvicorn_bind_host() == "0.0.0.0"
class TestDataStoreSnapshots:
def test_deepcopy_snapshot_isolated_from_store(self):
from services.fetchers import _store
original = [{"title": "baseline"}]
with _store._data_lock:
_store.latest_data["news"] = list(original)
snap = _store.get_latest_data_deepcopy_snapshot()
snap["news"][0]["title"] = "mutated"
with _store._data_lock:
assert _store.latest_data["news"][0]["title"] == "baseline"
def test_subset_deepcopy_isolated(self):
from services.fetchers import _store
with _store._data_lock:
_store.latest_data["news"] = [{"title": "subset"}]
snap = _store.get_latest_data_subset("news")
snap["news"][0]["title"] = "changed"
with _store._data_lock:
assert _store.latest_data["news"][0]["title"] == "subset"
class TestHeavyFetchExecutorRouting:
def test_slow_tier_uses_slow_executor(self):
from services.data_fetcher import (
_SLOW_EXECUTOR,
_SHARED_EXECUTOR,
_executor_for_task_label,
)
assert _executor_for_task_label("slow-tier-refresh") is _SLOW_EXECUTOR
assert _executor_for_task_label("startup-heavy-warm") is _SLOW_EXECUTOR
assert _executor_for_task_label("fast-tier-refresh") is _SHARED_EXECUTOR
class TestLiveDataFullEndpoint:
def test_live_data_supports_etag_304(self, client):
r1 = client.get("/api/live-data")
assert r1.status_code == 200
etag = r1.headers.get("etag")
assert etag
r2 = client.get("/api/live-data", headers={"If-None-Match": etag})
assert r2.status_code == 304
assert r2.headers.get("etag") == etag