diff --git a/internal/handler/task_manager.go b/internal/handler/task_manager.go index ff059322..9964ad5c 100644 --- a/internal/handler/task_manager.go +++ b/internal/handler/task_manager.go @@ -19,6 +19,7 @@ type AgentTask struct { Message string `json:"message,omitempty"` StartedAt time.Time `json:"startedAt"` Status string `json:"status"` + CancellingAt time.Time `json:"-"` // 进入 cancelling 状态的时间,用于清理长时间卡住的任务 cancel func(error) } @@ -41,13 +42,61 @@ type AgentTaskManager struct { historyRetention time.Duration // 历史记录保留时间 } +const ( + // cancellingStuckThreshold 处于「取消中」超过此时长则强制从运行列表移除。正常取消会在当前步骤内返回, + // 超过则视为卡住,尽快释放会话。常见做法多为 30–60s 内释放。 + cancellingStuckThreshold = 45 * time.Second + // cancellingStuckThresholdLegacy 未记录 CancellingAt 时用 StartedAt 判断的兜底时长 + cancellingStuckThresholdLegacy = 2 * time.Minute + cleanupInterval = 15 * time.Second // 与上面阈值配合,最长约 60s 内移除 +) + // NewAgentTaskManager 创建任务管理器 func NewAgentTaskManager() *AgentTaskManager { - return &AgentTaskManager{ + m := &AgentTaskManager{ tasks: make(map[string]*AgentTask), completedTasks: make([]*CompletedTask, 0), - maxHistorySize: 50, // 最多保留50条历史记录 - historyRetention: 24 * time.Hour, // 保留24小时 + maxHistorySize: 50, // 最多保留50条历史记录 + historyRetention: 24 * time.Hour, // 保留24小时 + } + go m.runStuckCancellingCleanup() + return m +} + +// runStuckCancellingCleanup 定期将长时间处于「取消中」的任务强制结束,避免卡住无法发新消息 +func (m *AgentTaskManager) runStuckCancellingCleanup() { + ticker := time.NewTicker(cleanupInterval) + defer ticker.Stop() + for range ticker.C { + m.cleanupStuckCancelling() + } +} + +func (m *AgentTaskManager) cleanupStuckCancelling() { + m.mu.Lock() + var toFinish []string + now := time.Now() + for id, task := range m.tasks { + if task.Status != "cancelling" { + continue + } + var elapsed time.Duration + if !task.CancellingAt.IsZero() { + elapsed = now.Sub(task.CancellingAt) + if elapsed < cancellingStuckThreshold { + continue + } + } else { + elapsed = now.Sub(task.StartedAt) + if elapsed < cancellingStuckThresholdLegacy { + continue + } + } + toFinish = append(toFinish, id) + } + m.mu.Unlock() + for _, id := range toFinish { + m.FinishTask(id, "cancelled") } } @@ -76,7 +125,7 @@ func (m *AgentTaskManager) StartTask(conversationID, message string, cancel cont return task, nil } -// CancelTask 取消指定会话的任务 +// CancelTask 取消指定会话的任务。若任务已在取消中,仍返回 (true, nil) 以便接口幂等、前端不报错。 func (m *AgentTaskManager) CancelTask(conversationID string, cause error) (bool, error) { m.mu.Lock() task, exists := m.tasks[conversationID] @@ -85,13 +134,14 @@ func (m *AgentTaskManager) CancelTask(conversationID string, cause error) (bool, return false, nil } - // 如果已经处于取消流程,直接返回 + // 如果已经处于取消流程,视为成功(幂等),避免前端重复点击报「未找到任务」 if task.Status == "cancelling" { m.mu.Unlock() - return false, nil + return true, nil } task.Status = "cancelling" + task.CancellingAt = time.Now() cancel := task.cancel m.mu.Unlock()