Add files via upload

This commit is contained in:
公明
2026-04-15 00:13:09 +08:00
committed by GitHub
parent dda4edb952
commit 62241e0e66
4 changed files with 121 additions and 26 deletions
+18
View File
@@ -1589,6 +1589,7 @@ type BatchTaskRequest struct {
AgentMode string `json:"agentMode,omitempty"` // single | multi
ScheduleMode string `json:"scheduleMode,omitempty"` // manual | cron
CronExpr string `json:"cronExpr,omitempty"` // scheduleMode=cron 时必填
ExecuteNow bool `json:"executeNow,omitempty"` // 创建后是否立即执行(默认 false)
}
func normalizeBatchQueueAgentMode(mode string) string {
@@ -1650,9 +1651,26 @@ func (h *AgentHandler) CreateBatchQueue(c *gin.Context) {
}
queue := h.batchTaskManager.CreateBatchQueue(req.Title, req.Role, agentMode, scheduleMode, cronExpr, nextRunAt, validTasks)
started := false
if req.ExecuteNow {
ok, err := h.startBatchQueueExecution(queue.ID, false)
if !ok {
c.JSON(http.StatusNotFound, gin.H{"error": "队列不存在"})
return
}
if err != nil {
c.JSON(http.StatusBadRequest, gin.H{"error": err.Error(), "queueId": queue.ID})
return
}
started = true
if refreshed, exists := h.batchTaskManager.GetBatchQueue(queue.ID); exists {
queue = refreshed
}
}
c.JSON(http.StatusOK, gin.H{
"queueId": queue.ID,
"queue": queue,
"started": started,
})
}
+44 -18
View File
@@ -678,7 +678,7 @@ func (m *BatchTaskManager) ResetQueueForRerun(queueID string) bool {
return true
}
// UpdateTaskMessage 更新任务消息(仅限待执行状态
// UpdateTaskMessage 更新任务消息(队列空闲时可改;任务需非 running
func (m *BatchTaskManager) UpdateTaskMessage(queueID, taskID, message string) error {
m.mu.Lock()
defer m.mu.Unlock()
@@ -688,17 +688,15 @@ func (m *BatchTaskManager) UpdateTaskMessage(queueID, taskID, message string) er
return fmt.Errorf("队列不存在")
}
// 检查队列状态,只有待执行状态的队列才能编辑任务
if queue.Status != "pending" {
return fmt.Errorf("只有待执行状态的队列才能编辑任务")
if !queueAllowsTaskListMutationLocked(queue) {
return fmt.Errorf("队列正在执行或未就绪,无法编辑任务")
}
// 查找并更新任务
for _, task := range queue.Tasks {
if task.ID == taskID {
// 只有待执行状态的任务才能编辑
if task.Status != "pending" {
return fmt.Errorf("只有待执行状态的任务才能编辑")
if task.Status == "running" {
return fmt.Errorf("执行中的任务不能编辑")
}
task.Message = message
@@ -715,7 +713,7 @@ func (m *BatchTaskManager) UpdateTaskMessage(queueID, taskID, message string) er
return fmt.Errorf("任务不存在")
}
// AddTaskToQueue 添加任务到队列(仅限待执行状态
// AddTaskToQueue 添加任务到队列(队列空闲时可添加:含 cron 本轮 completed、手动暂停后等
func (m *BatchTaskManager) AddTaskToQueue(queueID, message string) (*BatchTask, error) {
m.mu.Lock()
defer m.mu.Unlock()
@@ -725,9 +723,8 @@ func (m *BatchTaskManager) AddTaskToQueue(queueID, message string) (*BatchTask,
return nil, fmt.Errorf("队列不存在")
}
// 检查队列状态,只有待执行状态的队列才能添加任务
if queue.Status != "pending" {
return nil, fmt.Errorf("只有待执行状态的队列才能添加任务")
if !queueAllowsTaskListMutationLocked(queue) {
return nil, fmt.Errorf("队列正在执行或未就绪,无法添加任务")
}
if message == "" {
@@ -757,7 +754,7 @@ func (m *BatchTaskManager) AddTaskToQueue(queueID, message string) (*BatchTask,
return task, nil
}
// DeleteTask 删除任务(仅限待执行状态
// DeleteTask 删除任务(队列空闲时可删;执行中任务不可删
func (m *BatchTaskManager) DeleteTask(queueID, taskID string) error {
m.mu.Lock()
defer m.mu.Unlock()
@@ -767,18 +764,16 @@ func (m *BatchTaskManager) DeleteTask(queueID, taskID string) error {
return fmt.Errorf("队列不存在")
}
// 检查队列状态,只有待执行状态的队列才能删除任务
if queue.Status != "pending" {
return fmt.Errorf("只有待执行状态的队列才能删除任务")
if !queueAllowsTaskListMutationLocked(queue) {
return fmt.Errorf("队列正在执行或未就绪,无法删除任务")
}
// 查找并删除任务
taskIndex := -1
for i, task := range queue.Tasks {
if task.ID == taskID {
// 只有待执行状态的任务才能删除
if task.Status != "pending" {
return fmt.Errorf("只有待执行状态的任务才能删除")
if task.Status == "running" {
return fmt.Errorf("执行中的任务不能删除")
}
taskIndex = i
break
@@ -804,6 +799,37 @@ func (m *BatchTaskManager) DeleteTask(queueID, taskID string) error {
return nil
}
func queueHasRunningTaskLocked(queue *BatchTaskQueue) bool {
if queue == nil {
return false
}
for _, t := range queue.Tasks {
if t != nil && t.Status == "running" {
return true
}
}
return false
}
// queueAllowsTaskListMutationLocked 是否允许增删改子任务文案/列表(必须在持有 BatchTaskManager.mu 下调用)
func queueAllowsTaskListMutationLocked(queue *BatchTaskQueue) bool {
if queue == nil {
return false
}
if queue.Status == "running" {
return false
}
if queueHasRunningTaskLocked(queue) {
return false
}
switch queue.Status {
case "pending", "paused", "completed", "cancelled":
return true
default:
return false
}
}
// GetNextTask 获取下一个待执行的任务
func (m *BatchTaskManager) GetNextTask(queueID string) (*BatchTask, bool) {
m.mu.RLock()
+34 -5
View File
@@ -120,8 +120,8 @@ func RegisterBatchTaskMCPTools(mcpServer *mcp.Server, h *AgentHandler, logger *z
Name: builtin.ToolBatchTaskCreate,
Description: `创建新的批量任务队列任务列表使用 tasks字符串数组 tasks_text多行每行一条
agent_mode: single默认 multi需系统启用多代理schedule_mode: manual默认 cron cron 时必须提供 cron_expr "0 */6 * * *"
重要创建成功后队列处于 pending不会自动开始跑子任务若要立即执行或手工开跑必须再调用工具 batch_task_start传入返回的 queue_idCron 队列若需按表达式自动触发下一轮还需保持调度开关开启可用 batch_task_schedule_enabled`,
ShortDescription: "创建批量任务队列(创建后需 batch_task_start 才会执行)",
默认创建后不会立即执行可通过 execute_now=true 在创建后立即启动也可后续调用 batch_task_start 手工启动Cron 队列若需按表达式自动触发下一轮还需保持调度开关开启可用 batch_task_schedule_enabled`,
ShortDescription: "创建批量任务队列(可选立即执行)",
InputSchema: map[string]interface{}{
"type": "object",
"properties": map[string]interface{}{
@@ -156,6 +156,10 @@ agent_mode: single(默认)或 multi(需系统启用多代理)。schedule
"type": "string",
"description": "schedule_mode 为 cron 时必填",
},
"execute_now": map[string]interface{}{
"type": "boolean",
"description": "是否创建后立即执行,默认 false",
},
},
},
}, func(ctx context.Context, args map[string]interface{}) (*mcp.ToolResult, error) {
@@ -180,12 +184,37 @@ agent_mode: single(默认)或 multi(需系统启用多代理)。schedule
n := sch.Next(time.Now())
nextRunAt = &n
}
executeNow, ok := mcpArgBool(args, "execute_now")
if !ok {
executeNow = false
}
queue := h.batchTaskManager.CreateBatchQueue(title, role, agentMode, scheduleMode, cronExpr, nextRunAt, tasks)
started := false
if executeNow {
ok, err := h.startBatchQueueExecution(queue.ID, false)
if !ok {
return batchMCPTextResult("队列不存在: "+queue.ID, true), nil
}
if err != nil {
return batchMCPTextResult("创建成功但启动失败: "+err.Error(), true), nil
}
started = true
if refreshed, exists := h.batchTaskManager.GetBatchQueue(queue.ID); exists {
queue = refreshed
}
}
logger.Info("MCP batch_task_create", zap.String("queueId", queue.ID), zap.Int("taskCount", len(tasks)))
return batchMCPJSONResult(map[string]interface{}{
"queue_id": queue.ID,
"queue": queue,
"reminder": "队列已创建,当前为 pending。需要开始执行时请调用 MCP工具 batch_task_startqueue_id 同上)。Cron 自动调度需 schedule_enabled 为 true,可用 batch_task_schedule_enabled。",
"queue_id": queue.ID,
"queue": queue,
"started": started,
"execute_now": executeNow,
"reminder": func() string {
if started {
return "队列已创建并立即启动。"
}
return "队列已创建,当前为 pending。需要开始执行时请调用 MCP 工具 batch_task_startqueue_id 同上)。Cron 自动调度需 schedule_enabled 为 true,可用 batch_task_schedule_enabled。"
}(),
})
})
+25 -3
View File
@@ -403,6 +403,24 @@ func (h *OpenAPIHandler) GetOpenAPISpec(c *gin.Context) {
"type": "string",
"description": "角色名称(可选)",
},
"agentMode": map[string]interface{}{
"type": "string",
"description": "代理模式(single | multi",
"enum": []string{"single", "multi"},
},
"scheduleMode": map[string]interface{}{
"type": "string",
"description": "调度方式(manual | cron",
"enum": []string{"manual", "cron"},
},
"cronExpr": map[string]interface{}{
"type": "string",
"description": "Cron 表达式(scheduleMode=cron 时必填)",
},
"executeNow": map[string]interface{}{
"type": "boolean",
"description": "是否创建后立即执行(默认 false)",
},
},
},
"BatchQueue": map[string]interface{}{
@@ -1540,9 +1558,9 @@ func (h *OpenAPIHandler) GetOpenAPISpec(c *gin.Context) {
"schema": map[string]interface{}{
"type": "object",
"properties": map[string]interface{}{
"message": map[string]interface{}{"type": "string"},
"conversationId": map[string]interface{}{"type": "string"},
"role": map[string]interface{}{"type": "string"},
"message": map[string]interface{}{"type": "string"},
"conversationId": map[string]interface{}{"type": "string"},
"role": map[string]interface{}{"type": "string"},
"webshellConnectionId": map[string]interface{}{"type": "string"},
},
"required": []string{"message"},
@@ -1711,6 +1729,10 @@ func (h *OpenAPIHandler) GetOpenAPISpec(c *gin.Context) {
"queue": map[string]interface{}{
"$ref": "#/components/schemas/BatchQueue",
},
"started": map[string]interface{}{
"type": "boolean",
"description": "是否已立即启动执行",
},
},
},
},