diff --git a/backend/main.py b/backend/main.py index 5ee1c0e..64d6ee4 100644 --- a/backend/main.py +++ b/backend/main.py @@ -1,4 +1,4 @@ -import os +import os from dotenv import load_dotenv load_dotenv() @@ -8,6 +8,7 @@ import asyncio import base64 import hmac import importlib +import ipaddress import secrets import hashlib as _hashlib_mod from dataclasses import dataclass, field @@ -20,6 +21,11 @@ logging.basicConfig(level=logging.INFO) logger = logging.getLogger(__name__) _start_time = time.time() _MESH_ONLY = os.environ.get("MESH_ONLY", "").strip().lower() in ("1", "true", "yes") +_HEADLESS_MESH_NODE_RUNTIME = os.environ.get("SHADOWBROKER_MESH_NODE_RUNTIME", "").strip().lower() in ( + "1", + "true", + "yes", +) _WARNED_LEGACY_DM_PUBKEY_LOOKUPS: set[str] = set() @@ -1095,6 +1101,7 @@ _WORMHOLE_PUBLIC_PROFILE_FIELDS = {"wormhole_enabled"} _PRIVATE_LANE_CONTROL_FIELDS = {"private_lane_tier", "private_lane_policy"} _PUBLIC_RNS_STATUS_FIELDS = {"enabled", "ready", "configured_peers", "active_peers"} _NODE_PUBLIC_EVENT_HOOK_REGISTERED = False +_NODE_RUNTIME_THREADS_STARTED = False _INFONET_PRIVATE_TRANSPORT_LOCK = threading.Lock() @@ -1184,6 +1191,49 @@ def _filter_infonet_sync_records(records: list[Any]) -> list[Any]: ] +def _infonet_peer_url_allowed(peer_url: str) -> bool: + if not _infonet_private_transport_required(): + return True + return _is_private_infonet_transport(peer_transport_kind(peer_url)) + + +def _filter_infonet_peer_urls(peer_urls: list[str]) -> list[str]: + if not _infonet_private_transport_required(): + return peer_urls + return [peer_url for peer_url in peer_urls if _infonet_peer_url_allowed(peer_url)] + + +def _infonet_peer_requests_proxies(normalized_peer_url: str) -> dict[str, str] | None: + """Return requests proxy settings for a sync/push peer, enforcing private policy.""" + transport = peer_transport_kind(normalized_peer_url) + if _infonet_private_transport_required() and not _is_private_infonet_transport(transport): + raise RuntimeError(_infonet_private_transport_error()) + if transport != "onion": + return None + if not bool(get_settings().MESH_ARTI_ENABLED): + raise RuntimeError("onion peer requests require Arti to be enabled") + from services.wormhole_supervisor import _check_arti_ready + + if not _check_arti_ready(): + raise RuntimeError("onion peer requests require a ready Arti transport") + socks_port = int(get_settings().MESH_ARTI_SOCKS_PORT or 9050) + proxy = f"socks5h://127.0.0.1:{socks_port}" + return {"http": proxy, "https": proxy} + + +def _local_infonet_peer_url() -> str: + """Return this node's advertised peer URL for HMAC peer authentication.""" + configured = normalize_peer_url(str(getattr(get_settings(), "MESH_PUBLIC_PEER_URL", "") or "")) + if configured: + return configured + try: + from services.tor_hidden_service import tor_service + + return normalize_peer_url(str(tor_service.onion_address or "")) + except Exception: + return "" + + def _ensure_infonet_private_transport_ready(reason: str = "") -> bool: """Warm the local onion transport before private Infonet sync. @@ -1257,6 +1307,13 @@ def _refresh_node_peer_store(*, now: float | None = None) -> dict[str, Any]: operator_peers = configured_relay_peer_urls() bootstrap_seed_peers = _configured_bootstrap_seed_peer_urls() skipped_clearnet_peers = 0 + pruned_clearnet_peers = 0 + if private_transport_required: + for key, record in list(store._records.items()): + if _is_private_infonet_transport(str(getattr(record, "transport", "") or "")): + continue + del store._records[key] + pruned_clearnet_peers += 1 for peer_url in operator_peers: transport = peer_transport_kind(peer_url) if not transport: @@ -1364,6 +1421,7 @@ def _refresh_node_peer_store(*, now: float | None = None) -> dict[str, Any]: "node_mode": mode, "private_transport_required": private_transport_required, "skipped_clearnet_peer_count": skipped_clearnet_peers, + "pruned_clearnet_peer_count": pruned_clearnet_peers, "manifest_loaded": manifest is not None, "manifest_signer_id": manifest.signer_id if manifest is not None else "", "manifest_valid_until": int(manifest.valid_until or 0) if manifest is not None else 0, @@ -1384,6 +1442,28 @@ def _materialize_local_infonet_state() -> None: from services.mesh.mesh_hashchain import infonet infonet.ensure_materialized() + try: + _hydrate_gate_store_from_chain(list(infonet.events)) + _hydrate_dm_relay_from_chain(list(infonet.events)) + except Exception: + pass + + +class PeerSyncHTTPError(RuntimeError): + def __init__(self, status_code: int, detail: str, *, retry_after_s: int = 0): + self.status_code = int(status_code or 0) + self.retry_after_s = int(retry_after_s or 0) + message = str(detail or f"HTTP {self.status_code}").strip() + if not message.upper().startswith("HTTP"): + message = f"HTTP {self.status_code}: {message}" + super().__init__(message) + + +def _parse_retry_after_seconds(value: str) -> int: + try: + return max(0, int(float(str(value or "").strip()))) + except Exception: + return 0 def _peer_sync_response(peer_url: str, body: dict[str, Any]) -> dict[str, Any]: @@ -1446,7 +1526,8 @@ def _peer_sync_response(peer_url: str, body: dict[str, Any]) -> dict[str, Any]: raise ValueError(f"peer sync returned non-JSON response ({response.status_code})") from exc if response.status_code != 200: detail = str(payload.get("detail", "") or f"HTTP {response.status_code}").strip() - raise ValueError(detail or f"HTTP {response.status_code}") + retry_after_s = _parse_retry_after_seconds(response.headers.get("Retry-After", "")) + raise PeerSyncHTTPError(response.status_code, detail, retry_after_s=retry_after_s) if not isinstance(payload, dict): raise ValueError("peer sync returned malformed payload") return payload @@ -1485,6 +1566,46 @@ def _hydrate_gate_store_from_chain(events: list[dict]) -> int: return count +def _hydrate_dm_relay_from_chain(events: list[dict]) -> int: + """Copy accepted dm_message chain events into the local encrypted DM relay.""" + import hashlib + + from services.mesh.mesh_dm_relay import dm_relay + from services.mesh.mesh_hashchain import infonet + + count = 0 + for evt in events: + if evt.get("event_type") != "dm_message": + continue + event_id = str(evt.get("event_id", "") or "").strip() + if not event_id or event_id not in infonet.event_index: + continue + canonical = infonet.events[infonet.event_index[event_id]] + payload = canonical.get("payload") if isinstance(canonical.get("payload"), dict) else {} + sender_token_hash = hashlib.sha256( + f"hashchain-dm-sender|{event_id}|{canonical.get('node_id', '')}".encode("utf-8") + ).hexdigest() + try: + result = dm_relay.deposit( + sender_id=str(canonical.get("node_id", "") or ""), + raw_sender_id=str(canonical.get("node_id", "") or ""), + recipient_id=str(payload.get("recipient_id", "") or ""), + ciphertext=str(payload.get("ciphertext", "") or ""), + msg_id=str(payload.get("msg_id", "") or ""), + delivery_class=str(payload.get("delivery_class", "") or ""), + recipient_token=str(payload.get("recipient_token", "") or "") or None, + sender_seal=str(payload.get("sender_seal", "") or ""), + sender_token_hash=sender_token_hash, + payload_format=str(payload.get("format", "dm1") or "dm1"), + session_welcome=str(payload.get("session_welcome", "") or ""), + ) + if result.get("ok"): + count += 1 + except Exception: + pass + return count + + def _sync_from_peer( peer_url: str, *, @@ -1538,6 +1659,7 @@ def _sync_from_peer( return True, "", False, 0 result = infonet.ingest_events(events) _hydrate_gate_store_from_chain(events) + _hydrate_dm_relay_from_chain(events) rejected = list(result.get("rejected", []) or []) if rejected: return False, f"sync ingest rejected {len(rejected)} event(s)", False, 0 @@ -1600,6 +1722,8 @@ def _run_public_sync_cycle() -> SyncWorkerState: last_error = "sync failed" for record in peers: + retry_after_s = 0 + http_status_code = 0 started = begin_sync( current_state, peer_url=record.peer_url, @@ -1610,6 +1734,17 @@ def _run_public_sync_cycle() -> SyncWorkerState: set_sync_state(started) try: ok, error, forked, retry_after_s = _sync_from_peer(record.peer_url) + except PeerSyncHTTPError as exc: + # _sync_from_peer catches PeerSyncRateLimited internally (4-tuple + # path for 429 with Retry-After). Other non-200 statuses surface + # here as PeerSyncHTTPError — pull retry_after_s + status off it + # so the cooldown calculation below can honor server hints even + # for non-429 throttling responses. + ok = False + error = str(exc) + forked = False + retry_after_s = int(exc.retry_after_s or 0) + http_status_code = int(exc.status_code or 0) except Exception as exc: ok = False error = str(exc or type(exc).__name__) @@ -1640,6 +1775,10 @@ def _run_public_sync_cycle() -> SyncWorkerState: getattr(settings, "MESH_BOOTSTRAP_SEED_FAILURE_COOLDOWN_S", cooldown_s) or cooldown_s ) + if http_status_code == 429: + failure_count = max(int(getattr(record, "failure_count", 0) or 0), current_state.consecutive_failures) + exponential_429_s = min(900, 60 * (2 ** min(failure_count, 4))) + cooldown_s = max(cooldown_s, retry_after_s, exponential_429_s) store.mark_failure( record.peer_url, "sync", @@ -1650,7 +1789,7 @@ def _run_public_sync_cycle() -> SyncWorkerState: store.save() failure_backoff_s = int(settings.MESH_SYNC_FAILURE_BACKOFF_S or 60) if is_seed_peer: - failure_backoff_s = min(failure_backoff_s, max(1, cooldown_s)) + failure_backoff_s = max(failure_backoff_s, max(1, cooldown_s)) updated = finish_sync( started, ok=False, @@ -1750,7 +1889,7 @@ def _propagate_public_event_to_peers(event_dict: dict[str, Any]) -> None: if not _participant_node_enabled(): return - if not authenticated_push_peer_urls(): + if not _filter_infonet_peer_urls(authenticated_push_peer_urls()): return envelope = MeshEnvelope( @@ -1784,6 +1923,45 @@ def _schedule_public_event_propagation(event_dict: dict[str, Any]) -> None: ).start() +def _infonet_node_runtime_requested() -> bool: + return (not _MESH_ONLY) or _HEADLESS_MESH_NODE_RUNTIME + + +def _start_infonet_node_runtime(reason: str = "startup") -> None: + """Start sync/push/pull workers for participant nodes.""" + global _NODE_PUBLIC_EVENT_HOOK_REGISTERED, _NODE_RUNTIME_THREADS_STARTED + + if not _infonet_node_runtime_requested(): + return + try: + from services.mesh.mesh_hashchain import register_public_event_append_hook + + _materialize_local_infonet_state() + _refresh_node_peer_store() + if _node_runtime_supported(): + if not _participant_node_enabled(): + logger.info("Infonet participant auto-enabled for private seed sync") + _set_participant_node_enabled(True) + threading.Thread( + target=lambda: _ensure_infonet_private_transport_ready(reason), + daemon=True, + name="infonet-private-transport-warmup", + ).start() + _NODE_SYNC_STOP.clear() + if not _NODE_RUNTIME_THREADS_STARTED: + threading.Thread(target=_public_infonet_sync_loop, daemon=True).start() + threading.Thread(target=_http_peer_push_loop, daemon=True).start() + threading.Thread(target=_http_gate_push_loop, daemon=True).start() + threading.Thread(target=_http_gate_pull_loop, daemon=True).start() + _NODE_RUNTIME_THREADS_STARTED = True + _kick_public_sync_background(reason) + if not _NODE_PUBLIC_EVENT_HOOK_REGISTERED: + register_public_event_append_hook(_schedule_public_event_propagation) + _NODE_PUBLIC_EVENT_HOOK_REGISTERED = True + except Exception as e: + logger.warning(f"Node bootstrap runtime failed to initialize: {e}") + + # ─── Background HTTP Peer Push Worker ──────────────────────────────────── # Runs alongside the sync loop. Every PUSH_INTERVAL seconds, batches new # Infonet events and sends them via HMAC-authenticated POST to push peers. @@ -1791,6 +1969,7 @@ def _schedule_public_event_propagation(event_dict: dict[str, Any]) -> None: _PEER_PUSH_INTERVAL_S = 10 _PEER_PUSH_BATCH_SIZE = 50 _peer_push_last_index: dict[str, int] = {} # peer_url → last pushed event index +_INFONET_SYNC_RATE_LIMIT = "600/minute" def _http_peer_push_loop() -> None: @@ -1812,7 +1991,7 @@ def _http_peer_push_loop() -> None: # loop on the global secret being set — an install that only # configures per-peer secrets is now valid. - peers = authenticated_push_peer_urls() + peers = _filter_infonet_peer_urls(authenticated_push_peer_urls()) if not peers: _NODE_SYNC_STOP.wait(_PEER_PUSH_INTERVAL_S) continue @@ -1840,7 +2019,8 @@ def _http_peer_push_loop() -> None: ensure_ascii=False, ).encode("utf-8") - peer_key = resolve_peer_key_for_url(normalized) + sender_url = _local_infonet_peer_url() + peer_key = resolve_peer_key_for_url(sender_url) if not peer_key: continue import hmac as _hmac_mod2 @@ -1848,14 +2028,21 @@ def _http_peer_push_loop() -> None: hmac_hex = _hmac_mod2.new(peer_key, body_bytes, _hashlib_mod2.sha256).hexdigest() timeout = int(get_settings().MESH_RELAY_PUSH_TIMEOUT_S or 10) - resp = _requests.post( - f"{normalized}/api/mesh/infonet/peer-push", - data=body_bytes, - headers={ + proxies = _infonet_peer_requests_proxies(normalized) + request_kwargs: dict[str, Any] = { + "data": body_bytes, + "headers": { "Content-Type": "application/json", + "X-Peer-Url": sender_url, "X-Peer-HMAC": hmac_hex, }, - timeout=timeout, + "timeout": timeout, + } + if proxies: + request_kwargs["proxies"] = proxies + resp = _requests.post( + f"{normalized}/api/mesh/infonet/peer-push", + **request_kwargs, ) if resp.status_code == 200: _peer_push_last_index[normalized] = last_idx + len(batch) @@ -1895,7 +2082,7 @@ def _http_gate_pull_loop() -> None: # Issue #256: per-peer key resolution; see _http_peer_push_loop. - peers = authenticated_push_peer_urls() + peers = _filter_infonet_peer_urls(authenticated_push_peer_urls()) if not peers: _NODE_SYNC_STOP.wait(_GATE_PULL_INTERVAL_S) continue @@ -1905,7 +2092,8 @@ def _http_gate_pull_loop() -> None: if not normalized: continue - peer_key = resolve_peer_key_for_url(normalized) + sender_url = _local_infonet_peer_url() + peer_key = resolve_peer_key_for_url(sender_url) if not peer_key: continue @@ -1925,14 +2113,21 @@ def _http_gate_pull_loop() -> None: discovery_hmac = _hmac_pull.new(peer_key, discovery_body, _hashlib_pull.sha256).hexdigest() timeout = int(get_settings().MESH_RELAY_PUSH_TIMEOUT_S or 10) - resp = _requests.post( - f"{normalized}/api/mesh/gate/peer-pull", - data=discovery_body, - headers={ + proxies = _infonet_peer_requests_proxies(normalized) + discovery_kwargs: dict[str, Any] = { + "data": discovery_body, + "headers": { "Content-Type": "application/json", + "X-Peer-Url": sender_url, "X-Peer-HMAC": discovery_hmac, }, - timeout=timeout, + "timeout": timeout, + } + if proxies: + discovery_kwargs["proxies"] = proxies + resp = _requests.post( + f"{normalized}/api/mesh/gate/peer-pull", + **discovery_kwargs, ) if resp.status_code != 200: continue @@ -1962,14 +2157,20 @@ def _http_gate_pull_loop() -> None: pull_hmac = _hmac_pull.new(peer_key, pull_body, _hashlib_pull.sha256).hexdigest() - pull_resp = _requests.post( - f"{normalized}/api/mesh/gate/peer-pull", - data=pull_body, - headers={ + pull_kwargs: dict[str, Any] = { + "data": pull_body, + "headers": { "Content-Type": "application/json", + "X-Peer-Url": sender_url, "X-Peer-HMAC": pull_hmac, }, - timeout=timeout, + "timeout": timeout, + } + if proxies: + pull_kwargs["proxies"] = proxies + pull_resp = _requests.post( + f"{normalized}/api/mesh/gate/peer-pull", + **pull_kwargs, ) if pull_resp.status_code != 200: continue @@ -2020,7 +2221,7 @@ def _http_gate_push_loop() -> None: # Issue #256: per-peer key resolution; see _http_peer_push_loop. - peers = authenticated_push_peer_urls() + peers = _filter_infonet_peer_urls(authenticated_push_peer_urls()) if not peers: _NODE_SYNC_STOP.wait(_PEER_PUSH_INTERVAL_S) continue @@ -2033,7 +2234,8 @@ def _http_gate_push_loop() -> None: if not normalized: continue - peer_key = resolve_peer_key_for_url(normalized) + sender_url = _local_infonet_peer_url() + peer_key = resolve_peer_key_for_url(sender_url) if not peer_key: continue @@ -2064,14 +2266,21 @@ def _http_gate_push_loop() -> None: hmac_hex = _hmac_mod3.new(peer_key, body_bytes, _hashlib_mod3.sha256).hexdigest() timeout = int(get_settings().MESH_RELAY_PUSH_TIMEOUT_S or 10) - resp = _requests.post( - f"{normalized}/api/mesh/gate/peer-push", - data=body_bytes, - headers={ + proxies = _infonet_peer_requests_proxies(normalized) + request_kwargs: dict[str, Any] = { + "data": body_bytes, + "headers": { "Content-Type": "application/json", + "X-Peer-Url": sender_url, "X-Peer-HMAC": hmac_hex, }, - timeout=timeout, + "timeout": timeout, + } + if proxies: + request_kwargs["proxies"] = proxies + resp = _requests.post( + f"{normalized}/api/mesh/gate/peer-push", + **request_kwargs, ) if resp.status_code == 200: peer_counts[gate_id] = last + len(batch) @@ -2413,32 +2622,8 @@ async def lifespan(app: FastAPI): daemon=True, name="wormhole-startup-sync", ).start() - try: - from services.mesh.mesh_hashchain import register_public_event_append_hook - _materialize_local_infonet_state() - _refresh_node_peer_store() - if _node_runtime_supported(): - if not _participant_node_enabled(): - logger.info("Infonet participant auto-enabled for private seed sync") - _set_participant_node_enabled(True) - threading.Thread( - target=lambda: _ensure_infonet_private_transport_ready("startup"), - daemon=True, - name="infonet-private-transport-warmup", - ).start() - _NODE_SYNC_STOP.clear() - threading.Thread(target=_public_infonet_sync_loop, daemon=True).start() - _kick_public_sync_background("startup") - threading.Thread(target=_http_peer_push_loop, daemon=True).start() - threading.Thread(target=_http_gate_push_loop, daemon=True).start() - threading.Thread(target=_http_gate_pull_loop, daemon=True).start() - global _NODE_PUBLIC_EVENT_HOOK_REGISTERED - if not _NODE_PUBLIC_EVENT_HOOK_REGISTERED: - register_public_event_append_hook(_schedule_public_event_propagation) - _NODE_PUBLIC_EVENT_HOOK_REGISTERED = True - except Exception as e: - logger.warning(f"Node bootstrap runtime failed to initialize: {e}") + _start_infonet_node_runtime("startup") if not _MESH_ONLY: # Prime the static route/airport database from vrs-standing-data.adsb.lol @@ -2677,6 +2862,91 @@ def _redact_public_event(event: dict) -> dict: return _redact_vote_gate(_redact_key_rotate_payload(_redact_gate_metadata(event))) +def _is_loopback_host(host: str) -> bool: + value = str(host or "").strip().lower() + if not value: + return False + if value.startswith("[") and "]" in value: + value = value[1 : value.index("]")] + if ":" in value and value.count(":") == 1: + value = value.rsplit(":", 1)[0] + if value in {"localhost", "ip6-localhost"}: + return True + try: + return ipaddress.ip_address(value).is_loopback + except ValueError: + return False + + +def _is_onion_host(host: str) -> bool: + value = str(host or "").strip().lower() + if not value: + return False + if ":" in value and value.count(":") == 1: + value = value.rsplit(":", 1)[0] + return value.endswith(".onion") + + +def _forwarded_for_hosts(request) -> list[str]: + headers = getattr(request, "headers", {}) or {} + hosts: list[str] = [] + x_forwarded_for = str(headers.get("x-forwarded-for", "") or "") + hosts.extend(part.strip() for part in x_forwarded_for.split(",") if part.strip()) + forwarded = str(headers.get("forwarded", "") or "") + for section in forwarded.split(","): + for item in section.split(";"): + key, sep, value = item.strip().partition("=") + if sep and key.strip().lower() == "for": + hosts.append(value.strip().strip('"').strip("[]")) + return hosts + + +def _request_appears_private_infonet_transport(request) -> bool: + """Return whether a sync request is safe to carry private ledger events. + + This is intentionally fail-closed for the private event surface only. A + questionable request still gets public events; gate/DM ciphertext simply + stays out of the response. + """ + if not _infonet_private_transport_required() or request is None: + return False + + forwarded_hosts = _forwarded_for_hosts(request) + if forwarded_hosts and any(not (_is_loopback_host(host) or _is_onion_host(host)) for host in forwarded_hosts): + return False + + client = getattr(request, "client", None) + client_host = str(getattr(client, "host", "") or "") + headers = getattr(request, "headers", {}) or {} + host_header = str(headers.get("host", "") or "") + url_host = str(getattr(getattr(request, "url", None), "hostname", "") or "") + return any( + ( + _is_loopback_host(client_host), + _is_loopback_host(host_header), + _is_loopback_host(url_host), + _is_onion_host(host_header), + _is_onion_host(url_host), + ) + ) + + +def _infonet_sync_response_events(events: list[dict], request=None) -> list[dict]: + """Build the sync event surface for the current transport policy.""" + include_private = _request_appears_private_infonet_transport(request) + response: list[dict] = [] + for event in events: + if not isinstance(event, dict): + continue + event_type = str(event.get("event_type", "") or "") + if event_type in {"gate_message", "dm_message"}: + if include_private: + response.append(dict(event)) + continue + response.append(_redact_public_event(event)) + return response + + def _trusted_gate_reply_to(event: dict) -> str: if not isinstance(event, dict): return "" @@ -5261,32 +5531,15 @@ def _submit_gate_message_envelope(request: Request, gate_id: str, body: dict[str if not cooldown_ok: return {"ok": False, "detail": cooldown_reason} - # Advance sequence counter (replay protection) without appending to - # the public infonet chain — gate messages are private. - try: - from services.mesh.mesh_hashchain import infonet, gate_store - - seq_ok, seq_reason = _validate_private_signed_sequence( - infonet, - sender_id, - sequence, - domain="gate_message", - ) - if not seq_ok: - return {"ok": False, "detail": seq_reason} - except ValueError as exc: - return {"ok": False, "detail": str(exc)} - except Exception: - logger.exception("Failed to advance sequence for gate message") - return {"ok": False, "detail": "Failed to record gate message"} - gate_manager.record_message(gate_id) _record_gate_post_cooldown(sender_id, gate_id) logger.info("Encrypted gate message accepted on obfuscated gate plane") - # Build gate event and store in gate_store (private — not on public chain). + # Build and commit the encrypted gate event to the private Infonet ledger. + # The main hashchain is the durable propagation surface; gate_store is the + # local materialized view used by the existing decrypt/UI path. try: - from services.mesh.mesh_hashchain import _private_gate_event_id + from services.mesh.mesh_hashchain import infonet import time as _time store_payload = dict(gate_payload) @@ -5308,19 +5561,24 @@ def _submit_gate_message_envelope(request: Request, gate_id: str, body: dict[str "public_key_algo": public_key_algo, "protocol_version": protocol_version or PROTOCOL_VERSION, } - gate_event["event_id"] = _private_gate_event_id(gate_id, sender_id, sequence, gate_event) + gate_event = infonet.append_private_gate_message( + node_id=sender_id, + payload=store_payload, + signature=signature, + sequence=sequence, + public_key=public_key, + public_key_algo=public_key_algo, + protocol_version=protocol_version or PROTOCOL_VERSION, + timestamp=float(gate_event.get("timestamp", 0) or 0), + ) + except ValueError as exc: + return {"ok": False, "detail": str(exc)} except Exception: - logger.exception("Failed to prepare private gate message for queued release") + logger.exception("Failed to append gate message to private Infonet ledger") return {"ok": False, "detail": "Failed to record gate message"} - # Append to the local gate_store immediately. The gate_store is a - # per-node persistent ciphertext chain; writing to it is a local - # operation with no network dependency. Previously this happened only - # inside the release worker's attempt_private_release path, which - # meant messages sat in the outbox — invisible to the author and the - # gate UI — until the transport tier reached the release floor. - # Decoupling local visibility from network fan-out: append locally now, - # queue the release for network propagation when the lane is ready. + # Append to the local gate_store immediately so the author sees the same + # materialized gate view that peers will hydrate after private sync. try: from services.mesh.mesh_hashchain import gate_store @@ -5447,7 +5705,7 @@ async def infonet_locator(request: Request, limit: int = Query(32, ge=4, le=128) @app.post("/api/mesh/infonet/sync") -@limiter.limit("30/minute") +@limiter.limit(_INFONET_SYNC_RATE_LIMIT) @mesh_write_exempt(MeshWriteExemption.PEER_GOSSIP) async def infonet_sync_post( request: Request, @@ -5500,8 +5758,7 @@ async def infonet_sync_post( elif matched_hash == GENESIS_HASH and len(locator) > 1: forked = True - # Filter out legacy gate_message events — not part of the public sync surface. - events = [_redact_public_event(e) for e in events if e.get("event_type") != "gate_message"] + events = _infonet_sync_response_events(events, request=request) response = { "events": events, @@ -5564,7 +5821,7 @@ async def mesh_rns_status(request: Request): @app.get("/api/mesh/infonet/sync") -@limiter.limit("30/minute") +@limiter.limit(_INFONET_SYNC_RATE_LIMIT) async def infonet_sync( request: Request, after_hash: str = "", @@ -5602,8 +5859,7 @@ async def infonet_sync( ) base = after_hash or GENESIS_HASH events = infonet.get_events_after(base, limit=limit) - # Filter out legacy gate_message events — not part of the public sync surface. - events = [_redact_public_event(e) for e in events if e.get("event_type") != "gate_message"] + events = _infonet_sync_response_events(events, request=request) return { "events": events, "after_hash": base, @@ -5642,6 +5898,7 @@ async def infonet_ingest(request: Request): result = infonet.ingest_events(events) _hydrate_gate_store_from_chain(events) + _hydrate_dm_relay_from_chain(events) return {"ok": True, **result} @@ -5682,6 +5939,7 @@ async def infonet_peer_push(request: Request): result = infonet.ingest_events(events) _hydrate_gate_store_from_chain(events) + _hydrate_dm_relay_from_chain(events) return {"ok": True, **result} @@ -6241,6 +6499,12 @@ async def infonet_event(request: Request, event_id: str): ) return _strip_gate_for_access(evt, access) return {"ok": False, "detail": "Event not found"} + if evt.get("event_type") == "dm_message": + return await _private_plane_refusal_response( + request, + status_code=403, + payload=_private_plane_access_denied_payload(), + ) if evt.get("event_type") == "gate_message": gate_id = str(evt.get("payload", {}).get("gate", "") or evt.get("gate", "") or "").strip() access = _verify_gate_access(request, gate_id) if gate_id else "" @@ -6265,7 +6529,7 @@ async def infonet_node_events( from services.mesh.mesh_hashchain import infonet events = infonet.get_events_by_node(node_id, limit=limit) - events = [e for e in events if e.get("event_type") != "gate_message"] + events = [e for e in events if e.get("event_type") not in {"gate_message", "dm_message"}] events = [_redact_public_event(e) for e in infonet.decorate_events(events)] events = _redact_public_node_history( events, @@ -6290,7 +6554,7 @@ async def infonet_events_by_type( else: events = list(reversed(infonet.events)) events = events[offset : offset + limit] - events = [e for e in events if e.get("event_type") != "gate_message"] + events = [e for e in events if e.get("event_type") not in {"gate_message", "dm_message"}] events = [_redact_public_event(e) for e in infonet.decorate_events(events)] return { "events": events, @@ -7028,6 +7292,7 @@ async def _dm_send_from_signed_request(request: Request): relay_salt_hex = str(body.get("relay_salt", "") or "").strip().lower() msg_id = str(body.get("msg_id", "")).strip() timestamp = _safe_int(body.get("timestamp", 0) or 0) + sequence = _safe_int(body.get("sequence", 0) or 0) nonce = str(body.get("nonce", "")).strip() if not sender_id or not recipient_id or not ciphertext or not msg_id or not timestamp: @@ -7101,7 +7366,7 @@ async def _dm_send_from_signed_request(request: Request): ok_seq, seq_reason = _validate_private_signed_sequence( infonet, sender_id, - int(body.get("sequence", 0) or 0), + sequence, domain="dm_send", ) if not ok_seq: @@ -7135,7 +7400,47 @@ async def _dm_send_from_signed_request(request: Request): "sender_seal": sender_seal, "relay_salt": relay_salt_hex, } + hashchain_spool: dict[str, Any] = {"ok": False, "detail": "not attempted"} + try: + from services.mesh.mesh_hashchain import infonet + + chain_payload = dict(prepared.payload if prepared is not None else {}) + if not chain_payload: + chain_payload = { + "recipient_id": recipient_id, + "delivery_class": delivery_class, + "recipient_token": recipient_token if delivery_class == "shared" else "", + "ciphertext": ciphertext, + "msg_id": msg_id, + "timestamp": timestamp, + "format": payload_format, + } + chain_payload["transport_lock"] = "private_strong" + chain_event = infonet.append_private_dm_message( + node_id=sender_id, + payload=chain_payload, + signature=str(prepared.signature if prepared is not None else body.get("signature", "") or ""), + sequence=sequence, + public_key=str(prepared.public_key if prepared is not None else body.get("public_key", "") or ""), + public_key_algo=str( + prepared.public_key_algo if prepared is not None else body.get("public_key_algo", "") or "" + ), + protocol_version=str( + prepared.protocol_version if prepared is not None else body.get("protocol_version", "") or "" + ) + or PROTOCOL_VERSION, + timestamp=float(timestamp or time.time()), + ) + _hydrate_dm_relay_from_chain([chain_event]) + hashchain_spool = { + "ok": True, + "event_id": str(chain_event.get("event_id", "") or ""), + "limit": 2, + } + except Exception as exc: + hashchain_spool = {"ok": False, "detail": str(exc) or type(exc).__name__} queued_result = _queue_dm_release(current_tier=tier, payload=release_payload) + queued_result["hashchain_spool"] = hashchain_spool if transport_upgrade_pending: queued_result["private_transport_pending"] = True return queued_result @@ -9111,6 +9416,11 @@ async def api_get_node_settings(request: Request): async def api_set_node_settings(request: Request, body: NodeSettingsUpdate): _refresh_node_peer_store() if bool(body.enabled): + if _infonet_private_transport_required() and not _ensure_infonet_private_transport_ready("operator_enable"): + return JSONResponse( + {"ok": False, "detail": _infonet_private_transport_error()}, + status_code=503, + ) try: from services.transport_lane_isolation import disable_public_mesh_lane @@ -9119,6 +9429,7 @@ async def api_set_node_settings(request: Request, body: NodeSettingsUpdate): logger.warning("Failed to disable public Mesh while enabling private node: %s", exc) result = _set_participant_node_enabled(bool(body.enabled)) if bool(body.enabled): + _start_infonet_node_runtime("operator_enable") _kick_public_sync_background("operator_enable") return result diff --git a/backend/routers/mesh_peer_sync.py b/backend/routers/mesh_peer_sync.py index 4427803..73fbefd 100644 --- a/backend/routers/mesh_peer_sync.py +++ b/backend/routers/mesh_peer_sync.py @@ -55,6 +55,12 @@ def _hydrate_gate_store_from_chain(events: list) -> int: return count +def _hydrate_dm_relay_from_chain(events: list) -> int: + import main as _m + + return int(_m._hydrate_dm_relay_from_chain(events)) + + @router.post("/api/mesh/infonet/peer-push") @limiter.limit("30/minute") async def infonet_peer_push(request: Request): @@ -82,6 +88,7 @@ async def infonet_peer_push(request: Request): return {"ok": True, "accepted": 0, "duplicates": 0, "rejected": []} result = infonet.ingest_events(events) _hydrate_gate_store_from_chain(events) + _hydrate_dm_relay_from_chain(events) return {"ok": True, **result} diff --git a/backend/routers/mesh_public.py b/backend/routers/mesh_public.py index 5fe7517..e88f8dd 100644 --- a/backend/routers/mesh_public.py +++ b/backend/routers/mesh_public.py @@ -65,6 +65,7 @@ from services.mesh.mesh_signed_events import ( logger = logging.getLogger(__name__) router = APIRouter() +_INFONET_SYNC_RATE_LIMIT = "600/minute" def _signed_body(request: Request) -> dict[str, Any]: @@ -263,6 +264,19 @@ def _redact_public_event(event: dict) -> dict: return _redact_vote_gate(_redact_key_rotate_payload(_redact_gate_metadata(event))) +def _infonet_private_transport_required() -> bool: + import main as _m + + return bool(_m._infonet_private_transport_required()) + + +def _infonet_sync_response_events(events: list[dict], request=None) -> list[dict]: + """Build the sync event surface for the current transport policy.""" + import main as _m + + return _m._infonet_sync_response_events(events, request=request) + + def _trusted_gate_reply_to(event: dict) -> str: if not isinstance(event, dict): return "" @@ -574,6 +588,12 @@ def _hydrate_gate_store_from_chain(events: list[dict]) -> int: pass return count + +def _hydrate_dm_relay_from_chain(events: list[dict]) -> int: + import main as _m + + return int(_m._hydrate_dm_relay_from_chain(events)) + # --- Safe type helpers --- def _safe_int(val, default=0): @@ -1531,7 +1551,7 @@ async def infonet_locator(request: Request, limit: int = Query(32, ge=4, le=128) @router.post("/api/mesh/infonet/sync") -@limiter.limit("30/minute") +@limiter.limit(_INFONET_SYNC_RATE_LIMIT) @mesh_write_exempt(MeshWriteExemption.PEER_GOSSIP) async def infonet_sync_post( request: Request, @@ -1584,8 +1604,7 @@ async def infonet_sync_post( elif matched_hash == GENESIS_HASH and len(locator) > 1: forked = True - # Filter out legacy gate_message events — not part of the public sync surface. - events = [_redact_public_event(e) for e in events if e.get("event_type") != "gate_message"] + events = _infonet_sync_response_events(events, request=request) response = { "events": events, @@ -1646,7 +1665,7 @@ async def mesh_rns_status(request: Request): @router.get("/api/mesh/infonet/sync") -@limiter.limit("30/minute") +@limiter.limit(_INFONET_SYNC_RATE_LIMIT) async def infonet_sync( request: Request, after_hash: str = "", @@ -1684,8 +1703,7 @@ async def infonet_sync( ) base = after_hash or GENESIS_HASH events = infonet.get_events_after(base, limit=limit) - # Filter out legacy gate_message events — not part of the public sync surface. - events = [_redact_public_event(e) for e in events if e.get("event_type") != "gate_message"] + events = _infonet_sync_response_events(events, request=request) return { "events": events, "after_hash": base, @@ -1724,6 +1742,7 @@ async def infonet_ingest(request: Request): result = infonet.ingest_events(events) _hydrate_gate_store_from_chain(events) + _hydrate_dm_relay_from_chain(events) return {"ok": True, **result} @@ -2279,6 +2298,12 @@ async def infonet_event(request: Request, event_id: str): ) return _strip_gate_for_access(evt, access) return {"ok": False, "detail": "Event not found"} + if evt.get("event_type") == "dm_message": + return await _private_plane_refusal_response( + request, + status_code=403, + payload=_private_plane_access_denied_payload(), + ) if evt.get("event_type") == "gate_message": gate_id = str(evt.get("payload", {}).get("gate", "") or evt.get("gate", "") or "").strip() access = _verify_gate_access(request, gate_id) if gate_id else "" @@ -2303,7 +2328,7 @@ async def infonet_node_events( from services.mesh.mesh_hashchain import infonet events = infonet.get_events_by_node(node_id, limit=limit) - events = [e for e in events if e.get("event_type") != "gate_message"] + events = [e for e in events if e.get("event_type") not in {"gate_message", "dm_message"}] events = [_redact_public_event(e) for e in infonet.decorate_events(events)] events = _redact_public_node_history( events, @@ -2328,7 +2353,7 @@ async def infonet_events_by_type( else: events = list(reversed(infonet.events)) events = events[offset : offset + limit] - events = [e for e in events if e.get("event_type") != "gate_message"] + events = [e for e in events if e.get("event_type") not in {"gate_message", "dm_message"}] events = [_redact_public_event(e) for e in infonet.decorate_events(events)] return { "events": events, diff --git a/backend/services/config.py b/backend/services/config.py index 46d3f6c..a032262 100644 --- a/backend/services/config.py +++ b/backend/services/config.py @@ -32,6 +32,7 @@ class Settings(BaseSettings): MESH_ARTI_ENABLED: bool = False MESH_ARTI_SOCKS_PORT: int = 9050 MESH_RELAY_PEERS: str = "" + MESH_PUBLIC_PEER_URL: str = "" # Bootstrap seeds are discovery hints, not authoritative network roots. # Nodes promote healthy discovered peers from the store/manifest over time. MESH_BOOTSTRAP_SEED_PEERS: str = "http://gqpbunqbgtkcqilvclm3xrkt3zowjyl3s62kkktvojgvxzizamvbrqid.onion:8000" diff --git a/backend/services/mesh/mesh_hashchain.py b/backend/services/mesh/mesh_hashchain.py index ddb3a42..c7420ad 100644 --- a/backend/services/mesh/mesh_hashchain.py +++ b/backend/services/mesh/mesh_hashchain.py @@ -33,8 +33,9 @@ Each event contains: Persistence: JSON file at backend/data/infonet.json -Encrypted gate chat events are intentionally kept off the public chain and -persisted separately via GateMessageStore. +Encrypted gate chat events are private-chain ciphertext records. They are +excluded from public read surfaces and replicated only over private Infonet +transports. """ import json @@ -64,6 +65,8 @@ from services.mesh.mesh_schema import ( ACTIVE_PUBLIC_LEDGER_EVENT_TYPES, PUBLIC_LEDGER_EVENT_TYPES, validate_event_payload, + validate_private_dm_ledger_payload, + validate_private_gate_ledger_payload, validate_protocol_fields, validate_public_ledger_payload, ) @@ -127,6 +130,12 @@ GATE_SEGMENT_MAX_COMPRESSED_BYTES = max( int(os.environ.get("MESH_GATE_SEGMENT_MAX_COMPRESSED_BYTES", str(2 * 1024 * 1024)) or str(2 * 1024 * 1024)), ) GATE_SEGMENT_STORAGE_VERSION = 1 +DM_HASHCHAIN_SPOOL_LIMIT = max(1, int(os.environ.get("MESH_DM_HASHCHAIN_SPOOL_LIMIT", "2") or "2")) +DM_HASHCHAIN_SPOOL_SENDER_LIMIT = max( + 1, + int(os.environ.get("MESH_DM_HASHCHAIN_SPOOL_SENDER_LIMIT", "1") or "1"), +) +DM_HASHCHAIN_SPOOL_TTL_S = max(60, int(os.environ.get("MESH_DM_HASHCHAIN_SPOOL_TTL_S", "3600") or "3600")) _PUBLIC_EVENT_APPEND_HOOKS: list[Any] = [] _PUBLIC_EVENT_APPEND_HOOKS_LOCK = threading.Lock() @@ -340,6 +349,32 @@ def _private_gate_event_id( ).hexdigest() +def _private_gate_signature_payload_variants(gate_id: str, event: dict[str, Any]) -> list[dict[str, Any]]: + payload = _private_gate_signature_payload(gate_id, event) + variants: list[dict[str, Any]] = [payload] + event_payload = event.get("payload") if isinstance(event.get("payload"), dict) else {} + reply_to = str(event_payload.get("reply_to", "") or "").strip() + if reply_to: + variants.append(_private_gate_signature_payload(gate_id, event, include_reply_to=False)) + if "epoch" in payload: + no_epoch = dict(payload) + no_epoch.pop("epoch", None) + variants.append(no_epoch) + if reply_to: + no_epoch_no_reply = _private_gate_signature_payload(gate_id, event, include_reply_to=False) + no_epoch_no_reply.pop("epoch", None) + variants.append(no_epoch_no_reply) + deduped: list[dict[str, Any]] = [] + seen: set[str] = set() + for variant in variants: + material = json.dumps(variant, sort_keys=True, separators=(",", ":"), ensure_ascii=False) + if material in seen: + continue + seen.add(material) + deduped.append(variant) + return deduped + + def _sanitize_private_gate_event(gate_id: str, event: dict[str, Any]) -> dict[str, Any]: payload = event.get("payload") if isinstance(event.get("payload"), dict) else {} sanitized = { @@ -1568,11 +1603,18 @@ class Infonet: def _rebuild_state(self) -> None: self.event_index = {} self.node_sequences = {} - # Keep private signed-write replay domains across public-chain - # rebuilds; these domains protect local side effects that are not - # represented as public Infonet events. - if not isinstance(getattr(self, "sequence_domains", None), dict): - self.sequence_domains = {} + # Keep private signed-write replay domains that are not represented + # on-chain, but rebuild the gate_message sequence domain from chain + # events so reloads/fork application do not mix it with public + # per-node message sequences. + preserved_domains = {} + if isinstance(getattr(self, "sequence_domains", None), dict): + preserved_domains = { + key: value + for key, value in self.sequence_domains.items() + if not str(key or "").endswith("|gate_message") + } + self.sequence_domains = dict(preserved_domains) self.public_key_bindings = {} self.revocations = {} self._replay_filter = ReplayFilter() @@ -1584,9 +1626,12 @@ class Infonet: node_id = evt.get("node_id", "") sequence = _safe_int(evt.get("sequence", 0) or 0, 0) if node_id and sequence: - last = self.node_sequences.get(node_id, 0) + sequence_table, sequence_key = self._sequence_table_for_event( + evt.get("event_type", ""), node_id + ) + last = sequence_table.get(sequence_key, 0) if sequence > last: - self.node_sequences[node_id] = sequence + sequence_table[sequence_key] = sequence public_key = str(evt.get("public_key", "") or "") if public_key and node_id: existing = self.public_key_bindings.get(public_key) @@ -1898,6 +1943,295 @@ class Infonet: self._save() return True, "ok" + def _sequence_table_for_event(self, event_type: str, node_id: str) -> tuple[dict[str, int], str]: + normalized = str(event_type or "").strip().lower() + if normalized == "gate_message": + return self.sequence_domains, f"{node_id}|gate_message" + if normalized == "dm_message": + return self.sequence_domains, f"{node_id}|dm_message" + return self.node_sequences, node_id + + def _dm_spool_target_key(self, payload: dict[str, Any]) -> tuple[str, str]: + delivery_class = str(payload.get("delivery_class", "") or "").strip().lower() + if delivery_class == "shared": + key = str(payload.get("recipient_token", "") or "").strip() + else: + key = str(payload.get("recipient_id", "") or "").strip() + return delivery_class, key + + def _dm_spool_active_counts( + self, + payload: dict[str, Any], + *, + sender_id: str = "", + now: float | None = None, + ) -> tuple[int, int]: + delivery_class, key = self._dm_spool_target_key(payload) + if not key: + return 0, 0 + sender_id = str(sender_id or "").strip() + current = time.time() if now is None else float(now) + total_count = 0 + sender_count = 0 + for evt in reversed(self.events): + if evt.get("event_type") != "dm_message": + continue + evt_payload = evt.get("payload") if isinstance(evt.get("payload"), dict) else {} + evt_delivery_class, evt_key = self._dm_spool_target_key(evt_payload) + if evt_delivery_class != delivery_class: + continue + if evt_key != key: + continue + evt_ts = float(evt_payload.get("timestamp", evt.get("timestamp", 0)) or 0) + if evt_ts > 0 and current - evt_ts > DM_HASHCHAIN_SPOOL_TTL_S: + continue + total_count += 1 + if sender_id and str(evt.get("node_id", "") or "").strip() == sender_id: + sender_count += 1 + if total_count >= DM_HASHCHAIN_SPOOL_LIMIT and ( + not sender_id or sender_count >= DM_HASHCHAIN_SPOOL_SENDER_LIMIT + ): + break + return total_count, sender_count + + def _dm_spool_active_count(self, payload: dict[str, Any], *, now: float | None = None) -> int: + total_count, _sender_count = self._dm_spool_active_counts(payload, now=now) + return total_count + + def append_private_dm_message( + self, + *, + node_id: str, + payload: dict, + signature: str, + sequence: int, + public_key: str, + public_key_algo: str, + protocol_version: str = "", + timestamp: float = 0, + ) -> dict: + """Append an encrypted DM dead-drop message to the private Infonet ledger. + + The event is a small offline spool, capped per mailbox target, so the + hashchain can carry a couple of sealed DMs without becoming an + unbounded global mailbox. + """ + event_type = "dm_message" + if sequence <= 0: + raise ValueError("sequence is required and must be > 0") + sequence_table, sequence_key = self._sequence_table_for_event(event_type, node_id) + last = sequence_table.get(sequence_key, 0) + if sequence <= last: + raise ValueError(f"Replay detected: sequence {sequence} <= last {last}") + + raw_payload = dict(payload or {}) + if "message" in raw_payload or "plaintext" in raw_payload or "_local_plaintext" in raw_payload: + raise ValueError("private DM ledger payload must not contain plaintext") + if str(raw_payload.get("transport_lock", "") or "").strip().lower() != "private_strong": + raise ValueError("DM hashchain spool requires private_strong transport_lock") + + payload = normalize_payload(event_type, raw_payload) + ok, reason = validate_private_dm_ledger_payload(payload) + if not ok: + raise ValueError(reason) + total_count, sender_count = self._dm_spool_active_counts(payload, sender_id=node_id) + if sender_count >= DM_HASHCHAIN_SPOOL_SENDER_LIMIT: + raise ValueError("DM hashchain sender spool full for recipient") + if total_count >= DM_HASHCHAIN_SPOOL_LIMIT: + raise ValueError("DM hashchain spool full for recipient") + + payload_json = json.dumps(payload, sort_keys=True, separators=(",", ":"), ensure_ascii=False) + if len(payload_json.encode("utf-8")) > MAX_PAYLOAD_BYTES: + raise ValueError("payload exceeds max size") + + protocol_version = str(protocol_version or PROTOCOL_VERSION) + ok, reason = validate_protocol_fields(protocol_version, NETWORK_ID) + if not ok: + raise ValueError(reason) + + if not (signature and public_key and public_key_algo): + raise ValueError("Missing signature fields") + algo = parse_public_key_algo(public_key_algo) + if not algo: + raise ValueError("Unsupported public_key_algo") + if not verify_node_binding(node_id, public_key): + raise ValueError("node_id mismatch") + bound, bind_reason = self._bind_public_key(public_key, node_id) + if not bound: + raise ValueError(bind_reason) + sig_payload = build_signature_payload( + event_type=event_type, + node_id=node_id, + sequence=sequence, + payload=payload, + ) + if not verify_signature( + public_key_b64=public_key, + public_key_algo=public_key_algo, + signature_hex=signature, + payload=sig_payload, + ): + raise ValueError("Invalid signature") + + revoked, _info = self._revocation_status(public_key) + if revoked: + raise ValueError("public key is revoked") + + event = ChainEvent( + prev_hash=self.head_hash, + event_type=event_type, + node_id=node_id, + payload=payload, + timestamp=float(timestamp or time.time()), + sequence=sequence, + signature=signature, + public_key=public_key, + public_key_algo=public_key_algo, + protocol_version=protocol_version, + ) + event_dict = event.to_dict() + self._write_wal(event_dict) + self.events.append(event_dict) + self.event_index[event.event_id] = len(self.events) - 1 + self.head_hash = event.event_id + sequence_table[sequence_key] = sequence + self._replay_filter.add(event.event_id) + self._invalidate_merkle_cache() + self._update_counters_for_event(event_dict) + self._save() + + try: + from services.mesh.mesh_rns import rns_bridge + + rns_bridge.publish_event(event_dict) + except Exception: + pass + _notify_public_event_append_hooks(event_dict) + logger.info( + f"Infonet append [dm_message] by {_redact_node(node_id)} seq={sequence} " + f"id={event.event_id[:16]}..." + ) + return event_dict + + def append_private_gate_message( + self, + *, + node_id: str, + payload: dict, + signature: str, + sequence: int, + public_key: str, + public_key_algo: str, + protocol_version: str = "", + timestamp: float = 0, + ) -> dict: + """Append an encrypted gate message to the private Infonet ledger. + + Gate messages use their own sequence domain so a gate post cannot + consume or replay-block the author's public broadcast sequence. + """ + event_type = "gate_message" + if sequence <= 0: + raise ValueError("sequence is required and must be > 0") + sequence_table, sequence_key = self._sequence_table_for_event(event_type, node_id) + last = sequence_table.get(sequence_key, 0) + if sequence <= last: + raise ValueError(f"Replay detected: sequence {sequence} <= last {last}") + + raw_payload = dict(payload or {}) + if "message" in raw_payload or "_local_plaintext" in raw_payload or "_local_reply_to" in raw_payload: + raise ValueError("private gate ledger payload must not contain plaintext") + if str(raw_payload.get("transport_lock", "") or "").strip().lower() != "private_strong": + raise ValueError("gate messages require private_strong transport_lock") + + payload = normalize_payload(event_type, raw_payload) + ok, reason = validate_private_gate_ledger_payload(payload) + if not ok: + raise ValueError(reason) + + payload_json = json.dumps(payload, sort_keys=True, separators=(",", ":"), ensure_ascii=False) + if len(payload_json.encode("utf-8")) > MAX_PAYLOAD_BYTES: + raise ValueError("payload exceeds max size") + + protocol_version = str(protocol_version or PROTOCOL_VERSION) + ok, reason = validate_protocol_fields(protocol_version, NETWORK_ID) + if not ok: + raise ValueError(reason) + + if not (signature and public_key and public_key_algo): + raise ValueError("Missing signature fields") + algo = parse_public_key_algo(public_key_algo) + if not algo: + raise ValueError("Unsupported public_key_algo") + if not verify_node_binding(node_id, public_key): + raise ValueError("node_id mismatch") + bound, bind_reason = self._bind_public_key(public_key, node_id) + if not bound: + raise ValueError(bind_reason) + event_for_signature = {"payload": payload} + signature_ok = False + for signature_payload in _private_gate_signature_payload_variants( + str(payload.get("gate", "") or ""), + event_for_signature, + ): + sig_payload = build_signature_payload( + event_type=event_type, + node_id=node_id, + sequence=sequence, + payload=signature_payload, + ) + if verify_signature( + public_key_b64=public_key, + public_key_algo=public_key_algo, + signature_hex=signature, + payload=sig_payload, + ): + signature_ok = True + break + if not signature_ok: + raise ValueError("Invalid signature") + + revoked, _info = self._revocation_status(public_key) + if revoked: + raise ValueError("public key is revoked") + + event = ChainEvent( + prev_hash=self.head_hash, + event_type=event_type, + node_id=node_id, + payload=payload, + timestamp=float(timestamp or time.time()), + sequence=sequence, + signature=signature, + public_key=public_key, + public_key_algo=public_key_algo, + protocol_version=protocol_version, + ) + event_dict = event.to_dict() + self._write_wal(event_dict) + self.events.append(event_dict) + self.event_index[event.event_id] = len(self.events) - 1 + self.head_hash = event.event_id + sequence_table[sequence_key] = sequence + self._replay_filter.add(event.event_id) + self._invalidate_merkle_cache() + self._update_counters_for_event(event_dict) + self._save() + + try: + from services.mesh.mesh_rns import rns_bridge + + rns_bridge.publish_event(event_dict) + except Exception: + pass + _notify_public_event_append_hooks(event_dict) + + logger.info( + f"Infonet append [gate_message] by {_redact_node(node_id)} seq={sequence} " + f"id={event.event_id[:16]}..." + ) + return event_dict + def append( self, event_type: str, @@ -2078,6 +2412,18 @@ class Infonet: if not event_id or not prev_hash: rejected.append({"index": idx, "reason": "Missing event_id or prev_hash"}) continue + if event_id in self.event_index: + duplicates += 1 + continue + if self._replay_filter.seen(event_id): + try: + from services.mesh.mesh_metrics import increment as metrics_inc + + metrics_inc("ingest_replay_seen") + except Exception: + pass + duplicates += 1 + continue if prev_hash != expected_prev: try: from services.mesh.mesh_metrics import increment as metrics_inc @@ -2096,25 +2442,14 @@ class Infonet: pass rejected.append({"index": idx, "reason": "network_id mismatch"}) continue - if event_id in self.event_index: - duplicates += 1 - continue - if self._replay_filter.seen(event_id): - try: - from services.mesh.mesh_metrics import increment as metrics_inc - - metrics_inc("ingest_replay_seen") - except Exception: - pass - duplicates += 1 - continue if prev_hash != self.head_hash: rejected.append({"index": idx, "reason": "prev_hash does not match head"}) continue if sequence <= 0: rejected.append({"index": idx, "reason": "Invalid sequence"}) continue - last = self.node_sequences.get(node_id, 0) + sequence_table, sequence_key = self._sequence_table_for_event(event_type, node_id) + last = sequence_table.get(sequence_key, 0) if sequence <= last: rejected.append({"index": idx, "reason": "Replay detected"}) continue @@ -2149,7 +2484,18 @@ class Infonet: if not ok: rejected.append({"index": idx, "reason": reason}) continue - ok, reason = validate_public_ledger_payload(event_type, payload) + if event_type == "gate_message": + ok, reason = validate_private_gate_ledger_payload(payload) + elif event_type == "dm_message": + ok, reason = validate_private_dm_ledger_payload(payload) + if ok: + total_count, sender_count = self._dm_spool_active_counts(payload, sender_id=str(evt.get("node_id", "") or "")) + if sender_count >= DM_HASHCHAIN_SPOOL_SENDER_LIMIT: + ok, reason = False, "DM hashchain sender spool full for recipient" + elif total_count >= DM_HASHCHAIN_SPOOL_LIMIT: + ok, reason = False, "DM hashchain spool full for recipient" + else: + ok, reason = validate_public_ledger_payload(event_type, payload) if not ok: rejected.append({"index": idx, "reason": reason}) continue @@ -2225,7 +2571,7 @@ class Infonet: pass rejected.append({"index": idx, "reason": "public key is revoked"}) continue - last_seq = self.node_sequences.get(node_id, 0) + last_seq = sequence_table.get(sequence_key, 0) if sequence <= last_seq: try: from services.mesh.mesh_metrics import increment as metrics_inc @@ -2261,18 +2607,30 @@ class Infonet: rejected.append({"index": idx, "reason": bind_reason}) continue - sig_payload = build_signature_payload( - event_type=event_type, - node_id=node_id, - sequence=sequence, - payload=payload, - ) - if not verify_signature( - public_key_b64=public_key, - public_key_algo=public_key_algo, - signature_hex=signature, - payload=sig_payload, - ): + if event_type == "gate_message": + signature_payloads = _private_gate_signature_payload_variants( + str(payload.get("gate", "") or ""), + evt, + ) + else: + signature_payloads = [payload] + signature_ok = False + for signature_payload in signature_payloads: + sig_payload = build_signature_payload( + event_type=event_type, + node_id=node_id, + sequence=sequence, + payload=signature_payload, + ) + if verify_signature( + public_key_b64=public_key, + public_key_algo=public_key_algo, + signature_hex=signature, + payload=sig_payload, + ): + signature_ok = True + break + if not signature_ok: try: from services.mesh.mesh_metrics import increment as metrics_inc @@ -2302,7 +2660,7 @@ class Infonet: self.events.append(evt) self.event_index[event_id] = len(self.events) - 1 self.head_hash = event_id - self.node_sequences[node_id] = sequence + sequence_table[sequence_key] = sequence self._update_counters_for_event(evt) accepted += 1 expected_prev = event_id @@ -2365,6 +2723,7 @@ class Infonet: verify_node_binding, ) + event_type = evt_dict.get("event_type", "") node_id = evt_dict.get("node_id", "") if not parse_public_key_algo(public_key_algo): return False, f"Unsupported public_key_algo at index {i}" @@ -2375,21 +2734,41 @@ class Infonet: return False, f"public key binding conflict at index {i}" seen_public_keys[public_key] = node_id - normalized = normalize_payload( - evt_dict.get("event_type", ""), evt_dict.get("payload", {}) - ) - sig_payload = build_signature_payload( - event_type=evt_dict.get("event_type", ""), - node_id=node_id, - sequence=_safe_int(evt_dict.get("sequence", 0) or 0, 0), - payload=normalized, - ) - if not verify_signature( - public_key_b64=public_key, - public_key_algo=public_key_algo, - signature_hex=signature, - payload=sig_payload, - ): + payload = evt_dict.get("payload", {}) + if event_type == "gate_message": + ok, reason = validate_private_gate_ledger_payload(payload) + if not ok: + return False, f"Invalid gate_message payload at index {i}: {reason}" + signature_payloads = _private_gate_signature_payload_variants( + str(payload.get("gate", "") or ""), + evt_dict, + ) + elif event_type == "dm_message": + ok, reason = validate_private_dm_ledger_payload(payload) + if not ok: + return False, f"Invalid dm_message payload at index {i}: {reason}" + signature_payloads = [normalize_payload(event_type, payload)] + else: + signature_payloads = [ + normalize_payload(event_type, payload) + ] + signature_ok = False + for signature_payload in signature_payloads: + sig_payload = build_signature_payload( + event_type=event_type, + node_id=node_id, + sequence=_safe_int(evt_dict.get("sequence", 0) or 0, 0), + payload=signature_payload, + ) + if verify_signature( + public_key_b64=public_key, + public_key_algo=public_key_algo, + signature_hex=signature, + payload=sig_payload, + ): + signature_ok = True + break + if not signature_ok: return False, f"Invalid signature at index {i}" prev = evt_dict["event_id"] @@ -2454,27 +2833,48 @@ class Infonet: verify_node_binding, ) + event_type = evt_dict.get("event_type", "") node_id = evt_dict.get("node_id", "") if not parse_public_key_algo(public_key_algo): return False, f"Unsupported public_key_algo at index {i}" if not verify_node_binding(node_id, public_key): return False, f"node_id mismatch at index {i}" - normalized = normalize_payload( - evt_dict.get("event_type", ""), evt_dict.get("payload", {}) - ) - sig_payload = build_signature_payload( - event_type=evt_dict.get("event_type", ""), - node_id=node_id, - sequence=_safe_int(evt_dict.get("sequence", 0) or 0, 0), - payload=normalized, - ) - if not verify_signature( - public_key_b64=public_key, - public_key_algo=public_key_algo, - signature_hex=signature, - payload=sig_payload, - ): + payload = evt_dict.get("payload", {}) + if event_type == "gate_message": + ok, reason = validate_private_gate_ledger_payload(payload) + if not ok: + return False, f"Invalid gate_message payload at index {i}: {reason}" + signature_payloads = _private_gate_signature_payload_variants( + str(payload.get("gate", "") or ""), + evt_dict, + ) + elif event_type == "dm_message": + ok, reason = validate_private_dm_ledger_payload(payload) + if not ok: + return False, f"Invalid dm_message payload at index {i}: {reason}" + signature_payloads = [normalize_payload(event_type, payload)] + else: + signature_payloads = [ + normalize_payload(event_type, payload) + ] + signature_ok = False + for signature_payload in signature_payloads: + sig_payload = build_signature_payload( + event_type=event_type, + node_id=node_id, + sequence=_safe_int(evt_dict.get("sequence", 0) or 0, 0), + payload=signature_payload, + ) + if verify_signature( + public_key_b64=public_key, + public_key_algo=public_key_algo, + signature_hex=signature, + payload=sig_payload, + ): + signature_ok = True + break + if not signature_ok: return False, f"Invalid signature at index {i}" prev = evt_dict["event_id"] @@ -2538,7 +2938,14 @@ class Infonet: node_id = evt.get("node_id", "") sequence = _safe_int(evt.get("sequence", 0) or 0, 0) if node_id and sequence: - last_seq[node_id] = max(last_seq.get(node_id, 0), sequence) + sequence_key = ( + f"{node_id}|gate_message" + if str(evt.get("event_type", "") or "").strip().lower() == "gate_message" + else f"{node_id}|dm_message" + if str(evt.get("event_type", "") or "").strip().lower() == "dm_message" + else node_id + ) + last_seq[sequence_key] = max(last_seq.get(sequence_key, 0), sequence) public_key = str(evt.get("public_key", "") or "") if public_key and node_id: seen_public_keys.setdefault(public_key, node_id) @@ -2558,8 +2965,21 @@ class Infonet: existing_idx = self.event_index.get(event_id) if existing_idx is not None and existing_idx <= prev_index: return False, "duplicate event_id" - payload = normalize_payload(event_type, dict(payload or {})) + if event_type == "gate_message": + payload = dict(payload or {}) + elif event_type == "dm_message": + payload = normalize_payload(event_type, dict(payload or {})) + else: + payload = normalize_payload(event_type, dict(payload or {})) ok, reason = validate_event_payload(event_type, payload) + if not ok: + return False, reason + if event_type == "gate_message": + ok, reason = validate_private_gate_ledger_payload(payload) + elif event_type == "dm_message": + ok, reason = validate_private_dm_ledger_payload(payload) + else: + ok, reason = validate_public_ledger_payload(event_type, payload) if not ok: return False, reason proto = evt.get("protocol_version") or PROTOCOL_VERSION @@ -2573,7 +2993,14 @@ class Infonet: revoked, _info = self._revocation_status(public_key) if revoked and event_type != "key_revoke": return False, "public key revoked" - last = last_seq.get(node_id, 0) + sequence_key = ( + f"{node_id}|gate_message" + if event_type == "gate_message" + else f"{node_id}|dm_message" + if event_type == "dm_message" + else node_id + ) + last = last_seq.get(sequence_key, 0) if sequence <= last: return False, "sequence replay" from services.mesh.mesh_crypto import ( @@ -2591,23 +3018,35 @@ class Infonet: if existing and existing != node_id: return False, "public key binding conflict" seen_public_keys[public_key] = node_id - sig_payload = build_signature_payload( - event_type=event_type, - node_id=node_id, - sequence=sequence, - payload=payload, - ) - if not verify_signature( - public_key_b64=public_key, - public_key_algo=public_key_algo, - signature_hex=signature, - payload=sig_payload, - ): + if event_type == "gate_message": + signature_payloads = _private_gate_signature_payload_variants( + str(payload.get("gate", "") or ""), + evt, + ) + else: + signature_payloads = [payload] + signature_ok = False + for signature_payload in signature_payloads: + sig_payload = build_signature_payload( + event_type=event_type, + node_id=node_id, + sequence=sequence, + payload=signature_payload, + ) + if verify_signature( + public_key_b64=public_key, + public_key_algo=public_key_algo, + signature_hex=signature, + payload=sig_payload, + ): + signature_ok = True + break + if not signature_ok: return False, "invalid signature" computed = ChainEvent.from_dict(evt).event_id if computed != event_id: return False, "event_id mismatch" - last_seq[node_id] = sequence + last_seq[sequence_key] = sequence # Apply fork self.events = prefix + ordered diff --git a/backend/services/mesh/mesh_infonet_sync_support.py b/backend/services/mesh/mesh_infonet_sync_support.py index d1f4d6e..c8d1f01 100644 --- a/backend/services/mesh/mesh_infonet_sync_support.py +++ b/backend/services/mesh/mesh_infonet_sync_support.py @@ -276,5 +276,6 @@ def should_run_sync( ) -> bool: current_time = int(now if now is not None else time.time()) if state.last_outcome == "running": - return False + started_at = int(state.last_sync_started_at or 0) + return started_at <= 0 or current_time - started_at >= 300 return int(state.next_sync_due_at or 0) <= current_time diff --git a/backend/services/mesh/mesh_schema.py b/backend/services/mesh/mesh_schema.py index b28bce1..b52fb82 100644 --- a/backend/services/mesh/mesh_schema.py +++ b/backend/services/mesh/mesh_schema.py @@ -2,6 +2,9 @@ from __future__ import annotations +import base64 +import binascii +import math from dataclasses import dataclass from typing import Any, Callable @@ -33,6 +36,58 @@ def _require_fields(payload: dict[str, Any], fields: tuple[str, ...]) -> tuple[b return True, "ok" +def _decode_base64ish(value: Any) -> bytes | None: + raw = str(value or "").strip() + if not raw or any(ch.isspace() for ch in raw): + return None + padded = raw + ("=" * (-len(raw) % 4)) + for altchars in (None, b"-_"): + try: + return base64.b64decode(padded.encode("ascii"), altchars=altchars, validate=True) + except (binascii.Error, UnicodeEncodeError, ValueError): + continue + return None + + +def _byte_entropy(data: bytes) -> float: + if not data: + return 0.0 + counts = [0] * 256 + for byte in data: + counts[byte] += 1 + total = float(len(data)) + return -sum((count / total) * math.log2(count / total) for count in counts if count) + + +def _validate_sealed_bytes_field( + payload: dict[str, Any], + field: str, + *, + min_bytes: int = 8, + entropy_floor: float = 2.5, +) -> tuple[bool, str]: + data = _decode_base64ish(payload.get(field, "")) + if data is None: + return False, f"{field} must be base64-encoded sealed bytes" + if len(data) < min_bytes: + return False, f"{field} is too short" + + # Short test vectors and compact envelopes can be low entropy; only apply + # heuristics once there is enough material to distinguish a sealed blob + # from accidental base64-encoded plaintext. + if len(data) >= 32: + printable = sum(1 for byte in data if 32 <= byte <= 126 or byte in (9, 10, 13)) + if printable / len(data) > 0.9: + try: + data.decode("utf-8") + return False, f"{field} looks like encoded plaintext" + except UnicodeDecodeError: + pass + if _byte_entropy(data) < entropy_floor: + return False, f"{field} entropy is too low for sealed bytes" + return True, "ok" + + def _validate_message(payload: dict[str, Any]) -> tuple[bool, str]: ok, reason = _require_fields( payload, ("message", "destination", "channel", "priority", "ephemeral") @@ -331,6 +386,7 @@ ACTIVE_PUBLIC_LEDGER_EVENT_TYPES: frozenset[str] = frozenset( LEGACY_PUBLIC_LEDGER_EVENT_TYPES: frozenset[str] = frozenset( { "gate_message", + "dm_message", } ) """Event types that exist historically on the public chain and must remain @@ -425,6 +481,8 @@ def validate_event_payload(event_type: str, payload: dict[str, Any]) -> tuple[bo def validate_public_ledger_payload(event_type: str, payload: dict[str, Any]) -> tuple[bool, str]: + if event_type == "gate_message": + return validate_private_gate_ledger_payload(payload) if event_type not in PUBLIC_LEDGER_EVENT_TYPES and event_type not in _EXTENSION_VALIDATORS: return False, f"{event_type} is not allowed on the public ledger" forbidden = sorted( @@ -441,6 +499,92 @@ def validate_public_ledger_payload(event_type: str, payload: dict[str, Any]) -> return True, "ok" +_PRIVATE_GATE_LEDGER_ALLOWED_FIELDS: frozenset[str] = frozenset( + { + "gate", + "ciphertext", + "nonce", + "sender_ref", + "format", + "epoch", + "gate_envelope", + "envelope_hash", + "reply_to", + "transport_lock", + "signed_context", + } +) + + +def validate_private_gate_ledger_payload(payload: dict[str, Any]) -> tuple[bool, str]: + """Validate ciphertext-only gate events for private Infonet replication.""" + ok, reason = validate_event_payload("gate_message", payload) + if not ok: + return ok, reason + unexpected = sorted( + key + for key in payload.keys() + if str(key or "").strip().lower() not in _PRIVATE_GATE_LEDGER_ALLOWED_FIELDS + ) + if unexpected: + return False, f"private gate ledger payload contains unsupported fields: {', '.join(unexpected)}" + if "message" in payload or "_local_plaintext" in payload or "_local_reply_to" in payload: + return False, "private gate ledger payload must not contain plaintext" + transport_lock = str(payload.get("transport_lock", "") or "").strip().lower() + if transport_lock and transport_lock not in {"private", "private_strong", "rns", "onion"}: + return False, "gate messages require private transport_lock" + ok, reason = _validate_sealed_bytes_field(payload, "ciphertext") + if not ok: + return ok, reason + ok, reason = _validate_sealed_bytes_field(payload, "nonce") + if not ok: + return ok, reason + return True, "ok" + + +_PRIVATE_DM_LEDGER_ALLOWED_FIELDS: frozenset[str] = frozenset( + { + "recipient_id", + "delivery_class", + "recipient_token", + "ciphertext", + "msg_id", + "timestamp", + "format", + "session_welcome", + "sender_seal", + "relay_salt", + "transport_lock", + "signed_context", + } +) + + +def validate_private_dm_ledger_payload(payload: dict[str, Any]) -> tuple[bool, str]: + """Validate ciphertext-only DM dead-drop events for private Infonet replication.""" + ok, reason = validate_event_payload("dm_message", payload) + if not ok: + return ok, reason + unexpected = sorted( + key + for key in payload.keys() + if str(key or "").strip().lower() not in _PRIVATE_DM_LEDGER_ALLOWED_FIELDS + ) + if unexpected: + return False, f"private DM ledger payload contains unsupported fields: {', '.join(unexpected)}" + if "message" in payload or "plaintext" in payload or "_local_plaintext" in payload: + return False, "private DM ledger payload must not contain plaintext" + transport_lock = str(payload.get("transport_lock", "") or "").strip().lower() + if transport_lock != "private_strong": + return False, "DM hashchain spool requires private_strong transport_lock" + if not str(payload.get("ciphertext", "") or "").strip(): + return False, "ciphertext cannot be empty" + ok, reason = _validate_sealed_bytes_field(payload, "ciphertext") + if not ok: + return ok, reason + return True, "ok" + + def validate_protocol_fields(protocol_version: str, network_id: str) -> tuple[bool, str]: if protocol_version != PROTOCOL_VERSION: return False, "Unsupported protocol_version" diff --git a/backend/tests/mesh/test_gate_write_cutover.py b/backend/tests/mesh/test_gate_write_cutover.py index fc788b5..0ee3075 100644 --- a/backend/tests/mesh/test_gate_write_cutover.py +++ b/backend/tests/mesh/test_gate_write_cutover.py @@ -87,11 +87,28 @@ def _run_gate_release_once(monkeypatch, *, transport_tier="private_strong"): def _patch_for_successful_post(monkeypatch, module): """Apply standard monkeypatches so a gate_message post succeeds.""" import main + from services.mesh import mesh_hashchain _setup_gate_outbox(monkeypatch) monkeypatch.setattr(main, "_verify_gate_message_signed_write", lambda **kw: (True, "ok", kw.get("reply_to", ""))) monkeypatch.setattr(main, "_resolve_envelope_policy", lambda _gate_id: "envelope_disabled") + def _fake_private_gate_append(**kwargs): + return { + "event_id": f"ledger-ev-{kwargs.get('sequence', 0)}", + "event_type": "gate_message", + "node_id": kwargs["node_id"], + "payload": dict(kwargs["payload"]), + "timestamp": kwargs.get("timestamp", 0) or 123.0, + "sequence": kwargs["sequence"], + "signature": kwargs["signature"], + "public_key": kwargs["public_key"], + "public_key_algo": kwargs["public_key_algo"], + "protocol_version": kwargs.get("protocol_version", "infonet/2"), + } + + monkeypatch.setattr(mesh_hashchain.infonet, "append_private_gate_message", _fake_private_gate_append) + from services.mesh.mesh_reputation import gate_manager, reputation_ledger monkeypatch.setattr(gate_manager, "can_enter", lambda *a, **kw: (True, "ok")) @@ -255,19 +272,30 @@ def test_gate_post_preserves_gate_envelope_in_store(monkeypatch): def test_gate_post_advances_sequence(monkeypatch): - """validate_and_set_sequence must be called to advance the counter.""" + """append_private_gate_message must receive the gate sequence.""" import main from services.mesh import mesh_hashchain _patch_for_successful_post(monkeypatch, main) - seq_calls = [] + append_calls = [] - def track_seq(node_id, seq, *, domain=""): - seq_calls.append((node_id, seq, domain)) - return (True, "ok") + def track_private_append(**kwargs): + append_calls.append(kwargs) + return { + "event_id": "ev-seq", + "event_type": "gate_message", + "node_id": kwargs["node_id"], + "payload": dict(kwargs["payload"]), + "timestamp": kwargs.get("timestamp", 0) or 123.0, + "sequence": kwargs["sequence"], + "signature": kwargs["signature"], + "public_key": kwargs["public_key"], + "public_key_algo": kwargs["public_key_algo"], + "protocol_version": kwargs.get("protocol_version", "infonet/2"), + } - monkeypatch.setattr(mesh_hashchain.infonet, "validate_and_set_sequence", track_seq) + monkeypatch.setattr(mesh_hashchain.infonet, "append_private_gate_message", track_private_append) monkeypatch.setattr( mesh_hashchain.gate_store, "append", @@ -280,8 +308,9 @@ def test_gate_post_advances_sequence(monkeypatch): assert result["ok"] is True assert result["queued"] is True - assert len(seq_calls) == 1 - assert seq_calls[0] == ("!sb_test1234567890", 42, "gate_message") + assert len(append_calls) == 1 + assert append_calls[0]["node_id"] == "!sb_test1234567890" + assert append_calls[0]["sequence"] == 42 def test_gate_post_rejects_replay_via_sequence(monkeypatch): @@ -290,11 +319,11 @@ def test_gate_post_rejects_replay_via_sequence(monkeypatch): from services.mesh import mesh_hashchain _patch_for_successful_post(monkeypatch, main) - monkeypatch.setattr( - mesh_hashchain.infonet, - "validate_and_set_sequence", - lambda node_id, seq: (False, "Replay detected: sequence 1 <= last 1"), - ) + + def reject_private_append(**_kwargs): + raise ValueError("Replay detected: sequence 1 <= last 1") + + monkeypatch.setattr(mesh_hashchain.infonet, "append_private_gate_message", reject_private_append) gate_id = "infonet" body = _build_gate_message_body(gate_id, sequence=1) diff --git a/backend/tests/mesh/test_mesh_infonet_sync_support.py b/backend/tests/mesh/test_mesh_infonet_sync_support.py index bbaddee..96e8666 100644 --- a/backend/tests/mesh/test_mesh_infonet_sync_support.py +++ b/backend/tests/mesh/test_mesh_infonet_sync_support.py @@ -117,3 +117,11 @@ def test_finish_solo_sync_marks_first_node_ready_without_peer_failure(): assert finished.next_sync_due_at == 500 assert should_run_sync(finished, now=499) is False assert should_run_sync(finished, now=500) is True + + +def test_should_run_sync_recovers_stale_running_state(): + fresh = SyncWorkerState(last_sync_started_at=100, last_outcome="running") + stale = SyncWorkerState(last_sync_started_at=100, last_outcome="running") + + assert should_run_sync(fresh, now=399) is False + assert should_run_sync(stale, now=400) is True diff --git a/backend/tests/mesh/test_mesh_node_bootstrap_runtime.py b/backend/tests/mesh/test_mesh_node_bootstrap_runtime.py index c324341..da466b2 100644 --- a/backend/tests/mesh/test_mesh_node_bootstrap_runtime.py +++ b/backend/tests/mesh/test_mesh_node_bootstrap_runtime.py @@ -8,6 +8,53 @@ from cryptography.hazmat.primitives.asymmetric import ed25519 from httpx import ASGITransport, AsyncClient +def test_onion_peer_requests_use_arti_socks_proxy(monkeypatch): + import main + from services import wormhole_supervisor + + monkeypatch.setattr(main, "_infonet_private_transport_required", lambda: True) + monkeypatch.setattr( + main, + "get_settings", + lambda: SimpleNamespace(MESH_ARTI_ENABLED=True, MESH_ARTI_SOCKS_PORT=19050), + ) + monkeypatch.setattr(wormhole_supervisor, "_check_arti_ready", lambda: True) + + proxies = main._infonet_peer_requests_proxies("http://exampleabcd.onion:8000") + + assert proxies == { + "http": "socks5h://127.0.0.1:19050", + "https": "socks5h://127.0.0.1:19050", + } + + +def test_private_peer_requests_reject_clearnet(monkeypatch): + import main + + monkeypatch.setattr(main, "_infonet_private_transport_required", lambda: True) + + try: + main._infonet_peer_requests_proxies("https://seed.example") + except RuntimeError as exc: + assert "private Infonet requires onion/RNS transport" in str(exc) + else: + raise AssertionError("clearnet peer was allowed while private transport is required") + + +def test_local_peer_url_prefers_configured_public_peer_url(monkeypatch): + import main + + monkeypatch.setattr( + main, + "get_settings", + lambda: SimpleNamespace( + MESH_PUBLIC_PEER_URL="HTTP://LOCALPEEREXAMPLE.onion:8000/", + ), + ) + + assert main._local_infonet_peer_url() == "http://localpeerexample.onion:8000" + + def _write_signed_manifest(path, *, private_key): from services.mesh.mesh_bootstrap_manifest import BOOTSTRAP_MANIFEST_VERSION from services.mesh.mesh_crypto import canonical_json @@ -142,6 +189,134 @@ def test_refresh_node_peer_store_suppresses_clearnet_seed_by_default(tmp_path, m assert store.records_for_bucket("sync") == [] +def test_refresh_node_peer_store_prunes_persisted_clearnet_records_in_private_mode(tmp_path, monkeypatch): + import main + from services.config import get_settings + from services.mesh import mesh_peer_store as peer_store_mod + + peer_store_path = tmp_path / "peer_store.json" + monkeypatch.setattr(peer_store_mod, "DEFAULT_PEER_STORE_PATH", peer_store_path) + store = peer_store_mod.PeerStore(peer_store_path) + store.upsert( + peer_store_mod.make_bootstrap_peer_record( + peer_url="https://node.shadowbroker.info", + transport="clearnet", + role="seed", + signer_id="shadowbroker-default", + now=1_749_999_900, + ) + ) + store.upsert( + peer_store_mod.make_sync_peer_record( + peer_url="https://node.shadowbroker.info", + transport="clearnet", + role="seed", + source="bundle", + now=1_749_999_900, + ) + ) + store.upsert( + peer_store_mod.make_push_peer_record( + peer_url="https://node.shadowbroker.info", + transport="clearnet", + role="relay", + now=1_749_999_900, + ) + ) + store.save() + + onion_seed = "http://gqpbunqbgtkcqilvclm3xrkt3zowjyl3s62kkktvojgvxzizamvbrqid.onion:8000" + monkeypatch.setenv("MESH_RELAY_PEERS", "") + monkeypatch.setenv("MESH_BOOTSTRAP_SEED_PEERS", onion_seed) + monkeypatch.setenv("MESH_DEFAULT_SYNC_PEERS", "") + monkeypatch.delenv("MESH_INFONET_ALLOW_CLEARNET_SYNC", raising=False) + monkeypatch.setenv("MESH_BOOTSTRAP_SIGNER_PUBLIC_KEY", "") + get_settings.cache_clear() + + try: + snapshot = main._refresh_node_peer_store(now=1_750_000_000) + store = peer_store_mod.PeerStore(peer_store_path) + store.load() + finally: + get_settings.cache_clear() + + assert snapshot["private_transport_required"] is True + assert snapshot["pruned_clearnet_peer_count"] == 3 + assert [record.peer_url for record in store.records()] == [onion_seed, onion_seed] + assert {record.bucket for record in store.records()} == {"bootstrap", "sync"} + assert all(record.transport == "onion" for record in store.records()) + + +def test_infonet_peer_url_filter_excludes_clearnet_in_private_mode(monkeypatch): + import main + from services.config import get_settings + + monkeypatch.delenv("MESH_INFONET_ALLOW_CLEARNET_SYNC", raising=False) + get_settings.cache_clear() + + try: + assert main._filter_infonet_peer_urls( + [ + "https://node.shadowbroker.info", + "http://gqpbunqbgtkcqilvclm3xrkt3zowjyl3s62kkktvojgvxzizamvbrqid.onion:8000", + ] + ) == ["http://gqpbunqbgtkcqilvclm3xrkt3zowjyl3s62kkktvojgvxzizamvbrqid.onion:8000"] + finally: + get_settings.cache_clear() + + +def test_public_sync_cycle_backs_off_on_429_retry_after(tmp_path, monkeypatch): + import time + + import main + from services.config import get_settings + from services.mesh import mesh_peer_store as peer_store_mod + + peer_store_path = tmp_path / "peer_store.json" + monkeypatch.setattr(peer_store_mod, "DEFAULT_PEER_STORE_PATH", peer_store_path) + onion_seed = "http://gqpbunqbgtkcqilvclm3xrkt3zowjyl3s62kkktvojgvxzizamvbrqid.onion:8000" + store = peer_store_mod.PeerStore(peer_store_path) + store.upsert( + peer_store_mod.make_sync_peer_record( + peer_url=onion_seed, + transport="onion", + role="seed", + source="bundle", + now=1_750_000_000, + ) + ) + store.save() + + monkeypatch.delenv("MESH_INFONET_ALLOW_CLEARNET_SYNC", raising=False) + monkeypatch.setenv("MESH_SYNC_FAILURE_BACKOFF_S", "60") + monkeypatch.setenv("MESH_BOOTSTRAP_SEED_FAILURE_COOLDOWN_S", "15") + get_settings.cache_clear() + monkeypatch.setattr(main, "_participant_node_enabled", lambda: True) + monkeypatch.setattr(main, "_ensure_infonet_private_transport_ready", lambda reason="": True) + monkeypatch.setattr( + main, + "_sync_from_peer", + lambda peer_url: (_ for _ in ()).throw( + main.PeerSyncHTTPError(429, "rate limited", retry_after_s=180) + ), + ) + main.set_sync_state(main.SyncWorkerState()) + + try: + before = int(time.time()) + state = main._run_public_sync_cycle() + store = peer_store_mod.PeerStore(peer_store_path) + store.load() + finally: + get_settings.cache_clear() + main.set_sync_state(main.SyncWorkerState()) + + record = store.records_for_bucket("sync")[0] + assert state.last_error == "HTTP 429: rate limited" + assert state.next_sync_due_at >= before + 180 + assert record.cooldown_until >= before + 180 + + def test_verify_peer_push_hmac_requires_allowlisted_peer(monkeypatch): import hashlib import hmac @@ -225,3 +400,29 @@ def test_public_sync_cycle_allows_first_node_without_peers(tmp_path, monkeypatch assert result.last_error == "" assert result.last_peer_url == "" assert result.consecutive_failures == 0 + + +def test_headless_mesh_node_runtime_is_explicit(monkeypatch): + import main + + monkeypatch.setattr(main, "_MESH_ONLY", True) + monkeypatch.setattr(main, "_HEADLESS_MESH_NODE_RUNTIME", False) + assert main._infonet_node_runtime_requested() is False + + monkeypatch.setattr(main, "_HEADLESS_MESH_NODE_RUNTIME", True) + assert main._infonet_node_runtime_requested() is True + + +def test_meshnode_scripts_enable_private_hashchain_runtime(): + from pathlib import Path + + root = Path(__file__).resolve().parents[3] + bat = (root / "meshnode.bat").read_text(encoding="utf-8") + sh = (root / "meshnode.sh").read_text(encoding="utf-8") + + for script in (bat, sh): + assert "SHADOWBROKER_MESH_NODE_RUNTIME=true" in script + assert "MESH_INFONET_ALLOW_CLEARNET_SYNC=false" in script + assert "MESH_ARTI_ENABLED=true" in script + assert "MESH_DM_HASHCHAIN_SPOOL_LIMIT=2" in script + assert "gqpbunqbgtkcqilvclm3xrkt3zowjyl3s62kkktvojgvxzizamvbrqid.onion:8000" in script diff --git a/backend/tests/mesh/test_private_dm_hashchain.py b/backend/tests/mesh/test_private_dm_hashchain.py new file mode 100644 index 0000000..3c8cf41 --- /dev/null +++ b/backend/tests/mesh/test_private_dm_hashchain.py @@ -0,0 +1,213 @@ +import base64 +import time + +from cryptography.hazmat.primitives import serialization +from cryptography.hazmat.primitives.asymmetric import ed25519 + +from services.config import get_settings +from services.mesh import mesh_crypto, mesh_dm_relay, mesh_hashchain, mesh_protocol, mesh_secure_storage + + +def _keypair(): + private_key = ed25519.Ed25519PrivateKey.generate() + public_raw = private_key.public_key().public_bytes( + encoding=serialization.Encoding.Raw, + format=serialization.PublicFormat.Raw, + ) + public_key = base64.b64encode(public_raw).decode("utf-8") + node_id = mesh_crypto.derive_node_id(public_key) + return private_key, public_key, node_id + + +def _payload(recipient_id: str = "recipient-a", msg_id: str = "dm-1") -> dict: + return mesh_protocol.normalize_payload( + "dm_message", + { + "recipient_id": recipient_id, + "delivery_class": "request", + "recipient_token": "", + "ciphertext": base64.b64encode(f"cipher-{msg_id}".encode("utf-8")).decode("ascii"), + "msg_id": msg_id, + "timestamp": int(time.time()), + "format": "mls1", + "transport_lock": "private_strong", + }, + ) + + +def _signature(private_key, node_id: str, sequence: int, payload: dict) -> str: + signature_payload = mesh_crypto.build_signature_payload( + event_type="dm_message", + node_id=node_id, + sequence=sequence, + payload=payload, + ) + return private_key.sign(signature_payload.encode("utf-8")).hex() + + +def _fresh_infonet(tmp_path, monkeypatch) -> mesh_hashchain.Infonet: + monkeypatch.setattr(mesh_hashchain, "DATA_DIR", tmp_path) + monkeypatch.setattr(mesh_hashchain, "CHAIN_FILE", tmp_path / "infonet.json") + monkeypatch.setattr(mesh_hashchain, "WAL_FILE", tmp_path / "infonet.wal") + return mesh_hashchain.Infonet() + + +def _fresh_relay(tmp_path, monkeypatch) -> mesh_dm_relay.DMRelay: + monkeypatch.setattr(mesh_dm_relay, "DATA_DIR", tmp_path) + monkeypatch.setattr(mesh_dm_relay, "RELAY_FILE", tmp_path / "dm_relay.json") + monkeypatch.setattr(mesh_secure_storage, "DATA_DIR", tmp_path) + monkeypatch.setattr(mesh_secure_storage, "MASTER_KEY_FILE", tmp_path / "wormhole_secure_store.key") + get_settings.cache_clear() + return mesh_dm_relay.DMRelay() + + +def test_private_dm_hashchain_spools_two_ciphertexts_per_recipient_from_distinct_senders(tmp_path, monkeypatch): + inf = _fresh_infonet(tmp_path, monkeypatch) + senders = [_keypair(), _keypair()] + + for idx, (private_key, public_key, node_id) in enumerate(senders, start=1): + payload = _payload(msg_id=f"dm-{idx}") + event = inf.append_private_dm_message( + node_id=node_id, + payload=payload, + signature=_signature(private_key, node_id, 1, payload), + sequence=1, + public_key=public_key, + public_key_algo="Ed25519", + protocol_version=mesh_protocol.PROTOCOL_VERSION, + timestamp=float(payload["timestamp"]), + ) + assert event["event_type"] == "dm_message" + + private_key, public_key, node_id = _keypair() + third = _payload(msg_id="dm-3") + try: + inf.append_private_dm_message( + node_id=node_id, + payload=third, + signature=_signature(private_key, node_id, 1, third), + sequence=1, + public_key=public_key, + public_key_algo="Ed25519", + protocol_version=mesh_protocol.PROTOCOL_VERSION, + timestamp=float(third["timestamp"]), + ) + except ValueError as exc: + assert "spool full" in str(exc) + else: + raise AssertionError("third DM spool event was accepted") + + for _private_key, _public_key, sender_node_id in senders: + assert inf.sequence_domains[f"{sender_node_id}|dm_message"] == 1 + assert inf.validate_chain(verify_signatures=True)[0] is True + + +def test_private_dm_hashchain_limits_one_active_spool_per_sender_recipient_pair(tmp_path, monkeypatch): + inf = _fresh_infonet(tmp_path, monkeypatch) + private_key, public_key, node_id = _keypair() + + first = _payload(msg_id="dm-1") + inf.append_private_dm_message( + node_id=node_id, + payload=first, + signature=_signature(private_key, node_id, 1, first), + sequence=1, + public_key=public_key, + public_key_algo="Ed25519", + protocol_version=mesh_protocol.PROTOCOL_VERSION, + timestamp=float(first["timestamp"]), + ) + + second = _payload(msg_id="dm-2") + try: + inf.append_private_dm_message( + node_id=node_id, + payload=second, + signature=_signature(private_key, node_id, 2, second), + sequence=2, + public_key=public_key, + public_key_algo="Ed25519", + protocol_version=mesh_protocol.PROTOCOL_VERSION, + timestamp=float(second["timestamp"]), + ) + except ValueError as exc: + assert "sender spool full" in str(exc) + else: + raise AssertionError("second DM from same sender to same recipient was accepted") + + +def test_private_dm_hashchain_rejects_plaintext(tmp_path, monkeypatch): + inf = _fresh_infonet(tmp_path, monkeypatch) + private_key, public_key, node_id = _keypair() + payload = _payload() + payload["message"] = "plaintext" + + try: + inf.append_private_dm_message( + node_id=node_id, + payload=payload, + signature=_signature(private_key, node_id, 1, _payload()), + sequence=1, + public_key=public_key, + public_key_algo="Ed25519", + protocol_version=mesh_protocol.PROTOCOL_VERSION, + ) + except ValueError as exc: + assert "plaintext" in str(exc) + else: + raise AssertionError("private DM append accepted plaintext") + + +def test_private_dm_hashchain_rejects_non_sealed_ciphertext_shape(tmp_path, monkeypatch): + inf = _fresh_infonet(tmp_path, monkeypatch) + private_key, public_key, node_id = _keypair() + payload = _payload() + payload["ciphertext"] = "not sealed plaintext" + + try: + inf.append_private_dm_message( + node_id=node_id, + payload=payload, + signature=_signature(private_key, node_id, 1, payload), + sequence=1, + public_key=public_key, + public_key_algo="Ed25519", + protocol_version=mesh_protocol.PROTOCOL_VERSION, + ) + except ValueError as exc: + assert "sealed bytes" in str(exc) + else: + raise AssertionError("private DM append accepted non-base64 ciphertext") + + +def test_hydrate_dm_relay_from_chain_delivers_to_poll_claim(tmp_path, monkeypatch): + inf = _fresh_infonet(tmp_path / "chain", monkeypatch) + relay = _fresh_relay(tmp_path / "relay", monkeypatch) + monkeypatch.setattr(mesh_hashchain, "infonet", inf) + monkeypatch.setattr(mesh_dm_relay, "dm_relay", relay) + + private_key, public_key, node_id = _keypair() + payload = _payload(recipient_id="recipient-a", msg_id="dm-chain-1") + event = inf.append_private_dm_message( + node_id=node_id, + payload=payload, + signature=_signature(private_key, node_id, 1, payload), + sequence=1, + public_key=public_key, + public_key_algo="Ed25519", + protocol_version=mesh_protocol.PROTOCOL_VERSION, + timestamp=float(payload["timestamp"]), + ) + + from main import _hydrate_dm_relay_from_chain + + assert _hydrate_dm_relay_from_chain([event]) == 1 + messages, more = relay.collect_claims( + "recipient-a", + [{"type": "requests", "token": "recipient-request-token"}], + limit=8, + ) + + assert more is False + assert [message["msg_id"] for message in messages] == ["dm-chain-1"] + assert messages[0]["ciphertext"] == payload["ciphertext"] diff --git a/backend/tests/mesh/test_private_gate_hashchain.py b/backend/tests/mesh/test_private_gate_hashchain.py new file mode 100644 index 0000000..7b2d880 --- /dev/null +++ b/backend/tests/mesh/test_private_gate_hashchain.py @@ -0,0 +1,269 @@ +import base64 + +from cryptography.hazmat.primitives import serialization +from cryptography.hazmat.primitives.asymmetric import ed25519 + +from services.mesh import mesh_crypto, mesh_hashchain, mesh_protocol + + +def _keypair(): + private_key = ed25519.Ed25519PrivateKey.generate() + public_raw = private_key.public_key().public_bytes( + encoding=serialization.Encoding.Raw, + format=serialization.PublicFormat.Raw, + ) + public_key = base64.b64encode(public_raw).decode("utf-8") + node_id = mesh_crypto.derive_node_id(public_key) + return private_key, public_key, node_id + + +def _sign(private_key, *, event_type: str, node_id: str, sequence: int, payload: dict) -> str: + signature_payload = mesh_crypto.build_signature_payload( + event_type=event_type, + node_id=node_id, + sequence=sequence, + payload=payload, + ) + return private_key.sign(signature_payload.encode("utf-8")).hex() + + +def _message_payload(text: str) -> dict: + return mesh_protocol.normalize_payload( + "message", + { + "message": text, + "destination": "broadcast", + "channel": "LongFast", + "priority": "normal", + "ephemeral": False, + }, + ) + + +def _gate_payload(gate_id: str = "ops-gate", *, epoch: int = 2, plaintext: bool = False) -> dict: + payload = { + "gate": gate_id, + "ciphertext": base64.b64encode(b"encrypted-gate-ciphertext").decode("ascii"), + "nonce": base64.b64encode(b"nonce-value-1234").decode("ascii"), + "sender_ref": "sender-ref-1", + "format": "mls1", + "transport_lock": "private_strong", + } + if epoch > 0: + payload["epoch"] = epoch + if plaintext: + payload["message"] = "this must never land on the chain" + return mesh_protocol.normalize_payload("gate_message", payload) if not plaintext else payload + + +def _gate_event( + private_key, + public_key: str, + node_id: str, + *, + sequence: int, + prev_hash: str, + payload: dict, + signature_payload: dict | None = None, +) -> dict: + signature = _sign( + private_key, + event_type="gate_message", + node_id=node_id, + sequence=sequence, + payload=signature_payload or payload, + ) + return mesh_hashchain.ChainEvent( + prev_hash=prev_hash, + event_type="gate_message", + node_id=node_id, + payload=payload, + timestamp=1234.0 + sequence, + sequence=sequence, + signature=signature, + public_key=public_key, + public_key_algo="Ed25519", + protocol_version=mesh_protocol.PROTOCOL_VERSION, + network_id=mesh_protocol.NETWORK_ID, + ).to_dict() + + +def _fresh_infonet(tmp_path, monkeypatch) -> mesh_hashchain.Infonet: + monkeypatch.setattr(mesh_hashchain, "DATA_DIR", tmp_path) + monkeypatch.setattr(mesh_hashchain, "CHAIN_FILE", tmp_path / "infonet.json") + monkeypatch.setattr(mesh_hashchain, "WAL_FILE", tmp_path / "infonet.wal") + return mesh_hashchain.Infonet() + + +def test_private_gate_fork_uses_gate_sequence_domain_and_signature_variants(tmp_path, monkeypatch): + inf = _fresh_infonet(tmp_path, monkeypatch) + private_key, public_key, node_id = _keypair() + + public_payload = _message_payload("public prefix") + public_event = inf.append( + event_type="message", + node_id=node_id, + payload=public_payload, + sequence=1, + signature=_sign( + private_key, + event_type="message", + node_id=node_id, + sequence=1, + payload=public_payload, + ), + public_key=public_key, + public_key_algo="Ed25519", + protocol_version=mesh_protocol.PROTOCOL_VERSION, + ) + + gate_payload = _gate_payload(epoch=3) + signature_payload = dict(gate_payload) + signature_payload.pop("epoch", None) + gate_event = _gate_event( + private_key, + public_key, + node_id, + sequence=1, + prev_hash=public_event["event_id"], + payload=gate_payload, + signature_payload=signature_payload, + ) + + ok, reason = inf.apply_fork([gate_event], gate_event["event_id"], proof_count=2, quorum=2) + + assert ok is True, reason + assert inf.events[-1]["event_type"] == "gate_message" + assert inf.node_sequences[node_id] == 1 + assert inf.sequence_domains[f"{node_id}|gate_message"] == 1 + assert inf.validate_chain(verify_signatures=True)[0] is True + + +def test_private_gate_fork_rejects_plaintext_payload(tmp_path, monkeypatch): + inf = _fresh_infonet(tmp_path, monkeypatch) + private_key, public_key, node_id = _keypair() + + public_payload = _message_payload("public prefix") + public_event = inf.append( + event_type="message", + node_id=node_id, + payload=public_payload, + sequence=1, + signature=_sign( + private_key, + event_type="message", + node_id=node_id, + sequence=1, + payload=public_payload, + ), + public_key=public_key, + public_key_algo="Ed25519", + protocol_version=mesh_protocol.PROTOCOL_VERSION, + ) + + plaintext_payload = _gate_payload(plaintext=True) + gate_event = _gate_event( + private_key, + public_key, + node_id, + sequence=1, + prev_hash=public_event["event_id"], + payload=plaintext_payload, + ) + + ok, reason = inf.apply_fork([gate_event], gate_event["event_id"], proof_count=2, quorum=2) + + assert ok is False + assert "normalized" in reason or "plaintext" in reason + assert len(inf.events) == 1 + assert "gate_message" not in inf.get_info()["event_types"] + + +def test_append_private_gate_message_rejects_plaintext_before_normalizing(tmp_path, monkeypatch): + inf = _fresh_infonet(tmp_path, monkeypatch) + private_key, public_key, node_id = _keypair() + payload = _gate_payload() + payload["message"] = "plaintext should not be silently dropped" + + try: + inf.append_private_gate_message( + node_id=node_id, + payload=payload, + sequence=1, + signature=_sign( + private_key, + event_type="gate_message", + node_id=node_id, + sequence=1, + payload=_gate_payload(), + ), + public_key=public_key, + public_key_algo="Ed25519", + protocol_version=mesh_protocol.PROTOCOL_VERSION, + ) + except ValueError as exc: + assert "plaintext" in str(exc) + else: + raise AssertionError("private gate append accepted plaintext") + + assert inf.events == [] + + +def test_append_private_gate_message_requires_private_strong_transport_lock(tmp_path, monkeypatch): + inf = _fresh_infonet(tmp_path, monkeypatch) + private_key, public_key, node_id = _keypair() + payload = _gate_payload() + payload.pop("transport_lock", None) + + try: + inf.append_private_gate_message( + node_id=node_id, + payload=payload, + sequence=1, + signature=_sign( + private_key, + event_type="gate_message", + node_id=node_id, + sequence=1, + payload=_gate_payload(), + ), + public_key=public_key, + public_key_algo="Ed25519", + protocol_version=mesh_protocol.PROTOCOL_VERSION, + ) + except ValueError as exc: + assert "private_strong" in str(exc) + else: + raise AssertionError("private gate append accepted missing transport_lock") + + assert inf.events == [] + + +def test_append_private_gate_message_rejects_non_sealed_ciphertext_shape(tmp_path, monkeypatch): + inf = _fresh_infonet(tmp_path, monkeypatch) + private_key, public_key, node_id = _keypair() + payload = _gate_payload() + payload["ciphertext"] = "not sealed plaintext" + + try: + inf.append_private_gate_message( + node_id=node_id, + payload=payload, + sequence=1, + signature=_sign( + private_key, + event_type="gate_message", + node_id=node_id, + sequence=1, + payload=payload, + ), + public_key=public_key, + public_key_algo="Ed25519", + protocol_version=mesh_protocol.PROTOCOL_VERSION, + ) + except ValueError as exc: + assert "sealed bytes" in str(exc) + else: + raise AssertionError("private gate append accepted non-base64 ciphertext") + + assert inf.events == [] diff --git a/backend/tests/mesh/test_s14b_public_sync_gate_filter.py b/backend/tests/mesh/test_s14b_public_sync_gate_filter.py index 0db14bb..eec6633 100644 --- a/backend/tests/mesh/test_s14b_public_sync_gate_filter.py +++ b/backend/tests/mesh/test_s14b_public_sync_gate_filter.py @@ -1,14 +1,12 @@ -"""S14B Public Sync Gate Event Filter. +"""S14B private sync gate event policy. -Tests: -- GET /api/mesh/infonet/sync excludes gate_message when local infonet contains legacy gate_message plus public events -- POST /api/mesh/infonet/sync excludes gate_message under the same condition -- Both main app and router-served paths are covered -- Non-gate public redactions still hold (vote gate label stripped, key_rotate identity stripped) -- Do not overclaim that gate_message is removed from historical infonet storage or ingest +Private Infonet sync carries encrypted gate_message ledger events. If a node +is configured to allow clearnet-compatible sync, those gate events are filtered +out of the sync response. """ import asyncio +import base64 import json from starlette.requests import Request @@ -17,9 +15,6 @@ import main from services.mesh import mesh_hashchain -# ── Helpers ────────────────────────────────────────────────────────────── - - def _message_event() -> dict: return { "event_id": "msg-1", @@ -83,6 +78,7 @@ def _gate_message_event() -> dict: "nonce": "nonce-1", "sender_ref": "sender-ref-1", "format": "mls1", + "transport_lock": "private_strong", }, "timestamp": 103.0, "sequence": 4, @@ -93,9 +89,31 @@ def _gate_message_event() -> dict: } -class _FakeInfonet: - """Minimal fake infonet with a gate_message among public events.""" +def _dm_message_event() -> dict: + return { + "event_id": "dm-1", + "event_type": "dm_message", + "node_id": "!node-5", + "payload": { + "recipient_id": "recipient-a", + "delivery_class": "request", + "recipient_token": "", + "ciphertext": base64.b64encode(b"sealed-dm-ciphertext").decode("ascii"), + "msg_id": "dm-1", + "timestamp": 104, + "format": "mls1", + "transport_lock": "private_strong", + }, + "timestamp": 104.0, + "sequence": 5, + "signature": "sig", + "public_key": "pub", + "public_key_algo": "Ed25519", + "protocol_version": "infonet/2", + } + +class _FakeInfonet: def __init__(self): self.head_hash = "head-1" self.events = [ @@ -113,12 +131,10 @@ class _FakeInfonet: return int(getattr(limit, "default", 100) or 100) def get_events_after(self, after_hash: str, limit=100): - resolved = self._limit_value(limit) - return [dict(e) for e in self.events[:resolved]] + return [dict(e) for e in self.events[: self._limit_value(limit)]] def get_events_after_locator(self, locator: list[str], limit=100): - resolved = self._limit_value(limit) - return self.head_hash, 0, [dict(e) for e in self.events[:resolved]] + return self.head_hash, 0, [dict(e) for e in self.events[: self._limit_value(limit)]] def get_merkle_proofs(self, start_index: int, count: int): return {"root": "merkle-root", "total": len(self.events), "start": start_index, "proofs": []} @@ -127,7 +143,7 @@ class _FakeInfonet: return "merkle-root" -def _json_request(path: str, body: dict) -> Request: +def _json_request(path: str, body: dict, *, client_host: str = "127.0.0.1", headers: dict[str, str] | None = None) -> Request: payload = json.dumps(body).encode("utf-8") sent = {"value": False} @@ -137,11 +153,14 @@ def _json_request(path: str, body: dict) -> Request: sent["value"] = True return {"type": "http.request", "body": payload, "more_body": False} + raw_headers = [(b"content-type", b"application/json")] + for key, value in dict(headers or {}).items(): + raw_headers.append((key.lower().encode("ascii"), str(value).encode("ascii"))) return Request( { "type": "http", - "headers": [(b"content-type", b"application/json")], - "client": ("test", 12345), + "headers": raw_headers, + "client": (client_host, 12345), "method": "POST", "path": path, }, @@ -149,20 +168,15 @@ def _json_request(path: str, body: dict) -> Request: ) -def _get_request(path: str) -> Request: - sent = {"value": False} - +def _get_request(path: str, *, client_host: str = "127.0.0.1", headers: dict[str, str] | None = None) -> Request: async def receive(): - if sent["value"]: - return {"type": "http.request", "body": b"", "more_body": False} - sent["value"] = True return {"type": "http.request", "body": b"", "more_body": False} return Request( { "type": "http", - "headers": [], - "client": ("test", 12345), + "headers": [(key.lower().encode("ascii"), str(value).encode("ascii")) for key, value in dict(headers or {}).items()], + "client": (client_host, 12345), "method": "GET", "path": path, }, @@ -170,120 +184,166 @@ def _get_request(path: str) -> Request: ) -# ── GET sync excludes gate_message (main app) ────────────────────────── +def _force_private_sync(monkeypatch): + monkeypatch.setattr(main, "_infonet_private_transport_required", lambda: True) + monkeypatch.setattr(main, "_request_appears_private_infonet_transport", lambda request: True) -def test_get_sync_excludes_gate_message(client, monkeypatch): - """GET /api/mesh/infonet/sync must not return gate_message events.""" +def _force_private_policy_only(monkeypatch): + monkeypatch.setattr(main, "_infonet_private_transport_required", lambda: True) + + +def _force_clearnet_sync(monkeypatch): + monkeypatch.setattr(main, "_infonet_private_transport_required", lambda: False) + + +def _event_types(events: list[dict]) -> list[str]: + return [str(e.get("event_type", "")) for e in events] + + +def test_private_sync_redacts_private_events_from_exposed_clearnet_request(monkeypatch): + _force_private_policy_only(monkeypatch) + request = _get_request("/api/mesh/infonet/sync", client_host="203.0.113.10") + + events = main._infonet_sync_response_events( + [_message_event(), _gate_message_event(), _dm_message_event()], + request=request, + ) + + assert _event_types(events) == ["message"] + + +def test_private_sync_includes_private_events_for_loopback_request(monkeypatch): + _force_private_policy_only(monkeypatch) + request = _get_request("/api/mesh/infonet/sync", client_host="127.0.0.1") + + events = main._infonet_sync_response_events( + [_message_event(), _gate_message_event(), _dm_message_event()], + request=request, + ) + + assert _event_types(events) == ["message", "gate_message", "dm_message"] + + +def test_private_sync_redacts_private_events_when_forwarded_for_is_clearnet(monkeypatch): + _force_private_policy_only(monkeypatch) + request = _get_request( + "/api/mesh/infonet/sync", + client_host="127.0.0.1", + headers={"x-forwarded-for": "198.51.100.44"}, + ) + + events = main._infonet_sync_response_events( + [_message_event(), _gate_message_event(), _dm_message_event()], + request=request, + ) + + assert _event_types(events) == ["message"] + + +def test_get_sync_includes_gate_message_on_private_transport(client, monkeypatch): + _force_private_sync(monkeypatch) monkeypatch.setattr(mesh_hashchain, "infonet", _FakeInfonet(), raising=False) - resp = client.get("/api/mesh/infonet/sync") - data = resp.json() - event_types = [e["event_type"] for e in data["events"]] - assert "gate_message" not in event_types - assert "message" in event_types - assert "vote" in event_types - assert "key_rotate" in event_types + + data = client.get("/api/mesh/infonet/sync").json() + + assert "gate_message" in _event_types(data["events"]) + assert data["count"] == 4 -def test_get_sync_count_excludes_gate_message(client, monkeypatch): - """GET sync count field must reflect filtered events (gate_message excluded).""" +def test_post_sync_includes_gate_message_on_private_transport(monkeypatch): + _force_private_sync(monkeypatch) monkeypatch.setattr(mesh_hashchain, "infonet", _FakeInfonet(), raising=False) - resp = client.get("/api/mesh/infonet/sync") - data = resp.json() - assert data["count"] == 3 # message, vote, key_rotate — not gate_message - -# ── POST sync excludes gate_message (main app) ───────────────────────── - - -def test_post_sync_excludes_gate_message(monkeypatch): - """POST /api/mesh/infonet/sync must not return gate_message events.""" - monkeypatch.setattr(mesh_hashchain, "infonet", _FakeInfonet(), raising=False) result = asyncio.run( main.infonet_sync_post( _json_request("/api/mesh/infonet/sync", {"locator": ["head-1"]}) ) ) - event_types = [e["event_type"] for e in result["events"]] - assert "gate_message" not in event_types - assert "message" in event_types - assert "vote" in event_types - assert "key_rotate" in event_types + + assert "gate_message" in _event_types(result["events"]) + assert result["count"] == 4 -def test_post_sync_count_excludes_gate_message(monkeypatch): - """POST sync count field must reflect filtered events.""" - monkeypatch.setattr(mesh_hashchain, "infonet", _FakeInfonet(), raising=False) - result = asyncio.run( - main.infonet_sync_post( - _json_request("/api/mesh/infonet/sync", {"locator": ["head-1"]}) - ) - ) - assert result["count"] == 3 - - -# ── Router-served paths ──────────────────────────────────────────────── - - -def test_router_get_sync_excludes_gate_message(monkeypatch): - """Router GET /api/mesh/infonet/sync must not return gate_message.""" +def test_router_get_sync_includes_gate_message_on_private_transport(monkeypatch): from routers.mesh_public import infonet_sync + _force_private_sync(monkeypatch) monkeypatch.setattr(mesh_hashchain, "infonet", _FakeInfonet(), raising=False) + result = asyncio.run(infonet_sync(_get_request("/api/mesh/infonet/sync"))) - event_types = [e["event_type"] for e in result["events"]] - assert "gate_message" not in event_types - assert "message" in event_types - assert data_count_matches(result) + + assert "gate_message" in _event_types(result["events"]) + assert result["count"] == len(result["events"]) -def test_router_post_sync_excludes_gate_message(monkeypatch): - """Router POST /api/mesh/infonet/sync must not return gate_message.""" +def test_router_post_sync_includes_gate_message_on_private_transport(monkeypatch): from routers.mesh_public import infonet_sync_post + _force_private_sync(monkeypatch) monkeypatch.setattr(mesh_hashchain, "infonet", _FakeInfonet(), raising=False) + result = asyncio.run( infonet_sync_post( _json_request("/api/mesh/infonet/sync", {"locator": ["head-1"]}) ) ) - event_types = [e["event_type"] for e in result["events"]] - assert "gate_message" not in event_types - assert "message" in event_types - assert data_count_matches(result) + + assert "gate_message" in _event_types(result["events"]) + assert result["count"] == len(result["events"]) -def data_count_matches(result: dict) -> bool: - return result["count"] == len(result["events"]) +def test_get_sync_excludes_gate_message_when_clearnet_sync_allowed(client, monkeypatch): + _force_clearnet_sync(monkeypatch) + monkeypatch.setattr(mesh_hashchain, "infonet", _FakeInfonet(), raising=False) + + data = client.get("/api/mesh/infonet/sync").json() + + assert "gate_message" not in _event_types(data["events"]) + assert data["count"] == 3 -# ── Non-gate redactions still hold ───────────────────────────────────── +def test_post_sync_excludes_gate_message_when_clearnet_sync_allowed(monkeypatch): + _force_clearnet_sync(monkeypatch) + monkeypatch.setattr(mesh_hashchain, "infonet", _FakeInfonet(), raising=False) + + result = asyncio.run( + main.infonet_sync_post( + _json_request("/api/mesh/infonet/sync", {"locator": ["head-1"]}) + ) + ) + + assert "gate_message" not in _event_types(result["events"]) + assert result["count"] == 3 def test_get_sync_still_redacts_vote_gate_label(client, monkeypatch): - """Public sync must still strip gate label from vote payload.""" + _force_private_sync(monkeypatch) monkeypatch.setattr(mesh_hashchain, "infonet", _FakeInfonet(), raising=False) - resp = client.get("/api/mesh/infonet/sync") - events = resp.json()["events"] + + events = client.get("/api/mesh/infonet/sync").json()["events"] vote = next(e for e in events if e["event_type"] == "vote") + assert "gate" not in vote.get("payload", {}) def test_get_sync_still_redacts_key_rotate_identity(client, monkeypatch): - """Public sync must still strip old identity fields from key_rotate payload.""" + _force_private_sync(monkeypatch) monkeypatch.setattr(mesh_hashchain, "infonet", _FakeInfonet(), raising=False) - resp = client.get("/api/mesh/infonet/sync") - events = resp.json()["events"] + + events = client.get("/api/mesh/infonet/sync").json()["events"] rotate = next(e for e in events if e["event_type"] == "key_rotate") payload = rotate.get("payload", {}) + assert "old_node_id" not in payload assert "old_public_key" not in payload assert "old_signature" not in payload def test_post_sync_still_redacts_vote_and_rotate(monkeypatch): - """POST sync must still apply standard public redactions to non-gate events.""" + _force_private_sync(monkeypatch) monkeypatch.setattr(mesh_hashchain, "infonet", _FakeInfonet(), raising=False) + result = asyncio.run( main.infonet_sync_post( _json_request("/api/mesh/infonet/sync", {"locator": ["head-1"]}) @@ -291,24 +351,17 @@ def test_post_sync_still_redacts_vote_and_rotate(monkeypatch): ) vote = next(e for e in result["events"] if e["event_type"] == "vote") rotate = next(e for e in result["events"] if e["event_type"] == "key_rotate") + assert "gate" not in vote.get("payload", {}) assert "old_node_id" not in rotate.get("payload", {}) -# ── No overclaim ─────────────────────────────────────────────────────── - - def test_gate_message_still_in_fake_infonet_storage(): - """The filter does NOT remove gate_message from underlying storage. - This test documents that the infonet still holds gate_message events; - only the public sync response surface filters them out.""" fake = _FakeInfonet() - all_types = [e["event_type"] for e in fake.events] - assert "gate_message" in all_types + assert "gate_message" in _event_types(fake.events) -def test_sync_with_only_gate_messages_returns_empty(client, monkeypatch): - """If infonet contains only gate_message events, sync returns empty list.""" +def test_private_sync_with_only_gate_messages_returns_gate_events(client, monkeypatch): class _GateOnlyInfonet: head_hash = "head-1" events = [_gate_message_event()] @@ -325,8 +378,10 @@ def test_sync_with_only_gate_messages_returns_empty(client, monkeypatch): def get_merkle_root(self): return "r" + _force_private_sync(monkeypatch) monkeypatch.setattr(mesh_hashchain, "infonet", _GateOnlyInfonet(), raising=False) - resp = client.get("/api/mesh/infonet/sync") - data = resp.json() - assert data["events"] == [] - assert data["count"] == 0 + + data = client.get("/api/mesh/infonet/sync").json() + + assert _event_types(data["events"]) == ["gate_message"] + assert data["count"] == 1 diff --git a/backend/tests/mesh/test_s9b_gate_store_hydration.py b/backend/tests/mesh/test_s9b_gate_store_hydration.py index dbaecb6..2cfe888 100644 --- a/backend/tests/mesh/test_s9b_gate_store_hydration.py +++ b/backend/tests/mesh/test_s9b_gate_store_hydration.py @@ -66,6 +66,20 @@ def _make_gate_message_event(priv, pub_b64, node_id, sequence, prev_hash, gate_i return evt.to_dict() +def _make_gate_payload(gate_id="test-gate") -> dict: + return mesh_protocol.normalize_payload( + "gate_message", + { + "gate": gate_id, + "ciphertext": base64.b64encode(b"encrypted-data").decode(), + "nonce": base64.b64encode(b"nonce-value-1234").decode(), + "sender_ref": "sender-abc", + "format": "mls1", + "transport_lock": "private_strong", + }, + ) + + @pytest.fixture() def fresh_env(tmp_path, monkeypatch): """Set up isolated infonet + gate_store, return (infonet, gate_store).""" @@ -89,6 +103,74 @@ def fresh_env(tmp_path, monkeypatch): # ── Rejected gate_message must NOT hydrate gate_store ───────────────────── +def test_append_private_gate_message_uses_hashchain_gate_sequence(fresh_env): + """Local gate posts become private hashchain events in a gate sequence domain.""" + inf, _gs = fresh_env + priv, pub_b64, node_id = _make_keypair() + sequence = 1 + payload = _make_gate_payload("test-gate") + sig_payload = mesh_crypto.build_signature_payload( + event_type="gate_message", + node_id=node_id, + sequence=sequence, + payload=payload, + ) + signature = priv.sign(sig_payload.encode("utf-8")).hex() + + event = inf.append_private_gate_message( + node_id=node_id, + payload=payload, + signature=signature, + sequence=sequence, + public_key=pub_b64, + public_key_algo="Ed25519", + protocol_version=mesh_protocol.PROTOCOL_VERSION, + timestamp=123.0, + ) + + assert event["event_type"] == "gate_message" + assert inf.head_hash == event["event_id"] + assert inf.sequence_domains[f"{node_id}|gate_message"] == sequence + assert inf.node_sequences.get(node_id, 0) == 0 + assert event["payload"]["transport_lock"] == "private_strong" + + +def test_ingest_accepts_new_suffix_after_duplicate_prefix(fresh_env): + """Peer-push batches may include events the receiver already has.""" + inf, _gs = fresh_env + priv, pub_b64, node_id = _make_keypair() + evt1 = _make_gate_message_event( + priv, + pub_b64, + node_id, + sequence=1, + prev_hash=mesh_hashchain.GENESIS_HASH, + ) + assert inf.ingest_events([evt1])["accepted"] == 1 + evt2 = _make_gate_message_event( + priv, + pub_b64, + node_id, + sequence=2, + prev_hash=evt1["event_id"], + ) + assert inf.ingest_events([evt2])["accepted"] == 1 + evt3 = _make_gate_message_event( + priv, + pub_b64, + node_id, + sequence=3, + prev_hash=evt2["event_id"], + ) + + result = inf.ingest_events([evt1, evt2, evt3]) + + assert result["duplicates"] == 2 + assert result["accepted"] == 1 + assert result["rejected"] == [] + assert inf.head_hash == evt3["event_id"] + + def test_rejected_event_does_not_hydrate_gate_store(fresh_env): """A gate_message rejected by ingest must not appear in gate_store.""" inf, gs = fresh_env diff --git a/docker-compose.relay.yml b/docker-compose.relay.yml index 0ffcc49..dc27e96 100644 --- a/docker-compose.relay.yml +++ b/docker-compose.relay.yml @@ -6,7 +6,7 @@ services: dockerfile: ./backend/Dockerfile container_name: shadowbroker-relay ports: - - "0.0.0.0:8000:8000" + - "127.0.0.1:8000:8000" env_file: .env volumes: - relay_data:/app/data diff --git a/docker-compose.yml b/docker-compose.yml index 21ca437..08db5a2 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -24,8 +24,16 @@ services: # Private Infonet bootstrap seeds. Seeds are discovery hints, not fixed roots. - MESH_BOOTSTRAP_SEED_PEERS=${MESH_BOOTSTRAP_SEED_PEERS:-http://gqpbunqbgtkcqilvclm3xrkt3zowjyl3s62kkktvojgvxzizamvbrqid.onion:8000} - MESH_DEFAULT_SYNC_PEERS=${MESH_DEFAULT_SYNC_PEERS:-} + - MESH_SYNC_TIMEOUT_S=${MESH_SYNC_TIMEOUT_S:-5} + - MESH_RELAY_PUSH_TIMEOUT_S=${MESH_RELAY_PUSH_TIMEOUT_S:-45} + # Explicitly opt into HTTPS/IP-based peer sync. Default remains private transports only. + - MESH_INFONET_ALLOW_CLEARNET_SYNC=${MESH_INFONET_ALLOW_CLEARNET_SYNC:-false} + # Tor/Arti SOCKS transport for private .onion Infonet sync. + - MESH_ARTI_ENABLED=${MESH_ARTI_ENABLED:-false} + - MESH_ARTI_SOCKS_PORT=${MESH_ARTI_SOCKS_PORT:-9050} # Operator-trusted sync/push peers. Leave empty unless you control the peer secret on both sides. - MESH_RELAY_PEERS=${MESH_RELAY_PEERS:-} + - MESH_PUBLIC_PEER_URL=${MESH_PUBLIC_PEER_URL:-} # Shared transport auth for operator peer push. Must be set to a unique secret per deployment. - MESH_PEER_PUSH_SECRET=${MESH_PEER_PUSH_SECRET:-} # Issue #256: optional per-peer HMAC secrets. Comma-separated diff --git a/frontend/src/components/InfonetTerminal/InfonetShell.tsx b/frontend/src/components/InfonetTerminal/InfonetShell.tsx index e2557d8..adc6b5c 100644 --- a/frontend/src/components/InfonetTerminal/InfonetShell.tsx +++ b/frontend/src/components/InfonetTerminal/InfonetShell.tsx @@ -298,6 +298,33 @@ export default function InfonetShell({ setCurrentView(view); }; + const renderGateDirectory = (variant: 'landing' | 'command' = 'command') => ( +
+

+ AVAILABLE OBFUSCATED GATES: +

+
+ {GATES.map(gate => ( + + ))} +
+
+ ); + const openGateWhenReady = async ( gateTarget: string, operation: () => Promise, @@ -471,19 +498,7 @@ export default function InfonetShell({ setHistory([]); return; } else if (trimmedCmd === 'gates') { - output = ( -
-

AVAILABLE OBFUSCATED GATES:

-
- {GATES.map(gate => ( -
handleNavigate('gate', gate)}> - [{'>'}] - {gate} -
- ))} -
-
- ); + output = renderGateDirectory('command'); } else if (trimmedCmd.startsWith('join ') || trimmedCmd.startsWith('g/')) { const target = trimmedCmd.startsWith('g/') ? trimmedCmd.slice(2) : trimmedCmd.split(' ')[1]; if (GATES.includes(target)) { @@ -661,6 +676,9 @@ export default function InfonetShell({

Type 'gates' or g/ to view available chatrooms.

+
+ {renderGateDirectory('landing')} +
diff --git a/meshnode.bat b/meshnode.bat index a7194fc..9fc0cf4 100644 --- a/meshnode.bat +++ b/meshnode.bat @@ -8,6 +8,7 @@ echo =================================================== echo. echo Lightweight node — syncs the Infonet chain only. echo No map, no frontend, no data feeds. +echo Private hashchain relay: gate messages + offline DM spool. echo Close this window to stop the node. echo. @@ -96,15 +97,22 @@ echo [*] Auto-enabling node participation... if not exist "data\" mkdir data echo {"enabled":true,"updated_at":0} > data\node.json +set MESH_ONLY=true +set SHADOWBROKER_MESH_NODE_RUNTIME=true +set MESH_NODE_MODE=participant +set MESH_INFONET_ALLOW_CLEARNET_SYNC=false +set MESH_ARTI_ENABLED=true +set MESH_DM_HASHCHAIN_SPOOL_LIMIT=2 +set MESH_DM_HASHCHAIN_SPOOL_TTL_S=3600 +if "%MESH_BOOTSTRAP_SEED_PEERS%"=="" set MESH_BOOTSTRAP_SEED_PEERS=http://gqpbunqbgtkcqilvclm3xrkt3zowjyl3s62kkktvojgvxzizamvbrqid.onion:8000 + echo. echo =================================================== echo Mesh node starting on port 8000 echo Mode: MESH_ONLY (no data feeds) -echo Relay: %MESH_RELAY_PEERS% +echo Bootstrap: %MESH_BOOTSTRAP_SEED_PEERS% echo Press Ctrl+C to stop echo =================================================== echo. -set MESH_ONLY=true -set MESH_NODE_MODE=participant python -m uvicorn main:app --host 0.0.0.0 --port 8000 diff --git a/meshnode.sh b/meshnode.sh index 4ed97aa..f7966c1 100644 --- a/meshnode.sh +++ b/meshnode.sh @@ -10,6 +10,7 @@ echo "===================================================" echo "" echo " Lightweight node — syncs the Infonet chain only." echo " No map, no frontend, no data feeds." +echo " Private hashchain relay: gate messages + offline DM spool." echo " Press Ctrl+C to stop." echo "" @@ -51,15 +52,22 @@ echo "[*] Auto-enabling node participation..." mkdir -p data echo '{"enabled":true,"updated_at":0}' > data/node.json +export MESH_ONLY=true +export SHADOWBROKER_MESH_NODE_RUNTIME=true +export MESH_NODE_MODE=participant +export MESH_INFONET_ALLOW_CLEARNET_SYNC=false +export MESH_ARTI_ENABLED=true +export MESH_DM_HASHCHAIN_SPOOL_LIMIT=2 +export MESH_DM_HASHCHAIN_SPOOL_TTL_S=3600 +export MESH_BOOTSTRAP_SEED_PEERS="${MESH_BOOTSTRAP_SEED_PEERS:-http://gqpbunqbgtkcqilvclm3xrkt3zowjyl3s62kkktvojgvxzizamvbrqid.onion:8000}" + echo "" echo "===================================================" echo " Mesh node starting on port 8000" echo " Mode: MESH_ONLY (no data feeds)" -echo " Relay: ${MESH_RELAY_PEERS:-default}" +echo " Bootstrap: ${MESH_BOOTSTRAP_SEED_PEERS}" echo " Press Ctrl+C to stop" echo "===================================================" echo "" -export MESH_ONLY=true -export MESH_NODE_MODE=participant python3 -m uvicorn main:app --host 0.0.0.0 --port 8000