mirror of
https://github.com/Control-D-Inc/ctrld.git
synced 2026-02-03 22:18:39 +00:00
cmd/cli: new flow for leaking queries to OS resolver
The current flow involves marking the OS resolver as down, which is not right at all, since ctrld depends on it for leaking queries. This commit implements a new flow, in which ctrld restores the DNS settings once leaking is marked, allowing queries to go to the OS resolver until the internet connection is re-established.
This commit is contained in:
committed by
Cuong Manh Le
parent
f986a575e8
commit
89600f6091
@@ -419,12 +419,7 @@ func (p *prog) proxy(ctx context.Context, req *proxyRequest) *proxyResponse {
|
||||
upstreamConfigs := p.upstreamConfigsFromUpstreamNumbers(upstreams)
|
||||
|
||||
leaked := false
|
||||
// If ctrld is going to leak query to OS resolver, check remote upstream in background,
|
||||
// so ctrld could be back to normal operation as long as the network is back online.
|
||||
if len(upstreamConfigs) > 0 && p.leakingQuery.Load() {
|
||||
for n, uc := range upstreamConfigs {
|
||||
go p.checkUpstream(upstreams[n], uc)
|
||||
}
|
||||
upstreamConfigs = nil
|
||||
leaked = true
|
||||
ctrld.Log(ctx, mainLog.Load().Debug(), "%v is down, leaking query to OS resolver", upstreams)
|
||||
@@ -936,11 +931,25 @@ func (p *prog) performLeakingQuery() {
|
||||
mainLog.Load().Warn().Msg("leaking query to OS resolver")
|
||||
// Signal dns watchers to stop, so changes made below won't be reverted.
|
||||
p.leakingQuery.Store(true)
|
||||
defer func() {
|
||||
p.leakingQuery.Store(false)
|
||||
p.leakingQueryMu.Lock()
|
||||
p.leakingQueryWasRun = false
|
||||
p.leakingQueryMu.Unlock()
|
||||
}()
|
||||
// Reset DNS, so queries are forwarded to OS resolver normally.
|
||||
p.resetDNS()
|
||||
// Check remote upstream in background, so ctrld could be back to normal
|
||||
// operation as long as the network is back online.
|
||||
for name, uc := range p.cfg.Upstream {
|
||||
p.checkUpstream(name, uc)
|
||||
}
|
||||
// After all upstream back, re-initializing OS resolver.
|
||||
ns := ctrld.InitializeOsResolver()
|
||||
mainLog.Load().Debug().Msgf("re-initialized OS resolver with nameservers: %v", ns)
|
||||
p.dnsWg.Wait()
|
||||
p.setDNS()
|
||||
mainLog.Load().Warn().Msg("stop leaking query")
|
||||
}
|
||||
|
||||
// forceFetchingAPI sends signal to force syncing API config if run in cd mode,
|
||||
|
||||
@@ -729,7 +729,7 @@ func (p *prog) dnsWatchdog(iface *net.Interface, nameservers []string, allIfaces
|
||||
mainLog.Load().Debug().Msg("stop dns watchdog")
|
||||
return
|
||||
case <-ticker.C:
|
||||
if p.leakingQuery.Load() || p.um.isChecking(upstreamOS) {
|
||||
if p.leakingQuery.Load() {
|
||||
return
|
||||
}
|
||||
if dnsChanged(iface, ns) {
|
||||
|
||||
@@ -40,7 +40,7 @@ func (p *prog) watchResolvConf(iface *net.Interface, ns []netip.Addr, setDnsFn f
|
||||
mainLog.Load().Debug().Msgf("stopping watcher for %s", resolvConfPath)
|
||||
return
|
||||
case event, ok := <-watcher.Events:
|
||||
if p.leakingQuery.Load() || p.um.isChecking(upstreamOS) {
|
||||
if p.leakingQuery.Load() {
|
||||
return
|
||||
}
|
||||
if !ok {
|
||||
|
||||
@@ -44,6 +44,10 @@ func newUpstreamMonitor(cfg *ctrld.Config) *upstreamMonitor {
|
||||
|
||||
// increaseFailureCount increase failed queries count for an upstream by 1.
|
||||
func (um *upstreamMonitor) increaseFailureCount(upstream string) {
|
||||
// Do not count "upstream.os", since it must not be down for leaking queries.
|
||||
if upstream == upstreamOS {
|
||||
return
|
||||
}
|
||||
um.mu.Lock()
|
||||
defer um.mu.Unlock()
|
||||
|
||||
@@ -60,14 +64,6 @@ func (um *upstreamMonitor) isDown(upstream string) bool {
|
||||
return um.down[upstream]
|
||||
}
|
||||
|
||||
// isChecking reports whether the given upstream is being checked.
|
||||
func (um *upstreamMonitor) isChecking(upstream string) bool {
|
||||
um.mu.Lock()
|
||||
defer um.mu.Unlock()
|
||||
|
||||
return um.checking[upstream]
|
||||
}
|
||||
|
||||
// reset marks an upstream as up and set failed queries counter to zero.
|
||||
func (um *upstreamMonitor) reset(upstream string) {
|
||||
um.mu.Lock()
|
||||
@@ -94,11 +90,6 @@ func (p *prog) checkUpstream(upstream string, uc *ctrld.UpstreamConfig) {
|
||||
p.um.mu.Unlock()
|
||||
}()
|
||||
|
||||
isOsResolver := uc.Type == ctrld.ResolverTypeOS
|
||||
if isOsResolver {
|
||||
p.resetDNS()
|
||||
defer p.setDNS()
|
||||
}
|
||||
resolver, err := ctrld.NewResolver(uc)
|
||||
if err != nil {
|
||||
mainLog.Load().Warn().Err(err).Msg("could not check upstream")
|
||||
@@ -114,9 +105,6 @@ func (p *prog) checkUpstream(upstream string, uc *ctrld.UpstreamConfig) {
|
||||
ctx, cancel := context.WithTimeout(context.Background(), timeout)
|
||||
defer cancel()
|
||||
uc.ReBootstrap()
|
||||
if isOsResolver {
|
||||
ctrld.InitializeOsResolver()
|
||||
}
|
||||
_, err := resolver.Resolve(ctx, msg)
|
||||
return err
|
||||
}
|
||||
@@ -129,12 +117,6 @@ func (p *prog) checkUpstream(upstream string, uc *ctrld.UpstreamConfig) {
|
||||
if err := check(); err == nil {
|
||||
mainLog.Load().Warn().Msgf("upstream %q is online", endpoint)
|
||||
p.um.reset(upstream)
|
||||
if p.leakingQuery.CompareAndSwap(true, false) {
|
||||
p.leakingQueryMu.Lock()
|
||||
p.leakingQueryWasRun = false
|
||||
p.leakingQueryMu.Unlock()
|
||||
mainLog.Load().Warn().Msg("stop leaking query")
|
||||
}
|
||||
return
|
||||
} else {
|
||||
mainLog.Load().Debug().Msgf("checked upstream %q failed: %v", endpoint, err)
|
||||
|
||||
Reference in New Issue
Block a user