diff --git a/README.md b/README.md index 5ef2af9..64f58d2 100644 --- a/README.md +++ b/README.md @@ -600,9 +600,16 @@ Open `http://localhost:3000` to view the dashboard. > **Deploying publicly or on a LAN?** No configuration needed for most setups. > The frontend proxies all API calls through the Next.js server to `BACKEND_URL`, > which defaults to `http://backend:8000` (Docker internal networking). -> Host port `8000` is only published for local API/debug access. If it conflicts -> with another service, set `BACKEND_PORT=8001` in `.env`; leave `BACKEND_URL` -> as `http://backend:8000` because that is the Docker-internal port. +> Host port `8000` is only published for local API/debug access (`127.0.0.1:8000` +> in `docker-compose.yml`). If it conflicts with another service, set +> `BACKEND_PORT=8001` in `.env`; leave `BACKEND_URL` as `http://backend:8000` +> because that is the Docker-internal port. +> +> **Running the backend outside Docker** (`cd backend && python main.py`): +> the dev server binds **loopback only** (`127.0.0.1:8000`) so other machines on +> your LAN cannot hit admin/local-trust routes with an empty `ADMIN_KEY`. Set +> `SHADOWBROKER_DEV_BIND_ALL=true` in `.env` only when you deliberately need +> `0.0.0.0` and use a strong `ADMIN_KEY` for any non-local callers. > The backend memory cap is controlled by `BACKEND_MEMORY_LIMIT` and defaults > to `4G`. If Docker reports OOM events, the backend will restart and slow > layers can look empty until they repopulate. diff --git a/backend/.env.example b/backend/.env.example index b5f3bb6..c536109 100644 --- a/backend/.env.example +++ b/backend/.env.example @@ -18,6 +18,15 @@ AIS_API_KEY= # https://aisstream.io/ — free tier WebSocket key # AISHUB_USERNAME= # AISHUB_POLL_INTERVAL_MINUTES=20 +# `python main.py` (uvicorn reload) binds 127.0.0.1:8000 by default so LAN clients +# cannot reach a dev server with empty ADMIN_KEY (#375). Set true only when you +# intentionally need 0.0.0.0 and understand the local-trust implications. +# SHADOWBROKER_DEV_BIND_ALL=false +# +# Thread pool for GDELT, LiveUAMap, CCTV ingest, and slow-tier refresh batches. +# Keeps heavy jobs from starving fast flight/ship workers (default 2). +# SHADOWBROKER_HEAVY_FETCH_WORKERS=2 + # Override allowed CORS origins (comma-separated). Defaults to localhost + LAN auto-detect. # CORS_ORIGINS=http://192.168.1.50:3000,https://my-domain.com diff --git a/backend/main.py b/backend/main.py index 9f83644..8549b6b 100644 --- a/backend/main.py +++ b/backend/main.py @@ -3851,7 +3851,17 @@ async def update_layers(update: LayerUpdate, request: Request): @app.get("/api/live-data") @limiter.limit("120/minute") async def live_data(request: Request): - return get_latest_data() + etag = _current_etag(prefix="live|full|") + if request.headers.get("if-none-match") == etag: + return Response(status_code=304, headers={"ETag": etag, "Cache-Control": "no-cache"}) + from services.fetchers._store import get_latest_data_deepcopy_snapshot + + payload = get_latest_data_deepcopy_snapshot() + return Response( + content=orjson.dumps(_sanitize_payload(payload)), + media_type="application/json", + headers={"ETag": etag, "Cache-Control": "no-cache"}, + ) def _etag_response(request: Request, payload: dict, prefix: str = "", default=None): @@ -12044,5 +12054,36 @@ async def system_update(request: Request): return result +def _dev_uvicorn_bind_host() -> str: + """Default loopback for `python main.py` so LAN clients cannot reach a dev server (#375). + + Docker compose still publishes 127.0.0.1:8000; the dashboard stays on :3000. + Set SHADOWBROKER_DEV_BIND_ALL=true only when you intentionally need LAN access + (and use ADMIN_KEY for remote callers). + """ + if str(os.environ.get("SHADOWBROKER_DEV_BIND_ALL", "")).strip().lower() in { + "1", + "true", + "yes", + "on", + }: + return "0.0.0.0" + return "127.0.0.1" + + if __name__ == "__main__": - uvicorn.run("main:app", host="0.0.0.0", port=8000, reload=True, timeout_keep_alive=120) + _host = _dev_uvicorn_bind_host() + _port = int(os.environ.get("BACKEND_PORT", "8000")) + if _host == "127.0.0.1": + logger.info( + "Dev server binding %s:%s (loopback). Set SHADOWBROKER_DEV_BIND_ALL=true for 0.0.0.0.", + _host, + _port, + ) + uvicorn.run( + "main:app", + host=_host, + port=_port, + reload=True, + timeout_keep_alive=120, + ) diff --git a/backend/routers/data.py b/backend/routers/data.py index 54749b0..82c3971 100644 --- a/backend/routers/data.py +++ b/backend/routers/data.py @@ -9,7 +9,7 @@ from fastapi.responses import JSONResponse from pydantic import BaseModel from limiter import limiter from auth import require_admin, require_local_operator -from services.data_fetcher import get_latest_data, update_all_data +from services.data_fetcher import update_all_data import orjson import json as json_mod @@ -554,7 +554,17 @@ async def update_layers(update: LayerUpdate, request: Request): @router.get("/api/live-data") @limiter.limit("120/minute") async def live_data(request: Request): - return get_latest_data() + etag = _current_etag(prefix="live|full|") + if request.headers.get("if-none-match") == etag: + return Response(status_code=304, headers={"ETag": etag, "Cache-Control": "no-cache"}) + from services.fetchers._store import get_latest_data_deepcopy_snapshot + + payload = get_latest_data_deepcopy_snapshot() + return Response( + content=orjson.dumps(_sanitize_payload(payload)), + media_type="application/json", + headers={"ETag": etag, "Cache-Control": "no-cache"}, + ) @router.get("/api/bootstrap/critical") diff --git a/backend/services/data_fetcher.py b/backend/services/data_fetcher.py index 5db0193..459c5f8 100644 --- a/backend/services/data_fetcher.py +++ b/backend/services/data_fetcher.py @@ -145,13 +145,18 @@ _STARTUP_HEAVY_REFRESH_DELAY_S = float(os.environ.get("SHADOWBROKER_STARTUP_HEAV _STARTUP_HEAVY_REFRESH_STARTED = False _STARTUP_HEAVY_REFRESH_LOCK = threading.Lock() _FETCH_WORKERS = int(os.environ.get("SHADOWBROKER_FETCH_WORKERS", "8")) +_HEAVY_FETCH_WORKERS = int(os.environ.get("SHADOWBROKER_HEAVY_FETCH_WORKERS", "2")) _SLOW_FETCH_CONCURRENCY = int(os.environ.get("SHADOWBROKER_SLOW_FETCH_CONCURRENCY", "4")) _STARTUP_HEAVY_CONCURRENCY = int(os.environ.get("SHADOWBROKER_STARTUP_HEAVY_CONCURRENCY", "2")) -# Shared thread pool — reused across all fetch cycles instead of creating/destroying per tick +# Fast-tier pool (flights, ships, sigint, …). Slow / heavy work uses a separate pool +# so Playwright, GDELT, CCTV ingest, etc. cannot starve the 60s refresh path (#375). _SHARED_EXECUTOR = concurrent.futures.ThreadPoolExecutor( max_workers=max(2, _FETCH_WORKERS), thread_name_prefix="fetch" ) +_SLOW_EXECUTOR = concurrent.futures.ThreadPoolExecutor( + max_workers=max(1, _HEAVY_FETCH_WORKERS), thread_name_prefix="fetch-slow" +) def _cache_json_safe(value): @@ -320,10 +325,42 @@ def seed_startup_caches() -> None: # --------------------------------------------------------------------------- # Scheduler & Orchestration # --------------------------------------------------------------------------- +def _executor_for_task_label(label: str) -> concurrent.futures.ThreadPoolExecutor: + if label.startswith(("slow-tier", "startup-heavy")): + return _SLOW_EXECUTOR + return _SHARED_EXECUTOR + + +def _run_task_with_health_on_executor( + executor: concurrent.futures.ThreadPoolExecutor, + func, + name: str | None = None, +) -> None: + """Run a scheduled job on the given pool so it cannot starve fast-tier workers.""" + task_name = name or getattr(func, "__name__", "task") + future = executor.submit(func) + start = time.perf_counter() + try: + future.result(timeout=_TASK_HARD_TIMEOUT_S) + duration = time.perf_counter() - start + from services.fetch_health import record_success + + record_success(task_name, duration_s=duration) + if duration > _SLOW_FETCH_S: + logger.warning("task slow: %s took %.2f}s", task_name, duration) + except Exception as e: + duration = time.perf_counter() - start + from services.fetch_health import record_failure + + record_failure(task_name, error=e, duration_s=duration) + logger.exception("task failed: %s", task_name) + + def _run_tasks(label: str, funcs: list, *, max_concurrency: int | None = None): """Run tasks concurrently and log any exceptions (do not fail silently).""" if not funcs: return + executor = _executor_for_task_label(label) if max_concurrency is None: if label.startswith("slow-tier"): max_concurrency = _SLOW_FETCH_CONCURRENCY @@ -336,7 +373,7 @@ def _run_tasks(label: str, funcs: list, *, max_concurrency: int | None = None): remaining_funcs = list(funcs) while remaining_funcs: batch, remaining_funcs = remaining_funcs[:max_concurrency], remaining_funcs[max_concurrency:] - futures = {_SHARED_EXECUTOR.submit(func): (func.__name__, time.perf_counter()) for func in batch} + futures = {executor.submit(func): (func.__name__, time.perf_counter()) for func in batch} _drain_task_futures(label, futures) @@ -865,7 +902,7 @@ def start_scheduler(): # GDELT — every 30 minutes (downloads 32 ZIP files per call, avoid rate limits) _scheduler.add_job( - lambda: _run_task_with_health(fetch_gdelt, "fetch_gdelt"), + lambda: _run_task_with_health_on_executor(_SLOW_EXECUTOR, fetch_gdelt, "fetch_gdelt"), "interval", minutes=30, id="gdelt", @@ -873,7 +910,9 @@ def start_scheduler(): misfire_grace_time=120, ) _scheduler.add_job( - lambda: _run_task_with_health(update_liveuamap, "update_liveuamap"), + lambda: _run_task_with_health_on_executor( + _SLOW_EXECUTOR, update_liveuamap, "update_liveuamap" + ), "interval", minutes=30, id="liveuamap", @@ -934,7 +973,9 @@ def start_scheduler(): logger.warning(f"CCTV post-ingest refresh failed: {e}") _scheduler.add_job( - _run_cctv_ingest_cycle, + lambda: _run_task_with_health_on_executor( + _SLOW_EXECUTOR, _run_cctv_ingest_cycle, "cctv_ingest_cycle" + ), "interval", minutes=10, id="cctv_ingest", @@ -1151,7 +1192,10 @@ def start_scheduler(): def stop_scheduler(): if _scheduler: _scheduler.shutdown(wait=False) + _SLOW_EXECUTOR.shutdown(wait=False, cancel_futures=True) def get_latest_data(): - return get_latest_data_subset(*latest_data.keys()) + from services.fetchers._store import get_latest_data_deepcopy_snapshot + + return get_latest_data_deepcopy_snapshot() diff --git a/backend/services/fetchers/_store.py b/backend/services/fetchers/_store.py index 1751237..36a39b9 100644 --- a/backend/services/fetchers/_store.py +++ b/backend/services/fetchers/_store.py @@ -241,16 +241,22 @@ def get_active_layers_version() -> int: def get_latest_data_subset(*keys: str) -> DashboardData: """Return a deep snapshot of only the requested top-level keys. - This avoids cloning the entire dashboard store for endpoints that only need - a small tier-specific subset. Deep copy ensures callers cannot mutate - nested structures (e.g. individual flight dicts) and affect the live store. + Grabs references under the lock, then deep-copies outside it so fetcher + writers are not blocked for the duration of a large clone (#375). """ with _data_lock: - snap: DashboardData = {} - for key in keys: - value = latest_data.get(key) - snap[key] = copy.deepcopy(value) - return snap + items = [(key, latest_data.get(key)) for key in keys] + snap: DashboardData = {} + for key, value in items: + snap[key] = copy.deepcopy(value) + return snap + + +def get_latest_data_deepcopy_snapshot() -> DashboardData: + """Deep-copy the full dashboard for legacy /api/live-data consumers.""" + with _data_lock: + items = list(latest_data.items()) + return {key: copy.deepcopy(value) for key, value in items} def get_latest_data_subset_refs(*keys: str) -> DashboardData: diff --git a/backend/tests/test_api_smoke.py b/backend/tests/test_api_smoke.py index 27bae94..666dc57 100644 --- a/backend/tests/test_api_smoke.py +++ b/backend/tests/test_api_smoke.py @@ -22,9 +22,11 @@ class TestHealthEndpoint: class TestLiveDataEndpoints: - def test_live_data_returns_200(self, client): + def test_live_data_returns_200_or_304(self, client): r = client.get("/api/live-data") - assert r.status_code == 200 + assert r.status_code in (200, 304) + if r.status_code == 200: + assert r.headers.get("etag") def test_live_data_fast_returns_200_or_304(self, client): r = client.get("/api/live-data/fast") diff --git a/backend/tests/test_issue_375_production_hardening.py b/backend/tests/test_issue_375_production_hardening.py new file mode 100644 index 0000000..2bac5e9 --- /dev/null +++ b/backend/tests/test_issue_375_production_hardening.py @@ -0,0 +1,67 @@ +"""Regression tests for GitHub #375 production-readiness fixes.""" + +import os + +import pytest + + +class TestDevBindHost: + def test_defaults_to_loopback(self, monkeypatch): + monkeypatch.delenv("SHADOWBROKER_DEV_BIND_ALL", raising=False) + from main import _dev_uvicorn_bind_host + + assert _dev_uvicorn_bind_host() == "127.0.0.1" + + @pytest.mark.parametrize("value", ("1", "true", "yes", "on", "TRUE")) + def test_bind_all_opt_in(self, monkeypatch, value): + monkeypatch.setenv("SHADOWBROKER_DEV_BIND_ALL", value) + from main import _dev_uvicorn_bind_host + + assert _dev_uvicorn_bind_host() == "0.0.0.0" + + +class TestDataStoreSnapshots: + def test_deepcopy_snapshot_isolated_from_store(self): + from services.fetchers import _store + + original = [{"title": "baseline"}] + with _store._data_lock: + _store.latest_data["news"] = list(original) + snap = _store.get_latest_data_deepcopy_snapshot() + snap["news"][0]["title"] = "mutated" + with _store._data_lock: + assert _store.latest_data["news"][0]["title"] == "baseline" + + def test_subset_deepcopy_isolated(self): + from services.fetchers import _store + + with _store._data_lock: + _store.latest_data["news"] = [{"title": "subset"}] + snap = _store.get_latest_data_subset("news") + snap["news"][0]["title"] = "changed" + with _store._data_lock: + assert _store.latest_data["news"][0]["title"] == "subset" + + +class TestHeavyFetchExecutorRouting: + def test_slow_tier_uses_slow_executor(self): + from services.data_fetcher import ( + _SLOW_EXECUTOR, + _SHARED_EXECUTOR, + _executor_for_task_label, + ) + + assert _executor_for_task_label("slow-tier-refresh") is _SLOW_EXECUTOR + assert _executor_for_task_label("startup-heavy-warm") is _SLOW_EXECUTOR + assert _executor_for_task_label("fast-tier-refresh") is _SHARED_EXECUTOR + + +class TestLiveDataFullEndpoint: + def test_live_data_supports_etag_304(self, client): + r1 = client.get("/api/live-data") + assert r1.status_code == 200 + etag = r1.headers.get("etag") + assert etag + r2 = client.get("/api/live-data", headers={"If-None-Match": etag}) + assert r2.status_code == 304 + assert r2.headers.get("etag") == etag