Files
anoracleofra-code 668ce16dc7 v0.9.6: InfoNet hashchain, Wormhole gate encryption, mesh reputation, 16 community contributors
Gate messages now propagate via the Infonet hashchain as encrypted blobs — every node syncs them
through normal chain sync while only Gate members with MLS keys can decrypt. Added mesh reputation
system, peer push workers, voluntary Wormhole opt-in for node participation, fork recovery,
killwormhole scripts, obfuscated terminology, and hardened the self-updater to protect encryption
keys and chain state during updates.

New features: Shodan search, train tracking, Sentinel Hub imagery, 8 new intelligence layers,
CCTV expansion to 11,000+ cameras across 6 countries, Mesh Terminal CLI, prediction markets,
desktop-shell scaffold, and comprehensive mesh test suite (215 frontend + backend tests passing).

Community contributors: @wa1id, @AlborzNazari, @adust09, @Xpirix, @imqdcr, @csysp, @suranyami,
@chr0n1x, @johan-martensson, @singularfailure, @smithbh, @OrfeoTerkuci, @deuza, @tm-const,
@Elhard1, @ttulttul
2026-03-26 05:58:04 -06:00

95 lines
2.9 KiB
Python

"""Fetch health registry — tracks per-source success/failure counts and timings."""
import logging
import threading
from datetime import datetime
from typing import Any, Dict, Optional
from services.fetchers._store import _data_lock, source_freshness
# Module-level logger, named after this module.
logger = logging.getLogger(__name__)
# Per-source fetch statistics, keyed by source name. Each value is a dict holding
# the counters ("ok_count", "error_count"), the last success/error timestamps,
# the last error message, and the duration/count fields written by
# record_success() and record_failure().
_health: Dict[str, Dict[str, Any]] = {}
# Held by record_success(), record_failure() and get_health_snapshot() whenever
# they read or modify _health.
_lock = threading.Lock()
def _now_iso() -> str:
    """Return the current UTC time as a naive ISO-8601 string.

    The result carries no UTC offset suffix (e.g. ``2026-03-26T11:58:04.123456``),
    matching the format previously produced by ``datetime.utcnow().isoformat()``.
    """
    # Local import: the module header only imports ``datetime`` from ``datetime``.
    from datetime import timezone

    # datetime.utcnow() is deprecated since Python 3.12. Take an aware UTC "now"
    # and drop the tzinfo so the emitted string keeps its original shape.
    return datetime.now(timezone.utc).replace(tzinfo=None).isoformat()
def _update_source_freshness(source: str, *, ok: bool, error_msg: Optional[str] = None):
    """Mirror the outcome of a fetch into the shared ``source_freshness`` store.

    Args:
        source: Name of the data source whose entry is updated.
        ok: True stamps ``last_ok``; False stamps ``last_error``.
        error_msg: Optional failure text; on failure, a non-empty message is
            stored under ``last_error_msg`` truncated to 200 characters.
    """
    with _data_lock:
        # Start from the existing entry for this source, or a fresh dict.
        record = source_freshness.get(source, {})
        stamp_key = "last_ok" if ok else "last_error"
        record[stamp_key] = _now_iso()
        if not ok and error_msg:
            record["last_error_msg"] = error_msg[:200]
        # Write the entry back so a newly created dict is actually stored.
        source_freshness[source] = record
# Number of successful fetches per source that reported a duration. Guarded by
# _lock. Kept outside the _health entries so the snapshot shape stays unchanged.
_duration_samples: Dict[str, int] = {}


def record_success(source: str, duration_s: Optional[float] = None, count: Optional[int] = None):
    """Record a successful fetch for a source.

    Increments the source's ``ok_count``, stamps ``last_ok``, and then mirrors the
    success into the shared freshness store via ``_update_source_freshness``.

    Args:
        source: Name of the data source.
        duration_s: Fetch duration in seconds. When given, ``last_duration_ms``
            is set and ``avg_duration_ms`` is updated as a running mean, both in
            milliseconds rounded to one decimal.
        count: Optional item count for this fetch, stored as ``last_count``.
    """
    now = _now_iso()
    with _lock:
        entry = _health.setdefault(
            source,
            {
                "ok_count": 0,
                "error_count": 0,
                "last_ok": None,
                "last_error": None,
                "last_error_msg": None,
                "last_duration_ms": None,
                "avg_duration_ms": None,
                "last_count": None,
            },
        )
        entry["ok_count"] += 1
        entry["last_ok"] = now
        if duration_s is not None:
            dur_ms = round(duration_s * 1000, 1)
            entry["last_duration_ms"] = dur_ms
            prev_avg = entry["avg_duration_ms"]
            # Average over the fetches that actually reported a duration, not
            # over ok_count: successes recorded without a duration would
            # otherwise drag the mean toward zero.
            if prev_avg is None:
                # No average stored yet (first sample, or the entry was
                # recreated), so restart the sample count.
                prev_avg = 0.0
                samples = 1
            else:
                samples = _duration_samples.get(source, 0) + 1
            _duration_samples[source] = samples
            entry["avg_duration_ms"] = round(((prev_avg * (samples - 1)) + dur_ms) / samples, 1)
        if count is not None:
            entry["last_count"] = count
    _update_source_freshness(source, ok=True)
def record_failure(source: str, error: Exception, duration_s: Optional[float] = None):
    """Register one failed fetch attempt for ``source``.

    Increments ``error_count``, stamps ``last_error``, keeps the first 200
    characters of ``str(error)`` as ``last_error_msg``, and then mirrors the
    failure into the shared freshness store.

    Args:
        source: Name of the data source.
        error: The exception raised by the fetch.
        duration_s: Optional time spent before failing, in seconds; stored as
            ``last_duration_ms`` (milliseconds, one decimal).
    """
    stamp = _now_iso()
    message = str(error)
    with _lock:
        if source not in _health:
            # First event for this source: start from an all-empty record.
            _health[source] = {
                "ok_count": 0,
                "error_count": 0,
                "last_ok": None,
                "last_error": None,
                "last_error_msg": None,
                "last_duration_ms": None,
                "avg_duration_ms": None,
                "last_count": None,
            }
        stats = _health[source]
        stats["error_count"] += 1
        stats["last_error"] = stamp
        stats["last_error_msg"] = message[:200]
        if duration_s is not None:
            elapsed_ms = round(duration_s * 1000, 1)
            stats["last_duration_ms"] = elapsed_ms
    _update_source_freshness(source, ok=False, error_msg=message)
def get_health_snapshot() -> Dict[str, Dict[str, Any]]:
    """Return a copy of the per-source fetch health table.

    Each per-source dict is shallow-copied while the lock is held, so mutating
    the returned mapping does not alter the live registry.
    """
    snapshot: Dict[str, Dict[str, Any]] = {}
    with _lock:
        for source, stats in _health.items():
            snapshot[source] = dict(stats)
    return snapshot