From 905dd519ed227d31abedefd07b5d99bd0f9be990 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E5=85=AC=E6=98=8E?= <83812544+Ed1s0nZ@users.noreply.github.com> Date: Fri, 5 Jun 2026 11:22:35 +0800 Subject: [PATCH] Add files via upload --- internal/handler/agent.go | 7 +++ .../handler/agent_progress_callback_test.go | 48 +++++++++++++++++++ 2 files changed, 55 insertions(+) create mode 100644 internal/handler/agent_progress_callback_test.go diff --git a/internal/handler/agent.go b/internal/handler/agent.go index 46685162..f2719c14 100644 --- a/internal/handler/agent.go +++ b/internal/handler/agent.go @@ -830,6 +830,10 @@ func (h *AgentHandler) createProgressCallback(runCtx context.Context, cancelRun seenToolCallSigs := make(map[string]string) // toolCallId -> payload signature seenToolResultSigs := make(map[string]string) // toolCallId -> payload signature + // progressMu 保护闭包内 map 与聚合状态。Eino parallelRunToolCall 会在多 goroutine 中并发回调 + // progress(ToolInvokeNotifyHolder.Fire → createProgressCallback),未加锁的 map 会触发 fatal panic。 + var progressMu sync.Mutex + // response_start + response_delta:前端时间线显示为「📝 规划中」(monitor.js),不落逐条 delta; // 聚合为一条 planning 写入 process_details,刷新后与线上一致。 var respPlan responsePlanAgg @@ -891,6 +895,9 @@ func (h *AgentHandler) createProgressCallback(runCtx context.Context, cancelRun } return func(eventType, message string, data interface{}) { + progressMu.Lock() + defer progressMu.Unlock() + // 上游在重试/补偿时可能重复回调相同 tool_call/tool_result。 // 这里做幂等过滤,保证前端展示和 process_details 都以唯一事件为准。 if (eventType == "tool_call" || eventType == "tool_result") && data != nil { diff --git a/internal/handler/agent_progress_callback_test.go b/internal/handler/agent_progress_callback_test.go new file mode 100644 index 00000000..0b64f47e --- /dev/null +++ b/internal/handler/agent_progress_callback_test.go @@ -0,0 +1,48 @@ +package handler + +import ( + "context" + "fmt" + "sync" + "testing" + + "cyberstrike-ai/internal/config" + + "go.uber.org/zap" +) + +// TestCreateProgressCallback_ConcurrentToolEvents 回归 issue #142:并行 tool 回调不得 concurrent map panic。 +func TestCreateProgressCallback_ConcurrentToolEvents(t *testing.T) { + logger := zap.NewNop() + h := &AgentHandler{ + logger: logger, + config: &config.Config{}, + } + cb := h.createProgressCallback(context.Background(), nil, "conv-race-test", "", nil) + + const workers = 64 + var wg sync.WaitGroup + wg.Add(workers * 2) + for i := 0; i < workers; i++ { + i := i + go func() { + defer wg.Done() + toolCallID := fmt.Sprintf("tc-%d", i) + cb("tool_call", "calling skill", map[string]interface{}{ + "toolCallId": toolCallID, + "toolName": "skill", + "argumentsObj": map[string]interface{}{"skill_name": "demo-skill"}, + }) + }() + go func() { + defer wg.Done() + toolCallID := fmt.Sprintf("tc-%d", i) + cb("tool_result", "skill done", map[string]interface{}{ + "toolCallId": toolCallID, + "toolName": "skill", + "success": true, + }) + }() + } + wg.Wait() +}