mirror of
https://github.com/Ed1s0nZ/CyberStrikeAI.git
synced 2026-03-31 00:09:29 +02:00
772 lines
23 KiB
Go
772 lines
23 KiB
Go
// Package multiagent 使用 CloudWeGo Eino 的 DeepAgent(adk/prebuilt/deep)编排多代理,MCP 工具经 einomcp 桥接到现有 Agent。
|
||
package multiagent
|
||
|
||
import (
|
||
"context"
|
||
"encoding/json"
|
||
"errors"
|
||
"fmt"
|
||
"io"
|
||
"net"
|
||
"net/http"
|
||
"sort"
|
||
"strings"
|
||
"sync"
|
||
"sync/atomic"
|
||
"time"
|
||
|
||
"cyberstrike-ai/internal/agent"
|
||
"cyberstrike-ai/internal/agents"
|
||
"cyberstrike-ai/internal/config"
|
||
"cyberstrike-ai/internal/einomcp"
|
||
|
||
einoopenai "github.com/cloudwego/eino-ext/components/model/openai"
|
||
"github.com/cloudwego/eino/adk"
|
||
"github.com/cloudwego/eino/adk/prebuilt/deep"
|
||
"github.com/cloudwego/eino/compose"
|
||
"github.com/cloudwego/eino/schema"
|
||
"go.uber.org/zap"
|
||
)
|
||
|
||
// RunResult 与单 Agent 循环结果字段对齐,便于复用存储与 SSE 收尾逻辑。
|
||
type RunResult struct {
|
||
Response string
|
||
MCPExecutionIDs []string
|
||
LastReActInput string
|
||
LastReActOutput string
|
||
}
|
||
|
||
// RunDeepAgent 使用 Eino DeepAgent 执行一轮对话(流式事件通过 progress 回调输出)。
|
||
func RunDeepAgent(
|
||
ctx context.Context,
|
||
appCfg *config.Config,
|
||
ma *config.MultiAgentConfig,
|
||
ag *agent.Agent,
|
||
logger *zap.Logger,
|
||
conversationID string,
|
||
userMessage string,
|
||
history []agent.ChatMessage,
|
||
roleTools []string,
|
||
progress func(eventType, message string, data interface{}),
|
||
agentsMarkdownDir string,
|
||
) (*RunResult, error) {
|
||
if appCfg == nil || ma == nil || ag == nil {
|
||
return nil, fmt.Errorf("multiagent: 配置或 Agent 为空")
|
||
}
|
||
|
||
effectiveSubs := ma.SubAgents
|
||
var orch *agents.OrchestratorMarkdown
|
||
if strings.TrimSpace(agentsMarkdownDir) != "" {
|
||
load, merr := agents.LoadMarkdownAgentsDir(agentsMarkdownDir)
|
||
if merr != nil {
|
||
if logger != nil {
|
||
logger.Warn("加载 agents 目录 Markdown 失败,沿用 config 中的 sub_agents", zap.Error(merr))
|
||
}
|
||
} else {
|
||
effectiveSubs = agents.MergeYAMLAndMarkdown(ma.SubAgents, load.SubAgents)
|
||
orch = load.Orchestrator
|
||
}
|
||
}
|
||
if ma.WithoutGeneralSubAgent && len(effectiveSubs) == 0 {
|
||
return nil, fmt.Errorf("multi_agent.without_general_sub_agent 为 true 时,必须在 multi_agent.sub_agents 或 agents 目录 Markdown 中配置至少一个子代理")
|
||
}
|
||
|
||
holder := &einomcp.ConversationHolder{}
|
||
holder.Set(conversationID)
|
||
|
||
var mcpIDsMu sync.Mutex
|
||
var mcpIDs []string
|
||
recorder := func(id string) {
|
||
if id == "" {
|
||
return
|
||
}
|
||
mcpIDsMu.Lock()
|
||
mcpIDs = append(mcpIDs, id)
|
||
mcpIDsMu.Unlock()
|
||
}
|
||
|
||
// 与单代理流式一致:在 response_start / response_delta 的 data 中带当前 mcpExecutionIds,供主聊天绑定复制与展示。
|
||
snapshotMCPIDs := func() []string {
|
||
mcpIDsMu.Lock()
|
||
defer mcpIDsMu.Unlock()
|
||
out := make([]string, len(mcpIDs))
|
||
copy(out, mcpIDs)
|
||
return out
|
||
}
|
||
|
||
mainDefs := ag.ToolsForRole(roleTools)
|
||
toolOutputChunk := func(toolName, toolCallID, chunk string) {
|
||
// When toolCallId is missing, frontend ignores tool_result_delta.
|
||
if progress == nil || toolCallID == "" {
|
||
return
|
||
}
|
||
progress("tool_result_delta", chunk, map[string]interface{}{
|
||
"toolName": toolName,
|
||
"toolCallId": toolCallID,
|
||
// index/total/iteration are optional for UI; we don't know them in this bridge.
|
||
"index": 0,
|
||
"total": 0,
|
||
"iteration": 0,
|
||
"source": "eino",
|
||
})
|
||
}
|
||
|
||
mainTools, err := einomcp.ToolsFromDefinitions(ag, holder, mainDefs, recorder, toolOutputChunk)
|
||
if err != nil {
|
||
return nil, err
|
||
}
|
||
|
||
httpClient := &http.Client{
|
||
Timeout: 30 * time.Minute,
|
||
Transport: &http.Transport{
|
||
DialContext: (&net.Dialer{
|
||
Timeout: 300 * time.Second,
|
||
KeepAlive: 300 * time.Second,
|
||
}).DialContext,
|
||
MaxIdleConns: 100,
|
||
MaxIdleConnsPerHost: 10,
|
||
IdleConnTimeout: 90 * time.Second,
|
||
TLSHandshakeTimeout: 30 * time.Second,
|
||
ResponseHeaderTimeout: 60 * time.Minute,
|
||
},
|
||
}
|
||
|
||
baseModelCfg := &einoopenai.ChatModelConfig{
|
||
APIKey: appCfg.OpenAI.APIKey,
|
||
BaseURL: strings.TrimSuffix(appCfg.OpenAI.BaseURL, "/"),
|
||
Model: appCfg.OpenAI.Model,
|
||
HTTPClient: httpClient,
|
||
}
|
||
|
||
deepMaxIter := ma.MaxIteration
|
||
if deepMaxIter <= 0 {
|
||
deepMaxIter = appCfg.Agent.MaxIterations
|
||
}
|
||
if deepMaxIter <= 0 {
|
||
deepMaxIter = 40
|
||
}
|
||
|
||
subDefaultIter := ma.SubAgentMaxIterations
|
||
if subDefaultIter <= 0 {
|
||
subDefaultIter = 20
|
||
}
|
||
|
||
subAgents := make([]adk.Agent, 0, len(effectiveSubs))
|
||
for _, sub := range effectiveSubs {
|
||
id := strings.TrimSpace(sub.ID)
|
||
if id == "" {
|
||
return nil, fmt.Errorf("multi_agent.sub_agents 中存在空的 id")
|
||
}
|
||
name := strings.TrimSpace(sub.Name)
|
||
if name == "" {
|
||
name = id
|
||
}
|
||
desc := strings.TrimSpace(sub.Description)
|
||
if desc == "" {
|
||
desc = fmt.Sprintf("Specialist agent %s for penetration testing workflow.", id)
|
||
}
|
||
instr := strings.TrimSpace(sub.Instruction)
|
||
if instr == "" {
|
||
instr = "你是 CyberStrikeAI 中的专业子代理,在授权渗透测试场景下协助完成用户委托的子任务。优先使用可用工具获取证据,回答简洁专业。"
|
||
}
|
||
|
||
roleTools := sub.RoleTools
|
||
bind := strings.TrimSpace(sub.BindRole)
|
||
if bind != "" && appCfg.Roles != nil {
|
||
if r, ok := appCfg.Roles[bind]; ok && r.Enabled {
|
||
if len(roleTools) == 0 && len(r.Tools) > 0 {
|
||
roleTools = r.Tools
|
||
}
|
||
if len(r.Skills) > 0 {
|
||
var b strings.Builder
|
||
b.WriteString(instr)
|
||
b.WriteString("\n\n本角色推荐通过 list_skills / read_skill 按需加载的 Skills:")
|
||
for i, s := range r.Skills {
|
||
if i > 0 {
|
||
b.WriteString("、")
|
||
}
|
||
b.WriteString(s)
|
||
}
|
||
b.WriteString("。")
|
||
instr = b.String()
|
||
}
|
||
}
|
||
}
|
||
|
||
subModel, err := einoopenai.NewChatModel(ctx, baseModelCfg)
|
||
if err != nil {
|
||
return nil, fmt.Errorf("子代理 %q ChatModel: %w", id, err)
|
||
}
|
||
|
||
subDefs := ag.ToolsForRole(roleTools)
|
||
subTools, err := einomcp.ToolsFromDefinitions(ag, holder, subDefs, recorder, toolOutputChunk)
|
||
if err != nil {
|
||
return nil, fmt.Errorf("子代理 %q 工具: %w", id, err)
|
||
}
|
||
|
||
subMax := sub.MaxIterations
|
||
if subMax <= 0 {
|
||
subMax = subDefaultIter
|
||
}
|
||
|
||
subSumMw, err := newEinoSummarizationMiddleware(ctx, subModel, appCfg, logger)
|
||
if err != nil {
|
||
return nil, fmt.Errorf("子代理 %q summarization 中间件: %w", id, err)
|
||
}
|
||
|
||
sa, err := adk.NewChatModelAgent(ctx, &adk.ChatModelAgentConfig{
|
||
Name: id,
|
||
Description: desc,
|
||
Instruction: instr,
|
||
Model: subModel,
|
||
ToolsConfig: adk.ToolsConfig{
|
||
ToolsNodeConfig: compose.ToolsNodeConfig{
|
||
Tools: subTools,
|
||
},
|
||
EmitInternalEvents: true,
|
||
},
|
||
MaxIterations: subMax,
|
||
Handlers: []adk.ChatModelAgentMiddleware{subSumMw},
|
||
})
|
||
if err != nil {
|
||
return nil, fmt.Errorf("子代理 %q: %w", id, err)
|
||
}
|
||
subAgents = append(subAgents, sa)
|
||
}
|
||
|
||
mainModel, err := einoopenai.NewChatModel(ctx, baseModelCfg)
|
||
if err != nil {
|
||
return nil, fmt.Errorf("Deep 主模型: %w", err)
|
||
}
|
||
|
||
mainSumMw, err := newEinoSummarizationMiddleware(ctx, mainModel, appCfg, logger)
|
||
if err != nil {
|
||
return nil, fmt.Errorf("Deep 主代理 summarization 中间件: %w", err)
|
||
}
|
||
|
||
// 与 deep.Config.Name 一致。子代理的 assistant 正文也会经 EmitInternalEvents 流出,若全部当主回复会重复(编排器总结 + 子代理原文)。
|
||
orchestratorName := "cyberstrike-deep"
|
||
orchDescription := "Coordinates specialist agents and MCP tools for authorized security testing."
|
||
orchInstruction := strings.TrimSpace(ma.OrchestratorInstruction)
|
||
if orch != nil {
|
||
if strings.TrimSpace(orch.EinoName) != "" {
|
||
orchestratorName = strings.TrimSpace(orch.EinoName)
|
||
}
|
||
if d := strings.TrimSpace(orch.Description); d != "" {
|
||
orchDescription = d
|
||
}
|
||
if ins := strings.TrimSpace(orch.Instruction); ins != "" {
|
||
orchInstruction = ins
|
||
}
|
||
}
|
||
da, err := deep.New(ctx, &deep.Config{
|
||
Name: orchestratorName,
|
||
Description: orchDescription,
|
||
ChatModel: mainModel,
|
||
Instruction: orchInstruction,
|
||
SubAgents: subAgents,
|
||
WithoutGeneralSubAgent: ma.WithoutGeneralSubAgent,
|
||
WithoutWriteTodos: ma.WithoutWriteTodos,
|
||
MaxIteration: deepMaxIter,
|
||
// 防止 sub-agent 再调用 task(再委派 sub-agent),形成无限委派链。
|
||
Handlers: []adk.ChatModelAgentMiddleware{
|
||
newNoNestedTaskMiddleware(),
|
||
mainSumMw,
|
||
},
|
||
ToolsConfig: adk.ToolsConfig{
|
||
ToolsNodeConfig: compose.ToolsNodeConfig{
|
||
Tools: mainTools,
|
||
},
|
||
EmitInternalEvents: true,
|
||
},
|
||
})
|
||
if err != nil {
|
||
return nil, fmt.Errorf("deep.New: %w", err)
|
||
}
|
||
|
||
msgs := historyToMessages(history)
|
||
msgs = append(msgs, schema.UserMessage(userMessage))
|
||
|
||
runner := adk.NewRunner(ctx, adk.RunnerConfig{
|
||
Agent: da,
|
||
EnableStreaming: true,
|
||
})
|
||
iter := runner.Run(ctx, msgs)
|
||
|
||
streamsMainAssistant := func(agent string) bool {
|
||
return agent == "" || agent == orchestratorName
|
||
}
|
||
|
||
// 仅保留主代理最后一次 assistant 输出,避免把多轮中间回复拼接到最终答案。
|
||
var lastAssistant string
|
||
var reasoningStreamSeq int64
|
||
var einoSubReplyStreamSeq int64
|
||
toolEmitSeen := make(map[string]struct{})
|
||
for {
|
||
ev, ok := iter.Next()
|
||
if !ok {
|
||
break
|
||
}
|
||
if ev == nil {
|
||
continue
|
||
}
|
||
if ev.Err != nil {
|
||
if progress != nil {
|
||
progress("error", ev.Err.Error(), map[string]interface{}{
|
||
"conversationId": conversationID,
|
||
"source": "eino",
|
||
})
|
||
}
|
||
return nil, ev.Err
|
||
}
|
||
if ev.AgentName != "" && progress != nil {
|
||
progress("progress", fmt.Sprintf("[Eino] %s", ev.AgentName), map[string]interface{}{
|
||
"conversationId": conversationID,
|
||
"einoAgent": ev.AgentName,
|
||
})
|
||
}
|
||
if ev.Output == nil || ev.Output.MessageOutput == nil {
|
||
continue
|
||
}
|
||
mv := ev.Output.MessageOutput
|
||
|
||
if mv.IsStreaming && mv.MessageStream != nil {
|
||
streamHeaderSent := false
|
||
var reasoningStreamID string
|
||
var toolStreamFragments []schema.ToolCall
|
||
var subAssistantBuf strings.Builder
|
||
var subReplyStreamID string
|
||
var mainAssistantBuf strings.Builder
|
||
for {
|
||
chunk, rerr := mv.MessageStream.Recv()
|
||
if rerr != nil {
|
||
if errors.Is(rerr, io.EOF) {
|
||
break
|
||
}
|
||
if logger != nil {
|
||
logger.Warn("eino stream recv", zap.Error(rerr))
|
||
}
|
||
break
|
||
}
|
||
if chunk == nil {
|
||
continue
|
||
}
|
||
if progress != nil && strings.TrimSpace(chunk.ReasoningContent) != "" {
|
||
if reasoningStreamID == "" {
|
||
reasoningStreamID = fmt.Sprintf("eino-reasoning-%s-%d", conversationID, atomic.AddInt64(&reasoningStreamSeq, 1))
|
||
progress("thinking_stream_start", " ", map[string]interface{}{
|
||
"streamId": reasoningStreamID,
|
||
"source": "eino",
|
||
"einoAgent": ev.AgentName,
|
||
})
|
||
}
|
||
progress("thinking_stream_delta", chunk.ReasoningContent, map[string]interface{}{
|
||
"streamId": reasoningStreamID,
|
||
})
|
||
}
|
||
if chunk.Content != "" {
|
||
if progress != nil && streamsMainAssistant(ev.AgentName) {
|
||
if !streamHeaderSent {
|
||
progress("response_start", "", map[string]interface{}{
|
||
"conversationId": conversationID,
|
||
"mcpExecutionIds": snapshotMCPIDs(),
|
||
"messageGeneratedBy": "eino:" + ev.AgentName,
|
||
})
|
||
streamHeaderSent = true
|
||
}
|
||
progress("response_delta", chunk.Content, map[string]interface{}{
|
||
"conversationId": conversationID,
|
||
"mcpExecutionIds": snapshotMCPIDs(),
|
||
})
|
||
mainAssistantBuf.WriteString(chunk.Content)
|
||
} else if !streamsMainAssistant(ev.AgentName) {
|
||
if progress != nil {
|
||
if subReplyStreamID == "" {
|
||
subReplyStreamID = fmt.Sprintf("eino-sub-reply-%s-%d", conversationID, atomic.AddInt64(&einoSubReplyStreamSeq, 1))
|
||
progress("eino_agent_reply_stream_start", "", map[string]interface{}{
|
||
"streamId": subReplyStreamID,
|
||
"einoAgent": ev.AgentName,
|
||
"conversationId": conversationID,
|
||
"source": "eino",
|
||
})
|
||
}
|
||
progress("eino_agent_reply_stream_delta", chunk.Content, map[string]interface{}{
|
||
"streamId": subReplyStreamID,
|
||
"conversationId": conversationID,
|
||
})
|
||
}
|
||
subAssistantBuf.WriteString(chunk.Content)
|
||
}
|
||
}
|
||
// 收集流式 tool_calls 全部分片;arguments 在最后一帧常为 "",需按 index/id 合并后才能展示 subagent_type/description。
|
||
if len(chunk.ToolCalls) > 0 {
|
||
toolStreamFragments = append(toolStreamFragments, chunk.ToolCalls...)
|
||
}
|
||
}
|
||
if streamsMainAssistant(ev.AgentName) {
|
||
if s := strings.TrimSpace(mainAssistantBuf.String()); s != "" {
|
||
lastAssistant = s
|
||
}
|
||
}
|
||
if subAssistantBuf.Len() > 0 && progress != nil {
|
||
if s := strings.TrimSpace(subAssistantBuf.String()); s != "" {
|
||
if subReplyStreamID != "" {
|
||
progress("eino_agent_reply_stream_end", s, map[string]interface{}{
|
||
"streamId": subReplyStreamID,
|
||
"einoAgent": ev.AgentName,
|
||
"conversationId": conversationID,
|
||
"source": "eino",
|
||
})
|
||
} else {
|
||
progress("eino_agent_reply", s, map[string]interface{}{
|
||
"conversationId": conversationID,
|
||
"einoAgent": ev.AgentName,
|
||
"source": "eino",
|
||
})
|
||
}
|
||
}
|
||
}
|
||
var lastToolChunk *schema.Message
|
||
if merged := mergeStreamingToolCallFragments(toolStreamFragments); len(merged) > 0 {
|
||
lastToolChunk = &schema.Message{ToolCalls: merged}
|
||
}
|
||
tryEmitToolCallsOnce(lastToolChunk, ev.AgentName, conversationID, progress, toolEmitSeen)
|
||
continue
|
||
}
|
||
|
||
msg, gerr := mv.GetMessage()
|
||
if gerr != nil || msg == nil {
|
||
continue
|
||
}
|
||
tryEmitToolCallsOnce(mergeMessageToolCalls(msg), ev.AgentName, conversationID, progress, toolEmitSeen)
|
||
|
||
if mv.Role == schema.Assistant {
|
||
if progress != nil && strings.TrimSpace(msg.ReasoningContent) != "" {
|
||
progress("thinking", strings.TrimSpace(msg.ReasoningContent), map[string]interface{}{
|
||
"conversationId": conversationID,
|
||
"source": "eino",
|
||
"einoAgent": ev.AgentName,
|
||
})
|
||
}
|
||
body := strings.TrimSpace(msg.Content)
|
||
if body != "" {
|
||
if streamsMainAssistant(ev.AgentName) {
|
||
if progress != nil {
|
||
progress("response_start", "", map[string]interface{}{
|
||
"conversationId": conversationID,
|
||
"mcpExecutionIds": snapshotMCPIDs(),
|
||
"messageGeneratedBy": "eino:" + ev.AgentName,
|
||
})
|
||
progress("response_delta", body, map[string]interface{}{
|
||
"conversationId": conversationID,
|
||
"mcpExecutionIds": snapshotMCPIDs(),
|
||
})
|
||
}
|
||
lastAssistant = body
|
||
} else if progress != nil {
|
||
progress("eino_agent_reply", body, map[string]interface{}{
|
||
"conversationId": conversationID,
|
||
"einoAgent": ev.AgentName,
|
||
"source": "eino",
|
||
})
|
||
}
|
||
}
|
||
}
|
||
|
||
if mv.Role == schema.Tool && progress != nil {
|
||
toolName := msg.ToolName
|
||
if toolName == "" {
|
||
toolName = mv.ToolName
|
||
}
|
||
|
||
// bridge 工具在 res.IsError=true 时会返回带前缀的内容;这里解析为 success/isError,避免前端误判为成功。
|
||
content := msg.Content
|
||
isErr := false
|
||
if strings.HasPrefix(content, einomcp.ToolErrorPrefix) {
|
||
isErr = true
|
||
content = strings.TrimPrefix(content, einomcp.ToolErrorPrefix)
|
||
}
|
||
|
||
preview := content
|
||
if len(preview) > 200 {
|
||
preview = preview[:200] + "..."
|
||
}
|
||
data := map[string]interface{}{
|
||
"toolName": toolName,
|
||
"success": !isErr,
|
||
"isError": isErr,
|
||
"result": content,
|
||
"resultPreview": preview,
|
||
"conversationId": conversationID,
|
||
"einoAgent": ev.AgentName,
|
||
"source": "eino",
|
||
}
|
||
if msg.ToolCallID != "" {
|
||
data["toolCallId"] = msg.ToolCallID
|
||
}
|
||
progress("tool_result", fmt.Sprintf("工具结果 (%s)", toolName), data)
|
||
}
|
||
}
|
||
|
||
mcpIDsMu.Lock()
|
||
ids := append([]string(nil), mcpIDs...)
|
||
mcpIDsMu.Unlock()
|
||
|
||
histJSON, _ := json.Marshal(msgs)
|
||
cleaned := strings.TrimSpace(lastAssistant)
|
||
cleaned = dedupeRepeatedParagraphs(cleaned, 80)
|
||
cleaned = dedupeParagraphsByLineFingerprint(cleaned, 100)
|
||
out := &RunResult{
|
||
Response: cleaned,
|
||
MCPExecutionIDs: ids,
|
||
LastReActInput: string(histJSON),
|
||
LastReActOutput: cleaned,
|
||
}
|
||
if out.Response == "" {
|
||
out.Response = "(Eino DeepAgent 已完成,但未捕获到助手文本输出。请查看过程详情或日志。)"
|
||
out.LastReActOutput = out.Response
|
||
}
|
||
return out, nil
|
||
}
|
||
|
||
func historyToMessages(history []agent.ChatMessage) []adk.Message {
|
||
if len(history) == 0 {
|
||
return nil
|
||
}
|
||
// 放宽条数上限:跨轮历史交给 Eino Summarization(阈值对齐 openai.max_total_tokens)在调用模型前压缩,避免在入队前硬截断为 40 条。
|
||
const maxHistoryMessages = 300
|
||
start := 0
|
||
if len(history) > maxHistoryMessages {
|
||
start = len(history) - maxHistoryMessages
|
||
}
|
||
out := make([]adk.Message, 0, len(history[start:]))
|
||
for _, h := range history[start:] {
|
||
switch h.Role {
|
||
case "user":
|
||
if strings.TrimSpace(h.Content) != "" {
|
||
out = append(out, schema.UserMessage(h.Content))
|
||
}
|
||
case "assistant":
|
||
if strings.TrimSpace(h.Content) == "" && len(h.ToolCalls) > 0 {
|
||
continue
|
||
}
|
||
if strings.TrimSpace(h.Content) != "" {
|
||
out = append(out, schema.AssistantMessage(h.Content, nil))
|
||
}
|
||
default:
|
||
continue
|
||
}
|
||
}
|
||
return out
|
||
}
|
||
|
||
// mergeStreamingToolCallFragments 将流式多帧的 ToolCall 按 index 合并 arguments(与 schema.concatToolCalls 行为一致)。
|
||
func mergeStreamingToolCallFragments(fragments []schema.ToolCall) []schema.ToolCall {
|
||
if len(fragments) == 0 {
|
||
return nil
|
||
}
|
||
m, err := schema.ConcatMessages([]*schema.Message{{ToolCalls: fragments}})
|
||
if err != nil || m == nil {
|
||
return fragments
|
||
}
|
||
return m.ToolCalls
|
||
}
|
||
|
||
// mergeMessageToolCalls 非流式路径上若仍带分片式 tool_calls,合并后再上报 UI。
|
||
func mergeMessageToolCalls(msg *schema.Message) *schema.Message {
|
||
if msg == nil || len(msg.ToolCalls) == 0 {
|
||
return msg
|
||
}
|
||
m, err := schema.ConcatMessages([]*schema.Message{msg})
|
||
if err != nil || m == nil {
|
||
return msg
|
||
}
|
||
out := *msg
|
||
out.ToolCalls = m.ToolCalls
|
||
return &out
|
||
}
|
||
|
||
// toolCallStableID 用于流式阶段去重;OpenAI 流式常先给 index 后补 id。
|
||
func toolCallStableID(tc schema.ToolCall) string {
|
||
if tc.ID != "" {
|
||
return tc.ID
|
||
}
|
||
if tc.Index != nil {
|
||
return fmt.Sprintf("idx:%d", *tc.Index)
|
||
}
|
||
return ""
|
||
}
|
||
|
||
// toolCallDisplayName 避免前端「未知工具」:DeepAgent 内置 task 等可能延迟写入 function.name。
|
||
func toolCallDisplayName(tc schema.ToolCall) string {
|
||
if n := strings.TrimSpace(tc.Function.Name); n != "" {
|
||
return n
|
||
}
|
||
if n := strings.TrimSpace(tc.Type); n != "" && !strings.EqualFold(n, "function") {
|
||
return n
|
||
}
|
||
return "task"
|
||
}
|
||
|
||
// toolCallsSignatureFlush 用于去重键;无 id/index 时用占位 pos,避免流末帧缺 id 时整条工具事件丢失。
|
||
func toolCallsSignatureFlush(msg *schema.Message) string {
|
||
if msg == nil || len(msg.ToolCalls) == 0 {
|
||
return ""
|
||
}
|
||
parts := make([]string, 0, len(msg.ToolCalls))
|
||
for i, tc := range msg.ToolCalls {
|
||
id := toolCallStableID(tc)
|
||
if id == "" {
|
||
id = fmt.Sprintf("pos:%d", i)
|
||
}
|
||
parts = append(parts, id+"|"+toolCallDisplayName(tc))
|
||
}
|
||
sort.Strings(parts)
|
||
return strings.Join(parts, ";")
|
||
}
|
||
|
||
// toolCallsRichSignature 用于去重:同一次流式已上报后,紧随其后的非流式消息常带相同 tool_calls。
|
||
func toolCallsRichSignature(msg *schema.Message) string {
|
||
base := toolCallsSignatureFlush(msg)
|
||
if base == "" {
|
||
return ""
|
||
}
|
||
parts := make([]string, 0, len(msg.ToolCalls))
|
||
for _, tc := range msg.ToolCalls {
|
||
id := toolCallStableID(tc)
|
||
arg := tc.Function.Arguments
|
||
if len(arg) > 240 {
|
||
arg = arg[:240]
|
||
}
|
||
parts = append(parts, id+":"+arg)
|
||
}
|
||
sort.Strings(parts)
|
||
return base + "|" + strings.Join(parts, ";")
|
||
}
|
||
|
||
func tryEmitToolCallsOnce(msg *schema.Message, agentName, conversationID string, progress func(string, string, interface{}), seen map[string]struct{}) {
|
||
if msg == nil || len(msg.ToolCalls) == 0 || progress == nil || seen == nil {
|
||
return
|
||
}
|
||
if toolCallsSignatureFlush(msg) == "" {
|
||
return
|
||
}
|
||
sig := agentName + "\x1e" + toolCallsRichSignature(msg)
|
||
if _, ok := seen[sig]; ok {
|
||
return
|
||
}
|
||
seen[sig] = struct{}{}
|
||
emitToolCallsFromMessage(msg, agentName, conversationID, progress)
|
||
}
|
||
|
||
func emitToolCallsFromMessage(msg *schema.Message, agentName, conversationID string, progress func(string, string, interface{})) {
|
||
if msg == nil || len(msg.ToolCalls) == 0 || progress == nil {
|
||
return
|
||
}
|
||
progress("tool_calls_detected", fmt.Sprintf("检测到 %d 个工具调用", len(msg.ToolCalls)), map[string]interface{}{
|
||
"count": len(msg.ToolCalls),
|
||
"conversationId": conversationID,
|
||
"source": "eino",
|
||
"einoAgent": agentName,
|
||
})
|
||
for idx, tc := range msg.ToolCalls {
|
||
argStr := strings.TrimSpace(tc.Function.Arguments)
|
||
if argStr == "" && len(tc.Extra) > 0 {
|
||
if b, mErr := json.Marshal(tc.Extra); mErr == nil {
|
||
argStr = string(b)
|
||
}
|
||
}
|
||
var argsObj map[string]interface{}
|
||
if argStr != "" {
|
||
if uErr := json.Unmarshal([]byte(argStr), &argsObj); uErr != nil || argsObj == nil {
|
||
argsObj = map[string]interface{}{"_raw": argStr}
|
||
}
|
||
}
|
||
display := toolCallDisplayName(tc)
|
||
toolCallID := tc.ID
|
||
if toolCallID == "" && tc.Index != nil {
|
||
toolCallID = fmt.Sprintf("eino-stream-%d", *tc.Index)
|
||
}
|
||
progress("tool_call", fmt.Sprintf("正在调用工具: %s", display), map[string]interface{}{
|
||
"toolName": display,
|
||
"arguments": argStr,
|
||
"argumentsObj": argsObj,
|
||
"toolCallId": toolCallID,
|
||
"index": idx + 1,
|
||
"total": len(msg.ToolCalls),
|
||
"conversationId": conversationID,
|
||
"source": "eino",
|
||
"einoAgent": agentName,
|
||
})
|
||
}
|
||
}
|
||
|
||
// dedupeRepeatedParagraphs 去掉完全相同的连续/重复段落,缓解多代理各自复述同一列表。
|
||
func dedupeRepeatedParagraphs(s string, minLen int) string {
|
||
if s == "" || minLen <= 0 {
|
||
return s
|
||
}
|
||
paras := strings.Split(s, "\n\n")
|
||
var out []string
|
||
seen := make(map[string]bool)
|
||
for _, p := range paras {
|
||
t := strings.TrimSpace(p)
|
||
if len(t) < minLen {
|
||
out = append(out, p)
|
||
continue
|
||
}
|
||
if seen[t] {
|
||
continue
|
||
}
|
||
seen[t] = true
|
||
out = append(out, p)
|
||
}
|
||
return strings.TrimSpace(strings.Join(out, "\n\n"))
|
||
}
|
||
|
||
// dedupeParagraphsByLineFingerprint 去掉「正文行集合相同」的重复段落(开场白略不同也会合并),缓解多代理各写一遍目录清单。
|
||
func dedupeParagraphsByLineFingerprint(s string, minParaLen int) string {
|
||
if s == "" || minParaLen <= 0 {
|
||
return s
|
||
}
|
||
paras := strings.Split(s, "\n\n")
|
||
var out []string
|
||
seen := make(map[string]bool)
|
||
for _, p := range paras {
|
||
t := strings.TrimSpace(p)
|
||
if len(t) < minParaLen {
|
||
out = append(out, p)
|
||
continue
|
||
}
|
||
fp := paragraphLineFingerprint(t)
|
||
// 指纹仅在「≥4 条非空行」时有效;单行/短段落长回复(如自我介绍)fp 为空,必须保留,否则会误删全文并触发「未捕获到助手文本」占位。
|
||
if fp == "" {
|
||
out = append(out, p)
|
||
continue
|
||
}
|
||
if seen[fp] {
|
||
continue
|
||
}
|
||
seen[fp] = true
|
||
out = append(out, p)
|
||
}
|
||
return strings.TrimSpace(strings.Join(out, "\n\n"))
|
||
}
|
||
|
||
func paragraphLineFingerprint(t string) string {
|
||
lines := strings.Split(t, "\n")
|
||
norm := make([]string, 0, len(lines))
|
||
for _, L := range lines {
|
||
s := strings.TrimSpace(L)
|
||
if s == "" {
|
||
continue
|
||
}
|
||
norm = append(norm, s)
|
||
}
|
||
if len(norm) < 4 {
|
||
return ""
|
||
}
|
||
sort.Strings(norm)
|
||
return strings.Join(norm, "\x1e")
|
||
}
|