From c91806c0c41f0c9c2343d08afa15fe59e2145c5e Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E5=85=AC=E6=98=8E?= <83812544+Ed1s0nZ@users.noreply.github.com> Date: Fri, 26 Jun 2026 23:11:52 +0800 Subject: [PATCH] Add files via upload --- internal/handler/agent.go | 3 +- internal/handler/terminal_ws_windows.go | 16 ++++++ internal/security/executor.go | 69 ++++++++++++++++++++++--- internal/security/executor_test.go | 32 ++++++++++++ internal/security/procattr_windows.go | 22 ++++++-- 5 files changed, 131 insertions(+), 11 deletions(-) create mode 100644 internal/handler/terminal_ws_windows.go diff --git a/internal/handler/agent.go b/internal/handler/agent.go index 7baf58cf..f9c1eed1 100644 --- a/internal/handler/agent.go +++ b/internal/handler/agent.go @@ -1360,6 +1360,7 @@ func (h *AgentHandler) cancelToolContinueAfter(conversationID, preferredExecID, func (h *AgentHandler) CancelAgentLoop(c *gin.Context) { var req struct { ConversationID string `json:"conversationId" binding:"required"` + ExecutionID string `json:"executionId,omitempty"` Reason string `json:"reason,omitempty"` ContinueAfter bool `json:"continueAfter,omitempty"` } @@ -1376,7 +1377,7 @@ func (h *AgentHandler) CancelAgentLoop(c *gin.Context) { } note := strings.TrimSpace(req.Reason) activeExec := strings.TrimSpace(h.tasks.ActiveMCPExecutionID(req.ConversationID)) - if ok, payload := h.cancelToolContinueAfter(req.ConversationID, "", note); ok { + if ok, payload := h.cancelToolContinueAfter(req.ConversationID, strings.TrimSpace(req.ExecutionID), note); ok { execID, _ := payload["executionId"].(string) h.logger.Info("对话页仅终止当前工具", zap.String("conversationId", req.ConversationID), diff --git a/internal/handler/terminal_ws_windows.go b/internal/handler/terminal_ws_windows.go new file mode 100644 index 00000000..2d71fa6b --- /dev/null +++ b/internal/handler/terminal_ws_windows.go @@ -0,0 +1,16 @@ +//go:build windows + +package handler + +import ( + "net/http" + + "github.com/gin-gonic/gin" +) + +// RunCommandWS 交互式 PTY 终端依赖 Unix PTY(见 terminal_ws_unix.go);Windows 暂不支持。 +func (h *TerminalHandler) RunCommandWS(c *gin.Context) { + c.JSON(http.StatusNotImplemented, gin.H{ + "error": "Interactive WebSocket terminal is not supported on Windows; use POST /terminal/run or /terminal/run/stream instead.", + }) +} diff --git a/internal/security/executor.go b/internal/security/executor.go index 2092bf8b..657d583d 100644 --- a/internal/security/executor.go +++ b/internal/security/executor.go @@ -162,9 +162,8 @@ func (e *Executor) ExecuteTool(ctx context.Context, toolName string, args map[st output, err = runCommandWithPTY(ctx, cmd2, cb) } } else { - outputBytes, err2 := cmd.CombinedOutput() - output = string(outputBytes) - err = err2 + // 非流式:内存缓冲 + ctx 取消杀进程组;行为对齐原 CombinedOutput,避免双流管道 fan-in 死锁。 + output, err = combinedOutputCancellable(ctx, cmd) if err != nil && shouldRetryWithPTY(output) { e.logger.Info("检测到工具需要 TTY,使用 PTY 重试", zap.String("tool", toolName), @@ -981,9 +980,7 @@ func (e *Executor) executeSystemCommand(ctx context.Context, args map[string]int output, err = runCommandWithPTY(ctx, cmd2, cb) } } else { - outputBytes, err2 := cmd.CombinedOutput() - output = string(outputBytes) - err = err2 + output, err = combinedOutputCancellable(ctx, cmd) if err != nil && shouldRetryWithPTY(output) { e.logger.Info("检测到系统命令需要 TTY,使用 PTY 重试") cmd2 := exec.CommandContext(ctx, shell, "-c", command) @@ -1027,6 +1024,57 @@ func (e *Executor) executeSystemCommand(ctx context.Context, args map[string]int }, nil } +// combinedOutputCancellable 行为对齐 cmd.CombinedOutput(stdout/stderr 写入内存缓冲), +// 但在 ctx 取消时 terminateCmdTree 终止整棵进程树。 +// 非流式路径不使用双流管道 fan-in,避免 stderr 撑满管道缓冲区时与 stdout 互相阻塞导致死锁。 +// 无输出空闲检测由上层 agent.tool_timeout_minutes 兜底,不改变原 CombinedOutput 语义。 +func combinedOutputCancellable(ctx context.Context, cmd *exec.Cmd) (string, error) { + if err := prepareShellCmdSession(cmd); err != nil { + return "", err + } + var stdoutBuf, stderrBuf strings.Builder + cmd.Stdout = &stdoutBuf + cmd.Stderr = &stderrBuf + + if err := cmd.Start(); err != nil { + return "", err + } + + done := make(chan error, 1) + go func() { + done <- cmd.Wait() + }() + + stopWatch := make(chan struct{}) + go func() { + select { + case <-ctx.Done(): + terminateCmdTree(cmd) + case <-stopWatch: + } + }() + defer close(stopWatch) + + var waitErr error + select { + case waitErr = <-done: + case <-ctx.Done(): + waitErr = <-done + return joinCommandOutput(stdoutBuf.String(), stderrBuf.String()), ctx.Err() + } + return joinCommandOutput(stdoutBuf.String(), stderrBuf.String()), waitErr +} + +func joinCommandOutput(stdout, stderr string) string { + if stderr == "" { + return stdout + } + if stdout == "" { + return stderr + } + return stdout + stderr +} + // streamCommandOutput 以“边读边回调”的方式读取命令 stdout/stderr。 // 使用定长块读取,避免按行读取在无换行输出时永久阻塞;ctx 取消时终止进程树。 func streamCommandOutput(ctx context.Context, cmd *exec.Cmd, cb ToolOutputCallback, noOutputSec int) (string, error) { @@ -1091,7 +1139,9 @@ func streamCommandOutput(ctx context.Context, cmd *exec.Cmd, cb ToolOutputCallba if deltaBuilder.Len() == 0 { return } - cb(deltaBuilder.String()) + if cb != nil { + cb(deltaBuilder.String()) + } deltaBuilder.Reset() lastFlush = time.Now() } @@ -1118,6 +1168,11 @@ chunksLoop: idleCh = idleWatch.Expired } select { + case <-ctx.Done(): + terminateCmdTree(cmd) + flush() + _ = cmd.Wait() + return outBuilder.String(), ctx.Err() case <-idleCh: fireInactivity() return outBuilder.String(), fmt.Errorf("shell inactivity timeout (%ds)", idleWatch.Sec) diff --git a/internal/security/executor_test.go b/internal/security/executor_test.go index fa24d6d0..712674ea 100644 --- a/internal/security/executor_test.go +++ b/internal/security/executor_test.go @@ -2,6 +2,8 @@ package security import ( "context" + "os/exec" + "runtime" "strings" "testing" "time" @@ -147,3 +149,33 @@ func indexOf(slice []string, s string) int { } return -1 } + +// TestCombinedOutputCancellable_ContextCancelKillsTree 验证 ctx 取消时能在数秒内结束(杀进程组,非挂死)。 +func TestCombinedOutputCancellable_ContextCancelKillsTree(t *testing.T) { + if runtime.GOOS == "windows" { + t.Skip("unix process group kill") + } + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + cmd := exec.CommandContext(ctx, "sh", "-c", "sleep 300") + ConfigureShellCmdForAgentExecute(cmd) + + done := make(chan error, 1) + go func() { + _, err := combinedOutputCancellable(ctx, cmd) + done <- err + }() + + time.Sleep(150 * time.Millisecond) + cancel() + + select { + case err := <-done: + if err == nil { + t.Fatal("expected context cancel error") + } + case <-time.After(5 * time.Second): + t.Fatal("combinedOutputCancellable did not return within 5s after context cancel") + } +} diff --git a/internal/security/procattr_windows.go b/internal/security/procattr_windows.go index df7e2eda..b7d5c720 100644 --- a/internal/security/procattr_windows.go +++ b/internal/security/procattr_windows.go @@ -2,16 +2,32 @@ package security -import "os/exec" +import ( + "os/exec" + "strconv" + "syscall" +) func prepareShellCmdSession(cmd *exec.Cmd) error { - _ = cmd + if cmd == nil { + return nil + } + // 独立进程组,便于 taskkill /T 终止整棵子进程树。 + if cmd.SysProcAttr == nil { + cmd.SysProcAttr = &syscall.SysProcAttr{} + } + cmd.SysProcAttr.CreationFlags = syscall.CREATE_NEW_PROCESS_GROUP return nil } +// terminateCmdTree 使用 taskkill /F /T 终止进程及其子进程(Windows 上 Process.Kill 无法保证杀掉 python 等孙进程)。 func terminateCmdTree(cmd *exec.Cmd) { if cmd == nil || cmd.Process == nil { return } - _ = cmd.Process.Kill() + pid := cmd.Process.Pid + tk := exec.Command("taskkill", "/F", "/T", "/PID", strconv.Itoa(pid)) + if err := tk.Run(); err != nil { + _ = cmd.Process.Kill() + } }