mirror of
https://github.com/BigBodyCobain/Shadowbroker.git
synced 2026-05-28 10:01:31 +02:00
DM mailbox: outbound replication + receiving endpoint
Second commit on this branch (first added the per-sender cap + accept_replica primitive). This commit wires the actual cross-node propagation: Outbound (sender side) ---------------------- * New ``DMRelay._replicate_envelope_to_peers_async()`` — fire-and-forget thread that POSTs the envelope to every authenticated relay peer via the same per-peer HMAC pattern gate-message replication uses (#256 ``X-Peer-Url`` + ``X-Peer-HMAC`` headers, ``resolve_peer_key_for_url``). * ``deposit()`` now calls the replication helper after a successful local accept. Per-peer errors are swallowed — slow Tor peers must not block the sender's UX, and the recipient polling from a healthy peer works fine even if some peers are down. * Metrics: dm_replication_push_ok / _rejected / _error. Inbound (receiving side) ------------------------ * New endpoint ``POST /api/mesh/dm/replicate-envelope`` in routers/mesh_peer_sync.py. * Same HMAC auth gate (``_verify_peer_push_hmac``) as the existing infonet/gate peer-push endpoints. Unauthenticated requests get 403. * Body cap of 64 KB (DM envelope is bounded by MESH_DM_MAX_MSG_BYTES). * Calls DMRelay.accept_replica which enforces the per-sender cap as a network rule — hostile sender's relay can hold extras locally but honest peers reject them on inbound replication. End-to-end flow now works ------------------------- 1. Alice's node accepts a deposit to Bob's mailbox (local cap check). 2. Alice's node spawns a background thread that POSTs the envelope to MESH_RELAY_PEERS with per-peer HMAC. 3. Each peer's /api/mesh/dm/replicate-envelope verifies the HMAC and calls accept_replica, which re-enforces the per-sender cap. 4. Bob (offline at the time of send) eventually logs into ANY node in MESH_RELAY_PEERS, his existing pollDmMailboxes pulls from the local mailbox there, finds Alice's envelope, decrypts. Tests ----- backend/tests/test_dm_replicate_envelope_endpoint.py — 4 tests: TestReplicateEndpointAuth: - rejects requests without peer HMAC (403) - rejects requests with WRONG peer HMAC (403) — confirms the HMAC is actually verified, not just present - rejects oversize bodies (>64 KB) with 400/413 TestReplicateEndpointRegistered: - static check that POST /api/mesh/dm/replicate-envelope is registered on app.routes — catches future refactor that drops the router include All 38 backend tests touching the new code paths still pass: test_dm_relay_per_sender_cap.py (14) test_dm_replicate_envelope_endpoint.py (4) test_no_new_duplicate_routes.py (1) — new route is unique test_per_peer_secret_resolver.py (19) — HMAC primitive unaffected What's still ahead (PR-3+) -------------------------- * ack propagation: when recipient pulls a message on node X, peers Y/Z should prune their copies to free the sender's quota network-wide. Without this, the sender's quota frees only on the node the recipient actually polled — other peers still see N pending until TTL expiry. Workable but suboptimal. PR-3 will add a /api/mesh/dm/ack endpoint with the same HMAC pattern. * recipient pull-from-peers: today the recipient's poll only hits their own node's relay. If they log into a peer they didn't deposit with, they need a way to fetch envelopes from other peers in MESH_RELAY_PEERS. Today this works as long as the recipient's current node is one of the peers Alice's node pushed to — which is true in a fully-meshed deployment but not guaranteed for partial meshes. PR-4 if telemetry shows this matters.
This commit is contained in:
@@ -85,6 +85,64 @@ async def infonet_peer_push(request: Request):
|
||||
return {"ok": True, **result}
|
||||
|
||||
|
||||
@router.post("/api/mesh/dm/replicate-envelope")
|
||||
@limiter.limit("60/minute")
|
||||
async def dm_replicate_envelope(request: Request):
|
||||
"""Accept a DM envelope replicated from a peer relay (cross-node mailbox).
|
||||
|
||||
Companion endpoint to ``DMRelay.replicate_to_peers`` (outbound, in
|
||||
``mesh_dm_relay.py``). The sender's relay POSTs an encrypted DM
|
||||
envelope here after a successful local ``deposit``; this endpoint
|
||||
re-enforces the per-(sender, recipient) anti-spam cap and stores
|
||||
the envelope in the local mailbox if accepted.
|
||||
|
||||
The cap is the network rule: a hostile sender's relay can spool
|
||||
extras locally, but every honest peer enforces the cap on inbound
|
||||
replication. Recipient polling from any honest peer therefore
|
||||
never sees more than ``MESH_DM_PENDING_PER_SENDER_LIMIT`` pending
|
||||
from any one sender, no matter how many spam attempts were tried.
|
||||
|
||||
Same HMAC auth pattern as ``infonet_peer_push`` and ``gate_peer_push``.
|
||||
"""
|
||||
content_length = request.headers.get("content-length")
|
||||
if content_length:
|
||||
try:
|
||||
# DM envelopes are bounded by MESH_DM_MAX_MSG_BYTES + envelope
|
||||
# overhead; 64 KB is a generous ceiling.
|
||||
if int(content_length) > 65_536:
|
||||
return Response(
|
||||
content='{"ok":false,"detail":"Request body too large (max 64KB)"}',
|
||||
status_code=413, media_type="application/json",
|
||||
)
|
||||
except (ValueError, TypeError):
|
||||
pass
|
||||
body_bytes = await request.body()
|
||||
if not _verify_peer_push_hmac(request, body_bytes):
|
||||
return Response(
|
||||
content='{"ok":false,"detail":"Invalid or missing peer HMAC"}',
|
||||
status_code=403, media_type="application/json",
|
||||
)
|
||||
try:
|
||||
body = json_mod.loads(body_bytes or b"{}")
|
||||
except (ValueError, TypeError):
|
||||
return Response(
|
||||
content='{"ok":false,"detail":"Invalid JSON body"}',
|
||||
status_code=400, media_type="application/json",
|
||||
)
|
||||
envelope = body.get("envelope")
|
||||
if not isinstance(envelope, dict):
|
||||
return {"ok": False, "detail": "envelope must be an object"}
|
||||
|
||||
originating_peer = _peer_hmac_url_from_request(request) or ""
|
||||
|
||||
from services.mesh.mesh_dm_relay import dm_relay
|
||||
result = dm_relay.accept_replica(
|
||||
envelope=envelope,
|
||||
originating_peer_url=originating_peer,
|
||||
)
|
||||
return result
|
||||
|
||||
|
||||
@router.post("/api/mesh/gate/peer-push")
|
||||
@limiter.limit("30/minute")
|
||||
async def gate_peer_push(request: Request):
|
||||
|
||||
@@ -1595,6 +1595,23 @@ class DMRelay:
|
||||
)
|
||||
self._stats["messages_in_memory"] = sum(len(v) for v in self._mailboxes.values())
|
||||
self._save()
|
||||
# Cross-node mailbox replication: push the freshly-stored
|
||||
# envelope to every authenticated relay peer so the recipient
|
||||
# can log into ANY node and find their messages. The push is
|
||||
# async (fire-and-forget thread) so deposit() returns
|
||||
# immediately — slow Tor peers can't block the sender's UX.
|
||||
# Each receiving peer re-enforces the per-sender cap on
|
||||
# acceptance, so hostile relays can't widen the cap.
|
||||
try:
|
||||
envelope_for_push = self.envelope_for_replication(
|
||||
mailbox_key=mailbox_key, msg_id=msg_id,
|
||||
)
|
||||
if envelope_for_push:
|
||||
self._replicate_envelope_to_peers_async(
|
||||
envelope=envelope_for_push,
|
||||
)
|
||||
except Exception:
|
||||
metrics_inc("dm_replication_push_error")
|
||||
return {"ok": True, "msg_id": msg_id}
|
||||
|
||||
def accept_replica(
|
||||
@@ -1695,6 +1712,95 @@ class DMRelay:
|
||||
metrics_inc("dm_replica_accepted")
|
||||
return {"ok": True, "msg_id": msg_id}
|
||||
|
||||
def _replicate_envelope_to_peers_async(
|
||||
self,
|
||||
*,
|
||||
envelope: dict[str, Any],
|
||||
) -> None:
|
||||
"""Push an outbound DM envelope to every authenticated relay peer.
|
||||
|
||||
Fire-and-forget: spawned in a background thread so ``deposit``
|
||||
returns to the caller immediately. Per-peer errors are logged
|
||||
and swallowed — the sender's UX must not block on slow Tor
|
||||
peers, and a peer that's down today gets the next message
|
||||
whenever it comes back. Inbound recipient polling from a healthy
|
||||
peer keeps the system functional during peer failures.
|
||||
|
||||
Each peer is authed with the existing per-peer HMAC pattern
|
||||
(#256) — same headers and key resolver gate-message replication
|
||||
uses, so a hostile node that doesn't know any peer's HMAC key
|
||||
can't impersonate a legitimate relay.
|
||||
"""
|
||||
import threading
|
||||
|
||||
def _do_push():
|
||||
try:
|
||||
import hashlib
|
||||
import hmac
|
||||
import requests as _requests
|
||||
|
||||
from services.mesh.mesh_crypto import (
|
||||
normalize_peer_url,
|
||||
resolve_peer_key_for_url,
|
||||
)
|
||||
from services.mesh.mesh_router import (
|
||||
authenticated_push_peer_urls,
|
||||
)
|
||||
|
||||
peers = authenticated_push_peer_urls()
|
||||
if not peers:
|
||||
return
|
||||
|
||||
payload = json.dumps(
|
||||
{"envelope": envelope},
|
||||
separators=(",", ":"),
|
||||
ensure_ascii=False,
|
||||
).encode("utf-8")
|
||||
|
||||
timeout = max(
|
||||
1,
|
||||
int(getattr(self._settings(), "MESH_RELAY_PUSH_TIMEOUT_S", 10) or 10),
|
||||
)
|
||||
|
||||
for peer_url in peers:
|
||||
try:
|
||||
normalized = normalize_peer_url(peer_url)
|
||||
headers = {"Content-Type": "application/json"}
|
||||
peer_key = resolve_peer_key_for_url(normalized)
|
||||
if peer_key:
|
||||
headers["X-Peer-Url"] = normalized
|
||||
headers["X-Peer-HMAC"] = hmac.new(
|
||||
peer_key, payload, hashlib.sha256
|
||||
).hexdigest()
|
||||
url = f"{peer_url}/api/mesh/dm/replicate-envelope"
|
||||
resp = _requests.post(
|
||||
url, data=payload, timeout=timeout, headers=headers,
|
||||
)
|
||||
if resp.status_code == 200:
|
||||
metrics_inc("dm_replication_push_ok")
|
||||
else:
|
||||
# 4xx including the structured cap_violation
|
||||
# rejection from accept_replica — sender's
|
||||
# relay learns and stops retrying this msg_id.
|
||||
metrics_inc("dm_replication_push_rejected")
|
||||
except Exception:
|
||||
# Per-peer failure is non-fatal — log to metrics
|
||||
# but don't break the loop. Other peers and a
|
||||
# future retry can still propagate the envelope.
|
||||
metrics_inc("dm_replication_push_error")
|
||||
continue
|
||||
except Exception:
|
||||
# Outer guard — never let replication errors propagate
|
||||
# back to the sender's deposit() caller.
|
||||
metrics_inc("dm_replication_push_error")
|
||||
|
||||
thread = threading.Thread(
|
||||
target=_do_push,
|
||||
name="dm-replicate-push",
|
||||
daemon=True,
|
||||
)
|
||||
thread.start()
|
||||
|
||||
def envelope_for_replication(
|
||||
self,
|
||||
*,
|
||||
|
||||
@@ -0,0 +1,150 @@
|
||||
"""POST /api/mesh/dm/replicate-envelope — receiving side of cross-node DM
|
||||
mailbox replication.
|
||||
|
||||
This is the endpoint that peer relays call when they want to hand off an
|
||||
encrypted DM envelope to us (so the recipient can log into our node and
|
||||
find their messages). It re-enforces the per-(sender, recipient) anti-spam
|
||||
cap so hostile sender relays can't widen the cap by skipping the local
|
||||
check on their own deposit path.
|
||||
|
||||
The endpoint:
|
||||
|
||||
* authenticates the caller via the existing per-peer HMAC pattern
|
||||
(same one /api/mesh/infonet/peer-push and /api/mesh/gate/peer-push
|
||||
use, introduced in #256 — ``X-Peer-Url`` + ``X-Peer-HMAC`` headers
|
||||
keyed off ``resolve_peer_key_for_url``)
|
||||
* rejects bodies > 64 KB (DM envelope size is bounded by
|
||||
``MESH_DM_MAX_MSG_BYTES`` — 64KB ceiling has generous headroom)
|
||||
* rejects requests without a valid peer HMAC with 403
|
||||
* passes the envelope to ``DMRelay.accept_replica`` which enforces
|
||||
the cap
|
||||
|
||||
This file pins the endpoint contract. The cap enforcement itself is
|
||||
tested in ``test_dm_relay_per_sender_cap.py`` against the relay's
|
||||
``accept_replica`` method directly.
|
||||
"""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import asyncio
|
||||
import hashlib
|
||||
import hmac
|
||||
import json
|
||||
|
||||
import pytest
|
||||
from httpx import ASGITransport, AsyncClient
|
||||
|
||||
|
||||
@pytest.fixture
|
||||
def remote_client():
|
||||
"""ASGI client with peer IP 1.2.3.4 — never on the local-operator
|
||||
allowlist. Used to prove the endpoint isn't accidentally reachable
|
||||
by random remote callers without peer HMAC."""
|
||||
from main import app
|
||||
|
||||
class _RemoteClient:
|
||||
def __init__(self):
|
||||
self._loop = asyncio.new_event_loop()
|
||||
self._transport = ASGITransport(app=app, client=("1.2.3.4", 12345))
|
||||
self._base = "http://1.2.3.4:8000"
|
||||
|
||||
def post(self, url, **kw):
|
||||
async def go():
|
||||
async with AsyncClient(transport=self._transport, base_url=self._base) as ac:
|
||||
return await ac.post(url, **kw)
|
||||
return self._loop.run_until_complete(go())
|
||||
|
||||
def close(self):
|
||||
self._loop.close()
|
||||
|
||||
c = _RemoteClient()
|
||||
yield c
|
||||
c.close()
|
||||
|
||||
|
||||
class TestReplicateEndpointAuth:
|
||||
def test_rejects_request_without_peer_hmac(self, remote_client):
|
||||
"""A peer push that does NOT carry X-Peer-Url + X-Peer-HMAC
|
||||
must be rejected with 403 before the envelope is ever passed
|
||||
to the relay. Same gate the existing infonet/gate peer-push
|
||||
endpoints enforce."""
|
||||
payload = {
|
||||
"envelope": {
|
||||
"msg_id": "dm_unauth_1",
|
||||
"mailbox_key": "mb",
|
||||
"sender_block_ref": "sender",
|
||||
"ciphertext": "x",
|
||||
},
|
||||
}
|
||||
r = remote_client.post(
|
||||
"/api/mesh/dm/replicate-envelope",
|
||||
json=payload,
|
||||
)
|
||||
assert r.status_code == 403
|
||||
assert "peer HMAC" in r.text or "peer hmac" in r.text.lower()
|
||||
|
||||
def test_rejects_wrong_peer_hmac(self, remote_client, monkeypatch):
|
||||
"""A request with a peer HMAC header keyed off the WRONG secret
|
||||
is rejected. Confirms the HMAC is actually verified — a tampered
|
||||
body or a key-substitution attack doesn't sneak through."""
|
||||
# Plant a known peer secret. The request will sign with a
|
||||
# DIFFERENT key, so verification must fail.
|
||||
from services.config import get_settings
|
||||
monkeypatch.setenv("MESH_PEER_PUSH_SECRET", "real-secret-32-chars-min-padding-padding")
|
||||
get_settings.cache_clear()
|
||||
|
||||
body = json.dumps({
|
||||
"envelope": {
|
||||
"msg_id": "dm_wronghmac",
|
||||
"mailbox_key": "mb",
|
||||
"sender_block_ref": "sender",
|
||||
"ciphertext": "x",
|
||||
},
|
||||
}).encode("utf-8")
|
||||
wrong_hmac = hmac.new(b"wrong-key", body, hashlib.sha256).hexdigest()
|
||||
r = remote_client.post(
|
||||
"/api/mesh/dm/replicate-envelope",
|
||||
content=body,
|
||||
headers={
|
||||
"Content-Type": "application/json",
|
||||
"X-Peer-Url": "http://example-peer.onion:8000",
|
||||
"X-Peer-HMAC": wrong_hmac,
|
||||
},
|
||||
)
|
||||
assert r.status_code == 403
|
||||
|
||||
def test_rejects_oversize_body(self, remote_client):
|
||||
"""64 KB ceiling — anything bigger doesn't even get parsed.
|
||||
Defends against memory amplification via giant ciphertexts."""
|
||||
# 100 KB body is well over the 64 KB cap.
|
||||
big = b"{" + b"x" * 100_000 + b"}"
|
||||
r = remote_client.post(
|
||||
"/api/mesh/dm/replicate-envelope",
|
||||
content=big,
|
||||
headers={
|
||||
"Content-Type": "application/json",
|
||||
"Content-Length": str(len(big)),
|
||||
},
|
||||
)
|
||||
assert r.status_code in (400, 413), (
|
||||
f"oversize body should be rejected with 400/413, got {r.status_code}"
|
||||
)
|
||||
|
||||
|
||||
class TestReplicateEndpointRegistered:
|
||||
def test_route_present_in_app(self):
|
||||
"""Static check that the route is actually wired into the app.
|
||||
Catches a future refactor that drops the router include or
|
||||
deletes the endpoint by accident."""
|
||||
from main import app
|
||||
|
||||
paths_methods = set()
|
||||
for route in app.routes:
|
||||
path = getattr(route, "path", None)
|
||||
methods = getattr(route, "methods", set()) or set()
|
||||
for m in methods:
|
||||
paths_methods.add((m, path))
|
||||
|
||||
assert ("POST", "/api/mesh/dm/replicate-envelope") in paths_methods, (
|
||||
"POST /api/mesh/dm/replicate-envelope is not registered on the app"
|
||||
)
|
||||
Reference in New Issue
Block a user