From ea2184773e9abcd9510ccd72fa28ac787b18458b Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E5=85=AC=E6=98=8E?= <83812544+Ed1s0nZ@users.noreply.github.com> Date: Thu, 28 May 2026 11:53:33 +0800 Subject: [PATCH] Add files via upload --- internal/multiagent/eino_adk_run_loop.go | 47 +++++++----------------- 1 file changed, 13 insertions(+), 34 deletions(-) diff --git a/internal/multiagent/eino_adk_run_loop.go b/internal/multiagent/eino_adk_run_loop.go index 4f34c904..b91d3b8f 100644 --- a/internal/multiagent/eino_adk_run_loop.go +++ b/internal/multiagent/eino_adk_run_loop.go @@ -184,19 +184,14 @@ func runEinoADKAgentLoop(ctx context.Context, args *einoADKRunLoopArgs, baseMsgs mainAgentToolStep := make(map[string]int) pendingByID := make(map[string]toolCallPendingInfo) pendingQueueByAgent := make(map[string][]string) - var pendingMu sync.Mutex markPending := func(tc toolCallPendingInfo) { if tc.ToolCallID == "" { return } - pendingMu.Lock() - defer pendingMu.Unlock() pendingByID[tc.ToolCallID] = tc pendingQueueByAgent[tc.EinoAgent] = append(pendingQueueByAgent[tc.EinoAgent], tc.ToolCallID) } popNextPendingForAgent := func(agentName string) (toolCallPendingInfo, bool) { - pendingMu.Lock() - defer pendingMu.Unlock() q := pendingQueueByAgent[agentName] for len(q) > 0 { id := q[0] @@ -213,42 +208,19 @@ func runEinoADKAgentLoop(ctx context.Context, args *einoADKRunLoopArgs, baseMsgs if toolCallID == "" { return } - pendingMu.Lock() - defer pendingMu.Unlock() delete(pendingByID, toolCallID) } - popAnyPending := func() (toolCallPendingInfo, bool) { - pendingMu.Lock() - defer pendingMu.Unlock() - for id, tc := range pendingByID { - delete(pendingByID, id) - return tc, true - } - return toolCallPendingInfo{}, false - } - pendingCount := func() int { - pendingMu.Lock() - defer pendingMu.Unlock() - return len(pendingByID) - } flushAllPendingAsFailed := func(err error) { - pendingMu.Lock() - pendingSnapshot := make([]toolCallPendingInfo, 0, len(pendingByID)) - for _, tc := range pendingByID { - pendingSnapshot = append(pendingSnapshot, tc) - } - pendingByID = make(map[string]toolCallPendingInfo) - pendingQueueByAgent = make(map[string][]string) - pendingMu.Unlock() - if progress == nil { + pendingByID = make(map[string]toolCallPendingInfo) + pendingQueueByAgent = make(map[string][]string) return } msg := "" if err != nil { msg = err.Error() } - for _, tc := range pendingSnapshot { + for _, tc := range pendingByID { toolName := tc.ToolName if strings.TrimSpace(toolName) == "" { toolName = "unknown" @@ -266,6 +238,8 @@ func runEinoADKAgentLoop(ctx context.Context, args *einoADKRunLoopArgs, baseMsgs "source": "eino", }) } + pendingByID = make(map[string]toolCallPendingInfo) + pendingQueueByAgent = make(map[string][]string) } // 最近一次成功的 Eino filesystem execute 的标准输出(trim):用于抑制模型紧接着复述同一字符串时的重复「助手输出」时间线。 @@ -545,7 +519,8 @@ func runEinoADKAgentLoop(ctx context.Context, args *einoADKRunLoopArgs, baseMsgs } return takePartial(ctxErr) } - if orphanCount := pendingCount(); orphanCount > 0 { + if len(pendingByID) > 0 { + orphanCount := len(pendingByID) flushAllPendingAsFailed(errors.New("pending tool call missing result before run completion")) if progress != nil { progress("eino_pending_orphaned", "pending tool calls were force-closed at run end", map[string]interface{}{ @@ -982,8 +957,12 @@ func runEinoADKAgentLoop(ctx context.Context, args *einoADKRunLoopArgs, baseMsgs toolCallID = inferred.ToolCallID } else if inferred, ok := popNextPendingForAgent(""); ok { toolCallID = inferred.ToolCallID - } else if inferred, ok := popAnyPending(); ok { - toolCallID = inferred.ToolCallID + } else { + for id := range pendingByID { + toolCallID = id + delete(pendingByID, id) + break + } } } if toolCallID != "" {