feat: introduce streamers that handle the stream and anonymization from github

This commit is contained in:
tdurieux
2024-04-03 11:13:01 +01:00
parent 73019c1b44
commit 4d12641c7e
64 changed files with 419 additions and 257 deletions

83
src/queue/index.ts Normal file
View File

@@ -0,0 +1,83 @@
import { Queue, Worker } from "bullmq";
import config from "../config";
import Repository from "../core/Repository";
import * as path from "path";
export let cacheQueue: Queue<Repository>;
export let removeQueue: Queue<Repository>;
export let downloadQueue: Queue<Repository>;
// Avoid loading the queues outside the main server.
/**
 * Creates the three BullMQ queues ("cache removal", "repository removal" and
 * "repository download") and starts one worker per queue.
 *
 * The module-level `cacheQueue`, `removeQueue` and `downloadQueue` bindings
 * are only assigned once this function has been called. Each worker is given
 * the path of a compiled processor file under `build/queue/processes`.
 */
export function startWorker() {
  // One Redis connection description shared by every queue and worker.
  const connection = {
    host: config.REDIS_HOSTNAME,
    port: config.REDIS_PORT,
  };

  cacheQueue = new Queue<Repository>("cache removal", {
    connection,
    defaultJobOptions: {
      removeOnComplete: true,
    },
  });
  removeQueue = new Queue<Repository>("repository removal", {
    connection,
    defaultJobOptions: {
      removeOnComplete: true,
    },
  });
  downloadQueue = new Queue<Repository>("repository download", {
    connection,
    defaultJobOptions: {
      removeOnComplete: true,
    },
  });

  const cacheWorker = new Worker<Repository>(
    cacheQueue.name,
    path.resolve("build/queue/processes/removeCache.js"),
    {
      concurrency: 5,
      connection,
      autorun: true,
    }
  );
  cacheWorker.on("completed", async (job) => {
    await job.remove();
  });

  const removeWorker = new Worker<Repository>(
    removeQueue.name,
    path.resolve("build/queue/processes/removeRepository.js"),
    {
      concurrency: 5,
      connection,
      autorun: true,
    }
  );
  removeWorker.on("completed", async (job) => {
    await job.remove();
  });

  const downloadWorker = new Worker<Repository>(
    downloadQueue.name,
    path.resolve("build/queue/processes/downloadRepository.js"),
    {
      concurrency: 3,
      connection,
      autorun: true,
    }
  );
  // `isRunning` is a method; the bare property reference is always truthy, so
  // the guard could never trigger unless it is actually called.
  if (!downloadWorker.isRunning()) {
    downloadWorker.run().catch((error: unknown) => {
      console.error("[QUEUE] download worker stopped with an error", error);
    });
  }
  downloadWorker.on("active", async (job) => {
    console.log("[QUEUE] download repository start", job.data.repoId);
  });
  downloadWorker.on("completed", async (job) => {
    console.log("[QUEUE] download repository completed", job.data.repoId);
  });
  downloadWorker.on("failed", async (job, error) => {
    // The job can be undefined when the failure is not tied to a loaded job,
    // so it must not be dereferenced unconditionally.
    console.log("download repository failed", job?.data.repoId, error);
  });
}

View File

@@ -0,0 +1,48 @@
import { Exception, trace } from "@opentelemetry/api";
import { SandboxedJob } from "bullmq";
import { config } from "dotenv";
config();
import Repository from "../../core/Repository";
import { getRepository as getRepositoryImport } from "../../server/database";
import { RepositoryStatus } from "../../core/types";
/**
 * Queue processor that downloads a repository.
 *
 * Connects to the database, loads the repository identified by
 * `job.data.repoId`, resets its state to `PREPARING` and calls
 * `repo.anonymize()`. Progress is published on the job at each step.
 *
 * Errors are logged and recorded on the tracing span but are not rethrown,
 * so the returned promise always resolves. When the failure happens after the
 * repository has been loaded, its status is set to `ERROR` with the error
 * message.
 *
 * @param job - The queued job; only `job.data.repoId` is read.
 */
export default async function (job: SandboxedJob<Repository, void>) {
  // Loaded lazily inside the processor. The path must match the module the
  // `getRepositoryImport` type is imported from at the top of this file.
  const {
    connect,
    getRepository,
  }: {
    connect: () => Promise<void>;
    getRepository: typeof getRepositoryImport;
  } = require("../../server/database");

  const repoId = job.data.repoId;
  const span = trace.getTracer("ano-file").startSpan("proc.downloadRepository");
  span.setAttribute("repoId", repoId);

  // Progress reporting is best-effort: a failure to publish it must neither
  // abort the job nor surface as an unhandled promise rejection.
  const reportProgress = async (status: string): Promise<void> => {
    try {
      await job.updateProgress({ status });
    } catch (progressError) {
      console.error(progressError);
    }
  };

  console.log(`[QUEUE] ${repoId} is going to be downloaded`);
  try {
    await connect();
    const repo = await getRepository(repoId);
    await reportProgress("get_repo");
    try {
      await reportProgress("resetSate");
      await repo.resetSate(RepositoryStatus.PREPARING, "");
      await reportProgress("download");
      await repo.anonymize();
      console.log(`[QUEUE] ${repoId} is downloaded`);
    } catch (error) {
      await reportProgress("error");
      try {
        if (error instanceof Error) {
          await repo.updateStatus(RepositoryStatus.ERROR, error.message);
        } else if (typeof error === "string") {
          await repo.updateStatus(RepositoryStatus.ERROR, error);
        }
      } catch (statusError) {
        // Do not let a failing status update hide the original error.
        console.error(statusError);
      }
      // Recorded on the span once, by the outer handler.
      throw error;
    }
  } catch (error) {
    console.error(error);
    span.recordException(error as Exception);
    console.log(`[QUEUE] ${repoId} is finished with an error`);
  } finally {
    span.end();
  }
}

View File

@@ -0,0 +1,34 @@
import { Exception, trace } from "@opentelemetry/api";
import { SandboxedJob } from "bullmq";
import Repository from "../../core/Repository";
import { getRepository as getRepositoryImport } from "../../server/database";
/**
 * Queue processor that removes the cache of a repository.
 *
 * Connects to the database, loads the repository identified by
 * `job.data.repoId` and calls `repo.removeCache()`.
 *
 * Errors are logged and recorded on the tracing span but are not rethrown,
 * so the returned promise always resolves.
 *
 * @param job - The queued job; only `job.data.repoId` is read.
 */
export default async function (job: SandboxedJob<Repository, void>) {
  // Loaded lazily inside the processor. The path must match the module the
  // `getRepositoryImport` type is imported from at the top of this file.
  const {
    connect,
    getRepository,
  }: {
    connect: () => Promise<void>;
    getRepository: typeof getRepositoryImport;
  } = require("../../server/database");

  const repoId = job.data.repoId;
  const span = trace.getTracer("ano-file").startSpan("proc.removeCache");
  span.setAttribute("repoId", repoId);
  try {
    await connect();
    console.log(`[QUEUE] Cache of ${repoId} is going to be removed...`);
    const repo = await getRepository(repoId);
    await repo.removeCache();
    // Only report success when the removal actually succeeded.
    console.log(`[QUEUE] Cache of ${repoId} is removed.`);
  } catch (error) {
    // Record the exception once and make the failure visible in the logs.
    span.recordException(error as Exception);
    console.error(`[QUEUE] Cache of ${repoId} could not be removed`, error);
  } finally {
    span.end();
  }
}

View File

@@ -0,0 +1,39 @@
import { trace } from "@opentelemetry/api";
import { SandboxedJob } from "bullmq";
import Repository from "../../core/Repository";
import { getRepository as getRepositoryImport } from "../../server/database";
import { RepositoryStatus } from "../../core/types";
/**
 * Queue processor that removes a repository.
 *
 * Connects to the database, loads the repository identified by
 * `job.data.repoId`, sets its status to `REMOVING` and calls `repo.remove()`.
 * If `repo.remove()` fails, the repository status is set to `ERROR` with the
 * error message.
 *
 * Errors are logged and recorded on the tracing span but are not rethrown,
 * so the returned promise always resolves.
 *
 * @param job - The queued job; only `job.data.repoId` is read.
 */
export default async function (job: SandboxedJob<Repository, void>) {
  // Loaded lazily inside the processor. The path must match the module the
  // `getRepositoryImport` type is imported from at the top of this file.
  const {
    connect,
    getRepository,
  }: {
    connect: () => Promise<void>;
    getRepository: typeof getRepositoryImport;
  } = require("../../server/database");

  const repoId = job.data.repoId;
  const span = trace.getTracer("ano-file").startSpan("proc.removeRepository");
  span.setAttribute("repoId", repoId);
  try {
    await connect();
    console.log(`[QUEUE] ${repoId} is going to be removed`);
    const repo = await getRepository(repoId);
    await repo.updateStatus(RepositoryStatus.REMOVING, "");
    try {
      await repo.remove();
    } catch (error) {
      try {
        if (error instanceof Error) {
          await repo.updateStatus(RepositoryStatus.ERROR, error.message);
        } else if (typeof error === "string") {
          await repo.updateStatus(RepositoryStatus.ERROR, error);
        }
      } catch (statusError) {
        // Do not let a failing status update hide the original error.
        console.error(statusError);
      }
      // Recorded on the span once, by the outer handler.
      throw error;
    }
    // Only report success when the removal actually succeeded.
    console.log(`[QUEUE] ${repoId} is removed`);
  } catch (error) {
    span.recordException(error as Error);
    console.error(`[QUEUE] ${repoId} could not be removed`, error);
  } finally {
    span.end();
  }
}