From ce8b57501dd6ac77de76ad4072f41e0ceee763de Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E5=85=AC=E6=98=8E?= <83812544+Ed1s0nZ@users.noreply.github.com> Date: Tue, 30 Jun 2026 20:14:28 +0800 Subject: [PATCH] Add files via upload --- internal/database/hitl_logs.go | 75 ++++++++++++++++++++ internal/database/hitl_logs_test.go | 106 ++++++++++++++++++++++++++++ 2 files changed, 181 insertions(+) create mode 100644 internal/database/hitl_logs.go create mode 100644 internal/database/hitl_logs_test.go diff --git a/internal/database/hitl_logs.go b/internal/database/hitl_logs.go new file mode 100644 index 00000000..6a5e10b6 --- /dev/null +++ b/internal/database/hitl_logs.go @@ -0,0 +1,75 @@ +package database + +import ( + "fmt" + "strings" + "time" + + "go.uber.org/zap" +) + +// DeleteHitlInterruptLogsByIDs deletes decided HITL audit logs by id (pending rows are skipped). +func (db *DB) DeleteHitlInterruptLogsByIDs(ids []string) (int64, error) { + if db == nil { + return 0, fmt.Errorf("database is nil") + } + clean := make([]string, 0, len(ids)) + for _, id := range ids { + id = strings.TrimSpace(id) + if id != "" { + clean = append(clean, id) + } + } + if len(clean) == 0 { + return 0, nil + } + placeholders := strings.TrimRight(strings.Repeat("?,", len(clean)), ",") + q := fmt.Sprintf(`DELETE FROM hitl_interrupts WHERE status != 'pending' AND id IN (%s)`, placeholders) + args := make([]interface{}, len(clean)) + for i, id := range clean { + args[i] = id + } + res, err := db.Exec(q, args...) + if err != nil { + db.logger.Error("批量删除人机协同审计日志失败", zap.Error(err), zap.Int("count", len(clean))) + return 0, fmt.Errorf("批量删除人机协同审计日志失败: %w", err) + } + n, _ := res.RowsAffected() + return n, nil +} + +// DeleteHitlInterruptLogsMatching deletes decided logs matching whereSQL (e.g. "WHERE 1=1 AND status != 'pending' ..."). +func (db *DB) DeleteHitlInterruptLogsMatching(whereSQL string, args []interface{}) (int64, error) { + if db == nil { + return 0, fmt.Errorf("database is nil") + } + whereSQL = strings.TrimSpace(whereSQL) + if whereSQL == "" { + return 0, fmt.Errorf("where clause is required") + } + q := `DELETE FROM hitl_interrupts ` + whereSQL + res, err := db.Exec(q, args...) + if err != nil { + db.logger.Error("清空人机协同审计日志失败", zap.Error(err)) + return 0, fmt.Errorf("清空人机协同审计日志失败: %w", err) + } + n, _ := res.RowsAffected() + return n, nil +} + +// PurgeHitlInterruptLogsBefore deletes decided logs with decided/created time before cutoff. +func (db *DB) PurgeHitlInterruptLogsBefore(cutoff time.Time) (int64, error) { + if db == nil { + return 0, fmt.Errorf("database is nil") + } + res, err := db.Exec( + `DELETE FROM hitl_interrupts WHERE status != 'pending' AND datetime(COALESCE(decided_at, created_at)) < datetime(?)`, + cutoff.UTC().Format(time.RFC3339), + ) + if err != nil { + db.logger.Error("清理过期人机协同审计日志失败", zap.Error(err)) + return 0, fmt.Errorf("清理过期人机协同审计日志失败: %w", err) + } + n, _ := res.RowsAffected() + return n, nil +} diff --git a/internal/database/hitl_logs_test.go b/internal/database/hitl_logs_test.go new file mode 100644 index 00000000..90958865 --- /dev/null +++ b/internal/database/hitl_logs_test.go @@ -0,0 +1,106 @@ +package database + +import ( + "path/filepath" + "testing" + "time" + + "go.uber.org/zap" +) + +func ensureHitlInterruptsTable(t *testing.T, db *DB) { + t.Helper() + if _, err := db.Exec(` +CREATE TABLE IF NOT EXISTS hitl_interrupts ( + id TEXT PRIMARY KEY, + conversation_id TEXT NOT NULL, + message_id TEXT, + mode TEXT NOT NULL, + tool_name TEXT NOT NULL, + tool_call_id TEXT, + payload TEXT, + status TEXT NOT NULL, + decision TEXT, + decision_comment TEXT, + created_at DATETIME NOT NULL, + decided_at DATETIME +);`); err != nil { + t.Fatalf("create hitl_interrupts: %v", err) + } +} + +func TestDeleteHitlInterruptLogsByIDs_skipsPending(t *testing.T) { + dbPath := filepath.Join(t.TempDir(), "hitl.db") + db, err := NewDB(dbPath, zap.NewNop()) + if err != nil { + t.Fatalf("NewDB: %v", err) + } + defer db.Close() + ensureHitlInterruptsTable(t, db) + + now := time.Now().UTC().Format(time.RFC3339) + if _, err := db.Exec(`INSERT INTO hitl_interrupts + (id, conversation_id, mode, tool_name, status, created_at) + VALUES ('pending-1', 'c1', 'approval', 'exec', 'pending', ?)`, now); err != nil { + t.Fatalf("insert pending: %v", err) + } + if _, err := db.Exec(`INSERT INTO hitl_interrupts + (id, conversation_id, mode, tool_name, status, decision, created_at, decided_at) + VALUES ('done-1', 'c1', 'approval', 'exec', 'decided', 'approve', ?, ?)`, now, now); err != nil { + t.Fatalf("insert decided: %v", err) + } + + deleted, err := db.DeleteHitlInterruptLogsByIDs([]string{"pending-1", "done-1"}) + if err != nil { + t.Fatalf("DeleteHitlInterruptLogsByIDs: %v", err) + } + if deleted != 1 { + t.Fatalf("deleted = %d, want 1", deleted) + } + + var status string + if err := db.QueryRow(`SELECT status FROM hitl_interrupts WHERE id = 'pending-1'`).Scan(&status); err != nil { + t.Fatalf("pending row missing: %v", err) + } + if err := db.QueryRow(`SELECT id FROM hitl_interrupts WHERE id = 'done-1'`).Scan(new(string)); err == nil { + t.Fatal("decided row should be deleted") + } +} + +func TestPurgeHitlInterruptLogsBefore(t *testing.T) { + dbPath := filepath.Join(t.TempDir(), "hitl.db") + db, err := NewDB(dbPath, zap.NewNop()) + if err != nil { + t.Fatalf("NewDB: %v", err) + } + defer db.Close() + ensureHitlInterruptsTable(t, db) + + old := time.Now().AddDate(0, 0, -100).UTC().Format(time.RFC3339) + recent := time.Now().AddDate(0, 0, -1).UTC().Format(time.RFC3339) + for _, row := range []struct{ id, decided string }{ + {"old-1", old}, + {"new-1", recent}, + } { + if _, err := db.Exec(`INSERT INTO hitl_interrupts + (id, conversation_id, mode, tool_name, status, decision, created_at, decided_at) + VALUES (?, 'c1', 'approval', 'exec', 'decided', 'approve', ?, ?)`, row.id, row.decided, row.decided); err != nil { + t.Fatalf("insert %s: %v", row.id, err) + } + } + + cutoff := time.Now().AddDate(0, 0, -90) + deleted, err := db.PurgeHitlInterruptLogsBefore(cutoff) + if err != nil { + t.Fatalf("PurgeHitlInterruptLogsBefore: %v", err) + } + if deleted != 1 { + t.Fatalf("deleted = %d, want 1", deleted) + } + if err := db.QueryRow(`SELECT id FROM hitl_interrupts WHERE id = 'old-1'`).Scan(new(string)); err == nil { + t.Fatal("old row should be purged") + } + if err := db.QueryRow(`SELECT id FROM hitl_interrupts WHERE id = 'new-1'`).Scan(new(string)); err != nil { + t.Fatalf("new row should remain: %v", err) + } +}