From 7eb2fd50f3cbf7f44274c32e96090c4c32dffd30 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E5=85=AC=E6=98=8E?= <83812544+Ed1s0nZ@users.noreply.github.com> Date: Wed, 24 Jun 2026 17:19:29 +0800 Subject: [PATCH] Add files via upload --- internal/app/app.go | 3 + internal/multiagent/eino_adk_run_loop.go | 16 +- internal/multiagent/eino_execute_monitor.go | 29 +++- .../multiagent/eino_execute_streaming_wrap.go | 164 +++++++++++++----- .../eino_execute_streaming_wrap_test.go | 135 ++++++++++++++ .../eino_filesystem_tool_monitor.go | 41 ++++- internal/multiagent/eino_single_runner.go | 4 +- internal/multiagent/eino_skills.go | 33 +++- internal/multiagent/runner.go | 20 ++- 9 files changed, 367 insertions(+), 78 deletions(-) diff --git a/internal/app/app.go b/internal/app/app.go index 53f9e81d..c508a5a4 100644 --- a/internal/app/app.go +++ b/internal/app/app.go @@ -110,6 +110,7 @@ func New(cfg *config.Config, log *logger.Logger, configPath string) (*App, error // 创建安全工具执行器 executor := security.NewExecutor(&cfg.Security, mcpServer, log.Logger) + executor.SetShellNoOutputTimeoutSeconds(cfg.Agent.ShellNoOutputTimeoutSeconds) // 注册工具 executor.RegisterTools(mcpServer) @@ -333,6 +334,8 @@ func New(cfg *config.Config, log *logger.Logger, configPath string) (*App, error monitorHandler.SetAudit(auditSvc) monitorHandler.SetMonitorRetention(monitorRetention) monitorHandler.SetExternalMCPManager(externalMCPMgr) // 设置外部MCP管理器,以便获取外部MCP执行记录 + monitorHandler.SetTaskManager(agentHandler.TaskManager()) + monitorHandler.SetAgentHandler(agentHandler) notificationHandler := handler.NewNotificationHandler(db, agentHandler, log.Logger) groupHandler := handler.NewGroupHandler(db, log.Logger) authHandler := handler.NewAuthHandler(authManager, cfg, configPath, log.Logger) diff --git a/internal/multiagent/eino_adk_run_loop.go b/internal/multiagent/eino_adk_run_loop.go index 4f2c9482..cb820d4f 100644 --- a/internal/multiagent/eino_adk_run_loop.go +++ b/internal/multiagent/eino_adk_run_loop.go @@ -196,6 +196,16 @@ func runEinoADKAgentLoop(ctx context.Context, args *einoADKRunLoopArgs, baseMsgs pendingByID[tc.ToolCallID] = tc pendingQueueByAgent[tc.EinoAgent] = append(pendingQueueByAgent[tc.EinoAgent], tc.ToolCallID) } + markPendingWithMonitor := func(tc toolCallPendingInfo) { + markPending(tc) + beginEinoADKFilesystemToolMonitor( + args.FilesystemMonitorAgent, + args.FilesystemMonitorRecord, + args.MCPExecutionBinder, + tc.ToolCallID, + tc.ToolName, + ) + } popNextPendingForAgent := func(agentName string) (toolCallPendingInfo, bool) { pendingMu.Lock() defer pendingMu.Unlock() @@ -331,7 +341,7 @@ func runEinoADKAgentLoop(ctx context.Context, args *einoADKRunLoopArgs, baseMsgs toolCallID = tid } recordPendingExecuteStdoutDup(toolName, content, isErr) - recordEinoADKFilesystemToolMonitor(args.FilesystemMonitorAgent, args.FilesystemMonitorRecord, toolName, toolCallID, runAccumulatedMsgs, content, isErr) + recordEinoADKFilesystemToolMonitor(args.FilesystemMonitorAgent, args.FilesystemMonitorRecord, args.MCPExecutionBinder, toolName, toolCallID, runAccumulatedMsgs, content, isErr) if args.FilesystemMonitorAgent != nil && args.MCPExecutionBinder != nil { if execID := args.MCPExecutionBinder.ExecutionID(toolCallID); execID != "" { args.FilesystemMonitorAgent.UpdateMCPExecutionDisplayResult(execID, content) @@ -974,7 +984,7 @@ func runEinoADKAgentLoop(ctx context.Context, args *einoADKRunLoopArgs, baseMsgs if merged := mergeStreamingToolCallFragments(toolStreamFragments); len(merged) > 0 { lastToolChunk = mergeMessageToolCalls(&schema.Message{ToolCalls: merged}) } - tryEmitToolCallsOnce(lastToolChunk, ev.AgentName, orchestratorName, conversationID, orchMode, progress, toolEmitSeen, subAgentToolStep, mainAgentToolStep, markPending) + tryEmitToolCallsOnce(lastToolChunk, ev.AgentName, orchestratorName, conversationID, orchMode, progress, toolEmitSeen, subAgentToolStep, mainAgentToolStep, markPendingWithMonitor) // 流式路径此前只把 tool_calls 推给进度 UI,未写入 runAccumulatedMsgs;落库后 loadHistory→RepairOrphan 会删掉全部 tool 结果,表现为「续跑/下轮失忆」。 if lastToolChunk != nil && len(lastToolChunk.ToolCalls) > 0 { runAccumulatedMsgs = append(runAccumulatedMsgs, schema.AssistantMessage("", lastToolChunk.ToolCalls)) @@ -1009,7 +1019,7 @@ func runEinoADKAgentLoop(ctx context.Context, args *einoADKRunLoopArgs, baseMsgs continue } runAccumulatedMsgs = append(runAccumulatedMsgs, msg) - tryEmitToolCallsOnce(mergeMessageToolCalls(msg), ev.AgentName, orchestratorName, conversationID, orchMode, progress, toolEmitSeen, subAgentToolStep, mainAgentToolStep, markPending) + tryEmitToolCallsOnce(mergeMessageToolCalls(msg), ev.AgentName, orchestratorName, conversationID, orchMode, progress, toolEmitSeen, subAgentToolStep, mainAgentToolStep, markPendingWithMonitor) if mv.Role == schema.Assistant { if progress != nil && strings.TrimSpace(msg.ReasoningContent) != "" { diff --git a/internal/multiagent/eino_execute_monitor.go b/internal/multiagent/eino_execute_monitor.go index 1f11b544..ecc919ee 100644 --- a/internal/multiagent/eino_execute_monitor.go +++ b/internal/multiagent/eino_execute_monitor.go @@ -7,11 +7,25 @@ import ( "cyberstrike-ai/internal/einomcp" ) -// newEinoExecuteMonitorCallback 在 Eino filesystem execute 结束时写入 MCP 监控库并 recorder(executionId), -// 与 CallTool 路径一致,供助手消息展示「渗透测试详情」芯片。 -func newEinoExecuteMonitorCallback(ag *agent.Agent, recorder einomcp.ExecutionRecorder) func(toolCallID, command, stdout string, success bool, invokeErr error) { - return func(toolCallID, command, stdout string, success bool, invokeErr error) { - if ag == nil || recorder == nil { +// newEinoExecuteMonitorCallbacks 在 Eino filesystem execute 开始/结束时写入 MCP 监控库并 recorder(executionId), +// 与 CallTool 路径一致,使监控页能展示「执行中」状态。 +func newEinoExecuteMonitorCallbacks(ag *agent.Agent, recorder einomcp.ExecutionRecorder) ( + begin func(toolCallID, command string) string, + finish func(executionID, toolCallID, command, stdout string, success bool, invokeErr error), +) { + begin = func(toolCallID, command string) string { + if ag == nil { + return "" + } + args := map[string]interface{}{"command": command} + id := ag.BeginLocalToolExecution("execute", args) + if id != "" && recorder != nil { + recorder(id, toolCallID) + } + return id + } + finish = func(executionID, toolCallID, command, stdout string, success bool, invokeErr error) { + if ag == nil { return } var err error @@ -23,9 +37,10 @@ func newEinoExecuteMonitorCallback(ag *agent.Agent, recorder einomcp.ExecutionRe } } args := map[string]interface{}{"command": command} - id := ag.RecordLocalToolExecution("execute", args, stdout, err) - if id != "" { + id := ag.FinishLocalToolExecution(executionID, "execute", args, stdout, err) + if id != "" && recorder != nil && executionID == "" { recorder(id, toolCallID) } } + return begin, finish } diff --git a/internal/multiagent/eino_execute_streaming_wrap.go b/internal/multiagent/eino_execute_streaming_wrap.go index 1af004e3..2d2e8dae 100644 --- a/internal/multiagent/eino_execute_streaming_wrap.go +++ b/internal/multiagent/eino_execute_streaming_wrap.go @@ -63,8 +63,11 @@ type einoStreamingShellWrap struct { outputChunk func(toolName, toolCallID, chunk string) // toolTimeoutMinutes 与 agent.tool_timeout_minutes 对齐;>0 时对单次 execute 套用 context 超时(与 MCP 工具经 executeToolViaMCP 行为一致)。0 表示仅依赖上层 ctx(如整任务 10h 上限)。 toolTimeoutMinutes int - // recordMonitor 在 execute 流结束后写入 tool_executions 并 recorder(executionId),使「渗透测试详情」与常规 MCP 一致。 - recordMonitor func(toolCallID, command, stdout string, success bool, invokeErr error) + // shellNoOutputTimeoutSec:无任何输出时的空闲秒数;0=关闭。 + shellNoOutputTimeoutSec int + // beginMonitor 在 execute 开始时写入 running 状态;finishMonitor 在流结束后更新为 completed/failed。 + beginMonitor func(toolCallID, command string) string + finishMonitor func(executionID, toolCallID, command, stdout string, success bool, invokeErr error) } func (w *einoStreamingShellWrap) ExecuteStreaming(ctx context.Context, input *filesystem.ExecuteRequest) (*schema.StreamReader[*filesystem.ExecuteResponse], error) { @@ -76,15 +79,26 @@ func (w *einoStreamingShellWrap) ExecuteStreaming(ctx context.Context, input *fi } req := *input userCmd := strings.TrimSpace(req.Command) + tid := strings.TrimSpace(compose.GetToolCallID(ctx)) + agentTag := strings.TrimSpace(w.einoAgentName) if security.IsBackgroundShellCommand(req.Command) && !req.RunInBackendGround { req.RunInBackendGround = true } - req.Command = prependPythonUnbufferedEnv(req.Command) - tid := strings.TrimSpace(compose.GetToolCallID(ctx)) - agentTag := strings.TrimSpace(w.einoAgentName) + req.Command = security.PrepareNonInteractiveShellCommand(prependPythonUnbufferedEnv(req.Command)) convID := mcp.MCPConversationIDFromContext(ctx) execReg := mcp.EinoExecuteRunRegistryFromContext(ctx) + var monitorExecID string + if w.beginMonitor != nil { + monitorExecID = w.beginMonitor(tid, userCmd) + } + if monitorExecID != "" && convID != "" { + if toolReg := mcp.ToolRunRegistryFromContext(ctx); toolReg != nil { + toolReg.RegisterRunningTool(convID, monitorExecID) + } + } + toolRunReg := mcp.ToolRunRegistryFromContext(ctx) + execCtx, execCancel := context.WithCancel(ctx) var timeoutCancel context.CancelFunc if w.toolTimeoutMinutes > 0 { @@ -104,23 +118,23 @@ func (w *einoStreamingShellWrap) ExecuteStreaming(ctx context.Context, input *fi } if einoExecuteRecvErrIsToolTimeout(err, execCtx) { hint := "\n\n" + einoExecuteTimeoutUserHint() + "\n" - if w.recordMonitor != nil { - w.recordMonitor(tid, userCmd, hint, false, context.DeadlineExceeded) + if w.finishMonitor != nil { + w.finishMonitor(monitorExecID, tid, userCmd, hint, false, context.DeadlineExceeded) } if w.invokeNotify != nil && tid != "" { w.invokeNotify.Fire(tid, "execute", agentTag, false, hint, context.DeadlineExceeded) } return schema.StreamReaderFromArray([]*filesystem.ExecuteResponse{{Output: hint}}), nil } - if w.recordMonitor != nil { - w.recordMonitor(tid, userCmd, "", false, err) + if w.finishMonitor != nil { + w.finishMonitor(monitorExecID, tid, userCmd, "", false, err) } if w.invokeNotify != nil && tid != "" { w.invokeNotify.Fire(tid, "execute", agentTag, false, "", err) } return nil, err } - if sr == nil || w.invokeNotify == nil { + if sr == nil { if timeoutCancel != nil { timeoutCancel() } @@ -132,7 +146,7 @@ func (w *einoStreamingShellWrap) ExecuteStreaming(ctx context.Context, input *fi outR, outW := schema.Pipe[*filesystem.ExecuteResponse](32) - go func(inner *schema.StreamReader[*filesystem.ExecuteResponse], command string, cancel context.CancelFunc, timeoutCleanup context.CancelFunc, tctx context.Context, conversationID string, reg mcp.EinoExecuteRunRegistry) { + go func(inner *schema.StreamReader[*filesystem.ExecuteResponse], command string, cancel context.CancelFunc, timeoutCleanup context.CancelFunc, tctx context.Context, conversationID string, reg mcp.EinoExecuteRunRegistry, toolReg mcp.ToolRunRegistry, execID string, toolCallID string, noOutputSec int) { var innerCloseOnce sync.Once closeInner := func() { innerCloseOnce.Do(func() { inner.Close() }) @@ -147,6 +161,9 @@ func (w *einoStreamingShellWrap) ExecuteStreaming(ctx context.Context, input *fi if reg != nil && conversationID != "" { defer reg.UnregisterActiveEinoExecute(conversationID) } + if toolReg != nil && conversationID != "" && execID != "" { + defer toolReg.UnregisterRunningTool(conversationID, execID) + } // ctx 取消时关闭内层流,避免 amass 等长时间无换行输出时 Recv 永久阻塞。 stopWatch := make(chan struct{}) @@ -165,43 +182,92 @@ func (w *einoStreamingShellWrap) ExecuteStreaming(ctx context.Context, input *fi exitCode := 0 hasExitCode := false + idleWatch := security.NewShellInactivityWatch(noOutputSec) + if idleWatch != nil { + defer idleWatch.Stop() + } + + type execRecvMsg struct { + resp *filesystem.ExecuteResponse + err error + } + recvCh := make(chan execRecvMsg, 1) + go func() { + for { + resp, rerr := inner.Recv() + recvCh <- execRecvMsg{resp: resp, err: rerr} + if rerr != nil { + return + } + } + }() + + fireInactivityTimeout := func() { + success = false + invokeErr = fmt.Errorf("shell inactivity timeout (%ds)", idleWatch.Sec) + msg := security.ShellNoOutputTimeoutMessage(idleWatch.Sec) + _ = outW.Send(&filesystem.ExecuteResponse{Output: msg}, nil) + sb.WriteString(msg) + if w.outputChunk != nil && toolCallID != "" { + w.outputChunk("execute", toolCallID, msg) + } + if cancel != nil { + cancel() + } + closeInner() + } + + recvLoop: for { - resp, rerr := inner.Recv() - if errors.Is(rerr, io.EOF) { - break + var idleCh <-chan struct{} + if idleWatch != nil { + idleCh = idleWatch.Expired } - if rerr != nil { - success = false - invokeErr = rerr - // 单次 execute 超时须与 MCP 工具一致:写入工具结果尾标、继续迭代,不得向 ADK 流注入硬错误。 - if einoExecuteRecvErrIsToolTimeout(rerr, tctx) { - invokeErr = context.DeadlineExceeded - break + select { + case <-idleCh: + fireInactivityTimeout() + break recvLoop + case msg := <-recvCh: + rerr := msg.err + resp := msg.resp + if errors.Is(rerr, io.EOF) { + break recvLoop } - if errors.Is(rerr, context.Canceled) || (tctx != nil && errors.Is(tctx.Err(), context.Canceled)) { - invokeErr = context.Canceled - break - } - _ = outW.Send(nil, rerr) - break - } - if resp != nil { - if resp.ExitCode != nil { - hasExitCode = true - exitCode = *resp.ExitCode - } - var appended string - if resp.Output != "" { - sb.WriteString(resp.Output) - appended = resp.Output - } - if w.outputChunk != nil && strings.TrimSpace(appended) != "" { - w.outputChunk("execute", tid, appended) - } - if outW.Send(resp, nil) { + if rerr != nil { success = false - invokeErr = fmt.Errorf("execute stream closed by consumer") - break + invokeErr = rerr + if einoExecuteRecvErrIsToolTimeout(rerr, tctx) { + invokeErr = context.DeadlineExceeded + break recvLoop + } + if errors.Is(rerr, context.Canceled) || (tctx != nil && errors.Is(tctx.Err(), context.Canceled)) { + invokeErr = context.Canceled + break recvLoop + } + _ = outW.Send(nil, rerr) + break recvLoop + } + if resp != nil { + if resp.ExitCode != nil { + hasExitCode = true + exitCode = *resp.ExitCode + } + var appended string + if resp.Output != "" { + if idleWatch != nil { + idleWatch.Bump() + } + sb.WriteString(resp.Output) + appended = resp.Output + } + if w.outputChunk != nil && strings.TrimSpace(appended) != "" { + w.outputChunk("execute", toolCallID, appended) + } + if outW.Send(resp, nil) { + success = false + invokeErr = fmt.Errorf("execute stream closed by consumer") + break recvLoop + } } } } @@ -248,12 +314,14 @@ func (w *einoStreamingShellWrap) ExecuteStreaming(ctx context.Context, input *fi _ = outW.Send(&filesystem.ExecuteResponse{Output: text + "\n"}, nil) } } - if w.recordMonitor != nil { - w.recordMonitor(tid, command, sb.String(), success, invokeErr) + if w.finishMonitor != nil { + w.finishMonitor(execID, toolCallID, command, sb.String(), success, invokeErr) + } + if w.invokeNotify != nil { + w.invokeNotify.Fire(toolCallID, "execute", agentTag, success, sb.String(), invokeErr) } - w.invokeNotify.Fire(tid, "execute", agentTag, success, sb.String(), invokeErr) outW.Close() - }(sr, userCmd, execCancel, timeoutCancel, execCtx, convID, execReg) + }(sr, userCmd, execCancel, timeoutCancel, execCtx, convID, execReg, toolRunReg, monitorExecID, tid, w.shellNoOutputTimeoutSec) return outR, nil } diff --git a/internal/multiagent/eino_execute_streaming_wrap_test.go b/internal/multiagent/eino_execute_streaming_wrap_test.go index 5e8d0751..068eb9bb 100644 --- a/internal/multiagent/eino_execute_streaming_wrap_test.go +++ b/internal/multiagent/eino_execute_streaming_wrap_test.go @@ -19,9 +19,15 @@ type mockStreamingShell struct { immediateErr error recvErr error output string + called bool + lastCommand string } func (m *mockStreamingShell) ExecuteStreaming(ctx context.Context, input *filesystem.ExecuteRequest) (*schema.StreamReader[*filesystem.ExecuteResponse], error) { + m.called = true + if input != nil { + m.lastCommand = input.Command + } if m.immediateErr != nil { return nil, m.immediateErr } @@ -38,6 +44,135 @@ func (m *mockStreamingShell) ExecuteStreaming(ctx context.Context, input *filesy return outR, nil } +func TestEinoStreamingShellWrap_PreparesNonInteractiveCommand(t *testing.T) { + inner := &mockStreamingShell{output: "ok\n"} + wrap := &einoStreamingShellWrap{inner: inner} + sr, err := wrap.ExecuteStreaming(context.Background(), &filesystem.ExecuteRequest{Command: "echo ok"}) + if err != nil { + t.Fatalf("ExecuteStreaming: %v", err) + } + defer sr.Close() + for { + _, rerr := sr.Recv() + if errors.Is(rerr, io.EOF) { + break + } + if rerr != nil { + t.Fatalf("recv: %v", rerr) + } + } + if !strings.Contains(inner.lastCommand, "exec 5*time.Second { + t.Fatalf("expected inactivity timeout ~1s, took %v", time.Since(start)) + } + if !strings.Contains(got.String(), "没有新的输出") && !strings.Contains(got.String(), "no new output") { + t.Fatalf("expected inactivity message, got: %q", got.String()) + } +} + +type mockStreamingShellHanging struct { + called bool +} + +func (m *mockStreamingShellHanging) ExecuteStreaming(ctx context.Context, input *filesystem.ExecuteRequest) (*schema.StreamReader[*filesystem.ExecuteResponse], error) { + m.called = true + outR, outW := schema.Pipe[*filesystem.ExecuteResponse](4) + go func() { + <-ctx.Done() + outW.Close() + }() + return outR, nil +} + func TestEinoExecuteRecvErrIsToolTimeout(t *testing.T) { tctx, cancel := context.WithTimeout(context.Background(), time.Millisecond) defer cancel() diff --git a/internal/multiagent/eino_filesystem_tool_monitor.go b/internal/multiagent/eino_filesystem_tool_monitor.go index 9f3efb02..3cbeddd6 100644 --- a/internal/multiagent/eino_filesystem_tool_monitor.go +++ b/internal/multiagent/eino_filesystem_tool_monitor.go @@ -63,10 +63,43 @@ func toolCallArgsFromAccumulated(msgs []adk.Message, toolCallID, expectToolName return map[string]interface{}{} } +// beginEinoADKFilesystemToolMonitor 在 Eino ADK filesystem 工具开始调用时写入 running 状态。 +func beginEinoADKFilesystemToolMonitor( + ag *agent.Agent, + rec einomcp.ExecutionRecorder, + binder *MCPExecutionBinder, + toolCallID, toolName string, +) { + if ag == nil || rec == nil { + return + } + name := strings.TrimSpace(toolName) + if name == "" || strings.EqualFold(name, "execute") { + return + } + if !isBuiltinEinoADKFilesystemToolName(name) { + return + } + tid := strings.TrimSpace(toolCallID) + if tid == "" { + return + } + storedName := "eino_fs::" + strings.ToLower(name) + id := ag.BeginLocalToolExecution(storedName, map[string]interface{}{}) + if id == "" { + return + } + rec(id, tid) + if binder != nil { + binder.Bind(tid, id) + } +} + // recordEinoADKFilesystemToolMonitor 将 Eino ADK filesystem 中间件工具结果写入 MCP 监控(与 execute / MCP 桥芯片一致)。 func recordEinoADKFilesystemToolMonitor( ag *agent.Agent, rec einomcp.ExecutionRecorder, + binder *MCPExecutionBinder, toolName string, toolCallID string, msgs []adk.Message, @@ -94,8 +127,12 @@ func recordEinoADKFilesystemToolMonitor( invErr = errors.New(t) } } - id := ag.RecordLocalToolExecution(storedName, args, resultText, invErr) - if id != "" { + execID := "" + if binder != nil { + execID = binder.ExecutionID(toolCallID) + } + id := ag.FinishLocalToolExecution(execID, storedName, args, resultText, invErr) + if id != "" && execID == "" { rec(id, toolCallID) } } diff --git a/internal/multiagent/eino_single_runner.go b/internal/multiagent/eino_single_runner.go index d92e0c3c..6f1005fb 100644 --- a/internal/multiagent/eino_single_runner.go +++ b/internal/multiagent/eino_single_runner.go @@ -81,7 +81,7 @@ func RunEinoSingleChatModelAgent( } toolInvokeNotify := einomcp.NewToolInvokeNotifyHolder() - einoExecMonitor := newEinoExecuteMonitorCallback(ag, recorder) + einoExecBegin, einoExecFinish := newEinoExecuteMonitorCallbacks(ag, recorder) mainDefs := ag.ToolsForRole(roleTools) mainTools, err := einomcp.ToolsFromDefinitions(ag, holder, mainDefs, recorder, nil, toolInvokeNotify, einoSingleAgentName) if err != nil { @@ -136,7 +136,7 @@ func RunEinoSingleChatModelAgent( } if einoSkillMW != nil { if einoFSTools && einoLoc != nil { - fsMw, fsErr := subAgentFilesystemMiddleware(ctx, einoLoc, toolInvokeNotify, einoSingleAgentName, einoExecMonitor, agentToolTimeoutMinutes(appCfg), nil) + fsMw, fsErr := subAgentFilesystemMiddleware(ctx, einoLoc, toolInvokeNotify, einoSingleAgentName, einoExecBegin, einoExecFinish, agentToolTimeoutMinutes(appCfg), agentShellNoOutputTimeoutSeconds(appCfg), nil) if fsErr != nil { return nil, fmt.Errorf("eino single filesystem 中间件: %w", fsErr) } diff --git a/internal/multiagent/eino_skills.go b/internal/multiagent/eino_skills.go index e5e17726..d629d30c 100644 --- a/internal/multiagent/eino_skills.go +++ b/internal/multiagent/eino_skills.go @@ -81,8 +81,10 @@ func subAgentFilesystemMiddleware( loc *localbk.Local, invokeNotify *einomcp.ToolInvokeNotifyHolder, einoAgentName string, - recordMonitor func(toolCallID, command, stdout string, success bool, invokeErr error), + beginMonitor func(toolCallID, command string) string, + finishMonitor func(executionID, toolCallID, command, stdout string, success bool, invokeErr error), toolTimeoutMinutes int, + shellNoOutputTimeoutSec int, outputChunk func(toolName, toolCallID, chunk string), ) (adk.ChatModelAgentMiddleware, error) { if loc == nil { @@ -91,12 +93,14 @@ func subAgentFilesystemMiddleware( return filesystem.New(ctx, &filesystem.MiddlewareConfig{ Backend: loc, StreamingShell: &einoStreamingShellWrap{ - inner: loc, - invokeNotify: invokeNotify, - einoAgentName: strings.TrimSpace(einoAgentName), - outputChunk: outputChunk, - recordMonitor: recordMonitor, - toolTimeoutMinutes: toolTimeoutMinutes, + inner: loc, + invokeNotify: invokeNotify, + einoAgentName: strings.TrimSpace(einoAgentName), + outputChunk: outputChunk, + beginMonitor: beginMonitor, + finishMonitor: finishMonitor, + toolTimeoutMinutes: toolTimeoutMinutes, + shellNoOutputTimeoutSec: shellNoOutputTimeoutSec, }, }) } @@ -108,3 +112,18 @@ func agentToolTimeoutMinutes(cfg *config.Config) int { } return cfg.Agent.ToolTimeoutMinutes } + +// agentShellNoOutputTimeoutSeconds:0=默认 300s(5 分钟);-1=关闭;>0=自定义秒数。 +func agentShellNoOutputTimeoutSeconds(cfg *config.Config) int { + if cfg == nil { + return 300 + } + v := cfg.Agent.ShellNoOutputTimeoutSeconds + if v < 0 { + return 0 + } + if v == 0 { + return 300 + } + return v +} diff --git a/internal/multiagent/runner.go b/internal/multiagent/runner.go index 1fd02cf8..ed568443 100644 --- a/internal/multiagent/runner.go +++ b/internal/multiagent/runner.go @@ -120,7 +120,7 @@ func RunDeepAgent( mcpIDs = append(mcpIDs, id) mcpIDsMu.Unlock() } - einoExecMonitor := newEinoExecuteMonitorCallback(ag, recorder) + einoExecBegin, einoExecFinish := newEinoExecuteMonitorCallbacks(ag, recorder) // 与单代理流式一致:在 response_start / response_delta 的 data 中带当前 mcpExecutionIds,供主聊天绑定复制与展示。 snapshotMCPIDs := func() []string { @@ -223,7 +223,7 @@ func RunDeepAgent( } if einoSkillMW != nil { if einoFSTools && einoLoc != nil { - subFs, fsErr := subAgentFilesystemMiddleware(ctx, einoLoc, toolInvokeNotify, id, einoExecMonitor, agentToolTimeoutMinutes(appCfg), nil) + subFs, fsErr := subAgentFilesystemMiddleware(ctx, einoLoc, toolInvokeNotify, id, einoExecBegin, einoExecFinish, agentToolTimeoutMinutes(appCfg), agentShellNoOutputTimeoutSeconds(appCfg), nil) if fsErr != nil { return nil, fmt.Errorf("子代理 %q filesystem 中间件: %w", id, fsErr) } @@ -358,12 +358,14 @@ func RunDeepAgent( if einoLoc != nil && einoFSTools { deepBackend = einoLoc deepShell = &einoStreamingShellWrap{ - inner: einoLoc, - invokeNotify: toolInvokeNotify, - einoAgentName: orchestratorName, - outputChunk: nil, - recordMonitor: einoExecMonitor, - toolTimeoutMinutes: agentToolTimeoutMinutes(appCfg), + inner: einoLoc, + invokeNotify: toolInvokeNotify, + einoAgentName: orchestratorName, + outputChunk: nil, + beginMonitor: einoExecBegin, + finishMonitor: einoExecFinish, + toolTimeoutMinutes: agentToolTimeoutMinutes(appCfg), + shellNoOutputTimeoutSec: agentShellNoOutputTimeoutSeconds(appCfg), } } @@ -428,7 +430,7 @@ func RunDeepAgent( // 构建 filesystem 中间件(与 Deep sub-agent 一致) var peFsMw adk.ChatModelAgentMiddleware if einoSkillMW != nil && einoFSTools && einoLoc != nil { - peFsMw, err = subAgentFilesystemMiddleware(ctx, einoLoc, toolInvokeNotify, "executor", einoExecMonitor, agentToolTimeoutMinutes(appCfg), nil) + peFsMw, err = subAgentFilesystemMiddleware(ctx, einoLoc, toolInvokeNotify, "executor", einoExecBegin, einoExecFinish, agentToolTimeoutMinutes(appCfg), agentShellNoOutputTimeoutSeconds(appCfg), nil) if err != nil { return nil, fmt.Errorf("plan_execute filesystem 中间件: %w", err) }