diff --git a/internal/database/database.go b/internal/database/database.go index eb8fe27b..7b39b52e 100644 --- a/internal/database/database.go +++ b/internal/database/database.go @@ -5,6 +5,7 @@ import ( "fmt" "os" "path/filepath" + "sync" "strings" "time" @@ -12,19 +13,106 @@ import ( "go.uber.org/zap" ) +const ( + // SQLite 在 WAL 模式下建议使用较保守的连接数,降低长读快照导致 checkpoint 饥饿的概率。 + sqliteMaxOpenConns = 25 + sqliteMaxIdleConns = 5 + // 以页为单位的自动 checkpoint 触发阈值(默认 1000 页,约 4MB @ 4KB/page)。 + sqliteWALAutoCheckpointPages = 1000 + // 控制 WAL 目标上限,避免异常场景持续膨胀(256MB)。 + sqliteJournalSizeLimitBytes = 256 * 1024 * 1024 + // 定时执行 PASSIVE checkpoint,平滑推进 WAL 回收。 + sqlitePassiveCheckpointInterval = 300 * time.Second +) + // configureDBPool 设置 SQLite 连接池参数,提升并发稳定性 func configureDBPool(db *sql.DB) { - // SQLite 同一时间只允许一个写入者,限制连接数避免 "database is locked" 错误 - db.SetMaxOpenConns(25) - db.SetMaxIdleConns(5) + // SQLite 同一时间只允许一个写入者;过高连接数会放大锁竞争和 WAL 回收延迟。 + db.SetMaxOpenConns(sqliteMaxOpenConns) + db.SetMaxIdleConns(sqliteMaxIdleConns) db.SetConnMaxLifetime(30 * time.Minute) } +// configureSQLitePragmas 调整 WAL 回收行为,降低 -wal 文件长期膨胀风险。 +func configureSQLitePragmas(db *sql.DB) error { + if _, err := db.Exec(fmt.Sprintf("PRAGMA wal_autocheckpoint=%d", sqliteWALAutoCheckpointPages)); err != nil { + return fmt.Errorf("设置 wal_autocheckpoint 失败: %w", err) + } + if _, err := db.Exec(fmt.Sprintf("PRAGMA journal_size_limit=%d", sqliteJournalSizeLimitBytes)); err != nil { + return fmt.Errorf("设置 journal_size_limit 失败: %w", err) + } + return nil +} + // DB 数据库连接 type DB struct { *sql.DB logger *zap.Logger conversationArtifactsDir string + checkpointLoopName string + checkpointStop chan struct{} + checkpointDone chan struct{} + closeOnce sync.Once + closeErr error +} + +// startPassiveCheckpointLoop 启动后台 PASSIVE checkpoint 循环。 +func (db *DB) startPassiveCheckpointLoop(name string) { + if sqlitePassiveCheckpointInterval <= 0 || db == nil || db.DB == nil { + return + } + db.checkpointLoopName = strings.TrimSpace(name) + db.checkpointStop = make(chan struct{}) + db.checkpointDone = make(chan struct{}) + + go func() { + defer close(db.checkpointDone) + ticker := time.NewTicker(sqlitePassiveCheckpointInterval) + defer ticker.Stop() + + // 启动后先尝试一次,尽快回收已有 WAL 堆积。 + db.runPassiveCheckpoint("startup") + for { + select { + case <-db.checkpointStop: + return + case <-ticker.C: + db.runPassiveCheckpoint("ticker") + } + } + }() +} + +// runPassiveCheckpoint 执行一次 PRAGMA wal_checkpoint(PASSIVE)。 +func (db *DB) runPassiveCheckpoint(trigger string) { + if db == nil || db.DB == nil { + return + } + startAt := time.Now() + var busy, logFrames, checkpointed int + err := db.QueryRow("PRAGMA wal_checkpoint(PASSIVE)").Scan(&busy, &logFrames, &checkpointed) + if db.logger == nil { + return + } + fields := []zap.Field{ + zap.String("db", db.checkpointLoopName), + zap.String("trigger", trigger), + zap.Int("busy", busy), + zap.Int("log_frames", logFrames), + zap.Int("checkpointed_frames", checkpointed), + zap.Int64("elapsed_ms", time.Since(startAt).Milliseconds()), + } + if err != nil { + db.logger.Warn("SQLite PASSIVE checkpoint 完成(失败)", + append(fields, zap.Error(err))..., + ) + return + } + if busy > 0 { + db.logger.Info("SQLite PASSIVE checkpoint 完成(部分推进)", fields...) + return + } + db.logger.Info("SQLite PASSIVE checkpoint 完成(成功)", fields...) } // NewDB 创建数据库连接 @@ -37,8 +125,13 @@ func NewDB(dbPath string, logger *zap.Logger) (*DB, error) { configureDBPool(db) if err := db.Ping(); err != nil { + _ = db.Close() return nil, fmt.Errorf("连接数据库失败: %w", err) } + if err := configureSQLitePragmas(db); err != nil { + _ = db.Close() + return nil, fmt.Errorf("配置数据库 PRAGMA 失败: %w", err) + } database := &DB{ DB: db, @@ -54,8 +147,10 @@ func NewDB(dbPath string, logger *zap.Logger) (*DB, error) { // 初始化表 if err := database.initTables(); err != nil { + _ = db.Close() return nil, fmt.Errorf("初始化表失败: %w", err) } + database.startPassiveCheckpointLoop("conversations") return database, nil } @@ -1159,8 +1254,13 @@ func NewKnowledgeDB(dbPath string, logger *zap.Logger) (*DB, error) { configureDBPool(sqlDB) if err := sqlDB.Ping(); err != nil { + _ = sqlDB.Close() return nil, fmt.Errorf("连接知识库数据库失败: %w", err) } + if err := configureSQLitePragmas(sqlDB); err != nil { + _ = sqlDB.Close() + return nil, fmt.Errorf("配置知识库数据库 PRAGMA 失败: %w", err) + } database := &DB{ DB: sqlDB, @@ -1169,8 +1269,10 @@ func NewKnowledgeDB(dbPath string, logger *zap.Logger) (*DB, error) { // 初始化知识库表 if err := database.initKnowledgeTables(); err != nil { + _ = sqlDB.Close() return nil, fmt.Errorf("初始化知识库表失败: %w", err) } + database.startPassiveCheckpointLoop("knowledge") return database, nil } @@ -1284,5 +1386,19 @@ func (db *DB) migrateKnowledgeEmbeddingsColumns() error { // Close 关闭数据库连接 func (db *DB) Close() error { - return db.DB.Close() + if db == nil { + return nil + } + db.closeOnce.Do(func() { + if db.checkpointStop != nil { + close(db.checkpointStop) + if db.checkpointDone != nil { + <-db.checkpointDone + } + } + if db.DB != nil { + db.closeErr = db.DB.Close() + } + }) + return db.closeErr }