Add files via upload

This commit is contained in:
公明
2026-04-29 02:59:34 +08:00
committed by GitHub
parent 1318607813
commit eda5f9bba1
12 changed files with 663 additions and 43 deletions
+112
View File
@@ -72,6 +72,8 @@ type MultiAgentEinoMiddlewareConfig struct {
ToolSearchEnable bool `yaml:"tool_search_enable,omitempty" json:"tool_search_enable,omitempty"`
ToolSearchMinTools int `yaml:"tool_search_min_tools,omitempty" json:"tool_search_min_tools,omitempty"` // default 20; applies when len(tools) >= this
ToolSearchAlwaysVisible int `yaml:"tool_search_always_visible,omitempty" json:"tool_search_always_visible,omitempty"` // default 12; first N tools stay always visible
// ToolSearchAlwaysVisibleTools keeps specified tool names always visible (never hidden by tool_search).
ToolSearchAlwaysVisibleTools []string `yaml:"tool_search_always_visible_tools,omitempty" json:"tool_search_always_visible_tools,omitempty"`
// Plantask adds TaskCreate/Get/Update/List (file-backed under skills dir); requires eino_skills + local backend.
PlantaskEnable bool `yaml:"plantask_enable,omitempty" json:"plantask_enable,omitempty"`
// PlantaskRelDir relative to skills_dir for per-conversation task boards (default .eino/plantask).
@@ -79,8 +81,24 @@ type MultiAgentEinoMiddlewareConfig struct {
// Reduction truncates/offloads large tool outputs (requires eino local backend for Write).
ReductionEnable bool `yaml:"reduction_enable,omitempty" json:"reduction_enable,omitempty"`
ReductionRootDir string `yaml:"reduction_root_dir,omitempty" json:"reduction_root_dir,omitempty"` // default: os temp + conversation id
ReductionMaxLengthForTrunc int `yaml:"reduction_max_length_for_trunc,omitempty" json:"reduction_max_length_for_trunc,omitempty"` // default 12000
ReductionMaxTokensForClear int `yaml:"reduction_max_tokens_for_clear,omitempty" json:"reduction_max_tokens_for_clear,omitempty"` // default 50000
ReductionClearExclude []string `yaml:"reduction_clear_exclude,omitempty" json:"reduction_clear_exclude,omitempty"`
ReductionSubAgents bool `yaml:"reduction_sub_agents,omitempty" json:"reduction_sub_agents,omitempty"` // also attach to sub-agents
// SummarizationTriggerRatio controls summarization trigger threshold as max_total_tokens * ratio (default 0.8).
SummarizationTriggerRatio float64 `yaml:"summarization_trigger_ratio,omitempty" json:"summarization_trigger_ratio,omitempty"`
// SummarizationEmitInternalEvents controls middleware internal event emission (default true).
SummarizationEmitInternalEvents *bool `yaml:"summarization_emit_internal_events,omitempty" json:"summarization_emit_internal_events,omitempty"`
// HistoryInputBudgetRatio caps pre-agent history tokens as max_total_tokens * ratio (default 0.35).
HistoryInputBudgetRatio float64 `yaml:"history_input_budget_ratio,omitempty" json:"history_input_budget_ratio,omitempty"`
// PlanExecuteUserInputBudgetRatio caps planner/replanner/executor userInput prompt budget ratio (default 0.35).
PlanExecuteUserInputBudgetRatio float64 `yaml:"plan_execute_user_input_budget_ratio,omitempty" json:"plan_execute_user_input_budget_ratio,omitempty"`
// PlanExecuteExecutedStepsBudgetRatio caps executed_steps prompt budget ratio (default 0.2).
PlanExecuteExecutedStepsBudgetRatio float64 `yaml:"plan_execute_executed_steps_budget_ratio,omitempty" json:"plan_execute_executed_steps_budget_ratio,omitempty"`
// PlanExecuteMaxStepResultRunes caps each executed step result length for prompt view (default 4000).
PlanExecuteMaxStepResultRunes int `yaml:"plan_execute_max_step_result_runes,omitempty" json:"plan_execute_max_step_result_runes,omitempty"`
// PlanExecuteKeepLastSteps keeps only the tail steps in prompt view (default 8).
PlanExecuteKeepLastSteps int `yaml:"plan_execute_keep_last_steps,omitempty" json:"plan_execute_keep_last_steps,omitempty"`
// CheckpointDir when non-empty enables adk.Runner CheckPointStore (file-backed) for interrupt/resume persistence.
CheckpointDir string `yaml:"checkpoint_dir,omitempty" json:"checkpoint_dir,omitempty"`
// DeepOutputKey passed to deep.Config OutputKey (session final text); empty = off.
@@ -91,6 +109,97 @@ type MultiAgentEinoMiddlewareConfig struct {
TaskToolDescriptionPrefix string `yaml:"task_tool_description_prefix,omitempty" json:"task_tool_description_prefix,omitempty"`
}
func (c MultiAgentEinoMiddlewareConfig) SummarizationTriggerRatioEffective() float64 {
v := c.SummarizationTriggerRatio
if v <= 0 {
return 0.8
}
if v < 0.5 {
return 0.5
}
if v > 0.95 {
return 0.95
}
return v
}
func (c MultiAgentEinoMiddlewareConfig) SummarizationEmitInternalEventsEffective() bool {
if c.SummarizationEmitInternalEvents != nil {
return *c.SummarizationEmitInternalEvents
}
return true
}
func (c MultiAgentEinoMiddlewareConfig) HistoryInputBudgetRatioEffective() float64 {
v := c.HistoryInputBudgetRatio
if v <= 0 {
return 0.35
}
if v < 0.15 {
return 0.15
}
if v > 0.6 {
return 0.6
}
return v
}
func (c MultiAgentEinoMiddlewareConfig) PlanExecuteUserInputBudgetRatioEffective() float64 {
v := c.PlanExecuteUserInputBudgetRatio
if v <= 0 {
return 0.35
}
if v < 0.1 {
return 0.1
}
if v > 0.6 {
return 0.6
}
return v
}
func (c MultiAgentEinoMiddlewareConfig) PlanExecuteExecutedStepsBudgetRatioEffective() float64 {
v := c.PlanExecuteExecutedStepsBudgetRatio
if v <= 0 {
return 0.2
}
if v < 0.08 {
return 0.08
}
if v > 0.5 {
return 0.5
}
return v
}
func (c MultiAgentEinoMiddlewareConfig) PlanExecuteMaxStepResultRunesEffective() int {
if c.PlanExecuteMaxStepResultRunes > 0 {
return c.PlanExecuteMaxStepResultRunes
}
return 4000
}
func (c MultiAgentEinoMiddlewareConfig) PlanExecuteKeepLastStepsEffective() int {
if c.PlanExecuteKeepLastSteps > 0 {
return c.PlanExecuteKeepLastSteps
}
return 8
}
func (c MultiAgentEinoMiddlewareConfig) ReductionMaxLengthForTruncEffective() int {
if c.ReductionMaxLengthForTrunc > 0 {
return c.ReductionMaxLengthForTrunc
}
return 12000
}
func (c MultiAgentEinoMiddlewareConfig) ReductionMaxTokensForClearEffective() int {
if c.ReductionMaxTokensForClear > 0 {
return c.ReductionMaxTokensForClear
}
return 50000
}
// MultiAgentEinoSkillsConfig toggles Eino official skill progressive disclosure and host filesystem tools.
type MultiAgentEinoSkillsConfig struct {
// Disable skips skill middleware (and does not attach local FS tools for Deep).
@@ -137,6 +246,8 @@ type MultiAgentPublic struct {
SubAgentCount int `json:"sub_agent_count"`
Orchestration string `json:"orchestration,omitempty"`
PlanExecuteLoopMaxIterations int `json:"plan_execute_loop_max_iterations"`
ToolSearchAlwaysVisibleTools []string `json:"tool_search_always_visible_tools,omitempty"`
ToolSearchAlwaysVisibleEffectiveTools []string `json:"tool_search_always_visible_effective_tools,omitempty"`
}
// NormalizeMultiAgentOrchestration 返回 deep、plan_execute 或 supervisor。
@@ -158,6 +269,7 @@ type MultiAgentAPIUpdate struct {
RobotUseMultiAgent bool `json:"robot_use_multi_agent"`
BatchUseMultiAgent bool `json:"batch_use_multi_agent"`
PlanExecuteLoopMaxIterations *int `json:"plan_execute_loop_max_iterations,omitempty"`
ToolSearchAlwaysVisibleTools []string `json:"tool_search_always_visible_tools,omitempty"`
}
// RobotsConfig 机器人配置(企业微信、钉钉、飞书等)
+10
View File
@@ -4,6 +4,8 @@ import (
"database/sql"
"encoding/json"
"fmt"
"os"
"path/filepath"
"strings"
"time"
@@ -416,6 +418,14 @@ func (db *DB) DeleteConversation(id string) error {
if err != nil {
return fmt.Errorf("删除对话失败: %w", err)
}
// Best-effort cleanup for conversation-scoped filesystem artifacts
// (e.g., summarization transcript, reduction/checkpoint files under conversation_artifacts/<id>).
if base := strings.TrimSpace(db.conversationArtifactsDir); base != "" {
artDir := filepath.Join(base, id)
if rmErr := os.RemoveAll(artDir); rmErr != nil {
db.logger.Warn("删除会话 artifacts 目录失败", zap.String("conversationId", id), zap.String("dir", artDir), zap.Error(rmErr))
}
}
db.logger.Info("对话及其所有相关数据已删除", zap.String("conversationId", id))
return nil
+11 -1
View File
@@ -3,6 +3,8 @@ package database
import (
"database/sql"
"fmt"
"os"
"path/filepath"
"strings"
"time"
@@ -21,7 +23,8 @@ func configureDBPool(db *sql.DB) {
// DB 数据库连接
type DB struct {
*sql.DB
logger *zap.Logger
logger *zap.Logger
conversationArtifactsDir string
}
// NewDB 创建数据库连接
@@ -41,6 +44,13 @@ func NewDB(dbPath string, logger *zap.Logger) (*DB, error) {
DB: db,
logger: logger,
}
// Keep conversation-scoped artifacts near database files, so cleanup can follow conversation lifecycle.
baseDir := filepath.Join(filepath.Dir(dbPath), "conversation_artifacts")
if mkErr := os.MkdirAll(baseDir, 0o755); mkErr == nil {
database.conversationArtifactsDir = baseDir
} else if logger != nil {
logger.Warn("创建 conversation artifacts 目录失败", zap.String("dir", baseDir), zap.Error(mkErr))
}
// 初始化表
if err := database.initTables(); err != nil {
+133
View File
@@ -0,0 +1,133 @@
package multiagent
import (
"context"
"strings"
"cyberstrike-ai/internal/agent"
"github.com/bytedance/sonic"
"github.com/cloudwego/eino/adk"
"github.com/cloudwego/eino/schema"
"go.uber.org/zap"
)
type einoModelInputTelemetryMiddleware struct {
adk.BaseChatModelAgentMiddleware
logger *zap.Logger
modelName string
conversationID string
phase string
}
func newEinoModelInputTelemetryMiddleware(
logger *zap.Logger,
modelName string,
conversationID string,
phase string,
) adk.ChatModelAgentMiddleware {
if logger == nil {
return nil
}
return &einoModelInputTelemetryMiddleware{
logger: logger,
modelName: strings.TrimSpace(modelName),
conversationID: strings.TrimSpace(conversationID),
phase: strings.TrimSpace(phase),
}
}
func (m *einoModelInputTelemetryMiddleware) BeforeModelRewriteState(
ctx context.Context,
state *adk.ChatModelAgentState,
mc *adk.ModelContext,
) (context.Context, *adk.ChatModelAgentState, error) {
if m == nil || m.logger == nil || state == nil {
return ctx, state, nil
}
tokens := estimateTokensForMessagesAndTools(ctx, m.modelName, state.Messages, mcTools(mc))
m.logger.Info("eino model input estimated",
zap.String("phase", m.phase),
zap.String("conversation_id", m.conversationID),
zap.Int("messages", len(state.Messages)),
zap.Int("tools", len(mcTools(mc))),
zap.Int("input_tokens_estimated", tokens),
)
return ctx, state, nil
}
func mcTools(mc *adk.ModelContext) []*schema.ToolInfo {
if mc == nil || len(mc.Tools) == 0 {
return nil
}
return mc.Tools
}
func estimateTokensForMessagesAndTools(
_ context.Context,
modelName string,
messages []adk.Message,
tools []*schema.ToolInfo,
) int {
var sb strings.Builder
for _, msg := range messages {
if msg == nil {
continue
}
sb.WriteString(string(msg.Role))
sb.WriteByte('\n')
sb.WriteString(msg.Content)
sb.WriteByte('\n')
if msg.ReasoningContent != "" {
sb.WriteString(msg.ReasoningContent)
sb.WriteByte('\n')
}
if len(msg.ToolCalls) > 0 {
if b, err := sonic.Marshal(msg.ToolCalls); err == nil {
sb.Write(b)
sb.WriteByte('\n')
}
}
}
for _, tl := range tools {
if tl == nil {
continue
}
cp := *tl
cp.Extra = nil
if text, err := sonic.MarshalString(cp); err == nil {
sb.WriteString(text)
sb.WriteByte('\n')
}
}
text := sb.String()
if text == "" {
return 0
}
tc := agent.NewTikTokenCounter()
if n, err := tc.Count(modelName, text); err == nil {
return n
}
return (len(text) + 3) / 4
}
func logPlanExecuteModelInputEstimate(
logger *zap.Logger,
modelName string,
conversationID string,
phase string,
msgs []adk.Message,
) {
if logger == nil {
return
}
tokens := estimateTokensForMessagesAndTools(context.Background(), modelName, msgs, nil)
logger.Info("eino model input estimated",
zap.String("phase", phase),
zap.String("conversation_id", strings.TrimSpace(conversationID)),
zap.Int("messages", len(msgs)),
zap.Int("tools", 0),
zap.Int("input_tokens_estimated", tokens),
)
}
+64 -1
View File
@@ -8,6 +8,7 @@ import (
"strings"
"cyberstrike-ai/internal/config"
"cyberstrike-ai/internal/mcp/builtin"
localbk "github.com/cloudwego/eino-ext/adk/backend/local"
"github.com/cloudwego/eino/adk"
@@ -65,6 +66,66 @@ func splitToolsForToolSearch(all []tool.BaseTool, alwaysVisible int) (static []t
return append([]tool.BaseTool(nil), all[:alwaysVisible]...), append([]tool.BaseTool(nil), all[alwaysVisible:]...), true
}
func splitToolsForToolSearchByNames(all []tool.BaseTool, names []string, fallbackAlwaysVisible int) (static []tool.BaseTool, dynamic []tool.BaseTool, ok bool) {
nameSet := make(map[string]struct{}, len(names))
for _, n := range names {
n = strings.TrimSpace(strings.ToLower(n))
if n == "" {
continue
}
nameSet[n] = struct{}{}
}
if len(nameSet) == 0 {
return splitToolsForToolSearch(all, fallbackAlwaysVisible)
}
static = make([]tool.BaseTool, 0, len(all))
dynamic = make([]tool.BaseTool, 0, len(all))
for _, t := range all {
if t == nil {
continue
}
info, err := t.Info(context.Background())
name := ""
if err == nil && info != nil {
name = strings.TrimSpace(strings.ToLower(info.Name))
}
if _, keep := nameSet[name]; keep {
static = append(static, t)
continue
}
dynamic = append(dynamic, t)
}
if len(static) == 0 || len(dynamic) == 0 {
// fallback: preserve previous behavior when whitelist misses all or includes all.
return splitToolsForToolSearch(all, fallbackAlwaysVisible)
}
return static, dynamic, true
}
func mergeAlwaysVisibleToolNames(configured []string) []string {
merged := make([]string, 0, len(configured)+32)
seen := make(map[string]struct{}, len(configured)+32)
add := func(name string) {
n := strings.TrimSpace(strings.ToLower(name))
if n == "" {
return
}
if _, ok := seen[n]; ok {
return
}
seen[n] = struct{}{}
merged = append(merged, n)
}
for _, n := range configured {
add(n)
}
// Always include hardcoded backend builtin MCP tools from constants.
for _, n := range builtin.GetAllBuiltinTools() {
add(n)
}
return merged
}
func buildReductionMiddleware(ctx context.Context, mw config.MultiAgentEinoMiddlewareConfig, convID string, loc *localbk.Local, logger *zap.Logger) (adk.ChatModelAgentMiddleware, error) {
if loc == nil {
return nil, fmt.Errorf("reduction: local backend nil")
@@ -87,6 +148,8 @@ func buildReductionMiddleware(ctx context.Context, mw config.MultiAgentEinoMiddl
RootDir: root,
ReadFileToolName: "read_file",
ClearExcludeTools: excl,
MaxLengthForTrunc: mw.ReductionMaxLengthForTruncEffective(),
MaxTokensForClear: int64(mw.ReductionMaxTokensForClearEffective()),
})
if err != nil {
return nil, err
@@ -142,7 +205,7 @@ func prependEinoMiddlewares(
alwaysVis = 12
}
if mw.ToolSearchEnable && len(tools) >= minTools {
static, dynamic, split := splitToolsForToolSearch(tools, alwaysVis)
static, dynamic, split := splitToolsForToolSearchByNames(tools, mergeAlwaysVisibleToolNames(mw.ToolSearchAlwaysVisibleTools), alwaysVis)
if split && len(dynamic) > 0 {
ts, terr := toolsearch.New(ctx, &toolsearch.Config{DynamicTools: dynamic})
if terr != nil {
@@ -0,0 +1,38 @@
package multiagent
import (
"context"
"fmt"
"github.com/cloudwego/eino/adk"
)
func applyBeforeModelRewriteHandlers(
ctx context.Context,
msgs []adk.Message,
handlers []adk.ChatModelAgentMiddleware,
) ([]adk.Message, error) {
if len(msgs) == 0 || len(handlers) == 0 {
return msgs, nil
}
state := &adk.ChatModelAgentState{Messages: msgs}
modelCtx := &adk.ModelContext{}
curCtx := ctx
for _, h := range handlers {
if h == nil {
continue
}
nextCtx, nextState, err := h.BeforeModelRewriteState(curCtx, state, modelCtx)
if err != nil {
return nil, fmt.Errorf("before model rewrite: %w", err)
}
if nextCtx != nil {
curCtx = nextCtx
}
if nextState != nil {
state = nextState
}
}
return state.Messages, nil
}
+168 -20
View File
@@ -5,6 +5,7 @@ import (
"fmt"
"strings"
"cyberstrike-ai/internal/agent"
"cyberstrike-ai/internal/config"
"github.com/cloudwego/eino-ext/components/model/openai"
@@ -25,7 +26,12 @@ type PlanExecuteRootArgs struct {
LoopMaxIter int
// AppCfg / Logger 非空时为 Executor 挂载与 Deep/Supervisor 一致的 Eino summarization 中间件。
AppCfg *config.Config
MwCfg *config.MultiAgentEinoMiddlewareConfig
// ConversationID is used for transcript/isolation paths in middleware.
ConversationID string
Logger *zap.Logger
// ModelName is used for model input token estimation logs.
ModelName string
// ExecPreMiddlewares 是由 prependEinoMiddlewares 构建的前置中间件(patchtoolcalls, reduction, toolsearch, plantask),
// 与 Deep/Supervisor 主代理的 mainOrchestratorPre 一致。
ExecPreMiddlewares []adk.ChatModelAgentMiddleware
@@ -33,6 +39,8 @@ type PlanExecuteRootArgs struct {
SkillMiddleware adk.ChatModelAgentMiddleware
// FilesystemMiddleware 是 Eino filesystem 中间件,当 eino_skills.filesystem_tools 启用时提供本机文件读写与 Shell 能力(可选)。
FilesystemMiddleware adk.ChatModelAgentMiddleware
// PlannerReplannerRewriteHandlers applies BeforeModelRewriteState pipeline for planner/replanner input.
PlannerReplannerRewriteHandlers []adk.ChatModelAgentMiddleware
}
// NewPlanExecuteRoot 返回 plan → execute → replan 预置编排根节点(与 Deep / Supervisor 并列)。
@@ -50,7 +58,7 @@ func NewPlanExecuteRoot(ctx context.Context, a *PlanExecuteRootArgs) (adk.Resuma
plannerCfg := &planexecute.PlannerConfig{
ToolCallingChatModel: tcm,
}
if fn := planExecutePlannerGenInput(a.OrchInstruction); fn != nil {
if fn := planExecutePlannerGenInput(a.OrchInstruction, a.AppCfg, a.MwCfg, a.Logger, a.ModelName, a.ConversationID, a.PlannerReplannerRewriteHandlers); fn != nil {
plannerCfg.GenInputFn = fn
}
planner, err := planexecute.NewPlanner(ctx, plannerCfg)
@@ -59,7 +67,7 @@ func NewPlanExecuteRoot(ctx context.Context, a *PlanExecuteRootArgs) (adk.Resuma
}
replanner, err := planexecute.NewReplanner(ctx, &planexecute.ReplannerConfig{
ChatModel: tcm,
GenInputFn: planExecuteReplannerGenInput(a.OrchInstruction),
GenInputFn: planExecuteReplannerGenInput(a.OrchInstruction, a.AppCfg, a.MwCfg, a.Logger, a.ModelName, a.ConversationID, a.PlannerReplannerRewriteHandlers),
})
if err != nil {
return nil, fmt.Errorf("plan_execute replanner: %w", err)
@@ -81,17 +89,20 @@ func NewPlanExecuteRoot(ctx context.Context, a *PlanExecuteRootArgs) (adk.Resuma
}
// 4. summarization(最后,与 Deep/Supervisor 一致)
if a.AppCfg != nil {
sumMw, sumErr := newEinoSummarizationMiddleware(ctx, a.ExecModel, a.AppCfg, a.Logger)
sumMw, sumErr := newEinoSummarizationMiddleware(ctx, a.ExecModel, a.AppCfg, a.MwCfg, a.ConversationID, a.Logger)
if sumErr != nil {
return nil, fmt.Errorf("plan_execute executor summarization: %w", sumErr)
}
execHandlers = append(execHandlers, sumMw)
}
if teleMw := newEinoModelInputTelemetryMiddleware(a.Logger, a.ModelName, a.ConversationID, "plan_execute_executor"); teleMw != nil {
execHandlers = append(execHandlers, teleMw)
}
executor, err := newPlanExecuteExecutor(ctx, &planexecute.ExecutorConfig{
Model: a.ExecModel,
ToolsConfig: a.ToolsCfg,
MaxIterations: a.ExecMaxIter,
GenInputFn: planExecuteExecutorGenInput(a.OrchInstruction),
GenInputFn: planExecuteExecutorGenInput(a.OrchInstruction, a.AppCfg, a.MwCfg, a.Logger, a.ModelName, a.ConversationID),
}, execHandlers)
if err != nil {
return nil, fmt.Errorf("plan_execute executor: %w", err)
@@ -110,20 +121,42 @@ func NewPlanExecuteRoot(ctx context.Context, a *PlanExecuteRootArgs) (adk.Resuma
// planExecutePlannerGenInput 将 orchestrator instruction 作为 SystemMessage 注入 planner 输入。
// 返回 nil 时 Eino 使用内置默认 planner prompt。
func planExecutePlannerGenInput(orchInstruction string) planexecute.GenPlannerModelInputFn {
func planExecutePlannerGenInput(
orchInstruction string,
appCfg *config.Config,
mwCfg *config.MultiAgentEinoMiddlewareConfig,
logger *zap.Logger,
modelName string,
conversationID string,
rewriteHandlers []adk.ChatModelAgentMiddleware,
) planexecute.GenPlannerModelInputFn {
oi := strings.TrimSpace(orchInstruction)
if oi == "" {
if oi == "" && appCfg == nil {
return nil
}
return func(ctx context.Context, userInput []adk.Message) ([]adk.Message, error) {
userInput = capPlanExecuteUserInputMessages(userInput, appCfg, mwCfg)
msgs := make([]adk.Message, 0, 1+len(userInput))
msgs = append(msgs, schema.SystemMessage(oi))
if oi != "" {
msgs = append(msgs, schema.SystemMessage(oi))
}
msgs = append(msgs, userInput...)
if rewritten, rerr := applyBeforeModelRewriteHandlers(ctx, msgs, rewriteHandlers); rerr == nil && len(rewritten) > 0 {
msgs = rewritten
}
logPlanExecuteModelInputEstimate(logger, modelName, conversationID, "plan_execute_planner", msgs)
return msgs, nil
}
}
func planExecuteExecutorGenInput(orchInstruction string) planexecute.GenModelInputFn {
func planExecuteExecutorGenInput(
orchInstruction string,
appCfg *config.Config,
mwCfg *config.MultiAgentEinoMiddlewareConfig,
logger *zap.Logger,
modelName string,
conversationID string,
) planexecute.GenModelInputFn {
oi := strings.TrimSpace(orchInstruction)
return func(ctx context.Context, in *planexecute.ExecutionContext) ([]adk.Message, error) {
planContent, err := in.Plan.MarshalJSON()
@@ -131,9 +164,9 @@ func planExecuteExecutorGenInput(orchInstruction string) planexecute.GenModelInp
return nil, err
}
userMsgs, err := planexecute.ExecutorPrompt.Format(ctx, map[string]any{
"input": planExecuteFormatInput(in.UserInput),
"input": planExecuteFormatInput(capPlanExecuteUserInputMessages(in.UserInput, appCfg, mwCfg)),
"plan": string(planContent),
"executed_steps": planExecuteFormatExecutedSteps(in.ExecutedSteps),
"executed_steps": planExecuteFormatExecutedSteps(in.ExecutedSteps, appCfg, mwCfg),
"step": in.Plan.FirstStep(),
})
if err != nil {
@@ -142,6 +175,7 @@ func planExecuteExecutorGenInput(orchInstruction string) planexecute.GenModelInp
if oi != "" {
userMsgs = append([]adk.Message{schema.SystemMessage(oi)}, userMsgs...)
}
logPlanExecuteModelInputEstimate(logger, modelName, conversationID, "plan_execute_executor_gen_input", userMsgs)
return userMsgs, nil
}
}
@@ -155,18 +189,22 @@ func planExecuteFormatInput(input []adk.Message) string {
return sb.String()
}
func planExecuteFormatExecutedSteps(results []planexecute.ExecutedStep) string {
capped := capPlanExecuteExecutedSteps(results)
var sb strings.Builder
for _, result := range capped {
sb.WriteString(fmt.Sprintf("Step: %s\nResult: %s\n\n", result.Step, result.Result))
}
return sb.String()
func planExecuteFormatExecutedSteps(results []planexecute.ExecutedStep, appCfg *config.Config, mwCfg *config.MultiAgentEinoMiddlewareConfig) string {
capped := capPlanExecuteExecutedStepsWithConfig(results, mwCfg)
return renderPlanExecuteStepsByBudget(capped, appCfg, mwCfg)
}
// planExecuteReplannerGenInput 与 Eino 默认 Replanner 输入一致,但 executed_steps 经 cap 后再写入 prompt
// 且在 orchInstruction 非空时 prepend SystemMessage 使 replanner 也能接收全局指令。
func planExecuteReplannerGenInput(orchInstruction string) planexecute.GenModelInputFn {
func planExecuteReplannerGenInput(
orchInstruction string,
appCfg *config.Config,
mwCfg *config.MultiAgentEinoMiddlewareConfig,
logger *zap.Logger,
modelName string,
conversationID string,
rewriteHandlers []adk.ChatModelAgentMiddleware,
) planexecute.GenModelInputFn {
oi := strings.TrimSpace(orchInstruction)
return func(ctx context.Context, in *planexecute.ExecutionContext) ([]adk.Message, error) {
planContent, err := in.Plan.MarshalJSON()
@@ -175,8 +213,8 @@ func planExecuteReplannerGenInput(orchInstruction string) planexecute.GenModelIn
}
msgs, err := planexecute.ReplannerPrompt.Format(ctx, map[string]any{
"plan": string(planContent),
"input": planExecuteFormatInput(in.UserInput),
"executed_steps": planExecuteFormatExecutedSteps(in.ExecutedSteps),
"input": planExecuteFormatInput(capPlanExecuteUserInputMessages(in.UserInput, appCfg, mwCfg)),
"executed_steps": planExecuteFormatExecutedSteps(in.ExecutedSteps, appCfg, mwCfg),
"plan_tool": planexecute.PlanToolInfo.Name,
"respond_tool": planexecute.RespondToolInfo.Name,
})
@@ -186,10 +224,120 @@ func planExecuteReplannerGenInput(orchInstruction string) planexecute.GenModelIn
if oi != "" {
msgs = append([]adk.Message{schema.SystemMessage(oi)}, msgs...)
}
if rewritten, rerr := applyBeforeModelRewriteHandlers(ctx, msgs, rewriteHandlers); rerr == nil && len(rewritten) > 0 {
msgs = rewritten
}
logPlanExecuteModelInputEstimate(logger, modelName, conversationID, "plan_execute_replanner", msgs)
return msgs, nil
}
}
func capPlanExecuteUserInputMessages(input []adk.Message, appCfg *config.Config, mwCfg *config.MultiAgentEinoMiddlewareConfig) []adk.Message {
if len(input) == 0 {
return input
}
maxTotal := 120000
modelName := "gpt-4o"
if appCfg != nil {
if appCfg.OpenAI.MaxTotalTokens > 0 {
maxTotal = appCfg.OpenAI.MaxTotalTokens
}
if m := strings.TrimSpace(appCfg.OpenAI.Model); m != "" {
modelName = m
}
}
// Reserve most tokens for planner/replanner prompt and tool schema.
ratio := 0.35
if mwCfg != nil {
ratio = mwCfg.PlanExecuteUserInputBudgetRatioEffective()
}
budget := int(float64(maxTotal) * ratio)
if budget < 4096 {
budget = 4096
}
tc := agent.NewTikTokenCounter()
out := make([]adk.Message, 0, len(input))
used := 0
for i := len(input) - 1; i >= 0; i-- {
msg := input[i]
if msg == nil {
continue
}
n, err := tc.Count(modelName, string(msg.Role)+"\n"+msg.Content)
if err != nil {
n = (len(msg.Content) + 3) / 4
}
if n <= 0 {
n = 1
}
if used+n > budget {
break
}
used += n
out = append(out, msg)
}
for i, j := 0, len(out)-1; i < j; i, j = i+1, j-1 {
out[i], out[j] = out[j], out[i]
}
if len(out) == 0 {
// Keep the latest user message at least.
return []adk.Message{input[len(input)-1]}
}
return out
}
func renderPlanExecuteStepsByBudget(steps []planexecute.ExecutedStep, appCfg *config.Config, mwCfg *config.MultiAgentEinoMiddlewareConfig) string {
if len(steps) == 0 {
return ""
}
maxTotal := 120000
modelName := "gpt-4o"
if appCfg != nil {
if appCfg.OpenAI.MaxTotalTokens > 0 {
maxTotal = appCfg.OpenAI.MaxTotalTokens
}
if m := strings.TrimSpace(appCfg.OpenAI.Model); m != "" {
modelName = m
}
}
ratio := 0.2
if mwCfg != nil {
ratio = mwCfg.PlanExecuteExecutedStepsBudgetRatioEffective()
}
budget := int(float64(maxTotal) * ratio)
if budget < 3072 {
budget = 3072
}
tc := agent.NewTikTokenCounter()
var kept []string
used := 0
skipped := 0
for i := len(steps) - 1; i >= 0; i-- {
block := fmt.Sprintf("Step: %s\nResult: %s\n\n", steps[i].Step, steps[i].Result)
n, err := tc.Count(modelName, block)
if err != nil {
n = (len(block) + 3) / 4
}
if n <= 0 {
n = 1
}
if used+n > budget {
skipped = i + 1
break
}
used += n
kept = append(kept, block)
}
var sb strings.Builder
if skipped > 0 {
sb.WriteString(fmt.Sprintf("Earlier executed steps omitted due to context budget: %d steps.\n\n", skipped))
}
for i := len(kept) - 1; i >= 0; i-- {
sb.WriteString(kept[i])
}
return sb.String()
}
// planExecuteStreamsMainAssistant 将规划/执行/重规划各阶段助手流式输出映射到主对话区。
func planExecuteStreamsMainAssistant(agent string) bool {
if agent == "" {
+5 -2
View File
@@ -125,7 +125,7 @@ func RunEinoSingleChatModelAgent(
return nil, fmt.Errorf("eino single 模型: %w", err)
}
mainSumMw, err := newEinoSummarizationMiddleware(ctx, mainModel, appCfg, logger)
mainSumMw, err := newEinoSummarizationMiddleware(ctx, mainModel, appCfg, &ma.EinoMiddleware, conversationID, logger)
if err != nil {
return nil, fmt.Errorf("eino single summarization: %w", err)
}
@@ -145,6 +145,9 @@ func RunEinoSingleChatModelAgent(
handlers = append(handlers, einoSkillMW)
}
handlers = append(handlers, mainSumMw)
if teleMw := newEinoModelInputTelemetryMiddleware(logger, appCfg.OpenAI.Model, conversationID, "eino_single"); teleMw != nil {
handlers = append(handlers, teleMw)
}
maxIter := ma.MaxIteration
if maxIter <= 0 {
@@ -188,7 +191,7 @@ func RunEinoSingleChatModelAgent(
return nil, fmt.Errorf("eino single NewChatModelAgent: %w", err)
}
baseMsgs := historyToMessages(history)
baseMsgs := historyToMessages(history, appCfg, &ma.EinoMiddleware)
baseMsgs = append(baseMsgs, schema.UserMessage(userMessage))
streamsMainAssistant := func(agent string) bool {
+31 -2
View File
@@ -3,6 +3,8 @@ package multiagent
import (
"context"
"fmt"
"os"
"path/filepath"
"strings"
"cyberstrike-ai/internal/agent"
@@ -32,6 +34,8 @@ func newEinoSummarizationMiddleware(
ctx context.Context,
summaryModel model.BaseChatModel,
appCfg *config.Config,
mwCfg *config.MultiAgentEinoMiddlewareConfig,
conversationID string,
logger *zap.Logger,
) (adk.ChatModelAgentMiddleware, error) {
if summaryModel == nil || appCfg == nil {
@@ -41,7 +45,14 @@ func newEinoSummarizationMiddleware(
if maxTotal <= 0 {
maxTotal = 120000
}
trigger := int(float64(maxTotal) * 0.9)
triggerRatio := 0.8
emitInternalEvents := true
if mwCfg != nil {
triggerRatio = mwCfg.SummarizationTriggerRatioEffective()
emitInternalEvents = mwCfg.SummarizationEmitInternalEventsEffective()
}
// Keep enough safety margin for tokenizer/model-side accounting mismatch.
trigger := int(float64(maxTotal) * triggerRatio)
if trigger < 4096 {
trigger = maxTotal
if trigger < 4096 {
@@ -65,6 +76,18 @@ func newEinoSummarizationMiddleware(
if recentTrailMax > trigger/2 {
recentTrailMax = trigger / 2
}
transcriptPath := ""
if conv := strings.TrimSpace(conversationID); conv != "" {
baseRoot := filepath.Join(os.TempDir(), "cyberstrike-summarization")
if dbPath := strings.TrimSpace(appCfg.Database.Path); dbPath != "" {
// Persist with the same lifecycle as local conversation storage.
baseRoot = filepath.Join(filepath.Dir(dbPath), "conversation_artifacts", sanitizeEinoPathSegment(conv), "summarization")
}
base := baseRoot
if mkErr := os.MkdirAll(base, 0o755); mkErr == nil {
transcriptPath = filepath.Join(base, "transcript.txt")
}
}
mw, err := summarization.New(ctx, &summarization.Config{
Model: summaryModel,
@@ -73,7 +96,8 @@ func newEinoSummarizationMiddleware(
},
TokenCounter: tokenCounter,
UserInstruction: einoSummarizeUserInstruction,
EmitInternalEvents: false,
EmitInternalEvents: emitInternalEvents,
TranscriptFilePath: transcriptPath,
PreserveUserMessages: &summarization.PreserveUserMessages{
Enabled: true,
MaxTokens: preserveMax,
@@ -85,11 +109,16 @@ func newEinoSummarizationMiddleware(
if logger == nil {
return nil
}
beforeTokens, _ := tokenCounter(ctx, &summarization.TokenCounterInput{Messages: before.Messages})
afterTokens, _ := tokenCounter(ctx, &summarization.TokenCounterInput{Messages: after.Messages})
logger.Info("eino summarization 已压缩上下文",
zap.Int("messages_before", len(before.Messages)),
zap.Int("messages_after", len(after.Messages)),
zap.Int("tokens_before_estimated", beforeTokens),
zap.Int("tokens_after_estimated", afterTokens),
zap.Int("max_total_tokens", maxTotal),
zap.Int("trigger_context_tokens", trigger),
zap.String("transcript_file", transcriptPath),
)
return nil
},
+1 -1
View File
@@ -71,7 +71,7 @@ func planExecuteDefaultGenExecutorInput(ctx context.Context, in *planexecute.Exe
return planexecute.ExecutorPrompt.Format(ctx, map[string]any{
"input": planExecuteFormatInput(in.UserInput),
"plan": string(planContent),
"executed_steps": planExecuteFormatExecutedSteps(in.ExecutedSteps),
"executed_steps": planExecuteFormatExecutedSteps(in.ExecutedSteps, nil, nil),
"step": in.Plan.FirstStep(),
})
}
+22 -7
View File
@@ -5,6 +5,8 @@ import (
"strings"
"unicode/utf8"
"cyberstrike-ai/internal/config"
"github.com/cloudwego/eino/adk/prebuilt/planexecute"
)
@@ -12,8 +14,11 @@ import (
// 此处仅约束「写入模型 prompt 的视图」,不修改 Eino session 中的原始 ExecutedSteps。
const (
planExecuteMaxStepResultRunes = 12000
planExecuteKeepLastSteps = 16
defaultPlanExecuteMaxStepResultRunes = 4000
defaultPlanExecuteKeepLastSteps = 8
// Backward-compatible aliases for tests and existing references.
planExecuteMaxStepResultRunes = defaultPlanExecuteMaxStepResultRunes
planExecuteKeepLastSteps = defaultPlanExecuteKeepLastSteps
)
func truncateRunesWithSuffix(s string, maxRunes int, suffix string) string {
@@ -29,16 +34,26 @@ func truncateRunesWithSuffix(s string, maxRunes int, suffix string) string {
// capPlanExecuteExecutedSteps 折叠较早步骤、截断单步过长结果,供 prompt 使用。
func capPlanExecuteExecutedSteps(steps []planexecute.ExecutedStep) []planexecute.ExecutedStep {
return capPlanExecuteExecutedStepsWithConfig(steps, nil)
}
func capPlanExecuteExecutedStepsWithConfig(steps []planexecute.ExecutedStep, mwCfg *config.MultiAgentEinoMiddlewareConfig) []planexecute.ExecutedStep {
if len(steps) == 0 {
return steps
}
maxStepResultRunes := defaultPlanExecuteMaxStepResultRunes
keepLastSteps := defaultPlanExecuteKeepLastSteps
if mwCfg != nil {
maxStepResultRunes = mwCfg.PlanExecuteMaxStepResultRunesEffective()
keepLastSteps = mwCfg.PlanExecuteKeepLastStepsEffective()
}
out := make([]planexecute.ExecutedStep, 0, len(steps)+1)
start := 0
if len(steps) > planExecuteKeepLastSteps {
start = len(steps) - planExecuteKeepLastSteps
if len(steps) > keepLastSteps {
start = len(steps) - keepLastSteps
var b strings.Builder
b.WriteString(fmt.Sprintf("(上文已完成 %d 步;此处仅保留步骤标题以节省上下文,完整输出已省略。后续 %d 步仍保留正文。)\n",
start, planExecuteKeepLastSteps))
start, keepLastSteps))
for i := 0; i < start; i++ {
b.WriteString(fmt.Sprintf("- %s\n", steps[i].Step))
}
@@ -50,8 +65,8 @@ func capPlanExecuteExecutedSteps(steps []planexecute.ExecutedStep) []planexecute
suffix := "\n…[step result truncated]"
for i := start; i < len(steps); i++ {
e := steps[i]
if utf8.RuneCountInString(e.Result) > planExecuteMaxStepResultRunes {
e.Result = truncateRunesWithSuffix(e.Result, planExecuteMaxStepResultRunes, suffix)
if utf8.RuneCountInString(e.Result) > maxStepResultRunes {
e.Result = truncateRunesWithSuffix(e.Result, maxStepResultRunes, suffix)
}
out = append(out, e)
}
+68 -9
View File
@@ -237,7 +237,7 @@ func RunDeepAgent(
subMax = subDefaultIter
}
subSumMw, err := newEinoSummarizationMiddleware(ctx, subModel, appCfg, logger)
subSumMw, err := newEinoSummarizationMiddleware(ctx, subModel, appCfg, &ma.EinoMiddleware, conversationID, logger)
if err != nil {
return nil, fmt.Errorf("子代理 %q summarization 中间件: %w", id, err)
}
@@ -257,6 +257,9 @@ func RunDeepAgent(
subHandlers = append(subHandlers, einoSkillMW)
}
subHandlers = append(subHandlers, subSumMw)
if teleMw := newEinoModelInputTelemetryMiddleware(logger, appCfg.OpenAI.Model, conversationID, "sub_agent"); teleMw != nil {
subHandlers = append(subHandlers, teleMw)
}
sa, err := adk.NewChatModelAgent(ctx, &adk.ChatModelAgentConfig{
Name: id,
@@ -289,7 +292,7 @@ func RunDeepAgent(
return nil, fmt.Errorf("多代理主模型: %w", err)
}
mainSumMw, err := newEinoSummarizationMiddleware(ctx, mainModel, appCfg, logger)
mainSumMw, err := newEinoSummarizationMiddleware(ctx, mainModel, appCfg, &ma.EinoMiddleware, conversationID, logger)
if err != nil {
return nil, fmt.Errorf("多代理主 summarization 中间件: %w", err)
}
@@ -352,6 +355,9 @@ func RunDeepAgent(
deepHandlers = append(deepHandlers, einoSkillMW)
}
deepHandlers = append(deepHandlers, mainSumMw)
if teleMw := newEinoModelInputTelemetryMiddleware(logger, appCfg.OpenAI.Model, conversationID, "deep_orchestrator"); teleMw != nil {
deepHandlers = append(deepHandlers, teleMw)
}
supHandlers := []adk.ChatModelAgentMiddleware{}
if len(mainOrchestratorPre) > 0 {
@@ -361,6 +367,9 @@ func RunDeepAgent(
supHandlers = append(supHandlers, einoSkillMW)
}
supHandlers = append(supHandlers, mainSumMw)
if teleMw := newEinoModelInputTelemetryMiddleware(logger, appCfg.OpenAI.Model, conversationID, "supervisor_orchestrator"); teleMw != nil {
supHandlers = append(supHandlers, teleMw)
}
mainToolsCfg := adk.ToolsConfig{
ToolsNodeConfig: compose.ToolsNodeConfig{
@@ -399,10 +408,17 @@ func RunDeepAgent(
ExecMaxIter: deepMaxIter,
LoopMaxIter: ma.PlanExecuteLoopMaxIterations,
AppCfg: appCfg,
MwCfg: &ma.EinoMiddleware,
ConversationID: conversationID,
Logger: logger,
ModelName: appCfg.OpenAI.Model,
ExecPreMiddlewares: mainOrchestratorPre,
SkillMiddleware: einoSkillMW,
FilesystemMiddleware: peFsMw,
PlannerReplannerRewriteHandlers: []adk.ChatModelAgentMiddleware{
mainSumMw,
newEinoModelInputTelemetryMiddleware(logger, appCfg.OpenAI.Model, conversationID, "plan_execute_planner_replanner_rewrite"),
},
})
if perr != nil {
return nil, perr
@@ -468,7 +484,7 @@ func RunDeepAgent(
da = dDeep
}
baseMsgs := historyToMessages(history)
baseMsgs := historyToMessages(history, appCfg, &ma.EinoMiddleware)
baseMsgs = append(baseMsgs, schema.UserMessage(userMessage))
streamsMainAssistant := func(agent string) bool {
@@ -505,34 +521,77 @@ func RunDeepAgent(
}, baseMsgs)
}
func historyToMessages(history []agent.ChatMessage) []adk.Message {
func historyToMessages(history []agent.ChatMessage, appCfg *config.Config, mwCfg *config.MultiAgentEinoMiddlewareConfig) []adk.Message {
if len(history) == 0 {
return nil
}
// 放宽条数上限:跨轮历史交给 Eino Summarization(阈值对齐 openai.max_total_tokens)在调用模型前压缩,避免在入队前硬截断为 40 条。
const maxHistoryMessages = 300
// Keep a bounded tail first; then enforce a token budget.
const maxHistoryMessages = 200
start := 0
if len(history) > maxHistoryMessages {
start = len(history) - maxHistoryMessages
}
out := make([]adk.Message, 0, len(history[start:]))
raw := make([]adk.Message, 0, len(history[start:]))
for _, h := range history[start:] {
switch h.Role {
case "user":
if strings.TrimSpace(h.Content) != "" {
out = append(out, schema.UserMessage(h.Content))
raw = append(raw, schema.UserMessage(h.Content))
}
case "assistant":
if strings.TrimSpace(h.Content) == "" && len(h.ToolCalls) > 0 {
continue
}
if strings.TrimSpace(h.Content) != "" {
out = append(out, schema.AssistantMessage(h.Content, nil))
raw = append(raw, schema.AssistantMessage(h.Content, nil))
}
default:
continue
}
}
if len(raw) == 0 {
return raw
}
maxTotal := 120000
modelName := "gpt-4o"
if appCfg != nil {
if appCfg.OpenAI.MaxTotalTokens > 0 {
maxTotal = appCfg.OpenAI.MaxTotalTokens
}
if m := strings.TrimSpace(appCfg.OpenAI.Model); m != "" {
modelName = m
}
}
ratio := 0.35
if mwCfg != nil {
ratio = mwCfg.HistoryInputBudgetRatioEffective()
}
budget := int(float64(maxTotal) * ratio)
if budget < 4096 {
budget = 4096
}
tc := agent.NewTikTokenCounter()
outRev := make([]adk.Message, 0, len(raw))
used := 0
for i := len(raw) - 1; i >= 0; i-- {
msg := raw[i]
n, err := tc.Count(modelName, string(msg.Role)+"\n"+msg.Content)
if err != nil {
n = (len(msg.Content) + 3) / 4
}
if n <= 0 {
n = 1
}
if used+n > budget {
break
}
used += n
outRev = append(outRev, msg)
}
out := make([]adk.Message, 0, len(outRev))
for i := len(outRev) - 1; i >= 0; i-- {
out = append(out, outRev[i])
}
return out
}