From 7bf069752694af74dfdaac99265f2355a8fd8333 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E5=85=AC=E6=98=8E?= <83812544+Ed1s0nZ@users.noreply.github.com> Date: Wed, 8 Apr 2026 22:15:25 +0800 Subject: [PATCH] Add files via upload --- internal/einomcp/mcp_tools.go | 8 +- internal/multiagent/runner.go | 137 ++++++++++++++++++++++++++++++++-- 2 files changed, 136 insertions(+), 9 deletions(-) diff --git a/internal/einomcp/mcp_tools.go b/internal/einomcp/mcp_tools.go index 2718f18d..72228a34 100644 --- a/internal/einomcp/mcp_tools.go +++ b/internal/einomcp/mcp_tools.go @@ -160,13 +160,17 @@ func runMCPToolInvocation( } // UnknownToolReminderHandler 供 compose.ToolsNodeConfig.UnknownToolsHandler 使用: -// 模型请求了未注册的工具名时,仅返回说明性文本,error 恒为 nil,以便 ReAct 继续迭代而不中断图执行。 +// 模型请求了未注册的工具名时,返回一个「可恢复」的错误,让上层 runner 触发重试与纠错提示, +// 同时避免 UI 永远停留在“执行中”(runner 会在 recoverable 分支 flush 掉 pending 的 tool_call)。 // 不进行名称猜测或映射,避免误执行。 func UnknownToolReminderHandler() func(ctx context.Context, name, input string) (string, error) { return func(ctx context.Context, name, input string) (string, error) { _ = ctx _ = input - return unknownToolReminderText(strings.TrimSpace(name)), nil + requested := strings.TrimSpace(name) + // Return a recoverable error that still carries a friendly, bilingual hint. + // This will be caught by multiagent runner as "tool not found" and trigger a retry. + return "", fmt.Errorf("tool %q not found: %s", requested, unknownToolReminderText(requested)) } } diff --git a/internal/multiagent/runner.go b/internal/multiagent/runner.go index 3967e3d3..a04590c5 100644 --- a/internal/multiagent/runner.go +++ b/internal/multiagent/runner.go @@ -36,6 +36,16 @@ type RunResult struct { LastReActOutput string } +// toolCallPendingInfo tracks a tool_call emitted to the UI so we can later +// correlate tool_result events (even when the framework omits ToolCallID) and +// avoid leaving the UI stuck in "running" state on recoverable errors. +type toolCallPendingInfo struct { + ToolCallID string + ToolName string + EinoAgent string + EinoRole string +} + // RunDeepAgent 使用 Eino DeepAgent 执行一轮对话(流式事件通过 progress 回调输出)。 func RunDeepAgent( ctx context.Context, @@ -326,6 +336,69 @@ attemptLoop: var einoMainRound int var einoLastAgent string subAgentToolStep := make(map[string]int) + // Track tool calls emitted in this attempt so we can: + // - attach toolCallId to tool_result when framework omits it + // - flush running tool calls as failed when a recoverable tool execution error happens + pendingByID := make(map[string]toolCallPendingInfo) + pendingQueueByAgent := make(map[string][]string) + markPending := func(tc toolCallPendingInfo) { + if tc.ToolCallID == "" { + return + } + pendingByID[tc.ToolCallID] = tc + pendingQueueByAgent[tc.EinoAgent] = append(pendingQueueByAgent[tc.EinoAgent], tc.ToolCallID) + } + popNextPendingForAgent := func(agentName string) (toolCallPendingInfo, bool) { + q := pendingQueueByAgent[agentName] + for len(q) > 0 { + id := q[0] + q = q[1:] + pendingQueueByAgent[agentName] = q + if tc, ok := pendingByID[id]; ok { + delete(pendingByID, id) + return tc, true + } + } + return toolCallPendingInfo{}, false + } + removePendingByID := func(toolCallID string) { + if toolCallID == "" { + return + } + delete(pendingByID, toolCallID) + // queue cleanup is lazy in popNextPendingForAgent + } + flushAllPendingAsFailed := func(err error) { + if progress == nil { + pendingByID = make(map[string]toolCallPendingInfo) + pendingQueueByAgent = make(map[string][]string) + return + } + msg := "" + if err != nil { + msg = err.Error() + } + for _, tc := range pendingByID { + toolName := tc.ToolName + if strings.TrimSpace(toolName) == "" { + toolName = "unknown" + } + progress("tool_result", fmt.Sprintf("工具结果 (%s)", toolName), map[string]interface{}{ + "toolName": toolName, + "success": false, + "isError": true, + "result": msg, + "resultPreview": msg, + "toolCallId": tc.ToolCallID, + "conversationId": conversationID, + "einoAgent": tc.EinoAgent, + "einoRole": tc.EinoRole, + "source": "eino", + }) + } + pendingByID = make(map[string]toolCallPendingInfo) + pendingQueueByAgent = make(map[string][]string) + } runner := adk.NewRunner(ctx, adk.RunnerConfig{ Agent: da, @@ -370,6 +443,9 @@ attemptLoop: logger.Warn("eino: recoverable tool execution error, will retry with corrective hint", zap.Error(ev.Err), zap.Int("attempt", attempt)) } + // Ensure UI/tool timeline doesn't get stuck at "running" for tool calls that + // will never receive a proper tool_result due to the recoverable error. + flushAllPendingAsFailed(ev.Err) retryHints = append(retryHints, toolExecutionRetryHint()) if progress != nil { progress("eino_recovery", toolExecutionRecoveryTimelineMessage(attempt), map[string]interface{}{ @@ -385,6 +461,7 @@ attemptLoop: } // Non-recoverable error. + flushAllPendingAsFailed(ev.Err) if progress != nil { progress("error", ev.Err.Error(), map[string]interface{}{ "conversationId": conversationID, @@ -535,7 +612,7 @@ attemptLoop: if merged := mergeStreamingToolCallFragments(toolStreamFragments); len(merged) > 0 { lastToolChunk = &schema.Message{ToolCalls: merged} } - tryEmitToolCallsOnce(lastToolChunk, ev.AgentName, orchestratorName, conversationID, progress, toolEmitSeen, subAgentToolStep) + tryEmitToolCallsOnce(lastToolChunk, ev.AgentName, orchestratorName, conversationID, progress, toolEmitSeen, subAgentToolStep, markPending) continue } @@ -543,7 +620,7 @@ attemptLoop: if gerr != nil || msg == nil { continue } - tryEmitToolCallsOnce(mergeMessageToolCalls(msg), ev.AgentName, orchestratorName, conversationID, progress, toolEmitSeen, subAgentToolStep) + tryEmitToolCallsOnce(mergeMessageToolCalls(msg), ev.AgentName, orchestratorName, conversationID, progress, toolEmitSeen, subAgentToolStep, markPending) if mv.Role == schema.Assistant { if progress != nil && strings.TrimSpace(msg.ReasoningContent) != "" { @@ -611,8 +688,31 @@ attemptLoop: "einoRole": einoRoleTag(ev.AgentName), "source": "eino", } - if msg.ToolCallID != "" { - data["toolCallId"] = msg.ToolCallID + toolCallID := strings.TrimSpace(msg.ToolCallID) + // Some framework paths (e.g. UnknownToolsHandler) may omit ToolCallID on tool messages. + // Infer from the tool_call emission order for this agent to keep UI state consistent. + if toolCallID == "" { + // In some internal tool execution paths, ev.AgentName may be empty for tool-role + // messages. Try several fallbacks to avoid leaving UI tool_call status stuck. + if inferred, ok := popNextPendingForAgent(ev.AgentName); ok { + toolCallID = inferred.ToolCallID + } else if inferred, ok := popNextPendingForAgent(orchestratorName); ok { + toolCallID = inferred.ToolCallID + } else if inferred, ok := popNextPendingForAgent(""); ok { + toolCallID = inferred.ToolCallID + } else { + // last resort: pick any pending toolCallID + for id := range pendingByID { + toolCallID = id + delete(pendingByID, id) + break + } + } + } else { + removePendingByID(toolCallID) + } + if toolCallID != "" { + data["toolCallId"] = toolCallID } progress("tool_result", fmt.Sprintf("工具结果 (%s)", toolName), data) } @@ -755,7 +855,14 @@ func toolCallsRichSignature(msg *schema.Message) string { return base + "|" + strings.Join(parts, ";") } -func tryEmitToolCallsOnce(msg *schema.Message, agentName, orchestratorName, conversationID string, progress func(string, string, interface{}), seen map[string]struct{}, subAgentToolStep map[string]int) { +func tryEmitToolCallsOnce( + msg *schema.Message, + agentName, orchestratorName, conversationID string, + progress func(string, string, interface{}), + seen map[string]struct{}, + subAgentToolStep map[string]int, + markPending func(toolCallPendingInfo), +) { if msg == nil || len(msg.ToolCalls) == 0 || progress == nil || seen == nil { return } @@ -767,10 +874,16 @@ func tryEmitToolCallsOnce(msg *schema.Message, agentName, orchestratorName, conv return } seen[sig] = struct{}{} - emitToolCallsFromMessage(msg, agentName, orchestratorName, conversationID, progress, subAgentToolStep) + emitToolCallsFromMessage(msg, agentName, orchestratorName, conversationID, progress, subAgentToolStep, markPending) } -func emitToolCallsFromMessage(msg *schema.Message, agentName, orchestratorName, conversationID string, progress func(string, string, interface{}), subAgentToolStep map[string]int) { +func emitToolCallsFromMessage( + msg *schema.Message, + agentName, orchestratorName, conversationID string, + progress func(string, string, interface{}), + subAgentToolStep map[string]int, + markPending func(toolCallPendingInfo), +) { if msg == nil || len(msg.ToolCalls) == 0 || progress == nil { return } @@ -819,6 +932,16 @@ func emitToolCallsFromMessage(msg *schema.Message, agentName, orchestratorName, if toolCallID == "" && tc.Index != nil { toolCallID = fmt.Sprintf("eino-stream-%d", *tc.Index) } + // Record pending tool calls for later tool_result correlation / recovery flushing. + // We intentionally record even for unknown tools to avoid "running" badge getting stuck. + if markPending != nil && toolCallID != "" { + markPending(toolCallPendingInfo{ + ToolCallID: toolCallID, + ToolName: display, + EinoAgent: agentName, + EinoRole: role, + }) + } progress("tool_call", fmt.Sprintf("正在调用工具: %s", display), map[string]interface{}{ "toolName": display, "arguments": argStr,