diff --git a/internal/multiagent/eino_adk_run_loop.go b/internal/multiagent/eino_adk_run_loop.go index 4ec0e9c8..cadd2c30 100644 --- a/internal/multiagent/eino_adk_run_loop.go +++ b/internal/multiagent/eino_adk_run_loop.go @@ -15,8 +15,8 @@ import ( "cyberstrike-ai/internal/agent" "cyberstrike-ai/internal/config" - "cyberstrike-ai/internal/einoobserve" "cyberstrike-ai/internal/einomcp" + "cyberstrike-ai/internal/einoobserve" "cyberstrike-ai/internal/openai" "github.com/cloudwego/eino/adk" @@ -267,7 +267,16 @@ func runEinoADKAgentLoop(ctx context.Context, args *einoADKRunLoopArgs, baseMsgs isErr := !success || invokeErr != nil body := content if invokeErr != nil { - body = invokeErr.Error() + // 保留已流式累计的 stdout(如 execute 超时前的一半输出),避免 tool_result 只剩错误串、模型与 UI 丢失上下文 + tail := friendlyEinoExecuteInvokeTail(invokeErr) + // execute 流式包装可能已把超时句写入 content(供 ADK tool 与流式 delta);勿重复拼接 + if tail != "" && strings.Contains(content, tail) { + body = content + } else if strings.TrimSpace(content) != "" { + body = strings.TrimRight(content, "\n") + "\n\n" + tail + } else { + body = tail + } isErr = true } recordPendingExecuteStdoutDup(toolName, body, isErr) @@ -948,6 +957,17 @@ func einoPartialRunLastOutputHint() string { "[Run ended abnormally; continue from the trace above without repeating completed steps.]" } +// friendlyEinoExecuteInvokeTail 将 Eino execute 等非 MCP 路径的结尾错误转成简短提示;其它情况保留原 error 文本。 +func friendlyEinoExecuteInvokeTail(invokeErr error) string { + if invokeErr == nil { + return "" + } + if errors.Is(invokeErr, context.DeadlineExceeded) { + return einoExecuteTimeoutUserHint() + } + return "[执行未正常结束] " + invokeErr.Error() +} + func buildEinoRunResultFromAccumulated( orchMode string, runAccumulatedMsgs []adk.Message, diff --git a/internal/multiagent/eino_execute_streaming_wrap.go b/internal/multiagent/eino_execute_streaming_wrap.go index c20f7609..387245a5 100644 --- a/internal/multiagent/eino_execute_streaming_wrap.go +++ b/internal/multiagent/eino_execute_streaming_wrap.go @@ -6,6 +6,7 @@ import ( "fmt" "io" "strings" + "time" "cyberstrike-ai/internal/einomcp" "cyberstrike-ai/internal/security" @@ -15,6 +16,24 @@ import ( "github.com/cloudwego/eino/schema" ) +// prependPythonUnbufferedEnv 为 /bin/sh -c 注入 PYTHONUNBUFFERED=1。 +// eino-ext local 对流式 stdout 使用 bufio 按「行」推送;python3 写管道时默认块缓冲,print 长期留在用户态缓冲, +// 管道里收不到换行,表现为长时间无输出直至超时或退出。若命令里已出现 PYTHONUNBUFFERED 则不再覆盖。 +func prependPythonUnbufferedEnv(shellCommand string) string { + if strings.TrimSpace(shellCommand) == "" { + return shellCommand + } + if strings.Contains(strings.ToUpper(shellCommand), "PYTHONUNBUFFERED") { + return shellCommand + } + return "export PYTHONUNBUFFERED=1\n" + shellCommand +} + +// einoExecuteTimeoutUserHint 与写入 ADK 工具消息(模型可见)及 SSE tool_result 尾标一致。 +func einoExecuteTimeoutUserHint() string { + return "已超时终止 · Timed out" +} + // einoStreamingShellWrap 包装 Eino filesystem 使用的 StreamingShell(cloudwego eino-ext local.Local)。 // 官方 execute 工具默认走 ExecuteStreaming 且不设 RunInBackendGround;末尾带 & 时子进程仍与管道相连, // streamStdout 按行读取会在无换行输出时长时间阻塞(与 MCP 工具 exec 的独立实现不同)。 @@ -29,6 +48,10 @@ type einoStreamingShellWrap struct { inner filesystem.StreamingShell invokeNotify *einomcp.ToolInvokeNotifyHolder einoAgentName string + // outputChunk 可选;非 nil 时在收到内层 ExecuteResponse 片段时推送,与 MCP 工具的 tool_result_delta 一致(需有效 toolCallId)。 + outputChunk func(toolName, toolCallID, chunk string) + // toolTimeoutMinutes 与 agent.tool_timeout_minutes 对齐;>0 时对单次 execute 套用 context 超时(与 MCP 工具经 executeToolViaMCP 行为一致)。0 表示仅依赖上层 ctx(如整任务 10h 上限)。 + toolTimeoutMinutes int // recordMonitor 在 execute 流结束后写入 tool_executions 并 recorder(executionId),使「渗透测试详情」与常规 MCP 一致。 recordMonitor func(command, stdout string, success bool, invokeErr error) } @@ -41,17 +64,27 @@ func (w *einoStreamingShellWrap) ExecuteStreaming(ctx context.Context, input *fi return w.inner.ExecuteStreaming(ctx, nil) } req := *input - cmd := strings.TrimSpace(req.Command) + userCmd := strings.TrimSpace(req.Command) if security.IsBackgroundShellCommand(req.Command) && !req.RunInBackendGround { req.RunInBackendGround = true } + req.Command = prependPythonUnbufferedEnv(req.Command) tid := strings.TrimSpace(compose.GetToolCallID(ctx)) agentTag := strings.TrimSpace(w.einoAgentName) - sr, err := w.inner.ExecuteStreaming(ctx, &req) + execCtx := ctx + var execCancel context.CancelFunc + if w.toolTimeoutMinutes > 0 { + execCtx, execCancel = context.WithTimeout(ctx, time.Duration(w.toolTimeoutMinutes)*time.Minute) + } + + sr, err := w.inner.ExecuteStreaming(execCtx, &req) if err != nil { + if execCancel != nil { + execCancel() + } if w.recordMonitor != nil { - w.recordMonitor(cmd, "", false, err) + w.recordMonitor(userCmd, "", false, err) } if w.invokeNotify != nil && tid != "" { w.invokeNotify.Fire(tid, "execute", agentTag, false, "", err) @@ -59,13 +92,19 @@ func (w *einoStreamingShellWrap) ExecuteStreaming(ctx context.Context, input *fi return nil, err } if sr == nil || w.invokeNotify == nil || tid == "" { + if execCancel != nil { + execCancel() + } return sr, nil } outR, outW := schema.Pipe[*filesystem.ExecuteResponse](32) - go func(inner *schema.StreamReader[*filesystem.ExecuteResponse], command string) { + go func(inner *schema.StreamReader[*filesystem.ExecuteResponse], command string, cancel context.CancelFunc, tctx context.Context) { defer inner.Close() + if cancel != nil { + defer cancel() + } var sb strings.Builder const maxCapture = 16 * 1024 @@ -90,12 +129,18 @@ func (w *einoStreamingShellWrap) ExecuteStreaming(ctx context.Context, input *fi hasExitCode = true exitCode = *resp.ExitCode } + var appended string if remain := maxCapture - sb.Len(); remain > 0 { out := resp.Output if len(out) > remain { out = out[:remain] } sb.WriteString(out) + appended = out + } + // 仅推送写入 sb 的片段,与末尾 Fire/recordMonitor 的截断累计一致,避免最终 tool_result 短于已展示增量。 + if w.outputChunk != nil && strings.TrimSpace(appended) != "" { + w.outputChunk("execute", tid, appended) } if outW.Send(resp, nil) { success = false @@ -109,12 +154,33 @@ func (w *einoStreamingShellWrap) ExecuteStreaming(ctx context.Context, input *fi success = false invokeErr = fmt.Errorf("execute exited with code %d", exitCode) } + // WithTimeout 触发后,子进程常被信号结束,local 侧多报 exit -1 / canceled,错误链里不一定带 DeadlineExceeded。 + // 用执行所用 ctx 归一化,便于 UI 展示「超时」而非含糊的 -1。 + if tctx != nil && errors.Is(tctx.Err(), context.DeadlineExceeded) { + success = false + invokeErr = context.DeadlineExceeded + } + // ADK 从本 Pipe 拼出 tool 消息正文;仅 Notify 尾标不会进入模型上下文。超时句写入流,与 UI 一致。 + if invokeErr != nil && errors.Is(invokeErr, context.DeadlineExceeded) { + hint := "\n\n" + einoExecuteTimeoutUserHint() + "\n" + _ = outW.Send(&filesystem.ExecuteResponse{Output: hint}, nil) + if w.outputChunk != nil && tid != "" { + w.outputChunk("execute", tid, hint) + } + if remain := maxCapture - sb.Len(); remain > 0 { + h := hint + if len(h) > remain { + h = h[:remain] + } + sb.WriteString(h) + } + } if w.recordMonitor != nil { w.recordMonitor(command, sb.String(), success, invokeErr) } w.invokeNotify.Fire(tid, "execute", agentTag, success, sb.String(), invokeErr) outW.Close() - }(sr, cmd) + }(sr, userCmd, execCancel, execCtx) return outR, nil } diff --git a/internal/multiagent/eino_single_runner.go b/internal/multiagent/eino_single_runner.go index 388d6379..c5e66db1 100644 --- a/internal/multiagent/eino_single_runner.go +++ b/internal/multiagent/eino_single_runner.go @@ -143,7 +143,7 @@ func RunEinoSingleChatModelAgent( } if einoSkillMW != nil { if einoFSTools && einoLoc != nil { - fsMw, fsErr := subAgentFilesystemMiddleware(ctx, einoLoc, toolInvokeNotify, einoSingleAgentName, einoExecMonitor) + fsMw, fsErr := subAgentFilesystemMiddleware(ctx, einoLoc, toolInvokeNotify, einoSingleAgentName, einoExecMonitor, agentToolTimeoutMinutes(appCfg), toolOutputChunk) if fsErr != nil { return nil, fmt.Errorf("eino single filesystem 中间件: %w", fsErr) } diff --git a/internal/multiagent/eino_skills.go b/internal/multiagent/eino_skills.go index 5a0274ac..d20f8f40 100644 --- a/internal/multiagent/eino_skills.go +++ b/internal/multiagent/eino_skills.go @@ -82,6 +82,8 @@ func subAgentFilesystemMiddleware( invokeNotify *einomcp.ToolInvokeNotifyHolder, einoAgentName string, recordMonitor func(command, stdout string, success bool, invokeErr error), + toolTimeoutMinutes int, + outputChunk func(toolName, toolCallID, chunk string), ) (adk.ChatModelAgentMiddleware, error) { if loc == nil { return nil, nil @@ -89,10 +91,20 @@ func subAgentFilesystemMiddleware( return filesystem.New(ctx, &filesystem.MiddlewareConfig{ Backend: loc, StreamingShell: &einoStreamingShellWrap{ - inner: loc, - invokeNotify: invokeNotify, - einoAgentName: strings.TrimSpace(einoAgentName), - recordMonitor: recordMonitor, + inner: loc, + invokeNotify: invokeNotify, + einoAgentName: strings.TrimSpace(einoAgentName), + outputChunk: outputChunk, + recordMonitor: recordMonitor, + toolTimeoutMinutes: toolTimeoutMinutes, }, }) } + +// agentToolTimeoutMinutes 返回 agent.tool_timeout_minutes(与 executeToolViaMCP 一致);cfg 为 nil 时 0。 +func agentToolTimeoutMinutes(cfg *config.Config) int { + if cfg == nil { + return 0 + } + return cfg.Agent.ToolTimeoutMinutes +} diff --git a/internal/multiagent/runner.go b/internal/multiagent/runner.go index 65ea5c36..f9478262 100644 --- a/internal/multiagent/runner.go +++ b/internal/multiagent/runner.go @@ -244,7 +244,7 @@ func RunDeepAgent( } if einoSkillMW != nil { if einoFSTools && einoLoc != nil { - subFs, fsErr := subAgentFilesystemMiddleware(ctx, einoLoc, toolInvokeNotify, id, einoExecMonitor) + subFs, fsErr := subAgentFilesystemMiddleware(ctx, einoLoc, toolInvokeNotify, id, einoExecMonitor, agentToolTimeoutMinutes(appCfg), toolOutputChunk) if fsErr != nil { return nil, fmt.Errorf("子代理 %q filesystem 中间件: %w", id, fsErr) } @@ -376,10 +376,12 @@ func RunDeepAgent( if einoLoc != nil && einoFSTools { deepBackend = einoLoc deepShell = &einoStreamingShellWrap{ - inner: einoLoc, - invokeNotify: toolInvokeNotify, - einoAgentName: orchestratorName, - recordMonitor: einoExecMonitor, + inner: einoLoc, + invokeNotify: toolInvokeNotify, + einoAgentName: orchestratorName, + outputChunk: toolOutputChunk, + recordMonitor: einoExecMonitor, + toolTimeoutMinutes: agentToolTimeoutMinutes(appCfg), } } @@ -443,7 +445,7 @@ func RunDeepAgent( // 构建 filesystem 中间件(与 Deep sub-agent 一致) var peFsMw adk.ChatModelAgentMiddleware if einoSkillMW != nil && einoFSTools && einoLoc != nil { - peFsMw, err = subAgentFilesystemMiddleware(ctx, einoLoc, toolInvokeNotify, "executor", einoExecMonitor) + peFsMw, err = subAgentFilesystemMiddleware(ctx, einoLoc, toolInvokeNotify, "executor", einoExecMonitor, agentToolTimeoutMinutes(appCfg), toolOutputChunk) if err != nil { return nil, fmt.Errorf("plan_execute filesystem 中间件: %w", err) }