Add files via upload

This commit is contained in:
公明
2026-05-08 22:32:21 +08:00
committed by GitHub
parent a1ceb9c108
commit 0743086873
+90 -45
View File
@@ -19,6 +19,40 @@ import (
"go.uber.org/zap"
)
// normalizeStreamingDelta 将可能是“累计片段”的 chunk 归一化为“纯增量”。
// 一些模型/桥接层在流式过程中会重复发送已输出前缀,前端若直接 buffer+=chunk 会出现“结巴”重复。
func normalizeStreamingDelta(current, incoming string) (next, delta string) {
if incoming == "" {
return current, ""
}
if current == "" {
return incoming, incoming
}
if incoming == current {
return current, ""
}
// incoming 是累计全文(包含 current 前缀)
if strings.HasPrefix(incoming, current) {
return incoming, incoming[len(current):]
}
// incoming 完全是已输出尾部重发
if strings.HasSuffix(current, incoming) {
return current, ""
}
// 处理边界重叠:current 后缀与 incoming 前缀重叠,只追加非重叠部分。
max := len(current)
if len(incoming) < max {
max = len(incoming)
}
for overlap := max; overlap > 0; overlap-- {
if current[len(current)-overlap:] == incoming[:overlap] {
return current + incoming[overlap:], incoming[overlap:]
}
}
return current + incoming, incoming
}
func isEinoIterationLimitError(err error) bool {
if err == nil {
return false
@@ -430,9 +464,10 @@ func runEinoADKAgentLoop(ctx context.Context, args *einoADKRunLoopArgs, baseMsgs
streamHeaderSent := false
var reasoningStreamID string
var toolStreamFragments []schema.ToolCall
var subAssistantBuf strings.Builder
var subAssistantBuf string
var subReplyStreamID string
var mainAssistantBuf strings.Builder
var mainAssistantBuf string
var reasoningBuf string
var streamRecvErr error
for {
chunk, rerr := mv.MessageStream.Recv()
@@ -453,59 +488,69 @@ func runEinoADKAgentLoop(ctx context.Context, args *einoADKRunLoopArgs, baseMsgs
continue
}
if progress != nil && strings.TrimSpace(chunk.ReasoningContent) != "" {
if reasoningStreamID == "" {
reasoningStreamID = fmt.Sprintf("eino-reasoning-%s-%d", conversationID, atomic.AddInt64(&reasoningStreamSeq, 1))
progress("thinking_stream_start", " ", map[string]interface{}{
"streamId": reasoningStreamID,
"source": "eino",
"einoAgent": ev.AgentName,
"einoRole": einoRoleTag(ev.AgentName),
"orchestration": orchMode,
var reasoningDelta string
reasoningBuf, reasoningDelta = normalizeStreamingDelta(reasoningBuf, chunk.ReasoningContent)
if reasoningDelta != "" {
if reasoningStreamID == "" {
reasoningStreamID = fmt.Sprintf("eino-reasoning-%s-%d", conversationID, atomic.AddInt64(&reasoningStreamSeq, 1))
progress("thinking_stream_start", " ", map[string]interface{}{
"streamId": reasoningStreamID,
"source": "eino",
"einoAgent": ev.AgentName,
"einoRole": einoRoleTag(ev.AgentName),
"orchestration": orchMode,
})
}
progress("thinking_stream_delta", reasoningDelta, map[string]interface{}{
"streamId": reasoningStreamID,
})
}
progress("thinking_stream_delta", chunk.ReasoningContent, map[string]interface{}{
"streamId": reasoningStreamID,
})
}
if chunk.Content != "" {
if progress != nil && streamsMainAssistant(ev.AgentName) {
if !streamHeaderSent {
progress("response_start", "", map[string]interface{}{
"conversationId": conversationID,
"mcpExecutionIds": snapshotMCPIDs(),
"messageGeneratedBy": "eino:" + ev.AgentName,
"einoRole": "orchestrator",
"einoAgent": ev.AgentName,
"orchestration": orchMode,
var contentDelta string
mainAssistantBuf, contentDelta = normalizeStreamingDelta(mainAssistantBuf, chunk.Content)
if contentDelta != "" {
if !streamHeaderSent {
progress("response_start", "", map[string]interface{}{
"conversationId": conversationID,
"mcpExecutionIds": snapshotMCPIDs(),
"messageGeneratedBy": "eino:" + ev.AgentName,
"einoRole": "orchestrator",
"einoAgent": ev.AgentName,
"orchestration": orchMode,
})
streamHeaderSent = true
}
progress("response_delta", contentDelta, map[string]interface{}{
"conversationId": conversationID,
"mcpExecutionIds": snapshotMCPIDs(),
"einoRole": "orchestrator",
"einoAgent": ev.AgentName,
"orchestration": orchMode,
})
streamHeaderSent = true
}
progress("response_delta", chunk.Content, map[string]interface{}{
"conversationId": conversationID,
"mcpExecutionIds": snapshotMCPIDs(),
"einoRole": "orchestrator",
"einoAgent": ev.AgentName,
"orchestration": orchMode,
})
mainAssistantBuf.WriteString(chunk.Content)
} else if !streamsMainAssistant(ev.AgentName) {
if progress != nil {
if subReplyStreamID == "" {
subReplyStreamID = fmt.Sprintf("eino-sub-reply-%s-%d", conversationID, atomic.AddInt64(&einoSubReplyStreamSeq, 1))
progress("eino_agent_reply_stream_start", "", map[string]interface{}{
var subDelta string
subAssistantBuf, subDelta = normalizeStreamingDelta(subAssistantBuf, chunk.Content)
if subDelta != "" {
if progress != nil {
if subReplyStreamID == "" {
subReplyStreamID = fmt.Sprintf("eino-sub-reply-%s-%d", conversationID, atomic.AddInt64(&einoSubReplyStreamSeq, 1))
progress("eino_agent_reply_stream_start", "", map[string]interface{}{
"streamId": subReplyStreamID,
"einoAgent": ev.AgentName,
"einoRole": "sub",
"conversationId": conversationID,
"source": "eino",
})
}
progress("eino_agent_reply_stream_delta", subDelta, map[string]interface{}{
"streamId": subReplyStreamID,
"einoAgent": ev.AgentName,
"einoRole": "sub",
"conversationId": conversationID,
"source": "eino",
})
}
progress("eino_agent_reply_stream_delta", chunk.Content, map[string]interface{}{
"streamId": subReplyStreamID,
"conversationId": conversationID,
})
}
subAssistantBuf.WriteString(chunk.Content)
}
}
if len(chunk.ToolCalls) > 0 {
@@ -513,7 +558,7 @@ func runEinoADKAgentLoop(ctx context.Context, args *einoADKRunLoopArgs, baseMsgs
}
}
if streamsMainAssistant(ev.AgentName) {
if s := strings.TrimSpace(mainAssistantBuf.String()); s != "" {
if s := strings.TrimSpace(mainAssistantBuf); s != "" {
lastAssistant = s
runAccumulatedMsgs = append(runAccumulatedMsgs, schema.AssistantMessage(s, nil))
if orchMode == "plan_execute" && strings.EqualFold(strings.TrimSpace(ev.AgentName), "executor") {
@@ -521,8 +566,8 @@ func runEinoADKAgentLoop(ctx context.Context, args *einoADKRunLoopArgs, baseMsgs
}
}
}
if subAssistantBuf.Len() > 0 && progress != nil {
if s := strings.TrimSpace(subAssistantBuf.String()); s != "" {
if strings.TrimSpace(subAssistantBuf) != "" && progress != nil {
if s := strings.TrimSpace(subAssistantBuf); s != "" {
if subReplyStreamID != "" {
progress("eino_agent_reply_stream_end", s, map[string]interface{}{
"streamId": subReplyStreamID,