diff --git a/internal/einomcp/mcp_tools.go b/internal/einomcp/mcp_tools.go index 18d8ce5e..9fcd2b7a 100644 --- a/internal/einomcp/mcp_tools.go +++ b/internal/einomcp/mcp_tools.go @@ -92,6 +92,19 @@ func (m *mcpBridgeTool) Info(ctx context.Context) (*schema.ToolInfo, error) { func (m *mcpBridgeTool) InvokableRun(ctx context.Context, argumentsInJSON string, opts ...tool.Option) (string, error) { _ = opts + return runMCPToolInvocation(ctx, m.agent, m.holder, m.name, argumentsInJSON, m.record, m.chunk) +} + +// runMCPToolInvocation 与 mcpBridgeTool.InvokableRun 共用。 +func runMCPToolInvocation( + ctx context.Context, + ag *agent.Agent, + holder *ConversationHolder, + toolName string, + argumentsInJSON string, + record ExecutionRecorder, + chunk func(toolName, toolCallID, chunk string), +) (string, error) { var args map[string]interface{} if argumentsInJSON != "" && argumentsInJSON != "null" { if err := json.Unmarshal([]byte(argumentsInJSON), &args); err != nil { @@ -102,44 +115,62 @@ func (m *mcpBridgeTool) InvokableRun(ctx context.Context, argumentsInJSON string args = map[string]interface{}{} } - // Stream tool output (stdout/stderr) to upper layer via security.Executor's callback. - // This enables multi-agent mode to show execution progress on the frontend. - if m.chunk != nil { + if chunk != nil { toolCallID := compose.GetToolCallID(ctx) if toolCallID != "" { if existing, ok := ctx.Value(security.ToolOutputCallbackCtxKey).(security.ToolOutputCallback); ok && existing != nil { - // Chain existing callback (if any) + our progress forwarder. ctx = context.WithValue(ctx, security.ToolOutputCallbackCtxKey, security.ToolOutputCallback(func(c string) { existing(c) if strings.TrimSpace(c) == "" { return } - m.chunk(m.name, toolCallID, c) + chunk(toolName, toolCallID, c) })) } else { ctx = context.WithValue(ctx, security.ToolOutputCallbackCtxKey, security.ToolOutputCallback(func(c string) { if strings.TrimSpace(c) == "" { return } - m.chunk(m.name, toolCallID, c) + chunk(toolName, toolCallID, c) })) } } } - conv := m.holder.Get() - res, err := m.agent.ExecuteMCPToolForConversation(ctx, conv, m.name, args) + res, err := ag.ExecuteMCPToolForConversation(ctx, holder.Get(), toolName, args) if err != nil { return "", err } if res == nil { return "", nil } - if res.ExecutionID != "" && m.record != nil { - m.record(res.ExecutionID) + if res.ExecutionID != "" && record != nil { + record(res.ExecutionID) } if res.IsError { return ToolErrorPrefix + res.Result, nil } return res.Result, nil } + +// UnknownToolReminderHandler 供 compose.ToolsNodeConfig.UnknownToolsHandler 使用: +// 模型请求了未注册的工具名时,仅返回说明性文本,error 恒为 nil,以便 ReAct 继续迭代而不中断图执行。 +// 不进行名称猜测或映射,避免误执行。 +func UnknownToolReminderHandler() func(ctx context.Context, name, input string) (string, error) { + return func(ctx context.Context, name, input string) (string, error) { + _ = ctx + _ = input + return unknownToolReminderText(strings.TrimSpace(name)), nil + } +} + +func unknownToolReminderText(requested string) string { + if requested == "" { + requested = "(empty)" + } + return fmt.Sprintf(`The tool name %q is not registered for this agent. + +Please retry using only names that appear in the tool definitions for this turn (exact match, case-sensitive). Do not invent or rename tools; adjust your plan and continue. + +(工具 %q 未注册:请仅使用本回合上下文中给出的工具名称,须完全一致;请勿自行改写或猜测名称,并继续后续步骤。)`, requested, requested) +} diff --git a/internal/einomcp/mcp_tools_test.go b/internal/einomcp/mcp_tools_test.go new file mode 100644 index 00000000..078c8c04 --- /dev/null +++ b/internal/einomcp/mcp_tools_test.go @@ -0,0 +1,16 @@ +package einomcp + +import ( + "strings" + "testing" +) + +func TestUnknownToolReminderText(t *testing.T) { + s := unknownToolReminderText("bad_tool") + if !strings.Contains(s, "bad_tool") { + t.Fatalf("expected requested name in message: %s", s) + } + if strings.Contains(s, "Tools currently available") { + t.Fatal("unified message must not list tool names") + } +} diff --git a/internal/multiagent/runner.go b/internal/multiagent/runner.go index e504fcf0..dc45e20a 100644 --- a/internal/multiagent/runner.go +++ b/internal/multiagent/runner.go @@ -101,8 +101,8 @@ func RunDeepAgent( return } progress("tool_result_delta", chunk, map[string]interface{}{ - "toolName": toolName, - "toolCallId": toolCallID, + "toolName": toolName, + "toolCallId": toolCallID, // index/total/iteration are optional for UI; we don't know them in this bridge. "index": 0, "total": 0, @@ -221,7 +221,8 @@ func RunDeepAgent( Model: subModel, ToolsConfig: adk.ToolsConfig{ ToolsNodeConfig: compose.ToolsNodeConfig{ - Tools: subTools, + Tools: subTools, + UnknownToolsHandler: einomcp.UnknownToolReminderHandler(), }, EmitInternalEvents: true, }, @@ -275,7 +276,8 @@ func RunDeepAgent( }, ToolsConfig: adk.ToolsConfig{ ToolsNodeConfig: compose.ToolsNodeConfig{ - Tools: mainTools, + Tools: mainTools, + UnknownToolsHandler: einomcp.UnknownToolReminderHandler(), }, EmitInternalEvents: true, }, @@ -284,14 +286,8 @@ func RunDeepAgent( return nil, fmt.Errorf("deep.New: %w", err) } - msgs := historyToMessages(history) - msgs = append(msgs, schema.UserMessage(userMessage)) - - runner := adk.NewRunner(ctx, adk.RunnerConfig{ - Agent: da, - EnableStreaming: true, - }) - iter := runner.Run(ctx, msgs) + baseMsgs := historyToMessages(history) + baseMsgs = append(baseMsgs, schema.UserMessage(userMessage)) streamsMainAssistant := func(agent string) bool { return agent == "" || agent == orchestratorName @@ -303,255 +299,301 @@ func RunDeepAgent( return "sub" } - // 仅保留主代理最后一次 assistant 输出,避免把多轮中间回复拼接到最终答案。 + var lastRunMsgs []adk.Message var lastAssistant string - var reasoningStreamSeq int64 - var einoSubReplyStreamSeq int64 - toolEmitSeen := make(map[string]struct{}) - // 主代理「外层轮次」:首次进入编排器为第 1 轮,每从子代理回到编排器 +1。 - // 子代理「步数」:该子代理每次发起一批工具调用前 +1(近似 ReAct 步)。 - var einoMainRound int - var einoLastAgent string - subAgentToolStep := make(map[string]int) - for { - ev, ok := iter.Next() - if !ok { - break + +attemptLoop: + for attempt := 0; attempt < maxToolCallArgumentsJSONAttempts; attempt++ { + msgs := make([]adk.Message, 0, len(baseMsgs)+attempt) + msgs = append(msgs, baseMsgs...) + for i := 0; i < attempt; i++ { + msgs = append(msgs, toolCallArgumentsJSONRetryHint()) } - if ev == nil { - continue - } - if ev.Err != nil { + + if attempt > 0 { + mcpIDsMu.Lock() + mcpIDs = mcpIDs[:0] + mcpIDsMu.Unlock() + if logger != nil { + logger.Warn("eino DeepAgent: 工具参数 JSON 被接口拒绝,追加提示后重试", + zap.Int("attempt", attempt), + zap.Int("maxAttempts", maxToolCallArgumentsJSONAttempts)) + } if progress != nil { - progress("error", ev.Err.Error(), map[string]interface{}{ + // 使用专用事件类型 eino_recovery,便于前端时间线展示(progress 仅改标题,不进时间线) + progress("eino_recovery", toolCallArgumentsJSONRecoveryTimelineMessage(attempt), map[string]interface{}{ "conversationId": conversationID, - "source": "eino", + "source": "eino", + "einoRetry": attempt, + "runIndex": attempt + 1, // 第几轮完整运行(1 为首次,重试后递增) + "maxRuns": maxToolCallArgumentsJSONAttempts, + "reason": "invalid_tool_arguments_json", }) } - return nil, ev.Err } - if ev.AgentName != "" && progress != nil { - if streamsMainAssistant(ev.AgentName) { - if einoMainRound == 0 { - einoMainRound = 1 - progress("iteration", "", map[string]interface{}{ - "iteration": 1, - "einoScope": "main", - "einoRole": "orchestrator", - "einoAgent": orchestratorName, - "conversationId": conversationID, - "source": "eino", - }) - } else if einoLastAgent != "" && !streamsMainAssistant(einoLastAgent) { - einoMainRound++ - progress("iteration", "", map[string]interface{}{ - "iteration": einoMainRound, - "einoScope": "main", - "einoRole": "orchestrator", - "einoAgent": orchestratorName, - "conversationId": conversationID, - "source": "eino", - }) - } - } - einoLastAgent = ev.AgentName - progress("progress", fmt.Sprintf("[Eino] %s", ev.AgentName), map[string]interface{}{ - "conversationId": conversationID, - "einoAgent": ev.AgentName, - "einoRole": einoRoleTag(ev.AgentName), - }) - } - if ev.Output == nil || ev.Output.MessageOutput == nil { - continue - } - mv := ev.Output.MessageOutput - if mv.IsStreaming && mv.MessageStream != nil { - streamHeaderSent := false - var reasoningStreamID string - var toolStreamFragments []schema.ToolCall - var subAssistantBuf strings.Builder - var subReplyStreamID string - var mainAssistantBuf strings.Builder - for { - chunk, rerr := mv.MessageStream.Recv() - if rerr != nil { - if errors.Is(rerr, io.EOF) { - break - } + // 仅保留主代理最后一次 assistant 输出;每轮重试重置,避免拼接失败轮次的片段。 + lastAssistant = "" + var reasoningStreamSeq int64 + var einoSubReplyStreamSeq int64 + toolEmitSeen := make(map[string]struct{}) + var einoMainRound int + var einoLastAgent string + subAgentToolStep := make(map[string]int) + + runner := adk.NewRunner(ctx, adk.RunnerConfig{ + Agent: da, + EnableStreaming: true, + }) + iter := runner.Run(ctx, msgs) + + for { + ev, ok := iter.Next() + if !ok { + lastRunMsgs = msgs + break attemptLoop + } + if ev == nil { + continue + } + if ev.Err != nil { + if isRecoverableToolCallArgumentsJSONError(ev.Err) && attempt+1 < maxToolCallArgumentsJSONAttempts { if logger != nil { - logger.Warn("eino stream recv", zap.Error(rerr)) + logger.Warn("eino: recoverable tool-call JSON error from model/API", zap.Error(ev.Err), zap.Int("attempt", attempt)) } - break + continue attemptLoop } - if chunk == nil { - continue - } - if progress != nil && strings.TrimSpace(chunk.ReasoningContent) != "" { - if reasoningStreamID == "" { - reasoningStreamID = fmt.Sprintf("eino-reasoning-%s-%d", conversationID, atomic.AddInt64(&reasoningStreamSeq, 1)) - progress("thinking_stream_start", " ", map[string]interface{}{ - "streamId": reasoningStreamID, - "source": "eino", - "einoAgent": ev.AgentName, - "einoRole": einoRoleTag(ev.AgentName), - }) - } - progress("thinking_stream_delta", chunk.ReasoningContent, map[string]interface{}{ - "streamId": reasoningStreamID, + if progress != nil { + progress("error", ev.Err.Error(), map[string]interface{}{ + "conversationId": conversationID, + "source": "eino", }) } - if chunk.Content != "" { - if progress != nil && streamsMainAssistant(ev.AgentName) { - if !streamHeaderSent { - progress("response_start", "", map[string]interface{}{ - "conversationId": conversationID, - "mcpExecutionIds": snapshotMCPIDs(), - "messageGeneratedBy": "eino:" + ev.AgentName, - "einoRole": "orchestrator", - }) - streamHeaderSent = true - } - progress("response_delta", chunk.Content, map[string]interface{}{ - "conversationId": conversationID, - "mcpExecutionIds": snapshotMCPIDs(), - "einoRole": "orchestrator", - }) - mainAssistantBuf.WriteString(chunk.Content) - } else if !streamsMainAssistant(ev.AgentName) { - if progress != nil { - if subReplyStreamID == "" { - subReplyStreamID = fmt.Sprintf("eino-sub-reply-%s-%d", conversationID, atomic.AddInt64(&einoSubReplyStreamSeq, 1)) - progress("eino_agent_reply_stream_start", "", map[string]interface{}{ - "streamId": subReplyStreamID, - "einoAgent": ev.AgentName, - "einoRole": "sub", - "conversationId": conversationID, - "source": "eino", - }) - } - progress("eino_agent_reply_stream_delta", chunk.Content, map[string]interface{}{ - "streamId": subReplyStreamID, - "conversationId": conversationID, - }) - } - subAssistantBuf.WriteString(chunk.Content) - } - } - // 收集流式 tool_calls 全部分片;arguments 在最后一帧常为 "",需按 index/id 合并后才能展示 subagent_type/description。 - if len(chunk.ToolCalls) > 0 { - toolStreamFragments = append(toolStreamFragments, chunk.ToolCalls...) - } + return nil, ev.Err } - if streamsMainAssistant(ev.AgentName) { - if s := strings.TrimSpace(mainAssistantBuf.String()); s != "" { - lastAssistant = s - } - } - if subAssistantBuf.Len() > 0 && progress != nil { - if s := strings.TrimSpace(subAssistantBuf.String()); s != "" { - if subReplyStreamID != "" { - progress("eino_agent_reply_stream_end", s, map[string]interface{}{ - "streamId": subReplyStreamID, - "einoAgent": ev.AgentName, - "einoRole": "sub", - "conversationId": conversationID, - "source": "eino", - }) - } else { - progress("eino_agent_reply", s, map[string]interface{}{ + if ev.AgentName != "" && progress != nil { + if streamsMainAssistant(ev.AgentName) { + if einoMainRound == 0 { + einoMainRound = 1 + progress("iteration", "", map[string]interface{}{ + "iteration": 1, + "einoScope": "main", + "einoRole": "orchestrator", + "einoAgent": orchestratorName, "conversationId": conversationID, - "einoAgent": ev.AgentName, - "einoRole": "sub", - "source": "eino", + "source": "eino", + }) + } else if einoLastAgent != "" && !streamsMainAssistant(einoLastAgent) { + einoMainRound++ + progress("iteration", "", map[string]interface{}{ + "iteration": einoMainRound, + "einoScope": "main", + "einoRole": "orchestrator", + "einoAgent": orchestratorName, + "conversationId": conversationID, + "source": "eino", }) } } - } - var lastToolChunk *schema.Message - if merged := mergeStreamingToolCallFragments(toolStreamFragments); len(merged) > 0 { - lastToolChunk = &schema.Message{ToolCalls: merged} - } - tryEmitToolCallsOnce(lastToolChunk, ev.AgentName, orchestratorName, conversationID, progress, toolEmitSeen, subAgentToolStep) - continue - } - - msg, gerr := mv.GetMessage() - if gerr != nil || msg == nil { - continue - } - tryEmitToolCallsOnce(mergeMessageToolCalls(msg), ev.AgentName, orchestratorName, conversationID, progress, toolEmitSeen, subAgentToolStep) - - if mv.Role == schema.Assistant { - if progress != nil && strings.TrimSpace(msg.ReasoningContent) != "" { - progress("thinking", strings.TrimSpace(msg.ReasoningContent), map[string]interface{}{ + einoLastAgent = ev.AgentName + progress("progress", fmt.Sprintf("[Eino] %s", ev.AgentName), map[string]interface{}{ "conversationId": conversationID, - "source": "eino", "einoAgent": ev.AgentName, "einoRole": einoRoleTag(ev.AgentName), }) } - body := strings.TrimSpace(msg.Content) - if body != "" { - if streamsMainAssistant(ev.AgentName) { - if progress != nil { - progress("response_start", "", map[string]interface{}{ - "conversationId": conversationID, - "mcpExecutionIds": snapshotMCPIDs(), - "messageGeneratedBy": "eino:" + ev.AgentName, - "einoRole": "orchestrator", - }) - progress("response_delta", body, map[string]interface{}{ - "conversationId": conversationID, - "mcpExecutionIds": snapshotMCPIDs(), - "einoRole": "orchestrator", + if ev.Output == nil || ev.Output.MessageOutput == nil { + continue + } + mv := ev.Output.MessageOutput + + if mv.IsStreaming && mv.MessageStream != nil { + streamHeaderSent := false + var reasoningStreamID string + var toolStreamFragments []schema.ToolCall + var subAssistantBuf strings.Builder + var subReplyStreamID string + var mainAssistantBuf strings.Builder + for { + chunk, rerr := mv.MessageStream.Recv() + if rerr != nil { + if errors.Is(rerr, io.EOF) { + break + } + if logger != nil { + logger.Warn("eino stream recv", zap.Error(rerr)) + } + break + } + if chunk == nil { + continue + } + if progress != nil && strings.TrimSpace(chunk.ReasoningContent) != "" { + if reasoningStreamID == "" { + reasoningStreamID = fmt.Sprintf("eino-reasoning-%s-%d", conversationID, atomic.AddInt64(&reasoningStreamSeq, 1)) + progress("thinking_stream_start", " ", map[string]interface{}{ + "streamId": reasoningStreamID, + "source": "eino", + "einoAgent": ev.AgentName, + "einoRole": einoRoleTag(ev.AgentName), + }) + } + progress("thinking_stream_delta", chunk.ReasoningContent, map[string]interface{}{ + "streamId": reasoningStreamID, }) } - lastAssistant = body - } else if progress != nil { - progress("eino_agent_reply", body, map[string]interface{}{ + if chunk.Content != "" { + if progress != nil && streamsMainAssistant(ev.AgentName) { + if !streamHeaderSent { + progress("response_start", "", map[string]interface{}{ + "conversationId": conversationID, + "mcpExecutionIds": snapshotMCPIDs(), + "messageGeneratedBy": "eino:" + ev.AgentName, + "einoRole": "orchestrator", + }) + streamHeaderSent = true + } + progress("response_delta", chunk.Content, map[string]interface{}{ + "conversationId": conversationID, + "mcpExecutionIds": snapshotMCPIDs(), + "einoRole": "orchestrator", + }) + mainAssistantBuf.WriteString(chunk.Content) + } else if !streamsMainAssistant(ev.AgentName) { + if progress != nil { + if subReplyStreamID == "" { + subReplyStreamID = fmt.Sprintf("eino-sub-reply-%s-%d", conversationID, atomic.AddInt64(&einoSubReplyStreamSeq, 1)) + progress("eino_agent_reply_stream_start", "", map[string]interface{}{ + "streamId": subReplyStreamID, + "einoAgent": ev.AgentName, + "einoRole": "sub", + "conversationId": conversationID, + "source": "eino", + }) + } + progress("eino_agent_reply_stream_delta", chunk.Content, map[string]interface{}{ + "streamId": subReplyStreamID, + "conversationId": conversationID, + }) + } + subAssistantBuf.WriteString(chunk.Content) + } + } + // 收集流式 tool_calls 全部分片;arguments 在最后一帧常为 "",需按 index/id 合并后才能展示 subagent_type/description。 + if len(chunk.ToolCalls) > 0 { + toolStreamFragments = append(toolStreamFragments, chunk.ToolCalls...) + } + } + if streamsMainAssistant(ev.AgentName) { + if s := strings.TrimSpace(mainAssistantBuf.String()); s != "" { + lastAssistant = s + } + } + if subAssistantBuf.Len() > 0 && progress != nil { + if s := strings.TrimSpace(subAssistantBuf.String()); s != "" { + if subReplyStreamID != "" { + progress("eino_agent_reply_stream_end", s, map[string]interface{}{ + "streamId": subReplyStreamID, + "einoAgent": ev.AgentName, + "einoRole": "sub", + "conversationId": conversationID, + "source": "eino", + }) + } else { + progress("eino_agent_reply", s, map[string]interface{}{ + "conversationId": conversationID, + "einoAgent": ev.AgentName, + "einoRole": "sub", + "source": "eino", + }) + } + } + } + var lastToolChunk *schema.Message + if merged := mergeStreamingToolCallFragments(toolStreamFragments); len(merged) > 0 { + lastToolChunk = &schema.Message{ToolCalls: merged} + } + tryEmitToolCallsOnce(lastToolChunk, ev.AgentName, orchestratorName, conversationID, progress, toolEmitSeen, subAgentToolStep) + continue + } + + msg, gerr := mv.GetMessage() + if gerr != nil || msg == nil { + continue + } + tryEmitToolCallsOnce(mergeMessageToolCalls(msg), ev.AgentName, orchestratorName, conversationID, progress, toolEmitSeen, subAgentToolStep) + + if mv.Role == schema.Assistant { + if progress != nil && strings.TrimSpace(msg.ReasoningContent) != "" { + progress("thinking", strings.TrimSpace(msg.ReasoningContent), map[string]interface{}{ "conversationId": conversationID, - "einoAgent": ev.AgentName, - "einoRole": "sub", "source": "eino", + "einoAgent": ev.AgentName, + "einoRole": einoRoleTag(ev.AgentName), }) } - } - } - - if mv.Role == schema.Tool && progress != nil { - toolName := msg.ToolName - if toolName == "" { - toolName = mv.ToolName + body := strings.TrimSpace(msg.Content) + if body != "" { + if streamsMainAssistant(ev.AgentName) { + if progress != nil { + progress("response_start", "", map[string]interface{}{ + "conversationId": conversationID, + "mcpExecutionIds": snapshotMCPIDs(), + "messageGeneratedBy": "eino:" + ev.AgentName, + "einoRole": "orchestrator", + }) + progress("response_delta", body, map[string]interface{}{ + "conversationId": conversationID, + "mcpExecutionIds": snapshotMCPIDs(), + "einoRole": "orchestrator", + }) + } + lastAssistant = body + } else if progress != nil { + progress("eino_agent_reply", body, map[string]interface{}{ + "conversationId": conversationID, + "einoAgent": ev.AgentName, + "einoRole": "sub", + "source": "eino", + }) + } + } } - // bridge 工具在 res.IsError=true 时会返回带前缀的内容;这里解析为 success/isError,避免前端误判为成功。 - content := msg.Content - isErr := false - if strings.HasPrefix(content, einomcp.ToolErrorPrefix) { - isErr = true - content = strings.TrimPrefix(content, einomcp.ToolErrorPrefix) - } + if mv.Role == schema.Tool && progress != nil { + toolName := msg.ToolName + if toolName == "" { + toolName = mv.ToolName + } - preview := content - if len(preview) > 200 { - preview = preview[:200] + "..." + // bridge 工具在 res.IsError=true 时会返回带前缀的内容;这里解析为 success/isError,避免前端误判为成功。 + content := msg.Content + isErr := false + if strings.HasPrefix(content, einomcp.ToolErrorPrefix) { + isErr = true + content = strings.TrimPrefix(content, einomcp.ToolErrorPrefix) + } + + preview := content + if len(preview) > 200 { + preview = preview[:200] + "..." + } + data := map[string]interface{}{ + "toolName": toolName, + "success": !isErr, + "isError": isErr, + "result": content, + "resultPreview": preview, + "conversationId": conversationID, + "einoAgent": ev.AgentName, + "einoRole": einoRoleTag(ev.AgentName), + "source": "eino", + } + if msg.ToolCallID != "" { + data["toolCallId"] = msg.ToolCallID + } + progress("tool_result", fmt.Sprintf("工具结果 (%s)", toolName), data) } - data := map[string]interface{}{ - "toolName": toolName, - "success": !isErr, - "isError": isErr, - "result": content, - "resultPreview": preview, - "conversationId": conversationID, - "einoAgent": ev.AgentName, - "einoRole": einoRoleTag(ev.AgentName), - "source": "eino", - } - if msg.ToolCallID != "" { - data["toolCallId"] = msg.ToolCallID - } - progress("tool_result", fmt.Sprintf("工具结果 (%s)", toolName), data) } } @@ -559,7 +601,7 @@ func RunDeepAgent( ids := append([]string(nil), mcpIDs...) mcpIDsMu.Unlock() - histJSON, _ := json.Marshal(msgs) + histJSON, _ := json.Marshal(lastRunMsgs) cleaned := strings.TrimSpace(lastAssistant) cleaned = dedupeRepeatedParagraphs(cleaned, 80) cleaned = dedupeParagraphsByLineFingerprint(cleaned, 100) diff --git a/internal/multiagent/tool_args_json_retry.go b/internal/multiagent/tool_args_json_retry.go new file mode 100644 index 00000000..119797e6 --- /dev/null +++ b/internal/multiagent/tool_args_json_retry.go @@ -0,0 +1,50 @@ +package multiagent + +import ( + "fmt" + "strings" + + "github.com/cloudwego/eino/schema" +) + +// maxToolCallArgumentsJSONAttempts 含首次运行:首次 + 自动重试次数。 +// 例如为 3 表示最多共 3 次完整 DeepAgent 运行(2 次失败后各追加一条纠错提示)。 +const maxToolCallArgumentsJSONAttempts = 3 + +// toolCallArgumentsJSONRetryHint 追加在用户消息后,提示模型输出合法 JSON 工具参数(部分云厂商会在流式阶段校验 arguments)。 +func toolCallArgumentsJSONRetryHint() *schema.Message { + return schema.UserMessage(`[系统提示] 上一次输出中,工具调用的 function.arguments 不是合法 JSON,接口已拒绝。请重新生成:每个 tool call 的 arguments 必须是完整、可解析的 JSON 对象字符串(键名用双引号,无多余逗号,括号配对)。不要输出截断或不完整的 JSON。 + +[System] Your previous tool call used invalid JSON in function.arguments and was rejected by the API. Regenerate with strictly valid JSON objects only (double-quoted keys, matched braces, no trailing commas).`) +} + +// toolCallArgumentsJSONRecoveryTimelineMessage 供 eino_recovery 事件落库与前端时间线展示。 +func toolCallArgumentsJSONRecoveryTimelineMessage(attempt int) string { + return fmt.Sprintf( + "接口拒绝了无效的工具参数 JSON。已向对话追加系统提示并要求模型重新生成合法的 function.arguments。"+ + "当前为第 %d/%d 轮完整运行。\n\n"+ + "The API rejected invalid JSON in tool arguments. A system hint was appended. This is full run %d of %d.", + attempt+1, maxToolCallArgumentsJSONAttempts, attempt+1, maxToolCallArgumentsJSONAttempts, + ) +} + +// isRecoverableToolCallArgumentsJSONError 判断是否为「工具参数非合法 JSON」类流式错误,可通过追加提示后重跑一轮。 +func isRecoverableToolCallArgumentsJSONError(err error) bool { + if err == nil { + return false + } + s := strings.ToLower(err.Error()) + if !strings.Contains(s, "json") { + return false + } + if strings.Contains(s, "function.arguments") || strings.Contains(s, "function arguments") { + return true + } + if strings.Contains(s, "invalidparameter") && strings.Contains(s, "json") { + return true + } + if strings.Contains(s, "must be in json format") { + return true + } + return false +} diff --git a/internal/multiagent/tool_args_json_retry_test.go b/internal/multiagent/tool_args_json_retry_test.go new file mode 100644 index 00000000..41264eb0 --- /dev/null +++ b/internal/multiagent/tool_args_json_retry_test.go @@ -0,0 +1,17 @@ +package multiagent + +import ( + "errors" + "testing" +) + +func TestIsRecoverableToolCallArgumentsJSONError(t *testing.T) { + yes := errors.New(`failed to receive stream chunk: error, <400> InternalError.Algo.InvalidParameter: The "function.arguments" parameter of the code model must be in JSON format.`) + if !isRecoverableToolCallArgumentsJSONError(yes) { + t.Fatal("expected recoverable for function.arguments + JSON") + } + no := errors.New("unrelated network failure") + if isRecoverableToolCallArgumentsJSONError(no) { + t.Fatal("expected not recoverable") + } +}