Files
Shadowbroker/backend/services/fetchers/meshtastic_map.py
T
BigBodyCobain 71a9d9e144 [security] Close post-#227 control-surface and fetcher gaps
PR #227 hardened most Wormhole/Infonet control surfaces behind
require_local_operator and made the CrowdThreat fetcher opt-in. An
audit of the codebase against that PR's stated goals turned up four
classes of gap that the original change missed:

1. Two operator-only endpoints were left unprotected:
   - POST /api/wormhole/join: calls bootstrap_wormhole_identity() and
     flips the node into Tor mode, exactly the surface #227 hardened
     on /api/wormhole/identity/bootstrap.
   - POST /api/sigint/transmit: relays APRS-IS packets over radio
     using operator-supplied credentials. Anything that reached the
     API could transmit on the operator's authority.

   Both are now gated behind require_local_operator.
   test_control_surface_auth.py was extended with regression coverage for both.

2. Five third-party fetchers were still default-on, phoning home to
   politically/commercially sensitive upstreams on every poll cycle:
   - fimi.py            -> euvsdisinfo.eu        -> FIMI_ENABLED
   - prediction_markets -> Polymarket + Kalshi   -> PREDICTION_MARKETS_ENABLED
   - financial.py       -> Finnhub / yfinance    -> FINANCIAL_ENABLED or FINNHUB_API_KEY
   - nuforc_enrichment  -> huggingface.co        -> NUFORC_ENABLED
   - news.py            -> configured RSS feeds  -> NEWS_ENABLED (default on, kill switch)

   Same CrowdThreat-style pattern: explicit env-var opt-in, empty
   the data slot and mark_fresh when disabled. New regression test
   file test_third_party_fetchers_opt_in.py asserts each fetcher's
   network entry point is not called when its gate is off.

3. The outbound User-Agent leaked both the operator's personal email
   and a fork-specific GitHub URL on every fetcher request. Consolidated
   to a single DEFAULT_USER_AGENT in network_utils.py, project-generic
   by default (no contact info), overridable via SHADOWBROKER_USER_AGENT
   for operators who want to identify themselves (e.g. for Nominatim or
   weather.gov usage-policy compliance). Six call sites updated; the
   Nominatim-specific override is preserved.

4. The same generic UA now also flows through the peer prekey lookup
   in mesh_wormhole_prekey.py, so DM first-contact requests no longer
   identify the caller as a Shadowbroker fork to the peer being
   queried.

.env.example updated to document all new opt-in env vars.

Tests: backend/tests/test_control_surface_auth.py (extended),
       backend/tests/test_crowdthreat_opt_in.py (unchanged, still passes),
       backend/tests/test_third_party_fetchers_opt_in.py (new, 7 tests).
All 31 tests pass.

Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
2026-05-18 13:53:33 -06:00

268 lines
9.7 KiB
Python

"""Meshtastic Map fetcher — pulls global node positions from meshtastic.liamcottle.net.
Bootstrap + top-up strategy:
- On startup: fetch all nodes with positions to seed the map
- Every 4 hours: refresh from the API
- Persists to JSON cache so data survives restarts
- MQTT bridge provides real-time updates between API fetches
API source: https://meshtastic.liamcottle.net/api/v1/nodes (community project by Liam Cottle)
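Polling is deliberately infrequent (4h schedule) to be respectful to the service;
a network fetch is additionally skipped while the on-disk cache is newer than
_CACHE_TRUST_HOURS.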
"""
import json
import logging
import time
from datetime import datetime, timezone, timedelta
from pathlib import Path
import requests
from services.fetchers._store import latest_data, _data_lock, _mark_fresh
# Module-level configuration and state for the Meshtastic map fetcher.

# Logs under the shared "services.data_fetcher" name rather than this module's own.
logger = logging.getLogger("services.data_fetcher")
# Upstream endpoint requested by fetch_meshtastic_nodes().
_API_URL = "https://meshtastic.liamcottle.net/api/v1/nodes"
# On-disk JSON cache: <this file's directory>/../../data/meshtastic_nodes_cache.json
_CACHE_FILE = Path(__file__).resolve().parent.parent.parent / "data" / "meshtastic_nodes_cache.json"
_FETCH_TIMEOUT = 90 # seconds — response is ~37MB, needs time on slow connections
_MAX_AGE_HOURS = 24 # discard nodes not seen within this window
# Skip network fetch if cached data is fresher than this — the API is a
# one-person hobby service, so we prefer stale data over hammering it.
# Compared against the cache file's mtime, not against node timestamps.
_CACHE_TRUST_HOURS = 20
# Track when we last fetched so the frontend can show staleness
# Epoch seconds: time.time() after a successful fetch, or the cache file's
# mtime after a startup preload; stays 0.0 until one of those happens.
_last_fetch_ts: float = 0.0
def _parse_node(node: dict) -> dict | None:
"""Convert an API node into a slim signal-like dict."""
lat_i = node.get("latitude")
lng_i = node.get("longitude")
if lat_i is None or lng_i is None:
return None
lat = lat_i / 1e7
lng = lng_i / 1e7
# Basic validity
if not (-90 <= lat <= 90 and -180 <= lng <= 180):
return None
if abs(lat) < 0.1 and abs(lng) < 0.1:
return None
callsign = node.get("node_id_hex", "")
if not callsign:
nid = node.get("node_id")
callsign = f"!{int(nid):08x}" if nid else ""
if not callsign:
return None
# Position age from API — reject nodes older than _MAX_AGE_HOURS
pos_updated = node.get("position_updated_at") or node.get("updated_at", "")
if pos_updated:
try:
ts = datetime.fromisoformat(pos_updated.replace("Z", "+00:00"))
if datetime.now(timezone.utc) - ts > timedelta(hours=_MAX_AGE_HOURS):
return None
except (ValueError, TypeError):
pass
else:
return None # no timestamp at all — skip
return {
"callsign": callsign[:20],
"lat": round(lat, 5),
"lng": round(lng, 5),
"source": "meshtastic",
"confidence": 0.5,
"timestamp": pos_updated,
"position_updated_at": pos_updated,
"from_api": True,
"long_name": (node.get("long_name") or "")[:40],
"short_name": (node.get("short_name") or "")[:4],
"hardware": node.get("hardware_model_name", ""),
"role": node.get("role_name", ""),
"battery_level": node.get("battery_level"),
"voltage": node.get("voltage"),
"altitude": node.get("altitude"),
}
def _is_fresh(node: dict) -> bool:
    """Check if a cached node is still within the _MAX_AGE_HOURS window.

    Args:
        node: A cached node dict; its ``position_updated_at`` (or, failing
            that, ``timestamp``) ISO-8601 string is used as the node's age.

    Returns:
        ``True`` only when the timestamp parses and is at most
        ``_MAX_AGE_HOURS`` old. Missing, non-string, or unparseable
        timestamps (and non-dict entries) count as stale.
    """
    # Never raise: the cache loader filters every entry through this function,
    # and one bad entry should not cause the whole cache to be discarded.
    if not isinstance(node, dict):
        return False
    ts_str = node.get("position_updated_at") or node.get("timestamp", "")
    if not ts_str or not isinstance(ts_str, str):
        return False
    try:
        ts = datetime.fromisoformat(ts_str.replace("Z", "+00:00"))
        if ts.tzinfo is None:
            # Naive timestamps cannot be subtracted from an aware "now";
            # assume UTC rather than treating them as always stale.
            ts = ts.replace(tzinfo=timezone.utc)
        return datetime.now(timezone.utc) - ts <= timedelta(hours=_MAX_AGE_HOURS)
    except (ValueError, TypeError, OverflowError):
        return False
def _load_cache() -> list[dict]:
    """Read the on-disk node cache and return only the entries that are still fresh.

    Returns:
        The cached nodes that pass ``_is_fresh``. An empty list is returned
        when the cache file does not exist, or when reading/parsing it fails
        (the failure is logged as a warning rather than raised).
    """
    if not _CACHE_FILE.exists():
        return []
    try:
        payload = json.loads(_CACHE_FILE.read_text(encoding="utf-8"))
        nodes = payload.get("nodes", [])
        # Drop entries that aged out while the process was not running.
        fresh = list(filter(_is_fresh, nodes))
        logger.info(f"Meshtastic map cache loaded: {len(fresh)} fresh / {len(nodes)} total")
        return fresh
    except Exception as e:
        logger.warning(f"Failed to load meshtastic cache: {e}")
        return []
def _save_cache(nodes: list[dict], fetch_ts: float) -> None:
    """Persist processed nodes to disk.

    The payload is written to a sibling temporary file and then moved over
    the real cache file, so an interrupted write cannot leave a truncated
    JSON document behind.

    Args:
        nodes: Processed node dicts to store under the ``"nodes"`` key.
        fetch_ts: Epoch timestamp of the fetch, stored as ``"fetched_at"``.

    Failures are logged as warnings and never raised: the cache is a
    best-effort convenience.
    """
    # Same directory as the target so the final rename stays on one filesystem.
    tmp_file = _CACHE_FILE.with_name(_CACHE_FILE.name + ".tmp")
    try:
        _CACHE_FILE.parent.mkdir(parents=True, exist_ok=True)
        tmp_file.write_text(
            json.dumps(
                {
                    "fetched_at": fetch_ts,
                    "count": len(nodes),
                    "nodes": nodes,
                }
            ),
            encoding="utf-8",
        )
        tmp_file.replace(_CACHE_FILE)
    except Exception as e:
        logger.warning(f"Failed to save meshtastic cache: {e}")
        try:
            # Best-effort cleanup of a partially written temp file.
            tmp_file.unlink(missing_ok=True)
        except OSError:
            pass
def _refresh_sigint_after_map_update(log_prefix: str) -> None:
    """Best-effort SIGINT snapshot refresh after the map node list was replaced."""
    # NOTE(review): presumably rebuilds a combined SIGINT view that includes
    # these nodes — confirm against services.fetchers.sigint.
    try:
        from services.fetchers.sigint import refresh_sigint_snapshot

        refresh_sigint_snapshot()
    except Exception as exc:
        logger.debug(f"{log_prefix}: SIGINT snapshot refresh skipped: {exc}")


def _build_map_user_agent() -> str:
    """Return the User-Agent for the map API, tagged with the operator callsign when set."""
    try:
        from services.config import get_settings

        callsign = str(getattr(get_settings(), "MESHTASTIC_OPERATOR_CALLSIGN", "") or "").strip()
    except Exception:
        # Settings are optional here; fall back to the untagged User-Agent.
        callsign = ""
    from services.network_utils import DEFAULT_USER_AGENT

    ua_base = f"{DEFAULT_USER_AGENT}; 24h polling"
    return f"{ua_base}; node={callsign}" if callsign else ua_base


def fetch_meshtastic_nodes():
    """Fetch global Meshtastic node positions from Liam Cottle's map API.

    Stores processed nodes in latest_data["meshtastic_map_nodes"].
    Persists to JSON cache for restart resilience.

    Flow:
      1. Return immediately unless the "sigint_meshtastic" layer is active.
      2. If the cache file's mtime is newer than ``_CACHE_TRUST_HOURS``, skip
         the network: keep what is in memory, or hydrate memory from the
         cache on a cold start. An unusable cache falls through to step 3.
      3. Download and parse the node list, save it to disk and publish it.
      4. On any failure, log it and fall back to the disk cache, but only if
         memory is empty (existing in-memory data is never replaced by a
         failed fetch).

    Never raises for network or payload errors; returns ``None``.
    """
    global _last_fetch_ts

    from services.fetchers._store import is_any_active

    if not is_any_active("sigint_meshtastic"):
        return

    # Trust a recent cache on disk — avoids hammering the upstream HTTP API
    # when every install polls on roughly the same cadence.
    try:
        if _CACHE_FILE.exists():
            mtime = _CACHE_FILE.stat().st_mtime
            if time.time() - mtime < _CACHE_TRUST_HOURS * 3600:
                with _data_lock:
                    has_memory = bool(latest_data.get("meshtastic_map_nodes"))
                if has_memory:
                    logger.info(
                        "Meshtastic map: cache fresh (<%.0fh), skipping network fetch",
                        _CACHE_TRUST_HOURS,
                    )
                    return
                # Memory is empty (cold start): hydrate from cache and skip fetch.
                cached = _load_cache()
                if cached:
                    with _data_lock:
                        latest_data["meshtastic_map_nodes"] = cached
                        latest_data["meshtastic_map_fetched_at"] = mtime
                    # Same follow-up as every other path that replaces the node list.
                    _refresh_sigint_after_map_update("Meshtastic map cache")
                    _mark_fresh("meshtastic_map")
                    logger.info(
                        "Meshtastic map: cache fresh (<%.0fh), skipping network fetch",
                        _CACHE_TRUST_HOURS,
                    )
                    return
                # Fresh mtime but nothing usable inside: fall through to the API.
    except Exception as e:
        logger.debug(f"Meshtastic cache freshness check failed: {e}")

    # Build a polite User-Agent. Include the operator callsign when set so
    # the upstream service can correlate per-install traffic if needed.
    user_agent = _build_map_user_agent()

    try:
        logger.info("Fetching Meshtastic map nodes from API...")
        resp = requests.get(
            _API_URL,
            timeout=_FETCH_TIMEOUT,
            headers={
                "User-Agent": user_agent,
                "Accept": "application/json",
            },
        )
        resp.raise_for_status()
        raw = resp.json()
        raw_nodes = raw.get("nodes") if isinstance(raw, dict) else raw
        if not isinstance(raw_nodes, list):
            # A 200 response without a node list (e.g. an error object) must
            # not be mistaken for "zero nodes": that would wipe both the
            # in-memory data and the disk cache.
            raise ValueError(f"unexpected payload shape: {type(raw).__name__}")

        # Parse and filter to only nodes with valid positions
        parsed = [sig for sig in map(_parse_node, raw_nodes) if sig]

        _last_fetch_ts = time.time()
        _save_cache(parsed, _last_fetch_ts)
        with _data_lock:
            latest_data["meshtastic_map_nodes"] = parsed
            latest_data["meshtastic_map_fetched_at"] = _last_fetch_ts
        _refresh_sigint_after_map_update("Meshtastic map")
        logger.info(
            f"Meshtastic map: {len(parsed)} nodes with positions " f"(from {len(raw_nodes)} total)"
        )
    except Exception as e:
        logger.error(f"Meshtastic map fetch failed: {e}")
        # Fall back to cache if available and we have nothing in memory
        with _data_lock:
            memory_empty = not latest_data.get("meshtastic_map_nodes")
        if memory_empty:
            # Disk read + JSON parse happen outside the lock so other users
            # of latest_data are not blocked on file I/O.
            cached = _load_cache()
            if cached:
                try:
                    fetched_at = _CACHE_FILE.stat().st_mtime if _CACHE_FILE.exists() else 0
                except OSError:
                    fetched_at = 0
                installed = False
                with _data_lock:
                    # Re-check: the slot may have been filled while we were reading.
                    if not latest_data.get("meshtastic_map_nodes"):
                        latest_data["meshtastic_map_nodes"] = cached
                        latest_data["meshtastic_map_fetched_at"] = fetched_at
                        installed = True
                if installed:
                    logger.info(
                        f"Meshtastic map: using {len(cached)} cached nodes (API unavailable)"
                    )
                    _refresh_sigint_after_map_update("Meshtastic map cache")

    _mark_fresh("meshtastic_map")
def load_meshtastic_cache_if_available():
    """Seed the in-memory node list from the disk cache at startup.

    Meant to run before the first API fetch. When the cache yields at least
    one fresh node, the nodes are published to ``latest_data``, the fetch
    timestamp is set to the cache file's mtime (0 if the file is gone), and a
    best-effort SIGINT snapshot refresh is triggered. Does nothing when the
    cache is missing, unreadable, or holds only stale nodes.
    """
    global _last_fetch_ts

    cached = _load_cache()
    if not cached:
        return

    with _data_lock:
        latest_data["meshtastic_map_nodes"] = cached
        # The cache file's mtime stands in for "when the data was fetched".
        if _CACHE_FILE.exists():
            _last_fetch_ts = _CACHE_FILE.stat().st_mtime
        else:
            _last_fetch_ts = 0
        latest_data["meshtastic_map_fetched_at"] = _last_fetch_ts

    try:
        from services.fetchers.sigint import refresh_sigint_snapshot

        refresh_sigint_snapshot()
    except Exception as exc:
        # Optional follow-up: a failure here must not undo the preload.
        logger.debug(f"Meshtastic preload: SIGINT snapshot refresh skipped: {exc}")

    logger.info(f"Meshtastic map: preloaded {len(cached)} nodes from cache")