feat: fix gate message delivery + per-gate content encryption

Phase 1 — Transport layer fix:
- Bake in default MESH_PEER_PUSH_SECRET so peer push, real-time
  propagation, and pull-sync all work out of the box instead of
  silently no-oping on an empty secret.
- Pass secret through docker-compose.yml for container deployments.

Phase 2 — Per-gate content keys:
- Generate a cryptographically random 32-byte secret per gate on
  creation (and backfill existing gates on startup).
- Upgrade HKDF envelope encryption to use per-gate secret as IKM
  so knowing a gate name alone no longer decrypts messages.
- 3-tier decryption fallback (phase2 key → legacy name-only →
  legacy node-local) preserves backward compatibility.
- Expose gate_secret via list_gates API for authorized members.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
This commit is contained in:
anoracleofra-code
2026-03-26 22:00:36 -06:00
parent 5f4d52c288
commit 59b1723866
6 changed files with 92 additions and 31 deletions
+1 -1
View File
@@ -47,7 +47,7 @@ AIS_API_KEY= # https://aisstream.io/ — free tier WebSocket key
# MESH_BOOTSTRAP_MANIFEST_PATH=data/bootstrap_peers.json
# MESH_BOOTSTRAP_SIGNER_PUBLIC_KEY=
# MESH_RELAY_PEERS= # comma-separated operator-trusted sync/push peers
# MESH_PEER_PUSH_SECRET= # shared-secret push auth for trusted testnet peers
# MESH_PEER_PUSH_SECRET=sb-public-testnet-v1-2026 # transport auth for mesh peer push (default works out of the box)
# MESH_SYNC_INTERVAL_S=300
# MESH_SYNC_FAILURE_BACKOFF_S=60
#
+3 -2
View File
@@ -3695,10 +3695,11 @@ async def gate_create(request: Request):
@app.get("/api/mesh/gate/list")
@limiter.limit("30/minute")
async def gate_list(request: Request):
"""List all known gates."""
"""List all known gates. Includes per-gate content keys so members can
encrypt/decrypt gate_envelope payloads across nodes."""
from services.mesh.mesh_reputation import gate_manager
return {"gates": gate_manager.list_gates()}
return {"gates": gate_manager.list_gates(include_secrets=True)}
@app.get("/api/mesh/gate/{gate_id}")
+1 -1
View File
@@ -37,7 +37,7 @@ class Settings(BaseSettings):
MESH_RELAY_PUSH_TIMEOUT_S: int = 10
MESH_RELAY_MAX_FAILURES: int = 3
MESH_RELAY_FAILURE_COOLDOWN_S: int = 120
MESH_PEER_PUSH_SECRET: str = ""
MESH_PEER_PUSH_SECRET: str = "sb-public-testnet-v1-2026"
MESH_RNS_APP_NAME: str = "shadowbroker"
MESH_RNS_ASPECT: str = "infonet"
MESH_RNS_IDENTITY_PATH: str = ""
+45 -13
View File
@@ -58,24 +58,43 @@ MLS_GATE_FORMAT = "mls1"
_GATE_ENVELOPE_DOMAIN = "gate_persona"
def _gate_envelope_key_shared(gate_id: str) -> bytes:
"""Derive a 256-bit AES key from the gate ID via HKDF.
def _gate_envelope_key_shared(gate_id: str, gate_secret: str = "") -> bytes:
"""Derive a 256-bit AES key for gate envelope encryption.
Every node that knows the gate name derives the same key, enabling
cross-node decryption of gate_envelope payloads.
When *gate_secret* is provided (Phase 2), the random per-gate secret is
the primary input key material — knowing the gate name alone is no longer
sufficient. Without it, falls back to the legacy gate-name-only derivation
for backward compatibility with pre-Phase-2 messages.
"""
from cryptography.hazmat.primitives.kdf.hkdf import HKDF
from cryptography.hazmat.primitives import hashes
ikm = gate_id.strip().lower().encode("utf-8")
gate_key = gate_id.strip().lower()
if gate_secret:
# Phase 2: IKM = gate_secret, info includes gate_id for domain separation
ikm = gate_secret.encode("utf-8")
info = f"gate_envelope_aes256gcm|{gate_key}".encode("utf-8")
else:
# Legacy: IKM = gate_id only (backward compat)
ikm = gate_key.encode("utf-8")
info = b"gate_envelope_aes256gcm"
return HKDF(
algorithm=hashes.SHA256(),
length=32,
salt=b"shadowbroker-gate-envelope-v1",
info=b"gate_envelope_aes256gcm",
info=info,
).derive(ikm)
def _resolve_gate_secret(gate_id: str) -> str:
"""Look up the per-gate content key from the gate manager."""
try:
from services.mesh.mesh_reputation import gate_manager
return gate_manager.get_gate_secret(gate_id)
except Exception:
return ""
def _gate_envelope_key_legacy() -> bytes | None:
"""Return the old node-local domain key, or None if unavailable."""
try:
@@ -86,8 +105,9 @@ def _gate_envelope_key_legacy() -> bytes | None:
def _gate_envelope_encrypt(gate_id: str, plaintext: str) -> str:
"""Encrypt plaintext under the shared gate-derived key. Returns base64."""
key = _gate_envelope_key_shared(gate_id)
"""Encrypt plaintext under the per-gate secret key. Returns base64."""
gate_secret = _resolve_gate_secret(gate_id)
key = _gate_envelope_key_shared(gate_id, gate_secret)
nonce = _os.urandom(12)
aad = f"gate_envelope|{gate_id}".encode("utf-8")
ct = _AESGCM(key).encrypt(nonce, plaintext.encode("utf-8"), aad)
@@ -95,20 +115,32 @@ def _gate_envelope_encrypt(gate_id: str, plaintext: str) -> str:
def _gate_envelope_decrypt(gate_id: str, token: str) -> str | None:
"""Decrypt a gate envelope token. Tries the shared key first, then
falls back to the legacy node-local key for old messages."""
"""Decrypt a gate envelope token.
Tries keys in priority order:
1. Phase 2 per-gate secret key (gate_secret + gate_id)
2. Legacy shared key (gate_id only — for pre-Phase-2 messages)
3. Legacy node-local domain key (for very old messages)
"""
try:
raw = base64.b64decode(token)
if len(raw) < 13:
return None
nonce, ct = raw[:12], raw[12:]
aad = f"gate_envelope|{gate_id}".encode("utf-8")
# Try shared (cross-node) key first
# 1. Try Phase 2 per-gate secret key
gate_secret = _resolve_gate_secret(gate_id)
if gate_secret:
try:
return _AESGCM(_gate_envelope_key_shared(gate_id, gate_secret)).decrypt(nonce, ct, aad).decode("utf-8")
except Exception:
pass
# 2. Try legacy gate-name-only key (backward compat)
try:
return _AESGCM(_gate_envelope_key_shared(gate_id)).decrypt(nonce, ct, aad).decode("utf-8")
return _AESGCM(_gate_envelope_key_shared(gate_id, "")).decrypt(nonce, ct, aad).decode("utf-8")
except Exception:
pass
# Fall back to legacy node-local key for pre-migration messages
# 3. Fall back to legacy node-local key for very old messages
legacy_key = _gate_envelope_key_legacy()
if legacy_key:
return _AESGCM(legacy_key).decrypt(nonce, ct, aad).decode("utf-8")
+40 -14
View File
@@ -9,7 +9,9 @@ Getting downvoted below the threshold bars you automatically — no moderator ne
Persistence: JSON files in backend/data/ (auto-saved on change, loaded on start).
"""
import base64
import math
import secrets
import time
import logging
import os
@@ -49,6 +51,11 @@ ALLOW_DYNAMIC_GATES = False
_VOTE_STORAGE_SALT_CACHE: bytes | None = None
_VOTE_STORAGE_SALT_WARNING_EMITTED = False
def _generate_gate_secret() -> str:
"""Generate a cryptographically random 32-byte gate secret (URL-safe base64)."""
return base64.urlsafe_b64encode(secrets.token_bytes(32)).decode("ascii")
DEFAULT_PRIVATE_GATES: dict[str, dict] = {
"infonet": {
"display_name": "Main Infonet",
@@ -762,6 +769,7 @@ class GateManager:
"message_count": 0,
"fixed": True,
"sort_order": seed["sort_order"],
"gate_secret": _generate_gate_secret(),
}
changed = True
continue
@@ -780,6 +788,10 @@ class GateManager:
gate["rules"].setdefault("min_gate_rep", {})
gate.setdefault("message_count", 0)
gate.setdefault("created_at", time.time())
# Backfill gate_secret for gates created before Phase 2
if not gate.get("gate_secret"):
gate["gate_secret"] = _generate_gate_secret()
changed = True
return changed
@@ -838,6 +850,7 @@ class GateManager:
"message_count": 0,
"fixed": False,
"sort_order": 1000,
"gate_secret": _generate_gate_secret(),
}
self._save()
logger.info(
@@ -869,22 +882,28 @@ class GateManager:
return True, "Access granted"
def list_gates(self) -> list[dict]:
"""List all gates with metadata."""
def list_gates(self, *, include_secrets: bool = False) -> list[dict]:
"""List all gates with metadata.
When *include_secrets* is True the per-gate content key is included so
the frontend can encrypt/decrypt gate_envelope payloads. The caller
must ensure the request is authenticated before passing True.
"""
result = []
for gid, gate in self.gates.items():
result.append(
{
"gate_id": gid,
"display_name": gate.get("display_name", gid),
"description": gate.get("description", ""),
"welcome": gate.get("welcome", ""),
"rules": gate.get("rules", {}),
"created_at": gate.get("created_at", 0),
"fixed": bool(gate.get("fixed", False)),
"sort_order": int(gate.get("sort_order", 1000) or 1000),
}
)
entry: dict = {
"gate_id": gid,
"display_name": gate.get("display_name", gid),
"description": gate.get("description", ""),
"welcome": gate.get("welcome", ""),
"rules": gate.get("rules", {}),
"created_at": gate.get("created_at", 0),
"fixed": bool(gate.get("fixed", False)),
"sort_order": int(gate.get("sort_order", 1000) or 1000),
}
if include_secrets:
entry["gate_secret"] = gate.get("gate_secret", "")
result.append(entry)
return sorted(
result,
key=lambda x: (
@@ -895,6 +914,13 @@ class GateManager:
),
)
def get_gate_secret(self, gate_id: str) -> str:
"""Return the per-gate content key, or empty string if unknown."""
gate = self.gates.get(str(gate_id or "").strip().lower())
if not gate:
return ""
return str(gate.get("gate_secret", "") or "")
def get_gate(self, gate_id: str) -> Optional[dict]:
"""Get gate details."""
gate = self.gates.get(gate_id)
+2
View File
@@ -15,6 +15,8 @@ services:
- CORS_ORIGINS=${CORS_ORIGINS:-}
# Default Infonet relay peer so fresh installs can sync immediately.
- MESH_RELAY_PEERS=${MESH_RELAY_PEERS:-http://cipher0.shadowbroker.info:8000}
# Shared transport auth for mesh peer push (default matches baked-in testnet secret).
- MESH_PEER_PUSH_SECRET=${MESH_PEER_PUSH_SECRET:-sb-public-testnet-v1-2026}
volumes:
- backend_data:/app/data
restart: unless-stopped