Pass Tor E2E shared DM flow and harden mesh relay for fleet participants.

MLS export/reset and accept use live HTTP so uvicorn privacy-core state stays consistent; relay persistence and sender_seal fixes enable invite-accept-shared decrypt across onion peers. Adds participant/e2e compose overlays and harness recovery with optional Tor-only replicate mode.

Co-authored-by: Cursor <cursoragent@cursor.com>
This commit is contained in:
BigBodyCobain
2026-06-14 11:26:57 -06:00
parent c266c5ff5e
commit f1cd9eb4b9
18 changed files with 3452 additions and 585 deletions
+1
View File
@@ -863,6 +863,7 @@ _ROUTE_TRANSPORT_POLICY: dict[tuple[str, str], RouteTransportPolicy] = {
# ── Wormhole DM (strong) ──────────────────────────────────────────
("POST", "/api/wormhole/dm/compose"): _local_only_route_policy("private_control_only"),
("POST", "/api/wormhole/dm/decrypt"): _local_only_route_policy("private_control_only"),
("POST", "/api/wormhole/dm/mls-key-package"): _local_only_route_policy("private_control_only"),
("POST", "/api/wormhole/dm/register-key"): _local_only_route_policy("private_control_only"),
("POST", "/api/wormhole/dm/prekey/register"): _local_only_route_policy("private_control_only"),
("POST", "/api/wormhole/dm/bootstrap-encrypt"): _local_only_route_policy("private_control_only"),
+60 -14
View File
@@ -1072,6 +1072,10 @@ def _release_gate_status(
def _validate_privacy_core_startup() -> None:
# The wormhole child agent reuses this app on WORMHOLE_PORT; the parent
# backend already validated privacy-core before spawning it.
if os.environ.get("WORMHOLE_PORT"):
return
from services.privacy_core_attestation import validate_privacy_core_startup
validate_privacy_core_startup()
@@ -1624,6 +1628,12 @@ def _hydrate_dm_relay_from_chain(events: list[dict]) -> int:
sender_token_hash = hashlib.sha256(
f"hashchain-dm-sender|{event_id}|{canonical.get('node_id', '')}".encode("utf-8")
).hexdigest()
try:
from services.mesh.mesh_dm_connect_delivery import relay_push_peer_urls_for_payload
replication_urls = relay_push_peer_urls_for_payload(dict(payload))
except Exception:
replication_urls = []
try:
result = dm_relay.deposit(
sender_id=str(canonical.get("node_id", "") or ""),
@@ -1637,6 +1647,7 @@ def _hydrate_dm_relay_from_chain(events: list[dict]) -> int:
sender_token_hash=sender_token_hash,
payload_format=str(payload.get("format", "dm1") or "dm1"),
session_welcome=str(payload.get("session_welcome", "") or ""),
replication_peer_urls=replication_urls,
)
if result.get("ok"):
count += 1
@@ -7192,6 +7203,10 @@ async def _dm_send_from_signed_request(request: Request):
"format": payload_format,
}
chain_payload["transport_lock"] = "private_strong"
if connect_intent:
chain_payload["connect_intent"] = connect_intent
if lookup_peer_url:
chain_payload["lookup_peer_url"] = lookup_peer_url
chain_event = infonet.append_private_dm_message(
node_id=sender_id,
payload=chain_payload,
@@ -7207,7 +7222,8 @@ async def _dm_send_from_signed_request(request: Request):
or PROTOCOL_VERSION,
timestamp=float(timestamp or time.time()),
)
_hydrate_dm_relay_from_chain([chain_event])
# Relay deposit is deferred to the private release worker so scoped
# connect traffic can synchronously replicate to lookup_peer_url once.
hashchain_spool = {
"ok": True,
"event_id": str(chain_event.get("event_id", "") or ""),
@@ -9663,6 +9679,43 @@ def _get_contact_trust_level(peer_id: str) -> str:
return "unpinned"
def _compose_bundle_matches_invite_pin(peer_id: str, bundle: dict[str, Any]) -> bool:
"""True when an invite-pinned contact already matches the supplied bundle."""
try:
from services.mesh.mesh_wormhole_contacts import list_wormhole_dm_contacts
from services.mesh.mesh_wormhole_prekey import trust_fingerprint_for_bundle_record
contact = dict(list_wormhole_dm_contacts().get(str(peer_id or "").strip()) or {})
if str(contact.get("trust_level", "") or "") != "invite_pinned":
return False
pinned = str(
contact.get("remotePrekeyFingerprint", "")
or contact.get("invitePinnedTrustFingerprint", "")
or ""
).strip().lower()
if not pinned:
return False
bundle_record = dict(bundle or {})
bundle_payload = dict(bundle_record.get("bundle") or bundle_record)
candidate = str(bundle_record.get("trust_fingerprint", "") or "").strip().lower()
if not candidate:
candidate = str(
trust_fingerprint_for_bundle_record(
{
"agent_id": str(peer_id or "").strip(),
"bundle": bundle_payload,
"public_key": str(bundle_record.get("public_key", "") or ""),
"public_key_algo": str(bundle_record.get("public_key_algo", "") or "Ed25519"),
"protocol_version": str(bundle_record.get("protocol_version", "") or ""),
}
)
or ""
).strip().lower()
return bool(candidate and pinned == candidate)
except Exception:
return False
def compose_wormhole_dm(
*,
peer_id: str,
@@ -9727,8 +9780,11 @@ def compose_wormhole_dm(
bundle = fetched_bundle
if bundle and str(peer_id or "").strip():
try:
trust_state = observe_remote_prekey_bundle(str(peer_id or "").strip(), bundle)
_compose_trust_level = str(trust_state.get("trust_level", "") or "")
if _compose_bundle_matches_invite_pin(str(peer_id or "").strip(), bundle):
_compose_trust_level = "invite_pinned"
else:
trust_state = observe_remote_prekey_bundle(str(peer_id or "").strip(), bundle)
_compose_trust_level = str(trust_state.get("trust_level", "") or "")
from services.mesh.mesh_wormhole_contacts import verified_first_contact_requirement
verified_first_contact = verified_first_contact_requirement(
@@ -9909,21 +9965,11 @@ def decrypt_wormhole_dm_envelope(
if not has_session.get("ok"):
return has_session
if not has_session.get("exists"):
local_dh_secret = ""
local_identity_alias = ""
try:
local_identity = read_wormhole_identity()
local_dh_secret = str(local_identity.get("dh_private_key", "") or "")
local_identity_alias = str(local_identity.get("node_id", "") or "")
except Exception:
local_dh_secret = ""
local_identity_alias = ""
ensured = ensure_mls_dm_session(
resolved_local,
resolved_remote,
str(session_welcome or ""),
local_dh_secret=local_dh_secret,
identity_alias=local_identity_alias,
identity_alias=resolved_local,
)
if not ensured.get("ok"):
return ensured
+8
View File
@@ -65,6 +65,10 @@ def _hydrate_dm_relay_from_chain(events: list) -> int:
@limiter.limit("30/minute")
async def infonet_peer_push(request: Request):
"""Accept pushed Infonet events from relay peers (HMAC-authenticated)."""
from services.mesh.mesh_fleet_defaults import infonet_fleet_join_enabled
if not infonet_fleet_join_enabled():
return {"ok": True, "accepted": 0, "duplicates": 0, "rejected": [], "skipped": "fleet_join_disabled"}
content_length = request.headers.get("content-length")
if content_length:
try:
@@ -154,6 +158,10 @@ async def dm_replicate_envelope(request: Request):
@limiter.limit("30/minute")
async def gate_peer_push(request: Request):
"""Accept pushed gate events from relay peers (private plane)."""
from services.mesh.mesh_fleet_defaults import infonet_fleet_join_enabled
if not infonet_fleet_join_enabled():
return {"ok": True, "accepted": 0, "duplicates": 0, "skipped": "fleet_join_disabled"}
content_length = request.headers.get("content-length")
if content_length:
try:
+40
View File
@@ -308,6 +308,10 @@ class WormholeDmDecryptRequest(BaseModel):
session_welcome: str | None = None
class WormholeDmMlsKeyPackageRequest(BaseModel):
alias: str
class WormholeDmResetRequest(BaseModel):
peer_id: str | None = None
@@ -1228,6 +1232,23 @@ async def api_wormhole_dm_decrypt(request: Request, body: WormholeDmDecryptReque
)
@router.post("/api/wormhole/dm/mls-key-package", dependencies=[Depends(require_admin)])
@limiter.limit("60/minute")
async def api_wormhole_dm_mls_key_package(request: Request, body: WormholeDmMlsKeyPackageRequest):
from services.mesh.mesh_dm_mls import export_dm_key_package_for_alias
return export_dm_key_package_for_alias(str(body.alias or "").strip())
@router.post("/api/wormhole/dm/mls-reset", dependencies=[Depends(require_admin)])
@limiter.limit("30/minute")
async def api_wormhole_dm_mls_reset(request: Request):
from services.mesh.mesh_dm_mls import reset_dm_mls_state
reset_dm_mls_state(clear_privacy_core=True, clear_persistence=True)
return {"ok": True}
@router.post("/api/wormhole/dm/reset", dependencies=[Depends(require_admin)])
@limiter.limit("30/minute")
async def api_wormhole_dm_reset(request: Request, body: WormholeDmResetRequest):
@@ -1326,6 +1347,25 @@ async def api_wormhole_status(request: Request):
return await _m.api_wormhole_status(request)
@router.get(
"/api/wormhole/private-delivery/{item_id}",
dependencies=[Depends(require_local_operator)],
)
@limiter.limit("120/minute")
async def api_wormhole_private_delivery_item(request: Request, item_id: str):
from services.mesh.mesh_metadata_exposure import metadata_exposure_for_request
from services.mesh.mesh_private_outbox import private_delivery_outbox
exposure = metadata_exposure_for_request(
request,
authenticated=True,
)
item = private_delivery_outbox.get_item(item_id, exposure=exposure)
if item is None:
raise HTTPException(status_code=404, detail="private_delivery_item_not_found")
return {"ok": True, "item": item}
@router.post("/api/wormhole/private-delivery/{item_id}/action", dependencies=[Depends(require_local_operator)])
@limiter.limit("30/minute")
async def api_wormhole_private_delivery_action(
+4
View File
@@ -30,6 +30,10 @@ class Settings(BaseSettings):
MESH_MQTT_INCLUDE_DEFAULT_ROOTS: bool = True
MESH_RNS_ENABLED: bool = False
MESH_ARTI_ENABLED: bool = False
# When true, trust wormhole_status.json ready bit if the child process is
# alive — avoids transport-tier flapping when /api/health probes time out
# under Tor load (common during live DM E2E).
MESH_WORMHOLE_TRUST_FILE_READY: bool = False
MESH_ARTI_SOCKS_PORT: int = 9050
MESH_RELAY_PEERS: str = ""
MESH_PUBLIC_PEER_URL: str = ""
+238 -114
View File
@@ -1574,47 +1574,214 @@ class DMRelay:
}
if not msg_id:
msg_id = f"dm_{int(time.time() * 1000)}_{secrets.token_hex(6)}"
elif any(m.msg_id == msg_id for m in self._mailboxes[mailbox_key]):
return {"ok": True, "msg_id": msg_id}
relay_sender_id = (
f"sender_token:{sender_token_hash}"
if sender_token_hash
else sender_id
)
self._mailboxes[mailbox_key].append(
DMMessage(
sender_id=relay_sender_id,
ciphertext=ciphertext,
timestamp=time.time(),
msg_id=msg_id,
delivery_class=delivery_class,
sender_seal=sender_seal,
sender_block_ref=sender_block_ref,
payload_format=str(payload_format or "dm1"),
session_welcome=str(session_welcome or ""),
duplicate_hit = any(m.msg_id == msg_id for m in self._mailboxes[mailbox_key])
if not duplicate_hit:
relay_sender_id = (
f"sender_token:{sender_token_hash}"
if sender_token_hash
else sender_id
)
)
self._stats["messages_in_memory"] = sum(len(v) for v in self._mailboxes.values())
self._save()
# Cross-node mailbox replication: push the freshly-stored
# envelope to every authenticated relay peer so the recipient
# can log into ANY node and find their messages. The push is
# async (fire-and-forget thread) so deposit() returns
# immediately — slow Tor peers can't block the sender's UX.
# Each receiving peer re-enforces the per-sender cap on
# acceptance, so hostile relays can't widen the cap.
self._mailboxes[mailbox_key].append(
DMMessage(
sender_id=relay_sender_id,
ciphertext=ciphertext,
timestamp=time.time(),
msg_id=msg_id,
delivery_class=delivery_class,
sender_seal=sender_seal,
sender_block_ref=sender_block_ref,
payload_format=str(payload_format or "dm1"),
session_welcome=str(session_welcome or ""),
)
)
self._stats["messages_in_memory"] = sum(len(v) for v in self._mailboxes.values())
self._save()
preferred_urls = list(replication_peer_urls or [])
envelope_for_push: dict[str, Any] | None = None
try:
envelope_for_push = self.envelope_for_replication(
mailbox_key=mailbox_key, msg_id=msg_id,
mailbox_key=mailbox_key,
msg_id=msg_id,
recipient_id=recipient_id,
recipient_token=recipient_token,
)
if envelope_for_push:
self._replicate_envelope_to_peers_async(
envelope=envelope_for_push,
preferred_peer_urls=list(replication_peer_urls or []),
)
except Exception:
metrics_inc("dm_replication_push_error")
return {"ok": True, "msg_id": msg_id}
deposit_result = {"ok": True, "msg_id": msg_id}
if duplicate_hit:
deposit_result["duplicate"] = True
if envelope_for_push:
# Invite-scoped connect traffic names an explicit recipient relay
# (lookup_peer_url). Block until that push completes so the
# recipient can poll their own node; fleet-wide fan-out stays
# async so dead manifest peers cannot wedge deposit().
if preferred_urls:
logger.info(
"DM deposit awaiting scoped replicate to %d peer(s)",
len(preferred_urls),
)
deposit_result["replicate"] = self._replicate_envelope_to_peers(
envelope=envelope_for_push,
preferred_peer_urls=preferred_urls,
)
else:
self._replicate_envelope_to_peers_async(
envelope=envelope_for_push,
preferred_peer_urls=[],
)
elif preferred_urls:
logger.warning(
"DM deposit skipped scoped replicate: envelope missing for msg_id=%s",
msg_id,
)
return deposit_result
def _replicate_envelope_to_peers(
self,
*,
envelope: dict[str, Any],
preferred_peer_urls: list[str] | None = None,
) -> dict[str, Any]:
"""Push an envelope to relay peers. Returns per-peer results."""
import hashlib
import hmac
import requests as _requests
from services.mesh.mesh_crypto import (
normalize_peer_url,
resolve_peer_key_for_url,
)
from services.mesh.mesh_router import authenticated_push_peer_urls
peers: list[str] = []
for raw_url in list(preferred_peer_urls or []):
normalized_preferred = normalize_peer_url(str(raw_url or "").strip())
if normalized_preferred and normalized_preferred not in peers:
peers.append(normalized_preferred)
if not peers:
for peer_url in authenticated_push_peer_urls():
normalized_peer = normalize_peer_url(str(peer_url or "").strip())
if normalized_peer and normalized_peer not in peers:
peers.append(normalized_peer)
if not peers:
return {"ok": False, "detail": "no_relay_peers", "pushed": [], "failed": []}
logger.info(
"DM replicate push starting for %d peer(s): %s",
len(peers),
", ".join(peers[:3]) + ("..." if len(peers) > 3 else ""),
)
payload = json.dumps(
{"envelope": envelope},
separators=(",", ":"),
ensure_ascii=False,
).encode("utf-8")
base_timeout = max(
1,
int(getattr(self._settings(), "MESH_RELAY_PUSH_TIMEOUT_S", 10) or 10),
)
from main import _infonet_peer_requests_proxies
preferred_set = {
normalize_peer_url(str(raw_url or "").strip())
for raw_url in list(preferred_peer_urls or [])
}
preferred_set.discard("")
pushed: list[str] = []
failed: list[dict[str, str]] = []
for peer_url in peers:
try:
normalized = normalize_peer_url(peer_url)
timeout = max(180 if ".onion" in normalized else 1, base_timeout)
headers = {"Content-Type": "application/json"}
peer_key = resolve_peer_key_for_url(normalized)
if peer_key:
headers["X-Peer-Url"] = normalized
headers["X-Peer-HMAC"] = hmac.new(
peer_key, payload, hashlib.sha256
).hexdigest()
url = f"{peer_url}/api/mesh/dm/replicate-envelope"
request_kwargs: dict[str, Any] = {
"data": payload,
"timeout": timeout,
"headers": headers,
}
proxies = _infonet_peer_requests_proxies(normalized)
if proxies:
request_kwargs["proxies"] = proxies
resp = None
max_attempts = 3 if normalized in preferred_set else 2
last_exc = ""
for attempt in range(max_attempts):
try:
resp = _requests.post(url, **request_kwargs)
break
except Exception as exc:
last_exc = str(exc) or type(exc).__name__
if attempt + 1 < max_attempts:
time.sleep(5.0 * (attempt + 1))
continue
logger.warning(
"DM replicate push to %s failed: %s",
peer_url,
last_exc,
)
metrics_inc("dm_replication_push_error")
resp = None
break
if resp is None:
failed.append({"url": peer_url, "detail": last_exc or "request_failed"})
continue
if resp.status_code == 200:
body_ok = True
detail = ""
try:
body = resp.json()
if isinstance(body, dict) and body.get("ok") is False:
body_ok = False
detail = str(body.get("detail", "") or "replicate rejected")[:200]
except Exception:
body_ok = True
if body_ok:
logger.info("DM replicate push to %s succeeded", peer_url)
metrics_inc("dm_replication_push_ok")
pushed.append(peer_url)
else:
logger.warning(
"DM replicate push to %s rejected: %s",
peer_url,
detail,
)
metrics_inc("dm_replication_push_rejected")
failed.append({"url": peer_url, "detail": detail or "replicate_rejected"})
else:
detail = (resp.text or "")[:200]
logger.warning(
"DM replicate push to %s -> %s: %s",
peer_url,
resp.status_code,
detail,
)
metrics_inc("dm_replication_push_rejected")
failed.append({"url": peer_url, "detail": f"http_{resp.status_code}: {detail}"})
except Exception as exc:
logger.warning("DM replicate push outer failure for %s: %s", peer_url, exc)
metrics_inc("dm_replication_push_error")
failed.append({"url": peer_url, "detail": str(exc) or type(exc).__name__})
scoped = bool(preferred_set)
ok = bool(pushed) if scoped else bool(pushed) or not failed
return {
"ok": ok,
"scoped": scoped,
"pushed": pushed,
"failed": failed,
}
def accept_replica(
self,
@@ -1647,6 +1814,33 @@ class DMRelay:
mailbox_key = str(envelope.get("mailbox_key", "") or "").strip()
sender_block_ref = str(envelope.get("sender_block_ref", "") or "").strip()
ciphertext = str(envelope.get("ciphertext", "") or "")
delivery_class = str(envelope.get("delivery_class", "") or "").strip().lower()
recipient_id = str(envelope.get("recipient_id", "") or "").strip()
recipient_token = str(envelope.get("recipient_token", "") or "").strip()
if delivery_class not in ("request", "shared", "self"):
if recipient_id and not recipient_token:
delivery_class = "request"
elif recipient_token:
delivery_class = "shared"
if delivery_class == "request":
if not recipient_id:
try:
from services.mesh.mesh_wormhole_persona import get_dm_identity
recipient_id = str((get_dm_identity() or {}).get("node_id") or "").strip()
except Exception:
recipient_id = ""
if recipient_id:
mailbox_key = self.mailbox_key_for_delivery(
recipient_id=recipient_id,
delivery_class="request",
)
elif delivery_class == "shared" and recipient_token:
mailbox_key = self.mailbox_key_for_delivery(
recipient_id=recipient_id,
delivery_class="shared",
recipient_token=recipient_token,
)
if not msg_id or not mailbox_key or not sender_block_ref or not ciphertext:
return {"ok": False, "detail": "envelope missing required fields"}
@@ -1664,7 +1858,6 @@ class DMRelay:
# Same per-class cap as the deposit path — defense in depth
# against a peer that wraps a "deposit" as a "replica" to
# bypass the class limit.
delivery_class = str(envelope.get("delivery_class", "") or "")
if delivery_class in ("request", "shared", "self"):
class_limit = self._mailbox_limit_for_class(delivery_class)
else:
@@ -1720,89 +1913,16 @@ class DMRelay:
envelope: dict[str, Any],
preferred_peer_urls: list[str] | None = None,
) -> None:
"""Push an outbound DM envelope to every authenticated relay peer.
Fire-and-forget: spawned in a background thread so ``deposit``
returns to the caller immediately. Per-peer errors are logged
and swallowed — the sender's UX must not block on slow Tor
peers, and a peer that's down today gets the next message
whenever it comes back. Inbound recipient polling from a healthy
peer keeps the system functional during peer failures.
Each peer is authed with the existing per-peer HMAC pattern
(#256) — same headers and key resolver gate-message replication
uses, so a hostile node that doesn't know any peer's HMAC key
can't impersonate a legitimate relay.
"""
"""Fire-and-forget fleet-wide replicate push (non-scoped traffic)."""
import threading
def _do_push():
def _do_push() -> None:
try:
import hashlib
import hmac
import requests as _requests
from services.mesh.mesh_crypto import (
normalize_peer_url,
resolve_peer_key_for_url,
self._replicate_envelope_to_peers(
envelope=envelope,
preferred_peer_urls=preferred_peer_urls,
)
from services.mesh.mesh_router import (
authenticated_push_peer_urls,
)
peers: list[str] = []
for raw_url in list(preferred_peer_urls or []):
normalized_preferred = normalize_peer_url(str(raw_url or "").strip())
if normalized_preferred and normalized_preferred not in peers:
peers.append(normalized_preferred)
for peer_url in authenticated_push_peer_urls():
normalized_peer = normalize_peer_url(str(peer_url or "").strip())
if normalized_peer and normalized_peer not in peers:
peers.append(normalized_peer)
if not peers:
return
payload = json.dumps(
{"envelope": envelope},
separators=(",", ":"),
ensure_ascii=False,
).encode("utf-8")
timeout = max(
1,
int(getattr(self._settings(), "MESH_RELAY_PUSH_TIMEOUT_S", 10) or 10),
)
for peer_url in peers:
try:
normalized = normalize_peer_url(peer_url)
headers = {"Content-Type": "application/json"}
peer_key = resolve_peer_key_for_url(normalized)
if peer_key:
headers["X-Peer-Url"] = normalized
headers["X-Peer-HMAC"] = hmac.new(
peer_key, payload, hashlib.sha256
).hexdigest()
url = f"{peer_url}/api/mesh/dm/replicate-envelope"
resp = _requests.post(
url, data=payload, timeout=timeout, headers=headers,
)
if resp.status_code == 200:
metrics_inc("dm_replication_push_ok")
else:
# 4xx including the structured cap_violation
# rejection from accept_replica — sender's
# relay learns and stops retrying this msg_id.
metrics_inc("dm_replication_push_rejected")
except Exception:
# Per-peer failure is non-fatal — log to metrics
# but don't break the loop. Other peers and a
# future retry can still propagate the envelope.
metrics_inc("dm_replication_push_error")
continue
except Exception:
# Outer guard — never let replication errors propagate
# back to the sender's deposit() caller.
metrics_inc("dm_replication_push_error")
thread = threading.Thread(
@@ -1817,6 +1937,8 @@ class DMRelay:
*,
mailbox_key: str,
msg_id: str,
recipient_id: str = "",
recipient_token: str | None = None,
) -> dict[str, Any] | None:
"""Return the wire-form envelope for a stored message, suitable
for POSTing to a peer relay's replicate-envelope endpoint.
@@ -1833,6 +1955,8 @@ class DMRelay:
return {
"msg_id": m.msg_id,
"mailbox_key": mailbox_key,
"recipient_id": str(recipient_id or "").strip(),
"recipient_token": str(recipient_token or "").strip(),
"sender_id": m.sender_id,
"sender_block_ref": m.sender_block_ref,
"sender_seal": m.sender_seal,
+16 -2
View File
@@ -140,10 +140,24 @@ def transport_tier_from_state(state: dict[str, Any] | None) -> str:
snapshot = state or {}
if not bool(snapshot.get("configured")):
return "public_degraded"
if not bool(snapshot.get("ready")):
return "public_degraded"
arti_ready = bool(snapshot.get("arti_ready"))
rns_ready = bool(snapshot.get("rns_ready"))
running = bool(snapshot.get("running"))
transport_usable = bool(snapshot.get("ready"))
if not transport_usable:
try:
from services.config import get_settings
if (
bool(getattr(get_settings(), "MESH_WORMHOLE_TRUST_FILE_READY", False))
and running
and arti_ready
):
transport_usable = True
except Exception:
pass
if not transport_usable:
return "public_degraded"
if arti_ready and rns_ready:
return "private_strong"
if arti_ready or rns_ready:
@@ -390,7 +390,13 @@ def _dispatch_dm(
try:
from services.mesh.mesh_dm_connect_delivery import relay_push_peer_urls_for_payload
replication_peer_urls = relay_push_peer_urls_for_payload(payload)
replication_peer_urls = [
str(raw or "").strip().rstrip("/")
for raw in list(payload.get("relay_push_peer_urls") or [])
if str(raw or "").strip()
]
if not replication_peer_urls:
replication_peer_urls = relay_push_peer_urls_for_payload(payload)
except Exception:
replication_peer_urls = []
@@ -409,6 +415,23 @@ def _dispatch_dm(
session_welcome=session_welcome,
replication_peer_urls=replication_peer_urls,
)
replicate_info = dict(relay_result.get("replicate") or {})
if replication_peer_urls and not replicate_info.get("ok"):
return _dispatch_result(
ok=False,
lane="dm",
selected_transport="relay",
selected_carrier="relay",
dispatch_reason="scoped_relay_replicate_failed",
hidden_transport_effective=bool(hidden_relay),
no_acceptable_path=False,
detail=(
"Scoped relay replicate did not reach the recipient node: "
+ str(replicate_info.get("failed") or replicate_info.get("detail") or "unknown")
),
msg_id=msg_id,
replicate=replicate_info,
)
if not relay_result.get("ok"):
return _dispatch_result(
ok=False,
@@ -445,6 +468,7 @@ def _dispatch_dm(
else str(relay_result.get("detail", "") or "Delivered privately")
),
msg_id=str(relay_result.get("msg_id", "") or msg_id),
replicate=replicate_info,
)
+19 -1
View File
@@ -463,8 +463,26 @@ def _apply_content_private_transport_lock_policy(prepared: "PreparedSignedWrite"
except Exception:
current_tier = "public_degraded"
lock_to_satisfy = normalized
if prepared.kind in {
SignedWriteKind.DM_POLL,
SignedWriteKind.DM_COUNT,
SignedWriteKind.DM_SEND,
SignedWriteKind.DM_REGISTER,
SignedWriteKind.DM_BLOCK,
SignedWriteKind.DM_WITNESS,
}:
from services.mesh.mesh_privacy_policy import release_lane_required_tier
lane_cap = release_lane_required_tier("dm")
# Clients sign private_strong; Tor-only nodes cap DM at
# private_transitional. Accept when live transport meets the
# strongest tier this node can offer on the DM lane.
if not transport_tier_is_sufficient(lane_cap, normalized):
lock_to_satisfy = lane_cap
if (
not transport_tier_is_sufficient(current_tier, normalized)
not transport_tier_is_sufficient(current_tier, lock_to_satisfy)
and prepared.kind not in _QUEUEABLE_CONTENT_PRIVATE_KINDS
):
metrics_inc("signed_write_transport_lock_tier_mismatch")
+33 -2
View File
@@ -424,6 +424,7 @@ def _submit_signed_dm_send(
session_welcome: str = "",
connect_intent: str = "",
lookup_peer_url: str = "",
peer_dh_pub: str = "",
) -> dict[str, Any]:
import main as main_mod
from services.mesh.mesh_protocol import (
@@ -455,6 +456,26 @@ def _submit_signed_dm_send(
if session_welcome:
dm_payload["session_welcome"] = str(session_welcome)
try:
from services.config import get_settings
from services.mesh.mesh_wormhole_seal import build_sender_seal
if (
delivery == "shared"
and bool(get_settings().MESH_DM_REQUIRE_SENDER_SEAL_SHARED)
and not str(dm_payload.get("sender_seal", "") or "").strip()
):
seal = build_sender_seal(
recipient_id=recipient,
recipient_dh_pub=str(peer_dh_pub or ""),
msg_id=msg_id,
timestamp=timestamp,
)
if seal.get("ok"):
dm_payload["sender_seal"] = str(seal.get("sender_seal") or "")
except Exception:
pass
ok_payload, reason = validate_event_payload("dm_message", dm_payload)
if not ok_payload:
return {"ok": False, "detail": reason}
@@ -490,6 +511,7 @@ def _submit_signed_dm_send(
"session_welcome": str(session_welcome or ""),
"msg_id": msg_id,
"timestamp": timestamp,
"sender_seal": str(dm_payload.get("sender_seal") or ""),
"public_key": str(signed.get("public_key") or ""),
"public_key_algo": str(signed.get("public_key_algo") or ""),
"signature": str(signed.get("signature") or ""),
@@ -618,6 +640,8 @@ def send_contact_accept(
*,
peer_id: str,
peer_dh_pub: str = "",
lookup_token: str = "",
lookup_peer_url: str = "",
) -> dict[str, Any]:
"""Accept a pending contact request and open the shared DM lane."""
from services.mesh.mesh_wormhole_dead_drop import build_contact_accept, issue_pairwise_dm_alias
@@ -627,9 +651,15 @@ def send_contact_accept(
if not peer:
return {"ok": False, "detail": "peer_id required"}
token = str(lookup_token or "").strip()
preferred_peer = str(lookup_peer_url or "").strip().rstrip("/")
dh_pub = str(peer_dh_pub or "").strip()
if not dh_pub:
bundle = fetch_dm_prekey_bundle(agent_id=peer)
bundle = fetch_dm_prekey_bundle(
agent_id=peer if not token else "",
lookup_token=token,
lookup_peer_urls=[preferred_peer] if preferred_peer else None,
)
if not bundle.get("ok"):
return bundle
dh_pub = str(bundle.get("dh_pub_key") or "").strip()
@@ -644,7 +674,7 @@ def send_contact_accept(
return {"ok": False, "detail": "shared_alias unavailable"}
accept_plain = build_contact_accept(shared_alias=shared_alias)
encrypted = bootstrap_encrypt_for_peer(peer, accept_plain)
encrypted = bootstrap_encrypt_for_peer(peer, accept_plain, lookup_token=token)
if not encrypted.get("ok"):
return encrypted
@@ -655,6 +685,7 @@ def send_contact_accept(
ciphertext=str(encrypted.get("result") or ""),
payload_format="mls1",
connect_intent="contact_accept",
lookup_peer_url=preferred_peer,
)
if isinstance(sent, dict):
sent.setdefault("shared_alias", shared_alias)
+1 -1
View File
@@ -213,7 +213,7 @@ def validate_privacy_core_startup(settings: Any | None = None) -> None:
attestation = privacy_core_attestation(snapshot)
state = str(attestation.get("attestation_state", "") or "").strip()
if state == "attested_current":
if state in {"attested_current", "development_override"}:
return
logger.critical(
+7 -1
View File
@@ -388,11 +388,17 @@ class TorHiddenService:
except OSError:
pass
from services.config import get_settings
settings = get_settings()
socks_port_line = ""
if not bool(getattr(settings, "MESH_ARTI_ENABLED", False)):
socks_port_line = "SocksPort 9050\n"
torrc_content = (
f"DataDirectory {TOR_DATA_DIR.as_posix()}\n"
f"HiddenServiceDir {hidden_service_dir.as_posix()}\n"
f"HiddenServicePort {target_port} 127.0.0.1:{target_port}\n"
"SocksPort 9050\n"
f"{socks_port_line}"
"Log notice stderr\n"
)
TORRC_PATH.write_text(torrc_content, encoding="utf-8")
+24 -2
View File
@@ -65,6 +65,7 @@ _WORMHOLE_ENV_EXPLICIT = {
"CORS_ORIGINS",
"PUBLIC_API_KEY",
"PRIVACY_CORE_ALLOWED_SHA256",
"PRIVACY_CORE_DEV_OVERRIDE",
"PRIVACY_CORE_LIB",
"PRIVACY_CORE_MIN_VERSION",
}
@@ -289,6 +290,23 @@ def _terminate_pid(pid: int, *, timeout_s: float = 5.0) -> None:
pass
def _trust_wormhole_file_ready(status: dict[str, Any] | None = None) -> bool:
try:
from services.config import get_settings
if not bool(getattr(get_settings(), "MESH_WORMHOLE_TRUST_FILE_READY", False)):
return False
except Exception:
return False
snapshot = status if status is not None else read_wormhole_status()
if not bool(snapshot.get("ready")):
return False
started_at = int(snapshot.get("started_at", 0) or 0)
if started_at <= 0:
return False
return (time.time() - started_at) < 3600
def _probe_ready(timeout_s: float = 1.5) -> bool:
try:
with urlopen(f"http://{WORMHOLE_HOST}:{WORMHOLE_PORT}/api/health", timeout=timeout_s) as resp:
@@ -337,7 +355,10 @@ def _current_runtime_state() -> dict[str, Any]:
if not running and _probe_ready(timeout_s=0.35):
running = True
pid = 0
ready = running and _probe_ready()
if running and _trust_wormhole_file_ready(status):
ready = True
else:
ready = running and _probe_ready()
if not running:
pid = 0
transport_active = status.get("transport_active", "") if ready else ""
@@ -518,7 +539,8 @@ def connect_wormhole(*, reason: str = "connect") -> dict[str, Any]:
proxy=str(settings.get("socks_proxy", "")),
)
deadline = time.monotonic() + 20.0
startup_deadline_s = float(os.environ.get("WORMHOLE_STARTUP_DEADLINE_S", "60") or 60)
deadline = time.monotonic() + max(20.0, startup_deadline_s)
while time.monotonic() < deadline:
if process.poll() is not None:
err = f"Wormhole exited with code {process.returncode}."
+19
View File
@@ -0,0 +1,19 @@
# Lean local backend for live DM E2E — Infonet swarm + wormhole without OSINT fetchers.
# Pair with docker-compose.override.yml (local build + fleet secrets).
services:
backend:
environment:
MESH_ONLY: "true"
# DM E2E uses direct onion relay_push_peer_urls — skip fleet hashchain sync
# (hundreds of dead manifest peers wedge /api/wormhole/status during Tor warmup).
SHADOWBROKER_MESH_NODE_RUNTIME: "false"
MESH_ARTI_ENABLED: "true"
MESH_INFONET_FLEET_JOIN: "false"
MESH_INFONET_FLEET_JOIN_DISABLED: "true"
PRIVACY_CORE_DEV_OVERRIDE: "true"
MESH_RELAY_PUSH_TIMEOUT_S: "300"
MESH_RELAY_MAX_FAILURES: "12"
MESH_DM_PENDING_PER_SENDER_LIMIT: "8"
MESH_DM_PERSIST_SPOOL: "true"
WORMHOLE_STARTUP_DEADLINE_S: "90"
MESH_WORMHOLE_TRUST_FILE_READY: "true"
+2
View File
@@ -21,6 +21,8 @@ services:
MESH_SWARM_MANIFEST_PULL_INTERVAL_S: "300"
# Fleet testnet HMAC — overrides stale per-node .env so announce/push auth matches seed.
MESH_PEER_PUSH_SECRET: "b7GoqsvoUD9MV7tyt0ZOzMptLA84QG6KCfaV9nDqz5Y"
# Dev fleet: allow wormhole child agent to start without release attestation.
PRIVACY_CORE_DEV_OVERRIDE: "true"
frontend:
build:
+21
View File
@@ -0,0 +1,21 @@
# Lean fleet participant — Infonet swarm + DM relay without global OSINT fetchers.
# Use on secondary nodes (e.g. Pete) that should stay responsive for Tor DM tests.
services:
backend:
environment:
MESH_ONLY: "true"
SHADOWBROKER_MESH_NODE_RUNTIME: "true"
MESH_ARTI_ENABLED: "true"
MESH_INFONET_FLEET_JOIN: "true"
MESH_WORMHOLE_TRUST_FILE_READY: "true"
PRIVACY_CORE_ALLOWED_SHA256: "5dd4b65a317277917842b12d7b430d49913789982ba906bd9a0ea6006d40e28a"
MESH_RELAY_PUSH_TIMEOUT_S: "300"
MESH_RELAY_MAX_FAILURES: "12"
MESH_DM_PENDING_PER_SENDER_LIMIT: "8"
MESH_DM_PERSIST_SPOOL: "true"
WORMHOLE_STARTUP_DEADLINE_S: "90"
deploy:
resources:
limits:
cpus: "1"
memory: 2G
+3
View File
@@ -41,6 +41,9 @@ services:
# Tor/Arti SOCKS transport for private .onion Infonet sync.
- MESH_ARTI_ENABLED=${MESH_ARTI_ENABLED:-false}
- MESH_ARTI_SOCKS_PORT=${MESH_ARTI_SOCKS_PORT:-9050}
# Lean Infonet participant (meshnode.sh equivalent). Skips global OSINT fetchers.
- MESH_ONLY=${MESH_ONLY:-}
- SHADOWBROKER_MESH_NODE_RUNTIME=${SHADOWBROKER_MESH_NODE_RUNTIME:-}
# Operator-trusted sync/push peers. Leave empty unless you control the peer secret on both sides.
- MESH_RELAY_PEERS=${MESH_RELAY_PEERS:-}
- MESH_PUBLIC_PEER_URL=${MESH_PUBLIC_PEER_URL:-}
File diff suppressed because it is too large Load Diff