mirror of
https://github.com/BigBodyCobain/Shadowbroker.git
synced 2026-05-07 18:06:49 +02:00
Stabilize Docker startup and runtime proxy
Reduce cold-start stalls by raising the default backend memory limit, bounding heavy feed concurrency, preserving non-empty startup caches, and refreshing working news feeds. Fix the Next API proxy for Docker control-plane writes by stripping unsupported hop/body headers and forwarding small request bodies safely. Keep the dashboard dynamic so production users do not get stuck on a cached startup shell.
This commit is contained in:
@@ -58,6 +58,15 @@ ADMIN_KEY=
|
||||
# FAST_STARTUP_CACHE_MAX_AGE_S=21600
|
||||
# INTEL_STARTUP_CACHE_MAX_AGE_S=21600
|
||||
|
||||
# Docker resource tuning. The backend synthesizes large geospatial feeds; keep
|
||||
# this at 4G or higher on hosts that run AIS, OpenSky, CCTV, satellites, and
|
||||
# threat feeds together. Lower caps can cause Docker OOM restarts and empty
|
||||
# slow layers such as news, UAP sightings, military bases, and wastewater.
|
||||
# BACKEND_MEMORY_LIMIT=4G
|
||||
# SHADOWBROKER_FETCH_WORKERS=8
|
||||
# SHADOWBROKER_SLOW_FETCH_CONCURRENCY=4
|
||||
# SHADOWBROKER_STARTUP_HEAVY_CONCURRENCY=2
|
||||
|
||||
# Google Earth Engine for VIIRS night lights change detection (optional).
|
||||
# pip install earthengine-api
|
||||
# GEE_SERVICE_ACCOUNT_KEY=
|
||||
|
||||
@@ -72,6 +72,8 @@ Open `http://localhost:3000` to view the dashboard! *(Requires [Docker Desktop](
|
||||
|
||||
> **Backend port already in use?** The browser only needs port `3000`, but the backend API is also published on host port `8000` for local diagnostics. If another app already uses `8000`, create or edit `.env` next to `docker-compose.yml` and set `BACKEND_PORT=8001`, then run `docker compose up -d`.
|
||||
|
||||
> **Blank news/UAP/bases/wastewater after several minutes?** Check for backend OOM restarts with `docker events --since 30m --filter container=shadowbroker-backend --filter event=oom`. The default compose file gives the backend 4GB; if your host has less memory, reduce enabled feeds or set `BACKEND_MEMORY_LIMIT=3G` and expect slower/heavier layers to warm more gradually.
|
||||
|
||||
> **Podman users:** Podman works, but `podman compose` is a wrapper and still needs a Compose provider installed. On Windows/WSL, if you see `looking up compose provider failed`, install `podman-compose` and run `podman-compose pull` followed by `podman-compose up -d` from inside the cloned `Shadowbroker` folder. On Linux/macOS/WSL shells you can also use `./compose.sh --engine podman pull` and `./compose.sh --engine podman up -d`.
|
||||
|
||||
---
|
||||
@@ -567,6 +569,9 @@ Open `http://localhost:3000` to view the dashboard.
|
||||
> Host port `8000` is only published for local API/debug access. If it conflicts
|
||||
> with another service, set `BACKEND_PORT=8001` in `.env`; leave `BACKEND_URL`
|
||||
> as `http://backend:8000` because that is the Docker-internal port.
|
||||
> The backend memory cap is controlled by `BACKEND_MEMORY_LIMIT` and defaults
|
||||
> to `4G`. If Docker reports OOM events, the backend will restart and slow
|
||||
> layers can look empty until they repopulate.
|
||||
>
|
||||
> If your backend runs on a **different host or port**, set `BACKEND_URL` at runtime — no rebuild required:
|
||||
>
|
||||
|
||||
@@ -1,15 +1,5 @@
|
||||
{
|
||||
"feeds": [
|
||||
{
|
||||
"name": "Reuters",
|
||||
"url": "https://www.reutersagency.com/feed/?best-topics=world",
|
||||
"weight": 5
|
||||
},
|
||||
{
|
||||
"name": "AP News",
|
||||
"url": "https://rsshub.app/apnews/topics/world-news",
|
||||
"weight": 5
|
||||
},
|
||||
{
|
||||
"name": "NPR",
|
||||
"url": "https://feeds.npr.org/1004/rss.xml",
|
||||
@@ -99,6 +89,26 @@
|
||||
"name": "Japan Times",
|
||||
"url": "https://www.japantimes.co.jp/feed/",
|
||||
"weight": 3
|
||||
},
|
||||
{
|
||||
"name": "CSM",
|
||||
"url": "https://www.csmonitor.com/rss/world",
|
||||
"weight": 4
|
||||
},
|
||||
{
|
||||
"name": "PBS NewsHour",
|
||||
"url": "https://www.pbs.org/newshour/feeds/rss/world",
|
||||
"weight": 4
|
||||
},
|
||||
{
|
||||
"name": "France 24",
|
||||
"url": "https://www.france24.com/en/rss",
|
||||
"weight": 4
|
||||
},
|
||||
{
|
||||
"name": "DW",
|
||||
"url": "https://rss.dw.com/xml/rss-en-world",
|
||||
"weight": 4
|
||||
}
|
||||
]
|
||||
}
|
||||
|
||||
@@ -134,15 +134,22 @@ _INTEL_STARTUP_CACHE_KEYS = (
|
||||
"trending_markets",
|
||||
"correlations",
|
||||
"fimi",
|
||||
"crowdthreat",
|
||||
"uap_sightings",
|
||||
"military_bases",
|
||||
"wastewater",
|
||||
)
|
||||
_STARTUP_PRIORITY_TIMEOUT_S = float(os.environ.get("SHADOWBROKER_STARTUP_PRIORITY_TIMEOUT_S", "18"))
|
||||
_STARTUP_HEAVY_REFRESH_DELAY_S = float(os.environ.get("SHADOWBROKER_STARTUP_HEAVY_REFRESH_DELAY_S", "90"))
|
||||
_STARTUP_HEAVY_REFRESH_STARTED = False
|
||||
_STARTUP_HEAVY_REFRESH_LOCK = threading.Lock()
|
||||
_FETCH_WORKERS = int(os.environ.get("SHADOWBROKER_FETCH_WORKERS", "8"))
|
||||
_SLOW_FETCH_CONCURRENCY = int(os.environ.get("SHADOWBROKER_SLOW_FETCH_CONCURRENCY", "4"))
|
||||
_STARTUP_HEAVY_CONCURRENCY = int(os.environ.get("SHADOWBROKER_STARTUP_HEAVY_CONCURRENCY", "2"))
|
||||
|
||||
# Shared thread pool — reused across all fetch cycles instead of creating/destroying per tick
|
||||
_SHARED_EXECUTOR = concurrent.futures.ThreadPoolExecutor(
|
||||
max_workers=20, thread_name_prefix="fetch"
|
||||
max_workers=max(2, _FETCH_WORKERS), thread_name_prefix="fetch"
|
||||
)
|
||||
|
||||
|
||||
@@ -156,6 +163,14 @@ def _cache_json_safe(value):
|
||||
return value
|
||||
|
||||
|
||||
def _has_cache_value(value) -> bool:
|
||||
if value is None:
|
||||
return False
|
||||
if isinstance(value, (list, tuple, dict, set)):
|
||||
return bool(value)
|
||||
return True
|
||||
|
||||
|
||||
def _load_fast_startup_cache_if_available() -> bool:
|
||||
"""Seed moving layers from a recent disk cache while live fetches warm up."""
|
||||
if _FAST_STARTUP_CACHE_MAX_AGE_S <= 0 or not _FAST_STARTUP_CACHE_PATH.exists():
|
||||
@@ -200,10 +215,15 @@ def _save_fast_startup_cache() -> None:
|
||||
"""Persist recent moving layers for the next cold start."""
|
||||
try:
|
||||
with _data_lock:
|
||||
layers = {
|
||||
key: latest_data.get(key)
|
||||
for key in _FAST_STARTUP_CACHE_KEYS
|
||||
if _has_cache_value(latest_data.get(key))
|
||||
}
|
||||
payload = {
|
||||
"cached_at": time.time(),
|
||||
"last_updated": latest_data.get("last_updated"),
|
||||
"layers": {key: latest_data.get(key) for key in _FAST_STARTUP_CACHE_KEYS},
|
||||
"layers": layers,
|
||||
"freshness": {
|
||||
key: source_timestamps.get(key)
|
||||
for key in _FAST_STARTUP_CACHE_KEYS
|
||||
@@ -264,10 +284,15 @@ def _save_intel_startup_cache() -> None:
|
||||
"""Persist compact right-side intelligence data for the next cold start."""
|
||||
try:
|
||||
with _data_lock:
|
||||
layers = {
|
||||
key: latest_data.get(key)
|
||||
for key in _INTEL_STARTUP_CACHE_KEYS
|
||||
if _has_cache_value(latest_data.get(key))
|
||||
}
|
||||
payload = {
|
||||
"cached_at": time.time(),
|
||||
"last_updated": latest_data.get("last_updated"),
|
||||
"layers": {key: latest_data.get(key) for key in _INTEL_STARTUP_CACHE_KEYS},
|
||||
"layers": layers,
|
||||
"freshness": {
|
||||
key: source_timestamps.get(key)
|
||||
for key in _INTEL_STARTUP_CACHE_KEYS
|
||||
@@ -294,11 +319,27 @@ def seed_startup_caches() -> None:
|
||||
# ---------------------------------------------------------------------------
|
||||
# Scheduler & Orchestration
|
||||
# ---------------------------------------------------------------------------
|
||||
def _run_tasks(label: str, funcs: list):
|
||||
def _run_tasks(label: str, funcs: list, *, max_concurrency: int | None = None):
|
||||
"""Run tasks concurrently and log any exceptions (do not fail silently)."""
|
||||
if not funcs:
|
||||
return
|
||||
futures = {_SHARED_EXECUTOR.submit(func): (func.__name__, time.perf_counter()) for func in funcs}
|
||||
if max_concurrency is None:
|
||||
if label.startswith("slow-tier"):
|
||||
max_concurrency = _SLOW_FETCH_CONCURRENCY
|
||||
elif label.startswith("startup-heavy"):
|
||||
max_concurrency = _STARTUP_HEAVY_CONCURRENCY
|
||||
else:
|
||||
max_concurrency = len(funcs)
|
||||
max_concurrency = max(1, min(max_concurrency, len(funcs)))
|
||||
|
||||
remaining_funcs = list(funcs)
|
||||
while remaining_funcs:
|
||||
batch, remaining_funcs = remaining_funcs[:max_concurrency], remaining_funcs[max_concurrency:]
|
||||
futures = {_SHARED_EXECUTOR.submit(func): (func.__name__, time.perf_counter()) for func in batch}
|
||||
_drain_task_futures(label, futures)
|
||||
|
||||
|
||||
def _drain_task_futures(label: str, futures: dict):
|
||||
# Iterate directly so future.result(timeout=...) is the blocking call.
|
||||
# as_completed() blocks inside __next__() waiting for completion — the timeout
|
||||
# on result() would never be reached for a hanging task under that pattern.
|
||||
@@ -486,6 +527,7 @@ def update_all_data(*, startup_mode: bool = False):
|
||||
priority_funcs = [
|
||||
fetch_airports,
|
||||
update_fast_data,
|
||||
fetch_news,
|
||||
fetch_gdelt,
|
||||
fetch_crowdthreat,
|
||||
fetch_firms_fires,
|
||||
@@ -520,55 +562,36 @@ def update_all_data(*, startup_mode: bool = False):
|
||||
except Exception as e:
|
||||
_record_fetch_failure("startup-priority", name, start, e)
|
||||
logger.info("Startup preload: deferring Playwright Liveuamap scraper to scheduled cadence")
|
||||
_save_intel_startup_cache()
|
||||
_schedule_delayed_startup_heavy_refresh()
|
||||
logger.info("Startup priority preload complete; slow synthesis is warming in background.")
|
||||
return
|
||||
futures = {
|
||||
_SHARED_EXECUTOR.submit(fetch_airports): ("fetch_airports", time.perf_counter()),
|
||||
_SHARED_EXECUTOR.submit(update_fast_data): ("update_fast_data", time.perf_counter()),
|
||||
_SHARED_EXECUTOR.submit(update_slow_data): ("update_slow_data", time.perf_counter()),
|
||||
_SHARED_EXECUTOR.submit(fetch_volcanoes): ("fetch_volcanoes", time.perf_counter()),
|
||||
_SHARED_EXECUTOR.submit(fetch_viirs_change_nodes): ("fetch_viirs_change_nodes", time.perf_counter()),
|
||||
_SHARED_EXECUTOR.submit(fetch_unusual_whales): ("fetch_unusual_whales", time.perf_counter()),
|
||||
_SHARED_EXECUTOR.submit(fetch_fimi): ("fetch_fimi", time.perf_counter()),
|
||||
_SHARED_EXECUTOR.submit(fetch_gdelt): ("fetch_gdelt", time.perf_counter()),
|
||||
_SHARED_EXECUTOR.submit(fetch_uap_sightings): ("fetch_uap_sightings", time.perf_counter()),
|
||||
_SHARED_EXECUTOR.submit(fetch_wastewater): ("fetch_wastewater", time.perf_counter()),
|
||||
_SHARED_EXECUTOR.submit(fetch_crowdthreat): ("fetch_crowdthreat", time.perf_counter()),
|
||||
_SHARED_EXECUTOR.submit(fetch_sar_catalog): ("fetch_sar_catalog", time.perf_counter()),
|
||||
_SHARED_EXECUTOR.submit(fetch_sar_products): ("fetch_sar_products", time.perf_counter()),
|
||||
}
|
||||
refresh_funcs = [
|
||||
fetch_airports,
|
||||
update_fast_data,
|
||||
update_slow_data,
|
||||
fetch_volcanoes,
|
||||
fetch_viirs_change_nodes,
|
||||
fetch_unusual_whales,
|
||||
fetch_fimi,
|
||||
fetch_gdelt,
|
||||
fetch_uap_sightings,
|
||||
fetch_wastewater,
|
||||
fetch_crowdthreat,
|
||||
fetch_sar_catalog,
|
||||
fetch_sar_products,
|
||||
]
|
||||
if not startup_mode or not meshtastic_seeded:
|
||||
futures[_SHARED_EXECUTOR.submit(fetch_meshtastic_nodes)] = (
|
||||
"fetch_meshtastic_nodes",
|
||||
time.perf_counter(),
|
||||
)
|
||||
refresh_funcs.append(fetch_meshtastic_nodes)
|
||||
else:
|
||||
logger.info(
|
||||
"Startup preload: Meshtastic cache already loaded, deferring remote map refresh to scheduled cadence"
|
||||
)
|
||||
if not startup_mode:
|
||||
futures[_SHARED_EXECUTOR.submit(update_liveuamap)] = (
|
||||
"update_liveuamap",
|
||||
time.perf_counter(),
|
||||
)
|
||||
refresh_funcs.append(update_liveuamap)
|
||||
else:
|
||||
logger.info("Startup preload: deferring Playwright Liveuamap scraper to scheduled cadence")
|
||||
for future, (name, start) in futures.items():
|
||||
try:
|
||||
future.result(timeout=_TASK_HARD_TIMEOUT_S)
|
||||
duration = time.perf_counter() - start
|
||||
from services.fetch_health import record_success
|
||||
|
||||
record_success(name, duration_s=duration)
|
||||
if duration > _SLOW_FETCH_S:
|
||||
logger.warning(f"full-refresh task slow: {name} took {duration:.2f}s")
|
||||
except Exception as e:
|
||||
duration = time.perf_counter() - start
|
||||
from services.fetch_health import record_failure
|
||||
|
||||
record_failure(name, error=e, duration_s=duration)
|
||||
logger.exception(f"full-refresh task failed: {name}")
|
||||
_run_tasks("full-refresh", refresh_funcs, max_concurrency=_STARTUP_HEAVY_CONCURRENCY)
|
||||
# Run CCTV ingest immediately so cameras are available on first request
|
||||
# (the scheduled job also runs every 10 min for ongoing refresh).
|
||||
if startup_mode:
|
||||
|
||||
@@ -15,6 +15,8 @@ _FEED_URL_REPLACEMENTS = {
|
||||
"https://www.channelnewsasia.com/rssfeed/8395986": "https://www.channelnewsasia.com/api/v1/rss-outbound-feed?_format=xml",
|
||||
}
|
||||
_DEAD_FEED_URLS = {
|
||||
"https://www.reutersagency.com/feed/?best-topics=world",
|
||||
"https://rsshub.app/apnews/topics/world-news",
|
||||
"https://www3.nhk.or.jp/nhkworld/rss/world.xml",
|
||||
"https://focustaiwan.tw/rss",
|
||||
"https://english.kyodonews.net/rss/news.xml",
|
||||
@@ -29,6 +31,11 @@ DEFAULT_FEEDS = [
|
||||
{"name": "AlJazeera", "url": "https://www.aljazeera.com/xml/rss/all.xml", "weight": 2},
|
||||
{"name": "NYT", "url": "https://rss.nytimes.com/services/xml/rss/nyt/World.xml", "weight": 1},
|
||||
{"name": "GDACS", "url": "https://www.gdacs.org/xml/rss.xml", "weight": 5},
|
||||
{"name": "The War Zone", "url": "https://www.twz.com/feed", "weight": 4},
|
||||
{"name": "Bellingcat", "url": "https://www.bellingcat.com/feed/", "weight": 4},
|
||||
{"name": "Guardian", "url": "https://www.theguardian.com/world/rss", "weight": 3},
|
||||
{"name": "TASS", "url": "https://tass.com/rss/v2.xml", "weight": 2},
|
||||
{"name": "Xinhua", "url": "http://www.news.cn/english/rss/worldrss.xml", "weight": 2},
|
||||
{"name": "CNA", "url": "https://www.channelnewsasia.com/api/v1/rss-outbound-feed?_format=xml", "weight": 3},
|
||||
{"name": "Mercopress", "url": "https://en.mercopress.com/rss/", "weight": 3},
|
||||
{"name": "SCMP", "url": "https://www.scmp.com/rss/91/feed", "weight": 4},
|
||||
@@ -73,7 +80,9 @@ def get_feeds() -> list[dict]:
|
||||
normalised = _normalise_feeds(feeds)
|
||||
if normalised != feeds:
|
||||
save_feeds(normalised)
|
||||
return normalised
|
||||
if normalised:
|
||||
return normalised
|
||||
logger.warning("News feed configuration contained no usable feeds; falling back to defaults")
|
||||
except (IOError, OSError, json.JSONDecodeError, ValueError) as e:
|
||||
logger.warning(f"Failed to read news feed config: {e}")
|
||||
return list(DEFAULT_FEEDS)
|
||||
|
||||
@@ -145,6 +145,10 @@ class TestFeedConfig:
|
||||
|
||||
def test_new_east_asia_feeds_present(self):
|
||||
names = {f["name"] for f in DEFAULT_FEEDS}
|
||||
expected = {"FocusTaiwan", "Kyodo", "SCMP", "The Diplomat", "Stars and Stripes",
|
||||
"Yonhap", "Nikkei Asia", "Taipei Times", "Asia Times", "Defense News", "Japan Times"}
|
||||
expected = {"SCMP", "The Diplomat", "Yonhap", "Asia Times", "Defense News", "Japan Times"}
|
||||
assert expected.issubset(names)
|
||||
|
||||
def test_known_dead_feeds_are_not_defaulted(self):
|
||||
urls = {f["url"] for f in DEFAULT_FEEDS}
|
||||
assert "https://www.reutersagency.com/feed/?best-topics=world" not in urls
|
||||
assert "https://rsshub.app/apnews/topics/world-news" not in urls
|
||||
|
||||
+1
-1
@@ -54,7 +54,7 @@ services:
|
||||
deploy:
|
||||
resources:
|
||||
limits:
|
||||
memory: 2G
|
||||
memory: ${BACKEND_MEMORY_LIMIT:-4G}
|
||||
cpus: '2'
|
||||
|
||||
frontend:
|
||||
|
||||
@@ -26,6 +26,8 @@ const STRIP_REQUEST = new Set([
|
||||
'transfer-encoding',
|
||||
'upgrade',
|
||||
'host',
|
||||
'content-length',
|
||||
'expect',
|
||||
]);
|
||||
|
||||
// Headers that must not be forwarded back to the browser.
|
||||
@@ -201,9 +203,10 @@ async function proxy(req: NextRequest, pathSegments: string[]): Promise<NextResp
|
||||
cache: 'no-store',
|
||||
};
|
||||
if (!isBodyless) {
|
||||
requestInit.body = req.body;
|
||||
// Required for streaming request bodies in Node.js fetch
|
||||
requestInit.duplex = 'half';
|
||||
const body = await req.text();
|
||||
if (body.length > 0) {
|
||||
requestInit.body = body;
|
||||
}
|
||||
}
|
||||
const maxAttempts = isBodyless ? 18 : 1;
|
||||
let fetchError: unknown = null;
|
||||
@@ -214,6 +217,13 @@ async function proxy(req: NextRequest, pathSegments: string[]): Promise<NextResp
|
||||
break;
|
||||
} catch (error) {
|
||||
fetchError = error;
|
||||
if (attempt >= maxAttempts) {
|
||||
console.error('api proxy upstream fetch failed', {
|
||||
method: req.method,
|
||||
target: targetUrl.toString(),
|
||||
error,
|
||||
});
|
||||
}
|
||||
if (attempt >= maxAttempts) break;
|
||||
await sleep(250);
|
||||
}
|
||||
|
||||
@@ -8,6 +8,12 @@ export const metadata: Metadata = {
|
||||
description: 'Advanced Geopolitical Risk Dashboard',
|
||||
};
|
||||
|
||||
// The dashboard is a live local runtime, not a static landing page. If Next
|
||||
// prerenders and caches the initial shell, Docker users can get stuck on the
|
||||
// "prioritizing map feeds" markup before client polling ever hydrates.
|
||||
export const dynamic = 'force-dynamic';
|
||||
export const revalidate = 0;
|
||||
|
||||
export default function RootLayout({
|
||||
children,
|
||||
}: Readonly<{
|
||||
|
||||
@@ -13,8 +13,8 @@ function buildCsp(nonce: string): string {
|
||||
const directives = [
|
||||
"default-src 'self'",
|
||||
isDev
|
||||
? `script-src 'self' 'unsafe-inline' 'unsafe-eval' 'nonce-${nonce}' blob:`
|
||||
: `script-src 'self' 'unsafe-inline' 'nonce-${nonce}' blob:`,
|
||||
? "script-src 'self' 'unsafe-inline' 'unsafe-eval' blob:"
|
||||
: "script-src 'self' 'unsafe-inline' blob:",
|
||||
"style-src 'self' 'unsafe-inline' https://fonts.googleapis.com",
|
||||
"img-src 'self' data: blob: https:",
|
||||
isDev
|
||||
@@ -35,7 +35,10 @@ function buildCsp(nonce: string): string {
|
||||
export function middleware(request: NextRequest) {
|
||||
const nonce = Buffer.from(crypto.randomUUID()).toString('base64');
|
||||
|
||||
// Forward nonce to server components via request header.
|
||||
// Forward a nonce for future fully-wired CSP support. Do not include it in
|
||||
// script-src until every Next inline bootstrap script receives the nonce;
|
||||
// otherwise production hydration can fail and leave the app on the static
|
||||
// "prioritizing map feeds" shell.
|
||||
const requestHeaders = new Headers(request.headers);
|
||||
requestHeaders.set('x-nonce', nonce);
|
||||
|
||||
|
||||
Reference in New Issue
Block a user