diff --git a/backend/.env.example b/backend/.env.example index 578c1b8..e63cb30 100644 --- a/backend/.env.example +++ b/backend/.env.example @@ -69,7 +69,11 @@ AIS_API_KEY= # https://aisstream.io/ — free tier WebSocket key # the unauthenticated yfinance fallback to call Yahoo Finance. # FINANCIAL_ENABLED=false # -# NUFORC UAP sightings — huggingface.co dataset download. +# NUFORC UAP map layer — live scrape from nuforc.org (rolling window, default 60 days). +# NUFORC_RECENT_DAYS=60 +# On Windows, live scrape uses Python requests by default; optional: +# SHADOWBROKER_ENABLE_WINDOWS_CURL_FALLBACK=true +# NUFORC enrichment index (HF dataset) is separate — opt-in only: # NUFORC_ENABLED=false # # News RSS aggregator — defaults ON. Set to "false" to disable all diff --git a/backend/services/data_fetcher.py b/backend/services/data_fetcher.py index 03c213d..1c74c68 100644 --- a/backend/services/data_fetcher.py +++ b/backend/services/data_fetcher.py @@ -993,19 +993,17 @@ def start_scheduler(): misfire_grace_time=600, ) - # UAP sightings (NUFORC) — weekly on Mondays at 12:00 UTC. The layer is a - # rolling last-60-days digest; refreshing once a week is enough cadence - # for human-readable map exploration and keeps load on nuforc.org light. + # UAP sightings (NUFORC) — daily at 12:00 UTC. Rolling ~60-day window from + # live nuforc.org; disk cache is re-validated on every read. 
_scheduler.add_job( lambda: _run_task_with_health( lambda: fetch_uap_sightings(force_refresh=True), "fetch_uap_sightings", ), "cron", - day_of_week="mon", hour=12, minute=0, - id="uap_sightings_weekly", + id="uap_sightings_daily", max_instances=1, misfire_grace_time=3600, ) diff --git a/backend/services/fetchers/earth_observation.py b/backend/services/fetchers/earth_observation.py index 94c6fad..5495474 100644 --- a/backend/services/fetchers/earth_observation.py +++ b/backend/services/fetchers/earth_observation.py @@ -692,7 +692,8 @@ _NUFORC_TILESET = "nuforc.cmm18aqea06bu1mmselhpnano-0ce5v" _NUFORC_TOKEN = os.environ.get("NUFORC_MAPBOX_TOKEN", "").strip() _NUFORC_RADIUS_M = 200_000 # 200 km query radius _NUFORC_LIMIT = 50 # max features per tilequery call -_NUFORC_RECENT_DAYS = int(os.environ.get("NUFORC_RECENT_DAYS", "60")) +# Rolling window shown on the map (~2 calendar months). Override via NUFORC_RECENT_DAYS. +_NUFORC_RECENT_DAYS = max(1, int(os.environ.get("NUFORC_RECENT_DAYS", "60"))) _NUFORC_HF_FALLBACK_LIMIT = max(25, int(os.environ.get("NUFORC_HF_FALLBACK_LIMIT", "250"))) _NUFORC_HF_GEOCODE_LIMIT = max(25, int(os.environ.get("NUFORC_HF_GEOCODE_LIMIT", "150"))) _NUFORC_GEOCODE_WORKERS = max(1, int(os.environ.get("NUFORC_GEOCODE_WORKERS", "1"))) @@ -766,6 +767,35 @@ def _fetch_nuforc_tilequery(lng: float, lat: float) -> list[dict]: return [] +def _uap_cutoff_date_str() -> str: + return (datetime.utcnow() - timedelta(days=_NUFORC_RECENT_DAYS)).strftime("%Y-%m-%d") + + +def _uap_sighting_date_str(sighting: dict) -> str | None: + """Normalize a sighting row to YYYY-MM-DD for window filtering.""" + from services.fetchers.nuforc_enrichment import _parse_date + + raw = str(sighting.get("date_time") or sighting.get("occurred") or "").strip() + if not raw: + return None + parsed = _parse_date(raw) + if parsed: + return parsed + if len(raw) >= 10 and raw[4] == "-" and raw[7] == "-": + return raw[:10] + return None + + +def _filter_uap_sightings_recent(sightings: 
list[dict]) -> list[dict]: + """Drop anything outside the rolling NUFORC_RECENT_DAYS window.""" + cutoff = _uap_cutoff_date_str() + return [ + sighting + for sighting in sightings + if (_uap_sighting_date_str(sighting) or "") >= cutoff + ] + + def _parse_nuforc_tile_date(value: str) -> datetime | None: raw = str(value or "").strip() if not raw: @@ -804,17 +834,39 @@ def _load_nuforc_sightings_cache(*, force_refresh: bool = False) -> list[dict] | return None if (datetime.utcnow() - built_dt).total_seconds() > 86400: return None + if raw.get("cutoff_days") != _NUFORC_RECENT_DAYS: + logger.info( + "UAP sightings: cache cutoff_days mismatch (%s != %s); rebuilding", + raw.get("cutoff_days"), + _NUFORC_RECENT_DAYS, + ) + return None sightings = raw.get("sightings") if isinstance(sightings, list): if len(sightings) <= 0: logger.info("UAP sightings: cache is fresh but empty; rebuilding") return None + filtered = _filter_uap_sightings_recent(sightings) + if not filtered: + logger.warning( + "UAP sightings: cache had %d rows but none within last %d days; rebuilding", + len(sightings), + _NUFORC_RECENT_DAYS, + ) + return None + if len(filtered) < len(sightings): + logger.info( + "UAP sightings: dropped %d stale cached rows outside %d-day window", + len(sightings) - len(filtered), + _NUFORC_RECENT_DAYS, + ) logger.info( - "UAP sightings: loaded %d cached reports from %s", - len(sightings), + "UAP sightings: loaded %d cached reports from %s (within %d-day window)", + len(filtered), built, + _NUFORC_RECENT_DAYS, ) - return sightings + return filtered except Exception as e: logger.warning("UAP sightings: cache load error: %s", e) return None @@ -828,6 +880,7 @@ def _save_nuforc_sightings_cache(sightings: list[dict]) -> None: _NUFORC_DATA_DIR.mkdir(parents=True, exist_ok=True) payload = { "built": datetime.utcnow().isoformat(), + "cutoff_days": _NUFORC_RECENT_DAYS, "count": len(sightings), "sightings": sightings, } @@ -1035,27 +1088,128 @@ def _nuforc_months_for_window(days: int) 
-> list[str]: return months -def _nuforc_fetch_month_live(yyyymm: str, cookie_jar: Path) -> list[dict]: - """Pull one month of NUFORC sightings via the live wpDataTables AJAX. - - Returns a list of raw row dicts with the fields we care about: - id, occurred (YYYY-MM-DD), posted (YYYY-MM-DD), city, state, country, - shape_raw, summary, explanation. Empty list on any failure — caller - decides whether a failure is fatal. - """ +def _parse_nuforc_live_datatables_rows(raw_rows: list) -> list[dict]: + """Parse wpDataTables ``data`` array into normalized row dicts.""" from services.fetchers.nuforc_enrichment import _parse_date - curl_bin = shutil.which("curl") or "curl" + out: list[dict] = [] + for raw in raw_rows: + if not isinstance(raw, list) or len(raw) < 8: + continue + link_html = str(raw[0] or "") + occurred_raw = str(raw[1] or "") + city = str(raw[2] or "").strip() + state = str(raw[3] or "").strip() + country = str(raw[4] or "").strip() + shape_raw = (str(raw[5] or "").strip() or "Unknown") + summary = str(raw[6] or "").strip() + reported_raw = str(raw[7] or "") + explanation = str(raw[9] or "").strip() if len(raw) > 9 and raw[9] else "" + + occurred_ymd = _parse_date(occurred_raw) + if not occurred_ymd: + continue + if not city and not state and not country: + continue + + id_match = _NUFORC_LIVE_SIGHTING_ID_RE.search(link_html) + if id_match: + sighting_id = f"NUFORC-{id_match.group(1)}" + else: + digest = hashlib.sha1( + f"{occurred_ymd}|{city}|{state}|{summary}".encode("utf-8", "ignore") + ).hexdigest()[:12] + sighting_id = f"NUFORC-{digest}" + + if summary and len(summary) > 280: + summary = summary[:277] + "..." 
+ if not summary: + summary = "Sighting reported" + + out.append({ + "id": sighting_id, + "occurred": occurred_ymd, + "posted": _parse_date(reported_raw) or occurred_ymd, + "city": city, + "state": state, + "country": country, + "shape_raw": shape_raw, + "summary": summary, + "explanation": explanation, + }) + return out + + +def _nuforc_fetch_month_live_requests(yyyymm: str) -> list[dict]: + """Live NUFORC month fetch via requests (Windows-safe when curl is disabled).""" + import requests + index_url = _NUFORC_LIVE_INDEX_URL.format(yyyymm=yyyymm) ajax_url = _NUFORC_LIVE_AJAX_URL.format(yyyymm=yyyymm) - - if not external_curl_fallback_enabled(): + headers = {"User-Agent": _nuforc_live_user_agent()} + session = requests.Session() + session.headers.update(headers) + try: + index_res = session.get(index_url, timeout=60) + except requests.RequestException as e: + logger.warning("NUFORC live (requests): index fetch failed for %s: %s", yyyymm, e) + return [] + if index_res.status_code != 200 or not index_res.text: logger.warning( - "NUFORC live: external curl disabled on Windows for %s; " - "set SHADOWBROKER_ENABLE_WINDOWS_CURL_FALLBACK=1 to opt in.", + "NUFORC live (requests): index HTTP %s for %s", + index_res.status_code, yyyymm, ) return [] + nonce_match = _NUFORC_LIVE_NONCE_RE.search(index_res.text) + if not nonce_match: + logger.warning("NUFORC live (requests): wdtNonce not found for %s", yyyymm) + return [] + nonce = nonce_match.group(1) + post_data = ( + "draw=1" + "&columns%5B0%5D%5Bdata%5D=0&columns%5B0%5D%5Bsearchable%5D=true&columns%5B0%5D%5Borderable%5D=false" + "&columns%5B1%5D%5Bdata%5D=1&columns%5B1%5D%5Bsearchable%5D=true&columns%5B1%5D%5Borderable%5D=true" + "&order%5B0%5D%5Bcolumn%5D=1&order%5B0%5D%5Bdir%5D=desc" + "&start=0&length=-1" + "&search%5Bvalue%5D=&search%5Bregex%5D=false" + f"&wdtNonce={nonce}" + ) + try: + ajax_res = session.post( + ajax_url, + data=post_data, + headers={ + **headers, + "Referer": index_url, + "X-Requested-With": 
"XMLHttpRequest", + "Content-Type": "application/x-www-form-urlencoded", + }, + timeout=120, + ) + except requests.RequestException as e: + logger.warning("NUFORC live (requests): ajax failed for %s: %s", yyyymm, e) + return [] + if ajax_res.status_code != 200 or not ajax_res.text: + logger.warning( + "NUFORC live (requests): ajax HTTP %s for %s", + ajax_res.status_code, + yyyymm, + ) + return [] + try: + payload = ajax_res.json() + except json.JSONDecodeError as e: + logger.warning("NUFORC live (requests): ajax JSON decode failed for %s: %s", yyyymm, e) + return [] + return _parse_nuforc_live_datatables_rows(payload.get("data") or []) + + +def _nuforc_fetch_month_live_curl(yyyymm: str, cookie_jar: Path) -> list[dict]: + """Pull one month of NUFORC sightings via curl + wpDataTables AJAX.""" + curl_bin = shutil.which("curl") or "curl" + index_url = _NUFORC_LIVE_INDEX_URL.format(yyyymm=yyyymm) + ajax_url = _NUFORC_LIVE_AJAX_URL.format(yyyymm=yyyymm) # Step 1: GET the month index to capture session cookies + fresh nonce. 
try: @@ -1125,65 +1279,27 @@ def _nuforc_fetch_month_live(yyyymm: str, cookie_jar: Path) -> list[dict]: logger.warning("NUFORC live: ajax JSON decode failed for %s: %s", yyyymm, e) return [] - raw_rows = payload.get("data") or [] - out: list[dict] = [] - for raw in raw_rows: - if not isinstance(raw, list) or len(raw) < 8: - continue - link_html = str(raw[0] or "") - occurred_raw = str(raw[1] or "") - city = str(raw[2] or "").strip() - state = str(raw[3] or "").strip() - country = str(raw[4] or "").strip() - shape_raw = (str(raw[5] or "").strip() or "Unknown") - summary = str(raw[6] or "").strip() - reported_raw = str(raw[7] or "") - explanation = str(raw[9] or "").strip() if len(raw) > 9 and raw[9] else "" + return _parse_nuforc_live_datatables_rows(payload.get("data") or []) - occurred_ymd = _parse_date(occurred_raw) - if not occurred_ymd: - continue - if not city and not state and not country: - continue - id_match = _NUFORC_LIVE_SIGHTING_ID_RE.search(link_html) - if id_match: - sighting_id = f"NUFORC-{id_match.group(1)}" - else: - digest = hashlib.sha1( - f"{occurred_ymd}|{city}|{state}|{summary}".encode("utf-8", "ignore") - ).hexdigest()[:12] - sighting_id = f"NUFORC-{digest}" - - if summary and len(summary) > 280: - summary = summary[:277] + "..." 
def _nuforc_fetch_month_live(yyyymm: str, cookie_jar: Path) -> list[dict]:
    """Fetch one month of NUFORC sightings from the live wpDataTables endpoint.

    When the external curl fallback is enabled, the curl-based fetcher runs
    first and its rows are returned if it produced any. In every other case
    (curl disabled, or curl returned nothing) the requests-based fetcher is
    used.
    """
    curl_rows = (
        _nuforc_fetch_month_live_curl(yyyymm, cookie_jar)
        if external_curl_fallback_enabled()
        else None
    )
    # An empty or missing curl result falls through to the requests path.
    return curl_rows or _nuforc_fetch_month_live_requests(yyyymm)
def test_uap_scheduler_runs_daily():
    """UAP layer refreshes daily so the rolling ~60-day window stays current."""
    from services import data_fetcher

    with open(data_fetcher.__file__, "r", encoding="utf-8") as f:
        text = f.read()

    assert "uap_sightings_daily" in text
    idx = text.index("uap_sightings_daily")
    # Anchor on the add_job( call that owns this id. A fixed-size look-back
    # can spill into the previous job's kwargs and fail for the wrong reason.
    start = text.rfind("add_job(", 0, idx)
    assert start >= 0, "could not locate add_job block for UAP scheduler"
    block = text[start : idx + 80]
    # Any day_of_week restriction (not only "mon") would make the job non-daily.
    assert "day_of_week" not in block, (
        f"UAP scheduler must not restrict day_of_week:\n{block}"
    )


def test_uap_cache_rejects_stale_rows_on_load(tmp_path, monkeypatch):
    """Disk cache must not resurrect sightings outside the rolling window."""
    from datetime import timedelta

    from services.fetchers import earth_observation as eo

    window_days = 60
    cache_file = tmp_path / "nuforc_recent_sightings.json"
    monkeypatch.setattr(eo, "_NUFORC_SIGHTINGS_CACHE_FILE", cache_file)
    monkeypatch.setattr(eo, "datetime", _FixedDateTime)
    # Pin the window so a NUFORC_RECENT_DAYS override in the environment
    # cannot trigger the loader's cutoff_days-mismatch rebuild path.
    monkeypatch.setattr(eo, "_NUFORC_RECENT_DAYS", window_days)

    # Derive both dates from the fixed clock so the test keeps its meaning if
    # the fixture's "now" is ever moved.
    # NOTE(review): assumes _FixedDateTime.utcnow() returns a datetime
    # instance (it is already used with .isoformat() below) — confirm.
    now = _FixedDateTime.utcnow()
    stale_day = (now - timedelta(days=window_days + 365)).strftime("%Y-%m-%d")
    fresh_day = (now - timedelta(days=5)).strftime("%Y-%m-%d")

    cache_file.write_text(
        json.dumps({
            "built": now.isoformat(),
            "cutoff_days": window_days,
            "sightings": [
                {"id": "NUFORC-old", "date_time": stale_day, "lat": 39.0, "lng": -105.0},
                {"id": "NUFORC-new", "date_time": fresh_day, "lat": 40.0, "lng": -104.0},
            ],
        }),
        encoding="utf-8",
    )

    loaded = eo._load_nuforc_sightings_cache()
    assert loaded is not None
    assert [s["id"] for s in loaded] == ["NUFORC-new"]