mirror of
https://github.com/Ed1s0nZ/CyberStrikeAI.git
synced 2026-05-18 14:04:52 +02:00
Compare commits
8 Commits
| Author | SHA1 | Date | |
|---|---|---|---|
| df2506b651 | |||
| efe9172f85 | |||
| b788bc6dab | |||
| 9134f2bbcb | |||
| d76cf2a162 | |||
| 2f96feb98f | |||
| a374c3950c | |||
| a93e3455fa |
+1
-1
@@ -10,7 +10,7 @@
|
|||||||
# ============================================
|
# ============================================
|
||||||
|
|
||||||
# 前端显示的版本号(可选,不填则显示默认版本)
|
# 前端显示的版本号(可选,不填则显示默认版本)
|
||||||
version: "v1.6.5"
|
version: "v1.6.6"
|
||||||
# 服务器配置
|
# 服务器配置
|
||||||
server:
|
server:
|
||||||
host: 0.0.0.0 # 监听地址,0.0.0.0 表示监听所有网络接口
|
host: 0.0.0.0 # 监听地址,0.0.0.0 表示监听所有网络接口
|
||||||
|
|||||||
@@ -1909,6 +1909,15 @@ func (a *Agent) ExecuteMCPToolForConversation(ctx context.Context, conversationI
|
|||||||
return a.executeToolViaMCP(ctx, toolName, args)
|
return a.executeToolViaMCP(ctx, toolName, args)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// RecordLocalToolExecution 将非 CallTool 路径完成的工具调用写入 MCP 监控库(与 CallTool 落库一致),返回 executionId。
|
||||||
|
// 用于 Eino filesystem execute 等场景,使助手气泡「渗透测试详情」与常规 MCP 一致可点进监控。
|
||||||
|
func (a *Agent) RecordLocalToolExecution(toolName string, args map[string]interface{}, resultText string, invokeErr error) string {
|
||||||
|
if a == nil || a.mcpServer == nil {
|
||||||
|
return ""
|
||||||
|
}
|
||||||
|
return a.mcpServer.RecordCompletedToolInvocation(toolName, args, resultText, invokeErr)
|
||||||
|
}
|
||||||
|
|
||||||
// CancelMCPToolExecutionWithNote 取消一次进行中的 MCP 工具(先内部后外部),与监控页「终止工具」一致;note 非空时合并进返回给模型的文本。
|
// CancelMCPToolExecutionWithNote 取消一次进行中的 MCP 工具(先内部后外部),与监控页「终止工具」一致;note 非空时合并进返回给模型的文本。
|
||||||
func (a *Agent) CancelMCPToolExecutionWithNote(executionID, note string) bool {
|
func (a *Agent) CancelMCPToolExecutionWithNote(executionID, note string) bool {
|
||||||
executionID = strings.TrimSpace(executionID)
|
executionID = strings.TrimSpace(executionID)
|
||||||
|
|||||||
@@ -23,12 +23,16 @@ type ExecutionRecorder func(executionID string)
|
|||||||
const ToolErrorPrefix = "__CYBERSTRIKE_AI_TOOL_ERROR__\n"
|
const ToolErrorPrefix = "__CYBERSTRIKE_AI_TOOL_ERROR__\n"
|
||||||
|
|
||||||
// ToolsFromDefinitions 将单 Agent 使用的 OpenAI 风格工具定义转为 Eino InvokableTool,执行时走 Agent 的 MCP 路径。
|
// ToolsFromDefinitions 将单 Agent 使用的 OpenAI 风格工具定义转为 Eino InvokableTool,执行时走 Agent 的 MCP 路径。
|
||||||
|
// invokeNotify 可选:与 runEinoADKAgentLoop 共享,在 InvokableRun 返回时触发 UI 与 pending 清理(与 ADK Tool 事件去重)。
|
||||||
|
// einoAgentName 为该套工具所属 ChatModelAgent 的 Name(主代理或子代理 id),用于 SSE 上的 einoAgent 字段。
|
||||||
func ToolsFromDefinitions(
|
func ToolsFromDefinitions(
|
||||||
ag *agent.Agent,
|
ag *agent.Agent,
|
||||||
holder *ConversationHolder,
|
holder *ConversationHolder,
|
||||||
defs []agent.Tool,
|
defs []agent.Tool,
|
||||||
rec ExecutionRecorder,
|
rec ExecutionRecorder,
|
||||||
toolOutputChunk func(toolName, toolCallID, chunk string),
|
toolOutputChunk func(toolName, toolCallID, chunk string),
|
||||||
|
invokeNotify *ToolInvokeNotifyHolder,
|
||||||
|
einoAgentName string,
|
||||||
) ([]tool.BaseTool, error) {
|
) ([]tool.BaseTool, error) {
|
||||||
out := make([]tool.BaseTool, 0, len(defs))
|
out := make([]tool.BaseTool, 0, len(defs))
|
||||||
for _, d := range defs {
|
for _, d := range defs {
|
||||||
@@ -40,12 +44,14 @@ func ToolsFromDefinitions(
|
|||||||
return nil, fmt.Errorf("tool %q: %w", d.Function.Name, err)
|
return nil, fmt.Errorf("tool %q: %w", d.Function.Name, err)
|
||||||
}
|
}
|
||||||
out = append(out, &mcpBridgeTool{
|
out = append(out, &mcpBridgeTool{
|
||||||
info: info,
|
info: info,
|
||||||
name: d.Function.Name,
|
name: d.Function.Name,
|
||||||
agent: ag,
|
agent: ag,
|
||||||
holder: holder,
|
holder: holder,
|
||||||
record: rec,
|
record: rec,
|
||||||
chunk: toolOutputChunk,
|
chunk: toolOutputChunk,
|
||||||
|
invokeNotify: invokeNotify,
|
||||||
|
einoAgentName: strings.TrimSpace(einoAgentName),
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
return out, nil
|
return out, nil
|
||||||
@@ -77,12 +83,14 @@ func toolInfoFromDefinition(d agent.Tool) (*schema.ToolInfo, error) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
type mcpBridgeTool struct {
|
type mcpBridgeTool struct {
|
||||||
info *schema.ToolInfo
|
info *schema.ToolInfo
|
||||||
name string
|
name string
|
||||||
agent *agent.Agent
|
agent *agent.Agent
|
||||||
holder *ConversationHolder
|
holder *ConversationHolder
|
||||||
record ExecutionRecorder
|
record ExecutionRecorder
|
||||||
chunk func(toolName, toolCallID, chunk string)
|
chunk func(toolName, toolCallID, chunk string)
|
||||||
|
invokeNotify *ToolInvokeNotifyHolder
|
||||||
|
einoAgentName string
|
||||||
}
|
}
|
||||||
|
|
||||||
func (m *mcpBridgeTool) Info(ctx context.Context) (*schema.ToolInfo, error) {
|
func (m *mcpBridgeTool) Info(ctx context.Context) (*schema.ToolInfo, error) {
|
||||||
@@ -90,8 +98,27 @@ func (m *mcpBridgeTool) Info(ctx context.Context) (*schema.ToolInfo, error) {
|
|||||||
return m.info, nil
|
return m.info, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (m *mcpBridgeTool) InvokableRun(ctx context.Context, argumentsInJSON string, opts ...tool.Option) (string, error) {
|
func (m *mcpBridgeTool) InvokableRun(ctx context.Context, argumentsInJSON string, opts ...tool.Option) (out string, err error) {
|
||||||
_ = opts
|
_ = opts
|
||||||
|
toolCallID := compose.GetToolCallID(ctx)
|
||||||
|
defer func() {
|
||||||
|
if m.invokeNotify == nil {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
tid := strings.TrimSpace(toolCallID)
|
||||||
|
if tid == "" {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
success := err == nil && !strings.HasPrefix(out, ToolErrorPrefix)
|
||||||
|
body := out
|
||||||
|
if err != nil {
|
||||||
|
success = false
|
||||||
|
} else if strings.HasPrefix(out, ToolErrorPrefix) {
|
||||||
|
success = false
|
||||||
|
body = strings.TrimPrefix(out, ToolErrorPrefix)
|
||||||
|
}
|
||||||
|
m.invokeNotify.Fire(tid, m.name, m.einoAgentName, success, body, err)
|
||||||
|
}()
|
||||||
return runMCPToolInvocation(ctx, m.agent, m.holder, m.name, argumentsInJSON, m.record, m.chunk)
|
return runMCPToolInvocation(ctx, m.agent, m.holder, m.name, argumentsInJSON, m.record, m.chunk)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
@@ -0,0 +1,39 @@
|
|||||||
|
package einomcp
|
||||||
|
|
||||||
|
import "sync"
|
||||||
|
|
||||||
|
// ToolInvokeNotifyHolder 由 Eino run loop 在迭代开始前 Set 回调;MCP 桥在每次 InvokableRun 结束时 Fire,
|
||||||
|
// 用于在 ADK 未透出 schema.Tool 事件时仍推送 tool_result、清 pending,避免 UI 卡在「执行中」或迭代末 force-close。
|
||||||
|
type ToolInvokeNotifyHolder struct {
|
||||||
|
mu sync.RWMutex
|
||||||
|
fn func(toolCallID, toolName, einoAgent string, success bool, content string, invokeErr error)
|
||||||
|
}
|
||||||
|
|
||||||
|
// NewToolInvokeNotifyHolder 创建可在 ToolsFromDefinitions 与 run loop 之间共享的 holder。
|
||||||
|
func NewToolInvokeNotifyHolder() *ToolInvokeNotifyHolder {
|
||||||
|
return &ToolInvokeNotifyHolder{}
|
||||||
|
}
|
||||||
|
|
||||||
|
// Set 由 runEinoADKAgentLoop 在开始消费 iter 之前调用;可多次覆盖(通常仅一次)。
|
||||||
|
func (h *ToolInvokeNotifyHolder) Set(fn func(toolCallID, toolName, einoAgent string, success bool, content string, invokeErr error)) {
|
||||||
|
if h == nil {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
h.mu.Lock()
|
||||||
|
defer h.mu.Unlock()
|
||||||
|
h.fn = fn
|
||||||
|
}
|
||||||
|
|
||||||
|
// Fire 由 mcpBridgeTool 在工具调用返回时调用;若尚未 Set 或 toolCallID 为空则忽略。
|
||||||
|
func (h *ToolInvokeNotifyHolder) Fire(toolCallID, toolName, einoAgent string, success bool, content string, invokeErr error) {
|
||||||
|
if h == nil {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
h.mu.RLock()
|
||||||
|
fn := h.fn
|
||||||
|
h.mu.RUnlock()
|
||||||
|
if fn == nil {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
fn(toolCallID, toolName, einoAgent, success, content, invokeErr)
|
||||||
|
}
|
||||||
@@ -883,6 +883,49 @@ func (s *Server) CallTool(ctx context.Context, toolName string, args map[string]
|
|||||||
return finalResult, executionID, nil
|
return finalResult, executionID, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// RecordCompletedToolInvocation 将已在其它路径完成的工具调用写入监控存储(格式与 CallTool 结束后一致),
|
||||||
|
// 用于 Eino ADK filesystem execute 等未经过 CallTool 的场景;返回 executionId 供助手消息 mcpExecutionIds 关联。
|
||||||
|
func (s *Server) RecordCompletedToolInvocation(toolName string, args map[string]interface{}, resultText string, invokeErr error) string {
|
||||||
|
if s == nil {
|
||||||
|
return ""
|
||||||
|
}
|
||||||
|
if args == nil {
|
||||||
|
args = map[string]interface{}{}
|
||||||
|
}
|
||||||
|
executionID := uuid.New().String()
|
||||||
|
now := time.Now()
|
||||||
|
failed := invokeErr != nil
|
||||||
|
exec := &ToolExecution{
|
||||||
|
ID: executionID,
|
||||||
|
ToolName: toolName,
|
||||||
|
Arguments: args,
|
||||||
|
StartTime: now,
|
||||||
|
EndTime: &now,
|
||||||
|
Duration: 0,
|
||||||
|
}
|
||||||
|
if failed {
|
||||||
|
exec.Status = "failed"
|
||||||
|
exec.Error = invokeErr.Error()
|
||||||
|
if strings.TrimSpace(resultText) != "" {
|
||||||
|
exec.Result = &ToolResult{Content: []Content{{Type: "text", Text: resultText}}}
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
exec.Status = "completed"
|
||||||
|
text := resultText
|
||||||
|
if strings.TrimSpace(text) == "" {
|
||||||
|
text = "(无输出)"
|
||||||
|
}
|
||||||
|
exec.Result = &ToolResult{Content: []Content{{Type: "text", Text: text}}}
|
||||||
|
}
|
||||||
|
if s.storage != nil {
|
||||||
|
if err := s.storage.SaveToolExecution(exec); err != nil {
|
||||||
|
s.logger.Warn("RecordCompletedToolInvocation 保存失败", zap.Error(err))
|
||||||
|
}
|
||||||
|
}
|
||||||
|
s.updateStats(toolName, failed)
|
||||||
|
return executionID
|
||||||
|
}
|
||||||
|
|
||||||
// cleanupOldExecutions 清理旧的执行记录,防止内存无限增长
|
// cleanupOldExecutions 清理旧的执行记录,防止内存无限增长
|
||||||
func (s *Server) cleanupOldExecutions() {
|
func (s *Server) cleanupOldExecutions() {
|
||||||
if len(s.executions) <= s.maxExecutionsInMemory {
|
if len(s.executions) <= s.maxExecutionsInMemory {
|
||||||
|
|||||||
@@ -11,6 +11,7 @@ import (
|
|||||||
"strings"
|
"strings"
|
||||||
"sync"
|
"sync"
|
||||||
"sync/atomic"
|
"sync/atomic"
|
||||||
|
"unicode/utf8"
|
||||||
|
|
||||||
"cyberstrike-ai/internal/einomcp"
|
"cyberstrike-ai/internal/einomcp"
|
||||||
|
|
||||||
@@ -20,7 +21,9 @@ import (
|
|||||||
)
|
)
|
||||||
|
|
||||||
// normalizeStreamingDelta 将可能是“累计片段”的 chunk 归一化为“纯增量”。
|
// normalizeStreamingDelta 将可能是“累计片段”的 chunk 归一化为“纯增量”。
|
||||||
// 一些模型/桥接层在流式过程中会重复发送已输出前缀,前端若直接 buffer+=chunk 会出现“结巴”重复。
|
// 一些模型/桥接层在流式过程中会重复发送已输出前缀,前端若直接 buffer+=chunk 会出现重复文本。
|
||||||
|
//
|
||||||
|
// 注意:与 internal/openai.normalizeStreamingDelta 保持一致。
|
||||||
func normalizeStreamingDelta(current, incoming string) (next, delta string) {
|
func normalizeStreamingDelta(current, incoming string) (next, delta string) {
|
||||||
if incoming == "" {
|
if incoming == "" {
|
||||||
return current, ""
|
return current, ""
|
||||||
@@ -28,28 +31,12 @@ func normalizeStreamingDelta(current, incoming string) (next, delta string) {
|
|||||||
if current == "" {
|
if current == "" {
|
||||||
return incoming, incoming
|
return incoming, incoming
|
||||||
}
|
}
|
||||||
if incoming == current {
|
if strings.HasPrefix(incoming, current) && len(incoming) > len(current) {
|
||||||
return current, ""
|
|
||||||
}
|
|
||||||
// incoming 是累计全文(包含 current 前缀)
|
|
||||||
if strings.HasPrefix(incoming, current) {
|
|
||||||
return incoming, incoming[len(current):]
|
return incoming, incoming[len(current):]
|
||||||
}
|
}
|
||||||
// incoming 完全是已输出尾部重发
|
if incoming == current && utf8.RuneCountInString(current) > 1 {
|
||||||
if strings.HasSuffix(current, incoming) {
|
|
||||||
return current, ""
|
return current, ""
|
||||||
}
|
}
|
||||||
|
|
||||||
// 处理边界重叠:current 后缀与 incoming 前缀重叠,只追加非重叠部分。
|
|
||||||
max := len(current)
|
|
||||||
if len(incoming) < max {
|
|
||||||
max = len(incoming)
|
|
||||||
}
|
|
||||||
for overlap := max; overlap > 0; overlap-- {
|
|
||||||
if current[len(current)-overlap:] == incoming[:overlap] {
|
|
||||||
return current + incoming[overlap:], incoming[overlap:]
|
|
||||||
}
|
|
||||||
}
|
|
||||||
return current + incoming, incoming
|
return current + incoming, incoming
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -83,6 +70,9 @@ type einoADKRunLoopArgs struct {
|
|||||||
McpIDsMu *sync.Mutex
|
McpIDsMu *sync.Mutex
|
||||||
McpIDs *[]string
|
McpIDs *[]string
|
||||||
|
|
||||||
|
// ToolInvokeNotify 与 einomcp.ToolsFromDefinitions 共享:run loop 在迭代前 Set,MCP 桥 Fire 以补全 tool_result。
|
||||||
|
ToolInvokeNotify *einomcp.ToolInvokeNotifyHolder
|
||||||
|
|
||||||
DA adk.Agent
|
DA adk.Agent
|
||||||
|
|
||||||
// EmptyResponseMessage 当未捕获到助手正文时的占位(多代理与单代理文案不同)。
|
// EmptyResponseMessage 当未捕获到助手正文时的占位(多代理与单代理文案不同)。
|
||||||
@@ -224,6 +214,63 @@ func runEinoADKAgentLoop(ctx context.Context, args *einoADKRunLoopArgs, baseMsgs
|
|||||||
pendingQueueByAgent = make(map[string][]string)
|
pendingQueueByAgent = make(map[string][]string)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// 最近一次成功的 Eino filesystem execute 的标准输出(trim):用于抑制模型紧接着复述同一字符串时的重复「助手输出」时间线。
|
||||||
|
var executeStdoutDupMu sync.Mutex
|
||||||
|
var pendingExecuteStdoutDup string
|
||||||
|
recordPendingExecuteStdoutDup := func(toolName, stdout string, isErr bool) {
|
||||||
|
if isErr || !strings.EqualFold(strings.TrimSpace(toolName), "execute") {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
t := strings.TrimSpace(stdout)
|
||||||
|
if t == "" {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
executeStdoutDupMu.Lock()
|
||||||
|
pendingExecuteStdoutDup = t
|
||||||
|
executeStdoutDupMu.Unlock()
|
||||||
|
}
|
||||||
|
|
||||||
|
var toolResultSent sync.Map // toolCallID -> struct{};与 ADK Tool 消息去重,避免 bridge 与事件流各推一次
|
||||||
|
if args.ToolInvokeNotify != nil {
|
||||||
|
args.ToolInvokeNotify.Set(func(toolCallID, toolName, einoAgent string, success bool, content string, invokeErr error) {
|
||||||
|
tid := strings.TrimSpace(toolCallID)
|
||||||
|
removePendingByID(tid)
|
||||||
|
if tid == "" || progress == nil {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
if _, loaded := toolResultSent.LoadOrStore(tid, struct{}{}); loaded {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
isErr := !success || invokeErr != nil
|
||||||
|
body := content
|
||||||
|
if invokeErr != nil {
|
||||||
|
body = invokeErr.Error()
|
||||||
|
isErr = true
|
||||||
|
}
|
||||||
|
recordPendingExecuteStdoutDup(toolName, body, isErr)
|
||||||
|
preview := body
|
||||||
|
if len(preview) > 200 {
|
||||||
|
preview = preview[:200] + "..."
|
||||||
|
}
|
||||||
|
agentTag := strings.TrimSpace(einoAgent)
|
||||||
|
if agentTag == "" {
|
||||||
|
agentTag = orchestratorName
|
||||||
|
}
|
||||||
|
progress("tool_result", fmt.Sprintf("工具结果 (%s)", toolName), map[string]interface{}{
|
||||||
|
"toolName": toolName,
|
||||||
|
"success": !isErr,
|
||||||
|
"isError": isErr,
|
||||||
|
"result": body,
|
||||||
|
"resultPreview": preview,
|
||||||
|
"toolCallId": tid,
|
||||||
|
"conversationId": conversationID,
|
||||||
|
"einoAgent": agentTag,
|
||||||
|
"einoRole": einoRoleTag(agentTag),
|
||||||
|
"source": "eino",
|
||||||
|
})
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
runnerCfg := adk.RunnerConfig{
|
runnerCfg := adk.RunnerConfig{
|
||||||
Agent: da,
|
Agent: da,
|
||||||
EnableStreaming: true,
|
EnableStreaming: true,
|
||||||
@@ -467,6 +514,7 @@ func runEinoADKAgentLoop(ctx context.Context, args *einoADKRunLoopArgs, baseMsgs
|
|||||||
var subAssistantBuf string
|
var subAssistantBuf string
|
||||||
var subReplyStreamID string
|
var subReplyStreamID string
|
||||||
var mainAssistantBuf string
|
var mainAssistantBuf string
|
||||||
|
var mainAssistDupTarget string // 非空表示本段主助手流需缓冲至 EOF,与 execute 输出比对去重
|
||||||
var reasoningBuf string
|
var reasoningBuf string
|
||||||
var streamRecvErr error
|
var streamRecvErr error
|
||||||
for {
|
for {
|
||||||
@@ -511,24 +559,35 @@ func runEinoADKAgentLoop(ctx context.Context, args *einoADKRunLoopArgs, baseMsgs
|
|||||||
var contentDelta string
|
var contentDelta string
|
||||||
mainAssistantBuf, contentDelta = normalizeStreamingDelta(mainAssistantBuf, chunk.Content)
|
mainAssistantBuf, contentDelta = normalizeStreamingDelta(mainAssistantBuf, chunk.Content)
|
||||||
if contentDelta != "" {
|
if contentDelta != "" {
|
||||||
if !streamHeaderSent {
|
if mainAssistDupTarget == "" {
|
||||||
progress("response_start", "", map[string]interface{}{
|
executeStdoutDupMu.Lock()
|
||||||
"conversationId": conversationID,
|
if pendingExecuteStdoutDup != "" {
|
||||||
"mcpExecutionIds": snapshotMCPIDs(),
|
mainAssistDupTarget = pendingExecuteStdoutDup
|
||||||
"messageGeneratedBy": "eino:" + ev.AgentName,
|
}
|
||||||
"einoRole": "orchestrator",
|
executeStdoutDupMu.Unlock()
|
||||||
"einoAgent": ev.AgentName,
|
}
|
||||||
"orchestration": orchMode,
|
if mainAssistDupTarget != "" {
|
||||||
})
|
// 已展示过 tool_result,缓冲全文;EOF 后与 execute 输出相同则不再发助手流
|
||||||
streamHeaderSent = true
|
} else {
|
||||||
|
if !streamHeaderSent {
|
||||||
|
progress("response_start", "", map[string]interface{}{
|
||||||
|
"conversationId": conversationID,
|
||||||
|
"mcpExecutionIds": snapshotMCPIDs(),
|
||||||
|
"messageGeneratedBy": "eino:" + ev.AgentName,
|
||||||
|
"einoRole": "orchestrator",
|
||||||
|
"einoAgent": ev.AgentName,
|
||||||
|
"orchestration": orchMode,
|
||||||
|
})
|
||||||
|
streamHeaderSent = true
|
||||||
|
}
|
||||||
|
progress("response_delta", contentDelta, map[string]interface{}{
|
||||||
|
"conversationId": conversationID,
|
||||||
|
"mcpExecutionIds": snapshotMCPIDs(),
|
||||||
|
"einoRole": "orchestrator",
|
||||||
|
"einoAgent": ev.AgentName,
|
||||||
|
"orchestration": orchMode,
|
||||||
|
})
|
||||||
}
|
}
|
||||||
progress("response_delta", contentDelta, map[string]interface{}{
|
|
||||||
"conversationId": conversationID,
|
|
||||||
"mcpExecutionIds": snapshotMCPIDs(),
|
|
||||||
"einoRole": "orchestrator",
|
|
||||||
"einoAgent": ev.AgentName,
|
|
||||||
"orchestration": orchMode,
|
|
||||||
})
|
|
||||||
}
|
}
|
||||||
} else if !streamsMainAssistant(ev.AgentName) {
|
} else if !streamsMainAssistant(ev.AgentName) {
|
||||||
var subDelta string
|
var subDelta string
|
||||||
@@ -558,7 +617,43 @@ func runEinoADKAgentLoop(ctx context.Context, args *einoADKRunLoopArgs, baseMsgs
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
if streamsMainAssistant(ev.AgentName) {
|
if streamsMainAssistant(ev.AgentName) {
|
||||||
if s := strings.TrimSpace(mainAssistantBuf); s != "" {
|
s := strings.TrimSpace(mainAssistantBuf)
|
||||||
|
if mainAssistDupTarget != "" {
|
||||||
|
executeStdoutDupMu.Lock()
|
||||||
|
pendingExecuteStdoutDup = ""
|
||||||
|
executeStdoutDupMu.Unlock()
|
||||||
|
if s != "" && s == mainAssistDupTarget {
|
||||||
|
// 与刚展示的 execute 结果完全一致:不再发助手流式事件,仍写入轨迹与最终回复字段
|
||||||
|
lastAssistant = s
|
||||||
|
runAccumulatedMsgs = append(runAccumulatedMsgs, schema.AssistantMessage(s, nil))
|
||||||
|
if orchMode == "plan_execute" && strings.EqualFold(strings.TrimSpace(ev.AgentName), "executor") {
|
||||||
|
lastPlanExecuteExecutor = UnwrapPlanExecuteUserText(s)
|
||||||
|
}
|
||||||
|
} else if s != "" {
|
||||||
|
if progress != nil {
|
||||||
|
progress("response_start", "", map[string]interface{}{
|
||||||
|
"conversationId": conversationID,
|
||||||
|
"mcpExecutionIds": snapshotMCPIDs(),
|
||||||
|
"messageGeneratedBy": "eino:" + ev.AgentName,
|
||||||
|
"einoRole": "orchestrator",
|
||||||
|
"einoAgent": ev.AgentName,
|
||||||
|
"orchestration": orchMode,
|
||||||
|
})
|
||||||
|
progress("response_delta", s, map[string]interface{}{
|
||||||
|
"conversationId": conversationID,
|
||||||
|
"mcpExecutionIds": snapshotMCPIDs(),
|
||||||
|
"einoRole": "orchestrator",
|
||||||
|
"einoAgent": ev.AgentName,
|
||||||
|
"orchestration": orchMode,
|
||||||
|
})
|
||||||
|
}
|
||||||
|
lastAssistant = s
|
||||||
|
runAccumulatedMsgs = append(runAccumulatedMsgs, schema.AssistantMessage(s, nil))
|
||||||
|
if orchMode == "plan_execute" && strings.EqualFold(strings.TrimSpace(ev.AgentName), "executor") {
|
||||||
|
lastPlanExecuteExecutor = UnwrapPlanExecuteUserText(s)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
} else if s != "" {
|
||||||
lastAssistant = s
|
lastAssistant = s
|
||||||
runAccumulatedMsgs = append(runAccumulatedMsgs, schema.AssistantMessage(s, nil))
|
runAccumulatedMsgs = append(runAccumulatedMsgs, schema.AssistantMessage(s, nil))
|
||||||
if orchMode == "plan_execute" && strings.EqualFold(strings.TrimSpace(ev.AgentName), "executor") {
|
if orchMode == "plan_execute" && strings.EqualFold(strings.TrimSpace(ev.AgentName), "executor") {
|
||||||
@@ -627,26 +722,42 @@ func runEinoADKAgentLoop(ctx context.Context, args *einoADKRunLoopArgs, baseMsgs
|
|||||||
body := strings.TrimSpace(msg.Content)
|
body := strings.TrimSpace(msg.Content)
|
||||||
if body != "" {
|
if body != "" {
|
||||||
if streamsMainAssistant(ev.AgentName) {
|
if streamsMainAssistant(ev.AgentName) {
|
||||||
if progress != nil {
|
executeStdoutDupMu.Lock()
|
||||||
progress("response_start", "", map[string]interface{}{
|
dup := pendingExecuteStdoutDup
|
||||||
"conversationId": conversationID,
|
if dup != "" && body == dup {
|
||||||
"mcpExecutionIds": snapshotMCPIDs(),
|
pendingExecuteStdoutDup = ""
|
||||||
"messageGeneratedBy": "eino:" + ev.AgentName,
|
executeStdoutDupMu.Unlock()
|
||||||
"einoRole": "orchestrator",
|
lastAssistant = body
|
||||||
"einoAgent": ev.AgentName,
|
if orchMode == "plan_execute" && strings.EqualFold(strings.TrimSpace(ev.AgentName), "executor") {
|
||||||
"orchestration": orchMode,
|
lastPlanExecuteExecutor = UnwrapPlanExecuteUserText(body)
|
||||||
})
|
}
|
||||||
progress("response_delta", body, map[string]interface{}{
|
// 非流式:与 execute 输出相同则跳过助手通道展示(msg 已在上方写入 runAccumulatedMsgs)
|
||||||
"conversationId": conversationID,
|
} else {
|
||||||
"mcpExecutionIds": snapshotMCPIDs(),
|
if dup != "" {
|
||||||
"einoRole": "orchestrator",
|
pendingExecuteStdoutDup = ""
|
||||||
"einoAgent": ev.AgentName,
|
}
|
||||||
"orchestration": orchMode,
|
executeStdoutDupMu.Unlock()
|
||||||
})
|
if progress != nil {
|
||||||
}
|
progress("response_start", "", map[string]interface{}{
|
||||||
lastAssistant = body
|
"conversationId": conversationID,
|
||||||
if orchMode == "plan_execute" && strings.EqualFold(strings.TrimSpace(ev.AgentName), "executor") {
|
"mcpExecutionIds": snapshotMCPIDs(),
|
||||||
lastPlanExecuteExecutor = UnwrapPlanExecuteUserText(body)
|
"messageGeneratedBy": "eino:" + ev.AgentName,
|
||||||
|
"einoRole": "orchestrator",
|
||||||
|
"einoAgent": ev.AgentName,
|
||||||
|
"orchestration": orchMode,
|
||||||
|
})
|
||||||
|
progress("response_delta", body, map[string]interface{}{
|
||||||
|
"conversationId": conversationID,
|
||||||
|
"mcpExecutionIds": snapshotMCPIDs(),
|
||||||
|
"einoRole": "orchestrator",
|
||||||
|
"einoAgent": ev.AgentName,
|
||||||
|
"orchestration": orchMode,
|
||||||
|
})
|
||||||
|
}
|
||||||
|
lastAssistant = body
|
||||||
|
if orchMode == "plan_execute" && strings.EqualFold(strings.TrimSpace(ev.AgentName), "executor") {
|
||||||
|
lastPlanExecuteExecutor = UnwrapPlanExecuteUserText(body)
|
||||||
|
}
|
||||||
}
|
}
|
||||||
} else if progress != nil {
|
} else if progress != nil {
|
||||||
progress("eino_agent_reply", body, map[string]interface{}{
|
progress("eino_agent_reply", body, map[string]interface{}{
|
||||||
@@ -702,12 +813,15 @@ func runEinoADKAgentLoop(ctx context.Context, args *einoADKRunLoopArgs, baseMsgs
|
|||||||
break
|
break
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
} else {
|
|
||||||
removePendingByID(toolCallID)
|
|
||||||
}
|
}
|
||||||
if toolCallID != "" {
|
if toolCallID != "" {
|
||||||
|
removePendingByID(toolCallID)
|
||||||
|
if _, loaded := toolResultSent.LoadOrStore(toolCallID, struct{}{}); loaded {
|
||||||
|
continue
|
||||||
|
}
|
||||||
data["toolCallId"] = toolCallID
|
data["toolCallId"] = toolCallID
|
||||||
}
|
}
|
||||||
|
recordPendingExecuteStdoutDup(toolName, content, isErr)
|
||||||
progress("tool_result", fmt.Sprintf("工具结果 (%s)", toolName), data)
|
progress("tool_result", fmt.Sprintf("工具结果 (%s)", toolName), data)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@@ -745,6 +859,11 @@ func buildEinoRunResultFromAccumulated(
|
|||||||
cleaned = UnwrapPlanExecuteUserText(cleaned)
|
cleaned = UnwrapPlanExecuteUserText(cleaned)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
if cleaned == "" {
|
||||||
|
if fb := strings.TrimSpace(einoExtractFallbackAssistantFromMsgs(runAccumulatedMsgs)); fb != "" {
|
||||||
|
cleaned = fb
|
||||||
|
}
|
||||||
|
}
|
||||||
cleaned = dedupeRepeatedParagraphs(cleaned, 80)
|
cleaned = dedupeRepeatedParagraphs(cleaned, 80)
|
||||||
cleaned = dedupeParagraphsByLineFingerprint(cleaned, 100)
|
cleaned = dedupeParagraphsByLineFingerprint(cleaned, 100)
|
||||||
// 防止超长响应导致 JSON 序列化慢或 OOM(多代理拼接大量工具输出时可能触发)。
|
// 防止超长响应导致 JSON 序列化慢或 OOM(多代理拼接大量工具输出时可能触发)。
|
||||||
@@ -771,6 +890,79 @@ func buildEinoRunResultFromAccumulated(
|
|||||||
return out
|
return out
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// einoExtractFallbackAssistantFromMsgs 在「主通道未产出助手正文」时,从 Eino ADK 轨迹中回填用户可见回复。
|
||||||
|
// 典型场景:监督者仅调用 exit(final_result 落在 Tool 消息中),或工具结果已写入历史但 lastAssistant 未更新。
|
||||||
|
//
|
||||||
|
// 优先级:最后一次 exit 工具输出 → 最后一条含 exit 的助手 tool_calls 参数中的 final_result。
|
||||||
|
func einoExtractFallbackAssistantFromMsgs(msgs []adk.Message) string {
|
||||||
|
for i := len(msgs) - 1; i >= 0; i-- {
|
||||||
|
m := msgs[i]
|
||||||
|
if m == nil || m.Role != schema.Tool {
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
if !strings.EqualFold(strings.TrimSpace(m.ToolName), adk.ToolInfoExit.Name) {
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
content := strings.TrimSpace(m.Content)
|
||||||
|
if content == "" || strings.HasPrefix(content, einomcp.ToolErrorPrefix) {
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
return content
|
||||||
|
}
|
||||||
|
for i := len(msgs) - 1; i >= 0; i-- {
|
||||||
|
m := msgs[i]
|
||||||
|
if m == nil || m.Role != schema.Assistant {
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
if s := einoExtractExitFinalFromAssistantToolCalls(m); s != "" {
|
||||||
|
return s
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return ""
|
||||||
|
}
|
||||||
|
|
||||||
|
func einoExtractExitFinalFromAssistantToolCalls(msg *schema.Message) string {
|
||||||
|
if msg == nil || len(msg.ToolCalls) == 0 {
|
||||||
|
return ""
|
||||||
|
}
|
||||||
|
for i := len(msg.ToolCalls) - 1; i >= 0; i-- {
|
||||||
|
tc := msg.ToolCalls[i]
|
||||||
|
if !strings.EqualFold(strings.TrimSpace(tc.Function.Name), adk.ToolInfoExit.Name) {
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
if s := einoParseExitFinalResultArguments(tc.Function.Arguments); s != "" {
|
||||||
|
return s
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return ""
|
||||||
|
}
|
||||||
|
|
||||||
|
func einoParseExitFinalResultArguments(arguments string) string {
|
||||||
|
arguments = strings.TrimSpace(arguments)
|
||||||
|
if arguments == "" {
|
||||||
|
return ""
|
||||||
|
}
|
||||||
|
var wrap struct {
|
||||||
|
FinalResult json.RawMessage `json:"final_result"`
|
||||||
|
}
|
||||||
|
if err := json.Unmarshal([]byte(arguments), &wrap); err != nil || len(wrap.FinalResult) == 0 {
|
||||||
|
return ""
|
||||||
|
}
|
||||||
|
var s string
|
||||||
|
if err := json.Unmarshal(wrap.FinalResult, &s); err == nil {
|
||||||
|
return strings.TrimSpace(s)
|
||||||
|
}
|
||||||
|
var anyVal interface{}
|
||||||
|
if err := json.Unmarshal(wrap.FinalResult, &anyVal); err != nil {
|
||||||
|
return ""
|
||||||
|
}
|
||||||
|
b, err := json.Marshal(anyVal)
|
||||||
|
if err != nil {
|
||||||
|
return ""
|
||||||
|
}
|
||||||
|
return strings.TrimSpace(string(b))
|
||||||
|
}
|
||||||
|
|
||||||
func buildEinoCheckpointID(orchMode string) string {
|
func buildEinoCheckpointID(orchMode string) string {
|
||||||
mode := sanitizeEinoPathSegment(strings.TrimSpace(orchMode))
|
mode := sanitizeEinoPathSegment(strings.TrimSpace(orchMode))
|
||||||
if mode == "" {
|
if mode == "" {
|
||||||
|
|||||||
@@ -0,0 +1,31 @@
|
|||||||
|
package multiagent
|
||||||
|
|
||||||
|
import (
|
||||||
|
"fmt"
|
||||||
|
|
||||||
|
"cyberstrike-ai/internal/agent"
|
||||||
|
"cyberstrike-ai/internal/einomcp"
|
||||||
|
)
|
||||||
|
|
||||||
|
// newEinoExecuteMonitorCallback 在 Eino filesystem execute 结束时写入 MCP 监控库并 recorder(executionId),
|
||||||
|
// 与 CallTool 路径一致,供助手消息展示「渗透测试详情」芯片。
|
||||||
|
func newEinoExecuteMonitorCallback(ag *agent.Agent, recorder einomcp.ExecutionRecorder) func(command, stdout string, success bool, invokeErr error) {
|
||||||
|
return func(command, stdout string, success bool, invokeErr error) {
|
||||||
|
if ag == nil || recorder == nil {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
var err error
|
||||||
|
if !success {
|
||||||
|
if invokeErr != nil {
|
||||||
|
err = invokeErr
|
||||||
|
} else {
|
||||||
|
err = fmt.Errorf("execute failed")
|
||||||
|
}
|
||||||
|
}
|
||||||
|
args := map[string]interface{}{"command": command}
|
||||||
|
id := ag.RecordLocalToolExecution("execute", args, stdout, err)
|
||||||
|
if id != "" {
|
||||||
|
recorder(id)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
@@ -2,11 +2,16 @@ package multiagent
|
|||||||
|
|
||||||
import (
|
import (
|
||||||
"context"
|
"context"
|
||||||
|
"errors"
|
||||||
"fmt"
|
"fmt"
|
||||||
|
"io"
|
||||||
|
"strings"
|
||||||
|
|
||||||
|
"cyberstrike-ai/internal/einomcp"
|
||||||
"cyberstrike-ai/internal/security"
|
"cyberstrike-ai/internal/security"
|
||||||
|
|
||||||
"github.com/cloudwego/eino/adk/filesystem"
|
"github.com/cloudwego/eino/adk/filesystem"
|
||||||
|
"github.com/cloudwego/eino/compose"
|
||||||
"github.com/cloudwego/eino/schema"
|
"github.com/cloudwego/eino/schema"
|
||||||
)
|
)
|
||||||
|
|
||||||
@@ -14,8 +19,15 @@ import (
|
|||||||
// 官方 execute 工具默认走 ExecuteStreaming 且不设 RunInBackendGround;末尾带 & 时子进程仍与管道相连,
|
// 官方 execute 工具默认走 ExecuteStreaming 且不设 RunInBackendGround;末尾带 & 时子进程仍与管道相连,
|
||||||
// streamStdout 按行读取会在无换行输出时长时间阻塞(与 MCP 工具 exec 的独立实现不同)。
|
// streamStdout 按行读取会在无换行输出时长时间阻塞(与 MCP 工具 exec 的独立实现不同)。
|
||||||
// 对「完全后台」命令自动开启 RunInBackendGround,与 local.runCmdInBackground 行为对齐。
|
// 对「完全后台」命令自动开启 RunInBackendGround,与 local.runCmdInBackground 行为对齐。
|
||||||
|
//
|
||||||
|
// 使用 Pipe 将内层流转发给调用方:在 inner EOF 后、关闭 Pipe 前同步调用 ToolInvokeNotify.Fire,
|
||||||
|
// 保证 run loop 在模型开始下一轮输出前已记录 execute 结果(用于 UI 与「重复助手复述」去重)。
|
||||||
type einoStreamingShellWrap struct {
|
type einoStreamingShellWrap struct {
|
||||||
inner filesystem.StreamingShell
|
inner filesystem.StreamingShell
|
||||||
|
invokeNotify *einomcp.ToolInvokeNotifyHolder
|
||||||
|
einoAgentName string
|
||||||
|
// recordMonitor 在 execute 流结束后写入 tool_executions 并 recorder(executionId),使「渗透测试详情」与常规 MCP 一致。
|
||||||
|
recordMonitor func(command, stdout string, success bool, invokeErr error)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (w *einoStreamingShellWrap) ExecuteStreaming(ctx context.Context, input *filesystem.ExecuteRequest) (*schema.StreamReader[*filesystem.ExecuteResponse], error) {
|
func (w *einoStreamingShellWrap) ExecuteStreaming(ctx context.Context, input *filesystem.ExecuteRequest) (*schema.StreamReader[*filesystem.ExecuteResponse], error) {
|
||||||
@@ -26,8 +38,73 @@ func (w *einoStreamingShellWrap) ExecuteStreaming(ctx context.Context, input *fi
|
|||||||
return w.inner.ExecuteStreaming(ctx, nil)
|
return w.inner.ExecuteStreaming(ctx, nil)
|
||||||
}
|
}
|
||||||
req := *input
|
req := *input
|
||||||
|
cmd := strings.TrimSpace(req.Command)
|
||||||
if security.IsBackgroundShellCommand(req.Command) && !req.RunInBackendGround {
|
if security.IsBackgroundShellCommand(req.Command) && !req.RunInBackendGround {
|
||||||
req.RunInBackendGround = true
|
req.RunInBackendGround = true
|
||||||
}
|
}
|
||||||
return w.inner.ExecuteStreaming(ctx, &req)
|
sr, err := w.inner.ExecuteStreaming(ctx, &req)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
tid := strings.TrimSpace(compose.GetToolCallID(ctx))
|
||||||
|
if sr == nil || w.invokeNotify == nil || tid == "" {
|
||||||
|
return sr, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
outR, outW := schema.Pipe[*filesystem.ExecuteResponse](32)
|
||||||
|
agentTag := strings.TrimSpace(w.einoAgentName)
|
||||||
|
|
||||||
|
go func(inner *schema.StreamReader[*filesystem.ExecuteResponse], command string) {
|
||||||
|
defer inner.Close()
|
||||||
|
|
||||||
|
var sb strings.Builder
|
||||||
|
const maxCapture = 16 * 1024
|
||||||
|
success := true
|
||||||
|
var invokeErr error
|
||||||
|
exitCode := 0
|
||||||
|
hasExitCode := false
|
||||||
|
|
||||||
|
for {
|
||||||
|
resp, rerr := inner.Recv()
|
||||||
|
if errors.Is(rerr, io.EOF) {
|
||||||
|
break
|
||||||
|
}
|
||||||
|
if rerr != nil {
|
||||||
|
success = false
|
||||||
|
invokeErr = rerr
|
||||||
|
_ = outW.Send(nil, rerr)
|
||||||
|
break
|
||||||
|
}
|
||||||
|
if resp != nil {
|
||||||
|
if resp.ExitCode != nil {
|
||||||
|
hasExitCode = true
|
||||||
|
exitCode = *resp.ExitCode
|
||||||
|
}
|
||||||
|
if remain := maxCapture - sb.Len(); remain > 0 {
|
||||||
|
out := resp.Output
|
||||||
|
if len(out) > remain {
|
||||||
|
out = out[:remain]
|
||||||
|
}
|
||||||
|
sb.WriteString(out)
|
||||||
|
}
|
||||||
|
if outW.Send(resp, nil) {
|
||||||
|
success = false
|
||||||
|
invokeErr = fmt.Errorf("execute stream closed by consumer")
|
||||||
|
break
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
if success && hasExitCode && exitCode != 0 {
|
||||||
|
success = false
|
||||||
|
invokeErr = fmt.Errorf("execute exited with code %d", exitCode)
|
||||||
|
}
|
||||||
|
if w.recordMonitor != nil {
|
||||||
|
w.recordMonitor(command, sb.String(), success, invokeErr)
|
||||||
|
}
|
||||||
|
w.invokeNotify.Fire(tid, "execute", agentTag, success, sb.String(), invokeErr)
|
||||||
|
outW.Close()
|
||||||
|
}(sr, cmd)
|
||||||
|
|
||||||
|
return outR, nil
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -0,0 +1,62 @@
|
|||||||
|
package multiagent
|
||||||
|
|
||||||
|
import (
|
||||||
|
"testing"
|
||||||
|
|
||||||
|
"github.com/cloudwego/eino/schema"
|
||||||
|
)
|
||||||
|
|
||||||
|
func TestEinoExtractFallbackAssistantFromMsgs_exitToolMessage(t *testing.T) {
|
||||||
|
u := schema.UserMessage("hi")
|
||||||
|
tm := schema.ToolMessage("answer for user", "call-exit-1")
|
||||||
|
tm.ToolName = "exit"
|
||||||
|
if got := einoExtractFallbackAssistantFromMsgs([]*schema.Message{u, tm}); got != "answer for user" {
|
||||||
|
t.Fatalf("got %q", got)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestEinoExtractFallbackAssistantFromMsgs_lastExitWins(t *testing.T) {
|
||||||
|
msgs := []*schema.Message{
|
||||||
|
schema.UserMessage("hi"),
|
||||||
|
toolExitMsg("first", "c1"),
|
||||||
|
toolExitMsg("second", "c2"),
|
||||||
|
}
|
||||||
|
if got := einoExtractFallbackAssistantFromMsgs(msgs); got != "second" {
|
||||||
|
t.Fatalf("got %q", got)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestEinoExtractFallbackAssistantFromMsgs_fromAssistantToolCalls(t *testing.T) {
|
||||||
|
m := schema.AssistantMessage("", []schema.ToolCall{{
|
||||||
|
ID: "x",
|
||||||
|
Type: "function",
|
||||||
|
Function: schema.FunctionCall{
|
||||||
|
Name: "exit",
|
||||||
|
Arguments: `{"final_result":"from args"}`,
|
||||||
|
},
|
||||||
|
}})
|
||||||
|
if got := einoExtractFallbackAssistantFromMsgs([]*schema.Message{m}); got != "from args" {
|
||||||
|
t.Fatalf("got %q", got)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestEinoExtractFallbackAssistantFromMsgs_prefersToolOverEarlierAssistant(t *testing.T) {
|
||||||
|
asst := schema.AssistantMessage("", []schema.ToolCall{{
|
||||||
|
ID: "x",
|
||||||
|
Type: "function",
|
||||||
|
Function: schema.FunctionCall{
|
||||||
|
Name: "exit",
|
||||||
|
Arguments: `{"final_result":"from args"}`,
|
||||||
|
},
|
||||||
|
}})
|
||||||
|
tool := toolExitMsg("from tool", "c1")
|
||||||
|
if got := einoExtractFallbackAssistantFromMsgs([]*schema.Message{asst, tool}); got != "from tool" {
|
||||||
|
t.Fatalf("got %q", got)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func toolExitMsg(content, callID string) *schema.Message {
|
||||||
|
m := schema.ToolMessage(content, callID)
|
||||||
|
m.ToolName = "exit"
|
||||||
|
return m
|
||||||
|
}
|
||||||
@@ -86,8 +86,10 @@ func RunEinoSingleChatModelAgent(
|
|||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
|
toolInvokeNotify := einomcp.NewToolInvokeNotifyHolder()
|
||||||
|
einoExecMonitor := newEinoExecuteMonitorCallback(ag, recorder)
|
||||||
mainDefs := ag.ToolsForRole(roleTools)
|
mainDefs := ag.ToolsForRole(roleTools)
|
||||||
mainTools, err := einomcp.ToolsFromDefinitions(ag, holder, mainDefs, recorder, toolOutputChunk)
|
mainTools, err := einomcp.ToolsFromDefinitions(ag, holder, mainDefs, recorder, toolOutputChunk, toolInvokeNotify, einoSingleAgentName)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
@@ -136,7 +138,7 @@ func RunEinoSingleChatModelAgent(
|
|||||||
}
|
}
|
||||||
if einoSkillMW != nil {
|
if einoSkillMW != nil {
|
||||||
if einoFSTools && einoLoc != nil {
|
if einoFSTools && einoLoc != nil {
|
||||||
fsMw, fsErr := subAgentFilesystemMiddleware(ctx, einoLoc)
|
fsMw, fsErr := subAgentFilesystemMiddleware(ctx, einoLoc, toolInvokeNotify, einoSingleAgentName, einoExecMonitor)
|
||||||
if fsErr != nil {
|
if fsErr != nil {
|
||||||
return nil, fmt.Errorf("eino single filesystem 中间件: %w", fsErr)
|
return nil, fmt.Errorf("eino single filesystem 中间件: %w", fsErr)
|
||||||
}
|
}
|
||||||
@@ -232,6 +234,7 @@ func RunEinoSingleChatModelAgent(
|
|||||||
CheckpointDir: ma.EinoMiddleware.CheckpointDir,
|
CheckpointDir: ma.EinoMiddleware.CheckpointDir,
|
||||||
McpIDsMu: &mcpIDsMu,
|
McpIDsMu: &mcpIDsMu,
|
||||||
McpIDs: &mcpIDs,
|
McpIDs: &mcpIDs,
|
||||||
|
ToolInvokeNotify: toolInvokeNotify,
|
||||||
DA: chatAgent,
|
DA: chatAgent,
|
||||||
EmptyResponseMessage: "(Eino ADK single-agent session completed but no assistant text was captured. Check process details or logs.) " +
|
EmptyResponseMessage: "(Eino ADK single-agent session completed but no assistant text was captured. Check process details or logs.) " +
|
||||||
"(Eino ADK 单代理会话已完成,但未捕获到助手文本输出。请查看过程详情或日志。)",
|
"(Eino ADK 单代理会话已完成,但未捕获到助手文本输出。请查看过程详情或日志。)",
|
||||||
|
|||||||
@@ -8,6 +8,7 @@ import (
|
|||||||
"strings"
|
"strings"
|
||||||
|
|
||||||
"cyberstrike-ai/internal/config"
|
"cyberstrike-ai/internal/config"
|
||||||
|
"cyberstrike-ai/internal/einomcp"
|
||||||
|
|
||||||
localbk "github.com/cloudwego/eino-ext/adk/backend/local"
|
localbk "github.com/cloudwego/eino-ext/adk/backend/local"
|
||||||
"github.com/cloudwego/eino/adk"
|
"github.com/cloudwego/eino/adk"
|
||||||
@@ -75,12 +76,23 @@ func prepareEinoSkills(
|
|||||||
// subAgentFilesystemMiddleware returns filesystem middleware for a sub-agent when Deep itself
|
// subAgentFilesystemMiddleware returns filesystem middleware for a sub-agent when Deep itself
|
||||||
// does not set Backend (fsTools false on orchestrator) but we still want tools on subs — not used;
|
// does not set Backend (fsTools false on orchestrator) but we still want tools on subs — not used;
|
||||||
// when orchestrator has Backend, builtin FS is only on outer agent; subs need explicit FS for parity.
|
// when orchestrator has Backend, builtin FS is only on outer agent; subs need explicit FS for parity.
|
||||||
func subAgentFilesystemMiddleware(ctx context.Context, loc *localbk.Local) (adk.ChatModelAgentMiddleware, error) {
|
func subAgentFilesystemMiddleware(
|
||||||
|
ctx context.Context,
|
||||||
|
loc *localbk.Local,
|
||||||
|
invokeNotify *einomcp.ToolInvokeNotifyHolder,
|
||||||
|
einoAgentName string,
|
||||||
|
recordMonitor func(command, stdout string, success bool, invokeErr error),
|
||||||
|
) (adk.ChatModelAgentMiddleware, error) {
|
||||||
if loc == nil {
|
if loc == nil {
|
||||||
return nil, nil
|
return nil, nil
|
||||||
}
|
}
|
||||||
return filesystem.New(ctx, &filesystem.MiddlewareConfig{
|
return filesystem.New(ctx, &filesystem.MiddlewareConfig{
|
||||||
Backend: loc,
|
Backend: loc,
|
||||||
StreamingShell: &einoStreamingShellWrap{inner: loc},
|
StreamingShell: &einoStreamingShellWrap{
|
||||||
|
inner: loc,
|
||||||
|
invokeNotify: invokeNotify,
|
||||||
|
einoAgentName: strings.TrimSpace(einoAgentName),
|
||||||
|
recordMonitor: recordMonitor,
|
||||||
|
},
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -110,6 +110,7 @@ func RunDeepAgent(
|
|||||||
mcpIDs = append(mcpIDs, id)
|
mcpIDs = append(mcpIDs, id)
|
||||||
mcpIDsMu.Unlock()
|
mcpIDsMu.Unlock()
|
||||||
}
|
}
|
||||||
|
einoExecMonitor := newEinoExecuteMonitorCallback(ag, recorder)
|
||||||
|
|
||||||
// 与单代理流式一致:在 response_start / response_delta 的 data 中带当前 mcpExecutionIds,供主聊天绑定复制与展示。
|
// 与单代理流式一致:在 response_start / response_delta 的 data 中带当前 mcpExecutionIds,供主聊天绑定复制与展示。
|
||||||
snapshotMCPIDs := func() []string {
|
snapshotMCPIDs := func() []string {
|
||||||
@@ -120,6 +121,7 @@ func RunDeepAgent(
|
|||||||
return out
|
return out
|
||||||
}
|
}
|
||||||
|
|
||||||
|
toolInvokeNotify := einomcp.NewToolInvokeNotifyHolder()
|
||||||
mainDefs := ag.ToolsForRole(roleTools)
|
mainDefs := ag.ToolsForRole(roleTools)
|
||||||
toolOutputChunk := func(toolName, toolCallID, chunk string) {
|
toolOutputChunk := func(toolName, toolCallID, chunk string) {
|
||||||
// When toolCallId is missing, frontend ignores tool_result_delta.
|
// When toolCallId is missing, frontend ignores tool_result_delta.
|
||||||
@@ -137,16 +139,6 @@ func RunDeepAgent(
|
|||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
mainTools, err := einomcp.ToolsFromDefinitions(ag, holder, mainDefs, recorder, toolOutputChunk)
|
|
||||||
if err != nil {
|
|
||||||
return nil, err
|
|
||||||
}
|
|
||||||
|
|
||||||
mainToolsForCfg, mainOrchestratorPre, err := prependEinoMiddlewares(ctx, &ma.EinoMiddleware, einoMWMain, mainTools, einoLoc, skillsRoot, conversationID, logger)
|
|
||||||
if err != nil {
|
|
||||||
return nil, err
|
|
||||||
}
|
|
||||||
|
|
||||||
httpClient := &http.Client{
|
httpClient := &http.Client{
|
||||||
Timeout: 30 * time.Minute,
|
Timeout: 30 * time.Minute,
|
||||||
Transport: &http.Transport{
|
Transport: &http.Transport{
|
||||||
@@ -222,7 +214,7 @@ func RunDeepAgent(
|
|||||||
}
|
}
|
||||||
|
|
||||||
subDefs := ag.ToolsForRole(roleTools)
|
subDefs := ag.ToolsForRole(roleTools)
|
||||||
subTools, err := einomcp.ToolsFromDefinitions(ag, holder, subDefs, recorder, toolOutputChunk)
|
subTools, err := einomcp.ToolsFromDefinitions(ag, holder, subDefs, recorder, toolOutputChunk, toolInvokeNotify, id)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, fmt.Errorf("子代理 %q 工具: %w", id, err)
|
return nil, fmt.Errorf("子代理 %q 工具: %w", id, err)
|
||||||
}
|
}
|
||||||
@@ -248,7 +240,7 @@ func RunDeepAgent(
|
|||||||
}
|
}
|
||||||
if einoSkillMW != nil {
|
if einoSkillMW != nil {
|
||||||
if einoFSTools && einoLoc != nil {
|
if einoFSTools && einoLoc != nil {
|
||||||
subFs, fsErr := subAgentFilesystemMiddleware(ctx, einoLoc)
|
subFs, fsErr := subAgentFilesystemMiddleware(ctx, einoLoc, toolInvokeNotify, id, einoExecMonitor)
|
||||||
if fsErr != nil {
|
if fsErr != nil {
|
||||||
return nil, fmt.Errorf("子代理 %q filesystem 中间件: %w", id, fsErr)
|
return nil, fmt.Errorf("子代理 %q filesystem 中间件: %w", id, fsErr)
|
||||||
}
|
}
|
||||||
@@ -338,6 +330,16 @@ func RunDeepAgent(
|
|||||||
orchDescription = d
|
orchDescription = d
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
mainTools, err := einomcp.ToolsFromDefinitions(ag, holder, mainDefs, recorder, toolOutputChunk, toolInvokeNotify, orchestratorName)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
mainToolsForCfg, mainOrchestratorPre, err := prependEinoMiddlewares(ctx, &ma.EinoMiddleware, einoMWMain, mainTools, einoLoc, skillsRoot, conversationID, logger)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
orchInstruction = injectToolNamesOnlyInstruction(ctx, orchInstruction, mainTools)
|
orchInstruction = injectToolNamesOnlyInstruction(ctx, orchInstruction, mainTools)
|
||||||
if logger != nil {
|
if logger != nil {
|
||||||
mainNames := collectToolNames(ctx, mainTools)
|
mainNames := collectToolNames(ctx, mainTools)
|
||||||
@@ -381,7 +383,12 @@ func RunDeepAgent(
|
|||||||
var deepShell filesystem.StreamingShell
|
var deepShell filesystem.StreamingShell
|
||||||
if einoLoc != nil && einoFSTools {
|
if einoLoc != nil && einoFSTools {
|
||||||
deepBackend = einoLoc
|
deepBackend = einoLoc
|
||||||
deepShell = einoLoc
|
deepShell = &einoStreamingShellWrap{
|
||||||
|
inner: einoLoc,
|
||||||
|
invokeNotify: toolInvokeNotify,
|
||||||
|
einoAgentName: orchestratorName,
|
||||||
|
recordMonitor: einoExecMonitor,
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// noNestedTaskMiddleware 必须在最外层(最先拦截),防止 skill 或其他中间件内部触发 task 调用绕过检测。
|
// noNestedTaskMiddleware 必须在最外层(最先拦截),防止 skill 或其他中间件内部触发 task 调用绕过检测。
|
||||||
@@ -438,7 +445,7 @@ func RunDeepAgent(
|
|||||||
// 构建 filesystem 中间件(与 Deep sub-agent 一致)
|
// 构建 filesystem 中间件(与 Deep sub-agent 一致)
|
||||||
var peFsMw adk.ChatModelAgentMiddleware
|
var peFsMw adk.ChatModelAgentMiddleware
|
||||||
if einoSkillMW != nil && einoFSTools && einoLoc != nil {
|
if einoSkillMW != nil && einoFSTools && einoLoc != nil {
|
||||||
peFsMw, err = subAgentFilesystemMiddleware(ctx, einoLoc)
|
peFsMw, err = subAgentFilesystemMiddleware(ctx, einoLoc, toolInvokeNotify, "executor", einoExecMonitor)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, fmt.Errorf("plan_execute filesystem 中间件: %w", err)
|
return nil, fmt.Errorf("plan_execute filesystem 中间件: %w", err)
|
||||||
}
|
}
|
||||||
@@ -560,6 +567,7 @@ func RunDeepAgent(
|
|||||||
CheckpointDir: ma.EinoMiddleware.CheckpointDir,
|
CheckpointDir: ma.EinoMiddleware.CheckpointDir,
|
||||||
McpIDsMu: &mcpIDsMu,
|
McpIDsMu: &mcpIDsMu,
|
||||||
McpIDs: &mcpIDs,
|
McpIDs: &mcpIDs,
|
||||||
|
ToolInvokeNotify: toolInvokeNotify,
|
||||||
DA: da,
|
DA: da,
|
||||||
EmptyResponseMessage: "(Eino multi-agent orchestration completed but no assistant text was captured. Check process details or logs.) " +
|
EmptyResponseMessage: "(Eino multi-agent orchestration completed but no assistant text was captured. Check process details or logs.) " +
|
||||||
"(Eino 多代理编排已完成,但未捕获到助手文本输出。请查看过程详情或日志。)",
|
"(Eino 多代理编排已完成,但未捕获到助手文本输出。请查看过程详情或日志。)",
|
||||||
|
|||||||
@@ -0,0 +1,56 @@
|
|||||||
|
package openai
|
||||||
|
|
||||||
|
import "testing"
|
||||||
|
|
||||||
|
func TestNormalizeStreamingDelta_RepeatedCharBoundary(t *testing.T) {
|
||||||
|
// 流式在重复数字边界分片:不得把 "43" 的首字符与 "194" 尾字符误合并。
|
||||||
|
cur, d := normalizeStreamingDelta("https://x:194", "43")
|
||||||
|
if want := "https://x:19443"; cur != want {
|
||||||
|
t.Fatalf("next: want %q got %q", want, cur)
|
||||||
|
}
|
||||||
|
if d != "43" {
|
||||||
|
t.Fatalf("delta: want %q got %q", "43", d)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestNormalizeStreamingDelta_CumulativePrefix(t *testing.T) {
|
||||||
|
cur, d := normalizeStreamingDelta("今天", "今天天气")
|
||||||
|
if cur != "今天天气" || d != "天气" {
|
||||||
|
t.Fatalf("got cur=%q d=%q", cur, d)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestNormalizeStreamingDelta_FullRetransmit(t *testing.T) {
|
||||||
|
cur, d := normalizeStreamingDelta("今天", "今天")
|
||||||
|
if d != "" || cur != "今天" {
|
||||||
|
t.Fatalf("got cur=%q d=%q", cur, d)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestNormalizeStreamingDelta_SingleRuneRepeated(t *testing.T) {
|
||||||
|
cur, d := normalizeStreamingDelta("呀", "呀")
|
||||||
|
if want := "呀呀"; cur != want {
|
||||||
|
t.Fatalf("next: want %q got %q", want, cur)
|
||||||
|
}
|
||||||
|
if d != "呀" {
|
||||||
|
t.Fatalf("delta: want %q got %q", "呀", d)
|
||||||
|
}
|
||||||
|
cur, d = normalizeStreamingDelta("4", "4")
|
||||||
|
if want := "44"; cur != want {
|
||||||
|
t.Fatalf("next: want %q got %q", want, cur)
|
||||||
|
}
|
||||||
|
if d != "4" {
|
||||||
|
t.Fatalf("delta: want %q got %q", "4", d)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestNormalizeStreamingDelta_CumulativeExtendsNumber(t *testing.T) {
|
||||||
|
// 已缓冲 "194" 后收到累计串 "19443"(注意 "1943" 并非 "19443" 的前缀,不能靠误写的中间态测 HasPrefix)。
|
||||||
|
cur, d := normalizeStreamingDelta("194", "19443")
|
||||||
|
if want := "19443"; cur != want {
|
||||||
|
t.Fatalf("next: want %q got %q", want, cur)
|
||||||
|
}
|
||||||
|
if d != "43" {
|
||||||
|
t.Fatalf("delta: want %q got %q", "43", d)
|
||||||
|
}
|
||||||
|
}
|
||||||
+12
-17
@@ -10,6 +10,7 @@ import (
|
|||||||
"net/http"
|
"net/http"
|
||||||
"strings"
|
"strings"
|
||||||
"time"
|
"time"
|
||||||
|
"unicode/utf8"
|
||||||
|
|
||||||
"cyberstrike-ai/internal/config"
|
"cyberstrike-ai/internal/config"
|
||||||
|
|
||||||
@@ -34,7 +35,15 @@ func (e *APIError) Error() string {
|
|||||||
}
|
}
|
||||||
|
|
||||||
// normalizeStreamingDelta 将可能是“累计片段/重发片段”的内容归一化为“纯增量”。
|
// normalizeStreamingDelta 将可能是“累计片段/重发片段”的内容归一化为“纯增量”。
|
||||||
// 部分兼容网关会返回累计 content;若直接 append 会出现重复文本(结巴)。
|
// 部分兼容网关会返回累计 content;若直接 append 会出现重复文本。
|
||||||
|
//
|
||||||
|
// 注意:
|
||||||
|
// - 不做「任意后缀与前缀重叠」合并;流式可能在重复字符边界分片("194"+"43"→"19443")。
|
||||||
|
// - HasPrefix 仅在 incoming 严格长于 current 时视为累计全文,否则会把分片产生的第二个相同
|
||||||
|
// 单字/单码点(叠字、44、22 等)误判为「整段重复」而吞字。
|
||||||
|
// - incoming==current 仅当 current 长度 >1 个码点时才视为整包重发;单码点重复必须走拼接。
|
||||||
|
// - 不再使用「current 以 incoming 结尾则丢弃」:否则 "1943"+"43" 会误吞增量(19443 显示成 1943)。
|
||||||
|
// 若网关重复发送尾部片段,应重复送完整累计串,由 HasPrefix 分支去重。
|
||||||
func normalizeStreamingDelta(current, incoming string) (next, delta string) {
|
func normalizeStreamingDelta(current, incoming string) (next, delta string) {
|
||||||
if incoming == "" {
|
if incoming == "" {
|
||||||
return current, ""
|
return current, ""
|
||||||
@@ -42,26 +51,12 @@ func normalizeStreamingDelta(current, incoming string) (next, delta string) {
|
|||||||
if current == "" {
|
if current == "" {
|
||||||
return incoming, incoming
|
return incoming, incoming
|
||||||
}
|
}
|
||||||
if incoming == current {
|
if strings.HasPrefix(incoming, current) && len(incoming) > len(current) {
|
||||||
return current, ""
|
|
||||||
}
|
|
||||||
if strings.HasPrefix(incoming, current) {
|
|
||||||
return incoming, incoming[len(current):]
|
return incoming, incoming[len(current):]
|
||||||
}
|
}
|
||||||
if strings.HasSuffix(current, incoming) {
|
if incoming == current && utf8.RuneCountInString(current) > 1 {
|
||||||
return current, ""
|
return current, ""
|
||||||
}
|
}
|
||||||
|
|
||||||
// 边界重叠:current 后缀与 incoming 前缀重合,仅追加非重叠部分。
|
|
||||||
max := len(current)
|
|
||||||
if len(incoming) < max {
|
|
||||||
max = len(incoming)
|
|
||||||
}
|
|
||||||
for overlap := max; overlap > 0; overlap-- {
|
|
||||||
if current[len(current)-overlap:] == incoming[:overlap] {
|
|
||||||
return current + incoming[overlap:], incoming[overlap:]
|
|
||||||
}
|
|
||||||
}
|
|
||||||
return current + incoming, incoming
|
return current + incoming, incoming
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
Reference in New Issue
Block a user