v0.9.5: The Voltron Update — modular architecture, stable IDs, parallelized boot

- Parallelized startup (60s → 15s) via ThreadPoolExecutor
- Adaptive polling engine with ETag caching (no more bbox interrupts)
- useCallback optimization for interpolation functions
- Sliding LAYERS/INTEL edge panels replace bulky Record Panel
- Modular fetcher architecture (flights, geo, infrastructure, financial, earth_observation)
- Stable entity IDs for GDELT & News popups (PR #63, credit @csysp)
- Admin auth (X-Admin-Key), rate limiting (slowapi), auto-updater
- Docker Swarm secrets support, env_check.py validation
- 85+ vitest tests, CI pipeline, geoJSON builder extraction
- Server-side viewport bbox filtering reduces payloads 80%+

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>

Former-commit-id: f2883150b5bc78ebc139d89cc966a76f7d7c0408
This commit is contained in:
anoracleofra-code
2026-03-14 14:01:54 -06:00
parent 60c90661d4
commit 90c2e90e2c
63 changed files with 6015 additions and 2756 deletions
+85 -1
View File
@@ -207,8 +207,66 @@ def get_ais_vessels() -> list[dict]:
return result
def ingest_ais_catcher(msgs: list[dict]) -> int:
"""Ingest decoded AIS messages from AIS-catcher HTTP feed.
Returns number of vessels updated."""
count = 0
now = time.time()
with _vessels_lock:
for msg in msgs:
mmsi = msg.get("mmsi")
if not mmsi or not isinstance(mmsi, int):
continue
vessel = _vessels.setdefault(mmsi, {"mmsi": mmsi})
msg_type = msg.get("type", 0)
# Position reports (types 1, 2, 3 = Class A; 18, 19 = Class B)
if msg_type in (1, 2, 3, 18, 19):
lat = msg.get("lat")
lon = msg.get("lon")
if lat is not None and lon is not None and lat != 91.0 and lon != 181.0:
vessel["lat"] = lat
vessel["lng"] = lon
vessel["sog"] = msg.get("speed", 0)
vessel["cog"] = msg.get("course", 0)
heading = msg.get("heading", 511)
vessel["heading"] = heading if heading != 511 else vessel.get("cog", 0)
vessel["_updated"] = now
if msg.get("shipname"):
vessel["name"] = msg["shipname"].strip()
count += 1
# Static data (type 5 = Class A static; 24 = Class B static)
elif msg_type in (5, 24):
if msg.get("shipname"):
vessel["name"] = msg["shipname"].strip()
if msg.get("callsign"):
vessel["callsign"] = msg["callsign"].strip()
if msg.get("imo"):
vessel["imo"] = msg["imo"]
if msg.get("destination"):
vessel["destination"] = msg["destination"].strip().replace("@", "")
ship_type = msg.get("shiptype", 0)
if ship_type:
vessel["ais_type_code"] = ship_type
vessel["type"] = classify_vessel(ship_type, mmsi)
vessel["_updated"] = now
# Ensure country is set from MMSI MID
if "country" not in vessel:
vessel["country"] = get_country_from_mmsi(mmsi)
# Ensure name exists
if "name" not in vessel:
vessel["name"] = msg.get("shipname", "UNKNOWN") or "UNKNOWN"
return count
def _ais_stream_loop():
"""Main loop: spawn node proxy and process messages from stdout."""
global _proxy_process
import subprocess
import os
@@ -220,11 +278,13 @@ def _ais_stream_loop():
logger.info("Starting Node.js AIS Stream Proxy...")
process = subprocess.Popen(
['node', proxy_script, API_KEY],
stdin=subprocess.PIPE,
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
text=True,
bufsize=1
)
_proxy_process = process
# Drain stderr in a background thread to prevent deadlock
import threading
@@ -361,7 +421,31 @@ def start_ais_stream():
def stop_ais_stream():
"""Stop the AIS WebSocket stream and save cache."""
global _ws_running
global _ws_running, _proxy_process
_ws_running = False
if _proxy_process and _proxy_process.stdin:
try:
_proxy_process.stdin.close()
except Exception:
pass
_save_cache() # Save on shutdown
logger.info("AIS Stream stopping...")
def update_ais_bbox(south: float, west: float, north: float, east: float):
"""Dynamically update the AIS stream bounding box via proxy stdin."""
global _proxy_process
if not _proxy_process or not _proxy_process.stdin:
return
try:
cmd = json.dumps({
"type": "update_bbox",
"bboxes": [[[south, west], [north, east]]]
})
_proxy_process.stdin.write(cmd + "\n")
_proxy_process.stdin.flush()
logger.info(f"Updated AIS bounding box to: S:{south:.2f} W:{west:.2f} N:{north:.2f} E:{east:.2f}")
except Exception as e:
logger.error(f"Failed to update AIS bbox: {e}")
+1 -1
View File
@@ -381,7 +381,7 @@ def update_carrier_positions():
if hull in positions:
positions[hull].update(pos)
logger.info(f"Carrier OSINT: updated {CARRIER_REGISTRY[hull]['name']} from news")
except Exception as e:
except (ValueError, KeyError, json.JSONDecodeError, OSError) as e:
logger.warning(f"GDELT carrier fetch failed: {e}")
# Save and update the global state with enriched positions
+33
View File
@@ -0,0 +1,33 @@
# ─── ShadowBroker Backend Constants ──────────────────────────────────────────
# Centralized magic numbers. Import from here instead of hardcoding.
# ─── Flight Trails ──────────────────────────────────────────────────────────
FLIGHT_TRAIL_MAX_TRACKED = 2000 # Max concurrent tracked trails before LRU eviction
FLIGHT_TRAIL_POINTS_PER_FLIGHT = 200 # Max trail points kept per aircraft
TRACKED_TRAIL_TTL_S = 1800 # 30 min - trail TTL for tracked flights
DEFAULT_TRAIL_TTL_S = 300 # 5 min - trail TTL for non-tracked flights
# ─── Detection Thresholds ──────────────────────────────────────────────────
HOLD_PATTERN_DEGREES = 300 # Total heading change to flag holding pattern
GPS_JAMMING_NACP_THRESHOLD = 8 # NACp below this = degraded GPS signal
GPS_JAMMING_GRID_SIZE = 1.0 # 1 degree grid for aggregation
GPS_JAMMING_MIN_RATIO = 0.25 # 25% degraded aircraft to flag zone
# ─── Network & Circuit Breaker ──────────────────────────────────────────────
CIRCUIT_BREAKER_TTL_S = 120 # Skip domain for 2 min after total failure
DOMAIN_FAIL_TTL_S = 300 # Skip requests.get for 5 min, go straight to curl
CONNECT_TIMEOUT_S = 3 # Short connect timeout for fast firewall-block detection
# ─── Data Fetcher Intervals ────────────────────────────────────────────────
FAST_FETCH_INTERVAL_S = 60 # Flights, ships, satellites, military
SLOW_FETCH_INTERVAL_MIN = 30 # News, markets, space weather
CCTV_FETCH_INTERVAL_MIN = 1 # CCTV camera pipeline
LIVEUAMAP_FETCH_INTERVAL_HR = 12 # LiveUAMap scraper
# ─── External API ──────────────────────────────────────────────────────────
OPENSKY_RATE_LIMIT_S = 300 # Only re-fetch OpenSky every 5 minutes
OPENSKY_REQUEST_TIMEOUT_S = 15 # Timeout for OpenSky API calls
ROUTE_FETCH_TIMEOUT_S = 15 # Timeout for adsb.lol route lookups
# ─── Internet Outage Detection ─────────────────────────────────────────────
INTERNET_OUTAGE_MIN_SEVERITY = 0.10 # 10% drop minimum to show
+69 -522
View File
@@ -1,510 +1,54 @@
"""Data fetcher orchestrator — schedules and coordinates all data source modules.
Heavy logic has been extracted into services/fetchers/:
- _store.py — shared state (latest_data, locks, timestamps)
- plane_alert.py — aircraft enrichment DB
- flights.py — commercial flights, routes, trails, GPS jamming
- military.py — military flights, UAV detection
- satellites.py — satellite tracking (SGP4)
- news.py — RSS news fetching, clustering, risk assessment
- _store.py — shared state (latest_data, locks, timestamps)
- plane_alert.py — aircraft enrichment DB
- flights.py — commercial flights, routes, trails, GPS jamming
- military.py — military flights, UAV detection
- satellites.py — satellite tracking (SGP4)
- news.py — RSS news fetching, clustering, risk assessment
- yacht_alert.py — superyacht alert enrichment
- financial.py — defense stocks, oil prices
- earth_observation.py — earthquakes, FIRMS fires, space weather, weather radar
- infrastructure.py — internet outages, data centers, CCTV, KiwiSDR
- geo.py — ships, airports, frontlines, GDELT, LiveUAMap
"""
import yfinance as yf
import csv
import io
import json
import time
import math
import logging
import heapq
import concurrent.futures
from pathlib import Path
from datetime import datetime
from cachetools import TTLCache
from apscheduler.schedulers.background import BackgroundScheduler
from dotenv import load_dotenv
load_dotenv()
from services.network_utils import fetch_with_curl
from services.cctv_pipeline import (
init_db, TFLJamCamIngestor, LTASingaporeIngestor,
AustinTXIngestor, NYCDOTIngestor, get_all_cameras,
)
from apscheduler.schedulers.background import BackgroundScheduler
from services.cctv_pipeline import init_db
# Shared state — all fetcher modules read/write through this
from services.fetchers._store import (
latest_data, source_timestamps, _mark_fresh, _data_lock, # noqa: F401 — source_timestamps re-exported for main.py
latest_data, source_timestamps, _mark_fresh, _data_lock, # noqa: F401 — re-exported for main.py
)
# Domain-specific fetcher modules
from services.fetchers.flights import fetch_flights
from services.fetchers.military import fetch_military_flights
from services.fetchers.satellites import fetch_satellites
from services.fetchers.news import fetch_news
# Domain-specific fetcher modules (already extracted)
from services.fetchers.flights import fetch_flights # noqa: F401
from services.fetchers.flights import _BLIND_SPOT_REGIONS # noqa: F401 — re-exported for tests
from services.fetchers.military import fetch_military_flights # noqa: F401
from services.fetchers.satellites import fetch_satellites # noqa: F401
from services.fetchers.news import fetch_news # noqa: F401
# Newly extracted fetcher modules
from services.fetchers.financial import fetch_defense_stocks, fetch_oil_prices # noqa: F401
from services.fetchers.earth_observation import ( # noqa: F401
fetch_earthquakes, fetch_firms_fires, fetch_space_weather, fetch_weather,
)
from services.fetchers.infrastructure import ( # noqa: F401
fetch_internet_outages, fetch_datacenters, fetch_cctv, fetch_kiwisdr,
)
from services.fetchers.geo import ( # noqa: F401
fetch_ships, fetch_airports, find_nearest_airport, cached_airports,
fetch_frontlines, fetch_gdelt, fetch_geopolitics, update_liveuamap,
)
logger = logging.getLogger(__name__)
# ---------------------------------------------------------------------------
# Financial data
# ---------------------------------------------------------------------------
def _fetch_single_ticker(symbol: str, period: str = "2d"):
"""Fetch a single yfinance ticker. Returns (symbol, data_dict) or (symbol, None)."""
try:
ticker = yf.Ticker(symbol)
hist = ticker.history(period=period)
if len(hist) >= 1:
current_price = hist['Close'].iloc[-1]
prev_close = hist['Close'].iloc[0] if len(hist) > 1 else current_price
change_percent = ((current_price - prev_close) / prev_close) * 100 if prev_close else 0
return symbol, {
"price": round(float(current_price), 2),
"change_percent": round(float(change_percent), 2),
"up": bool(change_percent >= 0)
}
except Exception as e:
logger.warning(f"Could not fetch data for {symbol}: {e}")
return symbol, None
def fetch_defense_stocks():
tickers = ["RTX", "LMT", "NOC", "GD", "BA", "PLTR"]
try:
with concurrent.futures.ThreadPoolExecutor(max_workers=4) as pool:
results = pool.map(lambda t: _fetch_single_ticker(t, "2d"), tickers)
stocks_data = {sym: data for sym, data in results if data}
with _data_lock:
latest_data['stocks'] = stocks_data
_mark_fresh("stocks")
except Exception as e:
logger.error(f"Error fetching stocks: {e}")
def fetch_oil_prices():
tickers = {"WTI Crude": "CL=F", "Brent Crude": "BZ=F"}
try:
with concurrent.futures.ThreadPoolExecutor(max_workers=2) as pool:
results = pool.map(lambda item: (_fetch_single_ticker(item[1], "5d")[1], item[0]), tickers.items())
oil_data = {name: data for data, name in results if data}
with _data_lock:
latest_data['oil'] = oil_data
_mark_fresh("oil")
except Exception as e:
logger.error(f"Error fetching oil: {e}")
# ---------------------------------------------------------------------------
# Weather
# ---------------------------------------------------------------------------
def fetch_weather():
try:
url = "https://api.rainviewer.com/public/weather-maps.json"
response = fetch_with_curl(url, timeout=10)
if response.status_code == 200:
data = response.json()
if "radar" in data and "past" in data["radar"]:
latest_time = data["radar"]["past"][-1]["time"]
with _data_lock:
latest_data["weather"] = {"time": latest_time, "host": data.get("host", "https://tilecache.rainviewer.com")}
_mark_fresh("weather")
except Exception as e:
logger.error(f"Error fetching weather: {e}")
# ---------------------------------------------------------------------------
# CCTV
# ---------------------------------------------------------------------------
def fetch_cctv():
try:
cameras = get_all_cameras()
with _data_lock:
latest_data["cctv"] = cameras
_mark_fresh("cctv")
except Exception as e:
logger.error(f"Error fetching cctv from DB: {e}")
with _data_lock:
latest_data["cctv"] = []
# ---------------------------------------------------------------------------
# KiwiSDR
# ---------------------------------------------------------------------------
def fetch_kiwisdr():
try:
from services.kiwisdr_fetcher import fetch_kiwisdr_nodes
nodes = fetch_kiwisdr_nodes()
with _data_lock:
latest_data["kiwisdr"] = nodes
_mark_fresh("kiwisdr")
except Exception as e:
logger.error(f"Error fetching KiwiSDR nodes: {e}")
with _data_lock:
latest_data["kiwisdr"] = []
# ---------------------------------------------------------------------------
# NASA FIRMS Fires
# ---------------------------------------------------------------------------
def fetch_firms_fires():
"""Fetch global fire/thermal anomalies from NASA FIRMS (NOAA-20 VIIRS, 24h, no key needed)."""
fires = []
try:
url = "https://firms.modaps.eosdis.nasa.gov/data/active_fire/noaa-20-viirs-c2/csv/J1_VIIRS_C2_Global_24h.csv"
response = fetch_with_curl(url, timeout=30)
if response.status_code == 200:
reader = csv.DictReader(io.StringIO(response.text))
all_rows = []
for row in reader:
try:
lat = float(row.get("latitude", 0))
lng = float(row.get("longitude", 0))
frp = float(row.get("frp", 0))
conf = row.get("confidence", "nominal")
daynight = row.get("daynight", "")
bright = float(row.get("bright_ti4", 0))
all_rows.append({
"lat": lat, "lng": lng, "frp": frp,
"brightness": bright, "confidence": conf,
"daynight": daynight,
"acq_date": row.get("acq_date", ""),
"acq_time": row.get("acq_time", ""),
})
except (ValueError, TypeError):
continue
fires = heapq.nlargest(5000, all_rows, key=lambda x: x["frp"])
logger.info(f"FIRMS fires: {len(fires)} hotspots (from {response.status_code})")
except Exception as e:
logger.error(f"Error fetching FIRMS fires: {e}")
with _data_lock:
latest_data["firms_fires"] = fires
if fires:
_mark_fresh("firms_fires")
# ---------------------------------------------------------------------------
# Space Weather
# ---------------------------------------------------------------------------
def fetch_space_weather():
"""Fetch NOAA SWPC Kp index and recent solar events."""
try:
kp_resp = fetch_with_curl("https://services.swpc.noaa.gov/json/planetary_k_index_1m.json", timeout=10)
kp_value = None
kp_text = "QUIET"
if kp_resp.status_code == 200:
kp_data = kp_resp.json()
if kp_data:
latest_kp = kp_data[-1]
kp_value = float(latest_kp.get("kp_index", 0))
if kp_value >= 7:
kp_text = f"STORM G{min(int(kp_value) - 4, 5)}"
elif kp_value >= 5:
kp_text = f"STORM G{min(int(kp_value) - 4, 5)}"
elif kp_value >= 4:
kp_text = "ACTIVE"
elif kp_value >= 3:
kp_text = "UNSETTLED"
events = []
ev_resp = fetch_with_curl("https://services.swpc.noaa.gov/json/edited_events.json", timeout=10)
if ev_resp.status_code == 200:
all_events = ev_resp.json()
for ev in all_events[-10:]:
events.append({
"type": ev.get("type", ""),
"begin": ev.get("begin", ""),
"end": ev.get("end", ""),
"classtype": ev.get("classtype", ""),
})
with _data_lock:
latest_data["space_weather"] = {
"kp_index": kp_value,
"kp_text": kp_text,
"events": events,
}
_mark_fresh("space_weather")
logger.info(f"Space weather: Kp={kp_value} ({kp_text}), {len(events)} events")
except Exception as e:
logger.error(f"Error fetching space weather: {e}")
# ---------------------------------------------------------------------------
# Internet Outages (IODA)
# ---------------------------------------------------------------------------
_region_geocode_cache: TTLCache = TTLCache(maxsize=2000, ttl=86400)
def _geocode_region(region_name: str, country_name: str) -> tuple:
"""Geocode a region using OpenStreetMap Nominatim (cached, respects rate limit)."""
cache_key = f"{region_name}|{country_name}"
if cache_key in _region_geocode_cache:
return _region_geocode_cache[cache_key]
try:
import urllib.parse
query = urllib.parse.quote(f"{region_name}, {country_name}")
url = f"https://nominatim.openstreetmap.org/search?q={query}&format=json&limit=1"
response = fetch_with_curl(url, timeout=8, headers={"User-Agent": "ShadowBroker-OSINT/1.0"})
if response.status_code == 200:
results = response.json()
if results:
lat = float(results[0]["lat"])
lon = float(results[0]["lon"])
_region_geocode_cache[cache_key] = (lat, lon)
return (lat, lon)
except Exception:
pass
_region_geocode_cache[cache_key] = None
return None
def fetch_internet_outages():
"""Fetch regional internet outage alerts from IODA (Georgia Tech)."""
RELIABLE_DATASOURCES = {"bgp", "ping-slash24"}
outages = []
try:
now = int(time.time())
start = now - 86400
url = f"https://api.ioda.inetintel.cc.gatech.edu/v2/outages/alerts?from={start}&until={now}&limit=500"
response = fetch_with_curl(url, timeout=15)
if response.status_code == 200:
data = response.json()
alerts = data.get("data", [])
region_outages = {}
for alert in alerts:
entity = alert.get("entity", {})
etype = entity.get("type", "")
level = alert.get("level", "")
if level == "normal" or etype != "region":
continue
datasource = alert.get("datasource", "")
if datasource not in RELIABLE_DATASOURCES:
continue
code = entity.get("code", "")
name = entity.get("name", "")
attrs = entity.get("attrs", {})
country_code = attrs.get("country_code", "")
country_name = attrs.get("country_name", "")
value = alert.get("value", 0)
history_value = alert.get("historyValue", 0)
severity = 0
if history_value and history_value > 0:
severity = round((1 - value / history_value) * 100)
severity = max(0, min(severity, 100))
if severity < 10:
continue
if code not in region_outages or severity > region_outages[code]["severity"]:
region_outages[code] = {
"region_code": code,
"region_name": name,
"country_code": country_code,
"country_name": country_name,
"level": level,
"datasource": datasource,
"severity": severity,
}
geocoded = []
for rcode, r in region_outages.items():
coords = _geocode_region(r["region_name"], r["country_name"])
if coords:
r["lat"] = coords[0]
r["lng"] = coords[1]
geocoded.append(r)
outages = heapq.nlargest(100, geocoded, key=lambda x: x["severity"])
logger.info(f"Internet outages: {len(outages)} regions affected")
except Exception as e:
logger.error(f"Error fetching internet outages: {e}")
with _data_lock:
latest_data["internet_outages"] = outages
if outages:
_mark_fresh("internet_outages")
# ---------------------------------------------------------------------------
# Data Centers
# ---------------------------------------------------------------------------
_DC_GEOCODED_PATH = Path(__file__).parent.parent / "data" / "datacenters_geocoded.json"
def fetch_datacenters():
"""Load geocoded data centers (5K+ street-level precise locations)."""
dcs = []
try:
if not _DC_GEOCODED_PATH.exists():
logger.warning(f"Geocoded DC file not found: {_DC_GEOCODED_PATH}")
return
raw = json.loads(_DC_GEOCODED_PATH.read_text(encoding="utf-8"))
for entry in raw:
lat = entry.get("lat")
lng = entry.get("lng")
if lat is None or lng is None:
continue
if not (-90 <= lat <= 90 and -180 <= lng <= 180):
continue
dcs.append({
"name": entry.get("name", "Unknown"),
"company": entry.get("company", ""),
"street": entry.get("street", ""),
"city": entry.get("city", ""),
"country": entry.get("country", ""),
"zip": entry.get("zip", ""),
"lat": lat, "lng": lng,
})
logger.info(f"Data centers: {len(dcs)} geocoded locations loaded")
except Exception as e:
logger.error(f"Error loading data centers: {e}")
with _data_lock:
latest_data["datacenters"] = dcs
if dcs:
_mark_fresh("datacenters")
# ---------------------------------------------------------------------------
# Earthquakes
# ---------------------------------------------------------------------------
def fetch_earthquakes():
quakes = []
try:
url = "https://earthquake.usgs.gov/earthquakes/feed/v1.0/summary/2.5_day.geojson"
response = fetch_with_curl(url, timeout=10)
if response.status_code == 200:
features = response.json().get("features", [])
for f in features[:50]:
mag = f["properties"]["mag"]
lng, lat, depth = f["geometry"]["coordinates"]
quakes.append({
"id": f["id"], "mag": mag,
"lat": lat, "lng": lng,
"place": f["properties"]["place"]
})
except Exception as e:
logger.error(f"Error fetching earthquakes: {e}")
with _data_lock:
latest_data["earthquakes"] = quakes
if quakes:
_mark_fresh("earthquakes")
# ---------------------------------------------------------------------------
# Ships (AIS + Carriers)
# ---------------------------------------------------------------------------
def fetch_ships():
"""Fetch real-time AIS vessel data and combine with OSINT carrier positions."""
from services.ais_stream import get_ais_vessels
from services.carrier_tracker import get_carrier_positions
ships = []
try:
carriers = get_carrier_positions()
ships.extend(carriers)
except Exception as e:
logger.error(f"Carrier tracker error (non-fatal): {e}")
carriers = []
try:
ais_vessels = get_ais_vessels()
ships.extend(ais_vessels)
except Exception as e:
logger.error(f"AIS stream error (non-fatal): {e}")
ais_vessels = []
logger.info(f"Ships: {len(carriers)} carriers + {len(ais_vessels)} AIS vessels")
with _data_lock:
latest_data['ships'] = ships
_mark_fresh("ships")
# ---------------------------------------------------------------------------
# Airports
# ---------------------------------------------------------------------------
cached_airports = []
def find_nearest_airport(lat, lng, max_distance_nm=200):
"""Find the nearest large airport to a given lat/lng using haversine distance."""
if not cached_airports:
return None
best = None
best_dist = float('inf')
lat_r = math.radians(lat)
lng_r = math.radians(lng)
for apt in cached_airports:
apt_lat_r = math.radians(apt['lat'])
apt_lng_r = math.radians(apt['lng'])
dlat = apt_lat_r - lat_r
dlng = apt_lng_r - lng_r
a = math.sin(dlat / 2) ** 2 + math.cos(lat_r) * math.cos(apt_lat_r) * math.sin(dlng / 2) ** 2
c = 2 * math.atan2(math.sqrt(a), math.sqrt(1 - a))
dist_nm = 3440.065 * c
if dist_nm < best_dist:
best_dist = dist_nm
best = apt
if best and best_dist <= max_distance_nm:
return {
"iata": best['iata'], "name": best['name'],
"lat": best['lat'], "lng": best['lng'],
"distance_nm": round(best_dist, 1)
}
return None
def fetch_airports():
global cached_airports
if not cached_airports:
logger.info("Downloading global airports database from ourairports.com...")
try:
url = "https://ourairports.com/data/airports.csv"
response = fetch_with_curl(url, timeout=15)
if response.status_code == 200:
f = io.StringIO(response.text)
reader = csv.DictReader(f)
for row in reader:
if row['type'] == 'large_airport' and row['iata_code']:
cached_airports.append({
"id": row['ident'],
"name": row['name'],
"iata": row['iata_code'],
"lat": float(row['latitude_deg']),
"lng": float(row['longitude_deg']),
"type": "airport"
})
logger.info(f"Loaded {len(cached_airports)} large airports into cache.")
except Exception as e:
logger.error(f"Error fetching airports: {e}")
with _data_lock:
latest_data['airports'] = cached_airports
# ---------------------------------------------------------------------------
# Geopolitics & Liveuamap
# ---------------------------------------------------------------------------
from services.geopolitics import fetch_ukraine_frontlines, fetch_global_military_incidents
def fetch_frontlines():
"""Fetch Ukraine frontline data (fast — single GitHub API call)."""
try:
frontlines = fetch_ukraine_frontlines()
if frontlines:
with _data_lock:
latest_data['frontlines'] = frontlines
_mark_fresh("frontlines")
except Exception as e:
logger.error(f"Error fetching frontlines: {e}")
def fetch_gdelt():
"""Fetch GDELT global military incidents (slow — downloads 32 ZIP files)."""
try:
gdelt = fetch_global_military_incidents()
if gdelt is not None:
with _data_lock:
latest_data['gdelt'] = gdelt
_mark_fresh("gdelt")
except Exception as e:
logger.error(f"Error fetching GDELT: {e}")
def fetch_geopolitics():
"""Legacy wrapper — runs both sequentially. Used by recurring scheduler."""
fetch_frontlines()
fetch_gdelt()
def update_liveuamap():
logger.info("Running scheduled Liveuamap scraper...")
try:
from services.liveuamap_scraper import fetch_liveuamap
res = fetch_liveuamap()
if res:
with _data_lock:
latest_data['liveuamap'] = res
_mark_fresh("liveuamap")
except Exception as e:
logger.error(f"Liveuamap scraper error: {e}")
# ---------------------------------------------------------------------------
# Scheduler & Orchestration
# ---------------------------------------------------------------------------
@@ -525,23 +69,21 @@ def update_fast_data():
logger.info("Fast-tier update complete.")
def update_slow_data():
"""Slow-tier: feeds that change infrequently (every 30min).
Each fetcher writes to latest_data independently as it finishes,
so the frontend sees results progressively — no all-or-nothing barrier."""
"""Slow-tier: contextual + enrichment data that refreshes less often (every 510 min)."""
logger.info("Slow-tier data update starting...")
slow_funcs = [
fetch_news,
fetch_earthquakes,
fetch_firms_fires,
fetch_defense_stocks,
fetch_oil_prices,
fetch_weather,
fetch_cctv,
fetch_earthquakes,
fetch_frontlines, # fast — single GitHub API call
fetch_gdelt, # slow — 32 ZIP downloads (runs in parallel, won't block frontlines)
fetch_kiwisdr,
fetch_space_weather,
fetch_internet_outages,
fetch_firms_fires,
fetch_cctv,
fetch_kiwisdr,
fetch_frontlines,
fetch_gdelt,
fetch_datacenters,
]
with concurrent.futures.ThreadPoolExecutor(max_workers=len(slow_funcs)) as executor:
@@ -550,7 +92,7 @@ def update_slow_data():
logger.info("Slow-tier update complete.")
def update_all_data():
"""Full update — runs on startup. All tiers run IN PARALLEL for fastest startup."""
"""Full refresh — all tiers run IN PARALLEL for fastest startup."""
logger.info("Full data update starting (parallel)...")
with concurrent.futures.ThreadPoolExecutor(max_workers=3) as pool:
f0 = pool.submit(fetch_airports)
@@ -559,39 +101,44 @@ def update_all_data():
concurrent.futures.wait([f0, f1, f2])
logger.info("Full data update complete.")
scheduler = BackgroundScheduler()
_scheduler = None
def start_scheduler():
global _scheduler
init_db()
_scheduler = BackgroundScheduler(daemon=True)
# NOTE: initial update_all_data() is called synchronously in main.py lifespan
# before start_scheduler(). These are only the RECURRING interval jobs.
scheduler.add_job(update_fast_data, 'interval', seconds=60)
scheduler.add_job(update_slow_data, 'interval', minutes=30)
# Fast tier — every 60 seconds
_scheduler.add_job(update_fast_data, 'interval', seconds=60, id='fast_tier', max_instances=1, misfire_grace_time=30)
def update_cctvs():
logger.info("Running CCTV Pipeline Ingestion...")
ingestors = [
TFLJamCamIngestor,
LTASingaporeIngestor,
AustinTXIngestor,
NYCDOTIngestor
]
for ingestor in ingestors:
try:
ingestor().ingest()
except Exception as e:
logger.error(f"Failed {ingestor.__name__} cctv ingest: {e}")
fetch_cctv()
# Slow tier — every 5 minutes
_scheduler.add_job(update_slow_data, 'interval', minutes=5, id='slow_tier', max_instances=1, misfire_grace_time=120)
scheduler.add_job(update_cctvs, 'interval', minutes=1)
# Very slow — every 15 minutes
_scheduler.add_job(fetch_gdelt, 'interval', minutes=15, id='gdelt', max_instances=1, misfire_grace_time=120)
_scheduler.add_job(update_liveuamap, 'interval', minutes=15, id='liveuamap', max_instances=1, misfire_grace_time=120)
scheduler.add_job(update_liveuamap, 'interval', hours=12)
# CCTV pipeline refresh — every 10 minutes
# Instantiate once and reuse — avoids re-creating DB connections on every tick
from services.cctv_pipeline import (
TFLJamCamIngestor, LTASingaporeIngestor,
AustinTXIngestor, NYCDOTIngestor,
)
_cctv_tfl = TFLJamCamIngestor()
_cctv_lta = LTASingaporeIngestor()
_cctv_atx = AustinTXIngestor()
_cctv_nyc = NYCDOTIngestor()
_scheduler.add_job(_cctv_tfl.ingest, 'interval', minutes=10, id='cctv_tfl', max_instances=1, misfire_grace_time=120)
_scheduler.add_job(_cctv_lta.ingest, 'interval', minutes=10, id='cctv_lta', max_instances=1, misfire_grace_time=120)
_scheduler.add_job(_cctv_atx.ingest, 'interval', minutes=10, id='cctv_atx', max_instances=1, misfire_grace_time=120)
_scheduler.add_job(_cctv_nyc.ingest, 'interval', minutes=10, id='cctv_nyc', max_instances=1, misfire_grace_time=120)
scheduler.start()
_scheduler.start()
logger.info("Scheduler started.")
def stop_scheduler():
scheduler.shutdown()
if _scheduler:
_scheduler.shutdown(wait=False)
def get_latest_data():
with _data_lock:
+77
View File
@@ -0,0 +1,77 @@
"""Startup environment validation — called once in the FastAPI lifespan hook.
Ensures required env vars are present before the scheduler starts.
Logs warnings for optional keys that degrade functionality when missing.
"""
import os
import sys
import logging
logger = logging.getLogger(__name__)
# Keys grouped by criticality
_REQUIRED = {
# Empty for now — add keys here only if the app literally cannot function without them
}
_CRITICAL_WARN = {
"ADMIN_KEY": "Authentication for /api/settings and /api/system/update — endpoints are UNPROTECTED without it!",
}
_OPTIONAL = {
"AIS_API_KEY": "AIS vessel streaming (ships layer will be empty without it)",
"OPENSKY_CLIENT_ID": "OpenSky OAuth2 — gap-fill flights in Africa/Asia/LatAm",
"OPENSKY_CLIENT_SECRET": "OpenSky OAuth2 — gap-fill flights in Africa/Asia/LatAm",
"LTA_ACCOUNT_KEY": "Singapore LTA traffic cameras (CCTV layer)",
}
def validate_env(*, strict: bool = True) -> bool:
"""Validate environment variables at startup.
Args:
strict: If True, exit the process on missing required keys.
If False, only log errors (useful for tests).
Returns:
True if all required keys are present, False otherwise.
"""
all_ok = True
# Required keys — must be set
for key, desc in _REQUIRED.items():
value = os.environ.get(key, "").strip()
if not value:
logger.error(
"❌ REQUIRED env var %s is not set. %s\n"
" Set it in .env or via Docker secrets (%s_FILE).",
key, desc, key,
)
all_ok = False
if not all_ok and strict:
logger.critical("Startup aborted — required environment variables are missing.")
sys.exit(1)
# Critical-warn keys — app works but security/functionality is degraded
for key, desc in _CRITICAL_WARN.items():
value = os.environ.get(key, "").strip()
if not value:
logger.critical(
"🔓 CRITICAL: env var %s is not set — %s\n"
" This is safe for local dev but MUST be set in production.",
key, desc,
)
# Optional keys — warn if missing
for key, desc in _OPTIONAL.items():
value = os.environ.get(key, "").strip()
if not value:
logger.warning(
"⚠️ Optional env var %s is not set — %s", key, desc
)
if all_ok:
logger.info("✅ Environment validation passed.")
return all_ok
@@ -0,0 +1,144 @@
"""Earth-observation fetchers — earthquakes, FIRMS fires, space weather, weather radar."""
import csv
import io
import logging
import heapq
from services.network_utils import fetch_with_curl
from services.fetchers._store import latest_data, _data_lock, _mark_fresh
from services.fetchers.retry import with_retry
logger = logging.getLogger(__name__)
# ---------------------------------------------------------------------------
# Earthquakes (USGS)
# ---------------------------------------------------------------------------
@with_retry(max_retries=1, base_delay=1)
def fetch_earthquakes():
quakes = []
try:
url = "https://earthquake.usgs.gov/earthquakes/feed/v1.0/summary/2.5_day.geojson"
response = fetch_with_curl(url, timeout=10)
if response.status_code == 200:
features = response.json().get("features", [])
for f in features[:50]:
mag = f["properties"]["mag"]
lng, lat, depth = f["geometry"]["coordinates"]
quakes.append({
"id": f["id"], "mag": mag,
"lat": lat, "lng": lng,
"place": f["properties"]["place"]
})
except Exception as e:
logger.error(f"Error fetching earthquakes: {e}")
with _data_lock:
latest_data["earthquakes"] = quakes
if quakes:
_mark_fresh("earthquakes")
# ---------------------------------------------------------------------------
# NASA FIRMS Fires
# ---------------------------------------------------------------------------
@with_retry(max_retries=1, base_delay=2)
def fetch_firms_fires():
"""Fetch global fire/thermal anomalies from NASA FIRMS (NOAA-20 VIIRS, 24h, no key needed)."""
fires = []
try:
url = "https://firms.modaps.eosdis.nasa.gov/data/active_fire/noaa-20-viirs-c2/csv/J1_VIIRS_C2_Global_24h.csv"
response = fetch_with_curl(url, timeout=30)
if response.status_code == 200:
reader = csv.DictReader(io.StringIO(response.text))
all_rows = []
for row in reader:
try:
lat = float(row.get("latitude", 0))
lng = float(row.get("longitude", 0))
frp = float(row.get("frp", 0))
conf = row.get("confidence", "nominal")
daynight = row.get("daynight", "")
bright = float(row.get("bright_ti4", 0))
all_rows.append({
"lat": lat, "lng": lng, "frp": frp,
"brightness": bright, "confidence": conf,
"daynight": daynight,
"acq_date": row.get("acq_date", ""),
"acq_time": row.get("acq_time", ""),
})
except (ValueError, TypeError):
continue
fires = heapq.nlargest(5000, all_rows, key=lambda x: x["frp"])
logger.info(f"FIRMS fires: {len(fires)} hotspots (from {response.status_code})")
except Exception as e:
logger.error(f"Error fetching FIRMS fires: {e}")
with _data_lock:
latest_data["firms_fires"] = fires
if fires:
_mark_fresh("firms_fires")
# ---------------------------------------------------------------------------
# Space Weather (NOAA SWPC)
# ---------------------------------------------------------------------------
@with_retry(max_retries=1, base_delay=1)
def fetch_space_weather():
"""Fetch NOAA SWPC Kp index and recent solar events."""
try:
kp_resp = fetch_with_curl("https://services.swpc.noaa.gov/json/planetary_k_index_1m.json", timeout=10)
kp_value = None
kp_text = "QUIET"
if kp_resp.status_code == 200:
kp_data = kp_resp.json()
if kp_data:
latest_kp = kp_data[-1]
kp_value = float(latest_kp.get("kp_index", 0))
if kp_value >= 7:
kp_text = f"STORM G{min(int(kp_value) - 4, 5)}"
elif kp_value >= 5:
kp_text = f"STORM G{min(int(kp_value) - 4, 5)}"
elif kp_value >= 4:
kp_text = "ACTIVE"
elif kp_value >= 3:
kp_text = "UNSETTLED"
events = []
ev_resp = fetch_with_curl("https://services.swpc.noaa.gov/json/edited_events.json", timeout=10)
if ev_resp.status_code == 200:
all_events = ev_resp.json()
for ev in all_events[-10:]:
events.append({
"type": ev.get("type", ""),
"begin": ev.get("begin", ""),
"end": ev.get("end", ""),
"classtype": ev.get("classtype", ""),
})
with _data_lock:
latest_data["space_weather"] = {
"kp_index": kp_value,
"kp_text": kp_text,
"events": events,
}
_mark_fresh("space_weather")
logger.info(f"Space weather: Kp={kp_value} ({kp_text}), {len(events)} events")
except Exception as e:
logger.error(f"Error fetching space weather: {e}")
# ---------------------------------------------------------------------------
# Weather Radar (RainViewer)
# ---------------------------------------------------------------------------
@with_retry(max_retries=1, base_delay=1)
def fetch_weather():
try:
url = "https://api.rainviewer.com/public/weather-maps.json"
response = fetch_with_curl(url, timeout=10)
if response.status_code == 200:
data = response.json()
if "radar" in data and "past" in data["radar"]:
latest_time = data["radar"]["past"][-1]["time"]
with _data_lock:
latest_data["weather"] = {"time": latest_time, "host": data.get("host", "https://tilecache.rainviewer.com")}
_mark_fresh("weather")
except Exception as e:
logger.error(f"Error fetching weather: {e}")
+58
View File
@@ -0,0 +1,58 @@
"""Financial data fetchers — defense stocks and oil prices.
Uses yfinance for ticker data with concurrent execution for performance.
"""
import logging
import concurrent.futures
import yfinance as yf
from services.fetchers._store import latest_data, _data_lock, _mark_fresh
from services.fetchers.retry import with_retry
logger = logging.getLogger(__name__)
def _fetch_single_ticker(symbol: str, period: str = "2d"):
"""Fetch a single yfinance ticker. Returns (symbol, data_dict) or (symbol, None)."""
try:
ticker = yf.Ticker(symbol)
hist = ticker.history(period=period)
if len(hist) >= 1:
current_price = hist['Close'].iloc[-1]
prev_close = hist['Close'].iloc[0] if len(hist) > 1 else current_price
change_percent = ((current_price - prev_close) / prev_close) * 100 if prev_close else 0
return symbol, {
"price": round(float(current_price), 2),
"change_percent": round(float(change_percent), 2),
"up": bool(change_percent >= 0)
}
except Exception as e:
logger.warning(f"Could not fetch data for {symbol}: {e}")
return symbol, None
@with_retry(max_retries=1, base_delay=1)
def fetch_defense_stocks():
tickers = ["RTX", "LMT", "NOC", "GD", "BA", "PLTR"]
try:
with concurrent.futures.ThreadPoolExecutor(max_workers=4) as pool:
results = pool.map(lambda t: _fetch_single_ticker(t, "2d"), tickers)
stocks_data = {sym: data for sym, data in results if data}
with _data_lock:
latest_data['stocks'] = stocks_data
_mark_fresh("stocks")
except Exception as e:
logger.error(f"Error fetching stocks: {e}")
@with_retry(max_retries=1, base_delay=1)
def fetch_oil_prices():
tickers = {"WTI Crude": "CL=F", "Brent Crude": "BZ=F"}
try:
with concurrent.futures.ThreadPoolExecutor(max_workers=2) as pool:
results = pool.map(lambda item: (_fetch_single_ticker(item[1], "5d")[1], item[0]), tickers.items())
oil_data = {name: data for data, name in results if data}
with _data_lock:
latest_data['oil'] = oil_data
_mark_fresh("oil")
except Exception as e:
logger.error(f"Error fetching oil: {e}")
+15 -12
View File
@@ -4,6 +4,7 @@ import re
import os
import time
import math
import json
import logging
import threading
import concurrent.futures
@@ -13,6 +14,7 @@ from cachetools import TTLCache
from services.network_utils import fetch_with_curl
from services.fetchers._store import latest_data, _data_lock, _mark_fresh
from services.fetchers.plane_alert import enrich_with_plane_alert, enrich_with_tracked_names
from services.fetchers.retry import with_retry
logger = logging.getLogger("services.data_fetcher")
@@ -139,7 +141,7 @@ def _fetch_supplemental_sources(seen_hex: set) -> list:
if res.status_code == 200:
data = res.json()
return data.get("ac", [])
except Exception as e:
except (requests.RequestException, ConnectionError, TimeoutError, ValueError, KeyError, json.JSONDecodeError, OSError) as e:
logger.debug(f"airplanes.live {region['name']} failed: {e}")
return []
@@ -153,7 +155,7 @@ def _fetch_supplemental_sources(seen_hex: set) -> list:
f["supplemental_source"] = "airplanes.live"
new_supplemental.append(f)
supplemental_hex.add(h)
except Exception as e:
except (requests.RequestException, ConnectionError, TimeoutError, ValueError, KeyError, OSError) as e:
logger.warning(f"airplanes.live supplemental fetch failed: {e}")
ap_count = len(new_supplemental)
@@ -172,10 +174,10 @@ def _fetch_supplemental_sources(seen_hex: set) -> list:
f["supplemental_source"] = "adsb.fi"
new_supplemental.append(f)
supplemental_hex.add(h)
except Exception as e:
except (requests.RequestException, ConnectionError, TimeoutError, ValueError, KeyError, json.JSONDecodeError, OSError) as e:
logger.debug(f"adsb.fi {region['name']} failed: {e}")
time.sleep(1.1)
except Exception as e:
except (requests.RequestException, ConnectionError, TimeoutError, ValueError, KeyError, OSError) as e:
logger.warning(f"adsb.fi supplemental fetch failed: {e}")
fi_count = len(new_supplemental) - ap_count
@@ -236,8 +238,8 @@ def fetch_routes_background(sampled):
"dest_loc": [dest_apt.get("lon", 0), dest_apt.get("lat", 0)],
}
time.sleep(0.25)
except Exception:
logger.debug("Route batch request failed")
except (requests.RequestException, ConnectionError, TimeoutError, ValueError, KeyError, json.JSONDecodeError, OSError) as e:
logger.debug(f"Route batch request failed: {e}")
finally:
with _routes_lock:
routes_fetch_in_progress = False
@@ -327,7 +329,7 @@ def _classify_and_publish(all_adsb_flights):
"aircraft_category": ac_category,
"nac_p": f.get("nac_p")
})
except Exception as loop_e:
except (ValueError, TypeError, KeyError, AttributeError) as loop_e:
logger.error(f"Flight interpolation error: {loop_e}")
continue
@@ -530,7 +532,7 @@ def _classify_and_publish(all_adsb_flights):
latest_data['gps_jamming'] = jamming_zones
if jamming_zones:
logger.info(f"GPS Jamming: {len(jamming_zones)} interference zones detected")
except Exception as e:
except (ValueError, TypeError, KeyError, ZeroDivisionError) as e:
logger.error(f"GPS Jamming detection error: {e}")
with _data_lock:
latest_data['gps_jamming'] = []
@@ -571,7 +573,7 @@ def _classify_and_publish(all_adsb_flights):
holding_count += 1
if holding_count:
logger.info(f"Holding patterns: {holding_count} aircraft circling")
except Exception as e:
except (ValueError, TypeError, KeyError, ZeroDivisionError) as e:
logger.error(f"Holding pattern detection error: {e}")
with _data_lock:
@@ -596,7 +598,7 @@ def _fetch_adsb_lol_regions():
if res.status_code == 200:
data = res.json()
return data.get("ac", [])
except Exception as e:
except (requests.RequestException, ConnectionError, TimeoutError, ValueError, KeyError, json.JSONDecodeError, OSError) as e:
logger.warning(f"Region fetch failed for lat={r['lat']}: {e}")
return []
@@ -663,7 +665,7 @@ def _enrich_with_opensky_and_supplemental(adsb_flights):
})
else:
logger.warning(f"OpenSky API {os_reg['name']} failed: {os_res.status_code}")
except Exception as ex:
except (requests.RequestException, ConnectionError, TimeoutError, ValueError, KeyError, json.JSONDecodeError, OSError) as ex:
logger.error(f"OpenSky fetching error for {os_reg['name']}: {ex}")
cached_opensky_flights = new_opensky_flights
@@ -686,7 +688,7 @@ def _enrich_with_opensky_and_supplemental(adsb_flights):
seen_hex.add(h)
if gap_fill:
logger.info(f"Gap-fill: added {len(gap_fill)} aircraft to pipeline")
except Exception as e:
except (requests.RequestException, ConnectionError, TimeoutError, ValueError, KeyError, OSError) as e:
logger.warning(f"Supplemental source fetch failed (non-fatal): {e}")
# Re-publish with enriched data
@@ -697,6 +699,7 @@ def _enrich_with_opensky_and_supplemental(adsb_flights):
logger.error(f"OpenSky/supplemental enrichment error: {e}")
@with_retry(max_retries=1, base_delay=1)
def fetch_flights():
"""Two-phase flight fetching:
Phase 1 (fast): Fetch adsb.lol → classify → publish immediately (~3-5s)
+161
View File
@@ -0,0 +1,161 @@
"""Ship and geopolitics fetchers — AIS vessels, carriers, frontlines, GDELT, LiveUAmap."""
import csv
import io
import math
import logging
from services.network_utils import fetch_with_curl
from services.fetchers._store import latest_data, _data_lock, _mark_fresh
from services.fetchers.retry import with_retry
logger = logging.getLogger(__name__)
# ---------------------------------------------------------------------------
# Ships (AIS + Carriers)
# ---------------------------------------------------------------------------
@with_retry(max_retries=1, base_delay=1)
def fetch_ships():
"""Fetch real-time AIS vessel data and combine with OSINT carrier positions."""
from services.ais_stream import get_ais_vessels
from services.carrier_tracker import get_carrier_positions
ships = []
try:
carriers = get_carrier_positions()
ships.extend(carriers)
except Exception as e:
logger.error(f"Carrier tracker error (non-fatal): {e}")
carriers = []
try:
ais_vessels = get_ais_vessels()
ships.extend(ais_vessels)
except Exception as e:
logger.error(f"AIS stream error (non-fatal): {e}")
ais_vessels = []
# Enrich ships with yacht alert data (tracked superyachts)
from services.fetchers.yacht_alert import enrich_with_yacht_alert
for ship in ships:
enrich_with_yacht_alert(ship)
logger.info(f"Ships: {len(carriers)} carriers + {len(ais_vessels)} AIS vessels")
with _data_lock:
latest_data['ships'] = ships
_mark_fresh("ships")
# ---------------------------------------------------------------------------
# Airports (ourairports.com)
# ---------------------------------------------------------------------------
cached_airports = []
def find_nearest_airport(lat, lng, max_distance_nm=200):
"""Find the nearest large airport to a given lat/lng using haversine distance."""
if not cached_airports:
return None
best = None
best_dist = float('inf')
lat_r = math.radians(lat)
lng_r = math.radians(lng)
for apt in cached_airports:
apt_lat_r = math.radians(apt['lat'])
apt_lng_r = math.radians(apt['lng'])
dlat = apt_lat_r - lat_r
dlng = apt_lng_r - lng_r
a = math.sin(dlat / 2) ** 2 + math.cos(lat_r) * math.cos(apt_lat_r) * math.sin(dlng / 2) ** 2
c = 2 * math.atan2(math.sqrt(a), math.sqrt(1 - a))
dist_nm = 3440.065 * c
if dist_nm < best_dist:
best_dist = dist_nm
best = apt
if best and best_dist <= max_distance_nm:
return {
"iata": best['iata'], "name": best['name'],
"lat": best['lat'], "lng": best['lng'],
"distance_nm": round(best_dist, 1)
}
return None
def fetch_airports():
global cached_airports
if not cached_airports:
logger.info("Downloading global airports database from ourairports.com...")
try:
url = "https://ourairports.com/data/airports.csv"
response = fetch_with_curl(url, timeout=15)
if response.status_code == 200:
f = io.StringIO(response.text)
reader = csv.DictReader(f)
for row in reader:
if row['type'] == 'large_airport' and row['iata_code']:
cached_airports.append({
"id": row['ident'],
"name": row['name'],
"iata": row['iata_code'],
"lat": float(row['latitude_deg']),
"lng": float(row['longitude_deg']),
"type": "airport"
})
logger.info(f"Loaded {len(cached_airports)} large airports into cache.")
except Exception as e:
logger.error(f"Error fetching airports: {e}")
with _data_lock:
latest_data['airports'] = cached_airports
# ---------------------------------------------------------------------------
# Geopolitics & LiveUAMap
# ---------------------------------------------------------------------------
@with_retry(max_retries=1, base_delay=2)
def fetch_frontlines():
"""Fetch Ukraine frontline data (fast — single GitHub API call)."""
try:
from services.geopolitics import fetch_ukraine_frontlines
frontlines = fetch_ukraine_frontlines()
if frontlines:
with _data_lock:
latest_data['frontlines'] = frontlines
_mark_fresh("frontlines")
except Exception as e:
logger.error(f"Error fetching frontlines: {e}")
@with_retry(max_retries=1, base_delay=3)
def fetch_gdelt():
"""Fetch GDELT global military incidents (slow — downloads 32 ZIP files)."""
try:
from services.geopolitics import fetch_global_military_incidents
gdelt = fetch_global_military_incidents()
if gdelt is not None:
with _data_lock:
latest_data['gdelt'] = gdelt
_mark_fresh("gdelt")
except Exception as e:
logger.error(f"Error fetching GDELT: {e}")
def fetch_geopolitics():
"""Legacy wrapper — runs both sequentially. Used by recurring scheduler."""
fetch_frontlines()
fetch_gdelt()
def update_liveuamap():
logger.info("Running scheduled Liveuamap scraper...")
try:
from services.liveuamap_scraper import fetch_liveuamap
res = fetch_liveuamap()
if res:
with _data_lock:
latest_data['liveuamap'] = res
_mark_fresh("liveuamap")
except Exception as e:
logger.error(f"Liveuamap scraper error: {e}")
+176
View File
@@ -0,0 +1,176 @@
"""Infrastructure fetchers — internet outages (IODA), data centers, CCTV, KiwiSDR."""
import json
import time
import heapq
import logging
from pathlib import Path
from cachetools import TTLCache
from services.network_utils import fetch_with_curl
from services.fetchers._store import latest_data, _data_lock, _mark_fresh
from services.fetchers.retry import with_retry
logger = logging.getLogger(__name__)
# ---------------------------------------------------------------------------
# Internet Outages (IODA — Georgia Tech)
# ---------------------------------------------------------------------------
_region_geocode_cache: TTLCache = TTLCache(maxsize=2000, ttl=86400)
def _geocode_region(region_name: str, country_name: str) -> tuple:
"""Geocode a region using OpenStreetMap Nominatim (cached, respects rate limit)."""
cache_key = f"{region_name}|{country_name}"
if cache_key in _region_geocode_cache:
return _region_geocode_cache[cache_key]
try:
import urllib.parse
query = urllib.parse.quote(f"{region_name}, {country_name}")
url = f"https://nominatim.openstreetmap.org/search?q={query}&format=json&limit=1"
response = fetch_with_curl(url, timeout=8, headers={"User-Agent": "ShadowBroker-OSINT/1.0"})
if response.status_code == 200:
results = response.json()
if results:
lat = float(results[0]["lat"])
lon = float(results[0]["lon"])
_region_geocode_cache[cache_key] = (lat, lon)
return (lat, lon)
except Exception:
pass
_region_geocode_cache[cache_key] = None
return None
@with_retry(max_retries=1, base_delay=1)
def fetch_internet_outages():
"""Fetch regional internet outage alerts from IODA (Georgia Tech)."""
RELIABLE_DATASOURCES = {"bgp", "ping-slash24"}
outages = []
try:
now = int(time.time())
start = now - 86400
url = f"https://api.ioda.inetintel.cc.gatech.edu/v2/outages/alerts?from={start}&until={now}&limit=500"
response = fetch_with_curl(url, timeout=15)
if response.status_code == 200:
data = response.json()
alerts = data.get("data", [])
region_outages = {}
for alert in alerts:
entity = alert.get("entity", {})
etype = entity.get("type", "")
level = alert.get("level", "")
if level == "normal" or etype != "region":
continue
datasource = alert.get("datasource", "")
if datasource not in RELIABLE_DATASOURCES:
continue
code = entity.get("code", "")
name = entity.get("name", "")
attrs = entity.get("attrs", {})
country_code = attrs.get("country_code", "")
country_name = attrs.get("country_name", "")
value = alert.get("value", 0)
history_value = alert.get("historyValue", 0)
severity = 0
if history_value and history_value > 0:
severity = round((1 - value / history_value) * 100)
severity = max(0, min(severity, 100))
if severity < 10:
continue
if code not in region_outages or severity > region_outages[code]["severity"]:
region_outages[code] = {
"region_code": code,
"region_name": name,
"country_code": country_code,
"country_name": country_name,
"level": level,
"datasource": datasource,
"severity": severity,
}
geocoded = []
for rcode, r in region_outages.items():
coords = _geocode_region(r["region_name"], r["country_name"])
if coords:
r["lat"] = coords[0]
r["lng"] = coords[1]
geocoded.append(r)
outages = heapq.nlargest(100, geocoded, key=lambda x: x["severity"])
logger.info(f"Internet outages: {len(outages)} regions affected")
except Exception as e:
logger.error(f"Error fetching internet outages: {e}")
with _data_lock:
latest_data["internet_outages"] = outages
if outages:
_mark_fresh("internet_outages")
# ---------------------------------------------------------------------------
# Data Centers (local geocoded JSON)
# ---------------------------------------------------------------------------
_DC_GEOCODED_PATH = Path(__file__).parent.parent.parent / "data" / "datacenters_geocoded.json"
def fetch_datacenters():
"""Load geocoded data centers (5K+ street-level precise locations)."""
dcs = []
try:
if not _DC_GEOCODED_PATH.exists():
logger.warning(f"Geocoded DC file not found: {_DC_GEOCODED_PATH}")
return
raw = json.loads(_DC_GEOCODED_PATH.read_text(encoding="utf-8"))
for entry in raw:
lat = entry.get("lat")
lng = entry.get("lng")
if lat is None or lng is None:
continue
if not (-90 <= lat <= 90 and -180 <= lng <= 180):
continue
dcs.append({
"name": entry.get("name", "Unknown"),
"company": entry.get("company", ""),
"street": entry.get("street", ""),
"city": entry.get("city", ""),
"country": entry.get("country", ""),
"zip": entry.get("zip", ""),
"lat": lat, "lng": lng,
})
logger.info(f"Data centers: {len(dcs)} geocoded locations loaded")
except Exception as e:
logger.error(f"Error loading data centers: {e}")
with _data_lock:
latest_data["datacenters"] = dcs
if dcs:
_mark_fresh("datacenters")
# ---------------------------------------------------------------------------
# CCTV Cameras
# ---------------------------------------------------------------------------
def fetch_cctv():
try:
from services.cctv_pipeline import get_all_cameras
cameras = get_all_cameras()
with _data_lock:
latest_data["cctv"] = cameras
_mark_fresh("cctv")
except Exception as e:
logger.error(f"Error fetching cctv from DB: {e}")
with _data_lock:
latest_data["cctv"] = []
# ---------------------------------------------------------------------------
# KiwiSDR Receivers
# ---------------------------------------------------------------------------
@with_retry(max_retries=2, base_delay=2)
def fetch_kiwisdr():
try:
from services.kiwisdr_fetcher import fetch_kiwisdr_nodes
nodes = fetch_kiwisdr_nodes()
with _data_lock:
latest_data["kiwisdr"] = nodes
_mark_fresh("kiwisdr")
except Exception as e:
logger.error(f"Error fetching KiwiSDR nodes: {e}")
with _data_lock:
latest_data["kiwisdr"] = []
+2
View File
@@ -1,5 +1,7 @@
"""Military flight tracking and UAV detection from ADS-B data."""
import json
import logging
import requests
from services.network_utils import fetch_with_curl
from services.fetchers._store import latest_data, _data_lock, _mark_fresh
from services.fetchers.plane_alert import enrich_with_plane_alert
+4 -1
View File
@@ -2,9 +2,11 @@
import re
import logging
import concurrent.futures
import requests
import feedparser
from services.network_utils import fetch_with_curl
from services.fetchers._store import latest_data, _data_lock, _mark_fresh
from services.fetchers.retry import with_retry
logger = logging.getLogger("services.data_fetcher")
@@ -89,6 +91,7 @@ _KEYWORD_COORDS = {
}
@with_retry(max_retries=1, base_delay=2)
def fetch_news():
from services.news_feed_config import get_feeds
feed_config = get_feeds()
@@ -103,7 +106,7 @@ def fetch_news():
try:
xml_data = fetch_with_curl(url, timeout=10).text
return source_name, feedparser.parse(xml_data)
except Exception as e:
except (requests.RequestException, ConnectionError, TimeoutError, ValueError, KeyError, OSError) as e:
logger.warning(f"Feed {source_name} failed: {e}")
return source_name, None
+49
View File
@@ -0,0 +1,49 @@
"""Retry decorator with exponential backoff + jitter for network-bound fetcher functions.
Usage:
@with_retry(max_retries=3, base_delay=2)
def fetch_something():
...
"""
import time
import random
import logging
import functools
logger = logging.getLogger(__name__)
def with_retry(max_retries: int = 3, base_delay: float = 2.0, max_delay: float = 30.0):
"""Decorator: retries the wrapped function on any exception with exponential backoff + jitter.
Args:
max_retries: Number of retry attempts after the initial failure.
base_delay: Base delay (seconds) for exponential backoff (2 → 4 → 8 …).
max_delay: Cap on the delay between retries.
"""
def decorator(func):
@functools.wraps(func)
def wrapper(*args, **kwargs):
last_exc = None
for attempt in range(1 + max_retries):
try:
return func(*args, **kwargs)
except Exception as exc:
last_exc = exc
if attempt < max_retries:
delay = min(base_delay * (2 ** attempt), max_delay)
jitter = random.uniform(0, delay * 0.25)
total = delay + jitter
logger.warning(
"%s failed (attempt %d/%d): %s — retrying in %.1fs",
func.__name__, attempt + 1, max_retries + 1, exc, total,
)
time.sleep(total)
else:
logger.error(
"%s failed after %d attempts: %s",
func.__name__, max_retries + 1, exc,
)
raise last_exc # type: ignore[misc]
return wrapper
return decorator
+11 -10
View File
@@ -11,6 +11,7 @@ import time
import json
import re
import logging
import requests
from pathlib import Path
from datetime import datetime, timedelta
from sgp4.api import Satrec, WGS72, jday
@@ -53,7 +54,7 @@ def _load_sat_cache():
return data
else:
logger.info(f"Satellites: Disk cache is {age_hours:.0f}h old, will try fresh fetch")
except Exception as e:
except (IOError, OSError, json.JSONDecodeError, ValueError, KeyError) as e:
logger.warning(f"Satellites: Failed to load disk cache: {e}")
return None
@@ -65,7 +66,7 @@ def _save_sat_cache(data):
json.dump(data, f)
_save_cache_meta()
logger.info(f"Satellites: Saved {len(data)} records to disk cache")
except Exception as e:
except (IOError, OSError) as e:
logger.warning(f"Satellites: Failed to save disk cache: {e}")
def _load_cache_meta():
@@ -75,7 +76,7 @@ def _load_cache_meta():
with open(_SAT_CACHE_META_PATH, "r") as f:
meta = json.load(f)
_sat_gp_cache["last_modified"] = meta.get("last_modified")
except Exception:
except (IOError, OSError, json.JSONDecodeError, ValueError, KeyError):
pass
def _save_cache_meta():
@@ -83,7 +84,7 @@ def _save_cache_meta():
try:
with open(_SAT_CACHE_META_PATH, "w") as f:
json.dump({"last_modified": _sat_gp_cache.get("last_modified")}, f)
except Exception:
except (IOError, OSError):
pass
@@ -163,7 +164,7 @@ def _parse_tle_to_gp(name, norad_id, line1, line2):
"BSTAR": bstar,
"EPOCH": epoch_dt.strftime("%Y-%m-%dT%H:%M:%S"),
}
except Exception:
except (ValueError, TypeError, IndexError, KeyError):
return None
@@ -196,7 +197,7 @@ def _fetch_satellites_from_tle_api():
seen_ids.add(sat_id)
all_results.append(gp)
time.sleep(1) # Polite delay between requests
except Exception as e:
except (requests.RequestException, ConnectionError, TimeoutError, ValueError, KeyError, json.JSONDecodeError, OSError) as e:
logger.debug(f"TLE fallback search '{term}' failed: {e}")
return all_results
@@ -238,7 +239,7 @@ def fetch_satellites():
_save_sat_cache(gp_data)
logger.info(f"Satellites: Downloaded {len(gp_data)} GP records from CelesTrak")
break
except Exception as e:
except (requests.RequestException, ConnectionError, TimeoutError, ValueError, KeyError, json.JSONDecodeError, OSError) as e:
logger.warning(f"Satellites: Failed to fetch from {url}: {e}")
continue
@@ -252,7 +253,7 @@ def fetch_satellites():
_sat_gp_cache["source"] = "tle_api"
_save_sat_cache(fallback_data)
logger.info(f"Satellites: Got {len(fallback_data)} records from TLE fallback API")
except Exception as e:
except (requests.RequestException, ConnectionError, TimeoutError, ValueError, KeyError, OSError) as e:
logger.error(f"Satellites: TLE fallback also failed: {e}")
if _sat_gp_cache["data"] is None:
@@ -375,11 +376,11 @@ def fetch_satellites():
'BSTAR', 'EPOCH', 'tle1', 'tle2'):
s.pop(k, None)
sats.append(s)
except Exception:
except (ValueError, TypeError, KeyError, AttributeError, ZeroDivisionError):
continue
logger.info(f"Satellites: {len(classified)} classified, {len(sats)} positioned")
except Exception as e:
except (requests.RequestException, ConnectionError, TimeoutError, ValueError, KeyError, json.JSONDecodeError, OSError) as e:
logger.error(f"Error fetching satellites: {e}")
if sats:
with _data_lock:
+62
View File
@@ -0,0 +1,62 @@
"""Yacht-Alert DB — load and enrich AIS vessels with tracked yacht metadata."""
import os
import json
import logging
logger = logging.getLogger("services.data_fetcher")
# Category -> color mapping
_CATEGORY_COLOR: dict[str, str] = {
"Tech Billionaire": "#FF69B4",
"Celebrity / Mogul": "#FF69B4",
"Oligarch Watch": "#FF2020",
}
def _category_to_color(cat: str) -> str:
"""Map category to display color. Defaults to hot pink."""
return _CATEGORY_COLOR.get(cat, "#FF69B4")
_YACHT_ALERT_DB: dict = {}
def _load_yacht_alert_db():
"""Load yacht_alert_db.json into memory at import time."""
global _YACHT_ALERT_DB
json_path = os.path.join(
os.path.dirname(os.path.dirname(os.path.dirname(os.path.abspath(__file__)))),
"data", "yacht_alert_db.json"
)
if not os.path.exists(json_path):
logger.warning(f"Yacht-Alert DB not found at {json_path}")
return
try:
with open(json_path, "r", encoding="utf-8") as fh:
raw = json.load(fh)
for mmsi_str, info in raw.items():
info["color"] = _category_to_color(info.get("category", ""))
_YACHT_ALERT_DB[mmsi_str] = info
logger.info(f"Yacht-Alert DB loaded: {len(_YACHT_ALERT_DB)} vessels")
except (IOError, OSError, json.JSONDecodeError, ValueError, KeyError) as e:
logger.error(f"Failed to load Yacht-Alert DB: {e}")
_load_yacht_alert_db()
def enrich_with_yacht_alert(ship: dict) -> dict:
"""If ship's MMSI is in the Yacht-Alert DB, attach owner/alert metadata."""
mmsi = str(ship.get("mmsi", "")).strip()
if mmsi and mmsi in _YACHT_ALERT_DB:
info = _YACHT_ALERT_DB[mmsi]
ship["yacht_alert"] = True
ship["yacht_owner"] = info["owner"]
ship["yacht_name"] = info["name"]
ship["yacht_category"] = info["category"]
ship["yacht_color"] = info["color"]
ship["yacht_builder"] = info.get("builder", "")
ship["yacht_length"] = info.get("length_m", 0)
ship["yacht_year"] = info.get("year", 0)
ship["yacht_link"] = info.get("link", "")
return ship
+46 -36
View File
@@ -3,6 +3,7 @@ import json
import subprocess
import shutil
import time
import threading
import requests
from urllib.parse import urlparse
from requests.adapters import HTTPAdapter
@@ -30,6 +31,9 @@ _DOMAIN_FAIL_TTL = 300 # 5 minutes
_circuit_breaker: dict[str, float] = {}
_CIRCUIT_BREAKER_TTL = 120 # 2 minutes
# Lock protecting _domain_fail_cache and _circuit_breaker mutations
_cb_lock = threading.Lock()
class _DummyResponse:
"""Minimal response object matching requests.Response interface."""
def __init__(self, status_code, text):
@@ -61,13 +65,14 @@ def fetch_with_curl(url, method="GET", json_data=None, timeout=15, headers=None)
domain = urlparse(url).netloc
# Circuit breaker: if domain failed completely <2min ago, fail fast
if domain in _circuit_breaker and (time.time() - _circuit_breaker[domain]) < _CIRCUIT_BREAKER_TTL:
raise Exception(f"Circuit breaker open for {domain} (failed <{_CIRCUIT_BREAKER_TTL}s ago)")
with _cb_lock:
if domain in _circuit_breaker and (time.time() - _circuit_breaker[domain]) < _CIRCUIT_BREAKER_TTL:
raise Exception(f"Circuit breaker open for {domain} (failed <{_CIRCUIT_BREAKER_TTL}s ago)")
# Check if this domain recently failed with requests — skip straight to curl
if domain in _domain_fail_cache and (time.time() - _domain_fail_cache[domain]) < _DOMAIN_FAIL_TTL:
pass # Fall through to curl below
else:
with _cb_lock:
_skip_requests = domain in _domain_fail_cache and (time.time() - _domain_fail_cache[domain]) < _DOMAIN_FAIL_TTL
if not _skip_requests:
try:
# Use a short connect timeout (3s) so firewall blocks fail fast,
# but allow the full timeout for reading the response body.
@@ -78,42 +83,47 @@ def fetch_with_curl(url, method="GET", json_data=None, timeout=15, headers=None)
res = _session.get(url, timeout=req_timeout, headers=default_headers)
res.raise_for_status()
# Clear failure caches on success
_domain_fail_cache.pop(domain, None)
_circuit_breaker.pop(domain, None)
with _cb_lock:
_domain_fail_cache.pop(domain, None)
_circuit_breaker.pop(domain, None)
return res
except (requests.RequestException, ConnectionError, TimeoutError, OSError) as e:
logger.warning(f"Python requests failed for {url} ({e}), falling back to bash curl...")
_domain_fail_cache[domain] = time.time()
with _cb_lock:
_domain_fail_cache[domain] = time.time()
# Build curl as argument list — never pass through shell to prevent injection
_CURL_PATH = shutil.which("curl") or "curl"
cmd = [_CURL_PATH, "-s", "-w", "\n%{http_code}"]
for k, v in default_headers.items():
cmd += ["-H", f"{k}: {v}"]
if method == "POST" and json_data:
cmd += ["-X", "POST", "-H", "Content-Type: application/json",
"--data-binary", "@-"]
cmd.append(url)
# Curl fallback — reached from both _skip_requests and requests-exception paths
_CURL_PATH = shutil.which("curl") or "curl"
cmd = [_CURL_PATH, "-s", "-w", "\n%{http_code}"]
for k, v in default_headers.items():
cmd += ["-H", f"{k}: {v}"]
if method == "POST" and json_data:
cmd += ["-X", "POST", "-H", "Content-Type: application/json",
"--data-binary", "@-"]
cmd.append(url)
try:
stdin_data = json.dumps(json_data) if (method == "POST" and json_data) else None
res = subprocess.run(
cmd, capture_output=True, text=True, timeout=timeout + 5,
input=stdin_data
)
if res.returncode == 0 and res.stdout.strip():
# Parse HTTP status code from -w output (last line)
lines = res.stdout.rstrip().rsplit("\n", 1)
body = lines[0] if len(lines) > 1 else res.stdout
http_code = int(lines[-1]) if len(lines) > 1 and lines[-1].strip().isdigit() else 200
if http_code < 400:
try:
stdin_data = json.dumps(json_data) if (method == "POST" and json_data) else None
res = subprocess.run(
cmd, capture_output=True, text=True, timeout=timeout + 5,
input=stdin_data
)
if res.returncode == 0 and res.stdout.strip():
# Parse HTTP status code from -w output (last line)
lines = res.stdout.rstrip().rsplit("\n", 1)
body = lines[0] if len(lines) > 1 else res.stdout
http_code = int(lines[-1]) if len(lines) > 1 and lines[-1].strip().isdigit() else 200
if http_code < 400:
with _cb_lock:
_circuit_breaker.pop(domain, None) # Clear circuit breaker on success
return _DummyResponse(http_code, body)
else:
logger.error(f"bash curl fallback failed: exit={res.returncode} stderr={res.stderr[:200]}")
return _DummyResponse(http_code, body)
else:
logger.error(f"bash curl fallback failed: exit={res.returncode} stderr={res.stderr[:200]}")
with _cb_lock:
_circuit_breaker[domain] = time.time()
return _DummyResponse(500, "")
except (subprocess.SubprocessError, ConnectionError, TimeoutError, OSError) as curl_e:
logger.error(f"bash curl fallback exception: {curl_e}")
_circuit_breaker[domain] = time.time()
return _DummyResponse(500, "")
except (subprocess.SubprocessError, ConnectionError, TimeoutError, OSError) as curl_e:
logger.error(f"bash curl fallback exception: {curl_e}")
with _cb_lock:
_circuit_breaker[domain] = time.time()
return _DummyResponse(500, "")
+26
View File
@@ -0,0 +1,26 @@
from pydantic import BaseModel
from typing import Optional, Dict, List, Any
class HealthResponse(BaseModel):
status: str
last_updated: Optional[str] = None
sources: Dict[str, int]
freshness: Dict[str, str]
uptime_seconds: int
class RefreshResponse(BaseModel):
status: str
class AisFeedResponse(BaseModel):
status: str
ingested: int = 0
class RouteResponse(BaseModel):
orig_loc: Optional[list] = None
dest_loc: Optional[list] = None
origin_name: Optional[str] = None
dest_name: Optional[str] = None