Files
anoracleofra-code 668ce16dc7 v0.9.6: InfoNet hashchain, Wormhole gate encryption, mesh reputation, 16 community contributors
Gate messages now propagate via the Infonet hashchain as encrypted blobs — every node syncs them
through normal chain sync while only Gate members with MLS keys can decrypt. Added mesh reputation
system, peer push workers, voluntary Wormhole opt-in for node participation, fork recovery,
killwormhole scripts, obfuscated terminology, and hardened the self-updater to protect encryption
keys and chain state during updates.

New features: Shodan search, train tracking, Sentinel Hub imagery, 8 new intelligence layers,
CCTV expansion to 11,000+ cameras across 6 countries, Mesh Terminal CLI, prediction markets,
desktop-shell scaffold, and comprehensive mesh test suite (215 frontend + backend tests passing).

Community contributors: @wa1id, @AlborzNazari, @adust09, @Xpirix, @imqdcr, @csysp, @suranyami,
@chr0n1x, @johan-martensson, @singularfailure, @smithbh, @OrfeoTerkuci, @deuza, @tm-const,
@Elhard1, @ttulttul
2026-03-26 05:58:04 -06:00

126 lines
3.9 KiB
Python

"""Tests for the shared in-memory data store."""
import threading
import time
import pytest
from services.fetchers._store import latest_data, source_timestamps, _mark_fresh, _data_lock
class TestLatestDataStructure:
"""Verify the store has the expected keys and default values."""
def test_has_all_required_keys(self):
expected_keys = {
"last_updated",
"news",
"stocks",
"oil",
"flights",
"ships",
"military_flights",
"tracked_flights",
"cctv",
"weather",
"earthquakes",
"uavs",
"frontlines",
"gdelt",
"liveuamap",
"kiwisdr",
"space_weather",
"internet_outages",
"firms_fires",
"datacenters",
}
assert expected_keys.issubset(set(latest_data.keys()))
def test_list_keys_default_to_empty_list(self):
list_keys = [
"news",
"flights",
"ships",
"military_flights",
"tracked_flights",
"cctv",
"earthquakes",
"uavs",
"gdelt",
"liveuamap",
"kiwisdr",
"internet_outages",
"firms_fires",
"datacenters",
]
for key in list_keys:
assert isinstance(latest_data[key], list), f"{key} should default to list"
def test_dict_keys_default_to_empty_dict(self):
dict_keys = ["stocks", "oil"]
for key in dict_keys:
assert isinstance(latest_data[key], dict), f"{key} should default to dict"
class TestMarkFresh:
"""Tests for _mark_fresh timestamp helper."""
def test_records_timestamp_for_single_key(self):
_mark_fresh("test_key_1")
assert "test_key_1" in source_timestamps
assert isinstance(source_timestamps["test_key_1"], str)
def test_records_timestamps_for_multiple_keys(self):
_mark_fresh("multi_a", "multi_b", "multi_c")
assert "multi_a" in source_timestamps
assert "multi_b" in source_timestamps
assert "multi_c" in source_timestamps
def test_timestamps_are_iso_format(self):
_mark_fresh("iso_test")
ts = source_timestamps["iso_test"]
# ISO format: YYYY-MM-DDTHH:MM:SS.ffffff
assert "T" in ts
assert len(ts) >= 19 # At least YYYY-MM-DDTHH:MM:SS
def test_successive_calls_update_timestamp(self):
_mark_fresh("update_test")
ts1 = source_timestamps["update_test"]
time.sleep(0.01)
_mark_fresh("update_test")
ts2 = source_timestamps["update_test"]
assert ts2 >= ts1
class TestDataLock:
"""Verify the data lock works for thread safety."""
def test_lock_exists_and_is_a_lock(self):
assert isinstance(_data_lock, type(threading.Lock()))
def test_concurrent_writes_dont_corrupt(self):
"""Simulate concurrent writes to latest_data through the lock."""
errors = []
def writer(key, value, iterations=100):
try:
for _ in range(iterations):
with _data_lock:
latest_data[key] = value
# Read back immediately — should be our value
assert latest_data[key] == value
except Exception as e:
errors.append(e)
threads = [
threading.Thread(target=writer, args=("test_concurrent", [1, 2, 3])),
threading.Thread(target=writer, args=("test_concurrent", [4, 5, 6])),
threading.Thread(target=writer, args=("test_concurrent", [7, 8, 9])),
]
for t in threads:
t.start()
for t in threads:
t.join()
assert len(errors) == 0, f"Thread safety errors: {errors}"
# Restore default
latest_data["test_concurrent"] = []