diff --git a/internal/mcp/external_manager.go b/internal/mcp/external_manager.go index 3d3346d6..c67c25d9 100644 --- a/internal/mcp/external_manager.go +++ b/internal/mcp/external_manager.go @@ -32,6 +32,8 @@ type ExternalMCPManager struct { refreshWg sync.WaitGroup // 等待后台刷新goroutine完成 refreshing atomic.Bool // 防止 refreshToolCounts 并发堆积 mu sync.RWMutex + runningCancels map[string]context.CancelFunc + abortUserNotes map[string]string } // NewExternalMCPManager 创建外部MCP管理器 @@ -42,16 +44,18 @@ func NewExternalMCPManager(logger *zap.Logger) *ExternalMCPManager { // NewExternalMCPManagerWithStorage 创建外部MCP管理器(带持久化存储) func NewExternalMCPManagerWithStorage(logger *zap.Logger, storage MonitorStorage) *ExternalMCPManager { manager := &ExternalMCPManager{ - clients: make(map[string]ExternalMCPClient), - configs: make(map[string]config.ExternalMCPServerConfig), - logger: logger, - storage: storage, - executions: make(map[string]*ToolExecution), - stats: make(map[string]*ToolStats), - errors: make(map[string]string), - toolCounts: make(map[string]int), - toolCache: make(map[string][]Tool), - stopRefresh: make(chan struct{}), + clients: make(map[string]ExternalMCPClient), + configs: make(map[string]config.ExternalMCPServerConfig), + logger: logger, + storage: storage, + executions: make(map[string]*ToolExecution), + stats: make(map[string]*ToolStats), + errors: make(map[string]string), + toolCounts: make(map[string]int), + toolCache: make(map[string][]Tool), + stopRefresh: make(chan struct{}), + runningCancels: make(map[string]context.CancelFunc), + abortUserNotes: make(map[string]string), } // 启动后台刷新工具数量的goroutine manager.startToolCountRefresh() @@ -452,8 +456,16 @@ func (m *ExternalMCPManager) CallTool(ctx context.Context, toolName string, args } } + execCtx, runCancel := context.WithCancel(ctx) + m.registerRunningCancel(executionID, runCancel) + defer func() { + runCancel() + m.unregisterRunningCancel(executionID) + }() + // 调用工具 - result, err := client.CallTool(ctx, actualToolName, args) + result, err := client.CallTool(execCtx, actualToolName, args) + cancelledWithUserNote := m.applyAbortUserNoteToCancelledToolResult(executionID, &result, &err) // 更新执行记录 m.mu.Lock() @@ -462,16 +474,23 @@ func (m *ExternalMCPManager) CallTool(ctx context.Context, toolName string, args execution.Duration = now.Sub(execution.StartTime) if err != nil { - execution.Status = "failed" - execution.Error = err.Error() + st, msg := executionStatusAndMessage(err) + execution.Status = st + execution.Error = msg } else if result != nil && result.IsError { - execution.Status = "failed" - if len(result.Content) > 0 { - execution.Error = result.Content[0].Text + if cancelledWithUserNote { + execution.Status = "cancelled" + execution.Error = "" + execution.Result = result } else { - execution.Error = "工具执行返回错误结果" + execution.Status = "failed" + if len(result.Content) > 0 { + execution.Error = result.Content[0].Text + } else { + execution.Error = "工具执行返回错误结果" + } + execution.Result = result } - execution.Result = result } else { execution.Status = "completed" if result == nil { @@ -509,6 +528,50 @@ func (m *ExternalMCPManager) CallTool(ctx context.Context, toolName string, args return result, executionID, nil } +func (m *ExternalMCPManager) applyAbortUserNoteToCancelledToolResult(executionID string, result **ToolResult, err *error) (cancelledWithUserNote bool) { + note := strings.TrimSpace(m.readAbortUserNote(executionID)) + if note == "" { + return false + } + hasErr := err != nil && *err != nil + hasRes := result != nil && *result != nil + if !hasErr && !hasRes { + return false + } + _ = m.takeAbortUserNote(executionID) + partial := "" + if hasRes { + partial = ToolResultPlainText(*result) + } + if partial == "" && hasErr { + partial = (*err).Error() + } + merged := MergePartialToolOutputAndAbortNote(partial, note) + *err = nil + *result = &ToolResult{Content: []Content{{Type: "text", Text: merged}}, IsError: true} + return true +} + +func (m *ExternalMCPManager) readAbortUserNote(id string) string { + m.mu.Lock() + defer m.mu.Unlock() + if m.abortUserNotes == nil { + return "" + } + return m.abortUserNotes[id] +} + +func (m *ExternalMCPManager) takeAbortUserNote(id string) string { + m.mu.Lock() + defer m.mu.Unlock() + if m.abortUserNotes == nil { + return "" + } + n := m.abortUserNotes[id] + delete(m.abortUserNotes, id) + return n +} + // cleanupOldExecutions 清理旧的执行记录(保持内存中的记录数量在限制内) func (m *ExternalMCPManager) cleanupOldExecutions() { const maxExecutionsInMemory = 1000 @@ -562,6 +625,42 @@ func (m *ExternalMCPManager) GetExecution(id string) (*ToolExecution, bool) { return nil, false } +func (m *ExternalMCPManager) registerRunningCancel(id string, cancel context.CancelFunc) { + m.mu.Lock() + m.runningCancels[id] = cancel + m.mu.Unlock() +} + +func (m *ExternalMCPManager) unregisterRunningCancel(id string) { + m.mu.Lock() + delete(m.runningCancels, id) + m.mu.Unlock() +} + +// CancelToolExecutionWithNote 取消外部 MCP 工具;note 非空时与已返回输出合并后交给模型。 +func (m *ExternalMCPManager) CancelToolExecutionWithNote(id string, note string) bool { + m.mu.Lock() + cancel, ok := m.runningCancels[id] + if !ok || cancel == nil { + m.mu.Unlock() + return false + } + if strings.TrimSpace(note) != "" { + if m.abortUserNotes == nil { + m.abortUserNotes = make(map[string]string) + } + m.abortUserNotes[id] = strings.TrimSpace(note) + } + m.mu.Unlock() + cancel() + return true +} + +// CancelToolExecution 取消正在执行的外部 MCP 工具(无用户说明)。 +func (m *ExternalMCPManager) CancelToolExecution(id string) bool { + return m.CancelToolExecutionWithNote(id, "") +} + // updateStats 更新统计信息 func (m *ExternalMCPManager) updateStats(toolName string, failed bool) { now := time.Now() diff --git a/internal/mcp/server.go b/internal/mcp/server.go index 37670ba6..48b2325d 100644 --- a/internal/mcp/server.go +++ b/internal/mcp/server.go @@ -4,6 +4,7 @@ import ( "bufio" "context" "encoding/json" + "errors" "fmt" "io" "net/http" @@ -40,6 +41,9 @@ type Server struct { logger *zap.Logger maxExecutionsInMemory int // 内存中最大执行记录数 sseClients map[string]*sseClient + runningCancels map[string]context.CancelFunc + runningCancelsMu sync.Mutex + abortUserNotes map[string]string // 监控页终止时附带的用户说明,与 executionID 对应 } type sseClient struct { @@ -50,6 +54,13 @@ type sseClient struct { // ToolHandler 工具处理函数 type ToolHandler func(ctx context.Context, args map[string]interface{}) (*ToolResult, error) +func executionStatusAndMessage(err error) (status string, errMsg string) { + if errors.Is(err, context.Canceled) { + return "cancelled", "已手动终止(MCP 监控)" + } + return "failed", err.Error() +} + // NewServer 创建新的MCP服务器 func NewServer(logger *zap.Logger) *Server { return NewServerWithStorage(logger, nil) @@ -68,6 +79,8 @@ func NewServerWithStorage(logger *zap.Logger, storage MonitorStorage) *Server { logger: logger, maxExecutionsInMemory: 1000, // 默认最多在内存中保留1000条执行记录 sseClients: make(map[string]*sseClient), + runningCancels: make(map[string]context.CancelFunc), + abortUserNotes: make(map[string]string), } // 初始化默认提示词和资源 @@ -444,15 +457,22 @@ func (s *Server) handleCallTool(msg *Message) *Message { } } - ctx, cancel := context.WithTimeout(context.Background(), 30*time.Minute) - defer cancel() + baseCtx, timeoutCancel := context.WithTimeout(context.Background(), 30*time.Minute) + defer timeoutCancel() + execCtx, runCancel := context.WithCancel(baseCtx) + s.registerRunningCancel(executionID, runCancel) + defer func() { + runCancel() + s.unregisterRunningCancel(executionID) + }() s.logger.Info("开始执行工具", zap.String("toolName", req.Name), zap.Any("arguments", req.Arguments), ) - result, err := handler(ctx, req.Arguments) + result, err := handler(execCtx, req.Arguments) + cancelledWithUserNote := s.applyAbortUserNoteToCancelledToolResult(executionID, &result, &err) now := time.Now() var failed bool var finalResult *ToolResult @@ -462,18 +482,26 @@ func (s *Server) handleCallTool(msg *Message) *Message { execution.Duration = now.Sub(execution.StartTime) if err != nil { - execution.Status = "failed" - execution.Error = err.Error() + st, msg := executionStatusAndMessage(err) + execution.Status = st + execution.Error = msg failed = true } else if result != nil && result.IsError { - execution.Status = "failed" - if len(result.Content) > 0 { - execution.Error = result.Content[0].Text + if cancelledWithUserNote { + execution.Status = "cancelled" + execution.Error = "" + execution.Result = result + failed = true } else { - execution.Error = "工具执行返回错误结果" + execution.Status = "failed" + if len(result.Content) > 0 { + execution.Error = result.Content[0].Text + } else { + execution.Error = "工具执行返回错误结果" + } + execution.Result = result + failed = true } - execution.Result = result - failed = true } else { execution.Status = "completed" if result == nil { @@ -510,9 +538,13 @@ func (s *Server) handleCallTool(msg *Message) *Message { zap.Error(err), ) + errText := fmt.Sprintf("工具执行失败: %v", err) + if errors.Is(err, context.Canceled) { + errText = "工具执行已手动终止(MCP 监控)。后续编排步骤可继续。" + } errorResult, _ := json.Marshal(CallToolResponse{ Content: []Content{ - {Type: "text", Text: fmt.Sprintf("工具执行失败: %v", err)}, + {Type: "text", Text: errText}, }, IsError: true, }) @@ -769,7 +801,15 @@ func (s *Server) CallTool(ctx context.Context, toolName string, args map[string] } } - result, err := handler(ctx, args) + execCtx, runCancel := context.WithCancel(ctx) + s.registerRunningCancel(executionID, runCancel) + defer func() { + runCancel() + s.unregisterRunningCancel(executionID) + }() + + result, err := handler(execCtx, args) + cancelledWithUserNote := s.applyAbortUserNoteToCancelledToolResult(executionID, &result, &err) s.mu.Lock() now := time.Now() @@ -779,19 +819,28 @@ func (s *Server) CallTool(ctx context.Context, toolName string, args map[string] var finalResult *ToolResult if err != nil { - execution.Status = "failed" - execution.Error = err.Error() + st, msg := executionStatusAndMessage(err) + execution.Status = st + execution.Error = msg failed = true } else if result != nil && result.IsError { - execution.Status = "failed" - if len(result.Content) > 0 { - execution.Error = result.Content[0].Text + if cancelledWithUserNote { + execution.Status = "cancelled" + execution.Error = "" + execution.Result = result + failed = true + finalResult = result } else { - execution.Error = "工具执行返回错误结果" + execution.Status = "failed" + if len(result.Content) > 0 { + execution.Error = result.Content[0].Text + } else { + execution.Error = "工具执行返回错误结果" + } + execution.Result = result + failed = true + finalResult = result } - execution.Result = result - failed = true - finalResult = result } else { execution.Status = "completed" if result == nil { @@ -869,6 +918,88 @@ func (s *Server) cleanupOldExecutions() { ) } +func (s *Server) registerRunningCancel(id string, cancel context.CancelFunc) { + s.runningCancelsMu.Lock() + s.runningCancels[id] = cancel + s.runningCancelsMu.Unlock() +} + +func (s *Server) unregisterRunningCancel(id string) { + s.runningCancelsMu.Lock() + delete(s.runningCancels, id) + s.runningCancelsMu.Unlock() +} + +func (s *Server) readAbortUserNote(id string) string { + s.runningCancelsMu.Lock() + defer s.runningCancelsMu.Unlock() + if s.abortUserNotes == nil { + return "" + } + return s.abortUserNotes[id] +} + +func (s *Server) takeAbortUserNote(id string) string { + s.runningCancelsMu.Lock() + defer s.runningCancelsMu.Unlock() + if s.abortUserNotes == nil { + return "" + } + n := s.abortUserNotes[id] + delete(s.abortUserNotes, id) + return n +} + +// applyAbortUserNoteToCancelledToolResult 监控页「终止并填写说明」时合并「工具已输出 + 用户说明」交给模型。 +// exec 等工具会把失败写在 *ToolResult 里并返回 err==nil,若仅在 err!=nil 时合并会漏掉说明,甚至误 clear 掉 note。 +func (s *Server) applyAbortUserNoteToCancelledToolResult(executionID string, result **ToolResult, err *error) (cancelledWithUserNote bool) { + note := strings.TrimSpace(s.readAbortUserNote(executionID)) + if note == "" { + return false + } + hasErr := err != nil && *err != nil + hasRes := result != nil && *result != nil + if !hasErr && !hasRes { + return false + } + _ = s.takeAbortUserNote(executionID) + partial := "" + if hasRes { + partial = ToolResultPlainText(*result) + } + if partial == "" && hasErr { + partial = (*err).Error() + } + merged := MergePartialToolOutputAndAbortNote(partial, note) + *err = nil + *result = &ToolResult{Content: []Content{{Type: "text", Text: merged}}, IsError: true} + return true +} + +// CancelToolExecutionWithNote 取消内部工具;note 非空时与工具已返回文本合并后交给上层模型。 +func (s *Server) CancelToolExecutionWithNote(id string, note string) bool { + s.runningCancelsMu.Lock() + cancel, ok := s.runningCancels[id] + if !ok || cancel == nil { + s.runningCancelsMu.Unlock() + return false + } + if strings.TrimSpace(note) != "" { + if s.abortUserNotes == nil { + s.abortUserNotes = make(map[string]string) + } + s.abortUserNotes[id] = strings.TrimSpace(note) + } + s.runningCancelsMu.Unlock() + cancel() + return true +} + +// CancelToolExecution 取消正在执行的内部工具调用(无用户说明)。 +func (s *Server) CancelToolExecution(id string) bool { + return s.CancelToolExecutionWithNote(id, "") +} + // initDefaultPrompts 初始化默认提示词模板 func (s *Server) initDefaultPrompts() { s.mu.Lock() diff --git a/internal/mcp/types.go b/internal/mcp/types.go index 393717b9..bc93bb72 100644 --- a/internal/mcp/types.go +++ b/internal/mcp/types.go @@ -4,6 +4,7 @@ import ( "context" "encoding/json" "fmt" + "strings" "time" ) @@ -192,7 +193,7 @@ type ToolExecution struct { ID string `json:"id"` ToolName string `json:"toolName"` Arguments map[string]interface{} `json:"arguments"` - Status string `json:"status"` // pending, running, completed, failed + Status string `json:"status"` // pending, running, completed, failed, cancelled Result *ToolResult `json:"result,omitempty"` Error string `json:"error,omitempty"` StartTime time.Time `json:"startTime"` @@ -293,3 +294,36 @@ type SamplingContent struct { Type string `json:"type"` Text string `json:"text,omitempty"` } + +// ToolResultPlainText 拼接工具结果中的文本(手动终止时作为「工具原始输出」)。 +func ToolResultPlainText(r *ToolResult) string { + if r == nil || len(r.Content) == 0 { + return "" + } + var b strings.Builder + for _, c := range r.Content { + b.WriteString(c.Text) + } + return strings.TrimSpace(b.String()) +} + +// AbortNoteBannerForModel 标出后续文本来自「用户手动终止工具时在弹窗中填写」,避免与 stdout/stderr 混淆。 +const AbortNoteBannerForModel = "---\n" + + "【用户终止说明|USER INTERRUPT NOTE】\n" + + "(以下由操作者填写,用于指示模型如何继续;不是工具原始输出。)\n" + + "(Written by the operator when stopping this tool; not raw tool output.)\n" + + "---" + +// MergePartialToolOutputAndAbortNote 格式:工具原始输出 + 醒目标题 + 用户终止说明(无说明则原样返回 partial)。 +func MergePartialToolOutputAndAbortNote(partial, userNote string) string { + partial = strings.TrimSpace(partial) + userNote = strings.TrimSpace(userNote) + if userNote == "" { + return partial + } + section := AbortNoteBannerForModel + "\n" + userNote + if partial == "" { + return section + } + return partial + "\n\n" + section +}