From 1553e896c587ea47ad0c12b39e27080c0a9dda27 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E5=85=AC=E6=98=8E?= <83812544+Ed1s0nZ@users.noreply.github.com> Date: Thu, 28 May 2026 12:58:27 +0800 Subject: [PATCH] Add files via upload --- internal/multiagent/eino_adk_run_loop.go | 47 ++++-- internal/multiagent/eino_orchestration.go | 2 + .../multiagent/plan_execute_lenient_plan.go | 157 ++++++++++++++++++ 3 files changed, 193 insertions(+), 13 deletions(-) create mode 100644 internal/multiagent/plan_execute_lenient_plan.go diff --git a/internal/multiagent/eino_adk_run_loop.go b/internal/multiagent/eino_adk_run_loop.go index b91d3b8f..4f34c904 100644 --- a/internal/multiagent/eino_adk_run_loop.go +++ b/internal/multiagent/eino_adk_run_loop.go @@ -184,14 +184,19 @@ func runEinoADKAgentLoop(ctx context.Context, args *einoADKRunLoopArgs, baseMsgs mainAgentToolStep := make(map[string]int) pendingByID := make(map[string]toolCallPendingInfo) pendingQueueByAgent := make(map[string][]string) + var pendingMu sync.Mutex markPending := func(tc toolCallPendingInfo) { if tc.ToolCallID == "" { return } + pendingMu.Lock() + defer pendingMu.Unlock() pendingByID[tc.ToolCallID] = tc pendingQueueByAgent[tc.EinoAgent] = append(pendingQueueByAgent[tc.EinoAgent], tc.ToolCallID) } popNextPendingForAgent := func(agentName string) (toolCallPendingInfo, bool) { + pendingMu.Lock() + defer pendingMu.Unlock() q := pendingQueueByAgent[agentName] for len(q) > 0 { id := q[0] @@ -208,19 +213,42 @@ func runEinoADKAgentLoop(ctx context.Context, args *einoADKRunLoopArgs, baseMsgs if toolCallID == "" { return } + pendingMu.Lock() + defer pendingMu.Unlock() delete(pendingByID, toolCallID) } + popAnyPending := func() (toolCallPendingInfo, bool) { + pendingMu.Lock() + defer pendingMu.Unlock() + for id, tc := range pendingByID { + delete(pendingByID, id) + return tc, true + } + return toolCallPendingInfo{}, false + } + pendingCount := func() int { + pendingMu.Lock() + defer pendingMu.Unlock() + return len(pendingByID) + } flushAllPendingAsFailed := func(err error) { + pendingMu.Lock() + pendingSnapshot := make([]toolCallPendingInfo, 0, len(pendingByID)) + for _, tc := range pendingByID { + pendingSnapshot = append(pendingSnapshot, tc) + } + pendingByID = make(map[string]toolCallPendingInfo) + pendingQueueByAgent = make(map[string][]string) + pendingMu.Unlock() + if progress == nil { - pendingByID = make(map[string]toolCallPendingInfo) - pendingQueueByAgent = make(map[string][]string) return } msg := "" if err != nil { msg = err.Error() } - for _, tc := range pendingByID { + for _, tc := range pendingSnapshot { toolName := tc.ToolName if strings.TrimSpace(toolName) == "" { toolName = "unknown" @@ -238,8 +266,6 @@ func runEinoADKAgentLoop(ctx context.Context, args *einoADKRunLoopArgs, baseMsgs "source": "eino", }) } - pendingByID = make(map[string]toolCallPendingInfo) - pendingQueueByAgent = make(map[string][]string) } // 最近一次成功的 Eino filesystem execute 的标准输出(trim):用于抑制模型紧接着复述同一字符串时的重复「助手输出」时间线。 @@ -519,8 +545,7 @@ func runEinoADKAgentLoop(ctx context.Context, args *einoADKRunLoopArgs, baseMsgs } return takePartial(ctxErr) } - if len(pendingByID) > 0 { - orphanCount := len(pendingByID) + if orphanCount := pendingCount(); orphanCount > 0 { flushAllPendingAsFailed(errors.New("pending tool call missing result before run completion")) if progress != nil { progress("eino_pending_orphaned", "pending tool calls were force-closed at run end", map[string]interface{}{ @@ -957,12 +982,8 @@ func runEinoADKAgentLoop(ctx context.Context, args *einoADKRunLoopArgs, baseMsgs toolCallID = inferred.ToolCallID } else if inferred, ok := popNextPendingForAgent(""); ok { toolCallID = inferred.ToolCallID - } else { - for id := range pendingByID { - toolCallID = id - delete(pendingByID, id) - break - } + } else if inferred, ok := popAnyPending(); ok { + toolCallID = inferred.ToolCallID } } if toolCallID != "" { diff --git a/internal/multiagent/eino_orchestration.go b/internal/multiagent/eino_orchestration.go index 40df6c03..6b2dc3d0 100644 --- a/internal/multiagent/eino_orchestration.go +++ b/internal/multiagent/eino_orchestration.go @@ -59,6 +59,7 @@ func NewPlanExecuteRoot(ctx context.Context, a *PlanExecuteRootArgs) (adk.Resuma } plannerCfg := &planexecute.PlannerConfig{ ToolCallingChatModel: tcm, + NewPlan: newLenientPlan, } if fn := planExecutePlannerGenInput(a.OrchInstruction, a.AppCfg, a.MwCfg, a.Logger, a.ModelName, a.ConversationID, a.PlannerReplannerRewriteHandlers); fn != nil { plannerCfg.GenInputFn = fn @@ -70,6 +71,7 @@ func NewPlanExecuteRoot(ctx context.Context, a *PlanExecuteRootArgs) (adk.Resuma replanner, err := planexecute.NewReplanner(ctx, &planexecute.ReplannerConfig{ ChatModel: tcm, GenInputFn: planExecuteReplannerGenInput(a.OrchInstruction, a.AppCfg, a.MwCfg, a.Logger, a.ModelName, a.ConversationID, a.PlannerReplannerRewriteHandlers), + NewPlan: newLenientPlan, }) if err != nil { return nil, fmt.Errorf("plan_execute replanner: %w", err) diff --git a/internal/multiagent/plan_execute_lenient_plan.go b/internal/multiagent/plan_execute_lenient_plan.go new file mode 100644 index 00000000..ffdb12e6 --- /dev/null +++ b/internal/multiagent/plan_execute_lenient_plan.go @@ -0,0 +1,157 @@ +package multiagent + +import ( + "context" + "encoding/json" + "strings" + + "github.com/cloudwego/eino/adk/prebuilt/planexecute" +) + +// lenientPlan keeps plan_execute running even when model tool arguments contain minor JSON defects. +// It first tries strict JSON, then falls back to lightweight step extraction heuristics. +type lenientPlan struct { + Steps []string `json:"steps"` +} + +func newLenientPlan(context.Context) planexecute.Plan { + return &lenientPlan{} +} + +func (p *lenientPlan) FirstStep() string { + if p == nil || len(p.Steps) == 0 { + return "" + } + return p.Steps[0] +} + +func (p *lenientPlan) MarshalJSON() ([]byte, error) { + type alias lenientPlan + return json.Marshal((*alias)(p)) +} + +func (p *lenientPlan) UnmarshalJSON(b []byte) error { + type alias lenientPlan + var strict alias + if err := json.Unmarshal(b, &strict); err == nil { + strict.Steps = normalizePlanSteps(strict.Steps) + if len(strict.Steps) > 0 { + *p = lenientPlan(strict) + return nil + } + } + + steps := extractPlanStepsLenient(string(b)) + if len(steps) == 0 { + steps = []string{"继续按当前目标执行下一步,并输出可验证证据。"} + } + p.Steps = steps + return nil +} + +func extractPlanStepsLenient(raw string) []string { + s := strings.TrimSpace(stripCodeFence(raw)) + if s == "" { + return nil + } + + if extracted, ok := sliceByStepsArray(s); ok { + var arr []string + if err := json.Unmarshal([]byte(extracted), &arr); err == nil { + arr = normalizePlanSteps(arr) + if len(arr) > 0 { + return arr + } + } + if arr := splitStepsHeuristically(strings.Trim(extracted, "[]")); len(arr) > 0 { + return arr + } + } + + // Last-resort: treat plaintext body as one actionable step. + s = strings.TrimSpace(s) + if s == "" { + return nil + } + return []string{s} +} + +func sliceByStepsArray(s string) (string, bool) { + lower := strings.ToLower(s) + key := `"steps"` + i := strings.Index(lower, key) + if i < 0 { + return "", false + } + start := strings.Index(s[i:], "[") + if start < 0 { + return "", false + } + start += i + depth := 0 + for j := start; j < len(s); j++ { + switch s[j] { + case '[': + depth++ + case ']': + depth-- + if depth == 0 { + return s[start : j+1], true + } + } + } + return "", false +} + +func splitStepsHeuristically(body string) []string { + body = strings.ReplaceAll(body, "\r\n", "\n") + body = strings.ReplaceAll(body, "\\n", "\n") + var parts []string + if strings.Contains(body, "\n") { + for _, line := range strings.Split(body, "\n") { + parts = append(parts, line) + } + } else { + for _, seg := range strings.Split(body, ",") { + parts = append(parts, seg) + } + } + + out := make([]string, 0, len(parts)) + for _, part := range parts { + t := strings.TrimSpace(part) + t = strings.Trim(t, "\"'`") + t = strings.TrimLeft(t, "-*0123456789.、 \t") + t = strings.TrimSpace(strings.ReplaceAll(t, `\"`, `"`)) + if t == "" { + continue + } + out = append(out, t) + } + return normalizePlanSteps(out) +} + +func normalizePlanSteps(in []string) []string { + out := make([]string, 0, len(in)) + for _, step := range in { + t := strings.TrimSpace(step) + if t == "" { + continue + } + out = append(out, t) + } + return out +} + +func stripCodeFence(s string) string { + s = strings.TrimSpace(s) + if !strings.HasPrefix(s, "```") { + return s + } + s = strings.TrimPrefix(s, "```json") + s = strings.TrimPrefix(s, "```JSON") + s = strings.TrimPrefix(s, "```") + s = strings.TrimSuffix(strings.TrimSpace(s), "```") + return strings.TrimSpace(s) +} +