mirror of
https://github.com/BigBodyCobain/Shadowbroker.git
synced 2026-05-28 18:11:31 +02:00
76750caa92
== Per-install operator handle for every third-party API call ==
Before this PR, every Shadowbroker install identified itself to
Wikipedia, Wikidata, Nominatim, GDELT, OpenMHz, Broadcastify,
weather.gov, NUFORC, Sentinel/Planetary Computer, TinyGS / CelesTrak,
Shodan, Finnhub, and others with a single project-wide User-Agent
("Shadowbroker/1.0" or "ShadowBroker-OSINT/1.0"). From the upstream's
perspective every install in the world looked like one giant scraper.
If one install misbehaved, the upstream's only recourse was to block
"Shadowbroker" as a whole.
PR #284 inadvertently doubled down on this in the frontend by
introducing a shared `WIKIMEDIA_API_USER_AGENT` constant. This PR
retrofits both backends to per-operator attribution.
New setting: OPERATOR_HANDLE (env var / settings UI / auto-gen)
New helper: network_utils.outbound_user_agent("purpose")
The handle is auto-generated as "operator-XXXXXX" on first call (the
"shadow-" prefix from earlier drafts was deliberately dropped — too
suspicious-looking for abuse-detection systems). Operators can
override via OPERATOR_HANDLE; the value is sanitized to lowercase
alphanumeric+dash+underscore and capped at 48 chars. Persisted to
backend/data/operator_handle.json so it survives container restarts.
Retrofitted call sites (every previously-MONSTER User-Agent):
- services/region_dossier.py (Wikipedia + Wikidata + Nominatim)
- services/geocode.py (Nominatim)
- services/sentinel_search.py (Microsoft Planetary Computer)
- services/feed_ingester.py (operator-curated RSS feeds)
- services/fetchers/earth_observation.py (weather.gov, NUFORC)
- services/fetchers/infrastructure.py
- services/fetchers/aircraft_database.py
- services/fetchers/route_database.py
- services/fetchers/trains.py
- services/fetchers/meshtastic_map.py
- services/shodan_connector.py
- services/unusual_whales_connector.py (Finnhub)
- services/tinygs_fetcher.py (CelesTrak + TinyGS)
- services/sar/sar_products_client.py
- services/geopolitics.py (GDELT)
- services/radio_intercept.py (Broadcastify + OpenMHz)
- routers/cctv.py + main.py (CCTV proxy)
- routers/ai_intel.py
- scripts/convert_power_plants.py (release-time data refresh)
Spoofed browser UAs removed (issues #289 / #290 / #291 — tg12 audit):
- cloudscraper-based Chrome impersonation against api.openmhz.com
-> replaced with honest requests + per-install UA
- Mozilla/5.0 spoofed UA on Broadcastify scrape
-> replaced with honest UA
- Mozilla/5.0 + fake first-party Referer on OpenMHz audio relay
-> replaced with honest UA
- cloudscraper dependency dropped from pyproject.toml + uv.lock
Frontend retrofit:
- new GET /api/settings/operator-handle endpoint (local-operator
gated) returns the install's handle
- frontend/src/lib/wikimediaClient.ts fetches the handle once on
first use, caches it for page lifetime, embeds it in the
Api-User-Agent for every Wikipedia / Wikidata browser-direct call
== GDELT GCS-direct fix ==
GDELT's data.gdeltproject.org is a CNAME to a Google Cloud Storage
bucket. GCS responds with the wildcard *.storage.googleapis.com cert
which legitimately does NOT cover the GDELT custom domain, so Python's
TLS verification correctly refuses the connection. Some networks
happen to route through a path where this works; many (notably Docker
Desktop's outbound NAT on local installs) do not. Verified on the
maintainer's local install: GDELT was unreachable; 1610 geopolitical
events / 48 export files were dropping silently.
Fix: services/geopolitics._gcs_direct_gdelt_url() rewrites any
data.gdeltproject.org URL to its GCS-direct equivalent
(storage.googleapis.com/data.gdeltproject.org/...) where the standard
GCS cert is genuinely valid. api.gdeltproject.org and every other host
are left untouched.
Confirmed live: backend log goes from
GDELT lastupdate failed: 500
to
Downloading 48 GDELT export files...
Downloaded 48/48 GDELT exports
GDELT parsed: 1610 conflict locations from 48 files
== Tests ==
backend/tests/test_per_operator_outbound_attribution.py (12 tests)
backend/tests/test_gdelt_gcs_direct_rewrite.py (6 tests)
backend/tests/test_region_dossier_wikimedia_ua.py (updated to
pin the helper + per-operator handle, not the old constant)
frontend/src/__tests__/utils/wikimediaClient.test.ts (rewritten
to mock /api/settings/operator-handle and assert per-operator UA)
Local: backend 114/114 security+audit+round7a suite green;
frontend 718/718 vitest suite green.
Credit: tg12 (external security audit, issues #289/#290/#291
relating to spoofed UAs); BigBodyCobain (operator-prefix call,
GDELT cloud-vs-local diagnosis).
381 lines
15 KiB
Python
381 lines
15 KiB
Python
import logging
|
|
import json
|
|
import os
|
|
import subprocess
|
|
import shutil
|
|
import time
|
|
import threading
|
|
import uuid
|
|
import requests
|
|
from pathlib import Path
|
|
from urllib.parse import urlparse
|
|
from requests.adapters import HTTPAdapter
|
|
from urllib3.util.retry import Retry
|
|
|
|
logger = logging.getLogger(__name__)
|
|
|
|
# Reusable session with connection pooling and retry logic.
|
|
# Only retry once (total=1) to fail fast — the curl fallback is the real safety net.
|
|
_session = requests.Session()
|
|
_retry = Retry(total=1, backoff_factor=0.3, status_forcelist=[502, 503, 504])
|
|
_session.mount("https://", HTTPAdapter(max_retries=_retry, pool_maxsize=20))
|
|
_session.mount("http://", HTTPAdapter(max_retries=_retry, pool_maxsize=10))
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Per-operator outbound identification
|
|
# ---------------------------------------------------------------------------
|
|
#
|
|
# Issues #289 / #290 / #291 and the retrofit of PR #284 (#218 / #219 / #220):
|
|
# every third-party API the backend calls used to identify itself with a
|
|
# single "Shadowbroker" aggregate User-Agent. From the upstream's
|
|
# perspective, that meant every Shadowbroker install in the world looked
|
|
# like one giant entity hammering them. If one install misbehaved, the
|
|
# upstream's only recourse was to block "Shadowbroker" as a whole — which
|
|
# would take out every other install too.
|
|
#
|
|
# Fix: give each install a stable pseudonymous handle and include it in
|
|
# the User-Agent. Now an upstream can rate-limit or block the offending
|
|
# operator without affecting anyone else.
|
|
#
|
|
# The handle:
|
|
#
|
|
# - Is auto-generated on first call if no `OPERATOR_HANDLE` is configured
|
|
# (looks like "operator-7f3a92" — 6 hex chars from uuid4()).
|
|
# - Is persisted to ``backend/data/operator_handle.json`` so it survives
|
|
# restarts. Under Docker compose that file lives in the volume mount
|
|
# alongside `carrier_cache.json` and the other persistent state.
|
|
# - Can be overridden by the operator via the `OPERATOR_HANDLE` setting
|
|
# (env var or settings UI). Operators with their own GitHub handle,
|
|
# organization name, etc. can use that for traceability.
|
|
# - Is NEVER mixed into mesh / Wormhole / Infonet identity. This layer is
|
|
# strictly for public third-party API attribution.
|
|
|
|
_SHADOWBROKER_VERSION = "0.9"
|
|
_OPERATOR_HANDLE_FILE = (
|
|
Path(__file__).parent.parent / "data" / "operator_handle.json"
|
|
)
|
|
_OPERATOR_HANDLE_CACHE: str = ""
|
|
_OPERATOR_HANDLE_LOCK = threading.Lock()
|
|
|
|
|
|
def _generate_operator_handle() -> str:
|
|
"""Produce a stable pseudonymous handle for first-launch installs.
|
|
|
|
Format: ``operator-7f3a92`` (6 hex chars from a fresh uuid4()).
|
|
Distinct per install. Carries no real-world identity by default —
|
|
operators who want one can override via ``OPERATOR_HANDLE``.
|
|
|
|
Note: the prefix is deliberately neutral. Earlier drafts used
|
|
``shadow-`` which, while accurate to the project name, looks
|
|
exactly like the kind of pattern a third-party abuse-detection
|
|
system would auto-block as suspicious. ``operator-`` describes
|
|
what the value actually is and doesn't pattern-match malware.
|
|
"""
|
|
return f"operator-{uuid.uuid4().hex[:6]}"
|
|
|
|
|
|
def _load_persisted_operator_handle() -> str:
|
|
"""Return the previously-saved handle from disk, or empty if none.
|
|
|
|
Reads ``backend/data/operator_handle.json`` if it exists. Any read
|
|
error returns empty so a fresh handle gets generated rather than
|
|
crashing the request.
|
|
"""
|
|
try:
|
|
if _OPERATOR_HANDLE_FILE.exists():
|
|
data = json.loads(_OPERATOR_HANDLE_FILE.read_text(encoding="utf-8"))
|
|
return str(data.get("handle", "") or "").strip()
|
|
except (OSError, json.JSONDecodeError, ValueError):
|
|
pass
|
|
return ""
|
|
|
|
|
|
def _persist_operator_handle(handle: str) -> None:
|
|
"""Atomically save the auto-generated handle so subsequent restarts
|
|
use the same one. Failure to persist is non-fatal — the request still
|
|
succeeds with the in-memory handle, we just may generate a different
|
|
one on the next process restart."""
|
|
try:
|
|
_OPERATOR_HANDLE_FILE.parent.mkdir(parents=True, exist_ok=True)
|
|
tmp = _OPERATOR_HANDLE_FILE.with_suffix(_OPERATOR_HANDLE_FILE.suffix + ".tmp")
|
|
tmp.write_text(
|
|
json.dumps({"handle": handle, "_meta": {
|
|
"purpose": "Per-install operator handle for outbound third-party API attribution.",
|
|
"see": "backend/services/network_utils.py:outbound_user_agent",
|
|
}}, indent=2),
|
|
encoding="utf-8",
|
|
)
|
|
os.replace(tmp, _OPERATOR_HANDLE_FILE)
|
|
except OSError as exc:
|
|
logger.debug("Could not persist operator_handle (continuing in-memory): %s", exc)
|
|
|
|
|
|
def get_operator_handle() -> str:
|
|
"""Return the stable per-install operator handle.
|
|
|
|
Resolution order:
|
|
1. ``OPERATOR_HANDLE`` setting (env var / settings UI) if non-empty.
|
|
2. Process-cached value from previous call this run.
|
|
3. Value persisted to ``operator_handle.json`` (from a previous run).
|
|
4. Newly generated pseudonymous handle, persisted to disk.
|
|
|
|
The handle is normalized: stripped of whitespace, lowercased,
|
|
non-alphanumeric chars (except ``-`` and ``_``) replaced with ``-``.
|
|
This both sanitizes any HTTP-header-unsafe characters AND prevents
|
|
the operator from impersonating real third-party projects via
|
|
inventive whitespace.
|
|
"""
|
|
global _OPERATOR_HANDLE_CACHE
|
|
with _OPERATOR_HANDLE_LOCK:
|
|
# 1. Configured override always wins.
|
|
configured = ""
|
|
try:
|
|
from services.config import get_settings
|
|
|
|
configured = str(getattr(get_settings(), "OPERATOR_HANDLE", "") or "").strip()
|
|
except Exception:
|
|
configured = ""
|
|
if configured:
|
|
return _normalize_handle(configured)
|
|
|
|
# 2. In-memory cache (fast path for repeated calls).
|
|
if _OPERATOR_HANDLE_CACHE:
|
|
return _OPERATOR_HANDLE_CACHE
|
|
|
|
# 3. On-disk handle from a previous run.
|
|
persisted = _load_persisted_operator_handle()
|
|
if persisted:
|
|
_OPERATOR_HANDLE_CACHE = _normalize_handle(persisted)
|
|
return _OPERATOR_HANDLE_CACHE
|
|
|
|
# 4. Generate, persist, return.
|
|
fresh = _generate_operator_handle()
|
|
_persist_operator_handle(fresh)
|
|
_OPERATOR_HANDLE_CACHE = fresh
|
|
return fresh
|
|
|
|
|
|
def _normalize_handle(raw: str) -> str:
|
|
"""Strip whitespace, lowercase, replace unsafe characters with dashes."""
|
|
safe = "".join(
|
|
ch if (ch.isalnum() or ch in "-_") else "-"
|
|
for ch in raw.strip().lower()
|
|
)
|
|
# Collapse runs of dashes and trim to a reasonable length so an
|
|
# operator can't make our outbound logs unreadable.
|
|
while "--" in safe:
|
|
safe = safe.replace("--", "-")
|
|
safe = safe.strip("-")
|
|
return safe[:48] if safe else "anonymous"
|
|
|
|
|
|
_CONTACT_URL = "https://github.com/BigBodyCobain/Shadowbroker/issues"
|
|
|
|
|
|
def outbound_user_agent(purpose: str = "") -> str:
|
|
"""Build a User-Agent for an outbound third-party HTTP request.
|
|
|
|
Returns something like::
|
|
|
|
Shadowbroker/0.9 (operator: shadow-7f3a92; purpose: wikipedia;
|
|
+https://github.com/BigBodyCobain/Shadowbroker/issues)
|
|
|
|
The ``purpose`` is optional but recommended — it tells the upstream
|
|
what feature of ours is making the call (``wikipedia``, ``openmhz``,
|
|
``nominatim``, etc.), which makes their logs and our complaints
|
|
actionable.
|
|
|
|
Every outbound call in the backend that previously sent a custom
|
|
User-Agent should call this helper instead. Centralizing here means:
|
|
- one place to change the contact URL,
|
|
- one place to bump the version on release,
|
|
- one place a Wikimedia / OpenMHz operator can reach to ask for
|
|
the project to back off, with a per-install handle so they can
|
|
target the specific install instead of the project as a whole.
|
|
"""
|
|
handle = get_operator_handle()
|
|
if purpose:
|
|
purpose_clean = _normalize_handle(purpose)
|
|
return (
|
|
f"Shadowbroker/{_SHADOWBROKER_VERSION} "
|
|
f"(operator: {handle}; purpose: {purpose_clean}; +{_CONTACT_URL})"
|
|
)
|
|
return (
|
|
f"Shadowbroker/{_SHADOWBROKER_VERSION} "
|
|
f"(operator: {handle}; +{_CONTACT_URL})"
|
|
)
|
|
|
|
|
|
def _reset_operator_handle_cache_for_tests() -> None:
|
|
"""Test-only: invalidate the in-memory cache so a test can set a
|
|
new ``OPERATOR_HANDLE`` env var and see it picked up immediately."""
|
|
global _OPERATOR_HANDLE_CACHE
|
|
with _OPERATOR_HANDLE_LOCK:
|
|
_OPERATOR_HANDLE_CACHE = ""
|
|
|
|
|
|
# Default outbound User-Agent. Retained for backwards compatibility with
|
|
# call sites that haven't been migrated to ``outbound_user_agent()`` yet.
|
|
# Operators who want full per-install attribution should set the
|
|
# ``OPERATOR_HANDLE`` setting and migrate call sites incrementally.
|
|
#
|
|
# Operators who run a public-facing relay can also override the whole UA
|
|
# string via the ``SHADOWBROKER_USER_AGENT`` env var. That override
|
|
# completely bypasses the per-operator helper; only use it if you know
|
|
# what you're doing.
|
|
DEFAULT_USER_AGENT = os.environ.get(
|
|
"SHADOWBROKER_USER_AGENT",
|
|
f"Shadowbroker/{_SHADOWBROKER_VERSION}",
|
|
)
|
|
|
|
# Find bash for curl fallback — Git bash's curl has the TLS features
|
|
# needed to pass CDN fingerprint checks (brotli, zstd, libpsl)
|
|
|
|
# Cache domains where requests fails — skip straight to curl for 5 minutes
|
|
_domain_fail_cache: dict[str, float] = {}
|
|
_DOMAIN_FAIL_TTL = 300 # 5 minutes
|
|
|
|
# Circuit breaker: track domains where BOTH requests AND curl fail
|
|
# If a domain failed completely within the last 2 minutes, skip it entirely
|
|
_circuit_breaker: dict[str, float] = {}
|
|
_CIRCUIT_BREAKER_TTL = 120 # 2 minutes
|
|
|
|
# Lock protecting _domain_fail_cache and _circuit_breaker mutations
|
|
_cb_lock = threading.Lock()
|
|
|
|
|
|
class UpstreamCircuitBreakerError(OSError):
|
|
"""Raised when a domain recently failed hard and is temporarily skipped."""
|
|
|
|
|
|
def _env_truthy(name: str) -> bool:
|
|
return str(os.getenv(name, "")).strip().lower() in {"1", "true", "yes", "on"}
|
|
|
|
|
|
def external_curl_fallback_enabled() -> bool:
|
|
"""Return whether the backend may spawn an external curl process."""
|
|
if os.name != "nt":
|
|
return True
|
|
return _env_truthy("SHADOWBROKER_ENABLE_WINDOWS_CURL_FALLBACK")
|
|
|
|
|
|
class _DummyResponse:
|
|
"""Minimal response object matching requests.Response interface."""
|
|
def __init__(self, status_code, text):
|
|
self.status_code = status_code
|
|
self.text = text
|
|
self.content = text.encode('utf-8', errors='replace')
|
|
|
|
def json(self):
|
|
return json.loads(self.text)
|
|
|
|
def raise_for_status(self):
|
|
if self.status_code >= 400:
|
|
raise Exception(f"HTTP {self.status_code}: {self.text[:100]}")
|
|
|
|
|
|
def fetch_with_curl(url, method="GET", json_data=None, timeout=15, headers=None, follow_redirects=False):
|
|
"""Wrapper to bypass aggressive local firewall that blocks Python but permits curl.
|
|
|
|
Falls back to running curl through Git bash, which has the TLS features
|
|
(brotli, zstd, libpsl) needed to pass CDN fingerprint checks that block
|
|
both Python requests and the barebones Windows system curl.
|
|
"""
|
|
default_headers = {
|
|
"User-Agent": DEFAULT_USER_AGENT,
|
|
}
|
|
if headers:
|
|
default_headers.update(headers)
|
|
|
|
domain = urlparse(url).netloc
|
|
|
|
# Circuit breaker: if domain failed completely <2min ago, fail fast
|
|
with _cb_lock:
|
|
if domain in _circuit_breaker and (time.time() - _circuit_breaker[domain]) < _CIRCUIT_BREAKER_TTL:
|
|
raise UpstreamCircuitBreakerError(
|
|
f"Circuit breaker open for {domain} (failed <{_CIRCUIT_BREAKER_TTL}s ago)"
|
|
)
|
|
|
|
# Check if this domain recently failed with requests — skip straight to curl
|
|
with _cb_lock:
|
|
_skip_requests = domain in _domain_fail_cache and (time.time() - _domain_fail_cache[domain]) < _DOMAIN_FAIL_TTL
|
|
if not _skip_requests:
|
|
try:
|
|
# Use a short connect timeout (3s) so firewall blocks fail fast,
|
|
# but allow the full timeout for reading the response body.
|
|
req_timeout = (min(3, timeout), timeout)
|
|
if method == "POST":
|
|
res = _session.post(url, json=json_data, timeout=req_timeout, headers=default_headers)
|
|
else:
|
|
res = _session.get(url, timeout=req_timeout, headers=default_headers)
|
|
if res.status_code == 429:
|
|
logger.warning(f"Upstream rate limit hit for {url}; not bypassing with curl.")
|
|
return res
|
|
res.raise_for_status()
|
|
# Clear failure caches on success
|
|
with _cb_lock:
|
|
_domain_fail_cache.pop(domain, None)
|
|
_circuit_breaker.pop(domain, None)
|
|
return res
|
|
except (requests.RequestException, ConnectionError, TimeoutError, OSError) as e:
|
|
fallback = "falling back to curl" if external_curl_fallback_enabled() else "skipping external curl"
|
|
logger.warning(f"Python requests failed for {url} ({e}), {fallback}...")
|
|
with _cb_lock:
|
|
_domain_fail_cache[domain] = time.time()
|
|
|
|
# Curl fallback — reached from both _skip_requests and requests-exception paths
|
|
if not external_curl_fallback_enabled():
|
|
logger.warning(
|
|
"External curl fallback disabled on Windows for %s; set "
|
|
"SHADOWBROKER_ENABLE_WINDOWS_CURL_FALLBACK=1 to opt in.",
|
|
domain,
|
|
)
|
|
with _cb_lock:
|
|
_circuit_breaker[domain] = time.time()
|
|
return _DummyResponse(500, "")
|
|
|
|
_CURL_PATH = shutil.which("curl") or "curl"
|
|
cmd = [_CURL_PATH, "-s", "-w", "\n%{http_code}"]
|
|
if follow_redirects:
|
|
cmd.append("-L")
|
|
for k, v in default_headers.items():
|
|
cmd += ["-H", f"{k}: {v}"]
|
|
if method == "POST" and json_data:
|
|
cmd += ["-X", "POST", "-H", "Content-Type: application/json",
|
|
"--data-binary", "@-"]
|
|
cmd.append(url)
|
|
|
|
try:
|
|
stdin_data = json.dumps(json_data) if (method == "POST" and json_data) else None
|
|
creationflags = 0
|
|
if os.name == "nt":
|
|
creationflags = (
|
|
getattr(subprocess, "CREATE_NO_WINDOW", 0)
|
|
| getattr(subprocess, "CREATE_NEW_PROCESS_GROUP", 0)
|
|
)
|
|
res = subprocess.run(
|
|
cmd, capture_output=True, text=True, timeout=timeout + 5,
|
|
input=stdin_data, encoding="utf-8", errors="replace",
|
|
creationflags=creationflags,
|
|
)
|
|
if res.returncode == 0 and (res.stdout or "").strip():
|
|
# Parse HTTP status code from -w output (last line)
|
|
lines = res.stdout.rstrip().rsplit("\n", 1)
|
|
body = lines[0] if len(lines) > 1 else res.stdout
|
|
http_code = int(lines[-1]) if len(lines) > 1 and lines[-1].strip().isdigit() else 200
|
|
if http_code < 400:
|
|
with _cb_lock:
|
|
_circuit_breaker.pop(domain, None) # Clear circuit breaker on success
|
|
return _DummyResponse(http_code, body)
|
|
else:
|
|
logger.error(f"curl fallback failed: exit={res.returncode} stderr={res.stderr[:200]}")
|
|
with _cb_lock:
|
|
_circuit_breaker[domain] = time.time()
|
|
return _DummyResponse(500, "")
|
|
except (subprocess.SubprocessError, ConnectionError, TimeoutError, OSError) as curl_e:
|
|
logger.error(f"curl fallback exception: {curl_e}")
|
|
with _cb_lock:
|
|
_circuit_breaker[domain] = time.time()
|
|
return _DummyResponse(500, "")
|