mirror of
https://github.com/Ed1s0nZ/CyberStrikeAI.git
synced 2026-05-24 16:34:17 +02:00
Add files via upload
This commit is contained in:
+149
-31
@@ -6,39 +6,75 @@ import (
|
||||
"fmt"
|
||||
"net/http"
|
||||
"strings"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
"cyberstrike-ai/internal/config"
|
||||
"cyberstrike-ai/internal/openai"
|
||||
|
||||
"go.uber.org/zap"
|
||||
"golang.org/x/time/rate"
|
||||
)
|
||||
|
||||
// Embedder 文本嵌入器
|
||||
type Embedder struct {
|
||||
openAIClient *openai.Client
|
||||
config *config.KnowledgeConfig
|
||||
openAIConfig *config.OpenAIConfig // 用于获取API Key
|
||||
logger *zap.Logger
|
||||
openAIClient *openai.Client
|
||||
config *config.KnowledgeConfig
|
||||
openAIConfig *config.OpenAIConfig // 用于获取 API Key
|
||||
logger *zap.Logger
|
||||
rateLimiter *rate.Limiter // 速率限制器
|
||||
rateLimitDelay time.Duration // 请求间隔时间
|
||||
maxRetries int // 最大重试次数
|
||||
retryDelay time.Duration // 重试间隔
|
||||
mu sync.Mutex // 保护 rateLimiter
|
||||
}
|
||||
|
||||
// NewEmbedder 创建新的嵌入器
|
||||
func NewEmbedder(cfg *config.KnowledgeConfig, openAIConfig *config.OpenAIConfig, openAIClient *openai.Client, logger *zap.Logger) *Embedder {
|
||||
// 初始化速率限制器
|
||||
var rateLimiter *rate.Limiter
|
||||
var rateLimitDelay time.Duration
|
||||
|
||||
// 如果配置了 MaxRPM,根据 RPM 计算速率限制
|
||||
if cfg.Indexing.MaxRPM > 0 {
|
||||
rpm := cfg.Indexing.MaxRPM
|
||||
rateLimiter = rate.NewLimiter(rate.Every(time.Minute/time.Duration(rpm)), rpm)
|
||||
logger.Info("知识库索引速率限制已启用", zap.Int("maxRPM", rpm))
|
||||
} else if cfg.Indexing.RateLimitDelayMs > 0 {
|
||||
// 如果没有配置 MaxRPM 但配置了固定延迟,使用固定延迟模式
|
||||
rateLimitDelay = time.Duration(cfg.Indexing.RateLimitDelayMs) * time.Millisecond
|
||||
logger.Info("知识库索引固定延迟已启用", zap.Duration("delay", rateLimitDelay))
|
||||
}
|
||||
|
||||
// 重试配置
|
||||
maxRetries := 3
|
||||
retryDelay := 1000 * time.Millisecond
|
||||
if cfg.Indexing.MaxRetries > 0 {
|
||||
maxRetries = cfg.Indexing.MaxRetries
|
||||
}
|
||||
if cfg.Indexing.RetryDelayMs > 0 {
|
||||
retryDelay = time.Duration(cfg.Indexing.RetryDelayMs) * time.Millisecond
|
||||
}
|
||||
|
||||
return &Embedder{
|
||||
openAIClient: openAIClient,
|
||||
config: cfg,
|
||||
openAIConfig: openAIConfig,
|
||||
logger: logger,
|
||||
openAIClient: openAIClient,
|
||||
config: cfg,
|
||||
openAIConfig: openAIConfig,
|
||||
logger: logger,
|
||||
rateLimiter: rateLimiter,
|
||||
rateLimitDelay: rateLimitDelay,
|
||||
maxRetries: maxRetries,
|
||||
retryDelay: retryDelay,
|
||||
}
|
||||
}
|
||||
|
||||
// EmbeddingRequest OpenAI嵌入请求
|
||||
// EmbeddingRequest OpenAI 嵌入请求
|
||||
type EmbeddingRequest struct {
|
||||
Model string `json:"model"`
|
||||
Input []string `json:"input"`
|
||||
}
|
||||
|
||||
// EmbeddingResponse OpenAI嵌入响应
|
||||
// EmbeddingResponse OpenAI 嵌入响应
|
||||
type EmbeddingResponse struct {
|
||||
Data []EmbeddingData `json:"data"`
|
||||
Error *EmbeddingError `json:"error,omitempty"`
|
||||
@@ -56,12 +92,69 @@ type EmbeddingError struct {
|
||||
Type string `json:"type"`
|
||||
}
|
||||
|
||||
// EmbedText 对文本进行嵌入
|
||||
func (e *Embedder) EmbedText(ctx context.Context, text string) ([]float32, error) {
|
||||
if e.openAIClient == nil {
|
||||
return nil, fmt.Errorf("OpenAI客户端未初始化")
|
||||
// waitRateLimiter 等待速率限制器
|
||||
func (e *Embedder) waitRateLimiter() {
|
||||
e.mu.Lock()
|
||||
defer e.mu.Unlock()
|
||||
|
||||
if e.rateLimiter != nil {
|
||||
// 等待令牌
|
||||
ctx := context.Background()
|
||||
if err := e.rateLimiter.Wait(ctx); err != nil {
|
||||
e.logger.Warn("速率限制器等待失败", zap.Error(err))
|
||||
}
|
||||
}
|
||||
|
||||
if e.rateLimitDelay > 0 {
|
||||
time.Sleep(e.rateLimitDelay)
|
||||
}
|
||||
}
|
||||
|
||||
// EmbedText 对文本进行嵌入(带重试和速率限制)
|
||||
func (e *Embedder) EmbedText(ctx context.Context, text string) ([]float32, error) {
|
||||
if e.openAIClient == nil {
|
||||
return nil, fmt.Errorf("OpenAI 客户端未初始化")
|
||||
}
|
||||
|
||||
var lastErr error
|
||||
for attempt := 0; attempt < e.maxRetries; attempt++ {
|
||||
// 速率限制
|
||||
if attempt > 0 {
|
||||
// 重试时等待更长时间
|
||||
waitTime := e.retryDelay * time.Duration(attempt)
|
||||
e.logger.Debug("重试前等待", zap.Int("attempt", attempt+1), zap.Duration("waitTime", waitTime))
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
return nil, ctx.Err()
|
||||
case <-time.After(waitTime):
|
||||
}
|
||||
} else {
|
||||
e.waitRateLimiter()
|
||||
}
|
||||
|
||||
result, err := e.doEmbedText(ctx, text)
|
||||
if err == nil {
|
||||
return result, nil
|
||||
}
|
||||
|
||||
lastErr = err
|
||||
|
||||
// 检查是否是可重试的错误(429 速率限制、5xx 服务器错误、网络错误)
|
||||
if !e.isRetryableError(err) {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
e.logger.Debug("嵌入请求失败,准备重试",
|
||||
zap.Int("attempt", attempt+1),
|
||||
zap.Int("maxRetries", e.maxRetries),
|
||||
zap.Error(err))
|
||||
}
|
||||
|
||||
return nil, fmt.Errorf("达到最大重试次数 (%d): %v", e.maxRetries, lastErr)
|
||||
}
|
||||
|
||||
// doEmbedText 执行实际的嵌入请求(内部方法)
|
||||
func (e *Embedder) doEmbedText(ctx context.Context, text string) ([]float32, error) {
|
||||
// 使用配置的嵌入模型
|
||||
model := e.config.Embedding.Model
|
||||
if model == "" {
|
||||
@@ -73,7 +166,7 @@ func (e *Embedder) EmbedText(ctx context.Context, text string) ([]float32, error
|
||||
Input: []string{text},
|
||||
}
|
||||
|
||||
// 清理baseURL:去除前后空格和尾部斜杠
|
||||
// 清理 baseURL:去除前后空格和尾部斜杠
|
||||
baseURL := strings.TrimSpace(e.config.Embedding.BaseURL)
|
||||
baseURL = strings.TrimSuffix(baseURL, "/")
|
||||
if baseURL == "" {
|
||||
@@ -83,24 +176,24 @@ func (e *Embedder) EmbedText(ctx context.Context, text string) ([]float32, error
|
||||
// 构建请求
|
||||
body, err := json.Marshal(req)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("序列化请求失败: %w", err)
|
||||
return nil, fmt.Errorf("序列化请求失败:%w", err)
|
||||
}
|
||||
|
||||
requestURL := baseURL + "/embeddings"
|
||||
httpReq, err := http.NewRequestWithContext(ctx, http.MethodPost, requestURL, strings.NewReader(string(body)))
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("创建请求失败: %w", err)
|
||||
return nil, fmt.Errorf("创建请求失败:%w", err)
|
||||
}
|
||||
|
||||
httpReq.Header.Set("Content-Type", "application/json")
|
||||
|
||||
// 使用配置的API Key,如果没有则使用OpenAI配置的
|
||||
|
||||
// 使用配置的 API Key,如果没有则使用 OpenAI 配置的
|
||||
apiKey := strings.TrimSpace(e.config.Embedding.APIKey)
|
||||
if apiKey == "" && e.openAIConfig != nil {
|
||||
apiKey = e.openAIConfig.APIKey
|
||||
}
|
||||
if apiKey == "" {
|
||||
return nil, fmt.Errorf("API Key未配置")
|
||||
return nil, fmt.Errorf("API Key 未配置")
|
||||
}
|
||||
httpReq.Header.Set("Authorization", "Bearer "+apiKey)
|
||||
|
||||
@@ -110,7 +203,7 @@ func (e *Embedder) EmbedText(ctx context.Context, text string) ([]float32, error
|
||||
}
|
||||
resp, err := httpClient.Do(httpReq)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("发送请求失败: %w", err)
|
||||
return nil, fmt.Errorf("发送请求失败:%w", err)
|
||||
}
|
||||
defer resp.Body.Close()
|
||||
|
||||
@@ -132,7 +225,7 @@ func (e *Embedder) EmbedText(ctx context.Context, text string) ([]float32, error
|
||||
if len(requestBodyPreview) > 200 {
|
||||
requestBodyPreview = requestBodyPreview[:200] + "..."
|
||||
}
|
||||
e.logger.Debug("嵌入API请求",
|
||||
e.logger.Debug("嵌入 API 请求",
|
||||
zap.String("url", httpReq.URL.String()),
|
||||
zap.String("model", model),
|
||||
zap.String("requestBody", requestBodyPreview),
|
||||
@@ -148,12 +241,12 @@ func (e *Embedder) EmbedText(ctx context.Context, text string) ([]float32, error
|
||||
if len(bodyPreview) > 500 {
|
||||
bodyPreview = bodyPreview[:500] + "..."
|
||||
}
|
||||
return nil, fmt.Errorf("解析响应失败 (URL: %s, 状态码: %d, 响应长度: %d字节): %w\n请求体: %s\n响应内容预览: %s",
|
||||
return nil, fmt.Errorf("解析响应失败 (URL: %s, 状态码:%d, 响应长度:%d字节): %w\n请求体:%s\n响应内容预览:%s",
|
||||
requestURL, resp.StatusCode, len(bodyBytes), err, requestBodyPreview, bodyPreview)
|
||||
}
|
||||
|
||||
if embeddingResp.Error != nil {
|
||||
return nil, fmt.Errorf("OpenAI API错误 (状态码: %d): 类型=%s, 消息=%s",
|
||||
return nil, fmt.Errorf("OpenAI API 错误 (状态码:%d): 类型=%s, 消息=%s",
|
||||
resp.StatusCode, embeddingResp.Error.Type, embeddingResp.Error.Message)
|
||||
}
|
||||
|
||||
@@ -162,7 +255,7 @@ func (e *Embedder) EmbedText(ctx context.Context, text string) ([]float32, error
|
||||
if len(bodyPreview) > 500 {
|
||||
bodyPreview = bodyPreview[:500] + "..."
|
||||
}
|
||||
return nil, fmt.Errorf("HTTP请求失败 (URL: %s, 状态码: %d): 响应内容=%s", requestURL, resp.StatusCode, bodyPreview)
|
||||
return nil, fmt.Errorf("HTTP 请求失败 (URL: %s, 状态码:%d): 响应内容=%s", requestURL, resp.StatusCode, bodyPreview)
|
||||
}
|
||||
|
||||
if len(embeddingResp.Data) == 0 {
|
||||
@@ -170,11 +263,11 @@ func (e *Embedder) EmbedText(ctx context.Context, text string) ([]float32, error
|
||||
if len(bodyPreview) > 500 {
|
||||
bodyPreview = bodyPreview[:500] + "..."
|
||||
}
|
||||
return nil, fmt.Errorf("未收到嵌入数据 (状态码: %d, 响应长度: %d字节)\n响应内容: %s",
|
||||
return nil, fmt.Errorf("未收到嵌入数据 (状态码:%d, 响应长度:%d字节)\n响应内容:%s",
|
||||
resp.StatusCode, len(bodyBytes), bodyPreview)
|
||||
}
|
||||
|
||||
// 转换为float32
|
||||
// 转换为 float32
|
||||
embedding := make([]float32, len(embeddingResp.Data[0].Embedding))
|
||||
for i, v := range embeddingResp.Data[0].Embedding {
|
||||
embedding[i] = float32(v)
|
||||
@@ -183,23 +276,48 @@ func (e *Embedder) EmbedText(ctx context.Context, text string) ([]float32, error
|
||||
return embedding, nil
|
||||
}
|
||||
|
||||
// isRetryableError 判断是否是可重试的错误
|
||||
func (e *Embedder) isRetryableError(err error) bool {
|
||||
if err == nil {
|
||||
return false
|
||||
}
|
||||
|
||||
errStr := err.Error()
|
||||
|
||||
// 429 速率限制错误
|
||||
if strings.Contains(errStr, "429") || strings.Contains(errStr, "rate limit") {
|
||||
return true
|
||||
}
|
||||
|
||||
// 5xx 服务器错误
|
||||
if strings.Contains(errStr, "500") || strings.Contains(errStr, "502") ||
|
||||
strings.Contains(errStr, "503") || strings.Contains(errStr, "504") {
|
||||
return true
|
||||
}
|
||||
|
||||
// 网络错误
|
||||
if strings.Contains(errStr, "timeout") || strings.Contains(errStr, "connection") ||
|
||||
strings.Contains(errStr, "network") || strings.Contains(errStr, "EOF") {
|
||||
return true
|
||||
}
|
||||
|
||||
return false
|
||||
}
|
||||
|
||||
// EmbedTexts 批量嵌入文本
|
||||
func (e *Embedder) EmbedTexts(ctx context.Context, texts []string) ([][]float32, error) {
|
||||
if len(texts) == 0 {
|
||||
return nil, nil
|
||||
}
|
||||
|
||||
// OpenAI API支持批量,但为了简单起见,我们逐个处理
|
||||
// 实际可以使用批量API以提高效率
|
||||
embeddings := make([][]float32, len(texts))
|
||||
for i, text := range texts {
|
||||
embedding, err := e.EmbedText(ctx, text)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("嵌入文本[%d]失败: %w", i, err)
|
||||
return nil, fmt.Errorf("嵌入文本 [%d] 失败:%w", i, err)
|
||||
}
|
||||
embeddings[i] = embedding
|
||||
}
|
||||
|
||||
return embeddings, nil
|
||||
}
|
||||
|
||||
|
||||
+256
-92
@@ -10,56 +10,104 @@ import (
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
"cyberstrike-ai/internal/config"
|
||||
|
||||
"github.com/google/uuid"
|
||||
"go.uber.org/zap"
|
||||
)
|
||||
|
||||
// Indexer 索引器,负责将知识项分块并向量化
|
||||
type Indexer struct {
|
||||
db *sql.DB
|
||||
embedder *Embedder
|
||||
logger *zap.Logger
|
||||
chunkSize int // 每个块的最大token数(估算)
|
||||
overlap int // 块之间的重叠token数
|
||||
|
||||
db *sql.DB
|
||||
embedder *Embedder
|
||||
logger *zap.Logger
|
||||
chunkSize int // 每个块的最大 token 数(估算)
|
||||
overlap int // 块之间的重叠 token 数
|
||||
maxChunks int // 单个知识项的最大块数量(0 表示不限制)
|
||||
|
||||
// 错误跟踪
|
||||
mu sync.RWMutex
|
||||
lastError string // 最近一次错误信息
|
||||
mu sync.RWMutex
|
||||
lastError string // 最近一次错误信息
|
||||
lastErrorTime time.Time // 最近一次错误时间
|
||||
errorCount int // 连续错误计数
|
||||
errorCount int // 连续错误计数
|
||||
|
||||
// 重建索引状态跟踪
|
||||
rebuildMu sync.RWMutex
|
||||
isRebuilding bool // 是否正在重建索引
|
||||
rebuildTotalItems int // 重建总项数
|
||||
rebuildCurrent int // 当前已处理项数
|
||||
rebuildFailed int // 重建失败项数
|
||||
rebuildStartTime time.Time // 重建开始时间
|
||||
rebuildLastItemID string // 最近处理的项 ID
|
||||
rebuildLastChunks int // 最近处理的项的分块数
|
||||
}
|
||||
|
||||
// NewIndexer 创建新的索引器
|
||||
func NewIndexer(db *sql.DB, embedder *Embedder, logger *zap.Logger) *Indexer {
|
||||
func NewIndexer(db *sql.DB, embedder *Embedder, logger *zap.Logger, indexingCfg *config.IndexingConfig) *Indexer {
|
||||
chunkSize := 512
|
||||
overlap := 50
|
||||
maxChunks := 0
|
||||
if indexingCfg != nil {
|
||||
if indexingCfg.ChunkSize > 0 {
|
||||
chunkSize = indexingCfg.ChunkSize
|
||||
}
|
||||
if indexingCfg.ChunkOverlap >= 0 {
|
||||
overlap = indexingCfg.ChunkOverlap
|
||||
}
|
||||
if indexingCfg.MaxChunksPerItem > 0 {
|
||||
maxChunks = indexingCfg.MaxChunksPerItem
|
||||
}
|
||||
}
|
||||
return &Indexer{
|
||||
db: db,
|
||||
embedder: embedder,
|
||||
logger: logger,
|
||||
chunkSize: 512, // 默认512 tokens
|
||||
overlap: 50, // 默认50 tokens重叠
|
||||
chunkSize: chunkSize,
|
||||
overlap: overlap,
|
||||
maxChunks: maxChunks,
|
||||
}
|
||||
}
|
||||
|
||||
// ChunkText 将文本分块(支持重叠)
|
||||
// ChunkText 将文本分块(支持重叠,保留标题上下文)
|
||||
func (idx *Indexer) ChunkText(text string) []string {
|
||||
// 按Markdown标题分割
|
||||
chunks := idx.splitByMarkdownHeaders(text)
|
||||
// 按 Markdown 标题分割,获取带标题的块
|
||||
sections := idx.splitByMarkdownHeadersWithContent(text)
|
||||
|
||||
// 如果块太大,进一步分割
|
||||
// 处理每个块
|
||||
result := make([]string, 0)
|
||||
for _, chunk := range chunks {
|
||||
if idx.estimateTokens(chunk) <= idx.chunkSize {
|
||||
result = append(result, chunk)
|
||||
for _, section := range sections {
|
||||
// 构建完整的标题路径(如 "SQL 注入 > 检测方法 > 工具扫描")
|
||||
headerPath := strings.Join(section.HeaderPath, " > ")
|
||||
|
||||
// 如果块太大,进一步分割
|
||||
if idx.estimateTokens(section.Content) <= idx.chunkSize {
|
||||
// 块大小合适,直接添加标题前缀
|
||||
if headerPath != "" {
|
||||
result = append(result, fmt.Sprintf("[%s] %s", headerPath, section.Content))
|
||||
} else {
|
||||
result = append(result, section.Content)
|
||||
}
|
||||
} else {
|
||||
// 按段落分割
|
||||
subChunks := idx.splitByParagraphs(chunk)
|
||||
for _, subChunk := range subChunks {
|
||||
if idx.estimateTokens(subChunk) <= idx.chunkSize {
|
||||
result = append(result, subChunk)
|
||||
// 块太大,按段落分割
|
||||
paragraphs := idx.splitByParagraphs(section.Content)
|
||||
for _, para := range paragraphs {
|
||||
if idx.estimateTokens(para) <= idx.chunkSize {
|
||||
// 段落大小合适,添加标题前缀
|
||||
if headerPath != "" {
|
||||
result = append(result, fmt.Sprintf("[%s] %s", headerPath, para))
|
||||
} else {
|
||||
result = append(result, para)
|
||||
}
|
||||
} else {
|
||||
// 按句子分割(支持重叠)
|
||||
chunksWithOverlap := idx.splitBySentencesWithOverlap(subChunk)
|
||||
result = append(result, chunksWithOverlap...)
|
||||
// 段落仍太大,按句子分割(带重叠和标题前缀)
|
||||
sentenceChunks := idx.splitBySentencesWithOverlap(para)
|
||||
for _, chunk := range sentenceChunks {
|
||||
if headerPath != "" {
|
||||
result = append(result, fmt.Sprintf("[%s] %s", headerPath, chunk))
|
||||
} else {
|
||||
result = append(result, chunk)
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -68,43 +116,104 @@ func (idx *Indexer) ChunkText(text string) []string {
|
||||
return result
|
||||
}
|
||||
|
||||
// splitByMarkdownHeaders 按Markdown标题分割
|
||||
func (idx *Indexer) splitByMarkdownHeaders(text string) []string {
|
||||
// 匹配Markdown标题 (# ## ### 等)
|
||||
// Section 表示一个带标题路径的文本块
|
||||
type Section struct {
|
||||
HeaderPath []string // 标题路径(如 ["# SQL 注入", "## 检测方法"])
|
||||
Content string // 块内容
|
||||
}
|
||||
|
||||
// splitByMarkdownHeadersWithContent 按 Markdown 标题分割,返回带标题路径的块
|
||||
func (idx *Indexer) splitByMarkdownHeadersWithContent(text string) []Section {
|
||||
// 匹配 Markdown 标题 (# ## ### 等)
|
||||
headerRegex := regexp.MustCompile(`(?m)^#{1,6}\s+.+$`)
|
||||
|
||||
// 找到所有标题位置
|
||||
matches := headerRegex.FindAllStringIndex(text, -1)
|
||||
if len(matches) == 0 {
|
||||
return []string{text}
|
||||
// 没有标题,返回整个文本
|
||||
return []Section{{HeaderPath: []string{}, Content: text}}
|
||||
}
|
||||
|
||||
chunks := make([]string, 0)
|
||||
sections := make([]Section, 0)
|
||||
currentHeaderPath := []string{}
|
||||
lastPos := 0
|
||||
|
||||
for _, match := range matches {
|
||||
for i, match := range matches {
|
||||
start := match[0]
|
||||
if start > lastPos {
|
||||
chunks = append(chunks, strings.TrimSpace(text[lastPos:start]))
|
||||
end := match[1]
|
||||
|
||||
// 提取当前标题
|
||||
headerLine := strings.TrimSpace(text[start:end])
|
||||
|
||||
// 计算标题层级(# 的数量)
|
||||
level := 0
|
||||
for _, ch := range headerLine {
|
||||
if ch == '#' {
|
||||
level++
|
||||
} else {
|
||||
break
|
||||
}
|
||||
}
|
||||
lastPos = start
|
||||
|
||||
// 更新标题路径
|
||||
// 移除比当前层级深或等于的子标题
|
||||
newPath := make([]string, 0)
|
||||
for _, h := range currentHeaderPath {
|
||||
hLevel := 0
|
||||
for _, ch := range h {
|
||||
if ch == '#' {
|
||||
hLevel++
|
||||
} else {
|
||||
break
|
||||
}
|
||||
}
|
||||
if hLevel < level {
|
||||
newPath = append(newPath, h)
|
||||
}
|
||||
}
|
||||
newPath = append(newPath, headerLine)
|
||||
currentHeaderPath = newPath
|
||||
|
||||
// 提取上一个标题到当前标题之间的内容
|
||||
if start > lastPos {
|
||||
content := strings.TrimSpace(text[lastPos:start])
|
||||
// 使用上一层的标题路径
|
||||
prevPath := make([]string, len(currentHeaderPath))
|
||||
copy(prevPath, currentHeaderPath)
|
||||
if i > 0 {
|
||||
// 去掉当前标题,使用父级路径
|
||||
if len(prevPath) > 0 {
|
||||
prevPath = prevPath[:len(prevPath)-1]
|
||||
}
|
||||
}
|
||||
sections = append(sections, Section{
|
||||
HeaderPath: prevPath,
|
||||
Content: content,
|
||||
})
|
||||
}
|
||||
|
||||
lastPos = end
|
||||
}
|
||||
|
||||
// 添加最后一部分
|
||||
// 添加最后一部分(最后一个标题之后的内容)
|
||||
if lastPos < len(text) {
|
||||
chunks = append(chunks, strings.TrimSpace(text[lastPos:]))
|
||||
content := strings.TrimSpace(text[lastPos:])
|
||||
sections = append(sections, Section{
|
||||
HeaderPath: currentHeaderPath,
|
||||
Content: content,
|
||||
})
|
||||
}
|
||||
|
||||
// 过滤空块
|
||||
result := make([]string, 0)
|
||||
for _, chunk := range chunks {
|
||||
if strings.TrimSpace(chunk) != "" {
|
||||
result = append(result, chunk)
|
||||
result := make([]Section, 0)
|
||||
for _, section := range sections {
|
||||
if strings.TrimSpace(section.Content) != "" {
|
||||
result = append(result, section)
|
||||
}
|
||||
}
|
||||
|
||||
if len(result) == 0 {
|
||||
return []string{text}
|
||||
return []Section{{HeaderPath: []string{}, Content: text}}
|
||||
}
|
||||
|
||||
return result
|
||||
@@ -124,8 +233,12 @@ func (idx *Indexer) splitByParagraphs(text string) []string {
|
||||
|
||||
// splitBySentences 按句子分割(用于内部,不包含重叠逻辑)
|
||||
func (idx *Indexer) splitBySentences(text string) []string {
|
||||
// 简单的句子分割(按句号、问号、感叹号)
|
||||
sentenceRegex := regexp.MustCompile(`[.!?]+\s+`)
|
||||
// 简单的句子分割(按句号、问号、感叹号,支持中英文)
|
||||
// . ! ? = 英文标点
|
||||
// \u3002 = 。(中文句号)
|
||||
// \uFF01 = !(中文叹号)
|
||||
// \uFF1F = ?(中文问号)
|
||||
sentenceRegex := regexp.MustCompile(`[.!?\x{3002}\x{FF01}\x{FF1F}]+`)
|
||||
sentences := sentenceRegex.Split(text, -1)
|
||||
result := make([]string, 0)
|
||||
for _, s := range sentences {
|
||||
@@ -221,13 +334,13 @@ func (idx *Indexer) splitBySentencesSimple(text string) []string {
|
||||
return result
|
||||
}
|
||||
|
||||
// extractLastTokens 从文本末尾提取指定token数量的内容
|
||||
// extractLastTokens 从文本末尾提取指定 token 数量的内容
|
||||
func (idx *Indexer) extractLastTokens(text string, tokenCount int) string {
|
||||
if tokenCount <= 0 || text == "" {
|
||||
return ""
|
||||
}
|
||||
|
||||
// 估算字符数(1 token ≈ 4字符)
|
||||
// 估算字符数(1 token ≈ 4 字符)
|
||||
charCount := tokenCount * 4
|
||||
runes := []rune(text)
|
||||
|
||||
@@ -236,12 +349,11 @@ func (idx *Indexer) extractLastTokens(text string, tokenCount int) string {
|
||||
}
|
||||
|
||||
// 从末尾提取指定数量的字符
|
||||
// 尝试在句子边界处截断,避免截断句子中间
|
||||
startPos := len(runes) - charCount
|
||||
extracted := string(runes[startPos:])
|
||||
|
||||
// 尝试找到第一个句子边界(句号、问号、感叹号后的空格)
|
||||
sentenceBoundary := regexp.MustCompile(`[.!?]+\s+`)
|
||||
// 尝试找到第一个句子边界(支持中英文标点)
|
||||
sentenceBoundary := regexp.MustCompile(`[.!?\x{3002}\x{FF01}\x{FF1F}]+`)
|
||||
matches := sentenceBoundary.FindStringIndex(extracted)
|
||||
if len(matches) > 0 && matches[0] > 0 {
|
||||
// 在句子边界处截断,保留完整句子
|
||||
@@ -251,41 +363,51 @@ func (idx *Indexer) extractLastTokens(text string, tokenCount int) string {
|
||||
return strings.TrimSpace(extracted)
|
||||
}
|
||||
|
||||
// estimateTokens 估算token数(简单估算:1 token ≈ 4字符)
|
||||
// estimateTokens 估算 token 数(简单估算:1 token ≈ 4 字符)
|
||||
func (idx *Indexer) estimateTokens(text string) int {
|
||||
return len([]rune(text)) / 4
|
||||
}
|
||||
|
||||
// IndexItem 索引知识项(分块并向量化)
|
||||
func (idx *Indexer) IndexItem(ctx context.Context, itemID string) error {
|
||||
// 获取知识项(包含category和title,用于向量化)
|
||||
// 获取知识项(包含 category 和 title,用于向量化)
|
||||
var content, category, title string
|
||||
err := idx.db.QueryRow("SELECT content, category, title FROM knowledge_base_items WHERE id = ?", itemID).Scan(&content, &category, &title)
|
||||
if err != nil {
|
||||
return fmt.Errorf("获取知识项失败: %w", err)
|
||||
return fmt.Errorf("获取知识项失败:%w", err)
|
||||
}
|
||||
|
||||
// 删除旧的向量(在 RebuildIndex 中已经统一清空,这里保留是为了单独调用 IndexItem 时的兼容性)
|
||||
_, err = idx.db.Exec("DELETE FROM knowledge_embeddings WHERE item_id = ?", itemID)
|
||||
if err != nil {
|
||||
return fmt.Errorf("删除旧向量失败: %w", err)
|
||||
return fmt.Errorf("删除旧向量失败:%w", err)
|
||||
}
|
||||
|
||||
// 分块
|
||||
chunks := idx.ChunkText(content)
|
||||
|
||||
// 应用最大块数限制
|
||||
if idx.maxChunks > 0 && len(chunks) > idx.maxChunks {
|
||||
idx.logger.Info("知识项块数量超过限制,已截断",
|
||||
zap.String("itemId", itemID),
|
||||
zap.Int("originalChunks", len(chunks)),
|
||||
zap.Int("maxChunks", idx.maxChunks))
|
||||
chunks = chunks[:idx.maxChunks]
|
||||
}
|
||||
|
||||
idx.logger.Info("知识项分块完成", zap.String("itemId", itemID), zap.Int("chunks", len(chunks)))
|
||||
|
||||
// 跟踪该知识项的错误
|
||||
itemErrorCount := 0
|
||||
var firstError error
|
||||
firstErrorChunkIndex := -1
|
||||
|
||||
// 向量化每个块(包含category和title信息,以便向量检索时能匹配到风险类型)
|
||||
|
||||
// 向量化每个块(包含 category 和 title 信息,以便向量检索时能匹配到风险类型)
|
||||
for i, chunk := range chunks {
|
||||
// 将category和title信息包含到向量化的文本中
|
||||
// 格式:"[风险类型: {category}] [标题: {title}]\n{chunk内容}"
|
||||
// 这样向量嵌入就会包含风险类型信息,即使SQL过滤失败,向量相似度也能帮助匹配
|
||||
textForEmbedding := fmt.Sprintf("[风险类型: %s] [标题: %s]\n%s", category, title, chunk)
|
||||
// 将 category 和 title 信息包含到向量化的文本中
|
||||
// 格式:"[风险类型:{category}] [标题:{title}]\n{chunk 内容}"
|
||||
// 这样向量嵌入就会包含风险类型信息,即使 SQL 过滤失败,向量相似度也能帮助匹配
|
||||
textForEmbedding := fmt.Sprintf("[风险类型:%s] [标题:%s]\n%s", category, title, chunk)
|
||||
|
||||
embedding, err := idx.embedder.EmbedText(ctx, textForEmbedding)
|
||||
if err != nil {
|
||||
@@ -305,17 +427,17 @@ func (idx *Indexer) IndexItem(ctx context.Context, itemID string) error {
|
||||
zap.String("chunkPreview", chunkPreview),
|
||||
zap.Error(err),
|
||||
)
|
||||
|
||||
|
||||
// 更新全局错误跟踪
|
||||
errorMsg := fmt.Sprintf("向量化失败 (知识项: %s): %v", itemID, err)
|
||||
errorMsg := fmt.Sprintf("向量化失败 (知识项:%s): %v", itemID, err)
|
||||
idx.mu.Lock()
|
||||
idx.lastError = errorMsg
|
||||
idx.lastErrorTime = time.Now()
|
||||
idx.mu.Unlock()
|
||||
}
|
||||
|
||||
// 如果连续失败2个块,立即停止处理该知识项(降低阈值,更快停止)
|
||||
// 这样可以避免继续浪费API调用,同时也能更快地检测到配置问题
|
||||
|
||||
// 如果连续失败 2 个块,立即停止处理该知识项(降低阈值,更快停止)
|
||||
// 这样可以避免继续浪费 API 调用,同时也能更快地检测到配置问题
|
||||
if itemErrorCount >= 2 {
|
||||
idx.logger.Error("知识项连续向量化失败,停止处理",
|
||||
zap.String("itemId", itemID),
|
||||
@@ -344,6 +466,13 @@ func (idx *Indexer) IndexItem(ctx context.Context, itemID string) error {
|
||||
}
|
||||
|
||||
idx.logger.Info("知识项索引完成", zap.String("itemId", itemID), zap.Int("chunks", len(chunks)))
|
||||
|
||||
// 更新重建状态中的最近处理信息
|
||||
idx.rebuildMu.Lock()
|
||||
idx.rebuildLastItemID = itemID
|
||||
idx.rebuildLastChunks = len(chunks)
|
||||
idx.rebuildMu.Unlock()
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
@@ -352,23 +481,38 @@ func (idx *Indexer) HasIndex() (bool, error) {
|
||||
var count int
|
||||
err := idx.db.QueryRow("SELECT COUNT(*) FROM knowledge_embeddings").Scan(&count)
|
||||
if err != nil {
|
||||
return false, fmt.Errorf("检查索引失败: %w", err)
|
||||
return false, fmt.Errorf("检查索引失败:%w", err)
|
||||
}
|
||||
return count > 0, nil
|
||||
}
|
||||
|
||||
// RebuildIndex 重建所有索引
|
||||
func (idx *Indexer) RebuildIndex(ctx context.Context) error {
|
||||
// 设置重建状态
|
||||
idx.rebuildMu.Lock()
|
||||
idx.isRebuilding = true
|
||||
idx.rebuildTotalItems = 0
|
||||
idx.rebuildCurrent = 0
|
||||
idx.rebuildFailed = 0
|
||||
idx.rebuildStartTime = time.Now()
|
||||
idx.rebuildLastItemID = ""
|
||||
idx.rebuildLastChunks = 0
|
||||
idx.rebuildMu.Unlock()
|
||||
|
||||
// 重置错误跟踪
|
||||
idx.mu.Lock()
|
||||
idx.lastError = ""
|
||||
idx.lastErrorTime = time.Time{}
|
||||
idx.errorCount = 0
|
||||
idx.mu.Unlock()
|
||||
|
||||
|
||||
rows, err := idx.db.Query("SELECT id FROM knowledge_base_items")
|
||||
if err != nil {
|
||||
return fmt.Errorf("查询知识项失败: %w", err)
|
||||
// 重置重建状态
|
||||
idx.rebuildMu.Lock()
|
||||
idx.isRebuilding = false
|
||||
idx.rebuildMu.Unlock()
|
||||
return fmt.Errorf("查询知识项失败:%w", err)
|
||||
}
|
||||
defer rows.Close()
|
||||
|
||||
@@ -376,34 +520,36 @@ func (idx *Indexer) RebuildIndex(ctx context.Context) error {
|
||||
for rows.Next() {
|
||||
var id string
|
||||
if err := rows.Scan(&id); err != nil {
|
||||
return fmt.Errorf("扫描知识项ID失败: %w", err)
|
||||
// 重置重建状态
|
||||
idx.rebuildMu.Lock()
|
||||
idx.isRebuilding = false
|
||||
idx.rebuildMu.Unlock()
|
||||
return fmt.Errorf("扫描知识项 ID 失败:%w", err)
|
||||
}
|
||||
itemIDs = append(itemIDs, id)
|
||||
}
|
||||
|
||||
idx.rebuildMu.Lock()
|
||||
idx.rebuildTotalItems = len(itemIDs)
|
||||
idx.rebuildMu.Unlock()
|
||||
|
||||
idx.logger.Info("开始重建索引", zap.Int("totalItems", len(itemIDs)))
|
||||
|
||||
// 在开始重建前,先清空所有旧的向量,确保进度从0开始
|
||||
// 这样 GetIndexStatus 可以准确反映重建进度
|
||||
_, err = idx.db.Exec("DELETE FROM knowledge_embeddings")
|
||||
if err != nil {
|
||||
idx.logger.Warn("清空旧索引失败", zap.Error(err))
|
||||
// 继续执行,即使清空失败也尝试重建
|
||||
} else {
|
||||
idx.logger.Info("已清空旧索引,开始重建")
|
||||
}
|
||||
// 注意:不再清空所有旧索引,而是按增量方式更新
|
||||
// 每个知识项在 IndexItem 中会先删除自己的旧向量,然后插入新向量
|
||||
// 这样配置更新后只重新索引变化的知识项,保留其他知识项的索引
|
||||
|
||||
failedCount := 0
|
||||
consecutiveFailures := 0
|
||||
maxConsecutiveFailures := 2 // 连续失败2次后立即停止(降低阈值,更快停止)
|
||||
maxConsecutiveFailures := 2 // 连续失败 2 次后立即停止(降低阈值,更快停止)
|
||||
firstFailureItemID := ""
|
||||
var firstFailureError error
|
||||
|
||||
|
||||
for i, itemID := range itemIDs {
|
||||
if err := idx.IndexItem(ctx, itemID); err != nil {
|
||||
failedCount++
|
||||
consecutiveFailures++
|
||||
|
||||
|
||||
// 只在第一个失败时记录详细日志
|
||||
if consecutiveFailures == 1 {
|
||||
firstFailureItemID = itemID
|
||||
@@ -414,15 +560,15 @@ func (idx *Indexer) RebuildIndex(ctx context.Context) error {
|
||||
zap.Error(err),
|
||||
)
|
||||
}
|
||||
|
||||
|
||||
// 如果连续失败过多,可能是配置问题,立即停止索引
|
||||
if consecutiveFailures >= maxConsecutiveFailures {
|
||||
errorMsg := fmt.Sprintf("连续 %d 个知识项索引失败,可能存在配置问题(如嵌入模型配置错误、API密钥无效、余额不足等)。第一个失败项: %s, 错误: %v", consecutiveFailures, firstFailureItemID, firstFailureError)
|
||||
errorMsg := fmt.Sprintf("连续 %d 个知识项索引失败,可能存在配置问题(如嵌入模型配置错误、API 密钥无效、余额不足等)。第一个失败项:%s, 错误:%v", consecutiveFailures, firstFailureItemID, firstFailureError)
|
||||
idx.mu.Lock()
|
||||
idx.lastError = errorMsg
|
||||
idx.lastErrorTime = time.Now()
|
||||
idx.mu.Unlock()
|
||||
|
||||
|
||||
idx.logger.Error("连续索引失败次数过多,立即停止索引",
|
||||
zap.Int("consecutiveFailures", consecutiveFailures),
|
||||
zap.Int("totalItems", len(itemIDs)),
|
||||
@@ -430,17 +576,17 @@ func (idx *Indexer) RebuildIndex(ctx context.Context) error {
|
||||
zap.String("firstFailureItemId", firstFailureItemID),
|
||||
zap.Error(firstFailureError),
|
||||
)
|
||||
return fmt.Errorf("连续索引失败次数过多: %v", firstFailureError)
|
||||
return fmt.Errorf("连续索引失败次数过多:%v", firstFailureError)
|
||||
}
|
||||
|
||||
// 如果失败的知识项过多,记录警告但继续处理(降低阈值到30%)
|
||||
|
||||
// 如果失败的知识项过多,记录警告但继续处理(降低阈值到 30%)
|
||||
if failedCount > len(itemIDs)*3/10 && failedCount == len(itemIDs)*3/10+1 {
|
||||
errorMsg := fmt.Sprintf("索引失败的知识项过多 (%d/%d),可能存在配置问题。第一个失败项: %s, 错误: %v", failedCount, len(itemIDs), firstFailureItemID, firstFailureError)
|
||||
errorMsg := fmt.Sprintf("索引失败的知识项过多 (%d/%d),可能存在配置问题。第一个失败项:%s, 错误:%v", failedCount, len(itemIDs), firstFailureItemID, firstFailureError)
|
||||
idx.mu.Lock()
|
||||
idx.lastError = errorMsg
|
||||
idx.lastErrorTime = time.Now()
|
||||
idx.mu.Unlock()
|
||||
|
||||
|
||||
idx.logger.Error("索引失败的知识项过多,可能存在配置问题",
|
||||
zap.Int("failedCount", failedCount),
|
||||
zap.Int("totalItems", len(itemIDs)),
|
||||
@@ -450,20 +596,31 @@ func (idx *Indexer) RebuildIndex(ctx context.Context) error {
|
||||
}
|
||||
continue
|
||||
}
|
||||
|
||||
|
||||
// 成功时重置连续失败计数和第一个失败信息
|
||||
if consecutiveFailures > 0 {
|
||||
consecutiveFailures = 0
|
||||
firstFailureItemID = ""
|
||||
firstFailureError = nil
|
||||
}
|
||||
|
||||
// 减少进度日志频率(每10个或每10%记录一次)
|
||||
|
||||
// 更新重建进度
|
||||
idx.rebuildMu.Lock()
|
||||
idx.rebuildCurrent = i + 1
|
||||
idx.rebuildFailed = failedCount
|
||||
idx.rebuildMu.Unlock()
|
||||
|
||||
// 减少进度日志频率(每 10 个或每 10% 记录一次)
|
||||
if (i+1)%10 == 0 || (len(itemIDs) > 0 && (i+1)*100/len(itemIDs)%10 == 0 && (i+1)*100/len(itemIDs) > 0) {
|
||||
idx.logger.Info("索引进度", zap.Int("current", i+1), zap.Int("total", len(itemIDs)), zap.Int("failed", failedCount))
|
||||
}
|
||||
}
|
||||
|
||||
// 重置重建状态
|
||||
idx.rebuildMu.Lock()
|
||||
idx.isRebuilding = false
|
||||
idx.rebuildMu.Unlock()
|
||||
|
||||
idx.logger.Info("索引重建完成", zap.Int("totalItems", len(itemIDs)), zap.Int("failedCount", failedCount))
|
||||
return nil
|
||||
}
|
||||
@@ -474,3 +631,10 @@ func (idx *Indexer) GetLastError() (string, time.Time) {
|
||||
defer idx.mu.RUnlock()
|
||||
return idx.lastError, idx.lastErrorTime
|
||||
}
|
||||
|
||||
// GetRebuildStatus 获取重建索引状态
|
||||
func (idx *Indexer) GetRebuildStatus() (isRebuilding bool, totalItems int, current int, failed int, lastItemID string, lastChunks int, startTime time.Time) {
|
||||
idx.rebuildMu.RLock()
|
||||
defer idx.rebuildMu.RUnlock()
|
||||
return idx.isRebuilding, idx.rebuildTotalItems, idx.rebuildCurrent, idx.rebuildFailed, idx.rebuildLastItemID, idx.rebuildLastChunks, idx.rebuildStartTime
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user