From 5159773e718315c8186efacd546f33df1f720880 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E5=85=AC=E6=98=8E?= <83812544+Ed1s0nZ@users.noreply.github.com> Date: Sun, 19 Apr 2026 03:24:28 +0800 Subject: [PATCH] Add files via upload --- internal/config/config.go | 129 +++-- internal/multiagent/eino_checkpoint.go | 68 +++ internal/multiagent/eino_middleware.go | 222 +++++++++ internal/multiagent/eino_middleware_test.go | 34 ++ internal/multiagent/eino_orchestration.go | 126 +++++ internal/multiagent/eino_skills.go | 19 +- .../multiagent/orchestrator_instruction.go | 108 +++++ internal/multiagent/plan_execute_text.go | 36 ++ internal/multiagent/plan_execute_text_test.go | 17 + internal/multiagent/runner.go | 448 ++++++++++++------ 10 files changed, 1032 insertions(+), 175 deletions(-) create mode 100644 internal/multiagent/eino_checkpoint.go create mode 100644 internal/multiagent/eino_middleware.go create mode 100644 internal/multiagent/eino_middleware_test.go create mode 100644 internal/multiagent/eino_orchestration.go create mode 100644 internal/multiagent/orchestrator_instruction.go create mode 100644 internal/multiagent/plan_execute_text.go create mode 100644 internal/multiagent/plan_execute_text_test.go diff --git a/internal/config/config.go b/internal/config/config.go index c7ad6147..96b6816b 100644 --- a/internal/config/config.go +++ b/internal/config/config.go @@ -35,20 +35,57 @@ type Config struct { MultiAgent MultiAgentConfig `yaml:"multi_agent,omitempty" json:"multi_agent,omitempty"` } -// MultiAgentConfig 基于 CloudWeGo Eino DeepAgent 的多代理编排(与单 Agent /agent-loop 并存)。 +// MultiAgentConfig 基于 CloudWeGo Eino adk/prebuilt 的多代理编排(deep | plan_execute | supervisor,与单 Agent /agent-loop 并存)。 type MultiAgentConfig struct { - Enabled bool `yaml:"enabled" json:"enabled"` - DefaultMode string `yaml:"default_mode" json:"default_mode"` // single | multi,供前端默认展示 - RobotUseMultiAgent bool `yaml:"robot_use_multi_agent" json:"robot_use_multi_agent"` // 为 true 时钉钉/飞书/企微机器人走 Eino 多代理 - BatchUseMultiAgent bool `yaml:"batch_use_multi_agent" json:"batch_use_multi_agent"` // 为 true 时批量任务队列中每子任务走 Eino 多代理 - MaxIteration int `yaml:"max_iteration" json:"max_iteration"` // Deep 主代理最大推理轮次 - SubAgentMaxIterations int `yaml:"sub_agent_max_iterations" json:"sub_agent_max_iterations"` - WithoutGeneralSubAgent bool `yaml:"without_general_sub_agent" json:"without_general_sub_agent"` - WithoutWriteTodos bool `yaml:"without_write_todos" json:"without_write_todos"` - OrchestratorInstruction string `yaml:"orchestrator_instruction" json:"orchestrator_instruction"` - SubAgents []MultiAgentSubConfig `yaml:"sub_agents" json:"sub_agents"` + Enabled bool `yaml:"enabled" json:"enabled"` + DefaultMode string `yaml:"default_mode" json:"default_mode"` // single | multi,供前端默认展示 + RobotUseMultiAgent bool `yaml:"robot_use_multi_agent" json:"robot_use_multi_agent"` // 为 true 时钉钉/飞书/企微机器人走 Eino 多代理 + BatchUseMultiAgent bool `yaml:"batch_use_multi_agent" json:"batch_use_multi_agent"` // 为 true 时批量任务队列中每子任务走 Eino 多代理 + // Orchestration 已弃用:保留仅兼容旧版 config.yaml;编排由聊天/WebShell 请求体 orchestration 决定,未传时按 deep。 + Orchestration string `yaml:"orchestration,omitempty" json:"orchestration,omitempty"` + MaxIteration int `yaml:"max_iteration" json:"max_iteration"` // 主代理 / 执行器最大推理轮次(Deep、Supervisor、plan_execute 的 Executor) + // PlanExecuteLoopMaxIterations plan_execute 模式下 execute↔replan 外层循环上限;0 表示用 Eino 默认 10。 + PlanExecuteLoopMaxIterations int `yaml:"plan_execute_loop_max_iterations,omitempty" json:"plan_execute_loop_max_iterations,omitempty"` + SubAgentMaxIterations int `yaml:"sub_agent_max_iterations" json:"sub_agent_max_iterations"` + WithoutGeneralSubAgent bool `yaml:"without_general_sub_agent" json:"without_general_sub_agent"` + WithoutWriteTodos bool `yaml:"without_write_todos" json:"without_write_todos"` + OrchestratorInstruction string `yaml:"orchestrator_instruction" json:"orchestrator_instruction"` + // OrchestratorInstructionPlanExecute plan_execute 主代理(规划侧)系统提示;非空且 agents/orchestrator-plan-execute.md 正文为空或未存在时生效。不与 Deep 的 orchestrator_instruction 混用。 + OrchestratorInstructionPlanExecute string `yaml:"orchestrator_instruction_plan_execute,omitempty" json:"orchestrator_instruction_plan_execute,omitempty"` + // OrchestratorInstructionSupervisor supervisor 主代理系统提示(transfer/exit 说明仍由运行追加);非空且 agents/orchestrator-supervisor.md 正文为空或未存在时生效。 + OrchestratorInstructionSupervisor string `yaml:"orchestrator_instruction_supervisor,omitempty" json:"orchestrator_instruction_supervisor,omitempty"` + SubAgents []MultiAgentSubConfig `yaml:"sub_agents" json:"sub_agents"` // EinoSkills configures CloudWeGo Eino ADK skill middleware + optional local filesystem/execute on DeepAgent. EinoSkills MultiAgentEinoSkillsConfig `yaml:"eino_skills,omitempty" json:"eino_skills,omitempty"` + // EinoMiddleware wires optional ADK middleware (patchtoolcalls, toolsearch, plantask, reduction) and Deep extras. + EinoMiddleware MultiAgentEinoMiddlewareConfig `yaml:"eino_middleware,omitempty" json:"eino_middleware,omitempty"` +} + +// MultiAgentEinoMiddlewareConfig optional Eino ADK middleware and Deep / supervisor tuning. +type MultiAgentEinoMiddlewareConfig struct { + // PatchToolCalls inserts placeholder tool results for dangling assistant tool_calls (nil = enabled). + PatchToolCalls *bool `yaml:"patch_tool_calls,omitempty" json:"patch_tool_calls,omitempty"` + // ToolSearch enables dynamictool/toolsearch: hide tail tools until model calls tool_search (reduces prompt tools). + ToolSearchEnable bool `yaml:"tool_search_enable,omitempty" json:"tool_search_enable,omitempty"` + ToolSearchMinTools int `yaml:"tool_search_min_tools,omitempty" json:"tool_search_min_tools,omitempty"` // default 20; applies when len(tools) >= this + ToolSearchAlwaysVisible int `yaml:"tool_search_always_visible,omitempty" json:"tool_search_always_visible,omitempty"` // default 12; first N tools stay always visible + // Plantask adds TaskCreate/Get/Update/List (file-backed under skills dir); requires eino_skills + local backend. + PlantaskEnable bool `yaml:"plantask_enable,omitempty" json:"plantask_enable,omitempty"` + // PlantaskRelDir relative to skills_dir for per-conversation task boards (default .eino/plantask). + PlantaskRelDir string `yaml:"plantask_rel_dir,omitempty" json:"plantask_rel_dir,omitempty"` + // Reduction truncates/offloads large tool outputs (requires eino local backend for Write). + ReductionEnable bool `yaml:"reduction_enable,omitempty" json:"reduction_enable,omitempty"` + ReductionRootDir string `yaml:"reduction_root_dir,omitempty" json:"reduction_root_dir,omitempty"` // default: os temp + conversation id + ReductionClearExclude []string `yaml:"reduction_clear_exclude,omitempty" json:"reduction_clear_exclude,omitempty"` + ReductionSubAgents bool `yaml:"reduction_sub_agents,omitempty" json:"reduction_sub_agents,omitempty"` // also attach to sub-agents + // CheckpointDir when non-empty enables adk.Runner CheckPointStore (file-backed) for interrupt/resume persistence. + CheckpointDir string `yaml:"checkpoint_dir,omitempty" json:"checkpoint_dir,omitempty"` + // DeepOutputKey passed to deep.Config OutputKey (session final text); empty = off. + DeepOutputKey string `yaml:"deep_output_key,omitempty" json:"deep_output_key,omitempty"` + // DeepModelRetryMaxRetries > 0 enables deep.Config ModelRetryConfig (framework-level chat model retries). + DeepModelRetryMaxRetries int `yaml:"deep_model_retry_max_retries,omitempty" json:"deep_model_retry_max_retries,omitempty"` + // TaskToolDescriptionPrefix when non-empty sets deep.Config TaskToolDescriptionGenerator (sub-agent names appended). + TaskToolDescriptionPrefix string `yaml:"task_tool_description_prefix,omitempty" json:"task_tool_description_prefix,omitempty"` } // MultiAgentEinoSkillsConfig toggles Eino official skill progressive disclosure and host filesystem tools. @@ -69,7 +106,15 @@ func (c MultiAgentEinoSkillsConfig) EinoSkillFilesystemToolsEffective() bool { return true } -// MultiAgentSubConfig 子代理(Eino ChatModelAgent),由 DeepAgent 通过 task 工具调度。 +// PatchToolCallsEffective returns whether patchtoolcalls middleware should run (default true). +func (c MultiAgentEinoMiddlewareConfig) PatchToolCallsEffective() bool { + if c.PatchToolCalls != nil { + return *c.PatchToolCalls + } + return true +} + +// MultiAgentSubConfig 子代理(Eino ChatModelAgent):deep 下由 task 调度;supervisor 下由 transfer 委派;plan_execute 不使用子代理列表。 type MultiAgentSubConfig struct { ID string `yaml:"id" json:"id"` Name string `yaml:"name" json:"name"` @@ -83,19 +128,35 @@ type MultiAgentSubConfig struct { // MultiAgentPublic 返回给前端的精简信息(不含子代理指令全文)。 type MultiAgentPublic struct { - Enabled bool `json:"enabled"` - DefaultMode string `json:"default_mode"` - RobotUseMultiAgent bool `json:"robot_use_multi_agent"` - BatchUseMultiAgent bool `json:"batch_use_multi_agent"` - SubAgentCount int `json:"sub_agent_count"` + Enabled bool `json:"enabled"` + DefaultMode string `json:"default_mode"` + RobotUseMultiAgent bool `json:"robot_use_multi_agent"` + BatchUseMultiAgent bool `json:"batch_use_multi_agent"` + SubAgentCount int `json:"sub_agent_count"` + Orchestration string `json:"orchestration,omitempty"` + PlanExecuteLoopMaxIterations int `json:"plan_execute_loop_max_iterations"` +} + +// NormalizeMultiAgentOrchestration 返回 deep、plan_execute 或 supervisor。 +func NormalizeMultiAgentOrchestration(s string) string { + v := strings.TrimSpace(strings.ToLower(s)) + switch v { + case "plan_execute", "plan-execute", "planexecute", "pe": + return "plan_execute" + case "supervisor", "super", "sv": + return "supervisor" + default: + return "deep" + } } // MultiAgentAPIUpdate 设置页/API 仅更新多代理标量字段;写入 YAML 时不覆盖 sub_agents 等块。 type MultiAgentAPIUpdate struct { - Enabled bool `json:"enabled"` - DefaultMode string `json:"default_mode"` - RobotUseMultiAgent bool `json:"robot_use_multi_agent"` - BatchUseMultiAgent bool `json:"batch_use_multi_agent"` + Enabled bool `json:"enabled"` + DefaultMode string `json:"default_mode"` + RobotUseMultiAgent bool `json:"robot_use_multi_agent"` + BatchUseMultiAgent bool `json:"batch_use_multi_agent"` + PlanExecuteLoopMaxIterations *int `json:"plan_execute_loop_max_iterations,omitempty"` } // RobotsConfig 机器人配置(企业微信、钉钉、飞书等) @@ -179,6 +240,8 @@ type AgentConfig struct { LargeResultThreshold int `yaml:"large_result_threshold" json:"large_result_threshold"` // 大结果阈值(字节),默认50KB ResultStorageDir string `yaml:"result_storage_dir" json:"result_storage_dir"` // 结果存储目录,默认tmp ToolTimeoutMinutes int `yaml:"tool_timeout_minutes" json:"tool_timeout_minutes"` // 单次工具执行最大时长(分钟),超时自动终止,防止长时间挂起;0 表示不限制(不推荐) + // SystemPromptPath 单代理系统提示 Markdown/文本文件路径(相对 config.yaml 所在目录,或可写绝对路径)。非空且可读时替换内置单代理提示;留空用内置。 + SystemPromptPath string `yaml:"system_prompt_path,omitempty" json:"system_prompt_path,omitempty"` } type AuthConfig struct { @@ -776,18 +839,18 @@ func Default() *Config { SimilarityThreshold: 0.65, // 降低阈值到 0.65,减少漏检 }, Indexing: IndexingConfig{ - ChunkStrategy: "markdown_then_recursive", - RequestTimeoutSeconds: 120, - ChunkSize: 768, // 增加到 768,更好的上下文保持 - ChunkOverlap: 50, - MaxChunksPerItem: 20, // 限制单个知识项最多 20 个块,避免消耗过多配额 - BatchSize: 64, - PreferSourceFile: false, - MaxRPM: 100, // 默认 100 RPM,避免 429 错误 - RateLimitDelayMs: 600, // 600ms 间隔,对应 100 RPM - MaxRetries: 3, - RetryDelayMs: 1000, - SubIndexes: nil, + ChunkStrategy: "markdown_then_recursive", + RequestTimeoutSeconds: 120, + ChunkSize: 768, // 增加到 768,更好的上下文保持 + ChunkOverlap: 50, + MaxChunksPerItem: 20, // 限制单个知识项最多 20 个块,避免消耗过多配额 + BatchSize: 64, + PreferSourceFile: false, + MaxRPM: 100, // 默认 100 RPM,避免 429 错误 + RateLimitDelayMs: 600, // 600ms 间隔,对应 100 RPM + MaxRetries: 3, + RetryDelayMs: 1000, + SubIndexes: nil, }, }, } diff --git a/internal/multiagent/eino_checkpoint.go b/internal/multiagent/eino_checkpoint.go new file mode 100644 index 00000000..569c698c --- /dev/null +++ b/internal/multiagent/eino_checkpoint.go @@ -0,0 +1,68 @@ +package multiagent + +import ( + "context" + "fmt" + "os" + "path/filepath" + "strings" +) + +// fileCheckPointStore implements adk.CheckPointStore with one file per checkpoint id. +type fileCheckPointStore struct { + dir string +} + +func newFileCheckPointStore(baseDir string) (*fileCheckPointStore, error) { + if strings.TrimSpace(baseDir) == "" { + return nil, fmt.Errorf("checkpoint base dir empty") + } + abs, err := filepath.Abs(baseDir) + if err != nil { + return nil, err + } + if err := os.MkdirAll(abs, 0o755); err != nil { + return nil, err + } + return &fileCheckPointStore{dir: abs}, nil +} + +func (s *fileCheckPointStore) path(id string) (string, error) { + id = strings.TrimSpace(id) + if id == "" { + return "", fmt.Errorf("checkpoint id empty") + } + if strings.ContainsAny(id, `/\`) { + return "", fmt.Errorf("invalid checkpoint id") + } + return filepath.Join(s.dir, id+".ckpt"), nil +} + +func (s *fileCheckPointStore) Get(ctx context.Context, checkPointID string) ([]byte, bool, error) { + _ = ctx + p, err := s.path(checkPointID) + if err != nil { + return nil, false, err + } + b, err := os.ReadFile(p) + if err != nil { + if os.IsNotExist(err) { + return nil, false, nil + } + return nil, false, err + } + return b, true, nil +} + +func (s *fileCheckPointStore) Set(ctx context.Context, checkPointID string, checkPoint []byte) error { + _ = ctx + p, err := s.path(checkPointID) + if err != nil { + return err + } + tmp := p + ".tmp" + if err := os.WriteFile(tmp, checkPoint, 0o600); err != nil { + return err + } + return os.Rename(tmp, p) +} diff --git a/internal/multiagent/eino_middleware.go b/internal/multiagent/eino_middleware.go new file mode 100644 index 00000000..f874da4d --- /dev/null +++ b/internal/multiagent/eino_middleware.go @@ -0,0 +1,222 @@ +package multiagent + +import ( + "context" + "fmt" + "os" + "path/filepath" + "strings" + + "cyberstrike-ai/internal/config" + + localbk "github.com/cloudwego/eino-ext/adk/backend/local" + "github.com/cloudwego/eino/adk" + "github.com/cloudwego/eino/adk/middlewares/dynamictool/toolsearch" + "github.com/cloudwego/eino/adk/middlewares/patchtoolcalls" + "github.com/cloudwego/eino/adk/middlewares/plantask" + "github.com/cloudwego/eino/adk/middlewares/reduction" + "github.com/cloudwego/eino/components/tool" + "go.uber.org/zap" +) + +// einoMWPlacement controls which optional middleware runs on orchestrator vs sub-agents. +type einoMWPlacement int + +const ( + einoMWMain einoMWPlacement = iota // Deep / Supervisor main chat agent + einoMWSub // Specialist ChatModelAgent +) + +func sanitizeEinoPathSegment(s string) string { + s = strings.TrimSpace(s) + if s == "" { + return "default" + } + s = strings.ReplaceAll(s, string(filepath.Separator), "-") + s = strings.ReplaceAll(s, "/", "-") + s = strings.ReplaceAll(s, "\\", "-") + s = strings.ReplaceAll(s, "..", "__") + if len(s) > 180 { + s = s[:180] + } + return s +} + +// localPlantaskBackend wraps the eino-ext local backend with plantask.Delete (Local has no Delete). +type localPlantaskBackend struct { + *localbk.Local +} + +func (l *localPlantaskBackend) Delete(ctx context.Context, req *plantask.DeleteRequest) error { + if l == nil || l.Local == nil || req == nil { + return nil + } + p := strings.TrimSpace(req.FilePath) + if p == "" { + return nil + } + return os.Remove(p) +} + +func splitToolsForToolSearch(all []tool.BaseTool, alwaysVisible int) (static []tool.BaseTool, dynamic []tool.BaseTool, ok bool) { + if alwaysVisible <= 0 || len(all) <= alwaysVisible+1 { + return all, nil, false + } + return append([]tool.BaseTool(nil), all[:alwaysVisible]...), append([]tool.BaseTool(nil), all[alwaysVisible:]...), true +} + +func buildReductionMiddleware(ctx context.Context, mw config.MultiAgentEinoMiddlewareConfig, convID string, loc *localbk.Local, logger *zap.Logger) (adk.ChatModelAgentMiddleware, error) { + if loc == nil { + return nil, fmt.Errorf("reduction: local backend nil") + } + root := strings.TrimSpace(mw.ReductionRootDir) + if root == "" { + root = filepath.Join(os.TempDir(), "cyberstrike-reduction", sanitizeEinoPathSegment(convID)) + } + if err := os.MkdirAll(root, 0o755); err != nil { + return nil, fmt.Errorf("reduction root: %w", err) + } + excl := append([]string(nil), mw.ReductionClearExclude...) + defaultExcl := []string{ + "task", "transfer_to_agent", "exit", "write_todos", "skill", "tool_search", + "TaskCreate", "TaskGet", "TaskUpdate", "TaskList", + } + excl = append(excl, defaultExcl...) + redMW, err := reduction.New(ctx, &reduction.Config{ + Backend: loc, + RootDir: root, + ReadFileToolName: "read_file", + ClearExcludeTools: excl, + }) + if err != nil { + return nil, err + } + if logger != nil { + logger.Info("eino middleware: reduction enabled", zap.String("root", root)) + } + return redMW, nil +} + +// prependEinoMiddlewares returns handlers to prepend (outermost first) and optionally replaces tools when tool_search is used. +func prependEinoMiddlewares( + ctx context.Context, + mw *config.MultiAgentEinoMiddlewareConfig, + place einoMWPlacement, + tools []tool.BaseTool, + einoLoc *localbk.Local, + skillsRoot string, + conversationID string, + logger *zap.Logger, +) (outTools []tool.BaseTool, extraHandlers []adk.ChatModelAgentMiddleware, err error) { + if mw == nil { + return tools, nil, nil + } + outTools = tools + + if mw.PatchToolCallsEffective() { + patchMW, perr := patchtoolcalls.New(ctx, &patchtoolcalls.Config{}) + if perr != nil { + return nil, nil, fmt.Errorf("patchtoolcalls: %w", perr) + } + extraHandlers = append(extraHandlers, patchMW) + } + + if mw.ReductionEnable && einoLoc != nil { + if place == einoMWSub && !mw.ReductionSubAgents { + // skip + } else { + redMW, rerr := buildReductionMiddleware(ctx, *mw, conversationID, einoLoc, logger) + if rerr != nil { + return nil, nil, rerr + } + extraHandlers = append(extraHandlers, redMW) + } + } + + minTools := mw.ToolSearchMinTools + if minTools <= 0 { + minTools = 20 + } + alwaysVis := mw.ToolSearchAlwaysVisible + if alwaysVis <= 0 { + alwaysVis = 12 + } + if mw.ToolSearchEnable && len(tools) >= minTools { + static, dynamic, split := splitToolsForToolSearch(tools, alwaysVis) + if split && len(dynamic) > 0 { + ts, terr := toolsearch.New(ctx, &toolsearch.Config{DynamicTools: dynamic}) + if terr != nil { + return nil, nil, fmt.Errorf("toolsearch: %w", terr) + } + extraHandlers = append(extraHandlers, ts) + outTools = static + if logger != nil { + logger.Info("eino middleware: tool_search enabled", + zap.Int("static_tools", len(static)), + zap.Int("dynamic_tools", len(dynamic))) + } + } + } + + if place == einoMWMain && mw.PlantaskEnable { + if einoLoc == nil || strings.TrimSpace(skillsRoot) == "" { + if logger != nil { + logger.Warn("eino middleware: plantask_enable ignored (need eino_skills + skills_dir)") + } + } else { + rel := strings.TrimSpace(mw.PlantaskRelDir) + if rel == "" { + rel = ".eino/plantask" + } + baseDir := filepath.Join(skillsRoot, rel, sanitizeEinoPathSegment(conversationID)) + if mk := os.MkdirAll(baseDir, 0o755); mk != nil { + return nil, nil, fmt.Errorf("plantask mkdir: %w", mk) + } + ptBE := &localPlantaskBackend{Local: einoLoc} + pt, perr := plantask.New(ctx, &plantask.Config{Backend: ptBE, BaseDir: baseDir}) + if perr != nil { + return nil, nil, fmt.Errorf("plantask: %w", perr) + } + extraHandlers = append(extraHandlers, pt) + if logger != nil { + logger.Info("eino middleware: plantask enabled", zap.String("baseDir", baseDir)) + } + } + } + + return outTools, extraHandlers, nil +} + +func deepExtrasFromConfig(ma *config.MultiAgentConfig) (outputKey string, retry *adk.ModelRetryConfig, taskDesc func(context.Context, []adk.Agent) (string, error)) { + if ma == nil { + return "", nil, nil + } + mw := ma.EinoMiddleware + if k := strings.TrimSpace(mw.DeepOutputKey); k != "" { + outputKey = k + } + if mw.DeepModelRetryMaxRetries > 0 { + retry = &adk.ModelRetryConfig{MaxRetries: mw.DeepModelRetryMaxRetries} + } + prefix := strings.TrimSpace(mw.TaskToolDescriptionPrefix) + if prefix != "" { + taskDesc = func(ctx context.Context, agents []adk.Agent) (string, error) { + _ = ctx + var names []string + for _, a := range agents { + if a == nil { + continue + } + n := strings.TrimSpace(a.Name(ctx)) + if n != "" { + names = append(names, n) + } + } + if len(names) == 0 { + return prefix, nil + } + return prefix + "\n可用子代理(按名称 transfer / task 调用):" + strings.Join(names, "、"), nil + } + } + return outputKey, retry, taskDesc +} diff --git a/internal/multiagent/eino_middleware_test.go b/internal/multiagent/eino_middleware_test.go new file mode 100644 index 00000000..04c42104 --- /dev/null +++ b/internal/multiagent/eino_middleware_test.go @@ -0,0 +1,34 @@ +package multiagent + +import ( + "context" + "fmt" + "testing" + + "github.com/cloudwego/eino/components/tool" + "github.com/cloudwego/eino/schema" +) + +type stubTool struct{ name string } + +func (s stubTool) Info(_ context.Context) (*schema.ToolInfo, error) { + return &schema.ToolInfo{Name: s.name}, nil +} + +func TestSplitToolsForToolSearch(t *testing.T) { + mk := func(n int) []tool.BaseTool { + out := make([]tool.BaseTool, n) + for i := 0; i < n; i++ { + out[i] = stubTool{name: fmt.Sprintf("t%d", i)} + } + return out + } + static, dynamic, ok := splitToolsForToolSearch(mk(4), 3) + if ok || len(static) != 4 || dynamic != nil { + t.Fatalf("expected no split when len<=alwaysVisible+1, got ok=%v static=%d dynamic=%v", ok, len(static), dynamic) + } + static, dynamic, ok = splitToolsForToolSearch(mk(20), 5) + if !ok || len(static) != 5 || len(dynamic) != 15 { + t.Fatalf("expected split 5+15, got ok=%v static=%d dynamic=%d", ok, len(static), len(dynamic)) + } +} diff --git a/internal/multiagent/eino_orchestration.go b/internal/multiagent/eino_orchestration.go new file mode 100644 index 00000000..5240ba06 --- /dev/null +++ b/internal/multiagent/eino_orchestration.go @@ -0,0 +1,126 @@ +package multiagent + +import ( + "context" + "fmt" + "strings" + + "github.com/cloudwego/eino-ext/components/model/openai" + "github.com/cloudwego/eino/adk" + "github.com/cloudwego/eino/adk/prebuilt/planexecute" + "github.com/cloudwego/eino/components/model" + "github.com/cloudwego/eino/schema" +) + +// PlanExecuteRootArgs 构建 Eino adk/prebuilt/planexecute 根 Agent 所需参数。 +type PlanExecuteRootArgs struct { + MainToolCallingModel *openai.ChatModel + ExecModel *openai.ChatModel + OrchInstruction string + ToolsCfg adk.ToolsConfig + ExecMaxIter int + LoopMaxIter int +} + +// NewPlanExecuteRoot 返回 plan → execute → replan 预置编排根节点(与 Deep / Supervisor 并列)。 +func NewPlanExecuteRoot(ctx context.Context, a *PlanExecuteRootArgs) (adk.ResumableAgent, error) { + if a == nil { + return nil, fmt.Errorf("plan_execute: args 为空") + } + if a.MainToolCallingModel == nil || a.ExecModel == nil { + return nil, fmt.Errorf("plan_execute: 模型为空") + } + tcm, ok := interface{}(a.MainToolCallingModel).(model.ToolCallingChatModel) + if !ok { + return nil, fmt.Errorf("plan_execute: 主模型需实现 ToolCallingChatModel") + } + planner, err := planexecute.NewPlanner(ctx, &planexecute.PlannerConfig{ + ToolCallingChatModel: tcm, + }) + if err != nil { + return nil, fmt.Errorf("plan_execute planner: %w", err) + } + replanner, err := planexecute.NewReplanner(ctx, &planexecute.ReplannerConfig{ + ChatModel: tcm, + }) + if err != nil { + return nil, fmt.Errorf("plan_execute replanner: %w", err) + } + executor, err := planexecute.NewExecutor(ctx, &planexecute.ExecutorConfig{ + Model: a.ExecModel, + ToolsConfig: a.ToolsCfg, + MaxIterations: a.ExecMaxIter, + GenInputFn: planExecuteExecutorGenInput(a.OrchInstruction), + }) + if err != nil { + return nil, fmt.Errorf("plan_execute executor: %w", err) + } + loopMax := a.LoopMaxIter + if loopMax <= 0 { + loopMax = 10 + } + return planexecute.New(ctx, &planexecute.Config{ + Planner: planner, + Executor: executor, + Replanner: replanner, + MaxIterations: loopMax, + }) +} + +func planExecuteExecutorGenInput(orchInstruction string) planexecute.GenModelInputFn { + oi := strings.TrimSpace(orchInstruction) + return func(ctx context.Context, in *planexecute.ExecutionContext) ([]adk.Message, error) { + planContent, err := in.Plan.MarshalJSON() + if err != nil { + return nil, err + } + userMsgs, err := planexecute.ExecutorPrompt.Format(ctx, map[string]any{ + "input": planExecuteFormatInput(in.UserInput), + "plan": string(planContent), + "executed_steps": planExecuteFormatExecutedSteps(in.ExecutedSteps), + "step": in.Plan.FirstStep(), + }) + if err != nil { + return nil, err + } + if oi != "" { + userMsgs = append([]adk.Message{schema.SystemMessage(oi)}, userMsgs...) + } + return userMsgs, nil + } +} + +func planExecuteFormatInput(input []adk.Message) string { + var sb strings.Builder + for _, msg := range input { + sb.WriteString(msg.Content) + sb.WriteString("\n") + } + return sb.String() +} + +func planExecuteFormatExecutedSteps(results []planexecute.ExecutedStep) string { + var sb strings.Builder + for _, result := range results { + sb.WriteString(fmt.Sprintf("Step: %s\nResult: %s\n\n", result.Step, result.Result)) + } + return sb.String() +} + +// planExecuteStreamsMainAssistant 将规划/执行/重规划各阶段助手流式输出映射到主对话区。 +func planExecuteStreamsMainAssistant(agent string) bool { + if agent == "" { + return true + } + switch agent { + case "planner", "executor", "replanner", "execute_replan", "plan_execute_replan": + return true + default: + return false + } +} + +func planExecuteEinoRoleTag(agent string) string { + _ = agent + return "orchestrator" +} diff --git a/internal/multiagent/eino_skills.go b/internal/multiagent/eino_skills.go index 73dafe5a..9a5c0f46 100644 --- a/internal/multiagent/eino_skills.go +++ b/internal/multiagent/eino_skills.go @@ -18,36 +18,37 @@ import ( // prepareEinoSkills builds Eino official skill backend + middleware, and a shared local disk backend // for skill discovery and (optionally) filesystem/execute tools. Returns nils when disabled or dir missing. +// skillsRoot is the absolute skills directory (empty when skills are not active). func prepareEinoSkills( ctx context.Context, skillsDir string, ma *config.MultiAgentConfig, logger *zap.Logger, -) (loc *localbk.Local, skillMW adk.ChatModelAgentMiddleware, fsTools bool, err error) { +) (loc *localbk.Local, skillMW adk.ChatModelAgentMiddleware, fsTools bool, skillsRoot string, err error) { if ma == nil || ma.EinoSkills.Disable { - return nil, nil, false, nil + return nil, nil, false, "", nil } root := strings.TrimSpace(skillsDir) if root == "" { if logger != nil { logger.Warn("eino skills: skills_dir empty, skip") } - return nil, nil, false, nil + return nil, nil, false, "", nil } abs, err := filepath.Abs(root) if err != nil { - return nil, nil, false, fmt.Errorf("skills_dir abs: %w", err) + return nil, nil, false, "", fmt.Errorf("skills_dir abs: %w", err) } if st, err := os.Stat(abs); err != nil || !st.IsDir() { if logger != nil { logger.Warn("eino skills: directory missing, skip", zap.String("dir", abs), zap.Error(err)) } - return nil, nil, false, nil + return nil, nil, false, "", nil } loc, err = localbk.NewBackend(ctx, &localbk.Config{}) if err != nil { - return nil, nil, false, fmt.Errorf("eino local backend: %w", err) + return nil, nil, false, "", fmt.Errorf("eino local backend: %w", err) } skillBE, err := skill.NewBackendFromFilesystem(ctx, &skill.BackendFromFilesystemConfig{ @@ -55,7 +56,7 @@ func prepareEinoSkills( BaseDir: abs, }) if err != nil { - return nil, nil, false, fmt.Errorf("eino skill filesystem backend: %w", err) + return nil, nil, false, "", fmt.Errorf("eino skill filesystem backend: %w", err) } sc := &skill.Config{Backend: skillBE} @@ -64,11 +65,11 @@ func prepareEinoSkills( } skillMW, err = skill.NewMiddleware(ctx, sc) if err != nil { - return nil, nil, false, fmt.Errorf("eino skill middleware: %w", err) + return nil, nil, false, "", fmt.Errorf("eino skill middleware: %w", err) } fsTools = ma.EinoSkills.EinoSkillFilesystemToolsEffective() - return loc, skillMW, fsTools, nil + return loc, skillMW, fsTools, abs, nil } // subAgentFilesystemMiddleware returns filesystem middleware for a sub-agent when Deep itself diff --git a/internal/multiagent/orchestrator_instruction.go b/internal/multiagent/orchestrator_instruction.go new file mode 100644 index 00000000..db22718f --- /dev/null +++ b/internal/multiagent/orchestrator_instruction.go @@ -0,0 +1,108 @@ +package multiagent + +import ( + "strings" + + "cyberstrike-ai/internal/agents" + "cyberstrike-ai/internal/config" +) + +// DefaultPlanExecuteOrchestratorInstruction 当未配置 plan_execute 专用 Markdown / YAML 时的内置主代理(规划/重规划侧)提示。 +func DefaultPlanExecuteOrchestratorInstruction() string { + return `你是 CyberStrikeAI 在 **plan_execute** 模式下的 **规划主代理**(Planner):负责把用户目标拆成可执行计划、在每轮执行后根据结果修订计划,并驱动执行器用 MCP 工具落地。你不使用 Deep 的 task 子代理委派;执行器会按你的计划直接调用工具。 + +## 授权与范围 + +- 授权已由系统前置裁决:不索取或核实「是否有权」;在已授权范围内做非破坏性测试。 +- 严格遵守用户给定的范围与约束;超出范围的要求需明确说明并回到授权边界内。 + +## 职责 + +- **计划**:输出清晰阶段(侦察 / 验证 / 汇总等)、每步的输入输出、验收标准与依赖关系;避免模糊动词。 +- **重规划**:执行器返回后,对照证据决定「继续 / 调整顺序 / 缩小范围 / 终止」;用新信息更新计划,不要重复无效步骤。 +- **风险**:标注破坏性操作、速率与封禁风险;优先可逆、可证据化的步骤。 +- **质量**:禁止无证据的确定结论;要求执行器用请求/响应、命令输出等支撑发现。 + +## 漏洞 + +发现有效漏洞时要求执行器或你在后续轮次使用 record_vulnerability 记录(标题、描述、严重程度、类型、目标、POC、影响、修复建议;级别 critical/high/medium/low/info)。 + +## 执行器对用户输出(重要) + +- 执行器在对话中**直接展示给用户的正文**须为可读纯文本,勿使用 {"response":"..."} 等 JSON 包裹;结构化计划由框架/planner 处理,与用户寒暄、结论、说明均用自然语言。 + +## 表达 + +在调用工具或给出计划变更前,用 2~5 句中文说明当前决策依据与期望证据形态;最终对用户交付结构化结论(发现摘要、证据、风险、下一步)。` +} + +// DefaultSupervisorOrchestratorInstruction 当未配置 supervisor 专用 Markdown / YAML 时的内置监督者提示(transfer / exit 说明仍由运行时在末尾追加)。 +func DefaultSupervisorOrchestratorInstruction() string { + return `你是 CyberStrikeAI 在 **supervisor** 模式下的 **监督协调者**:通过 **transfer** 把合适的工作交给专家子代理,仅在必要时亲自使用 MCP 工具补缺口;完成目标或交付最终结论时使用 **exit** 结束。 + +## 授权 + +- 授权已前置:不讨论是否有权;在已授权范围内推进非破坏性测试。 + +## 策略 + +- **委派优先**:可独立封装、需要专项上下文的子目标(枚举、验证、归纳、报告素材)优先 transfer 给匹配子代理,并在委派说明中写清:子目标、约束、期望交付物结构、证据要求。 +- **亲自执行**:仅当无合适专家、需全局衔接或子代理结果不足时,由你直接调用工具。 +- **汇总**:子代理输出是证据来源;你要对齐矛盾、补全上下文,给出统一结论与可复现验证步骤,避免机械拼接。 +- **漏洞**:有效漏洞应通过 record_vulnerability 记录(含 POC 与严重性)。 + +## 表达 + +委派或调用工具前用简短中文说明子目标与理由;对用户回复结构清晰(结论、证据、不确定性、建议)。` +} + +// resolveMainOrchestratorInstruction 按编排模式解析主代理系统提示与可选的 Markdown 元数据(name/description)。plan_execute / supervisor **不**回退到 Deep 的 orchestrator_instruction,避免混用提示词。 +func resolveMainOrchestratorInstruction(mode string, ma *config.MultiAgentConfig, markdownLoad *agents.MarkdownDirLoad) (instruction string, meta *agents.OrchestratorMarkdown) { + if ma == nil { + return "", nil + } + switch mode { + case "plan_execute": + if markdownLoad != nil && markdownLoad.OrchestratorPlanExecute != nil { + meta = markdownLoad.OrchestratorPlanExecute + if s := strings.TrimSpace(meta.Instruction); s != "" { + return s, meta + } + } + if s := strings.TrimSpace(ma.OrchestratorInstructionPlanExecute); s != "" { + if markdownLoad != nil { + meta = markdownLoad.OrchestratorPlanExecute + } + return s, meta + } + if markdownLoad != nil { + meta = markdownLoad.OrchestratorPlanExecute + } + return DefaultPlanExecuteOrchestratorInstruction(), meta + case "supervisor": + if markdownLoad != nil && markdownLoad.OrchestratorSupervisor != nil { + meta = markdownLoad.OrchestratorSupervisor + if s := strings.TrimSpace(meta.Instruction); s != "" { + return s, meta + } + } + if s := strings.TrimSpace(ma.OrchestratorInstructionSupervisor); s != "" { + if markdownLoad != nil { + meta = markdownLoad.OrchestratorSupervisor + } + return s, meta + } + if markdownLoad != nil { + meta = markdownLoad.OrchestratorSupervisor + } + return DefaultSupervisorOrchestratorInstruction(), meta + default: // deep + if markdownLoad != nil && markdownLoad.Orchestrator != nil { + meta = markdownLoad.Orchestrator + if s := strings.TrimSpace(markdownLoad.Orchestrator.Instruction); s != "" { + return s, meta + } + } + return strings.TrimSpace(ma.OrchestratorInstruction), meta + } +} diff --git a/internal/multiagent/plan_execute_text.go b/internal/multiagent/plan_execute_text.go new file mode 100644 index 00000000..390e1e62 --- /dev/null +++ b/internal/multiagent/plan_execute_text.go @@ -0,0 +1,36 @@ +package multiagent + +import ( + "encoding/json" + "strings" +) + +// UnwrapPlanExecuteUserText 若模型输出单层 JSON 且含常见「对用户回复」字段,则取出纯文本;否则原样返回。 +// 用于 Plan-Execute 下 executor 套 `{"response":"..."}` 或误把 replanner/planner JSON 当作最终气泡时的缓解。 +func UnwrapPlanExecuteUserText(s string) string { + s = strings.TrimSpace(s) + if len(s) < 2 || s[0] != '{' || s[len(s)-1] != '}' { + return s + } + var m map[string]interface{} + if err := json.Unmarshal([]byte(s), &m); err != nil { + return s + } + for _, key := range []string{ + "response", "answer", "message", "content", "output", + "final_answer", "reply", "text", "result_text", + } { + v, ok := m[key] + if !ok || v == nil { + continue + } + str, ok := v.(string) + if !ok { + continue + } + if t := strings.TrimSpace(str); t != "" { + return t + } + } + return s +} diff --git a/internal/multiagent/plan_execute_text_test.go b/internal/multiagent/plan_execute_text_test.go new file mode 100644 index 00000000..a6ddda24 --- /dev/null +++ b/internal/multiagent/plan_execute_text_test.go @@ -0,0 +1,17 @@ +package multiagent + +import "testing" + +func TestUnwrapPlanExecuteUserText(t *testing.T) { + raw := `{"response": "你好!很高兴见到你。"}` + if got := UnwrapPlanExecuteUserText(raw); got != "你好!很高兴见到你。" { + t.Fatalf("got %q", got) + } + if got := UnwrapPlanExecuteUserText("plain"); got != "plain" { + t.Fatalf("got %q", got) + } + steps := `{"steps":["a","b"]}` + if got := UnwrapPlanExecuteUserText(steps); got != steps { + t.Fatalf("expected unchanged steps json, got %q", got) + } +} diff --git a/internal/multiagent/runner.go b/internal/multiagent/runner.go index 835b6eee..c650eb6b 100644 --- a/internal/multiagent/runner.go +++ b/internal/multiagent/runner.go @@ -1,4 +1,4 @@ -// Package multiagent 使用 CloudWeGo Eino 的 DeepAgent(adk/prebuilt/deep)编排多代理,MCP 工具经 einomcp 桥接到现有 Agent。 +// Package multiagent 使用 CloudWeGo Eino adk/prebuilt(deep / plan_execute / supervisor)编排多代理,MCP 工具经 einomcp 桥接到现有 Agent。 package multiagent import ( @@ -8,6 +8,7 @@ import ( "fmt" "io" "net" + "path/filepath" "net/http" "sort" "strings" @@ -25,6 +26,7 @@ import ( "github.com/cloudwego/eino/adk" "github.com/cloudwego/eino/adk/filesystem" "github.com/cloudwego/eino/adk/prebuilt/deep" + "github.com/cloudwego/eino/adk/prebuilt/supervisor" "github.com/cloudwego/eino/compose" "github.com/cloudwego/eino/schema" "go.uber.org/zap" @@ -48,7 +50,8 @@ type toolCallPendingInfo struct { EinoRole string } -// RunDeepAgent 使用 Eino DeepAgent 执行一轮对话(流式事件通过 progress 回调输出)。 +// RunDeepAgent 使用 Eino 多代理预置编排执行一轮对话(deep / plan_execute / supervisor;流式事件通过 progress 回调输出)。 +// orchestrationOverride 非空时优先(如聊天/WebShell 请求体);否则用 multi_agent.orchestration(遗留 yaml);皆空则按 deep。 func RunDeepAgent( ctx context.Context, appCfg *config.Config, @@ -61,12 +64,14 @@ func RunDeepAgent( roleTools []string, progress func(eventType, message string, data interface{}), agentsMarkdownDir string, + orchestrationOverride string, ) (*RunResult, error) { if appCfg == nil || ma == nil || ag == nil { return nil, fmt.Errorf("multiagent: 配置或 Agent 为空") } effectiveSubs := ma.SubAgents + var markdownLoad *agents.MarkdownDirLoad var orch *agents.OrchestratorMarkdown if strings.TrimSpace(agentsMarkdownDir) != "" { load, merr := agents.LoadMarkdownAgentsDir(agentsMarkdownDir) @@ -75,15 +80,23 @@ func RunDeepAgent( logger.Warn("加载 agents 目录 Markdown 失败,沿用 config 中的 sub_agents", zap.Error(merr)) } } else { + markdownLoad = load effectiveSubs = agents.MergeYAMLAndMarkdown(ma.SubAgents, load.SubAgents) orch = load.Orchestrator } } - if ma.WithoutGeneralSubAgent && len(effectiveSubs) == 0 { + orchMode := config.NormalizeMultiAgentOrchestration(ma.Orchestration) + if o := strings.TrimSpace(orchestrationOverride); o != "" { + orchMode = config.NormalizeMultiAgentOrchestration(o) + } + if orchMode != "plan_execute" && ma.WithoutGeneralSubAgent && len(effectiveSubs) == 0 { return nil, fmt.Errorf("multi_agent.without_general_sub_agent 为 true 时,必须在 multi_agent.sub_agents 或 agents 目录 Markdown 中配置至少一个子代理") } + if orchMode == "supervisor" && len(effectiveSubs) == 0 { + return nil, fmt.Errorf("multi_agent.orchestration=supervisor 时需至少配置一个子代理(sub_agents 或 agents 目录 Markdown)") + } - einoLoc, einoSkillMW, einoFSTools, einoErr := prepareEinoSkills(ctx, appCfg.SkillsDir, ma, logger) + einoLoc, einoSkillMW, einoFSTools, skillsRoot, einoErr := prepareEinoSkills(ctx, appCfg.SkillsDir, ma, logger) if einoErr != nil { return nil, einoErr } @@ -133,6 +146,11 @@ func RunDeepAgent( return nil, err } + mainToolsForCfg, mainOrchestratorPre, err := prependEinoMiddlewares(ctx, &ma.EinoMiddleware, einoMWMain, mainTools, einoLoc, skillsRoot, conversationID, logger) + if err != nil { + return nil, err + } + httpClient := &http.Client{ Timeout: 30 * time.Minute, Transport: &http.Transport{ @@ -171,131 +189,163 @@ func RunDeepAgent( subDefaultIter = 20 } - subAgents := make([]adk.Agent, 0, len(effectiveSubs)) - for _, sub := range effectiveSubs { - id := strings.TrimSpace(sub.ID) - if id == "" { - return nil, fmt.Errorf("multi_agent.sub_agents 中存在空的 id") - } - name := strings.TrimSpace(sub.Name) - if name == "" { - name = id - } - desc := strings.TrimSpace(sub.Description) - if desc == "" { - desc = fmt.Sprintf("Specialist agent %s for penetration testing workflow.", id) - } - instr := strings.TrimSpace(sub.Instruction) - if instr == "" { - instr = "你是 CyberStrikeAI 中的专业子代理,在授权渗透测试场景下协助完成用户委托的子任务。优先使用可用工具获取证据,回答简洁专业。" - } + var subAgents []adk.Agent + if orchMode != "plan_execute" { + subAgents = make([]adk.Agent, 0, len(effectiveSubs)) + for _, sub := range effectiveSubs { + id := strings.TrimSpace(sub.ID) + if id == "" { + return nil, fmt.Errorf("multi_agent.sub_agents 中存在空的 id") + } + name := strings.TrimSpace(sub.Name) + if name == "" { + name = id + } + desc := strings.TrimSpace(sub.Description) + if desc == "" { + desc = fmt.Sprintf("Specialist agent %s for penetration testing workflow.", id) + } + instr := strings.TrimSpace(sub.Instruction) + if instr == "" { + instr = "你是 CyberStrikeAI 中的专业子代理,在授权渗透测试场景下协助完成用户委托的子任务。优先使用可用工具获取证据,回答简洁专业。" + } - roleTools := sub.RoleTools - bind := strings.TrimSpace(sub.BindRole) - if bind != "" && appCfg.Roles != nil { - if r, ok := appCfg.Roles[bind]; ok && r.Enabled { - if len(roleTools) == 0 && len(r.Tools) > 0 { - roleTools = r.Tools - } - if len(r.Skills) > 0 { - var b strings.Builder - b.WriteString(instr) - b.WriteString("\n\n本角色推荐优先通过 Eino `skill` 工具(渐进式披露)加载的技能包 name:") - for i, s := range r.Skills { - if i > 0 { - b.WriteString("、") - } - b.WriteString(s) + roleTools := sub.RoleTools + bind := strings.TrimSpace(sub.BindRole) + if bind != "" && appCfg.Roles != nil { + if r, ok := appCfg.Roles[bind]; ok && r.Enabled { + if len(roleTools) == 0 && len(r.Tools) > 0 { + roleTools = r.Tools + } + if len(r.Skills) > 0 { + var b strings.Builder + b.WriteString(instr) + b.WriteString("\n\n本角色推荐优先通过 Eino `skill` 工具(渐进式披露)加载的技能包 name:") + for i, s := range r.Skills { + if i > 0 { + b.WriteString("、") + } + b.WriteString(s) + } + b.WriteString("。") + instr = b.String() } - b.WriteString("。") - instr = b.String() } } - } - subModel, err := einoopenai.NewChatModel(ctx, baseModelCfg) - if err != nil { - return nil, fmt.Errorf("子代理 %q ChatModel: %w", id, err) - } - - subDefs := ag.ToolsForRole(roleTools) - subTools, err := einomcp.ToolsFromDefinitions(ag, holder, subDefs, recorder, toolOutputChunk) - if err != nil { - return nil, fmt.Errorf("子代理 %q 工具: %w", id, err) - } - - subMax := sub.MaxIterations - if subMax <= 0 { - subMax = subDefaultIter - } - - subSumMw, err := newEinoSummarizationMiddleware(ctx, subModel, appCfg, logger) - if err != nil { - return nil, fmt.Errorf("子代理 %q summarization 中间件: %w", id, err) - } - - var subHandlers []adk.ChatModelAgentMiddleware - if einoSkillMW != nil { - if einoFSTools && einoLoc != nil { - subFs, fsErr := subAgentFilesystemMiddleware(ctx, einoLoc) - if fsErr != nil { - return nil, fmt.Errorf("子代理 %q filesystem 中间件: %w", id, fsErr) - } - subHandlers = append(subHandlers, subFs) + subModel, err := einoopenai.NewChatModel(ctx, baseModelCfg) + if err != nil { + return nil, fmt.Errorf("子代理 %q ChatModel: %w", id, err) } - subHandlers = append(subHandlers, einoSkillMW) - } - subHandlers = append(subHandlers, subSumMw) - sa, err := adk.NewChatModelAgent(ctx, &adk.ChatModelAgentConfig{ - Name: id, - Description: desc, - Instruction: instr, - Model: subModel, - ToolsConfig: adk.ToolsConfig{ - ToolsNodeConfig: compose.ToolsNodeConfig{ - Tools: subTools, - UnknownToolsHandler: einomcp.UnknownToolReminderHandler(), - ToolCallMiddlewares: []compose.ToolMiddleware{ - {Invokable: softRecoveryToolCallMiddleware()}, + subDefs := ag.ToolsForRole(roleTools) + subTools, err := einomcp.ToolsFromDefinitions(ag, holder, subDefs, recorder, toolOutputChunk) + if err != nil { + return nil, fmt.Errorf("子代理 %q 工具: %w", id, err) + } + + subToolsForCfg, subPre, err := prependEinoMiddlewares(ctx, &ma.EinoMiddleware, einoMWSub, subTools, einoLoc, skillsRoot, conversationID, logger) + if err != nil { + return nil, fmt.Errorf("子代理 %q eino 中间件: %w", id, err) + } + + subMax := sub.MaxIterations + if subMax <= 0 { + subMax = subDefaultIter + } + + subSumMw, err := newEinoSummarizationMiddleware(ctx, subModel, appCfg, logger) + if err != nil { + return nil, fmt.Errorf("子代理 %q summarization 中间件: %w", id, err) + } + + var subHandlers []adk.ChatModelAgentMiddleware + if len(subPre) > 0 { + subHandlers = append(subHandlers, subPre...) + } + if einoSkillMW != nil { + if einoFSTools && einoLoc != nil { + subFs, fsErr := subAgentFilesystemMiddleware(ctx, einoLoc) + if fsErr != nil { + return nil, fmt.Errorf("子代理 %q filesystem 中间件: %w", id, fsErr) + } + subHandlers = append(subHandlers, subFs) + } + subHandlers = append(subHandlers, einoSkillMW) + } + subHandlers = append(subHandlers, subSumMw) + + sa, err := adk.NewChatModelAgent(ctx, &adk.ChatModelAgentConfig{ + Name: id, + Description: desc, + Instruction: instr, + Model: subModel, + ToolsConfig: adk.ToolsConfig{ + ToolsNodeConfig: compose.ToolsNodeConfig{ + Tools: subToolsForCfg, + UnknownToolsHandler: einomcp.UnknownToolReminderHandler(), + ToolCallMiddlewares: []compose.ToolMiddleware{ + {Invokable: softRecoveryToolCallMiddleware()}, + }, }, + EmitInternalEvents: true, }, - EmitInternalEvents: true, - }, - MaxIterations: subMax, - Handlers: subHandlers, - }) - if err != nil { - return nil, fmt.Errorf("子代理 %q: %w", id, err) + MaxIterations: subMax, + Handlers: subHandlers, + }) + if err != nil { + return nil, fmt.Errorf("子代理 %q: %w", id, err) + } + subAgents = append(subAgents, sa) } - subAgents = append(subAgents, sa) } mainModel, err := einoopenai.NewChatModel(ctx, baseModelCfg) if err != nil { - return nil, fmt.Errorf("Deep 主模型: %w", err) + return nil, fmt.Errorf("多代理主模型: %w", err) } mainSumMw, err := newEinoSummarizationMiddleware(ctx, mainModel, appCfg, logger) if err != nil { - return nil, fmt.Errorf("Deep 主代理 summarization 中间件: %w", err) + return nil, fmt.Errorf("多代理主 summarization 中间件: %w", err) } - // 与 deep.Config.Name 一致。子代理的 assistant 正文也会经 EmitInternalEvents 流出,若全部当主回复会重复(编排器总结 + 子代理原文)。 + // 与 deep.Config.Name / supervisor 主代理 Name 一致。 orchestratorName := "cyberstrike-deep" orchDescription := "Coordinates specialist agents and MCP tools for authorized security testing." - orchInstruction := strings.TrimSpace(ma.OrchestratorInstruction) - if orch != nil { + orchInstruction, orchMeta := resolveMainOrchestratorInstruction(orchMode, ma, markdownLoad) + if orchMeta != nil { + if strings.TrimSpace(orchMeta.EinoName) != "" { + orchestratorName = strings.TrimSpace(orchMeta.EinoName) + } + if d := strings.TrimSpace(orchMeta.Description); d != "" { + orchDescription = d + } + } else if orchMode == "deep" && orch != nil { if strings.TrimSpace(orch.EinoName) != "" { orchestratorName = strings.TrimSpace(orch.EinoName) } if d := strings.TrimSpace(orch.Description); d != "" { orchDescription = d } - if ins := strings.TrimSpace(orch.Instruction); ins != "" { - orchInstruction = ins - } } + + supInstr := strings.TrimSpace(orchInstruction) + if orchMode == "supervisor" { + var sb strings.Builder + if supInstr != "" { + sb.WriteString(supInstr) + sb.WriteString("\n\n") + } + sb.WriteString("你是监督协调者:可将任务通过 transfer 工具委派给下列专家子代理(使用其在系统中的 Agent 名称)。专家列表:") + for _, sa := range subAgents { + sb.WriteString("\n- ") + sb.WriteString(sa.Name(ctx)) + } + sb.WriteString("\n\n当你已完成用户目标或需要将最终结论交付用户时,使用 exit 工具结束。") + supInstr = sb.String() + } + var deepBackend filesystem.Backend var deepShell filesystem.StreamingShell if einoLoc != nil && einoFSTools { @@ -304,46 +354,128 @@ func RunDeepAgent( } deepHandlers := []adk.ChatModelAgentMiddleware{} + if len(mainOrchestratorPre) > 0 { + deepHandlers = append(deepHandlers, mainOrchestratorPre...) + } if einoSkillMW != nil { deepHandlers = append(deepHandlers, einoSkillMW) } deepHandlers = append(deepHandlers, newNoNestedTaskMiddleware(), mainSumMw) - da, err := deep.New(ctx, &deep.Config{ - Name: orchestratorName, - Description: orchDescription, - ChatModel: mainModel, - Instruction: orchInstruction, - SubAgents: subAgents, - WithoutGeneralSubAgent: ma.WithoutGeneralSubAgent, - WithoutWriteTodos: ma.WithoutWriteTodos, - MaxIteration: deepMaxIter, - Backend: deepBackend, - StreamingShell: deepShell, - // 防止 sub-agent 再调用 task(再委派 sub-agent),形成无限委派链。 - Handlers: deepHandlers, - ToolsConfig: adk.ToolsConfig{ - ToolsNodeConfig: compose.ToolsNodeConfig{ - Tools: mainTools, - UnknownToolsHandler: einomcp.UnknownToolReminderHandler(), - ToolCallMiddlewares: []compose.ToolMiddleware{ - {Invokable: softRecoveryToolCallMiddleware()}, - }, + supHandlers := []adk.ChatModelAgentMiddleware{} + if len(mainOrchestratorPre) > 0 { + supHandlers = append(supHandlers, mainOrchestratorPre...) + } + if einoSkillMW != nil { + supHandlers = append(supHandlers, einoSkillMW) + } + supHandlers = append(supHandlers, mainSumMw) + + mainToolsCfg := adk.ToolsConfig{ + ToolsNodeConfig: compose.ToolsNodeConfig{ + Tools: mainToolsForCfg, + UnknownToolsHandler: einomcp.UnknownToolReminderHandler(), + ToolCallMiddlewares: []compose.ToolMiddleware{ + {Invokable: softRecoveryToolCallMiddleware()}, }, - EmitInternalEvents: true, }, - }) - if err != nil { - return nil, fmt.Errorf("deep.New: %w", err) + EmitInternalEvents: true, + } + + deepOutKey, modelRetry, taskGen := deepExtrasFromConfig(ma) + + var da adk.Agent + switch orchMode { + case "plan_execute": + execModel, perr := einoopenai.NewChatModel(ctx, baseModelCfg) + if perr != nil { + return nil, fmt.Errorf("plan_execute 执行器模型: %w", perr) + } + peRoot, perr := NewPlanExecuteRoot(ctx, &PlanExecuteRootArgs{ + MainToolCallingModel: mainModel, + ExecModel: execModel, + OrchInstruction: orchInstruction, + ToolsCfg: mainToolsCfg, + ExecMaxIter: deepMaxIter, + LoopMaxIter: ma.PlanExecuteLoopMaxIterations, + }) + if perr != nil { + return nil, perr + } + da = peRoot + case "supervisor": + supCfg := &adk.ChatModelAgentConfig{ + Name: orchestratorName, + Description: orchDescription, + Instruction: supInstr, + Model: mainModel, + ToolsConfig: mainToolsCfg, + MaxIterations: deepMaxIter, + Handlers: supHandlers, + Exit: &adk.ExitTool{}, + } + if modelRetry != nil { + supCfg.ModelRetryConfig = modelRetry + } + if deepOutKey != "" { + supCfg.OutputKey = deepOutKey + } + superChat, serr := adk.NewChatModelAgent(ctx, supCfg) + if serr != nil { + return nil, fmt.Errorf("supervisor 主代理: %w", serr) + } + supRoot, serr := supervisor.New(ctx, &supervisor.Config{ + Supervisor: superChat, + SubAgents: subAgents, + }) + if serr != nil { + return nil, fmt.Errorf("supervisor.New: %w", serr) + } + da = supRoot + default: + dcfg := &deep.Config{ + Name: orchestratorName, + Description: orchDescription, + ChatModel: mainModel, + Instruction: orchInstruction, + SubAgents: subAgents, + WithoutGeneralSubAgent: ma.WithoutGeneralSubAgent, + WithoutWriteTodos: ma.WithoutWriteTodos, + MaxIteration: deepMaxIter, + Backend: deepBackend, + StreamingShell: deepShell, + Handlers: deepHandlers, + ToolsConfig: mainToolsCfg, + } + if deepOutKey != "" { + dcfg.OutputKey = deepOutKey + } + if modelRetry != nil { + dcfg.ModelRetryConfig = modelRetry + } + if taskGen != nil { + dcfg.TaskToolDescriptionGenerator = taskGen + } + dDeep, derr := deep.New(ctx, dcfg) + if derr != nil { + return nil, fmt.Errorf("deep.New: %w", derr) + } + da = dDeep } baseMsgs := historyToMessages(history) baseMsgs = append(baseMsgs, schema.UserMessage(userMessage)) streamsMainAssistant := func(agent string) bool { + if orchMode == "plan_execute" { + return planExecuteStreamsMainAssistant(agent) + } return agent == "" || agent == orchestratorName } einoRoleTag := func(agent string) string { + if orchMode == "plan_execute" { + return planExecuteEinoRoleTag(agent) + } if streamsMainAssistant(agent) { return "orchestrator" } @@ -352,6 +484,8 @@ func RunDeepAgent( var lastRunMsgs []adk.Message var lastAssistant string + // plan_execute:最后一轮 assistant 常被 replanner 的 JSON 覆盖,单独保留 executor 对用户文本。 + var lastPlanExecuteExecutor string // retryHints tracks the corrective hint to append for each retry attempt. // Index i corresponds to the hint that will be appended on attempt i+1. @@ -371,6 +505,7 @@ attemptLoop: // 仅保留主代理最后一次 assistant 输出;每轮重试重置,避免拼接失败轮次的片段。 lastAssistant = "" + lastPlanExecuteExecutor = "" var reasoningStreamSeq int64 var einoSubReplyStreamSeq int64 toolEmitSeen := make(map[string]struct{}) @@ -441,10 +576,25 @@ attemptLoop: pendingQueueByAgent = make(map[string][]string) } - runner := adk.NewRunner(ctx, adk.RunnerConfig{ + runnerCfg := adk.RunnerConfig{ Agent: da, EnableStreaming: true, - }) + } + if cp := strings.TrimSpace(ma.EinoMiddleware.CheckpointDir); cp != "" { + cpDir := filepath.Join(cp, sanitizeEinoPathSegment(conversationID)) + st, stErr := newFileCheckPointStore(cpDir) + if stErr != nil { + if logger != nil { + logger.Warn("eino checkpoint store disabled", zap.String("dir", cpDir), zap.Error(stErr)) + } + } else { + runnerCfg.CheckPointStore = st + if logger != nil { + logger.Info("eino runner: checkpoint store enabled", zap.String("dir", cpDir)) + } + } + } + runner := adk.NewRunner(ctx, runnerCfg) iter := runner.Run(ctx, msgs) for { @@ -512,6 +662,12 @@ attemptLoop: return nil, ev.Err } if ev.AgentName != "" && progress != nil { + iterEinoAgent := orchestratorName + if orchMode == "plan_execute" { + if a := strings.TrimSpace(ev.AgentName); a != "" { + iterEinoAgent = a + } + } if streamsMainAssistant(ev.AgentName) { if einoMainRound == 0 { einoMainRound = 1 @@ -519,7 +675,8 @@ attemptLoop: "iteration": 1, "einoScope": "main", "einoRole": "orchestrator", - "einoAgent": orchestratorName, + "einoAgent": iterEinoAgent, + "orchestration": orchMode, "conversationId": conversationID, "source": "eino", }) @@ -529,7 +686,8 @@ attemptLoop: "iteration": einoMainRound, "einoScope": "main", "einoRole": "orchestrator", - "einoAgent": orchestratorName, + "einoAgent": iterEinoAgent, + "orchestration": orchMode, "conversationId": conversationID, "source": "eino", }) @@ -540,6 +698,7 @@ attemptLoop: "conversationId": conversationID, "einoAgent": ev.AgentName, "einoRole": einoRoleTag(ev.AgentName), + "orchestration": orchMode, }) } if ev.Output == nil || ev.Output.MessageOutput == nil { @@ -572,10 +731,11 @@ attemptLoop: if reasoningStreamID == "" { reasoningStreamID = fmt.Sprintf("eino-reasoning-%s-%d", conversationID, atomic.AddInt64(&reasoningStreamSeq, 1)) progress("thinking_stream_start", " ", map[string]interface{}{ - "streamId": reasoningStreamID, - "source": "eino", - "einoAgent": ev.AgentName, - "einoRole": einoRoleTag(ev.AgentName), + "streamId": reasoningStreamID, + "source": "eino", + "einoAgent": ev.AgentName, + "einoRole": einoRoleTag(ev.AgentName), + "orchestration": orchMode, }) } progress("thinking_stream_delta", chunk.ReasoningContent, map[string]interface{}{ @@ -590,6 +750,8 @@ attemptLoop: "mcpExecutionIds": snapshotMCPIDs(), "messageGeneratedBy": "eino:" + ev.AgentName, "einoRole": "orchestrator", + "einoAgent": ev.AgentName, + "orchestration": orchMode, }) streamHeaderSent = true } @@ -597,6 +759,8 @@ attemptLoop: "conversationId": conversationID, "mcpExecutionIds": snapshotMCPIDs(), "einoRole": "orchestrator", + "einoAgent": ev.AgentName, + "orchestration": orchMode, }) mainAssistantBuf.WriteString(chunk.Content) } else if !streamsMainAssistant(ev.AgentName) { @@ -627,6 +791,9 @@ attemptLoop: if streamsMainAssistant(ev.AgentName) { if s := strings.TrimSpace(mainAssistantBuf.String()); s != "" { lastAssistant = s + if orchMode == "plan_execute" && strings.EqualFold(strings.TrimSpace(ev.AgentName), "executor") { + lastPlanExecuteExecutor = UnwrapPlanExecuteUserText(s) + } } } if subAssistantBuf.Len() > 0 && progress != nil { @@ -670,6 +837,7 @@ attemptLoop: "source": "eino", "einoAgent": ev.AgentName, "einoRole": einoRoleTag(ev.AgentName), + "orchestration": orchMode, }) } body := strings.TrimSpace(msg.Content) @@ -681,14 +849,21 @@ attemptLoop: "mcpExecutionIds": snapshotMCPIDs(), "messageGeneratedBy": "eino:" + ev.AgentName, "einoRole": "orchestrator", + "einoAgent": ev.AgentName, + "orchestration": orchMode, }) progress("response_delta", body, map[string]interface{}{ "conversationId": conversationID, "mcpExecutionIds": snapshotMCPIDs(), "einoRole": "orchestrator", + "einoAgent": ev.AgentName, + "orchestration": orchMode, }) } lastAssistant = body + if orchMode == "plan_execute" && strings.EqualFold(strings.TrimSpace(ev.AgentName), "executor") { + lastPlanExecuteExecutor = UnwrapPlanExecuteUserText(body) + } } else if progress != nil { progress("eino_agent_reply", body, map[string]interface{}{ "conversationId": conversationID, @@ -766,6 +941,13 @@ attemptLoop: histJSON, _ := json.Marshal(lastRunMsgs) cleaned := strings.TrimSpace(lastAssistant) + if orchMode == "plan_execute" { + if e := strings.TrimSpace(lastPlanExecuteExecutor); e != "" { + cleaned = e + } else { + cleaned = UnwrapPlanExecuteUserText(cleaned) + } + } cleaned = dedupeRepeatedParagraphs(cleaned, 80) cleaned = dedupeParagraphsByLineFingerprint(cleaned, 100) out := &RunResult{ @@ -775,7 +957,7 @@ attemptLoop: LastReActOutput: cleaned, } if out.Response == "" { - out.Response = "(Eino DeepAgent 已完成,但未捕获到助手文本输出。请查看过程详情或日志。)" + out.Response = "(Eino 多代理编排已完成,但未捕获到助手文本输出。请查看过程详情或日志。)" out.LastReActOutput = out.Response } return out, nil