diff --git a/internal/multiagent/eino_adk_run_loop.go b/internal/multiagent/eino_adk_run_loop.go index b91d3b8f..4f34c904 100644 --- a/internal/multiagent/eino_adk_run_loop.go +++ b/internal/multiagent/eino_adk_run_loop.go @@ -184,14 +184,19 @@ func runEinoADKAgentLoop(ctx context.Context, args *einoADKRunLoopArgs, baseMsgs mainAgentToolStep := make(map[string]int) pendingByID := make(map[string]toolCallPendingInfo) pendingQueueByAgent := make(map[string][]string) + var pendingMu sync.Mutex markPending := func(tc toolCallPendingInfo) { if tc.ToolCallID == "" { return } + pendingMu.Lock() + defer pendingMu.Unlock() pendingByID[tc.ToolCallID] = tc pendingQueueByAgent[tc.EinoAgent] = append(pendingQueueByAgent[tc.EinoAgent], tc.ToolCallID) } popNextPendingForAgent := func(agentName string) (toolCallPendingInfo, bool) { + pendingMu.Lock() + defer pendingMu.Unlock() q := pendingQueueByAgent[agentName] for len(q) > 0 { id := q[0] @@ -208,19 +213,42 @@ func runEinoADKAgentLoop(ctx context.Context, args *einoADKRunLoopArgs, baseMsgs if toolCallID == "" { return } + pendingMu.Lock() + defer pendingMu.Unlock() delete(pendingByID, toolCallID) } + popAnyPending := func() (toolCallPendingInfo, bool) { + pendingMu.Lock() + defer pendingMu.Unlock() + for id, tc := range pendingByID { + delete(pendingByID, id) + return tc, true + } + return toolCallPendingInfo{}, false + } + pendingCount := func() int { + pendingMu.Lock() + defer pendingMu.Unlock() + return len(pendingByID) + } flushAllPendingAsFailed := func(err error) { + pendingMu.Lock() + pendingSnapshot := make([]toolCallPendingInfo, 0, len(pendingByID)) + for _, tc := range pendingByID { + pendingSnapshot = append(pendingSnapshot, tc) + } + pendingByID = make(map[string]toolCallPendingInfo) + pendingQueueByAgent = make(map[string][]string) + pendingMu.Unlock() + if progress == nil { - pendingByID = make(map[string]toolCallPendingInfo) - pendingQueueByAgent = make(map[string][]string) return } msg := "" if err != nil { msg = err.Error() } - for _, tc := range pendingByID { + for _, tc := range pendingSnapshot { toolName := tc.ToolName if strings.TrimSpace(toolName) == "" { toolName = "unknown" @@ -238,8 +266,6 @@ func runEinoADKAgentLoop(ctx context.Context, args *einoADKRunLoopArgs, baseMsgs "source": "eino", }) } - pendingByID = make(map[string]toolCallPendingInfo) - pendingQueueByAgent = make(map[string][]string) } // 最近一次成功的 Eino filesystem execute 的标准输出(trim):用于抑制模型紧接着复述同一字符串时的重复「助手输出」时间线。 @@ -519,8 +545,7 @@ func runEinoADKAgentLoop(ctx context.Context, args *einoADKRunLoopArgs, baseMsgs } return takePartial(ctxErr) } - if len(pendingByID) > 0 { - orphanCount := len(pendingByID) + if orphanCount := pendingCount(); orphanCount > 0 { flushAllPendingAsFailed(errors.New("pending tool call missing result before run completion")) if progress != nil { progress("eino_pending_orphaned", "pending tool calls were force-closed at run end", map[string]interface{}{ @@ -957,12 +982,8 @@ func runEinoADKAgentLoop(ctx context.Context, args *einoADKRunLoopArgs, baseMsgs toolCallID = inferred.ToolCallID } else if inferred, ok := popNextPendingForAgent(""); ok { toolCallID = inferred.ToolCallID - } else { - for id := range pendingByID { - toolCallID = id - delete(pendingByID, id) - break - } + } else if inferred, ok := popAnyPending(); ok { + toolCallID = inferred.ToolCallID } } if toolCallID != "" {