diff --git a/backend/services/data_fetcher.py b/backend/services/data_fetcher.py index bdbefb0..727c112 100644 --- a/backend/services/data_fetcher.py +++ b/backend/services/data_fetcher.py @@ -960,16 +960,19 @@ def start_scheduler(): misfire_grace_time=600, ) - # UAP sightings (NUFORC) — daily at 12:00 UTC + # UAP sightings (NUFORC) — weekly on Mondays at 12:00 UTC. The layer is a + # rolling last-60-days digest; refreshing once a week is enough cadence + # for human-readable map exploration and keeps load on nuforc.org light. _scheduler.add_job( lambda: _run_task_with_health( lambda: fetch_uap_sightings(force_refresh=True), "fetch_uap_sightings", ), "cron", + day_of_week="mon", hour=12, minute=0, - id="uap_sightings_daily", + id="uap_sightings_weekly", max_instances=1, misfire_grace_time=3600, ) diff --git a/backend/services/fetchers/earth_observation.py b/backend/services/fetchers/earth_observation.py index 6712a3c..94c6fad 100644 --- a/backend/services/fetchers/earth_observation.py +++ b/backend/services/fetchers/earth_observation.py @@ -1383,10 +1383,21 @@ def _build_uap_sightings_from_hf_mirror() -> list[dict]: This is a resilience fallback for local/Windows runs where nuforc.org is Cloudflare-gated and the Mapbox token is not configured. It is not as fresh as the live NUFORC AJAX feed, but it keeps the layer visible and cached. + + Date-cutoff guard: the kcimc/NUFORC HF dataset is a static snapshot whose + maintainer refreshes it sporadically. Without a cutoff, sorting by + occurred-desc and taking the top N rows returns whatever the mirror's + newest rows happen to be — which can be years old if the snapshot is + stale. We apply the same ``_NUFORC_RECENT_DAYS`` window the live path + uses (60 days). If the HF mirror has nothing inside the window we return + ``[]`` rather than silently serving 3-year-old "newest" rows. 
""" from services.fetchers.nuforc_enrichment import _HF_CSV_URL, _parse_date from services.geocode_validate import coord_in_country + cutoff_dt = datetime.utcnow() - timedelta(days=_NUFORC_RECENT_DAYS) + cutoff_str = cutoff_dt.strftime("%Y-%m-%d") + try: response = fetch_with_curl(_HF_CSV_URL, timeout=180, follow_redirects=True) if not response or response.status_code != 200: @@ -1400,6 +1411,7 @@ def _build_uap_sightings_from_hf_mirror() -> list[dict]: return [] candidates: list[dict] = [] + stale_rows_dropped = 0 try: reader = csv.DictReader(io.StringIO(response.text)) for row in reader: @@ -1410,6 +1422,9 @@ def _build_uap_sightings_from_hf_mirror() -> list[dict]: ) if not occurred: continue + if occurred < cutoff_str: + stale_rows_dropped += 1 + continue raw_location = _normalize_uap_location( row.get("Location", "") or row.get("City", "") @@ -1444,6 +1459,19 @@ def _build_uap_sightings_from_hf_mirror() -> list[dict]: logger.warning("UAP sightings: HF fallback parse failed: %s", e) return [] + if not candidates: + # HF mirror returned rows, but none inside the rolling window. This is + # the smoking gun for "the public HF dataset hasn't been refreshed in + # years" — log loudly so the operator sees it instead of guessing. + logger.error( + "UAP sightings: HF fallback yielded 0 rows within last %d days " + "(dropped %d stale rows). 
HF mirror is likely stale; the layer " + "will be empty until the live NUFORC path recovers.", + _NUFORC_RECENT_DAYS, + stale_rows_dropped, + ) + return [] + candidates.sort(key=lambda row: (row["occurred"], row["posted"], row["id"]), reverse=True) candidates = candidates[:_NUFORC_HF_FALLBACK_LIMIT] @@ -1515,13 +1543,29 @@ def fetch_uap_sightings(*, force_refresh: bool = False): sightings = _load_nuforc_sightings_cache(force_refresh=force_refresh) if sightings is None: + live_error: Exception | None = None try: sightings = _build_recent_uap_sightings() except Exception as e: + live_error = e logger.warning("UAP sightings: live NUFORC rebuild failed, using fallback: %s", e) sightings = _build_uap_sightings_from_hf_mirror() if sightings: _save_nuforc_sightings_cache(sightings) + elif live_error is not None: + # Both paths failed: live raised AND HF fallback returned empty + # (either the HF mirror is stale beyond the cutoff or the network + # is gone entirely). The previous code silently set the layer to + # ``[]`` and kept marking it fresh; that masked the failure for + # days. Surface it via assert_canary so the health registry shows + # the layer as broken instead of "fresh and empty". + from services.slo import assert_canary + assert_canary("uap_sightings", 0) + logger.error( + "UAP sightings: both live NUFORC and HF fallback produced 0 " + "rows; layer is unavailable. Live error: %s", + live_error, + ) with _data_lock: latest_data["uap_sightings"] = sightings or [] diff --git a/backend/tests/test_uap_hf_fallback_cutoff.py b/backend/tests/test_uap_hf_fallback_cutoff.py new file mode 100644 index 0000000..058f829 --- /dev/null +++ b/backend/tests/test_uap_hf_fallback_cutoff.py @@ -0,0 +1,252 @@ +"""HF NUFORC fallback honors the rolling cutoff window. + +Background +---------- +The UAP sightings layer is sourced primarily from a live scrape of +nuforc.org. When that fails (Cloudflare 403, curl disabled on Windows, +wdtNonce regex stale, etc.) 
the code falls back to a static CSV mirror
+hosted on Hugging Face at ``kcimc/NUFORC/nuforc_str.csv``.
+
+The HF mirror is maintained by a third party and refreshed sporadically.
+Pre-fix, the fallback parsed every row, sorted by ``occurred`` descending,
+and took the top 250 — **with no date cutoff**. When the HF mirror is
+stale (its "newest" rows are ~2-3 years old), users saw a map full of
+2022-2023 sightings labeled as the "last 60 days" layer.
+
+These tests pin the new behavior:
+
+* Rows older than ``_NUFORC_RECENT_DAYS`` are dropped before the take-top-N.
+* If the HF mirror has nothing in the window, the fallback returns ``[]``
+  and logs ERROR (don't silently serve stale data).
+* ``fetch_uap_sightings`` records the failure when BOTH paths fail, so
+  the layer shows as broken in the health registry instead of "fresh".
+"""
+
+from __future__ import annotations
+
+import logging
+from datetime import datetime as real_datetime
+
+
+class _FixedDateTime(real_datetime):
+    """A datetime whose utcnow() returns a pinned value, for deterministic
+    cutoff math. Subclasses real datetime so existing operations still work."""
+
+    @classmethod
+    def utcnow(cls):
+        return cls(2026, 5, 1, 12, 0, 0)
+
+
+class _StubResponse:
+    status_code = 200
+
+    def __init__(self, text: str):
+        self.text = text
+
+
+def _stub_geocode_cache(*_args, **_kwargs):
+    """Pre-populated location cache so the fallback doesn't try to hit
+    Photon during the test."""
+    return {
+        "Denver, CO, USA": [39.7392, -104.9903],
+        "Seattle, WA, USA": [47.6062, -122.3321],
+        "Phoenix, AZ, USA": [33.4484, -112.0740],
+    }
+
+
+def test_hf_fallback_drops_rows_older_than_60_days(monkeypatch):
+    """Pre-fix: a row from 2023 would make it into the layer if it was
+    among the newest 250 in the HF mirror. Post-fix: it's filtered out
+    before we even count to 250."""
+    from services.fetchers import earth_observation as eo
+
+    # 2026-05-01 - 60 days = 2026-03-02. So 2026-03-01 is one day too old.
+    csv_text = (
+        "Sighting,Occurred,Location,Shape,Duration,Posted,Summary\n"
+        '1,2026-04-15 21:00:00 Local,"Denver, CO, USA",Triangle,5 minutes,2026-04-16,"In-window sighting"\n'
+        '2,2023-06-01 21:00:00 Local,"Seattle, WA, USA",Light,30 seconds,2023-06-02,"Three years old"\n'
+        '3,2022-01-15 20:00:00 Local,"Phoenix, AZ, USA",Disk,2 minutes,2022-01-16,"Even older"\n'
+    )
+
+    monkeypatch.setattr(eo, "datetime", _FixedDateTime)
+    monkeypatch.setattr(eo, "fetch_with_curl", lambda *a, **kw: _StubResponse(csv_text))
+    monkeypatch.setattr(eo, "_load_nuforc_location_cache", _stub_geocode_cache)
+    monkeypatch.setattr(eo, "_save_nuforc_location_cache", lambda cache: None)
+    # If the cutoff is missing, the geocoder may still get called for the
+    # 2022/2023 rows. We assert geocoder is NEVER invoked for stale rows.
+    geocode_calls: list[str] = []
+
+    def _geocode_spy(location, city, state, country=""):
+        geocode_calls.append(location)
+        return None  # already in cache, shouldn't be hit anyway
+
+    monkeypatch.setattr(eo, "_geocode_uap_location", _geocode_spy)
+
+    sightings = eo._build_uap_sightings_from_hf_mirror()
+
+    ids = [s["id"] for s in sightings]
+    assert ids == ["NUFORC-1"], f"only the 2026 row should survive: got {ids}"
+    # Stale rows must not have been geocoded — they should be dropped
+    # before the geocoding loop is reached.
+    assert geocode_calls == []
+
+
+def test_hf_fallback_returns_empty_when_mirror_is_fully_stale(monkeypatch, caplog):
+    """The smoking-gun case: the HF mirror is so stale that NO rows are
+    within the rolling window. Pre-fix returned 250 ancient rows. Post-fix
+    returns ``[]`` and logs ERROR so the operator knows the layer is dead."""
+    from services.fetchers import earth_observation as eo
+
+    csv_text = (
+        "Sighting,Occurred,Location,Shape,Duration,Posted,Summary\n"
+        '1,2023-04-15 21:00:00 Local,"Denver, CO, USA",Triangle,5 minutes,2023-04-16,"Old"\n'
+        '2,2022-06-01 21:00:00 Local,"Seattle, WA, USA",Light,30 seconds,2022-06-02,"Older"\n'
+        '3,2021-01-15 20:00:00 Local,"Phoenix, AZ, USA",Disk,2 minutes,2021-01-16,"Ancient"\n'
+    )
+
+    monkeypatch.setattr(eo, "datetime", _FixedDateTime)
+    monkeypatch.setattr(eo, "fetch_with_curl", lambda *a, **kw: _StubResponse(csv_text))
+    monkeypatch.setattr(eo, "_load_nuforc_location_cache", _stub_geocode_cache)
+    monkeypatch.setattr(eo, "_save_nuforc_location_cache", lambda cache: None)
+    monkeypatch.setattr(eo, "_geocode_uap_location", lambda *a, **kw: None)
+
+    with caplog.at_level(logging.ERROR, logger="services.fetchers.earth_observation"):
+        sightings = eo._build_uap_sightings_from_hf_mirror()
+
+    assert sightings == []
+    # The error log should mention how many stale rows were dropped so the
+    # operator can tell the mirror is the problem (not "we got 0 rows" which
+    # could also mean the download failed).
+    relevant = [r for r in caplog.records if "HF fallback yielded 0 rows" in r.getMessage()]
+    assert relevant, "expected loud ERROR when HF mirror is fully stale"
+    # The message should report the count of dropped stale rows.
+    assert any("dropped 3" in r.getMessage() for r in relevant)
+
+
+def test_hf_fallback_still_returns_data_when_some_rows_are_in_window(monkeypatch, caplog):
+    """Mixed-age mirror: return only the in-window rows and never log the doomsday ERROR."""
+    from services.fetchers import earth_observation as eo
+
+    csv_text = (
+        "Sighting,Occurred,Location,Shape,Duration,Posted,Summary\n"
+        '1,2026-04-15 21:00:00 Local,"Denver, CO, USA",Triangle,5 minutes,2026-04-16,"Fresh"\n'
+        '2,2026-04-10 21:00:00 Local,"Seattle, WA, USA",Light,30 seconds,2026-04-10,"Also fresh"\n'
+        '3,2020-01-15 20:00:00 Local,"Phoenix, AZ, USA",Disk,2 minutes,2020-01-16,"Ancient"\n'
+    )
+
+    monkeypatch.setattr(eo, "datetime", _FixedDateTime)
+    monkeypatch.setattr(eo, "fetch_with_curl", lambda *a, **kw: _StubResponse(csv_text))
+    monkeypatch.setattr(eo, "_load_nuforc_location_cache", _stub_geocode_cache)
+    monkeypatch.setattr(eo, "_save_nuforc_location_cache", lambda cache: None)
+    monkeypatch.setattr(eo, "_geocode_uap_location", lambda *a, **kw: None)
+
+    sightings = eo._build_uap_sightings_from_hf_mirror()
+
+    ids = sorted(s["id"] for s in sightings)
+    assert ids == ["NUFORC-1", "NUFORC-2"], f"only in-window rows should appear: got {ids}"
+    assert not any("HF fallback yielded 0 rows" in r.getMessage() for r in caplog.records)
+
+
+def test_fetch_uap_sightings_marks_failure_when_both_paths_empty(monkeypatch, caplog):
+    """When the live path raises AND the HF fallback returns empty,
+    ``fetch_uap_sightings`` must:
+      * NOT mark the layer fresh (pre-fix bug: it did, so the layer
+        showed as healthy-but-empty for days)
+      * call ``assert_canary("uap_sightings", 0)`` so the health
+        registry surfaces the broken layer
+      * log an ERROR with the live-path exception for debugging
+    """
+    from services.fetchers import earth_observation as eo
+    from services.fetchers import _store
+
+    monkeypatch.setattr(_store, "is_any_active", lambda layer: True)
+    monkeypatch.setattr(eo, "_load_nuforc_sightings_cache", lambda force_refresh=False: None)
+
+    def _boom():
+        raise RuntimeError("NUFORC live: zero rows pulled across 3 months")
+
+    monkeypatch.setattr(eo, "_build_recent_uap_sightings", _boom)
+    monkeypatch.setattr(eo, "_build_uap_sightings_from_hf_mirror", lambda: [])
+
+    marked: list[str] = []
+    monkeypatch.setattr(eo, "_mark_fresh", lambda *keys: marked.extend(keys))
+
+    canary_calls: list[tuple[str, int]] = []
+    import services.slo as slo
+    monkeypatch.setattr(
+        slo, "assert_canary", lambda key, value: canary_calls.append((key, int(value)))
+    )
+
+    with caplog.at_level(logging.ERROR, logger="services.fetchers.earth_observation"):
+        eo.fetch_uap_sightings()
+
+    assert marked == [], "broken layer must NOT be marked fresh"
+    assert canary_calls == [("uap_sightings", 0)], (
+        f"expected canary trip when both paths fail; got {canary_calls}"
+    )
+    # The ERROR must carry the live-path exception text so the operator can
+    # tell live failed AND the fallback was empty (not the other way around).
+    assert any(
+        "both live NUFORC and HF fallback" in r.getMessage()
+        and "zero rows pulled across 3 months" in r.getMessage()
+        for r in caplog.records
+    )
+
+
+def test_fetch_uap_sightings_succeeds_when_fallback_returns_data(monkeypatch):
+    """Positive path: live fails, fallback returns rows. The layer is
+    populated and marked fresh; assert_canary is NOT tripped (we only
+    trip the canary when the layer has zero data)."""
+    from services.fetchers import earth_observation as eo
+    from services.fetchers import _store
+
+    monkeypatch.setattr(_store, "is_any_active", lambda layer: True)
+    monkeypatch.setattr(eo, "_load_nuforc_sightings_cache", lambda force_refresh=False: None)
+    monkeypatch.setattr(
+        eo, "_build_recent_uap_sightings", lambda: (_ for _ in ()).throw(RuntimeError("live down"))
+    )
+
+    fallback_rows = [{"id": "NUFORC-fb-1", "date_time": "2026-04-20", "lat": 0.0, "lng": 0.0}]
+    monkeypatch.setattr(eo, "_build_uap_sightings_from_hf_mirror", lambda: fallback_rows)
+    monkeypatch.setattr(eo, "_save_nuforc_sightings_cache", lambda s: None)
+
+    marked: list[str] = []
+    monkeypatch.setattr(eo, "_mark_fresh", lambda *keys: marked.extend(keys))
+
+    canary_calls: list[tuple[str, int]] = []
+    import services.slo as slo
+    monkeypatch.setattr(
+        slo, "assert_canary", lambda key, value: canary_calls.append((key, int(value)))
+    )
+
+    eo.fetch_uap_sightings()
+
+    assert marked == ["uap_sightings"]
+    assert canary_calls == [], "canary should not trip when fallback supplies data"
+
+
+def test_uap_scheduler_runs_weekly_not_daily():
+    """The cron job for the UAP layer must be configured for Mondays at
+    12:00 UTC, not daily. Daily was the pre-fix default; weekly matches
+    the layer's stated cadence (a rolling 60-day digest) and keeps load
+    on nuforc.org light."""
+    from services import data_fetcher
+
+    src = data_fetcher.__file__
+    with open(src, "r", encoding="utf-8") as f:
+        text = f.read()
+
+    # Anchor on the scheduler block by id, then assert the cron triggers.
+    assert "uap_sightings_weekly" in text, (
+        "scheduler id should be uap_sightings_weekly (was uap_sightings_daily pre-fix)"
+    )
+    # The day_of_week directive is the difference between daily and weekly.
+    # If somebody flips it back to daily, this fires.
+    weekly_block = text.split("uap_sightings_weekly", 1)[0]
+    # Walk backwards for the matching add_job call.
+    add_job_idx = weekly_block.rfind("add_job(")
+    assert add_job_idx >= 0, "could not locate add_job block for UAP scheduler"
+    job_block = text[add_job_idx : text.find(")", text.index("uap_sightings_weekly")) + 1]
+    assert 'day_of_week="mon"' in job_block, (
+        f"expected day_of_week='mon' in UAP scheduler block:\n{job_block}"
+    )