From 60e37953224c32d586157ab41b514ad5c22ca02e Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E5=85=AC=E6=98=8E?= <83812544+Ed1s0nZ@users.noreply.github.com> Date: Fri, 9 Jan 2026 19:02:16 +0800 Subject: [PATCH] Add files via upload --- internal/mcp/external_manager.go | 227 ++++++++++++++++++++++++------- 1 file changed, 180 insertions(+), 47 deletions(-) diff --git a/internal/mcp/external_manager.go b/internal/mcp/external_manager.go index b59745dc..8ef1335d 100644 --- a/internal/mcp/external_manager.go +++ b/internal/mcp/external_manager.go @@ -16,14 +16,18 @@ import ( // ExternalMCPManager 外部MCP管理器 type ExternalMCPManager struct { - clients map[string]ExternalMCPClient - configs map[string]config.ExternalMCPServerConfig - logger *zap.Logger - storage MonitorStorage // 可选的持久化存储 - executions map[string]*ToolExecution // 执行记录 - stats map[string]*ToolStats // 工具统计信息 - errors map[string]string // 错误信息 - mu sync.RWMutex + clients map[string]ExternalMCPClient + configs map[string]config.ExternalMCPServerConfig + logger *zap.Logger + storage MonitorStorage // 可选的持久化存储 + executions map[string]*ToolExecution // 执行记录 + stats map[string]*ToolStats // 工具统计信息 + errors map[string]string // 错误信息 + toolCounts map[string]int // 工具数量缓存 + toolCountsMu sync.RWMutex // 工具数量缓存的锁 + stopRefresh chan struct{} // 停止后台刷新的信号 + refreshWg sync.WaitGroup // 等待后台刷新goroutine完成 + mu sync.RWMutex } // NewExternalMCPManager 创建外部MCP管理器 @@ -33,15 +37,20 @@ func NewExternalMCPManager(logger *zap.Logger) *ExternalMCPManager { // NewExternalMCPManagerWithStorage 创建外部MCP管理器(带持久化存储) func NewExternalMCPManagerWithStorage(logger *zap.Logger, storage MonitorStorage) *ExternalMCPManager { - return &ExternalMCPManager{ - clients: make(map[string]ExternalMCPClient), - configs: make(map[string]config.ExternalMCPServerConfig), - logger: logger, - storage: storage, - executions: make(map[string]*ToolExecution), - stats: make(map[string]*ToolStats), - errors: make(map[string]string), + manager := &ExternalMCPManager{ + clients: make(map[string]ExternalMCPClient), + configs: make(map[string]config.ExternalMCPServerConfig), + logger: logger, + storage: storage, + executions: make(map[string]*ToolExecution), + stats: make(map[string]*ToolStats), + errors: make(map[string]string), + toolCounts: make(map[string]int), + stopRefresh: make(chan struct{}), } + // 启动后台刷新工具数量的goroutine + manager.startToolCountRefresh() + return manager } // LoadConfigs 加载配置 @@ -104,6 +113,12 @@ func (m *ExternalMCPManager) RemoveConfig(name string) error { } delete(m.configs, name) + + // 清理工具数量缓存 + m.toolCountsMu.Lock() + delete(m.toolCounts, name) + m.toolCountsMu.Unlock() + return nil } @@ -174,11 +189,15 @@ func (m *ExternalMCPManager) StartClient(name string) error { m.mu.Lock() m.errors[name] = err.Error() m.mu.Unlock() + // 触发工具数量刷新(连接失败,工具数量应为0) + m.triggerToolCountRefresh() } else { // 连接成功,清除错误信息 m.mu.Lock() delete(m.errors, name) m.mu.Unlock() + // 连接成功,立即刷新工具数量 + m.triggerToolCountRefresh() } }() @@ -204,6 +223,11 @@ func (m *ExternalMCPManager) StopClient(name string) error { // 清除错误信息 delete(m.errors, name) + // 更新工具数量缓存(停止后工具数量为0) + m.toolCountsMu.Lock() + m.toolCounts[name] = 0 + m.toolCountsMu.Unlock() + // 更新配置为禁用 serverCfg.ExternalMCPEnable = false m.configs[name] = serverCfg @@ -532,31 +556,50 @@ func (m *ExternalMCPManager) GetToolStats() map[string]*ToolStats { return result } -// GetToolCount 获取指定外部MCP的工具数量 +// GetToolCount 获取指定外部MCP的工具数量(从缓存读取,不阻塞) func (m *ExternalMCPManager) GetToolCount(name string) (int, error) { + // 先从缓存读取 + m.toolCountsMu.RLock() + if count, exists := m.toolCounts[name]; exists { + m.toolCountsMu.RUnlock() + return count, nil + } + m.toolCountsMu.RUnlock() + + // 如果缓存中没有,检查客户端状态 client, exists := m.GetClient(name) if !exists { return 0, fmt.Errorf("客户端不存在: %s", name) } if !client.IsConnected() { + // 未连接,缓存为0 + m.toolCountsMu.Lock() + m.toolCounts[name] = 0 + m.toolCountsMu.Unlock() return 0, nil } - // 增加超时时间到30秒,因为通过代理连接远程服务器可能需要更长时间 - ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second) - defer cancel() - - tools, err := client.ListTools(ctx) - if err != nil { - return 0, fmt.Errorf("获取工具列表失败: %w", err) - } - - return len(tools), nil + // 如果已连接但缓存中没有,触发异步刷新并返回0(避免阻塞) + m.triggerToolCountRefresh() + return 0, nil } -// GetToolCounts 获取所有外部MCP的工具数量 +// GetToolCounts 获取所有外部MCP的工具数量(从缓存读取,不阻塞) func (m *ExternalMCPManager) GetToolCounts() map[string]int { + m.toolCountsMu.RLock() + defer m.toolCountsMu.RUnlock() + + // 返回缓存的副本,避免外部修改 + result := make(map[string]int) + for k, v := range m.toolCounts { + result[k] = v + } + return result +} + +// refreshToolCounts 刷新工具数量缓存(后台异步执行) +func (m *ExternalMCPManager) refreshToolCounts() { m.mu.RLock() clients := make(map[string]ExternalMCPClient) for k, v := range m.clients { @@ -564,31 +607,104 @@ func (m *ExternalMCPManager) GetToolCounts() map[string]int { } m.mu.RUnlock() - result := make(map[string]int) + newCounts := make(map[string]int) + + // 使用goroutine并发获取每个客户端的工具数量,避免串行阻塞 + type countResult struct { + name string + count int + } + resultChan := make(chan countResult, len(clients)) + for name, client := range clients { - if !client.IsConnected() { - result[name] = 0 - continue - } + go func(n string, c ExternalMCPClient) { + if !c.IsConnected() { + resultChan <- countResult{name: n, count: 0} + return + } - // 增加超时时间到30秒,因为通过代理连接远程服务器可能需要更长时间 - ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second) - tools, err := client.ListTools(ctx) - cancel() + // 使用合理的超时时间(15秒),既能应对网络延迟,又不会过长阻塞 + // 由于这是后台异步刷新,超时不会影响前端响应 + ctx, cancel := context.WithTimeout(context.Background(), 15*time.Second) + tools, err := c.ListTools(ctx) + cancel() - if err != nil { - m.logger.Warn("获取外部MCP工具数量失败", - zap.String("name", name), - zap.Error(err), - ) - result[name] = 0 - continue - } + if err != nil { + m.logger.Debug("获取外部MCP工具数量失败", + zap.String("name", n), + zap.Error(err), + ) + // 如果获取失败,保留旧值(在更新时处理) + resultChan <- countResult{name: n, count: -1} // -1 表示使用旧值 + return + } - result[name] = len(tools) + resultChan <- countResult{name: n, count: len(tools)} + }(name, client) } - return result + // 收集结果 + m.toolCountsMu.RLock() + oldCounts := make(map[string]int) + for k, v := range m.toolCounts { + oldCounts[k] = v + } + m.toolCountsMu.RUnlock() + + for i := 0; i < len(clients); i++ { + result := <-resultChan + if result.count >= 0 { + newCounts[result.name] = result.count + } else { + // 获取失败,保留旧值 + if oldCount, exists := oldCounts[result.name]; exists { + newCounts[result.name] = oldCount + } else { + newCounts[result.name] = 0 + } + } + } + + // 更新缓存 + m.toolCountsMu.Lock() + // 更新所有获取到的值 + for name, count := range newCounts { + m.toolCounts[name] = count + } + // 对于未连接的客户端,设置为0 + for name, client := range clients { + if !client.IsConnected() { + m.toolCounts[name] = 0 + } + } + m.toolCountsMu.Unlock() +} + +// startToolCountRefresh 启动后台刷新工具数量的goroutine +func (m *ExternalMCPManager) startToolCountRefresh() { + m.refreshWg.Add(1) + go func() { + defer m.refreshWg.Done() + ticker := time.NewTicker(10 * time.Second) // 每10秒刷新一次 + defer ticker.Stop() + + // 立即执行一次刷新 + m.refreshToolCounts() + + for { + select { + case <-ticker.C: + m.refreshToolCounts() + case <-m.stopRefresh: + return + } + } + }() +} + +// triggerToolCountRefresh 触发立即刷新工具数量(异步) +func (m *ExternalMCPManager) triggerToolCountRefresh() { + go m.refreshToolCounts() } // createClient 创建客户端(不连接) @@ -703,6 +819,9 @@ func (m *ExternalMCPManager) connectClient(name string, serverCfg config.Externa zap.String("name", name), ) + // 连接成功,触发工具数量刷新 + m.triggerToolCountRefresh() + return nil } @@ -801,4 +920,18 @@ func (m *ExternalMCPManager) StopAll() { client.Close() delete(m.clients, name) } + + // 清理所有工具数量缓存 + m.toolCountsMu.Lock() + m.toolCounts = make(map[string]int) + m.toolCountsMu.Unlock() + + // 停止后台刷新(使用 select 避免重复关闭 channel) + select { + case <-m.stopRefresh: + // 已经关闭,不需要再次关闭 + default: + close(m.stopRefresh) + m.refreshWg.Wait() + } }