Add files via upload

This commit is contained in:
公明
2026-06-22 17:53:52 +08:00
committed by GitHub
parent ae98288b62
commit b2c8913014
2 changed files with 163 additions and 1 deletions
@@ -34,6 +34,15 @@ func einoExecuteTimeoutUserHint() string {
return "已超时终止 · Timed out"
}
// einoExecuteRecvErrIsToolTimeout 判断 Recv 错误是否由 agent.tool_timeout_minutes 触发。
// WithTimeout 到期后 local 侧常报 canceled / exit -1,但 execCtx.Err() 仍为 DeadlineExceeded。
func einoExecuteRecvErrIsToolTimeout(rerr error, tctx context.Context) bool {
if tctx != nil && errors.Is(tctx.Err(), context.DeadlineExceeded) {
return true
}
return errors.Is(rerr, context.DeadlineExceeded)
}
// einoStreamingShellWrap 包装 Eino filesystem 使用的 StreamingShellcloudwego eino-ext local.Local)。
// 官方 execute 工具默认走 ExecuteStreaming 且不设 RunInBackendGround;末尾带 & 时子进程仍与管道相连,
// streamStdout 按行读取会在无换行输出时长时间阻塞(与 MCP 工具 exec 的独立实现不同)。
@@ -83,6 +92,16 @@ func (w *einoStreamingShellWrap) ExecuteStreaming(ctx context.Context, input *fi
if execCancel != nil {
execCancel()
}
if einoExecuteRecvErrIsToolTimeout(err, execCtx) {
hint := "\n\n" + einoExecuteTimeoutUserHint() + "\n"
if w.recordMonitor != nil {
w.recordMonitor(tid, userCmd, hint, false, context.DeadlineExceeded)
}
if w.invokeNotify != nil && tid != "" {
w.invokeNotify.Fire(tid, "execute", agentTag, false, hint, context.DeadlineExceeded)
}
return schema.StreamReaderFromArray([]*filesystem.ExecuteResponse{{Output: hint}}), nil
}
if w.recordMonitor != nil {
w.recordMonitor(tid, userCmd, "", false, err)
}
@@ -91,7 +110,7 @@ func (w *einoStreamingShellWrap) ExecuteStreaming(ctx context.Context, input *fi
}
return nil, err
}
if sr == nil || w.invokeNotify == nil || tid == "" {
if sr == nil || w.invokeNotify == nil {
if execCancel != nil {
execCancel()
}
@@ -120,6 +139,11 @@ func (w *einoStreamingShellWrap) ExecuteStreaming(ctx context.Context, input *fi
if rerr != nil {
success = false
invokeErr = rerr
// 单次 execute 超时须与 MCP 工具一致:写入工具结果尾标、继续迭代,不得向 ADK 流注入硬错误。
if einoExecuteRecvErrIsToolTimeout(rerr, tctx) {
invokeErr = context.DeadlineExceeded
break
}
_ = outW.Send(nil, rerr)
break
}
@@ -0,0 +1,138 @@
package multiagent
import (
"context"
"errors"
"io"
"strings"
"testing"
"time"
"cyberstrike-ai/internal/einomcp"
"github.com/cloudwego/eino/adk/filesystem"
"github.com/cloudwego/eino/schema"
)
type mockStreamingShell struct {
immediateErr error
recvErr error
output string
}
func (m *mockStreamingShell) ExecuteStreaming(ctx context.Context, input *filesystem.ExecuteRequest) (*schema.StreamReader[*filesystem.ExecuteResponse], error) {
if m.immediateErr != nil {
return nil, m.immediateErr
}
outR, outW := schema.Pipe[*filesystem.ExecuteResponse](4)
go func() {
defer outW.Close()
if strings.TrimSpace(m.output) != "" {
_ = outW.Send(&filesystem.ExecuteResponse{Output: m.output}, nil)
}
if m.recvErr != nil {
_ = outW.Send(nil, m.recvErr)
}
}()
return outR, nil
}
func TestEinoExecuteRecvErrIsToolTimeout(t *testing.T) {
tctx, cancel := context.WithTimeout(context.Background(), time.Millisecond)
defer cancel()
time.Sleep(2 * time.Millisecond)
<-tctx.Done()
if !einoExecuteRecvErrIsToolTimeout(context.Canceled, tctx) {
t.Fatal("expected canceled recv with deadline exec ctx to count as tool timeout")
}
if !einoExecuteRecvErrIsToolTimeout(context.DeadlineExceeded, nil) {
t.Fatal("expected DeadlineExceeded recv without tctx")
}
if einoExecuteRecvErrIsToolTimeout(errors.New("exit status 1"), context.Background()) {
t.Fatal("unexpected timeout for generic error")
}
}
func TestEinoStreamingShellWrap_ToolTimeoutImmediateErrIsSoft(t *testing.T) {
inner := &mockStreamingShell{immediateErr: context.DeadlineExceeded}
wrap := &einoStreamingShellWrap{
inner: inner,
toolTimeoutMinutes: 60,
}
sr, err := wrap.ExecuteStreaming(context.Background(), &filesystem.ExecuteRequest{Command: "true"})
if err != nil {
t.Fatalf("immediate tool timeout must return soft stream, got err: %v", err)
}
defer sr.Close()
var got strings.Builder
for {
resp, rerr := sr.Recv()
if errors.Is(rerr, io.EOF) {
break
}
if rerr != nil {
t.Fatalf("outer stream must not hard-fail, got: %v", rerr)
}
if resp != nil && resp.Output != "" {
got.WriteString(resp.Output)
}
}
if !strings.Contains(got.String(), einoExecuteTimeoutUserHint()) {
t.Fatalf("expected timeout hint, got: %q", got.String())
}
}
func TestEinoStreamingShellWrap_ToolTimeoutRecvErrIsSoft(t *testing.T) {
inner := &mockStreamingShell{recvErr: context.DeadlineExceeded}
notify := einomcp.NewToolInvokeNotifyHolder()
wrap := &einoStreamingShellWrap{
inner: inner,
invokeNotify: notify,
toolTimeoutMinutes: 60,
}
// 生产路径由 Eino compose 注入 toolCallID;单测通过已过期 execCtx 识别 tool_timeout 软错误。
tctx, cancel := context.WithTimeout(context.Background(), time.Millisecond)
defer cancel()
time.Sleep(2 * time.Millisecond)
<-tctx.Done()
sr, err := wrap.ExecuteStreaming(tctx, &filesystem.ExecuteRequest{Command: "sleep 999"})
if err != nil {
t.Fatalf("ExecuteStreaming: %v", err)
}
defer sr.Close()
var got strings.Builder
for {
resp, rerr := sr.Recv()
if errors.Is(rerr, io.EOF) {
break
}
if rerr != nil {
t.Fatalf("outer stream must not hard-fail on tool timeout, got: %v", rerr)
}
if resp != nil && resp.Output != "" {
got.WriteString(resp.Output)
}
}
if !strings.Contains(got.String(), einoExecuteTimeoutUserHint()) {
t.Fatalf("expected timeout hint in stream, got: %q", got.String())
}
}
func TestEinoStreamingShellWrap_NonTimeoutRecvErrStillHard(t *testing.T) {
inner := &mockStreamingShell{recvErr: errors.New("broken pipe")}
wrap := &einoStreamingShellWrap{inner: inner}
sr, err := wrap.ExecuteStreaming(context.Background(), &filesystem.ExecuteRequest{Command: "true"})
if err != nil {
t.Fatalf("ExecuteStreaming: %v", err)
}
defer sr.Close()
_, rerr := sr.Recv()
if rerr == nil || errors.Is(rerr, io.EOF) {
t.Fatal("expected hard stream error for non-timeout failure")
}
}