diff --git a/internal/multiagent/eino_adk_run_loop.go b/internal/multiagent/eino_adk_run_loop.go index 37d2b326..b91d3b8f 100644 --- a/internal/multiagent/eino_adk_run_loop.go +++ b/internal/multiagent/eino_adk_run_loop.go @@ -77,6 +77,9 @@ type einoADKRunLoopArgs struct { StreamsMainAssistant func(agent string) bool EinoRoleTag func(agent string) string CheckpointDir string + // RunRetryMaxAttempts / RunRetryMaxBackoffSec:429、5xx、网络抖动时的指数退避续跑(0=默认 10 次 / 30s 上限)。 + RunRetryMaxAttempts int + RunRetryMaxBackoffSec int McpIDsMu *sync.Mutex McpIDs *[]string @@ -437,6 +440,28 @@ func runEinoADKAgentLoop(ctx context.Context, args *einoADKRunLoopArgs, baseMsgs return runErr } + // maybeRetryTransientRun:不在此层 runner.Run/Resume;由 handler 落库 + loadHistoryFromAgentTrace 分段续跑(同中断并继续)。 + maybeRetryTransientRun := func(runErr error) (retry bool, fatal error) { + if runErr == nil || !isEinoTransientRunError(runErr) { + return false, handleRunErr(runErr) + } + if logger != nil { + logger.Warn("eino transient error, ending run segment for handler resume", + zap.Error(runErr), + zap.String("orchestration", orchMode)) + } + if progress != nil { + progress("eino_run_retry", "遇到临时错误(限流或网络波动),将保存上下文并重试…", map[string]interface{}{ + "conversationId": conversationID, + "source": "eino", + "orchestration": orchMode, + "error": runErr.Error(), + "resumeKind": "trace_segment", + }) + } + return false, ErrTransientRetryContinue + } + takePartial := func(runErr error) (*RunResult, error) { if len(runAccumulatedMsgs) <= baseAccumulatedCount { return nil, runErr @@ -519,7 +544,7 @@ func runEinoADKAgentLoop(ctx context.Context, args *einoADKRunLoopArgs, baseMsgs continue } if ev.Err != nil { - if retErr := handleRunErr(ev.Err); retErr != nil { + if _, retErr := maybeRetryTransientRun(ev.Err); retErr != nil { return takePartial(retErr) } } @@ -821,7 +846,7 @@ func runEinoADKAgentLoop(ctx context.Context, args *einoADKRunLoopArgs, baseMsgs "einoRole": einoRoleTag(ev.AgentName), }) } - if retErr := handleRunErr(streamRecvErr); retErr != nil { + if _, retErr := maybeRetryTransientRun(streamRecvErr); retErr != nil { return takePartial(retErr) } } diff --git a/internal/multiagent/eino_single_runner.go b/internal/multiagent/eino_single_runner.go index c5e66db1..5cdadcc2 100644 --- a/internal/multiagent/eino_single_runner.go +++ b/internal/multiagent/eino_single_runner.go @@ -18,7 +18,6 @@ import ( einoopenai "github.com/cloudwego/eino-ext/components/model/openai" "github.com/cloudwego/eino/adk" "github.com/cloudwego/eino/compose" - "github.com/cloudwego/eino/schema" "go.uber.org/zap" ) @@ -213,7 +212,7 @@ func RunEinoSingleChatModelAgent( } baseMsgs := historyToMessages(history, appCfg, &ma.EinoMiddleware) - baseMsgs = append(baseMsgs, schema.UserMessage(userMessage)) + baseMsgs = appendUserMessageIfNeeded(baseMsgs, userMessage) streamsMainAssistant := func(agent string) bool { return agent == "" || agent == einoSingleAgentName @@ -233,6 +232,8 @@ func RunEinoSingleChatModelAgent( StreamsMainAssistant: streamsMainAssistant, EinoRoleTag: einoRoleTag, CheckpointDir: ma.EinoMiddleware.CheckpointDir, + RunRetryMaxAttempts: ma.EinoMiddleware.RunRetryMaxAttempts, + RunRetryMaxBackoffSec: ma.EinoMiddleware.RunRetryMaxBackoffSec, McpIDsMu: &mcpIDsMu, McpIDs: &mcpIDs, FilesystemMonitorAgent: ag, diff --git a/internal/multiagent/eino_transient_retry.go b/internal/multiagent/eino_transient_retry.go new file mode 100644 index 00000000..895f062b --- /dev/null +++ b/internal/multiagent/eino_transient_retry.go @@ -0,0 +1,173 @@ +package multiagent + +import ( + "context" + "errors" + "strings" + "time" + + "cyberstrike-ai/internal/config" + + "github.com/cloudwego/eino/adk" + "github.com/cloudwego/eino/schema" +) + +const ( + defaultEinoRunRetryMaxAttempts = 10 + defaultEinoRunRetryMaxBackoff = 30 * time.Second +) + +// isEinoTransientRunError 判断 ADK 运行期错误是否适合指数退避续跑(429、5xx、网络抖动等)。 +// 用户取消、超时、迭代上限等由 run loop 单独处理,不在此列。 +func isEinoTransientRunError(err error) bool { + if err == nil { + return false + } + if errors.Is(err, context.Canceled) || errors.Is(err, context.DeadlineExceeded) { + return false + } + if isEinoIterationLimitError(err) { + return false + } + msg := strings.ToLower(strings.TrimSpace(err.Error())) + if msg == "" { + return false + } + transientMarkers := []string{ + "406", + "429", + "too many requests", + "rate limit", + "rate_limit", + "ratelimit", + "quota exceeded", + "overloaded", + "capacity", + "temporarily unavailable", + "service unavailable", + "bad gateway", + "gateway timeout", + "internal server error", + "connection reset", + "connection refused", + "connection closed", + "i/o timeout", + "no such host", + "network is unreachable", + "broken pipe", + "eof", + "read tcp", + "write tcp", + "dial tcp", + "tls handshake timeout", + "stream error", + "unexpected eof", + "unexpected end of json", + "status code: 406", + "status code: 502", + "502", + "503", + "504", + "500", + } + for _, m := range transientMarkers { + if strings.Contains(msg, m) { + return true + } + } + return false +} + +func einoRunRetryMaxAttempts(args *einoADKRunLoopArgs) int { + if args != nil && args.RunRetryMaxAttempts > 0 { + return args.RunRetryMaxAttempts + } + return defaultEinoRunRetryMaxAttempts +} + +// RunRetryMaxAttemptsFromConfig 供 handler 分段续跑计数(与 eino_middleware.run_retry_max_attempts 一致)。 +func RunRetryMaxAttemptsFromConfig(mw *config.MultiAgentEinoMiddlewareConfig) int { + if mw != nil && mw.RunRetryMaxAttempts > 0 { + return mw.RunRetryMaxAttempts + } + return defaultEinoRunRetryMaxAttempts +} + +// TransientRetryBackoff 供 handler 在分段续跑前退避。 +func TransientRetryBackoff(attempt int, maxBackoffSec int) time.Duration { + max := defaultEinoRunRetryMaxBackoff + if maxBackoffSec > 0 { + max = time.Duration(maxBackoffSec) * time.Second + } + return einoTransientRetryBackoff(attempt, max) +} + +func einoRunRetryMaxBackoff(args *einoADKRunLoopArgs) time.Duration { + if args != nil && args.RunRetryMaxBackoffSec > 0 { + return time.Duration(args.RunRetryMaxBackoffSec) * time.Second + } + return defaultEinoRunRetryMaxBackoff +} + +// einoRunRestartContextSource 描述无 checkpoint Resume 时 Run 使用的消息来源(日志/SSE)。 +type einoRunRestartContextSource string + +const ( + einoRestartContextInitial einoRunRestartContextSource = "initial" + einoRestartContextAccumulated einoRunRestartContextSource = "accumulated" + einoRestartContextModelTrace einoRunRestartContextSource = "model_trace" +) + +// einoMessagesForRunRestart 在退避后重新 Run 时选用最完整的上下文: +// 1) ModelFacingTrace(与模型实际入参一致) 2) 事件流累积的 runAccumulatedMsgs 3) 初始 msgs。 +func einoMessagesForRunRestart(args *einoADKRunLoopArgs, baseMsgs, accumulated []adk.Message, baseCount int) ([]adk.Message, einoRunRestartContextSource) { + if trace := persistTraceSource(args, nil); len(trace) > 0 { + return append([]adk.Message(nil), trace...), einoRestartContextModelTrace + } + if len(accumulated) > baseCount { + return append([]adk.Message(nil), accumulated...), einoRestartContextAccumulated + } + return append([]adk.Message(nil), baseMsgs...), einoRestartContextInitial +} + +// adkMessagesHasUserContent 从尾部向前查找,是否已有与 want 相同的 user 消息(避免重复 append)。 +func adkMessagesHasUserContent(msgs []adk.Message, want string) bool { + want = strings.TrimSpace(want) + if want == "" { + return true + } + for i := len(msgs) - 1; i >= 0; i-- { + m := msgs[i] + if m == nil { + continue + } + if m.Role == schema.User { + return strings.TrimSpace(m.Content) == want + } + if m.Role == schema.Assistant || m.Role == schema.Tool { + continue + } + break + } + return false +} + +// appendUserMessageIfNeeded 在 history 轨迹之后追加本轮 user 消息(仅当轨迹中尚未包含该句)。 +func appendUserMessageIfNeeded(msgs []adk.Message, userMessage string) []adk.Message { + if strings.TrimSpace(userMessage) == "" || adkMessagesHasUserContent(msgs, userMessage) { + return msgs + } + return append(msgs, schema.UserMessage(userMessage)) +} + +// einoTransientRetryBackoff 指数退避:2s, 4s, 8s… capped by maxBackoff。 +func einoTransientRetryBackoff(attempt int, maxBackoff time.Duration) time.Duration { + if attempt < 0 { + attempt = 0 + } + backoff := time.Duration(1< 0 && backoff > maxBackoff { + backoff = maxBackoff + } + return backoff +} diff --git a/internal/multiagent/eino_transient_retry_test.go b/internal/multiagent/eino_transient_retry_test.go new file mode 100644 index 00000000..80107501 --- /dev/null +++ b/internal/multiagent/eino_transient_retry_test.go @@ -0,0 +1,104 @@ +package multiagent + +import ( + "context" + "errors" + "testing" + "time" + + "github.com/cloudwego/eino/adk" + "github.com/cloudwego/eino/schema" +) + +func TestIsEinoTransientRunError(t *testing.T) { + t.Parallel() + cases := []struct { + name string + err error + want bool + }{ + {"nil", nil, false}, + {"429", errors.New("HTTP 429 Too Many Requests"), true}, + {"rate limit", errors.New(`{"error":"rate limit exceeded"}`), true}, + {"connection reset", errors.New("read tcp: connection reset by peer"), true}, + {"503", errors.New("upstream returned 503"), true}, + {"iteration limit", errors.New("max iteration reached"), false}, + {"canceled", context.Canceled, false}, + {"deadline", context.DeadlineExceeded, false}, + {"auth", errors.New("invalid api key"), false}, + } + for _, tc := range cases { + tc := tc + t.Run(tc.name, func(t *testing.T) { + t.Parallel() + if got := isEinoTransientRunError(tc.err); got != tc.want { + t.Fatalf("isEinoTransientRunError(%v) = %v, want %v", tc.err, got, tc.want) + } + }) + } +} + +func TestEinoTransientRetryBackoff(t *testing.T) { + t.Parallel() + max := 30 * time.Second + if got := einoTransientRetryBackoff(0, max); got != 2*time.Second { + t.Fatalf("attempt 0: got %v", got) + } + if got := einoTransientRetryBackoff(4, max); got != 30*time.Second { + t.Fatalf("attempt 4 capped: got %v", got) + } +} + +func TestEinoMessagesForRunRestart(t *testing.T) { + t.Parallel() + base := []adk.Message{schema.UserMessage("hi")} + acc := append([]adk.Message(nil), base...) + acc = append(acc, schema.AssistantMessage("step1", nil)) + + got, src := einoMessagesForRunRestart(nil, base, acc, len(base)) + if src != einoRestartContextAccumulated || len(got) != 2 { + t.Fatalf("accumulated: src=%s len=%d", src, len(got)) + } + + holder := newModelFacingTraceHolder() + holder.storeFromState(&adk.ChatModelAgentState{ + Messages: []adk.Message{schema.UserMessage("u"), schema.AssistantMessage("model-view", nil)}, + }) + got2, src2 := einoMessagesForRunRestart(&einoADKRunLoopArgs{ModelFacingTrace: holder}, base, acc, len(base)) + if src2 != einoRestartContextModelTrace || len(got2) != 2 { + t.Fatalf("model trace: src=%s len=%d", src2, len(got2)) + } +} + +func TestEinoRunRetryMaxAttemptsFromArgs(t *testing.T) { + t.Parallel() + if einoRunRetryMaxAttempts(nil) != defaultEinoRunRetryMaxAttempts { + t.Fatal("nil args should use default") + } + if einoRunRetryMaxAttempts(&einoADKRunLoopArgs{RunRetryMaxAttempts: 3}) != 3 { + t.Fatal("custom max attempts") + } + if RunRetryMaxAttemptsFromConfig(nil) != defaultEinoRunRetryMaxAttempts { + t.Fatal("config nil should use default") + } +} + +func TestAppendUserMessageIfNeeded(t *testing.T) { + t.Parallel() + msgs := []adk.Message{schema.UserMessage("old task")} + out := appendUserMessageIfNeeded(msgs, "你好,你是谁") + if len(out) != 2 || out[1].Content != "你好,你是谁" { + t.Fatalf("should append user: len=%d", len(out)) + } + dup := appendUserMessageIfNeeded(out, "你好,你是谁") + if len(dup) != 2 { + t.Fatalf("should not duplicate user message: len=%d", len(dup)) + } +} + +func TestErrTransientRetryContinue(t *testing.T) { + t.Parallel() + if !errors.Is(ErrTransientRetryContinue, ErrTransientRetryContinue) { + t.Fatal("sentinel should match") + } +} diff --git a/internal/multiagent/interrupt.go b/internal/multiagent/interrupt.go index 500e300f..e58b6ea9 100644 --- a/internal/multiagent/interrupt.go +++ b/internal/multiagent/interrupt.go @@ -5,3 +5,7 @@ import "errors" // ErrInterruptContinue 作为 context.CancelCause 使用:用户选择「中断并继续」且当前无进行中的 MCP 工具时, // 取消当前推理/流式输出,并在同一会话任务内携带用户补充说明自动续跑下一轮(类似 Hermes 式人机回合)。 var ErrInterruptContinue = errors.New("agent interrupt: continue with user-supplied context") + +// ErrTransientRetryContinue 表示 Run 因 429/网络等临时错误结束,应由 handler 落库轨迹后 +// loadHistoryFromAgentTrace 再开下一轮 Run(与 ErrInterruptContinue 同级的「分段续跑」语义)。 +var ErrTransientRetryContinue = errors.New("agent transient: retry after persisting trace") diff --git a/internal/multiagent/runner.go b/internal/multiagent/runner.go index 50ede619..2f5cc10a 100644 --- a/internal/multiagent/runner.go +++ b/internal/multiagent/runner.go @@ -538,7 +538,7 @@ func RunDeepAgent( } baseMsgs := historyToMessages(history, appCfg, &ma.EinoMiddleware) - baseMsgs = append(baseMsgs, schema.UserMessage(userMessage)) + baseMsgs = appendUserMessageIfNeeded(baseMsgs, userMessage) streamsMainAssistant := func(agent string) bool { if orchMode == "plan_execute" { @@ -566,6 +566,8 @@ func RunDeepAgent( StreamsMainAssistant: streamsMainAssistant, EinoRoleTag: einoRoleTag, CheckpointDir: ma.EinoMiddleware.CheckpointDir, + RunRetryMaxAttempts: ma.EinoMiddleware.RunRetryMaxAttempts, + RunRetryMaxBackoffSec: ma.EinoMiddleware.RunRetryMaxBackoffSec, McpIDsMu: &mcpIDsMu, McpIDs: &mcpIDs, FilesystemMonitorAgent: ag,