ctrld/cmd/cli/upstream_monitor.go
Alex 2687a4a018 remove leaking timeout, fix blocking upstream checks, leaking is per listener, OS resolvers are tested in parallel, reset is only done if OS is down
fix test

use upstreamOS var

init map, fix watcher flag

attempt to detect network changes

cancel and rerun reinitializeOSResolver

ignore invalid interfaces

allow OS resolver upstream to fail

don't wait for dnsWait group on reinit, check for active interfaces to trigger reinit

fix unused var

simpler active iface check, debug logs

don't spam network service name patching on Mac

don't wait for OS resolver nameserver testing

remove test for OS resolvers for now

async nameserver testing

remove unused test
2025-01-20 15:03:27 +07:00
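
The commit message above mentions testing OS resolvers in parallel and asynchronous nameserver testing. The snippet below is a minimal, hypothetical sketch of that pattern using the miekg/dns client; it is not part of upstream_monitor.go, and the function name, arguments, and addresses are assumptions for illustration only.

// anyNameserverUp is a hypothetical helper, not part of this file: it probes every
// nameserver concurrently and reports whether at least one answered a ". NS" query
// before the timeout, roughly matching the parallel OS resolver testing described above.
package main

import (
	"fmt"
	"sync"
	"time"

	"github.com/miekg/dns"
)

func anyNameserverUp(nameservers []string, timeout time.Duration) bool {
	var (
		wg sync.WaitGroup
		mu sync.Mutex
		up bool
	)
	client := &dns.Client{Timeout: timeout}
	msg := new(dns.Msg)
	msg.SetQuestion(".", dns.TypeNS)
	for _, ns := range nameservers {
		wg.Add(1)
		go func(addr string) {
			defer wg.Done()
			// Each probe uses its own copy of the message so goroutines do not share state.
			if _, _, err := client.Exchange(msg.Copy(), addr); err == nil {
				mu.Lock()
				up = true
				mu.Unlock()
			}
		}(ns)
	}
	wg.Wait()
	return up
}

func main() {
	// Addresses are placeholders; real code would use the nameservers read from the OS.
	fmt.Println(anyNameserverUp([]string{"127.0.0.1:53", "[::1]:53"}, time.Second))
}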


package cli

import (
	"context"
	"sync"
	"time"

	"github.com/miekg/dns"

	"github.com/Control-D-Inc/ctrld"
)

const (
	// maxFailureRequest is the maximum number of failed queries allowed
	// before an upstream is marked as down.
	maxFailureRequest = 100
	// checkUpstreamBackoffSleep is the time interval between upstream checks.
	checkUpstreamBackoffSleep = 2 * time.Second
)

// upstreamMonitor tracks the health of configured upstreams.
type upstreamMonitor struct {
	cfg        *ctrld.Config
	mu         sync.Mutex
	checking   map[string]bool
	down       map[string]bool
	failureReq map[string]uint64
}

// newUpstreamMonitor returns an upstreamMonitor with all configured upstreams
// and the OS resolver marked as up.
func newUpstreamMonitor(cfg *ctrld.Config) *upstreamMonitor {
	um := &upstreamMonitor{
		cfg:        cfg,
		checking:   make(map[string]bool),
		down:       make(map[string]bool),
		failureReq: make(map[string]uint64),
	}
	for n := range cfg.Upstream {
		upstream := upstreamPrefix + n
		um.reset(upstream)
	}
	um.reset(upstreamOS)
	return um
}

// increaseFailureCount increases the failed queries count for an upstream by 1.
func (um *upstreamMonitor) increaseFailureCount(upstream string) {
	um.mu.Lock()
	defer um.mu.Unlock()
	um.failureReq[upstream]++
	failedCount := um.failureReq[upstream]
	um.down[upstream] = failedCount >= maxFailureRequest
}

// isDown reports whether the given upstream is currently marked as down.
func (um *upstreamMonitor) isDown(upstream string) bool {
	um.mu.Lock()
	defer um.mu.Unlock()
	return um.down[upstream]
}

// reset marks an upstream as up and sets its failed queries counter to zero.
func (um *upstreamMonitor) reset(upstream string) {
	um.mu.Lock()
	defer um.mu.Unlock()
	um.failureReq[upstream] = 0
	um.down[upstream] = false
}

// checkUpstream checks the given upstream's status, periodically sending queries
// to the upstream until one succeeds. The upstream's status/counter is reset once
// it becomes reachable again.
func (p *prog) checkUpstream(upstream string, uc *ctrld.UpstreamConfig) {
	p.um.mu.Lock()
	isChecking := p.um.checking[upstream]
	if isChecking {
		p.um.mu.Unlock()
		return
	}
	p.um.checking[upstream] = true
	p.um.mu.Unlock()
	defer func() {
		p.um.mu.Lock()
		p.um.checking[upstream] = false
		p.um.mu.Unlock()
	}()

	resolver, err := ctrld.NewResolver(uc)
	if err != nil {
		mainLog.Load().Warn().Err(err).Msg("could not check upstream")
		return
	}
	msg := new(dns.Msg)
	msg.SetQuestion(".", dns.TypeNS)
	timeout := 1000 * time.Millisecond
	if uc.Timeout > 0 {
		timeout = time.Duration(uc.Timeout) * time.Millisecond
	}
	check := func() error {
		ctx, cancel := context.WithTimeout(context.Background(), timeout)
		defer cancel()
		uc.ReBootstrap()
		_, err := resolver.Resolve(ctx, msg)
		return err
	}

	endpoint := uc.Endpoint
	if endpoint == "" {
		endpoint = uc.Name
	}
	mainLog.Load().Warn().Msgf("upstream %q is offline", endpoint)
	for {
		if err := check(); err == nil {
			mainLog.Load().Warn().Msgf("upstream %q is online", endpoint)
			p.um.reset(upstream)
			return
		} else {
			mainLog.Load().Debug().Msgf("check upstream %q failed: %v", endpoint, err)
		}
		time.Sleep(checkUpstreamBackoffSleep)
	}
}
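
A caller-side sketch of how these pieces fit together, assuming a query path that reports failures to the monitor. The wrapper name onUpstreamError is hypothetical and not part of this file; only increaseFailureCount, isDown, and checkUpstream come from the code above.

// Hypothetical wiring, for illustration only: after a failed query, bump the failure
// counter, and once the upstream is considered down, start a background recovery probe.
// checkUpstream itself de-duplicates concurrent probes via the "checking" map.
func (p *prog) onUpstreamError(upstream string, uc *ctrld.UpstreamConfig) {
	p.um.increaseFailureCount(upstream)
	if p.um.isDown(upstream) {
		go p.checkUpstream(upstream, uc)
	}
}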