diff --git a/internal/handler/agent.go b/internal/handler/agent.go index b338c04f..4af89989 100644 --- a/internal/handler/agent.go +++ b/internal/handler/agent.go @@ -23,6 +23,7 @@ import ( "cyberstrike-ai/internal/mcp" "cyberstrike-ai/internal/mcp/builtin" "cyberstrike-ai/internal/multiagent" + "cyberstrike-ai/internal/openai" "github.com/gin-gonic/gin" "github.com/robfig/cron/v3" @@ -1158,7 +1159,16 @@ func (h *AgentHandler) createProgressCallback(runCtx context.Context, cancelRun return } if eventType == "response_delta" { - respPlan.b.WriteString(message) + if dataMap, ok := data.(map[string]interface{}); ok { + if acc, okAcc := dataMap[openai.SSEAccumulatedKey].(string); okAcc { + respPlan.b.Reset() + respPlan.b.WriteString(acc) + } else { + respPlan.b.WriteString(message) + } + } else { + respPlan.b.WriteString(message) + } if dataMap, ok := data.(map[string]interface{}); ok && respPlan.meta == nil { respPlan.meta = make(map[string]interface{}, len(dataMap)) for k, v := range dataMap { @@ -1213,8 +1223,12 @@ func (h *AgentHandler) createProgressCallback(runCtx context.Context, cancelRun } else if tb.persistAs == "" { tb.persistAs = persistAs } - // delta 片段直接拼接 - tb.b.WriteString(message) + if acc, okAcc := dataMap[openai.SSEAccumulatedKey].(string); okAcc { + tb.b.Reset() + tb.b.WriteString(acc) + } else { + tb.b.WriteString(message) + } // 有时 delta 先到 start 未到,补充元信息 for k, v := range dataMap { tb.meta[k] = v diff --git a/internal/openai/sse_stream.go b/internal/openai/sse_stream.go new file mode 100644 index 00000000..a86d6306 --- /dev/null +++ b/internal/openai/sse_stream.go @@ -0,0 +1,20 @@ +package openai + +// SSEAccumulatedKey 为 SSE progress 事件 data 中的服务端权威流式全文快照字段。 +// 前端应优先用该字段更新 buffer,避免对 delta 二次 normalize 导致叠字。 +const SSEAccumulatedKey = "accumulated" + +// WithSSEAccumulated 在 progress data 中附带当前流式累计全文(权威快照)。 +func WithSSEAccumulated(data map[string]interface{}, accumulated string) map[string]interface{} { + if data == nil { + data = make(map[string]interface{}, 1) + } + data[SSEAccumulatedKey] = accumulated + return data +} + +// NormalizeStreamingDelta 将可能是“累计片段/重发片段”的内容归一化为“纯增量”。 +// 与 unexported normalizeStreamingDelta 相同,供 agent / multiagent 等包在发 SSE 前累计正文。 +func NormalizeStreamingDelta(current, incoming string) (next, delta string) { + return normalizeStreamingDelta(current, incoming) +}