From 7b3860971f910ae9e95cac55b5d12f787da503da Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E5=85=AC=E6=98=8E?= <83812544+Ed1s0nZ@users.noreply.github.com> Date: Tue, 19 May 2026 17:42:12 +0800 Subject: [PATCH] Add files via upload --- internal/agent/agent.go | 40 +++++++++++++++++++----- internal/multiagent/eino_adk_run_loop.go | 20 ++++++------ 2 files changed, 42 insertions(+), 18 deletions(-) diff --git a/internal/agent/agent.go b/internal/agent/agent.go index 95cca1fb..b72106fa 100644 --- a/internal/agent/agent.go +++ b/internal/agent/agent.go @@ -598,11 +598,17 @@ func (a *Agent) AgentLoopWithProgress(ctx context.Context, userInput string, his thinkingStreamSeq++ thinkingStreamId := fmt.Sprintf("thinking-stream-%s-%d-%d", conversationID, i+1, thinkingStreamSeq) thinkingStreamStarted := false + var thinkingWire string response, err := a.callOpenAIStreamWithToolCalls(ctx, messages, tools, func(delta string) error { if delta == "" { return nil } + var deltaOut string + thinkingWire, deltaOut = openai.NormalizeStreamingDelta(thinkingWire, delta) + if deltaOut == "" { + return nil + } if !thinkingStreamStarted { thinkingStreamStarted = true sendProgress("thinking_stream_start", " ", map[string]interface{}{ @@ -611,10 +617,10 @@ func (a *Agent) AgentLoopWithProgress(ctx context.Context, userInput string, his "toolStream": false, }) } - sendProgress("thinking_stream_delta", delta, map[string]interface{}{ + sendProgress("thinking_stream_delta", deltaOut, openai.WithSSEAccumulated(map[string]interface{}{ "streamId": thinkingStreamId, "iteration": i + 1, - }) + }, thinkingWire)) return nil }) if err != nil { @@ -827,10 +833,16 @@ func (a *Agent) AgentLoopWithProgress(ctx context.Context, userInput string, his "mcpExecutionIds": result.MCPExecutionIDs, "messageGeneratedBy": "summary", }) + var summaryWire string streamText, _ := a.callOpenAIStreamText(ctx, messages, []Tool{}, func(delta string) error { - sendProgress("response_delta", delta, map[string]interface{}{ + var deltaOut string + summaryWire, deltaOut = openai.NormalizeStreamingDelta(summaryWire, delta) + if deltaOut == "" { + return nil + } + sendProgress("response_delta", deltaOut, openai.WithSSEAccumulated(map[string]interface{}{ "conversationId": conversationID, - }) + }, summaryWire)) return nil }) if strings.TrimSpace(streamText) != "" { @@ -874,10 +886,16 @@ func (a *Agent) AgentLoopWithProgress(ctx context.Context, userInput string, his "mcpExecutionIds": result.MCPExecutionIDs, "messageGeneratedBy": "summary", }) + var summaryWire string streamText, _ := a.callOpenAIStreamText(ctx, messages, []Tool{}, func(delta string) error { - sendProgress("response_delta", delta, map[string]interface{}{ + var deltaOut string + summaryWire, deltaOut = openai.NormalizeStreamingDelta(summaryWire, delta) + if deltaOut == "" { + return nil + } + sendProgress("response_delta", deltaOut, openai.WithSSEAccumulated(map[string]interface{}{ "conversationId": conversationID, - }) + }, summaryWire)) return nil }) if strings.TrimSpace(streamText) != "" { @@ -921,10 +939,16 @@ func (a *Agent) AgentLoopWithProgress(ctx context.Context, userInput string, his "mcpExecutionIds": result.MCPExecutionIDs, "messageGeneratedBy": "max_iter_summary", }) + var summaryWire string streamText, _ := a.callOpenAIStreamText(ctx, messages, []Tool{}, func(delta string) error { - sendProgress("response_delta", delta, map[string]interface{}{ + var deltaOut string + summaryWire, deltaOut = openai.NormalizeStreamingDelta(summaryWire, delta) + if deltaOut == "" { + return nil + } + sendProgress("response_delta", deltaOut, openai.WithSSEAccumulated(map[string]interface{}{ "conversationId": conversationID, - }) + }, summaryWire)) return nil }) if strings.TrimSpace(streamText) != "" { diff --git a/internal/multiagent/eino_adk_run_loop.go b/internal/multiagent/eino_adk_run_loop.go index f91f993e..37d2b326 100644 --- a/internal/multiagent/eino_adk_run_loop.go +++ b/internal/multiagent/eino_adk_run_loop.go @@ -657,9 +657,9 @@ func runEinoADKAgentLoop(ctx context.Context, args *einoADKRunLoopArgs, baseMsgs "orchestration": orchMode, }) } - progress("reasoning_chain_stream_delta", displayDelta, map[string]interface{}{ + progress("reasoning_chain_stream_delta", displayDelta, openai.WithSSEAccumulated(map[string]interface{}{ "streamId": reasoningStreamID, - }) + }, fullDisplay)) } } } @@ -689,13 +689,13 @@ func runEinoADKAgentLoop(ctx context.Context, args *einoADKRunLoopArgs, baseMsgs }) streamHeaderSent = true } - progress("response_delta", contentDelta, map[string]interface{}{ + progress("response_delta", contentDelta, openai.WithSSEAccumulated(map[string]interface{}{ "conversationId": conversationID, "mcpExecutionIds": snapshotMCPIDs(), "einoRole": "orchestrator", "einoAgent": ev.AgentName, "orchestration": orchMode, - }) + }, mainAssistantBuf)) mainAssistWireAccum, _ = normalizeStreamingDelta(mainAssistWireAccum, contentDelta) } } @@ -714,10 +714,10 @@ func runEinoADKAgentLoop(ctx context.Context, args *einoADKRunLoopArgs, baseMsgs "source": "eino", }) } - progress("eino_agent_reply_stream_delta", subDelta, map[string]interface{}{ + progress("eino_agent_reply_stream_delta", subDelta, openai.WithSSEAccumulated(map[string]interface{}{ "streamId": subReplyStreamID, "conversationId": conversationID, - }) + }, subAssistantBuf)) } } } @@ -756,13 +756,13 @@ func runEinoADKAgentLoop(ctx context.Context, args *einoADKRunLoopArgs, baseMsgs "orchestration": orchMode, }) } - progress("response_delta", eofTail, map[string]interface{}{ + progress("response_delta", eofTail, openai.WithSSEAccumulated(map[string]interface{}{ "conversationId": conversationID, "mcpExecutionIds": snapshotMCPIDs(), "einoRole": "orchestrator", "einoAgent": ev.AgentName, "orchestration": orchMode, - }) + }, mainAssistantBuf)) mainAssistWireAccum, _ = normalizeStreamingDelta(mainAssistWireAccum, eofTail) } } @@ -872,13 +872,13 @@ func runEinoADKAgentLoop(ctx context.Context, args *einoADKRunLoopArgs, baseMsgs "einoAgent": ev.AgentName, "orchestration": orchMode, }) - progress("response_delta", body, map[string]interface{}{ + progress("response_delta", body, openai.WithSSEAccumulated(map[string]interface{}{ "conversationId": conversationID, "mcpExecutionIds": snapshotMCPIDs(), "einoRole": "orchestrator", "einoAgent": ev.AgentName, "orchestration": orchMode, - }) + }, body)) } lastAssistant = body if orchMode == "plan_execute" && strings.EqualFold(strings.TrimSpace(ev.AgentName), "executor") {