# --- backend/services/config.py: fields of ``Settings`` (the class header is
# --- outside this excerpt) ---------------------------------------------------
MESH_DM_REQUEST_MAILBOX_LIMIT: int = 12
MESH_DM_SHARED_MAILBOX_LIMIT: int = 48
MESH_DM_SELF_MAILBOX_LIMIT: int = 12
# Anti-spam: cap on distinct UNACKED messages a single sender can have
# parked in a single recipient's mailbox at any one time. Once the
# recipient pulls (acks) a message, the sender's quota for that pair
# frees up. Default 2 — a sender who wants to deliver more must wait
# for the recipient to actually read the prior messages.
#
# This cap is enforced TWICE: once on the local deposit path (the
# sender's own node refuses to spool the 3rd message) AND once on
# the replication-acceptance path (honest peer relays refuse to
# accept inbound replicas that would put them over the cap). The
# double enforcement makes the rule a NETWORK rule — patching out
# the local check on a hostile sender's relay doesn't let extras
# propagate, because every honest peer enforces the same cap on
# inbound replication.
MESH_DM_PENDING_PER_SENDER_LIMIT: int = 2
MESH_BLOCK_LEGACY_AGENT_ID_LOOKUP: bool = True
MESH_ALLOW_COMPAT_DM_INVITE_IMPORT: bool = False
MESH_ALLOW_COMPAT_DM_INVITE_IMPORT_UNTIL: str = ""


# --- backend/services/mesh/mesh_dm_relay.py: methods of ``DMRelay`` (the class
# --- header is outside this excerpt) -----------------------------------------
def _self_mailbox_limit(self) -> int:
    """Return ``MESH_DM_SELF_MAILBOX_LIMIT`` from settings, never below 1."""
    return max(1, int(self._settings().MESH_DM_SELF_MAILBOX_LIMIT))


def _per_sender_pending_limit(self) -> int:
    """Return the cap on UNACKED messages per (sender, recipient mailbox).

    Reads ``MESH_DM_PENDING_PER_SENDER_LIMIT`` from ``self._settings()``.
    The same limit is consulted by ``deposit`` (local) and by
    ``accept_replica`` (peer push acceptance), which is what makes the cap
    a network rule rather than a client-side honor system.

    Returns:
        The configured limit, clamped to a minimum of 1. Falls back to the
        default of 2 when the setting is missing, ``None``, an empty
        string, or not convertible to ``int``.
    """
    default_limit = 2
    try:
        raw = getattr(self._settings(), "MESH_DM_PENDING_PER_SENDER_LIMIT", None)
        # Only a genuinely unset value falls back to the default. An explicit
        # 0 must clamp to 1 like every other too-small value (and like the
        # sibling ``*_mailbox_limit`` accessors) instead of silently turning
        # into the laxer default.
        if raw is None or raw == "":
            return default_limit
        limit = int(raw)
    except (TypeError, ValueError, OverflowError):
        return default_limit
    return max(1, limit)


def _per_sender_pending_count(
    self,
    *,
    mailbox_key: str,
    sender_block_ref: str,
) -> int:
    """Count messages from ``sender_block_ref`` currently in ``mailbox_key``.

    The caller is expected to already hold ``self._lock``; this method does
    not take it. Per the original author's note, claimed/acked messages are
    removed from the mailbox list (``claim_message_ids``, not shown in this
    excerpt), so anything still present counts as unacked. Matching is by
    exact ``sender_block_ref`` equality, which makes the cap naturally
    per-(sender, recipient).

    Returns:
        The number of matching messages, or 0 when either argument is
        empty or the mailbox does not exist.
    """
    if not mailbox_key or not sender_block_ref:
        return 0
    # ``.get`` avoids creating an empty mailbox entry as a side effect.
    messages = self._mailboxes.get(mailbox_key, [])
    return sum(1 for m in messages if m.sender_block_ref == sender_block_ref)


def _nonce_ttl_seconds(self) -> int:
    """Return ``MESH_DM_NONCE_TTL_S`` from settings, never below 30."""
    return max(30, int(self._settings().MESH_DM_NONCE_TTL_S))
# --- backend/services/mesh/mesh_dm_relay.py: methods of ``DMRelay`` (the class
# --- header is outside this excerpt) -----------------------------------------
def accept_replica(
    self,
    *,
    envelope: dict[str, Any],
    originating_peer_url: str = "",
) -> dict[str, Any]:
    """Ingest a DM envelope replicated from a peer relay.

    Cross-node mailbox replication entry point: a peer that accepted a
    ``deposit`` pushes the envelope here so the recipient can find the
    message on this node too.

    The per-(sender, recipient) cap is re-enforced HERE. That is what makes
    the rule a NETWORK rule rather than a client-side honor system: a
    sender who patches out the local ``deposit`` check still cannot get an
    over-cap message accepted by a relay that runs this code.

    NOTE(review): ``sender_block_ref`` is taken from the envelope exactly as
    the peer supplied it and nothing in this method authenticates it, so the
    cap is only as strong as whatever binds that value to the real sender —
    confirm the calling endpoint verifies it.
    NOTE(review): a finite timestamp far in the future is accepted as-is;
    confirm ``_cleanup_expired`` cannot be evaded that way.

    Args:
        envelope: Wire-form message as produced by
            ``envelope_for_replication``. ``msg_id``, ``mailbox_key``,
            ``sender_block_ref`` and ``ciphertext`` are required.
        originating_peer_url: Accepted for interface compatibility; not
            used by this method.

    Returns:
        ``{"ok": True, "msg_id": ...}`` when stored (plus
        ``"duplicate": True`` when the ``msg_id`` was already held), or
        ``{"ok": False, "detail": ...}`` on rejection. A per-sender cap
        rejection additionally carries ``cap_violation``, ``pending`` and
        ``limit``.
    """
    import math  # local import: the module's import block is not visible here

    if not isinstance(envelope, dict):
        return {"ok": False, "detail": "envelope must be an object"}
    msg_id = str(envelope.get("msg_id", "") or "").strip()
    mailbox_key = str(envelope.get("mailbox_key", "") or "").strip()
    sender_block_ref = str(envelope.get("sender_block_ref", "") or "").strip()
    ciphertext = str(envelope.get("ciphertext", "") or "")
    if not msg_id or not mailbox_key or not sender_block_ref or not ciphertext:
        return {"ok": False, "detail": "envelope missing required fields"}

    # The timestamp comes from an untrusted peer. A missing/falsy value is
    # stamped with local receive time (as before); anything present must be
    # a finite number. Garbage used to raise out of ``float()`` and NaN/inf
    # used to be stored, which defeats any later time comparison.
    raw_timestamp = envelope.get("timestamp")
    timestamp = None
    if raw_timestamp:
        try:
            timestamp = float(raw_timestamp)
        except (TypeError, ValueError, OverflowError):
            return {"ok": False, "detail": "envelope timestamp invalid"}
        if not math.isfinite(timestamp):
            return {"ok": False, "detail": "envelope timestamp invalid"}

    # Normalise the delivery class once so the class used for the capacity
    # check is the same one that gets stored.
    delivery_class = str(envelope.get("delivery_class", "") or "")
    known_class = delivery_class in ("request", "shared", "self")
    if not known_class:
        delivery_class = "shared"

    with self._lock:
        self._refresh_from_shared_relay()
        self._cleanup_expired()

        mailbox = self._mailboxes.get(mailbox_key, [])

        # Idempotent — if we already hold this exact msg_id, the
        # replication round-tripped or a peer pushed the same envelope
        # through multiple paths. Accept silently.
        if any(m.msg_id == msg_id for m in mailbox):
            metrics_inc("dm_replica_duplicate")
            return {"ok": True, "msg_id": msg_id, "duplicate": True}

        # Same per-class cap as the deposit path — defense in depth
        # against a peer that wraps a "deposit" as a "replica" to bypass
        # the class limit. Unknown classes are budgeted as "shared".
        if known_class:
            class_limit = self._mailbox_limit_for_class(delivery_class)
        else:
            class_limit = self._shared_mailbox_limit()
        if len(mailbox) >= class_limit:
            metrics_inc("dm_replica_drop_full")
            return {"ok": False, "detail": "Recipient mailbox full"}

        # THE network rule: per-(sender, recipient) anti-spam cap.
        per_sender_limit = self._per_sender_pending_limit()
        pending = self._per_sender_pending_count(
            mailbox_key=mailbox_key,
            sender_block_ref=sender_block_ref,
        )
        if pending >= per_sender_limit:
            metrics_inc("dm_replica_drop_per_sender_cap")
            # Structured rejection — the pushing relay learns its envelope
            # was refused and can stop trying to push it.
            return {
                "ok": False,
                "detail": (
                    "Per-sender cap reached on this relay; refusing replica"
                ),
                "cap_violation": True,
                "pending": pending,
                "limit": per_sender_limit,
            }

        # Accept the replica into the local mailbox. ``setdefault`` works
        # whether ``_mailboxes`` is a plain dict or a defaultdict.
        self._mailboxes.setdefault(mailbox_key, []).append(
            DMMessage(
                sender_id=str(envelope.get("sender_id", "") or ""),
                ciphertext=ciphertext,
                timestamp=timestamp if timestamp is not None else time.time(),
                msg_id=msg_id,
                delivery_class=delivery_class,
                sender_seal=str(envelope.get("sender_seal", "") or ""),
                relay_salt=str(envelope.get("relay_salt", "") or ""),
                sender_block_ref=sender_block_ref,
                payload_format=str(envelope.get("payload_format", "dm1") or "dm1"),
                session_welcome=str(envelope.get("session_welcome", "") or ""),
            )
        )
        self._stats["messages_in_memory"] = sum(len(v) for v in self._mailboxes.values())
        self._save()
        metrics_inc("dm_replica_accepted")
        return {"ok": True, "msg_id": msg_id}


def envelope_for_replication(
    self,
    *,
    mailbox_key: str,
    msg_id: str,
) -> dict[str, Any] | None:
    """Return the wire-form envelope for a stored message.

    The result is the dict shape ``accept_replica`` consumes, suitable for
    POSTing to a peer relay's replicate-envelope endpoint. Transport
    security (Tor SOCKS for .onion peers, per-peer HMAC) and not leaking
    the envelope to clearnet peers when private transport is required are
    the caller's responsibility.

    Returns:
        The envelope dict, or ``None`` when no message with ``msg_id`` is
        in ``mailbox_key`` (already acked, expired, never existed).
    """
    with self._lock:
        for m in self._mailboxes.get(mailbox_key, []):
            if m.msg_id != msg_id:
                continue
            return {
                "msg_id": m.msg_id,
                "mailbox_key": mailbox_key,
                "sender_id": m.sender_id,
                "sender_block_ref": m.sender_block_ref,
                "sender_seal": m.sender_seal,
                "ciphertext": m.ciphertext,
                "timestamp": m.timestamp,
                "delivery_class": m.delivery_class,
                "relay_salt": m.relay_salt,
                "payload_format": m.payload_format,
                "session_welcome": m.session_welcome,
            }
    return None
"""Per-(sender, recipient) anti-spam cap on the DM relay.

The user-stated rule: a single sender can have at most N UNACKED messages
parked in a single recipient's mailbox at any one time (N=2 by default).
Once the recipient pulls a message, the sender's quota for that pair
frees up.

Network rule, not local rule
-----------------------------
The cap is enforced TWICE:

1. ``DMRelay.deposit(...)`` -- local check on the sender's own node.
   Refuses to spool the (N+1)th message before it can be replicated.

2. ``DMRelay.accept_replica(...)`` -- replication-acceptance check on
   every receiving peer. Refuses to accept an inbound replica that
   would put the local mailbox over the cap, even if the originating
   peer claims it had cap room.

The double enforcement matters because cap (1) is client-side -- a
hostile relay could patch it out and continue to spool extras locally.
Cap (2) means those extras can't propagate: every honest peer rejects
them on the way in. A recipient who polls from honest peers therefore
never sees more than N pending from any one sender, regardless of how
many spam attempts the sender's own relay accepted.

These tests pin both halves of the rule.
"""

from __future__ import annotations

import time

import pytest


@pytest.fixture
def relay():
    """Provide a new ``DMRelay`` with mailboxes, blocks and stats reset."""
    from services.mesh.mesh_dm_relay import DMRelay

    fresh = DMRelay()
    fresh._mailboxes.clear()
    fresh._blocks.clear()
    fresh._stats = {"messages_in_memory": 0}
    return fresh


def _deposit(
    relay,
    *,
    sender: str = "alice",
    recipient_token: str = "bob_mailbox_token_abc",
    ciphertext: str = "ciphertext-blob",
    msg_id: str = "",
):
    """Deposit one ``shared``-class message addressed to ``bob``."""
    deposit_kwargs = {
        "sender_id": sender,
        "raw_sender_id": sender,
        "recipient_id": "bob",
        "ciphertext": ciphertext,
        "msg_id": msg_id,
        "delivery_class": "shared",
        "recipient_token": recipient_token,
    }
    return relay.deposit(**deposit_kwargs)


# ---------------------------------------------------------------------------
# Local cap on ``deposit``
# ---------------------------------------------------------------------------


class TestDepositCap:
    """The sender's own node refuses the deposit past the cap."""

    def test_two_deposits_from_same_sender_succeed(self, relay):
        first = _deposit(relay)
        second = _deposit(relay)
        assert first["ok"] is True
        assert second["ok"] is True
        assert first["msg_id"] != second["msg_id"]

    def test_third_deposit_from_same_sender_rejected(self, relay):
        for _ in range(2):
            _deposit(relay)
        rejected = _deposit(relay)
        assert rejected["ok"] is False
        lowered = rejected["detail"].lower()
        assert "unread" in lowered or "read your messages" in lowered

    def test_different_senders_have_independent_quotas(self, relay):
        # Two from alice, then two from carol: all four fit.
        for who in ("alice", "alice", "carol", "carol"):
            assert _deposit(relay, sender=who)["ok"] is True
        # carol's third is the first one over her own quota.
        assert _deposit(relay, sender="carol")["ok"] is False

    def test_different_recipients_have_independent_quotas(self, relay):
        for token in ("bob_token", "bob_token", "dave_token", "dave_token"):
            assert _deposit(relay, sender="alice", recipient_token=token)["ok"] is True

    def test_ack_frees_quota(self, relay):
        first = _deposit(relay)
        _deposit(relay)
        assert _deposit(relay)["ok"] is False

        # Simulate the recipient acking the first message by dropping it
        # from the stored mailbox and refreshing the in-memory counter.
        key = relay._hashed_mailbox_token("bob_mailbox_token_abc")
        acked_id = first["msg_id"]
        survivors = [m for m in relay._mailboxes[key] if m.msg_id != acked_id]
        relay._mailboxes[key] = survivors
        relay._stats["messages_in_memory"] = sum(
            len(box) for box in relay._mailboxes.values()
        )

        after_ack = _deposit(relay)
        assert after_ack["ok"] is True, f"expected quota free after ack, got: {after_ack}"

    def test_cap_is_env_tunable(self, relay, monkeypatch):
        # NOTE(review): this patches the limit accessor rather than the
        # environment/settings, so it pins "the cap follows the accessor".
        import services.mesh.mesh_dm_relay as mdr

        def _limit_of_one(self):
            return 1

        monkeypatch.setattr(mdr.DMRelay, "_per_sender_pending_limit", _limit_of_one)

        assert _deposit(relay)["ok"] is True
        assert _deposit(relay)["ok"] is False


# ---------------------------------------------------------------------------
# Replication-acceptance cap (the half that makes this a network rule)
# ---------------------------------------------------------------------------


class TestAcceptReplicaCap:
    """Receiving peers refuse inbound replicas past the cap."""

    def _envelope(self, *, msg_id: str, sender_block_ref: str, mailbox_key: str):
        """Build a complete ``shared``-class replica envelope."""
        wire = {}
        wire["msg_id"] = msg_id
        wire["mailbox_key"] = mailbox_key
        wire["sender_block_ref"] = sender_block_ref
        wire["sender_id"] = "alice"
        wire["sender_seal"] = ""
        wire["ciphertext"] = f"ciphertext-{msg_id}"
        wire["timestamp"] = time.time()
        wire["delivery_class"] = "shared"
        wire["relay_salt"] = ""
        wire["payload_format"] = "dm1"
        wire["session_welcome"] = ""
        return wire

    def test_replica_accepted_under_cap(self, relay):
        outcome = relay.accept_replica(
            envelope=self._envelope(
                msg_id="dm_replica_1",
                sender_block_ref="alice_block_ref",
                mailbox_key="mailbox_xyz",
            )
        )
        assert outcome["ok"] is True

    def test_replica_idempotent_on_duplicate_msg_id(self, relay):
        key = "mailbox_xyz"
        wire = self._envelope(
            msg_id="dm_dup_1",
            sender_block_ref="alice_block_ref",
            mailbox_key=key,
        )
        first = relay.accept_replica(envelope=wire)
        again = relay.accept_replica(envelope=wire)
        assert first["ok"] is True
        assert again["ok"] is True
        assert again.get("duplicate") is True
        assert len(relay._mailboxes[key]) == 1

    def test_replica_rejected_when_local_count_already_at_cap(self, relay):
        key = "mailbox_xyz"
        # Fill alice's quota on this relay.
        for n in (1, 2):
            seeded = self._envelope(
                msg_id=f"dm_seeded_{n}",
                sender_block_ref="alice_block_ref",
                mailbox_key=key,
            )
            relay.accept_replica(envelope=seeded)

        overcap = self._envelope(
            msg_id="dm_overcap_3",
            sender_block_ref="alice_block_ref",
            mailbox_key=key,
        )
        outcome = relay.accept_replica(envelope=overcap)
        assert outcome["ok"] is False
        assert outcome.get("cap_violation") is True
        assert outcome.get("pending") == 2
        assert outcome.get("limit") == 2
        assert len(relay._mailboxes[key]) == 2

    def test_replica_from_different_sender_passes_when_one_is_at_cap(self, relay):
        key = "mailbox_xyz"

        def push(msg_id, block_ref):
            return relay.accept_replica(
                envelope=self._envelope(
                    msg_id=msg_id,
                    sender_block_ref=block_ref,
                    mailbox_key=key,
                )
            )

        for n in (1, 2):
            push(f"dm_alice_{n}", "alice_block_ref")
        assert push("dm_alice_3", "alice_block_ref")["ok"] is False
        assert push("dm_carol_1", "carol_block_ref")["ok"] is True

    def test_replica_rejects_malformed_envelopes(self, relay):
        malformed = (
            {},
            {"msg_id": "x"},
            {"msg_id": "x", "mailbox_key": "y"},
            "not an object at all",
        )
        for candidate in malformed:
            outcome = relay.accept_replica(envelope=candidate)
            assert outcome["ok"] is False
# ---------------------------------------------------------------------------
# ``envelope_for_replication`` -- helper for the outbound replication path
# ---------------------------------------------------------------------------


class TestEnvelopeForReplication:
    """Wire-form export of a stored message and its round trip."""

    def test_returns_envelope_for_stored_message(self, relay):
        deposited = _deposit(relay, ciphertext="hello-ciphertext")
        stored_id = deposited["msg_id"]
        key = relay._hashed_mailbox_token("bob_mailbox_token_abc")

        wire = relay.envelope_for_replication(mailbox_key=key, msg_id=stored_id)
        assert wire is not None
        assert wire["msg_id"] == stored_id
        assert wire["mailbox_key"] == key
        assert wire["ciphertext"] == "hello-ciphertext"
        assert wire["delivery_class"] == "shared"
        # Every field ``accept_replica`` treats as required must be non-empty.
        required = ("msg_id", "mailbox_key", "sender_block_ref", "ciphertext")
        for k in required:
            assert wire.get(k), f"envelope missing required field {k!r}"

    def test_returns_none_for_unknown_message(self, relay):
        missing = relay.envelope_for_replication(
            mailbox_key="never_existed",
            msg_id="never_existed",
        )
        assert missing is None

    def test_envelope_round_trips_through_accept_replica(self, relay):
        from services.mesh.mesh_dm_relay import DMRelay

        # A second relay instance plays the receiving peer.
        peer = DMRelay()
        peer._mailboxes.clear()
        peer._stats = {"messages_in_memory": 0}

        deposited = _deposit(relay)
        stored_id = deposited["msg_id"]
        key = relay._hashed_mailbox_token("bob_mailbox_token_abc")
        wire = relay.envelope_for_replication(mailbox_key=key, msg_id=stored_id)
        assert wire is not None

        outcome = peer.accept_replica(envelope=wire)
        assert outcome["ok"] is True
        received = peer._mailboxes.get(key, [])
        assert len(received) == 1
        replica = received[0]
        assert replica.msg_id == stored_id
        assert replica.ciphertext == "ciphertext-blob"