diff --git a/internal/handler/agent.go b/internal/handler/agent.go index f3fa731b..143b1f16 100644 --- a/internal/handler/agent.go +++ b/internal/handler/agent.go @@ -1336,6 +1336,21 @@ func (h *AgentHandler) CancelAgentLoop(c *gin.Context) { }) return } + if h.tasks.AbortActiveEinoExecute(req.ConversationID, note) { + h.logger.Info("对话页仅终止当前 Eino execute", + zap.String("conversationId", req.ConversationID), + zap.Bool("hasNote", note != ""), + ) + c.JSON(http.StatusOK, gin.H{ + "status": "tool_abort_requested", + "conversationId": req.ConversationID, + "message": "已请求终止当前 execute 命令;命令返回后本轮推理将继续。", + "continueAfter": true, + "interruptWithNote": note != "", + "continueWithoutTool": false, + }) + return + } // 无进行中的 MCP 工具(模型纯推理/流式输出阶段):取消当前上下文并由 Eino 流式处理器合并用户补充后自动续跑。 h.tasks.SetInterruptContinueNote(req.ConversationID, note) ok, err := h.tasks.CancelTask(req.ConversationID, multiagent.ErrInterruptContinue) @@ -2230,6 +2245,7 @@ func (h *AgentHandler) executeBatchQueue(queueID string) { progressCallback = h.createProgressCallback(taskCtx, cancelWithCause, conversationID, assistantMessageID, sendEvent) taskCtx = mcp.WithMCPConversationID(taskCtx, conversationID) taskCtx = mcp.WithToolRunRegistry(taskCtx, h.tasks) + taskCtx = mcp.WithEinoExecuteRunRegistry(taskCtx, h.tasks) // 使用队列配置的角色工具列表(如果为空,表示使用所有工具) useBatchMulti := false diff --git a/internal/handler/eino_single_agent.go b/internal/handler/eino_single_agent.go index a685676a..f9d4cba6 100644 --- a/internal/handler/eino_single_agent.go +++ b/internal/handler/eino_single_agent.go @@ -212,6 +212,7 @@ func (h *AgentHandler) EinoSingleAgentLoopStream(c *gin.Context) { } taskCtxLoop := mcp.WithMCPConversationID(taskCtx, conversationID) taskCtxLoop = mcp.WithToolRunRegistry(taskCtxLoop, h.tasks) + taskCtxLoop = mcp.WithEinoExecuteRunRegistry(taskCtxLoop, h.tasks) taskCtxLoop = multiagent.WithHITLToolInterceptor(taskCtxLoop, func(ctx context.Context, toolName, arguments string) (string, error) { return h.interceptHITLForEinoTool(ctx, cancelWithCause, conversationID, assistantMessageID, sendEvent, toolName, arguments) }) diff --git a/internal/handler/multi_agent.go b/internal/handler/multi_agent.go index ab830918..850ee14c 100644 --- a/internal/handler/multi_agent.go +++ b/internal/handler/multi_agent.go @@ -222,6 +222,7 @@ func (h *AgentHandler) MultiAgentLoopStream(c *gin.Context) { } taskCtxLoop := mcp.WithMCPConversationID(taskCtx, conversationID) taskCtxLoop = mcp.WithToolRunRegistry(taskCtxLoop, h.tasks) + taskCtxLoop = mcp.WithEinoExecuteRunRegistry(taskCtxLoop, h.tasks) taskCtxLoop = multiagent.WithHITLToolInterceptor(taskCtxLoop, func(ctx context.Context, toolName, arguments string) (string, error) { return h.interceptHITLForEinoTool(ctx, cancelWithCause, conversationID, assistantMessageID, sendEvent, toolName, arguments) }) diff --git a/internal/handler/task_manager.go b/internal/handler/task_manager.go index 82e9f304..f09a0d4b 100644 --- a/internal/handler/task_manager.go +++ b/internal/handler/task_manager.go @@ -37,6 +37,11 @@ type AgentTask struct { // InterruptContinueNote 无 MCP 时「中断并继续」由用户在弹窗中填写的补充说明(Cancel 前写入,续跑轮次读取后清空) InterruptContinueNote string `json:"-"` + // activeEinoExecuteCancel 当前进行中的 Eino filesystem execute 取消函数(与 MCP 工具并行,供中断并继续) + activeEinoExecuteCancel context.CancelFunc + // activeEinoExecuteAbortNote AbortActiveEinoExecute 写入的用户说明,由 execute 收尾时合并进工具结果 + activeEinoExecuteAbortNote string + cancel func(error) } @@ -70,6 +75,69 @@ func (m *AgentTaskManager) UnregisterRunningTool(conversationID, executionID str } } +// RegisterActiveEinoExecute 登记进行中的 Eino filesystem execute(每会话同时仅一条)。 +func (m *AgentTaskManager) RegisterActiveEinoExecute(conversationID string, cancel context.CancelFunc) { + conversationID = strings.TrimSpace(conversationID) + if conversationID == "" || cancel == nil { + return + } + m.mu.Lock() + defer m.mu.Unlock() + if t, ok := m.tasks[conversationID]; ok && t != nil { + t.activeEinoExecuteCancel = cancel + t.activeEinoExecuteAbortNote = "" + } +} + +// UnregisterActiveEinoExecute execute 正常结束或已取消后清除登记。 +func (m *AgentTaskManager) UnregisterActiveEinoExecute(conversationID string) { + conversationID = strings.TrimSpace(conversationID) + if conversationID == "" { + return + } + m.mu.Lock() + defer m.mu.Unlock() + if t, ok := m.tasks[conversationID]; ok && t != nil { + t.activeEinoExecuteCancel = nil + t.activeEinoExecuteAbortNote = "" + } +} + +// AbortActiveEinoExecute 终止当前 Eino execute 并暂存用户说明(与 MCP 工具终止一致)。 +func (m *AgentTaskManager) AbortActiveEinoExecute(conversationID, note string) bool { + conversationID = strings.TrimSpace(conversationID) + if conversationID == "" { + return false + } + m.mu.Lock() + t, ok := m.tasks[conversationID] + if !ok || t == nil || t.activeEinoExecuteCancel == nil { + m.mu.Unlock() + return false + } + t.activeEinoExecuteAbortNote = strings.TrimSpace(note) + cancel := t.activeEinoExecuteCancel + m.mu.Unlock() + cancel() + return true +} + +// TakeEinoExecuteAbortNote 读取并清空 execute 终止说明(execute 收尾时调用一次)。 +func (m *AgentTaskManager) TakeEinoExecuteAbortNote(conversationID string) string { + conversationID = strings.TrimSpace(conversationID) + if conversationID == "" { + return "" + } + m.mu.Lock() + defer m.mu.Unlock() + if t, ok := m.tasks[conversationID]; ok && t != nil { + n := t.activeEinoExecuteAbortNote + t.activeEinoExecuteAbortNote = "" + return n + } + return "" +} + // SetInterruptContinueNote 在发起 ErrInterruptContinue 取消前写入用户补充说明(仅内存)。 func (m *AgentTaskManager) SetInterruptContinueNote(conversationID, note string) { conversationID = strings.TrimSpace(conversationID) diff --git a/internal/handler/task_manager_eino_execute_test.go b/internal/handler/task_manager_eino_execute_test.go new file mode 100644 index 00000000..7a88d50c --- /dev/null +++ b/internal/handler/task_manager_eino_execute_test.go @@ -0,0 +1,40 @@ +package handler + +import ( + "context" + "testing" + "time" +) + +func TestAbortActiveEinoExecute(t *testing.T) { + m := NewAgentTaskManager() + conv := "conv-eino-exec-abort" + ctx, cancel := context.WithCancel(context.Background()) + _, err := m.StartTask(conv, "test", func(error) {}) + if err != nil { + t.Fatalf("StartTask: %v", err) + } + m.RegisterActiveEinoExecute(conv, cancel) + + done := make(chan struct{}) + go func() { + <-ctx.Done() + close(done) + }() + + if !m.AbortActiveEinoExecute(conv, "跳过域名收集") { + t.Fatal("expected abort to succeed") + } + select { + case <-done: + case <-time.After(2 * time.Second): + t.Fatal("execute cancel did not propagate") + } + if got := m.TakeEinoExecuteAbortNote(conv); got != "跳过域名收集" { + t.Fatalf("abort note = %q, want 跳过域名收集", got) + } + m.UnregisterActiveEinoExecute(conv) + if m.AbortActiveEinoExecute(conv, "") { + t.Fatal("second abort should fail when no active execute") + } +}