From 810d689132e75f7b13c1472722f39b82faef9cbb Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E5=85=AC=E6=98=8E?= <83812544+Ed1s0nZ@users.noreply.github.com> Date: Wed, 24 Jun 2026 12:08:13 +0800 Subject: [PATCH] Add files via upload --- internal/multiagent/eino_adk_run_loop.go | 15 +++++++++-- internal/multiagent/eino_transient_retry.go | 2 +- .../multiagent/eino_transient_retry_test.go | 26 +++++++++++++++++++ 3 files changed, 40 insertions(+), 3 deletions(-) diff --git a/internal/multiagent/eino_adk_run_loop.go b/internal/multiagent/eino_adk_run_loop.go index 59176c8e..4f2c9482 100644 --- a/internal/multiagent/eino_adk_run_loop.go +++ b/internal/multiagent/eino_adk_run_loop.go @@ -553,6 +553,13 @@ func runEinoADKAgentLoop(ctx context.Context, args *einoADKRunLoopArgs, baseMsgs return true, nil } + // Reset the counter only once data has actually been received / a step has completed after a backoff retry, so that the first error-free ADK event after a restart does not wrongly knock the count back to 0. confirmTransientRetryRecovery := func() { + if transientRetrier.attempt() > 0 { + transientRetrier.reset() + } + } + takePartial := func(runErr error) (*RunResult, error) { if len(runAccumulatedMsgs) <= baseAccumulatedCount { return nil, runErr @@ -638,8 +645,6 @@ func runEinoADKAgentLoop(ctx context.Context, args *einoADKRunLoopArgs, baseMsgs if restarted { continue } - } else { - transientRetrier.reset() } if ev.AgentName != "" && progress != nil { iterEinoAgent := orchestratorName @@ -719,6 +724,9 @@ func runEinoADKAgentLoop(ctx context.Context, args *einoADKRunLoopArgs, baseMsgs zap.String("agent", ev.AgentName), zap.String("tool", toolName)) } + if toolStreamRecvErr == nil { + confirmTransientRetryRecovery() + } continue } @@ -990,6 +998,8 @@ func runEinoADKAgentLoop(ctx context.Context, args *einoADKRunLoopArgs, baseMsgs if restarted { continue } + } else { + confirmTransientRetryRecovery() } continue } @@ -1083,6 +1093,7 @@ func runEinoADKAgentLoop(ctx context.Context, args *einoADKRunLoopArgs, baseMsgs toolCallID := strings.TrimSpace(msg.ToolCallID) tryEmitToolResultProgress(toolName, content, toolCallID, isErr, 
ev.AgentName) } + confirmTransientRetryRecovery() } mcpIDsMu.Lock() diff --git a/internal/multiagent/eino_transient_retry.go b/internal/multiagent/eino_transient_retry.go index dfeb8228..b73760a6 100644 --- a/internal/multiagent/eino_transient_retry.go +++ b/internal/multiagent/eino_transient_retry.go @@ -143,7 +143,7 @@ func (r *einoTransientRunRetrier) attempt() int { return r.attempts } func (r *einoTransientRunRetrier) maxAttempts() int { return r.policy.maxAttempts } -// reset 在一次成功推进后清零重试计数,使后续临时错误从第 1 次退避重新开始。 +// reset clears the attempt counter after successful progress following a backoff retry (stream/message fully received), so that later transient errors start again from the first backoff. func (r *einoTransientRunRetrier) reset() { r.attempts = 0 } func einoRunRetryMaxAttempts(args *einoADKRunLoopArgs) int { diff --git a/internal/multiagent/eino_transient_retry_test.go b/internal/multiagent/eino_transient_retry_test.go index 0b2a5c69..39b9c81c 100644 --- a/internal/multiagent/eino_transient_retry_test.go +++ b/internal/multiagent/eino_transient_retry_test.go @@ -105,6 +105,32 @@ func TestEinoTransientRunRetrierReset(t *testing.T) { } } +func TestEinoTransientRunRetrierConsecutiveFailures(t *testing.T) { + t.Parallel() + r := newEinoTransientRunRetrier(einoTransientRunRetryPolicy{maxAttempts: 10, maxBackoff: 30 * time.Second}) + ctx := context.Background() + runErr := errors.New("internal server error") + args := &einoADKRunLoopArgs{} + base := []adk.Message{schema.UserMessage("hi")} + + for want := 1; want <= 3; want++ { + restarted, _, _, _, err := r.tryRetry(ctx, runErr, args, base, nil, len(base)) + if err != nil { + t.Fatalf("tryRetry attempt %d: %v", want, err) + } + if !restarted { + t.Fatalf("tryRetry attempt %d: want restarted", want) + } + if got := r.attempt(); got != want { + t.Fatalf("after failure %d: attempt=%d, want %d", want, got, want) + } + } + r.reset() + if r.attempt() != 0 { + t.Fatalf("after successful recovery reset: attempt=%d, want 0", r.attempt()) + } +} + func TestAppendUserMessageIfNeeded(t *testing.T) { t.Parallel() msgs := 
[]adk.Message{schema.UserMessage("old task")}