diff --git a/internal/database/batch_task.go b/internal/database/batch_task.go index 1fd478b2..8e72ed73 100644 --- a/internal/database/batch_task.go +++ b/internal/database/batch_task.go @@ -507,6 +507,42 @@ func (db *DB) CancelPendingBatchTasks(queueID string, completedAt time.Time) err return nil } +// PrepareBatchSingleTaskRun 准备单条执行:可选重置子任务,并更新队列索引与状态 +func (db *DB) PrepareBatchSingleTaskRun(queueID, taskID string, taskIndex int, resetTask, resumeQueue bool) error { + tx, err := db.Begin() + if err != nil { + return fmt.Errorf("开始事务失败: %w", err) + } + defer tx.Rollback() + + if resetTask { + _, err = tx.Exec( + "UPDATE batch_tasks SET status = ?, conversation_id = NULL, started_at = NULL, completed_at = NULL, error = NULL, result = NULL WHERE queue_id = ? AND id = ?", + "pending", queueID, taskID, + ) + if err != nil { + return fmt.Errorf("重置批量任务状态失败: %w", err) + } + } + + if resumeQueue { + _, err = tx.Exec( + "UPDATE batch_task_queues SET status = ?, current_index = ?, completed_at = NULL, last_run_error = NULL WHERE id = ?", + "paused", taskIndex, queueID, + ) + } else { + _, err = tx.Exec( + "UPDATE batch_task_queues SET current_index = ?, last_run_error = NULL WHERE id = ?", + taskIndex, queueID, + ) + } + if err != nil { + return fmt.Errorf("更新批量任务队列状态失败: %w", err) + } + + return tx.Commit() +} + // DeleteBatchTask 删除批量任务 func (db *DB) DeleteBatchTask(queueID, taskID string) error { _, err := db.Exec( diff --git a/internal/database/conversation.go b/internal/database/conversation.go index ccff1e0e..1803912a 100644 --- a/internal/database/conversation.go +++ b/internal/database/conversation.go @@ -382,26 +382,40 @@ func (db *DB) CountConversations(search string) (int, error) { return count, nil } +func conversationOrderClause(sortBy, tableAlias string) string { + col := "updated_at" + if strings.TrimSpace(strings.ToLower(sortBy)) == "created_at" { + col = "created_at" + } + prefix := tableAlias + if prefix != "" { + prefix += "." + } + return "ORDER BY " + prefix + col + " DESC" +} + // ListConversations 列出所有对话 -func (db *DB) ListConversations(limit, offset int, search string) ([]*Conversation, error) { +func (db *DB) ListConversations(limit, offset int, search, sortBy string) ([]*Conversation, error) { var rows *sql.Rows var err error if search != "" { // 使用 EXISTS 子查询代替 LEFT JOIN + DISTINCT,避免大表笛卡尔积 searchPattern := "%" + search + "%" + orderClause := conversationOrderClause(sortBy, "c") rows, err = db.Query( `SELECT c.id, c.title, COALESCE(c.pinned, 0), c.created_at, c.updated_at, c.project_id FROM conversations c WHERE c.title LIKE ? OR EXISTS (SELECT 1 FROM messages m WHERE m.conversation_id = c.id AND m.content LIKE ?) - ORDER BY c.updated_at DESC + `+orderClause+` LIMIT ? OFFSET ?`, searchPattern, searchPattern, limit, offset, ) } else { + orderClause := conversationOrderClause(sortBy, "") rows, err = db.Query( - "SELECT id, title, COALESCE(pinned, 0), created_at, updated_at, project_id FROM conversations ORDER BY updated_at DESC LIMIT ? OFFSET ?", + "SELECT id, title, COALESCE(pinned, 0), created_at, updated_at, project_id FROM conversations "+orderClause+" LIMIT ? OFFSET ?", limit, offset, ) } @@ -467,11 +481,12 @@ func (db *DB) CountUngroupedConversations() (int, error) { } // ListUngroupedConversations 列出不在任何分组中的对话(最近对话侧栏)。 -func (db *DB) ListUngroupedConversations(limit, offset int) ([]*Conversation, error) { +func (db *DB) ListUngroupedConversations(limit, offset int, sortBy string) ([]*Conversation, error) { + orderClause := conversationOrderClause(sortBy, "c") rows, err := db.Query( `SELECT c.id, c.title, COALESCE(c.pinned, 0), c.created_at, c.updated_at, c.project_id `+ ungroupedConversationsSQL+` - ORDER BY c.updated_at DESC + `+orderClause+` LIMIT ? OFFSET ?`, limit, offset, )