From 73a39ef8689b9aab24868ea44a661a7937b112c8 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E5=85=AC=E6=98=8E?= <83812544+Ed1s0nZ@users.noreply.github.com> Date: Tue, 19 May 2026 16:25:47 +0800 Subject: [PATCH] Add files via upload --- internal/agent/agent_trace.go | 167 +++++++++++++++++++++++ internal/agent/agent_trace_test.go | 57 ++++++++ internal/multiagent/eino_adk_run_loop.go | 39 ++++-- internal/multiagent/runner.go | 37 ++++- 4 files changed, 282 insertions(+), 18 deletions(-) create mode 100644 internal/agent/agent_trace.go create mode 100644 internal/agent/agent_trace_test.go diff --git a/internal/agent/agent_trace.go b/internal/agent/agent_trace.go new file mode 100644 index 00000000..9628ce2c --- /dev/null +++ b/internal/agent/agent_trace.go @@ -0,0 +1,167 @@ +package agent + +import ( + "encoding/json" + "strings" +) + +// ParseTraceMessages 解析落库的 last_react_input(OpenAI 风格 messages JSON 数组)。 +func ParseTraceMessages(traceInputJSON string) ([]ChatMessage, error) { + traceInputJSON = strings.TrimSpace(traceInputJSON) + if traceInputJSON == "" { + return nil, nil + } + var raw []map[string]interface{} + if err := json.Unmarshal([]byte(traceInputJSON), &raw); err != nil { + return nil, err + } + out := make([]ChatMessage, 0, len(raw)) + for _, msgMap := range raw { + msg := ChatMessage{} + role, _ := msgMap["role"].(string) + if role == "" { + continue + } + msg.Role = role + if content, ok := msgMap["content"].(string); ok { + msg.Content = content + } + if rc, ok := msgMap["reasoning_content"].(string); ok && strings.TrimSpace(rc) != "" { + msg.ReasoningContent = rc + } + if toolCallsRaw, ok := msgMap["tool_calls"]; ok && toolCallsRaw != nil { + if toolCallsArray, ok := toolCallsRaw.([]interface{}); ok { + for _, tcRaw := range toolCallsArray { + tcMap, ok := tcRaw.(map[string]interface{}) + if !ok { + continue + } + toolCall := ToolCall{} + if id, ok := tcMap["id"].(string); ok { + toolCall.ID = id + } + if toolType, ok := tcMap["type"].(string); ok { + toolCall.Type = toolType + } + if funcMap, ok := tcMap["function"].(map[string]interface{}); ok { + toolCall.Function = FunctionCall{} + if name, ok := funcMap["name"].(string); ok { + toolCall.Function.Name = name + } + if argsRaw, ok := funcMap["arguments"]; ok { + if argsStr, ok := argsRaw.(string); ok { + var argsMap map[string]interface{} + if err := json.Unmarshal([]byte(argsStr), &argsMap); err == nil { + toolCall.Function.Arguments = argsMap + } + } else if argsMap, ok := argsRaw.(map[string]interface{}); ok { + toolCall.Function.Arguments = argsMap + } + } + } + if toolCall.ID != "" { + msg.ToolCalls = append(msg.ToolCalls, toolCall) + } + } + } + } + if toolCallID, ok := msgMap["tool_call_id"].(string); ok { + msg.ToolCallID = toolCallID + } + if tn, ok := msgMap["tool_name"].(string); ok && strings.TrimSpace(tn) != "" { + msg.ToolName = strings.TrimSpace(tn) + } else if tn, ok := msgMap["name"].(string); ok && strings.TrimSpace(tn) != "" && strings.EqualFold(msg.Role, "tool") { + msg.ToolName = strings.TrimSpace(tn) + } + out = append(out, msg) + } + return out, nil +} + +// ExtractLastUserTurnMessages 仅保留最后一次 user 提问起的消息(不含更早的用户轮次;跳过 system)。 +// 与「继续对话」续跑所用轨迹范围一致:当前任务轮次,而非整段多轮对话历史。 +func ExtractLastUserTurnMessages(msgs []ChatMessage) []ChatMessage { + if len(msgs) == 0 { + return msgs + } + lastUser := -1 + for i, m := range msgs { + if strings.EqualFold(m.Role, "user") { + lastUser = i + } + } + if lastUser < 0 { + return msgs + } + trimmed := msgs[lastUser:] + out := make([]ChatMessage, 0, len(trimmed)) + for _, m := range trimmed { + if strings.EqualFold(m.Role, "system") { + continue + } + out = append(out, m) + } + return out +} + +// ExtractLastUserTurnTraceJSON 在 JSON 轨迹上裁剪为最后一次 user 起的片段(供落库格式直接处理)。 +func ExtractLastUserTurnTraceJSON(traceInputJSON string) string { + traceInputJSON = strings.TrimSpace(traceInputJSON) + if traceInputJSON == "" { + return traceInputJSON + } + var arr []map[string]interface{} + if err := json.Unmarshal([]byte(traceInputJSON), &arr); err != nil { + return traceInputJSON + } + lastUser := -1 + for i, m := range arr { + if r, _ := m["role"].(string); strings.EqualFold(r, "user") { + lastUser = i + } + } + if lastUser <= 0 { + return traceInputJSON + } + trimmed := arr[lastUser:] + b, err := json.Marshal(trimmed) + if err != nil { + return traceInputJSON + } + return string(b) +} + +// MergeAssistantTraceOutput 将 last_react_output 合并进轨迹最后一条 assistant(与 loadHistoryFromAgentTrace 一致)。 +func MergeAssistantTraceOutput(msgs []ChatMessage, assistantOut string) []ChatMessage { + assistantOut = strings.TrimSpace(assistantOut) + if assistantOut == "" || len(msgs) == 0 { + return msgs + } + out := append([]ChatMessage(nil), msgs...) + last := &out[len(out)-1] + if strings.EqualFold(last.Role, "assistant") && len(last.ToolCalls) == 0 { + last.Content = assistantOut + return out + } + out = append(out, ChatMessage{ + Role: "assistant", + Content: assistantOut, + }) + return out +} + +// MessagesToTraceJSON 将消息带序列化为 JSON(跳过 system)。 +func MessagesToTraceJSON(msgs []ChatMessage) (string, error) { + filtered := make([]ChatMessage, 0, len(msgs)) + for _, m := range msgs { + if strings.EqualFold(m.Role, "system") { + continue + } + filtered = append(filtered, m) + } + b, err := json.Marshal(filtered) + if err != nil { + return "", err + } + return string(b), nil +} diff --git a/internal/agent/agent_trace_test.go b/internal/agent/agent_trace_test.go new file mode 100644 index 00000000..c248255f --- /dev/null +++ b/internal/agent/agent_trace_test.go @@ -0,0 +1,57 @@ +package agent + +import ( + "encoding/json" + "testing" +) + +func TestExtractLastUserTurnTraceJSON(t *testing.T) { + raw := []map[string]interface{}{ + {"role": "user", "content": "old question"}, + {"role": "assistant", "content": "old answer"}, + {"role": "user", "content": "new target 1.1.1.1"}, + {"role": "assistant", "tool_calls": []interface{}{map[string]interface{}{ + "id": "c1", "type": "function", + "function": map[string]interface{}{"name": "nmap", "arguments": "{}"}, + }}}, + {"role": "tool", "tool_call_id": "c1", "content": "open ports"}, + } + b, _ := json.Marshal(raw) + out := ExtractLastUserTurnTraceJSON(string(b)) + var trimmed []map[string]interface{} + if err := json.Unmarshal([]byte(out), &trimmed); err != nil { + t.Fatal(err) + } + if len(trimmed) != 3 { + t.Fatalf("expected 3 messages, got %d", len(trimmed)) + } + if trimmed[0]["content"] != "new target 1.1.1.1" { + t.Fatalf("unexpected first message: %v", trimmed[0]) + } +} + +func TestExtractLastUserTurnMessagesSkipsSystem(t *testing.T) { + msgs := []ChatMessage{ + {Role: "system", Content: "sys"}, + {Role: "user", Content: "q"}, + {Role: "assistant", Content: "a"}, + } + out := ExtractLastUserTurnMessages(msgs) + if len(out) != 2 { + t.Fatalf("expected 2, got %d", len(out)) + } + if out[0].Role != "user" { + t.Fatal("expected user first") + } +} + +func TestMergeAssistantTraceOutput(t *testing.T) { + msgs := []ChatMessage{ + {Role: "user", Content: "q"}, + {Role: "assistant", Content: "draft"}, + } + out := MergeAssistantTraceOutput(msgs, "final summary") + if out[len(out)-1].Content != "final summary" { + t.Fatalf("expected merged output, got %q", out[len(out)-1].Content) + } +} diff --git a/internal/multiagent/eino_adk_run_loop.go b/internal/multiagent/eino_adk_run_loop.go index 186b346d..f91f993e 100644 --- a/internal/multiagent/eino_adk_run_loop.go +++ b/internal/multiagent/eino_adk_run_loop.go @@ -177,6 +177,8 @@ func runEinoADKAgentLoop(ctx context.Context, args *einoADKRunLoopArgs, baseMsgs var einoMainRound int var einoLastAgent string subAgentToolStep := make(map[string]int) + // mainAgentToolStep:主代理每次工具调用批次递增,供 UI 显示「第 N 轮」(单代理无子代理切换时原先会一直停在第 1 轮)。 + mainAgentToolStep := make(map[string]int) pendingByID := make(map[string]toolCallPendingInfo) pendingQueueByAgent := make(map[string][]string) markPending := func(tc toolCallPendingInfo) { @@ -529,8 +531,10 @@ func runEinoADKAgentLoop(ctx context.Context, args *einoADKRunLoopArgs, baseMsgs } } if streamsMainAssistant(ev.AgentName) { + mainIterKey := einoMainIterationKey(iterEinoAgent, orchestratorName) if einoMainRound == 0 { einoMainRound = 1 + mainAgentToolStep[mainIterKey] = 1 progress("iteration", "", map[string]interface{}{ "iteration": 1, "einoScope": "main", @@ -540,17 +544,26 @@ func runEinoADKAgentLoop(ctx context.Context, args *einoADKRunLoopArgs, baseMsgs "conversationId": conversationID, "source": "eino", }) - } else if einoLastAgent != "" && !streamsMainAssistant(einoLastAgent) { - einoMainRound++ - progress("iteration", "", map[string]interface{}{ - "iteration": einoMainRound, - "einoScope": "main", - "einoRole": "orchestrator", - "einoAgent": iterEinoAgent, - "orchestration": orchMode, - "conversationId": conversationID, - "source": "eino", - }) + } else if einoLastAgent != "" { + needBump := false + if !streamsMainAssistant(einoLastAgent) { + needBump = true // 子代理 → 主代理 + } else if einoLastAgent != ev.AgentName { + needBump = true // plan_execute:planner ↔ executor 等主代理切换 + } + if needBump { + einoMainRound++ + mainAgentToolStep[mainIterKey] = einoMainRound + progress("iteration", "", map[string]interface{}{ + "iteration": einoMainRound, + "einoScope": "main", + "einoRole": "orchestrator", + "einoAgent": iterEinoAgent, + "orchestration": orchMode, + "conversationId": conversationID, + "source": "eino", + }) + } } } einoLastAgent = ev.AgentName @@ -791,7 +804,7 @@ func runEinoADKAgentLoop(ctx context.Context, args *einoADKRunLoopArgs, baseMsgs if merged := mergeStreamingToolCallFragments(toolStreamFragments); len(merged) > 0 { lastToolChunk = mergeMessageToolCalls(&schema.Message{ToolCalls: merged}) } - tryEmitToolCallsOnce(lastToolChunk, ev.AgentName, orchestratorName, conversationID, progress, toolEmitSeen, subAgentToolStep, markPending) + tryEmitToolCallsOnce(lastToolChunk, ev.AgentName, orchestratorName, conversationID, orchMode, progress, toolEmitSeen, subAgentToolStep, mainAgentToolStep, markPending) // 流式路径此前只把 tool_calls 推给进度 UI,未写入 runAccumulatedMsgs;落库后 loadHistory→RepairOrphan 会删掉全部 tool 结果,表现为「续跑/下轮失忆」。 if lastToolChunk != nil && len(lastToolChunk.ToolCalls) > 0 { runAccumulatedMsgs = append(runAccumulatedMsgs, schema.AssistantMessage("", lastToolChunk.ToolCalls)) @@ -820,7 +833,7 @@ func runEinoADKAgentLoop(ctx context.Context, args *einoADKRunLoopArgs, baseMsgs continue } runAccumulatedMsgs = append(runAccumulatedMsgs, msg) - tryEmitToolCallsOnce(mergeMessageToolCalls(msg), ev.AgentName, orchestratorName, conversationID, progress, toolEmitSeen, subAgentToolStep, markPending) + tryEmitToolCallsOnce(mergeMessageToolCalls(msg), ev.AgentName, orchestratorName, conversationID, orchMode, progress, toolEmitSeen, subAgentToolStep, mainAgentToolStep, markPending) if mv.Role == schema.Assistant { if progress != nil && strings.TrimSpace(msg.ReasoningContent) != "" { diff --git a/internal/multiagent/runner.go b/internal/multiagent/runner.go index f9478262..50ede619 100644 --- a/internal/multiagent/runner.go +++ b/internal/multiagent/runner.go @@ -737,12 +737,23 @@ func toolCallsRichSignature(msg *schema.Message) string { return base + "|" + strings.Join(parts, ";") } +func einoMainIterationKey(agentName, orchestratorName string) string { + key := strings.TrimSpace(agentName) + if key == "" { + key = strings.TrimSpace(orchestratorName) + } + if key == "" { + return "_main" + } + return key +} + func tryEmitToolCallsOnce( msg *schema.Message, - agentName, orchestratorName, conversationID string, + agentName, orchestratorName, conversationID, orchMode string, progress func(string, string, interface{}), seen map[string]struct{}, - subAgentToolStep map[string]int, + subAgentToolStep, mainAgentToolStep map[string]int, markPending func(toolCallPendingInfo), ) { if msg == nil || len(msg.ToolCalls) == 0 || progress == nil || seen == nil { @@ -756,14 +767,14 @@ func tryEmitToolCallsOnce( return } seen[sig] = struct{}{} - emitToolCallsFromMessage(msg, agentName, orchestratorName, conversationID, progress, subAgentToolStep, markPending) + emitToolCallsFromMessage(msg, agentName, orchestratorName, conversationID, orchMode, progress, subAgentToolStep, mainAgentToolStep, markPending) } func emitToolCallsFromMessage( msg *schema.Message, - agentName, orchestratorName, conversationID string, + agentName, orchestratorName, conversationID, orchMode string, progress func(string, string, interface{}), - subAgentToolStep map[string]int, + subAgentToolStep, mainAgentToolStep map[string]int, markPending func(toolCallPendingInfo), ) { if msg == nil || len(msg.ToolCalls) == 0 || progress == nil { @@ -784,6 +795,22 @@ func emitToolCallsFromMessage( "conversationId": conversationID, "source": "eino", }) + } else if mainAgentToolStep != nil { + key := einoMainIterationKey(agentName, orchestratorName) + mainAgentToolStep[key]++ + n := mainAgentToolStep[key] + // 第 1 轮已在主代理进入时发出;此后每次工具批次对应新一轮 ReAct(与子代理按工具计步一致)。 + if n > 1 { + progress("iteration", "", map[string]interface{}{ + "iteration": n, + "einoScope": "main", + "einoRole": "orchestrator", + "einoAgent": agentName, + "orchestration": orchMode, + "conversationId": conversationID, + "source": "eino", + }) + } } role := "orchestrator" if isSubToolRound {