diff --git a/internal/handler/agent.go b/internal/handler/agent.go index e63e0c86..5a46e894 100644 --- a/internal/handler/agent.go +++ b/internal/handler/agent.go @@ -1589,6 +1589,7 @@ type BatchTaskRequest struct { AgentMode string `json:"agentMode,omitempty"` // single | multi ScheduleMode string `json:"scheduleMode,omitempty"` // manual | cron CronExpr string `json:"cronExpr,omitempty"` // scheduleMode=cron 时必填 + ExecuteNow bool `json:"executeNow,omitempty"` // 创建后是否立即执行(默认 false) } func normalizeBatchQueueAgentMode(mode string) string { @@ -1650,9 +1651,26 @@ func (h *AgentHandler) CreateBatchQueue(c *gin.Context) { } queue := h.batchTaskManager.CreateBatchQueue(req.Title, req.Role, agentMode, scheduleMode, cronExpr, nextRunAt, validTasks) + started := false + if req.ExecuteNow { + ok, err := h.startBatchQueueExecution(queue.ID, false) + if !ok { + c.JSON(http.StatusNotFound, gin.H{"error": "队列不存在"}) + return + } + if err != nil { + c.JSON(http.StatusBadRequest, gin.H{"error": err.Error(), "queueId": queue.ID}) + return + } + started = true + if refreshed, exists := h.batchTaskManager.GetBatchQueue(queue.ID); exists { + queue = refreshed + } + } c.JSON(http.StatusOK, gin.H{ "queueId": queue.ID, "queue": queue, + "started": started, }) } diff --git a/internal/handler/batch_task_manager.go b/internal/handler/batch_task_manager.go index 8701476d..5f933e5b 100644 --- a/internal/handler/batch_task_manager.go +++ b/internal/handler/batch_task_manager.go @@ -678,7 +678,7 @@ func (m *BatchTaskManager) ResetQueueForRerun(queueID string) bool { return true } -// UpdateTaskMessage 更新任务消息(仅限待执行状态) +// UpdateTaskMessage 更新任务消息(队列空闲时可改;任务需非 running) func (m *BatchTaskManager) UpdateTaskMessage(queueID, taskID, message string) error { m.mu.Lock() defer m.mu.Unlock() @@ -688,17 +688,15 @@ func (m *BatchTaskManager) UpdateTaskMessage(queueID, taskID, message string) er return fmt.Errorf("队列不存在") } - // 检查队列状态,只有待执行状态的队列才能编辑任务 - if queue.Status != "pending" { - return fmt.Errorf("只有待执行状态的队列才能编辑任务") + if !queueAllowsTaskListMutationLocked(queue) { + return fmt.Errorf("队列正在执行或未就绪,无法编辑任务") } // 查找并更新任务 for _, task := range queue.Tasks { if task.ID == taskID { - // 只有待执行状态的任务才能编辑 - if task.Status != "pending" { - return fmt.Errorf("只有待执行状态的任务才能编辑") + if task.Status == "running" { + return fmt.Errorf("执行中的任务不能编辑") } task.Message = message @@ -715,7 +713,7 @@ func (m *BatchTaskManager) UpdateTaskMessage(queueID, taskID, message string) er return fmt.Errorf("任务不存在") } -// AddTaskToQueue 添加任务到队列(仅限待执行状态) +// AddTaskToQueue 添加任务到队列(队列空闲时可添加:含 cron 本轮 completed、手动暂停后等) func (m *BatchTaskManager) AddTaskToQueue(queueID, message string) (*BatchTask, error) { m.mu.Lock() defer m.mu.Unlock() @@ -725,9 +723,8 @@ func (m *BatchTaskManager) AddTaskToQueue(queueID, message string) (*BatchTask, return nil, fmt.Errorf("队列不存在") } - // 检查队列状态,只有待执行状态的队列才能添加任务 - if queue.Status != "pending" { - return nil, fmt.Errorf("只有待执行状态的队列才能添加任务") + if !queueAllowsTaskListMutationLocked(queue) { + return nil, fmt.Errorf("队列正在执行或未就绪,无法添加任务") } if message == "" { @@ -757,7 +754,7 @@ func (m *BatchTaskManager) AddTaskToQueue(queueID, message string) (*BatchTask, return task, nil } -// DeleteTask 删除任务(仅限待执行状态) +// DeleteTask 删除任务(队列空闲时可删;执行中任务不可删) func (m *BatchTaskManager) DeleteTask(queueID, taskID string) error { m.mu.Lock() defer m.mu.Unlock() @@ -767,18 +764,16 @@ func (m *BatchTaskManager) DeleteTask(queueID, taskID string) error { return fmt.Errorf("队列不存在") } - // 检查队列状态,只有待执行状态的队列才能删除任务 - if queue.Status != "pending" { - return fmt.Errorf("只有待执行状态的队列才能删除任务") + if !queueAllowsTaskListMutationLocked(queue) { + return fmt.Errorf("队列正在执行或未就绪,无法删除任务") } // 查找并删除任务 taskIndex := -1 for i, task := range queue.Tasks { if task.ID == taskID { - // 只有待执行状态的任务才能删除 - if task.Status != "pending" { - return fmt.Errorf("只有待执行状态的任务才能删除") + if task.Status == "running" { + return fmt.Errorf("执行中的任务不能删除") } taskIndex = i break @@ -804,6 +799,37 @@ func (m *BatchTaskManager) DeleteTask(queueID, taskID string) error { return nil } +func queueHasRunningTaskLocked(queue *BatchTaskQueue) bool { + if queue == nil { + return false + } + for _, t := range queue.Tasks { + if t != nil && t.Status == "running" { + return true + } + } + return false +} + +// queueAllowsTaskListMutationLocked 是否允许增删改子任务文案/列表(必须在持有 BatchTaskManager.mu 下调用) +func queueAllowsTaskListMutationLocked(queue *BatchTaskQueue) bool { + if queue == nil { + return false + } + if queue.Status == "running" { + return false + } + if queueHasRunningTaskLocked(queue) { + return false + } + switch queue.Status { + case "pending", "paused", "completed", "cancelled": + return true + default: + return false + } +} + // GetNextTask 获取下一个待执行的任务 func (m *BatchTaskManager) GetNextTask(queueID string) (*BatchTask, bool) { m.mu.RLock() diff --git a/internal/handler/batch_task_mcp.go b/internal/handler/batch_task_mcp.go index 2463750a..c5078863 100644 --- a/internal/handler/batch_task_mcp.go +++ b/internal/handler/batch_task_mcp.go @@ -120,8 +120,8 @@ func RegisterBatchTaskMCPTools(mcpServer *mcp.Server, h *AgentHandler, logger *z Name: builtin.ToolBatchTaskCreate, Description: `创建新的批量任务队列。任务列表使用 tasks(字符串数组)或 tasks_text(多行,每行一条)。 agent_mode: single(默认)或 multi(需系统启用多代理)。schedule_mode: manual(默认)或 cron;为 cron 时必须提供 cron_expr(如 "0 */6 * * *")。 -重要:创建成功后队列处于 pending,不会自动开始跑子任务。若要立即执行或手工开跑,必须再调用工具 batch_task_start(传入返回的 queue_id)。Cron 队列若需按表达式自动触发下一轮,还需保持调度开关开启(可用 batch_task_schedule_enabled)。`, - ShortDescription: "创建批量任务队列(创建后需 batch_task_start 才会执行)", +默认创建后不会立即执行。可通过 execute_now=true 在创建后立即启动;也可后续调用 batch_task_start 手工启动。Cron 队列若需按表达式自动触发下一轮,还需保持调度开关开启(可用 batch_task_schedule_enabled)。`, + ShortDescription: "创建批量任务队列(可选立即执行)", InputSchema: map[string]interface{}{ "type": "object", "properties": map[string]interface{}{ @@ -156,6 +156,10 @@ agent_mode: single(默认)或 multi(需系统启用多代理)。schedule "type": "string", "description": "schedule_mode 为 cron 时必填", }, + "execute_now": map[string]interface{}{ + "type": "boolean", + "description": "是否创建后立即执行,默认 false", + }, }, }, }, func(ctx context.Context, args map[string]interface{}) (*mcp.ToolResult, error) { @@ -180,12 +184,37 @@ agent_mode: single(默认)或 multi(需系统启用多代理)。schedule n := sch.Next(time.Now()) nextRunAt = &n } + executeNow, ok := mcpArgBool(args, "execute_now") + if !ok { + executeNow = false + } queue := h.batchTaskManager.CreateBatchQueue(title, role, agentMode, scheduleMode, cronExpr, nextRunAt, tasks) + started := false + if executeNow { + ok, err := h.startBatchQueueExecution(queue.ID, false) + if !ok { + return batchMCPTextResult("队列不存在: "+queue.ID, true), nil + } + if err != nil { + return batchMCPTextResult("创建成功但启动失败: "+err.Error(), true), nil + } + started = true + if refreshed, exists := h.batchTaskManager.GetBatchQueue(queue.ID); exists { + queue = refreshed + } + } logger.Info("MCP batch_task_create", zap.String("queueId", queue.ID), zap.Int("taskCount", len(tasks))) return batchMCPJSONResult(map[string]interface{}{ - "queue_id": queue.ID, - "queue": queue, - "reminder": "队列已创建,当前为 pending。需要开始执行时请调用 MCP工具 batch_task_start(queue_id 同上)。Cron 自动调度需 schedule_enabled 为 true,可用 batch_task_schedule_enabled。", + "queue_id": queue.ID, + "queue": queue, + "started": started, + "execute_now": executeNow, + "reminder": func() string { + if started { + return "队列已创建并立即启动。" + } + return "队列已创建,当前为 pending。需要开始执行时请调用 MCP 工具 batch_task_start(queue_id 同上)。Cron 自动调度需 schedule_enabled 为 true,可用 batch_task_schedule_enabled。" + }(), }) }) diff --git a/internal/handler/openapi.go b/internal/handler/openapi.go index 136df0d1..6245b34f 100644 --- a/internal/handler/openapi.go +++ b/internal/handler/openapi.go @@ -403,6 +403,24 @@ func (h *OpenAPIHandler) GetOpenAPISpec(c *gin.Context) { "type": "string", "description": "角色名称(可选)", }, + "agentMode": map[string]interface{}{ + "type": "string", + "description": "代理模式(single | multi)", + "enum": []string{"single", "multi"}, + }, + "scheduleMode": map[string]interface{}{ + "type": "string", + "description": "调度方式(manual | cron)", + "enum": []string{"manual", "cron"}, + }, + "cronExpr": map[string]interface{}{ + "type": "string", + "description": "Cron 表达式(scheduleMode=cron 时必填)", + }, + "executeNow": map[string]interface{}{ + "type": "boolean", + "description": "是否创建后立即执行(默认 false)", + }, }, }, "BatchQueue": map[string]interface{}{ @@ -1540,9 +1558,9 @@ func (h *OpenAPIHandler) GetOpenAPISpec(c *gin.Context) { "schema": map[string]interface{}{ "type": "object", "properties": map[string]interface{}{ - "message": map[string]interface{}{"type": "string"}, - "conversationId": map[string]interface{}{"type": "string"}, - "role": map[string]interface{}{"type": "string"}, + "message": map[string]interface{}{"type": "string"}, + "conversationId": map[string]interface{}{"type": "string"}, + "role": map[string]interface{}{"type": "string"}, "webshellConnectionId": map[string]interface{}{"type": "string"}, }, "required": []string{"message"}, @@ -1711,6 +1729,10 @@ func (h *OpenAPIHandler) GetOpenAPISpec(c *gin.Context) { "queue": map[string]interface{}{ "$ref": "#/components/schemas/BatchQueue", }, + "started": map[string]interface{}{ + "type": "boolean", + "description": "是否已立即启动执行", + }, }, }, },