From 0569255189dbe87adc82a0f4358d0c6833aebea8 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E5=85=AC=E6=98=8E?= <83812544+Ed1s0nZ@users.noreply.github.com> Date: Wed, 20 May 2026 17:54:30 +0800 Subject: [PATCH] Add files via upload --- internal/handler/agent.go | 130 ++++++++++++++++++++++++++----------- internal/handler/config.go | 12 ++-- 2 files changed, 99 insertions(+), 43 deletions(-) diff --git a/internal/handler/agent.go b/internal/handler/agent.go index 119220b1..7a92b34b 100644 --- a/internal/handler/agent.go +++ b/internal/handler/agent.go @@ -724,6 +724,34 @@ func (h *AgentHandler) AgentLoop(c *gin.Context) { }) } +func (h *AgentHandler) finalizeRobotAgentError(ctx context.Context, assistantMessageID, conversationID string, resultMA *multiagent.RunResult, errMA error) (string, string, error) { + if shouldPersistEinoAgentTraceAfterRunError(ctx) { + h.persistEinoAgentTraceForResume(conversationID, resultMA) + } + errMsg := "执行失败: " + errMA.Error() + if assistantMessageID != "" { + _, _ = h.db.Exec("UPDATE messages SET content = ?, updated_at = ? WHERE id = ?", errMsg, time.Now(), assistantMessageID) + _ = h.db.AddProcessDetail(assistantMessageID, conversationID, "error", errMsg, nil) + } + return "", conversationID, errMA +} + +func (h *AgentHandler) finalizeRobotAgentSuccess(assistantMessageID, conversationID string, resultMA *multiagent.RunResult) (string, string, error) { + if assistantMessageID != "" { + if errU := h.db.UpdateAssistantMessageFinalize(assistantMessageID, resultMA.Response, resultMA.MCPExecutionIDs, multiagent.AggregatedReasoningFromTraceJSON(resultMA.LastAgentTraceInput)); errU != nil { + h.logger.Warn("机器人:更新助手消息失败", zap.Error(errU)) + } + } else { + if _, err := h.db.AddMessage(conversationID, "assistant", resultMA.Response, resultMA.MCPExecutionIDs); err != nil { + h.logger.Warn("机器人:保存助手消息失败", zap.Error(err)) + } + } + if resultMA.LastAgentTraceInput != "" || resultMA.LastAgentTraceOutput != "" { + _ = h.db.SaveAgentTrace(conversationID, resultMA.LastAgentTraceInput, resultMA.LastAgentTraceOutput) + } + return resultMA.Response, conversationID, nil +} + // ProcessMessageForRobot 供机器人(企业微信/钉钉/飞书)调用:与 /api/agent-loop/stream 相同执行路径(含 progressCallback、过程详情),仅不发送 SSE,最后返回完整回复 func (h *AgentHandler) ProcessMessageForRobot(ctx context.Context, platform, conversationID, message, role string) (response string, convID string, err error) { if conversationID == "" { @@ -780,53 +808,58 @@ func (h *AgentHandler) ProcessMessageForRobot(ctx context.Context, platform, con if assistantMsg != nil { assistantMessageID = assistantMsg.ID } - progressCallback := h.createProgressCallback(ctx, nil, conversationID, assistantMessageID, nil) - useRobotMulti := h.config != nil && h.config.MultiAgent.Enabled && h.config.MultiAgent.RobotUseMultiAgent - if useRobotMulti { - resultMA, errMA := multiagent.RunDeepAgent( - ctx, - h.config, - &h.config.MultiAgent, - h.agent, - h.logger, - conversationID, - finalMessage, - agentHistoryMessages, - roleTools, - progressCallback, - h.agentsMarkdownDir, - "deep", - nil, + // 注册运行中任务并向 taskEventBus 镜像进度事件,供 Web 端 task-events 补流(与 agent-loop/stream 一致)。 + taskCtx, cancelWithCause := context.WithCancelCause(ctx) + defer cancelWithCause(nil) + taskStatus := "completed" + defer func() { + h.tasks.FinishTask(conversationID, taskStatus) + }() + if _, err := h.tasks.StartTask(conversationID, message, cancelWithCause); err != nil { + if errors.Is(err, ErrTaskAlreadyRunning) { + return "", conversationID, fmt.Errorf("当前会话已有任务正在执行中,请稍后再试") + } + return "", conversationID, fmt.Errorf("无法启动任务: %w", err) + } + progressCallback := h.createProgressCallback(taskCtx, cancelWithCause, conversationID, assistantMessageID, nil) + + robotMode := "react" + if h.config != nil { + robotMode = config.NormalizeRobotAgentMode(h.config.MultiAgent) + } + switch robotMode { + case "eino_single": + resultMA, errMA := multiagent.RunEinoSingleChatModelAgent( + taskCtx, h.config, &h.config.MultiAgent, h.agent, h.logger, + conversationID, finalMessage, agentHistoryMessages, roleTools, progressCallback, nil, ) if errMA != nil { - if shouldPersistEinoAgentTraceAfterRunError(ctx) { - h.persistEinoAgentTraceForResume(conversationID, resultMA) - } - errMsg := "执行失败: " + errMA.Error() - if assistantMessageID != "" { - _, _ = h.db.Exec("UPDATE messages SET content = ?, updated_at = ? WHERE id = ?", errMsg, time.Now(), assistantMessageID) - _ = h.db.AddProcessDetail(assistantMessageID, conversationID, "error", errMsg, nil) - } - return "", conversationID, errMA + taskStatus = "failed" + return h.finalizeRobotAgentError(taskCtx, assistantMessageID, conversationID, resultMA, errMA) } - if assistantMessageID != "" { - if errU := h.db.UpdateAssistantMessageFinalize(assistantMessageID, resultMA.Response, resultMA.MCPExecutionIDs, multiagent.AggregatedReasoningFromTraceJSON(resultMA.LastAgentTraceInput)); errU != nil { - h.logger.Warn("机器人:更新助手消息失败", zap.Error(errU)) - } - } else { - if _, err = h.db.AddMessage(conversationID, "assistant", resultMA.Response, resultMA.MCPExecutionIDs); err != nil { - h.logger.Warn("机器人:保存助手消息失败", zap.Error(err)) - } + return h.finalizeRobotAgentSuccess(assistantMessageID, conversationID, resultMA) + case "deep", "plan_execute", "supervisor": + if h.config == nil || !h.config.MultiAgent.Enabled { + h.logger.Warn("机器人配置为多代理模式但未启用 multi_agent,回退原生 ReAct", + zap.String("robot_mode", robotMode)) + break } - if resultMA.LastAgentTraceInput != "" || resultMA.LastAgentTraceOutput != "" { - _ = h.db.SaveAgentTrace(conversationID, resultMA.LastAgentTraceInput, resultMA.LastAgentTraceOutput) + resultMA, errMA := multiagent.RunDeepAgent( + taskCtx, h.config, &h.config.MultiAgent, h.agent, h.logger, + conversationID, finalMessage, agentHistoryMessages, roleTools, progressCallback, + h.agentsMarkdownDir, robotMode, nil, + ) + if errMA != nil { + taskStatus = "failed" + return h.finalizeRobotAgentError(taskCtx, assistantMessageID, conversationID, resultMA, errMA) } - return resultMA.Response, conversationID, nil + return h.finalizeRobotAgentSuccess(assistantMessageID, conversationID, resultMA) } - result, err := h.agent.AgentLoopWithProgress(ctx, finalMessage, agentHistoryMessages, conversationID, progressCallback, roleTools) + result, err := h.agent.AgentLoopWithProgress(taskCtx, finalMessage, agentHistoryMessages, conversationID, progressCallback, roleTools) if err != nil { + taskStatus = "failed" errMsg := "执行失败: " + err.Error() if assistantMessageID != "" { _, _ = h.db.Exec("UPDATE messages SET content = ?, updated_at = ? WHERE id = ?", errMsg, time.Now(), assistantMessageID) @@ -858,6 +891,23 @@ type StreamEvent struct { Data interface{} `json:"data,omitempty"` } +// publishProgressToTaskEventBus 将进度事件镜像到 taskEventBus(机器人/无 HTTP SSE 客户端时供 Web task-events 订阅)。 +func (h *AgentHandler) publishProgressToTaskEventBus(conversationID, eventType, message string, data interface{}) { + if h == nil || h.taskEventBus == nil || strings.TrimSpace(conversationID) == "" { + return + } + event := StreamEvent{Type: eventType, Message: message, Data: data} + eventJSON, err := json.Marshal(event) + if err != nil { + return + } + sseLine := make([]byte, 0, len(eventJSON)+8) + sseLine = append(sseLine, []byte("data: ")...) + sseLine = append(sseLine, eventJSON...) + sseLine = append(sseLine, '\n', '\n') + h.taskEventBus.Publish(conversationID, sseLine) +} + // createProgressCallback 创建进度回调函数,用于保存processDetails // sendEventFunc: 可选的流式事件发送函数,如果为nil则不发送流式事件 func (h *AgentHandler) createProgressCallback(runCtx context.Context, cancelRun context.CancelCauseFunc, conversationID, assistantMessageID string, sendEventFunc func(eventType, message string, data interface{})) agent.ProgressCallback { @@ -967,9 +1017,11 @@ func (h *AgentHandler) createProgressCallback(runCtx context.Context, cancelRun } return func(eventType, message string, data interface{}) { - // 如果提供了sendEventFunc,发送流式事件 + // 流式:写 HTTP SSE;非流式(机器人等):镜像到 taskEventBus 供 Web 订阅 if sendEventFunc != nil { sendEventFunc(eventType, message, data) + } else { + h.publishProgressToTaskEventBus(conversationID, eventType, message, data) } // 保存tool_call事件中的参数 diff --git a/internal/handler/config.go b/internal/handler/config.go index 21239f7d..d1cb35f4 100644 --- a/internal/handler/config.go +++ b/internal/handler/config.go @@ -319,7 +319,7 @@ func (h *ConfigHandler) GetConfig(c *gin.Context) { } multiPub := config.MultiAgentPublic{ Enabled: h.config.MultiAgent.Enabled, - RobotUseMultiAgent: h.config.MultiAgent.RobotUseMultiAgent, + RobotDefaultAgentMode: config.NormalizeRobotAgentMode(h.config.MultiAgent), BatchUseMultiAgent: h.config.MultiAgent.BatchUseMultiAgent, SubAgentCount: subAgentCount, Orchestration: config.NormalizeMultiAgentOrchestration(h.config.MultiAgent.Orchestration), @@ -779,8 +779,12 @@ func (h *ConfigHandler) UpdateConfig(c *gin.Context) { // 多代理标量(sub_agents 等仍由 config.yaml 维护) if req.MultiAgent != nil { h.config.MultiAgent.Enabled = req.MultiAgent.Enabled - h.config.MultiAgent.RobotUseMultiAgent = req.MultiAgent.RobotUseMultiAgent h.config.MultiAgent.BatchUseMultiAgent = req.MultiAgent.BatchUseMultiAgent + if mode := strings.TrimSpace(req.MultiAgent.RobotDefaultAgentMode); mode != "" { + h.config.MultiAgent.RobotDefaultAgentMode = mode + } else { + h.config.MultiAgent.RobotDefaultAgentMode = "react" + } if req.MultiAgent.PlanExecuteLoopMaxIterations != nil { h.config.MultiAgent.PlanExecuteLoopMaxIterations = *req.MultiAgent.PlanExecuteLoopMaxIterations } @@ -789,7 +793,7 @@ func (h *ConfigHandler) UpdateConfig(c *gin.Context) { } h.logger.Info("更新多代理配置", zap.Bool("enabled", h.config.MultiAgent.Enabled), - zap.Bool("robot_use_multi_agent", h.config.MultiAgent.RobotUseMultiAgent), + zap.String("robot_default_agent_mode", config.NormalizeRobotAgentMode(h.config.MultiAgent)), zap.Bool("batch_use_multi_agent", h.config.MultiAgent.BatchUseMultiAgent), zap.Int("plan_execute_loop_max_iterations", h.config.MultiAgent.PlanExecuteLoopMaxIterations), zap.Int("tool_search_always_visible_tools", len(h.config.MultiAgent.EinoMiddleware.ToolSearchAlwaysVisibleTools)), @@ -1571,7 +1575,7 @@ func updateMultiAgentConfig(doc *yaml.Node, cfg config.MultiAgentConfig) { root := doc.Content[0] maNode := ensureMap(root, "multi_agent") setBoolInMap(maNode, "enabled", cfg.Enabled) - setBoolInMap(maNode, "robot_use_multi_agent", cfg.RobotUseMultiAgent) + setStringInMap(maNode, "robot_default_agent_mode", config.NormalizeRobotAgentMode(cfg)) setBoolInMap(maNode, "batch_use_multi_agent", cfg.BatchUseMultiAgent) setIntInMap(maNode, "plan_execute_loop_max_iterations", cfg.PlanExecuteLoopMaxIterations) mwNode := ensureMap(maNode, "eino_middleware")