mirror of
https://github.com/Control-D-Inc/ctrld.git
synced 2026-05-15 00:50:25 +02:00
remove leaking timeout, fix blocking upstreams checks, leaking is per listener, OS resolvers are tested in parallel, reset is only done is os is down
fix test use upstreamIS var init map, fix watcher flag attempt to detect network changes attempt to detect network changes cancel and rerun reinitializeOSResolver cancel and rerun reinitializeOSResolver cancel and rerun reinitializeOSResolver ignore invalid inferaces ignore invalid inferaces allow OS resolver upstream to fail dont wait for dnsWait group on reinit, check for active interfaces to trigger reinit fix unused var simpler active iface check, debug logs dont spam network service name patching on Mac dont wait for os resolver nameserver testing remove test for osresovlers for now async nameserver testing remove unused test
This commit is contained in:
+222
-78
@@ -19,6 +19,7 @@ import (
|
||||
"golang.org/x/sync/errgroup"
|
||||
"tailscale.com/net/netmon"
|
||||
"tailscale.com/net/tsaddr"
|
||||
"tailscale.com/types/logger"
|
||||
|
||||
"github.com/Control-D-Inc/ctrld"
|
||||
"github.com/Control-D-Inc/ctrld/internal/controld"
|
||||
@@ -77,6 +78,12 @@ type upstreamForResult struct {
|
||||
}
|
||||
|
||||
func (p *prog) serveDNS(listenerNum string) error {
|
||||
// Start network monitoring
|
||||
if err := p.monitorNetworkChanges(); err != nil {
|
||||
mainLog.Load().Error().Err(err).Msg("Failed to start network monitoring")
|
||||
// Don't return here as we still want DNS service to run
|
||||
}
|
||||
|
||||
listenerConfig := p.cfg.Listener[listenerNum]
|
||||
// make sure ip is allocated
|
||||
if allocErr := p.allocateIP(listenerConfig.IP); allocErr != nil {
|
||||
@@ -418,11 +425,17 @@ func (p *prog) proxy(ctx context.Context, req *proxyRequest) *proxyResponse {
|
||||
serveStaleCache := p.cache != nil && p.cfg.Service.CacheServeStale
|
||||
upstreamConfigs := p.upstreamConfigsFromUpstreamNumbers(upstreams)
|
||||
|
||||
upstreamMapKey := strings.Join(upstreams, "_")
|
||||
|
||||
leaked := false
|
||||
if len(upstreamConfigs) > 0 && p.leakingQuery.Load() {
|
||||
upstreamConfigs = nil
|
||||
leaked = true
|
||||
ctrld.Log(ctx, mainLog.Load().Debug(), "%v is down, leaking query to OS resolver", upstreams)
|
||||
if len(upstreamConfigs) > 0 {
|
||||
p.leakingQueryMu.Lock()
|
||||
if p.leakingQueryRunning[upstreamMapKey] {
|
||||
upstreamConfigs = nil
|
||||
leaked = true
|
||||
ctrld.Log(ctx, mainLog.Load().Debug(), "%v is down, leaking query to OS resolver", upstreams)
|
||||
}
|
||||
p.leakingQueryMu.Unlock()
|
||||
}
|
||||
|
||||
if len(upstreamConfigs) == 0 {
|
||||
@@ -601,9 +614,15 @@ func (p *prog) proxy(ctx context.Context, req *proxyRequest) *proxyResponse {
|
||||
ctrld.Log(ctx, mainLog.Load().Error(), "all %v endpoints failed", upstreams)
|
||||
if p.leakOnUpstreamFailure() {
|
||||
p.leakingQueryMu.Lock()
|
||||
if !p.leakingQueryWasRun {
|
||||
p.leakingQueryWasRun = true
|
||||
go p.performLeakingQuery()
|
||||
// get the map key as concact of upstreams
|
||||
if !p.leakingQueryRunning[upstreamMapKey] {
|
||||
p.leakingQueryRunning[upstreamMapKey] = true
|
||||
// get a map of the failed upstreams
|
||||
failedUpstreams := make(map[string]*ctrld.UpstreamConfig)
|
||||
for n, upstream := range upstreamConfigs {
|
||||
failedUpstreams[upstreams[n]] = upstream
|
||||
}
|
||||
go p.performLeakingQuery(failedUpstreams, upstreamMapKey)
|
||||
}
|
||||
p.leakingQueryMu.Unlock()
|
||||
}
|
||||
@@ -929,95 +948,66 @@ func (p *prog) selfUninstallCoolOfPeriod() {
|
||||
}
|
||||
|
||||
// performLeakingQuery performs necessary works to leak queries to OS resolver.
|
||||
func (p *prog) performLeakingQuery() {
|
||||
mainLog.Load().Warn().Msg("leaking query to OS resolver")
|
||||
// once we store the leakingQuery flag, we are leaking queries to OS resolver
|
||||
// we then start testing all the upstreams forever, waiting for success, but in parallel
|
||||
func (p *prog) performLeakingQuery(failedUpstreams map[string]*ctrld.UpstreamConfig, upstreamMapKey string) {
|
||||
|
||||
// Create a context with timeout for the entire operation
|
||||
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
|
||||
defer cancel()
|
||||
mainLog.Load().Warn().Msgf("leaking queries for failed upstreams [%v] to OS resolver", failedUpstreams)
|
||||
|
||||
// Signal dns watchers to stop, so changes made below won't be reverted.
|
||||
p.leakingQuery.Store(true)
|
||||
p.leakingQueryMu.Lock()
|
||||
p.leakingQueryRunning[upstreamMapKey] = true
|
||||
p.leakingQueryMu.Unlock()
|
||||
defer func() {
|
||||
p.leakingQuery.Store(false)
|
||||
p.leakingQueryMu.Lock()
|
||||
p.leakingQueryWasRun = false
|
||||
p.leakingQueryRunning[upstreamMapKey] = false
|
||||
p.leakingQueryMu.Unlock()
|
||||
mainLog.Load().Warn().Msg("stop leaking query")
|
||||
}()
|
||||
|
||||
// Create channels to coordinate operations
|
||||
resetDone := make(chan struct{})
|
||||
checkDone := make(chan struct{})
|
||||
// we only want to reset DNS when our resolver is broken
|
||||
// this allows us to find the new OS resolver nameservers
|
||||
if p.um.isDown(upstreamOS) {
|
||||
|
||||
// Reset DNS with timeout
|
||||
go func() {
|
||||
defer close(resetDone)
|
||||
mainLog.Load().Debug().Msg("attempting to reset DNS")
|
||||
p.resetDNS()
|
||||
mainLog.Load().Debug().Msg("DNS reset completed")
|
||||
}()
|
||||
mainLog.Load().Debug().Msg("OS resolver is down, reinitializing")
|
||||
p.reinitializeOSResolver()
|
||||
|
||||
// Wait for reset with timeout
|
||||
select {
|
||||
case <-resetDone:
|
||||
mainLog.Load().Debug().Msg("DNS reset successful")
|
||||
case <-ctx.Done():
|
||||
mainLog.Load().Error().Msg("DNS reset timed out")
|
||||
return
|
||||
}
|
||||
|
||||
// Check upstream in background with progress tracking
|
||||
go func() {
|
||||
defer close(checkDone)
|
||||
mainLog.Load().Debug().Msg("starting upstream checks")
|
||||
for name, uc := range p.cfg.Upstream {
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
return
|
||||
default:
|
||||
mainLog.Load().Debug().
|
||||
Str("upstream", name).
|
||||
Msg("checking upstream")
|
||||
p.checkUpstream(name, uc)
|
||||
// Test all failed upstreams in parallel
|
||||
ctx, cancel := context.WithCancel(context.Background())
|
||||
defer cancel()
|
||||
|
||||
upstreamCh := make(chan string, len(failedUpstreams))
|
||||
for name, uc := range failedUpstreams {
|
||||
go func(name string, uc *ctrld.UpstreamConfig) {
|
||||
mainLog.Load().Debug().
|
||||
Str("upstream", name).
|
||||
Msg("checking upstream")
|
||||
|
||||
for {
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
return
|
||||
default:
|
||||
p.checkUpstream(name, uc)
|
||||
mainLog.Load().Debug().
|
||||
Str("upstream", name).
|
||||
Msg("upstream recovered")
|
||||
upstreamCh <- name
|
||||
return
|
||||
}
|
||||
}
|
||||
}
|
||||
mainLog.Load().Debug().Msg("upstream checks completed")
|
||||
}()
|
||||
|
||||
// Wait for upstream checks
|
||||
select {
|
||||
case <-checkDone:
|
||||
mainLog.Load().Debug().Msg("upstream checks successful")
|
||||
case <-ctx.Done():
|
||||
mainLog.Load().Error().Msg("upstream checks timed out")
|
||||
return
|
||||
}(name, uc)
|
||||
}
|
||||
|
||||
// Initialize OS resolver with timeout
|
||||
mainLog.Load().Debug().Msg("initializing OS resolver")
|
||||
ns := ctrld.InitializeOsResolver()
|
||||
mainLog.Load().Debug().Msgf("re-initialized OS resolver with nameservers: %v", ns)
|
||||
// Wait for any upstream to recover
|
||||
name := <-upstreamCh
|
||||
|
||||
// Wait for DNS operations to complete
|
||||
waitCh := make(chan struct{})
|
||||
go func() {
|
||||
p.dnsWg.Wait()
|
||||
close(waitCh)
|
||||
}()
|
||||
mainLog.Load().Info().
|
||||
Str("upstream", name).
|
||||
Msg("stopping leak as upstream recovered")
|
||||
|
||||
select {
|
||||
case <-waitCh:
|
||||
mainLog.Load().Debug().Msg("DNS operations completed")
|
||||
case <-ctx.Done():
|
||||
mainLog.Load().Error().Msg("DNS operations timed out")
|
||||
return
|
||||
}
|
||||
|
||||
// Set DNS with timeout
|
||||
mainLog.Load().Debug().Msg("setting DNS configuration")
|
||||
p.setDNS()
|
||||
mainLog.Load().Debug().Msg("DNS configuration set successfully")
|
||||
}
|
||||
|
||||
// forceFetchingAPI sends signal to force syncing API config if run in cd mode,
|
||||
@@ -1190,3 +1180,157 @@ func resolveInternalDomainTestQuery(ctx context.Context, domain string, m *dns.M
|
||||
answer.SetReply(m)
|
||||
return answer
|
||||
}
|
||||
|
||||
// reinitializeOSResolver reinitializes the OS resolver
|
||||
// by removing ctrld listenr from the interface, collecting the network nameservers
|
||||
// and re-initializing the OS resolver with the nameservers
|
||||
// applying listener back to the interface
|
||||
func (p *prog) reinitializeOSResolver() {
|
||||
// Cancel any existing operations
|
||||
p.resetCtxMu.Lock()
|
||||
if p.resetCancel != nil {
|
||||
p.resetCancel()
|
||||
}
|
||||
|
||||
// Create new context for this operation
|
||||
ctx, cancel := context.WithCancel(context.Background())
|
||||
p.resetCtx = ctx
|
||||
p.resetCancel = cancel
|
||||
p.resetCtxMu.Unlock()
|
||||
|
||||
// Ensure cleanup
|
||||
defer cancel()
|
||||
|
||||
p.leakingQueryReset.Store(true)
|
||||
defer p.leakingQueryReset.Store(false)
|
||||
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
mainLog.Load().Debug().Msg("DNS reset cancelled by new network change")
|
||||
return
|
||||
default:
|
||||
mainLog.Load().Debug().Msg("attempting to reset DNS")
|
||||
p.resetDNS()
|
||||
mainLog.Load().Debug().Msg("DNS reset completed")
|
||||
}
|
||||
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
mainLog.Load().Debug().Msg("DNS reset cancelled by new network change")
|
||||
return
|
||||
default:
|
||||
mainLog.Load().Debug().Msg("initializing OS resolver")
|
||||
ns := ctrld.InitializeOsResolver()
|
||||
mainLog.Load().Debug().Msgf("re-initialized OS resolver with nameservers: %v", ns)
|
||||
}
|
||||
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
mainLog.Load().Debug().Msg("DNS reset cancelled by new network change")
|
||||
return
|
||||
default:
|
||||
mainLog.Load().Debug().Msg("setting DNS configuration")
|
||||
p.setDNS()
|
||||
mainLog.Load().Debug().Msg("DNS configuration set successfully")
|
||||
}
|
||||
}
|
||||
|
||||
// monitorNetworkChanges starts monitoring for network interface changes
|
||||
func (p *prog) monitorNetworkChanges() error {
|
||||
// Create network monitor
|
||||
mon, err := netmon.New(logger.WithPrefix(mainLog.Load().Printf, "netmon: "))
|
||||
if err != nil {
|
||||
return fmt.Errorf("creating network monitor: %w", err)
|
||||
}
|
||||
|
||||
mon.RegisterChangeCallback(func(delta *netmon.ChangeDelta) {
|
||||
// Get map of valid interfaces
|
||||
validIfaces := validInterfacesMap()
|
||||
|
||||
// Parse old and new interface states
|
||||
oldIfs := parseInterfaceState(delta.Old)
|
||||
newIfs := parseInterfaceState(delta.New)
|
||||
|
||||
// Check for changes in valid interfaces
|
||||
changed := false
|
||||
activeInterfaceExists := false
|
||||
|
||||
for ifaceName := range validIfaces {
|
||||
|
||||
oldState, oldExists := oldIfs[strings.ToLower(ifaceName)]
|
||||
newState, newExists := newIfs[strings.ToLower(ifaceName)]
|
||||
|
||||
if newState != "" && newState != "down" {
|
||||
activeInterfaceExists = true
|
||||
}
|
||||
|
||||
if oldExists != newExists || oldState != newState {
|
||||
changed = true
|
||||
mainLog.Load().Debug().
|
||||
Str("interface", ifaceName).
|
||||
Str("old_state", oldState).
|
||||
Str("new_state", newState).
|
||||
Msg("Valid interface changed state")
|
||||
break
|
||||
} else {
|
||||
mainLog.Load().Debug().
|
||||
Str("interface", ifaceName).
|
||||
Str("old_state", oldState).
|
||||
Str("new_state", newState).
|
||||
Msg("Valid interface unchanged")
|
||||
}
|
||||
}
|
||||
|
||||
if !changed {
|
||||
mainLog.Load().Debug().Msgf("Ignoring interface change - no valid interfaces affected")
|
||||
return
|
||||
}
|
||||
|
||||
mainLog.Load().Debug().Msgf("Network change detected: from %v to %v", delta.Old, delta.New)
|
||||
if activeInterfaceExists {
|
||||
p.reinitializeOSResolver()
|
||||
} else {
|
||||
mainLog.Load().Debug().Msg("No active interfaces found, skipping reinitialization")
|
||||
}
|
||||
})
|
||||
|
||||
mon.Start()
|
||||
mainLog.Load().Debug().Msg("Network monitor started")
|
||||
return nil
|
||||
}
|
||||
|
||||
// parseInterfaceState parses the interface state string into a map of interface name -> state
|
||||
func parseInterfaceState(state *netmon.State) map[string]string {
|
||||
if state == nil {
|
||||
return nil
|
||||
}
|
||||
|
||||
result := make(map[string]string)
|
||||
|
||||
// Extract ifs={...} section
|
||||
stateStr := state.String()
|
||||
ifsStart := strings.Index(stateStr, "ifs={")
|
||||
if ifsStart == -1 {
|
||||
return result
|
||||
}
|
||||
|
||||
ifsStr := stateStr[ifsStart+5:]
|
||||
ifsEnd := strings.Index(ifsStr, "}")
|
||||
if ifsEnd == -1 {
|
||||
return result
|
||||
}
|
||||
|
||||
// Parse each interface entry
|
||||
ifaces := strings.Split(ifsStr[:ifsEnd], " ")
|
||||
for _, iface := range ifaces {
|
||||
parts := strings.Split(iface, ":")
|
||||
if len(parts) != 2 {
|
||||
continue
|
||||
}
|
||||
name := strings.ToLower(parts[0])
|
||||
state := parts[1]
|
||||
result[name] = state
|
||||
}
|
||||
|
||||
return result
|
||||
}
|
||||
|
||||
@@ -17,9 +17,8 @@ func patchNetIfaceName(iface *net.Interface) (bool, error) {
|
||||
|
||||
patched := false
|
||||
if name := networkServiceName(iface.Name, bytes.NewReader(b)); name != "" {
|
||||
iface.Name = name
|
||||
mainLog.Load().Debug().Str("network_service", name).Msg("found network service name for interface")
|
||||
patched = true
|
||||
iface.Name = name
|
||||
}
|
||||
return patched, nil
|
||||
}
|
||||
|
||||
+12
-6
@@ -115,9 +115,13 @@ type prog struct {
|
||||
loopMu sync.Mutex
|
||||
loop map[string]bool
|
||||
|
||||
leakingQueryMu sync.Mutex
|
||||
leakingQueryWasRun bool
|
||||
leakingQuery atomic.Bool
|
||||
leakingQueryMu sync.Mutex
|
||||
leakingQueryRunning map[string]bool
|
||||
leakingQueryReset atomic.Bool
|
||||
|
||||
resetCtx context.Context
|
||||
resetCancel context.CancelFunc
|
||||
resetCtxMu sync.Mutex
|
||||
|
||||
started chan struct{}
|
||||
onStartedDone chan struct{}
|
||||
@@ -420,6 +424,7 @@ func (p *prog) run(reload bool, reloadCh chan struct{}) {
|
||||
}
|
||||
p.onStartedDone = make(chan struct{})
|
||||
p.loop = make(map[string]bool)
|
||||
p.leakingQueryRunning = make(map[string]bool)
|
||||
p.lanLoopGuard = newLoopGuard()
|
||||
p.ptrLoopGuard = newLoopGuard()
|
||||
p.cacheFlushDomainsMap = nil
|
||||
@@ -737,12 +742,13 @@ func (p *prog) dnsWatchdog(iface *net.Interface, nameservers []string, allIfaces
|
||||
if !requiredMultiNICsConfig() {
|
||||
return
|
||||
}
|
||||
logger := mainLog.Load().With().Str("iface", iface.Name).Logger()
|
||||
logger.Debug().Msg("start DNS settings watchdog")
|
||||
|
||||
mainLog.Load().Debug().Msg("start DNS settings watchdog")
|
||||
ns := nameservers
|
||||
slices.Sort(ns)
|
||||
ticker := time.NewTicker(p.dnsWatchdogDuration())
|
||||
logger := mainLog.Load().With().Str("iface", iface.Name).Logger()
|
||||
|
||||
for {
|
||||
select {
|
||||
case <-p.dnsWatcherStopCh:
|
||||
@@ -751,7 +757,7 @@ func (p *prog) dnsWatchdog(iface *net.Interface, nameservers []string, allIfaces
|
||||
mainLog.Load().Debug().Msg("stop dns watchdog")
|
||||
return
|
||||
case <-ticker.C:
|
||||
if p.leakingQuery.Load() {
|
||||
if p.leakingQueryReset.Load() {
|
||||
return
|
||||
}
|
||||
if dnsChanged(iface, ns) {
|
||||
|
||||
@@ -40,7 +40,7 @@ func (p *prog) watchResolvConf(iface *net.Interface, ns []netip.Addr, setDnsFn f
|
||||
mainLog.Load().Debug().Msgf("stopping watcher for %s", resolvConfPath)
|
||||
return
|
||||
case event, ok := <-watcher.Events:
|
||||
if p.leakingQuery.Load() {
|
||||
if p.leakingQueryReset.Load() {
|
||||
return
|
||||
}
|
||||
if !ok {
|
||||
|
||||
@@ -44,10 +44,6 @@ func newUpstreamMonitor(cfg *ctrld.Config) *upstreamMonitor {
|
||||
|
||||
// increaseFailureCount increase failed queries count for an upstream by 1.
|
||||
func (um *upstreamMonitor) increaseFailureCount(upstream string) {
|
||||
// Do not count "upstream.os", since it must not be down for leaking queries.
|
||||
if upstream == upstreamOS {
|
||||
return
|
||||
}
|
||||
um.mu.Lock()
|
||||
defer um.mu.Unlock()
|
||||
|
||||
|
||||
Reference in New Issue
Block a user