From cb4900c61d6ed312bf0fa1b792941dac71a4f6f3 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E5=85=AC=E6=98=8E?= <83812544+Ed1s0nZ@users.noreply.github.com> Date: Mon, 29 Jun 2026 16:51:54 +0800 Subject: [PATCH] Add files via upload --- internal/config/config.go | 2 + .../handler/eino_empty_response_continue.go | 83 +++++++++++++++++++ internal/handler/eino_single_agent.go | 8 ++ internal/handler/multi_agent.go | 8 ++ 4 files changed, 101 insertions(+) create mode 100644 internal/handler/eino_empty_response_continue.go diff --git a/internal/config/config.go b/internal/config/config.go index e4ca310b..6ed9a118 100644 --- a/internal/config/config.go +++ b/internal/config/config.go @@ -283,6 +283,8 @@ type MultiAgentEinoMiddlewareConfig struct { RunRetryMaxAttempts int `yaml:"run_retry_max_attempts,omitempty" json:"run_retry_max_attempts,omitempty"` // RunRetryMaxBackoffSec 单次退避上限秒数;0=默认 30。 RunRetryMaxBackoffSec int `yaml:"run_retry_max_backoff_sec,omitempty" json:"run_retry_max_backoff_sec,omitempty"` + // EmptyResponseContinueMaxAttempts Run 成功但未捕获助手正文时 Handler 层退避续跑次数;0=默认 5。 + EmptyResponseContinueMaxAttempts int `yaml:"empty_response_continue_max_attempts,omitempty" json:"empty_response_continue_max_attempts,omitempty"` // TaskToolDescriptionPrefix when non-empty sets deep.Config TaskToolDescriptionGenerator (sub-agent names appended). TaskToolDescriptionPrefix string `yaml:"task_tool_description_prefix,omitempty" json:"task_tool_description_prefix,omitempty"` } diff --git a/internal/handler/eino_empty_response_continue.go b/internal/handler/eino_empty_response_continue.go new file mode 100644 index 00000000..f63e3b1c --- /dev/null +++ b/internal/handler/eino_empty_response_continue.go @@ -0,0 +1,83 @@ +package handler + +import ( + "context" + "fmt" + "time" + + "cyberstrike-ai/internal/agent" + "cyberstrike-ai/internal/config" + "cyberstrike-ai/internal/multiagent" + + "go.uber.org/zap" +) + +// rebindEinoRunningTask 中断并继续 / 空正文续跑:重建 cancel 链与超时 ctx,保持任务 running。 +func (h *AgentHandler) rebindEinoRunningTask(conversationID string, timeoutCancel context.CancelFunc) (context.Context, context.CancelCauseFunc, context.Context, context.CancelFunc) { + if timeoutCancel != nil { + timeoutCancel() + } + baseCtx, cancelWithCause := context.WithCancelCause(context.Background()) + h.tasks.BindTaskCancel(conversationID, cancelWithCause) + taskCtx, newTimeoutCancel := context.WithTimeout(baseCtx, 600*time.Minute) + h.tasks.UpdateTaskStatus(conversationID, "running") + return baseCtx, cancelWithCause, taskCtx, newTimeoutCancel +} + +// tryContinueOnEinoEmptyResponse Run 成功但 Response 为 emptyHint 时退避续跑;true 表示已准备下一段 Run。 +func (h *AgentHandler) tryContinueOnEinoEmptyResponse( + taskCtx context.Context, + mw *config.MultiAgentEinoMiddlewareConfig, + conversationID string, + result *multiagent.RunResult, + attempt *int, + curHistory *[]agent.ChatMessage, + curFinalMessage *string, + progressCallback func(eventType, message string, data interface{}), +) bool { + if result == nil || !multiagent.IsEinoEmptyResponseResult(result) || !multiagent.HasEinoResumeTrace(result) { + return false + } + maxAttempts := multiagent.EmptyResponseContinueMaxAttemptsFromConfig(mw) + if *attempt >= maxAttempts { + if h.logger != nil { + h.logger.Warn("eino empty response continue exhausted", + zap.String("conversationId", conversationID), + zap.Int("maxAttempts", maxAttempts)) + } + return false + } + *attempt++ + h.persistEinoAgentTraceForResume(conversationID, result) + + backoff := multiagent.EmptyResponseContinueBackoff(*attempt-1, mw) + waitMsg := fmt.Sprintf("会话已结束但未捕获到助手正文,%d 秒后第 %d/%d 次自动续跑…", + int(backoff.Seconds()), *attempt, maxAttempts) + if progressCallback != nil { + progressCallback("eino_empty_response_continue", waitMsg, map[string]interface{}{ + "conversationId": conversationID, + "source": "eino", + "attempt": *attempt, + "maxAttempts": maxAttempts, + "backoffSec": int(backoff.Seconds()), + }) + } + select { + case <-taskCtx.Done(): + return false + case <-time.After(backoff): + } + + inject := multiagent.FormatEmptyResponseContinueUserMessage() + h.applyEinoTraceResumeSegment(conversationID, result, curHistory, curFinalMessage, inject) + if progressCallback != nil { + progressCallback("eino_empty_response_continue", "已恢复上下文,正在续跑…", map[string]interface{}{ + "conversationId": conversationID, + "source": "eino", + "attempt": *attempt, + "maxAttempts": maxAttempts, + "contextSource": "empty_response_continue", + }) + } + return true +} diff --git a/internal/handler/eino_single_agent.go b/internal/handler/eino_single_agent.go index 54b8e978..19b697be 100644 --- a/internal/handler/eino_single_agent.go +++ b/internal/handler/eino_single_agent.go @@ -178,6 +178,7 @@ func (h *AgentHandler) EinoSingleAgentLoopStream(c *gin.Context) { var cumulativeMCPExecutionIDs []string // 同一请求内分段续跑时,主代理 iteration 事件按偏移累计,避免 UI 出现「第3轮 → 第1轮」回跳。 var mainIterationOffset int + var emptyResponseContinueAttempt int for { segmentMainIterationMax := 0 @@ -239,6 +240,13 @@ func (h *AgentHandler) EinoSingleAgentLoopStream(c *gin.Context) { } if runErr == nil { + mw := &h.config.MultiAgent.EinoMiddleware + if h.tryContinueOnEinoEmptyResponse(taskCtx, mw, conversationID, result, &emptyResponseContinueAttempt, &curHistory, &curFinalMessage, progressCallback) { + mainIterationOffset += segmentMainIterationMax + timeoutCancel() + baseCtx, cancelWithCause, taskCtx, timeoutCancel = h.rebindEinoRunningTask(conversationID, timeoutCancel) + continue + } timeoutCancel() break } diff --git a/internal/handler/multi_agent.go b/internal/handler/multi_agent.go index cbf97ef8..1c7828ac 100644 --- a/internal/handler/multi_agent.go +++ b/internal/handler/multi_agent.go @@ -188,6 +188,7 @@ func (h *AgentHandler) MultiAgentLoopStream(c *gin.Context) { var cumulativeMCPExecutionIDs []string // 同一请求内分段续跑时,主代理 iteration 事件按偏移累计,避免 UI 出现「第3轮 → 第1轮」回跳。 var mainIterationOffset int + var emptyResponseContinueAttempt int for { segmentMainIterationMax := 0 @@ -251,6 +252,13 @@ func (h *AgentHandler) MultiAgentLoopStream(c *gin.Context) { } if runErr == nil { + mw := &h.config.MultiAgent.EinoMiddleware + if h.tryContinueOnEinoEmptyResponse(taskCtx, mw, conversationID, result, &emptyResponseContinueAttempt, &curHistory, &curFinalMessage, progressCallback) { + mainIterationOffset += segmentMainIterationMax + timeoutCancel() + baseCtx, cancelWithCause, taskCtx, timeoutCancel = h.rebindEinoRunningTask(conversationID, timeoutCancel) + continue + } timeoutCancel() break }