diff --git a/web/static/js/chat.js b/web/static/js/chat.js index 2b29e605..3797d91c 100644 --- a/web/static/js/chat.js +++ b/web/static/js/chat.js @@ -765,50 +765,59 @@ async function sendMessage() { if (!response.ok) { throw new Error('请求失败: ' + response.status); } - - const reader = response.body.getReader(); - const decoder = new TextDecoder(); - let buffer = ''; - - while (true) { - const { done, value } = await reader.read(); - if (done) break; - - buffer += decoder.decode(value, { stream: true }); - const lines = buffer.split('\n'); - buffer = lines.pop(); // 保留最后一个不完整的行 - - for (const line of lines) { - if (line.startsWith('data: ')) { - try { - const eventData = JSON.parse(line.slice(6)); - handleStreamEvent(eventData, progressElement, progressId, - () => assistantMessageId, (id) => { assistantMessageId = id; }, - () => mcpExecutionIds, (ids) => { mcpExecutionIds = ids; }); - } catch (e) { - console.error('解析事件数据失败:', e, line); + + window.__csAgentLiveStream = { + active: true, + conversationId: currentConversationId || null, + progressId: progressId + }; + try { + const reader = response.body.getReader(); + const decoder = new TextDecoder(); + let buffer = ''; + + while (true) { + const { done, value } = await reader.read(); + if (done) break; + + buffer += decoder.decode(value, { stream: true }); + const lines = buffer.split('\n'); + buffer = lines.pop(); // 保留最后一个不完整的行 + + for (const line of lines) { + if (line.startsWith('data: ')) { + try { + const eventData = JSON.parse(line.slice(6)); + handleStreamEvent(eventData, progressElement, progressId, + () => assistantMessageId, (id) => { assistantMessageId = id; }, + () => mcpExecutionIds, (ids) => { mcpExecutionIds = ids; }); + } catch (e) { + console.error('解析事件数据失败:', e, line); + } } } } - } - - // 处理剩余的buffer - if (buffer.trim()) { - const lines = buffer.split('\n'); - for (const line of lines) { - if (line.startsWith('data: ')) { - try { - const eventData = JSON.parse(line.slice(6)); - handleStreamEvent(eventData, progressElement, progressId, - () => assistantMessageId, (id) => { assistantMessageId = id; }, - () => mcpExecutionIds, (ids) => { mcpExecutionIds = ids; }); - } catch (e) { - console.error('解析事件数据失败:', e, line); + + // 处理剩余的buffer + if (buffer.trim()) { + const lines = buffer.split('\n'); + for (const line of lines) { + if (line.startsWith('data: ')) { + try { + const eventData = JSON.parse(line.slice(6)); + handleStreamEvent(eventData, progressElement, progressId, + () => assistantMessageId, (id) => { assistantMessageId = id; }, + () => mcpExecutionIds, (ids) => { mcpExecutionIds = ids; }); + } catch (e) { + console.error('解析事件数据失败:', e, line); + } } } } + } finally { + window.__csAgentLiveStream = { active: false, conversationId: null, progressId: null }; } - + // 消息发送成功后,再次确保草稿被清除 clearChatDraft(); try { @@ -2922,6 +2931,22 @@ async function loadConversation(conversationId) { await window.restoreHitlInlineForConversation(conversationId); } } + + // 页面刷新后主流式连接会中断;若该会话仍在后端运行,自动挂载 task-events 补流继续更新前端迭代进度。 + const skipReplay = typeof window.shouldSkipTaskEventReplayAttach === 'function' + && window.shouldSkipTaskEventReplayAttach(conversationId); + if ( + seq === loadConversationRequestSeq && + currentConversationId === conversationId && + typeof window.attachRunningTaskEventStream === 'function' && + !skipReplay + ) { + Promise.resolve() + .then(() => window.attachRunningTaskEventStream(conversationId)) + .catch((e) => { + console.warn('attachRunningTaskEventStream on loadConversation failed', e); + }); + } } catch (error) { console.error('加载对话失败:', error); alert('加载对话失败: ' + error.message); diff --git a/web/static/js/monitor.js b/web/static/js/monitor.js index d791780b..5d3e102b 100644 --- a/web/static/js/monitor.js +++ b/web/static/js/monitor.js @@ -3,6 +3,36 @@ let activeTaskInterval = null; const ACTIVE_TASK_REFRESH_INTERVAL = 10000; // 10秒检查一次 const TASK_FINAL_STATUSES = new Set(['failed', 'timeout', 'cancelled', 'completed']); +/** + * 主对话 POST 流仍在读取时,禁止再挂 task-events 补流,否则同一事件会画两遍(与 HITL 是否开启无关)。 + * window.__csAgentLiveStream 由 chat.js sendMessage 在读到 body 后设置,在 finally 中清除。 + */ +function syncAgentLiveStreamConversationId(cid) { + if (!cid) return; + try { + const live = window.__csAgentLiveStream; + if (live && live.active) { + live.conversationId = cid; + } + } catch (e) { /* ignore */ } +} + +function shouldSkipTaskEventReplayAttach(conversationId) { + try { + const live = window.__csAgentLiveStream; + if (!live || !live.active || !live.progressId) return false; + if (!document.getElementById(live.progressId)) return false; + // 新会话:conversation 事件尚未到达前 conversationId 可能仍为 null,一律不补挂 + if (live.conversationId == null) return true; + return live.conversationId === conversationId; + } catch (e) { + return false; + } +} +if (typeof window !== 'undefined') { + window.shouldSkipTaskEventReplayAttach = shouldSkipTaskEventReplayAttach; +} + // 当前界面语言对应的 BCP 47 标签(与时间格式化一致) function getCurrentTimeLocale() { if (typeof window.__locale === 'string' && window.__locale.length) { @@ -934,6 +964,7 @@ function handleStreamEvent(event, progressElement, progressId, // 更新当前对话ID currentConversationId = event.data.conversationId; + syncAgentLiveStreamConversationId(event.data.conversationId); updateActiveConversation(); addAttackChainButton(currentConversationId); loadActiveTasks(); @@ -1472,6 +1503,7 @@ function handleStreamEvent(event, progressElement, progressId, break; } currentConversationId = responseData.conversationId; + syncAgentLiveStreamConversationId(responseData.conversationId); updateActiveConversation(); addAttackChainButton(currentConversationId); updateProgressConversation(progressId, responseData.conversationId); @@ -1552,6 +1584,7 @@ function handleStreamEvent(event, progressElement, progressId, } currentConversationId = responseData.conversationId; + syncAgentLiveStreamConversationId(responseData.conversationId); updateActiveConversation(); addAttackChainButton(currentConversationId); updateProgressConversation(progressId, responseData.conversationId); @@ -1682,6 +1715,7 @@ function handleStreamEvent(event, progressElement, progressId, // 更新对话ID if (event.data && event.data.conversationId) { currentConversationId = event.data.conversationId; + syncAgentLiveStreamConversationId(event.data.conversationId); updateActiveConversation(); addAttackChainButton(currentConversationId); updateProgressConversation(progressId, event.data.conversationId); @@ -1982,90 +2016,120 @@ async function refreshLastAssistantProcessDetails(conversationId) { window.refreshLastAssistantProcessDetails = refreshLastAssistantProcessDetails; +const taskEventReplayAttachState = { + conversationId: null, + inFlightPromise: null +}; + /** * 订阅运行中任务的 SSE 镜像(GET /api/agent-loop/task-events),用于 HITL 通过后主连接已断开时接续 UI。 */ async function attachRunningTaskEventStream(conversationId) { if (!conversationId || typeof apiFetch !== 'function') return false; - try { - const check = await apiFetch('/api/agent-loop/tasks'); - if (!check.ok) return false; - const j = await check.json().catch(function () { return {}; }); - const active = (j.tasks || []).some(function (t) { - return t && t.conversationId === conversationId && (t.status === 'running' || t.status === 'cancelling'); - }); - if (!active) return false; + if ( + taskEventReplayAttachState.inFlightPromise && + taskEventReplayAttachState.conversationId === conversationId + ) { + return taskEventReplayAttachState.inFlightPromise; + } + if (shouldSkipTaskEventReplayAttach(conversationId)) { + return false; + } - const asEl = findLastAssistantMessageElInChat(); - if (!asEl || !asEl.id) return false; - const backendId = asEl.dataset && asEl.dataset.backendMessageId; - if (backendId && typeof renderProcessDetails === 'function') { - const res = await apiFetch('/api/messages/' + encodeURIComponent(String(backendId)) + '/process-details'); - const jd = await res.json().catch(function () { return {}; }); - if (res.ok && Array.isArray(jd.processDetails)) { - renderProcessDetails(asEl.id, jd.processDetails); - } - } - expandProcessDetailsTimeline(asEl.id); + const attachPromise = (async function () { + try { + const check = await apiFetch('/api/agent-loop/tasks'); + if (!check.ok) return false; + const j = await check.json().catch(function () { return {}; }); + const active = (j.tasks || []).some(function (t) { + return t && t.conversationId === conversationId && (t.status === 'running' || t.status === 'cancelling'); + }); + if (!active) return false; - const progressId = taskReplayProgressId(conversationId); - beginCsTaskReplay(progressId, asEl.id, conversationId); - - const url = '/api/agent-loop/task-events?conversationId=' + encodeURIComponent(conversationId); - const response = await apiFetch(url, { - method: 'GET', - headers: { Accept: 'text/event-stream' } - }); - if (!response.ok) { - clearCsTaskReplay(); - if (progressTaskState.has(progressId)) { - progressTaskState.delete(progressId); - } - return false; - } - - let mcpIds = []; - const assistantDomId = asEl.id; - const getAssistantIdFn = function () { return assistantDomId; }; - const setAssistantIdFn = function () {}; - - const reader = response.body.getReader(); - const decoder = new TextDecoder(); - let buffer = ''; - while (true) { - const chunk = await reader.read(); - if (chunk.done) break; - buffer += decoder.decode(chunk.value, { stream: true }); - const lines = buffer.split('\n'); - buffer = lines.pop() || ''; - for (let li = 0; li < lines.length; li++) { - const line = lines[li]; - if (line.indexOf('data: ') === 0) { - try { - const eventData = JSON.parse(line.slice(6)); - handleStreamEvent(eventData, null, progressId, getAssistantIdFn, setAssistantIdFn, function () { return mcpIds; }, function (ids) { mcpIds = ids; }); - } catch (e) { - console.error('task-events parse', e); + const asEl = findLastAssistantMessageElInChat(); + if (!asEl || !asEl.id) return false; + const backendId = asEl.dataset && asEl.dataset.backendMessageId; + if (backendId && typeof renderProcessDetails === 'function') { + const res = await apiFetch('/api/messages/' + encodeURIComponent(String(backendId)) + '/process-details'); + const jd = await res.json().catch(function () { return {}; }); + if (res.ok && Array.isArray(jd.processDetails)) { + renderProcessDetails(asEl.id, jd.processDetails); + // renderProcessDetails 会重建时间线节点,需重新挂载 HITL 审批入口 + if (typeof window.restoreHitlInlineForConversation === 'function') { + await window.restoreHitlInlineForConversation(conversationId); } } } - } - if (window.csTaskReplay && window.csTaskReplay.progressId === progressId) { + expandProcessDetailsTimeline(asEl.id); + + const progressId = taskReplayProgressId(conversationId); + beginCsTaskReplay(progressId, asEl.id, conversationId); + + const url = '/api/agent-loop/task-events?conversationId=' + encodeURIComponent(conversationId); + const response = await apiFetch(url, { + method: 'GET', + headers: { Accept: 'text/event-stream' } + }); + if (!response.ok) { + clearCsTaskReplay(); + if (progressTaskState.has(progressId)) { + progressTaskState.delete(progressId); + } + return false; + } + + let mcpIds = []; + const assistantDomId = asEl.id; + const getAssistantIdFn = function () { return assistantDomId; }; + const setAssistantIdFn = function () {}; + + const reader = response.body.getReader(); + const decoder = new TextDecoder(); + let buffer = ''; + while (true) { + const chunk = await reader.read(); + if (chunk.done) break; + buffer += decoder.decode(chunk.value, { stream: true }); + const lines = buffer.split('\n'); + buffer = lines.pop() || ''; + for (let li = 0; li < lines.length; li++) { + const line = lines[li]; + if (line.indexOf('data: ') === 0) { + try { + const eventData = JSON.parse(line.slice(6)); + handleStreamEvent(eventData, null, progressId, getAssistantIdFn, setAssistantIdFn, function () { return mcpIds; }, function (ids) { mcpIds = ids; }); + } catch (e) { + console.error('task-events parse', e); + } + } + } + } + if (window.csTaskReplay && window.csTaskReplay.progressId === progressId) { + clearCsTaskReplay(); + } + if (progressTaskState.has(progressId)) { + finalizeProgressTask(progressId, typeof window.t === 'function' ? window.t('tasks.statusCompleted') : '已完成'); + } + if (typeof loadActiveTasks === 'function') loadActiveTasks(); + if (typeof window.loadConversation === 'function' && window.currentConversationId === conversationId) { + await window.loadConversation(conversationId); + } + return true; + } catch (e) { + console.warn('attachRunningTaskEventStream', e); clearCsTaskReplay(); + return false; + } finally { + if (taskEventReplayAttachState.inFlightPromise === attachPromise) { + taskEventReplayAttachState.inFlightPromise = null; + taskEventReplayAttachState.conversationId = null; + } } - if (progressTaskState.has(progressId)) { - finalizeProgressTask(progressId, typeof window.t === 'function' ? window.t('tasks.statusCompleted') : '已完成'); - } - if (typeof loadActiveTasks === 'function') loadActiveTasks(); - if (typeof window.loadConversation === 'function' && window.currentConversationId === conversationId) { - await window.loadConversation(conversationId); - } - return true; - } catch (e) { - console.warn('attachRunningTaskEventStream', e); - clearCsTaskReplay(); - return false; - } + })(); + + taskEventReplayAttachState.conversationId = conversationId; + taskEventReplayAttachState.inFlightPromise = attachPromise; + return attachPromise; } window.attachRunningTaskEventStream = attachRunningTaskEventStream;