diff --git a/internal/agent/agent.go b/internal/agent/agent.go index 3a4e7f0d..efcbb899 100644 --- a/internal/agent/agent.go +++ b/internal/agent/agent.go @@ -1909,6 +1909,15 @@ func (a *Agent) ExecuteMCPToolForConversation(ctx context.Context, conversationI return a.executeToolViaMCP(ctx, toolName, args) } +// RecordLocalToolExecution 将非 CallTool 路径完成的工具调用写入 MCP 监控库(与 CallTool 落库一致),返回 executionId。 +// 用于 Eino filesystem execute 等场景,使助手气泡「渗透测试详情」与常规 MCP 一致可点进监控。 +func (a *Agent) RecordLocalToolExecution(toolName string, args map[string]interface{}, resultText string, invokeErr error) string { + if a == nil || a.mcpServer == nil { + return "" + } + return a.mcpServer.RecordCompletedToolInvocation(toolName, args, resultText, invokeErr) +} + // CancelMCPToolExecutionWithNote 取消一次进行中的 MCP 工具(先内部后外部),与监控页「终止工具」一致;note 非空时合并进返回给模型的文本。 func (a *Agent) CancelMCPToolExecutionWithNote(executionID, note string) bool { executionID = strings.TrimSpace(executionID) diff --git a/internal/einomcp/mcp_tools.go b/internal/einomcp/mcp_tools.go index 8361e5be..780e3487 100644 --- a/internal/einomcp/mcp_tools.go +++ b/internal/einomcp/mcp_tools.go @@ -23,12 +23,16 @@ type ExecutionRecorder func(executionID string) const ToolErrorPrefix = "__CYBERSTRIKE_AI_TOOL_ERROR__\n" // ToolsFromDefinitions 将单 Agent 使用的 OpenAI 风格工具定义转为 Eino InvokableTool,执行时走 Agent 的 MCP 路径。 +// invokeNotify 可选:与 runEinoADKAgentLoop 共享,在 InvokableRun 返回时触发 UI 与 pending 清理(与 ADK Tool 事件去重)。 +// einoAgentName 为该套工具所属 ChatModelAgent 的 Name(主代理或子代理 id),用于 SSE 上的 einoAgent 字段。 func ToolsFromDefinitions( ag *agent.Agent, holder *ConversationHolder, defs []agent.Tool, rec ExecutionRecorder, toolOutputChunk func(toolName, toolCallID, chunk string), + invokeNotify *ToolInvokeNotifyHolder, + einoAgentName string, ) ([]tool.BaseTool, error) { out := make([]tool.BaseTool, 0, len(defs)) for _, d := range defs { @@ -40,12 +44,14 @@ func ToolsFromDefinitions( return nil, fmt.Errorf("tool %q: %w", d.Function.Name, err) } out = append(out, &mcpBridgeTool{ - info: info, - name: d.Function.Name, - agent: ag, - holder: holder, - record: rec, - chunk: toolOutputChunk, + info: info, + name: d.Function.Name, + agent: ag, + holder: holder, + record: rec, + chunk: toolOutputChunk, + invokeNotify: invokeNotify, + einoAgentName: strings.TrimSpace(einoAgentName), }) } return out, nil @@ -77,12 +83,14 @@ func toolInfoFromDefinition(d agent.Tool) (*schema.ToolInfo, error) { } type mcpBridgeTool struct { - info *schema.ToolInfo - name string - agent *agent.Agent - holder *ConversationHolder - record ExecutionRecorder - chunk func(toolName, toolCallID, chunk string) + info *schema.ToolInfo + name string + agent *agent.Agent + holder *ConversationHolder + record ExecutionRecorder + chunk func(toolName, toolCallID, chunk string) + invokeNotify *ToolInvokeNotifyHolder + einoAgentName string } func (m *mcpBridgeTool) Info(ctx context.Context) (*schema.ToolInfo, error) { @@ -90,8 +98,27 @@ func (m *mcpBridgeTool) Info(ctx context.Context) (*schema.ToolInfo, error) { return m.info, nil } -func (m *mcpBridgeTool) InvokableRun(ctx context.Context, argumentsInJSON string, opts ...tool.Option) (string, error) { +func (m *mcpBridgeTool) InvokableRun(ctx context.Context, argumentsInJSON string, opts ...tool.Option) (out string, err error) { _ = opts + toolCallID := compose.GetToolCallID(ctx) + defer func() { + if m.invokeNotify == nil { + return + } + tid := strings.TrimSpace(toolCallID) + if tid == "" { + return + } + success := err == nil && !strings.HasPrefix(out, ToolErrorPrefix) + body := out + if err != nil { + success = false + } else if strings.HasPrefix(out, ToolErrorPrefix) { + success = false + body = strings.TrimPrefix(out, ToolErrorPrefix) + } + m.invokeNotify.Fire(tid, m.name, m.einoAgentName, success, body, err) + }() return runMCPToolInvocation(ctx, m.agent, m.holder, m.name, argumentsInJSON, m.record, m.chunk) } diff --git a/internal/einomcp/tool_invoke_notify.go b/internal/einomcp/tool_invoke_notify.go new file mode 100644 index 00000000..126f5694 --- /dev/null +++ b/internal/einomcp/tool_invoke_notify.go @@ -0,0 +1,39 @@ +package einomcp + +import "sync" + +// ToolInvokeNotifyHolder 由 Eino run loop 在迭代开始前 Set 回调;MCP 桥在每次 InvokableRun 结束时 Fire, +// 用于在 ADK 未透出 schema.Tool 事件时仍推送 tool_result、清 pending,避免 UI 卡在「执行中」或迭代末 force-close。 +type ToolInvokeNotifyHolder struct { + mu sync.RWMutex + fn func(toolCallID, toolName, einoAgent string, success bool, content string, invokeErr error) +} + +// NewToolInvokeNotifyHolder 创建可在 ToolsFromDefinitions 与 run loop 之间共享的 holder。 +func NewToolInvokeNotifyHolder() *ToolInvokeNotifyHolder { + return &ToolInvokeNotifyHolder{} +} + +// Set 由 runEinoADKAgentLoop 在开始消费 iter 之前调用;可多次覆盖(通常仅一次)。 +func (h *ToolInvokeNotifyHolder) Set(fn func(toolCallID, toolName, einoAgent string, success bool, content string, invokeErr error)) { + if h == nil { + return + } + h.mu.Lock() + defer h.mu.Unlock() + h.fn = fn +} + +// Fire 由 mcpBridgeTool 在工具调用返回时调用;若尚未 Set 或 toolCallID 为空则忽略。 +func (h *ToolInvokeNotifyHolder) Fire(toolCallID, toolName, einoAgent string, success bool, content string, invokeErr error) { + if h == nil { + return + } + h.mu.RLock() + fn := h.fn + h.mu.RUnlock() + if fn == nil { + return + } + fn(toolCallID, toolName, einoAgent, success, content, invokeErr) +} diff --git a/internal/multiagent/eino_adk_run_loop.go b/internal/multiagent/eino_adk_run_loop.go index f3ad7dd7..96328557 100644 --- a/internal/multiagent/eino_adk_run_loop.go +++ b/internal/multiagent/eino_adk_run_loop.go @@ -11,6 +11,7 @@ import ( "strings" "sync" "sync/atomic" + "unicode/utf8" "cyberstrike-ai/internal/einomcp" @@ -20,7 +21,9 @@ import ( ) // normalizeStreamingDelta 将可能是“累计片段”的 chunk 归一化为“纯增量”。 -// 一些模型/桥接层在流式过程中会重复发送已输出前缀,前端若直接 buffer+=chunk 会出现“结巴”重复。 +// 一些模型/桥接层在流式过程中会重复发送已输出前缀,前端若直接 buffer+=chunk 会出现重复文本。 +// +// 注意:与 internal/openai.normalizeStreamingDelta 保持一致。 func normalizeStreamingDelta(current, incoming string) (next, delta string) { if incoming == "" { return current, "" @@ -28,28 +31,12 @@ func normalizeStreamingDelta(current, incoming string) (next, delta string) { if current == "" { return incoming, incoming } - if incoming == current { - return current, "" - } - // incoming 是累计全文(包含 current 前缀) - if strings.HasPrefix(incoming, current) { + if strings.HasPrefix(incoming, current) && len(incoming) > len(current) { return incoming, incoming[len(current):] } - // incoming 完全是已输出尾部重发 - if strings.HasSuffix(current, incoming) { + if incoming == current && utf8.RuneCountInString(current) > 1 { return current, "" } - - // 处理边界重叠:current 后缀与 incoming 前缀重叠,只追加非重叠部分。 - max := len(current) - if len(incoming) < max { - max = len(incoming) - } - for overlap := max; overlap > 0; overlap-- { - if current[len(current)-overlap:] == incoming[:overlap] { - return current + incoming[overlap:], incoming[overlap:] - } - } return current + incoming, incoming } @@ -83,6 +70,9 @@ type einoADKRunLoopArgs struct { McpIDsMu *sync.Mutex McpIDs *[]string + // ToolInvokeNotify 与 einomcp.ToolsFromDefinitions 共享:run loop 在迭代前 Set,MCP 桥 Fire 以补全 tool_result。 + ToolInvokeNotify *einomcp.ToolInvokeNotifyHolder + DA adk.Agent // EmptyResponseMessage 当未捕获到助手正文时的占位(多代理与单代理文案不同)。 @@ -224,6 +214,63 @@ func runEinoADKAgentLoop(ctx context.Context, args *einoADKRunLoopArgs, baseMsgs pendingQueueByAgent = make(map[string][]string) } + // 最近一次成功的 Eino filesystem execute 的标准输出(trim):用于抑制模型紧接着复述同一字符串时的重复「助手输出」时间线。 + var executeStdoutDupMu sync.Mutex + var pendingExecuteStdoutDup string + recordPendingExecuteStdoutDup := func(toolName, stdout string, isErr bool) { + if isErr || !strings.EqualFold(strings.TrimSpace(toolName), "execute") { + return + } + t := strings.TrimSpace(stdout) + if t == "" { + return + } + executeStdoutDupMu.Lock() + pendingExecuteStdoutDup = t + executeStdoutDupMu.Unlock() + } + + var toolResultSent sync.Map // toolCallID -> struct{};与 ADK Tool 消息去重,避免 bridge 与事件流各推一次 + if args.ToolInvokeNotify != nil { + args.ToolInvokeNotify.Set(func(toolCallID, toolName, einoAgent string, success bool, content string, invokeErr error) { + tid := strings.TrimSpace(toolCallID) + removePendingByID(tid) + if tid == "" || progress == nil { + return + } + if _, loaded := toolResultSent.LoadOrStore(tid, struct{}{}); loaded { + return + } + isErr := !success || invokeErr != nil + body := content + if invokeErr != nil { + body = invokeErr.Error() + isErr = true + } + recordPendingExecuteStdoutDup(toolName, body, isErr) + preview := body + if len(preview) > 200 { + preview = preview[:200] + "..." + } + agentTag := strings.TrimSpace(einoAgent) + if agentTag == "" { + agentTag = orchestratorName + } + progress("tool_result", fmt.Sprintf("工具结果 (%s)", toolName), map[string]interface{}{ + "toolName": toolName, + "success": !isErr, + "isError": isErr, + "result": body, + "resultPreview": preview, + "toolCallId": tid, + "conversationId": conversationID, + "einoAgent": agentTag, + "einoRole": einoRoleTag(agentTag), + "source": "eino", + }) + }) + } + runnerCfg := adk.RunnerConfig{ Agent: da, EnableStreaming: true, @@ -467,6 +514,7 @@ func runEinoADKAgentLoop(ctx context.Context, args *einoADKRunLoopArgs, baseMsgs var subAssistantBuf string var subReplyStreamID string var mainAssistantBuf string + var mainAssistDupTarget string // 非空表示本段主助手流需缓冲至 EOF,与 execute 输出比对去重 var reasoningBuf string var streamRecvErr error for { @@ -511,24 +559,35 @@ func runEinoADKAgentLoop(ctx context.Context, args *einoADKRunLoopArgs, baseMsgs var contentDelta string mainAssistantBuf, contentDelta = normalizeStreamingDelta(mainAssistantBuf, chunk.Content) if contentDelta != "" { - if !streamHeaderSent { - progress("response_start", "", map[string]interface{}{ - "conversationId": conversationID, - "mcpExecutionIds": snapshotMCPIDs(), - "messageGeneratedBy": "eino:" + ev.AgentName, - "einoRole": "orchestrator", - "einoAgent": ev.AgentName, - "orchestration": orchMode, - }) - streamHeaderSent = true + if mainAssistDupTarget == "" { + executeStdoutDupMu.Lock() + if pendingExecuteStdoutDup != "" { + mainAssistDupTarget = pendingExecuteStdoutDup + } + executeStdoutDupMu.Unlock() + } + if mainAssistDupTarget != "" { + // 已展示过 tool_result,缓冲全文;EOF 后与 execute 输出相同则不再发助手流 + } else { + if !streamHeaderSent { + progress("response_start", "", map[string]interface{}{ + "conversationId": conversationID, + "mcpExecutionIds": snapshotMCPIDs(), + "messageGeneratedBy": "eino:" + ev.AgentName, + "einoRole": "orchestrator", + "einoAgent": ev.AgentName, + "orchestration": orchMode, + }) + streamHeaderSent = true + } + progress("response_delta", contentDelta, map[string]interface{}{ + "conversationId": conversationID, + "mcpExecutionIds": snapshotMCPIDs(), + "einoRole": "orchestrator", + "einoAgent": ev.AgentName, + "orchestration": orchMode, + }) } - progress("response_delta", contentDelta, map[string]interface{}{ - "conversationId": conversationID, - "mcpExecutionIds": snapshotMCPIDs(), - "einoRole": "orchestrator", - "einoAgent": ev.AgentName, - "orchestration": orchMode, - }) } } else if !streamsMainAssistant(ev.AgentName) { var subDelta string @@ -558,7 +617,43 @@ func runEinoADKAgentLoop(ctx context.Context, args *einoADKRunLoopArgs, baseMsgs } } if streamsMainAssistant(ev.AgentName) { - if s := strings.TrimSpace(mainAssistantBuf); s != "" { + s := strings.TrimSpace(mainAssistantBuf) + if mainAssistDupTarget != "" { + executeStdoutDupMu.Lock() + pendingExecuteStdoutDup = "" + executeStdoutDupMu.Unlock() + if s != "" && s == mainAssistDupTarget { + // 与刚展示的 execute 结果完全一致:不再发助手流式事件,仍写入轨迹与最终回复字段 + lastAssistant = s + runAccumulatedMsgs = append(runAccumulatedMsgs, schema.AssistantMessage(s, nil)) + if orchMode == "plan_execute" && strings.EqualFold(strings.TrimSpace(ev.AgentName), "executor") { + lastPlanExecuteExecutor = UnwrapPlanExecuteUserText(s) + } + } else if s != "" { + if progress != nil { + progress("response_start", "", map[string]interface{}{ + "conversationId": conversationID, + "mcpExecutionIds": snapshotMCPIDs(), + "messageGeneratedBy": "eino:" + ev.AgentName, + "einoRole": "orchestrator", + "einoAgent": ev.AgentName, + "orchestration": orchMode, + }) + progress("response_delta", s, map[string]interface{}{ + "conversationId": conversationID, + "mcpExecutionIds": snapshotMCPIDs(), + "einoRole": "orchestrator", + "einoAgent": ev.AgentName, + "orchestration": orchMode, + }) + } + lastAssistant = s + runAccumulatedMsgs = append(runAccumulatedMsgs, schema.AssistantMessage(s, nil)) + if orchMode == "plan_execute" && strings.EqualFold(strings.TrimSpace(ev.AgentName), "executor") { + lastPlanExecuteExecutor = UnwrapPlanExecuteUserText(s) + } + } + } else if s != "" { lastAssistant = s runAccumulatedMsgs = append(runAccumulatedMsgs, schema.AssistantMessage(s, nil)) if orchMode == "plan_execute" && strings.EqualFold(strings.TrimSpace(ev.AgentName), "executor") { @@ -627,26 +722,42 @@ func runEinoADKAgentLoop(ctx context.Context, args *einoADKRunLoopArgs, baseMsgs body := strings.TrimSpace(msg.Content) if body != "" { if streamsMainAssistant(ev.AgentName) { - if progress != nil { - progress("response_start", "", map[string]interface{}{ - "conversationId": conversationID, - "mcpExecutionIds": snapshotMCPIDs(), - "messageGeneratedBy": "eino:" + ev.AgentName, - "einoRole": "orchestrator", - "einoAgent": ev.AgentName, - "orchestration": orchMode, - }) - progress("response_delta", body, map[string]interface{}{ - "conversationId": conversationID, - "mcpExecutionIds": snapshotMCPIDs(), - "einoRole": "orchestrator", - "einoAgent": ev.AgentName, - "orchestration": orchMode, - }) - } - lastAssistant = body - if orchMode == "plan_execute" && strings.EqualFold(strings.TrimSpace(ev.AgentName), "executor") { - lastPlanExecuteExecutor = UnwrapPlanExecuteUserText(body) + executeStdoutDupMu.Lock() + dup := pendingExecuteStdoutDup + if dup != "" && body == dup { + pendingExecuteStdoutDup = "" + executeStdoutDupMu.Unlock() + lastAssistant = body + if orchMode == "plan_execute" && strings.EqualFold(strings.TrimSpace(ev.AgentName), "executor") { + lastPlanExecuteExecutor = UnwrapPlanExecuteUserText(body) + } + // 非流式:与 execute 输出相同则跳过助手通道展示(msg 已在上方写入 runAccumulatedMsgs) + } else { + if dup != "" { + pendingExecuteStdoutDup = "" + } + executeStdoutDupMu.Unlock() + if progress != nil { + progress("response_start", "", map[string]interface{}{ + "conversationId": conversationID, + "mcpExecutionIds": snapshotMCPIDs(), + "messageGeneratedBy": "eino:" + ev.AgentName, + "einoRole": "orchestrator", + "einoAgent": ev.AgentName, + "orchestration": orchMode, + }) + progress("response_delta", body, map[string]interface{}{ + "conversationId": conversationID, + "mcpExecutionIds": snapshotMCPIDs(), + "einoRole": "orchestrator", + "einoAgent": ev.AgentName, + "orchestration": orchMode, + }) + } + lastAssistant = body + if orchMode == "plan_execute" && strings.EqualFold(strings.TrimSpace(ev.AgentName), "executor") { + lastPlanExecuteExecutor = UnwrapPlanExecuteUserText(body) + } } } else if progress != nil { progress("eino_agent_reply", body, map[string]interface{}{ @@ -702,12 +813,15 @@ func runEinoADKAgentLoop(ctx context.Context, args *einoADKRunLoopArgs, baseMsgs break } } - } else { - removePendingByID(toolCallID) } if toolCallID != "" { + removePendingByID(toolCallID) + if _, loaded := toolResultSent.LoadOrStore(toolCallID, struct{}{}); loaded { + continue + } data["toolCallId"] = toolCallID } + recordPendingExecuteStdoutDup(toolName, content, isErr) progress("tool_result", fmt.Sprintf("工具结果 (%s)", toolName), data) } } diff --git a/internal/multiagent/eino_execute_monitor.go b/internal/multiagent/eino_execute_monitor.go new file mode 100644 index 00000000..d2d5bca5 --- /dev/null +++ b/internal/multiagent/eino_execute_monitor.go @@ -0,0 +1,31 @@ +package multiagent + +import ( + "fmt" + + "cyberstrike-ai/internal/agent" + "cyberstrike-ai/internal/einomcp" +) + +// newEinoExecuteMonitorCallback 在 Eino filesystem execute 结束时写入 MCP 监控库并 recorder(executionId), +// 与 CallTool 路径一致,供助手消息展示「渗透测试详情」芯片。 +func newEinoExecuteMonitorCallback(ag *agent.Agent, recorder einomcp.ExecutionRecorder) func(command, stdout string, success bool, invokeErr error) { + return func(command, stdout string, success bool, invokeErr error) { + if ag == nil || recorder == nil { + return + } + var err error + if !success { + if invokeErr != nil { + err = invokeErr + } else { + err = fmt.Errorf("execute failed") + } + } + args := map[string]interface{}{"command": command} + id := ag.RecordLocalToolExecution("execute", args, stdout, err) + if id != "" { + recorder(id) + } + } +} diff --git a/internal/multiagent/eino_execute_streaming_wrap.go b/internal/multiagent/eino_execute_streaming_wrap.go index 0824b777..392739b5 100644 --- a/internal/multiagent/eino_execute_streaming_wrap.go +++ b/internal/multiagent/eino_execute_streaming_wrap.go @@ -2,11 +2,16 @@ package multiagent import ( "context" + "errors" "fmt" + "io" + "strings" + "cyberstrike-ai/internal/einomcp" "cyberstrike-ai/internal/security" "github.com/cloudwego/eino/adk/filesystem" + "github.com/cloudwego/eino/compose" "github.com/cloudwego/eino/schema" ) @@ -14,8 +19,15 @@ import ( // 官方 execute 工具默认走 ExecuteStreaming 且不设 RunInBackendGround;末尾带 & 时子进程仍与管道相连, // streamStdout 按行读取会在无换行输出时长时间阻塞(与 MCP 工具 exec 的独立实现不同)。 // 对「完全后台」命令自动开启 RunInBackendGround,与 local.runCmdInBackground 行为对齐。 +// +// 使用 Pipe 将内层流转发给调用方:在 inner EOF 后、关闭 Pipe 前同步调用 ToolInvokeNotify.Fire, +// 保证 run loop 在模型开始下一轮输出前已记录 execute 结果(用于 UI 与「重复助手复述」去重)。 type einoStreamingShellWrap struct { - inner filesystem.StreamingShell + inner filesystem.StreamingShell + invokeNotify *einomcp.ToolInvokeNotifyHolder + einoAgentName string + // recordMonitor 在 execute 流结束后写入 tool_executions 并 recorder(executionId),使「渗透测试详情」与常规 MCP 一致。 + recordMonitor func(command, stdout string, success bool, invokeErr error) } func (w *einoStreamingShellWrap) ExecuteStreaming(ctx context.Context, input *filesystem.ExecuteRequest) (*schema.StreamReader[*filesystem.ExecuteResponse], error) { @@ -26,8 +38,73 @@ func (w *einoStreamingShellWrap) ExecuteStreaming(ctx context.Context, input *fi return w.inner.ExecuteStreaming(ctx, nil) } req := *input + cmd := strings.TrimSpace(req.Command) if security.IsBackgroundShellCommand(req.Command) && !req.RunInBackendGround { req.RunInBackendGround = true } - return w.inner.ExecuteStreaming(ctx, &req) + sr, err := w.inner.ExecuteStreaming(ctx, &req) + if err != nil { + return nil, err + } + tid := strings.TrimSpace(compose.GetToolCallID(ctx)) + if sr == nil || w.invokeNotify == nil || tid == "" { + return sr, nil + } + + outR, outW := schema.Pipe[*filesystem.ExecuteResponse](32) + agentTag := strings.TrimSpace(w.einoAgentName) + + go func(inner *schema.StreamReader[*filesystem.ExecuteResponse], command string) { + defer inner.Close() + + var sb strings.Builder + const maxCapture = 16 * 1024 + success := true + var invokeErr error + exitCode := 0 + hasExitCode := false + + for { + resp, rerr := inner.Recv() + if errors.Is(rerr, io.EOF) { + break + } + if rerr != nil { + success = false + invokeErr = rerr + _ = outW.Send(nil, rerr) + break + } + if resp != nil { + if resp.ExitCode != nil { + hasExitCode = true + exitCode = *resp.ExitCode + } + if remain := maxCapture - sb.Len(); remain > 0 { + out := resp.Output + if len(out) > remain { + out = out[:remain] + } + sb.WriteString(out) + } + if outW.Send(resp, nil) { + success = false + invokeErr = fmt.Errorf("execute stream closed by consumer") + break + } + } + } + + if success && hasExitCode && exitCode != 0 { + success = false + invokeErr = fmt.Errorf("execute exited with code %d", exitCode) + } + if w.recordMonitor != nil { + w.recordMonitor(command, sb.String(), success, invokeErr) + } + w.invokeNotify.Fire(tid, "execute", agentTag, success, sb.String(), invokeErr) + outW.Close() + }(sr, cmd) + + return outR, nil } diff --git a/internal/multiagent/eino_single_runner.go b/internal/multiagent/eino_single_runner.go index 0dd18efc..a586fce3 100644 --- a/internal/multiagent/eino_single_runner.go +++ b/internal/multiagent/eino_single_runner.go @@ -86,8 +86,10 @@ func RunEinoSingleChatModelAgent( }) } + toolInvokeNotify := einomcp.NewToolInvokeNotifyHolder() + einoExecMonitor := newEinoExecuteMonitorCallback(ag, recorder) mainDefs := ag.ToolsForRole(roleTools) - mainTools, err := einomcp.ToolsFromDefinitions(ag, holder, mainDefs, recorder, toolOutputChunk) + mainTools, err := einomcp.ToolsFromDefinitions(ag, holder, mainDefs, recorder, toolOutputChunk, toolInvokeNotify, einoSingleAgentName) if err != nil { return nil, err } @@ -136,7 +138,7 @@ func RunEinoSingleChatModelAgent( } if einoSkillMW != nil { if einoFSTools && einoLoc != nil { - fsMw, fsErr := subAgentFilesystemMiddleware(ctx, einoLoc) + fsMw, fsErr := subAgentFilesystemMiddleware(ctx, einoLoc, toolInvokeNotify, einoSingleAgentName, einoExecMonitor) if fsErr != nil { return nil, fmt.Errorf("eino single filesystem 中间件: %w", fsErr) } @@ -232,6 +234,7 @@ func RunEinoSingleChatModelAgent( CheckpointDir: ma.EinoMiddleware.CheckpointDir, McpIDsMu: &mcpIDsMu, McpIDs: &mcpIDs, + ToolInvokeNotify: toolInvokeNotify, DA: chatAgent, EmptyResponseMessage: "(Eino ADK single-agent session completed but no assistant text was captured. Check process details or logs.) " + "(Eino ADK 单代理会话已完成,但未捕获到助手文本输出。请查看过程详情或日志。)", diff --git a/internal/multiagent/eino_skills.go b/internal/multiagent/eino_skills.go index df367613..5a0274ac 100644 --- a/internal/multiagent/eino_skills.go +++ b/internal/multiagent/eino_skills.go @@ -8,6 +8,7 @@ import ( "strings" "cyberstrike-ai/internal/config" + "cyberstrike-ai/internal/einomcp" localbk "github.com/cloudwego/eino-ext/adk/backend/local" "github.com/cloudwego/eino/adk" @@ -75,12 +76,23 @@ func prepareEinoSkills( // subAgentFilesystemMiddleware returns filesystem middleware for a sub-agent when Deep itself // does not set Backend (fsTools false on orchestrator) but we still want tools on subs — not used; // when orchestrator has Backend, builtin FS is only on outer agent; subs need explicit FS for parity. -func subAgentFilesystemMiddleware(ctx context.Context, loc *localbk.Local) (adk.ChatModelAgentMiddleware, error) { +func subAgentFilesystemMiddleware( + ctx context.Context, + loc *localbk.Local, + invokeNotify *einomcp.ToolInvokeNotifyHolder, + einoAgentName string, + recordMonitor func(command, stdout string, success bool, invokeErr error), +) (adk.ChatModelAgentMiddleware, error) { if loc == nil { return nil, nil } return filesystem.New(ctx, &filesystem.MiddlewareConfig{ - Backend: loc, - StreamingShell: &einoStreamingShellWrap{inner: loc}, + Backend: loc, + StreamingShell: &einoStreamingShellWrap{ + inner: loc, + invokeNotify: invokeNotify, + einoAgentName: strings.TrimSpace(einoAgentName), + recordMonitor: recordMonitor, + }, }) } diff --git a/internal/multiagent/runner.go b/internal/multiagent/runner.go index c3ed3a88..3483ebeb 100644 --- a/internal/multiagent/runner.go +++ b/internal/multiagent/runner.go @@ -110,6 +110,7 @@ func RunDeepAgent( mcpIDs = append(mcpIDs, id) mcpIDsMu.Unlock() } + einoExecMonitor := newEinoExecuteMonitorCallback(ag, recorder) // 与单代理流式一致:在 response_start / response_delta 的 data 中带当前 mcpExecutionIds,供主聊天绑定复制与展示。 snapshotMCPIDs := func() []string { @@ -120,6 +121,7 @@ func RunDeepAgent( return out } + toolInvokeNotify := einomcp.NewToolInvokeNotifyHolder() mainDefs := ag.ToolsForRole(roleTools) toolOutputChunk := func(toolName, toolCallID, chunk string) { // When toolCallId is missing, frontend ignores tool_result_delta. @@ -137,16 +139,6 @@ func RunDeepAgent( }) } - mainTools, err := einomcp.ToolsFromDefinitions(ag, holder, mainDefs, recorder, toolOutputChunk) - if err != nil { - return nil, err - } - - mainToolsForCfg, mainOrchestratorPre, err := prependEinoMiddlewares(ctx, &ma.EinoMiddleware, einoMWMain, mainTools, einoLoc, skillsRoot, conversationID, logger) - if err != nil { - return nil, err - } - httpClient := &http.Client{ Timeout: 30 * time.Minute, Transport: &http.Transport{ @@ -222,7 +214,7 @@ func RunDeepAgent( } subDefs := ag.ToolsForRole(roleTools) - subTools, err := einomcp.ToolsFromDefinitions(ag, holder, subDefs, recorder, toolOutputChunk) + subTools, err := einomcp.ToolsFromDefinitions(ag, holder, subDefs, recorder, toolOutputChunk, toolInvokeNotify, id) if err != nil { return nil, fmt.Errorf("子代理 %q 工具: %w", id, err) } @@ -248,7 +240,7 @@ func RunDeepAgent( } if einoSkillMW != nil { if einoFSTools && einoLoc != nil { - subFs, fsErr := subAgentFilesystemMiddleware(ctx, einoLoc) + subFs, fsErr := subAgentFilesystemMiddleware(ctx, einoLoc, toolInvokeNotify, id, einoExecMonitor) if fsErr != nil { return nil, fmt.Errorf("子代理 %q filesystem 中间件: %w", id, fsErr) } @@ -338,6 +330,16 @@ func RunDeepAgent( orchDescription = d } } + + mainTools, err := einomcp.ToolsFromDefinitions(ag, holder, mainDefs, recorder, toolOutputChunk, toolInvokeNotify, orchestratorName) + if err != nil { + return nil, err + } + mainToolsForCfg, mainOrchestratorPre, err := prependEinoMiddlewares(ctx, &ma.EinoMiddleware, einoMWMain, mainTools, einoLoc, skillsRoot, conversationID, logger) + if err != nil { + return nil, err + } + orchInstruction = injectToolNamesOnlyInstruction(ctx, orchInstruction, mainTools) if logger != nil { mainNames := collectToolNames(ctx, mainTools) @@ -381,7 +383,12 @@ func RunDeepAgent( var deepShell filesystem.StreamingShell if einoLoc != nil && einoFSTools { deepBackend = einoLoc - deepShell = einoLoc + deepShell = &einoStreamingShellWrap{ + inner: einoLoc, + invokeNotify: toolInvokeNotify, + einoAgentName: orchestratorName, + recordMonitor: einoExecMonitor, + } } // noNestedTaskMiddleware 必须在最外层(最先拦截),防止 skill 或其他中间件内部触发 task 调用绕过检测。 @@ -438,7 +445,7 @@ func RunDeepAgent( // 构建 filesystem 中间件(与 Deep sub-agent 一致) var peFsMw adk.ChatModelAgentMiddleware if einoSkillMW != nil && einoFSTools && einoLoc != nil { - peFsMw, err = subAgentFilesystemMiddleware(ctx, einoLoc) + peFsMw, err = subAgentFilesystemMiddleware(ctx, einoLoc, toolInvokeNotify, "executor", einoExecMonitor) if err != nil { return nil, fmt.Errorf("plan_execute filesystem 中间件: %w", err) } @@ -560,6 +567,7 @@ func RunDeepAgent( CheckpointDir: ma.EinoMiddleware.CheckpointDir, McpIDsMu: &mcpIDsMu, McpIDs: &mcpIDs, + ToolInvokeNotify: toolInvokeNotify, DA: da, EmptyResponseMessage: "(Eino multi-agent orchestration completed but no assistant text was captured. Check process details or logs.) " + "(Eino 多代理编排已完成,但未捕获到助手文本输出。请查看过程详情或日志。)", diff --git a/internal/openai/normalize_streaming_delta_test.go b/internal/openai/normalize_streaming_delta_test.go new file mode 100644 index 00000000..6959b590 --- /dev/null +++ b/internal/openai/normalize_streaming_delta_test.go @@ -0,0 +1,56 @@ +package openai + +import "testing" + +func TestNormalizeStreamingDelta_RepeatedCharBoundary(t *testing.T) { + // 流式在重复数字边界分片:不得把 "43" 的首字符与 "194" 尾字符误合并。 + cur, d := normalizeStreamingDelta("https://x:194", "43") + if want := "https://x:19443"; cur != want { + t.Fatalf("next: want %q got %q", want, cur) + } + if d != "43" { + t.Fatalf("delta: want %q got %q", "43", d) + } +} + +func TestNormalizeStreamingDelta_CumulativePrefix(t *testing.T) { + cur, d := normalizeStreamingDelta("今天", "今天天气") + if cur != "今天天气" || d != "天气" { + t.Fatalf("got cur=%q d=%q", cur, d) + } +} + +func TestNormalizeStreamingDelta_FullRetransmit(t *testing.T) { + cur, d := normalizeStreamingDelta("今天", "今天") + if d != "" || cur != "今天" { + t.Fatalf("got cur=%q d=%q", cur, d) + } +} + +func TestNormalizeStreamingDelta_SingleRuneRepeated(t *testing.T) { + cur, d := normalizeStreamingDelta("呀", "呀") + if want := "呀呀"; cur != want { + t.Fatalf("next: want %q got %q", want, cur) + } + if d != "呀" { + t.Fatalf("delta: want %q got %q", "呀", d) + } + cur, d = normalizeStreamingDelta("4", "4") + if want := "44"; cur != want { + t.Fatalf("next: want %q got %q", want, cur) + } + if d != "4" { + t.Fatalf("delta: want %q got %q", "4", d) + } +} + +func TestNormalizeStreamingDelta_CumulativeExtendsNumber(t *testing.T) { + // 已缓冲 "194" 后收到累计串 "19443"(注意 "1943" 并非 "19443" 的前缀,不能靠误写的中间态测 HasPrefix)。 + cur, d := normalizeStreamingDelta("194", "19443") + if want := "19443"; cur != want { + t.Fatalf("next: want %q got %q", want, cur) + } + if d != "43" { + t.Fatalf("delta: want %q got %q", "43", d) + } +} diff --git a/internal/openai/openai.go b/internal/openai/openai.go index 46e0ca9e..6e452b0a 100644 --- a/internal/openai/openai.go +++ b/internal/openai/openai.go @@ -10,6 +10,7 @@ import ( "net/http" "strings" "time" + "unicode/utf8" "cyberstrike-ai/internal/config" @@ -34,7 +35,15 @@ func (e *APIError) Error() string { } // normalizeStreamingDelta 将可能是“累计片段/重发片段”的内容归一化为“纯增量”。 -// 部分兼容网关会返回累计 content;若直接 append 会出现重复文本(结巴)。 +// 部分兼容网关会返回累计 content;若直接 append 会出现重复文本。 +// +// 注意: +// - 不做「任意后缀与前缀重叠」合并;流式可能在重复字符边界分片("194"+"43"→"19443")。 +// - HasPrefix 仅在 incoming 严格长于 current 时视为累计全文,否则会把分片产生的第二个相同 +// 单字/单码点(叠字、44、22 等)误判为「整段重复」而吞字。 +// - incoming==current 仅当 current 长度 >1 个码点时才视为整包重发;单码点重复必须走拼接。 +// - 不再使用「current 以 incoming 结尾则丢弃」:否则 "1943"+"43" 会误吞增量(19443 显示成 1943)。 +// 若网关重复发送尾部片段,应重复送完整累计串,由 HasPrefix 分支去重。 func normalizeStreamingDelta(current, incoming string) (next, delta string) { if incoming == "" { return current, "" @@ -42,26 +51,12 @@ func normalizeStreamingDelta(current, incoming string) (next, delta string) { if current == "" { return incoming, incoming } - if incoming == current { - return current, "" - } - if strings.HasPrefix(incoming, current) { + if strings.HasPrefix(incoming, current) && len(incoming) > len(current) { return incoming, incoming[len(current):] } - if strings.HasSuffix(current, incoming) { + if incoming == current && utf8.RuneCountInString(current) > 1 { return current, "" } - - // 边界重叠:current 后缀与 incoming 前缀重合,仅追加非重叠部分。 - max := len(current) - if len(incoming) < max { - max = len(incoming) - } - for overlap := max; overlap > 0; overlap-- { - if current[len(current)-overlap:] == incoming[:overlap] { - return current + incoming[overlap:], incoming[overlap:] - } - } return current + incoming, incoming }