diff --git a/internal/multiagent/eino_adk_run_loop.go b/internal/multiagent/eino_adk_run_loop.go index 5306c02e..e3c90b51 100644 --- a/internal/multiagent/eino_adk_run_loop.go +++ b/internal/multiagent/eino_adk_run_loop.go @@ -84,6 +84,10 @@ type einoADKRunLoopArgs struct { // EmptyResponseMessage 当未捕获到助手正文时的占位(多代理与单代理文案不同)。 EmptyResponseMessage string + + // ModelFacingTrace 可选:由各 ChatModelAgent Handlers 链末尾中间件写入「即将送入模型」的消息快照; + // 非空时优先用于 LastAgentTraceInput 序列化,使续跑与 summarization/reduction 后的上下文一致。 + ModelFacingTrace *modelFacingTraceHolder } func runEinoADKAgentLoop(ctx context.Context, args *einoADKRunLoopArgs, baseMsgs []adk.Message) (*RunResult, error) { @@ -406,7 +410,8 @@ func runEinoADKAgentLoop(ctx context.Context, args *einoADKRunLoopArgs, baseMsgs } ids := snapshotMCPIDs() return buildEinoRunResultFromAccumulated( - orchMode, runAccumulatedMsgs, lastAssistant, lastPlanExecuteExecutor, emptyHint, ids, true, + orchMode, runAccumulatedMsgs, persistTraceSource(args, runAccumulatedMsgs), + lastAssistant, lastPlanExecuteExecutor, emptyHint, ids, true, ), runErr } @@ -886,11 +891,21 @@ func runEinoADKAgentLoop(ctx context.Context, args *einoADKRunLoopArgs, baseMsgs mcpIDsMu.Unlock() out := buildEinoRunResultFromAccumulated( - orchMode, runAccumulatedMsgs, lastAssistant, lastPlanExecuteExecutor, emptyHint, ids, false, + orchMode, runAccumulatedMsgs, persistTraceSource(args, runAccumulatedMsgs), + lastAssistant, lastPlanExecuteExecutor, emptyHint, ids, false, ) return out, nil } +func persistTraceSource(args *einoADKRunLoopArgs, fallback []adk.Message) []adk.Message { + if args != nil && args.ModelFacingTrace != nil { + if snap := args.ModelFacingTrace.Snapshot(); len(snap) > 0 { + return snap + } + } + return fallback +} + func einoPartialRunLastOutputHint() string { return "[执行未正常结束(用户停止、超时或异常)。续跑时请基于上文已产生的工具与结果继续,勿重复已完成步骤。]\n" + "[Run ended abnormally; continue from the trace above without repeating completed steps.]" @@ -899,13 +914,18 @@ func einoPartialRunLastOutputHint() string { func buildEinoRunResultFromAccumulated( orchMode string, runAccumulatedMsgs []adk.Message, + persistMsgs []adk.Message, lastAssistant string, lastPlanExecuteExecutor string, emptyHint string, mcpIDs []string, partial bool, ) *RunResult { - histJSON, _ := json.Marshal(runAccumulatedMsgs) + traceForJSON := persistMsgs + if len(traceForJSON) == 0 { + traceForJSON = runAccumulatedMsgs + } + histJSON, _ := json.Marshal(traceForJSON) cleaned := strings.TrimSpace(lastAssistant) if orchMode == "plan_execute" { if e := strings.TrimSpace(lastPlanExecuteExecutor); e != "" { diff --git a/internal/multiagent/eino_model_facing_trace.go b/internal/multiagent/eino_model_facing_trace.go new file mode 100644 index 00000000..e18f3307 --- /dev/null +++ b/internal/multiagent/eino_model_facing_trace.go @@ -0,0 +1,84 @@ +package multiagent + +import ( + "context" + "encoding/json" + "sync" + + "github.com/cloudwego/eino/adk" +) + +// modelFacingTraceHolder 保存「即将送入 ChatModel」的消息快照(已走 summarization / reduction / orphan 修剪等), +// 用于 last_react_input 落库,使续跑与「上下文压缩后」的模型视角一致,而非仅依赖事件流 append 的 runAccumulatedMsgs。 +type modelFacingTraceHolder struct { + mu sync.Mutex + // msgs 为深拷贝后的切片,避免框架后续原地修改污染快照 + msgs []adk.Message +} + +func newModelFacingTraceHolder() *modelFacingTraceHolder { + return &modelFacingTraceHolder{} +} + +// Snapshot 返回当前快照的再一次深拷贝(供序列化落库,避免与 holder 互斥长期持锁)。 +func (h *modelFacingTraceHolder) Snapshot() []adk.Message { + if h == nil { + return nil + } + h.mu.Lock() + defer h.mu.Unlock() + return cloneADKMessagesForTrace(h.msgs) +} + +func (h *modelFacingTraceHolder) storeFromState(state *adk.ChatModelAgentState) { + if h == nil || state == nil || len(state.Messages) == 0 { + return + } + cloned := cloneADKMessagesForTrace(state.Messages) + if len(cloned) == 0 { + return + } + h.mu.Lock() + h.msgs = cloned + h.mu.Unlock() +} + +func cloneADKMessagesForTrace(msgs []adk.Message) []adk.Message { + if len(msgs) == 0 { + return nil + } + b, err := json.Marshal(msgs) + if err != nil { + return nil + } + var out []adk.Message + if err := json.Unmarshal(b, &out); err != nil { + return nil + } + return out +} + +// modelFacingTraceMiddleware 必须在 Handlers 链中处于 **BeforeModel 最后**(telemetry 之后), +// 此时 state.Messages 即为本次 LLM 调用的最终入参。 +type modelFacingTraceMiddleware struct { + adk.BaseChatModelAgentMiddleware + holder *modelFacingTraceHolder +} + +func newModelFacingTraceMiddleware(holder *modelFacingTraceHolder) adk.ChatModelAgentMiddleware { + if holder == nil { + return nil + } + return &modelFacingTraceMiddleware{holder: holder} +} + +func (m *modelFacingTraceMiddleware) BeforeModelRewriteState( + ctx context.Context, + state *adk.ChatModelAgentState, + mc *adk.ModelContext, +) (context.Context, *adk.ChatModelAgentState, error) { + if m.holder != nil && state != nil { + m.holder.storeFromState(state) + } + return ctx, state, nil +} diff --git a/internal/multiagent/eino_orchestration.go b/internal/multiagent/eino_orchestration.go index dccd99d5..40df6c03 100644 --- a/internal/multiagent/eino_orchestration.go +++ b/internal/multiagent/eino_orchestration.go @@ -41,6 +41,8 @@ type PlanExecuteRootArgs struct { FilesystemMiddleware adk.ChatModelAgentMiddleware // PlannerReplannerRewriteHandlers applies BeforeModelRewriteState pipeline for planner/replanner input. PlannerReplannerRewriteHandlers []adk.ChatModelAgentMiddleware + // ModelFacingTrace 可选:由 Executor Handlers 链末尾写入,供 last_react 与 summarization 后上下文对齐。 + ModelFacingTrace *modelFacingTraceHolder } // NewPlanExecuteRoot 返回 plan → execute → replan 预置编排根节点(与 Deep / Supervisor 并列)。 @@ -101,6 +103,11 @@ func NewPlanExecuteRoot(ctx context.Context, a *PlanExecuteRootArgs) (adk.Resuma if teleMw := newEinoModelInputTelemetryMiddleware(a.Logger, a.ModelName, a.ConversationID, "plan_execute_executor"); teleMw != nil { execHandlers = append(execHandlers, teleMw) } + if a.ModelFacingTrace != nil { + if capMw := newModelFacingTraceMiddleware(a.ModelFacingTrace); capMw != nil { + execHandlers = append(execHandlers, capMw) + } + } executor, err := newPlanExecuteExecutor(ctx, &planexecute.ExecutorConfig{ Model: a.ExecModel, ToolsConfig: a.ToolsCfg, diff --git a/internal/multiagent/eino_single_runner.go b/internal/multiagent/eino_single_runner.go index a586fce3..90d250bd 100644 --- a/internal/multiagent/eino_single_runner.go +++ b/internal/multiagent/eino_single_runner.go @@ -132,7 +132,9 @@ func RunEinoSingleChatModelAgent( return nil, fmt.Errorf("eino single summarization: %w", err) } - handlers := make([]adk.ChatModelAgentMiddleware, 0, 4) + modelFacingTrace := newModelFacingTraceHolder() + + handlers := make([]adk.ChatModelAgentMiddleware, 0, 8) if len(mainOrchestratorPre) > 0 { handlers = append(handlers, mainOrchestratorPre...) } @@ -150,6 +152,9 @@ func RunEinoSingleChatModelAgent( if teleMw := newEinoModelInputTelemetryMiddleware(logger, appCfg.OpenAI.Model, conversationID, "eino_single"); teleMw != nil { handlers = append(handlers, teleMw) } + if capMw := newModelFacingTraceMiddleware(modelFacingTrace); capMw != nil { + handlers = append(handlers, capMw) + } maxIter := ma.MaxIteration if maxIter <= 0 { @@ -236,6 +241,7 @@ func RunEinoSingleChatModelAgent( McpIDs: &mcpIDs, ToolInvokeNotify: toolInvokeNotify, DA: chatAgent, + ModelFacingTrace: modelFacingTrace, EmptyResponseMessage: "(Eino ADK single-agent session completed but no assistant text was captured. Check process details or logs.) " + "(Eino ADK 单代理会话已完成,但未捕获到助手文本输出。请查看过程详情或日志。)", }, baseMsgs) diff --git a/internal/multiagent/runner.go b/internal/multiagent/runner.go index 13e49b73..8327cd24 100644 --- a/internal/multiagent/runner.go +++ b/internal/multiagent/runner.go @@ -311,6 +311,8 @@ func RunDeepAgent( return nil, fmt.Errorf("多代理主 summarization 中间件: %w", err) } + modelFacingTrace := newModelFacingTraceHolder() + // 与 deep.Config.Name / supervisor 主代理 Name 一致。 orchestratorName := "cyberstrike-deep" orchDescription := "Coordinates specialist agents and MCP tools for authorized security testing." @@ -407,6 +409,9 @@ func RunDeepAgent( if teleMw := newEinoModelInputTelemetryMiddleware(logger, appCfg.OpenAI.Model, conversationID, "deep_orchestrator"); teleMw != nil { deepHandlers = append(deepHandlers, teleMw) } + if capMw := newModelFacingTraceMiddleware(modelFacingTrace); capMw != nil { + deepHandlers = append(deepHandlers, capMw) + } supHandlers := []adk.ChatModelAgentMiddleware{} if len(mainOrchestratorPre) > 0 { @@ -420,6 +425,9 @@ func RunDeepAgent( if teleMw := newEinoModelInputTelemetryMiddleware(logger, appCfg.OpenAI.Model, conversationID, "supervisor_orchestrator"); teleMw != nil { supHandlers = append(supHandlers, teleMw) } + if capMw := newModelFacingTraceMiddleware(modelFacingTrace); capMw != nil { + supHandlers = append(supHandlers, capMw) + } mainToolsCfg := adk.ToolsConfig{ ToolsNodeConfig: compose.ToolsNodeConfig{ @@ -465,6 +473,7 @@ func RunDeepAgent( ExecPreMiddlewares: mainOrchestratorPre, SkillMiddleware: einoSkillMW, FilesystemMiddleware: peFsMw, + ModelFacingTrace: modelFacingTrace, PlannerReplannerRewriteHandlers: []adk.ChatModelAgentMiddleware{ mainSumMw, // 孤儿 tool 消息兜底:必须挂在 summarization 之后、telemetry 之前。 @@ -569,6 +578,7 @@ func RunDeepAgent( McpIDs: &mcpIDs, ToolInvokeNotify: toolInvokeNotify, DA: da, + ModelFacingTrace: modelFacingTrace, EmptyResponseMessage: "(Eino multi-agent orchestration completed but no assistant text was captured. Check process details or logs.) " + "(Eino 多代理编排已完成,但未捕获到助手文本输出。请查看过程详情或日志。)", }, baseMsgs)