chore: replace bull with bullmq

This commit is contained in:
tdurieux
2022-07-22 13:06:22 +02:00
parent fa8a513d93
commit ac52999841
19 changed files with 481 additions and 367 deletions

View File

@@ -1,6 +1,6 @@
import * as path from "path";
import * as express from "express";
import * as stream from "stream";
import { join, basename } from "path";
import { Response } from "express";
import { Readable, pipeline } from "stream";
import { promisify } from "util";
import Repository from "./Repository";
import { Tree, TreeElement, TreeFile } from "./types";
@@ -19,11 +19,11 @@ function tree2sha(
const sha = tree[i].sha as string;
const size = tree[i].size as number;
if (sha != null && size != null) {
output[sha] = path.join(parent, i);
output[sha] = join(parent, i);
} else if (tree[i].child) {
tree2sha(tree[i].child as Tree, output, path.join(parent, i));
tree2sha(tree[i].child as Tree, output, join(parent, i));
} else {
tree2sha(tree[i] as Tree, output, path.join(parent, i));
tree2sha(tree[i] as Tree, output, join(parent, i));
}
}
return output;
@@ -98,13 +98,13 @@ export default class AnonymizedFile {
// if only one option we found the original filename
if (options.length == 1) {
currentOriginalPath = path.join(currentOriginalPath, options[0]);
currentOriginalPath = join(currentOriginalPath, options[0]);
currentOriginal = currentOriginal[options[0]];
} else {
isAmbiguous = true;
}
} else if (!isAmbiguous) {
currentOriginalPath = path.join(currentOriginalPath, fileName);
currentOriginalPath = join(currentOriginalPath, fileName);
currentOriginal = currentOriginal[fileName];
}
}
@@ -130,7 +130,7 @@ export default class AnonymizedFile {
});
}
this._originalPath = path.join(currentOriginalPath, shaTree[file.sha]);
this._originalPath = join(currentOriginalPath, shaTree[file.sha]);
} else {
this._originalPath = currentOriginalPath;
}
@@ -139,7 +139,7 @@ export default class AnonymizedFile {
}
async isFileSupported() {
const filename = path.basename(await this.originalPath());
const filename = basename(await this.originalPath());
const extensions = filename.split(".").reverse();
const extension = extensions[0].toLowerCase();
if (!this.repository.options.pdf && extension == "pdf") {
@@ -158,7 +158,7 @@ export default class AnonymizedFile {
return true;
}
async content(): Promise<stream.Readable> {
async content(): Promise<Readable> {
if (this.fileSize && this.fileSize > config.MAX_FILE_SIZE) {
throw new AnonymousError("file_too_big", {
object: this,
@@ -184,13 +184,13 @@ export default class AnonymizedFile {
object: this,
httpStatus: 400,
});
return path.join(this.repository.originalCachePath, this._originalPath);
return join(this.repository.originalCachePath, this._originalPath);
}
async send(res: express.Response): Promise<void> {
const pipeline = promisify(stream.pipeline);
async send(res: Response): Promise<void> {
const pipe = promisify(pipeline);
try {
await pipeline(await this.anonymizedContent(), res);
await pipe(await this.anonymizedContent(), res);
} catch (error) {
handleError(error, res);
}

View File

@@ -1,7 +1,7 @@
import * as path from "path";
import { join } from "path";
import storage from "./storage";
import { RepositoryStatus, Source, Tree, TreeElement, TreeFile } from "./types";
import * as stream from "stream";
import { Readable } from "stream";
import User from "./User";
import GitHubStream from "./source/GitHubStream";
import GitHubDownload from "./source/GitHubDownload";
@@ -136,7 +136,7 @@ export default class Repository {
*
* @returns A stream of anonymized repository compressed
*/
zip(): stream.Readable {
zip(): Readable {
return storage.archive(this.originalCachePath, {
format: "zip",
fileTransformer: (filename) =>
@@ -185,11 +185,13 @@ export default class Repository {
this._model.anonymizeDate = new Date();
console.log(`${this._model.repoId} will be updated to ${newCommit}`);
await this.resetSate("preparing");
await downloadQueue.add(this, { jobId: this.repoId, attempts: 3 });
await downloadQueue.add(this.repoId, this, {
jobId: this.repoId,
attempts: 3,
});
}
}
}
/**
* Download the require state for the repository to work
*
@@ -327,7 +329,7 @@ export default class Repository {
}
get originalCachePath() {
return path.join(this._model.repoId, "original") + "/";
return join(this._model.repoId, "original") + "/";
}
get status() {

View File

@@ -2,15 +2,14 @@ import config from "../config";
import Repository from "./Repository";
import GitHubBase from "./source/GitHubBase";
import { isText } from "istextorbinary";
import * as path from "path";
import * as stream from "stream";
import { basename } from "path";
import { Transform } from "stream";
const urlRegex =
/<?\b((https?|ftp|file):\/\/)[-A-Za-z0-9+&@#/%?=~_|!:,.;]+[-A-Za-z0-9+&@#/%=~_|]\b\/?>?/g;
export function isTextFile(filePath, content) {
const filename = path.basename(filePath);
export function isTextFile(filePath: string, content: Buffer) {
const filename = basename(filePath);
const extensions = filename.split(".").reverse();
const extension = extensions[0].toLowerCase();
if (config.additionalExtensions.includes(extension)) {
@@ -23,7 +22,7 @@ export function isTextFile(filePath, content) {
}
export function anonymizeStream(filename: string, repository: Repository) {
const ts = new stream.Transform();
const ts = new Transform();
var chunks = [],
len = 0,
pos = 0;

View File

@@ -1,14 +1,17 @@
import AnonymousError from "../AnonymousError";
import { connect, getRepository } from "../database/database";
import { SandboxedJob } from "bullmq";
import { config } from "dotenv";
config();
import Repository from "../Repository";
export default async function process(job) {
export default async function (job: SandboxedJob<Repository, void>) {
const { connect, getRepository } = require("../database/database");
console.log(`${job.data.repoId} is going to be downloaded`);
try {
await connect();
const repo = await getRepository(job.data.repoId);
job.progress("get_repo");
await repo.resetSate();
job.progress("resetSate");
job.updateProgress({ status: "get_repo" });
await repo.resetSate("preparing");
job.updateProgress({ status: "resetSate" });
try {
await repo.anonymize();
} catch (error) {
@@ -16,15 +19,7 @@ export default async function process(job) {
throw error;
}
} catch (error) {
if (error instanceof AnonymousError) {
console.error(
"[ERROR]",
error.toString(),
error.stack.split("\n")[1].trim()
);
} else {
console.error(error);
}
console.error(error);
} finally {
console.log(`${job.data.repoId} is downloaded`);
}

View File

@@ -1,7 +1,8 @@
import AnonymousError from "../AnonymousError";
import { connect, getRepository } from "../database/database";
import { SandboxedJob } from "bullmq";
import Repository from "../Repository";
export default async function process(job) {
export default async function (job: SandboxedJob<Repository, void>) {
const { connect, getRepository } = require("../database/database");
try {
await connect();
console.log(`${job.data.repoId} is going to be removed`);
@@ -13,15 +14,7 @@ export default async function process(job) {
throw error;
}
} catch (error) {
if (error instanceof AnonymousError) {
console.error(
"[ERROR]",
error.toString(),
error.stack.split("\n")[1].trim()
);
} else {
console.error(error);
}
console.error(error);
} finally {
console.log(`${job.data.repoId} is removed`);
}

View File

@@ -1,27 +1,80 @@
import * as Queue from "bull";
import { Queue, Worker } from "bullmq";
import config from "../config";
import Repository from "./Repository";
import * as path from "path";
export const removeQueue = new Queue<Repository>("repository removal", {
redis: {
host: config.REDIS_HOSTNAME,
port: config.REDIS_PORT,
},
});
removeQueue.on("completed", async (job) => {
await job.remove();
});
export const downloadQueue = new Queue<Repository>("repository download", {
redis: {
host: config.REDIS_HOSTNAME,
port: config.REDIS_PORT,
},
});
downloadQueue.on("completed", async (job) => {
await job.remove();
});
export let removeQueue: Queue<Repository>;
export let downloadQueue: Queue<Repository>;
removeQueue.process(5, path.resolve("src/processes/removeRepository.ts"));
// avoid to load the queue outside the main server
export function startWorker() {
removeQueue = new Queue<Repository>("repository removal", {
connection: {
host: config.REDIS_HOSTNAME,
port: config.REDIS_PORT,
},
defaultJobOptions: {
removeOnComplete: true,
},
});
downloadQueue = new Queue<Repository>("repository download", {
connection: {
host: config.REDIS_HOSTNAME,
port: config.REDIS_PORT,
},
defaultJobOptions: {
removeOnComplete: true,
},
});
const removeWorker = new Worker<Repository>(
removeQueue.name,
path.resolve("dist/src/processes/removeRepository.js"),
//removeRepository,
{
concurrency: 5,
connection: {
host: config.REDIS_HOSTNAME,
port: config.REDIS_PORT,
},
autorun: true,
downloadQueue.process(2, path.resolve("src/processes/downloadRepository.ts"));
}
);
removeWorker.on("error", async (error) => {
console.log(error);
});
removeWorker.on("completed", async (job) => {
await job.remove();
});
const downloadWorker = new Worker<Repository>(
downloadQueue.name,
path.resolve("dist/src/processes/downloadRepository.js"),
// downloadRepository,
{
concurrency: 2,
connection: {
host: config.REDIS_HOSTNAME,
port: config.REDIS_PORT,
},
autorun: true
}
);
if (!downloadWorker.isRunning) downloadWorker.run();
downloadWorker.on("active", async (job) => {
console.log("active", job.data.repoId);
});
downloadWorker.on("completed", async (job) => {
console.log("completed", job.data.repoId);
});
downloadWorker.on("failed", async (job) => {
console.log("failed", job.data.repoId);
});
downloadWorker.on("closing", async (error) => {
console.log("closing", error);
});
downloadWorker.on("error", async (error) => {
console.log(error);
});
}

View File

@@ -1,9 +1,10 @@
import { Queue } from "bull";
import { Queue } from "bullmq";
import * as express from "express";
import AnonymizedRepositoryModel from "../database/anonymizedRepositories/anonymizedRepositories.model";
import ConferenceModel from "../database/conference/conferences.model";
import UserModel from "../database/users/users.model";
import { downloadQueue, removeQueue } from "../queue";
import Repository from "../Repository";
import { ensureAuthenticated } from "./connection";
import { handleError, getUser, isOwnerOrAdmin } from "./route-utils";
@@ -29,7 +30,7 @@ router.use(
);
router.post("/queue/:name/:repo_id", async (req, res) => {
let queue: Queue;
let queue: Queue<Repository, void>;
if (req.params.name == "download") {
queue = downloadQueue;
} else if (req.params.name == "remove") {
@@ -41,7 +42,7 @@ router.post("/queue/:name/:repo_id", async (req, res) => {
if (!job) {
return res.status(404).json({ error: "job_not_found" });
}
job.retry();
await job.retry();
res.send("ok");
});

View File

@@ -123,7 +123,7 @@ router.delete(
const user = await getUser(req);
isOwnerOrAdmin([repo.owner.id], user);
await repo.updateStatus("removing");
await removeQueue.add(repo, { jobId: repo.repoId });
await removeQueue.add(repo.repoId, repo, { jobId: repo.repoId });
return res.json({ status: repo.status });
} catch (error) {
handleError(error, res);
@@ -362,7 +362,7 @@ router.post(
repo.model.conference = repoUpdate.conference;
await repo.updateStatus("preparing");
res.json({ status: repo.status });
await downloadQueue.add(repo, { jobId: repo.repoId });
await downloadQueue.add(repo.repoId, repo, { jobId: repo.repoId });
} catch (error) {
return handleError(error, res);
}
@@ -432,7 +432,7 @@ router.post("/", async (req: express.Request, res: express.Response) => {
}
res.send({ status: repo.status });
downloadQueue.add(new Repository(repo), {
downloadQueue.add(repo.repoId, new Repository(repo), {
jobId: repo.repoId,
attempts: 3,
});

View File

@@ -91,7 +91,10 @@ router.get(
// && repo.status != "preparing"
) {
await repo.updateStatus("preparing");
await downloadQueue.add(repo, { jobId: repo.repoId, attempts: 3 });
await downloadQueue.add(repo.repoId, repo, {
jobId: repo.repoId,
attempts: 3,
});
}
if (repo.status == "error") {
throw new AnonymousError(

View File

@@ -44,6 +44,7 @@ export function isOwnerOrAdmin(authorizedUsers: string[], user: User) {
function printError(error: any) {
io.notifyError(error, error.value);
if (error instanceof AnonymousError) {
console.log(error);
console.error(
"[ERROR]",
error.toString(),
@@ -78,14 +79,14 @@ export function handleError(error: any, res: express.Response) {
export async function getUser(req: express.Request) {
const user = (req.user as any).user;
if (!user) {
req.logout();
req.logout((error) => console.error(error));
throw new AnonymousError("not_connected", {
httpStatus: 401,
});
}
const model = await UserModel.findById(user._id);
if (!model) {
req.logout();
req.logout((error) => console.error(error));
throw new AnonymousError("not_connected", {
httpStatus: 401,
});

View File

@@ -10,7 +10,7 @@ router.use(ensureAuthenticated);
router.get("/logout", async (req: express.Request, res: express.Response) => {
try {
req.logout();
req.logout((error) => console.error(error));
res.redirect("/");
} catch (error) {
handleError(error, res);

View File

@@ -13,6 +13,7 @@ import * as connection from "./routes/connection";
import router from "./routes";
import AnonymizedRepositoryModel from "./database/anonymizedRepositories/anonymizedRepositories.model";
import { conferenceStatusCheck, repositoryStatusCheck } from "./schedule";
import { startWorker } from "./queue";
function indexResponse(req: express.Request, res: express.Response) {
if (
@@ -44,13 +45,15 @@ export default async function start() {
app.use(passport.initialize());
app.use(passport.session());
startWorker();
const redisClient = createClient({
socket: {
host: config.REDIS_HOSTNAME,
port: config.REDIS_PORT,
},
});
redisClient.on('error', (err) => console.log('Redis Client Error', err));
redisClient.on("error", (err) => console.log("Redis Client Error", err));
await redisClient.connect();
@@ -59,7 +62,7 @@ export default async function start() {
sendCommand: (...args: string[]) => redisClient.sendCommand(args),
}),
windowMs: 15 * 60 * 1000, // 15 minutes
max: 200, // limit each IP
max: 1000, // limit each IP
standardHeaders: true,
legacyHeaders: false,
// delayMs: 0, // disable delaying - full speed until the max limit is reached

View File

@@ -2,9 +2,8 @@ import AnonymizedFile from "../AnonymizedFile";
import { Branch, Tree } from "../types";
import { GitHubRepository } from "./GitHubRepository";
import config from "../../config";
import { OAuthApp } from "@octokit/oauth-app";
import Repository from "../Repository";
import * as stream from "stream";
import { Readable } from "stream";
import UserModel from "../database/users/users.model";
import AnonymousError from "../AnonymousError";
@@ -37,7 +36,7 @@ export default abstract class GitHubBase {
this.branch = { commit: data.commit, name: data.branch };
}
async getFileContent(file: AnonymizedFile): Promise<stream.Readable> {
async getFileContent(file: AnonymizedFile): Promise<Readable> {
throw new AnonymousError("method_not_implemented", {
httpStatus: 501,
object: this,

View File

@@ -7,7 +7,7 @@ import GitHubBase from "./GitHubBase";
import AnonymizedFile from "../AnonymizedFile";
import { SourceBase } from "../types";
import got from "got";
import * as stream from "stream";
import { Readable } from "stream";
import { OctokitResponse } from "@octokit/types";
import AnonymousError from "../AnonymousError";
@@ -113,7 +113,7 @@ export default class GitHubDownload extends GitHubBase implements SourceBase {
await this.repository.updateStatus("ready");
}
async getFileContent(file: AnonymizedFile): Promise<stream.Readable> {
async getFileContent(file: AnonymizedFile): Promise<Readable> {
await this.download();
// update the file list
await this.repository.files({ force: true });

View File

@@ -1,4 +1,3 @@
import * as path from "path";
import AnonymizedFile from "../AnonymizedFile";
import Repository from "../Repository";
import storage from "../storage";

View File

@@ -2,10 +2,10 @@ import { StorageBase, Tree } from "../types";
import config from "../../config";
import * as fs from "fs";
import * as unzip from "unzip-stream";
import * as path from "path";
import * as express from "express";
import * as stream from "stream";
import { Extract } from "unzip-stream";
import { join, basename, dirname } from "path";
import { Response } from "express";
import { Readable, pipeline } from "stream";
import * as archiver from "archiver";
import { promisify } from "util";
@@ -16,32 +16,32 @@ export default class FileSystem implements StorageBase {
/** @override */
async exists(p: string): Promise<boolean> {
return fs.existsSync(path.join(config.FOLDER, p));
return fs.existsSync(join(config.FOLDER, p));
}
/** @override */
send(p: string, res: express.Response) {
res.sendFile(path.join(config.FOLDER, p), { dotfiles: "allow" });
send(p: string, res: Response) {
res.sendFile(join(config.FOLDER, p), { dotfiles: "allow" });
}
/** @override */
read(p: string): stream.Readable {
return fs.createReadStream(path.join(config.FOLDER, p));
read(p: string): Readable {
return fs.createReadStream(join(config.FOLDER, p));
}
/** @override */
async write(p: string, data: Buffer): Promise<void> {
if (!(await this.exists(path.dirname(p)))) {
await fs.promises.mkdir(path.dirname(path.join(config.FOLDER, p)), {
if (!(await this.exists(dirname(p)))) {
await fs.promises.mkdir(dirname(join(config.FOLDER, p)), {
recursive: true,
});
}
return fs.promises.writeFile(path.join(config.FOLDER, p), data);
return fs.promises.writeFile(join(config.FOLDER, p), data);
}
/** @override */
async rm(dir: string): Promise<void> {
await fs.promises.rm(path.join(config.FOLDER, dir), {
await fs.promises.rm(join(config.FOLDER, dir), {
force: true,
recursive: true,
});
@@ -50,7 +50,7 @@ export default class FileSystem implements StorageBase {
/** @override */
async mk(dir: string): Promise<void> {
if (!(await this.exists(dir)))
fs.promises.mkdir(path.join(config.FOLDER, dir), { recursive: true });
fs.promises.mkdir(join(config.FOLDER, dir), { recursive: true });
}
/** @override */
@@ -64,12 +64,12 @@ export default class FileSystem implements StorageBase {
if (opt.root == null) {
opt.root = config.FOLDER;
}
let files = await fs.promises.readdir(path.join(opt.root, dir));
let files = await fs.promises.readdir(join(opt.root, dir));
const output: Tree = {};
for (let file of files) {
let filePath = path.join(dir, file);
let filePath = join(dir, file);
try {
const stats = await fs.promises.stat(path.join(opt.root, filePath));
const stats = await fs.promises.stat(join(opt.root, filePath));
if (file[0] == "$") {
file = "\\" + file;
}
@@ -92,12 +92,12 @@ export default class FileSystem implements StorageBase {
}
/** @override */
async extractZip(p: string, data: stream.Readable): Promise<void> {
const pipeline = promisify(stream.pipeline);
return pipeline(
async extractZip(p: string, data: Readable): Promise<void> {
const pipe = promisify(pipeline);
return pipe(
data,
unzip.Extract({
path: path.join(path.join(config.FOLDER, p)),
Extract({
path: join(join(config.FOLDER, p)),
decodeString: (buf) => {
const name = buf.toString();
const newName = name.substr(name.indexOf("/") + 1);
@@ -127,8 +127,8 @@ export default class FileSystem implements StorageBase {
}
const f = file.path.replace(dir, "");
archive.append(rs, {
name: path.basename(f),
prefix: path.dirname(f),
name: basename(f),
prefix: dirname(f),
});
},
}).then(() => {

View File

@@ -1,13 +1,13 @@
import { StorageBase, Tree, TreeFile } from "../types";
import { S3 } from "aws-sdk";
import config from "../../config";
import * as stream from "stream";
import { pipeline, Readable } from "stream";
import ArchiveStreamToS3 from "decompress-stream-to-s3";
import * as express from "express";
import * as mime from "mime-types";
import { Response } from "express";
import { lookup } from "mime-types";
import * as flow from "xml-flow";
import * as archiver from "archiver";
import * as path from "path";
import { dirname, basename } from "path";
import AnonymousError from "../AnonymousError";
export default class S3Storage implements StorageBase {
@@ -83,7 +83,7 @@ export default class S3Storage implements StorageBase {
}
/** @override */
send(p: string, res: express.Response) {
send(p: string, res: Response) {
const s = this.client
.getObject({
Bucket: config.S3_BUCKET,
@@ -102,8 +102,8 @@ export default class S3Storage implements StorageBase {
res.set("Content-Length", headers["content-length"]);
res.set("Content-Type", headers["content-type"]);
}
stream.pipeline(
response.httpResponse.createUnbufferedStream() as stream.Readable,
pipeline(
response.httpResponse.createUnbufferedStream() as Readable,
res
);
});
@@ -112,7 +112,7 @@ export default class S3Storage implements StorageBase {
}
/** @override */
read(path: string): stream.Readable {
read(path: string): Readable {
return this.client
.getObject({
Bucket: config.S3_BUCKET,
@@ -128,7 +128,7 @@ export default class S3Storage implements StorageBase {
Bucket: config.S3_BUCKET,
Key: path,
Body: data,
ContentType: mime.lookup(path).toString(),
ContentType: lookup(path).toString(),
})
.promise();
return;
@@ -168,7 +168,7 @@ export default class S3Storage implements StorageBase {
}
/** @override */
async extractZip(p: string, data: stream.Readable): Promise<void> {
async extractZip(p: string, data: Readable): Promise<void> {
let toS3: ArchiveStreamToS3;
return new Promise((resolve, reject) => {
@@ -181,8 +181,7 @@ export default class S3Storage implements StorageBase {
header.name = header.name.substr(header.name.indexOf("/") + 1);
},
});
stream
.pipeline(data, toS3, () => {})
pipeline(data, toS3, () => {})
.on("finish", resolve)
.on("error", reject);
});
@@ -210,14 +209,14 @@ export default class S3Storage implements StorageBase {
xmlStream.on("tag:contents", function (file) {
let rs = that.read(file.key);
file.key = file.key.replace(dir, "");
const filename = path.basename(file.key);
const filename = basename(file.key);
if (filename == "") return;
if (opt?.fileTransformer) {
rs = rs.pipe(opt.fileTransformer(filename));
}
archive.append(rs, {
name: filename,
prefix: path.dirname(file.key),
prefix: dirname(file.key),
});
});
xmlStream.on("end", () => {