From a38dd2b4a81fdbe3db1f80f86f9295fba3d01397 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E5=85=AC=E6=98=8E?= <83812544+Ed1s0nZ@users.noreply.github.com> Date: Sun, 19 Apr 2026 04:42:35 +0800 Subject: [PATCH] Add files via upload --- internal/app/app.go | 3 + internal/multiagent/eino_adk_run_loop.go | 545 ++++++++++++++++++++++ internal/multiagent/eino_single_runner.go | 217 +++++++++ internal/multiagent/runner.go | 498 +------------------- 4 files changed, 780 insertions(+), 483 deletions(-) create mode 100644 internal/multiagent/eino_adk_run_loop.go create mode 100644 internal/multiagent/eino_single_runner.go diff --git a/internal/app/app.go b/internal/app/app.go index f4a0b026..6128150f 100644 --- a/internal/app/app.go +++ b/internal/app/app.go @@ -609,6 +609,9 @@ func setupRoutes( protected.POST("/agent-loop", agentHandler.AgentLoop) // Agent Loop 流式输出 protected.POST("/agent-loop/stream", agentHandler.AgentLoopStream) + // Eino ADK 单代理(ChatModelAgent + Runner;不依赖 multi_agent.enabled) + protected.POST("/eino-agent", agentHandler.EinoSingleAgentLoop) + protected.POST("/eino-agent/stream", agentHandler.EinoSingleAgentLoopStream) // Agent Loop 取消与任务列表 protected.POST("/agent-loop/cancel", agentHandler.CancelAgentLoop) protected.GET("/agent-loop/tasks", agentHandler.ListAgentTasks) diff --git a/internal/multiagent/eino_adk_run_loop.go b/internal/multiagent/eino_adk_run_loop.go new file mode 100644 index 00000000..a0f558f3 --- /dev/null +++ b/internal/multiagent/eino_adk_run_loop.go @@ -0,0 +1,545 @@ +package multiagent + +import ( + "context" + "encoding/json" + "errors" + "fmt" + "io" + "path/filepath" + "strings" + "sync" + "sync/atomic" + + "cyberstrike-ai/internal/einomcp" + + "github.com/cloudwego/eino/adk" + "github.com/cloudwego/eino/schema" + "go.uber.org/zap" +) + +// einoADKRunLoopArgs 将 Eino adk.Runner 事件循环从 RunDeepAgent / RunEinoSingleChatModelAgent 中抽出复用。 +type einoADKRunLoopArgs struct { + OrchMode string + OrchestratorName string + ConversationID string + Progress func(eventType, message string, data interface{}) + Logger *zap.Logger + SnapshotMCPIDs func() []string + StreamsMainAssistant func(agent string) bool + EinoRoleTag func(agent string) string + CheckpointDir string + + McpIDsMu *sync.Mutex + McpIDs *[]string + + DA adk.Agent + + // EmptyResponseMessage 当未捕获到助手正文时的占位(多代理与单代理文案不同)。 + EmptyResponseMessage string +} + +func runEinoADKAgentLoop(ctx context.Context, args *einoADKRunLoopArgs, baseMsgs []adk.Message) (*RunResult, error) { + if args == nil || args.DA == nil { + return nil, fmt.Errorf("eino run loop: args 或 Agent 为空") + } + if args.McpIDs == nil { + s := []string{} + args.McpIDs = &s + } + if args.McpIDsMu == nil { + args.McpIDsMu = &sync.Mutex{} + } + + orchMode := args.OrchMode + orchestratorName := args.OrchestratorName + conversationID := args.ConversationID + progress := args.Progress + logger := args.Logger + snapshotMCPIDs := args.SnapshotMCPIDs + if snapshotMCPIDs == nil { + snapshotMCPIDs = func() []string { return nil } + } + streamsMainAssistant := args.StreamsMainAssistant + if streamsMainAssistant == nil { + streamsMainAssistant = func(agent string) bool { + return agent == "" || agent == orchestratorName + } + } + einoRoleTag := args.EinoRoleTag + if einoRoleTag == nil { + einoRoleTag = func(agent string) string { + if streamsMainAssistant(agent) { + return "orchestrator" + } + return "sub" + } + } + da := args.DA + mcpIDsMu := args.McpIDsMu + mcpIDs := args.McpIDs + + var lastRunMsgs []adk.Message + var lastAssistant string + var lastPlanExecuteExecutor string + var retryHints []adk.Message + + emptyHint := strings.TrimSpace(args.EmptyResponseMessage) + if emptyHint == "" { + emptyHint = "(Eino 会话已完成,但未捕获到助手文本输出。请查看过程详情或日志。)" + } + +attemptLoop: + for attempt := 0; attempt < maxToolCallRecoveryAttempts; attempt++ { + msgs := make([]adk.Message, 0, len(baseMsgs)+len(retryHints)) + msgs = append(msgs, baseMsgs...) + msgs = append(msgs, retryHints...) + + if attempt > 0 { + mcpIDsMu.Lock() + *mcpIDs = (*mcpIDs)[:0] + mcpIDsMu.Unlock() + } + + lastAssistant = "" + lastPlanExecuteExecutor = "" + var reasoningStreamSeq int64 + var einoSubReplyStreamSeq int64 + toolEmitSeen := make(map[string]struct{}) + var einoMainRound int + var einoLastAgent string + subAgentToolStep := make(map[string]int) + pendingByID := make(map[string]toolCallPendingInfo) + pendingQueueByAgent := make(map[string][]string) + markPending := func(tc toolCallPendingInfo) { + if tc.ToolCallID == "" { + return + } + pendingByID[tc.ToolCallID] = tc + pendingQueueByAgent[tc.EinoAgent] = append(pendingQueueByAgent[tc.EinoAgent], tc.ToolCallID) + } + popNextPendingForAgent := func(agentName string) (toolCallPendingInfo, bool) { + q := pendingQueueByAgent[agentName] + for len(q) > 0 { + id := q[0] + q = q[1:] + pendingQueueByAgent[agentName] = q + if tc, ok := pendingByID[id]; ok { + delete(pendingByID, id) + return tc, true + } + } + return toolCallPendingInfo{}, false + } + removePendingByID := func(toolCallID string) { + if toolCallID == "" { + return + } + delete(pendingByID, toolCallID) + } + flushAllPendingAsFailed := func(err error) { + if progress == nil { + pendingByID = make(map[string]toolCallPendingInfo) + pendingQueueByAgent = make(map[string][]string) + return + } + msg := "" + if err != nil { + msg = err.Error() + } + for _, tc := range pendingByID { + toolName := tc.ToolName + if strings.TrimSpace(toolName) == "" { + toolName = "unknown" + } + progress("tool_result", fmt.Sprintf("工具结果 (%s)", toolName), map[string]interface{}{ + "toolName": toolName, + "success": false, + "isError": true, + "result": msg, + "resultPreview": msg, + "toolCallId": tc.ToolCallID, + "conversationId": conversationID, + "einoAgent": tc.EinoAgent, + "einoRole": tc.EinoRole, + "source": "eino", + }) + } + pendingByID = make(map[string]toolCallPendingInfo) + pendingQueueByAgent = make(map[string][]string) + } + + runnerCfg := adk.RunnerConfig{ + Agent: da, + EnableStreaming: true, + } + if cp := strings.TrimSpace(args.CheckpointDir); cp != "" { + cpDir := filepath.Join(cp, sanitizeEinoPathSegment(conversationID)) + st, stErr := newFileCheckPointStore(cpDir) + if stErr != nil { + if logger != nil { + logger.Warn("eino checkpoint store disabled", zap.String("dir", cpDir), zap.Error(stErr)) + } + } else { + runnerCfg.CheckPointStore = st + if logger != nil { + logger.Info("eino runner: checkpoint store enabled", zap.String("dir", cpDir)) + } + } + } + runner := adk.NewRunner(ctx, runnerCfg) + iter := runner.Run(ctx, msgs) + + for { + ev, ok := iter.Next() + if !ok { + lastRunMsgs = msgs + break attemptLoop + } + if ev == nil { + continue + } + if ev.Err != nil { + canRetry := attempt+1 < maxToolCallRecoveryAttempts + + if canRetry && isRecoverableToolCallArgumentsJSONError(ev.Err) { + if logger != nil { + logger.Warn("eino: recoverable tool-call JSON error from model/API", zap.Error(ev.Err), zap.Int("attempt", attempt)) + } + retryHints = append(retryHints, toolCallArgumentsJSONRetryHint()) + if progress != nil { + progress("eino_recovery", toolCallArgumentsJSONRecoveryTimelineMessage(attempt), map[string]interface{}{ + "conversationId": conversationID, + "source": "eino", + "einoRetry": attempt, + "runIndex": attempt + 1, + "maxRuns": maxToolCallRecoveryAttempts, + "reason": "invalid_tool_arguments_json", + }) + } + continue attemptLoop + } + + if canRetry && isRecoverableToolExecutionError(ev.Err) { + if logger != nil { + logger.Warn("eino: recoverable tool execution error, will retry with corrective hint", + zap.Error(ev.Err), zap.Int("attempt", attempt)) + } + flushAllPendingAsFailed(ev.Err) + retryHints = append(retryHints, toolExecutionRetryHint()) + if progress != nil { + progress("eino_recovery", toolExecutionRecoveryTimelineMessage(attempt), map[string]interface{}{ + "conversationId": conversationID, + "source": "eino", + "einoRetry": attempt, + "runIndex": attempt + 1, + "maxRuns": maxToolCallRecoveryAttempts, + "reason": "tool_execution_error", + }) + } + continue attemptLoop + } + + flushAllPendingAsFailed(ev.Err) + if progress != nil { + progress("error", ev.Err.Error(), map[string]interface{}{ + "conversationId": conversationID, + "source": "eino", + }) + } + return nil, ev.Err + } + if ev.AgentName != "" && progress != nil { + iterEinoAgent := orchestratorName + if orchMode == "plan_execute" { + if a := strings.TrimSpace(ev.AgentName); a != "" { + iterEinoAgent = a + } + } + if streamsMainAssistant(ev.AgentName) { + if einoMainRound == 0 { + einoMainRound = 1 + progress("iteration", "", map[string]interface{}{ + "iteration": 1, + "einoScope": "main", + "einoRole": "orchestrator", + "einoAgent": iterEinoAgent, + "orchestration": orchMode, + "conversationId": conversationID, + "source": "eino", + }) + } else if einoLastAgent != "" && !streamsMainAssistant(einoLastAgent) { + einoMainRound++ + progress("iteration", "", map[string]interface{}{ + "iteration": einoMainRound, + "einoScope": "main", + "einoRole": "orchestrator", + "einoAgent": iterEinoAgent, + "orchestration": orchMode, + "conversationId": conversationID, + "source": "eino", + }) + } + } + einoLastAgent = ev.AgentName + progress("progress", fmt.Sprintf("[Eino] %s", ev.AgentName), map[string]interface{}{ + "conversationId": conversationID, + "einoAgent": ev.AgentName, + "einoRole": einoRoleTag(ev.AgentName), + "orchestration": orchMode, + }) + } + if ev.Output == nil || ev.Output.MessageOutput == nil { + continue + } + mv := ev.Output.MessageOutput + + if mv.IsStreaming && mv.MessageStream != nil { + streamHeaderSent := false + var reasoningStreamID string + var toolStreamFragments []schema.ToolCall + var subAssistantBuf strings.Builder + var subReplyStreamID string + var mainAssistantBuf strings.Builder + for { + chunk, rerr := mv.MessageStream.Recv() + if rerr != nil { + if errors.Is(rerr, io.EOF) { + break + } + if logger != nil { + logger.Warn("eino stream recv", zap.Error(rerr)) + } + break + } + if chunk == nil { + continue + } + if progress != nil && strings.TrimSpace(chunk.ReasoningContent) != "" { + if reasoningStreamID == "" { + reasoningStreamID = fmt.Sprintf("eino-reasoning-%s-%d", conversationID, atomic.AddInt64(&reasoningStreamSeq, 1)) + progress("thinking_stream_start", " ", map[string]interface{}{ + "streamId": reasoningStreamID, + "source": "eino", + "einoAgent": ev.AgentName, + "einoRole": einoRoleTag(ev.AgentName), + "orchestration": orchMode, + }) + } + progress("thinking_stream_delta", chunk.ReasoningContent, map[string]interface{}{ + "streamId": reasoningStreamID, + }) + } + if chunk.Content != "" { + if progress != nil && streamsMainAssistant(ev.AgentName) { + if !streamHeaderSent { + progress("response_start", "", map[string]interface{}{ + "conversationId": conversationID, + "mcpExecutionIds": snapshotMCPIDs(), + "messageGeneratedBy": "eino:" + ev.AgentName, + "einoRole": "orchestrator", + "einoAgent": ev.AgentName, + "orchestration": orchMode, + }) + streamHeaderSent = true + } + progress("response_delta", chunk.Content, map[string]interface{}{ + "conversationId": conversationID, + "mcpExecutionIds": snapshotMCPIDs(), + "einoRole": "orchestrator", + "einoAgent": ev.AgentName, + "orchestration": orchMode, + }) + mainAssistantBuf.WriteString(chunk.Content) + } else if !streamsMainAssistant(ev.AgentName) { + if progress != nil { + if subReplyStreamID == "" { + subReplyStreamID = fmt.Sprintf("eino-sub-reply-%s-%d", conversationID, atomic.AddInt64(&einoSubReplyStreamSeq, 1)) + progress("eino_agent_reply_stream_start", "", map[string]interface{}{ + "streamId": subReplyStreamID, + "einoAgent": ev.AgentName, + "einoRole": "sub", + "conversationId": conversationID, + "source": "eino", + }) + } + progress("eino_agent_reply_stream_delta", chunk.Content, map[string]interface{}{ + "streamId": subReplyStreamID, + "conversationId": conversationID, + }) + } + subAssistantBuf.WriteString(chunk.Content) + } + } + if len(chunk.ToolCalls) > 0 { + toolStreamFragments = append(toolStreamFragments, chunk.ToolCalls...) + } + } + if streamsMainAssistant(ev.AgentName) { + if s := strings.TrimSpace(mainAssistantBuf.String()); s != "" { + lastAssistant = s + if orchMode == "plan_execute" && strings.EqualFold(strings.TrimSpace(ev.AgentName), "executor") { + lastPlanExecuteExecutor = UnwrapPlanExecuteUserText(s) + } + } + } + if subAssistantBuf.Len() > 0 && progress != nil { + if s := strings.TrimSpace(subAssistantBuf.String()); s != "" { + if subReplyStreamID != "" { + progress("eino_agent_reply_stream_end", s, map[string]interface{}{ + "streamId": subReplyStreamID, + "einoAgent": ev.AgentName, + "einoRole": "sub", + "conversationId": conversationID, + "source": "eino", + }) + } else { + progress("eino_agent_reply", s, map[string]interface{}{ + "conversationId": conversationID, + "einoAgent": ev.AgentName, + "einoRole": "sub", + "source": "eino", + }) + } + } + } + var lastToolChunk *schema.Message + if merged := mergeStreamingToolCallFragments(toolStreamFragments); len(merged) > 0 { + lastToolChunk = &schema.Message{ToolCalls: merged} + } + tryEmitToolCallsOnce(lastToolChunk, ev.AgentName, orchestratorName, conversationID, progress, toolEmitSeen, subAgentToolStep, markPending) + continue + } + + msg, gerr := mv.GetMessage() + if gerr != nil || msg == nil { + continue + } + tryEmitToolCallsOnce(mergeMessageToolCalls(msg), ev.AgentName, orchestratorName, conversationID, progress, toolEmitSeen, subAgentToolStep, markPending) + + if mv.Role == schema.Assistant { + if progress != nil && strings.TrimSpace(msg.ReasoningContent) != "" { + progress("thinking", strings.TrimSpace(msg.ReasoningContent), map[string]interface{}{ + "conversationId": conversationID, + "source": "eino", + "einoAgent": ev.AgentName, + "einoRole": einoRoleTag(ev.AgentName), + "orchestration": orchMode, + }) + } + body := strings.TrimSpace(msg.Content) + if body != "" { + if streamsMainAssistant(ev.AgentName) { + if progress != nil { + progress("response_start", "", map[string]interface{}{ + "conversationId": conversationID, + "mcpExecutionIds": snapshotMCPIDs(), + "messageGeneratedBy": "eino:" + ev.AgentName, + "einoRole": "orchestrator", + "einoAgent": ev.AgentName, + "orchestration": orchMode, + }) + progress("response_delta", body, map[string]interface{}{ + "conversationId": conversationID, + "mcpExecutionIds": snapshotMCPIDs(), + "einoRole": "orchestrator", + "einoAgent": ev.AgentName, + "orchestration": orchMode, + }) + } + lastAssistant = body + if orchMode == "plan_execute" && strings.EqualFold(strings.TrimSpace(ev.AgentName), "executor") { + lastPlanExecuteExecutor = UnwrapPlanExecuteUserText(body) + } + } else if progress != nil { + progress("eino_agent_reply", body, map[string]interface{}{ + "conversationId": conversationID, + "einoAgent": ev.AgentName, + "einoRole": "sub", + "source": "eino", + }) + } + } + } + + if mv.Role == schema.Tool && progress != nil { + toolName := msg.ToolName + if toolName == "" { + toolName = mv.ToolName + } + + content := msg.Content + isErr := false + if strings.HasPrefix(content, einomcp.ToolErrorPrefix) { + isErr = true + content = strings.TrimPrefix(content, einomcp.ToolErrorPrefix) + } + + preview := content + if len(preview) > 200 { + preview = preview[:200] + "..." + } + data := map[string]interface{}{ + "toolName": toolName, + "success": !isErr, + "isError": isErr, + "result": content, + "resultPreview": preview, + "conversationId": conversationID, + "einoAgent": ev.AgentName, + "einoRole": einoRoleTag(ev.AgentName), + "source": "eino", + } + toolCallID := strings.TrimSpace(msg.ToolCallID) + if toolCallID == "" { + if inferred, ok := popNextPendingForAgent(ev.AgentName); ok { + toolCallID = inferred.ToolCallID + } else if inferred, ok := popNextPendingForAgent(orchestratorName); ok { + toolCallID = inferred.ToolCallID + } else if inferred, ok := popNextPendingForAgent(""); ok { + toolCallID = inferred.ToolCallID + } else { + for id := range pendingByID { + toolCallID = id + delete(pendingByID, id) + break + } + } + } else { + removePendingByID(toolCallID) + } + if toolCallID != "" { + data["toolCallId"] = toolCallID + } + progress("tool_result", fmt.Sprintf("工具结果 (%s)", toolName), data) + } + } + } + + mcpIDsMu.Lock() + ids := append([]string(nil), *mcpIDs...) + mcpIDsMu.Unlock() + + histJSON, _ := json.Marshal(lastRunMsgs) + cleaned := strings.TrimSpace(lastAssistant) + if orchMode == "plan_execute" { + if e := strings.TrimSpace(lastPlanExecuteExecutor); e != "" { + cleaned = e + } else { + cleaned = UnwrapPlanExecuteUserText(cleaned) + } + } + cleaned = dedupeRepeatedParagraphs(cleaned, 80) + cleaned = dedupeParagraphsByLineFingerprint(cleaned, 100) + out := &RunResult{ + Response: cleaned, + MCPExecutionIDs: ids, + LastReActInput: string(histJSON), + LastReActOutput: cleaned, + } + if out.Response == "" { + out.Response = emptyHint + out.LastReActOutput = out.Response + } + return out, nil +} diff --git a/internal/multiagent/eino_single_runner.go b/internal/multiagent/eino_single_runner.go new file mode 100644 index 00000000..c1cd9ec6 --- /dev/null +++ b/internal/multiagent/eino_single_runner.go @@ -0,0 +1,217 @@ +package multiagent + +import ( + "context" + "fmt" + "net" + "net/http" + "strings" + "sync" + "time" + + "cyberstrike-ai/internal/agent" + "cyberstrike-ai/internal/config" + "cyberstrike-ai/internal/einomcp" + "cyberstrike-ai/internal/openai" + + einoopenai "github.com/cloudwego/eino-ext/components/model/openai" + "github.com/cloudwego/eino/adk" + "github.com/cloudwego/eino/compose" + "github.com/cloudwego/eino/schema" + "go.uber.org/zap" +) + +// einoSingleAgentName 与 ChatModelAgent.Name 一致,供流式事件映射主对话区。 +const einoSingleAgentName = "cyberstrike-eino-single" + +// RunEinoSingleChatModelAgent 使用 Eino adk.NewChatModelAgent + adk.NewRunner.Run(官方 Quick Start 的 Query 同属 Runner API;此处用历史 + 用户消息切片等价于多轮 Query)。 +// 不替代既有原生 ReAct;与 RunDeepAgent 共享 runEinoADKAgentLoop 的 SSE 映射与 MCP 桥。 +func RunEinoSingleChatModelAgent( + ctx context.Context, + appCfg *config.Config, + ma *config.MultiAgentConfig, + ag *agent.Agent, + logger *zap.Logger, + conversationID string, + userMessage string, + history []agent.ChatMessage, + roleTools []string, + roleSkills []string, + progress func(eventType, message string, data interface{}), +) (*RunResult, error) { + if appCfg == nil || ag == nil { + return nil, fmt.Errorf("eino single: 配置或 Agent 为空") + } + if ma == nil { + return nil, fmt.Errorf("eino single: multi_agent 配置为空") + } + + einoLoc, einoSkillMW, einoFSTools, skillsRoot, einoErr := prepareEinoSkills(ctx, appCfg.SkillsDir, ma, logger) + if einoErr != nil { + return nil, einoErr + } + + holder := &einomcp.ConversationHolder{} + holder.Set(conversationID) + + var mcpIDsMu sync.Mutex + var mcpIDs []string + recorder := func(id string) { + if id == "" { + return + } + mcpIDsMu.Lock() + mcpIDs = append(mcpIDs, id) + mcpIDsMu.Unlock() + } + + snapshotMCPIDs := func() []string { + mcpIDsMu.Lock() + defer mcpIDsMu.Unlock() + out := make([]string, len(mcpIDs)) + copy(out, mcpIDs) + return out + } + + toolOutputChunk := func(toolName, toolCallID, chunk string) { + if progress == nil || toolCallID == "" { + return + } + progress("tool_result_delta", chunk, map[string]interface{}{ + "toolName": toolName, + "toolCallId": toolCallID, + "index": 0, + "total": 0, + "iteration": 0, + "source": "eino", + }) + } + + mainDefs := ag.ToolsForRole(roleTools) + mainTools, err := einomcp.ToolsFromDefinitions(ag, holder, mainDefs, recorder, toolOutputChunk) + if err != nil { + return nil, err + } + + mainToolsForCfg, mainOrchestratorPre, err := prependEinoMiddlewares(ctx, &ma.EinoMiddleware, einoMWMain, mainTools, einoLoc, skillsRoot, conversationID, logger) + if err != nil { + return nil, fmt.Errorf("eino single eino 中间件: %w", err) + } + + httpClient := &http.Client{ + Timeout: 30 * time.Minute, + Transport: &http.Transport{ + DialContext: (&net.Dialer{ + Timeout: 300 * time.Second, + KeepAlive: 300 * time.Second, + }).DialContext, + MaxIdleConns: 100, + MaxIdleConnsPerHost: 10, + IdleConnTimeout: 90 * time.Second, + TLSHandshakeTimeout: 30 * time.Second, + ResponseHeaderTimeout: 60 * time.Minute, + }, + } + httpClient = openai.NewEinoHTTPClient(&appCfg.OpenAI, httpClient) + + baseModelCfg := &einoopenai.ChatModelConfig{ + APIKey: appCfg.OpenAI.APIKey, + BaseURL: strings.TrimSuffix(appCfg.OpenAI.BaseURL, "/"), + Model: appCfg.OpenAI.Model, + HTTPClient: httpClient, + } + + mainModel, err := einoopenai.NewChatModel(ctx, baseModelCfg) + if err != nil { + return nil, fmt.Errorf("eino single 模型: %w", err) + } + + mainSumMw, err := newEinoSummarizationMiddleware(ctx, mainModel, appCfg, logger) + if err != nil { + return nil, fmt.Errorf("eino single summarization: %w", err) + } + + handlers := make([]adk.ChatModelAgentMiddleware, 0, 4) + if len(mainOrchestratorPre) > 0 { + handlers = append(handlers, mainOrchestratorPre...) + } + if einoSkillMW != nil { + if einoFSTools && einoLoc != nil { + fsMw, fsErr := subAgentFilesystemMiddleware(ctx, einoLoc) + if fsErr != nil { + return nil, fmt.Errorf("eino single filesystem 中间件: %w", fsErr) + } + handlers = append(handlers, fsMw) + } + handlers = append(handlers, einoSkillMW) + } + handlers = append(handlers, mainSumMw) + + maxIter := ma.MaxIteration + if maxIter <= 0 { + maxIter = appCfg.Agent.MaxIterations + } + if maxIter <= 0 { + maxIter = 40 + } + + mainToolsCfg := adk.ToolsConfig{ + ToolsNodeConfig: compose.ToolsNodeConfig{ + Tools: mainToolsForCfg, + UnknownToolsHandler: einomcp.UnknownToolReminderHandler(), + ToolCallMiddlewares: []compose.ToolMiddleware{ + {Invokable: softRecoveryToolCallMiddleware()}, + }, + }, + EmitInternalEvents: true, + } + + chatCfg := &adk.ChatModelAgentConfig{ + Name: einoSingleAgentName, + Description: "Eino ADK ChatModelAgent with MCP tools for authorized security testing.", + Instruction: ag.EinoSingleAgentSystemInstruction(roleSkills), + Model: mainModel, + ToolsConfig: mainToolsCfg, + MaxIterations: maxIter, + Handlers: handlers, + } + outKey, modelRetry, _ := deepExtrasFromConfig(ma) + if outKey != "" { + chatCfg.OutputKey = outKey + } + if modelRetry != nil { + chatCfg.ModelRetryConfig = modelRetry + } + + chatAgent, err := adk.NewChatModelAgent(ctx, chatCfg) + if err != nil { + return nil, fmt.Errorf("eino single NewChatModelAgent: %w", err) + } + + baseMsgs := historyToMessages(history) + baseMsgs = append(baseMsgs, schema.UserMessage(userMessage)) + + streamsMainAssistant := func(agent string) bool { + return agent == "" || agent == einoSingleAgentName + } + einoRoleTag := func(agent string) string { + _ = agent + return "orchestrator" + } + + return runEinoADKAgentLoop(ctx, &einoADKRunLoopArgs{ + OrchMode: "eino_single", + OrchestratorName: einoSingleAgentName, + ConversationID: conversationID, + Progress: progress, + Logger: logger, + SnapshotMCPIDs: snapshotMCPIDs, + StreamsMainAssistant: streamsMainAssistant, + EinoRoleTag: einoRoleTag, + CheckpointDir: ma.EinoMiddleware.CheckpointDir, + McpIDsMu: &mcpIDsMu, + McpIDs: &mcpIDs, + DA: chatAgent, + EmptyResponseMessage: "(Eino ADK 单代理会话已完成,但未捕获到助手文本输出。请查看过程详情或日志。)", + }, baseMsgs) +} diff --git a/internal/multiagent/runner.go b/internal/multiagent/runner.go index 49e33bff..8cd70127 100644 --- a/internal/multiagent/runner.go +++ b/internal/multiagent/runner.go @@ -4,16 +4,12 @@ package multiagent import ( "context" "encoding/json" - "errors" "fmt" - "io" "net" - "path/filepath" "net/http" "sort" "strings" "sync" - "sync/atomic" "time" "cyberstrike-ai/internal/agent" @@ -484,485 +480,21 @@ func RunDeepAgent( return "sub" } - var lastRunMsgs []adk.Message - var lastAssistant string - // plan_execute:最后一轮 assistant 常被 replanner 的 JSON 覆盖,单独保留 executor 对用户文本。 - var lastPlanExecuteExecutor string - - // retryHints tracks the corrective hint to append for each retry attempt. - // Index i corresponds to the hint that will be appended on attempt i+1. - var retryHints []adk.Message - -attemptLoop: - for attempt := 0; attempt < maxToolCallRecoveryAttempts; attempt++ { - msgs := make([]adk.Message, 0, len(baseMsgs)+len(retryHints)) - msgs = append(msgs, baseMsgs...) - msgs = append(msgs, retryHints...) - - if attempt > 0 { - mcpIDsMu.Lock() - mcpIDs = mcpIDs[:0] - mcpIDsMu.Unlock() - } - - // 仅保留主代理最后一次 assistant 输出;每轮重试重置,避免拼接失败轮次的片段。 - lastAssistant = "" - lastPlanExecuteExecutor = "" - var reasoningStreamSeq int64 - var einoSubReplyStreamSeq int64 - toolEmitSeen := make(map[string]struct{}) - var einoMainRound int - var einoLastAgent string - subAgentToolStep := make(map[string]int) - // Track tool calls emitted in this attempt so we can: - // - attach toolCallId to tool_result when framework omits it - // - flush running tool calls as failed when a recoverable tool execution error happens - pendingByID := make(map[string]toolCallPendingInfo) - pendingQueueByAgent := make(map[string][]string) - markPending := func(tc toolCallPendingInfo) { - if tc.ToolCallID == "" { - return - } - pendingByID[tc.ToolCallID] = tc - pendingQueueByAgent[tc.EinoAgent] = append(pendingQueueByAgent[tc.EinoAgent], tc.ToolCallID) - } - popNextPendingForAgent := func(agentName string) (toolCallPendingInfo, bool) { - q := pendingQueueByAgent[agentName] - for len(q) > 0 { - id := q[0] - q = q[1:] - pendingQueueByAgent[agentName] = q - if tc, ok := pendingByID[id]; ok { - delete(pendingByID, id) - return tc, true - } - } - return toolCallPendingInfo{}, false - } - removePendingByID := func(toolCallID string) { - if toolCallID == "" { - return - } - delete(pendingByID, toolCallID) - // queue cleanup is lazy in popNextPendingForAgent - } - flushAllPendingAsFailed := func(err error) { - if progress == nil { - pendingByID = make(map[string]toolCallPendingInfo) - pendingQueueByAgent = make(map[string][]string) - return - } - msg := "" - if err != nil { - msg = err.Error() - } - for _, tc := range pendingByID { - toolName := tc.ToolName - if strings.TrimSpace(toolName) == "" { - toolName = "unknown" - } - progress("tool_result", fmt.Sprintf("工具结果 (%s)", toolName), map[string]interface{}{ - "toolName": toolName, - "success": false, - "isError": true, - "result": msg, - "resultPreview": msg, - "toolCallId": tc.ToolCallID, - "conversationId": conversationID, - "einoAgent": tc.EinoAgent, - "einoRole": tc.EinoRole, - "source": "eino", - }) - } - pendingByID = make(map[string]toolCallPendingInfo) - pendingQueueByAgent = make(map[string][]string) - } - - runnerCfg := adk.RunnerConfig{ - Agent: da, - EnableStreaming: true, - } - if cp := strings.TrimSpace(ma.EinoMiddleware.CheckpointDir); cp != "" { - cpDir := filepath.Join(cp, sanitizeEinoPathSegment(conversationID)) - st, stErr := newFileCheckPointStore(cpDir) - if stErr != nil { - if logger != nil { - logger.Warn("eino checkpoint store disabled", zap.String("dir", cpDir), zap.Error(stErr)) - } - } else { - runnerCfg.CheckPointStore = st - if logger != nil { - logger.Info("eino runner: checkpoint store enabled", zap.String("dir", cpDir)) - } - } - } - runner := adk.NewRunner(ctx, runnerCfg) - iter := runner.Run(ctx, msgs) - - for { - ev, ok := iter.Next() - if !ok { - lastRunMsgs = msgs - break attemptLoop - } - if ev == nil { - continue - } - if ev.Err != nil { - canRetry := attempt+1 < maxToolCallRecoveryAttempts - - // Recoverable: API-level JSON argument validation error. - if canRetry && isRecoverableToolCallArgumentsJSONError(ev.Err) { - if logger != nil { - logger.Warn("eino: recoverable tool-call JSON error from model/API", zap.Error(ev.Err), zap.Int("attempt", attempt)) - } - retryHints = append(retryHints, toolCallArgumentsJSONRetryHint()) - if progress != nil { - progress("eino_recovery", toolCallArgumentsJSONRecoveryTimelineMessage(attempt), map[string]interface{}{ - "conversationId": conversationID, - "source": "eino", - "einoRetry": attempt, - "runIndex": attempt + 1, - "maxRuns": maxToolCallRecoveryAttempts, - "reason": "invalid_tool_arguments_json", - }) - } - continue attemptLoop - } - - // Recoverable: tool execution error (unknown sub-agent, tool not found, bad JSON in args, etc.). - if canRetry && isRecoverableToolExecutionError(ev.Err) { - if logger != nil { - logger.Warn("eino: recoverable tool execution error, will retry with corrective hint", - zap.Error(ev.Err), zap.Int("attempt", attempt)) - } - // Ensure UI/tool timeline doesn't get stuck at "running" for tool calls that - // will never receive a proper tool_result due to the recoverable error. - flushAllPendingAsFailed(ev.Err) - retryHints = append(retryHints, toolExecutionRetryHint()) - if progress != nil { - progress("eino_recovery", toolExecutionRecoveryTimelineMessage(attempt), map[string]interface{}{ - "conversationId": conversationID, - "source": "eino", - "einoRetry": attempt, - "runIndex": attempt + 1, - "maxRuns": maxToolCallRecoveryAttempts, - "reason": "tool_execution_error", - }) - } - continue attemptLoop - } - - // Non-recoverable error. - flushAllPendingAsFailed(ev.Err) - if progress != nil { - progress("error", ev.Err.Error(), map[string]interface{}{ - "conversationId": conversationID, - "source": "eino", - }) - } - return nil, ev.Err - } - if ev.AgentName != "" && progress != nil { - iterEinoAgent := orchestratorName - if orchMode == "plan_execute" { - if a := strings.TrimSpace(ev.AgentName); a != "" { - iterEinoAgent = a - } - } - if streamsMainAssistant(ev.AgentName) { - if einoMainRound == 0 { - einoMainRound = 1 - progress("iteration", "", map[string]interface{}{ - "iteration": 1, - "einoScope": "main", - "einoRole": "orchestrator", - "einoAgent": iterEinoAgent, - "orchestration": orchMode, - "conversationId": conversationID, - "source": "eino", - }) - } else if einoLastAgent != "" && !streamsMainAssistant(einoLastAgent) { - einoMainRound++ - progress("iteration", "", map[string]interface{}{ - "iteration": einoMainRound, - "einoScope": "main", - "einoRole": "orchestrator", - "einoAgent": iterEinoAgent, - "orchestration": orchMode, - "conversationId": conversationID, - "source": "eino", - }) - } - } - einoLastAgent = ev.AgentName - progress("progress", fmt.Sprintf("[Eino] %s", ev.AgentName), map[string]interface{}{ - "conversationId": conversationID, - "einoAgent": ev.AgentName, - "einoRole": einoRoleTag(ev.AgentName), - "orchestration": orchMode, - }) - } - if ev.Output == nil || ev.Output.MessageOutput == nil { - continue - } - mv := ev.Output.MessageOutput - - if mv.IsStreaming && mv.MessageStream != nil { - streamHeaderSent := false - var reasoningStreamID string - var toolStreamFragments []schema.ToolCall - var subAssistantBuf strings.Builder - var subReplyStreamID string - var mainAssistantBuf strings.Builder - for { - chunk, rerr := mv.MessageStream.Recv() - if rerr != nil { - if errors.Is(rerr, io.EOF) { - break - } - if logger != nil { - logger.Warn("eino stream recv", zap.Error(rerr)) - } - break - } - if chunk == nil { - continue - } - if progress != nil && strings.TrimSpace(chunk.ReasoningContent) != "" { - if reasoningStreamID == "" { - reasoningStreamID = fmt.Sprintf("eino-reasoning-%s-%d", conversationID, atomic.AddInt64(&reasoningStreamSeq, 1)) - progress("thinking_stream_start", " ", map[string]interface{}{ - "streamId": reasoningStreamID, - "source": "eino", - "einoAgent": ev.AgentName, - "einoRole": einoRoleTag(ev.AgentName), - "orchestration": orchMode, - }) - } - progress("thinking_stream_delta", chunk.ReasoningContent, map[string]interface{}{ - "streamId": reasoningStreamID, - }) - } - if chunk.Content != "" { - if progress != nil && streamsMainAssistant(ev.AgentName) { - if !streamHeaderSent { - progress("response_start", "", map[string]interface{}{ - "conversationId": conversationID, - "mcpExecutionIds": snapshotMCPIDs(), - "messageGeneratedBy": "eino:" + ev.AgentName, - "einoRole": "orchestrator", - "einoAgent": ev.AgentName, - "orchestration": orchMode, - }) - streamHeaderSent = true - } - progress("response_delta", chunk.Content, map[string]interface{}{ - "conversationId": conversationID, - "mcpExecutionIds": snapshotMCPIDs(), - "einoRole": "orchestrator", - "einoAgent": ev.AgentName, - "orchestration": orchMode, - }) - mainAssistantBuf.WriteString(chunk.Content) - } else if !streamsMainAssistant(ev.AgentName) { - if progress != nil { - if subReplyStreamID == "" { - subReplyStreamID = fmt.Sprintf("eino-sub-reply-%s-%d", conversationID, atomic.AddInt64(&einoSubReplyStreamSeq, 1)) - progress("eino_agent_reply_stream_start", "", map[string]interface{}{ - "streamId": subReplyStreamID, - "einoAgent": ev.AgentName, - "einoRole": "sub", - "conversationId": conversationID, - "source": "eino", - }) - } - progress("eino_agent_reply_stream_delta", chunk.Content, map[string]interface{}{ - "streamId": subReplyStreamID, - "conversationId": conversationID, - }) - } - subAssistantBuf.WriteString(chunk.Content) - } - } - // 收集流式 tool_calls 全部分片;arguments 在最后一帧常为 "",需按 index/id 合并后才能展示 subagent_type/description。 - if len(chunk.ToolCalls) > 0 { - toolStreamFragments = append(toolStreamFragments, chunk.ToolCalls...) - } - } - if streamsMainAssistant(ev.AgentName) { - if s := strings.TrimSpace(mainAssistantBuf.String()); s != "" { - lastAssistant = s - if orchMode == "plan_execute" && strings.EqualFold(strings.TrimSpace(ev.AgentName), "executor") { - lastPlanExecuteExecutor = UnwrapPlanExecuteUserText(s) - } - } - } - if subAssistantBuf.Len() > 0 && progress != nil { - if s := strings.TrimSpace(subAssistantBuf.String()); s != "" { - if subReplyStreamID != "" { - progress("eino_agent_reply_stream_end", s, map[string]interface{}{ - "streamId": subReplyStreamID, - "einoAgent": ev.AgentName, - "einoRole": "sub", - "conversationId": conversationID, - "source": "eino", - }) - } else { - progress("eino_agent_reply", s, map[string]interface{}{ - "conversationId": conversationID, - "einoAgent": ev.AgentName, - "einoRole": "sub", - "source": "eino", - }) - } - } - } - var lastToolChunk *schema.Message - if merged := mergeStreamingToolCallFragments(toolStreamFragments); len(merged) > 0 { - lastToolChunk = &schema.Message{ToolCalls: merged} - } - tryEmitToolCallsOnce(lastToolChunk, ev.AgentName, orchestratorName, conversationID, progress, toolEmitSeen, subAgentToolStep, markPending) - continue - } - - msg, gerr := mv.GetMessage() - if gerr != nil || msg == nil { - continue - } - tryEmitToolCallsOnce(mergeMessageToolCalls(msg), ev.AgentName, orchestratorName, conversationID, progress, toolEmitSeen, subAgentToolStep, markPending) - - if mv.Role == schema.Assistant { - if progress != nil && strings.TrimSpace(msg.ReasoningContent) != "" { - progress("thinking", strings.TrimSpace(msg.ReasoningContent), map[string]interface{}{ - "conversationId": conversationID, - "source": "eino", - "einoAgent": ev.AgentName, - "einoRole": einoRoleTag(ev.AgentName), - "orchestration": orchMode, - }) - } - body := strings.TrimSpace(msg.Content) - if body != "" { - if streamsMainAssistant(ev.AgentName) { - if progress != nil { - progress("response_start", "", map[string]interface{}{ - "conversationId": conversationID, - "mcpExecutionIds": snapshotMCPIDs(), - "messageGeneratedBy": "eino:" + ev.AgentName, - "einoRole": "orchestrator", - "einoAgent": ev.AgentName, - "orchestration": orchMode, - }) - progress("response_delta", body, map[string]interface{}{ - "conversationId": conversationID, - "mcpExecutionIds": snapshotMCPIDs(), - "einoRole": "orchestrator", - "einoAgent": ev.AgentName, - "orchestration": orchMode, - }) - } - lastAssistant = body - if orchMode == "plan_execute" && strings.EqualFold(strings.TrimSpace(ev.AgentName), "executor") { - lastPlanExecuteExecutor = UnwrapPlanExecuteUserText(body) - } - } else if progress != nil { - progress("eino_agent_reply", body, map[string]interface{}{ - "conversationId": conversationID, - "einoAgent": ev.AgentName, - "einoRole": "sub", - "source": "eino", - }) - } - } - } - - if mv.Role == schema.Tool && progress != nil { - toolName := msg.ToolName - if toolName == "" { - toolName = mv.ToolName - } - - // bridge 工具在 res.IsError=true 时会返回带前缀的内容;这里解析为 success/isError,避免前端误判为成功。 - content := msg.Content - isErr := false - if strings.HasPrefix(content, einomcp.ToolErrorPrefix) { - isErr = true - content = strings.TrimPrefix(content, einomcp.ToolErrorPrefix) - } - - preview := content - if len(preview) > 200 { - preview = preview[:200] + "..." - } - data := map[string]interface{}{ - "toolName": toolName, - "success": !isErr, - "isError": isErr, - "result": content, - "resultPreview": preview, - "conversationId": conversationID, - "einoAgent": ev.AgentName, - "einoRole": einoRoleTag(ev.AgentName), - "source": "eino", - } - toolCallID := strings.TrimSpace(msg.ToolCallID) - // Some framework paths (e.g. UnknownToolsHandler) may omit ToolCallID on tool messages. - // Infer from the tool_call emission order for this agent to keep UI state consistent. - if toolCallID == "" { - // In some internal tool execution paths, ev.AgentName may be empty for tool-role - // messages. Try several fallbacks to avoid leaving UI tool_call status stuck. - if inferred, ok := popNextPendingForAgent(ev.AgentName); ok { - toolCallID = inferred.ToolCallID - } else if inferred, ok := popNextPendingForAgent(orchestratorName); ok { - toolCallID = inferred.ToolCallID - } else if inferred, ok := popNextPendingForAgent(""); ok { - toolCallID = inferred.ToolCallID - } else { - // last resort: pick any pending toolCallID - for id := range pendingByID { - toolCallID = id - delete(pendingByID, id) - break - } - } - } else { - removePendingByID(toolCallID) - } - if toolCallID != "" { - data["toolCallId"] = toolCallID - } - progress("tool_result", fmt.Sprintf("工具结果 (%s)", toolName), data) - } - } - } - - mcpIDsMu.Lock() - ids := append([]string(nil), mcpIDs...) - mcpIDsMu.Unlock() - - histJSON, _ := json.Marshal(lastRunMsgs) - cleaned := strings.TrimSpace(lastAssistant) - if orchMode == "plan_execute" { - if e := strings.TrimSpace(lastPlanExecuteExecutor); e != "" { - cleaned = e - } else { - cleaned = UnwrapPlanExecuteUserText(cleaned) - } - } - cleaned = dedupeRepeatedParagraphs(cleaned, 80) - cleaned = dedupeParagraphsByLineFingerprint(cleaned, 100) - out := &RunResult{ - Response: cleaned, - MCPExecutionIDs: ids, - LastReActInput: string(histJSON), - LastReActOutput: cleaned, - } - if out.Response == "" { - out.Response = "(Eino 多代理编排已完成,但未捕获到助手文本输出。请查看过程详情或日志。)" - out.LastReActOutput = out.Response - } - return out, nil + return runEinoADKAgentLoop(ctx, &einoADKRunLoopArgs{ + OrchMode: orchMode, + OrchestratorName: orchestratorName, + ConversationID: conversationID, + Progress: progress, + Logger: logger, + SnapshotMCPIDs: snapshotMCPIDs, + StreamsMainAssistant: streamsMainAssistant, + EinoRoleTag: einoRoleTag, + CheckpointDir: ma.EinoMiddleware.CheckpointDir, + McpIDsMu: &mcpIDsMu, + McpIDs: &mcpIDs, + DA: da, + EmptyResponseMessage: "(Eino 多代理编排已完成,但未捕获到助手文本输出。请查看过程详情或日志。)", + }, baseMsgs) } func historyToMessages(history []agent.ChatMessage) []adk.Message {