package multiagent import ( "context" "encoding/json" "errors" "fmt" "io" "os" "path/filepath" "strings" "sync" "sync/atomic" "unicode/utf8" "cyberstrike-ai/internal/agent" "cyberstrike-ai/internal/config" "cyberstrike-ai/internal/einomcp" "cyberstrike-ai/internal/einoobserve" "cyberstrike-ai/internal/openai" "github.com/cloudwego/eino/adk" "github.com/cloudwego/eino/schema" "go.uber.org/zap" ) // normalizeStreamingDelta 将可能是“累计片段”的 chunk 归一化为“纯增量”。 // 一些模型/桥接层在流式过程中会重复发送已输出前缀,前端若直接 buffer+=chunk 会出现重复文本。 // // 注意:与 internal/openai.normalizeStreamingDelta 保持一致。 func normalizeStreamingDelta(current, incoming string) (next, delta string) { if incoming == "" { return current, "" } if current == "" { return incoming, incoming } if strings.HasPrefix(incoming, current) && len(incoming) > len(current) { return incoming, incoming[len(current):] } if incoming == current && utf8.RuneCountInString(current) > 1 { return current, "" } return current + incoming, incoming } func isInterruptContinue(ctx context.Context) bool { if ctx == nil { return false } return errors.Is(context.Cause(ctx), ErrInterruptContinue) } func isEinoIterationLimitError(err error) bool { if err == nil { return false } msg := strings.ToLower(strings.TrimSpace(err.Error())) if msg == "" { return false } return strings.Contains(msg, "max iteration") || strings.Contains(msg, "maximum iteration") || strings.Contains(msg, "maximum iterations") || strings.Contains(msg, "iteration limit") || strings.Contains(msg, "达到最大迭代") } // einoADKRunLoopArgs 将 Eino adk.Runner 事件循环从 RunDeepAgent / RunEinoSingleChatModelAgent 中抽出复用。 type einoADKRunLoopArgs struct { OrchMode string OrchestratorName string ConversationID string Progress func(eventType, message string, data interface{}) Logger *zap.Logger SnapshotMCPIDs func() []string StreamsMainAssistant func(agent string) bool EinoRoleTag func(agent string) string CheckpointDir string McpIDsMu *sync.Mutex McpIDs *[]string // FilesystemMonitorAgent / FilesystemMonitorRecord 非 nil 时,将 Eino ADK filesystem 中间件工具(ls/read_file/write_file/edit_file/glob/grep) // 在完成时写入 MCP 监控;execute 仍由 eino_execute_monitor 记录,此处跳过。 FilesystemMonitorAgent *agent.Agent FilesystemMonitorRecord einomcp.ExecutionRecorder // ToolInvokeNotify 与 einomcp.ToolsFromDefinitions 共享:run loop 在迭代前 Set,MCP 桥 Fire 以补全 tool_result。 ToolInvokeNotify *einomcp.ToolInvokeNotifyHolder DA adk.Agent // EmptyResponseMessage 当未捕获到助手正文时的占位(多代理与单代理文案不同)。 EmptyResponseMessage string // ModelFacingTrace 可选:由各 ChatModelAgent Handlers 链末尾中间件写入「即将送入模型」的消息快照; // 非空时优先用于 LastAgentTraceInput 序列化,使续跑与 summarization/reduction 后的上下文一致。 ModelFacingTrace *modelFacingTraceHolder // EinoCallbacks 可选:为 ADK Runner 注入 eino [callbacks] 全链路观测(见 internal/einoobserve)。 EinoCallbacks *config.MultiAgentEinoCallbacksConfig } func runEinoADKAgentLoop(ctx context.Context, args *einoADKRunLoopArgs, baseMsgs []adk.Message) (*RunResult, error) { if args == nil || args.DA == nil { return nil, fmt.Errorf("eino run loop: args 或 Agent 为空") } if args.McpIDs == nil { s := []string{} args.McpIDs = &s } if args.McpIDsMu == nil { args.McpIDsMu = &sync.Mutex{} } orchMode := args.OrchMode orchestratorName := args.OrchestratorName conversationID := args.ConversationID progress := args.Progress logger := args.Logger snapshotMCPIDs := args.SnapshotMCPIDs if snapshotMCPIDs == nil { snapshotMCPIDs = func() []string { return nil } } streamsMainAssistant := args.StreamsMainAssistant if streamsMainAssistant == nil { streamsMainAssistant = func(agent string) bool { return agent == "" || agent == orchestratorName } } einoRoleTag := args.EinoRoleTag if einoRoleTag == nil { einoRoleTag = func(agent string) string { if streamsMainAssistant(agent) { return "orchestrator" } return "sub" } } da := args.DA mcpIDsMu := args.McpIDsMu mcpIDs := args.McpIDs // panic recovery:防止 Eino 框架内部 panic 导致整个 goroutine 崩溃、连接无法正常关闭。 defer func() { if r := recover(); r != nil { if logger != nil { logger.Error("eino runner panic recovered", zap.Any("recover", r), zap.Stack("stack")) } if progress != nil { progress("error", fmt.Sprintf("Internal error: %v / 内部错误: %v", r, r), map[string]interface{}{ "conversationId": conversationID, "source": "eino", }) } } }() var lastAssistant string var lastPlanExecuteExecutor string msgs := append([]adk.Message(nil), baseMsgs...) runAccumulatedMsgs := append([]adk.Message(nil), msgs...) baseAccumulatedCount := len(runAccumulatedMsgs) emptyHint := strings.TrimSpace(args.EmptyResponseMessage) if emptyHint == "" { emptyHint = "(Eino session completed but no assistant text was captured. Check process details or logs.) " + "(Eino 会话已完成,但未捕获到助手文本输出。请查看过程详情或日志。)" } lastAssistant = "" lastPlanExecuteExecutor = "" var reasoningStreamSeq int64 var einoSubReplyStreamSeq int64 toolEmitSeen := make(map[string]struct{}) var einoMainRound int var einoLastAgent string subAgentToolStep := make(map[string]int) pendingByID := make(map[string]toolCallPendingInfo) pendingQueueByAgent := make(map[string][]string) markPending := func(tc toolCallPendingInfo) { if tc.ToolCallID == "" { return } pendingByID[tc.ToolCallID] = tc pendingQueueByAgent[tc.EinoAgent] = append(pendingQueueByAgent[tc.EinoAgent], tc.ToolCallID) } popNextPendingForAgent := func(agentName string) (toolCallPendingInfo, bool) { q := pendingQueueByAgent[agentName] for len(q) > 0 { id := q[0] q = q[1:] pendingQueueByAgent[agentName] = q if tc, ok := pendingByID[id]; ok { delete(pendingByID, id) return tc, true } } return toolCallPendingInfo{}, false } removePendingByID := func(toolCallID string) { if toolCallID == "" { return } delete(pendingByID, toolCallID) } flushAllPendingAsFailed := func(err error) { if progress == nil { pendingByID = make(map[string]toolCallPendingInfo) pendingQueueByAgent = make(map[string][]string) return } msg := "" if err != nil { msg = err.Error() } for _, tc := range pendingByID { toolName := tc.ToolName if strings.TrimSpace(toolName) == "" { toolName = "unknown" } progress("tool_result", fmt.Sprintf("工具结果 (%s)", toolName), map[string]interface{}{ "toolName": toolName, "success": false, "isError": true, "result": msg, "resultPreview": msg, "toolCallId": tc.ToolCallID, "conversationId": conversationID, "einoAgent": tc.EinoAgent, "einoRole": tc.EinoRole, "source": "eino", }) } pendingByID = make(map[string]toolCallPendingInfo) pendingQueueByAgent = make(map[string][]string) } // 最近一次成功的 Eino filesystem execute 的标准输出(trim):用于抑制模型紧接着复述同一字符串时的重复「助手输出」时间线。 var executeStdoutDupMu sync.Mutex var pendingExecuteStdoutDup string recordPendingExecuteStdoutDup := func(toolName, stdout string, isErr bool) { if isErr || !strings.EqualFold(strings.TrimSpace(toolName), "execute") { return } t := strings.TrimSpace(stdout) if t == "" { return } executeStdoutDupMu.Lock() pendingExecuteStdoutDup = t executeStdoutDupMu.Unlock() } var toolResultSent sync.Map // toolCallID -> struct{};与 ADK Tool 消息去重,避免 bridge 与事件流各推一次 if args.ToolInvokeNotify != nil { args.ToolInvokeNotify.Set(func(toolCallID, toolName, einoAgent string, success bool, content string, invokeErr error) { tid := strings.TrimSpace(toolCallID) removePendingByID(tid) if tid == "" || progress == nil { return } if _, loaded := toolResultSent.LoadOrStore(tid, struct{}{}); loaded { return } isErr := !success || invokeErr != nil body := content if invokeErr != nil { // 保留已流式累计的 stdout(如 execute 超时前的一半输出),避免 tool_result 只剩错误串、模型与 UI 丢失上下文 tail := friendlyEinoExecuteInvokeTail(invokeErr) // execute 流式包装可能已把超时句写入 content(供 ADK tool 与流式 delta);勿重复拼接 if tail != "" && strings.Contains(content, tail) { body = content } else if strings.TrimSpace(content) != "" { body = strings.TrimRight(content, "\n") + "\n\n" + tail } else { body = tail } isErr = true } recordPendingExecuteStdoutDup(toolName, body, isErr) preview := body if len(preview) > 200 { preview = preview[:200] + "..." } agentTag := strings.TrimSpace(einoAgent) if agentTag == "" { agentTag = orchestratorName } progress("tool_result", fmt.Sprintf("工具结果 (%s)", toolName), map[string]interface{}{ "toolName": toolName, "success": !isErr, "isError": isErr, "result": body, "resultPreview": preview, "toolCallId": tid, "conversationId": conversationID, "einoAgent": agentTag, "einoRole": einoRoleTag(agentTag), "source": "eino", }) }) } if args.EinoCallbacks != nil { ctx = einoobserve.AttachAgentRunCallbacks(ctx, args.EinoCallbacks, einoobserve.Params{ Logger: logger, Progress: progress, ConversationID: conversationID, OrchMode: orchMode, OrchestratorName: orchestratorName, }) } runnerCfg := adk.RunnerConfig{ Agent: da, EnableStreaming: true, } var cpStore *fileCheckPointStore var checkPointID string if cp := strings.TrimSpace(args.CheckpointDir); cp != "" { cpDir := filepath.Join(cp, sanitizeEinoPathSegment(conversationID)) st, stErr := newFileCheckPointStore(cpDir) if stErr != nil { if logger != nil { logger.Warn("eino checkpoint store disabled", zap.String("dir", cpDir), zap.Error(stErr)) } } else { cpStore = st checkPointID = buildEinoCheckpointID(orchMode) runnerCfg.CheckPointStore = st if logger != nil { logger.Info("eino runner: checkpoint store enabled", zap.String("dir", cpDir), zap.String("checkPointID", checkPointID)) } } } runner := adk.NewRunner(ctx, runnerCfg) var iter *adk.AsyncIterator[*adk.AgentEvent] if cpStore != nil && checkPointID != "" { if _, existed, getErr := cpStore.Get(ctx, checkPointID); getErr != nil { if logger != nil { logger.Warn("eino checkpoint preflight get failed", zap.String("checkPointID", checkPointID), zap.Error(getErr)) } } else if existed { if progress != nil { progress("progress", "检测到断点,正在从中断节点恢复执行...", map[string]interface{}{ "conversationId": conversationID, "source": "eino", "orchestration": orchMode, "checkPointID": checkPointID, }) } if logger != nil { logger.Info("eino runner: resume from checkpoint", zap.String("checkPointID", checkPointID)) } resumeIter, resumeErr := runner.Resume(ctx, checkPointID) if resumeErr == nil { iter = resumeIter } else { if logger != nil { logger.Warn("eino runner: resume failed, fallback to fresh run", zap.String("checkPointID", checkPointID), zap.Error(resumeErr)) } if progress != nil { progress("progress", "断点恢复失败,已回退为全新执行。", map[string]interface{}{ "conversationId": conversationID, "source": "eino", "orchestration": orchMode, "checkPointID": checkPointID, }) } } } } if iter == nil { if checkPointID != "" { iter = runner.Run(ctx, msgs, adk.WithCheckPointID(checkPointID)) } else { iter = runner.Run(ctx, msgs) } } handleRunErr := func(runErr error) error { if runErr == nil { return nil } if errors.Is(runErr, context.DeadlineExceeded) { flushAllPendingAsFailed(runErr) if progress != nil { progress("error", runErr.Error(), map[string]interface{}{ "conversationId": conversationID, "source": "eino", "errorKind": "timeout", }) } return runErr } // context.Canceled 是唯一应当直接终止编排的错误(用户关闭页面、主动停止等)。 if errors.Is(runErr, context.Canceled) { flushAllPendingAsFailed(runErr) if progress != nil { progress("error", runErr.Error(), map[string]interface{}{ "conversationId": conversationID, "source": "eino", }) } return runErr } if isEinoIterationLimitError(runErr) { flushAllPendingAsFailed(runErr) if progress != nil { progress("iteration_limit_reached", runErr.Error(), map[string]interface{}{ "conversationId": conversationID, "source": "eino", "orchestration": orchMode, }) progress("error", runErr.Error(), map[string]interface{}{ "conversationId": conversationID, "source": "eino", "errorKind": "iteration_limit", }) } return runErr } flushAllPendingAsFailed(runErr) if progress != nil { progress("error", runErr.Error(), map[string]interface{}{ "conversationId": conversationID, "source": "eino", }) } return runErr } takePartial := func(runErr error) (*RunResult, error) { if len(runAccumulatedMsgs) <= baseAccumulatedCount { return nil, runErr } ids := snapshotMCPIDs() return buildEinoRunResultFromAccumulated( orchMode, runAccumulatedMsgs, persistTraceSource(args, runAccumulatedMsgs), lastAssistant, lastPlanExecuteExecutor, emptyHint, ids, true, ), runErr } for { // 检测 context 取消(用户关闭浏览器、请求超时等),flush pending 工具状态避免 UI 卡在 "执行中"。 select { case <-ctx.Done(): flushAllPendingAsFailed(ctx.Err()) if progress != nil { if isInterruptContinue(ctx) { progress("progress", "已暂停当前输出,正在合并用户补充并继续…", map[string]interface{}{ "conversationId": conversationID, "source": "eino", "kind": "interrupt_continue", }) } else { progress("error", "Request cancelled / 请求已取消", map[string]interface{}{ "conversationId": conversationID, "source": "eino", }) } } return takePartial(ctx.Err()) default: } ev, ok := iter.Next() if !ok { // iter 结束并不总是“正常完成”: // 当取消/超时发生在 iter.Next() 阻塞期间时,可能直接返回 !ok。 // 此时必须保留 checkpoint,避免后续恢复时被误判为“无断点”而全量重跑。 if ctxErr := ctx.Err(); ctxErr != nil { flushAllPendingAsFailed(ctxErr) if progress != nil { if isInterruptContinue(ctx) { progress("progress", "已暂停当前输出,正在合并用户补充并继续…", map[string]interface{}{ "conversationId": conversationID, "source": "eino", "kind": "interrupt_continue", }) } else { progress("error", ctxErr.Error(), map[string]interface{}{ "conversationId": conversationID, "source": "eino", }) } } return takePartial(ctxErr) } if len(pendingByID) > 0 { orphanCount := len(pendingByID) flushAllPendingAsFailed(errors.New("pending tool call missing result before run completion")) if progress != nil { progress("eino_pending_orphaned", "pending tool calls were force-closed at run end", map[string]interface{}{ "conversationId": conversationID, "source": "eino", "orchestration": orchMode, "pendingCount": orphanCount, }) } } if cpStore != nil && checkPointID != "" { if p, pErr := cpStore.path(checkPointID); pErr == nil { if rmErr := os.Remove(p); rmErr != nil && !os.IsNotExist(rmErr) && logger != nil { logger.Warn("eino checkpoint cleanup failed", zap.String("path", p), zap.Error(rmErr)) } } } break } if ev == nil { continue } if ev.Err != nil { if retErr := handleRunErr(ev.Err); retErr != nil { return takePartial(retErr) } } if ev.AgentName != "" && progress != nil { iterEinoAgent := orchestratorName if orchMode == "plan_execute" { if a := strings.TrimSpace(ev.AgentName); a != "" { iterEinoAgent = a } } if streamsMainAssistant(ev.AgentName) { if einoMainRound == 0 { einoMainRound = 1 progress("iteration", "", map[string]interface{}{ "iteration": 1, "einoScope": "main", "einoRole": "orchestrator", "einoAgent": iterEinoAgent, "orchestration": orchMode, "conversationId": conversationID, "source": "eino", }) } else if einoLastAgent != "" && !streamsMainAssistant(einoLastAgent) { einoMainRound++ progress("iteration", "", map[string]interface{}{ "iteration": einoMainRound, "einoScope": "main", "einoRole": "orchestrator", "einoAgent": iterEinoAgent, "orchestration": orchMode, "conversationId": conversationID, "source": "eino", }) } } einoLastAgent = ev.AgentName progress("progress", fmt.Sprintf("[Eino] %s", ev.AgentName), map[string]interface{}{ "conversationId": conversationID, "einoAgent": ev.AgentName, "einoRole": einoRoleTag(ev.AgentName), "orchestration": orchMode, }) } if ev.Output == nil || ev.Output.MessageOutput == nil { continue } mv := ev.Output.MessageOutput if mv.IsStreaming && mv.MessageStream != nil { streamHeaderSent := false var reasoningStreamID string var toolStreamFragments []schema.ToolCall var subAssistantBuf string var subReplyStreamID string var mainAssistantBuf string // 已通过 response_delta 推到前端的正文(与 monitor.js normalizeStreamingDeltaJs 累积一致) var mainAssistWireAccum string var mainAssistDupTarget string // 非空表示本段主助手流需缓冲至 EOF,与 execute 输出比对去重 var reasoningBuf string var prevReasoningDisplay string // UI 用:剥离 Claude 内部 signature 尾缀后的累计展示 var streamRecvErr error type streamMsg struct { chunk *schema.Message err error } recvCh := make(chan streamMsg, 8) go func() { defer close(recvCh) for { ch, rerr := mv.MessageStream.Recv() recvCh <- streamMsg{chunk: ch, err: rerr} if rerr != nil { return } } }() streamRecvLoop: for { select { case <-ctx.Done(): streamRecvErr = ctx.Err() break streamRecvLoop case sm, ok := <-recvCh: if !ok { break streamRecvLoop } chunk, rerr := sm.chunk, sm.err if rerr != nil { if errors.Is(rerr, io.EOF) { break streamRecvLoop } if logger != nil { logger.Warn("eino stream recv error, flushing incomplete stream", zap.Error(rerr), zap.String("agent", ev.AgentName), zap.Int("toolFragments", len(toolStreamFragments))) } streamRecvErr = rerr break streamRecvLoop } if chunk == nil { continue } if progress != nil && strings.TrimSpace(chunk.ReasoningContent) != "" { var reasoningDelta string reasoningBuf, reasoningDelta = normalizeStreamingDelta(reasoningBuf, chunk.ReasoningContent) if reasoningDelta != "" { fullDisplay := openai.DisplayReasoningContent(reasoningBuf) var displayDelta string if strings.HasPrefix(fullDisplay, prevReasoningDisplay) { displayDelta = fullDisplay[len(prevReasoningDisplay):] } else { displayDelta = fullDisplay } prevReasoningDisplay = fullDisplay if displayDelta != "" { if reasoningStreamID == "" { reasoningStreamID = fmt.Sprintf("eino-reasoning-%s-%d", conversationID, atomic.AddInt64(&reasoningStreamSeq, 1)) progress("reasoning_chain_stream_start", " ", map[string]interface{}{ "streamId": reasoningStreamID, "source": "eino", "einoAgent": ev.AgentName, "einoRole": einoRoleTag(ev.AgentName), "orchestration": orchMode, }) } progress("reasoning_chain_stream_delta", displayDelta, map[string]interface{}{ "streamId": reasoningStreamID, }) } } } if chunk.Content != "" { if progress != nil && streamsMainAssistant(ev.AgentName) { var contentDelta string mainAssistantBuf, contentDelta = normalizeStreamingDelta(mainAssistantBuf, chunk.Content) if contentDelta != "" { if mainAssistDupTarget == "" { executeStdoutDupMu.Lock() if pendingExecuteStdoutDup != "" { mainAssistDupTarget = pendingExecuteStdoutDup } executeStdoutDupMu.Unlock() } if mainAssistDupTarget != "" { // 已展示过 tool_result,缓冲全文;EOF 后与 execute 输出相同则不再发助手流 } else { if !streamHeaderSent { progress("response_start", "", map[string]interface{}{ "conversationId": conversationID, "mcpExecutionIds": snapshotMCPIDs(), "messageGeneratedBy": "eino:" + ev.AgentName, "einoRole": "orchestrator", "einoAgent": ev.AgentName, "orchestration": orchMode, }) streamHeaderSent = true } progress("response_delta", contentDelta, map[string]interface{}{ "conversationId": conversationID, "mcpExecutionIds": snapshotMCPIDs(), "einoRole": "orchestrator", "einoAgent": ev.AgentName, "orchestration": orchMode, }) mainAssistWireAccum, _ = normalizeStreamingDelta(mainAssistWireAccum, contentDelta) } } } else if !streamsMainAssistant(ev.AgentName) { var subDelta string subAssistantBuf, subDelta = normalizeStreamingDelta(subAssistantBuf, chunk.Content) if subDelta != "" { if progress != nil { if subReplyStreamID == "" { subReplyStreamID = fmt.Sprintf("eino-sub-reply-%s-%d", conversationID, atomic.AddInt64(&einoSubReplyStreamSeq, 1)) progress("eino_agent_reply_stream_start", "", map[string]interface{}{ "streamId": subReplyStreamID, "einoAgent": ev.AgentName, "einoRole": "sub", "conversationId": conversationID, "source": "eino", }) } progress("eino_agent_reply_stream_delta", subDelta, map[string]interface{}{ "streamId": subReplyStreamID, "conversationId": conversationID, }) } } } } if len(chunk.ToolCalls) > 0 { toolStreamFragments = append(toolStreamFragments, chunk.ToolCalls...) } } } if streamsMainAssistant(ev.AgentName) { s := strings.TrimSpace(mainAssistantBuf) if mainAssistDupTarget != "" { executeStdoutDupMu.Lock() pendingExecuteStdoutDup = "" executeStdoutDupMu.Unlock() if s != "" && s == mainAssistDupTarget { // 与刚展示的 execute 结果完全一致:不再发助手流式事件,仍写入轨迹与最终回复字段 lastAssistant = s runAccumulatedMsgs = append(runAccumulatedMsgs, schema.AssistantMessage(s, nil)) if orchMode == "plan_execute" && strings.EqualFold(strings.TrimSpace(ev.AgentName), "executor") { lastPlanExecuteExecutor = UnwrapPlanExecuteUserText(s) } } else if s != "" { if progress != nil { // 仅用 TrimSpace 与 execute 比对;推到 UI 的必须是 mainAssistantBuf, // 否则尾部空白/换行与已流式前缀不一致时,前端 normalize 会走拼接路径造成叠字。 _, eofTail := normalizeStreamingDelta(mainAssistWireAccum, mainAssistantBuf) if eofTail != "" { if !streamHeaderSent { progress("response_start", "", map[string]interface{}{ "conversationId": conversationID, "mcpExecutionIds": snapshotMCPIDs(), "messageGeneratedBy": "eino:" + ev.AgentName, "einoRole": "orchestrator", "einoAgent": ev.AgentName, "orchestration": orchMode, }) } progress("response_delta", eofTail, map[string]interface{}{ "conversationId": conversationID, "mcpExecutionIds": snapshotMCPIDs(), "einoRole": "orchestrator", "einoAgent": ev.AgentName, "orchestration": orchMode, }) mainAssistWireAccum, _ = normalizeStreamingDelta(mainAssistWireAccum, eofTail) } } lastAssistant = s runAccumulatedMsgs = append(runAccumulatedMsgs, schema.AssistantMessage(s, nil)) if orchMode == "plan_execute" && strings.EqualFold(strings.TrimSpace(ev.AgentName), "executor") { lastPlanExecuteExecutor = UnwrapPlanExecuteUserText(s) } } } else if s != "" { lastAssistant = s runAccumulatedMsgs = append(runAccumulatedMsgs, schema.AssistantMessage(s, nil)) if orchMode == "plan_execute" && strings.EqualFold(strings.TrimSpace(ev.AgentName), "executor") { lastPlanExecuteExecutor = UnwrapPlanExecuteUserText(s) } } } if strings.TrimSpace(subAssistantBuf) != "" && progress != nil { if s := strings.TrimSpace(subAssistantBuf); s != "" { if subReplyStreamID != "" { progress("eino_agent_reply_stream_end", s, map[string]interface{}{ "streamId": subReplyStreamID, "einoAgent": ev.AgentName, "einoRole": "sub", "conversationId": conversationID, "source": "eino", }) } else { progress("eino_agent_reply", s, map[string]interface{}{ "conversationId": conversationID, "einoAgent": ev.AgentName, "einoRole": "sub", "source": "eino", }) } } } var lastToolChunk *schema.Message if merged := mergeStreamingToolCallFragments(toolStreamFragments); len(merged) > 0 { lastToolChunk = mergeMessageToolCalls(&schema.Message{ToolCalls: merged}) } tryEmitToolCallsOnce(lastToolChunk, ev.AgentName, orchestratorName, conversationID, progress, toolEmitSeen, subAgentToolStep, markPending) // 流式路径此前只把 tool_calls 推给进度 UI,未写入 runAccumulatedMsgs;落库后 loadHistory→RepairOrphan 会删掉全部 tool 结果,表现为「续跑/下轮失忆」。 if lastToolChunk != nil && len(lastToolChunk.ToolCalls) > 0 { runAccumulatedMsgs = append(runAccumulatedMsgs, schema.AssistantMessage("", lastToolChunk.ToolCalls)) } if streamRecvErr != nil { if isInterruptContinue(ctx) { return takePartial(streamRecvErr) } if progress != nil { progress("eino_stream_error", streamRecvErr.Error(), map[string]interface{}{ "conversationId": conversationID, "source": "eino", "einoAgent": ev.AgentName, "einoRole": einoRoleTag(ev.AgentName), }) } if retErr := handleRunErr(streamRecvErr); retErr != nil { return takePartial(retErr) } } continue } msg, gerr := mv.GetMessage() if gerr != nil || msg == nil { continue } runAccumulatedMsgs = append(runAccumulatedMsgs, msg) tryEmitToolCallsOnce(mergeMessageToolCalls(msg), ev.AgentName, orchestratorName, conversationID, progress, toolEmitSeen, subAgentToolStep, markPending) if mv.Role == schema.Assistant { if progress != nil && strings.TrimSpace(msg.ReasoningContent) != "" { progress("reasoning_chain", openai.DisplayReasoningContent(strings.TrimSpace(msg.ReasoningContent)), map[string]interface{}{ "conversationId": conversationID, "source": "eino", "einoAgent": ev.AgentName, "einoRole": einoRoleTag(ev.AgentName), "orchestration": orchMode, }) } body := strings.TrimSpace(msg.Content) if body != "" { if streamsMainAssistant(ev.AgentName) { executeStdoutDupMu.Lock() dup := pendingExecuteStdoutDup if dup != "" && body == dup { pendingExecuteStdoutDup = "" executeStdoutDupMu.Unlock() lastAssistant = body if orchMode == "plan_execute" && strings.EqualFold(strings.TrimSpace(ev.AgentName), "executor") { lastPlanExecuteExecutor = UnwrapPlanExecuteUserText(body) } // 非流式:与 execute 输出相同则跳过助手通道展示(msg 已在上方写入 runAccumulatedMsgs) } else { if dup != "" { pendingExecuteStdoutDup = "" } executeStdoutDupMu.Unlock() if progress != nil { progress("response_start", "", map[string]interface{}{ "conversationId": conversationID, "mcpExecutionIds": snapshotMCPIDs(), "messageGeneratedBy": "eino:" + ev.AgentName, "einoRole": "orchestrator", "einoAgent": ev.AgentName, "orchestration": orchMode, }) progress("response_delta", body, map[string]interface{}{ "conversationId": conversationID, "mcpExecutionIds": snapshotMCPIDs(), "einoRole": "orchestrator", "einoAgent": ev.AgentName, "orchestration": orchMode, }) } lastAssistant = body if orchMode == "plan_execute" && strings.EqualFold(strings.TrimSpace(ev.AgentName), "executor") { lastPlanExecuteExecutor = UnwrapPlanExecuteUserText(body) } } } else if progress != nil { progress("eino_agent_reply", body, map[string]interface{}{ "conversationId": conversationID, "einoAgent": ev.AgentName, "einoRole": "sub", "source": "eino", }) } } } if mv.Role == schema.Tool && progress != nil { toolName := msg.ToolName if toolName == "" { toolName = mv.ToolName } content := msg.Content isErr := false if strings.HasPrefix(content, einomcp.ToolErrorPrefix) { isErr = true content = strings.TrimPrefix(content, einomcp.ToolErrorPrefix) } preview := content if len(preview) > 200 { preview = preview[:200] + "..." } data := map[string]interface{}{ "toolName": toolName, "success": !isErr, "isError": isErr, "result": content, "resultPreview": preview, "conversationId": conversationID, "einoAgent": ev.AgentName, "einoRole": einoRoleTag(ev.AgentName), "source": "eino", } toolCallID := strings.TrimSpace(msg.ToolCallID) if toolCallID == "" { if inferred, ok := popNextPendingForAgent(ev.AgentName); ok { toolCallID = inferred.ToolCallID } else if inferred, ok := popNextPendingForAgent(orchestratorName); ok { toolCallID = inferred.ToolCallID } else if inferred, ok := popNextPendingForAgent(""); ok { toolCallID = inferred.ToolCallID } else { for id := range pendingByID { toolCallID = id delete(pendingByID, id) break } } } if toolCallID != "" { removePendingByID(toolCallID) if _, loaded := toolResultSent.LoadOrStore(toolCallID, struct{}{}); loaded { // ToolInvokeNotify 可能已推过 tool_result(如 execute 流式包装里 Fire 仅携带截断后的 stdout), // 此处仍应用 ADK Tool 消息中的完整内容刷新去重基准,避免模型复述全文时与截断串比对失败而重复展示「助手输出」。 recordPendingExecuteStdoutDup(toolName, content, isErr) continue } data["toolCallId"] = toolCallID } recordPendingExecuteStdoutDup(toolName, content, isErr) recordEinoADKFilesystemToolMonitor(args.FilesystemMonitorAgent, args.FilesystemMonitorRecord, toolName, toolCallID, runAccumulatedMsgs, content, isErr) progress("tool_result", fmt.Sprintf("工具结果 (%s)", toolName), data) } } mcpIDsMu.Lock() ids := append([]string(nil), *mcpIDs...) mcpIDsMu.Unlock() out := buildEinoRunResultFromAccumulated( orchMode, runAccumulatedMsgs, persistTraceSource(args, runAccumulatedMsgs), lastAssistant, lastPlanExecuteExecutor, emptyHint, ids, false, ) return out, nil } func persistTraceSource(args *einoADKRunLoopArgs, fallback []adk.Message) []adk.Message { if args != nil && args.ModelFacingTrace != nil { if snap := args.ModelFacingTrace.Snapshot(); len(snap) > 0 { return snap } } return fallback } func einoPartialRunLastOutputHint() string { return "[执行未正常结束(用户停止、超时或异常)。续跑时请基于上文已产生的工具与结果继续,勿重复已完成步骤。]\n" + "[Run ended abnormally; continue from the trace above without repeating completed steps.]" } // friendlyEinoExecuteInvokeTail 将 Eino execute 等非 MCP 路径的结尾错误转成简短提示;其它情况保留原 error 文本。 func friendlyEinoExecuteInvokeTail(invokeErr error) string { if invokeErr == nil { return "" } if errors.Is(invokeErr, context.DeadlineExceeded) { return einoExecuteTimeoutUserHint() } return "[执行未正常结束] " + invokeErr.Error() } func buildEinoRunResultFromAccumulated( orchMode string, runAccumulatedMsgs []adk.Message, persistMsgs []adk.Message, lastAssistant string, lastPlanExecuteExecutor string, emptyHint string, mcpIDs []string, partial bool, ) *RunResult { traceForJSON := persistMsgs if len(traceForJSON) == 0 { traceForJSON = runAccumulatedMsgs } histJSON, _ := json.Marshal(traceForJSON) cleaned := strings.TrimSpace(lastAssistant) if orchMode == "plan_execute" { if e := strings.TrimSpace(lastPlanExecuteExecutor); e != "" { cleaned = e } else { cleaned = UnwrapPlanExecuteUserText(cleaned) } } if cleaned == "" { if fb := strings.TrimSpace(einoExtractFallbackAssistantFromMsgs(runAccumulatedMsgs)); fb != "" { cleaned = fb } } cleaned = dedupeRepeatedParagraphs(cleaned, 80) cleaned = dedupeParagraphsByLineFingerprint(cleaned, 100) // 防止超长响应导致 JSON 序列化慢或 OOM(多代理拼接大量工具输出时可能触发)。 const maxResponseRunes = 100000 if rs := []rune(cleaned); len(rs) > maxResponseRunes { cleaned = string(rs[:maxResponseRunes]) + "\n\n... (response truncated / 响应已截断)" } lastOut := cleaned resp := cleaned if partial && cleaned == "" { lastOut = einoPartialRunLastOutputHint() resp = emptyHint } out := &RunResult{ Response: resp, MCPExecutionIDs: mcpIDs, LastAgentTraceInput: string(histJSON), LastAgentTraceOutput: lastOut, } if !partial && out.Response == "" { out.Response = emptyHint out.LastAgentTraceOutput = out.Response } return out } // einoExtractFallbackAssistantFromMsgs 在「主通道未产出助手正文」时,从 Eino ADK 轨迹中回填用户可见回复。 // 典型场景:监督者仅调用 exit(final_result 落在 Tool 消息中),或工具结果已写入历史但 lastAssistant 未更新。 // // 优先级:最后一次 exit 工具输出 → 最后一条含 exit 的助手 tool_calls 参数中的 final_result。 func einoExtractFallbackAssistantFromMsgs(msgs []adk.Message) string { for i := len(msgs) - 1; i >= 0; i-- { m := msgs[i] if m == nil || m.Role != schema.Tool { continue } if !strings.EqualFold(strings.TrimSpace(m.ToolName), adk.ToolInfoExit.Name) { continue } content := strings.TrimSpace(m.Content) if content == "" || strings.HasPrefix(content, einomcp.ToolErrorPrefix) { continue } return content } for i := len(msgs) - 1; i >= 0; i-- { m := msgs[i] if m == nil || m.Role != schema.Assistant { continue } if s := einoExtractExitFinalFromAssistantToolCalls(m); s != "" { return s } } return "" } func einoExtractExitFinalFromAssistantToolCalls(msg *schema.Message) string { if msg == nil || len(msg.ToolCalls) == 0 { return "" } for i := len(msg.ToolCalls) - 1; i >= 0; i-- { tc := msg.ToolCalls[i] if !strings.EqualFold(strings.TrimSpace(tc.Function.Name), adk.ToolInfoExit.Name) { continue } if s := einoParseExitFinalResultArguments(tc.Function.Arguments); s != "" { return s } } return "" } func einoParseExitFinalResultArguments(arguments string) string { arguments = strings.TrimSpace(arguments) if arguments == "" { return "" } var wrap struct { FinalResult json.RawMessage `json:"final_result"` } if err := json.Unmarshal([]byte(arguments), &wrap); err != nil || len(wrap.FinalResult) == 0 { return "" } var s string if err := json.Unmarshal(wrap.FinalResult, &s); err == nil { return strings.TrimSpace(s) } var anyVal interface{} if err := json.Unmarshal(wrap.FinalResult, &anyVal); err != nil { return "" } b, err := json.Marshal(anyVal) if err != nil { return "" } return strings.TrimSpace(string(b)) } func buildEinoCheckpointID(orchMode string) string { mode := sanitizeEinoPathSegment(strings.TrimSpace(orchMode)) if mode == "" { mode = "default" } return "runner-" + mode }