From 5135b771f57cd4bb593c473ccf06cb9504a2f660 Mon Sep 17 00:00:00 2001 From: BigBodyCobain <43977454+BigBodyCobain@users.noreply.github.com> Date: Mon, 15 Jun 2026 02:10:26 -0600 Subject: [PATCH] Fix fleet E2E for third participant and Tor-only shared DM delivery. Step 8 uses live HTTP poll/decrypt instead of wedging remote python; prime local wormhole before Tor warmup; auto-set MESH_RELAY_PEERS on participant prime. Verified Extra run 119 and Pete Tor-only run 121. Co-authored-by: Cursor --- scripts/e2e_dm_short_address_live.py | 450 +++++++++++++++++--------- scripts/prime_participant_wormhole.py | 5 + 2 files changed, 299 insertions(+), 156 deletions(-) diff --git a/scripts/e2e_dm_short_address_live.py b/scripts/e2e_dm_short_address_live.py index 5a200c0..8c390ef 100644 --- a/scripts/e2e_dm_short_address_live.py +++ b/scripts/e2e_dm_short_address_live.py @@ -1706,6 +1706,33 @@ def _prime_pete_dm_wormhole() -> dict: return _ssh_pete_python(code) +def _prime_remote_wormhole_join() -> dict: + """Join wormhole on remote participant via in-container curl (no admin key).""" + proc = subprocess.run( + [ + "ssh", + "-o", + "BatchMode=yes", + SSH_PETE, + ( + "docker exec shadowbroker-backend curl -s -X POST " + "-H 'Content-Type: application/json' -d '{}' " + "http://127.0.0.1:8000/api/wormhole/join --max-time 120" + ), + ], + capture_output=True, + text=True, + timeout=150, + check=False, + ) + if proc.returncode != 0: + return {"ok": False, "detail": proc.stderr.strip() or proc.stdout.strip() or "remote join failed"} + try: + return json.loads(proc.stdout.strip() or "{}") + except json.JSONDecodeError: + return {"ok": False, "detail": proc.stdout.strip() or "remote join invalid json"} + + def _warmup_tor() -> None: """Prime local Arti SOCKS before fleet lookups (cold Tor can exceed lookup budgets).""" if not PETE_ONION: @@ -2144,6 +2171,256 @@ except Exception as exc: return _ssh_pete_python(code, timeout_s=int(timeout_s) + 30) +def _shared_mailbox_claims( + shared_send: dict, + *, + sender_id: str = "", + shared_alias: str = "", +) -> list[dict]: + """Build shared-lane mailbox claims from the sender's release tokens.""" + tokens: list[str] = [] + for key in ("recipient_token", "recipient_token_prev"): + token = str(shared_send.get(key) or "").strip() + if token and token not in tokens: + tokens.append(token) + if not tokens and sender_id: + code = f"""import json +from services.mesh.mesh_wormhole_dead_drop import derive_dead_drop_token_pair +from services.mesh.mesh_wormhole_persona import get_dm_identity +identity = get_dm_identity() +sender_dh = str(identity.get("dh_pub_key") or "") +tokens = [] +for peer_ref in [{json.dumps(sender_id)}, {json.dumps(shared_alias)}]: + peer_ref = str(peer_ref or "").strip() + if not peer_ref: + continue + pair = derive_dead_drop_token_pair( + peer_id={json.dumps(sender_id)}, + peer_dh_pub=sender_dh, + peer_ref=peer_ref, + ) + if not pair.get("ok"): + continue + for token in [str(pair.get("current") or ""), str(pair.get("previous") or "")]: + if token and token not in tokens: + tokens.append(token) +print(json.dumps({{"ok": bool(tokens), "tokens": tokens}})) +""" + derived = _docker_python(code, timeout_s=45) + if not derived.get("ok"): + raise RuntimeError(f"shared mailbox tokens unavailable: {derived}") + tokens = [str(t) for t in list(derived.get("tokens") or []) if str(t or "").strip()] + if not tokens: + raise RuntimeError("shared mailbox tokens unavailable") + return [{"type": "shared", "token": token} for token in tokens] + + +def _shared_hit_from_replicate_package(package: dict, *, shared_msg_id: str = "") -> dict | None: + """Extract poll hit fields from a scoped replicate package (host-side, no remote exec).""" + if not package.get("ok") or not package.get("body_b64"): + return None + try: + body = json.loads(base64.b64decode(str(package.get("body_b64") or "")).decode("utf-8")) + except Exception: + return None + envelope = dict(body.get("envelope") or {}) + msg_id = str(envelope.get("msg_id") or "").strip() + if shared_msg_id and msg_id and msg_id != shared_msg_id: + return None + ciphertext = str(envelope.get("ciphertext") or "").strip() + if not ciphertext: + return None + payload_format = str(envelope.get("payload_format") or envelope.get("format") or "mls1") + return { + "msg_id": msg_id, + "ciphertext": ciphertext, + "format": payload_format, + "payload_format": payload_format, + "session_welcome": str(envelope.get("session_welcome") or ""), + "mailbox_key": str(envelope.get("mailbox_key") or ""), + } + + +def _remote_http_dm_poll_shared( + agent_id: str, + claims: list[dict], + *, + shared_msg_id: str = "", + timeout_s: int = 45, +) -> dict: + """Poll remote participant shared mailbox via live uvicorn HTTP only.""" + code = f"""import json, secrets, time, urllib.request +{_EMBED_SIGNED_MAILBOX_HELPERS} + +agent_id = {json.dumps(agent_id)} +claims = {json.dumps(claims)} +shared_msg_id = {json.dumps(shared_msg_id)} +body, data = _build_signed_mailbox_request( + agent_id=agent_id, + event_type="dm_poll", + kind="dm_poll", + endpoint="/api/mesh/dm/poll", + sequence_domain="dm_poll", + claims=claims, +) +req = urllib.request.Request( + "http://127.0.0.1:8000/api/mesh/dm/poll", + data=data, + headers={{"Content-Type": "application/json"}}, + method="POST", +) +try: + with urllib.request.urlopen(req, timeout={int(timeout_s)}) as resp: + payload = json.loads(resp.read().decode("utf-8")) +except Exception as exc: + print(json.dumps({{"ok": False, "detail": str(exc) or type(exc).__name__, "messages": []}})) +else: + messages = list(payload.get("messages") or []) + hit = None + for message in messages: + if shared_msg_id and str(message.get("msg_id", "")) == shared_msg_id: + hit = message + break + print(json.dumps({{ + "ok": bool(hit), + "hit": hit or {{}}, + "messages": messages, + "poll_detail": str(payload.get("detail", "") or ""), + "source": "http", + }})) +""" + return _ssh_pete_python(code, timeout_s=int(timeout_s) + 20) + + +def _remote_disk_shared_poll_hit( + agent_id: str, + claims: list[dict], + *, + shared_msg_id: str = "", +) -> dict: + """Read shared mailbox from persisted dm_relay on remote (short script, no HTTP).""" + code = f"""import json +from services.mesh.mesh_dm_relay import dm_relay + +agent_id = {json.dumps(agent_id)} +claims = {json.dumps(claims)} +shared_msg_id = {json.dumps(shared_msg_id)} +hit = None +seen = [] +with dm_relay._lock: + dm_relay._refresh_from_shared_relay() + messages, _has_more = dm_relay.collect_claims(agent_id, claims, limit=32) +seen = [str(m.get("msg_id") or "") for m in list(messages or [])] +for message in list(messages or []): + if str(message.get("msg_id", "")) == shared_msg_id: + hit = message + break +if not hit and shared_msg_id: + with dm_relay._lock: + for mailbox_key, messages in dm_relay._mailboxes.items(): + for message in list(messages or []): + if str(message.msg_id or "") == shared_msg_id: + hit = {{ + "msg_id": message.msg_id, + "ciphertext": message.ciphertext, + "format": message.payload_format, + "payload_format": message.payload_format, + "session_welcome": message.session_welcome, + "mailbox_key": mailbox_key, + }} + break + if hit: + break +print(json.dumps({{ + "ok": bool(hit), + "hit": hit or {{}}, + "seen": seen, + "source": "disk_relay", +}})) +""" + return _ssh_pete_python(code, timeout_s=90) + + +def _poll_remote_shared_mailbox( + agent_id: str, + claims: list[dict], + *, + shared_msg_id: str = "", + replicate_package: dict | None = None, + disk_inject_ok: bool = False, + attempts: int = 10, + sleep_s: float = 5.0, +) -> dict: + """Find shared DM on remote via replicate package, HTTP poll, or disk relay.""" + package = dict(replicate_package or {}) + if disk_inject_ok: + hit = _shared_hit_from_replicate_package(package, shared_msg_id=shared_msg_id) + if hit: + return {"ok": True, "poll_source": "replicate_package", "hit": hit} + try: + disk = _remote_disk_shared_poll_hit(agent_id, claims, shared_msg_id=shared_msg_id) + if disk.get("ok") and disk.get("hit"): + return { + "ok": True, + "poll_source": str(disk.get("source") or "disk_relay"), + "hit": dict(disk.get("hit") or {}), + } + except Exception as exc: + print(f"step 8 disk poll skipped: {exc}") + + last_detail = "" + seen: list[str] = [] + for attempt in range(attempts): + if attempt: + time.sleep(sleep_s) + try: + polled = _remote_http_dm_poll_shared( + agent_id, + claims, + shared_msg_id=shared_msg_id, + timeout_s=45, + ) + except Exception as exc: + last_detail = str(exc) or type(exc).__name__ + print(f"step 8 shared poll attempt {attempt} http error: {last_detail}") + continue + if polled.get("ok") and polled.get("hit"): + return { + "ok": True, + "poll_source": "http", + "hit": dict(polled.get("hit") or {}), + "attempt": attempt, + } + last_detail = str(polled.get("poll_detail") or polled.get("detail") or last_detail) + seen = [str(m.get("msg_id") or "") for m in list(polled.get("messages") or [])] + + hit = _shared_hit_from_replicate_package(package, shared_msg_id=shared_msg_id) + if hit: + return {"ok": True, "poll_source": "replicate_package", "hit": hit} + + try: + disk = _remote_disk_shared_poll_hit(agent_id, claims, shared_msg_id=shared_msg_id) + if disk.get("ok") and disk.get("hit"): + return { + "ok": True, + "poll_source": str(disk.get("source") or "disk_relay"), + "hit": dict(disk.get("hit") or {}), + } + seen = list(disk.get("seen") or seen) + last_detail = str(disk.get("detail") or last_detail) + except Exception as exc: + last_detail = str(exc) or type(exc).__name__ + + return { + "ok": False, + "detail": "shared reply not in remote mailbox", + "seen": seen, + "claim_tokens": len(claims), + "poll_source": "none", + "last_poll_detail": last_detail, + } + + def _commit_local_contact_accept( peer_id: str, *, @@ -2705,13 +2982,14 @@ def main() -> int: raise RuntimeError(f"Pete wormhole did not become ready: {pete_runtime}") else: print("== prep: skip remote restart (E2E_DM_SKIP_REMOTE_PREP=1) ==") - - print("== warmup: prime Tor to Pete ==") - _warmup_tor() + print(json.dumps(_prime_remote_wormhole_join(), indent=2)) print("== warmup: enable wormhole for private DM relay ==") print(json.dumps(_prime_dm_wormhole(), indent=2)) + print("== warmup: prime Tor to Pete ==") + _warmup_tor() + print("== warmup: wait for anonymous hidden transport ==") hidden = _wait_hidden_transport_ready() print(json.dumps(hidden, indent=2)) @@ -3189,163 +3467,23 @@ print(json.dumps({{ print("waiting 15s for Pete shared mailbox settle...") time.sleep(15) - print("== step 8: Pete polls shared mailbox and decrypts reply ==") + print("== step 8: remote polls shared mailbox and decrypts reply ==") _ensure_pete_api_responsive(pete_admin, reason="step 8 shared poll") - shared_recipient_token = str(shared_send.get("recipient_token") or "") - shared_recipient_token_prev = str(shared_send.get("recipient_token_prev") or "") - shared_poll_code = f"""import json, time, hashlib, hmac, secrets, urllib.request -from services.mesh.mesh_dm_relay import dm_relay -from services.mesh.mesh_wormhole_dead_drop import derive_dead_drop_token_pair -from services.mesh.mesh_wormhole_persona import get_dm_identity - -sender_id = {json.dumps(local_sender_id)} -shared_msg_id = {json.dumps(shared_msg_id)} -marker = {json.dumps(REPLY_MARKER)} -agent_id = {json.dumps(pete_id)} -pete_agent_id = {json.dumps(pete_id)} -shared_alias = {json.dumps(str(local_accept.get("shared_alias") or ""))} -explicit_tokens = [ - {json.dumps(shared_recipient_token)}, - {json.dumps(shared_recipient_token_prev)}, -] - -# Match sender-side _default_dm_local_alias(peer_id=pete_agent_id): -# hmac(local_node_id, peer_agent_id) — NOT transport node_id. -initiator_local_alias = "" -if sender_id and pete_agent_id: - derived = hmac.new( - sender_id.encode("utf-8"), - pete_agent_id.encode("utf-8"), - hashlib.sha256, - ).hexdigest()[:12] - initiator_local_alias = "dm-" + derived - -bundle = __import__( - "services.mesh.mesh_wormhole_prekey", - fromlist=["fetch_dm_prekey_bundle"], -).fetch_dm_prekey_bundle(agent_id=sender_id) -sender_dh = str(bundle.get("dh_pub_key") or bundle.get("identity_dh_pub_key") or "") - -tokens: list[str] = [] -for token in explicit_tokens: - token = str(token or "").strip() - if token and token not in tokens: - tokens.append(token) -for peer_ref in [sender_id, shared_alias]: - peer_ref = str(peer_ref or "").strip() - if not peer_ref: - continue - token_pair = derive_dead_drop_token_pair( - peer_id=sender_id, - peer_dh_pub=sender_dh, - peer_ref=peer_ref, + shared_claims = _shared_mailbox_claims( + shared_send, + sender_id=local_sender_id, + shared_alias=str(local_accept.get("shared_alias") or ""), ) - if not token_pair.get("ok"): - continue - for token in [str(token_pair.get("current") or ""), str(token_pair.get("previous") or "")]: - if token and token not in tokens: - tokens.append(token) -if not tokens: - print(json.dumps({{"ok": False, "detail": "shared mailbox tokens unavailable"}})) - raise SystemExit(0) -claims = [{{"type": "shared", "token": token}} for token in tokens] - -{_EMBED_SIGNED_MAILBOX_HELPERS} - -hit = None -seen = [] -poll_source = "" -last_poll_detail = "" -for attempt in range(10): - if attempt: - time.sleep(5) - body, data = _build_signed_mailbox_request( - agent_id=agent_id, - event_type="dm_poll", - kind="dm_poll", - endpoint="/api/mesh/dm/poll", - sequence_domain="dm_poll", - claims=claims, + shared_poll = _poll_remote_shared_mailbox( + pete_id, + shared_claims, + shared_msg_id=shared_msg_id, + replicate_package=dict(shared_replicate.get("package") or {}), + disk_inject_ok=bool(shared_replicate.get("disk_inject", {}).get("ok")), ) - req = urllib.request.Request( - "http://127.0.0.1:8000/api/mesh/dm/poll", - data=data, - headers={{"Content-Type": "application/json"}}, - method="POST", - ) - try: - with urllib.request.urlopen(req, timeout=45) as resp: - payload = json.loads(resp.read().decode("utf-8")) - except Exception as exc: - last_poll_detail = str(exc) or type(exc).__name__ - if attempt >= 9: - break - continue - if not payload.get("ok"): - last_poll_detail = str(payload.get("detail") or last_poll_detail) - continue - messages = list(payload.get("messages") or []) - seen = [str(m.get("msg_id") or "") for m in messages] - poll_source = "http" - for message in messages: - if str(message.get("msg_id", "")) == shared_msg_id: - hit = message - break - if hit: - break - -if not hit: - with dm_relay._lock: - dm_relay._refresh_from_shared_relay() - messages, _has_more = dm_relay.collect_claims(agent_id, claims, limit=32) - seen = [str(m.get("msg_id") or "") for m in list(messages or [])] - poll_source = "disk_relay" - for message in list(messages or []): - if str(message.get("msg_id", "")) == shared_msg_id: - hit = message - break - -if not hit: - with dm_relay._lock: - dm_relay._refresh_from_shared_relay() - for mailbox_key, messages in dm_relay._mailboxes.items(): - for message in list(messages or []): - if str(message.msg_id or "") == shared_msg_id: - hit = {{ - "msg_id": message.msg_id, - "ciphertext": message.ciphertext, - "format": message.payload_format, - "payload_format": message.payload_format, - "session_welcome": message.session_welcome, - "mailbox_key": mailbox_key, - }} - poll_source = "disk_scan" - break - if hit: - break - -if not hit: - print(json.dumps({{ - "ok": False, - "detail": "shared reply not in Pete mailbox", - "seen": seen, - "claim_tokens": len(tokens), - "poll_source": poll_source or "none", - "last_poll_detail": last_poll_detail, - }})) -else: - print(json.dumps({{ - "ok": True, - "poll_source": poll_source, - "hit": hit, - "local_alias": shared_alias, - "remote_alias": initiator_local_alias, - }})) -""" - shared_poll = _ssh_pete_python(shared_poll_code, timeout_s=300) print(json.dumps(shared_poll, indent=2)) if not shared_poll.get("ok"): - raise RuntimeError(f"Pete could not find shared DM: {shared_poll}") + raise RuntimeError(f"remote could not find shared DM: {shared_poll}") hit = dict(shared_poll.get("hit") or {}) shared_alias_val = str(local_accept.get("shared_alias") or "") initiator_remote = "" @@ -3379,7 +3517,7 @@ else: ) print(json.dumps(shared_decrypt, indent=2)) if not shared_decrypt.get("ok") or REPLY_MARKER not in str(shared_decrypt.get("plaintext", "")): - raise RuntimeError(f"Pete could not decrypt shared DM: {shared_decrypt}") + raise RuntimeError(f"remote could not decrypt shared DM: {shared_decrypt}") print("== E2E PASS: invite -> accept -> private shared DM ==") return 0 diff --git a/scripts/prime_participant_wormhole.py b/scripts/prime_participant_wormhole.py index d1e9f83..593571b 100644 --- a/scripts/prime_participant_wormhole.py +++ b/scripts/prime_participant_wormhole.py @@ -19,6 +19,11 @@ write_wormhole_settings( tor = tor_service.start(target_port=8000) if tor.get("ok"): _write_env_value("MESH_ARTI_ENABLED", "true") + onion = str(tor.get("onion_address") or "").strip().rstrip("/") + if onion: + # Replicate-envelope HMAC checks X-Peer-Url against authenticated_push_peer_urls; + # fresh participants need their own onion in the push allowlist until fleet manifest sync. + _write_env_value("MESH_RELAY_PEERS", onion) get_settings.cache_clear() runtime = connect_wormhole(reason="participant_warmup") print(json.dumps({"ok": True, "tor": tor, "runtime": runtime}))