Harden infonet control surfaces

This commit is contained in:
BigBodyCobain
2026-05-18 11:22:38 -06:00
parent 25a98a9869
commit 11ea345518
30 changed files with 1810 additions and 276 deletions
+36 -14
View File
@@ -3061,6 +3061,17 @@ def _request_private_surface_warmup(*, path: str, method: str, current_tier: str
)
def _is_invite_scoped_prekey_bundle_lookup(request: Request, path: str) -> bool:
if request.method.upper() != "GET" or str(path or "").strip() != "/api/mesh/dm/prekey-bundle":
return False
try:
lookup_token = str(request.query_params.get("lookup_token", "") or "").strip()
agent_id = str(request.query_params.get("agent_id", "") or "").strip()
except Exception:
return False
return bool(lookup_token) and not agent_id
def _resume_private_delivery_background_work(*, current_tier: str, reason: str) -> None:
pending_items = private_delivery_outbox.pending_items()
if not pending_items:
@@ -3191,6 +3202,17 @@ async def enforce_high_privacy_mesh(request: Request, call_next):
# transport has not finished coming up yet.
request.state._private_control_transport_pending = current_tier == "public_degraded"
request.state._private_lane_current_tier = current_tier
elif _is_invite_scoped_prekey_bundle_lookup(request, path):
# A copied DM address carries a high-entropy invite lookup
# handle. Returning the public prekey bundle for that
# handle is the bootstrap step that lets first contact get
# saved; blocking it behind the full private lane creates a
# circular warm-up failure. Stable agent_id lookup still
# follows the normal transport-tier policy.
request.state._invite_prekey_lookup_transport_pending = (
current_tier == "public_degraded"
)
request.state._private_lane_current_tier = current_tier
else:
# Tor-style: instead of failing, keep trying in the
# background and return an ok:True "preparing" response
@@ -3323,7 +3345,7 @@ async def force_refresh(request: Request):
return {"status": "refreshing in background"}
@app.post("/api/ais/feed")
@app.post("/api/ais/feed", dependencies=[Depends(require_local_operator)])
@limiter.limit("60/minute")
async def ais_feed(request: Request):
"""Accept AIS-catcher HTTP JSON feed (POST decoded AIS messages)."""
@@ -3418,7 +3440,7 @@ class LayerUpdate(BaseModel):
layers: dict[str, bool]
@app.post("/api/layers")
@app.post("/api/layers", dependencies=[Depends(require_local_operator)])
@limiter.limit("30/minute")
async def update_layers(update: LayerUpdate, request: Request):
"""Receive frontend layer toggle state. Starts/stops streams accordingly."""
@@ -9812,7 +9834,7 @@ async def api_wormhole_leave(request: Request):
}
@app.get("/api/wormhole/identity")
@app.get("/api/wormhole/identity", dependencies=[Depends(require_local_operator)])
@limiter.limit("30/minute")
async def api_wormhole_identity(request: Request):
try:
@@ -9825,7 +9847,7 @@ async def api_wormhole_identity(request: Request):
raise HTTPException(status_code=500, detail="wormhole_identity_failed") from exc
@app.post("/api/wormhole/identity/bootstrap")
@app.post("/api/wormhole/identity/bootstrap", dependencies=[Depends(require_local_operator)])
@limiter.limit("10/minute")
async def api_wormhole_identity_bootstrap(request: Request):
bootstrap_wormhole_identity()
@@ -10605,7 +10627,7 @@ async def api_wormhole_sign(request: Request, body: WormholeSignRequest):
)
@app.post("/api/wormhole/gate/enter")
@app.post("/api/wormhole/gate/enter", dependencies=[Depends(require_local_operator)])
@limiter.limit("20/minute")
async def api_wormhole_gate_enter(request: Request, body: WormholeGateRequest):
gate_id = str(body.gate_id or "")
@@ -10619,7 +10641,7 @@ async def api_wormhole_gate_enter(request: Request, body: WormholeGateRequest):
return result
@app.post("/api/wormhole/gate/leave")
@app.post("/api/wormhole/gate/leave", dependencies=[Depends(require_local_operator)])
@limiter.limit("20/minute")
async def api_wormhole_gate_leave(request: Request, body: WormholeGateRequest):
return leave_gate(str(body.gate_id or ""))
@@ -10661,7 +10683,7 @@ async def api_wormhole_gate_key_rotate(request: Request, body: WormholeGateRotat
return result
@app.post("/api/wormhole/gate/persona/create")
@app.post("/api/wormhole/gate/persona/create", dependencies=[Depends(require_local_operator)])
@limiter.limit("20/minute")
async def api_wormhole_gate_persona_create(
request: Request, body: WormholeGatePersonaCreateRequest
@@ -10677,7 +10699,7 @@ async def api_wormhole_gate_persona_create(
return result
@app.post("/api/wormhole/gate/persona/activate")
@app.post("/api/wormhole/gate/persona/activate", dependencies=[Depends(require_local_operator)])
@limiter.limit("20/minute")
async def api_wormhole_gate_persona_activate(
request: Request, body: WormholeGatePersonaActivateRequest
@@ -10693,7 +10715,7 @@ async def api_wormhole_gate_persona_activate(
return result
@app.post("/api/wormhole/gate/persona/clear")
@app.post("/api/wormhole/gate/persona/clear", dependencies=[Depends(require_local_operator)])
@limiter.limit("20/minute")
async def api_wormhole_gate_persona_clear(request: Request, body: WormholeGateRequest):
gate_id = str(body.gate_id or "")
@@ -10707,7 +10729,7 @@ async def api_wormhole_gate_persona_clear(request: Request, body: WormholeGateRe
return result
@app.post("/api/wormhole/gate/persona/retire")
@app.post("/api/wormhole/gate/persona/retire", dependencies=[Depends(require_local_operator)])
@limiter.limit("20/minute")
async def api_wormhole_gate_persona_retire(
request: Request, body: WormholeGatePersonaActivateRequest
@@ -10788,7 +10810,7 @@ async def api_wormhole_gate_message_compose(request: Request, body: WormholeGate
return composed
@app.post("/api/wormhole/gate/message/sign-encrypted")
@app.post("/api/wormhole/gate/message/sign-encrypted", dependencies=[Depends(require_local_operator)])
@limiter.limit("30/minute")
async def api_wormhole_gate_message_sign_encrypted(
request: Request,
@@ -11000,13 +11022,13 @@ async def api_wormhole_gate_messages_decrypt(request: Request, body: WormholeGat
return {"ok": True, "results": results}
@app.post("/api/wormhole/gate/state/export")
@app.post("/api/wormhole/gate/state/export", dependencies=[Depends(require_local_operator)])
@limiter.limit("30/minute")
async def api_wormhole_gate_state_export(request: Request, body: WormholeGateRequest):
return export_gate_state_snapshot_with_repair(str(body.gate_id or ""))
@app.post("/api/wormhole/gate/proof")
@app.post("/api/wormhole/gate/proof", dependencies=[Depends(require_local_operator)])
@limiter.limit("30/minute")
async def api_wormhole_gate_proof(request: Request, body: WormholeGateRequest):
proof = _sign_gate_access_proof(str(body.gate_id or ""))
@@ -11553,7 +11575,7 @@ async def api_wormhole_health(request: Request):
return _redact_wormhole_status(full_state, authenticated=ok)
@app.post("/api/wormhole/connect")
@app.post("/api/wormhole/connect", dependencies=[Depends(require_local_operator)])
@limiter.limit("10/minute")
async def api_wormhole_connect(request: Request):
settings = read_wormhole_settings()
+3 -4
View File
@@ -379,14 +379,13 @@ async def api_refresh_layer_feed(request: Request, layer_id: str):
# Agent Actions endpoint — frontend polls this for UI commands from the agent
# ---------------------------------------------------------------------------
@router.get("/api/ai/agent-actions")
@router.get("/api/ai/agent-actions", dependencies=[Depends(require_local_operator)])
@limiter.limit("120/minute")
async def get_agent_actions(request: Request):
"""Frontend polls for pending agent display actions (destructive read).
No auth required — this only contains display directives (show image,
fly to location), not sensitive data. The agent authenticates when
pushing actions through the command channel.
Local operator access is required because polling destructively drains
the shared operator action queue.
"""
actions = pop_agent_actions()
return {"ok": True, "actions": actions}
+2 -2
View File
@@ -266,7 +266,7 @@ async def force_refresh(request: Request):
return {"status": "refreshing in background"}
@router.post("/api/ais/feed")
@router.post("/api/ais/feed", dependencies=[Depends(require_local_operator)])
@limiter.limit("60/minute")
async def ais_feed(request: Request):
"""Accept AIS-catcher HTTP JSON feed (POST decoded AIS messages)."""
@@ -304,7 +304,7 @@ async def update_viewport(vp: ViewportUpdate, request: Request): # noqa: ARG001
return {"status": "ok"}
@router.post("/api/layers")
@router.post("/api/layers", dependencies=[Depends(require_local_operator)])
@limiter.limit("30/minute")
async def update_layers(update: LayerUpdate, request: Request):
"""Receive frontend layer toggle state. Starts/stops streams accordingly."""
+12 -12
View File
@@ -663,7 +663,7 @@ async def api_wormhole_leave(request: Request):
}
@router.get("/api/wormhole/identity")
@router.get("/api/wormhole/identity", dependencies=[Depends(require_local_operator)])
@limiter.limit("240/minute")
async def api_wormhole_identity(request: Request):
try:
@@ -674,7 +674,7 @@ async def api_wormhole_identity(request: Request):
raise HTTPException(status_code=500, detail="wormhole_identity_failed") from exc
@router.post("/api/wormhole/identity/bootstrap")
@router.post("/api/wormhole/identity/bootstrap", dependencies=[Depends(require_local_operator)])
@limiter.limit("10/minute")
async def api_wormhole_identity_bootstrap(request: Request):
bootstrap_wormhole_identity()
@@ -773,7 +773,7 @@ async def api_wormhole_sign(request: Request, body: WormholeSignRequest):
)
@router.post("/api/wormhole/gate/enter")
@router.post("/api/wormhole/gate/enter", dependencies=[Depends(require_local_operator)])
@limiter.limit("20/minute")
async def api_wormhole_gate_enter(request: Request, body: WormholeGateRequest):
gate_id = str(body.gate_id or "")
@@ -787,7 +787,7 @@ async def api_wormhole_gate_enter(request: Request, body: WormholeGateRequest):
return result
@router.post("/api/wormhole/gate/leave")
@router.post("/api/wormhole/gate/leave", dependencies=[Depends(require_local_operator)])
@limiter.limit("20/minute")
async def api_wormhole_gate_leave(request: Request, body: WormholeGateRequest):
return leave_gate(str(body.gate_id or ""))
@@ -829,7 +829,7 @@ async def api_wormhole_gate_key_rotate(request: Request, body: WormholeGateRotat
return result
@router.post("/api/wormhole/gate/persona/create")
@router.post("/api/wormhole/gate/persona/create", dependencies=[Depends(require_local_operator)])
@limiter.limit("20/minute")
async def api_wormhole_gate_persona_create(
request: Request, body: WormholeGatePersonaCreateRequest
@@ -845,7 +845,7 @@ async def api_wormhole_gate_persona_create(
return result
@router.post("/api/wormhole/gate/persona/activate")
@router.post("/api/wormhole/gate/persona/activate", dependencies=[Depends(require_local_operator)])
@limiter.limit("20/minute")
async def api_wormhole_gate_persona_activate(
request: Request, body: WormholeGatePersonaActivateRequest
@@ -861,7 +861,7 @@ async def api_wormhole_gate_persona_activate(
return result
@router.post("/api/wormhole/gate/persona/clear")
@router.post("/api/wormhole/gate/persona/clear", dependencies=[Depends(require_local_operator)])
@limiter.limit("20/minute")
async def api_wormhole_gate_persona_clear(request: Request, body: WormholeGateRequest):
gate_id = str(body.gate_id or "")
@@ -875,7 +875,7 @@ async def api_wormhole_gate_persona_clear(request: Request, body: WormholeGateRe
return result
@router.post("/api/wormhole/gate/persona/retire")
@router.post("/api/wormhole/gate/persona/retire", dependencies=[Depends(require_local_operator)])
@limiter.limit("20/minute")
async def api_wormhole_gate_persona_retire(
request: Request, body: WormholeGatePersonaActivateRequest
@@ -944,7 +944,7 @@ async def api_wormhole_gate_message_compose(request: Request, body: WormholeGate
return await _m.api_wormhole_gate_message_compose(request, body)
@router.post("/api/wormhole/gate/message/sign-encrypted")
@router.post("/api/wormhole/gate/message/sign-encrypted", dependencies=[Depends(require_local_operator)])
@limiter.limit("30/minute")
async def api_wormhole_gate_message_sign_encrypted(
request: Request,
@@ -1004,14 +1004,14 @@ async def api_wormhole_gate_messages_decrypt(request: Request, body: WormholeGat
return await _m.api_wormhole_gate_messages_decrypt(request, body)
@router.post("/api/wormhole/gate/state/export")
@router.post("/api/wormhole/gate/state/export", dependencies=[Depends(require_local_operator)])
@limiter.limit("30/minute")
async def api_wormhole_gate_state_export(request: Request, body: WormholeGateRequest):
import main as _m
return await _m.api_wormhole_gate_state_export(request, body)
@router.post("/api/wormhole/gate/proof")
@router.post("/api/wormhole/gate/proof", dependencies=[Depends(require_local_operator)])
@limiter.limit("30/minute")
async def api_wormhole_gate_proof(request: Request, body: WormholeGateRequest):
proof = _sign_gate_access_proof(str(body.gate_id or ""))
@@ -1360,7 +1360,7 @@ async def api_wormhole_health(request: Request):
return _redact_wormhole_status(full_state, authenticated=ok)
@router.post("/api/wormhole/connect")
@router.post("/api/wormhole/connect", dependencies=[Depends(require_local_operator)])
@limiter.limit("10/minute")
async def api_wormhole_connect(request: Request):
settings = read_wormhole_settings()
+1 -1
View File
@@ -318,7 +318,7 @@ active_layers: dict[str, bool] = {
"uap_sightings": True,
"wastewater": True,
"ai_intel": True,
"crowdthreat": True,
"crowdthreat": False,
"sar": True,
}
+17
View File
@@ -7,6 +7,7 @@ No API key required — the /threats endpoint is unauthenticated.
"""
import logging
import os
from services.network_utils import fetch_with_curl
from services.fetchers._store import latest_data, _data_lock, _mark_fresh, is_any_active
@@ -16,6 +17,16 @@ logger = logging.getLogger("services.data_fetcher")
_CT_BASE = "https://backend.crowdthreat.world"
def crowdthreat_fetch_enabled() -> bool:
"""Return True only when the operator explicitly opts into CrowdThreat pulls."""
return str(os.environ.get("CROWDTHREAT_ENABLED", "")).strip().lower() in {
"1",
"true",
"yes",
"on",
}
# CrowdThreat category_id → icon ID used on the MapLibre layer
_CATEGORY_ICON = {
1: "ct-security", # Security & Conflict (red)
@@ -43,6 +54,12 @@ _CATEGORY_COLOUR = {
@with_retry(max_retries=2, base_delay=5)
def fetch_crowdthreat():
"""Fetch verified threat reports from CrowdThreat public API."""
if not crowdthreat_fetch_enabled():
logger.debug("CrowdThreat fetch skipped; set CROWDTHREAT_ENABLED=true to opt in")
with _data_lock:
latest_data["crowdthreat"] = []
_mark_fresh("crowdthreat")
return
if not is_any_active("crowdthreat"):
return
+19 -2
View File
@@ -1438,6 +1438,7 @@ class Infonet:
# Running counters — avoid O(N) scans in get_info()
self._type_counts: dict[str, int] = {}
self._active_count: int = 0
self._registered_nodes: set[str] = set()
self._chain_bytes: int = 2 # Start with "[]" empty JSON array
self._dirty = False
self._save_lock = threading.Lock()
@@ -1518,6 +1519,7 @@ class Infonet:
self._last_validated_index = 0
self._type_counts = {}
self._active_count = 0
self._registered_nodes = set()
self._chain_bytes = 2
def _rebuild_state(self) -> None:
@@ -1566,10 +1568,15 @@ class Infonet:
now = time.time()
self._type_counts = {}
self._active_count = 0
self._registered_nodes = set()
self._chain_bytes = 2 # "[]"
for evt in self.events:
t = evt.get("event_type", "unknown")
self._type_counts[t] = self._type_counts.get(t, 0) + 1
if t == "node_register":
node_id = str(evt.get("node_id", "") or "")
if node_id:
self._registered_nodes.add(node_id)
is_eph = evt.get("payload", {}).get("ephemeral") or evt.get("payload", {}).get("_ephemeral")
if not is_eph or (now - evt.get("timestamp", 0)) < EPHEMERAL_TTL:
self._active_count += 1
@@ -1579,6 +1586,10 @@ class Infonet:
"""Incrementally update counters when a new event is appended."""
t = evt.get("event_type", "unknown")
self._type_counts[t] = self._type_counts.get(t, 0) + 1
if t == "node_register":
node_id = str(evt.get("node_id", "") or "")
if node_id:
self._registered_nodes.add(node_id)
self._active_count += 1
self._chain_bytes += len(json.dumps(evt)) + 2
@@ -2247,6 +2258,7 @@ class Infonet:
self.event_index[event_id] = len(self.events) - 1
self.head_hash = event_id
self.node_sequences[node_id] = sequence
self._update_counters_for_event(evt)
accepted += 1
expected_prev = event_id
self._replay_filter.add(event_id)
@@ -2552,6 +2564,8 @@ class Infonet:
# Apply fork
self.events = prefix + ordered
self._rebuild_state()
self._rebuild_revocations()
self._rebuild_counters()
self._save()
try:
from services.mesh.mesh_metrics import increment as metrics_inc
@@ -2681,6 +2695,8 @@ class Infonet:
"head_hash_full": self.head_hash,
"chain_lock": self.chain_lock(),
"known_nodes": len(self.node_sequences),
"author_nodes": len(self.node_sequences),
"registered_nodes": len(self._registered_nodes),
"event_types": dict(self._type_counts),
"chain_size_kb": round(self._chain_bytes / 1024, 1),
"unsigned_events": 0,
@@ -2716,8 +2732,9 @@ class Infonet:
if len(new_events) != before:
self.events = new_events
# Rebuild index
self.event_index = {e["event_id"]: i for i, e in enumerate(self.events)}
self._rebuild_state()
self._rebuild_revocations()
self._rebuild_counters()
self._save()
logger.info(f"Infonet cleanup: removed {before - len(new_events)} expired events")
+125 -3
View File
@@ -17,7 +17,7 @@ import time
from typing import Any
from cryptography.hazmat.primitives import serialization
from cryptography.hazmat.primitives.asymmetric import ed25519
from cryptography.hazmat.primitives.asymmetric import ed25519, x25519
from services.mesh.mesh_crypto import (
build_signature_payload,
@@ -464,6 +464,37 @@ def _bundle_fingerprint(data: dict[str, Any]) -> str:
return hashlib.sha256(raw.encode("utf-8")).hexdigest()
def _ensure_dm_dh_material(data: dict[str, Any]) -> tuple[dict[str, Any], bool]:
"""Repair legacy/corrupt DM identities that kept signing keys but lost DH material."""
if str(data.get("dh_pub_key", "") or "").strip() and str(data.get("dh_private_key", "") or "").strip():
return data, False
dh_priv = x25519.X25519PrivateKey.generate()
dh_priv_raw = dh_priv.private_bytes(
encoding=serialization.Encoding.Raw,
format=serialization.PrivateFormat.Raw,
encryption_algorithm=serialization.NoEncryption(),
)
dh_pub_raw = dh_priv.public_key().public_bytes(
encoding=serialization.Encoding.Raw,
format=serialization.PublicFormat.Raw,
)
repaired = {
**dict(data or {}),
"dh_pub_key": base64.b64encode(dh_pub_raw).decode("ascii"),
"dh_algo": "X25519",
"dh_private_key": base64.b64encode(dh_priv_raw).decode("ascii"),
"last_dh_timestamp": int(time.time()),
"bundle_fingerprint": "",
"bundle_sequence": 0,
"bundle_registered_at": 0,
"prekey_bundle_registered_at": 0,
"prekey_transparency_head": "",
"prekey_transparency_size": 0,
}
return _write_identity(repaired), True
def trust_fingerprint_for_identity_material(
*,
agent_id: str,
@@ -830,10 +861,11 @@ def _sign_dm_invite_payload(
def register_wormhole_dm_key(force: bool = False) -> dict[str, Any]:
data = read_wormhole_identity()
data, repaired_dh = _ensure_dm_dh_material(data)
timestamp = int(time.time())
fingerprint = _bundle_fingerprint(data)
if not force and fingerprint and fingerprint == data.get("bundle_fingerprint"):
if not force and not repaired_dh and fingerprint and fingerprint == data.get("bundle_fingerprint"):
return {
"ok": True,
**_public_view(data),
@@ -1525,11 +1557,101 @@ def import_wormhole_dm_invite(invite: dict[str, Any], *, alias: str = "") -> dic
"detail": "compat dm invite import disabled; ask the sender to re-export a current signed invite",
}
def _prekey_missing_or_pending(detail: str) -> bool:
lower = str(detail or "").strip().lower()
return any(
phrase in lower
for phrase in (
"prekey bundle not found",
"invite prekey bundle not found",
"peer prekey lookup unavailable",
"peer prekey lookup still preparing",
"transport tier insufficient",
"preparing_private_lane",
)
)
def _pin_pending_invite_prekey(detail: str) -> dict[str, Any]:
if invite_version < DM_INVITE_VERSION:
return {"ok": False, "detail": detail or "invite prekey bundle not found"}
invite_root_distribution = _verify_dm_invite_root_distribution(payload)
if not invite_root_distribution.get("ok"):
return invite_root_distribution
attested = _verify_dm_invite_identity_attestation(
envelope=envelope,
payload=payload,
resolved_root_node_id=str(invite_root_distribution.get("root_node_id", "") or ""),
resolved_root_public_key=str(invite_root_distribution.get("root_public_key", "") or ""),
resolved_root_public_key_algo=str(
invite_root_distribution.get("root_public_key_algo", "Ed25519") or "Ed25519"
),
resolved_root_manifest_fingerprint=str(
invite_root_distribution.get("root_manifest_fingerprint", "") or ""
).strip().lower(),
)
if not attested.get("ok"):
return attested
pending_peer_id = str(verified.get("peer_id", "") or "").strip()
trust_fingerprint = str(verified.get("trust_fingerprint", "") or "").strip().lower()
contact = pin_wormhole_dm_invite(
pending_peer_id,
invite_payload={
"trust_fingerprint": trust_fingerprint,
"public_key": "",
"public_key_algo": "Ed25519",
"identity_dh_pub_key": "",
"dh_algo": "X25519",
"prekey_lookup_handle": lookup_handle,
"issued_at": int(payload.get("issued_at", 0) or 0),
"expires_at": int(payload.get("expires_at", 0) or 0),
"label": str(payload.get("label", "") or ""),
"root_node_id": str(attested.get("root_node_id", "") or ""),
"root_public_key": str(attested.get("root_public_key", "") or ""),
"root_public_key_algo": str(attested.get("root_public_key_algo", "Ed25519") or "Ed25519"),
"root_fingerprint": str(attested.get("root_fingerprint", "") or ""),
"root_manifest_fingerprint": str(invite_root_distribution.get("root_manifest_fingerprint", "") or ""),
"root_witness_policy_fingerprint": str(
invite_root_distribution.get("root_witness_policy_fingerprint", "") or ""
),
"root_witness_threshold": _safe_int(
invite_root_distribution.get("root_witness_threshold", 0) or 0,
0,
),
"root_witness_count": _safe_int(invite_root_distribution.get("root_witness_count", 0) or 0, 0),
"root_witness_domain_count": _safe_int(
invite_root_distribution.get("root_witness_domain_count", 0) or 0,
0,
),
"root_manifest_generation": _safe_int(
invite_root_distribution.get("root_manifest_generation", 0) or 0,
0,
),
"root_rotation_proven": bool(invite_root_distribution.get("root_rotation_proven")),
},
alias=resolved_alias,
attested=True,
)
return {
"ok": True,
"peer_id": pending_peer_id,
"invite_peer_id": pending_peer_id,
"trust_fingerprint": trust_fingerprint,
"trust_level": str(contact.get("trust_level", "") or ""),
"detail": "Contact saved.",
"invite_attested": True,
"pending_prekey": True,
"prekey_detail": detail or "invite prekey bundle not found",
"contact": contact,
}
from services.mesh.mesh_wormhole_prekey import fetch_dm_prekey_bundle
fetched = fetch_dm_prekey_bundle(lookup_token=lookup_handle)
if not fetched.get("ok"):
return {"ok": False, "detail": str(fetched.get("detail", "") or "invite prekey bundle not found")}
fetch_detail = str(fetched.get("detail", "") or "invite prekey bundle not found")
if _prekey_missing_or_pending(fetch_detail):
return _pin_pending_invite_prekey(fetch_detail)
return {"ok": False, "detail": fetch_detail}
resolved_peer_id = str(fetched.get("agent_id", "") or "").strip()
if not resolved_peer_id:
@@ -11,6 +11,7 @@ import os
import random
import time
import urllib.error
import urllib.parse
import urllib.request
from typing import Any
@@ -150,6 +151,118 @@ def _fetch_dm_prekey_bundle_from_peer_lookup(lookup_token: str) -> dict[str, Any
return {"ok": False, "detail": last_detail or "Prekey bundle not found"}
def _configured_public_lookup_peer_urls() -> list[str]:
try:
from services.config import get_settings
from services.mesh.mesh_router import active_sync_peer_urls, parse_configured_relay_peers
settings = get_settings()
candidates: list[str] = []
for raw in (
getattr(settings, "MESH_BOOTSTRAP_SEED_PEERS", ""),
getattr(settings, "MESH_DEFAULT_SYNC_PEERS", ""),
):
candidates.extend(parse_configured_relay_peers(str(raw or "")))
candidates.extend(active_sync_peer_urls())
except Exception:
return []
seen: set[str] = set()
peers: list[str] = []
for candidate in candidates:
peer = str(candidate or "").strip().rstrip("/")
if not peer or peer in seen:
continue
seen.add(peer)
peers.append(peer)
return peers
def _normalize_remote_lookup_bundle(payload: dict[str, Any]) -> dict[str, Any]:
data = dict(payload or {})
bundle = dict(data.get("bundle") or {})
public_key = str(data.get("public_key", "") or bundle.get("public_key", "") or "").strip()
if not public_key:
return {"ok": False, "detail": "Prekey bundle missing signing key"}
agent_id = str(data.get("agent_id", "") or "").strip() or derive_node_id(public_key)
if not agent_id:
return {"ok": False, "detail": "Prekey bundle public key binding mismatch"}
data["agent_id"] = agent_id
data["public_key"] = public_key
data["public_key_algo"] = str(data.get("public_key_algo", "") or bundle.get("public_key_algo", "Ed25519") or "Ed25519")
data["protocol_version"] = str(data.get("protocol_version", "") or bundle.get("protocol_version", PROTOCOL_VERSION) or PROTOCOL_VERSION)
data["bundle"] = bundle
ok, reason = _validate_bundle_record(data)
if not ok:
return {"ok": False, "detail": reason}
data["ok"] = True
data["lookup_mode"] = "invite_lookup_handle"
data["public_lookup"] = True
return data
def _fetch_dm_prekey_bundle_from_public_lookup(lookup_token: str) -> dict[str, Any]:
"""Fetch an invite-scoped prekey bundle from bootstrap/sync peers.
The token is high-entropy and invite-scoped. This path does not expose a
stable agent_id to the peer; if the ordinary peer response omits agent_id,
derive it from the signed identity public key and validate the bundle before
accepting it.
"""
token = str(lookup_token or "").strip()
if not token:
return {"ok": False, "detail": "lookup token required"}
peers = _configured_public_lookup_peer_urls()
if not peers:
return {"ok": False, "detail": "peer prekey lookup unavailable"}
try:
from services.config import get_settings
timeout = max(1, _safe_int(getattr(get_settings(), "MESH_SYNC_TIMEOUT_S", 5) or 5, 5))
except Exception:
timeout = 5
encoded = urllib.parse.urlencode({"lookup_token": token})
last_detail = ""
for peer_url in peers:
normalized_peer_url = str(peer_url or "").strip().rstrip("/")
if not normalized_peer_url:
continue
request = urllib.request.Request(
f"{normalized_peer_url}/api/mesh/dm/prekey-bundle?{encoded}",
headers={
"Accept": "application/json",
"User-Agent": "ShadowBroker-Infonet/0.9 (+https://github.com/BigBodyCobain/Shadowbroker)",
},
method="GET",
)
try:
with urllib.request.urlopen(request, timeout=timeout) as response:
raw = response.read(256 * 1024)
payload = json.loads(raw.decode("utf-8"))
except (urllib.error.URLError, TimeoutError, json.JSONDecodeError, OSError) as exc:
logger.debug("public prekey lookup failed for %s: %s", normalized_peer_url, type(exc).__name__)
last_detail = "peer prekey lookup unavailable"
continue
if not isinstance(payload, dict):
last_detail = "invalid peer response"
continue
if payload.get("pending") or str(payload.get("status", "") or "") == "preparing_private_lane":
last_detail = "peer prekey lookup still preparing"
continue
if not payload.get("ok"):
last_detail = str(payload.get("detail", "") or last_detail or "Prekey bundle not found")
continue
if not isinstance(payload.get("bundle"), dict):
last_detail = "Prekey bundle not found"
continue
normalized = _normalize_remote_lookup_bundle(payload)
if normalized.get("ok"):
return normalized
last_detail = str(normalized.get("detail", "") or last_detail)
return {"ok": False, "detail": last_detail or "Prekey bundle not found"}
def _b64(data: bytes) -> str:
return base64.b64encode(data).decode("ascii")
@@ -926,6 +1039,11 @@ def fetch_dm_prekey_bundle(
peer_found = _fetch_dm_prekey_bundle_from_peer_lookup(resolved_lookup)
if peer_found.get("ok"):
return peer_found
public_found = _fetch_dm_prekey_bundle_from_public_lookup(resolved_lookup)
if public_found.get("ok"):
return public_found
if str(public_found.get("detail", "") or "").strip():
return {"ok": False, "detail": str(public_found.get("detail", "") or "Prekey bundle not found")}
return {"ok": False, "detail": str(peer_found.get("detail", "") or "Prekey bundle not found")}
else:
return {"ok": False, "detail": "Prekey bundle not found"}
+57 -1
View File
@@ -5,7 +5,7 @@ from starlette.requests import Request
from starlette.responses import Response
def _request(path: str, method: str = "POST") -> Request:
def _request(path: str, method: str = "POST", query_string: bytes = b"") -> Request:
return Request(
{
"type": "http",
@@ -13,6 +13,7 @@ def _request(path: str, method: str = "POST") -> Request:
"client": ("test", 12345),
"method": method,
"path": path,
"query_string": query_string,
}
)
@@ -504,6 +505,61 @@ def test_private_infonet_gate_write_returns_preparing_state_when_wormhole_not_re
get_settings.cache_clear()
def test_invite_scoped_prekey_lookup_reaches_handler_while_lane_prepares(monkeypatch):
"""Copied-address import must not be blocked by private-lane warmup."""
import main
import auth
from services.config import get_settings
from services import wormhole_supervisor
monkeypatch.setenv("MESH_PRIVATE_CLEARNET_FALLBACK", "block")
monkeypatch.setenv("MESH_BLOCK_LEGACY_NODE_ID_COMPAT", "true")
monkeypatch.setenv("MESH_BLOCK_LEGACY_AGENT_ID_LOOKUP", "true")
monkeypatch.setenv("MESH_ALLOW_COMPAT_DM_INVITE_IMPORT", "false")
get_settings.cache_clear()
monkeypatch.setattr(
auth,
"_anonymous_mode_state",
lambda: {
"enabled": False,
"wormhole_enabled": True,
"ready": False,
"effective_transport": "direct",
},
)
monkeypatch.setattr(
wormhole_supervisor,
"get_wormhole_state",
lambda: {
"configured": True,
"ready": False,
"rns_ready": False,
"arti_ready": False,
},
)
called = {"value": False}
async def call_next(_request: Request) -> Response:
called["value"] = True
return Response(status_code=200)
response = asyncio.run(
main.enforce_high_privacy_mesh(
_request(
"/api/mesh/dm/prekey-bundle",
method="GET",
query_string=b"lookup_token=invite-handle",
),
call_next,
)
)
assert response.status_code == 200
assert called["value"] is True
get_settings.cache_clear()
def test_private_dm_send_blocks_at_transitional_tier(monkeypatch):
import main
import auth
+14 -3
View File
@@ -47,6 +47,11 @@ def test_infonet_ingest_accepts_valid_event(tmp_path, monkeypatch):
assert result["accepted"] == 1
assert inf.head_hash == evt.event_id
info = inf.get_info()
assert info["known_nodes"] == 1
assert info["author_nodes"] == 1
assert info["total_events"] == 1
assert info["event_types"]["message"] == 1
def test_verify_node_binding_accepts_current_and_compat_ids_only(monkeypatch):
@@ -64,6 +69,8 @@ def test_verify_node_binding_accepts_current_and_compat_ids_only(monkeypatch):
f"{current[len(mesh_crypto.NODE_ID_PREFIX):len(mesh_crypto.NODE_ID_PREFIX) + 8]}"
)
monkeypatch.setenv("MESH_DEV_ALLOW_LEGACY_COMPAT", "true")
monkeypatch.setenv("MESH_BLOCK_LEGACY_NODE_ID_COMPAT", "false")
monkeypatch.setenv("MESH_ALLOW_LEGACY_NODE_ID_COMPAT_UNTIL", "2099-01-01")
from services.config import get_settings
@@ -98,7 +105,7 @@ def test_infonet_append_rejects_missing_signature_fields(tmp_path, monkeypatch):
assert "signature" in str(exc).lower()
def test_infonet_load_fails_closed_on_hash_mismatch(tmp_path, monkeypatch):
def test_infonet_load_quarantines_and_resets_on_hash_mismatch(tmp_path, monkeypatch):
monkeypatch.setattr(mesh_hashchain, "DATA_DIR", tmp_path)
monkeypatch.setattr(mesh_hashchain, "CHAIN_FILE", tmp_path / "infonet.json")
@@ -135,8 +142,12 @@ def test_infonet_load_fails_closed_on_hash_mismatch(tmp_path, monkeypatch):
encoding="utf-8",
)
with pytest.raises(ValueError, match="Hash mismatch on event load"):
mesh_hashchain.Infonet()
inf = mesh_hashchain.Infonet()
assert inf.events == []
assert inf.head_hash == mesh_hashchain.GENESIS_HASH
assert not mesh_hashchain.CHAIN_FILE.exists()
assert list(tmp_path.glob("infonet.json.quarantine.*"))
def test_validate_gate_message_payload_rejects_plaintext_shape():
@@ -12,6 +12,7 @@ Tests verify:
"""
import hashlib
import json
import time
from services.config import get_settings
@@ -611,6 +612,99 @@ class TestFetchPrekeyBundleByLookup:
"peer prekey lookup unavailable",
}
def test_fetch_lookup_token_uses_bootstrap_peer_without_agent_id(self, tmp_path, monkeypatch):
"""Invite lookup can resolve through bootstrap peers without exposing agent_id."""
_isolated_relay(tmp_path, monkeypatch)
record = _valid_bundle_record("test-agent")
requested_urls: list[str] = []
monkeypatch.setenv("MESH_BOOTSTRAP_SEED_PEERS", "https://seed.example")
monkeypatch.setenv("MESH_DEFAULT_SYNC_PEERS", "")
monkeypatch.setenv("MESH_RELAY_PEERS", "")
get_settings.cache_clear()
class _Response:
def __enter__(self):
return self
def __exit__(self, *_args):
return False
def read(self, _limit: int = -1):
return json.dumps(
{
"ok": True,
"identity_dh_pub_key": record["dh_pub_key"],
"dh_algo": record["dh_algo"],
"public_key": record["public_key"],
"public_key_algo": record["public_key_algo"],
"protocol_version": record["protocol_version"],
"sequence": 1,
"signed_at": int(record["bundle"].get("signed_at", 0) or 0),
"bundle": record["bundle"],
}
).encode("utf-8")
def _urlopen(request, timeout=0):
requested_urls.append(str(getattr(request, "full_url", "")))
return _Response()
monkeypatch.setattr("services.mesh.mesh_wormhole_prekey.urllib.request.urlopen", _urlopen)
from services.mesh.mesh_wormhole_prekey import fetch_dm_prekey_bundle
result = fetch_dm_prekey_bundle(agent_id="", lookup_token="bootstrap-handle")
assert result["ok"] is True
assert result["agent_id"] == record["agent_id"]
assert result["lookup_mode"] == "invite_lookup_handle"
assert result["public_lookup"] is True
assert requested_urls
assert "lookup_token=bootstrap-handle" in requested_urls[0]
assert "agent_id" not in requested_urls[0]
def test_fetch_lookup_token_does_not_parse_peer_pending_as_bundle(self, tmp_path, monkeypatch):
"""A peer's private-lane pending response is not a malformed prekey bundle."""
_isolated_relay(tmp_path, monkeypatch)
requested_urls: list[str] = []
monkeypatch.setenv("MESH_BOOTSTRAP_SEED_PEERS", "https://seed.example")
monkeypatch.setenv("MESH_DEFAULT_SYNC_PEERS", "")
monkeypatch.setenv("MESH_RELAY_PEERS", "")
get_settings.cache_clear()
class _Response:
def __enter__(self):
return self
def __exit__(self, *_args):
return False
def read(self, _limit: int = -1):
return json.dumps(
{
"ok": True,
"pending": True,
"status": "preparing_private_lane",
"detail": "transport tier insufficient",
}
).encode("utf-8")
def _urlopen(request, timeout=0):
requested_urls.append(str(getattr(request, "full_url", "")))
return _Response()
monkeypatch.setattr("services.mesh.mesh_wormhole_prekey.urllib.request.urlopen", _urlopen)
from services.mesh.mesh_wormhole_prekey import fetch_dm_prekey_bundle
result = fetch_dm_prekey_bundle(agent_id="", lookup_token="bootstrap-handle")
assert requested_urls
assert result["ok"] is False
assert result["detail"] == "peer prekey lookup still preparing"
assert result["detail"] != "Prekey bundle missing signing key"
def test_fetch_agent_id_uses_pinned_contact_lookup_handle(self, tmp_path, monkeypatch):
"""Pinned invite lookup handle is used before direct agent_id lookup."""
relay = _isolated_relay(tmp_path, monkeypatch)
@@ -71,6 +71,38 @@ def _fresh_wormhole_state(tmp_path, monkeypatch):
return relay, mesh_wormhole_identity, mesh_wormhole_contacts, mesh_wormhole_prekey
def test_register_wormhole_dm_key_repairs_missing_local_dh_material(tmp_path, monkeypatch):
relay, identity_mod, _contacts_mod, _prekey_mod = _fresh_wormhole_state(tmp_path, monkeypatch)
identity = identity_mod.read_wormhole_identity()
original_node_id = identity["node_id"]
original_public_key = identity["public_key"]
original_private_key = identity["private_key"]
identity_mod.write_dm_identity(
{
**identity,
"dh_pub_key": "",
"dh_private_key": "",
"bundle_fingerprint": "",
"bundle_sequence": 0,
"bundle_registered_at": 0,
}
)
registered = identity_mod.register_wormhole_dm_key()
repaired = identity_mod.read_wormhole_identity()
assert registered["ok"] is True
assert registered["dh_pub_key"]
assert registered["dh_algo"] == "X25519"
assert repaired["dh_pub_key"] == registered["dh_pub_key"]
assert repaired["dh_private_key"]
assert repaired["node_id"] == original_node_id
assert repaired["public_key"] == original_public_key
assert repaired["private_key"] == original_private_key
assert relay.get_dh_key(original_node_id)["dh_pub_key"] == registered["dh_pub_key"]
def _export_verified_invite(identity_mod):
exported = identity_mod.export_wormhole_dm_invite()
assert exported["ok"] is True
@@ -460,6 +492,30 @@ def test_imported_dm_invite_pins_contact_as_invite_pinned(tmp_path, monkeypatch)
assert contacts_mod.list_wormhole_dm_contacts()[imported["peer_id"]]["trust_level"] == "invite_pinned"
def test_imported_dm_invite_saves_pending_contact_when_prekey_not_visible(tmp_path, monkeypatch):
_relay, identity_mod, contacts_mod, prekey_mod = _fresh_wormhole_state(tmp_path, monkeypatch)
exported, verified = _export_verified_invite(identity_mod)
monkeypatch.setattr(
prekey_mod,
"fetch_dm_prekey_bundle",
lambda **_kw: {"ok": False, "detail": "Prekey bundle not found"},
)
imported = identity_mod.import_wormhole_dm_invite(exported["invite"], alias="alice")
contact = imported["contact"]
assert imported["ok"] is True
assert imported["pending_prekey"] is True
assert imported["peer_id"] == verified["peer_id"]
assert contact["alias"] == "alice"
assert contact["trust_level"] == "invite_pinned"
assert contact["invitePinnedPrekeyLookupHandle"] == exported["invite"]["payload"]["prekey_lookup_handle"]
assert contact["remotePrekeyLookupMode"] == "invite_lookup_handle"
assert contact["remotePrekeyFingerprint"] == verified["trust_fingerprint"]
assert contact["dhPubKey"] == ""
assert contacts_mod.list_wormhole_dm_contacts()[verified["peer_id"]]["trust_level"] == "invite_pinned"
def test_imported_dm_invite_requires_root_attested_prekey_bundle(tmp_path, monkeypatch):
relay, identity_mod, _contacts_mod, _prekey_mod = _fresh_wormhole_state(tmp_path, monkeypatch)
@@ -0,0 +1,69 @@
"""Regression coverage for operator-only control surfaces."""
import pytest
@pytest.mark.parametrize(
("method", "path", "payload"),
[
("get", "/api/wormhole/identity", None),
("post", "/api/wormhole/identity/bootstrap", {}),
("post", "/api/wormhole/gate/enter", {"gate_id": "general-talk"}),
("post", "/api/wormhole/gate/leave", {"gate_id": "general-talk"}),
("post", "/api/wormhole/sign", {"event_type": "gate_event", "payload": {"ok": True}}),
("post", "/api/wormhole/gate/key/rotate", {"gate_id": "general-talk", "reason": "test"}),
(
"post",
"/api/wormhole/gate/key/grant",
{
"gate_id": "general-talk",
"recipient_node_id": "node-test",
"recipient_dh_pub": "dh-test",
},
),
("post", "/api/wormhole/gate/persona/create", {"gate_id": "general-talk", "label": "test"}),
(
"post",
"/api/wormhole/gate/persona/activate",
{"gate_id": "general-talk", "persona_id": "persona-test"},
),
("post", "/api/wormhole/gate/persona/clear", {"gate_id": "general-talk"}),
(
"post",
"/api/wormhole/gate/persona/retire",
{"gate_id": "general-talk", "persona_id": "persona-test"},
),
(
"post",
"/api/wormhole/gate/message/sign-encrypted",
{
"gate_id": "general-talk",
"epoch": 1,
"ciphertext": "ciphertext",
"nonce": "nonce",
"format": "mls1",
"envelope_hash": "hash",
},
),
("post", "/api/wormhole/gate/message/compose", {"gate_id": "general-talk", "plaintext": "hello"}),
("post", "/api/wormhole/sign-raw", {"message": "raw"}),
("post", "/api/wormhole/gate/state/export", {"gate_id": "general-talk"}),
("post", "/api/wormhole/gate/proof", {"gate_id": "general-talk"}),
("post", "/api/wormhole/connect", {}),
("post", "/api/layers", {"layers": {"viirs_nightlights": True}}),
("post", "/api/ais/feed", {"msgs": []}),
],
)
def test_remote_control_surface_rejects_without_local_operator_or_admin(
remote_client, method, path, payload
):
request = getattr(remote_client, method)
response = request(path, json=payload) if payload is not None else request(path)
assert response.status_code == 403
def test_remote_agent_actions_poll_rejects_without_local_operator_or_admin(remote_client):
response = remote_client.get("/api/ai/agent-actions")
assert response.status_code == 403
+52
View File
@@ -0,0 +1,52 @@
"""CrowdThreat ingestion is operator opt-in only."""
class _CrowdThreatResponse:
status_code = 200
def json(self):
return {
"data": {
"threats": [
{
"id": "ct-1",
"title": "Example report",
"location": {
"lng_lat": [12.5, 41.9],
"name": "Example place",
"country": {"name": "Italy"},
},
"category": {"id": 1, "name": "Security"},
}
]
}
}
def test_crowdthreat_disabled_by_default_does_not_call_upstream(monkeypatch):
from services.fetchers import _store, crowdthreat
monkeypatch.delenv("CROWDTHREAT_ENABLED", raising=False)
monkeypatch.setitem(_store.latest_data, "crowdthreat", [{"id": "old"}])
monkeypatch.setattr(
crowdthreat,
"fetch_with_curl",
lambda *args, **kwargs: (_ for _ in ()).throw(AssertionError("upstream called")),
)
crowdthreat.fetch_crowdthreat()
assert _store.latest_data["crowdthreat"] == []
def test_crowdthreat_opt_in_fetches_when_layer_is_enabled(monkeypatch):
from services.fetchers import _store, crowdthreat
monkeypatch.setenv("CROWDTHREAT_ENABLED", "true")
monkeypatch.setitem(_store.active_layers, "crowdthreat", True)
monkeypatch.setattr(crowdthreat, "fetch_with_curl", lambda *args, **kwargs: _CrowdThreatResponse())
crowdthreat.fetch_crowdthreat()
assert _store.latest_data["crowdthreat"][0]["id"] == "ct-1"
assert _store.latest_data["crowdthreat"][0]["source"] == "CrowdThreat"