Merge pull request #309 from BigBodyCobain/feat/cross-node-dm-mailbox-replication

DM mailbox: per-(sender, recipient) anti-spam cap + replication primitives
This commit is contained in:
Shadowbroker
2026-05-22 22:43:26 -06:00
committed by GitHub
5 changed files with 786 additions and 0 deletions
+58
View File
@@ -85,6 +85,64 @@ async def infonet_peer_push(request: Request):
return {"ok": True, **result}
@router.post("/api/mesh/dm/replicate-envelope")
@limiter.limit("60/minute")
async def dm_replicate_envelope(request: Request):
"""Accept a DM envelope replicated from a peer relay (cross-node mailbox).
Companion endpoint to ``DMRelay.replicate_to_peers`` (outbound, in
``mesh_dm_relay.py``). The sender's relay POSTs an encrypted DM
envelope here after a successful local ``deposit``; this endpoint
re-enforces the per-(sender, recipient) anti-spam cap and stores
the envelope in the local mailbox if accepted.
The cap is the network rule: a hostile sender's relay can spool
extras locally, but every honest peer enforces the cap on inbound
replication. Recipient polling from any honest peer therefore
never sees more than ``MESH_DM_PENDING_PER_SENDER_LIMIT`` pending
from any one sender, no matter how many spam attempts were tried.
Same HMAC auth pattern as ``infonet_peer_push`` and ``gate_peer_push``.
"""
content_length = request.headers.get("content-length")
if content_length:
try:
# DM envelopes are bounded by MESH_DM_MAX_MSG_BYTES + envelope
# overhead; 64 KB is a generous ceiling.
if int(content_length) > 65_536:
return Response(
content='{"ok":false,"detail":"Request body too large (max 64KB)"}',
status_code=413, media_type="application/json",
)
except (ValueError, TypeError):
pass
body_bytes = await request.body()
if not _verify_peer_push_hmac(request, body_bytes):
return Response(
content='{"ok":false,"detail":"Invalid or missing peer HMAC"}',
status_code=403, media_type="application/json",
)
try:
body = json_mod.loads(body_bytes or b"{}")
except (ValueError, TypeError):
return Response(
content='{"ok":false,"detail":"Invalid JSON body"}',
status_code=400, media_type="application/json",
)
envelope = body.get("envelope")
if not isinstance(envelope, dict):
return {"ok": False, "detail": "envelope must be an object"}
originating_peer = _peer_hmac_url_from_request(request) or ""
from services.mesh.mesh_dm_relay import dm_relay
result = dm_relay.accept_replica(
envelope=envelope,
originating_peer_url=originating_peer,
)
return result
@router.post("/api/mesh/gate/peer-push")
@limiter.limit("30/minute")
async def gate_peer_push(request: Request):
+15
View File
@@ -116,6 +116,21 @@ class Settings(BaseSettings):
MESH_DM_REQUEST_MAILBOX_LIMIT: int = 12
MESH_DM_SHARED_MAILBOX_LIMIT: int = 48
MESH_DM_SELF_MAILBOX_LIMIT: int = 12
# Anti-spam: cap on distinct UNACKED messages a single sender can have
# parked in a single recipient's mailbox at any one time. Once the
# recipient pulls (acks) a message, the sender's quota for that pair
# frees up. Default 2 — a sender who wants to deliver more must wait
# for the recipient to actually read the prior messages.
#
# This cap is enforced TWICE: once on the local deposit path (the
# sender's own node refuses to spool the 3rd message) AND once on
# the replication-acceptance path (honest peer relays refuse to
# accept inbound replicas that would put them over the cap). The
# double enforcement makes the rule a NETWORK rule — patching out
# the local check on a hostile sender's relay doesn't let extras
# propagate, because every honest peer enforces the same cap on
# inbound replication.
MESH_DM_PENDING_PER_SENDER_LIMIT: int = 2
MESH_BLOCK_LEGACY_AGENT_ID_LOOKUP: bool = True
MESH_ALLOW_COMPAT_DM_INVITE_IMPORT: bool = False
MESH_ALLOW_COMPAT_DM_INVITE_IMPORT_UNTIL: str = ""
+293
View File
@@ -317,6 +317,39 @@ class DMRelay:
def _self_mailbox_limit(self) -> int:
return max(1, int(self._settings().MESH_DM_SELF_MAILBOX_LIMIT))
def _per_sender_pending_limit(self) -> int:
"""Anti-spam cap on UNACKED messages a single sender can have parked
in a single recipient mailbox at any one time. See ``config.py``
``MESH_DM_PENDING_PER_SENDER_LIMIT`` for the threat model — this
rule is enforced both at ``deposit`` (local) and at
``accept_replica`` (peer push acceptance), making it a network
rule rather than a client-side honor system."""
try:
limit = int(getattr(self._settings(), "MESH_DM_PENDING_PER_SENDER_LIMIT", 2) or 2)
except (TypeError, ValueError):
limit = 2
return max(1, limit)
def _per_sender_pending_count(
self,
*,
mailbox_key: str,
sender_block_ref: str,
) -> int:
"""Count UNACKED messages from ``sender_block_ref`` currently parked
in ``mailbox_key``. Caller already holds ``self._lock``.
Messages that have been claimed/acked are removed from the mailbox
list (see ``claim_message_ids``), so anything still here is by
definition unacked. We count by exact ``sender_block_ref`` match
— that's the per-pair sender identity used for blocking too, so
the cap is naturally per-(sender, recipient).
"""
if not mailbox_key or not sender_block_ref:
return 0
messages = self._mailboxes.get(mailbox_key, [])
return sum(1 for m in messages if m.sender_block_ref == sender_block_ref)
def _nonce_ttl_seconds(self) -> int:
return max(30, int(self._settings().MESH_DM_NONCE_TTL_S))
@@ -1515,6 +1548,29 @@ class DMRelay:
if len(self._mailboxes[mailbox_key]) >= self._mailbox_limit_for_class(delivery_class):
metrics_inc("dm_drop_full")
return {"ok": False, "detail": "Recipient mailbox full"}
# Anti-spam: per-(sender, recipient) cap on unacked messages.
# A sender who already has the configured number of messages
# parked in this mailbox can't deposit more until the recipient
# pulls (acks) at least one. The same cap is re-enforced on
# inbound replication in ``accept_replica`` so this rule isn't
# bypassable by patching out the local check on a hostile
# sender's relay — see config.py
# MESH_DM_PENDING_PER_SENDER_LIMIT for the threat model.
per_sender_limit = self._per_sender_pending_limit()
pending = self._per_sender_pending_count(
mailbox_key=mailbox_key,
sender_block_ref=sender_block_ref,
)
if pending >= per_sender_limit:
metrics_inc("dm_drop_per_sender_cap")
return {
"ok": False,
"detail": (
f"Recipient already has {pending} unread message"
f"{'s' if pending != 1 else ''} from you. Wait for "
"them to read your messages before sending more."
),
}
if not msg_id:
msg_id = f"dm_{int(time.time() * 1000)}_{secrets.token_hex(6)}"
elif any(m.msg_id == msg_id for m in self._mailboxes[mailbox_key]):
@@ -1539,8 +1595,245 @@ class DMRelay:
)
self._stats["messages_in_memory"] = sum(len(v) for v in self._mailboxes.values())
self._save()
# Cross-node mailbox replication: push the freshly-stored
# envelope to every authenticated relay peer so the recipient
# can log into ANY node and find their messages. The push is
# async (fire-and-forget thread) so deposit() returns
# immediately — slow Tor peers can't block the sender's UX.
# Each receiving peer re-enforces the per-sender cap on
# acceptance, so hostile relays can't widen the cap.
try:
envelope_for_push = self.envelope_for_replication(
mailbox_key=mailbox_key, msg_id=msg_id,
)
if envelope_for_push:
self._replicate_envelope_to_peers_async(
envelope=envelope_for_push,
)
except Exception:
metrics_inc("dm_replication_push_error")
return {"ok": True, "msg_id": msg_id}
def accept_replica(
self,
*,
envelope: dict[str, Any],
originating_peer_url: str = "",
) -> dict[str, Any]:
"""Receive a DM envelope replicated from a peer relay.
Cross-node mailbox replication entry point. When a sender's local
relay accepts a ``deposit`` and pushes the envelope to
``MESH_RELAY_PEERS`` (so the recipient can log into any peer
node and find their messages), each receiving peer calls
``accept_replica`` to ingest it.
The per-(sender, recipient) cap is re-enforced HERE. That's what
makes the rule a NETWORK rule rather than a client-side honor
system: a hostile sender who patches out the local ``deposit``
check still can't get a 3rd unacked message to spread, because
every honest peer enforces the same cap on inbound replicas.
Result: hostile relays can hold extras locally, but those extras
never reach any node a legitimate recipient is polling from.
Returns the same shape as ``deposit`` so the calling endpoint can
forward the result back to the originating peer.
"""
if not isinstance(envelope, dict):
return {"ok": False, "detail": "envelope must be an object"}
msg_id = str(envelope.get("msg_id", "") or "").strip()
mailbox_key = str(envelope.get("mailbox_key", "") or "").strip()
sender_block_ref = str(envelope.get("sender_block_ref", "") or "").strip()
ciphertext = str(envelope.get("ciphertext", "") or "")
if not msg_id or not mailbox_key or not sender_block_ref or not ciphertext:
return {"ok": False, "detail": "envelope missing required fields"}
with self._lock:
self._refresh_from_shared_relay()
self._cleanup_expired()
# Idempotent — if we already hold this exact msg_id, the
# replication round-tripped or a peer pushed the same
# envelope through multiple paths. Accept silently.
if any(m.msg_id == msg_id for m in self._mailboxes.get(mailbox_key, [])):
metrics_inc("dm_replica_duplicate")
return {"ok": True, "msg_id": msg_id, "duplicate": True}
# Same per-class cap as the deposit path — defense in depth
# against a peer that wraps a "deposit" as a "replica" to
# bypass the class limit.
delivery_class = str(envelope.get("delivery_class", "") or "")
if delivery_class in ("request", "shared", "self"):
class_limit = self._mailbox_limit_for_class(delivery_class)
else:
class_limit = self._shared_mailbox_limit()
if len(self._mailboxes.get(mailbox_key, [])) >= class_limit:
metrics_inc("dm_replica_drop_full")
return {"ok": False, "detail": "Recipient mailbox full"}
# THE network rule: per-(sender, recipient) anti-spam cap.
per_sender_limit = self._per_sender_pending_limit()
pending = self._per_sender_pending_count(
mailbox_key=mailbox_key,
sender_block_ref=sender_block_ref,
)
if pending >= per_sender_limit:
metrics_inc("dm_replica_drop_per_sender_cap")
# Returning a structured rejection — the sender's relay
# learns its envelope was rejected by an honest peer and
# can stop trying to push it.
return {
"ok": False,
"detail": (
"Per-sender cap reached on this relay; refusing replica"
),
"cap_violation": True,
"pending": pending,
"limit": per_sender_limit,
}
# Accept the replica into the local mailbox.
self._mailboxes[mailbox_key].append(
DMMessage(
sender_id=str(envelope.get("sender_id", "") or ""),
ciphertext=ciphertext,
timestamp=float(envelope.get("timestamp", time.time()) or time.time()),
msg_id=msg_id,
delivery_class=str(envelope.get("delivery_class", "shared") or "shared"),
sender_seal=str(envelope.get("sender_seal", "") or ""),
relay_salt=str(envelope.get("relay_salt", "") or ""),
sender_block_ref=sender_block_ref,
payload_format=str(envelope.get("payload_format", "dm1") or "dm1"),
session_welcome=str(envelope.get("session_welcome", "") or ""),
)
)
self._stats["messages_in_memory"] = sum(len(v) for v in self._mailboxes.values())
self._save()
metrics_inc("dm_replica_accepted")
return {"ok": True, "msg_id": msg_id}
def _replicate_envelope_to_peers_async(
self,
*,
envelope: dict[str, Any],
) -> None:
"""Push an outbound DM envelope to every authenticated relay peer.
Fire-and-forget: spawned in a background thread so ``deposit``
returns to the caller immediately. Per-peer errors are logged
and swallowed — the sender's UX must not block on slow Tor
peers, and a peer that's down today gets the next message
whenever it comes back. Inbound recipient polling from a healthy
peer keeps the system functional during peer failures.
Each peer is authed with the existing per-peer HMAC pattern
(#256) — same headers and key resolver gate-message replication
uses, so a hostile node that doesn't know any peer's HMAC key
can't impersonate a legitimate relay.
"""
import threading
def _do_push():
try:
import hashlib
import hmac
import requests as _requests
from services.mesh.mesh_crypto import (
normalize_peer_url,
resolve_peer_key_for_url,
)
from services.mesh.mesh_router import (
authenticated_push_peer_urls,
)
peers = authenticated_push_peer_urls()
if not peers:
return
payload = json.dumps(
{"envelope": envelope},
separators=(",", ":"),
ensure_ascii=False,
).encode("utf-8")
timeout = max(
1,
int(getattr(self._settings(), "MESH_RELAY_PUSH_TIMEOUT_S", 10) or 10),
)
for peer_url in peers:
try:
normalized = normalize_peer_url(peer_url)
headers = {"Content-Type": "application/json"}
peer_key = resolve_peer_key_for_url(normalized)
if peer_key:
headers["X-Peer-Url"] = normalized
headers["X-Peer-HMAC"] = hmac.new(
peer_key, payload, hashlib.sha256
).hexdigest()
url = f"{peer_url}/api/mesh/dm/replicate-envelope"
resp = _requests.post(
url, data=payload, timeout=timeout, headers=headers,
)
if resp.status_code == 200:
metrics_inc("dm_replication_push_ok")
else:
# 4xx including the structured cap_violation
# rejection from accept_replica — sender's
# relay learns and stops retrying this msg_id.
metrics_inc("dm_replication_push_rejected")
except Exception:
# Per-peer failure is non-fatal — log to metrics
# but don't break the loop. Other peers and a
# future retry can still propagate the envelope.
metrics_inc("dm_replication_push_error")
continue
except Exception:
# Outer guard — never let replication errors propagate
# back to the sender's deposit() caller.
metrics_inc("dm_replication_push_error")
thread = threading.Thread(
target=_do_push,
name="dm-replicate-push",
daemon=True,
)
thread.start()
def envelope_for_replication(
self,
*,
mailbox_key: str,
msg_id: str,
) -> dict[str, Any] | None:
"""Return the wire-form envelope for a stored message, suitable
for POSTing to a peer relay's replicate-envelope endpoint.
Returns ``None`` if the message isn't in the mailbox (already
acked, expired, never existed). The caller holds the
responsibility for transport security (Tor SOCKS for .onion
peers, per-peer HMAC) and for not leaking the envelope to
clearnet peers when private transport is required.
"""
with self._lock:
for m in self._mailboxes.get(mailbox_key, []):
if m.msg_id == msg_id:
return {
"msg_id": m.msg_id,
"mailbox_key": mailbox_key,
"sender_id": m.sender_id,
"sender_block_ref": m.sender_block_ref,
"sender_seal": m.sender_seal,
"ciphertext": m.ciphertext,
"timestamp": m.timestamp,
"delivery_class": m.delivery_class,
"relay_salt": m.relay_salt,
"payload_format": m.payload_format,
"session_welcome": m.session_welcome,
}
return None
def is_blocked(self, recipient_id: str, sender_id: str) -> bool:
with self._lock:
self._refresh_from_shared_relay()
@@ -0,0 +1,270 @@
"""Per-(sender, recipient) anti-spam cap on the DM relay.
The user-stated rule: a single sender can have at most N UNACKED messages
parked in a single recipient's mailbox at any one time (N=2 by default).
Once the recipient pulls a message, the sender's quota for that pair
frees up.
Network rule, not local rule
-----------------------------
The cap is enforced TWICE:
1. ``DMRelay.deposit(...)`` -- local check on the sender's own node.
Refuses to spool the (N+1)th message before it can be replicated.
2. ``DMRelay.accept_replica(...)`` -- replication-acceptance check on
every receiving peer. Refuses to accept an inbound replica that
would put the local mailbox over the cap, even if the originating
peer claims it had cap room.
The double enforcement matters because cap (1) is client-side -- a
hostile relay could patch it out and continue to spool extras locally.
Cap (2) means those extras can't propagate: every honest peer rejects
them on the way in. A recipient who polls from honest peers therefore
never sees more than N pending from any one sender, regardless of how
many spam attempts the sender's own relay accepted.
These tests pin both halves of the rule.
"""
from __future__ import annotations
import time
import pytest
@pytest.fixture
def relay():
"""Fresh ``DMRelay`` per test."""
from services.mesh.mesh_dm_relay import DMRelay
r = DMRelay()
r._mailboxes.clear()
r._blocks.clear()
r._stats = {"messages_in_memory": 0}
return r
def _deposit(
relay,
*,
sender: str = "alice",
recipient_token: str = "bob_mailbox_token_abc",
ciphertext: str = "ciphertext-blob",
msg_id: str = "",
):
"""Convenience wrapper using ``shared`` delivery class."""
return relay.deposit(
sender_id=sender,
raw_sender_id=sender,
recipient_id="bob",
ciphertext=ciphertext,
msg_id=msg_id,
delivery_class="shared",
recipient_token=recipient_token,
)
# ---------------------------------------------------------------------------
# Local cap on ``deposit``
# ---------------------------------------------------------------------------
class TestDepositCap:
def test_two_deposits_from_same_sender_succeed(self, relay):
r1 = _deposit(relay)
r2 = _deposit(relay)
assert r1["ok"] is True
assert r2["ok"] is True
assert r1["msg_id"] != r2["msg_id"]
def test_third_deposit_from_same_sender_rejected(self, relay):
_deposit(relay)
_deposit(relay)
r3 = _deposit(relay)
assert r3["ok"] is False
detail = r3["detail"].lower()
assert "unread" in detail or "read your messages" in detail
def test_different_senders_have_independent_quotas(self, relay):
for _ in range(2):
assert _deposit(relay, sender="alice")["ok"] is True
for _ in range(2):
assert _deposit(relay, sender="carol")["ok"] is True
assert _deposit(relay, sender="carol")["ok"] is False
def test_different_recipients_have_independent_quotas(self, relay):
for _ in range(2):
assert _deposit(relay, sender="alice", recipient_token="bob_token")["ok"] is True
for _ in range(2):
assert _deposit(relay, sender="alice", recipient_token="dave_token")["ok"] is True
def test_ack_frees_quota(self, relay):
r1 = _deposit(relay)
_deposit(relay)
assert _deposit(relay)["ok"] is False
mailbox_key = relay._hashed_mailbox_token("bob_mailbox_token_abc")
relay._mailboxes[mailbox_key] = [
m for m in relay._mailboxes[mailbox_key]
if m.msg_id != r1["msg_id"]
]
relay._stats["messages_in_memory"] = sum(
len(v) for v in relay._mailboxes.values()
)
r3 = _deposit(relay)
assert r3["ok"] is True, f"expected quota free after ack, got: {r3}"
def test_cap_is_env_tunable(self, relay, monkeypatch):
import services.mesh.mesh_dm_relay as mdr
monkeypatch.setattr(
mdr.DMRelay,
"_per_sender_pending_limit",
lambda self: 1,
)
assert _deposit(relay)["ok"] is True
assert _deposit(relay)["ok"] is False
# ---------------------------------------------------------------------------
# Replication-acceptance cap (the half that makes this a network rule)
# ---------------------------------------------------------------------------
class TestAcceptReplicaCap:
def _envelope(self, *, msg_id: str, sender_block_ref: str, mailbox_key: str):
return {
"msg_id": msg_id,
"mailbox_key": mailbox_key,
"sender_block_ref": sender_block_ref,
"sender_id": "alice",
"sender_seal": "",
"ciphertext": f"ciphertext-{msg_id}",
"timestamp": time.time(),
"delivery_class": "shared",
"relay_salt": "",
"payload_format": "dm1",
"session_welcome": "",
}
def test_replica_accepted_under_cap(self, relay):
env = self._envelope(
msg_id="dm_replica_1",
sender_block_ref="alice_block_ref",
mailbox_key="mailbox_xyz",
)
result = relay.accept_replica(envelope=env)
assert result["ok"] is True
def test_replica_idempotent_on_duplicate_msg_id(self, relay):
mailbox_key = "mailbox_xyz"
env = self._envelope(
msg_id="dm_dup_1",
sender_block_ref="alice_block_ref",
mailbox_key=mailbox_key,
)
r1 = relay.accept_replica(envelope=env)
r2 = relay.accept_replica(envelope=env)
assert r1["ok"] is True
assert r2["ok"] is True
assert r2.get("duplicate") is True
assert len(relay._mailboxes[mailbox_key]) == 1
def test_replica_rejected_when_local_count_already_at_cap(self, relay):
mailbox_key = "mailbox_xyz"
for i in (1, 2):
relay.accept_replica(envelope=self._envelope(
msg_id=f"dm_seeded_{i}",
sender_block_ref="alice_block_ref",
mailbox_key=mailbox_key,
))
result = relay.accept_replica(envelope=self._envelope(
msg_id="dm_overcap_3",
sender_block_ref="alice_block_ref",
mailbox_key=mailbox_key,
))
assert result["ok"] is False
assert result.get("cap_violation") is True
assert result.get("pending") == 2
assert result.get("limit") == 2
assert len(relay._mailboxes[mailbox_key]) == 2
def test_replica_from_different_sender_passes_when_one_is_at_cap(self, relay):
mailbox_key = "mailbox_xyz"
for i in (1, 2):
relay.accept_replica(envelope=self._envelope(
msg_id=f"dm_alice_{i}",
sender_block_ref="alice_block_ref",
mailbox_key=mailbox_key,
))
assert relay.accept_replica(envelope=self._envelope(
msg_id="dm_alice_3",
sender_block_ref="alice_block_ref",
mailbox_key=mailbox_key,
))["ok"] is False
assert relay.accept_replica(envelope=self._envelope(
msg_id="dm_carol_1",
sender_block_ref="carol_block_ref",
mailbox_key=mailbox_key,
))["ok"] is True
def test_replica_rejects_malformed_envelopes(self, relay):
for bad in (
{},
{"msg_id": "x"},
{"msg_id": "x", "mailbox_key": "y"},
"not an object at all",
):
result = relay.accept_replica(envelope=bad)
assert result["ok"] is False
# ---------------------------------------------------------------------------
# ``envelope_for_replication`` -- helper for the outbound replication path
# ---------------------------------------------------------------------------
class TestEnvelopeForReplication:
def test_returns_envelope_for_stored_message(self, relay):
r = _deposit(relay, ciphertext="hello-ciphertext")
msg_id = r["msg_id"]
mailbox_key = relay._hashed_mailbox_token("bob_mailbox_token_abc")
env = relay.envelope_for_replication(mailbox_key=mailbox_key, msg_id=msg_id)
assert env is not None
assert env["msg_id"] == msg_id
assert env["mailbox_key"] == mailbox_key
assert env["ciphertext"] == "hello-ciphertext"
assert env["delivery_class"] == "shared"
for k in ("msg_id", "mailbox_key", "sender_block_ref", "ciphertext"):
assert env.get(k), f"envelope missing required field {k!r}"
def test_returns_none_for_unknown_message(self, relay):
env = relay.envelope_for_replication(
mailbox_key="never_existed", msg_id="never_existed",
)
assert env is None
def test_envelope_round_trips_through_accept_replica(self, relay):
from services.mesh.mesh_dm_relay import DMRelay
receiver_relay = DMRelay()
receiver_relay._mailboxes.clear()
receiver_relay._stats = {"messages_in_memory": 0}
r = _deposit(relay)
msg_id = r["msg_id"]
mailbox_key = relay._hashed_mailbox_token("bob_mailbox_token_abc")
env = relay.envelope_for_replication(
mailbox_key=mailbox_key, msg_id=msg_id,
)
assert env is not None
result = receiver_relay.accept_replica(envelope=env)
assert result["ok"] is True
stored = receiver_relay._mailboxes.get(mailbox_key, [])
assert len(stored) == 1
assert stored[0].msg_id == msg_id
assert stored[0].ciphertext == "ciphertext-blob"
@@ -0,0 +1,150 @@
"""POST /api/mesh/dm/replicate-envelope — receiving side of cross-node DM
mailbox replication.
This is the endpoint that peer relays call when they want to hand off an
encrypted DM envelope to us (so the recipient can log into our node and
find their messages). It re-enforces the per-(sender, recipient) anti-spam
cap so hostile sender relays can't widen the cap by skipping the local
check on their own deposit path.
The endpoint:
* authenticates the caller via the existing per-peer HMAC pattern
(same one /api/mesh/infonet/peer-push and /api/mesh/gate/peer-push
use, introduced in #256 — ``X-Peer-Url`` + ``X-Peer-HMAC`` headers
keyed off ``resolve_peer_key_for_url``)
* rejects bodies > 64 KB (DM envelope size is bounded by
``MESH_DM_MAX_MSG_BYTES`` — 64KB ceiling has generous headroom)
* rejects requests without a valid peer HMAC with 403
* passes the envelope to ``DMRelay.accept_replica`` which enforces
the cap
This file pins the endpoint contract. The cap enforcement itself is
tested in ``test_dm_relay_per_sender_cap.py`` against the relay's
``accept_replica`` method directly.
"""
from __future__ import annotations
import asyncio
import hashlib
import hmac
import json
import pytest
from httpx import ASGITransport, AsyncClient
@pytest.fixture
def remote_client():
"""ASGI client with peer IP 1.2.3.4 — never on the local-operator
allowlist. Used to prove the endpoint isn't accidentally reachable
by random remote callers without peer HMAC."""
from main import app
class _RemoteClient:
def __init__(self):
self._loop = asyncio.new_event_loop()
self._transport = ASGITransport(app=app, client=("1.2.3.4", 12345))
self._base = "http://1.2.3.4:8000"
def post(self, url, **kw):
async def go():
async with AsyncClient(transport=self._transport, base_url=self._base) as ac:
return await ac.post(url, **kw)
return self._loop.run_until_complete(go())
def close(self):
self._loop.close()
c = _RemoteClient()
yield c
c.close()
class TestReplicateEndpointAuth:
def test_rejects_request_without_peer_hmac(self, remote_client):
"""A peer push that does NOT carry X-Peer-Url + X-Peer-HMAC
must be rejected with 403 before the envelope is ever passed
to the relay. Same gate the existing infonet/gate peer-push
endpoints enforce."""
payload = {
"envelope": {
"msg_id": "dm_unauth_1",
"mailbox_key": "mb",
"sender_block_ref": "sender",
"ciphertext": "x",
},
}
r = remote_client.post(
"/api/mesh/dm/replicate-envelope",
json=payload,
)
assert r.status_code == 403
assert "peer HMAC" in r.text or "peer hmac" in r.text.lower()
def test_rejects_wrong_peer_hmac(self, remote_client, monkeypatch):
"""A request with a peer HMAC header keyed off the WRONG secret
is rejected. Confirms the HMAC is actually verified — a tampered
body or a key-substitution attack doesn't sneak through."""
# Plant a known peer secret. The request will sign with a
# DIFFERENT key, so verification must fail.
from services.config import get_settings
monkeypatch.setenv("MESH_PEER_PUSH_SECRET", "real-secret-32-chars-min-padding-padding")
get_settings.cache_clear()
body = json.dumps({
"envelope": {
"msg_id": "dm_wronghmac",
"mailbox_key": "mb",
"sender_block_ref": "sender",
"ciphertext": "x",
},
}).encode("utf-8")
wrong_hmac = hmac.new(b"wrong-key", body, hashlib.sha256).hexdigest()
r = remote_client.post(
"/api/mesh/dm/replicate-envelope",
content=body,
headers={
"Content-Type": "application/json",
"X-Peer-Url": "http://example-peer.onion:8000",
"X-Peer-HMAC": wrong_hmac,
},
)
assert r.status_code == 403
def test_rejects_oversize_body(self, remote_client):
"""64 KB ceiling — anything bigger doesn't even get parsed.
Defends against memory amplification via giant ciphertexts."""
# 100 KB body is well over the 64 KB cap.
big = b"{" + b"x" * 100_000 + b"}"
r = remote_client.post(
"/api/mesh/dm/replicate-envelope",
content=big,
headers={
"Content-Type": "application/json",
"Content-Length": str(len(big)),
},
)
assert r.status_code in (400, 413), (
f"oversize body should be rejected with 400/413, got {r.status_code}"
)
class TestReplicateEndpointRegistered:
def test_route_present_in_app(self):
"""Static check that the route is actually wired into the app.
Catches a future refactor that drops the router include or
deletes the endpoint by accident."""
from main import app
paths_methods = set()
for route in app.routes:
path = getattr(route, "path", None)
methods = getattr(route, "methods", set()) or set()
for m in methods:
paths_methods.add((m, path))
assert ("POST", "/api/mesh/dm/replicate-envelope") in paths_methods, (
"POST /api/mesh/dm/replicate-envelope is not registered on the app"
)