From 89600f6091adc7b57b5df631959ef8a6108f60be Mon Sep 17 00:00:00 2001
From: Cuong Manh Le
Date: Wed, 15 Jan 2025 19:51:55 +0700
Subject: [PATCH] cmd/cli: new flow for leaking queries to OS resolver

The current flow involves marking the OS resolver as down, which is not
right at all, since ctrld depends on it for leaking queries.

This commit implements a new flow: once leaking is marked, ctrld restores
the DNS settings, allowing queries to go to the OS resolver until the
internet connection is established.
---
 cmd/cli/dns_proxy.go        | 19 ++++++++++++++-----
 cmd/cli/prog.go             |  2 +-
 cmd/cli/resolvconf.go       |  2 +-
 cmd/cli/upstream_monitor.go | 26 ++++----------------------
 4 files changed, 20 insertions(+), 29 deletions(-)

diff --git a/cmd/cli/dns_proxy.go b/cmd/cli/dns_proxy.go
index 631b0e3..4f4b980 100644
--- a/cmd/cli/dns_proxy.go
+++ b/cmd/cli/dns_proxy.go
@@ -419,12 +419,7 @@ func (p *prog) proxy(ctx context.Context, req *proxyRequest) *proxyResponse {
 	upstreamConfigs := p.upstreamConfigsFromUpstreamNumbers(upstreams)
 
 	leaked := false
-	// If ctrld is going to leak query to OS resolver, check remote upstream in background,
-	// so ctrld could be back to normal operation as long as the network is back online.
 	if len(upstreamConfigs) > 0 && p.leakingQuery.Load() {
-		for n, uc := range upstreamConfigs {
-			go p.checkUpstream(upstreams[n], uc)
-		}
 		upstreamConfigs = nil
 		leaked = true
 		ctrld.Log(ctx, mainLog.Load().Debug(), "%v is down, leaking query to OS resolver", upstreams)
@@ -936,11 +931,25 @@ func (p *prog) performLeakingQuery() {
 	mainLog.Load().Warn().Msg("leaking query to OS resolver")
 	// Signal dns watchers to stop, so changes made below won't be reverted.
 	p.leakingQuery.Store(true)
+	defer func() {
+		p.leakingQuery.Store(false)
+		p.leakingQueryMu.Lock()
+		p.leakingQueryWasRun = false
+		p.leakingQueryMu.Unlock()
+	}()
+	// Reset DNS, so queries are forwarded to the OS resolver normally.
 	p.resetDNS()
+	// Check remote upstreams, so ctrld can return to normal operation
+	// as long as the network is back online.
+	for name, uc := range p.cfg.Upstream {
+		p.checkUpstream(name, uc)
+	}
+	// After all upstreams are back, re-initialize the OS resolver.
 	ns := ctrld.InitializeOsResolver()
 	mainLog.Load().Debug().Msgf("re-initialized OS resolver with nameservers: %v", ns)
 	p.dnsWg.Wait()
 	p.setDNS()
+	mainLog.Load().Warn().Msg("stop leaking query")
 }
 
 // forceFetchingAPI sends signal to force syncing API config if run in cd mode,
diff --git a/cmd/cli/prog.go b/cmd/cli/prog.go
index 2ceac7c..29c1120 100644
--- a/cmd/cli/prog.go
+++ b/cmd/cli/prog.go
@@ -729,7 +729,7 @@ func (p *prog) dnsWatchdog(iface *net.Interface, nameservers []string, allIfaces
 			mainLog.Load().Debug().Msg("stop dns watchdog")
 			return
 		case <-ticker.C:
-			if p.leakingQuery.Load() || p.um.isChecking(upstreamOS) {
+			if p.leakingQuery.Load() {
 				return
 			}
 			if dnsChanged(iface, ns) {
diff --git a/cmd/cli/resolvconf.go b/cmd/cli/resolvconf.go
index 6bd8c2a..6df7be6 100644
--- a/cmd/cli/resolvconf.go
+++ b/cmd/cli/resolvconf.go
@@ -40,7 +40,7 @@ func (p *prog) watchResolvConf(iface *net.Interface, ns []netip.Addr, setDnsFn f
 			mainLog.Load().Debug().Msgf("stopping watcher for %s", resolvConfPath)
 			return
 		case event, ok := <-watcher.Events:
-			if p.leakingQuery.Load() || p.um.isChecking(upstreamOS) {
+			if p.leakingQuery.Load() {
 				return
 			}
 			if !ok {
diff --git a/cmd/cli/upstream_monitor.go b/cmd/cli/upstream_monitor.go
index 512a8b6..1f3484b 100644
--- a/cmd/cli/upstream_monitor.go
+++ b/cmd/cli/upstream_monitor.go
@@ -44,6 +44,10 @@ func newUpstreamMonitor(cfg *ctrld.Config) *upstreamMonitor {
 
 // increaseFailureCount increase failed queries count for an upstream by 1.
 func (um *upstreamMonitor) increaseFailureCount(upstream string) {
+	// Do not count "upstream.os", since it must never be marked down for leaking queries.
+	if upstream == upstreamOS {
+		return
+	}
 	um.mu.Lock()
 	defer um.mu.Unlock()
 
@@ -60,14 +64,6 @@ func (um *upstreamMonitor) isDown(upstream string) bool {
 	return um.down[upstream]
 }
 
-// isChecking reports whether the given upstream is being checked.
-func (um *upstreamMonitor) isChecking(upstream string) bool {
-	um.mu.Lock()
-	defer um.mu.Unlock()
-
-	return um.checking[upstream]
-}
-
 // reset marks an upstream as up and set failed queries counter to zero.
 func (um *upstreamMonitor) reset(upstream string) {
 	um.mu.Lock()
@@ -94,11 +90,6 @@ func (p *prog) checkUpstream(upstream string, uc *ctrld.UpstreamConfig) {
 		p.um.mu.Unlock()
 	}()
 
-	isOsResolver := uc.Type == ctrld.ResolverTypeOS
-	if isOsResolver {
-		p.resetDNS()
-		defer p.setDNS()
-	}
 	resolver, err := ctrld.NewResolver(uc)
 	if err != nil {
 		mainLog.Load().Warn().Err(err).Msg("could not check upstream")
@@ -114,9 +105,6 @@ func (p *prog) checkUpstream(upstream string, uc *ctrld.UpstreamConfig) {
 		ctx, cancel := context.WithTimeout(context.Background(), timeout)
 		defer cancel()
 		uc.ReBootstrap()
-		if isOsResolver {
-			ctrld.InitializeOsResolver()
-		}
 		_, err := resolver.Resolve(ctx, msg)
 		return err
 	}
@@ -129,12 +117,6 @@ func (p *prog) checkUpstream(upstream string, uc *ctrld.UpstreamConfig) {
 		if err := check(); err == nil {
 			mainLog.Load().Warn().Msgf("upstream %q is online", endpoint)
 			p.um.reset(upstream)
-			if p.leakingQuery.CompareAndSwap(true, false) {
-				p.leakingQueryMu.Lock()
-				p.leakingQueryWasRun = false
-				p.leakingQueryMu.Unlock()
-				mainLog.Load().Warn().Msg("stop leaking query")
-			}
 			return
 		} else {
 			mainLog.Load().Debug().Msgf("checked upstream %q failed: %v", endpoint, err)
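
Reviewer note (not part of the patch): below is a minimal, standalone sketch
of the new flow, for illustration only. All names in it (prog, resetDNS,
setDNS, checkUpstream, pingOK, the upstreams map) are hypothetical stand-ins,
not ctrld's real types or helpers; only the ordering mirrors the patched
performLeakingQuery.

package main

import (
	"fmt"
	"sync"
	"sync/atomic"
	"time"
)

// prog is a hypothetical stand-in for ctrld's prog type.
type prog struct {
	leakingQuery atomic.Bool       // mirrors p.leakingQuery in the patch
	dnsWg        sync.WaitGroup    // mirrors p.dnsWg
	upstreams    map[string]string // stand-in for p.cfg.Upstream
}

// resetDNS and setDNS are placeholders for ctrld's real DNS helpers.
func (p *prog) resetDNS() { fmt.Println("restore OS DNS settings") }
func (p *prog) setDNS()   { fmt.Println("point DNS back at ctrld") }

// pingOK is a fake reachability probe; the real code resolves a DNS
// query against the upstream instead.
func pingOK(endpoint string) bool { return true }

// checkUpstream blocks until the upstream answers, like the real
// checkUpstream's retry loop (minus its backoff and timeouts).
func (p *prog) checkUpstream(name, endpoint string) {
	for !pingOK(endpoint) {
		time.Sleep(2 * time.Second)
	}
	fmt.Printf("upstream %q is online\n", name)
}

// performLeakingQuery models the patched flow: restore DNS first so
// queries reach the OS resolver, block until every upstream is back,
// then re-apply ctrld's DNS settings and clear the leaking flag.
func (p *prog) performLeakingQuery() {
	p.leakingQuery.Store(true)
	defer p.leakingQuery.Store(false) // the patch also resets leakingQueryWasRun here

	p.resetDNS() // queries now leak to the OS resolver

	for name, endpoint := range p.upstreams {
		p.checkUpstream(name, endpoint) // synchronous, unlike the old `go` calls
	}

	fmt.Println("re-initialize OS resolver nameservers") // ctrld.InitializeOsResolver() in the patch

	p.dnsWg.Wait() // let DNS watchers finish before re-applying settings
	p.setDNS()
}

func main() {
	p := &prog{upstreams: map[string]string{"upstream.0": "https://example.invalid/dns-query"}}
	p.performLeakingQuery()
}

The design point the sketch tries to show: because the upstream checks now
block inside performLeakingQuery, the leaking flag can be cleared in a single
deferred step, which is why the old CompareAndSwap in checkUpstream and the
isChecking(upstreamOS) guards in the watchers become unnecessary.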