From bafa0b325b98da0017d43fa5a022448e42567abd Mon Sep 17 00:00:00 2001 From: tdurieux Date: Sun, 12 Sep 2021 00:05:05 +0200 Subject: [PATCH] feat: use differnt processes to handle the queue --- src/processes/downloadRepository.ts | 31 +++++++++++++++++++ src/processes/removeRepository.ts | 28 ++++++++++++++++++ src/queue.ts | 46 ++--------------------------- 3 files changed, 62 insertions(+), 43 deletions(-) create mode 100644 src/processes/downloadRepository.ts create mode 100644 src/processes/removeRepository.ts diff --git a/src/processes/downloadRepository.ts b/src/processes/downloadRepository.ts new file mode 100644 index 0000000..18dda83 --- /dev/null +++ b/src/processes/downloadRepository.ts @@ -0,0 +1,31 @@ +import AnonymousError from "../AnonymousError"; +import { connect, getRepository } from "../database/database"; + +export default async function process(job) { + console.log(`${job.data.repoId} is going to be downloaded`); + try { + await connect(); + const repo = await getRepository(job.data.repoId); + job.progress("get_repo"); + await repo.resetSate(); + job.progress("resetSate"); + try { + await repo.anonymize(); + } catch (error) { + await repo.updateStatus("error", error.message); + throw error; + } + } catch (error) { + if (error instanceof AnonymousError) { + console.error( + "[ERROR]", + error.toString(), + error.stack.split("\n")[1].trim() + ); + } else { + console.error(error); + } + } finally { + console.log(`${job.data.repoId} is downloaded`); + } +} diff --git a/src/processes/removeRepository.ts b/src/processes/removeRepository.ts new file mode 100644 index 0000000..6cc0fd5 --- /dev/null +++ b/src/processes/removeRepository.ts @@ -0,0 +1,28 @@ +import AnonymousError from "../AnonymousError"; +import { connect, getRepository } from "../database/database"; + +export default async function process(job) { + try { + await connect(); + console.log(`${job.data.repoId} is going to be removed`); + const repo = await getRepository(job.data.repoId); + try { + await repo.remove(); + } catch (error) { + await repo.updateStatus("error", error.message); + throw error; + } + } catch (error) { + if (error instanceof AnonymousError) { + console.error( + "[ERROR]", + error.toString(), + error.stack.split("\n")[1].trim() + ); + } else { + console.error(error); + } + } finally { + console.log(`${job.data.repoId} is removed`); + } +} diff --git a/src/queue.ts b/src/queue.ts index 586a87d..8a8fc99 100644 --- a/src/queue.ts +++ b/src/queue.ts @@ -1,8 +1,7 @@ import * as Queue from "bull"; import config from "../config"; -import AnonymousError from "./AnonymousError"; -import { getRepository } from "./database/database"; import Repository from "./Repository"; +import * as path from "path"; export const removeQueue = new Queue("repository removal", { redis: { @@ -23,45 +22,6 @@ downloadQueue.on("completed", async (job) => { await job.remove(); }); -removeQueue.process(5, async (job) => { - console.log(`${job.data.repoId} is going to be removed`); - try { - const repo = await getRepository(job.data.repoId); - await repo.remove(); - } catch (error) { - if (error instanceof AnonymousError) { - console.error( - "[ERROR]", - error.toString(), - error.stack.split("\n")[1].trim() - ); - } else { - console.error(error); - } - } finally { - console.log(`${job.data.repoId} is removed`); - } -}); +removeQueue.process(5, path.resolve("src/processes/removeRepository.ts")); -downloadQueue.process(2, async (job) => { - console.log(`${job.data.repoId} is going to be downloaded`); - try { - const repo = await getRepository(job.data.repoId); - job.progress("get_repo"); - await repo.resetSate(); - job.progress("resetSate"); - await repo.anonymize(); - } catch (error) { - if (error instanceof AnonymousError) { - console.error( - "[ERROR]", - error.toString(), - error.stack.split("\n")[1].trim() - ); - } else { - console.error(error); - } - } finally { - console.log(`${job.data.repoId} is downloaded`); - } -}); +downloadQueue.process(2, path.resolve("src/processes/downloadRepository.ts"));