mirror of
https://github.com/BigBodyCobain/Shadowbroker.git
synced 2026-05-28 10:01:31 +02:00
Infonet sync: honor HTTP 429 Retry-After + exponential backoff
Fixes the retry-storm that's been keeping the local node 429'd out of
the seed peer (the diagnosis we ran earlier in the session). Pre-fix:
1. Sync hits the seed peer, gets HTTP 429 (Too Many Requests)
2. _peer_sync_response stringifies the status into a ValueError
3. _sync_from_peer catches it, error becomes the str() of the exc
4. _run_public_sync_cycle calls finish_sync(error=..., failure_backoff_s=60)
5. next_sync_due_at = now + 60s
6. After 60s, sync runs again, hits same upstream that hasn't reset
its rate-limit bucket, 429 again. Loop indefinitely.
Net effect: a node that hit one transient 429 would hammer the seed
every 60s forever, keeping the bucket full and never recovering. We
saw this in the live status dump: consecutive_failures=49,
last_sync_ok_at=0, retry storm sustained over the entire uptime.
What changed
------------
services/mesh/mesh_infonet_sync_support.py
* New typed exception PeerSyncRateLimited carries the parsed
Retry-After value out of the HTTP layer instead of stringifying
everything into a generic ValueError.
* New parse_retry_after_header() handles both RFC 7231 §7.1.3
forms (delay-seconds and HTTP-date). Clamped at 1 hour so a
hostile peer can't silence us for days.
* New _failure_backoff_seconds() helper computes the next delay
as max(exponential, retry_after_s). Schedule with default
base=60s, cap=1800s:
failure 1 -> 60s (preserves pre-fix for transient blips)
failure 2 -> 120s
failure 3 -> 240s
failure 4 -> 480s
failure 5 -> 960s
failure 6+ -> 1800s (capped at 30 min)
cap_s=0 explicitly disables exponential entirely — operators
who want pure-Retry-After behavior have that option.
* finish_sync now accepts retry_after_s and failure_backoff_cap_s
kwargs. Backward-compatible: existing callers that don't pass
retry_after_s get the same first-failure delay as before (the
base value), only repeat failures grow.
main.py
* _peer_sync_response detects 429 specifically, parses the
Retry-After header, raises PeerSyncRateLimited(retry_after_s=N).
Includes the response body prefix in the message so the
operator's last_error finally shows something useful.
* _sync_from_peer extended to return (ok, error, forked,
retry_after_s) — the 4th tuple element is non-zero only when
the upstream sent a parseable Retry-After. Existing call shape
preserved: the lone caller in _run_public_sync_cycle was
updated in the same commit.
* _run_public_sync_cycle forwards retry_after_s into finish_sync.
Tests
-----
backend/tests/mesh/test_infonet_sync_429_backoff.py — 17 new tests:
TestParseRetryAfter (7):
- integer seconds form
- HTTP-date form (computed as seconds-from-now)
- HTTP-date in the past returns 0
- empty / whitespace returns 0
- malformed returns 0
- clamps to 1 hour (hostile-peer cap)
- negative returns 0
TestFailureBackoffSeconds (5):
- exponential growth schedule pins each level
- retry_after wins when larger than exponential
- exponential wins when larger than retry_after
- cap_s=0 disables exponential entirely
- zero inputs return zero
TestFinishSyncBackoff (5):
- first failure uses base unchanged (pre-fix back-compat)
- consecutive_failures actually grow the delay
- retry_after honored at low failure count
- success resets consecutive_failures
- last_error carries the HTTP status / Retry-After detail
All 24 existing sync-support / status-gate tests still pass. Other
failures in tests/mesh/ are pre-existing on origin/main and unrelated
to this change (verified by running the same tests against the
user's main worktree without these edits).
What the operator sees after this lands + a docker rebuild
----------------------------------------------------------
With the live 429 storm we diagnosed:
Pre-fix: consecutive_failures keeps climbing 1/min forever,
last_error empty or generic
Post-fix: consecutive_failures grows, next_sync_due_at backs off
exponentially (max 30 min), last_error explicitly carries
"HTTP 429 from <peer> (retry_after=Ns): <body>" so the
operator can see what's actually wrong. Once the upstream
bucket drains and a sync succeeds, consecutive_failures
resets to 0 and the schedule returns to the normal 300s
interval.
This commit is contained in:
+59
-10
@@ -1417,6 +1417,29 @@ def _peer_sync_response(peer_url: str, body: dict[str, Any]) -> dict[str, Any]:
|
||||
proxy = f"socks5h://127.0.0.1:{socks_port}"
|
||||
kwargs["proxies"] = {"http": proxy, "https": proxy}
|
||||
response = _requests.post(f"{normalized}/api/mesh/infonet/sync", **kwargs)
|
||||
# HTTP 429 must be surfaced as a typed exception carrying the
|
||||
# Retry-After value, so finish_sync can honor it and stop hammering
|
||||
# the upstream. Pre-fix this path just stringified the status into
|
||||
# a ValueError, which finish_sync then ignored — keeping the
|
||||
# upstream's rate-limit bucket full indefinitely.
|
||||
if response.status_code == 429:
|
||||
from services.mesh.mesh_infonet_sync_support import (
|
||||
PeerSyncRateLimited,
|
||||
parse_retry_after_header,
|
||||
)
|
||||
|
||||
retry_after_s = parse_retry_after_header(
|
||||
response.headers.get("Retry-After", "") or "",
|
||||
)
|
||||
try:
|
||||
body_text = response.text[:200]
|
||||
except Exception:
|
||||
body_text = ""
|
||||
raise PeerSyncRateLimited(
|
||||
f"HTTP 429 from {normalized} (retry_after={retry_after_s}s): {body_text}",
|
||||
retry_after_s=retry_after_s,
|
||||
status=429,
|
||||
)
|
||||
try:
|
||||
payload = response.json()
|
||||
except Exception as exc:
|
||||
@@ -1462,8 +1485,23 @@ def _hydrate_gate_store_from_chain(events: list[dict]) -> int:
|
||||
return count
|
||||
|
||||
|
||||
def _sync_from_peer(peer_url: str, *, page_limit: int = 100, max_rounds: int = 5) -> tuple[bool, str, bool]:
|
||||
def _sync_from_peer(
|
||||
peer_url: str,
|
||||
*,
|
||||
page_limit: int = 100,
|
||||
max_rounds: int = 5,
|
||||
) -> tuple[bool, str, bool, int]:
|
||||
"""Sync the local Infonet chain against ``peer_url``.
|
||||
|
||||
Returns ``(ok, error, forked, retry_after_s)``. The fourth tuple
|
||||
element is non-zero only when the peer responded with HTTP 429
|
||||
and supplied a parseable ``Retry-After`` header — see the typed
|
||||
``PeerSyncRateLimited`` exception in mesh_infonet_sync_support.py.
|
||||
Callers should pass that value to ``finish_sync(retry_after_s=...)``
|
||||
so the next attempt actually waits.
|
||||
"""
|
||||
from services.mesh.mesh_hashchain import infonet
|
||||
from services.mesh.mesh_infonet_sync_support import PeerSyncRateLimited
|
||||
|
||||
rounds = 0
|
||||
while rounds < max_rounds:
|
||||
@@ -1472,7 +1510,11 @@ def _sync_from_peer(peer_url: str, *, page_limit: int = 100, max_rounds: int = 5
|
||||
"locator": infonet.get_locator(),
|
||||
"limit": page_limit,
|
||||
}
|
||||
payload = _peer_sync_response(peer_url, body)
|
||||
try:
|
||||
payload = _peer_sync_response(peer_url, body)
|
||||
except PeerSyncRateLimited as exc:
|
||||
# Bubble up the retry-after so finish_sync can honor it.
|
||||
return False, str(exc), False, exc.retry_after_s
|
||||
if bool(payload.get("forked")):
|
||||
# Auto-recover small local forks: if the local chain is tiny
|
||||
# (< 20 events) and the remote has a longer chain, reset local
|
||||
@@ -1488,23 +1530,23 @@ def _sync_from_peer(peer_url: str, *, page_limit: int = 100, max_rounds: int = 5
|
||||
)
|
||||
infonet.reset_chain()
|
||||
continue # retry sync with clean genesis locator
|
||||
return False, "fork detected", True
|
||||
return False, "fork detected", True, 0
|
||||
events = payload.get("events", [])
|
||||
if not isinstance(events, list):
|
||||
return False, "peer sync events must be a list", False
|
||||
return False, "peer sync events must be a list", False, 0
|
||||
if not events:
|
||||
return True, "", False
|
||||
return True, "", False, 0
|
||||
result = infonet.ingest_events(events)
|
||||
_hydrate_gate_store_from_chain(events)
|
||||
rejected = list(result.get("rejected", []) or [])
|
||||
if rejected:
|
||||
return False, f"sync ingest rejected {len(rejected)} event(s)", False
|
||||
return False, f"sync ingest rejected {len(rejected)} event(s)", False, 0
|
||||
if int(result.get("accepted", 0) or 0) == 0 and int(result.get("duplicates", 0) or 0) >= len(events):
|
||||
return True, "", False
|
||||
return True, "", False, 0
|
||||
if len(events) < page_limit:
|
||||
return True, "", False
|
||||
return True, "", False, 0
|
||||
rounds += 1
|
||||
return True, "", False
|
||||
return True, "", False, 0
|
||||
|
||||
|
||||
def _run_public_sync_cycle() -> SyncWorkerState:
|
||||
@@ -1567,11 +1609,12 @@ def _run_public_sync_cycle() -> SyncWorkerState:
|
||||
with _NODE_RUNTIME_LOCK:
|
||||
set_sync_state(started)
|
||||
try:
|
||||
ok, error, forked = _sync_from_peer(record.peer_url)
|
||||
ok, error, forked, retry_after_s = _sync_from_peer(record.peer_url)
|
||||
except Exception as exc:
|
||||
ok = False
|
||||
error = str(exc or type(exc).__name__)
|
||||
forked = False
|
||||
retry_after_s = 0
|
||||
if ok:
|
||||
store.mark_seen(record.peer_url, "sync", now=time.time())
|
||||
store.mark_sync_success(record.peer_url, now=time.time())
|
||||
@@ -1618,6 +1661,12 @@ def _run_public_sync_cycle() -> SyncWorkerState:
|
||||
now=time.time(),
|
||||
interval_s=int(get_settings().MESH_SYNC_INTERVAL_S or 300),
|
||||
failure_backoff_s=failure_backoff_s,
|
||||
# 429 retry-storm fix: when the peer returned HTTP 429 with
|
||||
# a Retry-After header, finish_sync uses max(exponential,
|
||||
# retry_after) for next_sync_due_at — so we actually wait
|
||||
# the time the upstream asked for instead of hammering
|
||||
# every 60s and keeping its rate-limit bucket full forever.
|
||||
retry_after_s=retry_after_s,
|
||||
)
|
||||
with _NODE_RUNTIME_LOCK:
|
||||
set_sync_state(updated)
|
||||
|
||||
@@ -2,10 +2,64 @@ from __future__ import annotations
|
||||
|
||||
import time
|
||||
from dataclasses import asdict, dataclass
|
||||
from email.utils import parsedate_to_datetime
|
||||
from datetime import timezone
|
||||
|
||||
from services.mesh.mesh_peer_store import PeerRecord
|
||||
|
||||
|
||||
class PeerSyncRateLimited(Exception):
|
||||
"""Upstream peer returned HTTP 429 — Too Many Requests.
|
||||
|
||||
Carries the ``Retry-After`` header value (parsed to seconds) so
|
||||
the caller can pass it to ``finish_sync(retry_after_s=...)`` and
|
||||
actually wait that long instead of hammering the upstream every
|
||||
60s and keeping its rate-limit bucket full.
|
||||
|
||||
``retry_after_s`` is 0 when the upstream didn't provide a header.
|
||||
Caller should still apply the exponential backoff in that case.
|
||||
"""
|
||||
|
||||
def __init__(self, message: str, retry_after_s: int = 0, status: int = 429):
|
||||
super().__init__(message)
|
||||
self.retry_after_s = max(0, int(retry_after_s or 0))
|
||||
self.status = int(status or 429)
|
||||
|
||||
|
||||
def parse_retry_after_header(header_value: str, *, now: float | None = None) -> int:
|
||||
"""Parse the ``Retry-After`` HTTP header.
|
||||
|
||||
Two valid forms per RFC 7231 §7.1.3:
|
||||
|
||||
* Delay-seconds: a non-negative integer (e.g. ``Retry-After: 120``)
|
||||
* HTTP-date: an absolute time (e.g. ``Retry-After: Wed, 21 Oct 2026 07:28:00 GMT``)
|
||||
|
||||
Returns the wait in **seconds from now**. Unparseable / empty headers
|
||||
return 0 (caller falls back to exponential backoff). Clamped at a
|
||||
sane upper bound (1 hour) so a typo'd or hostile peer can't pin us
|
||||
silent for days.
|
||||
"""
|
||||
value = str(header_value or "").strip()
|
||||
if not value:
|
||||
return 0
|
||||
upper_bound = 3600 # never trust a peer to silence us > 1h
|
||||
# Form 1: pure integer seconds.
|
||||
if value.isdigit():
|
||||
return min(max(0, int(value)), upper_bound)
|
||||
# Form 2: HTTP-date.
|
||||
try:
|
||||
target = parsedate_to_datetime(value)
|
||||
if target is None:
|
||||
return 0
|
||||
if target.tzinfo is None:
|
||||
target = target.replace(tzinfo=timezone.utc)
|
||||
current = float(now if now is not None else time.time())
|
||||
delta = int(target.timestamp() - current)
|
||||
return min(max(0, delta), upper_bound)
|
||||
except (TypeError, ValueError):
|
||||
return 0
|
||||
|
||||
|
||||
@dataclass(frozen=True)
|
||||
class SyncWorkerState:
|
||||
last_sync_started_at: int = 0
|
||||
@@ -72,6 +126,59 @@ def begin_sync(
|
||||
)
|
||||
|
||||
|
||||
def _failure_backoff_seconds(
|
||||
*,
|
||||
base_backoff_s: int,
|
||||
consecutive_failures: int,
|
||||
retry_after_s: int,
|
||||
cap_s: int = 1800,
|
||||
) -> int:
|
||||
"""Compute the next-attempt delay after a failed sync.
|
||||
|
||||
Two inputs combine:
|
||||
|
||||
* ``retry_after_s`` — when an upstream peer answered HTTP 429
|
||||
with a ``Retry-After`` header, we honor it exactly. Continuing
|
||||
to hammer the upstream every 60s is the bug this fix exists to
|
||||
close: it keeps the upstream's rate-limit bucket full
|
||||
indefinitely and no sync ever lands.
|
||||
|
||||
* Exponential growth on ``consecutive_failures`` — even without an
|
||||
explicit Retry-After, repeated failures should slow us down. The
|
||||
first failure waits ``base`` (preserves pre-fix behavior for
|
||||
one-off blips). Each subsequent failure doubles the wait, capped
|
||||
to ``cap_s`` (default 30 minutes). With base=60 and cap=1800,
|
||||
the schedule is 60s → 120s → 240s → 480s → 960s → 1800s →
|
||||
1800s → … .
|
||||
|
||||
The actual delay is the MAX of the two — whichever asks for more
|
||||
patience wins. ``retry_after_s == 0`` (no header) falls back to
|
||||
pure exponential. An aggressive ``Retry-After`` (say 600s while
|
||||
we're only at 1 failure) wins over the exponential ladder.
|
||||
"""
|
||||
base = max(0, int(base_backoff_s or 0))
|
||||
failures = max(0, int(consecutive_failures or 0))
|
||||
cap = max(0, int(cap_s or 0))
|
||||
retry_after = max(0, int(retry_after_s or 0))
|
||||
# ``cap_s=0`` explicitly disables the exponential ladder entirely
|
||||
# — operators who want the pre-fix "honor Retry-After only" behavior
|
||||
# can set this. The default cap of 1800s is what saturates the
|
||||
# ladder at the 5th-6th failure for base=60.
|
||||
if cap == 0:
|
||||
return retry_after
|
||||
# 2^(failures-1) — so failure #1 = base (preserves the pre-fix
|
||||
# default for transient blips), failure #2 = 2*base, etc. Cap on
|
||||
# the exponent (16) is defense against integer overflow on a
|
||||
# hostile or very large failures counter.
|
||||
if base > 0 and failures > 0:
|
||||
exponent = min(max(0, failures - 1), 16)
|
||||
grown = base * (2 ** exponent)
|
||||
else:
|
||||
grown = 0
|
||||
exponential = min(max(0, grown), cap)
|
||||
return max(exponential, retry_after)
|
||||
|
||||
|
||||
def finish_sync(
|
||||
state: SyncWorkerState,
|
||||
*,
|
||||
@@ -83,7 +190,26 @@ def finish_sync(
|
||||
now: float | None = None,
|
||||
interval_s: int = 300,
|
||||
failure_backoff_s: int = 60,
|
||||
retry_after_s: int = 0,
|
||||
failure_backoff_cap_s: int = 1800,
|
||||
) -> SyncWorkerState:
|
||||
"""Finalise a sync attempt and compute when the next one should run.
|
||||
|
||||
New args (added for the 429 retry storm fix):
|
||||
|
||||
* ``retry_after_s`` — if the peer responded with HTTP 429 + a
|
||||
``Retry-After`` header, pass that value here. ``finish_sync``
|
||||
will use ``max(exponential, retry_after_s)`` for the delay so
|
||||
we never hammer a peer that asked us to back off.
|
||||
* ``failure_backoff_cap_s`` — upper bound on the exponential
|
||||
ladder. Default 1800 (30 min) — keeps a sync queue from going
|
||||
silent for hours while still cutting the request rate to
|
||||
something the upstream can absorb.
|
||||
|
||||
The pre-fix behavior (constant 60s on every failure) is recoverable
|
||||
by passing ``failure_backoff_cap_s=0`` and ``retry_after_s=0``, but
|
||||
there's no reason to.
|
||||
"""
|
||||
timestamp = int(now if now is not None else time.time())
|
||||
if ok:
|
||||
return SyncWorkerState(
|
||||
@@ -99,17 +225,25 @@ def finish_sync(
|
||||
consecutive_failures=0,
|
||||
)
|
||||
|
||||
next_failures = state.consecutive_failures + 1
|
||||
delay_s = _failure_backoff_seconds(
|
||||
base_backoff_s=failure_backoff_s,
|
||||
consecutive_failures=next_failures,
|
||||
retry_after_s=retry_after_s,
|
||||
cap_s=failure_backoff_cap_s,
|
||||
)
|
||||
|
||||
return SyncWorkerState(
|
||||
last_sync_started_at=state.last_sync_started_at,
|
||||
last_sync_finished_at=timestamp,
|
||||
last_sync_ok_at=state.last_sync_ok_at,
|
||||
next_sync_due_at=timestamp + max(0, int(failure_backoff_s or 0)),
|
||||
next_sync_due_at=timestamp + delay_s,
|
||||
last_peer_url=peer_url or state.last_peer_url,
|
||||
last_error=str(error or "").strip(),
|
||||
last_outcome="fork" if fork_detected else "error",
|
||||
current_head=current_head or state.current_head,
|
||||
fork_detected=bool(fork_detected),
|
||||
consecutive_failures=state.consecutive_failures + 1,
|
||||
consecutive_failures=next_failures,
|
||||
)
|
||||
|
||||
|
||||
|
||||
@@ -0,0 +1,261 @@
|
||||
"""Infonet sync respects upstream HTTP 429 + applies exponential backoff.
|
||||
|
||||
Background
|
||||
----------
|
||||
Before this fix, ``finish_sync`` used a constant 60s ``failure_backoff_s``
|
||||
regardless of how many consecutive failures preceded. When an upstream
|
||||
peer (e.g. the seed onion) returned HTTP 429 "Too Many Requests", the
|
||||
sync worker would:
|
||||
|
||||
1. Receive 429
|
||||
2. Stringify the status into a generic ``ValueError``
|
||||
3. Call ``finish_sync(error=str(exc))`` -- losing the status code
|
||||
4. Schedule next attempt for ``now + 60s``
|
||||
5. Retry. Upstream's rate-limit bucket is still full. 429 again. Loop.
|
||||
|
||||
Net effect: a node with one transient 429 would hammer the upstream
|
||||
every 60s forever, keeping the bucket full and never recovering. This
|
||||
is what kept the user's Infonet node from reaching the seed peer.
|
||||
|
||||
What the fix does
|
||||
-----------------
|
||||
* New typed exception ``PeerSyncRateLimited`` carries the parsed
|
||||
``Retry-After`` value out of the HTTP layer.
|
||||
* ``_sync_from_peer`` returns ``(ok, error, forked, retry_after_s)``
|
||||
instead of the old 3-tuple.
|
||||
* ``finish_sync`` honors ``retry_after_s`` AND applies exponential
|
||||
backoff: ``delay = max(retry_after_s, base * 2^failures, cap=1800)``.
|
||||
* ``parse_retry_after_header`` handles both RFC 7231 forms (delay
|
||||
seconds, and HTTP-date).
|
||||
|
||||
These tests pin every part of the new contract.
|
||||
"""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import time
|
||||
|
||||
import pytest
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# parse_retry_after_header — both RFC 7231 forms + edge cases
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
|
||||
class TestParseRetryAfter:
|
||||
def test_integer_seconds(self):
|
||||
from services.mesh.mesh_infonet_sync_support import parse_retry_after_header
|
||||
assert parse_retry_after_header("120") == 120
|
||||
assert parse_retry_after_header(" 30 ") == 30
|
||||
assert parse_retry_after_header("0") == 0
|
||||
|
||||
def test_http_date(self):
|
||||
"""RFC 7231 §7.1.3 explicitly allows ``Retry-After: <HTTP-date>``.
|
||||
We compute seconds-from-now so callers can use the same field
|
||||
regardless of which form the upstream chose."""
|
||||
from services.mesh.mesh_infonet_sync_support import parse_retry_after_header
|
||||
# Pin "now" so the test is deterministic.
|
||||
now = 1_700_000_000.0 # 2023-11-14T22:13:20Z
|
||||
# 300 seconds in the future, formatted per RFC 7231.
|
||||
future = "Tue, 14 Nov 2023 22:18:20 GMT"
|
||||
result = parse_retry_after_header(future, now=now)
|
||||
assert 295 <= result <= 305, f"expected ~300s, got {result}"
|
||||
|
||||
def test_http_date_in_past_returns_zero(self):
|
||||
from services.mesh.mesh_infonet_sync_support import parse_retry_after_header
|
||||
now = 1_700_000_000.0
|
||||
past = "Mon, 13 Nov 2023 00:00:00 GMT"
|
||||
assert parse_retry_after_header(past, now=now) == 0
|
||||
|
||||
def test_empty_and_whitespace_return_zero(self):
|
||||
from services.mesh.mesh_infonet_sync_support import parse_retry_after_header
|
||||
assert parse_retry_after_header("") == 0
|
||||
assert parse_retry_after_header(" ") == 0
|
||||
|
||||
def test_malformed_returns_zero(self):
|
||||
from services.mesh.mesh_infonet_sync_support import parse_retry_after_header
|
||||
assert parse_retry_after_header("not a header") == 0
|
||||
assert parse_retry_after_header("xyz") == 0
|
||||
|
||||
def test_clamps_to_one_hour(self):
|
||||
"""A hostile peer can't silence us for a week by claiming a
|
||||
24h Retry-After. We cap at 1 hour."""
|
||||
from services.mesh.mesh_infonet_sync_support import parse_retry_after_header
|
||||
assert parse_retry_after_header("86400") == 3600 # 24h -> 1h
|
||||
assert parse_retry_after_header("99999999") == 3600
|
||||
|
||||
def test_negative_returns_zero(self):
|
||||
"""RFC 7231 says ``Retry-After`` is a non-negative integer;
|
||||
leading-minus parses as a non-digit and yields 0 here."""
|
||||
from services.mesh.mesh_infonet_sync_support import parse_retry_after_header
|
||||
assert parse_retry_after_header("-10") == 0
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# _failure_backoff_seconds — exponential growth, retry-after override, cap
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
|
||||
class TestFailureBackoffSeconds:
|
||||
def test_exponential_growth(self):
|
||||
"""First failure uses the base (preserves pre-fix behavior
|
||||
for one-off blips). Each subsequent failure doubles the wait,
|
||||
capped at 1800s. With base=60: 60, 120, 240, 480, 960, 1800,
|
||||
1800, 1800."""
|
||||
from services.mesh.mesh_infonet_sync_support import _failure_backoff_seconds
|
||||
delays = [
|
||||
_failure_backoff_seconds(
|
||||
base_backoff_s=60,
|
||||
consecutive_failures=n,
|
||||
retry_after_s=0,
|
||||
cap_s=1800,
|
||||
)
|
||||
for n in range(1, 9)
|
||||
]
|
||||
assert delays == [60, 120, 240, 480, 960, 1800, 1800, 1800], delays
|
||||
|
||||
def test_retry_after_wins_when_larger(self):
|
||||
"""If the upstream says ``Retry-After: 600`` but exponential
|
||||
would only ask for 60s (one failure), we honor the upstream."""
|
||||
from services.mesh.mesh_infonet_sync_support import _failure_backoff_seconds
|
||||
assert _failure_backoff_seconds(
|
||||
base_backoff_s=60,
|
||||
consecutive_failures=1,
|
||||
retry_after_s=600,
|
||||
cap_s=1800,
|
||||
) == 600
|
||||
|
||||
def test_exponential_wins_when_larger(self):
|
||||
"""If exponential is asking for 1800s (6+ failures) but
|
||||
upstream only sent ``Retry-After: 30``, we honor exponential.
|
||||
The 30s was the upstream's view at one moment; our exponential
|
||||
reflects sustained failure."""
|
||||
from services.mesh.mesh_infonet_sync_support import _failure_backoff_seconds
|
||||
result = _failure_backoff_seconds(
|
||||
base_backoff_s=60,
|
||||
consecutive_failures=7,
|
||||
retry_after_s=30,
|
||||
cap_s=1800,
|
||||
)
|
||||
assert result == 1800
|
||||
|
||||
def test_cap_zero_disables_exponential(self):
|
||||
"""Operators who want pre-fix behavior can set cap=0; only the
|
||||
upstream's Retry-After is respected. (Pre-fix had no
|
||||
exponential growth at all.)"""
|
||||
from services.mesh.mesh_infonet_sync_support import _failure_backoff_seconds
|
||||
assert _failure_backoff_seconds(
|
||||
base_backoff_s=60,
|
||||
consecutive_failures=10,
|
||||
retry_after_s=120,
|
||||
cap_s=0,
|
||||
) == 120
|
||||
|
||||
def test_zero_inputs_return_zero(self):
|
||||
from services.mesh.mesh_infonet_sync_support import _failure_backoff_seconds
|
||||
assert _failure_backoff_seconds(
|
||||
base_backoff_s=0,
|
||||
consecutive_failures=0,
|
||||
retry_after_s=0,
|
||||
) == 0
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# finish_sync end-to-end — failure path with retry-after + growing counter
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
|
||||
class TestFinishSyncBackoff:
|
||||
def _state(self, **overrides):
|
||||
from services.mesh.mesh_infonet_sync_support import SyncWorkerState
|
||||
base = {
|
||||
"last_sync_started_at": 0,
|
||||
"last_sync_finished_at": 0,
|
||||
"last_sync_ok_at": 0,
|
||||
"next_sync_due_at": 0,
|
||||
"last_peer_url": "",
|
||||
"last_error": "",
|
||||
"last_outcome": "idle",
|
||||
"current_head": "",
|
||||
"fork_detected": False,
|
||||
"consecutive_failures": 0,
|
||||
}
|
||||
base.update(overrides)
|
||||
return SyncWorkerState(**base)
|
||||
|
||||
def test_first_failure_uses_base_unchanged(self):
|
||||
"""One failure means consecutive_failures becomes 1, which uses
|
||||
``base * 2^0 = base``. Preserves the pre-fix behavior so a
|
||||
single transient upstream blip doesn't suddenly take 2 minutes
|
||||
to retry — that change has to be earned by sustained failure."""
|
||||
from services.mesh.mesh_infonet_sync_support import finish_sync
|
||||
result = finish_sync(
|
||||
self._state(),
|
||||
ok=False,
|
||||
error="some upstream blip",
|
||||
now=1000.0,
|
||||
failure_backoff_s=60,
|
||||
)
|
||||
assert result.consecutive_failures == 1
|
||||
assert result.next_sync_due_at == 1000 + 60
|
||||
assert result.last_error == "some upstream blip"
|
||||
assert result.last_outcome == "error"
|
||||
|
||||
def test_consecutive_failures_grow_the_delay(self):
|
||||
"""After 5 prior failures already in state, the next failure
|
||||
sets consecutive=6 and uses the cap (1800s = 60 * 2^5)."""
|
||||
from services.mesh.mesh_infonet_sync_support import finish_sync
|
||||
result = finish_sync(
|
||||
self._state(consecutive_failures=5),
|
||||
ok=False,
|
||||
error="HTTP 429",
|
||||
now=2000.0,
|
||||
failure_backoff_s=60,
|
||||
)
|
||||
assert result.consecutive_failures == 6
|
||||
assert result.next_sync_due_at == 2000 + 1800
|
||||
|
||||
def test_retry_after_honored_at_low_failure_count(self):
|
||||
"""When the upstream says ``Retry-After: 900`` but we'd
|
||||
otherwise only wait 240s (4 failures = 60*2^3), wait 900s."""
|
||||
from services.mesh.mesh_infonet_sync_support import finish_sync
|
||||
result = finish_sync(
|
||||
self._state(consecutive_failures=3),
|
||||
ok=False,
|
||||
error="HTTP 429",
|
||||
now=5000.0,
|
||||
failure_backoff_s=60,
|
||||
retry_after_s=900,
|
||||
)
|
||||
assert result.consecutive_failures == 4
|
||||
assert result.next_sync_due_at == 5000 + 900
|
||||
|
||||
def test_success_resets_consecutive_failures(self):
|
||||
from services.mesh.mesh_infonet_sync_support import finish_sync
|
||||
result = finish_sync(
|
||||
self._state(consecutive_failures=4),
|
||||
ok=True,
|
||||
now=7000.0,
|
||||
interval_s=300,
|
||||
)
|
||||
assert result.consecutive_failures == 0
|
||||
assert result.next_sync_due_at == 7000 + 300
|
||||
assert result.last_outcome == "ok"
|
||||
|
||||
def test_last_error_carries_status_string(self):
|
||||
"""The pre-fix path stringified exceptions into ``last_error``
|
||||
but the string was often empty (HTTP layer raised ValueError
|
||||
with no message). We now require callers to pass something
|
||||
meaningful — see the typed exception path in main.py."""
|
||||
from services.mesh.mesh_infonet_sync_support import finish_sync
|
||||
result = finish_sync(
|
||||
self._state(),
|
||||
ok=False,
|
||||
error="HTTP 429 from peer (retry_after=120s): rate-limited",
|
||||
now=1000.0,
|
||||
failure_backoff_s=60,
|
||||
retry_after_s=120,
|
||||
)
|
||||
assert "HTTP 429" in result.last_error
|
||||
assert "retry_after=120s" in result.last_error
|
||||
Reference in New Issue
Block a user