Add files via upload

This commit is contained in:
公明
2026-01-09 19:02:16 +08:00
committed by GitHub
parent 28ca7f1851
commit 60e3795322

View File

@@ -16,14 +16,18 @@ import (
// ExternalMCPManager 外部MCP管理器
type ExternalMCPManager struct {
clients map[string]ExternalMCPClient
configs map[string]config.ExternalMCPServerConfig
logger *zap.Logger
storage MonitorStorage // 可选的持久化存储
executions map[string]*ToolExecution // 执行记录
stats map[string]*ToolStats // 工具统计信息
errors map[string]string // 错误信息
mu sync.RWMutex
clients map[string]ExternalMCPClient
configs map[string]config.ExternalMCPServerConfig
logger *zap.Logger
storage MonitorStorage // 可选的持久化存储
executions map[string]*ToolExecution // 执行记录
stats map[string]*ToolStats // 工具统计信息
errors map[string]string // 错误信息
toolCounts map[string]int // 工具数量缓存
toolCountsMu sync.RWMutex // 工具数量缓存的锁
stopRefresh chan struct{} // 停止后台刷新的信号
refreshWg sync.WaitGroup // 等待后台刷新goroutine完成
mu sync.RWMutex
}
// NewExternalMCPManager 创建外部MCP管理器
@@ -33,15 +37,20 @@ func NewExternalMCPManager(logger *zap.Logger) *ExternalMCPManager {
// NewExternalMCPManagerWithStorage 创建外部MCP管理器带持久化存储
func NewExternalMCPManagerWithStorage(logger *zap.Logger, storage MonitorStorage) *ExternalMCPManager {
return &ExternalMCPManager{
clients: make(map[string]ExternalMCPClient),
configs: make(map[string]config.ExternalMCPServerConfig),
logger: logger,
storage: storage,
executions: make(map[string]*ToolExecution),
stats: make(map[string]*ToolStats),
errors: make(map[string]string),
manager := &ExternalMCPManager{
clients: make(map[string]ExternalMCPClient),
configs: make(map[string]config.ExternalMCPServerConfig),
logger: logger,
storage: storage,
executions: make(map[string]*ToolExecution),
stats: make(map[string]*ToolStats),
errors: make(map[string]string),
toolCounts: make(map[string]int),
stopRefresh: make(chan struct{}),
}
// 启动后台刷新工具数量的goroutine
manager.startToolCountRefresh()
return manager
}
// LoadConfigs 加载配置
@@ -104,6 +113,12 @@ func (m *ExternalMCPManager) RemoveConfig(name string) error {
}
delete(m.configs, name)
// 清理工具数量缓存
m.toolCountsMu.Lock()
delete(m.toolCounts, name)
m.toolCountsMu.Unlock()
return nil
}
@@ -174,11 +189,15 @@ func (m *ExternalMCPManager) StartClient(name string) error {
m.mu.Lock()
m.errors[name] = err.Error()
m.mu.Unlock()
// 触发工具数量刷新连接失败工具数量应为0
m.triggerToolCountRefresh()
} else {
// 连接成功,清除错误信息
m.mu.Lock()
delete(m.errors, name)
m.mu.Unlock()
// 连接成功,立即刷新工具数量
m.triggerToolCountRefresh()
}
}()
@@ -204,6 +223,11 @@ func (m *ExternalMCPManager) StopClient(name string) error {
// 清除错误信息
delete(m.errors, name)
// 更新工具数量缓存停止后工具数量为0
m.toolCountsMu.Lock()
m.toolCounts[name] = 0
m.toolCountsMu.Unlock()
// 更新配置为禁用
serverCfg.ExternalMCPEnable = false
m.configs[name] = serverCfg
@@ -532,31 +556,50 @@ func (m *ExternalMCPManager) GetToolStats() map[string]*ToolStats {
return result
}
// GetToolCount 获取指定外部MCP的工具数量
// GetToolCount 获取指定外部MCP的工具数量(从缓存读取,不阻塞)
func (m *ExternalMCPManager) GetToolCount(name string) (int, error) {
// 先从缓存读取
m.toolCountsMu.RLock()
if count, exists := m.toolCounts[name]; exists {
m.toolCountsMu.RUnlock()
return count, nil
}
m.toolCountsMu.RUnlock()
// 如果缓存中没有,检查客户端状态
client, exists := m.GetClient(name)
if !exists {
return 0, fmt.Errorf("客户端不存在: %s", name)
}
if !client.IsConnected() {
// 未连接缓存为0
m.toolCountsMu.Lock()
m.toolCounts[name] = 0
m.toolCountsMu.Unlock()
return 0, nil
}
// 增加超时时间到30秒因为通过代理连接远程服务器可能需要更长时间
ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
defer cancel()
tools, err := client.ListTools(ctx)
if err != nil {
return 0, fmt.Errorf("获取工具列表失败: %w", err)
}
return len(tools), nil
// 如果已连接但缓存中没有触发异步刷新并返回0避免阻塞
m.triggerToolCountRefresh()
return 0, nil
}
// GetToolCounts 获取所有外部MCP的工具数量
// GetToolCounts 获取所有外部MCP的工具数量(从缓存读取,不阻塞)
func (m *ExternalMCPManager) GetToolCounts() map[string]int {
m.toolCountsMu.RLock()
defer m.toolCountsMu.RUnlock()
// 返回缓存的副本,避免外部修改
result := make(map[string]int)
for k, v := range m.toolCounts {
result[k] = v
}
return result
}
// refreshToolCounts 刷新工具数量缓存(后台异步执行)
func (m *ExternalMCPManager) refreshToolCounts() {
m.mu.RLock()
clients := make(map[string]ExternalMCPClient)
for k, v := range m.clients {
@@ -564,31 +607,104 @@ func (m *ExternalMCPManager) GetToolCounts() map[string]int {
}
m.mu.RUnlock()
result := make(map[string]int)
newCounts := make(map[string]int)
// 使用goroutine并发获取每个客户端的工具数量避免串行阻塞
type countResult struct {
name string
count int
}
resultChan := make(chan countResult, len(clients))
for name, client := range clients {
if !client.IsConnected() {
result[name] = 0
continue
}
go func(n string, c ExternalMCPClient) {
if !c.IsConnected() {
resultChan <- countResult{name: n, count: 0}
return
}
// 增加超时时间到30秒因为通过代理连接远程服务器可能需要更长时间
ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
tools, err := client.ListTools(ctx)
cancel()
// 使用合理的超时时间15秒既能应对网络延迟又不会过长阻塞
// 由于这是后台异步刷新,超时不会影响前端响应
ctx, cancel := context.WithTimeout(context.Background(), 15*time.Second)
tools, err := c.ListTools(ctx)
cancel()
if err != nil {
m.logger.Warn("获取外部MCP工具数量失败",
zap.String("name", name),
zap.Error(err),
)
result[name] = 0
continue
}
if err != nil {
m.logger.Debug("获取外部MCP工具数量失败",
zap.String("name", n),
zap.Error(err),
)
// 如果获取失败,保留旧值(在更新时处理)
resultChan <- countResult{name: n, count: -1} // -1 表示使用旧值
return
}
result[name] = len(tools)
resultChan <- countResult{name: n, count: len(tools)}
}(name, client)
}
return result
// 收集结果
m.toolCountsMu.RLock()
oldCounts := make(map[string]int)
for k, v := range m.toolCounts {
oldCounts[k] = v
}
m.toolCountsMu.RUnlock()
for i := 0; i < len(clients); i++ {
result := <-resultChan
if result.count >= 0 {
newCounts[result.name] = result.count
} else {
// 获取失败,保留旧值
if oldCount, exists := oldCounts[result.name]; exists {
newCounts[result.name] = oldCount
} else {
newCounts[result.name] = 0
}
}
}
// 更新缓存
m.toolCountsMu.Lock()
// 更新所有获取到的值
for name, count := range newCounts {
m.toolCounts[name] = count
}
// 对于未连接的客户端设置为0
for name, client := range clients {
if !client.IsConnected() {
m.toolCounts[name] = 0
}
}
m.toolCountsMu.Unlock()
}
// startToolCountRefresh 启动后台刷新工具数量的goroutine
func (m *ExternalMCPManager) startToolCountRefresh() {
m.refreshWg.Add(1)
go func() {
defer m.refreshWg.Done()
ticker := time.NewTicker(10 * time.Second) // 每10秒刷新一次
defer ticker.Stop()
// 立即执行一次刷新
m.refreshToolCounts()
for {
select {
case <-ticker.C:
m.refreshToolCounts()
case <-m.stopRefresh:
return
}
}
}()
}
// triggerToolCountRefresh 触发立即刷新工具数量(异步)
func (m *ExternalMCPManager) triggerToolCountRefresh() {
go m.refreshToolCounts()
}
// createClient 创建客户端(不连接)
@@ -703,6 +819,9 @@ func (m *ExternalMCPManager) connectClient(name string, serverCfg config.Externa
zap.String("name", name),
)
// 连接成功,触发工具数量刷新
m.triggerToolCountRefresh()
return nil
}
@@ -801,4 +920,18 @@ func (m *ExternalMCPManager) StopAll() {
client.Close()
delete(m.clients, name)
}
// 清理所有工具数量缓存
m.toolCountsMu.Lock()
m.toolCounts = make(map[string]int)
m.toolCountsMu.Unlock()
// 停止后台刷新(使用 select 避免重复关闭 channel
select {
case <-m.stopRefresh:
// 已经关闭,不需要再次关闭
default:
close(m.stopRefresh)
m.refreshWg.Wait()
}
}