diff --git a/internal/multiagent/eino_adk_run_loop.go b/internal/multiagent/eino_adk_run_loop.go index 4f34c904..6f6629a1 100644 --- a/internal/multiagent/eino_adk_run_loop.go +++ b/internal/multiagent/eino_adk_run_loop.go @@ -345,7 +345,9 @@ func runEinoADKAgentLoop(ctx context.Context, args *einoADKRunLoopArgs, baseMsgs } runnerCfg := adk.RunnerConfig{ - Agent: da, + Agent: da, + // Enable ADK streaming events: plan_execute also needs to emit the reasoning/response stream, + // so that its front-end experience stays consistent with deep/supervisor/eino_single. EnableStreaming: true, } var cpStore *fileCheckPointStore diff --git a/internal/multiagent/eino_orchestration.go b/internal/multiagent/eino_orchestration.go index 6b2dc3d0..8461225f 100644 --- a/internal/multiagent/eino_orchestration.go +++ b/internal/multiagent/eino_orchestration.go @@ -148,14 +148,12 @@ func planExecutePlannerGenInput( } return func(ctx context.Context, userInput []adk.Message) ([]adk.Message, error) { userInput = capPlanExecuteUserInputMessages(userInput, appCfg, mwCfg) - msgs := make([]adk.Message, 0, 1+len(userInput)) - if oi != "" { - msgs = append(msgs, schema.SystemMessage(oi)) - } + msgs := make([]adk.Message, 0, len(userInput)) msgs = append(msgs, userInput...) if rewritten, rerr := applyBeforeModelRewriteHandlers(ctx, msgs, rewriteHandlers); rerr == nil && len(rewritten) > 0 { msgs = rewritten } + msgs = normalizeSingleLeadingSystemMessage(msgs, oi) logPlanExecuteModelInputEstimate(logger, modelName, conversationID, "plan_execute_planner", msgs) return msgs, nil } @@ -184,9 +182,7 @@ func planExecuteExecutorGenInput( if err != nil { return nil, err } - if oi != "" { - userMsgs = append([]adk.Message{schema.SystemMessage(oi)}, userMsgs...) 
- } + userMsgs = normalizeSingleLeadingSystemMessage(userMsgs, oi) logPlanExecuteModelInputEstimate(logger, modelName, conversationID, "plan_execute_executor_gen_input", userMsgs) return userMsgs, nil } @@ -233,17 +229,54 @@ func planExecuteReplannerGenInput( if err != nil { return nil, err } - if oi != "" { - msgs = append([]adk.Message{schema.SystemMessage(oi)}, msgs...) - } if rewritten, rerr := applyBeforeModelRewriteHandlers(ctx, msgs, rewriteHandlers); rerr == nil && len(rewritten) > 0 { msgs = rewritten } + msgs = normalizeSingleLeadingSystemMessage(msgs, oi) logPlanExecuteModelInputEstimate(logger, modelName, conversationID, "plan_execute_replanner", msgs) return msgs, nil } } +// normalizeSingleLeadingSystemMessage enforces a provider-friendly message shape: +// exactly one system message at index 0 (when any system context exists). +// For strict OpenAI-compatible backends (e.g. qwen/vllm templates), this avoids +// "System message must be at the beginning" caused by multiple/disordered system messages. +func normalizeSingleLeadingSystemMessage(msgs []adk.Message, extraSystem string) []adk.Message { + extraSystem = strings.TrimSpace(extraSystem) + if len(msgs) == 0 { + if extraSystem == "" { + return msgs + } + return []adk.Message{schema.SystemMessage(extraSystem)} + } + + systemParts := make([]string, 0, 2) + if extraSystem != "" { + systemParts = append(systemParts, extraSystem) + } + nonSystem := make([]adk.Message, 0, len(msgs)) + for _, msg := range msgs { + if msg == nil { + continue + } + if msg.Role == schema.System { + if s := strings.TrimSpace(msg.Content); s != "" { + systemParts = append(systemParts, s) + } + continue + } + nonSystem = append(nonSystem, msg) + } + if len(systemParts) == 0 { + return nonSystem + } + out := make([]adk.Message, 0, len(nonSystem)+1) + out = append(out, schema.SystemMessage(strings.Join(systemParts, "\n\n"))) + out = append(out, nonSystem...) 
+ return out +} + func capPlanExecuteUserInputMessages(input []adk.Message, appCfg *config.Config, mwCfg *config.MultiAgentEinoMiddlewareConfig) []adk.Message { if len(input) == 0 { return input diff --git a/internal/multiagent/eino_orchestration_system_message_test.go b/internal/multiagent/eino_orchestration_system_message_test.go new file mode 100644 index 00000000..2cb32cfc --- /dev/null +++ b/internal/multiagent/eino_orchestration_system_message_test.go @@ -0,0 +1,45 @@ +package multiagent + +import ( + "testing" + + "github.com/cloudwego/eino/adk" + "github.com/cloudwego/eino/schema" +) + +func TestNormalizeSingleLeadingSystemMessage_MergesMultipleSystems(t *testing.T) { + in := []adk.Message{ + schema.SystemMessage("sys-1"), + schema.UserMessage("u1"), + schema.SystemMessage("sys-2"), + schema.AssistantMessage("a1", nil), + } + out := normalizeSingleLeadingSystemMessage(in, "orch") + if len(out) != 3 { + t.Fatalf("unexpected output length: got %d want 3", len(out)) + } + if out[0].Role != schema.System { + t.Fatalf("first message role must be system, got %s", out[0].Role) + } + if got := out[0].Content; got != "orch\n\nsys-1\n\nsys-2" { + t.Fatalf("unexpected merged system content: %q", got) + } + if out[1].Role != schema.User || out[2].Role != schema.Assistant { + t.Fatalf("non-system message order changed unexpectedly") + } +} + +func TestNormalizeSingleLeadingSystemMessage_NoSystemKeepsFlow(t *testing.T) { + in := []adk.Message{ + schema.UserMessage("u1"), + schema.AssistantMessage("a1", nil), + } + out := normalizeSingleLeadingSystemMessage(in, "") + if len(out) != 2 { + t.Fatalf("unexpected output length: got %d want 2", len(out)) + } + if out[0].Role != schema.User || out[1].Role != schema.Assistant { + t.Fatalf("message order changed unexpectedly") + } +} + diff --git a/internal/multiagent/eino_transient_retry.go b/internal/multiagent/eino_transient_retry.go index 895f062b..ca7b214c 100644 --- a/internal/multiagent/eino_transient_retry.go +++ 
b/internal/multiagent/eino_transient_retry.go @@ -3,6 +3,7 @@ package multiagent import ( "context" "errors" + "io" "strings" "time" @@ -23,6 +24,10 @@ func isEinoTransientRunError(err error) bool { if err == nil { return false } + // io.EOF usually marks the normal end of a stream and must not trigger a segment retry. NOTE(review): errors.Is also matches errors that merely wrap io.EOF; confirm such failures need no retry. + if errors.Is(err, io.EOF) { + return false + } if errors.Is(err, context.Canceled) || errors.Is(err, context.DeadlineExceeded) { return false } @@ -55,7 +60,6 @@ func isEinoTransientRunError(err error) bool { "no such host", "network is unreachable", "broken pipe", - "eof", "read tcp", "write tcp", "dial tcp", diff --git a/internal/multiagent/eino_transient_retry_test.go b/internal/multiagent/eino_transient_retry_test.go index 80107501..7ba15afd 100644 --- a/internal/multiagent/eino_transient_retry_test.go +++ b/internal/multiagent/eino_transient_retry_test.go @@ -3,6 +3,7 @@ package multiagent import ( "context" "errors" + "io" "testing" "time" @@ -18,9 +19,12 @@ func TestIsEinoTransientRunError(t *testing.T) { want bool }{ {"nil", nil, false}, + {"io eof", io.EOF, false}, + {"plain eof text", errors.New("EOF"), false}, {"429", errors.New("HTTP 429 Too Many Requests"), true}, {"rate limit", errors.New(`{"error":"rate limit exceeded"}`), true}, {"connection reset", errors.New("read tcp: connection reset by peer"), true}, + {"unexpected eof", errors.New("unexpected EOF"), true}, {"503", errors.New("upstream returned 503"), true}, {"iteration limit", errors.New("max iteration reached"), false}, {"canceled", context.Canceled, false},