mirror of
https://github.com/Ed1s0nZ/CyberStrikeAI.git
synced 2026-05-15 04:51:01 +02:00
Add files via upload
This commit is contained in:
@@ -1909,6 +1909,15 @@ func (a *Agent) ExecuteMCPToolForConversation(ctx context.Context, conversationI
|
||||
return a.executeToolViaMCP(ctx, toolName, args)
|
||||
}
|
||||
|
||||
// RecordLocalToolExecution 将非 CallTool 路径完成的工具调用写入 MCP 监控库(与 CallTool 落库一致),返回 executionId。
|
||||
// 用于 Eino filesystem execute 等场景,使助手气泡「渗透测试详情」与常规 MCP 一致可点进监控。
|
||||
func (a *Agent) RecordLocalToolExecution(toolName string, args map[string]interface{}, resultText string, invokeErr error) string {
|
||||
if a == nil || a.mcpServer == nil {
|
||||
return ""
|
||||
}
|
||||
return a.mcpServer.RecordCompletedToolInvocation(toolName, args, resultText, invokeErr)
|
||||
}
|
||||
|
||||
// CancelMCPToolExecutionWithNote 取消一次进行中的 MCP 工具(先内部后外部),与监控页「终止工具」一致;note 非空时合并进返回给模型的文本。
|
||||
func (a *Agent) CancelMCPToolExecutionWithNote(executionID, note string) bool {
|
||||
executionID = strings.TrimSpace(executionID)
|
||||
|
||||
@@ -23,12 +23,16 @@ type ExecutionRecorder func(executionID string)
|
||||
const ToolErrorPrefix = "__CYBERSTRIKE_AI_TOOL_ERROR__\n"
|
||||
|
||||
// ToolsFromDefinitions 将单 Agent 使用的 OpenAI 风格工具定义转为 Eino InvokableTool,执行时走 Agent 的 MCP 路径。
|
||||
// invokeNotify 可选:与 runEinoADKAgentLoop 共享,在 InvokableRun 返回时触发 UI 与 pending 清理(与 ADK Tool 事件去重)。
|
||||
// einoAgentName 为该套工具所属 ChatModelAgent 的 Name(主代理或子代理 id),用于 SSE 上的 einoAgent 字段。
|
||||
func ToolsFromDefinitions(
|
||||
ag *agent.Agent,
|
||||
holder *ConversationHolder,
|
||||
defs []agent.Tool,
|
||||
rec ExecutionRecorder,
|
||||
toolOutputChunk func(toolName, toolCallID, chunk string),
|
||||
invokeNotify *ToolInvokeNotifyHolder,
|
||||
einoAgentName string,
|
||||
) ([]tool.BaseTool, error) {
|
||||
out := make([]tool.BaseTool, 0, len(defs))
|
||||
for _, d := range defs {
|
||||
@@ -40,12 +44,14 @@ func ToolsFromDefinitions(
|
||||
return nil, fmt.Errorf("tool %q: %w", d.Function.Name, err)
|
||||
}
|
||||
out = append(out, &mcpBridgeTool{
|
||||
info: info,
|
||||
name: d.Function.Name,
|
||||
agent: ag,
|
||||
holder: holder,
|
||||
record: rec,
|
||||
chunk: toolOutputChunk,
|
||||
info: info,
|
||||
name: d.Function.Name,
|
||||
agent: ag,
|
||||
holder: holder,
|
||||
record: rec,
|
||||
chunk: toolOutputChunk,
|
||||
invokeNotify: invokeNotify,
|
||||
einoAgentName: strings.TrimSpace(einoAgentName),
|
||||
})
|
||||
}
|
||||
return out, nil
|
||||
@@ -77,12 +83,14 @@ func toolInfoFromDefinition(d agent.Tool) (*schema.ToolInfo, error) {
|
||||
}
|
||||
|
||||
type mcpBridgeTool struct {
|
||||
info *schema.ToolInfo
|
||||
name string
|
||||
agent *agent.Agent
|
||||
holder *ConversationHolder
|
||||
record ExecutionRecorder
|
||||
chunk func(toolName, toolCallID, chunk string)
|
||||
info *schema.ToolInfo
|
||||
name string
|
||||
agent *agent.Agent
|
||||
holder *ConversationHolder
|
||||
record ExecutionRecorder
|
||||
chunk func(toolName, toolCallID, chunk string)
|
||||
invokeNotify *ToolInvokeNotifyHolder
|
||||
einoAgentName string
|
||||
}
|
||||
|
||||
func (m *mcpBridgeTool) Info(ctx context.Context) (*schema.ToolInfo, error) {
|
||||
@@ -90,8 +98,27 @@ func (m *mcpBridgeTool) Info(ctx context.Context) (*schema.ToolInfo, error) {
|
||||
return m.info, nil
|
||||
}
|
||||
|
||||
func (m *mcpBridgeTool) InvokableRun(ctx context.Context, argumentsInJSON string, opts ...tool.Option) (string, error) {
|
||||
func (m *mcpBridgeTool) InvokableRun(ctx context.Context, argumentsInJSON string, opts ...tool.Option) (out string, err error) {
|
||||
_ = opts
|
||||
toolCallID := compose.GetToolCallID(ctx)
|
||||
defer func() {
|
||||
if m.invokeNotify == nil {
|
||||
return
|
||||
}
|
||||
tid := strings.TrimSpace(toolCallID)
|
||||
if tid == "" {
|
||||
return
|
||||
}
|
||||
success := err == nil && !strings.HasPrefix(out, ToolErrorPrefix)
|
||||
body := out
|
||||
if err != nil {
|
||||
success = false
|
||||
} else if strings.HasPrefix(out, ToolErrorPrefix) {
|
||||
success = false
|
||||
body = strings.TrimPrefix(out, ToolErrorPrefix)
|
||||
}
|
||||
m.invokeNotify.Fire(tid, m.name, m.einoAgentName, success, body, err)
|
||||
}()
|
||||
return runMCPToolInvocation(ctx, m.agent, m.holder, m.name, argumentsInJSON, m.record, m.chunk)
|
||||
}
|
||||
|
||||
|
||||
@@ -0,0 +1,39 @@
|
||||
package einomcp
|
||||
|
||||
import "sync"
|
||||
|
||||
// ToolInvokeNotifyHolder 由 Eino run loop 在迭代开始前 Set 回调;MCP 桥在每次 InvokableRun 结束时 Fire,
|
||||
// 用于在 ADK 未透出 schema.Tool 事件时仍推送 tool_result、清 pending,避免 UI 卡在「执行中」或迭代末 force-close。
|
||||
type ToolInvokeNotifyHolder struct {
|
||||
mu sync.RWMutex
|
||||
fn func(toolCallID, toolName, einoAgent string, success bool, content string, invokeErr error)
|
||||
}
|
||||
|
||||
// NewToolInvokeNotifyHolder 创建可在 ToolsFromDefinitions 与 run loop 之间共享的 holder。
|
||||
func NewToolInvokeNotifyHolder() *ToolInvokeNotifyHolder {
|
||||
return &ToolInvokeNotifyHolder{}
|
||||
}
|
||||
|
||||
// Set 由 runEinoADKAgentLoop 在开始消费 iter 之前调用;可多次覆盖(通常仅一次)。
|
||||
func (h *ToolInvokeNotifyHolder) Set(fn func(toolCallID, toolName, einoAgent string, success bool, content string, invokeErr error)) {
|
||||
if h == nil {
|
||||
return
|
||||
}
|
||||
h.mu.Lock()
|
||||
defer h.mu.Unlock()
|
||||
h.fn = fn
|
||||
}
|
||||
|
||||
// Fire 由 mcpBridgeTool 在工具调用返回时调用;若尚未 Set 或 toolCallID 为空则忽略。
|
||||
func (h *ToolInvokeNotifyHolder) Fire(toolCallID, toolName, einoAgent string, success bool, content string, invokeErr error) {
|
||||
if h == nil {
|
||||
return
|
||||
}
|
||||
h.mu.RLock()
|
||||
fn := h.fn
|
||||
h.mu.RUnlock()
|
||||
if fn == nil {
|
||||
return
|
||||
}
|
||||
fn(toolCallID, toolName, einoAgent, success, content, invokeErr)
|
||||
}
|
||||
@@ -11,6 +11,7 @@ import (
|
||||
"strings"
|
||||
"sync"
|
||||
"sync/atomic"
|
||||
"unicode/utf8"
|
||||
|
||||
"cyberstrike-ai/internal/einomcp"
|
||||
|
||||
@@ -20,7 +21,9 @@ import (
|
||||
)
|
||||
|
||||
// normalizeStreamingDelta 将可能是“累计片段”的 chunk 归一化为“纯增量”。
|
||||
// 一些模型/桥接层在流式过程中会重复发送已输出前缀,前端若直接 buffer+=chunk 会出现“结巴”重复。
|
||||
// 一些模型/桥接层在流式过程中会重复发送已输出前缀,前端若直接 buffer+=chunk 会出现重复文本。
|
||||
//
|
||||
// 注意:与 internal/openai.normalizeStreamingDelta 保持一致。
|
||||
func normalizeStreamingDelta(current, incoming string) (next, delta string) {
|
||||
if incoming == "" {
|
||||
return current, ""
|
||||
@@ -28,28 +31,12 @@ func normalizeStreamingDelta(current, incoming string) (next, delta string) {
|
||||
if current == "" {
|
||||
return incoming, incoming
|
||||
}
|
||||
if incoming == current {
|
||||
return current, ""
|
||||
}
|
||||
// incoming 是累计全文(包含 current 前缀)
|
||||
if strings.HasPrefix(incoming, current) {
|
||||
if strings.HasPrefix(incoming, current) && len(incoming) > len(current) {
|
||||
return incoming, incoming[len(current):]
|
||||
}
|
||||
// incoming 完全是已输出尾部重发
|
||||
if strings.HasSuffix(current, incoming) {
|
||||
if incoming == current && utf8.RuneCountInString(current) > 1 {
|
||||
return current, ""
|
||||
}
|
||||
|
||||
// 处理边界重叠:current 后缀与 incoming 前缀重叠,只追加非重叠部分。
|
||||
max := len(current)
|
||||
if len(incoming) < max {
|
||||
max = len(incoming)
|
||||
}
|
||||
for overlap := max; overlap > 0; overlap-- {
|
||||
if current[len(current)-overlap:] == incoming[:overlap] {
|
||||
return current + incoming[overlap:], incoming[overlap:]
|
||||
}
|
||||
}
|
||||
return current + incoming, incoming
|
||||
}
|
||||
|
||||
@@ -83,6 +70,9 @@ type einoADKRunLoopArgs struct {
|
||||
McpIDsMu *sync.Mutex
|
||||
McpIDs *[]string
|
||||
|
||||
// ToolInvokeNotify 与 einomcp.ToolsFromDefinitions 共享:run loop 在迭代前 Set,MCP 桥 Fire 以补全 tool_result。
|
||||
ToolInvokeNotify *einomcp.ToolInvokeNotifyHolder
|
||||
|
||||
DA adk.Agent
|
||||
|
||||
// EmptyResponseMessage 当未捕获到助手正文时的占位(多代理与单代理文案不同)。
|
||||
@@ -224,6 +214,63 @@ func runEinoADKAgentLoop(ctx context.Context, args *einoADKRunLoopArgs, baseMsgs
|
||||
pendingQueueByAgent = make(map[string][]string)
|
||||
}
|
||||
|
||||
// 最近一次成功的 Eino filesystem execute 的标准输出(trim):用于抑制模型紧接着复述同一字符串时的重复「助手输出」时间线。
|
||||
var executeStdoutDupMu sync.Mutex
|
||||
var pendingExecuteStdoutDup string
|
||||
recordPendingExecuteStdoutDup := func(toolName, stdout string, isErr bool) {
|
||||
if isErr || !strings.EqualFold(strings.TrimSpace(toolName), "execute") {
|
||||
return
|
||||
}
|
||||
t := strings.TrimSpace(stdout)
|
||||
if t == "" {
|
||||
return
|
||||
}
|
||||
executeStdoutDupMu.Lock()
|
||||
pendingExecuteStdoutDup = t
|
||||
executeStdoutDupMu.Unlock()
|
||||
}
|
||||
|
||||
var toolResultSent sync.Map // toolCallID -> struct{};与 ADK Tool 消息去重,避免 bridge 与事件流各推一次
|
||||
if args.ToolInvokeNotify != nil {
|
||||
args.ToolInvokeNotify.Set(func(toolCallID, toolName, einoAgent string, success bool, content string, invokeErr error) {
|
||||
tid := strings.TrimSpace(toolCallID)
|
||||
removePendingByID(tid)
|
||||
if tid == "" || progress == nil {
|
||||
return
|
||||
}
|
||||
if _, loaded := toolResultSent.LoadOrStore(tid, struct{}{}); loaded {
|
||||
return
|
||||
}
|
||||
isErr := !success || invokeErr != nil
|
||||
body := content
|
||||
if invokeErr != nil {
|
||||
body = invokeErr.Error()
|
||||
isErr = true
|
||||
}
|
||||
recordPendingExecuteStdoutDup(toolName, body, isErr)
|
||||
preview := body
|
||||
if len(preview) > 200 {
|
||||
preview = preview[:200] + "..."
|
||||
}
|
||||
agentTag := strings.TrimSpace(einoAgent)
|
||||
if agentTag == "" {
|
||||
agentTag = orchestratorName
|
||||
}
|
||||
progress("tool_result", fmt.Sprintf("工具结果 (%s)", toolName), map[string]interface{}{
|
||||
"toolName": toolName,
|
||||
"success": !isErr,
|
||||
"isError": isErr,
|
||||
"result": body,
|
||||
"resultPreview": preview,
|
||||
"toolCallId": tid,
|
||||
"conversationId": conversationID,
|
||||
"einoAgent": agentTag,
|
||||
"einoRole": einoRoleTag(agentTag),
|
||||
"source": "eino",
|
||||
})
|
||||
})
|
||||
}
|
||||
|
||||
runnerCfg := adk.RunnerConfig{
|
||||
Agent: da,
|
||||
EnableStreaming: true,
|
||||
@@ -467,6 +514,7 @@ func runEinoADKAgentLoop(ctx context.Context, args *einoADKRunLoopArgs, baseMsgs
|
||||
var subAssistantBuf string
|
||||
var subReplyStreamID string
|
||||
var mainAssistantBuf string
|
||||
var mainAssistDupTarget string // 非空表示本段主助手流需缓冲至 EOF,与 execute 输出比对去重
|
||||
var reasoningBuf string
|
||||
var streamRecvErr error
|
||||
for {
|
||||
@@ -511,24 +559,35 @@ func runEinoADKAgentLoop(ctx context.Context, args *einoADKRunLoopArgs, baseMsgs
|
||||
var contentDelta string
|
||||
mainAssistantBuf, contentDelta = normalizeStreamingDelta(mainAssistantBuf, chunk.Content)
|
||||
if contentDelta != "" {
|
||||
if !streamHeaderSent {
|
||||
progress("response_start", "", map[string]interface{}{
|
||||
"conversationId": conversationID,
|
||||
"mcpExecutionIds": snapshotMCPIDs(),
|
||||
"messageGeneratedBy": "eino:" + ev.AgentName,
|
||||
"einoRole": "orchestrator",
|
||||
"einoAgent": ev.AgentName,
|
||||
"orchestration": orchMode,
|
||||
})
|
||||
streamHeaderSent = true
|
||||
if mainAssistDupTarget == "" {
|
||||
executeStdoutDupMu.Lock()
|
||||
if pendingExecuteStdoutDup != "" {
|
||||
mainAssistDupTarget = pendingExecuteStdoutDup
|
||||
}
|
||||
executeStdoutDupMu.Unlock()
|
||||
}
|
||||
if mainAssistDupTarget != "" {
|
||||
// 已展示过 tool_result,缓冲全文;EOF 后与 execute 输出相同则不再发助手流
|
||||
} else {
|
||||
if !streamHeaderSent {
|
||||
progress("response_start", "", map[string]interface{}{
|
||||
"conversationId": conversationID,
|
||||
"mcpExecutionIds": snapshotMCPIDs(),
|
||||
"messageGeneratedBy": "eino:" + ev.AgentName,
|
||||
"einoRole": "orchestrator",
|
||||
"einoAgent": ev.AgentName,
|
||||
"orchestration": orchMode,
|
||||
})
|
||||
streamHeaderSent = true
|
||||
}
|
||||
progress("response_delta", contentDelta, map[string]interface{}{
|
||||
"conversationId": conversationID,
|
||||
"mcpExecutionIds": snapshotMCPIDs(),
|
||||
"einoRole": "orchestrator",
|
||||
"einoAgent": ev.AgentName,
|
||||
"orchestration": orchMode,
|
||||
})
|
||||
}
|
||||
progress("response_delta", contentDelta, map[string]interface{}{
|
||||
"conversationId": conversationID,
|
||||
"mcpExecutionIds": snapshotMCPIDs(),
|
||||
"einoRole": "orchestrator",
|
||||
"einoAgent": ev.AgentName,
|
||||
"orchestration": orchMode,
|
||||
})
|
||||
}
|
||||
} else if !streamsMainAssistant(ev.AgentName) {
|
||||
var subDelta string
|
||||
@@ -558,7 +617,43 @@ func runEinoADKAgentLoop(ctx context.Context, args *einoADKRunLoopArgs, baseMsgs
|
||||
}
|
||||
}
|
||||
if streamsMainAssistant(ev.AgentName) {
|
||||
if s := strings.TrimSpace(mainAssistantBuf); s != "" {
|
||||
s := strings.TrimSpace(mainAssistantBuf)
|
||||
if mainAssistDupTarget != "" {
|
||||
executeStdoutDupMu.Lock()
|
||||
pendingExecuteStdoutDup = ""
|
||||
executeStdoutDupMu.Unlock()
|
||||
if s != "" && s == mainAssistDupTarget {
|
||||
// 与刚展示的 execute 结果完全一致:不再发助手流式事件,仍写入轨迹与最终回复字段
|
||||
lastAssistant = s
|
||||
runAccumulatedMsgs = append(runAccumulatedMsgs, schema.AssistantMessage(s, nil))
|
||||
if orchMode == "plan_execute" && strings.EqualFold(strings.TrimSpace(ev.AgentName), "executor") {
|
||||
lastPlanExecuteExecutor = UnwrapPlanExecuteUserText(s)
|
||||
}
|
||||
} else if s != "" {
|
||||
if progress != nil {
|
||||
progress("response_start", "", map[string]interface{}{
|
||||
"conversationId": conversationID,
|
||||
"mcpExecutionIds": snapshotMCPIDs(),
|
||||
"messageGeneratedBy": "eino:" + ev.AgentName,
|
||||
"einoRole": "orchestrator",
|
||||
"einoAgent": ev.AgentName,
|
||||
"orchestration": orchMode,
|
||||
})
|
||||
progress("response_delta", s, map[string]interface{}{
|
||||
"conversationId": conversationID,
|
||||
"mcpExecutionIds": snapshotMCPIDs(),
|
||||
"einoRole": "orchestrator",
|
||||
"einoAgent": ev.AgentName,
|
||||
"orchestration": orchMode,
|
||||
})
|
||||
}
|
||||
lastAssistant = s
|
||||
runAccumulatedMsgs = append(runAccumulatedMsgs, schema.AssistantMessage(s, nil))
|
||||
if orchMode == "plan_execute" && strings.EqualFold(strings.TrimSpace(ev.AgentName), "executor") {
|
||||
lastPlanExecuteExecutor = UnwrapPlanExecuteUserText(s)
|
||||
}
|
||||
}
|
||||
} else if s != "" {
|
||||
lastAssistant = s
|
||||
runAccumulatedMsgs = append(runAccumulatedMsgs, schema.AssistantMessage(s, nil))
|
||||
if orchMode == "plan_execute" && strings.EqualFold(strings.TrimSpace(ev.AgentName), "executor") {
|
||||
@@ -627,26 +722,42 @@ func runEinoADKAgentLoop(ctx context.Context, args *einoADKRunLoopArgs, baseMsgs
|
||||
body := strings.TrimSpace(msg.Content)
|
||||
if body != "" {
|
||||
if streamsMainAssistant(ev.AgentName) {
|
||||
if progress != nil {
|
||||
progress("response_start", "", map[string]interface{}{
|
||||
"conversationId": conversationID,
|
||||
"mcpExecutionIds": snapshotMCPIDs(),
|
||||
"messageGeneratedBy": "eino:" + ev.AgentName,
|
||||
"einoRole": "orchestrator",
|
||||
"einoAgent": ev.AgentName,
|
||||
"orchestration": orchMode,
|
||||
})
|
||||
progress("response_delta", body, map[string]interface{}{
|
||||
"conversationId": conversationID,
|
||||
"mcpExecutionIds": snapshotMCPIDs(),
|
||||
"einoRole": "orchestrator",
|
||||
"einoAgent": ev.AgentName,
|
||||
"orchestration": orchMode,
|
||||
})
|
||||
}
|
||||
lastAssistant = body
|
||||
if orchMode == "plan_execute" && strings.EqualFold(strings.TrimSpace(ev.AgentName), "executor") {
|
||||
lastPlanExecuteExecutor = UnwrapPlanExecuteUserText(body)
|
||||
executeStdoutDupMu.Lock()
|
||||
dup := pendingExecuteStdoutDup
|
||||
if dup != "" && body == dup {
|
||||
pendingExecuteStdoutDup = ""
|
||||
executeStdoutDupMu.Unlock()
|
||||
lastAssistant = body
|
||||
if orchMode == "plan_execute" && strings.EqualFold(strings.TrimSpace(ev.AgentName), "executor") {
|
||||
lastPlanExecuteExecutor = UnwrapPlanExecuteUserText(body)
|
||||
}
|
||||
// 非流式:与 execute 输出相同则跳过助手通道展示(msg 已在上方写入 runAccumulatedMsgs)
|
||||
} else {
|
||||
if dup != "" {
|
||||
pendingExecuteStdoutDup = ""
|
||||
}
|
||||
executeStdoutDupMu.Unlock()
|
||||
if progress != nil {
|
||||
progress("response_start", "", map[string]interface{}{
|
||||
"conversationId": conversationID,
|
||||
"mcpExecutionIds": snapshotMCPIDs(),
|
||||
"messageGeneratedBy": "eino:" + ev.AgentName,
|
||||
"einoRole": "orchestrator",
|
||||
"einoAgent": ev.AgentName,
|
||||
"orchestration": orchMode,
|
||||
})
|
||||
progress("response_delta", body, map[string]interface{}{
|
||||
"conversationId": conversationID,
|
||||
"mcpExecutionIds": snapshotMCPIDs(),
|
||||
"einoRole": "orchestrator",
|
||||
"einoAgent": ev.AgentName,
|
||||
"orchestration": orchMode,
|
||||
})
|
||||
}
|
||||
lastAssistant = body
|
||||
if orchMode == "plan_execute" && strings.EqualFold(strings.TrimSpace(ev.AgentName), "executor") {
|
||||
lastPlanExecuteExecutor = UnwrapPlanExecuteUserText(body)
|
||||
}
|
||||
}
|
||||
} else if progress != nil {
|
||||
progress("eino_agent_reply", body, map[string]interface{}{
|
||||
@@ -702,12 +813,15 @@ func runEinoADKAgentLoop(ctx context.Context, args *einoADKRunLoopArgs, baseMsgs
|
||||
break
|
||||
}
|
||||
}
|
||||
} else {
|
||||
removePendingByID(toolCallID)
|
||||
}
|
||||
if toolCallID != "" {
|
||||
removePendingByID(toolCallID)
|
||||
if _, loaded := toolResultSent.LoadOrStore(toolCallID, struct{}{}); loaded {
|
||||
continue
|
||||
}
|
||||
data["toolCallId"] = toolCallID
|
||||
}
|
||||
recordPendingExecuteStdoutDup(toolName, content, isErr)
|
||||
progress("tool_result", fmt.Sprintf("工具结果 (%s)", toolName), data)
|
||||
}
|
||||
}
|
||||
|
||||
@@ -0,0 +1,31 @@
|
||||
package multiagent
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
|
||||
"cyberstrike-ai/internal/agent"
|
||||
"cyberstrike-ai/internal/einomcp"
|
||||
)
|
||||
|
||||
// newEinoExecuteMonitorCallback 在 Eino filesystem execute 结束时写入 MCP 监控库并 recorder(executionId),
|
||||
// 与 CallTool 路径一致,供助手消息展示「渗透测试详情」芯片。
|
||||
func newEinoExecuteMonitorCallback(ag *agent.Agent, recorder einomcp.ExecutionRecorder) func(command, stdout string, success bool, invokeErr error) {
|
||||
return func(command, stdout string, success bool, invokeErr error) {
|
||||
if ag == nil || recorder == nil {
|
||||
return
|
||||
}
|
||||
var err error
|
||||
if !success {
|
||||
if invokeErr != nil {
|
||||
err = invokeErr
|
||||
} else {
|
||||
err = fmt.Errorf("execute failed")
|
||||
}
|
||||
}
|
||||
args := map[string]interface{}{"command": command}
|
||||
id := ag.RecordLocalToolExecution("execute", args, stdout, err)
|
||||
if id != "" {
|
||||
recorder(id)
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -2,11 +2,16 @@ package multiagent
|
||||
|
||||
import (
|
||||
"context"
|
||||
"errors"
|
||||
"fmt"
|
||||
"io"
|
||||
"strings"
|
||||
|
||||
"cyberstrike-ai/internal/einomcp"
|
||||
"cyberstrike-ai/internal/security"
|
||||
|
||||
"github.com/cloudwego/eino/adk/filesystem"
|
||||
"github.com/cloudwego/eino/compose"
|
||||
"github.com/cloudwego/eino/schema"
|
||||
)
|
||||
|
||||
@@ -14,8 +19,15 @@ import (
|
||||
// 官方 execute 工具默认走 ExecuteStreaming 且不设 RunInBackendGround;末尾带 & 时子进程仍与管道相连,
|
||||
// streamStdout 按行读取会在无换行输出时长时间阻塞(与 MCP 工具 exec 的独立实现不同)。
|
||||
// 对「完全后台」命令自动开启 RunInBackendGround,与 local.runCmdInBackground 行为对齐。
|
||||
//
|
||||
// 使用 Pipe 将内层流转发给调用方:在 inner EOF 后、关闭 Pipe 前同步调用 ToolInvokeNotify.Fire,
|
||||
// 保证 run loop 在模型开始下一轮输出前已记录 execute 结果(用于 UI 与「重复助手复述」去重)。
|
||||
type einoStreamingShellWrap struct {
|
||||
inner filesystem.StreamingShell
|
||||
inner filesystem.StreamingShell
|
||||
invokeNotify *einomcp.ToolInvokeNotifyHolder
|
||||
einoAgentName string
|
||||
// recordMonitor 在 execute 流结束后写入 tool_executions 并 recorder(executionId),使「渗透测试详情」与常规 MCP 一致。
|
||||
recordMonitor func(command, stdout string, success bool, invokeErr error)
|
||||
}
|
||||
|
||||
func (w *einoStreamingShellWrap) ExecuteStreaming(ctx context.Context, input *filesystem.ExecuteRequest) (*schema.StreamReader[*filesystem.ExecuteResponse], error) {
|
||||
@@ -26,8 +38,73 @@ func (w *einoStreamingShellWrap) ExecuteStreaming(ctx context.Context, input *fi
|
||||
return w.inner.ExecuteStreaming(ctx, nil)
|
||||
}
|
||||
req := *input
|
||||
cmd := strings.TrimSpace(req.Command)
|
||||
if security.IsBackgroundShellCommand(req.Command) && !req.RunInBackendGround {
|
||||
req.RunInBackendGround = true
|
||||
}
|
||||
return w.inner.ExecuteStreaming(ctx, &req)
|
||||
sr, err := w.inner.ExecuteStreaming(ctx, &req)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
tid := strings.TrimSpace(compose.GetToolCallID(ctx))
|
||||
if sr == nil || w.invokeNotify == nil || tid == "" {
|
||||
return sr, nil
|
||||
}
|
||||
|
||||
outR, outW := schema.Pipe[*filesystem.ExecuteResponse](32)
|
||||
agentTag := strings.TrimSpace(w.einoAgentName)
|
||||
|
||||
go func(inner *schema.StreamReader[*filesystem.ExecuteResponse], command string) {
|
||||
defer inner.Close()
|
||||
|
||||
var sb strings.Builder
|
||||
const maxCapture = 16 * 1024
|
||||
success := true
|
||||
var invokeErr error
|
||||
exitCode := 0
|
||||
hasExitCode := false
|
||||
|
||||
for {
|
||||
resp, rerr := inner.Recv()
|
||||
if errors.Is(rerr, io.EOF) {
|
||||
break
|
||||
}
|
||||
if rerr != nil {
|
||||
success = false
|
||||
invokeErr = rerr
|
||||
_ = outW.Send(nil, rerr)
|
||||
break
|
||||
}
|
||||
if resp != nil {
|
||||
if resp.ExitCode != nil {
|
||||
hasExitCode = true
|
||||
exitCode = *resp.ExitCode
|
||||
}
|
||||
if remain := maxCapture - sb.Len(); remain > 0 {
|
||||
out := resp.Output
|
||||
if len(out) > remain {
|
||||
out = out[:remain]
|
||||
}
|
||||
sb.WriteString(out)
|
||||
}
|
||||
if outW.Send(resp, nil) {
|
||||
success = false
|
||||
invokeErr = fmt.Errorf("execute stream closed by consumer")
|
||||
break
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
if success && hasExitCode && exitCode != 0 {
|
||||
success = false
|
||||
invokeErr = fmt.Errorf("execute exited with code %d", exitCode)
|
||||
}
|
||||
if w.recordMonitor != nil {
|
||||
w.recordMonitor(command, sb.String(), success, invokeErr)
|
||||
}
|
||||
w.invokeNotify.Fire(tid, "execute", agentTag, success, sb.String(), invokeErr)
|
||||
outW.Close()
|
||||
}(sr, cmd)
|
||||
|
||||
return outR, nil
|
||||
}
|
||||
|
||||
@@ -86,8 +86,10 @@ func RunEinoSingleChatModelAgent(
|
||||
})
|
||||
}
|
||||
|
||||
toolInvokeNotify := einomcp.NewToolInvokeNotifyHolder()
|
||||
einoExecMonitor := newEinoExecuteMonitorCallback(ag, recorder)
|
||||
mainDefs := ag.ToolsForRole(roleTools)
|
||||
mainTools, err := einomcp.ToolsFromDefinitions(ag, holder, mainDefs, recorder, toolOutputChunk)
|
||||
mainTools, err := einomcp.ToolsFromDefinitions(ag, holder, mainDefs, recorder, toolOutputChunk, toolInvokeNotify, einoSingleAgentName)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
@@ -136,7 +138,7 @@ func RunEinoSingleChatModelAgent(
|
||||
}
|
||||
if einoSkillMW != nil {
|
||||
if einoFSTools && einoLoc != nil {
|
||||
fsMw, fsErr := subAgentFilesystemMiddleware(ctx, einoLoc)
|
||||
fsMw, fsErr := subAgentFilesystemMiddleware(ctx, einoLoc, toolInvokeNotify, einoSingleAgentName, einoExecMonitor)
|
||||
if fsErr != nil {
|
||||
return nil, fmt.Errorf("eino single filesystem 中间件: %w", fsErr)
|
||||
}
|
||||
@@ -232,6 +234,7 @@ func RunEinoSingleChatModelAgent(
|
||||
CheckpointDir: ma.EinoMiddleware.CheckpointDir,
|
||||
McpIDsMu: &mcpIDsMu,
|
||||
McpIDs: &mcpIDs,
|
||||
ToolInvokeNotify: toolInvokeNotify,
|
||||
DA: chatAgent,
|
||||
EmptyResponseMessage: "(Eino ADK single-agent session completed but no assistant text was captured. Check process details or logs.) " +
|
||||
"(Eino ADK 单代理会话已完成,但未捕获到助手文本输出。请查看过程详情或日志。)",
|
||||
|
||||
@@ -8,6 +8,7 @@ import (
|
||||
"strings"
|
||||
|
||||
"cyberstrike-ai/internal/config"
|
||||
"cyberstrike-ai/internal/einomcp"
|
||||
|
||||
localbk "github.com/cloudwego/eino-ext/adk/backend/local"
|
||||
"github.com/cloudwego/eino/adk"
|
||||
@@ -75,12 +76,23 @@ func prepareEinoSkills(
|
||||
// subAgentFilesystemMiddleware returns filesystem middleware for a sub-agent when Deep itself
|
||||
// does not set Backend (fsTools false on orchestrator) but we still want tools on subs — not used;
|
||||
// when orchestrator has Backend, builtin FS is only on outer agent; subs need explicit FS for parity.
|
||||
func subAgentFilesystemMiddleware(ctx context.Context, loc *localbk.Local) (adk.ChatModelAgentMiddleware, error) {
|
||||
func subAgentFilesystemMiddleware(
|
||||
ctx context.Context,
|
||||
loc *localbk.Local,
|
||||
invokeNotify *einomcp.ToolInvokeNotifyHolder,
|
||||
einoAgentName string,
|
||||
recordMonitor func(command, stdout string, success bool, invokeErr error),
|
||||
) (adk.ChatModelAgentMiddleware, error) {
|
||||
if loc == nil {
|
||||
return nil, nil
|
||||
}
|
||||
return filesystem.New(ctx, &filesystem.MiddlewareConfig{
|
||||
Backend: loc,
|
||||
StreamingShell: &einoStreamingShellWrap{inner: loc},
|
||||
Backend: loc,
|
||||
StreamingShell: &einoStreamingShellWrap{
|
||||
inner: loc,
|
||||
invokeNotify: invokeNotify,
|
||||
einoAgentName: strings.TrimSpace(einoAgentName),
|
||||
recordMonitor: recordMonitor,
|
||||
},
|
||||
})
|
||||
}
|
||||
|
||||
@@ -110,6 +110,7 @@ func RunDeepAgent(
|
||||
mcpIDs = append(mcpIDs, id)
|
||||
mcpIDsMu.Unlock()
|
||||
}
|
||||
einoExecMonitor := newEinoExecuteMonitorCallback(ag, recorder)
|
||||
|
||||
// 与单代理流式一致:在 response_start / response_delta 的 data 中带当前 mcpExecutionIds,供主聊天绑定复制与展示。
|
||||
snapshotMCPIDs := func() []string {
|
||||
@@ -120,6 +121,7 @@ func RunDeepAgent(
|
||||
return out
|
||||
}
|
||||
|
||||
toolInvokeNotify := einomcp.NewToolInvokeNotifyHolder()
|
||||
mainDefs := ag.ToolsForRole(roleTools)
|
||||
toolOutputChunk := func(toolName, toolCallID, chunk string) {
|
||||
// When toolCallId is missing, frontend ignores tool_result_delta.
|
||||
@@ -137,16 +139,6 @@ func RunDeepAgent(
|
||||
})
|
||||
}
|
||||
|
||||
mainTools, err := einomcp.ToolsFromDefinitions(ag, holder, mainDefs, recorder, toolOutputChunk)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
mainToolsForCfg, mainOrchestratorPre, err := prependEinoMiddlewares(ctx, &ma.EinoMiddleware, einoMWMain, mainTools, einoLoc, skillsRoot, conversationID, logger)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
httpClient := &http.Client{
|
||||
Timeout: 30 * time.Minute,
|
||||
Transport: &http.Transport{
|
||||
@@ -222,7 +214,7 @@ func RunDeepAgent(
|
||||
}
|
||||
|
||||
subDefs := ag.ToolsForRole(roleTools)
|
||||
subTools, err := einomcp.ToolsFromDefinitions(ag, holder, subDefs, recorder, toolOutputChunk)
|
||||
subTools, err := einomcp.ToolsFromDefinitions(ag, holder, subDefs, recorder, toolOutputChunk, toolInvokeNotify, id)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("子代理 %q 工具: %w", id, err)
|
||||
}
|
||||
@@ -248,7 +240,7 @@ func RunDeepAgent(
|
||||
}
|
||||
if einoSkillMW != nil {
|
||||
if einoFSTools && einoLoc != nil {
|
||||
subFs, fsErr := subAgentFilesystemMiddleware(ctx, einoLoc)
|
||||
subFs, fsErr := subAgentFilesystemMiddleware(ctx, einoLoc, toolInvokeNotify, id, einoExecMonitor)
|
||||
if fsErr != nil {
|
||||
return nil, fmt.Errorf("子代理 %q filesystem 中间件: %w", id, fsErr)
|
||||
}
|
||||
@@ -338,6 +330,16 @@ func RunDeepAgent(
|
||||
orchDescription = d
|
||||
}
|
||||
}
|
||||
|
||||
mainTools, err := einomcp.ToolsFromDefinitions(ag, holder, mainDefs, recorder, toolOutputChunk, toolInvokeNotify, orchestratorName)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
mainToolsForCfg, mainOrchestratorPre, err := prependEinoMiddlewares(ctx, &ma.EinoMiddleware, einoMWMain, mainTools, einoLoc, skillsRoot, conversationID, logger)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
orchInstruction = injectToolNamesOnlyInstruction(ctx, orchInstruction, mainTools)
|
||||
if logger != nil {
|
||||
mainNames := collectToolNames(ctx, mainTools)
|
||||
@@ -381,7 +383,12 @@ func RunDeepAgent(
|
||||
var deepShell filesystem.StreamingShell
|
||||
if einoLoc != nil && einoFSTools {
|
||||
deepBackend = einoLoc
|
||||
deepShell = einoLoc
|
||||
deepShell = &einoStreamingShellWrap{
|
||||
inner: einoLoc,
|
||||
invokeNotify: toolInvokeNotify,
|
||||
einoAgentName: orchestratorName,
|
||||
recordMonitor: einoExecMonitor,
|
||||
}
|
||||
}
|
||||
|
||||
// noNestedTaskMiddleware 必须在最外层(最先拦截),防止 skill 或其他中间件内部触发 task 调用绕过检测。
|
||||
@@ -438,7 +445,7 @@ func RunDeepAgent(
|
||||
// 构建 filesystem 中间件(与 Deep sub-agent 一致)
|
||||
var peFsMw adk.ChatModelAgentMiddleware
|
||||
if einoSkillMW != nil && einoFSTools && einoLoc != nil {
|
||||
peFsMw, err = subAgentFilesystemMiddleware(ctx, einoLoc)
|
||||
peFsMw, err = subAgentFilesystemMiddleware(ctx, einoLoc, toolInvokeNotify, "executor", einoExecMonitor)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("plan_execute filesystem 中间件: %w", err)
|
||||
}
|
||||
@@ -560,6 +567,7 @@ func RunDeepAgent(
|
||||
CheckpointDir: ma.EinoMiddleware.CheckpointDir,
|
||||
McpIDsMu: &mcpIDsMu,
|
||||
McpIDs: &mcpIDs,
|
||||
ToolInvokeNotify: toolInvokeNotify,
|
||||
DA: da,
|
||||
EmptyResponseMessage: "(Eino multi-agent orchestration completed but no assistant text was captured. Check process details or logs.) " +
|
||||
"(Eino 多代理编排已完成,但未捕获到助手文本输出。请查看过程详情或日志。)",
|
||||
|
||||
@@ -0,0 +1,56 @@
|
||||
package openai
|
||||
|
||||
import "testing"
|
||||
|
||||
func TestNormalizeStreamingDelta_RepeatedCharBoundary(t *testing.T) {
|
||||
// 流式在重复数字边界分片:不得把 "43" 的首字符与 "194" 尾字符误合并。
|
||||
cur, d := normalizeStreamingDelta("https://x:194", "43")
|
||||
if want := "https://x:19443"; cur != want {
|
||||
t.Fatalf("next: want %q got %q", want, cur)
|
||||
}
|
||||
if d != "43" {
|
||||
t.Fatalf("delta: want %q got %q", "43", d)
|
||||
}
|
||||
}
|
||||
|
||||
func TestNormalizeStreamingDelta_CumulativePrefix(t *testing.T) {
|
||||
cur, d := normalizeStreamingDelta("今天", "今天天气")
|
||||
if cur != "今天天气" || d != "天气" {
|
||||
t.Fatalf("got cur=%q d=%q", cur, d)
|
||||
}
|
||||
}
|
||||
|
||||
func TestNormalizeStreamingDelta_FullRetransmit(t *testing.T) {
|
||||
cur, d := normalizeStreamingDelta("今天", "今天")
|
||||
if d != "" || cur != "今天" {
|
||||
t.Fatalf("got cur=%q d=%q", cur, d)
|
||||
}
|
||||
}
|
||||
|
||||
func TestNormalizeStreamingDelta_SingleRuneRepeated(t *testing.T) {
|
||||
cur, d := normalizeStreamingDelta("呀", "呀")
|
||||
if want := "呀呀"; cur != want {
|
||||
t.Fatalf("next: want %q got %q", want, cur)
|
||||
}
|
||||
if d != "呀" {
|
||||
t.Fatalf("delta: want %q got %q", "呀", d)
|
||||
}
|
||||
cur, d = normalizeStreamingDelta("4", "4")
|
||||
if want := "44"; cur != want {
|
||||
t.Fatalf("next: want %q got %q", want, cur)
|
||||
}
|
||||
if d != "4" {
|
||||
t.Fatalf("delta: want %q got %q", "4", d)
|
||||
}
|
||||
}
|
||||
|
||||
func TestNormalizeStreamingDelta_CumulativeExtendsNumber(t *testing.T) {
|
||||
// 已缓冲 "194" 后收到累计串 "19443"(注意 "1943" 并非 "19443" 的前缀,不能靠误写的中间态测 HasPrefix)。
|
||||
cur, d := normalizeStreamingDelta("194", "19443")
|
||||
if want := "19443"; cur != want {
|
||||
t.Fatalf("next: want %q got %q", want, cur)
|
||||
}
|
||||
if d != "43" {
|
||||
t.Fatalf("delta: want %q got %q", "43", d)
|
||||
}
|
||||
}
|
||||
+12
-17
@@ -10,6 +10,7 @@ import (
|
||||
"net/http"
|
||||
"strings"
|
||||
"time"
|
||||
"unicode/utf8"
|
||||
|
||||
"cyberstrike-ai/internal/config"
|
||||
|
||||
@@ -34,7 +35,15 @@ func (e *APIError) Error() string {
|
||||
}
|
||||
|
||||
// normalizeStreamingDelta 将可能是“累计片段/重发片段”的内容归一化为“纯增量”。
|
||||
// 部分兼容网关会返回累计 content;若直接 append 会出现重复文本(结巴)。
|
||||
// 部分兼容网关会返回累计 content;若直接 append 会出现重复文本。
|
||||
//
|
||||
// 注意:
|
||||
// - 不做「任意后缀与前缀重叠」合并;流式可能在重复字符边界分片("194"+"43"→"19443")。
|
||||
// - HasPrefix 仅在 incoming 严格长于 current 时视为累计全文,否则会把分片产生的第二个相同
|
||||
// 单字/单码点(叠字、44、22 等)误判为「整段重复」而吞字。
|
||||
// - incoming==current 仅当 current 长度 >1 个码点时才视为整包重发;单码点重复必须走拼接。
|
||||
// - 不再使用「current 以 incoming 结尾则丢弃」:否则 "1943"+"43" 会误吞增量(19443 显示成 1943)。
|
||||
// 若网关重复发送尾部片段,应重复送完整累计串,由 HasPrefix 分支去重。
|
||||
func normalizeStreamingDelta(current, incoming string) (next, delta string) {
|
||||
if incoming == "" {
|
||||
return current, ""
|
||||
@@ -42,26 +51,12 @@ func normalizeStreamingDelta(current, incoming string) (next, delta string) {
|
||||
if current == "" {
|
||||
return incoming, incoming
|
||||
}
|
||||
if incoming == current {
|
||||
return current, ""
|
||||
}
|
||||
if strings.HasPrefix(incoming, current) {
|
||||
if strings.HasPrefix(incoming, current) && len(incoming) > len(current) {
|
||||
return incoming, incoming[len(current):]
|
||||
}
|
||||
if strings.HasSuffix(current, incoming) {
|
||||
if incoming == current && utf8.RuneCountInString(current) > 1 {
|
||||
return current, ""
|
||||
}
|
||||
|
||||
// 边界重叠:current 后缀与 incoming 前缀重合,仅追加非重叠部分。
|
||||
max := len(current)
|
||||
if len(incoming) < max {
|
||||
max = len(incoming)
|
||||
}
|
||||
for overlap := max; overlap > 0; overlap-- {
|
||||
if current[len(current)-overlap:] == incoming[:overlap] {
|
||||
return current + incoming[overlap:], incoming[overlap:]
|
||||
}
|
||||
}
|
||||
return current + incoming, incoming
|
||||
}
|
||||
|
||||
|
||||
Reference in New Issue
Block a user