From c000fe51955dca67bbca3c0aefde832e954bd169 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E5=85=AC=E6=98=8E?= <83812544+Ed1s0nZ@users.noreply.github.com> Date: Fri, 19 Jun 2026 01:39:53 +0800 Subject: [PATCH] Add files via upload --- internal/app/app.go | 1 + internal/handler/agent.go | 63 +++++++++ internal/handler/batch_task_manager.go | 169 +++++++++++++++++++++++-- internal/handler/conversation.go | 5 +- internal/handler/robot.go | 2 +- 5 files changed, 229 insertions(+), 11 deletions(-) diff --git a/internal/app/app.go b/internal/app/app.go index 8b58820b..9467cb4c 100644 --- a/internal/app/app.go +++ b/internal/app/app.go @@ -829,6 +829,7 @@ func setupRoutes( protected.PUT("/batch-tasks/:queueId/schedule-enabled", agentHandler.SetBatchQueueScheduleEnabled) protected.DELETE("/batch-tasks/:queueId", agentHandler.DeleteBatchQueue) protected.PUT("/batch-tasks/:queueId/tasks/:taskId", agentHandler.UpdateBatchTask) + protected.POST("/batch-tasks/:queueId/tasks/:taskId/run", agentHandler.RunSingleBatchTask) protected.POST("/batch-tasks/:queueId/tasks", agentHandler.AddBatchTask) protected.DELETE("/batch-tasks/:queueId/tasks/:taskId", agentHandler.DeleteBatchTask) diff --git a/internal/handler/agent.go b/internal/handler/agent.go index 07dcdba0..7bd3c88c 100644 --- a/internal/handler/agent.go +++ b/internal/handler/agent.go @@ -1678,6 +1678,7 @@ func (h *AgentHandler) ListBatchQueues(c *gin.Context) { // StartBatchQueue 开始执行批量任务队列 func (h *AgentHandler) StartBatchQueue(c *gin.Context) { queueID := c.Param("queueId") + h.batchTaskManager.ClearSingleRunTask(queueID) ok, err := h.startBatchQueueExecution(queueID, false) if err != nil { c.JSON(http.StatusBadRequest, gin.H{"error": err.Error()}) @@ -1709,6 +1710,7 @@ func (h *AgentHandler) RerunBatchQueue(c *gin.Context) { c.JSON(http.StatusInternalServerError, gin.H{"error": "重置队列失败"}) return } + h.batchTaskManager.ClearSingleRunTask(queueID) ok, err := h.startBatchQueueExecution(queueID, false) if err != nil { c.JSON(http.StatusBadRequest, gin.H{"error": err.Error()}) @@ -1908,6 +1910,53 @@ func (h *AgentHandler) AddBatchTask(c *gin.Context) { c.JSON(http.StatusOK, gin.H{"message": "任务已添加", "task": task, "queue": queue}) } +// RunSingleBatchTask 单条执行指定子任务(可覆盖已成功项),完成后暂停队列 +func (h *AgentHandler) RunSingleBatchTask(c *gin.Context) { + queueID := c.Param("queueId") + taskID := c.Param("taskId") + + if err := h.batchTaskManager.PrepareSingleTaskRun(queueID, taskID); err != nil { + c.JSON(http.StatusBadRequest, gin.H{"error": err.Error()}) + return + } + h.batchTaskManager.SetSingleRunTask(queueID, taskID) + + // 暂停态单条执行:旧批量协程可能仍占用执行槽,先回收以便重新启动 + if queue, ok := h.batchTaskManager.GetBatchQueue(queueID); ok && queue.Status == BatchQueueStatusPaused { + h.forceUnmarkBatchQueueRunning(queueID) + } + + autoStarted := true + autoStartMsg := "已开始单条执行" + ok, startErr := h.startBatchQueueExecution(queueID, false) + if startErr != nil { + h.batchTaskManager.ClearSingleRunTask(queueID) + autoStarted = false + autoStartMsg = "任务已准备就绪,但自动启动失败: " + startErr.Error() + } else if !ok { + h.batchTaskManager.ClearSingleRunTask(queueID) + autoStarted = false + autoStartMsg = "任务已准备就绪,但队列不存在" + } + + queue, exists := h.batchTaskManager.GetBatchQueue(queueID) + if !exists { + c.JSON(http.StatusNotFound, gin.H{"error": "队列不存在"}) + return + } + if h.audit != nil { + h.audit.RecordOK(c, "task", "run_single_batch_task", "单条执行批量子任务", "batch_task", taskID, map[string]interface{}{ + "batch_queue_id": queueID, + "auto_started": autoStarted, + }) + } + c.JSON(http.StatusOK, gin.H{ + "message": autoStartMsg, + "queue": queue, + "autoStarted": autoStarted, + }) +} + // DeleteBatchTask 删除批量任务 func (h *AgentHandler) DeleteBatchTask(c *gin.Context) { queueID := c.Param("queueId") @@ -1949,6 +1998,10 @@ func (h *AgentHandler) unmarkBatchQueueRunning(queueID string) { delete(h.batchRunning, queueID) } +func (h *AgentHandler) forceUnmarkBatchQueueRunning(queueID string) { + h.unmarkBatchQueueRunning(queueID) +} + func (h *AgentHandler) nextBatchQueueRunAt(cronExpr string, from time.Time) (*time.Time, error) { expr := strings.TrimSpace(cronExpr) if expr == "" { @@ -2096,6 +2149,10 @@ func (h *AgentHandler) executeBatchQueue(queueID string) { h.logger.Error("创建对话失败", zap.String("queueId", queueID), zap.String("taskId", task.ID), zap.Error(err)) h.batchTaskManager.UpdateTaskStatus(queueID, task.ID, "failed", "", "创建对话失败: "+err.Error()) h.batchTaskManager.MoveToNextTask(queueID) + if h.batchTaskManager.TakeSingleRunTaskIfMatch(queueID, task.ID) { + h.batchTaskManager.UpdateQueueStatus(queueID, "paused") + break + } continue } conversationID = conv.ID @@ -2352,6 +2409,12 @@ func (h *AgentHandler) executeBatchQueue(queueID string) { // 移动到下一个任务 h.batchTaskManager.MoveToNextTask(queueID) + if h.batchTaskManager.TakeSingleRunTaskIfMatch(queueID, task.ID) { + h.batchTaskManager.UpdateQueueStatus(queueID, "paused") + h.logger.Info("单条执行完成,队列已暂停", zap.String("queueId", queueID), zap.String("taskId", task.ID)) + break + } + // 检查是否被取消或暂停 queue, _ = h.batchTaskManager.GetBatchQueue(queueID) if queue.Status == "cancelled" || queue.Status == "paused" { diff --git a/internal/handler/batch_task_manager.go b/internal/handler/batch_task_manager.go index 5bdd2018..9a53d20b 100644 --- a/internal/handler/batch_task_manager.go +++ b/internal/handler/batch_task_manager.go @@ -77,11 +77,12 @@ type BatchTaskQueue struct { // BatchTaskManager 批量任务管理器 type BatchTaskManager struct { - db *database.DB - logger *zap.Logger - queues map[string]*BatchTaskQueue - taskCancels map[string]context.CancelFunc // 存储每个队列当前任务的取消函数 - mu sync.RWMutex + db *database.DB + logger *zap.Logger + queues map[string]*BatchTaskQueue + taskCancels map[string]context.CancelFunc // 存储每个队列当前任务的取消函数 + singleRunTasks map[string]string // queueID -> taskID,单条执行完成后暂停队列 + mu sync.RWMutex } // NewBatchTaskManager 创建批量任务管理器 @@ -90,9 +91,10 @@ func NewBatchTaskManager(logger *zap.Logger) *BatchTaskManager { logger = zap.NewNop() } return &BatchTaskManager{ - logger: logger, - queues: make(map[string]*BatchTaskQueue), - taskCancels: make(map[string]context.CancelFunc), + logger: logger, + queues: make(map[string]*BatchTaskQueue), + taskCancels: make(map[string]context.CancelFunc), + singleRunTasks: make(map[string]string), } } @@ -864,6 +866,138 @@ func (m *BatchTaskManager) AddTaskToQueue(queueID, message string) (*BatchTask, return task, nil } +// PrepareSingleTaskRun 准备单条执行:重置目标任务(若已有结果)并定位队列索引 +func (m *BatchTaskManager) PrepareSingleTaskRun(queueID, taskID string) error { + var cancelFunc context.CancelFunc + var siblingRunningIDs []string + + m.mu.Lock() + queue, exists := m.queues[queueID] + if !exists { + m.mu.Unlock() + return fmt.Errorf("队列不存在") + } + + var task *BatchTask + taskIndex := -1 + for i, t := range queue.Tasks { + if t.ID == taskID { + taskIndex = i + task = t + break + } + } + if task == nil { + m.mu.Unlock() + return fmt.Errorf("任务不存在") + } + + if !queueAllowsSingleTaskRunLocked(queue, task) { + m.mu.Unlock() + return fmt.Errorf("队列正在执行或未就绪,无法单条执行") + } + + // 暂停态:中止在途子任务并收口仍标记 running 的其它子任务,以便单条执行非冲突项 + if queue.Status == BatchQueueStatusPaused { + if c, ok := m.taskCancels[queueID]; ok { + cancelFunc = c + delete(m.taskCancels, queueID) + } + for _, t := range queue.Tasks { + if t != nil && t.ID != taskID && t.Status == BatchTaskStatusRunning { + siblingRunningIDs = append(siblingRunningIDs, t.ID) + } + } + } + + needsReset := task.Status != BatchTaskStatusPending + resumeQueue := queue.Status == BatchQueueStatusCompleted || queue.Status == BatchQueueStatusCancelled + m.mu.Unlock() + + if cancelFunc != nil { + cancelFunc() + } + const staleRunMsg = "为单条执行其它任务,已中止" + for _, sid := range siblingRunningIDs { + m.UpdateTaskStatus(queueID, sid, BatchTaskStatusCancelled, "", staleRunMsg) + } + + m.mu.Lock() + defer m.mu.Unlock() + + queue, exists = m.queues[queueID] + if !exists { + return fmt.Errorf("队列不存在") + } + + task = nil + taskIndex = -1 + for i, t := range queue.Tasks { + if t.ID == taskID { + taskIndex = i + task = t + break + } + } + if task == nil { + return fmt.Errorf("任务不存在") + } + + if m.db != nil { + if err := m.db.PrepareBatchSingleTaskRun(queueID, taskID, taskIndex, needsReset, resumeQueue); err != nil { + return fmt.Errorf("准备单条执行失败: %w", err) + } + } + + if needsReset { + task.Status = BatchTaskStatusPending + task.ConversationID = "" + task.StartedAt = nil + task.CompletedAt = nil + task.Error = "" + task.Result = "" + } + queue.CurrentIndex = taskIndex + queue.LastRunError = "" + if resumeQueue { + queue.Status = BatchQueueStatusPaused + queue.CompletedAt = nil + } + + return nil +} + +// SetSingleRunTask 标记队列仅执行指定子任务,完成后自动暂停 +func (m *BatchTaskManager) SetSingleRunTask(queueID, taskID string) { + m.mu.Lock() + defer m.mu.Unlock() + if m.singleRunTasks == nil { + m.singleRunTasks = make(map[string]string) + } + m.singleRunTasks[queueID] = taskID +} + +// ClearSingleRunTask 清除单条执行标记 +func (m *BatchTaskManager) ClearSingleRunTask(queueID string) { + m.mu.Lock() + defer m.mu.Unlock() + delete(m.singleRunTasks, queueID) +} + +// TakeSingleRunTaskIfMatch 若刚完成的子任务为单条执行目标,则清除标记并返回 true +func (m *BatchTaskManager) TakeSingleRunTaskIfMatch(queueID, taskID string) bool { + m.mu.Lock() + defer m.mu.Unlock() + if m.singleRunTasks == nil { + return false + } + if m.singleRunTasks[queueID] != taskID { + return false + } + delete(m.singleRunTasks, queueID) + return true +} + // DeleteTask 删除任务(队列空闲时可删;执行中任务不可删) func (m *BatchTaskManager) DeleteTask(queueID, taskID string) error { m.mu.Lock() @@ -936,6 +1070,25 @@ func queueAllowsTaskListMutationLocked(queue *BatchTaskQueue) bool { } } +// queueAllowsSingleTaskRunLocked 是否允许对指定子任务发起单条执行(必须在持有 BatchTaskManager.mu 下调用) +func queueAllowsSingleTaskRunLocked(queue *BatchTaskQueue, task *BatchTask) bool { + if queue == nil || task == nil { + return false + } + if task.Status == BatchTaskStatusRunning { + return false + } + if queue.Status == BatchQueueStatusRunning { + return false + } + switch queue.Status { + case BatchQueueStatusPending, BatchQueueStatusPaused, BatchQueueStatusCompleted, BatchQueueStatusCancelled: + return true + default: + return false + } +} + // GetNextTask 获取下一个待执行的任务 func (m *BatchTaskManager) GetNextTask(queueID string) (*BatchTask, bool) { m.mu.Lock() diff --git a/internal/handler/conversation.go b/internal/handler/conversation.go index 82215096..a4b0c82e 100644 --- a/internal/handler/conversation.go +++ b/internal/handler/conversation.go @@ -105,17 +105,18 @@ func (h *ConversationHandler) ListConversations(c *gin.Context) { excludeGrouped := strings.TrimSpace(search) == "" && (c.Query("exclude_grouped") == "true" || c.Query("exclude_grouped") == "1") + sortBy := strings.TrimSpace(c.Query("sort_by")) var conversations []*database.Conversation var total int var err error if excludeGrouped { - conversations, err = h.db.ListUngroupedConversations(limit, offset) + conversations, err = h.db.ListUngroupedConversations(limit, offset, sortBy) if err == nil { total, err = h.db.CountUngroupedConversations() } } else { - conversations, err = h.db.ListConversations(limit, offset, search) + conversations, err = h.db.ListConversations(limit, offset, search, sortBy) if err == nil { total, err = h.db.CountConversations(search) } diff --git a/internal/handler/robot.go b/internal/handler/robot.go index ca332869..4a1144cb 100644 --- a/internal/handler/robot.go +++ b/internal/handler/robot.go @@ -447,7 +447,7 @@ func (h *RobotHandler) cmdUnbindProject(platform, userID string) string { } func (h *RobotHandler) cmdList() string { - convs, err := h.db.ListConversations(50, 0, "") + convs, err := h.db.ListConversations(50, 0, "", "") if err != nil { return "获取对话列表失败: " + err.Error() }