diff --git a/internal/app/app.go b/internal/app/app.go index cb875a2f..1901b1d5 100644 --- a/internal/app/app.go +++ b/internal/app/app.go @@ -368,6 +368,7 @@ func New(cfg *config.Config, log *logger.Logger, configPath string) (*App, error // 创建OpenAPI处理器 conversationHandler := handler.NewConversationHandler(db, log.Logger) conversationHandler.SetAudit(auditSvc) + conversationHandler.SetTaskStopper(agentHandler) auditHandler := handler.NewAuditHandler(db, auditSvc, log.Logger) robotHandler := handler.NewRobotHandler(cfg, db, agentHandler, log.Logger) openAPIHandler := handler.NewOpenAPIHandler(db, log.Logger, conversationHandler, agentHandler) diff --git a/internal/handler/agent.go b/internal/handler/agent.go index f6e6eab8..f3fa731b 100644 --- a/internal/handler/agent.go +++ b/internal/handler/agent.go @@ -190,6 +190,21 @@ func (h *AgentHandler) SetAudit(s *audit.Service) { h.audit = s } +// CancelRunningTaskForConversation stops any in-flight agent work for the conversation (idempotent). +func (h *AgentHandler) CancelRunningTaskForConversation(conversationID string) { + if h == nil || conversationID == "" || h.tasks == nil { + return + } + if execID := h.tasks.ActiveMCPExecutionID(conversationID); execID != "" { + h.agent.CancelMCPToolExecutionWithNote(execID, "") + } + if ok, err := h.tasks.CancelTask(conversationID, ErrTaskCancelled); ok { + h.logger.Info("已取消会话运行中任务", zap.String("conversationId", conversationID)) + } else if err != nil { + h.logger.Warn("取消会话运行中任务失败", zap.String("conversationId", conversationID), zap.Error(err)) + } +} + // HitlToolWhitelistSaver 合并 HITL 免审批工具到全局配置并落盘 type HitlToolWhitelistSaver interface { MergeHitlToolWhitelistIntoConfig(add []string) error diff --git a/internal/handler/conversation.go b/internal/handler/conversation.go index a4b0c82e..4cc069db 100644 --- a/internal/handler/conversation.go +++ b/internal/handler/conversation.go @@ -12,11 +12,17 @@ import ( "go.uber.org/zap" ) +// ConversationTaskStopper cancels in-flight agent work when a conversation is removed. +type ConversationTaskStopper interface { + CancelRunningTaskForConversation(conversationID string) +} + // ConversationHandler 对话处理器 type ConversationHandler struct { - db *database.DB - logger *zap.Logger - audit *audit.Service + db *database.DB + logger *zap.Logger + audit *audit.Service + taskStopper ConversationTaskStopper } // SetAudit wires platform audit logging. @@ -24,6 +30,11 @@ func (h *ConversationHandler) SetAudit(s *audit.Service) { h.audit = s } +// SetTaskStopper wires cancellation of in-flight agent tasks on conversation delete. +func (h *ConversationHandler) SetTaskStopper(stopper ConversationTaskStopper) { + h.taskStopper = stopper +} + // NewConversationHandler 创建新的对话处理器 func NewConversationHandler(db *database.DB, logger *zap.Logger) *ConversationHandler { return &ConversationHandler{ @@ -245,6 +256,10 @@ func (h *ConversationHandler) UpdateConversation(c *gin.Context) { func (h *ConversationHandler) DeleteConversation(c *gin.Context) { id := c.Param("id") + if h.taskStopper != nil { + h.taskStopper.CancelRunningTaskForConversation(id) + } + if err := h.db.DeleteConversation(id); err != nil { h.logger.Error("删除对话失败", zap.Error(err)) c.JSON(http.StatusInternalServerError, gin.H{"error": err.Error()}) diff --git a/internal/handler/conversation_delete_task_test.go b/internal/handler/conversation_delete_task_test.go new file mode 100644 index 00000000..39ef06c0 --- /dev/null +++ b/internal/handler/conversation_delete_task_test.go @@ -0,0 +1,30 @@ +package handler + +import ( + "context" + "testing" + "time" + + "go.uber.org/zap" +) + +func TestConversationHandlerDeleteConversationCancelsRunningTask(t *testing.T) { + tm := NewAgentTaskManager() + ctx, cancel := context.WithCancelCause(context.Background()) + _, err := tm.StartTask("conv-1", "hello", cancel) + if err != nil { + t.Fatalf("StartTask: %v", err) + } + + h := &AgentHandler{tasks: tm, logger: zap.NewNop()} + h.CancelRunningTaskForConversation("conv-1") + + select { + case <-ctx.Done(): + case <-time.After(2 * time.Second): + t.Fatal("task context was not cancelled") + } + if cause := context.Cause(ctx); cause != ErrTaskCancelled { + t.Fatalf("expected ErrTaskCancelled, got %v", cause) + } +} diff --git a/internal/handler/robot.go b/internal/handler/robot.go index 4a1144cb..0c301506 100644 --- a/internal/handler/robot.go +++ b/internal/handler/robot.go @@ -594,6 +594,9 @@ func (h *RobotHandler) cmdDelete(platform, userID, convID string) string { h.mu.Unlock() h.deleteSessionBinding(sk) } + if h.agentHandler != nil { + h.agentHandler.CancelRunningTaskForConversation(convID) + } if err := h.db.DeleteConversation(convID); err != nil { return "删除失败: " + err.Error() }