From c43fde2612fc86c01998f8f5eb57a4cdf73ae083 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E5=85=AC=E6=98=8E?= <83812544+Ed1s0nZ@users.noreply.github.com> Date: Mon, 20 Apr 2026 19:46:40 +0800 Subject: [PATCH] Add files via upload --- internal/multiagent/eino_adk_run_loop.go | 42 +++++++++- internal/multiagent/eino_orchestration.go | 82 ++++++++++++++++---- internal/multiagent/eino_single_runner.go | 3 +- internal/multiagent/plan_execute_executor.go | 4 +- internal/multiagent/runner.go | 22 +++++- internal/multiagent/tool_error_middleware.go | 12 +++ 6 files changed, 142 insertions(+), 23 deletions(-) diff --git a/internal/multiagent/eino_adk_run_loop.go b/internal/multiagent/eino_adk_run_loop.go index a0f558f3..9aa497bd 100644 --- a/internal/multiagent/eino_adk_run_loop.go +++ b/internal/multiagent/eino_adk_run_loop.go @@ -79,6 +79,21 @@ func runEinoADKAgentLoop(ctx context.Context, args *einoADKRunLoopArgs, baseMsgs mcpIDsMu := args.McpIDsMu mcpIDs := args.McpIDs + // panic recovery:防止 Eino 框架内部 panic 导致整个 goroutine 崩溃、连接无法正常关闭。 + defer func() { + if r := recover(); r != nil { + if logger != nil { + logger.Error("eino runner panic recovered", zap.Any("recover", r), zap.Stack("stack")) + } + if progress != nil { + progress("error", fmt.Sprintf("Internal error: %v / 内部错误: %v", r, r), map[string]interface{}{ + "conversationId": conversationID, + "source": "eino", + }) + } + } + }() + var lastRunMsgs []adk.Message var lastAssistant string var lastPlanExecuteExecutor string @@ -86,7 +101,8 @@ func runEinoADKAgentLoop(ctx context.Context, args *einoADKRunLoopArgs, baseMsgs emptyHint := strings.TrimSpace(args.EmptyResponseMessage) if emptyHint == "" { - emptyHint = "(Eino 会话已完成,但未捕获到助手文本输出。请查看过程详情或日志。)" + emptyHint = "(Eino session completed but no assistant text was captured. Check process details or logs.) " + + "(Eino 会话已完成,但未捕获到助手文本输出。请查看过程详情或日志。)" } attemptLoop: @@ -191,6 +207,20 @@ attemptLoop: iter := runner.Run(ctx, msgs) for { + // 检测 context 取消(用户关闭浏览器、请求超时等),flush pending 工具状态避免 UI 卡在 "执行中"。 + select { + case <-ctx.Done(): + flushAllPendingAsFailed(ctx.Err()) + if progress != nil { + progress("error", "Request cancelled / 请求已取消", map[string]interface{}{ + "conversationId": conversationID, + "source": "eino", + }) + } + return nil, ctx.Err() + default: + } + ev, ok := iter.Next() if !ok { lastRunMsgs = msgs @@ -308,7 +338,10 @@ attemptLoop: break } if logger != nil { - logger.Warn("eino stream recv", zap.Error(rerr)) + logger.Warn("eino stream recv error, flushing incomplete stream", + zap.Error(rerr), + zap.String("agent", ev.AgentName), + zap.Int("toolFragments", len(toolStreamFragments))) } break } @@ -531,6 +564,11 @@ attemptLoop: } cleaned = dedupeRepeatedParagraphs(cleaned, 80) cleaned = dedupeParagraphsByLineFingerprint(cleaned, 100) + // 防止超长响应导致 JSON 序列化慢或 OOM(多代理拼接大量工具输出时可能触发)。 + const maxResponseRunes = 100000 + if rs := []rune(cleaned); len(rs) > maxResponseRunes { + cleaned = string(rs[:maxResponseRunes]) + "\n\n... (response truncated / 响应已截断)" + } out := &RunResult{ Response: cleaned, MCPExecutionIDs: ids, diff --git a/internal/multiagent/eino_orchestration.go b/internal/multiagent/eino_orchestration.go index 5e7c1a00..96d1ab2b 100644 --- a/internal/multiagent/eino_orchestration.go +++ b/internal/multiagent/eino_orchestration.go @@ -26,6 +26,13 @@ type PlanExecuteRootArgs struct { // AppCfg / Logger 非空时为 Executor 挂载与 Deep/Supervisor 一致的 Eino summarization 中间件。 AppCfg *config.Config Logger *zap.Logger + // ExecPreMiddlewares 是由 prependEinoMiddlewares 构建的前置中间件(patchtoolcalls, reduction, toolsearch, plantask), + // 与 Deep/Supervisor 主代理的 mainOrchestratorPre 一致。 + ExecPreMiddlewares []adk.ChatModelAgentMiddleware + // SkillMiddleware 是 Eino 官方 skill 渐进式披露中间件(可选)。 + SkillMiddleware adk.ChatModelAgentMiddleware + // FilesystemMiddleware 是 Eino filesystem 中间件,当 eino_skills.filesystem_tools 启用时提供本机文件读写与 Shell 能力(可选)。 + FilesystemMiddleware adk.ChatModelAgentMiddleware } // NewPlanExecuteRoot 返回 plan → execute → replan 预置编排根节点(与 Deep / Supervisor 并列)。 @@ -40,20 +47,39 @@ func NewPlanExecuteRoot(ctx context.Context, a *PlanExecuteRootArgs) (adk.Resuma if !ok { return nil, fmt.Errorf("plan_execute: 主模型需实现 ToolCallingChatModel") } - planner, err := planexecute.NewPlanner(ctx, &planexecute.PlannerConfig{ + plannerCfg := &planexecute.PlannerConfig{ ToolCallingChatModel: tcm, - }) + } + if fn := planExecutePlannerGenInput(a.OrchInstruction); fn != nil { + plannerCfg.GenInputFn = fn + } + planner, err := planexecute.NewPlanner(ctx, plannerCfg) if err != nil { return nil, fmt.Errorf("plan_execute planner: %w", err) } replanner, err := planexecute.NewReplanner(ctx, &planexecute.ReplannerConfig{ ChatModel: tcm, - GenInputFn: planExecuteReplannerGenInput, + GenInputFn: planExecuteReplannerGenInput(a.OrchInstruction), }) if err != nil { return nil, fmt.Errorf("plan_execute replanner: %w", err) } + + // 组装 executor handler 栈,顺序与 Deep/Supervisor 主代理一致(outermost first)。 var execHandlers []adk.ChatModelAgentMiddleware + // 1. patchtoolcalls, reduction, toolsearch, plantask(来自 prependEinoMiddlewares) + if len(a.ExecPreMiddlewares) > 0 { + execHandlers = append(execHandlers, a.ExecPreMiddlewares...) + } + // 2. filesystem 中间件(可选) + if a.FilesystemMiddleware != nil { + execHandlers = append(execHandlers, a.FilesystemMiddleware) + } + // 3. skill 中间件(可选) + if a.SkillMiddleware != nil { + execHandlers = append(execHandlers, a.SkillMiddleware) + } + // 4. summarization(最后,与 Deep/Supervisor 一致) if a.AppCfg != nil { sumMw, sumErr := newEinoSummarizationMiddleware(ctx, a.ExecModel, a.AppCfg, a.Logger) if sumErr != nil { @@ -82,6 +108,21 @@ func NewPlanExecuteRoot(ctx context.Context, a *PlanExecuteRootArgs) (adk.Resuma }) } +// planExecutePlannerGenInput 将 orchestrator instruction 作为 SystemMessage 注入 planner 输入。 +// 返回 nil 时 Eino 使用内置默认 planner prompt。 +func planExecutePlannerGenInput(orchInstruction string) planexecute.GenPlannerModelInputFn { + oi := strings.TrimSpace(orchInstruction) + if oi == "" { + return nil + } + return func(ctx context.Context, userInput []adk.Message) ([]adk.Message, error) { + msgs := make([]adk.Message, 0, 1+len(userInput)) + msgs = append(msgs, schema.SystemMessage(oi)) + msgs = append(msgs, userInput...) + return msgs, nil + } +} + func planExecuteExecutorGenInput(orchInstruction string) planexecute.GenModelInputFn { oi := strings.TrimSpace(orchInstruction) return func(ctx context.Context, in *planexecute.ExecutionContext) ([]adk.Message, error) { @@ -123,19 +164,30 @@ func planExecuteFormatExecutedSteps(results []planexecute.ExecutedStep) string { return sb.String() } -// planExecuteReplannerGenInput 与 Eino 默认 Replanner 输入一致,但 executed_steps 经 cap 后再写入 prompt。 -func planExecuteReplannerGenInput(ctx context.Context, in *planexecute.ExecutionContext) ([]adk.Message, error) { - planContent, err := in.Plan.MarshalJSON() - if err != nil { - return nil, err +// planExecuteReplannerGenInput 与 Eino 默认 Replanner 输入一致,但 executed_steps 经 cap 后再写入 prompt, +// 且在 orchInstruction 非空时 prepend SystemMessage 使 replanner 也能接收全局指令。 +func planExecuteReplannerGenInput(orchInstruction string) planexecute.GenModelInputFn { + oi := strings.TrimSpace(orchInstruction) + return func(ctx context.Context, in *planexecute.ExecutionContext) ([]adk.Message, error) { + planContent, err := in.Plan.MarshalJSON() + if err != nil { + return nil, err + } + msgs, err := planexecute.ReplannerPrompt.Format(ctx, map[string]any{ + "plan": string(planContent), + "input": planExecuteFormatInput(in.UserInput), + "executed_steps": planExecuteFormatExecutedSteps(in.ExecutedSteps), + "plan_tool": planexecute.PlanToolInfo.Name, + "respond_tool": planexecute.RespondToolInfo.Name, + }) + if err != nil { + return nil, err + } + if oi != "" { + msgs = append([]adk.Message{schema.SystemMessage(oi)}, msgs...) + } + return msgs, nil } - return planexecute.ReplannerPrompt.Format(ctx, map[string]any{ - "plan": string(planContent), - "input": planExecuteFormatInput(in.UserInput), - "executed_steps": planExecuteFormatExecutedSteps(in.ExecutedSteps), - "plan_tool": planexecute.PlanToolInfo.Name, - "respond_tool": planexecute.RespondToolInfo.Name, - }) } // planExecuteStreamsMainAssistant 将规划/执行/重规划各阶段助手流式输出映射到主对话区。 diff --git a/internal/multiagent/eino_single_runner.go b/internal/multiagent/eino_single_runner.go index c1cd9ec6..e708b5db 100644 --- a/internal/multiagent/eino_single_runner.go +++ b/internal/multiagent/eino_single_runner.go @@ -212,6 +212,7 @@ func RunEinoSingleChatModelAgent( McpIDsMu: &mcpIDsMu, McpIDs: &mcpIDs, DA: chatAgent, - EmptyResponseMessage: "(Eino ADK 单代理会话已完成,但未捕获到助手文本输出。请查看过程详情或日志。)", + EmptyResponseMessage: "(Eino ADK single-agent session completed but no assistant text was captured. Check process details or logs.) " + + "(Eino ADK 单代理会话已完成,但未捕获到助手文本输出。请查看过程详情或日志。)", }, baseMsgs) } diff --git a/internal/multiagent/plan_execute_executor.go b/internal/multiagent/plan_execute_executor.go index d1af3a72..fe138803 100644 --- a/internal/multiagent/plan_execute_executor.go +++ b/internal/multiagent/plan_execute_executor.go @@ -23,13 +23,13 @@ func newPlanExecuteExecutor(ctx context.Context, cfg *planexecute.ExecutorConfig genInput := func(ctx context.Context, instruction string, _ *adk.AgentInput) ([]adk.Message, error) { plan, ok := adk.GetSessionValue(ctx, planexecute.PlanSessionKey) if !ok { - panic("impossible: plan not found") + return nil, fmt.Errorf("plan_execute executor: session value %q missing (possible session corruption)", planexecute.PlanSessionKey) } plan_ := plan.(planexecute.Plan) userInput, ok := adk.GetSessionValue(ctx, planexecute.UserInputSessionKey) if !ok { - panic("impossible: user input not found") + return nil, fmt.Errorf("plan_execute executor: session value %q missing (possible session corruption)", planexecute.UserInputSessionKey) } userInput_ := userInput.([]adk.Message) diff --git a/internal/multiagent/runner.go b/internal/multiagent/runner.go index 8cd70127..c6e8e7e4 100644 --- a/internal/multiagent/runner.go +++ b/internal/multiagent/runner.go @@ -335,6 +335,9 @@ func RunDeepAgent( } sb.WriteString("你是监督协调者:可将任务通过 transfer 工具委派给下列专家子代理(使用其在系统中的 Agent 名称)。专家列表:") for _, sa := range subAgents { + if sa == nil { + continue + } sb.WriteString("\n- ") sb.WriteString(sa.Name(ctx)) } @@ -349,14 +352,15 @@ func RunDeepAgent( deepShell = einoLoc } - deepHandlers := []adk.ChatModelAgentMiddleware{} + // noNestedTaskMiddleware 必须在最外层(最先拦截),防止 skill 或其他中间件内部触发 task 调用绕过检测。 + deepHandlers := []adk.ChatModelAgentMiddleware{newNoNestedTaskMiddleware()} if len(mainOrchestratorPre) > 0 { deepHandlers = append(deepHandlers, mainOrchestratorPre...) } if einoSkillMW != nil { deepHandlers = append(deepHandlers, einoSkillMW) } - deepHandlers = append(deepHandlers, newNoNestedTaskMiddleware(), mainSumMw) + deepHandlers = append(deepHandlers, mainSumMw) supHandlers := []adk.ChatModelAgentMiddleware{} if len(mainOrchestratorPre) > 0 { @@ -387,6 +391,14 @@ func RunDeepAgent( if perr != nil { return nil, fmt.Errorf("plan_execute 执行器模型: %w", perr) } + // 构建 filesystem 中间件(与 Deep sub-agent 一致) + var peFsMw adk.ChatModelAgentMiddleware + if einoSkillMW != nil && einoFSTools && einoLoc != nil { + peFsMw, err = subAgentFilesystemMiddleware(ctx, einoLoc) + if err != nil { + return nil, fmt.Errorf("plan_execute filesystem 中间件: %w", err) + } + } peRoot, perr := NewPlanExecuteRoot(ctx, &PlanExecuteRootArgs{ MainToolCallingModel: mainModel, ExecModel: execModel, @@ -396,6 +408,9 @@ func RunDeepAgent( LoopMaxIter: ma.PlanExecuteLoopMaxIterations, AppCfg: appCfg, Logger: logger, + ExecPreMiddlewares: mainOrchestratorPre, + SkillMiddleware: einoSkillMW, + FilesystemMiddleware: peFsMw, }) if perr != nil { return nil, perr @@ -493,7 +508,8 @@ func RunDeepAgent( McpIDsMu: &mcpIDsMu, McpIDs: &mcpIDs, DA: da, - EmptyResponseMessage: "(Eino 多代理编排已完成,但未捕获到助手文本输出。请查看过程详情或日志。)", + EmptyResponseMessage: "(Eino multi-agent orchestration completed but no assistant text was captured. Check process details or logs.) " + + "(Eino 多代理编排已完成,但未捕获到助手文本输出。请查看过程详情或日志。)", }, baseMsgs) } diff --git a/internal/multiagent/tool_error_middleware.go b/internal/multiagent/tool_error_middleware.go index 10158fc2..147aaa29 100644 --- a/internal/multiagent/tool_error_middleware.go +++ b/internal/multiagent/tool_error_middleware.go @@ -3,6 +3,7 @@ package multiagent import ( "context" "encoding/json" + "errors" "fmt" "strings" @@ -44,6 +45,17 @@ func isSoftRecoverableToolError(err error) bool { if err == nil { return false } + + // 用户取消 — 不应重试,让 hard error 传播以终止编排。 + if errors.Is(err, context.Canceled) { + return false + } + + // 工具执行超时 — 转为 soft error 让 LLM 知晓并选择替代方案,而非全局重试。 + if errors.Is(err, context.DeadlineExceeded) { + return true + } + s := strings.ToLower(err.Error()) // JSON unmarshal/parse failures — the model generated truncated or malformed arguments.