From e05b0089032989a4cf7bad3b7e729d7597008df8 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E5=85=AC=E6=98=8E?= <83812544+Ed1s0nZ@users.noreply.github.com> Date: Thu, 11 Jun 2026 00:38:00 +0800 Subject: [PATCH] Add files via upload --- internal/handler/agent.go | 35 +++++++++++++++++++++++- internal/multiagent/eino_adk_run_loop.go | 15 ++++++++++ 2 files changed, 49 insertions(+), 1 deletion(-) diff --git a/internal/handler/agent.go b/internal/handler/agent.go index f2719c14..b680853f 100644 --- a/internal/handler/agent.go +++ b/internal/handler/agent.go @@ -101,7 +101,40 @@ func sameResponseStreamMeta(a, b map[string]interface{}) bool { } orchA, _ := a["orchestration"].(string) orchB, _ := b["orchestration"].(string) - return strings.TrimSpace(orchA) == strings.TrimSpace(orchB) + if strings.TrimSpace(orchA) != strings.TrimSpace(orchB) { + return false + } + iterA := responseStreamIterationFromMeta(a) + iterB := responseStreamIterationFromMeta(b) + if iterA != 0 && iterB != 0 && iterA != iterB { + return false + } + streamA, _ := a["streamId"].(string) + streamB, _ := b["streamId"].(string) + streamA = strings.TrimSpace(streamA) + streamB = strings.TrimSpace(streamB) + if streamA != "" && streamB != "" && streamA != streamB { + return false + } + return true +} + +func responseStreamIterationFromMeta(m map[string]interface{}) int { + if m == nil { + return 0 + } + switch v := m["iteration"].(type) { + case int: + return v + case int32: + return int(v) + case int64: + return int(v) + case float64: + return int(v) + default: + return 0 + } } func discardPlanningIfEchoesToolResult(respPlan *responsePlanAgg, toolData interface{}) { diff --git a/internal/multiagent/eino_adk_run_loop.go b/internal/multiagent/eino_adk_run_loop.go index 6f6629a1..eb0a7f1a 100644 --- a/internal/multiagent/eino_adk_run_loop.go +++ b/internal/multiagent/eino_adk_run_loop.go @@ -176,6 +176,7 @@ func runEinoADKAgentLoop(ctx context.Context, args *einoADKRunLoopArgs, baseMsgs lastPlanExecuteExecutor = "" var reasoningStreamSeq int64 var einoSubReplyStreamSeq int64 + var mainResponseStreamSeq int64 toolEmitSeen := make(map[string]struct{}) var einoMainRound int var einoLastAgent string @@ -632,6 +633,7 @@ func runEinoADKAgentLoop(ctx context.Context, args *einoADKRunLoopArgs, baseMsgs mv := ev.Output.MessageOutput if mv.IsStreaming && mv.MessageStream != nil { + mainStreamID := fmt.Sprintf("eino-main-%s-%d", conversationID, atomic.AddInt64(&mainResponseStreamSeq, 1)) streamHeaderSent := false var reasoningStreamID string var toolStreamFragments []schema.ToolCall @@ -738,6 +740,8 @@ func runEinoADKAgentLoop(ctx context.Context, args *einoADKRunLoopArgs, baseMsgs "einoRole": "orchestrator", "einoAgent": ev.AgentName, "orchestration": orchMode, + "iteration": einoMainRound, + "streamId": mainStreamID, }) streamHeaderSent = true } @@ -747,6 +751,8 @@ func runEinoADKAgentLoop(ctx context.Context, args *einoADKRunLoopArgs, baseMsgs "einoRole": "orchestrator", "einoAgent": ev.AgentName, "orchestration": orchMode, + "iteration": einoMainRound, + "streamId": mainStreamID, }, mainAssistantBuf)) mainAssistWireAccum, _ = normalizeStreamingDelta(mainAssistWireAccum, contentDelta) } @@ -806,6 +812,8 @@ func runEinoADKAgentLoop(ctx context.Context, args *einoADKRunLoopArgs, baseMsgs "einoRole": "orchestrator", "einoAgent": ev.AgentName, "orchestration": orchMode, + "iteration": einoMainRound, + "streamId": mainStreamID, }) } progress("response_delta", eofTail, openai.WithSSEAccumulated(map[string]interface{}{ @@ -814,6 +822,8 @@ func runEinoADKAgentLoop(ctx context.Context, args *einoADKRunLoopArgs, baseMsgs "einoRole": "orchestrator", "einoAgent": ev.AgentName, "orchestration": orchMode, + "iteration": einoMainRound, + "streamId": mainStreamID, }, mainAssistantBuf)) mainAssistWireAccum, _ = normalizeStreamingDelta(mainAssistWireAccum, eofTail) } @@ -916,6 +926,7 @@ func runEinoADKAgentLoop(ctx context.Context, args *einoADKRunLoopArgs, baseMsgs } executeStdoutDupMu.Unlock() if progress != nil { + nonStreamID := fmt.Sprintf("eino-main-%s-%d", conversationID, atomic.AddInt64(&mainResponseStreamSeq, 1)) progress("response_start", "", map[string]interface{}{ "conversationId": conversationID, "mcpExecutionIds": snapshotMCPIDs(), @@ -923,6 +934,8 @@ func runEinoADKAgentLoop(ctx context.Context, args *einoADKRunLoopArgs, baseMsgs "einoRole": "orchestrator", "einoAgent": ev.AgentName, "orchestration": orchMode, + "iteration": einoMainRound, + "streamId": nonStreamID, }) progress("response_delta", body, openai.WithSSEAccumulated(map[string]interface{}{ "conversationId": conversationID, @@ -930,6 +943,8 @@ func runEinoADKAgentLoop(ctx context.Context, args *einoADKRunLoopArgs, baseMsgs "einoRole": "orchestrator", "einoAgent": ev.AgentName, "orchestration": orchMode, + "iteration": einoMainRound, + "streamId": nonStreamID, }, body)) } lastAssistant = body