From b029d88359d817ef71a19385e615b618f28ff13b Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E5=85=AC=E6=98=8E?= <83812544+Ed1s0nZ@users.noreply.github.com> Date: Wed, 24 Jun 2026 18:14:04 +0800 Subject: [PATCH] Add files via upload --- internal/multiagent/eino_adk_run_loop.go | 52 +++++--- .../eino_execute_failure_format_test.go | 114 ++++++++++++++++++ .../multiagent/eino_execute_streaming_wrap.go | 18 ++- internal/multiagent/eino_skills.go | 3 +- .../multiagent/eino_tool_name_injection.go | 4 + internal/multiagent/execute_exit_error.go | 15 +++ .../multiagent/orchestrator_instruction.go | 5 +- internal/multiagent/runner.go | 3 +- internal/multiagent/shell_tool_guidance.go | 33 +++++ .../multiagent/shell_tool_guidance_test.go | 17 +++ 10 files changed, 245 insertions(+), 19 deletions(-) create mode 100644 internal/multiagent/eino_execute_failure_format_test.go create mode 100644 internal/multiagent/execute_exit_error.go create mode 100644 internal/multiagent/shell_tool_guidance.go create mode 100644 internal/multiagent/shell_tool_guidance_test.go diff --git a/internal/multiagent/eino_adk_run_loop.go b/internal/multiagent/eino_adk_run_loop.go index cb820d4f..4c7bbc08 100644 --- a/internal/multiagent/eino_adk_run_loop.go +++ b/internal/multiagent/eino_adk_run_loop.go @@ -18,6 +18,7 @@ import ( "cyberstrike-ai/internal/einomcp" "cyberstrike-ai/internal/einoobserve" "cyberstrike-ai/internal/openai" + "cyberstrike-ai/internal/security" "github.com/cloudwego/eino/adk" "github.com/cloudwego/eino/schema" @@ -354,10 +355,9 @@ func runEinoADKAgentLoop(ctx context.Context, args *einoADKRunLoopArgs, baseMsgs // Eino execute / MCP 桥在工具返回时 Fire;若 ADK schema.Tool 事件迟迟不到,此处立即推送 // tool_result 解除 UI「执行中」。tryEmitToolResultProgress 经 toolResultSent 去重,ADK 晚到不重复。 isErr := !success || invokeErr != nil - body := content - if strings.HasPrefix(body, einomcp.ToolErrorPrefix) { + body := einoToolResultBody(content) + if einoToolResultIsError(toolName, content) { isErr = true - body = strings.TrimPrefix(body, einomcp.ToolErrorPrefix) } if tail := friendlyEinoExecuteInvokeTail(invokeErr); tail != "" { if body == "" { @@ -718,11 +718,8 @@ func runEinoADKAgentLoop(ctx context.Context, args *einoADKRunLoopArgs, baseMsgs if mv.IsStreaming && mv.MessageStream != nil && mv.Role == schema.Tool { toolName := strings.TrimSpace(mv.ToolName) content, streamToolCallID, toolStreamRecvErr := recvSchemaMessageStream(ctx, mv.MessageStream) - isErr := false - if strings.HasPrefix(content, einomcp.ToolErrorPrefix) { - isErr = true - content = strings.TrimPrefix(content, einomcp.ToolErrorPrefix) - } + isErr := einoToolResultIsError(toolName, content) + content = einoToolResultBody(content) if streamToolCallID != "" { opts := []schema.ToolMessageOption{schema.WithToolName(toolName)} runAccumulatedMsgs = append(runAccumulatedMsgs, schema.ToolMessage(content, streamToolCallID, opts...)) @@ -1094,11 +1091,8 @@ func runEinoADKAgentLoop(ctx context.Context, args *einoADKRunLoopArgs, baseMsgs } content := msg.Content - isErr := false - if strings.HasPrefix(content, einomcp.ToolErrorPrefix) { - isErr = true - content = strings.TrimPrefix(content, einomcp.ToolErrorPrefix) - } + isErr := einoToolResultIsError(toolName, content) + content = einoToolResultBody(content) toolCallID := strings.TrimSpace(msg.ToolCallID) tryEmitToolResultProgress(toolName, content, toolCallID, isErr, ev.AgentName) @@ -1131,17 +1125,47 @@ func einoPartialRunLastOutputHint() string { "[Run ended abnormally; continue from the trace above without repeating completed steps.]" } -// friendlyEinoExecuteInvokeTail 将 Eino execute 等非 MCP 路径的结尾错误转成简短提示;其它情况保留原 error 文本。 +// friendlyEinoExecuteInvokeTail 将 Eino execute 超时/中断/流异常转为简短提示。 +// 命令非零退出(ExecuteExitError)已有 exec 对齐的正文,不再追加「执行未正常结束」。 func friendlyEinoExecuteInvokeTail(invokeErr error) string { if invokeErr == nil { return "" } + var exitErr *ExecuteExitError + if errors.As(invokeErr, &exitErr) { + return "" + } if errors.Is(invokeErr, context.DeadlineExceeded) { return einoExecuteTimeoutUserHint() } + if errors.Is(invokeErr, context.Canceled) { + return "" + } + if strings.Contains(invokeErr.Error(), "shell inactivity timeout") { + return "" + } return "[执行未正常结束] " + invokeErr.Error() } +// einoToolResultIsError 统一判断 Eino 工具结果是否应标记为错误(与 MCP exec 的 IsError 对齐)。 +func einoToolResultIsError(toolName, content string) bool { + if strings.HasPrefix(content, einomcp.ToolErrorPrefix) { + return true + } + if strings.TrimSpace(toolName) == "execute" && security.IsCommandFailureResult(content) { + return true + } + return false +} + +// einoToolResultBody 去掉工具错误前缀,返回展示/持久化正文。 +func einoToolResultBody(content string) string { + if strings.HasPrefix(content, einomcp.ToolErrorPrefix) { + return strings.TrimPrefix(content, einomcp.ToolErrorPrefix) + } + return content +} + // nextAgentEventWithContext 在 ctx 取消时不再无限阻塞于 iter.Next()(工具执行/模型推理期间常见)。 func nextAgentEventWithContext(ctx context.Context, iter *adk.AsyncIterator[*adk.AgentEvent]) (ev *adk.AgentEvent, ok bool, ctxErr error) { if iter == nil { diff --git a/internal/multiagent/eino_execute_failure_format_test.go b/internal/multiagent/eino_execute_failure_format_test.go new file mode 100644 index 00000000..c83e2853 --- /dev/null +++ b/internal/multiagent/eino_execute_failure_format_test.go @@ -0,0 +1,114 @@ +package multiagent + +import ( + "context" + "errors" + "io" + "strings" + "testing" + + "cyberstrike-ai/internal/einomcp" + "cyberstrike-ai/internal/security" + + "github.com/cloudwego/eino/adk/filesystem" + "github.com/cloudwego/eino/schema" +) + +type mockStreamingShellExitFail struct { + output string + code int +} + +func (m *mockStreamingShellExitFail) ExecuteStreaming(ctx context.Context, input *filesystem.ExecuteRequest) (*schema.StreamReader[*filesystem.ExecuteResponse], error) { + outR, outW := schema.Pipe[*filesystem.ExecuteResponse](4) + go func() { + defer outW.Close() + if m.output != "" { + _ = outW.Send(&filesystem.ExecuteResponse{Output: m.output}, nil) + } + code := m.code + _ = outW.Send(&filesystem.ExecuteResponse{ExitCode: &code}, nil) + }() + return outR, nil +} + +func TestEinoStreamingShellWrap_CommandFailureFormat(t *testing.T) { + inner := &mockStreamingShellExitFail{ + output: "sudo: a password is required\n", + code: 1, + } + notify := einomcp.NewToolInvokeNotifyHolder() + var firedBody string + var firedSuccess bool + var firedErr error + notify.Set(func(toolCallID, toolName, einoAgent string, success bool, content string, invokeErr error) { + firedBody = content + firedSuccess = success + firedErr = invokeErr + }) + wrap := &einoStreamingShellWrap{inner: inner, invokeNotify: notify} + sr, err := wrap.ExecuteStreaming(context.Background(), &filesystem.ExecuteRequest{Command: "sudo whoami"}) + if err != nil { + t.Fatalf("ExecuteStreaming: %v", err) + } + defer sr.Close() + + var stream strings.Builder + for { + resp, rerr := sr.Recv() + if errors.Is(rerr, io.EOF) { + break + } + if rerr != nil { + t.Fatalf("recv: %v", rerr) + } + if resp != nil { + stream.WriteString(resp.Output) + } + } + + if firedSuccess { + t.Fatal("expected success=false") + } + var exitErr *ExecuteExitError + if !errors.As(firedErr, &exitErr) || exitErr.Code != 1 { + t.Fatalf("expected ExecuteExitError code 1, got %v", firedErr) + } + if !strings.HasPrefix(firedBody, einomcp.ToolErrorPrefix) { + t.Fatalf("missing tool error prefix: %q", firedBody) + } + body := strings.TrimPrefix(firedBody, einomcp.ToolErrorPrefix) + if body != security.FormatCommandFailureResult(1, "sudo: a password is required\n") { + t.Fatalf("fire body = %q", body) + } + if !strings.Contains(stream.String(), "sudo:") { + t.Fatalf("stream missing sudo output: %q", stream.String()) + } + if strings.Contains(stream.String(), "command exited with non-zero") { + t.Fatalf("stream has legacy noise: %q", stream.String()) + } + if strings.Contains(stream.String(), "执行未正常结束") { + t.Fatalf("stream has abnormal tail: %q", stream.String()) + } + if !security.IsCommandFailureResult(stream.String()) { + t.Fatalf("stream missing failure status line: %q", stream.String()) + } + if tail := friendlyEinoExecuteInvokeTail(firedErr); tail != "" { + t.Fatalf("unexpected invoke tail: %q", tail) + } + if !einoToolResultIsError("execute", firedBody) { + t.Fatal("expected isError for execute failure") + } +} + +func TestFriendlyEinoExecuteInvokeTail(t *testing.T) { + if friendlyEinoExecuteInvokeTail(&ExecuteExitError{Code: 1}) != "" { + t.Fatal("exit error should not get abnormal tail") + } + if !strings.Contains(friendlyEinoExecuteInvokeTail(context.DeadlineExceeded), "Timed out") { + t.Fatal("deadline should get timeout hint") + } + if friendlyEinoExecuteInvokeTail(errors.New("broken pipe")) == "" { + t.Fatal("unexpected error should get tail") + } +} diff --git a/internal/multiagent/eino_execute_streaming_wrap.go b/internal/multiagent/eino_execute_streaming_wrap.go index 2d2e8dae..51232d16 100644 --- a/internal/multiagent/eino_execute_streaming_wrap.go +++ b/internal/multiagent/eino_execute_streaming_wrap.go @@ -251,9 +251,13 @@ func (w *einoStreamingShellWrap) ExecuteStreaming(ctx context.Context, input *fi if resp.ExitCode != nil { hasExitCode = true exitCode = *resp.ExitCode + continue } var appended string if resp.Output != "" { + if security.IsLegacyShellExitNoise(resp.Output) { + continue + } if idleWatch != nil { idleWatch.Bump() } @@ -274,7 +278,7 @@ func (w *einoStreamingShellWrap) ExecuteStreaming(ctx context.Context, input *fi if success && hasExitCode && exitCode != 0 { success = false - invokeErr = fmt.Errorf("execute exited with code %d", exitCode) + invokeErr = &ExecuteExitError{Code: exitCode} } // WithTimeout 触发后,子进程常被信号结束,local 侧多报 exit -1 / canceled,错误链里不一定带 DeadlineExceeded。 // 用执行所用 ctx 归一化,便于 UI 展示「超时」而非含糊的 -1。 @@ -314,11 +318,21 @@ func (w *einoStreamingShellWrap) ExecuteStreaming(ctx context.Context, input *fi _ = outW.Send(&filesystem.ExecuteResponse{Output: text + "\n"}, nil) } } + rawOutput := sb.String() + fireBody := rawOutput + if !success && hasExitCode && exitCode != 0 { + statusLine := security.ExecuteFailureStatusLine(exitCode) + if !strings.Contains(rawOutput, "命令执行失败:") { + _ = outW.Send(&filesystem.ExecuteResponse{Output: statusLine}, nil) + sb.WriteString(statusLine) + } + fireBody = einomcp.ToolErrorPrefix + security.FormatCommandFailureResult(exitCode, rawOutput) + } if w.finishMonitor != nil { w.finishMonitor(execID, toolCallID, command, sb.String(), success, invokeErr) } if w.invokeNotify != nil { - w.invokeNotify.Fire(toolCallID, "execute", agentTag, success, sb.String(), invokeErr) + w.invokeNotify.Fire(toolCallID, "execute", agentTag, success, fireBody, invokeErr) } outW.Close() }(sr, userCmd, execCancel, timeoutCancel, execCtx, convID, execReg, toolRunReg, monitorExecID, tid, w.shellNoOutputTimeoutSec) diff --git a/internal/multiagent/eino_skills.go b/internal/multiagent/eino_skills.go index d629d30c..1af6dbda 100644 --- a/internal/multiagent/eino_skills.go +++ b/internal/multiagent/eino_skills.go @@ -9,6 +9,7 @@ import ( "cyberstrike-ai/internal/config" "cyberstrike-ai/internal/einomcp" + "cyberstrike-ai/internal/security" localbk "github.com/cloudwego/eino-ext/adk/backend/local" "github.com/cloudwego/eino/adk" @@ -93,7 +94,7 @@ func subAgentFilesystemMiddleware( return filesystem.New(ctx, &filesystem.MiddlewareConfig{ Backend: loc, StreamingShell: &einoStreamingShellWrap{ - inner: loc, + inner: security.NewEinoStreamingShell(), invokeNotify: invokeNotify, einoAgentName: strings.TrimSpace(einoAgentName), outputChunk: outputChunk, diff --git a/internal/multiagent/eino_tool_name_injection.go b/internal/multiagent/eino_tool_name_injection.go index 2e0fe9f8..66d75d88 100644 --- a/internal/multiagent/eino_tool_name_injection.go +++ b/internal/multiagent/eino_tool_name_injection.go @@ -46,6 +46,10 @@ func injectToolNamesOnlyInstruction(ctx context.Context, instruction string, too sb.WriteString("2) 调用具体工具前,请先确认该工具的参数要求(以当前请求中的工具定义为准);不确定时先澄清再调用。\n") sb.WriteString("3) 不要臆造不存在的工具名。\n\n") } + if s := strings.TrimSpace(injectShellToolGuidance("", names)); s != "" { + sb.WriteString(s) + sb.WriteString("\n\n") + } if s := strings.TrimSpace(instruction); s != "" { sb.WriteString(s) } diff --git a/internal/multiagent/execute_exit_error.go b/internal/multiagent/execute_exit_error.go new file mode 100644 index 00000000..d6581f6a --- /dev/null +++ b/internal/multiagent/execute_exit_error.go @@ -0,0 +1,15 @@ +package multiagent + +import "fmt" + +// ExecuteExitError 表示 execute 命令非零退出(预期失败,非超时/中断/流异常)。 +type ExecuteExitError struct { + Code int +} + +func (e *ExecuteExitError) Error() string { + if e == nil { + return "exit status unknown" + } + return fmt.Sprintf("exit status %d", e.Code) +} diff --git a/internal/multiagent/orchestrator_instruction.go b/internal/multiagent/orchestrator_instruction.go index a9da5c4c..721542ef 100644 --- a/internal/multiagent/orchestrator_instruction.go +++ b/internal/multiagent/orchestrator_instruction.go @@ -6,6 +6,7 @@ import ( "cyberstrike-ai/internal/agents" "cyberstrike-ai/internal/config" "cyberstrike-ai/internal/project" + "cyberstrike-ai/internal/projectprompt" ) // DefaultPlanExecuteOrchestratorInstruction 当未配置 plan_execute 专用 Markdown / YAML 时的内置主代理(规划/重规划侧)提示。 @@ -122,7 +123,9 @@ func DefaultPlanExecuteOrchestratorInstruction() string { ## 表达 -在调用工具或给出计划变更前,用 2~5 句中文说明当前决策依据与期望证据形态;最终对用户交付结构化结论(发现摘要、证据、风险、下一步)。` +在调用工具或给出计划变更前,用 2~5 句中文说明当前决策依据与期望证据形态;最终对用户交付结构化结论(发现摘要、证据、风险、下一步)。 + +` + projectprompt.ShellExecExecuteGuidanceSection() } // DefaultSupervisorOrchestratorInstruction 当未配置 supervisor 专用 Markdown / YAML 时的内置监督者提示(transfer / exit 说明仍由运行时在末尾追加)。 diff --git a/internal/multiagent/runner.go b/internal/multiagent/runner.go index ed568443..de40d160 100644 --- a/internal/multiagent/runner.go +++ b/internal/multiagent/runner.go @@ -20,6 +20,7 @@ import ( "cyberstrike-ai/internal/openai" "cyberstrike-ai/internal/project" "cyberstrike-ai/internal/reasoning" + "cyberstrike-ai/internal/security" einoopenai "github.com/cloudwego/eino-ext/components/model/openai" "github.com/cloudwego/eino/adk" @@ -358,7 +359,7 @@ func RunDeepAgent( if einoLoc != nil && einoFSTools { deepBackend = einoLoc deepShell = &einoStreamingShellWrap{ - inner: einoLoc, + inner: security.NewEinoStreamingShell(), invokeNotify: toolInvokeNotify, einoAgentName: orchestratorName, outputChunk: nil, diff --git a/internal/multiagent/shell_tool_guidance.go b/internal/multiagent/shell_tool_guidance.go new file mode 100644 index 00000000..0eec25bf --- /dev/null +++ b/internal/multiagent/shell_tool_guidance.go @@ -0,0 +1,33 @@ +package multiagent + +import ( + "strings" + + "cyberstrike-ai/internal/projectprompt" +) + +func shellToolsPresent(toolNames []string) bool { + for _, n := range toolNames { + switch strings.ToLower(strings.TrimSpace(n)) { + case "exec", "execute": + return true + } + } + return false +} + +// injectShellToolGuidance 在系统提示末尾追加 exec/execute 分工(仅当工具列表含 exec 或 execute)。 +func injectShellToolGuidance(instruction string, toolNames []string) string { + if !shellToolsPresent(toolNames) { + return instruction + } + block := strings.TrimSpace(projectprompt.ShellExecExecuteGuidanceSection()) + if block == "" { + return instruction + } + s := strings.TrimSpace(instruction) + if s == "" { + return block + } + return s + "\n\n" + block +} diff --git a/internal/multiagent/shell_tool_guidance_test.go b/internal/multiagent/shell_tool_guidance_test.go new file mode 100644 index 00000000..91bd2730 --- /dev/null +++ b/internal/multiagent/shell_tool_guidance_test.go @@ -0,0 +1,17 @@ +package multiagent + +import ( + "strings" + "testing" +) + +func TestInjectShellToolGuidance(t *testing.T) { + got := injectShellToolGuidance("base", []string{"nmap"}) + if got != "base" { + t.Fatalf("expected unchanged, got %q", got) + } + got = injectShellToolGuidance("base", []string{"exec", "nmap"}) + if !strings.Contains(got, "exec/execute") || !strings.Contains(got, "base") { + t.Fatalf("expected shell guidance appended, got %q", got) + } +}