From c81d81ec41382f79246d9998d41ea1f0ffcda64e Mon Sep 17 00:00:00 2001 From: anoracleofra-code Date: Fri, 27 Mar 2026 09:35:53 -0600 Subject: [PATCH] feat: real-time gate messages via SSE + faster push/pull intervals - Add Server-Sent Events endpoint at GET /api/mesh/gate/stream that broadcasts ALL gate events to connected frontends (privacy: no per-gate subscriptions, clients filter locally) - Hook SSE broadcast into all gate event entry points: local append, peer push receiver, and pull loop - Reduce push/pull intervals from 30s to 10s for faster relay sync - Add useGateSSE hook for frontend EventSource integration - GateView + MeshChat use SSE for instant refresh, polling demoted to 30s fallback Latency: same-node instant, cross-node ~10s avg (was ~34s) --- backend/main.py | 81 ++++++++++++++++++- .../components/InfonetTerminal/GateView.tsx | 13 ++- frontend/src/components/MeshChat.tsx | 15 +++- frontend/src/hooks/useGateSSE.ts | 33 ++++++++ 4 files changed, 137 insertions(+), 5 deletions(-) create mode 100644 frontend/src/hooks/useGateSSE.ts diff --git a/backend/main.py b/backend/main.py index f94d51d..a1a3944 100644 --- a/backend/main.py +++ b/backend/main.py @@ -694,7 +694,7 @@ def _schedule_public_event_propagation(event_dict: dict[str, Any]) -> None: # Runs alongside the sync loop. Every PUSH_INTERVAL seconds, batches new # Infonet events and sends them via HMAC-authenticated POST to push peers. -_PEER_PUSH_INTERVAL_S = 30 +_PEER_PUSH_INTERVAL_S = 10 _PEER_PUSH_BATCH_SIZE = 50 _peer_push_last_index: dict[str, int] = {} # peer_url → last pushed event index @@ -782,7 +782,7 @@ def _http_peer_push_loop() -> None: # Complements the push loop: push sends OUR events to peers, pull fetches # THEIR events from peers (needed when this node is behind NAT). -_GATE_PULL_INTERVAL_S = 30 +_GATE_PULL_INTERVAL_S = 10 _gate_pull_last_count: dict[str, dict[str, int]] = {} # peer → {gate_id → known count} @@ -893,6 +893,7 @@ def _http_gate_pull_loop() -> None: accepted = int(result.get("accepted", 0) or 0) dups = int(result.get("duplicates", 0) or 0) if accepted > 0: + _broadcast_gate_events(gate_id, events[:accepted]) logger.info( "Gate pull: %d new event(s) for %s from %s", accepted, gate_id[:12], normalized[:40], @@ -907,6 +908,35 @@ def _http_gate_pull_loop() -> None: _NODE_SYNC_STOP.wait(_GATE_PULL_INTERVAL_S) +# ─── SSE Gate Event Broadcast ───────────────────────────────────────────── +# All connected SSE clients receive every gate event (encrypted blobs). +# Clients filter locally by gate_id — the server never learns which gates +# a client cares about (privacy-preserving broadcast). + +_gate_sse_clients: set[asyncio.Queue] = set() +_gate_sse_lock = threading.Lock() + + +def _broadcast_gate_events(gate_id: str, events: list[dict]) -> None: + """Notify all connected SSE clients about new gate events (non-blocking).""" + if not events: + return + payload = json_mod.dumps( + {"gate_id": gate_id, "count": len(events), "ts": time.time()}, + separators=(",", ":"), + ensure_ascii=False, + ) + with _gate_sse_lock: + dead: list[asyncio.Queue] = [] + for q in _gate_sse_clients: + try: + q.put_nowait(payload) + except (asyncio.QueueFull, Exception): + dead.append(q) + for q in dead: + _gate_sse_clients.discard(q) + + # ─── Background Gate Message Push Worker ───────────────────────────────── _gate_push_last_count: dict[str, dict[str, int]] = {} # peer → {gate_id → count} @@ -4008,6 +4038,7 @@ def _submit_gate_message_envelope(request: Request, gate_id: str, body: dict[str if reply_to: store_payload["reply_to"] = reply_to stored_event = gate_store.append(gate_id, gate_event) + _broadcast_gate_events(gate_id, [gate_event]) chain_event_id = chain_event_id or str(stored_event.get("event_id", "")) try: from services.mesh.mesh_rns import rns_bridge @@ -4429,9 +4460,12 @@ async def gate_peer_push(request: Request): rejected = 0 for event_gate_id, items in grouped_events.items(): result = gate_store.ingest_peer_events(event_gate_id, items) - accepted += int(result.get("accepted", 0) or 0) + a = int(result.get("accepted", 0) or 0) + accepted += a duplicates += int(result.get("duplicates", 0) or 0) rejected += int(result.get("rejected", 0) or 0) + if a > 0: + _broadcast_gate_events(event_gate_id, items[:a]) return {"ok": True, "accepted": accepted, "duplicates": duplicates, "rejected": rejected} @@ -4489,6 +4523,47 @@ async def gate_peer_pull(request: Request): return {"ok": True, "events": batch, "total": total, "gate_id": gate_id} +# --------------------------------------------------------------------------- +# SSE Gate Event Stream — real-time push of gate activity to frontends. +# Delivers ALL gate events (encrypted blobs) to every connected client. +# The client filters locally by gate_id — the server never learns which +# gates a client cares about (privacy-preserving broadcast). +# --------------------------------------------------------------------------- + + +@app.get("/api/mesh/gate/stream") +async def gate_event_stream(request: Request): + """SSE stream of all gate events for real-time delivery.""" + client_queue: asyncio.Queue = asyncio.Queue(maxsize=256) + with _gate_sse_lock: + _gate_sse_clients.add(client_queue) + + async def event_generator(): + try: + yield ": connected\n\n" + while True: + if await request.is_disconnected(): + break + try: + payload = await asyncio.wait_for(client_queue.get(), timeout=15.0) + yield f"data: {payload}\n\n" + except asyncio.TimeoutError: + yield ": keepalive\n\n" + finally: + with _gate_sse_lock: + _gate_sse_clients.discard(client_queue) + + return StreamingResponse( + event_generator(), + media_type="text/event-stream", + headers={ + "Cache-Control": "no-cache", + "Connection": "keep-alive", + "X-Accel-Buffering": "no", + }, + ) + + # --------------------------------------------------------------------------- # Peer Management API — operator endpoints for adding / removing / listing # peers without editing peer_store.json by hand. diff --git a/frontend/src/components/InfonetTerminal/GateView.tsx b/frontend/src/components/InfonetTerminal/GateView.tsx index 9d57e91..9876fc4 100644 --- a/frontend/src/components/InfonetTerminal/GateView.tsx +++ b/frontend/src/components/InfonetTerminal/GateView.tsx @@ -13,6 +13,7 @@ import { } from '@/mesh/wormholeIdentityClient'; import { gateEnvelopeDisplayText, gateEnvelopeState, isEncryptedGateEnvelope } from '@/mesh/gateEnvelope'; import { validateEventPayload } from '@/mesh/meshSchema'; +import { useGateSSE } from '@/hooks/useGateSSE'; const GATE_INTROS: Record = { infonet: @@ -357,11 +358,21 @@ export default function GateView({ } }, [gateId, hydrateMessages]); + // SSE: instant delivery when new gate events arrive + const handleSSEEvent = useCallback( + (eventGateId: string) => { + if (eventGateId === gateId) void refreshGate(); + }, + [gateId, refreshGate], + ); + useGateSSE(handleSSEEvent); + + // Fallback poll (30s) in case SSE disconnects useEffect(() => { void refreshGate(); const timer = window.setInterval(() => { void refreshGate(); - }, 8000); + }, 30_000); return () => { window.clearInterval(timer); }; diff --git a/frontend/src/components/MeshChat.tsx b/frontend/src/components/MeshChat.tsx index 5706b30..590a4ef 100644 --- a/frontend/src/components/MeshChat.tsx +++ b/frontend/src/components/MeshChat.tsx @@ -23,6 +23,7 @@ import { } from 'lucide-react'; import { API_BASE } from '@/lib/api'; import { controlPlaneJson } from '@/lib/controlPlane'; +import { useGateSSE } from '@/hooks/useGateSSE'; import { requestSecureMeshTerminalLauncherOpen } from '@/lib/meshTerminalLauncher'; import { loadIdentityBoundSensitiveValue, @@ -1111,6 +1112,17 @@ const MeshChat = React.memo(function MeshChat({ const [reps, setReps] = useState>({}); const repsRef = useRef(reps); const [votedOn, setVotedOn] = useState>({}); + + // SSE: bump tick counter to trigger immediate re-poll on gate events + const [sseGateTick, setSseGateTick] = useState(0); + const selectedGateRef = useRef(selectedGate); + selectedGateRef.current = selectedGate; + const handleSSEGateEvent = useCallback((eventGateId: string) => { + if (eventGateId === selectedGateRef.current.trim().toLowerCase()) { + setSseGateTick((t) => t + 1); + } + }, []); + useGateSSE(handleSSEGateEvent); const [gateReplyContext, setGateReplyContext] = useState(null); const [showCreateGate, setShowCreateGate] = useState(false); const [newGateId, setNewGateId] = useState(''); @@ -1699,7 +1711,7 @@ const MeshChat = React.memo(function MeshChat({ } }; poll(); - const iv = setInterval(poll, 10000); + const iv = setInterval(poll, 30_000); // SSE handles fast path; this is fallback return () => { cancelled = true; clearInterval(iv); @@ -1712,6 +1724,7 @@ const MeshChat = React.memo(function MeshChat({ gatePersonaBusy, gatePersonaPromptOpen, hydrateInfonetMessages, + sseGateTick, // SSE event triggers immediate re-poll ]); useEffect(() => { diff --git a/frontend/src/hooks/useGateSSE.ts b/frontend/src/hooks/useGateSSE.ts new file mode 100644 index 0000000..0ce2ffc --- /dev/null +++ b/frontend/src/hooks/useGateSSE.ts @@ -0,0 +1,33 @@ +import { useEffect, useRef } from 'react'; +import { API_BASE } from '@/lib/api'; + +/** + * Subscribe to the backend SSE gate-event stream. + * Delivers ALL gate events (encrypted blobs) — the client filters by gate_id locally. + * The server never learns which gates a client cares about (privacy-preserving broadcast). + * + * Falls back gracefully: if the stream fails the browser's EventSource auto-reconnects. + */ +export function useGateSSE(onEvent: (gateId: string) => void) { + const callbackRef = useRef(onEvent); + callbackRef.current = onEvent; + + useEffect(() => { + const es = new EventSource(`${API_BASE}/api/mesh/gate/stream`); + + es.onmessage = (e) => { + try { + const data = JSON.parse(e.data); + if (data.gate_id && typeof data.gate_id === 'string') { + callbackRef.current(data.gate_id); + } + } catch { + /* ignore parse errors */ + } + }; + + // Browser auto-reconnects EventSource on error — no manual retry needed. + + return () => es.close(); + }, []); +}