From c266c5ff5e708ce908a5709c111b8515b3fd2cf8 Mon Sep 17 00:00:00 2001 From: BigBodyCobain <43977454+BigBodyCobain@users.noreply.github.com> Date: Fri, 12 Jun 2026 03:09:02 -0600 Subject: [PATCH] Close v1 swarm: fresh-participant smoke test, join retries, README fleet note. Retry announce/manifest while Tor circuits warm on NODE and startup bootstrap. Add verify_swarm_fresh_participant.py for empty-volume GHCR smoke tests. Co-authored-by: Cursor --- README.md | 2 + backend/main.py | 5 +- backend/services/mesh/mesh_swarm_runtime.py | 46 ++++ backend/services/openclaw_infonet.py | 29 +-- backend/tests/mesh/test_swarm_join_retries.py | 25 ++ scripts/verify_swarm_fresh_participant.py | 224 ++++++++++++++++++ 6 files changed, 312 insertions(+), 19 deletions(-) create mode 100644 backend/tests/mesh/test_swarm_join_retries.py create mode 100644 scripts/verify_swarm_fresh_participant.py diff --git a/README.md b/README.md index 1225cf6..e4ddce2 100644 --- a/README.md +++ b/README.md @@ -91,6 +91,8 @@ Both paths produce identical containers — same source, same CI, same images by Open `http://localhost:3000` to view the dashboard! *(Requires [Docker Desktop](https://www.docker.com/products/docker-desktop/) or Docker Engine)* +> **Join the private InfoNet swarm (sb-testnet-0):** Click **NODE** in the dashboard, or run `./meshnode.sh` for a headless participant. No manual peer list — fleet defaults discover the seed and pull the signed manifest automatically. Set `MESH_INFONET_FLEET_JOIN=false` in `.env` for a private solo node. + > **Backend port already in use?** The browser only needs port `3000`, but the backend API is also published on host port `8000` for local diagnostics. If another app already uses `8000`, create or edit `.env` next to `docker-compose.yml` and set `BACKEND_PORT=8001`, then run `docker compose up -d`. > **Blank news/UAP/bases/wastewater after several minutes?** Check for backend OOM restarts with `docker events --since 30m --filter container=shadowbroker-backend --filter event=oom`. The default compose file gives the backend 4GB; if your host has less memory, reduce enabled feeds or set `BACKEND_MEMORY_LIMIT=3G` and expect slower/heavier layers to warm more gradually. diff --git a/backend/main.py b/backend/main.py index b9e0933..1f32f24 100644 --- a/backend/main.py +++ b/backend/main.py @@ -1469,10 +1469,9 @@ def _refresh_node_peer_store(*, now: float | None = None) -> dict[str, Any]: def _swarm_bootstrap_after_transport_ready() -> None: try: - from services.mesh.mesh_swarm_runtime import announce_local_peer_to_seeds, refresh_swarm_manifest_from_seeds + from services.mesh.mesh_swarm_runtime import join_swarm_with_retries - announce_local_peer_to_seeds(force=True) - refresh_swarm_manifest_from_seeds(force=True) + join_swarm_with_retries(attempts=4, delay_s=15.0, force=True) _refresh_node_peer_store() except Exception: logger.warning("swarm bootstrap after transport ready failed", exc_info=True) diff --git a/backend/services/mesh/mesh_swarm_runtime.py b/backend/services/mesh/mesh_swarm_runtime.py index 9d5f383..3e9901f 100644 --- a/backend/services/mesh/mesh_swarm_runtime.py +++ b/backend/services/mesh/mesh_swarm_runtime.py @@ -389,6 +389,52 @@ def announce_local_peer_to_seeds(*, now: float | None = None, force: bool = Fals return {"ok": ok, "peer_url": peer_url, "results": results} +def _announce_succeeded(announce: dict[str, Any]) -> bool: + if not bool(announce.get("ok")): + return False + results = announce.get("results") or [] + return any(bool(item.get("ok")) and int(item.get("status_code") or 0) == 200 for item in results) + + +def _manifest_succeeded(manifest: dict[str, Any]) -> bool: + if not bool(manifest.get("ok")): + return False + peer_count = int(manifest.get("merged_peer_count") or manifest.get("peer_count") or 0) + return peer_count >= 1 + + +def join_swarm_with_retries( + *, + attempts: int = 6, + delay_s: float = 15.0, + force: bool = True, +) -> dict[str, Any]: + """Announce to seed and pull manifest, retrying while Tor circuits warm up.""" + last_announce: dict[str, Any] = {"ok": False, "detail": "not attempted"} + last_manifest: dict[str, Any] = {"ok": False, "detail": "not attempted"} + tries = max(1, int(attempts)) + pause_s = max(1.0, float(delay_s)) + for attempt in range(tries): + last_announce = announce_local_peer_to_seeds(force=force) + last_manifest = refresh_swarm_manifest_from_seeds(force=force) + if _announce_succeeded(last_announce) and _manifest_succeeded(last_manifest): + return { + "ok": True, + "attempts": attempt + 1, + "announce": last_announce, + "manifest_pull": last_manifest, + } + if attempt + 1 < tries: + time.sleep(pause_s) + return { + "ok": False, + "attempts": tries, + "announce": last_announce, + "manifest_pull": last_manifest, + "detail": "swarm join incomplete after retries", + } + + def push_infonet_events_to_http_peers(events: list[dict[str, Any]]) -> dict[str, Any]: import hashlib as _hashlib_mod import hmac as _hmac_mod diff --git a/backend/services/openclaw_infonet.py b/backend/services/openclaw_infonet.py index 1ce69a4..1e28075 100644 --- a/backend/services/openclaw_infonet.py +++ b/backend/services/openclaw_infonet.py @@ -47,10 +47,7 @@ def ensure_infonet_ready(*, join_swarm: bool = True) -> dict[str, Any]: """Warm Tor, enable the participant node, and optionally join the swarm.""" from routers.ai_intel import _write_env_value from services.config import get_settings - from services.mesh.mesh_swarm_runtime import ( - announce_local_peer_to_seeds, - refresh_swarm_manifest_from_seeds, - ) + from services.mesh.mesh_swarm_runtime import join_swarm_with_retries from services.node_settings import read_node_settings, write_node_settings from services.tor_hidden_service import tor_service from services.wormhole_supervisor import _check_arti_ready @@ -87,9 +84,11 @@ def ensure_infonet_ready(*, join_swarm: bool = True) -> dict[str, Any]: steps["node_enabled"] = True if join_swarm: - steps["announce"] = announce_local_peer_to_seeds(force=True) - steps["manifest_pull"] = refresh_swarm_manifest_from_seeds(force=True) - ok = bool(steps["announce"].get("ok")) or bool(steps["manifest_pull"].get("ok")) + joined = join_swarm_with_retries() + steps["announce"] = joined.get("announce") or {} + steps["manifest_pull"] = joined.get("manifest_pull") or {} + steps["swarm_attempts"] = joined.get("attempts") + ok = bool(joined.get("ok")) else: ok = True @@ -102,17 +101,15 @@ def ensure_infonet_ready(*, join_swarm: bool = True) -> dict[str, Any]: def join_infonet_swarm() -> dict[str, Any]: - from services.mesh.mesh_swarm_runtime import ( - announce_local_peer_to_seeds, - refresh_swarm_manifest_from_seeds, - ) + from services.mesh.mesh_swarm_runtime import join_swarm_with_retries - announce = announce_local_peer_to_seeds(force=True) - manifest = refresh_swarm_manifest_from_seeds(force=True) + joined = join_swarm_with_retries() return { - "ok": bool(announce.get("ok")) or bool(manifest.get("ok")), - "announce": announce, - "manifest_pull": manifest, + "ok": bool(joined.get("ok")), + "announce": joined.get("announce") or {}, + "manifest_pull": joined.get("manifest_pull") or {}, + "attempts": joined.get("attempts"), + "detail": joined.get("detail"), } diff --git a/backend/tests/mesh/test_swarm_join_retries.py b/backend/tests/mesh/test_swarm_join_retries.py new file mode 100644 index 0000000..de27275 --- /dev/null +++ b/backend/tests/mesh/test_swarm_join_retries.py @@ -0,0 +1,25 @@ +from services.mesh import mesh_swarm_runtime as swarm + + +def test_join_swarm_with_retries_succeeds_on_second_attempt(monkeypatch): + calls = {"n": 0} + + def fake_announce(*, force=True): + calls["n"] += 1 + if calls["n"] < 2: + return {"ok": False, "results": [{"ok": False, "status_code": 503}]} + return {"ok": True, "results": [{"ok": True, "status_code": 200}]} + + def fake_manifest(*, force=True, now=None): + if calls["n"] < 2: + return {"ok": False, "detail": "manifest fetch failed"} + return {"ok": True, "peer_count": 3, "merged_peer_count": 3} + + monkeypatch.setattr(swarm, "announce_local_peer_to_seeds", fake_announce) + monkeypatch.setattr(swarm, "refresh_swarm_manifest_from_seeds", fake_manifest) + monkeypatch.setattr(swarm.time, "sleep", lambda _s: None) + + joined = swarm.join_swarm_with_retries(attempts=3, delay_s=1.0) + + assert joined["ok"] is True + assert joined["attempts"] == 2 diff --git a/scripts/verify_swarm_fresh_participant.py b/scripts/verify_swarm_fresh_participant.py new file mode 100644 index 0000000..78eea12 --- /dev/null +++ b/scripts/verify_swarm_fresh_participant.py @@ -0,0 +1,224 @@ +#!/usr/bin/env python3 +"""Fresh-install v1 swarm smoke test: GHCR image + empty data volume only. + +Simulates a new operator who runs the published backend image with fleet +defaults and no hand-edited peer list. Equivalent to NODE / meshnode.sh join. +""" + +from __future__ import annotations + +import json +import os +import subprocess +import sys +import time +import urllib.error +import urllib.request + +IMAGE = os.environ.get( + "SWARM_FRESH_IMAGE", + "ghcr.io/bigbodycobain/shadowbroker-backend:latest", +) +CONTAINER = os.environ.get("SWARM_FRESH_CONTAINER", "swarm-fresh-smoke") +VOLUME = os.environ.get("SWARM_FRESH_VOLUME", "swarm_fresh_smoke_data") +HOST_PORT = int(os.environ.get("SWARM_FRESH_PORT", "18001")) +API = f"http://127.0.0.1:{HOST_PORT}" +MARKER = os.environ.get("SWARM_FRESH_MARKER", f"FRESH-SWARM-{int(time.time())}") +KEEP = os.environ.get("SWARM_FRESH_KEEP", "") == "1" + + +def run(cmd: list[str], *, check: bool = True, timeout: int = 600) -> subprocess.CompletedProcess: + print("+", " ".join(cmd)) + proc = subprocess.run(cmd, capture_output=True, text=True, timeout=timeout, check=False) + if check and proc.returncode != 0: + raise RuntimeError(proc.stderr.strip() or proc.stdout.strip() or f"command failed: {cmd}") + return proc + + +def http_json(method: str, path: str, body: dict | None = None, *, timeout: int = 30) -> dict: + data = None + headers = {"Content-Type": "application/json"} + if body is not None: + data = json.dumps(body, separators=(",", ":"), sort_keys=True).encode("utf-8") + req = urllib.request.Request(f"{API}{path}", data=data, headers=headers, method=method.upper()) + with urllib.request.urlopen(req, timeout=timeout) as resp: + return json.loads(resp.read().decode("utf-8")) + + +def docker_python(code: str) -> str: + proc = run( + ["docker", "exec", CONTAINER, "python", "-c", code], + timeout=300, + ) + return proc.stdout.strip() + + +def wait_healthy(timeout_s: int = 180) -> None: + deadline = time.time() + timeout_s + last_error = "backend not healthy" + while time.time() < deadline: + try: + with urllib.request.urlopen(f"{API}/api/health", timeout=10) as resp: + if resp.status == 200: + print("PASS: fresh container health") + return + except Exception as exc: + last_error = str(exc) + time.sleep(3) + raise RuntimeError(last_error) + + +def step_start_fresh_container() -> None: + run(["docker", "rm", "-f", CONTAINER], check=False) + run(["docker", "volume", "rm", VOLUME], check=False) + run( + [ + "docker", + "run", + "-d", + "--name", + CONTAINER, + "-p", + f"127.0.0.1:{HOST_PORT}:8000", + "-v", + f"{VOLUME}:/app/data", + IMAGE, + ], + timeout=120, + ) + print(f"Started {CONTAINER} on {API} with empty volume {VOLUME}") + + +def step_fleet_defaults() -> None: + out = docker_python( + "import json; " + "from services.mesh.mesh_fleet_defaults import infonet_fleet_join_enabled, FLEET_SEED_ONION_URL; " + "from services.config import get_settings; " + "print(json.dumps({" + "'fleet_join': infonet_fleet_join_enabled(), " + "'seed': FLEET_SEED_ONION_URL, " + "'arti': bool(get_settings().MESH_ARTI_ENABLED), " + "'push_secret_configured': bool(__import__('services.mesh.mesh_fleet_defaults', fromlist=['effective_peer_push_secret']).effective_peer_push_secret())" + "}))" + ) + payload = json.loads(out) + if not payload.get("fleet_join"): + raise RuntimeError(f"fleet join disabled in fresh image: {payload}") + if not str(payload.get("seed") or "").endswith(".onion:8000"): + raise RuntimeError(f"unexpected fleet seed in fresh image: {payload}") + if not payload.get("push_secret_configured"): + raise RuntimeError(f"fleet HMAC not configured in fresh image: {payload}") + print("PASS: image ships sb-testnet fleet defaults (no env edits)") + + +def step_node_join_like_ui() -> dict: + """NODE / meshnode equivalent: warm Tor, enable node, announce, pull manifest.""" + code = r""" +import json +import time +from services.openclaw_infonet import ensure_infonet_ready +from services.mesh.mesh_swarm_runtime import announce_local_peer_to_seeds, refresh_swarm_manifest_from_seeds + +warm = ensure_infonet_ready(join_swarm=False) +if not warm.get("ok"): + print(json.dumps(warm)) + raise SystemExit(1) + +joined = {"ok": False, "announce": {}, "manifest_pull": {}} +try: + from services.mesh.mesh_swarm_runtime import join_swarm_with_retries + joined = join_swarm_with_retries() +except Exception: + for attempt in range(6): + announce = announce_local_peer_to_seeds(force=True) + manifest = refresh_swarm_manifest_from_seeds(force=True) + joined = { + "ok": False, + "attempts": attempt + 1, + "announce": announce, + "manifest_pull": manifest, + } + announce_ok = any( + int(r.get("status_code") or 0) == 200 + for r in (announce.get("results") or []) + if r.get("ok") + ) + manifest_ok = bool(manifest.get("ok")) and int( + manifest.get("merged_peer_count") or manifest.get("peer_count") or 0 + ) >= 1 + if announce_ok and manifest_ok: + joined["ok"] = True + break + time.sleep(15) + +warm["steps"]["announce"] = joined.get("announce") or {} +warm["steps"]["manifest_pull"] = joined.get("manifest_pull") or {} +warm["steps"]["swarm_attempts"] = joined.get("attempts") +warm["ok"] = bool(joined.get("ok")) +warm["detail"] = "Infonet participant runtime ready" if warm["ok"] else "swarm join incomplete" +print(json.dumps(warm)) +""" + payload = json.loads(docker_python(code)) + print("fresh join:", json.dumps(payload, indent=2)[:5000]) + if not payload.get("ok"): + raise RuntimeError(f"fresh NODE-equivalent join failed: {payload}") + + announce = (payload.get("steps") or {}).get("announce") or {} + manifest = (payload.get("steps") or {}).get("manifest_pull") or {} + attempts = (payload.get("steps") or {}).get("swarm_attempts") + results = announce.get("results") or [] + if not any(r.get("ok") and int(r.get("status_code") or 0) == 200 for r in results): + raise RuntimeError(f"seed announce did not return 200: {announce}") + peer_count = int(manifest.get("merged_peer_count") or manifest.get("peer_count") or 0) + if peer_count < 1: + raise RuntimeError(f"manifest pull returned no peers: {manifest}") + attempt_note = f" after {attempts} attempt(s)" if attempts else "" + print(f"PASS: announce 200 + manifest ({peer_count} peer(s)){attempt_note}") + return payload + + +def step_gate_message_visible() -> None: + marker = json.dumps(MARKER) + code = f""" +import json +from services.mesh.mesh_hashchain import infonet +from services.openclaw_infonet import post_gate_message + +before = len(infonet.events) +result = post_gate_message("infonet", {marker}) +after = len(infonet.events) +print(json.dumps({{"ok": bool(result.get("ok")), "before": before, "after": after, "result": result}})) +""" + payload = json.loads(docker_python(code)) + if not payload.get("ok"): + raise RuntimeError(f"gate post failed on fresh node: {payload}") + if int(payload.get("after") or 0) <= int(payload.get("before") or 0): + raise RuntimeError(f"gate message not appended locally: {payload}") + print(f"PASS: gate message '{MARKER}' visible in local infonet chain") + + +def cleanup() -> None: + if KEEP: + print(f"KEEP=1: leaving {CONTAINER} and volume {VOLUME} running") + return + run(["docker", "rm", "-f", CONTAINER], check=False) + run(["docker", "volume", "rm", VOLUME], check=False) + print("cleaned up fresh smoke container + volume") + + +def main() -> int: + try: + print(f"Fresh participant swarm smoke ({IMAGE})") + step_start_fresh_container() + wait_healthy() + step_fleet_defaults() + step_node_join_like_ui() + step_gate_message_visible() + print("ALL FRESH PARTICIPANT SWARM CHECKS PASSED") + return 0 + finally: + cleanup() + + +if __name__ == "__main__": + raise SystemExit(main())