From 0fe39fb98a7cd7937de056f19ca28e68f3a67c7a Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E5=85=AC=E6=98=8E?= <83812544+Ed1s0nZ@users.noreply.github.com> Date: Fri, 17 Apr 2026 18:03:55 +0800 Subject: [PATCH] Add files via upload --- internal/app/app.go | 1 + internal/database/batch_task.go | 8 ++-- internal/handler/agent.go | 37 +++++++++++++++-- internal/handler/batch_task_manager.go | 15 +++++-- internal/handler/batch_task_mcp.go | 55 ++++++++++++++++++++++++-- internal/mcp/builtin/constants.go | 3 ++ 6 files changed, 103 insertions(+), 16 deletions(-) diff --git a/internal/app/app.go b/internal/app/app.go index 46f9e04b..28e79f75 100644 --- a/internal/app/app.go +++ b/internal/app/app.go @@ -658,6 +658,7 @@ func setupRoutes( protected.GET("/batch-tasks", agentHandler.ListBatchQueues) protected.GET("/batch-tasks/:queueId", agentHandler.GetBatchQueue) protected.POST("/batch-tasks/:queueId/start", agentHandler.StartBatchQueue) + protected.POST("/batch-tasks/:queueId/rerun", agentHandler.RerunBatchQueue) protected.POST("/batch-tasks/:queueId/pause", agentHandler.PauseBatchQueue) protected.PUT("/batch-tasks/:queueId/metadata", agentHandler.UpdateBatchQueueMetadata) protected.PUT("/batch-tasks/:queueId/schedule", agentHandler.UpdateBatchQueueSchedule) diff --git a/internal/database/batch_task.go b/internal/database/batch_task.go index 2a331617..c774be65 100644 --- a/internal/database/batch_task.go +++ b/internal/database/batch_task.go @@ -352,11 +352,11 @@ func (db *DB) UpdateBatchQueueCurrentIndex(queueID string, currentIndex int) err return nil } -// UpdateBatchQueueMetadata 更新批量任务队列标题和角色 -func (db *DB) UpdateBatchQueueMetadata(queueID, title, role string) error { +// UpdateBatchQueueMetadata 更新批量任务队列标题、角色和代理模式 +func (db *DB) UpdateBatchQueueMetadata(queueID, title, role, agentMode string) error { _, err := db.Exec( - "UPDATE batch_task_queues SET title = ?, role = ? WHERE id = ?", - title, role, queueID, + "UPDATE batch_task_queues SET title = ?, role = ?, agent_mode = ? WHERE id = ?", + title, role, agentMode, queueID, ) if err != nil { return fmt.Errorf("更新批量任务队列元数据失败: %w", err) diff --git a/internal/handler/agent.go b/internal/handler/agent.go index 9885b3cd..832e218d 100644 --- a/internal/handler/agent.go +++ b/internal/handler/agent.go @@ -1781,6 +1781,34 @@ func (h *AgentHandler) StartBatchQueue(c *gin.Context) { c.JSON(http.StatusOK, gin.H{"message": "批量任务已开始执行", "queueId": queueID}) } +// RerunBatchQueue 重跑批量任务队列(重置所有子任务后重新执行) +func (h *AgentHandler) RerunBatchQueue(c *gin.Context) { + queueID := c.Param("queueId") + queue, exists := h.batchTaskManager.GetBatchQueue(queueID) + if !exists { + c.JSON(http.StatusNotFound, gin.H{"error": "队列不存在"}) + return + } + if queue.Status != "completed" && queue.Status != "cancelled" { + c.JSON(http.StatusBadRequest, gin.H{"error": "仅已完成或已取消的队列可以重跑"}) + return + } + if !h.batchTaskManager.ResetQueueForRerun(queueID) { + c.JSON(http.StatusInternalServerError, gin.H{"error": "重置队列失败"}) + return + } + ok, err := h.startBatchQueueExecution(queueID, false) + if err != nil { + c.JSON(http.StatusBadRequest, gin.H{"error": err.Error()}) + return + } + if !ok { + c.JSON(http.StatusInternalServerError, gin.H{"error": "启动失败"}) + return + } + c.JSON(http.StatusOK, gin.H{"message": "批量任务已重新开始执行", "queueId": queueID}) +} + // PauseBatchQueue 暂停批量任务队列 func (h *AgentHandler) PauseBatchQueue(c *gin.Context) { queueID := c.Param("queueId") @@ -1792,18 +1820,19 @@ func (h *AgentHandler) PauseBatchQueue(c *gin.Context) { c.JSON(http.StatusOK, gin.H{"message": "批量任务已暂停"}) } -// UpdateBatchQueueMetadata 修改批量任务队列的标题和角色 +// UpdateBatchQueueMetadata 修改批量任务队列的标题、角色和代理模式 func (h *AgentHandler) UpdateBatchQueueMetadata(c *gin.Context) { queueID := c.Param("queueId") var req struct { - Title string `json:"title"` - Role string `json:"role"` + Title string `json:"title"` + Role string `json:"role"` + AgentMode string `json:"agentMode"` } if err := c.ShouldBindJSON(&req); err != nil { c.JSON(http.StatusBadRequest, gin.H{"error": err.Error()}) return } - if err := h.batchTaskManager.UpdateQueueMetadata(queueID, req.Title, req.Role); err != nil { + if err := h.batchTaskManager.UpdateQueueMetadata(queueID, req.Title, req.Role, req.AgentMode); err != nil { c.JSON(http.StatusBadRequest, gin.H{"error": err.Error()}) return } diff --git a/internal/handler/batch_task_manager.go b/internal/handler/batch_task_manager.go index f6353dc7..aef4c9e5 100644 --- a/internal/handler/batch_task_manager.go +++ b/internal/handler/batch_task_manager.go @@ -645,15 +645,14 @@ func (m *BatchTaskManager) UpdateQueueSchedule(queueID, scheduleMode, cronExpr s } } -// UpdateQueueMetadata 更新队列标题和角色(非 running 时可用) -func (m *BatchTaskManager) UpdateQueueMetadata(queueID, title, role string) error { +// UpdateQueueMetadata 更新队列标题、角色和代理模式(非 running 时可用) +func (m *BatchTaskManager) UpdateQueueMetadata(queueID, title, role, agentMode string) error { if utf8.RuneCountInString(title) > MaxBatchQueueTitleLen { return fmt.Errorf("标题不能超过 %d 个字符", MaxBatchQueueTitleLen) } if utf8.RuneCountInString(role) > MaxBatchQueueRoleLen { return fmt.Errorf("角色名不能超过 %d 个字符", MaxBatchQueueRoleLen) } - m.mu.Lock() defer m.mu.Unlock() @@ -665,11 +664,19 @@ func (m *BatchTaskManager) UpdateQueueMetadata(queueID, title, role string) erro return fmt.Errorf("队列正在运行中,无法修改") } + // 如果未传 agentMode,保留原值 + if strings.TrimSpace(agentMode) != "" { + agentMode = normalizeBatchQueueAgentMode(agentMode) + } else { + agentMode = queue.AgentMode + } + queue.Title = title queue.Role = role + queue.AgentMode = agentMode if m.db != nil { - if err := m.db.UpdateBatchQueueMetadata(queueID, title, role); err != nil { + if err := m.db.UpdateBatchQueueMetadata(queueID, title, role, agentMode); err != nil { m.logger.Warn("batch queue DB metadata update failed", zap.String("queueId", queueID), zap.Error(err)) } } diff --git a/internal/handler/batch_task_mcp.go b/internal/handler/batch_task_mcp.go index 6d093fb1..72ae8457 100644 --- a/internal/handler/batch_task_mcp.go +++ b/internal/handler/batch_task_mcp.go @@ -263,6 +263,47 @@ agent_mode: single(默认)或 multi(需系统启用多代理)。schedule return batchMCPTextResult("已提交启动,队列将开始执行。", false), nil }) + // --- rerun (reset + start for completed/cancelled queues) --- + reg(mcp.Tool{ + Name: builtin.ToolBatchTaskRerun, + Description: "重跑已完成或已取消的批量任务队列。会重置所有子任务状态后重新执行一轮。", + ShortDescription: "重跑批量任务队列", + InputSchema: map[string]interface{}{ + "type": "object", + "properties": map[string]interface{}{ + "queue_id": map[string]interface{}{ + "type": "string", + "description": "队列 ID", + }, + }, + "required": []string{"queue_id"}, + }, + }, func(ctx context.Context, args map[string]interface{}) (*mcp.ToolResult, error) { + qid := mcpArgString(args, "queue_id") + if qid == "" { + return batchMCPTextResult("queue_id 不能为空", true), nil + } + queue, exists := h.batchTaskManager.GetBatchQueue(qid) + if !exists { + return batchMCPTextResult("队列不存在: "+qid, true), nil + } + if queue.Status != "completed" && queue.Status != "cancelled" { + return batchMCPTextResult("仅已完成或已取消的队列可以重跑,当前状态: "+queue.Status, true), nil + } + if !h.batchTaskManager.ResetQueueForRerun(qid) { + return batchMCPTextResult("重置队列失败", true), nil + } + ok, err := h.startBatchQueueExecution(qid, false) + if !ok { + return batchMCPTextResult("启动失败", true), nil + } + if err != nil { + return batchMCPTextResult("启动失败: "+err.Error(), true), nil + } + logger.Info("MCP batch_task_rerun", zap.String("queueId", qid)) + return batchMCPTextResult("已重置并重新启动队列。", false), nil + }) + // --- pause --- reg(mcp.Tool{ Name: builtin.ToolBatchTaskPause, @@ -317,11 +358,11 @@ agent_mode: single(默认)或 multi(需系统启用多代理)。schedule return batchMCPTextResult("队列已删除。", false), nil }) - // --- update metadata (title/role) --- + // --- update metadata (title/role/agentMode) --- reg(mcp.Tool{ Name: builtin.ToolBatchTaskUpdateMetadata, - Description: "修改批量任务队列的标题和角色。仅在队列非 running 状态下可修改。", - ShortDescription: "修改批量任务队列标题/角色", + Description: "修改批量任务队列的标题、角色和代理模式。仅在队列非 running 状态下可修改。", + ShortDescription: "修改批量任务队列标题/角色/代理模式", InputSchema: map[string]interface{}{ "type": "object", "properties": map[string]interface{}{ @@ -337,6 +378,11 @@ agent_mode: single(默认)或 multi(需系统启用多代理)。schedule "type": "string", "description": "新角色名(空字符串使用默认角色)", }, + "agent_mode": map[string]interface{}{ + "type": "string", + "description": "代理模式:single(单代理 ReAct)或 multi(多代理)", + "enum": []string{"single", "multi"}, + }, }, "required": []string{"queue_id"}, }, @@ -347,7 +393,8 @@ agent_mode: single(默认)或 multi(需系统启用多代理)。schedule } title := mcpArgString(args, "title") role := mcpArgString(args, "role") - if err := h.batchTaskManager.UpdateQueueMetadata(qid, title, role); err != nil { + agentMode := mcpArgString(args, "agent_mode") + if err := h.batchTaskManager.UpdateQueueMetadata(qid, title, role, agentMode); err != nil { return batchMCPTextResult(err.Error(), true), nil } updated, _ := h.batchTaskManager.GetBatchQueue(qid) diff --git a/internal/mcp/builtin/constants.go b/internal/mcp/builtin/constants.go index ef84c246..94a3da92 100644 --- a/internal/mcp/builtin/constants.go +++ b/internal/mcp/builtin/constants.go @@ -32,6 +32,7 @@ const ( ToolBatchTaskGet = "batch_task_get" ToolBatchTaskCreate = "batch_task_create" ToolBatchTaskStart = "batch_task_start" + ToolBatchTaskRerun = "batch_task_rerun" ToolBatchTaskPause = "batch_task_pause" ToolBatchTaskDelete = "batch_task_delete" ToolBatchTaskUpdateMetadata = "batch_task_update_metadata" @@ -63,6 +64,7 @@ func IsBuiltinTool(toolName string) bool { ToolBatchTaskGet, ToolBatchTaskCreate, ToolBatchTaskStart, + ToolBatchTaskRerun, ToolBatchTaskPause, ToolBatchTaskDelete, ToolBatchTaskUpdateMetadata, @@ -98,6 +100,7 @@ func GetAllBuiltinTools() []string { ToolBatchTaskGet, ToolBatchTaskCreate, ToolBatchTaskStart, + ToolBatchTaskRerun, ToolBatchTaskPause, ToolBatchTaskDelete, ToolBatchTaskUpdateMetadata,