From e22382aab0ae2041ddc138e470c8c7bfb4f0b054 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E5=85=AC=E6=98=8E?= <83812544+Ed1s0nZ@users.noreply.github.com> Date: Mon, 22 Jun 2026 23:29:57 +0800 Subject: [PATCH] Add files via upload --- internal/handler/agent.go | 80 ++----------- internal/handler/eino_resume_segment.go | 153 ------------------------ internal/handler/eino_single_agent.go | 69 ----------- internal/handler/multi_agent.go | 69 ----------- 4 files changed, 11 insertions(+), 360 deletions(-) diff --git a/internal/handler/agent.go b/internal/handler/agent.go index 88a47a9a..f6e6eab8 100644 --- a/internal/handler/agent.go +++ b/internal/handler/agent.go @@ -631,40 +631,11 @@ func (h *AgentHandler) runRobotEinoSingleWithRetry( assistantMessageID string, taskStatus *string, ) (string, string, error) { - curHist := history - curMsg := finalMessage - segmentUserMessage := finalMessage - var resultMA *multiagent.RunResult - var errMA error - var transientRunAttempts int - var emptyResponseAttempts int - for { - resultMA, errMA = multiagent.RunEinoSingleChatModelAgent( - taskCtx, h.config, &h.config.MultiAgent, h.agent, h.db, h.logger, - conversationID, h.conversationProjectID(conversationID), curMsg, curHist, roleTools, progressCallback, nil, h.projectBlackboardBlock(conversationID), - ) - handledEmpty, exhaustedEmpty := h.handleEinoEmptyResponseContinue( - taskCtx, conversationID, resultMA, errMA, &emptyResponseAttempts, - &curHist, &curMsg, segmentUserMessage, progressCallback, nil, - ) - if exhaustedEmpty { - errMA = nil - break - } - if handledEmpty { - continue - } - if errMA == nil { - transientRunAttempts = 0 - emptyResponseAttempts = 0 - break - } - if handled, _ := h.handleEinoTransientRetryContinue( - taskCtx, conversationID, resultMA, errMA, &transientRunAttempts, - &curHist, &curMsg, segmentUserMessage, progressCallback, nil, - ); handled { - continue - } + resultMA, errMA := multiagent.RunEinoSingleChatModelAgent( + taskCtx, h.config, &h.config.MultiAgent, h.agent, h.db, h.logger, + conversationID, h.conversationProjectID(conversationID), finalMessage, history, roleTools, progressCallback, nil, h.projectBlackboardBlock(conversationID), + ) + if errMA != nil { *taskStatus = "failed" return h.finalizeRobotAgentError(taskCtx, assistantMessageID, conversationID, resultMA, errMA) } @@ -680,41 +651,12 @@ func (h *AgentHandler) runRobotMultiAgentWithRetry( assistantMessageID string, taskStatus *string, ) (string, string, error) { - curHist := history - curMsg := finalMessage - segmentUserMessage := finalMessage - var resultMA *multiagent.RunResult - var errMA error - var transientRunAttempts int - var emptyResponseAttempts int - for { - resultMA, errMA = multiagent.RunDeepAgent( - taskCtx, h.config, &h.config.MultiAgent, h.agent, h.db, h.logger, - conversationID, h.conversationProjectID(conversationID), curMsg, curHist, roleTools, progressCallback, - h.agentsMarkdownDir, orchestration, nil, h.projectBlackboardBlock(conversationID), - ) - handledEmpty, exhaustedEmpty := h.handleEinoEmptyResponseContinue( - taskCtx, conversationID, resultMA, errMA, &emptyResponseAttempts, - &curHist, &curMsg, segmentUserMessage, progressCallback, nil, - ) - if exhaustedEmpty { - errMA = nil - break - } - if handledEmpty { - continue - } - if errMA == nil { - transientRunAttempts = 0 - emptyResponseAttempts = 0 - break - } - if handled, _ := h.handleEinoTransientRetryContinue( - taskCtx, conversationID, resultMA, errMA, &transientRunAttempts, - &curHist, &curMsg, segmentUserMessage, progressCallback, nil, - ); handled { - continue - } + resultMA, errMA := multiagent.RunDeepAgent( + taskCtx, h.config, &h.config.MultiAgent, h.agent, h.db, h.logger, + conversationID, h.conversationProjectID(conversationID), finalMessage, history, roleTools, progressCallback, + h.agentsMarkdownDir, orchestration, nil, h.projectBlackboardBlock(conversationID), + ) + if errMA != nil { *taskStatus = "failed" return h.finalizeRobotAgentError(taskCtx, assistantMessageID, conversationID, resultMA, errMA) } diff --git a/internal/handler/eino_resume_segment.go b/internal/handler/eino_resume_segment.go index dbd26af9..811162a1 100644 --- a/internal/handler/eino_resume_segment.go +++ b/internal/handler/eino_resume_segment.go @@ -2,31 +2,11 @@ package handler import ( "context" - "errors" - "fmt" - "strings" - "time" "cyberstrike-ai/internal/agent" "cyberstrike-ai/internal/multiagent" - - "go.uber.org/zap" ) -func (h *AgentHandler) einoRunRetryMaxAttempts() int { - if h.config != nil { - return multiagent.RunRetryMaxAttemptsFromConfig(&h.config.MultiAgent.EinoMiddleware) - } - return multiagent.RunRetryMaxAttemptsFromConfig(nil) -} - -func (h *AgentHandler) einoRunRetryMaxBackoffSec() int { - if h.config != nil && h.config.MultiAgent.EinoMiddleware.RunRetryMaxBackoffSec > 0 { - return h.config.MultiAgent.EinoMiddleware.RunRetryMaxBackoffSec - } - return 0 -} - // applyEinoTraceResumeSegment 中断并继续:persist last_react_* → loadHistory,可选替换下一段 user 文案。 func (h *AgentHandler) applyEinoTraceResumeSegment( conversationID string, @@ -45,136 +25,3 @@ func (h *AgentHandler) applyEinoTraceResumeSegment( *curFinalMessage = segmentUserMessage } } - -// applyEinoTransientRetrySegment 临时错误重试:恢复轨迹并保留本请求原始 user 文案(不注入续跑说明)。 -// segmentUserMessage 为本轮 HTTP 请求开始时用户发送的内容,避免因清空 finalMessage 而丢失「你好」等短句。 -func (h *AgentHandler) applyEinoTransientRetrySegment( - conversationID string, - result *multiagent.RunResult, - curHistory *[]agent.ChatMessage, - curFinalMessage *string, - segmentUserMessage string, -) { - if shouldPersistEinoAgentTraceAfterRunError(context.Background()) { - h.persistEinoAgentTraceForResume(conversationID, result) - } - if hist, err := h.loadHistoryFromAgentTrace(conversationID); err == nil && len(hist) > 0 { - *curHistory = hist - } - if s := strings.TrimSpace(segmentUserMessage); s != "" { - *curFinalMessage = segmentUserMessage - } -} - -// handleEinoTransientRetryContinue 在 SSE 任务循环内处理临时错误重试;返回 true 表示外层 for 应 continue。 -func (h *AgentHandler) handleEinoTransientRetryContinue( - baseCtx context.Context, - conversationID string, - result *multiagent.RunResult, - runErr error, - transientAttempts *int, - curHistory *[]agent.ChatMessage, - curFinalMessage *string, - segmentUserMessage string, - progressCallback func(eventType, message string, data interface{}), - sendProgress func(msg string, extra map[string]interface{}), -) (handled bool, fatal error) { - if !errors.Is(runErr, multiagent.ErrTransientRetryContinue) { - return false, nil - } - maxAttempts := h.einoRunRetryMaxAttempts() - *transientAttempts++ - if *transientAttempts > maxAttempts { - if shouldPersistEinoAgentTraceAfterRunError(baseCtx) { - h.persistEinoAgentTraceForResume(conversationID, result) - } - return false, errors.New("transient retry exhausted: " + runErr.Error()) - } - attemptNo := *transientAttempts - backoff := multiagent.TransientRetryBackoff(attemptNo-1, h.einoRunRetryMaxBackoffSec()) - if progressCallback != nil { - progressCallback("eino_run_retry", fmt.Sprintf("遇到临时错误,%d 秒后第 %d/%d 次重试…", int(backoff.Seconds()), attemptNo, maxAttempts), map[string]interface{}{ - "conversationId": conversationID, - "source": "eino", - "attempt": attemptNo, - "maxAttempts": maxAttempts, - "backoffSec": int(backoff.Seconds()), - }) - } - select { - case <-baseCtx.Done(): - return false, context.Cause(baseCtx) - case <-time.After(backoff): - } - h.applyEinoTransientRetrySegment(conversationID, result, curHistory, curFinalMessage, segmentUserMessage) - if progressCallback != nil { - progressCallback("eino_run_retry", "已恢复上下文,正在重试…", map[string]interface{}{ - "conversationId": conversationID, - "source": "eino", - "attempt": attemptNo, - }) - } - if sendProgress != nil { - sendProgress("正在重试…", map[string]interface{}{ - "conversationId": conversationID, - "source": "transient_retry", - }) - } - return true, nil -} - -// handleEinoEmptyResponseContinue 在 SSE 任务循环内处理「正常结束但无助手正文」;返回 exhausted=true 时由外层按成功结束(保留占位文案)。 -// 与临时错误重试一致:仅恢复轨迹并保留本请求原始 user 文案,不向模型注入续跑说明。 -func (h *AgentHandler) handleEinoEmptyResponseContinue( - baseCtx context.Context, - conversationID string, - result *multiagent.RunResult, - runErr error, - emptyResponseAttempts *int, - curHistory *[]agent.ChatMessage, - curFinalMessage *string, - segmentUserMessage string, - progressCallback func(eventType, message string, data interface{}), - sendProgress func(msg string, extra map[string]interface{}), -) (handled bool, exhausted bool) { - if !errors.Is(runErr, multiagent.ErrEmptyResponseContinue) { - return false, false - } - maxAttempts := h.einoRunRetryMaxAttempts() - *emptyResponseAttempts++ - if *emptyResponseAttempts > maxAttempts { - if h.logger != nil { - h.logger.Warn("eino empty response auto resume exhausted", - zap.String("conversationId", conversationID), - zap.Int("maxAttempts", maxAttempts)) - } - if shouldPersistEinoAgentTraceAfterRunError(baseCtx) { - h.persistEinoAgentTraceForResume(conversationID, result) - } - return false, true - } - attemptNo := *emptyResponseAttempts - if h.logger != nil { - h.logger.Info("eino empty response, auto resume from trace", - zap.String("conversationId", conversationID), - zap.Int("attempt", attemptNo), - zap.Int("maxAttempts", maxAttempts)) - } - if progressCallback != nil { - progressCallback("eino_empty_response_continue", fmt.Sprintf("未捕获到助手正文,正在基于轨迹自动续跑(%d/%d)…", attemptNo, maxAttempts), map[string]interface{}{ - "conversationId": conversationID, - "source": "eino", - "attempt": attemptNo, - "maxAttempts": maxAttempts, - "resumeKind": "trace_segment", - }) - } - h.applyEinoTransientRetrySegment(conversationID, result, curHistory, curFinalMessage, segmentUserMessage) - if sendProgress != nil { - sendProgress("已恢复上下文,正在继续推理…", map[string]interface{}{ - "conversationId": conversationID, - "source": "empty_response_continue", - }) - } - return true, false -} diff --git a/internal/handler/eino_single_agent.go b/internal/handler/eino_single_agent.go index 87bb9fb0..a685676a 100644 --- a/internal/handler/eino_single_agent.go +++ b/internal/handler/eino_single_agent.go @@ -119,7 +119,6 @@ func (h *AgentHandler) EinoSingleAgentLoopStream(c *gin.Context) { var cancelWithCause context.CancelCauseFunc curFinalMessage := prep.FinalMessage - segmentUserMessage := prep.FinalMessage // 本请求原始用户句,临时重试时不得丢失 curHistory := prep.History roleTools := prep.RoleTools @@ -177,8 +176,6 @@ func (h *AgentHandler) EinoSingleAgentLoopStream(c *gin.Context) { taskOwned = true var cumulativeMCPExecutionIDs []string - var transientRunAttempts int - var emptyResponseAttempts int // 同一请求内分段续跑时,主代理 iteration 事件按偏移累计,避免 UI 出现「第3轮 → 第1轮」回跳。 var mainIterationOffset int @@ -240,54 +237,11 @@ func (h *AgentHandler) EinoSingleAgentLoopStream(c *gin.Context) { cumulativeMCPExecutionIDs = mergeMCPExecutionIDLists(cumulativeMCPExecutionIDs, result.MCPExecutionIDs) } - handledEmpty, exhaustedEmpty := h.handleEinoEmptyResponseContinue( - baseCtx, conversationID, result, runErr, &emptyResponseAttempts, - &curHistory, &curFinalMessage, segmentUserMessage, progressCallback, - func(msg string, extra map[string]interface{}) { sendEvent("progress", msg, extra) }, - ) - if exhaustedEmpty { - runErr = nil - transientRunAttempts = 0 - timeoutCancel() - break - } - if handledEmpty { - mainIterationOffset += segmentMainIterationMax - transientRunAttempts = 0 - timeoutCancel() - baseCtx, cancelWithCause = context.WithCancelCause(context.Background()) - h.tasks.BindTaskCancel(conversationID, cancelWithCause) - taskCtx, timeoutCancel = context.WithTimeout(baseCtx, 600*time.Minute) - h.tasks.UpdateTaskStatus(conversationID, "running") - continue - } - if runErr == nil { - // 任一段成功完成后,重置临时错误重试窗口(次数/退避从头开始)。 - transientRunAttempts = 0 - emptyResponseAttempts = 0 timeoutCancel() break } - handled, fatalErr := h.handleEinoTransientRetryContinue( - baseCtx, conversationID, result, runErr, &transientRunAttempts, - &curHistory, &curFinalMessage, segmentUserMessage, progressCallback, - func(msg string, extra map[string]interface{}) { sendEvent("progress", msg, extra) }, - ) - if handled { - mainIterationOffset += segmentMainIterationMax - timeoutCancel() - baseCtx, cancelWithCause = context.WithCancelCause(context.Background()) - h.tasks.BindTaskCancel(conversationID, cancelWithCause) - taskCtx, timeoutCancel = context.WithTimeout(baseCtx, 600*time.Minute) - h.tasks.UpdateTaskStatus(conversationID, "running") - continue - } - if fatalErr != nil { - runErr = fatalErr - } - cause := context.Cause(baseCtx) if errors.Is(cause, multiagent.ErrInterruptContinue) { if shouldPersistEinoAgentTraceAfterRunError(baseCtx) { @@ -312,8 +266,6 @@ func (h *AgentHandler) EinoSingleAgentLoopStream(c *gin.Context) { "source": "interrupt_continue", }) mainIterationOffset += segmentMainIterationMax - // 非临时错误分段续跑(用户中断并继续)时,清空 transient 计数,避免跨分段累加。 - transientRunAttempts = 0 timeoutCancel() baseCtx, cancelWithCause = context.WithCancelCause(context.Background()) h.tasks.BindTaskCancel(conversationID, cancelWithCause) @@ -448,8 +400,6 @@ func (h *AgentHandler) EinoSingleAgentLoop(c *gin.Context) { curMsg := prep.FinalMessage var result *multiagent.RunResult var runErr error - var transientRunAttempts int - var emptyResponseAttempts int for { result, runErr = multiagent.RunEinoSingleChatModelAgent( taskCtx, @@ -467,28 +417,9 @@ func (h *AgentHandler) EinoSingleAgentLoop(c *gin.Context) { chatReasoningToClientIntent(req.Reasoning), h.projectBlackboardBlock(prep.ConversationID), ) - handledEmpty, exhaustedEmpty := h.handleEinoEmptyResponseContinue( - baseCtx, prep.ConversationID, result, runErr, &emptyResponseAttempts, - &curHist, &curMsg, prep.FinalMessage, progressCallback, nil, - ) - if exhaustedEmpty { - runErr = nil - break - } - if handledEmpty { - continue - } if runErr == nil { break } - if handled, fatalErr := h.handleEinoTransientRetryContinue( - baseCtx, prep.ConversationID, result, runErr, &transientRunAttempts, - &curHist, &curMsg, prep.FinalMessage, progressCallback, nil, - ); handled { - continue - } else if fatalErr != nil { - runErr = fatalErr - } if shouldPersistEinoAgentTraceAfterRunError(baseCtx) { h.persistEinoAgentTraceForResume(prep.ConversationID, result) } diff --git a/internal/handler/multi_agent.go b/internal/handler/multi_agent.go index 152bae7b..ab830918 100644 --- a/internal/handler/multi_agent.go +++ b/internal/handler/multi_agent.go @@ -136,7 +136,6 @@ func (h *AgentHandler) MultiAgentLoopStream(c *gin.Context) { var cancelWithCause context.CancelCauseFunc curFinalMessage := prep.FinalMessage - segmentUserMessage := prep.FinalMessage // 本请求原始用户句,临时重试时不得丢失 curHistory := prep.History roleTools := prep.RoleTools orch := strings.TrimSpace(req.Orchestration) @@ -187,8 +186,6 @@ func (h *AgentHandler) MultiAgentLoopStream(c *gin.Context) { // 同一 HTTP 流内多段 Run(如中断并继续)合并 MCP execution id,供最终 response / 库表与工具芯片展示完整列表 var cumulativeMCPExecutionIDs []string - var transientRunAttempts int - var emptyResponseAttempts int // 同一请求内分段续跑时,主代理 iteration 事件按偏移累计,避免 UI 出现「第3轮 → 第1轮」回跳。 var mainIterationOffset int @@ -252,54 +249,11 @@ func (h *AgentHandler) MultiAgentLoopStream(c *gin.Context) { cumulativeMCPExecutionIDs = mergeMCPExecutionIDLists(cumulativeMCPExecutionIDs, result.MCPExecutionIDs) } - handledEmpty, exhaustedEmpty := h.handleEinoEmptyResponseContinue( - baseCtx, conversationID, result, runErr, &emptyResponseAttempts, - &curHistory, &curFinalMessage, segmentUserMessage, progressCallback, - func(msg string, extra map[string]interface{}) { sendEvent("progress", msg, extra) }, - ) - if exhaustedEmpty { - runErr = nil - transientRunAttempts = 0 - timeoutCancel() - break - } - if handledEmpty { - mainIterationOffset += segmentMainIterationMax - transientRunAttempts = 0 - timeoutCancel() - baseCtx, cancelWithCause = context.WithCancelCause(context.Background()) - h.tasks.BindTaskCancel(conversationID, cancelWithCause) - taskCtx, timeoutCancel = context.WithTimeout(baseCtx, 600*time.Minute) - h.tasks.UpdateTaskStatus(conversationID, "running") - continue - } - if runErr == nil { - // 任一段成功完成后,重置临时错误重试窗口(次数/退避从头开始)。 - transientRunAttempts = 0 - emptyResponseAttempts = 0 timeoutCancel() break } - handled, fatalErr := h.handleEinoTransientRetryContinue( - baseCtx, conversationID, result, runErr, &transientRunAttempts, - &curHistory, &curFinalMessage, segmentUserMessage, progressCallback, - func(msg string, extra map[string]interface{}) { sendEvent("progress", msg, extra) }, - ) - if handled { - mainIterationOffset += segmentMainIterationMax - timeoutCancel() - baseCtx, cancelWithCause = context.WithCancelCause(context.Background()) - h.tasks.BindTaskCancel(conversationID, cancelWithCause) - taskCtx, timeoutCancel = context.WithTimeout(baseCtx, 600*time.Minute) - h.tasks.UpdateTaskStatus(conversationID, "running") - continue - } - if fatalErr != nil { - runErr = fatalErr - } - cause := context.Cause(baseCtx) if errors.Is(cause, multiagent.ErrInterruptContinue) { if shouldPersistEinoAgentTraceAfterRunError(baseCtx) { @@ -324,8 +278,6 @@ func (h *AgentHandler) MultiAgentLoopStream(c *gin.Context) { "source": "interrupt_continue", }) mainIterationOffset += segmentMainIterationMax - // 非临时错误分段续跑(用户中断并继续)时,清空 transient 计数,避免跨分段累加。 - transientRunAttempts = 0 timeoutCancel() baseCtx, cancelWithCause = context.WithCancelCause(context.Background()) h.tasks.BindTaskCancel(conversationID, cancelWithCause) @@ -460,8 +412,6 @@ func (h *AgentHandler) MultiAgentLoop(c *gin.Context) { curMsg := prep.FinalMessage var result *multiagent.RunResult var runErr error - var transientRunAttempts int - var emptyResponseAttempts int for { result, runErr = multiagent.RunDeepAgent( taskCtx, @@ -481,28 +431,9 @@ func (h *AgentHandler) MultiAgentLoop(c *gin.Context) { chatReasoningToClientIntent(req.Reasoning), h.projectBlackboardBlock(prep.ConversationID), ) - handledEmpty, exhaustedEmpty := h.handleEinoEmptyResponseContinue( - baseCtx, prep.ConversationID, result, runErr, &emptyResponseAttempts, - &curHist, &curMsg, prep.FinalMessage, progressCallback, nil, - ) - if exhaustedEmpty { - runErr = nil - break - } - if handledEmpty { - continue - } if runErr == nil { break } - if handled, fatalErr := h.handleEinoTransientRetryContinue( - baseCtx, prep.ConversationID, result, runErr, &transientRunAttempts, - &curHist, &curMsg, prep.FinalMessage, progressCallback, nil, - ); handled { - continue - } else if fatalErr != nil { - runErr = fatalErr - } if shouldPersistEinoAgentTraceAfterRunError(baseCtx) { h.persistEinoAgentTraceForResume(prep.ConversationID, result) }