From 7e4a8db7afd41dee34587f4efe23e62fab812194 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E5=85=AC=E6=98=8E?= <83812544+Ed1s0nZ@users.noreply.github.com> Date: Fri, 26 Jun 2026 01:01:49 +0800 Subject: [PATCH] Add files via upload --- internal/database/monitor.go | 227 ++++++++++++++++++--- internal/database/monitor_summary_test.go | 86 ++++++++ internal/handler/monitor.go | 236 ++++++++++++++++++++-- internal/handler/openapi.go | 23 ++- 4 files changed, 525 insertions(+), 47 deletions(-) create mode 100644 internal/database/monitor_summary_test.go diff --git a/internal/database/monitor.go b/internal/database/monitor.go index 1e0175d9..e3bbf087 100644 --- a/internal/database/monitor.go +++ b/internal/database/monitor.go @@ -3,7 +3,6 @@ package database import ( "database/sql" "encoding/json" - "sort" "strings" "time" @@ -227,6 +226,167 @@ func (db *DB) LoadToolExecutionsWithPagination(offset, limit int, status, toolNa return executions, nil } +func toolExecutionsFilterSQL(status, toolName string) (string, []interface{}) { + args := []interface{}{} + conditions := []string{} + if status != "" { + conditions = append(conditions, "status = ?") + args = append(args, status) + } + if toolName != "" { + conditions = append(conditions, "LOWER(tool_name) LIKE ?") + args = append(args, "%"+strings.ToLower(toolName)+"%") + } + if len(conditions) == 0 { + return "", args + } + return ` WHERE ` + strings.Join(conditions, ` AND `), args +} + +// ToolStatsSummary 工具调用汇总(全量聚合,不含逐工具明细) +type ToolStatsSummary struct { + TotalCalls int + SuccessCalls int + FailedCalls int + LastCallTime *time.Time + ToolCount int +} + +// ToolStatsSummaryResult 汇总 + Top N 工具排行 +type ToolStatsSummaryResult struct { + Summary ToolStatsSummary + TopTools []*mcp.ToolStats +} + +// LoadToolStatsSummary 聚合统计信息,仅返回汇总与 Top N 工具(避免全量 map 传输) +func (db *DB) LoadToolStatsSummary(topN int) (*ToolStatsSummaryResult, error) { + if topN <= 0 { + topN = 6 + } + if topN > 100 { + topN = 100 + } + + result := &ToolStatsSummaryResult{ + TopTools: make([]*mcp.ToolStats, 0, topN), + } + + summaryQuery := ` + SELECT COUNT(*), + COALESCE(SUM(total_calls), 0), + COALESCE(SUM(success_calls), 0), + COALESCE(SUM(failed_calls), 0), + MAX(last_call_time) + FROM tool_stats + ` + var lastCallRaw sql.NullString + err := db.QueryRow(summaryQuery).Scan( + &result.Summary.ToolCount, + &result.Summary.TotalCalls, + &result.Summary.SuccessCalls, + &result.Summary.FailedCalls, + &lastCallRaw, + ) + if err != nil { + return nil, err + } + if lastCallRaw.Valid && strings.TrimSpace(lastCallRaw.String) != "" { + if t, parseErr := time.Parse(time.RFC3339Nano, lastCallRaw.String); parseErr == nil { + result.Summary.LastCallTime = &t + } else if t, parseErr := time.Parse("2006-01-02 15:04:05.999999999-07:00", lastCallRaw.String); parseErr == nil { + result.Summary.LastCallTime = &t + } else if t, parseErr := time.Parse("2006-01-02 15:04:05", lastCallRaw.String); parseErr == nil { + result.Summary.LastCallTime = &t + } + } + + topQuery := ` + SELECT tool_name, total_calls, success_calls, failed_calls, last_call_time + FROM tool_stats + WHERE total_calls > 0 + ORDER BY total_calls DESC, tool_name ASC + LIMIT ? + ` + rows, err := db.Query(topQuery, topN) + if err != nil { + return nil, err + } + defer rows.Close() + + for rows.Next() { + var stat mcp.ToolStats + var lastCallTime sql.NullTime + if err := rows.Scan( + &stat.ToolName, + &stat.TotalCalls, + &stat.SuccessCalls, + &stat.FailedCalls, + &lastCallTime, + ); err != nil { + db.logger.Warn("加载 Top 工具统计失败", zap.Error(err)) + continue + } + if lastCallTime.Valid { + stat.LastCallTime = &lastCallTime.Time + } + result.TopTools = append(result.TopTools, &stat) + } + + return result, nil +} + +// LoadToolExecutionListPage 分页加载执行记录列表(不含 arguments/result,供监控列表使用) +func (db *DB) LoadToolExecutionListPage(offset, limit int, status, toolName string) ([]*mcp.ToolExecution, error) { + if limit <= 0 { + limit = 20 + } + if limit > 100 { + limit = 100 + } + + query := ` + SELECT id, tool_name, status, start_time, end_time, duration_ms + FROM tool_executions + ` + whereSQL, args := toolExecutionsFilterSQL(status, toolName) + query += whereSQL + ` ORDER BY start_time DESC LIMIT ? OFFSET ?` + args = append(args, limit, offset) + + rows, err := db.Query(query, args...) + if err != nil { + return nil, err + } + defer rows.Close() + + executions := make([]*mcp.ToolExecution, 0, limit) + for rows.Next() { + var exec mcp.ToolExecution + var endTime sql.NullTime + var durationMs sql.NullInt64 + + if err := rows.Scan( + &exec.ID, + &exec.ToolName, + &exec.Status, + &exec.StartTime, + &endTime, + &durationMs, + ); err != nil { + db.logger.Warn("加载执行记录列表失败", zap.Error(err)) + continue + } + if endTime.Valid { + exec.EndTime = &endTime.Time + } + if durationMs.Valid { + exec.Duration = time.Duration(durationMs.Int64) * time.Millisecond + } + executions = append(executions, &exec) + } + + return executions, nil +} + // GetToolExecution 根据ID获取单条工具执行记录 func (db *DB) GetToolExecution(id string) (*mcp.ToolExecution, error) { query := ` @@ -687,13 +847,28 @@ func truncateCallsTimelineBucket(t time.Time, dailyBuckets bool) time.Time { // LoadCallsTimeline 按时间范围加载调用趋势(since 起至今,含边界) func (db *DB) LoadCallsTimeline(since time.Time, dailyBuckets bool) ([]CallsTimelineBucket, error) { - // 在 Go 侧按本地时区分桶,避免 SQLite strftime 对 UTC 存储时间分桶后再误当本地时间解析(差 8h 等问题) - query := ` - SELECT start_time, - CASE WHEN status IN ('failed', 'cancelled') THEN 1 ELSE 0 END AS failed - FROM tool_executions - WHERE start_time >= ? - ` + var query string + if dailyBuckets { + query = ` + SELECT date(start_time, 'localtime') AS bucket, + COUNT(*) AS total, + SUM(CASE WHEN status IN ('failed', 'cancelled') THEN 1 ELSE 0 END) AS failed + FROM tool_executions + WHERE start_time >= ? + GROUP BY bucket + ORDER BY bucket + ` + } else { + query = ` + SELECT strftime('%Y-%m-%d %H:00:00', start_time, 'localtime') AS bucket, + COUNT(*) AS total, + SUM(CASE WHEN status IN ('failed', 'cancelled') THEN 1 ELSE 0 END) AS failed + FROM tool_executions + WHERE start_time >= ? + GROUP BY bucket + ORDER BY bucket + ` + } rows, err := db.Query(query, since) if err != nil { @@ -701,35 +876,35 @@ func (db *DB) LoadCallsTimeline(since time.Time, dailyBuckets bool) ([]CallsTime } defer rows.Close() - bucketMap := make(map[time.Time]struct{ total, failed int }) + buckets := make([]CallsTimelineBucket, 0) for rows.Next() { - var startTime time.Time - var failed int - if err := rows.Scan(&startTime, &failed); err != nil { + var bucketStr string + var total, failed int + if err := rows.Scan(&bucketStr, &total, &failed); err != nil { db.logger.Warn("加载调用趋势失败", zap.Error(err)) continue } - key := truncateCallsTimelineBucket(startTime, dailyBuckets) - entry := bucketMap[key] - entry.total++ - entry.failed += failed - bucketMap[key] = entry - } - - buckets := make([]CallsTimelineBucket, 0, len(bucketMap)) - for bucketTime, counts := range bucketMap { + bucketTime, err := parseCallsTimelineBucket(bucketStr, dailyBuckets) + if err != nil { + db.logger.Warn("解析调用趋势时间桶失败", zap.Error(err), zap.String("bucket", bucketStr)) + continue + } buckets = append(buckets, CallsTimelineBucket{ BucketTime: bucketTime, - Total: counts.total, - Failed: counts.failed, + Total: total, + Failed: failed, }) } - sort.Slice(buckets, func(i, j int) bool { - return buckets[i].BucketTime.Before(buckets[j].BucketTime) - }) return buckets, nil } +func parseCallsTimelineBucket(bucketStr string, dailyBuckets bool) (time.Time, error) { + if dailyBuckets { + return time.ParseInLocation("2006-01-02", bucketStr, time.Local) + } + return time.ParseInLocation("2006-01-02 15:04:05", bucketStr, time.Local) +} + // DecreaseToolStats 减少工具统计信息(用于删除执行记录时) // 如果统计信息变为0,则删除该统计记录 func (db *DB) DecreaseToolStats(toolName string, totalCalls, successCalls, failedCalls int) error { diff --git a/internal/database/monitor_summary_test.go b/internal/database/monitor_summary_test.go new file mode 100644 index 00000000..7c162aed --- /dev/null +++ b/internal/database/monitor_summary_test.go @@ -0,0 +1,86 @@ +package database + +import ( + "fmt" + "path/filepath" + "testing" + "time" + + "cyberstrike-ai/internal/mcp" + + "go.uber.org/zap" +) + +func TestLoadToolStatsSummaryAndListPage(t *testing.T) { + dbPath := filepath.Join(t.TempDir(), "monitor-summary.db") + db, err := NewDB(dbPath, zap.NewNop()) + if err != nil { + t.Fatalf("NewDB: %v", err) + } + defer db.Close() + + now := time.Now() + tools := []struct { + name string + calls int + ok int + fail int + result string + }{ + {"alpha::run", 10, 9, 1, `{"content":[{"type":"text","text":"` + string(make([]byte, 64*1024)) + `"}]}`}, + {"beta::scan", 5, 5, 0, `{"content":[{"type":"text","text":"ok"}]}`}, + {"gamma::ping", 1, 1, 0, `{"content":[{"type":"text","text":"pong"}]}`}, + } + + for _, tool := range tools { + if err := db.UpdateToolStats(tool.name, tool.calls, tool.ok, tool.fail, &now); err != nil { + t.Fatalf("UpdateToolStats(%s): %v", tool.name, err) + } + for j := 0; j < tool.calls; j++ { + exec := &mcp.ToolExecution{ + ID: fmt.Sprintf("%s-exec-%d", tool.name, j), + ToolName: tool.name, + Arguments: map[string]interface{}{"n": j}, + Status: "completed", + StartTime: now.Add(-time.Duration(j) * time.Minute), + Result: &mcp.ToolResult{Content: []mcp.Content{{Type: "text", Text: tool.result}}}, + } + end := exec.StartTime.Add(time.Second) + exec.EndTime = &end + exec.Duration = time.Second + if err := db.SaveToolExecution(exec); err != nil { + t.Fatalf("SaveToolExecution: %v", err) + } + } + } + + summary, err := db.LoadToolStatsSummary(2) + if err != nil { + t.Fatalf("LoadToolStatsSummary: %v", err) + } + if summary.Summary.ToolCount != 3 { + t.Fatalf("toolCount = %d, want 3", summary.Summary.ToolCount) + } + if summary.Summary.TotalCalls != 16 { + t.Fatalf("totalCalls = %d, want 16", summary.Summary.TotalCalls) + } + if len(summary.TopTools) != 2 { + t.Fatalf("top tools = %d, want 2", len(summary.TopTools)) + } + if summary.TopTools[0].ToolName != "alpha::run" { + t.Fatalf("top tool = %q, want alpha::run", summary.TopTools[0].ToolName) + } + + list, err := db.LoadToolExecutionListPage(0, 5, "", "") + if err != nil { + t.Fatalf("LoadToolExecutionListPage: %v", err) + } + if len(list) != 5 { + t.Fatalf("list len = %d, want 5", len(list)) + } + for _, exec := range list { + if exec.Arguments != nil || exec.Result != nil || exec.Error != "" { + t.Fatalf("expected lite execution row, got args/result/error on %s", exec.ID) + } + } +} diff --git a/internal/handler/monitor.go b/internal/handler/monitor.go index 3d5fc4d4..e44a17cf 100644 --- a/internal/handler/monitor.go +++ b/internal/handler/monitor.go @@ -5,6 +5,7 @@ import ( "errors" "io" "net/http" + "sort" "strconv" "strings" "time" @@ -68,16 +69,34 @@ func (h *MonitorHandler) SetAgentHandler(ah *AgentHandler) { h.agentHandler = ah } +const monitorPageTopTools = 6 + +// MonitorStatsSummary 工具调用汇总 +type MonitorStatsSummary struct { + TotalCalls int `json:"totalCalls"` + SuccessCalls int `json:"successCalls"` + FailedCalls int `json:"failedCalls"` + LastCallTime *time.Time `json:"lastCallTime,omitempty"` + ToolCount int `json:"toolCount"` +} + // MonitorResponse 监控响应 type MonitorResponse struct { - Executions []*mcp.ToolExecution `json:"executions"` - Stats map[string]*mcp.ToolStats `json:"stats"` - Timestamp time.Time `json:"timestamp"` - Total int `json:"total,omitempty"` - Page int `json:"page,omitempty"` - PageSize int `json:"page_size,omitempty"` - TotalPages int `json:"total_pages,omitempty"` - RetentionDays int `json:"retention_days,omitempty"` + Executions []*mcp.ToolExecution `json:"executions"` + Summary *MonitorStatsSummary `json:"summary"` + TopTools []*mcp.ToolStats `json:"topTools"` + Timestamp time.Time `json:"timestamp"` + Total int `json:"total"` + Page int `json:"page"` + PageSize int `json:"pageSize"` + TotalPages int `json:"totalPages"` + RetentionDays int `json:"retentionDays"` +} + +// StatsResponse 统计信息响应(Dashboard 等) +type StatsResponse struct { + Summary *MonitorStatsSummary `json:"summary"` + TopTools []*mcp.ToolStats `json:"topTools"` } // Monitor 获取监控信息 @@ -101,9 +120,9 @@ func (h *MonitorHandler) Monitor(c *gin.Context) { // 解析工具筛选参数(兼容 mcp__tool 与内部 mcp::tool) toolName := normalizeToolNameFilter(c.Query("tool")) - executions, total := h.loadExecutionsWithPagination(page, pageSize, status, toolName) + executions, total := h.loadExecutionListWithPagination(page, pageSize, status, toolName) h.enrichExecutionsConversationID(executions) - stats := h.loadStats() + summary, topTools := h.loadStatsSummary(monitorPageTopTools) totalPages := (total + pageSize - 1) / pageSize if totalPages == 0 { @@ -112,7 +131,8 @@ func (h *MonitorHandler) Monitor(c *gin.Context) { c.JSON(http.StatusOK, MonitorResponse{ Executions: executions, - Stats: stats, + Summary: summary, + TopTools: topTools, Timestamp: time.Now(), Total: total, Page: page, @@ -134,6 +154,112 @@ func (h *MonitorHandler) loadExecutions() []*mcp.ToolExecution { return executions } +func (h *MonitorHandler) loadExecutionListWithPagination(page, pageSize int, status, toolName string) ([]*mcp.ToolExecution, int) { + if h.db == nil { + allExecutions := h.mcpServer.GetAllExecutions() + if status != "" || toolName != "" { + filtered := make([]*mcp.ToolExecution, 0) + for _, exec := range allExecutions { + matchStatus := status == "" || exec.Status == status + matchTool := toolNameFilterMatches(exec.ToolName, toolName) + if matchStatus && matchTool { + filtered = append(filtered, exec) + } + } + allExecutions = filtered + } + total := len(allExecutions) + offset := (page - 1) * pageSize + end := offset + pageSize + if end > total { + end = total + } + if offset >= total { + return []*mcp.ToolExecution{}, total + } + pageSlice := allExecutions[offset:end] + out := make([]*mcp.ToolExecution, 0, len(pageSlice)) + for _, exec := range pageSlice { + if exec == nil { + continue + } + out = append(out, slimToolExecution(exec)) + } + return out, total + } + + offset := (page - 1) * pageSize + executions, err := h.db.LoadToolExecutionListPage(offset, pageSize, status, toolName) + if err != nil { + h.logger.Warn("从数据库加载执行记录列表失败,回退到内存数据", zap.Error(err)) + return h.loadExecutionListWithPaginationFromMemory(page, pageSize, status, toolName) + } + + total, err := h.db.CountToolExecutions(status, toolName) + if err != nil { + h.logger.Warn("获取执行记录总数失败", zap.Error(err)) + total = offset + len(executions) + if len(executions) == pageSize { + total = offset + len(executions) + 1 + } + } + + return executions, total +} + +func (h *MonitorHandler) loadExecutionListWithPaginationFromMemory(page, pageSize int, status, toolName string) ([]*mcp.ToolExecution, int) { + allExecutions := h.mcpServer.GetAllExecutions() + if status != "" || toolName != "" { + filtered := make([]*mcp.ToolExecution, 0) + for _, exec := range allExecutions { + matchStatus := status == "" || exec.Status == status + matchTool := toolNameFilterMatches(exec.ToolName, toolName) + if matchStatus && matchTool { + filtered = append(filtered, exec) + } + } + allExecutions = filtered + } + total := len(allExecutions) + offset := (page - 1) * pageSize + end := offset + pageSize + if end > total { + end = total + } + if offset >= total { + return []*mcp.ToolExecution{}, total + } + pageSlice := allExecutions[offset:end] + out := make([]*mcp.ToolExecution, 0, len(pageSlice)) + for _, exec := range pageSlice { + if exec == nil { + continue + } + out = append(out, slimToolExecution(exec)) + } + return out, total +} + +func slimToolExecution(exec *mcp.ToolExecution) *mcp.ToolExecution { + if exec == nil { + return nil + } + slim := &mcp.ToolExecution{ + ID: exec.ID, + ToolName: exec.ToolName, + Status: exec.Status, + StartTime: exec.StartTime, + } + if exec.EndTime != nil { + end := *exec.EndTime + slim.EndTime = &end + } + if exec.Duration > 0 { + slim.Duration = exec.Duration + } + return slim +} + func (h *MonitorHandler) loadExecutionsWithPagination(page, pageSize int, status, toolName string) ([]*mcp.ToolExecution, int) { if h.db == nil { allExecutions := h.mcpServer.GetAllExecutions() @@ -206,7 +332,78 @@ func (h *MonitorHandler) loadExecutionsWithPagination(page, pageSize int, status return executions, total } -func (h *MonitorHandler) loadStats() map[string]*mcp.ToolStats { +func (h *MonitorHandler) loadStatsSummary(topN int) (*MonitorStatsSummary, []*mcp.ToolStats) { + if topN <= 0 { + topN = monitorPageTopTools + } + + if h.db != nil { + result, err := h.db.LoadToolStatsSummary(topN) + if err == nil { + return dbStatsSummaryToMonitor(result), result.TopTools + } + h.logger.Warn("从数据库加载统计汇总失败,回退到内存数据", zap.Error(err)) + } + + stats := h.loadStatsMap() + return summarizeToolStats(stats, topN) +} + +func dbStatsSummaryToMonitor(result *database.ToolStatsSummaryResult) *MonitorStatsSummary { + if result == nil { + return &MonitorStatsSummary{} + } + summary := &MonitorStatsSummary{ + TotalCalls: result.Summary.TotalCalls, + SuccessCalls: result.Summary.SuccessCalls, + FailedCalls: result.Summary.FailedCalls, + ToolCount: result.Summary.ToolCount, + } + if result.Summary.LastCallTime != nil { + t := *result.Summary.LastCallTime + summary.LastCallTime = &t + } + return summary +} + +func summarizeToolStats(stats map[string]*mcp.ToolStats, topN int) (*MonitorStatsSummary, []*mcp.ToolStats) { + summary := &MonitorStatsSummary{} + if len(stats) == 0 { + return summary, nil + } + + all := make([]*mcp.ToolStats, 0, len(stats)) + for _, stat := range stats { + if stat == nil { + continue + } + summary.ToolCount++ + summary.TotalCalls += stat.TotalCalls + summary.SuccessCalls += stat.SuccessCalls + summary.FailedCalls += stat.FailedCalls + if stat.LastCallTime != nil && (summary.LastCallTime == nil || stat.LastCallTime.After(*summary.LastCallTime)) { + t := *stat.LastCallTime + summary.LastCallTime = &t + } + if stat.TotalCalls > 0 { + statCopy := *stat + all = append(all, &statCopy) + } + } + + sort.Slice(all, func(i, j int) bool { + if all[i].TotalCalls == all[j].TotalCalls { + return all[i].ToolName < all[j].ToolName + } + return all[i].TotalCalls > all[j].TotalCalls + }) + if len(all) > topN { + all = all[:topN] + } + return summary, all +} + +func (h *MonitorHandler) loadStatsMap() map[string]*mcp.ToolStats { // 合并内部MCP服务器和外部MCP管理器的统计信息 stats := make(map[string]*mcp.ToolStats) @@ -334,7 +531,7 @@ func (h *MonitorHandler) CancelExecution(c *gin.Context) { func (h *MonitorHandler) enrichExecutionsConversationID(executions []*mcp.ToolExecution) { for _, exec := range executions { - if exec == nil { + if exec == nil || exec.Status != "running" { continue } exec.ConversationID = h.conversationIDForRunningExecution(exec.ID) @@ -415,8 +612,17 @@ func (h *MonitorHandler) BatchGetToolNames(c *gin.Context) { // GetStats 获取统计信息 func (h *MonitorHandler) GetStats(c *gin.Context) { - stats := h.loadStats() - c.JSON(http.StatusOK, stats) + topN := 30 + if topStr := c.Query("top"); topStr != "" { + if t, err := strconv.Atoi(topStr); err == nil && t > 0 && t <= 100 { + topN = t + } + } + summary, topTools := h.loadStatsSummary(topN) + c.JSON(http.StatusOK, StatsResponse{ + Summary: summary, + TopTools: topTools, + }) } // CallsTimelinePoint 调用趋势数据点 diff --git a/internal/handler/openapi.go b/internal/handler/openapi.go index 70979da0..e14444d5 100644 --- a/internal/handler/openapi.go +++ b/internal/handler/openapi.go @@ -740,14 +740,21 @@ func (h *OpenAPIHandler) GetOpenAPISpec(c *gin.Context) { "properties": map[string]interface{}{ "executions": map[string]interface{}{ "type": "array", - "description": "执行记录列表", + "description": "执行记录列表(轻量字段,不含 arguments/result)", "items": map[string]interface{}{ "$ref": "#/components/schemas/ToolExecution", }, }, - "stats": map[string]interface{}{ + "summary": map[string]interface{}{ "type": "object", - "description": "统计信息", + "description": "工具调用汇总", + }, + "topTools": map[string]interface{}{ + "type": "array", + "description": "调用量 Top N 工具", + "items": map[string]interface{}{ + "type": "object", + }, }, "timestamp": map[string]interface{}{ "type": "string", @@ -756,20 +763,24 @@ func (h *OpenAPIHandler) GetOpenAPISpec(c *gin.Context) { }, "total": map[string]interface{}{ "type": "integer", - "description": "总数", + "description": "执行记录总数", }, "page": map[string]interface{}{ "type": "integer", "description": "当前页", }, - "page_size": map[string]interface{}{ + "pageSize": map[string]interface{}{ "type": "integer", "description": "每页数量", }, - "total_pages": map[string]interface{}{ + "totalPages": map[string]interface{}{ "type": "integer", "description": "总页数", }, + "retentionDays": map[string]interface{}{ + "type": "integer", + "description": "执行记录保留天数", + }, }, }, "ConfigResponse": map[string]interface{}{