mirror of
https://github.com/Ed1s0nZ/CyberStrikeAI.git
synced 2026-05-15 12:58:01 +02:00
Add files via upload
This commit is contained in:
@@ -32,6 +32,7 @@ type Message struct {
|
||||
MCPExecutionIDs []string `json:"mcpExecutionIds,omitempty"`
|
||||
ProcessDetails []map[string]interface{} `json:"processDetails,omitempty"`
|
||||
CreatedAt time.Time `json:"createdAt"`
|
||||
UpdatedAt time.Time `json:"updatedAt"`
|
||||
}
|
||||
|
||||
// CreateConversation 创建新对话
|
||||
@@ -484,6 +485,7 @@ func (db *DB) ConversationHasToolProcessDetails(conversationID string) (bool, er
|
||||
// AddMessage 添加消息
|
||||
func (db *DB) AddMessage(conversationID, role, content string, mcpExecutionIDs []string) (*Message, error) {
|
||||
id := uuid.New().String()
|
||||
now := time.Now()
|
||||
|
||||
var mcpIDsJSON string
|
||||
if len(mcpExecutionIDs) > 0 {
|
||||
@@ -496,8 +498,8 @@ func (db *DB) AddMessage(conversationID, role, content string, mcpExecutionIDs [
|
||||
}
|
||||
|
||||
_, err := db.Exec(
|
||||
"INSERT INTO messages (id, conversation_id, role, content, mcp_execution_ids, created_at) VALUES (?, ?, ?, ?, ?, ?)",
|
||||
id, conversationID, role, content, mcpIDsJSON, time.Now(),
|
||||
"INSERT INTO messages (id, conversation_id, role, content, mcp_execution_ids, created_at, updated_at) VALUES (?, ?, ?, ?, ?, ?, ?)",
|
||||
id, conversationID, role, content, mcpIDsJSON, now, now,
|
||||
)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("添加消息失败: %w", err)
|
||||
@@ -514,7 +516,8 @@ func (db *DB) AddMessage(conversationID, role, content string, mcpExecutionIDs [
|
||||
Role: role,
|
||||
Content: content,
|
||||
MCPExecutionIDs: mcpExecutionIDs,
|
||||
CreatedAt: time.Now(),
|
||||
CreatedAt: now,
|
||||
UpdatedAt: now,
|
||||
}
|
||||
|
||||
return message, nil
|
||||
@@ -523,7 +526,7 @@ func (db *DB) AddMessage(conversationID, role, content string, mcpExecutionIDs [
|
||||
// GetMessages 获取对话的所有消息
|
||||
func (db *DB) GetMessages(conversationID string) ([]Message, error) {
|
||||
rows, err := db.Query(
|
||||
"SELECT id, conversation_id, role, content, mcp_execution_ids, created_at FROM messages WHERE conversation_id = ? ORDER BY created_at ASC",
|
||||
"SELECT id, conversation_id, role, content, mcp_execution_ids, created_at, updated_at FROM messages WHERE conversation_id = ? ORDER BY created_at ASC",
|
||||
conversationID,
|
||||
)
|
||||
if err != nil {
|
||||
@@ -536,8 +539,9 @@ func (db *DB) GetMessages(conversationID string) ([]Message, error) {
|
||||
var msg Message
|
||||
var mcpIDsJSON sql.NullString
|
||||
var createdAt string
|
||||
var updatedAt sql.NullString
|
||||
|
||||
if err := rows.Scan(&msg.ID, &msg.ConversationID, &msg.Role, &msg.Content, &mcpIDsJSON, &createdAt); err != nil {
|
||||
if err := rows.Scan(&msg.ID, &msg.ConversationID, &msg.Role, &msg.Content, &mcpIDsJSON, &createdAt, &updatedAt); err != nil {
|
||||
return nil, fmt.Errorf("扫描消息失败: %w", err)
|
||||
}
|
||||
|
||||
@@ -551,6 +555,20 @@ func (db *DB) GetMessages(conversationID string) ([]Message, error) {
|
||||
msg.CreatedAt, _ = time.Parse(time.RFC3339, createdAt)
|
||||
}
|
||||
|
||||
// updated_at 兼容老库:字段不存在/为空时回退为 created_at
|
||||
if updatedAt.Valid && strings.TrimSpace(updatedAt.String) != "" {
|
||||
msg.UpdatedAt, err = time.Parse("2006-01-02 15:04:05.999999999-07:00", updatedAt.String)
|
||||
if err != nil {
|
||||
msg.UpdatedAt, err = time.Parse("2006-01-02 15:04:05", updatedAt.String)
|
||||
}
|
||||
if err != nil {
|
||||
msg.UpdatedAt, _ = time.Parse(time.RFC3339, updatedAt.String)
|
||||
}
|
||||
}
|
||||
if msg.UpdatedAt.IsZero() {
|
||||
msg.UpdatedAt = msg.CreatedAt
|
||||
}
|
||||
|
||||
// 解析MCP执行ID
|
||||
if mcpIDsJSON.Valid && mcpIDsJSON.String != "" {
|
||||
if err := json.Unmarshal([]byte(mcpIDsJSON.String), &msg.MCPExecutionIDs); err != nil {
|
||||
|
||||
@@ -82,6 +82,7 @@ func (db *DB) initTables() error {
|
||||
content TEXT NOT NULL,
|
||||
mcp_execution_ids TEXT,
|
||||
created_at DATETIME NOT NULL,
|
||||
updated_at DATETIME NOT NULL,
|
||||
FOREIGN KEY (conversation_id) REFERENCES conversations(id) ON DELETE CASCADE
|
||||
);`
|
||||
|
||||
@@ -518,6 +519,11 @@ func (db *DB) initTables() error {
|
||||
// 不返回错误,允许继续运行
|
||||
}
|
||||
|
||||
if err := db.migrateMessagesTable(); err != nil {
|
||||
db.logger.Warn("迁移messages表失败", zap.Error(err))
|
||||
// 不返回错误,允许继续运行
|
||||
}
|
||||
|
||||
if err := db.migrateConversationGroupsTable(); err != nil {
|
||||
db.logger.Warn("迁移conversation_groups表失败", zap.Error(err))
|
||||
// 不返回错误,允许继续运行
|
||||
@@ -550,6 +556,33 @@ func (db *DB) initTables() error {
|
||||
return nil
|
||||
}
|
||||
|
||||
// migrateMessagesTable 迁移 messages 表,补充 updated_at 字段。
|
||||
// 语义:updated_at 表示该条消息最后一次被写入/更新的时间(例如助手占位消息在任务结束时更新正文)。
|
||||
func (db *DB) migrateMessagesTable() error {
|
||||
var count int
|
||||
err := db.QueryRow("SELECT COUNT(*) FROM pragma_table_info('messages') WHERE name='updated_at'").Scan(&count)
|
||||
if err != nil {
|
||||
// 如果查询失败,尝试添加字段
|
||||
if _, addErr := db.Exec("ALTER TABLE messages ADD COLUMN updated_at DATETIME"); addErr != nil {
|
||||
errMsg := strings.ToLower(addErr.Error())
|
||||
if !strings.Contains(errMsg, "duplicate column") && !strings.Contains(errMsg, "already exists") {
|
||||
return fmt.Errorf("添加 messages.updated_at 字段失败: %w", addErr)
|
||||
}
|
||||
}
|
||||
} else if count == 0 {
|
||||
if _, err := db.Exec("ALTER TABLE messages ADD COLUMN updated_at DATETIME"); err != nil {
|
||||
errMsg := strings.ToLower(err.Error())
|
||||
if !strings.Contains(errMsg, "duplicate column") && !strings.Contains(errMsg, "already exists") {
|
||||
return fmt.Errorf("添加 messages.updated_at 字段失败: %w", err)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// 回填已有数据:让 updated_at 至少等于 created_at,避免前端出现空/当前时间回退。
|
||||
_, _ = db.Exec("UPDATE messages SET updated_at = created_at WHERE updated_at IS NULL OR updated_at = ''")
|
||||
return nil
|
||||
}
|
||||
|
||||
// migrateConversationsTable 迁移conversations表,添加新字段
|
||||
func (db *DB) migrateConversationsTable() error {
|
||||
// 检查last_react_input字段是否存在
|
||||
|
||||
+152
-83
@@ -728,7 +728,7 @@ func (h *AgentHandler) ProcessMessageForRobot(ctx context.Context, conversationI
|
||||
h.persistEinoAgentTraceForResume(conversationID, resultMA)
|
||||
errMsg := "执行失败: " + errMA.Error()
|
||||
if assistantMessageID != "" {
|
||||
_, _ = h.db.Exec("UPDATE messages SET content = ? WHERE id = ?", errMsg, assistantMessageID)
|
||||
_, _ = h.db.Exec("UPDATE messages SET content = ?, updated_at = ? WHERE id = ?", errMsg, time.Now(), assistantMessageID)
|
||||
_ = h.db.AddProcessDetail(assistantMessageID, conversationID, "error", errMsg, nil)
|
||||
}
|
||||
return "", conversationID, errMA
|
||||
@@ -740,8 +740,8 @@ func (h *AgentHandler) ProcessMessageForRobot(ctx context.Context, conversationI
|
||||
mcpIDsJSON = string(jsonData)
|
||||
}
|
||||
_, err = h.db.Exec(
|
||||
"UPDATE messages SET content = ?, mcp_execution_ids = ? WHERE id = ?",
|
||||
resultMA.Response, mcpIDsJSON, assistantMessageID,
|
||||
"UPDATE messages SET content = ?, mcp_execution_ids = ?, updated_at = ? WHERE id = ?",
|
||||
resultMA.Response, mcpIDsJSON, time.Now(), assistantMessageID,
|
||||
)
|
||||
if err != nil {
|
||||
h.logger.Warn("机器人:更新助手消息失败", zap.Error(err))
|
||||
@@ -761,7 +761,7 @@ func (h *AgentHandler) ProcessMessageForRobot(ctx context.Context, conversationI
|
||||
if err != nil {
|
||||
errMsg := "执行失败: " + err.Error()
|
||||
if assistantMessageID != "" {
|
||||
_, _ = h.db.Exec("UPDATE messages SET content = ? WHERE id = ?", errMsg, assistantMessageID)
|
||||
_, _ = h.db.Exec("UPDATE messages SET content = ?, updated_at = ? WHERE id = ?", errMsg, time.Now(), assistantMessageID)
|
||||
_ = h.db.AddProcessDetail(assistantMessageID, conversationID, "error", errMsg, nil)
|
||||
}
|
||||
return "", conversationID, err
|
||||
@@ -775,8 +775,8 @@ func (h *AgentHandler) ProcessMessageForRobot(ctx context.Context, conversationI
|
||||
mcpIDsJSON = string(jsonData)
|
||||
}
|
||||
_, err = h.db.Exec(
|
||||
"UPDATE messages SET content = ?, mcp_execution_ids = ? WHERE id = ?",
|
||||
result.Response, mcpIDsJSON, assistantMessageID,
|
||||
"UPDATE messages SET content = ?, mcp_execution_ids = ?, updated_at = ? WHERE id = ?",
|
||||
result.Response, mcpIDsJSON, time.Now(), assistantMessageID,
|
||||
)
|
||||
if err != nil {
|
||||
h.logger.Warn("机器人:更新助手消息失败", zap.Error(err))
|
||||
@@ -1515,9 +1515,9 @@ func (h *AgentHandler) AgentLoopStream(c *gin.Context) {
|
||||
// 更新助手消息内容并保存错误详情到数据库
|
||||
if assistantMessageID != "" {
|
||||
if _, updateErr := h.db.Exec(
|
||||
"UPDATE messages SET content = ? WHERE id = ?",
|
||||
"UPDATE messages SET content = ?, updated_at = ? WHERE id = ?",
|
||||
errorMsg,
|
||||
assistantMessageID,
|
||||
time.Now(), assistantMessageID,
|
||||
); updateErr != nil {
|
||||
h.logger.Warn("更新错误后的助手消息失败", zap.Error(updateErr))
|
||||
}
|
||||
@@ -1569,9 +1569,9 @@ func (h *AgentHandler) AgentLoopStream(c *gin.Context) {
|
||||
|
||||
if assistantMessageID != "" {
|
||||
if _, updateErr := h.db.Exec(
|
||||
"UPDATE messages SET content = ? WHERE id = ?",
|
||||
"UPDATE messages SET content = ?, updated_at = ? WHERE id = ?",
|
||||
cancelMsg,
|
||||
assistantMessageID,
|
||||
time.Now(), assistantMessageID,
|
||||
); updateErr != nil {
|
||||
h.logger.Warn("更新取消后的助手消息失败", zap.Error(updateErr))
|
||||
}
|
||||
@@ -1604,9 +1604,9 @@ func (h *AgentHandler) AgentLoopStream(c *gin.Context) {
|
||||
|
||||
if assistantMessageID != "" {
|
||||
if _, updateErr := h.db.Exec(
|
||||
"UPDATE messages SET content = ? WHERE id = ?",
|
||||
"UPDATE messages SET content = ?, updated_at = ? WHERE id = ?",
|
||||
timeoutMsg,
|
||||
assistantMessageID,
|
||||
time.Now(), assistantMessageID,
|
||||
); updateErr != nil {
|
||||
h.logger.Warn("更新超时后的助手消息失败", zap.Error(updateErr))
|
||||
}
|
||||
@@ -1639,9 +1639,9 @@ func (h *AgentHandler) AgentLoopStream(c *gin.Context) {
|
||||
|
||||
if assistantMessageID != "" {
|
||||
if _, updateErr := h.db.Exec(
|
||||
"UPDATE messages SET content = ? WHERE id = ?",
|
||||
"UPDATE messages SET content = ?, updated_at = ? WHERE id = ?",
|
||||
errorMsg,
|
||||
assistantMessageID,
|
||||
time.Now(), assistantMessageID,
|
||||
); updateErr != nil {
|
||||
h.logger.Warn("更新失败后的助手消息失败", zap.Error(updateErr))
|
||||
}
|
||||
@@ -1671,7 +1671,7 @@ func (h *AgentHandler) AgentLoopStream(c *gin.Context) {
|
||||
// 更新助手消息内容
|
||||
if assistantMsg != nil {
|
||||
_, err = h.db.Exec(
|
||||
"UPDATE messages SET content = ?, mcp_execution_ids = ? WHERE id = ?",
|
||||
"UPDATE messages SET content = ?, mcp_execution_ids = ?, updated_at = ? WHERE id = ?",
|
||||
result.Response,
|
||||
func() string {
|
||||
if len(result.MCPExecutionIDs) > 0 {
|
||||
@@ -1680,7 +1680,7 @@ func (h *AgentHandler) AgentLoopStream(c *gin.Context) {
|
||||
}
|
||||
return ""
|
||||
}(),
|
||||
assistantMessageID,
|
||||
time.Now(), assistantMessageID,
|
||||
)
|
||||
if err != nil {
|
||||
h.logger.Error("更新助手消息失败", zap.Error(err))
|
||||
@@ -2448,76 +2448,144 @@ func (h *AgentHandler) executeBatchQueue(queueID string) {
|
||||
if assistantMsg != nil {
|
||||
assistantMessageID = assistantMsg.ID
|
||||
}
|
||||
progressCallback := h.createProgressCallback(context.Background(), nil, conversationID, assistantMessageID, nil)
|
||||
// 注意:批量任务没有前端直连的 POST /stream,因此若要支持「刷新后补流」,
|
||||
// 需要把进度事件镜像到 TaskEventBus(GET /api/agent-loop/task-events 会订阅这里)。
|
||||
// progressCallback 将在子任务的 IIFE 内创建,以便拿到 taskCtx/cancelWithCause 与 sendEvent。
|
||||
var progressCallback func(eventType, message string, data interface{})
|
||||
|
||||
// 执行任务(使用包含角色提示词的finalMessage和角色工具列表)
|
||||
h.logger.Info("执行批量任务", zap.String("queueId", queueID), zap.String("taskId", task.ID), zap.String("message", task.Message), zap.String("role", queue.Role), zap.String("conversationId", conversationID))
|
||||
|
||||
// 单个子任务超时时间:从30分钟调整为6小时,适配长时间渗透/扫描任务
|
||||
ctx, cancel := context.WithTimeout(context.Background(), 6*time.Hour)
|
||||
// 存储取消函数,以便在取消队列时能够取消当前任务
|
||||
h.batchTaskManager.SetTaskCancel(queueID, cancel)
|
||||
// 使用队列配置的角色工具列表(如果为空,表示使用所有工具)
|
||||
useBatchMulti := false
|
||||
useEinoSingle := false
|
||||
batchOrch := "deep"
|
||||
am := strings.TrimSpace(strings.ToLower(queue.AgentMode))
|
||||
if am == "multi" {
|
||||
am = "deep"
|
||||
}
|
||||
if am == "eino_single" {
|
||||
useEinoSingle = true
|
||||
} else if batchQueueWantsEino(queue.AgentMode) && h.config != nil && h.config.MultiAgent.Enabled {
|
||||
useBatchMulti = true
|
||||
batchOrch = config.NormalizeMultiAgentOrchestration(am)
|
||||
} else if queue.AgentMode == "" {
|
||||
// 兼容历史数据:未配置队列代理模式时,沿用旧的系统级开关
|
||||
if h.config != nil && h.config.MultiAgent.Enabled && h.config.MultiAgent.BatchUseMultiAgent {
|
||||
func() {
|
||||
// 与对话流式接口一致:同 conversationId 仅允许一个运行中任务,并支持 /api/agent-loop/cancel 与会话锁对齐。
|
||||
baseCtx, cancelWithCause := context.WithCancelCause(context.Background())
|
||||
// 单个子任务超时:6 小时(与原先 WithTimeout(Background) 一致)
|
||||
taskCtx, timeoutCancel := context.WithTimeout(baseCtx, 6*time.Hour)
|
||||
|
||||
registered := false
|
||||
finishStatus := "completed"
|
||||
|
||||
defer func() {
|
||||
h.batchTaskManager.SetTaskCancel(queueID, nil)
|
||||
timeoutCancel()
|
||||
if registered {
|
||||
// 与流式接口保持一致:结束前补一个 done,便于前端 task-events 侧及时收口 UI。
|
||||
if h.taskEventBus != nil {
|
||||
ev := StreamEvent{Type: "done", Message: "", Data: map[string]interface{}{"conversationId": conversationID}}
|
||||
if b, err := json.Marshal(ev); err == nil {
|
||||
h.taskEventBus.Publish(conversationID, append(append([]byte("data: "), b...), '\n', '\n'))
|
||||
}
|
||||
}
|
||||
h.tasks.FinishTask(conversationID, finishStatus)
|
||||
}
|
||||
cancelWithCause(nil)
|
||||
}()
|
||||
|
||||
// 事件镜像:只发布到 TaskEventBus,不直接写 HTTP Response(用于刷新后的补流)。
|
||||
sendEvent := func(eventType, message string, data interface{}) {
|
||||
if h.taskEventBus == nil {
|
||||
return
|
||||
}
|
||||
ev := StreamEvent{Type: eventType, Message: message, Data: data}
|
||||
b, err := json.Marshal(ev)
|
||||
if err != nil {
|
||||
b = []byte(`{"type":"error","message":"marshal failed"}`)
|
||||
}
|
||||
line := make([]byte, 0, len(b)+8)
|
||||
line = append(line, []byte("data: ")...)
|
||||
line = append(line, b...)
|
||||
line = append(line, '\n', '\n')
|
||||
h.taskEventBus.Publish(conversationID, line)
|
||||
}
|
||||
|
||||
if _, err := h.tasks.StartTask(conversationID, task.Message, cancelWithCause); err != nil {
|
||||
h.logger.Warn("批量队列子任务注册会话运行状态失败",
|
||||
zap.String("queueId", queueID),
|
||||
zap.String("taskId", task.ID),
|
||||
zap.String("conversationId", conversationID),
|
||||
zap.Error(err))
|
||||
failMsg := err.Error()
|
||||
if errors.Is(err, ErrTaskAlreadyRunning) {
|
||||
failMsg = "会话已有任务正在执行,无法在该会话上并行启动批量子任务"
|
||||
}
|
||||
h.batchTaskManager.UpdateTaskStatus(queueID, task.ID, "failed", "", failMsg)
|
||||
return
|
||||
}
|
||||
registered = true
|
||||
// 存储取消函数:暂停队列时取消子任务 context(与原先语义一致)
|
||||
h.batchTaskManager.SetTaskCancel(queueID, timeoutCancel)
|
||||
|
||||
// 创建进度回调函数:写 DB + 镜像到 task-events,支持刷新后继续流式展示。
|
||||
progressCallback = h.createProgressCallback(taskCtx, cancelWithCause, conversationID, assistantMessageID, sendEvent)
|
||||
|
||||
// 使用队列配置的角色工具列表(如果为空,表示使用所有工具)
|
||||
useBatchMulti := false
|
||||
useEinoSingle := false
|
||||
batchOrch := "deep"
|
||||
am := strings.TrimSpace(strings.ToLower(queue.AgentMode))
|
||||
if am == "multi" {
|
||||
am = "deep"
|
||||
}
|
||||
if am == "eino_single" {
|
||||
useEinoSingle = true
|
||||
} else if batchQueueWantsEino(queue.AgentMode) && h.config != nil && h.config.MultiAgent.Enabled {
|
||||
useBatchMulti = true
|
||||
batchOrch = "deep"
|
||||
batchOrch = config.NormalizeMultiAgentOrchestration(am)
|
||||
} else if queue.AgentMode == "" {
|
||||
// 兼容历史数据:未配置队列代理模式时,沿用旧的系统级开关
|
||||
if h.config != nil && h.config.MultiAgent.Enabled && h.config.MultiAgent.BatchUseMultiAgent {
|
||||
useBatchMulti = true
|
||||
batchOrch = "deep"
|
||||
}
|
||||
}
|
||||
}
|
||||
useRunResult := useBatchMulti || useEinoSingle
|
||||
var result *agent.AgentLoopResult
|
||||
var resultMA *multiagent.RunResult
|
||||
var runErr error
|
||||
switch {
|
||||
case useBatchMulti:
|
||||
resultMA, runErr = multiagent.RunDeepAgent(ctx, h.config, &h.config.MultiAgent, h.agent, h.logger, conversationID, finalMessage, []agent.ChatMessage{}, roleTools, progressCallback, h.agentsMarkdownDir, batchOrch)
|
||||
case useEinoSingle:
|
||||
if h.config == nil {
|
||||
runErr = fmt.Errorf("服务器配置未加载")
|
||||
} else {
|
||||
resultMA, runErr = multiagent.RunEinoSingleChatModelAgent(ctx, h.config, &h.config.MultiAgent, h.agent, h.logger, conversationID, finalMessage, []agent.ChatMessage{}, roleTools, progressCallback)
|
||||
useRunResult := useBatchMulti || useEinoSingle
|
||||
var result *agent.AgentLoopResult
|
||||
var resultMA *multiagent.RunResult
|
||||
var runErr error
|
||||
switch {
|
||||
case useBatchMulti:
|
||||
resultMA, runErr = multiagent.RunDeepAgent(taskCtx, h.config, &h.config.MultiAgent, h.agent, h.logger, conversationID, finalMessage, []agent.ChatMessage{}, roleTools, progressCallback, h.agentsMarkdownDir, batchOrch)
|
||||
case useEinoSingle:
|
||||
if h.config == nil {
|
||||
runErr = fmt.Errorf("服务器配置未加载")
|
||||
} else {
|
||||
resultMA, runErr = multiagent.RunEinoSingleChatModelAgent(taskCtx, h.config, &h.config.MultiAgent, h.agent, h.logger, conversationID, finalMessage, []agent.ChatMessage{}, roleTools, progressCallback)
|
||||
}
|
||||
default:
|
||||
result, runErr = h.agent.AgentLoopWithProgress(taskCtx, finalMessage, []agent.ChatMessage{}, conversationID, progressCallback, roleTools)
|
||||
}
|
||||
default:
|
||||
result, runErr = h.agent.AgentLoopWithProgress(ctx, finalMessage, []agent.ChatMessage{}, conversationID, progressCallback, roleTools)
|
||||
}
|
||||
// 任务执行完成,清理取消函数
|
||||
h.batchTaskManager.SetTaskCancel(queueID, nil)
|
||||
cancel()
|
||||
|
||||
if runErr != nil {
|
||||
if useRunResult {
|
||||
h.persistEinoAgentTraceForResume(conversationID, resultMA)
|
||||
}
|
||||
// 检查是否是取消错误
|
||||
// 1. 直接检查是否是 context.Canceled(包括包装后的错误)
|
||||
// 2. 检查错误消息中是否包含"context canceled"或"cancelled"关键字
|
||||
// 3. 检查 result.Response 中是否包含取消相关的消息
|
||||
errStr := runErr.Error()
|
||||
partialResp := ""
|
||||
if useRunResult && resultMA != nil {
|
||||
partialResp = resultMA.Response
|
||||
} else if result != nil {
|
||||
partialResp = result.Response
|
||||
}
|
||||
isCancelled := errors.Is(runErr, context.Canceled) ||
|
||||
strings.Contains(strings.ToLower(errStr), "context canceled") ||
|
||||
strings.Contains(strings.ToLower(errStr), "context cancelled") ||
|
||||
(partialResp != "" && (strings.Contains(partialResp, "任务已被取消") || strings.Contains(partialResp, "任务执行中断")))
|
||||
if runErr != nil {
|
||||
if useRunResult {
|
||||
h.persistEinoAgentTraceForResume(conversationID, resultMA)
|
||||
}
|
||||
// 检查是否是取消错误
|
||||
// 1. 直接检查是否是 context.Canceled(包括包装后的错误)
|
||||
// 2. 检查错误消息中是否包含"context canceled"或"cancelled"关键字
|
||||
// 3. 检查 result.Response 中是否包含取消相关的消息
|
||||
errStr := runErr.Error()
|
||||
partialResp := ""
|
||||
if useRunResult && resultMA != nil {
|
||||
partialResp = resultMA.Response
|
||||
} else if result != nil {
|
||||
partialResp = result.Response
|
||||
}
|
||||
isCancelled := errors.Is(context.Cause(baseCtx), ErrTaskCancelled) ||
|
||||
errors.Is(runErr, context.Canceled) ||
|
||||
strings.Contains(strings.ToLower(errStr), "context canceled") ||
|
||||
strings.Contains(strings.ToLower(errStr), "context cancelled") ||
|
||||
(partialResp != "" && (strings.Contains(partialResp, "任务已被取消") || strings.Contains(partialResp, "任务执行中断")))
|
||||
isTimeout := errors.Is(runErr, context.DeadlineExceeded) || errors.Is(context.Cause(taskCtx), context.DeadlineExceeded)
|
||||
|
||||
if isCancelled {
|
||||
if isTimeout {
|
||||
finishStatus = "timeout"
|
||||
} else if isCancelled {
|
||||
finishStatus = "cancelled"
|
||||
} else {
|
||||
finishStatus = "failed"
|
||||
}
|
||||
|
||||
if isCancelled {
|
||||
h.logger.Info("批量任务被取消", zap.String("queueId", queueID), zap.String("taskId", task.ID), zap.String("conversationId", conversationID))
|
||||
cancelMsg := "任务已被用户取消,后续操作已停止。"
|
||||
// 如果执行结果中有更具体的取消消息,使用它
|
||||
@@ -2527,9 +2595,9 @@ func (h *AgentHandler) executeBatchQueue(queueID string) {
|
||||
// 更新助手消息内容
|
||||
if assistantMessageID != "" {
|
||||
if _, updateErr := h.db.Exec(
|
||||
"UPDATE messages SET content = ? WHERE id = ?",
|
||||
"UPDATE messages SET content = ?, updated_at = ? WHERE id = ?",
|
||||
cancelMsg,
|
||||
assistantMessageID,
|
||||
time.Now(), assistantMessageID,
|
||||
); updateErr != nil {
|
||||
h.logger.Warn("更新取消后的助手消息失败", zap.String("queueId", queueID), zap.String("taskId", task.ID), zap.Error(updateErr))
|
||||
}
|
||||
@@ -2561,9 +2629,9 @@ func (h *AgentHandler) executeBatchQueue(queueID string) {
|
||||
// 更新助手消息内容
|
||||
if assistantMessageID != "" {
|
||||
if _, updateErr := h.db.Exec(
|
||||
"UPDATE messages SET content = ? WHERE id = ?",
|
||||
"UPDATE messages SET content = ?, updated_at = ? WHERE id = ?",
|
||||
errorMsg,
|
||||
assistantMessageID,
|
||||
time.Now(), assistantMessageID,
|
||||
); updateErr != nil {
|
||||
h.logger.Warn("更新失败后的助手消息失败", zap.String("queueId", queueID), zap.String("taskId", task.ID), zap.Error(updateErr))
|
||||
}
|
||||
@@ -2600,10 +2668,10 @@ func (h *AgentHandler) executeBatchQueue(queueID string) {
|
||||
mcpIDsJSON = string(jsonData)
|
||||
}
|
||||
if _, updateErr := h.db.Exec(
|
||||
"UPDATE messages SET content = ?, mcp_execution_ids = ? WHERE id = ?",
|
||||
"UPDATE messages SET content = ?, mcp_execution_ids = ?, updated_at = ? WHERE id = ?",
|
||||
resText,
|
||||
mcpIDsJSON,
|
||||
assistantMessageID,
|
||||
time.Now(), assistantMessageID,
|
||||
); updateErr != nil {
|
||||
h.logger.Warn("更新助手消息失败", zap.String("queueId", queueID), zap.String("taskId", task.ID), zap.Error(updateErr))
|
||||
// 如果更新失败,尝试创建新消息
|
||||
@@ -2632,6 +2700,7 @@ func (h *AgentHandler) executeBatchQueue(queueID string) {
|
||||
// 保存结果
|
||||
h.batchTaskManager.UpdateTaskStatusWithConversationID(queueID, task.ID, "completed", resText, "", conversationID)
|
||||
}
|
||||
}()
|
||||
|
||||
// 移动到下一个任务
|
||||
h.batchTaskManager.MoveToNextTask(queueID)
|
||||
|
||||
@@ -136,7 +136,7 @@ func (h *AgentHandler) EinoSingleAgentLoopStream(c *gin.Context) {
|
||||
sendEvent("error", errorMsg, nil)
|
||||
}
|
||||
if assistantMessageID != "" {
|
||||
_, _ = h.db.Exec("UPDATE messages SET content = ? WHERE id = ?", errorMsg, assistantMessageID)
|
||||
_, _ = h.db.Exec("UPDATE messages SET content = ?, updated_at = ? WHERE id = ?", errorMsg, time.Now(), assistantMessageID)
|
||||
}
|
||||
sendEvent("done", "", map[string]interface{}{"conversationId": conversationID})
|
||||
return
|
||||
@@ -182,7 +182,7 @@ func (h *AgentHandler) EinoSingleAgentLoopStream(c *gin.Context) {
|
||||
h.tasks.UpdateTaskStatus(conversationID, taskStatus)
|
||||
cancelMsg := "任务已被用户取消,后续操作已停止。"
|
||||
if assistantMessageID != "" {
|
||||
_, _ = h.db.Exec("UPDATE messages SET content = ? WHERE id = ?", cancelMsg, assistantMessageID)
|
||||
_, _ = h.db.Exec("UPDATE messages SET content = ?, updated_at = ? WHERE id = ?", cancelMsg, time.Now(), assistantMessageID)
|
||||
_ = h.db.AddProcessDetail(assistantMessageID, conversationID, "cancelled", cancelMsg, nil)
|
||||
}
|
||||
sendEvent("cancelled", cancelMsg, map[string]interface{}{
|
||||
@@ -198,7 +198,7 @@ func (h *AgentHandler) EinoSingleAgentLoopStream(c *gin.Context) {
|
||||
h.tasks.UpdateTaskStatus(conversationID, taskStatus)
|
||||
timeoutMsg := "任务执行超时,已自动终止。"
|
||||
if assistantMessageID != "" {
|
||||
_, _ = h.db.Exec("UPDATE messages SET content = ? WHERE id = ?", timeoutMsg, assistantMessageID)
|
||||
_, _ = h.db.Exec("UPDATE messages SET content = ?, updated_at = ? WHERE id = ?", timeoutMsg, time.Now(), assistantMessageID)
|
||||
_ = h.db.AddProcessDetail(assistantMessageID, conversationID, "timeout", timeoutMsg, nil)
|
||||
}
|
||||
sendEvent("error", timeoutMsg, map[string]interface{}{
|
||||
@@ -215,7 +215,7 @@ func (h *AgentHandler) EinoSingleAgentLoopStream(c *gin.Context) {
|
||||
h.tasks.UpdateTaskStatus(conversationID, taskStatus)
|
||||
errMsg := "执行失败: " + runErr.Error()
|
||||
if assistantMessageID != "" {
|
||||
_, _ = h.db.Exec("UPDATE messages SET content = ? WHERE id = ?", errMsg, assistantMessageID)
|
||||
_, _ = h.db.Exec("UPDATE messages SET content = ?, updated_at = ? WHERE id = ?", errMsg, time.Now(), assistantMessageID)
|
||||
_ = h.db.AddProcessDetail(assistantMessageID, conversationID, "error", errMsg, nil)
|
||||
}
|
||||
sendEvent("error", errMsg, map[string]interface{}{
|
||||
@@ -233,9 +233,10 @@ func (h *AgentHandler) EinoSingleAgentLoopStream(c *gin.Context) {
|
||||
mcpIDsJSON = string(jsonData)
|
||||
}
|
||||
_, _ = h.db.Exec(
|
||||
"UPDATE messages SET content = ?, mcp_execution_ids = ? WHERE id = ?",
|
||||
"UPDATE messages SET content = ?, mcp_execution_ids = ?, updated_at = ? WHERE id = ?",
|
||||
result.Response,
|
||||
mcpIDsJSON,
|
||||
time.Now(),
|
||||
assistantMessageID,
|
||||
)
|
||||
}
|
||||
@@ -319,9 +320,10 @@ func (h *AgentHandler) EinoSingleAgentLoop(c *gin.Context) {
|
||||
mcpIDsJSON = string(jsonData)
|
||||
}
|
||||
_, _ = h.db.Exec(
|
||||
"UPDATE messages SET content = ?, mcp_execution_ids = ? WHERE id = ?",
|
||||
"UPDATE messages SET content = ?, mcp_execution_ids = ?, updated_at = ? WHERE id = ?",
|
||||
result.Response,
|
||||
mcpIDsJSON,
|
||||
time.Now(),
|
||||
prep.AssistantMessageID,
|
||||
)
|
||||
}
|
||||
|
||||
@@ -152,7 +152,7 @@ func (h *AgentHandler) MultiAgentLoopStream(c *gin.Context) {
|
||||
sendEvent("error", errorMsg, nil)
|
||||
}
|
||||
if assistantMessageID != "" {
|
||||
_, _ = h.db.Exec("UPDATE messages SET content = ? WHERE id = ?", errorMsg, assistantMessageID)
|
||||
_, _ = h.db.Exec("UPDATE messages SET content = ?, updated_at = ? WHERE id = ?", errorMsg, time.Now(), assistantMessageID)
|
||||
}
|
||||
sendEvent("done", "", map[string]interface{}{"conversationId": conversationID})
|
||||
return
|
||||
@@ -192,7 +192,7 @@ func (h *AgentHandler) MultiAgentLoopStream(c *gin.Context) {
|
||||
h.tasks.UpdateTaskStatus(conversationID, taskStatus)
|
||||
cancelMsg := "任务已被用户取消,后续操作已停止。"
|
||||
if assistantMessageID != "" {
|
||||
_, _ = h.db.Exec("UPDATE messages SET content = ? WHERE id = ?", cancelMsg, assistantMessageID)
|
||||
_, _ = h.db.Exec("UPDATE messages SET content = ?, updated_at = ? WHERE id = ?", cancelMsg, time.Now(), assistantMessageID)
|
||||
_ = h.db.AddProcessDetail(assistantMessageID, conversationID, "cancelled", cancelMsg, nil)
|
||||
}
|
||||
sendEvent("cancelled", cancelMsg, map[string]interface{}{
|
||||
@@ -208,7 +208,7 @@ func (h *AgentHandler) MultiAgentLoopStream(c *gin.Context) {
|
||||
h.tasks.UpdateTaskStatus(conversationID, taskStatus)
|
||||
timeoutMsg := "任务执行超时,已自动终止。"
|
||||
if assistantMessageID != "" {
|
||||
_, _ = h.db.Exec("UPDATE messages SET content = ? WHERE id = ?", timeoutMsg, assistantMessageID)
|
||||
_, _ = h.db.Exec("UPDATE messages SET content = ?, updated_at = ? WHERE id = ?", timeoutMsg, time.Now(), assistantMessageID)
|
||||
_ = h.db.AddProcessDetail(assistantMessageID, conversationID, "timeout", timeoutMsg, nil)
|
||||
}
|
||||
sendEvent("error", timeoutMsg, map[string]interface{}{
|
||||
@@ -225,7 +225,7 @@ func (h *AgentHandler) MultiAgentLoopStream(c *gin.Context) {
|
||||
h.tasks.UpdateTaskStatus(conversationID, taskStatus)
|
||||
errMsg := "执行失败: " + runErr.Error()
|
||||
if assistantMessageID != "" {
|
||||
_, _ = h.db.Exec("UPDATE messages SET content = ? WHERE id = ?", errMsg, assistantMessageID)
|
||||
_, _ = h.db.Exec("UPDATE messages SET content = ?, updated_at = ? WHERE id = ?", errMsg, time.Now(), assistantMessageID)
|
||||
_ = h.db.AddProcessDetail(assistantMessageID, conversationID, "error", errMsg, nil)
|
||||
}
|
||||
sendEvent("error", errMsg, map[string]interface{}{
|
||||
@@ -243,9 +243,10 @@ func (h *AgentHandler) MultiAgentLoopStream(c *gin.Context) {
|
||||
mcpIDsJSON = string(jsonData)
|
||||
}
|
||||
_, _ = h.db.Exec(
|
||||
"UPDATE messages SET content = ?, mcp_execution_ids = ? WHERE id = ?",
|
||||
"UPDATE messages SET content = ?, mcp_execution_ids = ?, updated_at = ? WHERE id = ?",
|
||||
result.Response,
|
||||
mcpIDsJSON,
|
||||
time.Now(),
|
||||
assistantMessageID,
|
||||
)
|
||||
}
|
||||
@@ -323,7 +324,7 @@ func (h *AgentHandler) MultiAgentLoop(c *gin.Context) {
|
||||
h.logger.Error("Eino DeepAgent 执行失败", zap.Error(runErr))
|
||||
errMsg := "执行失败: " + runErr.Error()
|
||||
if prep.AssistantMessageID != "" {
|
||||
_, _ = h.db.Exec("UPDATE messages SET content = ? WHERE id = ?", errMsg, prep.AssistantMessageID)
|
||||
_, _ = h.db.Exec("UPDATE messages SET content = ?, updated_at = ? WHERE id = ?", errMsg, time.Now(), prep.AssistantMessageID)
|
||||
}
|
||||
c.JSON(http.StatusInternalServerError, gin.H{"error": errMsg})
|
||||
return
|
||||
@@ -336,9 +337,10 @@ func (h *AgentHandler) MultiAgentLoop(c *gin.Context) {
|
||||
mcpIDsJSON = string(jsonData)
|
||||
}
|
||||
_, _ = h.db.Exec(
|
||||
"UPDATE messages SET content = ?, mcp_execution_ids = ? WHERE id = ?",
|
||||
"UPDATE messages SET content = ?, mcp_execution_ids = ?, updated_at = ? WHERE id = ?",
|
||||
result.Response,
|
||||
mcpIDsJSON,
|
||||
time.Now(),
|
||||
prep.AssistantMessageID,
|
||||
)
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user