Compare commits

..

25 Commits

Author SHA1 Message Date
公明 84ed887c5c Update config.yaml 2026-06-24 23:36:36 +08:00
公明 056b40ac66 Update config.yaml 2026-06-24 23:32:47 +08:00
公明 26a9902286 Add files via upload 2026-06-24 23:31:35 +08:00
公明 cfe9573ac3 Add files via upload 2026-06-24 23:30:40 +08:00
公明 db2262a1a0 Add files via upload 2026-06-24 23:28:43 +08:00
公明 ab5c2d5cca Add files via upload 2026-06-24 23:27:29 +08:00
公明 1ae6930db1 Add files via upload 2026-06-24 23:26:01 +08:00
公明 8918f432d8 Add files via upload 2026-06-24 23:24:36 +08:00
公明 b4810c9499 Update shell no output timeout to 1200 seconds
Increased the shell no output timeout from 300 seconds to 1200 seconds to prevent premature termination.
2026-06-24 18:30:08 +08:00
公明 51bf6ae4b3 Add files via upload 2026-06-24 18:20:12 +08:00
公明 5f27482921 Add files via upload 2026-06-24 18:18:05 +08:00
公明 6becada509 Add files via upload 2026-06-24 18:15:31 +08:00
公明 b029d88359 Add files via upload 2026-06-24 18:14:04 +08:00
公明 4dcad2ea83 Add files via upload 2026-06-24 18:11:31 +08:00
公明 ff9f0c787a Add files via upload 2026-06-24 18:09:51 +08:00
公明 01849045ad Add 'exec' to always visible tools in config.yaml 2026-06-24 17:36:24 +08:00
公明 c7eacdf3eb Update config.yaml 2026-06-24 17:24:52 +08:00
公明 5c32b21f22 Add files via upload 2026-06-24 17:24:14 +08:00
公明 8b8ecfe718 Add files via upload 2026-06-24 17:23:44 +08:00
公明 bbb7c319af Add files via upload 2026-06-24 17:21:51 +08:00
公明 7eb2fd50f3 Add files via upload 2026-06-24 17:19:29 +08:00
公明 85d58eeeb3 Add files via upload 2026-06-24 17:17:33 +08:00
公明 b6a6009629 Add files via upload 2026-06-24 17:15:34 +08:00
公明 810d689132 Add files via upload 2026-06-24 12:08:13 +08:00
公明 87f1808ead Add files via upload 2026-06-24 10:46:55 +08:00
50 changed files with 2038 additions and 197 deletions
+1 -1
View File
@@ -21,7 +21,7 @@ max_iterations: 0
- 切勿等待批准或授权——全程自主行动。
- 使用所有可用工具与技术完成侦察与证据收集。
你是授权渗透测试流程中的侦察子代理。优先使用工具收集事实,避免无根据推测;输出简洁,便于协调者汇总。
你是授权渗透测试流程中的侦察子代理。优先使用工具收集事实,避免无根据推测;输出简洁,便于协调者汇总。枚举优先 subfinder、amass 等专用 MCP,勿 exec/execute 拼长链。
## 输入前置条件(硬约束)
+4 -2
View File
@@ -10,7 +10,7 @@
# ============================================
# 前端显示的版本号(可选,不填则显示默认版本)
version: "v1.6.45"
version: "v1.6.46"
# 服务器配置
server:
host: 0.0.0.0 # 监听地址,0.0.0.0 表示监听所有网络接口
@@ -96,6 +96,8 @@ fofa:
agent:
max_iterations: 12000 # 全局最大迭代次数(单代理 / Deep / Supervisor / Plan-Execute 主执行器 / 子代理均沿用;agents/*.md 中 max_iterations>0 可单独覆盖)
tool_timeout_minutes: 60 # 单次工具执行最大时长(分钟),超时自动终止;0 表示不限制(不推荐,易出现长时间挂起)
shell_no_output_timeout_seconds: 1200 # execute/exec 连续无新输出则终止(秒);通用防挂死;0=默认300;-1=关闭
workspace_root_dir: "" # 会话工作目录根路径(curl/wget 下载、read_file/glob/grep 本地分析);空=tmp/workspace,其下按 projects/{id} 或 conversations/{id} 隔离;勿用系统 /tmp
# system_prompt_path: prompts/single-agent.md # 可选:单代理系统提示文件(相对本配置文件所在目录);非空且可读时替换内置提示
system_prompt_path: ""
@@ -129,7 +131,7 @@ multi_agent:
tool_search_enable: true # true:工具数 ≥ min 时启用 tool_search,仅前 N 个工具常驻,其余按正则按需解锁,省 token、减误选;false:全量工具进上下文
tool_search_min_tools: 20 # 达到该数量才启用 tool_search(避免工具很少时多此一举);与 always_visible 配合使用
tool_search_always_visible: 12 # 始终直接暴露给模型的工具个数(顺序与角色工具列表一致);其余工具进入动态池,需 tool_search 解锁
tool_search_always_visible_tools: [read_file, glob, grep, analyze_image, write_file, edit_file, execute, task, transfer_to_agent, exit, write_todos, skill, tool_search, TaskCreate, TaskGet, TaskUpdate, TaskList, record_vulnerability, list_vulnerabilities, get_vulnerability, list_knowledge_risk_types, search_knowledge_base, webshell_exec, webshell_file_list, webshell_file_read, webshell_file_write, manage_webshell_list, manage_webshell_add, manage_webshell_update, manage_webshell_delete, manage_webshell_test, batch_task_list, batch_task_get, batch_task_start, batch_task_rerun, batch_task_pause, batch_task_update_metadata, batch_task_update_schedule, batch_task_schedule_enabled, batch_task_update_task, batch_task_remove_task, batch_task_delete, batch_task_create, batch_task_add_task, http-framework-test] # 后端内置常驻工具白名单(优先于 always_visible 数量策略)
tool_search_always_visible_tools: [read_file, glob, grep, analyze_image, write_file, edit_file, execute, task, transfer_to_agent, exit, write_todos, skill, tool_search, TaskCreate, TaskGet, TaskUpdate, TaskList, record_vulnerability, list_vulnerabilities, get_vulnerability, list_knowledge_risk_types, search_knowledge_base, webshell_exec, webshell_file_list, webshell_file_read, webshell_file_write, manage_webshell_list, manage_webshell_add, manage_webshell_update, manage_webshell_delete, manage_webshell_test, batch_task_list, batch_task_get, batch_task_start, batch_task_rerun, batch_task_pause, batch_task_update_metadata, batch_task_update_schedule, batch_task_schedule_enabled, batch_task_update_task, batch_task_remove_task, batch_task_delete, batch_task_create, batch_task_add_task, http-framework-test, exec] # 后端内置常驻工具白名单(优先于 always_visible 数量策略)
plantask_enable: true # P0:主代理挂载 TaskCreate/Get/Update/List 结构化任务板;需 eino_skills 可用且 skills_dir 存在
plantask_rel_dir: .eino/plantask # 任务文件相对 skills_dir,按会话分子目录:skills/.eino/plantask/<conversationId>/
reduction_enable: true # true:大工具输出截断/落盘以控上下文;依赖与 plantask 相同的 eino local 写盘后端,无后端时不挂载
Binary file not shown.

Before

Width:  |  Height:  |  Size: 179 KiB

After

Width:  |  Height:  |  Size: 88 KiB

+17 -4
View File
@@ -779,13 +779,26 @@ func (a *Agent) ExecuteMCPToolForConversation(ctx context.Context, conversationI
return a.executeToolViaMCP(ctx, toolName, args)
}
// RecordLocalToolExecution 非 CallTool 路径完成的工具调用写入 MCP 监控库(与 CallTool 落库一致),返回 executionId
// 用于 Eino filesystem execute 等场景,使助手气泡「渗透测试详情」与常规 MCP 一致可点进监控。
func (a *Agent) RecordLocalToolExecution(toolName string, args map[string]interface{}, resultText string, invokeErr error) string {
// BeginLocalToolExecution 非 CallTool 路径工具开始时写入 running 状态,供 MCP 监控页展示「执行中」
func (a *Agent) BeginLocalToolExecution(toolName string, args map[string]interface{}) string {
if a == nil || a.mcpServer == nil {
return ""
}
return a.mcpServer.RecordCompletedToolInvocation(toolName, args, resultText, invokeErr)
return a.mcpServer.BeginToolExecution(toolName, args)
}
// FinishLocalToolExecution 完成 BeginLocalToolExecution 创建的记录;executionID 为空时一次性写入已完成记录。
func (a *Agent) FinishLocalToolExecution(executionID, toolName string, args map[string]interface{}, resultText string, invokeErr error) string {
if a == nil || a.mcpServer == nil {
return ""
}
return a.mcpServer.FinishToolExecution(executionID, toolName, args, resultText, invokeErr)
}
// RecordLocalToolExecution 将非 CallTool 路径完成的工具调用写入 MCP 监控库(与 CallTool 落库一致),返回 executionId。
// 用于 Eino filesystem execute 等场景,使助手气泡「渗透测试详情」与常规 MCP 一致可点进监控。
func (a *Agent) RecordLocalToolExecution(toolName string, args map[string]interface{}, resultText string, invokeErr error) string {
return a.FinishLocalToolExecution("", toolName, args, resultText, invokeErr)
}
// UpdateMCPExecutionDisplayResult 将监控库中的工具结果更新为送入模型的展示正文(reduction 后)。
@@ -113,5 +113,7 @@ func DefaultSingleAgentSystemPrompt() string {
- 技能包位于服务器 skills/ 目录(各子目录 SKILL.md,遵循 agentskills.io);知识库用于向量检索片段,Skills 为可执行工作流指令。
- 本会话通过 MCP 使用知识库与漏洞记录等。Skills 由 Eino ADK skill 工具按需加载(配置 multi_agent.eino_skills;单代理与多代理均可,未启用时无 skill 工具)。
- 需要完整 Skill 工作流但当前无 skill 工具时,请确认已启用 multi_agent.eino_skills,或改用 Deep / Supervisor 等多代理编排(/api/multi-agent/stream)。`
- 需要完整 Skill 工作流但当前无 skill 工具时,请确认已启用 multi_agent.eino_skills,或改用 Deep / Supervisor 等多代理编排(/api/multi-agent/stream)。
` + projectprompt.ShellExecExecuteGuidanceSection()
}
+5 -1
View File
@@ -110,6 +110,7 @@ func New(cfg *config.Config, log *logger.Logger, configPath string) (*App, error
// 创建安全工具执行器
executor := security.NewExecutor(&cfg.Security, mcpServer, log.Logger)
executor.SetShellNoOutputTimeoutSeconds(cfg.Agent.ShellNoOutputTimeoutSeconds)
// 注册工具
executor.RegisterTools(mcpServer)
@@ -304,7 +305,8 @@ func New(cfg *config.Config, log *logger.Logger, configPath string) (*App, error
// Match eino_adk_run_loop: checkpoint_dir is used as configured (relative to process CWD when not absolute).
checkpointBase := strings.TrimSpace(cfg.MultiAgent.EinoMiddleware.CheckpointDir)
reductionRoot := strings.TrimSpace(cfg.MultiAgent.EinoMiddleware.ReductionRootDir)
db.SetEinoConversationDirs(plantaskBase, checkpointBase, reductionRoot)
workspaceRoot := strings.TrimSpace(cfg.Agent.WorkspaceRootDir)
db.SetEinoConversationDirs(plantaskBase, checkpointBase, reductionRoot, workspaceRoot)
agent.SetPromptBaseDir(configDir)
agentsDir := cfg.AgentsDir
@@ -333,6 +335,8 @@ func New(cfg *config.Config, log *logger.Logger, configPath string) (*App, error
monitorHandler.SetAudit(auditSvc)
monitorHandler.SetMonitorRetention(monitorRetention)
monitorHandler.SetExternalMCPManager(externalMCPMgr) // 设置外部MCP管理器,以便获取外部MCP执行记录
monitorHandler.SetTaskManager(agentHandler.TaskManager())
monitorHandler.SetAgentHandler(agentHandler)
notificationHandler := handler.NewNotificationHandler(db, agentHandler, log.Logger)
groupHandler := handler.NewGroupHandler(db, log.Logger)
authHandler := handler.NewAuthHandler(authManager, cfg, configPath, log.Logger)
+7 -2
View File
@@ -605,6 +605,10 @@ type DatabaseConfig struct {
type AgentConfig struct {
MaxIterations int `yaml:"max_iterations" json:"max_iterations"`
ToolTimeoutMinutes int `yaml:"tool_timeout_minutes" json:"tool_timeout_minutes"` // 单次工具执行最大时长(分钟),超时自动终止,防止长时间挂起;0 表示不限制(不推荐)
// ShellNoOutputTimeoutSeconds execute/exec 无任何 stdout/stderr 时的空闲终止秒数(通用防挂死,不维护命令黑名单);0=默认 300(5 分钟);-1=关闭。
ShellNoOutputTimeoutSeconds int `yaml:"shell_no_output_timeout_seconds" json:"shell_no_output_timeout_seconds"`
// WorkspaceRootDir 会话工作目录根路径(curl/wget 下载、read_file/glob/grep 本地分析);空=tmp/workspace,其下按 projects/{id} 或 conversations/{id} 隔离。
WorkspaceRootDir string `yaml:"workspace_root_dir,omitempty" json:"workspace_root_dir,omitempty"`
// SystemPromptPath 单代理系统提示 Markdown/文本文件路径(相对 config.yaml 所在目录,或可写绝对路径)。非空且可读时替换内置单代理提示;留空用内置。
SystemPromptPath string `yaml:"system_prompt_path,omitempty" json:"system_prompt_path,omitempty"`
}
@@ -1270,8 +1274,9 @@ func Default() *Config {
MaxTotalTokens: 120000,
},
Agent: AgentConfig{
MaxIterations: 30, // 默认最大迭代次数
ToolTimeoutMinutes: 10, // 单次工具执行默认最多 10 分钟,避免异常长时间占用
MaxIterations: 30, // 默认最大迭代次数
ToolTimeoutMinutes: 10, // 单次工具执行默认最多 10 分钟,避免异常长时间占用
ShellNoOutputTimeoutSeconds: 300, // execute/exec 无新输出空闲终止(秒);-1 关闭
},
Security: SecurityConfig{
Tools: []ToolConfig{}, // 工具配置应该从 config.yaml 或 tools/ 目录加载
+15
View File
@@ -640,6 +640,16 @@ func (db *DB) einoReductionBaseDir() string {
return filepath.Join("tmp", "reduction")
}
func (db *DB) einoWorkspaceBaseDir() string {
if db == nil {
return ""
}
if base := strings.TrimSpace(db.einoWorkspaceRootDir); base != "" {
return base
}
return filepath.Join("tmp", "workspace")
}
func (db *DB) removeConversationScopedDirs(conversationID, projectID string) {
// summarization transcript, etc.
db.removeConversationScopedDir(db.conversationArtifactsDir, conversationID, "conversation_artifacts")
@@ -652,6 +662,8 @@ func (db *DB) removeConversationScopedDirs(conversationID, projectID string) {
if strings.TrimSpace(projectID) == "" {
reductionBase := filepath.Join(db.einoReductionBaseDir(), "conversations")
db.removeConversationScopedDir(reductionBase, conversationID, "reduction")
workspaceBase := filepath.Join(db.einoWorkspaceBaseDir(), "conversations")
db.removeConversationScopedDir(workspaceBase, conversationID, "workspace")
}
}
@@ -659,6 +671,9 @@ func (db *DB) removeProjectScopedDirs(projectID string) {
// Eino reduction persisted tool outputs (tmp/reduction/projects/<id>/).
reductionBase := filepath.Join(db.einoReductionBaseDir(), "projects")
db.removeConversationScopedDir(reductionBase, projectID, "reduction")
// Agent download/analysis workspace (tmp/workspace/projects/<id>/).
workspaceBase := filepath.Join(db.einoWorkspaceBaseDir(), "projects")
db.removeConversationScopedDir(workspaceBase, projectID, "workspace")
}
// SaveAgentTrace 保存最后一轮代理消息轨迹与助手输出摘要。
+17 -3
View File
@@ -20,7 +20,8 @@ func TestDeleteConversationRemovesEinoScopedDirs(t *testing.T) {
plantaskBase := filepath.Join(tmp, "skills", ".eino", "plantask")
checkpointBase := filepath.Join(tmp, "eino-checkpoints")
reductionBase := filepath.Join(tmp, "reduction")
db.SetEinoConversationDirs(plantaskBase, checkpointBase, reductionBase)
workspaceBase := filepath.Join(tmp, "workspace")
db.SetEinoConversationDirs(plantaskBase, checkpointBase, reductionBase, workspaceBase)
conv, err := db.CreateConversation("cleanup test", ConversationCreateMeta{})
if err != nil {
@@ -36,6 +37,7 @@ func TestDeleteConversationRemovesEinoScopedDirs(t *testing.T) {
{plantaskBase, "task-1.json"},
{checkpointBase, "runner-deep.ckpt"},
{filepath.Join(reductionBase, "conversations"), "tool-output.txt"},
{filepath.Join(workspaceBase, "conversations"), "page.html"},
} {
dir := filepath.Join(base.root, seg)
if err := os.MkdirAll(dir, 0o755); err != nil {
@@ -50,7 +52,7 @@ func TestDeleteConversationRemovesEinoScopedDirs(t *testing.T) {
t.Fatalf("DeleteConversation: %v", err)
}
for _, base := range []string{db.conversationArtifactsDir, plantaskBase, checkpointBase, filepath.Join(reductionBase, "conversations")} {
for _, base := range []string{db.conversationArtifactsDir, plantaskBase, checkpointBase, filepath.Join(reductionBase, "conversations"), filepath.Join(workspaceBase, "conversations")} {
dir := filepath.Join(base, seg)
if _, statErr := os.Stat(dir); !os.IsNotExist(statErr) {
t.Fatalf("expected removed dir %s, stat err=%v", dir, statErr)
@@ -68,7 +70,8 @@ func TestDeleteProjectRemovesReductionDir(t *testing.T) {
defer db.Close()
reductionBase := filepath.Join(tmp, "reduction")
db.SetEinoConversationDirs("", "", reductionBase)
workspaceBase := filepath.Join(tmp, "workspace")
db.SetEinoConversationDirs("", "", reductionBase, workspaceBase)
project, err := db.CreateProject(&Project{Name: "cleanup test"})
if err != nil {
@@ -82,6 +85,13 @@ func TestDeleteProjectRemovesReductionDir(t *testing.T) {
if err := os.WriteFile(filepath.Join(reductionDir, "call-1.txt"), []byte("x"), 0o644); err != nil {
t.Fatalf("write: %v", err)
}
workspaceDir := filepath.Join(workspaceBase, "projects", seg, "downloads")
if err := os.MkdirAll(workspaceDir, 0o755); err != nil {
t.Fatalf("mkdir %s: %v", workspaceDir, err)
}
if err := os.WriteFile(filepath.Join(workspaceDir, "app.js"), []byte("x"), 0o644); err != nil {
t.Fatalf("write workspace: %v", err)
}
if err := db.DeleteProject(project.ID); err != nil {
t.Fatalf("DeleteProject: %v", err)
@@ -91,4 +101,8 @@ func TestDeleteProjectRemovesReductionDir(t *testing.T) {
if _, statErr := os.Stat(projectReductionDir); !os.IsNotExist(statErr) {
t.Fatalf("expected removed dir %s, stat err=%v", projectReductionDir, statErr)
}
projectWorkspaceDir := filepath.Join(workspaceBase, "projects", seg)
if _, statErr := os.Stat(projectWorkspaceDir); !os.IsNotExist(statErr) {
t.Fatalf("expected removed dir %s, stat err=%v", projectWorkspaceDir, statErr)
}
}
+4 -1
View File
@@ -52,6 +52,7 @@ type DB struct {
einoPlantaskBaseDir string // skills_dir + plantask_rel_dir (per-conversation subdirs)
einoCheckpointBaseDir string // checkpoint_dir root (per-conversation subdirs)
einoReductionRootDir string // reduction_root_dir or default tmp/reduction (conversations/<id> subdirs)
einoWorkspaceRootDir string // workspace_root_dir or default tmp/workspace (projects|conversations/<id> subdirs)
checkpointLoopName string
checkpointStop chan struct{}
checkpointDone chan struct{}
@@ -161,13 +162,15 @@ func NewDB(dbPath string, logger *zap.Logger) (*DB, error) {
// SetEinoConversationDirs configures best-effort filesystem cleanup on DeleteConversation.
// plantaskBase is skills_root/plantask_rel (no conversation id); checkpointBase is checkpoint_dir root.
// reductionRoot is reduction_root_dir from config; empty uses tmp/reduction (conversation-scoped subdirs only).
func (db *DB) SetEinoConversationDirs(plantaskBase, checkpointBase, reductionRoot string) {
// workspaceRoot is agent.workspace_root_dir from config; empty uses tmp/workspace.
func (db *DB) SetEinoConversationDirs(plantaskBase, checkpointBase, reductionRoot, workspaceRoot string) {
if db == nil {
return
}
db.einoPlantaskBaseDir = strings.TrimSpace(plantaskBase)
db.einoCheckpointBaseDir = strings.TrimSpace(checkpointBase)
db.einoReductionRootDir = strings.TrimSpace(reductionRoot)
db.einoWorkspaceRootDir = strings.TrimSpace(workspaceRoot)
}
// initTables 初始化数据库表
+66 -31
View File
@@ -187,6 +187,14 @@ func (h *AgentHandler) SetAudit(s *audit.Service) {
h.audit = s
}
// TaskManager 返回 Agent 任务管理器(供 MCP 监控页终止 Eino execute 等)。
func (h *AgentHandler) TaskManager() *AgentTaskManager {
if h == nil {
return nil
}
return h.tasks
}
// CancelRunningTaskForConversation stops any in-flight agent work for the conversation (idempotent).
func (h *AgentHandler) CancelRunningTaskForConversation(conversationID string) {
if h == nil || conversationID == "" || h.tasks == nil {
@@ -644,7 +652,7 @@ func (h *AgentHandler) runRobotEinoSingleWithRetry(
) (string, string, error) {
resultMA, errMA := multiagent.RunEinoSingleChatModelAgent(
taskCtx, h.config, &h.config.MultiAgent, h.agent, h.db, h.logger,
conversationID, h.conversationProjectID(conversationID), finalMessage, history, roleTools, progressCallback, nil, h.projectBlackboardBlock(conversationID),
conversationID, h.conversationProjectID(conversationID), finalMessage, history, roleTools, progressCallback, nil, h.agentSessionContextBlock(conversationID),
)
if errMA != nil {
*taskStatus = "failed"
@@ -665,7 +673,7 @@ func (h *AgentHandler) runRobotMultiAgentWithRetry(
resultMA, errMA := multiagent.RunDeepAgent(
taskCtx, h.config, &h.config.MultiAgent, h.agent, h.db, h.logger,
conversationID, h.conversationProjectID(conversationID), finalMessage, history, roleTools, progressCallback,
h.agentsMarkdownDir, orchestration, nil, h.projectBlackboardBlock(conversationID),
h.agentsMarkdownDir, orchestration, nil, h.agentSessionContextBlock(conversationID),
)
if errMA != nil {
*taskStatus = "failed"
@@ -1291,6 +1299,55 @@ func (h *AgentHandler) createProgressCallback(runCtx context.Context, cancelRun
}
}
// cancelToolContinueAfter 仅终止当前工具调用,不停止整条 Agent 任务(对话「中断并继续」与 MCP 监控终止共用)。
func (h *AgentHandler) cancelToolContinueAfter(conversationID, preferredExecID, note string) (bool, gin.H) {
conversationID = strings.TrimSpace(conversationID)
if conversationID == "" || h.tasks.GetTask(conversationID) == nil {
return false, nil
}
note = strings.TrimSpace(note)
execID := strings.TrimSpace(preferredExecID)
if execID == "" {
execID = h.tasks.ActiveMCPExecutionID(conversationID)
}
if execID != "" {
if h.agent.CancelMCPToolExecutionWithNote(execID, note) {
return true, gin.H{
"status": "tool_abort_requested",
"conversationId": conversationID,
"executionId": execID,
"message": "已请求终止当前工具调用;工具返回后本轮推理将继续(与 MCP 监控页终止一致)。",
"continueAfter": true,
"interruptWithNote": note != "",
"continueWithoutTool": false,
}
}
if h.tasks.AbortActiveEinoExecute(conversationID, note) {
return true, gin.H{
"status": "tool_abort_requested",
"conversationId": conversationID,
"executionId": execID,
"message": "已请求终止当前 execute 命令;命令返回后本轮推理将继续。",
"continueAfter": true,
"interruptWithNote": note != "",
"continueWithoutTool": false,
}
}
return false, nil
}
if h.tasks.AbortActiveEinoExecute(conversationID, note) {
return true, gin.H{
"status": "tool_abort_requested",
"conversationId": conversationID,
"message": "已请求终止当前 execute 命令;命令返回后本轮推理将继续。",
"continueAfter": true,
"interruptWithNote": note != "",
"continueWithoutTool": false,
}
}
return false, nil
}
// CancelAgentLoop 取消正在执行的任务
func (h *AgentHandler) CancelAgentLoop(c *gin.Context) {
var req struct {
@@ -1309,42 +1366,20 @@ func (h *AgentHandler) CancelAgentLoop(c *gin.Context) {
c.JSON(http.StatusNotFound, gin.H{"error": "未找到正在执行的任务"})
return
}
execID := h.tasks.ActiveMCPExecutionID(req.ConversationID)
note := strings.TrimSpace(req.Reason)
if execID != "" {
if !h.agent.CancelMCPToolExecutionWithNote(execID, note) {
c.JSON(http.StatusNotFound, gin.H{"error": "未找到进行中的工具执行或该调用已结束"})
return
}
h.logger.Info("对话页仅终止当前 MCP 工具",
activeExec := strings.TrimSpace(h.tasks.ActiveMCPExecutionID(req.ConversationID))
if ok, payload := h.cancelToolContinueAfter(req.ConversationID, "", note); ok {
execID, _ := payload["executionId"].(string)
h.logger.Info("对话页仅终止当前工具",
zap.String("conversationId", req.ConversationID),
zap.String("executionId", execID),
zap.Bool("hasNote", note != ""),
)
c.JSON(http.StatusOK, gin.H{
"status": "tool_abort_requested",
"conversationId": req.ConversationID,
"executionId": execID,
"message": "已请求终止当前工具调用;工具返回后本轮推理将继续(与 MCP 监控页终止一致)。",
"continueAfter": true,
"interruptWithNote": note != "",
"continueWithoutTool": false,
})
c.JSON(http.StatusOK, payload)
return
}
if h.tasks.AbortActiveEinoExecute(req.ConversationID, note) {
h.logger.Info("对话页仅终止当前 Eino execute",
zap.String("conversationId", req.ConversationID),
zap.Bool("hasNote", note != ""),
)
c.JSON(http.StatusOK, gin.H{
"status": "tool_abort_requested",
"conversationId": req.ConversationID,
"message": "已请求终止当前 execute 命令;命令返回后本轮推理将继续。",
"continueAfter": true,
"interruptWithNote": note != "",
"continueWithoutTool": false,
})
if activeExec != "" {
c.JSON(http.StatusNotFound, gin.H{"error": "未找到进行中的工具执行或该调用已结束"})
return
}
// 无进行中的 MCP 工具(模型纯推理/流式输出阶段):取消当前上下文并由 Eino 流式处理器合并用户补充后自动续跑。
+2 -2
View File
@@ -232,12 +232,12 @@ func (h *AgentHandler) executeOneBatchSubTask(queueID string, queue *BatchTaskQu
var runErr error
switch {
case useBatchMulti:
resultMA, runErr = multiagent.RunDeepAgent(taskCtx, h.config, &h.config.MultiAgent, h.agent, h.db, h.logger, conversationID, h.conversationProjectID(conversationID), finalMessage, []agent.ChatMessage{}, roleTools, progressCallback, h.agentsMarkdownDir, batchOrch, nil, h.projectBlackboardBlock(conversationID))
resultMA, runErr = multiagent.RunDeepAgent(taskCtx, h.config, &h.config.MultiAgent, h.agent, h.db, h.logger, conversationID, h.conversationProjectID(conversationID), finalMessage, []agent.ChatMessage{}, roleTools, progressCallback, h.agentsMarkdownDir, batchOrch, nil, h.agentSessionContextBlock(conversationID))
default:
if h.config == nil {
runErr = fmt.Errorf("服务器配置未加载")
} else {
resultMA, runErr = multiagent.RunEinoSingleChatModelAgent(taskCtx, h.config, &h.config.MultiAgent, h.agent, h.db, h.logger, conversationID, h.conversationProjectID(conversationID), finalMessage, []agent.ChatMessage{}, roleTools, progressCallback, nil, h.projectBlackboardBlock(conversationID))
resultMA, runErr = multiagent.RunEinoSingleChatModelAgent(taskCtx, h.config, &h.config.MultiAgent, h.agent, h.db, h.logger, conversationID, h.conversationProjectID(conversationID), finalMessage, []agent.ChatMessage{}, roleTools, progressCallback, nil, h.agentSessionContextBlock(conversationID))
}
}
+2 -2
View File
@@ -231,7 +231,7 @@ func (h *AgentHandler) EinoSingleAgentLoopStream(c *gin.Context) {
roleTools,
progressCallback,
chatReasoningToClientIntent(req.Reasoning),
h.projectBlackboardBlock(conversationID),
h.agentSessionContextBlock(conversationID),
)
if result != nil && len(result.MCPExecutionIDs) > 0 {
@@ -416,7 +416,7 @@ func (h *AgentHandler) EinoSingleAgentLoop(c *gin.Context) {
prep.RoleTools,
progressCallback,
chatReasoningToClientIntent(req.Reasoning),
h.projectBlackboardBlock(prep.ConversationID),
h.agentSessionContextBlock(prep.ConversationID),
)
if runErr == nil {
break
+75
View File
@@ -23,6 +23,8 @@ import (
type MonitorHandler struct {
mcpServer *mcp.Server
externalMCPMgr *mcp.ExternalMCPManager
taskManager *AgentTaskManager
agentHandler *AgentHandler
executor *security.Executor
db *database.DB
logger *zap.Logger
@@ -56,6 +58,16 @@ func (h *MonitorHandler) SetExternalMCPManager(mgr *mcp.ExternalMCPManager) {
h.externalMCPMgr = mgr
}
// SetTaskManager 设置 Agent 任务管理器(用于 Eino execute 等按 executionId 终止)。
func (h *MonitorHandler) SetTaskManager(mgr *AgentTaskManager) {
h.taskManager = mgr
}
// SetAgentHandler 设置 Agent 处理器(MCP 监控终止与对话页「中断并继续」共用逻辑)。
func (h *MonitorHandler) SetAgentHandler(ah *AgentHandler) {
h.agentHandler = ah
}
// MonitorResponse 监控响应
type MonitorResponse struct {
Executions []*mcp.ToolExecution `json:"executions"`
@@ -90,6 +102,7 @@ func (h *MonitorHandler) Monitor(c *gin.Context) {
toolName := normalizeToolNameFilter(c.Query("tool"))
executions, total := h.loadExecutionsWithPagination(page, pageSize, status, toolName)
h.enrichExecutionsConversationID(executions)
stats := h.loadStats()
totalPages := (total + pageSize - 1) / pageSize
@@ -247,6 +260,7 @@ func (h *MonitorHandler) GetExecution(c *gin.Context) {
// 先从内部MCP服务器查找
exec, exists := h.mcpServer.GetExecution(id)
if exists {
h.enrichExecutionsConversationID([]*mcp.ToolExecution{exec})
c.JSON(http.StatusOK, exec)
return
}
@@ -255,6 +269,7 @@ func (h *MonitorHandler) GetExecution(c *gin.Context) {
if h.externalMCPMgr != nil {
exec, exists = h.externalMCPMgr.GetExecution(id)
if exists {
h.enrichExecutionsConversationID([]*mcp.ToolExecution{exec})
c.JSON(http.StatusOK, exec)
return
}
@@ -264,6 +279,7 @@ func (h *MonitorHandler) GetExecution(c *gin.Context) {
if h.db != nil {
exec, err := h.db.GetToolExecution(id)
if err == nil && exec != nil {
h.enrichExecutionsConversationID([]*mcp.ToolExecution{exec})
c.JSON(http.StatusOK, exec)
return
}
@@ -290,6 +306,19 @@ func (h *MonitorHandler) CancelExecution(c *gin.Context) {
return
}
note = strings.TrimSpace(body.Note)
convID := h.conversationIDForRunningExecution(id)
if convID != "" && h.agentHandler != nil {
if ok, payload := h.agentHandler.cancelToolContinueAfter(convID, id, note); ok {
h.logger.Info("MCP 监控页终止工具(与对话中断并继续一致)",
zap.String("executionId", id),
zap.String("conversationId", convID),
zap.Bool("hasNote", note != ""),
)
c.JSON(http.StatusOK, payload)
return
}
}
if h.mcpServer.CancelToolExecutionWithNote(id, note) {
h.logger.Info("已请求取消 MCP 工具执行", zap.String("executionId", id), zap.String("source", "internal"), zap.Bool("hasNote", note != ""))
c.JSON(http.StatusOK, gin.H{"message": "已发送终止信号", "executionId": id})
@@ -303,6 +332,52 @@ func (h *MonitorHandler) CancelExecution(c *gin.Context) {
c.JSON(http.StatusNotFound, gin.H{"error": "未找到进行中的工具执行,或该任务已结束"})
}
func (h *MonitorHandler) enrichExecutionsConversationID(executions []*mcp.ToolExecution) {
for _, exec := range executions {
if exec == nil {
continue
}
exec.ConversationID = h.conversationIDForRunningExecution(exec.ID)
}
}
func (h *MonitorHandler) conversationIDForRunningExecution(executionID string) string {
executionID = strings.TrimSpace(executionID)
if executionID == "" || h.taskManager == nil {
return ""
}
if conv := h.taskManager.ConversationIDForActiveMCPExecution(executionID); conv != "" {
return conv
}
exec := h.lookupExecution(executionID)
if exec == nil || exec.Status != "running" {
return ""
}
if strings.TrimSpace(exec.ToolName) == "execute" {
if onlyConv, ok := h.taskManager.ConversationIDForActiveEinoExecute(); ok {
return onlyConv
}
}
return ""
}
func (h *MonitorHandler) lookupExecution(id string) *mcp.ToolExecution {
if exec, ok := h.mcpServer.GetExecution(id); ok {
return exec
}
if h.externalMCPMgr != nil {
if exec, ok := h.externalMCPMgr.GetExecution(id); ok {
return exec
}
}
if h.db != nil {
if exec, err := h.db.GetToolExecution(id); err == nil && exec != nil {
return exec
}
}
return nil
}
// BatchGetToolNames 批量获取工具执行的工具名称(消除前端 N+1 请求)
func (h *MonitorHandler) BatchGetToolNames(c *gin.Context) {
var req struct {
+2 -2
View File
@@ -243,7 +243,7 @@ func (h *AgentHandler) MultiAgentLoopStream(c *gin.Context) {
h.agentsMarkdownDir,
orch,
chatReasoningToClientIntent(req.Reasoning),
h.projectBlackboardBlock(conversationID),
h.agentSessionContextBlock(conversationID),
)
if result != nil && len(result.MCPExecutionIDs) > 0 {
@@ -430,7 +430,7 @@ func (h *AgentHandler) MultiAgentLoop(c *gin.Context) {
h.agentsMarkdownDir,
strings.TrimSpace(req.Orchestration),
chatReasoningToClientIntent(req.Reasoning),
h.projectBlackboardBlock(prep.ConversationID),
h.agentSessionContextBlock(prep.ConversationID),
)
if runErr == nil {
break
+36
View File
@@ -7,6 +7,42 @@ import (
"go.uber.org/zap"
)
// agentSessionContextBlock 注入会话工作目录与项目黑板(用于 system prompt 追加块)。
func (h *AgentHandler) agentSessionContextBlock(conversationID string) string {
var parts []string
if ws := h.buildWorkspaceBlock(conversationID); ws != "" {
parts = append(parts, ws)
}
if bb := h.projectBlackboardBlock(conversationID); bb != "" {
parts = append(parts, bb)
}
return strings.Join(parts, "\n\n")
}
func (h *AgentHandler) buildWorkspaceBlock(conversationID string) string {
if h == nil || h.config == nil {
return ""
}
conversationID = strings.TrimSpace(conversationID)
if conversationID == "" {
return ""
}
projectID := h.conversationProjectID(conversationID)
rel := project.WorkspaceRootDir(h.config.Agent.WorkspaceRootDir, projectID, conversationID)
abs, err := project.EnsureWorkspace(rel)
if err != nil {
if h.logger != nil {
h.logger.Warn("创建会话工作目录失败",
zap.String("conversationId", conversationID),
zap.String("projectId", projectID),
zap.String("path", rel),
zap.Error(err))
}
return ""
}
return project.BuildWorkspaceBlock(abs)
}
// projectBlackboardBlock 根据对话 ID 构建项目事实索引块(用于注入 system prompt)。
func (h *AgentHandler) projectBlackboardBlock(conversationID string) string {
if h == nil || h.db == nil || h.config == nil {
+34
View File
@@ -103,6 +103,40 @@ func (m *AgentTaskManager) UnregisterActiveEinoExecute(conversationID string) {
}
}
// ConversationIDForActiveMCPExecution 根据当前登记的工具 executionId 反查会话 ID(供 MCP 监控页按 executionId 终止)。
func (m *AgentTaskManager) ConversationIDForActiveMCPExecution(executionID string) string {
executionID = strings.TrimSpace(executionID)
if executionID == "" {
return ""
}
m.mu.Lock()
defer m.mu.Unlock()
for convID, t := range m.tasks {
if t != nil && t.ActiveMCPExecutionID == executionID {
return convID
}
}
return ""
}
// ConversationIDForActiveEinoExecute 返回当前唯一进行 Eino execute 的会话 ID;多会话并行时返回空。
func (m *AgentTaskManager) ConversationIDForActiveEinoExecute() (string, bool) {
m.mu.Lock()
defer m.mu.Unlock()
var found string
count := 0
for convID, t := range m.tasks {
if t != nil && t.activeEinoExecuteCancel != nil {
found = convID
count++
}
}
if count == 1 {
return found, true
}
return "", false
}
// AbortActiveEinoExecute 终止当前 Eino execute 并暂存用户说明(与 MCP 工具终止一致)。
func (m *AgentTaskManager) AbortActiveEinoExecute(conversationID, note string) bool {
conversationID = strings.TrimSpace(conversationID)
@@ -38,3 +38,19 @@ func TestAbortActiveEinoExecute(t *testing.T) {
t.Fatal("second abort should fail when no active execute")
}
}
func TestConversationIDForActiveMCPExecution(t *testing.T) {
m := NewAgentTaskManager()
conv := "conv-mcp-exec"
_, err := m.StartTask(conv, "test", func(error) {})
if err != nil {
t.Fatalf("StartTask: %v", err)
}
m.RegisterRunningTool(conv, "exec-123")
if got := m.ConversationIDForActiveMCPExecution("exec-123"); got != conv {
t.Fatalf("got %q, want %q", got, conv)
}
if got := m.ConversationIDForActiveMCPExecution("missing"); got != "" {
t.Fatalf("missing should be empty, got %q", got)
}
}
+83 -16
View File
@@ -921,9 +921,8 @@ func (s *Server) CallTool(ctx context.Context, toolName string, args map[string]
return finalResult, executionID, nil
}
// RecordCompletedToolInvocation 将已在其它路径完成的工具调用写入监控存储(格式与 CallTool 结束后一致),
// 用于 Eino ADK filesystem execute 等未经过 CallTool 的场景;返回 executionId 供助手消息 mcpExecutionIds 关联。
func (s *Server) RecordCompletedToolInvocation(toolName string, args map[string]interface{}, resultText string, invokeErr error) string {
// BeginToolExecution 创建 running 状态的执行记录,供 Eino 等非 CallTool 路径在工具开始时落库。
func (s *Server) BeginToolExecution(toolName string, args map[string]interface{}) string {
if s == nil {
return ""
}
@@ -931,21 +930,73 @@ func (s *Server) RecordCompletedToolInvocation(toolName string, args map[string]
args = map[string]interface{}{}
}
executionID := uuid.New().String()
now := time.Now()
failed := invokeErr != nil
exec := &ToolExecution{
execution := &ToolExecution{
ID: executionID,
ToolName: toolName,
Arguments: args,
StartTime: now,
EndTime: &now,
Duration: 0,
Status: "running",
StartTime: time.Now(),
}
s.mu.Lock()
s.executions[executionID] = execution
s.cleanupOldExecutions()
s.mu.Unlock()
if s.storage != nil {
if err := s.storage.SaveToolExecution(execution); err != nil {
s.logger.Warn("保存执行记录到数据库失败", zap.Error(err))
}
}
return executionID
}
// FinishToolExecution 完成先前 BeginToolExecution 创建的记录;executionID 为空时等同 RecordCompletedToolInvocation。
func (s *Server) FinishToolExecution(executionID, toolName string, args map[string]interface{}, resultText string, invokeErr error) string {
if s == nil {
return ""
}
if args == nil {
args = map[string]interface{}{}
}
id := strings.TrimSpace(executionID)
if id == "" {
return s.RecordCompletedToolInvocation(toolName, args, resultText, invokeErr)
}
now := time.Now()
failed := invokeErr != nil
var finalResult *ToolResult
s.mu.Lock()
exec, inMem := s.executions[id]
if !inMem || exec == nil {
exec = &ToolExecution{
ID: id,
ToolName: toolName,
Arguments: args,
StartTime: now,
}
s.executions[id] = exec
} else if toolName != "" {
exec.ToolName = toolName
}
if len(args) > 0 {
exec.Arguments = args
}
exec.EndTime = &now
if exec.StartTime.IsZero() {
exec.StartTime = now
}
exec.Duration = now.Sub(exec.StartTime)
if failed {
exec.Status = "failed"
exec.Error = invokeErr.Error()
st, msg := executionStatusAndMessage(invokeErr)
exec.Status = st
exec.Error = msg
if strings.TrimSpace(resultText) != "" {
exec.Result = &ToolResult{Content: []Content{{Type: "text", Text: resultText}}}
finalResult = &ToolResult{Content: []Content{{Type: "text", Text: resultText}}}
exec.Result = finalResult
}
} else {
exec.Status = "completed"
@@ -953,15 +1004,31 @@ func (s *Server) RecordCompletedToolInvocation(toolName string, args map[string]
if strings.TrimSpace(text) == "" {
text = "(无输出)"
}
exec.Result = &ToolResult{Content: []Content{{Type: "text", Text: text}}}
finalResult = &ToolResult{Content: []Content{{Type: "text", Text: text}}}
exec.Result = finalResult
}
s.mu.Unlock()
if s.storage != nil {
if err := s.storage.SaveToolExecution(exec); err != nil {
s.logger.Warn("RecordCompletedToolInvocation 保存失败", zap.Error(err))
s.logger.Warn("保存执行记录到数据库失败", zap.Error(err))
}
}
s.updateStats(toolName, failed)
return executionID
s.updateStats(exec.ToolName, failed)
if s.storage != nil {
s.mu.Lock()
delete(s.executions, id)
s.mu.Unlock()
}
return id
}
// RecordCompletedToolInvocation 将已在其它路径完成的工具调用写入监控存储(格式与 CallTool 结束后一致),
// 用于 Eino ADK filesystem execute 等未经过 CallTool 的场景;返回 executionId 供助手消息 mcpExecutionIds 关联。
func (s *Server) RecordCompletedToolInvocation(toolName string, args map[string]interface{}, resultText string, invokeErr error) string {
return s.FinishToolExecution("", toolName, args, resultText, invokeErr)
}
// UpdateToolExecutionResult 将监控库中的工具结果更新为送入模型的展示正文(如 reduction 后的 persisted-output)。
+2
View File
@@ -199,6 +199,8 @@ type ToolExecution struct {
StartTime time.Time `json:"startTime"`
EndTime *time.Time `json:"endTime,omitempty"`
Duration time.Duration `json:"duration,omitempty"`
// ConversationID 仅 API 展示用(进行中的 Agent 任务),不写入 tool_executions 表。
ConversationID string `json:"conversationId,omitempty"`
}
// ToolStats 工具统计信息
+64 -19
View File
@@ -18,6 +18,7 @@ import (
"cyberstrike-ai/internal/einomcp"
"cyberstrike-ai/internal/einoobserve"
"cyberstrike-ai/internal/openai"
"cyberstrike-ai/internal/security"
"github.com/cloudwego/eino/adk"
"github.com/cloudwego/eino/schema"
@@ -196,6 +197,16 @@ func runEinoADKAgentLoop(ctx context.Context, args *einoADKRunLoopArgs, baseMsgs
pendingByID[tc.ToolCallID] = tc
pendingQueueByAgent[tc.EinoAgent] = append(pendingQueueByAgent[tc.EinoAgent], tc.ToolCallID)
}
markPendingWithMonitor := func(tc toolCallPendingInfo) {
markPending(tc)
beginEinoADKFilesystemToolMonitor(
args.FilesystemMonitorAgent,
args.FilesystemMonitorRecord,
args.MCPExecutionBinder,
tc.ToolCallID,
tc.ToolName,
)
}
popNextPendingForAgent := func(agentName string) (toolCallPendingInfo, bool) {
pendingMu.Lock()
defer pendingMu.Unlock()
@@ -331,7 +342,7 @@ func runEinoADKAgentLoop(ctx context.Context, args *einoADKRunLoopArgs, baseMsgs
toolCallID = tid
}
recordPendingExecuteStdoutDup(toolName, content, isErr)
recordEinoADKFilesystemToolMonitor(args.FilesystemMonitorAgent, args.FilesystemMonitorRecord, toolName, toolCallID, runAccumulatedMsgs, content, isErr)
recordEinoADKFilesystemToolMonitor(args.FilesystemMonitorAgent, args.FilesystemMonitorRecord, args.MCPExecutionBinder, toolName, toolCallID, runAccumulatedMsgs, content, isErr)
if args.FilesystemMonitorAgent != nil && args.MCPExecutionBinder != nil {
if execID := args.MCPExecutionBinder.ExecutionID(toolCallID); execID != "" {
args.FilesystemMonitorAgent.UpdateMCPExecutionDisplayResult(execID, content)
@@ -344,10 +355,9 @@ func runEinoADKAgentLoop(ctx context.Context, args *einoADKRunLoopArgs, baseMsgs
// Eino execute / MCP 桥在工具返回时 Fire;若 ADK schema.Tool 事件迟迟不到,此处立即推送
// tool_result 解除 UI「执行中」。tryEmitToolResultProgress 经 toolResultSent 去重,ADK 晚到不重复。
isErr := !success || invokeErr != nil
body := content
if strings.HasPrefix(body, einomcp.ToolErrorPrefix) {
body := einoToolResultBody(content)
if einoToolResultIsError(toolName, content) {
isErr = true
body = strings.TrimPrefix(body, einomcp.ToolErrorPrefix)
}
if tail := friendlyEinoExecuteInvokeTail(invokeErr); tail != "" {
if body == "" {
@@ -553,6 +563,13 @@ func runEinoADKAgentLoop(ctx context.Context, args *einoADKRunLoopArgs, baseMsgs
return true, nil
}
// 仅在退避重试后真正收到数据/完成一步时清零,避免重启后首个无错 ADK 事件误把计数打回 0。
confirmTransientRetryRecovery := func() {
if transientRetrier.attempt() > 0 {
transientRetrier.reset()
}
}
takePartial := func(runErr error) (*RunResult, error) {
if len(runAccumulatedMsgs) <= baseAccumulatedCount {
return nil, runErr
@@ -638,8 +655,6 @@ func runEinoADKAgentLoop(ctx context.Context, args *einoADKRunLoopArgs, baseMsgs
if restarted {
continue
}
} else {
transientRetrier.reset()
}
if ev.AgentName != "" && progress != nil {
iterEinoAgent := orchestratorName
@@ -703,11 +718,8 @@ func runEinoADKAgentLoop(ctx context.Context, args *einoADKRunLoopArgs, baseMsgs
if mv.IsStreaming && mv.MessageStream != nil && mv.Role == schema.Tool {
toolName := strings.TrimSpace(mv.ToolName)
content, streamToolCallID, toolStreamRecvErr := recvSchemaMessageStream(ctx, mv.MessageStream)
isErr := false
if strings.HasPrefix(content, einomcp.ToolErrorPrefix) {
isErr = true
content = strings.TrimPrefix(content, einomcp.ToolErrorPrefix)
}
isErr := einoToolResultIsError(toolName, content)
content = einoToolResultBody(content)
if streamToolCallID != "" {
opts := []schema.ToolMessageOption{schema.WithToolName(toolName)}
runAccumulatedMsgs = append(runAccumulatedMsgs, schema.ToolMessage(content, streamToolCallID, opts...))
@@ -719,6 +731,9 @@ func runEinoADKAgentLoop(ctx context.Context, args *einoADKRunLoopArgs, baseMsgs
zap.String("agent", ev.AgentName),
zap.String("tool", toolName))
}
if toolStreamRecvErr == nil {
confirmTransientRetryRecovery()
}
continue
}
@@ -966,7 +981,7 @@ func runEinoADKAgentLoop(ctx context.Context, args *einoADKRunLoopArgs, baseMsgs
if merged := mergeStreamingToolCallFragments(toolStreamFragments); len(merged) > 0 {
lastToolChunk = mergeMessageToolCalls(&schema.Message{ToolCalls: merged})
}
tryEmitToolCallsOnce(lastToolChunk, ev.AgentName, orchestratorName, conversationID, orchMode, progress, toolEmitSeen, subAgentToolStep, mainAgentToolStep, markPending)
tryEmitToolCallsOnce(lastToolChunk, ev.AgentName, orchestratorName, conversationID, orchMode, progress, toolEmitSeen, subAgentToolStep, mainAgentToolStep, markPendingWithMonitor)
// 流式路径此前只把 tool_calls 推给进度 UI,未写入 runAccumulatedMsgs;落库后 loadHistory→RepairOrphan 会删掉全部 tool 结果,表现为「续跑/下轮失忆」。
if lastToolChunk != nil && len(lastToolChunk.ToolCalls) > 0 {
runAccumulatedMsgs = append(runAccumulatedMsgs, schema.AssistantMessage("", lastToolChunk.ToolCalls))
@@ -990,6 +1005,8 @@ func runEinoADKAgentLoop(ctx context.Context, args *einoADKRunLoopArgs, baseMsgs
if restarted {
continue
}
} else {
confirmTransientRetryRecovery()
}
continue
}
@@ -999,7 +1016,7 @@ func runEinoADKAgentLoop(ctx context.Context, args *einoADKRunLoopArgs, baseMsgs
continue
}
runAccumulatedMsgs = append(runAccumulatedMsgs, msg)
tryEmitToolCallsOnce(mergeMessageToolCalls(msg), ev.AgentName, orchestratorName, conversationID, orchMode, progress, toolEmitSeen, subAgentToolStep, mainAgentToolStep, markPending)
tryEmitToolCallsOnce(mergeMessageToolCalls(msg), ev.AgentName, orchestratorName, conversationID, orchMode, progress, toolEmitSeen, subAgentToolStep, mainAgentToolStep, markPendingWithMonitor)
if mv.Role == schema.Assistant {
if progress != nil && strings.TrimSpace(msg.ReasoningContent) != "" {
@@ -1074,15 +1091,13 @@ func runEinoADKAgentLoop(ctx context.Context, args *einoADKRunLoopArgs, baseMsgs
}
content := msg.Content
isErr := false
if strings.HasPrefix(content, einomcp.ToolErrorPrefix) {
isErr = true
content = strings.TrimPrefix(content, einomcp.ToolErrorPrefix)
}
isErr := einoToolResultIsError(toolName, content)
content = einoToolResultBody(content)
toolCallID := strings.TrimSpace(msg.ToolCallID)
tryEmitToolResultProgress(toolName, content, toolCallID, isErr, ev.AgentName)
}
confirmTransientRetryRecovery()
}
mcpIDsMu.Lock()
@@ -1110,17 +1125,47 @@ func einoPartialRunLastOutputHint() string {
"[Run ended abnormally; continue from the trace above without repeating completed steps.]"
}
// friendlyEinoExecuteInvokeTail 将 Eino execute 等非 MCP 路径的结尾错误转成简短提示;其它情况保留原 error 文本
// friendlyEinoExecuteInvokeTail 将 Eino execute 超时/中断/流异常转为简短提示
// 命令非零退出(ExecuteExitError)已有 exec 对齐的正文,不再追加「执行未正常结束」。
func friendlyEinoExecuteInvokeTail(invokeErr error) string {
if invokeErr == nil {
return ""
}
var exitErr *ExecuteExitError
if errors.As(invokeErr, &exitErr) {
return ""
}
if errors.Is(invokeErr, context.DeadlineExceeded) {
return einoExecuteTimeoutUserHint()
}
if errors.Is(invokeErr, context.Canceled) {
return ""
}
if strings.Contains(invokeErr.Error(), "shell inactivity timeout") {
return ""
}
return "[执行未正常结束] " + invokeErr.Error()
}
// einoToolResultIsError 统一判断 Eino 工具结果是否应标记为错误(与 MCP exec 的 IsError 对齐)。
func einoToolResultIsError(toolName, content string) bool {
if strings.HasPrefix(content, einomcp.ToolErrorPrefix) {
return true
}
if strings.TrimSpace(toolName) == "execute" && security.IsCommandFailureResult(content) {
return true
}
return false
}
// einoToolResultBody 去掉工具错误前缀,返回展示/持久化正文。
func einoToolResultBody(content string) string {
if strings.HasPrefix(content, einomcp.ToolErrorPrefix) {
return strings.TrimPrefix(content, einomcp.ToolErrorPrefix)
}
return content
}
// nextAgentEventWithContext 在 ctx 取消时不再无限阻塞于 iter.Next()(工具执行/模型推理期间常见)。
func nextAgentEventWithContext(ctx context.Context, iter *adk.AsyncIterator[*adk.AgentEvent]) (ev *adk.AgentEvent, ok bool, ctxErr error) {
if iter == nil {
@@ -0,0 +1,114 @@
package multiagent
import (
"context"
"errors"
"io"
"strings"
"testing"
"cyberstrike-ai/internal/einomcp"
"cyberstrike-ai/internal/security"
"github.com/cloudwego/eino/adk/filesystem"
"github.com/cloudwego/eino/schema"
)
type mockStreamingShellExitFail struct {
output string
code int
}
func (m *mockStreamingShellExitFail) ExecuteStreaming(ctx context.Context, input *filesystem.ExecuteRequest) (*schema.StreamReader[*filesystem.ExecuteResponse], error) {
outR, outW := schema.Pipe[*filesystem.ExecuteResponse](4)
go func() {
defer outW.Close()
if m.output != "" {
_ = outW.Send(&filesystem.ExecuteResponse{Output: m.output}, nil)
}
code := m.code
_ = outW.Send(&filesystem.ExecuteResponse{ExitCode: &code}, nil)
}()
return outR, nil
}
func TestEinoStreamingShellWrap_CommandFailureFormat(t *testing.T) {
inner := &mockStreamingShellExitFail{
output: "sudo: a password is required\n",
code: 1,
}
notify := einomcp.NewToolInvokeNotifyHolder()
var firedBody string
var firedSuccess bool
var firedErr error
notify.Set(func(toolCallID, toolName, einoAgent string, success bool, content string, invokeErr error) {
firedBody = content
firedSuccess = success
firedErr = invokeErr
})
wrap := &einoStreamingShellWrap{inner: inner, invokeNotify: notify}
sr, err := wrap.ExecuteStreaming(context.Background(), &filesystem.ExecuteRequest{Command: "sudo whoami"})
if err != nil {
t.Fatalf("ExecuteStreaming: %v", err)
}
defer sr.Close()
var stream strings.Builder
for {
resp, rerr := sr.Recv()
if errors.Is(rerr, io.EOF) {
break
}
if rerr != nil {
t.Fatalf("recv: %v", rerr)
}
if resp != nil {
stream.WriteString(resp.Output)
}
}
if firedSuccess {
t.Fatal("expected success=false")
}
var exitErr *ExecuteExitError
if !errors.As(firedErr, &exitErr) || exitErr.Code != 1 {
t.Fatalf("expected ExecuteExitError code 1, got %v", firedErr)
}
if !strings.HasPrefix(firedBody, einomcp.ToolErrorPrefix) {
t.Fatalf("missing tool error prefix: %q", firedBody)
}
body := strings.TrimPrefix(firedBody, einomcp.ToolErrorPrefix)
if body != security.FormatCommandFailureResult(1, "sudo: a password is required\n") {
t.Fatalf("fire body = %q", body)
}
if !strings.Contains(stream.String(), "sudo:") {
t.Fatalf("stream missing sudo output: %q", stream.String())
}
if strings.Contains(stream.String(), "command exited with non-zero") {
t.Fatalf("stream has legacy noise: %q", stream.String())
}
if strings.Contains(stream.String(), "执行未正常结束") {
t.Fatalf("stream has abnormal tail: %q", stream.String())
}
if !security.IsCommandFailureResult(stream.String()) {
t.Fatalf("stream missing failure status line: %q", stream.String())
}
if tail := friendlyEinoExecuteInvokeTail(firedErr); tail != "" {
t.Fatalf("unexpected invoke tail: %q", tail)
}
if !einoToolResultIsError("execute", firedBody) {
t.Fatal("expected isError for execute failure")
}
}
func TestFriendlyEinoExecuteInvokeTail(t *testing.T) {
if friendlyEinoExecuteInvokeTail(&ExecuteExitError{Code: 1}) != "" {
t.Fatal("exit error should not get abnormal tail")
}
if !strings.Contains(friendlyEinoExecuteInvokeTail(context.DeadlineExceeded), "Timed out") {
t.Fatal("deadline should get timeout hint")
}
if friendlyEinoExecuteInvokeTail(errors.New("broken pipe")) == "" {
t.Fatal("unexpected error should get tail")
}
}
+22 -7
View File
@@ -7,11 +7,25 @@ import (
"cyberstrike-ai/internal/einomcp"
)
// newEinoExecuteMonitorCallback 在 Eino filesystem execute 结束时写入 MCP 监控库并 recorder(executionId)
// 与 CallTool 路径一致,供助手消息展示「渗透测试详情」芯片
func newEinoExecuteMonitorCallback(ag *agent.Agent, recorder einomcp.ExecutionRecorder) func(toolCallID, command, stdout string, success bool, invokeErr error) {
return func(toolCallID, command, stdout string, success bool, invokeErr error) {
if ag == nil || recorder == nil {
// newEinoExecuteMonitorCallbacks 在 Eino filesystem execute 开始/结束时写入 MCP 监控库并 recorder(executionId)
// 与 CallTool 路径一致,使监控页能展示「执行中」状态
func newEinoExecuteMonitorCallbacks(ag *agent.Agent, recorder einomcp.ExecutionRecorder) (
begin func(toolCallID, command string) string,
finish func(executionID, toolCallID, command, stdout string, success bool, invokeErr error),
) {
begin = func(toolCallID, command string) string {
if ag == nil {
return ""
}
args := map[string]interface{}{"command": command}
id := ag.BeginLocalToolExecution("execute", args)
if id != "" && recorder != nil {
recorder(id, toolCallID)
}
return id
}
finish = func(executionID, toolCallID, command, stdout string, success bool, invokeErr error) {
if ag == nil {
return
}
var err error
@@ -23,9 +37,10 @@ func newEinoExecuteMonitorCallback(ag *agent.Agent, recorder einomcp.ExecutionRe
}
}
args := map[string]interface{}{"command": command}
id := ag.RecordLocalToolExecution("execute", args, stdout, err)
if id != "" {
id := ag.FinishLocalToolExecution(executionID, "execute", args, stdout, err)
if id != "" && recorder != nil && executionID == "" {
recorder(id, toolCallID)
}
}
return begin, finish
}
@@ -63,8 +63,11 @@ type einoStreamingShellWrap struct {
outputChunk func(toolName, toolCallID, chunk string)
// toolTimeoutMinutes 与 agent.tool_timeout_minutes 对齐;>0 时对单次 execute 套用 context 超时(与 MCP 工具经 executeToolViaMCP 行为一致)。0 表示仅依赖上层 ctx(如整任务 10h 上限)。
toolTimeoutMinutes int
// recordMonitor 在 execute 流结束后写入 tool_executions 并 recorder(executionId),使「渗透测试详情」与常规 MCP 一致
recordMonitor func(toolCallID, command, stdout string, success bool, invokeErr error)
// shellNoOutputTimeoutSec:无任何输出时的空闲秒数;0=关闭
shellNoOutputTimeoutSec int
// beginMonitor 在 execute 开始时写入 running 状态;finishMonitor 在流结束后更新为 completed/failed。
beginMonitor func(toolCallID, command string) string
finishMonitor func(executionID, toolCallID, command, stdout string, success bool, invokeErr error)
}
func (w *einoStreamingShellWrap) ExecuteStreaming(ctx context.Context, input *filesystem.ExecuteRequest) (*schema.StreamReader[*filesystem.ExecuteResponse], error) {
@@ -76,15 +79,26 @@ func (w *einoStreamingShellWrap) ExecuteStreaming(ctx context.Context, input *fi
}
req := *input
userCmd := strings.TrimSpace(req.Command)
tid := strings.TrimSpace(compose.GetToolCallID(ctx))
agentTag := strings.TrimSpace(w.einoAgentName)
if security.IsBackgroundShellCommand(req.Command) && !req.RunInBackendGround {
req.RunInBackendGround = true
}
req.Command = prependPythonUnbufferedEnv(req.Command)
tid := strings.TrimSpace(compose.GetToolCallID(ctx))
agentTag := strings.TrimSpace(w.einoAgentName)
req.Command = security.PrepareNonInteractiveShellCommand(prependPythonUnbufferedEnv(req.Command))
convID := mcp.MCPConversationIDFromContext(ctx)
execReg := mcp.EinoExecuteRunRegistryFromContext(ctx)
var monitorExecID string
if w.beginMonitor != nil {
monitorExecID = w.beginMonitor(tid, userCmd)
}
if monitorExecID != "" && convID != "" {
if toolReg := mcp.ToolRunRegistryFromContext(ctx); toolReg != nil {
toolReg.RegisterRunningTool(convID, monitorExecID)
}
}
toolRunReg := mcp.ToolRunRegistryFromContext(ctx)
execCtx, execCancel := context.WithCancel(ctx)
var timeoutCancel context.CancelFunc
if w.toolTimeoutMinutes > 0 {
@@ -104,23 +118,23 @@ func (w *einoStreamingShellWrap) ExecuteStreaming(ctx context.Context, input *fi
}
if einoExecuteRecvErrIsToolTimeout(err, execCtx) {
hint := "\n\n" + einoExecuteTimeoutUserHint() + "\n"
if w.recordMonitor != nil {
w.recordMonitor(tid, userCmd, hint, false, context.DeadlineExceeded)
if w.finishMonitor != nil {
w.finishMonitor(monitorExecID, tid, userCmd, hint, false, context.DeadlineExceeded)
}
if w.invokeNotify != nil && tid != "" {
w.invokeNotify.Fire(tid, "execute", agentTag, false, hint, context.DeadlineExceeded)
}
return schema.StreamReaderFromArray([]*filesystem.ExecuteResponse{{Output: hint}}), nil
}
if w.recordMonitor != nil {
w.recordMonitor(tid, userCmd, "", false, err)
if w.finishMonitor != nil {
w.finishMonitor(monitorExecID, tid, userCmd, "", false, err)
}
if w.invokeNotify != nil && tid != "" {
w.invokeNotify.Fire(tid, "execute", agentTag, false, "", err)
}
return nil, err
}
if sr == nil || w.invokeNotify == nil {
if sr == nil {
if timeoutCancel != nil {
timeoutCancel()
}
@@ -132,7 +146,7 @@ func (w *einoStreamingShellWrap) ExecuteStreaming(ctx context.Context, input *fi
outR, outW := schema.Pipe[*filesystem.ExecuteResponse](32)
go func(inner *schema.StreamReader[*filesystem.ExecuteResponse], command string, cancel context.CancelFunc, timeoutCleanup context.CancelFunc, tctx context.Context, conversationID string, reg mcp.EinoExecuteRunRegistry) {
go func(inner *schema.StreamReader[*filesystem.ExecuteResponse], command string, cancel context.CancelFunc, timeoutCleanup context.CancelFunc, tctx context.Context, conversationID string, reg mcp.EinoExecuteRunRegistry, toolReg mcp.ToolRunRegistry, execID string, toolCallID string, noOutputSec int) {
var innerCloseOnce sync.Once
closeInner := func() {
innerCloseOnce.Do(func() { inner.Close() })
@@ -147,6 +161,9 @@ func (w *einoStreamingShellWrap) ExecuteStreaming(ctx context.Context, input *fi
if reg != nil && conversationID != "" {
defer reg.UnregisterActiveEinoExecute(conversationID)
}
if toolReg != nil && conversationID != "" && execID != "" {
defer toolReg.UnregisterRunningTool(conversationID, execID)
}
// ctx 取消时关闭内层流,避免 amass 等长时间无换行输出时 Recv 永久阻塞。
stopWatch := make(chan struct{})
@@ -165,50 +182,103 @@ func (w *einoStreamingShellWrap) ExecuteStreaming(ctx context.Context, input *fi
exitCode := 0
hasExitCode := false
idleWatch := security.NewShellInactivityWatch(noOutputSec)
if idleWatch != nil {
defer idleWatch.Stop()
}
type execRecvMsg struct {
resp *filesystem.ExecuteResponse
err error
}
recvCh := make(chan execRecvMsg, 1)
go func() {
for {
resp, rerr := inner.Recv()
recvCh <- execRecvMsg{resp: resp, err: rerr}
if rerr != nil {
return
}
}
}()
fireInactivityTimeout := func() {
success = false
invokeErr = fmt.Errorf("shell inactivity timeout (%ds)", idleWatch.Sec)
msg := security.ShellNoOutputTimeoutMessage(idleWatch.Sec)
_ = outW.Send(&filesystem.ExecuteResponse{Output: msg}, nil)
sb.WriteString(msg)
if w.outputChunk != nil && toolCallID != "" {
w.outputChunk("execute", toolCallID, msg)
}
if cancel != nil {
cancel()
}
closeInner()
}
recvLoop:
for {
resp, rerr := inner.Recv()
if errors.Is(rerr, io.EOF) {
break
var idleCh <-chan struct{}
if idleWatch != nil {
idleCh = idleWatch.Expired
}
if rerr != nil {
success = false
invokeErr = rerr
// 单次 execute 超时须与 MCP 工具一致:写入工具结果尾标、继续迭代,不得向 ADK 流注入硬错误。
if einoExecuteRecvErrIsToolTimeout(rerr, tctx) {
invokeErr = context.DeadlineExceeded
break
select {
case <-idleCh:
fireInactivityTimeout()
break recvLoop
case msg := <-recvCh:
rerr := msg.err
resp := msg.resp
if errors.Is(rerr, io.EOF) {
break recvLoop
}
if errors.Is(rerr, context.Canceled) || (tctx != nil && errors.Is(tctx.Err(), context.Canceled)) {
invokeErr = context.Canceled
break
}
_ = outW.Send(nil, rerr)
break
}
if resp != nil {
if resp.ExitCode != nil {
hasExitCode = true
exitCode = *resp.ExitCode
}
var appended string
if resp.Output != "" {
sb.WriteString(resp.Output)
appended = resp.Output
}
if w.outputChunk != nil && strings.TrimSpace(appended) != "" {
w.outputChunk("execute", tid, appended)
}
if outW.Send(resp, nil) {
if rerr != nil {
success = false
invokeErr = fmt.Errorf("execute stream closed by consumer")
break
invokeErr = rerr
if einoExecuteRecvErrIsToolTimeout(rerr, tctx) {
invokeErr = context.DeadlineExceeded
break recvLoop
}
if errors.Is(rerr, context.Canceled) || (tctx != nil && errors.Is(tctx.Err(), context.Canceled)) {
invokeErr = context.Canceled
break recvLoop
}
_ = outW.Send(nil, rerr)
break recvLoop
}
if resp != nil {
if resp.ExitCode != nil {
hasExitCode = true
exitCode = *resp.ExitCode
continue
}
var appended string
if resp.Output != "" {
if security.IsLegacyShellExitNoise(resp.Output) {
continue
}
if idleWatch != nil {
idleWatch.Bump()
}
sb.WriteString(resp.Output)
appended = resp.Output
}
if w.outputChunk != nil && strings.TrimSpace(appended) != "" {
w.outputChunk("execute", toolCallID, appended)
}
if outW.Send(resp, nil) {
success = false
invokeErr = fmt.Errorf("execute stream closed by consumer")
break recvLoop
}
}
}
}
if success && hasExitCode && exitCode != 0 {
success = false
invokeErr = fmt.Errorf("execute exited with code %d", exitCode)
invokeErr = &ExecuteExitError{Code: exitCode}
}
// WithTimeout 触发后,子进程常被信号结束,local 侧多报 exit -1 / canceled,错误链里不一定带 DeadlineExceeded。
// 用执行所用 ctx 归一化,便于 UI 展示「超时」而非含糊的 -1。
@@ -248,12 +318,24 @@ func (w *einoStreamingShellWrap) ExecuteStreaming(ctx context.Context, input *fi
_ = outW.Send(&filesystem.ExecuteResponse{Output: text + "\n"}, nil)
}
}
if w.recordMonitor != nil {
w.recordMonitor(tid, command, sb.String(), success, invokeErr)
rawOutput := sb.String()
fireBody := rawOutput
if !success && hasExitCode && exitCode != 0 {
statusLine := security.ExecuteFailureStatusLine(exitCode)
if !strings.Contains(rawOutput, "命令执行失败:") {
_ = outW.Send(&filesystem.ExecuteResponse{Output: statusLine}, nil)
sb.WriteString(statusLine)
}
fireBody = einomcp.ToolErrorPrefix + security.FormatCommandFailureResult(exitCode, rawOutput)
}
if w.finishMonitor != nil {
w.finishMonitor(execID, toolCallID, command, sb.String(), success, invokeErr)
}
if w.invokeNotify != nil {
w.invokeNotify.Fire(toolCallID, "execute", agentTag, success, fireBody, invokeErr)
}
w.invokeNotify.Fire(tid, "execute", agentTag, success, sb.String(), invokeErr)
outW.Close()
}(sr, userCmd, execCancel, timeoutCancel, execCtx, convID, execReg)
}(sr, userCmd, execCancel, timeoutCancel, execCtx, convID, execReg, toolRunReg, monitorExecID, tid, w.shellNoOutputTimeoutSec)
return outR, nil
}
@@ -19,9 +19,15 @@ type mockStreamingShell struct {
immediateErr error
recvErr error
output string
called bool
lastCommand string
}
func (m *mockStreamingShell) ExecuteStreaming(ctx context.Context, input *filesystem.ExecuteRequest) (*schema.StreamReader[*filesystem.ExecuteResponse], error) {
m.called = true
if input != nil {
m.lastCommand = input.Command
}
if m.immediateErr != nil {
return nil, m.immediateErr
}
@@ -38,6 +44,135 @@ func (m *mockStreamingShell) ExecuteStreaming(ctx context.Context, input *filesy
return outR, nil
}
func TestEinoStreamingShellWrap_PreparesNonInteractiveCommand(t *testing.T) {
inner := &mockStreamingShell{output: "ok\n"}
wrap := &einoStreamingShellWrap{inner: inner}
sr, err := wrap.ExecuteStreaming(context.Background(), &filesystem.ExecuteRequest{Command: "echo ok"})
if err != nil {
t.Fatalf("ExecuteStreaming: %v", err)
}
defer sr.Close()
for {
_, rerr := sr.Recv()
if errors.Is(rerr, io.EOF) {
break
}
if rerr != nil {
t.Fatalf("recv: %v", rerr)
}
}
if !strings.Contains(inner.lastCommand, "exec </dev/null") {
t.Fatalf("missing stdin redirect in inner command: %q", inner.lastCommand)
}
if !strings.Contains(inner.lastCommand, "GIT_PAGER=cat") {
t.Fatalf("missing pager export in inner command: %q", inner.lastCommand)
}
if !strings.Contains(inner.lastCommand, "PYTHONUNBUFFERED=1") {
t.Fatalf("missing python unbuffer in inner command: %q", inner.lastCommand)
}
}
func TestEinoStreamingShellWrap_NoOutputTimeout(t *testing.T) {
inner := &mockStreamingShellHanging{}
notify := einomcp.NewToolInvokeNotifyHolder()
var fired string
notify.Set(func(toolCallID, toolName, einoAgent string, success bool, content string, invokeErr error) {
fired = content
})
wrap := &einoStreamingShellWrap{
inner: inner,
invokeNotify: notify,
shellNoOutputTimeoutSec: 1,
}
sr, err := wrap.ExecuteStreaming(context.Background(), &filesystem.ExecuteRequest{Command: "sudo whoami"})
if err != nil {
t.Fatalf("ExecuteStreaming: %v", err)
}
defer sr.Close()
var got strings.Builder
for {
resp, rerr := sr.Recv()
if errors.Is(rerr, io.EOF) {
break
}
if rerr != nil {
t.Fatalf("recv: %v", rerr)
}
if resp != nil {
got.WriteString(resp.Output)
}
}
if !inner.called {
t.Fatal("inner shell should run (no command blacklist)")
}
out := got.String()
if !strings.Contains(out, "没有新的输出") && !strings.Contains(out, "no new output") {
t.Fatalf("expected inactivity timeout message, got: %q notify=%q", out, fired)
}
}
type mockStreamingShellPartialThenHang struct {
called bool
}
func (m *mockStreamingShellPartialThenHang) ExecuteStreaming(ctx context.Context, input *filesystem.ExecuteRequest) (*schema.StreamReader[*filesystem.ExecuteResponse], error) {
m.called = true
outR, outW := schema.Pipe[*filesystem.ExecuteResponse](4)
go func() {
_ = outW.Send(&filesystem.ExecuteResponse{Output: "[sudo] password:\n"}, nil)
<-ctx.Done()
outW.Close()
}()
return outR, nil
}
func TestEinoStreamingShellWrap_InactivityAfterPartialOutput(t *testing.T) {
inner := &mockStreamingShellPartialThenHang{}
wrap := &einoStreamingShellWrap{
inner: inner,
shellNoOutputTimeoutSec: 1,
}
start := time.Now()
sr, err := wrap.ExecuteStreaming(context.Background(), &filesystem.ExecuteRequest{Command: "sudo whoami"})
if err != nil {
t.Fatalf("ExecuteStreaming: %v", err)
}
defer sr.Close()
var got strings.Builder
for {
resp, rerr := sr.Recv()
if errors.Is(rerr, io.EOF) {
break
}
if rerr != nil {
t.Fatalf("recv: %v", rerr)
}
if resp != nil {
got.WriteString(resp.Output)
}
}
if time.Since(start) > 5*time.Second {
t.Fatalf("expected inactivity timeout ~1s, took %v", time.Since(start))
}
if !strings.Contains(got.String(), "没有新的输出") && !strings.Contains(got.String(), "no new output") {
t.Fatalf("expected inactivity message, got: %q", got.String())
}
}
type mockStreamingShellHanging struct {
called bool
}
func (m *mockStreamingShellHanging) ExecuteStreaming(ctx context.Context, input *filesystem.ExecuteRequest) (*schema.StreamReader[*filesystem.ExecuteResponse], error) {
m.called = true
outR, outW := schema.Pipe[*filesystem.ExecuteResponse](4)
go func() {
<-ctx.Done()
outW.Close()
}()
return outR, nil
}
func TestEinoExecuteRecvErrIsToolTimeout(t *testing.T) {
tctx, cancel := context.WithTimeout(context.Background(), time.Millisecond)
defer cancel()
@@ -63,10 +63,43 @@ func toolCallArgsFromAccumulated(msgs []adk.Message, toolCallID, expectToolName
return map[string]interface{}{}
}
// beginEinoADKFilesystemToolMonitor 在 Eino ADK filesystem 工具开始调用时写入 running 状态。
func beginEinoADKFilesystemToolMonitor(
ag *agent.Agent,
rec einomcp.ExecutionRecorder,
binder *MCPExecutionBinder,
toolCallID, toolName string,
) {
if ag == nil || rec == nil {
return
}
name := strings.TrimSpace(toolName)
if name == "" || strings.EqualFold(name, "execute") {
return
}
if !isBuiltinEinoADKFilesystemToolName(name) {
return
}
tid := strings.TrimSpace(toolCallID)
if tid == "" {
return
}
storedName := "eino_fs::" + strings.ToLower(name)
id := ag.BeginLocalToolExecution(storedName, map[string]interface{}{})
if id == "" {
return
}
rec(id, tid)
if binder != nil {
binder.Bind(tid, id)
}
}
// recordEinoADKFilesystemToolMonitor 将 Eino ADK filesystem 中间件工具结果写入 MCP 监控(与 execute / MCP 桥芯片一致)。
func recordEinoADKFilesystemToolMonitor(
ag *agent.Agent,
rec einomcp.ExecutionRecorder,
binder *MCPExecutionBinder,
toolName string,
toolCallID string,
msgs []adk.Message,
@@ -94,8 +127,12 @@ func recordEinoADKFilesystemToolMonitor(
invErr = errors.New(t)
}
}
id := ag.RecordLocalToolExecution(storedName, args, resultText, invErr)
if id != "" {
execID := ""
if binder != nil {
execID = binder.ExecutionID(toolCallID)
}
id := ag.FinishLocalToolExecution(execID, storedName, args, resultText, invErr)
if id != "" && execID == "" {
rec(id, toolCallID)
}
}
+2 -2
View File
@@ -81,7 +81,7 @@ func RunEinoSingleChatModelAgent(
}
toolInvokeNotify := einomcp.NewToolInvokeNotifyHolder()
einoExecMonitor := newEinoExecuteMonitorCallback(ag, recorder)
einoExecBegin, einoExecFinish := newEinoExecuteMonitorCallbacks(ag, recorder)
mainDefs := ag.ToolsForRole(roleTools)
mainTools, err := einomcp.ToolsFromDefinitions(ag, holder, mainDefs, recorder, nil, toolInvokeNotify, einoSingleAgentName)
if err != nil {
@@ -136,7 +136,7 @@ func RunEinoSingleChatModelAgent(
}
if einoSkillMW != nil {
if einoFSTools && einoLoc != nil {
fsMw, fsErr := subAgentFilesystemMiddleware(ctx, einoLoc, toolInvokeNotify, einoSingleAgentName, einoExecMonitor, agentToolTimeoutMinutes(appCfg), nil)
fsMw, fsErr := subAgentFilesystemMiddleware(ctx, einoLoc, toolInvokeNotify, einoSingleAgentName, einoExecBegin, einoExecFinish, agentToolTimeoutMinutes(appCfg), agentShellNoOutputTimeoutSeconds(appCfg), nil)
if fsErr != nil {
return nil, fmt.Errorf("eino single filesystem 中间件: %w", fsErr)
}
+27 -7
View File
@@ -9,6 +9,7 @@ import (
"cyberstrike-ai/internal/config"
"cyberstrike-ai/internal/einomcp"
"cyberstrike-ai/internal/security"
localbk "github.com/cloudwego/eino-ext/adk/backend/local"
"github.com/cloudwego/eino/adk"
@@ -81,8 +82,10 @@ func subAgentFilesystemMiddleware(
loc *localbk.Local,
invokeNotify *einomcp.ToolInvokeNotifyHolder,
einoAgentName string,
recordMonitor func(toolCallID, command, stdout string, success bool, invokeErr error),
beginMonitor func(toolCallID, command string) string,
finishMonitor func(executionID, toolCallID, command, stdout string, success bool, invokeErr error),
toolTimeoutMinutes int,
shellNoOutputTimeoutSec int,
outputChunk func(toolName, toolCallID, chunk string),
) (adk.ChatModelAgentMiddleware, error) {
if loc == nil {
@@ -91,12 +94,14 @@ func subAgentFilesystemMiddleware(
return filesystem.New(ctx, &filesystem.MiddlewareConfig{
Backend: loc,
StreamingShell: &einoStreamingShellWrap{
inner: loc,
invokeNotify: invokeNotify,
einoAgentName: strings.TrimSpace(einoAgentName),
outputChunk: outputChunk,
recordMonitor: recordMonitor,
toolTimeoutMinutes: toolTimeoutMinutes,
inner: security.NewEinoStreamingShell(),
invokeNotify: invokeNotify,
einoAgentName: strings.TrimSpace(einoAgentName),
outputChunk: outputChunk,
beginMonitor: beginMonitor,
finishMonitor: finishMonitor,
toolTimeoutMinutes: toolTimeoutMinutes,
shellNoOutputTimeoutSec: shellNoOutputTimeoutSec,
},
})
}
@@ -108,3 +113,18 @@ func agentToolTimeoutMinutes(cfg *config.Config) int {
}
return cfg.Agent.ToolTimeoutMinutes
}
// agentShellNoOutputTimeoutSeconds0=默认 300s5 分钟);-1=关闭;>0=自定义秒数。
func agentShellNoOutputTimeoutSeconds(cfg *config.Config) int {
if cfg == nil {
return 300
}
v := cfg.Agent.ShellNoOutputTimeoutSeconds
if v < 0 {
return 0
}
if v == 0 {
return 300
}
return v
}
@@ -46,6 +46,10 @@ func injectToolNamesOnlyInstruction(ctx context.Context, instruction string, too
sb.WriteString("2) 调用具体工具前,请先确认该工具的参数要求(以当前请求中的工具定义为准);不确定时先澄清再调用。\n")
sb.WriteString("3) 不要臆造不存在的工具名。\n\n")
}
if s := strings.TrimSpace(injectShellToolGuidance("", names)); s != "" {
sb.WriteString(s)
sb.WriteString("\n\n")
}
if s := strings.TrimSpace(instruction); s != "" {
sb.WriteString(s)
}
+1 -1
View File
@@ -143,7 +143,7 @@ func (r *einoTransientRunRetrier) attempt() int { return r.attempts }
func (r *einoTransientRunRetrier) maxAttempts() int { return r.policy.maxAttempts }
// reset 在一次成功推进后清零重试计数,使后续临时错误从第 1 次退避重新开始。
// reset 在退避重试后成功推进(流/消息完整接收)时清零计数,使后续临时错误从第 1 次退避重新开始。
func (r *einoTransientRunRetrier) reset() { r.attempts = 0 }
func einoRunRetryMaxAttempts(args *einoADKRunLoopArgs) int {
@@ -105,6 +105,32 @@ func TestEinoTransientRunRetrierReset(t *testing.T) {
}
}
func TestEinoTransientRunRetrierConsecutiveFailures(t *testing.T) {
t.Parallel()
r := newEinoTransientRunRetrier(einoTransientRunRetryPolicy{maxAttempts: 10, maxBackoff: 30 * time.Second})
ctx := context.Background()
runErr := errors.New("internal server error")
args := &einoADKRunLoopArgs{}
base := []adk.Message{schema.UserMessage("hi")}
for want := 1; want <= 3; want++ {
restarted, _, _, _, err := r.tryRetry(ctx, runErr, args, base, nil, len(base))
if err != nil {
t.Fatalf("tryRetry attempt %d: %v", want, err)
}
if !restarted {
t.Fatalf("tryRetry attempt %d: want restarted", want)
}
if got := r.attempt(); got != want {
t.Fatalf("after failure %d: attempt=%d, want %d", want, got, want)
}
}
r.reset()
if r.attempt() != 0 {
t.Fatalf("after successful recovery reset: attempt=%d, want 0", r.attempt())
}
}
func TestAppendUserMessageIfNeeded(t *testing.T) {
t.Parallel()
msgs := []adk.Message{schema.UserMessage("old task")}
+15
View File
@@ -0,0 +1,15 @@
package multiagent
import "fmt"
// ExecuteExitError 表示 execute 命令非零退出(预期失败,非超时/中断/流异常)。
type ExecuteExitError struct {
Code int
}
func (e *ExecuteExitError) Error() string {
if e == nil {
return "exit status unknown"
}
return fmt.Sprintf("exit status %d", e.Code)
}
@@ -6,6 +6,7 @@ import (
"cyberstrike-ai/internal/agents"
"cyberstrike-ai/internal/config"
"cyberstrike-ai/internal/project"
"cyberstrike-ai/internal/projectprompt"
)
// DefaultPlanExecuteOrchestratorInstruction 当未配置 plan_execute 专用 Markdown / YAML 时的内置主代理(规划/重规划侧)提示。
@@ -122,7 +123,9 @@ func DefaultPlanExecuteOrchestratorInstruction() string {
## 表达
在调用工具或给出计划变更前 25 句中文说明当前决策依据与期望证据形态最终对用户交付结构化结论发现摘要证据风险下一步`
在调用工具或给出计划变更前 25 句中文说明当前决策依据与期望证据形态最终对用户交付结构化结论发现摘要证据风险下一步
` + projectprompt.ShellExecExecuteGuidanceSection()
}
// DefaultSupervisorOrchestratorInstruction 当未配置 supervisor 专用 Markdown / YAML 时的内置监督者提示(transfer / exit 说明仍由运行时在末尾追加)。
+12 -9
View File
@@ -20,6 +20,7 @@ import (
"cyberstrike-ai/internal/openai"
"cyberstrike-ai/internal/project"
"cyberstrike-ai/internal/reasoning"
"cyberstrike-ai/internal/security"
einoopenai "github.com/cloudwego/eino-ext/components/model/openai"
"github.com/cloudwego/eino/adk"
@@ -120,7 +121,7 @@ func RunDeepAgent(
mcpIDs = append(mcpIDs, id)
mcpIDsMu.Unlock()
}
einoExecMonitor := newEinoExecuteMonitorCallback(ag, recorder)
einoExecBegin, einoExecFinish := newEinoExecuteMonitorCallbacks(ag, recorder)
// 与单代理流式一致:在 response_start / response_delta 的 data 中带当前 mcpExecutionIds,供主聊天绑定复制与展示。
snapshotMCPIDs := func() []string {
@@ -223,7 +224,7 @@ func RunDeepAgent(
}
if einoSkillMW != nil {
if einoFSTools && einoLoc != nil {
subFs, fsErr := subAgentFilesystemMiddleware(ctx, einoLoc, toolInvokeNotify, id, einoExecMonitor, agentToolTimeoutMinutes(appCfg), nil)
subFs, fsErr := subAgentFilesystemMiddleware(ctx, einoLoc, toolInvokeNotify, id, einoExecBegin, einoExecFinish, agentToolTimeoutMinutes(appCfg), agentShellNoOutputTimeoutSeconds(appCfg), nil)
if fsErr != nil {
return nil, fmt.Errorf("子代理 %q filesystem 中间件: %w", id, fsErr)
}
@@ -358,12 +359,14 @@ func RunDeepAgent(
if einoLoc != nil && einoFSTools {
deepBackend = einoLoc
deepShell = &einoStreamingShellWrap{
inner: einoLoc,
invokeNotify: toolInvokeNotify,
einoAgentName: orchestratorName,
outputChunk: nil,
recordMonitor: einoExecMonitor,
toolTimeoutMinutes: agentToolTimeoutMinutes(appCfg),
inner: security.NewEinoStreamingShell(),
invokeNotify: toolInvokeNotify,
einoAgentName: orchestratorName,
outputChunk: nil,
beginMonitor: einoExecBegin,
finishMonitor: einoExecFinish,
toolTimeoutMinutes: agentToolTimeoutMinutes(appCfg),
shellNoOutputTimeoutSec: agentShellNoOutputTimeoutSeconds(appCfg),
}
}
@@ -428,7 +431,7 @@ func RunDeepAgent(
// 构建 filesystem 中间件(与 Deep sub-agent 一致)
var peFsMw adk.ChatModelAgentMiddleware
if einoSkillMW != nil && einoFSTools && einoLoc != nil {
peFsMw, err = subAgentFilesystemMiddleware(ctx, einoLoc, toolInvokeNotify, "executor", einoExecMonitor, agentToolTimeoutMinutes(appCfg), nil)
peFsMw, err = subAgentFilesystemMiddleware(ctx, einoLoc, toolInvokeNotify, "executor", einoExecBegin, einoExecFinish, agentToolTimeoutMinutes(appCfg), agentShellNoOutputTimeoutSeconds(appCfg), nil)
if err != nil {
return nil, fmt.Errorf("plan_execute filesystem 中间件: %w", err)
}
@@ -0,0 +1,33 @@
package multiagent
import (
"strings"
"cyberstrike-ai/internal/projectprompt"
)
func shellToolsPresent(toolNames []string) bool {
for _, n := range toolNames {
switch strings.ToLower(strings.TrimSpace(n)) {
case "exec", "execute":
return true
}
}
return false
}
// injectShellToolGuidance 在系统提示末尾追加 exec/execute 分工(仅当工具列表含 exec 或 execute)。
func injectShellToolGuidance(instruction string, toolNames []string) string {
if !shellToolsPresent(toolNames) {
return instruction
}
block := strings.TrimSpace(projectprompt.ShellExecExecuteGuidanceSection())
if block == "" {
return instruction
}
s := strings.TrimSpace(instruction)
if s == "" {
return block
}
return s + "\n\n" + block
}
@@ -0,0 +1,17 @@
package multiagent
import (
"strings"
"testing"
)
func TestInjectShellToolGuidance(t *testing.T) {
got := injectShellToolGuidance("base", []string{"nmap"})
if got != "base" {
t.Fatalf("expected unchanged, got %q", got)
}
got = injectShellToolGuidance("base", []string{"exec", "nmap"})
if !strings.Contains(got, "exec/execute") || !strings.Contains(got, "base") {
t.Fatalf("expected shell guidance appended, got %q", got)
}
}
+68
View File
@@ -0,0 +1,68 @@
package project
import (
"fmt"
"os"
"path/filepath"
"strings"
)
func sanitizeWorkspacePathSegment(s string) string {
s = strings.TrimSpace(s)
if s == "" {
return "default"
}
s = strings.ReplaceAll(s, string(filepath.Separator), "-")
s = strings.ReplaceAll(s, "/", "-")
s = strings.ReplaceAll(s, "\\", "-")
s = strings.ReplaceAll(s, "..", "__")
if len(s) > 180 {
s = s[:180]
}
return s
}
// WorkspaceRootDir returns the relative workspace root for downloads and local analysis.
// Project-bound sessions share projects/<id>/; otherwise conversations/<id>/.
func WorkspaceRootDir(configuredBase, projectID, conversationID string) string {
base := strings.TrimSpace(configuredBase)
if base == "" {
base = filepath.Join("tmp", "workspace")
}
if pid := strings.TrimSpace(projectID); pid != "" {
return filepath.Join(base, "projects", sanitizeWorkspacePathSegment(pid))
}
conv := strings.TrimSpace(conversationID)
if conv == "" {
conv = "default"
}
return filepath.Join(base, "conversations", sanitizeWorkspacePathSegment(conv))
}
// EnsureWorkspace creates the workspace directory and returns its absolute path.
func EnsureWorkspace(root string) (string, error) {
abs, err := filepath.Abs(strings.TrimSpace(root))
if err != nil {
return "", fmt.Errorf("workspace abs: %w", err)
}
if err := os.MkdirAll(abs, 0o755); err != nil {
return "", fmt.Errorf("workspace mkdir: %w", err)
}
return abs, nil
}
// BuildWorkspaceBlock instructs the agent to use the session workspace instead of /tmp.
func BuildWorkspaceBlock(absPath string) string {
absPath = strings.TrimSpace(absPath)
if absPath == "" {
return ""
}
return fmt.Sprintf(`## 会话工作目录下载与本地分析
**必须使用以下目录**保存 curl/wget 下载的文件临时 HTML/JS以及 read_file/glob/grep 的检索范围
`+"`%s`"+`
- **禁止**使用系统 `+"`/tmp`"+` 或其它全局临时目录多项目/多会话会互窜遗留文件
- 下载示例`+"`curl -o '%s/page.html' 'https://target/'`"+`exec 时可将 `+"`workdir`"+` 设为该目录。
- 读取前用 glob/grep/read_file **限定在该目录**下搜索勿在 `+"`/tmp`"+` 盲目检索`, absPath, absPath)
}
+52
View File
@@ -0,0 +1,52 @@
package project
import (
"os"
"path/filepath"
"strings"
"testing"
)
func TestWorkspaceRootDirProjectScoped(t *testing.T) {
got := WorkspaceRootDir("", "proj-1", "conv-1")
want := filepath.Join("tmp", "workspace", "projects", "proj-1")
if got != want {
t.Fatalf("got %q want %q", got, want)
}
}
func TestWorkspaceRootDirConversationScoped(t *testing.T) {
got := WorkspaceRootDir("/data/ws", "", "conv-abc")
want := filepath.Join("/data/ws", "conversations", "conv-abc")
if got != want {
t.Fatalf("got %q want %q", got, want)
}
}
func TestEnsureWorkspaceCreatesDir(t *testing.T) {
root := filepath.Join(t.TempDir(), "nested", "workspace")
abs, err := EnsureWorkspace(root)
if err != nil {
t.Fatalf("EnsureWorkspace: %v", err)
}
st, err := os.Stat(abs)
if err != nil {
t.Fatalf("Stat: %v", err)
}
if !st.IsDir() {
t.Fatal("expected directory")
}
}
func TestBuildWorkspaceBlockMentionsPath(t *testing.T) {
block := BuildWorkspaceBlock("/opt/csai/tmp/workspace/projects/p1")
if block == "" {
t.Fatal("expected non-empty block")
}
if !strings.Contains(block, "/opt/csai/tmp/workspace/projects/p1") {
t.Fatalf("block missing path: %s", block)
}
if !strings.Contains(block, "/tmp") {
t.Fatalf("block should warn about /tmp: %s", block)
}
}
+11
View File
@@ -0,0 +1,11 @@
package projectprompt
// ShellExecExecuteGuidanceSection 供单代理/多代理系统提示追加:exec 与 execute 分工(尽量短)。
func ShellExecExecuteGuidanceSection() string {
return `Shellexec/execute):有专用 MCP 工具时优先专用工具;系统命令(管道、workdir、后台 &)用 execskills/ 内脚本(配合 read_file、skill)用 execute;多步扫描分拆调用,禁止一条 shell 串多个扫描器。下载/临时文件须写入系统提示中的「会话工作目录」,禁止用 /tmp。`
}
// ShellExecExecuteGuidanceReconSuffix 侦察子代理可选追加(一行)。
func ShellExecExecuteGuidanceReconSuffix() string {
return `枚举优先 subfinder、amass 等专用 MCP,勿 exec/execute 拼长链。`
}
@@ -0,0 +1,56 @@
package security
import (
"errors"
"fmt"
"os/exec"
"strings"
)
// FormatCommandFailureResult 与 exec 工具 ToolResult 文案一致(不含 ToolErrorPrefix)。
func FormatCommandFailureResult(exitCode int, output string) string {
output = strings.TrimSpace(output)
errMsg := fmt.Sprintf("exit status %d", exitCode)
if output == "" {
return fmt.Sprintf("命令执行失败: %s", errMsg)
}
if strings.HasPrefix(output, "命令执行失败:") {
return output
}
return fmt.Sprintf("命令执行失败: %s\n输出: %s", errMsg, output)
}
// FormatCommandFailureFromErr 根据 exec/execute 返回的 error 生成统一失败文案(IsError 正文)。
func FormatCommandFailureFromErr(err error, output string) string {
if err == nil {
return strings.TrimSpace(output)
}
var exitError *exec.ExitError
if errors.As(err, &exitError) {
return FormatCommandFailureResult(exitError.ExitCode(), output)
}
output = strings.TrimSpace(output)
if output == "" {
return fmt.Sprintf("命令执行失败: %v", err)
}
if strings.HasPrefix(output, "命令执行失败:") {
return output
}
return fmt.Sprintf("命令执行失败: %v\n输出: %s", err, output)
}
// ExecuteFailureStatusLine 流式 execute 结束时追加的单行状态(输出正文已在流中推送过)。
func ExecuteFailureStatusLine(exitCode int) string {
return fmt.Sprintf("\n命令执行失败: exit status %d", exitCode)
}
// IsCommandFailureResult 判断工具结果正文是否表示命令非零退出(用于 execute / exec 对齐 isError)。
func IsCommandFailureResult(content string) bool {
return strings.Contains(content, "命令执行失败:")
}
// IsLegacyShellExitNoise 过滤旧版 shell 流中冗余的 exit code 行。
func IsLegacyShellExitNoise(s string) bool {
trimmed := strings.TrimSpace(s)
return strings.HasPrefix(trimmed, "command exited with non-zero code ")
}
@@ -0,0 +1,54 @@
package security
import (
"errors"
"os/exec"
"strings"
"testing"
)
func TestFormatCommandFailureResult(t *testing.T) {
got := FormatCommandFailureResult(1, "sudo: password required")
want := "命令执行失败: exit status 1\n输出: sudo: password required"
if got != want {
t.Fatalf("got %q want %q", got, want)
}
if FormatCommandFailureResult(2, "") != "命令执行失败: exit status 2" {
t.Fatal("empty output format")
}
if FormatCommandFailureResult(1, "命令执行失败: exit status 1") != "命令执行失败: exit status 1" {
t.Fatal("should not double-wrap")
}
}
func TestIsCommandFailureResult(t *testing.T) {
if !IsCommandFailureResult("sudo: err\n命令执行失败: exit status 1") {
t.Fatal("expected true")
}
if IsCommandFailureResult("sudo: err only") {
t.Fatal("expected false")
}
}
func TestFormatCommandFailureFromErr(t *testing.T) {
cmd := exec.Command("sh", "-c", "exit 42")
err := cmd.Run()
got := FormatCommandFailureFromErr(err, "oops")
if got != "命令执行失败: exit status 42\n输出: oops" {
t.Fatalf("got %q", got)
}
timeoutErr := errors.New("shell inactivity timeout (300s)")
got2 := FormatCommandFailureFromErr(timeoutErr, "already timed out")
if !strings.Contains(got2, "shell inactivity timeout") || !strings.Contains(got2, "already timed out") {
t.Fatalf("got %q", got2)
}
}
func TestIsLegacyShellExitNoise(t *testing.T) {
if !IsLegacyShellExitNoise("command exited with non-zero code 1\n") {
t.Fatal("expected legacy noise")
}
if IsLegacyShellExitNoise("sudo: failed") {
t.Fatal("unexpected noise")
}
}
+60 -23
View File
@@ -32,10 +32,11 @@ var ToolOutputCallbackCtxKey = toolOutputCallbackCtxKey{}
// Executor 安全工具执行器
type Executor struct {
config *config.SecurityConfig
toolIndex map[string]*config.ToolConfig // 工具索引,用于 O(1) 查找
mcpServer *mcp.Server
logger *zap.Logger
config *config.SecurityConfig
toolIndex map[string]*config.ToolConfig // 工具索引,用于 O(1) 查找
mcpServer *mcp.Server
logger *zap.Logger
shellNoOutputTimeoutSec int // execute/exec 无新输出空闲秒数;0=默认 300-1=关闭(见 SetShellNoOutputTimeoutSeconds
}
// NewExecutor 创建新的执行器
@@ -51,6 +52,11 @@ func NewExecutor(cfg *config.SecurityConfig, mcpServer *mcp.Server, logger *zap.
return executor
}
// SetShellNoOutputTimeoutSeconds 配置 exec 工具无输出空闲终止(与 agent.shell_no_output_timeout_seconds 一致)。
func (e *Executor) SetShellNoOutputTimeoutSeconds(sec int) {
e.shellNoOutputTimeoutSec = sec
}
// buildToolIndex 构建工具索引,将 O(n) 查找优化为 O(1)
func (e *Executor) buildToolIndex() {
e.toolIndex = make(map[string]*config.ToolConfig)
@@ -133,6 +139,7 @@ func (e *Executor) ExecuteTool(ctx context.Context, toolName string, args map[st
// 执行命令
cmd := exec.CommandContext(ctx, toolConfig.Command, cmdArgs...)
applyDefaultTerminalEnv(cmd)
attachNonInteractiveStdin(cmd)
_ = prepareShellCmdSession(cmd)
e.logger.Info("执行安全工具",
@@ -144,7 +151,7 @@ func (e *Executor) ExecuteTool(ctx context.Context, toolName string, args map[st
var err error
// 如果上层提供了 stdout/stderr 增量回调,则边执行边读取并回调。
if cb, ok := ctx.Value(ToolOutputCallbackCtxKey).(ToolOutputCallback); ok && cb != nil {
output, err = streamCommandOutput(ctx, cmd, cb)
output, err = streamCommandOutput(ctx, cmd, cb, ResolveShellNoOutputTimeoutSeconds(e.shellNoOutputTimeoutSec))
if err != nil && shouldRetryWithPTY(output) {
e.logger.Info("检测到工具需要 TTY,使用 PTY 重试",
zap.String("tool", toolName),
@@ -797,6 +804,8 @@ func (e *Executor) executeSystemCommand(ctx context.Context, args map[string]int
zap.String("command", command),
)
command = PrepareNonInteractiveShellCommand(command)
// 获取shell类型(可选,默认为sh)
shell := "sh"
if s, ok := args["shell"].(string); ok && s != "" {
@@ -820,8 +829,7 @@ func (e *Executor) executeSystemCommand(ctx context.Context, args map[string]int
} else {
cmd = exec.CommandContext(ctx, shell, "-c", command)
}
applyDefaultTerminalEnv(cmd)
_ = prepareShellCmdSession(cmd)
ConfigureShellCmdForAgentExecute(cmd)
// 执行命令
e.logger.Info("执行系统命令",
@@ -850,8 +858,7 @@ func (e *Executor) executeSystemCommand(ctx context.Context, args map[string]int
} else {
pidCmd = exec.CommandContext(ctx, shell, "-c", pidCommand)
}
applyDefaultTerminalEnv(pidCmd)
_ = prepareShellCmdSession(pidCmd)
ConfigureShellCmdForAgentExecute(pidCmd)
// 获取stdout管道
stdout, err := pidCmd.StdoutPipe()
@@ -963,15 +970,14 @@ func (e *Executor) executeSystemCommand(ctx context.Context, args map[string]int
var err error
// 若上层提供工具输出增量回调,则边执行边流式读取。
if cb, ok := ctx.Value(ToolOutputCallbackCtxKey).(ToolOutputCallback); ok && cb != nil {
output, err = streamCommandOutput(ctx, cmd, cb)
output, err = streamCommandOutput(ctx, cmd, cb, ResolveShellNoOutputTimeoutSeconds(e.shellNoOutputTimeoutSec))
if err != nil && shouldRetryWithPTY(output) {
e.logger.Info("检测到系统命令需要 TTY,使用 PTY 重试")
cmd2 := exec.CommandContext(ctx, shell, "-c", command)
if workDir != "" {
cmd2.Dir = workDir
}
applyDefaultTerminalEnv(cmd2)
_ = prepareShellCmdSession(cmd2)
ConfigureShellCmdForAgentExecute(cmd2)
output, err = runCommandWithPTY(ctx, cmd2, cb)
}
} else {
@@ -984,8 +990,7 @@ func (e *Executor) executeSystemCommand(ctx context.Context, args map[string]int
if workDir != "" {
cmd2.Dir = workDir
}
applyDefaultTerminalEnv(cmd2)
_ = prepareShellCmdSession(cmd2)
ConfigureShellCmdForAgentExecute(cmd2)
output, err = runCommandWithPTY(ctx, cmd2, nil)
}
}
@@ -999,7 +1004,7 @@ func (e *Executor) executeSystemCommand(ctx context.Context, args map[string]int
Content: []mcp.Content{
{
Type: "text",
Text: fmt.Sprintf("命令执行失败: %v\n输出: %s", err, string(output)),
Text: FormatCommandFailureFromErr(err, output),
},
},
IsError: true,
@@ -1024,7 +1029,7 @@ func (e *Executor) executeSystemCommand(ctx context.Context, args map[string]int
// streamCommandOutput 以“边读边回调”的方式读取命令 stdout/stderr。
// 使用定长块读取,避免按行读取在无换行输出时永久阻塞;ctx 取消时终止进程树。
func streamCommandOutput(ctx context.Context, cmd *exec.Cmd, cb ToolOutputCallback) (string, error) {
func streamCommandOutput(ctx context.Context, cmd *exec.Cmd, cb ToolOutputCallback, noOutputSec int) (string, error) {
if err := prepareShellCmdSession(cmd); err != nil {
return "", err
}
@@ -1091,12 +1096,43 @@ func streamCommandOutput(ctx context.Context, cmd *exec.Cmd, cb ToolOutputCallba
lastFlush = time.Now()
}
for chunk := range chunks {
outBuilder.WriteString(chunk)
deltaBuilder.WriteString(chunk)
// 简单节流:buffer 大于 2KB 或 200ms 就刷新一次
if deltaBuilder.Len() >= 2048 || time.Since(lastFlush) >= 200*time.Millisecond {
flush()
idleWatch := NewShellInactivityWatch(noOutputSec)
if idleWatch != nil {
defer idleWatch.Stop()
}
fireInactivity := func() {
terminateCmdTree(cmd)
msg := ShellNoOutputTimeoutMessage(idleWatch.Sec)
outBuilder.WriteString(msg)
if cb != nil {
cb(msg)
}
_ = cmd.Wait()
}
chunksLoop:
for {
var idleCh <-chan struct{}
if idleWatch != nil {
idleCh = idleWatch.Expired
}
select {
case <-idleCh:
fireInactivity()
return outBuilder.String(), fmt.Errorf("shell inactivity timeout (%ds)", idleWatch.Sec)
case chunk, ok := <-chunks:
if !ok {
break chunksLoop
}
if chunk != "" && idleWatch != nil {
idleWatch.Bump()
}
outBuilder.WriteString(chunk)
deltaBuilder.WriteString(chunk)
if deltaBuilder.Len() >= 2048 || time.Since(lastFlush) >= 200*time.Millisecond {
flush()
}
}
}
flush()
@@ -1116,6 +1152,7 @@ func applyDefaultTerminalEnv(cmd *exec.Cmd) {
if cmd.Env == nil {
cmd.Env = os.Environ()
}
cmd.Env = ApplyNonInteractivePagerEnv(cmd.Env)
// 如果用户已设置 TERM/COLUMNS/LINES,则不覆盖
has := func(k string) bool {
prefix := k + "="
@@ -1159,7 +1196,7 @@ func runCommandWithPTY(ctx context.Context, cmd *exec.Cmd, cb ToolOutputCallback
if runtime.GOOS == "windows" {
// PTY 方案为类 UnixWindows 走原逻辑
if cb != nil {
return streamCommandOutput(ctx, cmd, cb)
return streamCommandOutput(ctx, cmd, cb, 0)
}
_ = prepareShellCmdSession(cmd)
out, err := cmd.CombinedOutput()
+21
View File
@@ -71,6 +71,27 @@ func TestExecuteSystemCommand_BackgroundDoesNotBlockOnChildStdout(t *testing.T)
}
}
func TestExecuteSystemCommand_FailureFormat(t *testing.T) {
executor, _ := setupTestExecutor(t)
res, err := executor.executeSystemCommand(context.Background(), map[string]interface{}{
"command": "echo fail-msg >&2; exit 7",
"shell": "sh",
})
if err != nil {
t.Fatalf("executeSystemCommand: %v", err)
}
if res == nil || !res.IsError {
t.Fatalf("expected IsError, got %+v", res)
}
text := res.Content[0].Text
if text != FormatCommandFailureResult(7, "fail-msg\n") && text != FormatCommandFailureResult(7, "fail-msg") {
t.Fatalf("unexpected failure text: %q", text)
}
if !strings.Contains(text, "exit status 7") || !strings.Contains(text, "fail-msg") {
t.Fatalf("unexpected failure text: %q", text)
}
}
func TestBuildCommandArgs_NmapSkipsEmptyOptionalFlags(t *testing.T) {
pos1 := 1
executor, _ := setupTestExecutor(t)
+200
View File
@@ -0,0 +1,200 @@
package security
import (
"context"
"errors"
"fmt"
"io"
"os/exec"
"sync"
"github.com/cloudwego/eino/adk/filesystem"
"github.com/cloudwego/eino/schema"
)
// ConfigureShellCmdForAgentExecute 与 exec 工具一致:非交互 stdin、pager/TERM 环境、独立进程组。
func ConfigureShellCmdForAgentExecute(cmd *exec.Cmd) {
if cmd == nil {
return
}
applyDefaultTerminalEnv(cmd)
attachNonInteractiveStdin(cmd)
_ = prepareShellCmdSession(cmd)
}
// TerminateShellCmdTree 尽力终止 shell 及其子进程组(与 exec/execute 超时取消一致)。
func TerminateShellCmdTree(cmd *exec.Cmd) {
terminateCmdTree(cmd)
}
// EinoStreamingShell 为 Eino ADK execute 工具提供流式 shell,行为与 exec 对齐:
// 并发读取 stdout/stderr(定长块,非按行),避免官方 local.ExecuteStreaming 先排空 stdout
// 导致 stderr 错误(如 sudo 密码提示)长时间不可见、UI 一直显示「执行中」。
type EinoStreamingShell struct{}
// NewEinoStreamingShell 创建 execute 流式 shell 实现。
func NewEinoStreamingShell() *EinoStreamingShell {
return &EinoStreamingShell{}
}
// ExecuteStreaming 实现 filesystem.StreamingShell。
func (s *EinoStreamingShell) ExecuteStreaming(ctx context.Context, input *filesystem.ExecuteRequest) (*schema.StreamReader[*filesystem.ExecuteResponse], error) {
if input == nil || input.Command == "" {
return nil, fmt.Errorf("command is required")
}
sr, w := schema.Pipe[*filesystem.ExecuteResponse](100)
if input.RunInBackendGround {
go runShellInBackground(ctx, input.Command, w)
return sr, nil
}
go streamShellForeground(ctx, input.Command, w)
return sr, nil
}
func runShellInBackground(ctx context.Context, command string, w *schema.StreamWriter[*filesystem.ExecuteResponse]) {
defer w.Close()
cmd := exec.CommandContext(ctx, "/bin/sh", "-c", command)
ConfigureShellCmdForAgentExecute(cmd)
stdout, err := cmd.StdoutPipe()
if err != nil {
_ = w.Send(nil, fmt.Errorf("failed to create stdout pipe: %w", err))
return
}
stderr, err := cmd.StderrPipe()
if err != nil {
_ = stdout.Close()
_ = w.Send(nil, fmt.Errorf("failed to create stderr pipe: %w", err))
return
}
if err := cmd.Start(); err != nil {
_ = stdout.Close()
_ = stderr.Close()
_ = w.Send(nil, fmt.Errorf("failed to start command: %w", err))
return
}
done := make(chan struct{})
go func() {
drainShellPipes(stdout, stderr)
_ = cmd.Wait()
close(done)
}()
select {
case <-done:
case <-ctx.Done():
TerminateShellCmdTree(cmd)
}
exitCode := 0
_ = w.Send(&filesystem.ExecuteResponse{
Output: "command started in background\n",
ExitCode: &exitCode,
}, nil)
}
func drainShellPipes(stdout, stderr io.Reader) {
var wg sync.WaitGroup
wg.Add(2)
go func() {
defer wg.Done()
_, _ = io.Copy(io.Discard, stdout)
}()
go func() {
defer wg.Done()
_, _ = io.Copy(io.Discard, stderr)
}()
wg.Wait()
}
func streamShellForeground(ctx context.Context, command string, w *schema.StreamWriter[*filesystem.ExecuteResponse]) {
defer w.Close()
cmd := exec.CommandContext(ctx, "/bin/sh", "-c", command)
ConfigureShellCmdForAgentExecute(cmd)
stdoutPipe, err := cmd.StdoutPipe()
if err != nil {
_ = w.Send(nil, fmt.Errorf("failed to create stdout pipe: %w", err))
return
}
stderrPipe, err := cmd.StderrPipe()
if err != nil {
_ = stdoutPipe.Close()
_ = w.Send(nil, fmt.Errorf("failed to create stderr pipe: %w", err))
return
}
if err := cmd.Start(); err != nil {
_ = stdoutPipe.Close()
_ = stderrPipe.Close()
_ = w.Send(nil, fmt.Errorf("failed to start command: %w", err))
return
}
stopWatch := make(chan struct{})
go func() {
select {
case <-ctx.Done():
TerminateShellCmdTree(cmd)
case <-stopWatch:
}
}()
defer close(stopWatch)
chunks := make(chan string, 64)
var wg sync.WaitGroup
readFn := func(r io.Reader) {
defer wg.Done()
buf := make([]byte, 8192)
for {
n, readErr := r.Read(buf)
if n > 0 {
chunks <- string(buf[:n])
}
if readErr != nil {
return
}
}
}
wg.Add(2)
go readFn(stdoutPipe)
go readFn(stderrPipe)
go func() {
wg.Wait()
close(chunks)
}()
hadOutput := false
for chunk := range chunks {
if chunk == "" {
continue
}
hadOutput = true
if w.Send(&filesystem.ExecuteResponse{Output: chunk}, nil) {
TerminateShellCmdTree(cmd)
return
}
}
waitErr := cmd.Wait()
if waitErr == nil {
exitCode := 0
_ = w.Send(&filesystem.ExecuteResponse{ExitCode: &exitCode}, nil)
return
}
var exitError *exec.ExitError
if errors.As(waitErr, &exitError) {
exitCode := exitError.ExitCode()
resp := &filesystem.ExecuteResponse{ExitCode: &exitCode}
if !hadOutput {
resp.Output = FormatCommandFailureResult(exitCode, "")
}
_ = w.Send(resp, nil)
return
}
_ = w.Send(nil, fmt.Errorf("command failed: %w", waitErr))
}
@@ -0,0 +1,117 @@
package security
import (
"context"
"errors"
"io"
"strings"
"testing"
"time"
"github.com/cloudwego/eino/adk/filesystem"
)
func TestEinoStreamingShell_StreamsStderrBeforeStdoutEOF(t *testing.T) {
shell := NewEinoStreamingShell()
cmd := PrepareNonInteractiveShellCommand("echo err-only >&2; exit 1")
sr, err := shell.ExecuteStreaming(context.Background(), &filesystem.ExecuteRequest{Command: cmd})
if err != nil {
t.Fatalf("ExecuteStreaming: %v", err)
}
defer sr.Close()
start := time.Now()
var got strings.Builder
for {
resp, rerr := sr.Recv()
if errors.Is(rerr, io.EOF) {
break
}
if rerr != nil {
t.Fatalf("recv: %v", rerr)
}
if resp != nil && resp.Output != "" {
got.WriteString(resp.Output)
}
}
if time.Since(start) > 3*time.Second {
t.Fatalf("expected fast completion, took %v", time.Since(start))
}
if !strings.Contains(got.String(), "err-only") {
t.Fatalf("expected stderr in output, got: %q", got.String())
}
}
func TestEinoStreamingShell_SudoFailsFast(t *testing.T) {
shell := NewEinoStreamingShell()
cmd := PrepareNonInteractiveShellCommand("sudo whoami && sudo cat /etc/os-release")
sr, err := shell.ExecuteStreaming(context.Background(), &filesystem.ExecuteRequest{Command: cmd})
if err != nil {
t.Fatalf("ExecuteStreaming: %v", err)
}
defer sr.Close()
start := time.Now()
var got strings.Builder
for {
resp, rerr := sr.Recv()
if errors.Is(rerr, io.EOF) {
break
}
if rerr != nil {
t.Fatalf("recv: %v", rerr)
}
if resp == nil {
continue
}
got.WriteString(resp.Output)
}
if time.Since(start) > 5*time.Second {
t.Fatalf("sudo should fail quickly, took %v output=%q", time.Since(start), got.String())
}
out := got.String()
if strings.Contains(out, "command exited with non-zero code") {
t.Fatalf("legacy exit line present: %q", out)
}
if !strings.Contains(out, "sudo") && !strings.Contains(out, "password") && !strings.Contains(out, "terminal") {
t.Fatalf("expected sudo error text, got: %q", out)
}
}
func TestEinoStreamingShell_StderrWhileStdoutBlocks(t *testing.T) {
shell := NewEinoStreamingShell()
// 模拟 sudostderr 先有输出,stdout 侧进程仍挂起;旧 eino local 在首包 stderr 前不会向流写任何内容。
cmd := PrepareNonInteractiveShellCommand(`echo "password prompt" >&2; sleep 30`)
ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second)
defer cancel()
sr, err := shell.ExecuteStreaming(ctx, &filesystem.ExecuteRequest{Command: cmd})
if err != nil {
t.Fatalf("ExecuteStreaming: %v", err)
}
defer sr.Close()
start := time.Now()
var got strings.Builder
for {
resp, rerr := sr.Recv()
if errors.Is(rerr, io.EOF) {
break
}
if rerr != nil {
break
}
if resp != nil && resp.Output != "" {
got.WriteString(resp.Output)
if strings.Contains(got.String(), "password prompt") {
break
}
}
}
if time.Since(start) > 1500*time.Millisecond {
t.Fatalf("expected stderr promptly, took %v output=%q", time.Since(start), got.String())
}
if !strings.Contains(got.String(), "password prompt") {
t.Fatalf("expected early stderr, got: %q", got.String())
}
}
+163
View File
@@ -0,0 +1,163 @@
package security
import (
"fmt"
"os"
"os/exec"
"strings"
"sync"
"time"
)
// ShellNoOutputTimeoutMessage 长时间无新 stdout/stderr 时的提示(软失败,模型可见)。
func ShellNoOutputTimeoutMessage(idleSec int) string {
return fmt.Sprintf(`命令已终止超过 %d 秒没有新的输出疑似在等待交互输入或已挂起
长时静默任务请使用末尾 & 后台运行或增大 agent.shell_no_output_timeout_seconds-1=关闭此检测
Command terminated: no new output for %d seconds (possible interactive wait or hung process).`, idleSec, idleSec)
}
// ShellInactivityWatch 在 noOutputSec 内无任何新输出时向 expired 发送信号;每次 Bump 重置计时。
// 与「仅有首包输出就永久取消计时」不同,可兜住 sudo 打印 Password 提示后继续挂起等情况。
type ShellInactivityWatch struct {
Sec int
mu sync.Mutex
timer *time.Timer
Expired chan struct{}
}
func NewShellInactivityWatch(noOutputSec int) *ShellInactivityWatch {
sec := ResolveShellNoOutputTimeoutSeconds(noOutputSec)
if sec <= 0 {
return nil
}
w := &ShellInactivityWatch{
Sec: sec,
Expired: make(chan struct{}, 1),
}
w.Bump()
return w
}
func (w *ShellInactivityWatch) Bump() {
if w == nil || w.Sec <= 0 {
return
}
w.mu.Lock()
defer w.mu.Unlock()
if w.timer != nil {
w.timer.Stop()
}
w.timer = time.AfterFunc(time.Duration(w.Sec)*time.Second, func() {
select {
case w.Expired <- struct{}{}:
default:
}
})
}
func (w *ShellInactivityWatch) Stop() {
if w == nil {
return
}
w.mu.Lock()
defer w.mu.Unlock()
if w.timer != nil {
w.timer.Stop()
w.timer = nil
}
}
// ResolveShellNoOutputTimeoutSeconds0=默认 3005 分钟);-1=关闭;>0=自定义。
func ResolveShellNoOutputTimeoutSeconds(sec int) int {
if sec < 0 {
return 0
}
if sec == 0 {
return 300
}
return sec
}
// PrependNonInteractiveShellExports 为 sh -c 注入通用非交互环境(pager 等),不维护命令黑名单。
func PrependNonInteractiveShellExports(shellCommand string) string {
if strings.TrimSpace(shellCommand) == "" {
return shellCommand
}
upper := strings.ToUpper(shellCommand)
var pairs []string
add := func(key, val string) {
if strings.Contains(upper, strings.ToUpper(key)) {
return
}
pairs = append(pairs, key+"="+val)
}
add("GIT_PAGER", "cat")
add("PAGER", "cat")
add("SYSTEMD_PAGER", "cat")
add("DEBIAN_FRONTEND", "noninteractive")
if len(pairs) == 0 {
return shellCommand
}
return "export " + strings.Join(pairs, " ") + "\n" + shellCommand
}
// PrependNonInteractiveStdinRedirect 为 sh -c 关闭 stdin(与 attachNonInteractiveStdin 等价),
// 使 read/input()/sudo -S 等从 stdin 读取的程序快速失败而非挂起。已含 </dev/null 时不重复注入。
func PrependNonInteractiveStdinRedirect(shellCommand string) string {
if strings.TrimSpace(shellCommand) == "" {
return shellCommand
}
lower := strings.ToLower(shellCommand)
if strings.Contains(lower, "</dev/null") || strings.Contains(lower, "0</dev/null") {
return shellCommand
}
return "exec </dev/null\n" + shellCommand
}
// PrepareNonInteractiveShellCommand 组合非交互包装:stdin 关闭 + pager 等环境变量(零名单)。
func PrepareNonInteractiveShellCommand(shellCommand string) string {
return PrependNonInteractiveStdinRedirect(PrependNonInteractiveShellExports(shellCommand))
}
// ApplyNonInteractivePagerEnv 为 exec.Cmd 补齐与 PrependNonInteractiveShellExports 一致的环境变量。
func ApplyNonInteractivePagerEnv(cmdEnv []string) []string {
if cmdEnv == nil {
cmdEnv = []string{}
}
has := func(k string) bool {
prefix := k + "="
for _, e := range cmdEnv {
if strings.HasPrefix(e, prefix) {
return true
}
}
return false
}
if !has("GIT_PAGER") {
cmdEnv = append(cmdEnv, "GIT_PAGER=cat")
}
if !has("PAGER") {
cmdEnv = append(cmdEnv, "PAGER=cat")
}
if !has("SYSTEMD_PAGER") {
cmdEnv = append(cmdEnv, "SYSTEMD_PAGER=cat")
}
if !has("DEBIAN_FRONTEND") {
cmdEnv = append(cmdEnv, "DEBIAN_FRONTEND=noninteractive")
}
return cmdEnv
}
// attachNonInteractiveStdin 关闭交互式 stdin,使部分命令快速失败而非等待输入。
func attachNonInteractiveStdin(cmd *exec.Cmd) {
if cmd == nil || cmd.Stdin != nil {
return
}
f, err := os.Open(os.DevNull)
if err != nil {
return
}
cmd.Stdin = f
}
@@ -0,0 +1,128 @@
package security
import (
"context"
"os"
"os/exec"
"strings"
"testing"
"time"
)
func TestPrependNonInteractiveShellExports(t *testing.T) {
out := PrependNonInteractiveShellExports("echo hi")
if !strings.Contains(out, "GIT_PAGER=cat") || !strings.Contains(out, "PAGER=cat") {
t.Fatalf("missing pager exports: %q", out)
}
if !strings.HasSuffix(strings.TrimSpace(out), "echo hi") {
t.Fatalf("command suffix lost: %q", out)
}
skip := PrependNonInteractiveShellExports("GIT_PAGER=less echo hi")
if strings.Contains(skip, "export GIT_PAGER=cat") {
t.Fatalf("should not override existing GIT_PAGER: %q", skip)
}
}
func TestPrependNonInteractiveStdinRedirect(t *testing.T) {
out := PrependNonInteractiveStdinRedirect("echo hi")
if !strings.HasPrefix(out, "exec </dev/null") {
t.Fatalf("missing stdin redirect: %q", out)
}
if !strings.HasSuffix(strings.TrimSpace(out), "echo hi") {
t.Fatalf("command suffix lost: %q", out)
}
skip := PrependNonInteractiveStdinRedirect("cmd </dev/null")
if strings.HasPrefix(skip, "exec </dev/null") {
t.Fatalf("should not double redirect: %q", skip)
}
}
func TestPrepareNonInteractiveShellCommand(t *testing.T) {
out := PrepareNonInteractiveShellCommand("echo hi")
if !strings.Contains(out, "exec </dev/null") {
t.Fatalf("missing stdin redirect: %q", out)
}
if !strings.Contains(out, "GIT_PAGER=cat") {
t.Fatalf("missing pager export: %q", out)
}
}
func TestNewShellInactivityWatch(t *testing.T) {
w := NewShellInactivityWatch(1)
if w == nil {
t.Fatal("expected watch")
}
w.Bump()
select {
case <-w.Expired:
case <-time.After(3 * time.Second):
t.Fatal("expected inactivity fire within 3s")
}
}
func TestResolveShellNoOutputTimeoutSeconds(t *testing.T) {
if ResolveShellNoOutputTimeoutSeconds(0) != 300 {
t.Fatal("zero should default to 300")
}
if ResolveShellNoOutputTimeoutSeconds(-1) != 0 {
t.Fatal("-1 should disable")
}
if ResolveShellNoOutputTimeoutSeconds(30) != 30 {
t.Fatal("explicit value")
}
}
// TestNonInteractiveStdinReadExitsQuickly 验证 exec </dev/null + attachNonInteractiveStdin 时 read 立即 EOF,不挂起。
func TestNonInteractiveStdinReadExitsQuickly(t *testing.T) {
if testing.Short() {
t.Skip("skipping shell integration in -short")
}
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()
cmd := exec.CommandContext(ctx, "sh", "-c", PrepareNonInteractiveShellCommand(`read x; echo "x=<$x>"`))
attachNonInteractiveStdin(cmd)
start := time.Now()
out, err := cmd.CombinedOutput()
elapsed := time.Since(start)
if elapsed > 2*time.Second {
t.Fatalf("read with closed stdin took %v, want <2s", elapsed)
}
if err != nil {
t.Fatalf("unexpected error: %v output=%q", err, out)
}
if !strings.Contains(string(out), "x=<>") {
t.Fatalf("unexpected output: %q", out)
}
}
// TestNonInteractiveStdinReadBlocksWithoutRedirect 对照:stdin 为永不写入的管道时 read 会挂起。
func TestNonInteractiveStdinReadBlocksWithoutRedirect(t *testing.T) {
if testing.Short() {
t.Skip("skipping shell integration in -short")
}
r, w, err := os.Pipe()
if err != nil {
t.Fatal(err)
}
defer r.Close()
// 保持 w 打开且不写数据,模拟「等待用户输入」
cmd := exec.Command("sh", "-c", `read x; echo done`)
cmd.Stdin = r
done := make(chan error, 1)
go func() { done <- cmd.Run() }()
select {
case err := <-done:
t.Fatalf("expected hang, but command finished: %v", err)
case <-time.After(500 * time.Millisecond):
if cmd.Process != nil {
_ = cmd.Process.Kill()
}
_ = w.Close()
<-done // 等待 goroutine 退出
}
}
+32 -9
View File
@@ -3110,15 +3110,26 @@ async function cancelMCPToolExecutionSubmit(executionId, userNote, options = {})
if (!executionId) {
return;
}
let conversationId = '';
if (typeof monitorState !== 'undefined' && Array.isArray(monitorState.executions)) {
const exec = monitorState.executions.find(e => e && e.id === executionId);
if (exec) {
conversationId = (exec.conversationId || '').trim();
}
}
try {
const res = await apiFetch(`/api/monitor/execution/${encodeURIComponent(executionId)}/cancel`, {
method: 'POST',
headers: { 'Content-Type': 'application/json' },
body: JSON.stringify({ note: userNote || '' }),
});
const body = await res.json().catch(() => ({}));
if (!res.ok) {
throw new Error(body.error || body.message || res.statusText);
if (conversationId && typeof requestCancelWithContinue === 'function') {
await requestCancelWithContinue(conversationId, userNote || '');
} else {
const res = await apiFetch(`/api/monitor/execution/${encodeURIComponent(executionId)}/cancel`, {
method: 'POST',
headers: { 'Content-Type': 'application/json' },
body: JSON.stringify({ note: userNote || '' }),
});
const body = await res.json().catch(() => ({}));
if (!res.ok) {
throw new Error(body.error || body.message || res.statusText);
}
}
const okMsg = typeof window.t === 'function' ? window.t('mcpDetailModal.abortSuccess') : '已发送终止请求';
alert(okMsg);
@@ -3136,7 +3147,7 @@ async function cancelMCPToolExecutionSubmit(executionId, userNote, options = {})
}
/**
* 取消单次 MCP 工具执行监控页终止弹出说明框后提交仅取消该次 tools/call不停止整条对话/迭代任务
* 取消单次 MCP 工具执行监控页终止 conversationId 时复用对话页中断并继续弹窗与 API
* @param {string} executionId
* @param {{ refreshDetail?: boolean }} [options]
*/
@@ -3144,6 +3155,18 @@ async function cancelMCPToolExecution(executionId, options = {}) {
if (!executionId) {
return;
}
let conversationId = '';
if (typeof monitorState !== 'undefined' && Array.isArray(monitorState.executions)) {
const exec = monitorState.executions.find(e => e && e.id === executionId);
if (exec) {
conversationId = (exec.conversationId || '').trim();
}
}
if (conversationId && typeof openUserInterruptModal === 'function') {
openUserInterruptModal(null, conversationId);
window.__monitorInterruptContext = { executionId: executionId, options: options || {} };
return;
}
openMcpToolAbortModal(executionId, options);
}
+36
View File
@@ -1003,6 +1003,7 @@ function openUserInterruptModal(progressId, conversationId) {
function closeUserInterruptModal() {
userInterruptModalPending = null;
window.__monitorInterruptContext = null;
closeAppModal('user-interrupt-modal');
}
@@ -1012,6 +1013,7 @@ async function submitUserInterruptContinue() {
}
const reason = (document.getElementById('user-interrupt-reason') && document.getElementById('user-interrupt-reason').value || '').trim();
const { progressId, conversationId } = userInterruptModalPending;
const monitorCtx = window.__monitorInterruptContext;
closeUserInterruptModal();
const stopBtn = progressId ? document.getElementById(`${progressId}-stop-btn`) : null;
try {
@@ -1020,6 +1022,13 @@ async function submitUserInterruptContinue() {
stopBtn.textContent = typeof window.t === 'function' ? window.t('tasks.interruptSubmitting') : '提交中...';
}
await requestCancelWithContinue(conversationId, reason);
if (monitorCtx && monitorCtx.executionId && typeof refreshMonitorPanel === 'function') {
const page = (typeof monitorState !== 'undefined' && monitorState.pagination && monitorState.pagination.page)
? monitorState.pagination.page
: 1;
await refreshMonitorPanel(page);
window.__monitorInterruptContext = null;
}
loadActiveTasks();
} catch (error) {
console.error('中断并继续失败:', error);
@@ -3536,6 +3545,33 @@ const monitorState = {
}
};
let monitorPollTimer = null;
const MONITOR_POLL_INTERVAL_MS = 3000;
function startMonitorPoll() {
stopMonitorPoll();
monitorPollTimer = setInterval(function () {
const page = document.getElementById('page-mcp-monitor');
if (!page || !page.classList.contains('active')) {
stopMonitorPoll();
return;
}
if (document.hidden) {
return;
}
if (typeof refreshMonitorPanel === 'function') {
refreshMonitorPanel().catch(function () { /* ignore */ });
}
}, MONITOR_POLL_INTERVAL_MS);
}
function stopMonitorPoll() {
if (monitorPollTimer) {
clearInterval(monitorPollTimer);
monitorPollTimer = null;
}
}
function openMonitorPanel() {
// 切换到MCP监控页面
if (typeof switchPage === 'function') {
+3
View File
@@ -356,6 +356,9 @@ async function initPage(pageId) {
if (typeof refreshMonitorPanel === 'function') {
refreshMonitorPanel();
}
if (typeof startMonitorPoll === 'function') {
startMonitorPoll();
}
break;
case 'mcp-management':
// 初始化MCP管理