Add files via upload

This commit is contained in:
公明
2026-05-28 14:12:44 +08:00
committed by GitHub
parent 4d048f6da0
commit 21c36fcce8
5 changed files with 100 additions and 12 deletions
+3 -1
View File
@@ -345,7 +345,9 @@ func runEinoADKAgentLoop(ctx context.Context, args *einoADKRunLoopArgs, baseMsgs
}
runnerCfg := adk.RunnerConfig{
Agent: da,
Agent: da,
// 启用 ADK 流式事件:plan_execute 也需要输出 reasoning/response 流,
// 与 deep/supervisor/eino_single 的前端体验保持一致。
EnableStreaming: true,
}
var cpStore *fileCheckPointStore
+43 -10
View File
@@ -148,14 +148,12 @@ func planExecutePlannerGenInput(
}
return func(ctx context.Context, userInput []adk.Message) ([]adk.Message, error) {
userInput = capPlanExecuteUserInputMessages(userInput, appCfg, mwCfg)
msgs := make([]adk.Message, 0, 1+len(userInput))
if oi != "" {
msgs = append(msgs, schema.SystemMessage(oi))
}
msgs := make([]adk.Message, 0, len(userInput))
msgs = append(msgs, userInput...)
if rewritten, rerr := applyBeforeModelRewriteHandlers(ctx, msgs, rewriteHandlers); rerr == nil && len(rewritten) > 0 {
msgs = rewritten
}
msgs = normalizeSingleLeadingSystemMessage(msgs, oi)
logPlanExecuteModelInputEstimate(logger, modelName, conversationID, "plan_execute_planner", msgs)
return msgs, nil
}
@@ -184,9 +182,7 @@ func planExecuteExecutorGenInput(
if err != nil {
return nil, err
}
if oi != "" {
userMsgs = append([]adk.Message{schema.SystemMessage(oi)}, userMsgs...)
}
userMsgs = normalizeSingleLeadingSystemMessage(userMsgs, oi)
logPlanExecuteModelInputEstimate(logger, modelName, conversationID, "plan_execute_executor_gen_input", userMsgs)
return userMsgs, nil
}
@@ -233,17 +229,54 @@ func planExecuteReplannerGenInput(
if err != nil {
return nil, err
}
if oi != "" {
msgs = append([]adk.Message{schema.SystemMessage(oi)}, msgs...)
}
if rewritten, rerr := applyBeforeModelRewriteHandlers(ctx, msgs, rewriteHandlers); rerr == nil && len(rewritten) > 0 {
msgs = rewritten
}
msgs = normalizeSingleLeadingSystemMessage(msgs, oi)
logPlanExecuteModelInputEstimate(logger, modelName, conversationID, "plan_execute_replanner", msgs)
return msgs, nil
}
}
// normalizeSingleLeadingSystemMessage enforces a provider-friendly message shape:
// exactly one system message at index 0 (when any system context exists).
// For strict OpenAI-compatible backends (e.g. qwen/vllm templates), this avoids
// "System message must be at the beginning" caused by multiple/disordered system messages.
func normalizeSingleLeadingSystemMessage(msgs []adk.Message, extraSystem string) []adk.Message {
extraSystem = strings.TrimSpace(extraSystem)
if len(msgs) == 0 {
if extraSystem == "" {
return msgs
}
return []adk.Message{schema.SystemMessage(extraSystem)}
}
systemParts := make([]string, 0, 2)
if extraSystem != "" {
systemParts = append(systemParts, extraSystem)
}
nonSystem := make([]adk.Message, 0, len(msgs))
for _, msg := range msgs {
if msg == nil {
continue
}
if msg.Role == schema.System {
if s := strings.TrimSpace(msg.Content); s != "" {
systemParts = append(systemParts, s)
}
continue
}
nonSystem = append(nonSystem, msg)
}
if len(systemParts) == 0 {
return nonSystem
}
out := make([]adk.Message, 0, len(nonSystem)+1)
out = append(out, schema.SystemMessage(strings.Join(systemParts, "\n\n")))
out = append(out, nonSystem...)
return out
}
func capPlanExecuteUserInputMessages(input []adk.Message, appCfg *config.Config, mwCfg *config.MultiAgentEinoMiddlewareConfig) []adk.Message {
if len(input) == 0 {
return input
@@ -0,0 +1,45 @@
package multiagent
import (
"testing"
"github.com/cloudwego/eino/adk"
"github.com/cloudwego/eino/schema"
)
func TestNormalizeSingleLeadingSystemMessage_MergesMultipleSystems(t *testing.T) {
in := []adk.Message{
schema.SystemMessage("sys-1"),
schema.UserMessage("u1"),
schema.SystemMessage("sys-2"),
schema.AssistantMessage("a1", nil),
}
out := normalizeSingleLeadingSystemMessage(in, "orch")
if len(out) != 3 {
t.Fatalf("unexpected output length: got %d want 3", len(out))
}
if out[0].Role != schema.System {
t.Fatalf("first message role must be system, got %s", out[0].Role)
}
if got := out[0].Content; got != "orch\n\nsys-1\n\nsys-2" {
t.Fatalf("unexpected merged system content: %q", got)
}
if out[1].Role != schema.User || out[2].Role != schema.Assistant {
t.Fatalf("non-system message order changed unexpectedly")
}
}
func TestNormalizeSingleLeadingSystemMessage_NoSystemKeepsFlow(t *testing.T) {
in := []adk.Message{
schema.UserMessage("u1"),
schema.AssistantMessage("a1", nil),
}
out := normalizeSingleLeadingSystemMessage(in, "")
if len(out) != 2 {
t.Fatalf("unexpected output length: got %d want 2", len(out))
}
if out[0].Role != schema.User || out[1].Role != schema.Assistant {
t.Fatalf("message order changed unexpectedly")
}
}
+5 -1
View File
@@ -3,6 +3,7 @@ package multiagent
import (
"context"
"errors"
"io"
"strings"
"time"
@@ -23,6 +24,10 @@ func isEinoTransientRunError(err error) bool {
if err == nil {
return false
}
// io.EOF 常见于流式正常收尾,不应触发分段重试。
if errors.Is(err, io.EOF) {
return false
}
if errors.Is(err, context.Canceled) || errors.Is(err, context.DeadlineExceeded) {
return false
}
@@ -55,7 +60,6 @@ func isEinoTransientRunError(err error) bool {
"no such host",
"network is unreachable",
"broken pipe",
"eof",
"read tcp",
"write tcp",
"dial tcp",
@@ -3,6 +3,7 @@ package multiagent
import (
"context"
"errors"
"io"
"testing"
"time"
@@ -18,9 +19,12 @@ func TestIsEinoTransientRunError(t *testing.T) {
want bool
}{
{"nil", nil, false},
{"io eof", io.EOF, false},
{"plain eof text", errors.New("EOF"), false},
{"429", errors.New("HTTP 429 Too Many Requests"), true},
{"rate limit", errors.New(`{"error":"rate limit exceeded"}`), true},
{"connection reset", errors.New("read tcp: connection reset by peer"), true},
{"unexpected eof", errors.New("unexpected EOF"), true},
{"503", errors.New("upstream returned 503"), true},
{"iteration limit", errors.New("max iteration reached"), false},
{"canceled", context.Canceled, false},