Mirror of https://github.com/Control-D-Inc/ctrld.git, synced 2026-02-03 22:18:39 +00:00
debounce upstream failure checking and failure counts
@@ -559,7 +559,11 @@ func (p *prog) proxy(ctx context.Context, req *proxyRequest) *proxyResponse {
         if isNetworkErr {
             p.um.increaseFailureCount(upstreams[n])
             if p.um.isDown(upstreams[n]) {
-                go p.checkUpstream(upstreams[n], upstreamConfig)
+                p.um.mu.RLock()
+                if !p.um.checking[upstreams[n]] {
+                    go p.checkUpstream(upstreams[n], upstreamConfig)
+                }
+                p.um.mu.RUnlock()
             }
         }
         // For timeout error (i.e: context deadline exceed), force re-bootstrapping.
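The new guard only holds a read lock around the `checking` test before spawning the goroutine, so two requests racing past it could in principle both launch a check (presumably checkUpstream marks itself as checking once it starts, which keeps the window small). A minimal standalone sketch of a fully atomic variant using test-and-set under the write lock; the monitor type and names here are illustrative, not ctrld's API:

    package main

    import (
        "fmt"
        "sync"
    )

    type monitor struct {
        mu       sync.Mutex
        checking map[string]bool
    }

    // tryBeginCheck marks upstream as being checked; it returns false when a
    // check is already in flight, so callers can skip spawning a duplicate.
    func (m *monitor) tryBeginCheck(upstream string) bool {
        m.mu.Lock()
        defer m.mu.Unlock()
        if m.checking[upstream] {
            return false
        }
        m.checking[upstream] = true
        return true
    }

    // endCheck clears the flag once the probe finishes.
    func (m *monitor) endCheck(upstream string) {
        m.mu.Lock()
        m.checking[upstream] = false
        m.mu.Unlock()
    }

    func main() {
        m := &monitor{checking: make(map[string]bool)}
        fmt.Println(m.tryBeginCheck("upstream.0")) // true: first caller wins
        fmt.Println(m.tryBeginCheck("upstream.0")) // false: duplicate suppressed
        m.endCheck("upstream.0")
        fmt.Println(m.tryBeginCheck("upstream.0")) // true again after endCheck
    }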
@@ -569,6 +573,12 @@ func (p *prog) proxy(ctx context.Context, req *proxyRequest) *proxyResponse {
             }
             return nil
         }
+        // if we have an answer, we should reset the failure count
+        if answer != nil {
+            p.um.mu.Lock()
+            p.um.failureReq[upstreams[n]] = 0
+            p.um.mu.Unlock()
+        }
         return answer
     }
     for n, upstreamConfig := range upstreamConfigs {
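Resetting the failure count whenever an answer arrives keeps an occasional slow or flaky query from gradually pushing a healthy upstream over the down threshold. A tiny sketch of the same success-resets-failures bookkeeping, with hypothetical names:

    package main

    import (
        "fmt"
        "sync"
    )

    type failureCounter struct {
        mu    sync.Mutex
        count map[string]uint64
    }

    // fail records one failure and returns the running total.
    func (c *failureCounter) fail(name string) uint64 {
        c.mu.Lock()
        defer c.mu.Unlock()
        c.count[name]++
        return c.count[name]
    }

    // succeed zeroes the total, mirroring the answer != nil branch above.
    func (c *failureCounter) succeed(name string) {
        c.mu.Lock()
        c.count[name] = 0
        c.mu.Unlock()
    }

    func main() {
        c := &failureCounter{count: make(map[string]uint64)}
        c.fail("upstream.0")
        c.fail("upstream.0")
        c.succeed("upstream.0")
        fmt.Println(c.count["upstream.0"]) // 0: one success wipes the streak
    }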
@@ -1029,15 +1039,21 @@ func (p *prog) performLeakingQuery(failedUpstreams map[string]*ctrld.UpstreamConfig
     upstreamCh := make(chan string, len(failedUpstreams))
     for name, uc := range failedUpstreams {
         go func(name string, uc *ctrld.UpstreamConfig) {
-            mainLog.Load().Debug().
-                Str("upstream", name).
-                Msg("checking upstream")
-
             for {
                 select {
                 case <-ctx.Done():
                     return
                 default:
+                    // make sure this upstream is not already being checked
+                    p.um.mu.RLock()
+                    if p.um.checking[name] {
+                        p.um.mu.RUnlock()
+                        continue
+                    }
+                    mainLog.Load().Debug().
+                        Str("upstream", name).
+                        Msg("checking upstream")
+
                     p.checkUpstream(name, uc)
                     mainLog.Load().Debug().
                         Str("upstream", name).
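Each failed upstream gets its own goroutine that keeps probing until the context is cancelled or the upstream recovers, reporting back on upstreamCh. A standalone sketch of that loop shape; unlike the hunk above it waits on a ticker between attempts (the interval is an assumption), and probe and done stand in for checkUpstream and upstreamCh:

    package main

    import (
        "context"
        "errors"
        "fmt"
        "time"
    )

    // recheckLoop keeps probing one upstream until the context is cancelled or
    // a probe succeeds, then reports the recovered name on done.
    func recheckLoop(ctx context.Context, name string, probe func(string) error, done chan<- string) {
        ticker := time.NewTicker(50 * time.Millisecond) // retry interval is an assumption
        defer ticker.Stop()
        for {
            select {
            case <-ctx.Done():
                return
            case <-ticker.C:
                if probe(name) == nil {
                    done <- name
                    return
                }
            }
        }
    }

    func main() {
        ctx, cancel := context.WithTimeout(context.Background(), time.Second)
        defer cancel()
        attempts := 0
        probe := func(string) error { // fails twice, then recovers
            attempts++
            if attempts < 3 {
                return errors.New("unreachable")
            }
            return nil
        }
        done := make(chan string, 1)
        go recheckLoop(ctx, "upstream.0", probe, done)
        fmt.Println("recovered:", <-done)
    }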
@@ -1256,12 +1272,15 @@ func (p *prog) reinitializeOSResolver(networkChange bool) {
         mainLog.Load().Warn().Msgf("re-initialized OS resolver with nameservers: %v", ns)
     }

     // start leaking queries immediately
     if networkChange {
         // set all upstreams to failed and provide to performLeakingQuery
         failedUpstreams := make(map[string]*ctrld.UpstreamConfig)
-        for _, upstream := range p.cfg.Upstream {
-            failedUpstreams[upstream.Name] = upstream
+        // Iterate over both key and upstream to ensure that we have a fallback key
+        for key, upstream := range p.cfg.Upstream {
+            mainLog.Load().Debug().Msgf("network change upstream checking: %v, key: %q", upstream, key)
+            mapKey := upstreamPrefix + key
+            failedUpstreams[mapKey] = upstream
         }
         go p.performLeakingQuery(failedUpstreams, "all")

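The replacement loop keys the map by the config key rather than the display name, so the entries line up with the prefixed identifiers the monitor itself uses (see newUpstreamMonitor below). A sketch of the same map construction with stand-in types; the "upstream." prefix value is an assumption, not taken from the source:

    package main

    import "fmt"

    const upstreamPrefix = "upstream." // assumed value of ctrld's upstreamPrefix

    // UpstreamConfig stands in for ctrld.UpstreamConfig.
    type UpstreamConfig struct {
        Name string
    }

    // buildFailedSet marks every configured upstream as failed, keyed by its
    // prefixed config key as in the loop above.
    func buildFailedSet(cfg map[string]*UpstreamConfig) map[string]*UpstreamConfig {
        failed := make(map[string]*UpstreamConfig, len(cfg))
        for key, uc := range cfg {
            failed[upstreamPrefix+key] = uc
        }
        return failed
    }

    func main() {
        cfg := map[string]*UpstreamConfig{"0": {Name: "Control D"}}
        for k, uc := range buildFailedSet(cfg) {
            fmt.Println(k, uc.Name) // upstream.0 Control D
        }
    }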
@@ -21,10 +21,11 @@ const (
 type upstreamMonitor struct {
     cfg *ctrld.Config

-    mu         sync.Mutex
+    mu         sync.RWMutex
     checking   map[string]bool
     down       map[string]bool
     failureReq map[string]uint64
+    recovered  map[string]bool
 }

 func newUpstreamMonitor(cfg *ctrld.Config) *upstreamMonitor {
@@ -33,6 +34,7 @@ func newUpstreamMonitor(cfg *ctrld.Config) *upstreamMonitor {
         checking:   make(map[string]bool),
         down:       make(map[string]bool),
         failureReq: make(map[string]uint64),
+        recovered:  make(map[string]bool),
     }
     for n := range cfg.Upstream {
         upstream := upstreamPrefix + n
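Switching `mu` to `sync.RWMutex` lets hot-path status reads share the lock while writers still serialize. A sketch of how the read and write sides split, assuming the struct shape above; ctrld's real isDown may differ:

    package main

    import (
        "fmt"
        "sync"
    )

    type upstreamMonitor struct {
        mu   sync.RWMutex
        down map[string]bool
    }

    // isDown takes only the shared read lock, so concurrent queries do not
    // serialize on status checks.
    func (um *upstreamMonitor) isDown(upstream string) bool {
        um.mu.RLock()
        defer um.mu.RUnlock()
        return um.down[upstream]
    }

    // markDown takes the exclusive lock, blocking readers only while writing.
    func (um *upstreamMonitor) markDown(upstream string, v bool) {
        um.mu.Lock()
        defer um.mu.Unlock()
        um.down[upstream] = v
    }

    func main() {
        um := &upstreamMonitor{down: make(map[string]bool)}
        um.markDown("upstream.0", true)
        fmt.Println(um.isDown("upstream.0")) // true
    }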
@@ -47,6 +49,11 @@ func (um *upstreamMonitor) increaseFailureCount(upstream string) {
     um.mu.Lock()
     defer um.mu.Unlock()

+    if um.recovered[upstream] {
+        mainLog.Load().Debug().Msgf("upstream %q is recovered, skipping failure count increase", upstream)
+        return
+    }
+
     um.failureReq[upstream] += 1
     failedCount := um.failureReq[upstream]

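The `recovered` flag makes increments a no-op for a short window after recovery, absorbing failures that were already in flight when the upstream came back. An alternative sketch that derives the same grace window from a timestamp instead of a flag plus background goroutine; this is a different design, not what the commit implements:

    package main

    import (
        "fmt"
        "sync"
        "time"
    )

    const graceWindow = time.Second // same length as the commit's debounce

    type graceMonitor struct {
        mu          sync.Mutex
        failureReq  map[string]uint64
        recoveredAt map[string]time.Time
    }

    // increaseFailureCount ignores failures that land inside the grace window
    // following a recovery; no timer goroutine is needed to clear state.
    func (m *graceMonitor) increaseFailureCount(upstream string) {
        m.mu.Lock()
        defer m.mu.Unlock()
        if time.Since(m.recoveredAt[upstream]) < graceWindow {
            return
        }
        m.failureReq[upstream]++
    }

    // reset just records the recovery time.
    func (m *graceMonitor) reset(upstream string) {
        m.mu.Lock()
        defer m.mu.Unlock()
        m.failureReq[upstream] = 0
        m.recoveredAt[upstream] = time.Now()
    }

    func main() {
        m := &graceMonitor{failureReq: map[string]uint64{}, recoveredAt: map[string]time.Time{}}
        m.reset("upstream.0")
        m.increaseFailureCount("upstream.0") // swallowed by the grace window
        fmt.Println(m.failureReq["upstream.0"]) // 0
    }

Trading the flag for a timestamp avoids spawning a goroutine per reset, at the cost of a clock read on every failure.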
@@ -77,6 +84,14 @@ func (um *upstreamMonitor) reset(upstream string) {

     um.failureReq[upstream] = 0
     um.down[upstream] = false
+    um.recovered[upstream] = true
+    go func() {
+        // debounce the recovery to avoid incrementing failure counts already in flight
+        time.Sleep(1 * time.Second)
+        um.mu.Lock()
+        um.recovered[upstream] = false
+        um.mu.Unlock()
+    }()
 }

 // checkUpstream checks the given upstream status, periodically sending query to upstream
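Condensed, the whole mechanism fits in a few lines: reset marks the upstream recovered, and for one second increaseFailureCount refuses to count, so stale in-flight failures cannot immediately mark a recovered upstream down again. A runnable sketch reusing the diff's names, with time.AfterFunc in place of the commit's goroutine-plus-Sleep (equivalent behavior here); the rest is a sketch, not ctrld code:

    package main

    import (
        "fmt"
        "sync"
        "time"
    )

    type upstreamMonitor struct {
        mu         sync.RWMutex
        failureReq map[string]uint64
        down       map[string]bool
        recovered  map[string]bool
    }

    // increaseFailureCount counts a failure unless the upstream is inside
    // its post-recovery debounce window.
    func (um *upstreamMonitor) increaseFailureCount(upstream string) {
        um.mu.Lock()
        defer um.mu.Unlock()
        if um.recovered[upstream] {
            return
        }
        um.failureReq[upstream]++
    }

    // reset clears failure state and opens a one-second window during which
    // new failures are ignored.
    func (um *upstreamMonitor) reset(upstream string) {
        um.mu.Lock()
        defer um.mu.Unlock()
        um.failureReq[upstream] = 0
        um.down[upstream] = false
        um.recovered[upstream] = true
        time.AfterFunc(time.Second, func() {
            um.mu.Lock()
            um.recovered[upstream] = false
            um.mu.Unlock()
        })
    }

    func main() {
        um := &upstreamMonitor{
            failureReq: make(map[string]uint64),
            down:       make(map[string]bool),
            recovered:  make(map[string]bool),
        }
        um.increaseFailureCount("upstream.0")
        um.reset("upstream.0")
        um.increaseFailureCount("upstream.0") // suppressed by the debounce window
        fmt.Println(um.failureReq["upstream.0"]) // 0
    }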