diff --git a/internal/multiagent/eino_adk_run_loop.go b/internal/multiagent/eino_adk_run_loop.go index ff4f752c..f3ad7dd7 100644 --- a/internal/multiagent/eino_adk_run_loop.go +++ b/internal/multiagent/eino_adk_run_loop.go @@ -19,6 +19,40 @@ import ( "go.uber.org/zap" ) +// normalizeStreamingDelta 将可能是“累计片段”的 chunk 归一化为“纯增量”。 +// 一些模型/桥接层在流式过程中会重复发送已输出前缀,前端若直接 buffer+=chunk 会出现“结巴”重复。 +func normalizeStreamingDelta(current, incoming string) (next, delta string) { + if incoming == "" { + return current, "" + } + if current == "" { + return incoming, incoming + } + if incoming == current { + return current, "" + } + // incoming 是累计全文(包含 current 前缀) + if strings.HasPrefix(incoming, current) { + return incoming, incoming[len(current):] + } + // incoming 完全是已输出尾部重发 + if strings.HasSuffix(current, incoming) { + return current, "" + } + + // 处理边界重叠:current 后缀与 incoming 前缀重叠,只追加非重叠部分。 + max := len(current) + if len(incoming) < max { + max = len(incoming) + } + for overlap := max; overlap > 0; overlap-- { + if current[len(current)-overlap:] == incoming[:overlap] { + return current + incoming[overlap:], incoming[overlap:] + } + } + return current + incoming, incoming +} + func isEinoIterationLimitError(err error) bool { if err == nil { return false @@ -430,9 +464,10 @@ func runEinoADKAgentLoop(ctx context.Context, args *einoADKRunLoopArgs, baseMsgs streamHeaderSent := false var reasoningStreamID string var toolStreamFragments []schema.ToolCall - var subAssistantBuf strings.Builder + var subAssistantBuf string var subReplyStreamID string - var mainAssistantBuf strings.Builder + var mainAssistantBuf string + var reasoningBuf string var streamRecvErr error for { chunk, rerr := mv.MessageStream.Recv() @@ -453,59 +488,69 @@ func runEinoADKAgentLoop(ctx context.Context, args *einoADKRunLoopArgs, baseMsgs continue } if progress != nil && strings.TrimSpace(chunk.ReasoningContent) != "" { - if reasoningStreamID == "" { - reasoningStreamID = fmt.Sprintf("eino-reasoning-%s-%d", conversationID, atomic.AddInt64(&reasoningStreamSeq, 1)) - progress("thinking_stream_start", " ", map[string]interface{}{ - "streamId": reasoningStreamID, - "source": "eino", - "einoAgent": ev.AgentName, - "einoRole": einoRoleTag(ev.AgentName), - "orchestration": orchMode, + var reasoningDelta string + reasoningBuf, reasoningDelta = normalizeStreamingDelta(reasoningBuf, chunk.ReasoningContent) + if reasoningDelta != "" { + if reasoningStreamID == "" { + reasoningStreamID = fmt.Sprintf("eino-reasoning-%s-%d", conversationID, atomic.AddInt64(&reasoningStreamSeq, 1)) + progress("thinking_stream_start", " ", map[string]interface{}{ + "streamId": reasoningStreamID, + "source": "eino", + "einoAgent": ev.AgentName, + "einoRole": einoRoleTag(ev.AgentName), + "orchestration": orchMode, + }) + } + progress("thinking_stream_delta", reasoningDelta, map[string]interface{}{ + "streamId": reasoningStreamID, }) } - progress("thinking_stream_delta", chunk.ReasoningContent, map[string]interface{}{ - "streamId": reasoningStreamID, - }) } if chunk.Content != "" { if progress != nil && streamsMainAssistant(ev.AgentName) { - if !streamHeaderSent { - progress("response_start", "", map[string]interface{}{ - "conversationId": conversationID, - "mcpExecutionIds": snapshotMCPIDs(), - "messageGeneratedBy": "eino:" + ev.AgentName, - "einoRole": "orchestrator", - "einoAgent": ev.AgentName, - "orchestration": orchMode, + var contentDelta string + mainAssistantBuf, contentDelta = normalizeStreamingDelta(mainAssistantBuf, chunk.Content) + if contentDelta != "" { + if !streamHeaderSent { + progress("response_start", "", map[string]interface{}{ + "conversationId": conversationID, + "mcpExecutionIds": snapshotMCPIDs(), + "messageGeneratedBy": "eino:" + ev.AgentName, + "einoRole": "orchestrator", + "einoAgent": ev.AgentName, + "orchestration": orchMode, + }) + streamHeaderSent = true + } + progress("response_delta", contentDelta, map[string]interface{}{ + "conversationId": conversationID, + "mcpExecutionIds": snapshotMCPIDs(), + "einoRole": "orchestrator", + "einoAgent": ev.AgentName, + "orchestration": orchMode, }) - streamHeaderSent = true } - progress("response_delta", chunk.Content, map[string]interface{}{ - "conversationId": conversationID, - "mcpExecutionIds": snapshotMCPIDs(), - "einoRole": "orchestrator", - "einoAgent": ev.AgentName, - "orchestration": orchMode, - }) - mainAssistantBuf.WriteString(chunk.Content) } else if !streamsMainAssistant(ev.AgentName) { - if progress != nil { - if subReplyStreamID == "" { - subReplyStreamID = fmt.Sprintf("eino-sub-reply-%s-%d", conversationID, atomic.AddInt64(&einoSubReplyStreamSeq, 1)) - progress("eino_agent_reply_stream_start", "", map[string]interface{}{ + var subDelta string + subAssistantBuf, subDelta = normalizeStreamingDelta(subAssistantBuf, chunk.Content) + if subDelta != "" { + if progress != nil { + if subReplyStreamID == "" { + subReplyStreamID = fmt.Sprintf("eino-sub-reply-%s-%d", conversationID, atomic.AddInt64(&einoSubReplyStreamSeq, 1)) + progress("eino_agent_reply_stream_start", "", map[string]interface{}{ + "streamId": subReplyStreamID, + "einoAgent": ev.AgentName, + "einoRole": "sub", + "conversationId": conversationID, + "source": "eino", + }) + } + progress("eino_agent_reply_stream_delta", subDelta, map[string]interface{}{ "streamId": subReplyStreamID, - "einoAgent": ev.AgentName, - "einoRole": "sub", "conversationId": conversationID, - "source": "eino", }) } - progress("eino_agent_reply_stream_delta", chunk.Content, map[string]interface{}{ - "streamId": subReplyStreamID, - "conversationId": conversationID, - }) } - subAssistantBuf.WriteString(chunk.Content) } } if len(chunk.ToolCalls) > 0 { @@ -513,7 +558,7 @@ func runEinoADKAgentLoop(ctx context.Context, args *einoADKRunLoopArgs, baseMsgs } } if streamsMainAssistant(ev.AgentName) { - if s := strings.TrimSpace(mainAssistantBuf.String()); s != "" { + if s := strings.TrimSpace(mainAssistantBuf); s != "" { lastAssistant = s runAccumulatedMsgs = append(runAccumulatedMsgs, schema.AssistantMessage(s, nil)) if orchMode == "plan_execute" && strings.EqualFold(strings.TrimSpace(ev.AgentName), "executor") { @@ -521,8 +566,8 @@ func runEinoADKAgentLoop(ctx context.Context, args *einoADKRunLoopArgs, baseMsgs } } } - if subAssistantBuf.Len() > 0 && progress != nil { - if s := strings.TrimSpace(subAssistantBuf.String()); s != "" { + if strings.TrimSpace(subAssistantBuf) != "" && progress != nil { + if s := strings.TrimSpace(subAssistantBuf); s != "" { if subReplyStreamID != "" { progress("eino_agent_reply_stream_end", s, map[string]interface{}{ "streamId": subReplyStreamID,