diff --git a/internal/app/app.go b/internal/app/app.go index ae6d2c60..53f9e81d 100644 --- a/internal/app/app.go +++ b/internal/app/app.go @@ -25,6 +25,7 @@ import ( "cyberstrike-ai/internal/logger" "cyberstrike-ai/internal/mcp" "cyberstrike-ai/internal/mcp/builtin" + "cyberstrike-ai/internal/monitor" "cyberstrike-ai/internal/robot" "cyberstrike-ai/internal/security" "cyberstrike-ai/internal/skillpackage" @@ -99,6 +100,10 @@ func New(cfg *config.Config, log *logger.Logger, configPath string) (*App, error auditSvc.PurgeExpired() audit.StartRetentionLoop(auditSvc, log.Logger) + monitorRetention := monitor.NewService(db, cfg, log.Logger) + monitorRetention.PurgeExpired() + monitor.StartRetentionLoop(monitorRetention, log.Logger) + // 创建MCP服务器(带数据库持久化) mcpServer := mcp.NewServerWithStorage(log.Logger, db) mcpServer.ConfigureHTTPToolCallTimeoutFromAgentMinutes(cfg.Agent.ToolTimeoutMinutes) @@ -326,6 +331,7 @@ func New(cfg *config.Config, log *logger.Logger, configPath string) (*App, error } monitorHandler := handler.NewMonitorHandler(mcpServer, executor, db, log.Logger) monitorHandler.SetAudit(auditSvc) + monitorHandler.SetMonitorRetention(monitorRetention) monitorHandler.SetExternalMCPManager(externalMCPMgr) // 设置外部MCP管理器,以便获取外部MCP执行记录 notificationHandler := handler.NewNotificationHandler(db, agentHandler, log.Logger) groupHandler := handler.NewGroupHandler(db, log.Logger) diff --git a/internal/monitor/retention.go b/internal/monitor/retention.go new file mode 100644 index 00000000..d1ffb295 --- /dev/null +++ b/internal/monitor/retention.go @@ -0,0 +1,71 @@ +package monitor + +import ( + "time" + + "cyberstrike-ai/internal/config" + "cyberstrike-ai/internal/database" + + "go.uber.org/zap" +) + +const retentionPurgeInterval = time.Hour + +// Service manages MCP tool execution monitor retention. +type Service struct { + db *database.DB + cfg *config.Config + logger *zap.Logger +} + +// NewService creates a monitor retention service. +func NewService(db *database.DB, cfg *config.Config, logger *zap.Logger) *Service { + return &Service{db: db, cfg: cfg, logger: logger} +} + +// RetentionDays returns configured retention; 0 means keep forever. +func (s *Service) RetentionDays() int { + if s == nil || s.cfg == nil { + return config.MonitorConfig{}.RetentionDaysEffective() + } + return s.cfg.Monitor.RetentionDaysEffective() +} + +// PurgeExpired deletes tool execution rows older than retention_days when configured. +func (s *Service) PurgeExpired() { + if s == nil || s.db == nil || s.cfg == nil { + return + } + days := s.cfg.Monitor.RetentionDaysEffective() + if days <= 0 { + return + } + cutoff := time.Now().AddDate(0, 0, -days) + n, err := s.db.PurgeToolExecutionsBefore(cutoff) + if err != nil { + if s.logger != nil { + s.logger.Warn("清理过期 MCP 执行记录失败", zap.Error(err)) + } + return + } + if n > 0 && s.logger != nil { + s.logger.Info("已清理过期 MCP 执行记录", zap.Int64("deleted", n), zap.Int("retention_days", days)) + } +} + +// StartRetentionLoop periodically purges expired tool execution rows. +func StartRetentionLoop(s *Service, logger *zap.Logger) { + if s == nil { + return + } + go func() { + ticker := time.NewTicker(retentionPurgeInterval) + defer ticker.Stop() + for range ticker.C { + s.PurgeExpired() + if logger != nil { + logger.Debug("monitor retention tick completed") + } + } + }() +} diff --git a/internal/monitor/retention_test.go b/internal/monitor/retention_test.go new file mode 100644 index 00000000..40425fd6 --- /dev/null +++ b/internal/monitor/retention_test.go @@ -0,0 +1,94 @@ +package monitor + +import ( + "path/filepath" + "testing" + "time" + + "cyberstrike-ai/internal/config" + "cyberstrike-ai/internal/database" + "cyberstrike-ai/internal/mcp" + + "go.uber.org/zap" +) + +func TestServicePurgeExpired_respectsZeroRetention(t *testing.T) { + dbPath := filepath.Join(t.TempDir(), "monitor.db") + db, err := database.NewDB(dbPath, zap.NewNop()) + if err != nil { + t.Fatalf("NewDB: %v", err) + } + defer db.Close() + + exec := &mcp.ToolExecution{ + ID: "ancient", + ToolName: "curl::get", + Arguments: map[string]interface{}{}, + Status: "completed", + StartTime: mustParseTime(t, "2020-01-01T00:00:00Z"), + } + if err := db.SaveToolExecution(exec); err != nil { + t.Fatalf("SaveToolExecution: %v", err) + } + + zero := 0 + svc := NewService(db, &config.Config{ + Monitor: config.MonitorConfig{RetentionDays: &zero}, + }, zap.NewNop()) + svc.PurgeExpired() + + if _, err := db.GetToolExecution("ancient"); err != nil { + t.Fatalf("record should remain when retention_days=0: %v", err) + } +} + +func TestServicePurgeExpired_deletesOldRows(t *testing.T) { + dbPath := filepath.Join(t.TempDir(), "monitor.db") + db, err := database.NewDB(dbPath, zap.NewNop()) + if err != nil { + t.Fatalf("NewDB: %v", err) + } + defer db.Close() + + exec := &mcp.ToolExecution{ + ID: "ancient", + ToolName: "curl::get", + Arguments: map[string]interface{}{}, + Status: "completed", + StartTime: mustParseTime(t, "2020-01-01T00:00:00Z"), + } + if err := db.SaveToolExecution(exec); err != nil { + t.Fatalf("SaveToolExecution: %v", err) + } + + days := 90 + svc := NewService(db, &config.Config{ + Monitor: config.MonitorConfig{RetentionDays: &days}, + }, zap.NewNop()) + svc.PurgeExpired() + + if _, err := db.GetToolExecution("ancient"); err == nil { + t.Fatal("record should be purged when older than retention_days") + } +} + +func TestRetentionDaysEffective_defaults(t *testing.T) { + got := config.MonitorConfig{}.RetentionDaysEffective() + if got != 90 { + t.Fatalf("default = %d, want 90", got) + } + zero := 0 + cfg := config.MonitorConfig{RetentionDays: &zero} + if cfg.RetentionDaysEffective() != 0 { + t.Fatalf("zero = %d, want 0", cfg.RetentionDaysEffective()) + } +} + +func mustParseTime(t *testing.T, value string) time.Time { + t.Helper() + parsed, err := time.Parse(time.RFC3339, value) + if err != nil { + t.Fatalf("parse time: %v", err) + } + return parsed +}