diff --git a/internal/multiagent/eino_execute_streaming_wrap.go b/internal/multiagent/eino_execute_streaming_wrap.go index 392739b5..c20f7609 100644 --- a/internal/multiagent/eino_execute_streaming_wrap.go +++ b/internal/multiagent/eino_execute_streaming_wrap.go @@ -22,6 +22,9 @@ import ( // // 使用 Pipe 将内层流转发给调用方:在 inner EOF 后、关闭 Pipe 前同步调用 ToolInvokeNotify.Fire, // 保证 run loop 在模型开始下一轮输出前已记录 execute 结果(用于 UI 与「重复助手复述」去重)。 +// +// 若 inner 在校验阶段直接返回 error(未建立 reader),不会进入下方 goroutine,也必须 Fire; +// 否则 pending tool_call 要等整轮 run 结束才被 force-close,与已展示的助手/工具软错误文案不同步。 type einoStreamingShellWrap struct { inner filesystem.StreamingShell invokeNotify *einomcp.ToolInvokeNotifyHolder @@ -42,17 +45,24 @@ func (w *einoStreamingShellWrap) ExecuteStreaming(ctx context.Context, input *fi if security.IsBackgroundShellCommand(req.Command) && !req.RunInBackendGround { req.RunInBackendGround = true } + tid := strings.TrimSpace(compose.GetToolCallID(ctx)) + agentTag := strings.TrimSpace(w.einoAgentName) + sr, err := w.inner.ExecuteStreaming(ctx, &req) if err != nil { + if w.recordMonitor != nil { + w.recordMonitor(cmd, "", false, err) + } + if w.invokeNotify != nil && tid != "" { + w.invokeNotify.Fire(tid, "execute", agentTag, false, "", err) + } return nil, err } - tid := strings.TrimSpace(compose.GetToolCallID(ctx)) if sr == nil || w.invokeNotify == nil || tid == "" { return sr, nil } outR, outW := schema.Pipe[*filesystem.ExecuteResponse](32) - agentTag := strings.TrimSpace(w.einoAgentName) go func(inner *schema.StreamReader[*filesystem.ExecuteResponse], command string) { defer inner.Close() diff --git a/internal/multiagent/eino_single_runner.go b/internal/multiagent/eino_single_runner.go index 1d5267df..5f7cf4bd 100644 --- a/internal/multiagent/eino_single_runner.go +++ b/internal/multiagent/eino_single_runner.go @@ -173,7 +173,7 @@ func RunEinoSingleChatModelAgent( UnknownToolsHandler: einomcp.UnknownToolReminderHandler(), ToolCallMiddlewares: []compose.ToolMiddleware{ hitlToolCallMiddleware(), - {Invokable: softRecoveryToolCallMiddleware()}, + softRecoveryToolMiddleware(), }, }, EmitInternalEvents: true, diff --git a/internal/multiagent/runner.go b/internal/multiagent/runner.go index 34b2a40c..df37c7d5 100644 --- a/internal/multiagent/runner.go +++ b/internal/multiagent/runner.go @@ -290,7 +290,7 @@ func RunDeepAgent( UnknownToolsHandler: einomcp.UnknownToolReminderHandler(), ToolCallMiddlewares: []compose.ToolMiddleware{ hitlToolCallMiddleware(), - {Invokable: softRecoveryToolCallMiddleware()}, + softRecoveryToolMiddleware(), }, }, EmitInternalEvents: true, @@ -439,7 +439,7 @@ func RunDeepAgent( UnknownToolsHandler: einomcp.UnknownToolReminderHandler(), ToolCallMiddlewares: []compose.ToolMiddleware{ hitlToolCallMiddleware(), - {Invokable: softRecoveryToolCallMiddleware()}, + softRecoveryToolMiddleware(), }, }, EmitInternalEvents: true, diff --git a/internal/multiagent/tool_error_middleware.go b/internal/multiagent/tool_error_middleware.go index 15e523a9..899faeb7 100644 --- a/internal/multiagent/tool_error_middleware.go +++ b/internal/multiagent/tool_error_middleware.go @@ -8,6 +8,7 @@ import ( "strings" "github.com/cloudwego/eino/compose" + "github.com/cloudwego/eino/schema" ) // softRecoveryToolCallMiddleware returns an InvokableToolMiddleware that catches @@ -16,8 +17,9 @@ import ( // returned to the LLM. This allows the model to self-correct within the same // iteration rather than crashing the entire graph and requiring a full replay. // -// Without this middleware, a JSON parse failure in any tool's InvokableRun propagates -// as a hard error through the Eino ToolsNode → [NodeRunError] → ev.Err, which +// Without Invokable (+ Streamable where applicable) registration, a JSON parse failure +// in InvokableRun / StreamableRun propagates as a hard error through the Eino ToolsNode +// → [NodeRunError] → ev.Err, which // either triggers the full-replay retry loop (expensive) or terminates the run // entirely once retries are exhausted. With it, the LLM simply sees an error message // in the tool result and can adjust its next tool call accordingly. @@ -39,6 +41,44 @@ func softRecoveryToolCallMiddleware() compose.InvokableToolMiddleware { } } +// softRecoveryStreamableToolCallMiddleware mirrors softRecoveryToolCallMiddleware for +// tools that implement StreamableTool only (e.g. Eino ADK filesystem execute). +// Eino applies Invokable vs Streamable middleware to disjoint code paths in ToolsNode; +// registering only Invokable leaves streaming tools uncovered — empty/malformed JSON +// then fails inside [LocalStreamFunc] before the inner endpoint runs. +func softRecoveryStreamableToolCallMiddleware() compose.StreamableToolMiddleware { + return func(next compose.StreamableToolEndpoint) compose.StreamableToolEndpoint { + return func(ctx context.Context, input *compose.ToolInput) (*compose.StreamToolOutput, error) { + out, err := next(ctx, input) + if err == nil { + return out, nil + } + if !isSoftRecoverableToolError(err) { + return out, err + } + toolName := "" + args := "" + if input != nil { + toolName = input.Name + args = input.Arguments + } + msg := buildSoftRecoveryMessage(toolName, args, err) + return &compose.StreamToolOutput{ + Result: schema.StreamReaderFromArray([]string{msg}), + }, nil + } + } +} + +// softRecoveryToolMiddleware returns a ToolMiddleware with both Invokable and Streamable +// soft recovery (same semantics as hitlToolCallMiddleware bundling). +func softRecoveryToolMiddleware() compose.ToolMiddleware { + return compose.ToolMiddleware{ + Invokable: softRecoveryToolCallMiddleware(), + Streamable: softRecoveryStreamableToolCallMiddleware(), + } +} + // isSoftRecoverableToolError determines whether a tool execution error should be // silently converted to a tool-result message rather than crashing the graph. // diff --git a/internal/multiagent/tool_error_middleware_test.go b/internal/multiagent/tool_error_middleware_test.go index bf2e622e..37e4fd70 100644 --- a/internal/multiagent/tool_error_middleware_test.go +++ b/internal/multiagent/tool_error_middleware_test.go @@ -4,6 +4,8 @@ import ( "context" "encoding/json" "errors" + "io" + "strings" "testing" "github.com/cloudwego/eino/compose" @@ -108,6 +110,39 @@ func TestSoftRecoveryToolCallMiddleware_PassesThrough(t *testing.T) { } } +func TestSoftRecoveryStreamableToolCallMiddleware_LocalStreamFuncJSONError(t *testing.T) { + mw := softRecoveryStreamableToolCallMiddleware() + next := func(ctx context.Context, input *compose.ToolInput) (*compose.StreamToolOutput, error) { + return nil, errors.New(`[LocalStreamFunc] failed to unmarshal arguments in json, toolName=execute, err="Syntax error no sources available, the input json is empty`) + } + wrapped := mw(next) + out, err := wrapped(context.Background(), &compose.ToolInput{ + Name: "execute", + Arguments: "", + }) + if err != nil { + t.Fatalf("expected nil error (soft recovery), got: %v", err) + } + if out == nil || out.Result == nil { + t.Fatal("expected stream result") + } + var sb strings.Builder + for { + chunk, rerr := out.Result.Recv() + if errors.Is(rerr, io.EOF) { + break + } + if rerr != nil { + t.Fatalf("recv: %v", rerr) + } + sb.WriteString(chunk) + } + text := sb.String() + if !containsAll(text, "[Tool Error]", "execute", "JSON") { + t.Fatalf("recovery message missing expected content: %s", text) + } +} + func TestSoftRecoveryToolCallMiddleware_ConvertsJSONError(t *testing.T) { mw := softRecoveryToolCallMiddleware() next := func(ctx context.Context, input *compose.ToolInput) (*compose.ToolOutput, error) {