fix(uap): stop HF fallback from serving 3-year-old NUFORC sightings

The UAP sightings layer is sourced from a live scrape of nuforc.org with a
static Hugging Face CSV mirror (kcimc/NUFORC) as a fallback. The fallback
parsed every row, sorted by occurred-desc, and took the top 250 — with no
date cutoff. The HF mirror is a third-party snapshot that hasn't been
refreshed in years, so the "newest 250" rows it returns are from ~2022-23.
When the live path fails (Cloudflare 403, curl disabled on Windows, wdtNonce
regex stale, etc.) users see a map full of sightings from 3 years ago,
labeled as the "last 60 days" layer.

Changes:

* HF fallback now applies the same 60-day cutoff the live path uses. Rows
  outside the window are dropped before take-top-N. If the mirror has
  nothing inside the window the fallback returns [] (don't serve stale).
* When the HF mirror is fully stale a loud ERROR log fires with the count
  of dropped rows so the operator can tell the mirror's the problem, not
  a network issue.
* When BOTH live AND HF fallback produce 0 rows, fetch_uap_sightings now
  trips assert_canary("uap_sightings", 0) so the health registry shows
  the layer as broken instead of "fresh and empty for days."
* Scheduler moved from daily 12:00 UTC to weekly Mondays 12:00 UTC. The
  layer is a rolling 60-day digest; refreshing once a week is enough
  cadence for human-readable map exploration and keeps nuforc.org load
  light.

6 new tests cover the cutoff filter, the doomsday-log path, the mixed-age
path, the both-paths-empty health failure, the positive fallback path, and
the scheduler cadence.

Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
This commit is contained in:
BigBodyCobain
2026-05-22 23:27:12 -06:00
parent febcce9125
commit 0d0e009867
3 changed files with 301 additions and 2 deletions
+5 -2
View File
@@ -960,16 +960,19 @@ def start_scheduler():
misfire_grace_time=600,
)
# UAP sightings (NUFORC) — daily at 12:00 UTC
# UAP sightings (NUFORC) — weekly on Mondays at 12:00 UTC. The layer is a
# rolling last-60-days digest; refreshing once a week is enough cadence
# for human-readable map exploration and keeps load on nuforc.org light.
_scheduler.add_job(
lambda: _run_task_with_health(
lambda: fetch_uap_sightings(force_refresh=True),
"fetch_uap_sightings",
),
"cron",
day_of_week="mon",
hour=12,
minute=0,
id="uap_sightings_daily",
id="uap_sightings_weekly",
max_instances=1,
misfire_grace_time=3600,
)
@@ -1383,10 +1383,21 @@ def _build_uap_sightings_from_hf_mirror() -> list[dict]:
This is a resilience fallback for local/Windows runs where nuforc.org is
Cloudflare-gated and the Mapbox token is not configured. It is not as fresh
as the live NUFORC AJAX feed, but it keeps the layer visible and cached.
Date-cutoff guard: the kcimc/NUFORC HF dataset is a static snapshot whose
maintainer refreshes it sporadically. Without a cutoff, sorting by
occurred-desc and taking the top N rows returns whatever the mirror's
newest rows happen to be — which can be years old if the snapshot is
stale. We apply the same ``_NUFORC_RECENT_DAYS`` window the live path
uses (60 days). If the HF mirror has nothing inside the window we return
``[]`` rather than silently serving 3-year-old "newest" rows.
"""
from services.fetchers.nuforc_enrichment import _HF_CSV_URL, _parse_date
from services.geocode_validate import coord_in_country
cutoff_dt = datetime.utcnow() - timedelta(days=_NUFORC_RECENT_DAYS)
cutoff_str = cutoff_dt.strftime("%Y-%m-%d")
try:
response = fetch_with_curl(_HF_CSV_URL, timeout=180, follow_redirects=True)
if not response or response.status_code != 200:
@@ -1400,6 +1411,7 @@ def _build_uap_sightings_from_hf_mirror() -> list[dict]:
return []
candidates: list[dict] = []
stale_rows_dropped = 0
try:
reader = csv.DictReader(io.StringIO(response.text))
for row in reader:
@@ -1410,6 +1422,9 @@ def _build_uap_sightings_from_hf_mirror() -> list[dict]:
)
if not occurred:
continue
if occurred < cutoff_str:
stale_rows_dropped += 1
continue
raw_location = _normalize_uap_location(
row.get("Location", "")
or row.get("City", "")
@@ -1444,6 +1459,19 @@ def _build_uap_sightings_from_hf_mirror() -> list[dict]:
logger.warning("UAP sightings: HF fallback parse failed: %s", e)
return []
if not candidates:
# HF mirror returned rows, but none inside the rolling window. This is
# the smoking gun for "the public HF dataset hasn't been refreshed in
# years" — log loudly so the operator sees it instead of guessing.
logger.error(
"UAP sightings: HF fallback yielded 0 rows within last %d days "
"(dropped %d stale rows). HF mirror is likely stale; the layer "
"will be empty until the live NUFORC path recovers.",
_NUFORC_RECENT_DAYS,
stale_rows_dropped,
)
return []
candidates.sort(key=lambda row: (row["occurred"], row["posted"], row["id"]), reverse=True)
candidates = candidates[:_NUFORC_HF_FALLBACK_LIMIT]
@@ -1515,13 +1543,29 @@ def fetch_uap_sightings(*, force_refresh: bool = False):
sightings = _load_nuforc_sightings_cache(force_refresh=force_refresh)
if sightings is None:
live_error: Exception | None = None
try:
sightings = _build_recent_uap_sightings()
except Exception as e:
live_error = e
logger.warning("UAP sightings: live NUFORC rebuild failed, using fallback: %s", e)
sightings = _build_uap_sightings_from_hf_mirror()
if sightings:
_save_nuforc_sightings_cache(sightings)
elif live_error is not None:
# Both paths failed: live raised AND HF fallback returned empty
# (either the HF mirror is stale beyond the cutoff or the network
# is gone entirely). The previous code silently set the layer to
# ``[]`` and kept marking it fresh; that masked the failure for
# days. Surface it via assert_canary so the health registry shows
# the layer as broken instead of "fresh and empty".
from services.slo import assert_canary
assert_canary("uap_sightings", 0)
logger.error(
"UAP sightings: both live NUFORC and HF fallback produced 0 "
"rows; layer is unavailable. Live error: %s",
live_error,
)
with _data_lock:
latest_data["uap_sightings"] = sightings or []
@@ -0,0 +1,252 @@
"""HF NUFORC fallback honors the rolling cutoff window.
Background
----------
The UAP sightings layer is sourced primarily from a live scrape of
nuforc.org. When that fails (Cloudflare 403, curl disabled on Windows,
wdtNonce regex stale, etc.) the code falls back to a static CSV mirror
hosted on Hugging Face at ``kcimc/NUFORC/nuforc_str.csv``.
The HF mirror is maintained by a third party and refreshed sporadically.
Pre-fix, the fallback parsed every row, sorted by ``occurred`` descending,
and took the top 250 — **with no date cutoff**. When the HF mirror is
stale (its "newest" rows are ~2-3 years old), users saw a map full of
2022-2023 sightings labeled as the "last 60 days" layer.
These tests pin the new behavior:
* Rows older than ``_NUFORC_RECENT_DAYS`` are dropped before the take-top-N.
* If the HF mirror has nothing in the window, the fallback returns ``[]``
and logs ERROR (don't silently serve stale data).
* ``fetch_uap_sightings`` records the failure when BOTH paths fail, so
the layer shows as broken in the health registry instead of "fresh".
"""
from __future__ import annotations
import logging
from datetime import datetime as real_datetime
class _FixedDateTime(real_datetime):
"""A datetime whose utcnow() returns a pinned value, for deterministic
cutoff math. Subclasses real datetime so existing operations still work."""
@classmethod
def utcnow(cls):
return cls(2026, 5, 1, 12, 0, 0)
class _StubResponse:
status_code = 200
def __init__(self, text: str):
self.text = text
def _stub_geocode_cache(*_args, **_kwargs):
"""Pre-populated location cache so the fallback doesn't try to hit
Photon during the test."""
return {
"Denver, CO, USA": [39.7392, -104.9903],
"Seattle, WA, USA": [47.6062, -122.3321],
"Phoenix, AZ, USA": [33.4484, -112.0740],
}
def test_hf_fallback_drops_rows_older_than_60_days(monkeypatch):
"""Pre-fix: a row from 2023 would make it into the layer if it was
among the newest 250 in the HF mirror. Post-fix: it's filtered out
before we even count to 250."""
from services.fetchers import earth_observation as eo
# 2026-05-01 - 60 days = 2026-03-02. So 2026-03-01 is one day too old.
csv_text = (
"Sighting,Occurred,Location,Shape,Duration,Posted,Summary\n"
'1,2026-04-15 21:00:00 Local,"Denver, CO, USA",Triangle,5 minutes,2026-04-16,"In-window sighting"\n'
'2,2023-06-01 21:00:00 Local,"Seattle, WA, USA",Light,30 seconds,2023-06-02,"Three years old"\n'
'3,2022-01-15 20:00:00 Local,"Phoenix, AZ, USA",Disk,2 minutes,2022-01-16,"Even older"\n'
)
monkeypatch.setattr(eo, "datetime", _FixedDateTime)
monkeypatch.setattr(eo, "fetch_with_curl", lambda *a, **kw: _StubResponse(csv_text))
monkeypatch.setattr(eo, "_load_nuforc_location_cache", _stub_geocode_cache)
monkeypatch.setattr(eo, "_save_nuforc_location_cache", lambda cache: None)
# If the cutoff is missing, the geocoder may still get called for the
# 2022/2023 rows. We assert geocoder is NEVER invoked for stale rows.
geocode_calls: list[str] = []
def _geocode_spy(location, city, state, country=""):
geocode_calls.append(location)
return None # already in cache, shouldn't be hit anyway
monkeypatch.setattr(eo, "_geocode_uap_location", _geocode_spy)
sightings = eo._build_uap_sightings_from_hf_mirror()
ids = [s["id"] for s in sightings]
assert ids == ["NUFORC-1"], f"only the 2026 row should survive: got {ids}"
# Stale rows must not have been geocoded — they should be dropped
# before the geocoding loop is reached.
assert geocode_calls == []
def test_hf_fallback_returns_empty_when_mirror_is_fully_stale(monkeypatch, caplog):
"""The smoking-gun case: the HF mirror is so stale that NO rows are
within the rolling window. Pre-fix returned 250 ancient rows. Post-fix
returns ``[]`` and logs ERROR so the operator knows the layer is dead."""
from services.fetchers import earth_observation as eo
csv_text = (
"Sighting,Occurred,Location,Shape,Duration,Posted,Summary\n"
'1,2023-04-15 21:00:00 Local,"Denver, CO, USA",Triangle,5 minutes,2023-04-16,"Old"\n'
'2,2022-06-01 21:00:00 Local,"Seattle, WA, USA",Light,30 seconds,2022-06-02,"Older"\n'
'3,2021-01-15 20:00:00 Local,"Phoenix, AZ, USA",Disk,2 minutes,2021-01-16,"Ancient"\n'
)
monkeypatch.setattr(eo, "datetime", _FixedDateTime)
monkeypatch.setattr(eo, "fetch_with_curl", lambda *a, **kw: _StubResponse(csv_text))
monkeypatch.setattr(eo, "_load_nuforc_location_cache", _stub_geocode_cache)
monkeypatch.setattr(eo, "_save_nuforc_location_cache", lambda cache: None)
monkeypatch.setattr(eo, "_geocode_uap_location", lambda *a, **kw: None)
with caplog.at_level(logging.ERROR, logger="services.fetchers.earth_observation"):
sightings = eo._build_uap_sightings_from_hf_mirror()
assert sightings == []
# The error log should mention how many stale rows were dropped so the
# operator can tell the mirror is the problem (not "we got 0 rows" which
# could also mean the download failed).
relevant = [r for r in caplog.records if "HF fallback yielded 0 rows" in r.getMessage()]
assert relevant, "expected loud ERROR when HF mirror is fully stale"
# The message should report the count of dropped stale rows.
assert any("dropped 3" in r.getMessage() for r in relevant)
def test_hf_fallback_still_returns_data_when_some_rows_are_in_window(monkeypatch):
"""Mixed-age mirror: some rows in the window, some not. The fallback
should return only the in-window rows and not log the doomsday ERROR."""
from services.fetchers import earth_observation as eo
csv_text = (
"Sighting,Occurred,Location,Shape,Duration,Posted,Summary\n"
'1,2026-04-15 21:00:00 Local,"Denver, CO, USA",Triangle,5 minutes,2026-04-16,"Fresh"\n'
'2,2026-04-10 21:00:00 Local,"Seattle, WA, USA",Light,30 seconds,2026-04-10,"Also fresh"\n'
'3,2020-01-15 20:00:00 Local,"Phoenix, AZ, USA",Disk,2 minutes,2020-01-16,"Ancient"\n'
)
monkeypatch.setattr(eo, "datetime", _FixedDateTime)
monkeypatch.setattr(eo, "fetch_with_curl", lambda *a, **kw: _StubResponse(csv_text))
monkeypatch.setattr(eo, "_load_nuforc_location_cache", _stub_geocode_cache)
monkeypatch.setattr(eo, "_save_nuforc_location_cache", lambda cache: None)
monkeypatch.setattr(eo, "_geocode_uap_location", lambda *a, **kw: None)
sightings = eo._build_uap_sightings_from_hf_mirror()
ids = sorted(s["id"] for s in sightings)
assert ids == ["NUFORC-1", "NUFORC-2"], f"only in-window rows should appear: got {ids}"
def test_fetch_uap_sightings_marks_failure_when_both_paths_empty(monkeypatch, caplog):
"""When the live path raises AND the HF fallback returns empty,
``fetch_uap_sightings`` must:
* NOT mark the layer fresh (pre-fix bug: it did, so the layer
showed as healthy-but-empty for days)
* call ``assert_canary("uap_sightings", 0)`` so the health
registry surfaces the broken layer
* log an ERROR with the live-path exception for debugging
"""
from services.fetchers import earth_observation as eo
from services.fetchers import _store
monkeypatch.setattr(_store, "is_any_active", lambda layer: True)
monkeypatch.setattr(eo, "_load_nuforc_sightings_cache", lambda force_refresh=False: None)
def _boom():
raise RuntimeError("NUFORC live: zero rows pulled across 3 months")
monkeypatch.setattr(eo, "_build_recent_uap_sightings", _boom)
monkeypatch.setattr(eo, "_build_uap_sightings_from_hf_mirror", lambda: [])
marked: list[str] = []
monkeypatch.setattr(eo, "_mark_fresh", lambda *keys: marked.extend(keys))
canary_calls: list[tuple[str, int]] = []
import services.slo as slo
monkeypatch.setattr(
slo, "assert_canary", lambda key, value: canary_calls.append((key, int(value)))
)
with caplog.at_level(logging.ERROR, logger="services.fetchers.earth_observation"):
eo.fetch_uap_sightings()
assert marked == [], "broken layer must NOT be marked fresh"
assert canary_calls == [("uap_sightings", 0)], (
f"expected canary trip when both paths fail; got {canary_calls}"
)
# The live error message should propagate into the error log so the
# operator can tell live failed AND fallback was empty (not the other
# way around).
assert any(
"both live NUFORC and HF fallback" in r.getMessage()
for r in caplog.records
)
def test_fetch_uap_sightings_succeeds_when_fallback_returns_data(monkeypatch):
"""Positive path: live fails, fallback returns rows. The layer is
populated and marked fresh; assert_canary is NOT tripped (we only
trip the canary when the layer has zero data)."""
from services.fetchers import earth_observation as eo
from services.fetchers import _store
monkeypatch.setattr(_store, "is_any_active", lambda layer: True)
monkeypatch.setattr(eo, "_load_nuforc_sightings_cache", lambda force_refresh=False: None)
monkeypatch.setattr(
eo, "_build_recent_uap_sightings", lambda: (_ for _ in ()).throw(RuntimeError("live down"))
)
fallback_rows = [{"id": "NUFORC-fb-1", "date_time": "2026-04-20", "lat": 0.0, "lng": 0.0}]
monkeypatch.setattr(eo, "_build_uap_sightings_from_hf_mirror", lambda: fallback_rows)
monkeypatch.setattr(eo, "_save_nuforc_sightings_cache", lambda s: None)
marked: list[str] = []
monkeypatch.setattr(eo, "_mark_fresh", lambda *keys: marked.extend(keys))
canary_calls: list[tuple[str, int]] = []
import services.slo as slo
monkeypatch.setattr(
slo, "assert_canary", lambda key, value: canary_calls.append((key, int(value)))
)
eo.fetch_uap_sightings()
assert marked == ["uap_sightings"]
assert canary_calls == [], "canary should not trip when fallback supplies data"
def test_uap_scheduler_runs_weekly_not_daily():
"""The cron job for the UAP layer must be configured for Mondays at
12:00 UTC, not daily. Daily was the pre-fix default; weekly matches
the layer's stated cadence (a rolling 60-day digest) and keeps load
on nuforc.org light."""
from services import data_fetcher
src = data_fetcher.__file__
with open(src, "r", encoding="utf-8") as f:
text = f.read()
# Anchor on the scheduler block by id, then assert the cron triggers.
assert "uap_sightings_weekly" in text, (
"scheduler id should be uap_sightings_weekly (was uap_sightings_daily pre-fix)"
)
# The day_of_week directive is the difference between daily and weekly.
# If somebody flips it back to daily, this fires.
weekly_block = text.split("uap_sightings_weekly", 1)[0]
# Walk backwards for the matching add_job call.
add_job_idx = weekly_block.rfind("add_job(")
assert add_job_idx >= 0, "could not locate add_job block for UAP scheduler"
job_block = text[add_job_idx : text.find(")", text.index("uap_sightings_weekly")) + 1]
assert 'day_of_week="mon"' in job_block, (
f"expected day_of_week='mon' in UAP scheduler block:\n{job_block}"
)