Fix fleet E2E for third participant and Tor-only shared DM delivery.

Step 8 uses live HTTP poll/decrypt instead of wedging remote python;
prime local wormhole before Tor warmup; auto-set MESH_RELAY_PEERS on
participant prime. Verified Extra run 119 and Pete Tor-only run 121.

Co-authored-by: Cursor <cursoragent@cursor.com>
This commit is contained in:
BigBodyCobain
2026-06-15 02:10:26 -06:00
parent 7151563a41
commit 5135b771f5
2 changed files with 299 additions and 156 deletions
+294 -156
View File
@@ -1706,6 +1706,33 @@ def _prime_pete_dm_wormhole() -> dict:
return _ssh_pete_python(code)
def _prime_remote_wormhole_join() -> dict:
"""Join wormhole on remote participant via in-container curl (no admin key)."""
proc = subprocess.run(
[
"ssh",
"-o",
"BatchMode=yes",
SSH_PETE,
(
"docker exec shadowbroker-backend curl -s -X POST "
"-H 'Content-Type: application/json' -d '{}' "
"http://127.0.0.1:8000/api/wormhole/join --max-time 120"
),
],
capture_output=True,
text=True,
timeout=150,
check=False,
)
if proc.returncode != 0:
return {"ok": False, "detail": proc.stderr.strip() or proc.stdout.strip() or "remote join failed"}
try:
return json.loads(proc.stdout.strip() or "{}")
except json.JSONDecodeError:
return {"ok": False, "detail": proc.stdout.strip() or "remote join invalid json"}
def _warmup_tor() -> None:
"""Prime local Arti SOCKS before fleet lookups (cold Tor can exceed lookup budgets)."""
if not PETE_ONION:
@@ -2144,6 +2171,256 @@ except Exception as exc:
return _ssh_pete_python(code, timeout_s=int(timeout_s) + 30)
def _shared_mailbox_claims(
shared_send: dict,
*,
sender_id: str = "",
shared_alias: str = "",
) -> list[dict]:
"""Build shared-lane mailbox claims from the sender's release tokens."""
tokens: list[str] = []
for key in ("recipient_token", "recipient_token_prev"):
token = str(shared_send.get(key) or "").strip()
if token and token not in tokens:
tokens.append(token)
if not tokens and sender_id:
code = f"""import json
from services.mesh.mesh_wormhole_dead_drop import derive_dead_drop_token_pair
from services.mesh.mesh_wormhole_persona import get_dm_identity
identity = get_dm_identity()
sender_dh = str(identity.get("dh_pub_key") or "")
tokens = []
for peer_ref in [{json.dumps(sender_id)}, {json.dumps(shared_alias)}]:
peer_ref = str(peer_ref or "").strip()
if not peer_ref:
continue
pair = derive_dead_drop_token_pair(
peer_id={json.dumps(sender_id)},
peer_dh_pub=sender_dh,
peer_ref=peer_ref,
)
if not pair.get("ok"):
continue
for token in [str(pair.get("current") or ""), str(pair.get("previous") or "")]:
if token and token not in tokens:
tokens.append(token)
print(json.dumps({{"ok": bool(tokens), "tokens": tokens}}))
"""
derived = _docker_python(code, timeout_s=45)
if not derived.get("ok"):
raise RuntimeError(f"shared mailbox tokens unavailable: {derived}")
tokens = [str(t) for t in list(derived.get("tokens") or []) if str(t or "").strip()]
if not tokens:
raise RuntimeError("shared mailbox tokens unavailable")
return [{"type": "shared", "token": token} for token in tokens]
def _shared_hit_from_replicate_package(package: dict, *, shared_msg_id: str = "") -> dict | None:
"""Extract poll hit fields from a scoped replicate package (host-side, no remote exec)."""
if not package.get("ok") or not package.get("body_b64"):
return None
try:
body = json.loads(base64.b64decode(str(package.get("body_b64") or "")).decode("utf-8"))
except Exception:
return None
envelope = dict(body.get("envelope") or {})
msg_id = str(envelope.get("msg_id") or "").strip()
if shared_msg_id and msg_id and msg_id != shared_msg_id:
return None
ciphertext = str(envelope.get("ciphertext") or "").strip()
if not ciphertext:
return None
payload_format = str(envelope.get("payload_format") or envelope.get("format") or "mls1")
return {
"msg_id": msg_id,
"ciphertext": ciphertext,
"format": payload_format,
"payload_format": payload_format,
"session_welcome": str(envelope.get("session_welcome") or ""),
"mailbox_key": str(envelope.get("mailbox_key") or ""),
}
def _remote_http_dm_poll_shared(
agent_id: str,
claims: list[dict],
*,
shared_msg_id: str = "",
timeout_s: int = 45,
) -> dict:
"""Poll remote participant shared mailbox via live uvicorn HTTP only."""
code = f"""import json, secrets, time, urllib.request
{_EMBED_SIGNED_MAILBOX_HELPERS}
agent_id = {json.dumps(agent_id)}
claims = {json.dumps(claims)}
shared_msg_id = {json.dumps(shared_msg_id)}
body, data = _build_signed_mailbox_request(
agent_id=agent_id,
event_type="dm_poll",
kind="dm_poll",
endpoint="/api/mesh/dm/poll",
sequence_domain="dm_poll",
claims=claims,
)
req = urllib.request.Request(
"http://127.0.0.1:8000/api/mesh/dm/poll",
data=data,
headers={{"Content-Type": "application/json"}},
method="POST",
)
try:
with urllib.request.urlopen(req, timeout={int(timeout_s)}) as resp:
payload = json.loads(resp.read().decode("utf-8"))
except Exception as exc:
print(json.dumps({{"ok": False, "detail": str(exc) or type(exc).__name__, "messages": []}}))
else:
messages = list(payload.get("messages") or [])
hit = None
for message in messages:
if shared_msg_id and str(message.get("msg_id", "")) == shared_msg_id:
hit = message
break
print(json.dumps({{
"ok": bool(hit),
"hit": hit or {{}},
"messages": messages,
"poll_detail": str(payload.get("detail", "") or ""),
"source": "http",
}}))
"""
return _ssh_pete_python(code, timeout_s=int(timeout_s) + 20)
def _remote_disk_shared_poll_hit(
agent_id: str,
claims: list[dict],
*,
shared_msg_id: str = "",
) -> dict:
"""Read shared mailbox from persisted dm_relay on remote (short script, no HTTP)."""
code = f"""import json
from services.mesh.mesh_dm_relay import dm_relay
agent_id = {json.dumps(agent_id)}
claims = {json.dumps(claims)}
shared_msg_id = {json.dumps(shared_msg_id)}
hit = None
seen = []
with dm_relay._lock:
dm_relay._refresh_from_shared_relay()
messages, _has_more = dm_relay.collect_claims(agent_id, claims, limit=32)
seen = [str(m.get("msg_id") or "") for m in list(messages or [])]
for message in list(messages or []):
if str(message.get("msg_id", "")) == shared_msg_id:
hit = message
break
if not hit and shared_msg_id:
with dm_relay._lock:
for mailbox_key, messages in dm_relay._mailboxes.items():
for message in list(messages or []):
if str(message.msg_id or "") == shared_msg_id:
hit = {{
"msg_id": message.msg_id,
"ciphertext": message.ciphertext,
"format": message.payload_format,
"payload_format": message.payload_format,
"session_welcome": message.session_welcome,
"mailbox_key": mailbox_key,
}}
break
if hit:
break
print(json.dumps({{
"ok": bool(hit),
"hit": hit or {{}},
"seen": seen,
"source": "disk_relay",
}}))
"""
return _ssh_pete_python(code, timeout_s=90)
def _poll_remote_shared_mailbox(
agent_id: str,
claims: list[dict],
*,
shared_msg_id: str = "",
replicate_package: dict | None = None,
disk_inject_ok: bool = False,
attempts: int = 10,
sleep_s: float = 5.0,
) -> dict:
"""Find shared DM on remote via replicate package, HTTP poll, or disk relay."""
package = dict(replicate_package or {})
if disk_inject_ok:
hit = _shared_hit_from_replicate_package(package, shared_msg_id=shared_msg_id)
if hit:
return {"ok": True, "poll_source": "replicate_package", "hit": hit}
try:
disk = _remote_disk_shared_poll_hit(agent_id, claims, shared_msg_id=shared_msg_id)
if disk.get("ok") and disk.get("hit"):
return {
"ok": True,
"poll_source": str(disk.get("source") or "disk_relay"),
"hit": dict(disk.get("hit") or {}),
}
except Exception as exc:
print(f"step 8 disk poll skipped: {exc}")
last_detail = ""
seen: list[str] = []
for attempt in range(attempts):
if attempt:
time.sleep(sleep_s)
try:
polled = _remote_http_dm_poll_shared(
agent_id,
claims,
shared_msg_id=shared_msg_id,
timeout_s=45,
)
except Exception as exc:
last_detail = str(exc) or type(exc).__name__
print(f"step 8 shared poll attempt {attempt} http error: {last_detail}")
continue
if polled.get("ok") and polled.get("hit"):
return {
"ok": True,
"poll_source": "http",
"hit": dict(polled.get("hit") or {}),
"attempt": attempt,
}
last_detail = str(polled.get("poll_detail") or polled.get("detail") or last_detail)
seen = [str(m.get("msg_id") or "") for m in list(polled.get("messages") or [])]
hit = _shared_hit_from_replicate_package(package, shared_msg_id=shared_msg_id)
if hit:
return {"ok": True, "poll_source": "replicate_package", "hit": hit}
try:
disk = _remote_disk_shared_poll_hit(agent_id, claims, shared_msg_id=shared_msg_id)
if disk.get("ok") and disk.get("hit"):
return {
"ok": True,
"poll_source": str(disk.get("source") or "disk_relay"),
"hit": dict(disk.get("hit") or {}),
}
seen = list(disk.get("seen") or seen)
last_detail = str(disk.get("detail") or last_detail)
except Exception as exc:
last_detail = str(exc) or type(exc).__name__
return {
"ok": False,
"detail": "shared reply not in remote mailbox",
"seen": seen,
"claim_tokens": len(claims),
"poll_source": "none",
"last_poll_detail": last_detail,
}
def _commit_local_contact_accept(
peer_id: str,
*,
@@ -2705,13 +2982,14 @@ def main() -> int:
raise RuntimeError(f"Pete wormhole did not become ready: {pete_runtime}")
else:
print("== prep: skip remote restart (E2E_DM_SKIP_REMOTE_PREP=1) ==")
print("== warmup: prime Tor to Pete ==")
_warmup_tor()
print(json.dumps(_prime_remote_wormhole_join(), indent=2))
print("== warmup: enable wormhole for private DM relay ==")
print(json.dumps(_prime_dm_wormhole(), indent=2))
print("== warmup: prime Tor to Pete ==")
_warmup_tor()
print("== warmup: wait for anonymous hidden transport ==")
hidden = _wait_hidden_transport_ready()
print(json.dumps(hidden, indent=2))
@@ -3189,163 +3467,23 @@ print(json.dumps({{
print("waiting 15s for Pete shared mailbox settle...")
time.sleep(15)
print("== step 8: Pete polls shared mailbox and decrypts reply ==")
print("== step 8: remote polls shared mailbox and decrypts reply ==")
_ensure_pete_api_responsive(pete_admin, reason="step 8 shared poll")
shared_recipient_token = str(shared_send.get("recipient_token") or "")
shared_recipient_token_prev = str(shared_send.get("recipient_token_prev") or "")
shared_poll_code = f"""import json, time, hashlib, hmac, secrets, urllib.request
from services.mesh.mesh_dm_relay import dm_relay
from services.mesh.mesh_wormhole_dead_drop import derive_dead_drop_token_pair
from services.mesh.mesh_wormhole_persona import get_dm_identity
sender_id = {json.dumps(local_sender_id)}
shared_msg_id = {json.dumps(shared_msg_id)}
marker = {json.dumps(REPLY_MARKER)}
agent_id = {json.dumps(pete_id)}
pete_agent_id = {json.dumps(pete_id)}
shared_alias = {json.dumps(str(local_accept.get("shared_alias") or ""))}
explicit_tokens = [
{json.dumps(shared_recipient_token)},
{json.dumps(shared_recipient_token_prev)},
]
# Match sender-side _default_dm_local_alias(peer_id=pete_agent_id):
# hmac(local_node_id, peer_agent_id) — NOT transport node_id.
initiator_local_alias = ""
if sender_id and pete_agent_id:
derived = hmac.new(
sender_id.encode("utf-8"),
pete_agent_id.encode("utf-8"),
hashlib.sha256,
).hexdigest()[:12]
initiator_local_alias = "dm-" + derived
bundle = __import__(
"services.mesh.mesh_wormhole_prekey",
fromlist=["fetch_dm_prekey_bundle"],
).fetch_dm_prekey_bundle(agent_id=sender_id)
sender_dh = str(bundle.get("dh_pub_key") or bundle.get("identity_dh_pub_key") or "")
tokens: list[str] = []
for token in explicit_tokens:
token = str(token or "").strip()
if token and token not in tokens:
tokens.append(token)
for peer_ref in [sender_id, shared_alias]:
peer_ref = str(peer_ref or "").strip()
if not peer_ref:
continue
token_pair = derive_dead_drop_token_pair(
peer_id=sender_id,
peer_dh_pub=sender_dh,
peer_ref=peer_ref,
shared_claims = _shared_mailbox_claims(
shared_send,
sender_id=local_sender_id,
shared_alias=str(local_accept.get("shared_alias") or ""),
)
if not token_pair.get("ok"):
continue
for token in [str(token_pair.get("current") or ""), str(token_pair.get("previous") or "")]:
if token and token not in tokens:
tokens.append(token)
if not tokens:
print(json.dumps({{"ok": False, "detail": "shared mailbox tokens unavailable"}}))
raise SystemExit(0)
claims = [{{"type": "shared", "token": token}} for token in tokens]
{_EMBED_SIGNED_MAILBOX_HELPERS}
hit = None
seen = []
poll_source = ""
last_poll_detail = ""
for attempt in range(10):
if attempt:
time.sleep(5)
body, data = _build_signed_mailbox_request(
agent_id=agent_id,
event_type="dm_poll",
kind="dm_poll",
endpoint="/api/mesh/dm/poll",
sequence_domain="dm_poll",
claims=claims,
shared_poll = _poll_remote_shared_mailbox(
pete_id,
shared_claims,
shared_msg_id=shared_msg_id,
replicate_package=dict(shared_replicate.get("package") or {}),
disk_inject_ok=bool(shared_replicate.get("disk_inject", {}).get("ok")),
)
req = urllib.request.Request(
"http://127.0.0.1:8000/api/mesh/dm/poll",
data=data,
headers={{"Content-Type": "application/json"}},
method="POST",
)
try:
with urllib.request.urlopen(req, timeout=45) as resp:
payload = json.loads(resp.read().decode("utf-8"))
except Exception as exc:
last_poll_detail = str(exc) or type(exc).__name__
if attempt >= 9:
break
continue
if not payload.get("ok"):
last_poll_detail = str(payload.get("detail") or last_poll_detail)
continue
messages = list(payload.get("messages") or [])
seen = [str(m.get("msg_id") or "") for m in messages]
poll_source = "http"
for message in messages:
if str(message.get("msg_id", "")) == shared_msg_id:
hit = message
break
if hit:
break
if not hit:
with dm_relay._lock:
dm_relay._refresh_from_shared_relay()
messages, _has_more = dm_relay.collect_claims(agent_id, claims, limit=32)
seen = [str(m.get("msg_id") or "") for m in list(messages or [])]
poll_source = "disk_relay"
for message in list(messages or []):
if str(message.get("msg_id", "")) == shared_msg_id:
hit = message
break
if not hit:
with dm_relay._lock:
dm_relay._refresh_from_shared_relay()
for mailbox_key, messages in dm_relay._mailboxes.items():
for message in list(messages or []):
if str(message.msg_id or "") == shared_msg_id:
hit = {{
"msg_id": message.msg_id,
"ciphertext": message.ciphertext,
"format": message.payload_format,
"payload_format": message.payload_format,
"session_welcome": message.session_welcome,
"mailbox_key": mailbox_key,
}}
poll_source = "disk_scan"
break
if hit:
break
if not hit:
print(json.dumps({{
"ok": False,
"detail": "shared reply not in Pete mailbox",
"seen": seen,
"claim_tokens": len(tokens),
"poll_source": poll_source or "none",
"last_poll_detail": last_poll_detail,
}}))
else:
print(json.dumps({{
"ok": True,
"poll_source": poll_source,
"hit": hit,
"local_alias": shared_alias,
"remote_alias": initiator_local_alias,
}}))
"""
shared_poll = _ssh_pete_python(shared_poll_code, timeout_s=300)
print(json.dumps(shared_poll, indent=2))
if not shared_poll.get("ok"):
raise RuntimeError(f"Pete could not find shared DM: {shared_poll}")
raise RuntimeError(f"remote could not find shared DM: {shared_poll}")
hit = dict(shared_poll.get("hit") or {})
shared_alias_val = str(local_accept.get("shared_alias") or "")
initiator_remote = ""
@@ -3379,7 +3517,7 @@ else:
)
print(json.dumps(shared_decrypt, indent=2))
if not shared_decrypt.get("ok") or REPLY_MARKER not in str(shared_decrypt.get("plaintext", "")):
raise RuntimeError(f"Pete could not decrypt shared DM: {shared_decrypt}")
raise RuntimeError(f"remote could not decrypt shared DM: {shared_decrypt}")
print("== E2E PASS: invite -> accept -> private shared DM ==")
return 0
+5
View File
@@ -19,6 +19,11 @@ write_wormhole_settings(
tor = tor_service.start(target_port=8000)
if tor.get("ok"):
_write_env_value("MESH_ARTI_ENABLED", "true")
onion = str(tor.get("onion_address") or "").strip().rstrip("/")
if onion:
# Replicate-envelope HMAC checks X-Peer-Url against authenticated_push_peer_urls;
# fresh participants need their own onion in the push allowlist until fleet manifest sync.
_write_env_value("MESH_RELAY_PEERS", onion)
get_settings.cache_clear()
runtime = connect_wormhole(reason="participant_warmup")
print(json.dumps({"ok": True, "tor": tor, "runtime": runtime}))