Add files via upload

This commit is contained in:
公明
2026-05-12 16:39:09 +08:00
committed by GitHub
parent eb04ac0c3a
commit 832532213a
5 changed files with 92 additions and 7 deletions
@@ -22,6 +22,9 @@ import (
//
// 使用 Pipe 将内层流转发给调用方:在 inner EOF 后、关闭 Pipe 前同步调用 ToolInvokeNotify.Fire
// 保证 run loop 在模型开始下一轮输出前已记录 execute 结果(用于 UI 与「重复助手复述」去重)。
//
// 若 inner 在校验阶段直接返回 error(未建立 reader),不会进入下方 goroutine,也必须 Fire
// 否则 pending tool_call 要等整轮 run 结束才被 force-close,与已展示的助手/工具软错误文案不同步。
type einoStreamingShellWrap struct {
inner filesystem.StreamingShell
invokeNotify *einomcp.ToolInvokeNotifyHolder
@@ -42,17 +45,24 @@ func (w *einoStreamingShellWrap) ExecuteStreaming(ctx context.Context, input *fi
if security.IsBackgroundShellCommand(req.Command) && !req.RunInBackendGround {
req.RunInBackendGround = true
}
tid := strings.TrimSpace(compose.GetToolCallID(ctx))
agentTag := strings.TrimSpace(w.einoAgentName)
sr, err := w.inner.ExecuteStreaming(ctx, &req)
if err != nil {
if w.recordMonitor != nil {
w.recordMonitor(cmd, "", false, err)
}
if w.invokeNotify != nil && tid != "" {
w.invokeNotify.Fire(tid, "execute", agentTag, false, "", err)
}
return nil, err
}
tid := strings.TrimSpace(compose.GetToolCallID(ctx))
if sr == nil || w.invokeNotify == nil || tid == "" {
return sr, nil
}
outR, outW := schema.Pipe[*filesystem.ExecuteResponse](32)
agentTag := strings.TrimSpace(w.einoAgentName)
go func(inner *schema.StreamReader[*filesystem.ExecuteResponse], command string) {
defer inner.Close()
+1 -1
View File
@@ -173,7 +173,7 @@ func RunEinoSingleChatModelAgent(
UnknownToolsHandler: einomcp.UnknownToolReminderHandler(),
ToolCallMiddlewares: []compose.ToolMiddleware{
hitlToolCallMiddleware(),
{Invokable: softRecoveryToolCallMiddleware()},
softRecoveryToolMiddleware(),
},
},
EmitInternalEvents: true,
+2 -2
View File
@@ -290,7 +290,7 @@ func RunDeepAgent(
UnknownToolsHandler: einomcp.UnknownToolReminderHandler(),
ToolCallMiddlewares: []compose.ToolMiddleware{
hitlToolCallMiddleware(),
{Invokable: softRecoveryToolCallMiddleware()},
softRecoveryToolMiddleware(),
},
},
EmitInternalEvents: true,
@@ -439,7 +439,7 @@ func RunDeepAgent(
UnknownToolsHandler: einomcp.UnknownToolReminderHandler(),
ToolCallMiddlewares: []compose.ToolMiddleware{
hitlToolCallMiddleware(),
{Invokable: softRecoveryToolCallMiddleware()},
softRecoveryToolMiddleware(),
},
},
EmitInternalEvents: true,
+42 -2
View File
@@ -8,6 +8,7 @@ import (
"strings"
"github.com/cloudwego/eino/compose"
"github.com/cloudwego/eino/schema"
)
// softRecoveryToolCallMiddleware returns an InvokableToolMiddleware that catches
@@ -16,8 +17,9 @@ import (
// returned to the LLM. This allows the model to self-correct within the same
// iteration rather than crashing the entire graph and requiring a full replay.
//
// Without this middleware, a JSON parse failure in any tool's InvokableRun propagates
// as a hard error through the Eino ToolsNode → [NodeRunError] → ev.Err, which
// Without Invokable (+ Streamable where applicable) registration, a JSON parse failure
// in InvokableRun / StreamableRun propagates as a hard error through the Eino ToolsNode
// → [NodeRunError] → ev.Err, which
// either triggers the full-replay retry loop (expensive) or terminates the run
// entirely once retries are exhausted. With it, the LLM simply sees an error message
// in the tool result and can adjust its next tool call accordingly.
@@ -39,6 +41,44 @@ func softRecoveryToolCallMiddleware() compose.InvokableToolMiddleware {
}
}
// softRecoveryStreamableToolCallMiddleware mirrors softRecoveryToolCallMiddleware for
// tools that implement StreamableTool only (e.g. Eino ADK filesystem execute).
// Eino applies Invokable vs Streamable middleware to disjoint code paths in ToolsNode;
// registering only Invokable leaves streaming tools uncovered — empty/malformed JSON
// then fails inside [LocalStreamFunc] before the inner endpoint runs.
func softRecoveryStreamableToolCallMiddleware() compose.StreamableToolMiddleware {
return func(next compose.StreamableToolEndpoint) compose.StreamableToolEndpoint {
return func(ctx context.Context, input *compose.ToolInput) (*compose.StreamToolOutput, error) {
out, err := next(ctx, input)
if err == nil {
return out, nil
}
if !isSoftRecoverableToolError(err) {
return out, err
}
toolName := ""
args := ""
if input != nil {
toolName = input.Name
args = input.Arguments
}
msg := buildSoftRecoveryMessage(toolName, args, err)
return &compose.StreamToolOutput{
Result: schema.StreamReaderFromArray([]string{msg}),
}, nil
}
}
}
// softRecoveryToolMiddleware returns a ToolMiddleware with both Invokable and Streamable
// soft recovery (same semantics as hitlToolCallMiddleware bundling).
func softRecoveryToolMiddleware() compose.ToolMiddleware {
return compose.ToolMiddleware{
Invokable: softRecoveryToolCallMiddleware(),
Streamable: softRecoveryStreamableToolCallMiddleware(),
}
}
// isSoftRecoverableToolError determines whether a tool execution error should be
// silently converted to a tool-result message rather than crashing the graph.
//
@@ -4,6 +4,8 @@ import (
"context"
"encoding/json"
"errors"
"io"
"strings"
"testing"
"github.com/cloudwego/eino/compose"
@@ -108,6 +110,39 @@ func TestSoftRecoveryToolCallMiddleware_PassesThrough(t *testing.T) {
}
}
func TestSoftRecoveryStreamableToolCallMiddleware_LocalStreamFuncJSONError(t *testing.T) {
mw := softRecoveryStreamableToolCallMiddleware()
next := func(ctx context.Context, input *compose.ToolInput) (*compose.StreamToolOutput, error) {
return nil, errors.New(`[LocalStreamFunc] failed to unmarshal arguments in json, toolName=execute, err="Syntax error no sources available, the input json is empty`)
}
wrapped := mw(next)
out, err := wrapped(context.Background(), &compose.ToolInput{
Name: "execute",
Arguments: "",
})
if err != nil {
t.Fatalf("expected nil error (soft recovery), got: %v", err)
}
if out == nil || out.Result == nil {
t.Fatal("expected stream result")
}
var sb strings.Builder
for {
chunk, rerr := out.Result.Recv()
if errors.Is(rerr, io.EOF) {
break
}
if rerr != nil {
t.Fatalf("recv: %v", rerr)
}
sb.WriteString(chunk)
}
text := sb.String()
if !containsAll(text, "[Tool Error]", "execute", "JSON") {
t.Fatalf("recovery message missing expected content: %s", text)
}
}
func TestSoftRecoveryToolCallMiddleware_ConvertsJSONError(t *testing.T) {
mw := softRecoveryToolCallMiddleware()
next := func(ctx context.Context, input *compose.ToolInput) (*compose.ToolOutput, error) {