feat: real-time gate messages via SSE + faster push/pull intervals

- Add Server-Sent Events endpoint at GET /api/mesh/gate/stream that
  broadcasts ALL gate events to connected frontends (privacy: no
  per-gate subscriptions, clients filter locally)
- Hook SSE broadcast into all gate event entry points: local append,
  peer push receiver, and pull loop
- Reduce push/pull intervals from 30s to 10s for faster relay sync
- Add useGateSSE hook for frontend EventSource integration
- GateView + MeshChat use SSE for instant refresh, polling demoted
  to 30s fallback

Latency: same-node instant, cross-node ~10s avg (was ~34s)
This commit is contained in:
anoracleofra-code
2026-03-27 09:35:53 -06:00
parent 40a3cbdfdc
commit c81d81ec41
4 changed files with 137 additions and 5 deletions
+78 -3
View File
@@ -694,7 +694,7 @@ def _schedule_public_event_propagation(event_dict: dict[str, Any]) -> None:
# Runs alongside the sync loop. Every PUSH_INTERVAL seconds, batches new
# Infonet events and sends them via HMAC-authenticated POST to push peers.
_PEER_PUSH_INTERVAL_S = 30
_PEER_PUSH_INTERVAL_S = 10
_PEER_PUSH_BATCH_SIZE = 50
_peer_push_last_index: dict[str, int] = {} # peer_url → last pushed event index
@@ -782,7 +782,7 @@ def _http_peer_push_loop() -> None:
# Complements the push loop: push sends OUR events to peers, pull fetches
# THEIR events from peers (needed when this node is behind NAT).
_GATE_PULL_INTERVAL_S = 30
_GATE_PULL_INTERVAL_S = 10
_gate_pull_last_count: dict[str, dict[str, int]] = {} # peer → {gate_id → known count}
@@ -893,6 +893,7 @@ def _http_gate_pull_loop() -> None:
accepted = int(result.get("accepted", 0) or 0)
dups = int(result.get("duplicates", 0) or 0)
if accepted > 0:
_broadcast_gate_events(gate_id, events[:accepted])
logger.info(
"Gate pull: %d new event(s) for %s from %s",
accepted, gate_id[:12], normalized[:40],
@@ -907,6 +908,35 @@ def _http_gate_pull_loop() -> None:
_NODE_SYNC_STOP.wait(_GATE_PULL_INTERVAL_S)
# ─── SSE Gate Event Broadcast ─────────────────────────────────────────────
# All connected SSE clients receive every gate event (encrypted blobs).
# Clients filter locally by gate_id — the server never learns which gates
# a client cares about (privacy-preserving broadcast).
_gate_sse_clients: set[asyncio.Queue] = set()
_gate_sse_lock = threading.Lock()
def _broadcast_gate_events(gate_id: str, events: list[dict]) -> None:
"""Notify all connected SSE clients about new gate events (non-blocking)."""
if not events:
return
payload = json_mod.dumps(
{"gate_id": gate_id, "count": len(events), "ts": time.time()},
separators=(",", ":"),
ensure_ascii=False,
)
with _gate_sse_lock:
dead: list[asyncio.Queue] = []
for q in _gate_sse_clients:
try:
q.put_nowait(payload)
except (asyncio.QueueFull, Exception):
dead.append(q)
for q in dead:
_gate_sse_clients.discard(q)
# ─── Background Gate Message Push Worker ─────────────────────────────────
_gate_push_last_count: dict[str, dict[str, int]] = {} # peer → {gate_id → count}
@@ -4008,6 +4038,7 @@ def _submit_gate_message_envelope(request: Request, gate_id: str, body: dict[str
if reply_to:
store_payload["reply_to"] = reply_to
stored_event = gate_store.append(gate_id, gate_event)
_broadcast_gate_events(gate_id, [gate_event])
chain_event_id = chain_event_id or str(stored_event.get("event_id", ""))
try:
from services.mesh.mesh_rns import rns_bridge
@@ -4429,9 +4460,12 @@ async def gate_peer_push(request: Request):
rejected = 0
for event_gate_id, items in grouped_events.items():
result = gate_store.ingest_peer_events(event_gate_id, items)
accepted += int(result.get("accepted", 0) or 0)
a = int(result.get("accepted", 0) or 0)
accepted += a
duplicates += int(result.get("duplicates", 0) or 0)
rejected += int(result.get("rejected", 0) or 0)
if a > 0:
_broadcast_gate_events(event_gate_id, items[:a])
return {"ok": True, "accepted": accepted, "duplicates": duplicates, "rejected": rejected}
@@ -4489,6 +4523,47 @@ async def gate_peer_pull(request: Request):
return {"ok": True, "events": batch, "total": total, "gate_id": gate_id}
# ---------------------------------------------------------------------------
# SSE Gate Event Stream — real-time push of gate activity to frontends.
# Delivers ALL gate events (encrypted blobs) to every connected client.
# The client filters locally by gate_id — the server never learns which
# gates a client cares about (privacy-preserving broadcast).
# ---------------------------------------------------------------------------
@app.get("/api/mesh/gate/stream")
async def gate_event_stream(request: Request):
"""SSE stream of all gate events for real-time delivery."""
client_queue: asyncio.Queue = asyncio.Queue(maxsize=256)
with _gate_sse_lock:
_gate_sse_clients.add(client_queue)
async def event_generator():
try:
yield ": connected\n\n"
while True:
if await request.is_disconnected():
break
try:
payload = await asyncio.wait_for(client_queue.get(), timeout=15.0)
yield f"data: {payload}\n\n"
except asyncio.TimeoutError:
yield ": keepalive\n\n"
finally:
with _gate_sse_lock:
_gate_sse_clients.discard(client_queue)
return StreamingResponse(
event_generator(),
media_type="text/event-stream",
headers={
"Cache-Control": "no-cache",
"Connection": "keep-alive",
"X-Accel-Buffering": "no",
},
)
# ---------------------------------------------------------------------------
# Peer Management API — operator endpoints for adding / removing / listing
# peers without editing peer_store.json by hand.
@@ -13,6 +13,7 @@ import {
} from '@/mesh/wormholeIdentityClient';
import { gateEnvelopeDisplayText, gateEnvelopeState, isEncryptedGateEnvelope } from '@/mesh/gateEnvelope';
import { validateEventPayload } from '@/mesh/meshSchema';
import { useGateSSE } from '@/hooks/useGateSSE';
const GATE_INTROS: Record<string, string> = {
infonet:
@@ -357,11 +358,21 @@ export default function GateView({
}
}, [gateId, hydrateMessages]);
// SSE: instant delivery when new gate events arrive
const handleSSEEvent = useCallback(
(eventGateId: string) => {
if (eventGateId === gateId) void refreshGate();
},
[gateId, refreshGate],
);
useGateSSE(handleSSEEvent);
// Fallback poll (30s) in case SSE disconnects
useEffect(() => {
void refreshGate();
const timer = window.setInterval(() => {
void refreshGate();
}, 8000);
}, 30_000);
return () => {
window.clearInterval(timer);
};
+14 -1
View File
@@ -23,6 +23,7 @@ import {
} from 'lucide-react';
import { API_BASE } from '@/lib/api';
import { controlPlaneJson } from '@/lib/controlPlane';
import { useGateSSE } from '@/hooks/useGateSSE';
import { requestSecureMeshTerminalLauncherOpen } from '@/lib/meshTerminalLauncher';
import {
loadIdentityBoundSensitiveValue,
@@ -1111,6 +1112,17 @@ const MeshChat = React.memo(function MeshChat({
const [reps, setReps] = useState<Record<string, number>>({});
const repsRef = useRef(reps);
const [votedOn, setVotedOn] = useState<Record<string, 1 | -1>>({});
// SSE: bump tick counter to trigger immediate re-poll on gate events
const [sseGateTick, setSseGateTick] = useState(0);
const selectedGateRef = useRef(selectedGate);
selectedGateRef.current = selectedGate;
const handleSSEGateEvent = useCallback((eventGateId: string) => {
if (eventGateId === selectedGateRef.current.trim().toLowerCase()) {
setSseGateTick((t) => t + 1);
}
}, []);
useGateSSE(handleSSEGateEvent);
const [gateReplyContext, setGateReplyContext] = useState<GateReplyContext | null>(null);
const [showCreateGate, setShowCreateGate] = useState(false);
const [newGateId, setNewGateId] = useState('');
@@ -1699,7 +1711,7 @@ const MeshChat = React.memo(function MeshChat({
}
};
poll();
const iv = setInterval(poll, 10000);
const iv = setInterval(poll, 30_000); // SSE handles fast path; this is fallback
return () => {
cancelled = true;
clearInterval(iv);
@@ -1712,6 +1724,7 @@ const MeshChat = React.memo(function MeshChat({
gatePersonaBusy,
gatePersonaPromptOpen,
hydrateInfonetMessages,
sseGateTick, // SSE event triggers immediate re-poll
]);
useEffect(() => {
+33
View File
@@ -0,0 +1,33 @@
import { useEffect, useRef } from 'react';
import { API_BASE } from '@/lib/api';
/**
* Subscribe to the backend SSE gate-event stream.
* Delivers ALL gate events (encrypted blobs) — the client filters by gate_id locally.
* The server never learns which gates a client cares about (privacy-preserving broadcast).
*
* Falls back gracefully: if the stream fails the browser's EventSource auto-reconnects.
*/
export function useGateSSE(onEvent: (gateId: string) => void) {
const callbackRef = useRef(onEvent);
callbackRef.current = onEvent;
useEffect(() => {
const es = new EventSource(`${API_BASE}/api/mesh/gate/stream`);
es.onmessage = (e) => {
try {
const data = JSON.parse(e.data);
if (data.gate_id && typeof data.gate_id === 'string') {
callbackRef.current(data.gate_id);
}
} catch {
/* ignore parse errors */
}
};
// Browser auto-reconnects EventSource on error — no manual retry needed.
return () => es.close();
}, []);
}