Add files via upload

This commit is contained in:
公明
2026-06-22 23:27:30 +08:00
committed by GitHub
parent 51f1cfde2f
commit b6ff80adf2
8 changed files with 138 additions and 91 deletions
+60 -38
View File
@@ -383,6 +383,12 @@ func runEinoADKAgentLoop(ctx context.Context, args *einoADKRunLoopArgs, baseMsgs
}
}
runner := adk.NewRunner(ctx, runnerCfg)
startRunnerIter := func(runMsgs []adk.Message) *adk.AsyncIterator[*adk.AgentEvent] {
if checkPointID != "" {
return runner.Run(ctx, runMsgs, adk.WithCheckPointID(checkPointID))
}
return runner.Run(ctx, runMsgs)
}
var iter *adk.AsyncIterator[*adk.AgentEvent]
if cpStore != nil && checkPointID != "" {
if _, existed, getErr := cpStore.Get(ctx, checkPointID); getErr != nil {
@@ -422,12 +428,9 @@ func runEinoADKAgentLoop(ctx context.Context, args *einoADKRunLoopArgs, baseMsgs
}
}
if iter == nil {
if checkPointID != "" {
iter = runner.Run(ctx, msgs, adk.WithCheckPointID(checkPointID))
} else {
iter = runner.Run(ctx, msgs)
}
iter = startRunnerIter(msgs)
}
transientRetrier := newEinoTransientRunRetrier(einoTransientRunRetryPolicyFromArgs(args))
handleRunErr := func(runErr error) error {
if runErr == nil {
return nil
@@ -480,26 +483,60 @@ func runEinoADKAgentLoop(ctx context.Context, args *einoADKRunLoopArgs, baseMsgs
return runErr
}
// maybeRetryTransientRun:不在此层 runner.Run/Resume;由 handler 落库 + loadHistoryFromAgentTrace 分段续跑(同中断并继续)。
maybeRetryTransientRun := func(runErr error) (retry bool, fatal error) {
if runErr == nil || !isEinoTransientRunError(runErr) {
maybeRetryTransientRun := func(runErr error) (restarted bool, fatal error) {
if runErr == nil {
return false, nil
}
if !isEinoTransientRunError(runErr) {
return false, handleRunErr(runErr)
}
restarted, restartMsgs, ctxSource, backoff, retErr := transientRetrier.tryRetry(
ctx, runErr, args, baseMsgs, runAccumulatedMsgs, baseAccumulatedCount,
)
if retErr != nil {
flushAllPendingAsFailed(runErr)
if logger != nil {
logger.Warn("eino transient retry exhausted",
zap.Error(retErr),
zap.String("orchestration", orchMode),
zap.Int("maxAttempts", transientRetrier.maxAttempts()))
}
return false, retErr
}
if !restarted {
return false, nil
}
attemptNo := transientRetrier.attempt()
maxAttempts := transientRetrier.maxAttempts()
if logger != nil {
logger.Warn("eino transient error, ending run segment for handler resume",
logger.Warn("eino transient error, retrying after backoff",
zap.Error(runErr),
zap.String("orchestration", orchMode))
zap.String("orchestration", orchMode),
zap.Int("attempt", attemptNo),
zap.Int("maxAttempts", maxAttempts),
zap.Duration("backoff", backoff))
}
if progress != nil {
progress("eino_run_retry", "遇到临时错误(限流或网络波动),将保存上下文并重试…", map[string]interface{}{
progress("eino_run_retry", fmt.Sprintf("遇到临时错误(限流或网络波动),%d 秒后第 %d/%d 次重试…", int(backoff.Seconds()), attemptNo, maxAttempts), map[string]interface{}{
"conversationId": conversationID,
"source": "eino",
"orchestration": orchMode,
"error": runErr.Error(),
"resumeKind": "trace_segment",
"attempt": attemptNo,
"maxAttempts": maxAttempts,
"backoffSec": int(backoff.Seconds()),
})
progress("eino_run_retry", "已恢复上下文,正在重试…", map[string]interface{}{
"conversationId": conversationID,
"source": "eino",
"orchestration": orchMode,
"attempt": attemptNo,
"contextSource": string(ctxSource),
})
}
return false, ErrTransientRetryContinue
msgs = restartMsgs
iter = startRunnerIter(msgs)
return true, nil
}
takePartial := func(runErr error) (*RunResult, error) {
@@ -583,9 +620,13 @@ func runEinoADKAgentLoop(ctx context.Context, args *einoADKRunLoopArgs, baseMsgs
continue
}
if ev.Err != nil {
if _, retErr := maybeRetryTransientRun(ev.Err); retErr != nil {
restarted, retErr := maybeRetryTransientRun(ev.Err)
if retErr != nil {
return takePartial(retErr)
}
if restarted {
continue
}
}
if ev.AgentName != "" && progress != nil {
iterEinoAgent := orchestratorName
@@ -951,9 +992,13 @@ func runEinoADKAgentLoop(ctx context.Context, args *einoADKRunLoopArgs, baseMsgs
"einoRole": einoRoleTag(ev.AgentName),
})
}
if _, retErr := maybeRetryTransientRun(streamRecvErr); retErr != nil {
restarted, retErr := maybeRetryTransientRun(streamRecvErr)
if retErr != nil {
return takePartial(retErr)
}
if restarted {
continue
}
}
continue
}
@@ -1057,32 +1102,9 @@ func runEinoADKAgentLoop(ctx context.Context, args *einoADKRunLoopArgs, baseMsgs
orchMode, runAccumulatedMsgs, persistTraceSource(args, runAccumulatedMsgs),
lastAssistant, lastPlanExecuteExecutor, emptyHint, ids, false,
)
if shouldEinoEmptyResponseContinue(out, emptyHint, len(runAccumulatedMsgs), baseAccumulatedCount) {
if logger != nil {
logger.Info("eino empty response, ending run segment for handler resume",
zap.String("conversationId", conversationID),
zap.String("orchestration", orchMode),
zap.Int("traceMessages", len(runAccumulatedMsgs)))
}
if progress != nil {
progress("eino_empty_response_continue", "会话已结束但未产生助手正文,正在基于轨迹自动续跑…", map[string]interface{}{
"conversationId": conversationID,
"source": "eino",
"resumeKind": "trace_segment",
})
}
return out, ErrEmptyResponseContinue
}
return out, nil
}
func shouldEinoEmptyResponseContinue(out *RunResult, emptyHint string, accumulatedLen, baseCount int) bool {
if out == nil || accumulatedLen <= baseCount {
return false
}
return strings.TrimSpace(out.Response) == strings.TrimSpace(emptyHint)
}
func persistTraceSource(args *einoADKRunLoopArgs, fallback []adk.Message) []adk.Message {
if args != nil && args.ModelFacingTrace != nil {
if snap := args.ModelFacingTrace.Snapshot(); len(snap) > 0 {
+3 -6
View File
@@ -243,17 +243,14 @@ func prependEinoMiddlewares(
return outTools, extraHandlers, toolSearchActive, nil
}
func deepExtrasFromConfig(ma *config.MultiAgentConfig) (outputKey string, retry *adk.ModelRetryConfig, taskDesc func(context.Context, []adk.Agent) (string, error)) {
func deepExtrasFromConfig(ma *config.MultiAgentConfig) (outputKey string, taskDesc func(context.Context, []adk.Agent) (string, error)) {
if ma == nil {
return "", nil, nil
return "", nil
}
mw := ma.EinoMiddleware
if k := strings.TrimSpace(mw.DeepOutputKey); k != "" {
outputKey = k
}
if mw.DeepModelRetryMaxRetries > 0 {
retry = &adk.ModelRetryConfig{MaxRetries: mw.DeepModelRetryMaxRetries}
}
prefix := strings.TrimSpace(mw.TaskToolDescriptionPrefix)
if prefix != "" {
taskDesc = func(ctx context.Context, agents []adk.Agent) (string, error) {
@@ -274,5 +271,5 @@ func deepExtrasFromConfig(ma *config.MultiAgentConfig) (outputKey string, retry
return prefix + "\n可用子代理(按名称 transfer / task 调用):" + strings.Join(names, "、"), nil
}
}
return outputKey, retry, taskDesc
return outputKey, taskDesc
}
+1 -4
View File
@@ -188,13 +188,10 @@ func RunEinoSingleChatModelAgent(
MaxIterations: maxIter,
Handlers: handlers,
}
outKey, modelRetry, _ := deepExtrasFromConfig(ma)
outKey, _ := deepExtrasFromConfig(ma)
if outKey != "" {
chatCfg.OutputKey = outKey
}
if modelRetry != nil {
chatCfg.ModelRetryConfig = modelRetry
}
chatAgent, err := adk.NewChatModelAgent(ctx, chatCfg)
if err != nil {
+6 -9
View File
@@ -22,8 +22,6 @@ import (
"go.uber.org/zap"
)
const defaultSummarizationRetryMax = 3
// einoSummarizeUserInstruction:压缩历史时保留渗透测试关键信息。
const einoSummarizeUserInstruction = `在保持所有关键安全测试信息完整的前提下压缩对话历史。
@@ -97,10 +95,8 @@ func newEinoSummarizationMiddleware(
}
}
retryMax := defaultSummarizationRetryMax
if mwCfg != nil && mwCfg.SummarizationRetryMaxAttempts > 0 {
retryMax = mwCfg.SummarizationRetryMaxAttempts
}
retryPolicy := einoTransientRunRetryPolicyFromMW(mwCfg)
retryMax := retryPolicy.maxAttempts
// ModelOptions apply only to summarization Generate (same ChatModel instance as the agent).
// Strip thinking/reasoning on this call path; mark requests for empty-choices diagnostics.
@@ -137,13 +133,14 @@ func newEinoSummarizationMiddleware(
Retry: &summarization.RetryConfig{
MaxRetries: &retryMax,
ShouldRetry: func(_ context.Context, _ adk.Message, err error) bool {
if err != nil && logger != nil {
logger.Warn("eino summarization generate attempt failed, will retry if attempts remain",
retry := isEinoTransientRunError(err)
if retry && logger != nil {
logger.Warn("eino summarization generate transient error, will retry if attempts remain",
zap.Error(err),
zap.Int("max_retries", retryMax),
)
}
return err != nil
return retry
},
},
Finalize: func(ctx context.Context, originalMessages []adk.Message, summary adk.Message) ([]adk.Message, error) {
+67 -12
View File
@@ -3,6 +3,7 @@ package multiagent
import (
"context"
"errors"
"fmt"
"strings"
"time"
@@ -17,8 +18,9 @@ const (
defaultEinoRunRetryMaxBackoff = 30 * time.Second
)
// isEinoTransientRunError 判断 ADK 运行期错误是否适合指数退避续跑(429、5xx、网络抖动等)
// 用户取消、超时、迭代上限等由 run loop 单独处理,不在此列
// isEinoTransientRunError 是 Eino 运行期「可退避重试 vs 直接失败」的唯一判据
// 429/5xx/网络抖动等返回 true;用户取消、超时、迭代上限、鉴权失败等返回 false
// 其它模块(run loop、summarization 等)只调用本函数,不在别处维护平行规则。
func isEinoTransientRunError(err error) bool {
if err == nil {
return false
@@ -78,6 +80,68 @@ func isEinoTransientRunError(err error) bool {
return false
}
type einoTransientRunRetryPolicy struct {
maxAttempts int
maxBackoff time.Duration
}
func einoTransientRunRetryPolicyFromArgs(args *einoADKRunLoopArgs) einoTransientRunRetryPolicy {
return einoTransientRunRetryPolicy{
maxAttempts: einoRunRetryMaxAttempts(args),
maxBackoff: einoRunRetryMaxBackoff(args),
}
}
func einoTransientRunRetryPolicyFromMW(mw *config.MultiAgentEinoMiddlewareConfig) einoTransientRunRetryPolicy {
maxBackoff := defaultEinoRunRetryMaxBackoff
if mw != nil && mw.RunRetryMaxBackoffSec > 0 {
maxBackoff = time.Duration(mw.RunRetryMaxBackoffSec) * time.Second
}
return einoTransientRunRetryPolicy{
maxAttempts: RunRetryMaxAttemptsFromConfig(mw),
maxBackoff: maxBackoff,
}
}
// einoTransientRunRetrier 在 run loop 内对临时错误做指数退避并重启 Runner(唯一重试执行层)。
type einoTransientRunRetrier struct {
policy einoTransientRunRetryPolicy
attempts int
}
func newEinoTransientRunRetrier(policy einoTransientRunRetryPolicy) *einoTransientRunRetrier {
return &einoTransientRunRetrier{policy: policy}
}
// tryRetry 对临时错误退避后返回重启消息;次数用尽返回 exhausted 错误。
func (r *einoTransientRunRetrier) tryRetry(
ctx context.Context,
runErr error,
args *einoADKRunLoopArgs,
baseMsgs, accumulated []adk.Message,
baseCount int,
) (restarted bool, restartMsgs []adk.Message, ctxSource einoRunRestartContextSource, backoff time.Duration, fatal error) {
if runErr == nil || !isEinoTransientRunError(runErr) {
return false, nil, "", 0, runErr
}
r.attempts++
if r.attempts > r.policy.maxAttempts {
return false, nil, "", 0, fmt.Errorf("transient retry exhausted after %d attempts: %w", r.policy.maxAttempts, runErr)
}
backoff = einoTransientRetryBackoff(r.attempts-1, r.policy.maxBackoff)
select {
case <-ctx.Done():
return false, nil, "", 0, ctx.Err()
case <-time.After(backoff):
}
restartMsgs, ctxSource = einoMessagesForRunRestart(args, baseMsgs, accumulated, baseCount)
return true, restartMsgs, ctxSource, backoff, nil
}
func (r *einoTransientRunRetrier) attempt() int { return r.attempts }
func (r *einoTransientRunRetrier) maxAttempts() int { return r.policy.maxAttempts }
func einoRunRetryMaxAttempts(args *einoADKRunLoopArgs) int {
if args != nil && args.RunRetryMaxAttempts > 0 {
return args.RunRetryMaxAttempts
@@ -85,7 +149,7 @@ func einoRunRetryMaxAttempts(args *einoADKRunLoopArgs) int {
return defaultEinoRunRetryMaxAttempts
}
// RunRetryMaxAttemptsFromConfig 供 handler 分段续跑计数(与 eino_middleware.run_retry_max_attempts 一致
// RunRetryMaxAttemptsFromConfig 与 eino_middleware.run_retry_max_attempts 一致。
func RunRetryMaxAttemptsFromConfig(mw *config.MultiAgentEinoMiddlewareConfig) int {
if mw != nil && mw.RunRetryMaxAttempts > 0 {
return mw.RunRetryMaxAttempts
@@ -93,15 +157,6 @@ func RunRetryMaxAttemptsFromConfig(mw *config.MultiAgentEinoMiddlewareConfig) in
return defaultEinoRunRetryMaxAttempts
}
// TransientRetryBackoff 供 handler 在分段续跑前退避。
func TransientRetryBackoff(attempt int, maxBackoffSec int) time.Duration {
max := defaultEinoRunRetryMaxBackoff
if maxBackoffSec > 0 {
max = time.Duration(maxBackoffSec) * time.Second
}
return einoTransientRetryBackoff(attempt, max)
}
func einoRunRetryMaxBackoff(args *einoADKRunLoopArgs) time.Duration {
if args != nil && args.RunRetryMaxBackoffSec > 0 {
return time.Duration(args.RunRetryMaxBackoffSec) * time.Second
@@ -102,10 +102,3 @@ func TestAppendUserMessageIfNeeded(t *testing.T) {
t.Fatalf("should not duplicate user message: len=%d", len(dup))
}
}
func TestErrTransientRetryContinue(t *testing.T) {
t.Parallel()
if !errors.Is(ErrTransientRetryContinue, ErrTransientRetryContinue) {
t.Fatal("sentinel should match")
}
}
-8
View File
@@ -5,11 +5,3 @@ import "errors"
// ErrInterruptContinue 作为 context.CancelCause 使用:用户选择「中断并继续」且当前无进行中的 MCP 工具时,
// 取消当前推理/流式输出,并在同一会话任务内携带用户补充说明自动续跑下一轮(类似 Hermes 式人机回合)。
var ErrInterruptContinue = errors.New("agent interrupt: continue with user-supplied context")
// ErrTransientRetryContinue 表示 Run 因 429/网络等临时错误结束,应由 handler 落库轨迹后
// loadHistoryFromAgentTrace 再开下一轮 Run(与 ErrInterruptContinue 同级的「分段续跑」语义)。
var ErrTransientRetryContinue = errors.New("agent transient: retry after persisting trace")
// ErrEmptyResponseContinue 表示 Eino ADK 会话正常结束但未捕获到助手正文,应由 handler 落库轨迹后
// loadHistoryFromAgentTrace 再开下一轮 Run(与 ErrInterruptContinue / ErrTransientRetryContinue 同级)。
var ErrEmptyResponseContinue = errors.New("agent empty response: continue after persisting trace")
+1 -7
View File
@@ -416,7 +416,7 @@ func RunDeepAgent(
EmitInternalEvents: true,
}
deepOutKey, modelRetry, taskGen := deepExtrasFromConfig(ma)
deepOutKey, taskGen := deepExtrasFromConfig(ma)
var da adk.Agent
switch orchMode {
@@ -473,9 +473,6 @@ func RunDeepAgent(
Handlers: supHandlers,
Exit: &adk.ExitTool{},
}
if modelRetry != nil {
supCfg.ModelRetryConfig = modelRetry
}
if deepOutKey != "" {
supCfg.OutputKey = deepOutKey
}
@@ -509,9 +506,6 @@ func RunDeepAgent(
if deepOutKey != "" {
dcfg.OutputKey = deepOutKey
}
if modelRetry != nil {
dcfg.ModelRetryConfig = modelRetry
}
if taskGen != nil {
dcfg.TaskToolDescriptionGenerator = taskGen
}