diff --git a/internal/mcp/server.go b/internal/mcp/server.go index d964c2b5..074beaa6 100644 --- a/internal/mcp/server.go +++ b/internal/mcp/server.go @@ -44,6 +44,10 @@ type Server struct { runningCancels map[string]context.CancelFunc runningCancelsMu sync.Mutex abortUserNotes map[string]string // 监控页终止时附带的用户说明,与 executionID 对应 + // httpToolTimeoutMinutes 同步 agent.tool_timeout_minutes,用于 POST /api/mcp 的 tools/call(不经 Agent 包装的路径)。 + // nil 表示未配置,沿用默认 30 分钟;指向 0 表示不限制;>0 为分钟数。 + httpToolTimeoutMinutes *int + httpToolTimeoutMu sync.RWMutex } type sseClient struct { @@ -90,6 +94,39 @@ func NewServerWithStorage(logger *zap.Logger, storage MonitorStorage) *Server { return s } +// ConfigureHTTPToolCallTimeoutFromAgentMinutes 将 agent.tool_timeout_minutes 同步到经 HTTP POST /api/mcp 触发的 tools/call。 +// minutes<=0 表示不设置硬性截止时间(与配置「0 不限制」一致);minutes>0 为该次调用的最长等待时间。 +// 未调用前对 tools/call 使用默认 30 分钟(与历史硬编码一致)。 +func (s *Server) ConfigureHTTPToolCallTimeoutFromAgentMinutes(minutes int) { + if s == nil { + return + } + v := minutes + if v < 0 { + v = 0 + } + s.httpToolTimeoutMu.Lock() + defer s.httpToolTimeoutMu.Unlock() + s.httpToolTimeoutMinutes = &v +} + +func (s *Server) effectiveHTTPToolCallDeadline() (context.Context, context.CancelFunc) { + const defaultDur = 30 * time.Minute + if s == nil { + return context.WithTimeout(context.Background(), defaultDur) + } + s.httpToolTimeoutMu.RLock() + mPtr := s.httpToolTimeoutMinutes + s.httpToolTimeoutMu.RUnlock() + if mPtr == nil { + return context.WithTimeout(context.Background(), defaultDur) + } + if *mPtr <= 0 { + return context.WithCancel(context.Background()) + } + return context.WithTimeout(context.Background(), time.Duration(*mPtr)*time.Minute) +} + // RegisterTool 注册工具 func (s *Server) RegisterTool(tool Tool, handler ToolHandler) { s.mu.Lock() @@ -457,7 +494,7 @@ func (s *Server) handleCallTool(msg *Message) *Message { } } - baseCtx, timeoutCancel := context.WithTimeout(context.Background(), 30*time.Minute) + baseCtx, timeoutCancel := s.effectiveHTTPToolCallDeadline() defer timeoutCancel() execCtx, runCancel := context.WithCancel(baseCtx) s.registerRunningCancel(executionID, runCancel) diff --git a/internal/security/executor.go b/internal/security/executor.go index 4192b866..9ce8e066 100644 --- a/internal/security/executor.go +++ b/internal/security/executor.go @@ -153,6 +153,7 @@ func (e *Executor) ExecuteTool(ctx context.Context, toolName string, args map[st // 执行命令 cmd := exec.CommandContext(ctx, toolConfig.Command, cmdArgs...) applyDefaultTerminalEnv(cmd) + _ = prepareShellCmdSession(cmd) e.logger.Info("执行安全工具", zap.String("tool", toolName), @@ -163,13 +164,14 @@ func (e *Executor) ExecuteTool(ctx context.Context, toolName string, args map[st var err error // 如果上层提供了 stdout/stderr 增量回调,则边执行边读取并回调。 if cb, ok := ctx.Value(ToolOutputCallbackCtxKey).(ToolOutputCallback); ok && cb != nil { - output, err = streamCommandOutput(cmd, cb) + output, err = streamCommandOutput(ctx, cmd, cb) if err != nil && shouldRetryWithPTY(output) { e.logger.Info("检测到工具需要 TTY,使用 PTY 重试", zap.String("tool", toolName), ) cmd2 := exec.CommandContext(ctx, toolConfig.Command, cmdArgs...) applyDefaultTerminalEnv(cmd2) + _ = prepareShellCmdSession(cmd2) output, err = runCommandWithPTY(ctx, cmd2, cb) } } else { @@ -182,6 +184,7 @@ func (e *Executor) ExecuteTool(ctx context.Context, toolName string, args map[st ) cmd2 := exec.CommandContext(ctx, toolConfig.Command, cmdArgs...) applyDefaultTerminalEnv(cmd2) + _ = prepareShellCmdSession(cmd2) output, err = runCommandWithPTY(ctx, cmd2, nil) } } @@ -837,6 +840,8 @@ func (e *Executor) executeSystemCommand(ctx context.Context, args map[string]int } else { cmd = exec.CommandContext(ctx, shell, "-c", command) } + applyDefaultTerminalEnv(cmd) + _ = prepareShellCmdSession(cmd) // 执行命令 e.logger.Info("执行系统命令", @@ -865,6 +870,8 @@ func (e *Executor) executeSystemCommand(ctx context.Context, args map[string]int } else { pidCmd = exec.CommandContext(ctx, shell, "-c", pidCommand) } + applyDefaultTerminalEnv(pidCmd) + _ = prepareShellCmdSession(pidCmd) // 获取stdout管道 stdout, err := pidCmd.StdoutPipe() @@ -976,7 +983,7 @@ func (e *Executor) executeSystemCommand(ctx context.Context, args map[string]int var err error // 若上层提供工具输出增量回调,则边执行边流式读取。 if cb, ok := ctx.Value(ToolOutputCallbackCtxKey).(ToolOutputCallback); ok && cb != nil { - output, err = streamCommandOutput(cmd, cb) + output, err = streamCommandOutput(ctx, cmd, cb) if err != nil && shouldRetryWithPTY(output) { e.logger.Info("检测到系统命令需要 TTY,使用 PTY 重试") cmd2 := exec.CommandContext(ctx, shell, "-c", command) @@ -984,6 +991,7 @@ func (e *Executor) executeSystemCommand(ctx context.Context, args map[string]int cmd2.Dir = workDir } applyDefaultTerminalEnv(cmd2) + _ = prepareShellCmdSession(cmd2) output, err = runCommandWithPTY(ctx, cmd2, cb) } } else { @@ -997,6 +1005,7 @@ func (e *Executor) executeSystemCommand(ctx context.Context, args map[string]int cmd2.Dir = workDir } applyDefaultTerminalEnv(cmd2) + _ = prepareShellCmdSession(cmd2) output, err = runCommandWithPTY(ctx, cmd2, nil) } } @@ -1034,8 +1043,11 @@ func (e *Executor) executeSystemCommand(ctx context.Context, args map[string]int } // streamCommandOutput 以“边读边回调”的方式读取命令 stdout/stderr。 -// 保持输出内容完整拼接返回,并用 cb(chunk) 向上层持续推送。 -func streamCommandOutput(cmd *exec.Cmd, cb ToolOutputCallback) (string, error) { +// 使用定长块读取,避免按行读取在无换行输出时永久阻塞;ctx 取消时终止进程树。 +func streamCommandOutput(ctx context.Context, cmd *exec.Cmd, cb ToolOutputCallback) (string, error) { + if err := prepareShellCmdSession(cmd); err != nil { + return "", err + } stdoutPipe, err := cmd.StdoutPipe() if err != nil { return "", err @@ -1051,18 +1063,27 @@ func streamCommandOutput(cmd *exec.Cmd, cb ToolOutputCallback) (string, error) { return "", err } + stopWatch := make(chan struct{}) + go func() { + select { + case <-ctx.Done(): + terminateCmdTree(cmd) + case <-stopWatch: + } + }() + defer close(stopWatch) + chunks := make(chan string, 64) var wg sync.WaitGroup readFn := func(r io.Reader) { defer wg.Done() - br := bufio.NewReader(r) + buf := make([]byte, 8192) for { - s, readErr := br.ReadString('\n') - if s != "" { - chunks <- s + n, readErr := r.Read(buf) + if n > 0 { + chunks <- string(buf[:n]) } if readErr != nil { - // EOF 正常结束 return } } @@ -1158,12 +1179,14 @@ func runCommandWithPTY(ctx context.Context, cmd *exec.Cmd, cb ToolOutputCallback if runtime.GOOS == "windows" { // PTY 方案为类 Unix;Windows 走原逻辑 if cb != nil { - return streamCommandOutput(cmd, cb) + return streamCommandOutput(ctx, cmd, cb) } + _ = prepareShellCmdSession(cmd) out, err := cmd.CombinedOutput() return string(out), err } + _ = prepareShellCmdSession(cmd) ptmx, err := pty.Start(cmd) if err != nil { return "", err @@ -1176,9 +1199,7 @@ func runCommandWithPTY(ctx context.Context, cmd *exec.Cmd, cb ToolOutputCallback select { case <-ctx.Done(): _ = ptmx.Close() // 触发读退出 - if cmd.Process != nil { - _ = cmd.Process.Kill() - } + terminateCmdTree(cmd) case <-done: } }() diff --git a/internal/security/procattr_unix.go b/internal/security/procattr_unix.go new file mode 100644 index 00000000..96d4efe2 --- /dev/null +++ b/internal/security/procattr_unix.go @@ -0,0 +1,31 @@ +//go:build !windows + +package security + +import ( + "os/exec" + "syscall" +) + +// prepareShellCmdSession 让 shell 子进程在独立会话中运行,便于超时/取消时整组 SIGKILL(含子进程)。 +func prepareShellCmdSession(cmd *exec.Cmd) error { + if cmd == nil { + return nil + } + if cmd.SysProcAttr == nil { + cmd.SysProcAttr = &syscall.SysProcAttr{} + } + cmd.SysProcAttr.Setsid = true + return nil +} + +// terminateCmdTree 尽力终止 cmd 及其进程组(Unix 下 Setsid 后 PGID == 首进程 PID)。 +func terminateCmdTree(cmd *exec.Cmd) { + if cmd == nil || cmd.Process == nil { + return + } + pid := cmd.Process.Pid + if err := syscall.Kill(-pid, syscall.SIGKILL); err != nil { + _ = cmd.Process.Kill() + } +} diff --git a/internal/security/procattr_windows.go b/internal/security/procattr_windows.go new file mode 100644 index 00000000..df7e2eda --- /dev/null +++ b/internal/security/procattr_windows.go @@ -0,0 +1,17 @@ +//go:build windows + +package security + +import "os/exec" + +func prepareShellCmdSession(cmd *exec.Cmd) error { + _ = cmd + return nil +} + +func terminateCmdTree(cmd *exec.Cmd) { + if cmd == nil || cmd.Process == nil { + return + } + _ = cmd.Process.Kill() +}