fix(uap): enforce 60-day window, refresh daily, live NUFORC on Windows

Filter stale rows out of nuforc_recent_sightings.json on load; add requests-based
live scrape when curl is disabled; daily scheduler rebuild instead of weekly-only.

Co-authored-by: Cursor <cursoragent@cursor.com>
This commit is contained in:
BigBodyCobain
2026-06-03 18:27:30 -06:00
parent 363b5a49c8
commit 5f322b0a79
4 changed files with 225 additions and 91 deletions
+5 -1
View File
@@ -69,7 +69,11 @@ AIS_API_KEY= # https://aisstream.io/ — free tier WebSocket key
# the unauthenticated yfinance fallback to call Yahoo Finance.
# FINANCIAL_ENABLED=false
#
# NUFORC UAP sightings — huggingface.co dataset download.
# NUFORC UAP map layer — live scrape from nuforc.org (rolling window, default 60 days).
# NUFORC_RECENT_DAYS=60
# On Windows, live scrape uses Python requests by default; optional:
# SHADOWBROKER_ENABLE_WINDOWS_CURL_FALLBACK=true
# NUFORC enrichment index (HF dataset) is separate — opt-in only:
# NUFORC_ENABLED=false
#
# News RSS aggregator — defaults ON. Set to "false" to disable all
+3 -5
View File
@@ -993,19 +993,17 @@ def start_scheduler():
misfire_grace_time=600,
)
# UAP sightings (NUFORC) — weekly on Mondays at 12:00 UTC. The layer is a
# rolling last-60-days digest; refreshing once a week is enough cadence
# for human-readable map exploration and keeps load on nuforc.org light.
# UAP sightings (NUFORC) — daily at 12:00 UTC. Rolling ~60-day window from
# live nuforc.org; disk cache is re-validated on every read.
_scheduler.add_job(
lambda: _run_task_with_health(
lambda: fetch_uap_sightings(force_refresh=True),
"fetch_uap_sightings",
),
"cron",
day_of_week="mon",
hour=12,
minute=0,
id="uap_sightings_weekly",
id="uap_sightings_daily",
max_instances=1,
misfire_grace_time=3600,
)
+185 -65
View File
@@ -692,7 +692,8 @@ _NUFORC_TILESET = "nuforc.cmm18aqea06bu1mmselhpnano-0ce5v"
_NUFORC_TOKEN = os.environ.get("NUFORC_MAPBOX_TOKEN", "").strip()
_NUFORC_RADIUS_M = 200_000 # 200 km query radius
_NUFORC_LIMIT = 50 # max features per tilequery call
_NUFORC_RECENT_DAYS = int(os.environ.get("NUFORC_RECENT_DAYS", "60"))
# Rolling window shown on the map (~2 calendar months). Override via NUFORC_RECENT_DAYS.
_NUFORC_RECENT_DAYS = max(1, int(os.environ.get("NUFORC_RECENT_DAYS", "60")))
_NUFORC_HF_FALLBACK_LIMIT = max(25, int(os.environ.get("NUFORC_HF_FALLBACK_LIMIT", "250")))
_NUFORC_HF_GEOCODE_LIMIT = max(25, int(os.environ.get("NUFORC_HF_GEOCODE_LIMIT", "150")))
_NUFORC_GEOCODE_WORKERS = max(1, int(os.environ.get("NUFORC_GEOCODE_WORKERS", "1")))
@@ -766,6 +767,35 @@ def _fetch_nuforc_tilequery(lng: float, lat: float) -> list[dict]:
return []
def _uap_cutoff_date_str() -> str:
return (datetime.utcnow() - timedelta(days=_NUFORC_RECENT_DAYS)).strftime("%Y-%m-%d")
def _uap_sighting_date_str(sighting: dict) -> str | None:
"""Normalize a sighting row to YYYY-MM-DD for window filtering."""
from services.fetchers.nuforc_enrichment import _parse_date
raw = str(sighting.get("date_time") or sighting.get("occurred") or "").strip()
if not raw:
return None
parsed = _parse_date(raw)
if parsed:
return parsed
if len(raw) >= 10 and raw[4] == "-" and raw[7] == "-":
return raw[:10]
return None
def _filter_uap_sightings_recent(sightings: list[dict]) -> list[dict]:
"""Drop anything outside the rolling NUFORC_RECENT_DAYS window."""
cutoff = _uap_cutoff_date_str()
return [
sighting
for sighting in sightings
if (_uap_sighting_date_str(sighting) or "") >= cutoff
]
def _parse_nuforc_tile_date(value: str) -> datetime | None:
raw = str(value or "").strip()
if not raw:
@@ -804,17 +834,39 @@ def _load_nuforc_sightings_cache(*, force_refresh: bool = False) -> list[dict] |
return None
if (datetime.utcnow() - built_dt).total_seconds() > 86400:
return None
if raw.get("cutoff_days") != _NUFORC_RECENT_DAYS:
logger.info(
"UAP sightings: cache cutoff_days mismatch (%s != %s); rebuilding",
raw.get("cutoff_days"),
_NUFORC_RECENT_DAYS,
)
return None
sightings = raw.get("sightings")
if isinstance(sightings, list):
if len(sightings) <= 0:
logger.info("UAP sightings: cache is fresh but empty; rebuilding")
return None
filtered = _filter_uap_sightings_recent(sightings)
if not filtered:
logger.warning(
"UAP sightings: cache had %d rows but none within last %d days; rebuilding",
len(sightings),
_NUFORC_RECENT_DAYS,
)
return None
if len(filtered) < len(sightings):
logger.info(
"UAP sightings: dropped %d stale cached rows outside %d-day window",
len(sightings) - len(filtered),
_NUFORC_RECENT_DAYS,
)
logger.info(
"UAP sightings: loaded %d cached reports from %s",
len(sightings),
"UAP sightings: loaded %d cached reports from %s (within %d-day window)",
len(filtered),
built,
_NUFORC_RECENT_DAYS,
)
return sightings
return filtered
except Exception as e:
logger.warning("UAP sightings: cache load error: %s", e)
return None
@@ -828,6 +880,7 @@ def _save_nuforc_sightings_cache(sightings: list[dict]) -> None:
_NUFORC_DATA_DIR.mkdir(parents=True, exist_ok=True)
payload = {
"built": datetime.utcnow().isoformat(),
"cutoff_days": _NUFORC_RECENT_DAYS,
"count": len(sightings),
"sightings": sightings,
}
@@ -1035,27 +1088,128 @@ def _nuforc_months_for_window(days: int) -> list[str]:
return months
def _nuforc_fetch_month_live(yyyymm: str, cookie_jar: Path) -> list[dict]:
"""Pull one month of NUFORC sightings via the live wpDataTables AJAX.
Returns a list of raw row dicts with the fields we care about:
id, occurred (YYYY-MM-DD), posted (YYYY-MM-DD), city, state, country,
shape_raw, summary, explanation. Empty list on any failure — caller
decides whether a failure is fatal.
"""
def _parse_nuforc_live_datatables_rows(raw_rows: list) -> list[dict]:
"""Parse wpDataTables ``data`` array into normalized row dicts."""
from services.fetchers.nuforc_enrichment import _parse_date
curl_bin = shutil.which("curl") or "curl"
out: list[dict] = []
for raw in raw_rows:
if not isinstance(raw, list) or len(raw) < 8:
continue
link_html = str(raw[0] or "")
occurred_raw = str(raw[1] or "")
city = str(raw[2] or "").strip()
state = str(raw[3] or "").strip()
country = str(raw[4] or "").strip()
shape_raw = (str(raw[5] or "").strip() or "Unknown")
summary = str(raw[6] or "").strip()
reported_raw = str(raw[7] or "")
explanation = str(raw[9] or "").strip() if len(raw) > 9 and raw[9] else ""
occurred_ymd = _parse_date(occurred_raw)
if not occurred_ymd:
continue
if not city and not state and not country:
continue
id_match = _NUFORC_LIVE_SIGHTING_ID_RE.search(link_html)
if id_match:
sighting_id = f"NUFORC-{id_match.group(1)}"
else:
digest = hashlib.sha1(
f"{occurred_ymd}|{city}|{state}|{summary}".encode("utf-8", "ignore")
).hexdigest()[:12]
sighting_id = f"NUFORC-{digest}"
if summary and len(summary) > 280:
summary = summary[:277] + "..."
if not summary:
summary = "Sighting reported"
out.append({
"id": sighting_id,
"occurred": occurred_ymd,
"posted": _parse_date(reported_raw) or occurred_ymd,
"city": city,
"state": state,
"country": country,
"shape_raw": shape_raw,
"summary": summary,
"explanation": explanation,
})
return out
def _nuforc_fetch_month_live_requests(yyyymm: str) -> list[dict]:
"""Live NUFORC month fetch via requests (Windows-safe when curl is disabled)."""
import requests
index_url = _NUFORC_LIVE_INDEX_URL.format(yyyymm=yyyymm)
ajax_url = _NUFORC_LIVE_AJAX_URL.format(yyyymm=yyyymm)
if not external_curl_fallback_enabled():
headers = {"User-Agent": _nuforc_live_user_agent()}
session = requests.Session()
session.headers.update(headers)
try:
index_res = session.get(index_url, timeout=60)
except requests.RequestException as e:
logger.warning("NUFORC live (requests): index fetch failed for %s: %s", yyyymm, e)
return []
if index_res.status_code != 200 or not index_res.text:
logger.warning(
"NUFORC live: external curl disabled on Windows for %s; "
"set SHADOWBROKER_ENABLE_WINDOWS_CURL_FALLBACK=1 to opt in.",
"NUFORC live (requests): index HTTP %s for %s",
index_res.status_code,
yyyymm,
)
return []
nonce_match = _NUFORC_LIVE_NONCE_RE.search(index_res.text)
if not nonce_match:
logger.warning("NUFORC live (requests): wdtNonce not found for %s", yyyymm)
return []
nonce = nonce_match.group(1)
post_data = (
"draw=1"
"&columns%5B0%5D%5Bdata%5D=0&columns%5B0%5D%5Bsearchable%5D=true&columns%5B0%5D%5Borderable%5D=false"
"&columns%5B1%5D%5Bdata%5D=1&columns%5B1%5D%5Bsearchable%5D=true&columns%5B1%5D%5Borderable%5D=true"
"&order%5B0%5D%5Bcolumn%5D=1&order%5B0%5D%5Bdir%5D=desc"
"&start=0&length=-1"
"&search%5Bvalue%5D=&search%5Bregex%5D=false"
f"&wdtNonce={nonce}"
)
try:
ajax_res = session.post(
ajax_url,
data=post_data,
headers={
**headers,
"Referer": index_url,
"X-Requested-With": "XMLHttpRequest",
"Content-Type": "application/x-www-form-urlencoded",
},
timeout=120,
)
except requests.RequestException as e:
logger.warning("NUFORC live (requests): ajax failed for %s: %s", yyyymm, e)
return []
if ajax_res.status_code != 200 or not ajax_res.text:
logger.warning(
"NUFORC live (requests): ajax HTTP %s for %s",
ajax_res.status_code,
yyyymm,
)
return []
try:
payload = ajax_res.json()
except json.JSONDecodeError as e:
logger.warning("NUFORC live (requests): ajax JSON decode failed for %s: %s", yyyymm, e)
return []
return _parse_nuforc_live_datatables_rows(payload.get("data") or [])
def _nuforc_fetch_month_live_curl(yyyymm: str, cookie_jar: Path) -> list[dict]:
"""Pull one month of NUFORC sightings via curl + wpDataTables AJAX."""
curl_bin = shutil.which("curl") or "curl"
index_url = _NUFORC_LIVE_INDEX_URL.format(yyyymm=yyyymm)
ajax_url = _NUFORC_LIVE_AJAX_URL.format(yyyymm=yyyymm)
# Step 1: GET the month index to capture session cookies + fresh nonce.
try:
@@ -1125,65 +1279,27 @@ def _nuforc_fetch_month_live(yyyymm: str, cookie_jar: Path) -> list[dict]:
logger.warning("NUFORC live: ajax JSON decode failed for %s: %s", yyyymm, e)
return []
raw_rows = payload.get("data") or []
out: list[dict] = []
for raw in raw_rows:
if not isinstance(raw, list) or len(raw) < 8:
continue
link_html = str(raw[0] or "")
occurred_raw = str(raw[1] or "")
city = str(raw[2] or "").strip()
state = str(raw[3] or "").strip()
country = str(raw[4] or "").strip()
shape_raw = (str(raw[5] or "").strip() or "Unknown")
summary = str(raw[6] or "").strip()
reported_raw = str(raw[7] or "")
explanation = str(raw[9] or "").strip() if len(raw) > 9 and raw[9] else ""
return _parse_nuforc_live_datatables_rows(payload.get("data") or [])
occurred_ymd = _parse_date(occurred_raw)
if not occurred_ymd:
continue
if not city and not state and not country:
continue
id_match = _NUFORC_LIVE_SIGHTING_ID_RE.search(link_html)
if id_match:
sighting_id = f"NUFORC-{id_match.group(1)}"
else:
digest = hashlib.sha1(
f"{occurred_ymd}|{city}|{state}|{summary}".encode("utf-8", "ignore")
).hexdigest()[:12]
sighting_id = f"NUFORC-{digest}"
if summary and len(summary) > 280:
summary = summary[:277] + "..."
if not summary:
summary = "Sighting reported"
out.append({
"id": sighting_id,
"occurred": occurred_ymd,
"posted": _parse_date(reported_raw) or occurred_ymd,
"city": city,
"state": state,
"country": country,
"shape_raw": shape_raw,
"summary": summary,
"explanation": explanation,
})
return out
def _nuforc_fetch_month_live(yyyymm: str, cookie_jar: Path) -> list[dict]:
"""Pull one month of NUFORC sightings via live wpDataTables AJAX."""
if external_curl_fallback_enabled():
rows = _nuforc_fetch_month_live_curl(yyyymm, cookie_jar)
if rows:
return rows
return _nuforc_fetch_month_live_requests(yyyymm)
def _build_recent_uap_sightings() -> list[dict]:
"""Build the rolling 1-year UAP sightings layer from live NUFORC data.
"""Build the rolling UAP sightings layer from live NUFORC data.
Hits nuforc.org's public sub-index once per month in the window, drops
anything outside the exact day-precision cutoff, dedupes by sighting id,
geocodes city+state via the existing location cache, and returns rows
keyed to the same schema the frontend already renders.
"""
cutoff_dt = datetime.utcnow() - timedelta(days=_NUFORC_RECENT_DAYS)
cutoff_str = cutoff_dt.strftime("%Y-%m-%d")
cutoff_str = _uap_cutoff_date_str()
months = _nuforc_months_for_window(_NUFORC_RECENT_DAYS)
try:
@@ -1567,12 +1683,16 @@ def fetch_uap_sightings(*, force_refresh: bool = False):
live_error,
)
if sightings:
sightings = _filter_uap_sightings_recent(sightings)
with _data_lock:
latest_data["uap_sightings"] = sightings or []
if sightings:
_mark_fresh("uap_sightings")
return
# Unreachable legacy Mapbox tilequery path (kept for reference).
cutoff = datetime.utcnow() - timedelta(days=_NUFORC_RECENT_DAYS)
# Query the grid concurrently (up to 8 threads)
+32 -20
View File
@@ -24,6 +24,7 @@ These tests pin the new behavior:
from __future__ import annotations
import json
import logging
from datetime import datetime as real_datetime
@@ -225,28 +226,39 @@ def test_fetch_uap_sightings_succeeds_when_fallback_returns_data(monkeypatch):
assert canary_calls == [], "canary should not trip when fallback supplies data"
def test_uap_scheduler_runs_weekly_not_daily():
"""The cron job for the UAP layer must be configured for Mondays at
12:00 UTC, not daily. Daily was the pre-fix default; weekly matches
the layer's stated cadence (a rolling 60-day digest) and keeps load
on nuforc.org light."""
def test_uap_scheduler_runs_daily():
"""UAP layer refreshes daily so the rolling ~60-day window stays current."""
from services import data_fetcher
src = data_fetcher.__file__
with open(src, "r", encoding="utf-8") as f:
with open(data_fetcher.__file__, "r", encoding="utf-8") as f:
text = f.read()
# Anchor on the scheduler block by id, then assert the cron triggers.
assert "uap_sightings_weekly" in text, (
"scheduler id should be uap_sightings_weekly (was uap_sightings_daily pre-fix)"
)
# The day_of_week directive is the difference between daily and weekly.
# If somebody flips it back to daily, this fires.
weekly_block = text.split("uap_sightings_weekly", 1)[0]
# Walk backwards for the matching add_job call.
add_job_idx = weekly_block.rfind("add_job(")
assert add_job_idx >= 0, "could not locate add_job block for UAP scheduler"
job_block = text[add_job_idx : text.find(")", text.index("uap_sightings_weekly")) + 1]
assert 'day_of_week="mon"' in job_block, (
f"expected day_of_week='mon' in UAP scheduler block:\n{job_block}"
assert "uap_sightings_daily" in text
idx = text.index("uap_sightings_daily")
block = text[max(0, idx - 600) : idx + 80]
assert 'day_of_week="mon"' not in block
def test_uap_cache_rejects_stale_rows_on_load(tmp_path, monkeypatch):
"""Disk cache must not resurrect sightings outside the rolling window."""
from services.fetchers import earth_observation as eo
cache_file = tmp_path / "nuforc_recent_sightings.json"
monkeypatch.setattr(eo, "_NUFORC_SIGHTINGS_CACHE_FILE", cache_file)
monkeypatch.setattr(eo, "datetime", _FixedDateTime)
cache_file.write_text(
json.dumps({
"built": _FixedDateTime.utcnow().isoformat(),
"cutoff_days": 60,
"sightings": [
{"id": "NUFORC-old", "date_time": "2023-06-01", "lat": 39.0, "lng": -105.0},
{"id": "NUFORC-new", "date_time": "2026-04-20", "lat": 40.0, "lng": -104.0},
],
}),
encoding="utf-8",
)
loaded = eo._load_nuforc_sightings_cache()
assert loaded is not None
assert [s["id"] for s in loaded] == ["NUFORC-new"]