From 2b38595b420eff5fe16e5b75f63a47303b4ec3b4 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E5=85=AC=E6=98=8E?= <83812544+Ed1s0nZ@users.noreply.github.com> Date: Fri, 29 May 2026 10:54:39 +0800 Subject: [PATCH] Add files via upload --- web/static/js/chat.js | 2 + web/static/js/monitor.js | 87 ++++++++++++++++++++++++++++++++++++---- 2 files changed, 81 insertions(+), 8 deletions(-) diff --git a/web/static/js/chat.js b/web/static/js/chat.js index 18c2d39c..44d36823 100644 --- a/web/static/js/chat.js +++ b/web/static/js/chat.js @@ -995,6 +995,8 @@ async function sendMessage() { } } } + // Flush decoder internal buffer to avoid losing the final partial UTF-8 code point. + buffer += decoder.decode(); // 处理剩余的buffer if (buffer.trim()) { diff --git a/web/static/js/monitor.js b/web/static/js/monitor.js index 8eb60c85..97cd8911 100644 --- a/web/static/js/monitor.js +++ b/web/static/js/monitor.js @@ -238,6 +238,8 @@ function finalizeOutstandingToolCallsForProgress(progressId, finalStatus) { // 模型流式输出缓存:progressId -> { assistantId, buffer } const responseStreamStateByProgressId = new Map(); +// 主通道当前迭代轮次缓存:progressId -> { iteration, orchestration } +const mainIterationStateByProgressId = new Map(); /** 同一段主通道流式输出(Eino 可能重复 response_start) */ function sameMainResponseStreamMeta(a, b) { @@ -250,6 +252,40 @@ function sameMainResponseStreamMeta(a, b) { return orchA === orchB; } +function resolveMainIterationTag(progressId, responseData) { + const d = responseData || {}; + if (d.iteration != null) { + return String(d.iteration); + } + const cached = mainIterationStateByProgressId.get(String(progressId)); + if (!cached || cached.iteration == null) { + return ''; + } + const cachedOrch = String(cached.orchestration != null ? cached.orchestration : '').trim(); + const streamOrch = String(d.orchestration != null ? d.orchestration : '').trim(); + if (cachedOrch && streamOrch && cachedOrch !== streamOrch) { + return ''; + } + return String(cached.iteration); +} + +function buildMainResponseStreamIdentity(progressId, responseData) { + const d = responseData || {}; + const agent = String(d.einoAgent != null ? d.einoAgent : '').trim(); + const orch = String(d.orchestration != null ? d.orchestration : '').trim(); + const iterTag = resolveMainIterationTag(progressId, d); + return agent + '|' + orch + '|iter=' + iterTag; +} + +function extractIterationTagFromStreamIdentity(identity) { + const s = String(identity || ''); + const idx = s.lastIndexOf('|iter='); + if (idx < 0) { + return ''; + } + return s.slice(idx + 6); +} + // AI 思考流式输出:progressId -> Map(streamId -> { itemId, buffer }) const thinkingStreamStateByProgressId = new Map(); @@ -1372,6 +1408,13 @@ function handleStreamEvent(event, progressElement, progressId, case 'iteration': { const d = event.data || {}; const n = d.iteration != null ? d.iteration : 1; + const scope = d.einoScope != null ? String(d.einoScope).trim() : ''; + if (scope !== 'sub') { + mainIterationStateByProgressId.set(String(progressId), { + iteration: n, + orchestration: d.orchestration != null ? d.orchestration : '' + }); + } let iterTitle; if (d.orchestration === 'plan_execute' && d.einoScope === 'main') { const phase = translatePlanExecuteAgentName(d.einoAgent != null ? d.einoAgent : ''); @@ -1939,6 +1982,8 @@ function handleStreamEvent(event, progressElement, progressId, const responseOriginalConversationId = responseTaskState?.conversationId; const responseData = event.data || {}; + const streamIdentity = buildMainResponseStreamIdentity(progressId, responseData); + const streamIterTag = extractIterationTagFromStreamIdentity(streamIdentity); const mcpIds = responseData.mcpExecutionIds || []; setMcpIds(mergeMcpExecutionIDLists(typeof getMcpIds === 'function' ? (getMcpIds() || []) : [], mcpIds)); @@ -1958,25 +2003,33 @@ function handleStreamEvent(event, progressElement, progressId, // 多代理模式下,迭代过程中的输出只显示在时间线中,不创建助手消息气泡 const prevStream = responseStreamStateByProgressId.get(progressId); - if (prevStream && prevStream.itemId && sameMainResponseStreamMeta(prevStream.streamMeta, responseData)) { + const prevIterTag = extractIterationTagFromStreamIdentity(prevStream && prevStream.streamIdentity ? prevStream.streamIdentity : ''); + const compatibleIterTag = !prevIterTag || !streamIterTag || prevIterTag === streamIterTag; + if ( + prevStream && + prevStream.itemId && + sameMainResponseStreamMeta(prevStream.streamMeta, responseData) && + compatibleIterTag + ) { // Eino 可能对同一段流重复发 response_start;复用已有条目与 buffer,避免多条「助手输出」 prevStream.streamMeta = Object.assign({}, prevStream.streamMeta || {}, responseData); + // 若此前轮次未知(空),在后续事件带来轮次后升级 identity,避免跨轮误复用。 + prevStream.streamIdentity = streamIdentity; responseStreamStateByProgressId.set(progressId, prevStream); break; } - if (prevStream && prevStream.itemId) { - const oldItem = document.getElementById(prevStream.itemId); - if (oldItem && oldItem.parentNode) { - oldItem.parentNode.removeChild(oldItem); - } - } const title = einoMainStreamPlanningTitle(responseData); const itemId = addTimelineItem(timeline, 'thinking', { title: title, message: ' ', data: Object.assign({}, responseData, { responseStreamPlaceholder: true }) }); - responseStreamStateByProgressId.set(progressId, { itemId: itemId, buffer: '', streamMeta: responseData }); + responseStreamStateByProgressId.set(progressId, { + itemId: itemId, + buffer: '', + streamMeta: responseData, + streamIdentity: streamIdentity + }); break; } @@ -2145,11 +2198,13 @@ function handleStreamEvent(event, progressElement, progressId, loadActiveTasks(); // Close any remaining running tool calls for this progress. finalizeOutstandingToolCallsForProgress(progressId, 'failed'); + mainIterationStateByProgressId.delete(String(progressId)); break; case 'done': // 清理流式输出状态 responseStreamStateByProgressId.delete(progressId); + mainIterationStateByProgressId.delete(String(progressId)); thinkingStreamStateByProgressId.delete(progressId); einoAgentReplyStreamStateByProgressId.delete(progressId); // 清理工具流式输出占位 @@ -2570,6 +2625,22 @@ async function attachRunningTaskEventStream(conversationId) { } } } + // Flush decoder internal buffer to avoid dropping trailing partial UTF-8 bytes. + buffer += decoder.decode(); + if (buffer.trim()) { + const lines = buffer.split('\n'); + for (let li = 0; li < lines.length; li++) { + const line = lines[li]; + if (line.indexOf('data: ') === 0) { + try { + const eventData = JSON.parse(line.slice(6)); + handleStreamEvent(eventData, null, progressId, getAssistantIdFn, setAssistantIdFn, function () { return mcpIds; }, function (ids) { mcpIds = mergeMcpExecutionIDLists(mcpIds, ids || []); }); + } catch (e) { + console.error('task-events parse', e); + } + } + } + } if (window.csTaskReplay && window.csTaskReplay.progressId === progressId) { clearCsTaskReplay(); }