diff --git a/internal/multiagent/eino_orchestration.go b/internal/multiagent/eino_orchestration.go index 5240ba06..5e7c1a00 100644 --- a/internal/multiagent/eino_orchestration.go +++ b/internal/multiagent/eino_orchestration.go @@ -5,11 +5,14 @@ import ( "fmt" "strings" + "cyberstrike-ai/internal/config" + "github.com/cloudwego/eino-ext/components/model/openai" "github.com/cloudwego/eino/adk" "github.com/cloudwego/eino/adk/prebuilt/planexecute" "github.com/cloudwego/eino/components/model" "github.com/cloudwego/eino/schema" + "go.uber.org/zap" ) // PlanExecuteRootArgs 构建 Eino adk/prebuilt/planexecute 根 Agent 所需参数。 @@ -20,6 +23,9 @@ type PlanExecuteRootArgs struct { ToolsCfg adk.ToolsConfig ExecMaxIter int LoopMaxIter int + // AppCfg / Logger 非空时为 Executor 挂载与 Deep/Supervisor 一致的 Eino summarization 中间件。 + AppCfg *config.Config + Logger *zap.Logger } // NewPlanExecuteRoot 返回 plan → execute → replan 预置编排根节点(与 Deep / Supervisor 并列)。 @@ -41,17 +47,26 @@ func NewPlanExecuteRoot(ctx context.Context, a *PlanExecuteRootArgs) (adk.Resuma return nil, fmt.Errorf("plan_execute planner: %w", err) } replanner, err := planexecute.NewReplanner(ctx, &planexecute.ReplannerConfig{ - ChatModel: tcm, + ChatModel: tcm, + GenInputFn: planExecuteReplannerGenInput, }) if err != nil { return nil, fmt.Errorf("plan_execute replanner: %w", err) } - executor, err := planexecute.NewExecutor(ctx, &planexecute.ExecutorConfig{ + var execHandlers []adk.ChatModelAgentMiddleware + if a.AppCfg != nil { + sumMw, sumErr := newEinoSummarizationMiddleware(ctx, a.ExecModel, a.AppCfg, a.Logger) + if sumErr != nil { + return nil, fmt.Errorf("plan_execute executor summarization: %w", sumErr) + } + execHandlers = append(execHandlers, sumMw) + } + executor, err := newPlanExecuteExecutor(ctx, &planexecute.ExecutorConfig{ Model: a.ExecModel, ToolsConfig: a.ToolsCfg, MaxIterations: a.ExecMaxIter, GenInputFn: planExecuteExecutorGenInput(a.OrchInstruction), - }) + }, execHandlers) if err != nil { return nil, fmt.Errorf("plan_execute executor: %w", err) } @@ -100,13 +115,29 @@ func planExecuteFormatInput(input []adk.Message) string { } func planExecuteFormatExecutedSteps(results []planexecute.ExecutedStep) string { + capped := capPlanExecuteExecutedSteps(results) var sb strings.Builder - for _, result := range results { + for _, result := range capped { sb.WriteString(fmt.Sprintf("Step: %s\nResult: %s\n\n", result.Step, result.Result)) } return sb.String() } +// planExecuteReplannerGenInput 与 Eino 默认 Replanner 输入一致,但 executed_steps 经 cap 后再写入 prompt。 +func planExecuteReplannerGenInput(ctx context.Context, in *planexecute.ExecutionContext) ([]adk.Message, error) { + planContent, err := in.Plan.MarshalJSON() + if err != nil { + return nil, err + } + return planexecute.ReplannerPrompt.Format(ctx, map[string]any{ + "plan": string(planContent), + "input": planExecuteFormatInput(in.UserInput), + "executed_steps": planExecuteFormatExecutedSteps(in.ExecutedSteps), + "plan_tool": planexecute.PlanToolInfo.Name, + "respond_tool": planexecute.RespondToolInfo.Name, + }) +} + // planExecuteStreamsMainAssistant 将规划/执行/重规划各阶段助手流式输出映射到主对话区。 func planExecuteStreamsMainAssistant(agent string) bool { if agent == "" { diff --git a/internal/multiagent/plan_execute_executor.go b/internal/multiagent/plan_execute_executor.go new file mode 100644 index 00000000..d1af3a72 --- /dev/null +++ b/internal/multiagent/plan_execute_executor.go @@ -0,0 +1,77 @@ +package multiagent + +import ( + "context" + "fmt" + + "github.com/cloudwego/eino/adk" + "github.com/cloudwego/eino/adk/prebuilt/planexecute" +) + +// newPlanExecuteExecutor 与 planexecute.NewExecutor 行为一致,但可为执行器注入 Handlers(例如 summarization 中间件)。 +func newPlanExecuteExecutor(ctx context.Context, cfg *planexecute.ExecutorConfig, handlers []adk.ChatModelAgentMiddleware) (adk.Agent, error) { + if cfg == nil { + return nil, fmt.Errorf("plan_execute: ExecutorConfig 为空") + } + if cfg.Model == nil { + return nil, fmt.Errorf("plan_execute: Executor Model 为空") + } + genInputFn := cfg.GenInputFn + if genInputFn == nil { + genInputFn = planExecuteDefaultGenExecutorInput + } + genInput := func(ctx context.Context, instruction string, _ *adk.AgentInput) ([]adk.Message, error) { + plan, ok := adk.GetSessionValue(ctx, planexecute.PlanSessionKey) + if !ok { + panic("impossible: plan not found") + } + plan_ := plan.(planexecute.Plan) + + userInput, ok := adk.GetSessionValue(ctx, planexecute.UserInputSessionKey) + if !ok { + panic("impossible: user input not found") + } + userInput_ := userInput.([]adk.Message) + + var executedSteps_ []planexecute.ExecutedStep + executedStep, ok := adk.GetSessionValue(ctx, planexecute.ExecutedStepsSessionKey) + if ok { + executedSteps_ = executedStep.([]planexecute.ExecutedStep) + } + + in := &planexecute.ExecutionContext{ + UserInput: userInput_, + Plan: plan_, + ExecutedSteps: executedSteps_, + } + return genInputFn(ctx, in) + } + + agentCfg := &adk.ChatModelAgentConfig{ + Name: "executor", + Description: "an executor agent", + Model: cfg.Model, + ToolsConfig: cfg.ToolsConfig, + GenModelInput: genInput, + MaxIterations: cfg.MaxIterations, + OutputKey: planexecute.ExecutedStepSessionKey, + } + if len(handlers) > 0 { + agentCfg.Handlers = handlers + } + return adk.NewChatModelAgent(ctx, agentCfg) +} + +// planExecuteDefaultGenExecutorInput 对齐 Eino planexecute.defaultGenExecutorInputFn(包外不可引用默认实现)。 +func planExecuteDefaultGenExecutorInput(ctx context.Context, in *planexecute.ExecutionContext) ([]adk.Message, error) { + planContent, err := in.Plan.MarshalJSON() + if err != nil { + return nil, err + } + return planexecute.ExecutorPrompt.Format(ctx, map[string]any{ + "input": planExecuteFormatInput(in.UserInput), + "plan": string(planContent), + "executed_steps": planExecuteFormatExecutedSteps(in.ExecutedSteps), + "step": in.Plan.FirstStep(), + }) +} diff --git a/internal/multiagent/plan_execute_steps_cap.go b/internal/multiagent/plan_execute_steps_cap.go new file mode 100644 index 00000000..bb5092c0 --- /dev/null +++ b/internal/multiagent/plan_execute_steps_cap.go @@ -0,0 +1,59 @@ +package multiagent + +import ( + "fmt" + "strings" + "unicode/utf8" + + "github.com/cloudwego/eino/adk/prebuilt/planexecute" +) + +// plan_execute 的 Replanner / Executor prompt 会线性拼接每步 Result;无界时易撑爆上下文。 +// 此处仅约束「写入模型 prompt 的视图」,不修改 Eino session 中的原始 ExecutedSteps。 + +const ( + planExecuteMaxStepResultRunes = 12000 + planExecuteKeepLastSteps = 16 +) + +func truncateRunesWithSuffix(s string, maxRunes int, suffix string) string { + if maxRunes <= 0 || s == "" { + return s + } + rs := []rune(s) + if len(rs) <= maxRunes { + return s + } + return string(rs[:maxRunes]) + suffix +} + +// capPlanExecuteExecutedSteps 折叠较早步骤、截断单步过长结果,供 prompt 使用。 +func capPlanExecuteExecutedSteps(steps []planexecute.ExecutedStep) []planexecute.ExecutedStep { + if len(steps) == 0 { + return steps + } + out := make([]planexecute.ExecutedStep, 0, len(steps)+1) + start := 0 + if len(steps) > planExecuteKeepLastSteps { + start = len(steps) - planExecuteKeepLastSteps + var b strings.Builder + b.WriteString(fmt.Sprintf("(上文已完成 %d 步;此处仅保留步骤标题以节省上下文,完整输出已省略。后续 %d 步仍保留正文。)\n", + start, planExecuteKeepLastSteps)) + for i := 0; i < start; i++ { + b.WriteString(fmt.Sprintf("- %s\n", steps[i].Step)) + } + out = append(out, planexecute.ExecutedStep{ + Step: "[Earlier steps — titles only]", + Result: strings.TrimRight(b.String(), "\n"), + }) + } + suffix := "\n…[step result truncated]" + for i := start; i < len(steps); i++ { + e := steps[i] + if utf8.RuneCountInString(e.Result) > planExecuteMaxStepResultRunes { + e.Result = truncateRunesWithSuffix(e.Result, planExecuteMaxStepResultRunes, suffix) + } + out = append(out, e) + } + return out +} diff --git a/internal/multiagent/plan_execute_steps_cap_test.go b/internal/multiagent/plan_execute_steps_cap_test.go new file mode 100644 index 00000000..27e0cf97 --- /dev/null +++ b/internal/multiagent/plan_execute_steps_cap_test.go @@ -0,0 +1,34 @@ +package multiagent + +import ( + "strings" + "testing" + + "github.com/cloudwego/eino/adk/prebuilt/planexecute" +) + +func TestCapPlanExecuteExecutedSteps_TruncatesLongResult(t *testing.T) { + long := strings.Repeat("x", planExecuteMaxStepResultRunes+500) + steps := []planexecute.ExecutedStep{{Step: "s1", Result: long}} + out := capPlanExecuteExecutedSteps(steps) + if len(out) != 1 { + t.Fatalf("len=%d", len(out)) + } + if !strings.Contains(out[0].Result, "truncated") { + t.Fatalf("expected truncation marker in %q", out[0].Result[:80]) + } +} + +func TestCapPlanExecuteExecutedSteps_FoldsEarlySteps(t *testing.T) { + var steps []planexecute.ExecutedStep + for i := 0; i < planExecuteKeepLastSteps+5; i++ { + steps = append(steps, planexecute.ExecutedStep{Step: "step", Result: "ok"}) + } + out := capPlanExecuteExecutedSteps(steps) + if len(out) != planExecuteKeepLastSteps+1 { + t.Fatalf("want %d entries, got %d", planExecuteKeepLastSteps+1, len(out)) + } + if out[0].Step != "[Earlier steps — titles only]" { + t.Fatalf("first entry: %#v", out[0]) + } +} diff --git a/internal/multiagent/runner.go b/internal/multiagent/runner.go index c650eb6b..49e33bff 100644 --- a/internal/multiagent/runner.go +++ b/internal/multiagent/runner.go @@ -398,6 +398,8 @@ func RunDeepAgent( ToolsCfg: mainToolsCfg, ExecMaxIter: deepMaxIter, LoopMaxIter: ma.PlanExecuteLoopMaxIterations, + AppCfg: appCfg, + Logger: logger, }) if perr != nil { return nil, perr