diff --git a/internal/handler/agent.go b/internal/handler/agent.go index b799d912..b3599c3f 100644 --- a/internal/handler/agent.go +++ b/internal/handler/agent.go @@ -725,7 +725,9 @@ func (h *AgentHandler) ProcessMessageForRobot(ctx context.Context, conversationI "deep", ) if errMA != nil { - h.persistEinoAgentTraceForResume(conversationID, resultMA) + if shouldPersistEinoAgentTraceAfterRunError(ctx) { + h.persistEinoAgentTraceForResume(conversationID, resultMA) + } errMsg := "执行失败: " + errMA.Error() if assistantMessageID != "" { _, _ = h.db.Exec("UPDATE messages SET content = ?, updated_at = ? WHERE id = ?", errMsg, time.Now(), assistantMessageID) @@ -2576,7 +2578,7 @@ func (h *AgentHandler) executeBatchQueue(queueID string) { } if runErr != nil { - if useRunResult { + if useRunResult && shouldPersistEinoAgentTraceAfterRunError(baseCtx) { h.persistEinoAgentTraceForResume(conversationID, resultMA) } // 检查是否是取消错误 @@ -2632,16 +2634,6 @@ func (h *AgentHandler) executeBatchQueue(queueID string) { h.logger.Warn("保存取消消息失败", zap.String("queueId", queueID), zap.String("taskId", task.ID), zap.Error(errMsg)) } } - // 保存代理轨迹(如果存在) - if result != nil && (result.LastAgentTraceInput != "" || result.LastAgentTraceOutput != "") { - if err := h.db.SaveAgentTrace(conversationID, result.LastAgentTraceInput, result.LastAgentTraceOutput); err != nil { - h.logger.Warn("保存取消任务的代理轨迹失败", zap.String("queueId", queueID), zap.String("taskId", task.ID), zap.Error(err)) - } - } else if useRunResult && resultMA != nil && (resultMA.LastAgentTraceInput != "" || resultMA.LastAgentTraceOutput != "") { - if err := h.db.SaveAgentTrace(conversationID, resultMA.LastAgentTraceInput, resultMA.LastAgentTraceOutput); err != nil { - h.logger.Warn("保存取消任务的代理轨迹失败", zap.String("queueId", queueID), zap.String("taskId", task.ID), zap.Error(err)) - } - } h.batchTaskManager.UpdateTaskStatusWithConversationID(queueID, task.ID, "cancelled", cancelMsg, "", conversationID) } else { h.logger.Error("批量任务执行失败", zap.String("queueId", queueID), zap.String("taskId", task.ID), zap.String("conversationId", conversationID), zap.Error(runErr)) diff --git a/internal/handler/eino_single_agent.go b/internal/handler/eino_single_agent.go index 58ac331f..6801d93e 100644 --- a/internal/handler/eino_single_agent.go +++ b/internal/handler/eino_single_agent.go @@ -123,7 +123,14 @@ func (h *AgentHandler) EinoSingleAgentLoopStream(c *gin.Context) { roleTools := prep.RoleTools taskStatus := "completed" - defer h.tasks.FinishTask(conversationID, taskStatus) + // 仅在成功 StartTask 后再 FinishTask。若 StartTask 因 ErrTaskAlreadyRunning 失败仍 defer FinishTask, + // 会误删其他连接上正在运行的同会话任务,导致「第一次拦截、第二次却放行」。 + taskOwned := false + defer func() { + if taskOwned { + h.tasks.FinishTask(conversationID, taskStatus) + } + }() sendEvent("progress", "正在启动 Eino ADK 单代理(ChatModelAgent)...", map[string]interface{}{ "conversationId": conversationID, @@ -168,6 +175,7 @@ func (h *AgentHandler) EinoSingleAgentLoopStream(c *gin.Context) { timeoutCancel() return } + taskOwned = true firstRun = false } else { if err := h.tasks.ResetTaskCancelForContinue(conversationID, cancelWithCause); err != nil { @@ -203,8 +211,10 @@ func (h *AgentHandler) EinoSingleAgentLoopStream(c *gin.Context) { break } - h.persistEinoAgentTraceForResume(conversationID, result) cause := context.Cause(baseCtx) + if shouldPersistEinoAgentTraceAfterRunError(baseCtx) { + h.persistEinoAgentTraceForResume(conversationID, result) + } if errors.Is(cause, ErrUserInterruptContinue) { reason := h.tasks.TakeInterruptContinueReason(conversationID) prepNext, perr := h.prepareSessionAfterUserInterrupt(conversationID, assistantMessageID, reason, roleTools) @@ -374,7 +384,9 @@ func (h *AgentHandler) EinoSingleAgentLoop(c *gin.Context) { progressCallback, ) if runErr != nil { - h.persistEinoAgentTraceForResume(prep.ConversationID, result) + if shouldPersistEinoAgentTraceAfterRunError(baseCtx) { + h.persistEinoAgentTraceForResume(prep.ConversationID, result) + } c.JSON(http.StatusInternalServerError, gin.H{"error": runErr.Error()}) return } diff --git a/internal/handler/multi_agent.go b/internal/handler/multi_agent.go index 0fd9d546..4793a28d 100644 --- a/internal/handler/multi_agent.go +++ b/internal/handler/multi_agent.go @@ -141,7 +141,13 @@ func (h *AgentHandler) MultiAgentLoopStream(c *gin.Context) { orch := strings.TrimSpace(req.Orchestration) taskStatus := "completed" - defer h.tasks.FinishTask(conversationID, taskStatus) + // 仅在成功 StartTask 后再 FinishTask;避免「任务已存在」分支 return 时误删正在运行的同会话任务。 + taskOwned := false + defer func() { + if taskOwned { + h.tasks.FinishTask(conversationID, taskStatus) + } + }() sendEvent("progress", "正在启动 Eino 多代理...", map[string]interface{}{ "conversationId": conversationID, @@ -178,6 +184,7 @@ func (h *AgentHandler) MultiAgentLoopStream(c *gin.Context) { timeoutCancel() return } + taskOwned = true firstRun = false } else { if err := h.tasks.ResetTaskCancelForContinue(conversationID, cancelWithCause); err != nil { @@ -215,8 +222,10 @@ func (h *AgentHandler) MultiAgentLoopStream(c *gin.Context) { break } - h.persistEinoAgentTraceForResume(conversationID, result) cause := context.Cause(baseCtx) + if shouldPersistEinoAgentTraceAfterRunError(baseCtx) { + h.persistEinoAgentTraceForResume(conversationID, result) + } if errors.Is(cause, ErrUserInterruptContinue) { reason := h.tasks.TakeInterruptContinueReason(conversationID) prepNext, perr := h.prepareSessionAfterUserInterrupt(conversationID, assistantMessageID, reason, roleTools) @@ -388,7 +397,9 @@ func (h *AgentHandler) MultiAgentLoop(c *gin.Context) { strings.TrimSpace(req.Orchestration), ) if runErr != nil { - h.persistEinoAgentTraceForResume(prep.ConversationID, result) + if shouldPersistEinoAgentTraceAfterRunError(baseCtx) { + h.persistEinoAgentTraceForResume(prep.ConversationID, result) + } h.logger.Error("Eino DeepAgent 执行失败", zap.Error(runErr)) errMsg := "执行失败: " + runErr.Error() if prep.AssistantMessageID != "" { diff --git a/internal/handler/task_manager.go b/internal/handler/task_manager.go index 33e40432..55a0910d 100644 --- a/internal/handler/task_manager.go +++ b/internal/handler/task_manager.go @@ -17,6 +17,15 @@ var ErrUserInterruptContinue = errors.New("user interrupt with continue") // ErrTaskAlreadyRunning 会话已有任务正在执行 var ErrTaskAlreadyRunning = errors.New("agent task already running for conversation") +// shouldPersistEinoAgentTraceAfterRunError:Eino 相关 Run 非成功返回时,是否仍写入 last_react_* 供下轮 loadHistoryFromAgentTrace。 +// 用户主动停止(WithCancelCause(ErrTaskCancelled))时不写入半截轨迹,避免下一轮 Plan-Execute 等因损坏/不完整 tool 上下文出现 no tool call 等异常。 +func shouldPersistEinoAgentTraceAfterRunError(baseCtx context.Context) bool { + if baseCtx == nil { + return true + } + return !errors.Is(context.Cause(baseCtx), ErrTaskCancelled) +} + // AgentTask 描述正在运行的Agent任务 type AgentTask struct { ConversationID string `json:"conversationId"`