diff --git a/internal/config/config.go b/internal/config/config.go index 1712a3fa..e700030e 100644 --- a/internal/config/config.go +++ b/internal/config/config.go @@ -72,6 +72,8 @@ type MultiAgentEinoMiddlewareConfig struct { ToolSearchEnable bool `yaml:"tool_search_enable,omitempty" json:"tool_search_enable,omitempty"` ToolSearchMinTools int `yaml:"tool_search_min_tools,omitempty" json:"tool_search_min_tools,omitempty"` // default 20; applies when len(tools) >= this ToolSearchAlwaysVisible int `yaml:"tool_search_always_visible,omitempty" json:"tool_search_always_visible,omitempty"` // default 12; first N tools stay always visible + // ToolSearchAlwaysVisibleTools keeps specified tool names always visible (never hidden by tool_search). + ToolSearchAlwaysVisibleTools []string `yaml:"tool_search_always_visible_tools,omitempty" json:"tool_search_always_visible_tools,omitempty"` // Plantask adds TaskCreate/Get/Update/List (file-backed under skills dir); requires eino_skills + local backend. PlantaskEnable bool `yaml:"plantask_enable,omitempty" json:"plantask_enable,omitempty"` // PlantaskRelDir relative to skills_dir for per-conversation task boards (default .eino/plantask). @@ -79,8 +81,24 @@ type MultiAgentEinoMiddlewareConfig struct { // Reduction truncates/offloads large tool outputs (requires eino local backend for Write). ReductionEnable bool `yaml:"reduction_enable,omitempty" json:"reduction_enable,omitempty"` ReductionRootDir string `yaml:"reduction_root_dir,omitempty" json:"reduction_root_dir,omitempty"` // default: os temp + conversation id + ReductionMaxLengthForTrunc int `yaml:"reduction_max_length_for_trunc,omitempty" json:"reduction_max_length_for_trunc,omitempty"` // default 12000 + ReductionMaxTokensForClear int `yaml:"reduction_max_tokens_for_clear,omitempty" json:"reduction_max_tokens_for_clear,omitempty"` // default 50000 ReductionClearExclude []string `yaml:"reduction_clear_exclude,omitempty" json:"reduction_clear_exclude,omitempty"` ReductionSubAgents bool `yaml:"reduction_sub_agents,omitempty" json:"reduction_sub_agents,omitempty"` // also attach to sub-agents + // SummarizationTriggerRatio controls summarization trigger threshold as max_total_tokens * ratio (default 0.8). + SummarizationTriggerRatio float64 `yaml:"summarization_trigger_ratio,omitempty" json:"summarization_trigger_ratio,omitempty"` + // SummarizationEmitInternalEvents controls middleware internal event emission (default true). + SummarizationEmitInternalEvents *bool `yaml:"summarization_emit_internal_events,omitempty" json:"summarization_emit_internal_events,omitempty"` + // HistoryInputBudgetRatio caps pre-agent history tokens as max_total_tokens * ratio (default 0.35). + HistoryInputBudgetRatio float64 `yaml:"history_input_budget_ratio,omitempty" json:"history_input_budget_ratio,omitempty"` + // PlanExecuteUserInputBudgetRatio caps planner/replanner/executor userInput prompt budget ratio (default 0.35). + PlanExecuteUserInputBudgetRatio float64 `yaml:"plan_execute_user_input_budget_ratio,omitempty" json:"plan_execute_user_input_budget_ratio,omitempty"` + // PlanExecuteExecutedStepsBudgetRatio caps executed_steps prompt budget ratio (default 0.2). + PlanExecuteExecutedStepsBudgetRatio float64 `yaml:"plan_execute_executed_steps_budget_ratio,omitempty" json:"plan_execute_executed_steps_budget_ratio,omitempty"` + // PlanExecuteMaxStepResultRunes caps each executed step result length for prompt view (default 4000). + PlanExecuteMaxStepResultRunes int `yaml:"plan_execute_max_step_result_runes,omitempty" json:"plan_execute_max_step_result_runes,omitempty"` + // PlanExecuteKeepLastSteps keeps only the tail steps in prompt view (default 8). + PlanExecuteKeepLastSteps int `yaml:"plan_execute_keep_last_steps,omitempty" json:"plan_execute_keep_last_steps,omitempty"` // CheckpointDir when non-empty enables adk.Runner CheckPointStore (file-backed) for interrupt/resume persistence. CheckpointDir string `yaml:"checkpoint_dir,omitempty" json:"checkpoint_dir,omitempty"` // DeepOutputKey passed to deep.Config OutputKey (session final text); empty = off. @@ -91,6 +109,97 @@ type MultiAgentEinoMiddlewareConfig struct { TaskToolDescriptionPrefix string `yaml:"task_tool_description_prefix,omitempty" json:"task_tool_description_prefix,omitempty"` } +func (c MultiAgentEinoMiddlewareConfig) SummarizationTriggerRatioEffective() float64 { + v := c.SummarizationTriggerRatio + if v <= 0 { + return 0.8 + } + if v < 0.5 { + return 0.5 + } + if v > 0.95 { + return 0.95 + } + return v +} + +func (c MultiAgentEinoMiddlewareConfig) SummarizationEmitInternalEventsEffective() bool { + if c.SummarizationEmitInternalEvents != nil { + return *c.SummarizationEmitInternalEvents + } + return true +} + +func (c MultiAgentEinoMiddlewareConfig) HistoryInputBudgetRatioEffective() float64 { + v := c.HistoryInputBudgetRatio + if v <= 0 { + return 0.35 + } + if v < 0.15 { + return 0.15 + } + if v > 0.6 { + return 0.6 + } + return v +} + +func (c MultiAgentEinoMiddlewareConfig) PlanExecuteUserInputBudgetRatioEffective() float64 { + v := c.PlanExecuteUserInputBudgetRatio + if v <= 0 { + return 0.35 + } + if v < 0.1 { + return 0.1 + } + if v > 0.6 { + return 0.6 + } + return v +} + +func (c MultiAgentEinoMiddlewareConfig) PlanExecuteExecutedStepsBudgetRatioEffective() float64 { + v := c.PlanExecuteExecutedStepsBudgetRatio + if v <= 0 { + return 0.2 + } + if v < 0.08 { + return 0.08 + } + if v > 0.5 { + return 0.5 + } + return v +} + +func (c MultiAgentEinoMiddlewareConfig) PlanExecuteMaxStepResultRunesEffective() int { + if c.PlanExecuteMaxStepResultRunes > 0 { + return c.PlanExecuteMaxStepResultRunes + } + return 4000 +} + +func (c MultiAgentEinoMiddlewareConfig) PlanExecuteKeepLastStepsEffective() int { + if c.PlanExecuteKeepLastSteps > 0 { + return c.PlanExecuteKeepLastSteps + } + return 8 +} + +func (c MultiAgentEinoMiddlewareConfig) ReductionMaxLengthForTruncEffective() int { + if c.ReductionMaxLengthForTrunc > 0 { + return c.ReductionMaxLengthForTrunc + } + return 12000 +} + +func (c MultiAgentEinoMiddlewareConfig) ReductionMaxTokensForClearEffective() int { + if c.ReductionMaxTokensForClear > 0 { + return c.ReductionMaxTokensForClear + } + return 50000 +} + // MultiAgentEinoSkillsConfig toggles Eino official skill progressive disclosure and host filesystem tools. type MultiAgentEinoSkillsConfig struct { // Disable skips skill middleware (and does not attach local FS tools for Deep). @@ -137,6 +246,8 @@ type MultiAgentPublic struct { SubAgentCount int `json:"sub_agent_count"` Orchestration string `json:"orchestration,omitempty"` PlanExecuteLoopMaxIterations int `json:"plan_execute_loop_max_iterations"` + ToolSearchAlwaysVisibleTools []string `json:"tool_search_always_visible_tools,omitempty"` + ToolSearchAlwaysVisibleEffectiveTools []string `json:"tool_search_always_visible_effective_tools,omitempty"` } // NormalizeMultiAgentOrchestration 返回 deep、plan_execute 或 supervisor。 @@ -158,6 +269,7 @@ type MultiAgentAPIUpdate struct { RobotUseMultiAgent bool `json:"robot_use_multi_agent"` BatchUseMultiAgent bool `json:"batch_use_multi_agent"` PlanExecuteLoopMaxIterations *int `json:"plan_execute_loop_max_iterations,omitempty"` + ToolSearchAlwaysVisibleTools []string `json:"tool_search_always_visible_tools,omitempty"` } // RobotsConfig 机器人配置(企业微信、钉钉、飞书等) diff --git a/internal/database/conversation.go b/internal/database/conversation.go index 8145f91a..35d36499 100644 --- a/internal/database/conversation.go +++ b/internal/database/conversation.go @@ -4,6 +4,8 @@ import ( "database/sql" "encoding/json" "fmt" + "os" + "path/filepath" "strings" "time" @@ -416,6 +418,14 @@ func (db *DB) DeleteConversation(id string) error { if err != nil { return fmt.Errorf("删除对话失败: %w", err) } + // Best-effort cleanup for conversation-scoped filesystem artifacts + // (e.g., summarization transcript, reduction/checkpoint files under conversation_artifacts/). + if base := strings.TrimSpace(db.conversationArtifactsDir); base != "" { + artDir := filepath.Join(base, id) + if rmErr := os.RemoveAll(artDir); rmErr != nil { + db.logger.Warn("删除会话 artifacts 目录失败", zap.String("conversationId", id), zap.String("dir", artDir), zap.Error(rmErr)) + } + } db.logger.Info("对话及其所有相关数据已删除", zap.String("conversationId", id)) return nil diff --git a/internal/database/database.go b/internal/database/database.go index a6fee794..30cba35b 100644 --- a/internal/database/database.go +++ b/internal/database/database.go @@ -3,6 +3,8 @@ package database import ( "database/sql" "fmt" + "os" + "path/filepath" "strings" "time" @@ -21,7 +23,8 @@ func configureDBPool(db *sql.DB) { // DB 数据库连接 type DB struct { *sql.DB - logger *zap.Logger + logger *zap.Logger + conversationArtifactsDir string } // NewDB 创建数据库连接 @@ -41,6 +44,13 @@ func NewDB(dbPath string, logger *zap.Logger) (*DB, error) { DB: db, logger: logger, } + // Keep conversation-scoped artifacts near database files, so cleanup can follow conversation lifecycle. + baseDir := filepath.Join(filepath.Dir(dbPath), "conversation_artifacts") + if mkErr := os.MkdirAll(baseDir, 0o755); mkErr == nil { + database.conversationArtifactsDir = baseDir + } else if logger != nil { + logger.Warn("创建 conversation artifacts 目录失败", zap.String("dir", baseDir), zap.Error(mkErr)) + } // 初始化表 if err := database.initTables(); err != nil { diff --git a/internal/multiagent/eino_input_telemetry.go b/internal/multiagent/eino_input_telemetry.go new file mode 100644 index 00000000..dbf3c576 --- /dev/null +++ b/internal/multiagent/eino_input_telemetry.go @@ -0,0 +1,133 @@ +package multiagent + +import ( + "context" + "strings" + + "cyberstrike-ai/internal/agent" + + "github.com/bytedance/sonic" + "github.com/cloudwego/eino/adk" + "github.com/cloudwego/eino/schema" + "go.uber.org/zap" +) + +type einoModelInputTelemetryMiddleware struct { + adk.BaseChatModelAgentMiddleware + logger *zap.Logger + modelName string + conversationID string + phase string +} + +func newEinoModelInputTelemetryMiddleware( + logger *zap.Logger, + modelName string, + conversationID string, + phase string, +) adk.ChatModelAgentMiddleware { + if logger == nil { + return nil + } + return &einoModelInputTelemetryMiddleware{ + logger: logger, + modelName: strings.TrimSpace(modelName), + conversationID: strings.TrimSpace(conversationID), + phase: strings.TrimSpace(phase), + } +} + +func (m *einoModelInputTelemetryMiddleware) BeforeModelRewriteState( + ctx context.Context, + state *adk.ChatModelAgentState, + mc *adk.ModelContext, +) (context.Context, *adk.ChatModelAgentState, error) { + if m == nil || m.logger == nil || state == nil { + return ctx, state, nil + } + tokens := estimateTokensForMessagesAndTools(ctx, m.modelName, state.Messages, mcTools(mc)) + m.logger.Info("eino model input estimated", + zap.String("phase", m.phase), + zap.String("conversation_id", m.conversationID), + zap.Int("messages", len(state.Messages)), + zap.Int("tools", len(mcTools(mc))), + zap.Int("input_tokens_estimated", tokens), + ) + return ctx, state, nil +} + +func mcTools(mc *adk.ModelContext) []*schema.ToolInfo { + if mc == nil || len(mc.Tools) == 0 { + return nil + } + return mc.Tools +} + +func estimateTokensForMessagesAndTools( + _ context.Context, + modelName string, + messages []adk.Message, + tools []*schema.ToolInfo, +) int { + var sb strings.Builder + for _, msg := range messages { + if msg == nil { + continue + } + sb.WriteString(string(msg.Role)) + sb.WriteByte('\n') + sb.WriteString(msg.Content) + sb.WriteByte('\n') + if msg.ReasoningContent != "" { + sb.WriteString(msg.ReasoningContent) + sb.WriteByte('\n') + } + if len(msg.ToolCalls) > 0 { + if b, err := sonic.Marshal(msg.ToolCalls); err == nil { + sb.Write(b) + sb.WriteByte('\n') + } + } + } + for _, tl := range tools { + if tl == nil { + continue + } + cp := *tl + cp.Extra = nil + if text, err := sonic.MarshalString(cp); err == nil { + sb.WriteString(text) + sb.WriteByte('\n') + } + } + text := sb.String() + if text == "" { + return 0 + } + tc := agent.NewTikTokenCounter() + if n, err := tc.Count(modelName, text); err == nil { + return n + } + return (len(text) + 3) / 4 +} + +func logPlanExecuteModelInputEstimate( + logger *zap.Logger, + modelName string, + conversationID string, + phase string, + msgs []adk.Message, +) { + if logger == nil { + return + } + tokens := estimateTokensForMessagesAndTools(context.Background(), modelName, msgs, nil) + logger.Info("eino model input estimated", + zap.String("phase", phase), + zap.String("conversation_id", strings.TrimSpace(conversationID)), + zap.Int("messages", len(msgs)), + zap.Int("tools", 0), + zap.Int("input_tokens_estimated", tokens), + ) +} + diff --git a/internal/multiagent/eino_middleware.go b/internal/multiagent/eino_middleware.go index f874da4d..53591fae 100644 --- a/internal/multiagent/eino_middleware.go +++ b/internal/multiagent/eino_middleware.go @@ -8,6 +8,7 @@ import ( "strings" "cyberstrike-ai/internal/config" + "cyberstrike-ai/internal/mcp/builtin" localbk "github.com/cloudwego/eino-ext/adk/backend/local" "github.com/cloudwego/eino/adk" @@ -65,6 +66,66 @@ func splitToolsForToolSearch(all []tool.BaseTool, alwaysVisible int) (static []t return append([]tool.BaseTool(nil), all[:alwaysVisible]...), append([]tool.BaseTool(nil), all[alwaysVisible:]...), true } +func splitToolsForToolSearchByNames(all []tool.BaseTool, names []string, fallbackAlwaysVisible int) (static []tool.BaseTool, dynamic []tool.BaseTool, ok bool) { + nameSet := make(map[string]struct{}, len(names)) + for _, n := range names { + n = strings.TrimSpace(strings.ToLower(n)) + if n == "" { + continue + } + nameSet[n] = struct{}{} + } + if len(nameSet) == 0 { + return splitToolsForToolSearch(all, fallbackAlwaysVisible) + } + static = make([]tool.BaseTool, 0, len(all)) + dynamic = make([]tool.BaseTool, 0, len(all)) + for _, t := range all { + if t == nil { + continue + } + info, err := t.Info(context.Background()) + name := "" + if err == nil && info != nil { + name = strings.TrimSpace(strings.ToLower(info.Name)) + } + if _, keep := nameSet[name]; keep { + static = append(static, t) + continue + } + dynamic = append(dynamic, t) + } + if len(static) == 0 || len(dynamic) == 0 { + // fallback: preserve previous behavior when whitelist misses all or includes all. + return splitToolsForToolSearch(all, fallbackAlwaysVisible) + } + return static, dynamic, true +} + +func mergeAlwaysVisibleToolNames(configured []string) []string { + merged := make([]string, 0, len(configured)+32) + seen := make(map[string]struct{}, len(configured)+32) + add := func(name string) { + n := strings.TrimSpace(strings.ToLower(name)) + if n == "" { + return + } + if _, ok := seen[n]; ok { + return + } + seen[n] = struct{}{} + merged = append(merged, n) + } + for _, n := range configured { + add(n) + } + // Always include hardcoded backend builtin MCP tools from constants. + for _, n := range builtin.GetAllBuiltinTools() { + add(n) + } + return merged +} + func buildReductionMiddleware(ctx context.Context, mw config.MultiAgentEinoMiddlewareConfig, convID string, loc *localbk.Local, logger *zap.Logger) (adk.ChatModelAgentMiddleware, error) { if loc == nil { return nil, fmt.Errorf("reduction: local backend nil") @@ -87,6 +148,8 @@ func buildReductionMiddleware(ctx context.Context, mw config.MultiAgentEinoMiddl RootDir: root, ReadFileToolName: "read_file", ClearExcludeTools: excl, + MaxLengthForTrunc: mw.ReductionMaxLengthForTruncEffective(), + MaxTokensForClear: int64(mw.ReductionMaxTokensForClearEffective()), }) if err != nil { return nil, err @@ -142,7 +205,7 @@ func prependEinoMiddlewares( alwaysVis = 12 } if mw.ToolSearchEnable && len(tools) >= minTools { - static, dynamic, split := splitToolsForToolSearch(tools, alwaysVis) + static, dynamic, split := splitToolsForToolSearchByNames(tools, mergeAlwaysVisibleToolNames(mw.ToolSearchAlwaysVisibleTools), alwaysVis) if split && len(dynamic) > 0 { ts, terr := toolsearch.New(ctx, &toolsearch.Config{DynamicTools: dynamic}) if terr != nil { diff --git a/internal/multiagent/eino_model_rewrite_pipeline.go b/internal/multiagent/eino_model_rewrite_pipeline.go new file mode 100644 index 00000000..aabd3c1d --- /dev/null +++ b/internal/multiagent/eino_model_rewrite_pipeline.go @@ -0,0 +1,38 @@ +package multiagent + +import ( + "context" + "fmt" + + "github.com/cloudwego/eino/adk" +) + +func applyBeforeModelRewriteHandlers( + ctx context.Context, + msgs []adk.Message, + handlers []adk.ChatModelAgentMiddleware, +) ([]adk.Message, error) { + if len(msgs) == 0 || len(handlers) == 0 { + return msgs, nil + } + state := &adk.ChatModelAgentState{Messages: msgs} + modelCtx := &adk.ModelContext{} + curCtx := ctx + for _, h := range handlers { + if h == nil { + continue + } + nextCtx, nextState, err := h.BeforeModelRewriteState(curCtx, state, modelCtx) + if err != nil { + return nil, fmt.Errorf("before model rewrite: %w", err) + } + if nextCtx != nil { + curCtx = nextCtx + } + if nextState != nil { + state = nextState + } + } + return state.Messages, nil +} + diff --git a/internal/multiagent/eino_orchestration.go b/internal/multiagent/eino_orchestration.go index 96d1ab2b..7dc7a968 100644 --- a/internal/multiagent/eino_orchestration.go +++ b/internal/multiagent/eino_orchestration.go @@ -5,6 +5,7 @@ import ( "fmt" "strings" + "cyberstrike-ai/internal/agent" "cyberstrike-ai/internal/config" "github.com/cloudwego/eino-ext/components/model/openai" @@ -25,7 +26,12 @@ type PlanExecuteRootArgs struct { LoopMaxIter int // AppCfg / Logger 非空时为 Executor 挂载与 Deep/Supervisor 一致的 Eino summarization 中间件。 AppCfg *config.Config + MwCfg *config.MultiAgentEinoMiddlewareConfig + // ConversationID is used for transcript/isolation paths in middleware. + ConversationID string Logger *zap.Logger + // ModelName is used for model input token estimation logs. + ModelName string // ExecPreMiddlewares 是由 prependEinoMiddlewares 构建的前置中间件(patchtoolcalls, reduction, toolsearch, plantask), // 与 Deep/Supervisor 主代理的 mainOrchestratorPre 一致。 ExecPreMiddlewares []adk.ChatModelAgentMiddleware @@ -33,6 +39,8 @@ type PlanExecuteRootArgs struct { SkillMiddleware adk.ChatModelAgentMiddleware // FilesystemMiddleware 是 Eino filesystem 中间件,当 eino_skills.filesystem_tools 启用时提供本机文件读写与 Shell 能力(可选)。 FilesystemMiddleware adk.ChatModelAgentMiddleware + // PlannerReplannerRewriteHandlers applies BeforeModelRewriteState pipeline for planner/replanner input. + PlannerReplannerRewriteHandlers []adk.ChatModelAgentMiddleware } // NewPlanExecuteRoot 返回 plan → execute → replan 预置编排根节点(与 Deep / Supervisor 并列)。 @@ -50,7 +58,7 @@ func NewPlanExecuteRoot(ctx context.Context, a *PlanExecuteRootArgs) (adk.Resuma plannerCfg := &planexecute.PlannerConfig{ ToolCallingChatModel: tcm, } - if fn := planExecutePlannerGenInput(a.OrchInstruction); fn != nil { + if fn := planExecutePlannerGenInput(a.OrchInstruction, a.AppCfg, a.MwCfg, a.Logger, a.ModelName, a.ConversationID, a.PlannerReplannerRewriteHandlers); fn != nil { plannerCfg.GenInputFn = fn } planner, err := planexecute.NewPlanner(ctx, plannerCfg) @@ -59,7 +67,7 @@ func NewPlanExecuteRoot(ctx context.Context, a *PlanExecuteRootArgs) (adk.Resuma } replanner, err := planexecute.NewReplanner(ctx, &planexecute.ReplannerConfig{ ChatModel: tcm, - GenInputFn: planExecuteReplannerGenInput(a.OrchInstruction), + GenInputFn: planExecuteReplannerGenInput(a.OrchInstruction, a.AppCfg, a.MwCfg, a.Logger, a.ModelName, a.ConversationID, a.PlannerReplannerRewriteHandlers), }) if err != nil { return nil, fmt.Errorf("plan_execute replanner: %w", err) @@ -81,17 +89,20 @@ func NewPlanExecuteRoot(ctx context.Context, a *PlanExecuteRootArgs) (adk.Resuma } // 4. summarization(最后,与 Deep/Supervisor 一致) if a.AppCfg != nil { - sumMw, sumErr := newEinoSummarizationMiddleware(ctx, a.ExecModel, a.AppCfg, a.Logger) + sumMw, sumErr := newEinoSummarizationMiddleware(ctx, a.ExecModel, a.AppCfg, a.MwCfg, a.ConversationID, a.Logger) if sumErr != nil { return nil, fmt.Errorf("plan_execute executor summarization: %w", sumErr) } execHandlers = append(execHandlers, sumMw) } + if teleMw := newEinoModelInputTelemetryMiddleware(a.Logger, a.ModelName, a.ConversationID, "plan_execute_executor"); teleMw != nil { + execHandlers = append(execHandlers, teleMw) + } executor, err := newPlanExecuteExecutor(ctx, &planexecute.ExecutorConfig{ Model: a.ExecModel, ToolsConfig: a.ToolsCfg, MaxIterations: a.ExecMaxIter, - GenInputFn: planExecuteExecutorGenInput(a.OrchInstruction), + GenInputFn: planExecuteExecutorGenInput(a.OrchInstruction, a.AppCfg, a.MwCfg, a.Logger, a.ModelName, a.ConversationID), }, execHandlers) if err != nil { return nil, fmt.Errorf("plan_execute executor: %w", err) @@ -110,20 +121,42 @@ func NewPlanExecuteRoot(ctx context.Context, a *PlanExecuteRootArgs) (adk.Resuma // planExecutePlannerGenInput 将 orchestrator instruction 作为 SystemMessage 注入 planner 输入。 // 返回 nil 时 Eino 使用内置默认 planner prompt。 -func planExecutePlannerGenInput(orchInstruction string) planexecute.GenPlannerModelInputFn { +func planExecutePlannerGenInput( + orchInstruction string, + appCfg *config.Config, + mwCfg *config.MultiAgentEinoMiddlewareConfig, + logger *zap.Logger, + modelName string, + conversationID string, + rewriteHandlers []adk.ChatModelAgentMiddleware, +) planexecute.GenPlannerModelInputFn { oi := strings.TrimSpace(orchInstruction) - if oi == "" { + if oi == "" && appCfg == nil { return nil } return func(ctx context.Context, userInput []adk.Message) ([]adk.Message, error) { + userInput = capPlanExecuteUserInputMessages(userInput, appCfg, mwCfg) msgs := make([]adk.Message, 0, 1+len(userInput)) - msgs = append(msgs, schema.SystemMessage(oi)) + if oi != "" { + msgs = append(msgs, schema.SystemMessage(oi)) + } msgs = append(msgs, userInput...) + if rewritten, rerr := applyBeforeModelRewriteHandlers(ctx, msgs, rewriteHandlers); rerr == nil && len(rewritten) > 0 { + msgs = rewritten + } + logPlanExecuteModelInputEstimate(logger, modelName, conversationID, "plan_execute_planner", msgs) return msgs, nil } } -func planExecuteExecutorGenInput(orchInstruction string) planexecute.GenModelInputFn { +func planExecuteExecutorGenInput( + orchInstruction string, + appCfg *config.Config, + mwCfg *config.MultiAgentEinoMiddlewareConfig, + logger *zap.Logger, + modelName string, + conversationID string, +) planexecute.GenModelInputFn { oi := strings.TrimSpace(orchInstruction) return func(ctx context.Context, in *planexecute.ExecutionContext) ([]adk.Message, error) { planContent, err := in.Plan.MarshalJSON() @@ -131,9 +164,9 @@ func planExecuteExecutorGenInput(orchInstruction string) planexecute.GenModelInp return nil, err } userMsgs, err := planexecute.ExecutorPrompt.Format(ctx, map[string]any{ - "input": planExecuteFormatInput(in.UserInput), + "input": planExecuteFormatInput(capPlanExecuteUserInputMessages(in.UserInput, appCfg, mwCfg)), "plan": string(planContent), - "executed_steps": planExecuteFormatExecutedSteps(in.ExecutedSteps), + "executed_steps": planExecuteFormatExecutedSteps(in.ExecutedSteps, appCfg, mwCfg), "step": in.Plan.FirstStep(), }) if err != nil { @@ -142,6 +175,7 @@ func planExecuteExecutorGenInput(orchInstruction string) planexecute.GenModelInp if oi != "" { userMsgs = append([]adk.Message{schema.SystemMessage(oi)}, userMsgs...) } + logPlanExecuteModelInputEstimate(logger, modelName, conversationID, "plan_execute_executor_gen_input", userMsgs) return userMsgs, nil } } @@ -155,18 +189,22 @@ func planExecuteFormatInput(input []adk.Message) string { return sb.String() } -func planExecuteFormatExecutedSteps(results []planexecute.ExecutedStep) string { - capped := capPlanExecuteExecutedSteps(results) - var sb strings.Builder - for _, result := range capped { - sb.WriteString(fmt.Sprintf("Step: %s\nResult: %s\n\n", result.Step, result.Result)) - } - return sb.String() +func planExecuteFormatExecutedSteps(results []planexecute.ExecutedStep, appCfg *config.Config, mwCfg *config.MultiAgentEinoMiddlewareConfig) string { + capped := capPlanExecuteExecutedStepsWithConfig(results, mwCfg) + return renderPlanExecuteStepsByBudget(capped, appCfg, mwCfg) } // planExecuteReplannerGenInput 与 Eino 默认 Replanner 输入一致,但 executed_steps 经 cap 后再写入 prompt, // 且在 orchInstruction 非空时 prepend SystemMessage 使 replanner 也能接收全局指令。 -func planExecuteReplannerGenInput(orchInstruction string) planexecute.GenModelInputFn { +func planExecuteReplannerGenInput( + orchInstruction string, + appCfg *config.Config, + mwCfg *config.MultiAgentEinoMiddlewareConfig, + logger *zap.Logger, + modelName string, + conversationID string, + rewriteHandlers []adk.ChatModelAgentMiddleware, +) planexecute.GenModelInputFn { oi := strings.TrimSpace(orchInstruction) return func(ctx context.Context, in *planexecute.ExecutionContext) ([]adk.Message, error) { planContent, err := in.Plan.MarshalJSON() @@ -175,8 +213,8 @@ func planExecuteReplannerGenInput(orchInstruction string) planexecute.GenModelIn } msgs, err := planexecute.ReplannerPrompt.Format(ctx, map[string]any{ "plan": string(planContent), - "input": planExecuteFormatInput(in.UserInput), - "executed_steps": planExecuteFormatExecutedSteps(in.ExecutedSteps), + "input": planExecuteFormatInput(capPlanExecuteUserInputMessages(in.UserInput, appCfg, mwCfg)), + "executed_steps": planExecuteFormatExecutedSteps(in.ExecutedSteps, appCfg, mwCfg), "plan_tool": planexecute.PlanToolInfo.Name, "respond_tool": planexecute.RespondToolInfo.Name, }) @@ -186,10 +224,120 @@ func planExecuteReplannerGenInput(orchInstruction string) planexecute.GenModelIn if oi != "" { msgs = append([]adk.Message{schema.SystemMessage(oi)}, msgs...) } + if rewritten, rerr := applyBeforeModelRewriteHandlers(ctx, msgs, rewriteHandlers); rerr == nil && len(rewritten) > 0 { + msgs = rewritten + } + logPlanExecuteModelInputEstimate(logger, modelName, conversationID, "plan_execute_replanner", msgs) return msgs, nil } } +func capPlanExecuteUserInputMessages(input []adk.Message, appCfg *config.Config, mwCfg *config.MultiAgentEinoMiddlewareConfig) []adk.Message { + if len(input) == 0 { + return input + } + maxTotal := 120000 + modelName := "gpt-4o" + if appCfg != nil { + if appCfg.OpenAI.MaxTotalTokens > 0 { + maxTotal = appCfg.OpenAI.MaxTotalTokens + } + if m := strings.TrimSpace(appCfg.OpenAI.Model); m != "" { + modelName = m + } + } + // Reserve most tokens for planner/replanner prompt and tool schema. + ratio := 0.35 + if mwCfg != nil { + ratio = mwCfg.PlanExecuteUserInputBudgetRatioEffective() + } + budget := int(float64(maxTotal) * ratio) + if budget < 4096 { + budget = 4096 + } + tc := agent.NewTikTokenCounter() + out := make([]adk.Message, 0, len(input)) + used := 0 + for i := len(input) - 1; i >= 0; i-- { + msg := input[i] + if msg == nil { + continue + } + n, err := tc.Count(modelName, string(msg.Role)+"\n"+msg.Content) + if err != nil { + n = (len(msg.Content) + 3) / 4 + } + if n <= 0 { + n = 1 + } + if used+n > budget { + break + } + used += n + out = append(out, msg) + } + for i, j := 0, len(out)-1; i < j; i, j = i+1, j-1 { + out[i], out[j] = out[j], out[i] + } + if len(out) == 0 { + // Keep the latest user message at least. + return []adk.Message{input[len(input)-1]} + } + return out +} + +func renderPlanExecuteStepsByBudget(steps []planexecute.ExecutedStep, appCfg *config.Config, mwCfg *config.MultiAgentEinoMiddlewareConfig) string { + if len(steps) == 0 { + return "" + } + maxTotal := 120000 + modelName := "gpt-4o" + if appCfg != nil { + if appCfg.OpenAI.MaxTotalTokens > 0 { + maxTotal = appCfg.OpenAI.MaxTotalTokens + } + if m := strings.TrimSpace(appCfg.OpenAI.Model); m != "" { + modelName = m + } + } + ratio := 0.2 + if mwCfg != nil { + ratio = mwCfg.PlanExecuteExecutedStepsBudgetRatioEffective() + } + budget := int(float64(maxTotal) * ratio) + if budget < 3072 { + budget = 3072 + } + tc := agent.NewTikTokenCounter() + var kept []string + used := 0 + skipped := 0 + for i := len(steps) - 1; i >= 0; i-- { + block := fmt.Sprintf("Step: %s\nResult: %s\n\n", steps[i].Step, steps[i].Result) + n, err := tc.Count(modelName, block) + if err != nil { + n = (len(block) + 3) / 4 + } + if n <= 0 { + n = 1 + } + if used+n > budget { + skipped = i + 1 + break + } + used += n + kept = append(kept, block) + } + var sb strings.Builder + if skipped > 0 { + sb.WriteString(fmt.Sprintf("Earlier executed steps omitted due to context budget: %d steps.\n\n", skipped)) + } + for i := len(kept) - 1; i >= 0; i-- { + sb.WriteString(kept[i]) + } + return sb.String() +} + // planExecuteStreamsMainAssistant 将规划/执行/重规划各阶段助手流式输出映射到主对话区。 func planExecuteStreamsMainAssistant(agent string) bool { if agent == "" { diff --git a/internal/multiagent/eino_single_runner.go b/internal/multiagent/eino_single_runner.go index 2f67ab58..cb8a352a 100644 --- a/internal/multiagent/eino_single_runner.go +++ b/internal/multiagent/eino_single_runner.go @@ -125,7 +125,7 @@ func RunEinoSingleChatModelAgent( return nil, fmt.Errorf("eino single 模型: %w", err) } - mainSumMw, err := newEinoSummarizationMiddleware(ctx, mainModel, appCfg, logger) + mainSumMw, err := newEinoSummarizationMiddleware(ctx, mainModel, appCfg, &ma.EinoMiddleware, conversationID, logger) if err != nil { return nil, fmt.Errorf("eino single summarization: %w", err) } @@ -145,6 +145,9 @@ func RunEinoSingleChatModelAgent( handlers = append(handlers, einoSkillMW) } handlers = append(handlers, mainSumMw) + if teleMw := newEinoModelInputTelemetryMiddleware(logger, appCfg.OpenAI.Model, conversationID, "eino_single"); teleMw != nil { + handlers = append(handlers, teleMw) + } maxIter := ma.MaxIteration if maxIter <= 0 { @@ -188,7 +191,7 @@ func RunEinoSingleChatModelAgent( return nil, fmt.Errorf("eino single NewChatModelAgent: %w", err) } - baseMsgs := historyToMessages(history) + baseMsgs := historyToMessages(history, appCfg, &ma.EinoMiddleware) baseMsgs = append(baseMsgs, schema.UserMessage(userMessage)) streamsMainAssistant := func(agent string) bool { diff --git a/internal/multiagent/eino_summarize.go b/internal/multiagent/eino_summarize.go index 4c40e906..3f8defcd 100644 --- a/internal/multiagent/eino_summarize.go +++ b/internal/multiagent/eino_summarize.go @@ -3,6 +3,8 @@ package multiagent import ( "context" "fmt" + "os" + "path/filepath" "strings" "cyberstrike-ai/internal/agent" @@ -32,6 +34,8 @@ func newEinoSummarizationMiddleware( ctx context.Context, summaryModel model.BaseChatModel, appCfg *config.Config, + mwCfg *config.MultiAgentEinoMiddlewareConfig, + conversationID string, logger *zap.Logger, ) (adk.ChatModelAgentMiddleware, error) { if summaryModel == nil || appCfg == nil { @@ -41,7 +45,14 @@ func newEinoSummarizationMiddleware( if maxTotal <= 0 { maxTotal = 120000 } - trigger := int(float64(maxTotal) * 0.9) + triggerRatio := 0.8 + emitInternalEvents := true + if mwCfg != nil { + triggerRatio = mwCfg.SummarizationTriggerRatioEffective() + emitInternalEvents = mwCfg.SummarizationEmitInternalEventsEffective() + } + // Keep enough safety margin for tokenizer/model-side accounting mismatch. + trigger := int(float64(maxTotal) * triggerRatio) if trigger < 4096 { trigger = maxTotal if trigger < 4096 { @@ -65,6 +76,18 @@ func newEinoSummarizationMiddleware( if recentTrailMax > trigger/2 { recentTrailMax = trigger / 2 } + transcriptPath := "" + if conv := strings.TrimSpace(conversationID); conv != "" { + baseRoot := filepath.Join(os.TempDir(), "cyberstrike-summarization") + if dbPath := strings.TrimSpace(appCfg.Database.Path); dbPath != "" { + // Persist with the same lifecycle as local conversation storage. + baseRoot = filepath.Join(filepath.Dir(dbPath), "conversation_artifacts", sanitizeEinoPathSegment(conv), "summarization") + } + base := baseRoot + if mkErr := os.MkdirAll(base, 0o755); mkErr == nil { + transcriptPath = filepath.Join(base, "transcript.txt") + } + } mw, err := summarization.New(ctx, &summarization.Config{ Model: summaryModel, @@ -73,7 +96,8 @@ func newEinoSummarizationMiddleware( }, TokenCounter: tokenCounter, UserInstruction: einoSummarizeUserInstruction, - EmitInternalEvents: false, + EmitInternalEvents: emitInternalEvents, + TranscriptFilePath: transcriptPath, PreserveUserMessages: &summarization.PreserveUserMessages{ Enabled: true, MaxTokens: preserveMax, @@ -85,11 +109,16 @@ func newEinoSummarizationMiddleware( if logger == nil { return nil } + beforeTokens, _ := tokenCounter(ctx, &summarization.TokenCounterInput{Messages: before.Messages}) + afterTokens, _ := tokenCounter(ctx, &summarization.TokenCounterInput{Messages: after.Messages}) logger.Info("eino summarization 已压缩上下文", zap.Int("messages_before", len(before.Messages)), zap.Int("messages_after", len(after.Messages)), + zap.Int("tokens_before_estimated", beforeTokens), + zap.Int("tokens_after_estimated", afterTokens), zap.Int("max_total_tokens", maxTotal), zap.Int("trigger_context_tokens", trigger), + zap.String("transcript_file", transcriptPath), ) return nil }, diff --git a/internal/multiagent/plan_execute_executor.go b/internal/multiagent/plan_execute_executor.go index fe138803..170a99b5 100644 --- a/internal/multiagent/plan_execute_executor.go +++ b/internal/multiagent/plan_execute_executor.go @@ -71,7 +71,7 @@ func planExecuteDefaultGenExecutorInput(ctx context.Context, in *planexecute.Exe return planexecute.ExecutorPrompt.Format(ctx, map[string]any{ "input": planExecuteFormatInput(in.UserInput), "plan": string(planContent), - "executed_steps": planExecuteFormatExecutedSteps(in.ExecutedSteps), + "executed_steps": planExecuteFormatExecutedSteps(in.ExecutedSteps, nil, nil), "step": in.Plan.FirstStep(), }) } diff --git a/internal/multiagent/plan_execute_steps_cap.go b/internal/multiagent/plan_execute_steps_cap.go index a5af85d8..c6ddf723 100644 --- a/internal/multiagent/plan_execute_steps_cap.go +++ b/internal/multiagent/plan_execute_steps_cap.go @@ -5,6 +5,8 @@ import ( "strings" "unicode/utf8" + "cyberstrike-ai/internal/config" + "github.com/cloudwego/eino/adk/prebuilt/planexecute" ) @@ -12,8 +14,11 @@ import ( // 此处仅约束「写入模型 prompt 的视图」,不修改 Eino session 中的原始 ExecutedSteps。 const ( - planExecuteMaxStepResultRunes = 12000 - planExecuteKeepLastSteps = 16 + defaultPlanExecuteMaxStepResultRunes = 4000 + defaultPlanExecuteKeepLastSteps = 8 + // Backward-compatible aliases for tests and existing references. + planExecuteMaxStepResultRunes = defaultPlanExecuteMaxStepResultRunes + planExecuteKeepLastSteps = defaultPlanExecuteKeepLastSteps ) func truncateRunesWithSuffix(s string, maxRunes int, suffix string) string { @@ -29,16 +34,26 @@ func truncateRunesWithSuffix(s string, maxRunes int, suffix string) string { // capPlanExecuteExecutedSteps 折叠较早步骤、截断单步过长结果,供 prompt 使用。 func capPlanExecuteExecutedSteps(steps []planexecute.ExecutedStep) []planexecute.ExecutedStep { + return capPlanExecuteExecutedStepsWithConfig(steps, nil) +} + +func capPlanExecuteExecutedStepsWithConfig(steps []planexecute.ExecutedStep, mwCfg *config.MultiAgentEinoMiddlewareConfig) []planexecute.ExecutedStep { if len(steps) == 0 { return steps } + maxStepResultRunes := defaultPlanExecuteMaxStepResultRunes + keepLastSteps := defaultPlanExecuteKeepLastSteps + if mwCfg != nil { + maxStepResultRunes = mwCfg.PlanExecuteMaxStepResultRunesEffective() + keepLastSteps = mwCfg.PlanExecuteKeepLastStepsEffective() + } out := make([]planexecute.ExecutedStep, 0, len(steps)+1) start := 0 - if len(steps) > planExecuteKeepLastSteps { - start = len(steps) - planExecuteKeepLastSteps + if len(steps) > keepLastSteps { + start = len(steps) - keepLastSteps var b strings.Builder b.WriteString(fmt.Sprintf("(上文已完成 %d 步;此处仅保留步骤标题以节省上下文,完整输出已省略。后续 %d 步仍保留正文。)\n", - start, planExecuteKeepLastSteps)) + start, keepLastSteps)) for i := 0; i < start; i++ { b.WriteString(fmt.Sprintf("- %s\n", steps[i].Step)) } @@ -50,8 +65,8 @@ func capPlanExecuteExecutedSteps(steps []planexecute.ExecutedStep) []planexecute suffix := "\n…[step result truncated]" for i := start; i < len(steps); i++ { e := steps[i] - if utf8.RuneCountInString(e.Result) > planExecuteMaxStepResultRunes { - e.Result = truncateRunesWithSuffix(e.Result, planExecuteMaxStepResultRunes, suffix) + if utf8.RuneCountInString(e.Result) > maxStepResultRunes { + e.Result = truncateRunesWithSuffix(e.Result, maxStepResultRunes, suffix) } out = append(out, e) } diff --git a/internal/multiagent/runner.go b/internal/multiagent/runner.go index dfb24b4f..9878f351 100644 --- a/internal/multiagent/runner.go +++ b/internal/multiagent/runner.go @@ -237,7 +237,7 @@ func RunDeepAgent( subMax = subDefaultIter } - subSumMw, err := newEinoSummarizationMiddleware(ctx, subModel, appCfg, logger) + subSumMw, err := newEinoSummarizationMiddleware(ctx, subModel, appCfg, &ma.EinoMiddleware, conversationID, logger) if err != nil { return nil, fmt.Errorf("子代理 %q summarization 中间件: %w", id, err) } @@ -257,6 +257,9 @@ func RunDeepAgent( subHandlers = append(subHandlers, einoSkillMW) } subHandlers = append(subHandlers, subSumMw) + if teleMw := newEinoModelInputTelemetryMiddleware(logger, appCfg.OpenAI.Model, conversationID, "sub_agent"); teleMw != nil { + subHandlers = append(subHandlers, teleMw) + } sa, err := adk.NewChatModelAgent(ctx, &adk.ChatModelAgentConfig{ Name: id, @@ -289,7 +292,7 @@ func RunDeepAgent( return nil, fmt.Errorf("多代理主模型: %w", err) } - mainSumMw, err := newEinoSummarizationMiddleware(ctx, mainModel, appCfg, logger) + mainSumMw, err := newEinoSummarizationMiddleware(ctx, mainModel, appCfg, &ma.EinoMiddleware, conversationID, logger) if err != nil { return nil, fmt.Errorf("多代理主 summarization 中间件: %w", err) } @@ -352,6 +355,9 @@ func RunDeepAgent( deepHandlers = append(deepHandlers, einoSkillMW) } deepHandlers = append(deepHandlers, mainSumMw) + if teleMw := newEinoModelInputTelemetryMiddleware(logger, appCfg.OpenAI.Model, conversationID, "deep_orchestrator"); teleMw != nil { + deepHandlers = append(deepHandlers, teleMw) + } supHandlers := []adk.ChatModelAgentMiddleware{} if len(mainOrchestratorPre) > 0 { @@ -361,6 +367,9 @@ func RunDeepAgent( supHandlers = append(supHandlers, einoSkillMW) } supHandlers = append(supHandlers, mainSumMw) + if teleMw := newEinoModelInputTelemetryMiddleware(logger, appCfg.OpenAI.Model, conversationID, "supervisor_orchestrator"); teleMw != nil { + supHandlers = append(supHandlers, teleMw) + } mainToolsCfg := adk.ToolsConfig{ ToolsNodeConfig: compose.ToolsNodeConfig{ @@ -399,10 +408,17 @@ func RunDeepAgent( ExecMaxIter: deepMaxIter, LoopMaxIter: ma.PlanExecuteLoopMaxIterations, AppCfg: appCfg, + MwCfg: &ma.EinoMiddleware, + ConversationID: conversationID, Logger: logger, + ModelName: appCfg.OpenAI.Model, ExecPreMiddlewares: mainOrchestratorPre, SkillMiddleware: einoSkillMW, FilesystemMiddleware: peFsMw, + PlannerReplannerRewriteHandlers: []adk.ChatModelAgentMiddleware{ + mainSumMw, + newEinoModelInputTelemetryMiddleware(logger, appCfg.OpenAI.Model, conversationID, "plan_execute_planner_replanner_rewrite"), + }, }) if perr != nil { return nil, perr @@ -468,7 +484,7 @@ func RunDeepAgent( da = dDeep } - baseMsgs := historyToMessages(history) + baseMsgs := historyToMessages(history, appCfg, &ma.EinoMiddleware) baseMsgs = append(baseMsgs, schema.UserMessage(userMessage)) streamsMainAssistant := func(agent string) bool { @@ -505,34 +521,77 @@ func RunDeepAgent( }, baseMsgs) } -func historyToMessages(history []agent.ChatMessage) []adk.Message { +func historyToMessages(history []agent.ChatMessage, appCfg *config.Config, mwCfg *config.MultiAgentEinoMiddlewareConfig) []adk.Message { if len(history) == 0 { return nil } - // 放宽条数上限:跨轮历史交给 Eino Summarization(阈值对齐 openai.max_total_tokens)在调用模型前压缩,避免在入队前硬截断为 40 条。 - const maxHistoryMessages = 300 + // Keep a bounded tail first; then enforce a token budget. + const maxHistoryMessages = 200 start := 0 if len(history) > maxHistoryMessages { start = len(history) - maxHistoryMessages } - out := make([]adk.Message, 0, len(history[start:])) + raw := make([]adk.Message, 0, len(history[start:])) for _, h := range history[start:] { switch h.Role { case "user": if strings.TrimSpace(h.Content) != "" { - out = append(out, schema.UserMessage(h.Content)) + raw = append(raw, schema.UserMessage(h.Content)) } case "assistant": if strings.TrimSpace(h.Content) == "" && len(h.ToolCalls) > 0 { continue } if strings.TrimSpace(h.Content) != "" { - out = append(out, schema.AssistantMessage(h.Content, nil)) + raw = append(raw, schema.AssistantMessage(h.Content, nil)) } default: continue } } + if len(raw) == 0 { + return raw + } + maxTotal := 120000 + modelName := "gpt-4o" + if appCfg != nil { + if appCfg.OpenAI.MaxTotalTokens > 0 { + maxTotal = appCfg.OpenAI.MaxTotalTokens + } + if m := strings.TrimSpace(appCfg.OpenAI.Model); m != "" { + modelName = m + } + } + ratio := 0.35 + if mwCfg != nil { + ratio = mwCfg.HistoryInputBudgetRatioEffective() + } + budget := int(float64(maxTotal) * ratio) + if budget < 4096 { + budget = 4096 + } + tc := agent.NewTikTokenCounter() + outRev := make([]adk.Message, 0, len(raw)) + used := 0 + for i := len(raw) - 1; i >= 0; i-- { + msg := raw[i] + n, err := tc.Count(modelName, string(msg.Role)+"\n"+msg.Content) + if err != nil { + n = (len(msg.Content) + 3) / 4 + } + if n <= 0 { + n = 1 + } + if used+n > budget { + break + } + used += n + outRev = append(outRev, msg) + } + out := make([]adk.Message, 0, len(outRev)) + for i := len(outRev) - 1; i >= 0; i-- { + out = append(out, outRev[i]) + } return out }