From 99ce183f41fead7aa6c1e3e5f3293f298df84c90 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E5=85=AC=E6=98=8E?= <83812544+Ed1s0nZ@users.noreply.github.com> Date: Sun, 10 May 2026 23:25:11 +0800 Subject: [PATCH] Add files via upload --- internal/multiagent/eino_adk_run_loop.go | 7 ++ .../eino_filesystem_tool_monitor.go | 101 ++++++++++++++++++ internal/multiagent/eino_single_runner.go | 32 +++--- internal/multiagent/hitl_middleware.go | 66 +++++++++--- internal/multiagent/runner.go | 34 +++--- 5 files changed, 197 insertions(+), 43 deletions(-) create mode 100644 internal/multiagent/eino_filesystem_tool_monitor.go diff --git a/internal/multiagent/eino_adk_run_loop.go b/internal/multiagent/eino_adk_run_loop.go index 778a3ccb..f84f537e 100644 --- a/internal/multiagent/eino_adk_run_loop.go +++ b/internal/multiagent/eino_adk_run_loop.go @@ -13,6 +13,7 @@ import ( "sync/atomic" "unicode/utf8" + "cyberstrike-ai/internal/agent" "cyberstrike-ai/internal/einomcp" "github.com/cloudwego/eino/adk" @@ -77,6 +78,11 @@ type einoADKRunLoopArgs struct { McpIDsMu *sync.Mutex McpIDs *[]string + // FilesystemMonitorAgent / FilesystemMonitorRecord 非 nil 时,将 Eino ADK filesystem 中间件工具(ls/read_file/write_file/edit_file/glob/grep) + // 在完成时写入 MCP 监控;execute 仍由 eino_execute_monitor 记录,此处跳过。 + FilesystemMonitorAgent *agent.Agent + FilesystemMonitorRecord einomcp.ExecutionRecorder + // ToolInvokeNotify 与 einomcp.ToolsFromDefinitions 共享:run loop 在迭代前 Set,MCP 桥 Fire 以补全 tool_result。 ToolInvokeNotify *einomcp.ToolInvokeNotifyHolder @@ -885,6 +891,7 @@ func runEinoADKAgentLoop(ctx context.Context, args *einoADKRunLoopArgs, baseMsgs data["toolCallId"] = toolCallID } recordPendingExecuteStdoutDup(toolName, content, isErr) + recordEinoADKFilesystemToolMonitor(args.FilesystemMonitorAgent, args.FilesystemMonitorRecord, toolName, toolCallID, runAccumulatedMsgs, content, isErr) progress("tool_result", fmt.Sprintf("工具结果 (%s)", toolName), data) } } diff --git a/internal/multiagent/eino_filesystem_tool_monitor.go b/internal/multiagent/eino_filesystem_tool_monitor.go new file mode 100644 index 00000000..5894538b --- /dev/null +++ b/internal/multiagent/eino_filesystem_tool_monitor.go @@ -0,0 +1,101 @@ +package multiagent + +import ( + "encoding/json" + "errors" + "strings" + + "cyberstrike-ai/internal/agent" + "cyberstrike-ai/internal/einomcp" + + "github.com/cloudwego/eino/adk" + "github.com/cloudwego/eino/schema" +) + +// einoADKFilesystemToolNames 与 cloudwego/eino/adk/middlewares/filesystem 默认 ToolName* 一致。 +// execute 已由 eino_execute_monitor 落库,此处不包含。 +var einoADKFilesystemToolNames = map[string]struct{}{ + "ls": {}, + "read_file": {}, + "write_file": {}, + "edit_file": {}, + "glob": {}, + "grep": {}, +} + +func isBuiltinEinoADKFilesystemToolName(name string) bool { + n := strings.ToLower(strings.TrimSpace(name)) + _, ok := einoADKFilesystemToolNames[n] + return ok +} + +func toolCallArgsFromAccumulated(msgs []adk.Message, toolCallID, expectToolName string) map[string]interface{} { + tid := strings.TrimSpace(toolCallID) + expect := strings.TrimSpace(expectToolName) + for i := len(msgs) - 1; i >= 0; i-- { + m := msgs[i] + if m == nil || m.Role != schema.Assistant || len(m.ToolCalls) == 0 { + continue + } + for j := len(m.ToolCalls) - 1; j >= 0; j-- { + tc := m.ToolCalls[j] + if tid != "" && strings.TrimSpace(tc.ID) != tid { + continue + } + fn := strings.TrimSpace(tc.Function.Name) + if expect != "" && !strings.EqualFold(fn, expect) { + continue + } + raw := strings.TrimSpace(tc.Function.Arguments) + if raw == "" { + return map[string]interface{}{} + } + var args map[string]interface{} + if err := json.Unmarshal([]byte(raw), &args); err != nil { + return map[string]interface{}{"arguments_raw": raw} + } + if args == nil { + return map[string]interface{}{} + } + return args + } + } + return map[string]interface{}{} +} + +// recordEinoADKFilesystemToolMonitor 将 Eino ADK filesystem 中间件工具结果写入 MCP 监控(与 execute / MCP 桥芯片一致)。 +func recordEinoADKFilesystemToolMonitor( + ag *agent.Agent, + rec einomcp.ExecutionRecorder, + toolName string, + toolCallID string, + msgs []adk.Message, + resultText string, + isErr bool, +) { + if ag == nil || rec == nil { + return + } + name := strings.TrimSpace(toolName) + if name == "" || strings.EqualFold(name, "execute") { + return + } + if !isBuiltinEinoADKFilesystemToolName(name) { + return + } + args := toolCallArgsFromAccumulated(msgs, toolCallID, name) + storedName := "eino_fs::" + strings.ToLower(name) + var invErr error + if isErr { + t := strings.TrimSpace(resultText) + if t == "" { + invErr = errors.New("tool error") + } else { + invErr = errors.New(t) + } + } + id := ag.RecordLocalToolExecution(storedName, args, resultText, invErr) + if id != "" { + rec(id) + } +} diff --git a/internal/multiagent/eino_single_runner.go b/internal/multiagent/eino_single_runner.go index 90d250bd..34af0af1 100644 --- a/internal/multiagent/eino_single_runner.go +++ b/internal/multiagent/eino_single_runner.go @@ -169,7 +169,7 @@ func RunEinoSingleChatModelAgent( Tools: mainToolsForCfg, UnknownToolsHandler: einomcp.UnknownToolReminderHandler(), ToolCallMiddlewares: []compose.ToolMiddleware{ - {Invokable: hitlToolCallMiddleware()}, + hitlToolCallMiddleware(), {Invokable: softRecoveryToolCallMiddleware()}, }, }, @@ -228,20 +228,22 @@ func RunEinoSingleChatModelAgent( } return runEinoADKAgentLoop(ctx, &einoADKRunLoopArgs{ - OrchMode: "eino_single", - OrchestratorName: einoSingleAgentName, - ConversationID: conversationID, - Progress: progress, - Logger: logger, - SnapshotMCPIDs: snapshotMCPIDs, - StreamsMainAssistant: streamsMainAssistant, - EinoRoleTag: einoRoleTag, - CheckpointDir: ma.EinoMiddleware.CheckpointDir, - McpIDsMu: &mcpIDsMu, - McpIDs: &mcpIDs, - ToolInvokeNotify: toolInvokeNotify, - DA: chatAgent, - ModelFacingTrace: modelFacingTrace, + OrchMode: "eino_single", + OrchestratorName: einoSingleAgentName, + ConversationID: conversationID, + Progress: progress, + Logger: logger, + SnapshotMCPIDs: snapshotMCPIDs, + StreamsMainAssistant: streamsMainAssistant, + EinoRoleTag: einoRoleTag, + CheckpointDir: ma.EinoMiddleware.CheckpointDir, + McpIDsMu: &mcpIDsMu, + McpIDs: &mcpIDs, + FilesystemMonitorAgent: ag, + FilesystemMonitorRecord: recorder, + ToolInvokeNotify: toolInvokeNotify, + DA: chatAgent, + ModelFacingTrace: modelFacingTrace, EmptyResponseMessage: "(Eino ADK single-agent session completed but no assistant text was captured. Check process details or logs.) " + "(Eino ADK 单代理会话已完成,但未捕获到助手文本输出。请查看过程详情或日志。)", }, baseMsgs) diff --git a/internal/multiagent/hitl_middleware.go b/internal/multiagent/hitl_middleware.go index 2167e1d8..4d4a02a9 100644 --- a/internal/multiagent/hitl_middleware.go +++ b/internal/multiagent/hitl_middleware.go @@ -8,6 +8,7 @@ import ( "github.com/cloudwego/eino/adk" "github.com/cloudwego/eino/compose" + "github.com/cloudwego/eino/schema" ) type hitlInterceptorKey struct{} @@ -41,7 +42,31 @@ func WithHITLToolInterceptor(ctx context.Context, fn HITLToolInterceptor) contex return context.WithValue(ctx, hitlInterceptorKey{}, fn) } -func hitlToolCallMiddleware() compose.InvokableToolMiddleware { +// hitlToolCallMiddleware 同时注册 Invokable 与 Streamable。 +// Eino filesystem 的 execute 为流式工具(StreamableTool),仅挂 Invokable 时人机协同不会拦截,会直接执行。 +func hitlToolCallMiddleware() compose.ToolMiddleware { + return compose.ToolMiddleware{ + Invokable: hitlInvokableToolCallMiddleware(), + Streamable: hitlStreamableToolCallMiddleware(), + } +} + +func hitlClearReturnDirectlyIfTransfer(ctx context.Context, toolName string) { + if !strings.EqualFold(strings.TrimSpace(toolName), adk.TransferToAgentToolName) { + return + } + _ = compose.ProcessState[*adk.State](ctx, func(_ context.Context, st *adk.State) error { + if st == nil { + return nil + } + st.ReturnDirectlyToolCallID = "" + st.HasReturnDirectly = false + st.ReturnDirectlyEvent = nil + return nil + }) +} + +func hitlInvokableToolCallMiddleware() compose.InvokableToolMiddleware { return func(next compose.InvokableToolEndpoint) compose.InvokableToolEndpoint { return func(ctx context.Context, input *compose.ToolInput) (*compose.ToolOutput, error) { if input != nil { @@ -55,17 +80,7 @@ func hitlToolCallMiddleware() compose.InvokableToolMiddleware { // transfer_to_agent 在 Eino 中标记为 returnDirectly:工具成功后 ReAct 子图会直接 END, // 并依赖真实工具内的 SendToolGenAction 触发移交。HITL 拒绝时不会执行真实工具, // 若仍走 returnDirectly 分支,监督者会在无 Transfer 动作的情况下结束,模型不再迭代。 - if strings.EqualFold(strings.TrimSpace(input.Name), adk.TransferToAgentToolName) { - _ = compose.ProcessState[*adk.State](ctx, func(_ context.Context, st *adk.State) error { - if st == nil { - return nil - } - st.ReturnDirectlyToolCallID = "" - st.HasReturnDirectly = false - st.ReturnDirectlyEvent = nil - return nil - }) - } + hitlClearReturnDirectlyIfTransfer(ctx, input.Name) return &compose.ToolOutput{Result: msg}, nil } return nil, err @@ -79,3 +94,30 @@ func hitlToolCallMiddleware() compose.InvokableToolMiddleware { } } } + +func hitlStreamableToolCallMiddleware() compose.StreamableToolMiddleware { + return func(next compose.StreamableToolEndpoint) compose.StreamableToolEndpoint { + return func(ctx context.Context, input *compose.ToolInput) (*compose.StreamToolOutput, error) { + if input != nil { + if fn, ok := ctx.Value(hitlInterceptorKey{}).(HITLToolInterceptor); ok && fn != nil { + edited, err := fn(ctx, input.Name, input.Arguments) + if err != nil { + if IsHumanRejectError(err) { + msg := fmt.Sprintf("[HITL Reject] Tool '%s' was rejected by human reviewer. Reason: %s\nPlease adjust parameters/plan and continue without this call.", + input.Name, strings.TrimSpace(err.Error())) + hitlClearReturnDirectlyIfTransfer(ctx, input.Name) + return &compose.StreamToolOutput{ + Result: schema.StreamReaderFromArray([]string{msg}), + }, nil + } + return nil, err + } + if edited != "" { + input.Arguments = edited + } + } + } + return next(ctx, input) + } + } +} diff --git a/internal/multiagent/runner.go b/internal/multiagent/runner.go index 8327cd24..8a0f0e25 100644 --- a/internal/multiagent/runner.go +++ b/internal/multiagent/runner.go @@ -285,7 +285,7 @@ func RunDeepAgent( Tools: subToolsForCfg, UnknownToolsHandler: einomcp.UnknownToolReminderHandler(), ToolCallMiddlewares: []compose.ToolMiddleware{ - {Invokable: hitlToolCallMiddleware()}, + hitlToolCallMiddleware(), {Invokable: softRecoveryToolCallMiddleware()}, }, }, @@ -434,7 +434,7 @@ func RunDeepAgent( Tools: mainToolsForCfg, UnknownToolsHandler: einomcp.UnknownToolReminderHandler(), ToolCallMiddlewares: []compose.ToolMiddleware{ - {Invokable: hitlToolCallMiddleware()}, + hitlToolCallMiddleware(), {Invokable: softRecoveryToolCallMiddleware()}, }, }, @@ -565,20 +565,22 @@ func RunDeepAgent( } return runEinoADKAgentLoop(ctx, &einoADKRunLoopArgs{ - OrchMode: orchMode, - OrchestratorName: orchestratorName, - ConversationID: conversationID, - Progress: progress, - Logger: logger, - SnapshotMCPIDs: snapshotMCPIDs, - StreamsMainAssistant: streamsMainAssistant, - EinoRoleTag: einoRoleTag, - CheckpointDir: ma.EinoMiddleware.CheckpointDir, - McpIDsMu: &mcpIDsMu, - McpIDs: &mcpIDs, - ToolInvokeNotify: toolInvokeNotify, - DA: da, - ModelFacingTrace: modelFacingTrace, + OrchMode: orchMode, + OrchestratorName: orchestratorName, + ConversationID: conversationID, + Progress: progress, + Logger: logger, + SnapshotMCPIDs: snapshotMCPIDs, + StreamsMainAssistant: streamsMainAssistant, + EinoRoleTag: einoRoleTag, + CheckpointDir: ma.EinoMiddleware.CheckpointDir, + McpIDsMu: &mcpIDsMu, + McpIDs: &mcpIDs, + FilesystemMonitorAgent: ag, + FilesystemMonitorRecord: recorder, + ToolInvokeNotify: toolInvokeNotify, + DA: da, + ModelFacingTrace: modelFacingTrace, EmptyResponseMessage: "(Eino multi-agent orchestration completed but no assistant text was captured. Check process details or logs.) " + "(Eino 多代理编排已完成,但未捕获到助手文本输出。请查看过程详情或日志。)", }, baseMsgs)