feat: use differnt processes to handle the queue

This commit is contained in:
tdurieux
2021-09-12 00:05:05 +02:00
parent 07750f7a64
commit bafa0b325b
3 changed files with 62 additions and 43 deletions

View File

@@ -0,0 +1,31 @@
import AnonymousError from "../AnonymousError";
import { connect, getRepository } from "../database/database";
export default async function process(job) {
console.log(`${job.data.repoId} is going to be downloaded`);
try {
await connect();
const repo = await getRepository(job.data.repoId);
job.progress("get_repo");
await repo.resetSate();
job.progress("resetSate");
try {
await repo.anonymize();
} catch (error) {
await repo.updateStatus("error", error.message);
throw error;
}
} catch (error) {
if (error instanceof AnonymousError) {
console.error(
"[ERROR]",
error.toString(),
error.stack.split("\n")[1].trim()
);
} else {
console.error(error);
}
} finally {
console.log(`${job.data.repoId} is downloaded`);
}
}

View File

@@ -0,0 +1,28 @@
import AnonymousError from "../AnonymousError";
import { connect, getRepository } from "../database/database";
export default async function process(job) {
try {
await connect();
console.log(`${job.data.repoId} is going to be removed`);
const repo = await getRepository(job.data.repoId);
try {
await repo.remove();
} catch (error) {
await repo.updateStatus("error", error.message);
throw error;
}
} catch (error) {
if (error instanceof AnonymousError) {
console.error(
"[ERROR]",
error.toString(),
error.stack.split("\n")[1].trim()
);
} else {
console.error(error);
}
} finally {
console.log(`${job.data.repoId} is removed`);
}
}

View File

@@ -1,8 +1,7 @@
import * as Queue from "bull";
import config from "../config";
import AnonymousError from "./AnonymousError";
import { getRepository } from "./database/database";
import Repository from "./Repository";
import * as path from "path";
export const removeQueue = new Queue<Repository>("repository removal", {
redis: {
@@ -23,45 +22,6 @@ downloadQueue.on("completed", async (job) => {
await job.remove();
});
removeQueue.process(5, async (job) => {
console.log(`${job.data.repoId} is going to be removed`);
try {
const repo = await getRepository(job.data.repoId);
await repo.remove();
} catch (error) {
if (error instanceof AnonymousError) {
console.error(
"[ERROR]",
error.toString(),
error.stack.split("\n")[1].trim()
);
} else {
console.error(error);
}
} finally {
console.log(`${job.data.repoId} is removed`);
}
});
removeQueue.process(5, path.resolve("src/processes/removeRepository.ts"));
downloadQueue.process(2, async (job) => {
console.log(`${job.data.repoId} is going to be downloaded`);
try {
const repo = await getRepository(job.data.repoId);
job.progress("get_repo");
await repo.resetSate();
job.progress("resetSate");
await repo.anonymize();
} catch (error) {
if (error instanceof AnonymousError) {
console.error(
"[ERROR]",
error.toString(),
error.stack.split("\n")[1].trim()
);
} else {
console.error(error);
}
} finally {
console.log(`${job.data.repoId} is downloaded`);
}
});
downloadQueue.process(2, path.resolve("src/processes/downloadRepository.ts"));