diff --git a/internal/handler/agent.go b/internal/handler/agent.go index 25b3895b..67b7aa6b 100644 --- a/internal/handler/agent.go +++ b/internal/handler/agent.go @@ -187,6 +187,14 @@ func (h *AgentHandler) SetAudit(s *audit.Service) { h.audit = s } +// TaskManager 返回 Agent 任务管理器(供 MCP 监控页终止 Eino execute 等)。 +func (h *AgentHandler) TaskManager() *AgentTaskManager { + if h == nil { + return nil + } + return h.tasks +} + // CancelRunningTaskForConversation stops any in-flight agent work for the conversation (idempotent). func (h *AgentHandler) CancelRunningTaskForConversation(conversationID string) { if h == nil || conversationID == "" || h.tasks == nil { @@ -1291,6 +1299,55 @@ func (h *AgentHandler) createProgressCallback(runCtx context.Context, cancelRun } } +// cancelToolContinueAfter 仅终止当前工具调用,不停止整条 Agent 任务(对话「中断并继续」与 MCP 监控终止共用)。 +func (h *AgentHandler) cancelToolContinueAfter(conversationID, preferredExecID, note string) (bool, gin.H) { + conversationID = strings.TrimSpace(conversationID) + if conversationID == "" || h.tasks.GetTask(conversationID) == nil { + return false, nil + } + note = strings.TrimSpace(note) + execID := strings.TrimSpace(preferredExecID) + if execID == "" { + execID = h.tasks.ActiveMCPExecutionID(conversationID) + } + if execID != "" { + if h.agent.CancelMCPToolExecutionWithNote(execID, note) { + return true, gin.H{ + "status": "tool_abort_requested", + "conversationId": conversationID, + "executionId": execID, + "message": "已请求终止当前工具调用;工具返回后本轮推理将继续(与 MCP 监控页终止一致)。", + "continueAfter": true, + "interruptWithNote": note != "", + "continueWithoutTool": false, + } + } + if h.tasks.AbortActiveEinoExecute(conversationID, note) { + return true, gin.H{ + "status": "tool_abort_requested", + "conversationId": conversationID, + "executionId": execID, + "message": "已请求终止当前 execute 命令;命令返回后本轮推理将继续。", + "continueAfter": true, + "interruptWithNote": note != "", + "continueWithoutTool": false, + } + } + return false, nil + } + if h.tasks.AbortActiveEinoExecute(conversationID, note) { + return true, gin.H{ + "status": "tool_abort_requested", + "conversationId": conversationID, + "message": "已请求终止当前 execute 命令;命令返回后本轮推理将继续。", + "continueAfter": true, + "interruptWithNote": note != "", + "continueWithoutTool": false, + } + } + return false, nil +} + // CancelAgentLoop 取消正在执行的任务 func (h *AgentHandler) CancelAgentLoop(c *gin.Context) { var req struct { @@ -1309,42 +1366,20 @@ func (h *AgentHandler) CancelAgentLoop(c *gin.Context) { c.JSON(http.StatusNotFound, gin.H{"error": "未找到正在执行的任务"}) return } - execID := h.tasks.ActiveMCPExecutionID(req.ConversationID) note := strings.TrimSpace(req.Reason) - if execID != "" { - if !h.agent.CancelMCPToolExecutionWithNote(execID, note) { - c.JSON(http.StatusNotFound, gin.H{"error": "未找到进行中的工具执行或该调用已结束"}) - return - } - h.logger.Info("对话页仅终止当前 MCP 工具", + activeExec := strings.TrimSpace(h.tasks.ActiveMCPExecutionID(req.ConversationID)) + if ok, payload := h.cancelToolContinueAfter(req.ConversationID, "", note); ok { + execID, _ := payload["executionId"].(string) + h.logger.Info("对话页仅终止当前工具", zap.String("conversationId", req.ConversationID), zap.String("executionId", execID), zap.Bool("hasNote", note != ""), ) - c.JSON(http.StatusOK, gin.H{ - "status": "tool_abort_requested", - "conversationId": req.ConversationID, - "executionId": execID, - "message": "已请求终止当前工具调用;工具返回后本轮推理将继续(与 MCP 监控页终止一致)。", - "continueAfter": true, - "interruptWithNote": note != "", - "continueWithoutTool": false, - }) + c.JSON(http.StatusOK, payload) return } - if h.tasks.AbortActiveEinoExecute(req.ConversationID, note) { - h.logger.Info("对话页仅终止当前 Eino execute", - zap.String("conversationId", req.ConversationID), - zap.Bool("hasNote", note != ""), - ) - c.JSON(http.StatusOK, gin.H{ - "status": "tool_abort_requested", - "conversationId": req.ConversationID, - "message": "已请求终止当前 execute 命令;命令返回后本轮推理将继续。", - "continueAfter": true, - "interruptWithNote": note != "", - "continueWithoutTool": false, - }) + if activeExec != "" { + c.JSON(http.StatusNotFound, gin.H{"error": "未找到进行中的工具执行或该调用已结束"}) return } // 无进行中的 MCP 工具(模型纯推理/流式输出阶段):取消当前上下文并由 Eino 流式处理器合并用户补充后自动续跑。 diff --git a/internal/handler/monitor.go b/internal/handler/monitor.go index fe9b7204..3d5fc4d4 100644 --- a/internal/handler/monitor.go +++ b/internal/handler/monitor.go @@ -23,6 +23,8 @@ import ( type MonitorHandler struct { mcpServer *mcp.Server externalMCPMgr *mcp.ExternalMCPManager + taskManager *AgentTaskManager + agentHandler *AgentHandler executor *security.Executor db *database.DB logger *zap.Logger @@ -56,6 +58,16 @@ func (h *MonitorHandler) SetExternalMCPManager(mgr *mcp.ExternalMCPManager) { h.externalMCPMgr = mgr } +// SetTaskManager 设置 Agent 任务管理器(用于 Eino execute 等按 executionId 终止)。 +func (h *MonitorHandler) SetTaskManager(mgr *AgentTaskManager) { + h.taskManager = mgr +} + +// SetAgentHandler 设置 Agent 处理器(MCP 监控终止与对话页「中断并继续」共用逻辑)。 +func (h *MonitorHandler) SetAgentHandler(ah *AgentHandler) { + h.agentHandler = ah +} + // MonitorResponse 监控响应 type MonitorResponse struct { Executions []*mcp.ToolExecution `json:"executions"` @@ -90,6 +102,7 @@ func (h *MonitorHandler) Monitor(c *gin.Context) { toolName := normalizeToolNameFilter(c.Query("tool")) executions, total := h.loadExecutionsWithPagination(page, pageSize, status, toolName) + h.enrichExecutionsConversationID(executions) stats := h.loadStats() totalPages := (total + pageSize - 1) / pageSize @@ -247,6 +260,7 @@ func (h *MonitorHandler) GetExecution(c *gin.Context) { // 先从内部MCP服务器查找 exec, exists := h.mcpServer.GetExecution(id) if exists { + h.enrichExecutionsConversationID([]*mcp.ToolExecution{exec}) c.JSON(http.StatusOK, exec) return } @@ -255,6 +269,7 @@ func (h *MonitorHandler) GetExecution(c *gin.Context) { if h.externalMCPMgr != nil { exec, exists = h.externalMCPMgr.GetExecution(id) if exists { + h.enrichExecutionsConversationID([]*mcp.ToolExecution{exec}) c.JSON(http.StatusOK, exec) return } @@ -264,6 +279,7 @@ func (h *MonitorHandler) GetExecution(c *gin.Context) { if h.db != nil { exec, err := h.db.GetToolExecution(id) if err == nil && exec != nil { + h.enrichExecutionsConversationID([]*mcp.ToolExecution{exec}) c.JSON(http.StatusOK, exec) return } @@ -290,6 +306,19 @@ func (h *MonitorHandler) CancelExecution(c *gin.Context) { return } note = strings.TrimSpace(body.Note) + + convID := h.conversationIDForRunningExecution(id) + if convID != "" && h.agentHandler != nil { + if ok, payload := h.agentHandler.cancelToolContinueAfter(convID, id, note); ok { + h.logger.Info("MCP 监控页终止工具(与对话中断并继续一致)", + zap.String("executionId", id), + zap.String("conversationId", convID), + zap.Bool("hasNote", note != ""), + ) + c.JSON(http.StatusOK, payload) + return + } + } if h.mcpServer.CancelToolExecutionWithNote(id, note) { h.logger.Info("已请求取消 MCP 工具执行", zap.String("executionId", id), zap.String("source", "internal"), zap.Bool("hasNote", note != "")) c.JSON(http.StatusOK, gin.H{"message": "已发送终止信号", "executionId": id}) @@ -303,6 +332,52 @@ func (h *MonitorHandler) CancelExecution(c *gin.Context) { c.JSON(http.StatusNotFound, gin.H{"error": "未找到进行中的工具执行,或该任务已结束"}) } +func (h *MonitorHandler) enrichExecutionsConversationID(executions []*mcp.ToolExecution) { + for _, exec := range executions { + if exec == nil { + continue + } + exec.ConversationID = h.conversationIDForRunningExecution(exec.ID) + } +} + +func (h *MonitorHandler) conversationIDForRunningExecution(executionID string) string { + executionID = strings.TrimSpace(executionID) + if executionID == "" || h.taskManager == nil { + return "" + } + if conv := h.taskManager.ConversationIDForActiveMCPExecution(executionID); conv != "" { + return conv + } + exec := h.lookupExecution(executionID) + if exec == nil || exec.Status != "running" { + return "" + } + if strings.TrimSpace(exec.ToolName) == "execute" { + if onlyConv, ok := h.taskManager.ConversationIDForActiveEinoExecute(); ok { + return onlyConv + } + } + return "" +} + +func (h *MonitorHandler) lookupExecution(id string) *mcp.ToolExecution { + if exec, ok := h.mcpServer.GetExecution(id); ok { + return exec + } + if h.externalMCPMgr != nil { + if exec, ok := h.externalMCPMgr.GetExecution(id); ok { + return exec + } + } + if h.db != nil { + if exec, err := h.db.GetToolExecution(id); err == nil && exec != nil { + return exec + } + } + return nil +} + // BatchGetToolNames 批量获取工具执行的工具名称(消除前端 N+1 请求) func (h *MonitorHandler) BatchGetToolNames(c *gin.Context) { var req struct { diff --git a/internal/handler/task_manager.go b/internal/handler/task_manager.go index f09a0d4b..a40c4123 100644 --- a/internal/handler/task_manager.go +++ b/internal/handler/task_manager.go @@ -103,6 +103,40 @@ func (m *AgentTaskManager) UnregisterActiveEinoExecute(conversationID string) { } } +// ConversationIDForActiveMCPExecution 根据当前登记的工具 executionId 反查会话 ID(供 MCP 监控页按 executionId 终止)。 +func (m *AgentTaskManager) ConversationIDForActiveMCPExecution(executionID string) string { + executionID = strings.TrimSpace(executionID) + if executionID == "" { + return "" + } + m.mu.Lock() + defer m.mu.Unlock() + for convID, t := range m.tasks { + if t != nil && t.ActiveMCPExecutionID == executionID { + return convID + } + } + return "" +} + +// ConversationIDForActiveEinoExecute 返回当前唯一进行 Eino execute 的会话 ID;多会话并行时返回空。 +func (m *AgentTaskManager) ConversationIDForActiveEinoExecute() (string, bool) { + m.mu.Lock() + defer m.mu.Unlock() + var found string + count := 0 + for convID, t := range m.tasks { + if t != nil && t.activeEinoExecuteCancel != nil { + found = convID + count++ + } + } + if count == 1 { + return found, true + } + return "", false +} + // AbortActiveEinoExecute 终止当前 Eino execute 并暂存用户说明(与 MCP 工具终止一致)。 func (m *AgentTaskManager) AbortActiveEinoExecute(conversationID, note string) bool { conversationID = strings.TrimSpace(conversationID) diff --git a/internal/handler/task_manager_eino_execute_test.go b/internal/handler/task_manager_eino_execute_test.go index 7a88d50c..d678aff6 100644 --- a/internal/handler/task_manager_eino_execute_test.go +++ b/internal/handler/task_manager_eino_execute_test.go @@ -38,3 +38,19 @@ func TestAbortActiveEinoExecute(t *testing.T) { t.Fatal("second abort should fail when no active execute") } } + +func TestConversationIDForActiveMCPExecution(t *testing.T) { + m := NewAgentTaskManager() + conv := "conv-mcp-exec" + _, err := m.StartTask(conv, "test", func(error) {}) + if err != nil { + t.Fatalf("StartTask: %v", err) + } + m.RegisterRunningTool(conv, "exec-123") + if got := m.ConversationIDForActiveMCPExecution("exec-123"); got != conv { + t.Fatalf("got %q, want %q", got, conv) + } + if got := m.ConversationIDForActiveMCPExecution("missing"); got != "" { + t.Fatalf("missing should be empty, got %q", got) + } +}