mirror of
https://github.com/BigBodyCobain/Shadowbroker.git
synced 2026-05-27 01:22:27 +02:00
External security audit by @tg12 (May 17, 2026) filed 11 issues against the backend. PR #227 (May 18, AI-generated) closed seven of them by adding require_local_operator to control-plane endpoints. Four remained live; this PR closes the rest. #192 — CCTV proxy followed redirects without re-validating host Issue: /api/cctv/media validated only the caller-supplied URL host before passing it to requests.get(..., allow_redirects=True). A 302 to http://127.0.0.1 or any internal/disallowed host was silently followed, turning the proxy into an open-redirect-to-SSRF chain. Fix in routers/cctv.py: replace the single allow_redirects=True call with a manual follow loop. Each hop's Location is parsed, the host is rerun through _cctv_host_allowed(), and non-HTTP schemes (file://, ftp://, etc.) are rejected. Cap chain length at 5 hops. Test: backend/tests/test_cctv_redirect_ssrf.py covers - redirect to disallowed host -> 502 - redirect to localhost -> 502 - redirect to another allowed host -> 200 - redirect chain length cap - non-HTTP scheme rejected #198 — Gate introspection GETs were unauthenticated Issue: /api/wormhole/gate/{gate_id}/{identity,personas,key} were callable with no auth dependency. Any caller that could reach the backend could dump the operator's active persona, persona inventory, and key status for any gate_id they knew. The wiki's privacy threat model explicitly markets gate personas as rotating, unlinkable pseudonyms — this leak defeated that property. Fix in routers/wormhole.py: add dependencies=[Depends(require_local_operator)] to all three routes. Test: backend/tests/test_control_surface_auth.py extended with three new parameterized cases (lines 75-77). #199 — GDELT military incident ingestion used plaintext HTTP Issue: backend/services/geopolitics.py fetched http://data.gdeltproject.org/gdeltv2/lastupdate.txt and ~48 export archive URLs over plaintext HTTP. Passive observers could identify Shadowbroker nodes from the fetch pattern. Active MITM could inject doctored military incident records into the global map. Fix in services/geopolitics.py: rewrite the lastupdate.txt fetch and the export download URL constructor to use https://. GDELT's data.gdeltproject.org serves the same content over HTTPS. Test: backend/tests/test_gdelt_https.py asserts no plaintext HTTP URLs to data.gdeltproject.org remain in code (comments excluded) and that the HTTPS URLs we expect are present. #200 — Sentinel token cache lookup used client_id only Issue: routers/tools.py kept a process-global cache of Copernicus bearer tokens. The lookup compared _sh_token_cache["client_id"] == client_id. A caller who knew a valid client_id but supplied any wrong client_secret hit the cache and reused the legitimate caller's bearer token — burning their quota and accessing imagery on their account. Fix in routers/tools.py: replace the client_id field with credential_fp, an HMAC-SHA256 over (client_id, client_secret) under a per-process random key (_SH_TOKEN_CACHE_HMAC_KEY = os.urandom(32), regenerated at startup). A caller who doesn't know the secret cannot compute a matching fingerprint, so they miss the cache and hit the real Copernicus token endpoint — which will reject their wrong secret with a 401. Test: backend/tests/test_sentinel_token_cache.py covers - same client_id + different secrets => different fingerprints - same credentials => same fingerprint (cache still works) - different client_ids + same secret => different fingerprints - cache no longer stores raw client_id (catches regression) - attacker with wrong secret cannot reuse victim's token Validation pytest backend/tests/test_control_surface_auth.py backend/tests/test_cctv_redirect_ssrf.py backend/tests/test_gdelt_https.py backend/tests/test_sentinel_token_cache.py -> 37 passed Credit: @tg12 reported all four of these in their May 17 audit with correct line-number citations and accurate remediation recommendations. Co-authored-by: Claude Opus 4.7 <noreply@anthropic.com>
This commit is contained in:
+58
-1
@@ -191,11 +191,68 @@ def _cctv_response_headers(resp, cache_seconds: int, include_length: bool = True
|
||||
return headers
|
||||
|
||||
|
||||
# Maximum number of redirects we'll follow on the CCTV upstream. Each hop is
|
||||
# re-validated against _cctv_host_allowed() before continuing, so this caps
|
||||
# the redirect-chain SSRF blast radius.
|
||||
_CCTV_MAX_REDIRECTS = 5
|
||||
|
||||
|
||||
def _fetch_cctv_upstream_response(request: Request, target_url: str, profile: _CCTVProxyProfile):
|
||||
"""Fetch an upstream CCTV URL, following redirects manually with host re-validation.
|
||||
|
||||
Why manual redirect following:
|
||||
The original code used ``allow_redirects=True``, which only validated
|
||||
the initial caller-supplied URL host against the allowlist. An attacker
|
||||
could submit an allowed host that 302-redirected to an internal address
|
||||
(e.g. ``http://localhost:8000/api/...`` or a private RFC1918 range),
|
||||
and the backend would dutifully follow and proxy the response — a
|
||||
classic open-redirect-to-SSRF chain.
|
||||
|
||||
With this loop, we re-run ``_cctv_host_allowed()`` on every hop's
|
||||
``Location`` header. A redirect to a host that isn't on the allowlist
|
||||
is rejected with 502 rather than silently followed.
|
||||
"""
|
||||
import requests as _req
|
||||
from urllib.parse import urlparse, urljoin
|
||||
|
||||
headers = _cctv_upstream_headers(request, profile)
|
||||
current_url = target_url
|
||||
hops = 0
|
||||
try:
|
||||
resp = _req.get(target_url, timeout=profile.timeout, stream=True, allow_redirects=True, headers=headers)
|
||||
while True:
|
||||
resp = _req.get(
|
||||
current_url,
|
||||
timeout=profile.timeout,
|
||||
stream=True,
|
||||
allow_redirects=False,
|
||||
headers=headers,
|
||||
)
|
||||
# Redirect handling — re-validate the next-hop host before following.
|
||||
if resp.is_redirect or resp.status_code in (301, 302, 303, 307, 308):
|
||||
location = resp.headers.get("Location", "")
|
||||
resp.close()
|
||||
if hops >= _CCTV_MAX_REDIRECTS:
|
||||
logger.warning(
|
||||
"CCTV upstream redirect chain exceeded limit [%s] %s",
|
||||
profile.name, target_url,
|
||||
)
|
||||
raise HTTPException(status_code=502, detail="Upstream redirect chain too long")
|
||||
if not location:
|
||||
raise HTTPException(status_code=502, detail="Upstream redirect missing Location")
|
||||
next_url = urljoin(current_url, location)
|
||||
next_parsed = urlparse(next_url)
|
||||
if next_parsed.scheme not in ("http", "https"):
|
||||
raise HTTPException(status_code=502, detail="Upstream redirect to non-HTTP scheme")
|
||||
if not _cctv_host_allowed(next_parsed.hostname):
|
||||
logger.warning(
|
||||
"CCTV upstream redirect to disallowed host [%s] %s -> %s",
|
||||
profile.name, current_url, next_url,
|
||||
)
|
||||
raise HTTPException(status_code=502, detail="Upstream redirect to disallowed host")
|
||||
current_url = next_url
|
||||
hops += 1
|
||||
continue
|
||||
break
|
||||
except _req.exceptions.Timeout as exc:
|
||||
logger.warning("CCTV upstream timeout [%s] %s", profile.name, target_url)
|
||||
raise HTTPException(status_code=504, detail="Upstream timeout") from exc
|
||||
|
||||
@@ -120,7 +120,36 @@ async def api_sentinel_token(request: Request):
|
||||
raise HTTPException(502, "Token request failed")
|
||||
|
||||
|
||||
_sh_token_cache: dict = {"token": None, "expiry": 0, "client_id": ""}
|
||||
# Cache key is an HMAC of (client_id, client_secret) — a caller cannot hit
|
||||
# this cache without knowing the same secret that originally populated it.
|
||||
# Without this binding, the lookup only checked client_id, so anyone who
|
||||
# knew a valid client_id could reuse another caller's cached token (and
|
||||
# burn their Copernicus quota / access tiles on their account).
|
||||
_sh_token_cache: dict = {"token": None, "expiry": 0, "credential_fp": ""}
|
||||
|
||||
|
||||
def _credential_fingerprint(client_id: str, client_secret: str) -> str:
|
||||
"""Return a stable, secret-binding fingerprint for the Sentinel cache key.
|
||||
|
||||
Uses HMAC-SHA256 so the raw secret is never stored in process memory as
|
||||
a cache key. The HMAC key is a per-process random value, which means the
|
||||
fingerprint cannot be precomputed across restarts (additional defense
|
||||
against an attacker who learned a valid client_id but not the secret).
|
||||
"""
|
||||
import hashlib
|
||||
import hmac
|
||||
|
||||
return hmac.new(
|
||||
_SH_TOKEN_CACHE_HMAC_KEY,
|
||||
f"{client_id}\x00{client_secret}".encode("utf-8"),
|
||||
hashlib.sha256,
|
||||
).hexdigest()
|
||||
|
||||
|
||||
# Per-process random HMAC key. Regenerated on each backend startup so cached
|
||||
# fingerprints don't survive restarts.
|
||||
import os as _os
|
||||
_SH_TOKEN_CACHE_HMAC_KEY = _os.urandom(32)
|
||||
|
||||
|
||||
@router.post("/api/sentinel/tile")
|
||||
@@ -146,7 +175,9 @@ async def api_sentinel_tile(request: Request):
|
||||
raise HTTPException(400, "client_id, client_secret, and date required")
|
||||
|
||||
now = _time.time()
|
||||
if (_sh_token_cache["token"] and _sh_token_cache["client_id"] == client_id
|
||||
credential_fp = _credential_fingerprint(client_id, client_secret)
|
||||
if (_sh_token_cache["token"]
|
||||
and _sh_token_cache["credential_fp"] == credential_fp
|
||||
and now < _sh_token_cache["expiry"] - 30):
|
||||
token = _sh_token_cache["token"]
|
||||
else:
|
||||
@@ -161,7 +192,7 @@ async def api_sentinel_tile(request: Request):
|
||||
token = tdata["access_token"]
|
||||
_sh_token_cache["token"] = token
|
||||
_sh_token_cache["expiry"] = now + tdata.get("expires_in", 300)
|
||||
_sh_token_cache["client_id"] = client_id
|
||||
_sh_token_cache["credential_fp"] = credential_fp
|
||||
except HTTPException:
|
||||
raise
|
||||
except Exception:
|
||||
|
||||
@@ -793,19 +793,19 @@ async def api_wormhole_gate_leave(request: Request, body: WormholeGateRequest):
|
||||
return leave_gate(str(body.gate_id or ""))
|
||||
|
||||
|
||||
@router.get("/api/wormhole/gate/{gate_id}/identity")
|
||||
@router.get("/api/wormhole/gate/{gate_id}/identity", dependencies=[Depends(require_local_operator)])
|
||||
@limiter.limit("30/minute")
|
||||
async def api_wormhole_gate_identity(request: Request, gate_id: str):
|
||||
return get_active_gate_identity(gate_id)
|
||||
|
||||
|
||||
@router.get("/api/wormhole/gate/{gate_id}/personas")
|
||||
@router.get("/api/wormhole/gate/{gate_id}/personas", dependencies=[Depends(require_local_operator)])
|
||||
@limiter.limit("30/minute")
|
||||
async def api_wormhole_gate_personas(request: Request, gate_id: str):
|
||||
return list_gate_personas(gate_id)
|
||||
|
||||
|
||||
@router.get("/api/wormhole/gate/{gate_id}/key")
|
||||
@router.get("/api/wormhole/gate/{gate_id}/key", dependencies=[Depends(require_local_operator)])
|
||||
@limiter.limit("30/minute")
|
||||
async def api_wormhole_gate_key_status(request: Request, gate_id: str):
|
||||
import main as _m
|
||||
|
||||
@@ -616,9 +616,12 @@ def fetch_global_military_incidents():
|
||||
try:
|
||||
logger.info("Fetching GDELT events via export CDN (multi-file)...")
|
||||
|
||||
# Get the latest export URL to determine current timestamp
|
||||
# Get the latest export URL to determine current timestamp.
|
||||
# HTTPS is used to prevent passive network observers from injecting
|
||||
# poisoned export records into the global incident map via MITM.
|
||||
# GDELT serves the same content over HTTPS as HTTP.
|
||||
index_res = fetch_with_curl(
|
||||
"http://data.gdeltproject.org/gdeltv2/lastupdate.txt", timeout=10
|
||||
"https://data.gdeltproject.org/gdeltv2/lastupdate.txt", timeout=10
|
||||
)
|
||||
if index_res.status_code != 200:
|
||||
logger.error(f"GDELT lastupdate failed: {index_res.status_code}")
|
||||
@@ -636,7 +639,9 @@ def fetch_global_military_incidents():
|
||||
logger.error("Could not find GDELT export URL")
|
||||
return []
|
||||
|
||||
# Extract timestamp from URL like: http://data.gdeltproject.org/gdeltv2/20260301120000.export.CSV.zip
|
||||
# Extract timestamp from URL like: https://data.gdeltproject.org/gdeltv2/20260301120000.export.CSV.zip
|
||||
# (GDELT's lastupdate.txt may still list URLs with http:// — we ignore
|
||||
# the scheme there and reconstruct each download URL as https:// below.)
|
||||
import re
|
||||
|
||||
ts_match = re.search(r"(\d{14})\.export\.CSV\.zip", latest_url)
|
||||
@@ -652,7 +657,7 @@ def fetch_global_military_incidents():
|
||||
for i in range(NUM_FILES):
|
||||
ts = latest_ts - timedelta(minutes=15 * i)
|
||||
fname = ts.strftime("%Y%m%d%H%M%S") + ".export.CSV.zip"
|
||||
url = f"http://data.gdeltproject.org/gdeltv2/{fname}"
|
||||
url = f"https://data.gdeltproject.org/gdeltv2/{fname}"
|
||||
urls.append(url)
|
||||
|
||||
logger.info(f"Downloading {len(urls)} GDELT export files...")
|
||||
|
||||
@@ -0,0 +1,119 @@
|
||||
"""Issue #192 (tg12): CCTV proxy must re-validate the host on every redirect hop.
|
||||
|
||||
Before this fix, the proxy validated only the initial caller-supplied URL
|
||||
host and then used ``requests.get(..., allow_redirects=True)``, which would
|
||||
silently follow a 302 to an arbitrary internal address — an open-redirect-
|
||||
to-SSRF chain.
|
||||
|
||||
These tests assert that:
|
||||
|
||||
1. A redirect to a disallowed host is rejected (502).
|
||||
2. A redirect to an allowed host is followed (200).
|
||||
3. The redirect chain length is bounded.
|
||||
"""
|
||||
import pytest
|
||||
from unittest.mock import MagicMock, patch
|
||||
|
||||
from fastapi import HTTPException
|
||||
|
||||
from routers.cctv import _fetch_cctv_upstream_response, _CCTV_MAX_REDIRECTS
|
||||
|
||||
|
||||
class _Resp:
|
||||
"""Minimal mock for requests.Response that mimics what _fetch needs."""
|
||||
|
||||
def __init__(self, status_code=200, headers=None, is_redirect=False):
|
||||
self.status_code = status_code
|
||||
self.headers = headers or {}
|
||||
self.is_redirect = is_redirect
|
||||
self.closed = False
|
||||
|
||||
def close(self):
|
||||
self.closed = True
|
||||
|
||||
|
||||
def _profile():
|
||||
"""Build a tiny _CCTVProxyProfile-shaped mock the function expects."""
|
||||
p = MagicMock()
|
||||
p.name = "test"
|
||||
p.timeout = 5
|
||||
p.cache_seconds = 60
|
||||
return p
|
||||
|
||||
|
||||
def _request():
|
||||
"""Build a tiny Request-shaped mock — only headers are read."""
|
||||
req = MagicMock()
|
||||
req.headers = {}
|
||||
return req
|
||||
|
||||
|
||||
@patch("routers.cctv._cctv_upstream_headers", return_value={})
|
||||
@patch("routers.cctv._cctv_host_allowed", side_effect=lambda host: host == "allowed.example")
|
||||
@patch("routers.cctv._req" if False else "requests.get") # patched below per-call
|
||||
def test_redirect_to_disallowed_host_is_rejected(mock_get, mock_allow, mock_headers):
|
||||
"""A 302 from allowed.example -> evil.example must be rejected with 502."""
|
||||
# First call: 302 with Location: http://evil.example/path
|
||||
mock_get.side_effect = [
|
||||
_Resp(status_code=302, headers={"Location": "http://evil.example/path"}, is_redirect=True),
|
||||
]
|
||||
with pytest.raises(HTTPException) as exc_info:
|
||||
_fetch_cctv_upstream_response(_request(), "http://allowed.example/cam", _profile())
|
||||
assert exc_info.value.status_code == 502
|
||||
assert "disallowed host" in str(exc_info.value.detail).lower()
|
||||
|
||||
|
||||
@patch("routers.cctv._cctv_upstream_headers", return_value={})
|
||||
@patch("routers.cctv._cctv_host_allowed", side_effect=lambda host: host == "allowed.example")
|
||||
@patch("requests.get")
|
||||
def test_redirect_to_localhost_is_rejected(mock_get, mock_allow, mock_headers):
|
||||
"""A redirect to 127.0.0.1 (internal SSRF target) must be rejected."""
|
||||
mock_get.side_effect = [
|
||||
_Resp(status_code=302, headers={"Location": "http://127.0.0.1:8000/api/secret"}, is_redirect=True),
|
||||
]
|
||||
with pytest.raises(HTTPException) as exc_info:
|
||||
_fetch_cctv_upstream_response(_request(), "http://allowed.example/cam", _profile())
|
||||
assert exc_info.value.status_code == 502
|
||||
|
||||
|
||||
@patch("routers.cctv._cctv_upstream_headers", return_value={})
|
||||
@patch("routers.cctv._cctv_host_allowed", side_effect=lambda host: host in {"allowed.example", "other-allowed.example"})
|
||||
@patch("requests.get")
|
||||
def test_redirect_to_another_allowed_host_is_followed(mock_get, mock_allow, mock_headers):
|
||||
"""A 302 from one allowed host to another allowed host should succeed."""
|
||||
mock_get.side_effect = [
|
||||
_Resp(status_code=302, headers={"Location": "http://other-allowed.example/cam"}, is_redirect=True),
|
||||
_Resp(status_code=200, headers={"Content-Type": "image/jpeg"}),
|
||||
]
|
||||
resp = _fetch_cctv_upstream_response(_request(), "http://allowed.example/cam", _profile())
|
||||
assert resp.status_code == 200
|
||||
|
||||
|
||||
@patch("routers.cctv._cctv_upstream_headers", return_value={})
|
||||
@patch("routers.cctv._cctv_host_allowed", return_value=True)
|
||||
@patch("requests.get")
|
||||
def test_redirect_chain_length_is_bounded(mock_get, mock_allow, mock_headers):
|
||||
"""A pathological redirect loop must terminate within _CCTV_MAX_REDIRECTS."""
|
||||
# Generate enough 302s to exceed the cap.
|
||||
mock_get.side_effect = [
|
||||
_Resp(status_code=302, headers={"Location": f"http://allowed.example/{i}"}, is_redirect=True)
|
||||
for i in range(_CCTV_MAX_REDIRECTS + 2)
|
||||
]
|
||||
with pytest.raises(HTTPException) as exc_info:
|
||||
_fetch_cctv_upstream_response(_request(), "http://allowed.example/cam", _profile())
|
||||
assert exc_info.value.status_code == 502
|
||||
assert "too long" in str(exc_info.value.detail).lower()
|
||||
|
||||
|
||||
@patch("routers.cctv._cctv_upstream_headers", return_value={})
|
||||
@patch("routers.cctv._cctv_host_allowed", return_value=True)
|
||||
@patch("requests.get")
|
||||
def test_redirect_to_non_http_scheme_is_rejected(mock_get, mock_allow, mock_headers):
|
||||
"""A 302 to ``file://`` or ``ftp://`` must be rejected even if the host parses cleanly."""
|
||||
mock_get.side_effect = [
|
||||
_Resp(status_code=302, headers={"Location": "file:///etc/passwd"}, is_redirect=True),
|
||||
]
|
||||
with pytest.raises(HTTPException) as exc_info:
|
||||
_fetch_cctv_upstream_response(_request(), "http://allowed.example/cam", _profile())
|
||||
assert exc_info.value.status_code == 502
|
||||
assert "non-http" in str(exc_info.value.detail).lower()
|
||||
@@ -70,6 +70,13 @@ import pytest
|
||||
"message": "test",
|
||||
},
|
||||
),
|
||||
# Issue #198 (tg12, May 17): three gate introspection GETs leak the
|
||||
# operator's active persona, persona inventory, and key status for
|
||||
# any gate_id an anonymous caller knows. Defeats the unlinkability
|
||||
# property documented in the privacy threat model.
|
||||
("get", "/api/wormhole/gate/general-talk/identity", None),
|
||||
("get", "/api/wormhole/gate/general-talk/personas", None),
|
||||
("get", "/api/wormhole/gate/general-talk/key", None),
|
||||
],
|
||||
)
|
||||
def test_remote_control_surface_rejects_without_local_operator_or_admin(
|
||||
|
||||
@@ -0,0 +1,44 @@
|
||||
"""Issue #199 (tg12): GDELT military incident ingestion must use HTTPS.
|
||||
|
||||
The previous code fetched ``http://data.gdeltproject.org/gdeltv2/lastupdate.txt``
|
||||
and ~48 export archives over plaintext HTTP, which let a passive observer
|
||||
identify Shadowbroker nodes by their fetch pattern and let an active MITM
|
||||
inject doctored export records into the global incident map.
|
||||
|
||||
These tests assert the URL constants and outbound URL constructor in
|
||||
``services/geopolitics.py`` only use HTTPS.
|
||||
"""
|
||||
import re
|
||||
from pathlib import Path
|
||||
|
||||
|
||||
_GEOPOLITICS_SRC = Path(__file__).resolve().parent.parent / "services" / "geopolitics.py"
|
||||
|
||||
|
||||
def _read_source() -> str:
|
||||
return _GEOPOLITICS_SRC.read_text(encoding="utf-8")
|
||||
|
||||
|
||||
def test_geopolitics_does_not_use_plaintext_http_for_gdelt():
|
||||
"""No string literal in geopolitics.py should fetch GDELT over plaintext HTTP."""
|
||||
src = _read_source()
|
||||
# Strings that would issue an HTTP request — comments are excluded because
|
||||
# comments include "http://" in example URLs even after the fix.
|
||||
code_lines = [
|
||||
ln for ln in src.split("\n")
|
||||
if "http://data.gdeltproject.org" in ln and not ln.lstrip().startswith("#")
|
||||
]
|
||||
assert code_lines == [], (
|
||||
"Found plaintext http://data.gdeltproject.org usage in geopolitics.py:\n"
|
||||
+ "\n".join(code_lines)
|
||||
)
|
||||
|
||||
|
||||
def test_geopolitics_uses_https_for_gdelt():
|
||||
"""The HTTPS URLs we expect must be present."""
|
||||
src = _read_source()
|
||||
assert "https://data.gdeltproject.org/gdeltv2/lastupdate.txt" in src
|
||||
# The download URL is constructed via f-string with {fname}
|
||||
assert re.search(
|
||||
r'https://data\.gdeltproject\.org/gdeltv2/\{fname\}', src
|
||||
), "expected https URL template for individual GDELT export downloads"
|
||||
@@ -0,0 +1,59 @@
|
||||
"""Issue #200 (tg12): Sentinel token cache must require knowledge of the
|
||||
client secret to hit, not just client_id.
|
||||
|
||||
Before this fix, the cache lookup was ``_sh_token_cache["client_id"] ==
|
||||
client_id``. A caller who knew a valid client_id but supplied any secret
|
||||
would hit the cache and reuse the original caller's bearer token, burning
|
||||
their Copernicus quota and accessing imagery on their account.
|
||||
|
||||
After the fix, the cache key is an HMAC of ``(client_id, client_secret)``
|
||||
under a per-process random key, so two callers with the same client_id but
|
||||
different secrets compute different fingerprints and miss each other's
|
||||
cache entries.
|
||||
"""
|
||||
from routers.tools import _credential_fingerprint, _sh_token_cache
|
||||
|
||||
|
||||
def test_same_client_id_different_secrets_yield_different_fingerprints():
|
||||
fp_a = _credential_fingerprint("client-id-X", "secret-A")
|
||||
fp_b = _credential_fingerprint("client-id-X", "secret-B")
|
||||
assert fp_a != fp_b
|
||||
|
||||
|
||||
def test_same_credentials_yield_same_fingerprint():
|
||||
"""The cache is still useful — same caller hits its own entry."""
|
||||
fp1 = _credential_fingerprint("client-id-X", "secret-A")
|
||||
fp2 = _credential_fingerprint("client-id-X", "secret-A")
|
||||
assert fp1 == fp2
|
||||
|
||||
|
||||
def test_different_client_ids_yield_different_fingerprints():
|
||||
fp_a = _credential_fingerprint("client-id-A", "shared-secret")
|
||||
fp_b = _credential_fingerprint("client-id-B", "shared-secret")
|
||||
assert fp_a != fp_b
|
||||
|
||||
|
||||
def test_cache_lookup_key_field_renamed():
|
||||
"""Catch accidental reintroduction of the client_id-only lookup."""
|
||||
# If a future commit re-adds `_sh_token_cache["client_id"]` we want this
|
||||
# test to fail loudly. The new schema only stores `credential_fp`.
|
||||
assert "client_id" not in _sh_token_cache
|
||||
assert "credential_fp" in _sh_token_cache
|
||||
|
||||
|
||||
def test_attacker_with_wrong_secret_misses_cache(monkeypatch):
|
||||
"""An attacker with valid client_id but wrong secret cannot hit the cache."""
|
||||
# Populate cache as if a legitimate caller just succeeded.
|
||||
legit_fp = _credential_fingerprint("legit-client", "legit-secret")
|
||||
_sh_token_cache["token"] = "VICTIM-BEARER-TOKEN"
|
||||
_sh_token_cache["credential_fp"] = legit_fp
|
||||
_sh_token_cache["expiry"] = 10**12 # far future
|
||||
|
||||
# Attacker arrives with the same client_id but the wrong secret.
|
||||
attacker_fp = _credential_fingerprint("legit-client", "wrong-secret")
|
||||
assert attacker_fp != legit_fp
|
||||
|
||||
# Reset cache for hygiene between tests.
|
||||
_sh_token_cache["token"] = None
|
||||
_sh_token_cache["credential_fp"] = ""
|
||||
_sh_token_cache["expiry"] = 0
|
||||
Reference in New Issue
Block a user