Add files via upload

This commit is contained in:
公明
2026-05-29 14:16:09 +08:00
committed by GitHub
parent d4bc9646d9
commit 55d6d449cd
+120 -4
View File
@@ -5,6 +5,7 @@ import (
"fmt"
"os"
"path/filepath"
"sync"
"strings"
"time"
@@ -12,19 +13,106 @@ import (
"go.uber.org/zap"
)
const (
// SQLite 在 WAL 模式下建议使用较保守的连接数,降低长读快照导致 checkpoint 饥饿的概率。
sqliteMaxOpenConns = 25
sqliteMaxIdleConns = 5
// 以页为单位的自动 checkpoint 触发阈值(默认 1000 页,约 4MB @ 4KB/page)。
sqliteWALAutoCheckpointPages = 1000
// 控制 WAL 目标上限,避免异常场景持续膨胀(256MB)。
sqliteJournalSizeLimitBytes = 256 * 1024 * 1024
// 定时执行 PASSIVE checkpoint,平滑推进 WAL 回收。
sqlitePassiveCheckpointInterval = 300 * time.Second
)
// configureDBPool 设置 SQLite 连接池参数,提升并发稳定性
func configureDBPool(db *sql.DB) {
// SQLite 同一时间只允许一个写入者,限制连接数避免 "database is locked" 错误
db.SetMaxOpenConns(25)
db.SetMaxIdleConns(5)
// SQLite 同一时间只允许一个写入者;过高连接数会放大锁竞争和 WAL 回收延迟。
db.SetMaxOpenConns(sqliteMaxOpenConns)
db.SetMaxIdleConns(sqliteMaxIdleConns)
db.SetConnMaxLifetime(30 * time.Minute)
}
// configureSQLitePragmas 调整 WAL 回收行为,降低 -wal 文件长期膨胀风险。
func configureSQLitePragmas(db *sql.DB) error {
if _, err := db.Exec(fmt.Sprintf("PRAGMA wal_autocheckpoint=%d", sqliteWALAutoCheckpointPages)); err != nil {
return fmt.Errorf("设置 wal_autocheckpoint 失败: %w", err)
}
if _, err := db.Exec(fmt.Sprintf("PRAGMA journal_size_limit=%d", sqliteJournalSizeLimitBytes)); err != nil {
return fmt.Errorf("设置 journal_size_limit 失败: %w", err)
}
return nil
}
// DB 数据库连接
type DB struct {
*sql.DB
logger *zap.Logger
conversationArtifactsDir string
checkpointLoopName string
checkpointStop chan struct{}
checkpointDone chan struct{}
closeOnce sync.Once
closeErr error
}
// startPassiveCheckpointLoop 启动后台 PASSIVE checkpoint 循环。
func (db *DB) startPassiveCheckpointLoop(name string) {
if sqlitePassiveCheckpointInterval <= 0 || db == nil || db.DB == nil {
return
}
db.checkpointLoopName = strings.TrimSpace(name)
db.checkpointStop = make(chan struct{})
db.checkpointDone = make(chan struct{})
go func() {
defer close(db.checkpointDone)
ticker := time.NewTicker(sqlitePassiveCheckpointInterval)
defer ticker.Stop()
// 启动后先尝试一次,尽快回收已有 WAL 堆积。
db.runPassiveCheckpoint("startup")
for {
select {
case <-db.checkpointStop:
return
case <-ticker.C:
db.runPassiveCheckpoint("ticker")
}
}
}()
}
// runPassiveCheckpoint 执行一次 PRAGMA wal_checkpoint(PASSIVE)。
func (db *DB) runPassiveCheckpoint(trigger string) {
if db == nil || db.DB == nil {
return
}
startAt := time.Now()
var busy, logFrames, checkpointed int
err := db.QueryRow("PRAGMA wal_checkpoint(PASSIVE)").Scan(&busy, &logFrames, &checkpointed)
if db.logger == nil {
return
}
fields := []zap.Field{
zap.String("db", db.checkpointLoopName),
zap.String("trigger", trigger),
zap.Int("busy", busy),
zap.Int("log_frames", logFrames),
zap.Int("checkpointed_frames", checkpointed),
zap.Int64("elapsed_ms", time.Since(startAt).Milliseconds()),
}
if err != nil {
db.logger.Warn("SQLite PASSIVE checkpoint 完成(失败)",
append(fields, zap.Error(err))...,
)
return
}
if busy > 0 {
db.logger.Info("SQLite PASSIVE checkpoint 完成(部分推进)", fields...)
return
}
db.logger.Info("SQLite PASSIVE checkpoint 完成(成功)", fields...)
}
// NewDB 创建数据库连接
@@ -37,8 +125,13 @@ func NewDB(dbPath string, logger *zap.Logger) (*DB, error) {
configureDBPool(db)
if err := db.Ping(); err != nil {
_ = db.Close()
return nil, fmt.Errorf("连接数据库失败: %w", err)
}
if err := configureSQLitePragmas(db); err != nil {
_ = db.Close()
return nil, fmt.Errorf("配置数据库 PRAGMA 失败: %w", err)
}
database := &DB{
DB: db,
@@ -54,8 +147,10 @@ func NewDB(dbPath string, logger *zap.Logger) (*DB, error) {
// 初始化表
if err := database.initTables(); err != nil {
_ = db.Close()
return nil, fmt.Errorf("初始化表失败: %w", err)
}
database.startPassiveCheckpointLoop("conversations")
return database, nil
}
@@ -1159,8 +1254,13 @@ func NewKnowledgeDB(dbPath string, logger *zap.Logger) (*DB, error) {
configureDBPool(sqlDB)
if err := sqlDB.Ping(); err != nil {
_ = sqlDB.Close()
return nil, fmt.Errorf("连接知识库数据库失败: %w", err)
}
if err := configureSQLitePragmas(sqlDB); err != nil {
_ = sqlDB.Close()
return nil, fmt.Errorf("配置知识库数据库 PRAGMA 失败: %w", err)
}
database := &DB{
DB: sqlDB,
@@ -1169,8 +1269,10 @@ func NewKnowledgeDB(dbPath string, logger *zap.Logger) (*DB, error) {
// 初始化知识库表
if err := database.initKnowledgeTables(); err != nil {
_ = sqlDB.Close()
return nil, fmt.Errorf("初始化知识库表失败: %w", err)
}
database.startPassiveCheckpointLoop("knowledge")
return database, nil
}
@@ -1284,5 +1386,19 @@ func (db *DB) migrateKnowledgeEmbeddingsColumns() error {
// Close 关闭数据库连接
func (db *DB) Close() error {
return db.DB.Close()
if db == nil {
return nil
}
db.closeOnce.Do(func() {
if db.checkpointStop != nil {
close(db.checkpointStop)
if db.checkpointDone != nil {
<-db.checkpointDone
}
}
if db.DB != nil {
db.closeErr = db.DB.Close()
}
})
return db.closeErr
}