Add files via upload

This commit is contained in:
公明
2025-11-26 00:34:24 +08:00
committed by GitHub
parent 8ffdbbae52
commit 4ce6b3427b

View File

@@ -38,6 +38,12 @@ type Server struct {
mu sync.RWMutex
logger *zap.Logger
maxExecutionsInMemory int // 内存中最大执行记录数
sseClients map[string]*sseClient
}
type sseClient struct {
id string
send chan []byte
}
// ToolHandler 工具处理函数
@@ -60,6 +66,7 @@ func NewServerWithStorage(logger *zap.Logger, storage MonitorStorage) *Server {
storage: storage,
logger: logger,
maxExecutionsInMemory: 1000, // 默认最多在内存中保留1000条执行记录
sseClients: make(map[string]*sseClient),
}
// 初始化默认提示词和资源
@@ -108,6 +115,11 @@ func (s *Server) ClearTools() {
// HandleHTTP 处理HTTP请求
func (s *Server) HandleHTTP(w http.ResponseWriter, r *http.Request) {
if r.Method == http.MethodGet && strings.Contains(r.Header.Get("Accept"), "text/event-stream") {
s.handleSSE(w, r)
return
}
if r.Method != http.MethodPost {
http.Error(w, "Method not allowed", http.StatusMethodNotAllowed)
return
@@ -132,6 +144,69 @@ func (s *Server) HandleHTTP(w http.ResponseWriter, r *http.Request) {
json.NewEncoder(w).Encode(response)
}
// handleSSE 处理SSE连接用于MCP HTTP传输的事件通道
func (s *Server) handleSSE(w http.ResponseWriter, r *http.Request) {
flusher, ok := w.(http.Flusher)
if !ok {
http.Error(w, "Streaming unsupported", http.StatusInternalServerError)
return
}
w.Header().Set("Content-Type", "text/event-stream")
w.Header().Set("Cache-Control", "no-cache")
w.Header().Set("Connection", "keep-alive")
w.Header().Set("X-Accel-Buffering", "no")
client := &sseClient{
id: uuid.New().String(),
send: make(chan []byte, 8),
}
s.addSSEClient(client)
defer s.removeSSEClient(client.id)
// 发送初始ready事件告知客户端连接成功
fmt.Fprintf(w, "event: message\ndata: {\"type\":\"ready\",\"status\":\"ok\"}\n\n")
flusher.Flush()
ticker := time.NewTicker(15 * time.Second)
defer ticker.Stop()
for {
select {
case <-r.Context().Done():
return
case msg, ok := <-client.send:
if !ok {
return
}
fmt.Fprintf(w, "event: message\ndata: %s\n\n", msg)
flusher.Flush()
case <-ticker.C:
// 心跳保持连接
fmt.Fprintf(w, ": ping\n\n")
flusher.Flush()
}
}
}
// addSSEClient 注册SSE客户端
func (s *Server) addSSEClient(client *sseClient) {
s.mu.Lock()
defer s.mu.Unlock()
s.sseClients[client.id] = client
}
// removeSSEClient 移除SSE客户端
func (s *Server) removeSSEClient(id string) {
s.mu.Lock()
defer s.mu.Unlock()
if client, exists := s.sseClients[id]; exists {
close(client.send)
delete(s.sseClients, id)
}
}
// handleMessage 处理MCP消息
func (s *Server) handleMessage(msg *Message) *Message {
// 检查是否是通知notification- 通知没有id字段不需要响应