Compare commits

..

5 Commits

Author SHA1 Message Date
BigBodyCobain e5d793fb73 Fix #243/#252/#253: stop leaking settings posture to anonymous callers
Three settings endpoints were disclosing operational posture or
operator-curated configuration to any network caller. This change
either tightens the redacted-public view (#243) or adds a
local-operator auth gate (#252, #253) per the audit recommendations.

Zero hostility to legitimate users: in all three cases, the Tauri
shell (loopback), the Docker bridge frontend container (#250 + #278),
and any caller with an admin key continue to see the full data. Only
anonymous LAN/internet callers see the reduced surface.

== #243 (Wormhole transport posture, anonymous-mode, profile, node mode)

Tightened the public-redaction allowlists in BOTH the main.py and
routers/wormhole.py copies:
- _WORMHOLE_PUBLIC_SETTINGS_FIELDS: {enabled, transport, anonymous_mode}
                                 -> {enabled}
- _WORMHOLE_PUBLIC_PROFILE_FIELDS: {profile, wormhole_enabled}
                                 -> {wormhole_enabled}

`GET /api/settings/node` (both the routers/admin.py and main.py copies)
now returns an empty stub for unauthenticated callers and the full
node_mode + node_enabled fields only for authenticated callers via
_scoped_view_authenticated(request, "node").

== #252 (news feed inventory disclosure)

`GET /api/settings/news-feeds` now requires Depends(require_local_operator)
in both the canonical routers/admin.py handler and the duplicate main.py
handler. Anonymous callers can no longer enumerate operator-curated
feed names and URLs.

== #253 (Time Machine archival-capture posture disclosure)

`GET /api/settings/timemachine` now requires Depends(require_local_operator).
Anonymous callers can no longer fingerprint whether a deployment is
retaining replayable historical surveillance data.

Tests: backend/tests/test_round5_settings_info_disclosure.py (10 tests)
- Wormhole settings: anonymous sees only `enabled`; authenticated sees full state.
- Privacy profile: anonymous sees only `wormhole_enabled`; authenticated sees `profile` + `transport` + `anonymous_mode`.
- Node settings: anonymous sees `{}`; authenticated sees node_mode + node_enabled + persisted state.
- news-feeds: anonymous gets 403 (and get_feeds() is NOT called); authenticated gets full inventory.
- timemachine: anonymous gets 403; authenticated sees enabled + storage_warning.

Local: 73/73 security suite (round 5 + earlier rounds) green.

Credit: tg12 (external security audit, P1 + 2x Medium).
2026-05-21 10:26:59 -06:00
Shadowbroker 2b03b808ac Fix #279: add defusedxml to uv.lock so Docker image installs it (#282)
defusedxml is listed in backend/pyproject.toml line 18 but was missing
from uv.lock. The backend Dockerfile uses `uv sync --frozen --no-dev`,
which only installs packages pinned in the lockfile. As a result the
runtime image shipped without defusedxml even though pyproject declared
it, and any import path that touched it crashed at startup with:

    ModuleNotFoundError: No module named 'defusedxml'

Affected import sites:

- backend/services/psk_reporter_fetcher.py:10
- backend/services/fetchers/aircraft_database.py:21
- backend/services/cctv_pipeline.py:990
- backend/services/cctv_pipeline.py:1018

Fix: regenerate uv.lock so defusedxml v0.7.1 (matching the >=0.7.1
specifier in pyproject) is locked. No code changes -- only the lockfile.
Next image build picks it up via the existing `uv sync --frozen` step.

Reporter: external user. Thanks for catching the missing dep.
2026-05-21 10:18:40 -06:00
Shadowbroker 2e14e75a0e Fix #256: per-peer HMAC secrets defeat cross-peer impersonation (#281)
Before this change, every peer-push HMAC was derived from the single
fleet-shared MESH_PEER_PUSH_SECRET. The receiver could prove "this
request was signed by someone who knows the fleet secret" but it could
NOT prove which peer signed it. Any peer that knew the global secret
could compute the expected HMAC for any other peer URL and forge a
push pretending to be that peer.

Fix: introduce MESH_PEER_SECRETS, an optional comma-separated
url=secret map. When a peer URL appears in the map, only the listed
per-peer secret is accepted for it -- the global secret is ignored for
that specific URL. Peer A no longer knows peer B's secret, so peer A
cannot forge a push claiming to be peer B.

The new helper resolve_peer_key_for_url() in mesh_crypto.py wraps the
lookup and is called from every existing peer-push call site:

- backend/auth.py:_verify_peer_push_hmac (receiver)
- backend/main.py:_http_peer_push_loop (Infonet event push)
- backend/main.py:_http_gate_pull_loop (gate event pull)
- backend/main.py:_http_gate_push_loop (gate event push)
- backend/services/mesh/mesh_router.py (two transports, push)
- backend/services/mesh/mesh_hashchain.py (gate wire ref key)
- backend/services/mesh/mesh_wormhole_prekey.py (peer prekey lookup)

Zero hostility, by design:

- Single-peer installs leave MESH_PEER_SECRETS empty -> resolver falls
  back to MESH_PEER_PUSH_SECRET -> behavior is byte-for-byte unchanged.
- Multi-peer installs that haven't migrated yet behave exactly as
  before.
- Multi-peer installs that DO migrate set MESH_PEER_SECRETS on both
  ends of each peering and immediately close the impersonation surface
  for those URLs. Migration is incremental: unlisted peers keep using
  the global secret.

Tests in backend/tests/test_per_peer_secret_resolver.py:
- env parsing (default, override, whitespace, malformed entries, cache)
- precedence: per-peer beats global
- migration window: unlisted peer falls back to global
- IMPERSONATION REFUSAL: peer A with global-secret-only cannot forge
  HMAC for peer B that has a per-peer secret configured
- IMPERSONATION REFUSAL: peer A with its OWN per-peer secret cannot
  forge HMAC for peer B
- positive control: legitimate peer B request verifies
- zero-behavior-change: single-peer install produces the same key bytes
  as before the change

Credit: tg12 (external security audit, P1/High/High confidence)
2026-05-21 10:05:29 -06:00
Shadowbroker 084e563412 Fix #240/#241: require admin auth on oracle resolve endpoints (#280)
Both POST /api/mesh/oracle/resolve and POST /api/mesh/oracle/resolve-stakes
were previously gated only by a rate limit (5/min) and tagged with
`mesh_write_exempt(MeshWriteExemption.ADMIN_CONTROL)`. The exemption
decorator is metadata only — it tells the mesh signed-write middleware
not to require a signature envelope, it does NOT enforce caller
authorization. Any network caller could:

- /resolve: settle any prediction market to any outcome (corrupts every
  downstream profile/win-loss count derived from that ledger).
- /resolve-stakes: trigger stake settlement for all expired contests at
  a time of their choosing (race against operator intent).

Fix: add `dependencies=[Depends(require_admin)]` to both routes. The
existing `mesh_write_exempt` tag stays in place because it accurately
describes the route's relationship to the signed-write envelope system;
adding `require_admin` is what closes the actual auth hole.

Tests in backend/tests/test_oracle_resolve_auth_gate.py:
- anonymous caller -> 403, ledger mutator NOT called
- wrong admin key -> 403, ledger mutator NOT called
- valid admin key -> 200, ledger mutator called
- admin key unconfigured + no debug/insecure-admin -> 403

Credit: tg12 (external security audit)
2026-05-21 09:45:08 -06:00
Shadowbroker 9ef6213284 Fix #250: bind Docker bridge local-operator trust to frontend hostname (#278)
Tightens the bridge-trust check so a connection on the Docker bridge
is only granted local-operator status when its source IP matches a
configured frontend container hostname (default: `frontend` + the
shipped `container_name` `shadowbroker-frontend`). Previously, when
`SHADOWBROKER_TRUST_DOCKER_BRIDGE_LOCAL_OPERATOR=1` was set, ANY IP
in the 172.16.0.0/12 range was granted local-operator privileges —
on a shared Docker host that included any unrelated container on the
same bridge.

Operators with renamed services can list new hostnames via the new
`SHADOWBROKER_TRUSTED_FRONTEND_HOSTS` env var (comma-separated). DNS
resolution is cached for 30s; if Docker DNS can't resolve any of the
configured names we fail closed and refuse the bridge entirely.

Single-user installs see no behavior change — the default-named
frontend container still resolves and is still trusted.

Credit: tg12 (external security audit)
2026-05-21 02:06:11 -06:00
15 changed files with 1067 additions and 54 deletions
+10 -5
View File
@@ -45,6 +45,7 @@ from services.mesh.mesh_compatibility import (
from services.mesh.mesh_crypto import (
_derive_peer_key,
normalize_peer_url,
resolve_peer_key_for_url,
verify_signature,
verify_node_binding,
parse_public_key_algo,
@@ -1403,11 +1404,15 @@ def _peer_hmac_url_from_request(request: Request) -> str:
def _verify_peer_push_hmac(request: Request, body_bytes: bytes) -> bool:
"""Verify HMAC-SHA256 peer authentication on push requests."""
secret = str(get_settings().MESH_PEER_PUSH_SECRET or "").strip()
if not secret:
return False
"""Verify HMAC-SHA256 peer authentication on push requests.
Issue #256: ``resolve_peer_key_for_url`` looks up a per-peer secret
in ``MESH_PEER_SECRETS`` first, then falls back to the global
``MESH_PEER_PUSH_SECRET``. When a peer URL is listed in the per-peer
map, only the listed secret is accepted for it — the global secret
is ignored, so any peer that knows only the global secret cannot
forge a request claiming to be that peer.
"""
provided = str(request.headers.get("x-peer-hmac", "") or "").strip()
if not provided:
return False
@@ -1416,7 +1421,7 @@ def _verify_peer_push_hmac(request: Request, body_bytes: bytes) -> bool:
allowed_peers = set(authenticated_push_peer_urls())
if not peer_url or peer_url not in allowed_peers:
return False
peer_key = _derive_peer_key(secret, peer_url)
peer_key = resolve_peer_key_for_url(peer_url)
if not peer_key:
return False
+43 -18
View File
@@ -220,6 +220,7 @@ from services.mesh.mesh_crypto import (
_derive_peer_key,
derive_node_id,
normalize_peer_url,
resolve_peer_key_for_url,
verify_node_binding,
parse_public_key_algo,
)
@@ -1079,8 +1080,18 @@ def _public_mesh_log_size(entries: list[dict[str, Any]]) -> int:
return sum(1 for item in entries if _public_mesh_log_entry(item) is not None)
_WORMHOLE_PUBLIC_SETTINGS_FIELDS = {"enabled", "transport", "anonymous_mode"}
_WORMHOLE_PUBLIC_PROFILE_FIELDS = {"profile", "wormhole_enabled"}
# Issue #243 (tg12): the public redaction now exposes only the bare
# "is Wormhole on?" boolean. Transport choice (tor/i2p/mixnet/direct),
# anonymous-mode state, and the named privacy profile are all
# operational posture and were leaking actionable recon to any
# unauthenticated caller. They are now gated behind authenticated reads
# (admin key or scoped-view token). Loopback Tauri shells and Docker
# bridge frontend containers continue to see full status because the
# Next.js catch-all proxy injects the configured ADMIN_KEY for
# same-origin/non-browser callers (see PR #263), so legitimate operator
# UX is unaffected.
_WORMHOLE_PUBLIC_SETTINGS_FIELDS = {"enabled"}
_WORMHOLE_PUBLIC_PROFILE_FIELDS = {"wormhole_enabled"}
_PRIVATE_LANE_CONTROL_FIELDS = {"private_lane_tier", "private_lane_policy"}
_PUBLIC_RNS_STATUS_FIELDS = {"enabled", "ready", "configured_peers", "active_peers"}
_NODE_PUBLIC_EVENT_HOOK_REGISTERED = False
@@ -1745,10 +1756,12 @@ def _http_peer_push_loop() -> None:
_NODE_SYNC_STOP.wait(_PEER_PUSH_INTERVAL_S)
continue
secret = str(get_settings().MESH_PEER_PUSH_SECRET or "").strip()
if not secret:
_NODE_SYNC_STOP.wait(_PEER_PUSH_INTERVAL_S)
continue
# Issue #256: resolve_peer_key_for_url() handles both the
# legacy global MESH_PEER_PUSH_SECRET path and the per-peer
# MESH_PEER_SECRETS map. The per-peer skip happens below
# ("if not peer_key: continue"), so we don't gate the whole
# loop on the global secret being set — an install that only
# configures per-peer secrets is now valid.
peers = authenticated_push_peer_urls()
if not peers:
@@ -1778,7 +1791,7 @@ def _http_peer_push_loop() -> None:
ensure_ascii=False,
).encode("utf-8")
peer_key = _derive_peer_key(secret, normalized)
peer_key = resolve_peer_key_for_url(normalized)
if not peer_key:
continue
import hmac as _hmac_mod2
@@ -1831,10 +1844,7 @@ def _http_gate_pull_loop() -> None:
_NODE_SYNC_STOP.wait(_GATE_PULL_INTERVAL_S)
continue
secret = str(get_settings().MESH_PEER_PUSH_SECRET or "").strip()
if not secret:
_NODE_SYNC_STOP.wait(_GATE_PULL_INTERVAL_S)
continue
# Issue #256: per-peer key resolution; see _http_peer_push_loop.
peers = authenticated_push_peer_urls()
if not peers:
@@ -1846,7 +1856,7 @@ def _http_gate_pull_loop() -> None:
if not normalized:
continue
peer_key = _derive_peer_key(secret, normalized)
peer_key = resolve_peer_key_for_url(normalized)
if not peer_key:
continue
@@ -1959,10 +1969,7 @@ def _http_gate_push_loop() -> None:
_NODE_SYNC_STOP.wait(_PEER_PUSH_INTERVAL_S)
continue
secret = str(get_settings().MESH_PEER_PUSH_SECRET or "").strip()
if not secret:
_NODE_SYNC_STOP.wait(_PEER_PUSH_INTERVAL_S)
continue
# Issue #256: per-peer key resolution; see _http_peer_push_loop.
peers = authenticated_push_peer_urls()
if not peers:
@@ -1977,7 +1984,7 @@ def _http_gate_push_loop() -> None:
if not normalized:
continue
peer_key = _derive_peer_key(secret, normalized)
peer_key = resolve_peer_key_for_url(normalized)
if not peer_key:
continue
@@ -8813,9 +8820,14 @@ async def api_uw_flow(request: Request):
from services.news_feed_config import get_feeds, save_feeds, reset_feeds
@app.get("/api/settings/news-feeds")
@app.get(
"/api/settings/news-feeds",
dependencies=[Depends(require_local_operator)],
)
@limiter.limit("30/minute")
async def api_get_news_feeds(request: Request):
"""Issue #252 (tg12): gated on local-operator. See the canonical
handler in backend/routers/admin.py for the full rationale."""
return get_feeds()
@@ -9018,9 +9030,22 @@ class NodeSettingsUpdate(BaseModel):
@app.get("/api/settings/node")
@limiter.limit("30/minute")
async def api_get_node_settings(request: Request):
"""Issue #243 (tg12): node mode and participant state are
operational posture. Anonymous callers receive an empty stub
enough for the UI to know the endpoint exists but nothing
fingerprintable. Authenticated callers see the full state.
Authenticated == local-operator (loopback / Docker bridge) OR an
admin / scoped-view token. The Tauri shell and Docker frontend
container both qualify via their existing transport (PR #263 +
PR #278), so legitimate operator UX is unchanged.
"""
from services.node_settings import read_node_settings
data = await asyncio.to_thread(read_node_settings)
authenticated = _scoped_view_authenticated(request, "node")
if not authenticated:
return {}
return {
**data,
"node_mode": _current_node_mode(),
+30 -2
View File
@@ -82,9 +82,18 @@ async def api_get_keys_meta(request: Request):
return get_env_path_info()
@router.get("/api/settings/news-feeds")
@router.get(
"/api/settings/news-feeds",
dependencies=[Depends(require_local_operator)],
)
@limiter.limit("30/minute")
async def api_get_news_feeds(request: Request):
"""Issue #252 (tg12): the curated feed inventory is configuration
state, not a public data feed. Gated on local-operator so the
Tauri shell, the Docker bridge frontend, and any caller with an
admin key all see the full list; anonymous LAN/internet callers
can no longer enumerate operator source URLs.
"""
from services.news_feed_config import get_feeds
return get_feeds()
@@ -118,9 +127,18 @@ async def api_reset_news_feeds(request: Request):
@router.get("/api/settings/node")
@limiter.limit("30/minute")
async def api_get_node_settings(request: Request):
"""Issue #243 (tg12): node_mode and node_enabled are operational
posture. Anonymous callers receive an empty stub; authenticated
callers (local-operator or admin/scoped token) see the full
state. See the canonical handler in backend/main.py for the full
rationale.
"""
import asyncio
from auth import _scoped_view_authenticated
from services.node_settings import read_node_settings
data = await asyncio.to_thread(read_node_settings)
if not _scoped_view_authenticated(request, "node"):
return {}
return {
**data,
"node_mode": _current_node_mode(),
@@ -210,9 +228,19 @@ async def api_set_meshtastic_mqtt_settings(request: Request, body: MeshtasticMqt
return _meshtastic_runtime_snapshot()
@router.get("/api/settings/timemachine")
@router.get(
"/api/settings/timemachine",
dependencies=[Depends(require_local_operator)],
)
@limiter.limit("30/minute")
async def api_get_timemachine_settings(request: Request):
"""Issue #253 (tg12): archival-capture posture is operationally
sensitive — it tells a remote caller whether this deployment is
retaining replayable historical surveillance data. Gated on
local-operator so the Tauri shell and Docker bridge frontend
still see the toggle state, but anonymous LAN/internet callers
can no longer fingerprint Time Machine state.
"""
import asyncio
from services.node_settings import read_node_settings
data = await asyncio.to_thread(read_node_settings)
+21 -4
View File
@@ -223,11 +223,21 @@ async def oracle_markets_more(request: Request, category: str = "NEWS", offset:
"has_more": offset + limit < len(cat_markets), "total": len(cat_markets)}
@router.post("/api/mesh/oracle/resolve")
@router.post(
"/api/mesh/oracle/resolve",
dependencies=[Depends(require_admin)],
)
@limiter.limit("5/minute")
@mesh_write_exempt(MeshWriteExemption.ADMIN_CONTROL)
async def oracle_resolve(request: Request):
"""Resolve a prediction market."""
"""Resolve a prediction market.
Issue #240 (tg12): requires admin authentication. The
``mesh_write_exempt`` decorator below is **metadata only** — it tags
the route as not requiring a mesh signed-write envelope, it does
NOT itself enforce caller authorization. The ``Depends(require_admin)``
on the route decorator is what actually gates access.
"""
from services.mesh.mesh_oracle import oracle_ledger
body = await request.json()
market_title = body.get("market_title", "")
@@ -327,11 +337,18 @@ async def oracle_predictions(request: Request, node_id: str = ""):
active_predictions, authenticated=_scoped_view_authenticated(request, "mesh.audit"))
@router.post("/api/mesh/oracle/resolve-stakes")
@router.post(
"/api/mesh/oracle/resolve-stakes",
dependencies=[Depends(require_admin)],
)
@limiter.limit("5/minute")
@mesh_write_exempt(MeshWriteExemption.ADMIN_CONTROL)
async def oracle_resolve_stakes(request: Request):
"""Resolve all expired stake contests."""
"""Resolve all expired stake contests.
Issue #241 (tg12): requires admin authentication. See the note on
``oracle_resolve`` above — ``mesh_write_exempt`` is metadata only.
"""
from services.mesh.mesh_oracle import oracle_ledger
resolutions = oracle_ledger.resolve_expired_stakes()
return {"ok": True, "resolutions": resolutions, "count": len(resolutions)}
+7 -2
View File
@@ -160,8 +160,13 @@ router = APIRouter()
# --- Constants ---
_WORMHOLE_PUBLIC_SETTINGS_FIELDS = {"enabled", "transport", "anonymous_mode"}
_WORMHOLE_PUBLIC_PROFILE_FIELDS = {"profile", "wormhole_enabled"}
# Issue #243 (tg12): the public redaction now exposes only the bare
# "is this on?" boolean. Transport choice, anonymous-mode state, and
# the named privacy profile were all leaking actionable recon to
# unauthenticated callers and are now gated behind authenticated reads.
# See the matching block in backend/main.py for the full rationale.
_WORMHOLE_PUBLIC_SETTINGS_FIELDS = {"enabled"}
_WORMHOLE_PUBLIC_PROFILE_FIELDS = {"wormhole_enabled"}
_PRIVATE_LANE_CONTROL_FIELDS = {"private_lane_tier", "private_lane_policy"}
_PUBLIC_RNS_STATUS_FIELDS = {"enabled", "ready", "configured_peers", "active_peers"}
_NODE_PUBLIC_EVENT_HOOK_REGISTERED = False
+6
View File
@@ -53,6 +53,12 @@ class Settings(BaseSettings):
MESH_RELAY_FAILURE_COOLDOWN_S: int = 120
MESH_BOOTSTRAP_SEED_FAILURE_COOLDOWN_S: int = 15
MESH_PEER_PUSH_SECRET: str = ""
# Issue #256 (tg12): optional per-peer HMAC secret map. Comma-separated
# `url=secret` pairs. When a peer URL appears here, only that per-peer
# secret is accepted for it — the global MESH_PEER_PUSH_SECRET above is
# ignored for that specific URL. Single-peer installs and unmigrated
# multi-peer installs leave this empty and behavior is unchanged.
MESH_PEER_SECRETS: str = ""
MESH_RNS_APP_NAME: str = "shadowbroker"
MESH_RNS_ASPECT: str = "infonet"
MESH_RNS_IDENTITY_PATH: str = ""
+109
View File
@@ -69,6 +69,115 @@ def _derive_peer_key(shared_secret: str, peer_url: str) -> bytes:
).digest()
# ---------------------------------------------------------------------------
# Issue #256 (tg12): per-peer HMAC secrets
# ---------------------------------------------------------------------------
#
# Before this change, ALL peer-push HMACs were derived from a single
# fleet-shared ``MESH_PEER_PUSH_SECRET``. The receiver could prove a
# request was signed by *someone who knows the fleet secret*, but it
# could NOT prove which peer signed it — any peer could compute the
# expected HMAC for any other peer's URL and impersonate that peer.
#
# Fix: an optional ``MESH_PEER_SECRETS`` env var maps specific peer URLs
# to per-peer secrets. When a peer URL is listed there, only that
# per-peer secret is accepted for that URL — the global secret is
# ignored for that peer. Peer A no longer learns peer B's secret, so
# peer A cannot forge a request claiming to be peer B.
#
# Backwards-compatible by design:
#
# - Single-peer installs (``MESH_PEER_SECRETS`` empty) keep using the
# global secret. Zero behavior change. Zero operator action required.
# - Multi-peer installs that haven't migrated yet keep using the global
# secret for every peer. Same behavior as before — same exposure.
# - Multi-peer installs that have migrated configure
# ``MESH_PEER_SECRETS=urlA=secretA,urlB=secretB`` and immediately get
# per-peer identity. Migration is incremental: peers not yet listed
# continue using the global secret until both sides of that peering
# add their entry.
_PEER_SECRETS_CACHE: dict[str, str] = {}
_PEER_SECRETS_CACHE_RAW: str = ""
def _lookup_per_peer_secret(normalized_url: str) -> str:
"""Return the per-peer secret for ``normalized_url`` from MESH_PEER_SECRETS.
Returns "" if no per-peer entry is configured for that URL. The parser
is forgiving:
- Whitespace around items, URLs, and secrets is stripped.
- Items without ``=`` or with empty URL/secret halves are skipped.
- The URL half is normalized via ``normalize_peer_url`` so config
authors don't have to match scheme/port/path quirks exactly.
The cache is invalidated whenever the env var's raw value changes,
which keeps tests' ``monkeypatch.setenv`` calls effective without
forcing a process restart.
"""
import os
raw = str(os.environ.get("MESH_PEER_SECRETS", "") or "").strip()
global _PEER_SECRETS_CACHE, _PEER_SECRETS_CACHE_RAW
if raw != _PEER_SECRETS_CACHE_RAW:
new_cache: dict[str, str] = {}
for chunk in raw.split(","):
chunk = chunk.strip()
if not chunk or "=" not in chunk:
continue
url_part, _, secret_part = chunk.partition("=")
normalized = normalize_peer_url(url_part.strip())
secret = secret_part.strip()
if normalized and secret:
new_cache[normalized] = secret
_PEER_SECRETS_CACHE = new_cache
_PEER_SECRETS_CACHE_RAW = raw
return _PEER_SECRETS_CACHE.get(normalized_url, "")
def resolve_peer_key_for_url(peer_url: str) -> bytes:
"""Return the HMAC key for ``peer_url``, preferring per-peer secret.
Issue #256: this is the function every peer-push call site should
use. It looks up the peer-specific secret first, falling back to the
fleet-shared ``MESH_PEER_PUSH_SECRET`` only when the URL is NOT
listed in ``MESH_PEER_SECRETS``.
Both sender (computing X-Peer-HMAC) and receiver (verifying it) call
this with the SENDER's URL — they must derive the same key, so
operators on both ends of a peering need matching MESH_PEER_SECRETS
entries for that URL to stay in sync.
Returns empty bytes when no usable secret exists. Callers must treat
that as fail-closed (skip the push, reject the verification).
"""
normalized_url = normalize_peer_url(peer_url)
if not normalized_url:
return b""
per_peer_secret = _lookup_per_peer_secret(normalized_url)
if per_peer_secret:
return _derive_peer_key(per_peer_secret, normalized_url)
# No per-peer entry for this URL — fall back to the legacy global
# secret. This is what preserves zero-hostility for single-peer
# installs and the migration window for multi-peer installs.
try:
from services.config import get_settings
global_secret = str(
getattr(get_settings(), "MESH_PEER_PUSH_SECRET", "") or ""
).strip()
except Exception:
return b""
if not global_secret:
return b""
return _derive_peer_key(global_secret, normalized_url)
def _node_digest(public_key_b64: str) -> str:
raw = base64.b64decode(public_key_b64)
return hashlib.sha256(raw).hexdigest()
+8 -7
View File
@@ -216,18 +216,19 @@ def _peer_pair_ref_key(peer_url: str) -> bytes:
Returns an empty key on misconfiguration so callers fail closed.
"""
try:
from services.config import get_settings
from services.mesh.mesh_crypto import _derive_peer_key, normalize_peer_url
secret = str(get_settings().MESH_PEER_PUSH_SECRET or "").strip()
from services.mesh.mesh_crypto import (
normalize_peer_url,
resolve_peer_key_for_url,
)
except Exception:
return b""
if not secret:
return b""
normalized = normalize_peer_url(peer_url or "")
if not normalized:
return b""
peer_key = _derive_peer_key(secret, normalized)
# Issue #256: resolve_peer_key_for_url() prefers per-peer secrets
# from MESH_PEER_SECRETS and falls back to the global
# MESH_PEER_PUSH_SECRET only when the URL has no per-peer entry.
peer_key = resolve_peer_key_for_url(normalized)
if not peer_key:
return b""
# Domain-separate from the transport HMAC key so the two
+16 -11
View File
@@ -26,7 +26,11 @@ from enum import Enum
from typing import Any, Callable, Optional
from collections import deque
from urllib.parse import urlparse
from services.mesh.mesh_crypto import _derive_peer_key, normalize_peer_url
from services.mesh.mesh_crypto import (
_derive_peer_key,
normalize_peer_url,
resolve_peer_key_for_url,
)
from services.mesh.mesh_metrics import increment as metrics_inc
from services.mesh.mesh_privacy_policy import (
TRANSPORT_TIER_ORDER as _TIER_RANK,
@@ -703,7 +707,6 @@ class InternetTransport(_PeerPushTransportMixin):
endpoint_path, padded = self._build_peer_push_request(envelope, self.NAME)
except ValueError as exc:
return TransportResult(False, self.NAME, str(exc))
secret = str(settings.MESH_PEER_PUSH_SECRET or "").strip()
delivered = 0
last_error = ""
@@ -713,10 +716,13 @@ class InternetTransport(_PeerPushTransportMixin):
try:
normalized_peer_url = normalize_peer_url(peer_url)
headers = {"Content-Type": "application/json"}
if secret:
peer_key = _derive_peer_key(secret, normalized_peer_url)
if not peer_key:
raise ValueError("invalid peer URL for HMAC derivation")
# Issue #256: per-peer secret takes precedence over the
# global MESH_PEER_PUSH_SECRET. When neither is set the
# key is empty and we skip the HMAC header entirely so a
# bare (unsigned) push still works on test deployments
# that have not yet configured any secret at all.
peer_key = resolve_peer_key_for_url(normalized_peer_url)
if peer_key:
headers["X-Peer-Url"] = normalized_peer_url
headers["X-Peer-HMAC"] = hmac.new(
peer_key,
@@ -798,7 +804,6 @@ class TorArtiTransport(_PeerPushTransportMixin):
endpoint_path, padded = self._build_peer_push_request(envelope, self.NAME)
except ValueError as exc:
return TransportResult(False, self.NAME, str(exc))
secret = str(settings.MESH_PEER_PUSH_SECRET or "").strip()
delivered = 0
last_error = ""
@@ -808,10 +813,10 @@ class TorArtiTransport(_PeerPushTransportMixin):
try:
normalized_peer_url = normalize_peer_url(peer_url)
headers = {"Content-Type": "application/json"}
if secret:
peer_key = _derive_peer_key(secret, normalized_peer_url)
if not peer_key:
raise ValueError("invalid peer URL for HMAC derivation")
# Issue #256: per-peer secret takes precedence; see the
# other transport above for the rationale.
peer_key = resolve_peer_key_for_url(normalized_peer_url)
if peer_key:
headers["X-Peer-Url"] = normalized_peer_url
headers["X-Peer-HMAC"] = hmac.new(
peer_key,
@@ -91,13 +91,15 @@ def _fetch_dm_prekey_bundle_from_peer_lookup(lookup_token: str) -> dict[str, Any
return {"ok": False, "detail": "lookup token required"}
try:
from services.config import get_settings
from services.mesh.mesh_crypto import _derive_peer_key, normalize_peer_url
from services.mesh.mesh_crypto import (
normalize_peer_url,
resolve_peer_key_for_url,
)
from services.mesh.mesh_router import configured_relay_peer_urls
settings = get_settings()
secret = str(getattr(settings, "MESH_PEER_PUSH_SECRET", "") or "").strip()
if not secret:
return {"ok": False, "detail": "peer prekey lookup unavailable"}
# Issue #256: secret check moved per-peer below. We still bail out
# cleanly when there are no peers configured at all.
peers = configured_relay_peer_urls()
if not peers:
return {"ok": False, "detail": "peer prekey lookup unavailable"}
@@ -121,7 +123,8 @@ def _fetch_dm_prekey_bundle_from_peer_lookup(lookup_token: str) -> dict[str, Any
or os.environ.get("SB_TEST_NODE_URL", "").strip()
or normalized_peer_url
)
peer_key = _derive_peer_key(secret, sender_peer_url)
# Issue #256: prefer per-peer secret keyed by the sender URL.
peer_key = resolve_peer_key_for_url(sender_peer_url)
if not peer_key:
continue
headers = {
@@ -0,0 +1,160 @@
"""Issues #240 & #241 (tg12): oracle market/stake resolution endpoints
must require admin authentication.
Before the fix, ``POST /api/mesh/oracle/resolve`` and
``POST /api/mesh/oracle/resolve-stakes`` were decorated with
``@mesh_write_exempt(MeshWriteExemption.ADMIN_CONTROL)``. That decorator
only tags the route as not requiring a mesh signed-write envelope; it
does NOT enforce authorization. The rate limiter (5/minute) was the
only real gate, which is wrong for control-plane state mutations.
The fix adds ``dependencies=[Depends(require_admin)]`` to both routes.
These tests prove:
- Anonymous callers receive 403.
- A request bearing the configured admin key passes the auth gate.
- The underlying ledger mutator is not invoked on a 403.
"""
from __future__ import annotations
from unittest.mock import patch, MagicMock
import pytest
from fastapi.testclient import TestClient
_ADMIN_KEY = "test-admin-key-for-oracle-resolve-fixture-32+"
@pytest.fixture
def client():
"""TestClient with the private-lane transport middleware short-circuited.
The ``enforce_high_privacy_mesh`` middleware in ``main.py`` returns
HTTP 202 ("preparing private lane") for ``/api/mesh/*`` requests
when the Wormhole supervisor is not yet at the required transport
tier. In tests that's always — Wormhole is not running. Patching
``_minimum_transport_tier`` to return None disables the tier check
for the duration of the test, letting the request reach the route
(and therefore reach the ``Depends(require_admin)`` we are testing).
"""
import main
with patch("main._minimum_transport_tier", return_value=None):
yield TestClient(main.app, raise_server_exceptions=False)
@pytest.fixture
def mock_ledger():
"""Replace oracle_ledger methods so tests don't mutate persistent state.
The handler does ``from services.mesh.mesh_oracle import oracle_ledger``
at call time, so we patch the module attribute.
"""
fake = MagicMock()
fake.resolve_market.return_value = (0, 0)
fake.resolve_market_stakes.return_value = {"winners": 0, "losers": 0}
fake.resolve_expired_stakes.return_value = []
with patch("services.mesh.mesh_oracle.oracle_ledger", fake):
yield fake
# ---------------------------------------------------------------------------
# /api/mesh/oracle/resolve — issue #240
# ---------------------------------------------------------------------------
class TestOracleResolveAuthGate:
def test_anonymous_caller_is_rejected(self, client, mock_ledger):
with patch("auth._current_admin_key", return_value=_ADMIN_KEY):
r = client.post(
"/api/mesh/oracle/resolve",
json={"market_title": "test-market", "outcome": "Yes"},
)
assert r.status_code == 403
# Critically: the ledger mutator must NOT have been called on a 403.
assert mock_ledger.resolve_market.call_count == 0
assert mock_ledger.resolve_market_stakes.call_count == 0
def test_wrong_admin_key_rejected(self, client, mock_ledger):
with patch("auth._current_admin_key", return_value=_ADMIN_KEY):
r = client.post(
"/api/mesh/oracle/resolve",
headers={"X-Admin-Key": "this-key-is-wrong"},
json={"market_title": "test-market", "outcome": "Yes"},
)
assert r.status_code == 403
assert mock_ledger.resolve_market.call_count == 0
def test_valid_admin_key_passes_auth_gate(self, client, mock_ledger):
with patch("auth._current_admin_key", return_value=_ADMIN_KEY):
r = client.post(
"/api/mesh/oracle/resolve",
headers={"X-Admin-Key": _ADMIN_KEY},
json={"market_title": "test-market", "outcome": "Yes"},
)
# The auth gate let us through. The handler ran and called the
# (mocked) ledger.
assert r.status_code == 200
assert mock_ledger.resolve_market.call_count == 1
assert mock_ledger.resolve_market.call_args[0] == ("test-market", "Yes")
def test_admin_key_unset_blocks_in_production_posture(self, client, mock_ledger):
"""When ADMIN_KEY env is not configured at all and we're not in
debug, the endpoint must still refuse never silently accept."""
with (
patch("auth._current_admin_key", return_value=""),
patch("auth._allow_insecure_admin", return_value=False),
patch("auth._debug_mode_enabled", return_value=False),
patch("auth._scoped_admin_tokens", return_value={}),
):
r = client.post(
"/api/mesh/oracle/resolve",
json={"market_title": "test-market", "outcome": "Yes"},
)
assert r.status_code == 403
assert mock_ledger.resolve_market.call_count == 0
# ---------------------------------------------------------------------------
# /api/mesh/oracle/resolve-stakes — issue #241
# ---------------------------------------------------------------------------
class TestOracleResolveStakesAuthGate:
def test_anonymous_caller_is_rejected(self, client, mock_ledger):
with patch("auth._current_admin_key", return_value=_ADMIN_KEY):
r = client.post("/api/mesh/oracle/resolve-stakes")
assert r.status_code == 403
assert mock_ledger.resolve_expired_stakes.call_count == 0
def test_wrong_admin_key_rejected(self, client, mock_ledger):
with patch("auth._current_admin_key", return_value=_ADMIN_KEY):
r = client.post(
"/api/mesh/oracle/resolve-stakes",
headers={"X-Admin-Key": "nope"},
)
assert r.status_code == 403
assert mock_ledger.resolve_expired_stakes.call_count == 0
def test_valid_admin_key_passes_auth_gate(self, client, mock_ledger):
with patch("auth._current_admin_key", return_value=_ADMIN_KEY):
r = client.post(
"/api/mesh/oracle/resolve-stakes",
headers={"X-Admin-Key": _ADMIN_KEY},
)
assert r.status_code == 200
assert mock_ledger.resolve_expired_stakes.call_count == 1
body = r.json()
assert body["ok"] is True
assert body["count"] == 0
def test_admin_key_unset_blocks_in_production_posture(self, client, mock_ledger):
with (
patch("auth._current_admin_key", return_value=""),
patch("auth._allow_insecure_admin", return_value=False),
patch("auth._debug_mode_enabled", return_value=False),
patch("auth._scoped_admin_tokens", return_value={}),
):
r = client.post("/api/mesh/oracle/resolve-stakes")
assert r.status_code == 403
assert mock_ledger.resolve_expired_stakes.call_count == 0
@@ -0,0 +1,366 @@
"""Issue #256 (tg12): per-peer HMAC secrets must defeat cross-peer
impersonation.
Before the fix, ALL peer-push HMACs were derived from the single
fleet-shared ``MESH_PEER_PUSH_SECRET``. The receiver could only prove
"this request was signed by someone who knows the fleet secret" not
which peer signed it. Any peer that knew the secret could compute the
expected HMAC for any other peer's URL and impersonate that peer.
The fix introduces ``MESH_PEER_SECRETS``, a per-peer URL-to-secret map.
When a peer URL appears there:
- Only the listed per-peer secret is accepted for that URL.
- The global ``MESH_PEER_PUSH_SECRET`` is ignored for that specific URL.
- A peer that knows only the global secret (or a different peer's
per-peer secret) cannot forge a request claiming to be that peer.
When a peer URL is NOT listed (the common case for single-peer installs
and for migration windows), the resolver falls back to the global
secret preserving existing behavior with zero operator action.
These tests exercise ``resolve_peer_key_for_url`` directly so we cover
the security contract without spinning up a full mesh node.
"""
from __future__ import annotations
import hashlib
import hmac
import pytest
# ---------------------------------------------------------------------------
# _lookup_per_peer_secret — env parsing
# ---------------------------------------------------------------------------
class TestLookupPerPeerSecret:
def setup_method(self):
# Invalidate the parser cache so each test sees its own env state.
from services.mesh import mesh_crypto
mesh_crypto._PEER_SECRETS_CACHE = {}
mesh_crypto._PEER_SECRETS_CACHE_RAW = ""
def test_returns_empty_when_env_unset(self, monkeypatch):
from services.mesh.mesh_crypto import _lookup_per_peer_secret
monkeypatch.delenv("MESH_PEER_SECRETS", raising=False)
assert _lookup_per_peer_secret("https://peer.example") == ""
def test_returns_empty_when_env_blank(self, monkeypatch):
from services.mesh.mesh_crypto import _lookup_per_peer_secret
monkeypatch.setenv("MESH_PEER_SECRETS", "")
assert _lookup_per_peer_secret("https://peer.example") == ""
def test_returns_per_peer_secret_for_listed_url(self, monkeypatch):
from services.mesh.mesh_crypto import _lookup_per_peer_secret
monkeypatch.setenv(
"MESH_PEER_SECRETS",
"https://peer-a.example=secretA,https://peer-b.example=secretB",
)
assert _lookup_per_peer_secret("https://peer-a.example") == "secretA"
assert _lookup_per_peer_secret("https://peer-b.example") == "secretB"
def test_returns_empty_for_url_not_listed(self, monkeypatch):
from services.mesh.mesh_crypto import _lookup_per_peer_secret
monkeypatch.setenv(
"MESH_PEER_SECRETS",
"https://peer-a.example=secretA",
)
assert _lookup_per_peer_secret("https://other.example") == ""
def test_url_is_normalized_before_lookup(self, monkeypatch):
from services.mesh.mesh_crypto import _lookup_per_peer_secret
# Configure with a trailing slash + uppercase host. Lookup with
# plain lowercase host. Both should normalize to the same key.
monkeypatch.setenv(
"MESH_PEER_SECRETS",
"https://Peer-A.Example/=secretA",
)
assert _lookup_per_peer_secret("https://peer-a.example") == "secretA"
def test_whitespace_around_entries_is_stripped(self, monkeypatch):
from services.mesh.mesh_crypto import _lookup_per_peer_secret
monkeypatch.setenv(
"MESH_PEER_SECRETS",
" https://peer-a.example = secretA , https://peer-b.example=secretB ",
)
assert _lookup_per_peer_secret("https://peer-a.example") == "secretA"
assert _lookup_per_peer_secret("https://peer-b.example") == "secretB"
def test_malformed_entries_are_skipped_not_raised(self, monkeypatch):
"""A garbled MESH_PEER_SECRETS value must NOT crash the resolver.
Bad entries are silently dropped; well-formed entries still work.
This is the "fail-forward, not loud" rule a typo in operator
config should not take the whole backend down."""
from services.mesh.mesh_crypto import _lookup_per_peer_secret
monkeypatch.setenv(
"MESH_PEER_SECRETS",
"no_equals_sign,=missing_url,https://no.secret=,https://good.example=secretGood",
)
assert _lookup_per_peer_secret("https://good.example") == "secretGood"
# The malformed ones produce no entry (and don't poison the cache).
assert _lookup_per_peer_secret("https://no.secret") == ""
def test_cache_invalidates_on_env_change(self, monkeypatch):
"""A test (or operator) updating MESH_PEER_SECRETS must see the
new value immediately no process restart required."""
from services.mesh.mesh_crypto import _lookup_per_peer_secret
monkeypatch.setenv("MESH_PEER_SECRETS", "https://a.example=first")
assert _lookup_per_peer_secret("https://a.example") == "first"
monkeypatch.setenv("MESH_PEER_SECRETS", "https://a.example=second")
assert _lookup_per_peer_secret("https://a.example") == "second"
# ---------------------------------------------------------------------------
# resolve_peer_key_for_url — precedence + fallback
# ---------------------------------------------------------------------------
class TestResolvePeerKeyForUrl:
def setup_method(self):
from services.mesh import mesh_crypto
mesh_crypto._PEER_SECRETS_CACHE = {}
mesh_crypto._PEER_SECRETS_CACHE_RAW = ""
def _fake_settings(self, global_secret: str):
from unittest.mock import MagicMock
s = MagicMock()
s.MESH_PEER_PUSH_SECRET = global_secret
return s
def test_falls_back_to_global_when_no_per_peer_entry(self, monkeypatch):
"""Single-peer installs: MESH_PEER_SECRETS empty, MESH_PEER_PUSH_SECRET
set must keep working as before."""
from services.mesh.mesh_crypto import (
resolve_peer_key_for_url,
_derive_peer_key,
)
monkeypatch.delenv("MESH_PEER_SECRETS", raising=False)
with monkeypatch.context() as m:
m.setattr(
"services.config.get_settings",
lambda: self._fake_settings("global-secret"),
)
key = resolve_peer_key_for_url("https://peer.example")
expected = _derive_peer_key("global-secret", "https://peer.example")
assert key == expected
assert len(key) == 32 # SHA-256 output
def test_per_peer_secret_takes_precedence_over_global(self, monkeypatch):
from services.mesh.mesh_crypto import (
resolve_peer_key_for_url,
_derive_peer_key,
)
monkeypatch.setenv(
"MESH_PEER_SECRETS",
"https://peer-a.example=per-peer-a-secret",
)
with monkeypatch.context() as m:
m.setattr(
"services.config.get_settings",
lambda: self._fake_settings("global-secret"),
)
key = resolve_peer_key_for_url("https://peer-a.example")
expected_per_peer = _derive_peer_key(
"per-peer-a-secret", "https://peer-a.example"
)
expected_global = _derive_peer_key("global-secret", "https://peer-a.example")
assert key == expected_per_peer
assert key != expected_global
def test_unlisted_peer_uses_global_during_migration(self, monkeypatch):
"""Partial migration: peer A is in MESH_PEER_SECRETS, peer B is
not yet. Peer B must keep working under the global secret."""
from services.mesh.mesh_crypto import (
resolve_peer_key_for_url,
_derive_peer_key,
)
monkeypatch.setenv(
"MESH_PEER_SECRETS",
"https://peer-a.example=per-peer-a-secret",
)
with monkeypatch.context() as m:
m.setattr(
"services.config.get_settings",
lambda: self._fake_settings("global-secret"),
)
key_a = resolve_peer_key_for_url("https://peer-a.example")
key_b = resolve_peer_key_for_url("https://peer-b.example")
expected_b = _derive_peer_key("global-secret", "https://peer-b.example")
assert key_b == expected_b
# Peer A's per-peer key must differ from peer B's global key
# (they're keyed by different secrets and different URLs).
assert key_a != key_b
def test_returns_empty_when_no_secret_available(self, monkeypatch):
from services.mesh.mesh_crypto import resolve_peer_key_for_url
monkeypatch.delenv("MESH_PEER_SECRETS", raising=False)
with monkeypatch.context() as m:
m.setattr(
"services.config.get_settings",
lambda: self._fake_settings(""),
)
key = resolve_peer_key_for_url("https://peer.example")
assert key == b""
def test_returns_empty_when_url_is_unparseable(self, monkeypatch):
from services.mesh.mesh_crypto import resolve_peer_key_for_url
with monkeypatch.context() as m:
m.setattr(
"services.config.get_settings",
lambda: self._fake_settings("global-secret"),
)
assert resolve_peer_key_for_url("") == b""
assert resolve_peer_key_for_url("not-a-url") == b""
assert resolve_peer_key_for_url(None) == b""
# ---------------------------------------------------------------------------
# The actual #256 attack: peer A cannot impersonate peer B
# ---------------------------------------------------------------------------
class TestCrossPeerImpersonationRefused:
"""The core regression: when MESH_PEER_SECRETS is configured, a peer
that knows ONLY the global secret (or a different peer's per-peer
secret) cannot produce a valid HMAC for another peer's URL."""
def setup_method(self):
from services.mesh import mesh_crypto
mesh_crypto._PEER_SECRETS_CACHE = {}
mesh_crypto._PEER_SECRETS_CACHE_RAW = ""
def _hmac(self, key: bytes, body: bytes) -> str:
return hmac.new(key, body, hashlib.sha256).hexdigest()
def test_peer_a_global_secret_cannot_forge_peer_b_hmac(self, monkeypatch):
from services.mesh.mesh_crypto import (
resolve_peer_key_for_url,
_derive_peer_key,
)
from unittest.mock import MagicMock
# Receiver has BOTH the global secret AND a per-peer secret for B.
monkeypatch.setenv(
"MESH_PEER_SECRETS",
"https://peer-b.example=per-peer-b-secret",
)
settings = MagicMock()
settings.MESH_PEER_PUSH_SECRET = "global-secret"
monkeypatch.setattr(
"services.config.get_settings", lambda: settings
)
body = b'{"events": [{"id": 1}]}'
# Attacker (peer A) knows only the global secret. Tries to forge
# an HMAC claiming to be peer B.
attacker_key = _derive_peer_key("global-secret", "https://peer-b.example")
attacker_hmac = self._hmac(attacker_key, body)
# Receiver derives B's expected key from B's per-peer secret.
receiver_key = resolve_peer_key_for_url("https://peer-b.example")
expected_hmac = self._hmac(receiver_key, body)
# The forgery MUST NOT match.
assert attacker_hmac != expected_hmac
def test_peer_a_per_peer_secret_cannot_forge_peer_b_hmac(self, monkeypatch):
"""Even harder case: peer A has its OWN per-peer secret, but
still does not know peer B's per-peer secret, and so cannot
forge an HMAC for peer B."""
from services.mesh.mesh_crypto import (
resolve_peer_key_for_url,
_derive_peer_key,
)
from unittest.mock import MagicMock
monkeypatch.setenv(
"MESH_PEER_SECRETS",
"https://peer-a.example=secretA,https://peer-b.example=secretB",
)
settings = MagicMock()
settings.MESH_PEER_PUSH_SECRET = ""
monkeypatch.setattr(
"services.config.get_settings", lambda: settings
)
body = b'{"events": [{"id": 99}]}'
# Attacker A tries to forge for B using its own secret (secretA).
attacker_key = _derive_peer_key("secretA", "https://peer-b.example")
attacker_hmac = self._hmac(attacker_key, body)
receiver_key = resolve_peer_key_for_url("https://peer-b.example")
expected_hmac = self._hmac(receiver_key, body)
assert attacker_hmac != expected_hmac
def test_legitimate_peer_b_request_verifies(self, monkeypatch):
"""Positive control: when peer B uses ITS per-peer secret and
claims to be itself, the receiver accepts the HMAC."""
from services.mesh.mesh_crypto import resolve_peer_key_for_url
from unittest.mock import MagicMock
monkeypatch.setenv(
"MESH_PEER_SECRETS",
"https://peer-b.example=secretB",
)
settings = MagicMock()
settings.MESH_PEER_PUSH_SECRET = ""
monkeypatch.setattr(
"services.config.get_settings", lambda: settings
)
body = b'{"events": [{"id": 7}]}'
# Peer B and the receiver both call resolve_peer_key_for_url.
sender_key = resolve_peer_key_for_url("https://peer-b.example")
receiver_key = resolve_peer_key_for_url("https://peer-b.example")
sender_hmac = self._hmac(sender_key, body)
expected_hmac = self._hmac(receiver_key, body)
assert sender_hmac == expected_hmac
def test_single_peer_install_zero_behavior_change(self, monkeypatch):
"""The "no UX hostility" guarantee: an install with the global
secret set and NO MESH_PEER_SECRETS entries must derive exactly
the same key as before this change."""
from services.mesh.mesh_crypto import (
resolve_peer_key_for_url,
_derive_peer_key,
)
from unittest.mock import MagicMock
monkeypatch.delenv("MESH_PEER_SECRETS", raising=False)
settings = MagicMock()
settings.MESH_PEER_PUSH_SECRET = "legacy-global-secret"
monkeypatch.setattr(
"services.config.get_settings", lambda: settings
)
# The legacy derivation that every prior call site used.
legacy_key = _derive_peer_key("legacy-global-secret", "https://peer.example")
# The new resolver, with no per-peer entries configured.
new_key = resolve_peer_key_for_url("https://peer.example")
assert new_key == legacy_key
@@ -0,0 +1,263 @@
"""Issues #243, #252, #253 (tg12): settings endpoints must not leak
operational posture to unauthenticated callers.
- **#243**: ``GET /api/settings/wormhole``, ``/api/settings/privacy-profile``,
and ``/api/settings/node`` were leaking transport choice, anonymous-mode
state, the named privacy profile, and node-participant state to any
unauthenticated caller. The fix tightens the redaction allowlists to
expose ONLY a bare "is this feature on?" boolean and gates node mode
behind authenticated reads.
- **#252**: ``GET /api/settings/news-feeds`` returned the operator's full
curated feed inventory (names + URLs) to anyone. Now gated on
local-operator.
- **#253**: ``GET /api/settings/timemachine`` returned whether archival
capture is enabled to anyone. Now gated on local-operator.
Auth model: ``require_local_operator`` allows loopback (Tauri shell),
the Docker bridge frontend container (via the hostname-bound trust from
PR #278), and any caller that presents the configured admin key.
Anonymous LAN or internet callers do NOT pass and either receive 403
(news-feeds, timemachine) or a redacted minimum (wormhole / node).
"""
from __future__ import annotations
from unittest.mock import patch, MagicMock
import pytest
from fastapi.testclient import TestClient
_ADMIN_KEY = "test-admin-key-for-round5-fixture-32+chars"
@pytest.fixture
def client():
"""TestClient with the private-lane transport middleware disabled.
Same shape as the oracle resolve fixture the mesh privacy
middleware returns 202 for ``/api/settings/*`` under TestClient
because Wormhole is not actually running. Patching out the tier
requirement lets requests reach the route's auth gate.
"""
import main
with patch("main._minimum_transport_tier", return_value=None):
yield TestClient(main.app, raise_server_exceptions=False)
# ---------------------------------------------------------------------------
# #243: Wormhole posture redaction
# ---------------------------------------------------------------------------
class TestWormholeSettingsRedaction:
"""``GET /api/settings/wormhole`` must NOT leak transport choice or
anonymous-mode state to unauthenticated callers."""
def _read_settings_payload(self):
return {
"enabled": True,
"transport": "tor_arti",
"anonymous_mode": True,
"privacy_profile": "high",
"socks_proxy": "socks5h://127.0.0.1:9050",
}
def test_anonymous_caller_sees_only_enabled_bool(self, client):
with (
patch("main.read_wormhole_settings", return_value=self._read_settings_payload()),
patch("routers.wormhole.read_wormhole_settings", return_value=self._read_settings_payload()),
patch("services.wormhole_settings.read_wormhole_settings", return_value=self._read_settings_payload()),
patch("auth._current_admin_key", return_value=_ADMIN_KEY),
):
r = client.get("/api/settings/wormhole")
assert r.status_code == 200
body = r.json()
# Only the bare "is Wormhole on?" boolean is exposed publicly.
assert "enabled" in body
assert body["enabled"] is True
# Posture fields the audit flagged must be absent.
assert "transport" not in body
assert "anonymous_mode" not in body
assert "privacy_profile" not in body
assert "socks_proxy" not in body
def test_authenticated_caller_sees_full_state(self, client):
with (
patch("main.read_wormhole_settings", return_value=self._read_settings_payload()),
patch("routers.wormhole.read_wormhole_settings", return_value=self._read_settings_payload()),
patch("services.wormhole_settings.read_wormhole_settings", return_value=self._read_settings_payload()),
patch("auth._current_admin_key", return_value=_ADMIN_KEY),
):
r = client.get(
"/api/settings/wormhole",
headers={"X-Admin-Key": _ADMIN_KEY},
)
assert r.status_code == 200
body = r.json()
# All fields visible when authenticated.
assert body["enabled"] is True
assert body["transport"] == "tor_arti"
assert body["anonymous_mode"] is True
assert body["privacy_profile"] == "high"
class TestPrivacyProfileRedaction:
"""``GET /api/settings/privacy-profile`` must NOT leak the named
profile to unauthenticated callers (the profile name itself
discloses operator intent)."""
def _payload(self):
return {
"enabled": True,
"transport": "tor_arti",
"anonymous_mode": True,
"privacy_profile": "high",
}
def test_anonymous_caller_sees_only_wormhole_enabled_bool(self, client):
with (
patch("main.read_wormhole_settings", return_value=self._payload()),
patch("routers.wormhole.read_wormhole_settings", return_value=self._payload()),
patch("services.wormhole_settings.read_wormhole_settings", return_value=self._payload()),
patch("auth._current_admin_key", return_value=_ADMIN_KEY),
):
r = client.get("/api/settings/privacy-profile")
assert r.status_code == 200
body = r.json()
assert "wormhole_enabled" in body
assert body["wormhole_enabled"] is True
# The named profile, transport, and anonymous mode must NOT
# leak to anonymous callers.
assert "profile" not in body or body.get("profile") is None
assert "transport" not in body
assert "anonymous_mode" not in body
def test_authenticated_caller_sees_named_profile_and_transport(self, client):
with (
patch("main.read_wormhole_settings", return_value=self._payload()),
patch("routers.wormhole.read_wormhole_settings", return_value=self._payload()),
patch("services.wormhole_settings.read_wormhole_settings", return_value=self._payload()),
patch("auth._current_admin_key", return_value=_ADMIN_KEY),
):
r = client.get(
"/api/settings/privacy-profile",
headers={"X-Admin-Key": _ADMIN_KEY},
)
assert r.status_code == 200
body = r.json()
assert body["profile"] == "high"
assert body["wormhole_enabled"] is True
assert body["transport"] == "tor_arti"
assert body["anonymous_mode"] is True
class TestNodeSettingsRedaction:
"""``GET /api/settings/node`` must NOT disclose node_mode or
node_enabled to anonymous callers."""
def _node_data(self):
return {"some_node_field": "value"}
def test_anonymous_caller_sees_empty_stub(self, client):
with (
patch("services.node_settings.read_node_settings", return_value=self._node_data()),
patch("routers.admin._current_node_mode", return_value="participant"),
patch("routers.admin._participant_node_enabled", return_value=True),
patch("auth._current_admin_key", return_value=_ADMIN_KEY),
):
r = client.get("/api/settings/node")
assert r.status_code == 200
body = r.json()
# No posture fields.
assert "node_mode" not in body
assert "node_enabled" not in body
assert "some_node_field" not in body
def test_authenticated_caller_sees_full_node_state(self, client):
with (
patch("services.node_settings.read_node_settings", return_value=self._node_data()),
patch("routers.admin._current_node_mode", return_value="participant"),
patch("routers.admin._participant_node_enabled", return_value=True),
patch("auth._current_admin_key", return_value=_ADMIN_KEY),
):
r = client.get(
"/api/settings/node",
headers={"X-Admin-Key": _ADMIN_KEY},
)
assert r.status_code == 200
body = r.json()
assert body["node_mode"] == "participant"
assert body["node_enabled"] is True
assert body["some_node_field"] == "value"
# ---------------------------------------------------------------------------
# #252: news-feeds auth gate
# ---------------------------------------------------------------------------
class TestNewsFeedsAuthGate:
def _fake_feeds(self):
return [
{"name": "Custom Internal", "url": "https://internal.example/rss", "weight": 5},
{"name": "Default News", "url": "https://news.example/rss", "weight": 3},
]
def test_anonymous_caller_rejected(self, client):
with (
patch("services.news_feed_config.get_feeds", return_value=self._fake_feeds()) as get_feeds,
patch("auth._current_admin_key", return_value=_ADMIN_KEY),
):
r = client.get("/api/settings/news-feeds")
assert r.status_code == 403
# Critically: the underlying config read must NOT have been performed
# (else the response body could leak the count via response timing).
assert get_feeds.call_count == 0
def test_authenticated_caller_sees_full_feed_inventory(self, client):
with (
patch("services.news_feed_config.get_feeds", return_value=self._fake_feeds()),
patch("auth._current_admin_key", return_value=_ADMIN_KEY),
):
r = client.get(
"/api/settings/news-feeds",
headers={"X-Admin-Key": _ADMIN_KEY},
)
assert r.status_code == 200
body = r.json()
assert len(body) == 2
assert body[0]["name"] == "Custom Internal"
assert body[0]["url"] == "https://internal.example/rss"
# ---------------------------------------------------------------------------
# #253: timemachine auth gate
# ---------------------------------------------------------------------------
class TestTimemachineAuthGate:
def test_anonymous_caller_rejected(self, client):
node_data = {"timemachine_enabled": True}
with (
patch("services.node_settings.read_node_settings", return_value=node_data),
patch("auth._current_admin_key", return_value=_ADMIN_KEY),
):
r = client.get("/api/settings/timemachine")
assert r.status_code == 403
def test_authenticated_caller_sees_enabled_state(self, client):
node_data = {"timemachine_enabled": True}
with (
patch("services.node_settings.read_node_settings", return_value=node_data),
patch("auth._current_admin_key", return_value=_ADMIN_KEY),
):
r = client.get(
"/api/settings/timemachine",
headers={"X-Admin-Key": _ADMIN_KEY},
)
assert r.status_code == 200
body = r.json()
assert body["enabled"] is True
assert "storage_warning" in body
+9
View File
@@ -28,6 +28,15 @@ services:
- MESH_RELAY_PEERS=${MESH_RELAY_PEERS:-}
# Shared transport auth for operator peer push. Must be set to a unique secret per deployment.
- MESH_PEER_PUSH_SECRET=${MESH_PEER_PUSH_SECRET:-}
# Issue #256: optional per-peer HMAC secrets. Comma-separated
# `url=secret` pairs (no spaces). When a peer URL appears here, only
# the listed per-peer secret is accepted for it — the global
# MESH_PEER_PUSH_SECRET above is ignored for that specific URL. This
# closes the cross-peer impersonation surface for multi-peer fleets.
# Single-peer installs leave this empty (default) for unchanged
# behavior. Both sides of a peering must agree on the per-peer
# secret for a given URL.
- MESH_PEER_SECRETS=${MESH_PEER_SECRETS:-}
# Meshtastic MQTT is opt-in to avoid passive load on the public broker.
# Set MESH_MQTT_ENABLED=true in .env only when this node should join live MQTT.
- MESH_MQTT_ENABLED=${MESH_MQTT_ENABLED:-false}
Generated
+11
View File
@@ -82,6 +82,7 @@ dependencies = [
{ name = "cachetools" },
{ name = "cloudscraper" },
{ name = "cryptography" },
{ name = "defusedxml" },
{ name = "fastapi" },
{ name = "feedparser" },
{ name = "httpx" },
@@ -120,6 +121,7 @@ requires-dist = [
{ name = "cachetools", specifier = "==5.5.2" },
{ name = "cloudscraper", specifier = "==1.2.71" },
{ name = "cryptography", specifier = ">=41.0.0" },
{ name = "defusedxml", specifier = ">=0.7.1" },
{ name = "fastapi", specifier = "==0.115.12" },
{ name = "feedparser", specifier = "==6.0.10" },
{ name = "httpx", specifier = "==0.28.1" },
@@ -600,6 +602,15 @@ wheels = [
{ url = "https://files.pythonhosted.org/packages/a4/87/d03a718e7bfdbbebaa4b6a66ba5bb069bc00a84e5ad176d8198cc785cd42/dbus_fast-4.0.0-cp314-cp314t-musllinux_1_2_x86_64.whl", hash = "sha256:f6af190d8306f1bd506740c39701f5c211aa31ac660a3fcb401ebb97d33166c7", size = 1627620, upload-time = "2026-02-01T21:05:46.878Z" },
]
[[package]]
name = "defusedxml"
version = "0.7.1"
source = { registry = "https://pypi.org/simple" }
sdist = { url = "https://files.pythonhosted.org/packages/0f/d5/c66da9b79e5bdb124974bfe172b4daf3c984ebd9c2a06e2b8a4dc7331c72/defusedxml-0.7.1.tar.gz", hash = "sha256:1bb3032db185915b62d7c6209c5a8792be6a32ab2fedacc84e01b52c51aa3e69", size = 75520, upload-time = "2021-03-08T10:59:26.269Z" }
wheels = [
{ url = "https://files.pythonhosted.org/packages/07/6c/aa3f2f849e01cb6a001cd8554a88d4c77c5c1a31c95bdf1cf9301e6d9ef4/defusedxml-0.7.1-py2.py3-none-any.whl", hash = "sha256:a352e7e428770286cc899e2542b6cdaedb2b4953ff269a210103ec58f6198a61", size = 25604, upload-time = "2021-03-08T10:59:24.45Z" },
]
[[package]]
name = "deprecated"
version = "1.3.1"