From 8f7bb417dbc996fc7b8ac3cdf4e1edc740f64bb4 Mon Sep 17 00:00:00 2001 From: anoracleofra-code Date: Sat, 28 Mar 2026 07:05:19 -0600 Subject: [PATCH] fix: thread-safe SSE broadcast + node enabled by default - SSE broadcast now uses loop.call_soon_threadsafe() when called from background threads (gate pull/push loops), fixing silent notification failures for peer-synced messages - Chain hydration path now broadcasts SSE so gate messages arriving via public chain sync trigger frontend refresh - Node participation defaults to enabled so fresh installs automatically join the mesh network (push + pull) --- backend/main.py | 38 +++++++++++++++++++++++++++++-- backend/services/node_settings.py | 2 +- 2 files changed, 37 insertions(+), 3 deletions(-) diff --git a/backend/main.py b/backend/main.py index 80e9f26..7d118d0 100644 --- a/backend/main.py +++ b/backend/main.py @@ -459,6 +459,7 @@ def _hydrate_gate_store_from_chain(events: list[dict]) -> int: from services.mesh.mesh_hashchain import gate_store count = 0 + gate_ids_updated: set[str] = set() for evt in events: if evt.get("event_type") != "gate_message": continue @@ -471,6 +472,13 @@ def _hydrate_gate_store_from_chain(events: list[dict]) -> int: # don't corrupt the chain event's payload hash. gate_store.append(gate_id, copy.deepcopy(evt)) count += 1 + gate_ids_updated.add(gate_id) + except Exception: + pass + # Notify SSE clients so frontends refresh immediately. + for gid in gate_ids_updated: + try: + _broadcast_gate_events(gid, [{"hydrated": True}]) except Exception: pass return count @@ -920,7 +928,12 @@ _gate_sse_lock = threading.Lock() def _broadcast_gate_events(gate_id: str, events: list[dict]) -> None: - """Notify all connected SSE clients about new gate events (non-blocking).""" + """Notify all connected SSE clients about new gate events (non-blocking). + + Called from background daemon threads (push/pull loops) AND the FastAPI + event-loop thread. asyncio.Queue.put_nowait() is NOT thread-safe, so + background callers schedule via loop.call_soon_threadsafe(). + """ if not events: return payload = json_mod.dumps( @@ -928,11 +941,32 @@ def _broadcast_gate_events(gate_id: str, events: list[dict]) -> None: separators=(",", ":"), ensure_ascii=False, ) + # Detect whether we're already on the event-loop thread. + try: + asyncio.get_running_loop() + _in_loop = True + except RuntimeError: + _in_loop = False + + _loop: asyncio.AbstractEventLoop | None = None + if not _in_loop: + try: + _loop = asyncio.get_event_loop() + if not _loop.is_running(): + _loop = None + except RuntimeError: + _loop = None + with _gate_sse_lock: dead: list[asyncio.Queue] = [] for q in _gate_sse_clients: try: - q.put_nowait(payload) + if _in_loop: + q.put_nowait(payload) + elif _loop is not None: + _loop.call_soon_threadsafe(q.put_nowait, payload) + else: + q.put_nowait(payload) # best-effort fallback except (asyncio.QueueFull, Exception): dead.append(q) for q in dead: diff --git a/backend/services/node_settings.py b/backend/services/node_settings.py index f9fe1a9..a13ad69 100644 --- a/backend/services/node_settings.py +++ b/backend/services/node_settings.py @@ -10,7 +10,7 @@ _cache: dict | None = None _cache_ts: float = 0.0 _CACHE_TTL = 5.0 _DEFAULTS = { - "enabled": False, + "enabled": True, }