From b8cfe293ea19c000d3998c14a79588587f6d8681 Mon Sep 17 00:00:00 2001 From: tdurieux Date: Thu, 7 May 2026 05:53:26 +0300 Subject: [PATCH] Fix BullMQ "Custom Id cannot be integers" error by prefixing jobId --- src/core/Repository.ts | 5 ++++- src/queue/index.ts | 8 ++++---- src/server/routes/admin.ts | 2 +- src/server/routes/repository-private.ts | 6 +++--- src/server/routes/repository-public.ts | 7 ++++++- 5 files changed, 18 insertions(+), 10 deletions(-) diff --git a/src/core/Repository.ts b/src/core/Repository.ts index 5f0fb92..cb16480 100644 --- a/src/core/Repository.ts +++ b/src/core/Repository.ts @@ -347,6 +347,9 @@ export default class Repository { if (!newCommit) { logger.error("branch not found", { + code: "branch_not_found", + httpStatus: 404, + repoId: this.repoId, branch: branchName, repo: this.model.source.repositoryName, }); @@ -397,7 +400,7 @@ export default class Repository { } await this.resetSate(RepositoryStatus.PREPARING); await downloadQueue.add(this.repoId, { repoId: this.repoId }, { - jobId: this.repoId, + jobId: `repo-${this.repoId}`, attempts: 3, }); } diff --git a/src/queue/index.ts b/src/queue/index.ts index bf6bd2a..c095739 100644 --- a/src/queue/index.ts +++ b/src/queue/index.ts @@ -35,8 +35,8 @@ async function markErrorIfInFlight(repoId: string, message: string) { ).exec(); } catch (e) { logger.error("markErrorIfInFlight failed", { + ...serializeError(e), repoId, - err: serializeError(e), }); } } @@ -56,7 +56,7 @@ export async function recoverStuckPreparing() { ).lean(); for (const doc of stuck) { try { - const job = await downloadQueue.getJob(doc.repoId); + const job = await downloadQueue.getJob(`repo-${doc.repoId}`); if (job) { const state = await job.getState(); if (state === "active" || state === "waiting" || state === "delayed") { @@ -67,8 +67,8 @@ export async function recoverStuckPreparing() { logger.info("recovered stuck repo", { repoId: doc.repoId }); } catch (e) { logger.warn("recover failed", { + ...serializeError(e), repoId: doc.repoId, - err: serializeError(e), }); } } @@ -157,8 +157,8 @@ export function startWorker() { downloadWorker.on("failed", async (job, err) => { const repoId = job?.data?.repoId; logger.error("download failed", { + ...serializeError(err), repoId, - err: serializeError(err), }); if (!repoId) return; if (job && typeof job.attemptsMade === "number" && job.opts?.attempts) { diff --git a/src/server/routes/admin.ts b/src/server/routes/admin.ts index 75199ae..3aebf0c 100644 --- a/src/server/routes/admin.ts +++ b/src/server/routes/admin.ts @@ -592,7 +592,7 @@ router.delete( const repo = await getRepo(req, res, { nocheck: true }); if (!repo) return; try { - await cacheQueue.add(repo.repoId, { repoId: repo.repoId }, { jobId: repo.repoId }); + await cacheQueue.add(repo.repoId, { repoId: repo.repoId }, { jobId: `repo-${repo.repoId}` }); return res.json({ status: repo.status }); } catch (error) { handleError(error, res, req); diff --git a/src/server/routes/repository-private.ts b/src/server/routes/repository-private.ts index f03f812..49668e5 100644 --- a/src/server/routes/repository-private.ts +++ b/src/server/routes/repository-private.ts @@ -180,7 +180,7 @@ router.delete( const user = await getUser(req); isOwnerOrAdmin([repo.owner.id], user); await repo.updateStatus(RepositoryStatus.REMOVING); - await removeQueue.add(repo.repoId, { repoId: repo.repoId }, { jobId: repo.repoId }); + await removeQueue.add(repo.repoId, { repoId: repo.repoId }, { jobId: `repo-${repo.repoId}` }); return res.json({ status: repo.status }); } catch (error) { handleError(error, res, req); @@ -498,7 +498,7 @@ router.post( ).exec(); await repo.updateStatus(RepositoryStatus.PREPARING); res.json({ status: repo.status }); - await downloadQueue.add(repo.repoId, { repoId: repo.repoId }, { jobId: repo.repoId }); + await downloadQueue.add(repo.repoId, { repoId: repo.repoId }, { jobId: `repo-${repo.repoId}` }); } catch (error) { return handleError(error, res, req); } @@ -592,7 +592,7 @@ router.post("/", async (req: express.Request, res: express.Response) => { res.send({ status: repo.status }); downloadQueue.add(repo.repoId, { repoId: repo.repoId }, { - jobId: repo.repoId, + jobId: `repo-${repo.repoId}`, attempts: 3, }); } catch (error) { diff --git a/src/server/routes/repository-public.ts b/src/server/routes/repository-public.ts index 4803658..6498349 100644 --- a/src/server/routes/repository-public.ts +++ b/src/server/routes/repository-public.ts @@ -103,9 +103,14 @@ router.get( httpStatus = upstreamStatus >= 500 ? 502 : upstreamStatus; } logger.warn("streamer zip fetch failed", { + code: errCode, + httpStatus, repoId: repo.repoId, upstreamStatus, upstreamBody: upstreamBody?.slice(0, 500), + url: config.STREAMER_ENTRYPOINT + ? join(config.STREAMER_ENTRYPOINT, "api/zip") + : undefined, err: serializeError(err), }); handleError( @@ -230,7 +235,7 @@ router.get( ) { await repo.updateStatus(RepositoryStatus.PREPARING); await downloadQueue.add(repo.repoId, { repoId: repo.repoId }, { - jobId: repo.repoId, + jobId: `repo-${repo.repoId}`, attempts: 3, }); }