mirror of
https://github.com/BigBodyCobain/Shadowbroker.git
synced 2026-06-03 21:08:13 +02:00
31ebcb5cd9
Fixes the retry storm that has kept the local node rate-limited (HTTP
429) by the seed peer, as diagnosed earlier in the session. Pre-fix
sequence:
  1. Sync hits the seed peer, gets HTTP 429 (Too Many Requests)
  2. _peer_sync_response stringifies the status into a ValueError
  3. _sync_from_peer catches it; error becomes the str() of the exception
  4. _run_public_sync_cycle calls finish_sync(error=..., failure_backoff_s=60)
  5. next_sync_due_at = now + 60s
  6. After 60s, sync runs again and hits the same upstream, which hasn't
     reset its rate-limit bucket: 429 again. Loop indefinitely.
Net effect: a node that hit one transient 429 would hammer the seed
every 60s forever, keeping the bucket full and never recovering. We
saw this in the live status dump: consecutive_failures=49,
last_sync_ok_at=0, retry storm sustained over the entire uptime.
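To put rough numbers on the storm, the sketch below counts how many sync attempts reach a rate-limited peer in one hour under the pre-fix constant 60s delay versus the doubling schedule described in this commit. The helper names (`attempts_within`, `constant_delay`, `doubling_delay`) are illustrative and not part of the codebase, and the model assumes every attempt fails and the peer sends no Retry-After.

```python
from typing import Callable


def attempts_within(window_s: int, delay_after_failure: Callable[[int], int]) -> int:
    """Count attempts started in [0, window_s) when every attempt fails."""
    now = 0
    failures = 0
    attempts = 0
    while now < window_s:
        attempts += 1
        failures += 1
        now += delay_after_failure(failures)
    return attempts


def constant_delay(failures: int) -> int:
    # Pre-fix model: the same 60s delay regardless of failure count.
    return 60


def doubling_delay(failures: int, base_s: int = 60, cap_s: int = 1800) -> int:
    # Post-fix model: base * 2^(failures-1), capped at cap_s.
    return min(base_s * 2 ** (failures - 1), cap_s)


pre_fix = attempts_within(3600, constant_delay)
post_fix = attempts_within(3600, doubling_delay)
print(pre_fix, post_fix)  # 60 6
```

Under these assumptions the doubling schedule sends a tenth of the requests in the first hour, which is what gives the upstream bucket a chance to drain.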
What changed
------------
services/mesh/mesh_infonet_sync_support.py
  * New typed exception PeerSyncRateLimited carries the parsed
    Retry-After value out of the HTTP layer instead of stringifying
    everything into a generic ValueError.
  * New parse_retry_after_header() handles both RFC 7231 §7.1.3
    forms (delay-seconds and HTTP-date). Clamped at 1 hour so a
    hostile peer can't silence us for days.
  * New _failure_backoff_seconds() helper computes the next delay
    as max(exponential, retry_after_s). Schedule with default
    base=60s, cap=1800s:
      failure 1  -> 60s    (preserves pre-fix for transient blips)
      failure 2  -> 120s
      failure 3  -> 240s
      failure 4  -> 480s
      failure 5  -> 960s
      failure 6+ -> 1800s  (capped at 30 min)
    cap_s=0 explicitly disables exponential entirely; operators
    who want pure-Retry-After behavior have that option.
  * finish_sync now accepts retry_after_s and failure_backoff_cap_s
    kwargs. Backward-compatible: existing callers that don't pass
    retry_after_s get the same first-failure delay as before (the
    base value); only repeat failures grow.
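The interplay between the exponential ladder and Retry-After can be checked with a few lines of arithmetic. The function below, `next_delay`, is a simplified restatement written for this example (it is not the module's `_failure_backoff_seconds`), using the default base of 60s and cap of 1800s quoted above.

```python
def next_delay(failures: int, retry_after_s: int = 0, base_s: int = 60, cap_s: int = 1800) -> int:
    """Simplified restatement of max(exponential, retry_after_s)."""
    if cap_s == 0:
        # Exponential ladder disabled: only Retry-After contributes.
        return retry_after_s
    exponential = min(base_s * 2 ** (failures - 1), cap_s) if failures > 0 else 0
    return max(exponential, retry_after_s)


ladder = [next_delay(n) for n in range(1, 8)]
print(ladder)  # [60, 120, 240, 480, 960, 1800, 1800]

# A 600s Retry-After dominates early; later the ladder overtakes it.
print(next_delay(1, retry_after_s=600))  # 600
print(next_delay(5, retry_after_s=600))  # 960

# cap_s=0 leaves Retry-After as the only source of delay.
print(next_delay(4, retry_after_s=90, cap_s=0))  # 90
```

Taking the maximum rather than the sum means a peer's explicit request is never undercut, while a peer that sends no header still sees the request rate fall.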
main.py
  * _peer_sync_response detects 429 specifically, parses the
    Retry-After header, raises PeerSyncRateLimited(retry_after_s=N).
    Includes the response body prefix in the message so the
    operator's last_error finally shows something useful.
  * _sync_from_peer now returns (ok, error, forked, retry_after_s);
    the 4th tuple element is non-zero only when the upstream sent a
    parseable Retry-After. Its arguments are unchanged, but the
    return tuple grew from three elements to four, so the lone
    caller in _run_public_sync_cycle was updated in the same commit.
  * _run_public_sync_cycle forwards retry_after_s into finish_sync.
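Since main.py is not shown on this page, the following is only a sketch of how the 429 path described above might fit together. Everything in it is hypothetical: `FakeResponse`, `check_peer_response`, `sync_from_peer_sketch`, the 120-character body prefix, and the integer-only Retry-After handling are stand-ins, and the exception class is a local copy so the example runs on its own.

```python
from dataclasses import dataclass, field


class PeerSyncRateLimited(Exception):
    """Local stand-in mirroring the exception this commit adds."""

    def __init__(self, message: str, retry_after_s: int = 0):
        super().__init__(message)
        self.retry_after_s = max(0, int(retry_after_s or 0))


@dataclass
class FakeResponse:
    # Hypothetical response shape; the real HTTP client in main.py may differ.
    status_code: int
    headers: dict = field(default_factory=dict)
    text: str = ""


def check_peer_response(peer_url: str, response: FakeResponse) -> None:
    """Raise a typed error on 429 instead of a generic ValueError."""
    if response.status_code == 429:
        raw = str(response.headers.get("Retry-After", "")).strip()
        # Simplification: only the delay-seconds form is handled here.
        retry_after_s = min(int(raw), 3600) if raw.isascii() and raw.isdigit() else 0
        body_prefix = response.text[:120]  # assumed prefix length
        raise PeerSyncRateLimited(
            f"HTTP 429 from {peer_url} (retry_after={retry_after_s}s): {body_prefix}",
            retry_after_s=retry_after_s,
        )
    if response.status_code != 200:
        raise ValueError(f"HTTP {response.status_code} from {peer_url}")


def sync_from_peer_sketch(peer_url: str, response: FakeResponse) -> tuple:
    """Return (ok, error, forked, retry_after_s), the 4-tuple described above."""
    try:
        check_peer_response(peer_url, response)
    except PeerSyncRateLimited as exc:
        return False, str(exc), False, exc.retry_after_s
    except ValueError as exc:
        return False, str(exc), False, 0
    return True, "", False, 0


limited = FakeResponse(429, {"Retry-After": "120"}, "slow down")
result = sync_from_peer_sketch("https://seed.example", limited)
print(result)
```

The point of the typed exception is visible in the two `except` branches: only the rate-limited path can hand a non-zero `retry_after_s` to the caller, so other failures fall back to the exponential schedule.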
Tests
-----
backend/tests/mesh/test_infonet_sync_429_backoff.py: 17 new tests
  TestParseRetryAfter (7):
    - integer seconds form
    - HTTP-date form (computed as seconds-from-now)
    - HTTP-date in the past returns 0
    - empty / whitespace returns 0
    - malformed returns 0
    - clamps to 1 hour (hostile-peer cap)
    - negative returns 0
  TestFailureBackoffSeconds (5):
    - exponential growth schedule pins each level
    - retry_after wins when larger than exponential
    - exponential wins when larger than retry_after
    - cap_s=0 disables exponential entirely
    - zero inputs return zero
  TestFinishSyncBackoff (5):
    - first failure uses base unchanged (pre-fix back-compat)
    - consecutive_failures actually grow the delay
    - retry_after honored at low failure count
    - success resets consecutive_failures
    - last_error carries the HTTP status / Retry-After detail
All 24 existing sync-support / status-gate tests still pass. Other
failures in tests/mesh/ are pre-existing on origin/main and unrelated
to this change (verified by running the same tests against the
user's main worktree without these edits).
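The Retry-After cases listed under TestParseRetryAfter can be illustrated with a table-driven check. The test file itself is not shown here, so this is a sketch under assumptions: `parse_retry_after` is a compact stand-in for the module's parser, and the fixed clock value is arbitrary.

```python
from datetime import datetime, timedelta, timezone
from email.utils import format_datetime, parsedate_to_datetime


def parse_retry_after(value: str, now: float) -> int:
    """Stand-in parser covering both Retry-After forms, clamped to 1 hour."""
    value = (value or "").strip()
    if value.isascii() and value.isdigit():
        return min(int(value), 3600)
    try:
        target = parsedate_to_datetime(value)
    except (TypeError, ValueError):
        return 0
    if target is None:
        return 0
    if target.tzinfo is None:
        target = target.replace(tzinfo=timezone.utc)
    return min(max(0, int(target.timestamp() - now)), 3600)


# Fixed clock so the HTTP-date cases are deterministic.
fixed_now = datetime(2026, 1, 1, 12, 0, 0, tzinfo=timezone.utc)
now_ts = fixed_now.timestamp()
in_five_min = format_datetime(fixed_now + timedelta(minutes=5), usegmt=True)
in_the_past = format_datetime(fixed_now - timedelta(minutes=5), usegmt=True)

cases = {
    "120": 120,        # delay-seconds form
    in_five_min: 300,  # HTTP-date form, seconds from now
    in_the_past: 0,    # already elapsed
    "": 0,             # empty
    "soon": 0,         # malformed
    "999999": 3600,    # clamped to the 1 hour cap
    "-5": 0,           # negative is not a valid delay
}
results = {header: parse_retry_after(header, now_ts) for header in cases}
print(results == cases)
```

Passing the clock in as an argument, as the real `parse_retry_after_header(..., now=...)` signature also allows, is what makes the HTTP-date cases testable without patching `time.time()`.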
What the operator sees after this lands + a docker rebuild
----------------------------------------------------------
With the live 429 storm we diagnosed:
  Pre-fix:  consecutive_failures keeps climbing 1/min forever,
            last_error empty or generic
  Post-fix: consecutive_failures grows, next_sync_due_at backs off
            exponentially (ladder capped at 30 min; a larger
            Retry-After, itself clamped to 1 hour, still wins),
            last_error explicitly carries
            "HTTP 429 from <peer> (retry_after=Ns): <body>" so the
            operator can see what's actually wrong. Once the upstream
            bucket drains and a sync succeeds, consecutive_failures
            resets to 0 and the schedule returns to the normal 300s
            interval.
281 lines
9.7 KiB
Python
from __future__ import annotations

import time
from dataclasses import asdict, dataclass
from email.utils import parsedate_to_datetime
from datetime import timezone

from services.mesh.mesh_peer_store import PeerRecord


class PeerSyncRateLimited(Exception):
    """Upstream peer returned HTTP 429 (Too Many Requests).

    Carries the ``Retry-After`` header value (parsed to seconds) so
    the caller can pass it to ``finish_sync(retry_after_s=...)`` and
    actually wait that long instead of hammering the upstream every
    60s and keeping its rate-limit bucket full.

    ``retry_after_s`` is 0 when the upstream didn't provide a header.
    Caller should still apply the exponential backoff in that case.
    """

    def __init__(self, message: str, retry_after_s: int = 0, status: int = 429):
        super().__init__(message)
        self.retry_after_s = max(0, int(retry_after_s or 0))
        self.status = int(status or 429)


def parse_retry_after_header(header_value: str, *, now: float | None = None) -> int:
    """Parse the ``Retry-After`` HTTP header.

    Two valid forms per RFC 7231 §7.1.3:

    * Delay-seconds: a non-negative integer (e.g. ``Retry-After: 120``)
    * HTTP-date: an absolute time (e.g. ``Retry-After: Wed, 21 Oct 2026 07:28:00 GMT``)

    Returns the wait in **seconds from now**. Unparseable / empty headers
    return 0 (caller falls back to exponential backoff). Clamped at a
    sane upper bound (1 hour) so a typo'd or hostile peer can't pin us
    silent for days.
    """
    value = str(header_value or "").strip()
    if not value:
        return 0
    upper_bound = 3600  # never trust a peer to silence us > 1h
    # Form 1: pure integer seconds. ``isascii`` guards against Unicode
    # digits such as "²" that pass ``isdigit`` but make ``int`` raise.
    if value.isascii() and value.isdigit():
        return min(max(0, int(value)), upper_bound)
    # Form 2: HTTP-date.
    try:
        target = parsedate_to_datetime(value)
        if target is None:
            return 0
        if target.tzinfo is None:
            target = target.replace(tzinfo=timezone.utc)
        current = float(now if now is not None else time.time())
        delta = int(target.timestamp() - current)
        return min(max(0, delta), upper_bound)
    except (TypeError, ValueError):
        return 0


@dataclass(frozen=True)
class SyncWorkerState:
    last_sync_started_at: int = 0
    last_sync_finished_at: int = 0
    last_sync_ok_at: int = 0
    next_sync_due_at: int = 0
    last_peer_url: str = ""
    last_error: str = ""
    last_outcome: str = "idle"
    current_head: str = ""
    fork_detected: bool = False
    consecutive_failures: int = 0

    def to_dict(self) -> dict[str, object]:
        return asdict(self)


def eligible_sync_peers(records: list[PeerRecord], *, now: float | None = None) -> list[PeerRecord]:
    current_time = int(now if now is not None else time.time())
    candidates = [
        record
        for record in records
        if record.bucket == "sync" and record.enabled and int(record.cooldown_until or 0) <= current_time
    ]

    def _seed_priority(record: PeerRecord) -> int:
        role = str(record.role or "").strip().lower()
        source = str(record.source or "").strip().lower()
        if role == "seed" and source in {"bundle", "bootstrap_promoted"}:
            return 0
        return 1

    return sorted(
        candidates,
        key=lambda record: (
            -int(record.last_sync_ok_at or 0),
            _seed_priority(record),
            int(record.failure_count or 0),
            int(record.added_at or 0),
            record.peer_url,
        ),
    )


def begin_sync(
    state: SyncWorkerState,
    *,
    peer_url: str = "",
    current_head: str = "",
    now: float | None = None,
) -> SyncWorkerState:
    timestamp = int(now if now is not None else time.time())
    return SyncWorkerState(
        last_sync_started_at=timestamp,
        last_sync_finished_at=state.last_sync_finished_at,
        last_sync_ok_at=state.last_sync_ok_at,
        next_sync_due_at=state.next_sync_due_at,
        last_peer_url=peer_url or state.last_peer_url,
        last_error="",
        last_outcome="running",
        current_head=current_head or state.current_head,
        fork_detected=False,
        consecutive_failures=state.consecutive_failures,
    )


def _failure_backoff_seconds(
    *,
    base_backoff_s: int,
    consecutive_failures: int,
    retry_after_s: int,
    cap_s: int = 1800,
) -> int:
    """Compute the next-attempt delay after a failed sync.

    Two inputs combine:

    * ``retry_after_s``: when an upstream peer answered HTTP 429
      with a ``Retry-After`` header, we wait at least that long.
      Continuing to hammer the upstream every 60s is the bug this fix
      exists to close: it keeps the upstream's rate-limit bucket full
      indefinitely and no sync ever lands.

    * Exponential growth on ``consecutive_failures``: even without an
      explicit Retry-After, repeated failures should slow us down. The
      first failure waits ``base`` (preserves pre-fix behavior for
      one-off blips). Each subsequent failure doubles the wait, capped
      to ``cap_s`` (default 30 minutes). With base=60 and cap=1800,
      the schedule is 60s → 120s → 240s → 480s → 960s → 1800s →
      1800s → … .

    The actual delay is the MAX of the two: whichever asks for more
    patience wins. ``retry_after_s == 0`` (no header) falls back to
    pure exponential. An aggressive ``Retry-After`` (say 600s while
    we're only at 1 failure) wins over the exponential ladder.
    """
    base = max(0, int(base_backoff_s or 0))
    failures = max(0, int(consecutive_failures or 0))
    cap = max(0, int(cap_s or 0))
    retry_after = max(0, int(retry_after_s or 0))
    # ``cap_s=0`` explicitly disables the exponential ladder entirely;
    # operators who want "honor Retry-After only" behavior can set
    # this. The default cap of 1800s saturates the ladder at the 6th
    # failure for base=60.
    if cap == 0:
        return retry_after
    # 2^(failures-1), so failure #1 = base (preserves the pre-fix
    # default for transient blips), failure #2 = 2*base, etc. Cap on
    # the exponent (16) is defense against integer overflow on a
    # hostile or very large failures counter.
    if base > 0 and failures > 0:
        exponent = min(max(0, failures - 1), 16)
        grown = base * (2 ** exponent)
    else:
        grown = 0
    exponential = min(max(0, grown), cap)
    return max(exponential, retry_after)


def finish_sync(
    state: SyncWorkerState,
    *,
    ok: bool,
    peer_url: str = "",
    current_head: str = "",
    error: str = "",
    fork_detected: bool = False,
    now: float | None = None,
    interval_s: int = 300,
    failure_backoff_s: int = 60,
    retry_after_s: int = 0,
    failure_backoff_cap_s: int = 1800,
) -> SyncWorkerState:
    """Finalise a sync attempt and compute when the next one should run.

    New args (added for the 429 retry storm fix):

    * ``retry_after_s``: if the peer responded with HTTP 429 + a
      ``Retry-After`` header, pass that value here. ``finish_sync``
      will use ``max(exponential, retry_after_s)`` for the delay so
      we never hammer a peer that asked us to back off.
    * ``failure_backoff_cap_s``: upper bound on the exponential
      ladder. Default 1800 (30 min), which keeps a sync queue from
      going silent for hours while still cutting the request rate to
      something the upstream can absorb.

    The pre-fix behavior (constant 60s on every failure) is recoverable
    by passing ``failure_backoff_cap_s`` equal to ``failure_backoff_s``
    with ``retry_after_s=0``, but there's no reason to. Note that
    ``failure_backoff_cap_s=0`` does not restore it: that disables the
    ladder and leaves ``retry_after_s`` as the only delay (0 when no
    header was sent, so the next attempt is due immediately).
    """
    timestamp = int(now if now is not None else time.time())
    if ok:
        return SyncWorkerState(
            last_sync_started_at=state.last_sync_started_at,
            last_sync_finished_at=timestamp,
            last_sync_ok_at=timestamp,
            next_sync_due_at=timestamp + max(0, int(interval_s or 0)),
            last_peer_url=peer_url or state.last_peer_url,
            last_error="",
            last_outcome="ok",
            current_head=current_head or state.current_head,
            fork_detected=bool(fork_detected),
            consecutive_failures=0,
        )

    next_failures = state.consecutive_failures + 1
    delay_s = _failure_backoff_seconds(
        base_backoff_s=failure_backoff_s,
        consecutive_failures=next_failures,
        retry_after_s=retry_after_s,
        cap_s=failure_backoff_cap_s,
    )

    return SyncWorkerState(
        last_sync_started_at=state.last_sync_started_at,
        last_sync_finished_at=timestamp,
        last_sync_ok_at=state.last_sync_ok_at,
        next_sync_due_at=timestamp + delay_s,
        last_peer_url=peer_url or state.last_peer_url,
        last_error=str(error or "").strip(),
        last_outcome="fork" if fork_detected else "error",
        current_head=current_head or state.current_head,
        fork_detected=bool(fork_detected),
        consecutive_failures=next_failures,
    )


def finish_solo_sync(
    state: SyncWorkerState,
    *,
    current_head: str = "",
    now: float | None = None,
    interval_s: int = 300,
) -> SyncWorkerState:
    timestamp = int(now if now is not None else time.time())
    return SyncWorkerState(
        last_sync_started_at=state.last_sync_started_at,
        last_sync_finished_at=timestamp,
        last_sync_ok_at=state.last_sync_ok_at,
        next_sync_due_at=timestamp + max(0, int(interval_s or 0)),
        last_peer_url="",
        last_error="",
        last_outcome="solo",
        current_head=current_head or state.current_head,
        fork_detected=False,
        consecutive_failures=0,
    )


def should_run_sync(
    state: SyncWorkerState,
    *,
    now: float | None = None,
) -> bool:
    current_time = int(now if now is not None else time.time())
    if state.last_outcome == "running":
        return False
    return int(state.next_sync_due_at or 0) <= current_time