mirror of
https://github.com/Ed1s0nZ/CyberStrikeAI.git
synced 2026-06-24 14:59:59 +02:00
Add files via upload
This commit is contained in:
@@ -110,6 +110,7 @@ func New(cfg *config.Config, log *logger.Logger, configPath string) (*App, error
|
||||
|
||||
// 创建安全工具执行器
|
||||
executor := security.NewExecutor(&cfg.Security, mcpServer, log.Logger)
|
||||
executor.SetShellNoOutputTimeoutSeconds(cfg.Agent.ShellNoOutputTimeoutSeconds)
|
||||
|
||||
// 注册工具
|
||||
executor.RegisterTools(mcpServer)
|
||||
@@ -333,6 +334,8 @@ func New(cfg *config.Config, log *logger.Logger, configPath string) (*App, error
|
||||
monitorHandler.SetAudit(auditSvc)
|
||||
monitorHandler.SetMonitorRetention(monitorRetention)
|
||||
monitorHandler.SetExternalMCPManager(externalMCPMgr) // 设置外部MCP管理器,以便获取外部MCP执行记录
|
||||
monitorHandler.SetTaskManager(agentHandler.TaskManager())
|
||||
monitorHandler.SetAgentHandler(agentHandler)
|
||||
notificationHandler := handler.NewNotificationHandler(db, agentHandler, log.Logger)
|
||||
groupHandler := handler.NewGroupHandler(db, log.Logger)
|
||||
authHandler := handler.NewAuthHandler(authManager, cfg, configPath, log.Logger)
|
||||
|
||||
@@ -196,6 +196,16 @@ func runEinoADKAgentLoop(ctx context.Context, args *einoADKRunLoopArgs, baseMsgs
|
||||
pendingByID[tc.ToolCallID] = tc
|
||||
pendingQueueByAgent[tc.EinoAgent] = append(pendingQueueByAgent[tc.EinoAgent], tc.ToolCallID)
|
||||
}
|
||||
markPendingWithMonitor := func(tc toolCallPendingInfo) {
|
||||
markPending(tc)
|
||||
beginEinoADKFilesystemToolMonitor(
|
||||
args.FilesystemMonitorAgent,
|
||||
args.FilesystemMonitorRecord,
|
||||
args.MCPExecutionBinder,
|
||||
tc.ToolCallID,
|
||||
tc.ToolName,
|
||||
)
|
||||
}
|
||||
popNextPendingForAgent := func(agentName string) (toolCallPendingInfo, bool) {
|
||||
pendingMu.Lock()
|
||||
defer pendingMu.Unlock()
|
||||
@@ -331,7 +341,7 @@ func runEinoADKAgentLoop(ctx context.Context, args *einoADKRunLoopArgs, baseMsgs
|
||||
toolCallID = tid
|
||||
}
|
||||
recordPendingExecuteStdoutDup(toolName, content, isErr)
|
||||
recordEinoADKFilesystemToolMonitor(args.FilesystemMonitorAgent, args.FilesystemMonitorRecord, toolName, toolCallID, runAccumulatedMsgs, content, isErr)
|
||||
recordEinoADKFilesystemToolMonitor(args.FilesystemMonitorAgent, args.FilesystemMonitorRecord, args.MCPExecutionBinder, toolName, toolCallID, runAccumulatedMsgs, content, isErr)
|
||||
if args.FilesystemMonitorAgent != nil && args.MCPExecutionBinder != nil {
|
||||
if execID := args.MCPExecutionBinder.ExecutionID(toolCallID); execID != "" {
|
||||
args.FilesystemMonitorAgent.UpdateMCPExecutionDisplayResult(execID, content)
|
||||
@@ -974,7 +984,7 @@ func runEinoADKAgentLoop(ctx context.Context, args *einoADKRunLoopArgs, baseMsgs
|
||||
if merged := mergeStreamingToolCallFragments(toolStreamFragments); len(merged) > 0 {
|
||||
lastToolChunk = mergeMessageToolCalls(&schema.Message{ToolCalls: merged})
|
||||
}
|
||||
tryEmitToolCallsOnce(lastToolChunk, ev.AgentName, orchestratorName, conversationID, orchMode, progress, toolEmitSeen, subAgentToolStep, mainAgentToolStep, markPending)
|
||||
tryEmitToolCallsOnce(lastToolChunk, ev.AgentName, orchestratorName, conversationID, orchMode, progress, toolEmitSeen, subAgentToolStep, mainAgentToolStep, markPendingWithMonitor)
|
||||
// 流式路径此前只把 tool_calls 推给进度 UI,未写入 runAccumulatedMsgs;落库后 loadHistory→RepairOrphan 会删掉全部 tool 结果,表现为「续跑/下轮失忆」。
|
||||
if lastToolChunk != nil && len(lastToolChunk.ToolCalls) > 0 {
|
||||
runAccumulatedMsgs = append(runAccumulatedMsgs, schema.AssistantMessage("", lastToolChunk.ToolCalls))
|
||||
@@ -1009,7 +1019,7 @@ func runEinoADKAgentLoop(ctx context.Context, args *einoADKRunLoopArgs, baseMsgs
|
||||
continue
|
||||
}
|
||||
runAccumulatedMsgs = append(runAccumulatedMsgs, msg)
|
||||
tryEmitToolCallsOnce(mergeMessageToolCalls(msg), ev.AgentName, orchestratorName, conversationID, orchMode, progress, toolEmitSeen, subAgentToolStep, mainAgentToolStep, markPending)
|
||||
tryEmitToolCallsOnce(mergeMessageToolCalls(msg), ev.AgentName, orchestratorName, conversationID, orchMode, progress, toolEmitSeen, subAgentToolStep, mainAgentToolStep, markPendingWithMonitor)
|
||||
|
||||
if mv.Role == schema.Assistant {
|
||||
if progress != nil && strings.TrimSpace(msg.ReasoningContent) != "" {
|
||||
|
||||
@@ -7,11 +7,25 @@ import (
|
||||
"cyberstrike-ai/internal/einomcp"
|
||||
)
|
||||
|
||||
// newEinoExecuteMonitorCallback 在 Eino filesystem execute 结束时写入 MCP 监控库并 recorder(executionId),
|
||||
// 与 CallTool 路径一致,供助手消息展示「渗透测试详情」芯片。
|
||||
func newEinoExecuteMonitorCallback(ag *agent.Agent, recorder einomcp.ExecutionRecorder) func(toolCallID, command, stdout string, success bool, invokeErr error) {
|
||||
return func(toolCallID, command, stdout string, success bool, invokeErr error) {
|
||||
if ag == nil || recorder == nil {
|
||||
// newEinoExecuteMonitorCallbacks 在 Eino filesystem execute 开始/结束时写入 MCP 监控库并 recorder(executionId),
|
||||
// 与 CallTool 路径一致,使监控页能展示「执行中」状态。
|
||||
func newEinoExecuteMonitorCallbacks(ag *agent.Agent, recorder einomcp.ExecutionRecorder) (
|
||||
begin func(toolCallID, command string) string,
|
||||
finish func(executionID, toolCallID, command, stdout string, success bool, invokeErr error),
|
||||
) {
|
||||
begin = func(toolCallID, command string) string {
|
||||
if ag == nil {
|
||||
return ""
|
||||
}
|
||||
args := map[string]interface{}{"command": command}
|
||||
id := ag.BeginLocalToolExecution("execute", args)
|
||||
if id != "" && recorder != nil {
|
||||
recorder(id, toolCallID)
|
||||
}
|
||||
return id
|
||||
}
|
||||
finish = func(executionID, toolCallID, command, stdout string, success bool, invokeErr error) {
|
||||
if ag == nil {
|
||||
return
|
||||
}
|
||||
var err error
|
||||
@@ -23,9 +37,10 @@ func newEinoExecuteMonitorCallback(ag *agent.Agent, recorder einomcp.ExecutionRe
|
||||
}
|
||||
}
|
||||
args := map[string]interface{}{"command": command}
|
||||
id := ag.RecordLocalToolExecution("execute", args, stdout, err)
|
||||
if id != "" {
|
||||
id := ag.FinishLocalToolExecution(executionID, "execute", args, stdout, err)
|
||||
if id != "" && recorder != nil && executionID == "" {
|
||||
recorder(id, toolCallID)
|
||||
}
|
||||
}
|
||||
return begin, finish
|
||||
}
|
||||
|
||||
@@ -63,8 +63,11 @@ type einoStreamingShellWrap struct {
|
||||
outputChunk func(toolName, toolCallID, chunk string)
|
||||
// toolTimeoutMinutes 与 agent.tool_timeout_minutes 对齐;>0 时对单次 execute 套用 context 超时(与 MCP 工具经 executeToolViaMCP 行为一致)。0 表示仅依赖上层 ctx(如整任务 10h 上限)。
|
||||
toolTimeoutMinutes int
|
||||
// recordMonitor 在 execute 流结束后写入 tool_executions 并 recorder(executionId),使「渗透测试详情」与常规 MCP 一致。
|
||||
recordMonitor func(toolCallID, command, stdout string, success bool, invokeErr error)
|
||||
// shellNoOutputTimeoutSec:无任何输出时的空闲秒数;0=关闭。
|
||||
shellNoOutputTimeoutSec int
|
||||
// beginMonitor 在 execute 开始时写入 running 状态;finishMonitor 在流结束后更新为 completed/failed。
|
||||
beginMonitor func(toolCallID, command string) string
|
||||
finishMonitor func(executionID, toolCallID, command, stdout string, success bool, invokeErr error)
|
||||
}
|
||||
|
||||
func (w *einoStreamingShellWrap) ExecuteStreaming(ctx context.Context, input *filesystem.ExecuteRequest) (*schema.StreamReader[*filesystem.ExecuteResponse], error) {
|
||||
@@ -76,15 +79,26 @@ func (w *einoStreamingShellWrap) ExecuteStreaming(ctx context.Context, input *fi
|
||||
}
|
||||
req := *input
|
||||
userCmd := strings.TrimSpace(req.Command)
|
||||
tid := strings.TrimSpace(compose.GetToolCallID(ctx))
|
||||
agentTag := strings.TrimSpace(w.einoAgentName)
|
||||
if security.IsBackgroundShellCommand(req.Command) && !req.RunInBackendGround {
|
||||
req.RunInBackendGround = true
|
||||
}
|
||||
req.Command = prependPythonUnbufferedEnv(req.Command)
|
||||
tid := strings.TrimSpace(compose.GetToolCallID(ctx))
|
||||
agentTag := strings.TrimSpace(w.einoAgentName)
|
||||
req.Command = security.PrepareNonInteractiveShellCommand(prependPythonUnbufferedEnv(req.Command))
|
||||
convID := mcp.MCPConversationIDFromContext(ctx)
|
||||
execReg := mcp.EinoExecuteRunRegistryFromContext(ctx)
|
||||
|
||||
var monitorExecID string
|
||||
if w.beginMonitor != nil {
|
||||
monitorExecID = w.beginMonitor(tid, userCmd)
|
||||
}
|
||||
if monitorExecID != "" && convID != "" {
|
||||
if toolReg := mcp.ToolRunRegistryFromContext(ctx); toolReg != nil {
|
||||
toolReg.RegisterRunningTool(convID, monitorExecID)
|
||||
}
|
||||
}
|
||||
toolRunReg := mcp.ToolRunRegistryFromContext(ctx)
|
||||
|
||||
execCtx, execCancel := context.WithCancel(ctx)
|
||||
var timeoutCancel context.CancelFunc
|
||||
if w.toolTimeoutMinutes > 0 {
|
||||
@@ -104,23 +118,23 @@ func (w *einoStreamingShellWrap) ExecuteStreaming(ctx context.Context, input *fi
|
||||
}
|
||||
if einoExecuteRecvErrIsToolTimeout(err, execCtx) {
|
||||
hint := "\n\n" + einoExecuteTimeoutUserHint() + "\n"
|
||||
if w.recordMonitor != nil {
|
||||
w.recordMonitor(tid, userCmd, hint, false, context.DeadlineExceeded)
|
||||
if w.finishMonitor != nil {
|
||||
w.finishMonitor(monitorExecID, tid, userCmd, hint, false, context.DeadlineExceeded)
|
||||
}
|
||||
if w.invokeNotify != nil && tid != "" {
|
||||
w.invokeNotify.Fire(tid, "execute", agentTag, false, hint, context.DeadlineExceeded)
|
||||
}
|
||||
return schema.StreamReaderFromArray([]*filesystem.ExecuteResponse{{Output: hint}}), nil
|
||||
}
|
||||
if w.recordMonitor != nil {
|
||||
w.recordMonitor(tid, userCmd, "", false, err)
|
||||
if w.finishMonitor != nil {
|
||||
w.finishMonitor(monitorExecID, tid, userCmd, "", false, err)
|
||||
}
|
||||
if w.invokeNotify != nil && tid != "" {
|
||||
w.invokeNotify.Fire(tid, "execute", agentTag, false, "", err)
|
||||
}
|
||||
return nil, err
|
||||
}
|
||||
if sr == nil || w.invokeNotify == nil {
|
||||
if sr == nil {
|
||||
if timeoutCancel != nil {
|
||||
timeoutCancel()
|
||||
}
|
||||
@@ -132,7 +146,7 @@ func (w *einoStreamingShellWrap) ExecuteStreaming(ctx context.Context, input *fi
|
||||
|
||||
outR, outW := schema.Pipe[*filesystem.ExecuteResponse](32)
|
||||
|
||||
go func(inner *schema.StreamReader[*filesystem.ExecuteResponse], command string, cancel context.CancelFunc, timeoutCleanup context.CancelFunc, tctx context.Context, conversationID string, reg mcp.EinoExecuteRunRegistry) {
|
||||
go func(inner *schema.StreamReader[*filesystem.ExecuteResponse], command string, cancel context.CancelFunc, timeoutCleanup context.CancelFunc, tctx context.Context, conversationID string, reg mcp.EinoExecuteRunRegistry, toolReg mcp.ToolRunRegistry, execID string, toolCallID string, noOutputSec int) {
|
||||
var innerCloseOnce sync.Once
|
||||
closeInner := func() {
|
||||
innerCloseOnce.Do(func() { inner.Close() })
|
||||
@@ -147,6 +161,9 @@ func (w *einoStreamingShellWrap) ExecuteStreaming(ctx context.Context, input *fi
|
||||
if reg != nil && conversationID != "" {
|
||||
defer reg.UnregisterActiveEinoExecute(conversationID)
|
||||
}
|
||||
if toolReg != nil && conversationID != "" && execID != "" {
|
||||
defer toolReg.UnregisterRunningTool(conversationID, execID)
|
||||
}
|
||||
|
||||
// ctx 取消时关闭内层流,避免 amass 等长时间无换行输出时 Recv 永久阻塞。
|
||||
stopWatch := make(chan struct{})
|
||||
@@ -165,43 +182,92 @@ func (w *einoStreamingShellWrap) ExecuteStreaming(ctx context.Context, input *fi
|
||||
exitCode := 0
|
||||
hasExitCode := false
|
||||
|
||||
idleWatch := security.NewShellInactivityWatch(noOutputSec)
|
||||
if idleWatch != nil {
|
||||
defer idleWatch.Stop()
|
||||
}
|
||||
|
||||
type execRecvMsg struct {
|
||||
resp *filesystem.ExecuteResponse
|
||||
err error
|
||||
}
|
||||
recvCh := make(chan execRecvMsg, 1)
|
||||
go func() {
|
||||
for {
|
||||
resp, rerr := inner.Recv()
|
||||
recvCh <- execRecvMsg{resp: resp, err: rerr}
|
||||
if rerr != nil {
|
||||
return
|
||||
}
|
||||
}
|
||||
}()
|
||||
|
||||
fireInactivityTimeout := func() {
|
||||
success = false
|
||||
invokeErr = fmt.Errorf("shell inactivity timeout (%ds)", idleWatch.Sec)
|
||||
msg := security.ShellNoOutputTimeoutMessage(idleWatch.Sec)
|
||||
_ = outW.Send(&filesystem.ExecuteResponse{Output: msg}, nil)
|
||||
sb.WriteString(msg)
|
||||
if w.outputChunk != nil && toolCallID != "" {
|
||||
w.outputChunk("execute", toolCallID, msg)
|
||||
}
|
||||
if cancel != nil {
|
||||
cancel()
|
||||
}
|
||||
closeInner()
|
||||
}
|
||||
|
||||
recvLoop:
|
||||
for {
|
||||
resp, rerr := inner.Recv()
|
||||
if errors.Is(rerr, io.EOF) {
|
||||
break
|
||||
var idleCh <-chan struct{}
|
||||
if idleWatch != nil {
|
||||
idleCh = idleWatch.Expired
|
||||
}
|
||||
if rerr != nil {
|
||||
success = false
|
||||
invokeErr = rerr
|
||||
// 单次 execute 超时须与 MCP 工具一致:写入工具结果尾标、继续迭代,不得向 ADK 流注入硬错误。
|
||||
if einoExecuteRecvErrIsToolTimeout(rerr, tctx) {
|
||||
invokeErr = context.DeadlineExceeded
|
||||
break
|
||||
select {
|
||||
case <-idleCh:
|
||||
fireInactivityTimeout()
|
||||
break recvLoop
|
||||
case msg := <-recvCh:
|
||||
rerr := msg.err
|
||||
resp := msg.resp
|
||||
if errors.Is(rerr, io.EOF) {
|
||||
break recvLoop
|
||||
}
|
||||
if errors.Is(rerr, context.Canceled) || (tctx != nil && errors.Is(tctx.Err(), context.Canceled)) {
|
||||
invokeErr = context.Canceled
|
||||
break
|
||||
}
|
||||
_ = outW.Send(nil, rerr)
|
||||
break
|
||||
}
|
||||
if resp != nil {
|
||||
if resp.ExitCode != nil {
|
||||
hasExitCode = true
|
||||
exitCode = *resp.ExitCode
|
||||
}
|
||||
var appended string
|
||||
if resp.Output != "" {
|
||||
sb.WriteString(resp.Output)
|
||||
appended = resp.Output
|
||||
}
|
||||
if w.outputChunk != nil && strings.TrimSpace(appended) != "" {
|
||||
w.outputChunk("execute", tid, appended)
|
||||
}
|
||||
if outW.Send(resp, nil) {
|
||||
if rerr != nil {
|
||||
success = false
|
||||
invokeErr = fmt.Errorf("execute stream closed by consumer")
|
||||
break
|
||||
invokeErr = rerr
|
||||
if einoExecuteRecvErrIsToolTimeout(rerr, tctx) {
|
||||
invokeErr = context.DeadlineExceeded
|
||||
break recvLoop
|
||||
}
|
||||
if errors.Is(rerr, context.Canceled) || (tctx != nil && errors.Is(tctx.Err(), context.Canceled)) {
|
||||
invokeErr = context.Canceled
|
||||
break recvLoop
|
||||
}
|
||||
_ = outW.Send(nil, rerr)
|
||||
break recvLoop
|
||||
}
|
||||
if resp != nil {
|
||||
if resp.ExitCode != nil {
|
||||
hasExitCode = true
|
||||
exitCode = *resp.ExitCode
|
||||
}
|
||||
var appended string
|
||||
if resp.Output != "" {
|
||||
if idleWatch != nil {
|
||||
idleWatch.Bump()
|
||||
}
|
||||
sb.WriteString(resp.Output)
|
||||
appended = resp.Output
|
||||
}
|
||||
if w.outputChunk != nil && strings.TrimSpace(appended) != "" {
|
||||
w.outputChunk("execute", toolCallID, appended)
|
||||
}
|
||||
if outW.Send(resp, nil) {
|
||||
success = false
|
||||
invokeErr = fmt.Errorf("execute stream closed by consumer")
|
||||
break recvLoop
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -248,12 +314,14 @@ func (w *einoStreamingShellWrap) ExecuteStreaming(ctx context.Context, input *fi
|
||||
_ = outW.Send(&filesystem.ExecuteResponse{Output: text + "\n"}, nil)
|
||||
}
|
||||
}
|
||||
if w.recordMonitor != nil {
|
||||
w.recordMonitor(tid, command, sb.String(), success, invokeErr)
|
||||
if w.finishMonitor != nil {
|
||||
w.finishMonitor(execID, toolCallID, command, sb.String(), success, invokeErr)
|
||||
}
|
||||
if w.invokeNotify != nil {
|
||||
w.invokeNotify.Fire(toolCallID, "execute", agentTag, success, sb.String(), invokeErr)
|
||||
}
|
||||
w.invokeNotify.Fire(tid, "execute", agentTag, success, sb.String(), invokeErr)
|
||||
outW.Close()
|
||||
}(sr, userCmd, execCancel, timeoutCancel, execCtx, convID, execReg)
|
||||
}(sr, userCmd, execCancel, timeoutCancel, execCtx, convID, execReg, toolRunReg, monitorExecID, tid, w.shellNoOutputTimeoutSec)
|
||||
|
||||
return outR, nil
|
||||
}
|
||||
|
||||
@@ -19,9 +19,15 @@ type mockStreamingShell struct {
|
||||
immediateErr error
|
||||
recvErr error
|
||||
output string
|
||||
called bool
|
||||
lastCommand string
|
||||
}
|
||||
|
||||
func (m *mockStreamingShell) ExecuteStreaming(ctx context.Context, input *filesystem.ExecuteRequest) (*schema.StreamReader[*filesystem.ExecuteResponse], error) {
|
||||
m.called = true
|
||||
if input != nil {
|
||||
m.lastCommand = input.Command
|
||||
}
|
||||
if m.immediateErr != nil {
|
||||
return nil, m.immediateErr
|
||||
}
|
||||
@@ -38,6 +44,135 @@ func (m *mockStreamingShell) ExecuteStreaming(ctx context.Context, input *filesy
|
||||
return outR, nil
|
||||
}
|
||||
|
||||
func TestEinoStreamingShellWrap_PreparesNonInteractiveCommand(t *testing.T) {
|
||||
inner := &mockStreamingShell{output: "ok\n"}
|
||||
wrap := &einoStreamingShellWrap{inner: inner}
|
||||
sr, err := wrap.ExecuteStreaming(context.Background(), &filesystem.ExecuteRequest{Command: "echo ok"})
|
||||
if err != nil {
|
||||
t.Fatalf("ExecuteStreaming: %v", err)
|
||||
}
|
||||
defer sr.Close()
|
||||
for {
|
||||
_, rerr := sr.Recv()
|
||||
if errors.Is(rerr, io.EOF) {
|
||||
break
|
||||
}
|
||||
if rerr != nil {
|
||||
t.Fatalf("recv: %v", rerr)
|
||||
}
|
||||
}
|
||||
if !strings.Contains(inner.lastCommand, "exec </dev/null") {
|
||||
t.Fatalf("missing stdin redirect in inner command: %q", inner.lastCommand)
|
||||
}
|
||||
if !strings.Contains(inner.lastCommand, "GIT_PAGER=cat") {
|
||||
t.Fatalf("missing pager export in inner command: %q", inner.lastCommand)
|
||||
}
|
||||
if !strings.Contains(inner.lastCommand, "PYTHONUNBUFFERED=1") {
|
||||
t.Fatalf("missing python unbuffer in inner command: %q", inner.lastCommand)
|
||||
}
|
||||
}
|
||||
|
||||
func TestEinoStreamingShellWrap_NoOutputTimeout(t *testing.T) {
|
||||
inner := &mockStreamingShellHanging{}
|
||||
notify := einomcp.NewToolInvokeNotifyHolder()
|
||||
var fired string
|
||||
notify.Set(func(toolCallID, toolName, einoAgent string, success bool, content string, invokeErr error) {
|
||||
fired = content
|
||||
})
|
||||
wrap := &einoStreamingShellWrap{
|
||||
inner: inner,
|
||||
invokeNotify: notify,
|
||||
shellNoOutputTimeoutSec: 1,
|
||||
}
|
||||
sr, err := wrap.ExecuteStreaming(context.Background(), &filesystem.ExecuteRequest{Command: "sudo whoami"})
|
||||
if err != nil {
|
||||
t.Fatalf("ExecuteStreaming: %v", err)
|
||||
}
|
||||
defer sr.Close()
|
||||
var got strings.Builder
|
||||
for {
|
||||
resp, rerr := sr.Recv()
|
||||
if errors.Is(rerr, io.EOF) {
|
||||
break
|
||||
}
|
||||
if rerr != nil {
|
||||
t.Fatalf("recv: %v", rerr)
|
||||
}
|
||||
if resp != nil {
|
||||
got.WriteString(resp.Output)
|
||||
}
|
||||
}
|
||||
if !inner.called {
|
||||
t.Fatal("inner shell should run (no command blacklist)")
|
||||
}
|
||||
out := got.String()
|
||||
if !strings.Contains(out, "没有新的输出") && !strings.Contains(out, "no new output") {
|
||||
t.Fatalf("expected inactivity timeout message, got: %q notify=%q", out, fired)
|
||||
}
|
||||
}
|
||||
|
||||
type mockStreamingShellPartialThenHang struct {
|
||||
called bool
|
||||
}
|
||||
|
||||
func (m *mockStreamingShellPartialThenHang) ExecuteStreaming(ctx context.Context, input *filesystem.ExecuteRequest) (*schema.StreamReader[*filesystem.ExecuteResponse], error) {
|
||||
m.called = true
|
||||
outR, outW := schema.Pipe[*filesystem.ExecuteResponse](4)
|
||||
go func() {
|
||||
_ = outW.Send(&filesystem.ExecuteResponse{Output: "[sudo] password:\n"}, nil)
|
||||
<-ctx.Done()
|
||||
outW.Close()
|
||||
}()
|
||||
return outR, nil
|
||||
}
|
||||
|
||||
func TestEinoStreamingShellWrap_InactivityAfterPartialOutput(t *testing.T) {
|
||||
inner := &mockStreamingShellPartialThenHang{}
|
||||
wrap := &einoStreamingShellWrap{
|
||||
inner: inner,
|
||||
shellNoOutputTimeoutSec: 1,
|
||||
}
|
||||
start := time.Now()
|
||||
sr, err := wrap.ExecuteStreaming(context.Background(), &filesystem.ExecuteRequest{Command: "sudo whoami"})
|
||||
if err != nil {
|
||||
t.Fatalf("ExecuteStreaming: %v", err)
|
||||
}
|
||||
defer sr.Close()
|
||||
var got strings.Builder
|
||||
for {
|
||||
resp, rerr := sr.Recv()
|
||||
if errors.Is(rerr, io.EOF) {
|
||||
break
|
||||
}
|
||||
if rerr != nil {
|
||||
t.Fatalf("recv: %v", rerr)
|
||||
}
|
||||
if resp != nil {
|
||||
got.WriteString(resp.Output)
|
||||
}
|
||||
}
|
||||
if time.Since(start) > 5*time.Second {
|
||||
t.Fatalf("expected inactivity timeout ~1s, took %v", time.Since(start))
|
||||
}
|
||||
if !strings.Contains(got.String(), "没有新的输出") && !strings.Contains(got.String(), "no new output") {
|
||||
t.Fatalf("expected inactivity message, got: %q", got.String())
|
||||
}
|
||||
}
|
||||
|
||||
type mockStreamingShellHanging struct {
|
||||
called bool
|
||||
}
|
||||
|
||||
func (m *mockStreamingShellHanging) ExecuteStreaming(ctx context.Context, input *filesystem.ExecuteRequest) (*schema.StreamReader[*filesystem.ExecuteResponse], error) {
|
||||
m.called = true
|
||||
outR, outW := schema.Pipe[*filesystem.ExecuteResponse](4)
|
||||
go func() {
|
||||
<-ctx.Done()
|
||||
outW.Close()
|
||||
}()
|
||||
return outR, nil
|
||||
}
|
||||
|
||||
func TestEinoExecuteRecvErrIsToolTimeout(t *testing.T) {
|
||||
tctx, cancel := context.WithTimeout(context.Background(), time.Millisecond)
|
||||
defer cancel()
|
||||
|
||||
@@ -63,10 +63,43 @@ func toolCallArgsFromAccumulated(msgs []adk.Message, toolCallID, expectToolName
|
||||
return map[string]interface{}{}
|
||||
}
|
||||
|
||||
// beginEinoADKFilesystemToolMonitor 在 Eino ADK filesystem 工具开始调用时写入 running 状态。
|
||||
func beginEinoADKFilesystemToolMonitor(
|
||||
ag *agent.Agent,
|
||||
rec einomcp.ExecutionRecorder,
|
||||
binder *MCPExecutionBinder,
|
||||
toolCallID, toolName string,
|
||||
) {
|
||||
if ag == nil || rec == nil {
|
||||
return
|
||||
}
|
||||
name := strings.TrimSpace(toolName)
|
||||
if name == "" || strings.EqualFold(name, "execute") {
|
||||
return
|
||||
}
|
||||
if !isBuiltinEinoADKFilesystemToolName(name) {
|
||||
return
|
||||
}
|
||||
tid := strings.TrimSpace(toolCallID)
|
||||
if tid == "" {
|
||||
return
|
||||
}
|
||||
storedName := "eino_fs::" + strings.ToLower(name)
|
||||
id := ag.BeginLocalToolExecution(storedName, map[string]interface{}{})
|
||||
if id == "" {
|
||||
return
|
||||
}
|
||||
rec(id, tid)
|
||||
if binder != nil {
|
||||
binder.Bind(tid, id)
|
||||
}
|
||||
}
|
||||
|
||||
// recordEinoADKFilesystemToolMonitor 将 Eino ADK filesystem 中间件工具结果写入 MCP 监控(与 execute / MCP 桥芯片一致)。
|
||||
func recordEinoADKFilesystemToolMonitor(
|
||||
ag *agent.Agent,
|
||||
rec einomcp.ExecutionRecorder,
|
||||
binder *MCPExecutionBinder,
|
||||
toolName string,
|
||||
toolCallID string,
|
||||
msgs []adk.Message,
|
||||
@@ -94,8 +127,12 @@ func recordEinoADKFilesystemToolMonitor(
|
||||
invErr = errors.New(t)
|
||||
}
|
||||
}
|
||||
id := ag.RecordLocalToolExecution(storedName, args, resultText, invErr)
|
||||
if id != "" {
|
||||
execID := ""
|
||||
if binder != nil {
|
||||
execID = binder.ExecutionID(toolCallID)
|
||||
}
|
||||
id := ag.FinishLocalToolExecution(execID, storedName, args, resultText, invErr)
|
||||
if id != "" && execID == "" {
|
||||
rec(id, toolCallID)
|
||||
}
|
||||
}
|
||||
|
||||
@@ -81,7 +81,7 @@ func RunEinoSingleChatModelAgent(
|
||||
}
|
||||
|
||||
toolInvokeNotify := einomcp.NewToolInvokeNotifyHolder()
|
||||
einoExecMonitor := newEinoExecuteMonitorCallback(ag, recorder)
|
||||
einoExecBegin, einoExecFinish := newEinoExecuteMonitorCallbacks(ag, recorder)
|
||||
mainDefs := ag.ToolsForRole(roleTools)
|
||||
mainTools, err := einomcp.ToolsFromDefinitions(ag, holder, mainDefs, recorder, nil, toolInvokeNotify, einoSingleAgentName)
|
||||
if err != nil {
|
||||
@@ -136,7 +136,7 @@ func RunEinoSingleChatModelAgent(
|
||||
}
|
||||
if einoSkillMW != nil {
|
||||
if einoFSTools && einoLoc != nil {
|
||||
fsMw, fsErr := subAgentFilesystemMiddleware(ctx, einoLoc, toolInvokeNotify, einoSingleAgentName, einoExecMonitor, agentToolTimeoutMinutes(appCfg), nil)
|
||||
fsMw, fsErr := subAgentFilesystemMiddleware(ctx, einoLoc, toolInvokeNotify, einoSingleAgentName, einoExecBegin, einoExecFinish, agentToolTimeoutMinutes(appCfg), agentShellNoOutputTimeoutSeconds(appCfg), nil)
|
||||
if fsErr != nil {
|
||||
return nil, fmt.Errorf("eino single filesystem 中间件: %w", fsErr)
|
||||
}
|
||||
|
||||
@@ -81,8 +81,10 @@ func subAgentFilesystemMiddleware(
|
||||
loc *localbk.Local,
|
||||
invokeNotify *einomcp.ToolInvokeNotifyHolder,
|
||||
einoAgentName string,
|
||||
recordMonitor func(toolCallID, command, stdout string, success bool, invokeErr error),
|
||||
beginMonitor func(toolCallID, command string) string,
|
||||
finishMonitor func(executionID, toolCallID, command, stdout string, success bool, invokeErr error),
|
||||
toolTimeoutMinutes int,
|
||||
shellNoOutputTimeoutSec int,
|
||||
outputChunk func(toolName, toolCallID, chunk string),
|
||||
) (adk.ChatModelAgentMiddleware, error) {
|
||||
if loc == nil {
|
||||
@@ -91,12 +93,14 @@ func subAgentFilesystemMiddleware(
|
||||
return filesystem.New(ctx, &filesystem.MiddlewareConfig{
|
||||
Backend: loc,
|
||||
StreamingShell: &einoStreamingShellWrap{
|
||||
inner: loc,
|
||||
invokeNotify: invokeNotify,
|
||||
einoAgentName: strings.TrimSpace(einoAgentName),
|
||||
outputChunk: outputChunk,
|
||||
recordMonitor: recordMonitor,
|
||||
toolTimeoutMinutes: toolTimeoutMinutes,
|
||||
inner: loc,
|
||||
invokeNotify: invokeNotify,
|
||||
einoAgentName: strings.TrimSpace(einoAgentName),
|
||||
outputChunk: outputChunk,
|
||||
beginMonitor: beginMonitor,
|
||||
finishMonitor: finishMonitor,
|
||||
toolTimeoutMinutes: toolTimeoutMinutes,
|
||||
shellNoOutputTimeoutSec: shellNoOutputTimeoutSec,
|
||||
},
|
||||
})
|
||||
}
|
||||
@@ -108,3 +112,18 @@ func agentToolTimeoutMinutes(cfg *config.Config) int {
|
||||
}
|
||||
return cfg.Agent.ToolTimeoutMinutes
|
||||
}
|
||||
|
||||
// agentShellNoOutputTimeoutSeconds:0=默认 300s(5 分钟);-1=关闭;>0=自定义秒数。
|
||||
func agentShellNoOutputTimeoutSeconds(cfg *config.Config) int {
|
||||
if cfg == nil {
|
||||
return 300
|
||||
}
|
||||
v := cfg.Agent.ShellNoOutputTimeoutSeconds
|
||||
if v < 0 {
|
||||
return 0
|
||||
}
|
||||
if v == 0 {
|
||||
return 300
|
||||
}
|
||||
return v
|
||||
}
|
||||
|
||||
@@ -120,7 +120,7 @@ func RunDeepAgent(
|
||||
mcpIDs = append(mcpIDs, id)
|
||||
mcpIDsMu.Unlock()
|
||||
}
|
||||
einoExecMonitor := newEinoExecuteMonitorCallback(ag, recorder)
|
||||
einoExecBegin, einoExecFinish := newEinoExecuteMonitorCallbacks(ag, recorder)
|
||||
|
||||
// 与单代理流式一致:在 response_start / response_delta 的 data 中带当前 mcpExecutionIds,供主聊天绑定复制与展示。
|
||||
snapshotMCPIDs := func() []string {
|
||||
@@ -223,7 +223,7 @@ func RunDeepAgent(
|
||||
}
|
||||
if einoSkillMW != nil {
|
||||
if einoFSTools && einoLoc != nil {
|
||||
subFs, fsErr := subAgentFilesystemMiddleware(ctx, einoLoc, toolInvokeNotify, id, einoExecMonitor, agentToolTimeoutMinutes(appCfg), nil)
|
||||
subFs, fsErr := subAgentFilesystemMiddleware(ctx, einoLoc, toolInvokeNotify, id, einoExecBegin, einoExecFinish, agentToolTimeoutMinutes(appCfg), agentShellNoOutputTimeoutSeconds(appCfg), nil)
|
||||
if fsErr != nil {
|
||||
return nil, fmt.Errorf("子代理 %q filesystem 中间件: %w", id, fsErr)
|
||||
}
|
||||
@@ -358,12 +358,14 @@ func RunDeepAgent(
|
||||
if einoLoc != nil && einoFSTools {
|
||||
deepBackend = einoLoc
|
||||
deepShell = &einoStreamingShellWrap{
|
||||
inner: einoLoc,
|
||||
invokeNotify: toolInvokeNotify,
|
||||
einoAgentName: orchestratorName,
|
||||
outputChunk: nil,
|
||||
recordMonitor: einoExecMonitor,
|
||||
toolTimeoutMinutes: agentToolTimeoutMinutes(appCfg),
|
||||
inner: einoLoc,
|
||||
invokeNotify: toolInvokeNotify,
|
||||
einoAgentName: orchestratorName,
|
||||
outputChunk: nil,
|
||||
beginMonitor: einoExecBegin,
|
||||
finishMonitor: einoExecFinish,
|
||||
toolTimeoutMinutes: agentToolTimeoutMinutes(appCfg),
|
||||
shellNoOutputTimeoutSec: agentShellNoOutputTimeoutSeconds(appCfg),
|
||||
}
|
||||
}
|
||||
|
||||
@@ -428,7 +430,7 @@ func RunDeepAgent(
|
||||
// 构建 filesystem 中间件(与 Deep sub-agent 一致)
|
||||
var peFsMw adk.ChatModelAgentMiddleware
|
||||
if einoSkillMW != nil && einoFSTools && einoLoc != nil {
|
||||
peFsMw, err = subAgentFilesystemMiddleware(ctx, einoLoc, toolInvokeNotify, "executor", einoExecMonitor, agentToolTimeoutMinutes(appCfg), nil)
|
||||
peFsMw, err = subAgentFilesystemMiddleware(ctx, einoLoc, toolInvokeNotify, "executor", einoExecBegin, einoExecFinish, agentToolTimeoutMinutes(appCfg), agentShellNoOutputTimeoutSeconds(appCfg), nil)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("plan_execute filesystem 中间件: %w", err)
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user