mirror of
https://github.com/BigBodyCobain/Shadowbroker.git
synced 2026-06-03 21:08:13 +02:00
Compare commits
12 Commits
| Author | SHA1 | Date | |
|---|---|---|---|
| a2f5d62926 | |||
| 5e0b2c037e | |||
| febcce9125 | |||
| 31ebcb5cd9 | |||
| b3fca3dc18 | |||
| 401f114e4f | |||
| 79b39e8985 | |||
| c3e38621fc | |||
| 9ef02dd06f | |||
| ba39d3b9aa | |||
| f91ddcf38b | |||
| 49151d8b9f |
@@ -11,6 +11,13 @@ AIS_API_KEY= # https://aisstream.io/ — free tier WebSocket key
|
||||
|
||||
# ── Optional ───────────────────────────────────────────────────
|
||||
|
||||
# AISHub REST fallback. Used when stream.aisstream.io is unreachable
|
||||
# (e.g. their cert expires or server goes offline). Free tier requires
|
||||
# registration at https://www.aishub.net/api. Poll cadence defaults to
|
||||
# 20 min to stay courteous; tunable via AISHUB_POLL_INTERVAL_MINUTES.
|
||||
# AISHUB_USERNAME=
|
||||
# AISHUB_POLL_INTERVAL_MINUTES=20
|
||||
|
||||
# Override allowed CORS origins (comma-separated). Defaults to localhost + LAN auto-detect.
|
||||
# CORS_ORIGINS=http://192.168.1.50:3000,https://my-domain.com
|
||||
|
||||
|
||||
+59
-10
@@ -1417,6 +1417,29 @@ def _peer_sync_response(peer_url: str, body: dict[str, Any]) -> dict[str, Any]:
|
||||
proxy = f"socks5h://127.0.0.1:{socks_port}"
|
||||
kwargs["proxies"] = {"http": proxy, "https": proxy}
|
||||
response = _requests.post(f"{normalized}/api/mesh/infonet/sync", **kwargs)
|
||||
# HTTP 429 must be surfaced as a typed exception carrying the
|
||||
# Retry-After value, so finish_sync can honor it and stop hammering
|
||||
# the upstream. Pre-fix this path just stringified the status into
|
||||
# a ValueError, which finish_sync then ignored — keeping the
|
||||
# upstream's rate-limit bucket full indefinitely.
|
||||
if response.status_code == 429:
|
||||
from services.mesh.mesh_infonet_sync_support import (
|
||||
PeerSyncRateLimited,
|
||||
parse_retry_after_header,
|
||||
)
|
||||
|
||||
retry_after_s = parse_retry_after_header(
|
||||
response.headers.get("Retry-After", "") or "",
|
||||
)
|
||||
try:
|
||||
body_text = response.text[:200]
|
||||
except Exception:
|
||||
body_text = ""
|
||||
raise PeerSyncRateLimited(
|
||||
f"HTTP 429 from {normalized} (retry_after={retry_after_s}s): {body_text}",
|
||||
retry_after_s=retry_after_s,
|
||||
status=429,
|
||||
)
|
||||
try:
|
||||
payload = response.json()
|
||||
except Exception as exc:
|
||||
@@ -1462,8 +1485,23 @@ def _hydrate_gate_store_from_chain(events: list[dict]) -> int:
|
||||
return count
|
||||
|
||||
|
||||
def _sync_from_peer(peer_url: str, *, page_limit: int = 100, max_rounds: int = 5) -> tuple[bool, str, bool]:
|
||||
def _sync_from_peer(
|
||||
peer_url: str,
|
||||
*,
|
||||
page_limit: int = 100,
|
||||
max_rounds: int = 5,
|
||||
) -> tuple[bool, str, bool, int]:
|
||||
"""Sync the local Infonet chain against ``peer_url``.
|
||||
|
||||
Returns ``(ok, error, forked, retry_after_s)``. The fourth tuple
|
||||
element is non-zero only when the peer responded with HTTP 429
|
||||
and supplied a parseable ``Retry-After`` header — see the typed
|
||||
``PeerSyncRateLimited`` exception in mesh_infonet_sync_support.py.
|
||||
Callers should pass that value to ``finish_sync(retry_after_s=...)``
|
||||
so the next attempt actually waits.
|
||||
"""
|
||||
from services.mesh.mesh_hashchain import infonet
|
||||
from services.mesh.mesh_infonet_sync_support import PeerSyncRateLimited
|
||||
|
||||
rounds = 0
|
||||
while rounds < max_rounds:
|
||||
@@ -1472,7 +1510,11 @@ def _sync_from_peer(peer_url: str, *, page_limit: int = 100, max_rounds: int = 5
|
||||
"locator": infonet.get_locator(),
|
||||
"limit": page_limit,
|
||||
}
|
||||
payload = _peer_sync_response(peer_url, body)
|
||||
try:
|
||||
payload = _peer_sync_response(peer_url, body)
|
||||
except PeerSyncRateLimited as exc:
|
||||
# Bubble up the retry-after so finish_sync can honor it.
|
||||
return False, str(exc), False, exc.retry_after_s
|
||||
if bool(payload.get("forked")):
|
||||
# Auto-recover small local forks: if the local chain is tiny
|
||||
# (< 20 events) and the remote has a longer chain, reset local
|
||||
@@ -1488,23 +1530,23 @@ def _sync_from_peer(peer_url: str, *, page_limit: int = 100, max_rounds: int = 5
|
||||
)
|
||||
infonet.reset_chain()
|
||||
continue # retry sync with clean genesis locator
|
||||
return False, "fork detected", True
|
||||
return False, "fork detected", True, 0
|
||||
events = payload.get("events", [])
|
||||
if not isinstance(events, list):
|
||||
return False, "peer sync events must be a list", False
|
||||
return False, "peer sync events must be a list", False, 0
|
||||
if not events:
|
||||
return True, "", False
|
||||
return True, "", False, 0
|
||||
result = infonet.ingest_events(events)
|
||||
_hydrate_gate_store_from_chain(events)
|
||||
rejected = list(result.get("rejected", []) or [])
|
||||
if rejected:
|
||||
return False, f"sync ingest rejected {len(rejected)} event(s)", False
|
||||
return False, f"sync ingest rejected {len(rejected)} event(s)", False, 0
|
||||
if int(result.get("accepted", 0) or 0) == 0 and int(result.get("duplicates", 0) or 0) >= len(events):
|
||||
return True, "", False
|
||||
return True, "", False, 0
|
||||
if len(events) < page_limit:
|
||||
return True, "", False
|
||||
return True, "", False, 0
|
||||
rounds += 1
|
||||
return True, "", False
|
||||
return True, "", False, 0
|
||||
|
||||
|
||||
def _run_public_sync_cycle() -> SyncWorkerState:
|
||||
@@ -1567,11 +1609,12 @@ def _run_public_sync_cycle() -> SyncWorkerState:
|
||||
with _NODE_RUNTIME_LOCK:
|
||||
set_sync_state(started)
|
||||
try:
|
||||
ok, error, forked = _sync_from_peer(record.peer_url)
|
||||
ok, error, forked, retry_after_s = _sync_from_peer(record.peer_url)
|
||||
except Exception as exc:
|
||||
ok = False
|
||||
error = str(exc or type(exc).__name__)
|
||||
forked = False
|
||||
retry_after_s = 0
|
||||
if ok:
|
||||
store.mark_seen(record.peer_url, "sync", now=time.time())
|
||||
store.mark_sync_success(record.peer_url, now=time.time())
|
||||
@@ -1618,6 +1661,12 @@ def _run_public_sync_cycle() -> SyncWorkerState:
|
||||
now=time.time(),
|
||||
interval_s=int(get_settings().MESH_SYNC_INTERVAL_S or 300),
|
||||
failure_backoff_s=failure_backoff_s,
|
||||
# 429 retry-storm fix: when the peer returned HTTP 429 with
|
||||
# a Retry-After header, finish_sync uses max(exponential,
|
||||
# retry_after) for next_sync_due_at — so we actually wait
|
||||
# the time the upstream asked for instead of hammering
|
||||
# every 60s and keeping its rate-limit bucket full forever.
|
||||
retry_after_s=retry_after_s,
|
||||
)
|
||||
with _NODE_RUNTIME_LOCK:
|
||||
set_sync_state(updated)
|
||||
|
||||
+193
-39
@@ -2521,45 +2521,85 @@ async def api_capabilities(request: Request):
|
||||
# OpenClaw Connection Management (local-operator only — NOT via HMAC)
|
||||
# These endpoints manage the HMAC secret itself, so they MUST require
|
||||
# local operator access to prevent privilege escalation.
|
||||
#
|
||||
# Issue #302 (tg12): pre-fix, GET /api/ai/connect-info had two problems:
|
||||
#
|
||||
# 1. ``?reveal=true`` made the full secret travel through every operator
|
||||
# page-load that opened the Connect modal. Even gated to
|
||||
# ``require_local_operator``, that put the secret into browser
|
||||
# history, dev-tools network panels, browser disk caches, HAR
|
||||
# exports, and screen captures. Every time the modal opened.
|
||||
#
|
||||
# 2. The same GET endpoint auto-bootstrapped (generated + persisted)
|
||||
# the secret on first read. Side effects on a GET are a footgun:
|
||||
# browser prefetchers, mirror tools, and casual curl-from-history
|
||||
# would all silently mint+persist a fresh secret. (Gated, but
|
||||
# still surprising — and noisy in the audit log.)
|
||||
#
|
||||
# Resolution:
|
||||
#
|
||||
# GET /api/ai/connect-info — always returns the MASKED
|
||||
# secret. No ?reveal param.
|
||||
# No auto-bootstrap; if the
|
||||
# secret is missing,
|
||||
# ``hmac_secret_set: false``
|
||||
# tells the frontend to call
|
||||
# /bootstrap.
|
||||
#
|
||||
# POST /api/ai/connect-info/bootstrap — NEW. Generates + persists the
|
||||
# secret if missing. Idempotent.
|
||||
# Returns metadata only, never
|
||||
# the full secret.
|
||||
#
|
||||
# POST /api/ai/connect-info/reveal — NEW. Returns the full secret in
|
||||
# the body with strict
|
||||
# ``Cache-Control: no-store,
|
||||
# no-cache, must-revalidate``
|
||||
# + ``Pragma: no-cache`` so
|
||||
# it does not land in browser
|
||||
# caches. POST means it does
|
||||
# not land in URL history.
|
||||
#
|
||||
# POST /api/ai/connect-info/regenerate — keeps existing one-time-reveal
|
||||
# behavior (regenerate IS a
|
||||
# deliberate destructive action
|
||||
# the operator triggered, so
|
||||
# displaying the new secret
|
||||
# once is the only path that
|
||||
# makes the operation useful).
|
||||
# Same no-store headers added.
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
@router.get("/api/ai/connect-info", dependencies=[Depends(require_local_operator)])
|
||||
@limiter.limit("30/minute")
|
||||
async def get_connect_info(request: Request, reveal: bool = False):
|
||||
"""Return connection details for the OpenClaw Connect modal.
|
||||
# Cache-Control headers that should accompany every response carrying the
|
||||
# full HMAC secret. Reused across the reveal + regenerate endpoints so a
|
||||
# future refactor that splits or renames them can't forget the headers.
|
||||
_NO_STORE_HEADERS = {
|
||||
"Cache-Control": "no-store, no-cache, must-revalidate, private",
|
||||
"Pragma": "no-cache",
|
||||
"Expires": "0",
|
||||
}
|
||||
|
||||
The HMAC secret is masked by default. Pass ?reveal=true to see the full key.
|
||||
Private keys are NEVER returned.
|
||||
|
||||
def _mask_hmac_secret(secret: str) -> str:
|
||||
"""Return a fingerprint-style mask (first6 + bullets + last4) suitable
|
||||
for display in the UI before the operator clicks Reveal."""
|
||||
if not secret:
|
||||
return ""
|
||||
if len(secret) > 10:
|
||||
return secret[:6] + "••••••••" + secret[-4:]
|
||||
return "••••••••"
|
||||
|
||||
|
||||
def _connect_info_metadata(settings) -> dict:
|
||||
"""Return everything the Connect modal needs EXCEPT the secret itself.
|
||||
|
||||
Shared between GET /api/ai/connect-info (where the full secret is
|
||||
masked) and POST /api/ai/connect-info/bootstrap (where the operator
|
||||
just generated a secret but we don't return it inline — they have to
|
||||
call /reveal to see it).
|
||||
"""
|
||||
import os
|
||||
import secrets
|
||||
from services.config import get_settings
|
||||
|
||||
settings = get_settings()
|
||||
hmac_secret = str(settings.OPENCLAW_HMAC_SECRET or "").strip()
|
||||
access_tier = str(settings.OPENCLAW_ACCESS_TIER or "restricted").strip().lower()
|
||||
|
||||
# Auto-generate if not set
|
||||
if not hmac_secret:
|
||||
hmac_secret = secrets.token_hex(24) # 48 chars
|
||||
_write_env_value("OPENCLAW_HMAC_SECRET", hmac_secret)
|
||||
# Clear settings cache so next read picks up the new value
|
||||
get_settings.cache_clear()
|
||||
|
||||
masked = hmac_secret[:6] + "••••••••" + hmac_secret[-4:] if len(hmac_secret) > 10 else "••••••••"
|
||||
|
||||
return {
|
||||
"ok": True,
|
||||
"hmac_secret": hmac_secret if reveal else masked,
|
||||
"hmac_secret_set": bool(hmac_secret),
|
||||
"bootstrap_behavior": {
|
||||
"auto_generates_when_missing": True,
|
||||
"auto_generated_this_call": not bool(settings.OPENCLAW_HMAC_SECRET or ""),
|
||||
"notes": [
|
||||
"If no HMAC secret exists yet, this endpoint bootstraps one and persists it to .env.",
|
||||
"Regenerating the HMAC secret revokes all existing direct-mode OpenClaw callers at once.",
|
||||
],
|
||||
},
|
||||
"access_tier": access_tier,
|
||||
"trust_model": {
|
||||
"remote_http_principal": "holder_of_openclaw_hmac_secret",
|
||||
@@ -2613,24 +2653,138 @@ async def get_connect_info(request: Request, reveal: bool = False):
|
||||
}
|
||||
|
||||
|
||||
@router.post("/api/ai/connect-info/regenerate", dependencies=[Depends(require_local_operator)])
|
||||
@limiter.limit("5/minute")
|
||||
async def regenerate_hmac_secret(request: Request):
|
||||
"""Generate a new HMAC secret. Old secret immediately stops working."""
|
||||
@router.get("/api/ai/connect-info", dependencies=[Depends(require_local_operator)])
@limiter.limit("30/minute")
async def get_connect_info(request: Request):
    """Describe the OpenClaw connection for the Connect modal (read-only).

    The full HMAC secret is never part of this response (see #302):
    ``masked_hmac_secret`` carries the output of ``_mask_hmac_secret`` and
    ``hmac_secret_set`` reports whether a secret is configured at all.
    Nothing is generated or persisted here. When ``hmac_secret_set`` is
    false the frontend should call ``POST /api/ai/connect-info/bootstrap``;
    the full value is only served by ``POST /api/ai/connect-info/reveal``.

    Private keys are NEVER returned.
    """
    from services.config import get_settings

    settings = get_settings()
    current_secret = str(settings.OPENCLAW_HMAC_SECRET or "").strip()

    bootstrap_notes = [
        "Call POST /api/ai/connect-info/bootstrap to mint a secret on first use.",
        "Call POST /api/ai/connect-info/reveal to see the full secret (no-store).",
        "Regenerating the HMAC secret revokes all existing direct-mode OpenClaw callers at once.",
    ]
    response_body = {
        "ok": True,
        "masked_hmac_secret": _mask_hmac_secret(current_secret),
        "hmac_secret_set": bool(current_secret),
        "bootstrap_behavior": {
            "auto_generates_when_missing": False,
            "notes": bootstrap_notes,
        },
    }
    # Merged last so that, on a key collision, the shared metadata value
    # wins while the key keeps its original position.
    response_body.update(_connect_info_metadata(settings))
    return response_body
|
||||
|
||||
|
||||
@router.post("/api/ai/connect-info/bootstrap", dependencies=[Depends(require_local_operator)])
@limiter.limit("10/minute")
async def bootstrap_hmac_secret(request: Request):
    """Mint and persist the OpenClaw HMAC secret if it isn't already set.

    Idempotent: if a secret already exists, returns ``generated: false``
    and leaves the existing secret untouched. Never returns the secret
    value in the response body — only its mask. The operator calls
    ``POST /api/ai/connect-info/reveal`` to see it.

    Returns a dict with ``ok``, ``generated``, ``hmac_secret_set``,
    ``masked_hmac_secret`` and a human-readable ``detail``.
    """
    import secrets
    from services.config import get_settings

    settings = get_settings()
    existing = str(settings.OPENCLAW_HMAC_SECRET or "").strip()
    if existing:
        return {
            "ok": True,
            "generated": False,
            "hmac_secret_set": True,
            "masked_hmac_secret": _mask_hmac_secret(existing),
            "detail": "HMAC secret already configured. Use /reveal to see it.",
        }

    new_secret = secrets.token_hex(24)  # 48 chars
    _write_env_value("OPENCLAW_HMAC_SECRET", new_secret)
    # Clear the settings cache so the next read picks up the new value.
    get_settings.cache_clear()

    # Deliberately no "hmac_secret" key here: the full value must only
    # travel through the no-store /reveal and /regenerate responses.
    return {
        "ok": True,
        "generated": True,
        "hmac_secret_set": True,
        "masked_hmac_secret": _mask_hmac_secret(new_secret),
        "detail": "HMAC secret generated. Call /reveal to copy it into your OpenClaw config.",
    }
|
||||
|
||||
|
||||
@router.post("/api/ai/connect-info/reveal", dependencies=[Depends(require_local_operator)])
@limiter.limit("10/minute")
async def reveal_hmac_secret(request: Request):
    """Serve the full HMAC secret in the response body.

    This is a POST rather than a GET so the secret does not end up in URL
    or browser visit history. The response is sent with
    ``_NO_STORE_HEADERS`` so caches are told not to persist it.

    Raises:
        HTTPException: 404 when no secret is configured; the frontend
            should call ``POST /api/ai/connect-info/bootstrap`` first.
    """
    from services.config import get_settings

    secret_value = str(get_settings().OPENCLAW_HMAC_SECRET or "").strip()
    if secret_value:
        reveal_body = {
            "ok": True,
            "hmac_secret": secret_value,
            "masked_hmac_secret": _mask_hmac_secret(secret_value),
        }
        return JSONResponse(content=reveal_body, headers=_NO_STORE_HEADERS)

    # Nothing to reveal yet.
    raise HTTPException(
        404,
        "No HMAC secret configured. Call POST /api/ai/connect-info/bootstrap first.",
    )
|
||||
|
||||
|
||||
@router.post("/api/ai/connect-info/regenerate", dependencies=[Depends(require_local_operator)])
@limiter.limit("5/minute")
async def regenerate_hmac_secret(request: Request):
    """Replace the HMAC secret with a freshly generated one.

    The new value is written to ``OPENCLAW_HMAC_SECRET`` and returned once
    in the response body, because the operator who triggered this
    destructive action needs it to update their OpenClaw configuration.
    The response is sent with ``_NO_STORE_HEADERS`` so caches are told not
    to persist it.
    """
    import secrets
    from services.config import get_settings

    fresh_secret = secrets.token_hex(24)  # 24 random bytes -> 48 hex chars
    _write_env_value("OPENCLAW_HMAC_SECRET", fresh_secret)
    # Drop the cached settings so later reads see the new value.
    get_settings.cache_clear()

    regenerate_body = {
        "ok": True,
        "hmac_secret": fresh_secret,
        "masked_hmac_secret": _mask_hmac_secret(fresh_secret),
        "detail": "HMAC secret regenerated. Update your OpenClaw agent configuration.",
    }
    return JSONResponse(content=regenerate_body, headers=_NO_STORE_HEADERS)
|
||||
|
||||
|
||||
@router.put("/api/ai/connect-info/access-tier", dependencies=[Depends(require_local_operator)])
|
||||
@limiter.limit("10/minute")
|
||||
async def set_access_tier(request: Request, body: dict):
|
||||
|
||||
@@ -59,6 +59,12 @@ async def health_check(request: Request):
|
||||
# when the SPKI-pinned fallback is in effect. The data plane keeps
|
||||
# flowing (this is by design — see ais_proxy.js comments) but observers
|
||||
# who care about MITM-protection posture deserve a visible signal.
|
||||
#
|
||||
# Plus connectivity health (added 2026-05-23 when stream.aisstream.io
|
||||
# went fully offline): ``connected`` tells the frontend whether ship
|
||||
# data is actually flowing. When false, a banner explains that ships
|
||||
# are unavailable due to an upstream outage — better than the user
|
||||
# silently seeing an empty ocean and assuming we broke something.
|
||||
ais_status: dict = {}
|
||||
try:
|
||||
from services.ais_stream import ais_proxy_status
|
||||
@@ -69,6 +75,15 @@ async def health_check(request: Request):
|
||||
# Don't override a worse top-level status if SLOs already failed,
|
||||
# but escalate ok -> degraded so the field surfaces in dashboards.
|
||||
top_status = "degraded"
|
||||
# AIS_API_KEY not configured is "feature off", not "system broken" —
|
||||
# so we only escalate when the operator opted into AIS (key set) AND
|
||||
# the stream is currently offline.
|
||||
if (
|
||||
os.environ.get("AIS_API_KEY")
|
||||
and ais_status.get("connected") is False
|
||||
and top_status == "ok"
|
||||
):
|
||||
top_status = "degraded"
|
||||
|
||||
return {
|
||||
"status": top_status,
|
||||
|
||||
@@ -85,6 +85,64 @@ async def infonet_peer_push(request: Request):
|
||||
return {"ok": True, **result}
|
||||
|
||||
|
||||
@router.post("/api/mesh/dm/replicate-envelope")
@limiter.limit("60/minute")
async def dm_replicate_envelope(request: Request):
    """Accept a DM envelope replicated from a peer relay (cross-node mailbox).

    Companion endpoint to ``DMRelay.replicate_to_peers`` (outbound, in
    ``mesh_dm_relay.py``). The sender's relay POSTs an encrypted DM
    envelope here after a successful local ``deposit``; this endpoint
    re-enforces the per-(sender, recipient) anti-spam cap and stores
    the envelope in the local mailbox if accepted.

    The cap is the network rule: a hostile sender's relay can spool
    extras locally, but every honest peer enforces the cap on inbound
    replication. Recipient polling from any honest peer therefore
    never sees more than ``MESH_DM_PENDING_PER_SENDER_LIMIT`` pending
    from any one sender, no matter how many spam attempts were tried.

    Same HMAC auth pattern as ``infonet_peer_push`` and ``gate_peer_push``.

    Responses:
        * 413 — body larger than 64 KB (declared or actual size).
        * 403 — peer HMAC missing or invalid.
        * 400 — body is not valid JSON, or is JSON but not an object.
        * 200 with ``ok: false`` — ``envelope`` is not an object.
        * otherwise whatever ``dm_relay.accept_replica`` returns.
    """
    # DM envelopes are bounded by MESH_DM_MAX_MSG_BYTES + envelope
    # overhead; 64 KB is a generous ceiling.
    max_body_bytes = 65_536

    def _too_large() -> Response:
        return Response(
            content='{"ok":false,"detail":"Request body too large (max 64KB)"}',
            status_code=413, media_type="application/json",
        )

    # Cheap early rejection based on the declared size.
    content_length = request.headers.get("content-length")
    if content_length:
        try:
            if int(content_length) > max_body_bytes:
                return _too_large()
        except (ValueError, TypeError):
            # Unparseable header: fall through to the actual-size check.
            pass
    body_bytes = await request.body()
    # The header can be absent or wrong, so also enforce the ceiling on
    # what was actually received.
    if len(body_bytes) > max_body_bytes:
        return _too_large()
    if not _verify_peer_push_hmac(request, body_bytes):
        return Response(
            content='{"ok":false,"detail":"Invalid or missing peer HMAC"}',
            status_code=403, media_type="application/json",
        )
    try:
        body = json_mod.loads(body_bytes or b"{}")
    except (ValueError, TypeError):
        return Response(
            content='{"ok":false,"detail":"Invalid JSON body"}',
            status_code=400, media_type="application/json",
        )
    # Valid JSON that is a list/string/number has no ``.get``; reject it
    # explicitly instead of failing with AttributeError (HTTP 500).
    if not isinstance(body, dict):
        return Response(
            content='{"ok":false,"detail":"JSON body must be an object"}',
            status_code=400, media_type="application/json",
        )
    envelope = body.get("envelope")
    if not isinstance(envelope, dict):
        return {"ok": False, "detail": "envelope must be an object"}

    originating_peer = _peer_hmac_url_from_request(request) or ""

    from services.mesh.mesh_dm_relay import dm_relay
    result = dm_relay.accept_replica(
        envelope=envelope,
        originating_peer_url=originating_peer,
    )
    return result
|
||||
|
||||
|
||||
@router.post("/api/mesh/gate/peer-push")
|
||||
@limiter.limit("30/minute")
|
||||
async def gate_peer_push(request: Request):
|
||||
|
||||
@@ -350,19 +350,58 @@ _proxy_process = None
|
||||
# path during an upstream cert outage. Surfaced via ais_proxy_status() for
|
||||
# /api/health.
|
||||
_proxy_status: dict = {}
|
||||
# Upstream-connectivity telemetry (added when stream.aisstream.io went fully
|
||||
# offline on 2026-05-23). ``_last_msg_at`` is the unix timestamp of the most
|
||||
# recent vessel message received from the proxy. ``_proxy_spawn_count`` is
|
||||
# how many times we've started the node proxy; combined with no recent
|
||||
# messages it tells us the proxy is respawning in a tight loop because the
|
||||
# upstream is unreachable. Surfaced via ais_proxy_status() so the operator
|
||||
# can see "AIS is dead" instead of guessing whether it's their map filter,
|
||||
# their api key, or upstream.
|
||||
_last_msg_at: float = 0.0
|
||||
_proxy_spawn_count: int = 0
|
||||
_VESSEL_TRAIL_INTERVAL_S = 120
|
||||
_VESSEL_TRAIL_MAX_POINTS = 240
|
||||
|
||||
|
||||
def ais_proxy_status() -> dict:
|
||||
"""Return a copy of the latest ais_proxy.js status (issue #258).
|
||||
# How stale "last vessel message" can be before we consider the stream
|
||||
# disconnected. AISStream typically pushes multiple messages/sec, so a 60s
|
||||
# gap means something's wrong upstream or in transit.
|
||||
_AIS_CONNECTED_FRESHNESS_S = 60
|
||||
|
||||
Currently surfaces ``degraded_tls`` (bool) which is true when the
|
||||
proxy is using SPKI-pinned fallback because AISStream's cert expired.
|
||||
Returns an empty dict when no status has been received yet.
|
||||
|
||||
def ais_proxy_status() -> dict:
    """Return a copy of the latest ais_proxy.js status + connectivity health.

    Fields:
    * ``degraded_tls`` (bool, issue #258) — true when the proxy is using
      SPKI-pinned fallback because AISStream's cert expired. Present only
      if the proxy has reported it.
    * ``connected`` (bool) — true when we received a vessel message in
      the last ``_AIS_CONNECTED_FRESHNESS_S`` seconds.
    * ``last_msg_age_seconds`` (int | None) — seconds since the last
      vessel message; None if we've never received one.
    * ``proxy_spawn_count`` (int) — how many times we've spawned the
      node proxy. Sustained increases here without ``connected`` means
      we're respawning in a tight loop because upstream is dead.

    ``connected``, ``last_msg_age_seconds`` and ``proxy_spawn_count`` are
    always present, so before any vessel message has arrived the result is
    ``connected: False`` with ``last_msg_age_seconds: None`` rather than an
    empty dict.
    """
    # Take a consistent snapshot under the lock, then compute outside it.
    with _vessels_lock:
        status = dict(_proxy_status)
        last = _last_msg_at
        spawns = _proxy_spawn_count

    now = time.time()
    if last > 0:
        # Clamp at zero so a backwards wall-clock step never reports a
        # negative age.
        last_age = max(0, int(now - last))
        status["last_msg_age_seconds"] = last_age
        status["connected"] = last_age <= _AIS_CONNECTED_FRESHNESS_S
    else:
        status["last_msg_age_seconds"] = None
        status["connected"] = False
    status["proxy_spawn_count"] = spawns
    return status
|
||||
|
||||
import os
|
||||
|
||||
@@ -588,8 +627,10 @@ def _ais_stream_loop():
|
||||
env=proxy_env,
|
||||
**popen_kwargs,
|
||||
)
|
||||
global _proxy_spawn_count
|
||||
with _vessels_lock:
|
||||
_proxy_process = process
|
||||
_proxy_spawn_count += 1
|
||||
|
||||
# Drain stderr in a background thread to prevent deadlock
|
||||
import threading
|
||||
@@ -645,9 +686,15 @@ def _ais_stream_loop():
|
||||
if not mmsi:
|
||||
continue
|
||||
|
||||
# Telemetry: stamp the timestamp of the most recent real
|
||||
# vessel message. ais_proxy_status() reads this to decide
|
||||
# whether the stream is currently "connected" — i.e. has
|
||||
# any data flowed in the last 60s.
|
||||
global _last_msg_at
|
||||
with _vessels_lock:
|
||||
_last_msg_at = time.time()
|
||||
if mmsi not in _vessels:
|
||||
_vessels[mmsi] = {"_updated": time.time()}
|
||||
_vessels[mmsi] = {"_updated": _last_msg_at}
|
||||
vessel = _vessels[mmsi]
|
||||
|
||||
# Update position from PositionReport or StandardClassBPositionReport
|
||||
|
||||
@@ -116,6 +116,21 @@ class Settings(BaseSettings):
|
||||
MESH_DM_REQUEST_MAILBOX_LIMIT: int = 12
|
||||
MESH_DM_SHARED_MAILBOX_LIMIT: int = 48
|
||||
MESH_DM_SELF_MAILBOX_LIMIT: int = 12
|
||||
# Anti-spam: cap on distinct UNACKED messages a single sender can have
|
||||
# parked in a single recipient's mailbox at any one time. Once the
|
||||
# recipient pulls (acks) a message, the sender's quota for that pair
|
||||
# frees up. Default 2 — a sender who wants to deliver more must wait
|
||||
# for the recipient to actually read the prior messages.
|
||||
#
|
||||
# This cap is enforced TWICE: once on the local deposit path (the
|
||||
# sender's own node refuses to spool the 3rd message) AND once on
|
||||
# the replication-acceptance path (honest peer relays refuse to
|
||||
# accept inbound replicas that would put them over the cap). The
|
||||
# double enforcement makes the rule a NETWORK rule — patching out
|
||||
# the local check on a hostile sender's relay doesn't let extras
|
||||
# propagate, because every honest peer enforces the same cap on
|
||||
# inbound replication.
|
||||
MESH_DM_PENDING_PER_SENDER_LIMIT: int = 2
|
||||
MESH_BLOCK_LEGACY_AGENT_ID_LOOKUP: bool = True
|
||||
MESH_ALLOW_COMPAT_DM_INVITE_IMPORT: bool = False
|
||||
MESH_ALLOW_COMPAT_DM_INVITE_IMPORT_UNTIL: str = ""
|
||||
|
||||
@@ -777,6 +777,26 @@ def start_scheduler():
|
||||
misfire_grace_time=60,
|
||||
)
|
||||
|
||||
# AISHub REST fallback — slow polling when the AISStream WebSocket
|
||||
# primary is offline. Configurable interval via
|
||||
# AISHUB_POLL_INTERVAL_MINUTES env (default 20 min). Operator must
|
||||
# set AISHUB_USERNAME to opt in. The fetcher is gated internally on
|
||||
# the primary being disconnected, so this job is cheap when the
|
||||
# WebSocket is healthy (early-returns after a status check).
|
||||
from services.fetchers.aishub_fallback import (
|
||||
aishub_poll_interval_minutes,
|
||||
fetch_aishub_vessels,
|
||||
)
|
||||
_aishub_interval = aishub_poll_interval_minutes()
|
||||
_scheduler.add_job(
|
||||
lambda: _run_task_with_health(fetch_aishub_vessels, "fetch_aishub_vessels"),
|
||||
"interval",
|
||||
minutes=_aishub_interval,
|
||||
id="aishub_fallback",
|
||||
max_instances=1,
|
||||
misfire_grace_time=120,
|
||||
)
|
||||
|
||||
# Route database — bulk refresh from vrs-standing-data.adsb.lol every 5
|
||||
# days. Replaces the legacy /api/0/routeset POST (blocked under our UA,
|
||||
# and broken upstream). Airline schedules change on a quarterly cycle,
|
||||
|
||||
@@ -0,0 +1,290 @@
|
||||
"""AISHub REST fallback for ship tracking when AISStream is unreachable.
|
||||
|
||||
Background
|
||||
----------
|
||||
On 2026-05-23 ``stream.aisstream.io`` (the primary live AIS WebSocket feed)
|
||||
went fully offline. Backend's only ship signal vanished. This module polls
|
||||
``data.aishub.net``'s free REST API on a slow cadence (default 20 min) when
|
||||
the WebSocket primary is disconnected, so the ships layer doesn't go fully
|
||||
dark during upstream outages.
|
||||
|
||||
Why 20 minutes
|
||||
--------------
|
||||
AISHub's free tier is rate-limited and explicitly asks consumers to be
|
||||
courteous. 20 minutes is well inside their limits, gives ships time to
|
||||
move enough to look "alive" on the map, and won't drain their service.
|
||||
Configurable via the ``AISHUB_POLL_INTERVAL_MINUTES`` env var (clamped to
|
||||
[1, 360]).
|
||||
|
||||
Why slow vs primary
|
||||
-------------------
|
||||
This is degraded mode, not a replacement. A ship at 20 knots moves about
|
||||
6 nautical miles in 20 minutes — visible on the map but coarser than the
|
||||
real-time WebSocket signal. When AISStream comes back online, the
|
||||
WebSocket data will overwrite these records via the same ``_vessels``
|
||||
dict and ``source`` will flip from ``"aishub"`` back to upstream-live.
|
||||
|
||||
Opt-in
|
||||
------
|
||||
Operator must set ``AISHUB_USERNAME`` (free registration at
|
||||
https://www.aishub.net/api). If unset, this fetcher is a no-op.
|
||||
"""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import json
|
||||
import logging
|
||||
import os
|
||||
import time
|
||||
from typing import Any
|
||||
|
||||
from services.network_utils import fetch_with_curl
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
AISHUB_URL = "https://data.aishub.net/ws.php"
|
||||
|
||||
|
||||
def aishub_username() -> str:
    """Return the ``AISHUB_USERNAME`` env var, stripped ("" when unset)."""
    configured = os.environ.get("AISHUB_USERNAME", "")
    return str(configured).strip()
|
||||
|
||||
|
||||
def aishub_fallback_enabled() -> bool:
    """Report whether the AISHub fallback is opted in.

    The opt-in is simply a non-empty ``AISHUB_USERNAME`` (set after the
    operator registers with AISHub).
    """
    return aishub_username() != ""
|
||||
|
||||
|
||||
def aishub_poll_interval_minutes() -> int:
    """Return the AISHub poll interval in minutes.

    Read from ``AISHUB_POLL_INTERVAL_MINUTES``; defaults to 20 when the
    variable is unset or not an integer. The result is clamped to
    [1, 360] so a hostile or misconfigured value can neither hammer the
    upstream nor push polls more than six hours apart.
    """
    text = str(os.environ.get("AISHUB_POLL_INTERVAL_MINUTES", "20")).strip()
    try:
        minutes = int(text)
    except (TypeError, ValueError):
        return 20
    if minutes < 1:
        return 1
    if minutes > 360:
        return 360
    return minutes
|
||||
|
||||
|
||||
def _should_run_fallback() -> bool:
    """Decide whether this poll cycle should hit AISHub.

    Avoids stomping over fresher live data when AISStream is healthy.

    Returns False if:
      * AISHub isn't configured (no username), or
      * ``ais_proxy_status()`` reports ``connected`` as exactly ``True``
        (recent vessel messages from the WebSocket primary).

    Returns True in every other case, including:
      * the primary is configured but currently down,
      * the primary reports ``connected: False`` for any other reason
        (NOTE(review): presumably this covers the case where
        AISHUB_USERNAME is set without AIS_API_KEY, so AISHub then runs
        on its own slow cadence — confirm against ais_stream), and
      * ``ais_stream`` can't be imported or its status call raises.

    There is no ``proxy_spawn_count`` guard here; the decision rests
    solely on the ``connected`` flag.
    """
    if not aishub_fallback_enabled():
        return False
    try:
        from services.ais_stream import ais_proxy_status
        status = ais_proxy_status() or {}
    except Exception:
        # Deliberate best-effort: if the primary's status can't be read,
        # still try AISHub rather than leaving the ships layer dark.
        return True
    # If the WebSocket primary is connected, skip the fallback — fresher
    # data is already flowing.
    if status.get("connected") is True:
        return False
    return True
|
||||
|
||||
|
||||
def _parse_aishub_response(payload: str) -> list[dict]:
|
||||
"""Parse the AISHub JSON response into a list of vessel records.
|
||||
|
||||
Successful response shape::
|
||||
|
||||
[
|
||||
{"ERROR": false, "USERNAME": "...", "FORMAT": "1", "RECORDS": N},
|
||||
[{"MMSI": ..., "LATITUDE": ..., "LONGITUDE": ..., ...}, ...]
|
||||
]
|
||||
|
||||
Error response shape::
|
||||
|
||||
[{"ERROR": true, "ERROR_MESSAGE": "..."}]
|
||||
|
||||
Empty payload (e.g. silent rate-limit drop) returns ``[]``.
|
||||
"""
|
||||
if not payload or not payload.strip():
|
||||
return []
|
||||
try:
|
||||
data = json.loads(payload)
|
||||
except json.JSONDecodeError as e:
|
||||
logger.warning("AISHub: response is not JSON: %s", e)
|
||||
return []
|
||||
if not isinstance(data, list) or not data:
|
||||
return []
|
||||
header = data[0] if isinstance(data[0], dict) else {}
|
||||
if header.get("ERROR") is True:
|
||||
logger.warning(
|
||||
"AISHub: upstream error: %s",
|
||||
header.get("ERROR_MESSAGE", "<unspecified>"),
|
||||
)
|
||||
return []
|
||||
if len(data) < 2 or not isinstance(data[1], list):
|
||||
return []
|
||||
return [row for row in data[1] if isinstance(row, dict)]
|
||||
|
||||
|
||||
def _normalize_record(row: dict) -> dict | None:
|
||||
"""Map an AISHub vessel record to our internal vessel schema.
|
||||
|
||||
Returns None when the record can't be used (no MMSI, bad position,
|
||||
sentinel "not available" lat/lng).
|
||||
"""
|
||||
try:
|
||||
mmsi = int(row.get("MMSI") or 0)
|
||||
except (TypeError, ValueError):
|
||||
return None
|
||||
if not mmsi:
|
||||
return None
|
||||
try:
|
||||
lat = float(row.get("LATITUDE"))
|
||||
lng = float(row.get("LONGITUDE"))
|
||||
except (TypeError, ValueError):
|
||||
return None
|
||||
# AIS uses 91/181 as "no position available" sentinels.
|
||||
if abs(lat) > 90 or abs(lng) > 180:
|
||||
return None
|
||||
if lat == 91.0 or lng == 181.0:
|
||||
return None
|
||||
# SOG raw 102.3 is "speed not available"; sanitize to 0.
|
||||
try:
|
||||
sog_raw = float(row.get("SOG") or 0)
|
||||
except (TypeError, ValueError):
|
||||
sog_raw = 0.0
|
||||
sog = 0.0 if sog_raw >= 102.2 else sog_raw
|
||||
try:
|
||||
cog = float(row.get("COG") or 0)
|
||||
except (TypeError, ValueError):
|
||||
cog = 0.0
|
||||
try:
|
||||
heading_raw = int(row.get("HEADING") or 511)
|
||||
except (TypeError, ValueError):
|
||||
heading_raw = 511
|
||||
# AIS heading sentinel 511 = "not available" — fall back to COG.
|
||||
heading = heading_raw if heading_raw != 511 else cog
|
||||
try:
|
||||
ais_type = int(row.get("TYPE") or 0)
|
||||
except (TypeError, ValueError):
|
||||
ais_type = 0
|
||||
return {
|
||||
"mmsi": mmsi,
|
||||
"lat": lat,
|
||||
"lng": lng,
|
||||
"sog": sog,
|
||||
"cog": cog,
|
||||
"heading": heading,
|
||||
"name": str(row.get("NAME") or "").strip() or "UNKNOWN",
|
||||
"callsign": str(row.get("CALLSIGN") or "").strip(),
|
||||
"destination": str(row.get("DEST") or "").strip().replace("@", "") or "",
|
||||
"imo": int(row.get("IMO") or 0),
|
||||
"ais_type_code": ais_type,
|
||||
}
|
||||
|
||||
|
||||
def fetch_aishub_vessels() -> int:
    """Poll AISHub and merge vessels into the shared ``_vessels`` store.

    Returns the number of vessels updated (0 on skip, error, or no data).
    Designed to be called by the APScheduler tier — see
    ``data_fetcher.py`` for the 20-minute interval job that wraps this.

    Fetch errors and non-200 responses are logged and reported as 0; this
    function does not raise for upstream failures.
    """
    # Local import: keeps the query-string encoding self-contained.
    from urllib.parse import urlencode

    if not _should_run_fallback():
        logger.debug("AISHub fallback skipped: primary connected or not configured")
        return 0

    # Percent-encode the username: characters such as '&', '#', '+' or a
    # space would otherwise corrupt the query string.
    query = urlencode(
        {
            "username": aishub_username(),
            "format": 1,
            "output": "json",
            "compress": 0,
        }
    )
    url = f"{AISHUB_URL}?{query}"

    try:
        response = fetch_with_curl(url, timeout=30)
    except Exception as e:
        logger.warning("AISHub fetch failed: %s", e)
        return 0

    if not response or response.status_code != 200:
        logger.warning(
            "AISHub HTTP %s",
            getattr(response, "status_code", "None"),
        )
        return 0

    rows = _parse_aishub_response(getattr(response, "text", "") or "")
    if not rows:
        return 0

    # Normalise before taking the lock so parsing work doesn't extend the
    # time the shared store is held. One malformed record must not abort
    # the merge of all the others.
    usable = []
    for row in rows:
        try:
            normalized = _normalize_record(row)
        except (TypeError, ValueError, OverflowError):
            continue
        if normalized is not None:
            usable.append(normalized)
    if not usable:
        return 0

    # Inline imports to avoid a circular dependency at module load time
    # (ais_stream imports lots of things and is loaded by main.py).
    from services.ais_stream import (
        _vessels,
        _vessels_lock,
        _record_vessel_trail_locked,
        classify_vessel,
        get_country_from_mmsi,
    )

    now = time.time()
    count = 0
    with _vessels_lock:
        for normalized in usable:
            mmsi = normalized["mmsi"]
            vessel = _vessels.setdefault(mmsi, {"mmsi": mmsi})
            # Don't overwrite fresher live data: if the WebSocket pushed an
            # update for this MMSI more recently than now-1s (race during
            # the brief reconnection window) keep the live one.
            last = float(vessel.get("_updated") or 0)
            if last > now - 1:
                continue
            vessel.update(
                {
                    "lat": normalized["lat"],
                    "lng": normalized["lng"],
                    "sog": normalized["sog"],
                    "cog": normalized["cog"],
                    "heading": normalized["heading"],
                    "_updated": now,
                    "source": "aishub",
                }
            )
            # Static fields only overwrite existing values when AISHub
            # actually supplied something meaningful.
            if normalized["name"] and normalized["name"] != "UNKNOWN":
                vessel["name"] = normalized["name"]
            if normalized["callsign"]:
                vessel["callsign"] = normalized["callsign"]
            if normalized["destination"]:
                vessel["destination"] = normalized["destination"]
            if normalized["imo"]:
                vessel["imo"] = normalized["imo"]
            if normalized["ais_type_code"]:
                vessel["ais_type_code"] = normalized["ais_type_code"]
                vessel["type"] = classify_vessel(normalized["ais_type_code"], mmsi)
            if not vessel.get("country"):
                vessel["country"] = get_country_from_mmsi(mmsi)
            _record_vessel_trail_locked(
                mmsi,
                normalized["lat"],
                normalized["lng"],
                normalized["sog"],
                now,
            )
            count += 1

    if count:
        logger.info(
            "AISHub fallback: merged %d vessels (poll interval %d min)",
            count,
            aishub_poll_interval_minutes(),
        )
    return count
|
||||
@@ -317,6 +317,39 @@ class DMRelay:
|
||||
def _self_mailbox_limit(self) -> int:
|
||||
return max(1, int(self._settings().MESH_DM_SELF_MAILBOX_LIMIT))
|
||||
|
||||
def _per_sender_pending_limit(self) -> int:
|
||||
"""Anti-spam cap on UNACKED messages a single sender can have parked
|
||||
in a single recipient mailbox at any one time. See ``config.py``
|
||||
``MESH_DM_PENDING_PER_SENDER_LIMIT`` for the threat model — this
|
||||
rule is enforced both at ``deposit`` (local) and at
|
||||
``accept_replica`` (peer push acceptance), making it a network
|
||||
rule rather than a client-side honor system."""
|
||||
try:
|
||||
limit = int(getattr(self._settings(), "MESH_DM_PENDING_PER_SENDER_LIMIT", 2) or 2)
|
||||
except (TypeError, ValueError):
|
||||
limit = 2
|
||||
return max(1, limit)
|
||||
|
||||
def _per_sender_pending_count(
|
||||
self,
|
||||
*,
|
||||
mailbox_key: str,
|
||||
sender_block_ref: str,
|
||||
) -> int:
|
||||
"""Count UNACKED messages from ``sender_block_ref`` currently parked
|
||||
in ``mailbox_key``. Caller already holds ``self._lock``.
|
||||
|
||||
Messages that have been claimed/acked are removed from the mailbox
|
||||
list (see ``claim_message_ids``), so anything still here is by
|
||||
definition unacked. We count by exact ``sender_block_ref`` match
|
||||
— that's the per-pair sender identity used for blocking too, so
|
||||
the cap is naturally per-(sender, recipient).
|
||||
"""
|
||||
if not mailbox_key or not sender_block_ref:
|
||||
return 0
|
||||
messages = self._mailboxes.get(mailbox_key, [])
|
||||
return sum(1 for m in messages if m.sender_block_ref == sender_block_ref)
|
||||
|
||||
def _nonce_ttl_seconds(self) -> int:
|
||||
return max(30, int(self._settings().MESH_DM_NONCE_TTL_S))
|
||||
|
||||
@@ -1515,6 +1548,29 @@ class DMRelay:
|
||||
if len(self._mailboxes[mailbox_key]) >= self._mailbox_limit_for_class(delivery_class):
|
||||
metrics_inc("dm_drop_full")
|
||||
return {"ok": False, "detail": "Recipient mailbox full"}
|
||||
# Anti-spam: per-(sender, recipient) cap on unacked messages.
|
||||
# A sender who already has the configured number of messages
|
||||
# parked in this mailbox can't deposit more until the recipient
|
||||
# pulls (acks) at least one. The same cap is re-enforced on
|
||||
# inbound replication in ``accept_replica`` so this rule isn't
|
||||
# bypassable by patching out the local check on a hostile
|
||||
# sender's relay — see config.py
|
||||
# MESH_DM_PENDING_PER_SENDER_LIMIT for the threat model.
|
||||
per_sender_limit = self._per_sender_pending_limit()
|
||||
pending = self._per_sender_pending_count(
|
||||
mailbox_key=mailbox_key,
|
||||
sender_block_ref=sender_block_ref,
|
||||
)
|
||||
if pending >= per_sender_limit:
|
||||
metrics_inc("dm_drop_per_sender_cap")
|
||||
return {
|
||||
"ok": False,
|
||||
"detail": (
|
||||
f"Recipient already has {pending} unread message"
|
||||
f"{'s' if pending != 1 else ''} from you. Wait for "
|
||||
"them to read your messages before sending more."
|
||||
),
|
||||
}
|
||||
if not msg_id:
|
||||
msg_id = f"dm_{int(time.time() * 1000)}_{secrets.token_hex(6)}"
|
||||
elif any(m.msg_id == msg_id for m in self._mailboxes[mailbox_key]):
|
||||
@@ -1539,8 +1595,245 @@ class DMRelay:
|
||||
)
|
||||
self._stats["messages_in_memory"] = sum(len(v) for v in self._mailboxes.values())
|
||||
self._save()
|
||||
# Cross-node mailbox replication: push the freshly-stored
|
||||
# envelope to every authenticated relay peer so the recipient
|
||||
# can log into ANY node and find their messages. The push is
|
||||
# async (fire-and-forget thread) so deposit() returns
|
||||
# immediately — slow Tor peers can't block the sender's UX.
|
||||
# Each receiving peer re-enforces the per-sender cap on
|
||||
# acceptance, so hostile relays can't widen the cap.
|
||||
try:
|
||||
envelope_for_push = self.envelope_for_replication(
|
||||
mailbox_key=mailbox_key, msg_id=msg_id,
|
||||
)
|
||||
if envelope_for_push:
|
||||
self._replicate_envelope_to_peers_async(
|
||||
envelope=envelope_for_push,
|
||||
)
|
||||
except Exception:
|
||||
metrics_inc("dm_replication_push_error")
|
||||
return {"ok": True, "msg_id": msg_id}
|
||||
|
||||
def accept_replica(
    self,
    *,
    envelope: dict[str, Any],
    originating_peer_url: str = "",
) -> dict[str, Any]:
    """Receive a DM envelope replicated from a peer relay.

    Cross-node mailbox replication entry point. When a sender's local
    relay accepts a ``deposit`` and pushes the envelope to its relay
    peers (so the recipient can log into any peer node and find their
    messages), each receiving peer calls ``accept_replica`` to ingest it.

    The per-(sender, recipient) cap is re-enforced HERE. That's what
    makes the rule a NETWORK rule rather than a client-side honor
    system: a hostile sender who patches out the local ``deposit``
    check still can't get extra unacked messages to spread, because
    every honest peer enforces the same cap on inbound replicas.

    Args:
        envelope: Wire-form message. ``msg_id``, ``mailbox_key``,
            ``sender_block_ref`` and ``ciphertext`` are required;
            ``timestamp`` (defaults to now when absent/falsy),
            ``delivery_class``, ``sender_id``, ``sender_seal``,
            ``relay_salt``, ``payload_format`` and ``session_welcome``
            are optional.
        originating_peer_url: Accepted for interface compatibility; not
            read by this method.

    Returns:
        The same shape as ``deposit``: ``{"ok": True, "msg_id": ...}``
        on acceptance (plus ``"duplicate": True`` when the message is
        already held), or ``{"ok": False, "detail": ...}`` on rejection.
        A per-sender cap rejection additionally carries
        ``cap_violation``, ``pending`` and ``limit``.
    """
    import math

    if not isinstance(envelope, dict):
        return {"ok": False, "detail": "envelope must be an object"}
    msg_id = str(envelope.get("msg_id", "") or "").strip()
    mailbox_key = str(envelope.get("mailbox_key", "") or "").strip()
    sender_block_ref = str(envelope.get("sender_block_ref", "") or "").strip()
    ciphertext = str(envelope.get("ciphertext", "") or "")
    if not msg_id or not mailbox_key or not sender_block_ref or not ciphertext:
        return {"ok": False, "detail": "envelope missing required fields"}

    # The envelope comes from another node, so the timestamp is untrusted:
    # a non-numeric or non-finite value must produce a structured
    # rejection rather than an exception (or a NaN stored in the mailbox).
    # NOTE(review): a far-future timestamp is still accepted as-is; whether
    # it should be clamped depends on how expiry uses it — confirm.
    raw_timestamp = envelope.get("timestamp")
    try:
        timestamp = float(raw_timestamp) if raw_timestamp else time.time()
    except (TypeError, ValueError):
        return {"ok": False, "detail": "envelope timestamp is invalid"}
    if not math.isfinite(timestamp):
        return {"ok": False, "detail": "envelope timestamp is invalid"}

    with self._lock:
        self._refresh_from_shared_relay()
        self._cleanup_expired()

        # Idempotent — if we already hold this exact msg_id, the
        # replication round-tripped or a peer pushed the same
        # envelope through multiple paths. Accept silently.
        if any(m.msg_id == msg_id for m in self._mailboxes.get(mailbox_key, [])):
            metrics_inc("dm_replica_duplicate")
            return {"ok": True, "msg_id": msg_id, "duplicate": True}

        # Same per-class cap as the deposit path — defense in depth
        # against a peer that wraps a "deposit" as a "replica" to
        # bypass the class limit.
        delivery_class = str(envelope.get("delivery_class", "") or "")
        if delivery_class in ("request", "shared", "self"):
            class_limit = self._mailbox_limit_for_class(delivery_class)
        else:
            class_limit = self._shared_mailbox_limit()
        if len(self._mailboxes.get(mailbox_key, [])) >= class_limit:
            metrics_inc("dm_replica_drop_full")
            return {"ok": False, "detail": "Recipient mailbox full"}

        # THE network rule: per-(sender, recipient) anti-spam cap.
        per_sender_limit = self._per_sender_pending_limit()
        pending = self._per_sender_pending_count(
            mailbox_key=mailbox_key,
            sender_block_ref=sender_block_ref,
        )
        if pending >= per_sender_limit:
            metrics_inc("dm_replica_drop_per_sender_cap")
            # Structured rejection so the pushing relay can tell a cap
            # refusal apart from a transport failure.
            return {
                "ok": False,
                "detail": (
                    "Per-sender cap reached on this relay; refusing replica"
                ),
                "cap_violation": True,
                "pending": pending,
                "limit": per_sender_limit,
            }

        # Accept the replica into the local mailbox. ``setdefault`` works
        # whether or not the mailbox mapping auto-creates missing keys.
        self._mailboxes.setdefault(mailbox_key, []).append(
            DMMessage(
                sender_id=str(envelope.get("sender_id", "") or ""),
                ciphertext=ciphertext,
                timestamp=timestamp,
                msg_id=msg_id,
                delivery_class=str(envelope.get("delivery_class", "shared") or "shared"),
                sender_seal=str(envelope.get("sender_seal", "") or ""),
                relay_salt=str(envelope.get("relay_salt", "") or ""),
                sender_block_ref=sender_block_ref,
                payload_format=str(envelope.get("payload_format", "dm1") or "dm1"),
                session_welcome=str(envelope.get("session_welcome", "") or ""),
            )
        )
        self._stats["messages_in_memory"] = sum(len(v) for v in self._mailboxes.values())
        self._save()
        metrics_inc("dm_replica_accepted")
        return {"ok": True, "msg_id": msg_id}
|
||||
|
||||
def _replicate_envelope_to_peers_async(
    self,
    *,
    envelope: dict[str, Any],
) -> None:
    """Push an outbound DM envelope to every authenticated relay peer.

    Fire-and-forget: the push runs in a daemon thread so ``deposit``
    returns to the caller immediately. Per-peer errors are counted in
    metrics and swallowed — the sender's UX must not block on slow
    peers. Nothing is retried here: a peer that is down simply misses
    this envelope.

    When a key is resolved for a peer, the request carries ``X-Peer-Url``
    and an ``X-Peer-HMAC`` (HMAC-SHA256 of the exact request body).

    NOTE(review): when no key resolves, the push is still sent without
    those headers — confirm the receiving endpoint rejects it.
    NOTE(review): no SOCKS proxy is configured on this request; if
    ``.onion`` peers can appear in the peer list, confirm how they are
    meant to be reached from here.

    Args:
        envelope: Wire-form message as produced by
            ``envelope_for_replication``; sent as ``{"envelope": ...}``.
    """
    import threading

    def _do_push():
        try:
            import hashlib
            import hmac
            import requests as _requests

            from services.mesh.mesh_crypto import (
                normalize_peer_url,
                resolve_peer_key_for_url,
            )
            from services.mesh.mesh_router import (
                authenticated_push_peer_urls,
            )

            peers = authenticated_push_peer_urls()
            if not peers:
                return

            # Serialised once: the HMAC must cover the exact bytes sent.
            payload = json.dumps(
                {"envelope": envelope},
                separators=(",", ":"),
                ensure_ascii=False,
            ).encode("utf-8")

            timeout = max(
                1,
                int(getattr(self._settings(), "MESH_RELAY_PUSH_TIMEOUT_S", 10) or 10),
            )

            for peer_url in peers:
                try:
                    normalized = normalize_peer_url(peer_url)
                    headers = {"Content-Type": "application/json"}
                    peer_key = resolve_peer_key_for_url(normalized)
                    if peer_key:
                        headers["X-Peer-Url"] = normalized
                        headers["X-Peer-HMAC"] = hmac.new(
                            peer_key, payload, hashlib.sha256
                        ).hexdigest()
                    # Build the target from the same normalised URL the
                    # HMAC identity uses, so a trailing slash or case
                    # difference in the configured peer URL can't send the
                    # request somewhere the signature doesn't describe.
                    base_url = str(normalized or peer_url).rstrip("/")
                    url = f"{base_url}/api/mesh/dm/replicate-envelope"
                    resp = _requests.post(
                        url, data=payload, timeout=timeout, headers=headers,
                    )
                    if resp.status_code == 200:
                        metrics_inc("dm_replication_push_ok")
                    else:
                        # Any non-200 status is counted as a rejection.
                        # The response body is not inspected and no retry
                        # state is kept for this msg_id.
                        metrics_inc("dm_replication_push_rejected")
                except Exception:
                    # Per-peer failure is non-fatal — count it and move
                    # on so the remaining peers still get the envelope.
                    metrics_inc("dm_replication_push_error")
                    continue
        except Exception:
            # Outer guard — never let replication errors escape the
            # background thread.
            metrics_inc("dm_replication_push_error")

    thread = threading.Thread(
        target=_do_push,
        name="dm-replicate-push",
        daemon=True,
    )
    thread.start()
|
||||
|
||||
def envelope_for_replication(
    self,
    *,
    mailbox_key: str,
    msg_id: str,
) -> dict[str, Any] | None:
    """Build the wire-form envelope for one stored message.

    Looks up ``msg_id`` in ``mailbox_key`` under ``self._lock`` and copies
    the message fields into a plain dict suitable for POSTing to a peer
    relay's replicate-envelope endpoint.

    Returns ``None`` when no message with that id is in the mailbox.
    Transport security (and keeping the envelope off clearnet peers when
    private transport is required) is left to the caller.
    """
    with self._lock:
        stored = next(
            (msg for msg in self._mailboxes.get(mailbox_key, []) if msg.msg_id == msg_id),
            None,
        )
        if stored is None:
            return None
        return {
            "msg_id": stored.msg_id,
            "mailbox_key": mailbox_key,
            "sender_id": stored.sender_id,
            "sender_block_ref": stored.sender_block_ref,
            "sender_seal": stored.sender_seal,
            "ciphertext": stored.ciphertext,
            "timestamp": stored.timestamp,
            "delivery_class": stored.delivery_class,
            "relay_salt": stored.relay_salt,
            "payload_format": stored.payload_format,
            "session_welcome": stored.session_welcome,
        }
|
||||
|
||||
def is_blocked(self, recipient_id: str, sender_id: str) -> bool:
|
||||
with self._lock:
|
||||
self._refresh_from_shared_relay()
|
||||
|
||||
@@ -2,10 +2,64 @@ from __future__ import annotations
|
||||
|
||||
import time
|
||||
from dataclasses import asdict, dataclass
|
||||
from email.utils import parsedate_to_datetime
|
||||
from datetime import timezone
|
||||
|
||||
from services.mesh.mesh_peer_store import PeerRecord
|
||||
|
||||
|
||||
class PeerSyncRateLimited(Exception):
    """Raised when an upstream peer answers HTTP 429 (Too Many Requests).

    The exception carries the wait the peer asked for so the caller can
    hand it to ``finish_sync(retry_after_s=...)`` rather than retrying on
    the fixed short interval and keeping the peer's rate-limit bucket full.

    Attributes set by ``__init__``:

    * ``retry_after_s`` — requested wait in whole seconds, never negative.
      0 means the peer gave no usable ``Retry-After``; the caller should
      still apply its own exponential backoff.
    * ``status`` — HTTP status code; a falsy value is stored as 429.
    """

    def __init__(self, message: str, retry_after_s: int = 0, status: int = 429):
        super().__init__(message)
        seconds = int(retry_after_s or 0)
        self.retry_after_s = seconds if seconds > 0 else 0
        self.status = int(status or 429)
|
||||
|
||||
|
||||
def parse_retry_after_header(header_value: str, *, now: float | None = None) -> int:
    """Parse the ``Retry-After`` HTTP header.

    Two valid forms per RFC 7231 §7.1.3:

    * Delay-seconds: a non-negative integer (e.g. ``Retry-After: 120``)
    * HTTP-date: an absolute time (e.g. ``Retry-After: Wed, 21 Oct 2026 07:28:00 GMT``)

    Args:
        header_value: Raw header text; ``None``/empty is treated as absent.
        now: Reference time as a Unix timestamp for the HTTP-date form.
            Defaults to ``time.time()``.

    Returns:
        The wait in **seconds from now**. Unparseable / empty headers
        return 0 (caller falls back to exponential backoff). Clamped to
        [0, 3600] so a typo'd or hostile peer can't pin us silent for days.

    Never raises for a malformed header value.
    """
    value = str(header_value or "").strip()
    if not value:
        return 0
    upper_bound = 3600  # never trust a peer to silence us > 1h
    # Form 1: pure integer seconds. RFC digits are ASCII only;
    # ``str.isdigit`` alone also accepts characters such as "²" that
    # ``int()`` then refuses, so require ASCII explicitly.
    if value.isascii() and value.isdigit():
        significant = value.lstrip("0")
        # More significant digits than the bound has means the value is
        # over the bound. Deciding by length avoids converting an
        # arbitrarily long, peer-controlled digit string (which can raise
        # on interpreters that limit int/str conversion size).
        if len(significant) > len(str(upper_bound)):
            return upper_bound
        return min(int(significant or "0"), upper_bound)
    # Form 2: HTTP-date.
    try:
        target = parsedate_to_datetime(value)
        if target is None:
            return 0
        if target.tzinfo is None:
            # A date without zone information is interpreted as UTC.
            target = target.replace(tzinfo=timezone.utc)
        current = float(now if now is not None else time.time())
        delta = int(target.timestamp() - current)
        return min(max(0, delta), upper_bound)
    except (TypeError, ValueError, OverflowError):
        return 0
|
||||
|
||||
|
||||
@dataclass(frozen=True)
|
||||
class SyncWorkerState:
|
||||
last_sync_started_at: int = 0
|
||||
@@ -72,6 +126,59 @@ def begin_sync(
|
||||
)
|
||||
|
||||
|
||||
def _failure_backoff_seconds(
|
||||
*,
|
||||
base_backoff_s: int,
|
||||
consecutive_failures: int,
|
||||
retry_after_s: int,
|
||||
cap_s: int = 1800,
|
||||
) -> int:
|
||||
"""Compute the next-attempt delay after a failed sync.
|
||||
|
||||
Two inputs combine:
|
||||
|
||||
* ``retry_after_s`` — when an upstream peer answered HTTP 429
|
||||
with a ``Retry-After`` header, we honor it exactly. Continuing
|
||||
to hammer the upstream every 60s is the bug this fix exists to
|
||||
close: it keeps the upstream's rate-limit bucket full
|
||||
indefinitely and no sync ever lands.
|
||||
|
||||
* Exponential growth on ``consecutive_failures`` — even without an
|
||||
explicit Retry-After, repeated failures should slow us down. The
|
||||
first failure waits ``base`` (preserves pre-fix behavior for
|
||||
one-off blips). Each subsequent failure doubles the wait, capped
|
||||
to ``cap_s`` (default 30 minutes). With base=60 and cap=1800,
|
||||
the schedule is 60s → 120s → 240s → 480s → 960s → 1800s →
|
||||
1800s → … .
|
||||
|
||||
The actual delay is the MAX of the two — whichever asks for more
|
||||
patience wins. ``retry_after_s == 0`` (no header) falls back to
|
||||
pure exponential. An aggressive ``Retry-After`` (say 600s while
|
||||
we're only at 1 failure) wins over the exponential ladder.
|
||||
"""
|
||||
base = max(0, int(base_backoff_s or 0))
|
||||
failures = max(0, int(consecutive_failures or 0))
|
||||
cap = max(0, int(cap_s or 0))
|
||||
retry_after = max(0, int(retry_after_s or 0))
|
||||
# ``cap_s=0`` explicitly disables the exponential ladder entirely
|
||||
# — operators who want the pre-fix "honor Retry-After only" behavior
|
||||
# can set this. The default cap of 1800s is what saturates the
|
||||
# ladder at the 5th-6th failure for base=60.
|
||||
if cap == 0:
|
||||
return retry_after
|
||||
# 2^(failures-1) — so failure #1 = base (preserves the pre-fix
|
||||
# default for transient blips), failure #2 = 2*base, etc. Cap on
|
||||
# the exponent (16) is defense against integer overflow on a
|
||||
# hostile or very large failures counter.
|
||||
if base > 0 and failures > 0:
|
||||
exponent = min(max(0, failures - 1), 16)
|
||||
grown = base * (2 ** exponent)
|
||||
else:
|
||||
grown = 0
|
||||
exponential = min(max(0, grown), cap)
|
||||
return max(exponential, retry_after)
|
||||
|
||||
|
||||
def finish_sync(
|
||||
state: SyncWorkerState,
|
||||
*,
|
||||
@@ -83,7 +190,26 @@ def finish_sync(
|
||||
now: float | None = None,
|
||||
interval_s: int = 300,
|
||||
failure_backoff_s: int = 60,
|
||||
retry_after_s: int = 0,
|
||||
failure_backoff_cap_s: int = 1800,
|
||||
) -> SyncWorkerState:
|
||||
"""Finalise a sync attempt and compute when the next one should run.
|
||||
|
||||
New args (added for the 429 retry storm fix):
|
||||
|
||||
* ``retry_after_s`` — if the peer responded with HTTP 429 + a
|
||||
``Retry-After`` header, pass that value here. ``finish_sync``
|
||||
will use ``max(exponential, retry_after_s)`` for the delay so
|
||||
we never hammer a peer that asked us to back off.
|
||||
* ``failure_backoff_cap_s`` — upper bound on the exponential
|
||||
ladder. Default 1800 (30 min) — keeps a sync queue from going
|
||||
silent for hours while still cutting the request rate to
|
||||
something the upstream can absorb.
|
||||
|
||||
The pre-fix behavior (constant 60s on every failure) is recoverable
|
||||
by passing ``failure_backoff_cap_s=0`` and ``retry_after_s=0``, but
|
||||
there's no reason to.
|
||||
"""
|
||||
timestamp = int(now if now is not None else time.time())
|
||||
if ok:
|
||||
return SyncWorkerState(
|
||||
@@ -99,17 +225,25 @@ def finish_sync(
|
||||
consecutive_failures=0,
|
||||
)
|
||||
|
||||
next_failures = state.consecutive_failures + 1
|
||||
delay_s = _failure_backoff_seconds(
|
||||
base_backoff_s=failure_backoff_s,
|
||||
consecutive_failures=next_failures,
|
||||
retry_after_s=retry_after_s,
|
||||
cap_s=failure_backoff_cap_s,
|
||||
)
|
||||
|
||||
return SyncWorkerState(
|
||||
last_sync_started_at=state.last_sync_started_at,
|
||||
last_sync_finished_at=timestamp,
|
||||
last_sync_ok_at=state.last_sync_ok_at,
|
||||
next_sync_due_at=timestamp + max(0, int(failure_backoff_s or 0)),
|
||||
next_sync_due_at=timestamp + delay_s,
|
||||
last_peer_url=peer_url or state.last_peer_url,
|
||||
last_error=str(error or "").strip(),
|
||||
last_outcome="fork" if fork_detected else "error",
|
||||
current_head=current_head or state.current_head,
|
||||
fork_detected=bool(fork_detected),
|
||||
consecutive_failures=state.consecutive_failures + 1,
|
||||
consecutive_failures=next_failures,
|
||||
)
|
||||
|
||||
|
||||
|
||||
@@ -0,0 +1,261 @@
|
||||
"""Infonet sync respects upstream HTTP 429 + applies exponential backoff.
|
||||
|
||||
Background
|
||||
----------
|
||||
Before this fix, ``finish_sync`` used a constant 60s ``failure_backoff_s``
|
||||
regardless of how many consecutive failures preceded. When an upstream
|
||||
peer (e.g. the seed onion) returned HTTP 429 "Too Many Requests", the
|
||||
sync worker would:
|
||||
|
||||
1. Receive 429
|
||||
2. Stringify the status into a generic ``ValueError``
|
||||
3. Call ``finish_sync(error=str(exc))`` -- losing the status code
|
||||
4. Schedule next attempt for ``now + 60s``
|
||||
5. Retry. Upstream's rate-limit bucket is still full. 429 again. Loop.
|
||||
|
||||
Net effect: a node with one transient 429 would hammer the upstream
|
||||
every 60s forever, keeping the bucket full and never recovering. This
|
||||
is what kept the user's Infonet node from reaching the seed peer.
|
||||
|
||||
What the fix does
|
||||
-----------------
|
||||
* New typed exception ``PeerSyncRateLimited`` carries the parsed
|
||||
``Retry-After`` value out of the HTTP layer.
|
||||
* ``_sync_from_peer`` returns ``(ok, error, forked, retry_after_s)``
|
||||
instead of the old 3-tuple.
|
||||
* ``finish_sync`` honors ``retry_after_s`` AND applies exponential
|
||||
backoff: ``delay = max(retry_after_s, min(base * 2^(failures-1), 1800))``.
|
||||
* ``parse_retry_after_header`` handles both RFC 7231 forms (delay
|
||||
seconds, and HTTP-date).
|
||||
|
||||
These tests pin every part of the new contract.
|
||||
"""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import time
|
||||
|
||||
import pytest
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# parse_retry_after_header — both RFC 7231 forms + edge cases
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
|
||||
class TestParseRetryAfter:
    """Pins ``parse_retry_after_header`` for both header forms and bad input."""

    def test_integer_seconds(self):
        from services.mesh.mesh_infonet_sync_support import parse_retry_after_header

        # Delay-seconds form, including surrounding whitespace and zero.
        for header, expected in (("120", 120), (" 30 ", 30), ("0", 0)):
            assert parse_retry_after_header(header) == expected

    def test_http_date(self):
        """The HTTP-date form is converted to seconds-from-now, so callers
        handle one numeric value whichever form the upstream sent."""
        from services.mesh.mesh_infonet_sync_support import parse_retry_after_header

        # Fixed reference time keeps the test deterministic.
        now = 1_700_000_000.0  # 2023-11-14T22:13:20Z
        # Exactly 300 seconds after ``now``.
        future = "Tue, 14 Nov 2023 22:18:20 GMT"
        result = parse_retry_after_header(future, now=now)
        assert 295 <= result <= 305, f"expected ~300s, got {result}"

    def test_http_date_in_past_returns_zero(self):
        from services.mesh.mesh_infonet_sync_support import parse_retry_after_header

        reference = 1_700_000_000.0
        already_passed = "Mon, 13 Nov 2023 00:00:00 GMT"
        assert parse_retry_after_header(already_passed, now=reference) == 0

    def test_empty_and_whitespace_return_zero(self):
        from services.mesh.mesh_infonet_sync_support import parse_retry_after_header

        for blank in ("", " "):
            assert parse_retry_after_header(blank) == 0

    def test_malformed_returns_zero(self):
        from services.mesh.mesh_infonet_sync_support import parse_retry_after_header

        for garbage in ("not a header", "xyz"):
            assert parse_retry_after_header(garbage) == 0

    def test_clamps_to_one_hour(self):
        """Oversized delays are cut down to the one-hour ceiling, so a peer
        can't silence us for a long stretch with a huge Retry-After."""
        from services.mesh.mesh_infonet_sync_support import parse_retry_after_header

        for oversized in ("86400", "99999999"):  # 24h and far beyond
            assert parse_retry_after_header(oversized) == 3600

    def test_negative_returns_zero(self):
        """A leading minus is not a digit string, so the value is treated
        as unparseable and yields 0."""
        from services.mesh.mesh_infonet_sync_support import parse_retry_after_header

        assert parse_retry_after_header("-10") == 0
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# _failure_backoff_seconds — exponential growth, retry-after override, cap
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
|
||||
class TestFailureBackoffSeconds:
    """Pins ``_failure_backoff_seconds``: doubling, Retry-After override, cap."""

    def test_exponential_growth(self):
        """The first failure waits exactly the base delay; every further
        consecutive failure doubles it until the 1800s cap is reached.
        With base=60 that is 60, 120, 240, 480, 960, 1800, 1800, 1800."""
        from services.mesh.mesh_infonet_sync_support import _failure_backoff_seconds

        delays = []
        for failure_count in range(1, 9):
            delays.append(
                _failure_backoff_seconds(
                    base_backoff_s=60,
                    consecutive_failures=failure_count,
                    retry_after_s=0,
                    cap_s=1800,
                )
            )

        assert delays == [60, 120, 240, 480, 960, 1800, 1800, 1800], delays

    def test_retry_after_wins_when_larger(self):
        """After one failure the exponential term is only 60s, so an
        upstream ``Retry-After: 600`` takes precedence."""
        from services.mesh.mesh_infonet_sync_support import _failure_backoff_seconds

        delay = _failure_backoff_seconds(
            base_backoff_s=60,
            consecutive_failures=1,
            retry_after_s=600,
            cap_s=1800,
        )
        assert delay == 600

    def test_exponential_wins_when_larger(self):
        """After 7 failures the exponential term sits at the 1800s cap,
        which outranks a short ``Retry-After: 30`` — the header is a
        point-in-time hint, the exponential reflects sustained failure."""
        from services.mesh.mesh_infonet_sync_support import _failure_backoff_seconds

        assert _failure_backoff_seconds(
            base_backoff_s=60,
            consecutive_failures=7,
            retry_after_s=30,
            cap_s=1800,
        ) == 1800

    def test_cap_zero_disables_exponential(self):
        """``cap_s=0`` switches exponential growth off, leaving only the
        upstream's Retry-After (the pre-fix code had no growth at all)."""
        from services.mesh.mesh_infonet_sync_support import _failure_backoff_seconds

        delay = _failure_backoff_seconds(
            base_backoff_s=60,
            consecutive_failures=10,
            retry_after_s=120,
            cap_s=0,
        )
        assert delay == 120

    def test_zero_inputs_return_zero(self):
        """All-zero inputs (with the default cap) produce a zero delay."""
        from services.mesh.mesh_infonet_sync_support import _failure_backoff_seconds

        delay = _failure_backoff_seconds(
            base_backoff_s=0,
            consecutive_failures=0,
            retry_after_s=0,
        )
        assert delay == 0
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# finish_sync end-to-end — failure path with retry-after + growing counter
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
|
||||
class TestFinishSyncBackoff:
    """End-to-end checks of ``finish_sync`` on the failure/success paths."""

    def _state(self, **overrides):
        """Build an idle ``SyncWorkerState``; keyword overrides replace defaults."""
        from services.mesh.mesh_infonet_sync_support import SyncWorkerState

        defaults = dict(
            last_sync_started_at=0,
            last_sync_finished_at=0,
            last_sync_ok_at=0,
            next_sync_due_at=0,
            last_peer_url="",
            last_error="",
            last_outcome="idle",
            current_head="",
            fork_detected=False,
            consecutive_failures=0,
        )
        return SyncWorkerState(**{**defaults, **overrides})

    def test_first_failure_uses_base_unchanged(self):
        """A first failure bumps the counter to 1 and waits ``base * 2^0``,
        i.e. the plain base delay. That keeps the pre-fix behavior for a
        single transient blip; longer waits require sustained failure."""
        from services.mesh.mesh_infonet_sync_support import finish_sync

        after = finish_sync(
            self._state(),
            ok=False,
            error="some upstream blip",
            now=1000.0,
            failure_backoff_s=60,
        )

        assert after.consecutive_failures == 1
        assert after.next_sync_due_at == 1000 + 60
        assert after.last_error == "some upstream blip"
        assert after.last_outcome == "error"

    def test_consecutive_failures_grow_the_delay(self):
        """With 5 failures already recorded, one more makes 6 and the
        delay reaches the cap (60 * 2^5 = 1920, capped at 1800s)."""
        from services.mesh.mesh_infonet_sync_support import finish_sync

        prior = self._state(consecutive_failures=5)
        after = finish_sync(
            prior,
            ok=False,
            error="HTTP 429",
            now=2000.0,
            failure_backoff_s=60,
        )

        assert after.consecutive_failures == 6
        assert after.next_sync_due_at == 2000 + 1800

    def test_retry_after_honored_at_low_failure_count(self):
        """At the 4th failure the exponential term is 60 * 2^3 = 480s,
        so an upstream ``Retry-After: 900`` is the longer wait and wins."""
        from services.mesh.mesh_infonet_sync_support import finish_sync

        prior = self._state(consecutive_failures=3)
        after = finish_sync(
            prior,
            ok=False,
            error="HTTP 429",
            now=5000.0,
            failure_backoff_s=60,
            retry_after_s=900,
        )

        assert after.consecutive_failures == 4
        assert after.next_sync_due_at == 5000 + 900

    def test_success_resets_consecutive_failures(self):
        """A successful sync zeroes the counter and schedules the normal interval."""
        from services.mesh.mesh_infonet_sync_support import finish_sync

        prior = self._state(consecutive_failures=4)
        after = finish_sync(prior, ok=True, now=7000.0, interval_s=300)

        assert after.consecutive_failures == 0
        assert after.next_sync_due_at == 7000 + 300
        assert after.last_outcome == "ok"

    def test_last_error_carries_status_string(self):
        """``last_error`` must keep the caller's descriptive message.
        The pre-fix path often stored an empty string because the HTTP
        layer raised a message-less ValueError; callers are now expected
        to pass something meaningful (see the typed exception in main.py)."""
        from services.mesh.mesh_infonet_sync_support import finish_sync

        after = finish_sync(
            self._state(),
            ok=False,
            error="HTTP 429 from peer (retry_after=120s): rate-limited",
            now=1000.0,
            failure_backoff_s=60,
            retry_after_s=120,
        )

        recorded = after.last_error
        assert "HTTP 429" in recorded
        assert "retry_after=120s" in recorded
|
||||
@@ -0,0 +1,166 @@
|
||||
"""AIS upstream-connectivity telemetry.
|
||||
|
||||
Background
|
||||
----------
|
||||
On 2026-05-23, stream.aisstream.io went fully offline (TCP timeouts on port
|
||||
443). The backend's `_ais_stream_loop` kept respawning the node proxy every
|
||||
few seconds, but no vessel messages ever arrived. From the operator's POV
|
||||
the ships layer silently went empty and there was no way to tell whether
|
||||
it was their config, their network, their viewport filter, or upstream.
|
||||
|
||||
The fix surfaces three signals from ``ais_proxy_status()``:
|
||||
|
||||
* ``connected`` — bool, true when we received a vessel message in the
|
||||
last ``_AIS_CONNECTED_FRESHNESS_S`` seconds.
|
||||
* ``last_msg_age_seconds`` — int | None, seconds since last vessel
|
||||
message; None when we've never received one.
|
||||
* ``proxy_spawn_count`` — int, how many times we've spawned the node
|
||||
proxy. Sustained increase without ``connected`` means upstream is dead.
|
||||
|
||||
Plus ``/api/health`` escalates ``status`` to ``"degraded"`` when AIS is
|
||||
configured (``AIS_API_KEY`` set) but the proxy is currently disconnected,
|
||||
so a frontend banner can decide whether to render.
|
||||
|
||||
These tests pin every signal.
|
||||
"""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import time
|
||||
import pytest
|
||||
|
||||
|
||||
def _reset_ais_module():
    """Return the ``ais_stream`` module-level telemetry state to its
    cold-start values so one test cannot leak state into the next."""
    from services import ais_stream

    # All three fields are mutated under the module's vessel lock.
    with ais_stream._vessels_lock:
        ais_stream._proxy_status.clear()
        ais_stream._last_msg_at = 0.0
        ais_stream._proxy_spawn_count = 0
|
||||
|
||||
|
||||
class TestAisProxyStatusShape:
    """Pins the keys and values returned by ``ais_proxy_status()``."""

    def test_fresh_module_reports_disconnected(self):
        """With no vessel message ever received (cold start) the status
        is ``connected: false`` with a ``None`` age and zero spawns. A
        banner should not render here until the operator is known to
        have opted in, which is approximated by spawn_count > 0."""
        _reset_ais_module()
        from services.ais_stream import ais_proxy_status

        status = ais_proxy_status()

        assert status["connected"] is False
        assert status["last_msg_age_seconds"] is None
        assert status["proxy_spawn_count"] == 0

    def test_recent_message_reports_connected(self):
        """A message timestamp 5s old reads as connected with a small age."""
        _reset_ais_module()
        from services import ais_stream

        with ais_stream._vessels_lock:
            ais_stream._last_msg_at = time.time() - 5
        # Read the status after releasing the lock.
        status = ais_stream.ais_proxy_status()

        age = status["last_msg_age_seconds"]
        assert status["connected"] is True
        assert age is not None
        assert 4 <= age <= 7

    def test_stale_message_reports_disconnected(self):
        """A message timestamp beyond the freshness threshold reads as
        disconnected — the smoking gun for "upstream died and the proxy
        is respawning in a loop"."""
        _reset_ais_module()
        from services import ais_stream

        with ais_stream._vessels_lock:
            # 5 minutes ago — well past the 60s freshness window.
            ais_stream._last_msg_at = time.time() - 300
        status = ais_stream.ais_proxy_status()

        age = status["last_msg_age_seconds"]
        assert status["connected"] is False
        assert age is not None
        assert age >= 299

    def test_spawn_count_surfaced(self):
        """The spawn counter is exposed; together with ``connected: false``
        it shows the operator we keep retrying upstream and get nothing."""
        _reset_ais_module()
        from services import ais_stream

        with ais_stream._vessels_lock:
            ais_stream._proxy_spawn_count = 42
        status = ais_stream.ais_proxy_status()

        assert status["proxy_spawn_count"] == 42

    def test_degraded_tls_preserved(self):
        """The existing issue #258 signal (``degraded_tls``) still passes
        through unchanged when it is present."""
        _reset_ais_module()
        from services import ais_stream

        with ais_stream._vessels_lock:
            ais_stream._proxy_status["degraded_tls"] = True
        status = ais_stream.ais_proxy_status()

        assert status.get("degraded_tls") is True
|
||||
|
||||
|
||||
class TestHealthEndpointEscalation:
    """Pins how ``/api/health`` reacts to the AIS connectivity signal."""

    def test_disconnected_with_api_key_escalates_to_degraded(
        self, client, monkeypatch
    ):
        """With ``AIS_API_KEY`` configured and the proxy disconnected,
        ``/api/health`` must report ``status: "degraded"`` rather than
        ``"ok"``. The frontend banner reads this field."""
        _reset_ais_module()
        monkeypatch.setenv("AIS_API_KEY", "test-key")

        from services import ais_stream

        # Simulate "AIS upstream offline": the proxy has been spawned
        # (count > 0) but the last message is 10 minutes old.
        ten_minutes_ago = time.time() - 600
        with ais_stream._vessels_lock:
            ais_stream._proxy_spawn_count = 5
            ais_stream._last_msg_at = ten_minutes_ago

        response = client.get("/api/health")
        assert response.status_code == 200

        body = response.json()
        proxy_view = body["ais_proxy"]
        assert proxy_view["connected"] is False
        assert proxy_view["proxy_spawn_count"] == 5
        # Without the key this would stay "ok". With it set and
        # connected=false we expect at least "degraded" ("error" is
        # possible if an SLO is also red) — but never "ok".
        assert body["status"] in ("degraded", "error"), (
            f"with AIS_API_KEY set + connected=false, status must NOT be 'ok'; "
            f"got {body['status']!r}"
        )

    def test_no_api_key_does_not_escalate(self, client, monkeypatch):
        """Without ``AIS_API_KEY`` the operator has not opted in, so AIS
        being off is the intended state and must not, by itself, mark
        the system degraded."""
        _reset_ais_module()
        monkeypatch.delenv("AIS_API_KEY", raising=False)

        from services import ais_stream

        # A proxy that never ran (spawn_count=0) still reads as
        # disconnected. With no env var the top-level status should
        # remain "ok" unless an SLO failed independently.
        with ais_stream._vessels_lock:
            ais_stream._proxy_spawn_count = 0
            ais_stream._last_msg_at = 0.0

        response = client.get("/api/health")
        assert response.status_code == 200

        body = response.json()
        # The top-level status is deliberately not asserted: other SLOs
        # may have tripped during the session, so a degraded/error value
        # could have an unrelated cause. This test mainly proves the
        # no-key path runs without crashing and still reports the proxy
        # as disconnected.
        assert body["ais_proxy"]["connected"] is False
|
||||
@@ -0,0 +1,432 @@
|
||||
"""AISHub REST fallback for ship tracking.
|
||||
|
||||
Background
|
||||
----------
|
||||
When ``stream.aisstream.io`` (the WebSocket primary) is unreachable, the
|
||||
ships layer goes empty. ``aishub_fallback.py`` polls ``data.aishub.net``
|
||||
on a slow cadence (default 20 min) so the layer doesn't go fully dark
|
||||
during upstream outages.
|
||||
|
||||
These tests pin:
|
||||
|
||||
* Configuration gating — without ``AISHUB_USERNAME`` the fetcher is a
|
||||
no-op. The username's presence is the opt-in.
|
||||
* Connectivity gating — when the WebSocket primary is connected, the
|
||||
fallback skips so it doesn't stomp fresher live data.
|
||||
* Response parsing — successful, error, and empty AISHub payloads.
|
||||
* Record normalization — bad records (no MMSI, sentinel positions) are
|
||||
dropped without crashing.
|
||||
* Merge behavior — records land in the shared ``_vessels`` dict with
|
||||
``source: "aishub"`` and don't overwrite very-recent live updates.
|
||||
* Poll interval clamping — env var overrides honored within [1, 360].
|
||||
"""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import json
|
||||
import os
|
||||
import time
|
||||
|
||||
import pytest
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Configuration / gating
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
|
||||
class TestGating:
    """Opt-in and connectivity gates in front of the AISHub fallback fetch."""

    @staticmethod
    def _forbid_network(monkeypatch, reason):
        """Replace ``aishub_fallback.fetch_with_curl`` with a stub that
        records every call and raises ``AssertionError(reason)``.

        Returns the list of recorded call arguments. Callers assert the
        list is empty afterwards, so an attempted fetch is detected even
        if the code under test were to swallow the raised exception.
        """
        from services.fetchers import aishub_fallback

        attempts = []

        def _refuse(*args, **kwargs):
            attempts.append((args, kwargs))
            raise AssertionError(reason)

        monkeypatch.setattr(aishub_fallback, "fetch_with_curl", _refuse)
        return attempts

    def test_no_username_means_disabled(self, monkeypatch):
        """Without ``AISHUB_USERNAME`` the fallback is disabled and the
        fetch returns 0 before making any network call."""
        from services.fetchers.aishub_fallback import (
            aishub_fallback_enabled,
            fetch_aishub_vessels,
        )

        monkeypatch.delenv("AISHUB_USERNAME", raising=False)
        attempts = self._forbid_network(
            monkeypatch, "network call must not happen without AISHUB_USERNAME"
        )

        assert aishub_fallback_enabled() is False
        assert fetch_aishub_vessels() == 0
        assert attempts == []

    def test_username_set_means_enabled(self, monkeypatch):
        """Setting ``AISHUB_USERNAME`` is the opt-in that enables the fallback."""
        from services.fetchers.aishub_fallback import aishub_fallback_enabled

        monkeypatch.setenv("AISHUB_USERNAME", "shadowbroker-test")

        assert aishub_fallback_enabled() is True

    def test_skips_when_websocket_primary_is_connected(self, monkeypatch):
        """If the AISStream WebSocket is currently delivering messages,
        the fallback should skip — fresher live data is already flowing."""
        from services.fetchers import aishub_fallback
        from services import ais_stream as ais

        monkeypatch.setenv("AISHUB_USERNAME", "shadowbroker-test")

        # Force "connected" state in the ais_stream module.
        with ais._vessels_lock:
            ais._last_msg_at = time.time() - 5  # 5s ago — well inside 60s
            ais._proxy_spawn_count = 1
        # Sanity check the gate:
        assert ais.ais_proxy_status()["connected"] is True

        # And confirm the fallback skips without touching the network:
        attempts = self._forbid_network(
            monkeypatch, "network call must not happen when primary is connected"
        )

        assert aishub_fallback.fetch_aishub_vessels() == 0
        assert attempts == []
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Response parsing
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
|
||||
class TestResponseParsing:
    """Pins ``_parse_aishub_response`` for good, error, empty and odd payloads."""

    def test_successful_response_parsed(self):
        """A header object followed by a record list yields the records in order."""
        from services.fetchers.aishub_fallback import _parse_aishub_response

        header = {"ERROR": False, "USERNAME": "test", "FORMAT": "1", "RECORDS": 2}
        records = [
            {"MMSI": 123, "LATITUDE": 40.0, "LONGITUDE": -73.0},
            {"MMSI": 456, "LATITUDE": 51.5, "LONGITUDE": -0.1},
        ]

        parsed = _parse_aishub_response(json.dumps([header, records]))

        assert len(parsed) == 2
        assert parsed[0]["MMSI"] == 123
        assert parsed[1]["MMSI"] == 456

    def test_error_response_returns_empty(self):
        """AISHub flags errors with ``ERROR=True`` in the header; that is
        treated as "no data"."""
        from services.fetchers.aishub_fallback import _parse_aishub_response

        error_header = {"ERROR": True, "ERROR_MESSAGE": "Invalid username"}
        body = json.dumps([error_header])

        assert _parse_aishub_response(body) == []

    def test_empty_payload_returns_empty(self):
        """A 200 with an empty body (seen in practice as a silent
        rate-limit drop when testing with a bogus username) yields []."""
        from services.fetchers.aishub_fallback import _parse_aishub_response

        for blank_body in ("", " "):
            assert _parse_aishub_response(blank_body) == []

    def test_malformed_json_returns_empty(self):
        """Unparseable JSON yields [] instead of raising."""
        from services.fetchers.aishub_fallback import _parse_aishub_response

        parsed = _parse_aishub_response("not json {")
        assert parsed == []

    def test_unexpected_shape_returns_empty(self):
        """Defensive: valid JSON that doesn't match AISHub's documented shape."""
        from services.fetchers.aishub_fallback import _parse_aishub_response

        odd_shapes = (
            {"unexpected": "object"},
            [],
            # Header-only with no records list:
            [{"ERROR": False, "RECORDS": 0}],
        )
        for shape in odd_shapes:
            assert _parse_aishub_response(json.dumps(shape)) == []
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Record normalization
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
|
||||
class TestNormalize:
    """Pins ``_normalize_record``: field mapping, sentinels and rejections."""

    def test_full_record_normalized(self):
        """A fully populated AISHub record maps onto the internal field names."""
        from services.fetchers.aishub_fallback import _normalize_record

        raw = {
            "MMSI": 366998410,
            "LATITUDE": 37.8,
            "LONGITUDE": -122.4,
            "COG": 280,
            "SOG": 12.5,
            "HEADING": 285,
            "NAME": "MV TESTSHIP",
            "CALLSIGN": "WDH7100",
            "DEST": "OAKLAND",
            "TYPE": 70,
            "IMO": 9111111,
        }
        record = _normalize_record(raw)

        assert record is not None
        expected_fields = {
            "mmsi": 366998410,
            "lat": 37.8,
            "lng": -122.4,
            "sog": 12.5,
            "heading": 285,
            "name": "MV TESTSHIP",
            "destination": "OAKLAND",
            "ais_type_code": 70,
        }
        for field, value in expected_fields.items():
            assert record[field] == value

    def test_speed_sentinel_sanitized(self):
        """SOG 102.3 kn is the "speed not available" value, so it must be
        sanitized to 0 rather than shown as an implausibly fast ship."""
        from services.fetchers.aishub_fallback import _normalize_record

        raw = {
            "MMSI": 1,
            "LATITUDE": 0.5,
            "LONGITUDE": 0.5,
            "SOG": 102.3,
            "COG": 0,
        }
        record = _normalize_record(raw)
        assert record["sog"] == 0.0

    def test_heading_sentinel_falls_back_to_cog(self):
        """HEADING 511 means "heading not available"; COG is used instead."""
        from services.fetchers.aishub_fallback import _normalize_record

        raw = {
            "MMSI": 1,
            "LATITUDE": 0.5,
            "LONGITUDE": 0.5,
            "HEADING": 511,
            "COG": 280,
        }
        record = _normalize_record(raw)
        assert record["heading"] == 280

    def test_missing_mmsi_rejected(self):
        """Records with no MMSI, or an MMSI of 0, are dropped."""
        from services.fetchers.aishub_fallback import _normalize_record

        without_mmsi = {"LATITUDE": 0.5, "LONGITUDE": 0.5}
        zero_mmsi = {"MMSI": 0, "LATITUDE": 0.5, "LONGITUDE": 0.5}
        for raw in (without_mmsi, zero_mmsi):
            assert _normalize_record(raw) is None

    def test_no_position_rejected(self):
        """Records missing latitude, longitude, or both are dropped."""
        from services.fetchers.aishub_fallback import _normalize_record

        incomplete = (
            {"MMSI": 1},
            {"MMSI": 1, "LATITUDE": 0.5},
            {"MMSI": 1, "LONGITUDE": 0.5},
        )
        for raw in incomplete:
            assert _normalize_record(raw) is None

    def test_position_sentinels_rejected(self):
        """Latitude 91 / longitude 181 mean "no position available"."""
        from services.fetchers.aishub_fallback import _normalize_record

        lat_sentinel = {"MMSI": 1, "LATITUDE": 91.0, "LONGITUDE": 0.0}
        lng_sentinel = {"MMSI": 1, "LATITUDE": 0.0, "LONGITUDE": 181.0}
        for raw in (lat_sentinel, lng_sentinel):
            assert _normalize_record(raw) is None

    def test_out_of_range_rejected(self):
        """Coordinates beyond the valid latitude/longitude range are dropped."""
        from services.fetchers.aishub_fallback import _normalize_record

        bad_lat = {"MMSI": 1, "LATITUDE": 95.0, "LONGITUDE": 0.0}
        bad_lng = {"MMSI": 1, "LATITUDE": 0.0, "LONGITUDE": 200.0}
        for raw in (bad_lat, bad_lng):
            assert _normalize_record(raw) is None

    def test_destination_at_sign_stripped(self):
        """AIS pads short destination strings with ``@``; the padding is
        stripped so the UI doesn't render "OAKLAND@@@@@"."""
        from services.fetchers.aishub_fallback import _normalize_record

        raw = {
            "MMSI": 1,
            "LATITUDE": 0.5,
            "LONGITUDE": 0.5,
            "DEST": "OAKLAND@@@",
        }
        record = _normalize_record(raw)
        assert record["destination"] == "OAKLAND"
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Poll interval clamping
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
|
||||
class TestPollInterval:
    """Pins ``aishub_poll_interval_minutes``: default, override and clamping."""

    _ENV_VAR = "AISHUB_POLL_INTERVAL_MINUTES"

    def test_default_is_twenty_minutes(self, monkeypatch):
        """With the env var unset the cadence is 20 minutes."""
        from services.fetchers.aishub_fallback import aishub_poll_interval_minutes

        monkeypatch.delenv(self._ENV_VAR, raising=False)

        assert aishub_poll_interval_minutes() == 20

    def test_env_override_honored(self, monkeypatch):
        """An in-range value from the environment is used as-is."""
        from services.fetchers.aishub_fallback import aishub_poll_interval_minutes

        monkeypatch.setenv(self._ENV_VAR, "45")

        assert aishub_poll_interval_minutes() == 45

    def test_clamp_lower_bound(self, monkeypatch):
        """Zero or negative values would hammer the upstream; clamp to 1."""
        from services.fetchers.aishub_fallback import aishub_poll_interval_minutes

        for too_small in ("0", "-5"):
            monkeypatch.setenv(self._ENV_VAR, too_small)
            assert aishub_poll_interval_minutes() == 1

    def test_clamp_upper_bound(self, monkeypatch):
        """A huge value would effectively silence the fallback; clamp to 360."""
        from services.fetchers.aishub_fallback import aishub_poll_interval_minutes

        monkeypatch.setenv(self._ENV_VAR, "99999")

        assert aishub_poll_interval_minutes() == 360

    def test_malformed_env_defaults(self, monkeypatch):
        """A non-numeric value falls back to the 20 minute default."""
        from services.fetchers.aishub_fallback import aishub_poll_interval_minutes

        monkeypatch.setenv(self._ENV_VAR, "twenty")

        assert aishub_poll_interval_minutes() == 20
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# End-to-end fetch + merge into _vessels store
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
|
||||
class TestFetchAndMerge:
    """End-to-end checks: fetch from (stubbed) AISHub and merge into ``_vessels``."""

    def _force_primary_disconnected(self):
        """Set ais_stream module state so the gate allows the fallback."""
        from services import ais_stream as ais

        with ais._vessels_lock:
            # Far in the past → connected = false; spawn_count > 0 → primary
            # has at least tried so the gate engages.
            ais._last_msg_at = time.time() - 3600
            ais._proxy_spawn_count = 5
            ais._vessels.clear()

    @staticmethod
    def _stub_fetch(monkeypatch, *, status_code, text):
        """Patch ``aishub_fallback.fetch_with_curl`` to return a canned
        response object exposing ``status_code`` and ``text``."""
        from services.fetchers import aishub_fallback

        class _CannedResponse:
            pass

        canned = _CannedResponse()
        canned.status_code = status_code
        canned.text = text
        monkeypatch.setattr(
            aishub_fallback, "fetch_with_curl", lambda *a, **kw: canned
        )

    def test_vessels_merged_with_source_tag(self, monkeypatch):
        """Happy path: AISHub returns 2 ships, both land in ``_vessels``
        with ``source: 'aishub'``."""
        from services.fetchers import aishub_fallback
        from services import ais_stream as ais

        monkeypatch.setenv("AISHUB_USERNAME", "test-user")
        self._force_primary_disconnected()

        payload = json.dumps([
            {"ERROR": False, "USERNAME": "test-user", "FORMAT": "1", "RECORDS": 2},
            [
                {
                    "MMSI": 111111111,
                    "LATITUDE": 40.0,
                    "LONGITUDE": -73.0,
                    "SOG": 12.0,
                    "COG": 270,
                    "HEADING": 275,
                    "NAME": "SHIP A",
                    "TYPE": 70,
                },
                {
                    "MMSI": 222222222,
                    "LATITUDE": 51.5,
                    "LONGITUDE": -0.1,
                    "SOG": 8.0,
                    "COG": 90,
                    "HEADING": 92,
                    "NAME": "SHIP B",
                    "TYPE": 60,
                },
            ],
        ])
        self._stub_fetch(monkeypatch, status_code=200, text=payload)

        count = aishub_fallback.fetch_aishub_vessels()

        assert count == 2
        with ais._vessels_lock:
            v1 = ais._vessels.get(111111111)
            v2 = ais._vessels.get(222222222)
        assert v1 is not None
        assert v1["source"] == "aishub"
        assert v1["lat"] == 40.0
        assert v1["name"] == "SHIP A"
        assert v2 is not None
        assert v2["source"] == "aishub"
        assert v2["type"] == "passenger"  # AIS type 60 → passenger

    def test_does_not_overwrite_fresh_live_data(self, monkeypatch):
        """If the WebSocket pushed an update for an MMSI moments ago and
        the AISHub poll completes in that window, we should NOT clobber
        the fresher live data."""
        from services.fetchers import aishub_fallback
        from services import ais_stream as ais

        monkeypatch.setenv("AISHUB_USERNAME", "test-user")
        self._force_primary_disconnected()

        # Pre-seed _vessels with a "very fresh" live record.
        fresh_ts = time.time()
        with ais._vessels_lock:
            ais._vessels[111111111] = {
                "mmsi": 111111111,
                "lat": 12.34,
                "lng": 56.78,
                "source": "aisstream",
                "_updated": fresh_ts,
            }

        # The AISHub position must be VALID (inside the 91/181 sentinel
        # guard) so the record reaches the merge step instead of being
        # filtered during normalization — otherwise this test would pass
        # for the wrong reason. 0.0/0.0 differs from the live 12.34/56.78.
        payload = json.dumps([
            {"ERROR": False, "USERNAME": "test-user", "FORMAT": "1", "RECORDS": 1},
            [
                {
                    "MMSI": 111111111,
                    "LATITUDE": 0.0,
                    "LONGITUDE": 0.0,
                    "NAME": "STALE",
                    "SOG": 0,
                    "COG": 0,
                    "TYPE": 0,
                },
            ],
        ])
        self._stub_fetch(monkeypatch, status_code=200, text=payload)

        aishub_fallback.fetch_aishub_vessels()

        with ais._vessels_lock:
            v = ais._vessels.get(111111111)
        # Live data wins — position should still be 12.34 / 56.78.
        assert v is not None
        assert v["lat"] == 12.34
        assert v["lng"] == 56.78
        assert v["source"] == "aisstream"

    def test_http_failure_returns_zero(self, monkeypatch):
        """A non-200 upstream response (503, empty body) merges nothing."""
        from services.fetchers import aishub_fallback

        monkeypatch.setenv("AISHUB_USERNAME", "test-user")
        self._force_primary_disconnected()
        self._stub_fetch(monkeypatch, status_code=503, text="")

        assert aishub_fallback.fetch_aishub_vessels() == 0
|
||||
@@ -0,0 +1,270 @@
|
||||
"""Per-(sender, recipient) anti-spam cap on the DM relay.
|
||||
|
||||
The user-stated rule: a single sender can have at most N UNACKED messages
|
||||
parked in a single recipient's mailbox at any one time (N=2 by default).
|
||||
Once the recipient pulls a message, the sender's quota for that pair
|
||||
frees up.
|
||||
|
||||
Network rule, not local rule
|
||||
-----------------------------
|
||||
The cap is enforced TWICE:
|
||||
|
||||
1. ``DMRelay.deposit(...)`` -- local check on the sender's own node.
|
||||
Refuses to spool the (N+1)th message before it can be replicated.
|
||||
|
||||
2. ``DMRelay.accept_replica(...)`` -- replication-acceptance check on
|
||||
every receiving peer. Refuses to accept an inbound replica that
|
||||
would put the local mailbox over the cap, even if the originating
|
||||
peer claims it had cap room.
|
||||
|
||||
The double enforcement matters because cap (1) is client-side -- a
|
||||
hostile relay could patch it out and continue to spool extras locally.
|
||||
Cap (2) means those extras can't propagate: every honest peer rejects
|
||||
them on the way in. A recipient who polls from honest peers therefore
|
||||
never sees more than N pending from any one sender, regardless of how
|
||||
many spam attempts the sender's own relay accepted.
|
||||
|
||||
These tests pin both halves of the rule.
|
||||
"""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import time
|
||||
|
||||
import pytest
|
||||
|
||||
|
||||
@pytest.fixture
def relay():
    """Provide a brand-new ``DMRelay`` with empty mailboxes, no blocks
    and a zeroed in-memory message counter for every test."""
    from services.mesh.mesh_dm_relay import DMRelay

    fresh_relay = DMRelay()
    # Wipe anything the constructor may have populated.
    fresh_relay._mailboxes.clear()
    fresh_relay._blocks.clear()
    fresh_relay._stats = {"messages_in_memory": 0}
    return fresh_relay
|
||||
|
||||
|
||||
def _deposit(
    relay,
    *,
    sender: str = "alice",
    recipient_token: str = "bob_mailbox_token_abc",
    ciphertext: str = "ciphertext-blob",
    msg_id: str = "",
):
    """Deposit one message via ``relay.deposit`` using the ``shared``
    delivery class and a fixed recipient id of ``"bob"``.

    ``sender`` is passed as both the sender id and the raw sender id.
    Returns whatever ``relay.deposit`` returns.
    """
    deposit_args = {
        "sender_id": sender,
        "raw_sender_id": sender,
        "recipient_id": "bob",
        "ciphertext": ciphertext,
        "msg_id": msg_id,
        "delivery_class": "shared",
        "recipient_token": recipient_token,
    }
    return relay.deposit(**deposit_args)
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Local cap on ``deposit``
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
|
||||
class TestDepositCap:
    """Pins the per-(sender, recipient) pending cap enforced by ``deposit``."""

    def test_two_deposits_from_same_sender_succeed(self, relay):
        """Two pending messages fit under the cap and get distinct ids."""
        first = _deposit(relay)
        second = _deposit(relay)

        assert first["ok"] is True
        assert second["ok"] is True
        assert first["msg_id"] != second["msg_id"]

    def test_third_deposit_from_same_sender_rejected(self, relay):
        """The third pending message is refused with an "unread"-style detail."""
        _deposit(relay)
        _deposit(relay)

        third = _deposit(relay)

        assert third["ok"] is False
        reason = third["detail"].lower()
        assert "unread" in reason or "read your messages" in reason

    def test_different_senders_have_independent_quotas(self, relay):
        """One sender filling their quota does not consume another sender's."""
        for who in ("alice", "carol"):
            for _ in range(2):
                assert _deposit(relay, sender=who)["ok"] is True

        # carol is now at her own cap.
        assert _deposit(relay, sender="carol")["ok"] is False

    def test_different_recipients_have_independent_quotas(self, relay):
        """The same sender gets a separate quota per recipient mailbox."""
        for token in ("bob_token", "dave_token"):
            for _ in range(2):
                outcome = _deposit(relay, sender="alice", recipient_token=token)
                assert outcome["ok"] is True

    def test_ack_frees_quota(self, relay):
        """Removing one pending message from the mailbox frees a quota slot."""
        first = _deposit(relay)
        _deposit(relay)
        assert _deposit(relay)["ok"] is False

        # Simulate the ack by dropping the first message from the mailbox
        # directly and recomputing the in-memory counter.
        mailbox_key = relay._hashed_mailbox_token("bob_mailbox_token_abc")
        remaining = []
        for message in relay._mailboxes[mailbox_key]:
            if message.msg_id != first["msg_id"]:
                remaining.append(message)
        relay._mailboxes[mailbox_key] = remaining

        total_pending = 0
        for box in relay._mailboxes.values():
            total_pending += len(box)
        relay._stats["messages_in_memory"] = total_pending

        r3 = _deposit(relay)
        assert r3["ok"] is True, f"expected quota free after ack, got: {r3}"

    def test_cap_is_env_tunable(self, relay, monkeypatch):
        """Overriding ``_per_sender_pending_limit`` to 1 lowers the cap to 1."""
        import services.mesh.mesh_dm_relay as relay_module

        monkeypatch.setattr(
            relay_module.DMRelay,
            "_per_sender_pending_limit",
            lambda self: 1,
        )

        first = _deposit(relay)
        second = _deposit(relay)
        assert first["ok"] is True
        assert second["ok"] is False
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Replication-acceptance cap (the half that makes this a network rule)
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
|
||||
class TestAcceptReplicaCap:
    """Pending-message cap as re-enforced by ``accept_replica``."""

    def _envelope(self, *, msg_id: str, sender_block_ref: str, mailbox_key: str):
        """Build a replication envelope dict with fixed filler fields."""
        return dict(
            msg_id=msg_id,
            mailbox_key=mailbox_key,
            sender_block_ref=sender_block_ref,
            sender_id="alice",
            sender_seal="",
            ciphertext=f"ciphertext-{msg_id}",
            timestamp=time.time(),
            delivery_class="shared",
            relay_salt="",
            payload_format="dm1",
            session_welcome="",
        )

    def test_replica_accepted_under_cap(self, relay):
        envelope = self._envelope(
            msg_id="dm_replica_1",
            sender_block_ref="alice_block_ref",
            mailbox_key="mailbox_xyz",
        )
        outcome = relay.accept_replica(envelope=envelope)
        assert outcome["ok"] is True

    def test_replica_idempotent_on_duplicate_msg_id(self, relay):
        box = "mailbox_xyz"
        envelope = self._envelope(
            msg_id="dm_dup_1",
            sender_block_ref="alice_block_ref",
            mailbox_key=box,
        )
        # Submit the very same envelope twice.
        first = relay.accept_replica(envelope=envelope)
        second = relay.accept_replica(envelope=envelope)
        assert first["ok"] is True
        assert second["ok"] is True
        assert second.get("duplicate") is True
        assert len(relay._mailboxes[box]) == 1

    def test_replica_rejected_when_local_count_already_at_cap(self, relay):
        box = "mailbox_xyz"
        for seq in (1, 2):
            seeded = self._envelope(
                msg_id=f"dm_seeded_{seq}",
                sender_block_ref="alice_block_ref",
                mailbox_key=box,
            )
            relay.accept_replica(envelope=seeded)

        overflow = self._envelope(
            msg_id="dm_overcap_3",
            sender_block_ref="alice_block_ref",
            mailbox_key=box,
        )
        outcome = relay.accept_replica(envelope=overflow)
        assert outcome["ok"] is False
        assert outcome.get("cap_violation") is True
        assert outcome.get("pending") == 2
        assert outcome.get("limit") == 2
        assert len(relay._mailboxes[box]) == 2

    def test_replica_from_different_sender_passes_when_one_is_at_cap(self, relay):
        box = "mailbox_xyz"
        for seq in (1, 2):
            relay.accept_replica(
                envelope=self._envelope(
                    msg_id=f"dm_alice_{seq}",
                    sender_block_ref="alice_block_ref",
                    mailbox_key=box,
                )
            )

        alice_extra = relay.accept_replica(
            envelope=self._envelope(
                msg_id="dm_alice_3",
                sender_block_ref="alice_block_ref",
                mailbox_key=box,
            )
        )
        assert alice_extra["ok"] is False

        carol_first = relay.accept_replica(
            envelope=self._envelope(
                msg_id="dm_carol_1",
                sender_block_ref="carol_block_ref",
                mailbox_key=box,
            )
        )
        assert carol_first["ok"] is True

    def test_replica_rejects_malformed_envelopes(self, relay):
        malformed = [
            {},
            {"msg_id": "x"},
            {"msg_id": "x", "mailbox_key": "y"},
            "not an object at all",
        ]
        for candidate in malformed:
            outcome = relay.accept_replica(envelope=candidate)
            assert outcome["ok"] is False
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# ``envelope_for_replication`` -- helper for the outbound replication path
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
|
||||
class TestEnvelopeForReplication:
    """``envelope_for_replication`` — helper for the outbound replication path."""

    def test_returns_envelope_for_stored_message(self, relay):
        r = _deposit(relay, ciphertext="hello-ciphertext")
        msg_id = r["msg_id"]
        mailbox_key = relay._hashed_mailbox_token("bob_mailbox_token_abc")

        env = relay.envelope_for_replication(mailbox_key=mailbox_key, msg_id=msg_id)
        assert env is not None
        assert env["msg_id"] == msg_id
        assert env["mailbox_key"] == mailbox_key
        assert env["ciphertext"] == "hello-ciphertext"
        assert env["delivery_class"] == "shared"
        # Every field must be present and non-empty.
        for k in ("msg_id", "mailbox_key", "sender_block_ref", "ciphertext"):
            assert env.get(k), f"envelope missing required field {k!r}"

    def test_returns_none_for_unknown_message(self, relay):
        env = relay.envelope_for_replication(
            mailbox_key="never_existed", msg_id="never_existed",
        )
        assert env is None

    def test_envelope_round_trips_through_accept_replica(self, relay):
        from services.mesh.mesh_dm_relay import DMRelay

        # Second, independent relay standing in for the receiving node.
        # Reset it exactly the way the ``relay`` fixture resets its
        # instance (mailboxes, blocks, stats) so the two relays start from
        # the same empty state; previously ``_blocks`` was left untouched
        # here.
        receiver_relay = DMRelay()
        receiver_relay._mailboxes.clear()
        receiver_relay._blocks.clear()
        receiver_relay._stats = {"messages_in_memory": 0}

        r = _deposit(relay)
        msg_id = r["msg_id"]
        mailbox_key = relay._hashed_mailbox_token("bob_mailbox_token_abc")
        env = relay.envelope_for_replication(
            mailbox_key=mailbox_key, msg_id=msg_id,
        )
        assert env is not None

        result = receiver_relay.accept_replica(envelope=env)
        assert result["ok"] is True
        stored = receiver_relay._mailboxes.get(mailbox_key, [])
        assert len(stored) == 1
        assert stored[0].msg_id == msg_id
        assert stored[0].ciphertext == "ciphertext-blob"
|
||||
@@ -0,0 +1,150 @@
|
||||
"""POST /api/mesh/dm/replicate-envelope — receiving side of cross-node DM
|
||||
mailbox replication.
|
||||
|
||||
This is the endpoint that peer relays call when they want to hand off an
|
||||
encrypted DM envelope to us (so the recipient can log into our node and
|
||||
find their messages). It re-enforces the per-(sender, recipient) anti-spam
|
||||
cap so hostile sender relays can't widen the cap by skipping the local
|
||||
check on their own deposit path.
|
||||
|
||||
The endpoint:
|
||||
|
||||
* authenticates the caller via the existing per-peer HMAC pattern
|
||||
(same one /api/mesh/infonet/peer-push and /api/mesh/gate/peer-push
|
||||
use, introduced in #256 — ``X-Peer-Url`` + ``X-Peer-HMAC`` headers
|
||||
keyed off ``resolve_peer_key_for_url``)
|
||||
* rejects bodies > 64 KB (DM envelope size is bounded by
|
||||
``MESH_DM_MAX_MSG_BYTES`` — 64KB ceiling has generous headroom)
|
||||
* rejects requests without a valid peer HMAC with 403
|
||||
* passes the envelope to ``DMRelay.accept_replica`` which enforces
|
||||
the cap
|
||||
|
||||
This file pins the endpoint contract. The cap enforcement itself is
|
||||
tested in ``test_dm_relay_per_sender_cap.py`` against the relay's
|
||||
``accept_replica`` method directly.
|
||||
"""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import asyncio
|
||||
import hashlib
|
||||
import hmac
|
||||
import json
|
||||
|
||||
import pytest
|
||||
from httpx import ASGITransport, AsyncClient
|
||||
|
||||
|
||||
@pytest.fixture
def remote_client():
    """Yield a synchronous POST-only client whose peer address is 1.2.3.4.

    Per the test module's intent, that address is never on the
    local-operator allowlist, so requests made through this client prove
    the endpoint is not reachable by arbitrary remote callers lacking a
    peer HMAC. The client's private event loop is closed on teardown.
    """
    from main import app

    class _RemoteClient:
        def __init__(self):
            self._base = "http://1.2.3.4:8000"
            self._transport = ASGITransport(app=app, client=("1.2.3.4", 12345))
            self._loop = asyncio.new_event_loop()

        async def _send_post(self, url, **kw):
            # A fresh AsyncClient per request, all sharing one transport.
            async with AsyncClient(transport=self._transport, base_url=self._base) as ac:
                return await ac.post(url, **kw)

        def post(self, url, **kw):
            return self._loop.run_until_complete(self._send_post(url, **kw))

        def close(self):
            self._loop.close()

    client = _RemoteClient()
    yield client
    client.close()
|
||||
|
||||
|
||||
class TestReplicateEndpointAuth:
    """Authentication and size gates on POST /api/mesh/dm/replicate-envelope."""

    def test_rejects_request_without_peer_hmac(self, remote_client):
        """A peer push that does NOT carry X-Peer-Url + X-Peer-HMAC
        must be rejected with 403 before the envelope is ever passed
        to the relay. Same gate the existing infonet/gate peer-push
        endpoints enforce."""
        payload = {
            "envelope": {
                "msg_id": "dm_unauth_1",
                "mailbox_key": "mb",
                "sender_block_ref": "sender",
                "ciphertext": "x",
            },
        }
        r = remote_client.post(
            "/api/mesh/dm/replicate-envelope",
            json=payload,
        )
        assert r.status_code == 403
        # Case-insensitive match covers both "peer HMAC" and "peer hmac".
        assert "peer hmac" in r.text.lower()

    def test_rejects_wrong_peer_hmac(self, remote_client, monkeypatch):
        """A request with a peer HMAC header keyed off the WRONG secret
        is rejected. Confirms the HMAC is actually verified — a tampered
        body or a key-substitution attack doesn't sneak through."""
        # Plant a known peer secret. The request will sign with a
        # DIFFERENT key, so verification must fail.
        from services.config import get_settings

        monkeypatch.setenv("MESH_PEER_PUSH_SECRET", "real-secret-32-chars-min-padding-padding")
        get_settings.cache_clear()
        try:
            body = json.dumps({
                "envelope": {
                    "msg_id": "dm_wronghmac",
                    "mailbox_key": "mb",
                    "sender_block_ref": "sender",
                    "ciphertext": "x",
                },
            }).encode("utf-8")
            wrong_hmac = hmac.new(b"wrong-key", body, hashlib.sha256).hexdigest()
            r = remote_client.post(
                "/api/mesh/dm/replicate-envelope",
                content=body,
                headers={
                    "Content-Type": "application/json",
                    "X-Peer-Url": "http://example-peer.onion:8000",
                    "X-Peer-HMAC": wrong_hmac,
                },
            )
            assert r.status_code == 403
        finally:
            # Any settings object cached while the env var was patched
            # would otherwise survive monkeypatch's teardown and leak the
            # planted secret into later tests. Clearing again forces the
            # next caller to rebuild settings from the restored env.
            get_settings.cache_clear()

    def test_rejects_oversize_body(self, remote_client):
        """64 KB ceiling — anything bigger doesn't even get parsed.
        Defends against memory amplification via giant ciphertexts."""
        # 100 KB body is well over the 64 KB cap.
        big = b"{" + b"x" * 100_000 + b"}"
        r = remote_client.post(
            "/api/mesh/dm/replicate-envelope",
            content=big,
            headers={
                "Content-Type": "application/json",
                "Content-Length": str(len(big)),
            },
        )
        assert r.status_code in (400, 413), (
            f"oversize body should be rejected with 400/413, got {r.status_code}"
        )
|
||||
|
||||
|
||||
class TestReplicateEndpointRegistered:
    """Route-table check for the replicate-envelope endpoint."""

    def test_route_present_in_app(self):
        """Static check that the route is actually wired into the app.
        Catches a future refactor that drops the router include or
        deletes the endpoint by accident."""
        from main import app

        # Flatten the route table into (METHOD, path) pairs. Routes with
        # no ``methods`` attribute (or an empty/None one) contribute nothing.
        registered = {
            (verb, getattr(route, "path", None))
            for route in app.routes
            for verb in (getattr(route, "methods", set()) or set())
        }

        wanted = ("POST", "/api/mesh/dm/replicate-envelope")
        assert wanted in registered, (
            "POST /api/mesh/dm/replicate-envelope is not registered on the app"
        )
|
||||
@@ -0,0 +1,334 @@
|
||||
"""Issue #302 (tg12): OpenClaw connect-info HMAC secret disclosure.
|
||||
|
||||
Before this change, ``GET /api/ai/connect-info?reveal=true`` returned the
|
||||
full HMAC secret in the response body on every modal open AND the same
|
||||
GET endpoint auto-bootstrapped (generated + persisted) the secret on a
|
||||
mere read. Even gated to ``require_local_operator``, that put the full
|
||||
secret into:
|
||||
|
||||
* browser visit history
|
||||
* dev-tools network panel
|
||||
* browser disk cache
|
||||
* HAR exports
|
||||
* screen captures / shoulder-surfing
|
||||
|
||||
Every single time the OpenClaw Connect modal opened.
|
||||
|
||||
After this change:
|
||||
|
||||
GET /api/ai/connect-info — always returns the MASKED
|
||||
fingerprint. No ?reveal param.
|
||||
No side effects (auto-bootstrap
|
||||
gone).
|
||||
POST /api/ai/connect-info/bootstrap — mints+persists the secret if
|
||||
missing. Idempotent. Never
|
||||
returns the full secret.
|
||||
POST /api/ai/connect-info/reveal — returns the full secret with
|
||||
strict Cache-Control: no-store
|
||||
headers. POST so the body
|
||||
doesn't land in URL history.
|
||||
POST /api/ai/connect-info/regenerate — keeps the one-time-disclosure
|
||||
for the new secret (regen IS a
|
||||
deliberate destructive action).
|
||||
Same no-store headers added.
|
||||
|
||||
These tests pin every property.
|
||||
"""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import asyncio
|
||||
from unittest.mock import patch
|
||||
|
||||
import pytest
|
||||
from httpx import ASGITransport, AsyncClient
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Loopback test client. ``require_local_operator`` resolves true for
|
||||
# request.client.host == "127.0.0.1"; FastAPI's TestClient sets it to
|
||||
# "testclient" which isn't on the allowlist. Use raw ASGITransport.
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
|
||||
@pytest.fixture
def loopback():
    """Yield a synchronous GET/POST client that appears to come from 127.0.0.1.

    Requests go through a raw ``ASGITransport`` so the peer address seen
    by the app is the loopback address. The client's private event loop
    is closed on teardown.
    """
    from main import app

    class _Client:
        def __init__(self, peer_ip: str = "127.0.0.1"):
            self._base = f"http://{peer_ip}:8000"
            self._transport = ASGITransport(app=app, client=(peer_ip, 12345))
            self._loop = asyncio.new_event_loop()

        async def _request(self, method: str, url: str, **kw):
            # A fresh AsyncClient per request, all sharing one transport.
            async with AsyncClient(transport=self._transport, base_url=self._base) as ac:
                return await ac.request(method, url, **kw)

        def _do(self, method: str, url: str, **kw):
            return self._loop.run_until_complete(self._request(method, url, **kw))

        def get(self, url, **kw):
            return self._do("GET", url, **kw)

        def post(self, url, **kw):
            return self._do("POST", url, **kw)

        def close(self):
            self._loop.close()

    client = _Client()
    yield client
    client.close()
|
||||
|
||||
|
||||
@pytest.fixture
def remote():
    """Yield a synchronous GET/POST client that appears to come from 1.2.3.4.

    Counterpart of the loopback client for exercising the rejection path
    for non-local callers. The client's private event loop is closed on
    teardown.
    """
    from main import app

    class _Client:
        def __init__(self):
            self._base = "http://1.2.3.4:8000"
            self._transport = ASGITransport(app=app, client=("1.2.3.4", 12345))
            self._loop = asyncio.new_event_loop()

        async def _request(self, method: str, url: str, **kw):
            # A fresh AsyncClient per request, all sharing one transport.
            async with AsyncClient(transport=self._transport, base_url=self._base) as ac:
                return await ac.request(method, url, **kw)

        def _do(self, method: str, url: str, **kw):
            return self._loop.run_until_complete(self._request(method, url, **kw))

        def get(self, url, **kw):
            return self._do("GET", url, **kw)

        def post(self, url, **kw):
            return self._do("POST", url, **kw)

        def close(self):
            self._loop.close()

    client = _Client()
    yield client
    client.close()
|
||||
|
||||
|
||||
@pytest.fixture
def stub_env(monkeypatch):
    """Isolate connect-info tests from the dev's real backend .env.

    Pydantic ``Settings()`` reads from ``.env`` file directly on
    instantiation, so monkey-patching ``os.environ`` isn't sufficient
    — the real ``OPENCLAW_HMAC_SECRET`` would leak through. Instead we
    replace ``get_settings`` with a stand-in that returns a fake settings
    object whose values are driven entirely by an in-test dict, AND we
    replace ``_write_env_value`` so writes update that same dict instead
    of touching the developer's filesystem.

    ``get_settings`` is patched both on ``services.config`` and on the
    route module itself, so the stub takes effect whether the route looks
    the function up through ``services.config`` at call time or holds its
    own module-level binding.

    Yields the dict so individual tests can pre-seed values or assert
    that writes happened.
    """
    import routers.ai_intel as ai_intel
    import services.config as config

    state: dict[str, str] = {}

    class _FakeSettings:
        # Only the two settings these tests drive are exposed; each one
        # reads ``state`` on every access so later seeding is visible.
        @property
        def OPENCLAW_HMAC_SECRET(self) -> str:
            return state.get("OPENCLAW_HMAC_SECRET", "")

        @property
        def OPENCLAW_ACCESS_TIER(self) -> str:
            return state.get("OPENCLAW_ACCESS_TIER", "restricted")

    fake = _FakeSettings()

    def _fake_get_settings():
        return fake

    # Route code calls ``get_settings.cache_clear()`` after writing the
    # env. The production version is wrapped with ``@lru_cache``, so
    # cache_clear exists. Attach a no-op shim here.
    _fake_get_settings.cache_clear = lambda: None  # type: ignore[attr-defined]

    monkeypatch.setattr(config, "get_settings", _fake_get_settings)
    # ``raising=False``: the route module may or may not have its own
    # ``get_settings`` name; patch it when present, add it temporarily
    # when absent. monkeypatch removes/restores it on teardown.
    monkeypatch.setattr(ai_intel, "get_settings", _fake_get_settings, raising=False)

    def _fake_write_env_value(key: str, value: str) -> None:
        state[key] = value

    monkeypatch.setattr(ai_intel, "_write_env_value", _fake_write_env_value)

    yield state
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# GET /api/ai/connect-info — always masked, no auto-bootstrap
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
|
||||
class TestGetConnectInfoMasking:
    """GET /api/ai/connect-info — always masked, no auto-bootstrap."""

    def test_returns_masked_when_secret_set(self, loopback, stub_env):
        full_secret = f"abcdef{'0' * 38}wxyz"
        stub_env["OPENCLAW_HMAC_SECRET"] = full_secret

        response = loopback.get("/api/ai/connect-info")
        assert response.status_code == 200
        payload = response.json()
        # The raw response text must not contain the full secret anywhere.
        assert full_secret not in response.text, (
            "GET /api/ai/connect-info MUST NOT include the full HMAC "
            "secret. Response body contained the secret value."
        )
        assert payload["hmac_secret_set"] is True
        masked = payload["masked_hmac_secret"]
        assert masked.startswith("abcdef")
        assert masked.endswith("wxyz")
        assert "•" in masked
        # The pre-fix field no longer exists.
        assert "hmac_secret" not in payload

    def test_no_auto_bootstrap_when_secret_missing(self, loopback, stub_env):
        """Side-effect-on-GET was the second half of issue #302. A GET
        with no secret configured must NOT mint one — that should
        require an explicit POST /bootstrap."""
        response = loopback.get("/api/ai/connect-info")
        assert response.status_code == 200
        payload = response.json()
        assert payload["hmac_secret_set"] is False
        assert payload["masked_hmac_secret"] == ""
        # The bootstrap_behavior block advertises the new flow.
        behavior = payload["bootstrap_behavior"]
        assert behavior["auto_generates_when_missing"] is False
        # Nothing was written through the stubbed _write_env_value.
        assert "OPENCLAW_HMAC_SECRET" not in stub_env

    def test_no_reveal_query_param(self, loopback, stub_env):
        """Pre-fix, ?reveal=true would return the full secret. Post-fix
        the param is silently ignored — the response is the same as
        without it (still masked, no leak)."""
        full_secret = f"abcdef{'0' * 38}wxyz"
        stub_env["OPENCLAW_HMAC_SECRET"] = full_secret

        response = loopback.get("/api/ai/connect-info?reveal=true")
        assert response.status_code == 200
        assert full_secret not in response.text, (
            "?reveal=true must be a no-op on GET — the full secret "
            "MUST NOT come back in the response body."
        )
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# POST /api/ai/connect-info/bootstrap
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
|
||||
class TestBootstrap:
    """POST /api/ai/connect-info/bootstrap."""

    def test_mints_when_missing(self, loopback, stub_env):
        response = loopback.post("/api/ai/connect-info/bootstrap")
        assert response.status_code == 200
        payload = response.json()
        for flag in ("ok", "generated", "hmac_secret_set"):
            assert payload[flag] is True
        # Bootstrap must not hand back the full secret: the field is
        # either absent or empty.
        assert not payload.get("hmac_secret")
        assert "•" in payload["masked_hmac_secret"]
        # The stubbed _write_env_value recorded a value.
        assert stub_env.get("OPENCLAW_HMAC_SECRET")
        # The stored value does not appear in the raw response text.
        minted = stub_env["OPENCLAW_HMAC_SECRET"]
        assert minted not in response.text

    def test_idempotent_when_already_set(self, loopback, stub_env):
        seeded = f"abcdef{'0' * 38}wxyz"
        stub_env["OPENCLAW_HMAC_SECRET"] = seeded

        response = loopback.post("/api/ai/connect-info/bootstrap")
        assert response.status_code == 200
        payload = response.json()
        assert payload["ok"] is True
        assert payload["generated"] is False
        assert payload["hmac_secret_set"] is True
        # The seeded value is untouched.
        assert stub_env["OPENCLAW_HMAC_SECRET"] == seeded
        # The full secret does not appear in the response.
        assert seeded not in response.text
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# POST /api/ai/connect-info/reveal
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
|
||||
class TestReveal:
    """POST /api/ai/connect-info/reveal."""

    def test_returns_full_secret_when_set(self, loopback, stub_env):
        full_secret = f"abcdef{'0' * 38}wxyz"
        stub_env["OPENCLAW_HMAC_SECRET"] = full_secret

        response = loopback.post("/api/ai/connect-info/reveal")
        assert response.status_code == 200
        payload = response.json()
        assert payload["ok"] is True
        assert payload["hmac_secret"] == full_secret

    def test_strict_cache_control_headers(self, loopback, stub_env):
        """The whole point of POST /reveal vs GET ?reveal=true is that
        the response carries headers that prevent every cache layer
        from persisting the secret."""
        full_secret = f"abcdef{'0' * 38}wxyz"
        stub_env["OPENCLAW_HMAC_SECRET"] = full_secret

        response = loopback.post("/api/ai/connect-info/reveal")
        headers = response.headers
        cache_control = headers.get("cache-control", "")
        assert "no-store" in cache_control, (
            f"reveal MUST set Cache-Control: no-store — got {cache_control!r}"
        )
        assert "no-cache" in cache_control
        # Pragma and Expires are checked too, for HTTP/1.0 caches.
        assert headers.get("pragma", "").lower() == "no-cache"
        assert headers.get("expires") == "0"

    def test_404_when_no_secret_configured(self, loopback, stub_env):
        response = loopback.post("/api/ai/connect-info/reveal")
        assert response.status_code == 404
        # The error detail should point the caller at the bootstrap endpoint.
        detail = response.json().get("detail", "")
        assert "/bootstrap" in detail or "bootstrap" in detail.lower()
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# POST /api/ai/connect-info/regenerate — still returns the new secret
|
||||
# inline (deliberate destructive action), but with no-store headers.
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
|
||||
class TestRegenerate:
    """POST /api/ai/connect-info/regenerate — new secret inline, no-store headers."""

    def test_returns_new_secret_with_no_store_headers(self, loopback, stub_env):
        # Seed a known secret first so a change is observable.
        previous = f"oldold{'0' * 38}1234"
        stub_env["OPENCLAW_HMAC_SECRET"] = previous

        response = loopback.post("/api/ai/connect-info/regenerate")
        assert response.status_code == 200
        payload = response.json()
        assert payload["ok"] is True
        assert payload["hmac_secret"]
        assert payload["hmac_secret"] != previous
        # The response must carry no-store / no-cache so the freshly
        # minted secret is not kept by a browser cache.
        cache_control = response.headers.get("cache-control", "")
        assert "no-store" in cache_control
        assert "no-cache" in cache_control
        assert response.headers.get("pragma", "").lower() == "no-cache"
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Auth-gate regression — every endpoint still rejects anonymous remote
|
||||
# callers. This is the property we already enforce for the rest of the
|
||||
# operator-only surface; adding the three new endpoints to the audit
|
||||
# coverage prevents a future refactor from dropping the dependency.
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
|
||||
class TestAnonymousRejection:
    """Every connect-info endpoint must answer 403 to the non-local client."""

    @pytest.mark.parametrize(
        "method,path,body",
        [
            ("get", "/api/ai/connect-info", None),
            ("post", "/api/ai/connect-info/bootstrap", None),
            ("post", "/api/ai/connect-info/reveal", None),
            ("post", "/api/ai/connect-info/regenerate", None),
        ],
    )
    def test_remote_rejected(self, remote, method, path, body):
        send = getattr(remote, method)
        # Only attach a JSON body when the case actually supplies one.
        if body is None:
            response = send(path)
        else:
            response = send(path, json=body)
        assert response.status_code == 403, (
            f"{method.upper()} {path} must reject anonymous remote callers; "
            f"got {response.status_code}"
        )
|
||||
@@ -39,6 +39,7 @@ import { useFeedHealth } from '@/hooks/useFeedHealth';
|
||||
import { useKeyboardShortcuts } from '@/hooks/useKeyboardShortcuts';
|
||||
import KeyboardShortcutsOverlay from '@/components/KeyboardShortcutsOverlay';
|
||||
import AlertToast from '@/components/AlertToast';
|
||||
import AisUpstreamBanner from '@/components/AisUpstreamBanner';
|
||||
import { useAlertToasts } from '@/hooks/useAlertToasts';
|
||||
import { useWatchlist } from '@/hooks/useWatchlist';
|
||||
import WatchlistWidget from '@/components/WatchlistWidget';
|
||||
@@ -933,6 +934,11 @@ export default function Dashboard() {
|
||||
onFlyTo={handleFlyTo}
|
||||
/>
|
||||
|
||||
{/* AIS UPSTREAM OUTAGE BANNER — renders only when AIS is configured
|
||||
but the WebSocket upstream is unreachable. Tells users the empty
|
||||
ocean isn't their fault. */}
|
||||
<AisUpstreamBanner />
|
||||
|
||||
{/* ONBOARDING MODAL */}
|
||||
{showOnboarding && (
|
||||
<OnboardingModal
|
||||
|
||||
@@ -357,8 +357,15 @@ function ConnectModalBody({ apiEndpoint, handleCopy, copied }: ConnectModalBodyP
|
||||
const [riskAccepted, setRiskAccepted] = React.useState(false);
|
||||
const [accessTier, setAccessTier] = React.useState<'restricted' | 'full'>('restricted');
|
||||
const [connectionMode, setConnectionMode] = React.useState<'local' | 'remote'>('local');
|
||||
// hmacSecret holds the FULL secret once the operator has clicked
|
||||
// Reveal (or after a regenerate). maskedHmacSecret is the safe-to-show
|
||||
// fingerprint returned by GET /api/ai/connect-info and is loaded on
|
||||
// mount. The two are independent state slots so a stale full secret
|
||||
// can never leak back into the UI after a regenerate.
|
||||
const [hmacSecret, setHmacSecret] = React.useState('');
|
||||
const [maskedHmacSecret, setMaskedHmacSecret] = React.useState('');
|
||||
const [hmacLoading, setHmacLoading] = React.useState(false);
|
||||
const [revealing, setRevealing] = React.useState(false);
|
||||
const [tierSaving, setTierSaving] = React.useState(false);
|
||||
const [showAdvanced, setShowAdvanced] = React.useState(false);
|
||||
const [showResetConfirm, setShowResetConfirm] = React.useState(false);
|
||||
@@ -381,16 +388,40 @@ function ConnectModalBody({ apiEndpoint, handleCopy, copied }: ConnectModalBodyP
|
||||
const [torError, setTorError] = React.useState('');
|
||||
const [torOnion, setTorOnion] = React.useState('');
|
||||
|
||||
// Fetch connect-info + node status on mount
|
||||
// Issue #302 (tg12): the full HMAC secret no longer travels through
|
||||
// GET /api/ai/connect-info on every modal open. The flow is now:
|
||||
//
|
||||
// 1. GET /api/ai/connect-info — always returns the masked fingerprint
|
||||
// (first6 + bullets + last4). `hmacSecret` stays empty until the
|
||||
// operator clicks the Reveal (eye) button below.
|
||||
// 2. POST /api/ai/connect-info/bootstrap — fires once on mount if the
|
||||
// backend reports `hmac_secret_set: false`. Idempotent and never
|
||||
// returns the secret in the response.
|
||||
// 3. POST /api/ai/connect-info/reveal — fires when the operator clicks
|
||||
// Reveal or Copy without the secret yet loaded. Returns the full
|
||||
// secret with strict `Cache-Control: no-store` so it doesn't land
|
||||
// in browser caches or HAR exports.
|
||||
React.useEffect(() => {
|
||||
(async () => {
|
||||
try {
|
||||
setHmacLoading(true);
|
||||
const res = await fetch(`${API_BASE}/api/ai/connect-info?reveal=true`);
|
||||
if (res.ok) {
|
||||
const data = await res.json();
|
||||
setHmacSecret(data.hmac_secret || '');
|
||||
setAccessTier(data.access_tier === 'full' ? 'full' : 'restricted');
|
||||
const res = await fetch(`${API_BASE}/api/ai/connect-info`);
|
||||
if (!res.ok) return;
|
||||
const data = await res.json();
|
||||
setMaskedHmacSecret(data.masked_hmac_secret || '');
|
||||
setAccessTier(data.access_tier === 'full' ? 'full' : 'restricted');
|
||||
|
||||
// Transparent first-use bootstrap. Mirrors the pre-#302 UX of
|
||||
// "open modal → secret exists" without the GET side-effect.
|
||||
if (!data.hmac_secret_set) {
|
||||
const bootRes = await fetch(
|
||||
`${API_BASE}/api/ai/connect-info/bootstrap`,
|
||||
{ method: 'POST' },
|
||||
);
|
||||
if (bootRes.ok) {
|
||||
const bootData = await bootRes.json();
|
||||
setMaskedHmacSecret(bootData.masked_hmac_secret || '');
|
||||
}
|
||||
}
|
||||
} catch { /* ignore */ }
|
||||
finally { setHmacLoading(false); }
|
||||
@@ -477,8 +508,17 @@ function ConnectModalBody({ apiEndpoint, handleCopy, copied }: ConnectModalBodyP
|
||||
const res = await fetch(`${API_BASE}/api/settings/agent/reset-all`, { method: 'POST' });
|
||||
const data = await res.json();
|
||||
if (data.ok) {
|
||||
// Update local state with new credentials
|
||||
if (data.new_hmac_secret) setHmacSecret(data.new_hmac_secret);
|
||||
// Update local state with new credentials. reset-all returns
|
||||
// the new HMAC secret in-band (same one-time-disclosure rule
|
||||
// as /regenerate — a deliberate destructive action). Refresh
|
||||
// both slots so the masked display stays in sync.
|
||||
if (data.new_hmac_secret) {
|
||||
setHmacSecret(data.new_hmac_secret);
|
||||
const s = String(data.new_hmac_secret);
|
||||
setMaskedHmacSecret(
|
||||
s.length > 10 ? s.slice(0, 6) + '•'.repeat(8) + s.slice(-4) : '•'.repeat(16),
|
||||
);
|
||||
}
|
||||
if (data.new_onion) {
|
||||
setTorOnion(data.new_onion);
|
||||
setRemoteUrl(data.new_onion);
|
||||
@@ -502,13 +542,41 @@ function ConnectModalBody({ apiEndpoint, handleCopy, copied }: ConnectModalBodyP
|
||||
finally { setTierSaving(false); }
|
||||
};
|
||||
|
||||
// Issue #302: POST /reveal returns the full secret with strict
|
||||
// no-store headers. Lazily fetched — never on mount. Returns the
|
||||
// secret string so callers can copy it immediately without waiting
|
||||
// for React state propagation.
|
||||
const revealHmacSecret = async (): Promise<string> => {
  // Already held in memory: reuse it and skip the network round-trip.
  if (hmacSecret) {
    return hmacSecret;
  }

  setRevealing(true);
  try {
    const response = await fetch(`${API_BASE}/api/ai/connect-info/reveal`, { method: 'POST' });
    if (!response.ok) {
      // Callers treat the empty string as "nothing to show / copy".
      return '';
    }
    const payload = await response.json();
    const revealed = String(payload.hmac_secret || '');
    // Store for later renders, but also hand the value back directly so the
    // caller does not have to wait for the state update to land.
    setHmacSecret(revealed);
    return revealed;
  } catch {
    // Network or JSON failure: same "no secret" signal as a non-OK response.
    return '';
  } finally {
    setRevealing(false);
  }
};
|
||||
|
||||
const handleRegenerate = async () => {
|
||||
setRegenerating(true);
|
||||
try {
|
||||
const res = await fetch(`${API_BASE}/api/ai/connect-info/regenerate`, { method: 'POST' });
|
||||
if (res.ok) {
|
||||
const data = await res.json();
|
||||
// Regenerate is a deliberate destructive action — operator needs
|
||||
// to see the new secret once to update their OpenClaw config.
|
||||
// Both the full and masked forms refresh in one shot.
|
||||
setHmacSecret(data.hmac_secret || '');
|
||||
setMaskedHmacSecret(data.masked_hmac_secret || '');
|
||||
setShowSecret(true);
|
||||
}
|
||||
} catch { /* ignore */ }
|
||||
finally { setRegenerating(false); }
|
||||
@@ -543,9 +611,17 @@ function ConnectModalBody({ apiEndpoint, handleCopy, copied }: ConnectModalBodyP
|
||||
finally { setNodeToggling(false); }
|
||||
};
|
||||
|
||||
const maskedSecret = hmacSecret
|
||||
? hmacSecret.slice(0, 6) + '\u2022'.repeat(8) + hmacSecret.slice(-4)
|
||||
: '\u2022'.repeat(16);
|
||||
// Issue #302: prefer the server-supplied fingerprint
|
||||
// (maskedHmacSecret) — it's filled on mount via the (no-secret) GET.
|
||||
// If the operator has clicked Reveal, fall through to deriving the
|
||||
// mask from the in-memory full secret so we keep the same shape
|
||||
// (first6 + bullets + last4) regardless of source. Final fallback
|
||||
// (no secret loaded yet) is a generic bullet string.
|
||||
const maskedSecret =
|
||||
maskedHmacSecret ||
|
||||
(hmacSecret
|
||||
? hmacSecret.slice(0, 6) + '\u2022'.repeat(8) + hmacSecret.slice(-4)
|
||||
: '\u2022'.repeat(16));
|
||||
|
||||
// Resolve the endpoint URL
|
||||
const resolvedUrl = connectionMode === 'local'
|
||||
@@ -672,10 +748,15 @@ function ConnectModalBody({ apiEndpoint, handleCopy, copied }: ConnectModalBodyP
|
||||
return lines.join('\n');
|
||||
};
|
||||
const displaySnippet = buildSnippet(maskedSecret);
|
||||
const copySnippet = buildSnippet(hmacSecret);
|
||||
|
||||
const handleCopySnippet = () => {
|
||||
navigator.clipboard.writeText(copySnippet);
|
||||
// Issue #302: the copy snippet needs the FULL secret. Pre-#302 we kept
|
||||
// it in memory from the GET-with-reveal load; now we lazy-fetch via
|
||||
// POST /reveal only when the operator actually clicks Copy. If they
|
||||
// already revealed, the in-memory value is reused (no extra request).
|
||||
const handleCopySnippet = async () => {
  // Reuse the in-memory secret when present; otherwise fetch it on demand.
  const secret = hmacSecret || (await revealHmacSecret());
  if (!secret) return;

  try {
    // writeText() is asynchronous and can reject (e.g. permission denied);
    // navigator.clipboard can also be missing outside secure contexts.
    // Awaiting inside the try covers both, so the failure is handled here
    // instead of surfacing as an unhandled rejection.
    await navigator.clipboard.writeText(buildSnippet(secret));
  } catch {
    // Nothing reached the clipboard — do not show the "copied" indicator.
    return;
  }

  // Show the "copied" indicator briefly, then reset it.
  setSnippetCopied(true);
  setTimeout(() => setSnippetCopied(false), 2000);
};
|
||||
@@ -913,18 +994,38 @@ function ConnectModalBody({ apiEndpoint, handleCopy, copied }: ConnectModalBodyP
|
||||
</div>
|
||||
<div className="flex items-center gap-2">
|
||||
<code className="flex-1 bg-black/60 border border-violet-800/40 px-3 py-2 text-xs font-mono text-violet-300 overflow-hidden text-ellipsis">
|
||||
{showSecret ? hmacSecret : maskedSecret}
|
||||
{/* Issue #302: when the operator hasn't clicked
|
||||
Reveal yet, hmacSecret is empty and we fall
|
||||
back to maskedHmacSecret (the safe fingerprint
|
||||
returned by GET /api/ai/connect-info). */}
|
||||
{showSecret && hmacSecret ? hmacSecret : (maskedHmacSecret || maskedSecret)}
|
||||
</code>
|
||||
<button
|
||||
onClick={() => setShowSecret(!showSecret)}
|
||||
className="p-2 bg-violet-600/20 border border-violet-500/40 text-violet-400 hover:bg-violet-600/40 transition-colors shrink-0"
|
||||
onClick={async () => {
|
||||
if (showSecret) {
|
||||
setShowSecret(false);
|
||||
return;
|
||||
}
|
||||
// Need the full secret in state before showing it.
|
||||
const secret = await revealHmacSecret();
|
||||
if (secret) setShowSecret(true);
|
||||
}}
|
||||
disabled={revealing}
|
||||
className="p-2 bg-violet-600/20 border border-violet-500/40 text-violet-400 hover:bg-violet-600/40 transition-colors shrink-0 disabled:opacity-50"
|
||||
title={showSecret ? 'Hide' : 'Reveal'}
|
||||
>
|
||||
{showSecret ? <EyeOff size={14} /> : <Eye size={14} />}
|
||||
</button>
|
||||
<button
|
||||
onClick={() => handleCopy(hmacSecret)}
|
||||
className="p-2 bg-violet-600/20 border border-violet-500/40 text-violet-400 hover:bg-violet-600/40 transition-colors shrink-0"
|
||||
onClick={async () => {
|
||||
// Copy needs the full secret. Fetch it lazily if
|
||||
// the operator hasn't clicked Reveal yet — no
|
||||
// point making them reveal first just to copy.
|
||||
const secret = hmacSecret || (await revealHmacSecret());
|
||||
if (secret) handleCopy(secret);
|
||||
}}
|
||||
disabled={revealing}
|
||||
className="p-2 bg-violet-600/20 border border-violet-500/40 text-violet-400 hover:bg-violet-600/40 transition-colors shrink-0 disabled:opacity-50"
|
||||
title="Copy key"
|
||||
>
|
||||
{copied ? <Check size={14} /> : <Copy size={14} />}
|
||||
|
||||
@@ -0,0 +1,61 @@
|
||||
/**
|
||||
* AisUpstreamBanner — visible notice that AIS ship data is unavailable
|
||||
* because the upstream provider (AISStream) is offline.
|
||||
*
|
||||
* Renders nothing when AIS is healthy or when AIS isn't configured at all.
|
||||
* Mounted at the app shell level so users see it before they wonder why
|
||||
* the ocean looks empty.
|
||||
*/
|
||||
import { useState } from 'react';
|
||||
import { useAisUpstreamHealth } from '@/hooks/useAisUpstreamHealth';
|
||||
|
||||
export function AisUpstreamBanner() {
  const health = useAisUpstreamHealth();
  const [dismissed, setDismissed] = useState(false);

  // Render nothing until health is known, when AIS is not enabled, when the
  // upstream is connected, or once the user has dismissed the notice.
  if (!health || !health.aisEnabled || health.connected || dismissed) {
    return null;
  }

  // Format the staleness for the operator. ``null`` means we never received
  // anything since startup. The age is floored to whole seconds and clamped
  // at zero so a fractional or negative value never leaks into the label;
  // a non-finite value is treated the same as "no age available".
  let stalenessLabel = 'never received';
  const ageSeconds = health.lastMsgAgeSeconds;
  if (ageSeconds != null && Number.isFinite(ageSeconds)) {
    const wholeSeconds = Math.max(0, Math.floor(ageSeconds));
    const minutes = Math.floor(wholeSeconds / 60);
    if (minutes >= 1) {
      stalenessLabel = `last update ${minutes} min ago`;
    } else {
      stalenessLabel = `last update ${wholeSeconds}s ago`;
    }
  }

  return (
    <div
      role="status"
      aria-live="polite"
      className="pointer-events-auto fixed top-3 left-1/2 z-[100] -translate-x-1/2 max-w-[640px] rounded-md border border-amber-500/60 bg-amber-900/85 px-4 py-2 text-sm text-amber-50 shadow-lg backdrop-blur"
    >
      <div className="flex items-start gap-3">
        <span aria-hidden className="mt-0.5 text-amber-300">⚠</span>
        <div className="flex-1">
          <div className="font-semibold">Ship data temporarily unavailable</div>
          <div className="text-xs opacity-90">
            AISStream upstream is offline ({stalenessLabel}). The map will
            refill once their service comes back online — nothing is wrong
            with your install.
          </div>
        </div>
        <button
          type="button"
          onClick={() => setDismissed(true)}
          aria-label="Dismiss"
          className="text-amber-200 hover:text-white"
        >
          ✕
        </button>
      </div>
    </div>
  );
}
|
||||
|
||||
export default AisUpstreamBanner;
|
||||
@@ -140,17 +140,51 @@ const OnboardingModal = React.memo(function OnboardingModal({
|
||||
].join('\n');
|
||||
const remoteAgentNeedsTor = agentMode === 'remote' && !torAddress;
|
||||
|
||||
// Issue #302 (tg12): the full HMAC secret no longer comes back from
|
||||
// GET /api/ai/connect-info. We fetch metadata + the masked fingerprint
|
||||
// first; if the operator has explicitly asked to see the key (the
|
||||
// ``reveal`` flag), we follow up with POST /api/ai/connect-info/reveal
|
||||
// (after a transparent POST /bootstrap if the secret hasn't been
|
||||
// minted yet) which carries the secret with strict no-store headers.
|
||||
const fetchAgentConnectInfo = async (reveal = true) => {
|
||||
setAgentLoading(true);
|
||||
setAgentMsg(null);
|
||||
try {
|
||||
const res = await fetch(`/api/ai/connect-info?reveal=${reveal ? 'true' : 'false'}`);
|
||||
const data = await res.json().catch(() => ({}));
|
||||
if (!res.ok || data?.ok === false) {
|
||||
throw new Error(data?.detail || 'Could not prepare agent credentials.');
|
||||
// 1) GET metadata + masked fingerprint.
|
||||
const metaRes = await fetch('/api/ai/connect-info');
|
||||
const metaData = await metaRes.json().catch(() => ({}));
|
||||
if (!metaRes.ok || metaData?.ok === false) {
|
||||
throw new Error(metaData?.detail || 'Could not prepare agent credentials.');
|
||||
}
|
||||
setAgentTier(metaData.access_tier === 'full' ? 'full' : 'restricted');
|
||||
|
||||
// 2) Mint the secret if it isn't set yet — transparent, idempotent.
|
||||
let secretSet = !!metaData.hmac_secret_set;
|
||||
if (!secretSet) {
|
||||
const bootRes = await fetch('/api/ai/connect-info/bootstrap', {
|
||||
method: 'POST',
|
||||
});
|
||||
const bootData = await bootRes.json().catch(() => ({}));
|
||||
if (!bootRes.ok || bootData?.ok === false) {
|
||||
throw new Error(bootData?.detail || 'Could not generate agent credentials.');
|
||||
}
|
||||
secretSet = !!bootData.hmac_secret_set;
|
||||
}
|
||||
|
||||
// 3) If the caller asked to see the secret, fetch it explicitly.
|
||||
// Otherwise the masked fingerprint is enough for the UI.
|
||||
if (reveal && secretSet) {
|
||||
const revealRes = await fetch('/api/ai/connect-info/reveal', {
|
||||
method: 'POST',
|
||||
});
|
||||
const revealData = await revealRes.json().catch(() => ({}));
|
||||
if (!revealRes.ok || revealData?.ok === false) {
|
||||
throw new Error(revealData?.detail || 'Could not reveal agent credentials.');
|
||||
}
|
||||
setAgentSecret(revealData.hmac_secret || '');
|
||||
} else {
|
||||
setAgentSecret(metaData.masked_hmac_secret || '');
|
||||
}
|
||||
setAgentSecret(data.hmac_secret || '');
|
||||
setAgentTier(data.access_tier === 'full' ? 'full' : 'restricted');
|
||||
setAgentMsg({ type: 'ok', text: 'Agent key is ready. Copy it into your local or remote agent runtime.' });
|
||||
} catch (error) {
|
||||
setAgentMsg({
|
||||
|
||||
@@ -0,0 +1,85 @@
|
||||
/**
|
||||
* useAisUpstreamHealth — polls /api/health and exposes AIS proxy connectivity.
|
||||
*
|
||||
* Background: AISStream's WebSocket server went fully offline 2026-05-23 (TCP
|
||||
* timeouts at stream.aisstream.io). The backend kept reconnecting in a tight
|
||||
* loop and the ships layer silently went empty. Users had no signal that the
|
||||
* problem was upstream, not their config. This hook surfaces the state so a
|
||||
* banner can explain "AIS upstream is offline" instead of letting users
|
||||
* wonder.
|
||||
*
|
||||
* The poll interval is intentionally relaxed (30s) — this is a low-urgency UX
|
||||
* signal, not a real-time data feed. Backend already escalates top_status to
|
||||
* "degraded" when AIS is configured-but-disconnected.
|
||||
*/
|
||||
import { useEffect, useRef, useState } from 'react';
|
||||
import { API_BASE } from '@/lib/api';
|
||||
|
||||
export interface AisUpstreamHealth {
|
||||
/** True when we've received a vessel message in the last ~60s. */
|
||||
connected: boolean;
|
||||
/** Seconds since the last vessel message; null when we've never seen one. */
|
||||
lastMsgAgeSeconds: number | null;
|
||||
/**
|
||||
* True when the SPKI-pinned fallback is in effect (issue #258).
|
||||
* Data still flows in this mode — it's a separate, less urgent signal
|
||||
* than ``connected``.
|
||||
*/
|
||||
degradedTls: boolean;
|
||||
/** How many times the proxy has been spawned (sustained growth without
|
||||
* ``connected`` means upstream is dead and we're respawning in a loop). */
|
||||
proxySpawnCount: number;
|
||||
/** Whether the operator has configured an API key. When false, the banner
|
||||
* shouldn't fire because "AIS is off" is the intended state. The backend
|
||||
* signals this via the ``connected`` flag being false AND no msg ever
|
||||
* seen — we approximate it by requiring at least one spawn before
|
||||
* declaring an outage. */
|
||||
aisEnabled: boolean;
|
||||
}
|
||||
|
||||
const POLL_INTERVAL_MS = 30_000;
|
||||
|
||||
/**
 * Polls `${API_BASE}/api/health` immediately and then every
 * POLL_INTERVAL_MS, exposing the `ais_proxy` section of the response.
 *
 * @returns The most recently parsed health snapshot, or `null` until the
 *   first successful response has been processed.
 */
export function useAisUpstreamHealth(): AisUpstreamHealth | null {
  const [health, setHealth] = useState<AisUpstreamHealth | null>(null);
  const cancelledRef = useRef(false);

  useEffect(() => {
    cancelledRef.current = false;

    const fetchHealth = async () => {
      try {
        const res = await fetch(`${API_BASE}/api/health`, { cache: 'no-store' });
        if (!res.ok) return;
        const body = await res.json();
        // Skip the state update if the effect was cleaned up mid-request.
        if (cancelledRef.current) return;
        const proxy = body?.ais_proxy ?? {};

        // ``proxy_spawn_count > 0`` is the cheapest "AIS is enabled" check:
        // if the backend never spawned the proxy (no API key, opt-out env)
        // we shouldn't ever show the outage banner. Once the proxy has
        // spawned at least once we know the operator wants AIS data.
        // A value that does not parse to a finite number counts as zero so
        // NaN never ends up in state.
        const parsedSpawns = Number(proxy.proxy_spawn_count ?? 0);
        const spawns = Number.isFinite(parsedSpawns) ? parsedSpawns : 0;

        // Same normalisation for the message age: absent or unparseable
        // becomes null, matching the ``number | null`` contract.
        const parsedAge =
          proxy.last_msg_age_seconds == null
            ? null
            : Number(proxy.last_msg_age_seconds);
        const lastMsgAgeSeconds =
          parsedAge != null && Number.isFinite(parsedAge) ? parsedAge : null;

        setHealth({
          connected: Boolean(proxy.connected),
          lastMsgAgeSeconds,
          degradedTls: Boolean(proxy.degraded_tls),
          proxySpawnCount: spawns,
          aisEnabled: spawns > 0,
        });
      } catch {
        // Backend unreachable — separate problem. Banner not relevant.
      }
    };

    void fetchHealth();
    const interval = setInterval(() => void fetchHealth(), POLL_INTERVAL_MS);
    return () => {
      cancelledRef.current = true;
      clearInterval(interval);
    };
  }, []);

  return health;
}
|
||||
+9
-2
@@ -76,6 +76,13 @@ function canRun(command, args) {
|
||||
return !result.error && result.status === 0;
|
||||
}
|
||||
|
||||
// Returns true only when `pythonBin` both starts (`-V`) and can import the
// fastapi and uvicorn modules.
function canRunBackendPython(pythonBin) {
  // Interpreter does not even launch: no point probing the imports.
  if (!canRun(pythonBin, ["-V"])) {
    return false;
  }
  return canRun(pythonBin, ["-c", "import fastapi, uvicorn"]);
}
|
||||
|
||||
function findBasePython() {
|
||||
const candidates = isWindows
|
||||
? [
|
||||
@@ -135,12 +142,12 @@ function rebuildBackendVenv(targetDir, basePython) {
|
||||
if (result.error || result.status !== 0) {
|
||||
return null;
|
||||
}
|
||||
return canRun(repairedBin, ["-V"]) ? repairedBin : null;
|
||||
return canRunBackendPython(repairedBin) ? repairedBin : null;
|
||||
}
|
||||
|
||||
function ensureBackendVenv() {
|
||||
for (const candidate of venvCandidates) {
|
||||
if (fs.existsSync(candidate) && canRun(candidate, ["-V"])) {
|
||||
if (fs.existsSync(candidate) && canRunBackendPython(candidate)) {
|
||||
persistSelectedVenv(candidate);
|
||||
return candidate;
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user