diff --git a/backend/routers/mesh_peer_sync.py b/backend/routers/mesh_peer_sync.py index fbad9bf..4427803 100644 --- a/backend/routers/mesh_peer_sync.py +++ b/backend/routers/mesh_peer_sync.py @@ -85,6 +85,64 @@ async def infonet_peer_push(request: Request): return {"ok": True, **result} +@router.post("/api/mesh/dm/replicate-envelope") +@limiter.limit("60/minute") +async def dm_replicate_envelope(request: Request): + """Accept a DM envelope replicated from a peer relay (cross-node mailbox). + + Companion endpoint to ``DMRelay.replicate_to_peers`` (outbound, in + ``mesh_dm_relay.py``). The sender's relay POSTs an encrypted DM + envelope here after a successful local ``deposit``; this endpoint + re-enforces the per-(sender, recipient) anti-spam cap and stores + the envelope in the local mailbox if accepted. + + The cap is the network rule: a hostile sender's relay can spool + extras locally, but every honest peer enforces the cap on inbound + replication. Recipient polling from any honest peer therefore + never sees more than ``MESH_DM_PENDING_PER_SENDER_LIMIT`` pending + from any one sender, no matter how many spam attempts were tried. + + Same HMAC auth pattern as ``infonet_peer_push`` and ``gate_peer_push``. + """ + content_length = request.headers.get("content-length") + if content_length: + try: + # DM envelopes are bounded by MESH_DM_MAX_MSG_BYTES + envelope + # overhead; 64 KB is a generous ceiling. + if int(content_length) > 65_536: + return Response( + content='{"ok":false,"detail":"Request body too large (max 64KB)"}', + status_code=413, media_type="application/json", + ) + except (ValueError, TypeError): + pass + body_bytes = await request.body() + if not _verify_peer_push_hmac(request, body_bytes): + return Response( + content='{"ok":false,"detail":"Invalid or missing peer HMAC"}', + status_code=403, media_type="application/json", + ) + try: + body = json_mod.loads(body_bytes or b"{}") + except (ValueError, TypeError): + return Response( + content='{"ok":false,"detail":"Invalid JSON body"}', + status_code=400, media_type="application/json", + ) + envelope = body.get("envelope") + if not isinstance(envelope, dict): + return {"ok": False, "detail": "envelope must be an object"} + + originating_peer = _peer_hmac_url_from_request(request) or "" + + from services.mesh.mesh_dm_relay import dm_relay + result = dm_relay.accept_replica( + envelope=envelope, + originating_peer_url=originating_peer, + ) + return result + + @router.post("/api/mesh/gate/peer-push") @limiter.limit("30/minute") async def gate_peer_push(request: Request): diff --git a/backend/services/mesh/mesh_dm_relay.py b/backend/services/mesh/mesh_dm_relay.py index ad97249..cc12b0c 100644 --- a/backend/services/mesh/mesh_dm_relay.py +++ b/backend/services/mesh/mesh_dm_relay.py @@ -1595,6 +1595,23 @@ class DMRelay: ) self._stats["messages_in_memory"] = sum(len(v) for v in self._mailboxes.values()) self._save() + # Cross-node mailbox replication: push the freshly-stored + # envelope to every authenticated relay peer so the recipient + # can log into ANY node and find their messages. The push is + # async (fire-and-forget thread) so deposit() returns + # immediately — slow Tor peers can't block the sender's UX. + # Each receiving peer re-enforces the per-sender cap on + # acceptance, so hostile relays can't widen the cap. + try: + envelope_for_push = self.envelope_for_replication( + mailbox_key=mailbox_key, msg_id=msg_id, + ) + if envelope_for_push: + self._replicate_envelope_to_peers_async( + envelope=envelope_for_push, + ) + except Exception: + metrics_inc("dm_replication_push_error") return {"ok": True, "msg_id": msg_id} def accept_replica( @@ -1695,6 +1712,95 @@ class DMRelay: metrics_inc("dm_replica_accepted") return {"ok": True, "msg_id": msg_id} + def _replicate_envelope_to_peers_async( + self, + *, + envelope: dict[str, Any], + ) -> None: + """Push an outbound DM envelope to every authenticated relay peer. + + Fire-and-forget: spawned in a background thread so ``deposit`` + returns to the caller immediately. Per-peer errors are logged + and swallowed — the sender's UX must not block on slow Tor + peers, and a peer that's down today gets the next message + whenever it comes back. Inbound recipient polling from a healthy + peer keeps the system functional during peer failures. + + Each peer is authed with the existing per-peer HMAC pattern + (#256) — same headers and key resolver gate-message replication + uses, so a hostile node that doesn't know any peer's HMAC key + can't impersonate a legitimate relay. + """ + import threading + + def _do_push(): + try: + import hashlib + import hmac + import requests as _requests + + from services.mesh.mesh_crypto import ( + normalize_peer_url, + resolve_peer_key_for_url, + ) + from services.mesh.mesh_router import ( + authenticated_push_peer_urls, + ) + + peers = authenticated_push_peer_urls() + if not peers: + return + + payload = json.dumps( + {"envelope": envelope}, + separators=(",", ":"), + ensure_ascii=False, + ).encode("utf-8") + + timeout = max( + 1, + int(getattr(self._settings(), "MESH_RELAY_PUSH_TIMEOUT_S", 10) or 10), + ) + + for peer_url in peers: + try: + normalized = normalize_peer_url(peer_url) + headers = {"Content-Type": "application/json"} + peer_key = resolve_peer_key_for_url(normalized) + if peer_key: + headers["X-Peer-Url"] = normalized + headers["X-Peer-HMAC"] = hmac.new( + peer_key, payload, hashlib.sha256 + ).hexdigest() + url = f"{peer_url}/api/mesh/dm/replicate-envelope" + resp = _requests.post( + url, data=payload, timeout=timeout, headers=headers, + ) + if resp.status_code == 200: + metrics_inc("dm_replication_push_ok") + else: + # 4xx including the structured cap_violation + # rejection from accept_replica — sender's + # relay learns and stops retrying this msg_id. + metrics_inc("dm_replication_push_rejected") + except Exception: + # Per-peer failure is non-fatal — log to metrics + # but don't break the loop. Other peers and a + # future retry can still propagate the envelope. + metrics_inc("dm_replication_push_error") + continue + except Exception: + # Outer guard — never let replication errors propagate + # back to the sender's deposit() caller. + metrics_inc("dm_replication_push_error") + + thread = threading.Thread( + target=_do_push, + name="dm-replicate-push", + daemon=True, + ) + thread.start() + def envelope_for_replication( self, *, diff --git a/backend/tests/test_dm_replicate_envelope_endpoint.py b/backend/tests/test_dm_replicate_envelope_endpoint.py new file mode 100644 index 0000000..a1f5772 --- /dev/null +++ b/backend/tests/test_dm_replicate_envelope_endpoint.py @@ -0,0 +1,150 @@ +"""POST /api/mesh/dm/replicate-envelope — receiving side of cross-node DM +mailbox replication. + +This is the endpoint that peer relays call when they want to hand off an +encrypted DM envelope to us (so the recipient can log into our node and +find their messages). It re-enforces the per-(sender, recipient) anti-spam +cap so hostile sender relays can't widen the cap by skipping the local +check on their own deposit path. + +The endpoint: + + * authenticates the caller via the existing per-peer HMAC pattern + (same one /api/mesh/infonet/peer-push and /api/mesh/gate/peer-push + use, introduced in #256 — ``X-Peer-Url`` + ``X-Peer-HMAC`` headers + keyed off ``resolve_peer_key_for_url``) + * rejects bodies > 64 KB (DM envelope size is bounded by + ``MESH_DM_MAX_MSG_BYTES`` — 64KB ceiling has generous headroom) + * rejects requests without a valid peer HMAC with 403 + * passes the envelope to ``DMRelay.accept_replica`` which enforces + the cap + +This file pins the endpoint contract. The cap enforcement itself is +tested in ``test_dm_relay_per_sender_cap.py`` against the relay's +``accept_replica`` method directly. +""" + +from __future__ import annotations + +import asyncio +import hashlib +import hmac +import json + +import pytest +from httpx import ASGITransport, AsyncClient + + +@pytest.fixture +def remote_client(): + """ASGI client with peer IP 1.2.3.4 — never on the local-operator + allowlist. Used to prove the endpoint isn't accidentally reachable + by random remote callers without peer HMAC.""" + from main import app + + class _RemoteClient: + def __init__(self): + self._loop = asyncio.new_event_loop() + self._transport = ASGITransport(app=app, client=("1.2.3.4", 12345)) + self._base = "http://1.2.3.4:8000" + + def post(self, url, **kw): + async def go(): + async with AsyncClient(transport=self._transport, base_url=self._base) as ac: + return await ac.post(url, **kw) + return self._loop.run_until_complete(go()) + + def close(self): + self._loop.close() + + c = _RemoteClient() + yield c + c.close() + + +class TestReplicateEndpointAuth: + def test_rejects_request_without_peer_hmac(self, remote_client): + """A peer push that does NOT carry X-Peer-Url + X-Peer-HMAC + must be rejected with 403 before the envelope is ever passed + to the relay. Same gate the existing infonet/gate peer-push + endpoints enforce.""" + payload = { + "envelope": { + "msg_id": "dm_unauth_1", + "mailbox_key": "mb", + "sender_block_ref": "sender", + "ciphertext": "x", + }, + } + r = remote_client.post( + "/api/mesh/dm/replicate-envelope", + json=payload, + ) + assert r.status_code == 403 + assert "peer HMAC" in r.text or "peer hmac" in r.text.lower() + + def test_rejects_wrong_peer_hmac(self, remote_client, monkeypatch): + """A request with a peer HMAC header keyed off the WRONG secret + is rejected. Confirms the HMAC is actually verified — a tampered + body or a key-substitution attack doesn't sneak through.""" + # Plant a known peer secret. The request will sign with a + # DIFFERENT key, so verification must fail. + from services.config import get_settings + monkeypatch.setenv("MESH_PEER_PUSH_SECRET", "real-secret-32-chars-min-padding-padding") + get_settings.cache_clear() + + body = json.dumps({ + "envelope": { + "msg_id": "dm_wronghmac", + "mailbox_key": "mb", + "sender_block_ref": "sender", + "ciphertext": "x", + }, + }).encode("utf-8") + wrong_hmac = hmac.new(b"wrong-key", body, hashlib.sha256).hexdigest() + r = remote_client.post( + "/api/mesh/dm/replicate-envelope", + content=body, + headers={ + "Content-Type": "application/json", + "X-Peer-Url": "http://example-peer.onion:8000", + "X-Peer-HMAC": wrong_hmac, + }, + ) + assert r.status_code == 403 + + def test_rejects_oversize_body(self, remote_client): + """64 KB ceiling — anything bigger doesn't even get parsed. + Defends against memory amplification via giant ciphertexts.""" + # 100 KB body is well over the 64 KB cap. + big = b"{" + b"x" * 100_000 + b"}" + r = remote_client.post( + "/api/mesh/dm/replicate-envelope", + content=big, + headers={ + "Content-Type": "application/json", + "Content-Length": str(len(big)), + }, + ) + assert r.status_code in (400, 413), ( + f"oversize body should be rejected with 400/413, got {r.status_code}" + ) + + +class TestReplicateEndpointRegistered: + def test_route_present_in_app(self): + """Static check that the route is actually wired into the app. + Catches a future refactor that drops the router include or + deletes the endpoint by accident.""" + from main import app + + paths_methods = set() + for route in app.routes: + path = getattr(route, "path", None) + methods = getattr(route, "methods", set()) or set() + for m in methods: + paths_methods.add((m, path)) + + assert ("POST", "/api/mesh/dm/replicate-envelope") in paths_methods, ( + "POST /api/mesh/dm/replicate-envelope is not registered on the app" + )