diff --git a/backend/services/fetchers/_store.py b/backend/services/fetchers/_store.py index 81eacf1..1eeba77 100644 --- a/backend/services/fetchers/_store.py +++ b/backend/services/fetchers/_store.py @@ -265,10 +265,27 @@ def get_latest_data_subset(*keys: str) -> DashboardData: def get_latest_data_deepcopy_snapshot() -> DashboardData: - """Deep-copy the full dashboard for legacy /api/live-data consumers.""" - with _data_lock: - items = list(latest_data.items()) - return {key: copy.deepcopy(value) for key, value in items} + """Deep-copy the full dashboard for /api/health and legacy /api/live-data. + + The per-value deepcopy runs OUTSIDE ``_data_lock`` so a large clone cannot + block fetcher writers (#375). The store contract is replace-don't-mutate, + but a writer that mutates a nested object in place (e.g. a live bridge + updating an entry that is also published in this store) can race the + deepcopy and raise ``RuntimeError: dictionary changed size during + iteration`` — surfacing a 500 on the health/live-data path. The racing + mutation window is tiny, so retry a few times rather than fail; a fresh + attempt almost always lands on a quiescent moment. Defense-in-depth on top + of fixing the offending writers, not a substitute for it. + """ + attempts = 4 + for attempt in range(attempts): + with _data_lock: + items = list(latest_data.items()) + try: + return {key: copy.deepcopy(value) for key, value in items} + except RuntimeError: + if attempt == attempts - 1: + raise def get_latest_data_subset_refs(*keys: str) -> DashboardData: diff --git a/backend/services/fetchers/sigint.py b/backend/services/fetchers/sigint.py index f6a2c7a..185cbe0 100644 --- a/backend/services/fetchers/sigint.py +++ b/backend/services/fetchers/sigint.py @@ -21,12 +21,21 @@ def _merge_sigint_snapshot( because they include fresher region/channel metadata. """ - merged = list(live_signals) + # Shallow-copy every entry so the published list owns its own dicts. The + # inputs alias objects that other threads keep mutating in place: live + # signals are the SIGINT bridge's own dicts (updated as packets arrive), + # and api_nodes are the same objects published under latest_data + # ["meshtastic_map_nodes"]. Publishing those references into + # latest_data["sigint"] lets a concurrent mutation race the lock-free + # deepcopy in get_latest_data_deepcopy_snapshot() (/api/health, /api/live- + # data) and raise "dictionary changed size during iteration". Copying + # honors the replace-don't-mutate contract in fetchers/_store.py. + merged = [dict(s) for s in live_signals] live_callsigns = {s["callsign"] for s in merged if s.get("source") == "meshtastic"} for node in api_nodes: if node.get("callsign") in live_callsigns: continue - merged.append(node) + merged.append(dict(node)) merged.sort(key=lambda item: str(item.get("timestamp", "") or ""), reverse=True) return merged diff --git a/backend/tests/test_live_data_snapshot_resilience.py b/backend/tests/test_live_data_snapshot_resilience.py new file mode 100644 index 0000000..0e3d02d --- /dev/null +++ b/backend/tests/test_live_data_snapshot_resilience.py @@ -0,0 +1,47 @@ +"""The full-store snapshot must survive a transient concurrent-mutation race. + +``get_latest_data_deepcopy_snapshot`` deep-copies each top-level layer outside +the data lock. If a misbehaving writer mutates a nested object in place during +the copy, ``copy.deepcopy`` raises ``RuntimeError: dictionary changed size +during iteration``. The snapshot retries a few times (the mutation window is +tiny) so /api/health and /api/live-data do not 500 on a transient race. +""" + +import copy + +from services.fetchers import _store + + +def test_snapshot_retries_then_succeeds(monkeypatch): + real_deepcopy = copy.deepcopy + calls = {"n": 0} + + def flaky_deepcopy(value, *args, **kwargs): + calls["n"] += 1 + # Fail only on the very first deepcopy call, then behave normally. + if calls["n"] == 1: + raise RuntimeError("dictionary changed size during iteration") + return real_deepcopy(value, *args, **kwargs) + + monkeypatch.setattr(_store.copy, "deepcopy", flaky_deepcopy) + + snapshot = _store.get_latest_data_deepcopy_snapshot() + + assert isinstance(snapshot, dict) + assert calls["n"] >= 2 # it retried after the simulated race + + +def test_snapshot_reraises_if_race_never_clears(monkeypatch): + def always_racing(value, *args, **kwargs): + raise RuntimeError("dictionary changed size during iteration") + + monkeypatch.setattr(_store.copy, "deepcopy", always_racing) + + # A persistent (non-transient) violation is a real bug — surface it rather + # than hang or return corrupt data. + raised = False + try: + _store.get_latest_data_deepcopy_snapshot() + except RuntimeError: + raised = True + assert raised diff --git a/backend/tests/test_sigint_snapshot_isolation.py b/backend/tests/test_sigint_snapshot_isolation.py new file mode 100644 index 0000000..1dc7105 --- /dev/null +++ b/backend/tests/test_sigint_snapshot_isolation.py @@ -0,0 +1,58 @@ +"""Regression test for SIGINT snapshot dict aliasing. + +``_merge_sigint_snapshot`` used to publish the *same* dict objects it received +into ``latest_data["sigint"]``. Those inputs are owned and mutated in place by +other threads (the SIGINT bridge updating live signals, and the +``meshtastic_map_nodes`` layer), so a concurrent mutation could race the +lock-free deepcopy in ``get_latest_data_deepcopy_snapshot`` (/api/health, +/api/live-data) and raise ``dictionary changed size during iteration``. + +The merged snapshot must own copies of every entry. +""" + +from services.fetchers.sigint import _merge_sigint_snapshot + + +def test_merged_entries_are_copies_not_aliases(): + live = [{"callsign": "LIVE1", "source": "meshtastic", "timestamp": "2"}] + api = [{"callsign": "MAP1", "source": "meshtastic", "from_api": True, "timestamp": "1"}] + + merged = _merge_sigint_snapshot(live, api) + + # No published entry may be the *same object* as an input the bridge or the + # meshtastic_map_nodes layer keeps mutating. + inputs = {id(live[0]), id(api[0])} + assert all(id(entry) not in inputs for entry in merged) + + +def test_mutating_inputs_after_merge_does_not_affect_snapshot(): + live = [{"callsign": "LIVE1", "source": "meshtastic", "timestamp": "2"}] + api = [{"callsign": "MAP1", "source": "meshtastic", "from_api": True, "timestamp": "1"}] + + merged = _merge_sigint_snapshot(live, api) + + # Simulate the bridge adding a key to a live signal after publication — this + # must not change the size of any dict reachable from the published list. + live[0]["region"] = "added-later" + api[0]["channel"] = "added-later" + + assert all("region" not in entry for entry in merged) + assert all("channel" not in entry for entry in merged) + + +def test_merge_preserves_data_and_dedup(): + # Live meshtastic observation wins over the map node for the same callsign. + live = [{"callsign": "DUP", "source": "meshtastic", "timestamp": "5"}] + api = [ + {"callsign": "DUP", "source": "meshtastic", "from_api": True, "timestamp": "1"}, + {"callsign": "OTHER", "source": "meshtastic", "from_api": True, "timestamp": "1"}, + ] + + merged = _merge_sigint_snapshot(live, api) + + callsigns = [m["callsign"] for m in merged] + assert callsigns.count("DUP") == 1 + assert "OTHER" in callsigns + # The surviving DUP is the live one (no from_api flag). + dup = next(m for m in merged if m["callsign"] == "DUP") + assert not dup.get("from_api")