diff --git a/backend/main.py b/backend/main.py index 1087761..f94d51d 100644 --- a/backend/main.py +++ b/backend/main.py @@ -777,6 +777,136 @@ def _http_peer_push_loop() -> None: _NODE_SYNC_STOP.wait(_PEER_PUSH_INTERVAL_S) +# ─── Background Gate Message Pull Worker ───────────────────────────────── +# Periodically pulls gate events from relay peers that this node is missing. +# Complements the push loop: push sends OUR events to peers, pull fetches +# THEIR events from peers (needed when this node is behind NAT). + +_GATE_PULL_INTERVAL_S = 30 +_gate_pull_last_count: dict[str, dict[str, int]] = {} # peer → {gate_id → known count} + + +def _http_gate_pull_loop() -> None: + """Background thread: pull new gate messages from HTTP relay peers.""" + import requests as _requests + from services.mesh.mesh_hashchain import gate_store + + while not _NODE_SYNC_STOP.is_set(): + try: + if not _participant_node_enabled(): + _NODE_SYNC_STOP.wait(_GATE_PULL_INTERVAL_S) + continue + + secret = str(get_settings().MESH_PEER_PUSH_SECRET or "").strip() + if not secret: + _NODE_SYNC_STOP.wait(_GATE_PULL_INTERVAL_S) + continue + + peers = authenticated_push_peer_urls() + if not peers: + _NODE_SYNC_STOP.wait(_GATE_PULL_INTERVAL_S) + continue + + for peer_url in peers: + normalized = normalize_peer_url(peer_url) + if not normalized: + continue + + peer_key = _derive_peer_key(secret, normalized) + if not peer_key: + continue + + peer_counts = _gate_pull_last_count.setdefault(normalized, {}) + + try: + # Step 1: Ask the peer which gates it has and how many events each + discovery_body = json_mod.dumps( + {"gate_id": "", "after_count": 0}, + sort_keys=True, + separators=(",", ":"), + ensure_ascii=False, + ).encode("utf-8") + + import hmac as _hmac_pull + import hashlib as _hashlib_pull + discovery_hmac = _hmac_pull.new(peer_key, discovery_body, _hashlib_pull.sha256).hexdigest() + + timeout = int(get_settings().MESH_RELAY_PUSH_TIMEOUT_S or 10) + resp = _requests.post( + f"{normalized}/api/mesh/gate/peer-pull", + data=discovery_body, + headers={ + "Content-Type": "application/json", + "X-Peer-HMAC": discovery_hmac, + }, + timeout=timeout, + ) + if resp.status_code != 200: + continue + discovery = resp.json() + if not discovery.get("ok"): + continue + remote_gates: dict[str, int] = discovery.get("gates", {}) + if not remote_gates: + continue + + # Step 2: For each gate with new events, pull the batch + for gate_id, remote_total in remote_gates.items(): + local_known = peer_counts.get(gate_id, 0) + # Also account for what we already have locally + with gate_store._lock: + local_count = len(gate_store._gates.get(gate_id, [])) + effective_cursor = max(local_known, local_count) + if effective_cursor >= remote_total: + continue + + pull_body = json_mod.dumps( + {"gate_id": gate_id, "after_count": effective_cursor}, + sort_keys=True, + separators=(",", ":"), + ensure_ascii=False, + ).encode("utf-8") + + pull_hmac = _hmac_pull.new(peer_key, pull_body, _hashlib_pull.sha256).hexdigest() + + pull_resp = _requests.post( + f"{normalized}/api/mesh/gate/peer-pull", + data=pull_body, + headers={ + "Content-Type": "application/json", + "X-Peer-HMAC": pull_hmac, + }, + timeout=timeout, + ) + if pull_resp.status_code != 200: + continue + pull_data = pull_resp.json() + if not pull_data.get("ok"): + continue + + events = pull_data.get("events", []) + if not events: + peer_counts[gate_id] = remote_total + continue + + result = gate_store.ingest_peer_events(gate_id, events) + accepted = int(result.get("accepted", 0) or 0) + dups = int(result.get("duplicates", 0) or 0) + if accepted > 0: + logger.info( + "Gate pull: %d new event(s) for %s from %s", + accepted, gate_id[:12], normalized[:40], + ) + peer_counts[gate_id] = effective_cursor + len(events) + + except Exception as exc: + logger.warning("Gate pull from %s failed: %s", normalized[:40], exc) + + except Exception: + logger.exception("HTTP gate pull loop error") + _NODE_SYNC_STOP.wait(_GATE_PULL_INTERVAL_S) + + # ─── Background Gate Message Push Worker ───────────────────────────────── _gate_push_last_count: dict[str, dict[str, int]] = {} # peer → {gate_id → count} @@ -1221,6 +1351,7 @@ async def lifespan(app: FastAPI): threading.Thread(target=_public_infonet_sync_loop, daemon=True).start() threading.Thread(target=_http_peer_push_loop, daemon=True).start() threading.Thread(target=_http_gate_push_loop, daemon=True).start() + threading.Thread(target=_http_gate_pull_loop, daemon=True).start() global _NODE_PUBLIC_EVENT_HOOK_REGISTERED if not _NODE_PUBLIC_EVENT_HOOK_REGISTERED: register_public_event_append_hook(_schedule_public_event_propagation) @@ -4304,6 +4435,60 @@ async def gate_peer_push(request: Request): return {"ok": True, "accepted": accepted, "duplicates": duplicates, "rejected": rejected} +@app.post("/api/mesh/gate/peer-pull") +@limiter.limit("30/minute") +async def gate_peer_pull(request: Request): + """Return gate events a peer is missing (HMAC-authenticated pull sync). + + Body: {"gate_id": "...", "after_count": N} + Returns up to 50 events after the caller's known count for that gate. + """ + content_length = request.headers.get("content-length") + if content_length: + try: + if int(content_length) > 65_536: + return Response( + content='{"ok":false,"detail":"Request body too large"}', + status_code=413, + media_type="application/json", + ) + except (ValueError, TypeError): + pass + + from services.mesh.mesh_hashchain import gate_store + + body_bytes = await request.body() + if not _verify_peer_push_hmac(request, body_bytes): + return Response( + content='{"ok":false,"detail":"Invalid or missing peer HMAC"}', + status_code=403, + media_type="application/json", + ) + + body = json_mod.loads(body_bytes or b"{}") + gate_id = str(body.get("gate_id", "") or "").strip().lower() + after_count = _safe_int(body.get("after_count", 0) or 0) + + if not gate_id: + # If no gate_id, return all known gate IDs with their event counts + # so the puller knows which gates to sync. + gate_ids = gate_store.known_gate_ids() + gate_counts: dict[str, int] = {} + for gid in gate_ids: + with gate_store._lock: + gate_counts[gid] = len(gate_store._gates.get(gid, [])) + return {"ok": True, "gates": gate_counts} + + with gate_store._lock: + all_events = list(gate_store._gates.get(gate_id, [])) + total = len(all_events) + if after_count >= total: + return {"ok": True, "events": [], "total": total, "gate_id": gate_id} + + batch = all_events[after_count : after_count + _PEER_PUSH_BATCH_SIZE] + return {"ok": True, "events": batch, "total": total, "gate_id": gate_id} + + # --------------------------------------------------------------------------- # Peer Management API — operator endpoints for adding / removing / listing # peers without editing peer_store.json by hand.