Add files via upload

This commit is contained in:
公明
2026-04-02 00:01:13 +08:00
committed by GitHub
parent 157f1c9754
commit 6f70d7b851
+147 -8
View File
@@ -79,8 +79,8 @@ type AgentHandler struct {
knowledgeManager interface { // 知识库管理器接口
LogRetrieval(conversationID, messageID, query, riskType string, retrievedItems []string) error
}
skillsManager *skills.Manager // Skills管理器
agentsMarkdownDir string // 多代理:Markdown 子 Agent 目录(绝对路径,空则不从磁盘合并)
skillsManager *skills.Manager // Skills管理器
agentsMarkdownDir string // 多代理:Markdown 子 Agent 目录(绝对路径,空则不从磁盘合并)
}
// NewAgentHandler 创建新的Agent处理器
@@ -122,8 +122,8 @@ func (h *AgentHandler) SetAgentsMarkdownDir(absDir string) {
// ChatAttachment 聊天附件(用户上传的文件)
type ChatAttachment struct {
FileName string `json:"fileName"` // 展示用文件名
Content string `json:"content,omitempty"` // 文本或 base64;若已预先上传到服务器可留空
FileName string `json:"fileName"` // 展示用文件名
Content string `json:"content,omitempty"` // 文本或 base64;若已预先上传到服务器可留空
MimeType string `json:"mimeType,omitempty"`
ServerPath string `json:"serverPath,omitempty"` // 已保存在 chat_uploads 下的绝对路径(由 POST /api/chat-uploads 返回)
}
@@ -714,6 +714,73 @@ func (h *AgentHandler) createProgressCallback(conversationID, assistantMessageID
// 用于保存tool_call事件中的参数,以便在tool_result时使用
toolCallCache := make(map[string]map[string]interface{}) // toolCallId -> arguments
// thinking_stream_*:不逐条落库,按 streamId 聚合,在后续关键事件前补一条可持久化的 thinking
type thinkingBuf struct {
b strings.Builder
meta map[string]interface{}
}
thinkingStreams := make(map[string]*thinkingBuf) // streamId -> buf
flushedThinking := make(map[string]bool) // streamId -> flushed
// response_start + response_delta:前端时间线显示为「📝 规划中」(monitor.js),不落逐条 delta
// 聚合为一条 planning 写入 process_details,刷新后与线上一致。
var respPlan struct {
meta map[string]interface{}
b strings.Builder
}
flushResponsePlan := func() {
if assistantMessageID == "" {
return
}
content := strings.TrimSpace(respPlan.b.String())
if content == "" {
respPlan.meta = nil
respPlan.b.Reset()
return
}
data := map[string]interface{}{
"source": "response_stream",
}
for k, v := range respPlan.meta {
data[k] = v
}
if err := h.db.AddProcessDetail(assistantMessageID, conversationID, "planning", content, data); err != nil {
h.logger.Warn("保存过程详情失败", zap.Error(err), zap.String("eventType", "planning"))
}
respPlan.meta = nil
respPlan.b.Reset()
}
flushThinkingStreams := func() {
if assistantMessageID == "" {
return
}
for sid, tb := range thinkingStreams {
if sid == "" || flushedThinking[sid] || tb == nil {
continue
}
content := strings.TrimSpace(tb.b.String())
if content == "" {
flushedThinking[sid] = true
continue
}
data := map[string]interface{}{
"streamId": sid,
}
for k, v := range tb.meta {
// 避免覆盖 streamId
if k == "streamId" {
continue
}
data[k] = v
}
if err := h.db.AddProcessDetail(assistantMessageID, conversationID, "thinking", content, data); err != nil {
h.logger.Warn("保存过程详情失败", zap.Error(err), zap.String("eventType", "thinking"))
}
flushedThinking[sid] = true
}
}
return func(eventType, message string, data interface{}) {
// 如果提供了sendEventFunc,发送流式事件
if sendEventFunc != nil {
@@ -846,25 +913,97 @@ func (h *AgentHandler) createProgressCallback(conversationID, assistantMessageID
// 子代理回复流式增量不落库;结束时合并为一条 eino_agent_reply
if assistantMessageID != "" && eventType == "eino_agent_reply_stream_end" {
flushResponsePlan()
// 确保思考流在子代理回复前能持久化(刷新后可读)
flushThinkingStreams()
if err := h.db.AddProcessDetail(assistantMessageID, conversationID, "eino_agent_reply", message, data); err != nil {
h.logger.Warn("保存过程详情失败", zap.Error(err), zap.String("eventType", eventType))
}
return
}
// 保存过程详情到数据库(排除response/done事件,它们会在后面单独处理)
// 另外:response_start/response_delta 是模型流式增量,保存会导致过程详情膨胀,因此不落库。
// 多代理主代理「规划中」:response_start / response_delta 仅用于 SSE,聚合落一条 planning
if eventType == "response_start" {
flushResponsePlan()
respPlan.meta = nil
if dataMap, ok := data.(map[string]interface{}); ok {
respPlan.meta = make(map[string]interface{}, len(dataMap))
for k, v := range dataMap {
respPlan.meta[k] = v
}
}
respPlan.b.Reset()
return
}
if eventType == "response_delta" {
respPlan.b.WriteString(message)
if dataMap, ok := data.(map[string]interface{}); ok && respPlan.meta == nil {
respPlan.meta = make(map[string]interface{}, len(dataMap))
for k, v := range dataMap {
respPlan.meta[k] = v
}
} else if dataMap, ok := data.(map[string]interface{}); ok {
for k, v := range dataMap {
respPlan.meta[k] = v
}
}
return
}
if eventType == "response" {
flushResponsePlan()
return
}
// 聚合 thinking_stream_*ReasoningContent),不逐条落库
if eventType == "thinking_stream_start" {
if dataMap, ok := data.(map[string]interface{}); ok {
if sid, ok2 := dataMap["streamId"].(string); ok2 && sid != "" {
tb := thinkingStreams[sid]
if tb == nil {
tb = &thinkingBuf{meta: map[string]interface{}{}}
thinkingStreams[sid] = tb
}
// 记录元信息(source/einoAgent/einoRole/iteration 等)
for k, v := range dataMap {
tb.meta[k] = v
}
}
}
return
}
if eventType == "thinking_stream_delta" {
if dataMap, ok := data.(map[string]interface{}); ok {
if sid, ok2 := dataMap["streamId"].(string); ok2 && sid != "" {
tb := thinkingStreams[sid]
if tb == nil {
tb = &thinkingBuf{meta: map[string]interface{}{}}
thinkingStreams[sid] = tb
}
// delta 片段直接拼接;message 本身就是 reasoning content
tb.b.WriteString(message)
// 有时 delta 先到 start 未到,补充元信息
for k, v := range dataMap {
tb.meta[k] = v
}
}
}
return
}
// 保存过程详情到数据库(排除 response/doneresponse 正文已在 messages 表)
// response_start/response_delta 已聚合为 planning,不落逐条。
if assistantMessageID != "" &&
eventType != "response" &&
eventType != "done" &&
eventType != "response_start" &&
eventType != "response_delta" &&
eventType != "tool_result_delta" &&
eventType != "thinking_stream_start" &&
eventType != "thinking_stream_delta" &&
eventType != "eino_agent_reply_stream_start" &&
eventType != "eino_agent_reply_stream_delta" &&
eventType != "eino_agent_reply_stream_end" {
// 在关键过程事件落库前,先把「规划中」与 thinking_stream 落库
flushResponsePlan()
flushThinkingStreams()
if err := h.db.AddProcessDetail(assistantMessageID, conversationID, eventType, message, data); err != nil {
h.logger.Warn("保存过程详情失败", zap.Error(err), zap.String("eventType", eventType))
}