Add files via upload

This commit is contained in:
公明
2026-05-22 17:11:34 +08:00
committed by GitHub
parent f826b91362
commit acba8e5a39
6 changed files with 314 additions and 5 deletions
+27 -2
View File
@@ -77,6 +77,9 @@ type einoADKRunLoopArgs struct {
StreamsMainAssistant func(agent string) bool
EinoRoleTag func(agent string) string
CheckpointDir string
// RunRetryMaxAttempts / RunRetryMaxBackoffSec429、5xx、网络抖动时的指数退避续跑(0=默认 10 次 / 30s 上限)。
RunRetryMaxAttempts int
RunRetryMaxBackoffSec int
McpIDsMu *sync.Mutex
McpIDs *[]string
@@ -437,6 +440,28 @@ func runEinoADKAgentLoop(ctx context.Context, args *einoADKRunLoopArgs, baseMsgs
return runErr
}
// maybeRetryTransientRun:不在此层 runner.Run/Resume;由 handler 落库 + loadHistoryFromAgentTrace 分段续跑(同中断并继续)。
maybeRetryTransientRun := func(runErr error) (retry bool, fatal error) {
if runErr == nil || !isEinoTransientRunError(runErr) {
return false, handleRunErr(runErr)
}
if logger != nil {
logger.Warn("eino transient error, ending run segment for handler resume",
zap.Error(runErr),
zap.String("orchestration", orchMode))
}
if progress != nil {
progress("eino_run_retry", "遇到临时错误(限流或网络波动),将保存上下文并重试…", map[string]interface{}{
"conversationId": conversationID,
"source": "eino",
"orchestration": orchMode,
"error": runErr.Error(),
"resumeKind": "trace_segment",
})
}
return false, ErrTransientRetryContinue
}
takePartial := func(runErr error) (*RunResult, error) {
if len(runAccumulatedMsgs) <= baseAccumulatedCount {
return nil, runErr
@@ -519,7 +544,7 @@ func runEinoADKAgentLoop(ctx context.Context, args *einoADKRunLoopArgs, baseMsgs
continue
}
if ev.Err != nil {
if retErr := handleRunErr(ev.Err); retErr != nil {
if _, retErr := maybeRetryTransientRun(ev.Err); retErr != nil {
return takePartial(retErr)
}
}
@@ -821,7 +846,7 @@ func runEinoADKAgentLoop(ctx context.Context, args *einoADKRunLoopArgs, baseMsgs
"einoRole": einoRoleTag(ev.AgentName),
})
}
if retErr := handleRunErr(streamRecvErr); retErr != nil {
if _, retErr := maybeRetryTransientRun(streamRecvErr); retErr != nil {
return takePartial(retErr)
}
}
+3 -2
View File
@@ -18,7 +18,6 @@ import (
einoopenai "github.com/cloudwego/eino-ext/components/model/openai"
"github.com/cloudwego/eino/adk"
"github.com/cloudwego/eino/compose"
"github.com/cloudwego/eino/schema"
"go.uber.org/zap"
)
@@ -213,7 +212,7 @@ func RunEinoSingleChatModelAgent(
}
baseMsgs := historyToMessages(history, appCfg, &ma.EinoMiddleware)
baseMsgs = append(baseMsgs, schema.UserMessage(userMessage))
baseMsgs = appendUserMessageIfNeeded(baseMsgs, userMessage)
streamsMainAssistant := func(agent string) bool {
return agent == "" || agent == einoSingleAgentName
@@ -233,6 +232,8 @@ func RunEinoSingleChatModelAgent(
StreamsMainAssistant: streamsMainAssistant,
EinoRoleTag: einoRoleTag,
CheckpointDir: ma.EinoMiddleware.CheckpointDir,
RunRetryMaxAttempts: ma.EinoMiddleware.RunRetryMaxAttempts,
RunRetryMaxBackoffSec: ma.EinoMiddleware.RunRetryMaxBackoffSec,
McpIDsMu: &mcpIDsMu,
McpIDs: &mcpIDs,
FilesystemMonitorAgent: ag,
+173
View File
@@ -0,0 +1,173 @@
package multiagent
import (
"context"
"errors"
"strings"
"time"
"cyberstrike-ai/internal/config"
"github.com/cloudwego/eino/adk"
"github.com/cloudwego/eino/schema"
)
const (
defaultEinoRunRetryMaxAttempts = 10
defaultEinoRunRetryMaxBackoff = 30 * time.Second
)
// isEinoTransientRunError 判断 ADK 运行期错误是否适合指数退避续跑(429、5xx、网络抖动等)。
// 用户取消、超时、迭代上限等由 run loop 单独处理,不在此列。
func isEinoTransientRunError(err error) bool {
if err == nil {
return false
}
if errors.Is(err, context.Canceled) || errors.Is(err, context.DeadlineExceeded) {
return false
}
if isEinoIterationLimitError(err) {
return false
}
msg := strings.ToLower(strings.TrimSpace(err.Error()))
if msg == "" {
return false
}
transientMarkers := []string{
"406",
"429",
"too many requests",
"rate limit",
"rate_limit",
"ratelimit",
"quota exceeded",
"overloaded",
"capacity",
"temporarily unavailable",
"service unavailable",
"bad gateway",
"gateway timeout",
"internal server error",
"connection reset",
"connection refused",
"connection closed",
"i/o timeout",
"no such host",
"network is unreachable",
"broken pipe",
"eof",
"read tcp",
"write tcp",
"dial tcp",
"tls handshake timeout",
"stream error",
"unexpected eof",
"unexpected end of json",
"status code: 406",
"status code: 502",
"502",
"503",
"504",
"500",
}
for _, m := range transientMarkers {
if strings.Contains(msg, m) {
return true
}
}
return false
}
func einoRunRetryMaxAttempts(args *einoADKRunLoopArgs) int {
if args != nil && args.RunRetryMaxAttempts > 0 {
return args.RunRetryMaxAttempts
}
return defaultEinoRunRetryMaxAttempts
}
// RunRetryMaxAttemptsFromConfig 供 handler 分段续跑计数(与 eino_middleware.run_retry_max_attempts 一致)。
func RunRetryMaxAttemptsFromConfig(mw *config.MultiAgentEinoMiddlewareConfig) int {
if mw != nil && mw.RunRetryMaxAttempts > 0 {
return mw.RunRetryMaxAttempts
}
return defaultEinoRunRetryMaxAttempts
}
// TransientRetryBackoff 供 handler 在分段续跑前退避。
func TransientRetryBackoff(attempt int, maxBackoffSec int) time.Duration {
max := defaultEinoRunRetryMaxBackoff
if maxBackoffSec > 0 {
max = time.Duration(maxBackoffSec) * time.Second
}
return einoTransientRetryBackoff(attempt, max)
}
func einoRunRetryMaxBackoff(args *einoADKRunLoopArgs) time.Duration {
if args != nil && args.RunRetryMaxBackoffSec > 0 {
return time.Duration(args.RunRetryMaxBackoffSec) * time.Second
}
return defaultEinoRunRetryMaxBackoff
}
// einoRunRestartContextSource 描述无 checkpoint Resume 时 Run 使用的消息来源(日志/SSE)。
type einoRunRestartContextSource string
const (
einoRestartContextInitial einoRunRestartContextSource = "initial"
einoRestartContextAccumulated einoRunRestartContextSource = "accumulated"
einoRestartContextModelTrace einoRunRestartContextSource = "model_trace"
)
// einoMessagesForRunRestart 在退避后重新 Run 时选用最完整的上下文:
// 1) ModelFacingTrace(与模型实际入参一致) 2) 事件流累积的 runAccumulatedMsgs 3) 初始 msgs。
func einoMessagesForRunRestart(args *einoADKRunLoopArgs, baseMsgs, accumulated []adk.Message, baseCount int) ([]adk.Message, einoRunRestartContextSource) {
if trace := persistTraceSource(args, nil); len(trace) > 0 {
return append([]adk.Message(nil), trace...), einoRestartContextModelTrace
}
if len(accumulated) > baseCount {
return append([]adk.Message(nil), accumulated...), einoRestartContextAccumulated
}
return append([]adk.Message(nil), baseMsgs...), einoRestartContextInitial
}
// adkMessagesHasUserContent 从尾部向前查找,是否已有与 want 相同的 user 消息(避免重复 append)。
func adkMessagesHasUserContent(msgs []adk.Message, want string) bool {
want = strings.TrimSpace(want)
if want == "" {
return true
}
for i := len(msgs) - 1; i >= 0; i-- {
m := msgs[i]
if m == nil {
continue
}
if m.Role == schema.User {
return strings.TrimSpace(m.Content) == want
}
if m.Role == schema.Assistant || m.Role == schema.Tool {
continue
}
break
}
return false
}
// appendUserMessageIfNeeded 在 history 轨迹之后追加本轮 user 消息(仅当轨迹中尚未包含该句)。
func appendUserMessageIfNeeded(msgs []adk.Message, userMessage string) []adk.Message {
if strings.TrimSpace(userMessage) == "" || adkMessagesHasUserContent(msgs, userMessage) {
return msgs
}
return append(msgs, schema.UserMessage(userMessage))
}
// einoTransientRetryBackoff 指数退避:2s, 4s, 8s… capped by maxBackoff。
func einoTransientRetryBackoff(attempt int, maxBackoff time.Duration) time.Duration {
if attempt < 0 {
attempt = 0
}
backoff := time.Duration(1<<uint(attempt+1)) * time.Second
if maxBackoff > 0 && backoff > maxBackoff {
backoff = maxBackoff
}
return backoff
}
@@ -0,0 +1,104 @@
package multiagent
import (
"context"
"errors"
"testing"
"time"
"github.com/cloudwego/eino/adk"
"github.com/cloudwego/eino/schema"
)
func TestIsEinoTransientRunError(t *testing.T) {
t.Parallel()
cases := []struct {
name string
err error
want bool
}{
{"nil", nil, false},
{"429", errors.New("HTTP 429 Too Many Requests"), true},
{"rate limit", errors.New(`{"error":"rate limit exceeded"}`), true},
{"connection reset", errors.New("read tcp: connection reset by peer"), true},
{"503", errors.New("upstream returned 503"), true},
{"iteration limit", errors.New("max iteration reached"), false},
{"canceled", context.Canceled, false},
{"deadline", context.DeadlineExceeded, false},
{"auth", errors.New("invalid api key"), false},
}
for _, tc := range cases {
tc := tc
t.Run(tc.name, func(t *testing.T) {
t.Parallel()
if got := isEinoTransientRunError(tc.err); got != tc.want {
t.Fatalf("isEinoTransientRunError(%v) = %v, want %v", tc.err, got, tc.want)
}
})
}
}
func TestEinoTransientRetryBackoff(t *testing.T) {
t.Parallel()
max := 30 * time.Second
if got := einoTransientRetryBackoff(0, max); got != 2*time.Second {
t.Fatalf("attempt 0: got %v", got)
}
if got := einoTransientRetryBackoff(4, max); got != 30*time.Second {
t.Fatalf("attempt 4 capped: got %v", got)
}
}
func TestEinoMessagesForRunRestart(t *testing.T) {
t.Parallel()
base := []adk.Message{schema.UserMessage("hi")}
acc := append([]adk.Message(nil), base...)
acc = append(acc, schema.AssistantMessage("step1", nil))
got, src := einoMessagesForRunRestart(nil, base, acc, len(base))
if src != einoRestartContextAccumulated || len(got) != 2 {
t.Fatalf("accumulated: src=%s len=%d", src, len(got))
}
holder := newModelFacingTraceHolder()
holder.storeFromState(&adk.ChatModelAgentState{
Messages: []adk.Message{schema.UserMessage("u"), schema.AssistantMessage("model-view", nil)},
})
got2, src2 := einoMessagesForRunRestart(&einoADKRunLoopArgs{ModelFacingTrace: holder}, base, acc, len(base))
if src2 != einoRestartContextModelTrace || len(got2) != 2 {
t.Fatalf("model trace: src=%s len=%d", src2, len(got2))
}
}
func TestEinoRunRetryMaxAttemptsFromArgs(t *testing.T) {
t.Parallel()
if einoRunRetryMaxAttempts(nil) != defaultEinoRunRetryMaxAttempts {
t.Fatal("nil args should use default")
}
if einoRunRetryMaxAttempts(&einoADKRunLoopArgs{RunRetryMaxAttempts: 3}) != 3 {
t.Fatal("custom max attempts")
}
if RunRetryMaxAttemptsFromConfig(nil) != defaultEinoRunRetryMaxAttempts {
t.Fatal("config nil should use default")
}
}
func TestAppendUserMessageIfNeeded(t *testing.T) {
t.Parallel()
msgs := []adk.Message{schema.UserMessage("old task")}
out := appendUserMessageIfNeeded(msgs, "你好,你是谁")
if len(out) != 2 || out[1].Content != "你好,你是谁" {
t.Fatalf("should append user: len=%d", len(out))
}
dup := appendUserMessageIfNeeded(out, "你好,你是谁")
if len(dup) != 2 {
t.Fatalf("should not duplicate user message: len=%d", len(dup))
}
}
func TestErrTransientRetryContinue(t *testing.T) {
t.Parallel()
if !errors.Is(ErrTransientRetryContinue, ErrTransientRetryContinue) {
t.Fatal("sentinel should match")
}
}
+4
View File
@@ -5,3 +5,7 @@ import "errors"
// ErrInterruptContinue 作为 context.CancelCause 使用:用户选择「中断并继续」且当前无进行中的 MCP 工具时,
// 取消当前推理/流式输出,并在同一会话任务内携带用户补充说明自动续跑下一轮(类似 Hermes 式人机回合)。
var ErrInterruptContinue = errors.New("agent interrupt: continue with user-supplied context")
// ErrTransientRetryContinue 表示 Run 因 429/网络等临时错误结束,应由 handler 落库轨迹后
// loadHistoryFromAgentTrace 再开下一轮 Run(与 ErrInterruptContinue 同级的「分段续跑」语义)。
var ErrTransientRetryContinue = errors.New("agent transient: retry after persisting trace")
+3 -1
View File
@@ -538,7 +538,7 @@ func RunDeepAgent(
}
baseMsgs := historyToMessages(history, appCfg, &ma.EinoMiddleware)
baseMsgs = append(baseMsgs, schema.UserMessage(userMessage))
baseMsgs = appendUserMessageIfNeeded(baseMsgs, userMessage)
streamsMainAssistant := func(agent string) bool {
if orchMode == "plan_execute" {
@@ -566,6 +566,8 @@ func RunDeepAgent(
StreamsMainAssistant: streamsMainAssistant,
EinoRoleTag: einoRoleTag,
CheckpointDir: ma.EinoMiddleware.CheckpointDir,
RunRetryMaxAttempts: ma.EinoMiddleware.RunRetryMaxAttempts,
RunRetryMaxBackoffSec: ma.EinoMiddleware.RunRetryMaxBackoffSec,
McpIDsMu: &mcpIDsMu,
McpIDs: &mcpIDs,
FilesystemMonitorAgent: ag,