From 6d180c814d8bbba1d450230bf25c9ce454b08cc6 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E5=85=AC=E6=98=8E?= <83812544+Ed1s0nZ@users.noreply.github.com> Date: Thu, 7 May 2026 17:01:15 +0800 Subject: [PATCH] Add files via upload --- internal/app/app.go | 1 + internal/handler/agent.go | 28 ++++- internal/handler/eino_single_agent.go | 150 ++++++++++++++++------- internal/handler/monitor.go | 38 +++++- internal/handler/multi_agent.go | 156 +++++++++++++++++------- internal/handler/multi_agent_prepare.go | 62 ++++++++++ internal/handler/openapi.go | 57 +++++++++ internal/handler/task_manager.go | 50 ++++++++ 8 files changed, 450 insertions(+), 92 deletions(-) diff --git a/internal/app/app.go b/internal/app/app.go index 73185b21..a3b5f32f 100644 --- a/internal/app/app.go +++ b/internal/app/app.go @@ -757,6 +757,7 @@ func setupRoutes( // 监控 protected.GET("/monitor", monitorHandler.Monitor) protected.GET("/monitor/execution/:id", monitorHandler.GetExecution) + protected.POST("/monitor/execution/:id/cancel", monitorHandler.CancelExecution) protected.POST("/monitor/executions/names", monitorHandler.BatchGetToolNames) protected.DELETE("/monitor/execution/:id", monitorHandler.DeleteExecution) protected.DELETE("/monitor/executions", monitorHandler.DeleteExecutions) diff --git a/internal/handler/agent.go b/internal/handler/agent.go index a94f4867..b799d912 100644 --- a/internal/handler/agent.go +++ b/internal/handler/agent.go @@ -1717,6 +1717,8 @@ func (h *AgentHandler) AgentLoopStream(c *gin.Context) { func (h *AgentHandler) CancelAgentLoop(c *gin.Context) { var req struct { ConversationID string `json:"conversationId" binding:"required"` + Reason string `json:"reason,omitempty"` + ContinueAfter bool `json:"continueAfter,omitempty"` } if err := c.ShouldBindJSON(&req); err != nil { @@ -1724,7 +1726,23 @@ func (h *AgentHandler) CancelAgentLoop(c *gin.Context) { return } - ok, err := h.tasks.CancelTask(req.ConversationID, ErrTaskCancelled) + if req.ContinueAfter && strings.TrimSpace(req.Reason) == "" { + c.JSON(http.StatusBadRequest, gin.H{"error": "continueAfter 为 true 时必须提供非空的 reason(中断说明)"}) + return + } + + var cause error = ErrTaskCancelled + msg := "已提交取消请求,任务将在当前步骤完成后停止。" + if req.ContinueAfter { + if !h.tasks.SetInterruptContinueReason(req.ConversationID, req.Reason) { + c.JSON(http.StatusNotFound, gin.H{"error": "未找到正在执行的任务,无法提交中断说明"}) + return + } + cause = ErrUserInterruptContinue + msg = "已提交中断说明,当前步骤结束后将写入对话并继续迭代。" + } + + ok, err := h.tasks.CancelTask(req.ConversationID, cause) if err != nil { h.logger.Error("取消任务失败", zap.Error(err)) c.JSON(http.StatusInternalServerError, gin.H{"error": err.Error()}) @@ -1737,9 +1755,11 @@ func (h *AgentHandler) CancelAgentLoop(c *gin.Context) { } c.JSON(http.StatusOK, gin.H{ - "status": "cancelling", - "conversationId": req.ConversationID, - "message": "已提交取消请求,任务将在当前步骤完成后停止。", + "status": "cancelling", + "conversationId": req.ConversationID, + "message": msg, + "continueAfter": req.ContinueAfter, + "interruptWithNote": req.ContinueAfter, }) } diff --git a/internal/handler/eino_single_agent.go b/internal/handler/eino_single_agent.go index 978dbde9..58ac331f 100644 --- a/internal/handler/eino_single_agent.go +++ b/internal/handler/eino_single_agent.go @@ -43,8 +43,11 @@ func (h *AgentHandler) EinoSingleAgentLoopStream(c *gin.Context) { var sseWriteMu sync.Mutex var ssePublishConversationID string sendEvent := func(eventType, message string, data interface{}) { - if eventType == "error" && baseCtx != nil && errors.Is(context.Cause(baseCtx), ErrTaskCancelled) { - return + if eventType == "error" && baseCtx != nil { + cause := context.Cause(baseCtx) + if errors.Is(cause, ErrTaskCancelled) || errors.Is(cause, ErrUserInterruptContinue) { + return + } } ev := StreamEvent{Type: eventType, Message: message, Data: data} b, errMarshal := json.Marshal(ev) @@ -114,33 +117,10 @@ func (h *AgentHandler) EinoSingleAgentLoopStream(c *gin.Context) { } var cancelWithCause context.CancelCauseFunc - baseCtx, cancelWithCause = context.WithCancelCause(context.Background()) - taskCtx, timeoutCancel := context.WithTimeout(baseCtx, 600*time.Minute) - defer timeoutCancel() - defer cancelWithCause(nil) - progressCallback := h.createProgressCallback(taskCtx, cancelWithCause, conversationID, assistantMessageID, sendEvent) - taskCtx = multiagent.WithHITLToolInterceptor(taskCtx, func(ctx context.Context, toolName, arguments string) (string, error) { - return h.interceptHITLForEinoTool(ctx, cancelWithCause, conversationID, assistantMessageID, sendEvent, toolName, arguments) - }) - - if _, err := h.tasks.StartTask(conversationID, req.Message, cancelWithCause); err != nil { - var errorMsg string - if errors.Is(err, ErrTaskAlreadyRunning) { - errorMsg = "⚠️ 当前会话已有任务正在执行中,请等待当前任务完成或点击「停止任务」后再尝试。" - sendEvent("error", errorMsg, map[string]interface{}{ - "conversationId": conversationID, - "errorType": "task_already_running", - }) - } else { - errorMsg = "❌ 无法启动任务: " + err.Error() - sendEvent("error", errorMsg, nil) - } - if assistantMessageID != "" { - _, _ = h.db.Exec("UPDATE messages SET content = ?, updated_at = ? WHERE id = ?", errorMsg, time.Now(), assistantMessageID) - } - sendEvent("done", "", map[string]interface{}{"conversationId": conversationID}) - return - } + firstRun := true + curFinalMessage := prep.FinalMessage + curHistory := prep.History + roleTools := prep.RoleTools taskStatus := "completed" defer h.tasks.FinishTask(conversationID, taskStatus) @@ -161,22 +141,108 @@ func (h *AgentHandler) EinoSingleAgentLoopStream(c *gin.Context) { return } - result, runErr := multiagent.RunEinoSingleChatModelAgent( - taskCtx, - h.config, - &h.config.MultiAgent, - h.agent, - h.logger, - conversationID, - prep.FinalMessage, - prep.History, - prep.RoleTools, - progressCallback, - ) + var result *multiagent.RunResult + var runErr error + + for { + baseCtx, cancelWithCause = context.WithCancelCause(context.Background()) + taskCtx, timeoutCancel := context.WithTimeout(baseCtx, 600*time.Minute) + + if firstRun { + if _, err := h.tasks.StartTask(conversationID, req.Message, cancelWithCause); err != nil { + var errorMsg string + if errors.Is(err, ErrTaskAlreadyRunning) { + errorMsg = "⚠️ 当前会话已有任务正在执行中,请等待当前任务完成或点击「停止任务」后再尝试。" + sendEvent("error", errorMsg, map[string]interface{}{ + "conversationId": conversationID, + "errorType": "task_already_running", + }) + } else { + errorMsg = "❌ 无法启动任务: " + err.Error() + sendEvent("error", errorMsg, nil) + } + if assistantMessageID != "" { + _, _ = h.db.Exec("UPDATE messages SET content = ?, updated_at = ? WHERE id = ?", errorMsg, time.Now(), assistantMessageID) + } + sendEvent("done", "", map[string]interface{}{"conversationId": conversationID}) + timeoutCancel() + return + } + firstRun = false + } else { + if err := h.tasks.ResetTaskCancelForContinue(conversationID, cancelWithCause); err != nil { + h.logger.Error("续跑任务时重置 cancel 失败", zap.Error(err)) + taskStatus = "failed" + sendEvent("error", err.Error(), nil) + sendEvent("done", "", map[string]interface{}{"conversationId": conversationID}) + timeoutCancel() + return + } + } + + progressCallback := h.createProgressCallback(taskCtx, cancelWithCause, conversationID, assistantMessageID, sendEvent) + taskCtx = multiagent.WithHITLToolInterceptor(taskCtx, func(ctx context.Context, toolName, arguments string) (string, error) { + return h.interceptHITLForEinoTool(ctx, cancelWithCause, conversationID, assistantMessageID, sendEvent, toolName, arguments) + }) + + result, runErr = multiagent.RunEinoSingleChatModelAgent( + taskCtx, + h.config, + &h.config.MultiAgent, + h.agent, + h.logger, + conversationID, + curFinalMessage, + curHistory, + roleTools, + progressCallback, + ) + timeoutCancel() + + if runErr == nil { + break + } - if runErr != nil { h.persistEinoAgentTraceForResume(conversationID, result) cause := context.Cause(baseCtx) + if errors.Is(cause, ErrUserInterruptContinue) { + reason := h.tasks.TakeInterruptContinueReason(conversationID) + prepNext, perr := h.prepareSessionAfterUserInterrupt(conversationID, assistantMessageID, reason, roleTools) + if perr != nil { + h.logger.Error("准备中断后续跑失败", zap.Error(perr)) + taskStatus = "failed" + h.tasks.UpdateTaskStatus(conversationID, taskStatus) + errMsg := "中断后续跑失败: " + perr.Error() + if assistantMessageID != "" { + _, _ = h.db.Exec("UPDATE messages SET content = ?, updated_at = ? WHERE id = ?", errMsg, time.Now(), assistantMessageID) + } + sendEvent("error", errMsg, map[string]interface{}{ + "conversationId": conversationID, + "messageId": assistantMessageID, + }) + sendEvent("done", "", map[string]interface{}{"conversationId": conversationID}) + return + } + assistantMessageID = prepNext.AssistantMessageID + curFinalMessage = prepNext.FinalMessage + curHistory = prepNext.History + if prepNext.UserMessageID != "" { + sendEvent("message_saved", "", map[string]interface{}{ + "conversationId": conversationID, + "userMessageId": prepNext.UserMessageID, + }) + } + sendEvent("user_interrupt_continue", reason, map[string]interface{}{ + "conversationId": conversationID, + "reason": reason, + "messageId": assistantMessageID, + }) + sendEvent("progress", "已接收中断说明,继续迭代...", map[string]interface{}{ + "conversationId": conversationID, + }) + continue + } + if errors.Is(cause, ErrTaskCancelled) { taskStatus = "cancelled" h.tasks.UpdateTaskStatus(conversationID, taskStatus) diff --git a/internal/handler/monitor.go b/internal/handler/monitor.go index 17e6c79c..a9ba7119 100644 --- a/internal/handler/monitor.go +++ b/internal/handler/monitor.go @@ -1,6 +1,9 @@ package handler import ( + "encoding/json" + "errors" + "io" "net/http" "strconv" "strings" @@ -245,6 +248,37 @@ func (h *MonitorHandler) GetExecution(c *gin.Context) { c.JSON(http.StatusNotFound, gin.H{"error": "执行记录未找到"}) } +// CancelExecution 手动取消进行中的 MCP 工具调用(仅取消该次 tools/call 的上下文,不停止整条 Agent / 迭代任务) +// 请求体可选 JSON:{ "note": "用户说明" },将与工具已返回输出合并交给模型(含「用户终止说明」标题块,与命令行原文区分)。 +func (h *MonitorHandler) CancelExecution(c *gin.Context) { + id := c.Param("id") + if id == "" { + c.JSON(http.StatusBadRequest, gin.H{"error": "执行记录ID不能为空"}) + return + } + note := "" + dec := json.NewDecoder(c.Request.Body) + var body struct { + Note string `json:"note"` + } + if err := dec.Decode(&body); err != nil && !errors.Is(err, io.EOF) { + c.JSON(http.StatusBadRequest, gin.H{"error": "请求体须为 JSON,例如 {\"note\":\"说明\"},可为空对象"}) + return + } + note = strings.TrimSpace(body.Note) + if h.mcpServer.CancelToolExecutionWithNote(id, note) { + h.logger.Info("已请求取消 MCP 工具执行", zap.String("executionId", id), zap.String("source", "internal"), zap.Bool("hasNote", note != "")) + c.JSON(http.StatusOK, gin.H{"message": "已发送终止信号", "executionId": id}) + return + } + if h.externalMCPMgr != nil && h.externalMCPMgr.CancelToolExecutionWithNote(id, note) { + h.logger.Info("已请求取消 MCP 工具执行", zap.String("executionId", id), zap.String("source", "external"), zap.Bool("hasNote", note != "")) + c.JSON(http.StatusOK, gin.H{"message": "已发送终止信号", "executionId": id}) + return + } + c.JSON(http.StatusNotFound, gin.H{"error": "未找到进行中的工具执行,或该任务已结束"}) +} + // BatchGetToolNames 批量获取工具执行的工具名称(消除前端 N+1 请求) func (h *MonitorHandler) BatchGetToolNames(c *gin.Context) { var req struct { @@ -317,7 +351,7 @@ func (h *MonitorHandler) DeleteExecution(c *gin.Context) { totalCalls := 1 successCalls := 0 failedCalls := 0 - if exec.Status == "failed" { + if exec.Status == "failed" || exec.Status == "cancelled" { failedCalls = 1 } else if exec.Status == "completed" { successCalls = 1 @@ -381,7 +415,7 @@ func (h *MonitorHandler) DeleteExecutions(c *gin.Context) { stats := toolStats[exec.ToolName] stats.totalCalls++ - if exec.Status == "failed" { + if exec.Status == "failed" || exec.Status == "cancelled" { stats.failedCalls++ } else if exec.Status == "completed" { stats.successCalls++ diff --git a/internal/handler/multi_agent.go b/internal/handler/multi_agent.go index 0d248cf4..0fd9d546 100644 --- a/internal/handler/multi_agent.go +++ b/internal/handler/multi_agent.go @@ -60,8 +60,11 @@ func (h *AgentHandler) MultiAgentLoopStream(c *gin.Context) { sendEvent := func(eventType, message string, data interface{}) { // 用户主动停止时,Eino 可能仍会并发上报 eventType=="error"。 // 为避免 UI 看到“取消错误 + cancelled 文案”两条回复,这里直接丢弃取消对应的 error。 - if eventType == "error" && baseCtx != nil && errors.Is(context.Cause(baseCtx), ErrTaskCancelled) { - return + if eventType == "error" && baseCtx != nil { + cause := context.Cause(baseCtx) + if errors.Is(cause, ErrTaskCancelled) || errors.Is(cause, ErrUserInterruptContinue) { + return + } } ev := StreamEvent{Type: eventType, Message: message, Data: data} b, errMarshal := json.Marshal(ev) @@ -130,33 +133,12 @@ func (h *AgentHandler) MultiAgentLoopStream(c *gin.Context) { }) } - baseCtx, cancelWithCause := context.WithCancelCause(context.Background()) - taskCtx, timeoutCancel := context.WithTimeout(baseCtx, 600*time.Minute) - defer timeoutCancel() - defer cancelWithCause(nil) - progressCallback := h.createProgressCallback(taskCtx, cancelWithCause, conversationID, assistantMessageID, sendEvent) - taskCtx = multiagent.WithHITLToolInterceptor(taskCtx, func(ctx context.Context, toolName, arguments string) (string, error) { - return h.interceptHITLForEinoTool(ctx, cancelWithCause, conversationID, assistantMessageID, sendEvent, toolName, arguments) - }) - - if _, err := h.tasks.StartTask(conversationID, req.Message, cancelWithCause); err != nil { - var errorMsg string - if errors.Is(err, ErrTaskAlreadyRunning) { - errorMsg = "⚠️ 当前会话已有任务正在执行中,请等待当前任务完成或点击「停止任务」后再尝试。" - sendEvent("error", errorMsg, map[string]interface{}{ - "conversationId": conversationID, - "errorType": "task_already_running", - }) - } else { - errorMsg = "❌ 无法启动任务: " + err.Error() - sendEvent("error", errorMsg, nil) - } - if assistantMessageID != "" { - _, _ = h.db.Exec("UPDATE messages SET content = ?, updated_at = ? WHERE id = ?", errorMsg, time.Now(), assistantMessageID) - } - sendEvent("done", "", map[string]interface{}{"conversationId": conversationID}) - return - } + var cancelWithCause context.CancelCauseFunc + firstRun := true + curFinalMessage := prep.FinalMessage + curHistory := prep.History + roleTools := prep.RoleTools + orch := strings.TrimSpace(req.Orchestration) taskStatus := "completed" defer h.tasks.FinishTask(conversationID, taskStatus) @@ -169,24 +151,110 @@ func (h *AgentHandler) MultiAgentLoopStream(c *gin.Context) { go sseKeepalive(c, stopKeepalive, &sseWriteMu) defer close(stopKeepalive) - result, runErr := multiagent.RunDeepAgent( - taskCtx, - h.config, - &h.config.MultiAgent, - h.agent, - h.logger, - conversationID, - prep.FinalMessage, - prep.History, - prep.RoleTools, - progressCallback, - h.agentsMarkdownDir, - strings.TrimSpace(req.Orchestration), - ) + var result *multiagent.RunResult + var runErr error + + for { + baseCtx, cancelWithCause = context.WithCancelCause(context.Background()) + taskCtx, timeoutCancel := context.WithTimeout(baseCtx, 600*time.Minute) + + if firstRun { + if _, err := h.tasks.StartTask(conversationID, req.Message, cancelWithCause); err != nil { + var errorMsg string + if errors.Is(err, ErrTaskAlreadyRunning) { + errorMsg = "⚠️ 当前会话已有任务正在执行中,请等待当前任务完成或点击「停止任务」后再尝试。" + sendEvent("error", errorMsg, map[string]interface{}{ + "conversationId": conversationID, + "errorType": "task_already_running", + }) + } else { + errorMsg = "❌ 无法启动任务: " + err.Error() + sendEvent("error", errorMsg, nil) + } + if assistantMessageID != "" { + _, _ = h.db.Exec("UPDATE messages SET content = ?, updated_at = ? WHERE id = ?", errorMsg, time.Now(), assistantMessageID) + } + sendEvent("done", "", map[string]interface{}{"conversationId": conversationID}) + timeoutCancel() + return + } + firstRun = false + } else { + if err := h.tasks.ResetTaskCancelForContinue(conversationID, cancelWithCause); err != nil { + h.logger.Error("续跑任务时重置 cancel 失败", zap.Error(err)) + taskStatus = "failed" + sendEvent("error", err.Error(), nil) + sendEvent("done", "", map[string]interface{}{"conversationId": conversationID}) + timeoutCancel() + return + } + } + + progressCallback := h.createProgressCallback(taskCtx, cancelWithCause, conversationID, assistantMessageID, sendEvent) + taskCtx = multiagent.WithHITLToolInterceptor(taskCtx, func(ctx context.Context, toolName, arguments string) (string, error) { + return h.interceptHITLForEinoTool(ctx, cancelWithCause, conversationID, assistantMessageID, sendEvent, toolName, arguments) + }) + + result, runErr = multiagent.RunDeepAgent( + taskCtx, + h.config, + &h.config.MultiAgent, + h.agent, + h.logger, + conversationID, + curFinalMessage, + curHistory, + roleTools, + progressCallback, + h.agentsMarkdownDir, + orch, + ) + timeoutCancel() + + if runErr == nil { + break + } - if runErr != nil { h.persistEinoAgentTraceForResume(conversationID, result) cause := context.Cause(baseCtx) + if errors.Is(cause, ErrUserInterruptContinue) { + reason := h.tasks.TakeInterruptContinueReason(conversationID) + prepNext, perr := h.prepareSessionAfterUserInterrupt(conversationID, assistantMessageID, reason, roleTools) + if perr != nil { + h.logger.Error("准备中断后续跑失败", zap.Error(perr)) + taskStatus = "failed" + h.tasks.UpdateTaskStatus(conversationID, taskStatus) + errMsg := "中断后续跑失败: " + perr.Error() + if assistantMessageID != "" { + _, _ = h.db.Exec("UPDATE messages SET content = ?, updated_at = ? WHERE id = ?", errMsg, time.Now(), assistantMessageID) + } + sendEvent("error", errMsg, map[string]interface{}{ + "conversationId": conversationID, + "messageId": assistantMessageID, + }) + sendEvent("done", "", map[string]interface{}{"conversationId": conversationID}) + return + } + assistantMessageID = prepNext.AssistantMessageID + curFinalMessage = prepNext.FinalMessage + curHistory = prepNext.History + if prepNext.UserMessageID != "" { + sendEvent("message_saved", "", map[string]interface{}{ + "conversationId": conversationID, + "userMessageId": prepNext.UserMessageID, + }) + } + sendEvent("user_interrupt_continue", reason, map[string]interface{}{ + "conversationId": conversationID, + "reason": reason, + "messageId": assistantMessageID, + }) + sendEvent("progress", "已接收中断说明,继续迭代...", map[string]interface{}{ + "conversationId": conversationID, + }) + continue + } + if errors.Is(cause, ErrTaskCancelled) { taskStatus = "cancelled" h.tasks.UpdateTaskStatus(conversationID, taskStatus) diff --git a/internal/handler/multi_agent_prepare.go b/internal/handler/multi_agent_prepare.go index 51703e86..47f6ae09 100644 --- a/internal/handler/multi_agent_prepare.go +++ b/internal/handler/multi_agent_prepare.go @@ -3,6 +3,7 @@ package handler import ( "fmt" "strings" + "time" "cyberstrike-ai/internal/agent" "cyberstrike-ai/internal/database" @@ -142,3 +143,64 @@ func (h *AgentHandler) prepareMultiAgentSession(req *ChatRequest) (*multiAgentPr UserMessageID: userMessageID, }, nil } + +// prepareSessionAfterUserInterrupt 用户「中断并说明」后:结束当前助手占位、写入用户说明、新建助手占位,并生成下一轮 Run 所需的 History + FinalMessage。 +func (h *AgentHandler) prepareSessionAfterUserInterrupt(conversationID, prevAssistantMessageID, reason string, roleTools []string) (*multiAgentPrepared, error) { + if strings.TrimSpace(conversationID) == "" { + return nil, fmt.Errorf("conversationId 为空") + } + if _, err := h.db.GetConversation(conversationID); err != nil { + return nil, fmt.Errorf("对话不存在") + } + note := "(已根据用户说明中断当前步骤,正在继续迭代。)" + if prevAssistantMessageID != "" { + if _, err := h.db.Exec("UPDATE messages SET content = ?, updated_at = ? WHERE id = ?", note, time.Now(), prevAssistantMessageID); err != nil { + return nil, fmt.Errorf("更新助手消息失败: %w", err) + } + r := strings.TrimSpace(reason) + detail := "用户中断并说明" + if r != "" { + detail += ":" + r + } + _ = h.db.AddProcessDetail(prevAssistantMessageID, conversationID, "user_interrupt", detail, map[string]interface{}{ + "reason": r, + }) + } + userContent := fmt.Sprintf("【用户中断说明】%s\n\n请根据以上说明调整并继续任务。", strings.TrimSpace(reason)) + if strings.TrimSpace(reason) == "" { + userContent = "【用户中断说明】(未填写具体原因)\n\n请根据情况调整并继续任务。" + } + userMsgRow, err := h.db.AddMessage(conversationID, "user", userContent, nil) + if err != nil { + return nil, fmt.Errorf("保存用户消息失败: %w", err) + } + assistantMsg, err := h.db.AddMessage(conversationID, "assistant", "处理中...", nil) + if err != nil || assistantMsg == nil { + return nil, fmt.Errorf("创建助手占位失败: %w", err) + } + msgs, err := h.db.GetMessages(conversationID) + if err != nil || len(msgs) < 2 { + return nil, fmt.Errorf("读取消息历史失败或消息不足") + } + histMsgs := msgs[:len(msgs)-2] + agentHistory := make([]agent.ChatMessage, 0, len(histMsgs)) + for _, msg := range histMsgs { + agentHistory = append(agentHistory, agent.ChatMessage{ + Role: msg.Role, + Content: msg.Content, + }) + } + userMessageID := "" + if userMsgRow != nil { + userMessageID = userMsgRow.ID + } + return &multiAgentPrepared{ + ConversationID: conversationID, + CreatedNew: false, + History: agentHistory, + FinalMessage: userContent, + RoleTools: roleTools, + AssistantMessageID: assistantMsg.ID, + UserMessageID: userMessageID, + }, nil +} diff --git a/internal/handler/openapi.go b/internal/handler/openapi.go index 1b6dc4d4..b91d9d2b 100644 --- a/internal/handler/openapi.go +++ b/internal/handler/openapi.go @@ -461,6 +461,14 @@ func (h *OpenAPIHandler) GetOpenAPISpec(c *gin.Context) { "type": "string", "description": "对话ID", }, + "reason": map[string]interface{}{ + "type": "string", + "description": "中断说明;与 continueAfter 同时为真时必填,将写入对话并由同一会话流式迭代继续", + }, + "continueAfter": map[string]interface{}{ + "type": "boolean", + "description": "为 true 时取消当前运行步骤并注入 reason 后继续迭代(非彻底停止)", + }, }, }, "AgentTask": map[string]interface{}{ @@ -3318,6 +3326,55 @@ func (h *OpenAPIHandler) GetOpenAPISpec(c *gin.Context) { }, }, }, + "/api/monitor/execution/{id}/cancel": map[string]interface{}{ + "post": map[string]interface{}{ + "tags": []string{"监控"}, + "summary": "取消进行中的工具执行", + "description": "对当前进程内正在执行的 MCP 工具调用发送 context 取消信号;上层对话/多步任务可继续。若执行已结束或未在本进程内运行则返回 404。", + "operationId": "cancelExecution", + "parameters": []map[string]interface{}{ + { + "name": "id", + "in": "path", + "required": true, + "description": "执行ID", + "schema": map[string]interface{}{ + "type": "string", + }, + }, + }, + "requestBody": map[string]interface{}{ + "required": false, + "content": map[string]interface{}{ + "application/json": map[string]interface{}{ + "schema": map[string]interface{}{ + "type": "object", + "properties": map[string]interface{}{ + "note": map[string]interface{}{ + "type": "string", + "description": "可选。非空时与工具已返回输出合并交给大模型,并带有「用户终止说明」标题块以便与命令行原文区分", + }, + }, + }, + }, + }, + }, + "responses": map[string]interface{}{ + "200": map[string]interface{}{ + "description": "已发送终止信号", + }, + "400": map[string]interface{}{ + "description": "请求体不是合法 JSON", + }, + "404": map[string]interface{}{ + "description": "未找到进行中的工具执行", + }, + "401": map[string]interface{}{ + "description": "未授权", + }, + }, + }, + }, "/api/monitor/executions": map[string]interface{}{ "delete": map[string]interface{}{ "tags": []string{"监控"}, diff --git a/internal/handler/task_manager.go b/internal/handler/task_manager.go index acbc4733..33e40432 100644 --- a/internal/handler/task_manager.go +++ b/internal/handler/task_manager.go @@ -3,6 +3,7 @@ package handler import ( "context" "errors" + "strings" "sync" "time" ) @@ -10,6 +11,9 @@ import ( // ErrTaskCancelled 用户取消任务的错误 var ErrTaskCancelled = errors.New("agent task cancelled by user") +// ErrUserInterruptContinue 用户在进度条上「中断并说明」:取消当前运行步骤,将说明写入对话并继续迭代(与 ErrTaskCancelled 区分) +var ErrUserInterruptContinue = errors.New("user interrupt with continue") + // ErrTaskAlreadyRunning 会话已有任务正在执行 var ErrTaskAlreadyRunning = errors.New("agent task already running for conversation") @@ -21,6 +25,9 @@ type AgentTask struct { Status string `json:"status"` CancellingAt time.Time `json:"-"` // 进入 cancelling 状态的时间,用于清理长时间卡住的任务 + // InterruptContinueReason 由 /api/agent-loop/cancel 在 continueAfter 时写入,Run 返回后由 handler 取出并清空 + InterruptContinueReason string `json:"-"` + cancel func(error) } @@ -140,6 +147,49 @@ func (m *AgentTaskManager) StartTask(conversationID, message string, cancel cont return task, nil } +// SetInterruptContinueReason 在发起 ErrUserInterruptContinue 取消前写入用户说明(须任务仍存在)。 +func (m *AgentTaskManager) SetInterruptContinueReason(conversationID, reason string) bool { + m.mu.Lock() + defer m.mu.Unlock() + task, ok := m.tasks[conversationID] + if !ok { + return false + } + task.InterruptContinueReason = strings.TrimSpace(reason) + return true +} + +// TakeInterruptContinueReason 取出并清空用户中断说明。 +func (m *AgentTaskManager) TakeInterruptContinueReason(conversationID string) string { + m.mu.Lock() + defer m.mu.Unlock() + task, ok := m.tasks[conversationID] + if !ok { + return "" + } + r := task.InterruptContinueReason + task.InterruptContinueReason = "" + return r +} + +// ResetTaskCancelForContinue 在一次「中断并继续」后恢复任务为 running 并绑定新的 cancel(同一会话同一条 HTTP 流内续跑)。 +func (m *AgentTaskManager) ResetTaskCancelForContinue(conversationID string, cancel context.CancelCauseFunc) error { + m.mu.Lock() + defer m.mu.Unlock() + task, ok := m.tasks[conversationID] + if !ok { + return errors.New("no active task") + } + task.cancel = func(err error) { + if cancel != nil { + cancel(err) + } + } + task.Status = "running" + task.CancellingAt = time.Time{} + return nil +} + // CancelTask 取消指定会话的任务。若任务已在取消中,仍返回 (true, nil) 以便接口幂等、前端不报错。 func (m *AgentTaskManager) CancelTask(conversationID string, cause error) (bool, error) { m.mu.Lock()