From 6365de701850d44337427717cdb761afda3af4e1 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E5=85=AC=E6=98=8E?= <83812544+Ed1s0nZ@users.noreply.github.com> Date: Thu, 11 Jun 2026 11:50:31 +0800 Subject: [PATCH] Add files via upload --- internal/handler/agent.go | 26 ++++++++ internal/handler/eino_resume_segment.go | 58 +++++++++++++++++ internal/handler/eino_single_agent.go | 82 ++++++++++++++++++----- internal/handler/multi_agent.go | 86 ++++++++++++++++++++----- 4 files changed, 220 insertions(+), 32 deletions(-) diff --git a/internal/handler/agent.go b/internal/handler/agent.go index b680853f..a1e6d500 100644 --- a/internal/handler/agent.go +++ b/internal/handler/agent.go @@ -637,13 +637,26 @@ func (h *AgentHandler) runRobotEinoSingleWithRetry( var resultMA *multiagent.RunResult var errMA error var transientRunAttempts int + var emptyResponseAttempts int for { resultMA, errMA = multiagent.RunEinoSingleChatModelAgent( taskCtx, h.config, &h.config.MultiAgent, h.agent, h.logger, conversationID, curMsg, curHist, roleTools, progressCallback, nil, h.projectBlackboardBlock(conversationID), ) + handledEmpty, exhaustedEmpty := h.handleEinoEmptyResponseContinue( + taskCtx, conversationID, resultMA, errMA, &emptyResponseAttempts, + &curHist, &curMsg, segmentUserMessage, progressCallback, nil, + ) + if exhaustedEmpty { + errMA = nil + break + } + if handledEmpty { + continue + } if errMA == nil { transientRunAttempts = 0 + emptyResponseAttempts = 0 break } if handled, _ := h.handleEinoTransientRetryContinue( @@ -673,14 +686,27 @@ func (h *AgentHandler) runRobotMultiAgentWithRetry( var resultMA *multiagent.RunResult var errMA error var transientRunAttempts int + var emptyResponseAttempts int for { resultMA, errMA = multiagent.RunDeepAgent( taskCtx, h.config, &h.config.MultiAgent, h.agent, h.logger, conversationID, curMsg, curHist, roleTools, progressCallback, h.agentsMarkdownDir, orchestration, nil, h.projectBlackboardBlock(conversationID), ) + handledEmpty, exhaustedEmpty := h.handleEinoEmptyResponseContinue( + taskCtx, conversationID, resultMA, errMA, &emptyResponseAttempts, + &curHist, &curMsg, segmentUserMessage, progressCallback, nil, + ) + if exhaustedEmpty { + errMA = nil + break + } + if handledEmpty { + continue + } if errMA == nil { transientRunAttempts = 0 + emptyResponseAttempts = 0 break } if handled, _ := h.handleEinoTransientRetryContinue( diff --git a/internal/handler/eino_resume_segment.go b/internal/handler/eino_resume_segment.go index a72a1d61..dbd26af9 100644 --- a/internal/handler/eino_resume_segment.go +++ b/internal/handler/eino_resume_segment.go @@ -9,6 +9,8 @@ import ( "cyberstrike-ai/internal/agent" "cyberstrike-ai/internal/multiagent" + + "go.uber.org/zap" ) func (h *AgentHandler) einoRunRetryMaxAttempts() int { @@ -120,3 +122,59 @@ func (h *AgentHandler) handleEinoTransientRetryContinue( } return true, nil } + +// handleEinoEmptyResponseContinue 在 SSE 任务循环内处理「正常结束但无助手正文」;返回 exhausted=true 时由外层按成功结束(保留占位文案)。 +// 与临时错误重试一致:仅恢复轨迹并保留本请求原始 user 文案,不向模型注入续跑说明。 +func (h *AgentHandler) handleEinoEmptyResponseContinue( + baseCtx context.Context, + conversationID string, + result *multiagent.RunResult, + runErr error, + emptyResponseAttempts *int, + curHistory *[]agent.ChatMessage, + curFinalMessage *string, + segmentUserMessage string, + progressCallback func(eventType, message string, data interface{}), + sendProgress func(msg string, extra map[string]interface{}), +) (handled bool, exhausted bool) { + if !errors.Is(runErr, multiagent.ErrEmptyResponseContinue) { + return false, false + } + maxAttempts := h.einoRunRetryMaxAttempts() + *emptyResponseAttempts++ + if *emptyResponseAttempts > maxAttempts { + if h.logger != nil { + h.logger.Warn("eino empty response auto resume exhausted", + zap.String("conversationId", conversationID), + zap.Int("maxAttempts", maxAttempts)) + } + if shouldPersistEinoAgentTraceAfterRunError(baseCtx) { + h.persistEinoAgentTraceForResume(conversationID, result) + } + return false, true + } + attemptNo := *emptyResponseAttempts + if h.logger != nil { + h.logger.Info("eino empty response, auto resume from trace", + zap.String("conversationId", conversationID), + zap.Int("attempt", attemptNo), + zap.Int("maxAttempts", maxAttempts)) + } + if progressCallback != nil { + progressCallback("eino_empty_response_continue", fmt.Sprintf("未捕获到助手正文,正在基于轨迹自动续跑(%d/%d)…", attemptNo, maxAttempts), map[string]interface{}{ + "conversationId": conversationID, + "source": "eino", + "attempt": attemptNo, + "maxAttempts": maxAttempts, + "resumeKind": "trace_segment", + }) + } + h.applyEinoTransientRetrySegment(conversationID, result, curHistory, curFinalMessage, segmentUserMessage) + if sendProgress != nil { + sendProgress("已恢复上下文,正在继续推理…", map[string]interface{}{ + "conversationId": conversationID, + "source": "empty_response_continue", + }) + } + return true, false +} diff --git a/internal/handler/eino_single_agent.go b/internal/handler/eino_single_agent.go index fbd488c6..3ce88ded 100644 --- a/internal/handler/eino_single_agent.go +++ b/internal/handler/eino_single_agent.go @@ -178,6 +178,7 @@ func (h *AgentHandler) EinoSingleAgentLoopStream(c *gin.Context) { var cumulativeMCPExecutionIDs []string var transientRunAttempts int + var emptyResponseAttempts int // 同一请求内分段续跑时,主代理 iteration 事件按偏移累计,避免 UI 出现「第3轮 → 第1轮」回跳。 var mainIterationOffset int @@ -237,9 +238,32 @@ func (h *AgentHandler) EinoSingleAgentLoopStream(c *gin.Context) { cumulativeMCPExecutionIDs = mergeMCPExecutionIDLists(cumulativeMCPExecutionIDs, result.MCPExecutionIDs) } + handledEmpty, exhaustedEmpty := h.handleEinoEmptyResponseContinue( + baseCtx, conversationID, result, runErr, &emptyResponseAttempts, + &curHistory, &curFinalMessage, segmentUserMessage, progressCallback, + func(msg string, extra map[string]interface{}) { sendEvent("progress", msg, extra) }, + ) + if exhaustedEmpty { + runErr = nil + transientRunAttempts = 0 + timeoutCancel() + break + } + if handledEmpty { + mainIterationOffset += segmentMainIterationMax + transientRunAttempts = 0 + timeoutCancel() + baseCtx, cancelWithCause = context.WithCancelCause(context.Background()) + h.tasks.BindTaskCancel(conversationID, cancelWithCause) + taskCtx, timeoutCancel = context.WithTimeout(baseCtx, 600*time.Minute) + h.tasks.UpdateTaskStatus(conversationID, "running") + continue + } + if runErr == nil { // 任一段成功完成后,重置临时错误重试窗口(次数/退避从头开始)。 transientRunAttempts = 0 + emptyResponseAttempts = 0 timeoutCancel() break } @@ -418,21 +442,49 @@ func (h *AgentHandler) EinoSingleAgentLoop(c *gin.Context) { return } - result, runErr := multiagent.RunEinoSingleChatModelAgent( - taskCtx, - h.config, - &h.config.MultiAgent, - h.agent, - h.logger, - prep.ConversationID, - prep.FinalMessage, - prep.History, - prep.RoleTools, - progressCallback, - chatReasoningToClientIntent(req.Reasoning), - h.projectBlackboardBlock(prep.ConversationID), - ) - if runErr != nil { + curHist := prep.History + curMsg := prep.FinalMessage + var result *multiagent.RunResult + var runErr error + var transientRunAttempts int + var emptyResponseAttempts int + for { + result, runErr = multiagent.RunEinoSingleChatModelAgent( + taskCtx, + h.config, + &h.config.MultiAgent, + h.agent, + h.logger, + prep.ConversationID, + curMsg, + curHist, + prep.RoleTools, + progressCallback, + chatReasoningToClientIntent(req.Reasoning), + h.projectBlackboardBlock(prep.ConversationID), + ) + handledEmpty, exhaustedEmpty := h.handleEinoEmptyResponseContinue( + baseCtx, prep.ConversationID, result, runErr, &emptyResponseAttempts, + &curHist, &curMsg, prep.FinalMessage, progressCallback, nil, + ) + if exhaustedEmpty { + runErr = nil + break + } + if handledEmpty { + continue + } + if runErr == nil { + break + } + if handled, fatalErr := h.handleEinoTransientRetryContinue( + baseCtx, prep.ConversationID, result, runErr, &transientRunAttempts, + &curHist, &curMsg, prep.FinalMessage, progressCallback, nil, + ); handled { + continue + } else if fatalErr != nil { + runErr = fatalErr + } if shouldPersistEinoAgentTraceAfterRunError(baseCtx) { h.persistEinoAgentTraceForResume(prep.ConversationID, result) } diff --git a/internal/handler/multi_agent.go b/internal/handler/multi_agent.go index 2e76116a..3561e851 100644 --- a/internal/handler/multi_agent.go +++ b/internal/handler/multi_agent.go @@ -188,6 +188,7 @@ func (h *AgentHandler) MultiAgentLoopStream(c *gin.Context) { // 同一 HTTP 流内多段 Run(如中断并继续)合并 MCP execution id,供最终 response / 库表与工具芯片展示完整列表 var cumulativeMCPExecutionIDs []string var transientRunAttempts int + var emptyResponseAttempts int // 同一请求内分段续跑时,主代理 iteration 事件按偏移累计,避免 UI 出现「第3轮 → 第1轮」回跳。 var mainIterationOffset int @@ -249,9 +250,32 @@ func (h *AgentHandler) MultiAgentLoopStream(c *gin.Context) { cumulativeMCPExecutionIDs = mergeMCPExecutionIDLists(cumulativeMCPExecutionIDs, result.MCPExecutionIDs) } + handledEmpty, exhaustedEmpty := h.handleEinoEmptyResponseContinue( + baseCtx, conversationID, result, runErr, &emptyResponseAttempts, + &curHistory, &curFinalMessage, segmentUserMessage, progressCallback, + func(msg string, extra map[string]interface{}) { sendEvent("progress", msg, extra) }, + ) + if exhaustedEmpty { + runErr = nil + transientRunAttempts = 0 + timeoutCancel() + break + } + if handledEmpty { + mainIterationOffset += segmentMainIterationMax + transientRunAttempts = 0 + timeoutCancel() + baseCtx, cancelWithCause = context.WithCancelCause(context.Background()) + h.tasks.BindTaskCancel(conversationID, cancelWithCause) + taskCtx, timeoutCancel = context.WithTimeout(baseCtx, 600*time.Minute) + h.tasks.UpdateTaskStatus(conversationID, "running") + continue + } + if runErr == nil { // 任一段成功完成后,重置临时错误重试窗口(次数/退避从头开始)。 transientRunAttempts = 0 + emptyResponseAttempts = 0 timeoutCancel() break } @@ -430,23 +454,51 @@ func (h *AgentHandler) MultiAgentLoop(c *gin.Context) { return h.interceptHITLForEinoTool(ctx, cancelWithCause, prep.ConversationID, prep.AssistantMessageID, nil, toolName, arguments) }) - result, runErr := multiagent.RunDeepAgent( - taskCtx, - h.config, - &h.config.MultiAgent, - h.agent, - h.logger, - prep.ConversationID, - prep.FinalMessage, - prep.History, - prep.RoleTools, - progressCallback, - h.agentsMarkdownDir, - strings.TrimSpace(req.Orchestration), - chatReasoningToClientIntent(req.Reasoning), - h.projectBlackboardBlock(prep.ConversationID), - ) - if runErr != nil { + curHist := prep.History + curMsg := prep.FinalMessage + var result *multiagent.RunResult + var runErr error + var transientRunAttempts int + var emptyResponseAttempts int + for { + result, runErr = multiagent.RunDeepAgent( + taskCtx, + h.config, + &h.config.MultiAgent, + h.agent, + h.logger, + prep.ConversationID, + curMsg, + curHist, + prep.RoleTools, + progressCallback, + h.agentsMarkdownDir, + strings.TrimSpace(req.Orchestration), + chatReasoningToClientIntent(req.Reasoning), + h.projectBlackboardBlock(prep.ConversationID), + ) + handledEmpty, exhaustedEmpty := h.handleEinoEmptyResponseContinue( + baseCtx, prep.ConversationID, result, runErr, &emptyResponseAttempts, + &curHist, &curMsg, prep.FinalMessage, progressCallback, nil, + ) + if exhaustedEmpty { + runErr = nil + break + } + if handledEmpty { + continue + } + if runErr == nil { + break + } + if handled, fatalErr := h.handleEinoTransientRetryContinue( + baseCtx, prep.ConversationID, result, runErr, &transientRunAttempts, + &curHist, &curMsg, prep.FinalMessage, progressCallback, nil, + ); handled { + continue + } else if fatalErr != nil { + runErr = fatalErr + } if shouldPersistEinoAgentTraceAfterRunError(baseCtx) { h.persistEinoAgentTraceForResume(prep.ConversationID, result) }