Add files via upload

This commit is contained in:
公明
2026-03-29 01:42:23 +08:00
committed by GitHub
parent 40af245eba
commit f988b9f611
3 changed files with 42 additions and 11 deletions
+9 -5
View File
@@ -12,6 +12,7 @@ import (
"path/filepath"
"strconv"
"strings"
"sync"
"time"
"unicode/utf8"
@@ -776,6 +777,8 @@ func (h *AgentHandler) AgentLoopStream(c *gin.Context) {
// 发送初始事件
// 用于跟踪客户端是否已断开连接
clientDisconnected := false
// 与 sseKeepalive 共用:禁止并发写 ResponseWriter,否则会破坏 chunked 编码(ERR_INVALID_CHUNKED_ENCODING)。
var sseWriteMu sync.Mutex
// 用于快速确认模型是否真的产生了流式 delta
var responseDeltaCount int
var responseStartLogged bool
@@ -843,19 +846,20 @@ func (h *AgentHandler) AgentLoopStream(c *gin.Context) {
}
eventJSON, _ := json.Marshal(event)
// 尝试写入事件,如果失败则标记客户端断开
if _, err := fmt.Fprintf(c.Writer, "data: %s\n\n", eventJSON); err != nil {
sseWriteMu.Lock()
_, err := fmt.Fprintf(c.Writer, "data: %s\n\n", eventJSON)
if err != nil {
sseWriteMu.Unlock()
clientDisconnected = true
h.logger.Debug("客户端断开连接,停止发送SSE事件", zap.Error(err))
return
}
// 刷新响应,如果失败则标记客户端断开
if flusher, ok := c.Writer.(http.Flusher); ok {
flusher.Flush()
} else {
c.Writer.Flush()
}
sseWriteMu.Unlock()
}
// 如果没有对话ID,创建新对话(WebShell 助手模式下关联连接 ID 以便持久化展示)
@@ -1066,7 +1070,7 @@ func (h *AgentHandler) AgentLoopStream(c *gin.Context) {
sendEvent("progress", "正在分析您的请求...", nil)
// 注意:roleSkills 已在上方根据 req.Role 或 WebShell 模式设置
stopKeepalive := make(chan struct{})
go sseKeepalive(c, stopKeepalive)
go sseKeepalive(c, stopKeepalive, &sseWriteMu)
defer close(stopKeepalive)
result, err := h.agent.AgentLoopWithProgress(taskCtx, finalMessage, agentHistoryMessages, conversationID, progressCallback, roleTools, roleSkills)
+9 -2
View File
@@ -7,6 +7,7 @@ import (
"fmt"
"net/http"
"strings"
"sync"
"time"
"cyberstrike-ai/internal/multiagent"
@@ -49,6 +50,8 @@ func (h *AgentHandler) MultiAgentLoopStream(c *gin.Context) {
var baseCtx context.Context
clientDisconnected := false
// 与 sseKeepalive 共用:禁止并发写 ResponseWriter,否则会破坏 chunked 编码(ERR_INVALID_CHUNKED_ENCODING)。
var sseWriteMu sync.Mutex
sendEvent := func(eventType, message string, data interface{}) {
if clientDisconnected {
return
@@ -66,7 +69,10 @@ func (h *AgentHandler) MultiAgentLoopStream(c *gin.Context) {
}
ev := StreamEvent{Type: eventType, Message: message, Data: data}
b, _ := json.Marshal(ev)
if _, err := fmt.Fprintf(c.Writer, "data: %s\n\n", b); err != nil {
sseWriteMu.Lock()
_, err := fmt.Fprintf(c.Writer, "data: %s\n\n", b)
if err != nil {
sseWriteMu.Unlock()
clientDisconnected = true
return
}
@@ -75,6 +81,7 @@ func (h *AgentHandler) MultiAgentLoopStream(c *gin.Context) {
} else {
c.Writer.Flush()
}
sseWriteMu.Unlock()
}
h.logger.Info("收到 Eino DeepAgent 流式请求",
@@ -130,7 +137,7 @@ func (h *AgentHandler) MultiAgentLoopStream(c *gin.Context) {
})
stopKeepalive := make(chan struct{})
go sseKeepalive(c, stopKeepalive)
go sseKeepalive(c, stopKeepalive, &sseWriteMu)
defer close(stopKeepalive)
result, runErr := multiagent.RunDeepAgent(
+24 -4
View File
@@ -3,15 +3,27 @@ package handler
import (
"fmt"
"net/http"
"sync"
"time"
"github.com/gin-gonic/gin"
)
// sseKeepalive sends periodic SSE comment lines so proxies (e.g. nginx proxy_read_timeout)
// and idle TCP paths do not close long-running streams when no data events are emitted for a while.
func sseKeepalive(c *gin.Context, stop <-chan struct{}) {
ticker := time.NewTicker(20 * time.Second)
// sseInterval is how often we write on long SSE streams. Shorter intervals help NATs and
// some proxies that treat connections as idle; 10s is a reasonable balance with traffic.
const sseKeepaliveInterval = 10 * time.Second
// sseKeepalive sends periodic SSE traffic so proxies (e.g. nginx proxy_read_timeout), NATs,
// and load balancers do not close long-running streams. Some intermediaries ignore comment-only
// lines, so we send both a comment and a minimal data frame (type heartbeat) per tick.
//
// writeMu must be the same mutex used by sendEvent for this request: concurrent writes to
// http.ResponseWriter break chunked transfer encoding (browser: net::ERR_INVALID_CHUNKED_ENCODING).
func sseKeepalive(c *gin.Context, stop <-chan struct{}, writeMu *sync.Mutex) {
if writeMu == nil {
return
}
ticker := time.NewTicker(sseKeepaliveInterval)
defer ticker.Stop()
for {
select {
@@ -27,12 +39,20 @@ func sseKeepalive(c *gin.Context, stop <-chan struct{}) {
return
default:
}
writeMu.Lock()
if _, err := fmt.Fprintf(c.Writer, ": keepalive\n\n"); err != nil {
writeMu.Unlock()
return
}
// data: frame so strict proxies still see downstream bytes (comments alone may not reset timers)
if _, err := fmt.Fprintf(c.Writer, `data: {"type":"heartbeat"}`+"\n\n"); err != nil {
writeMu.Unlock()
return
}
if flusher, ok := c.Writer.(http.Flusher); ok {
flusher.Flush()
}
writeMu.Unlock()
}
}
}