Files
CyberStrikeAI/internal/multiagent/eino_adk_run_loop.go
T
2026-04-28 11:40:09 +08:00

736 lines
23 KiB
Go

package multiagent
import (
"context"
"encoding/json"
"errors"
"fmt"
"io"
"os"
"path/filepath"
"strings"
"sync"
"sync/atomic"
"cyberstrike-ai/internal/einomcp"
"github.com/cloudwego/eino/adk"
"github.com/cloudwego/eino/schema"
"go.uber.org/zap"
)
// isEinoIterationLimitError reports whether err's message looks like an agent
// "max iterations reached" failure. The check is a case-insensitive substring
// search over several English phrasings plus the Chinese "达到最大迭代"
// ("reached the maximum iteration"). A nil error, or one whose message is blank
// after trimming, never matches.
func isEinoIterationLimitError(err error) bool {
	if err == nil {
		return false
	}
	text := strings.ToLower(strings.TrimSpace(err.Error()))
	if text == "" {
		return false
	}
	needles := []string{
		"max iteration",
		"maximum iteration",
		"maximum iterations",
		"iteration limit",
		"达到最大迭代",
	}
	for _, needle := range needles {
		if strings.Contains(text, needle) {
			return true
		}
	}
	return false
}
// einoADKRunLoopArgs carries the inputs of runEinoADKAgentLoop, which factors the Eino
// adk.Runner event loop out of RunDeepAgent / RunEinoSingleChatModelAgent so both can reuse it.
type einoADKRunLoopArgs struct {
	// OrchMode is the orchestration mode tag forwarded in progress events and used to build
	// the checkpoint ID; the loop special-cases "plan_execute".
	OrchMode string
	// OrchestratorName is the main agent's name; by default its events are treated as the
	// user-facing (orchestrator) output.
	OrchestratorName string
	// ConversationID is attached to progress events and (sanitized) names the checkpoint subdirectory.
	ConversationID string
	// Progress, if non-nil, receives UI events as (eventType, message, payload).
	Progress func(eventType, message string, data interface{})
	// Logger is optional; nil disables logging.
	Logger *zap.Logger
	// SnapshotMCPIDs returns the current MCP execution IDs for progress payloads and partial
	// results; nil is treated as "always returns nil".
	SnapshotMCPIDs func() []string
	// StreamsMainAssistant reports whether an agent's text is the main assistant response.
	// When nil, the loop uses: agent is empty or equals OrchestratorName.
	StreamsMainAssistant func(agent string) bool
	// EinoRoleTag maps an agent name to the "einoRole" payload field. When nil, the loop
	// uses "orchestrator" for main-assistant agents and "sub" otherwise.
	EinoRoleTag func(agent string) string
	// CheckpointDir, when non-blank, enables a file-backed checkpoint store in a per-conversation
	// subdirectory, allowing an interrupted run to be resumed.
	CheckpointDir string
	// McpIDsMu guards McpIDs; the loop installs a fresh mutex if nil.
	McpIDsMu *sync.Mutex
	// McpIDs holds the MCP execution IDs copied into the RunResult of a normally completed run;
	// the loop installs an empty slice if nil. Presumably appended to by the caller's tool
	// hooks — TODO confirm.
	McpIDs *[]string
	// DA is the agent the runner executes; required (the loop errors out when nil).
	DA adk.Agent
	// EmptyResponseMessage is the placeholder used when no assistant text was captured
	// (multi-agent and single-agent callers use different wording).
	EmptyResponseMessage string
}
// runEinoADKAgentLoop drives an Eino adk.Runner event loop on behalf of RunDeepAgent and
// RunEinoSingleChatModelAgent.
//
// It optionally resumes from (or records into) a file checkpoint store under
// args.CheckpointDir, forwards streaming and non-streaming agent output to args.Progress as
// UI events, tracks announced tool calls until their results arrive, and finally builds a
// RunResult from the accumulated messages.
//
// Returns:
//   - (result, nil) when the event iterator finishes normally; the checkpoint file is removed.
//   - (partial, err) when the context is done, an event carries an error, or a stream read
//     fails; partial is nil unless at least one message beyond baseMsgs was accumulated.
//   - (nil, err) when args or args.DA is nil, or when a panic was recovered.
func runEinoADKAgentLoop(ctx context.Context, args *einoADKRunLoopArgs, baseMsgs []adk.Message) (finalResult *RunResult, finalErr error) {
	if args == nil || args.DA == nil {
		return nil, fmt.Errorf("eino run loop: args 或 Agent 为空")
	}
	// Default the shared MCP-ID slot and its mutex so the final snapshot never dereferences nil.
	if args.McpIDs == nil {
		s := []string{}
		args.McpIDs = &s
	}
	if args.McpIDsMu == nil {
		args.McpIDsMu = &sync.Mutex{}
	}
	orchMode := args.OrchMode
	orchestratorName := args.OrchestratorName
	conversationID := args.ConversationID
	progress := args.Progress
	logger := args.Logger
	snapshotMCPIDs := args.SnapshotMCPIDs
	if snapshotMCPIDs == nil {
		snapshotMCPIDs = func() []string { return nil }
	}
	streamsMainAssistant := args.StreamsMainAssistant
	if streamsMainAssistant == nil {
		streamsMainAssistant = func(agent string) bool {
			return agent == "" || agent == orchestratorName
		}
	}
	einoRoleTag := args.EinoRoleTag
	if einoRoleTag == nil {
		einoRoleTag = func(agent string) string {
			if streamsMainAssistant(agent) {
				return "orchestrator"
			}
			return "sub"
		}
	}
	da := args.DA
	mcpIDsMu := args.McpIDsMu
	mcpIDs := args.McpIDs

	var lastAssistant string
	var lastPlanExecuteExecutor string
	msgs := append([]adk.Message(nil), baseMsgs...)
	runAccumulatedMsgs := append([]adk.Message(nil), msgs...)
	// A partial result is only worth returning if something beyond the input history was produced.
	baseAccumulatedCount := len(runAccumulatedMsgs)
	emptyHint := strings.TrimSpace(args.EmptyResponseMessage)
	if emptyHint == "" {
		emptyHint = "(Eino session completed but no assistant text was captured. Check process details or logs.) " +
			"(Eino 会话已完成,但未捕获到助手文本输出。请查看过程详情或日志。)"
	}
	var reasoningStreamSeq int64
	var einoSubReplyStreamSeq int64
	toolEmitSeen := make(map[string]struct{})
	var einoMainRound int
	var einoLastAgent string
	subAgentToolStep := make(map[string]int)

	// Tool calls announced to the UI whose result has not arrived yet: indexed by ID, and
	// queued per agent (FIFO) so results lacking a ToolCallID can be matched heuristically.
	pendingByID := make(map[string]toolCallPendingInfo)
	pendingQueueByAgent := make(map[string][]string)
	markPending := func(tc toolCallPendingInfo) {
		if tc.ToolCallID == "" {
			return
		}
		pendingByID[tc.ToolCallID] = tc
		pendingQueueByAgent[tc.EinoAgent] = append(pendingQueueByAgent[tc.EinoAgent], tc.ToolCallID)
	}
	// popNextPendingForAgent pops the oldest still-pending call queued for agentName,
	// skipping IDs that were already resolved by ID.
	popNextPendingForAgent := func(agentName string) (toolCallPendingInfo, bool) {
		q := pendingQueueByAgent[agentName]
		for len(q) > 0 {
			id := q[0]
			q = q[1:]
			pendingQueueByAgent[agentName] = q
			if tc, ok := pendingByID[id]; ok {
				delete(pendingByID, id)
				return tc, true
			}
		}
		return toolCallPendingInfo{}, false
	}
	removePendingByID := func(toolCallID string) {
		if toolCallID == "" {
			return
		}
		delete(pendingByID, toolCallID)
	}
	// flushAllPendingAsFailed emits a failed tool_result for every pending call (so the UI
	// does not stay stuck in "running") and clears all pending state.
	flushAllPendingAsFailed := func(err error) {
		if progress == nil {
			pendingByID = make(map[string]toolCallPendingInfo)
			pendingQueueByAgent = make(map[string][]string)
			return
		}
		msg := ""
		if err != nil {
			msg = err.Error()
		}
		for _, tc := range pendingByID {
			toolName := tc.ToolName
			if strings.TrimSpace(toolName) == "" {
				toolName = "unknown"
			}
			progress("tool_result", fmt.Sprintf("工具结果 (%s)", toolName), map[string]interface{}{
				"toolName":       toolName,
				"success":        false,
				"isError":        true,
				"result":         msg,
				"resultPreview":  msg,
				"toolCallId":     tc.ToolCallID,
				"conversationId": conversationID,
				"einoAgent":      tc.EinoAgent,
				"einoRole":       tc.EinoRole,
				"source":         "eino",
			})
		}
		pendingByID = make(map[string]toolCallPendingInfo)
		pendingQueueByAgent = make(map[string][]string)
	}

	// Panic recovery: keep an internal Eino panic from crashing the goroutine and leaving the
	// connection open. Registered after the pending-call helpers so in-flight tool calls can be
	// failed; setting the named results makes the caller see an error instead of the
	// (nil, nil) that a bare recover would return.
	defer func() {
		r := recover()
		if r == nil {
			return
		}
		panicErr := fmt.Errorf("eino runner panic: %v", r)
		if logger != nil {
			logger.Error("eino runner panic recovered", zap.Any("recover", r), zap.Stack("stack"))
		}
		flushAllPendingAsFailed(panicErr)
		if progress != nil {
			progress("error", fmt.Sprintf("Internal error: %v / 内部错误: %v", r, r), map[string]interface{}{
				"conversationId": conversationID,
				"source":         "eino",
			})
		}
		finalResult = nil
		finalErr = panicErr
	}()

	runnerCfg := adk.RunnerConfig{
		Agent:           da,
		EnableStreaming: true,
	}
	var cpStore *fileCheckPointStore
	var checkPointID string
	if cp := strings.TrimSpace(args.CheckpointDir); cp != "" {
		cpDir := filepath.Join(cp, sanitizeEinoPathSegment(conversationID))
		st, stErr := newFileCheckPointStore(cpDir)
		if stErr != nil {
			// Checkpointing is best-effort: run without it rather than failing.
			if logger != nil {
				logger.Warn("eino checkpoint store disabled", zap.String("dir", cpDir), zap.Error(stErr))
			}
		} else {
			cpStore = st
			checkPointID = buildEinoCheckpointID(orchMode)
			runnerCfg.CheckPointStore = st
			if logger != nil {
				logger.Info("eino runner: checkpoint store enabled",
					zap.String("dir", cpDir),
					zap.String("checkPointID", checkPointID))
			}
		}
	}
	runner := adk.NewRunner(ctx, runnerCfg)
	var iter *adk.AsyncIterator[*adk.AgentEvent]
	// If a checkpoint from an interrupted run exists, try to resume it; on any failure fall
	// back to a fresh run below.
	if cpStore != nil && checkPointID != "" {
		if _, existed, getErr := cpStore.Get(ctx, checkPointID); getErr != nil {
			if logger != nil {
				logger.Warn("eino checkpoint preflight get failed", zap.String("checkPointID", checkPointID), zap.Error(getErr))
			}
		} else if existed {
			if progress != nil {
				progress("progress", "检测到断点,正在从中断节点恢复执行...", map[string]interface{}{
					"conversationId": conversationID,
					"source":         "eino",
					"orchestration":  orchMode,
					"checkPointID":   checkPointID,
				})
			}
			if logger != nil {
				logger.Info("eino runner: resume from checkpoint", zap.String("checkPointID", checkPointID))
			}
			resumeIter, resumeErr := runner.Resume(ctx, checkPointID)
			if resumeErr == nil {
				iter = resumeIter
			} else {
				if logger != nil {
					logger.Warn("eino runner: resume failed, fallback to fresh run",
						zap.String("checkPointID", checkPointID),
						zap.Error(resumeErr))
				}
				if progress != nil {
					progress("progress", "断点恢复失败,已回退为全新执行。", map[string]interface{}{
						"conversationId": conversationID,
						"source":         "eino",
						"orchestration":  orchMode,
						"checkPointID":   checkPointID,
					})
				}
			}
		}
	}
	if iter == nil {
		if checkPointID != "" {
			iter = runner.Run(ctx, msgs, adk.WithCheckPointID(checkPointID))
		} else {
			iter = runner.Run(ctx, msgs)
		}
	}

	// handleRunErr fails all pending tool calls, reports runErr to the UI and returns it
	// unchanged. Every non-nil error terminates the loop; errorKind only lets the UI tell
	// timeouts and iteration-limit hits apart from other failures (including cancellation by
	// the user closing the page or stopping the run).
	handleRunErr := func(runErr error) error {
		if runErr == nil {
			return nil
		}
		flushAllPendingAsFailed(runErr)
		if progress == nil {
			return runErr
		}
		errData := map[string]interface{}{
			"conversationId": conversationID,
			"source":         "eino",
		}
		switch {
		case errors.Is(runErr, context.DeadlineExceeded):
			errData["errorKind"] = "timeout"
		case errors.Is(runErr, context.Canceled):
			// Plain error event without errorKind.
		case isEinoIterationLimitError(runErr):
			progress("iteration_limit_reached", runErr.Error(), map[string]interface{}{
				"conversationId": conversationID,
				"source":         "eino",
				"orchestration":  orchMode,
			})
			errData["errorKind"] = "iteration_limit"
		}
		progress("error", runErr.Error(), errData)
		return runErr
	}
	// takePartial builds a partial RunResult when new messages were produced; otherwise it
	// returns only the error.
	takePartial := func(runErr error) (*RunResult, error) {
		if len(runAccumulatedMsgs) <= baseAccumulatedCount {
			return nil, runErr
		}
		ids := snapshotMCPIDs()
		return buildEinoRunResultFromAccumulated(
			orchMode, runAccumulatedMsgs, lastAssistant, lastPlanExecuteExecutor, emptyHint, ids, true,
		), runErr
	}

	for {
		// Detect context cancellation (browser closed, request timeout, ...) and flush pending
		// tool states so the UI does not stay stuck in "running".
		select {
		case <-ctx.Done():
			flushAllPendingAsFailed(ctx.Err())
			if progress != nil {
				progress("error", "Request cancelled / 请求已取消", map[string]interface{}{
					"conversationId": conversationID,
					"source":         "eino",
				})
			}
			return takePartial(ctx.Err())
		default:
		}
		ev, ok := iter.Next()
		if !ok {
			// The iterator ending is not always a normal completion: if cancellation/timeout
			// happened while iter.Next() was blocked, it may simply return !ok. In that case the
			// checkpoint must be kept so a later resume is not mistaken for "no checkpoint" and
			// rerun from scratch.
			if ctxErr := ctx.Err(); ctxErr != nil {
				flushAllPendingAsFailed(ctxErr)
				if progress != nil {
					progress("error", ctxErr.Error(), map[string]interface{}{
						"conversationId": conversationID,
						"source":         "eino",
					})
				}
				return takePartial(ctxErr)
			}
			if len(pendingByID) > 0 {
				orphanCount := len(pendingByID)
				flushAllPendingAsFailed(errors.New("pending tool call missing result before run completion"))
				if progress != nil {
					progress("eino_pending_orphaned", "pending tool calls were force-closed at run end", map[string]interface{}{
						"conversationId": conversationID,
						"source":         "eino",
						"orchestration":  orchMode,
						"pendingCount":   orphanCount,
					})
				}
			}
			// Normal completion: the checkpoint is no longer needed.
			if cpStore != nil && checkPointID != "" {
				if p, pErr := cpStore.path(checkPointID); pErr == nil {
					if rmErr := os.Remove(p); rmErr != nil && !os.IsNotExist(rmErr) && logger != nil {
						logger.Warn("eino checkpoint cleanup failed", zap.String("path", p), zap.Error(rmErr))
					}
				}
			}
			break
		}
		if ev == nil {
			continue
		}
		if ev.Err != nil {
			if retErr := handleRunErr(ev.Err); retErr != nil {
				return takePartial(retErr)
			}
		}
		if ev.AgentName != "" && progress != nil {
			iterEinoAgent := orchestratorName
			if orchMode == "plan_execute" {
				if a := strings.TrimSpace(ev.AgentName); a != "" {
					iterEinoAgent = a
				}
			}
			// A new main "iteration" starts the first time the main agent speaks and each time
			// control returns to it from a sub-agent.
			if streamsMainAssistant(ev.AgentName) {
				if einoMainRound == 0 {
					einoMainRound = 1
					progress("iteration", "", map[string]interface{}{
						"iteration":      1,
						"einoScope":      "main",
						"einoRole":       "orchestrator",
						"einoAgent":      iterEinoAgent,
						"orchestration":  orchMode,
						"conversationId": conversationID,
						"source":         "eino",
					})
				} else if einoLastAgent != "" && !streamsMainAssistant(einoLastAgent) {
					einoMainRound++
					progress("iteration", "", map[string]interface{}{
						"iteration":      einoMainRound,
						"einoScope":      "main",
						"einoRole":       "orchestrator",
						"einoAgent":      iterEinoAgent,
						"orchestration":  orchMode,
						"conversationId": conversationID,
						"source":         "eino",
					})
				}
			}
			einoLastAgent = ev.AgentName
			progress("progress", fmt.Sprintf("[Eino] %s", ev.AgentName), map[string]interface{}{
				"conversationId": conversationID,
				"einoAgent":      ev.AgentName,
				"einoRole":       einoRoleTag(ev.AgentName),
				"orchestration":  orchMode,
			})
		}
		if ev.Output == nil || ev.Output.MessageOutput == nil {
			continue
		}
		mv := ev.Output.MessageOutput
		if mv.IsStreaming && mv.MessageStream != nil {
			streamHeaderSent := false
			var reasoningStreamID string
			var toolStreamFragments []schema.ToolCall
			var subAssistantBuf strings.Builder
			var subReplyStreamID string
			var mainAssistantBuf strings.Builder
			var streamRecvErr error
			for {
				chunk, rerr := mv.MessageStream.Recv()
				if rerr != nil {
					if errors.Is(rerr, io.EOF) {
						break
					}
					// Keep what was received so far; the error is reported after flushing.
					if logger != nil {
						logger.Warn("eino stream recv error, flushing incomplete stream",
							zap.Error(rerr),
							zap.String("agent", ev.AgentName),
							zap.Int("toolFragments", len(toolStreamFragments)))
					}
					streamRecvErr = rerr
					break
				}
				if chunk == nil {
					continue
				}
				if progress != nil && strings.TrimSpace(chunk.ReasoningContent) != "" {
					if reasoningStreamID == "" {
						reasoningStreamID = fmt.Sprintf("eino-reasoning-%s-%d", conversationID, atomic.AddInt64(&reasoningStreamSeq, 1))
						progress("thinking_stream_start", " ", map[string]interface{}{
							"streamId":      reasoningStreamID,
							"source":        "eino",
							"einoAgent":     ev.AgentName,
							"einoRole":      einoRoleTag(ev.AgentName),
							"orchestration": orchMode,
						})
					}
					progress("thinking_stream_delta", chunk.ReasoningContent, map[string]interface{}{
						"streamId": reasoningStreamID,
					})
				}
				if chunk.Content != "" {
					if progress != nil && streamsMainAssistant(ev.AgentName) {
						if !streamHeaderSent {
							progress("response_start", "", map[string]interface{}{
								"conversationId":     conversationID,
								"mcpExecutionIds":    snapshotMCPIDs(),
								"messageGeneratedBy": "eino:" + ev.AgentName,
								"einoRole":           "orchestrator",
								"einoAgent":          ev.AgentName,
								"orchestration":      orchMode,
							})
							streamHeaderSent = true
						}
						progress("response_delta", chunk.Content, map[string]interface{}{
							"conversationId":  conversationID,
							"mcpExecutionIds": snapshotMCPIDs(),
							"einoRole":        "orchestrator",
							"einoAgent":       ev.AgentName,
							"orchestration":   orchMode,
						})
						mainAssistantBuf.WriteString(chunk.Content)
					} else if !streamsMainAssistant(ev.AgentName) {
						if progress != nil {
							if subReplyStreamID == "" {
								subReplyStreamID = fmt.Sprintf("eino-sub-reply-%s-%d", conversationID, atomic.AddInt64(&einoSubReplyStreamSeq, 1))
								progress("eino_agent_reply_stream_start", "", map[string]interface{}{
									"streamId":       subReplyStreamID,
									"einoAgent":      ev.AgentName,
									"einoRole":       "sub",
									"conversationId": conversationID,
									"source":         "eino",
								})
							}
							progress("eino_agent_reply_stream_delta", chunk.Content, map[string]interface{}{
								"streamId":       subReplyStreamID,
								"conversationId": conversationID,
							})
						}
						subAssistantBuf.WriteString(chunk.Content)
					}
				}
				if len(chunk.ToolCalls) > 0 {
					toolStreamFragments = append(toolStreamFragments, chunk.ToolCalls...)
				}
			}
			// NOTE(review): only the main assistant's streamed text is appended to the trace, and
			// without the merged tool calls below, whereas the non-streaming path appends the full
			// message (ToolCalls included). Confirm LastAgentTraceInput consumers tolerate tool
			// messages that lack a preceding assistant tool-call message.
			if streamsMainAssistant(ev.AgentName) {
				if s := strings.TrimSpace(mainAssistantBuf.String()); s != "" {
					lastAssistant = s
					runAccumulatedMsgs = append(runAccumulatedMsgs, schema.AssistantMessage(s, nil))
					if orchMode == "plan_execute" && strings.EqualFold(strings.TrimSpace(ev.AgentName), "executor") {
						lastPlanExecuteExecutor = UnwrapPlanExecuteUserText(s)
					}
				}
			}
			if subAssistantBuf.Len() > 0 && progress != nil {
				if s := strings.TrimSpace(subAssistantBuf.String()); s != "" {
					if subReplyStreamID != "" {
						progress("eino_agent_reply_stream_end", s, map[string]interface{}{
							"streamId":       subReplyStreamID,
							"einoAgent":      ev.AgentName,
							"einoRole":       "sub",
							"conversationId": conversationID,
							"source":         "eino",
						})
					} else {
						progress("eino_agent_reply", s, map[string]interface{}{
							"conversationId": conversationID,
							"einoAgent":      ev.AgentName,
							"einoRole":       "sub",
							"source":         "eino",
						})
					}
				}
			}
			var lastToolChunk *schema.Message
			if merged := mergeStreamingToolCallFragments(toolStreamFragments); len(merged) > 0 {
				lastToolChunk = &schema.Message{ToolCalls: merged}
			}
			tryEmitToolCallsOnce(lastToolChunk, ev.AgentName, orchestratorName, conversationID, progress, toolEmitSeen, subAgentToolStep, markPending)
			if streamRecvErr != nil {
				if progress != nil {
					progress("eino_stream_error", streamRecvErr.Error(), map[string]interface{}{
						"conversationId": conversationID,
						"source":         "eino",
						"einoAgent":      ev.AgentName,
						"einoRole":       einoRoleTag(ev.AgentName),
					})
				}
				if retErr := handleRunErr(streamRecvErr); retErr != nil {
					return takePartial(retErr)
				}
			}
			continue
		}
		msg, gerr := mv.GetMessage()
		if gerr != nil || msg == nil {
			continue
		}
		runAccumulatedMsgs = append(runAccumulatedMsgs, msg)
		tryEmitToolCallsOnce(mergeMessageToolCalls(msg), ev.AgentName, orchestratorName, conversationID, progress, toolEmitSeen, subAgentToolStep, markPending)
		if mv.Role == schema.Assistant {
			if progress != nil && strings.TrimSpace(msg.ReasoningContent) != "" {
				progress("thinking", strings.TrimSpace(msg.ReasoningContent), map[string]interface{}{
					"conversationId": conversationID,
					"source":         "eino",
					"einoAgent":      ev.AgentName,
					"einoRole":       einoRoleTag(ev.AgentName),
					"orchestration":  orchMode,
				})
			}
			body := strings.TrimSpace(msg.Content)
			if body != "" {
				if streamsMainAssistant(ev.AgentName) {
					if progress != nil {
						progress("response_start", "", map[string]interface{}{
							"conversationId":     conversationID,
							"mcpExecutionIds":    snapshotMCPIDs(),
							"messageGeneratedBy": "eino:" + ev.AgentName,
							"einoRole":           "orchestrator",
							"einoAgent":          ev.AgentName,
							"orchestration":      orchMode,
						})
						progress("response_delta", body, map[string]interface{}{
							"conversationId":  conversationID,
							"mcpExecutionIds": snapshotMCPIDs(),
							"einoRole":        "orchestrator",
							"einoAgent":       ev.AgentName,
							"orchestration":   orchMode,
						})
					}
					lastAssistant = body
					if orchMode == "plan_execute" && strings.EqualFold(strings.TrimSpace(ev.AgentName), "executor") {
						lastPlanExecuteExecutor = UnwrapPlanExecuteUserText(body)
					}
				} else if progress != nil {
					progress("eino_agent_reply", body, map[string]interface{}{
						"conversationId": conversationID,
						"einoAgent":      ev.AgentName,
						"einoRole":       "sub",
						"source":         "eino",
					})
				}
			}
		}
		if mv.Role == schema.Tool && progress != nil {
			toolName := msg.ToolName
			if toolName == "" {
				toolName = mv.ToolName
			}
			content := msg.Content
			isErr := false
			if strings.HasPrefix(content, einomcp.ToolErrorPrefix) {
				isErr = true
				content = strings.TrimPrefix(content, einomcp.ToolErrorPrefix)
			}
			// Byte-budgeted preview that never splits a multi-byte UTF-8 character.
			preview := truncateEinoToolPreview(content, 200)
			data := map[string]interface{}{
				"toolName":       toolName,
				"success":        !isErr,
				"isError":        isErr,
				"result":         content,
				"resultPreview":  preview,
				"conversationId": conversationID,
				"einoAgent":      ev.AgentName,
				"einoRole":       einoRoleTag(ev.AgentName),
				"source":         "eino",
			}
			toolCallID := strings.TrimSpace(msg.ToolCallID)
			if toolCallID == "" {
				// No ID on the result: attribute it to the oldest pending call, preferring this
				// agent, then the orchestrator, then unnamed agents, then any pending call.
				if inferred, ok := popNextPendingForAgent(ev.AgentName); ok {
					toolCallID = inferred.ToolCallID
				} else if inferred, ok := popNextPendingForAgent(orchestratorName); ok {
					toolCallID = inferred.ToolCallID
				} else if inferred, ok := popNextPendingForAgent(""); ok {
					toolCallID = inferred.ToolCallID
				} else {
					for id := range pendingByID {
						toolCallID = id
						delete(pendingByID, id)
						break
					}
				}
			} else {
				removePendingByID(toolCallID)
			}
			if toolCallID != "" {
				data["toolCallId"] = toolCallID
			}
			progress("tool_result", fmt.Sprintf("工具结果 (%s)", toolName), data)
		}
	}
	mcpIDsMu.Lock()
	ids := append([]string(nil), *mcpIDs...)
	mcpIDsMu.Unlock()
	out := buildEinoRunResultFromAccumulated(
		orchMode, runAccumulatedMsgs, lastAssistant, lastPlanExecuteExecutor, emptyHint, ids, false,
	)
	return out, nil
}

// truncateEinoToolPreview shortens s to at most maxBytes bytes followed by "..." when it is
// longer, moving the cut point back so a multi-byte UTF-8 sequence is never split.
// Strings of at most maxBytes bytes are returned unchanged; a negative maxBytes is treated as 0.
func truncateEinoToolPreview(s string, maxBytes int) string {
	if maxBytes < 0 {
		maxBytes = 0
	}
	if len(s) <= maxBytes {
		return s
	}
	cut := maxBytes
	// UTF-8 continuation bytes have the bit pattern 10xxxxxx; back up until the byte at cut
	// starts a new character, so s[:cut] ends on a character boundary.
	for cut > 0 && s[cut]&0xC0 == 0x80 {
		cut--
	}
	return s[:cut] + "..."
}
// einoPartialRunLastOutputHint returns the bilingual (Chinese, then English) placeholder
// recorded as the last trace output when a run ended abnormally without capturing any
// assistant text. It asks a resumed run to continue from the existing trace rather than
// repeat completed steps.
func einoPartialRunLastOutputHint() string {
	const (
		zhLine = "[执行未正常结束(用户停止、超时或异常)。续跑时请基于上文已产生的工具与结果继续,勿重复已完成步骤。]"
		enLine = "[Run ended abnormally; continue from the trace above without repeating completed steps.]"
	)
	return zhLine + "\n" + enLine
}
// buildEinoRunResultFromAccumulated converts the messages accumulated during an Eino run
// into a RunResult.
//
// Parameters:
//   - orchMode: orchestration mode; for "plan_execute" the executor text is preferred.
//   - runAccumulatedMsgs: full message history, serialized as JSON into LastAgentTraceInput.
//   - lastAssistant: the last main-assistant text observed.
//   - lastPlanExecuteExecutor: the last executor text (plan_execute only).
//   - emptyHint: placeholder response used when no assistant text survives cleaning.
//   - mcpIDs: MCP execution IDs stored in the result as-is.
//   - partial: true when the run ended abnormally (cancel, timeout, error).
//
// The chosen text is trimmed, de-duplicated paragraph-wise and capped at maxResponseRunes
// runes. If it ends up empty, Response becomes emptyHint and LastAgentTraceOutput becomes
// einoPartialRunLastOutputHint() for partial runs or emptyHint otherwise.
func buildEinoRunResultFromAccumulated(
	orchMode string,
	runAccumulatedMsgs []adk.Message,
	lastAssistant string,
	lastPlanExecuteExecutor string,
	emptyHint string,
	mcpIDs []string,
	partial bool,
) *RunResult {
	// The marshal error is deliberately ignored: the trace is best-effort diagnostics, and a
	// failure only leaves LastAgentTraceInput empty instead of failing the whole run.
	histJSON, _ := json.Marshal(runAccumulatedMsgs)

	cleaned := strings.TrimSpace(lastAssistant)
	if orchMode == "plan_execute" {
		if e := strings.TrimSpace(lastPlanExecuteExecutor); e != "" {
			cleaned = e
		} else {
			cleaned = UnwrapPlanExecuteUserText(cleaned)
		}
	}
	cleaned = dedupeRepeatedParagraphs(cleaned, 80)
	cleaned = dedupeParagraphsByLineFingerprint(cleaned, 100)

	// Cap very long responses so JSON serialization stays cheap and memory bounded
	// (multi-agent runs can splice in large amounts of tool output).
	const maxResponseRunes = 100000
	// A string of at most maxResponseRunes bytes cannot hold more than maxResponseRunes runes,
	// so the []rune copy is only paid for genuinely long responses.
	if len(cleaned) > maxResponseRunes {
		if rs := []rune(cleaned); len(rs) > maxResponseRunes {
			cleaned = string(rs[:maxResponseRunes]) + "\n\n... (response truncated / 响应已截断)"
		}
	}

	resp := cleaned
	lastOut := cleaned
	if cleaned == "" {
		resp = emptyHint
		if partial {
			lastOut = einoPartialRunLastOutputHint()
		} else {
			lastOut = emptyHint
		}
	}
	return &RunResult{
		Response:             resp,
		MCPExecutionIDs:      mcpIDs,
		LastAgentTraceInput:  string(histJSON),
		LastAgentTraceOutput: lastOut,
	}
}
// buildEinoCheckpointID derives the runner checkpoint ID for an orchestration mode:
// "runner-" followed by the trimmed, path-sanitized mode, or "runner-default" when the
// sanitized mode is empty. The ID is therefore stable per (conversation directory, mode).
func buildEinoCheckpointID(orchMode string) string {
	const idPrefix = "runner-"
	if segment := sanitizeEinoPathSegment(strings.TrimSpace(orchMode)); segment != "" {
		return idPrefix + segment
	}
	return idPrefix + "default"
}