Add files via upload

This commit is contained in:
公明
2026-06-26 23:11:52 +08:00
committed by GitHub
parent e537236bf3
commit c91806c0c4
5 changed files with 131 additions and 11 deletions
+2 -1
View File
@@ -1360,6 +1360,7 @@ func (h *AgentHandler) cancelToolContinueAfter(conversationID, preferredExecID,
func (h *AgentHandler) CancelAgentLoop(c *gin.Context) {
var req struct {
ConversationID string `json:"conversationId" binding:"required"`
ExecutionID string `json:"executionId,omitempty"`
Reason string `json:"reason,omitempty"`
ContinueAfter bool `json:"continueAfter,omitempty"`
}
@@ -1376,7 +1377,7 @@ func (h *AgentHandler) CancelAgentLoop(c *gin.Context) {
}
note := strings.TrimSpace(req.Reason)
activeExec := strings.TrimSpace(h.tasks.ActiveMCPExecutionID(req.ConversationID))
if ok, payload := h.cancelToolContinueAfter(req.ConversationID, "", note); ok {
if ok, payload := h.cancelToolContinueAfter(req.ConversationID, strings.TrimSpace(req.ExecutionID), note); ok {
execID, _ := payload["executionId"].(string)
h.logger.Info("对话页仅终止当前工具",
zap.String("conversationId", req.ConversationID),
+16
View File
@@ -0,0 +1,16 @@
//go:build windows
package handler
import (
"net/http"
"github.com/gin-gonic/gin"
)
// RunCommandWS 交互式 PTY 终端依赖 Unix PTY(见 terminal_ws_unix.go);Windows 暂不支持。
func (h *TerminalHandler) RunCommandWS(c *gin.Context) {
c.JSON(http.StatusNotImplemented, gin.H{
"error": "Interactive WebSocket terminal is not supported on Windows; use POST /terminal/run or /terminal/run/stream instead.",
})
}
+62 -7
View File
@@ -162,9 +162,8 @@ func (e *Executor) ExecuteTool(ctx context.Context, toolName string, args map[st
output, err = runCommandWithPTY(ctx, cmd2, cb)
}
} else {
outputBytes, err2 := cmd.CombinedOutput()
output = string(outputBytes)
err = err2
// 非流式:内存缓冲 + ctx 取消杀进程组;行为对齐原 CombinedOutput,避免双流管道 fan-in 死锁。
output, err = combinedOutputCancellable(ctx, cmd)
if err != nil && shouldRetryWithPTY(output) {
e.logger.Info("检测到工具需要 TTY,使用 PTY 重试",
zap.String("tool", toolName),
@@ -981,9 +980,7 @@ func (e *Executor) executeSystemCommand(ctx context.Context, args map[string]int
output, err = runCommandWithPTY(ctx, cmd2, cb)
}
} else {
outputBytes, err2 := cmd.CombinedOutput()
output = string(outputBytes)
err = err2
output, err = combinedOutputCancellable(ctx, cmd)
if err != nil && shouldRetryWithPTY(output) {
e.logger.Info("检测到系统命令需要 TTY,使用 PTY 重试")
cmd2 := exec.CommandContext(ctx, shell, "-c", command)
@@ -1027,6 +1024,57 @@ func (e *Executor) executeSystemCommand(ctx context.Context, args map[string]int
}, nil
}
// combinedOutputCancellable 行为对齐 cmd.CombinedOutputstdout/stderr 写入内存缓冲),
// 但在 ctx 取消时 terminateCmdTree 终止整棵进程树。
// 非流式路径不使用双流管道 fan-in,避免 stderr 撑满管道缓冲区时与 stdout 互相阻塞导致死锁。
// 无输出空闲检测由上层 agent.tool_timeout_minutes 兜底,不改变原 CombinedOutput 语义。
func combinedOutputCancellable(ctx context.Context, cmd *exec.Cmd) (string, error) {
if err := prepareShellCmdSession(cmd); err != nil {
return "", err
}
var stdoutBuf, stderrBuf strings.Builder
cmd.Stdout = &stdoutBuf
cmd.Stderr = &stderrBuf
if err := cmd.Start(); err != nil {
return "", err
}
done := make(chan error, 1)
go func() {
done <- cmd.Wait()
}()
stopWatch := make(chan struct{})
go func() {
select {
case <-ctx.Done():
terminateCmdTree(cmd)
case <-stopWatch:
}
}()
defer close(stopWatch)
var waitErr error
select {
case waitErr = <-done:
case <-ctx.Done():
waitErr = <-done
return joinCommandOutput(stdoutBuf.String(), stderrBuf.String()), ctx.Err()
}
return joinCommandOutput(stdoutBuf.String(), stderrBuf.String()), waitErr
}
func joinCommandOutput(stdout, stderr string) string {
if stderr == "" {
return stdout
}
if stdout == "" {
return stderr
}
return stdout + stderr
}
// streamCommandOutput 以“边读边回调”的方式读取命令 stdout/stderr。
// 使用定长块读取,避免按行读取在无换行输出时永久阻塞;ctx 取消时终止进程树。
func streamCommandOutput(ctx context.Context, cmd *exec.Cmd, cb ToolOutputCallback, noOutputSec int) (string, error) {
@@ -1091,7 +1139,9 @@ func streamCommandOutput(ctx context.Context, cmd *exec.Cmd, cb ToolOutputCallba
if deltaBuilder.Len() == 0 {
return
}
cb(deltaBuilder.String())
if cb != nil {
cb(deltaBuilder.String())
}
deltaBuilder.Reset()
lastFlush = time.Now()
}
@@ -1118,6 +1168,11 @@ chunksLoop:
idleCh = idleWatch.Expired
}
select {
case <-ctx.Done():
terminateCmdTree(cmd)
flush()
_ = cmd.Wait()
return outBuilder.String(), ctx.Err()
case <-idleCh:
fireInactivity()
return outBuilder.String(), fmt.Errorf("shell inactivity timeout (%ds)", idleWatch.Sec)
+32
View File
@@ -2,6 +2,8 @@ package security
import (
"context"
"os/exec"
"runtime"
"strings"
"testing"
"time"
@@ -147,3 +149,33 @@ func indexOf(slice []string, s string) int {
}
return -1
}
// TestCombinedOutputCancellable_ContextCancelKillsTree 验证 ctx 取消时能在数秒内结束(杀进程组,非挂死)。
func TestCombinedOutputCancellable_ContextCancelKillsTree(t *testing.T) {
if runtime.GOOS == "windows" {
t.Skip("unix process group kill")
}
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
cmd := exec.CommandContext(ctx, "sh", "-c", "sleep 300")
ConfigureShellCmdForAgentExecute(cmd)
done := make(chan error, 1)
go func() {
_, err := combinedOutputCancellable(ctx, cmd)
done <- err
}()
time.Sleep(150 * time.Millisecond)
cancel()
select {
case err := <-done:
if err == nil {
t.Fatal("expected context cancel error")
}
case <-time.After(5 * time.Second):
t.Fatal("combinedOutputCancellable did not return within 5s after context cancel")
}
}
+19 -3
View File
@@ -2,16 +2,32 @@
package security
import "os/exec"
import (
"os/exec"
"strconv"
"syscall"
)
func prepareShellCmdSession(cmd *exec.Cmd) error {
_ = cmd
if cmd == nil {
return nil
}
// 独立进程组,便于 taskkill /T 终止整棵子进程树。
if cmd.SysProcAttr == nil {
cmd.SysProcAttr = &syscall.SysProcAttr{}
}
cmd.SysProcAttr.CreationFlags = syscall.CREATE_NEW_PROCESS_GROUP
return nil
}
// terminateCmdTree 使用 taskkill /F /T 终止进程及其子进程(Windows 上 Process.Kill 无法保证杀掉 python 等孙进程)。
func terminateCmdTree(cmd *exec.Cmd) {
if cmd == nil || cmd.Process == nil {
return
}
_ = cmd.Process.Kill()
pid := cmd.Process.Pid
tk := exec.Command("taskkill", "/F", "/T", "/PID", strconv.Itoa(pid))
if err := tk.Run(); err != nil {
_ = cmd.Process.Kill()
}
}