diff --git a/cmd/cli/upstream_monitor.go b/cmd/cli/upstream_monitor.go
index 507a06f..acc02bb 100644
--- a/cmd/cli/upstream_monitor.go
+++ b/cmd/cli/upstream_monitor.go
@@ -9,7 +9,7 @@ import (
 
 const (
 	// maxFailureRequest is the maximum failed queries allowed before an upstream is marked as down.
-	maxFailureRequest = 100
+	maxFailureRequest = 50
 	// checkUpstreamBackoffSleep is the time interval between each upstream checks.
 	checkUpstreamBackoffSleep = 2 * time.Second
 )
@@ -23,15 +23,19 @@ type upstreamMonitor struct {
 	down       map[string]bool
 	failureReq map[string]uint64
 	recovered  map[string]bool
+
+	// failureTimerActive tracks if a timer is already running for a given upstream.
+	failureTimerActive map[string]bool
 }
 
 func newUpstreamMonitor(cfg *ctrld.Config) *upstreamMonitor {
 	um := &upstreamMonitor{
-		cfg:        cfg,
-		checking:   make(map[string]bool),
-		down:       make(map[string]bool),
-		failureReq: make(map[string]uint64),
-		recovered:  make(map[string]bool),
+		cfg:                cfg,
+		checking:           make(map[string]bool),
+		down:               make(map[string]bool),
+		failureReq:         make(map[string]uint64),
+		recovered:          make(map[string]bool),
+		failureTimerActive: make(map[string]bool),
 	}
 	for n := range cfg.Upstream {
 		upstream := upstreamPrefix + n
@@ -42,6 +46,8 @@ func newUpstreamMonitor(cfg *ctrld.Config) *upstreamMonitor {
 }
 
 // increaseFailureCount increases failed queries count for an upstream by 1 and logs debug information.
+// It uses a timer to debounce failure detection, ensuring that an upstream is marked as down
+// within 10 seconds if failures persist, without spawning duplicate goroutines.
 func (um *upstreamMonitor) increaseFailureCount(upstream string) {
 	um.mu.Lock()
 	defer um.mu.Unlock()
@@ -54,13 +60,31 @@ func (um *upstreamMonitor) increaseFailureCount(upstream string) {
 	um.failureReq[upstream] += 1
 	failedCount := um.failureReq[upstream]
 
-	// Log the updated failure count
+	// Log the updated failure count.
 	mainLog.Load().Debug().Msgf("upstream %q failure count updated to %d", upstream, failedCount)
 
-	// Check if the failure count has reached the threshold to mark the upstream as down.
+	// If this is the first failure and no timer is running, start a 10-second timer.
+	if failedCount == 1 && !um.failureTimerActive[upstream] {
+		um.failureTimerActive[upstream] = true
+		go func(upstream string) {
+			time.Sleep(10 * time.Second)
+			um.mu.Lock()
+			defer um.mu.Unlock()
+			// If no success occurred during the 10-second window (i.e. counter remains > 0)
+			// and the upstream is not in a recovered state, mark it as down.
+			if um.failureReq[upstream] > 0 && !um.recovered[upstream] {
+				um.down[upstream] = true
+				mainLog.Load().Warn().Msgf("upstream %q marked as down after 10 seconds (failure count: %d)", upstream, um.failureReq[upstream])
+			}
+			// Reset the timer flag so that a new timer can be spawned if needed.
+			um.failureTimerActive[upstream] = false
+		}(upstream)
+	}
+
+	// If the failure count quickly reaches the threshold, mark the upstream as down immediately.
 	if failedCount >= maxFailureRequest {
 		um.down[upstream] = true
-		mainLog.Load().Warn().Msgf("upstream %q marked as down (failure count: %d)", upstream, failedCount)
+		mainLog.Load().Warn().Msgf("upstream %q marked as down immediately (failure count: %d)", upstream, failedCount)
 	} else {
 		um.down[upstream] = false
 	}
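
For reference, here is a minimal standalone sketch of the debounce pattern this diff introduces: a single timer goroutine per upstream is started on failure, sleeps for the window, and then marks the upstream down only if the failure counter was never reset by a success in the meantime. All names here (`monitor`, `fail`, `succeed`, `timerActive`) are illustrative and not part of the ctrld codebase, and the sketch gates the timer on `!timerActive` alone, whereas the actual patch additionally checks `failedCount == 1`.

```go
package main

import (
	"fmt"
	"sync"
	"time"
)

// monitor is a hypothetical, simplified stand-in for upstreamMonitor,
// keeping only the state the debounce pattern needs.
type monitor struct {
	mu          sync.Mutex
	failures    map[string]int
	down        map[string]bool
	timerActive map[string]bool
}

// fail records one failure and starts at most one debounce timer per upstream.
func (m *monitor) fail(upstream string, window time.Duration) {
	m.mu.Lock()
	defer m.mu.Unlock()
	m.failures[upstream]++
	// The flag ensures only one timer goroutine runs per upstream at a time.
	if !m.timerActive[upstream] {
		m.timerActive[upstream] = true
		go func() {
			time.Sleep(window)
			m.mu.Lock()
			defer m.mu.Unlock()
			// A success handler resets the counter to 0, so a nonzero
			// counter here means failures persisted through the window.
			if m.failures[upstream] > 0 {
				m.down[upstream] = true
			}
			// Allow a new timer to be spawned for the next failure burst.
			m.timerActive[upstream] = false
		}()
	}
}

// succeed clears the failure counter so a pending timer becomes a no-op.
func (m *monitor) succeed(upstream string) {
	m.mu.Lock()
	defer m.mu.Unlock()
	m.failures[upstream] = 0
	m.down[upstream] = false
}

func main() {
	m := &monitor{
		failures:    make(map[string]int),
		down:        make(map[string]bool),
		timerActive: make(map[string]bool),
	}
	m.fail("upstream.0", 100*time.Millisecond)
	time.Sleep(150 * time.Millisecond)
	m.mu.Lock()
	fmt.Println(m.down["upstream.0"]) // true: no success arrived during the window
	m.mu.Unlock()
}
```

The key design point the patch relies on is that the goroutine re-acquires the mutex before inspecting the counter, so the "did a success happen during the window?" check and the `down` transition are atomic with respect to concurrent failure and success updates.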