Add files via upload

This commit is contained in:
公明
2026-05-13 16:36:09 +08:00
committed by GitHub
parent c94a9fd9e9
commit c74e20c54a
5 changed files with 118 additions and 18 deletions
+22 -2
View File
@@ -15,8 +15,8 @@ import (
"cyberstrike-ai/internal/agent"
"cyberstrike-ai/internal/config"
"cyberstrike-ai/internal/einoobserve"
"cyberstrike-ai/internal/einomcp"
"cyberstrike-ai/internal/einoobserve"
"cyberstrike-ai/internal/openai"
"github.com/cloudwego/eino/adk"
@@ -267,7 +267,16 @@ func runEinoADKAgentLoop(ctx context.Context, args *einoADKRunLoopArgs, baseMsgs
isErr := !success || invokeErr != nil
body := content
if invokeErr != nil {
body = invokeErr.Error()
// 保留已流式累计的 stdout(如 execute 超时前的一半输出),避免 tool_result 只剩错误串、模型与 UI 丢失上下文
tail := friendlyEinoExecuteInvokeTail(invokeErr)
// execute 流式包装可能已把超时句写入 content(供 ADK tool 与流式 delta);勿重复拼接
if tail != "" && strings.Contains(content, tail) {
body = content
} else if strings.TrimSpace(content) != "" {
body = strings.TrimRight(content, "\n") + "\n\n" + tail
} else {
body = tail
}
isErr = true
}
recordPendingExecuteStdoutDup(toolName, body, isErr)
@@ -948,6 +957,17 @@ func einoPartialRunLastOutputHint() string {
"[Run ended abnormally; continue from the trace above without repeating completed steps.]"
}
// friendlyEinoExecuteInvokeTail 将 Eino execute 等非 MCP 路径的结尾错误转成简短提示;其它情况保留原 error 文本。
func friendlyEinoExecuteInvokeTail(invokeErr error) string {
if invokeErr == nil {
return ""
}
if errors.Is(invokeErr, context.DeadlineExceeded) {
return einoExecuteTimeoutUserHint()
}
return "[执行未正常结束] " + invokeErr.Error()
}
func buildEinoRunResultFromAccumulated(
orchMode string,
runAccumulatedMsgs []adk.Message,
@@ -6,6 +6,7 @@ import (
"fmt"
"io"
"strings"
"time"
"cyberstrike-ai/internal/einomcp"
"cyberstrike-ai/internal/security"
@@ -15,6 +16,24 @@ import (
"github.com/cloudwego/eino/schema"
)
// prependPythonUnbufferedEnv 为 /bin/sh -c 注入 PYTHONUNBUFFERED=1。
// eino-ext local 对流式 stdout 使用 bufio 按「行」推送;python3 写管道时默认块缓冲,print 长期留在用户态缓冲,
// 管道里收不到换行,表现为长时间无输出直至超时或退出。若命令里已出现 PYTHONUNBUFFERED 则不再覆盖。
func prependPythonUnbufferedEnv(shellCommand string) string {
if strings.TrimSpace(shellCommand) == "" {
return shellCommand
}
if strings.Contains(strings.ToUpper(shellCommand), "PYTHONUNBUFFERED") {
return shellCommand
}
return "export PYTHONUNBUFFERED=1\n" + shellCommand
}
// einoExecuteTimeoutUserHint 与写入 ADK 工具消息(模型可见)及 SSE tool_result 尾标一致。
func einoExecuteTimeoutUserHint() string {
return "已超时终止 · Timed out"
}
// einoStreamingShellWrap 包装 Eino filesystem 使用的 StreamingShellcloudwego eino-ext local.Local)。
// 官方 execute 工具默认走 ExecuteStreaming 且不设 RunInBackendGround;末尾带 & 时子进程仍与管道相连,
// streamStdout 按行读取会在无换行输出时长时间阻塞(与 MCP 工具 exec 的独立实现不同)。
@@ -29,6 +48,10 @@ type einoStreamingShellWrap struct {
inner filesystem.StreamingShell
invokeNotify *einomcp.ToolInvokeNotifyHolder
einoAgentName string
// outputChunk 可选;非 nil 时在收到内层 ExecuteResponse 片段时推送,与 MCP 工具的 tool_result_delta 一致(需有效 toolCallId)。
outputChunk func(toolName, toolCallID, chunk string)
// toolTimeoutMinutes 与 agent.tool_timeout_minutes 对齐;>0 时对单次 execute 套用 context 超时(与 MCP 工具经 executeToolViaMCP 行为一致)。0 表示仅依赖上层 ctx(如整任务 10h 上限)。
toolTimeoutMinutes int
// recordMonitor 在 execute 流结束后写入 tool_executions 并 recorder(executionId),使「渗透测试详情」与常规 MCP 一致。
recordMonitor func(command, stdout string, success bool, invokeErr error)
}
@@ -41,17 +64,27 @@ func (w *einoStreamingShellWrap) ExecuteStreaming(ctx context.Context, input *fi
return w.inner.ExecuteStreaming(ctx, nil)
}
req := *input
cmd := strings.TrimSpace(req.Command)
userCmd := strings.TrimSpace(req.Command)
if security.IsBackgroundShellCommand(req.Command) && !req.RunInBackendGround {
req.RunInBackendGround = true
}
req.Command = prependPythonUnbufferedEnv(req.Command)
tid := strings.TrimSpace(compose.GetToolCallID(ctx))
agentTag := strings.TrimSpace(w.einoAgentName)
sr, err := w.inner.ExecuteStreaming(ctx, &req)
execCtx := ctx
var execCancel context.CancelFunc
if w.toolTimeoutMinutes > 0 {
execCtx, execCancel = context.WithTimeout(ctx, time.Duration(w.toolTimeoutMinutes)*time.Minute)
}
sr, err := w.inner.ExecuteStreaming(execCtx, &req)
if err != nil {
if execCancel != nil {
execCancel()
}
if w.recordMonitor != nil {
w.recordMonitor(cmd, "", false, err)
w.recordMonitor(userCmd, "", false, err)
}
if w.invokeNotify != nil && tid != "" {
w.invokeNotify.Fire(tid, "execute", agentTag, false, "", err)
@@ -59,13 +92,19 @@ func (w *einoStreamingShellWrap) ExecuteStreaming(ctx context.Context, input *fi
return nil, err
}
if sr == nil || w.invokeNotify == nil || tid == "" {
if execCancel != nil {
execCancel()
}
return sr, nil
}
outR, outW := schema.Pipe[*filesystem.ExecuteResponse](32)
go func(inner *schema.StreamReader[*filesystem.ExecuteResponse], command string) {
go func(inner *schema.StreamReader[*filesystem.ExecuteResponse], command string, cancel context.CancelFunc, tctx context.Context) {
defer inner.Close()
if cancel != nil {
defer cancel()
}
var sb strings.Builder
const maxCapture = 16 * 1024
@@ -90,12 +129,18 @@ func (w *einoStreamingShellWrap) ExecuteStreaming(ctx context.Context, input *fi
hasExitCode = true
exitCode = *resp.ExitCode
}
var appended string
if remain := maxCapture - sb.Len(); remain > 0 {
out := resp.Output
if len(out) > remain {
out = out[:remain]
}
sb.WriteString(out)
appended = out
}
// 仅推送写入 sb 的片段,与末尾 Fire/recordMonitor 的截断累计一致,避免最终 tool_result 短于已展示增量。
if w.outputChunk != nil && strings.TrimSpace(appended) != "" {
w.outputChunk("execute", tid, appended)
}
if outW.Send(resp, nil) {
success = false
@@ -109,12 +154,33 @@ func (w *einoStreamingShellWrap) ExecuteStreaming(ctx context.Context, input *fi
success = false
invokeErr = fmt.Errorf("execute exited with code %d", exitCode)
}
// WithTimeout 触发后,子进程常被信号结束,local 侧多报 exit -1 / canceled,错误链里不一定带 DeadlineExceeded。
// 用执行所用 ctx 归一化,便于 UI 展示「超时」而非含糊的 -1。
if tctx != nil && errors.Is(tctx.Err(), context.DeadlineExceeded) {
success = false
invokeErr = context.DeadlineExceeded
}
// ADK 从本 Pipe 拼出 tool 消息正文;仅 Notify 尾标不会进入模型上下文。超时句写入流,与 UI 一致。
if invokeErr != nil && errors.Is(invokeErr, context.DeadlineExceeded) {
hint := "\n\n" + einoExecuteTimeoutUserHint() + "\n"
_ = outW.Send(&filesystem.ExecuteResponse{Output: hint}, nil)
if w.outputChunk != nil && tid != "" {
w.outputChunk("execute", tid, hint)
}
if remain := maxCapture - sb.Len(); remain > 0 {
h := hint
if len(h) > remain {
h = h[:remain]
}
sb.WriteString(h)
}
}
if w.recordMonitor != nil {
w.recordMonitor(command, sb.String(), success, invokeErr)
}
w.invokeNotify.Fire(tid, "execute", agentTag, success, sb.String(), invokeErr)
outW.Close()
}(sr, cmd)
}(sr, userCmd, execCancel, execCtx)
return outR, nil
}
+1 -1
View File
@@ -143,7 +143,7 @@ func RunEinoSingleChatModelAgent(
}
if einoSkillMW != nil {
if einoFSTools && einoLoc != nil {
fsMw, fsErr := subAgentFilesystemMiddleware(ctx, einoLoc, toolInvokeNotify, einoSingleAgentName, einoExecMonitor)
fsMw, fsErr := subAgentFilesystemMiddleware(ctx, einoLoc, toolInvokeNotify, einoSingleAgentName, einoExecMonitor, agentToolTimeoutMinutes(appCfg), toolOutputChunk)
if fsErr != nil {
return nil, fmt.Errorf("eino single filesystem 中间件: %w", fsErr)
}
+16 -4
View File
@@ -82,6 +82,8 @@ func subAgentFilesystemMiddleware(
invokeNotify *einomcp.ToolInvokeNotifyHolder,
einoAgentName string,
recordMonitor func(command, stdout string, success bool, invokeErr error),
toolTimeoutMinutes int,
outputChunk func(toolName, toolCallID, chunk string),
) (adk.ChatModelAgentMiddleware, error) {
if loc == nil {
return nil, nil
@@ -89,10 +91,20 @@ func subAgentFilesystemMiddleware(
return filesystem.New(ctx, &filesystem.MiddlewareConfig{
Backend: loc,
StreamingShell: &einoStreamingShellWrap{
inner: loc,
invokeNotify: invokeNotify,
einoAgentName: strings.TrimSpace(einoAgentName),
recordMonitor: recordMonitor,
inner: loc,
invokeNotify: invokeNotify,
einoAgentName: strings.TrimSpace(einoAgentName),
outputChunk: outputChunk,
recordMonitor: recordMonitor,
toolTimeoutMinutes: toolTimeoutMinutes,
},
})
}
// agentToolTimeoutMinutes 返回 agent.tool_timeout_minutes(与 executeToolViaMCP 一致);cfg 为 nil 时 0。
func agentToolTimeoutMinutes(cfg *config.Config) int {
if cfg == nil {
return 0
}
return cfg.Agent.ToolTimeoutMinutes
}
+8 -6
View File
@@ -244,7 +244,7 @@ func RunDeepAgent(
}
if einoSkillMW != nil {
if einoFSTools && einoLoc != nil {
subFs, fsErr := subAgentFilesystemMiddleware(ctx, einoLoc, toolInvokeNotify, id, einoExecMonitor)
subFs, fsErr := subAgentFilesystemMiddleware(ctx, einoLoc, toolInvokeNotify, id, einoExecMonitor, agentToolTimeoutMinutes(appCfg), toolOutputChunk)
if fsErr != nil {
return nil, fmt.Errorf("子代理 %q filesystem 中间件: %w", id, fsErr)
}
@@ -376,10 +376,12 @@ func RunDeepAgent(
if einoLoc != nil && einoFSTools {
deepBackend = einoLoc
deepShell = &einoStreamingShellWrap{
inner: einoLoc,
invokeNotify: toolInvokeNotify,
einoAgentName: orchestratorName,
recordMonitor: einoExecMonitor,
inner: einoLoc,
invokeNotify: toolInvokeNotify,
einoAgentName: orchestratorName,
outputChunk: toolOutputChunk,
recordMonitor: einoExecMonitor,
toolTimeoutMinutes: agentToolTimeoutMinutes(appCfg),
}
}
@@ -443,7 +445,7 @@ func RunDeepAgent(
// 构建 filesystem 中间件(与 Deep sub-agent 一致)
var peFsMw adk.ChatModelAgentMiddleware
if einoSkillMW != nil && einoFSTools && einoLoc != nil {
peFsMw, err = subAgentFilesystemMiddleware(ctx, einoLoc, toolInvokeNotify, "executor", einoExecMonitor)
peFsMw, err = subAgentFilesystemMiddleware(ctx, einoLoc, toolInvokeNotify, "executor", einoExecMonitor, agentToolTimeoutMinutes(appCfg), toolOutputChunk)
if err != nil {
return nil, fmt.Errorf("plan_execute filesystem 中间件: %w", err)
}