From f988b9f611122b4a63d53374fcc9746585aae1f2 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E5=85=AC=E6=98=8E?= <83812544+Ed1s0nZ@users.noreply.github.com> Date: Sun, 29 Mar 2026 01:42:23 +0800 Subject: [PATCH] Add files via upload --- internal/handler/agent.go | 14 +++++++++----- internal/handler/multi_agent.go | 11 +++++++++-- internal/handler/sse_keepalive.go | 28 ++++++++++++++++++++++++---- 3 files changed, 42 insertions(+), 11 deletions(-) diff --git a/internal/handler/agent.go b/internal/handler/agent.go index f8ef0b2c..645777ce 100644 --- a/internal/handler/agent.go +++ b/internal/handler/agent.go @@ -12,6 +12,7 @@ import ( "path/filepath" "strconv" "strings" + "sync" "time" "unicode/utf8" @@ -776,6 +777,8 @@ func (h *AgentHandler) AgentLoopStream(c *gin.Context) { // 发送初始事件 // 用于跟踪客户端是否已断开连接 clientDisconnected := false + // 与 sseKeepalive 共用:禁止并发写 ResponseWriter,否则会破坏 chunked 编码(ERR_INVALID_CHUNKED_ENCODING)。 + var sseWriteMu sync.Mutex // 用于快速确认模型是否真的产生了流式 delta var responseDeltaCount int var responseStartLogged bool @@ -843,19 +846,20 @@ func (h *AgentHandler) AgentLoopStream(c *gin.Context) { } eventJSON, _ := json.Marshal(event) - // 尝试写入事件,如果失败则标记客户端断开 - if _, err := fmt.Fprintf(c.Writer, "data: %s\n\n", eventJSON); err != nil { + sseWriteMu.Lock() + _, err := fmt.Fprintf(c.Writer, "data: %s\n\n", eventJSON) + if err != nil { + sseWriteMu.Unlock() clientDisconnected = true h.logger.Debug("客户端断开连接,停止发送SSE事件", zap.Error(err)) return } - - // 刷新响应,如果失败则标记客户端断开 if flusher, ok := c.Writer.(http.Flusher); ok { flusher.Flush() } else { c.Writer.Flush() } + sseWriteMu.Unlock() } // 如果没有对话ID,创建新对话(WebShell 助手模式下关联连接 ID 以便持久化展示) @@ -1066,7 +1070,7 @@ func (h *AgentHandler) AgentLoopStream(c *gin.Context) { sendEvent("progress", "正在分析您的请求...", nil) // 注意:roleSkills 已在上方根据 req.Role 或 WebShell 模式设置 stopKeepalive := make(chan struct{}) - go sseKeepalive(c, stopKeepalive) + go sseKeepalive(c, stopKeepalive, &sseWriteMu) defer close(stopKeepalive) result, err := h.agent.AgentLoopWithProgress(taskCtx, finalMessage, agentHistoryMessages, conversationID, progressCallback, roleTools, roleSkills) diff --git a/internal/handler/multi_agent.go b/internal/handler/multi_agent.go index a596d8ea..4ff6b7f6 100644 --- a/internal/handler/multi_agent.go +++ b/internal/handler/multi_agent.go @@ -7,6 +7,7 @@ import ( "fmt" "net/http" "strings" + "sync" "time" "cyberstrike-ai/internal/multiagent" @@ -49,6 +50,8 @@ func (h *AgentHandler) MultiAgentLoopStream(c *gin.Context) { var baseCtx context.Context clientDisconnected := false + // 与 sseKeepalive 共用:禁止并发写 ResponseWriter,否则会破坏 chunked 编码(ERR_INVALID_CHUNKED_ENCODING)。 + var sseWriteMu sync.Mutex sendEvent := func(eventType, message string, data interface{}) { if clientDisconnected { return @@ -66,7 +69,10 @@ func (h *AgentHandler) MultiAgentLoopStream(c *gin.Context) { } ev := StreamEvent{Type: eventType, Message: message, Data: data} b, _ := json.Marshal(ev) - if _, err := fmt.Fprintf(c.Writer, "data: %s\n\n", b); err != nil { + sseWriteMu.Lock() + _, err := fmt.Fprintf(c.Writer, "data: %s\n\n", b) + if err != nil { + sseWriteMu.Unlock() clientDisconnected = true return } @@ -75,6 +81,7 @@ func (h *AgentHandler) MultiAgentLoopStream(c *gin.Context) { } else { c.Writer.Flush() } + sseWriteMu.Unlock() } h.logger.Info("收到 Eino DeepAgent 流式请求", @@ -130,7 +137,7 @@ func (h *AgentHandler) MultiAgentLoopStream(c *gin.Context) { }) stopKeepalive := make(chan struct{}) - go sseKeepalive(c, stopKeepalive) + go sseKeepalive(c, stopKeepalive, &sseWriteMu) defer close(stopKeepalive) result, runErr := multiagent.RunDeepAgent( diff --git a/internal/handler/sse_keepalive.go b/internal/handler/sse_keepalive.go index d21e4095..ae750ecd 100644 --- a/internal/handler/sse_keepalive.go +++ b/internal/handler/sse_keepalive.go @@ -3,15 +3,27 @@ package handler import ( "fmt" "net/http" + "sync" "time" "github.com/gin-gonic/gin" ) -// sseKeepalive sends periodic SSE comment lines so proxies (e.g. nginx proxy_read_timeout) -// and idle TCP paths do not close long-running streams when no data events are emitted for a while. -func sseKeepalive(c *gin.Context, stop <-chan struct{}) { - ticker := time.NewTicker(20 * time.Second) +// sseInterval is how often we write on long SSE streams. Shorter intervals help NATs and +// some proxies that treat connections as idle; 10s is a reasonable balance with traffic. +const sseKeepaliveInterval = 10 * time.Second + +// sseKeepalive sends periodic SSE traffic so proxies (e.g. nginx proxy_read_timeout), NATs, +// and load balancers do not close long-running streams. Some intermediaries ignore comment-only +// lines, so we send both a comment and a minimal data frame (type heartbeat) per tick. +// +// writeMu must be the same mutex used by sendEvent for this request: concurrent writes to +// http.ResponseWriter break chunked transfer encoding (browser: net::ERR_INVALID_CHUNKED_ENCODING). +func sseKeepalive(c *gin.Context, stop <-chan struct{}, writeMu *sync.Mutex) { + if writeMu == nil { + return + } + ticker := time.NewTicker(sseKeepaliveInterval) defer ticker.Stop() for { select { @@ -27,12 +39,20 @@ func sseKeepalive(c *gin.Context, stop <-chan struct{}) { return default: } + writeMu.Lock() if _, err := fmt.Fprintf(c.Writer, ": keepalive\n\n"); err != nil { + writeMu.Unlock() + return + } + // data: frame so strict proxies still see downstream bytes (comments alone may not reset timers) + if _, err := fmt.Fprintf(c.Writer, `data: {"type":"heartbeat"}`+"\n\n"); err != nil { + writeMu.Unlock() return } if flusher, ok := c.Writer.(http.Flusher); ok { flusher.Flush() } + writeMu.Unlock() } } }