all: implement policy failover rcodes

While at it, ensure that config is validated, and fixing a bug related
to reuse ctx between multiple upstreams resolving.
This commit is contained in:
Cuong Manh Le
2022-12-14 23:34:24 +07:00
committed by Cuong Manh Le
parent fe0faac8c4
commit ccada70e31
8 changed files with 148 additions and 23 deletions

View File

@@ -7,10 +7,13 @@ import (
"os/exec"
"runtime"
"github.com/go-playground/validator/v10"
"github.com/kardianos/service"
"github.com/pelletier/go-toml"
"github.com/spf13/cobra"
"github.com/spf13/viper"
"github.com/Control-D-Inc/ctrld"
)
var (
@@ -52,6 +55,9 @@ func initCLI() {
if err := v.Unmarshal(&cfg); err != nil {
log.Fatalf("failed to unmarshal config: %v", err)
}
if err := ctrld.ValidateConfig(validator.New(), &cfg); err != nil {
log.Fatalf("invalid config: %v", err)
}
initLogging()
if daemon {
exe, err := os.Executable()

View File

@@ -22,7 +22,10 @@ func (p *prog) serveUDP(listenerNum string) error {
mainLog.Error().Err(allocErr).Str("ip", listenerConfig.IP).Msg("serveUDP: failed to allocate listen ip")
return allocErr
}
var failoverRcodes []int
if listenerConfig.Policy != nil {
failoverRcodes = listenerConfig.Policy.FailoverRcodeNumbers
}
handler := dns.HandlerFunc(func(w dns.ResponseWriter, m *dns.Msg) {
domain := canonicalName(m.Question[0].Name)
reqId := requestID()
@@ -37,7 +40,7 @@ func (p *prog) serveUDP(listenerNum string) error {
answer.SetRcode(m, dns.RcodeRefused)
} else {
answer = p.proxy(ctx, upstreams, m)
answer = p.proxy(ctx, upstreams, failoverRcodes, m)
rtt := time.Since(t)
ctrld.Log(ctx, proxyLog.Debug(), "received response of %d bytes in %s", answer.Len(), rtt)
}
@@ -119,7 +122,7 @@ func (p *prog) upstreamFor(ctx context.Context, defaultUpstreamNum string, lc *c
return upstreams, matched
}
func (p *prog) proxy(ctx context.Context, upstreams []string, msg *dns.Msg) *dns.Msg {
func (p *prog) proxy(ctx context.Context, upstreams []string, failoverRcodes []int, msg *dns.Msg) *dns.Msg {
upstreamConfigs := p.upstreamConfigsFromUpstreamNumbers(upstreams)
resolve := func(n int, upstreamConfig *ctrld.UpstreamConfig, msg *dns.Msg) *dns.Msg {
ctrld.Log(ctx, proxyLog.Debug(), "sending query to %s: %s", upstreams[n], upstreamConfig.Name)
@@ -128,12 +131,14 @@ func (p *prog) proxy(ctx context.Context, upstreams []string, msg *dns.Msg) *dns
ctrld.Log(ctx, proxyLog.Error().Err(err), "failed to create resolver")
return nil
}
resolveCtx, cancel := context.WithCancel(ctx)
defer cancel()
if upstreamConfig.Timeout > 0 {
timeoutCtx, cancel := context.WithTimeout(ctx, time.Millisecond*time.Duration(upstreamConfig.Timeout))
timeoutCtx, cancel := context.WithTimeout(resolveCtx, time.Millisecond*time.Duration(upstreamConfig.Timeout))
defer cancel()
ctx = timeoutCtx
resolveCtx = timeoutCtx
}
answer, err := dnsResolver.Resolve(ctx, msg)
answer, err := dnsResolver.Resolve(resolveCtx, msg)
if err != nil {
ctrld.Log(ctx, proxyLog.Error().Err(err), "failed to resolve query")
return nil
@@ -141,9 +146,15 @@ func (p *prog) proxy(ctx context.Context, upstreams []string, msg *dns.Msg) *dns
return answer
}
for n, upstreamConfig := range upstreamConfigs {
if answer := resolve(n, upstreamConfig, msg); answer != nil {
return answer
answer := resolve(n, upstreamConfig, msg)
if answer == nil {
continue
}
if answer.Rcode != dns.RcodeSuccess && len(upstreamConfigs) > 1 && containRcode(failoverRcodes, answer.Rcode) {
ctrld.Log(ctx, proxyLog.Debug(), "failover rcode matched, process to next upstream")
continue
}
return answer
}
ctrld.Log(ctx, proxyLog.Error(), "all upstreams failed")
answer := new(dns.Msg)
@@ -151,6 +162,18 @@ func (p *prog) proxy(ctx context.Context, upstreams []string, msg *dns.Msg) *dns
return answer
}
func (p *prog) upstreamConfigsFromUpstreamNumbers(upstreams []string) []*ctrld.UpstreamConfig {
upstreamConfigs := make([]*ctrld.UpstreamConfig, 0, len(upstreams))
for _, upstream := range upstreams {
upstreamNum := strings.TrimPrefix(upstream, "upstream.")
upstreamConfigs = append(upstreamConfigs, p.cfg.Upstream[upstreamNum])
}
if len(upstreamConfigs) == 0 {
upstreamConfigs = []*ctrld.UpstreamConfig{osUpstreamConfig}
}
return upstreamConfigs
}
// canonicalName returns canonical name from FQDN with "." trimmed.
func canonicalName(fqdn string) string {
q := strings.TrimSpace(fqdn)
@@ -189,18 +212,6 @@ func fmtRemoteToLocal(listenerNum, remote, local string) string {
return fmt.Sprintf("%s -> listener.%s: %s:", remote, listenerNum, local)
}
func (p *prog) upstreamConfigsFromUpstreamNumbers(upstreams []string) []*ctrld.UpstreamConfig {
upstreamConfigs := make([]*ctrld.UpstreamConfig, 0, len(upstreams))
for _, upstream := range upstreams {
upstreamNum := strings.TrimPrefix(upstream, "upstream.")
upstreamConfigs = append(upstreamConfigs, p.cfg.Upstream[upstreamNum])
}
if len(upstreamConfigs) == 0 {
upstreamConfigs = []*ctrld.UpstreamConfig{osUpstreamConfig}
}
return upstreamConfigs
}
func requestID() string {
b := make([]byte, 3) // 6 chars
if _, err := rand.Read(b); err != nil {
@@ -209,6 +220,15 @@ func requestID() string {
return hex.EncodeToString(b)
}
func containRcode(rcodes []int, rcode int) bool {
for i := range rcodes {
if rcodes[i] == rcode {
return true
}
}
return false
}
var osUpstreamConfig = &ctrld.UpstreamConfig{
Name: "OS resolver",
Type: "os",

View File

@@ -74,6 +74,7 @@ func (p *prog) run() {
}
for listenerNum := range p.cfg.Listener {
p.cfg.Listener[listenerNum].Init()
go func(listenerNum string) {
defer wg.Done()
listenerConfig := p.cfg.Listener[listenerNum]