From d82ea608273d3310e0f708b17cf4cfe2267437b9 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E5=85=AC=E6=98=8E?= <83812544+Ed1s0nZ@users.noreply.github.com> Date: Wed, 6 May 2026 17:56:30 +0800 Subject: [PATCH] Add files via upload --- internal/database/conversation.go | 28 ++- internal/database/database.go | 33 ++++ internal/handler/agent.go | 235 +++++++++++++++++--------- internal/handler/eino_single_agent.go | 14 +- internal/handler/multi_agent.go | 16 +- 5 files changed, 225 insertions(+), 101 deletions(-) diff --git a/internal/database/conversation.go b/internal/database/conversation.go index 35d36499..d4c91086 100644 --- a/internal/database/conversation.go +++ b/internal/database/conversation.go @@ -32,6 +32,7 @@ type Message struct { MCPExecutionIDs []string `json:"mcpExecutionIds,omitempty"` ProcessDetails []map[string]interface{} `json:"processDetails,omitempty"` CreatedAt time.Time `json:"createdAt"` + UpdatedAt time.Time `json:"updatedAt"` } // CreateConversation 创建新对话 @@ -484,6 +485,7 @@ func (db *DB) ConversationHasToolProcessDetails(conversationID string) (bool, er // AddMessage 添加消息 func (db *DB) AddMessage(conversationID, role, content string, mcpExecutionIDs []string) (*Message, error) { id := uuid.New().String() + now := time.Now() var mcpIDsJSON string if len(mcpExecutionIDs) > 0 { @@ -496,8 +498,8 @@ func (db *DB) AddMessage(conversationID, role, content string, mcpExecutionIDs [ } _, err := db.Exec( - "INSERT INTO messages (id, conversation_id, role, content, mcp_execution_ids, created_at) VALUES (?, ?, ?, ?, ?, ?)", - id, conversationID, role, content, mcpIDsJSON, time.Now(), + "INSERT INTO messages (id, conversation_id, role, content, mcp_execution_ids, created_at, updated_at) VALUES (?, ?, ?, ?, ?, ?, ?)", + id, conversationID, role, content, mcpIDsJSON, now, now, ) if err != nil { return nil, fmt.Errorf("添加消息失败: %w", err) @@ -514,7 +516,8 @@ func (db *DB) AddMessage(conversationID, role, content string, mcpExecutionIDs [ Role: role, Content: content, MCPExecutionIDs: mcpExecutionIDs, - CreatedAt: time.Now(), + CreatedAt: now, + UpdatedAt: now, } return message, nil @@ -523,7 +526,7 @@ func (db *DB) AddMessage(conversationID, role, content string, mcpExecutionIDs [ // GetMessages 获取对话的所有消息 func (db *DB) GetMessages(conversationID string) ([]Message, error) { rows, err := db.Query( - "SELECT id, conversation_id, role, content, mcp_execution_ids, created_at FROM messages WHERE conversation_id = ? ORDER BY created_at ASC", + "SELECT id, conversation_id, role, content, mcp_execution_ids, created_at, updated_at FROM messages WHERE conversation_id = ? ORDER BY created_at ASC", conversationID, ) if err != nil { @@ -536,8 +539,9 @@ func (db *DB) GetMessages(conversationID string) ([]Message, error) { var msg Message var mcpIDsJSON sql.NullString var createdAt string + var updatedAt sql.NullString - if err := rows.Scan(&msg.ID, &msg.ConversationID, &msg.Role, &msg.Content, &mcpIDsJSON, &createdAt); err != nil { + if err := rows.Scan(&msg.ID, &msg.ConversationID, &msg.Role, &msg.Content, &mcpIDsJSON, &createdAt, &updatedAt); err != nil { return nil, fmt.Errorf("扫描消息失败: %w", err) } @@ -551,6 +555,20 @@ func (db *DB) GetMessages(conversationID string) ([]Message, error) { msg.CreatedAt, _ = time.Parse(time.RFC3339, createdAt) } + // updated_at 兼容老库:字段不存在/为空时回退为 created_at + if updatedAt.Valid && strings.TrimSpace(updatedAt.String) != "" { + msg.UpdatedAt, err = time.Parse("2006-01-02 15:04:05.999999999-07:00", updatedAt.String) + if err != nil { + msg.UpdatedAt, err = time.Parse("2006-01-02 15:04:05", updatedAt.String) + } + if err != nil { + msg.UpdatedAt, _ = time.Parse(time.RFC3339, updatedAt.String) + } + } + if msg.UpdatedAt.IsZero() { + msg.UpdatedAt = msg.CreatedAt + } + // 解析MCP执行ID if mcpIDsJSON.Valid && mcpIDsJSON.String != "" { if err := json.Unmarshal([]byte(mcpIDsJSON.String), &msg.MCPExecutionIDs); err != nil { diff --git a/internal/database/database.go b/internal/database/database.go index f18c2244..8de22a54 100644 --- a/internal/database/database.go +++ b/internal/database/database.go @@ -82,6 +82,7 @@ func (db *DB) initTables() error { content TEXT NOT NULL, mcp_execution_ids TEXT, created_at DATETIME NOT NULL, + updated_at DATETIME NOT NULL, FOREIGN KEY (conversation_id) REFERENCES conversations(id) ON DELETE CASCADE );` @@ -518,6 +519,11 @@ func (db *DB) initTables() error { // 不返回错误,允许继续运行 } + if err := db.migrateMessagesTable(); err != nil { + db.logger.Warn("迁移messages表失败", zap.Error(err)) + // 不返回错误,允许继续运行 + } + if err := db.migrateConversationGroupsTable(); err != nil { db.logger.Warn("迁移conversation_groups表失败", zap.Error(err)) // 不返回错误,允许继续运行 @@ -550,6 +556,33 @@ func (db *DB) initTables() error { return nil } +// migrateMessagesTable 迁移 messages 表,补充 updated_at 字段。 +// 语义:updated_at 表示该条消息最后一次被写入/更新的时间(例如助手占位消息在任务结束时更新正文)。 +func (db *DB) migrateMessagesTable() error { + var count int + err := db.QueryRow("SELECT COUNT(*) FROM pragma_table_info('messages') WHERE name='updated_at'").Scan(&count) + if err != nil { + // 如果查询失败,尝试添加字段 + if _, addErr := db.Exec("ALTER TABLE messages ADD COLUMN updated_at DATETIME"); addErr != nil { + errMsg := strings.ToLower(addErr.Error()) + if !strings.Contains(errMsg, "duplicate column") && !strings.Contains(errMsg, "already exists") { + return fmt.Errorf("添加 messages.updated_at 字段失败: %w", addErr) + } + } + } else if count == 0 { + if _, err := db.Exec("ALTER TABLE messages ADD COLUMN updated_at DATETIME"); err != nil { + errMsg := strings.ToLower(err.Error()) + if !strings.Contains(errMsg, "duplicate column") && !strings.Contains(errMsg, "already exists") { + return fmt.Errorf("添加 messages.updated_at 字段失败: %w", err) + } + } + } + + // 回填已有数据:让 updated_at 至少等于 created_at,避免前端出现空/当前时间回退。 + _, _ = db.Exec("UPDATE messages SET updated_at = created_at WHERE updated_at IS NULL OR updated_at = ''") + return nil +} + // migrateConversationsTable 迁移conversations表,添加新字段 func (db *DB) migrateConversationsTable() error { // 检查last_react_input字段是否存在 diff --git a/internal/handler/agent.go b/internal/handler/agent.go index 93fad620..a94f4867 100644 --- a/internal/handler/agent.go +++ b/internal/handler/agent.go @@ -728,7 +728,7 @@ func (h *AgentHandler) ProcessMessageForRobot(ctx context.Context, conversationI h.persistEinoAgentTraceForResume(conversationID, resultMA) errMsg := "执行失败: " + errMA.Error() if assistantMessageID != "" { - _, _ = h.db.Exec("UPDATE messages SET content = ? WHERE id = ?", errMsg, assistantMessageID) + _, _ = h.db.Exec("UPDATE messages SET content = ?, updated_at = ? WHERE id = ?", errMsg, time.Now(), assistantMessageID) _ = h.db.AddProcessDetail(assistantMessageID, conversationID, "error", errMsg, nil) } return "", conversationID, errMA @@ -740,8 +740,8 @@ func (h *AgentHandler) ProcessMessageForRobot(ctx context.Context, conversationI mcpIDsJSON = string(jsonData) } _, err = h.db.Exec( - "UPDATE messages SET content = ?, mcp_execution_ids = ? WHERE id = ?", - resultMA.Response, mcpIDsJSON, assistantMessageID, + "UPDATE messages SET content = ?, mcp_execution_ids = ?, updated_at = ? WHERE id = ?", + resultMA.Response, mcpIDsJSON, time.Now(), assistantMessageID, ) if err != nil { h.logger.Warn("机器人:更新助手消息失败", zap.Error(err)) @@ -761,7 +761,7 @@ func (h *AgentHandler) ProcessMessageForRobot(ctx context.Context, conversationI if err != nil { errMsg := "执行失败: " + err.Error() if assistantMessageID != "" { - _, _ = h.db.Exec("UPDATE messages SET content = ? WHERE id = ?", errMsg, assistantMessageID) + _, _ = h.db.Exec("UPDATE messages SET content = ?, updated_at = ? WHERE id = ?", errMsg, time.Now(), assistantMessageID) _ = h.db.AddProcessDetail(assistantMessageID, conversationID, "error", errMsg, nil) } return "", conversationID, err @@ -775,8 +775,8 @@ func (h *AgentHandler) ProcessMessageForRobot(ctx context.Context, conversationI mcpIDsJSON = string(jsonData) } _, err = h.db.Exec( - "UPDATE messages SET content = ?, mcp_execution_ids = ? WHERE id = ?", - result.Response, mcpIDsJSON, assistantMessageID, + "UPDATE messages SET content = ?, mcp_execution_ids = ?, updated_at = ? WHERE id = ?", + result.Response, mcpIDsJSON, time.Now(), assistantMessageID, ) if err != nil { h.logger.Warn("机器人:更新助手消息失败", zap.Error(err)) @@ -1515,9 +1515,9 @@ func (h *AgentHandler) AgentLoopStream(c *gin.Context) { // 更新助手消息内容并保存错误详情到数据库 if assistantMessageID != "" { if _, updateErr := h.db.Exec( - "UPDATE messages SET content = ? WHERE id = ?", + "UPDATE messages SET content = ?, updated_at = ? WHERE id = ?", errorMsg, - assistantMessageID, + time.Now(), assistantMessageID, ); updateErr != nil { h.logger.Warn("更新错误后的助手消息失败", zap.Error(updateErr)) } @@ -1569,9 +1569,9 @@ func (h *AgentHandler) AgentLoopStream(c *gin.Context) { if assistantMessageID != "" { if _, updateErr := h.db.Exec( - "UPDATE messages SET content = ? WHERE id = ?", + "UPDATE messages SET content = ?, updated_at = ? WHERE id = ?", cancelMsg, - assistantMessageID, + time.Now(), assistantMessageID, ); updateErr != nil { h.logger.Warn("更新取消后的助手消息失败", zap.Error(updateErr)) } @@ -1604,9 +1604,9 @@ func (h *AgentHandler) AgentLoopStream(c *gin.Context) { if assistantMessageID != "" { if _, updateErr := h.db.Exec( - "UPDATE messages SET content = ? WHERE id = ?", + "UPDATE messages SET content = ?, updated_at = ? WHERE id = ?", timeoutMsg, - assistantMessageID, + time.Now(), assistantMessageID, ); updateErr != nil { h.logger.Warn("更新超时后的助手消息失败", zap.Error(updateErr)) } @@ -1639,9 +1639,9 @@ func (h *AgentHandler) AgentLoopStream(c *gin.Context) { if assistantMessageID != "" { if _, updateErr := h.db.Exec( - "UPDATE messages SET content = ? WHERE id = ?", + "UPDATE messages SET content = ?, updated_at = ? WHERE id = ?", errorMsg, - assistantMessageID, + time.Now(), assistantMessageID, ); updateErr != nil { h.logger.Warn("更新失败后的助手消息失败", zap.Error(updateErr)) } @@ -1671,7 +1671,7 @@ func (h *AgentHandler) AgentLoopStream(c *gin.Context) { // 更新助手消息内容 if assistantMsg != nil { _, err = h.db.Exec( - "UPDATE messages SET content = ?, mcp_execution_ids = ? WHERE id = ?", + "UPDATE messages SET content = ?, mcp_execution_ids = ?, updated_at = ? WHERE id = ?", result.Response, func() string { if len(result.MCPExecutionIDs) > 0 { @@ -1680,7 +1680,7 @@ func (h *AgentHandler) AgentLoopStream(c *gin.Context) { } return "" }(), - assistantMessageID, + time.Now(), assistantMessageID, ) if err != nil { h.logger.Error("更新助手消息失败", zap.Error(err)) @@ -2448,76 +2448,144 @@ func (h *AgentHandler) executeBatchQueue(queueID string) { if assistantMsg != nil { assistantMessageID = assistantMsg.ID } - progressCallback := h.createProgressCallback(context.Background(), nil, conversationID, assistantMessageID, nil) + // 注意:批量任务没有前端直连的 POST /stream,因此若要支持「刷新后补流」, + // 需要把进度事件镜像到 TaskEventBus(GET /api/agent-loop/task-events 会订阅这里)。 + // progressCallback 将在子任务的 IIFE 内创建,以便拿到 taskCtx/cancelWithCause 与 sendEvent。 + var progressCallback func(eventType, message string, data interface{}) // 执行任务(使用包含角色提示词的finalMessage和角色工具列表) h.logger.Info("执行批量任务", zap.String("queueId", queueID), zap.String("taskId", task.ID), zap.String("message", task.Message), zap.String("role", queue.Role), zap.String("conversationId", conversationID)) - // 单个子任务超时时间:从30分钟调整为6小时,适配长时间渗透/扫描任务 - ctx, cancel := context.WithTimeout(context.Background(), 6*time.Hour) - // 存储取消函数,以便在取消队列时能够取消当前任务 - h.batchTaskManager.SetTaskCancel(queueID, cancel) - // 使用队列配置的角色工具列表(如果为空,表示使用所有工具) - useBatchMulti := false - useEinoSingle := false - batchOrch := "deep" - am := strings.TrimSpace(strings.ToLower(queue.AgentMode)) - if am == "multi" { - am = "deep" - } - if am == "eino_single" { - useEinoSingle = true - } else if batchQueueWantsEino(queue.AgentMode) && h.config != nil && h.config.MultiAgent.Enabled { - useBatchMulti = true - batchOrch = config.NormalizeMultiAgentOrchestration(am) - } else if queue.AgentMode == "" { - // 兼容历史数据:未配置队列代理模式时,沿用旧的系统级开关 - if h.config != nil && h.config.MultiAgent.Enabled && h.config.MultiAgent.BatchUseMultiAgent { + func() { + // 与对话流式接口一致:同 conversationId 仅允许一个运行中任务,并支持 /api/agent-loop/cancel 与会话锁对齐。 + baseCtx, cancelWithCause := context.WithCancelCause(context.Background()) + // 单个子任务超时:6 小时(与原先 WithTimeout(Background) 一致) + taskCtx, timeoutCancel := context.WithTimeout(baseCtx, 6*time.Hour) + + registered := false + finishStatus := "completed" + + defer func() { + h.batchTaskManager.SetTaskCancel(queueID, nil) + timeoutCancel() + if registered { + // 与流式接口保持一致:结束前补一个 done,便于前端 task-events 侧及时收口 UI。 + if h.taskEventBus != nil { + ev := StreamEvent{Type: "done", Message: "", Data: map[string]interface{}{"conversationId": conversationID}} + if b, err := json.Marshal(ev); err == nil { + h.taskEventBus.Publish(conversationID, append(append([]byte("data: "), b...), '\n', '\n')) + } + } + h.tasks.FinishTask(conversationID, finishStatus) + } + cancelWithCause(nil) + }() + + // 事件镜像:只发布到 TaskEventBus,不直接写 HTTP Response(用于刷新后的补流)。 + sendEvent := func(eventType, message string, data interface{}) { + if h.taskEventBus == nil { + return + } + ev := StreamEvent{Type: eventType, Message: message, Data: data} + b, err := json.Marshal(ev) + if err != nil { + b = []byte(`{"type":"error","message":"marshal failed"}`) + } + line := make([]byte, 0, len(b)+8) + line = append(line, []byte("data: ")...) + line = append(line, b...) + line = append(line, '\n', '\n') + h.taskEventBus.Publish(conversationID, line) + } + + if _, err := h.tasks.StartTask(conversationID, task.Message, cancelWithCause); err != nil { + h.logger.Warn("批量队列子任务注册会话运行状态失败", + zap.String("queueId", queueID), + zap.String("taskId", task.ID), + zap.String("conversationId", conversationID), + zap.Error(err)) + failMsg := err.Error() + if errors.Is(err, ErrTaskAlreadyRunning) { + failMsg = "会话已有任务正在执行,无法在该会话上并行启动批量子任务" + } + h.batchTaskManager.UpdateTaskStatus(queueID, task.ID, "failed", "", failMsg) + return + } + registered = true + // 存储取消函数:暂停队列时取消子任务 context(与原先语义一致) + h.batchTaskManager.SetTaskCancel(queueID, timeoutCancel) + + // 创建进度回调函数:写 DB + 镜像到 task-events,支持刷新后继续流式展示。 + progressCallback = h.createProgressCallback(taskCtx, cancelWithCause, conversationID, assistantMessageID, sendEvent) + + // 使用队列配置的角色工具列表(如果为空,表示使用所有工具) + useBatchMulti := false + useEinoSingle := false + batchOrch := "deep" + am := strings.TrimSpace(strings.ToLower(queue.AgentMode)) + if am == "multi" { + am = "deep" + } + if am == "eino_single" { + useEinoSingle = true + } else if batchQueueWantsEino(queue.AgentMode) && h.config != nil && h.config.MultiAgent.Enabled { useBatchMulti = true - batchOrch = "deep" + batchOrch = config.NormalizeMultiAgentOrchestration(am) + } else if queue.AgentMode == "" { + // 兼容历史数据:未配置队列代理模式时,沿用旧的系统级开关 + if h.config != nil && h.config.MultiAgent.Enabled && h.config.MultiAgent.BatchUseMultiAgent { + useBatchMulti = true + batchOrch = "deep" + } } - } - useRunResult := useBatchMulti || useEinoSingle - var result *agent.AgentLoopResult - var resultMA *multiagent.RunResult - var runErr error - switch { - case useBatchMulti: - resultMA, runErr = multiagent.RunDeepAgent(ctx, h.config, &h.config.MultiAgent, h.agent, h.logger, conversationID, finalMessage, []agent.ChatMessage{}, roleTools, progressCallback, h.agentsMarkdownDir, batchOrch) - case useEinoSingle: - if h.config == nil { - runErr = fmt.Errorf("服务器配置未加载") - } else { - resultMA, runErr = multiagent.RunEinoSingleChatModelAgent(ctx, h.config, &h.config.MultiAgent, h.agent, h.logger, conversationID, finalMessage, []agent.ChatMessage{}, roleTools, progressCallback) + useRunResult := useBatchMulti || useEinoSingle + var result *agent.AgentLoopResult + var resultMA *multiagent.RunResult + var runErr error + switch { + case useBatchMulti: + resultMA, runErr = multiagent.RunDeepAgent(taskCtx, h.config, &h.config.MultiAgent, h.agent, h.logger, conversationID, finalMessage, []agent.ChatMessage{}, roleTools, progressCallback, h.agentsMarkdownDir, batchOrch) + case useEinoSingle: + if h.config == nil { + runErr = fmt.Errorf("服务器配置未加载") + } else { + resultMA, runErr = multiagent.RunEinoSingleChatModelAgent(taskCtx, h.config, &h.config.MultiAgent, h.agent, h.logger, conversationID, finalMessage, []agent.ChatMessage{}, roleTools, progressCallback) + } + default: + result, runErr = h.agent.AgentLoopWithProgress(taskCtx, finalMessage, []agent.ChatMessage{}, conversationID, progressCallback, roleTools) } - default: - result, runErr = h.agent.AgentLoopWithProgress(ctx, finalMessage, []agent.ChatMessage{}, conversationID, progressCallback, roleTools) - } - // 任务执行完成,清理取消函数 - h.batchTaskManager.SetTaskCancel(queueID, nil) - cancel() - if runErr != nil { - if useRunResult { - h.persistEinoAgentTraceForResume(conversationID, resultMA) - } - // 检查是否是取消错误 - // 1. 直接检查是否是 context.Canceled(包括包装后的错误) - // 2. 检查错误消息中是否包含"context canceled"或"cancelled"关键字 - // 3. 检查 result.Response 中是否包含取消相关的消息 - errStr := runErr.Error() - partialResp := "" - if useRunResult && resultMA != nil { - partialResp = resultMA.Response - } else if result != nil { - partialResp = result.Response - } - isCancelled := errors.Is(runErr, context.Canceled) || - strings.Contains(strings.ToLower(errStr), "context canceled") || - strings.Contains(strings.ToLower(errStr), "context cancelled") || - (partialResp != "" && (strings.Contains(partialResp, "任务已被取消") || strings.Contains(partialResp, "任务执行中断"))) + if runErr != nil { + if useRunResult { + h.persistEinoAgentTraceForResume(conversationID, resultMA) + } + // 检查是否是取消错误 + // 1. 直接检查是否是 context.Canceled(包括包装后的错误) + // 2. 检查错误消息中是否包含"context canceled"或"cancelled"关键字 + // 3. 检查 result.Response 中是否包含取消相关的消息 + errStr := runErr.Error() + partialResp := "" + if useRunResult && resultMA != nil { + partialResp = resultMA.Response + } else if result != nil { + partialResp = result.Response + } + isCancelled := errors.Is(context.Cause(baseCtx), ErrTaskCancelled) || + errors.Is(runErr, context.Canceled) || + strings.Contains(strings.ToLower(errStr), "context canceled") || + strings.Contains(strings.ToLower(errStr), "context cancelled") || + (partialResp != "" && (strings.Contains(partialResp, "任务已被取消") || strings.Contains(partialResp, "任务执行中断"))) + isTimeout := errors.Is(runErr, context.DeadlineExceeded) || errors.Is(context.Cause(taskCtx), context.DeadlineExceeded) - if isCancelled { + if isTimeout { + finishStatus = "timeout" + } else if isCancelled { + finishStatus = "cancelled" + } else { + finishStatus = "failed" + } + + if isCancelled { h.logger.Info("批量任务被取消", zap.String("queueId", queueID), zap.String("taskId", task.ID), zap.String("conversationId", conversationID)) cancelMsg := "任务已被用户取消,后续操作已停止。" // 如果执行结果中有更具体的取消消息,使用它 @@ -2527,9 +2595,9 @@ func (h *AgentHandler) executeBatchQueue(queueID string) { // 更新助手消息内容 if assistantMessageID != "" { if _, updateErr := h.db.Exec( - "UPDATE messages SET content = ? WHERE id = ?", + "UPDATE messages SET content = ?, updated_at = ? WHERE id = ?", cancelMsg, - assistantMessageID, + time.Now(), assistantMessageID, ); updateErr != nil { h.logger.Warn("更新取消后的助手消息失败", zap.String("queueId", queueID), zap.String("taskId", task.ID), zap.Error(updateErr)) } @@ -2561,9 +2629,9 @@ func (h *AgentHandler) executeBatchQueue(queueID string) { // 更新助手消息内容 if assistantMessageID != "" { if _, updateErr := h.db.Exec( - "UPDATE messages SET content = ? WHERE id = ?", + "UPDATE messages SET content = ?, updated_at = ? WHERE id = ?", errorMsg, - assistantMessageID, + time.Now(), assistantMessageID, ); updateErr != nil { h.logger.Warn("更新失败后的助手消息失败", zap.String("queueId", queueID), zap.String("taskId", task.ID), zap.Error(updateErr)) } @@ -2600,10 +2668,10 @@ func (h *AgentHandler) executeBatchQueue(queueID string) { mcpIDsJSON = string(jsonData) } if _, updateErr := h.db.Exec( - "UPDATE messages SET content = ?, mcp_execution_ids = ? WHERE id = ?", + "UPDATE messages SET content = ?, mcp_execution_ids = ?, updated_at = ? WHERE id = ?", resText, mcpIDsJSON, - assistantMessageID, + time.Now(), assistantMessageID, ); updateErr != nil { h.logger.Warn("更新助手消息失败", zap.String("queueId", queueID), zap.String("taskId", task.ID), zap.Error(updateErr)) // 如果更新失败,尝试创建新消息 @@ -2632,6 +2700,7 @@ func (h *AgentHandler) executeBatchQueue(queueID string) { // 保存结果 h.batchTaskManager.UpdateTaskStatusWithConversationID(queueID, task.ID, "completed", resText, "", conversationID) } + }() // 移动到下一个任务 h.batchTaskManager.MoveToNextTask(queueID) diff --git a/internal/handler/eino_single_agent.go b/internal/handler/eino_single_agent.go index a3ed3e6c..978dbde9 100644 --- a/internal/handler/eino_single_agent.go +++ b/internal/handler/eino_single_agent.go @@ -136,7 +136,7 @@ func (h *AgentHandler) EinoSingleAgentLoopStream(c *gin.Context) { sendEvent("error", errorMsg, nil) } if assistantMessageID != "" { - _, _ = h.db.Exec("UPDATE messages SET content = ? WHERE id = ?", errorMsg, assistantMessageID) + _, _ = h.db.Exec("UPDATE messages SET content = ?, updated_at = ? WHERE id = ?", errorMsg, time.Now(), assistantMessageID) } sendEvent("done", "", map[string]interface{}{"conversationId": conversationID}) return @@ -182,7 +182,7 @@ func (h *AgentHandler) EinoSingleAgentLoopStream(c *gin.Context) { h.tasks.UpdateTaskStatus(conversationID, taskStatus) cancelMsg := "任务已被用户取消,后续操作已停止。" if assistantMessageID != "" { - _, _ = h.db.Exec("UPDATE messages SET content = ? WHERE id = ?", cancelMsg, assistantMessageID) + _, _ = h.db.Exec("UPDATE messages SET content = ?, updated_at = ? WHERE id = ?", cancelMsg, time.Now(), assistantMessageID) _ = h.db.AddProcessDetail(assistantMessageID, conversationID, "cancelled", cancelMsg, nil) } sendEvent("cancelled", cancelMsg, map[string]interface{}{ @@ -198,7 +198,7 @@ func (h *AgentHandler) EinoSingleAgentLoopStream(c *gin.Context) { h.tasks.UpdateTaskStatus(conversationID, taskStatus) timeoutMsg := "任务执行超时,已自动终止。" if assistantMessageID != "" { - _, _ = h.db.Exec("UPDATE messages SET content = ? WHERE id = ?", timeoutMsg, assistantMessageID) + _, _ = h.db.Exec("UPDATE messages SET content = ?, updated_at = ? WHERE id = ?", timeoutMsg, time.Now(), assistantMessageID) _ = h.db.AddProcessDetail(assistantMessageID, conversationID, "timeout", timeoutMsg, nil) } sendEvent("error", timeoutMsg, map[string]interface{}{ @@ -215,7 +215,7 @@ func (h *AgentHandler) EinoSingleAgentLoopStream(c *gin.Context) { h.tasks.UpdateTaskStatus(conversationID, taskStatus) errMsg := "执行失败: " + runErr.Error() if assistantMessageID != "" { - _, _ = h.db.Exec("UPDATE messages SET content = ? WHERE id = ?", errMsg, assistantMessageID) + _, _ = h.db.Exec("UPDATE messages SET content = ?, updated_at = ? WHERE id = ?", errMsg, time.Now(), assistantMessageID) _ = h.db.AddProcessDetail(assistantMessageID, conversationID, "error", errMsg, nil) } sendEvent("error", errMsg, map[string]interface{}{ @@ -233,9 +233,10 @@ func (h *AgentHandler) EinoSingleAgentLoopStream(c *gin.Context) { mcpIDsJSON = string(jsonData) } _, _ = h.db.Exec( - "UPDATE messages SET content = ?, mcp_execution_ids = ? WHERE id = ?", + "UPDATE messages SET content = ?, mcp_execution_ids = ?, updated_at = ? WHERE id = ?", result.Response, mcpIDsJSON, + time.Now(), assistantMessageID, ) } @@ -319,9 +320,10 @@ func (h *AgentHandler) EinoSingleAgentLoop(c *gin.Context) { mcpIDsJSON = string(jsonData) } _, _ = h.db.Exec( - "UPDATE messages SET content = ?, mcp_execution_ids = ? WHERE id = ?", + "UPDATE messages SET content = ?, mcp_execution_ids = ?, updated_at = ? WHERE id = ?", result.Response, mcpIDsJSON, + time.Now(), prep.AssistantMessageID, ) } diff --git a/internal/handler/multi_agent.go b/internal/handler/multi_agent.go index 8d8d896f..0d248cf4 100644 --- a/internal/handler/multi_agent.go +++ b/internal/handler/multi_agent.go @@ -152,7 +152,7 @@ func (h *AgentHandler) MultiAgentLoopStream(c *gin.Context) { sendEvent("error", errorMsg, nil) } if assistantMessageID != "" { - _, _ = h.db.Exec("UPDATE messages SET content = ? WHERE id = ?", errorMsg, assistantMessageID) + _, _ = h.db.Exec("UPDATE messages SET content = ?, updated_at = ? WHERE id = ?", errorMsg, time.Now(), assistantMessageID) } sendEvent("done", "", map[string]interface{}{"conversationId": conversationID}) return @@ -192,7 +192,7 @@ func (h *AgentHandler) MultiAgentLoopStream(c *gin.Context) { h.tasks.UpdateTaskStatus(conversationID, taskStatus) cancelMsg := "任务已被用户取消,后续操作已停止。" if assistantMessageID != "" { - _, _ = h.db.Exec("UPDATE messages SET content = ? WHERE id = ?", cancelMsg, assistantMessageID) + _, _ = h.db.Exec("UPDATE messages SET content = ?, updated_at = ? WHERE id = ?", cancelMsg, time.Now(), assistantMessageID) _ = h.db.AddProcessDetail(assistantMessageID, conversationID, "cancelled", cancelMsg, nil) } sendEvent("cancelled", cancelMsg, map[string]interface{}{ @@ -208,7 +208,7 @@ func (h *AgentHandler) MultiAgentLoopStream(c *gin.Context) { h.tasks.UpdateTaskStatus(conversationID, taskStatus) timeoutMsg := "任务执行超时,已自动终止。" if assistantMessageID != "" { - _, _ = h.db.Exec("UPDATE messages SET content = ? WHERE id = ?", timeoutMsg, assistantMessageID) + _, _ = h.db.Exec("UPDATE messages SET content = ?, updated_at = ? WHERE id = ?", timeoutMsg, time.Now(), assistantMessageID) _ = h.db.AddProcessDetail(assistantMessageID, conversationID, "timeout", timeoutMsg, nil) } sendEvent("error", timeoutMsg, map[string]interface{}{ @@ -225,7 +225,7 @@ func (h *AgentHandler) MultiAgentLoopStream(c *gin.Context) { h.tasks.UpdateTaskStatus(conversationID, taskStatus) errMsg := "执行失败: " + runErr.Error() if assistantMessageID != "" { - _, _ = h.db.Exec("UPDATE messages SET content = ? WHERE id = ?", errMsg, assistantMessageID) + _, _ = h.db.Exec("UPDATE messages SET content = ?, updated_at = ? WHERE id = ?", errMsg, time.Now(), assistantMessageID) _ = h.db.AddProcessDetail(assistantMessageID, conversationID, "error", errMsg, nil) } sendEvent("error", errMsg, map[string]interface{}{ @@ -243,9 +243,10 @@ func (h *AgentHandler) MultiAgentLoopStream(c *gin.Context) { mcpIDsJSON = string(jsonData) } _, _ = h.db.Exec( - "UPDATE messages SET content = ?, mcp_execution_ids = ? WHERE id = ?", + "UPDATE messages SET content = ?, mcp_execution_ids = ?, updated_at = ? WHERE id = ?", result.Response, mcpIDsJSON, + time.Now(), assistantMessageID, ) } @@ -323,7 +324,7 @@ func (h *AgentHandler) MultiAgentLoop(c *gin.Context) { h.logger.Error("Eino DeepAgent 执行失败", zap.Error(runErr)) errMsg := "执行失败: " + runErr.Error() if prep.AssistantMessageID != "" { - _, _ = h.db.Exec("UPDATE messages SET content = ? WHERE id = ?", errMsg, prep.AssistantMessageID) + _, _ = h.db.Exec("UPDATE messages SET content = ?, updated_at = ? WHERE id = ?", errMsg, time.Now(), prep.AssistantMessageID) } c.JSON(http.StatusInternalServerError, gin.H{"error": errMsg}) return @@ -336,9 +337,10 @@ func (h *AgentHandler) MultiAgentLoop(c *gin.Context) { mcpIDsJSON = string(jsonData) } _, _ = h.db.Exec( - "UPDATE messages SET content = ?, mcp_execution_ids = ? WHERE id = ?", + "UPDATE messages SET content = ?, mcp_execution_ids = ?, updated_at = ? WHERE id = ?", result.Response, mcpIDsJSON, + time.Now(), prep.AssistantMessageID, ) }