|
|
|
@@ -296,12 +296,23 @@ func RunDeepAgent(
|
|
|
|
|
streamsMainAssistant := func(agent string) bool {
|
|
|
|
|
return agent == "" || agent == orchestratorName
|
|
|
|
|
}
|
|
|
|
|
einoRoleTag := func(agent string) string {
|
|
|
|
|
if streamsMainAssistant(agent) {
|
|
|
|
|
return "orchestrator"
|
|
|
|
|
}
|
|
|
|
|
return "sub"
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// 仅保留主代理最后一次 assistant 输出,避免把多轮中间回复拼接到最终答案。
|
|
|
|
|
var lastAssistant string
|
|
|
|
|
var reasoningStreamSeq int64
|
|
|
|
|
var einoSubReplyStreamSeq int64
|
|
|
|
|
toolEmitSeen := make(map[string]struct{})
|
|
|
|
|
// 主代理「外层轮次」:首次进入编排器为第 1 轮,每从子代理回到编排器 +1。
|
|
|
|
|
// 子代理「步数」:该子代理每次发起一批工具调用前 +1(近似 ReAct 步)。
|
|
|
|
|
var einoMainRound int
|
|
|
|
|
var einoLastAgent string
|
|
|
|
|
subAgentToolStep := make(map[string]int)
|
|
|
|
|
for {
|
|
|
|
|
ev, ok := iter.Next()
|
|
|
|
|
if !ok {
|
|
|
|
@@ -320,9 +331,34 @@ func RunDeepAgent(
|
|
|
|
|
return nil, ev.Err
|
|
|
|
|
}
|
|
|
|
|
if ev.AgentName != "" && progress != nil {
|
|
|
|
|
if streamsMainAssistant(ev.AgentName) {
|
|
|
|
|
if einoMainRound == 0 {
|
|
|
|
|
einoMainRound = 1
|
|
|
|
|
progress("iteration", "", map[string]interface{}{
|
|
|
|
|
"iteration": 1,
|
|
|
|
|
"einoScope": "main",
|
|
|
|
|
"einoRole": "orchestrator",
|
|
|
|
|
"einoAgent": orchestratorName,
|
|
|
|
|
"conversationId": conversationID,
|
|
|
|
|
"source": "eino",
|
|
|
|
|
})
|
|
|
|
|
} else if einoLastAgent != "" && !streamsMainAssistant(einoLastAgent) {
|
|
|
|
|
einoMainRound++
|
|
|
|
|
progress("iteration", "", map[string]interface{}{
|
|
|
|
|
"iteration": einoMainRound,
|
|
|
|
|
"einoScope": "main",
|
|
|
|
|
"einoRole": "orchestrator",
|
|
|
|
|
"einoAgent": orchestratorName,
|
|
|
|
|
"conversationId": conversationID,
|
|
|
|
|
"source": "eino",
|
|
|
|
|
})
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
einoLastAgent = ev.AgentName
|
|
|
|
|
progress("progress", fmt.Sprintf("[Eino] %s", ev.AgentName), map[string]interface{}{
|
|
|
|
|
"conversationId": conversationID,
|
|
|
|
|
"einoAgent": ev.AgentName,
|
|
|
|
|
"einoRole": einoRoleTag(ev.AgentName),
|
|
|
|
|
})
|
|
|
|
|
}
|
|
|
|
|
if ev.Output == nil || ev.Output.MessageOutput == nil {
|
|
|
|
@@ -355,9 +391,10 @@ func RunDeepAgent(
|
|
|
|
|
if reasoningStreamID == "" {
|
|
|
|
|
reasoningStreamID = fmt.Sprintf("eino-reasoning-%s-%d", conversationID, atomic.AddInt64(&reasoningStreamSeq, 1))
|
|
|
|
|
progress("thinking_stream_start", " ", map[string]interface{}{
|
|
|
|
|
"streamId": reasoningStreamID,
|
|
|
|
|
"source": "eino",
|
|
|
|
|
"einoAgent": ev.AgentName,
|
|
|
|
|
"streamId": reasoningStreamID,
|
|
|
|
|
"source": "eino",
|
|
|
|
|
"einoAgent": ev.AgentName,
|
|
|
|
|
"einoRole": einoRoleTag(ev.AgentName),
|
|
|
|
|
})
|
|
|
|
|
}
|
|
|
|
|
progress("thinking_stream_delta", chunk.ReasoningContent, map[string]interface{}{
|
|
|
|
@@ -369,14 +406,16 @@ func RunDeepAgent(
|
|
|
|
|
if !streamHeaderSent {
|
|
|
|
|
progress("response_start", "", map[string]interface{}{
|
|
|
|
|
"conversationId": conversationID,
|
|
|
|
|
"mcpExecutionIds": snapshotMCPIDs(),
|
|
|
|
|
"messageGeneratedBy": "eino:" + ev.AgentName,
|
|
|
|
|
"mcpExecutionIds": snapshotMCPIDs(),
|
|
|
|
|
"messageGeneratedBy": "eino:" + ev.AgentName,
|
|
|
|
|
"einoRole": "orchestrator",
|
|
|
|
|
})
|
|
|
|
|
streamHeaderSent = true
|
|
|
|
|
}
|
|
|
|
|
progress("response_delta", chunk.Content, map[string]interface{}{
|
|
|
|
|
"conversationId": conversationID,
|
|
|
|
|
"mcpExecutionIds": snapshotMCPIDs(),
|
|
|
|
|
"einoRole": "orchestrator",
|
|
|
|
|
})
|
|
|
|
|
mainAssistantBuf.WriteString(chunk.Content)
|
|
|
|
|
} else if !streamsMainAssistant(ev.AgentName) {
|
|
|
|
@@ -384,10 +423,11 @@ func RunDeepAgent(
|
|
|
|
|
if subReplyStreamID == "" {
|
|
|
|
|
subReplyStreamID = fmt.Sprintf("eino-sub-reply-%s-%d", conversationID, atomic.AddInt64(&einoSubReplyStreamSeq, 1))
|
|
|
|
|
progress("eino_agent_reply_stream_start", "", map[string]interface{}{
|
|
|
|
|
"streamId": subReplyStreamID,
|
|
|
|
|
"einoAgent": ev.AgentName,
|
|
|
|
|
"conversationId": conversationID,
|
|
|
|
|
"source": "eino",
|
|
|
|
|
"streamId": subReplyStreamID,
|
|
|
|
|
"einoAgent": ev.AgentName,
|
|
|
|
|
"einoRole": "sub",
|
|
|
|
|
"conversationId": conversationID,
|
|
|
|
|
"source": "eino",
|
|
|
|
|
})
|
|
|
|
|
}
|
|
|
|
|
progress("eino_agent_reply_stream_delta", chunk.Content, map[string]interface{}{
|
|
|
|
@@ -412,16 +452,18 @@ func RunDeepAgent(
|
|
|
|
|
if s := strings.TrimSpace(subAssistantBuf.String()); s != "" {
|
|
|
|
|
if subReplyStreamID != "" {
|
|
|
|
|
progress("eino_agent_reply_stream_end", s, map[string]interface{}{
|
|
|
|
|
"streamId": subReplyStreamID,
|
|
|
|
|
"einoAgent": ev.AgentName,
|
|
|
|
|
"conversationId": conversationID,
|
|
|
|
|
"source": "eino",
|
|
|
|
|
"streamId": subReplyStreamID,
|
|
|
|
|
"einoAgent": ev.AgentName,
|
|
|
|
|
"einoRole": "sub",
|
|
|
|
|
"conversationId": conversationID,
|
|
|
|
|
"source": "eino",
|
|
|
|
|
})
|
|
|
|
|
} else {
|
|
|
|
|
progress("eino_agent_reply", s, map[string]interface{}{
|
|
|
|
|
"conversationId": conversationID,
|
|
|
|
|
"einoAgent": ev.AgentName,
|
|
|
|
|
"source": "eino",
|
|
|
|
|
"einoAgent": ev.AgentName,
|
|
|
|
|
"einoRole": "sub",
|
|
|
|
|
"source": "eino",
|
|
|
|
|
})
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
@@ -430,7 +472,7 @@ func RunDeepAgent(
|
|
|
|
|
if merged := mergeStreamingToolCallFragments(toolStreamFragments); len(merged) > 0 {
|
|
|
|
|
lastToolChunk = &schema.Message{ToolCalls: merged}
|
|
|
|
|
}
|
|
|
|
|
tryEmitToolCallsOnce(lastToolChunk, ev.AgentName, conversationID, progress, toolEmitSeen)
|
|
|
|
|
tryEmitToolCallsOnce(lastToolChunk, ev.AgentName, orchestratorName, conversationID, progress, toolEmitSeen, subAgentToolStep)
|
|
|
|
|
continue
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
@@ -438,7 +480,7 @@ func RunDeepAgent(
|
|
|
|
|
if gerr != nil || msg == nil {
|
|
|
|
|
continue
|
|
|
|
|
}
|
|
|
|
|
tryEmitToolCallsOnce(mergeMessageToolCalls(msg), ev.AgentName, conversationID, progress, toolEmitSeen)
|
|
|
|
|
tryEmitToolCallsOnce(mergeMessageToolCalls(msg), ev.AgentName, orchestratorName, conversationID, progress, toolEmitSeen, subAgentToolStep)
|
|
|
|
|
|
|
|
|
|
if mv.Role == schema.Assistant {
|
|
|
|
|
if progress != nil && strings.TrimSpace(msg.ReasoningContent) != "" {
|
|
|
|
@@ -446,6 +488,7 @@ func RunDeepAgent(
|
|
|
|
|
"conversationId": conversationID,
|
|
|
|
|
"source": "eino",
|
|
|
|
|
"einoAgent": ev.AgentName,
|
|
|
|
|
"einoRole": einoRoleTag(ev.AgentName),
|
|
|
|
|
})
|
|
|
|
|
}
|
|
|
|
|
body := strings.TrimSpace(msg.Content)
|
|
|
|
@@ -456,10 +499,12 @@ func RunDeepAgent(
|
|
|
|
|
"conversationId": conversationID,
|
|
|
|
|
"mcpExecutionIds": snapshotMCPIDs(),
|
|
|
|
|
"messageGeneratedBy": "eino:" + ev.AgentName,
|
|
|
|
|
"einoRole": "orchestrator",
|
|
|
|
|
})
|
|
|
|
|
progress("response_delta", body, map[string]interface{}{
|
|
|
|
|
"conversationId": conversationID,
|
|
|
|
|
"mcpExecutionIds": snapshotMCPIDs(),
|
|
|
|
|
"einoRole": "orchestrator",
|
|
|
|
|
})
|
|
|
|
|
}
|
|
|
|
|
lastAssistant = body
|
|
|
|
@@ -467,6 +512,7 @@ func RunDeepAgent(
|
|
|
|
|
progress("eino_agent_reply", body, map[string]interface{}{
|
|
|
|
|
"conversationId": conversationID,
|
|
|
|
|
"einoAgent": ev.AgentName,
|
|
|
|
|
"einoRole": "sub",
|
|
|
|
|
"source": "eino",
|
|
|
|
|
})
|
|
|
|
|
}
|
|
|
|
@@ -499,6 +545,7 @@ func RunDeepAgent(
|
|
|
|
|
"resultPreview": preview,
|
|
|
|
|
"conversationId": conversationID,
|
|
|
|
|
"einoAgent": ev.AgentName,
|
|
|
|
|
"einoRole": einoRoleTag(ev.AgentName),
|
|
|
|
|
"source": "eino",
|
|
|
|
|
}
|
|
|
|
|
if msg.ToolCallID != "" {
|
|
|
|
@@ -644,7 +691,7 @@ func toolCallsRichSignature(msg *schema.Message) string {
|
|
|
|
|
return base + "|" + strings.Join(parts, ";")
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
func tryEmitToolCallsOnce(msg *schema.Message, agentName, conversationID string, progress func(string, string, interface{}), seen map[string]struct{}) {
|
|
|
|
|
func tryEmitToolCallsOnce(msg *schema.Message, agentName, orchestratorName, conversationID string, progress func(string, string, interface{}), seen map[string]struct{}, subAgentToolStep map[string]int) {
|
|
|
|
|
if msg == nil || len(msg.ToolCalls) == 0 || progress == nil || seen == nil {
|
|
|
|
|
return
|
|
|
|
|
}
|
|
|
|
@@ -656,18 +703,39 @@ func tryEmitToolCallsOnce(msg *schema.Message, agentName, conversationID string,
|
|
|
|
|
return
|
|
|
|
|
}
|
|
|
|
|
seen[sig] = struct{}{}
|
|
|
|
|
emitToolCallsFromMessage(msg, agentName, conversationID, progress)
|
|
|
|
|
emitToolCallsFromMessage(msg, agentName, orchestratorName, conversationID, progress, subAgentToolStep)
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
func emitToolCallsFromMessage(msg *schema.Message, agentName, conversationID string, progress func(string, string, interface{})) {
|
|
|
|
|
func emitToolCallsFromMessage(msg *schema.Message, agentName, orchestratorName, conversationID string, progress func(string, string, interface{}), subAgentToolStep map[string]int) {
|
|
|
|
|
if msg == nil || len(msg.ToolCalls) == 0 || progress == nil {
|
|
|
|
|
return
|
|
|
|
|
}
|
|
|
|
|
if subAgentToolStep == nil {
|
|
|
|
|
subAgentToolStep = make(map[string]int)
|
|
|
|
|
}
|
|
|
|
|
isSubToolRound := agentName != "" && agentName != orchestratorName
|
|
|
|
|
if isSubToolRound {
|
|
|
|
|
subAgentToolStep[agentName]++
|
|
|
|
|
n := subAgentToolStep[agentName]
|
|
|
|
|
progress("iteration", "", map[string]interface{}{
|
|
|
|
|
"iteration": n,
|
|
|
|
|
"einoScope": "sub",
|
|
|
|
|
"einoRole": "sub",
|
|
|
|
|
"einoAgent": agentName,
|
|
|
|
|
"conversationId": conversationID,
|
|
|
|
|
"source": "eino",
|
|
|
|
|
})
|
|
|
|
|
}
|
|
|
|
|
role := "orchestrator"
|
|
|
|
|
if isSubToolRound {
|
|
|
|
|
role = "sub"
|
|
|
|
|
}
|
|
|
|
|
progress("tool_calls_detected", fmt.Sprintf("检测到 %d 个工具调用", len(msg.ToolCalls)), map[string]interface{}{
|
|
|
|
|
"count": len(msg.ToolCalls),
|
|
|
|
|
"conversationId": conversationID,
|
|
|
|
|
"source": "eino",
|
|
|
|
|
"einoAgent": agentName,
|
|
|
|
|
"einoRole": role,
|
|
|
|
|
})
|
|
|
|
|
for idx, tc := range msg.ToolCalls {
|
|
|
|
|
argStr := strings.TrimSpace(tc.Function.Arguments)
|
|
|
|
@@ -697,6 +765,7 @@ func emitToolCallsFromMessage(msg *schema.Message, agentName, conversationID str
|
|
|
|
|
"conversationId": conversationID,
|
|
|
|
|
"source": "eino",
|
|
|
|
|
"einoAgent": agentName,
|
|
|
|
|
"einoRole": role,
|
|
|
|
|
})
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|