mirror of
https://github.com/BigBodyCobain/Shadowbroker.git
synced 2026-06-17 03:20:08 +02:00
fix(store): harden snapshot vs concurrent writer mutation; fix SIGINT dict aliasing (#389)
get_latest_data_deepcopy_snapshot deep-copies layers outside the data lock; a writer mutating a nested object in place races it and raises "dictionary changed size during iteration" (500 on /api/health, /api/live-data). Two changes: (1) _merge_sigint_snapshot now shallow-copies each entry so latest_data["sigint"] no longer aliases the SIGINT bridge dicts or the meshtastic_map_nodes layer (the concrete offender); (2) the snapshot retries a few times as defense-in-depth for any other in-place mutator. Plus regression tests.
This commit is contained in:
committed by
GitHub
parent
51f377f03d
commit
71a2ef4ce7
@@ -265,10 +265,27 @@ def get_latest_data_subset(*keys: str) -> DashboardData:
|
||||
|
||||
|
||||
def get_latest_data_deepcopy_snapshot() -> DashboardData:
    """Deep-copy the full dashboard for /api/health and legacy /api/live-data.

    The per-value deepcopy runs OUTSIDE ``_data_lock`` so a large clone cannot
    block fetcher writers (#375). The store contract is replace-don't-mutate,
    but a writer that mutates a nested object in place (e.g. a live bridge
    updating an entry that is also published in this store) can race the
    deepcopy and raise ``RuntimeError: dictionary changed size during
    iteration`` — surfacing a 500 on the health/live-data path. The racing
    mutation window is tiny, so retry a few times rather than fail; a fresh
    attempt almost always lands on a quiescent moment. Defense-in-depth on top
    of fixing the offending writers, not a substitute for it.

    Returns:
        A new dict mapping every top-level key of ``latest_data`` to a deep
        copy of its value.

    Raises:
        RuntimeError: if every attempt hit a concurrent-mutation race.
        RecursionError, NotImplementedError: re-raised immediately. Both are
            ``RuntimeError`` subclasses, but they are deterministic failures
            that another attempt cannot clear.
    """
    attempts = 4
    for attempt in range(attempts):
        # Only the top-level (key, value) pairs are captured under the lock;
        # each retry re-reads them so it sees the writers' latest layers.
        with _data_lock:
            items = list(latest_data.items())
        try:
            return {key: copy.deepcopy(value) for key, value in items}
        except (RecursionError, NotImplementedError):
            # Not a transient race: repeating a full deep copy would only
            # waste work and delay the error, so surface it right away.
            raise
        except RuntimeError:
            # Presumed concurrent in-place mutation; give up only once the
            # final attempt has also failed.
            if attempt == attempts - 1:
                raise
|
||||
|
||||
|
||||
def get_latest_data_subset_refs(*keys: str) -> DashboardData:
|
||||
|
||||
@@ -21,12 +21,21 @@ def _merge_sigint_snapshot(
|
||||
because they include fresher region/channel metadata.
|
||||
"""
|
||||
|
||||
merged = list(live_signals)
|
||||
# Shallow-copy every entry so the published list owns its own dicts. The
|
||||
# inputs alias objects that other threads keep mutating in place: live
|
||||
# signals are the SIGINT bridge's own dicts (updated as packets arrive),
|
||||
# and api_nodes are the same objects published under latest_data
|
||||
# ["meshtastic_map_nodes"]. Publishing those references into
|
||||
# latest_data["sigint"] lets a concurrent mutation race the lock-free
|
||||
# deepcopy in get_latest_data_deepcopy_snapshot() (/api/health, /api/live-
|
||||
# data) and raise "dictionary changed size during iteration". Copying
|
||||
# honors the replace-don't-mutate contract in fetchers/_store.py.
|
||||
merged = [dict(s) for s in live_signals]
|
||||
live_callsigns = {s["callsign"] for s in merged if s.get("source") == "meshtastic"}
|
||||
for node in api_nodes:
|
||||
if node.get("callsign") in live_callsigns:
|
||||
continue
|
||||
merged.append(node)
|
||||
merged.append(dict(node))
|
||||
merged.sort(key=lambda item: str(item.get("timestamp", "") or ""), reverse=True)
|
||||
return merged
|
||||
|
||||
|
||||
@@ -0,0 +1,47 @@
|
||||
"""The full-store snapshot must survive a transient concurrent-mutation race.
|
||||
|
||||
``get_latest_data_deepcopy_snapshot`` deep-copies each top-level layer outside
|
||||
the data lock. If a misbehaving writer mutates a nested object in place during
|
||||
the copy, ``copy.deepcopy`` raises ``RuntimeError: dictionary changed size
|
||||
during iteration``. The snapshot retries a few times (the mutation window is
|
||||
tiny) so /api/health and /api/live-data do not 500 on a transient race.
|
||||
"""
|
||||
|
||||
import copy
|
||||
|
||||
from services.fetchers import _store
|
||||
|
||||
|
||||
def test_snapshot_retries_then_succeeds(monkeypatch):
    """A single simulated race on the first deepcopy must be retried, not raised."""
    original_deepcopy = copy.deepcopy
    call_count = 0

    def fail_once_deepcopy(value, *args, **kwargs):
        # Raise the race error on the first invocation only; every later call
        # delegates to the genuine deepcopy captured before patching.
        nonlocal call_count
        call_count += 1
        if call_count != 1:
            return original_deepcopy(value, *args, **kwargs)
        raise RuntimeError("dictionary changed size during iteration")

    monkeypatch.setattr(_store.copy, "deepcopy", fail_once_deepcopy)

    result = _store.get_latest_data_deepcopy_snapshot()

    assert isinstance(result, dict)
    # At least one more deepcopy call happened after the simulated race.
    assert call_count >= 2
|
||||
|
||||
|
||||
def test_snapshot_reraises_if_race_never_clears(monkeypatch):
    """When every deepcopy attempt races, the RuntimeError must propagate."""

    def never_quiescent(value, *args, **kwargs):
        raise RuntimeError("dictionary changed size during iteration")

    monkeypatch.setattr(_store.copy, "deepcopy", never_quiescent)

    # A persistent (non-transient) violation is a real bug: the snapshot has
    # to surface it rather than hang or hand back corrupt data.
    try:
        _store.get_latest_data_deepcopy_snapshot()
    except RuntimeError:
        error_surfaced = True
    else:
        error_surfaced = False
    assert error_surfaced
|
||||
@@ -0,0 +1,58 @@
|
||||
"""Regression test for SIGINT snapshot dict aliasing.
|
||||
|
||||
``_merge_sigint_snapshot`` used to publish the *same* dict objects it received
|
||||
into ``latest_data["sigint"]``. Those inputs are owned and mutated in place by
|
||||
other threads (the SIGINT bridge updating live signals, and the
|
||||
``meshtastic_map_nodes`` layer), so a concurrent mutation could race the
|
||||
lock-free deepcopy in ``get_latest_data_deepcopy_snapshot`` (/api/health,
|
||||
/api/live-data) and raise ``dictionary changed size during iteration``.
|
||||
|
||||
The merged snapshot must own copies of every entry.
|
||||
"""
|
||||
|
||||
from services.fetchers.sigint import _merge_sigint_snapshot
|
||||
|
||||
|
||||
def test_merged_entries_are_copies_not_aliases():
    """No entry in the merged list may be the very same object as an input dict."""
    live_signals = [{"callsign": "LIVE1", "source": "meshtastic", "timestamp": "2"}]
    api_nodes = [{"callsign": "MAP1", "source": "meshtastic", "from_api": True, "timestamp": "1"}]

    merged = _merge_sigint_snapshot(live_signals, api_nodes)

    # The bridge and the meshtastic_map_nodes layer keep mutating their own
    # dicts, so the published entries must be distinct objects.
    for entry in merged:
        assert entry is not live_signals[0]
        assert entry is not api_nodes[0]
|
||||
|
||||
|
||||
def test_mutating_inputs_after_merge_does_not_affect_snapshot():
    """Keys added to the inputs after merging must not appear in the merged entries."""
    live_signals = [{"callsign": "LIVE1", "source": "meshtastic", "timestamp": "2"}]
    api_nodes = [{"callsign": "MAP1", "source": "meshtastic", "from_api": True, "timestamp": "1"}]

    merged = _merge_sigint_snapshot(live_signals, api_nodes)

    # Simulate the owners adding keys after publication; this must not grow
    # any dict reachable from the published list.
    live_signals[0]["region"] = "added-later"
    api_nodes[0]["channel"] = "added-later"

    leaked_region = [entry for entry in merged if "region" in entry]
    leaked_channel = [entry for entry in merged if "channel" in entry]
    assert not leaked_region
    assert not leaked_channel
|
||||
|
||||
|
||||
def test_merge_preserves_data_and_dedup():
    """A live meshtastic observation replaces the map node with the same callsign."""
    live_signals = [{"callsign": "DUP", "source": "meshtastic", "timestamp": "5"}]
    api_nodes = [
        {"callsign": "DUP", "source": "meshtastic", "from_api": True, "timestamp": "1"},
        {"callsign": "OTHER", "source": "meshtastic", "from_api": True, "timestamp": "1"},
    ]

    merged = _merge_sigint_snapshot(live_signals, api_nodes)

    dup_entries = [entry for entry in merged if entry["callsign"] == "DUP"]
    # Exactly one DUP survives, and the unrelated map node is kept.
    assert len(dup_entries) == 1
    assert any(entry["callsign"] == "OTHER" for entry in merged)
    # The surviving DUP is the live one (no from_api flag).
    assert not dup_entries[0].get("from_api")
|
||||
Reference in New Issue
Block a user