Add files via upload

This commit is contained in:
公明
2026-05-19 16:25:47 +08:00
committed by GitHub
parent a022baef03
commit 73a39ef868
4 changed files with 282 additions and 18 deletions
+167
View File
@@ -0,0 +1,167 @@
package agent
import (
"encoding/json"
"strings"
)
// ParseTraceMessages 解析落库的 last_react_inputOpenAI 风格 messages JSON 数组)。
func ParseTraceMessages(traceInputJSON string) ([]ChatMessage, error) {
traceInputJSON = strings.TrimSpace(traceInputJSON)
if traceInputJSON == "" {
return nil, nil
}
var raw []map[string]interface{}
if err := json.Unmarshal([]byte(traceInputJSON), &raw); err != nil {
return nil, err
}
out := make([]ChatMessage, 0, len(raw))
for _, msgMap := range raw {
msg := ChatMessage{}
role, _ := msgMap["role"].(string)
if role == "" {
continue
}
msg.Role = role
if content, ok := msgMap["content"].(string); ok {
msg.Content = content
}
if rc, ok := msgMap["reasoning_content"].(string); ok && strings.TrimSpace(rc) != "" {
msg.ReasoningContent = rc
}
if toolCallsRaw, ok := msgMap["tool_calls"]; ok && toolCallsRaw != nil {
if toolCallsArray, ok := toolCallsRaw.([]interface{}); ok {
for _, tcRaw := range toolCallsArray {
tcMap, ok := tcRaw.(map[string]interface{})
if !ok {
continue
}
toolCall := ToolCall{}
if id, ok := tcMap["id"].(string); ok {
toolCall.ID = id
}
if toolType, ok := tcMap["type"].(string); ok {
toolCall.Type = toolType
}
if funcMap, ok := tcMap["function"].(map[string]interface{}); ok {
toolCall.Function = FunctionCall{}
if name, ok := funcMap["name"].(string); ok {
toolCall.Function.Name = name
}
if argsRaw, ok := funcMap["arguments"]; ok {
if argsStr, ok := argsRaw.(string); ok {
var argsMap map[string]interface{}
if err := json.Unmarshal([]byte(argsStr), &argsMap); err == nil {
toolCall.Function.Arguments = argsMap
}
} else if argsMap, ok := argsRaw.(map[string]interface{}); ok {
toolCall.Function.Arguments = argsMap
}
}
}
if toolCall.ID != "" {
msg.ToolCalls = append(msg.ToolCalls, toolCall)
}
}
}
}
if toolCallID, ok := msgMap["tool_call_id"].(string); ok {
msg.ToolCallID = toolCallID
}
if tn, ok := msgMap["tool_name"].(string); ok && strings.TrimSpace(tn) != "" {
msg.ToolName = strings.TrimSpace(tn)
} else if tn, ok := msgMap["name"].(string); ok && strings.TrimSpace(tn) != "" && strings.EqualFold(msg.Role, "tool") {
msg.ToolName = strings.TrimSpace(tn)
}
out = append(out, msg)
}
return out, nil
}
// ExtractLastUserTurnMessages 仅保留最后一次 user 提问起的消息(不含更早的用户轮次;跳过 system)。
// 与「继续对话」续跑所用轨迹范围一致:当前任务轮次,而非整段多轮对话历史。
func ExtractLastUserTurnMessages(msgs []ChatMessage) []ChatMessage {
if len(msgs) == 0 {
return msgs
}
lastUser := -1
for i, m := range msgs {
if strings.EqualFold(m.Role, "user") {
lastUser = i
}
}
if lastUser < 0 {
return msgs
}
trimmed := msgs[lastUser:]
out := make([]ChatMessage, 0, len(trimmed))
for _, m := range trimmed {
if strings.EqualFold(m.Role, "system") {
continue
}
out = append(out, m)
}
return out
}
// ExtractLastUserTurnTraceJSON 在 JSON 轨迹上裁剪为最后一次 user 起的片段(供落库格式直接处理)。
func ExtractLastUserTurnTraceJSON(traceInputJSON string) string {
traceInputJSON = strings.TrimSpace(traceInputJSON)
if traceInputJSON == "" {
return traceInputJSON
}
var arr []map[string]interface{}
if err := json.Unmarshal([]byte(traceInputJSON), &arr); err != nil {
return traceInputJSON
}
lastUser := -1
for i, m := range arr {
if r, _ := m["role"].(string); strings.EqualFold(r, "user") {
lastUser = i
}
}
if lastUser <= 0 {
return traceInputJSON
}
trimmed := arr[lastUser:]
b, err := json.Marshal(trimmed)
if err != nil {
return traceInputJSON
}
return string(b)
}
// MergeAssistantTraceOutput 将 last_react_output 合并进轨迹最后一条 assistant(与 loadHistoryFromAgentTrace 一致)。
func MergeAssistantTraceOutput(msgs []ChatMessage, assistantOut string) []ChatMessage {
assistantOut = strings.TrimSpace(assistantOut)
if assistantOut == "" || len(msgs) == 0 {
return msgs
}
out := append([]ChatMessage(nil), msgs...)
last := &out[len(out)-1]
if strings.EqualFold(last.Role, "assistant") && len(last.ToolCalls) == 0 {
last.Content = assistantOut
return out
}
out = append(out, ChatMessage{
Role: "assistant",
Content: assistantOut,
})
return out
}
// MessagesToTraceJSON 将消息带序列化为 JSON(跳过 system)。
func MessagesToTraceJSON(msgs []ChatMessage) (string, error) {
filtered := make([]ChatMessage, 0, len(msgs))
for _, m := range msgs {
if strings.EqualFold(m.Role, "system") {
continue
}
filtered = append(filtered, m)
}
b, err := json.Marshal(filtered)
if err != nil {
return "", err
}
return string(b), nil
}
+57
View File
@@ -0,0 +1,57 @@
package agent
import (
"encoding/json"
"testing"
)
func TestExtractLastUserTurnTraceJSON(t *testing.T) {
raw := []map[string]interface{}{
{"role": "user", "content": "old question"},
{"role": "assistant", "content": "old answer"},
{"role": "user", "content": "new target 1.1.1.1"},
{"role": "assistant", "tool_calls": []interface{}{map[string]interface{}{
"id": "c1", "type": "function",
"function": map[string]interface{}{"name": "nmap", "arguments": "{}"},
}}},
{"role": "tool", "tool_call_id": "c1", "content": "open ports"},
}
b, _ := json.Marshal(raw)
out := ExtractLastUserTurnTraceJSON(string(b))
var trimmed []map[string]interface{}
if err := json.Unmarshal([]byte(out), &trimmed); err != nil {
t.Fatal(err)
}
if len(trimmed) != 3 {
t.Fatalf("expected 3 messages, got %d", len(trimmed))
}
if trimmed[0]["content"] != "new target 1.1.1.1" {
t.Fatalf("unexpected first message: %v", trimmed[0])
}
}
func TestExtractLastUserTurnMessagesSkipsSystem(t *testing.T) {
msgs := []ChatMessage{
{Role: "system", Content: "sys"},
{Role: "user", Content: "q"},
{Role: "assistant", Content: "a"},
}
out := ExtractLastUserTurnMessages(msgs)
if len(out) != 2 {
t.Fatalf("expected 2, got %d", len(out))
}
if out[0].Role != "user" {
t.Fatal("expected user first")
}
}
func TestMergeAssistantTraceOutput(t *testing.T) {
msgs := []ChatMessage{
{Role: "user", Content: "q"},
{Role: "assistant", Content: "draft"},
}
out := MergeAssistantTraceOutput(msgs, "final summary")
if out[len(out)-1].Content != "final summary" {
t.Fatalf("expected merged output, got %q", out[len(out)-1].Content)
}
}
+26 -13
View File
@@ -177,6 +177,8 @@ func runEinoADKAgentLoop(ctx context.Context, args *einoADKRunLoopArgs, baseMsgs
var einoMainRound int
var einoLastAgent string
subAgentToolStep := make(map[string]int)
// mainAgentToolStep:主代理每次工具调用批次递增,供 UI 显示「第 N 轮」(单代理无子代理切换时原先会一直停在第 1 轮)。
mainAgentToolStep := make(map[string]int)
pendingByID := make(map[string]toolCallPendingInfo)
pendingQueueByAgent := make(map[string][]string)
markPending := func(tc toolCallPendingInfo) {
@@ -529,8 +531,10 @@ func runEinoADKAgentLoop(ctx context.Context, args *einoADKRunLoopArgs, baseMsgs
}
}
if streamsMainAssistant(ev.AgentName) {
mainIterKey := einoMainIterationKey(iterEinoAgent, orchestratorName)
if einoMainRound == 0 {
einoMainRound = 1
mainAgentToolStep[mainIterKey] = 1
progress("iteration", "", map[string]interface{}{
"iteration": 1,
"einoScope": "main",
@@ -540,17 +544,26 @@ func runEinoADKAgentLoop(ctx context.Context, args *einoADKRunLoopArgs, baseMsgs
"conversationId": conversationID,
"source": "eino",
})
} else if einoLastAgent != "" && !streamsMainAssistant(einoLastAgent) {
einoMainRound++
progress("iteration", "", map[string]interface{}{
"iteration": einoMainRound,
"einoScope": "main",
"einoRole": "orchestrator",
"einoAgent": iterEinoAgent,
"orchestration": orchMode,
"conversationId": conversationID,
"source": "eino",
})
} else if einoLastAgent != "" {
needBump := false
if !streamsMainAssistant(einoLastAgent) {
needBump = true // 子代理 → 主代理
} else if einoLastAgent != ev.AgentName {
needBump = true // plan_executeplanner ↔ executor 等主代理切换
}
if needBump {
einoMainRound++
mainAgentToolStep[mainIterKey] = einoMainRound
progress("iteration", "", map[string]interface{}{
"iteration": einoMainRound,
"einoScope": "main",
"einoRole": "orchestrator",
"einoAgent": iterEinoAgent,
"orchestration": orchMode,
"conversationId": conversationID,
"source": "eino",
})
}
}
}
einoLastAgent = ev.AgentName
@@ -791,7 +804,7 @@ func runEinoADKAgentLoop(ctx context.Context, args *einoADKRunLoopArgs, baseMsgs
if merged := mergeStreamingToolCallFragments(toolStreamFragments); len(merged) > 0 {
lastToolChunk = mergeMessageToolCalls(&schema.Message{ToolCalls: merged})
}
tryEmitToolCallsOnce(lastToolChunk, ev.AgentName, orchestratorName, conversationID, progress, toolEmitSeen, subAgentToolStep, markPending)
tryEmitToolCallsOnce(lastToolChunk, ev.AgentName, orchestratorName, conversationID, orchMode, progress, toolEmitSeen, subAgentToolStep, mainAgentToolStep, markPending)
// 流式路径此前只把 tool_calls 推给进度 UI,未写入 runAccumulatedMsgs;落库后 loadHistory→RepairOrphan 会删掉全部 tool 结果,表现为「续跑/下轮失忆」。
if lastToolChunk != nil && len(lastToolChunk.ToolCalls) > 0 {
runAccumulatedMsgs = append(runAccumulatedMsgs, schema.AssistantMessage("", lastToolChunk.ToolCalls))
@@ -820,7 +833,7 @@ func runEinoADKAgentLoop(ctx context.Context, args *einoADKRunLoopArgs, baseMsgs
continue
}
runAccumulatedMsgs = append(runAccumulatedMsgs, msg)
tryEmitToolCallsOnce(mergeMessageToolCalls(msg), ev.AgentName, orchestratorName, conversationID, progress, toolEmitSeen, subAgentToolStep, markPending)
tryEmitToolCallsOnce(mergeMessageToolCalls(msg), ev.AgentName, orchestratorName, conversationID, orchMode, progress, toolEmitSeen, subAgentToolStep, mainAgentToolStep, markPending)
if mv.Role == schema.Assistant {
if progress != nil && strings.TrimSpace(msg.ReasoningContent) != "" {
+32 -5
View File
@@ -737,12 +737,23 @@ func toolCallsRichSignature(msg *schema.Message) string {
return base + "|" + strings.Join(parts, ";")
}
func einoMainIterationKey(agentName, orchestratorName string) string {
key := strings.TrimSpace(agentName)
if key == "" {
key = strings.TrimSpace(orchestratorName)
}
if key == "" {
return "_main"
}
return key
}
func tryEmitToolCallsOnce(
msg *schema.Message,
agentName, orchestratorName, conversationID string,
agentName, orchestratorName, conversationID, orchMode string,
progress func(string, string, interface{}),
seen map[string]struct{},
subAgentToolStep map[string]int,
subAgentToolStep, mainAgentToolStep map[string]int,
markPending func(toolCallPendingInfo),
) {
if msg == nil || len(msg.ToolCalls) == 0 || progress == nil || seen == nil {
@@ -756,14 +767,14 @@ func tryEmitToolCallsOnce(
return
}
seen[sig] = struct{}{}
emitToolCallsFromMessage(msg, agentName, orchestratorName, conversationID, progress, subAgentToolStep, markPending)
emitToolCallsFromMessage(msg, agentName, orchestratorName, conversationID, orchMode, progress, subAgentToolStep, mainAgentToolStep, markPending)
}
func emitToolCallsFromMessage(
msg *schema.Message,
agentName, orchestratorName, conversationID string,
agentName, orchestratorName, conversationID, orchMode string,
progress func(string, string, interface{}),
subAgentToolStep map[string]int,
subAgentToolStep, mainAgentToolStep map[string]int,
markPending func(toolCallPendingInfo),
) {
if msg == nil || len(msg.ToolCalls) == 0 || progress == nil {
@@ -784,6 +795,22 @@ func emitToolCallsFromMessage(
"conversationId": conversationID,
"source": "eino",
})
} else if mainAgentToolStep != nil {
key := einoMainIterationKey(agentName, orchestratorName)
mainAgentToolStep[key]++
n := mainAgentToolStep[key]
// 第 1 轮已在主代理进入时发出;此后每次工具批次对应新一轮 ReAct(与子代理按工具计步一致)。
if n > 1 {
progress("iteration", "", map[string]interface{}{
"iteration": n,
"einoScope": "main",
"einoRole": "orchestrator",
"einoAgent": agentName,
"orchestration": orchMode,
"conversationId": conversationID,
"source": "eino",
})
}
}
role := "orchestrator"
if isSubToolRound {