Add files via upload

This commit is contained in:
公明
2026-04-24 20:00:50 +08:00
committed by GitHub
parent 3dfb3b4e82
commit 5edf3a70f9
+140 -65
View File
@@ -18,6 +18,21 @@ import (
"go.uber.org/zap"
)
func isEinoIterationLimitError(err error) bool {
if err == nil {
return false
}
msg := strings.ToLower(strings.TrimSpace(err.Error()))
if msg == "" {
return false
}
return strings.Contains(msg, "max iteration") ||
strings.Contains(msg, "maximum iteration") ||
strings.Contains(msg, "maximum iterations") ||
strings.Contains(msg, "iteration limit") ||
strings.Contains(msg, "达到最大迭代")
}
// einoADKRunLoopArgs 将 Eino adk.Runner 事件循环从 RunDeepAgent / RunEinoSingleChatModelAgent 中抽出复用。
type einoADKRunLoopArgs struct {
OrchMode string
@@ -205,6 +220,98 @@ attemptLoop:
}
runner := adk.NewRunner(ctx, runnerCfg)
iter := runner.Run(ctx, msgs)
handleRunErr := func(runErr error, attempt int, reasonOverride string) (retry bool, retErr error) {
if runErr == nil {
return false, nil
}
if errors.Is(runErr, context.DeadlineExceeded) {
flushAllPendingAsFailed(runErr)
if progress != nil {
progress("error", runErr.Error(), map[string]interface{}{
"conversationId": conversationID,
"source": "eino",
"errorKind": "timeout",
})
}
return false, runErr
}
// context.Canceled 是唯一应当直接终止编排的错误(用户关闭页面、主动停止等)。
if errors.Is(runErr, context.Canceled) {
flushAllPendingAsFailed(runErr)
if progress != nil {
progress("error", runErr.Error(), map[string]interface{}{
"conversationId": conversationID,
"source": "eino",
})
}
return false, runErr
}
if isEinoIterationLimitError(runErr) {
flushAllPendingAsFailed(runErr)
if progress != nil {
progress("iteration_limit_reached", runErr.Error(), map[string]interface{}{
"conversationId": conversationID,
"source": "eino",
"orchestration": orchMode,
})
progress("error", runErr.Error(), map[string]interface{}{
"conversationId": conversationID,
"source": "eino",
"errorKind": "iteration_limit",
})
}
return false, runErr
}
canRetry := attempt+1 < maxToolCallRecoveryAttempts
if !canRetry {
// 重试次数已耗尽,终止。
flushAllPendingAsFailed(runErr)
if progress != nil {
progress("error", runErr.Error(), map[string]interface{}{
"conversationId": conversationID,
"source": "eino",
})
}
return false, runErr
}
// 区分错误类型以选择最合适的纠错提示,但无论哪种都执行重试(default-soft)。
var hint *schema.Message
var reason, timelineMsg string
switch {
case strings.TrimSpace(reasonOverride) != "":
hint = toolExecutionRetryHint()
reason = strings.TrimSpace(reasonOverride)
timelineMsg = toolExecutionRecoveryTimelineMessage(attempt)
case isRecoverableToolCallArgumentsJSONError(runErr):
hint = toolCallArgumentsJSONRetryHint()
reason = "invalid_tool_arguments_json"
timelineMsg = toolCallArgumentsJSONRecoveryTimelineMessage(attempt)
default:
hint = toolExecutionRetryHint()
reason = "tool_execution_error"
timelineMsg = toolExecutionRecoveryTimelineMessage(attempt)
}
if logger != nil {
logger.Warn("eino: recoverable error, will retry with corrective hint",
zap.Error(runErr), zap.Int("attempt", attempt), zap.String("reason", reason))
}
flushAllPendingAsFailed(runErr)
retryHints = append(retryHints, hint)
if progress != nil {
progress("eino_recovery", timelineMsg, map[string]interface{}{
"conversationId": conversationID,
"source": "eino",
"einoRetry": attempt,
"runIndex": attempt + 1,
"maxRuns": maxToolCallRecoveryAttempts,
"reason": reason,
})
}
return true, nil
}
for {
// 检测 context 取消(用户关闭浏览器、请求超时等),flush pending 工具状态避免 UI 卡在 "执行中"。
@@ -223,6 +330,18 @@ attemptLoop:
ev, ok := iter.Next()
if !ok {
if len(pendingByID) > 0 {
orphanCount := len(pendingByID)
flushAllPendingAsFailed(errors.New("pending tool call missing result before run completion"))
if progress != nil {
progress("eino_pending_orphaned", "pending tool calls were force-closed at run end", map[string]interface{}{
"conversationId": conversationID,
"source": "eino",
"orchestration": orchMode,
"pendingCount": orphanCount,
})
}
}
lastRunMsgs = msgs
break attemptLoop
}
@@ -230,72 +349,11 @@ attemptLoop:
continue
}
if ev.Err != nil {
if errors.Is(ev.Err, context.DeadlineExceeded) {
flushAllPendingAsFailed(ev.Err)
if progress != nil {
progress("error", ev.Err.Error(), map[string]interface{}{
"conversationId": conversationID,
"source": "eino",
"errorKind": "timeout",
})
}
return nil, ev.Err
if retry, retErr := handleRunErr(ev.Err, attempt, ""); retErr != nil {
return nil, retErr
} else if retry {
continue attemptLoop
}
// context.Canceled 是唯一应当直接终止编排的错误(用户关闭页面、主动停止等)。
if errors.Is(ev.Err, context.Canceled) {
flushAllPendingAsFailed(ev.Err)
if progress != nil {
progress("error", ev.Err.Error(), map[string]interface{}{
"conversationId": conversationID,
"source": "eino",
})
}
return nil, ev.Err
}
canRetry := attempt+1 < maxToolCallRecoveryAttempts
if !canRetry {
// 重试次数已耗尽,终止。
flushAllPendingAsFailed(ev.Err)
if progress != nil {
progress("error", ev.Err.Error(), map[string]interface{}{
"conversationId": conversationID,
"source": "eino",
})
}
return nil, ev.Err
}
// 区分错误类型以选择最合适的纠错提示,但无论哪种都执行重试(default-soft)。
var hint *schema.Message
var reason, timelineMsg string
if isRecoverableToolCallArgumentsJSONError(ev.Err) {
hint = toolCallArgumentsJSONRetryHint()
reason = "invalid_tool_arguments_json"
timelineMsg = toolCallArgumentsJSONRecoveryTimelineMessage(attempt)
} else {
hint = toolExecutionRetryHint()
reason = "tool_execution_error"
timelineMsg = toolExecutionRecoveryTimelineMessage(attempt)
}
if logger != nil {
logger.Warn("eino: recoverable error, will retry with corrective hint",
zap.Error(ev.Err), zap.Int("attempt", attempt), zap.String("reason", reason))
}
flushAllPendingAsFailed(ev.Err)
retryHints = append(retryHints, hint)
if progress != nil {
progress("eino_recovery", timelineMsg, map[string]interface{}{
"conversationId": conversationID,
"source": "eino",
"einoRetry": attempt,
"runIndex": attempt + 1,
"maxRuns": maxToolCallRecoveryAttempts,
"reason": reason,
})
}
continue attemptLoop
}
if ev.AgentName != "" && progress != nil {
iterEinoAgent := orchestratorName
@@ -349,6 +407,7 @@ attemptLoop:
var subAssistantBuf strings.Builder
var subReplyStreamID string
var mainAssistantBuf strings.Builder
var streamRecvErr error
for {
chunk, rerr := mv.MessageStream.Recv()
if rerr != nil {
@@ -361,6 +420,7 @@ attemptLoop:
zap.String("agent", ev.AgentName),
zap.Int("toolFragments", len(toolStreamFragments)))
}
streamRecvErr = rerr
break
}
if chunk == nil {
@@ -459,6 +519,21 @@ attemptLoop:
lastToolChunk = &schema.Message{ToolCalls: merged}
}
tryEmitToolCallsOnce(lastToolChunk, ev.AgentName, orchestratorName, conversationID, progress, toolEmitSeen, subAgentToolStep, markPending)
if streamRecvErr != nil {
if progress != nil {
progress("eino_stream_error", streamRecvErr.Error(), map[string]interface{}{
"conversationId": conversationID,
"source": "eino",
"einoAgent": ev.AgentName,
"einoRole": einoRoleTag(ev.AgentName),
})
}
if retry, retErr := handleRunErr(streamRecvErr, attempt, "stream_recv_error"); retErr != nil {
return nil, retErr
} else if retry {
continue attemptLoop
}
}
continue
}