From 218e9b9880b88901a1e056954a4bfd82469b97d2 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E5=85=AC=E6=98=8E?= <83812544+Ed1s0nZ@users.noreply.github.com> Date: Sun, 10 May 2026 21:36:28 +0800 Subject: [PATCH] Add files via upload --- internal/agent/agent.go | 6 + internal/multiagent/eino_adk_run_loop.go | 233 ++++++++++++++--------- internal/multiagent/interrupt.go | 7 + internal/multiagent/runner.go | 107 +++++------ 4 files changed, 208 insertions(+), 145 deletions(-) create mode 100644 internal/multiagent/interrupt.go diff --git a/internal/agent/agent.go b/internal/agent/agent.go index efcbb899..00105209 100644 --- a/internal/agent/agent.go +++ b/internal/agent/agent.go @@ -193,6 +193,8 @@ type ChatMessage struct { Content string `json:"content,omitempty"` ToolCalls []ToolCall `json:"tool_calls,omitempty"` ToolCallID string `json:"tool_call_id,omitempty"` + // ToolName 仅 tool 角色:从 Eino/轨迹 JSON 的 name 或 tool_name 恢复,供续跑构造 ToolMessage。 + ToolName string `json:"tool_name,omitempty"` } // MarshalJSON 自定义JSON序列化,将tool_calls中的arguments转换为JSON字符串 @@ -211,6 +213,9 @@ func (cm ChatMessage) MarshalJSON() ([]byte, error) { if cm.ToolCallID != "" { aux["tool_call_id"] = cm.ToolCallID } + if cm.ToolName != "" { + aux["tool_name"] = cm.ToolName + } // 转换tool_calls,将arguments转换为JSON字符串 if len(cm.ToolCalls) > 0 { @@ -438,6 +443,7 @@ func (a *Agent) AgentLoopWithProgress(ctx context.Context, userInput string, his Content: msg.Content, ToolCalls: msg.ToolCalls, ToolCallID: msg.ToolCallID, + ToolName: msg.ToolName, }) addedCount++ contentPreview := msg.Content diff --git a/internal/multiagent/eino_adk_run_loop.go b/internal/multiagent/eino_adk_run_loop.go index 10e46cc8..5306c02e 100644 --- a/internal/multiagent/eino_adk_run_loop.go +++ b/internal/multiagent/eino_adk_run_loop.go @@ -40,6 +40,13 @@ func normalizeStreamingDelta(current, incoming string) (next, delta string) { return current + incoming, incoming } +func isInterruptContinue(ctx context.Context) bool { + if ctx == nil { + return false + } + return errors.Is(context.Cause(ctx), ErrInterruptContinue) +} + func isEinoIterationLimitError(err error) bool { if err == nil { return false @@ -409,10 +416,18 @@ func runEinoADKAgentLoop(ctx context.Context, args *einoADKRunLoopArgs, baseMsgs case <-ctx.Done(): flushAllPendingAsFailed(ctx.Err()) if progress != nil { - progress("error", "Request cancelled / 请求已取消", map[string]interface{}{ - "conversationId": conversationID, - "source": "eino", - }) + if isInterruptContinue(ctx) { + progress("progress", "已暂停当前输出,正在合并用户补充并继续…", map[string]interface{}{ + "conversationId": conversationID, + "source": "eino", + "kind": "interrupt_continue", + }) + } else { + progress("error", "Request cancelled / 请求已取消", map[string]interface{}{ + "conversationId": conversationID, + "source": "eino", + }) + } } return takePartial(ctx.Err()) default: @@ -426,10 +441,18 @@ func runEinoADKAgentLoop(ctx context.Context, args *einoADKRunLoopArgs, baseMsgs if ctxErr := ctx.Err(); ctxErr != nil { flushAllPendingAsFailed(ctxErr) if progress != nil { - progress("error", ctxErr.Error(), map[string]interface{}{ - "conversationId": conversationID, - "source": "eino", - }) + if isInterruptContinue(ctx) { + progress("progress", "已暂停当前输出,正在合并用户补充并继续…", map[string]interface{}{ + "conversationId": conversationID, + "source": "eino", + "kind": "interrupt_continue", + }) + } else { + progress("error", ctxErr.Error(), map[string]interface{}{ + "conversationId": conversationID, + "source": "eino", + }) + } } return takePartial(ctxErr) } @@ -517,103 +540,128 @@ func runEinoADKAgentLoop(ctx context.Context, args *einoADKRunLoopArgs, baseMsgs var mainAssistDupTarget string // 非空表示本段主助手流需缓冲至 EOF,与 execute 输出比对去重 var reasoningBuf string var streamRecvErr error + type streamMsg struct { + chunk *schema.Message + err error + } + recvCh := make(chan streamMsg, 8) + go func() { + defer close(recvCh) + for { + ch, rerr := mv.MessageStream.Recv() + recvCh <- streamMsg{chunk: ch, err: rerr} + if rerr != nil { + return + } + } + }() + streamRecvLoop: for { - chunk, rerr := mv.MessageStream.Recv() - if rerr != nil { - if errors.Is(rerr, io.EOF) { - break + select { + case <-ctx.Done(): + streamRecvErr = ctx.Err() + break streamRecvLoop + case sm, ok := <-recvCh: + if !ok { + break streamRecvLoop } - if logger != nil { - logger.Warn("eino stream recv error, flushing incomplete stream", - zap.Error(rerr), - zap.String("agent", ev.AgentName), - zap.Int("toolFragments", len(toolStreamFragments))) + chunk, rerr := sm.chunk, sm.err + if rerr != nil { + if errors.Is(rerr, io.EOF) { + break streamRecvLoop + } + if logger != nil { + logger.Warn("eino stream recv error, flushing incomplete stream", + zap.Error(rerr), + zap.String("agent", ev.AgentName), + zap.Int("toolFragments", len(toolStreamFragments))) + } + streamRecvErr = rerr + break streamRecvLoop } - streamRecvErr = rerr - break - } - if chunk == nil { - continue - } - if progress != nil && strings.TrimSpace(chunk.ReasoningContent) != "" { - var reasoningDelta string - reasoningBuf, reasoningDelta = normalizeStreamingDelta(reasoningBuf, chunk.ReasoningContent) - if reasoningDelta != "" { - if reasoningStreamID == "" { - reasoningStreamID = fmt.Sprintf("eino-reasoning-%s-%d", conversationID, atomic.AddInt64(&reasoningStreamSeq, 1)) - progress("thinking_stream_start", " ", map[string]interface{}{ - "streamId": reasoningStreamID, - "source": "eino", - "einoAgent": ev.AgentName, - "einoRole": einoRoleTag(ev.AgentName), - "orchestration": orchMode, + if chunk == nil { + continue + } + if progress != nil && strings.TrimSpace(chunk.ReasoningContent) != "" { + var reasoningDelta string + reasoningBuf, reasoningDelta = normalizeStreamingDelta(reasoningBuf, chunk.ReasoningContent) + if reasoningDelta != "" { + if reasoningStreamID == "" { + reasoningStreamID = fmt.Sprintf("eino-reasoning-%s-%d", conversationID, atomic.AddInt64(&reasoningStreamSeq, 1)) + progress("thinking_stream_start", " ", map[string]interface{}{ + "streamId": reasoningStreamID, + "source": "eino", + "einoAgent": ev.AgentName, + "einoRole": einoRoleTag(ev.AgentName), + "orchestration": orchMode, + }) + } + progress("thinking_stream_delta", reasoningDelta, map[string]interface{}{ + "streamId": reasoningStreamID, }) } - progress("thinking_stream_delta", reasoningDelta, map[string]interface{}{ - "streamId": reasoningStreamID, - }) } - } - if chunk.Content != "" { - if progress != nil && streamsMainAssistant(ev.AgentName) { - var contentDelta string - mainAssistantBuf, contentDelta = normalizeStreamingDelta(mainAssistantBuf, chunk.Content) - if contentDelta != "" { - if mainAssistDupTarget == "" { - executeStdoutDupMu.Lock() - if pendingExecuteStdoutDup != "" { - mainAssistDupTarget = pendingExecuteStdoutDup + if chunk.Content != "" { + if progress != nil && streamsMainAssistant(ev.AgentName) { + var contentDelta string + mainAssistantBuf, contentDelta = normalizeStreamingDelta(mainAssistantBuf, chunk.Content) + if contentDelta != "" { + if mainAssistDupTarget == "" { + executeStdoutDupMu.Lock() + if pendingExecuteStdoutDup != "" { + mainAssistDupTarget = pendingExecuteStdoutDup + } + executeStdoutDupMu.Unlock() } - executeStdoutDupMu.Unlock() - } - if mainAssistDupTarget != "" { - // 已展示过 tool_result,缓冲全文;EOF 后与 execute 输出相同则不再发助手流 - } else { - if !streamHeaderSent { - progress("response_start", "", map[string]interface{}{ - "conversationId": conversationID, - "mcpExecutionIds": snapshotMCPIDs(), - "messageGeneratedBy": "eino:" + ev.AgentName, - "einoRole": "orchestrator", - "einoAgent": ev.AgentName, - "orchestration": orchMode, + if mainAssistDupTarget != "" { + // 已展示过 tool_result,缓冲全文;EOF 后与 execute 输出相同则不再发助手流 + } else { + if !streamHeaderSent { + progress("response_start", "", map[string]interface{}{ + "conversationId": conversationID, + "mcpExecutionIds": snapshotMCPIDs(), + "messageGeneratedBy": "eino:" + ev.AgentName, + "einoRole": "orchestrator", + "einoAgent": ev.AgentName, + "orchestration": orchMode, + }) + streamHeaderSent = true + } + progress("response_delta", contentDelta, map[string]interface{}{ + "conversationId": conversationID, + "mcpExecutionIds": snapshotMCPIDs(), + "einoRole": "orchestrator", + "einoAgent": ev.AgentName, + "orchestration": orchMode, }) - streamHeaderSent = true } - progress("response_delta", contentDelta, map[string]interface{}{ - "conversationId": conversationID, - "mcpExecutionIds": snapshotMCPIDs(), - "einoRole": "orchestrator", - "einoAgent": ev.AgentName, - "orchestration": orchMode, - }) } - } - } else if !streamsMainAssistant(ev.AgentName) { - var subDelta string - subAssistantBuf, subDelta = normalizeStreamingDelta(subAssistantBuf, chunk.Content) - if subDelta != "" { - if progress != nil { - if subReplyStreamID == "" { - subReplyStreamID = fmt.Sprintf("eino-sub-reply-%s-%d", conversationID, atomic.AddInt64(&einoSubReplyStreamSeq, 1)) - progress("eino_agent_reply_stream_start", "", map[string]interface{}{ + } else if !streamsMainAssistant(ev.AgentName) { + var subDelta string + subAssistantBuf, subDelta = normalizeStreamingDelta(subAssistantBuf, chunk.Content) + if subDelta != "" { + if progress != nil { + if subReplyStreamID == "" { + subReplyStreamID = fmt.Sprintf("eino-sub-reply-%s-%d", conversationID, atomic.AddInt64(&einoSubReplyStreamSeq, 1)) + progress("eino_agent_reply_stream_start", "", map[string]interface{}{ + "streamId": subReplyStreamID, + "einoAgent": ev.AgentName, + "einoRole": "sub", + "conversationId": conversationID, + "source": "eino", + }) + } + progress("eino_agent_reply_stream_delta", subDelta, map[string]interface{}{ "streamId": subReplyStreamID, - "einoAgent": ev.AgentName, - "einoRole": "sub", "conversationId": conversationID, - "source": "eino", }) } - progress("eino_agent_reply_stream_delta", subDelta, map[string]interface{}{ - "streamId": subReplyStreamID, - "conversationId": conversationID, - }) } } } - } - if len(chunk.ToolCalls) > 0 { - toolStreamFragments = append(toolStreamFragments, chunk.ToolCalls...) + if len(chunk.ToolCalls) > 0 { + toolStreamFragments = append(toolStreamFragments, chunk.ToolCalls...) + } } } if streamsMainAssistant(ev.AgentName) { @@ -683,10 +731,17 @@ func runEinoADKAgentLoop(ctx context.Context, args *einoADKRunLoopArgs, baseMsgs } var lastToolChunk *schema.Message if merged := mergeStreamingToolCallFragments(toolStreamFragments); len(merged) > 0 { - lastToolChunk = &schema.Message{ToolCalls: merged} + lastToolChunk = mergeMessageToolCalls(&schema.Message{ToolCalls: merged}) } tryEmitToolCallsOnce(lastToolChunk, ev.AgentName, orchestratorName, conversationID, progress, toolEmitSeen, subAgentToolStep, markPending) + // 流式路径此前只把 tool_calls 推给进度 UI,未写入 runAccumulatedMsgs;落库后 loadHistory→RepairOrphan 会删掉全部 tool 结果,表现为「续跑/下轮失忆」。 + if lastToolChunk != nil && len(lastToolChunk.ToolCalls) > 0 { + runAccumulatedMsgs = append(runAccumulatedMsgs, schema.AssistantMessage("", lastToolChunk.ToolCalls)) + } if streamRecvErr != nil { + if isInterruptContinue(ctx) { + return takePartial(streamRecvErr) + } if progress != nil { progress("eino_stream_error", streamRecvErr.Error(), map[string]interface{}{ "conversationId": conversationID, diff --git a/internal/multiagent/interrupt.go b/internal/multiagent/interrupt.go new file mode 100644 index 00000000..500e300f --- /dev/null +++ b/internal/multiagent/interrupt.go @@ -0,0 +1,7 @@ +package multiagent + +import "errors" + +// ErrInterruptContinue 作为 context.CancelCause 使用:用户选择「中断并继续」且当前无进行中的 MCP 工具时, +// 取消当前推理/流式输出,并在同一会话任务内携带用户补充说明自动续跑下一轮(类似 Hermes 式人机回合)。 +var ErrInterruptContinue = errors.New("agent interrupt: continue with user-supplied context") diff --git a/internal/multiagent/runner.go b/internal/multiagent/runner.go index 3483ebeb..13e49b73 100644 --- a/internal/multiagent/runner.go +++ b/internal/multiagent/runner.go @@ -574,78 +574,73 @@ func RunDeepAgent( }, baseMsgs) } +func chatToolCallsToSchema(tcs []agent.ToolCall) []schema.ToolCall { + if len(tcs) == 0 { + return nil + } + out := make([]schema.ToolCall, 0, len(tcs)) + for _, tc := range tcs { + if strings.TrimSpace(tc.ID) == "" { + continue + } + argsStr := "" + if tc.Function.Arguments != nil { + b, err := json.Marshal(tc.Function.Arguments) + if err == nil { + argsStr = string(b) + } + } + typ := tc.Type + if typ == "" { + typ = "function" + } + out = append(out, schema.ToolCall{ + ID: tc.ID, + Type: typ, + Function: schema.FunctionCall{ + Name: tc.Function.Name, + Arguments: argsStr, + }, + }) + } + return out +} + +// historyToMessages 将轨迹恢复的 ChatMessage 转为 Eino ADK 消息:**不裁剪条数、不按 token 预算截断**, +// 并保留 user / assistant(含仅 tool_calls)/ tool,与库中 last_react 轨迹一致。 func historyToMessages(history []agent.ChatMessage, appCfg *config.Config, mwCfg *config.MultiAgentEinoMiddlewareConfig) []adk.Message { + _ = appCfg + _ = mwCfg if len(history) == 0 { return nil } - // Keep a bounded tail first; then enforce a token budget. - const maxHistoryMessages = 200 - start := 0 - if len(history) > maxHistoryMessages { - start = len(history) - maxHistoryMessages - } - raw := make([]adk.Message, 0, len(history[start:])) - for _, h := range history[start:] { - switch h.Role { + raw := make([]adk.Message, 0, len(history)) + for _, h := range history { + role := strings.ToLower(strings.TrimSpace(h.Role)) + switch role { case "user": if strings.TrimSpace(h.Content) != "" { raw = append(raw, schema.UserMessage(h.Content)) } case "assistant": - if strings.TrimSpace(h.Content) == "" && len(h.ToolCalls) > 0 { + toolSchema := chatToolCallsToSchema(h.ToolCalls) + if len(toolSchema) > 0 || strings.TrimSpace(h.Content) != "" { + raw = append(raw, schema.AssistantMessage(h.Content, toolSchema)) + } + case "tool": + if strings.TrimSpace(h.ToolCallID) == "" && strings.TrimSpace(h.Content) == "" { continue } - if strings.TrimSpace(h.Content) != "" { - raw = append(raw, schema.AssistantMessage(h.Content, nil)) + var opts []schema.ToolMessageOption + if tn := strings.TrimSpace(h.ToolName); tn != "" { + opts = append(opts, schema.WithToolName(tn)) } + raw = append(raw, schema.ToolMessage(h.Content, h.ToolCallID, opts...)) default: continue } } - if len(raw) == 0 { - return raw - } - maxTotal := 120000 - modelName := "gpt-4o" - if appCfg != nil { - if appCfg.OpenAI.MaxTotalTokens > 0 { - maxTotal = appCfg.OpenAI.MaxTotalTokens - } - if m := strings.TrimSpace(appCfg.OpenAI.Model); m != "" { - modelName = m - } - } - ratio := 0.35 - if mwCfg != nil { - ratio = mwCfg.HistoryInputBudgetRatioEffective() - } - budget := int(float64(maxTotal) * ratio) - if budget < 4096 { - budget = 4096 - } - tc := agent.NewTikTokenCounter() - outRev := make([]adk.Message, 0, len(raw)) - used := 0 - for i := len(raw) - 1; i >= 0; i-- { - msg := raw[i] - n, err := tc.Count(modelName, string(msg.Role)+"\n"+msg.Content) - if err != nil { - n = (len(msg.Content) + 3) / 4 - } - if n <= 0 { - n = 1 - } - if used+n > budget { - break - } - used += n - outRev = append(outRev, msg) - } - out := make([]adk.Message, 0, len(outRev)) - for i := len(outRev) - 1; i >= 0; i-- { - out = append(out, outRev[i]) - } - return out + return raw } // mergeStreamingToolCallFragments 将流式多帧的 ToolCall 按 index 合并 arguments(与 schema.concatToolCalls 行为一致)。