Delete database directory

This commit is contained in:
公明
2026-05-14 19:24:58 +08:00
committed by GitHub
parent 4ee292cc1f
commit 2167735022
12 changed files with 0 additions and 5655 deletions
-167
View File
@@ -1,167 +0,0 @@
package database
import (
"database/sql"
"encoding/json"
"fmt"
"go.uber.org/zap"
)
// AttackChainNode 攻击链节点
type AttackChainNode struct {
ID string `json:"id"`
Type string `json:"type"` // tool, vulnerability, target, exploit
Label string `json:"label"`
ToolExecutionID string `json:"tool_execution_id,omitempty"`
Metadata map[string]interface{} `json:"metadata"`
RiskScore int `json:"risk_score"`
}
// AttackChainEdge 攻击链边
type AttackChainEdge struct {
ID string `json:"id"`
Source string `json:"source"`
Target string `json:"target"`
Type string `json:"type"` // leads_to, exploits, enables, depends_on
Weight int `json:"weight"`
}
// SaveAttackChainNode 保存攻击链节点
func (db *DB) SaveAttackChainNode(conversationID, nodeID, nodeType, nodeName, toolExecutionID, metadata string, riskScore int) error {
var toolExecID sql.NullString
if toolExecutionID != "" {
toolExecID = sql.NullString{String: toolExecutionID, Valid: true}
}
var metadataJSON sql.NullString
if metadata != "" {
metadataJSON = sql.NullString{String: metadata, Valid: true}
}
query := `
INSERT OR REPLACE INTO attack_chain_nodes
(id, conversation_id, node_type, node_name, tool_execution_id, metadata, risk_score, created_at)
VALUES (?, ?, ?, ?, ?, ?, ?, CURRENT_TIMESTAMP)
`
_, err := db.Exec(query, nodeID, conversationID, nodeType, nodeName, toolExecID, metadataJSON, riskScore)
if err != nil {
db.logger.Error("保存攻击链节点失败", zap.Error(err), zap.String("nodeId", nodeID))
return err
}
return nil
}
// SaveAttackChainEdge 保存攻击链边
func (db *DB) SaveAttackChainEdge(conversationID, edgeID, sourceNodeID, targetNodeID, edgeType string, weight int) error {
query := `
INSERT OR REPLACE INTO attack_chain_edges
(id, conversation_id, source_node_id, target_node_id, edge_type, weight, created_at)
VALUES (?, ?, ?, ?, ?, ?, CURRENT_TIMESTAMP)
`
_, err := db.Exec(query, edgeID, conversationID, sourceNodeID, targetNodeID, edgeType, weight)
if err != nil {
db.logger.Error("保存攻击链边失败", zap.Error(err), zap.String("edgeId", edgeID))
return err
}
return nil
}
// LoadAttackChainNodes 加载攻击链节点
func (db *DB) LoadAttackChainNodes(conversationID string) ([]AttackChainNode, error) {
query := `
SELECT id, node_type, node_name, tool_execution_id, metadata, risk_score
FROM attack_chain_nodes
WHERE conversation_id = ?
ORDER BY created_at ASC
`
rows, err := db.Query(query, conversationID)
if err != nil {
return nil, fmt.Errorf("查询攻击链节点失败: %w", err)
}
defer rows.Close()
var nodes []AttackChainNode
for rows.Next() {
var node AttackChainNode
var toolExecID sql.NullString
var metadataJSON sql.NullString
err := rows.Scan(&node.ID, &node.Type, &node.Label, &toolExecID, &metadataJSON, &node.RiskScore)
if err != nil {
db.logger.Warn("扫描攻击链节点失败", zap.Error(err))
continue
}
if toolExecID.Valid {
node.ToolExecutionID = toolExecID.String
}
if metadataJSON.Valid && metadataJSON.String != "" {
if err := json.Unmarshal([]byte(metadataJSON.String), &node.Metadata); err != nil {
db.logger.Warn("解析节点元数据失败", zap.Error(err))
node.Metadata = make(map[string]interface{})
}
} else {
node.Metadata = make(map[string]interface{})
}
nodes = append(nodes, node)
}
return nodes, nil
}
// LoadAttackChainEdges 加载攻击链边
func (db *DB) LoadAttackChainEdges(conversationID string) ([]AttackChainEdge, error) {
query := `
SELECT id, source_node_id, target_node_id, edge_type, weight
FROM attack_chain_edges
WHERE conversation_id = ?
ORDER BY created_at ASC
`
rows, err := db.Query(query, conversationID)
if err != nil {
return nil, fmt.Errorf("查询攻击链边失败: %w", err)
}
defer rows.Close()
var edges []AttackChainEdge
for rows.Next() {
var edge AttackChainEdge
err := rows.Scan(&edge.ID, &edge.Source, &edge.Target, &edge.Type, &edge.Weight)
if err != nil {
db.logger.Warn("扫描攻击链边失败", zap.Error(err))
continue
}
edges = append(edges, edge)
}
return edges, nil
}
// DeleteAttackChain 删除对话的攻击链数据
func (db *DB) DeleteAttackChain(conversationID string) error {
// 先删除边(因为有外键约束)
_, err := db.Exec("DELETE FROM attack_chain_edges WHERE conversation_id = ?", conversationID)
if err != nil {
db.logger.Warn("删除攻击链边失败", zap.Error(err))
}
// 再删除节点
_, err = db.Exec("DELETE FROM attack_chain_nodes WHERE conversation_id = ?", conversationID)
if err != nil {
db.logger.Error("删除攻击链节点失败", zap.Error(err), zap.String("conversationId", conversationID))
return err
}
return nil
}
-537
View File
@@ -1,537 +0,0 @@
package database
import (
"database/sql"
"fmt"
"strings"
"time"
"go.uber.org/zap"
)
// BatchTaskQueueRow 批量任务队列数据库行
type BatchTaskQueueRow struct {
ID string
Title sql.NullString
Role sql.NullString
AgentMode sql.NullString
ScheduleMode sql.NullString
CronExpr sql.NullString
NextRunAt sql.NullTime
ScheduleEnabled sql.NullInt64
LastScheduleTriggerAt sql.NullTime
LastScheduleError sql.NullString
LastRunError sql.NullString
Status string
CreatedAt time.Time
StartedAt sql.NullTime
CompletedAt sql.NullTime
CurrentIndex int
}
// BatchTaskRow 批量任务数据库行
type BatchTaskRow struct {
ID string
QueueID string
Message string
ConversationID sql.NullString
Status string
StartedAt sql.NullTime
CompletedAt sql.NullTime
Error sql.NullString
Result sql.NullString
}
// CreateBatchQueue 创建批量任务队列
func (db *DB) CreateBatchQueue(
queueID string,
title string,
role string,
agentMode string,
scheduleMode string,
cronExpr string,
nextRunAt *time.Time,
tasks []map[string]interface{},
) error {
tx, err := db.Begin()
if err != nil {
return fmt.Errorf("开始事务失败: %w", err)
}
defer tx.Rollback()
now := time.Now()
var nextRunAtValue interface{}
if nextRunAt != nil {
nextRunAtValue = *nextRunAt
}
_, err = tx.Exec(
"INSERT INTO batch_task_queues (id, title, role, agent_mode, schedule_mode, cron_expr, next_run_at, schedule_enabled, status, created_at, current_index) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)",
queueID, title, role, agentMode, scheduleMode, cronExpr, nextRunAtValue, 1, "pending", now, 0,
)
if err != nil {
return fmt.Errorf("创建批量任务队列失败: %w", err)
}
// 插入任务
for _, task := range tasks {
taskID, ok := task["id"].(string)
if !ok {
continue
}
message, ok := task["message"].(string)
if !ok {
continue
}
_, err = tx.Exec(
"INSERT INTO batch_tasks (id, queue_id, message, status) VALUES (?, ?, ?, ?)",
taskID, queueID, message, "pending",
)
if err != nil {
return fmt.Errorf("创建批量任务失败: %w", err)
}
}
return tx.Commit()
}
// GetBatchQueue 获取批量任务队列
func (db *DB) GetBatchQueue(queueID string) (*BatchTaskQueueRow, error) {
var row BatchTaskQueueRow
var createdAt string
err := db.QueryRow(
"SELECT id, title, role, agent_mode, schedule_mode, cron_expr, next_run_at, schedule_enabled, last_schedule_trigger_at, last_schedule_error, last_run_error, status, created_at, started_at, completed_at, current_index FROM batch_task_queues WHERE id = ?",
queueID,
).Scan(&row.ID, &row.Title, &row.Role, &row.AgentMode, &row.ScheduleMode, &row.CronExpr, &row.NextRunAt, &row.ScheduleEnabled, &row.LastScheduleTriggerAt, &row.LastScheduleError, &row.LastRunError, &row.Status, &createdAt, &row.StartedAt, &row.CompletedAt, &row.CurrentIndex)
if err == sql.ErrNoRows {
return nil, nil
}
if err != nil {
return nil, fmt.Errorf("查询批量任务队列失败: %w", err)
}
parsedTime, parseErr := time.Parse("2006-01-02 15:04:05", createdAt)
if parseErr != nil {
// 尝试其他时间格式
parsedTime, parseErr = time.Parse(time.RFC3339, createdAt)
if parseErr != nil {
db.logger.Warn("解析创建时间失败", zap.String("createdAt", createdAt), zap.Error(parseErr))
parsedTime = time.Now()
}
}
row.CreatedAt = parsedTime
return &row, nil
}
// GetAllBatchQueues 获取所有批量任务队列
func (db *DB) GetAllBatchQueues() ([]*BatchTaskQueueRow, error) {
rows, err := db.Query(
"SELECT id, title, role, agent_mode, schedule_mode, cron_expr, next_run_at, schedule_enabled, last_schedule_trigger_at, last_schedule_error, last_run_error, status, created_at, started_at, completed_at, current_index FROM batch_task_queues ORDER BY created_at DESC",
)
if err != nil {
return nil, fmt.Errorf("查询批量任务队列列表失败: %w", err)
}
defer rows.Close()
var queues []*BatchTaskQueueRow
for rows.Next() {
var row BatchTaskQueueRow
var createdAt string
if err := rows.Scan(&row.ID, &row.Title, &row.Role, &row.AgentMode, &row.ScheduleMode, &row.CronExpr, &row.NextRunAt, &row.ScheduleEnabled, &row.LastScheduleTriggerAt, &row.LastScheduleError, &row.LastRunError, &row.Status, &createdAt, &row.StartedAt, &row.CompletedAt, &row.CurrentIndex); err != nil {
return nil, fmt.Errorf("扫描批量任务队列失败: %w", err)
}
parsedTime, parseErr := time.Parse("2006-01-02 15:04:05", createdAt)
if parseErr != nil {
parsedTime, parseErr = time.Parse(time.RFC3339, createdAt)
if parseErr != nil {
db.logger.Warn("解析创建时间失败", zap.String("createdAt", createdAt), zap.Error(parseErr))
parsedTime = time.Now()
}
}
row.CreatedAt = parsedTime
queues = append(queues, &row)
}
return queues, nil
}
// ListBatchQueues 列出批量任务队列(支持筛选和分页)
func (db *DB) ListBatchQueues(limit, offset int, status, keyword string) ([]*BatchTaskQueueRow, error) {
query := "SELECT id, title, role, agent_mode, schedule_mode, cron_expr, next_run_at, schedule_enabled, last_schedule_trigger_at, last_schedule_error, last_run_error, status, created_at, started_at, completed_at, current_index FROM batch_task_queues WHERE 1=1"
args := []interface{}{}
// 状态筛选
if status != "" && status != "all" {
query += " AND status = ?"
args = append(args, status)
}
// 关键字搜索(搜索队列ID和标题)
if keyword != "" {
query += " AND (id LIKE ? OR title LIKE ?)"
args = append(args, "%"+keyword+"%", "%"+keyword+"%")
}
query += " ORDER BY created_at DESC LIMIT ? OFFSET ?"
args = append(args, limit, offset)
rows, err := db.Query(query, args...)
if err != nil {
return nil, fmt.Errorf("查询批量任务队列列表失败: %w", err)
}
defer rows.Close()
var queues []*BatchTaskQueueRow
for rows.Next() {
var row BatchTaskQueueRow
var createdAt string
if err := rows.Scan(&row.ID, &row.Title, &row.Role, &row.AgentMode, &row.ScheduleMode, &row.CronExpr, &row.NextRunAt, &row.ScheduleEnabled, &row.LastScheduleTriggerAt, &row.LastScheduleError, &row.LastRunError, &row.Status, &createdAt, &row.StartedAt, &row.CompletedAt, &row.CurrentIndex); err != nil {
return nil, fmt.Errorf("扫描批量任务队列失败: %w", err)
}
parsedTime, parseErr := time.Parse("2006-01-02 15:04:05", createdAt)
if parseErr != nil {
parsedTime, parseErr = time.Parse(time.RFC3339, createdAt)
if parseErr != nil {
db.logger.Warn("解析创建时间失败", zap.String("createdAt", createdAt), zap.Error(parseErr))
parsedTime = time.Now()
}
}
row.CreatedAt = parsedTime
queues = append(queues, &row)
}
return queues, nil
}
// CountBatchQueues 统计批量任务队列总数(支持筛选条件)
func (db *DB) CountBatchQueues(status, keyword string) (int, error) {
query := "SELECT COUNT(*) FROM batch_task_queues WHERE 1=1"
args := []interface{}{}
// 状态筛选
if status != "" && status != "all" {
query += " AND status = ?"
args = append(args, status)
}
// 关键字搜索(搜索队列ID和标题)
if keyword != "" {
query += " AND (id LIKE ? OR title LIKE ?)"
args = append(args, "%"+keyword+"%", "%"+keyword+"%")
}
var count int
err := db.QueryRow(query, args...).Scan(&count)
if err != nil {
return 0, fmt.Errorf("统计批量任务队列总数失败: %w", err)
}
return count, nil
}
// GetBatchTasks 获取批量任务队列的所有任务
func (db *DB) GetBatchTasks(queueID string) ([]*BatchTaskRow, error) {
rows, err := db.Query(
"SELECT id, queue_id, message, conversation_id, status, started_at, completed_at, error, result FROM batch_tasks WHERE queue_id = ? ORDER BY id",
queueID,
)
if err != nil {
return nil, fmt.Errorf("查询批量任务失败: %w", err)
}
defer rows.Close()
var tasks []*BatchTaskRow
for rows.Next() {
var task BatchTaskRow
if err := rows.Scan(
&task.ID, &task.QueueID, &task.Message, &task.ConversationID,
&task.Status, &task.StartedAt, &task.CompletedAt, &task.Error, &task.Result,
); err != nil {
return nil, fmt.Errorf("扫描批量任务失败: %w", err)
}
tasks = append(tasks, &task)
}
return tasks, nil
}
// UpdateBatchQueueStatus 更新批量任务队列状态
func (db *DB) UpdateBatchQueueStatus(queueID, status string) error {
var err error
now := time.Now()
if status == "running" {
_, err = db.Exec(
"UPDATE batch_task_queues SET status = ?, started_at = COALESCE(started_at, ?) WHERE id = ?",
status, now, queueID,
)
} else if status == "completed" || status == "cancelled" {
_, err = db.Exec(
"UPDATE batch_task_queues SET status = ?, completed_at = COALESCE(completed_at, ?) WHERE id = ?",
status, now, queueID,
)
} else {
_, err = db.Exec(
"UPDATE batch_task_queues SET status = ? WHERE id = ?",
status, queueID,
)
}
if err != nil {
return fmt.Errorf("更新批量任务队列状态失败: %w", err)
}
return nil
}
// UpdateBatchTaskStatus 更新批量任务状态
func (db *DB) UpdateBatchTaskStatus(queueID, taskID, status string, conversationID, result, errorMsg string) error {
var err error
now := time.Now()
// 构建更新语句
var updates []string
var args []interface{}
updates = append(updates, "status = ?")
args = append(args, status)
if conversationID != "" {
updates = append(updates, "conversation_id = ?")
args = append(args, conversationID)
}
if result != "" {
updates = append(updates, "result = ?")
args = append(args, result)
}
if errorMsg != "" {
updates = append(updates, "error = ?")
args = append(args, errorMsg)
}
if status == "running" {
updates = append(updates, "started_at = COALESCE(started_at, ?)")
args = append(args, now)
}
if status == "completed" || status == "failed" || status == "cancelled" {
updates = append(updates, "completed_at = COALESCE(completed_at, ?)")
args = append(args, now)
}
args = append(args, queueID, taskID)
// 构建SQL语句
sql := "UPDATE batch_tasks SET "
for i, update := range updates {
if i > 0 {
sql += ", "
}
sql += update
}
sql += " WHERE queue_id = ? AND id = ?"
_, err = db.Exec(sql, args...)
if err != nil {
return fmt.Errorf("更新批量任务状态失败: %w", err)
}
return nil
}
// UpdateBatchQueueCurrentIndex 更新批量任务队列的当前索引
func (db *DB) UpdateBatchQueueCurrentIndex(queueID string, currentIndex int) error {
_, err := db.Exec(
"UPDATE batch_task_queues SET current_index = ? WHERE id = ?",
currentIndex, queueID,
)
if err != nil {
return fmt.Errorf("更新批量任务队列当前索引失败: %w", err)
}
return nil
}
// UpdateBatchQueueMetadata 更新批量任务队列标题、角色和代理模式
func (db *DB) UpdateBatchQueueMetadata(queueID, title, role, agentMode string) error {
_, err := db.Exec(
"UPDATE batch_task_queues SET title = ?, role = ?, agent_mode = ? WHERE id = ?",
title, role, agentMode, queueID,
)
if err != nil {
return fmt.Errorf("更新批量任务队列元数据失败: %w", err)
}
return nil
}
// UpdateBatchQueueSchedule 更新批量任务队列调度相关信息
func (db *DB) UpdateBatchQueueSchedule(queueID, scheduleMode, cronExpr string, nextRunAt *time.Time) error {
var nextRunAtValue interface{}
if nextRunAt != nil {
nextRunAtValue = *nextRunAt
}
_, err := db.Exec(
"UPDATE batch_task_queues SET schedule_mode = ?, cron_expr = ?, next_run_at = ? WHERE id = ?",
scheduleMode, cronExpr, nextRunAtValue, queueID,
)
if err != nil {
return fmt.Errorf("更新批量任务调度配置失败: %w", err)
}
return nil
}
// UpdateBatchQueueScheduleEnabled 是否允许 Cron 自动触发(手工「开始执行」不受影响)
func (db *DB) UpdateBatchQueueScheduleEnabled(queueID string, enabled bool) error {
v := 0
if enabled {
v = 1
}
_, err := db.Exec(
"UPDATE batch_task_queues SET schedule_enabled = ? WHERE id = ?",
v, queueID,
)
if err != nil {
return fmt.Errorf("更新批量任务调度开关失败: %w", err)
}
return nil
}
// RecordBatchQueueScheduledTriggerStart 记录一次由调度触发的开始时间并清空调度层错误
func (db *DB) RecordBatchQueueScheduledTriggerStart(queueID string, at time.Time) error {
_, err := db.Exec(
"UPDATE batch_task_queues SET last_schedule_trigger_at = ?, last_schedule_error = NULL WHERE id = ?",
at, queueID,
)
if err != nil {
return fmt.Errorf("记录调度触发时间失败: %w", err)
}
return nil
}
// SetBatchQueueLastScheduleError 调度启动失败等原因(如状态不允许、重置失败)
func (db *DB) SetBatchQueueLastScheduleError(queueID, msg string) error {
_, err := db.Exec(
"UPDATE batch_task_queues SET last_schedule_error = ? WHERE id = ?",
msg, queueID,
)
if err != nil {
return fmt.Errorf("写入调度错误信息失败: %w", err)
}
return nil
}
// SetBatchQueueLastRunError 最近一轮执行中出现的子任务失败摘要(空串表示清空)
func (db *DB) SetBatchQueueLastRunError(queueID, msg string) error {
var v interface{}
if strings.TrimSpace(msg) == "" {
v = nil
} else {
v = msg
}
_, err := db.Exec(
"UPDATE batch_task_queues SET last_run_error = ? WHERE id = ?",
v, queueID,
)
if err != nil {
return fmt.Errorf("写入最近运行错误失败: %w", err)
}
return nil
}
// ResetBatchQueueForRerun 重置队列和任务状态用于下一轮调度执行
func (db *DB) ResetBatchQueueForRerun(queueID string) error {
tx, err := db.Begin()
if err != nil {
return fmt.Errorf("开始事务失败: %w", err)
}
defer tx.Rollback()
_, err = tx.Exec(
"UPDATE batch_task_queues SET status = ?, current_index = 0, started_at = NULL, completed_at = NULL, last_run_error = NULL, last_schedule_error = NULL WHERE id = ?",
"pending", queueID,
)
if err != nil {
return fmt.Errorf("重置批量任务队列状态失败: %w", err)
}
_, err = tx.Exec(
"UPDATE batch_tasks SET status = ?, conversation_id = NULL, started_at = NULL, completed_at = NULL, error = NULL, result = NULL WHERE queue_id = ?",
"pending", queueID,
)
if err != nil {
return fmt.Errorf("重置批量任务状态失败: %w", err)
}
return tx.Commit()
}
// UpdateBatchTaskMessage 更新批量任务消息
func (db *DB) UpdateBatchTaskMessage(queueID, taskID, message string) error {
_, err := db.Exec(
"UPDATE batch_tasks SET message = ? WHERE queue_id = ? AND id = ?",
message, queueID, taskID,
)
if err != nil {
return fmt.Errorf("更新批量任务消息失败: %w", err)
}
return nil
}
// AddBatchTask 添加任务到批量任务队列
func (db *DB) AddBatchTask(queueID, taskID, message string) error {
_, err := db.Exec(
"INSERT INTO batch_tasks (id, queue_id, message, status) VALUES (?, ?, ?, ?)",
taskID, queueID, message, "pending",
)
if err != nil {
return fmt.Errorf("添加批量任务失败: %w", err)
}
return nil
}
// CancelPendingBatchTasks 批量取消队列中所有 pending 状态的任务(单条 SQL
func (db *DB) CancelPendingBatchTasks(queueID string, completedAt time.Time) error {
_, err := db.Exec(
"UPDATE batch_tasks SET status = ?, completed_at = ? WHERE queue_id = ? AND status = ?",
"cancelled", completedAt, queueID, "pending",
)
if err != nil {
return fmt.Errorf("批量取消 pending 任务失败: %w", err)
}
return nil
}
// DeleteBatchTask 删除批量任务
func (db *DB) DeleteBatchTask(queueID, taskID string) error {
_, err := db.Exec(
"DELETE FROM batch_tasks WHERE queue_id = ? AND id = ?",
queueID, taskID,
)
if err != nil {
return fmt.Errorf("删除批量任务失败: %w", err)
}
return nil
}
// DeleteBatchQueue 删除批量任务队列
func (db *DB) DeleteBatchQueue(queueID string) error {
tx, err := db.Begin()
if err != nil {
return fmt.Errorf("开始事务失败: %w", err)
}
defer tx.Rollback()
// 删除任务(外键会自动级联删除)
_, err = tx.Exec("DELETE FROM batch_tasks WHERE queue_id = ?", queueID)
if err != nil {
return fmt.Errorf("删除批量任务失败: %w", err)
}
// 删除队列
_, err = tx.Exec("DELETE FROM batch_task_queues WHERE id = ?", queueID)
if err != nil {
return fmt.Errorf("删除批量任务队列失败: %w", err)
}
return tx.Commit()
}
-1259
View File
File diff suppressed because it is too large Load Diff
-812
View File
@@ -1,812 +0,0 @@
package database
import (
"database/sql"
"encoding/json"
"fmt"
"os"
"path/filepath"
"strings"
"time"
"github.com/google/uuid"
"go.uber.org/zap"
)
// Conversation 对话
type Conversation struct {
ID string `json:"id"`
Title string `json:"title"`
Pinned bool `json:"pinned"`
CreatedAt time.Time `json:"createdAt"`
UpdatedAt time.Time `json:"updatedAt"`
Messages []Message `json:"messages,omitempty"`
}
// Message 消息
type Message struct {
ID string `json:"id"`
ConversationID string `json:"conversationId"`
Role string `json:"role"`
Content string `json:"content"`
ReasoningContent string `json:"reasoningContent,omitempty"`
MCPExecutionIDs []string `json:"mcpExecutionIds,omitempty"`
ProcessDetails []map[string]interface{} `json:"processDetails,omitempty"`
CreatedAt time.Time `json:"createdAt"`
UpdatedAt time.Time `json:"updatedAt"`
}
// CreateConversation 创建新对话
func (db *DB) CreateConversation(title string) (*Conversation, error) {
return db.CreateConversationWithWebshell("", title)
}
// CreateConversationWithWebshell 创建新对话,可选绑定 WebShell 连接 ID(为空则普通对话)
func (db *DB) CreateConversationWithWebshell(webshellConnectionID, title string) (*Conversation, error) {
id := uuid.New().String()
now := time.Now()
var err error
if webshellConnectionID != "" {
_, err = db.Exec(
"INSERT INTO conversations (id, title, created_at, updated_at, webshell_connection_id) VALUES (?, ?, ?, ?, ?)",
id, title, now, now, webshellConnectionID,
)
} else {
_, err = db.Exec(
"INSERT INTO conversations (id, title, created_at, updated_at) VALUES (?, ?, ?, ?)",
id, title, now, now,
)
}
if err != nil {
return nil, fmt.Errorf("创建对话失败: %w", err)
}
return &Conversation{
ID: id,
Title: title,
CreatedAt: now,
UpdatedAt: now,
}, nil
}
// GetConversationByWebshellConnectionID 根据 WebShell 连接 ID 获取该连接下最近一条对话(用于 AI 助手持久化)
func (db *DB) GetConversationByWebshellConnectionID(connectionID string) (*Conversation, error) {
if connectionID == "" {
return nil, fmt.Errorf("connectionID is empty")
}
var conv Conversation
var createdAt, updatedAt string
var pinned int
err := db.QueryRow(
"SELECT id, title, pinned, created_at, updated_at FROM conversations WHERE webshell_connection_id = ? ORDER BY updated_at DESC LIMIT 1",
connectionID,
).Scan(&conv.ID, &conv.Title, &pinned, &createdAt, &updatedAt)
if err != nil {
if err == sql.ErrNoRows {
return nil, nil
}
return nil, fmt.Errorf("查询对话失败: %w", err)
}
conv.Pinned = pinned != 0
if t, e := time.Parse("2006-01-02 15:04:05.999999999-07:00", createdAt); e == nil {
conv.CreatedAt = t
} else if t, e := time.Parse("2006-01-02 15:04:05", createdAt); e == nil {
conv.CreatedAt = t
} else {
conv.CreatedAt, _ = time.Parse(time.RFC3339, createdAt)
}
if t, e := time.Parse("2006-01-02 15:04:05.999999999-07:00", updatedAt); e == nil {
conv.UpdatedAt = t
} else if t, e := time.Parse("2006-01-02 15:04:05", updatedAt); e == nil {
conv.UpdatedAt = t
} else {
conv.UpdatedAt, _ = time.Parse(time.RFC3339, updatedAt)
}
messages, err := db.GetMessages(conv.ID)
if err != nil {
return nil, fmt.Errorf("加载消息失败: %w", err)
}
conv.Messages = messages
// 加载过程详情并附加到对应消息(与 GetConversation 一致,便于刷新后仍可查看执行过程)
processDetailsMap, err := db.GetProcessDetailsByConversation(conv.ID)
if err != nil {
db.logger.Warn("加载过程详情失败", zap.Error(err))
processDetailsMap = make(map[string][]ProcessDetail)
}
for i := range conv.Messages {
if details, ok := processDetailsMap[conv.Messages[i].ID]; ok {
detailsJSON := make([]map[string]interface{}, len(details))
for j, detail := range details {
var data interface{}
if detail.Data != "" {
if err := json.Unmarshal([]byte(detail.Data), &data); err != nil {
db.logger.Warn("解析过程详情数据失败", zap.Error(err))
}
}
detailsJSON[j] = map[string]interface{}{
"id": detail.ID,
"messageId": detail.MessageID,
"conversationId": detail.ConversationID,
"eventType": detail.EventType,
"message": detail.Message,
"data": data,
"createdAt": detail.CreatedAt,
}
}
conv.Messages[i].ProcessDetails = detailsJSON
}
}
return &conv, nil
}
// WebShellConversationItem 用于侧边栏列表,不含消息
type WebShellConversationItem struct {
ID string `json:"id"`
Title string `json:"title"`
UpdatedAt time.Time `json:"updatedAt"`
}
// ListConversationsByWebshellConnectionID 列出该 WebShell 连接下的所有对话(按更新时间倒序),供侧边栏展示
func (db *DB) ListConversationsByWebshellConnectionID(connectionID string) ([]WebShellConversationItem, error) {
if connectionID == "" {
return nil, nil
}
rows, err := db.Query(
"SELECT id, title, updated_at FROM conversations WHERE webshell_connection_id = ? ORDER BY updated_at DESC",
connectionID,
)
if err != nil {
return nil, fmt.Errorf("查询对话列表失败: %w", err)
}
defer rows.Close()
var list []WebShellConversationItem
for rows.Next() {
var item WebShellConversationItem
var updatedAt string
if err := rows.Scan(&item.ID, &item.Title, &updatedAt); err != nil {
continue
}
if t, e := time.Parse("2006-01-02 15:04:05.999999999-07:00", updatedAt); e == nil {
item.UpdatedAt = t
} else if t, e := time.Parse("2006-01-02 15:04:05", updatedAt); e == nil {
item.UpdatedAt = t
} else {
item.UpdatedAt, _ = time.Parse(time.RFC3339, updatedAt)
}
list = append(list, item)
}
return list, rows.Err()
}
// GetConversation 获取对话
func (db *DB) GetConversation(id string) (*Conversation, error) {
var conv Conversation
var createdAt, updatedAt string
var pinned int
err := db.QueryRow(
"SELECT id, title, pinned, created_at, updated_at FROM conversations WHERE id = ?",
id,
).Scan(&conv.ID, &conv.Title, &pinned, &createdAt, &updatedAt)
if err != nil {
if err == sql.ErrNoRows {
return nil, fmt.Errorf("对话不存在")
}
return nil, fmt.Errorf("查询对话失败: %w", err)
}
// 尝试多种时间格式解析
var err1, err2 error
conv.CreatedAt, err1 = time.Parse("2006-01-02 15:04:05.999999999-07:00", createdAt)
if err1 != nil {
conv.CreatedAt, err1 = time.Parse("2006-01-02 15:04:05", createdAt)
}
if err1 != nil {
conv.CreatedAt, _ = time.Parse(time.RFC3339, createdAt)
}
conv.UpdatedAt, err2 = time.Parse("2006-01-02 15:04:05.999999999-07:00", updatedAt)
if err2 != nil {
conv.UpdatedAt, err2 = time.Parse("2006-01-02 15:04:05", updatedAt)
}
if err2 != nil {
conv.UpdatedAt, _ = time.Parse(time.RFC3339, updatedAt)
}
conv.Pinned = pinned != 0
// 加载消息
messages, err := db.GetMessages(id)
if err != nil {
return nil, fmt.Errorf("加载消息失败: %w", err)
}
conv.Messages = messages
// 加载过程详情(按消息ID分组)
processDetailsMap, err := db.GetProcessDetailsByConversation(id)
if err != nil {
db.logger.Warn("加载过程详情失败", zap.Error(err))
processDetailsMap = make(map[string][]ProcessDetail)
}
// 将过程详情附加到对应的消息上
for i := range conv.Messages {
if details, ok := processDetailsMap[conv.Messages[i].ID]; ok {
// 将ProcessDetail转换为JSON格式,以便前端使用
detailsJSON := make([]map[string]interface{}, len(details))
for j, detail := range details {
var data interface{}
if detail.Data != "" {
if err := json.Unmarshal([]byte(detail.Data), &data); err != nil {
db.logger.Warn("解析过程详情数据失败", zap.Error(err))
}
}
detailsJSON[j] = map[string]interface{}{
"id": detail.ID,
"messageId": detail.MessageID,
"conversationId": detail.ConversationID,
"eventType": detail.EventType,
"message": detail.Message,
"data": data,
"createdAt": detail.CreatedAt,
}
}
conv.Messages[i].ProcessDetails = detailsJSON
}
}
return &conv, nil
}
// GetConversationLite 获取对话(轻量版):包含 messages,但不加载 process_details。
// 用于历史会话快速切换,避免一次性把大体量过程详情灌到前端导致卡顿。
func (db *DB) GetConversationLite(id string) (*Conversation, error) {
var conv Conversation
var createdAt, updatedAt string
var pinned int
err := db.QueryRow(
"SELECT id, title, pinned, created_at, updated_at FROM conversations WHERE id = ?",
id,
).Scan(&conv.ID, &conv.Title, &pinned, &createdAt, &updatedAt)
if err != nil {
if err == sql.ErrNoRows {
return nil, fmt.Errorf("对话不存在")
}
return nil, fmt.Errorf("查询对话失败: %w", err)
}
// 尝试多种时间格式解析
var err1, err2 error
conv.CreatedAt, err1 = time.Parse("2006-01-02 15:04:05.999999999-07:00", createdAt)
if err1 != nil {
conv.CreatedAt, err1 = time.Parse("2006-01-02 15:04:05", createdAt)
}
if err1 != nil {
conv.CreatedAt, _ = time.Parse(time.RFC3339, createdAt)
}
conv.UpdatedAt, err2 = time.Parse("2006-01-02 15:04:05.999999999-07:00", updatedAt)
if err2 != nil {
conv.UpdatedAt, err2 = time.Parse("2006-01-02 15:04:05", updatedAt)
}
if err2 != nil {
conv.UpdatedAt, _ = time.Parse(time.RFC3339, updatedAt)
}
conv.Pinned = pinned != 0
// 加载消息(不加载 process_details
messages, err := db.GetMessages(id)
if err != nil {
return nil, fmt.Errorf("加载消息失败: %w", err)
}
conv.Messages = messages
return &conv, nil
}
// ListConversations 列出所有对话
func (db *DB) ListConversations(limit, offset int, search string) ([]*Conversation, error) {
var rows *sql.Rows
var err error
if search != "" {
// 使用 EXISTS 子查询代替 LEFT JOIN + DISTINCT,避免大表笛卡尔积
searchPattern := "%" + search + "%"
rows, err = db.Query(
`SELECT c.id, c.title, COALESCE(c.pinned, 0), c.created_at, c.updated_at
FROM conversations c
WHERE c.title LIKE ?
OR EXISTS (SELECT 1 FROM messages m WHERE m.conversation_id = c.id AND m.content LIKE ?)
ORDER BY c.updated_at DESC
LIMIT ? OFFSET ?`,
searchPattern, searchPattern, limit, offset,
)
} else {
rows, err = db.Query(
"SELECT id, title, COALESCE(pinned, 0), created_at, updated_at FROM conversations ORDER BY updated_at DESC LIMIT ? OFFSET ?",
limit, offset,
)
}
if err != nil {
return nil, fmt.Errorf("查询对话列表失败: %w", err)
}
defer rows.Close()
var conversations []*Conversation
for rows.Next() {
var conv Conversation
var createdAt, updatedAt string
var pinned int
if err := rows.Scan(&conv.ID, &conv.Title, &pinned, &createdAt, &updatedAt); err != nil {
return nil, fmt.Errorf("扫描对话失败: %w", err)
}
// 尝试多种时间格式解析
var err1, err2 error
conv.CreatedAt, err1 = time.Parse("2006-01-02 15:04:05.999999999-07:00", createdAt)
if err1 != nil {
conv.CreatedAt, err1 = time.Parse("2006-01-02 15:04:05", createdAt)
}
if err1 != nil {
conv.CreatedAt, _ = time.Parse(time.RFC3339, createdAt)
}
conv.UpdatedAt, err2 = time.Parse("2006-01-02 15:04:05.999999999-07:00", updatedAt)
if err2 != nil {
conv.UpdatedAt, err2 = time.Parse("2006-01-02 15:04:05", updatedAt)
}
if err2 != nil {
conv.UpdatedAt, _ = time.Parse(time.RFC3339, updatedAt)
}
conv.Pinned = pinned != 0
conversations = append(conversations, &conv)
}
return conversations, nil
}
// UpdateConversationTitle 更新对话标题
func (db *DB) UpdateConversationTitle(id, title string) error {
// 注意:不更新 updated_at,因为重命名操作不应该改变对话的更新时间
_, err := db.Exec(
"UPDATE conversations SET title = ? WHERE id = ?",
title, id,
)
if err != nil {
return fmt.Errorf("更新对话标题失败: %w", err)
}
return nil
}
// UpdateConversationTime 更新对话时间
func (db *DB) UpdateConversationTime(id string) error {
_, err := db.Exec(
"UPDATE conversations SET updated_at = ? WHERE id = ?",
time.Now(), id,
)
if err != nil {
return fmt.Errorf("更新对话时间失败: %w", err)
}
return nil
}
// DeleteConversation 删除对话及其所有相关数据
// 由于数据库外键约束设置了 ON DELETE CASCADE,删除对话时会自动删除:
// - messages(消息)
// - process_details(过程详情)
// - attack_chain_nodes(攻击链节点)
// - attack_chain_edges(攻击链边)
// - vulnerabilities(漏洞)
// - conversation_group_mappings(分组映射)
// 注意:knowledge_retrieval_logs 使用 ON DELETE SET NULL,记录会保留但 conversation_id 会被设为 NULL
func (db *DB) DeleteConversation(id string) error {
// 显式删除知识检索日志(虽然外键是SET NULL,但为了彻底清理,我们手动删除)
_, err := db.Exec("DELETE FROM knowledge_retrieval_logs WHERE conversation_id = ?", id)
if err != nil {
db.logger.Warn("删除知识检索日志失败", zap.String("conversationId", id), zap.Error(err))
// 不返回错误,继续删除对话
}
// 删除对话(外键CASCADE会自动删除其他相关数据)
_, err = db.Exec("DELETE FROM conversations WHERE id = ?", id)
if err != nil {
return fmt.Errorf("删除对话失败: %w", err)
}
// Best-effort cleanup for conversation-scoped filesystem artifacts
// (e.g., summarization transcript, reduction/checkpoint files under conversation_artifacts/<id>).
if base := strings.TrimSpace(db.conversationArtifactsDir); base != "" {
artDir := filepath.Join(base, id)
if rmErr := os.RemoveAll(artDir); rmErr != nil {
db.logger.Warn("删除会话 artifacts 目录失败", zap.String("conversationId", id), zap.String("dir", artDir), zap.Error(rmErr))
}
}
db.logger.Info("对话及其所有相关数据已删除", zap.String("conversationId", id))
return nil
}
// SaveAgentTrace 保存最后一轮代理消息轨迹与助手输出摘要。
// SQLite 列名仍为 last_react_input / last_react_output,与历史库表兼容;语义上为「全模式代理轨迹」,非仅 ReAct。
func (db *DB) SaveAgentTrace(conversationID, traceInputJSON, assistantOutput string) error {
_, err := db.Exec(
"UPDATE conversations SET last_react_input = ?, last_react_output = ?, updated_at = ? WHERE id = ?",
traceInputJSON, assistantOutput, time.Now(), conversationID,
)
if err != nil {
return fmt.Errorf("保存代理轨迹失败: %w", err)
}
return nil
}
// GetAgentTrace 读取 conversations 中保存的代理轨迹(列名 last_react_*)。
func (db *DB) GetAgentTrace(conversationID string) (traceInputJSON, assistantOutput string, err error) {
var input, output sql.NullString
err = db.QueryRow(
"SELECT last_react_input, last_react_output FROM conversations WHERE id = ?",
conversationID,
).Scan(&input, &output)
if err != nil {
if err == sql.ErrNoRows {
return "", "", fmt.Errorf("对话不存在")
}
return "", "", fmt.Errorf("获取代理轨迹失败: %w", err)
}
if input.Valid {
traceInputJSON = input.String
}
if output.Valid {
assistantOutput = output.String
}
return traceInputJSON, assistantOutput, nil
}
// ConversationHasToolProcessDetails 对话是否存在已落库的工具调用/结果(用于多代理等场景下 MCP execution id 未汇总时的攻击链判定)。
func (db *DB) ConversationHasToolProcessDetails(conversationID string) (bool, error) {
var n int
err := db.QueryRow(
`SELECT COUNT(*) FROM process_details WHERE conversation_id = ? AND event_type IN ('tool_call', 'tool_result')`,
conversationID,
).Scan(&n)
if err != nil {
return false, fmt.Errorf("查询过程详情失败: %w", err)
}
return n > 0, nil
}
// AddMessage 添加消息
func (db *DB) AddMessage(conversationID, role, content string, mcpExecutionIDs []string) (*Message, error) {
id := uuid.New().String()
now := time.Now()
var mcpIDsJSON string
if len(mcpExecutionIDs) > 0 {
jsonData, err := json.Marshal(mcpExecutionIDs)
if err != nil {
db.logger.Warn("序列化MCP执行ID失败", zap.Error(err))
} else {
mcpIDsJSON = string(jsonData)
}
}
_, err := db.Exec(
"INSERT INTO messages (id, conversation_id, role, content, reasoning_content, mcp_execution_ids, created_at, updated_at) VALUES (?, ?, ?, ?, ?, ?, ?, ?)",
id, conversationID, role, content, "", mcpIDsJSON, now, now,
)
if err != nil {
return nil, fmt.Errorf("添加消息失败: %w", err)
}
// 更新对话时间
if err := db.UpdateConversationTime(conversationID); err != nil {
db.logger.Warn("更新对话时间失败", zap.Error(err))
}
message := &Message{
ID: id,
ConversationID: conversationID,
Role: role,
Content: content,
MCPExecutionIDs: mcpExecutionIDs,
CreatedAt: now,
UpdatedAt: now,
}
return message, nil
}
// UpdateAssistantMessageFinalize 更新助手消息终态(正文、MCP id、思考链聚合文本,供无轨迹回退时回放)。
func (db *DB) UpdateAssistantMessageFinalize(messageID, content string, mcpExecutionIDs []string, reasoningContent string) error {
var mcpIDsJSON string
if len(mcpExecutionIDs) > 0 {
jsonData, err := json.Marshal(mcpExecutionIDs)
if err != nil {
return fmt.Errorf("序列化MCP执行ID失败: %w", err)
}
mcpIDsJSON = string(jsonData)
}
_, err := db.Exec(
"UPDATE messages SET content = ?, mcp_execution_ids = ?, reasoning_content = ?, updated_at = ? WHERE id = ?",
content, mcpIDsJSON, strings.TrimSpace(reasoningContent), time.Now(), messageID,
)
if err != nil {
return fmt.Errorf("更新助手消息失败: %w", err)
}
return nil
}
// GetMessages 获取对话的所有消息
func (db *DB) GetMessages(conversationID string) ([]Message, error) {
rows, err := db.Query(
"SELECT id, conversation_id, role, content, reasoning_content, mcp_execution_ids, created_at, updated_at FROM messages WHERE conversation_id = ? ORDER BY created_at ASC",
conversationID,
)
if err != nil {
return nil, fmt.Errorf("查询消息失败: %w", err)
}
defer rows.Close()
var messages []Message
for rows.Next() {
var msg Message
var reasoning sql.NullString
var mcpIDsJSON sql.NullString
var createdAt string
var updatedAt sql.NullString
if err := rows.Scan(&msg.ID, &msg.ConversationID, &msg.Role, &msg.Content, &reasoning, &mcpIDsJSON, &createdAt, &updatedAt); err != nil {
return nil, fmt.Errorf("扫描消息失败: %w", err)
}
if reasoning.Valid {
msg.ReasoningContent = reasoning.String
}
// 尝试多种时间格式解析
var err error
msg.CreatedAt, err = time.Parse("2006-01-02 15:04:05.999999999-07:00", createdAt)
if err != nil {
msg.CreatedAt, err = time.Parse("2006-01-02 15:04:05", createdAt)
}
if err != nil {
msg.CreatedAt, _ = time.Parse(time.RFC3339, createdAt)
}
// updated_at 兼容老库:字段不存在/为空时回退为 created_at
if updatedAt.Valid && strings.TrimSpace(updatedAt.String) != "" {
msg.UpdatedAt, err = time.Parse("2006-01-02 15:04:05.999999999-07:00", updatedAt.String)
if err != nil {
msg.UpdatedAt, err = time.Parse("2006-01-02 15:04:05", updatedAt.String)
}
if err != nil {
msg.UpdatedAt, _ = time.Parse(time.RFC3339, updatedAt.String)
}
}
if msg.UpdatedAt.IsZero() {
msg.UpdatedAt = msg.CreatedAt
}
// 解析MCP执行ID
if mcpIDsJSON.Valid && mcpIDsJSON.String != "" {
if err := json.Unmarshal([]byte(mcpIDsJSON.String), &msg.MCPExecutionIDs); err != nil {
db.logger.Warn("解析MCP执行ID失败", zap.Error(err))
}
}
messages = append(messages, msg)
}
return messages, nil
}
// turnSliceRange 根据任意一条消息 ID 定位「一轮对话」在 msgs 中的 [start, end) 下标区间(msgs 须已按时间升序,与 GetMessages 一致)。
// 一轮 = 从某条 user 消息起,至下一条 user 之前(含中间所有 assistant)。
func turnSliceRange(msgs []Message, anchorID string) (start, end int, err error) {
idx := -1
for i := range msgs {
if msgs[i].ID == anchorID {
idx = i
break
}
}
if idx < 0 {
return 0, 0, fmt.Errorf("message not found")
}
start = idx
for start > 0 && msgs[start].Role != "user" {
start--
}
if start < len(msgs) && msgs[start].Role != "user" {
start = 0
}
end = len(msgs)
for i := start + 1; i < len(msgs); i++ {
if msgs[i].Role == "user" {
end = i
break
}
}
return start, end, nil
}
// DeleteConversationTurn 删除锚点所在轮次的全部消息(用户提问 + 该轮助手回复等),并清空 last_react_*,避免与消息表不一致。
func (db *DB) DeleteConversationTurn(conversationID, anchorMessageID string) (deletedIDs []string, err error) {
msgs, err := db.GetMessages(conversationID)
if err != nil {
return nil, err
}
start, end, err := turnSliceRange(msgs, anchorMessageID)
if err != nil {
return nil, err
}
if start >= end {
return nil, fmt.Errorf("empty turn range")
}
deletedIDs = make([]string, 0, end-start)
for i := start; i < end; i++ {
deletedIDs = append(deletedIDs, msgs[i].ID)
}
tx, err := db.Begin()
if err != nil {
return nil, fmt.Errorf("begin tx: %w", err)
}
defer func() { _ = tx.Rollback() }()
ph := strings.Repeat("?,", len(deletedIDs))
ph = ph[:len(ph)-1]
args := make([]interface{}, 0, 1+len(deletedIDs))
args = append(args, conversationID)
for _, id := range deletedIDs {
args = append(args, id)
}
res, err := tx.Exec(
"DELETE FROM messages WHERE conversation_id = ? AND id IN ("+ph+")",
args...,
)
if err != nil {
return nil, fmt.Errorf("delete messages: %w", err)
}
n, err := res.RowsAffected()
if err != nil {
return nil, err
}
if int(n) != len(deletedIDs) {
return nil, fmt.Errorf("deleted count mismatch")
}
_, err = tx.Exec(
`UPDATE conversations SET last_react_input = NULL, last_react_output = NULL, updated_at = ? WHERE id = ?`,
time.Now(), conversationID,
)
if err != nil {
return nil, fmt.Errorf("clear react data: %w", err)
}
if err := tx.Commit(); err != nil {
return nil, fmt.Errorf("commit: %w", err)
}
db.logger.Info("conversation turn deleted",
zap.String("conversationId", conversationID),
zap.Strings("deletedMessageIds", deletedIDs),
zap.Int("count", len(deletedIDs)),
)
return deletedIDs, nil
}
// ProcessDetail 过程详情事件
type ProcessDetail struct {
ID string `json:"id"`
MessageID string `json:"messageId"`
ConversationID string `json:"conversationId"`
EventType string `json:"eventType"` // iteration, thinking, reasoning_chain, tool_calls_detected, tool_call, tool_result, progress, error
Message string `json:"message"`
Data string `json:"data"` // JSON格式的数据
CreatedAt time.Time `json:"createdAt"`
}
// AddProcessDetail 添加过程详情事件
func (db *DB) AddProcessDetail(messageID, conversationID, eventType, message string, data interface{}) error {
id := uuid.New().String()
var dataJSON string
if data != nil {
jsonData, err := json.Marshal(data)
if err != nil {
db.logger.Warn("序列化过程详情数据失败", zap.Error(err))
} else {
dataJSON = string(jsonData)
}
}
_, err := db.Exec(
"INSERT INTO process_details (id, message_id, conversation_id, event_type, message, data, created_at) VALUES (?, ?, ?, ?, ?, ?, ?)",
id, messageID, conversationID, eventType, message, dataJSON, time.Now(),
)
if err != nil {
return fmt.Errorf("添加过程详情失败: %w", err)
}
return nil
}
// GetProcessDetails 获取消息的过程详情
func (db *DB) GetProcessDetails(messageID string) ([]ProcessDetail, error) {
rows, err := db.Query(
"SELECT id, message_id, conversation_id, event_type, message, data, created_at FROM process_details WHERE message_id = ? ORDER BY created_at ASC",
messageID,
)
if err != nil {
return nil, fmt.Errorf("查询过程详情失败: %w", err)
}
defer rows.Close()
var details []ProcessDetail
for rows.Next() {
var detail ProcessDetail
var createdAt string
if err := rows.Scan(&detail.ID, &detail.MessageID, &detail.ConversationID, &detail.EventType, &detail.Message, &detail.Data, &createdAt); err != nil {
return nil, fmt.Errorf("扫描过程详情失败: %w", err)
}
// 尝试多种时间格式解析
var err error
detail.CreatedAt, err = time.Parse("2006-01-02 15:04:05.999999999-07:00", createdAt)
if err != nil {
detail.CreatedAt, err = time.Parse("2006-01-02 15:04:05", createdAt)
}
if err != nil {
detail.CreatedAt, _ = time.Parse(time.RFC3339, createdAt)
}
details = append(details, detail)
}
return details, nil
}
// GetProcessDetailsByConversation 获取对话的所有过程详情(按消息分组)
func (db *DB) GetProcessDetailsByConversation(conversationID string) (map[string][]ProcessDetail, error) {
rows, err := db.Query(
"SELECT id, message_id, conversation_id, event_type, message, data, created_at FROM process_details WHERE conversation_id = ? ORDER BY created_at ASC",
conversationID,
)
if err != nil {
return nil, fmt.Errorf("查询过程详情失败: %w", err)
}
defer rows.Close()
detailsMap := make(map[string][]ProcessDetail)
for rows.Next() {
var detail ProcessDetail
var createdAt string
if err := rows.Scan(&detail.ID, &detail.MessageID, &detail.ConversationID, &detail.EventType, &detail.Message, &detail.Data, &createdAt); err != nil {
return nil, fmt.Errorf("扫描过程详情失败: %w", err)
}
// 尝试多种时间格式解析
var err error
detail.CreatedAt, err = time.Parse("2006-01-02 15:04:05.999999999-07:00", createdAt)
if err != nil {
detail.CreatedAt, err = time.Parse("2006-01-02 15:04:05", createdAt)
}
if err != nil {
detail.CreatedAt, _ = time.Parse(time.RFC3339, createdAt)
}
detailsMap[detail.MessageID] = append(detailsMap[detail.MessageID], detail)
}
return detailsMap, nil
}
-39
View File
@@ -1,39 +0,0 @@
package database
import (
"testing"
)
func TestTurnSliceRange(t *testing.T) {
mk := func(id, role string) Message {
return Message{ID: id, Role: role}
}
msgs := []Message{
mk("u1", "user"),
mk("a1", "assistant"),
mk("u2", "user"),
mk("a2", "assistant"),
}
cases := []struct {
anchor string
start int
end int
}{
{"u1", 0, 2},
{"a1", 0, 2},
{"u2", 2, 4},
{"a2", 2, 4},
}
for _, tc := range cases {
s, e, err := turnSliceRange(msgs, tc.anchor)
if err != nil {
t.Fatalf("anchor %s: %v", tc.anchor, err)
}
if s != tc.start || e != tc.end {
t.Fatalf("anchor %s: got [%d,%d) want [%d,%d)", tc.anchor, s, e, tc.start, tc.end)
}
}
if _, _, err := turnSliceRange(msgs, "nope"); err == nil {
t.Fatal("expected error for missing id")
}
}
-1108
View File
File diff suppressed because it is too large Load Diff
-449
View File
@@ -1,449 +0,0 @@
package database
import (
"database/sql"
"fmt"
"time"
"github.com/google/uuid"
)
// ConversationGroup 对话分组
type ConversationGroup struct {
ID string `json:"id"`
Name string `json:"name"`
Icon string `json:"icon"`
Pinned bool `json:"pinned"`
CreatedAt time.Time `json:"createdAt"`
UpdatedAt time.Time `json:"updatedAt"`
}
// GroupExistsByName 检查分组名称是否已存在
func (db *DB) GroupExistsByName(name string, excludeID string) (bool, error) {
var count int
var err error
if excludeID != "" {
err = db.QueryRow(
"SELECT COUNT(*) FROM conversation_groups WHERE name = ? AND id != ?",
name, excludeID,
).Scan(&count)
} else {
err = db.QueryRow(
"SELECT COUNT(*) FROM conversation_groups WHERE name = ?",
name,
).Scan(&count)
}
if err != nil {
return false, fmt.Errorf("检查分组名称失败: %w", err)
}
return count > 0, nil
}
// CreateGroup 创建分组
func (db *DB) CreateGroup(name, icon string) (*ConversationGroup, error) {
// 检查名称是否已存在
exists, err := db.GroupExistsByName(name, "")
if err != nil {
return nil, err
}
if exists {
return nil, fmt.Errorf("分组名称已存在")
}
id := uuid.New().String()
now := time.Now()
if icon == "" {
icon = "📁"
}
_, err = db.Exec(
"INSERT INTO conversation_groups (id, name, icon, pinned, created_at, updated_at) VALUES (?, ?, ?, ?, ?, ?)",
id, name, icon, 0, now, now,
)
if err != nil {
return nil, fmt.Errorf("创建分组失败: %w", err)
}
return &ConversationGroup{
ID: id,
Name: name,
Icon: icon,
Pinned: false,
CreatedAt: now,
UpdatedAt: now,
}, nil
}
// ListGroups 列出所有分组
func (db *DB) ListGroups() ([]*ConversationGroup, error) {
rows, err := db.Query(
"SELECT id, name, icon, COALESCE(pinned, 0), created_at, updated_at FROM conversation_groups ORDER BY COALESCE(pinned, 0) DESC, created_at ASC",
)
if err != nil {
return nil, fmt.Errorf("查询分组列表失败: %w", err)
}
defer rows.Close()
var groups []*ConversationGroup
for rows.Next() {
var group ConversationGroup
var createdAt, updatedAt string
var pinned int
if err := rows.Scan(&group.ID, &group.Name, &group.Icon, &pinned, &createdAt, &updatedAt); err != nil {
return nil, fmt.Errorf("扫描分组失败: %w", err)
}
group.Pinned = pinned != 0
// 尝试多种时间格式解析
var err1, err2 error
group.CreatedAt, err1 = time.Parse("2006-01-02 15:04:05.999999999-07:00", createdAt)
if err1 != nil {
group.CreatedAt, err1 = time.Parse("2006-01-02 15:04:05", createdAt)
}
if err1 != nil {
group.CreatedAt, _ = time.Parse(time.RFC3339, createdAt)
}
group.UpdatedAt, err2 = time.Parse("2006-01-02 15:04:05.999999999-07:00", updatedAt)
if err2 != nil {
group.UpdatedAt, err2 = time.Parse("2006-01-02 15:04:05", updatedAt)
}
if err2 != nil {
group.UpdatedAt, _ = time.Parse(time.RFC3339, updatedAt)
}
groups = append(groups, &group)
}
return groups, nil
}
// GetGroup 获取分组
func (db *DB) GetGroup(id string) (*ConversationGroup, error) {
var group ConversationGroup
var createdAt, updatedAt string
var pinned int
err := db.QueryRow(
"SELECT id, name, icon, COALESCE(pinned, 0), created_at, updated_at FROM conversation_groups WHERE id = ?",
id,
).Scan(&group.ID, &group.Name, &group.Icon, &pinned, &createdAt, &updatedAt)
if err != nil {
if err == sql.ErrNoRows {
return nil, fmt.Errorf("分组不存在")
}
return nil, fmt.Errorf("查询分组失败: %w", err)
}
// 尝试多种时间格式解析
var err1, err2 error
group.CreatedAt, err1 = time.Parse("2006-01-02 15:04:05.999999999-07:00", createdAt)
if err1 != nil {
group.CreatedAt, err1 = time.Parse("2006-01-02 15:04:05", createdAt)
}
if err1 != nil {
group.CreatedAt, _ = time.Parse(time.RFC3339, createdAt)
}
group.UpdatedAt, err2 = time.Parse("2006-01-02 15:04:05.999999999-07:00", updatedAt)
if err2 != nil {
group.UpdatedAt, err2 = time.Parse("2006-01-02 15:04:05", updatedAt)
}
if err2 != nil {
group.UpdatedAt, _ = time.Parse(time.RFC3339, updatedAt)
}
group.Pinned = pinned != 0
return &group, nil
}
// UpdateGroup 更新分组
func (db *DB) UpdateGroup(id, name, icon string) error {
// 检查名称是否已存在(排除当前分组)
exists, err := db.GroupExistsByName(name, id)
if err != nil {
return err
}
if exists {
return fmt.Errorf("分组名称已存在")
}
_, err = db.Exec(
"UPDATE conversation_groups SET name = ?, icon = ?, updated_at = ? WHERE id = ?",
name, icon, time.Now(), id,
)
if err != nil {
return fmt.Errorf("更新分组失败: %w", err)
}
return nil
}
// DeleteGroup 删除分组
func (db *DB) DeleteGroup(id string) error {
_, err := db.Exec("DELETE FROM conversation_groups WHERE id = ?", id)
if err != nil {
return fmt.Errorf("删除分组失败: %w", err)
}
return nil
}
// AddConversationToGroup 将对话添加到分组
// 注意:一个对话只能属于一个分组,所以在添加新分组之前,会先删除该对话的所有旧分组关联
func (db *DB) AddConversationToGroup(conversationID, groupID string) error {
// 先删除该对话的所有旧分组关联,确保一个对话只属于一个分组
_, err := db.Exec(
"DELETE FROM conversation_group_mappings WHERE conversation_id = ?",
conversationID,
)
if err != nil {
return fmt.Errorf("删除对话旧分组关联失败: %w", err)
}
// 然后插入新的分组关联
id := uuid.New().String()
_, err = db.Exec(
"INSERT INTO conversation_group_mappings (id, conversation_id, group_id, created_at) VALUES (?, ?, ?, ?)",
id, conversationID, groupID, time.Now(),
)
if err != nil {
return fmt.Errorf("添加对话到分组失败: %w", err)
}
return nil
}
// RemoveConversationFromGroup 从分组中移除对话
func (db *DB) RemoveConversationFromGroup(conversationID, groupID string) error {
_, err := db.Exec(
"DELETE FROM conversation_group_mappings WHERE conversation_id = ? AND group_id = ?",
conversationID, groupID,
)
if err != nil {
return fmt.Errorf("从分组中移除对话失败: %w", err)
}
return nil
}
// GetConversationsByGroup 获取分组中的所有对话
func (db *DB) GetConversationsByGroup(groupID string) ([]*Conversation, error) {
rows, err := db.Query(
`SELECT c.id, c.title, COALESCE(c.pinned, 0), c.created_at, c.updated_at, COALESCE(cgm.pinned, 0) as group_pinned
FROM conversations c
INNER JOIN conversation_group_mappings cgm ON c.id = cgm.conversation_id
WHERE cgm.group_id = ?
ORDER BY COALESCE(cgm.pinned, 0) DESC, c.updated_at DESC`,
groupID,
)
if err != nil {
return nil, fmt.Errorf("查询分组对话失败: %w", err)
}
defer rows.Close()
var conversations []*Conversation
for rows.Next() {
var conv Conversation
var createdAt, updatedAt string
var pinned int
var groupPinned int
if err := rows.Scan(&conv.ID, &conv.Title, &pinned, &createdAt, &updatedAt, &groupPinned); err != nil {
return nil, fmt.Errorf("扫描对话失败: %w", err)
}
// 尝试多种时间格式解析
var err1, err2 error
conv.CreatedAt, err1 = time.Parse("2006-01-02 15:04:05.999999999-07:00", createdAt)
if err1 != nil {
conv.CreatedAt, err1 = time.Parse("2006-01-02 15:04:05", createdAt)
}
if err1 != nil {
conv.CreatedAt, _ = time.Parse(time.RFC3339, createdAt)
}
conv.UpdatedAt, err2 = time.Parse("2006-01-02 15:04:05.999999999-07:00", updatedAt)
if err2 != nil {
conv.UpdatedAt, err2 = time.Parse("2006-01-02 15:04:05", updatedAt)
}
if err2 != nil {
conv.UpdatedAt, _ = time.Parse(time.RFC3339, updatedAt)
}
conv.Pinned = pinned != 0
conversations = append(conversations, &conv)
}
return conversations, nil
}
// SearchConversationsByGroup 搜索分组中的对话(按标题和消息内容模糊匹配)
func (db *DB) SearchConversationsByGroup(groupID string, searchQuery string) ([]*Conversation, error) {
// 构建SQL查询,支持按标题和消息内容搜索
// 使用 DISTINCT 避免因为一个对话有多条匹配消息而重复
query := `SELECT DISTINCT c.id, c.title, COALESCE(c.pinned, 0), c.created_at, c.updated_at, COALESCE(cgm.pinned, 0) as group_pinned
FROM conversations c
INNER JOIN conversation_group_mappings cgm ON c.id = cgm.conversation_id
WHERE cgm.group_id = ?`
args := []interface{}{groupID}
// 如果有搜索关键词,添加标题和消息内容搜索条件
if searchQuery != "" {
searchPattern := "%" + searchQuery + "%"
// 搜索标题或消息内容
// 使用 LEFT JOIN 连接消息表,这样即使没有消息的对话也能被搜索到(通过标题)
query += ` AND (
LOWER(c.title) LIKE LOWER(?)
OR EXISTS (
SELECT 1 FROM messages m
WHERE m.conversation_id = c.id
AND LOWER(m.content) LIKE LOWER(?)
)
)`
args = append(args, searchPattern, searchPattern)
}
query += " ORDER BY COALESCE(cgm.pinned, 0) DESC, c.updated_at DESC"
rows, err := db.Query(query, args...)
if err != nil {
return nil, fmt.Errorf("搜索分组对话失败: %w", err)
}
defer rows.Close()
var conversations []*Conversation
for rows.Next() {
var conv Conversation
var createdAt, updatedAt string
var pinned int
var groupPinned int
if err := rows.Scan(&conv.ID, &conv.Title, &pinned, &createdAt, &updatedAt, &groupPinned); err != nil {
return nil, fmt.Errorf("扫描对话失败: %w", err)
}
// 尝试多种时间格式解析
var err1, err2 error
conv.CreatedAt, err1 = time.Parse("2006-01-02 15:04:05.999999999-07:00", createdAt)
if err1 != nil {
conv.CreatedAt, err1 = time.Parse("2006-01-02 15:04:05", createdAt)
}
if err1 != nil {
conv.CreatedAt, _ = time.Parse(time.RFC3339, createdAt)
}
conv.UpdatedAt, err2 = time.Parse("2006-01-02 15:04:05.999999999-07:00", updatedAt)
if err2 != nil {
conv.UpdatedAt, err2 = time.Parse("2006-01-02 15:04:05", updatedAt)
}
if err2 != nil {
conv.UpdatedAt, _ = time.Parse(time.RFC3339, updatedAt)
}
conv.Pinned = pinned != 0
conversations = append(conversations, &conv)
}
return conversations, nil
}
// GetGroupByConversation 获取对话所属的分组
func (db *DB) GetGroupByConversation(conversationID string) (string, error) {
var groupID string
err := db.QueryRow(
"SELECT group_id FROM conversation_group_mappings WHERE conversation_id = ? LIMIT 1",
conversationID,
).Scan(&groupID)
if err != nil {
if err == sql.ErrNoRows {
return "", nil // 没有分组
}
return "", fmt.Errorf("查询对话分组失败: %w", err)
}
return groupID, nil
}
// UpdateConversationPinned 更新对话置顶状态
func (db *DB) UpdateConversationPinned(id string, pinned bool) error {
pinnedValue := 0
if pinned {
pinnedValue = 1
}
// 注意:不更新 updated_at,因为置顶操作不应该改变对话的更新时间
_, err := db.Exec(
"UPDATE conversations SET pinned = ? WHERE id = ?",
pinnedValue, id,
)
if err != nil {
return fmt.Errorf("更新对话置顶状态失败: %w", err)
}
return nil
}
// UpdateGroupPinned 更新分组置顶状态
func (db *DB) UpdateGroupPinned(id string, pinned bool) error {
pinnedValue := 0
if pinned {
pinnedValue = 1
}
_, err := db.Exec(
"UPDATE conversation_groups SET pinned = ?, updated_at = ? WHERE id = ?",
pinnedValue, time.Now(), id,
)
if err != nil {
return fmt.Errorf("更新分组置顶状态失败: %w", err)
}
return nil
}
// GroupMapping 分组映射关系
type GroupMapping struct {
ConversationID string `json:"conversationId"`
GroupID string `json:"groupId"`
}
// GetAllGroupMappings 批量获取所有分组映射(消除 N+1 查询)
func (db *DB) GetAllGroupMappings() ([]GroupMapping, error) {
rows, err := db.Query("SELECT conversation_id, group_id FROM conversation_group_mappings")
if err != nil {
return nil, fmt.Errorf("查询分组映射失败: %w", err)
}
defer rows.Close()
var mappings []GroupMapping
for rows.Next() {
var m GroupMapping
if err := rows.Scan(&m.ConversationID, &m.GroupID); err != nil {
return nil, fmt.Errorf("扫描分组映射失败: %w", err)
}
mappings = append(mappings, m)
}
if mappings == nil {
mappings = []GroupMapping{}
}
return mappings, nil
}
// UpdateConversationPinnedInGroup 更新对话在分组中的置顶状态
func (db *DB) UpdateConversationPinnedInGroup(conversationID, groupID string, pinned bool) error {
pinnedValue := 0
if pinned {
pinnedValue = 1
}
_, err := db.Exec(
"UPDATE conversation_group_mappings SET pinned = ? WHERE conversation_id = ? AND group_id = ?",
pinnedValue, conversationID, groupID,
)
if err != nil {
return fmt.Errorf("更新分组对话置顶状态失败: %w", err)
}
return nil
}
-537
View File
@@ -1,537 +0,0 @@
package database
import (
"database/sql"
"encoding/json"
"strings"
"time"
"cyberstrike-ai/internal/mcp"
"go.uber.org/zap"
)
// SaveToolExecution 保存工具执行记录
func (db *DB) SaveToolExecution(exec *mcp.ToolExecution) error {
argsJSON, err := json.Marshal(exec.Arguments)
if err != nil {
db.logger.Warn("序列化执行参数失败", zap.Error(err))
argsJSON = []byte("{}")
}
var resultJSON sql.NullString
if exec.Result != nil {
resultBytes, err := json.Marshal(exec.Result)
if err != nil {
db.logger.Warn("序列化执行结果失败", zap.Error(err))
} else {
resultJSON = sql.NullString{String: string(resultBytes), Valid: true}
}
}
var errorText sql.NullString
if exec.Error != "" {
errorText = sql.NullString{String: exec.Error, Valid: true}
}
var endTime sql.NullTime
if exec.EndTime != nil {
endTime = sql.NullTime{Time: *exec.EndTime, Valid: true}
}
var durationMs sql.NullInt64
if exec.Duration > 0 {
durationMs = sql.NullInt64{Int64: exec.Duration.Milliseconds(), Valid: true}
}
query := `
INSERT OR REPLACE INTO tool_executions
(id, tool_name, arguments, status, result, error, start_time, end_time, duration_ms, created_at)
VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
`
_, err = db.Exec(query,
exec.ID,
exec.ToolName,
string(argsJSON),
exec.Status,
resultJSON,
errorText,
exec.StartTime,
endTime,
durationMs,
time.Now(),
)
if err != nil {
db.logger.Error("保存工具执行记录失败", zap.Error(err), zap.String("executionId", exec.ID))
return err
}
return nil
}
// CountToolExecutions 统计工具执行记录总数
func (db *DB) CountToolExecutions(status, toolName string) (int, error) {
query := `SELECT COUNT(*) FROM tool_executions`
args := []interface{}{}
conditions := []string{}
if status != "" {
conditions = append(conditions, "status = ?")
args = append(args, status)
}
if toolName != "" {
// 支持部分匹配(模糊搜索),不区分大小写
conditions = append(conditions, "LOWER(tool_name) LIKE ?")
args = append(args, "%"+strings.ToLower(toolName)+"%")
}
if len(conditions) > 0 {
query += ` WHERE ` + conditions[0]
for i := 1; i < len(conditions); i++ {
query += ` AND ` + conditions[i]
}
}
var count int
err := db.QueryRow(query, args...).Scan(&count)
if err != nil {
return 0, err
}
return count, nil
}
// LoadToolExecutions 加载所有工具执行记录(支持分页)
func (db *DB) LoadToolExecutions() ([]*mcp.ToolExecution, error) {
return db.LoadToolExecutionsWithPagination(0, 1000, "", "")
}
// LoadToolExecutionsWithPagination 分页加载工具执行记录
// limit: 最大返回记录数,0 表示使用默认值 1000
// offset: 跳过的记录数,用于分页
// status: 状态筛选,空字符串表示不过滤
// toolName: 工具名称筛选,空字符串表示不过滤
func (db *DB) LoadToolExecutionsWithPagination(offset, limit int, status, toolName string) ([]*mcp.ToolExecution, error) {
if limit <= 0 {
limit = 1000 // 默认限制
}
if limit > 10000 {
limit = 10000 // 最大限制,防止一次性加载过多数据
}
query := `
SELECT id, tool_name, arguments, status, result, error, start_time, end_time, duration_ms
FROM tool_executions
`
args := []interface{}{}
conditions := []string{}
if status != "" {
conditions = append(conditions, "status = ?")
args = append(args, status)
}
if toolName != "" {
// 支持部分匹配(模糊搜索),不区分大小写
conditions = append(conditions, "LOWER(tool_name) LIKE ?")
args = append(args, "%"+strings.ToLower(toolName)+"%")
}
if len(conditions) > 0 {
query += ` WHERE ` + conditions[0]
for i := 1; i < len(conditions); i++ {
query += ` AND ` + conditions[i]
}
}
query += ` ORDER BY start_time DESC LIMIT ? OFFSET ?`
args = append(args, limit, offset)
rows, err := db.Query(query, args...)
if err != nil {
return nil, err
}
defer rows.Close()
var executions []*mcp.ToolExecution
for rows.Next() {
var exec mcp.ToolExecution
var argsJSON string
var resultJSON sql.NullString
var errorText sql.NullString
var endTime sql.NullTime
var durationMs sql.NullInt64
err := rows.Scan(
&exec.ID,
&exec.ToolName,
&argsJSON,
&exec.Status,
&resultJSON,
&errorText,
&exec.StartTime,
&endTime,
&durationMs,
)
if err != nil {
db.logger.Warn("加载执行记录失败", zap.Error(err))
continue
}
// 解析参数
if err := json.Unmarshal([]byte(argsJSON), &exec.Arguments); err != nil {
db.logger.Warn("解析执行参数失败", zap.Error(err))
exec.Arguments = make(map[string]interface{})
}
// 解析结果
if resultJSON.Valid && resultJSON.String != "" {
var result mcp.ToolResult
if err := json.Unmarshal([]byte(resultJSON.String), &result); err != nil {
db.logger.Warn("解析执行结果失败", zap.Error(err))
} else {
exec.Result = &result
}
}
// 设置错误
if errorText.Valid {
exec.Error = errorText.String
}
// 设置结束时间
if endTime.Valid {
exec.EndTime = &endTime.Time
}
// 设置持续时间
if durationMs.Valid {
exec.Duration = time.Duration(durationMs.Int64) * time.Millisecond
}
executions = append(executions, &exec)
}
return executions, nil
}
// GetToolExecution 根据ID获取单条工具执行记录
func (db *DB) GetToolExecution(id string) (*mcp.ToolExecution, error) {
query := `
SELECT id, tool_name, arguments, status, result, error, start_time, end_time, duration_ms
FROM tool_executions
WHERE id = ?
`
row := db.QueryRow(query, id)
var exec mcp.ToolExecution
var argsJSON string
var resultJSON sql.NullString
var errorText sql.NullString
var endTime sql.NullTime
var durationMs sql.NullInt64
err := row.Scan(
&exec.ID,
&exec.ToolName,
&argsJSON,
&exec.Status,
&resultJSON,
&errorText,
&exec.StartTime,
&endTime,
&durationMs,
)
if err != nil {
return nil, err
}
if err := json.Unmarshal([]byte(argsJSON), &exec.Arguments); err != nil {
db.logger.Warn("解析执行参数失败", zap.Error(err))
exec.Arguments = make(map[string]interface{})
}
if resultJSON.Valid && resultJSON.String != "" {
var result mcp.ToolResult
if err := json.Unmarshal([]byte(resultJSON.String), &result); err != nil {
db.logger.Warn("解析执行结果失败", zap.Error(err))
} else {
exec.Result = &result
}
}
if errorText.Valid {
exec.Error = errorText.String
}
if endTime.Valid {
exec.EndTime = &endTime.Time
}
if durationMs.Valid {
exec.Duration = time.Duration(durationMs.Int64) * time.Millisecond
}
return &exec, nil
}
// DeleteToolExecution 删除工具执行记录
func (db *DB) DeleteToolExecution(id string) error {
query := `DELETE FROM tool_executions WHERE id = ?`
_, err := db.Exec(query, id)
if err != nil {
db.logger.Error("删除工具执行记录失败", zap.Error(err), zap.String("executionId", id))
return err
}
return nil
}
// DeleteToolExecutions 批量删除工具执行记录
func (db *DB) DeleteToolExecutions(ids []string) error {
if len(ids) == 0 {
return nil
}
// 构建 IN 查询的占位符
placeholders := make([]string, len(ids))
args := make([]interface{}, len(ids))
for i, id := range ids {
placeholders[i] = "?"
args[i] = id
}
query := `DELETE FROM tool_executions WHERE id IN (` + strings.Join(placeholders, ",") + `)`
_, err := db.Exec(query, args...)
if err != nil {
db.logger.Error("批量删除工具执行记录失败", zap.Error(err), zap.Int("count", len(ids)))
return err
}
return nil
}
// GetToolExecutionsByIds 根据ID列表获取工具执行记录(用于批量删除前获取统计信息)
func (db *DB) GetToolExecutionsByIds(ids []string) ([]*mcp.ToolExecution, error) {
if len(ids) == 0 {
return []*mcp.ToolExecution{}, nil
}
// 构建 IN 查询的占位符
placeholders := make([]string, len(ids))
args := make([]interface{}, len(ids))
for i, id := range ids {
placeholders[i] = "?"
args[i] = id
}
query := `
SELECT id, tool_name, arguments, status, result, error, start_time, end_time, duration_ms
FROM tool_executions
WHERE id IN (` + strings.Join(placeholders, ",") + `)
`
rows, err := db.Query(query, args...)
if err != nil {
return nil, err
}
defer rows.Close()
var executions []*mcp.ToolExecution
for rows.Next() {
var exec mcp.ToolExecution
var argsJSON string
var resultJSON sql.NullString
var errorText sql.NullString
var endTime sql.NullTime
var durationMs sql.NullInt64
err := rows.Scan(
&exec.ID,
&exec.ToolName,
&argsJSON,
&exec.Status,
&resultJSON,
&errorText,
&exec.StartTime,
&endTime,
&durationMs,
)
if err != nil {
db.logger.Warn("加载执行记录失败", zap.Error(err))
continue
}
// 解析参数
if err := json.Unmarshal([]byte(argsJSON), &exec.Arguments); err != nil {
db.logger.Warn("解析执行参数失败", zap.Error(err))
exec.Arguments = make(map[string]interface{})
}
// 解析结果
if resultJSON.Valid && resultJSON.String != "" {
var result mcp.ToolResult
if err := json.Unmarshal([]byte(resultJSON.String), &result); err != nil {
db.logger.Warn("解析执行结果失败", zap.Error(err))
} else {
exec.Result = &result
}
}
// 设置错误
if errorText.Valid {
exec.Error = errorText.String
}
// 设置结束时间
if endTime.Valid {
exec.EndTime = &endTime.Time
}
// 设置持续时间
if durationMs.Valid {
exec.Duration = time.Duration(durationMs.Int64) * time.Millisecond
}
executions = append(executions, &exec)
}
return executions, nil
}
// SaveToolStats 保存工具统计信息
func (db *DB) SaveToolStats(toolName string, stats *mcp.ToolStats) error {
var lastCallTime sql.NullTime
if stats.LastCallTime != nil {
lastCallTime = sql.NullTime{Time: *stats.LastCallTime, Valid: true}
}
query := `
INSERT OR REPLACE INTO tool_stats
(tool_name, total_calls, success_calls, failed_calls, last_call_time, updated_at)
VALUES (?, ?, ?, ?, ?, ?)
`
_, err := db.Exec(query,
toolName,
stats.TotalCalls,
stats.SuccessCalls,
stats.FailedCalls,
lastCallTime,
time.Now(),
)
if err != nil {
db.logger.Error("保存工具统计信息失败", zap.Error(err), zap.String("toolName", toolName))
return err
}
return nil
}
// LoadToolStats 加载所有工具统计信息
func (db *DB) LoadToolStats() (map[string]*mcp.ToolStats, error) {
query := `
SELECT tool_name, total_calls, success_calls, failed_calls, last_call_time
FROM tool_stats
`
rows, err := db.Query(query)
if err != nil {
return nil, err
}
defer rows.Close()
stats := make(map[string]*mcp.ToolStats)
for rows.Next() {
var stat mcp.ToolStats
var lastCallTime sql.NullTime
err := rows.Scan(
&stat.ToolName,
&stat.TotalCalls,
&stat.SuccessCalls,
&stat.FailedCalls,
&lastCallTime,
)
if err != nil {
db.logger.Warn("加载统计信息失败", zap.Error(err))
continue
}
if lastCallTime.Valid {
stat.LastCallTime = &lastCallTime.Time
}
stats[stat.ToolName] = &stat
}
return stats, nil
}
// UpdateToolStats 更新工具统计信息(累加模式)
func (db *DB) UpdateToolStats(toolName string, totalCalls, successCalls, failedCalls int, lastCallTime *time.Time) error {
var lastCallTimeSQL sql.NullTime
if lastCallTime != nil {
lastCallTimeSQL = sql.NullTime{Time: *lastCallTime, Valid: true}
}
query := `
INSERT INTO tool_stats (tool_name, total_calls, success_calls, failed_calls, last_call_time, updated_at)
VALUES (?, ?, ?, ?, ?, ?)
ON CONFLICT(tool_name) DO UPDATE SET
total_calls = total_calls + ?,
success_calls = success_calls + ?,
failed_calls = failed_calls + ?,
last_call_time = COALESCE(?, last_call_time),
updated_at = ?
`
_, err := db.Exec(query,
toolName, totalCalls, successCalls, failedCalls, lastCallTimeSQL, time.Now(),
totalCalls, successCalls, failedCalls, lastCallTimeSQL, time.Now(),
)
if err != nil {
db.logger.Error("更新工具统计信息失败", zap.Error(err), zap.String("toolName", toolName))
return err
}
return nil
}
// DecreaseToolStats 减少工具统计信息(用于删除执行记录时)
// 如果统计信息变为0,则删除该统计记录
func (db *DB) DecreaseToolStats(toolName string, totalCalls, successCalls, failedCalls int) error {
// 先更新统计信息
query := `
UPDATE tool_stats SET
total_calls = CASE WHEN total_calls - ? < 0 THEN 0 ELSE total_calls - ? END,
success_calls = CASE WHEN success_calls - ? < 0 THEN 0 ELSE success_calls - ? END,
failed_calls = CASE WHEN failed_calls - ? < 0 THEN 0 ELSE failed_calls - ? END,
updated_at = ?
WHERE tool_name = ?
`
_, err := db.Exec(query, totalCalls, totalCalls, successCalls, successCalls, failedCalls, failedCalls, time.Now(), toolName)
if err != nil {
db.logger.Error("减少工具统计信息失败", zap.Error(err), zap.String("toolName", toolName))
return err
}
// 检查更新后的 total_calls 是否为 0,如果是则删除该统计记录
checkQuery := `SELECT total_calls FROM tool_stats WHERE tool_name = ?`
var newTotalCalls int
err = db.QueryRow(checkQuery, toolName).Scan(&newTotalCalls)
if err != nil {
// 如果查询失败(记录不存在),直接返回
return nil
}
// 如果 total_calls 为 0,删除该统计记录
if newTotalCalls == 0 {
deleteQuery := `DELETE FROM tool_stats WHERE tool_name = ?`
_, err = db.Exec(deleteQuery, toolName)
if err != nil {
db.logger.Warn("删除零统计记录失败", zap.Error(err), zap.String("toolName", toolName))
// 不返回错误,因为主要操作(更新统计)已成功
} else {
db.logger.Info("已删除零统计记录", zap.String("toolName", toolName))
}
}
return nil
}
-84
View File
@@ -1,84 +0,0 @@
package database
import (
"database/sql"
"fmt"
"strings"
"time"
)
// RobotSessionBinding 机器人会话绑定信息。
type RobotSessionBinding struct {
SessionKey string
ConversationID string
RoleName string
UpdatedAt time.Time
}
// GetRobotSessionBinding 按 session_key 获取机器人会话绑定。
func (db *DB) GetRobotSessionBinding(sessionKey string) (*RobotSessionBinding, error) {
sessionKey = strings.TrimSpace(sessionKey)
if sessionKey == "" {
return nil, nil
}
var b RobotSessionBinding
var updatedAt string
err := db.QueryRow(
"SELECT session_key, conversation_id, role_name, updated_at FROM robot_user_sessions WHERE session_key = ?",
sessionKey,
).Scan(&b.SessionKey, &b.ConversationID, &b.RoleName, &updatedAt)
if err != nil {
if err == sql.ErrNoRows {
return nil, nil
}
return nil, fmt.Errorf("查询机器人会话绑定失败: %w", err)
}
if t, e := time.Parse("2006-01-02 15:04:05.999999999-07:00", updatedAt); e == nil {
b.UpdatedAt = t
} else if t, e := time.Parse("2006-01-02 15:04:05", updatedAt); e == nil {
b.UpdatedAt = t
} else {
b.UpdatedAt, _ = time.Parse(time.RFC3339, updatedAt)
}
if strings.TrimSpace(b.RoleName) == "" {
b.RoleName = "默认"
}
return &b, nil
}
// UpsertRobotSessionBinding 写入或更新机器人会话绑定(包含角色)。
func (db *DB) UpsertRobotSessionBinding(sessionKey, conversationID, roleName string) error {
sessionKey = strings.TrimSpace(sessionKey)
conversationID = strings.TrimSpace(conversationID)
roleName = strings.TrimSpace(roleName)
if sessionKey == "" || conversationID == "" {
return nil
}
if roleName == "" {
roleName = "默认"
}
_, err := db.Exec(`
INSERT INTO robot_user_sessions (session_key, conversation_id, role_name, updated_at)
VALUES (?, ?, ?, ?)
ON CONFLICT(session_key) DO UPDATE SET
conversation_id = excluded.conversation_id,
role_name = excluded.role_name,
updated_at = excluded.updated_at
`, sessionKey, conversationID, roleName, time.Now())
if err != nil {
return fmt.Errorf("写入机器人会话绑定失败: %w", err)
}
return nil
}
// DeleteRobotSessionBinding 删除机器人会话绑定。
func (db *DB) DeleteRobotSessionBinding(sessionKey string) error {
sessionKey = strings.TrimSpace(sessionKey)
if sessionKey == "" {
return nil
}
if _, err := db.Exec("DELETE FROM robot_user_sessions WHERE session_key = ?", sessionKey); err != nil {
return fmt.Errorf("删除机器人会话绑定失败: %w", err)
}
return nil
}
-142
View File
@@ -1,142 +0,0 @@
package database
import (
"database/sql"
"time"
"go.uber.org/zap"
)
// SkillStats Skills统计信息
type SkillStats struct {
SkillName string
TotalCalls int
SuccessCalls int
FailedCalls int
LastCallTime *time.Time
}
// SaveSkillStats 保存Skills统计信息
func (db *DB) SaveSkillStats(skillName string, stats *SkillStats) error {
var lastCallTime sql.NullTime
if stats.LastCallTime != nil {
lastCallTime = sql.NullTime{Time: *stats.LastCallTime, Valid: true}
}
query := `
INSERT OR REPLACE INTO skill_stats
(skill_name, total_calls, success_calls, failed_calls, last_call_time, updated_at)
VALUES (?, ?, ?, ?, ?, ?)
`
_, err := db.Exec(query,
skillName,
stats.TotalCalls,
stats.SuccessCalls,
stats.FailedCalls,
lastCallTime,
time.Now(),
)
if err != nil {
db.logger.Error("保存Skills统计信息失败", zap.Error(err), zap.String("skillName", skillName))
return err
}
return nil
}
// LoadSkillStats 加载所有Skills统计信息
func (db *DB) LoadSkillStats() (map[string]*SkillStats, error) {
query := `
SELECT skill_name, total_calls, success_calls, failed_calls, last_call_time
FROM skill_stats
`
rows, err := db.Query(query)
if err != nil {
return nil, err
}
defer rows.Close()
stats := make(map[string]*SkillStats)
for rows.Next() {
var stat SkillStats
var lastCallTime sql.NullTime
err := rows.Scan(
&stat.SkillName,
&stat.TotalCalls,
&stat.SuccessCalls,
&stat.FailedCalls,
&lastCallTime,
)
if err != nil {
db.logger.Warn("加载Skills统计信息失败", zap.Error(err))
continue
}
if lastCallTime.Valid {
stat.LastCallTime = &lastCallTime.Time
}
stats[stat.SkillName] = &stat
}
return stats, nil
}
// UpdateSkillStats 更新Skills统计信息(累加模式)
func (db *DB) UpdateSkillStats(skillName string, totalCalls, successCalls, failedCalls int, lastCallTime *time.Time) error {
var lastCallTimeSQL sql.NullTime
if lastCallTime != nil {
lastCallTimeSQL = sql.NullTime{Time: *lastCallTime, Valid: true}
}
query := `
INSERT INTO skill_stats (skill_name, total_calls, success_calls, failed_calls, last_call_time, updated_at)
VALUES (?, ?, ?, ?, ?, ?)
ON CONFLICT(skill_name) DO UPDATE SET
total_calls = total_calls + ?,
success_calls = success_calls + ?,
failed_calls = failed_calls + ?,
last_call_time = COALESCE(?, last_call_time),
updated_at = ?
`
_, err := db.Exec(query,
skillName, totalCalls, successCalls, failedCalls, lastCallTimeSQL, time.Now(),
totalCalls, successCalls, failedCalls, lastCallTimeSQL, time.Now(),
)
if err != nil {
db.logger.Error("更新Skills统计信息失败", zap.Error(err), zap.String("skillName", skillName))
return err
}
return nil
}
// ClearSkillStats 清空所有Skills统计信息
func (db *DB) ClearSkillStats() error {
query := `DELETE FROM skill_stats`
_, err := db.Exec(query)
if err != nil {
db.logger.Error("清空Skills统计信息失败", zap.Error(err))
return err
}
db.logger.Info("已清空所有Skills统计信息")
return nil
}
// ClearSkillStatsByName 清空指定skill的统计信息
func (db *DB) ClearSkillStatsByName(skillName string) error {
query := `DELETE FROM skill_stats WHERE skill_name = ?`
_, err := db.Exec(query, skillName)
if err != nil {
db.logger.Error("清空指定skill统计信息失败", zap.Error(err), zap.String("skillName", skillName))
return err
}
db.logger.Info("已清空指定skill统计信息", zap.String("skillName", skillName))
return nil
}
-369
View File
@@ -1,369 +0,0 @@
package database
import (
"database/sql"
"fmt"
"time"
"github.com/google/uuid"
"go.uber.org/zap"
)
// Vulnerability 漏洞
type Vulnerability struct {
ID string `json:"id"`
ConversationID string `json:"conversation_id"`
ConversationTag string `json:"conversation_tag,omitempty"`
TaskTag string `json:"task_tag,omitempty"`
TaskID string `json:"task_id,omitempty"`
TaskQueueID string `json:"task_queue_id,omitempty"`
Title string `json:"title"`
Description string `json:"description"`
Severity string `json:"severity"` // critical, high, medium, low, info
Status string `json:"status"` // open, confirmed, fixed, false_positive
Type string `json:"type"`
Target string `json:"target"`
Proof string `json:"proof"`
Impact string `json:"impact"`
Recommendation string `json:"recommendation"`
CreatedAt time.Time `json:"created_at"`
UpdatedAt time.Time `json:"updated_at"`
}
// CreateVulnerability 创建漏洞
func (db *DB) CreateVulnerability(vuln *Vulnerability) (*Vulnerability, error) {
if vuln.ID == "" {
vuln.ID = uuid.New().String()
}
if vuln.Status == "" {
vuln.Status = "open"
}
now := time.Now()
if vuln.CreatedAt.IsZero() {
vuln.CreatedAt = now
}
vuln.UpdatedAt = now
query := `
INSERT INTO vulnerabilities (
id, conversation_id, conversation_tag, task_tag, title, description, severity, status,
vulnerability_type, target, proof, impact, recommendation,
created_at, updated_at
) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
`
_, err := db.Exec(
query,
vuln.ID, vuln.ConversationID, vuln.ConversationTag, vuln.TaskTag, vuln.Title, vuln.Description,
vuln.Severity, vuln.Status, vuln.Type, vuln.Target,
vuln.Proof, vuln.Impact, vuln.Recommendation,
vuln.CreatedAt, vuln.UpdatedAt,
)
if err != nil {
return nil, fmt.Errorf("创建漏洞失败: %w", err)
}
return vuln, nil
}
// GetVulnerability 获取漏洞
func (db *DB) GetVulnerability(id string) (*Vulnerability, error) {
var vuln Vulnerability
query := `
SELECT id, conversation_id, title, description, severity, status,
conversation_tag, task_tag, vulnerability_type, target, proof, impact, recommendation,
COALESCE((SELECT bt.id FROM batch_tasks bt WHERE bt.conversation_id = vulnerabilities.conversation_id LIMIT 1), '') AS task_id,
COALESCE((SELECT bt.queue_id FROM batch_tasks bt WHERE bt.conversation_id = vulnerabilities.conversation_id LIMIT 1), '') AS task_queue_id,
created_at, updated_at
FROM vulnerabilities
WHERE id = ?
`
err := db.QueryRow(query, id).Scan(
&vuln.ID, &vuln.ConversationID, &vuln.Title, &vuln.Description,
&vuln.Severity, &vuln.Status, &vuln.ConversationTag, &vuln.TaskTag, &vuln.Type, &vuln.Target,
&vuln.Proof, &vuln.Impact, &vuln.Recommendation,
&vuln.TaskID, &vuln.TaskQueueID,
&vuln.CreatedAt, &vuln.UpdatedAt,
)
if err != nil {
if err == sql.ErrNoRows {
return nil, fmt.Errorf("漏洞不存在")
}
return nil, fmt.Errorf("获取漏洞失败: %w", err)
}
return &vuln, nil
}
// ListVulnerabilities 列出漏洞
func (db *DB) ListVulnerabilities(limit, offset int, id, conversationID, severity, status, taskID, conversationTag, taskTag string) ([]*Vulnerability, error) {
query := `
SELECT id, conversation_id, title, description, severity, status, conversation_tag, task_tag,
vulnerability_type, target, proof, impact, recommendation,
COALESCE((SELECT bt.id FROM batch_tasks bt WHERE bt.conversation_id = vulnerabilities.conversation_id LIMIT 1), '') AS task_id,
COALESCE((SELECT bt.queue_id FROM batch_tasks bt WHERE bt.conversation_id = vulnerabilities.conversation_id LIMIT 1), '') AS task_queue_id,
created_at, updated_at
FROM vulnerabilities
WHERE 1=1
`
args := []interface{}{}
if id != "" {
query += " AND id = ?"
args = append(args, id)
}
if conversationID != "" {
query += " AND conversation_id = ?"
args = append(args, conversationID)
}
if taskID != "" {
query += " AND EXISTS (SELECT 1 FROM batch_tasks bt WHERE bt.conversation_id = vulnerabilities.conversation_id AND (bt.id = ? OR bt.queue_id = ?))"
args = append(args, taskID, taskID)
}
if conversationTag != "" {
query += " AND conversation_tag = ?"
args = append(args, conversationTag)
}
if taskTag != "" {
query += " AND task_tag = ?"
args = append(args, taskTag)
}
if severity != "" {
query += " AND severity = ?"
args = append(args, severity)
}
if status != "" {
query += " AND status = ?"
args = append(args, status)
}
query += " ORDER BY created_at DESC LIMIT ? OFFSET ?"
args = append(args, limit, offset)
rows, err := db.Query(query, args...)
if err != nil {
return nil, fmt.Errorf("查询漏洞列表失败: %w", err)
}
defer rows.Close()
var vulnerabilities []*Vulnerability
for rows.Next() {
var vuln Vulnerability
err := rows.Scan(
&vuln.ID, &vuln.ConversationID, &vuln.Title, &vuln.Description,
&vuln.Severity, &vuln.Status, &vuln.ConversationTag, &vuln.TaskTag, &vuln.Type, &vuln.Target,
&vuln.Proof, &vuln.Impact, &vuln.Recommendation,
&vuln.TaskID, &vuln.TaskQueueID,
&vuln.CreatedAt, &vuln.UpdatedAt,
)
if err != nil {
db.logger.Warn("扫描漏洞记录失败", zap.Error(err))
continue
}
vulnerabilities = append(vulnerabilities, &vuln)
}
return vulnerabilities, nil
}
// CountVulnerabilities 统计漏洞总数(支持筛选条件)
func (db *DB) CountVulnerabilities(id, conversationID, severity, status, taskID, conversationTag, taskTag string) (int, error) {
query := "SELECT COUNT(*) FROM vulnerabilities WHERE 1=1"
args := []interface{}{}
if id != "" {
query += " AND id = ?"
args = append(args, id)
}
if conversationID != "" {
query += " AND conversation_id = ?"
args = append(args, conversationID)
}
if taskID != "" {
query += " AND EXISTS (SELECT 1 FROM batch_tasks bt WHERE bt.conversation_id = vulnerabilities.conversation_id AND (bt.id = ? OR bt.queue_id = ?))"
args = append(args, taskID, taskID)
}
if conversationTag != "" {
query += " AND conversation_tag = ?"
args = append(args, conversationTag)
}
if taskTag != "" {
query += " AND task_tag = ?"
args = append(args, taskTag)
}
if severity != "" {
query += " AND severity = ?"
args = append(args, severity)
}
if status != "" {
query += " AND status = ?"
args = append(args, status)
}
var count int
err := db.QueryRow(query, args...).Scan(&count)
if err != nil {
return 0, fmt.Errorf("统计漏洞总数失败: %w", err)
}
return count, nil
}
// UpdateVulnerability 更新漏洞
func (db *DB) UpdateVulnerability(id string, vuln *Vulnerability) error {
vuln.UpdatedAt = time.Now()
query := `
UPDATE vulnerabilities
SET conversation_tag = ?, task_tag = ?, title = ?, description = ?, severity = ?, status = ?,
vulnerability_type = ?, target = ?, proof = ?, impact = ?,
recommendation = ?, updated_at = ?
WHERE id = ?
`
_, err := db.Exec(
query,
vuln.ConversationTag, vuln.TaskTag, vuln.Title, vuln.Description, vuln.Severity, vuln.Status,
vuln.Type, vuln.Target, vuln.Proof, vuln.Impact,
vuln.Recommendation, vuln.UpdatedAt, id,
)
if err != nil {
return fmt.Errorf("更新漏洞失败: %w", err)
}
return nil
}
// DeleteVulnerability 删除漏洞
func (db *DB) DeleteVulnerability(id string) error {
_, err := db.Exec("DELETE FROM vulnerabilities WHERE id = ?", id)
if err != nil {
return fmt.Errorf("删除漏洞失败: %w", err)
}
return nil
}
// GetVulnerabilityStats 获取漏洞统计(筛选条件与 ListVulnerabilities / CountVulnerabilities 一致)
func (db *DB) GetVulnerabilityStats(conversationID, taskID string) (map[string]interface{}, error) {
stats := make(map[string]interface{})
where := "WHERE 1=1"
args := []interface{}{}
if conversationID != "" {
where += " AND conversation_id = ?"
args = append(args, conversationID)
}
if taskID != "" {
where += " AND EXISTS (SELECT 1 FROM batch_tasks bt WHERE bt.conversation_id = vulnerabilities.conversation_id AND (bt.id = ? OR bt.queue_id = ?))"
args = append(args, taskID, taskID)
}
// 总漏洞数
var totalCount int
query := "SELECT COUNT(*) FROM vulnerabilities " + where
err := db.QueryRow(query, args...).Scan(&totalCount)
if err != nil {
return nil, fmt.Errorf("获取总漏洞数失败: %w", err)
}
stats["total"] = totalCount
// 按严重程度统计
severityQuery := "SELECT severity, COUNT(*) FROM vulnerabilities " + where + " GROUP BY severity"
rows, err := db.Query(severityQuery, args...)
if err != nil {
return nil, fmt.Errorf("获取严重程度统计失败: %w", err)
}
defer rows.Close()
severityStats := make(map[string]int)
for rows.Next() {
var severity string
var count int
if err := rows.Scan(&severity, &count); err != nil {
continue
}
severityStats[severity] = count
}
stats["by_severity"] = severityStats
// 按状态统计
statusQuery := "SELECT status, COUNT(*) FROM vulnerabilities " + where + " GROUP BY status"
rows, err = db.Query(statusQuery, args...)
if err != nil {
return nil, fmt.Errorf("获取状态统计失败: %w", err)
}
defer rows.Close()
statusStats := make(map[string]int)
for rows.Next() {
var status string
var count int
if err := rows.Scan(&status, &count); err != nil {
continue
}
statusStats[status] = count
}
stats["by_status"] = statusStats
return stats, nil
}
// GetVulnerabilityFilterOptions 获取漏洞筛选建议项
func (db *DB) GetVulnerabilityFilterOptions() (map[string][]string, error) {
collect := func(query string, args ...interface{}) ([]string, error) {
rows, err := db.Query(query, args...)
if err != nil {
return nil, err
}
defer rows.Close()
items := make([]string, 0)
for rows.Next() {
var val string
if err := rows.Scan(&val); err != nil {
continue
}
if val == "" {
continue
}
items = append(items, val)
}
return items, nil
}
vulnIDs, err := collect(`SELECT DISTINCT id FROM vulnerabilities ORDER BY created_at DESC LIMIT 500`)
if err != nil {
return nil, fmt.Errorf("查询漏洞ID建议失败: %w", err)
}
conversationIDs, err := collect(`SELECT DISTINCT conversation_id FROM vulnerabilities WHERE conversation_id <> '' ORDER BY created_at DESC LIMIT 500`)
if err != nil {
return nil, fmt.Errorf("查询会话ID建议失败: %w", err)
}
taskIDs, err := collect(`SELECT DISTINCT id FROM batch_tasks WHERE id <> '' ORDER BY rowid DESC LIMIT 500`)
if err != nil {
return nil, fmt.Errorf("查询任务ID建议失败: %w", err)
}
queueIDs, err := collect(`SELECT DISTINCT queue_id FROM batch_tasks WHERE queue_id <> '' ORDER BY rowid DESC LIMIT 500`)
if err != nil {
return nil, fmt.Errorf("查询队列ID建议失败: %w", err)
}
conversationTags, err := collect(`SELECT DISTINCT conversation_tag FROM vulnerabilities WHERE conversation_tag IS NOT NULL AND conversation_tag <> '' ORDER BY conversation_tag LIMIT 500`)
if err != nil {
return nil, fmt.Errorf("查询对话标签建议失败: %w", err)
}
taskTags, err := collect(`SELECT DISTINCT task_tag FROM vulnerabilities WHERE task_tag IS NOT NULL AND task_tag <> '' ORDER BY task_tag LIMIT 500`)
if err != nil {
return nil, fmt.Errorf("查询任务标签建议失败: %w", err)
}
return map[string][]string{
"vulnerability_ids": vulnIDs,
"conversation_ids": conversationIDs,
"task_ids": taskIDs,
"queue_ids": queueIDs,
"conversation_tags": conversationTags,
"task_tags": taskTags,
}, nil
}
-152
View File
@@ -1,152 +0,0 @@
package database
import (
"database/sql"
"time"
"go.uber.org/zap"
)
// WebShellConnection WebShell 连接配置
type WebShellConnection struct {
ID string `json:"id"`
URL string `json:"url"`
Password string `json:"password"`
Type string `json:"type"`
Method string `json:"method"`
CmdParam string `json:"cmdParam"`
Remark string `json:"remark"`
Encoding string `json:"encoding"` // 目标响应编码:auto / utf-8 / gbk / gb18030,空值视为 auto
OS string `json:"os"` // 目标操作系统:auto / linux / windows,空值/未知视为 auto
CreatedAt time.Time `json:"createdAt"`
}
// GetWebshellConnectionState 获取连接关联的持久化状态 JSON,不存在时返回 "{}"
func (db *DB) GetWebshellConnectionState(connectionID string) (string, error) {
var stateJSON string
err := db.QueryRow(`SELECT state_json FROM webshell_connection_states WHERE connection_id = ?`, connectionID).Scan(&stateJSON)
if err == sql.ErrNoRows {
return "{}", nil
}
if err != nil {
db.logger.Error("查询 WebShell 连接状态失败", zap.Error(err), zap.String("connectionID", connectionID))
return "", err
}
if stateJSON == "" {
stateJSON = "{}"
}
return stateJSON, nil
}
// UpsertWebshellConnectionState 保存连接关联的持久化状态 JSON
func (db *DB) UpsertWebshellConnectionState(connectionID, stateJSON string) error {
if stateJSON == "" {
stateJSON = "{}"
}
query := `
INSERT INTO webshell_connection_states (connection_id, state_json, updated_at)
VALUES (?, ?, ?)
ON CONFLICT(connection_id) DO UPDATE SET
state_json = excluded.state_json,
updated_at = excluded.updated_at
`
if _, err := db.Exec(query, connectionID, stateJSON, time.Now()); err != nil {
db.logger.Error("保存 WebShell 连接状态失败", zap.Error(err), zap.String("connectionID", connectionID))
return err
}
return nil
}
// ListWebshellConnections 列出所有 WebShell 连接,按创建时间倒序
func (db *DB) ListWebshellConnections() ([]WebShellConnection, error) {
query := `
SELECT id, url, password, type, method, cmd_param, remark,
COALESCE(encoding, '') AS encoding, COALESCE(os, '') AS os, created_at
FROM webshell_connections
ORDER BY created_at DESC
`
rows, err := db.Query(query)
if err != nil {
db.logger.Error("查询 WebShell 连接列表失败", zap.Error(err))
return nil, err
}
defer rows.Close()
var list []WebShellConnection
for rows.Next() {
var c WebShellConnection
err := rows.Scan(&c.ID, &c.URL, &c.Password, &c.Type, &c.Method, &c.CmdParam, &c.Remark, &c.Encoding, &c.OS, &c.CreatedAt)
if err != nil {
db.logger.Warn("扫描 WebShell 连接行失败", zap.Error(err))
continue
}
list = append(list, c)
}
return list, rows.Err()
}
// GetWebshellConnection 根据 ID 获取一条连接
func (db *DB) GetWebshellConnection(id string) (*WebShellConnection, error) {
query := `
SELECT id, url, password, type, method, cmd_param, remark,
COALESCE(encoding, '') AS encoding, COALESCE(os, '') AS os, created_at
FROM webshell_connections WHERE id = ?
`
var c WebShellConnection
err := db.QueryRow(query, id).Scan(&c.ID, &c.URL, &c.Password, &c.Type, &c.Method, &c.CmdParam, &c.Remark, &c.Encoding, &c.OS, &c.CreatedAt)
if err == sql.ErrNoRows {
return nil, nil
}
if err != nil {
db.logger.Error("查询 WebShell 连接失败", zap.Error(err), zap.String("id", id))
return nil, err
}
return &c, nil
}
// CreateWebshellConnection 创建 WebShell 连接
func (db *DB) CreateWebshellConnection(c *WebShellConnection) error {
query := `
INSERT INTO webshell_connections (id, url, password, type, method, cmd_param, remark, encoding, os, created_at)
VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
`
_, err := db.Exec(query, c.ID, c.URL, c.Password, c.Type, c.Method, c.CmdParam, c.Remark, c.Encoding, c.OS, c.CreatedAt)
if err != nil {
db.logger.Error("创建 WebShell 连接失败", zap.Error(err), zap.String("id", c.ID))
return err
}
return nil
}
// UpdateWebshellConnection 更新 WebShell 连接
func (db *DB) UpdateWebshellConnection(c *WebShellConnection) error {
query := `
UPDATE webshell_connections
SET url = ?, password = ?, type = ?, method = ?, cmd_param = ?, remark = ?, encoding = ?, os = ?
WHERE id = ?
`
result, err := db.Exec(query, c.URL, c.Password, c.Type, c.Method, c.CmdParam, c.Remark, c.Encoding, c.OS, c.ID)
if err != nil {
db.logger.Error("更新 WebShell 连接失败", zap.Error(err), zap.String("id", c.ID))
return err
}
affected, _ := result.RowsAffected()
if affected == 0 {
return sql.ErrNoRows
}
return nil
}
// DeleteWebshellConnection 删除 WebShell 连接
func (db *DB) DeleteWebshellConnection(id string) error {
result, err := db.Exec(`DELETE FROM webshell_connections WHERE id = ?`, id)
if err != nil {
db.logger.Error("删除 WebShell 连接失败", zap.Error(err), zap.String("id", id))
return err
}
affected, _ := result.RowsAffected()
if affected == 0 {
return sql.ErrNoRows
}
return nil
}