From b6ff80adf25c848d6ae79bd1c9c72d06b584eac3 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E5=85=AC=E6=98=8E?= <83812544+Ed1s0nZ@users.noreply.github.com> Date: Mon, 22 Jun 2026 23:27:30 +0800 Subject: [PATCH] Add files via upload --- internal/multiagent/eino_adk_run_loop.go | 98 ++++++++++++------- internal/multiagent/eino_middleware.go | 9 +- internal/multiagent/eino_single_runner.go | 5 +- internal/multiagent/eino_summarize.go | 15 ++- internal/multiagent/eino_transient_retry.go | 79 ++++++++++++--- .../multiagent/eino_transient_retry_test.go | 7 -- internal/multiagent/interrupt.go | 8 -- internal/multiagent/runner.go | 8 +- 8 files changed, 138 insertions(+), 91 deletions(-) diff --git a/internal/multiagent/eino_adk_run_loop.go b/internal/multiagent/eino_adk_run_loop.go index 2b88c2d0..01df2fd3 100644 --- a/internal/multiagent/eino_adk_run_loop.go +++ b/internal/multiagent/eino_adk_run_loop.go @@ -383,6 +383,12 @@ func runEinoADKAgentLoop(ctx context.Context, args *einoADKRunLoopArgs, baseMsgs } } runner := adk.NewRunner(ctx, runnerCfg) + startRunnerIter := func(runMsgs []adk.Message) *adk.AsyncIterator[*adk.AgentEvent] { + if checkPointID != "" { + return runner.Run(ctx, runMsgs, adk.WithCheckPointID(checkPointID)) + } + return runner.Run(ctx, runMsgs) + } var iter *adk.AsyncIterator[*adk.AgentEvent] if cpStore != nil && checkPointID != "" { if _, existed, getErr := cpStore.Get(ctx, checkPointID); getErr != nil { @@ -422,12 +428,9 @@ func runEinoADKAgentLoop(ctx context.Context, args *einoADKRunLoopArgs, baseMsgs } } if iter == nil { - if checkPointID != "" { - iter = runner.Run(ctx, msgs, adk.WithCheckPointID(checkPointID)) - } else { - iter = runner.Run(ctx, msgs) - } + iter = startRunnerIter(msgs) } + transientRetrier := newEinoTransientRunRetrier(einoTransientRunRetryPolicyFromArgs(args)) handleRunErr := func(runErr error) error { if runErr == nil { return nil @@ -480,26 +483,60 @@ func runEinoADKAgentLoop(ctx context.Context, args *einoADKRunLoopArgs, baseMsgs return runErr } - // maybeRetryTransientRun:不在此层 runner.Run/Resume;由 handler 落库 + loadHistoryFromAgentTrace 分段续跑(同中断并继续)。 - maybeRetryTransientRun := func(runErr error) (retry bool, fatal error) { - if runErr == nil || !isEinoTransientRunError(runErr) { + maybeRetryTransientRun := func(runErr error) (restarted bool, fatal error) { + if runErr == nil { + return false, nil + } + if !isEinoTransientRunError(runErr) { return false, handleRunErr(runErr) } + restarted, restartMsgs, ctxSource, backoff, retErr := transientRetrier.tryRetry( + ctx, runErr, args, baseMsgs, runAccumulatedMsgs, baseAccumulatedCount, + ) + if retErr != nil { + flushAllPendingAsFailed(runErr) + if logger != nil { + logger.Warn("eino transient retry exhausted", + zap.Error(retErr), + zap.String("orchestration", orchMode), + zap.Int("maxAttempts", transientRetrier.maxAttempts())) + } + return false, retErr + } + if !restarted { + return false, nil + } + attemptNo := transientRetrier.attempt() + maxAttempts := transientRetrier.maxAttempts() if logger != nil { - logger.Warn("eino transient error, ending run segment for handler resume", + logger.Warn("eino transient error, retrying after backoff", zap.Error(runErr), - zap.String("orchestration", orchMode)) + zap.String("orchestration", orchMode), + zap.Int("attempt", attemptNo), + zap.Int("maxAttempts", maxAttempts), + zap.Duration("backoff", backoff)) } if progress != nil { - progress("eino_run_retry", "遇到临时错误(限流或网络波动),将保存上下文并重试…", map[string]interface{}{ + progress("eino_run_retry", fmt.Sprintf("遇到临时错误(限流或网络波动),%d 秒后第 %d/%d 次重试…", int(backoff.Seconds()), attemptNo, maxAttempts), map[string]interface{}{ "conversationId": conversationID, "source": "eino", "orchestration": orchMode, "error": runErr.Error(), - "resumeKind": "trace_segment", + "attempt": attemptNo, + "maxAttempts": maxAttempts, + "backoffSec": int(backoff.Seconds()), + }) + progress("eino_run_retry", "已恢复上下文,正在重试…", map[string]interface{}{ + "conversationId": conversationID, + "source": "eino", + "orchestration": orchMode, + "attempt": attemptNo, + "contextSource": string(ctxSource), }) } - return false, ErrTransientRetryContinue + msgs = restartMsgs + iter = startRunnerIter(msgs) + return true, nil } takePartial := func(runErr error) (*RunResult, error) { @@ -583,9 +620,13 @@ func runEinoADKAgentLoop(ctx context.Context, args *einoADKRunLoopArgs, baseMsgs continue } if ev.Err != nil { - if _, retErr := maybeRetryTransientRun(ev.Err); retErr != nil { + restarted, retErr := maybeRetryTransientRun(ev.Err) + if retErr != nil { return takePartial(retErr) } + if restarted { + continue + } } if ev.AgentName != "" && progress != nil { iterEinoAgent := orchestratorName @@ -951,9 +992,13 @@ func runEinoADKAgentLoop(ctx context.Context, args *einoADKRunLoopArgs, baseMsgs "einoRole": einoRoleTag(ev.AgentName), }) } - if _, retErr := maybeRetryTransientRun(streamRecvErr); retErr != nil { + restarted, retErr := maybeRetryTransientRun(streamRecvErr) + if retErr != nil { return takePartial(retErr) } + if restarted { + continue + } } continue } @@ -1057,32 +1102,9 @@ func runEinoADKAgentLoop(ctx context.Context, args *einoADKRunLoopArgs, baseMsgs orchMode, runAccumulatedMsgs, persistTraceSource(args, runAccumulatedMsgs), lastAssistant, lastPlanExecuteExecutor, emptyHint, ids, false, ) - if shouldEinoEmptyResponseContinue(out, emptyHint, len(runAccumulatedMsgs), baseAccumulatedCount) { - if logger != nil { - logger.Info("eino empty response, ending run segment for handler resume", - zap.String("conversationId", conversationID), - zap.String("orchestration", orchMode), - zap.Int("traceMessages", len(runAccumulatedMsgs))) - } - if progress != nil { - progress("eino_empty_response_continue", "会话已结束但未产生助手正文,正在基于轨迹自动续跑…", map[string]interface{}{ - "conversationId": conversationID, - "source": "eino", - "resumeKind": "trace_segment", - }) - } - return out, ErrEmptyResponseContinue - } return out, nil } -func shouldEinoEmptyResponseContinue(out *RunResult, emptyHint string, accumulatedLen, baseCount int) bool { - if out == nil || accumulatedLen <= baseCount { - return false - } - return strings.TrimSpace(out.Response) == strings.TrimSpace(emptyHint) -} - func persistTraceSource(args *einoADKRunLoopArgs, fallback []adk.Message) []adk.Message { if args != nil && args.ModelFacingTrace != nil { if snap := args.ModelFacingTrace.Snapshot(); len(snap) > 0 { diff --git a/internal/multiagent/eino_middleware.go b/internal/multiagent/eino_middleware.go index f0367d5b..4e90bc02 100644 --- a/internal/multiagent/eino_middleware.go +++ b/internal/multiagent/eino_middleware.go @@ -243,17 +243,14 @@ func prependEinoMiddlewares( return outTools, extraHandlers, toolSearchActive, nil } -func deepExtrasFromConfig(ma *config.MultiAgentConfig) (outputKey string, retry *adk.ModelRetryConfig, taskDesc func(context.Context, []adk.Agent) (string, error)) { +func deepExtrasFromConfig(ma *config.MultiAgentConfig) (outputKey string, taskDesc func(context.Context, []adk.Agent) (string, error)) { if ma == nil { - return "", nil, nil + return "", nil } mw := ma.EinoMiddleware if k := strings.TrimSpace(mw.DeepOutputKey); k != "" { outputKey = k } - if mw.DeepModelRetryMaxRetries > 0 { - retry = &adk.ModelRetryConfig{MaxRetries: mw.DeepModelRetryMaxRetries} - } prefix := strings.TrimSpace(mw.TaskToolDescriptionPrefix) if prefix != "" { taskDesc = func(ctx context.Context, agents []adk.Agent) (string, error) { @@ -274,5 +271,5 @@ func deepExtrasFromConfig(ma *config.MultiAgentConfig) (outputKey string, retry return prefix + "\n可用子代理(按名称 transfer / task 调用):" + strings.Join(names, "、"), nil } } - return outputKey, retry, taskDesc + return outputKey, taskDesc } diff --git a/internal/multiagent/eino_single_runner.go b/internal/multiagent/eino_single_runner.go index c38e508a..2d5cb9cb 100644 --- a/internal/multiagent/eino_single_runner.go +++ b/internal/multiagent/eino_single_runner.go @@ -188,13 +188,10 @@ func RunEinoSingleChatModelAgent( MaxIterations: maxIter, Handlers: handlers, } - outKey, modelRetry, _ := deepExtrasFromConfig(ma) + outKey, _ := deepExtrasFromConfig(ma) if outKey != "" { chatCfg.OutputKey = outKey } - if modelRetry != nil { - chatCfg.ModelRetryConfig = modelRetry - } chatAgent, err := adk.NewChatModelAgent(ctx, chatCfg) if err != nil { diff --git a/internal/multiagent/eino_summarize.go b/internal/multiagent/eino_summarize.go index 702eb6e1..222601fa 100644 --- a/internal/multiagent/eino_summarize.go +++ b/internal/multiagent/eino_summarize.go @@ -22,8 +22,6 @@ import ( "go.uber.org/zap" ) -const defaultSummarizationRetryMax = 3 - // einoSummarizeUserInstruction:压缩历史时保留渗透测试关键信息。 const einoSummarizeUserInstruction = `在保持所有关键安全测试信息完整的前提下压缩对话历史。 @@ -97,10 +95,8 @@ func newEinoSummarizationMiddleware( } } - retryMax := defaultSummarizationRetryMax - if mwCfg != nil && mwCfg.SummarizationRetryMaxAttempts > 0 { - retryMax = mwCfg.SummarizationRetryMaxAttempts - } + retryPolicy := einoTransientRunRetryPolicyFromMW(mwCfg) + retryMax := retryPolicy.maxAttempts // ModelOptions apply only to summarization Generate (same ChatModel instance as the agent). // Strip thinking/reasoning on this call path; mark requests for empty-choices diagnostics. @@ -137,13 +133,14 @@ func newEinoSummarizationMiddleware( Retry: &summarization.RetryConfig{ MaxRetries: &retryMax, ShouldRetry: func(_ context.Context, _ adk.Message, err error) bool { - if err != nil && logger != nil { - logger.Warn("eino summarization generate attempt failed, will retry if attempts remain", + retry := isEinoTransientRunError(err) + if retry && logger != nil { + logger.Warn("eino summarization generate transient error, will retry if attempts remain", zap.Error(err), zap.Int("max_retries", retryMax), ) } - return err != nil + return retry }, }, Finalize: func(ctx context.Context, originalMessages []adk.Message, summary adk.Message) ([]adk.Message, error) { diff --git a/internal/multiagent/eino_transient_retry.go b/internal/multiagent/eino_transient_retry.go index 7311a0f7..7090fe68 100644 --- a/internal/multiagent/eino_transient_retry.go +++ b/internal/multiagent/eino_transient_retry.go @@ -3,6 +3,7 @@ package multiagent import ( "context" "errors" + "fmt" "strings" "time" @@ -17,8 +18,9 @@ const ( defaultEinoRunRetryMaxBackoff = 30 * time.Second ) -// isEinoTransientRunError 判断 ADK 运行期错误是否适合指数退避续跑(429、5xx、网络抖动等)。 -// 用户取消、超时、迭代上限等由 run loop 单独处理,不在此列。 +// isEinoTransientRunError 是 Eino 运行期「可退避重试 vs 直接失败」的唯一判据。 +// 429/5xx/网络抖动等返回 true;用户取消、超时、迭代上限、鉴权失败等返回 false。 +// 其它模块(run loop、summarization 等)只调用本函数,不在别处维护平行规则。 func isEinoTransientRunError(err error) bool { if err == nil { return false @@ -78,6 +80,68 @@ func isEinoTransientRunError(err error) bool { return false } +type einoTransientRunRetryPolicy struct { + maxAttempts int + maxBackoff time.Duration +} + +func einoTransientRunRetryPolicyFromArgs(args *einoADKRunLoopArgs) einoTransientRunRetryPolicy { + return einoTransientRunRetryPolicy{ + maxAttempts: einoRunRetryMaxAttempts(args), + maxBackoff: einoRunRetryMaxBackoff(args), + } +} + +func einoTransientRunRetryPolicyFromMW(mw *config.MultiAgentEinoMiddlewareConfig) einoTransientRunRetryPolicy { + maxBackoff := defaultEinoRunRetryMaxBackoff + if mw != nil && mw.RunRetryMaxBackoffSec > 0 { + maxBackoff = time.Duration(mw.RunRetryMaxBackoffSec) * time.Second + } + return einoTransientRunRetryPolicy{ + maxAttempts: RunRetryMaxAttemptsFromConfig(mw), + maxBackoff: maxBackoff, + } +} + +// einoTransientRunRetrier 在 run loop 内对临时错误做指数退避并重启 Runner(唯一重试执行层)。 +type einoTransientRunRetrier struct { + policy einoTransientRunRetryPolicy + attempts int +} + +func newEinoTransientRunRetrier(policy einoTransientRunRetryPolicy) *einoTransientRunRetrier { + return &einoTransientRunRetrier{policy: policy} +} + +// tryRetry 对临时错误退避后返回重启消息;次数用尽返回 exhausted 错误。 +func (r *einoTransientRunRetrier) tryRetry( + ctx context.Context, + runErr error, + args *einoADKRunLoopArgs, + baseMsgs, accumulated []adk.Message, + baseCount int, +) (restarted bool, restartMsgs []adk.Message, ctxSource einoRunRestartContextSource, backoff time.Duration, fatal error) { + if runErr == nil || !isEinoTransientRunError(runErr) { + return false, nil, "", 0, runErr + } + r.attempts++ + if r.attempts > r.policy.maxAttempts { + return false, nil, "", 0, fmt.Errorf("transient retry exhausted after %d attempts: %w", r.policy.maxAttempts, runErr) + } + backoff = einoTransientRetryBackoff(r.attempts-1, r.policy.maxBackoff) + select { + case <-ctx.Done(): + return false, nil, "", 0, ctx.Err() + case <-time.After(backoff): + } + restartMsgs, ctxSource = einoMessagesForRunRestart(args, baseMsgs, accumulated, baseCount) + return true, restartMsgs, ctxSource, backoff, nil +} + +func (r *einoTransientRunRetrier) attempt() int { return r.attempts } + +func (r *einoTransientRunRetrier) maxAttempts() int { return r.policy.maxAttempts } + func einoRunRetryMaxAttempts(args *einoADKRunLoopArgs) int { if args != nil && args.RunRetryMaxAttempts > 0 { return args.RunRetryMaxAttempts @@ -85,7 +149,7 @@ func einoRunRetryMaxAttempts(args *einoADKRunLoopArgs) int { return defaultEinoRunRetryMaxAttempts } -// RunRetryMaxAttemptsFromConfig 供 handler 分段续跑计数(与 eino_middleware.run_retry_max_attempts 一致)。 +// RunRetryMaxAttemptsFromConfig 与 eino_middleware.run_retry_max_attempts 一致。 func RunRetryMaxAttemptsFromConfig(mw *config.MultiAgentEinoMiddlewareConfig) int { if mw != nil && mw.RunRetryMaxAttempts > 0 { return mw.RunRetryMaxAttempts @@ -93,15 +157,6 @@ func RunRetryMaxAttemptsFromConfig(mw *config.MultiAgentEinoMiddlewareConfig) in return defaultEinoRunRetryMaxAttempts } -// TransientRetryBackoff 供 handler 在分段续跑前退避。 -func TransientRetryBackoff(attempt int, maxBackoffSec int) time.Duration { - max := defaultEinoRunRetryMaxBackoff - if maxBackoffSec > 0 { - max = time.Duration(maxBackoffSec) * time.Second - } - return einoTransientRetryBackoff(attempt, max) -} - func einoRunRetryMaxBackoff(args *einoADKRunLoopArgs) time.Duration { if args != nil && args.RunRetryMaxBackoffSec > 0 { return time.Duration(args.RunRetryMaxBackoffSec) * time.Second diff --git a/internal/multiagent/eino_transient_retry_test.go b/internal/multiagent/eino_transient_retry_test.go index 1ca8cf58..0761dc40 100644 --- a/internal/multiagent/eino_transient_retry_test.go +++ b/internal/multiagent/eino_transient_retry_test.go @@ -102,10 +102,3 @@ func TestAppendUserMessageIfNeeded(t *testing.T) { t.Fatalf("should not duplicate user message: len=%d", len(dup)) } } - -func TestErrTransientRetryContinue(t *testing.T) { - t.Parallel() - if !errors.Is(ErrTransientRetryContinue, ErrTransientRetryContinue) { - t.Fatal("sentinel should match") - } -} diff --git a/internal/multiagent/interrupt.go b/internal/multiagent/interrupt.go index dc9bc348..500e300f 100644 --- a/internal/multiagent/interrupt.go +++ b/internal/multiagent/interrupt.go @@ -5,11 +5,3 @@ import "errors" // ErrInterruptContinue 作为 context.CancelCause 使用:用户选择「中断并继续」且当前无进行中的 MCP 工具时, // 取消当前推理/流式输出,并在同一会话任务内携带用户补充说明自动续跑下一轮(类似 Hermes 式人机回合)。 var ErrInterruptContinue = errors.New("agent interrupt: continue with user-supplied context") - -// ErrTransientRetryContinue 表示 Run 因 429/网络等临时错误结束,应由 handler 落库轨迹后 -// loadHistoryFromAgentTrace 再开下一轮 Run(与 ErrInterruptContinue 同级的「分段续跑」语义)。 -var ErrTransientRetryContinue = errors.New("agent transient: retry after persisting trace") - -// ErrEmptyResponseContinue 表示 Eino ADK 会话正常结束但未捕获到助手正文,应由 handler 落库轨迹后 -// loadHistoryFromAgentTrace 再开下一轮 Run(与 ErrInterruptContinue / ErrTransientRetryContinue 同级)。 -var ErrEmptyResponseContinue = errors.New("agent empty response: continue after persisting trace") diff --git a/internal/multiagent/runner.go b/internal/multiagent/runner.go index 2cc2c32d..c5425b49 100644 --- a/internal/multiagent/runner.go +++ b/internal/multiagent/runner.go @@ -416,7 +416,7 @@ func RunDeepAgent( EmitInternalEvents: true, } - deepOutKey, modelRetry, taskGen := deepExtrasFromConfig(ma) + deepOutKey, taskGen := deepExtrasFromConfig(ma) var da adk.Agent switch orchMode { @@ -473,9 +473,6 @@ func RunDeepAgent( Handlers: supHandlers, Exit: &adk.ExitTool{}, } - if modelRetry != nil { - supCfg.ModelRetryConfig = modelRetry - } if deepOutKey != "" { supCfg.OutputKey = deepOutKey } @@ -509,9 +506,6 @@ func RunDeepAgent( if deepOutKey != "" { dcfg.OutputKey = deepOutKey } - if modelRetry != nil { - dcfg.ModelRetryConfig = modelRetry - } if taskGen != nil { dcfg.TaskToolDescriptionGenerator = taskGen }