Files
Shadowbroker/backend/services/mesh/mesh_schema.py
T
Shadowbroker 1d7fa5185a feat(infonet): private gate + DM hashchain spool with hardened propagation (#326)
Private gate messages and offline DMs now ride the Infonet hashchain
as ciphertext-only events, replicated across nodes via private
transports (Tor onion / RNS / loopback) and decrypted only by parties
holding the gate or recipient keys.

Hashchain core (mesh_hashchain.py)
----------------------------------

* New ``append_private_gate_message`` and ``append_private_dm_message``
  append paths with full signature verification, public-key binding,
  revocation check, and replay protection in a dedicated sequence
  domain (so a gate post does not consume the author's public broadcast
  sequence, and a DM cannot replay-block a public message at sequence=1).
* Fork validation and full-chain validation now accept the gate
  signature compatibility variants — older signatures that canonicalize
  with/without epoch or reply_to still verify, so a re-sync from an
  older peer doesn't reject still-valid history.
* DM hashchain spool: capped at 2 active sealed offline DMs per
  recipient mailbox, plus a per-(sender, recipient) cap so one prolific
  sender can't consume both slots. 1-hour TTL on the cap counter.
  Spool intentionally small — it's an offline bootstrap channel,
  not a persistent mailbox.
* Rebuild-state preserves the gate sequence domain across reloads so
  a chain reload doesn't accidentally let an old gate sequence
  replay-collide on next append.

Schema enforcement (mesh_schema.py)
-----------------------------------

* Private gate + DM payloads have closed allowlists of fields.
  Plaintext keys (``message``, ``plaintext``, ``_local_plaintext``,
  ``_local_reply_to``) are explicit rejection-bait — they raise before
  the event ever touches the chain.
* DM ciphertext + nonce must look like base64-ish sealed bytes;
  obvious base64-encoded plaintext shapes are rejected.
* ``transport_lock`` required: DM hashchain spool requires
  ``private_strong``; gate accepts ``private``/``private_strong``/
  ``rns``/``onion``.

Defense-in-depth at the network layer (main.py + mesh_public.py)
----------------------------------------------------------------

* ``_infonet_sync_response_events`` now silently redacts private events
  (gate_message + dm_message) unless the request looks like a loopback /
  onion / RNS / private transport caller. If an operator accidentally
  exposes :8000 to the public internet, an external puller gets
  public events only — never ciphertext.
* ``_sync_from_peer`` raises ``PeerSyncRateLimited`` for 429 (handled
  as 4-tuple return with retry_after_s) and ``PeerSyncHTTPError`` for
  other non-200 statuses (handled by ``_run_public_sync_cycle`` to
  honor server cooldown hints even outside the 429 path).

DM relay hydration (main.py)
-----------------------------

* New ``_hydrate_dm_relay_from_chain``: when accepted dm_message chain
  events arrive on a node, they get deposited into the local DM relay
  store with a deterministic sender_token_hash so re-sync of the same
  event is idempotent. Recipients see the ciphertext as a normal DM
  on their next poll and decrypt with their existing recipient key.

Other surfaces
--------------

* meshnode.bat / meshnode.sh now set ``MESH_INFONET_ALLOW_CLEARNET_SYNC=
  false`` and the participant runtime flags by default so a freshly
  spun-up node defaults to private-only sync.
* InfonetTerminal/InfonetShell.tsx adds a gate directory renderer for
  the new private-gate workflow.
* docker-compose.relay.yml binds the relay backend to 127.0.0.1:8000
  only; Tor's hidden service forwards onion traffic into 127.0.0.1.
  Public clearnet :8000 stays off the network edge.

Tests
-----

* 7 new tests in test_private_gate_hashchain.py + test_private_dm_
  hashchain.py covering: gate fork accepts ciphertext propagation,
  gate fork rejects plaintext, append rejects plaintext before
  normalize, append requires private_strong, append rejects
  non-sealed ciphertext shape, DM spool 2-per-recipient + 1-per-pair
  cap, DM hydration delivers to poll/claim.
* Updated test_mesh_node_bootstrap_runtime.py covers 429 backoff via
  PeerSyncRateLimited 4-tuple AND PeerSyncHTTPError exception.
* Updated test_s14b_public_sync_gate_filter.py + test_s9b_gate_store_
  hydration.py + test_gate_write_cutover.py cover the new private
  redaction on public sync responses.
* test_private_gate_hashchain.py + test_private_dm_hashchain.py:
  10 passed locally.
* Combined mesh-relevant suite (the 5 modified existing tests +
  2 new): 17 passed.

Co-authored-by: Claude Opus 4.7 <noreply@anthropic.com>
2026-05-24 21:25:18 -06:00

594 lines
20 KiB
Python

"""Central schema registry for mesh protocol events."""
from __future__ import annotations
import base64
import binascii
import math
from dataclasses import dataclass
from typing import Any, Callable
from services.mesh.mesh_protocol import normalize_payload, PROTOCOL_VERSION, NETWORK_ID
def _safe_int(val, default=0):
try:
return int(val)
except (TypeError, ValueError):
return default
@dataclass(frozen=True)
class EventSchema:
event_type: str
required_fields: tuple[str, ...]
optional_fields: tuple[str, ...]
validate: Callable[[dict[str, Any]], tuple[bool, str]]
def validate_payload(self, payload: dict[str, Any]) -> tuple[bool, str]:
return self.validate(payload)
def _require_fields(payload: dict[str, Any], fields: tuple[str, ...]) -> tuple[bool, str]:
for key in fields:
if key not in payload:
return False, f"Missing field: {key}"
return True, "ok"
def _decode_base64ish(value: Any) -> bytes | None:
raw = str(value or "").strip()
if not raw or any(ch.isspace() for ch in raw):
return None
padded = raw + ("=" * (-len(raw) % 4))
for altchars in (None, b"-_"):
try:
return base64.b64decode(padded.encode("ascii"), altchars=altchars, validate=True)
except (binascii.Error, UnicodeEncodeError, ValueError):
continue
return None
def _byte_entropy(data: bytes) -> float:
if not data:
return 0.0
counts = [0] * 256
for byte in data:
counts[byte] += 1
total = float(len(data))
return -sum((count / total) * math.log2(count / total) for count in counts if count)
def _validate_sealed_bytes_field(
payload: dict[str, Any],
field: str,
*,
min_bytes: int = 8,
entropy_floor: float = 2.5,
) -> tuple[bool, str]:
data = _decode_base64ish(payload.get(field, ""))
if data is None:
return False, f"{field} must be base64-encoded sealed bytes"
if len(data) < min_bytes:
return False, f"{field} is too short"
# Short test vectors and compact envelopes can be low entropy; only apply
# heuristics once there is enough material to distinguish a sealed blob
# from accidental base64-encoded plaintext.
if len(data) >= 32:
printable = sum(1 for byte in data if 32 <= byte <= 126 or byte in (9, 10, 13))
if printable / len(data) > 0.9:
try:
data.decode("utf-8")
return False, f"{field} looks like encoded plaintext"
except UnicodeDecodeError:
pass
if _byte_entropy(data) < entropy_floor:
return False, f"{field} entropy is too low for sealed bytes"
return True, "ok"
def _validate_message(payload: dict[str, Any]) -> tuple[bool, str]:
ok, reason = _require_fields(
payload, ("message", "destination", "channel", "priority", "ephemeral")
)
if not ok:
return ok, reason
if payload.get("priority") not in ("normal", "high", "emergency", "low"):
return False, "Invalid priority"
if not isinstance(payload.get("ephemeral"), bool):
return False, "ephemeral must be boolean"
return True, "ok"
def _validate_gate_message(payload: dict[str, Any]) -> tuple[bool, str]:
ok, reason = _require_fields(payload, ("gate", "ciphertext", "nonce", "sender_ref"))
if not ok:
return ok, reason
if "message" in payload:
return False, "plaintext gate message field is not allowed"
gate = str(payload.get("gate", "")).strip().lower()
if not gate:
return False, "gate cannot be empty"
if "epoch" in payload:
epoch = _safe_int(payload.get("epoch", 0) or 0, 0)
if epoch <= 0:
return False, "epoch must be a positive integer"
elif (
not str(payload.get("ciphertext", "")).strip()
and not str(payload.get("nonce", "")).strip()
and not str(payload.get("sender_ref", "")).strip()
):
return False, "epoch must be a positive integer"
if not str(payload.get("ciphertext", "")).strip():
return False, "ciphertext cannot be empty"
if not str(payload.get("nonce", "")).strip():
return False, "nonce cannot be empty"
if not str(payload.get("sender_ref", "")).strip():
return False, "sender_ref cannot be empty"
payload_format = str(payload.get("format", "mls1") or "mls1").strip().lower()
if payload_format != "mls1":
return False, "Unsupported gate message format"
return True, "ok"
def _validate_vote(payload: dict[str, Any]) -> tuple[bool, str]:
ok, reason = _require_fields(payload, ("target_id", "vote", "gate"))
if not ok:
return ok, reason
if payload.get("vote") not in (-1, 1):
return False, "Invalid vote"
return True, "ok"
def _validate_gate_create(payload: dict[str, Any]) -> tuple[bool, str]:
ok, reason = _require_fields(payload, ("gate_id", "display_name", "rules"))
if not ok:
return ok, reason
if not isinstance(payload.get("rules"), dict):
return False, "rules must be an object"
return True, "ok"
def _validate_prediction(payload: dict[str, Any]) -> tuple[bool, str]:
return _require_fields(payload, ("market_title", "side", "stake_amount"))
def _validate_stake(payload: dict[str, Any]) -> tuple[bool, str]:
return _require_fields(payload, ("message_id", "poster_id", "side", "amount", "duration_days"))
def _validate_dm_block(payload: dict[str, Any]) -> tuple[bool, str]:
ok, reason = _require_fields(payload, ("blocked_id", "action"))
if not ok:
return ok, reason
if payload.get("action") not in ("block", "unblock"):
return False, "Invalid action"
return True, "ok"
def _validate_dm_key(payload: dict[str, Any]) -> tuple[bool, str]:
ok, reason = _require_fields(payload, ("dh_pub_key", "dh_algo", "timestamp"))
if not ok:
return ok, reason
if payload.get("dh_algo") not in ("X25519", "ECDH", "ECDH_P256"):
return False, "Invalid dh_algo"
return True, "ok"
def _validate_dm_message(payload: dict[str, Any]) -> tuple[bool, str]:
ok, reason = _require_fields(
payload, ("recipient_id", "delivery_class", "recipient_token", "ciphertext", "msg_id", "timestamp")
)
if not ok:
return ok, reason
delivery_class = str(payload.get("delivery_class", "")).lower()
if delivery_class not in ("request", "shared"):
return False, "Invalid delivery_class"
if delivery_class == "shared" and not str(payload.get("recipient_token", "")).strip():
return False, "recipient_token required for shared delivery"
dm_format = str(payload.get("format", "mls1") or "mls1").strip().lower()
if dm_format not in ("mls1", "dm1"):
return False, f"Unknown DM format: {dm_format}"
return True, "ok"
def _validate_mailbox_claims(claims: Any) -> tuple[bool, str]:
if not isinstance(claims, list) or not claims:
return False, "mailbox_claims must be a non-empty list"
for claim in claims:
if not isinstance(claim, dict):
return False, "mailbox_claims entries must be objects"
claim_type = str(claim.get("type", "")).lower()
if claim_type not in ("self", "requests", "shared"):
return False, "Invalid mailbox claim type"
if not str(claim.get("token", "")).strip():
return False, f"{claim_type} mailbox claims require token"
return True, "ok"
def _validate_dm_poll(payload: dict[str, Any]) -> tuple[bool, str]:
ok, reason = _require_fields(payload, ("mailbox_claims", "timestamp", "nonce"))
if not ok:
return ok, reason
return _validate_mailbox_claims(payload.get("mailbox_claims"))
def _validate_dm_count(payload: dict[str, Any]) -> tuple[bool, str]:
return _validate_dm_poll(payload)
def _validate_key_rotate(payload: dict[str, Any]) -> tuple[bool, str]:
ok, reason = _require_fields(
payload,
(
"old_node_id",
"old_public_key",
"old_public_key_algo",
"new_public_key",
"new_public_key_algo",
"timestamp",
"old_signature",
),
)
if not ok:
return ok, reason
return True, "ok"
def _validate_key_revoke(payload: dict[str, Any]) -> tuple[bool, str]:
ok, reason = _require_fields(
payload,
(
"revoked_public_key",
"revoked_public_key_algo",
"revoked_at",
"grace_until",
"reason",
),
)
if not ok:
return ok, reason
revoked_at = _safe_int(payload.get("revoked_at", 0) or 0, 0)
grace_until = _safe_int(payload.get("grace_until", 0) or 0, 0)
if revoked_at <= 0:
return False, "revoked_at must be a positive timestamp"
if grace_until < revoked_at:
return False, "grace_until must be >= revoked_at"
return True, "ok"
def _validate_abuse_report(payload: dict[str, Any]) -> tuple[bool, str]:
ok, reason = _require_fields(payload, ("target_id", "reason"))
if not ok:
return ok, reason
if not str(payload.get("reason", "")).strip():
return False, "reason cannot be empty"
return True, "ok"
SCHEMA_REGISTRY: dict[str, EventSchema] = {
"message": EventSchema(
event_type="message",
required_fields=("message", "destination", "channel", "priority", "ephemeral"),
optional_fields=(),
validate=_validate_message,
),
"gate_message": EventSchema(
event_type="gate_message",
required_fields=("gate", "ciphertext", "nonce", "sender_ref"),
optional_fields=("format",),
validate=_validate_gate_message,
),
"vote": EventSchema(
event_type="vote",
required_fields=("target_id", "vote", "gate"),
optional_fields=(),
validate=_validate_vote,
),
"gate_create": EventSchema(
event_type="gate_create",
required_fields=("gate_id", "display_name", "rules"),
optional_fields=(),
validate=_validate_gate_create,
),
"prediction": EventSchema(
event_type="prediction",
required_fields=("market_title", "side", "stake_amount"),
optional_fields=(),
validate=_validate_prediction,
),
"stake": EventSchema(
event_type="stake",
required_fields=("message_id", "poster_id", "side", "amount", "duration_days"),
optional_fields=(),
validate=_validate_stake,
),
"dm_block": EventSchema(
event_type="dm_block",
required_fields=("blocked_id", "action"),
optional_fields=(),
validate=_validate_dm_block,
),
"dm_key": EventSchema(
event_type="dm_key",
required_fields=("dh_pub_key", "dh_algo", "timestamp"),
optional_fields=(),
validate=_validate_dm_key,
),
"dm_message": EventSchema(
event_type="dm_message",
required_fields=("recipient_id", "delivery_class", "recipient_token", "ciphertext", "msg_id", "timestamp"),
optional_fields=(),
validate=_validate_dm_message,
),
"dm_poll": EventSchema(
event_type="dm_poll",
required_fields=("mailbox_claims", "timestamp", "nonce"),
optional_fields=(),
validate=_validate_dm_poll,
),
"dm_count": EventSchema(
event_type="dm_count",
required_fields=("mailbox_claims", "timestamp", "nonce"),
optional_fields=(),
validate=_validate_dm_count,
),
"key_rotate": EventSchema(
event_type="key_rotate",
required_fields=(
"old_node_id",
"old_public_key",
"old_public_key_algo",
"new_public_key",
"new_public_key_algo",
"timestamp",
"old_signature",
),
optional_fields=(),
validate=_validate_key_rotate,
),
"key_revoke": EventSchema(
event_type="key_revoke",
required_fields=(
"revoked_public_key",
"revoked_public_key_algo",
"revoked_at",
"grace_until",
"reason",
),
optional_fields=(),
validate=_validate_key_revoke,
),
"abuse_report": EventSchema(
event_type="abuse_report",
required_fields=("target_id", "reason"),
optional_fields=("gate", "evidence"),
validate=_validate_abuse_report,
),
}
ACTIVE_PUBLIC_LEDGER_EVENT_TYPES: frozenset[str] = frozenset(
{
"message",
"vote",
"gate_create",
"prediction",
"stake",
"key_rotate",
"key_revoke",
"abuse_report",
}
)
"""Event types that may be newly appended to the public infonet chain."""
LEGACY_PUBLIC_LEDGER_EVENT_TYPES: frozenset[str] = frozenset(
{
"gate_message",
"dm_message",
}
)
"""Event types that exist historically on the public chain and must remain
ingestable for sync/restart compatibility, but may NOT be newly appended."""
PUBLIC_LEDGER_EVENT_TYPES: frozenset[str] = (
ACTIVE_PUBLIC_LEDGER_EVENT_TYPES | LEGACY_PUBLIC_LEDGER_EVENT_TYPES
)
"""Union of active + legacy — the full set accepted during ingest."""
_PUBLIC_LEDGER_FORBIDDEN_FIELDS: frozenset[str] = frozenset(
{
"ip",
"ip_address",
"origin_ip",
"source_ip",
"client_ip",
"host",
"hostname",
"origin",
"originator",
"originator_hint",
"routing_hint",
"route",
"route_hint",
"route_reason",
"routed_via",
"transport",
"transport_handle",
"transport_lock",
"recipient_id",
"recipient_token",
"delivery_class",
"mailbox_claims",
"dh_pub_key",
"sender_token",
}
)
def get_schema(event_type: str) -> EventSchema | None:
return SCHEMA_REGISTRY.get(event_type)
# ─── Extension registry (Sprint 8+ chain cutover, 2026-04-28) ────────────
# The infonet economy layer registers its event-type validators here at
# import time via ``services/infonet/_chain_cutover.py``. mesh_schema does
# NOT import from services.infonet (would create a cycle); the direction
# stays one-way (infonet → mesh).
#
# Extensions opt out of the legacy normalize_payload + ephemeral-check
# pipeline because their payloads have their own normalization rules.
# The legacy flow stays byte-identical for legacy event types.
_EXTENSION_VALIDATORS: dict[str, Callable[[dict[str, Any]], tuple[bool, str]]] = {}
def register_extension_validator(
event_type: str,
validator: Callable[[dict[str, Any]], tuple[bool, str]],
) -> None:
"""Register an extension event-type validator.
Idempotent — calling twice with the same ``event_type`` overwrites
the prior validator (no-op when called with the same function).
Used by ``services/infonet/_chain_cutover.py``.
"""
if not isinstance(event_type, str) or not event_type:
raise ValueError("event_type must be a non-empty string")
_EXTENSION_VALIDATORS[event_type] = validator
def is_extension_event_type(event_type: str) -> bool:
return event_type in _EXTENSION_VALIDATORS
def validate_event_payload(event_type: str, payload: dict[str, Any]) -> tuple[bool, str]:
schema = get_schema(event_type)
if schema is None:
# Fall through to extension validators (registered by infonet
# economy layer at import time).
ext = _EXTENSION_VALIDATORS.get(event_type)
if ext is not None:
return ext(payload)
return False, "Unknown event_type"
normalized = normalize_payload(event_type, payload)
if normalized != payload:
return False, "Payload is not normalized"
if event_type not in ("message", "gate_message") and "ephemeral" in payload:
return False, "ephemeral not allowed for this event type"
return schema.validate_payload(payload)
def validate_public_ledger_payload(event_type: str, payload: dict[str, Any]) -> tuple[bool, str]:
if event_type == "gate_message":
return validate_private_gate_ledger_payload(payload)
if event_type not in PUBLIC_LEDGER_EVENT_TYPES and event_type not in _EXTENSION_VALIDATORS:
return False, f"{event_type} is not allowed on the public ledger"
forbidden = sorted(
key
for key in payload.keys()
if str(key or "").strip().lower() in _PUBLIC_LEDGER_FORBIDDEN_FIELDS
)
if forbidden:
return False, f"public ledger payload contains forbidden fields: {', '.join(forbidden)}"
if event_type == "message":
destination = str(payload.get("destination", "") or "").strip().lower()
if destination and destination != "broadcast":
return False, "public ledger message destination must be broadcast"
return True, "ok"
_PRIVATE_GATE_LEDGER_ALLOWED_FIELDS: frozenset[str] = frozenset(
{
"gate",
"ciphertext",
"nonce",
"sender_ref",
"format",
"epoch",
"gate_envelope",
"envelope_hash",
"reply_to",
"transport_lock",
"signed_context",
}
)
def validate_private_gate_ledger_payload(payload: dict[str, Any]) -> tuple[bool, str]:
"""Validate ciphertext-only gate events for private Infonet replication."""
ok, reason = validate_event_payload("gate_message", payload)
if not ok:
return ok, reason
unexpected = sorted(
key
for key in payload.keys()
if str(key or "").strip().lower() not in _PRIVATE_GATE_LEDGER_ALLOWED_FIELDS
)
if unexpected:
return False, f"private gate ledger payload contains unsupported fields: {', '.join(unexpected)}"
if "message" in payload or "_local_plaintext" in payload or "_local_reply_to" in payload:
return False, "private gate ledger payload must not contain plaintext"
transport_lock = str(payload.get("transport_lock", "") or "").strip().lower()
if transport_lock and transport_lock not in {"private", "private_strong", "rns", "onion"}:
return False, "gate messages require private transport_lock"
ok, reason = _validate_sealed_bytes_field(payload, "ciphertext")
if not ok:
return ok, reason
ok, reason = _validate_sealed_bytes_field(payload, "nonce")
if not ok:
return ok, reason
return True, "ok"
_PRIVATE_DM_LEDGER_ALLOWED_FIELDS: frozenset[str] = frozenset(
{
"recipient_id",
"delivery_class",
"recipient_token",
"ciphertext",
"msg_id",
"timestamp",
"format",
"session_welcome",
"sender_seal",
"relay_salt",
"transport_lock",
"signed_context",
}
)
def validate_private_dm_ledger_payload(payload: dict[str, Any]) -> tuple[bool, str]:
"""Validate ciphertext-only DM dead-drop events for private Infonet replication."""
ok, reason = validate_event_payload("dm_message", payload)
if not ok:
return ok, reason
unexpected = sorted(
key
for key in payload.keys()
if str(key or "").strip().lower() not in _PRIVATE_DM_LEDGER_ALLOWED_FIELDS
)
if unexpected:
return False, f"private DM ledger payload contains unsupported fields: {', '.join(unexpected)}"
if "message" in payload or "plaintext" in payload or "_local_plaintext" in payload:
return False, "private DM ledger payload must not contain plaintext"
transport_lock = str(payload.get("transport_lock", "") or "").strip().lower()
if transport_lock != "private_strong":
return False, "DM hashchain spool requires private_strong transport_lock"
if not str(payload.get("ciphertext", "") or "").strip():
return False, "ciphertext cannot be empty"
ok, reason = _validate_sealed_bytes_field(payload, "ciphertext")
if not ok:
return ok, reason
return True, "ok"
def validate_protocol_fields(protocol_version: str, network_id: str) -> tuple[bool, str]:
if protocol_version != PROTOCOL_VERSION:
return False, "Unsupported protocol_version"
if network_id != NETWORK_ID:
return False, "network_id mismatch"
return True, "ok"