From 4ce6b3427b900b1593cd54895602ad3c72f0d198 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E5=85=AC=E6=98=8E?= <83812544+Ed1s0nZ@users.noreply.github.com> Date: Wed, 26 Nov 2025 00:34:24 +0800 Subject: [PATCH] Add files via upload --- internal/mcp/server.go | 75 ++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 75 insertions(+) diff --git a/internal/mcp/server.go b/internal/mcp/server.go index 6980dd08..81cabd10 100644 --- a/internal/mcp/server.go +++ b/internal/mcp/server.go @@ -38,6 +38,12 @@ type Server struct { mu sync.RWMutex logger *zap.Logger maxExecutionsInMemory int // 内存中最大执行记录数 + sseClients map[string]*sseClient +} + +type sseClient struct { + id string + send chan []byte } // ToolHandler 工具处理函数 @@ -60,6 +66,7 @@ func NewServerWithStorage(logger *zap.Logger, storage MonitorStorage) *Server { storage: storage, logger: logger, maxExecutionsInMemory: 1000, // 默认最多在内存中保留1000条执行记录 + sseClients: make(map[string]*sseClient), } // 初始化默认提示词和资源 @@ -108,6 +115,11 @@ func (s *Server) ClearTools() { // HandleHTTP 处理HTTP请求 func (s *Server) HandleHTTP(w http.ResponseWriter, r *http.Request) { + if r.Method == http.MethodGet && strings.Contains(r.Header.Get("Accept"), "text/event-stream") { + s.handleSSE(w, r) + return + } + if r.Method != http.MethodPost { http.Error(w, "Method not allowed", http.StatusMethodNotAllowed) return @@ -132,6 +144,69 @@ func (s *Server) HandleHTTP(w http.ResponseWriter, r *http.Request) { json.NewEncoder(w).Encode(response) } +// handleSSE 处理SSE连接(用于MCP HTTP传输的事件通道) +func (s *Server) handleSSE(w http.ResponseWriter, r *http.Request) { + flusher, ok := w.(http.Flusher) + if !ok { + http.Error(w, "Streaming unsupported", http.StatusInternalServerError) + return + } + + w.Header().Set("Content-Type", "text/event-stream") + w.Header().Set("Cache-Control", "no-cache") + w.Header().Set("Connection", "keep-alive") + w.Header().Set("X-Accel-Buffering", "no") + + client := &sseClient{ + id: uuid.New().String(), + send: make(chan []byte, 8), + } + + s.addSSEClient(client) + defer s.removeSSEClient(client.id) + + // 发送初始ready事件,告知客户端连接成功 + fmt.Fprintf(w, "event: message\ndata: {\"type\":\"ready\",\"status\":\"ok\"}\n\n") + flusher.Flush() + + ticker := time.NewTicker(15 * time.Second) + defer ticker.Stop() + + for { + select { + case <-r.Context().Done(): + return + case msg, ok := <-client.send: + if !ok { + return + } + fmt.Fprintf(w, "event: message\ndata: %s\n\n", msg) + flusher.Flush() + case <-ticker.C: + // 心跳保持连接 + fmt.Fprintf(w, ": ping\n\n") + flusher.Flush() + } + } +} + +// addSSEClient 注册SSE客户端 +func (s *Server) addSSEClient(client *sseClient) { + s.mu.Lock() + defer s.mu.Unlock() + s.sseClients[client.id] = client +} + +// removeSSEClient 移除SSE客户端 +func (s *Server) removeSSEClient(id string) { + s.mu.Lock() + defer s.mu.Unlock() + if client, exists := s.sseClients[id]; exists { + close(client.send) + delete(s.sseClients, id) + } +} + // handleMessage 处理MCP消息 func (s *Server) handleMessage(msg *Message) *Message { // 检查是否是通知(notification)- 通知没有id字段,不需要响应