diff --git a/backend/routers/mesh_peer_sync.py b/backend/routers/mesh_peer_sync.py
index fbad9bf..4427803 100644
--- a/backend/routers/mesh_peer_sync.py
+++ b/backend/routers/mesh_peer_sync.py
@@ -85,6 +85,100 @@ async def infonet_peer_push(request: Request):
     return {"ok": True, **result}
 
 
+@router.post("/api/mesh/dm/replicate-envelope")
+@limiter.limit("60/minute")
+async def dm_replicate_envelope(request: Request):
+    """Accept a DM envelope replicated from a peer relay (cross-node mailbox).
+
+    Companion endpoint to ``DMRelay._replicate_envelope_to_peers_async``
+    (outbound, in ``mesh_dm_relay.py``). The sender's relay POSTs an
+    encrypted DM envelope here after a successful local ``deposit``; this
+    endpoint hands it to ``DMRelay.accept_replica``, which re-enforces the
+    per-(sender, recipient) anti-spam cap and stores the envelope in the
+    local mailbox if accepted.
+
+    The cap is the network rule: a hostile sender's relay can spool
+    extras locally, but every honest peer enforces the cap on inbound
+    replication. Recipient polling from any honest peer therefore
+    never sees more than ``MESH_DM_PENDING_PER_SENDER_LIMIT`` pending
+    from any one sender, no matter how many spam attempts were tried.
+
+    Same HMAC auth pattern as ``infonet_peer_push`` and ``gate_peer_push``.
+
+    Responses:
+        200 -- envelope stored (or already held: ``duplicate: true``).
+        400 -- body is not valid JSON, is not a JSON object, or carries no
+            ``envelope`` object.
+        403 -- missing or invalid peer HMAC.
+        409 -- relay refused the replica (e.g. mailbox full, missing fields).
+        413 -- body larger than 64 KB.
+        429 -- per-sender cap reached (``cap_violation: true`` in the body).
+    """
+    # DM envelopes are bounded by MESH_DM_MAX_MSG_BYTES + envelope
+    # overhead; 64 KB is a generous ceiling.
+    max_body_bytes = 65_536
+    too_large_body = '{"ok":false,"detail":"Request body too large (max 64KB)"}'
+
+    # Cheap early rejection when the client declares an oversize body.
+    content_length = request.headers.get("content-length")
+    if content_length:
+        try:
+            declared_length = int(content_length)
+        except (ValueError, TypeError):
+            # Malformed header: fall through to the check on the real body.
+            declared_length = 0
+        if declared_length > max_body_bytes:
+            return Response(
+                content=too_large_body,
+                status_code=413, media_type="application/json",
+            )
+    body_bytes = await request.body()
+    # Content-Length may be absent (chunked transfer) or simply wrong, so
+    # the ceiling is enforced again on the bytes actually received.
+    if len(body_bytes) > max_body_bytes:
+        return Response(
+            content=too_large_body,
+            status_code=413, media_type="application/json",
+        )
+    if not _verify_peer_push_hmac(request, body_bytes):
+        return Response(
+            content='{"ok":false,"detail":"Invalid or missing peer HMAC"}',
+            status_code=403, media_type="application/json",
+        )
+    try:
+        body = json_mod.loads(body_bytes or b"{}")
+    except (ValueError, TypeError):
+        return Response(
+            content='{"ok":false,"detail":"Invalid JSON body"}',
+            status_code=400, media_type="application/json",
+        )
+    # Valid JSON is not necessarily an object: a list/str/number body has
+    # no ``.get`` and would otherwise surface as a 500.
+    envelope = body.get("envelope") if isinstance(body, dict) else None
+    if not isinstance(envelope, dict):
+        return Response(
+            content='{"ok":false,"detail":"envelope must be an object"}',
+            status_code=400, media_type="application/json",
+        )
+
+    originating_peer = _peer_hmac_url_from_request(request) or ""
+
+    from services.mesh.mesh_dm_relay import dm_relay
+    result = dm_relay.accept_replica(
+        envelope=envelope,
+        originating_peer_url=originating_peer,
+    )
+    if not result.get("ok"):
+        # The pushing relay treats any non-200 as "rejected", so refusals
+        # must not be reported with a success status.
+        return Response(
+            content=json_mod.dumps(result),
+            status_code=429 if result.get("cap_violation") else 409,
+            media_type="application/json",
+        )
+    return result
+
+
 @router.post("/api/mesh/gate/peer-push")
 @limiter.limit("30/minute")
 async def gate_peer_push(request: Request):
diff --git a/backend/services/config.py b/backend/services/config.py
index 518e09c..46d3f6c 100644
--- a/backend/services/config.py
+++ b/backend/services/config.py
@@ -116,6 +116,21 @@ class Settings(BaseSettings):
     MESH_DM_REQUEST_MAILBOX_LIMIT: int = 12
     MESH_DM_SHARED_MAILBOX_LIMIT: int = 48
     MESH_DM_SELF_MAILBOX_LIMIT: int = 12
+    # Anti-spam: cap on distinct UNACKED messages a single sender can have
+    # parked in a single recipient's mailbox at any one time. Once the
+    # recipient pulls (acks) a message, the sender's quota for that pair
+    # frees up. Default 2 — a sender who wants to deliver more must wait
+    # for the recipient to actually read the prior messages.
+    #
+    # This cap is enforced TWICE: once on the local deposit path (the
+    # sender's own node refuses to spool the 3rd message) AND once on
+    # the replication-acceptance path (honest peer relays refuse to
+    # accept inbound replicas that would put them over the cap). The
+    # double enforcement makes the rule a NETWORK rule — patching out
+    # the local check on a hostile sender's relay doesn't let extras
+    # propagate, because every honest peer enforces the same cap on
+    # inbound replication.
+    MESH_DM_PENDING_PER_SENDER_LIMIT: int = 2
     MESH_BLOCK_LEGACY_AGENT_ID_LOOKUP: bool = True
     MESH_ALLOW_COMPAT_DM_INVITE_IMPORT: bool = False
     MESH_ALLOW_COMPAT_DM_INVITE_IMPORT_UNTIL: str = ""
diff --git a/backend/services/mesh/mesh_dm_relay.py b/backend/services/mesh/mesh_dm_relay.py
index b732d59..cc12b0c 100644
--- a/backend/services/mesh/mesh_dm_relay.py
+++ b/backend/services/mesh/mesh_dm_relay.py
@@ -317,6 +317,39 @@ class DMRelay:
     def _self_mailbox_limit(self) -> int:
         return max(1, int(self._settings().MESH_DM_SELF_MAILBOX_LIMIT))
 
+    def _per_sender_pending_limit(self) -> int:
+        """Return the cap on unacked messages one sender may park per mailbox.
+
+        Reads ``MESH_DM_PENDING_PER_SENDER_LIMIT`` (missing, falsy or
+        non-integer values fall back to 2) and never returns less than 1."""
+        fallback = 2
+        try:
+            raw = getattr(self._settings(), "MESH_DM_PENDING_PER_SENDER_LIMIT", fallback)
+            limit = int(raw or fallback)
+        except (TypeError, ValueError):
+            limit = fallback
+        return limit if limit > 1 else 1
+
+    def _per_sender_pending_count(
+        self,
+        *,
+        mailbox_key: str,
+        sender_block_ref: str,
+    ) -> int:
+        """Count messages from ``sender_block_ref`` parked in ``mailbox_key``.
+
+        Anything still present in the mailbox list is treated as pending;
+        matching is an exact ``sender_block_ref`` comparison, which makes
+        the count per-(sender, recipient). An empty mailbox key or sender
+        ref short-circuits to 0. The caller is expected to already hold
+        ``self._lock``.
+        """
+        if not (mailbox_key and sender_block_ref):
+            return 0
+        parked = self._mailboxes.get(mailbox_key, [])
+        from_sender = [m for m in parked if m.sender_block_ref == sender_block_ref]
+        return len(from_sender)
+
     def _nonce_ttl_seconds(self) -> int:
         return max(30, int(self._settings().MESH_DM_NONCE_TTL_S))
 
@@ -1515,6 +1548,29 @@ class DMRelay:
             if len(self._mailboxes[mailbox_key]) >= self._mailbox_limit_for_class(delivery_class):
                 metrics_inc("dm_drop_full")
                 return {"ok": False, "detail": "Recipient mailbox full"}
+            # Anti-spam: per-(sender, recipient) cap on unacked messages.
+            # A sender who already has the configured number of messages
+            # parked in this mailbox can't deposit more until the recipient
+            # pulls (acks) at least one. The same cap is re-enforced on
+            # inbound replication in ``accept_replica`` so this rule isn't
+            # bypassable by patching out the local check on a hostile
+            # sender's relay — see config.py
+            # MESH_DM_PENDING_PER_SENDER_LIMIT for the threat model.
+            per_sender_limit = self._per_sender_pending_limit()
+            pending = self._per_sender_pending_count(
+                mailbox_key=mailbox_key,
+                sender_block_ref=sender_block_ref,
+            )
+            if pending >= per_sender_limit:
+                metrics_inc("dm_drop_per_sender_cap")
+                return {
+                    "ok": False,
+                    "detail": (
+                        f"Recipient already has {pending} unread message"
+                        f"{'s' if pending != 1 else ''} from you. Wait for "
+                        "them to read your messages before sending more."
+                    ),
+                }
             if not msg_id:
                 msg_id = f"dm_{int(time.time() * 1000)}_{secrets.token_hex(6)}"
             elif any(m.msg_id == msg_id for m in self._mailboxes[mailbox_key]):
@@ -1539,8 +1595,294 @@ class DMRelay:
             )
             self._stats["messages_in_memory"] = sum(len(v) for v in self._mailboxes.values())
             self._save()
+            # Cross-node mailbox replication: push the freshly-stored
+            # envelope to every authenticated relay peer so the recipient
+            # can log into ANY node and find their messages. The push is
+            # async (fire-and-forget thread) so deposit() returns
+            # immediately — slow Tor peers can't block the sender's UX.
+            # Each receiving peer re-enforces the per-sender cap on
+            # acceptance, so hostile relays can't widen the cap.
+            try:
+                envelope_for_push = self.envelope_for_replication(
+                    mailbox_key=mailbox_key, msg_id=msg_id,
+                )
+                if envelope_for_push:
+                    self._replicate_envelope_to_peers_async(
+                        envelope=envelope_for_push,
+                    )
+            except Exception:
+                metrics_inc("dm_replication_push_error")
             return {"ok": True, "msg_id": msg_id}
 
+    def accept_replica(
+        self,
+        *,
+        envelope: dict[str, Any],
+        originating_peer_url: str = "",
+    ) -> dict[str, Any]:
+        """Receive a DM envelope replicated from a peer relay.
+
+        Cross-node mailbox replication entry point. When a sender's local
+        relay accepts a ``deposit`` and pushes the envelope to
+        ``MESH_RELAY_PEERS`` (so the recipient can log into any peer
+        node and find their messages), each receiving peer calls
+        ``accept_replica`` to ingest it.
+
+        The per-(sender, recipient) cap is re-enforced HERE. That's what
+        makes the rule a NETWORK rule rather than a client-side honor
+        system: a hostile sender who patches out the local ``deposit``
+        check still can't get a 3rd unacked message to spread, because
+        every honest peer enforces the same cap on inbound replicas.
+        Result: hostile relays can hold extras locally, but those extras
+        never reach any node a legitimate recipient is polling from.
+
+        The envelope is peer-controlled, so stored fields are normalised:
+        an unrecognised ``delivery_class`` is stored as ``"shared"`` (the
+        class whose limit it is checked against) and a missing,
+        non-numeric or non-finite ``timestamp`` becomes the local time
+        instead of raising.
+
+        ``originating_peer_url`` is accepted for the calling endpoint but
+        is not consulted here.
+
+        Returns the same shape as ``deposit`` so the calling endpoint can
+        forward the result back to the originating peer. A cap rejection
+        additionally carries ``cap_violation``, ``pending`` and ``limit``.
+        """
+        import math
+
+        if not isinstance(envelope, dict):
+            return {"ok": False, "detail": "envelope must be an object"}
+        msg_id = str(envelope.get("msg_id", "") or "").strip()
+        mailbox_key = str(envelope.get("mailbox_key", "") or "").strip()
+        sender_block_ref = str(envelope.get("sender_block_ref", "") or "").strip()
+        ciphertext = str(envelope.get("ciphertext", "") or "")
+        if not msg_id or not mailbox_key or not sender_block_ref or not ciphertext:
+            return {"ok": False, "detail": "envelope missing required fields"}
+
+        # A malformed timestamp must not turn a peer push into an unhandled
+        # exception; fall back to the local clock like a missing one does.
+        # NOTE(review): future-dated timestamps are stored as sent -- confirm
+        # that is acceptable for whatever expiry ``_cleanup_expired`` applies.
+        now = time.time()
+        try:
+            timestamp = float(envelope.get("timestamp") or now)
+        except (TypeError, ValueError, OverflowError):
+            timestamp = now
+        if not math.isfinite(timestamp):
+            timestamp = now
+
+        with self._lock:
+            self._refresh_from_shared_relay()
+            self._cleanup_expired()
+
+            # Idempotent — if we already hold this exact msg_id, the
+            # replication round-tripped or a peer pushed the same
+            # envelope through multiple paths. Accept silently.
+            if any(m.msg_id == msg_id for m in self._mailboxes.get(mailbox_key, [])):
+                metrics_inc("dm_replica_duplicate")
+                return {"ok": True, "msg_id": msg_id, "duplicate": True}
+
+            # Same per-class cap as the deposit path — defense in depth
+            # against a peer that wraps a "deposit" as a "replica" to
+            # bypass the class limit.
+            delivery_class = str(envelope.get("delivery_class", "") or "")
+            if delivery_class in ("request", "shared", "self"):
+                class_limit = self._mailbox_limit_for_class(delivery_class)
+            else:
+                # Unknown classes are held to the shared limit, so store
+                # them as "shared" too rather than as an arbitrary string.
+                delivery_class = "shared"
+                class_limit = self._shared_mailbox_limit()
+            if len(self._mailboxes.get(mailbox_key, [])) >= class_limit:
+                metrics_inc("dm_replica_drop_full")
+                return {"ok": False, "detail": "Recipient mailbox full"}
+
+            # THE network rule: per-(sender, recipient) anti-spam cap.
+            per_sender_limit = self._per_sender_pending_limit()
+            pending = self._per_sender_pending_count(
+                mailbox_key=mailbox_key,
+                sender_block_ref=sender_block_ref,
+            )
+            if pending >= per_sender_limit:
+                metrics_inc("dm_replica_drop_per_sender_cap")
+                # Returning a structured rejection — the sender's relay
+                # learns its envelope was rejected by an honest peer and
+                # can stop trying to push it.
+                return {
+                    "ok": False,
+                    "detail": (
+                        "Per-sender cap reached on this relay; refusing replica"
+                    ),
+                    "cap_violation": True,
+                    "pending": pending,
+                    "limit": per_sender_limit,
+                }
+
+            # Accept the replica into the local mailbox. ``setdefault`` keeps
+            # this correct whether or not ``_mailboxes`` auto-creates keys.
+            self._mailboxes.setdefault(mailbox_key, []).append(
+                DMMessage(
+                    sender_id=str(envelope.get("sender_id", "") or ""),
+                    ciphertext=ciphertext,
+                    timestamp=timestamp,
+                    msg_id=msg_id,
+                    delivery_class=delivery_class,
+                    sender_seal=str(envelope.get("sender_seal", "") or ""),
+                    relay_salt=str(envelope.get("relay_salt", "") or ""),
+                    sender_block_ref=sender_block_ref,
+                    payload_format=str(envelope.get("payload_format", "dm1") or "dm1"),
+                    session_welcome=str(envelope.get("session_welcome", "") or ""),
+                )
+            )
+            self._stats["messages_in_memory"] = sum(len(v) for v in self._mailboxes.values())
+            self._save()
+            metrics_inc("dm_replica_accepted")
+            return {"ok": True, "msg_id": msg_id}
+
+    def _replicate_envelope_to_peers_async(
+        self,
+        *,
+        envelope: dict[str, Any],
+    ) -> None:
+        """Push an outbound DM envelope to every authenticated relay peer.
+
+        Fire-and-forget: spawned in a background thread so ``deposit``
+        returns to the caller immediately. Per-peer errors are counted in
+        metrics and swallowed — the sender's UX must not block on slow Tor
+        peers. There is no retry: a peer that is down simply misses this
+        envelope. Inbound recipient polling from a healthy peer keeps the
+        system functional during peer failures.
+
+        Each peer is authed with the existing per-peer HMAC pattern
+        (#256) — same headers and key resolver gate-message replication
+        uses, so a hostile node that doesn't know any peer's HMAC key
+        can't impersonate a legitimate relay. A peer with no resolvable
+        key is skipped rather than sent an unsigned envelope.
+
+        Metrics: ``dm_replication_push_ok`` when the peer stored the
+        envelope, ``dm_replication_push_rejected`` when it answered but
+        refused it, ``dm_replication_push_error`` for everything else.
+        """
+        import threading
+
+        def _do_push():
+            try:
+                import hashlib
+                import hmac
+                import requests as _requests
+
+                from services.mesh.mesh_crypto import (
+                    normalize_peer_url,
+                    resolve_peer_key_for_url,
+                )
+                from services.mesh.mesh_router import (
+                    authenticated_push_peer_urls,
+                )
+
+                peers = authenticated_push_peer_urls()
+                if not peers:
+                    return
+
+                payload = json.dumps(
+                    {"envelope": envelope},
+                    separators=(",", ":"),
+                    ensure_ascii=False,
+                ).encode("utf-8")
+
+                timeout = max(
+                    1,
+                    int(getattr(self._settings(), "MESH_RELAY_PUSH_TIMEOUT_S", 10) or 10),
+                )
+
+                for peer_url in peers:
+                    try:
+                        normalized = normalize_peer_url(peer_url)
+                        peer_key = resolve_peer_key_for_url(normalized)
+                        if not peer_key:
+                            # The receiving endpoint rejects pushes without
+                            # a valid HMAC, so sending an unsigned envelope
+                            # achieves nothing. Skip the peer instead.
+                            metrics_inc("dm_replication_push_error")
+                            continue
+                        headers = {
+                            "Content-Type": "application/json",
+                            "X-Peer-Url": normalized,
+                            "X-Peer-HMAC": hmac.new(
+                                peer_key, payload, hashlib.sha256
+                            ).hexdigest(),
+                        }
+                        url = f"{str(peer_url).rstrip('/')}/api/mesh/dm/replicate-envelope"
+                        resp = _requests.post(
+                            url, data=payload, timeout=timeout, headers=headers,
+                        )
+                        # HTTP 200 alone is not acceptance: a refusal from
+                        # ``accept_replica`` may arrive as a 200 whose JSON
+                        # body says ``"ok": false``.
+                        accepted = False
+                        if resp.status_code == 200:
+                            try:
+                                reply = resp.json()
+                            except ValueError:
+                                reply = None
+                            accepted = isinstance(reply, dict) and bool(reply.get("ok"))
+                        if accepted:
+                            metrics_inc("dm_replication_push_ok")
+                        else:
+                            # Non-200, or a structured rejection such as
+                            # the cap_violation from accept_replica.
+                            metrics_inc("dm_replication_push_rejected")
+                    except Exception:
+                        # Per-peer failure is non-fatal — count it and
+                        # move on so the remaining peers still get the
+                        # envelope.
+                        metrics_inc("dm_replication_push_error")
+                        continue
+            except Exception:
+                # Outer guard — never let replication errors propagate
+                # back to the sender's deposit() caller.
+                metrics_inc("dm_replication_push_error")
+
+        thread = threading.Thread(
+            target=_do_push,
+            name="dm-replicate-push",
+            daemon=True,
+        )
+        thread.start()
+
+    def envelope_for_replication(
+        self,
+        *,
+        mailbox_key: str,
+        msg_id: str,
+    ) -> dict[str, Any] | None:
+        """Return the wire-form envelope for a stored message, suitable
+        for POSTing to a peer relay's replicate-envelope endpoint.
+
+        Returns ``None`` if the message isn't in the mailbox (already
+        acked, expired, never existed). The caller holds the
+        responsibility for transport security (Tor SOCKS for .onion
+        peers, per-peer HMAC) and for not leaking the envelope to
+        clearnet peers when private transport is required.
+        """
+        with self._lock:
+            for m in self._mailboxes.get(mailbox_key, []):
+                if m.msg_id == msg_id:
+                    return {
+                        "msg_id": m.msg_id,
+                        "mailbox_key": mailbox_key,
+                        "sender_id": m.sender_id,
+                        "sender_block_ref": m.sender_block_ref,
+                        "sender_seal": m.sender_seal,
+                        "ciphertext": m.ciphertext,
+                        "timestamp": m.timestamp,
+                        "delivery_class": m.delivery_class,
+                        "relay_salt": m.relay_salt,
+                        "payload_format": m.payload_format,
+                        "session_welcome": m.session_welcome,
+                    }
+        return None
+
     def is_blocked(self, recipient_id: str, sender_id: str) -> bool:
         with self._lock:
             self._refresh_from_shared_relay()
diff --git a/backend/tests/test_dm_relay_per_sender_cap.py b/backend/tests/test_dm_relay_per_sender_cap.py
new file mode 100644
index 0000000..5f1f208
--- /dev/null
+++ b/backend/tests/test_dm_relay_per_sender_cap.py
@@ -0,0 +1,270 @@
+"""Per-(sender, recipient) anti-spam cap on the DM relay.
+
+The user-stated rule: a single sender can have at most N UNACKED messages
+parked in a single recipient's mailbox at any one time (N=2 by default).
+Once the recipient pulls a message, the sender's quota for that pair
+frees up.
+
+Network rule, not local rule
+-----------------------------
+The cap is enforced TWICE:
+
+1. ``DMRelay.deposit(...)`` -- local check on the sender's own node.
+   Refuses to spool the (N+1)th message before it can be replicated.
+
+2. ``DMRelay.accept_replica(...)`` -- replication-acceptance check on
+   every receiving peer. Refuses to accept an inbound replica that
+   would put the local mailbox over the cap, even if the originating
+   peer claims it had cap room.
+
+The double enforcement matters because cap (1) is client-side -- a
+hostile relay could patch it out and continue to spool extras locally.
+Cap (2) means those extras can't propagate: every honest peer rejects
+them on the way in. A recipient who polls from honest peers therefore
+never sees more than N pending from any one sender, regardless of how
+many spam attempts the sender's own relay accepted.
+
+These tests pin both halves of the rule.
+"""
+
+from __future__ import annotations
+
+import time
+
+import pytest
+
+
+@pytest.fixture
+def relay():
+    """Fresh ``DMRelay`` per test."""
+    from services.mesh.mesh_dm_relay import DMRelay
+    r = DMRelay()
+    r._mailboxes.clear()
+    r._blocks.clear()
+    r._stats = {"messages_in_memory": 0}
+    return r
+
+
+def _deposit(
+    relay,
+    *,
+    sender: str = "alice",
+    recipient_token: str = "bob_mailbox_token_abc",
+    ciphertext: str = "ciphertext-blob",
+    msg_id: str = "",
+):
+    """Convenience wrapper using ``shared`` delivery class."""
+    return relay.deposit(
+        sender_id=sender,
+        raw_sender_id=sender,
+        recipient_id="bob",
+        ciphertext=ciphertext,
+        msg_id=msg_id,
+        delivery_class="shared",
+        recipient_token=recipient_token,
+    )
+
+
+# ---------------------------------------------------------------------------
+# Local cap on ``deposit``
+# ---------------------------------------------------------------------------
+
+
+class TestDepositCap:
+    def test_two_deposits_from_same_sender_succeed(self, relay):
+        r1 = _deposit(relay)
+        r2 = _deposit(relay)
+        assert r1["ok"] is True
+        assert r2["ok"] is True
+        assert r1["msg_id"] != r2["msg_id"]
+
+    def test_third_deposit_from_same_sender_rejected(self, relay):
+        _deposit(relay)
+        _deposit(relay)
+        r3 = _deposit(relay)
+        assert r3["ok"] is False
+        detail = r3["detail"].lower()
+        assert "unread" in detail or "read your messages" in detail
+
+    def test_different_senders_have_independent_quotas(self, relay):
+        for _ in range(2):
+            assert _deposit(relay, sender="alice")["ok"] is True
+        for _ in range(2):
+            assert _deposit(relay, sender="carol")["ok"] is True
+        assert _deposit(relay, sender="carol")["ok"] is False
+
+    def test_different_recipients_have_independent_quotas(self, relay):
+        for _ in range(2):
+            assert _deposit(relay, sender="alice", recipient_token="bob_token")["ok"] is True
+        for _ in range(2):
+            assert _deposit(relay, sender="alice", recipient_token="dave_token")["ok"] is True
+
+    def test_ack_frees_quota(self, relay):
+        r1 = _deposit(relay)
+        _deposit(relay)
+        assert _deposit(relay)["ok"] is False
+
+        mailbox_key = relay._hashed_mailbox_token("bob_mailbox_token_abc")
+        relay._mailboxes[mailbox_key] = [
+            m for m in relay._mailboxes[mailbox_key]
+            if m.msg_id != r1["msg_id"]
+        ]
+        relay._stats["messages_in_memory"] = sum(
+            len(v) for v in relay._mailboxes.values()
+        )
+
+        r3 = _deposit(relay)
+        assert r3["ok"] is True, f"expected quota free after ack, got: {r3}"
+
+    def test_cap_is_env_tunable(self, relay, monkeypatch):
+        import services.mesh.mesh_dm_relay as mdr
+        monkeypatch.setattr(
+            mdr.DMRelay,
+            "_per_sender_pending_limit",
+            lambda self: 1,
+        )
+
+        assert _deposit(relay)["ok"] is True
+        assert _deposit(relay)["ok"] is False
+
+
+# ---------------------------------------------------------------------------
+# Replication-acceptance cap (the half that makes this a network rule)
+# ---------------------------------------------------------------------------
+
+
+class TestAcceptReplicaCap:
+    def _envelope(self, *, msg_id: str, sender_block_ref: str, mailbox_key: str):
+        return {
+            "msg_id": msg_id,
+            "mailbox_key": mailbox_key,
+            "sender_block_ref": sender_block_ref,
+            "sender_id": "alice",
+            "sender_seal": "",
+            "ciphertext": f"ciphertext-{msg_id}",
+            "timestamp": time.time(),
+            "delivery_class": "shared",
+            "relay_salt": "",
+            "payload_format": "dm1",
+            "session_welcome": "",
+        }
+
+    def test_replica_accepted_under_cap(self, relay):
+        env = self._envelope(
+            msg_id="dm_replica_1",
+            sender_block_ref="alice_block_ref",
+            mailbox_key="mailbox_xyz",
+        )
+        result = relay.accept_replica(envelope=env)
+        assert result["ok"] is True
+
+    def test_replica_idempotent_on_duplicate_msg_id(self, relay):
+        mailbox_key = "mailbox_xyz"
+        env = self._envelope(
+            msg_id="dm_dup_1",
+            sender_block_ref="alice_block_ref",
+            mailbox_key=mailbox_key,
+        )
+        r1 = relay.accept_replica(envelope=env)
+        r2 = relay.accept_replica(envelope=env)
+        assert r1["ok"] is True
+        assert r2["ok"] is True
+        assert r2.get("duplicate") is True
+        assert len(relay._mailboxes[mailbox_key]) == 1
+
+    def test_replica_rejected_when_local_count_already_at_cap(self, relay):
+        mailbox_key = "mailbox_xyz"
+        for i in (1, 2):
+            relay.accept_replica(envelope=self._envelope(
+                msg_id=f"dm_seeded_{i}",
+                sender_block_ref="alice_block_ref",
+                mailbox_key=mailbox_key,
+            ))
+
+        result = relay.accept_replica(envelope=self._envelope(
+            msg_id="dm_overcap_3",
+            sender_block_ref="alice_block_ref",
+            mailbox_key=mailbox_key,
+        ))
+        assert result["ok"] is False
+        assert result.get("cap_violation") is True
+        assert result.get("pending") == 2
+        assert result.get("limit") == 2
+        assert len(relay._mailboxes[mailbox_key]) == 2
+
+    def test_replica_from_different_sender_passes_when_one_is_at_cap(self, relay):
+        mailbox_key = "mailbox_xyz"
+        for i in (1, 2):
+            relay.accept_replica(envelope=self._envelope(
+                msg_id=f"dm_alice_{i}",
+                sender_block_ref="alice_block_ref",
+                mailbox_key=mailbox_key,
+            ))
+        assert relay.accept_replica(envelope=self._envelope(
+            msg_id="dm_alice_3",
+            sender_block_ref="alice_block_ref",
+            mailbox_key=mailbox_key,
+        ))["ok"] is False
+        assert relay.accept_replica(envelope=self._envelope(
+            msg_id="dm_carol_1",
+            sender_block_ref="carol_block_ref",
+            mailbox_key=mailbox_key,
+        ))["ok"] is True
+
+    def test_replica_rejects_malformed_envelopes(self, relay):
+        for bad in (
+            {},
+            {"msg_id": "x"},
+            {"msg_id": "x", "mailbox_key": "y"},
+            "not an object at all",
+        ):
+            result = relay.accept_replica(envelope=bad)
+            assert result["ok"] is False
+
+
+# ---------------------------------------------------------------------------
+# ``envelope_for_replication`` -- helper for the outbound replication path
+# ---------------------------------------------------------------------------
+
+
+class TestEnvelopeForReplication:
+    def test_returns_envelope_for_stored_message(self, relay):
+        r = _deposit(relay, ciphertext="hello-ciphertext")
+        msg_id = r["msg_id"]
+        mailbox_key = relay._hashed_mailbox_token("bob_mailbox_token_abc")
+
+        env = relay.envelope_for_replication(mailbox_key=mailbox_key, msg_id=msg_id)
+        assert env is not None
+        assert env["msg_id"] == msg_id
+        assert env["mailbox_key"] == mailbox_key
+        assert env["ciphertext"] == "hello-ciphertext"
+        assert env["delivery_class"] == "shared"
+        for k in ("msg_id", "mailbox_key", "sender_block_ref", "ciphertext"):
+            assert env.get(k), f"envelope missing required field {k!r}"
+
+    def test_returns_none_for_unknown_message(self, relay):
+        env = relay.envelope_for_replication(
+            mailbox_key="never_existed", msg_id="never_existed",
+        )
+        assert env is None
+
+    def test_envelope_round_trips_through_accept_replica(self, relay):
+        from services.mesh.mesh_dm_relay import DMRelay
+        receiver_relay = DMRelay()
+        receiver_relay._mailboxes.clear()
+        receiver_relay._stats = {"messages_in_memory": 0}
+
+        r = _deposit(relay)
+        msg_id = r["msg_id"]
+        mailbox_key = relay._hashed_mailbox_token("bob_mailbox_token_abc")
+        env = relay.envelope_for_replication(
+            mailbox_key=mailbox_key, msg_id=msg_id,
+        )
+        assert env is not None
+
+        result = receiver_relay.accept_replica(envelope=env)
+        assert result["ok"] is True
+        stored = receiver_relay._mailboxes.get(mailbox_key, [])
+        assert len(stored) == 1
+        assert stored[0].msg_id == msg_id
+        assert stored[0].ciphertext == "ciphertext-blob"
diff --git a/backend/tests/test_dm_replicate_envelope_endpoint.py b/backend/tests/test_dm_replicate_envelope_endpoint.py
new file mode 100644
index 0000000..a1f5772
--- /dev/null
+++ b/backend/tests/test_dm_replicate_envelope_endpoint.py
@@ -0,0 +1,157 @@
+"""POST /api/mesh/dm/replicate-envelope — receiving side of cross-node DM
+mailbox replication.
+
+This is the endpoint that peer relays call when they want to hand off an
+encrypted DM envelope to us (so the recipient can log into our node and
+find their messages). It re-enforces the per-(sender, recipient) anti-spam
+cap so hostile sender relays can't widen the cap by skipping the local
+check on their own deposit path.
+
+The endpoint:
+
+  * authenticates the caller via the existing per-peer HMAC pattern
+    (same one /api/mesh/infonet/peer-push and /api/mesh/gate/peer-push
+    use, introduced in #256 — ``X-Peer-Url`` + ``X-Peer-HMAC`` headers
+    keyed off ``resolve_peer_key_for_url``)
+  * rejects bodies > 64 KB (DM envelope size is bounded by
+    ``MESH_DM_MAX_MSG_BYTES`` — 64KB ceiling has generous headroom)
+  * rejects requests without a valid peer HMAC with 403
+  * passes the envelope to ``DMRelay.accept_replica`` which enforces
+    the cap
+
+This file pins the endpoint contract. The cap enforcement itself is
+tested in ``test_dm_relay_per_sender_cap.py`` against the relay's
+``accept_replica`` method directly.
+"""
+
+from __future__ import annotations
+
+import asyncio
+import hashlib
+import hmac
+import json
+
+import pytest
+from httpx import ASGITransport, AsyncClient
+
+
+@pytest.fixture
+def remote_client():
+    """ASGI client with peer IP 1.2.3.4 — never on the local-operator
+    allowlist. Used to prove the endpoint isn't accidentally reachable
+    by random remote callers without peer HMAC."""
+    from main import app
+
+    class _RemoteClient:
+        def __init__(self):
+            self._loop = asyncio.new_event_loop()
+            self._transport = ASGITransport(app=app, client=("1.2.3.4", 12345))
+            self._base = "http://1.2.3.4:8000"
+
+        def post(self, url, **kw):
+            async def go():
+                async with AsyncClient(transport=self._transport, base_url=self._base) as ac:
+                    return await ac.post(url, **kw)
+            return self._loop.run_until_complete(go())
+
+        def close(self):
+            self._loop.close()
+
+    c = _RemoteClient()
+    yield c
+    c.close()
+
+
+class TestReplicateEndpointAuth:
+    def test_rejects_request_without_peer_hmac(self, remote_client):
+        """A peer push that does NOT carry X-Peer-Url + X-Peer-HMAC
+        must be rejected with 403 before the envelope is ever passed
+        to the relay. Same gate the existing infonet/gate peer-push
+        endpoints enforce."""
+        payload = {
+            "envelope": {
+                "msg_id": "dm_unauth_1",
+                "mailbox_key": "mb",
+                "sender_block_ref": "sender",
+                "ciphertext": "x",
+            },
+        }
+        r = remote_client.post(
+            "/api/mesh/dm/replicate-envelope",
+            json=payload,
+        )
+        assert r.status_code == 403
+        assert "peer HMAC" in r.text or "peer hmac" in r.text.lower()
+
+    def test_rejects_wrong_peer_hmac(self, remote_client, monkeypatch):
+        """A request with a peer HMAC header keyed off the WRONG secret
+        is rejected. Confirms the HMAC is actually verified — a tampered
+        body or a key-substitution attack doesn't sneak through."""
+        # Plant a known peer secret. The request will sign with a
+        # DIFFERENT key, so verification must fail.
+        from services.config import get_settings
+        monkeypatch.setenv("MESH_PEER_PUSH_SECRET", "real-secret-32-chars-min-padding-padding")
+        get_settings.cache_clear()
+
+        try:
+            body = json.dumps({
+                "envelope": {
+                    "msg_id": "dm_wronghmac",
+                    "mailbox_key": "mb",
+                    "sender_block_ref": "sender",
+                    "ciphertext": "x",
+                },
+            }).encode("utf-8")
+            wrong_hmac = hmac.new(b"wrong-key", body, hashlib.sha256).hexdigest()
+            r = remote_client.post(
+                "/api/mesh/dm/replicate-envelope",
+                content=body,
+                headers={
+                    "Content-Type": "application/json",
+                    "X-Peer-Url": "http://example-peer.onion:8000",
+                    "X-Peer-HMAC": wrong_hmac,
+                },
+            )
+            assert r.status_code == 403
+        finally:
+            # Settings cached while the planted secret was in the
+            # environment would otherwise be served to every later test.
+            # Restore the environment first, then drop the cache.
+            monkeypatch.undo()
+            get_settings.cache_clear()
+
+    def test_rejects_oversize_body(self, remote_client):
+        """64 KB ceiling — anything bigger doesn't even get parsed.
+        Defends against memory amplification via giant ciphertexts."""
+        # 100 KB body is well over the 64 KB cap.
+        big = b"{" + b"x" * 100_000 + b"}"
+        r = remote_client.post(
+            "/api/mesh/dm/replicate-envelope",
+            content=big,
+            headers={
+                "Content-Type": "application/json",
+                "Content-Length": str(len(big)),
+            },
+        )
+        assert r.status_code in (400, 413), (
+            f"oversize body should be rejected with 400/413, got {r.status_code}"
+        )
+
+
+class TestReplicateEndpointRegistered:
+    def test_route_present_in_app(self):
+        """Static check that the route is actually wired into the app.
+        Catches a future refactor that drops the router include or
+        deletes the endpoint by accident."""
+        from main import app
+
+        paths_methods = set()
+        for route in app.routes:
+            path = getattr(route, "path", None)
+            methods = getattr(route, "methods", set()) or set()
+            for m in methods:
+                paths_methods.add((m, path))
+
+        assert ("POST", "/api/mesh/dm/replicate-envelope") in paths_methods, (
+            "POST /api/mesh/dm/replicate-envelope is not registered on the app"
+        )