From fb49cb71e3a99ec103387e82c6968e4e7f84109b Mon Sep 17 00:00:00 2001
From: Alex
Date: Fri, 7 Feb 2025 00:09:03 -0500
Subject: [PATCH] debounce upstream failure checking and failure counts

---
 cmd/cli/dns_proxy.go        | 35 +++++++++++++++++++++++++++--------
 cmd/cli/upstream_monitor.go | 17 ++++++++++++++++-
 2 files changed, 43 insertions(+), 9 deletions(-)

diff --git a/cmd/cli/dns_proxy.go b/cmd/cli/dns_proxy.go
index d5b77c7..f7bbe6e 100644
--- a/cmd/cli/dns_proxy.go
+++ b/cmd/cli/dns_proxy.go
@@ -559,7 +559,11 @@ func (p *prog) proxy(ctx context.Context, req *proxyRequest) *proxyResponse {
 		if isNetworkErr {
 			p.um.increaseFailureCount(upstreams[n])
 			if p.um.isDown(upstreams[n]) {
-				go p.checkUpstream(upstreams[n], upstreamConfig)
+				p.um.mu.RLock()
+				if !p.um.checking[upstreams[n]] {
+					go p.checkUpstream(upstreams[n], upstreamConfig)
+				}
+				p.um.mu.RUnlock()
 			}
 		}
 		// For timeout error (i.e: context deadline exceed), force re-bootstrapping.
@@ -569,6 +573,12 @@
 			}
 			return nil
 		}
+		// if we have an answer, we should reset the failure count
+		if answer != nil {
+			p.um.mu.Lock()
+			p.um.failureReq[upstreams[n]] = 0
+			p.um.mu.Unlock()
+		}
 		return answer
 	}
 	for n, upstreamConfig := range upstreamConfigs {
@@ -1029,15 +1039,21 @@ func (p *prog) performLeakingQuery(failedUpstreams map[string]*ctrld.UpstreamCon
 	upstreamCh := make(chan string, len(failedUpstreams))
 	for name, uc := range failedUpstreams {
 		go func(name string, uc *ctrld.UpstreamConfig) {
-			mainLog.Load().Debug().
-				Str("upstream", name).
-				Msg("checking upstream")
-
 			for {
 				select {
 				case <-ctx.Done():
 					return
 				default:
+					// make sure this upstream is not already being checked
+					p.um.mu.RLock()
+					if p.um.checking[name] {
+						p.um.mu.RUnlock()
+						continue
+					}
+					mainLog.Load().Debug().
+						Str("upstream", name).
+						Msg("checking upstream")
+
 					p.checkUpstream(name, uc)
 					mainLog.Load().Debug().
 						Str("upstream", name).
@@ -1256,12 +1272,15 @@ func (p *prog) reinitializeOSResolver(networkChange bool) {
 		mainLog.Load().Warn().Msgf("re-initialized OS resolver with nameservers: %v", ns)
 	}
 
-	// start leaking queries immediately// start leaking queries immediately
+	// start leaking queries immediately
 	if networkChange {
 		// set all upstreams to failed and provide to performLeakingQuery
 		failedUpstreams := make(map[string]*ctrld.UpstreamConfig)
-		for _, upstream := range p.cfg.Upstream {
-			failedUpstreams[upstream.Name] = upstream
+		// Iterate over both key and upstream to ensure that we have a fallback key
+		for key, upstream := range p.cfg.Upstream {
+			mainLog.Load().Debug().Msgf("network change upstream checking: %v, key: %q", upstream, key)
+			mapKey := upstreamPrefix + key
+			failedUpstreams[mapKey] = upstream
 		}
 
 		go p.performLeakingQuery(failedUpstreams, "all")
diff --git a/cmd/cli/upstream_monitor.go b/cmd/cli/upstream_monitor.go
index fc5d65d..df52a14 100644
--- a/cmd/cli/upstream_monitor.go
+++ b/cmd/cli/upstream_monitor.go
@@ -21,10 +21,11 @@ const (
 
 type upstreamMonitor struct {
 	cfg *ctrld.Config
 
-	mu         sync.Mutex
+	mu         sync.RWMutex
 	checking   map[string]bool
 	down       map[string]bool
 	failureReq map[string]uint64
+	recovered  map[string]bool
 }
 
 func newUpstreamMonitor(cfg *ctrld.Config) *upstreamMonitor {
@@ -33,6 +34,7 @@ func newUpstreamMonitor(cfg *ctrld.Config) *upstreamMonitor {
 		checking:   make(map[string]bool),
 		down:       make(map[string]bool),
 		failureReq: make(map[string]uint64),
+		recovered:  make(map[string]bool),
 	}
 	for n := range cfg.Upstream {
 		upstream := upstreamPrefix + n
@@ -47,6 +49,11 @@ func (um *upstreamMonitor) increaseFailureCount(upstream string) {
 	um.mu.Lock()
 	defer um.mu.Unlock()
 
+	if um.recovered[upstream] {
+		mainLog.Load().Debug().Msgf("upstream %q is recovered, skipping failure count increase", upstream)
+		return
+	}
+
 	um.failureReq[upstream] += 1
 	failedCount := um.failureReq[upstream]
 
@@ -77,6 +84,14 @@ func (um *upstreamMonitor) reset(upstream string) {
 
 	um.failureReq[upstream] = 0
 	um.down[upstream] = false
+	um.recovered[upstream] = true
+	go func() {
+		// debounce the recovery to avoid incrementing failure counts already in flight
+		time.Sleep(1 * time.Second)
+		um.mu.Lock()
+		um.recovered[upstream] = false
+		um.mu.Unlock()
+	}()
 }
 
 // checkUpstream checks the given upstream status, periodically sending query to upstream
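
Note on the core idea: the patch adds a short "recovered" debounce window. After reset() marks an upstream healthy, increaseFailureCount() ignores failures for one second, so error responses already in flight from before the recovery cannot immediately push the upstream back toward the down state. The sketch below is a minimal, standalone illustration of that pattern under a toy monitor type; the monitor, fail, and markRecovered names are hypothetical stand-ins, not the ctrld API.

package main

import (
	"fmt"
	"sync"
	"time"
)

type monitor struct {
	mu        sync.RWMutex
	failures  map[string]uint64
	recovered map[string]bool
}

func newMonitor() *monitor {
	return &monitor{
		failures:  make(map[string]uint64),
		recovered: make(map[string]bool),
	}
}

// fail increments the failure count unless the upstream just recovered,
// mirroring how increaseFailureCount skips counting inside the debounce window.
func (m *monitor) fail(name string) {
	m.mu.Lock()
	defer m.mu.Unlock()
	if m.recovered[name] {
		return // failure from before the recovery; ignore it
	}
	m.failures[name]++
}

// markRecovered resets the count and opens a 1s window during which failures
// are ignored, then clears the flag from a goroutine, as reset() does here.
func (m *monitor) markRecovered(name string) {
	m.mu.Lock()
	m.failures[name] = 0
	m.recovered[name] = true
	m.mu.Unlock()
	go func() {
		time.Sleep(1 * time.Second)
		m.mu.Lock()
		m.recovered[name] = false
		m.mu.Unlock()
	}()
}

func main() {
	m := newMonitor()
	m.fail("upstream.0")          // counted
	m.markRecovered("upstream.0") // count reset, window opens
	m.fail("upstream.0")          // ignored: inside the debounce window
	time.Sleep(1100 * time.Millisecond)
	m.fail("upstream.0") // counted: window has closed
	m.mu.RLock()
	fmt.Println(m.failures["upstream.0"]) // prints 1
	m.mu.RUnlock()
}

The trade-off is the usual one for time-based debouncing: a genuine failure that lands inside the window is dropped, in exchange for not flapping between up and down on stale in-flight errors.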