Compare commits

..

23 Commits

Author SHA1 Message Date
公明 4661862a1a Add files via upload 2026-06-11 18:03:09 +08:00
公明 f319a0f243 Add files via upload 2026-06-11 18:01:38 +08:00
公明 15c4802319 Add files via upload 2026-06-11 17:18:58 +08:00
公明 6ffde48b0c Add files via upload 2026-06-11 16:54:36 +08:00
公明 c5e2f0d95d Add files via upload 2026-06-11 16:02:48 +08:00
公明 28a826d5b7 Add files via upload 2026-06-11 15:56:25 +08:00
公明 6365de7018 Add files via upload 2026-06-11 11:50:31 +08:00
公明 2e4bf7197b Add files via upload 2026-06-11 11:48:17 +08:00
公明 ed4ba08163 Add files via upload 2026-06-11 11:46:23 +08:00
公明 8b5e55a673 Add files via upload 2026-06-11 11:44:20 +08:00
公明 e8a75e5105 Update config.yaml 2026-06-11 02:03:03 +08:00
公明 48976ed650 Add files via upload 2026-06-11 01:48:42 +08:00
公明 dc9ecae7fd Add files via upload 2026-06-11 01:43:35 +08:00
公明 a9d0a59f7a Add files via upload 2026-06-11 01:41:57 +08:00
公明 5ec4729b83 Add files via upload 2026-06-11 01:40:00 +08:00
公明 9857003018 Add files via upload 2026-06-11 01:38:25 +08:00
公明 a6e7885fed Add files via upload 2026-06-11 01:31:18 +08:00
公明 e69375451c Add files via upload 2026-06-11 01:29:07 +08:00
公明 07e7f104ad Add files via upload 2026-06-11 01:27:50 +08:00
公明 ffce9185bb Add files via upload 2026-06-11 01:16:20 +08:00
公明 612f16455d Add files via upload 2026-06-11 01:14:52 +08:00
公明 ecd5b40bc2 Add files via upload 2026-06-11 01:13:11 +08:00
公明 5aa7306c9b Update config.yaml 2026-06-11 00:53:39 +08:00
34 changed files with 1287 additions and 169 deletions
+6 -6
View File
@@ -29,7 +29,6 @@ If CyberStrikeAI helps you, you can support the project via **WeChat Pay** or **
CyberStrikeAI is an **AI-native security testing platform** built in Go. It integrates 100+ security tools, an intelligent orchestration engine, role-based testing with predefined security roles, a skills system with specialized testing skills, comprehensive lifecycle management capabilities, and a **built-in lightweight C2 (Command & Control) framework** for **authorized** engagements (listeners, encrypted implants, sessions, tasks, real-time events, REST and MCP). Through native MCP protocol and AI agents, it enables end-to-end automation from conversational commands to vulnerability discovery, attack-chain analysis, knowledge retrieval, and result visualization—delivering an auditable, traceable, and collaborative testing environment for security teams. CyberStrikeAI is an **AI-native security testing platform** built in Go. It integrates 100+ security tools, an intelligent orchestration engine, role-based testing with predefined security roles, a skills system with specialized testing skills, comprehensive lifecycle management capabilities, and a **built-in lightweight C2 (Command & Control) framework** for **authorized** engagements (listeners, encrypted implants, sessions, tasks, real-time events, REST and MCP). Through native MCP protocol and AI agents, it enables end-to-end automation from conversational commands to vulnerability discovery, attack-chain analysis, knowledge retrieval, and result visualization—delivering an auditable, traceable, and collaborative testing environment for security teams.
## Interface & Integration Preview ## Interface & Integration Preview
<div align="center"> <div align="center">
@@ -117,9 +116,9 @@ CyberStrikeAI is an **AI-native security testing platform** built in Go. It inte
- 🛡️ Vulnerability management with CRUD operations, severity tracking, status workflow, and statistics - 🛡️ Vulnerability management with CRUD operations, severity tracking, status workflow, and statistics
- 📋 Batch task management: create task queues, add multiple tasks, and execute them sequentially - 📋 Batch task management: create task queues, add multiple tasks, and execute them sequentially
- 🎭 Role-based testing: predefined security testing roles (Penetration Testing, CTF, Web App Scanning, etc.) with custom prompts and tool restrictions - 🎭 Role-based testing: predefined security testing roles (Penetration Testing, CTF, Web App Scanning, etc.) with custom prompts and tool restrictions
- 🧩 **Agent orchestration (CloudWeGo Eino)**: **single-agent** via **`/api/eino-agent/stream`** (Eino ADK `ChatModelAgent`); **multi-agent** via **`/api/multi-agent/stream`** with **`deep`** (coordinator + `task` sub-agents), **`plan_execute`**, or **`supervisor`** (`orchestration` in the request body). Markdown under `agents/`: `orchestrator.md`, `orchestrator-plan-execute.md`, `orchestrator-supervisor.md`, plus sub-agent `*.md` (see [Multi-agent doc](docs/MULTI_AGENT_EINO.md)) - 🧩 **Agent orchestration (CloudWeGo Eino)**: **single-agent** via **`/api/eino-agent/stream`** (Eino ADK `ChatModelAgent`); **multi-agent** via **`/api/multi-agent/stream`** with **`deep`** (coordinator + `task` sub-agents), **`plan_execute`**, or **`supervisor`** (`orchestration` in the request body). ADK **summarization** compresses long contexts; pre-compaction **transcripts** land at `data/conversation_artifacts/<conversation-id>/summarization/transcript.txt` (full user/assistant/tool turns; static system omitted). Markdown under `agents/`: `orchestrator.md`, `orchestrator-plan-execute.md`, `orchestrator-supervisor.md`, plus sub-agent `*.md` (see [Multi-agent doc](docs/MULTI_AGENT_EINO.md))
- 🖼️ **Vision analysis (`analyze_image`)**: separate VL model (e.g. `qwen-vl-max`) via MCP for local screenshots, captchas, and UI; image bytes stay out of agent history (text summaries only). Configure `vision` in `config.yaml`; see [docs/VISION.md](docs/VISION.md) - 🖼️ **Vision analysis (`analyze_image`)**: separate VL model (e.g. `qwen-vl-max`) via MCP for local screenshots, captchas, and UI; image bytes stay out of agent history (text summaries only). Configure `vision` in `config.yaml`; see [docs/VISION.md](docs/VISION.md)
- 🎯 **Skills (refactored for Eino)**: packs under `skills_dir` follow **Agent Skills** layout (`SKILL.md` + optional files); **multi-agent** sessions use the official Eino ADK **`skill`** tool for **progressive disclosure** (load by name), with optional **host filesystem / shell** via `multi_agent.eino_skills`; optional **`eino_middleware`** adds patchtoolcalls, tool_search, plantask, reduction, checkpoints, and Deep tuning—20+ sample domains (SQLi, XSS, API security, …) ship under `skills/` - 🎯 **Skills (refactored for Eino)**: packs under `skills_dir` follow **Agent Skills** layout (`SKILL.md` + optional files); **multi-agent** sessions use the official Eino ADK **`skill`** tool for **progressive disclosure** (load by name), with optional **host filesystem / shell** via `multi_agent.eino_skills`; optional **`eino_middleware`** adds patchtoolcalls, tool_search, **plantask** (`TaskCreate` / `TaskList` boards under `skills_dir/.eino/plantask/`), reduction, file **checkpoints** (`checkpoint_dir`), ChatModel **retries**, session **output key**, and Deep tuning—20+ sample domains (SQLi, XSS, API security, …) ship under `skills/`
- 📱 **Chatbot**: DingTalk and Lark (Feishu) long-lived connections so you can talk to CyberStrikeAI from mobile (see [Robot / Chatbot guide](docs/robot_en.md) for setup and commands) - 📱 **Chatbot**: DingTalk and Lark (Feishu) long-lived connections so you can talk to CyberStrikeAI from mobile (see [Robot / Chatbot guide](docs/robot_en.md) for setup and commands)
- 🧑‍⚖️ **Human-in-the-loop (HITL)**: Chat sidebar to set approval mode and tool allowlists (listed tools skip approval); global list in `config.yaml` under `hitl.tool_whitelist`; **Apply** can merge new tools into the file and update the running server without restart; dedicated **HITL** page for pending approvals - 🧑‍⚖️ **Human-in-the-loop (HITL)**: Chat sidebar to set approval mode and tool allowlists (listed tools skip approval); global list in `config.yaml` under `hitl.tool_whitelist`; **Apply** can merge new tools into the file and update the running server without restart; dedicated **HITL** page for pending approvals
- 🐚 **WebShell management**: Add and manage WebShell connections (e.g. IceSword/AntSword compatible), use a virtual terminal for command execution, a built-in file manager for file operations, and an AI assistant tab that orchestrates tests and keeps per-connection conversation history; supports PHP, ASP, ASPX, JSP and custom shell types with configurable request method and command parameter. - 🐚 **WebShell management**: Add and manage WebShell connections (e.g. IceSword/AntSword compatible), use a virtual terminal for command execution, a built-in file manager for file operations, and an AI assistant tab that orchestrates tests and keeps per-connection conversation history; supports PHP, ASP, ASPX, JSP and custom shell types with configurable request method and command parameter.
@@ -261,7 +260,7 @@ Requirements / tips:
- **Predefined roles** System includes 12+ predefined security testing roles (Penetration Testing, CTF, Web App Scanning, API Security Testing, Binary Analysis, Cloud Security Audit, etc.) in the `roles/` directory. - **Predefined roles** System includes 12+ predefined security testing roles (Penetration Testing, CTF, Web App Scanning, API Security Testing, Binary Analysis, Cloud Security Audit, etc.) in the `roles/` directory.
- **Custom prompts** Each role can define a `user_prompt` that prepends to user messages, guiding the AI to adopt specialized testing methodologies and focus areas. - **Custom prompts** Each role can define a `user_prompt` that prepends to user messages, guiding the AI to adopt specialized testing methodologies and focus areas.
- **Tool restrictions** Roles can specify a `tools` list to limit available tools, ensuring focused testing workflows (e.g., CTF role restricts to CTF-specific utilities). - **Tool restrictions** Roles can specify a `tools` list to limit available tools, ensuring focused testing workflows (e.g., CTF role restricts to CTF-specific utilities).
- **Skills** Skill packs live under `skills_dir` and load via the Eino ADK **`skill`** tool (**progressive disclosure**) in both **single- and multi-agent** sessions when **`multi_agent.eino_skills`** is enabled. Optional host **read_file / glob / grep / write / edit / execute** and **`eino_middleware`** (tool_search, reduction, checkpoints, etc.) apply per mode—see docs. - **Skills** Skill packs live under `skills_dir` and load via the Eino ADK **`skill`** tool (**progressive disclosure**) in both **single- and multi-agent** sessions when **`multi_agent.eino_skills`** is enabled. Optional host **read_file / glob / grep / write / edit / execute** and **`eino_middleware`** (tool_search, plantask, reduction, checkpoints, summarization transcripts, etc.) apply per mode—see docs.
- **Easy role creation** Create custom roles by adding YAML files to the `roles/` directory. Each role defines `name`, `description`, `user_prompt`, `icon`, `tools`, and `enabled` fields. - **Easy role creation** Create custom roles by adding YAML files to the `roles/` directory. Each role defines `name`, `description`, `user_prompt`, `icon`, `tools`, and `enabled` fields.
- **Web UI integration** Select roles from a dropdown in the chat interface. Role selection affects both AI behavior and available tool suggestions. - **Web UI integration** Select roles from a dropdown in the chat interface. Role selection affects both AI behavior and available tool suggestions.
@@ -289,6 +288,7 @@ Requirements / tips:
- **Sub-agents** (for **deep** / **supervisor**): other `*.md` files (YAML front matter + body). Not used as **`task`** targets if marked orchestrator-only. - **Sub-agents** (for **deep** / **supervisor**): other `*.md` files (YAML front matter + body). Not used as **`task`** targets if marked orchestrator-only.
- **Management** Web UI: **Agents → Agent management**; API `/api/multi-agent/markdown-agents`. - **Management** Web UI: **Agents → Agent management**; API `/api/multi-agent/markdown-agents`.
- **Config** `multi_agent` in `config.yaml`: `enabled`, `robot_default_agent_mode`, `batch_use_multi_agent`, `max_iteration`, `plan_execute_loop_max_iterations`, per-mode orchestrator instruction fields, optional YAML `sub_agents` merged with disk (`id` clash → Markdown wins), **`eino_skills`**, **`eino_middleware`** (optional ADK middleware and Deep/Supervisor tuning). - **Config** `multi_agent` in `config.yaml`: `enabled`, `robot_default_agent_mode`, `batch_use_multi_agent`, `max_iteration`, `plan_execute_loop_max_iterations`, per-mode orchestrator instruction fields, optional YAML `sub_agents` merged with disk (`id` clash → Markdown wins), **`eino_skills`**, **`eino_middleware`** (optional ADK middleware and Deep/Supervisor tuning).
- **Resilience & long runs** `checkpoint_dir` enables ADK **resume** after process crashes (distinct from trace-based “interrupt & continue”). `deep_model_retry_max_retries` retries transient LLM API failures within a single call. **Summarization** writes a filtered **transcript** when compression fires; the summary message includes the path so the model can `read_file` for scan output and other pre-compaction details.
- **Details** **[docs/MULTI_AGENT_EINO.md](docs/MULTI_AGENT_EINO.md)** (streaming, robots, batch, middleware caveats). - **Details** **[docs/MULTI_AGENT_EINO.md](docs/MULTI_AGENT_EINO.md)** (streaming, robots, batch, middleware caveats).
### Skills System (Agent Skills + Eino) ### Skills System (Agent Skills + Eino)
@@ -296,7 +296,7 @@ Requirements / tips:
- **Runtime refactor** **`skills_dir`** is the single root for packs. **Multi-agent** loads them through Einos official **`skill`** middleware (**progressive disclosure**: model calls `skill` with a pack **name** instead of receiving full SKILL text up front). Configure via **`multi_agent.eino_skills`**: `disable`, `filesystem_tools` (host read/glob/grep/write/edit/execute), `skill_tool_name`. - **Runtime refactor** **`skills_dir`** is the single root for packs. **Multi-agent** loads them through Einos official **`skill`** middleware (**progressive disclosure**: model calls `skill` with a pack **name** instead of receiving full SKILL text up front). Configure via **`multi_agent.eino_skills`**: `disable`, `filesystem_tools` (host read/glob/grep/write/edit/execute), `skill_tool_name`.
- **Eino / RAG** Packages are also split into `schema.Document` chunks for `FilesystemSkillsRetriever` (`skills.AsEinoRetriever()`) in **compose** graphs (e.g. knowledge/indexing pipelines). - **Eino / RAG** Packages are also split into `schema.Document` chunks for `FilesystemSkillsRetriever` (`skills.AsEinoRetriever()`) in **compose** graphs (e.g. knowledge/indexing pipelines).
- **HTTP API** `/api/skills` listing and `depth` (`summary` | `full`), `section`, and `resource_path` remain for the web UI and ops; **model-side** skill loading in multi-agent uses the **`skill`** tool, not MCP. - **HTTP API** `/api/skills` listing and `depth` (`summary` | `full`), `section`, and `resource_path` remain for the web UI and ops; **model-side** skill loading in multi-agent uses the **`skill`** tool, not MCP.
- **Optional `eino_middleware`** e.g. `tool_search` (dynamic MCP tool list), `patch_tool_calls`, `plantask` (structured tasks; persistence defaults under a subdirectory of `skills_dir`), `reduction`, `checkpoint_dir`, Deep output key / model retries / task-tool description prefix—see `config.yaml` and `internal/config/config.go`. - **Optional `eino_middleware`** e.g. `tool_search` (dynamic MCP tool list), `patch_tool_calls`, **`plantask`** (Eino `TaskCreate` / `TaskGet` / `TaskUpdate` / `TaskList`; JSON under `skills_dir/.eino/plantask/<conversation-id>/`; Eino clears task files when **all** tasks are marked completed), `reduction`, **`checkpoint_dir`** (`data/eino-checkpoints/`), **`deep_model_retry_max_retries`**, **`deep_output_key`**, task-tool description prefix—see `config.yaml` and `internal/config/config.go`.
- **Shipped demo** `skills/cyberstrike-eino-demo/`; see `skills/README.md`. - **Shipped demo** `skills/cyberstrike-eino-demo/`; see `skills/README.md`.
**Creating a skill:** **Creating a skill:**
@@ -544,7 +544,7 @@ multi_agent:
orchestrator_instruction: "" # Deep; used when orchestrator.md body is empty orchestrator_instruction: "" # Deep; used when orchestrator.md body is empty
# orchestrator_instruction_plan_execute / orchestrator_instruction_supervisor optional # orchestrator_instruction_plan_execute / orchestrator_instruction_supervisor optional
# eino_skills: { disable: false, filesystem_tools: true, skill_tool_name: skill } # eino_skills: { disable: false, filesystem_tools: true, skill_tool_name: skill }
# eino_middleware: optional patch_tool_calls, tool_search, plantask, reduction, checkpoint_dir, ... # eino_middleware: plantask_enable, checkpoint_dir, deep_model_retry_max_retries, deep_output_key, ...
``` ```
### Tool Definition Example (`tools/nmap.yaml`) ### Tool Definition Example (`tools/nmap.yaml`)
+6 -6
View File
@@ -28,7 +28,6 @@
CyberStrikeAI 是一款 **AI 原生安全测试平台**,基于 Go 构建,集成了 100+ 安全工具、智能编排引擎、角色化测试与预设安全测试角色、Skills 技能系统与专业测试技能、完整的测试生命周期管理能力,以及面向 **授权场景****内置轻量 C2Command & Control,指挥与控制)** 能力(监听器、加密通信、会话与任务、实时事件、REST 与 MCP 协同)。通过原生 MCP 协议与 AI 智能体,支持从对话指令到漏洞发现、攻击链分析、知识检索与结果可视化的全流程自动化,为安全团队提供可审计、可追溯、可协作的专业测试环境。 CyberStrikeAI 是一款 **AI 原生安全测试平台**,基于 Go 构建,集成了 100+ 安全工具、智能编排引擎、角色化测试与预设安全测试角色、Skills 技能系统与专业测试技能、完整的测试生命周期管理能力,以及面向 **授权场景****内置轻量 C2Command & Control,指挥与控制)** 能力(监听器、加密通信、会话与任务、实时事件、REST 与 MCP 协同)。通过原生 MCP 协议与 AI 智能体,支持从对话指令到漏洞发现、攻击链分析、知识检索与结果可视化的全流程自动化,为安全团队提供可审计、可追溯、可协作的专业测试环境。
## 界面与集成预览 ## 界面与集成预览
<div align="center"> <div align="center">
@@ -116,9 +115,9 @@ CyberStrikeAI 是一款 **AI 原生安全测试平台**,基于 Go 构建,集
- 🛡️ 漏洞管理功能:完整的漏洞 CRUD 操作,支持严重程度分级、状态流转、按对话/严重程度/状态过滤,以及统计看板 - 🛡️ 漏洞管理功能:完整的漏洞 CRUD 操作,支持严重程度分级、状态流转、按对话/严重程度/状态过滤,以及统计看板
- 📋 批量任务管理:创建任务队列,批量添加任务,依次顺序执行,支持任务编辑与状态跟踪 - 📋 批量任务管理:创建任务队列,批量添加任务,依次顺序执行,支持任务编辑与状态跟踪
- 🎭 角色化测试:预设安全测试角色(渗透测试、CTF、Web 应用扫描等),支持自定义提示词和工具限制 - 🎭 角色化测试:预设安全测试角色(渗透测试、CTF、Web 应用扫描等),支持自定义提示词和工具限制
- 🧩 **Agent 编排(CloudWeGo Eino****单代理** `POST /api/eino-agent/stream`Eino ADK);**多代理** `POST /api/multi-agent/stream``orchestration`**`deep`** / **`plan_execute`** / **`supervisor`**。`agents/` 下主代理与子代理 Markdown 见 [多代理说明](docs/MULTI_AGENT_EINO.md) - 🧩 **Agent 编排(CloudWeGo Eino****单代理** `POST /api/eino-agent/stream`Eino ADK);**多代理** `POST /api/multi-agent/stream``orchestration`**`deep`** / **`plan_execute`** / **`supervisor`**。ADK **Summarization** 在上下文过长时压缩历史;压缩前将可恢复 **转录** 写入 `data/conversation_artifacts/<会话ID>/summarization/transcript.txt`(保留完整 user/assistant/tool 轮次,省略静态 system)。`agents/` 下主代理与子代理 Markdown 见 [多代理说明](docs/MULTI_AGENT_EINO.md)
- 🖼️ **视觉分析(`analyze_image`**:独立 Vision 模型(如 `qwen-vl-max`),MCP 工具分析本地截图/验证码/UI;图片仅在单次 VL 调用中出现,对话上下文只保留文字摘要。配置见 `config.yaml``vision` 与 [视觉分析说明](docs/VISION.md) - 🖼️ **视觉分析(`analyze_image`**:独立 Vision 模型(如 `qwen-vl-max`),MCP 工具分析本地截图/验证码/UI;图片仅在单次 VL 调用中出现,对话上下文只保留文字摘要。配置见 `config.yaml``vision` 与 [视觉分析说明](docs/VISION.md)
- 🎯 **Skills(面向 Eino 重构)**:技能包放在 **`skills_dir`**,遵循 **Agent Skills** 目录规范(`SKILL.md` + 可选文件);**多代理** 下通过 Eino 官方 **`skill`** 工具 **渐进式披露**(按 name 加载)。**`multi_agent.eino_skills`** 控制是否启用、本机文件/Shell 工具、工具名覆盖;**`eino_middleware`** 可选 patch、tool_search、plantask、reduction、断点目录及 Deep 调参。20+ 领域示例仍可绑定角色 - 🎯 **Skills(面向 Eino 重构)**:技能包放在 **`skills_dir`**,遵循 **Agent Skills** 目录规范(`SKILL.md` + 可选文件);**多代理** 下通过 Eino 官方 **`skill`** 工具 **渐进式披露**(按 name 加载)。**`multi_agent.eino_skills`** 控制是否启用、本机文件/Shell 工具、工具名覆盖;**`eino_middleware`** 可选 patch、tool_search、**plantask**`TaskCreate` / `TaskList` 任务板,落在 `skills_dir/.eino/plantask/`)、reduction、文件型 **checkpoint**`checkpoint_dir`)、ChatModel **重试**、会话 **输出键** 及 Deep 调参。20+ 领域示例仍可绑定角色
- 📱 **机器人**:支持钉钉、飞书长连接,在手机端与 CyberStrikeAI 对话(配置与命令详见 [机器人使用说明](docs/robot.md) - 📱 **机器人**:支持钉钉、飞书长连接,在手机端与 CyberStrikeAI 对话(配置与命令详见 [机器人使用说明](docs/robot.md)
- 🧑‍⚖️ **人机协同(HITL**:对话页侧栏配置协同模式与免审批工具白名单;全局列表在 `config.yaml``hitl.tool_whitelist`;点「应用」可将新增工具合并写入配置文件且**无需重启**即可生效;导航 **人机协同** 页处理待审批工具调用 - 🧑‍⚖️ **人机协同(HITL**:对话页侧栏配置协同模式与免审批工具白名单;全局列表在 `config.yaml``hitl.tool_whitelist`;点「应用」可将新增工具合并写入配置文件且**无需重启**即可生效;导航 **人机协同** 页处理待审批工具调用
- 🐚 **WebShell 管理**:添加与管理 WebShell 连接(兼容冰蝎/蚁剑等),通过虚拟终端执行命令、内置文件管理进行文件操作,并提供按连接维度保存历史的 AI 助手标签页;支持 PHP/ASP/ASPX/JSP 及自定义类型,可配置请求方法与命令参数。 - 🐚 **WebShell 管理**:添加与管理 WebShell 连接(兼容冰蝎/蚁剑等),通过虚拟终端执行命令、内置文件管理进行文件操作,并提供按连接维度保存历史的 AI 助手标签页;支持 PHP/ASP/ASPX/JSP 及自定义类型,可配置请求方法与命令参数。
@@ -259,7 +258,7 @@ go build -o cyberstrike-ai cmd/server/main.go
- **预设角色**:系统内置 12+ 个预设的安全测试角色(渗透测试、CTF、Web 应用扫描、API 安全测试、二进制分析、云安全审计等),位于 `roles/` 目录。 - **预设角色**:系统内置 12+ 个预设的安全测试角色(渗透测试、CTF、Web 应用扫描、API 安全测试、二进制分析、云安全审计等),位于 `roles/` 目录。
- **自定义提示词**:每个角色可定义 `user_prompt`,会在用户消息前自动添加,引导 AI 采用特定的测试方法和关注重点。 - **自定义提示词**:每个角色可定义 `user_prompt`,会在用户消息前自动添加,引导 AI 采用特定的测试方法和关注重点。
- **工具限制**:角色可指定 `tools` 列表,限制可用工具,实现聚焦的测试流程(如 CTF 角色限制为 CTF 专用工具)。 - **工具限制**:角色可指定 `tools` 列表,限制可用工具,实现聚焦的测试流程(如 CTF 角色限制为 CTF 专用工具)。
- **Skills**:技能包位于 `skills_dir`;启用 **`multi_agent.eino_skills`** 后,**单代理与多代理**均可通过 Eino **`skill`** 工具按需加载。中间件与本机 read_file/glob/grep 等见文档。 - **Skills**:技能包位于 `skills_dir`;启用 **`multi_agent.eino_skills`** 后,**单代理与多代理**均可通过 Eino **`skill`** 工具按需加载。可选 **`eino_middleware`**tool_search、plantask、reduction、checkpoint、Summarization 转录等)与本机 read_file/glob/grep 等见文档。
- **轻松创建角色**:通过在 `roles/` 目录添加 YAML 文件即可创建自定义角色。每个角色定义 `name`、`description`、`user_prompt`、`icon`、`tools`、`enabled` 字段。 - **轻松创建角色**:通过在 `roles/` 目录添加 YAML 文件即可创建自定义角色。每个角色定义 `name`、`description`、`user_prompt`、`icon`、`tools`、`enabled` 字段。
- **Web 界面集成**:在聊天界面通过下拉菜单选择角色。角色选择会影响 AI 行为和可用工具建议。 - **Web 界面集成**:在聊天界面通过下拉菜单选择角色。角色选择会影响 AI 行为和可用工具建议。
@@ -287,6 +286,7 @@ go build -o cyberstrike-ai cmd/server/main.go
- **子代理****deep** / **supervisor**):其余 `*.md`;标成 orchestrator 的不会进入 `task` 列表。 - **子代理****deep** / **supervisor**):其余 `*.md`;标成 orchestrator 的不会进入 `task` 列表。
- **界面管理****Agents → Agent 管理**API `/api/multi-agent/markdown-agents`。 - **界面管理****Agents → Agent 管理**API `/api/multi-agent/markdown-agents`。
- **配置项**`multi_agent``enabled`、`robot_default_agent_mode`、`batch_use_multi_agent`、`max_iteration`、`plan_execute_loop_max_iterations`、各模式 orchestrator 指令字段、可选 YAML `sub_agents` 与目录合并(同 `id` → Markdown 优先)、**`eino_skills`**、**`eino_middleware`**。 - **配置项**`multi_agent``enabled`、`robot_default_agent_mode`、`batch_use_multi_agent`、`max_iteration`、`plan_execute_loop_max_iterations`、各模式 orchestrator 指令字段、可选 YAML `sub_agents` 与目录合并(同 `id` → Markdown 优先)、**`eino_skills`**、**`eino_middleware`**。
- **长任务与恢复**`checkpoint_dir` 支持进程崩溃后 ADK **断点续跑**(与基于 trace 的「中断继续」不同)。`deep_model_retry_max_retries` 在同一次 LLM 调用内重试瞬时 API 失败。**Summarization** 触发压缩时会写入过滤后的 **transcript**,摘要消息中带路径,模型可用 `read_file` 找回扫描输出等压缩前细节。
- **更多细节**[docs/MULTI_AGENT_EINO.md](docs/MULTI_AGENT_EINO.md)(流式、机器人、批量、中间件差异)。 - **更多细节**[docs/MULTI_AGENT_EINO.md](docs/MULTI_AGENT_EINO.md)(流式、机器人、批量、中间件差异)。
### Skills 技能系统(Agent Skills + Eino ### Skills 技能系统(Agent Skills + Eino
@@ -294,7 +294,7 @@ go build -o cyberstrike-ai cmd/server/main.go
- **运行侧重构****`skills_dir`** 为技能包唯一根目录;**多代理** 通过 Eino 官方 **`skill`** 中间件做 **渐进式披露**(模型按 **name** 调用 `skill`,而非一次性注入全文)。由 **`multi_agent.eino_skills`** 控制:`disable`、`filesystem_tools`(本机读写与 Shell)、`skill_tool_name`。 - **运行侧重构****`skills_dir`** 为技能包唯一根目录;**多代理** 通过 Eino 官方 **`skill`** 中间件做 **渐进式披露**(模型按 **name** 调用 `skill`,而非一次性注入全文)。由 **`multi_agent.eino_skills`** 控制:`disable`、`filesystem_tools`(本机读写与 Shell)、`skill_tool_name`。
- **Eino / 知识流水线**:技能包可切分为 `schema.Document`,供 `FilesystemSkillsRetriever``skills.AsEinoRetriever()`)在 **compose** 图(如索引/编排)中使用。 - **Eino / 知识流水线**:技能包可切分为 `schema.Document`,供 `FilesystemSkillsRetriever``skills.AsEinoRetriever()`)在 **compose** 图(如索引/编排)中使用。
- **HTTP 管理**`/api/skills` 列表与 `depth=summary|full`、`section`、`resource_path` 等仍用于 Web 与运维;**模型侧** 多代理走 **`skill`** 工具,而非 MCP。 - **HTTP 管理**`/api/skills` 列表与 `depth=summary|full`、`section`、`resource_path` 等仍用于 Web 与运维;**模型侧** 多代理走 **`skill`** 工具,而非 MCP。
- **可选 `eino_middleware`**:如 `tool_search`(动态工具列表)、`patch_tool_calls`、`plantask`(结构化任务;默认落在 `skills_dir` 下子目录)、`reduction`、`checkpoint_dir`、Deep 输出键 / 模型重试 / task 描述前缀等,见 `config.yaml` 与 `internal/config/config.go`。 - **可选 `eino_middleware`**:如 `tool_search`(动态工具列表)、`patch_tool_calls`、**`plantask`**Eino `TaskCreate` / `TaskGet` / `TaskUpdate` / `TaskList`JSON 存于 `skills_dir/.eino/plantask/<会话ID>/`**全部**任务标为 completed 后 Eino 会清理任务文件)、`reduction`、**`checkpoint_dir`**(如 `data/eino-checkpoints/`)、**`deep_model_retry_max_retries`**、**`deep_output_key`**、task 描述前缀等,见 `config.yaml` 与 `internal/config/config.go`。
- **自带示例**`skills/cyberstrike-eino-demo/`;说明见 `skills/README.md`。 - **自带示例**`skills/cyberstrike-eino-demo/`;说明见 `skills/README.md`。
**新建技能:** **新建技能:**
@@ -542,7 +542,7 @@ multi_agent:
orchestrator_instruction: "" # Deeporchestrator.md 正文为空时使用 orchestrator_instruction: "" # Deeporchestrator.md 正文为空时使用
# orchestrator_instruction_plan_execute / orchestrator_instruction_supervisor 可选 # orchestrator_instruction_plan_execute / orchestrator_instruction_supervisor 可选
# eino_skills: { disable: false, filesystem_tools: true, skill_tool_name: skill } # eino_skills: { disable: false, filesystem_tools: true, skill_tool_name: skill }
# eino_middleware: 可选 patch_tool_calls、tool_search、plantask、reduction、checkpoint_dir # eino_middleware: plantask_enable、checkpoint_dir、deep_model_retry_max_retries、deep_output_key
``` ```
### 工具模版示例(`tools/nmap.yaml` ### 工具模版示例(`tools/nmap.yaml`
+7 -7
View File
@@ -10,7 +10,7 @@
# ============================================ # ============================================
# 前端显示的版本号(可选,不填则显示默认版本) # 前端显示的版本号(可选,不填则显示默认版本)
version: "v1.6.34" version: "v1.6.35"
# 服务器配置 # 服务器配置
server: server:
host: 0.0.0.0 # 监听地址,0.0.0.0 表示监听所有网络接口 host: 0.0.0.0 # 监听地址,0.0.0.0 表示监听所有网络接口
@@ -129,8 +129,8 @@ multi_agent:
tool_search_min_tools: 20 # 达到该数量才启用 tool_search(避免工具很少时多此一举);与 always_visible 配合使用 tool_search_min_tools: 20 # 达到该数量才启用 tool_search(避免工具很少时多此一举);与 always_visible 配合使用
tool_search_always_visible: 12 # 始终直接暴露给模型的工具个数(顺序与角色工具列表一致);其余工具进入动态池,需 tool_search 解锁 tool_search_always_visible: 12 # 始终直接暴露给模型的工具个数(顺序与角色工具列表一致);其余工具进入动态池,需 tool_search 解锁
tool_search_always_visible_tools: [read_file, glob, grep, analyze_image, write_file, edit_file, execute, task, transfer_to_agent, exit, write_todos, skill, tool_search, TaskCreate, TaskGet, TaskUpdate, TaskList, record_vulnerability, list_vulnerabilities, get_vulnerability, list_knowledge_risk_types, search_knowledge_base, webshell_exec, webshell_file_list, webshell_file_read, webshell_file_write, manage_webshell_list, manage_webshell_add, manage_webshell_update, manage_webshell_delete, manage_webshell_test, batch_task_list, batch_task_get, batch_task_start, batch_task_rerun, batch_task_pause, batch_task_update_metadata, batch_task_update_schedule, batch_task_schedule_enabled, batch_task_update_task, batch_task_remove_task, batch_task_delete, batch_task_create, batch_task_add_task, http-framework-test] # 后端内置常驻工具白名单(优先于 always_visible 数量策略) tool_search_always_visible_tools: [read_file, glob, grep, analyze_image, write_file, edit_file, execute, task, transfer_to_agent, exit, write_todos, skill, tool_search, TaskCreate, TaskGet, TaskUpdate, TaskList, record_vulnerability, list_vulnerabilities, get_vulnerability, list_knowledge_risk_types, search_knowledge_base, webshell_exec, webshell_file_list, webshell_file_read, webshell_file_write, manage_webshell_list, manage_webshell_add, manage_webshell_update, manage_webshell_delete, manage_webshell_test, batch_task_list, batch_task_get, batch_task_start, batch_task_rerun, batch_task_pause, batch_task_update_metadata, batch_task_update_schedule, batch_task_schedule_enabled, batch_task_update_task, batch_task_remove_task, batch_task_delete, batch_task_create, batch_task_add_task, http-framework-test] # 后端内置常驻工具白名单(优先于 always_visible 数量策略)
plantask_enable: false # true:主代理(Deep / Supervisor 主)挂载 TaskCreate/Get/Update/List;需 eino_skills 可用且 skills_dir 存在,否则仅打日志并跳过 plantask_enable: true # P0:主代理挂载 TaskCreate/Get/Update/List 结构化任务板;需 eino_skills 可用且 skills_dir 存在
plantask_rel_dir: .eino/plantask # 结构化任务文件相对 skills_dir 的子目录,其下再按会话 ID 分子目录存放 plantask_rel_dir: .eino/plantask # 任务文件相对 skills_dir,按会话分子目录:skills/.eino/plantask/<conversationId>/
reduction_enable: true # true:大工具输出截断/落盘以控上下文;依赖与 plantask 相同的 eino local 写盘后端,无后端时不挂载 reduction_enable: true # true:大工具输出截断/落盘以控上下文;依赖与 plantask 相同的 eino local 写盘后端,无后端时不挂载
reduction_max_length_for_trunc: 50000 # 单条工具结果超过该字符数(bytes)时截断并落盘(由 reduction 中间件处理) reduction_max_length_for_trunc: 50000 # 单条工具结果超过该字符数(bytes)时截断并落盘(由 reduction 中间件处理)
reduction_max_tokens_for_clear: 160000 # 历史工具结果清理阈值(tokens),超阈值时在模型调用前清理旧结果 reduction_max_tokens_for_clear: 160000 # 历史工具结果清理阈值(tokens),超阈值时在模型调用前清理旧结果
@@ -143,11 +143,11 @@ multi_agent:
plan_execute_executed_steps_budget_ratio: 0.2 # plan_execute 中 executed_steps 预算比例 plan_execute_executed_steps_budget_ratio: 0.2 # plan_execute 中 executed_steps 预算比例
plan_execute_max_step_result_runes: 4000 # plan_execute 每步结果最大字符数(超出截断) plan_execute_max_step_result_runes: 4000 # plan_execute 每步结果最大字符数(超出截断)
plan_execute_keep_last_steps: 8 # plan_execute 仅保留最近 N 步正文,早期步骤折叠为标题 plan_execute_keep_last_steps: 8 # plan_execute 仅保留最近 N 步正文,早期步骤折叠为标题
checkpoint_dir: "" # 非空:为 adk.NewRunner 启用按会话子目录的文件型 CheckPointStore,便于中断恢复持久化;Resume 的 HTTP/前端流程需另行对接 checkpoint_dir: data/eino-checkpoints # P0:进程崩溃/OOM 后同会话自动 ADK Resume;正常结束会删 .ckpt;与「中断并继续」(last_react_*) 是两套机制
run_retry_max_attempts: 0 # >0429/5xx/网络抖动时 ADK 运行循环指数退避续跑次数0=默认 10 run_retry_max_attempts: 0 # 429/5xx/网络抖动时整轮 Run 指数退避续跑;0=默认 10(与 deep_model_retry 互补,建议保持默认)
run_retry_max_backoff_sec: 0 # 单次退避上限秒数;0=默认 30 run_retry_max_backoff_sec: 0 # 单次退避上限秒数;0=默认 30
deep_output_key: "" # 非空:将最终助手输出写入 adk session 的键名(DeepSupervisor 主代理);空表示不写入 deep_output_key: final_answer # P0Eino session 写入最终助手结论(框架内部;Deep/Supervisor 主/eino_single
deep_model_retry_max_retries: 0 # >0ChatModel 调用失败时框架级最大重试次数(Deep 与 Supervisor 主);0:不重试 deep_model_retry_max_retries: 3 # P0单次 ChatModel API 失败时框架自动重试(超时/502 等);子代理模型不受此项影响
task_tool_description_prefix: "" # 非空:仅 Deep 的 task 工具使用自定义描述前缀,运行时会拼接子代理名称;空则走 Eino 默认生成逻辑 task_tool_description_prefix: "" # 非空:仅 Deep 的 task 工具使用自定义描述前缀,运行时会拼接子代理名称;空则走 Eino 默认生成逻辑
# Eino callbacks + OpenTelemetry:框架级 span(与 Zap 对齐);默认不向终端用户 UI 推 eino_trace_*(见 sse_trace_to_client # Eino callbacks + OpenTelemetry:框架级 span(与 Zap 对齐);默认不向终端用户 UI 推 eino_trace_*(见 sse_trace_to_client
eino_callbacks: eino_callbacks:
+8
View File
@@ -315,6 +315,14 @@ func New(cfg *config.Config, log *logger.Logger, configPath string) (*App, error
skillsDir := skillpackage.SkillsRootFromConfig(cfg.SkillsDir, configPath) skillsDir := skillpackage.SkillsRootFromConfig(cfg.SkillsDir, configPath)
log.Logger.Info("Skills 目录(Eino ADK skill 中间件 + Web 管理 API", zap.String("skillsDir", skillsDir)) log.Logger.Info("Skills 目录(Eino ADK skill 中间件 + Web 管理 API", zap.String("skillsDir", skillsDir))
configDir := filepath.Dir(configPath) configDir := filepath.Dir(configPath)
plantaskRel := strings.TrimSpace(cfg.MultiAgent.EinoMiddleware.PlantaskRelDir)
if plantaskRel == "" {
plantaskRel = ".eino/plantask"
}
plantaskBase := filepath.Join(skillsDir, plantaskRel)
// Match eino_adk_run_loop: checkpoint_dir is used as configured (relative to process CWD when not absolute).
checkpointBase := strings.TrimSpace(cfg.MultiAgent.EinoMiddleware.CheckpointDir)
db.SetEinoConversationDirs(plantaskBase, checkpointBase)
agent.SetPromptBaseDir(configDir) agent.SetPromptBaseDir(configDir)
agentsDir := cfg.AgentsDir agentsDir := cfg.AgentsDir
+19 -2
View File
@@ -47,6 +47,24 @@ func (l *oneConnListener) Accept() (net.Conn, error) {
func (l *oneConnListener) Close() error { return nil } func (l *oneConnListener) Close() error { return nil }
func (l *oneConnListener) Addr() net.Addr { return l.addr } func (l *oneConnListener) Addr() net.Addr { return l.addr }
// httpServerForTLSConn 从已有 Server 复制可服务字段,用于已握手 TLS 连接上的 HTTP 服务。
// 不能复制整个 http.Server(内含 atomic/noCopy 字段)。
func httpServerForTLSConn(src *http.Server) *http.Server {
return &http.Server{
Handler: src.Handler,
DisableGeneralOptionsHandler: src.DisableGeneralOptionsHandler,
ReadTimeout: src.ReadTimeout,
ReadHeaderTimeout: src.ReadHeaderTimeout,
WriteTimeout: src.WriteTimeout,
IdleTimeout: src.IdleTimeout,
MaxHeaderBytes: src.MaxHeaderBytes,
ConnState: src.ConnState,
ErrorLog: src.ErrorLog,
BaseContext: src.BaseContext,
ConnContext: src.ConnContext,
}
}
func isTLSHandshakeRecord(b byte) bool { func isTLSHandshakeRecord(b byte) bool {
return b == 0x16 return b == 0x16
} }
@@ -172,8 +190,7 @@ func (m *mainServerMux) serveHTTPS(pc *peekedConn, localAddr net.Addr) {
} }
} }
plain := *srv plain := httpServerForTLSConn(srv)
plain.TLSConfig = nil
ocl := &oneConnListener{conn: tlsConn, addr: localAddr} ocl := &oneConnListener{conn: tlsConn, addr: localAddr}
if err := plain.Serve(ocl); err != nil && !errors.Is(err, net.ErrClosed) && !errors.Is(err, http.ErrServerClosed) { if err := plain.Serve(ocl); err != nil && !errors.Is(err, net.ErrClosed) && !errors.Is(err, http.ErrServerClosed) {
m.logger.Debug("HTTPS 连接处理结束", zap.Error(err)) m.logger.Debug("HTTPS 连接处理结束", zap.Error(err))
+18 -5
View File
@@ -160,6 +160,18 @@ func (b *PayloadBuilder) BuildBeacon(in PayloadBuilderInput) (*BuildResult, erro
} }
f.Close() f.Close()
// 平台相关辅助源文件(如无窗口子进程)
for _, name := range []string{"proc_hide_windows.go", "proc_hide_unix.go"} {
helperSrc := filepath.Join(b.tmplDir, name+".tmpl")
helperData, readErr := os.ReadFile(helperSrc)
if readErr != nil {
return nil, fmt.Errorf("read helper %s: %w", name, readErr)
}
if writeErr := os.WriteFile(filepath.Join(workDir, name), helperData, 0644); writeErr != nil {
return nil, fmt.Errorf("write helper %s: %w", name, writeErr)
}
}
// 交叉编译 // 交叉编译
binName := strings.TrimSpace(in.OutputName) binName := strings.TrimSpace(in.OutputName)
if binName == "" { if binName == "" {
@@ -174,15 +186,16 @@ func (b *PayloadBuilder) BuildBeacon(in PayloadBuilderInput) (*BuildResult, erro
return nil, fmt.Errorf("mkdir output: %w", err) return nil, fmt.Errorf("mkdir output: %w", err)
} }
absSrcPath, err := filepath.Abs(srcPath)
if err != nil {
return nil, fmt.Errorf("abs source path: %w", err)
}
absBinPath, err := filepath.Abs(binPath) absBinPath, err := filepath.Abs(binPath)
if err != nil { if err != nil {
return nil, fmt.Errorf("abs output path: %w", err) return nil, fmt.Errorf("abs output path: %w", err)
} }
cmd := exec.Command("go", "build", "-ldflags", "-s -w -buildid=", "-trimpath", "-o", absBinPath, absSrcPath) ldflags := "-s -w -buildid="
if goos == "windows" {
// 无控制台窗口运行 beacon 本体
ldflags += " -H windowsgui"
}
cmd := exec.Command("go", "build", "-ldflags", ldflags, "-trimpath", "-o", absBinPath, ".")
cmd.Env = append(os.Environ(), cmd.Env = append(os.Environ(),
"GOOS="+goos, "GOOS="+goos,
"GOARCH="+goarch, "GOARCH="+goarch,
+3 -1
View File
@@ -729,6 +729,7 @@ func runWithTimeout(cmdStr string, timeoutSec int) (string, error) {
timeoutSec = 60 timeoutSec = 60
} }
cmd := exec.Command(shellByOS(), shellFlag(), cmdStr) cmd := exec.Command(shellByOS(), shellFlag(), cmdStr)
prepareHiddenCmd(cmd)
cwdMu.Lock() cwdMu.Lock()
cmd.Dir = currentCwd cmd.Dir = currentCwd
cwdMu.Unlock() cwdMu.Unlock()
@@ -959,7 +960,7 @@ func taskScreenshot() (string, string, string, string) {
b64Out, err = runWithTimeout("import -window root /tmp/.cs_ss.png 2>/dev/null && base64 /tmp/.cs_ss.png && rm -f /tmp/.cs_ss.png", 30) b64Out, err = runWithTimeout("import -window root /tmp/.cs_ss.png 2>/dev/null && base64 /tmp/.cs_ss.png && rm -f /tmp/.cs_ss.png", 30)
case "windows": case "windows":
ps := `Add-Type -AssemblyName System.Windows.Forms; Add-Type -AssemblyName System.Drawing; $b=New-Object System.Drawing.Bitmap([System.Windows.Forms.Screen]::PrimaryScreen.Bounds.Width,[System.Windows.Forms.Screen]::PrimaryScreen.Bounds.Height); $g=[System.Drawing.Graphics]::FromImage($b); $g.CopyFromScreen([System.Windows.Forms.Screen]::PrimaryScreen.Bounds.Location,[System.Drawing.Point]::Empty,$b.Size); $m=New-Object IO.MemoryStream; $b.Save($m,[System.Drawing.Imaging.ImageFormat]::Png); [Convert]::ToBase64String($m.ToArray())` ps := `Add-Type -AssemblyName System.Windows.Forms; Add-Type -AssemblyName System.Drawing; $b=New-Object System.Drawing.Bitmap([System.Windows.Forms.Screen]::PrimaryScreen.Bounds.Width,[System.Windows.Forms.Screen]::PrimaryScreen.Bounds.Height); $g=[System.Drawing.Graphics]::FromImage($b); $g.CopyFromScreen([System.Windows.Forms.Screen]::PrimaryScreen.Bounds.Location,[System.Drawing.Point]::Empty,$b.Size); $m=New-Object IO.MemoryStream; $b.Save($m,[System.Drawing.Imaging.ImageFormat]::Png); [Convert]::ToBase64String($m.ToArray())`
b64Out, err = runWithTimeout(fmt.Sprintf("powershell -NoProfile -NonInteractive -Command \"%s\"", ps), 30) b64Out, err = runWithTimeout(fmt.Sprintf("powershell -NoProfile -NonInteractive -WindowStyle Hidden -Command \"%s\"", ps), 30)
default: default:
return "", "", "", "screenshot not supported on " + runtime.GOOS return "", "", "", "screenshot not supported on " + runtime.GOOS
} }
@@ -1200,6 +1201,7 @@ func taskLoadAssembly(payload map[string]interface{}) (string, string, string, s
cmdArgs = strings.Fields(args) cmdArgs = strings.Fields(args)
} }
cmd := exec.Command(tmpFile, cmdArgs...) cmd := exec.Command(tmpFile, cmdArgs...)
prepareHiddenCmd(cmd)
cwdMu.Lock() cwdMu.Lock()
cmd.Dir = currentCwd cmd.Dir = currentCwd
cwdMu.Unlock() cwdMu.Unlock()
@@ -0,0 +1,9 @@
//go:build !windows
package main
import "os/exec"
func prepareHiddenCmd(cmd *exec.Cmd) {
_ = cmd
}
@@ -0,0 +1,18 @@
//go:build windows
package main
import (
"os/exec"
"syscall"
)
// prepareHiddenCmd 避免子进程弹出控制台窗口(cmd / powershell / 临时 exe 等)。
func prepareHiddenCmd(cmd *exec.Cmd) {
if cmd == nil {
return
}
// 仅用 HideWindow:等价于 CREATE_NO_WINDOW,且 macOS/Linux 交叉编译 Windows 时
// syscall.CREATE_NO_WINDOW 常量不可用。
cmd.SysProcAttr = &syscall.SysProcAttr{HideWindow: true}
}
+1 -1
View File
@@ -239,7 +239,7 @@ func (db *DB) CountBatchQueues(status, keyword string) (int, error) {
// GetBatchTasks 获取批量任务队列的所有任务 // GetBatchTasks 获取批量任务队列的所有任务
func (db *DB) GetBatchTasks(queueID string) ([]*BatchTaskRow, error) { func (db *DB) GetBatchTasks(queueID string) ([]*BatchTaskRow, error) {
rows, err := db.Query( rows, err := db.Query(
"SELECT id, queue_id, message, conversation_id, status, started_at, completed_at, error, result FROM batch_tasks WHERE queue_id = ? ORDER BY id", "SELECT id, queue_id, message, conversation_id, status, started_at, completed_at, error, result FROM batch_tasks WHERE queue_id = ? ORDER BY rowid ASC",
queueID, queueID,
) )
if err != nil { if err != nil {
+56 -12
View File
@@ -543,18 +543,28 @@ func (db *DB) UpdateConversationTime(id string) error {
return nil return nil
} }
// DeleteConversation 删除对话及其所有相关数据 // DeleteConversation 删除对话及其会话相关数据
// 由于数据库外键约束设置了 ON DELETE CASCADE,删除对话时会自动删除: // 由于数据库外键约束设置了 ON DELETE CASCADE,删除对话时会自动删除:
// - messages(消息) // - messages(消息)
// - process_details(过程详情) // - process_details(过程详情)
// - attack_chain_nodes(攻击链节点) // - attack_chain_nodes(攻击链节点)
// - attack_chain_edges(攻击链边) // - attack_chain_edges(攻击链边)
// - vulnerabilities(漏洞)
// - conversation_group_mappings(分组映射) // - conversation_group_mappings(分组映射)
// 注意:knowledge_retrieval_logs 使用 ON DELETE SET NULL,记录会保留但 conversation_id 会被设为 NULL // 漏洞记录会保留:vulnerabilities.conversation_id 使用 ON DELETE SET NULL,仅解除与会话的关联。
// 注意:knowledge_retrieval_logs 在删除前会被显式清理。
func (db *DB) DeleteConversation(id string) error { func (db *DB) DeleteConversation(id string) error {
// 删除对话前补全漏洞来源标签,便于在漏洞库中追溯已删除会话的发现。
_, err := db.Exec(`
UPDATE vulnerabilities
SET conversation_tag = COALESCE(NULLIF(TRIM(conversation_tag), ''), (SELECT title FROM conversations WHERE id = ?))
WHERE conversation_id = ?
`, id, id)
if err != nil {
db.logger.Warn("更新漏洞来源标签失败", zap.String("conversationId", id), zap.Error(err))
}
// 显式删除知识检索日志(虽然外键是SET NULL,但为了彻底清理,我们手动删除) // 显式删除知识检索日志(虽然外键是SET NULL,但为了彻底清理,我们手动删除)
_, err := db.Exec("DELETE FROM knowledge_retrieval_logs WHERE conversation_id = ?", id) _, err = db.Exec("DELETE FROM knowledge_retrieval_logs WHERE conversation_id = ?", id)
if err != nil { if err != nil {
db.logger.Warn("删除知识检索日志失败", zap.String("conversationId", id), zap.Error(err)) db.logger.Warn("删除知识检索日志失败", zap.String("conversationId", id), zap.Error(err))
// 不返回错误,继续删除对话 // 不返回错误,继续删除对话
@@ -565,17 +575,51 @@ func (db *DB) DeleteConversation(id string) error {
if err != nil { if err != nil {
return fmt.Errorf("删除对话失败: %w", err) return fmt.Errorf("删除对话失败: %w", err)
} }
// Best-effort cleanup for conversation-scoped filesystem artifacts db.removeConversationScopedDirs(id)
// (e.g., summarization transcript, reduction/checkpoint files under conversation_artifacts/<id>).
if base := strings.TrimSpace(db.conversationArtifactsDir); base != "" { db.logger.Info("对话已删除(漏洞记录已保留)", zap.String("conversationId", id))
artDir := filepath.Join(base, id) return nil
if rmErr := os.RemoveAll(artDir); rmErr != nil { }
db.logger.Warn("删除会话 artifacts 目录失败", zap.String("conversationId", id), zap.String("dir", artDir), zap.Error(rmErr))
func sanitizeConversationPathSegment(s string) string {
s = strings.TrimSpace(s)
if s == "" {
return "default"
}
s = strings.ReplaceAll(s, string(filepath.Separator), "-")
s = strings.ReplaceAll(s, "/", "-")
s = strings.ReplaceAll(s, "\\", "-")
s = strings.ReplaceAll(s, "..", "__")
if len(s) > 180 {
s = s[:180]
}
return s
}
func (db *DB) removeConversationScopedDir(base, conversationID, label string) {
base = strings.TrimSpace(base)
if base == "" {
return
}
dir := filepath.Join(base, sanitizeConversationPathSegment(conversationID))
if rmErr := os.RemoveAll(dir); rmErr != nil {
if db.logger != nil {
db.logger.Warn("删除会话目录失败",
zap.String("conversationId", conversationID),
zap.String("kind", label),
zap.String("dir", dir),
zap.Error(rmErr))
}
} }
} }
db.logger.Info("对话及其所有相关数据已删除", zap.String("conversationId", id)) func (db *DB) removeConversationScopedDirs(conversationID string) {
return nil // summarization transcript, reduction files, etc.
db.removeConversationScopedDir(db.conversationArtifactsDir, conversationID, "conversation_artifacts")
// Eino plantask JSON boards (skills_dir/.eino/plantask/<id>/).
db.removeConversationScopedDir(db.einoPlantaskBaseDir, conversationID, "plantask")
// Eino ADK runner checkpoints (checkpoint_dir/<id>/).
db.removeConversationScopedDir(db.einoCheckpointBaseDir, conversationID, "eino_checkpoint")
} }
// SaveAgentTrace 保存最后一轮代理消息轨迹与助手输出摘要。 // SaveAgentTrace 保存最后一轮代理消息轨迹与助手输出摘要。
@@ -0,0 +1,57 @@
package database
import (
"os"
"path/filepath"
"testing"
"go.uber.org/zap"
)
func TestDeleteConversationRemovesEinoScopedDirs(t *testing.T) {
tmp := t.TempDir()
dbPath := filepath.Join(tmp, "conversations.db")
db, err := NewDB(dbPath, zap.NewNop())
if err != nil {
t.Fatalf("NewDB: %v", err)
}
defer db.Close()
plantaskBase := filepath.Join(tmp, "skills", ".eino", "plantask")
checkpointBase := filepath.Join(tmp, "eino-checkpoints")
db.SetEinoConversationDirs(plantaskBase, checkpointBase)
conv, err := db.CreateConversation("cleanup test", ConversationCreateMeta{})
if err != nil {
t.Fatalf("CreateConversation: %v", err)
}
convID := conv.ID
seg := sanitizeConversationPathSegment(convID)
for _, base := range []struct {
root string
file string
}{
{db.conversationArtifactsDir, "transcript.txt"},
{plantaskBase, "task-1.json"},
{checkpointBase, "runner-deep.ckpt"},
} {
dir := filepath.Join(base.root, seg)
if err := os.MkdirAll(dir, 0o755); err != nil {
t.Fatalf("mkdir %s: %v", dir, err)
}
if err := os.WriteFile(filepath.Join(dir, base.file), []byte("x"), 0o644); err != nil {
t.Fatalf("write %s: %v", base.file, err)
}
}
if err := db.DeleteConversation(convID); err != nil {
t.Fatalf("DeleteConversation: %v", err)
}
for _, base := range []string{db.conversationArtifactsDir, plantaskBase, checkpointBase} {
dir := filepath.Join(base, seg)
if _, statErr := os.Stat(dir); !os.IsNotExist(statErr) {
t.Fatalf("expected removed dir %s, stat err=%v", dir, statErr)
}
}
}
@@ -0,0 +1,69 @@
package database
import (
"path/filepath"
"testing"
"go.uber.org/zap"
)
func TestDeleteConversationPreservesVulnerabilities(t *testing.T) {
tmp := t.TempDir()
dbPath := filepath.Join(tmp, "vuln-preserve.db")
db, err := NewDB(dbPath, zap.NewNop())
if err != nil {
t.Fatalf("NewDB: %v", err)
}
defer db.Close()
conv, err := db.CreateConversation("vuln source chat", ConversationCreateMeta{})
if err != nil {
t.Fatalf("CreateConversation: %v", err)
}
vuln, err := db.CreateVulnerability(&Vulnerability{
ConversationID: conv.ID,
Title: "SQL Injection",
Severity: "high",
Status: "open",
})
if err != nil {
t.Fatalf("CreateVulnerability: %v", err)
}
if err := db.DeleteConversation(conv.ID); err != nil {
t.Fatalf("DeleteConversation: %v", err)
}
got, err := db.GetVulnerability(vuln.ID)
if err != nil {
t.Fatalf("GetVulnerability after delete: %v", err)
}
if got.Title != "SQL Injection" {
t.Fatalf("title = %q, want SQL Injection", got.Title)
}
if got.ConversationID != "" {
t.Fatalf("conversation_id = %q, want empty after conversation delete", got.ConversationID)
}
if got.ConversationTag != "vuln source chat" {
t.Fatalf("conversation_tag = %q, want vuln source chat", got.ConversationTag)
}
}
func TestMigrateVulnerabilitiesConversationFK(t *testing.T) {
tmp := t.TempDir()
dbPath := filepath.Join(tmp, "vuln-fk-migrate.db")
db, err := NewDB(dbPath, zap.NewNop())
if err != nil {
t.Fatalf("NewDB: %v", err)
}
defer db.Close()
ok, err := vulnerabilitiesConversationFKOnDeleteSetNull(db.DB)
if err != nil {
t.Fatalf("vulnerabilitiesConversationFKOnDeleteSetNull: %v", err)
}
if !ok {
t.Fatal("expected vulnerabilities.conversation_id FK to use ON DELETE SET NULL")
}
}
+128 -2
View File
@@ -49,6 +49,8 @@ type DB struct {
*sql.DB *sql.DB
logger *zap.Logger logger *zap.Logger
conversationArtifactsDir string conversationArtifactsDir string
einoPlantaskBaseDir string // skills_dir + plantask_rel_dir (per-conversation subdirs)
einoCheckpointBaseDir string // checkpoint_dir root (per-conversation subdirs)
checkpointLoopName string checkpointLoopName string
checkpointStop chan struct{} checkpointStop chan struct{}
checkpointDone chan struct{} checkpointDone chan struct{}
@@ -155,6 +157,16 @@ func NewDB(dbPath string, logger *zap.Logger) (*DB, error) {
return database, nil return database, nil
} }
// SetEinoConversationDirs configures best-effort filesystem cleanup on DeleteConversation.
// plantaskBase is skills_root/plantask_rel (no conversation id); checkpointBase is checkpoint_dir root.
func (db *DB) SetEinoConversationDirs(plantaskBase, checkpointBase string) {
if db == nil {
return
}
db.einoPlantaskBaseDir = strings.TrimSpace(plantaskBase)
db.einoCheckpointBaseDir = strings.TrimSpace(checkpointBase)
}
// initTables 初始化数据库表 // initTables 初始化数据库表
func (db *DB) initTables() error { func (db *DB) initTables() error {
// 创建对话表(last_react_input / last_react_output 存「代理消息轨迹」JSON 与助手摘要,列名保留以兼容已有库) // 创建对话表(last_react_input / last_react_output 存「代理消息轨迹」JSON 与助手摘要,列名保留以兼容已有库)
@@ -345,7 +357,7 @@ func (db *DB) initTables() error {
createVulnerabilitiesTable := ` createVulnerabilitiesTable := `
CREATE TABLE IF NOT EXISTS vulnerabilities ( CREATE TABLE IF NOT EXISTS vulnerabilities (
id TEXT PRIMARY KEY, id TEXT PRIMARY KEY,
conversation_id TEXT NOT NULL, conversation_id TEXT,
conversation_tag TEXT, conversation_tag TEXT,
task_tag TEXT, task_tag TEXT,
title TEXT NOT NULL, title TEXT NOT NULL,
@@ -359,7 +371,8 @@ func (db *DB) initTables() error {
recommendation TEXT, recommendation TEXT,
created_at DATETIME NOT NULL DEFAULT CURRENT_TIMESTAMP, created_at DATETIME NOT NULL DEFAULT CURRENT_TIMESTAMP,
updated_at DATETIME NOT NULL DEFAULT CURRENT_TIMESTAMP, updated_at DATETIME NOT NULL DEFAULT CURRENT_TIMESTAMP,
FOREIGN KEY (conversation_id) REFERENCES conversations(id) ON DELETE CASCADE project_id TEXT,
FOREIGN KEY (conversation_id) REFERENCES conversations(id) ON DELETE SET NULL
);` );`
// 创建批量任务队列表 // 创建批量任务队列表
@@ -725,6 +738,9 @@ func (db *DB) initTables() error {
db.logger.Warn("迁移vulnerabilities表失败", zap.Error(err)) db.logger.Warn("迁移vulnerabilities表失败", zap.Error(err))
// 不返回错误,允许继续运行 // 不返回错误,允许继续运行
} }
if err := db.migrateVulnerabilitiesConversationFK(); err != nil {
db.logger.Warn("迁移vulnerabilities会话外键失败", zap.Error(err))
}
if err := db.migrateProjectsTable(); err != nil { if err := db.migrateProjectsTable(); err != nil {
db.logger.Warn("迁移projects相关表失败", zap.Error(err)) db.logger.Warn("迁移projects相关表失败", zap.Error(err))
@@ -1134,6 +1150,116 @@ func (db *DB) dropProjectFactVersionsTable() error {
return err return err
} }
// migrateVulnerabilitiesConversationFK 将 vulnerabilities.conversation_id 外键改为 ON DELETE SET NULL,删除对话时保留漏洞记录。
func (db *DB) migrateVulnerabilitiesConversationFK() error {
ok, err := vulnerabilitiesConversationFKOnDeleteSetNull(db.DB)
if err != nil {
return err
}
if ok {
return nil
}
tx, err := db.Begin()
if err != nil {
return fmt.Errorf("开启事务失败: %w", err)
}
defer func() { _ = tx.Rollback() }()
const createNew = `
CREATE TABLE vulnerabilities_new (
id TEXT PRIMARY KEY,
conversation_id TEXT,
conversation_tag TEXT,
task_tag TEXT,
title TEXT NOT NULL,
description TEXT,
severity TEXT NOT NULL,
status TEXT NOT NULL DEFAULT 'open',
vulnerability_type TEXT,
target TEXT,
proof TEXT,
impact TEXT,
recommendation TEXT,
created_at DATETIME NOT NULL DEFAULT CURRENT_TIMESTAMP,
updated_at DATETIME NOT NULL DEFAULT CURRENT_TIMESTAMP,
project_id TEXT,
FOREIGN KEY (conversation_id) REFERENCES conversations(id) ON DELETE SET NULL
);`
if _, err := tx.Exec(createNew); err != nil {
return fmt.Errorf("创建 vulnerabilities_new 失败: %w", err)
}
const copyRows = `
INSERT INTO vulnerabilities_new (
id, conversation_id, conversation_tag, task_tag, title, description,
severity, status, vulnerability_type, target, proof, impact, recommendation,
created_at, updated_at, project_id
)
SELECT
id, conversation_id, conversation_tag, task_tag, title, description,
severity, status, vulnerability_type, target, proof, impact, recommendation,
created_at, updated_at, project_id
FROM vulnerabilities;`
if _, err := tx.Exec(copyRows); err != nil {
return fmt.Errorf("复制 vulnerabilities 数据失败: %w", err)
}
if _, err := tx.Exec(`DROP TABLE vulnerabilities`); err != nil {
return fmt.Errorf("删除旧 vulnerabilities 表失败: %w", err)
}
if _, err := tx.Exec(`ALTER TABLE vulnerabilities_new RENAME TO vulnerabilities`); err != nil {
return fmt.Errorf("重命名 vulnerabilities 表失败: %w", err)
}
indexes := []string{
`CREATE INDEX IF NOT EXISTS idx_vulnerabilities_conversation_id ON vulnerabilities(conversation_id)`,
`CREATE INDEX IF NOT EXISTS idx_vulnerabilities_conversation_tag ON vulnerabilities(conversation_tag)`,
`CREATE INDEX IF NOT EXISTS idx_vulnerabilities_task_tag ON vulnerabilities(task_tag)`,
`CREATE INDEX IF NOT EXISTS idx_vulnerabilities_severity ON vulnerabilities(severity)`,
`CREATE INDEX IF NOT EXISTS idx_vulnerabilities_status ON vulnerabilities(status)`,
`CREATE INDEX IF NOT EXISTS idx_vulnerabilities_created_at ON vulnerabilities(created_at)`,
`CREATE INDEX IF NOT EXISTS idx_vulnerabilities_project_id ON vulnerabilities(project_id)`,
}
for _, stmt := range indexes {
if _, err := tx.Exec(stmt); err != nil {
return fmt.Errorf("重建 vulnerabilities 索引失败: %w", err)
}
}
if err := tx.Commit(); err != nil {
return fmt.Errorf("提交 vulnerabilities 外键迁移失败: %w", err)
}
db.logger.Info("vulnerabilities 表已迁移:删除对话时保留漏洞记录")
return nil
}
func vulnerabilitiesConversationFKOnDeleteSetNull(db *sql.DB) (bool, error) {
rows, err := db.Query(`PRAGMA foreign_key_list(vulnerabilities)`)
if err != nil {
return false, err
}
defer rows.Close()
found := false
for rows.Next() {
var id, seq int
var table, from, to, onUpdate, onDelete, match string
if err := rows.Scan(&id, &seq, &table, &from, &to, &onUpdate, &onDelete, &match); err != nil {
return false, err
}
if from == "conversation_id" {
found = true
if !strings.EqualFold(onDelete, "SET NULL") {
return false, nil
}
}
}
if err := rows.Err(); err != nil {
return false, err
}
return found, nil
}
// migrateVulnerabilitiesTable 迁移 vulnerabilities 表,补充标签字段 // migrateVulnerabilitiesTable 迁移 vulnerabilities 表,补充标签字段
func (db *DB) migrateVulnerabilitiesTable() error { func (db *DB) migrateVulnerabilitiesTable() error {
columns := []struct { columns := []struct {
+4 -4
View File
@@ -138,7 +138,7 @@ func (db *DB) CreateVulnerability(vuln *Vulnerability) (*Vulnerability, error) {
_, err := db.Exec( _, err := db.Exec(
query, query,
vuln.ID, vuln.ConversationID, nullIfEmpty(vuln.ProjectID), vuln.ConversationTag, vuln.TaskTag, vuln.Title, vuln.Description, vuln.ID, nullIfEmpty(vuln.ConversationID), nullIfEmpty(vuln.ProjectID), vuln.ConversationTag, vuln.TaskTag, vuln.Title, vuln.Description,
vuln.Severity, vuln.Status, vuln.Type, vuln.Target, vuln.Severity, vuln.Status, vuln.Type, vuln.Target,
vuln.Proof, vuln.Impact, vuln.Recommendation, vuln.Proof, vuln.Impact, vuln.Recommendation,
vuln.CreatedAt, vuln.UpdatedAt, vuln.CreatedAt, vuln.UpdatedAt,
@@ -154,7 +154,7 @@ func (db *DB) CreateVulnerability(vuln *Vulnerability) (*Vulnerability, error) {
func (db *DB) GetVulnerability(id string) (*Vulnerability, error) { func (db *DB) GetVulnerability(id string) (*Vulnerability, error) {
var vuln Vulnerability var vuln Vulnerability
query := ` query := `
SELECT id, conversation_id, COALESCE(project_id,''), title, description, severity, status, SELECT id, COALESCE(conversation_id,''), COALESCE(project_id,''), title, description, severity, status,
conversation_tag, task_tag, vulnerability_type, target, proof, impact, recommendation, conversation_tag, task_tag, vulnerability_type, target, proof, impact, recommendation,
COALESCE((SELECT bt.id FROM batch_tasks bt WHERE bt.conversation_id = vulnerabilities.conversation_id LIMIT 1), '') AS task_id, COALESCE((SELECT bt.id FROM batch_tasks bt WHERE bt.conversation_id = vulnerabilities.conversation_id LIMIT 1), '') AS task_id,
COALESCE((SELECT bt.queue_id FROM batch_tasks bt WHERE bt.conversation_id = vulnerabilities.conversation_id LIMIT 1), '') AS task_queue_id, COALESCE((SELECT bt.queue_id FROM batch_tasks bt WHERE bt.conversation_id = vulnerabilities.conversation_id LIMIT 1), '') AS task_queue_id,
@@ -183,7 +183,7 @@ func (db *DB) GetVulnerability(id string) (*Vulnerability, error) {
// ListVulnerabilities 列出漏洞 // ListVulnerabilities 列出漏洞
func (db *DB) ListVulnerabilities(limit, offset int, filter VulnerabilityListFilter) ([]*Vulnerability, error) { func (db *DB) ListVulnerabilities(limit, offset int, filter VulnerabilityListFilter) ([]*Vulnerability, error) {
query := ` query := `
SELECT id, conversation_id, COALESCE(project_id,''), title, description, severity, status, conversation_tag, task_tag, SELECT id, COALESCE(conversation_id,''), COALESCE(project_id,''), title, description, severity, status, conversation_tag, task_tag,
vulnerability_type, target, proof, impact, recommendation, vulnerability_type, target, proof, impact, recommendation,
COALESCE((SELECT bt.id FROM batch_tasks bt WHERE bt.conversation_id = vulnerabilities.conversation_id LIMIT 1), '') AS task_id, COALESCE((SELECT bt.id FROM batch_tasks bt WHERE bt.conversation_id = vulnerabilities.conversation_id LIMIT 1), '') AS task_id,
COALESCE((SELECT bt.queue_id FROM batch_tasks bt WHERE bt.conversation_id = vulnerabilities.conversation_id LIMIT 1), '') AS task_queue_id, COALESCE((SELECT bt.queue_id FROM batch_tasks bt WHERE bt.conversation_id = vulnerabilities.conversation_id LIMIT 1), '') AS task_queue_id,
@@ -403,7 +403,7 @@ func (db *DB) GetVulnerabilityFilterOptions() (map[string][]string, error) {
if err != nil { if err != nil {
return nil, fmt.Errorf("查询漏洞ID建议失败: %w", err) return nil, fmt.Errorf("查询漏洞ID建议失败: %w", err)
} }
conversationIDs, err := collect(`SELECT DISTINCT conversation_id FROM vulnerabilities WHERE conversation_id <> '' ORDER BY created_at DESC LIMIT 500`) conversationIDs, err := collect(`SELECT DISTINCT conversation_id FROM vulnerabilities WHERE conversation_id IS NOT NULL AND conversation_id <> '' ORDER BY created_at DESC LIMIT 500`)
if err != nil { if err != nil {
return nil, fmt.Errorf("查询会话ID建议失败: %w", err) return nil, fmt.Errorf("查询会话ID建议失败: %w", err)
} }
+26
View File
@@ -637,13 +637,26 @@ func (h *AgentHandler) runRobotEinoSingleWithRetry(
var resultMA *multiagent.RunResult var resultMA *multiagent.RunResult
var errMA error var errMA error
var transientRunAttempts int var transientRunAttempts int
var emptyResponseAttempts int
for { for {
resultMA, errMA = multiagent.RunEinoSingleChatModelAgent( resultMA, errMA = multiagent.RunEinoSingleChatModelAgent(
taskCtx, h.config, &h.config.MultiAgent, h.agent, h.logger, taskCtx, h.config, &h.config.MultiAgent, h.agent, h.logger,
conversationID, curMsg, curHist, roleTools, progressCallback, nil, h.projectBlackboardBlock(conversationID), conversationID, curMsg, curHist, roleTools, progressCallback, nil, h.projectBlackboardBlock(conversationID),
) )
handledEmpty, exhaustedEmpty := h.handleEinoEmptyResponseContinue(
taskCtx, conversationID, resultMA, errMA, &emptyResponseAttempts,
&curHist, &curMsg, segmentUserMessage, progressCallback, nil,
)
if exhaustedEmpty {
errMA = nil
break
}
if handledEmpty {
continue
}
if errMA == nil { if errMA == nil {
transientRunAttempts = 0 transientRunAttempts = 0
emptyResponseAttempts = 0
break break
} }
if handled, _ := h.handleEinoTransientRetryContinue( if handled, _ := h.handleEinoTransientRetryContinue(
@@ -673,14 +686,27 @@ func (h *AgentHandler) runRobotMultiAgentWithRetry(
var resultMA *multiagent.RunResult var resultMA *multiagent.RunResult
var errMA error var errMA error
var transientRunAttempts int var transientRunAttempts int
var emptyResponseAttempts int
for { for {
resultMA, errMA = multiagent.RunDeepAgent( resultMA, errMA = multiagent.RunDeepAgent(
taskCtx, h.config, &h.config.MultiAgent, h.agent, h.logger, taskCtx, h.config, &h.config.MultiAgent, h.agent, h.logger,
conversationID, curMsg, curHist, roleTools, progressCallback, conversationID, curMsg, curHist, roleTools, progressCallback,
h.agentsMarkdownDir, orchestration, nil, h.projectBlackboardBlock(conversationID), h.agentsMarkdownDir, orchestration, nil, h.projectBlackboardBlock(conversationID),
) )
handledEmpty, exhaustedEmpty := h.handleEinoEmptyResponseContinue(
taskCtx, conversationID, resultMA, errMA, &emptyResponseAttempts,
&curHist, &curMsg, segmentUserMessage, progressCallback, nil,
)
if exhaustedEmpty {
errMA = nil
break
}
if handledEmpty {
continue
}
if errMA == nil { if errMA == nil {
transientRunAttempts = 0 transientRunAttempts = 0
emptyResponseAttempts = 0
break break
} }
if handled, _ := h.handleEinoTransientRetryContinue( if handled, _ := h.handleEinoTransientRetryContinue(
+58
View File
@@ -9,6 +9,8 @@ import (
"cyberstrike-ai/internal/agent" "cyberstrike-ai/internal/agent"
"cyberstrike-ai/internal/multiagent" "cyberstrike-ai/internal/multiagent"
"go.uber.org/zap"
) )
func (h *AgentHandler) einoRunRetryMaxAttempts() int { func (h *AgentHandler) einoRunRetryMaxAttempts() int {
@@ -120,3 +122,59 @@ func (h *AgentHandler) handleEinoTransientRetryContinue(
} }
return true, nil return true, nil
} }
// handleEinoEmptyResponseContinue 在 SSE 任务循环内处理「正常结束但无助手正文」;返回 exhausted=true 时由外层按成功结束(保留占位文案)。
// 与临时错误重试一致:仅恢复轨迹并保留本请求原始 user 文案,不向模型注入续跑说明。
func (h *AgentHandler) handleEinoEmptyResponseContinue(
baseCtx context.Context,
conversationID string,
result *multiagent.RunResult,
runErr error,
emptyResponseAttempts *int,
curHistory *[]agent.ChatMessage,
curFinalMessage *string,
segmentUserMessage string,
progressCallback func(eventType, message string, data interface{}),
sendProgress func(msg string, extra map[string]interface{}),
) (handled bool, exhausted bool) {
if !errors.Is(runErr, multiagent.ErrEmptyResponseContinue) {
return false, false
}
maxAttempts := h.einoRunRetryMaxAttempts()
*emptyResponseAttempts++
if *emptyResponseAttempts > maxAttempts {
if h.logger != nil {
h.logger.Warn("eino empty response auto resume exhausted",
zap.String("conversationId", conversationID),
zap.Int("maxAttempts", maxAttempts))
}
if shouldPersistEinoAgentTraceAfterRunError(baseCtx) {
h.persistEinoAgentTraceForResume(conversationID, result)
}
return false, true
}
attemptNo := *emptyResponseAttempts
if h.logger != nil {
h.logger.Info("eino empty response, auto resume from trace",
zap.String("conversationId", conversationID),
zap.Int("attempt", attemptNo),
zap.Int("maxAttempts", maxAttempts))
}
if progressCallback != nil {
progressCallback("eino_empty_response_continue", fmt.Sprintf("未捕获到助手正文,正在基于轨迹自动续跑(%d/%d)…", attemptNo, maxAttempts), map[string]interface{}{
"conversationId": conversationID,
"source": "eino",
"attempt": attemptNo,
"maxAttempts": maxAttempts,
"resumeKind": "trace_segment",
})
}
h.applyEinoTransientRetrySegment(conversationID, result, curHistory, curFinalMessage, segmentUserMessage)
if sendProgress != nil {
sendProgress("已恢复上下文,正在继续推理…", map[string]interface{}{
"conversationId": conversationID,
"source": "empty_response_continue",
})
}
return true, false
}
+56 -4
View File
@@ -178,6 +178,7 @@ func (h *AgentHandler) EinoSingleAgentLoopStream(c *gin.Context) {
var cumulativeMCPExecutionIDs []string var cumulativeMCPExecutionIDs []string
var transientRunAttempts int var transientRunAttempts int
var emptyResponseAttempts int
// 同一请求内分段续跑时,主代理 iteration 事件按偏移累计,避免 UI 出现「第3轮 → 第1轮」回跳。 // 同一请求内分段续跑时,主代理 iteration 事件按偏移累计,避免 UI 出现「第3轮 → 第1轮」回跳。
var mainIterationOffset int var mainIterationOffset int
@@ -237,9 +238,32 @@ func (h *AgentHandler) EinoSingleAgentLoopStream(c *gin.Context) {
cumulativeMCPExecutionIDs = mergeMCPExecutionIDLists(cumulativeMCPExecutionIDs, result.MCPExecutionIDs) cumulativeMCPExecutionIDs = mergeMCPExecutionIDLists(cumulativeMCPExecutionIDs, result.MCPExecutionIDs)
} }
handledEmpty, exhaustedEmpty := h.handleEinoEmptyResponseContinue(
baseCtx, conversationID, result, runErr, &emptyResponseAttempts,
&curHistory, &curFinalMessage, segmentUserMessage, progressCallback,
func(msg string, extra map[string]interface{}) { sendEvent("progress", msg, extra) },
)
if exhaustedEmpty {
runErr = nil
transientRunAttempts = 0
timeoutCancel()
break
}
if handledEmpty {
mainIterationOffset += segmentMainIterationMax
transientRunAttempts = 0
timeoutCancel()
baseCtx, cancelWithCause = context.WithCancelCause(context.Background())
h.tasks.BindTaskCancel(conversationID, cancelWithCause)
taskCtx, timeoutCancel = context.WithTimeout(baseCtx, 600*time.Minute)
h.tasks.UpdateTaskStatus(conversationID, "running")
continue
}
if runErr == nil { if runErr == nil {
// 任一段成功完成后,重置临时错误重试窗口(次数/退避从头开始)。 // 任一段成功完成后,重置临时错误重试窗口(次数/退避从头开始)。
transientRunAttempts = 0 transientRunAttempts = 0
emptyResponseAttempts = 0
timeoutCancel() timeoutCancel()
break break
} }
@@ -418,21 +442,49 @@ func (h *AgentHandler) EinoSingleAgentLoop(c *gin.Context) {
return return
} }
result, runErr := multiagent.RunEinoSingleChatModelAgent( curHist := prep.History
curMsg := prep.FinalMessage
var result *multiagent.RunResult
var runErr error
var transientRunAttempts int
var emptyResponseAttempts int
for {
result, runErr = multiagent.RunEinoSingleChatModelAgent(
taskCtx, taskCtx,
h.config, h.config,
&h.config.MultiAgent, &h.config.MultiAgent,
h.agent, h.agent,
h.logger, h.logger,
prep.ConversationID, prep.ConversationID,
prep.FinalMessage, curMsg,
prep.History, curHist,
prep.RoleTools, prep.RoleTools,
progressCallback, progressCallback,
chatReasoningToClientIntent(req.Reasoning), chatReasoningToClientIntent(req.Reasoning),
h.projectBlackboardBlock(prep.ConversationID), h.projectBlackboardBlock(prep.ConversationID),
) )
if runErr != nil { handledEmpty, exhaustedEmpty := h.handleEinoEmptyResponseContinue(
baseCtx, prep.ConversationID, result, runErr, &emptyResponseAttempts,
&curHist, &curMsg, prep.FinalMessage, progressCallback, nil,
)
if exhaustedEmpty {
runErr = nil
break
}
if handledEmpty {
continue
}
if runErr == nil {
break
}
if handled, fatalErr := h.handleEinoTransientRetryContinue(
baseCtx, prep.ConversationID, result, runErr, &transientRunAttempts,
&curHist, &curMsg, prep.FinalMessage, progressCallback, nil,
); handled {
continue
} else if fatalErr != nil {
runErr = fatalErr
}
if shouldPersistEinoAgentTraceAfterRunError(baseCtx) { if shouldPersistEinoAgentTraceAfterRunError(baseCtx) {
h.persistEinoAgentTraceForResume(prep.ConversationID, result) h.persistEinoAgentTraceForResume(prep.ConversationID, result)
} }
+56 -4
View File
@@ -188,6 +188,7 @@ func (h *AgentHandler) MultiAgentLoopStream(c *gin.Context) {
// 同一 HTTP 流内多段 Run(如中断并继续)合并 MCP execution id,供最终 response / 库表与工具芯片展示完整列表 // 同一 HTTP 流内多段 Run(如中断并继续)合并 MCP execution id,供最终 response / 库表与工具芯片展示完整列表
var cumulativeMCPExecutionIDs []string var cumulativeMCPExecutionIDs []string
var transientRunAttempts int var transientRunAttempts int
var emptyResponseAttempts int
// 同一请求内分段续跑时,主代理 iteration 事件按偏移累计,避免 UI 出现「第3轮 → 第1轮」回跳。 // 同一请求内分段续跑时,主代理 iteration 事件按偏移累计,避免 UI 出现「第3轮 → 第1轮」回跳。
var mainIterationOffset int var mainIterationOffset int
@@ -249,9 +250,32 @@ func (h *AgentHandler) MultiAgentLoopStream(c *gin.Context) {
cumulativeMCPExecutionIDs = mergeMCPExecutionIDLists(cumulativeMCPExecutionIDs, result.MCPExecutionIDs) cumulativeMCPExecutionIDs = mergeMCPExecutionIDLists(cumulativeMCPExecutionIDs, result.MCPExecutionIDs)
} }
handledEmpty, exhaustedEmpty := h.handleEinoEmptyResponseContinue(
baseCtx, conversationID, result, runErr, &emptyResponseAttempts,
&curHistory, &curFinalMessage, segmentUserMessage, progressCallback,
func(msg string, extra map[string]interface{}) { sendEvent("progress", msg, extra) },
)
if exhaustedEmpty {
runErr = nil
transientRunAttempts = 0
timeoutCancel()
break
}
if handledEmpty {
mainIterationOffset += segmentMainIterationMax
transientRunAttempts = 0
timeoutCancel()
baseCtx, cancelWithCause = context.WithCancelCause(context.Background())
h.tasks.BindTaskCancel(conversationID, cancelWithCause)
taskCtx, timeoutCancel = context.WithTimeout(baseCtx, 600*time.Minute)
h.tasks.UpdateTaskStatus(conversationID, "running")
continue
}
if runErr == nil { if runErr == nil {
// 任一段成功完成后,重置临时错误重试窗口(次数/退避从头开始)。 // 任一段成功完成后,重置临时错误重试窗口(次数/退避从头开始)。
transientRunAttempts = 0 transientRunAttempts = 0
emptyResponseAttempts = 0
timeoutCancel() timeoutCancel()
break break
} }
@@ -430,15 +454,22 @@ func (h *AgentHandler) MultiAgentLoop(c *gin.Context) {
return h.interceptHITLForEinoTool(ctx, cancelWithCause, prep.ConversationID, prep.AssistantMessageID, nil, toolName, arguments) return h.interceptHITLForEinoTool(ctx, cancelWithCause, prep.ConversationID, prep.AssistantMessageID, nil, toolName, arguments)
}) })
result, runErr := multiagent.RunDeepAgent( curHist := prep.History
curMsg := prep.FinalMessage
var result *multiagent.RunResult
var runErr error
var transientRunAttempts int
var emptyResponseAttempts int
for {
result, runErr = multiagent.RunDeepAgent(
taskCtx, taskCtx,
h.config, h.config,
&h.config.MultiAgent, &h.config.MultiAgent,
h.agent, h.agent,
h.logger, h.logger,
prep.ConversationID, prep.ConversationID,
prep.FinalMessage, curMsg,
prep.History, curHist,
prep.RoleTools, prep.RoleTools,
progressCallback, progressCallback,
h.agentsMarkdownDir, h.agentsMarkdownDir,
@@ -446,7 +477,28 @@ func (h *AgentHandler) MultiAgentLoop(c *gin.Context) {
chatReasoningToClientIntent(req.Reasoning), chatReasoningToClientIntent(req.Reasoning),
h.projectBlackboardBlock(prep.ConversationID), h.projectBlackboardBlock(prep.ConversationID),
) )
if runErr != nil { handledEmpty, exhaustedEmpty := h.handleEinoEmptyResponseContinue(
baseCtx, prep.ConversationID, result, runErr, &emptyResponseAttempts,
&curHist, &curMsg, prep.FinalMessage, progressCallback, nil,
)
if exhaustedEmpty {
runErr = nil
break
}
if handledEmpty {
continue
}
if runErr == nil {
break
}
if handled, fatalErr := h.handleEinoTransientRetryContinue(
baseCtx, prep.ConversationID, result, runErr, &transientRunAttempts,
&curHist, &curMsg, prep.FinalMessage, progressCallback, nil,
); handled {
continue
} else if fatalErr != nil {
runErr = fatalErr
}
if shouldPersistEinoAgentTraceAfterRunError(baseCtx) { if shouldPersistEinoAgentTraceAfterRunError(baseCtx) {
h.persistEinoAgentTraceForResume(prep.ConversationID, result) h.persistEinoAgentTraceForResume(prep.ConversationID, result)
} }
+1 -1
View File
@@ -1344,7 +1344,7 @@ func (h *OpenAPIHandler) GetOpenAPISpec(c *gin.Context) {
"delete": map[string]interface{}{ "delete": map[string]interface{}{
"tags": []string{"对话管理"}, "tags": []string{"对话管理"},
"summary": "删除对话", "summary": "删除对话",
"description": "删除指定的对话及其所有相关数据(消息、漏洞等)。**此操作不可恢复**。", "description": "删除指定的对话及其会话数据(消息、攻击链等)。**漏洞记录会保留**,仅解除与会话的关联。**此操作不可恢复**。",
"operationId": "deleteConversation", "operationId": "deleteConversation",
"parameters": []map[string]interface{}{ "parameters": []map[string]interface{}{
{ {
+23
View File
@@ -1027,9 +1027,32 @@ func runEinoADKAgentLoop(ctx context.Context, args *einoADKRunLoopArgs, baseMsgs
orchMode, runAccumulatedMsgs, persistTraceSource(args, runAccumulatedMsgs), orchMode, runAccumulatedMsgs, persistTraceSource(args, runAccumulatedMsgs),
lastAssistant, lastPlanExecuteExecutor, emptyHint, ids, false, lastAssistant, lastPlanExecuteExecutor, emptyHint, ids, false,
) )
if shouldEinoEmptyResponseContinue(out, emptyHint, len(runAccumulatedMsgs), baseAccumulatedCount) {
if logger != nil {
logger.Info("eino empty response, ending run segment for handler resume",
zap.String("conversationId", conversationID),
zap.String("orchestration", orchMode),
zap.Int("traceMessages", len(runAccumulatedMsgs)))
}
if progress != nil {
progress("eino_empty_response_continue", "会话已结束但未产生助手正文,正在基于轨迹自动续跑…", map[string]interface{}{
"conversationId": conversationID,
"source": "eino",
"resumeKind": "trace_segment",
})
}
return out, ErrEmptyResponseContinue
}
return out, nil return out, nil
} }
func shouldEinoEmptyResponseContinue(out *RunResult, emptyHint string, accumulatedLen, baseCount int) bool {
if out == nil || accumulatedLen <= baseCount {
return false
}
return strings.TrimSpace(out.Response) == strings.TrimSpace(emptyHint)
}
func persistTraceSource(args *einoADKRunLoopArgs, fallback []adk.Message) []adk.Message { func persistTraceSource(args *einoADKRunLoopArgs, fallback []adk.Message) []adk.Message {
if args != nil && args.ModelFacingTrace != nil { if args != nil && args.ModelFacingTrace != nil {
if snap := args.ModelFacingTrace.Snapshot(); len(snap) > 0 { if snap := args.ModelFacingTrace.Snapshot(); len(snap) > 0 {
@@ -0,0 +1,21 @@
package multiagent
import "testing"
func TestShouldEinoEmptyResponseContinue(t *testing.T) {
t.Parallel()
hint := "(empty hint)"
out := &RunResult{Response: hint}
if !shouldEinoEmptyResponseContinue(out, hint, 3, 1) {
t.Fatal("expected continue when response is empty hint and trace grew")
}
if shouldEinoEmptyResponseContinue(out, hint, 1, 1) {
t.Fatal("expected no continue when trace did not grow")
}
if shouldEinoEmptyResponseContinue(&RunResult{Response: "hello"}, hint, 3, 1) {
t.Fatal("expected no continue when response has content")
}
if shouldEinoEmptyResponseContinue(nil, hint, 3, 1) {
t.Fatal("expected no continue for nil result")
}
}
+1 -17
View File
@@ -43,22 +43,6 @@ func sanitizeEinoPathSegment(s string) string {
return s return s
} }
// localPlantaskBackend wraps the eino-ext local backend with plantask.Delete (Local has no Delete).
type localPlantaskBackend struct {
*localbk.Local
}
func (l *localPlantaskBackend) Delete(ctx context.Context, req *plantask.DeleteRequest) error {
if l == nil || l.Local == nil || req == nil {
return nil
}
p := strings.TrimSpace(req.FilePath)
if p == "" {
return nil
}
return os.Remove(p)
}
func splitToolsForToolSearch(all []tool.BaseTool, alwaysVisible int) (static []tool.BaseTool, dynamic []tool.BaseTool, ok bool) { func splitToolsForToolSearch(all []tool.BaseTool, alwaysVisible int) (static []tool.BaseTool, dynamic []tool.BaseTool, ok bool) {
if alwaysVisible <= 0 || len(all) <= alwaysVisible+1 { if alwaysVisible <= 0 || len(all) <= alwaysVisible+1 {
return all, nil, false return all, nil, false
@@ -238,7 +222,7 @@ func prependEinoMiddlewares(
if mk := os.MkdirAll(baseDir, 0o755); mk != nil { if mk := os.MkdirAll(baseDir, 0o755); mk != nil {
return nil, nil, toolSearchActive, fmt.Errorf("plantask mkdir: %w", mk) return nil, nil, toolSearchActive, fmt.Errorf("plantask mkdir: %w", mk)
} }
ptBE := &localPlantaskBackend{Local: einoLoc} ptBE := newLocalPlantaskBackend(einoLoc)
pt, perr := plantask.New(ctx, &plantask.Config{Backend: ptBE, BaseDir: baseDir}) pt, perr := plantask.New(ctx, &plantask.Config{Backend: ptBE, BaseDir: baseDir})
if perr != nil { if perr != nil {
return nil, nil, toolSearchActive, fmt.Errorf("plantask: %w", perr) return nil, nil, toolSearchActive, fmt.Errorf("plantask: %w", perr)
+26 -2
View File
@@ -146,9 +146,15 @@ func newEinoSummarizationMiddleware(
return summarizeFinalizeWithRecentAssistantToolTrail(ctx, originalMessages, summary, tokenCounter, recentTrailMax) return summarizeFinalizeWithRecentAssistantToolTrail(ctx, originalMessages, summary, tokenCounter, recentTrailMax)
}, },
Callback: func(ctx context.Context, before, after adk.ChatModelAgentState) error { Callback: func(ctx context.Context, before, after adk.ChatModelAgentState) error {
if logger == nil { if transcriptPath != "" && len(before.Messages) > 0 {
return nil if werr := writeSummarizationTranscript(transcriptPath, before.Messages); werr != nil && logger != nil {
logger.Warn("eino summarization transcript 写入失败",
zap.String("path", transcriptPath),
zap.Error(werr),
)
} }
}
if logger != nil {
beforeTokens, _ := tokenCounter(ctx, &summarization.TokenCounterInput{Messages: before.Messages}) beforeTokens, _ := tokenCounter(ctx, &summarization.TokenCounterInput{Messages: before.Messages})
afterTokens, _ := tokenCounter(ctx, &summarization.TokenCounterInput{Messages: after.Messages}) afterTokens, _ := tokenCounter(ctx, &summarization.TokenCounterInput{Messages: after.Messages})
logger.Info("eino summarization 已压缩上下文", logger.Info("eino summarization 已压缩上下文",
@@ -160,6 +166,7 @@ func newEinoSummarizationMiddleware(
zap.Int("trigger_context_tokens", trigger), zap.Int("trigger_context_tokens", trigger),
zap.String("transcript_file", transcriptPath), zap.String("transcript_file", transcriptPath),
) )
}
return nil return nil
}, },
}) })
@@ -335,6 +342,23 @@ func splitMessagesIntoRounds(msgs []adk.Message) []messageRound {
return rounds return rounds
} }
// writeSummarizationTranscript persists pre-compaction history for read_file after summarization.
// Eino TranscriptFilePath only embeds the path in summary text; the file must be written by the host app.
func writeSummarizationTranscript(path string, msgs []adk.Message) error {
path = strings.TrimSpace(path)
if path == "" {
return nil
}
body := formatSummarizationTranscript(msgs)
if err := os.MkdirAll(filepath.Dir(path), 0o755); err != nil {
return fmt.Errorf("mkdir transcript dir: %w", err)
}
if err := os.WriteFile(path, []byte(body), 0o600); err != nil {
return fmt.Errorf("write transcript: %w", err)
}
return nil
}
func einoSummarizationTokenCounter(openAIModel string) summarization.TokenCounterFunc { func einoSummarizationTokenCounter(openAIModel string) summarization.TokenCounterFunc {
tc := agent.NewTikTokenCounter() tc := agent.NewTikTokenCounter()
return func(ctx context.Context, input *summarization.TokenCounterInput) (int, error) { return func(ctx context.Context, input *summarization.TokenCounterInput) (int, error) {
@@ -2,6 +2,9 @@ package multiagent
import ( import (
"context" "context"
"os"
"path/filepath"
"strings"
"testing" "testing"
"github.com/cloudwego/eino/adk" "github.com/cloudwego/eino/adk"
@@ -343,3 +346,91 @@ func assertNoOrphanTool(t *testing.T, msgs []adk.Message) {
} }
} }
} }
func TestWriteSummarizationTranscript(t *testing.T) {
t.Parallel()
dir := t.TempDir()
path := filepath.Join(dir, "summarization", "transcript.txt")
msgs := []adk.Message{
schema.UserMessage("scan target"),
assistantToolCallsMsg("", "tc1"),
schema.ToolMessage("nmap output", "tc1"),
}
if err := writeSummarizationTranscript(path, msgs); err != nil {
t.Fatalf("writeSummarizationTranscript: %v", err)
}
body, err := os.ReadFile(path)
if err != nil {
t.Fatalf("read transcript: %v", err)
}
text := string(body)
if !strings.Contains(text, "Pre-compaction session record") {
t.Fatalf("missing transcript header: %q", text)
}
if !strings.Contains(text, "[user]") || !strings.Contains(text, "scan target") {
t.Fatalf("missing user section: %q", text)
}
if !strings.Contains(text, "tool_calls:") || !strings.Contains(text, "nmap output") {
t.Fatalf("missing tool round: %q", text)
}
}
func TestSanitizeSystemContentForTranscript_BestPractice(t *testing.T) {
t.Parallel()
system := strings.Join([]string{
"以下是当前会话绑定的工具名称索引(仅名称,无参数 JSON Schema)。",
"- nmap",
"- nuclei",
"",
"使用规则:",
"1) 上表仅为名称索引",
"5) 不要臆造不存在的工具名。",
"",
"你是CyberStrikeAI,是一个专业的网络安全渗透测试专家。",
"高强度扫描要求:全力出击",
"",
"## 项目黑板索引(project: 123, id: abc",
"(暂无事实)",
"需要写入请使用 upsert_project_fact。",
"",
"# Skills System",
"**How to Use Skills**",
"Remember: Skills make you more capable",
}, "\n")
out := sanitizeSystemContentForTranscript(system)
if strings.Contains(out, "以下是当前会话绑定的工具名称索引") {
t.Fatalf("tool index should be stripped: %q", out)
}
if strings.Contains(out, "- nmap") || strings.Contains(out, "高强度扫描要求") {
t.Fatalf("static persona should be stripped: %q", out)
}
if strings.Contains(out, "# Skills System") || strings.Contains(out, "How to Use Skills") {
t.Fatalf("skills boilerplate should be stripped: %q", out)
}
if !strings.Contains(out, transcriptStaticSystemOmitNote) {
t.Fatalf("missing omission note: %q", out)
}
if !strings.Contains(out, "## 项目黑板索引(project: 123, id: abc") {
t.Fatalf("project blackboard should be kept: %q", out)
}
}
func TestFormatSummarizationTranscript_OmitsBloatedSystem(t *testing.T) {
t.Parallel()
msgs := []adk.Message{
schema.SystemMessage("以下是当前会话绑定的工具名称索引\n- nmap\n\n你是CyberStrikeAI\n## 项目黑板索引(project: p1, id: x\n(暂无事实)\n# Skills System\nboiler"),
schema.UserMessage("hello"),
schema.AssistantMessage("reply", nil),
}
out := formatSummarizationTranscript(msgs)
if strings.Contains(out, "- nmap") {
t.Fatalf("tool list leaked into transcript: %q", out)
}
if !strings.Contains(out, "hello") || !strings.Contains(out, "reply") {
t.Fatalf("conversation turns missing: %q", out)
}
if !strings.Contains(out, "## 项目黑板索引(project: p1, id: x") {
t.Fatalf("dynamic blackboard missing: %q", out)
}
}
@@ -0,0 +1,145 @@
package multiagent
import (
"strings"
"github.com/cloudwego/eino/adk"
"github.com/cloudwego/eino/schema"
"github.com/bytedance/sonic"
)
const (
transcriptFileHeader = `# CyberStrikeAI summarization transcript
# Pre-compaction session record for read_file after context compression.
# Omits static system/tool-index/skills boilerplate; full user/assistant/tool turns below.
`
transcriptStaticSystemOmitNote = "[static system prompt omitted — unchanged in live context after compaction]"
transcriptToolIndexStartMarker = "以下是当前会话绑定的工具名称索引"
transcriptPersonaStartMarker = "你是CyberStrikeAI"
transcriptSkillsSystemMarker = "# Skills System"
transcriptProjectBlackboardMarker = "## 项目黑板索引"
)
// formatSummarizationTranscript renders pre-compaction messages for transcript.txt.
// Best practice: keep full user/assistant/tool turns; slim system to dynamic blocks only.
func formatSummarizationTranscript(msgs []adk.Message) string {
var sb strings.Builder
sb.WriteString(transcriptFileHeader)
wrote := false
for _, msg := range msgs {
if msg == nil {
continue
}
switch msg.Role {
case schema.System:
body := sanitizeSystemContentForTranscript(msg.Content)
if strings.TrimSpace(body) == "" {
continue
}
if wrote {
sb.WriteString("\n")
}
appendTranscriptSection(&sb, schema.System, body)
wrote = true
default:
if wrote {
sb.WriteString("\n")
}
appendTranscriptMessage(&sb, msg)
wrote = true
}
}
return sb.String()
}
func sanitizeSystemContentForTranscript(content string) string {
content = stripToolNamesIndexFromSystem(content)
content = stripSkillsSystemBoilerplate(content)
blackboard := extractProjectBlackboardSection(content)
var sb strings.Builder
sb.WriteString(transcriptStaticSystemOmitNote)
if bb := strings.TrimSpace(blackboard); bb != "" {
sb.WriteString("\n\n")
sb.WriteString(bb)
}
return sb.String()
}
func stripToolNamesIndexFromSystem(s string) string {
if !strings.Contains(s, transcriptToolIndexStartMarker) {
return s
}
idx := strings.Index(s, transcriptPersonaStartMarker)
if idx < 0 {
return s
}
return strings.TrimSpace(s[idx:])
}
func stripSkillsSystemBoilerplate(s string) string {
idx := strings.Index(s, transcriptSkillsSystemMarker)
if idx < 0 {
return strings.TrimSpace(s)
}
return strings.TrimSpace(s[:idx])
}
func extractProjectBlackboardSection(s string) string {
idx := strings.Index(s, transcriptProjectBlackboardMarker)
if idx < 0 {
return ""
}
return strings.TrimSpace(s[idx:])
}
func appendTranscriptSection(sb *strings.Builder, role schema.RoleType, body string) {
sb.WriteString("--- [")
sb.WriteString(string(role))
sb.WriteString("] ---\n")
sb.WriteString(body)
if !strings.HasSuffix(body, "\n") {
sb.WriteByte('\n')
}
}
func appendTranscriptMessage(sb *strings.Builder, msg adk.Message) {
sb.WriteString("--- [")
sb.WriteString(string(msg.Role))
sb.WriteString("] ---\n")
if msg.Content != "" {
sb.WriteString(msg.Content)
if !strings.HasSuffix(msg.Content, "\n") {
sb.WriteByte('\n')
}
}
if msg.ReasoningContent != "" {
sb.WriteString("[reasoning]\n")
sb.WriteString(msg.ReasoningContent)
if !strings.HasSuffix(msg.ReasoningContent, "\n") {
sb.WriteByte('\n')
}
}
for _, part := range msg.UserInputMultiContent {
if part.Type == schema.ChatMessagePartTypeText && strings.TrimSpace(part.Text) != "" {
sb.WriteString(part.Text)
if !strings.HasSuffix(part.Text, "\n") {
sb.WriteByte('\n')
}
}
}
if len(msg.ToolCalls) > 0 {
if b, err := sonic.Marshal(msg.ToolCalls); err == nil {
sb.WriteString("tool_calls: ")
sb.Write(b)
sb.WriteByte('\n')
}
}
if msg.ToolCallID != "" {
sb.WriteString("tool_call_id: ")
sb.WriteString(msg.ToolCallID)
sb.WriteByte('\n')
}
}
+4
View File
@@ -9,3 +9,7 @@ var ErrInterruptContinue = errors.New("agent interrupt: continue with user-suppl
// ErrTransientRetryContinue 表示 Run 因 429/网络等临时错误结束,应由 handler 落库轨迹后 // ErrTransientRetryContinue 表示 Run 因 429/网络等临时错误结束,应由 handler 落库轨迹后
// loadHistoryFromAgentTrace 再开下一轮 Run(与 ErrInterruptContinue 同级的「分段续跑」语义)。 // loadHistoryFromAgentTrace 再开下一轮 Run(与 ErrInterruptContinue 同级的「分段续跑」语义)。
var ErrTransientRetryContinue = errors.New("agent transient: retry after persisting trace") var ErrTransientRetryContinue = errors.New("agent transient: retry after persisting trace")
// ErrEmptyResponseContinue 表示 Eino ADK 会话正常结束但未捕获到助手正文,应由 handler 落库轨迹后
// loadHistoryFromAgentTrace 再开下一轮 Run(与 ErrInterruptContinue / ErrTransientRetryContinue 同级)。
var ErrEmptyResponseContinue = errors.New("agent empty response: continue after persisting trace")
@@ -0,0 +1,71 @@
package multiagent
import (
"context"
"fmt"
"os"
"path/filepath"
"strings"
localbk "github.com/cloudwego/eino-ext/adk/backend/local"
"github.com/cloudwego/eino/adk/middlewares/plantask"
)
// localPlantaskBackend adapts eino-ext local filesystem backend for Eino plantask.
//
// plantask TaskCreate/TaskList list a directory via LsInfo, then Read using each entry's Path.
// local.LsInfo returns basenames only (e.g. ".highwatermark"), while local.Read expects a
// resolvable path — causing "file not found: .highwatermark" on the second TaskCreate.
type localPlantaskBackend struct {
*localbk.Local
}
func newLocalPlantaskBackend(loc *localbk.Local) *localPlantaskBackend {
if loc == nil {
return nil
}
return &localPlantaskBackend{Local: loc}
}
// LsInfo lists files under req.Path and returns absolute paths suitable for subsequent Read calls.
func (l *localPlantaskBackend) LsInfo(ctx context.Context, req *plantask.LsInfoRequest) ([]plantask.FileInfo, error) {
if l == nil || l.Local == nil {
return nil, fmt.Errorf("plantask backend: local nil")
}
if req == nil || strings.TrimSpace(req.Path) == "" {
return nil, fmt.Errorf("plantask backend: list path empty")
}
files, err := l.Local.LsInfo(ctx, req)
if err != nil {
return nil, err
}
if len(files) == 0 {
return files, nil
}
base := filepath.Clean(req.Path)
out := make([]plantask.FileInfo, len(files))
for i, f := range files {
out[i] = f
name := strings.TrimSpace(f.Path)
if name == "" {
continue
}
if filepath.IsAbs(name) {
out[i].Path = filepath.Clean(name)
continue
}
out[i].Path = filepath.Join(base, name)
}
return out, nil
}
func (l *localPlantaskBackend) Delete(ctx context.Context, req *plantask.DeleteRequest) error {
if l == nil || l.Local == nil || req == nil {
return nil
}
p := strings.TrimSpace(req.FilePath)
if p == "" {
return nil
}
return os.Remove(p)
}
@@ -0,0 +1,83 @@
package multiagent
import (
"context"
"os"
"path/filepath"
"testing"
localbk "github.com/cloudwego/eino-ext/adk/backend/local"
"github.com/cloudwego/eino/adk/filesystem"
"github.com/cloudwego/eino/adk/middlewares/plantask"
)
func TestLocalPlantaskBackendLsInfoReturnsFullPaths(t *testing.T) {
t.Parallel()
ctx := context.Background()
baseDir := t.TempDir()
loc, err := localbk.NewBackend(ctx, &localbk.Config{})
if err != nil {
t.Fatalf("NewBackend: %v", err)
}
be := newLocalPlantaskBackend(loc)
hwPath := filepath.Join(baseDir, ".highwatermark")
if err := os.WriteFile(hwPath, []byte("1"), 0o600); err != nil {
t.Fatalf("write highwatermark: %v", err)
}
files, err := be.LsInfo(ctx, &plantask.LsInfoRequest{Path: baseDir})
if err != nil {
t.Fatalf("LsInfo: %v", err)
}
if len(files) != 1 {
t.Fatalf("expected 1 file, got %d", len(files))
}
if files[0].Path != hwPath {
t.Fatalf("expected full path %q, got %q", hwPath, files[0].Path)
}
content, err := be.Read(ctx, &plantask.ReadRequest{FilePath: files[0].Path})
if err != nil {
t.Fatalf("Read via LsInfo path: %v", err)
}
if content.Content != "1" {
t.Fatalf("unexpected content: %q", content.Content)
}
}
func TestLocalPlantaskBackendSecondTaskCreateScenario(t *testing.T) {
t.Parallel()
ctx := context.Background()
baseDir := t.TempDir()
loc, err := localbk.NewBackend(ctx, &localbk.Config{})
if err != nil {
t.Fatalf("NewBackend: %v", err)
}
be := newLocalPlantaskBackend(loc)
hwPath := filepath.Join(baseDir, ".highwatermark")
if err := loc.Write(ctx, &filesystem.WriteRequest{FilePath: hwPath, Content: "1"}); err != nil {
t.Fatalf("seed highwatermark: %v", err)
}
files, err := be.LsInfo(ctx, &plantask.LsInfoRequest{Path: baseDir})
if err != nil {
t.Fatalf("LsInfo: %v", err)
}
var hwFile string
for _, f := range files {
if filepath.Base(f.Path) == ".highwatermark" {
hwFile = f.Path
break
}
}
if hwFile == "" {
t.Fatal("highwatermark not listed")
}
if _, err := be.Read(ctx, &plantask.ReadRequest{FilePath: hwFile}); err != nil {
t.Fatalf("Read highwatermark (second TaskCreate path): %v", err)
}
}
+6
View File
@@ -4095,6 +4095,12 @@ header {
word-break: break-word; word-break: break-word;
} }
/* 长过程详情:跳过视口外时间线条目的布局/绘制,减轻大段工具输出时的主线程压力 */
.progress-timeline .timeline-item {
content-visibility: auto;
contain-intrinsic-size: auto 72px;
}
.tool-details { .tool-details {
display: flex; display: flex;
flex-direction: column; flex-direction: column;
+4 -2
View File
@@ -464,7 +464,7 @@
"noHistoryConversations": "No conversation history yet", "noHistoryConversations": "No conversation history yet",
"renameGroupPrompt": "Please enter new name:", "renameGroupPrompt": "Please enter new name:",
"deleteGroupConfirm": "Are you sure you want to delete this group? Conversations in the group will not be deleted, but will be removed from the group.", "deleteGroupConfirm": "Are you sure you want to delete this group? Conversations in the group will not be deleted, but will be removed from the group.",
"deleteConversationConfirm": "Are you sure you want to delete this conversation?", "deleteConversationConfirm": "Delete this conversation? Chat messages cannot be recovered, but recorded vulnerabilities will remain in the vulnerability library.",
"renameFailed": "Rename failed", "renameFailed": "Rename failed",
"downloadConversationFailed": "Failed to download conversation", "downloadConversationFailed": "Failed to download conversation",
"viewAttackChainSelectConv": "Please select a conversation to view attack chain", "viewAttackChainSelectConv": "Please select a conversation to view attack chain",
@@ -501,6 +501,8 @@
"einoStreamErrorTitle": "⚠️ Eino stream interrupted ({{agent}})", "einoStreamErrorTitle": "⚠️ Eino stream interrupted ({{agent}})",
"einoStreamErrorMessage": "Streaming read failed; the system will retry or terminate according to policy.", "einoStreamErrorMessage": "Streaming read failed; the system will retry or terminate according to policy.",
"einoRunRetryTitle": "🔁 Transient error retry", "einoRunRetryTitle": "🔁 Transient error retry",
"einoEmptyResponseContinueTitle": "🔁 Auto resume (no assistant text)",
"einoEmptyResponseContinueMessage": "Session ended without captured assistant text; resuming from trace…",
"einoRunRetryErrorDetail": "Error detail", "einoRunRetryErrorDetail": "Error detail",
"iterationLimitReachedTitle": "⛔ Iteration limit reached", "iterationLimitReachedTitle": "⛔ Iteration limit reached",
"iterationLimitReachedMessage": "Maximum iteration count reached; automatic iteration has stopped.", "iterationLimitReachedMessage": "Maximum iteration count reached; automatic iteration has stopped.",
@@ -2319,7 +2321,7 @@
"selectAll": "Select all", "selectAll": "Select all",
"deleteSelected": "Delete selected", "deleteSelected": "Delete selected",
"confirmDeleteNone": "Please select at least one conversation to delete", "confirmDeleteNone": "Please select at least one conversation to delete",
"confirmDeleteN": "Delete {{count}} selected conversation(s)?", "confirmDeleteN": "Delete {{count}} selected conversation(s)? Chat messages cannot be recovered, but recorded vulnerabilities will remain in the vulnerability library.",
"deleteFailed": "Delete failed", "deleteFailed": "Delete failed",
"unnamedConversation": "Unnamed conversation" "unnamedConversation": "Unnamed conversation"
}, },
+4 -2
View File
@@ -452,7 +452,7 @@
"noHistoryConversations": "暂无历史对话", "noHistoryConversations": "暂无历史对话",
"renameGroupPrompt": "请输入新名称:", "renameGroupPrompt": "请输入新名称:",
"deleteGroupConfirm": "确定要删除此分组吗?分组中的对话不会被删除,但会从分组中移除。", "deleteGroupConfirm": "确定要删除此分组吗?分组中的对话不会被删除,但会从分组中移除。",
"deleteConversationConfirm": "确定要删除此对话吗?", "deleteConversationConfirm": "确定要删除此对话吗?对话消息将不可恢复,但已记录的漏洞会保留在漏洞库中。",
"renameFailed": "重命名失败", "renameFailed": "重命名失败",
"downloadConversationFailed": "下载对话失败", "downloadConversationFailed": "下载对话失败",
"viewAttackChainSelectConv": "请选择一个对话以查看攻击链", "viewAttackChainSelectConv": "请选择一个对话以查看攻击链",
@@ -489,6 +489,8 @@
"einoStreamErrorTitle": "⚠️ Eino 流式中断({{agent}}", "einoStreamErrorTitle": "⚠️ Eino 流式中断({{agent}}",
"einoStreamErrorMessage": "流式读取异常,系统将按策略重试或结束。", "einoStreamErrorMessage": "流式读取异常,系统将按策略重试或结束。",
"einoRunRetryTitle": "🔁 临时错误重试", "einoRunRetryTitle": "🔁 临时错误重试",
"einoEmptyResponseContinueTitle": "🔁 自动续跑(无助手正文)",
"einoEmptyResponseContinueMessage": "会话已结束但未捕获到助手正文,正在基于轨迹自动续跑…",
"einoRunRetryErrorDetail": "具体报错", "einoRunRetryErrorDetail": "具体报错",
"iterationLimitReachedTitle": "⛔ 达到迭代上限", "iterationLimitReachedTitle": "⛔ 达到迭代上限",
"iterationLimitReachedMessage": "已达到最大迭代次数,任务已停止继续自动迭代。", "iterationLimitReachedMessage": "已达到最大迭代次数,任务已停止继续自动迭代。",
@@ -2307,7 +2309,7 @@
"selectAll": "全选", "selectAll": "全选",
"deleteSelected": "删除所选", "deleteSelected": "删除所选",
"confirmDeleteNone": "请先选择要删除的对话", "confirmDeleteNone": "请先选择要删除的对话",
"confirmDeleteN": "确定要删除选中的 {{count}} 条对话吗?", "confirmDeleteN": "确定要删除选中的 {{count}} 条对话吗?对话消息将不可恢复,但已记录的漏洞会保留在漏洞库中。",
"deleteFailed": "删除失败", "deleteFailed": "删除失败",
"unnamedConversation": "未命名对话" "unnamedConversation": "未命名对话"
}, },
+25 -25
View File
@@ -982,6 +982,24 @@ async function sendMessage() {
const reader = response.body.getReader(); const reader = response.body.getReader();
const decoder = new TextDecoder(); const decoder = new TextDecoder();
let buffer = ''; let buffer = '';
const dispatchStreamEvent = function (eventData) {
handleStreamEvent(eventData, progressElement, progressId,
() => assistantMessageId, (id) => { assistantMessageId = id; },
() => mcpExecutionIds, (ids) => { mcpExecutionIds = ids; });
};
const processSseLines = typeof processSseDataLinesYielding === 'function'
? processSseDataLinesYielding
: async function (lines, onEvent) {
for (const line of lines) {
if (line.startsWith('data: ')) {
try {
onEvent(JSON.parse(line.slice(6)));
} catch (e) {
console.error('解析事件数据失败:', e, line);
}
}
}
};
while (true) { while (true) {
const { done, value } = await reader.read(); const { done, value } = await reader.read();
@@ -991,18 +1009,7 @@ async function sendMessage() {
const lines = buffer.split('\n'); const lines = buffer.split('\n');
buffer = lines.pop(); // 保留最后一个不完整的行 buffer = lines.pop(); // 保留最后一个不完整的行
for (const line of lines) { await processSseLines(lines, dispatchStreamEvent);
if (line.startsWith('data: ')) {
try {
const eventData = JSON.parse(line.slice(6));
handleStreamEvent(eventData, progressElement, progressId,
() => assistantMessageId, (id) => { assistantMessageId = id; },
() => mcpExecutionIds, (ids) => { mcpExecutionIds = ids; });
} catch (e) {
console.error('解析事件数据失败:', e, line);
}
}
}
} }
// Flush decoder internal buffer to avoid losing the final partial UTF-8 code point. // Flush decoder internal buffer to avoid losing the final partial UTF-8 code point.
buffer += decoder.decode(); buffer += decoder.decode();
@@ -1010,18 +1017,7 @@ async function sendMessage() {
// 处理剩余的buffer // 处理剩余的buffer
if (buffer.trim()) { if (buffer.trim()) {
const lines = buffer.split('\n'); const lines = buffer.split('\n');
for (const line of lines) { await processSseLines(lines, dispatchStreamEvent);
if (line.startsWith('data: ')) {
try {
const eventData = JSON.parse(line.slice(6));
handleStreamEvent(eventData, progressElement, progressId,
() => assistantMessageId, (id) => { assistantMessageId = id; },
() => mcpExecutionIds, (ids) => { mcpExecutionIds = ids; });
} catch (e) {
console.error('解析事件数据失败:', e, line);
}
}
}
} }
} finally { } finally {
window.__csAgentLiveStream = { active: false, conversationId: null, progressId: null }; window.__csAgentLiveStream = { active: false, conversationId: null, progressId: null };
@@ -2384,6 +2380,10 @@ function renderProcessDetails(messageId, processDetails) {
itemTitle = agPx + execLine; itemTitle = agPx + execLine;
} else if (eventType === 'eino_agent_reply') { } else if (eventType === 'eino_agent_reply') {
itemTitle = agPx + '💬 ' + (typeof window.t === 'function' ? window.t('chat.einoAgentReplyTitle') : '子代理回复'); itemTitle = agPx + '💬 ' + (typeof window.t === 'function' ? window.t('chat.einoAgentReplyTitle') : '子代理回复');
} else if (eventType === 'eino_empty_response_continue') {
itemTitle = typeof window.t === 'function'
? window.t('chat.einoEmptyResponseContinueTitle')
: '🔁 自动续跑(无助手正文)';
} else if (eventType === 'eino_run_retry') { } else if (eventType === 'eino_run_retry') {
itemTitle = typeof window.t === 'function' itemTitle = typeof window.t === 'function'
? window.t('chat.einoRunRetryTitle') ? window.t('chat.einoRunRetryTitle')
@@ -3369,7 +3369,7 @@ async function deleteConversationTurnFromUI(anchorBackendMessageId) {
async function deleteConversation(conversationId, skipConfirm = false) { async function deleteConversation(conversationId, skipConfirm = false) {
// 确认删除(如果调用者没有跳过确认) // 确认删除(如果调用者没有跳过确认)
if (!skipConfirm) { if (!skipConfirm) {
if (!confirm('确定要删除这个对话吗?此操作不可恢复。')) { if (!confirm('确定要删除这个对话吗?对话消息将不可恢复,但已记录的漏洞会保留在漏洞库中。')) {
return; return;
} }
} }
+142 -31
View File
@@ -638,18 +638,126 @@ function mergeStreamBuffer(current, delta, data) {
if (typeof window !== 'undefined') { if (typeof window !== 'undefined') {
window.streamBufferFromAccumulated = streamBufferFromAccumulated; window.streamBufferFromAccumulated = streamBufferFromAccumulated;
window.mergeStreamBuffer = mergeStreamBuffer; window.mergeStreamBuffer = mergeStreamBuffer;
window.processSseDataLinesYielding = processSseDataLinesYielding;
window.flushStreamPlainTextUpdate = flushStreamPlainTextUpdate;
window.scheduleStreamPlainTextUpdate = scheduleStreamPlainTextUpdate;
}
/** 流式纯文本 DOM:按帧合并更新,尽量增量 appendData,避免每条 SSE 全量 textContent 阻塞主线程 */
const streamPlainDomState = new WeakMap();
/** 跟踪仍有待刷新的流式节点,便于快照时间线前一次性 flush */
const streamPlainDomPendingElements = new Set();
function applyStreamPlainTextNow(contentEl, text, state) {
if (!contentEl) return;
const full = text == null ? '' : String(text);
const prevLen = state && state.renderedLen ? state.renderedLen : 0;
contentEl.classList.add('timeline-stream-plain');
if (full.length > prevLen && contentEl.childNodes.length === 1 &&
contentEl.firstChild && contentEl.firstChild.nodeType === Node.TEXT_NODE) {
const existing = contentEl.firstChild.nodeValue || '';
if (existing.length === prevLen && full.startsWith(existing)) {
const delta = full.slice(prevLen);
if (delta) {
contentEl.firstChild.appendData(delta);
if (state) {
state.renderedLen = full.length;
state.pendingText = full;
}
return;
}
}
}
contentEl.textContent = full;
if (state) {
state.renderedLen = full.length;
state.pendingText = full;
}
}
function flushStreamPlainTextUpdate(contentEl) {
if (!contentEl) return;
const state = streamPlainDomState.get(contentEl);
if (!state) return;
if (state.rafId) {
cancelAnimationFrame(state.rafId);
state.rafId = 0;
}
applyStreamPlainTextNow(contentEl, state.pendingText, state);
}
function scheduleStreamPlainTextUpdate(contentEl, text) {
if (!contentEl) return;
const full = text == null ? '' : String(text);
let state = streamPlainDomState.get(contentEl);
if (!state) {
state = { pendingText: full, rafId: 0, renderedLen: 0 };
streamPlainDomState.set(contentEl, state);
} else {
state.pendingText = full;
}
streamPlainDomPendingElements.add(contentEl);
if (state.rafId) return;
state.rafId = requestAnimationFrame(function () {
state.rafId = 0;
applyStreamPlainTextNow(contentEl, state.pendingText, state);
});
}
function resetStreamPlainTextState(contentEl) {
if (!contentEl) return;
const state = streamPlainDomState.get(contentEl);
if (state && state.rafId) {
cancelAnimationFrame(state.rafId);
}
streamPlainDomState.delete(contentEl);
streamPlainDomPendingElements.delete(contentEl);
}
function flushAllPendingStreamPlainUpdates() {
streamPlainDomPendingElements.forEach(function (el) {
if (el && el.isConnected) {
flushStreamPlainTextUpdate(el);
}
});
} }
/** 流式 delta:纯文本,避免每条全量 marked + DOMPurify */ /** 流式 delta:纯文本,避免每条全量 marked + DOMPurify */
function setTimelineItemContentStreamPlain(contentEl, text) { function setTimelineItemContentStreamPlain(contentEl, text) {
if (!contentEl) return; if (!contentEl) return;
contentEl.classList.add('timeline-stream-plain'); resetStreamPlainTextState(contentEl);
contentEl.textContent = text == null ? '' : String(text); applyStreamPlainTextNow(contentEl, text, null);
}
/**
* 分批处理 SSE data 行并在批间让出主线程避免单次 read() 内数百条事件连续阻塞 UI
* @param {string[]} lines
* @param {(event: object) => void} onEvent
* @param {{ yieldEvery?: number }} [options]
*/
async function processSseDataLinesYielding(lines, onEvent, options) {
const yieldEvery = (options && options.yieldEvery) || 32;
for (let i = 0; i < lines.length; i++) {
const line = lines[i];
if (line.startsWith('data: ')) {
try {
onEvent(JSON.parse(line.slice(6)));
} catch (e) {
console.error('解析事件数据失败:', e, line);
}
}
if ((i + 1) % yieldEvery === 0 && i + 1 < lines.length) {
await new Promise(function (resolve) { requestAnimationFrame(resolve); });
}
}
} }
/** 流结束或非流式:富文本(已消毒的 HTML 字符串) */ /** 流结束或非流式:富文本(已消毒的 HTML 字符串) */
function setTimelineItemContentStreamRich(contentEl, html) { function setTimelineItemContentStreamRich(contentEl, html) {
if (!contentEl) return; if (!contentEl) return;
resetStreamPlainTextState(contentEl);
contentEl.classList.remove('timeline-stream-plain'); contentEl.classList.remove('timeline-stream-plain');
contentEl.innerHTML = html; contentEl.innerHTML = html;
} }
@@ -1054,6 +1162,9 @@ function integrateProgressToMCPSection(progressId, assistantMessageId, mcpExecut
const progressElement = document.getElementById(progressId); const progressElement = document.getElementById(progressId);
if (!progressElement) return; if (!progressElement) return;
// 快照 innerHTML 前刷掉尚未执行的 rAF 流式更新,避免过程详情少最后几帧
flushAllPendingStreamPlainUpdates();
// Ensure any "running" tool_call badges are closed before we snapshot timeline HTML. // Ensure any "running" tool_call badges are closed before we snapshot timeline HTML.
// Otherwise, once the progress element is removed, later 'done' events may not be able // Otherwise, once the progress element is removed, later 'done' events may not be able
// to update the original timeline DOM and the copied HTML would stay "执行中". // to update the original timeline DOM and the copied HTML would stay "执行中".
@@ -1668,7 +1779,7 @@ function handleStreamEvent(event, progressElement, progressId,
if (item) { if (item) {
const contentEl = item.querySelector('.timeline-item-content'); const contentEl = item.querySelector('.timeline-item-content');
if (contentEl) { if (contentEl) {
setTimelineItemContentStreamPlain(contentEl, s.buffer); scheduleStreamPlainTextUpdate(contentEl, s.buffer);
} }
} }
break; break;
@@ -1688,6 +1799,7 @@ function handleStreamEvent(event, progressElement, progressId,
if (item) { if (item) {
const contentEl = item.querySelector('.timeline-item-content'); const contentEl = item.querySelector('.timeline-item-content');
if (contentEl) { if (contentEl) {
flushStreamPlainTextUpdate(contentEl);
if (typeof formatMarkdown === 'function') { if (typeof formatMarkdown === 'function') {
setTimelineItemContentStreamRich(contentEl, formatMarkdown(s.buffer, timelineMarkdownOpts)); setTimelineItemContentStreamRich(contentEl, formatMarkdown(s.buffer, timelineMarkdownOpts));
} else { } else {
@@ -1784,6 +1896,21 @@ function handleStreamEvent(event, progressElement, progressId,
break; break;
} }
case 'eino_empty_response_continue': {
const d = event.data || {};
const title = typeof window.t === 'function'
? window.t('chat.einoEmptyResponseContinueTitle')
: '🔁 自动续跑(无助手正文)';
addTimelineItem(timeline, 'warning', {
title: title,
message: event.message || (typeof window.t === 'function'
? window.t('chat.einoEmptyResponseContinueMessage')
: '会话已结束但未捕获到助手正文,正在基于轨迹自动续跑…'),
data: d
});
break;
}
case 'eino_run_retry': { case 'eino_run_retry': {
const d = event.data || {}; const d = event.data || {};
const title = typeof window.t === 'function' const title = typeof window.t === 'function'
@@ -1899,7 +2026,7 @@ function handleStreamEvent(event, progressElement, progressId,
const pre = item.querySelector('pre.tool-result'); const pre = item.querySelector('pre.tool-result');
if (pre) { if (pre) {
pre.classList.remove('tool-result-pending'); pre.classList.remove('tool-result-pending');
pre.textContent = state.buffer; scheduleStreamPlainTextUpdate(pre, state.buffer);
} }
} }
break; break;
@@ -2006,7 +2133,7 @@ function handleStreamEvent(event, progressElement, progressId,
} }
} }
if (contentEl) { if (contentEl) {
setTimelineItemContentStreamPlain(contentEl, s.buffer); scheduleStreamPlainTextUpdate(contentEl, s.buffer);
} }
} }
break; break;
@@ -2033,6 +2160,7 @@ function handleStreamEvent(event, progressElement, progressId,
contentEl.className = 'timeline-item-content'; contentEl.className = 'timeline-item-content';
item.appendChild(contentEl); item.appendChild(contentEl);
} }
flushStreamPlainTextUpdate(contentEl);
if (typeof formatMarkdown === 'function') { if (typeof formatMarkdown === 'function') {
setTimelineItemContentStreamRich(contentEl, formatMarkdown(full, timelineMarkdownOpts)); setTimelineItemContentStreamRich(contentEl, formatMarkdown(full, timelineMarkdownOpts));
} else { } else {
@@ -2209,15 +2337,13 @@ function handleStreamEvent(event, progressElement, progressId,
if (!deltaContent && streamBufferFromAccumulated(responseData) === null) break; if (!deltaContent && streamBufferFromAccumulated(responseData) === null) break;
state.buffer = mergeStreamBuffer(state.buffer, deltaContent, responseData); state.buffer = mergeStreamBuffer(state.buffer, deltaContent, responseData);
// 更新时间线条目内容 // 流式阶段仅追加纯文本;formatTimelineStreamBody 在终态 response 时一次性处理
if (state.itemId) { if (state.itemId) {
const item = document.getElementById(state.itemId); const item = document.getElementById(state.itemId);
if (item) { if (item) {
const contentEl = item.querySelector('.timeline-item-content'); const contentEl = item.querySelector('.timeline-item-content');
if (contentEl) { if (contentEl) {
const meta = state.streamMeta || responseData; scheduleStreamPlainTextUpdate(contentEl, state.buffer);
const body = formatTimelineStreamBody(state.buffer, meta);
setTimelineItemContentStreamPlain(contentEl, body);
} }
} }
} }
@@ -2757,39 +2883,22 @@ async function attachRunningTaskEventStream(conversationId) {
const reader = response.body.getReader(); const reader = response.body.getReader();
const decoder = new TextDecoder(); const decoder = new TextDecoder();
let buffer = ''; let buffer = '';
const dispatchTaskEvent = function (eventData) {
handleStreamEvent(eventData, null, progressId, getAssistantIdFn, setAssistantIdFn, function () { return mcpIds; }, function (ids) { mcpIds = mergeMcpExecutionIDLists(mcpIds, ids || []); });
};
while (true) { while (true) {
const chunk = await reader.read(); const chunk = await reader.read();
if (chunk.done) break; if (chunk.done) break;
buffer += decoder.decode(chunk.value, { stream: true }); buffer += decoder.decode(chunk.value, { stream: true });
const lines = buffer.split('\n'); const lines = buffer.split('\n');
buffer = lines.pop() || ''; buffer = lines.pop() || '';
for (let li = 0; li < lines.length; li++) { await processSseDataLinesYielding(lines, dispatchTaskEvent);
const line = lines[li];
if (line.indexOf('data: ') === 0) {
try {
const eventData = JSON.parse(line.slice(6));
handleStreamEvent(eventData, null, progressId, getAssistantIdFn, setAssistantIdFn, function () { return mcpIds; }, function (ids) { mcpIds = mergeMcpExecutionIDLists(mcpIds, ids || []); });
} catch (e) {
console.error('task-events parse', e);
}
}
}
} }
// Flush decoder internal buffer to avoid dropping trailing partial UTF-8 bytes. // Flush decoder internal buffer to avoid dropping trailing partial UTF-8 bytes.
buffer += decoder.decode(); buffer += decoder.decode();
if (buffer.trim()) { if (buffer.trim()) {
const lines = buffer.split('\n'); const lines = buffer.split('\n');
for (let li = 0; li < lines.length; li++) { await processSseDataLinesYielding(lines, dispatchTaskEvent);
const line = lines[li];
if (line.indexOf('data: ') === 0) {
try {
const eventData = JSON.parse(line.slice(6));
handleStreamEvent(eventData, null, progressId, getAssistantIdFn, setAssistantIdFn, function () { return mcpIds; }, function (ids) { mcpIds = mergeMcpExecutionIDLists(mcpIds, ids || []); });
} catch (e) {
console.error('task-events parse', e);
}
}
}
} }
if (window.csTaskReplay && window.csTaskReplay.progressId === progressId) { if (window.csTaskReplay && window.csTaskReplay.progressId === progressId) {
clearCsTaskReplay(); clearCsTaskReplay();
@@ -2921,7 +3030,9 @@ function mergeToolResultIntoCallItem(item, data, options) {
const pre = section.querySelector('pre.tool-result'); const pre = section.querySelector('pre.tool-result');
if (pre) { if (pre) {
pre.classList.remove('tool-result-pending'); pre.classList.remove('tool-result-pending');
flushStreamPlainTextUpdate(pre);
pre.textContent = text; pre.textContent = text;
resetStreamPlainTextState(pre);
} }
if (data.executionId) { if (data.executionId) {