From 6f70d7b851a36e9cb7b5687ecd824124d8dc484b Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E5=85=AC=E6=98=8E?= <83812544+Ed1s0nZ@users.noreply.github.com> Date: Thu, 2 Apr 2026 00:01:13 +0800 Subject: [PATCH] Add files via upload --- internal/handler/agent.go | 155 ++++++++++++++++++++++++++++++++++++-- 1 file changed, 147 insertions(+), 8 deletions(-) diff --git a/internal/handler/agent.go b/internal/handler/agent.go index fa8745d5..e0ecafb4 100644 --- a/internal/handler/agent.go +++ b/internal/handler/agent.go @@ -79,8 +79,8 @@ type AgentHandler struct { knowledgeManager interface { // 知识库管理器接口 LogRetrieval(conversationID, messageID, query, riskType string, retrievedItems []string) error } - skillsManager *skills.Manager // Skills管理器 - agentsMarkdownDir string // 多代理:Markdown 子 Agent 目录(绝对路径,空则不从磁盘合并) + skillsManager *skills.Manager // Skills管理器 + agentsMarkdownDir string // 多代理:Markdown 子 Agent 目录(绝对路径,空则不从磁盘合并) } // NewAgentHandler 创建新的Agent处理器 @@ -122,8 +122,8 @@ func (h *AgentHandler) SetAgentsMarkdownDir(absDir string) { // ChatAttachment 聊天附件(用户上传的文件) type ChatAttachment struct { - FileName string `json:"fileName"` // 展示用文件名 - Content string `json:"content,omitempty"` // 文本或 base64;若已预先上传到服务器可留空 + FileName string `json:"fileName"` // 展示用文件名 + Content string `json:"content,omitempty"` // 文本或 base64;若已预先上传到服务器可留空 MimeType string `json:"mimeType,omitempty"` ServerPath string `json:"serverPath,omitempty"` // 已保存在 chat_uploads 下的绝对路径(由 POST /api/chat-uploads 返回) } @@ -714,6 +714,73 @@ func (h *AgentHandler) createProgressCallback(conversationID, assistantMessageID // 用于保存tool_call事件中的参数,以便在tool_result时使用 toolCallCache := make(map[string]map[string]interface{}) // toolCallId -> arguments + // thinking_stream_*:不逐条落库,按 streamId 聚合,在后续关键事件前补一条可持久化的 thinking + type thinkingBuf struct { + b strings.Builder + meta map[string]interface{} + } + thinkingStreams := make(map[string]*thinkingBuf) // streamId -> buf + flushedThinking := make(map[string]bool) // streamId -> flushed + + // response_start + response_delta:前端时间线显示为「📝 规划中」(monitor.js),不落逐条 delta; + // 聚合为一条 planning 写入 process_details,刷新后与线上一致。 + var respPlan struct { + meta map[string]interface{} + b strings.Builder + } + flushResponsePlan := func() { + if assistantMessageID == "" { + return + } + content := strings.TrimSpace(respPlan.b.String()) + if content == "" { + respPlan.meta = nil + respPlan.b.Reset() + return + } + data := map[string]interface{}{ + "source": "response_stream", + } + for k, v := range respPlan.meta { + data[k] = v + } + if err := h.db.AddProcessDetail(assistantMessageID, conversationID, "planning", content, data); err != nil { + h.logger.Warn("保存过程详情失败", zap.Error(err), zap.String("eventType", "planning")) + } + respPlan.meta = nil + respPlan.b.Reset() + } + + flushThinkingStreams := func() { + if assistantMessageID == "" { + return + } + for sid, tb := range thinkingStreams { + if sid == "" || flushedThinking[sid] || tb == nil { + continue + } + content := strings.TrimSpace(tb.b.String()) + if content == "" { + flushedThinking[sid] = true + continue + } + data := map[string]interface{}{ + "streamId": sid, + } + for k, v := range tb.meta { + // 避免覆盖 streamId + if k == "streamId" { + continue + } + data[k] = v + } + if err := h.db.AddProcessDetail(assistantMessageID, conversationID, "thinking", content, data); err != nil { + h.logger.Warn("保存过程详情失败", zap.Error(err), zap.String("eventType", "thinking")) + } + flushedThinking[sid] = true + } + } + return func(eventType, message string, data interface{}) { // 如果提供了sendEventFunc,发送流式事件 if sendEventFunc != nil { @@ -846,25 +913,97 @@ func (h *AgentHandler) createProgressCallback(conversationID, assistantMessageID // 子代理回复流式增量不落库;结束时合并为一条 eino_agent_reply if assistantMessageID != "" && eventType == "eino_agent_reply_stream_end" { + flushResponsePlan() + // 确保思考流在子代理回复前能持久化(刷新后可读) + flushThinkingStreams() if err := h.db.AddProcessDetail(assistantMessageID, conversationID, "eino_agent_reply", message, data); err != nil { h.logger.Warn("保存过程详情失败", zap.Error(err), zap.String("eventType", eventType)) } return } - // 保存过程详情到数据库(排除response/done事件,它们会在后面单独处理) - // 另外:response_start/response_delta 是模型流式增量,保存会导致过程详情膨胀,因此不落库。 + // 多代理主代理「规划中」:response_start / response_delta 仅用于 SSE,聚合落一条 planning + if eventType == "response_start" { + flushResponsePlan() + respPlan.meta = nil + if dataMap, ok := data.(map[string]interface{}); ok { + respPlan.meta = make(map[string]interface{}, len(dataMap)) + for k, v := range dataMap { + respPlan.meta[k] = v + } + } + respPlan.b.Reset() + return + } + if eventType == "response_delta" { + respPlan.b.WriteString(message) + if dataMap, ok := data.(map[string]interface{}); ok && respPlan.meta == nil { + respPlan.meta = make(map[string]interface{}, len(dataMap)) + for k, v := range dataMap { + respPlan.meta[k] = v + } + } else if dataMap, ok := data.(map[string]interface{}); ok { + for k, v := range dataMap { + respPlan.meta[k] = v + } + } + return + } + if eventType == "response" { + flushResponsePlan() + return + } + + // 聚合 thinking_stream_*(ReasoningContent),不逐条落库 + if eventType == "thinking_stream_start" { + if dataMap, ok := data.(map[string]interface{}); ok { + if sid, ok2 := dataMap["streamId"].(string); ok2 && sid != "" { + tb := thinkingStreams[sid] + if tb == nil { + tb = &thinkingBuf{meta: map[string]interface{}{}} + thinkingStreams[sid] = tb + } + // 记录元信息(source/einoAgent/einoRole/iteration 等) + for k, v := range dataMap { + tb.meta[k] = v + } + } + } + return + } + if eventType == "thinking_stream_delta" { + if dataMap, ok := data.(map[string]interface{}); ok { + if sid, ok2 := dataMap["streamId"].(string); ok2 && sid != "" { + tb := thinkingStreams[sid] + if tb == nil { + tb = &thinkingBuf{meta: map[string]interface{}{}} + thinkingStreams[sid] = tb + } + // delta 片段直接拼接;message 本身就是 reasoning content + tb.b.WriteString(message) + // 有时 delta 先到 start 未到,补充元信息 + for k, v := range dataMap { + tb.meta[k] = v + } + } + } + return + } + + // 保存过程详情到数据库(排除 response/done;response 正文已在 messages 表) + // response_start/response_delta 已聚合为 planning,不落逐条。 if assistantMessageID != "" && eventType != "response" && eventType != "done" && eventType != "response_start" && eventType != "response_delta" && eventType != "tool_result_delta" && - eventType != "thinking_stream_start" && - eventType != "thinking_stream_delta" && eventType != "eino_agent_reply_stream_start" && eventType != "eino_agent_reply_stream_delta" && eventType != "eino_agent_reply_stream_end" { + // 在关键过程事件落库前,先把「规划中」与 thinking_stream 落库 + flushResponsePlan() + flushThinkingStreams() if err := h.db.AddProcessDetail(assistantMessageID, conversationID, eventType, message, data); err != nil { h.logger.Warn("保存过程详情失败", zap.Error(err), zap.String("eventType", eventType)) }