From 92204adbaaa47b41c648f54d7c7593a4090cbe60 Mon Sep 17 00:00:00 2001 From: ezl-keygraph Date: Tue, 17 Mar 2026 03:20:33 +0530 Subject: [PATCH] fix: resolve SessionMutex race condition with 3+ concurrent waiters --- apps/worker/src/utils/concurrency.ts | 25 ++++++++++++++----------- 1 file changed, 14 insertions(+), 11 deletions(-) diff --git a/apps/worker/src/utils/concurrency.ts b/apps/worker/src/utils/concurrency.ts index c3f308e..4c3c6ce 100644 --- a/apps/worker/src/utils/concurrency.ts +++ b/apps/worker/src/utils/concurrency.ts @@ -31,27 +31,30 @@ type UnlockFunction = () => void; * } * ``` */ -// Promise-based mutex with queue semantics - safe for parallel agents on same session +// Promise-based mutex with chained queue semantics - safe for parallel agents on same session export class SessionMutex { - // Map of sessionId -> Promise (represents active lock) + // Map of sessionId -> Promise (tail of the FIFO queue) private locks: Map> = new Map(); - // Wait for existing lock, then acquire. Queue ensures FIFO ordering. + // Chain onto the queue tail, then wait for predecessor to release. Guarantees FIFO ordering. async lock(sessionId: string): Promise { - if (this.locks.has(sessionId)) { - // Wait for existing lock to be released - await this.locks.get(sessionId); - } + // 1. Capture the current tail of the queue + const prev = this.locks.get(sessionId) ?? Promise.resolve(); - // Create new lock promise + // 2. Create our lock and immediately become the new tail let resolve: () => void; const promise = new Promise((r) => (resolve = r)); this.locks.set(sessionId, promise); - // Return unlock function + // 3. Wait for predecessor to release + await prev; + + // 4. Return unlock that releases the next waiter in the chain return () => { - this.locks.delete(sessionId); - resolve?.(); + if (this.locks.get(sessionId) === promise) { + this.locks.delete(sessionId); + } + resolve(); }; } }