diff --git a/internal/multiagent/eino_adk_run_loop.go b/internal/multiagent/eino_adk_run_loop.go index ff0e901a..897a4488 100644 --- a/internal/multiagent/eino_adk_run_loop.go +++ b/internal/multiagent/eino_adk_run_loop.go @@ -18,6 +18,21 @@ import ( "go.uber.org/zap" ) +func isEinoIterationLimitError(err error) bool { + if err == nil { + return false + } + msg := strings.ToLower(strings.TrimSpace(err.Error())) + if msg == "" { + return false + } + return strings.Contains(msg, "max iteration") || + strings.Contains(msg, "maximum iteration") || + strings.Contains(msg, "maximum iterations") || + strings.Contains(msg, "iteration limit") || + strings.Contains(msg, "达到最大迭代") +} + // einoADKRunLoopArgs 将 Eino adk.Runner 事件循环从 RunDeepAgent / RunEinoSingleChatModelAgent 中抽出复用。 type einoADKRunLoopArgs struct { OrchMode string @@ -205,6 +220,98 @@ attemptLoop: } runner := adk.NewRunner(ctx, runnerCfg) iter := runner.Run(ctx, msgs) + handleRunErr := func(runErr error, attempt int, reasonOverride string) (retry bool, retErr error) { + if runErr == nil { + return false, nil + } + if errors.Is(runErr, context.DeadlineExceeded) { + flushAllPendingAsFailed(runErr) + if progress != nil { + progress("error", runErr.Error(), map[string]interface{}{ + "conversationId": conversationID, + "source": "eino", + "errorKind": "timeout", + }) + } + return false, runErr + } + // context.Canceled 是唯一应当直接终止编排的错误(用户关闭页面、主动停止等)。 + if errors.Is(runErr, context.Canceled) { + flushAllPendingAsFailed(runErr) + if progress != nil { + progress("error", runErr.Error(), map[string]interface{}{ + "conversationId": conversationID, + "source": "eino", + }) + } + return false, runErr + } + if isEinoIterationLimitError(runErr) { + flushAllPendingAsFailed(runErr) + if progress != nil { + progress("iteration_limit_reached", runErr.Error(), map[string]interface{}{ + "conversationId": conversationID, + "source": "eino", + "orchestration": orchMode, + }) + progress("error", runErr.Error(), map[string]interface{}{ + "conversationId": conversationID, + "source": "eino", + "errorKind": "iteration_limit", + }) + } + return false, runErr + } + + canRetry := attempt+1 < maxToolCallRecoveryAttempts + if !canRetry { + // 重试次数已耗尽,终止。 + flushAllPendingAsFailed(runErr) + if progress != nil { + progress("error", runErr.Error(), map[string]interface{}{ + "conversationId": conversationID, + "source": "eino", + }) + } + return false, runErr + } + + // 区分错误类型以选择最合适的纠错提示,但无论哪种都执行重试(default-soft)。 + var hint *schema.Message + var reason, timelineMsg string + switch { + case strings.TrimSpace(reasonOverride) != "": + hint = toolExecutionRetryHint() + reason = strings.TrimSpace(reasonOverride) + timelineMsg = toolExecutionRecoveryTimelineMessage(attempt) + case isRecoverableToolCallArgumentsJSONError(runErr): + hint = toolCallArgumentsJSONRetryHint() + reason = "invalid_tool_arguments_json" + timelineMsg = toolCallArgumentsJSONRecoveryTimelineMessage(attempt) + default: + hint = toolExecutionRetryHint() + reason = "tool_execution_error" + timelineMsg = toolExecutionRecoveryTimelineMessage(attempt) + } + + if logger != nil { + logger.Warn("eino: recoverable error, will retry with corrective hint", + zap.Error(runErr), zap.Int("attempt", attempt), zap.String("reason", reason)) + } + flushAllPendingAsFailed(runErr) + retryHints = append(retryHints, hint) + if progress != nil { + progress("eino_recovery", timelineMsg, map[string]interface{}{ + "conversationId": conversationID, + "source": "eino", + "einoRetry": attempt, + "runIndex": attempt + 1, + "maxRuns": maxToolCallRecoveryAttempts, + "reason": reason, + }) + } + return true, nil + } for { // 检测 context 取消(用户关闭浏览器、请求超时等),flush pending 工具状态避免 UI 卡在 "执行中"。 @@ -223,6 +330,18 @@ attemptLoop: ev, ok := iter.Next() if !ok { + if len(pendingByID) > 0 { + orphanCount := len(pendingByID) + flushAllPendingAsFailed(errors.New("pending tool call missing result before run completion")) + if progress != nil { + progress("eino_pending_orphaned", "pending tool calls were force-closed at run end", map[string]interface{}{ + "conversationId": conversationID, + "source": "eino", + "orchestration": orchMode, + "pendingCount": orphanCount, + }) + } + } lastRunMsgs = msgs break attemptLoop } @@ -230,72 +349,11 @@ attemptLoop: continue } if ev.Err != nil { - if errors.Is(ev.Err, context.DeadlineExceeded) { - flushAllPendingAsFailed(ev.Err) - if progress != nil { - progress("error", ev.Err.Error(), map[string]interface{}{ - "conversationId": conversationID, - "source": "eino", - "errorKind": "timeout", - }) - } - return nil, ev.Err + if retry, retErr := handleRunErr(ev.Err, attempt, ""); retErr != nil { + return nil, retErr + } else if retry { + continue attemptLoop } - // context.Canceled 是唯一应当直接终止编排的错误(用户关闭页面、主动停止等)。 - if errors.Is(ev.Err, context.Canceled) { - flushAllPendingAsFailed(ev.Err) - if progress != nil { - progress("error", ev.Err.Error(), map[string]interface{}{ - "conversationId": conversationID, - "source": "eino", - }) - } - return nil, ev.Err - } - - canRetry := attempt+1 < maxToolCallRecoveryAttempts - if !canRetry { - // 重试次数已耗尽,终止。 - flushAllPendingAsFailed(ev.Err) - if progress != nil { - progress("error", ev.Err.Error(), map[string]interface{}{ - "conversationId": conversationID, - "source": "eino", - }) - } - return nil, ev.Err - } - - // 区分错误类型以选择最合适的纠错提示,但无论哪种都执行重试(default-soft)。 - var hint *schema.Message - var reason, timelineMsg string - if isRecoverableToolCallArgumentsJSONError(ev.Err) { - hint = toolCallArgumentsJSONRetryHint() - reason = "invalid_tool_arguments_json" - timelineMsg = toolCallArgumentsJSONRecoveryTimelineMessage(attempt) - } else { - hint = toolExecutionRetryHint() - reason = "tool_execution_error" - timelineMsg = toolExecutionRecoveryTimelineMessage(attempt) - } - - if logger != nil { - logger.Warn("eino: recoverable error, will retry with corrective hint", - zap.Error(ev.Err), zap.Int("attempt", attempt), zap.String("reason", reason)) - } - flushAllPendingAsFailed(ev.Err) - retryHints = append(retryHints, hint) - if progress != nil { - progress("eino_recovery", timelineMsg, map[string]interface{}{ - "conversationId": conversationID, - "source": "eino", - "einoRetry": attempt, - "runIndex": attempt + 1, - "maxRuns": maxToolCallRecoveryAttempts, - "reason": reason, - }) - } - continue attemptLoop } if ev.AgentName != "" && progress != nil { iterEinoAgent := orchestratorName @@ -349,6 +407,7 @@ attemptLoop: var subAssistantBuf strings.Builder var subReplyStreamID string var mainAssistantBuf strings.Builder + var streamRecvErr error for { chunk, rerr := mv.MessageStream.Recv() if rerr != nil { @@ -361,6 +420,7 @@ attemptLoop: zap.String("agent", ev.AgentName), zap.Int("toolFragments", len(toolStreamFragments))) } + streamRecvErr = rerr break } if chunk == nil { @@ -459,6 +519,21 @@ attemptLoop: lastToolChunk = &schema.Message{ToolCalls: merged} } tryEmitToolCallsOnce(lastToolChunk, ev.AgentName, orchestratorName, conversationID, progress, toolEmitSeen, subAgentToolStep, markPending) + if streamRecvErr != nil { + if progress != nil { + progress("eino_stream_error", streamRecvErr.Error(), map[string]interface{}{ + "conversationId": conversationID, + "source": "eino", + "einoAgent": ev.AgentName, + "einoRole": einoRoleTag(ev.AgentName), + }) + } + if retry, retErr := handleRunErr(streamRecvErr, attempt, "stream_recv_error"); retErr != nil { + return nil, retErr + } else if retry { + continue attemptLoop + } + } continue }