diff --git a/internal/multiagent/eino_execute_streaming_wrap.go b/internal/multiagent/eino_execute_streaming_wrap.go index 2dfb0a18..a69586f7 100644 --- a/internal/multiagent/eino_execute_streaming_wrap.go +++ b/internal/multiagent/eino_execute_streaming_wrap.go @@ -34,6 +34,15 @@ func einoExecuteTimeoutUserHint() string { return "已超时终止 · Timed out" } +// einoExecuteRecvErrIsToolTimeout 判断 Recv 错误是否由 agent.tool_timeout_minutes 触发。 +// WithTimeout 到期后 local 侧常报 canceled / exit -1,但 execCtx.Err() 仍为 DeadlineExceeded。 +func einoExecuteRecvErrIsToolTimeout(rerr error, tctx context.Context) bool { + if tctx != nil && errors.Is(tctx.Err(), context.DeadlineExceeded) { + return true + } + return errors.Is(rerr, context.DeadlineExceeded) +} + // einoStreamingShellWrap 包装 Eino filesystem 使用的 StreamingShell(cloudwego eino-ext local.Local)。 // 官方 execute 工具默认走 ExecuteStreaming 且不设 RunInBackendGround;末尾带 & 时子进程仍与管道相连, // streamStdout 按行读取会在无换行输出时长时间阻塞(与 MCP 工具 exec 的独立实现不同)。 @@ -83,6 +92,16 @@ func (w *einoStreamingShellWrap) ExecuteStreaming(ctx context.Context, input *fi if execCancel != nil { execCancel() } + if einoExecuteRecvErrIsToolTimeout(err, execCtx) { + hint := "\n\n" + einoExecuteTimeoutUserHint() + "\n" + if w.recordMonitor != nil { + w.recordMonitor(tid, userCmd, hint, false, context.DeadlineExceeded) + } + if w.invokeNotify != nil && tid != "" { + w.invokeNotify.Fire(tid, "execute", agentTag, false, hint, context.DeadlineExceeded) + } + return schema.StreamReaderFromArray([]*filesystem.ExecuteResponse{{Output: hint}}), nil + } if w.recordMonitor != nil { w.recordMonitor(tid, userCmd, "", false, err) } @@ -91,7 +110,7 @@ func (w *einoStreamingShellWrap) ExecuteStreaming(ctx context.Context, input *fi } return nil, err } - if sr == nil || w.invokeNotify == nil || tid == "" { + if sr == nil || w.invokeNotify == nil { if execCancel != nil { execCancel() } @@ -120,6 +139,11 @@ func (w *einoStreamingShellWrap) ExecuteStreaming(ctx context.Context, input *fi if rerr != nil { success = false invokeErr = rerr + // 单次 execute 超时须与 MCP 工具一致:写入工具结果尾标、继续迭代,不得向 ADK 流注入硬错误。 + if einoExecuteRecvErrIsToolTimeout(rerr, tctx) { + invokeErr = context.DeadlineExceeded + break + } _ = outW.Send(nil, rerr) break } diff --git a/internal/multiagent/eino_execute_streaming_wrap_test.go b/internal/multiagent/eino_execute_streaming_wrap_test.go new file mode 100644 index 00000000..3cadcfa5 --- /dev/null +++ b/internal/multiagent/eino_execute_streaming_wrap_test.go @@ -0,0 +1,138 @@ +package multiagent + +import ( + "context" + "errors" + "io" + "strings" + "testing" + "time" + + "cyberstrike-ai/internal/einomcp" + + "github.com/cloudwego/eino/adk/filesystem" + "github.com/cloudwego/eino/schema" +) + +type mockStreamingShell struct { + immediateErr error + recvErr error + output string +} + +func (m *mockStreamingShell) ExecuteStreaming(ctx context.Context, input *filesystem.ExecuteRequest) (*schema.StreamReader[*filesystem.ExecuteResponse], error) { + if m.immediateErr != nil { + return nil, m.immediateErr + } + outR, outW := schema.Pipe[*filesystem.ExecuteResponse](4) + go func() { + defer outW.Close() + if strings.TrimSpace(m.output) != "" { + _ = outW.Send(&filesystem.ExecuteResponse{Output: m.output}, nil) + } + if m.recvErr != nil { + _ = outW.Send(nil, m.recvErr) + } + }() + return outR, nil +} + +func TestEinoExecuteRecvErrIsToolTimeout(t *testing.T) { + tctx, cancel := context.WithTimeout(context.Background(), time.Millisecond) + defer cancel() + time.Sleep(2 * time.Millisecond) + <-tctx.Done() + + if !einoExecuteRecvErrIsToolTimeout(context.Canceled, tctx) { + t.Fatal("expected canceled recv with deadline exec ctx to count as tool timeout") + } + if !einoExecuteRecvErrIsToolTimeout(context.DeadlineExceeded, nil) { + t.Fatal("expected DeadlineExceeded recv without tctx") + } + if einoExecuteRecvErrIsToolTimeout(errors.New("exit status 1"), context.Background()) { + t.Fatal("unexpected timeout for generic error") + } +} + +func TestEinoStreamingShellWrap_ToolTimeoutImmediateErrIsSoft(t *testing.T) { + inner := &mockStreamingShell{immediateErr: context.DeadlineExceeded} + wrap := &einoStreamingShellWrap{ + inner: inner, + toolTimeoutMinutes: 60, + } + sr, err := wrap.ExecuteStreaming(context.Background(), &filesystem.ExecuteRequest{Command: "true"}) + if err != nil { + t.Fatalf("immediate tool timeout must return soft stream, got err: %v", err) + } + defer sr.Close() + + var got strings.Builder + for { + resp, rerr := sr.Recv() + if errors.Is(rerr, io.EOF) { + break + } + if rerr != nil { + t.Fatalf("outer stream must not hard-fail, got: %v", rerr) + } + if resp != nil && resp.Output != "" { + got.WriteString(resp.Output) + } + } + if !strings.Contains(got.String(), einoExecuteTimeoutUserHint()) { + t.Fatalf("expected timeout hint, got: %q", got.String()) + } +} + +func TestEinoStreamingShellWrap_ToolTimeoutRecvErrIsSoft(t *testing.T) { + inner := &mockStreamingShell{recvErr: context.DeadlineExceeded} + notify := einomcp.NewToolInvokeNotifyHolder() + wrap := &einoStreamingShellWrap{ + inner: inner, + invokeNotify: notify, + toolTimeoutMinutes: 60, + } + // 生产路径由 Eino compose 注入 toolCallID;单测通过已过期 execCtx 识别 tool_timeout 软错误。 + tctx, cancel := context.WithTimeout(context.Background(), time.Millisecond) + defer cancel() + time.Sleep(2 * time.Millisecond) + <-tctx.Done() + + sr, err := wrap.ExecuteStreaming(tctx, &filesystem.ExecuteRequest{Command: "sleep 999"}) + if err != nil { + t.Fatalf("ExecuteStreaming: %v", err) + } + defer sr.Close() + + var got strings.Builder + for { + resp, rerr := sr.Recv() + if errors.Is(rerr, io.EOF) { + break + } + if rerr != nil { + t.Fatalf("outer stream must not hard-fail on tool timeout, got: %v", rerr) + } + if resp != nil && resp.Output != "" { + got.WriteString(resp.Output) + } + } + if !strings.Contains(got.String(), einoExecuteTimeoutUserHint()) { + t.Fatalf("expected timeout hint in stream, got: %q", got.String()) + } +} + +func TestEinoStreamingShellWrap_NonTimeoutRecvErrStillHard(t *testing.T) { + inner := &mockStreamingShell{recvErr: errors.New("broken pipe")} + wrap := &einoStreamingShellWrap{inner: inner} + sr, err := wrap.ExecuteStreaming(context.Background(), &filesystem.ExecuteRequest{Command: "true"}) + if err != nil { + t.Fatalf("ExecuteStreaming: %v", err) + } + defer sr.Close() + + _, rerr := sr.Recv() + if rerr == nil || errors.Is(rerr, io.EOF) { + t.Fatal("expected hard stream error for non-timeout failure") + } +}