Compare commits

..

3 Commits

Author SHA1 Message Date
TheYellowBeanieGuy 9c5a4054f6 fix(gdelt): stop background thread mutating already-published features (dictionary changed size during iteration) (#388)
* fix(gdelt): publish enriched copies instead of mutating live features

_enrich_gdelt_titles_background ran in a daemon thread that mutated the
nested properties dicts of GDELT features already published into
latest_data["gdelt"]. HTTP readers hold live references to those dicts and
serialize them outside the data lock, so the in-place mutation raced the
serializer and raised RuntimeError: dictionary changed size during
iteration on /api/live-data/slow and /api/bootstrap/critical.

Enrich deep copies instead and atomically swap the top-level key under
_data_lock, with an identity guard so a newer fetch_gdelt() is not clobbered.
Honors the replace-don't-mutate contract documented in fetchers/_store.py.

Co-Authored-By: Claude Fable 5 <noreply@anthropic.com>

* test(gdelt): regression test for background enrichment isolation

Asserts _enrich_gdelt_titles_background does not mutate already-published features and instead atomically swaps latest_data["gdelt"] with enriched copies (with the identity guard). Locks in the fix for the dictionary-changed-size race.

---------

Co-authored-by: Claude Fable 5 <noreply@anthropic.com>
2026-06-15 18:23:25 -06:00
TheYellowBeanieGuy 71a2ef4ce7 fix(store): harden snapshot vs concurrent writer mutation; fix SIGINT dict aliasing (#389)
get_latest_data_deepcopy_snapshot deep-copies layers outside the data lock; a writer mutating a nested object in place races it and raises "dictionary changed size during iteration" (500 on /api/health, /api/live-data). Two changes: (1) _merge_sigint_snapshot now shallow-copies each entry so latest_data["sigint"] no longer aliases the SIGINT bridge dicts or the meshtastic_map_nodes layer (the concrete offender); (2) the snapshot retries a few times as defense-in-depth for any other in-place mutator. Plus regression tests.
2026-06-15 17:35:27 -06:00
BigBodyCobain 51f377f03d fix: sync Data Layers toggle-all icon and improve RSS feed saves
Unify toggle-all exclusions for Earth imagery overlays so the icon matches layer state, and let Docker operators save news feeds via the proxy without a misleading network error.
2026-06-15 16:21:38 -06:00
8 changed files with 364 additions and 69 deletions
+21 -4
View File
@@ -265,10 +265,27 @@ def get_latest_data_subset(*keys: str) -> DashboardData:
def get_latest_data_deepcopy_snapshot() -> DashboardData:
"""Deep-copy the full dashboard for legacy /api/live-data consumers."""
with _data_lock:
items = list(latest_data.items())
return {key: copy.deepcopy(value) for key, value in items}
"""Deep-copy the full dashboard for /api/health and legacy /api/live-data.
The per-value deepcopy runs OUTSIDE ``_data_lock`` so a large clone cannot
block fetcher writers (#375). The store contract is replace-don't-mutate,
but a writer that mutates a nested object in place (e.g. a live bridge
updating an entry that is also published in this store) can race the
deepcopy and raise ``RuntimeError: dictionary changed size during
iteration`` — surfacing a 500 on the health/live-data path. The racing
mutation window is tiny, so retry a few times rather than fail; a fresh
attempt almost always lands on a quiescent moment. Defense-in-depth on top
of fixing the offending writers, not a substitute for it.
"""
attempts = 4
for attempt in range(attempts):
with _data_lock:
items = list(latest_data.items())
try:
return {key: copy.deepcopy(value) for key, value in items}
except RuntimeError:
if attempt == attempts - 1:
raise
def get_latest_data_subset_refs(*keys: str) -> DashboardData:
+11 -2
View File
@@ -21,12 +21,21 @@ def _merge_sigint_snapshot(
because they include fresher region/channel metadata.
"""
merged = list(live_signals)
# Shallow-copy every entry so the published list owns its own dicts. The
# inputs alias objects that other threads keep mutating in place: live
# signals are the SIGINT bridge's own dicts (updated as packets arrive),
# and api_nodes are the same objects published under latest_data
# ["meshtastic_map_nodes"]. Publishing those references into
# latest_data["sigint"] lets a concurrent mutation race the lock-free
# deepcopy in get_latest_data_deepcopy_snapshot() (/api/health, /api/live-
# data) and raise "dictionary changed size during iteration". Copying
# honors the replace-don't-mutate contract in fetchers/_store.py.
merged = [dict(s) for s in live_signals]
live_callsigns = {s["callsign"] for s in merged if s.get("source") == "meshtastic"}
for node in api_nodes:
if node.get("callsign") in live_callsigns:
continue
merged.append(node)
merged.append(dict(node))
merged.sort(key=lambda item: str(item.get("timestamp", "") or ""), reverse=True)
return merged
+49 -22
View File
@@ -606,8 +606,19 @@ def _build_feature_html(features, fetched_titles=None):
def _enrich_gdelt_titles_background(features, all_article_urls):
"""Background thread: fetch real article titles then update features in-place."""
"""Background thread: fetch real article titles, then publish enriched COPIES.
The ``features`` handed to us were already published into
``latest_data["gdelt"]`` by ``fetch_gdelt()``. Per the store's thread-safety
contract (see ``get_latest_data_subset_refs`` in fetchers/_store.py), HTTP
readers hold live references to these nested ``properties`` dicts and
serialize them OUTSIDE the data lock. Mutating the published dicts in place
here races that serialization and raises
``RuntimeError: dictionary changed size during iteration``. So we enrich
copies and atomically swap the top-level key under the lock instead.
"""
import html as html_mod
from services.fetchers._store import latest_data, _data_lock, _mark_fresh
try:
logger.info(f"[BG] Fetching real article titles for {len(all_article_urls)} URLs...")
@@ -615,28 +626,44 @@ def _enrich_gdelt_titles_background(features, all_article_urls):
fetched_count = sum(1 for v in fetched_titles.values() if v)
logger.info(f"[BG] Resolved {fetched_count}/{len(all_article_urls)} article titles")
# Update features in-place with real titles and snippets
# Build enriched copies — never touch the already-published objects.
enriched = []
for f in features:
urls = f["properties"].get("_urls_list", [])
if not urls:
continue
headlines = []
snippets = []
for u in urls:
real_title = fetched_titles.get(u)
headlines.append(real_title if real_title else _url_to_headline(u))
snippets.append(_article_snippet_cache.get(u) or "")
f["properties"]["_headlines_list"] = headlines
f["properties"]["_snippets_list"] = snippets
links = []
for u, h in zip(urls, headlines):
safe_url = u if u.startswith(("http://", "https://")) else "about:blank"
safe_h = html_mod.escape(h)
links.append(
f'<div style="margin-bottom:6px;"><a href="{safe_url}" target="_blank" rel="noopener noreferrer">{safe_h}</a></div>'
)
f["properties"]["html"] = "".join(links)
logger.info(f"[BG] GDELT title enrichment complete")
nf = dict(f)
props = dict(f.get("properties", {}))
urls = props.get("_urls_list", [])
if urls:
headlines = []
snippets = []
for u in urls:
real_title = fetched_titles.get(u)
headlines.append(real_title if real_title else _url_to_headline(u))
snippets.append(_article_snippet_cache.get(u) or "")
props["_headlines_list"] = headlines
props["_snippets_list"] = snippets
links = []
for u, h in zip(urls, headlines):
safe_url = u if u.startswith(("http://", "https://")) else "about:blank"
safe_h = html_mod.escape(h)
links.append(
f'<div style="margin-bottom:6px;"><a href="{safe_url}" target="_blank" rel="noopener noreferrer">{safe_h}</a></div>'
)
props["html"] = "".join(links)
nf["properties"] = props
enriched.append(nf)
# Atomically publish — but only if a newer fetch_gdelt() hasn't already
# replaced the layer while we were fetching titles (identity guard).
published = False
with _data_lock:
if latest_data.get("gdelt") is features:
latest_data["gdelt"] = enriched
published = True
if published:
_mark_fresh("gdelt")
logger.info(f"[BG] GDELT title enrichment complete ({len(enriched)} features)")
else:
logger.info("[BG] GDELT layer changed under us; skipping stale enrichment swap")
except Exception as e:
logger.error(f"[BG] GDELT title enrichment failed: {e}")
@@ -0,0 +1,71 @@
"""Regression tests for the GDELT background title enrichment.
The background enrichment thread used to mutate the nested ``properties`` dicts
of GDELT features *after* they were already published into
``latest_data["gdelt"]``. HTTP readers serialize those dicts outside the data
lock, so the in-place mutation raced the serializer and raised
``RuntimeError: dictionary changed size during iteration``.
These tests pin the contract: the enrichment must NOT touch the
already-published feature objects, and must instead publish enriched copies via
an atomic swap (with an identity guard so a newer fetch is not clobbered).
"""
from services.fetchers import _store
from services import geopolitics
def _make_feature():
return {
"type": "Feature",
"geometry": {"type": "Point", "coordinates": [0.0, 0.0]},
"properties": {"name": "loc", "_urls_list": ["http://example.test/article-1"]},
}
def test_enrichment_does_not_mutate_published_features(monkeypatch):
    """Enrichment must leave published features untouched and swap in copies.

    Publishes one feature under ``latest_data["gdelt"]``, stubs the title
    fetcher, runs the enrichment synchronously, then checks that the original
    feature gained no enrichment keys while the store now holds a distinct,
    enriched copy.
    """
    feature = _make_feature()
    features = [feature]
    # Publish via monkeypatch so the shared store entry is restored at
    # teardown; a plain assignment would leak this fixture (and the enriched
    # replacement) into whichever test runs next.
    with _store._data_lock:
        monkeypatch.setitem(_store.latest_data, "gdelt", features)
    monkeypatch.setattr(
        geopolitics,
        "_batch_fetch_titles",
        lambda urls: {"http://example.test/article-1": "Real Headline"},
    )

    geopolitics._enrich_gdelt_titles_background(features, {"http://example.test/article-1"})

    # The originally-published feature object must be untouched (no in-place
    # mutation of its properties dict -- that was the source of the crash).
    assert "_headlines_list" not in feature["properties"]
    assert "_snippets_list" not in feature["properties"]
    assert "html" not in feature["properties"]

    # The layer must have been atomically replaced with an enriched COPY.
    published = _store.latest_data["gdelt"]
    assert published is not features
    assert published[0] is not feature
    assert published[0]["properties"]["_headlines_list"] == ["Real Headline"]
def test_enrichment_skips_swap_when_layer_replaced(monkeypatch):
    """The identity guard must leave a newer ``gdelt`` layer in place.

    The store holds a different list object than the one handed to the
    enrichment, simulating a newer ``fetch_gdelt()`` having replaced the layer
    while titles were still being resolved.
    """
    features = [_make_feature()]
    sentinel = [{"properties": {"name": "newer"}}]
    # monkeypatch restores the shared store entry at teardown so this fixture
    # does not leak into other tests.
    with _store._data_lock:
        monkeypatch.setitem(_store.latest_data, "gdelt", sentinel)
    monkeypatch.setattr(
        geopolitics,
        "_batch_fetch_titles",
        lambda urls: {"http://example.test/article-1": "Real Headline"},
    )

    geopolitics._enrich_gdelt_titles_background(features, {"http://example.test/article-1"})

    # The identity guard must prevent clobbering the newer layer...
    assert _store.latest_data["gdelt"] is sentinel
    # ...and the newer layer's contents must not have been enriched either.
    assert sentinel == [{"properties": {"name": "newer"}}]
@@ -0,0 +1,47 @@
"""The full-store snapshot must survive a transient concurrent-mutation race.
``get_latest_data_deepcopy_snapshot`` deep-copies each top-level layer outside
the data lock. If a misbehaving writer mutates a nested object in place during
the copy, ``copy.deepcopy`` raises ``RuntimeError: dictionary changed size
during iteration``. The snapshot retries a few times (the mutation window is
tiny) so /api/health and /api/live-data do not 500 on a transient race.
"""
import copy
from services.fetchers import _store
def test_snapshot_retries_then_succeeds(monkeypatch):
    """A single simulated race on the first deepcopy is absorbed by a retry."""
    real_deepcopy = copy.deepcopy
    calls = {"n": 0}

    def flaky_deepcopy(value, *args, **kwargs):
        calls["n"] += 1
        # Fail only on the very first deepcopy call, then behave normally.
        if calls["n"] == 1:
            raise RuntimeError("dictionary changed size during iteration")
        return real_deepcopy(value, *args, **kwargs)

    # Seed a known layer so the snapshot always has something to deep-copy;
    # with an empty store deepcopy would never run and the retry path would go
    # unexercised. monkeypatch removes the probe again at teardown.
    probe = {"nested": {"k": 1}}
    with _store._data_lock:
        monkeypatch.setitem(_store.latest_data, "__snapshot_retry_probe__", probe)
    monkeypatch.setattr(_store.copy, "deepcopy", flaky_deepcopy)

    snapshot = _store.get_latest_data_deepcopy_snapshot()

    assert isinstance(snapshot, dict)
    assert calls["n"] >= 2  # it retried after the simulated race
    # The retry must return a real deep copy, not the live object.
    assert snapshot["__snapshot_retry_probe__"] == probe
    assert snapshot["__snapshot_retry_probe__"] is not probe
    assert snapshot["__snapshot_retry_probe__"]["nested"] is not probe["nested"]
def test_snapshot_reraises_if_race_never_clears(monkeypatch):
    """A deepcopy that fails on every attempt must surface the RuntimeError."""
    import pytest

    def always_racing(value, *args, **kwargs):
        raise RuntimeError("dictionary changed size during iteration")

    # Seed a known layer so deepcopy is guaranteed to be invoked; with an empty
    # store the snapshot would return {} without ever raising. monkeypatch
    # removes the probe again at teardown.
    with _store._data_lock:
        monkeypatch.setitem(_store.latest_data, "__snapshot_retry_probe__", {"k": 1})
    monkeypatch.setattr(_store.copy, "deepcopy", always_racing)

    # A persistent (non-transient) violation is a real bug -- surface it rather
    # than hang or return corrupt data.
    with pytest.raises(RuntimeError, match="dictionary changed size"):
        _store.get_latest_data_deepcopy_snapshot()
@@ -0,0 +1,58 @@
"""Regression test for SIGINT snapshot dict aliasing.
``_merge_sigint_snapshot`` used to publish the *same* dict objects it received
into ``latest_data["sigint"]``. Those inputs are owned and mutated in place by
other threads (the SIGINT bridge updating live signals, and the
``meshtastic_map_nodes`` layer), so a concurrent mutation could race the
lock-free deepcopy in ``get_latest_data_deepcopy_snapshot`` (/api/health,
/api/live-data) and raise ``dictionary changed size during iteration``.
The merged snapshot must own copies of every entry.
"""
from services.fetchers.sigint import _merge_sigint_snapshot
def test_merged_entries_are_copies_not_aliases():
    """No merged entry may be the same object as one of the input dicts."""
    live = [{"callsign": "LIVE1", "source": "meshtastic", "timestamp": "2"}]
    api = [{"callsign": "MAP1", "source": "meshtastic", "from_api": True, "timestamp": "1"}]

    merged = _merge_sigint_snapshot(live, api)

    # Guard against a vacuous pass: both inputs have distinct callsigns, so
    # both must be present before the identity check means anything.
    assert len(merged) == 2
    # No published entry may be the *same object* as an input the bridge or the
    # meshtastic_map_nodes layer keeps mutating.
    inputs = {id(live[0]), id(api[0])}
    assert all(id(entry) not in inputs for entry in merged)
def test_mutating_inputs_after_merge_does_not_affect_snapshot():
    """Keys added to the inputs after merging must not appear in the result."""
    live = [{"callsign": "LIVE1", "source": "meshtastic", "timestamp": "2"}]
    api = [{"callsign": "MAP1", "source": "meshtastic", "from_api": True, "timestamp": "1"}]

    merged = _merge_sigint_snapshot(live, api)
    # Guard against a vacuous pass of the all(...) checks below.
    assert len(merged) == 2

    # Simulate the bridge adding a key to a live signal after publication -- this
    # must not change the size of any dict reachable from the published list.
    live[0]["region"] = "added-later"
    api[0]["channel"] = "added-later"

    assert all("region" not in entry for entry in merged)
    assert all("channel" not in entry for entry in merged)
def test_merge_preserves_data_and_dedup():
    """A live meshtastic entry wins over the map node sharing its callsign."""
    live_signals = [{"callsign": "DUP", "source": "meshtastic", "timestamp": "5"}]
    map_nodes = [
        {"callsign": "DUP", "source": "meshtastic", "from_api": True, "timestamp": "1"},
        {"callsign": "OTHER", "source": "meshtastic", "from_api": True, "timestamp": "1"},
    ]

    result = _merge_sigint_snapshot(live_signals, map_nodes)

    seen = [entry["callsign"] for entry in result]
    assert seen.count("DUP") == 1
    assert "OTHER" in seen

    # The surviving DUP is the live one (no from_api flag).
    survivor = next(entry for entry in result if entry["callsign"] == "DUP")
    assert not survivor.get("from_api")
+68 -8
View File
@@ -171,6 +171,40 @@ function migratePrivacySensitiveBrowserState(): void {
const MAX_FEEDS = 50;
/**
 * Turn an error thrown while loading or saving feed settings into display text.
 *
 * Known control-plane error codes map to friendly guidance, an empty message
 * yields `fallback`, and any other message is passed through unchanged.
 */
function formatFeedSettingsError(error: unknown, fallback: string): string {
  let message: string;
  if (error instanceof Error) {
    message = error.message;
  } else {
    message = String(error || '');
  }
  if (message.length === 0) {
    return fallback;
  }
  switch (message) {
    case 'admin_session_required':
      return 'Admin key required — paste ADMIN_KEY in Settings and unlock operator tools.';
    case 'backend_unavailable':
    case 'local_control_plane_unavailable':
      return 'Backend unavailable — check that the backend container is running.';
    case 'control_plane_rate_limited':
      return 'Too many requests — wait a moment and try again.';
    default:
      return message;
  }
}
/**
 * Check every feed before saving.
 *
 * Returns the message for the first feed that has a blank name or URL, a URL
 * that cannot be parsed, or a URL whose scheme is not http/https; returns
 * `null` when all feeds pass.
 */
function validateFeedEntries(feeds: FeedEntry[]): string | null {
  for (let position = 0; position < feeds.length; position += 1) {
    const entry = feeds[position];
    const trimmedName = entry.name.trim();
    const trimmedUrl = entry.url.trim();
    if (trimmedName === '' || trimmedUrl === '') {
      return `Feed ${position + 1} needs both a name and URL before saving.`;
    }

    let scheme: string;
    try {
      scheme = new URL(trimmedUrl).protocol;
    } catch {
      return `Feed ${position + 1} has an invalid URL.`;
    }
    if (scheme !== 'http:' && scheme !== 'https:') {
      return `Feed ${position + 1} must use an http:// or https:// URL.`;
    }
  }
  return null;
}
// Category colors for the tactical UI
const CATEGORY_COLORS: Record<string, string> = {
Aviation: 'text-cyan-400 border-cyan-500/30 bg-cyan-950/20',
@@ -606,7 +640,11 @@ const SettingsPanel = React.memo(function SettingsPanel({
const fetchFeeds = useCallback(async () => {
try {
setFeeds(await controlPlaneJson<FeedEntry[]>('/api/settings/news-feeds'));
setFeeds(
await controlPlaneJson<FeedEntry[]>('/api/settings/news-feeds', {
requireAdminSession: false,
}),
);
setFeedsDirty(false);
return true;
} catch (e) {
@@ -769,11 +807,10 @@ const SettingsPanel = React.memo(function SettingsPanel({
void fetchEnvMeta();
return;
}
if (!adminSessionReady) return;
if (activeTab === 'news-feeds') {
void fetchFeeds();
}
}, [isOpen, adminSessionReady, activeTab, fetchKeys, fetchEnvMeta, fetchFeeds]);
}, [isOpen, activeTab, fetchKeys, fetchEnvMeta, fetchFeeds]);
useEffect(() => {
if (!isOpen || activeTab !== 'protocol' || !showOperatorTools) return;
@@ -828,6 +865,11 @@ const SettingsPanel = React.memo(function SettingsPanel({
};
const saveFeeds = async () => {
const validationError = validateFeedEntries(feeds);
if (validationError) {
setFeedMsg({ type: 'err', text: validationError });
return;
}
setFeedSaving(true);
setFeedMsg(null);
try {
@@ -835,6 +877,7 @@ const SettingsPanel = React.memo(function SettingsPanel({
method: 'PUT',
headers: { 'Content-Type': 'application/json' },
body: JSON.stringify(feeds),
requireAdminSession: false,
});
if (res.ok) {
setFeedsDirty(false);
@@ -844,28 +887,45 @@ const SettingsPanel = React.memo(function SettingsPanel({
});
} else {
const d = await res.json().catch(() => ({}));
setFeedMsg({ type: 'err', text: d.message || 'Save failed' });
setFeedMsg({
type: 'err',
text: String(d.message || d.detail || 'Save failed'),
});
}
} catch {
setFeedMsg({ type: 'err', text: 'Network error' });
} catch (error) {
setFeedMsg({
type: 'err',
text: formatFeedSettingsError(error, 'Could not reach the settings API'),
});
} finally {
setFeedSaving(false);
}
};
const resetFeeds = async () => {
setFeedMsg(null);
try {
const res = await controlPlaneFetch('/api/settings/news-feeds/reset', {
method: 'POST',
requireAdminSession: false,
});
if (res.ok) {
const d = await res.json();
setFeeds(d.feeds || []);
setFeedsDirty(false);
setFeedMsg({ type: 'ok', text: 'Reset to defaults' });
} else {
const d = await res.json().catch(() => ({}));
setFeedMsg({
type: 'err',
text: String(d.message || d.detail || 'Reset failed'),
});
}
} catch {
setFeedMsg({ type: 'err', text: 'Reset failed' });
} catch (error) {
setFeedMsg({
type: 'err',
text: formatFeedSettingsError(error, 'Could not reach the settings API'),
});
}
};
+39 -33
View File
@@ -633,6 +633,16 @@ function SdrTracker({
);
}
// Layer keys that the bulk "toggle all" control skips: they are ignored when
// deciding whether "all layers" are on and are never flipped by the bulk
// toggle. Most are Earth-imagery overlays — stacking GIBS, Sentinel Hub,
// nightlights, and high-res tiles is redundant/noisy.
// NOTE(review): 'road_corridor_trends' does not read like an imagery overlay;
// confirm why it is excluded and extend this comment accordingly.
const TOGGLE_ALL_EXCLUDED_LAYERS = new Set<string>([
'gibs_imagery',
'highres_satellite',
'sentinel_hub',
'viirs_nightlights',
'road_corridor_trends',
]);
const WorldviewLeftPanel = React.memo(function WorldviewLeftPanel({
activeLayers,
setActiveLayers,
@@ -730,6 +740,31 @@ const WorldviewLeftPanel = React.memo(function WorldviewLeftPanel({
[needsConsentBeforeEnable],
);
// True when every layer outside TOGGLE_ALL_EXCLUDED_LAYERS is enabled
// (vacuously true when there are no such layers).
const isAllToggleableLayersOn = useMemo(() => {
  for (const [layerKey, enabled] of Object.entries(activeLayers)) {
    if (TOGGLE_ALL_EXCLUDED_LAYERS.has(layerKey)) continue;
    if (!enabled) return false;
  }
  return true;
}, [activeLayers]);
// Flip every non-excluded layer to the opposite of the current "all on" state.
// Turning layers on goes through the global-incidents consent wrapper; turning
// them off is applied directly.
const toggleAllLayers = useCallback(() => {
  const turningOn = !isAllToggleableLayersOn;
  const applyBulkState = () => {
    setActiveLayers((prev: ActiveLayers) => {
      // `next` starts as a copy of `prev`, so excluded layers keep their value
      // simply by not being reassigned.
      const next = { ...prev } as ActiveLayers;
      (Object.keys(prev) as Array<keyof ActiveLayers>).forEach((layer) => {
        if (!TOGGLE_ALL_EXCLUDED_LAYERS.has(String(layer))) {
          next[layer] = turningOn;
        }
      });
      return next;
    });
  };
  if (turningOn) {
    withGlobalIncidentsConsent('global_incidents', true, applyBulkState);
    return;
  }
  applyBulkState();
}, [isAllToggleableLayersOn, setActiveLayers, withGlobalIncidentsConsent]);
// Auto-detect: if the backend already has Mode B creds configured
// (via env or a previous runtime save), promote the stored choice to
// 'b_active' without prompting. If it flips back to off, reset so the
@@ -1456,45 +1491,16 @@ const WorldviewLeftPanel = React.memo(function WorldviewLeftPanel({
</div>
<div className="flex items-center gap-2">
<button
title={
Object.entries(activeLayers)
.filter(([k]) => !['gibs_imagery', 'highres_satellite', 'sentinel_hub', 'viirs_nightlights', 'road_corridor_trends'].includes(k))
.every(([, v]) => v)
? 'Disable all layers'
: 'Enable all layers'
}
title={isAllToggleableLayersOn ? 'Disable all layers' : 'Enable all layers'}
className={`${
Object.entries(activeLayers)
.filter(([k]) => !['gibs_imagery', 'highres_satellite', 'sentinel_hub', 'viirs_nightlights', 'road_corridor_trends'].includes(k))
.every(([, v]) => v)
? 'text-cyan-400'
: 'text-[var(--text-muted)]'
isAllToggleableLayersOn ? 'text-cyan-400' : 'text-[var(--text-muted)]'
} hover:text-cyan-400 transition-colors`}
onClick={(e) => {
e.stopPropagation();
const excluded = new Set(['gibs_imagery', 'highres_satellite', 'sentinel_hub', 'viirs_nightlights', 'road_corridor_trends']);
const allOn = Object.entries(activeLayers)
.filter(([k]) => !excluded.has(k))
.every(([, v]) => v);
const enableAll = () => {
setActiveLayers((prev: ActiveLayers) => {
const next = { ...prev } as ActiveLayers;
for (const k of Object.keys(prev) as Array<keyof ActiveLayers>) {
next[k] = excluded.has(k) ? prev[k] : !allOn;
}
return next;
});
};
if (!allOn) {
withGlobalIncidentsConsent('global_incidents', true, enableAll);
} else {
enableAll();
}
toggleAllLayers();
}}
>
{Object.entries(activeLayers)
.filter(([k]) => !['gibs_imagery', 'highres_satellite', 'sentinel_hub', 'viirs_nightlights'].includes(k))
.every(([, v]) => v) ? (
{isAllToggleableLayersOn ? (
<ToggleRight size={22} />
) : (
<ToggleLeft size={22} />