multiple fixes

This commit is contained in:
tdurieux
2026-05-05 10:32:31 +03:00
parent 5b72b630c4
commit f8c91ca0af
23 changed files with 1675 additions and 661 deletions
+90 -12
View File
@@ -1,11 +1,76 @@
import { Queue, Worker } from "bullmq";
import config from "../config";
import Repository from "../core/Repository";
import AnonymizedRepositoryModel from "../core/model/anonymizedRepositories/anonymizedRepositories.model";
import { RepositoryStatus } from "../core/types";
import * as path from "path";
export let cacheQueue: Queue<Repository>;
export let removeQueue: Queue<Repository>;
export let downloadQueue: Queue<Repository>;
// Minimal payload for queue jobs. Workers re-fetch the Repository from the
// database via getRepository(repoId), so passing the full Mongoose-backed
// Repository instance through msgpackr is unnecessary — and triggers
// ERR_BUFFER_OUT_OF_BOUNDS on long term lists / large nested fields.
export interface RepoJobData {
repoId: string;
}
const IN_FLIGHT_STATUSES: RepositoryStatus[] = [
RepositoryStatus.PREPARING,
RepositoryStatus.QUEUE,
RepositoryStatus.DOWNLOAD,
];
async function markErrorIfInFlight(repoId: string, message: string) {
try {
await AnonymizedRepositoryModel.updateOne(
{ repoId, status: { $in: IN_FLIGHT_STATUSES } },
{
$set: {
status: RepositoryStatus.ERROR,
statusDate: new Date(),
statusMessage: message || "preparation_failed",
},
}
).exec();
} catch (e) {
console.log("[QUEUE] markErrorIfInFlight error", repoId, e);
}
}
/**
* Recover repositories left in an in-flight status (preparing/queue/download)
* with no live BullMQ job — typically caused by a worker process crash or
* server restart during anonymization. Marks them as ERROR so they don't
* appear stuck forever; the public route can re-queue them on next visit.
*/
export async function recoverStuckPreparing() {
if (!downloadQueue) return;
try {
const stuck = await AnonymizedRepositoryModel.find(
{ status: { $in: IN_FLIGHT_STATUSES } },
{ repoId: 1 }
).lean();
for (const doc of stuck) {
try {
const job = await downloadQueue.getJob(doc.repoId);
if (job) {
const state = await job.getState();
if (state === "active" || state === "waiting" || state === "delayed") {
continue;
}
}
await markErrorIfInFlight(doc.repoId, "preparation_interrupted");
console.log("[QUEUE] recovered stuck repo", doc.repoId);
} catch (e) {
console.log("[QUEUE] recover error for", doc.repoId, e);
}
}
} catch (e) {
console.log("[QUEUE] recoverStuckPreparing failed", e);
}
}
export let cacheQueue: Queue<RepoJobData>;
export let removeQueue: Queue<RepoJobData>;
export let downloadQueue: Queue<RepoJobData>;
// avoid to load the queue outside the main server
export function startWorker() {
@@ -14,28 +79,31 @@ export function startWorker() {
port: config.REDIS_PORT,
};
cacheQueue = new Queue<Repository>("cache removal", {
cacheQueue = new Queue<RepoJobData>("cache removal", {
connection,
defaultJobOptions: {
removeOnComplete: true,
removeOnFail: true,
},
});
removeQueue = new Queue<Repository>("repository removal", {
removeQueue = new Queue<RepoJobData>("repository removal", {
connection: {
host: config.REDIS_HOSTNAME,
port: config.REDIS_PORT,
},
defaultJobOptions: {
removeOnComplete: true,
removeOnFail: true,
},
});
downloadQueue = new Queue<Repository>("repository download", {
downloadQueue = new Queue<RepoJobData>("repository download", {
connection,
defaultJobOptions: {
removeOnComplete: true,
removeOnFail: true,
},
});
const cacheWorker = new Worker<Repository>(
const cacheWorker = new Worker<RepoJobData>(
cacheQueue.name,
path.resolve("build/queue/processes/removeCache.js"),
{
@@ -47,7 +115,7 @@ export function startWorker() {
cacheWorker.on("completed", async (job) => {
await job.remove();
});
const removeWorker = new Worker<Repository>(
const removeWorker = new Worker<RepoJobData>(
removeQueue.name,
path.resolve("build/queue/processes/removeRepository.js"),
{
@@ -60,7 +128,7 @@ export function startWorker() {
await job.remove();
});
const downloadWorker = new Worker<Repository>(
const downloadWorker = new Worker<RepoJobData>(
downloadQueue.name,
path.resolve("build/queue/processes/downloadRepository.js"),
{
@@ -77,7 +145,17 @@ export function startWorker() {
downloadWorker.on("completed", async (job) => {
console.log("[QUEUE] download repository completed", job.data.repoId);
});
downloadWorker.on("failed", async (job) => {
console.log("download repository failed", job.data.repoId);
downloadWorker.on("failed", async (job, err) => {
const repoId = job?.data?.repoId;
console.log(
"[QUEUE] download repository failed",
repoId,
err?.message || err
);
if (!repoId) return;
if (job && typeof job.attemptsMade === "number" && job.opts?.attempts) {
if (job.attemptsMade < job.opts.attempts) return;
}
await markErrorIfInFlight(repoId, err?.message || "preparation_failed");
});
}