Fix BullMQ "Custom Id cannot be integers" error by prefixing jobId

This commit is contained in:
tdurieux
2026-05-07 05:53:26 +03:00
parent 8fc7ac5175
commit b8cfe293ea
5 changed files with 18 additions and 10 deletions
+4 -1
View File
@@ -347,6 +347,9 @@ export default class Repository {
if (!newCommit) {
logger.error("branch not found", {
code: "branch_not_found",
httpStatus: 404,
repoId: this.repoId,
branch: branchName,
repo: this.model.source.repositoryName,
});
@@ -397,7 +400,7 @@ export default class Repository {
}
await this.resetSate(RepositoryStatus.PREPARING);
await downloadQueue.add(this.repoId, { repoId: this.repoId }, {
jobId: this.repoId,
jobId: `repo-${this.repoId}`,
attempts: 3,
});
}
+4 -4
View File
@@ -35,8 +35,8 @@ async function markErrorIfInFlight(repoId: string, message: string) {
).exec();
} catch (e) {
logger.error("markErrorIfInFlight failed", {
...serializeError(e),
repoId,
err: serializeError(e),
});
}
}
@@ -56,7 +56,7 @@ export async function recoverStuckPreparing() {
).lean();
for (const doc of stuck) {
try {
const job = await downloadQueue.getJob(doc.repoId);
const job = await downloadQueue.getJob(`repo-${doc.repoId}`);
if (job) {
const state = await job.getState();
if (state === "active" || state === "waiting" || state === "delayed") {
@@ -67,8 +67,8 @@ export async function recoverStuckPreparing() {
logger.info("recovered stuck repo", { repoId: doc.repoId });
} catch (e) {
logger.warn("recover failed", {
...serializeError(e),
repoId: doc.repoId,
err: serializeError(e),
});
}
}
@@ -157,8 +157,8 @@ export function startWorker() {
downloadWorker.on("failed", async (job, err) => {
const repoId = job?.data?.repoId;
logger.error("download failed", {
...serializeError(err),
repoId,
err: serializeError(err),
});
if (!repoId) return;
if (job && typeof job.attemptsMade === "number" && job.opts?.attempts) {
+1 -1
View File
@@ -592,7 +592,7 @@ router.delete(
const repo = await getRepo(req, res, { nocheck: true });
if (!repo) return;
try {
await cacheQueue.add(repo.repoId, { repoId: repo.repoId }, { jobId: repo.repoId });
await cacheQueue.add(repo.repoId, { repoId: repo.repoId }, { jobId: `repo-${repo.repoId}` });
return res.json({ status: repo.status });
} catch (error) {
handleError(error, res, req);
+3 -3
View File
@@ -180,7 +180,7 @@ router.delete(
const user = await getUser(req);
isOwnerOrAdmin([repo.owner.id], user);
await repo.updateStatus(RepositoryStatus.REMOVING);
await removeQueue.add(repo.repoId, { repoId: repo.repoId }, { jobId: repo.repoId });
await removeQueue.add(repo.repoId, { repoId: repo.repoId }, { jobId: `repo-${repo.repoId}` });
return res.json({ status: repo.status });
} catch (error) {
handleError(error, res, req);
@@ -498,7 +498,7 @@ router.post(
).exec();
await repo.updateStatus(RepositoryStatus.PREPARING);
res.json({ status: repo.status });
await downloadQueue.add(repo.repoId, { repoId: repo.repoId }, { jobId: repo.repoId });
await downloadQueue.add(repo.repoId, { repoId: repo.repoId }, { jobId: `repo-${repo.repoId}` });
} catch (error) {
return handleError(error, res, req);
}
@@ -592,7 +592,7 @@ router.post("/", async (req: express.Request, res: express.Response) => {
res.send({ status: repo.status });
downloadQueue.add(repo.repoId, { repoId: repo.repoId }, {
jobId: repo.repoId,
jobId: `repo-${repo.repoId}`,
attempts: 3,
});
} catch (error) {
+6 -1
View File
@@ -103,9 +103,14 @@ router.get(
httpStatus = upstreamStatus >= 500 ? 502 : upstreamStatus;
}
logger.warn("streamer zip fetch failed", {
code: errCode,
httpStatus,
repoId: repo.repoId,
upstreamStatus,
upstreamBody: upstreamBody?.slice(0, 500),
url: config.STREAMER_ENTRYPOINT
? join(config.STREAMER_ENTRYPOINT, "api/zip")
: undefined,
err: serializeError(err),
});
handleError(
@@ -230,7 +235,7 @@ router.get(
) {
await repo.updateStatus(RepositoryStatus.PREPARING);
await downloadQueue.add(repo.repoId, { repoId: repo.repoId }, {
jobId: repo.repoId,
jobId: `repo-${repo.repoId}`,
attempts: 3,
});
}