Add files via upload

This commit is contained in:
公明
2026-05-07 17:02:26 +08:00
committed by GitHub
parent 6d180c814d
commit b62dc1f326
3 changed files with 305 additions and 41 deletions
+117 -18
View File
@@ -32,6 +32,8 @@ type ExternalMCPManager struct {
refreshWg sync.WaitGroup // 等待后台刷新goroutine完成
refreshing atomic.Bool // 防止 refreshToolCounts 并发堆积
mu sync.RWMutex
runningCancels map[string]context.CancelFunc
abortUserNotes map[string]string
}
// NewExternalMCPManager 创建外部MCP管理器
@@ -42,16 +44,18 @@ func NewExternalMCPManager(logger *zap.Logger) *ExternalMCPManager {
// NewExternalMCPManagerWithStorage 创建外部MCP管理器(带持久化存储)
func NewExternalMCPManagerWithStorage(logger *zap.Logger, storage MonitorStorage) *ExternalMCPManager {
manager := &ExternalMCPManager{
clients: make(map[string]ExternalMCPClient),
configs: make(map[string]config.ExternalMCPServerConfig),
logger: logger,
storage: storage,
executions: make(map[string]*ToolExecution),
stats: make(map[string]*ToolStats),
errors: make(map[string]string),
toolCounts: make(map[string]int),
toolCache: make(map[string][]Tool),
stopRefresh: make(chan struct{}),
clients: make(map[string]ExternalMCPClient),
configs: make(map[string]config.ExternalMCPServerConfig),
logger: logger,
storage: storage,
executions: make(map[string]*ToolExecution),
stats: make(map[string]*ToolStats),
errors: make(map[string]string),
toolCounts: make(map[string]int),
toolCache: make(map[string][]Tool),
stopRefresh: make(chan struct{}),
runningCancels: make(map[string]context.CancelFunc),
abortUserNotes: make(map[string]string),
}
// 启动后台刷新工具数量的goroutine
manager.startToolCountRefresh()
@@ -452,8 +456,16 @@ func (m *ExternalMCPManager) CallTool(ctx context.Context, toolName string, args
}
}
execCtx, runCancel := context.WithCancel(ctx)
m.registerRunningCancel(executionID, runCancel)
defer func() {
runCancel()
m.unregisterRunningCancel(executionID)
}()
// 调用工具
result, err := client.CallTool(ctx, actualToolName, args)
result, err := client.CallTool(execCtx, actualToolName, args)
cancelledWithUserNote := m.applyAbortUserNoteToCancelledToolResult(executionID, &result, &err)
// 更新执行记录
m.mu.Lock()
@@ -462,16 +474,23 @@ func (m *ExternalMCPManager) CallTool(ctx context.Context, toolName string, args
execution.Duration = now.Sub(execution.StartTime)
if err != nil {
execution.Status = "failed"
execution.Error = err.Error()
st, msg := executionStatusAndMessage(err)
execution.Status = st
execution.Error = msg
} else if result != nil && result.IsError {
execution.Status = "failed"
if len(result.Content) > 0 {
execution.Error = result.Content[0].Text
if cancelledWithUserNote {
execution.Status = "cancelled"
execution.Error = ""
execution.Result = result
} else {
execution.Error = "工具执行返回错误结果"
execution.Status = "failed"
if len(result.Content) > 0 {
execution.Error = result.Content[0].Text
} else {
execution.Error = "工具执行返回错误结果"
}
execution.Result = result
}
execution.Result = result
} else {
execution.Status = "completed"
if result == nil {
@@ -509,6 +528,50 @@ func (m *ExternalMCPManager) CallTool(ctx context.Context, toolName string, args
return result, executionID, nil
}
func (m *ExternalMCPManager) applyAbortUserNoteToCancelledToolResult(executionID string, result **ToolResult, err *error) (cancelledWithUserNote bool) {
note := strings.TrimSpace(m.readAbortUserNote(executionID))
if note == "" {
return false
}
hasErr := err != nil && *err != nil
hasRes := result != nil && *result != nil
if !hasErr && !hasRes {
return false
}
_ = m.takeAbortUserNote(executionID)
partial := ""
if hasRes {
partial = ToolResultPlainText(*result)
}
if partial == "" && hasErr {
partial = (*err).Error()
}
merged := MergePartialToolOutputAndAbortNote(partial, note)
*err = nil
*result = &ToolResult{Content: []Content{{Type: "text", Text: merged}}, IsError: true}
return true
}
func (m *ExternalMCPManager) readAbortUserNote(id string) string {
m.mu.Lock()
defer m.mu.Unlock()
if m.abortUserNotes == nil {
return ""
}
return m.abortUserNotes[id]
}
func (m *ExternalMCPManager) takeAbortUserNote(id string) string {
m.mu.Lock()
defer m.mu.Unlock()
if m.abortUserNotes == nil {
return ""
}
n := m.abortUserNotes[id]
delete(m.abortUserNotes, id)
return n
}
// cleanupOldExecutions 清理旧的执行记录(保持内存中的记录数量在限制内)
func (m *ExternalMCPManager) cleanupOldExecutions() {
const maxExecutionsInMemory = 1000
@@ -562,6 +625,42 @@ func (m *ExternalMCPManager) GetExecution(id string) (*ToolExecution, bool) {
return nil, false
}
func (m *ExternalMCPManager) registerRunningCancel(id string, cancel context.CancelFunc) {
m.mu.Lock()
m.runningCancels[id] = cancel
m.mu.Unlock()
}
func (m *ExternalMCPManager) unregisterRunningCancel(id string) {
m.mu.Lock()
delete(m.runningCancels, id)
m.mu.Unlock()
}
// CancelToolExecutionWithNote 取消外部 MCP 工具;note 非空时与已返回输出合并后交给模型。
func (m *ExternalMCPManager) CancelToolExecutionWithNote(id string, note string) bool {
m.mu.Lock()
cancel, ok := m.runningCancels[id]
if !ok || cancel == nil {
m.mu.Unlock()
return false
}
if strings.TrimSpace(note) != "" {
if m.abortUserNotes == nil {
m.abortUserNotes = make(map[string]string)
}
m.abortUserNotes[id] = strings.TrimSpace(note)
}
m.mu.Unlock()
cancel()
return true
}
// CancelToolExecution 取消正在执行的外部 MCP 工具(无用户说明)。
func (m *ExternalMCPManager) CancelToolExecution(id string) bool {
return m.CancelToolExecutionWithNote(id, "")
}
// updateStats 更新统计信息
func (m *ExternalMCPManager) updateStats(toolName string, failed bool) {
now := time.Now()
+153 -22
View File
@@ -4,6 +4,7 @@ import (
"bufio"
"context"
"encoding/json"
"errors"
"fmt"
"io"
"net/http"
@@ -40,6 +41,9 @@ type Server struct {
logger *zap.Logger
maxExecutionsInMemory int // 内存中最大执行记录数
sseClients map[string]*sseClient
runningCancels map[string]context.CancelFunc
runningCancelsMu sync.Mutex
abortUserNotes map[string]string // 监控页终止时附带的用户说明,与 executionID 对应
}
type sseClient struct {
@@ -50,6 +54,13 @@ type sseClient struct {
// ToolHandler 工具处理函数
type ToolHandler func(ctx context.Context, args map[string]interface{}) (*ToolResult, error)
func executionStatusAndMessage(err error) (status string, errMsg string) {
if errors.Is(err, context.Canceled) {
return "cancelled", "已手动终止(MCP 监控)"
}
return "failed", err.Error()
}
// NewServer 创建新的MCP服务器
func NewServer(logger *zap.Logger) *Server {
return NewServerWithStorage(logger, nil)
@@ -68,6 +79,8 @@ func NewServerWithStorage(logger *zap.Logger, storage MonitorStorage) *Server {
logger: logger,
maxExecutionsInMemory: 1000, // 默认最多在内存中保留1000条执行记录
sseClients: make(map[string]*sseClient),
runningCancels: make(map[string]context.CancelFunc),
abortUserNotes: make(map[string]string),
}
// 初始化默认提示词和资源
@@ -444,15 +457,22 @@ func (s *Server) handleCallTool(msg *Message) *Message {
}
}
ctx, cancel := context.WithTimeout(context.Background(), 30*time.Minute)
defer cancel()
baseCtx, timeoutCancel := context.WithTimeout(context.Background(), 30*time.Minute)
defer timeoutCancel()
execCtx, runCancel := context.WithCancel(baseCtx)
s.registerRunningCancel(executionID, runCancel)
defer func() {
runCancel()
s.unregisterRunningCancel(executionID)
}()
s.logger.Info("开始执行工具",
zap.String("toolName", req.Name),
zap.Any("arguments", req.Arguments),
)
result, err := handler(ctx, req.Arguments)
result, err := handler(execCtx, req.Arguments)
cancelledWithUserNote := s.applyAbortUserNoteToCancelledToolResult(executionID, &result, &err)
now := time.Now()
var failed bool
var finalResult *ToolResult
@@ -462,18 +482,26 @@ func (s *Server) handleCallTool(msg *Message) *Message {
execution.Duration = now.Sub(execution.StartTime)
if err != nil {
execution.Status = "failed"
execution.Error = err.Error()
st, msg := executionStatusAndMessage(err)
execution.Status = st
execution.Error = msg
failed = true
} else if result != nil && result.IsError {
execution.Status = "failed"
if len(result.Content) > 0 {
execution.Error = result.Content[0].Text
if cancelledWithUserNote {
execution.Status = "cancelled"
execution.Error = ""
execution.Result = result
failed = true
} else {
execution.Error = "工具执行返回错误结果"
execution.Status = "failed"
if len(result.Content) > 0 {
execution.Error = result.Content[0].Text
} else {
execution.Error = "工具执行返回错误结果"
}
execution.Result = result
failed = true
}
execution.Result = result
failed = true
} else {
execution.Status = "completed"
if result == nil {
@@ -510,9 +538,13 @@ func (s *Server) handleCallTool(msg *Message) *Message {
zap.Error(err),
)
errText := fmt.Sprintf("工具执行失败: %v", err)
if errors.Is(err, context.Canceled) {
errText = "工具执行已手动终止(MCP 监控)。后续编排步骤可继续。"
}
errorResult, _ := json.Marshal(CallToolResponse{
Content: []Content{
{Type: "text", Text: fmt.Sprintf("工具执行失败: %v", err)},
{Type: "text", Text: errText},
},
IsError: true,
})
@@ -769,7 +801,15 @@ func (s *Server) CallTool(ctx context.Context, toolName string, args map[string]
}
}
result, err := handler(ctx, args)
execCtx, runCancel := context.WithCancel(ctx)
s.registerRunningCancel(executionID, runCancel)
defer func() {
runCancel()
s.unregisterRunningCancel(executionID)
}()
result, err := handler(execCtx, args)
cancelledWithUserNote := s.applyAbortUserNoteToCancelledToolResult(executionID, &result, &err)
s.mu.Lock()
now := time.Now()
@@ -779,19 +819,28 @@ func (s *Server) CallTool(ctx context.Context, toolName string, args map[string]
var finalResult *ToolResult
if err != nil {
execution.Status = "failed"
execution.Error = err.Error()
st, msg := executionStatusAndMessage(err)
execution.Status = st
execution.Error = msg
failed = true
} else if result != nil && result.IsError {
execution.Status = "failed"
if len(result.Content) > 0 {
execution.Error = result.Content[0].Text
if cancelledWithUserNote {
execution.Status = "cancelled"
execution.Error = ""
execution.Result = result
failed = true
finalResult = result
} else {
execution.Error = "工具执行返回错误结果"
execution.Status = "failed"
if len(result.Content) > 0 {
execution.Error = result.Content[0].Text
} else {
execution.Error = "工具执行返回错误结果"
}
execution.Result = result
failed = true
finalResult = result
}
execution.Result = result
failed = true
finalResult = result
} else {
execution.Status = "completed"
if result == nil {
@@ -869,6 +918,88 @@ func (s *Server) cleanupOldExecutions() {
)
}
func (s *Server) registerRunningCancel(id string, cancel context.CancelFunc) {
s.runningCancelsMu.Lock()
s.runningCancels[id] = cancel
s.runningCancelsMu.Unlock()
}
func (s *Server) unregisterRunningCancel(id string) {
s.runningCancelsMu.Lock()
delete(s.runningCancels, id)
s.runningCancelsMu.Unlock()
}
func (s *Server) readAbortUserNote(id string) string {
s.runningCancelsMu.Lock()
defer s.runningCancelsMu.Unlock()
if s.abortUserNotes == nil {
return ""
}
return s.abortUserNotes[id]
}
func (s *Server) takeAbortUserNote(id string) string {
s.runningCancelsMu.Lock()
defer s.runningCancelsMu.Unlock()
if s.abortUserNotes == nil {
return ""
}
n := s.abortUserNotes[id]
delete(s.abortUserNotes, id)
return n
}
// applyAbortUserNoteToCancelledToolResult 监控页「终止并填写说明」时合并「工具已输出 + 用户说明」交给模型。
// exec 等工具会把失败写在 *ToolResult 里并返回 err==nil,若仅在 err!=nil 时合并会漏掉说明,甚至误 clear 掉 note。
func (s *Server) applyAbortUserNoteToCancelledToolResult(executionID string, result **ToolResult, err *error) (cancelledWithUserNote bool) {
note := strings.TrimSpace(s.readAbortUserNote(executionID))
if note == "" {
return false
}
hasErr := err != nil && *err != nil
hasRes := result != nil && *result != nil
if !hasErr && !hasRes {
return false
}
_ = s.takeAbortUserNote(executionID)
partial := ""
if hasRes {
partial = ToolResultPlainText(*result)
}
if partial == "" && hasErr {
partial = (*err).Error()
}
merged := MergePartialToolOutputAndAbortNote(partial, note)
*err = nil
*result = &ToolResult{Content: []Content{{Type: "text", Text: merged}}, IsError: true}
return true
}
// CancelToolExecutionWithNote 取消内部工具;note 非空时与工具已返回文本合并后交给上层模型。
func (s *Server) CancelToolExecutionWithNote(id string, note string) bool {
s.runningCancelsMu.Lock()
cancel, ok := s.runningCancels[id]
if !ok || cancel == nil {
s.runningCancelsMu.Unlock()
return false
}
if strings.TrimSpace(note) != "" {
if s.abortUserNotes == nil {
s.abortUserNotes = make(map[string]string)
}
s.abortUserNotes[id] = strings.TrimSpace(note)
}
s.runningCancelsMu.Unlock()
cancel()
return true
}
// CancelToolExecution 取消正在执行的内部工具调用(无用户说明)。
func (s *Server) CancelToolExecution(id string) bool {
return s.CancelToolExecutionWithNote(id, "")
}
// initDefaultPrompts 初始化默认提示词模板
func (s *Server) initDefaultPrompts() {
s.mu.Lock()
+35 -1
View File
@@ -4,6 +4,7 @@ import (
"context"
"encoding/json"
"fmt"
"strings"
"time"
)
@@ -192,7 +193,7 @@ type ToolExecution struct {
ID string `json:"id"`
ToolName string `json:"toolName"`
Arguments map[string]interface{} `json:"arguments"`
Status string `json:"status"` // pending, running, completed, failed
Status string `json:"status"` // pending, running, completed, failed, cancelled
Result *ToolResult `json:"result,omitempty"`
Error string `json:"error,omitempty"`
StartTime time.Time `json:"startTime"`
@@ -293,3 +294,36 @@ type SamplingContent struct {
Type string `json:"type"`
Text string `json:"text,omitempty"`
}
// ToolResultPlainText 拼接工具结果中的文本(手动终止时作为「工具原始输出」)。
func ToolResultPlainText(r *ToolResult) string {
if r == nil || len(r.Content) == 0 {
return ""
}
var b strings.Builder
for _, c := range r.Content {
b.WriteString(c.Text)
}
return strings.TrimSpace(b.String())
}
// AbortNoteBannerForModel 标出后续文本来自「用户手动终止工具时在弹窗中填写」,避免与 stdout/stderr 混淆。
const AbortNoteBannerForModel = "---\n" +
"【用户终止说明|USER INTERRUPT NOTE】\n" +
"(以下由操作者填写,用于指示模型如何继续;不是工具原始输出。)\n" +
"Written by the operator when stopping this tool; not raw tool output.\n" +
"---"
// MergePartialToolOutputAndAbortNote 格式:工具原始输出 + 醒目标题 + 用户终止说明(无说明则原样返回 partial)。
func MergePartialToolOutputAndAbortNote(partial, userNote string) string {
partial = strings.TrimSpace(partial)
userNote = strings.TrimSpace(userNote)
if userNote == "" {
return partial
}
section := AbortNoteBannerForModel + "\n" + userNote
if partial == "" {
return section
}
return partial + "\n\n" + section
}