feat: handle repository download and remove using a job queue

This commit is contained in:
tdurieux
2021-09-08 15:00:15 +02:00
parent d82f882ba8
commit 9838891567
8 changed files with 1121 additions and 52 deletions

966
package-lock.json generated

File diff suppressed because it is too large Load Diff

View File

@@ -30,6 +30,7 @@
"archive-stream-to-s3": "^1.1.3",
"archiver": "^5.3.0",
"aws-sdk": "^2.958.0",
"bull": "^3.29.2",
"compression": "^1.7.4",
"connect-redis": "^6.0.0",
"dotenv": "^10.0.0",
@@ -54,6 +55,7 @@
},
"devDependencies": {
"@types/archiver": "^5.1.1",
"@types/bull": "^3.15.4",
"@types/compression": "^1.7.1",
"@types/connect-redis": "^0.0.17",
"@types/express": "^4.17.13",

View File

@@ -634,11 +634,22 @@ angular
body: `The repository ${repo.repoId} is going to be removed.`,
};
$scope.toasts.push(toast);
$http.delete(`/api/repo/${repo.repoId}`).then(() => {
toast.title = `${repo.repoId} is removed.`;
toast.body = `The repository ${repo.repoId} is removed.`;
getRepositories();
});
$http.delete(`/api/repo/${repo.repoId}`).then(
() => {
setTimeout(() => {
toast.title = `${repo.repoId} is removed.`;
toast.body = `The repository ${repo.repoId} is removed.`;
getRepositories();
$scope.$apply();
}, 5000);
},
(error) => {
toast.title = `Error during the removal of ${repo.repoId}.`;
toast.body = error.body;
getRepositories();
}
);
}
};
@@ -646,16 +657,26 @@ angular
const toast = {
title: `Refreshing ${repo.repoId}...`,
date: new Date(),
body: `The repository ${repo.repoId} is going to be removed.`,
body: `The repository ${repo.repoId} is going to be refreshed.`,
};
$scope.toasts.push(toast);
$http.post(`/api/repo/${repo.repoId}/refresh`).then(() => {
toast.title = `${repo.repoId} is refreshed.`;
toast.body = `The repository ${repo.repoId} is refreshed.`;
$http.post(`/api/repo/${repo.repoId}/refresh`).then(
() => {
setTimeout(() => {
toast.title = `${repo.repoId} is refreshed.`;
toast.body = `The repository ${repo.repoId} is refreshed.`;
getRepositories();
$scope.$apply();
}, 5000);
},
(error) => {
toast.title = `Error during the refresh of ${repo.repoId}.`;
toast.body = error.body;
getRepositories();
});
getRepositories();
}
);
};
$scope.repoFiler = (repo) => {

View File

@@ -14,6 +14,7 @@ import GitHubBase from "./source/GitHubBase";
import Conference from "./Conference";
import ConferenceModel from "./database/conference/conferences.model";
import AnonymousError from "./AnonymousError";
import { downloadQueue } from "./queue";
export default class Repository {
private _model: IAnonymizedRepositoryDocument;
@@ -103,16 +104,21 @@ export default class Repository {
this.expire();
}
}
if (this.status == "expired") {
throw new AnonymousError("repository_expired", this);
}
if (this.status == "removed") {
if (
this.status == "expired" ||
this.status == "expiring" ||
this.status == "removing" ||
this.status == "removed"
) {
throw new AnonymousError("repository_expired", this);
}
const fiveMinuteAgo = new Date();
fiveMinuteAgo.setMinutes(fiveMinuteAgo.getMinutes() - 5);
if (this.status == "download" && this._model.statusDate > fiveMinuteAgo) {
if (
this.status == "preparing" ||
(this.status == "download" && this._model.statusDate > fiveMinuteAgo)
) {
throw new AnonymousError("repository_not_ready", this);
}
}
@@ -136,17 +142,31 @@ export default class Repository {
* @returns void
*/
async updateIfNeeded(): Promise<void> {
if (
this.status == "expired" ||
this.status == "expiring" ||
this.status == "removing" ||
this.status == "removed"
) {
throw new AnonymousError("repository_expired", this);
}
const fiveMinuteAgo = new Date();
fiveMinuteAgo.setMinutes(fiveMinuteAgo.getMinutes() - 5);
if (this.status != "ready") {
if (
this._model.statusDate < fiveMinuteAgo &&
this.status != "preparing"
) {
await this.updateStatus("preparing");
await downloadQueue.add(this, { jobId: this.repoId });
}
throw new AnonymousError("repository_not_ready", this);
}
const yesterday = new Date();
yesterday.setDate(yesterday.getDate() - 1);
if (this._model.options.update && this._model.lastView < yesterday) {
const fiveMinuteAgo = new Date();
fiveMinuteAgo.setMinutes(fiveMinuteAgo.getMinutes() - 5);
if (this.status == "download" && this._model.statusDate > fiveMinuteAgo) {
throw new AnonymousError("repository_not_ready", this);
}
// Only GitHubBase can be update for the moment
if (this.source instanceof GitHubBase) {
const branches = await this.source.githubRepository.branches({
@@ -154,30 +174,25 @@ export default class Repository {
accessToken: await this.source.getToken(),
});
const branch = this.source.branch;
if (
branch.commit ==
branches.filter((f) => f.name == branch.name)[0]?.commit
) {
const newCommit = branches.filter((f) => f.name == branch.name)[0]
?.commit;
if (branch.commit == newCommit) {
console.log(`${this._model.repoId} is up to date`);
return;
}
this._model.source.commit = branches.filter(
(f) => f.name == branch.name
)[0]?.commit;
branch.commit = this._model.source.commit;
this._model.source.commit = newCommit;
branch.commit = newCommit;
if (!this._model.source.commit) {
if (!newCommit) {
console.error(
`${branch.name} for ${this.source.githubRepository.fullName} is not found`
);
throw new AnonymousError("branch_not_found", this);
}
this._model.anonymizeDate = new Date();
console.log(
`${this._model.repoId} will be updated to ${this._model.source.commit}`
);
console.log(`${this._model.repoId} will be updated to ${newCommit}`);
await this.resetSate("preparing");
await this.anonymize();
await downloadQueue.add(this, { jobId: this.repoId });
}
}
}
@@ -219,14 +234,18 @@ export default class Repository {
* Expire the repository
*/
async expire() {
return this.resetSate("expired");
await this.updateStatus("expiring");
await this.resetSate();
await this.updateStatus("expired");
}
/**
* Remove the repository
*/
async remove() {
return this.resetSate("removed");
await this.updateStatus("removing");
await this.resetSate();
await this.updateStatus("removed");
}
/**
@@ -234,8 +253,10 @@ export default class Repository {
*/
async resetSate(status?: RepositoryStatus) {
if (status) this._model.status = status;
// remove attribute
this._model.size = { storage: 0, file: 0 };
this._model.originalFiles = null;
// remove cache
return Promise.all([
this._model.save(),
storage.rm(this._model.repoId + "/"),

50
src/queue.ts Normal file
View File

@@ -0,0 +1,50 @@
import * as Queue from "bull";
import config from "../config";
import { getRepository } from "./database/database";
import Repository from "./Repository";
export const removeQueue = new Queue<Repository>("repository removal", {
redis: {
host: config.REDIS_HOSTNAME,
port: config.REDIS_PORT,
},
});
removeQueue.on("completed", async (job) => {
await job.remove();
});
export const downloadQueue = new Queue<Repository>("repository download", {
redis: {
host: config.REDIS_HOSTNAME,
port: config.REDIS_PORT,
},
});
downloadQueue.on("completed", async (job) => {
await job.remove();
});
removeQueue.process(5, async (job) => {
console.log(`${job.data.repoId} is going to be removed`);
try {
const repo = await getRepository(job.data.repoId);
await repo.remove();
} catch (error) {
console.log("error", error);
} finally {
console.log(`${job.data.repoId} is removed`);
}
});
downloadQueue.process(2, async (job) => {
console.log(`${job.data.repoId} is going to be downloaded`);
try {
const repo = await getRepository(job.data.repoId);
job.progress("get_repo");
await repo.remove();
job.progress("get_remove");
await repo.anonymize();
} catch (error) {
console.log("error", error);
} finally {
console.log(`${job.data.repoId} is downloaded`);
}
});

View File

@@ -12,6 +12,7 @@ import { IAnonymizedRepositoryDocument } from "../database/anonymizedRepositorie
import Repository from "../Repository";
import ConferenceModel from "../database/conference/conferences.model";
import AnonymousError from "../AnonymousError";
import { downloadQueue, removeQueue } from "../queue";
const router = express.Router();
@@ -65,12 +66,15 @@ router.post(
const repo = await getRepo(req, res, { nocheck: true });
if (!repo) return;
if (repo.status == "preparing" || repo.status == "removing") return;
const user = await getUser(req);
if (repo.owner.id != user.id) {
return res.status(401).json({ error: "not_authorized" });
}
await repo.anonymize();
res.end("ok");
await repo.updateStatus("preparing");
await downloadQueue.add(repo, {jobId: repo.repoId});
res.json({ status: repo.status });
} catch (error) {
handleError(error, res);
}
@@ -83,14 +87,16 @@ router.delete(
async (req: express.Request, res: express.Response) => {
const repo = await getRepo(req, res, { nocheck: true });
if (!repo) return;
// if (repo.status == "removing") return res.json({ status: repo.status });
try {
if (repo.status == "removed") throw new AnonymousError("is_removed");
const user = await getUser(req);
if (repo.owner.id != user.id) {
return res.status(401).json({ error: "not_authorized" });
}
await repo.remove();
console.log(`${req.params.repoId} is removed`);
return res.json("ok");
await repo.updateStatus("removing");
await removeQueue.add(repo, {jobId: repo.repoId});
return res.json({ status: repo.status });
} catch (error) {
handleError(error, res);
}
@@ -305,8 +311,8 @@ router.post(
}
repo.model.conference = repoUpdate.conference;
await repo.updateStatus("preparing");
res.send("ok");
new Repository(repo.model).anonymize();
res.json({ status: repo.status });
await downloadQueue.add(repo, {jobId: repo.repoId});
} catch (error) {
return handleError(error, res);
}
@@ -369,11 +375,14 @@ router.post("/", async (req: express.Request, res: express.Response) => {
}
}
res.send("ok");
new Repository(repo).anonymize();
res.send({ status: repo.status });
downloadQueue.add(new Repository(repo), {jobId: repo.repoId});
} catch (error) {
if (error.message?.indexOf(" duplicate key") > -1) {
return handleError(new AnonymousError("repoId_already_used", repoUpdate.repoId), res);
return handleError(
new AnonymousError("repoId_already_used", repoUpdate.repoId),
res
);
}
return handleError(error, res);
}

View File

@@ -3,7 +3,6 @@ import * as express from "express";
import * as stream from "stream";
import config from "../../config";
import { getRepo, handleError } from "./route-utils";
const router = express.Router();
@@ -23,7 +22,7 @@ router.get(
// cache the file for 6 hours
res.header("Cache-Control", "max-age=21600000");
await pipeline(repo.zip(), res)
await pipeline(repo.zip(), res);
} catch (error) {
handleError(error, res);
}
@@ -60,7 +59,6 @@ router.get(
) {
redirectURL = repo.source.url;
} else {
repo.check();
await repo.updateIfNeeded();
}

View File

@@ -115,7 +115,9 @@ export type RepositoryStatus =
| "download"
| "ready"
| "expired"
| "expiring"
| "removed"
| "removing"
| "error";
export type ConferenceStatus = "ready" | "expired" | "removed";