Add MKT opt-in on threat intercept, jittered market fetches, and Sentinel multi-scene dossier.

Operators enable Polymarket/Kalshi correlation from Global Threat Intercept with a consent dialog; polls use a jittered schedule separate from the slow tier. Right-click Sentinel imagery returns up to three signed scenes again.

Co-authored-by: Cursor <cursoragent@cursor.com>
This commit is contained in:
BigBodyCobain
2026-06-04 09:01:21 -06:00
parent 3ac8442e4b
commit 80a01275ff
12 changed files with 536 additions and 36 deletions
+9
View File
@@ -62,7 +62,16 @@ AIS_API_KEY= # https://aisstream.io/ — free tier WebSocket key
# FIMI_ENABLED=false
#
# Polymarket + Kalshi — US political/election prediction markets.
# Default off; enable from Global Threat Intercept (MKT toggle) or set true here.
# PREDICTION_MARKETS_ENABLED=false
# When enabled, polls use a jittered schedule (not the fixed 5-minute slow tier):
# PREDICTION_MARKETS_INTERVAL_MINUTES=7
# PREDICTION_MARKETS_SCHEDULER_JITTER_S=240
# PREDICTION_MARKETS_INITIAL_DELAY_MAX_S=180
# PREDICTION_MARKETS_PRE_FETCH_JITTER_S=90
# PREDICTION_MARKETS_PROVIDER_GAP_JITTER_S=45
# MESH_POLYMARKET_PAGE_DELAY_JITTER_S=0.08
# MESH_KALSHI_PAGE_DELAY_JITTER_S=0.2
#
# Finnhub fallback / yfinance — financial market data.
# Set FINNHUB_API_KEY to enable Finnhub, or set FINANCIAL_ENABLED=true to allow
+62
View File
@@ -1,6 +1,7 @@
import asyncio
import logging
import math
import os
import threading
from typing import Any
from fastapi import APIRouter, Request, Response, Query, Depends
@@ -34,6 +35,10 @@ class LiveUamapOptInUpdate(BaseModel):
opted_in: bool
class PredictionMarketsOptInUpdate(BaseModel):
opted_in: bool
_LAST_VIEWPORT_UPDATE: tuple | None = None
_LAST_VIEWPORT_UPDATE_TS = 0.0
_VIEWPORT_UPDATE_LOCK = threading.Lock()
@@ -422,6 +427,63 @@ def _run_liveuamap_refresh() -> None:
logger.warning("LiveUAMap refresh after opt-in failed: %s", e)
@router.get("/api/prediction-markets/status", dependencies=[Depends(require_local_operator)])
async def api_prediction_markets_status():
"""Whether Polymarket/Kalshi fetches and news market correlation are enabled."""
from services.prediction_markets_settings import prediction_markets_status
return prediction_markets_status()
@router.post("/api/prediction-markets/opt-in", dependencies=[Depends(require_local_operator)])
@limiter.limit("10/minute")
async def api_prediction_markets_opt_in(body: PredictionMarketsOptInUpdate, request: Request):
"""Enable or disable prediction market fetches + intercept story correlation."""
from services.config import get_settings
from services.prediction_markets_settings import (
prediction_markets_status,
set_prediction_markets_ui_opt_in,
)
from routers.ai_intel import _write_env_value
set_prediction_markets_ui_opt_in(body.opted_in)
_write_env_value("PREDICTION_MARKETS_ENABLED", "true" if body.opted_in else "false")
os.environ["PREDICTION_MARKETS_ENABLED"] = "true" if body.opted_in else "false"
get_settings.cache_clear()
if body.opted_in:
threading.Thread(target=_run_prediction_markets_refresh, daemon=True).start()
else:
threading.Thread(target=_run_prediction_markets_disable, daemon=True).start()
return prediction_markets_status()
def _run_prediction_markets_refresh() -> None:
try:
from services.fetchers.prediction_markets import fetch_prediction_markets
from services.fetchers.news import fetch_news
fetch_prediction_markets()
fetch_news()
except Exception as e:
logger.warning("Prediction markets refresh after opt-in failed: %s", e)
def _run_prediction_markets_disable() -> None:
try:
from services.fetchers._store import _data_lock, _mark_fresh, latest_data
from services.fetchers.news import fetch_news
with _data_lock:
latest_data["prediction_markets"] = []
latest_data["trending_markets"] = []
_mark_fresh("prediction_markets")
fetch_news()
except Exception as e:
logger.warning("Prediction markets disable cleanup failed: %s", e)
@router.post("/api/layers", dependencies=[Depends(require_local_operator)])
@limiter.limit("30/minute")
async def update_layers(update: LayerUpdate, request: Request):
+22 -1
View File
@@ -19,6 +19,7 @@ import concurrent.futures
import json
import math
import os
import random
import threading
import time
from datetime import datetime, timedelta
@@ -405,7 +406,6 @@ def update_slow_data():
logger.info("Slow-tier data update starting...")
slow_funcs = [
fetch_news,
fetch_prediction_markets,
fetch_earthquakes,
fetch_firms_fires,
fetch_firms_country_fires,
@@ -747,6 +747,27 @@ def start_scheduler():
misfire_grace_time=120,
)
# Prediction markets — own jittered cadence (Polymarket/Kalshi clearnet egress).
# Kept off the fixed 5-minute slow tier so poll timing is less fingerprintable.
from services.fetchers.prediction_markets import fetch_prediction_markets
_pm_interval_m = max(5, int(os.environ.get("PREDICTION_MARKETS_INTERVAL_MINUTES", "7")))
_pm_jitter_s = max(0, int(os.environ.get("PREDICTION_MARKETS_SCHEDULER_JITTER_S", "240")))
_pm_initial_max_s = max(0, int(os.environ.get("PREDICTION_MARKETS_INITIAL_DELAY_MAX_S", "180")))
_pm_first_run = datetime.utcnow() + timedelta(
seconds=random.randint(30, max(30, _pm_initial_max_s))
)
_scheduler.add_job(
lambda: _run_task_with_health(fetch_prediction_markets, "fetch_prediction_markets"),
"interval",
minutes=_pm_interval_m,
jitter=_pm_jitter_s,
next_run_time=_pm_first_run,
id="prediction_markets",
max_instances=1,
misfire_grace_time=300,
)
# Weather alerts — every 5 minutes (time-critical, separate from slow tier)
_scheduler.add_job(
lambda: _run_task_with_health(fetch_weather_alerts, "fetch_weather_alerts"),
@@ -9,6 +9,7 @@ import json
import logging
import math
import os
import random
import threading
import time
from urllib.parse import urlencode
@@ -21,23 +22,34 @@ _prev_probabilities: dict[str, float] = {}
_market_cache = TTLCache(maxsize=1, ttl=300)
_POLYMARKET_PAGE_DELAY_S = float(os.environ.get("MESH_POLYMARKET_PAGE_DELAY_S", "0.02"))
_KALSHI_PAGE_DELAY_S = float(os.environ.get("MESH_KALSHI_PAGE_DELAY_S", "0.08"))
_POLYMARKET_PAGE_DELAY_JITTER_S = float(os.environ.get("MESH_POLYMARKET_PAGE_DELAY_JITTER_S", "0.08"))
_KALSHI_PAGE_DELAY_JITTER_S = float(os.environ.get("MESH_KALSHI_PAGE_DELAY_JITTER_S", "0.2"))
# Random delay before each full Polymarket+Kalshi cycle (decorrelates from other slow-tier jobs).
_PRE_FETCH_JITTER_S = float(os.environ.get("PREDICTION_MARKETS_PRE_FETCH_JITTER_S", "90"))
# Random pause between finishing Polymarket pagination and starting Kalshi.
_PROVIDER_GAP_JITTER_S = float(os.environ.get("PREDICTION_MARKETS_PROVIDER_GAP_JITTER_S", "45"))
_provider_pace_lock = threading.Lock()
_provider_last_request_at: dict[str, float] = {}
def prediction_markets_fetch_enabled() -> bool:
"""Return True only when the operator explicitly opts into Polymarket/Kalshi pulls."""
return str(os.environ.get("PREDICTION_MARKETS_ENABLED", "")).strip().lower() in {
"1",
"true",
"yes",
"on",
}
"""Return True when UI opt-in or PREDICTION_MARKETS_ENABLED enables pulls."""
from services.prediction_markets_settings import prediction_markets_fetch_enabled as _enabled
return _enabled()
def _pace_provider(provider: str, min_interval_s: float) -> None:
if min_interval_s <= 0:
return
jitter_s = (
_POLYMARKET_PAGE_DELAY_JITTER_S
if provider == "polymarket"
else _KALSHI_PAGE_DELAY_JITTER_S
if provider == "kalshi"
else 0.0
)
min_interval_s += random.uniform(0.0, jitter_s) if jitter_s > 0 else 0.0
with _provider_pace_lock:
now = time.monotonic()
wait_s = min_interval_s - (now - _provider_last_request_at.get(provider, 0.0))
@@ -47,6 +59,24 @@ def _pace_provider(provider: str, min_interval_s: float) -> None:
_provider_last_request_at[provider] = now
def _apply_pre_fetch_jitter() -> None:
if _PRE_FETCH_JITTER_S <= 0:
return
delay = random.uniform(0.0, _PRE_FETCH_JITTER_S)
if delay >= 1.0:
logger.debug("Prediction markets: pre-fetch jitter %.1fs", delay)
time.sleep(delay)
def _apply_provider_gap_jitter() -> None:
if _PROVIDER_GAP_JITTER_S <= 0:
return
delay = random.uniform(0.0, _PROVIDER_GAP_JITTER_S)
if delay >= 1.0:
logger.debug("Prediction markets: provider gap jitter %.1fs", delay)
time.sleep(delay)
def _finite_or_none(value):
try:
n = float(value)
@@ -750,7 +780,9 @@ def _merge_markets(poly_events: list[dict], kalshi_events: list[dict]) -> list[d
@cached(_market_cache)
def fetch_prediction_markets_raw() -> list[dict]:
"""Fetch and merge prediction markets from both sources. Cached 5 min."""
_apply_pre_fetch_jitter()
poly = _fetch_polymarket_events()
_apply_provider_gap_jitter()
kalshi = _fetch_kalshi_events()
merged = _merge_markets(poly, kalshi)
logger.info(
@@ -0,0 +1,81 @@
"""Operator opt-in for Polymarket/Kalshi outbound fetches (Global Threat Intercept)."""
from __future__ import annotations
import json
import logging
import os
import threading
from pathlib import Path
from typing import Any
logger = logging.getLogger(__name__)
_OPT_IN_FILE = Path(__file__).resolve().parent.parent / "data" / "prediction_markets_opt_in.json"
_OPT_IN_LOCK = threading.Lock()
def _env_flag(name: str) -> str:
return str(os.getenv(name, "")).strip().lower()
def get_prediction_markets_ui_opt_in() -> bool:
if not _OPT_IN_FILE.exists():
return False
try:
payload = json.loads(_OPT_IN_FILE.read_text(encoding="utf-8"))
return bool(payload.get("opted_in"))
except (OSError, json.JSONDecodeError, TypeError) as exc:
logger.warning("Prediction markets opt-in file unreadable: %s", exc)
return False
def set_prediction_markets_ui_opt_in(opted_in: bool) -> None:
_OPT_IN_FILE.parent.mkdir(parents=True, exist_ok=True)
with _OPT_IN_LOCK:
_OPT_IN_FILE.write_text(
json.dumps({"opted_in": bool(opted_in)}, indent=2),
encoding="utf-8",
)
def prediction_markets_env_forced_on() -> bool:
return _env_flag("PREDICTION_MARKETS_ENABLED") in {"1", "true", "yes", "on"}
def prediction_markets_env_forced_off() -> bool:
return _env_flag("PREDICTION_MARKETS_ENABLED") in {"0", "false", "no", "off"}
def prediction_markets_fetch_enabled() -> bool:
"""True when UI opt-in or env enables Polymarket/Kalshi pulls."""
if get_prediction_markets_ui_opt_in():
return True
return prediction_markets_env_forced_on()
def prediction_markets_status() -> dict[str, Any]:
ui_opted_in = get_prediction_markets_ui_opt_in()
env_on = prediction_markets_env_forced_on()
env_off = prediction_markets_env_forced_off()
env_override = None
if env_on:
env_override = "on"
elif env_off:
env_override = "off"
return {
"enabled": prediction_markets_fetch_enabled(),
"ui_opted_in": ui_opted_in,
"env_override": env_override,
"jitter": {
"scheduler_interval_minutes": int(
os.environ.get("PREDICTION_MARKETS_INTERVAL_MINUTES", "7")
),
"scheduler_jitter_seconds": int(
os.environ.get("PREDICTION_MARKETS_SCHEDULER_JITTER_S", "240")
),
"pre_fetch_jitter_seconds": float(
os.environ.get("PREDICTION_MARKETS_PRE_FETCH_JITTER_S", "90")
),
},
}
+44 -22
View File
@@ -26,6 +26,46 @@ def _planetary_user_agent() -> str:
return outbound_user_agent("sentinel2-planetary-computer")
def _sign_planetary_href(href: str) -> str:
"""Sign a Planetary Computer blob URL with a short-lived SAS token."""
if not href or "blob.core.windows.net" not in href:
return href
try:
account = href.split(".blob.core.windows.net")[0].split("//")[-1]
token_resp = requests.get(
f"https://planetarycomputer.microsoft.com/api/sas/v1/token/{account}",
timeout=5,
headers={"User-Agent": _planetary_user_agent()},
)
token_resp.raise_for_status()
token = token_resp.json().get("token", "")
if not token:
return href
sep = "&" if "?" in href else "?"
return f"{href}{sep}{token}"
except (requests.RequestException, ValueError, KeyError):
return href
def _scene_from_stac_feature(item: dict) -> dict:
assets = item.get("assets", {}) or {}
rendered = assets.get("rendered_preview") or {}
thumbnail = assets.get("thumbnail") or {}
props = item.get("properties", {}) or {}
thumb_href = _sign_planetary_href(thumbnail.get("href") or rendered.get("href") or "")
full_href = _sign_planetary_href(rendered.get("href") or thumbnail.get("href") or "")
return {
"found": True,
"scene_id": item.get("id"),
"datetime": props.get("datetime"),
"cloud_cover": props.get("eo:cloud_cover"),
"thumbnail_url": thumb_href or None,
"fullres_url": full_href or None,
"bbox": list(item.get("bbox", [])) if item.get("bbox") else None,
"platform": props.get("platform", "Sentinel-2"),
}
def _esri_imagery_fallback(lat: float, lng: float) -> dict:
lat_span = 0.18
lng_span = 0.24
@@ -53,14 +93,14 @@ def _esri_imagery_fallback(lat: float, lng: float) -> dict:
def search_sentinel2_scene(lat: float, lng: float) -> dict:
"""Search for the latest Sentinel-2 L2A scene covering a point."""
"""Search for up to 3 recent Sentinel-2 L2A scenes covering a point."""
cache_key = f"{round(lat, 2)}_{round(lng, 2)}"
if cache_key in _sentinel_cache:
return _sentinel_cache[cache_key]
try:
end = datetime.utcnow()
start = end - timedelta(days=30)
start = end - timedelta(days=60)
search_payload = {
"collections": ["sentinel-2-l2a"],
"intersects": {"type": "Point", "coordinates": [lng, lat]},
@@ -83,26 +123,8 @@ def search_sentinel2_scene(lat: float, lng: float) -> dict:
_sentinel_cache[cache_key] = result
return result
item = features[0]
assets = item.get("assets", {}) or {}
rendered = assets.get("rendered_preview") or {}
thumbnail = assets.get("thumbnail") or {}
# Full-res image URL — what opens when user clicks
fullres_url = rendered.get("href") or thumbnail.get("href")
# Thumbnail URL — what shows in the popup card
thumb_url = thumbnail.get("href") or rendered.get("href")
result = {
"found": True,
"scene_id": item.get("id"),
"datetime": item.get("properties", {}).get("datetime"),
"cloud_cover": item.get("properties", {}).get("eo:cloud_cover"),
"thumbnail_url": thumb_url,
"fullres_url": fullres_url,
"bbox": list(item.get("bbox", [])) if item.get("bbox") else None,
"platform": item.get("properties", {}).get("platform", "Sentinel-2"),
}
scenes = [_scene_from_stac_feature(item) for item in features[:3]]
result = {**scenes[0], "scenes": scenes}
_sentinel_cache[cache_key] = result
return result
@@ -0,0 +1,55 @@
"""Prediction market fetch timing uses jitter to reduce poll fingerprinting."""
from unittest.mock import MagicMock, patch
import pytest
from services.fetchers import prediction_markets as pm
@pytest.fixture(autouse=True)
def clear_market_cache():
pm._market_cache.clear()
yield
pm._market_cache.clear()
def test_pre_fetch_jitter_sleeps_when_configured(monkeypatch):
monkeypatch.setattr(pm, "_PRE_FETCH_JITTER_S", 10.0)
sleeps: list[float] = []
monkeypatch.setattr(pm.time, "sleep", lambda s: sleeps.append(s))
monkeypatch.setattr(pm.random, "uniform", lambda _a, _b: 4.5)
pm._apply_pre_fetch_jitter()
assert sleeps == [4.5]
def test_fetch_raw_applies_provider_gap(monkeypatch):
monkeypatch.setenv("PREDICTION_MARKETS_ENABLED", "true")
monkeypatch.setattr(pm, "_apply_pre_fetch_jitter", lambda: None)
gap_calls: list[int] = []
def _track_gap():
gap_calls.append(1)
monkeypatch.setattr(pm, "_apply_provider_gap_jitter", _track_gap)
monkeypatch.setattr(pm, "_fetch_polymarket_events", lambda: [])
monkeypatch.setattr(pm, "_fetch_kalshi_events", lambda: [])
monkeypatch.setattr(pm, "_merge_markets", lambda _p, _k: [])
pm.fetch_prediction_markets_raw()
assert gap_calls == [1]
def test_pace_provider_adds_per_page_jitter(monkeypatch):
monkeypatch.setattr(pm, "_POLYMARKET_PAGE_DELAY_JITTER_S", 1.0)
monkeypatch.setattr(pm, "_provider_last_request_at", {"polymarket": pm.time.monotonic()})
monkeypatch.setattr(pm.random, "uniform", lambda _a, _b: 0.5)
sleeps: list[float] = []
monkeypatch.setattr(pm.time, "sleep", lambda s: sleeps.append(s))
pm._pace_provider("polymarket", 0.02)
assert sleeps == [pytest.approx(0.52)]
@@ -0,0 +1,24 @@
"""UI opt-in for prediction markets (Global Threat Intercept)."""
from services import prediction_markets_settings as pm_settings
from services.fetchers import prediction_markets
def test_ui_opt_in_enables_fetch(monkeypatch, tmp_path):
opt_file = tmp_path / "prediction_markets_opt_in.json"
monkeypatch.setattr(pm_settings, "_OPT_IN_FILE", opt_file)
monkeypatch.delenv("PREDICTION_MARKETS_ENABLED", raising=False)
assert pm_settings.prediction_markets_fetch_enabled() is False
pm_settings.set_prediction_markets_ui_opt_in(True)
assert pm_settings.prediction_markets_fetch_enabled() is True
assert prediction_markets.prediction_markets_fetch_enabled() is True
def test_env_force_on_without_ui_file(monkeypatch, tmp_path):
opt_file = tmp_path / "prediction_markets_opt_in.json"
monkeypatch.setattr(pm_settings, "_OPT_IN_FILE", opt_file)
monkeypatch.setenv("PREDICTION_MARKETS_ENABLED", "true")
assert pm_settings.prediction_markets_fetch_enabled() is True
@@ -0,0 +1,74 @@
"""Right-click dossier returns up to 3 signed Sentinel-2 scenes."""
from unittest.mock import MagicMock, patch
import pytest
from services import sentinel_search as ss
@pytest.fixture(autouse=True)
def clear_sentinel_cache():
ss._sentinel_cache.clear()
yield
ss._sentinel_cache.clear()
def _stac_feature(scene_id: str, dt: str, cloud: float) -> dict:
href = f"https://sentinel2euwest.blob.core.windows.net/sentinel2-l2a/{scene_id}.tif"
return {
"id": scene_id,
"bbox": [0, 0, 1, 1],
"properties": {
"datetime": dt,
"eo:cloud_cover": cloud,
"platform": "Sentinel-2A",
},
"assets": {
"rendered_preview": {"href": href},
"thumbnail": {"href": href},
},
}
@patch("services.sentinel_search.requests.get")
@patch("services.sentinel_search.requests.post")
def test_search_returns_three_scenes(mock_post, mock_get):
mock_post.return_value = MagicMock(
ok=True,
raise_for_status=MagicMock(),
json=lambda: {
"features": [
_stac_feature("s1", "2026-05-28T10:00:00Z", 5.0),
_stac_feature("s2", "2026-05-20T10:00:00Z", 12.0),
_stac_feature("s3", "2026-05-10T10:00:00Z", 18.0),
],
},
)
mock_get.return_value = MagicMock(
ok=True,
raise_for_status=MagicMock(),
json=lambda: {"token": "sig=test"},
)
result = ss.search_sentinel2_scene(29.0, 51.0)
assert result["found"] is True
assert result["scene_id"] == "s1"
assert len(result["scenes"]) == 3
assert result["scenes"][1]["scene_id"] == "s2"
assert "sig=test" in (result["scenes"][0]["fullres_url"] or "")
@patch("services.sentinel_search.requests.post")
def test_search_esri_fallback_has_no_scenes(mock_post):
mock_post.return_value = MagicMock(
ok=True,
raise_for_status=MagicMock(),
json=lambda: {"features": []},
)
result = ss.search_sentinel2_scene(29.0, 51.0)
assert result["fallback"] is True
assert "scenes" not in result
@@ -45,8 +45,10 @@ def test_fimi_falsy_value_does_not_call_upstream(monkeypatch):
def test_prediction_markets_disabled_by_default(monkeypatch):
from services.fetchers import _store, prediction_markets
from services import prediction_markets_settings as pm_settings
monkeypatch.delenv("PREDICTION_MARKETS_ENABLED", raising=False)
monkeypatch.setattr(pm_settings, "get_prediction_markets_ui_opt_in", lambda: False)
monkeypatch.setitem(_store.latest_data, "prediction_markets", [{"id": "old"}])
monkeypatch.setattr(
prediction_markets, "fetch_prediction_markets_raw", _explode