diff --git a/internal/handler/agent.go b/internal/handler/agent.go index bcdde35a..be616ae9 100644 --- a/internal/handler/agent.go +++ b/internal/handler/agent.go @@ -842,6 +842,8 @@ func (h *AgentHandler) ProcessMessageForRobot(ctx context.Context, platform, con conversationID, curMsg, curHist, roleTools, progressCallback, nil, ) if errMA == nil { + // 成功后重置 transient 重试窗口,下一次分段从第 1 次重试开始。 + transientRunAttempts = 0 break } if handled, _ := h.handleEinoTransientRetryContinue( @@ -873,6 +875,8 @@ func (h *AgentHandler) ProcessMessageForRobot(ctx context.Context, platform, con h.agentsMarkdownDir, robotMode, nil, ) if errMA == nil { + // 成功后重置 transient 重试窗口,下一次分段从第 1 次重试开始。 + transientRunAttempts = 0 break } if handled, _ := h.handleEinoTransientRetryContinue( diff --git a/internal/handler/eino_single_agent.go b/internal/handler/eino_single_agent.go index 6fcf2366..2a3e644c 100644 --- a/internal/handler/eino_single_agent.go +++ b/internal/handler/eino_single_agent.go @@ -178,9 +178,40 @@ func (h *AgentHandler) EinoSingleAgentLoopStream(c *gin.Context) { var cumulativeMCPExecutionIDs []string var transientRunAttempts int + // 同一请求内分段续跑时,主代理 iteration 事件按偏移累计,避免 UI 出现「第3轮 → 第1轮」回跳。 + var mainIterationOffset int for { - progressCallback := h.createProgressCallback(taskCtx, cancelWithCause, conversationID, assistantMessageID, sendEvent) + segmentMainIterationMax := 0 + rawProgressCallback := h.createProgressCallback(taskCtx, cancelWithCause, conversationID, assistantMessageID, sendEvent) + progressCallback := func(eventType, message string, data interface{}) { + if eventType == "iteration" { + if m, ok := data.(map[string]interface{}); ok { + if scope, _ := m["einoScope"].(string); scope == "main" { + raw := 0 + switch v := m["iteration"].(type) { + case int: + raw = v + case int32: + raw = int(v) + case int64: + raw = int(v) + case float64: + raw = int(v) + case float32: + raw = int(v) + } + if raw > 0 { + if raw > segmentMainIterationMax { + segmentMainIterationMax = raw + } + m["iteration"] = raw + mainIterationOffset + } + } + } + } + rawProgressCallback(eventType, message, data) + } taskCtxLoop := mcp.WithMCPConversationID(taskCtx, conversationID) taskCtxLoop = mcp.WithToolRunRegistry(taskCtxLoop, h.tasks) taskCtxLoop = multiagent.WithHITLToolInterceptor(taskCtxLoop, func(ctx context.Context, toolName, arguments string) (string, error) { @@ -206,6 +237,8 @@ func (h *AgentHandler) EinoSingleAgentLoopStream(c *gin.Context) { } if runErr == nil { + // 任一段成功完成后,重置临时错误重试窗口(次数/退避从头开始)。 + transientRunAttempts = 0 timeoutCancel() break } @@ -216,6 +249,7 @@ func (h *AgentHandler) EinoSingleAgentLoopStream(c *gin.Context) { func(msg string, extra map[string]interface{}) { sendEvent("progress", msg, extra) }, ) if handled { + mainIterationOffset += segmentMainIterationMax timeoutCancel() baseCtx, cancelWithCause = context.WithCancelCause(context.Background()) h.tasks.BindTaskCancel(conversationID, cancelWithCause) @@ -250,6 +284,9 @@ func (h *AgentHandler) EinoSingleAgentLoopStream(c *gin.Context) { "conversationId": conversationID, "source": "interrupt_continue", }) + mainIterationOffset += segmentMainIterationMax + // 非临时错误分段续跑(用户中断并继续)时,清空 transient 计数,避免跨分段累加。 + transientRunAttempts = 0 timeoutCancel() baseCtx, cancelWithCause = context.WithCancelCause(context.Background()) h.tasks.BindTaskCancel(conversationID, cancelWithCause) diff --git a/internal/handler/multi_agent.go b/internal/handler/multi_agent.go index e1b0ebd4..b83a7a6b 100644 --- a/internal/handler/multi_agent.go +++ b/internal/handler/multi_agent.go @@ -188,9 +188,40 @@ func (h *AgentHandler) MultiAgentLoopStream(c *gin.Context) { // 同一 HTTP 流内多段 Run(如中断并继续)合并 MCP execution id,供最终 response / 库表与工具芯片展示完整列表 var cumulativeMCPExecutionIDs []string var transientRunAttempts int + // 同一请求内分段续跑时,主代理 iteration 事件按偏移累计,避免 UI 出现「第3轮 → 第1轮」回跳。 + var mainIterationOffset int for { - progressCallback := h.createProgressCallback(taskCtx, cancelWithCause, conversationID, assistantMessageID, sendEvent) + segmentMainIterationMax := 0 + rawProgressCallback := h.createProgressCallback(taskCtx, cancelWithCause, conversationID, assistantMessageID, sendEvent) + progressCallback := func(eventType, message string, data interface{}) { + if eventType == "iteration" { + if m, ok := data.(map[string]interface{}); ok { + if scope, _ := m["einoScope"].(string); scope == "main" { + raw := 0 + switch v := m["iteration"].(type) { + case int: + raw = v + case int32: + raw = int(v) + case int64: + raw = int(v) + case float64: + raw = int(v) + case float32: + raw = int(v) + } + if raw > 0 { + if raw > segmentMainIterationMax { + segmentMainIterationMax = raw + } + m["iteration"] = raw + mainIterationOffset + } + } + } + } + rawProgressCallback(eventType, message, data) + } taskCtxLoop := mcp.WithMCPConversationID(taskCtx, conversationID) taskCtxLoop = mcp.WithToolRunRegistry(taskCtxLoop, h.tasks) taskCtxLoop = multiagent.WithHITLToolInterceptor(taskCtxLoop, func(ctx context.Context, toolName, arguments string) (string, error) { @@ -218,6 +249,8 @@ func (h *AgentHandler) MultiAgentLoopStream(c *gin.Context) { } if runErr == nil { + // 任一段成功完成后,重置临时错误重试窗口(次数/退避从头开始)。 + transientRunAttempts = 0 timeoutCancel() break } @@ -228,6 +261,7 @@ func (h *AgentHandler) MultiAgentLoopStream(c *gin.Context) { func(msg string, extra map[string]interface{}) { sendEvent("progress", msg, extra) }, ) if handled { + mainIterationOffset += segmentMainIterationMax timeoutCancel() baseCtx, cancelWithCause = context.WithCancelCause(context.Background()) h.tasks.BindTaskCancel(conversationID, cancelWithCause) @@ -262,6 +296,9 @@ func (h *AgentHandler) MultiAgentLoopStream(c *gin.Context) { "conversationId": conversationID, "source": "interrupt_continue", }) + mainIterationOffset += segmentMainIterationMax + // 非临时错误分段续跑(用户中断并继续)时,清空 transient 计数,避免跨分段累加。 + transientRunAttempts = 0 timeoutCancel() baseCtx, cancelWithCause = context.WithCancelCause(context.Background()) h.tasks.BindTaskCancel(conversationID, cancelWithCause)