From 95470fefbc1a1435d64d9f27b6cd1a33b499e782 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E5=85=AC=E6=98=8E?= <83812544+Ed1s0nZ@users.noreply.github.com> Date: Thu, 25 Jun 2026 14:47:16 +0800 Subject: [PATCH] Add files via upload --- internal/config/config.go | 17 +++- internal/database/monitor.go | 87 +++++++++++++++++ internal/database/monitor_reconcile_test.go | 102 ++++++++++++++++++++ internal/mcp/external_manager.go | 17 ++++ internal/mcp/server.go | 17 ++++ 5 files changed, 238 insertions(+), 2 deletions(-) create mode 100644 internal/database/monitor_reconcile_test.go diff --git a/internal/config/config.go b/internal/config/config.go index 61ca9884..fc8d6137 100644 --- a/internal/config/config.go +++ b/internal/config/config.go @@ -96,9 +96,12 @@ type MultiAgentConfig struct { // OrchestratorInstructionSupervisor supervisor 主代理系统提示(transfer/exit 说明仍由运行追加);非空且 agents/orchestrator-supervisor.md 正文为空或未存在时生效。 OrchestratorInstructionSupervisor string `yaml:"orchestrator_instruction_supervisor,omitempty" json:"orchestrator_instruction_supervisor,omitempty"` SubAgents []MultiAgentSubConfig `yaml:"sub_agents" json:"sub_agents"` - // SubAgentUserContextMaxRunes caps the user-context supplement appended to task descriptions for sub-agents. - // 0 (default) uses the built-in default of 2000 runes; negative value disables injection entirely. + // SubAgentUserContextMaxRunes caps user-context supplement for sub-agent task descriptions. + // 0 (default) preserves all user turns verbatim; >0 caps total runes; negative disables injection. SubAgentUserContextMaxRunes int `yaml:"sub_agent_user_context_max_runes,omitempty" json:"sub_agent_user_context_max_runes,omitempty"` + // UserVerbatimAnchorMaxRunes injects all user turns verbatim into system prompt (survives summarization refresh). + // 0 (default) = no cap; >0 = total rune cap; negative disables anchor injection. + UserVerbatimAnchorMaxRunes int `yaml:"user_verbatim_anchor_max_runes,omitempty" json:"user_verbatim_anchor_max_runes,omitempty"` // EinoSkills configures CloudWeGo Eino ADK skill middleware + optional local filesystem/execute on DeepAgent. EinoSkills MultiAgentEinoSkillsConfig `yaml:"eino_skills,omitempty" json:"eino_skills,omitempty"` // EinoMiddleware wires optional ADK middleware (patchtoolcalls, toolsearch, plantask, reduction) and Deep extras. @@ -107,6 +110,16 @@ type MultiAgentConfig struct { EinoCallbacks MultiAgentEinoCallbacksConfig `yaml:"eino_callbacks,omitempty" json:"eino_callbacks,omitempty"` } +// UserVerbatimAnchorMaxRunesEffective returns max runes for user verbatim anchor; 0 = unlimited; negative = disabled. +func (c MultiAgentConfig) UserVerbatimAnchorMaxRunesEffective() int { + return c.UserVerbatimAnchorMaxRunes +} + +// SubAgentUserContextMaxRunesEffective returns max runes for sub-agent task supplement; 0 = unlimited; negative = disabled. +func (c MultiAgentConfig) SubAgentUserContextMaxRunesEffective() int { + return c.SubAgentUserContextMaxRunes +} + // MultiAgentEinoCallbacksConfig enables Eino unified callbacks on each ADK agent run (deep / plan_execute / supervisor / eino_single). // Modes: log_only (zap + optional OTel; no SSE to browser), sse (adds client SSE eino_trace_* when sse_trace_to_client), full (sse rules + stream callback copies closed). type MultiAgentEinoCallbacksConfig struct { diff --git a/internal/database/monitor.go b/internal/database/monitor.go index 75ad29ae..1e0175d9 100644 --- a/internal/database/monitor.go +++ b/internal/database/monitor.go @@ -288,6 +288,93 @@ func (db *DB) GetToolExecution(id string) (*mcp.ToolExecution, error) { return &exec, nil } +// CancelOrphanedRunningToolExecutions 将仍为 running 的记录批量标记为 cancelled(如进程重启后无对应执行协程)。 +func (db *DB) CancelOrphanedRunningToolExecutions(endTime time.Time, errMsg string) (int64, error) { + errMsg = strings.TrimSpace(errMsg) + if errMsg == "" { + errMsg = "执行已中断(服务重启或会话结束)" + } + query := ` + UPDATE tool_executions + SET status = 'cancelled', + error = ?, + end_time = ?, + duration_ms = MAX(0, CAST((julianday(?) - julianday(start_time)) * 86400000 AS INTEGER)) + WHERE status = 'running' + ` + res, err := db.Exec(query, errMsg, endTime, endTime) + if err != nil { + return 0, err + } + return res.RowsAffected() +} + +// FinalizeStaleRunningToolExecutions 将「非活跃且超过 minAge」的 running 记录标记为 cancelled。 +// activeIDs 为当前进程内仍登记 cancel 的 executionId;不在集合内且已超时的视为孤儿记录。 +func (db *DB) FinalizeStaleRunningToolExecutions(endTime time.Time, minAge time.Duration, activeIDs map[string]struct{}, errMsg string) (int64, error) { + errMsg = strings.TrimSpace(errMsg) + if errMsg == "" { + errMsg = "执行已中断(会话已结束)" + } + if minAge < 0 { + minAge = 0 + } + cutoff := endTime.Add(-minAge) + rows, err := db.Query(` + SELECT id, start_time FROM tool_executions + WHERE status = 'running' AND start_time <= ? + `, cutoff) + if err != nil { + return 0, err + } + defer rows.Close() + + type staleRow struct { + id string + startTime time.Time + } + var stale []staleRow + for rows.Next() { + var row staleRow + if err := rows.Scan(&row.id, &row.startTime); err != nil { + db.logger.Warn("读取 stale running 执行记录失败", zap.Error(err)) + continue + } + if activeIDs != nil { + if _, active := activeIDs[row.id]; active { + continue + } + } + stale = append(stale, row) + } + if err := rows.Err(); err != nil { + return 0, err + } + if len(stale) == 0 { + return 0, nil + } + + var affected int64 + for _, row := range stale { + durationMs := endTime.Sub(row.startTime).Milliseconds() + if durationMs < 0 { + durationMs = 0 + } + res, err := db.Exec(` + UPDATE tool_executions + SET status = 'cancelled', error = ?, end_time = ?, duration_ms = ? + WHERE id = ? AND status = 'running' + `, errMsg, endTime, durationMs, row.id) + if err != nil { + db.logger.Warn("更新 stale running 执行记录失败", zap.Error(err), zap.String("executionId", row.id)) + continue + } + n, _ := res.RowsAffected() + affected += n + } + return affected, nil +} + // DeleteToolExecution 删除工具执行记录 func (db *DB) DeleteToolExecution(id string) error { query := `DELETE FROM tool_executions WHERE id = ?` diff --git a/internal/database/monitor_reconcile_test.go b/internal/database/monitor_reconcile_test.go new file mode 100644 index 00000000..c2f96a3c --- /dev/null +++ b/internal/database/monitor_reconcile_test.go @@ -0,0 +1,102 @@ +package database + +import ( + "path/filepath" + "testing" + "time" + + "cyberstrike-ai/internal/mcp" + + "go.uber.org/zap" +) + +func TestCancelOrphanedRunningToolExecutions(t *testing.T) { + dbPath := filepath.Join(t.TempDir(), "monitor.db") + db, err := NewDB(dbPath, zap.NewNop()) + if err != nil { + t.Fatalf("NewDB: %v", err) + } + defer db.Close() + + start := time.Now().Add(-2 * time.Hour) + exec := &mcp.ToolExecution{ + ID: "orphan-hydra", + ToolName: "hydra", + Arguments: map[string]interface{}{"target": "127.0.0.1"}, + Status: "running", + StartTime: start, + } + if err := db.SaveToolExecution(exec); err != nil { + t.Fatalf("SaveToolExecution: %v", err) + } + + end := time.Now() + n, err := db.CancelOrphanedRunningToolExecutions(end, "执行已中断(服务重启)") + if err != nil { + t.Fatalf("CancelOrphanedRunningToolExecutions: %v", err) + } + if n != 1 { + t.Fatalf("expected 1 row updated, got %d", n) + } + + got, err := db.GetToolExecution("orphan-hydra") + if err != nil { + t.Fatalf("GetToolExecution: %v", err) + } + if got.Status != "cancelled" { + t.Fatalf("expected cancelled, got %s", got.Status) + } + if got.EndTime == nil { + t.Fatal("expected end_time to be set") + } + if got.Duration <= 0 { + t.Fatalf("expected positive duration, got %v", got.Duration) + } +} + +func TestFinalizeStaleRunningToolExecutions_skipsActive(t *testing.T) { + dbPath := filepath.Join(t.TempDir(), "monitor.db") + db, err := NewDB(dbPath, zap.NewNop()) + if err != nil { + t.Fatalf("NewDB: %v", err) + } + defer db.Close() + + now := time.Now() + oldStart := now.Add(-5 * time.Minute) + if err := db.SaveToolExecution(&mcp.ToolExecution{ + ID: "stale", ToolName: "hydra", Status: "running", StartTime: oldStart, + }); err != nil { + t.Fatalf("SaveToolExecution stale: %v", err) + } + if err := db.SaveToolExecution(&mcp.ToolExecution{ + ID: "active", ToolName: "hydra", Status: "running", StartTime: oldStart, + }); err != nil { + t.Fatalf("SaveToolExecution active: %v", err) + } + + active := map[string]struct{}{"active": {}} + n, err := db.FinalizeStaleRunningToolExecutions(now, time.Minute, active, "执行已中断(会话已结束)") + if err != nil { + t.Fatalf("FinalizeStaleRunningToolExecutions: %v", err) + } + if n != 1 { + t.Fatalf("expected 1 stale row updated, got %d", n) + } + + stale, err := db.GetToolExecution("stale") + if err != nil { + t.Fatalf("GetToolExecution stale: %v", err) + } + if stale.Status != "cancelled" { + t.Fatalf("stale expected cancelled, got %s", stale.Status) + } + + activeExec, err := db.GetToolExecution("active") + if err != nil { + t.Fatalf("GetToolExecution active: %v", err) + } + if activeExec.Status != "running" { + t.Fatalf("active expected running, got %s", activeExec.Status) + } +} diff --git a/internal/mcp/external_manager.go b/internal/mcp/external_manager.go index 8e8182d8..cf2ff30b 100644 --- a/internal/mcp/external_manager.go +++ b/internal/mcp/external_manager.go @@ -814,6 +814,23 @@ func (m *ExternalMCPManager) CancelToolExecution(id string) bool { return m.CancelToolExecutionWithNote(id, "") } +// ActiveRunningExecutionIDs 返回当前进程内仍登记 cancel 的外部 MCP executionId 快照。 +func (m *ExternalMCPManager) ActiveRunningExecutionIDs() map[string]struct{} { + if m == nil { + return nil + } + m.mu.Lock() + defer m.mu.Unlock() + if len(m.runningCancels) == 0 { + return nil + } + out := make(map[string]struct{}, len(m.runningCancels)) + for id := range m.runningCancels { + out[id] = struct{}{} + } + return out +} + // updateStats 更新统计信息 func (m *ExternalMCPManager) updateStats(toolName string, failed bool) { now := time.Now() diff --git a/internal/mcp/server.go b/internal/mcp/server.go index 1daf368a..69d67bb8 100644 --- a/internal/mcp/server.go +++ b/internal/mcp/server.go @@ -1170,6 +1170,23 @@ func (s *Server) CancelToolExecution(id string) bool { return s.CancelToolExecutionWithNote(id, "") } +// ActiveRunningExecutionIDs 返回当前进程内仍登记 cancel 的 executionId 快照。 +func (s *Server) ActiveRunningExecutionIDs() map[string]struct{} { + if s == nil { + return nil + } + s.runningCancelsMu.Lock() + defer s.runningCancelsMu.Unlock() + if len(s.runningCancels) == 0 { + return nil + } + out := make(map[string]struct{}, len(s.runningCancels)) + for id := range s.runningCancels { + out[id] = struct{}{} + } + return out +} + // initDefaultPrompts 初始化默认提示词模板 func (s *Server) initDefaultPrompts() { s.mu.Lock()