From 7f4e8d2ad237a89f420aa73222738a4a2c8ec95d Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E5=85=AC=E6=98=8E?= <83812544+Ed1s0nZ@users.noreply.github.com> Date: Fri, 12 Jun 2026 19:41:47 +0800 Subject: [PATCH] Add files via upload --- internal/mcp/external_manager.go | 218 +++++++++++++++++++++++-------- 1 file changed, 162 insertions(+), 56 deletions(-) diff --git a/internal/mcp/external_manager.go b/internal/mcp/external_manager.go index 036f243a..470e9715 100644 --- a/internal/mcp/external_manager.go +++ b/internal/mcp/external_manager.go @@ -15,6 +15,26 @@ import ( "go.uber.org/zap" ) +const ( + // externalToolListCacheTTL 已连接外部 MCP 的工具列表缓存有效期,避免每次 API 请求都打远程 ListTools。 + externalToolListCacheTTL = 60 * time.Second + // externalToolCountRefreshInterval 后台刷新工具数量的间隔(仅刷新缓存过期或缺失的客户端)。 + externalToolCountRefreshInterval = 60 * time.Second +) + +// toolListCacheEntry 外部 MCP 工具列表缓存条目 +type toolListCacheEntry struct { + tools []Tool + updatedAt time.Time +} + +// listToolsInflight 合并同一 MCP 上并发的 ListTools 请求 +type listToolsInflight struct { + done chan struct{} + tools []Tool + err error +} + // ExternalMCPManager 外部MCP管理器 type ExternalMCPManager struct { clients map[string]ExternalMCPClient @@ -26,8 +46,10 @@ type ExternalMCPManager struct { errors map[string]string // 错误信息 toolCounts map[string]int // 工具数量缓存 toolCountsMu sync.RWMutex // 工具数量缓存的锁 - toolCache map[string][]Tool // 工具列表缓存:MCP名称 -> 工具列表 + toolCache map[string]toolListCacheEntry // 工具列表缓存:MCP名称 -> 工具列表 toolCacheMu sync.RWMutex // 工具列表缓存的锁 + listToolsMu sync.Mutex + listToolsInflight map[string]*listToolsInflight stopRefresh chan struct{} // 停止后台刷新的信号 refreshWg sync.WaitGroup // 等待后台刷新goroutine完成 refreshing atomic.Bool // 防止 refreshToolCounts 并发堆积 @@ -51,9 +73,10 @@ func NewExternalMCPManagerWithStorage(logger *zap.Logger, storage MonitorStorage executions: make(map[string]*ToolExecution), stats: make(map[string]*ToolStats), errors: make(map[string]string), - toolCounts: make(map[string]int), - toolCache: make(map[string][]Tool), - stopRefresh: make(chan struct{}), + toolCounts: make(map[string]int), + toolCache: make(map[string]toolListCacheEntry), + listToolsInflight: make(map[string]*listToolsInflight), + stopRefresh: make(chan struct{}), runningCancels: make(map[string]context.CancelFunc), abortUserNotes: make(map[string]string), } @@ -210,15 +233,8 @@ func (m *ExternalMCPManager) StartClient(name string) error { m.mu.Lock() delete(m.errors, name) m.mu.Unlock() - // 立即刷新工具数量和工具列表缓存 - m.triggerToolCountRefresh() - m.refreshToolCache(name, client) - // 2 秒后再刷新一次,覆盖 SSE/Streamable 等需稍等就绪的远端 - go func() { - time.Sleep(2 * time.Second) - m.triggerToolCountRefresh() - m.refreshToolCache(name, client) - }() + // 异步拉取工具列表(singleflight 去重,结果同时写入 toolCache 与 toolCounts) + go m.refreshToolCache(name, client) } }() @@ -249,6 +265,10 @@ func (m *ExternalMCPManager) StopClient(name string) error { m.toolCounts[name] = 0 m.toolCountsMu.Unlock() + m.toolCacheMu.Lock() + delete(m.toolCache, name) + m.toolCacheMu.Unlock() + // 更新配置为禁用 serverCfg.ExternalMCPEnable = false m.configs[name] = serverCfg @@ -335,16 +355,19 @@ func (m *ExternalMCPManager) getToolsForClient(name string, client ExternalMCPCl return nil, fmt.Errorf("外部MCP连接失败: %s", name) } - // 已连接:尝试获取最新工具列表 + // 已连接:缓存优先,仅在缺失或过期时打远程 ListTools if client.IsConnected() { - tools, err := client.ListTools(ctx) + if tools, ok := m.getFreshCachedTools(name); ok { + return tools, nil + } + if tools, ok := m.getAnyCachedTools(name); ok { + m.triggerToolListRefresh(name, client) + return tools, nil + } + tools, err := m.listToolsDeduped(ctx, name, client) if err != nil { - // 获取失败,尝试使用缓存 return m.getCachedTools(name, "连接正常但获取失败", err) } - - // 获取成功,更新缓存 - m.updateToolCache(name, tools) return tools, nil } @@ -361,37 +384,126 @@ func (m *ExternalMCPManager) getToolsForClient(name string, client ExternalMCPCl return nil, fmt.Errorf("外部MCP状态未知: %s (状态: %s)", name, status) } -// getCachedTools 获取缓存的工具列表 +// getCachedTools 获取缓存的工具列表(含空列表缓存) func (m *ExternalMCPManager) getCachedTools(name, reason string, originalErr error) ([]Tool, error) { - m.toolCacheMu.RLock() - cachedTools, hasCache := m.toolCache[name] - m.toolCacheMu.RUnlock() - - if hasCache && len(cachedTools) > 0 { + if tools, ok := m.getAnyCachedTools(name); ok { m.logger.Debug("使用缓存的工具列表", zap.String("name", name), zap.String("reason", reason), - zap.Int("count", len(cachedTools)), + zap.Int("count", len(tools)), zap.Error(originalErr), ) - return cachedTools, nil + return tools, nil } - // 无缓存,返回错误 if originalErr != nil { return nil, fmt.Errorf("获取外部MCP工具失败且无缓存: %w", originalErr) } return nil, fmt.Errorf("外部MCP无缓存工具: %s", name) } -// updateToolCache 更新工具列表缓存 -func (m *ExternalMCPManager) updateToolCache(name string, tools []Tool) { +func (m *ExternalMCPManager) isToolCacheFresh(updatedAt time.Time) bool { + return !updatedAt.IsZero() && time.Since(updatedAt) < externalToolListCacheTTL +} + +func cloneTools(tools []Tool) []Tool { + if len(tools) == 0 { + return nil + } + out := make([]Tool, len(tools)) + copy(out, tools) + return out +} + +func (m *ExternalMCPManager) getFreshCachedTools(name string) ([]Tool, bool) { + m.toolCacheMu.RLock() + entry, ok := m.toolCache[name] + m.toolCacheMu.RUnlock() + if !ok || !m.isToolCacheFresh(entry.updatedAt) { + return nil, false + } + return cloneTools(entry.tools), true +} + +func (m *ExternalMCPManager) getAnyCachedTools(name string) ([]Tool, bool) { + m.toolCacheMu.RLock() + entry, ok := m.toolCache[name] + m.toolCacheMu.RUnlock() + if !ok { + return nil, false + } + return cloneTools(entry.tools), true +} + +// listToolsDeduped 对同一 MCP 合并并发 ListTools,并更新 toolCache / toolCounts。 +func (m *ExternalMCPManager) listToolsDeduped(ctx context.Context, name string, client ExternalMCPClient) ([]Tool, error) { + m.listToolsMu.Lock() + if inflight, exists := m.listToolsInflight[name]; exists { + m.listToolsMu.Unlock() + select { + case <-inflight.done: + if inflight.err != nil { + return nil, inflight.err + } + return cloneTools(inflight.tools), nil + case <-ctx.Done(): + return nil, ctx.Err() + } + } + inflight := &listToolsInflight{done: make(chan struct{})} + m.listToolsInflight[name] = inflight + m.listToolsMu.Unlock() + + inflight.tools, inflight.err = client.ListTools(ctx) + if inflight.err == nil { + m.updateToolCache(name, inflight.tools) + } + + m.listToolsMu.Lock() + delete(m.listToolsInflight, name) + close(inflight.done) + m.listToolsMu.Unlock() + + if inflight.err != nil { + return nil, inflight.err + } + return cloneTools(inflight.tools), nil +} + +// InvalidateToolCache 清除指定外部 MCP 的工具列表缓存(手动刷新时使用) +func (m *ExternalMCPManager) InvalidateToolCache(name string) { m.toolCacheMu.Lock() - m.toolCache[name] = tools + delete(m.toolCache, name) + m.toolCacheMu.Unlock() +} + +// InvalidateAllToolCaches 清除所有外部 MCP 工具列表缓存 +func (m *ExternalMCPManager) InvalidateAllToolCaches() { + m.toolCacheMu.Lock() + m.toolCache = make(map[string]toolListCacheEntry) + m.toolCacheMu.Unlock() +} + +func (m *ExternalMCPManager) triggerToolListRefresh(name string, client ExternalMCPClient) { + go func() { + ctx, cancel := context.WithTimeout(context.Background(), 15*time.Second) + defer cancel() + _, _ = m.listToolsDeduped(ctx, name, client) + }() +} + +// updateToolCache 更新工具列表缓存与工具数量 +func (m *ExternalMCPManager) updateToolCache(name string, tools []Tool) { + stored := cloneTools(tools) + m.toolCacheMu.Lock() + m.toolCache[name] = toolListCacheEntry{tools: stored, updatedAt: time.Now()} m.toolCacheMu.Unlock() - // 如果返回空列表,记录警告 - if len(tools) == 0 { + m.toolCountsMu.Lock() + m.toolCounts[name] = len(stored) + m.toolCountsMu.Unlock() + + if len(stored) == 0 { m.logger.Warn("外部MCP返回空工具列表", zap.String("name", name), zap.String("hint", "服务可能暂时不可用,工具列表为空"), @@ -399,7 +511,7 @@ func (m *ExternalMCPManager) updateToolCache(name string, tools []Tool) { } else { m.logger.Debug("工具列表缓存已更新", zap.String("name", name), - zap.Int("count", len(tools)), + zap.Int("count", len(stored)), ) } } @@ -854,15 +966,21 @@ func (m *ExternalMCPManager) refreshToolCounts() { return } - // 使用合理的超时时间(15秒),既能应对网络延迟,又不会过长阻塞 - // 由于这是后台异步刷新,超时不会影响前端响应 + // 缓存仍新鲜时直接复用,避免与 GetAllTools 重复打远程 + if _, fresh := m.getFreshCachedTools(n); fresh { + m.toolCountsMu.RLock() + count := m.toolCounts[n] + m.toolCountsMu.RUnlock() + resultChan <- countResult{name: n, count: count} + return + } + ctx, cancel := context.WithTimeout(context.Background(), 15*time.Second) - tools, err := c.ListTools(ctx) + tools, err := m.listToolsDeduped(ctx, n, c) cancel() if err != nil { errStr := err.Error() - // SSE 连接 EOF:远端可能关闭了流或未按规范在流上推送响应,仅首次用 Warn 提示 if strings.Contains(errStr, "EOF") || strings.Contains(errStr, "client is closing") { m.logger.Warn("获取外部MCP工具数量失败(SSE 流已关闭或服务端未在流上返回 tools/list 响应)", zap.String("name", n), @@ -875,7 +993,7 @@ func (m *ExternalMCPManager) refreshToolCounts() { zap.Error(err), ) } - resultChan <- countResult{name: n, count: -1} // -1 表示使用旧值 + resultChan <- countResult{name: n, count: -1} return } @@ -925,33 +1043,21 @@ func (m *ExternalMCPManager) refreshToolCache(name string, client ExternalMCPCli if !client.IsConnected() { return } - - // 检查状态,如果是error状态,不更新缓存 - status := client.GetStatus() - if status == "error" { + if client.GetStatus() == "error" { m.logger.Debug("跳过刷新工具列表缓存(连接失败)", zap.String("name", name), - zap.String("status", status), ) return } - // 使用较短的超时时间(5秒) - ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) + ctx, cancel := context.WithTimeout(context.Background(), 15*time.Second) defer cancel() - - tools, err := client.ListTools(ctx) - if err != nil { + if _, err := m.listToolsDeduped(ctx, name, client); err != nil { m.logger.Debug("刷新工具列表缓存失败", zap.String("name", name), zap.Error(err), ) - // 刷新失败时不更新缓存,保留旧缓存(如果有) - return } - - // 使用统一的缓存更新方法 - m.updateToolCache(name, tools) } // startToolCountRefresh 启动后台刷新工具数量的goroutine @@ -959,7 +1065,7 @@ func (m *ExternalMCPManager) startToolCountRefresh() { m.refreshWg.Add(1) go func() { defer m.refreshWg.Done() - ticker := time.NewTicker(10 * time.Second) // 每10秒刷新一次 + ticker := time.NewTicker(externalToolCountRefreshInterval) defer ticker.Stop() // 立即执行一次刷新 @@ -1168,7 +1274,7 @@ func (m *ExternalMCPManager) StopAll() { // 清理所有工具列表缓存 m.toolCacheMu.Lock() - m.toolCache = make(map[string][]Tool) + m.toolCache = make(map[string]toolListCacheEntry) m.toolCacheMu.Unlock() // 停止后台刷新(使用 select 避免重复关闭 channel)