Add files via upload

This commit is contained in:
公明
2026-05-20 17:54:30 +08:00
committed by GitHub
parent 8ccf90d067
commit 0569255189
2 changed files with 99 additions and 43 deletions
+91 -39
View File
@@ -724,6 +724,34 @@ func (h *AgentHandler) AgentLoop(c *gin.Context) {
})
}
func (h *AgentHandler) finalizeRobotAgentError(ctx context.Context, assistantMessageID, conversationID string, resultMA *multiagent.RunResult, errMA error) (string, string, error) {
if shouldPersistEinoAgentTraceAfterRunError(ctx) {
h.persistEinoAgentTraceForResume(conversationID, resultMA)
}
errMsg := "执行失败: " + errMA.Error()
if assistantMessageID != "" {
_, _ = h.db.Exec("UPDATE messages SET content = ?, updated_at = ? WHERE id = ?", errMsg, time.Now(), assistantMessageID)
_ = h.db.AddProcessDetail(assistantMessageID, conversationID, "error", errMsg, nil)
}
return "", conversationID, errMA
}
func (h *AgentHandler) finalizeRobotAgentSuccess(assistantMessageID, conversationID string, resultMA *multiagent.RunResult) (string, string, error) {
if assistantMessageID != "" {
if errU := h.db.UpdateAssistantMessageFinalize(assistantMessageID, resultMA.Response, resultMA.MCPExecutionIDs, multiagent.AggregatedReasoningFromTraceJSON(resultMA.LastAgentTraceInput)); errU != nil {
h.logger.Warn("机器人:更新助手消息失败", zap.Error(errU))
}
} else {
if _, err := h.db.AddMessage(conversationID, "assistant", resultMA.Response, resultMA.MCPExecutionIDs); err != nil {
h.logger.Warn("机器人:保存助手消息失败", zap.Error(err))
}
}
if resultMA.LastAgentTraceInput != "" || resultMA.LastAgentTraceOutput != "" {
_ = h.db.SaveAgentTrace(conversationID, resultMA.LastAgentTraceInput, resultMA.LastAgentTraceOutput)
}
return resultMA.Response, conversationID, nil
}
// ProcessMessageForRobot 供机器人(企业微信/钉钉/飞书)调用:与 /api/agent-loop/stream 相同执行路径(含 progressCallback、过程详情),仅不发送 SSE,最后返回完整回复
func (h *AgentHandler) ProcessMessageForRobot(ctx context.Context, platform, conversationID, message, role string) (response string, convID string, err error) {
if conversationID == "" {
@@ -780,53 +808,58 @@ func (h *AgentHandler) ProcessMessageForRobot(ctx context.Context, platform, con
if assistantMsg != nil {
assistantMessageID = assistantMsg.ID
}
progressCallback := h.createProgressCallback(ctx, nil, conversationID, assistantMessageID, nil)
useRobotMulti := h.config != nil && h.config.MultiAgent.Enabled && h.config.MultiAgent.RobotUseMultiAgent
if useRobotMulti {
resultMA, errMA := multiagent.RunDeepAgent(
ctx,
h.config,
&h.config.MultiAgent,
h.agent,
h.logger,
conversationID,
finalMessage,
agentHistoryMessages,
roleTools,
progressCallback,
h.agentsMarkdownDir,
"deep",
nil,
// 注册运行中任务并向 taskEventBus 镜像进度事件,供 Web 端 task-events 补流(与 agent-loop/stream 一致)。
taskCtx, cancelWithCause := context.WithCancelCause(ctx)
defer cancelWithCause(nil)
taskStatus := "completed"
defer func() {
h.tasks.FinishTask(conversationID, taskStatus)
}()
if _, err := h.tasks.StartTask(conversationID, message, cancelWithCause); err != nil {
if errors.Is(err, ErrTaskAlreadyRunning) {
return "", conversationID, fmt.Errorf("当前会话已有任务正在执行中,请稍后再试")
}
return "", conversationID, fmt.Errorf("无法启动任务: %w", err)
}
progressCallback := h.createProgressCallback(taskCtx, cancelWithCause, conversationID, assistantMessageID, nil)
robotMode := "react"
if h.config != nil {
robotMode = config.NormalizeRobotAgentMode(h.config.MultiAgent)
}
switch robotMode {
case "eino_single":
resultMA, errMA := multiagent.RunEinoSingleChatModelAgent(
taskCtx, h.config, &h.config.MultiAgent, h.agent, h.logger,
conversationID, finalMessage, agentHistoryMessages, roleTools, progressCallback, nil,
)
if errMA != nil {
if shouldPersistEinoAgentTraceAfterRunError(ctx) {
h.persistEinoAgentTraceForResume(conversationID, resultMA)
}
errMsg := "执行失败: " + errMA.Error()
if assistantMessageID != "" {
_, _ = h.db.Exec("UPDATE messages SET content = ?, updated_at = ? WHERE id = ?", errMsg, time.Now(), assistantMessageID)
_ = h.db.AddProcessDetail(assistantMessageID, conversationID, "error", errMsg, nil)
}
return "", conversationID, errMA
taskStatus = "failed"
return h.finalizeRobotAgentError(taskCtx, assistantMessageID, conversationID, resultMA, errMA)
}
if assistantMessageID != "" {
if errU := h.db.UpdateAssistantMessageFinalize(assistantMessageID, resultMA.Response, resultMA.MCPExecutionIDs, multiagent.AggregatedReasoningFromTraceJSON(resultMA.LastAgentTraceInput)); errU != nil {
h.logger.Warn("机器人:更新助手消息失败", zap.Error(errU))
}
} else {
if _, err = h.db.AddMessage(conversationID, "assistant", resultMA.Response, resultMA.MCPExecutionIDs); err != nil {
h.logger.Warn("机器人:保存助手消息失败", zap.Error(err))
}
return h.finalizeRobotAgentSuccess(assistantMessageID, conversationID, resultMA)
case "deep", "plan_execute", "supervisor":
if h.config == nil || !h.config.MultiAgent.Enabled {
h.logger.Warn("机器人配置为多代理模式但未启用 multi_agent,回退原生 ReAct",
zap.String("robot_mode", robotMode))
break
}
if resultMA.LastAgentTraceInput != "" || resultMA.LastAgentTraceOutput != "" {
_ = h.db.SaveAgentTrace(conversationID, resultMA.LastAgentTraceInput, resultMA.LastAgentTraceOutput)
resultMA, errMA := multiagent.RunDeepAgent(
taskCtx, h.config, &h.config.MultiAgent, h.agent, h.logger,
conversationID, finalMessage, agentHistoryMessages, roleTools, progressCallback,
h.agentsMarkdownDir, robotMode, nil,
)
if errMA != nil {
taskStatus = "failed"
return h.finalizeRobotAgentError(taskCtx, assistantMessageID, conversationID, resultMA, errMA)
}
return resultMA.Response, conversationID, nil
return h.finalizeRobotAgentSuccess(assistantMessageID, conversationID, resultMA)
}
result, err := h.agent.AgentLoopWithProgress(ctx, finalMessage, agentHistoryMessages, conversationID, progressCallback, roleTools)
result, err := h.agent.AgentLoopWithProgress(taskCtx, finalMessage, agentHistoryMessages, conversationID, progressCallback, roleTools)
if err != nil {
taskStatus = "failed"
errMsg := "执行失败: " + err.Error()
if assistantMessageID != "" {
_, _ = h.db.Exec("UPDATE messages SET content = ?, updated_at = ? WHERE id = ?", errMsg, time.Now(), assistantMessageID)
@@ -858,6 +891,23 @@ type StreamEvent struct {
Data interface{} `json:"data,omitempty"`
}
// publishProgressToTaskEventBus 将进度事件镜像到 taskEventBus(机器人/无 HTTP SSE 客户端时供 Web task-events 订阅)。
func (h *AgentHandler) publishProgressToTaskEventBus(conversationID, eventType, message string, data interface{}) {
if h == nil || h.taskEventBus == nil || strings.TrimSpace(conversationID) == "" {
return
}
event := StreamEvent{Type: eventType, Message: message, Data: data}
eventJSON, err := json.Marshal(event)
if err != nil {
return
}
sseLine := make([]byte, 0, len(eventJSON)+8)
sseLine = append(sseLine, []byte("data: ")...)
sseLine = append(sseLine, eventJSON...)
sseLine = append(sseLine, '\n', '\n')
h.taskEventBus.Publish(conversationID, sseLine)
}
// createProgressCallback 创建进度回调函数,用于保存processDetails
// sendEventFunc: 可选的流式事件发送函数,如果为nil则不发送流式事件
func (h *AgentHandler) createProgressCallback(runCtx context.Context, cancelRun context.CancelCauseFunc, conversationID, assistantMessageID string, sendEventFunc func(eventType, message string, data interface{})) agent.ProgressCallback {
@@ -967,9 +1017,11 @@ func (h *AgentHandler) createProgressCallback(runCtx context.Context, cancelRun
}
return func(eventType, message string, data interface{}) {
// 如果提供了sendEventFunc,发送流式事件
// 流式:写 HTTP SSE;非流式(机器人等):镜像到 taskEventBus 供 Web 订阅
if sendEventFunc != nil {
sendEventFunc(eventType, message, data)
} else {
h.publishProgressToTaskEventBus(conversationID, eventType, message, data)
}
// 保存tool_call事件中的参数
+8 -4
View File
@@ -319,7 +319,7 @@ func (h *ConfigHandler) GetConfig(c *gin.Context) {
}
multiPub := config.MultiAgentPublic{
Enabled: h.config.MultiAgent.Enabled,
RobotUseMultiAgent: h.config.MultiAgent.RobotUseMultiAgent,
RobotDefaultAgentMode: config.NormalizeRobotAgentMode(h.config.MultiAgent),
BatchUseMultiAgent: h.config.MultiAgent.BatchUseMultiAgent,
SubAgentCount: subAgentCount,
Orchestration: config.NormalizeMultiAgentOrchestration(h.config.MultiAgent.Orchestration),
@@ -779,8 +779,12 @@ func (h *ConfigHandler) UpdateConfig(c *gin.Context) {
// 多代理标量(sub_agents 等仍由 config.yaml 维护)
if req.MultiAgent != nil {
h.config.MultiAgent.Enabled = req.MultiAgent.Enabled
h.config.MultiAgent.RobotUseMultiAgent = req.MultiAgent.RobotUseMultiAgent
h.config.MultiAgent.BatchUseMultiAgent = req.MultiAgent.BatchUseMultiAgent
if mode := strings.TrimSpace(req.MultiAgent.RobotDefaultAgentMode); mode != "" {
h.config.MultiAgent.RobotDefaultAgentMode = mode
} else {
h.config.MultiAgent.RobotDefaultAgentMode = "react"
}
if req.MultiAgent.PlanExecuteLoopMaxIterations != nil {
h.config.MultiAgent.PlanExecuteLoopMaxIterations = *req.MultiAgent.PlanExecuteLoopMaxIterations
}
@@ -789,7 +793,7 @@ func (h *ConfigHandler) UpdateConfig(c *gin.Context) {
}
h.logger.Info("更新多代理配置",
zap.Bool("enabled", h.config.MultiAgent.Enabled),
zap.Bool("robot_use_multi_agent", h.config.MultiAgent.RobotUseMultiAgent),
zap.String("robot_default_agent_mode", config.NormalizeRobotAgentMode(h.config.MultiAgent)),
zap.Bool("batch_use_multi_agent", h.config.MultiAgent.BatchUseMultiAgent),
zap.Int("plan_execute_loop_max_iterations", h.config.MultiAgent.PlanExecuteLoopMaxIterations),
zap.Int("tool_search_always_visible_tools", len(h.config.MultiAgent.EinoMiddleware.ToolSearchAlwaysVisibleTools)),
@@ -1571,7 +1575,7 @@ func updateMultiAgentConfig(doc *yaml.Node, cfg config.MultiAgentConfig) {
root := doc.Content[0]
maNode := ensureMap(root, "multi_agent")
setBoolInMap(maNode, "enabled", cfg.Enabled)
setBoolInMap(maNode, "robot_use_multi_agent", cfg.RobotUseMultiAgent)
setStringInMap(maNode, "robot_default_agent_mode", config.NormalizeRobotAgentMode(cfg))
setBoolInMap(maNode, "batch_use_multi_agent", cfg.BatchUseMultiAgent)
setIntInMap(maNode, "plan_execute_loop_max_iterations", cfg.PlanExecuteLoopMaxIterations)
mwNode := ensureMap(maNode, "eino_middleware")