fix: stabilize v0.9.7 startup and feeds

This commit is contained in:
BigBodyCobain
2026-05-02 13:35:49 -06:00
parent f5b9d14b48
commit 08810f2537
22 changed files with 940 additions and 130 deletions
+20
View File
@@ -38,6 +38,26 @@ ADMIN_KEY=
# Leave blank to skip this optional enrichment.
# NUFORC_MAPBOX_TOKEN=
# Optional startup-risk controls.
# On Windows, external curl fallback and the Playwright LiveUAMap scraper are
# disabled by default so blocked upstream feeds cannot interrupt start.bat.
# SHADOWBROKER_ENABLE_WINDOWS_CURL_FALLBACK=false
# SHADOWBROKER_ENABLE_LIVEUAMAP_SCRAPER=false
# AIS starts by default when AIS_API_KEY is set. Set to 0/false to force-disable.
# SHADOWBROKER_ENABLE_AIS_STREAM_PROXY=true
# Minimum visible satellite catalog before forcing a CelesTrak refresh.
# SHADOWBROKER_MIN_VISIBLE_SATELLITES=350
# Upper bound for TLE fallback satellite search when CelesTrak is unreachable.
# SHADOWBROKER_MAX_VISIBLE_SATELLITES=450
# NUFORC fallback uses the Hugging Face mirror when live NUFORC is unavailable.
# NUFORC_HF_FALLBACK_LIMIT=250
# NUFORC_HF_GEOCODE_LIMIT=150
# First-paint cache age budgets. These let the map and Global Threat Intercept
# paint from the last local snapshot while live feeds refresh in the background.
# FAST_STARTUP_CACHE_MAX_AGE_S=21600
# INTEL_STARTUP_CACHE_MAX_AGE_S=21600
# Google Earth Engine for VIIRS night lights change detection (optional).
# pip install earthengine-api
# GEE_SERVICE_ACCOUNT_KEY=
+57 -1
View File
@@ -200,6 +200,7 @@ from services.data_fetcher import (
start_scheduler,
stop_scheduler,
get_latest_data,
seed_startup_caches,
)
from services.ais_stream import start_ais_stream, stop_ais_stream
from services.carrier_tracker import start_carrier_tracker, stop_carrier_tracker
@@ -2232,6 +2233,11 @@ async def lifespan(app: FastAPI):
threading.Thread(target=_prime_aircraft_database, daemon=True).start()
# Seed cached first-paint layers before accepting requests. This is
# disk-only and keeps the critical bootstrap endpoint independent from
# slow network warmup.
seed_startup_caches()
# Start the recurring scheduler (fast=60s, slow=30min).
start_scheduler()
@@ -2239,6 +2245,9 @@ async def lifespan(app: FastAPI):
# is listening on port 8000 instantly. The frontend's adaptive polling
# (retries every 3s) will pick up data piecemeal as each fetcher finishes.
def _background_preload():
delay_s = float(os.environ.get("SHADOWBROKER_STARTUP_PRELOAD_DELAY_S", "2.0") or 0)
if delay_s > 0:
time.sleep(delay_s)
logger.info("=== PRELOADING DATA (background — server already accepting requests) ===")
try:
update_all_data(startup_mode=True)
@@ -3472,6 +3481,46 @@ def _sigint_totals_for_items(items: list) -> dict[str, int]:
return totals
def _cap_startup_items(items: list | None, max_items: int) -> list:
if not items:
return []
if len(items) <= max_items:
return items
return items[:max_items]
def _cap_fast_startup_payload(payload: dict) -> dict:
    """Trim high-volume layers for the first dashboard paint.

    The full fast payload can legitimately contain tens of thousands of AIS,
    ADS-B, SIGINT, and CCTV records. Returning all of that during app startup
    blocks the first map render behind serialization/proxy/network pressure.
    This startup payload paints representative live data immediately; the next
    normal poll replaces it with the full dataset.
    """
    trimmed = dict(payload)
    for layer, limit in (
        ("commercial_flights", 800),
        ("private_flights", 300),
        ("private_jets", 150),
        ("ships", 1500),
    ):
        trimmed[layer] = _cap_startup_items(trimmed.get(layer), limit)
    # CCTV is dropped entirely from the first paint; the next poll restores it.
    trimmed["cctv"] = []
    for layer, limit in (("sigint", 500), ("trains", 100)):
        trimmed[layer] = _cap_startup_items(trimmed.get(layer), limit)
    # Marker so the frontend knows this is the capped startup variant.
    trimmed["startup_payload"] = True
    return trimmed
def _cap_fast_dashboard_payload(payload: dict) -> dict:
    """Downsample dense point layers so the regular fast poll stays bounded."""
    limits = (
        ("commercial_flights", 6000),
        ("private_flights", 1500),
        ("private_jets", 1500),
        ("ships", 8000),
        ("cctv", 2500),
        ("sigint", 5000),
    )
    result = dict(payload)
    for layer, max_points in limits:
        result[layer] = _downsample_points(result.get(layer) or [], max_points)
    return result
@app.get("/api/live-data/fast")
@limiter.limit("120/minute")
async def live_data_fast(
@@ -3482,8 +3531,9 @@ async def live_data_fast(
w: float = Query(None, description="West bound (ignored)", ge=-180, le=180),
n: float = Query(None, description="North bound (ignored)", ge=-90, le=90),
e: float = Query(None, description="East bound (ignored)", ge=-180, le=180),
initial: bool = Query(False, description="Return a capped startup payload for first paint"),
):
etag = _current_etag(prefix="fast|full|")
etag = _current_etag(prefix="fast|initial|" if initial else "fast|full|")
if request.headers.get("if-none-match") == etag:
return Response(status_code=304, headers={"ETag": etag, "Cache-Control": "no-cache"})
@@ -3548,6 +3598,10 @@ async def live_data_fast(
"trains": (d.get("trains") or []) if active_layers.get("trains", True) else [],
"freshness": freshness,
}
if initial:
payload = _cap_fast_startup_payload(payload)
else:
payload = _cap_fast_dashboard_payload(payload)
return Response(
content=orjson.dumps(_sanitize_payload(payload)),
media_type="application/json",
@@ -3609,6 +3663,7 @@ async def live_data_slow(
"correlations",
"threat_level",
"trending_markets",
"fimi",
"uap_sightings",
"wastewater",
"sar_scenes",
@@ -3621,6 +3676,7 @@ async def live_data_slow(
"last_updated": d.get("last_updated"),
"threat_level": d.get("threat_level"),
"trending_markets": d.get("trending_markets", []),
"fimi": d.get("fimi", {}),
"news": d.get("news", []),
"stocks": d.get("stocks", {}),
"financial_source": d.get("financial_source", ""),
+126 -5
View File
@@ -185,11 +185,29 @@ def _bbox_spans(s, w, n, e) -> tuple:
return lat_span, max(0.0, lng_span)
def _downsample_points(items: list, max_items: int) -> list:
if max_items <= 0 or len(items) <= max_items:
def _cap_startup_items(items: list | None, max_items: int) -> list:
if not items:
return []
if len(items) <= max_items:
return items
step = len(items) / float(max_items)
return [items[min(len(items) - 1, int(i * step))] for i in range(max_items)]
return items[:max_items]
def _cap_fast_startup_payload(payload: dict) -> dict:
    """Bound high-volume layers in the startup variant of the fast payload."""
    out = dict(payload)
    for name, cap in (
        ("commercial_flights", 800),
        ("private_flights", 300),
        ("private_jets", 150),
        ("ships", 1500),
    ):
        out[name] = _cap_startup_items(out.get(name), cap)
    # CCTV is omitted from the first paint entirely.
    out["cctv"] = []
    for name, cap in (("sigint", 500), ("trains", 100)):
        out[name] = _cap_startup_items(out.get(name), cap)
    out["startup_payload"] = True
    return out
def _cap_fast_dashboard_payload(payload: dict) -> dict:
    # Intentional pass-through in this module: the payload is returned
    # unchanged (same object, not a copy). NOTE(review): the main-app copy of
    # this helper downsamples point layers — confirm whether this no-op is
    # deliberate for this route module.
    return payload
def _world_and_continental_scale(has_bbox: bool, s, w, n, e) -> tuple:
@@ -326,6 +344,104 @@ async def live_data(request: Request):
return get_latest_data()
@router.get("/api/bootstrap/critical")
@limiter.limit("180/minute")
async def bootstrap_critical(request: Request):
    """Cached first-paint payload for the dashboard.

    This endpoint is intentionally memory-only: no upstream calls, no refresh,
    and a bounded response. It exists so the map and threat feed can paint
    before slower panels and background enrichers finish warming up.
    """
    # Conditional GET: short-circuit with 304 when the client already holds
    # the current data version.
    etag = _current_etag(prefix="bootstrap|critical|")
    if request.headers.get("if-none-match") == etag:
        return Response(status_code=304, headers={"ETag": etag, "Cache-Control": "no-cache"})
    # Imported here rather than at module top — NOTE(review): presumably to
    # avoid an import cycle with the store module; confirm.
    from services.fetchers._store import (
        active_layers,
        get_latest_data_subset_refs,
        get_source_timestamps_snapshot,
    )
    # Reference snapshot of just the keys this payload needs.
    d = get_latest_data_subset_refs(
        "last_updated", "commercial_flights", "military_flights", "private_flights",
        "private_jets", "tracked_flights", "ships", "uavs", "liveuamap", "gps_jamming",
        "satellites", "satellite_source", "satellite_analysis", "sigint", "sigint_totals",
        "trains", "news", "gdelt", "airports", "threat_level", "trending_markets",
        "correlations", "fimi", "crowdthreat",
    )
    freshness = get_source_timestamps_snapshot()
    # The ships layer is included when any ship sub-layer toggle is active.
    ships_enabled = any(active_layers.get(key, True) for key in (
        "ships_military", "ships_cargo", "ships_civilian", "ships_passenger", "ships_tracked_yachts"))
    sigint_items = _filter_sigint_by_layers(d.get("sigint") or [], active_layers)
    # Every list layer is bounded by _cap_startup_items so the response size
    # stays small regardless of how much data is cached in memory.
    payload = {
        "last_updated": d.get("last_updated"),
        "commercial_flights": _cap_startup_items(
            (d.get("commercial_flights") or []) if active_layers.get("flights", True) else [],
            800,
        ),
        "military_flights": _cap_startup_items(
            (d.get("military_flights") or []) if active_layers.get("military", True) else [],
            300,
        ),
        "private_flights": _cap_startup_items(
            (d.get("private_flights") or []) if active_layers.get("private", True) else [],
            300,
        ),
        "private_jets": _cap_startup_items(
            (d.get("private_jets") or []) if active_layers.get("jets", True) else [],
            150,
        ),
        "tracked_flights": _cap_startup_items(
            (d.get("tracked_flights") or []) if active_layers.get("tracked", True) else [],
            250,
        ),
        "ships": _cap_startup_items((d.get("ships") or []) if ships_enabled else [], 1500),
        "uavs": _cap_startup_items((d.get("uavs") or []) if active_layers.get("military", True) else [], 100),
        "liveuamap": _cap_startup_items(
            (d.get("liveuamap") or []) if active_layers.get("global_incidents", True) else [],
            300,
        ),
        "gps_jamming": _cap_startup_items(
            (d.get("gps_jamming") or []) if active_layers.get("gps_jamming", True) else [],
            200,
        ),
        "satellites": _cap_startup_items(
            (d.get("satellites") or []) if active_layers.get("satellites", True) else [],
            250,
        ),
        "satellite_source": d.get("satellite_source", "none"),
        "satellite_analysis": (d.get("satellite_analysis") or {}) if active_layers.get("satellites", True) else {},
        # SIGINT is included when either the Meshtastic or APRS toggle is on.
        "sigint": _cap_startup_items(
            sigint_items if (active_layers.get("sigint_meshtastic", True) or active_layers.get("sigint_aprs", True)) else [],
            500,
        ),
        # Totals are computed over the filtered (pre-cap) SIGINT item list.
        "sigint_totals": _sigint_totals_for_items(sigint_items),
        "trains": _cap_startup_items((d.get("trains") or []) if active_layers.get("trains", True) else [], 100),
        "news": _cap_startup_items(d.get("news") or [], 30),
        "gdelt": _cap_startup_items((d.get("gdelt") or []) if active_layers.get("global_incidents", True) else [], 300),
        "airports": _cap_startup_items(d.get("airports") or [], 500),
        "threat_level": d.get("threat_level"),
        "trending_markets": _cap_startup_items(d.get("trending_markets") or [], 10),
        "correlations": _cap_startup_items(
            (d.get("correlations") or []) if active_layers.get("correlations", True) else [],
            50,
        ),
        "fimi": d.get("fimi"),
        "crowdthreat": _cap_startup_items(
            (d.get("crowdthreat") or []) if active_layers.get("crowdthreat", True) else [],
            150,
        ),
        "freshness": freshness,
        "bootstrap_ready": True,
        "bootstrap_payload": True,
    }
    # default=str stringifies any non-JSON-native values; OPT_NON_STR_KEYS
    # permits non-string dict keys in cached structures.
    return Response(
        content=orjson.dumps(_sanitize_payload(payload), default=str, option=orjson.OPT_NON_STR_KEYS),
        media_type="application/json",
        headers={"ETag": etag, "Cache-Control": "no-cache"},
    )
@router.get("/api/live-data/fast")
@limiter.limit("120/minute")
async def live_data_fast(
@@ -334,8 +450,9 @@ async def live_data_fast(
w: float = Query(None, description="West bound (ignored)", ge=-180, le=180),
n: float = Query(None, description="North bound (ignored)", ge=-90, le=90),
e: float = Query(None, description="East bound (ignored)", ge=-180, le=180),
initial: bool = Query(False, description="Return a capped startup payload for first paint"),
):
etag = _current_etag(prefix="fast|full|")
etag = _current_etag(prefix="fast|initial|" if initial else "fast|full|")
if request.headers.get("if-none-match") == etag:
return Response(status_code=304, headers={"ETag": etag, "Cache-Control": "no-cache"})
from services.fetchers._store import (active_layers, get_latest_data_subset_refs, get_source_timestamps_snapshot)
@@ -371,6 +488,10 @@ async def live_data_fast(
"trains": (d.get("trains") or []) if active_layers.get("trains", True) else [],
"freshness": freshness,
}
if initial:
payload = _cap_fast_startup_payload(payload)
else:
payload = _cap_fast_dashboard_payload(payload)
return Response(content=orjson.dumps(_sanitize_payload(payload)), media_type="application/json",
headers={"ETag": etag, "Cache-Control": "no-cache"})
+35 -3
View File
@@ -17,6 +17,18 @@ AIS_WS_URL = "wss://stream.aisstream.io/v0/stream"
API_KEY = os.environ.get("AIS_API_KEY", "")
def _env_truthy(name: str) -> bool:
return str(os.getenv(name, "")).strip().lower() in {"1", "true", "yes", "on"}
def ais_stream_proxy_enabled() -> bool:
    """Return whether the external Node AIS proxy may be started.

    An unset/blank SHADOWBROKER_ENABLE_AIS_STREAM_PROXY means enabled by
    default; any explicit value defers to the shared boolean-flag parser.
    """
    raw = os.getenv("SHADOWBROKER_ENABLE_AIS_STREAM_PROXY", "")
    if not str(raw).strip():
        return True
    return _env_truthy("SHADOWBROKER_ENABLE_AIS_STREAM_PROXY")
# AIS vessel type code classification
# See: https://coast.noaa.gov/data/marinecadastre/ais/VesselTypeCodes2018.pdf
def classify_vessel(ais_type: int, mmsi: int) -> str:
@@ -496,6 +508,12 @@ def _ais_stream_loop():
logger.info("Starting Node.js AIS Stream Proxy...")
proxy_env = os.environ.copy()
proxy_env["AIS_API_KEY"] = API_KEY
popen_kwargs = {}
if os.name == "nt":
popen_kwargs["creationflags"] = (
getattr(subprocess, "CREATE_NO_WINDOW", 0)
| getattr(subprocess, "CREATE_NEW_PROCESS_GROUP", 0)
)
process = subprocess.Popen(
["node", proxy_script],
stdin=subprocess.PIPE,
@@ -504,6 +522,7 @@ def _ais_stream_loop():
text=True,
bufsize=1,
env=proxy_env,
**popen_kwargs,
)
with _vessels_lock:
_proxy_process = process
@@ -646,6 +665,22 @@ def _run_ais_loop():
def start_ais_stream():
"""Start the AIS WebSocket stream in a background thread."""
global _ws_thread, _ws_running
# Always load cached vessel data first so the ships layer can paint even
# when live streaming is disabled or the upstream is unavailable.
_load_cache()
if not API_KEY:
logger.info("AIS_API_KEY not set — ship tracking disabled. Set AIS_API_KEY to enable.")
return
if not ais_stream_proxy_enabled():
logger.info(
"AIS live stream proxy disabled for this runtime; using cached AIS vessels. "
"Set SHADOWBROKER_ENABLE_AIS_STREAM_PROXY=1 to opt in."
)
return
with _vessels_lock:
if _ws_running:
logger.info("AIS Stream already running")
@@ -656,9 +691,6 @@ def start_ais_stream():
logger.info("AIS Stream already running")
return
# Load cached vessel data from disk
_load_cache()
_ws_thread = threading.Thread(target=_run_ais_loop, daemon=True, name="ais-stream")
_ws_thread.start()
logger.info("AIS Stream background thread started")
+93 -5
View File
@@ -105,7 +105,7 @@ _SLOW_FETCH_S = float(os.environ.get("FETCH_SLOW_THRESHOLD_S", "5"))
# Hard wall-clock limit per individual fetch task. A task that exceeds this
# is treated as a failure so it cannot block an entire fetch tier indefinitely.
_TASK_HARD_TIMEOUT_S = float(os.environ.get("FETCH_TASK_TIMEOUT_S", "120"))
_FAST_STARTUP_CACHE_MAX_AGE_S = float(os.environ.get("FAST_STARTUP_CACHE_MAX_AGE_S", "300"))
_FAST_STARTUP_CACHE_MAX_AGE_S = float(os.environ.get("FAST_STARTUP_CACHE_MAX_AGE_S", "21600"))
_FAST_STARTUP_CACHE_PATH = Path(__file__).resolve().parents[1] / "data" / "fast_startup_cache.json"
_FAST_STARTUP_CACHE_KEYS = (
"commercial_flights",
@@ -123,6 +123,17 @@ _FAST_STARTUP_CACHE_KEYS = (
"sigint_totals",
"trains",
)
_INTEL_STARTUP_CACHE_MAX_AGE_S = float(os.environ.get("INTEL_STARTUP_CACHE_MAX_AGE_S", "21600"))
_INTEL_STARTUP_CACHE_PATH = Path(__file__).resolve().parents[1] / "data" / "intel_startup_cache.json"
_INTEL_STARTUP_CACHE_KEYS = (
"news",
"gdelt",
"liveuamap",
"threat_level",
"trending_markets",
"correlations",
"fimi",
)
# Shared thread pool — reused across all fetch cycles instead of creating/destroying per tick
_SHARED_EXECUTOR = concurrent.futures.ThreadPoolExecutor(
@@ -204,6 +215,77 @@ def _save_fast_startup_cache() -> None:
logger.debug("Fast startup cache save skipped: %s", e)
def _load_intel_startup_cache_if_available() -> bool:
    """Seed the right-side intelligence panel from disk while live feeds warm up.

    Returns True only when at least one intel layer key was restored from the
    cache file; False for a missing, stale, empty, or unreadable cache.
    """
    # A non-positive max age disables the feature; a missing file is normal.
    if _INTEL_STARTUP_CACHE_MAX_AGE_S <= 0 or not _INTEL_STARTUP_CACHE_PATH.exists():
        return False
    try:
        with _INTEL_STARTUP_CACHE_PATH.open("r", encoding="utf-8") as fh:
            payload = json.load(fh)
        cached_at = float(payload.get("cached_at") or 0)
        age_s = time.time() - cached_at
        # Reject snapshots with a missing/invalid timestamp or past the budget.
        if cached_at <= 0 or age_s > _INTEL_STARTUP_CACHE_MAX_AGE_S:
            logger.info("Skipping stale intel startup cache (age %.1fs)", age_s)
            return False
        layers = payload.get("layers") or {}
        freshness = payload.get("freshness") or {}
        loaded: list[str] = []
        with _data_lock:
            # Only whitelisted intel keys are restored; anything else in the
            # file is ignored.
            for key in _INTEL_STARTUP_CACHE_KEYS:
                if key in layers:
                    latest_data[key] = layers[key]
                    loaded.append(key)
            for key, ts in freshness.items():
                source_timestamps[str(key)] = ts
            if payload.get("last_updated"):
                latest_data["last_updated"] = payload.get("last_updated")
        if not loaded:
            return False
        # Bump the data version so ETag-based clients pick up the seeded data.
        from services.fetchers._store import bump_data_version
        bump_data_version()
        logger.info(
            "Loaded intel startup cache for %d layers (age %.1fs) so Global Threat Intercept can paint early",
            len(loaded),
            age_s,
        )
        return True
    except Exception as e:
        # Best-effort seeding: a corrupt cache must never block startup.
        logger.warning("Intel startup cache load failed (non-fatal): %s", e)
        return False
def _save_intel_startup_cache() -> None:
    """Persist compact right-side intelligence data for the next cold start."""
    try:
        # Snapshot under the data lock so layers and freshness are consistent.
        with _data_lock:
            payload = {
                "cached_at": time.time(),
                "last_updated": latest_data.get("last_updated"),
                "layers": {key: latest_data.get(key) for key in _INTEL_STARTUP_CACHE_KEYS},
                # Only record freshness stamps that actually exist.
                "freshness": {
                    key: source_timestamps.get(key)
                    for key in _INTEL_STARTUP_CACHE_KEYS
                    if source_timestamps.get(key)
                },
            }
        safe_payload = _cache_json_safe(payload)
        _INTEL_STARTUP_CACHE_PATH.parent.mkdir(parents=True, exist_ok=True)
        # Write-then-replace so a crash mid-write cannot leave a truncated
        # cache file behind.
        tmp_path = _INTEL_STARTUP_CACHE_PATH.with_suffix(".tmp")
        with tmp_path.open("w", encoding="utf-8") as fh:
            json.dump(safe_payload, fh, separators=(",", ":"))
        tmp_path.replace(_INTEL_STARTUP_CACHE_PATH)
    except Exception as e:
        # Persistence is best-effort; log at debug level and move on.
        logger.debug("Intel startup cache save skipped: %s", e)
def seed_startup_caches() -> None:
    """Load disk-backed first-paint caches without touching remote services."""
    # Seeds the Meshtastic nodes, the fast-tier map layers, and the right-side
    # intelligence panel from their on-disk snapshots so the first paint has
    # data immediately. Each loader is disk-only (the intel loader guards on
    # file existence and age; the others are presumed to do the same — confirm).
    load_meshtastic_cache_if_available()
    _load_fast_startup_cache_if_available()
    _load_intel_startup_cache_if_available()
# ---------------------------------------------------------------------------
# Scheduler & Orchestration
# ---------------------------------------------------------------------------
@@ -262,7 +344,6 @@ def update_fast_data():
fetch_satellites,
fetch_sigint,
fetch_trains,
fetch_tinygs,
]
_run_tasks("fast-tier", fast_funcs)
with _data_lock:
@@ -289,6 +370,7 @@ def update_slow_data():
fetch_cctv,
fetch_kiwisdr,
fetch_satnogs,
fetch_tinygs,
fetch_frontlines,
fetch_datacenters,
fetch_military_bases,
@@ -313,6 +395,7 @@ def update_slow_data():
logger.error("Correlation engine failed: %s", e)
from services.fetchers._store import bump_data_version
bump_data_version()
_save_intel_startup_cache()
logger.info("Slow-tier update complete.")
@@ -324,8 +407,7 @@ def update_all_data(*, startup_mode: bool = False):
"""
logger.info("Full data update starting (parallel)...")
# Preload Meshtastic map cache immediately (instant, from disk)
load_meshtastic_cache_if_available()
_load_fast_startup_cache_if_available()
seed_startup_caches()
with _data_lock:
meshtastic_seeded = bool(latest_data.get("meshtastic_map_nodes"))
futures = {
@@ -337,7 +419,6 @@ def update_all_data(*, startup_mode: bool = False):
_SHARED_EXECUTOR.submit(fetch_unusual_whales): ("fetch_unusual_whales", time.perf_counter()),
_SHARED_EXECUTOR.submit(fetch_fimi): ("fetch_fimi", time.perf_counter()),
_SHARED_EXECUTOR.submit(fetch_gdelt): ("fetch_gdelt", time.perf_counter()),
_SHARED_EXECUTOR.submit(update_liveuamap): ("update_liveuamap", time.perf_counter()),
_SHARED_EXECUTOR.submit(fetch_uap_sightings): ("fetch_uap_sightings", time.perf_counter()),
_SHARED_EXECUTOR.submit(fetch_wastewater): ("fetch_wastewater", time.perf_counter()),
_SHARED_EXECUTOR.submit(fetch_crowdthreat): ("fetch_crowdthreat", time.perf_counter()),
@@ -353,6 +434,13 @@ def update_all_data(*, startup_mode: bool = False):
logger.info(
"Startup preload: Meshtastic cache already loaded, deferring remote map refresh to scheduled cadence"
)
if not startup_mode:
futures[_SHARED_EXECUTOR.submit(update_liveuamap)] = (
"update_liveuamap",
time.perf_counter(),
)
else:
logger.info("Startup preload: deferring Playwright Liveuamap scraper to scheduled cadence")
for future, (name, start) in futures.items():
try:
future.result(timeout=_TASK_HARD_TIMEOUT_S)
+158 -5
View File
@@ -15,7 +15,7 @@ import time
import heapq
from datetime import datetime, timedelta
from pathlib import Path
from services.network_utils import fetch_with_curl
from services.network_utils import external_curl_fallback_enabled, fetch_with_curl
from services.fetchers._store import latest_data, _data_lock, _mark_fresh
from services.fetchers.nuforc_enrichment import enrich_sighting
from services.fetchers.retry import with_retry
@@ -685,6 +685,8 @@ _NUFORC_TOKEN = os.environ.get("NUFORC_MAPBOX_TOKEN", "").strip()
_NUFORC_RADIUS_M = 200_000 # 200 km query radius
_NUFORC_LIMIT = 50 # max features per tilequery call
_NUFORC_RECENT_DAYS = int(os.environ.get("NUFORC_RECENT_DAYS", "60"))
_NUFORC_HF_FALLBACK_LIMIT = max(25, int(os.environ.get("NUFORC_HF_FALLBACK_LIMIT", "250")))
_NUFORC_HF_GEOCODE_LIMIT = max(25, int(os.environ.get("NUFORC_HF_GEOCODE_LIMIT", "150")))
_NUFORC_GEOCODE_WORKERS = max(1, int(os.environ.get("NUFORC_GEOCODE_WORKERS", "1")))
# Photon (Komoot) is more lenient than Nominatim — ~200ms per query in
# practice, so a 0.3s spacing keeps us well under any soft throttle while
@@ -1034,6 +1036,14 @@ def _nuforc_fetch_month_live(yyyymm: str, cookie_jar: Path) -> list[dict]:
index_url = _NUFORC_LIVE_INDEX_URL.format(yyyymm=yyyymm)
ajax_url = _NUFORC_LIVE_AJAX_URL.format(yyyymm=yyyymm)
if not external_curl_fallback_enabled():
logger.warning(
"NUFORC live: external curl disabled on Windows for %s; "
"set SHADOWBROKER_ENABLE_WINDOWS_CURL_FALLBACK=1 to opt in.",
yyyymm,
)
return []
# Step 1: GET the month index to capture session cookies + fresh nonce.
try:
index_res = subprocess.run(
@@ -1340,6 +1350,143 @@ def _build_recent_uap_sightings() -> list[dict]:
return sightings
def _split_uap_location(location: str) -> tuple[str, str, str]:
parts = [p.strip() for p in str(location or "").split(",") if p.strip()]
city = parts[0] if parts else ""
state = ""
country = ""
if len(parts) >= 2:
state = parts[1]
if len(parts) >= 3:
country = parts[-1]
if country and country.upper() in _US_COUNTRY_ALIASES:
country = "US"
return city, state, country
def _build_uap_sightings_from_hf_mirror() -> list[dict]:
    """Build visible UAP points from the public Hugging Face NUFORC mirror.

    This is a resilience fallback for local/Windows runs where nuforc.org is
    Cloudflare-gated and the Mapbox token is not configured. It is not as fresh
    as the live NUFORC AJAX feed, but it keeps the layer visible and cached.

    Returns an empty list on any download or parse failure (best-effort).
    """
    from services.fetchers.nuforc_enrichment import _HF_CSV_URL, _parse_date
    from services.geocode_validate import coord_in_country
    # Step 1: download the CSV mirror. The file is large, hence the generous
    # 180s timeout.
    try:
        response = fetch_with_curl(_HF_CSV_URL, timeout=180, follow_redirects=True)
        if not response or response.status_code != 200:
            logger.warning(
                "UAP sightings: HF fallback failed HTTP %s",
                getattr(response, "status_code", "None"),
            )
            return []
    except Exception as e:
        logger.warning("UAP sightings: HF fallback download failed: %s", e)
        return []
    # Step 2: parse rows into normalized candidate dicts. Column names vary
    # between mirror snapshots, so several aliases are tried per field.
    candidates: list[dict] = []
    try:
        reader = csv.DictReader(io.StringIO(response.text))
        for row in reader:
            occurred = _parse_date(
                row.get("Occurred", "")
                or row.get("Date / Time", "")
                or row.get("Date", "")
            )
            if not occurred:
                continue
            raw_location = _normalize_uap_location(
                row.get("Location", "")
                or row.get("City", "")
                or row.get("location", "")
            )
            if not raw_location:
                continue
            city, state, country = _split_uap_location(raw_location)
            if not city:
                continue
            sighting_id = str(row.get("Sighting", "") or "").strip()
            if not sighting_id:
                # Derive a stable synthetic id from the row's own content.
                sighting_id = hashlib.sha1(
                    f"{occurred}|{raw_location}|{row.get('Summary', '')}".encode("utf-8", "ignore")
                ).hexdigest()[:12]
            summary = (row.get("Summary", "") or row.get("Text", "") or "Sighting reported").strip()
            # Keep summaries UI-sized.
            if len(summary) > 280:
                summary = summary[:277] + "..."
            candidates.append({
                "id": f"NUFORC-{sighting_id}",
                "occurred": occurred,
                "posted": _parse_date(row.get("Posted", "") or row.get("Reported", "")) or occurred,
                "location": raw_location,
                "city": city,
                "state": state,
                "country": country or _uap_country_from_location(raw_location, state),
                "shape_raw": (row.get("Shape", "") or "Unknown").strip(),
                "duration": (row.get("Duration", "") or "").strip(),
                "summary": summary,
            })
    except Exception as e:
        logger.warning("UAP sightings: HF fallback parse failed: %s", e)
        return []
    # Step 3: keep only the newest reports, bounded by the fallback limit.
    candidates.sort(key=lambda row: (row["occurred"], row["posted"], row["id"]), reverse=True)
    candidates = candidates[:_NUFORC_HF_FALLBACK_LIMIT]
    # Step 4: geocode (cache-first, budget-limited) and sanity-check coords.
    location_cache = _load_nuforc_location_cache()
    sightings: list[dict] = []
    geocoded = 0
    for row in candidates:
        coords = location_cache.get(row["location"])
        if row["location"] not in location_cache and geocoded < _NUFORC_HF_GEOCODE_LIMIT:
            try:
                coords = _geocode_uap_location(
                    row["location"], row["city"], row["state"], row["country"]
                )
            except Exception:
                coords = None
            # Cache failures too (None) so the same location is not retried
            # every run.
            location_cache[row["location"]] = coords
            geocoded += 1
            # Space out geocoder calls while under the budget.
            if geocoded < _NUFORC_HF_GEOCODE_LIMIT:
                time.sleep(_NUFORC_GEOCODE_SPACING_S)
        if not coords:
            continue
        if row.get("country"):
            # Drop geocodes that land outside the claimed country; an
            # inconclusive check (None) is allowed through.
            try:
                inside = coord_in_country(coords[0], coords[1], row["country"])
            except Exception:
                inside = None
            if inside is False:
                continue
        shape_raw = row["shape_raw"] or "Unknown"
        sightings.append({
            "id": row["id"],
            "date_time": row["occurred"],
            "city": row["city"],
            "state": row["state"],
            "country": row["country"],
            "shape": _normalize_uap_shape(shape_raw) if shape_raw != "Unknown" else "unknown",
            "shape_raw": shape_raw,
            "duration": row["duration"],
            "summary": row["summary"],
            "posted": row["posted"],
            "lat": float(coords[0]),
            "lng": float(coords[1]),
            "count": 1,
            "source": "NUFORC-HF",
        })
    # Persist the (possibly grown) geocode cache for future runs.
    _save_nuforc_location_cache(location_cache)
    logger.info(
        "UAP sightings: %d mapped reports from HF fallback (%d candidates, %d geocoded)",
        len(sightings),
        len(candidates),
        geocoded,
    )
    return sightings
@with_retry(max_retries=1, base_delay=5)
def fetch_uap_sightings(*, force_refresh: bool = False):
"""Fetch last-year UAP sightings from NUFORC.
@@ -1355,12 +1502,18 @@ def fetch_uap_sightings(*, force_refresh: bool = False):
sightings = _load_nuforc_sightings_cache(force_refresh=force_refresh)
if sightings is None:
sightings = _build_recent_uap_sightings()
_save_nuforc_sightings_cache(sightings)
try:
sightings = _build_recent_uap_sightings()
except Exception as e:
logger.warning("UAP sightings: live NUFORC rebuild failed, using fallback: %s", e)
sightings = _build_uap_sightings_from_hf_mirror()
if sightings:
_save_nuforc_sightings_cache(sightings)
with _data_lock:
latest_data["uap_sightings"] = sightings
_mark_fresh("uap_sightings")
latest_data["uap_sightings"] = sightings or []
if sightings:
_mark_fresh("uap_sightings")
return
cutoff = datetime.utcnow() - timedelta(days=_NUFORC_RECENT_DAYS)
+24
View File
@@ -15,6 +15,24 @@ from services.fetchers.retry import with_retry
logger = logging.getLogger(__name__)
def _env_flag(name: str) -> str:
return str(os.getenv(name, "")).strip().lower()
def liveuamap_scraper_enabled() -> bool:
    """Return whether the Playwright-based LiveUAMap scraper should run.

    It is useful enrichment, but it starts a browser/Node driver and must not be
    allowed to destabilize Windows local startup.
    """
    # An explicit setting wins in either direction; otherwise the default is
    # on for every platform except Windows ("nt").
    flag = _env_flag("SHADOWBROKER_ENABLE_LIVEUAMAP_SCRAPER")
    if flag in {"1", "true", "yes", "on"}:
        return True
    if flag in {"0", "false", "no", "off"}:
        return False
    return os.name != "nt"
# ---------------------------------------------------------------------------
# Ships (AIS + Carriers)
# ---------------------------------------------------------------------------
@@ -191,6 +209,12 @@ def update_liveuamap():
if not is_any_active("global_incidents"):
return
if not liveuamap_scraper_enabled():
logger.info(
"Liveuamap scraper disabled for this runtime; set "
"SHADOWBROKER_ENABLE_LIVEUAMAP_SCRAPER=1 to opt in."
)
return
logger.info("Running scheduled Liveuamap scraper...")
try:
from services.liveuamap_scraper import fetch_liveuamap
+119 -7
View File
@@ -15,6 +15,7 @@ Analysis features (derived from cached TLEs — no extra network requests):
import math
import time
import json
import os
import re
import logging
import requests
@@ -41,6 +42,38 @@ def _gmst(jd_ut1):
# CelesTrak fair use: fetch at most once per 24 hours (86400s).
# SGP4 propagation runs every 60s using cached TLEs — positions stay live.
_CELESTRAK_FETCH_INTERVAL = 86400 # 24 hours
_MIN_VISIBLE_SATELLITE_CATALOG = int(os.environ.get("SHADOWBROKER_MIN_VISIBLE_SATELLITES", "350"))
_MAX_VISIBLE_SATELLITE_CATALOG = int(os.environ.get("SHADOWBROKER_MAX_VISIBLE_SATELLITES", "450"))
_CELESTRAK_VISIBLE_GROUPS = {
"military": {"mission": "military", "sat_type": "Military / Defense"},
"radar": {"mission": "sar", "sat_type": "Radar / SAR"},
"resource": {"mission": "earth_observation", "sat_type": "Earth Observation"},
"weather": {"mission": "weather", "sat_type": "Weather / Meteorology"},
"gnss": {"mission": "navigation", "sat_type": "GNSS / Navigation"},
"science": {"mission": "science", "sat_type": "Science"},
}
_TLE_VISIBLE_FALLBACK_TERMS = {
"COSMOS": {"mission": "military", "sat_type": "Russian / Soviet Military"},
"USA": {"mission": "military", "sat_type": "US Military / NRO"},
"NROL": {"mission": "military", "sat_type": "Classified NRO"},
"GPS": {"mission": "navigation", "sat_type": "GPS Navigation"},
"GALILEO": {"mission": "navigation", "sat_type": "Galileo Navigation"},
"BEIDOU": {"mission": "navigation", "sat_type": "BeiDou Navigation"},
"GLONASS": {"mission": "navigation", "sat_type": "GLONASS Navigation"},
"NOAA": {"mission": "weather", "sat_type": "NOAA Weather"},
"METEOR": {"mission": "weather", "sat_type": "Meteor Weather"},
"SENTINEL": {"mission": "earth_observation", "sat_type": "Sentinel Earth Observation"},
"LANDSAT": {"mission": "earth_observation", "sat_type": "Landsat Earth Observation"},
"WORLDVIEW": {"mission": "commercial_imaging", "sat_type": "Maxar High-Res"},
"PLEIADES": {"mission": "commercial_imaging", "sat_type": "Airbus Imaging"},
"SKYSAT": {"mission": "commercial_imaging", "sat_type": "Planet Video"},
"JILIN": {"mission": "commercial_imaging", "sat_type": "Jilin Imaging"},
"FLOCK": {"mission": "commercial_imaging", "sat_type": "PlanetScope"},
"LEMUR": {"mission": "commercial_rf", "sat_type": "Spire RF / AIS"},
"ICEYE": {"mission": "sar", "sat_type": "ICEYE SAR"},
"UMBRA": {"mission": "sar", "sat_type": "Umbra SAR"},
"CAPELLA": {"mission": "sar", "sat_type": "Capella SAR"},
}
_sat_gp_cache = {"data": None, "last_fetch": 0, "source": "none", "last_modified": None}
_sat_classified_cache = {"data": None, "gp_fetch_ts": 0}
_SAT_CACHE_PATH = Path(__file__).parent.parent.parent / "data" / "sat_gp_cache.json"
@@ -564,9 +597,61 @@ def _parse_tle_to_gp(name, norad_id, line1, line2):
return None
def _annotate_celestrak_group(records: list[dict], group: str) -> list[dict]:
    """Return shallow copies of *records* tagged with their CelesTrak group.

    Non-dict entries are dropped. When the group has known metadata in
    _CELESTRAK_VISIBLE_GROUPS, each copy also carries it under
    "_SB_GROUP_META".
    """
    group_meta = _CELESTRAK_VISIBLE_GROUPS.get(group, {})
    annotated = []
    for record in records:
        if not isinstance(record, dict):
            continue
        tagged = {**record, "_SB_GROUP": group}
        if group_meta:
            tagged["_SB_GROUP_META"] = group_meta
        annotated.append(tagged)
    return annotated
def _fetch_visible_celestrak_catalog(headers: dict | None = None) -> list[dict]:
    """Fetch bounded CelesTrak groups used by the visible satellite layer.

    The full ``active`` catalog is too large and frequently times out on local
    startup. These groups cover the visible operational set users expect
    without pulling Starlink-scale constellations into the map.

    Records from all groups are merged by NORAD catalog number (later groups
    win on collision) and returned as a flat list.
    """
    request_headers = headers or {}
    by_norad: dict[int, dict] = {}
    for group in _CELESTRAK_VISIBLE_GROUPS:
        url = f"https://celestrak.org/NORAD/elements/gp.php?GROUP={group}&FORMAT=json"
        try:
            response = fetch_with_curl(url, timeout=15, headers=request_headers)
            if response.status_code != 200:
                logger.debug("Satellites: CelesTrak group %s returned HTTP %s", group, response.status_code)
                continue
            payload = response.json()
            if not isinstance(payload, list):
                continue
            for sat in _annotate_celestrak_group(payload, group):
                norad_id = sat.get("NORAD_CAT_ID")
                if norad_id is None:
                    continue
                by_norad[int(norad_id)] = sat
            # Pause between group requests (CelesTrak fair-use pacing).
            time.sleep(0.35)
        except (
            requests.RequestException,
            ConnectionError,
            TimeoutError,
            ValueError,
            KeyError,
            json.JSONDecodeError,
            OSError,
        ) as e:
            # A failed group is logged and skipped; the merged result from the
            # remaining groups is still returned.
            logger.warning("Satellites: Failed to fetch CelesTrak group %s: %s", group, e)
    return list(by_norad.values())
def _fetch_satellites_from_tle_api():
"""Fallback: fetch satellite TLEs from tle.ivanstanojevic.me when CelesTrak is blocked."""
search_terms = set()
search_terms = set(_TLE_VISIBLE_FALLBACK_TERMS)
for key, _ in _SAT_INTEL_DB:
term = key.split()[0] if len(key.split()) > 1 and key.split()[0] in ("USA", "NROL") else key
search_terms.add(term)
@@ -591,8 +676,13 @@ def _fetch_satellites_from_tle_api():
sat_id = gp.get("NORAD_CAT_ID")
if sat_id not in seen_ids:
seen_ids.add(sat_id)
if term in _TLE_VISIBLE_FALLBACK_TERMS:
gp["_SB_GROUP"] = f"tle:{term}"
gp["_SB_GROUP_META"] = _TLE_VISIBLE_FALLBACK_TERMS[term]
all_results.append(gp)
time.sleep(1) # Polite delay between requests
if len(all_results) >= _MAX_VISIBLE_SATELLITE_CATALOG:
return all_results
time.sleep(0.15) # Polite delay between requests
except (
requests.RequestException,
ConnectionError,
@@ -644,18 +734,34 @@ def fetch_satellites():
if (
_sat_gp_cache["data"] is None
or len(_sat_gp_cache.get("data") or []) < _MIN_VISIBLE_SATELLITE_CATALOG
or (now_ts - _sat_gp_cache["last_fetch"]) > _CELESTRAK_FETCH_INTERVAL
):
gp_urls = [
"https://celestrak.org/NORAD/elements/gp.php?GROUP=active&FORMAT=json",
"https://celestrak.com/NORAD/elements/gp.php?GROUP=active&FORMAT=json",
]
# Build conditional request headers (CelesTrak fair use)
headers = {}
if _sat_gp_cache.get("last_modified"):
headers["If-Modified-Since"] = _sat_gp_cache["last_modified"]
visible_data = _fetch_visible_celestrak_catalog(headers=headers)
if len(visible_data) >= _MIN_VISIBLE_SATELLITE_CATALOG:
_sat_gp_cache["data"] = visible_data
_sat_gp_cache["last_fetch"] = now_ts
_sat_gp_cache["source"] = "celestrak_visible_groups"
_save_sat_cache(visible_data)
_snapshot_current_tles(visible_data)
logger.info(
"Satellites: Downloaded %d GP records from visible CelesTrak groups",
len(visible_data),
)
gp_urls = [
"https://celestrak.org/NORAD/elements/gp.php?GROUP=active&FORMAT=json",
"https://celestrak.com/NORAD/elements/gp.php?GROUP=active&FORMAT=json",
]
for url in gp_urls:
if len(_sat_gp_cache.get("data") or []) >= _MIN_VISIBLE_SATELLITE_CATALOG:
break
try:
response = fetch_with_curl(url, timeout=15, headers=headers)
if response.status_code == 304:
@@ -696,7 +802,10 @@ def fetch_satellites():
logger.warning(f"Satellites: Failed to fetch from {url}: {e}")
continue
if _sat_gp_cache["data"] is None:
if (
_sat_gp_cache["data"] is None
or len(_sat_gp_cache.get("data") or []) < _MIN_VISIBLE_SATELLITE_CATALOG
):
logger.info("Satellites: CelesTrak unreachable, trying TLE fallback API...")
try:
fallback_data = _fetch_satellites_from_tle_api()
@@ -757,6 +866,9 @@ def fetch_satellites():
owner = sat.get("OWNER", sat.get("OBJECT_OWNER", ""))
if owner in _OWNER_CODE_MAP:
intel = {"country": _OWNER_CODE_MAP[owner], "mission": "general", "sat_type": "Unclassified"}
if not intel and sat.get("_SB_GROUP_META"):
intel = dict(sat["_SB_GROUP_META"])
intel.setdefault("country", "Unknown")
if not intel:
continue
+34 -5
View File
@@ -1,5 +1,6 @@
import logging
import json
import os
import subprocess
import shutil
import time
@@ -20,7 +21,6 @@ _session.mount("http://", HTTPAdapter(max_retries=_retry, pool_maxsize=10))
# Find bash for curl fallback — Git bash's curl has the TLS features
# needed to pass CDN fingerprint checks (brotli, zstd, libpsl)
_BASH_PATH = shutil.which("bash") or "bash"
# Cache domains where requests fails — skip straight to curl for 5 minutes
_domain_fail_cache: dict[str, float] = {}
@@ -39,6 +39,17 @@ class UpstreamCircuitBreakerError(OSError):
"""Raised when a domain recently failed hard and is temporarily skipped."""
def _env_truthy(name: str) -> bool:
return str(os.getenv(name, "")).strip().lower() in {"1", "true", "yes", "on"}
def external_curl_fallback_enabled() -> bool:
    """Return whether the backend may spawn an external curl process.

    Always allowed on non-Windows platforms; on Windows it is opt-in via the
    SHADOWBROKER_ENABLE_WINDOWS_CURL_FALLBACK environment variable.
    """
    if os.name == "nt":
        return _env_truthy("SHADOWBROKER_ENABLE_WINDOWS_CURL_FALLBACK")
    return True
class _DummyResponse:
"""Minimal response object matching requests.Response interface."""
def __init__(self, status_code, text):
@@ -98,11 +109,22 @@ def fetch_with_curl(url, method="GET", json_data=None, timeout=15, headers=None,
_circuit_breaker.pop(domain, None)
return res
except (requests.RequestException, ConnectionError, TimeoutError, OSError) as e:
logger.warning(f"Python requests failed for {url} ({e}), falling back to bash curl...")
fallback = "falling back to curl" if external_curl_fallback_enabled() else "skipping external curl"
logger.warning(f"Python requests failed for {url} ({e}), {fallback}...")
with _cb_lock:
_domain_fail_cache[domain] = time.time()
# Curl fallback — reached from both _skip_requests and requests-exception paths
if not external_curl_fallback_enabled():
logger.warning(
"External curl fallback disabled on Windows for %s; set "
"SHADOWBROKER_ENABLE_WINDOWS_CURL_FALLBACK=1 to opt in.",
domain,
)
with _cb_lock:
_circuit_breaker[domain] = time.time()
return _DummyResponse(500, "")
_CURL_PATH = shutil.which("curl") or "curl"
cmd = [_CURL_PATH, "-s", "-w", "\n%{http_code}"]
if follow_redirects:
@@ -116,9 +138,16 @@ def fetch_with_curl(url, method="GET", json_data=None, timeout=15, headers=None,
try:
stdin_data = json.dumps(json_data) if (method == "POST" and json_data) else None
creationflags = 0
if os.name == "nt":
creationflags = (
getattr(subprocess, "CREATE_NO_WINDOW", 0)
| getattr(subprocess, "CREATE_NEW_PROCESS_GROUP", 0)
)
res = subprocess.run(
cmd, capture_output=True, text=True, timeout=timeout + 5,
input=stdin_data, encoding="utf-8", errors="replace"
input=stdin_data, encoding="utf-8", errors="replace",
creationflags=creationflags,
)
if res.returncode == 0 and (res.stdout or "").strip():
# Parse HTTP status code from -w output (last line)
@@ -130,12 +159,12 @@ def fetch_with_curl(url, method="GET", json_data=None, timeout=15, headers=None,
_circuit_breaker.pop(domain, None) # Clear circuit breaker on success
return _DummyResponse(http_code, body)
else:
logger.error(f"bash curl fallback failed: exit={res.returncode} stderr={res.stderr[:200]}")
logger.error(f"curl fallback failed: exit={res.returncode} stderr={res.stderr[:200]}")
with _cb_lock:
_circuit_breaker[domain] = time.time()
return _DummyResponse(500, "")
except (subprocess.SubprocessError, ConnectionError, TimeoutError, OSError) as curl_e:
logger.error(f"bash curl fallback exception: {curl_e}")
logger.error(f"curl fallback exception: {curl_e}")
with _cb_lock:
_circuit_breaker[domain] = time.time()
return _DummyResponse(500, "")
+11 -1
View File
@@ -225,6 +225,11 @@ def _installed() -> bool:
def _pid_alive(pid: int) -> bool:
if pid <= 0:
return False
if os.name == "nt":
# Windows PIDs are reused and os.kill(pid, 0) is not a reliable
# ownership check. A persisted wormhole_status.json PID from an older
# run must never be treated as a process we own.
return False
try:
os.kill(pid, 0)
except OSError:
@@ -268,7 +273,12 @@ def _current_runtime_state() -> dict[str, Any]:
pid = int(_PROCESS.pid or 0)
elif _pid_alive(pid):
running = True
elif _probe_ready(timeout_s=0.35):
running = True
pid = 0
ready = running and _probe_ready()
if not running:
pid = 0
transport_active = status.get("transport_active", "") if ready else ""
proxy_active = status.get("proxy_active", "") if ready else ""
effective_transport = str(transport_active or settings.get("transport", "direct") or "direct").lower()
@@ -489,7 +499,7 @@ def disconnect_wormhole(*, reason: str = "disconnect") -> dict[str, Any]:
_PROCESS.kill()
except Exception:
pass
elif _pid_alive(pid):
elif os.name != "nt" and _pid_alive(pid):
try:
os.kill(pid, signal.SIGTERM)
except Exception:
+24 -7
View File
@@ -51,6 +51,10 @@ const NO_STORE_PROXY_HEADERS = {
Pragma: 'no-cache',
};
/** Resolve after `ms` milliseconds; used to pace retry loops. */
function sleep(ms: number): Promise<void> {
  return new Promise<void>((resolve) => {
    setTimeout(resolve, ms);
  });
}
function isSensitiveProxyPath(pathSegments: string[]): boolean {
const joined = pathSegments.join('/');
if (!joined) return false;
@@ -76,8 +80,7 @@ async function proxy(req: NextRequest, pathSegments: string[]): Promise<NextResp
isMesh &&
!isSensitiveMeshPath &&
['POST', 'PUT', 'DELETE'].includes(req.method.toUpperCase()) &&
(meshSegments.join('/') === 'send' ||
meshSegments.join('/') === 'vote' ||
(meshSegments.join('/') === 'vote' ||
meshSegments.join('/') === 'report' ||
meshSegments.join('/') === 'gate/create' ||
(meshSegments[0] === 'gate' && meshSegments[2] === 'message') ||
@@ -191,7 +194,7 @@ async function proxy(req: NextRequest, pathSegments: string[]): Promise<NextResp
}
const isBodyless = req.method === 'GET' || req.method === 'HEAD';
let upstream: Response;
let upstream: Response | null = null;
const requestInit: RequestInit & { duplex?: 'half' } = {
method: req.method,
headers: forwardHeaders,
@@ -202,12 +205,26 @@ async function proxy(req: NextRequest, pathSegments: string[]): Promise<NextResp
// Required for streaming request bodies in Node.js fetch
requestInit.duplex = 'half';
}
try {
upstream = await fetch(targetUrl.toString(), requestInit);
} catch {
const maxAttempts = isBodyless ? 18 : 1;
let fetchError: unknown = null;
for (let attempt = 1; attempt <= maxAttempts; attempt += 1) {
try {
upstream = await fetch(targetUrl.toString(), requestInit);
fetchError = null;
break;
} catch (error) {
fetchError = error;
if (attempt >= maxAttempts) break;
await sleep(250);
}
}
if (!upstream) {
return new NextResponse(JSON.stringify({ error: 'Backend unavailable' }), {
status: 502,
headers: { 'Content-Type': 'application/json' },
headers: {
'Content-Type': 'application/json',
'X-Proxy-Error': fetchError instanceof Error ? fetchError.name : 'fetch_failed',
},
});
}
+132 -72
View File
@@ -30,7 +30,7 @@ import type { ActiveLayers, KiwiSDR, Scanner, SelectedEntity } from '@/types/das
import type { ShodanSearchMatch } from '@/types/shodan';
import { API_BASE } from '@/lib/api';
import { useDataPolling, LAYER_TOGGLE_EVENT } from '@/hooks/useDataPolling';
import { useBackendStatus, useDataKey } from '@/hooks/useDataStore';
import { useBackendStatus, useDataKey, useDataKeys } from '@/hooks/useDataStore';
import { useReverseGeocode } from '@/hooks/useReverseGeocode';
import { useRegionDossier } from '@/hooks/useRegionDossier';
import { useAgentActions } from '@/hooks/useAgentActions';
@@ -61,6 +61,9 @@ const MaplibreViewer = dynamic(() => import('@/components/MaplibreViewer'), { ss
export default function Dashboard() {
const viewBoundsRef = useRef<{ south: number; west: number; north: number; east: number } | null>(null);
// Start the critical map data request before panel/control-plane effects.
// Non-map widgets can warm up after this; first paint needs flights, ships, and intel first.
useDataPolling();
const { mouseCoords, locationLabel, handleMouseCoords } = useReverseGeocode();
const [selectedEntity, setSelectedEntity] = useState<SelectedEntity | null>(null);
const [trackedSdr, setTrackedSdr] = useState<KiwiSDR | null>(null);
@@ -211,10 +214,35 @@ export default function Dashboard() {
const [shodanResults, setShodanResults] = useState<ShodanSearchMatch[]>([]);
const [, setShodanQueryLabel] = useState('');
const [shodanStyle, setShodanStyle] = useState<import('@/types/shodan').ShodanStyleConfig>({ shape: 'circle', color: '#16a34a', size: 'md' });
useDataPolling();
const backendStatus = useBackendStatus();
const spaceWeather = useDataKey('space_weather');
const feedHealth = useFeedHealth();
const bootSignals = useDataKeys([
'bootstrap_ready',
'commercial_flights',
'military_flights',
'tracked_flights',
'ships',
'news',
'threat_level',
] as const);
const criticalPaintReady = Boolean(
bootSignals.bootstrap_ready ||
(bootSignals.commercial_flights?.length || 0) > 0 ||
(bootSignals.military_flights?.length || 0) > 0 ||
(bootSignals.tracked_flights?.length || 0) > 0 ||
(bootSignals.ships?.length || 0) > 0 ||
(bootSignals.news?.length || 0) > 0 ||
bootSignals.threat_level,
);
const [secondaryBootReady, setSecondaryBootReady] = useState(false);
useEffect(() => {
if (secondaryBootReady) return;
const delay = criticalPaintReady ? 900 : 5500;
const id = window.setTimeout(() => setSecondaryBootReady(true), delay);
return () => window.clearTimeout(id);
}, [criticalPaintReady, secondaryBootReady]);
// Global keyboard shortcuts
useKeyboardShortcuts({
@@ -249,6 +277,7 @@ export default function Dashboard() {
const layersTimerRef = useRef<ReturnType<typeof setTimeout> | null>(null);
const initialLayerSyncRef = useRef(false);
useEffect(() => {
if (!secondaryBootReady) return;
const syncLayers = (triggerRefetch: boolean) =>
fetch(`${API_BASE}/api/layers`, {
method: 'POST',
@@ -258,7 +287,7 @@ export default function Dashboard() {
if (triggerRefetch) {
window.dispatchEvent(new Event(LAYER_TOGGLE_EVENT));
}
}).catch((e) => console.error('Failed to update backend layers:', e));
}).catch((e) => console.warn('Backend layer sync will retry after runtime is reachable:', e));
if (layersTimerRef.current) clearTimeout(layersTimerRef.current);
if (!initialLayerSyncRef.current) {
@@ -272,7 +301,7 @@ export default function Dashboard() {
return () => {
if (layersTimerRef.current) clearTimeout(layersTimerRef.current);
};
}, [activeLayers]);
}, [activeLayers, secondaryBootReady]);
// Left panel accordion state
const [leftDataMinimized, setLeftDataMinimized] = useState(false);
@@ -393,12 +422,28 @@ export default function Dashboard() {
};
const [activeFilters, setActiveFilters] = useState<Record<string, string[]>>({});
const firstPaintActiveLayers = useMemo<ActiveLayers>(() => {
if (secondaryBootReady) return activeLayers;
return {
...activeLayers,
cctv: false,
sar: false,
gibs_imagery: false,
highres_satellite: false,
sentinel_hub: false,
viirs_nightlights: false,
psk_reporter: false,
tinygs: false,
datacenters: false,
power_plants: false,
};
}, [activeLayers, secondaryBootReady]);
// Agent fly_to handler (sar_focus_aoi etc.) — wired here now that
// setFlyToLocation is in scope. show_image is routed through
// useAgentActions at the top of Dashboard.
useAgentActions(handleMapRightClick, ({ lat, lng }) => {
setFlyToLocation({ lat, lng, ts: Date.now() });
});
}, secondaryBootReady);
// Eavesdrop Mode State
const [isEavesdropping] = useState(false);
@@ -415,7 +460,7 @@ export default function Dashboard() {
{/* MAPLIBRE WEBGL OVERLAY */}
<ErrorBoundary name="Map">
<MaplibreViewer
activeLayers={activeLayers}
activeLayers={firstPaintActiveLayers}
activeFilters={activeFilters}
effects={memoizedEffects}
onEntityClick={setSelectedEntity}
@@ -502,74 +547,87 @@ export default function Dashboard() {
>
{/* 1. DATA LAYERS (Top) */}
<div className="contents" style={{ direction: 'ltr' }}>
<ErrorBoundary name="WorldviewLeftPanel">
<WorldviewLeftPanel
activeLayers={activeLayers}
setActiveLayers={setActiveLayers}
shodanResultCount={shodanResults.length}
onSettingsClick={() => setSettingsOpen(true)}
onLegendClick={() => setLegendOpen(true)}
onOpenSarAoiEditor={() => setSarAoiEditorOpen(true)}
gibsDate={gibsDate}
setGibsDate={setGibsDate}
gibsOpacity={gibsOpacity}
setGibsOpacity={setGibsOpacity}
sentinelDate={sentinelDate}
setSentinelDate={setSentinelDate}
sentinelOpacity={sentinelOpacity}
setSentinelOpacity={setSentinelOpacity}
sentinelPreset={sentinelPreset}
setSentinelPreset={setSentinelPreset}
onEntityClick={setSelectedEntity}
onFlyTo={handleFlyTo}
trackedSdr={trackedSdr}
setTrackedSdr={setTrackedSdr}
trackedScanner={trackedScanner}
setTrackedScanner={setTrackedScanner}
isMinimized={leftDataMinimized}
onMinimizedChange={setLeftDataMinimized}
/>
</ErrorBoundary>
{secondaryBootReady ? (
<ErrorBoundary name="WorldviewLeftPanel">
<WorldviewLeftPanel
activeLayers={activeLayers}
setActiveLayers={setActiveLayers}
shodanResultCount={shodanResults.length}
onSettingsClick={() => setSettingsOpen(true)}
onLegendClick={() => setLegendOpen(true)}
onOpenSarAoiEditor={() => setSarAoiEditorOpen(true)}
gibsDate={gibsDate}
setGibsDate={setGibsDate}
gibsOpacity={gibsOpacity}
setGibsOpacity={setGibsOpacity}
sentinelDate={sentinelDate}
setSentinelDate={setSentinelDate}
sentinelOpacity={sentinelOpacity}
setSentinelOpacity={setSentinelOpacity}
sentinelPreset={sentinelPreset}
setSentinelPreset={setSentinelPreset}
onEntityClick={setSelectedEntity}
onFlyTo={handleFlyTo}
trackedSdr={trackedSdr}
setTrackedSdr={setTrackedSdr}
trackedScanner={trackedScanner}
setTrackedScanner={setTrackedScanner}
isMinimized={leftDataMinimized}
onMinimizedChange={setLeftDataMinimized}
/>
</ErrorBoundary>
) : (
<div className="bg-[#05090d]/95 border border-cyan-900/50 p-4 font-mono text-cyan-500/70">
<div className="text-[11px] tracking-[0.2em] text-cyan-400 font-bold">DATA LAYERS</div>
<div className="mt-3 text-[10px] tracking-wider">PRIORITIZING MAP FEEDS</div>
</div>
)}
</div>
{/* 2. MESH CHAT (Middle) */}
<div className="contents" style={{ direction: 'ltr' }}>
<MeshChat
onFlyTo={handleFlyTo}
expanded={leftMeshExpanded}
onExpandedChange={setLeftMeshExpanded}
onSettingsClick={() => setSettingsOpen(true)}
onTerminalToggle={openSecureTerminalLauncher}
launchRequest={meshChatLaunchRequest}
/>
</div>
{secondaryBootReady && (
<div className="contents" style={{ direction: 'ltr' }}>
<MeshChat
onFlyTo={handleFlyTo}
expanded={leftMeshExpanded}
onExpandedChange={setLeftMeshExpanded}
onSettingsClick={() => setSettingsOpen(true)}
onTerminalToggle={openSecureTerminalLauncher}
launchRequest={meshChatLaunchRequest}
/>
</div>
)}
{/* 3. SHODAN CONNECTOR (Bottom) */}
<div className="contents" style={{ direction: 'ltr' }}>
<ShodanPanel
currentResults={shodanResults}
onOpenSettings={() => setSettingsOpen(true)}
settingsOpen={settingsOpen}
onResultsChange={(results, queryLabel) => {
setShodanResults(results);
setShodanQueryLabel(queryLabel);
setActiveLayers((prev) => ({ ...prev, shodan_overlay: results.length > 0 }));
}}
onSelectEntity={setSelectedEntity}
onStyleChange={setShodanStyle}
isMinimized={leftShodanMinimized}
onMinimizedChange={setLeftShodanMinimized}
/>
</div>
{secondaryBootReady && (
<div className="contents" style={{ direction: 'ltr' }}>
<ShodanPanel
currentResults={shodanResults}
onOpenSettings={() => setSettingsOpen(true)}
settingsOpen={settingsOpen}
onResultsChange={(results, queryLabel) => {
setShodanResults(results);
setShodanQueryLabel(queryLabel);
setActiveLayers((prev) => ({ ...prev, shodan_overlay: results.length > 0 }));
}}
onSelectEntity={setSelectedEntity}
onStyleChange={setShodanStyle}
isMinimized={leftShodanMinimized}
onMinimizedChange={setLeftShodanMinimized}
/>
</div>
)}
{/* 4. AI INTEL (Below Shodan) */}
<div className="contents" style={{ direction: 'ltr' }}>
<AIIntelPanel
onFlyTo={handleFlyTo}
pinPlacementMode={pinPlacementMode}
onPinPlacementModeChange={setPinPlacementMode}
/>
</div>
{secondaryBootReady && (
<div className="contents" style={{ direction: 'ltr' }}>
<AIIntelPanel
onFlyTo={handleFlyTo}
pinPlacementMode={pinPlacementMode}
onPinPlacementModeChange={setPinPlacementMode}
/>
</div>
)}
</motion.div>
{/* LEFT SIDEBAR TOGGLE TAB — aligns with Data Layers section */}
@@ -647,11 +705,13 @@ export default function Dashboard() {
{/* GLOBAL TICKER REPLACES MARKETS PANEL - RENDERED OUTSIDE THIS DIV */}
{/* EVENT TIMELINE */}
<div className={`flex-shrink-0 ${rightFocusedPanel && rightFocusedPanel !== 'predictions' ? 'hidden' : ''}`}>
<ErrorBoundary name="TimelinePanel">
<TimelinePanel />
</ErrorBoundary>
</div>
{secondaryBootReady && (
<div className={`flex-shrink-0 ${rightFocusedPanel && rightFocusedPanel !== 'predictions' ? 'hidden' : ''}`}>
<ErrorBoundary name="TimelinePanel">
<TimelinePanel />
</ErrorBoundary>
</div>
)}
{/* DATA FILTERS */}
<div className={`flex-shrink-0 ${rightFocusedPanel && rightFocusedPanel !== 'filters' ? 'hidden' : ''}`}>
+8 -2
View File
@@ -345,6 +345,7 @@ const MaplibreViewer = ({
const data = useMemo(() => ({ ...coreData, ...extraData }) as DashboardData, [coreData, extraData]);
const mapRef = useRef<MapRef>(null);
const mapInitRef = useRef(false);
const [mapReady, setMapReady] = useState(false);
const { theme } = useTheme();
const mapThemeStyle = useMemo<maplibregl.StyleSpecification>(
() => (theme === 'light' ? lightStyle : darkStyle) as maplibregl.StyleSpecification,
@@ -914,15 +915,20 @@ const MaplibreViewer = ({
// Load Images into the Map Style once loaded
const onMapLoad = useCallback((e: { target: maplibregl.Map }) => {
initializeMap(e.target);
setMapReady(true);
}, [initializeMap]);
const onMapStyleData = useCallback((e: { target: maplibregl.Map }) => {
initializeMap(e.target);
setMapReady(true);
}, [initializeMap]);
useEffect(() => {
const map = mapRef.current?.getMap();
if (map) initializeMap(map);
if (map) {
initializeMap(map);
setMapReady(true);
}
}, [initializeMap, theme]);
// Build a set of tracked icao24s to exclude from other flight layers
@@ -1561,7 +1567,7 @@ const MaplibreViewer = ({
}, [activeLayers.uap_sightings, activeLayers.wastewater, theme]);
// --- Imperative source updates: bypass React reconciliation for GeoJSON layers ---
const mapForHook = mapRef.current;
const mapForHook = mapReady ? mapRef.current : null;
useImperativeSource(mapForHook, 'commercial-flights', commFlightsGeoJSON);
useImperativeSource(mapForHook, 'private-flights', privFlightsGeoJSON);
useImperativeSource(mapForHook, 'private-jets', privJetsGeoJSON);
@@ -3915,7 +3915,7 @@ export function useMeshChatController({
wormholeEnabled &&
wormholeReadyState &&
!selectedGateAccessReady) ||
((activeTab === 'infonet' || activeTab === 'meshtastic') && anonymousPublicBlocked) ||
(activeTab === 'infonet' && anonymousPublicBlocked) ||
(activeTab === 'dms' &&
(dmView !== 'chat' ||
!selectedContact ||
@@ -34,7 +34,7 @@ export function useImperativeSource(
};
const pushWhenReady = () => {
let attemptsRemaining = 20;
let attemptsRemaining = 150;
const tryPush = () => {
if (cancelled) return;
@@ -62,6 +62,7 @@ export function useImperativeSource(
pushWhenReady();
};
rawMap.on('load', handleStyleData);
rawMap.on('styledata', handleStyleData);
// Skip redundant writes for unchanged references, but keep the styledata
@@ -73,6 +74,7 @@ export function useImperativeSource(
return () => {
cancelled = true;
rawMap.off('load', handleStyleData);
rawMap.off('styledata', handleStyleData);
if (timerRef.current) clearTimeout(timerRef.current);
if (retryTimerRef.current) clearTimeout(retryTimerRef.current);
+3 -1
View File
@@ -35,6 +35,7 @@ interface AgentAction {
export function useAgentActions(
onShowImage: (coords: { lat: number; lng: number }) => void,
onFlyTo?: (coords: { lat: number; lng: number; zoom?: number }) => void,
enabled = true,
) {
const onShowImageRef = useRef(onShowImage);
onShowImageRef.current = onShowImage;
@@ -70,9 +71,10 @@ export function useAgentActions(
useEffect(() => {
// Poll every 3 seconds — lightweight endpoint, ~50 bytes when empty
if (!enabled) return;
const interval = setInterval(poll, 3000);
// Initial poll on mount
poll();
return () => clearInterval(interval);
}, [poll]);
}, [enabled, poll]);
}
+46 -8
View File
@@ -59,6 +59,8 @@ type FastDataProbe = {
ships?: unknown[];
sigint?: unknown[];
cctv?: unknown[];
news?: unknown[];
threat_level?: unknown;
};
function hasMeaningfulFastData(json: FastDataProbe): boolean {
@@ -100,11 +102,37 @@ export function useDataPolling() {
_slowEtagRef = slowEtag;
let hasData = false;
let fetchedStartupFastPayload = false;
let fastTimerId: ReturnType<typeof setTimeout> | null = null;
let slowTimerId: ReturnType<typeof setTimeout> | null = null;
const fastAbortRef = { current: null as AbortController | null };
const slowAbortRef = { current: null as AbortController | null };
const fetchCriticalBootstrap = async () => {
try {
const res = await fetch(`${API_BASE}/api/bootstrap/critical`, {
headers: { Accept: 'application/json' },
});
if (res.ok) {
setStoreBackendStatus('connected');
const json = await res.json();
mergeData(json);
if (hasMeaningfulFastData(json) || (json.news?.length || 0) > 0 || json.threat_level) {
hasData = true;
}
}
} catch (e) {
const aborted =
typeof e === 'object' &&
e !== null &&
'name' in e &&
(e as { name?: string }).name === 'AbortError';
if (!aborted) {
console.warn("Critical bootstrap fetch will retry via live polling", e);
}
}
};
const fetchFastData = async () => {
if (fastTimerId) {
clearTimeout(fastTimerId);
@@ -116,9 +144,11 @@ export function useDataPolling() {
const controller = new AbortController();
fastAbortRef.current = controller;
try {
const useStartupPayload = !fetchedStartupFastPayload && !fastEtag.current;
const headers: Record<string, string> = {};
if (fastEtag.current) headers['If-None-Match'] = fastEtag.current;
const res = await fetch(`${API_BASE}/api/live-data/fast`, {
if (!useStartupPayload && fastEtag.current) headers['If-None-Match'] = fastEtag.current;
const url = `${API_BASE}/api/live-data/fast${useStartupPayload ? '?initial=1' : ''}`;
const res = await fetch(url, {
headers,
signal: controller.signal,
});
@@ -129,7 +159,10 @@ export function useDataPolling() {
}
if (res.ok) {
setStoreBackendStatus('connected');
fastEtag.current = res.headers.get('etag') || null;
// Do not keep the capped startup ETag. The next steady poll should
// request the full fast dataset and replace the representative first paint.
fastEtag.current = useStartupPayload ? null : res.headers.get('etag') || null;
if (useStartupPayload) fetchedStartupFastPayload = true;
const json = await res.json();
mergeData(json);
if (hasMeaningfulFastData(json)) hasData = true;
@@ -141,7 +174,7 @@ export function useDataPolling() {
'name' in e &&
(e as { name?: string }).name === 'AbortError';
if (!aborted) {
console.error("Failed fetching fast live data", e);
console.warn("Fast live data fetch will retry after runtime is reachable", e);
setStoreBackendStatus('disconnected');
}
} finally {
@@ -177,7 +210,7 @@ export function useDataPolling() {
'name' in e &&
(e as { name?: string }).name === 'AbortError';
if (!aborted) {
console.error("Failed fetching slow live data", e);
console.warn("Slow live data fetch will retry after runtime is reachable", e);
}
} finally {
if (slowAbortRef.current === controller) {
@@ -191,7 +224,8 @@ export function useDataPolling() {
const scheduleNext = (tier: 'fast' | 'slow') => {
if (tier === 'fast') {
const delay = hasData ? 15000 : 3000; // 3s startup retry → 15s steady state
fastTimerId = setTimeout(fetchFastData, delay);
const needsFullFastPayload = fetchedStartupFastPayload && !fastEtag.current;
fastTimerId = setTimeout(fetchFastData, needsFullFastPayload ? 750 : delay);
} else {
const delay = hasData ? 120000 : 5000; // 5s startup retry → 120s steady state
slowTimerId = setTimeout(fetchSlowData, delay);
@@ -208,8 +242,12 @@ export function useDataPolling() {
};
window.addEventListener(LAYER_TOGGLE_EVENT, onLayerToggle);
fetchFastData();
fetchSlowData();
void (async () => {
await fetchCriticalBootstrap();
fetchFastData();
// Let the bootstrap/fast payload paint before competing with the slow tier.
slowTimerId = setTimeout(fetchSlowData, 5000);
})();
return () => {
window.removeEventListener(LAYER_TOGGLE_EVENT, onLayerToggle);
+2 -2
View File
@@ -387,7 +387,7 @@ export async function refreshSnapshotList(): Promise<void> {
const json = await res.json();
updateTimelineFromSnapshots(sortSnapshots(json.snapshots || []));
} catch (e) {
console.error('Time Machine: failed to fetch snapshots', e);
console.warn('Time Machine snapshots will retry after runtime is reachable', e);
}
}
@@ -402,7 +402,7 @@ export async function refreshHourlyIndex(): Promise<void> {
setState({ hourlyIndex: json.hours || {} });
}
} catch (e) {
console.error('Time Machine: failed to fetch hourly index', e);
console.warn('Time Machine hourly index will retry after runtime is reachable', e);
}
}
+2
View File
@@ -826,6 +826,8 @@ export interface DashboardData {
cctv_total?: number;
satnogs_total?: number;
tinygs_total?: number;
bootstrap_ready?: boolean;
bootstrap_payload?: boolean;
sigint_totals?: {
total?: number;
meshtastic?: number;
+31
View File
@@ -0,0 +1,31 @@
# ShadowBroker runtime console launcher. Invoked from start.bat (see the
# "start" call that passes -Root "%ROOT%"); runs the dev-all.cjs entry under
# node, then keeps the window open so the exit status stays readable.
param(
    # Repository root; defaults to the parent of this script's directory.
    [string]$Root = (Resolve-Path (Join-Path $PSScriptRoot "..")).Path
)

# Stop on any PowerShell error so failures land in the catch block below.
$ErrorActionPreference = "Stop"
$Host.UI.RawUI.WindowTitle = "ShadowBroker Runtime"
Set-Location -LiteralPath $Root

Write-Host "==================================================="
Write-Host " ShadowBroker runtime"
Write-Host " Dashboard: http://localhost:3000"
Write-Host " Close this window or press Ctrl+C to stop."
Write-Host "==================================================="
Write-Host ""

try {
    # Run the node runtime entry point; capture its exit code so it can be
    # reported and propagated below.
    & node "frontend\scripts\dev-all.cjs"
    $exitCode = $LASTEXITCODE
} catch {
    # node missing or the invocation itself threw — report and force failure.
    Write-Host ""
    Write-Host "[!] Runtime failed: $($_.Exception.Message)"
    $exitCode = 1
}

Write-Host ""
Write-Host "==================================================="
Write-Host " ShadowBroker has stopped. Exit code: $exitCode"
Write-Host "==================================================="
# Hold the console until the user acknowledges the final status.
Read-Host "Press Enter to close"
exit $exitCode
+2 -1
View File
@@ -299,7 +299,8 @@ echo ===================================================
echo (Press Ctrl+C to stop)
echo.
call npm run dev
start "ShadowBroker Runtime" powershell.exe -NoProfile -ExecutionPolicy Bypass -NoExit -File "%ROOT%\scripts\run-windows-runtime.ps1" -Root "%ROOT%"
exit /b 0
echo.
echo ===================================================
+9 -3
View File
@@ -1,7 +1,13 @@
#!/bin/bash
# Graceful shutdown: kill all child processes on exit/interrupt
trap 'kill 0' EXIT SIGINT SIGTERM
# Graceful shutdown: stop child processes without signaling the parent shell.
cleanup() {
  # Clear our own traps first so this handler cannot re-enter itself while
  # it is tearing children down.
  trap - EXIT SIGINT SIGTERM
  # Kill only direct children of this shell ($$); unlike `kill 0` this never
  # signals the invoking/parent shell. `|| true` keeps the handler's exit
  # status clean when there are no children left.
  if command -v pkill >/dev/null 2>&1; then
    pkill -P $$ 2>/dev/null || true
  fi
}
# Run cleanup on normal exit and on Ctrl+C / termination.
trap cleanup EXIT SIGINT SIGTERM
echo "======================================================="
echo " S H A D O W B R O K E R - macOS / Linux Start "
@@ -252,4 +258,4 @@ echo "======================================================="
echo " (Press Ctrl+C to stop)"
echo ""
npm run dev
node scripts/dev-all.cjs