mirror of
https://github.com/Control-D-Inc/ctrld.git
synced 2026-02-03 22:18:39 +00:00
feat: enhance DNS proxy logging with comprehensive flow tracking
Add detailed logging throughout DNS proxy operations to improve visibility into query processing, cache operations, and upstream resolver performance.

Key improvements:
- DNS server setup and listener management logging
- Complete query processing pipeline visibility
- Cache hit/miss and stale response handling logs
- Upstream resolver iteration and failure tracking
- Resolver-specific logging (OS, DoH, DoT, DoQ, Legacy)
- All log messages capitalized for better readability

This provides comprehensive debugging capabilities for DNS proxy operations and helps identify performance bottlenecks and failure points in the resolution chain.
This commit is contained in:
committed by
Cuong Manh Le
parent
a084c87370
commit
b7202f8469
@@ -100,6 +100,9 @@ type upstreamForResult struct {
|
||||
// serveDNS sets up and starts a DNS server on the specified listener, handling DNS queries and network monitoring.
|
||||
// This is the main entry point for DNS server functionality
|
||||
func (p *prog) serveDNS(ctx context.Context, listenerNum string) error {
|
||||
logger := p.logger.Load()
|
||||
logger.Debug().Msg("DNS server setup started")
|
||||
|
||||
listenerConfig := p.cfg.Listener[listenerNum]
|
||||
if allocErr := p.allocateIP(listenerConfig.IP); allocErr != nil {
|
||||
p.Error().Err(allocErr).Str("ip", listenerConfig.IP).Msg("serveUDP: failed to allocate listen ip")
|
||||
@@ -110,6 +113,7 @@ func (p *prog) serveDNS(ctx context.Context, listenerNum string) error {
|
||||
p.handleDNSQuery(w, m, listenerNum, listenerConfig)
|
||||
})
|
||||
|
||||
logger.Debug().Msg("DNS server setup completed")
|
||||
return p.startListeners(ctx, listenerConfig, handler)
|
||||
}
|
||||
|
||||
@@ -117,10 +121,14 @@ func (p *prog) serveDNS(ctx context.Context, listenerNum string) error {
|
||||
// It handles local IPv6, RFC 1918, and specified IP listeners, reacting to stop signals or errors.
|
||||
// This function manages the lifecycle of DNS server listeners
|
||||
func (p *prog) startListeners(ctx context.Context, cfg *ctrld.ListenerConfig, handler dns.Handler) error {
|
||||
logger := p.logger.Load()
|
||||
logger.Debug().Msg("Starting DNS listeners")
|
||||
|
||||
g, gctx := errgroup.WithContext(ctx)
|
||||
|
||||
for _, proto := range []string{"udp", "tcp"} {
|
||||
if needLocalIPv6Listener() {
|
||||
logger.Debug().Str("protocol", proto).Msg("Starting local IPv6 listener")
|
||||
g.Go(func() error {
|
||||
s, errCh := runDNSServer(net.JoinHostPort("::1", strconv.Itoa(cfg.Port)), proto, handler)
|
||||
defer s.Shutdown()
|
||||
@@ -135,6 +143,7 @@ func (p *prog) startListeners(ctx context.Context, cfg *ctrld.ListenerConfig, ha
|
||||
}
|
||||
|
||||
if needRFC1918Listeners(cfg) {
|
||||
logger.Debug().Str("protocol", proto).Msg("Starting RFC1918 listeners")
|
||||
g.Go(func() error {
|
||||
for _, addr := range ctrld.Rfc1918Addresses() {
|
||||
func() {
|
||||
@@ -153,6 +162,7 @@ func (p *prog) startListeners(ctx context.Context, cfg *ctrld.ListenerConfig, ha
|
||||
})
|
||||
}
|
||||
|
||||
logger.Debug().Str("protocol", proto).Str("ip", cfg.IP).Int("port", cfg.Port).Msg("Starting main listener")
|
||||
g.Go(func() error {
|
||||
addr := net.JoinHostPort(cfg.IP, strconv.Itoa(cfg.Port))
|
||||
s, errCh := runDNSServer(addr, proto, handler)
|
||||
@@ -168,6 +178,7 @@ func (p *prog) startListeners(ctx context.Context, cfg *ctrld.ListenerConfig, ha
|
||||
})
|
||||
}
|
||||
|
||||
logger.Debug().Msg("DNS listeners started successfully")
|
||||
return g.Wait()
|
||||
}
|
||||
|
||||
@@ -186,8 +197,10 @@ func (p *prog) handleDNSQuery(w dns.ResponseWriter, m *dns.Msg, listenerNum stri
|
||||
ctx := context.WithValue(context.Background(), ctrld.ReqIdCtxKey{}, reqID)
|
||||
ctx = ctrld.LoggerCtx(ctx, p.logger.Load())
|
||||
|
||||
ctrld.Log(ctx, p.Debug(), "Processing DNS query from %s", w.RemoteAddr().String())
|
||||
|
||||
if !listenerConfig.AllowWanClients && isWanClient(w.RemoteAddr()) {
|
||||
ctrld.Log(ctx, p.Debug(), "query refused, listener does not allow WAN clients: %s", w.RemoteAddr().String())
|
||||
ctrld.Log(ctx, p.Debug(), "Query refused, listener does not allow WAN clients: %s", w.RemoteAddr().String())
|
||||
sendDNSResponse(w, m, dns.RcodeRefused)
|
||||
return
|
||||
}
|
||||
@@ -198,8 +211,11 @@ func (p *prog) handleDNSQuery(w dns.ResponseWriter, m *dns.Msg, listenerNum stri
|
||||
domain := canonicalName(q.Name)
|
||||
|
||||
if p.handleSpecialDomains(ctx, w, m, domain) {
|
||||
ctrld.Log(ctx, p.Debug(), "Special domain query handled")
|
||||
return
|
||||
}
|
||||
|
||||
ctrld.Log(ctx, p.Debug(), "Processing standard query for domain: %s", domain)
|
||||
p.processStandardQuery(&standardQueryRequest{
|
||||
ctx: ctx,
|
||||
writer: w,
|
||||
@@ -215,9 +231,11 @@ func (p *prog) handleDNSQuery(w dns.ResponseWriter, m *dns.Msg, listenerNum stri
|
||||
func (p *prog) handleSpecialDomains(ctx context.Context, w dns.ResponseWriter, m *dns.Msg, domain string) bool {
|
||||
switch {
|
||||
case domain == "":
|
||||
ctrld.Log(ctx, p.Debug(), "Empty domain query, sending format error")
|
||||
sendDNSResponse(w, m, dns.RcodeFormatError)
|
||||
return true
|
||||
case domain == selfCheckInternalTestDomain:
|
||||
ctrld.Log(ctx, p.Debug(), "Internal test domain query: %s", domain)
|
||||
answer := resolveInternalDomainTestQuery(ctx, domain, m)
|
||||
_ = w.WriteMsg(answer)
|
||||
return true
|
||||
@@ -225,7 +243,7 @@ func (p *prog) handleSpecialDomains(ctx context.Context, w dns.ResponseWriter, m
|
||||
|
||||
if _, ok := p.cacheFlushDomainsMap[domain]; ok && p.cache != nil {
|
||||
p.cache.Purge()
|
||||
ctrld.Log(ctx, p.Debug(), "received query %q, local cache is purged", domain)
|
||||
ctrld.Log(ctx, p.Debug(), "Received query %q, local cache is purged", domain)
|
||||
}
|
||||
|
||||
return false
|
||||
@@ -245,6 +263,8 @@ type standardQueryRequest struct {
|
||||
// processStandardQuery handles a standard DNS query by routing it through appropriate upstreams and writing a DNS response.
|
||||
// This is the main processing pipeline for normal DNS queries
|
||||
func (p *prog) processStandardQuery(req *standardQueryRequest) {
|
||||
ctrld.Log(req.ctx, p.Debug(), "Processing standard query started")
|
||||
|
||||
remoteIP, _, _ := net.SplitHostPort(req.writer.RemoteAddr().String())
|
||||
ci := p.getClientInfo(remoteIP, req.msg)
|
||||
ci.ClientIDPref = p.cfg.Service.ClientIDPref
|
||||
@@ -262,13 +282,14 @@ func (p *prog) processStandardQuery(req *standardQueryRequest) {
|
||||
var answer *dns.Msg
|
||||
// Handle restricted listener case
|
||||
if !ur.matched && req.listenerConfig.Restricted {
|
||||
ctrld.Log(req.ctx, p.Debug(), "query refused, %s does not match any network policy", remoteAddr.String())
|
||||
ctrld.Log(req.ctx, p.Debug(), "Query refused, %s does not match any network policy", remoteAddr.String())
|
||||
answer = new(dns.Msg)
|
||||
answer.SetRcode(req.msg, dns.RcodeRefused)
|
||||
// Process the refused query
|
||||
go p.postProcessStandardQuery(ci, req.listenerConfig, q, &proxyResponse{answer: answer, refused: true})
|
||||
} else {
|
||||
// Process a normal query
|
||||
ctrld.Log(req.ctx, p.Debug(), "Starting proxy query processing")
|
||||
pr := p.proxy(req.ctx, &proxyRequest{
|
||||
msg: req.msg,
|
||||
ci: ci,
|
||||
@@ -277,7 +298,7 @@ func (p *prog) processStandardQuery(req *standardQueryRequest) {
|
||||
})
|
||||
|
||||
rtt := time.Since(startTime)
|
||||
ctrld.Log(req.ctx, p.Debug(), "received response of %d bytes in %s", pr.answer.Len(), rtt)
|
||||
ctrld.Log(req.ctx, p.Debug(), "Received response of %d bytes in %s", pr.answer.Len(), rtt)
|
||||
|
||||
go p.postProcessStandardQuery(ci, req.listenerConfig, q, pr)
|
||||
answer = pr.answer
|
||||
@@ -286,6 +307,8 @@ func (p *prog) processStandardQuery(req *standardQueryRequest) {
|
||||
if err := req.writer.WriteMsg(answer); err != nil {
|
||||
ctrld.Log(req.ctx, p.Error().Err(err), "serveDNS: failed to send DNS response to client")
|
||||
}
|
||||
|
||||
ctrld.Log(req.ctx, p.Debug(), "Standard query processing completed")
|
||||
}
|
||||
|
||||
// postProcessStandardQuery performs additional actions after processing a standard DNS query, such as metrics recording,
|
||||
@@ -557,19 +580,28 @@ func (p *prog) handleSpecialQueryTypes(ctx *context.Context, req *proxyRequest,
|
||||
|
||||
// proxy handles DNS query proxying by selecting upstreams, attempting cache lookups, and querying configured resolvers.
|
||||
func (p *prog) proxy(ctx context.Context, req *proxyRequest) *proxyResponse {
|
||||
ctrld.Log(ctx, p.Debug(), "Proxy query processing started")
|
||||
|
||||
upstreams, upstreamConfigs := p.initializeUpstreams(req)
|
||||
ctrld.Log(ctx, p.Debug(), "Initialized upstreams: %v", upstreams)
|
||||
|
||||
if specialRes := p.handleSpecialQueryTypes(&ctx, req, &upstreams, &upstreamConfigs); specialRes != nil {
|
||||
ctrld.Log(ctx, p.Debug(), "Special query type handled")
|
||||
return specialRes
|
||||
}
|
||||
|
||||
if cachedRes := p.tryCache(ctx, req, upstreams); cachedRes != nil {
|
||||
ctrld.Log(ctx, p.Debug(), "Cache hit, returning cached response")
|
||||
return cachedRes
|
||||
}
|
||||
|
||||
ctrld.Log(ctx, p.Debug(), "No cache hit, trying upstreams")
|
||||
if res := p.tryUpstreams(ctx, req, upstreams, upstreamConfigs); res != nil {
|
||||
ctrld.Log(ctx, p.Debug(), "Upstream query successful")
|
||||
return res
|
||||
}
|
||||
|
||||
ctrld.Log(ctx, p.Debug(), "All upstreams failed, handling failure")
|
||||
return p.handleAllUpstreamsFailure(ctx, req, upstreams)
|
||||
}
|
||||
|
||||
@@ -591,14 +623,19 @@ func (p *prog) initializeUpstreams(req *proxyRequest) ([]string, []*ctrld.Upstre
|
||||
// Iterates through the provided upstreams to find a cached response using the checkCache method.
|
||||
func (p *prog) tryCache(ctx context.Context, req *proxyRequest, upstreams []string) *proxyResponse {
|
||||
if p.cache == nil || req.msg.Question[0].Qtype == dns.TypePTR { // https://www.rfc-editor.org/rfc/rfc1035#section-7.4
|
||||
ctrld.Log(ctx, p.Debug(), "Cache disabled or PTR query, skipping cache lookup")
|
||||
return nil
|
||||
}
|
||||
|
||||
ctrld.Log(ctx, p.Debug(), "Checking cache for upstreams: %v", upstreams)
|
||||
for _, upstream := range upstreams {
|
||||
if res := p.checkCache(ctx, req, upstream); res != nil {
|
||||
ctrld.Log(ctx, p.Debug(), "Cache hit found for upstream: %s", upstream)
|
||||
return res
|
||||
}
|
||||
}
|
||||
|
||||
ctrld.Log(ctx, p.Debug(), "No cache hit found")
|
||||
return nil
|
||||
}
|
||||
|
||||
@@ -607,6 +644,7 @@ func (p *prog) tryCache(ctx context.Context, req *proxyRequest, upstreams []stri
|
||||
func (p *prog) checkCache(ctx context.Context, req *proxyRequest, upstream string) *proxyResponse {
|
||||
cachedValue := p.cache.Get(dnscache.NewKey(req.msg, upstream))
|
||||
if cachedValue == nil {
|
||||
ctrld.Log(ctx, p.Debug(), "No cached value found for upstream: %s", upstream)
|
||||
return nil
|
||||
}
|
||||
|
||||
@@ -615,10 +653,12 @@ func (p *prog) checkCache(ctx context.Context, req *proxyRequest, upstream strin
|
||||
now := time.Now()
|
||||
|
||||
if cachedValue.Expire.After(now) {
|
||||
ctrld.Log(ctx, p.Debug(), "hit cached response")
|
||||
ctrld.Log(ctx, p.Debug(), "Hit cached response")
|
||||
setCachedAnswerTTL(answer, now, cachedValue.Expire)
|
||||
return &proxyResponse{answer: answer, cached: true}
|
||||
}
|
||||
|
||||
ctrld.Log(ctx, p.Debug(), "Cached response expired, storing as stale")
|
||||
req.staleAnswer = answer
|
||||
return nil
|
||||
}
|
||||
@@ -633,12 +673,12 @@ func (p *prog) updateCache(ctx context.Context, req *proxyRequest, answer *dns.M
|
||||
}
|
||||
setCachedAnswerTTL(answer, now, expired)
|
||||
p.cache.Add(dnscache.NewKey(req.msg, upstream), dnscache.NewValue(answer, expired))
|
||||
ctrld.Log(ctx, p.Debug(), "add cached response")
|
||||
ctrld.Log(ctx, p.Debug(), "Added cached response")
|
||||
}
|
||||
|
||||
// serveStaleResponse serves a stale cached DNS response when an upstream query fails, updating TTL for cached records.
|
||||
func (p *prog) serveStaleResponse(ctx context.Context, staleAnswer *dns.Msg) *proxyResponse {
|
||||
ctrld.Log(ctx, p.Debug(), "serving stale cached response")
|
||||
ctrld.Log(ctx, p.Debug(), "Serving stale cached response")
|
||||
now := time.Now()
|
||||
setCachedAnswerTTL(staleAnswer, now, now.Add(staleTTL))
|
||||
return &proxyResponse{answer: staleAnswer, cached: true}
|
||||
@@ -646,21 +686,27 @@ func (p *prog) serveStaleResponse(ctx context.Context, staleAnswer *dns.Msg) *pr
|
||||
|
||||
// handleAllUpstreamsFailure handles the failure scenario when all upstream resolvers fail to respond or process the request.
|
||||
func (p *prog) handleAllUpstreamsFailure(ctx context.Context, req *proxyRequest, upstreams []string) *proxyResponse {
|
||||
ctrld.Log(ctx, p.Error(), "all %v endpoints failed", upstreams)
|
||||
ctrld.Log(ctx, p.Error(), "All %v endpoints failed", upstreams)
|
||||
|
||||
if p.leakOnUpstreamFailure() {
|
||||
ctrld.Log(ctx, p.Debug(), "Leak on upstream failure enabled")
|
||||
if p.um.countHealthy(upstreams) == 0 {
|
||||
ctrld.Log(ctx, p.Debug(), "No healthy upstreams, triggering recovery")
|
||||
p.triggerRecovery(upstreams[0] == upstreamOS)
|
||||
} else {
|
||||
p.Debug().Msg("One upstream is down but at least one is healthy; skipping recovery trigger")
|
||||
ctrld.Log(ctx, p.Debug(), "One upstream is down but at least one is healthy; skipping recovery trigger")
|
||||
}
|
||||
|
||||
if upstreams[0] != upstreamOS {
|
||||
ctrld.Log(ctx, p.Debug(), "Trying OS resolver as fallback")
|
||||
if answer := p.tryOSResolver(ctx, req); answer != nil {
|
||||
ctrld.Log(ctx, p.Debug(), "OS resolver fallback successful")
|
||||
return answer
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
ctrld.Log(ctx, p.Debug(), "Returning server failure response")
|
||||
answer := new(dns.Msg)
|
||||
answer.SetRcode(req.msg, dns.RcodeServerFailure)
|
||||
return &proxyResponse{answer: answer}
|
||||
@@ -669,29 +715,34 @@ func (p *prog) handleAllUpstreamsFailure(ctx context.Context, req *proxyRequest,
|
||||
// shouldContinueWithNextUpstream determines whether processing should continue with the next upstream based on response conditions.
|
||||
func (p *prog) shouldContinueWithNextUpstream(ctx context.Context, req *proxyRequest, answer *dns.Msg, upstream string, lastUpstream bool) bool {
|
||||
if answer.Rcode == dns.RcodeSuccess {
|
||||
ctrld.Log(ctx, p.Debug(), "Successful response, not continuing to next upstream")
|
||||
return false
|
||||
}
|
||||
|
||||
// We are doing LAN/PTR lookup using private resolver, so always process the next one.
|
||||
// Except for the last, we want to send a response instead of saying all upstream failed.
|
||||
if req.isLanOrPtrQuery && !lastUpstream {
|
||||
ctrld.Log(ctx, p.Debug(), "no response for LAN/PTR query from %s, process to next upstream", upstream)
|
||||
ctrld.Log(ctx, p.Debug(), "No response for LAN/PTR query from %s, process to next upstream", upstream)
|
||||
return true
|
||||
}
|
||||
|
||||
if len(req.upstreamConfigs) > 1 && slices.Contains(req.failoverRcodes, answer.Rcode) {
|
||||
ctrld.Log(ctx, p.Debug(), "failover rcode matched, process to next upstream")
|
||||
ctrld.Log(ctx, p.Debug(), "Failover rcode matched, process to next upstream")
|
||||
return true
|
||||
}
|
||||
|
||||
ctrld.Log(ctx, p.Debug(), "Not continuing to next upstream")
|
||||
return false
|
||||
}
|
||||
|
||||
// prepareSuccessResponse prepares a successful DNS response for a given request, logs it, and updates the cache if applicable.
|
||||
func (p *prog) prepareSuccessResponse(ctx context.Context, req *proxyRequest, answer *dns.Msg, upstream string, upstreamConfig *ctrld.UpstreamConfig) *proxyResponse {
|
||||
ctrld.Log(ctx, p.Debug(), "Preparing success response")
|
||||
|
||||
answer.Compress = true
|
||||
|
||||
if p.cache != nil && req.msg.Question[0].Qtype != dns.TypePTR {
|
||||
ctrld.Log(ctx, p.Debug(), "Updating cache with successful response")
|
||||
p.updateCache(ctx, req, answer, upstream)
|
||||
}
|
||||
|
||||
@@ -715,12 +766,22 @@ func (p *prog) prepareSuccessResponse(ctx context.Context, req *proxyRequest, an
|
||||
func (p *prog) tryUpstreams(ctx context.Context, req *proxyRequest, upstreams []string, upstreamConfigs []*ctrld.UpstreamConfig) *proxyResponse {
|
||||
serveStaleCache := p.cache != nil && p.cfg.Service.CacheServeStale
|
||||
req.upstreamConfigs = upstreamConfigs
|
||||
|
||||
ctrld.Log(ctx, p.Debug(), "Trying %d upstreams", len(upstreamConfigs))
|
||||
|
||||
for n, upstreamConfig := range upstreamConfigs {
|
||||
last := n == len(upstreamConfigs)-1
|
||||
ctrld.Log(ctx, p.Debug(), "Processing upstream %d/%d: %s", n+1, len(upstreamConfigs), upstreams[n])
|
||||
|
||||
if res := p.processUpstream(ctx, req, upstreams[n], upstreamConfig, serveStaleCache, last); res != nil {
|
||||
ctrld.Log(ctx, p.Debug(), "Upstream %s succeeded", upstreams[n])
|
||||
return res
|
||||
}
|
||||
|
||||
ctrld.Log(ctx, p.Debug(), "Upstream %s failed", upstreams[n])
|
||||
}
|
||||
|
||||
ctrld.Log(ctx, p.Debug(), "All upstreams failed")
|
||||
return nil
|
||||
}
|
||||
|
||||
@@ -729,6 +790,7 @@ func (p *prog) tryUpstreams(ctx context.Context, req *proxyRequest, upstreams []
|
||||
// Returns a proxyResponse on success or nil if the upstream query fails or processing conditions are not met.
|
||||
func (p *prog) processUpstream(ctx context.Context, req *proxyRequest, upstream string, upstreamConfig *ctrld.UpstreamConfig, serveStaleCache, lastUpstream bool) *proxyResponse {
|
||||
if upstreamConfig == nil {
|
||||
ctrld.Log(ctx, p.Debug(), "Upstream config is nil, skipping")
|
||||
return nil
|
||||
}
|
||||
if p.isLoop(upstreamConfig) {
|
||||
@@ -740,14 +802,18 @@ func (p *prog) processUpstream(ctx context.Context, req *proxyRequest, upstream
|
||||
return nil
|
||||
}
|
||||
|
||||
ctrld.Log(ctx, p.Debug(), "Querying upstream: %s", upstream)
|
||||
answer := p.queryUpstream(ctx, req, upstream, upstreamConfig)
|
||||
if answer == nil {
|
||||
ctrld.Log(ctx, p.Debug(), "Upstream query failed")
|
||||
if serveStaleCache && req.staleAnswer != nil {
|
||||
ctrld.Log(ctx, p.Debug(), "Serving stale response due to upstream failure")
|
||||
return p.serveStaleResponse(ctx, req.staleAnswer)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
ctrld.Log(ctx, p.Debug(), "Upstream query successful")
|
||||
if p.shouldContinueWithNextUpstream(ctx, req, answer, upstream, lastUpstream) {
|
||||
return nil
|
||||
}
|
||||
@@ -757,21 +823,24 @@ func (p *prog) processUpstream(ctx context.Context, req *proxyRequest, upstream
|
||||
// queryUpstream sends a DNS query to a specified upstream using its configuration and handles errors and retries.
|
||||
func (p *prog) queryUpstream(ctx context.Context, req *proxyRequest, upstream string, upstreamConfig *ctrld.UpstreamConfig) *dns.Msg {
|
||||
if upstreamConfig.UpstreamSendClientInfo() && req.ci != nil {
|
||||
ctrld.Log(ctx, p.Debug(), "Adding client info to upstream query")
|
||||
ctx = context.WithValue(ctx, ctrld.ClientInfoCtxKey{}, req.ci)
|
||||
}
|
||||
|
||||
ctrld.Log(ctx, p.Debug(), "sending query to %s: %s", upstream, upstreamConfig.Name)
|
||||
ctrld.Log(ctx, p.Debug(), "Sending query to %s: %s", upstream, upstreamConfig.Name)
|
||||
dnsResolver, err := ctrld.NewResolver(ctx, upstreamConfig)
|
||||
if err != nil {
|
||||
ctrld.Log(ctx, p.Error().Err(err), "failed to create resolver")
|
||||
ctrld.Log(ctx, p.Error().Err(err), "Failed to create resolver")
|
||||
return nil
|
||||
}
|
||||
|
||||
resolveCtx, cancel := upstreamConfig.Context(ctx)
|
||||
defer cancel()
|
||||
|
||||
ctrld.Log(ctx, p.Debug(), "Resolving query with upstream")
|
||||
answer, err := dnsResolver.Resolve(resolveCtx, req.msg)
|
||||
if answer != nil {
|
||||
ctrld.Log(ctx, p.Debug(), "Upstream resolution successful")
|
||||
p.um.mu.Lock()
|
||||
p.um.failureReq[upstream] = 0
|
||||
p.um.down[upstream] = false
|
||||
@@ -779,17 +848,19 @@ func (p *prog) queryUpstream(ctx context.Context, req *proxyRequest, upstream st
|
||||
return answer
|
||||
}
|
||||
|
||||
ctrld.Log(ctx, p.Error().Err(err), "failed to resolve query")
|
||||
ctrld.Log(ctx, p.Error().Err(err), "Failed to resolve query")
|
||||
// Increasing the failure count when there is no answer regardless of what kind of error we get
|
||||
p.um.increaseFailureCount(upstream)
|
||||
if err != nil {
|
||||
// For timeout error (i.e: context deadline exceed), force re-bootstrapping.
|
||||
var e net.Error
|
||||
if errors.As(err, &e) && e.Timeout() {
|
||||
ctrld.Log(ctx, p.Debug(), "Timeout error, forcing re-bootstrapping")
|
||||
upstreamConfig.ReBootstrap(ctx)
|
||||
}
|
||||
// For network error, turn ipv6 off if enabled.
|
||||
if ctrld.HasIPv6(ctx) && (errUrlNetworkError(err) || errNetworkError(err)) {
|
||||
ctrld.Log(ctx, p.Debug(), "Network error, disabling IPv6")
|
||||
ctrld.DisableIPv6(ctx)
|
||||
}
|
||||
}
|
||||
@@ -820,7 +891,7 @@ func (p *prog) triggerRecovery(isOSFailure bool) {
|
||||
// tryOSResolver attempts to query the OS resolver as a fallback mechanism when other upstreams fail.
|
||||
// Logs success or failure of the query attempt and returns a proxyResponse or nil based on query result.
|
||||
func (p *prog) tryOSResolver(ctx context.Context, req *proxyRequest) *proxyResponse {
|
||||
ctrld.Log(ctx, p.Debug(), "attempting query to OS resolver as a retry catch all")
|
||||
ctrld.Log(ctx, p.Debug(), "Attempting query to OS resolver as a retry catch all")
|
||||
answer := p.queryUpstream(ctx, req, upstreamOS, osUpstreamConfig)
|
||||
if answer != nil {
|
||||
ctrld.Log(ctx, p.Debug(), "OS resolver retry query successful")
|
||||
@@ -1006,6 +1077,8 @@ func spoofRemoteAddr(addr net.Addr, ci *ctrld.ClientInfo) net.Addr {
|
||||
//
|
||||
// It's the caller's responsibility to call Shutdown to close the server.
|
||||
func runDNSServer(addr, network string, handler dns.Handler) (*dns.Server, <-chan error) {
|
||||
mainLog.Load().Debug().Str("address", addr).Str("network", network).Msg("Starting DNS server")
|
||||
|
||||
s := &dns.Server{
|
||||
Addr: addr,
|
||||
Net: network,
|
||||
@@ -1025,6 +1098,7 @@ func runDNSServer(addr, network string, handler dns.Handler) (*dns.Server, <-cha
|
||||
}
|
||||
}()
|
||||
<-startedCh
|
||||
mainLog.Load().Debug().Str("address", addr).Str("network", network).Msg("DNS server started successfully")
|
||||
return s, errCh
|
||||
}
|
||||
|
||||
|
||||
23
config.go
23
config.go
@@ -438,21 +438,17 @@ func (uc *UpstreamConfig) UID() string {
|
||||
return uc.uid
|
||||
}
|
||||
|
||||
// SetupBootstrapIP manually find all available IPs of the upstream.
|
||||
// The first usable IP will be used as bootstrap IP of the upstream.
|
||||
// The upstream domain will be looked up using following orders:
|
||||
//
|
||||
// - Current system DNS settings.
|
||||
// - Direct IPs table for ControlD upstreams.
|
||||
// - ControlD Bootstrap DNS 76.76.2.22
|
||||
//
|
||||
// SetupBootstrapIP sets up bootstrap IPs for the upstream config.
|
||||
// The setup process will block until there's usable IPs found.
|
||||
func (uc *UpstreamConfig) SetupBootstrapIP(ctx context.Context) {
|
||||
logger := LoggerFromCtx(ctx)
|
||||
Log(ctx, logger.Debug(), "Setting up bootstrap IPs for upstream: %s", uc.Name)
|
||||
|
||||
b := backoff.NewBackoff("setupBootstrapIP", func(format string, args ...any) {}, 10*time.Second)
|
||||
isControlD := uc.IsControlD()
|
||||
logger := LoggerFromCtx(ctx)
|
||||
nss := initDefaultOsResolver(ctx)
|
||||
for {
|
||||
Log(ctx, logger.Debug(), "Looking up bootstrap IPs for domain: %s", uc.Domain)
|
||||
uc.bootstrapIPs = lookupIP(ctx, uc.Domain, uc.Timeout, nss)
|
||||
// For ControlD upstream, the bootstrap IPs could not be RFC 1918 addresses,
|
||||
// filtering them out here to prevent weird behavior.
|
||||
@@ -468,18 +464,18 @@ func (uc *UpstreamConfig) SetupBootstrapIP(ctx context.Context) {
|
||||
uc.bootstrapIPs = uc.bootstrapIPs[:n]
|
||||
if len(uc.bootstrapIPs) == 0 {
|
||||
uc.bootstrapIPs = bootstrapIPsFromControlDDomain(uc.Domain)
|
||||
logger.Warn().Msgf("no record found for %q, lookup from direct IP table", uc.Domain)
|
||||
logger.Warn().Msgf("No record found for %q, lookup from direct IP table", uc.Domain)
|
||||
}
|
||||
}
|
||||
if len(uc.bootstrapIPs) == 0 {
|
||||
logger.Warn().Msgf("no record found for %q, using bootstrap server: %s", uc.Domain, PremiumDNSBoostrapIP)
|
||||
logger.Warn().Msgf("No record found for %q, using bootstrap server: %s", uc.Domain, PremiumDNSBoostrapIP)
|
||||
uc.bootstrapIPs = lookupIP(ctx, uc.Domain, uc.Timeout, []string{net.JoinHostPort(PremiumDNSBoostrapIP, "53")})
|
||||
|
||||
}
|
||||
if len(uc.bootstrapIPs) > 0 {
|
||||
break
|
||||
}
|
||||
logger.Warn().Msg("could not resolve bootstrap IPs, retrying...")
|
||||
logger.Warn().Msg("Could not resolve bootstrap IPs, retrying...")
|
||||
b.BackOff(context.Background(), errors.New("no bootstrap IPs"))
|
||||
}
|
||||
for _, ip := range uc.bootstrapIPs {
|
||||
@@ -489,7 +485,8 @@ func (uc *UpstreamConfig) SetupBootstrapIP(ctx context.Context) {
|
||||
uc.bootstrapIPs4 = append(uc.bootstrapIPs4, ip)
|
||||
}
|
||||
}
|
||||
logger.Debug().Msgf("bootstrap IPs: %v", uc.bootstrapIPs)
|
||||
logger.Debug().Msgf("Bootstrap IPs: %v", uc.bootstrapIPs)
|
||||
Log(ctx, logger.Debug(), "Bootstrap IP setup completed for upstream: %s", uc.Name)
|
||||
}
|
||||
|
||||
// ReBootstrap sets up the bootstrap IP and the transport again.
|
||||
|
||||
18
doh.go
18
doh.go
@@ -88,8 +88,12 @@ type dohResolver struct {
|
||||
|
||||
// Resolve performs DNS query with given DNS message using DOH protocol.
|
||||
func (r *dohResolver) Resolve(ctx context.Context, msg *dns.Msg) (*dns.Msg, error) {
|
||||
logger := LoggerFromCtx(ctx)
|
||||
Log(ctx, logger.Debug(), "DoH resolver query started")
|
||||
|
||||
data, err := msg.Pack()
|
||||
if err != nil {
|
||||
Log(ctx, logger.Error().Err(err), "Failed to pack DNS message")
|
||||
return nil, err
|
||||
}
|
||||
|
||||
@@ -101,6 +105,7 @@ func (r *dohResolver) Resolve(ctx context.Context, msg *dns.Msg) (*dns.Msg, erro
|
||||
endpoint.RawQuery = query.Encode()
|
||||
req, err := http.NewRequestWithContext(ctx, http.MethodGet, endpoint.String(), nil)
|
||||
if err != nil {
|
||||
Log(ctx, logger.Error().Err(err), "Could not create HTTP request")
|
||||
return nil, fmt.Errorf("could not create request: %w", err)
|
||||
}
|
||||
addHeader(ctx, req, r.uc)
|
||||
@@ -112,16 +117,19 @@ func (r *dohResolver) Resolve(ctx context.Context, msg *dns.Msg) (*dns.Msg, erro
|
||||
if r.isDoH3 {
|
||||
transport := r.uc.doh3Transport(ctx, dnsTyp)
|
||||
if transport == nil {
|
||||
Log(ctx, logger.Error(), "DoH3 is not supported")
|
||||
return nil, errors.New("DoH3 is not supported")
|
||||
}
|
||||
c.Transport = transport
|
||||
}
|
||||
|
||||
Log(ctx, logger.Debug(), "Sending DoH request to: %s", endpoint.String())
|
||||
resp, err := c.Do(req)
|
||||
if err != nil && r.uc.FallbackToDirectIP(ctx) {
|
||||
retryCtx, cancel := r.uc.Context(context.WithoutCancel(ctx))
|
||||
defer cancel()
|
||||
logger := LoggerFromCtx(ctx)
|
||||
logger.Warn().Err(err).Msg("retrying request after fallback to direct ip")
|
||||
logger.Warn().Err(err).Msg("Retrying request after fallback to direct ip")
|
||||
resp, err = c.Do(req.Clone(retryCtx))
|
||||
}
|
||||
if err != nil {
|
||||
@@ -131,23 +139,29 @@ func (r *dohResolver) Resolve(ctx context.Context, msg *dns.Msg) (*dns.Msg, erro
|
||||
closer.Close()
|
||||
}
|
||||
}
|
||||
Log(ctx, logger.Error().Err(err), "DoH request failed")
|
||||
return nil, fmt.Errorf("could not perform request: %w", err)
|
||||
}
|
||||
defer resp.Body.Close()
|
||||
|
||||
buf, err := io.ReadAll(resp.Body)
|
||||
if err != nil {
|
||||
Log(ctx, logger.Error().Err(err), "Could not read response body")
|
||||
return nil, fmt.Errorf("could not read message from response: %w", err)
|
||||
}
|
||||
|
||||
if resp.StatusCode != http.StatusOK {
|
||||
Log(ctx, logger.Error(), "Wrong response from DOH server, got: %s, status: %d", string(buf), resp.StatusCode)
|
||||
return nil, fmt.Errorf("wrong response from DOH server, got: %s, status: %d", string(buf), resp.StatusCode)
|
||||
}
|
||||
|
||||
answer := new(dns.Msg)
|
||||
if err := answer.Unpack(buf); err != nil {
|
||||
Log(ctx, logger.Error().Err(err), "Failed to unpack DNS answer")
|
||||
return nil, fmt.Errorf("answer.Unpack: %w", err)
|
||||
}
|
||||
|
||||
Log(ctx, logger.Debug(), "DoH resolver query successful")
|
||||
return answer, nil
|
||||
}
|
||||
|
||||
@@ -168,7 +182,7 @@ func addHeader(ctx context.Context, req *http.Request, uc *UpstreamConfig) {
|
||||
}
|
||||
if printed {
|
||||
logger := LoggerFromCtx(ctx)
|
||||
Log(ctx, logger.Debug(), "sending request header: %v", dohHeader)
|
||||
Log(ctx, logger.Debug(), "Sending request header: %v", dohHeader)
|
||||
}
|
||||
dohHeader.Set("Content-Type", headerApplicationDNS)
|
||||
dohHeader.Set("Accept", headerApplicationDNS)
|
||||
|
||||
13
doq.go
13
doq.go
@@ -18,6 +18,9 @@ type doqResolver struct {
|
||||
}
|
||||
|
||||
func (r *doqResolver) Resolve(ctx context.Context, msg *dns.Msg) (*dns.Msg, error) {
|
||||
logger := LoggerFromCtx(ctx)
|
||||
Log(ctx, logger.Debug(), "DoQ resolver query started")
|
||||
|
||||
endpoint := r.uc.Endpoint
|
||||
tlsConfig := &tls.Config{NextProtos: []string{"doq"}}
|
||||
ip := r.uc.BootstrapIP
|
||||
@@ -31,7 +34,15 @@ func (r *doqResolver) Resolve(ctx context.Context, msg *dns.Msg) (*dns.Msg, erro
|
||||
tlsConfig.ServerName = r.uc.Domain
|
||||
_, port, _ := net.SplitHostPort(endpoint)
|
||||
endpoint = net.JoinHostPort(ip, port)
|
||||
return resolve(ctx, msg, endpoint, tlsConfig)
|
||||
|
||||
Log(ctx, logger.Debug(), "Sending DoQ request to: %s", endpoint)
|
||||
answer, err := resolve(ctx, msg, endpoint, tlsConfig)
|
||||
if err != nil {
|
||||
Log(ctx, logger.Error().Err(err), "DoQ request failed")
|
||||
} else {
|
||||
Log(ctx, logger.Debug(), "DoQ resolver query successful")
|
||||
}
|
||||
return answer, err
|
||||
}
|
||||
|
||||
func resolve(ctx context.Context, msg *dns.Msg, endpoint string, tlsConfig *tls.Config) (*dns.Msg, error) {
|
||||
|
||||
9
dot.go
9
dot.go
@@ -13,6 +13,9 @@ type dotResolver struct {
|
||||
}
|
||||
|
||||
func (r *dotResolver) Resolve(ctx context.Context, msg *dns.Msg) (*dns.Msg, error) {
|
||||
logger := LoggerFromCtx(ctx)
|
||||
Log(ctx, logger.Debug(), "DoT resolver query started")
|
||||
|
||||
// The dialer is used to prevent bootstrapping cycle.
|
||||
// If r.endpoint is set to dns.controld.dev, we need to resolve
|
||||
// dns.controld.dev first. By using a dialer with custom resolver,
|
||||
@@ -37,6 +40,12 @@ func (r *dotResolver) Resolve(ctx context.Context, msg *dns.Msg) (*dns.Msg, erro
|
||||
endpoint = net.JoinHostPort(r.uc.BootstrapIP, port)
|
||||
}
|
||||
|
||||
Log(ctx, logger.Debug(), "Sending DoT request to: %s", endpoint)
|
||||
answer, _, err := dnsClient.ExchangeContext(ctx, msg, endpoint)
|
||||
if err != nil {
|
||||
Log(ctx, logger.Error().Err(err), "DoT request failed")
|
||||
} else {
|
||||
Log(ctx, logger.Debug(), "DoT resolver query successful")
|
||||
}
|
||||
return answer, wrapCertificateVerificationError(err)
|
||||
}
|
||||
|
||||
22
resolver.go
22
resolver.go
@@ -277,10 +277,12 @@ func (o *osResolver) Resolve(ctx context.Context, msg *dns.Msg) (*dns.Msg, error
|
||||
key := fmt.Sprintf("%s:%d:", domain, qtype)
|
||||
|
||||
logger := LoggerFromCtx(ctx)
|
||||
Log(ctx, logger.Debug(), "OS resolver query started: %s - %s", domain, dns.TypeToString[qtype])
|
||||
|
||||
// Checking the cache first.
|
||||
if val, ok := o.cache.Load(key); ok {
|
||||
if val, ok := val.(*dns.Msg); ok {
|
||||
Log(ctx, logger.Debug(), "hit hot cached result: %s - %s", domain, dns.TypeToString[qtype])
|
||||
Log(ctx, logger.Debug(), "Hit hot cached result: %s - %s", domain, dns.TypeToString[qtype])
|
||||
res := val.Copy()
|
||||
SetCacheReply(res, msg, val.Rcode)
|
||||
return res, nil
|
||||
@@ -289,8 +291,10 @@ func (o *osResolver) Resolve(ctx context.Context, msg *dns.Msg) (*dns.Msg, error
|
||||
|
||||
// Ensure only one DNS query is in flight for the key.
|
||||
v, err, shared := o.group.Do(key, func() (interface{}, error) {
|
||||
Log(ctx, logger.Debug(), "Resolving query: %s - %s", domain, dns.TypeToString[qtype])
|
||||
msg, err := o.resolve(ctx, msg)
|
||||
if err != nil {
|
||||
Log(ctx, logger.Error().Err(err), "OS resolver query failed: %s - %s", domain, dns.TypeToString[qtype])
|
||||
return nil, err
|
||||
}
|
||||
// If we got an answer, storing it to the hot cache for hotCacheTTL
|
||||
@@ -302,6 +306,7 @@ func (o *osResolver) Resolve(ctx context.Context, msg *dns.Msg) (*dns.Msg, error
|
||||
time.AfterFunc(hotCacheTTL, func() {
|
||||
o.removeCache(key)
|
||||
})
|
||||
Log(ctx, logger.Debug(), "OS resolver query successful: %s - %s", domain, dns.TypeToString[qtype])
|
||||
return msg, nil
|
||||
})
|
||||
if err != nil {
|
||||
@@ -315,7 +320,7 @@ func (o *osResolver) Resolve(ctx context.Context, msg *dns.Msg) (*dns.Msg, error
|
||||
res := sharedMsg.Copy()
|
||||
SetCacheReply(res, msg, sharedMsg.Rcode)
|
||||
if shared {
|
||||
Log(ctx, logger.Debug(), "shared result: %s - %s", domain, dns.TypeToString[qtype])
|
||||
Log(ctx, logger.Debug(), "Shared result: %s - %s", domain, dns.TypeToString[qtype])
|
||||
}
|
||||
|
||||
return res, nil
|
||||
@@ -346,7 +351,7 @@ func (o *osResolver) resolve(ctx context.Context, msg *dns.Msg) (*dns.Msg, error
|
||||
question = msg.Question[0].Name
|
||||
}
|
||||
logger := LoggerFromCtx(ctx)
|
||||
Log(ctx, logger.Debug(), "os resolver query for %s with nameservers: %v public: %v", question, nss, publicServers)
|
||||
Log(ctx, logger.Debug(), "OS resolver query for %s with nameservers: %v public: %v", question, nss, publicServers)
|
||||
|
||||
// New check: If no resolvers are available, return an error.
|
||||
if numServers == 0 {
|
||||
@@ -395,7 +400,7 @@ func (o *osResolver) resolve(ctx context.Context, msg *dns.Msg) (*dns.Msg, error
|
||||
// If splitting fails, fallback to the original server string
|
||||
host = server
|
||||
}
|
||||
Log(ctx, logger.Debug(), "got answer from nameserver: %s", host)
|
||||
Log(ctx, logger.Debug(), "Got answer from nameserver: %s", host)
|
||||
}
|
||||
|
||||
// try local nameservers
|
||||
@@ -487,6 +492,9 @@ type legacyResolver struct {
|
||||
}
|
||||
|
||||
func (r *legacyResolver) Resolve(ctx context.Context, msg *dns.Msg) (*dns.Msg, error) {
|
||||
logger := LoggerFromCtx(ctx)
|
||||
Log(ctx, logger.Debug(), "Legacy resolver query started")
|
||||
|
||||
// See comment in (*dotResolver).resolve method.
|
||||
dialer := newDialer(net.JoinHostPort(controldPublicDns, "53"))
|
||||
dnsTyp := uint16(0)
|
||||
@@ -505,7 +513,13 @@ func (r *legacyResolver) Resolve(ctx context.Context, msg *dns.Msg) (*dns.Msg, e
|
||||
endpoint = net.JoinHostPort(r.uc.BootstrapIP, port)
|
||||
}
|
||||
|
||||
Log(ctx, logger.Debug(), "Sending legacy request to: %s", endpoint)
|
||||
answer, _, err := dnsClient.ExchangeContext(ctx, msg, endpoint)
|
||||
if err != nil {
|
||||
Log(ctx, logger.Error().Err(err), "Legacy request failed")
|
||||
} else {
|
||||
Log(ctx, logger.Debug(), "Legacy resolver query successful")
|
||||
}
|
||||
return answer, err
|
||||
}
|
||||
|
||||
|
||||
Reference in New Issue
Block a user