mirror of
https://github.com/BigBodyCobain/Shadowbroker.git
synced 2026-07-01 10:15:43 +02:00
Add private Infonet swarm discovery and gate propagation.
Signed peer manifest pull/announce on the seed, immediate hashchain push for gate messages, seed-only Docker defaults, and stale-genesis sync diagnostics. Co-authored-by: Cursor <cursoragent@cursor.com>
This commit is contained in:
@@ -227,7 +227,16 @@ AIS_API_KEY= # https://aisstream.io/ — free tier WebSocket key
|
||||
# MESH_GATE_SESSION_STREAM_MAX_GATES=16
|
||||
# MESH_BOOTSTRAP_DISABLED=false
|
||||
# MESH_BOOTSTRAP_MANIFEST_PATH=data/bootstrap_peers.json
|
||||
# Swarm discovery (signed peer manifest). Participants need only the public key;
|
||||
# the seed operator sets MESH_BOOTSTRAP_SIGNER_PRIVATE_KEY (never commit it).
|
||||
# Generate a fleet keypair: uv run python backend/scripts/bootstrap_manifest_helper.py generate-keypair
|
||||
# MESH_BOOTSTRAP_SIGNER_PUBLIC_KEY=
|
||||
# MESH_BOOTSTRAP_SIGNER_PRIVATE_KEY= # seed only
|
||||
# MESH_BOOTSTRAP_SIGNER_ID=shadowbroker-seed
|
||||
# MESH_PEER_REGISTRY_ENABLED=true # seed only (auto-enabled when private key is set)
|
||||
# MESH_SWARM_MANIFEST_TTL_S=14400
|
||||
# MESH_SWARM_MANIFEST_PULL_INTERVAL_S=300
|
||||
# MESH_PEER_REGISTRY_STALE_S=604800
|
||||
# Infonet/Wormhole fails closed to onion/RNS by default. Only enable clearnet
|
||||
# sync for local relay development or an explicitly public testnet.
|
||||
# MESH_INFONET_ALLOW_CLEARNET_SYNC=false
|
||||
|
||||
@@ -1404,6 +1404,27 @@ def _peer_hmac_url_from_request(request: Request) -> str:
|
||||
return ""
|
||||
|
||||
|
||||
def _verify_peer_transport_hmac(request: Request, body_bytes: bytes) -> bool:
|
||||
"""Verify HMAC-SHA256 peer authentication without an allowlist check."""
|
||||
provided = str(request.headers.get("x-peer-hmac", "") or "").strip()
|
||||
if not provided:
|
||||
return False
|
||||
|
||||
peer_url = _peer_hmac_url_from_request(request)
|
||||
if not peer_url:
|
||||
return False
|
||||
peer_key = resolve_peer_key_for_url(peer_url)
|
||||
if not peer_key:
|
||||
return False
|
||||
|
||||
expected = _hmac_mod.new(
|
||||
peer_key,
|
||||
body_bytes,
|
||||
_hashlib_mod.sha256,
|
||||
).hexdigest()
|
||||
return _hmac_mod.compare_digest(provided.lower(), expected.lower())
|
||||
|
||||
|
||||
def _verify_peer_push_hmac(request: Request, body_bytes: bytes) -> bool:
|
||||
"""Verify HMAC-SHA256 peer authentication on push requests.
|
||||
|
||||
|
||||
+144
-2
@@ -244,6 +244,7 @@ from services.mesh.mesh_protocol import (
|
||||
PROTOCOL_VERSION,
|
||||
normalize_payload,
|
||||
)
|
||||
from services.mesh.mesh_hashchain import GENESIS_HASH
|
||||
from services.mesh.mesh_signed_events import (
|
||||
MeshWriteExemption,
|
||||
SignedWriteKind,
|
||||
@@ -324,6 +325,7 @@ from auth import (
|
||||
_validate_insecure_admin_startup,
|
||||
_validate_peer_push_secret,
|
||||
_verify_peer_push_hmac,
|
||||
_verify_peer_transport_hmac,
|
||||
)
|
||||
from node_state import (
|
||||
_NODE_BOOTSTRAP_STATE,
|
||||
@@ -1275,6 +1277,7 @@ def _ensure_infonet_private_transport_ready(reason: str = "") -> bool:
|
||||
get_settings.cache_clear()
|
||||
if _check_arti_ready():
|
||||
logger.info("Infonet private transport ready%s", label)
|
||||
threading.Thread(target=_swarm_bootstrap_after_transport_ready, daemon=True).start()
|
||||
return True
|
||||
logger.warning("Infonet private transport warmup incomplete%s: %s", label, tor_result)
|
||||
return False
|
||||
@@ -1416,6 +1419,16 @@ def _refresh_node_peer_store(*, now: float | None = None) -> dict[str, Any]:
|
||||
if private_transport_required and skipped_clearnet_peers and not bootstrap_error:
|
||||
bootstrap_error = _infonet_private_transport_error()
|
||||
|
||||
swarm_pull: dict[str, Any] = {}
|
||||
try:
|
||||
from services.mesh.mesh_swarm_runtime import refresh_swarm_manifest_from_seeds
|
||||
|
||||
swarm_pull = refresh_swarm_manifest_from_seeds(now=timestamp)
|
||||
if swarm_pull.get("ok") and not swarm_pull.get("skipped"):
|
||||
store.load()
|
||||
except Exception as exc:
|
||||
swarm_pull = {"ok": False, "detail": str(exc or type(exc).__name__)}
|
||||
|
||||
store.save()
|
||||
bootstrap_records = store.records_for_bucket("bootstrap")
|
||||
sync_records = store.records_for_bucket("sync")
|
||||
@@ -1424,6 +1437,8 @@ def _refresh_node_peer_store(*, now: float | None = None) -> dict[str, Any]:
|
||||
bootstrap_records = [record for record in bootstrap_records if _is_private_infonet_transport(record.transport)]
|
||||
sync_records = [record for record in sync_records if _is_private_infonet_transport(record.transport)]
|
||||
push_records = [record for record in push_records if _is_private_infonet_transport(record.transport)]
|
||||
swarm_sync_peer_count = len([record for record in sync_records if str(record.source or "") == "swarm"])
|
||||
swarm_push_peer_count = len([record for record in push_records if str(record.source or "") == "swarm"])
|
||||
snapshot = {
|
||||
"node_mode": mode,
|
||||
"private_transport_required": private_transport_required,
|
||||
@@ -1435,16 +1450,30 @@ def _refresh_node_peer_store(*, now: float | None = None) -> dict[str, Any]:
|
||||
"bootstrap_peer_count": len(bootstrap_records),
|
||||
"sync_peer_count": len(sync_records),
|
||||
"push_peer_count": len(push_records),
|
||||
"swarm_sync_peer_count": swarm_sync_peer_count,
|
||||
"swarm_push_peer_count": swarm_push_peer_count,
|
||||
"operator_peer_count": len(operator_peers),
|
||||
"bootstrap_seed_peer_count": len(bootstrap_seed_peers),
|
||||
"default_sync_peer_count": len(bootstrap_seed_peers),
|
||||
"last_bootstrap_error": bootstrap_error,
|
||||
"swarm_manifest_pull": swarm_pull,
|
||||
}
|
||||
with _NODE_RUNTIME_LOCK:
|
||||
_NODE_BOOTSTRAP_STATE.update(snapshot)
|
||||
return snapshot
|
||||
|
||||
|
||||
def _swarm_bootstrap_after_transport_ready() -> None:
|
||||
try:
|
||||
from services.mesh.mesh_swarm_runtime import announce_local_peer_to_seeds, refresh_swarm_manifest_from_seeds
|
||||
|
||||
announce_local_peer_to_seeds(force=True)
|
||||
refresh_swarm_manifest_from_seeds(force=True)
|
||||
_refresh_node_peer_store()
|
||||
except Exception:
|
||||
logger.warning("swarm bootstrap after transport ready failed", exc_info=True)
|
||||
|
||||
|
||||
def _materialize_local_infonet_state() -> None:
|
||||
from services.mesh.mesh_hashchain import infonet
|
||||
|
||||
@@ -1669,7 +1698,29 @@ def _sync_from_peer(
|
||||
_hydrate_dm_relay_from_chain(events)
|
||||
rejected = list(result.get("rejected", []) or [])
|
||||
if rejected:
|
||||
return False, f"sync ingest rejected {len(rejected)} event(s)", False, 0
|
||||
reasons = [
|
||||
str((item or {}).get("reason", "") or "").strip()
|
||||
for item in rejected
|
||||
if isinstance(item, dict)
|
||||
]
|
||||
reason_summary = ", ".join(reason for reason in reasons if reason)
|
||||
detail = f"sync ingest rejected {len(rejected)} event(s)"
|
||||
if reason_summary:
|
||||
detail = f"{detail}: {reason_summary}"
|
||||
local_empty = len(infonet.events) == 0
|
||||
stale_genesis = (
|
||||
local_empty
|
||||
and bool(events)
|
||||
and str((events[0] or {}).get("prev_hash", "") or "") == GENESIS_HASH
|
||||
and any("timestamp outside freshness window" in reason.lower() for reason in reasons)
|
||||
)
|
||||
if stale_genesis:
|
||||
detail = (
|
||||
f"{detail}; peer appears to be serving an expired genesis chain. "
|
||||
"Refresh or reset the peer chain, or perform an explicit one-time migration "
|
||||
"with MESH_INGEST_EVENT_MAX_AGE_S=0."
|
||||
)
|
||||
return False, detail, False, 0
|
||||
if int(result.get("accepted", 0) or 0) == 0 and int(result.get("duplicates", 0) or 0) >= len(events):
|
||||
return True, "", False, 0
|
||||
if len(events) < page_limit:
|
||||
@@ -1922,9 +1973,22 @@ def _propagate_public_event_to_peers(event_dict: dict[str, Any]) -> None:
|
||||
)
|
||||
|
||||
|
||||
def _propagate_ledger_event_to_peers(event_dict: dict[str, Any]) -> None:
|
||||
if not _participant_node_enabled():
|
||||
return
|
||||
event_type = str(event_dict.get("event_type") or "")
|
||||
if event_type in {"gate_message", "dm_message"}:
|
||||
from services.mesh.mesh_swarm_runtime import push_infonet_events_to_http_peers
|
||||
|
||||
push_infonet_events_to_http_peers([event_dict])
|
||||
_kick_public_sync_background("ledger_event")
|
||||
return
|
||||
_propagate_public_event_to_peers(event_dict)
|
||||
|
||||
|
||||
def _schedule_public_event_propagation(event_dict: dict[str, Any]) -> None:
|
||||
threading.Thread(
|
||||
target=_propagate_public_event_to_peers,
|
||||
target=_propagate_ledger_event_to_peers,
|
||||
args=(dict(event_dict),),
|
||||
daemon=True,
|
||||
).start()
|
||||
@@ -1960,6 +2024,7 @@ def _start_infonet_node_runtime(reason: str = "startup") -> None:
|
||||
threading.Thread(target=_http_peer_push_loop, daemon=True).start()
|
||||
threading.Thread(target=_http_gate_push_loop, daemon=True).start()
|
||||
threading.Thread(target=_http_gate_pull_loop, daemon=True).start()
|
||||
threading.Thread(target=_swarm_manifest_pull_loop, daemon=True).start()
|
||||
_NODE_RUNTIME_THREADS_STARTED = True
|
||||
_kick_public_sync_background(reason)
|
||||
if not _NODE_PUBLIC_EVENT_HOOK_REGISTERED:
|
||||
@@ -2067,6 +2132,22 @@ def _http_peer_push_loop() -> None:
|
||||
_NODE_SYNC_STOP.wait(_PEER_PUSH_INTERVAL_S)
|
||||
|
||||
|
||||
def _swarm_manifest_pull_loop() -> None:
|
||||
"""Background thread: pull signed peer manifests from bootstrap seeds."""
|
||||
while not _NODE_SYNC_STOP.is_set():
|
||||
try:
|
||||
if _participant_node_enabled():
|
||||
from services.mesh.mesh_swarm_runtime import refresh_swarm_manifest_from_seeds
|
||||
|
||||
result = refresh_swarm_manifest_from_seeds()
|
||||
if result.get("ok") and not result.get("skipped"):
|
||||
_refresh_node_peer_store()
|
||||
except Exception:
|
||||
logger.exception("swarm manifest pull loop error")
|
||||
interval_s = int(getattr(get_settings(), "MESH_SWARM_MANIFEST_PULL_INTERVAL_S", 0) or 300)
|
||||
_NODE_SYNC_STOP.wait(max(30, interval_s))
|
||||
|
||||
|
||||
# ─── Background Gate Message Pull Worker ─────────────────────────────────
|
||||
# Periodically pulls gate events from relay peers that this node is missing.
|
||||
# Complements the push loop: push sends OUR events to peers, pull fetches
|
||||
@@ -5497,6 +5578,65 @@ async def infonet_ingest(request: Request):
|
||||
return {"ok": True, **result}
|
||||
|
||||
|
||||
@app.get("/api/mesh/infonet/peer-registry", dependencies=[Depends(require_local_operator)])
|
||||
@limiter.limit("30/minute")
|
||||
async def infonet_peer_registry(request: Request):
|
||||
"""Operator view of the live swarm peer registry (seed nodes only)."""
|
||||
from services.mesh.mesh_peer_registry import DEFAULT_PEER_REGISTRY_PATH, PeerRegistry
|
||||
from services.mesh.mesh_swarm_runtime import peer_registry_enabled
|
||||
|
||||
if not peer_registry_enabled():
|
||||
return {"ok": False, "detail": "peer registry is not enabled on this node"}
|
||||
registry = PeerRegistry(DEFAULT_PEER_REGISTRY_PATH)
|
||||
try:
|
||||
peers = registry.load()
|
||||
except Exception as exc:
|
||||
return {"ok": False, "detail": str(exc or type(exc).__name__)}
|
||||
return {
|
||||
"ok": True,
|
||||
"peer_count": len(peers),
|
||||
"peers": [peer.to_dict() for peer in peers],
|
||||
}
|
||||
|
||||
|
||||
@app.get("/api/mesh/infonet/bootstrap-manifest")
|
||||
@limiter.limit(_INFONET_SYNC_RATE_LIMIT)
|
||||
async def infonet_bootstrap_manifest(request: Request):
|
||||
"""Return the current signed bootstrap/swarm peer manifest."""
|
||||
from services.mesh.mesh_swarm_runtime import load_live_bootstrap_manifest
|
||||
|
||||
manifest = load_live_bootstrap_manifest()
|
||||
if manifest is None:
|
||||
return {"ok": False, "detail": "bootstrap manifest unavailable"}
|
||||
return {"ok": True, "manifest": manifest.to_dict()}
|
||||
|
||||
|
||||
@app.post("/api/mesh/infonet/peer-announce")
|
||||
@limiter.limit("30/minute")
|
||||
@mesh_write_exempt(MeshWriteExemption.PEER_GOSSIP)
|
||||
async def infonet_peer_announce(request: Request):
|
||||
"""Register a participant onion peer in the swarm registry (HMAC-authenticated)."""
|
||||
from auth import _peer_hmac_url_from_request
|
||||
from services.mesh.mesh_swarm_runtime import peer_registry_enabled, record_peer_announcement
|
||||
|
||||
body_bytes = await request.body()
|
||||
if not _verify_peer_transport_hmac(request, body_bytes):
|
||||
return Response(
|
||||
content='{"ok":false,"detail":"Invalid or missing peer HMAC"}',
|
||||
status_code=403,
|
||||
media_type="application/json",
|
||||
)
|
||||
if not peer_registry_enabled():
|
||||
return {"ok": False, "detail": "peer registry is not enabled on this node"}
|
||||
body = json_mod.loads(body_bytes or b"{}")
|
||||
announced_url = normalize_peer_url(str(body.get("peer_url", "") or ""))
|
||||
header_url = _peer_hmac_url_from_request(request)
|
||||
if not announced_url or announced_url != header_url:
|
||||
return {"ok": False, "detail": "peer_url must match X-Peer-Url"}
|
||||
peer = record_peer_announcement(body)
|
||||
return {"ok": True, "peer_url": peer.peer_url, "role": peer.role, "transport": peer.transport}
|
||||
|
||||
|
||||
@app.post("/api/mesh/infonet/peer-push")
|
||||
@limiter.limit("30/minute")
|
||||
@mesh_write_exempt(MeshWriteExemption.PEER_GOSSIP)
|
||||
@@ -5535,6 +5675,8 @@ async def infonet_peer_push(request: Request):
|
||||
result = infonet.ingest_events(events)
|
||||
_hydrate_gate_store_from_chain(events)
|
||||
_hydrate_dm_relay_from_chain(events)
|
||||
if any(str(event.get("event_type") or "") in {"gate_message", "dm_message"} for event in events):
|
||||
_kick_public_sync_background("peer_push_ingest")
|
||||
return {"ok": True, **result}
|
||||
|
||||
|
||||
|
||||
@@ -44,6 +44,13 @@ class Settings(BaseSettings):
|
||||
MESH_BOOTSTRAP_DISABLED: bool = False
|
||||
MESH_BOOTSTRAP_MANIFEST_PATH: str = "data/bootstrap_peers.json"
|
||||
MESH_BOOTSTRAP_SIGNER_PUBLIC_KEY: str = ""
|
||||
MESH_BOOTSTRAP_SIGNER_PRIVATE_KEY: str = ""
|
||||
MESH_BOOTSTRAP_SIGNER_ID: str = ""
|
||||
MESH_PEER_REGISTRY_ENABLED: bool = False
|
||||
MESH_PEER_REGISTRY_DISABLED: bool = False
|
||||
MESH_PEER_REGISTRY_STALE_S: int = 604800
|
||||
MESH_SWARM_MANIFEST_TTL_S: int = 14400
|
||||
MESH_SWARM_MANIFEST_PULL_INTERVAL_S: int = 300
|
||||
MESH_NODE_MODE: str = "participant"
|
||||
MESH_SYNC_INTERVAL_S: int = 300
|
||||
MESH_SYNC_FAILURE_BACKOFF_S: int = 60
|
||||
|
||||
@@ -287,28 +287,18 @@ def write_signed_bootstrap_manifest(
|
||||
return manifest
|
||||
|
||||
|
||||
def load_bootstrap_manifest(
|
||||
path: str | Path,
|
||||
def parse_bootstrap_manifest_dict(
|
||||
raw: dict[str, Any],
|
||||
*,
|
||||
signer_public_key_b64: str,
|
||||
now: float | None = None,
|
||||
) -> BootstrapManifest:
|
||||
manifest_path = _resolve_manifest_path(str(path))
|
||||
try:
|
||||
raw = json.loads(manifest_path.read_text(encoding="utf-8"))
|
||||
except FileNotFoundError as exc:
|
||||
raise BootstrapManifestError(f"bootstrap manifest not found: {manifest_path}") from exc
|
||||
except json.JSONDecodeError as exc:
|
||||
raise BootstrapManifestError("bootstrap manifest is not valid JSON") from exc
|
||||
|
||||
if not isinstance(raw, dict):
|
||||
raise BootstrapManifestError("bootstrap manifest root must be an object")
|
||||
|
||||
signature = str(raw.get("signature", "") or "").strip()
|
||||
payload = {key: value for key, value in raw.items() if key != "signature"}
|
||||
if not signature:
|
||||
raise BootstrapManifestError("bootstrap manifest signature is required")
|
||||
|
||||
_verify_manifest_signature(
|
||||
payload,
|
||||
signature_b64=signature,
|
||||
@@ -325,6 +315,29 @@ def load_bootstrap_manifest(
|
||||
)
|
||||
|
||||
|
||||
def load_bootstrap_manifest(
|
||||
path: str | Path,
|
||||
*,
|
||||
signer_public_key_b64: str,
|
||||
now: float | None = None,
|
||||
) -> BootstrapManifest:
|
||||
manifest_path = _resolve_manifest_path(str(path))
|
||||
try:
|
||||
raw = json.loads(manifest_path.read_text(encoding="utf-8"))
|
||||
except FileNotFoundError as exc:
|
||||
raise BootstrapManifestError(f"bootstrap manifest not found: {manifest_path}") from exc
|
||||
except json.JSONDecodeError as exc:
|
||||
raise BootstrapManifestError("bootstrap manifest is not valid JSON") from exc
|
||||
|
||||
if not isinstance(raw, dict):
|
||||
raise BootstrapManifestError("bootstrap manifest root must be an object")
|
||||
return parse_bootstrap_manifest_dict(
|
||||
raw,
|
||||
signer_public_key_b64=signer_public_key_b64,
|
||||
now=now,
|
||||
)
|
||||
|
||||
|
||||
def load_bootstrap_manifest_from_settings(*, now: float | None = None) -> BootstrapManifest | None:
|
||||
settings = get_settings()
|
||||
if bool(getattr(settings, "MESH_BOOTSTRAP_DISABLED", False)):
|
||||
|
||||
@@ -0,0 +1,152 @@
|
||||
"""Operator-signed peer registry for private Infonet swarm discovery."""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import json
|
||||
import time
|
||||
from dataclasses import asdict, dataclass, field
|
||||
from pathlib import Path
|
||||
from typing import Any
|
||||
|
||||
from services.mesh.mesh_crypto import normalize_peer_url
|
||||
from services.mesh.mesh_router import peer_transport_kind
|
||||
|
||||
BACKEND_DIR = Path(__file__).resolve().parents[2]
|
||||
DATA_DIR = BACKEND_DIR / "data"
|
||||
DEFAULT_PEER_REGISTRY_PATH = DATA_DIR / "peer_registry.json"
|
||||
REGISTRY_VERSION = 1
|
||||
ALLOWED_REGISTRY_ROLES = {"participant", "relay", "seed"}
|
||||
|
||||
|
||||
@dataclass
|
||||
class RegistryPeer:
|
||||
peer_url: str
|
||||
transport: str
|
||||
role: str
|
||||
node_id: str = ""
|
||||
label: str = ""
|
||||
announced_at: int = 0
|
||||
last_seen_at: int = 0
|
||||
failure_count: int = 0
|
||||
|
||||
def to_dict(self) -> dict[str, Any]:
|
||||
return asdict(self)
|
||||
|
||||
def manifest_peer(self) -> dict[str, str]:
|
||||
return {
|
||||
"peer_url": self.peer_url,
|
||||
"transport": self.transport,
|
||||
"role": self.role,
|
||||
"label": self.label or self.node_id[:16],
|
||||
}
|
||||
|
||||
|
||||
class PeerRegistry:
|
||||
def __init__(self, path: str | Path = DEFAULT_PEER_REGISTRY_PATH):
|
||||
self.path = Path(path)
|
||||
self._peers: dict[str, RegistryPeer] = {}
|
||||
|
||||
def load(self) -> list[RegistryPeer]:
|
||||
if not self.path.exists():
|
||||
self._peers = {}
|
||||
return []
|
||||
raw = json.loads(self.path.read_text(encoding="utf-8"))
|
||||
if not isinstance(raw, dict):
|
||||
raise ValueError("peer registry root must be an object")
|
||||
version = int(raw.get("version", 0) or 0)
|
||||
if version != REGISTRY_VERSION:
|
||||
raise ValueError(f"unsupported peer registry version: {version}")
|
||||
entries = raw.get("peers", [])
|
||||
if not isinstance(entries, list):
|
||||
raise ValueError("peer registry peers must be a list")
|
||||
peers: dict[str, RegistryPeer] = {}
|
||||
for entry in entries:
|
||||
if not isinstance(entry, dict):
|
||||
continue
|
||||
peer = self._normalize_entry(entry)
|
||||
peers[peer.peer_url] = peer
|
||||
self._peers = peers
|
||||
return self.records()
|
||||
|
||||
def save(self) -> None:
|
||||
self.path.parent.mkdir(parents=True, exist_ok=True)
|
||||
payload = {
|
||||
"version": REGISTRY_VERSION,
|
||||
"updated_at": int(time.time()),
|
||||
"peers": [peer.to_dict() for peer in self.records()],
|
||||
}
|
||||
self.path.write_text(
|
||||
json.dumps(payload, sort_keys=True, indent=2) + "\n",
|
||||
encoding="utf-8",
|
||||
)
|
||||
|
||||
def records(self) -> list[RegistryPeer]:
|
||||
return sorted(self._peers.values(), key=lambda item: (item.role, item.peer_url))
|
||||
|
||||
def upsert_announcement(
|
||||
self,
|
||||
*,
|
||||
peer_url: str,
|
||||
transport: str,
|
||||
role: str,
|
||||
node_id: str = "",
|
||||
label: str = "",
|
||||
now: float | None = None,
|
||||
) -> RegistryPeer:
|
||||
normalized = normalize_peer_url(peer_url)
|
||||
if not normalized:
|
||||
raise ValueError("peer_url is required")
|
||||
resolved_transport = str(transport or "").strip().lower() or str(peer_transport_kind(normalized) or "")
|
||||
if resolved_transport not in {"onion", "clearnet"}:
|
||||
raise ValueError("unsupported peer transport")
|
||||
resolved_role = str(role or "participant").strip().lower()
|
||||
if resolved_role not in ALLOWED_REGISTRY_ROLES:
|
||||
raise ValueError("unsupported peer role")
|
||||
timestamp = int(now if now is not None else time.time())
|
||||
existing = self._peers.get(normalized)
|
||||
peer = RegistryPeer(
|
||||
peer_url=normalized,
|
||||
transport=resolved_transport,
|
||||
role=resolved_role,
|
||||
node_id=str(node_id or (existing.node_id if existing else "") or "").strip(),
|
||||
label=str(label or (existing.label if existing else "") or "").strip(),
|
||||
announced_at=int(existing.announced_at if existing and existing.announced_at else timestamp),
|
||||
last_seen_at=timestamp,
|
||||
failure_count=int(existing.failure_count if existing else 0),
|
||||
)
|
||||
self._peers[normalized] = peer
|
||||
return peer
|
||||
|
||||
def prune_stale(self, *, max_age_s: int, now: float | None = None) -> int:
|
||||
timestamp = int(now if now is not None else time.time())
|
||||
removed = 0
|
||||
for peer_url, peer in list(self._peers.items()):
|
||||
if peer.role == "seed":
|
||||
continue
|
||||
last_seen = int(peer.last_seen_at or peer.announced_at or 0)
|
||||
if last_seen > 0 and timestamp - last_seen > max(60, int(max_age_s or 0)):
|
||||
del self._peers[peer_url]
|
||||
removed += 1
|
||||
return removed
|
||||
|
||||
def manifest_peers(self) -> list[dict[str, str]]:
|
||||
return [peer.manifest_peer() for peer in self.records()]
|
||||
|
||||
def _normalize_entry(self, entry: dict[str, Any]) -> RegistryPeer:
|
||||
peer_url = normalize_peer_url(str(entry.get("peer_url", "") or ""))
|
||||
if not peer_url:
|
||||
raise ValueError("registry peer_url is required")
|
||||
transport = str(entry.get("transport", "") or peer_transport_kind(peer_url) or "").strip().lower()
|
||||
role = str(entry.get("role", "participant") or "participant").strip().lower()
|
||||
if role not in ALLOWED_REGISTRY_ROLES:
|
||||
raise ValueError("registry role unsupported")
|
||||
return RegistryPeer(
|
||||
peer_url=peer_url,
|
||||
transport=transport,
|
||||
role=role,
|
||||
node_id=str(entry.get("node_id", "") or "").strip(),
|
||||
label=str(entry.get("label", "") or "").strip(),
|
||||
announced_at=int(entry.get("announced_at", 0) or 0),
|
||||
last_seen_at=int(entry.get("last_seen_at", 0) or entry.get("announced_at", 0) or 0),
|
||||
failure_count=int(entry.get("failure_count", 0) or 0),
|
||||
)
|
||||
@@ -16,7 +16,7 @@ DATA_DIR = BACKEND_DIR / "data"
|
||||
DEFAULT_PEER_STORE_PATH = DATA_DIR / "peer_store.json"
|
||||
PEER_STORE_VERSION = 1
|
||||
ALLOWED_PEER_BUCKETS = {"bootstrap", "sync", "push"}
|
||||
ALLOWED_PEER_SOURCES = {"bundle", "operator", "bootstrap_promoted", "runtime"}
|
||||
ALLOWED_PEER_SOURCES = {"bundle", "operator", "bootstrap_promoted", "runtime", "swarm"}
|
||||
ALLOWED_PEER_TRANSPORTS = {"clearnet", "onion"}
|
||||
ALLOWED_PEER_ROLES = {"participant", "relay", "seed"}
|
||||
|
||||
|
||||
@@ -0,0 +1,455 @@
|
||||
"""Private Infonet swarm discovery and immediate ledger propagation."""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import json
|
||||
import logging
|
||||
import threading
|
||||
import time
|
||||
from typing import Any
|
||||
|
||||
from services.config import get_settings
|
||||
from services.mesh.mesh_bootstrap_manifest import (
|
||||
BootstrapManifest,
|
||||
BootstrapManifestError,
|
||||
BootstrapPeer,
|
||||
build_bootstrap_manifest_payload,
|
||||
load_bootstrap_manifest,
|
||||
parse_bootstrap_manifest_dict,
|
||||
sign_bootstrap_manifest_payload,
|
||||
write_signed_bootstrap_manifest,
|
||||
)
|
||||
from services.mesh.mesh_crypto import normalize_peer_url, resolve_peer_key_for_url
|
||||
from services.mesh.mesh_peer_registry import DEFAULT_PEER_REGISTRY_PATH, PeerRegistry, RegistryPeer
|
||||
from services.mesh.mesh_peer_store import (
|
||||
DEFAULT_PEER_STORE_PATH,
|
||||
PeerStore,
|
||||
make_push_peer_record,
|
||||
make_sync_peer_record,
|
||||
)
|
||||
from services.mesh.mesh_router import parse_configured_relay_peers, peer_transport_kind
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
_SWARM_LOCK = threading.Lock()
|
||||
_LAST_MANIFEST_PULL_AT = 0.0
|
||||
_LAST_ANNOUNCE_AT = 0.0
|
||||
|
||||
|
||||
def peer_registry_enabled() -> bool:
|
||||
settings = get_settings()
|
||||
if bool(getattr(settings, "MESH_PEER_REGISTRY_DISABLED", False)):
|
||||
return False
|
||||
if str(getattr(settings, "MESH_BOOTSTRAP_SIGNER_PRIVATE_KEY", "") or "").strip():
|
||||
return True
|
||||
return bool(getattr(settings, "MESH_PEER_REGISTRY_ENABLED", False))
|
||||
|
||||
|
||||
def _manifest_path() -> str:
|
||||
return str(getattr(get_settings(), "MESH_BOOTSTRAP_MANIFEST_PATH", "") or "data/bootstrap_peers.json")
|
||||
|
||||
|
||||
def _signer_public_key_b64() -> str:
|
||||
return str(getattr(get_settings(), "MESH_BOOTSTRAP_SIGNER_PUBLIC_KEY", "") or "").strip()
|
||||
|
||||
|
||||
def _signer_private_key_b64() -> str:
|
||||
return str(getattr(settings, "MESH_BOOTSTRAP_SIGNER_PRIVATE_KEY", "") or "").strip() if (settings := get_settings()) else ""
|
||||
|
||||
|
||||
def _signer_id() -> str:
|
||||
configured = str(getattr(get_settings(), "MESH_BOOTSTRAP_SIGNER_ID", "") or "").strip()
|
||||
return configured or "shadowbroker-seed"
|
||||
|
||||
|
||||
def _private_transport_required() -> bool:
|
||||
return not bool(getattr(get_settings(), "MESH_INFONET_ALLOW_CLEARNET_SYNC", False))
|
||||
|
||||
|
||||
def _configured_seed_peer_urls() -> list[str]:
|
||||
settings = get_settings()
|
||||
primary = str(getattr(settings, "MESH_BOOTSTRAP_SEED_PEERS", "") or "").strip()
|
||||
legacy = str(getattr(settings, "MESH_DEFAULT_SYNC_PEERS", "") or "").strip()
|
||||
return parse_configured_relay_peers(primary or legacy)
|
||||
|
||||
|
||||
def _seed_manifest_peers() -> list[dict[str, str]]:
|
||||
peers: list[dict[str, str]] = []
|
||||
for peer_url in _configured_seed_peer_urls():
|
||||
transport = str(peer_transport_kind(peer_url) or "")
|
||||
if _private_transport_required() and transport != "onion":
|
||||
continue
|
||||
peers.append(
|
||||
{
|
||||
"peer_url": peer_url,
|
||||
"transport": transport,
|
||||
"role": "seed",
|
||||
"label": "ShadowBroker bootstrap seed",
|
||||
}
|
||||
)
|
||||
return peers
|
||||
|
||||
|
||||
def publish_registry_manifest(*, now: float | None = None, persist: bool = True) -> BootstrapManifest:
|
||||
private_key = _signer_private_key_b64()
|
||||
public_key = _signer_public_key_b64()
|
||||
if not private_key or not public_key:
|
||||
raise BootstrapManifestError("bootstrap signer keys are required to publish swarm manifest")
|
||||
|
||||
timestamp = int(now if now is not None else time.time())
|
||||
registry = PeerRegistry(DEFAULT_PEER_REGISTRY_PATH)
|
||||
try:
|
||||
registry.load()
|
||||
except Exception:
|
||||
registry = PeerRegistry(DEFAULT_PEER_REGISTRY_PATH)
|
||||
stale_s = int(getattr(get_settings(), "MESH_PEER_REGISTRY_STALE_S", 0) or 7 * 86400)
|
||||
if stale_s > 0:
|
||||
registry.prune_stale(max_age_s=stale_s, now=timestamp)
|
||||
|
||||
peers = _seed_manifest_peers() + registry.manifest_peers()
|
||||
ttl_s = int(getattr(get_settings(), "MESH_SWARM_MANIFEST_TTL_S", 0) or 4 * 3600)
|
||||
payload = build_bootstrap_manifest_payload(
|
||||
signer_id=_signer_id(),
|
||||
peers=peers,
|
||||
issued_at=timestamp,
|
||||
valid_until=timestamp + max(300, ttl_s),
|
||||
)
|
||||
signature = sign_bootstrap_manifest_payload(payload, signer_private_key_b64=private_key)
|
||||
manifest = BootstrapManifest(
|
||||
version=int(payload["version"]),
|
||||
issued_at=int(payload["issued_at"]),
|
||||
valid_until=int(payload["valid_until"]),
|
||||
signer_id=str(payload["signer_id"]),
|
||||
peers=tuple(BootstrapPeer(**dict(peer)) for peer in peers),
|
||||
signature=signature,
|
||||
)
|
||||
if persist:
|
||||
registry.save()
|
||||
write_signed_bootstrap_manifest(
|
||||
_manifest_path(),
|
||||
signer_id=manifest.signer_id,
|
||||
signer_private_key_b64=private_key,
|
||||
peers=[peer.to_dict() for peer in manifest.peers],
|
||||
issued_at=manifest.issued_at,
|
||||
valid_until=manifest.valid_until,
|
||||
)
|
||||
return manifest
|
||||
|
||||
|
||||
def load_live_bootstrap_manifest(*, now: float | None = None) -> BootstrapManifest | None:
|
||||
public_key = _signer_public_key_b64()
|
||||
if not public_key:
|
||||
return None
|
||||
if peer_registry_enabled():
|
||||
try:
|
||||
return publish_registry_manifest(now=now, persist=False)
|
||||
except BootstrapManifestError:
|
||||
logger.warning("live registry manifest unavailable", exc_info=True)
|
||||
try:
|
||||
return load_bootstrap_manifest(_manifest_path(), signer_public_key_b64=public_key, now=now)
|
||||
except BootstrapManifestError:
|
||||
return None
|
||||
|
||||
|
||||
def _upsert_swarm_peer_into_store(
|
||||
*,
|
||||
peer_url: str,
|
||||
transport: str,
|
||||
role: str,
|
||||
label: str = "",
|
||||
signer_id: str = "",
|
||||
now: float | None = None,
|
||||
) -> None:
|
||||
timestamp = int(now if now is not None else time.time())
|
||||
if _private_transport_required() and transport != "onion":
|
||||
return
|
||||
store = PeerStore(DEFAULT_PEER_STORE_PATH)
|
||||
try:
|
||||
store.load()
|
||||
except Exception:
|
||||
store = PeerStore(DEFAULT_PEER_STORE_PATH)
|
||||
store.upsert(
|
||||
make_sync_peer_record(
|
||||
peer_url=peer_url,
|
||||
transport=transport,
|
||||
role=role,
|
||||
source="swarm",
|
||||
label=label,
|
||||
signer_id=signer_id,
|
||||
now=timestamp,
|
||||
)
|
||||
)
|
||||
store.upsert(
|
||||
make_push_peer_record(
|
||||
peer_url=peer_url,
|
||||
transport=transport,
|
||||
role=role if role != "seed" else "relay",
|
||||
source="swarm",
|
||||
label=label,
|
||||
now=timestamp,
|
||||
)
|
||||
)
|
||||
store.save()
|
||||
|
||||
|
||||
def record_peer_announcement(body: dict[str, Any], *, now: float | None = None) -> RegistryPeer:
|
||||
if not peer_registry_enabled():
|
||||
raise ValueError("peer registry is not enabled on this node")
|
||||
registry = PeerRegistry(DEFAULT_PEER_REGISTRY_PATH)
|
||||
try:
|
||||
registry.load()
|
||||
except Exception:
|
||||
registry = PeerRegistry(DEFAULT_PEER_REGISTRY_PATH)
|
||||
peer = registry.upsert_announcement(
|
||||
peer_url=str(body.get("peer_url", "") or ""),
|
||||
transport=str(body.get("transport", "") or ""),
|
||||
role=str(body.get("role", "participant") or "participant"),
|
||||
node_id=str(body.get("node_id", "") or ""),
|
||||
label=str(body.get("label", "") or ""),
|
||||
now=now,
|
||||
)
|
||||
registry.save()
|
||||
_upsert_swarm_peer_into_store(
|
||||
peer_url=peer.peer_url,
|
||||
transport=peer.transport,
|
||||
role=peer.role,
|
||||
label=peer.label,
|
||||
signer_id=_signer_id(),
|
||||
now=now,
|
||||
)
|
||||
try:
|
||||
publish_registry_manifest(now=now, persist=True)
|
||||
except Exception:
|
||||
logger.warning("failed to republish swarm manifest after announce", exc_info=True)
|
||||
return peer
|
||||
|
||||
|
||||
def merge_manifest_into_peer_store(manifest: BootstrapManifest, *, now: float | None = None) -> int:
|
||||
timestamp = int(now if now is not None else time.time())
|
||||
merged = 0
|
||||
for peer in manifest.peers:
|
||||
if _private_transport_required() and peer.transport != "onion":
|
||||
continue
|
||||
_upsert_swarm_peer_into_store(
|
||||
peer_url=peer.peer_url,
|
||||
transport=peer.transport,
|
||||
role=peer.role,
|
||||
label=peer.label,
|
||||
signer_id=manifest.signer_id,
|
||||
now=timestamp,
|
||||
)
|
||||
merged += 1
|
||||
return merged
|
||||
|
||||
|
||||
def fetch_remote_bootstrap_manifest(seed_peer_url: str, *, now: float | None = None) -> BootstrapManifest | None:
|
||||
import requests
|
||||
|
||||
public_key = _signer_public_key_b64()
|
||||
if not public_key:
|
||||
return None
|
||||
normalized = normalize_peer_url(seed_peer_url)
|
||||
if not normalized:
|
||||
return None
|
||||
|
||||
from main import _infonet_peer_requests_proxies
|
||||
|
||||
proxies = _infonet_peer_requests_proxies(normalized)
|
||||
timeout = int(getattr(get_settings(), "MESH_SYNC_TIMEOUT_S", 0) or 45)
|
||||
request_kwargs: dict[str, Any] = {"timeout": timeout}
|
||||
if proxies:
|
||||
request_kwargs["proxies"] = proxies
|
||||
try:
|
||||
response = requests.get(f"{normalized}/api/mesh/infonet/bootstrap-manifest", **request_kwargs)
|
||||
except Exception as exc:
|
||||
logger.debug("swarm manifest fetch failed for %s: %s", normalized, exc)
|
||||
return None
|
||||
if response.status_code != 200:
|
||||
return None
|
||||
try:
|
||||
raw = response.json()
|
||||
except Exception:
|
||||
return None
|
||||
if not isinstance(raw, dict) or raw.get("ok") is False:
|
||||
return None
|
||||
manifest_body = dict(raw.get("manifest") or raw)
|
||||
try:
|
||||
return parse_bootstrap_manifest_dict(
|
||||
manifest_body,
|
||||
signer_public_key_b64=public_key,
|
||||
now=now,
|
||||
)
|
||||
except BootstrapManifestError:
|
||||
return None
|
||||
|
||||
|
||||
def refresh_swarm_manifest_from_seeds(*, now: float | None = None, force: bool = False) -> dict[str, Any]:
|
||||
global _LAST_MANIFEST_PULL_AT
|
||||
interval_s = int(getattr(get_settings(), "MESH_SWARM_MANIFEST_PULL_INTERVAL_S", 0) or 300)
|
||||
timestamp = float(now if now is not None else time.time())
|
||||
with _SWARM_LOCK:
|
||||
if not force and _LAST_MANIFEST_PULL_AT and timestamp - _LAST_MANIFEST_PULL_AT < max(30, interval_s):
|
||||
return {"ok": True, "skipped": True, "reason": "manifest_pull_interval"}
|
||||
_LAST_MANIFEST_PULL_AT = timestamp
|
||||
|
||||
if not _signer_public_key_b64():
|
||||
return {"ok": False, "detail": "MESH_BOOTSTRAP_SIGNER_PUBLIC_KEY is not configured"}
|
||||
|
||||
last_error = "manifest fetch failed"
|
||||
for seed_url in _configured_seed_peer_urls():
|
||||
manifest = fetch_remote_bootstrap_manifest(seed_url, now=timestamp)
|
||||
if manifest is None:
|
||||
continue
|
||||
try:
|
||||
merged = merge_manifest_into_peer_store(manifest, now=timestamp)
|
||||
return {
|
||||
"ok": True,
|
||||
"seed_peer_url": seed_url,
|
||||
"peer_count": len(manifest.peers),
|
||||
"merged_peer_count": merged,
|
||||
}
|
||||
except Exception as exc:
|
||||
last_error = str(exc or type(exc).__name__)
|
||||
return {"ok": False, "detail": last_error}
|
||||
|
||||
|
||||
def announce_local_peer_to_seeds(*, now: float | None = None, force: bool = False) -> dict[str, Any]:
|
||||
global _LAST_ANNOUNCE_AT
|
||||
import hashlib as _hashlib_mod
|
||||
import hmac as _hmac_mod
|
||||
import requests
|
||||
|
||||
from main import _infonet_peer_requests_proxies, _local_infonet_peer_url, _participant_node_enabled
|
||||
|
||||
if not _participant_node_enabled():
|
||||
return {"ok": False, "detail": "participant node disabled"}
|
||||
peer_url = _local_infonet_peer_url()
|
||||
if not peer_url:
|
||||
return {"ok": False, "detail": "local peer URL is not ready"}
|
||||
peer_key = resolve_peer_key_for_url(peer_url)
|
||||
if not peer_key:
|
||||
return {"ok": False, "detail": "peer HMAC secret is not configured"}
|
||||
|
||||
timestamp = float(now if now is not None else time.time())
|
||||
with _SWARM_LOCK:
|
||||
if not force and _LAST_ANNOUNCE_AT and timestamp - _LAST_ANNOUNCE_AT < 300:
|
||||
return {"ok": True, "skipped": True, "reason": "announce_interval"}
|
||||
_LAST_ANNOUNCE_AT = timestamp
|
||||
|
||||
transport = str(peer_transport_kind(peer_url) or "onion")
|
||||
body = {
|
||||
"peer_url": peer_url,
|
||||
"transport": transport,
|
||||
"role": "participant",
|
||||
"node_id": "",
|
||||
"label": "",
|
||||
"ts": int(timestamp),
|
||||
}
|
||||
body_bytes = json.dumps(body, sort_keys=True, separators=(",", ":"), ensure_ascii=False).encode("utf-8")
|
||||
hmac_hex = _hmac_mod.new(peer_key, body_bytes, _hashlib_mod.sha256).hexdigest()
|
||||
timeout = int(getattr(get_settings(), "MESH_RELAY_PUSH_TIMEOUT_S", 0) or 45)
|
||||
results: list[dict[str, Any]] = []
|
||||
for seed_url in _configured_seed_peer_urls():
|
||||
normalized = normalize_peer_url(seed_url)
|
||||
if not normalized:
|
||||
continue
|
||||
proxies = _infonet_peer_requests_proxies(normalized)
|
||||
request_kwargs: dict[str, Any] = {
|
||||
"data": body_bytes,
|
||||
"headers": {
|
||||
"Content-Type": "application/json",
|
||||
"X-Peer-Url": peer_url,
|
||||
"X-Peer-HMAC": hmac_hex,
|
||||
},
|
||||
"timeout": timeout,
|
||||
}
|
||||
if proxies:
|
||||
request_kwargs["proxies"] = proxies
|
||||
try:
|
||||
response = requests.post(
|
||||
f"{normalized}/api/mesh/infonet/peer-announce",
|
||||
**request_kwargs,
|
||||
)
|
||||
results.append(
|
||||
{
|
||||
"seed_peer_url": normalized,
|
||||
"status_code": int(response.status_code),
|
||||
"ok": response.status_code == 200,
|
||||
}
|
||||
)
|
||||
except Exception as exc:
|
||||
results.append({"seed_peer_url": normalized, "ok": False, "detail": str(exc)})
|
||||
ok = any(bool(item.get("ok")) for item in results)
|
||||
return {"ok": ok, "peer_url": peer_url, "results": results}
|
||||
|
||||
|
||||
def push_infonet_events_to_http_peers(events: list[dict[str, Any]]) -> dict[str, Any]:
|
||||
import hashlib as _hashlib_mod
|
||||
import hmac as _hmac_mod
|
||||
import requests
|
||||
|
||||
from main import (
|
||||
_filter_infonet_peer_urls,
|
||||
_infonet_peer_requests_proxies,
|
||||
_local_infonet_peer_url,
|
||||
_participant_node_enabled,
|
||||
_record_public_push_result,
|
||||
)
|
||||
from services.mesh.mesh_router import authenticated_push_peer_urls
|
||||
|
||||
if not _participant_node_enabled() or not events:
|
||||
return {"ok": False, "detail": "nothing to push"}
|
||||
peers = _filter_infonet_peer_urls(authenticated_push_peer_urls())
|
||||
if not peers:
|
||||
return {"ok": False, "detail": "no push peers configured"}
|
||||
|
||||
sender_url = _local_infonet_peer_url()
|
||||
peer_key = resolve_peer_key_for_url(sender_url)
|
||||
if not peer_key:
|
||||
return {"ok": False, "detail": "peer HMAC secret is not configured"}
|
||||
|
||||
body_bytes = json.dumps(
|
||||
{"events": events},
|
||||
sort_keys=True,
|
||||
separators=(",", ":"),
|
||||
ensure_ascii=False,
|
||||
).encode("utf-8")
|
||||
hmac_hex = _hmac_mod.new(peer_key, body_bytes, _hashlib_mod.sha256).hexdigest()
|
||||
timeout = int(getattr(get_settings(), "MESH_RELAY_PUSH_TIMEOUT_S", 0) or 45)
|
||||
results: list[dict[str, Any]] = []
|
||||
for peer_url in peers:
|
||||
normalized = normalize_peer_url(peer_url)
|
||||
if not normalized:
|
||||
continue
|
||||
proxies = _infonet_peer_requests_proxies(normalized)
|
||||
request_kwargs: dict[str, Any] = {
|
||||
"data": body_bytes,
|
||||
"headers": {
|
||||
"Content-Type": "application/json",
|
||||
"X-Peer-Url": sender_url,
|
||||
"X-Peer-HMAC": hmac_hex,
|
||||
},
|
||||
"timeout": timeout,
|
||||
}
|
||||
if proxies:
|
||||
request_kwargs["proxies"] = proxies
|
||||
try:
|
||||
response = requests.post(f"{normalized}/api/mesh/infonet/peer-push", **request_kwargs)
|
||||
results.append(
|
||||
{
|
||||
"peer_url": normalized,
|
||||
"ok": response.status_code == 200,
|
||||
"status_code": int(response.status_code),
|
||||
}
|
||||
)
|
||||
except Exception as exc:
|
||||
results.append({"peer_url": normalized, "ok": False, "detail": str(exc)})
|
||||
ok = any(bool(item.get("ok")) for item in results)
|
||||
event_id = str((events[-1] or {}).get("event_id", "") or "")
|
||||
_record_public_push_result(
|
||||
event_id,
|
||||
ok=ok,
|
||||
error="" if ok else "immediate peer push failed",
|
||||
results=results,
|
||||
)
|
||||
return {"ok": ok, "results": results}
|
||||
@@ -402,6 +402,57 @@ def test_public_sync_cycle_allows_first_node_without_peers(tmp_path, monkeypatch
|
||||
assert result.consecutive_failures == 0
|
||||
|
||||
|
||||
def test_sync_from_peer_explains_stale_genesis_chain(monkeypatch):
|
||||
import main
|
||||
from services.mesh import mesh_hashchain
|
||||
|
||||
class FakeInfonet:
|
||||
events = []
|
||||
head_hash = mesh_hashchain.GENESIS_HASH
|
||||
|
||||
def get_locator(self):
|
||||
return [mesh_hashchain.GENESIS_HASH]
|
||||
|
||||
def ingest_events(self, events):
|
||||
return {
|
||||
"accepted": 0,
|
||||
"duplicates": 0,
|
||||
"rejected": [
|
||||
{"index": 0, "reason": "Event timestamp outside freshness window"},
|
||||
{"index": 1, "reason": "prev_hash does not match head"},
|
||||
],
|
||||
}
|
||||
|
||||
stale_events = [
|
||||
{
|
||||
"event_id": "old-1",
|
||||
"prev_hash": mesh_hashchain.GENESIS_HASH,
|
||||
"event_type": "message",
|
||||
"timestamp": 1,
|
||||
},
|
||||
{
|
||||
"event_id": "old-2",
|
||||
"prev_hash": "old-1",
|
||||
"event_type": "message",
|
||||
"timestamp": 2,
|
||||
},
|
||||
]
|
||||
|
||||
monkeypatch.setattr(mesh_hashchain, "infonet", FakeInfonet())
|
||||
monkeypatch.setattr(main, "_peer_sync_response", lambda *_args, **_kwargs: {"events": stale_events})
|
||||
monkeypatch.setattr(main, "_hydrate_gate_store_from_chain", lambda *_args, **_kwargs: None)
|
||||
monkeypatch.setattr(main, "_hydrate_dm_relay_from_chain", lambda *_args, **_kwargs: None)
|
||||
|
||||
ok, error, forked, retry_after_s = main._sync_from_peer("https://node.shadowbroker.info")
|
||||
|
||||
assert ok is False
|
||||
assert forked is False
|
||||
assert retry_after_s == 0
|
||||
assert "Event timestamp outside freshness window" in error
|
||||
assert "expired genesis chain" in error
|
||||
assert "MESH_INGEST_EVENT_MAX_AGE_S=0" in error
|
||||
|
||||
|
||||
def test_headless_mesh_node_runtime_is_explicit(monkeypatch):
|
||||
import main
|
||||
|
||||
|
||||
@@ -0,0 +1,219 @@
|
||||
import json
|
||||
import time
|
||||
|
||||
import pytest
|
||||
from httpx import ASGITransport, AsyncClient
|
||||
|
||||
from services.mesh.mesh_bootstrap_manifest import (
|
||||
BootstrapManifestError,
|
||||
generate_bootstrap_signer,
|
||||
parse_bootstrap_manifest_dict,
|
||||
write_signed_bootstrap_manifest,
|
||||
)
|
||||
from services.mesh.mesh_peer_registry import PeerRegistry
|
||||
from services.mesh.mesh_peer_store import DEFAULT_PEER_STORE_PATH, PeerStore
|
||||
from services.mesh.mesh_swarm_runtime import (
|
||||
merge_manifest_into_peer_store,
|
||||
peer_registry_enabled,
|
||||
publish_registry_manifest,
|
||||
record_peer_announcement,
|
||||
)
|
||||
|
||||
|
||||
def test_peer_registry_upsert_and_prune(tmp_path, monkeypatch):
|
||||
registry_path = tmp_path / "peer_registry.json"
|
||||
monkeypatch.setattr(
|
||||
"services.mesh.mesh_peer_registry.DEFAULT_PEER_REGISTRY_PATH",
|
||||
registry_path,
|
||||
)
|
||||
registry = PeerRegistry(registry_path)
|
||||
peer = registry.upsert_announcement(
|
||||
peer_url="http://abc123.onion:8000",
|
||||
transport="onion",
|
||||
role="participant",
|
||||
node_id="!sb_test",
|
||||
now=1_750_000_000,
|
||||
)
|
||||
registry.save()
|
||||
assert peer.peer_url == "http://abc123.onion:8000"
|
||||
assert registry.prune_stale(max_age_s=3600, now=1_750_000_500) == 0
|
||||
assert registry.prune_stale(max_age_s=60, now=1_750_010_000) == 1
|
||||
|
||||
|
||||
def test_publish_registry_manifest_round_trip(tmp_path, monkeypatch):
|
||||
signer = generate_bootstrap_signer()
|
||||
manifest_path = tmp_path / "bootstrap_peers.json"
|
||||
registry_path = tmp_path / "peer_registry.json"
|
||||
monkeypatch.setenv("MESH_BOOTSTRAP_SIGNER_PUBLIC_KEY", signer["public_key_b64"])
|
||||
monkeypatch.setenv("MESH_BOOTSTRAP_SIGNER_PRIVATE_KEY", signer["private_key_b64"])
|
||||
monkeypatch.setenv("MESH_PEER_REGISTRY_ENABLED", "true")
|
||||
monkeypatch.setenv(
|
||||
"MESH_BOOTSTRAP_SEED_PEERS",
|
||||
"http://seedpeer.onion:8000",
|
||||
)
|
||||
monkeypatch.setenv("MESH_BOOTSTRAP_MANIFEST_PATH", str(manifest_path))
|
||||
monkeypatch.setattr(
|
||||
"services.mesh.mesh_peer_registry.DEFAULT_PEER_REGISTRY_PATH",
|
||||
registry_path,
|
||||
)
|
||||
from services.config import get_settings
|
||||
|
||||
get_settings.cache_clear()
|
||||
try:
|
||||
assert peer_registry_enabled() is True
|
||||
manifest = publish_registry_manifest(now=1_750_000_000, persist=True)
|
||||
assert manifest_path.exists()
|
||||
parsed = parse_bootstrap_manifest_dict(
|
||||
json.loads(manifest_path.read_text(encoding="utf-8")),
|
||||
signer_public_key_b64=signer["public_key_b64"],
|
||||
now=1_750_000_000,
|
||||
)
|
||||
assert parsed.signer_id == manifest.signer_id
|
||||
assert any(peer.role == "seed" for peer in parsed.peers)
|
||||
finally:
|
||||
get_settings.cache_clear()
|
||||
|
||||
|
||||
def test_record_peer_announcement_updates_store(tmp_path, monkeypatch):
|
||||
signer = generate_bootstrap_signer()
|
||||
registry_path = tmp_path / "peer_registry.json"
|
||||
peer_store_path = tmp_path / "peer_store.json"
|
||||
manifest_path = tmp_path / "bootstrap_peers.json"
|
||||
monkeypatch.setenv("MESH_BOOTSTRAP_SIGNER_PUBLIC_KEY", signer["public_key_b64"])
|
||||
monkeypatch.setenv("MESH_BOOTSTRAP_SIGNER_PRIVATE_KEY", signer["private_key_b64"])
|
||||
monkeypatch.setenv("MESH_PEER_REGISTRY_ENABLED", "true")
|
||||
monkeypatch.setenv("MESH_BOOTSTRAP_MANIFEST_PATH", str(manifest_path))
|
||||
monkeypatch.setenv("MESH_BOOTSTRAP_SEED_PEERS", "http://seedpeer.onion:8000")
|
||||
monkeypatch.setattr(
|
||||
"services.mesh.mesh_peer_registry.DEFAULT_PEER_REGISTRY_PATH",
|
||||
registry_path,
|
||||
)
|
||||
monkeypatch.setattr("services.mesh.mesh_peer_store.DEFAULT_PEER_STORE_PATH", peer_store_path)
|
||||
monkeypatch.setattr("services.mesh.mesh_swarm_runtime.DEFAULT_PEER_STORE_PATH", peer_store_path)
|
||||
from services.config import get_settings
|
||||
|
||||
get_settings.cache_clear()
|
||||
try:
|
||||
peer = record_peer_announcement(
|
||||
{
|
||||
"peer_url": "http://participant.onion:8000",
|
||||
"transport": "onion",
|
||||
"role": "participant",
|
||||
},
|
||||
now=1_750_000_000,
|
||||
)
|
||||
assert peer.peer_url == "http://participant.onion:8000"
|
||||
store = PeerStore(peer_store_path)
|
||||
store.load()
|
||||
buckets = {record.bucket for record in store.records()}
|
||||
assert buckets == {"push", "sync"}
|
||||
assert any(record.source == "swarm" for record in store.records())
|
||||
finally:
|
||||
get_settings.cache_clear()
|
||||
|
||||
|
||||
def test_merge_manifest_into_peer_store(tmp_path, monkeypatch):
|
||||
signer = generate_bootstrap_signer()
|
||||
peer_store_path = tmp_path / "peer_store.json"
|
||||
manifest_path = tmp_path / "bootstrap_peers.json"
|
||||
monkeypatch.setenv("MESH_BOOTSTRAP_SIGNER_PUBLIC_KEY", signer["public_key_b64"])
|
||||
monkeypatch.setenv("MESH_BOOTSTRAP_SIGNER_PRIVATE_KEY", signer["private_key_b64"])
|
||||
monkeypatch.setattr("services.mesh.mesh_peer_store.DEFAULT_PEER_STORE_PATH", peer_store_path)
|
||||
monkeypatch.setattr("services.mesh.mesh_swarm_runtime.DEFAULT_PEER_STORE_PATH", peer_store_path)
|
||||
manifest = write_signed_bootstrap_manifest(
|
||||
manifest_path,
|
||||
signer_id="test-signer",
|
||||
signer_private_key_b64=signer["private_key_b64"],
|
||||
peers=[
|
||||
{
|
||||
"peer_url": "http://relay.onion:8000",
|
||||
"transport": "onion",
|
||||
"role": "relay",
|
||||
"label": "relay-a",
|
||||
}
|
||||
],
|
||||
issued_at=1_750_000_000,
|
||||
valid_until=1_750_360_000,
|
||||
)
|
||||
merged = merge_manifest_into_peer_store(manifest, now=1_750_000_000)
|
||||
assert merged == 1
|
||||
store = PeerStore(peer_store_path)
|
||||
store.load()
|
||||
assert len(store.records()) == 2
|
||||
|
||||
|
||||
def test_parse_bootstrap_manifest_dict_rejects_expired():
|
||||
signer = generate_bootstrap_signer()
|
||||
manifest_path = None
|
||||
payload = {
|
||||
"version": 1,
|
||||
"issued_at": 1,
|
||||
"valid_until": 2,
|
||||
"signer_id": "test",
|
||||
"peers": [
|
||||
{
|
||||
"peer_url": "http://seedpeer.onion:8000",
|
||||
"transport": "onion",
|
||||
"role": "seed",
|
||||
}
|
||||
],
|
||||
}
|
||||
from services.mesh.mesh_bootstrap_manifest import build_bootstrap_manifest_payload, sign_bootstrap_manifest_payload
|
||||
|
||||
signed_payload = build_bootstrap_manifest_payload(
|
||||
signer_id="test",
|
||||
peers=payload["peers"],
|
||||
issued_at=1,
|
||||
valid_until=2,
|
||||
)
|
||||
signature = sign_bootstrap_manifest_payload(
|
||||
signed_payload,
|
||||
signer_private_key_b64=signer["private_key_b64"],
|
||||
)
|
||||
raw = dict(signed_payload)
|
||||
raw["signature"] = signature
|
||||
with pytest.raises(BootstrapManifestError, match="expired"):
|
||||
parse_bootstrap_manifest_dict(
|
||||
raw,
|
||||
signer_public_key_b64=signer["public_key_b64"],
|
||||
now=time.time(),
|
||||
)
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_bootstrap_manifest_endpoint_serves_live_registry(tmp_path, monkeypatch):
|
||||
import main
|
||||
|
||||
signer = generate_bootstrap_signer()
|
||||
registry_path = tmp_path / "peer_registry.json"
|
||||
manifest_path = tmp_path / "bootstrap_peers.json"
|
||||
monkeypatch.setenv("MESH_BOOTSTRAP_SIGNER_PUBLIC_KEY", signer["public_key_b64"])
|
||||
monkeypatch.setenv("MESH_BOOTSTRAP_SIGNER_PRIVATE_KEY", signer["private_key_b64"])
|
||||
monkeypatch.setenv("MESH_PEER_REGISTRY_ENABLED", "true")
|
||||
monkeypatch.setenv("MESH_BOOTSTRAP_MANIFEST_PATH", str(manifest_path))
|
||||
monkeypatch.setenv("MESH_BOOTSTRAP_SEED_PEERS", "http://seedpeer.onion:8000")
|
||||
monkeypatch.setattr("services.mesh.mesh_peer_registry.DEFAULT_PEER_REGISTRY_PATH", registry_path)
|
||||
from services.config import get_settings
|
||||
|
||||
get_settings.cache_clear()
|
||||
try:
|
||||
now = int(time.time())
|
||||
record_peer_announcement(
|
||||
{
|
||||
"peer_url": "http://participant.onion:8000",
|
||||
"transport": "onion",
|
||||
"role": "participant",
|
||||
},
|
||||
now=now,
|
||||
)
|
||||
async with AsyncClient(transport=ASGITransport(app=main.app), base_url="http://test") as ac:
|
||||
response = await ac.get("/api/mesh/infonet/bootstrap-manifest")
|
||||
assert response.status_code == 200
|
||||
body = response.json()
|
||||
assert body["ok"] is True
|
||||
manifest = body["manifest"]
|
||||
peer_urls = [peer["peer_url"] for peer in manifest["peers"]]
|
||||
assert "http://participant.onion:8000" in peer_urls
|
||||
assert "http://seedpeer.onion:8000" in peer_urls
|
||||
finally:
|
||||
get_settings.cache_clear()
|
||||
@@ -4,6 +4,21 @@ services:
|
||||
build:
|
||||
context: .
|
||||
dockerfile: ./backend/Dockerfile
|
||||
environment:
|
||||
# Private Infonet swarm: only the seed onion is required in config.
|
||||
# Other participants (e.g. Pete) are discovered via signed manifest pull.
|
||||
MESH_ARTI_ENABLED: "true"
|
||||
MESH_ARTI_SOCKS_PORT: "9050"
|
||||
MESH_SYNC_TIMEOUT_S: "45"
|
||||
MESH_RELAY_PUSH_TIMEOUT_S: "45"
|
||||
MESH_SYNC_MAX_PEERS_PER_CYCLE: "5"
|
||||
MESH_INFONET_ALLOW_CLEARNET_SYNC: "false"
|
||||
MESH_PUBLIC_PEER_URL: ""
|
||||
MESH_BOOTSTRAP_SEED_PEERS: "http://gqpbunqbgtkcqilvclm3xrkt3zowjyl3s62kkktvojgvxzizamvbrqid.onion:8000"
|
||||
MESH_RELAY_PEERS: ""
|
||||
MESH_DEFAULT_SYNC_PEERS: ""
|
||||
MESH_BOOTSTRAP_SIGNER_PUBLIC_KEY: "ul1d0kj/ODPIp0OhHzX8eLAVXzJ3CVvzW1vn2IC6q3I="
|
||||
MESH_SWARM_MANIFEST_PULL_INTERVAL_S: "300"
|
||||
|
||||
frontend:
|
||||
build:
|
||||
|
||||
@@ -60,6 +60,10 @@ export default function BootstrapView({ marketId, onBack }: BootstrapViewProps)
|
||||
nodeStatus?.bootstrap?.bootstrap_seed_peer_count ?? nodeStatus?.bootstrap?.default_sync_peer_count ?? 0,
|
||||
);
|
||||
const syncPeerCount = Number(nodeStatus?.bootstrap?.sync_peer_count || 0);
|
||||
const swarmSyncPeerCount = Number(nodeStatus?.bootstrap?.swarm_sync_peer_count || 0);
|
||||
const manifestLoaded = Boolean(nodeStatus?.bootstrap?.manifest_loaded);
|
||||
const swarmPull = nodeStatus?.bootstrap?.swarm_manifest_pull;
|
||||
const swarmPullOk = Boolean(swarmPull?.ok) && !swarmPull?.skipped;
|
||||
const lastPeerUrl = String(nodeStatus?.sync_runtime?.last_peer_url || '').trim();
|
||||
const privateTransportRequired = Boolean(nodeStatus?.private_transport_required);
|
||||
|
||||
@@ -146,7 +150,7 @@ export default function BootstrapView({ marketId, onBack }: BootstrapViewProps)
|
||||
Refresh
|
||||
</button>
|
||||
</div>
|
||||
<div className="grid grid-cols-1 md:grid-cols-3 gap-2 text-xs">
|
||||
<div className="grid grid-cols-1 md:grid-cols-2 xl:grid-cols-4 gap-2 text-xs">
|
||||
<div>
|
||||
<div className="text-gray-500">Transport</div>
|
||||
<div className="text-cyan-300 font-mono break-all">
|
||||
@@ -163,6 +167,19 @@ export default function BootstrapView({ marketId, onBack }: BootstrapViewProps)
|
||||
<div className="text-gray-500">Sync Path</div>
|
||||
<div className="text-white font-mono">
|
||||
{syncPeerCount} peers / {seedPeerCount} seeds
|
||||
{swarmSyncPeerCount > 0 ? ` (${swarmSyncPeerCount} swarm)` : ''}
|
||||
</div>
|
||||
</div>
|
||||
<div>
|
||||
<div className="text-gray-500">Swarm Manifest</div>
|
||||
<div className={swarmPullOk || manifestLoaded ? 'text-green-400' : 'text-amber-400'}>
|
||||
{swarmPullOk
|
||||
? `LIVE (${Number(swarmPull?.merged_peer_count || swarmPull?.peer_count || 0)} peers)`
|
||||
: manifestLoaded
|
||||
? 'LOCAL FILE'
|
||||
: swarmPull?.skipped
|
||||
? 'WAITING'
|
||||
: 'NOT LOADED'}
|
||||
</div>
|
||||
</div>
|
||||
</div>
|
||||
|
||||
@@ -92,7 +92,7 @@ export default function NetworkStats() {
|
||||
<span className="text-gray-700">|</span>
|
||||
<span>EVENTS <span className="text-white">{stats.infonetEvents}</span></span>
|
||||
<span className="text-gray-700">|</span>
|
||||
<span title="Configured peers this node pulls from. Usually this is just the seed unless another device is added as a sync peer.">
|
||||
<span title="Peers this node syncs from (seed + swarm-discovered participants).">
|
||||
SYNC PEERS <span className="text-white">{stats.syncPeers}</span>
|
||||
</span>
|
||||
{stats.seedPeers > stats.syncPeers ? (
|
||||
|
||||
@@ -25,6 +25,17 @@ export interface InfonetBootstrapSnapshot {
|
||||
bootstrap_seed_peer_count?: number;
|
||||
default_sync_peer_count?: number;
|
||||
last_bootstrap_error?: string;
|
||||
swarm_sync_peer_count?: number;
|
||||
swarm_push_peer_count?: number;
|
||||
swarm_manifest_pull?: {
|
||||
ok?: boolean;
|
||||
skipped?: boolean;
|
||||
reason?: string;
|
||||
detail?: string;
|
||||
peer_count?: number;
|
||||
merged_peer_count?: number;
|
||||
seed_peer_url?: string;
|
||||
};
|
||||
}
|
||||
|
||||
export interface InfonetSyncRuntimeSnapshot {
|
||||
|
||||
Reference in New Issue
Block a user