diff --git a/backend/services/data_fetcher.py b/backend/services/data_fetcher.py index 4026b8f..03c213d 100644 --- a/backend/services/data_fetcher.py +++ b/backend/services/data_fetcher.py @@ -777,6 +777,19 @@ def start_scheduler(): misfire_grace_time=60, ) + # Flight observation pruning — drops icao24 → first_seen_at entries we + # haven't seen in an hour. Same cadence as AIS prune for symmetry; the + # per-tick scan is O(in-flight aircraft) so it's cheap. + from services.fetchers.flight_observations import prune as _prune_flight_observations + _scheduler.add_job( + lambda: _run_task_with_health(_prune_flight_observations, "prune_flight_observations"), + "interval", + minutes=5, + id="flight_observation_prune", + max_instances=1, + misfire_grace_time=60, + ) + # AISHub REST fallback — slow polling when the AISStream WebSocket # primary is offline. Configurable interval via # AISHUB_POLL_INTERVAL_MINUTES env (default 20 min). Operator must diff --git a/backend/services/fetchers/flight_observations.py b/backend/services/fetchers/flight_observations.py new file mode 100644 index 0000000..ac87b8a --- /dev/null +++ b/backend/services/fetchers/flight_observations.py @@ -0,0 +1,148 @@ +"""Per-aircraft observation tracking for cumulative fuel/CO2 estimates. + +Background +---------- +The pre-existing emissions enrichment attached a *rate* to each flight +(GPH and kg/hr) based on aircraft model. Users — reasonably — wanted the +running total: how much fuel HAS this plane burned since we started +seeing it? Multiplying the rate by elapsed observation time gets us +there, but it requires somewhere to remember "when did this icao24 +first appear on our radar?" + +Why this lives outside ``flight_trails`` +---------------------------------------- +``flight_trails`` is sized and pruned aggressively for map rendering +(5-minute TTL for untracked aircraft, 200 trail points max). That's +wrong for cumulative burn: if a plane has been airborne 2 hours but +its trail was pruned 30 min in, the "first trail point" timestamp is +30 min ago, not 2h ago. Worse, when the trail expires and re-creates, +the cumulative counter would reset mid-flight. + +This module tracks observation lifecycle separately: + +* When a hex is first observed: start a new flight session. +* While observed regularly (gap < ``REOPEN_GAP_S``): keep accumulating. +* When unseen for longer than ``REOPEN_GAP_S``: treat next sighting as + a new session (the plane landed and took off again, or it's a + different leg). Reset ``first_seen_at``. +* Stale sessions are pruned every ``PRUNE_INTERVAL_S`` so memory stays + bounded. + +The user explicitly asked for this counting semantic: "as soon as a +plane appears there should be a counter that keeps a running count of +the fuel being burned... If there is no estimate take off time then it +can just be from the time the server starts to keep a log of whats in +the air." +""" + +from __future__ import annotations + +import threading +import time + + +# Gap between sightings that resets the session. ADS-B refreshes the +# whole aircraft list every minute or two, so anything over a few +# minutes means the plane left our coverage window (landed, transit +# through dead zone, etc). 15 minutes is conservative. +REOPEN_GAP_S = 15 * 60 + +# Don't accumulate runaway memory: drop entries unseen for an hour. +PRUNE_AFTER_S = 60 * 60 + +# Cap on accumulated airtime per session so a single bug elsewhere +# (e.g. ts clock skew) can't produce comically large numbers. +MAX_SESSION_SECONDS = 24 * 3600 # 24h — longest realistic civilian leg + + +_observations: dict[str, dict[str, float]] = {} +_lock = threading.Lock() +_last_prune_at = 0.0 + + +def record_observation(icao_hex: str, *, now: float | None = None) -> int: + """Record a sighting of ``icao_hex`` and return airtime so far (seconds). + + Returns 0 for the first-ever sighting (no elapsed time yet) or when + ``icao_hex`` is falsy. The caller can multiply the returned seconds + by ``rate_per_hour / 3600`` to get cumulative consumption. + """ + if not icao_hex: + return 0 + key = str(icao_hex).strip().lower() + if not key: + return 0 + current = float(now if now is not None else time.time()) + + with _lock: + entry = _observations.get(key) + if entry is None: + _observations[key] = {"first_seen_at": current, "last_seen_at": current} + return 0 + # Use explicit ``is None`` checks instead of ``or`` short-circuit: + # ``0.0`` is a legitimate timestamp value (e.g. test fixtures + # seeding a far-past first_seen_at to exercise the clamp) but + # ``0.0 or fallback`` collapses to ``fallback`` because 0.0 is + # falsy. Bit me on my own test — leaving the safer form here. + last_raw = entry.get("last_seen_at") + last_seen = float(last_raw) if last_raw is not None else current + gap = current - last_seen + if gap > REOPEN_GAP_S: + # Treat as a new flight session — the plane landed/disappeared + # long enough that the prior cumulative count is no longer + # the same flight. + _observations[key] = {"first_seen_at": current, "last_seen_at": current} + return 0 + first_raw = entry.get("first_seen_at") + first = float(first_raw) if first_raw is not None else current + # Clamp absurd values from clock skew or bad input. + elapsed = max(0, min(int(current - first), MAX_SESSION_SECONDS)) + entry["last_seen_at"] = current + return elapsed + + +def prune(*, now: float | None = None) -> int: + """Drop entries we haven't seen in ``PRUNE_AFTER_S`` seconds. + + Returns number of entries dropped. Safe to call from a scheduler tick; + cheap (single dict scan) so cadence doesn't matter much. + """ + current = float(now if now is not None else time.time()) + dropped = 0 + with _lock: + stale_keys = [] + for k, v in _observations.items(): + last_raw = v.get("last_seen_at") + last = float(last_raw) if last_raw is not None else 0.0 + if current - last > PRUNE_AFTER_S: + stale_keys.append(k) + for k in stale_keys: + del _observations[k] + dropped += 1 + return dropped + + +def get_session_seconds(icao_hex: str, *, now: float | None = None) -> int: + """Read-only accessor: airtime for a known icao without bumping last-seen. + + Used by tests and external consumers (e.g. when rendering a snapshot + of all in-flight aircraft, you want the current value, not to update + last_seen_at as a side effect). + """ + if not icao_hex: + return 0 + key = str(icao_hex).strip().lower() + with _lock: + entry = _observations.get(key) + if entry is None: + return 0 + current = float(now if now is not None else time.time()) + first_raw = entry.get("first_seen_at") + first = float(first_raw) if first_raw is not None else current + return max(0, min(int(current - first), MAX_SESSION_SECONDS)) + + +def _reset_for_tests() -> None: + """Drop all observations. Test helper only.""" + with _lock: + _observations.clear() diff --git a/backend/services/fetchers/flights.py b/backend/services/fetchers/flights.py index cb15f28..eb49ba6 100644 --- a/backend/services/fetchers/flights.py +++ b/backend/services/fetchers/flights.py @@ -17,6 +17,7 @@ from services.network_utils import fetch_with_curl from services.fetchers._store import latest_data, _data_lock, _mark_fresh from services.fetchers.plane_alert import enrich_with_plane_alert, enrich_with_tracked_names from services.fetchers.emissions import get_emissions_info +from services.fetchers.flight_observations import record_observation as _record_flight_observation from services.fetchers.retry import with_retry from services.fetchers.route_database import lookup_route from services.fetchers.aircraft_database import lookup_aircraft_type @@ -601,6 +602,22 @@ def _classify_and_publish(all_adsb_flights): if model: emi = get_emissions_info(model) if emi: + # Cumulative fuel/CO2: multiply the per-hour rate by how + # long we've been observing this airframe. Users want to + # see the *amount* burned, not just the rate. If we've + # never seen this hex before, observed_seconds is 0 and + # the cumulative values are 0 until the next refresh — + # the rate is still useful info on its own. + observed_seconds = _record_flight_observation( + f.get("icao24") or "" + ) + elapsed_h = observed_seconds / 3600.0 + emi = { + **emi, + "observed_seconds": observed_seconds, + "fuel_gallons_burned": round(emi["fuel_gph"] * elapsed_h, 1), + "co2_kg_emitted": round(emi["co2_kg_per_hour"] * elapsed_h, 1), + } f["emissions"] = emi callsign = f.get("callsign", "").strip().upper() diff --git a/backend/services/fetchers/military.py b/backend/services/fetchers/military.py index 6ec8b68..5898308 100644 --- a/backend/services/fetchers/military.py +++ b/backend/services/fetchers/military.py @@ -7,6 +7,7 @@ import requests from services.network_utils import fetch_with_curl from services.fetchers._store import latest_data, _data_lock, _mark_fresh from services.fetchers.emissions import get_emissions_info +from services.fetchers.flight_observations import record_observation as _record_flight_observation from services.fetchers.plane_alert import enrich_with_plane_alert logger = logging.getLogger("services.data_fetcher") @@ -300,6 +301,18 @@ def fetch_military_flights(): if model: emissions = get_emissions_info(model) if emissions: + # Cumulative fuel/CO2 since first observation — mirrors + # the civilian path in flights._classify_and_publish. + observed_seconds = _record_flight_observation( + mf.get("icao24") or "" + ) + elapsed_h = observed_seconds / 3600.0 + emissions = { + **emissions, + "observed_seconds": observed_seconds, + "fuel_gallons_burned": round(emissions["fuel_gph"] * elapsed_h, 1), + "co2_kg_emitted": round(emissions["co2_kg_per_hour"] * elapsed_h, 1), + } mf["emissions"] = emissions if mf.get("alert_category"): mf["type"] = "tracked_flight" diff --git a/backend/tests/test_flight_observations.py b/backend/tests/test_flight_observations.py new file mode 100644 index 0000000..e538c8d --- /dev/null +++ b/backend/tests/test_flight_observations.py @@ -0,0 +1,258 @@ +"""Cumulative fuel/CO2 tracking via per-aircraft observation timestamps. + +Background +---------- +Users want the running total of fuel burned per aircraft — not just the +rate. We track first-seen-at per icao24 and multiply elapsed observation +time by the model-based rate. This module's job is exclusively the +timestamp bookkeeping; multiplication happens in the flights/military +fetchers. + +These tests pin: + + * First sighting returns 0 (no airtime yet). + * Repeated sightings within ``REOPEN_GAP_S`` accumulate elapsed time. + * Gap longer than ``REOPEN_GAP_S`` resets the session (plane landed + and took off again — different flight). + * ``MAX_SESSION_SECONDS`` clamp protects against clock skew bugs. + * ``prune()`` drops stale entries. + * ``get_session_seconds`` reads without bumping last_seen. + * Empty / None icao input is a defensive no-op. +""" + +from __future__ import annotations + +import pytest + + +@pytest.fixture(autouse=True) +def _reset_observations(): + from services.fetchers import flight_observations as obs + obs._reset_for_tests() + yield + obs._reset_for_tests() + + +class TestRecordObservation: + def test_first_sighting_returns_zero(self): + from services.fetchers.flight_observations import record_observation + assert record_observation("a12345", now=1000.0) == 0 + + def test_repeated_sightings_accumulate(self): + """ADS-B refreshes every ~minute in practice, so each observation + is within ``REOPEN_GAP_S`` (15 min) of the last and we keep + accumulating. Walking the timestamps in 5-minute steps so we + stay inside the reopen window the whole way.""" + from services.fetchers.flight_observations import record_observation + record_observation("a12345", now=1000.0) + # 1 minute later (within REOPEN_GAP_S) + assert record_observation("a12345", now=1060.0) == 60 + # Step through 5-minute spaced refreshes — first_seen_at stays + # at 1000.0 the whole time, and we approach a 1-hour airtime. + assert record_observation("a12345", now=1360.0) == 360 + assert record_observation("a12345", now=1660.0) == 660 + assert record_observation("a12345", now=1960.0) == 960 + assert record_observation("a12345", now=2260.0) == 1260 + assert record_observation("a12345", now=2560.0) == 1560 + assert record_observation("a12345", now=2860.0) == 1860 + assert record_observation("a12345", now=3160.0) == 2160 + assert record_observation("a12345", now=3460.0) == 2460 + assert record_observation("a12345", now=3760.0) == 2760 + assert record_observation("a12345", now=4060.0) == 3060 + assert record_observation("a12345", now=4360.0) == 3360 + # 1 hour after first sighting — still inside the 15-min reopen + # window from the prior 4360 observation. + assert record_observation("a12345", now=4600.0) == 3600 + + def test_gap_longer_than_reopen_resets_session(self): + """If a hex hasn't been seen in ``REOPEN_GAP_S`` (15 min default), + the next sighting is treated as a new flight — first_seen_at resets.""" + from services.fetchers.flight_observations import record_observation + record_observation("a12345", now=1000.0) + record_observation("a12345", now=1500.0) # 500s later — within gap + # Now 20 minutes of silence (1200s > 900s threshold) → session reset. + assert record_observation("a12345", now=2700.0) == 0 + # And the next quick sighting starts accumulating from 2700 again. + assert record_observation("a12345", now=2760.0) == 60 + + def test_session_clamp(self): + """Clock skew protection: when a hex has been continuously + observed for longer than ``MAX_SESSION_SECONDS``, clamp. + + Synthesizes the state directly because driving 86,400+ seconds of + observations through the public API in a test would take 1000+ + REOPEN_GAP_S-respecting steps. + """ + from services.fetchers import flight_observations as obs + from services.fetchers.flight_observations import _observations, _lock + + # last_seen_at very recent so REOPEN_GAP_S branch does NOT fire, + # but first_seen_at way in the past so the elapsed math overflows + # MAX_SESSION_SECONDS. Clamp must kick in. + big_now = float(obs.MAX_SESSION_SECONDS + 1_000_000) + with _lock: + _observations["a12345"] = { + "first_seen_at": 0.0, + "last_seen_at": big_now - 60, # 60s ago — well inside gap window + } + elapsed = obs.record_observation("a12345", now=big_now) + assert elapsed == obs.MAX_SESSION_SECONDS, ( + f"elapsed must be clamped to MAX_SESSION_SECONDS; got {elapsed}" + ) + + def test_empty_input_returns_zero(self): + from services.fetchers.flight_observations import record_observation + assert record_observation("") == 0 + assert record_observation(None) == 0 # type: ignore[arg-type] + assert record_observation(" ") == 0 + + def test_case_insensitive_key(self): + """ICAO24 hex codes are case-insensitive — adsb.lol lowercases + them, OpenSky may not. Normalize so both refer to the same airframe.""" + from services.fetchers.flight_observations import record_observation + record_observation("A12345", now=1000.0) + # Different case must hit the same entry. + assert record_observation("a12345", now=1060.0) == 60 + + +class TestGetSessionSeconds: + def test_read_only_does_not_bump(self): + from services.fetchers.flight_observations import ( + record_observation, + get_session_seconds, + ) + record_observation("a12345", now=1000.0) + record_observation("a12345", now=1060.0) # bumps last_seen + + # Now read at t=2000. Without bumping, gap=2000-1060=940 > 900, + # so a recording call would reset. But the read should NOT reset. + seconds_at_2000 = get_session_seconds("a12345", now=2000.0) + assert seconds_at_2000 == 1000, ( + f"read should return 2000-1000=1000s; got {seconds_at_2000}" + ) + # Verify the next recording at t=2001 still resets (gap > 900s + # from the read above — proves the read didn't bump last_seen). + from services.fetchers.flight_observations import record_observation as rec + assert rec("a12345", now=2001.0) == 0 # session reset + + def test_unknown_hex_returns_zero(self): + from services.fetchers.flight_observations import get_session_seconds + assert get_session_seconds("nonexistent") == 0 + + +class TestPrune: + def test_drops_stale_entries(self): + from services.fetchers import flight_observations as obs + + obs.record_observation("active", now=10_000.0) + obs.record_observation("stale", now=1.0) + + dropped = obs.prune(now=10_000.0) + + assert dropped == 1 + # Active entry survives: + assert obs.get_session_seconds("active", now=10_001.0) == 1 + # Stale entry was dropped — next obs starts fresh: + assert obs.record_observation("stale", now=10_002.0) == 0 + + def test_no_op_when_nothing_stale(self): + from services.fetchers import flight_observations as obs + obs.record_observation("hex1", now=1000.0) + obs.record_observation("hex2", now=1000.0) + + dropped = obs.prune(now=1500.0) + + assert dropped == 0 + + +# --------------------------------------------------------------------------- +# Integration: emissions enrichment in _classify_and_publish honors the +# cumulative tracker. +# --------------------------------------------------------------------------- + + +class TestEmissionsCumulativeIntegration: + def _reset_store(self): + from services.fetchers._store import latest_data, _data_lock + with _data_lock: + for key in ( + "flights", "commercial_flights", "private_flights", + "private_jets", "military_flights", "tracked_flights", + ): + latest_data[key] = [] + + def test_first_publish_zero_cumulative(self, monkeypatch): + """On the first observation, cumulative values are 0 — but the + rate fields and observed_seconds are still present in the dict.""" + from services.fetchers import flights as flights_module + from services.fetchers._store import latest_data, _data_lock + + self._reset_store() + monkeypatch.setattr(flights_module, "lookup_route", lambda _: None) + monkeypatch.setattr(flights_module, "lookup_aircraft_type", lambda _: "") + + flights_module._classify_and_publish([ + { + "hex": "test001", + "flight": "JBU711", + "r": "N1", + "t": "C172", # Cessna 172, 9 GPH + "lat": 40.0, + "lon": -100.0, + "alt_baro": 3000, + "gs": 100, + } + ]) + + with _data_lock: + published = list(latest_data.get("flights", [])) + assert len(published) == 1 + emi = published[0].get("emissions") + assert emi is not None + assert emi["fuel_gph"] == 9 + assert emi["observed_seconds"] == 0 + assert emi["fuel_gallons_burned"] == 0.0 + assert emi["co2_kg_emitted"] == 0.0 + + def test_second_publish_accumulates(self, monkeypatch): + """Publishing the same hex a second time picks up real elapsed time + and produces non-zero cumulative values.""" + import time as _time_real + from services.fetchers import flights as flights_module + from services.fetchers import flight_observations as obs + from services.fetchers._store import latest_data, _data_lock + + self._reset_store() + monkeypatch.setattr(flights_module, "lookup_route", lambda _: None) + monkeypatch.setattr(flights_module, "lookup_aircraft_type", lambda _: "") + + # Manually seed an observation 1 hour in the past so the next + # publish picks up ~3600s elapsed. + with obs._lock: + obs._observations["test002"] = { + "first_seen_at": _time_real.time() - 3600, + "last_seen_at": _time_real.time() - 60, + } + + flights_module._classify_and_publish([ + { + "hex": "test002", + "flight": "JBU711", + "r": "N1", + "t": "C172", # 9 GPH + "lat": 40.0, + "lon": -100.0, + "alt_baro": 3000, + "gs": 100, + } + ]) + + with _data_lock: + published = list(latest_data.get("flights", [])) + assert len(published) == 1 + emi = published[0].get("emissions") + # Roughly 1 hour observed → 9 gal burned. + assert 3500 <= emi["observed_seconds"] <= 3700 + assert 8.7 <= emi["fuel_gallons_burned"] <= 9.3 + # CO2 = 9 gph * 9.57 kg/gal = 86.1 kg/hr. + assert 84 <= emi["co2_kg_emitted"] <= 88 diff --git a/frontend/src/components/NewsFeed.tsx b/frontend/src/components/NewsFeed.tsx index af63ffb..87f4dc5 100644 --- a/frontend/src/components/NewsFeed.tsx +++ b/frontend/src/components/NewsFeed.tsx @@ -249,34 +249,70 @@ const VESSEL_TYPE_WIKI: Record = { type FlightTrailPoint = { lat?: number; lng?: number; alt?: number; ts?: number } | number[]; +function formatObservedDuration(seconds: number): string { + // Compact "1h 14m" / "23m" / "45s" — matches the density of the rest + // of the flight tooltip. < 60s is shown as "<1m" so the user knows + // we've JUST started observing this hex (cumulative will still be 0). + if (!Number.isFinite(seconds) || seconds <= 0) return '<1m'; + if (seconds < 60) return '<1m'; + const totalMinutes = Math.floor(seconds / 60); + const hours = Math.floor(totalMinutes / 60); + const minutes = totalMinutes % 60; + if (hours > 0) return `${hours}h ${minutes}m`; + return `${minutes}m`; +} + function EmissionsEstimateBlock({ flight }: { flight: any }) { const emissions = flight?.emissions; - const context = emissions ? 'Model-based cruise estimate' : null; + // Cumulative fuel/CO2 since the backend first saw this hex this + // flight session. Prefer these big numbers — the user explicitly + // wanted "the actual fuel that has been burned", not the rate. + // Rates are still shown below as smaller context. + const observedSec = Number(emissions?.observed_seconds ?? 0); + const fuelBurned = Number(emissions?.fuel_gallons_burned ?? 0); + const co2Emitted = Number(emissions?.co2_kg_emitted ?? 0); + const haveCumulative = emissions && observedSec > 0; return (
EMISSIONS ESTIMATE
-
FUEL RATE
-
- {emissions ? ( - <>{emissions.fuel_gph} GPH +
FUEL BURNED
+
+ {haveCumulative ? ( + <>{fuelBurned.toLocaleString(undefined, { maximumFractionDigits: 1 })} gal + ) : emissions ? ( + ) : 'UNKNOWN'}
+ {emissions && ( +
+ @ {emissions.fuel_gph} gph +
+ )}
-
CO2 RATE
-
- {emissions ? ( - <>{emissions.co2_kg_per_hour.toLocaleString()} KG/HR +
CO2 EMITTED
+
+ {haveCumulative ? ( + <>{co2Emitted.toLocaleString(undefined, { maximumFractionDigits: 1 })} kg + ) : emissions ? ( + ) : 'UNKNOWN'}
+ {emissions && ( +
+ @ {emissions.co2_kg_per_hour.toLocaleString()} kg/hr +
+ )}
- {context && ( + {emissions && (
- {context} + {haveCumulative + ? `Observed in flight for ${formatObservedDuration(observedSec)} · model-based cruise estimate` + : 'Just observed · totals will appear on next refresh'}
)}