refactor: improve file streaming

This commit is contained in:
tdurieux
2024-04-01 15:17:26 +01:00
parent 87c7e8c470
commit 35f4b4ce52
15 changed files with 560 additions and 499 deletions

86
package-lock.json generated
View File

@@ -10,6 +10,7 @@
"license": "GPL-3.0",
"dependencies": {
"@aws-sdk/client-s3": "^3.540.0",
"@aws-sdk/lib-storage": "^3.540.0",
"@mongodb-js/zstd": "^1.2.0",
"@octokit/rest": "^20.0.2",
"@opentelemetry/api": "^1.8.0",
@@ -557,6 +558,35 @@
"node": ">=14.0.0"
}
},
"node_modules/@aws-sdk/lib-storage": {
"version": "3.540.0",
"resolved": "https://registry.npmjs.org/@aws-sdk/lib-storage/-/lib-storage-3.540.0.tgz",
"integrity": "sha512-xNLOpuOSzGO90fwn+GBsM//a4ALYl85WEsovKyJI6jYJTMCGLrzJQeq8cxeC5Xz6w8Ol86lf80Gll/cz4phy7g==",
"dependencies": {
"@smithy/abort-controller": "^2.2.0",
"@smithy/middleware-endpoint": "^2.5.0",
"@smithy/smithy-client": "^2.5.0",
"buffer": "5.6.0",
"events": "3.3.0",
"stream-browserify": "3.0.0",
"tslib": "^2.6.2"
},
"engines": {
"node": ">=14.0.0"
},
"peerDependencies": {
"@aws-sdk/client-s3": "^3.0.0"
}
},
"node_modules/@aws-sdk/lib-storage/node_modules/buffer": {
"version": "5.6.0",
"resolved": "https://registry.npmjs.org/buffer/-/buffer-5.6.0.tgz",
"integrity": "sha512-/gDYp/UtU0eA1ys8bOs9J6a+E/KWIY+DZ+Q2WESNUA0jFRsJOc0SNUO6xJ5SGA1xueg3NL65W6s+NY5l9cunuw==",
"dependencies": {
"base64-js": "^1.0.2",
"ieee754": "^1.1.4"
}
},
"node_modules/@aws-sdk/middleware-bucket-endpoint": {
"version": "3.535.0",
"resolved": "https://registry.npmjs.org/@aws-sdk/middleware-bucket-endpoint/-/middleware-bucket-endpoint-3.535.0.tgz",
@@ -5946,6 +5976,14 @@
"node": ">= 0.6"
}
},
"node_modules/events": {
"version": "3.3.0",
"resolved": "https://registry.npmjs.org/events/-/events-3.3.0.tgz",
"integrity": "sha512-mQw+2fkQbALzQ7V0MY0IqdnXNOeTtP4r0lN9z7AAawCXgqea7bDii20AYrIBrFd/Hx0M2Ocz6S111CaFkUcb0Q==",
"engines": {
"node": ">=0.8.x"
}
},
"node_modules/express": {
"version": "4.19.2",
"resolved": "https://registry.npmjs.org/express/-/express-4.19.2.tgz",
@@ -9578,6 +9616,15 @@
"node": ">= 0.8"
}
},
"node_modules/stream-browserify": {
"version": "3.0.0",
"resolved": "https://registry.npmjs.org/stream-browserify/-/stream-browserify-3.0.0.tgz",
"integrity": "sha512-H73RAHsVBapbim0tU2JwwOiXUj+fikfiaoYAKHF3VJfA0pe2BCzkhAHBlLG6REzE+2WNZcxOXjK7lkso+9euLA==",
"dependencies": {
"inherits": "~2.0.4",
"readable-stream": "^3.5.0"
}
},
"node_modules/stream-shift": {
"version": "1.0.3",
"resolved": "https://registry.npmjs.org/stream-shift/-/stream-shift-1.0.3.tgz",
@@ -10828,6 +10875,31 @@
"tslib": "^2.6.2"
}
},
"@aws-sdk/lib-storage": {
"version": "3.540.0",
"resolved": "https://registry.npmjs.org/@aws-sdk/lib-storage/-/lib-storage-3.540.0.tgz",
"integrity": "sha512-xNLOpuOSzGO90fwn+GBsM//a4ALYl85WEsovKyJI6jYJTMCGLrzJQeq8cxeC5Xz6w8Ol86lf80Gll/cz4phy7g==",
"requires": {
"@smithy/abort-controller": "^2.2.0",
"@smithy/middleware-endpoint": "^2.5.0",
"@smithy/smithy-client": "^2.5.0",
"buffer": "5.6.0",
"events": "3.3.0",
"stream-browserify": "3.0.0",
"tslib": "^2.6.2"
},
"dependencies": {
"buffer": {
"version": "5.6.0",
"resolved": "https://registry.npmjs.org/buffer/-/buffer-5.6.0.tgz",
"integrity": "sha512-/gDYp/UtU0eA1ys8bOs9J6a+E/KWIY+DZ+Q2WESNUA0jFRsJOc0SNUO6xJ5SGA1xueg3NL65W6s+NY5l9cunuw==",
"requires": {
"base64-js": "^1.0.2",
"ieee754": "^1.1.4"
}
}
}
},
"@aws-sdk/middleware-bucket-endpoint": {
"version": "3.535.0",
"resolved": "https://registry.npmjs.org/@aws-sdk/middleware-bucket-endpoint/-/middleware-bucket-endpoint-3.535.0.tgz",
@@ -14866,6 +14938,11 @@
"resolved": "https://registry.npmjs.org/etag/-/etag-1.8.1.tgz",
"integrity": "sha512-aIL5Fx7mawVa300al2BnEE4iNvo1qETxLrPI/o05L7z6go7fCw1J6EQmbK4FmJ2AS7kgVF/KEZWufBfdClMcPg=="
},
"events": {
"version": "3.3.0",
"resolved": "https://registry.npmjs.org/events/-/events-3.3.0.tgz",
"integrity": "sha512-mQw+2fkQbALzQ7V0MY0IqdnXNOeTtP4r0lN9z7AAawCXgqea7bDii20AYrIBrFd/Hx0M2Ocz6S111CaFkUcb0Q=="
},
"express": {
"version": "4.19.2",
"resolved": "https://registry.npmjs.org/express/-/express-4.19.2.tgz",
@@ -17521,6 +17598,15 @@
"resolved": "https://registry.npmjs.org/statuses/-/statuses-2.0.1.tgz",
"integrity": "sha512-RwNA9Z/7PrK06rYLIzFMlaF+l73iwpzsqRIFgbMLbTcLD6cOao82TaWefPXQvB2fOC4AjuYSEndS7N/mTCbkdQ=="
},
"stream-browserify": {
"version": "3.0.0",
"resolved": "https://registry.npmjs.org/stream-browserify/-/stream-browserify-3.0.0.tgz",
"integrity": "sha512-H73RAHsVBapbim0tU2JwwOiXUj+fikfiaoYAKHF3VJfA0pe2BCzkhAHBlLG6REzE+2WNZcxOXjK7lkso+9euLA==",
"requires": {
"inherits": "~2.0.4",
"readable-stream": "^3.5.0"
}
},
"stream-shift": {
"version": "1.0.3",
"resolved": "https://registry.npmjs.org/stream-shift/-/stream-shift-1.0.3.tgz",

View File

@@ -32,6 +32,7 @@
],
"dependencies": {
"@aws-sdk/client-s3": "^3.540.0",
"@aws-sdk/lib-storage": "^3.540.0",
"@mongodb-js/zstd": "^1.2.0",
"@octokit/rest": "^20.0.2",
"@opentelemetry/api": "^1.8.0",

View File

@@ -3,7 +3,7 @@ import { Response } from "express";
import { Readable } from "stream";
import { trace } from "@opentelemetry/api";
import Repository from "./Repository";
import { FILE_TYPE, Tree, TreeElement, TreeFile } from "./types";
import { RepositoryStatus, Tree, TreeElement, TreeFile } from "./types";
import storage from "./storage";
import config from "../config";
import {
@@ -14,7 +14,7 @@ import {
import AnonymousError from "./AnonymousError";
import { handleError } from "./routes/route-utils";
import { lookup } from "mime-types";
import AnonymizedRepositoryModel from "./database/anonymizedRepositories/anonymizedRepositories.model";
import { FILE_TYPE } from "./storage/Storage";
/**
* Represents a file in an anonymized repository
@@ -193,28 +193,20 @@ export default class AnonymizedFile {
.getTracer("ano-file")
.startActiveSpan("content", async (span) => {
try {
span.setAttribute("anonymizedPath", this.anonymizedPath);
if (this.anonymizedPath.includes(config.ANONYMIZATION_MASK)) {
await this.originalPath();
}
span.addEvent("originalPath", { originalPath: this._originalPath });
span.addEvent("filePath", { originalPath: this.filePath });
if (this.fileSize && this.fileSize > config.MAX_FILE_SIZE) {
throw new AnonymousError("file_too_big", {
object: this,
httpStatus: 403,
});
}
const exist = await storage.exists(this.originalCachePath);
span.addEvent("file_exist", { exist });
if (exist == FILE_TYPE.FILE) {
return storage.read(this.originalCachePath);
} else if (exist == FILE_TYPE.FOLDER) {
throw new AnonymousError("folder_not_supported", {
object: this,
httpStatus: 400,
});
}
return await this.repository.source?.getFileContent(this);
const out = await this.repository.source?.getFileContent(this);
this.repository.model.isReseted = false;
this.repository.updateStatus(RepositoryStatus.READY);
return out;
} finally {
span.end();
}
@@ -233,81 +225,70 @@ export default class AnonymizedFile {
});
}
get originalCachePath() {
if (!this.originalPath)
throw new AnonymousError("path_not_defined", {
object: this,
httpStatus: 400,
});
get filePath() {
if (!this._originalPath) {
if (this.anonymizedPath.includes(config.ANONYMIZATION_MASK)) {
throw new AnonymousError("path_not_defined", {
object: this,
httpStatus: 400,
});
} else {
return join(this.repository.originalCachePath, this.anonymizedPath);
}
return this.anonymizedPath;
}
return join(this.repository.originalCachePath, this._originalPath);
return this._originalPath;
}
async send(res: Response): Promise<void> {
return trace.getTracer("ano-file").startActiveSpan("send", async (span) => {
span.setAttribute("anonymizedPath", this.anonymizedPath);
return new Promise<void>(async (resolve, reject) => {
try {
const content = await this.content();
const mime = lookup(this.anonymizedPath);
if (mime && this.extension() != "ts") {
res.contentType(mime);
} else if (isTextFile(this.anonymizedPath)) {
res.contentType("text/plain");
}
res.header("Accept-Ranges", "none");
let fileInfo: Awaited<ReturnType<typeof storage.fileInfo>>;
return trace
.getTracer("ano-file")
.startActiveSpan("AnonymizedFile.send", async (span) => {
span.setAttribute("repoId", this.repository.repoId);
span.setAttribute("anonymizedPath", this.anonymizedPath);
return new Promise<void>(async (resolve, reject) => {
try {
fileInfo = await storage.fileInfo(this.originalCachePath);
} catch (error) {
// unable to get file size
console.error(error);
}
const anonymizer = new AnonymizeTransformer(this);
anonymizer.once("transform", (data) => {
if (data.isText && !mime) {
const mime = lookup(this.anonymizedPath);
if (mime && this.extension() != "ts") {
res.contentType(mime);
} else if (isTextFile(this.anonymizedPath)) {
res.contentType("text/plain");
}
if (fileInfo?.size && !data.wasAnonimized) {
// the text files may be anonymized and therefore the size may be different
res.header("Content-Length", fileInfo.size.toString());
}
});
content
.pipe(anonymizer)
.pipe(res)
.on("close", () => {
if (!content.closed && !content.destroyed) {
content.destroy();
res.header("Accept-Ranges", "none");
const anonymizer = new AnonymizeTransformer(this);
anonymizer.once("transform", (data) => {
if (!mime && data.isText) {
res.contentType("text/plain");
}
span.end();
resolve();
})
.on("error", (error) => {
if (!content.closed && !content.destroyed) {
content.destroy();
if (!data.wasAnonimized && this.fileSize) {
// the text files may be anonymized and therefore the size may be different
res.header("Content-Length", this.fileSize.toString());
}
span.recordException(error);
span.end();
reject(error);
handleError(error, res);
});
} catch (error) {
handleError(error, res);
}
const content = await this.content();
content
.pipe(anonymizer)
.pipe(res)
.on("close", () => {
if (!content.closed && !content.destroyed) {
content.destroy();
}
span.end();
resolve();
})
.on("error", (error) => {
if (!content.closed && !content.destroyed) {
content.destroy();
}
span.recordException(error);
span.end();
reject(error);
handleError(error, res);
});
} catch (error) {
handleError(error, res);
}
});
});
});
}
}

View File

@@ -39,7 +39,7 @@ export default class AnonymousError extends CustomError {
} else if (this.value instanceof User) {
detail = `${this.value.username}`;
} else if (this.value instanceof GitHubBase) {
detail = `${this.value.repository.repoId}`;
detail = `${this.value.githubRepository.fullName}`;
}
out += this.message;
if (detail) {

View File

@@ -1,13 +1,5 @@
import { join } from "path";
import storage from "./storage";
import {
FILE_TYPE,
RepositoryStatus,
Source,
Tree,
TreeElement,
TreeFile,
} from "./types";
import { RepositoryStatus, Source, Tree, TreeElement, TreeFile } from "./types";
import { Readable } from "stream";
import User from "./User";
import GitHubStream from "./source/GitHubStream";
@@ -21,7 +13,7 @@ import GitHubBase from "./source/GitHubBase";
import Conference from "./Conference";
import ConferenceModel from "./database/conference/conferences.model";
import AnonymousError from "./AnonymousError";
import { downloadQueue, removeQueue } from "./queue";
import { downloadQueue } from "./queue";
import { isConnected } from "./database/database";
import AnonymizedFile from "./AnonymizedFile";
import AnonymizedRepositoryModel from "./database/anonymizedRepositories/anonymizedRepositories.model";
@@ -64,13 +56,13 @@ export default class Repository {
this._model = data;
switch (data.source.type) {
case "GitHubDownload":
this.source = new GitHubDownload(data.source, this);
this.source = new GitHubDownload(data.source, this.repoId);
break;
case "GitHubStream":
this.source = new GitHubStream(data.source, this);
this.source = new GitHubStream(data.source);
break;
case "Zip":
this.source = new Zip(data.source, this);
this.source = new Zip(data.source, this.repoId);
break;
default:
throw new AnonymousError("unsupported_source", {
@@ -180,7 +172,7 @@ export default class Repository {
* @returns A stream of anonymized repository compressed
*/
zip(): Promise<Readable> {
return storage.archive(this.originalCachePath, {
return storage.archive(this.repoId, "", {
format: "zip",
fileTransformer: (filename: string) =>
new AnonymizeTransformer(
@@ -394,14 +386,10 @@ export default class Repository {
.startSpan("Repository.removeCache");
span.setAttribute("repoId", this.repoId);
try {
return storage.rm(this.repoId);
} finally {
this.model.isReseted = true;
await this.model.save();
if (
(await storage.exists(this._model.repoId + "/")) !== FILE_TYPE.NOT_FOUND
) {
return storage.rm(this._model.repoId + "/");
}
} finally {
span.end();
}
}
@@ -490,13 +478,6 @@ export default class Repository {
return this._model;
}
get originalCachePath() {
return (
join(this._model.repoId, "original") +
(process.platform === "win32" ? "\\" : "/")
);
}
get status() {
return this._model.status;
}

View File

@@ -39,6 +39,7 @@ export default async function (job: SandboxedJob<Repository, void>) {
throw error;
}
} catch (error) {
console.error(error)
span.recordException(error as Exception);
console.log(`[QUEUE] ${job.data.repoId} is finished with an error`);
} finally {

View File

@@ -19,6 +19,14 @@ import AnonymizedPullRequestModel from "./database/anonymizedPullRequests/anonym
import { getUser } from "./routes/route-utils";
function indexResponse(req: express.Request, res: express.Response) {
if (
req.path.startsWith("/script") ||
req.path.startsWith("/style") ||
req.path.startsWith("/favicon") ||
req.path.startsWith("/api")
) {
return res.status(404).send("Not found");
}
if (
req.params.repoId &&
req.headers["accept"] &&

View File

@@ -6,30 +6,22 @@ import AnonymizedFile from "../AnonymizedFile";
import { Branch, Tree } from "../types";
import { GitHubRepository } from "./GitHubRepository";
import config from "../../config";
import Repository from "../Repository";
import UserModel from "../database/users/users.model";
import AnonymousError from "../AnonymousError";
export default abstract class GitHubBase {
type: "GitHubDownload" | "GitHubStream" | "Zip";
abstract type: "GitHubDownload" | "GitHubStream" | "Zip";
githubRepository: GitHubRepository;
branch: Branch;
accessToken: string | undefined;
repository: Repository;
validToken: boolean = false;
constructor(
data: {
type: "GitHubDownload" | "GitHubStream" | "Zip";
branch?: string;
commit?: string;
repositoryId?: string;
repositoryName?: string;
accessToken?: string;
},
repository: Repository
) {
this.type = data.type;
constructor(data: {
accessToken?: string;
commit?: string;
branch?: string;
repositoryId?: string;
repositoryName?: string;
}) {
this.accessToken = data.accessToken;
const branches = [];
if (data.branch && data.commit) {
@@ -40,23 +32,15 @@ export default abstract class GitHubBase {
externalId: data.repositoryId,
branches,
});
this.repository = repository;
this.branch = branches[0];
}
async getFileContent(file: AnonymizedFile): Promise<Readable> {
throw new AnonymousError("method_not_implemented", {
httpStatus: 501,
object: this,
});
}
abstract getFileContent(
file: AnonymizedFile,
progress?: (status: string) => void
): Promise<Readable>;
getFiles(): Promise<Tree> {
throw new AnonymousError("method_not_implemented", {
httpStatus: 501,
object: this,
});
}
abstract getFiles(progress?: (status: string) => void): Promise<Tree>;
static octokit(token: string) {
return new Octokit({
@@ -77,22 +61,24 @@ export default abstract class GitHubBase {
}
}
async getToken() {
async getToken(ownerID?: any) {
const span = trace.getTracer("ano-file").startSpan("GHBase.getToken");
span.setAttribute("repoId", this.repository.repoId);
span.setAttribute("repoId", this.githubRepository.fullName || "");
try {
if (this.validToken) {
return this.accessToken as string;
}
const user = await UserModel.findById(this.repository.owner.id, {
accessTokens: 1,
});
if (user?.accessTokens.github) {
const check = await GitHubBase.checkToken(user.accessTokens.github);
if (check) {
this.accessToken = user.accessTokens.github;
this.validToken = true;
return this.accessToken;
if (ownerID) {
const user = await UserModel.findById(ownerID, {
accessTokens: 1,
});
if (user?.accessTokens.github) {
const check = await GitHubBase.checkToken(user.accessTokens.github);
if (check) {
this.accessToken = user.accessTokens.github;
this.validToken = true;
return this.accessToken;
}
}
}
if (this.accessToken) {

View File

@@ -2,32 +2,31 @@ import got from "got";
import { Readable } from "stream";
import { OctokitResponse } from "@octokit/types";
import config from "../../config";
import storage from "../storage";
import Repository from "../Repository";
import GitHubBase from "./GitHubBase";
import AnonymizedFile from "../AnonymizedFile";
import { FILE_TYPE, RepositoryStatus, SourceBase } from "../types";
import { SourceBase } from "../types";
import AnonymousError from "../AnonymousError";
import { trace } from "@opentelemetry/api";
import { FILE_TYPE } from "../storage/Storage";
export default class GitHubDownload extends GitHubBase implements SourceBase {
type: "GitHubDownload" | "GitHubStream" | "Zip" = "GitHubDownload";
constructor(
data: {
type: "GitHubDownload" | "GitHubStream" | "Zip";
branch?: string;
commit?: string;
repositoryId?: string;
repositoryName?: string;
accessToken?: string;
},
repository: Repository
readonly repoId: string
) {
super(data, repository);
super(data);
}
private async _getZipUrl(
auth?: string
auth: string
): Promise<OctokitResponse<unknown, 302>> {
const octokit = GitHubBase.octokit(auth as string);
return octokit.rest.repos.downloadZipballArchive({
@@ -38,69 +37,30 @@ export default class GitHubDownload extends GitHubBase implements SourceBase {
});
}
async download(token?: string) {
async download(progress?: (status: string) => void) {
const span = trace.getTracer("ano-file").startSpan("GHDownload.download");
span.setAttribute("repoId", this.repository.repoId);
span.setAttribute("repoId", this.githubRepository.fullName || "");
try {
const fiveMinuteAgo = new Date();
fiveMinuteAgo.setMinutes(fiveMinuteAgo.getMinutes() - 5);
if (
this.repository.status == "download" &&
this.repository.model.statusDate > fiveMinuteAgo
)
throw new AnonymousError("repo_in_download", {
httpStatus: 404,
object: this.repository,
});
let response: OctokitResponse<unknown, number>;
try {
if (!token) {
token = await this.getToken();
}
response = await this._getZipUrl(token);
response = await this._getZipUrl(await this.getToken());
} catch (error) {
span.recordException(error as Error);
if ((error as any).status == 401 && config.GITHUB_TOKEN) {
try {
response = await this._getZipUrl(config.GITHUB_TOKEN);
} catch (error) {
await this.repository.resetSate(
RepositoryStatus.ERROR,
"repo_not_accessible"
);
throw new AnonymousError("repo_not_accessible", {
httpStatus: 404,
cause: error as Error,
object: this.repository,
});
}
} else {
await this.repository.resetSate(
RepositoryStatus.ERROR,
"repo_not_accessible"
);
throw new AnonymousError("repo_not_accessible", {
httpStatus: 404,
object: this.repository,
cause: error as Error,
});
}
throw new AnonymousError("repo_not_accessible", {
httpStatus: 404,
object: this.githubRepository,
cause: error as Error,
});
}
await this.repository.updateStatus(RepositoryStatus.DOWNLOAD);
const originalPath = this.repository.originalCachePath;
await storage.mk(originalPath);
let progress: { transferred: number } | undefined = undefined;
await storage.mk(this.repoId);
let downloadProgress: { transferred: number } | undefined = undefined;
let progressTimeout;
let inDownload = true;
const that = this;
async function updateProgress() {
if (inDownload) {
if (progress && that.repository.status == RepositoryStatus.DOWNLOAD) {
await that.repository.updateStatus(
that.repository.status,
progress.transferred.toString()
);
if (progress) {
progress(downloadProgress?.transferred?.toString() || "");
}
progressTimeout = setTimeout(updateProgress, 1500);
}
@@ -110,45 +70,43 @@ export default class GitHubDownload extends GitHubBase implements SourceBase {
try {
const downloadStream = got.stream(response.url);
downloadStream.addListener("downloadProgress", async (p) => {
progress = p;
downloadProgress = p;
});
await storage.extractZip(originalPath, downloadStream, undefined, this);
await storage.extractZip(
this.repoId,
"",
downloadStream,
undefined,
this
);
} catch (error) {
span.recordException(error as Error);
await this.repository.updateStatus(
RepositoryStatus.ERROR,
"unable_to_download"
);
throw new AnonymousError("unable_to_download", {
httpStatus: 500,
cause: error as Error,
object: this.repository,
object: this.githubRepository,
});
} finally {
inDownload = false;
clearTimeout(progressTimeout);
}
this.repository.model.isReseted = false;
try {
await this.repository.updateStatus(RepositoryStatus.READY);
} catch (error) {
span.recordException(error as Error);
}
} finally {
span.end();
}
}
async getFileContent(file: AnonymizedFile): Promise<Readable> {
async getFileContent(
file: AnonymizedFile,
progress?: (status: string) => void
): Promise<Readable> {
const span = trace
.getTracer("ano-file")
.startSpan("GHDownload.getFileContent");
span.setAttribute("repoId", this.repository.repoId);
span.setAttribute("repoId", this.githubRepository.fullName || "");
try {
const exists = await storage.exists(file.originalCachePath);
const exists = await storage.exists(file.filePath);
if (exists === FILE_TYPE.FILE) {
return storage.read(file.originalCachePath);
return storage.read(this.repoId, file.filePath);
} else if (exists === FILE_TYPE.FOLDER) {
throw new AnonymousError("folder_not_supported", {
httpStatus: 400,
@@ -159,18 +117,17 @@ export default class GitHubDownload extends GitHubBase implements SourceBase {
await file.originalPath();
// the cache is not ready, we need to download the repository
await this.download();
return storage.read(file.originalCachePath);
await this.download(progress);
return storage.read(this.repoId, file.filePath);
} finally {
span.end();
}
}
async getFiles() {
const folder = this.repository.originalCachePath;
if ((await storage.exists(folder)) === FILE_TYPE.NOT_FOUND) {
if ((await storage.exists(this.repoId)) === FILE_TYPE.NOT_FOUND) {
await this.download();
}
return storage.listFiles(folder);
return storage.listFiles(this.repoId);
}
}

View File

@@ -1,102 +1,134 @@
import AnonymizedFile from "../AnonymizedFile";
import Repository from "../Repository";
import GitHubBase from "./GitHubBase";
import storage from "../storage";
import { RepositoryStatus, SourceBase, Tree } from "../types";
import { SourceBase, Tree } from "../types";
import * as path from "path";
import got from "got";
import * as stream from "stream";
import AnonymousError from "../AnonymousError";
import config from "../../config";
import { trace } from "@opentelemetry/api";
import { FILE_TYPE } from "../storage/Storage";
export default class GitHubStream extends GitHubBase implements SourceBase {
constructor(
data: {
type: "GitHubDownload" | "GitHubStream" | "Zip";
branch?: string;
commit?: string;
repositoryId?: string;
repositoryName?: string;
accessToken?: string;
},
repository: Repository
) {
super(data, repository);
type: "GitHubDownload" | "GitHubStream" | "Zip" = "GitHubStream";
/**
 * Creates a GitHubStream source from the stored source description.
 *
 * Every field of `data` is optional and the object is passed unchanged to
 * the GitHubBase constructor; this class adds no state of its own here.
 *
 * @param data source description (branch, commit, repository id/name, access token)
 */
constructor(data: {
branch?: string;
commit?: string;
repositoryId?: string;
repositoryName?: string;
accessToken?: string;
}) {
super(data);
}
/**
 * Opens a download stream for a single Git blob of the GitHub repository.
 *
 * The blob URL is resolved through the octokit `getBlob` endpoint description
 * and the request asks for the raw media type
 * (`application/vnd.github.raw+json`).
 *
 * Only failures raised synchronously while building the request are caught
 * here; the tracing span is ended as soon as the stream object is returned,
 * not when the stream has been fully consumed.
 *
 * @param sha SHA of the blob to download
 * @param token GitHub token sent in the `authorization` header
 * @returns the stream created by `got.stream` for the blob URL
 * @throws AnonymousError `repo_not_accessible` (404) when the request cannot be built
 */
downloadFile(sha: string, token: string) {
  const span = trace.getTracer("ano-file").startSpan("GHStream.downloadFile");
  // outer try/finally guarantees the span is ended even if the octokit
  // client cannot be created
  try {
    span.setAttribute("sha", sha);
    const octokit = GitHubBase.octokit(token);
    try {
      const { url } = octokit.rest.git.getBlob.endpoint({
        owner: this.githubRepository.owner,
        repo: this.githubRepository.repo,
        file_sha: sha,
      });
      return got.stream(url, {
        headers: {
          "X-GitHub-Api-Version": "2022-11-28",
          accept: "application/vnd.github.raw+json",
          authorization: `token ${token}`,
        },
      });
    } catch (error) {
      console.error(error);
      // keep the failure visible in the trace, like the other sources do
      span.recordException(error as Error);
      throw new AnonymousError("repo_not_accessible", {
        httpStatus: 404,
        object: this.githubRepository,
        cause: error as Error,
      });
    }
  } finally {
    span.end();
  }
}
async getFileContent(file: AnonymizedFile): Promise<stream.Readable> {
return trace
const span = trace
.getTracer("ano-file")
.startActiveSpan("GHStream.getFileContent", async (span) => {
span.setAttribute("path", file.anonymizedPath);
const octokit = GitHubBase.octokit(await this.getToken());
.startSpan("GHStream.getFileContent");
span.setAttribute("repoId", file.repository.repoId);
span.setAttribute("file", file.anonymizedPath);
try {
try {
file.filePath;
} catch (_) {
// compute the original path if ambiguous
await file.originalPath();
}
const fileInfo = await storage.exists(
file.repository.repoId,
file.filePath
);
if (fileInfo == FILE_TYPE.FILE) {
return storage.read(file.repository.repoId, file.filePath);
} else if (fileInfo == FILE_TYPE.FOLDER) {
throw new AnonymousError("folder_not_supported", {
httpStatus: 400,
object: file,
});
}
span.setAttribute("path", file.filePath);
const file_sha = await file.sha();
if (!file_sha) {
throw new AnonymousError("file_not_accessible", {
httpStatus: 404,
object: file,
});
}
try {
const token = await this.getToken(file.repository.owner.id);
const content = this.downloadFile(file_sha, token);
const file_sha = await file.sha();
if (!file_sha) {
throw new AnonymousError("file_not_accessible", {
httpStatus: 404,
object: file,
});
}
try {
const ghRes = await octokit.rest.git.getBlob({
owner: this.githubRepository.owner,
repo: this.githubRepository.repo,
file_sha,
});
if (!ghRes.data.content && ghRes.data.size != 0) {
throw new AnonymousError("file_not_accessible", {
httpStatus: 404,
object: file,
});
}
// empty file
let content: Buffer;
if (ghRes.data.content) {
content = Buffer.from(
ghRes.data.content,
ghRes.data.encoding as BufferEncoding
);
} else {
content = Buffer.from("");
}
await storage.write(file.originalCachePath, content, file, this);
this.repository.model.isReseted = false;
await this.repository.model.save();
if (this.repository.status !== RepositoryStatus.READY)
await this.repository.updateStatus(RepositoryStatus.READY);
return stream.Readable.from(content);
} catch (error) {
if (
(error as any).status === 404 ||
(error as any).httpStatus === 404
) {
throw new AnonymousError("file_not_found", {
httpStatus: (error as any).status || (error as any).httpStatus,
cause: error as Error,
object: file,
});
}
throw new AnonymousError("file_too_big", {
// duplicate the stream to write it to the storage
const stream1 = content.pipe(new stream.PassThrough());
const stream2 = content.pipe(new stream.PassThrough());
storage.write(
file.repository.repoId,
file.filePath,
stream1,
file,
this
);
return stream2;
} catch (error) {
if (
(error as any).status === 404 ||
(error as any).httpStatus === 404
) {
throw new AnonymousError("file_not_found", {
httpStatus: (error as any).status || (error as any).httpStatus,
cause: error as Error,
object: file,
});
} finally {
span.end();
}
});
throw new AnonymousError("file_too_big", {
httpStatus: (error as any).status || (error as any).httpStatus,
cause: error as Error,
object: file,
});
}
} finally {
span.end();
}
}
async getFiles() {
const span = trace.getTracer("ano-file").startSpan("GHStream.getFiles");
span.setAttribute("repoId", this.repository.repoId);
span.setAttribute("repoName", this.githubRepository.fullName || "");
try {
let commit = this.branch?.commit;
if (!commit && this.repository.model.source.commit) {
commit = this.repository.model.source.commit;
}
return this.getTree(commit);
} finally {
span.end();
@@ -113,33 +145,21 @@ export default class GitHubStream extends GitHubBase implements SourceBase {
}
) {
const span = trace.getTracer("ano-file").startSpan("GHStream.getTree");
span.setAttribute("repoId", this.repository.repoId);
span.setAttribute("repoName", this.githubRepository.fullName || "");
span.setAttribute("sha", sha);
this.repository.model.truckedFileList = false;
let ghRes: Awaited<ReturnType<typeof this.getGHTree>>;
try {
count.request++;
ghRes = await this.getGHTree(sha, { recursive: true });
} catch (error) {
console.error(error);
span.recordException(error as Error);
if ((error as any).status == 409) {
// empty tree
if (this.repository.status != RepositoryStatus.READY)
await this.repository.updateStatus(RepositoryStatus.READY);
// cannot be empty otherwise it would try to download it again
span.end();
return { __: {} };
} else {
console.log(
`[ERROR] getTree ${this.repository.repoId}@${sha}: ${
(error as Error).message
}`
);
await this.repository.resetSate(
RepositoryStatus.ERROR,
"repo_not_accessible"
);
const err = new AnonymousError("repo_not_accessible", {
httpStatus: (error as any).status,
cause: error as Error,
@@ -159,15 +179,12 @@ export default class GitHubStream extends GitHubBase implements SourceBase {
if (ghRes.truncated) {
await this.getTruncatedTree(sha, tree, parentPath, count);
}
if (this.repository.status !== RepositoryStatus.READY)
await this.repository.updateStatus(RepositoryStatus.READY);
span.end();
return tree;
}
private async getGHTree(sha: string, opt = { recursive: true }) {
const span = trace.getTracer("ano-file").startSpan("GHStream.getGHTree");
span.setAttribute("repoId", this.repository.repoId);
span.setAttribute("sha", sha);
try {
const octokit = GitHubBase.octokit(await this.getToken());
@@ -196,7 +213,6 @@ export default class GitHubStream extends GitHubBase implements SourceBase {
const span = trace
.getTracer("ano-file")
.startSpan("GHStream.getTruncatedTree");
span.setAttribute("repoId", this.repository.repoId);
span.setAttribute("sha", sha);
span.setAttribute("parentPath", parentPath);
try {
@@ -207,8 +223,7 @@ export default class GitHubStream extends GitHubBase implements SourceBase {
data = await this.getGHTree(sha, { recursive: false });
this.tree2Tree(data.tree, truncatedTree, parentPath);
} catch (error) {
console.error(error);
this.repository.model.truckedFileList = true;
span.recordException(error as Error);
return;
}
@@ -235,11 +250,10 @@ export default class GitHubStream extends GitHubBase implements SourceBase {
const data = await this.getGHTree(sha, { recursive: true });
this.tree2Tree(data.tree, truncatedTree, parentPath);
if (data.truncated) {
this.repository.model.truckedFileList = true;
// TODO: TRUNCATED
}
} catch (error) {
console.error(error);
this.repository.model.truckedFileList = true;
span.recordException(error as Error);
}
}
} finally {
@@ -260,7 +274,6 @@ export default class GitHubStream extends GitHubBase implements SourceBase {
parentPath: string = ""
) {
const span = trace.getTracer("ano-file").startSpan("GHStream.tree2Tree");
span.setAttribute("repoId", this.repository.repoId);
span.setAttribute("parentPath", parentPath);
try {
for (let elem of tree) {
@@ -286,7 +299,7 @@ export default class GitHubStream extends GitHubBase implements SourceBase {
// if elem is a file add the file size in the file list
if (elem.type == "blob") {
if (Object.keys(current).length > config.MAX_FILE_FOLDER) {
this.repository.model.truckedFileList = true;
// TODO: TRUNCATED
continue;
}
let p = paths[end];

View File

@@ -1,25 +1,22 @@
import AnonymizedFile from "../AnonymizedFile";
import Repository from "../Repository";
import storage from "../storage";
import { SourceBase } from "../types";
import * as stream from "stream";
export default class Zip implements SourceBase {
type = "Zip";
repository: Repository;
url?: string;
constructor(data: any, repository: Repository) {
this.repository = repository;
constructor(data: any, readonly repoId: string) {
this.url = data.url;
}
async getFiles() {
return storage.listFiles(this.repository.originalCachePath);
return storage.listFiles(this.repoId);
}
async getFileContent(file: AnonymizedFile): Promise<stream.Readable> {
return storage.read(file.originalCachePath);
return storage.read(file.repository.repoId, file.filePath);
}
toJSON(): any {

View File

@@ -1,6 +1,6 @@
import { FILE_TYPE, SourceBase, StorageBase, Tree } from "../types";
import { SourceBase, Tree } from "../types";
import config from "../../config";
import { Stream } from "node:stream";
import * as fs from "fs";
import { Extract } from "unzip-stream";
import { join, basename, dirname } from "path";
@@ -11,21 +11,25 @@ import { promisify } from "util";
import AnonymizedFile from "../AnonymizedFile";
import { lookup } from "mime-types";
import { trace } from "@opentelemetry/api";
import StorageBase, { FILE_TYPE } from "./Storage";
export default class FileSystem implements StorageBase {
export default class FileSystem extends StorageBase {
type = "FileSystem";
constructor() {}
constructor() {
super();
}
/** @override */
async exists(p: string): Promise<FILE_TYPE> {
async exists(repoId: string, p: string = ""): Promise<FILE_TYPE> {
const fullPath = join(config.FOLDER, this.repoPath(repoId), p);
return trace
.getTracer("ano-file")
.startActiveSpan("fs.exists", async (span) => {
span.setAttribute("path", p);
span.setAttribute("full-path", join(config.FOLDER, p));
span.setAttribute("full-path", fullPath);
try {
const stat = await fs.promises.stat(join(config.FOLDER, p));
const stat = await fs.promises.stat(fullPath);
if (stat.isDirectory()) return FILE_TYPE.FOLDER;
if (stat.isFile()) return FILE_TYPE.FILE;
} catch (_) {
@@ -37,12 +41,13 @@ export default class FileSystem implements StorageBase {
}
/** @override */
async send(p: string, res: Response) {
async send(repoId: string, p: string, res: Response) {
const fullPath = join(config.FOLDER, this.repoPath(repoId), p);
return trace
.getTracer("ano-file")
.startActiveSpan("fs.send", async (span) => {
span.setAttribute("path", p);
res.sendFile(join(config.FOLDER, p), { dotfiles: "allow" }, (err) => {
span.setAttribute("path", fullPath);
res.sendFile(fullPath, { dotfiles: "allow" }, (err) => {
if (err) {
span.recordException(err);
}
@@ -52,44 +57,49 @@ export default class FileSystem implements StorageBase {
}
/** @override */
async read(p: string): Promise<Readable> {
return fs.createReadStream(join(config.FOLDER, p));
async read(repoId: string, p: string): Promise<Readable> {
const fullPath = join(config.FOLDER, this.repoPath(repoId), p);
return fs.createReadStream(fullPath);
}
async fileInfo(path: string) {
const info = await fs.promises.stat(join(config.FOLDER, path));
async fileInfo(repoId: string, path: string) {
const fullPath = join(config.FOLDER, this.repoPath(repoId), path);
const info = await fs.promises.stat(fullPath);
return {
size: info.size,
lastModified: info.mtime,
contentType: info.isDirectory()
? "application/x-directory"
: (lookup(join(config.FOLDER, path)) as string),
: (lookup(fullPath) as string),
};
}
/** @override */
async write(
repoId: string,
p: string,
data: Buffer,
data: string | Readable,
file?: AnonymizedFile,
source?: SourceBase
): Promise<void> {
const span = trace.getTracer("ano-file").startSpan("fs.write");
span.setAttribute("path", p);
const fullPath = join(config.FOLDER, this.repoPath(repoId), p);
span.setAttribute("path", fullPath);
try {
await this.mk(dirname(p));
return await fs.promises.writeFile(join(config.FOLDER, p), data, "utf-8");
await this.mk(repoId, dirname(p));
return await fs.promises.writeFile(fullPath, data, "utf-8");
} finally {
span.end();
}
}
/** @override */
async rm(dir: string): Promise<void> {
async rm(repoId: string, dir: string = ""): Promise<void> {
const span = trace.getTracer("ano-file").startSpan("fs.rm");
span.setAttribute("path", dir);
const fullPath = join(config.FOLDER, this.repoPath(repoId), dir);
span.setAttribute("path", fullPath);
try {
await fs.promises.rm(join(config.FOLDER, dir), {
await fs.promises.rm(fullPath, {
force: true,
recursive: true,
});
@@ -99,11 +109,12 @@ export default class FileSystem implements StorageBase {
}
/** @override */
async mk(dir: string): Promise<void> {
async mk(repoId: string, dir: string = ""): Promise<void> {
const span = trace.getTracer("ano-file").startSpan("fs.mk");
span.setAttribute("path", dir);
const fullPath = join(config.FOLDER, this.repoPath(repoId), dir);
try {
await fs.promises.mkdir(join(config.FOLDER, dir), {
await fs.promises.mkdir(fullPath, {
recursive: true,
});
} catch (err: any) {
@@ -118,9 +129,9 @@ export default class FileSystem implements StorageBase {
/** @override */
async listFiles(
dir: string,
repoId: string,
dir: string = "",
opt: {
root?: string;
onEntry?: (file: { path: string; size: number }) => void;
} = {}
): Promise<Tree> {
@@ -128,20 +139,18 @@ export default class FileSystem implements StorageBase {
.getTracer("ano-file")
.startActiveSpan("fs.listFiles", async (span) => {
span.setAttribute("path", dir);
if (opt.root == null) {
opt.root = config.FOLDER;
}
let files = await fs.promises.readdir(join(opt.root, dir));
const fullPath = join(config.FOLDER, this.repoPath(repoId), dir);
let files = await fs.promises.readdir(fullPath);
const output: Tree = {};
for (let file of files) {
let filePath = join(dir, file);
try {
const stats = await fs.promises.stat(join(opt.root, filePath));
const stats = await fs.promises.stat(join(fullPath, filePath));
if (file[0] == "$") {
file = "\\" + file;
}
if (stats.isDirectory()) {
output[file] = await this.listFiles(filePath, opt);
output[file] = await this.listFiles(repoId, filePath, opt);
} else if (stats.isFile()) {
if (opt.onEntry) {
opt.onEntry({
@@ -162,16 +171,18 @@ export default class FileSystem implements StorageBase {
/** @override */
async extractZip(
repoId: string,
p: string,
data: Readable,
file?: AnonymizedFile,
source?: SourceBase
): Promise<void> {
const pipe = promisify(pipeline);
const fullPath = join(config.FOLDER, this.repoPath(repoId), p);
return pipe(
data,
Extract({
path: join(config.FOLDER, p),
path: fullPath,
decodeString: (buf) => {
const name = buf.toString();
const newName = name.substr(name.indexOf("/") + 1);
@@ -184,6 +195,7 @@ export default class FileSystem implements StorageBase {
/** @override */
async archive(
repoId: string,
dir: string,
opt?: {
format?: "zip" | "tar";
@@ -191,15 +203,16 @@ export default class FileSystem implements StorageBase {
}
) {
const archive = archiver(opt?.format || "zip", {});
const fullPath = join(config.FOLDER, this.repoPath(repoId), dir);
await this.listFiles(dir, {
await this.listFiles(repoId, dir, {
onEntry: async (file) => {
let rs = await this.read(file.path);
let rs = await this.read(repoId, file.path);
if (opt?.fileTransformer) {
// apply transformation on the stream
rs = rs.pipe(opt.fileTransformer(file.path));
}
const f = file.path.replace(dir, "");
const f = file.path.replace(fullPath, "");
archive.append(rs, {
name: basename(f),
prefix: dirname(f),

View File

@@ -1,26 +1,29 @@
import { FILE_TYPE, SourceBase, StorageBase, Tree, TreeFile } from "../types";
import {
GetObjectCommand,
ListObjectsV2CommandOutput,
PutObjectCommandInput,
S3,
} from "@aws-sdk/client-s3";
import { Upload } from "@aws-sdk/lib-storage";
import { NodeHttpHandler } from "@smithy/node-http-handler";
import config from "../../config";
import { pipeline, Readable, Transform } from "stream";
import ArchiveStreamToS3 from "decompress-stream-to-s3";
import { Response } from "express";
import { lookup } from "mime-types";
import { contentType } from "mime-types";
import * as archiver from "archiver";
import { dirname, basename } from "path";
import { trace } from "@opentelemetry/api";
import { dirname, basename, join } from "path";
import { SourceBase, Tree, TreeFile } from "../types";
import AnonymousError from "../AnonymousError";
import AnonymizedFile from "../AnonymizedFile";
import { trace } from "@opentelemetry/api";
import StorageBase, { FILE_TYPE } from "./Storage";
export default class S3Storage implements StorageBase {
export default class S3Storage extends StorageBase {
type = "AWS";
constructor() {
super();
if (!config.S3_BUCKET)
throw new AnonymousError("s3_config_not_provided", {
httpStatus: 500,
@@ -40,26 +43,25 @@ export default class S3Storage implements StorageBase {
requestHandler: new NodeHttpHandler({
requestTimeout: timeout,
connectionTimeout: timeout,
}),
});
}
/** @override */
async exists(path: string): Promise<FILE_TYPE> {
async exists(repoId: string, path: string = ""): Promise<FILE_TYPE> {
const span = trace.getTracer("ano-file").startSpan("s3.exists");
span.setAttribute("path", path);
try {
if (!config.S3_BUCKET) throw new Error("S3_BUCKET not set");
try {
// if we can get the file info, it is a file
await this.fileInfo(path);
await this.fileInfo(repoId, path);
return FILE_TYPE.FILE;
} catch (err) {
// check if it is a directory
const data = await this.client().listObjectsV2({
Bucket: config.S3_BUCKET,
Prefix: path,
Prefix: join(this.repoPath(repoId), path),
MaxKeys: 1,
});
return (data.Contents?.length || 0) > 0
@@ -72,19 +74,20 @@ export default class S3Storage implements StorageBase {
}
/** @override */
async mk(dir: string): Promise<void> {
async mk(repoId: string, dir: string = ""): Promise<void> {
// no need to create folder on S3
}
/** @override */
async rm(dir: string): Promise<void> {
async rm(repoId: string, dir: string = ""): Promise<void> {
const span = trace.getTracer("ano-file").startSpan("s3.rm");
span.setAttribute("repoId", repoId);
span.setAttribute("path", dir);
try {
if (!config.S3_BUCKET) throw new Error("S3_BUCKET not set");
const data = await this.client(200000).listObjectsV2({
Bucket: config.S3_BUCKET,
Prefix: dir,
Prefix: join(this.repoPath(repoId), dir),
MaxKeys: 100,
});
@@ -106,7 +109,7 @@ export default class S3Storage implements StorageBase {
await this.client(200000).deleteObjects(params);
if (data.IsTruncated) {
await this.rm(dir);
await this.rm(repoId, dir);
}
} finally {
span.end();
@@ -114,15 +117,16 @@ export default class S3Storage implements StorageBase {
}
/** @override */
async send(p: string, res: Response) {
async send(repoId: string, path: string, res: Response) {
const span = trace.getTracer("ano-file").startSpan("s3.send");
span.setAttribute("path", p);
span.setAttribute("repoId", repoId);
span.setAttribute("path", path);
try {
if (!config.S3_BUCKET) throw new Error("S3_BUCKET not set");
try {
const command = new GetObjectCommand({
Bucket: config.S3_BUCKET,
Key: p,
Key: join(this.repoPath(repoId), path),
});
const s = await this.client().send(command);
res.status(200);
@@ -142,7 +146,7 @@ export default class S3Storage implements StorageBase {
try {
res.status(500);
} catch (err) {
console.error(`[ERROR] S3 send ${p}`, err);
console.error(`[ERROR] S3 send ${path}`, err);
}
}
} finally {
@@ -150,21 +154,22 @@ export default class S3Storage implements StorageBase {
}
}
async fileInfo(path: string) {
async fileInfo(repoId: string, path: string) {
const span = trace.getTracer("ano-file").startSpan("s3.fileInfo");
span.setAttribute("repoId", repoId);
span.setAttribute("path", path);
try {
if (!config.S3_BUCKET) throw new Error("S3_BUCKET not set");
const info = await this.client(3000).headObject({
Bucket: config.S3_BUCKET,
Key: path,
Key: join(this.repoPath(repoId), path),
});
return {
size: info.ContentLength,
lastModified: info.LastModified,
contentType: info.ContentType
? info.ContentType
: (lookup(path) as string),
: (contentType(path) as string),
};
} finally {
span.end();
@@ -172,20 +177,21 @@ export default class S3Storage implements StorageBase {
}
/** @override */
async read(path: string): Promise<Readable> {
async read(repoId: string, path: string): Promise<Readable> {
const span = trace.getTracer("ano-file").startSpan("s3.rreadm");
span.setAttribute("repoId", repoId);
span.setAttribute("path", path);
try {
if (!config.S3_BUCKET) throw new Error("S3_BUCKET not set");
const command = new GetObjectCommand({
Bucket: config.S3_BUCKET,
Key: path,
Key: join(this.repoPath(repoId), path),
});
const res = (await this.client(3000).send(command)).Body;
if (!res) {
throw new AnonymousError("file_not_found", {
httpStatus: 404,
object: path,
object: join(this.repoPath(repoId), path),
});
}
return res as Readable;
@@ -196,26 +202,35 @@ export default class S3Storage implements StorageBase {
/** @override */
async write(
repoId: string,
path: string,
data: Buffer,
data: string | Readable,
file?: AnonymizedFile,
source?: SourceBase
): Promise<void> {
const span = trace.getTracer("ano-file").startSpan("s3.rm");
span.setAttribute("repoId", repoId);
span.setAttribute("path", path);
try {
if (!config.S3_BUCKET) throw new Error("S3_BUCKET not set");
const params: PutObjectCommandInput = {
Bucket: config.S3_BUCKET,
Key: path,
Key: join(this.repoPath(repoId), path),
Body: data,
ContentType: lookup(path).toString(),
ContentType: contentType(path).toString(),
};
if (source) {
params.Tagging = `source=${source.type}`;
}
// 30s timeout
await this.client(30000).putObject(params);
const parallelUploads3 = new Upload({
// 30s timeout
client: this.client(30000),
params,
});
await parallelUploads3.done();
return;
} finally {
span.end();
@@ -223,7 +238,7 @@ export default class S3Storage implements StorageBase {
}
/** @override */
async listFiles(dir: string): Promise<Tree> {
async listFiles(repoId: string, dir: string = ""): Promise<Tree> {
const span = trace.getTracer("ano-file").startSpan("s3.listFiles");
span.setAttribute("path", dir);
try {
@@ -235,7 +250,7 @@ export default class S3Storage implements StorageBase {
do {
req = await this.client(30000).listObjectsV2({
Bucket: config.S3_BUCKET,
Prefix: dir,
Prefix: join(this.repoPath(repoId), dir),
MaxKeys: 250,
ContinuationToken: nextContinuationToken,
});
@@ -244,7 +259,7 @@ export default class S3Storage implements StorageBase {
for (const f of req.Contents) {
if (!f.Key) continue;
f.Key = f.Key.replace(dir, "");
f.Key = f.Key.replace(join(this.repoPath(repoId), dir), "");
const paths = f.Key.split("/");
let current: Tree = out;
for (let i = 0; i < paths.length - 1; i++) {
@@ -271,19 +286,20 @@ export default class S3Storage implements StorageBase {
/** @override */
async extractZip(
p: string,
repoId: string,
path: string,
data: Readable,
file?: AnonymizedFile,
source?: SourceBase
): Promise<void> {
let toS3: ArchiveStreamToS3;
const span = trace.getTracer("ano-file").startSpan("s3.extractZip");
span.setAttribute("path", p);
span.setAttribute("path", path);
return new Promise((resolve, reject) => {
if (!config.S3_BUCKET) return reject("S3_BUCKET not set");
toS3 = new ArchiveStreamToS3({
bucket: config.S3_BUCKET,
prefix: p,
prefix: join(this.repoPath(repoId), path),
s3: this.client(2 * 60 * 60 * 1000), // 2h timeout
type: "zip",
onEntry: (header) => {
@@ -315,13 +331,15 @@ export default class S3Storage implements StorageBase {
/** @override */
async archive(
dir: string,
repoId: string,
dir: string = "",
opt?: {
format?: "zip" | "tar";
fileTransformer?: (p: string) => Transform;
}
) {
const span = trace.getTracer("ano-file").startSpan("s3.archive");
span.setAttribute("repoId", repoId);
span.setAttribute("path", dir);
try {
if (!config.S3_BUCKET) throw new Error("S3_BUCKET not set");
@@ -333,7 +351,7 @@ export default class S3Storage implements StorageBase {
do {
req = await this.client(30000).listObjectsV2({
Bucket: config.S3_BUCKET,
Prefix: dir,
Prefix: join(this.repoPath(repoId), dir),
MaxKeys: 250,
ContinuationToken: nextContinuationToken,
});
@@ -342,9 +360,11 @@ export default class S3Storage implements StorageBase {
for (const f of req.Contents || []) {
if (!f.Key) continue;
const filename = basename(f.Key);
const prefix = dirname(f.Key.replace(dir, ""));
const prefix = dirname(
f.Key.replace(join(this.repoPath(repoId), dir), "")
);
let rs = await this.read(f.Key);
let rs = await this.read(repoId, f.Key);
if (opt?.fileTransformer) {
// apply transformation on the stream
rs = rs.pipe(opt.fileTransformer(f.Key));

117
src/storage/Storage.ts Normal file
View File

@@ -0,0 +1,117 @@
import { join } from "path";
import { Transform, Readable } from "stream";
import * as archiver from "archiver";
import { Response } from "express";
import AnonymizedFile from "../AnonymizedFile";
import { SourceBase, Tree } from "../types";
/**
 * Kind of entry found at a storage path; this is the value type resolved by
 * the storage `exists` methods.
 */
export enum FILE_TYPE {
// a regular file is stored at the path
FILE = "file",
// the path designates a folder
FOLDER = "folder",
// presumably used when nothing is stored at the path — confirm against the implementations
NOT_FOUND = "not_found",
}
/**
 * Common contract of the storage backends.
 *
 * Every operation is scoped to one repository through `repoId`; the `path` /
 * `dir` argument is expressed relative to that repository's storage area,
 * whose prefix is computed by {@link StorageBase.repoPath}.
 */
export default abstract class StorageBase {
  /**
   * Identifier of the storage backend
   */
  abstract type: string;

  /**
   * Compute the storage prefix of a repository: `<repoId>/original` followed
   * by a trailing separator (`\` on win32, `/` on every other platform).
   * @param repoId the id of the repository
   */
  repoPath(repoId: string): string {
    const trailingSeparator = process.platform === "win32" ? "\\" : "/";
    const originalFolder = join(repoId, "original");
    return `${originalFolder}${trailingSeparator}`;
  }

  /**
   * Tell what is stored at the given path
   * @param repoId the id of the repository
   * @param path the path to check
   */
  abstract exists(repoId: string, path: string): Promise<FILE_TYPE>;

  /**
   * Get the metadata of a stored entry
   * @param repoId the id of the repository
   * @param path the path of the entry
   */
  abstract fileInfo(
    repoId: string,
    path: string
  ): Promise<{
    size: number | undefined;
    lastModified: Date | undefined;
    contentType: string;
  }>;

  /**
   * Open a stored file as a readable stream
   * @param repoId the id of the repository
   * @param path the path to the file
   */
  abstract read(repoId: string, path: string): Promise<Readable>;

  /**
   * Send a stored file through an express response
   * @param repoId the id of the repository
   * @param path the path to the file
   * @param res the response that receives the file
   */
  abstract send(repoId: string, path: string, res: Response): Promise<void>;

  /**
   * List the files stored under dir
   * @param repoId the id of the repository
   * @param dir the folder to list
   */
  abstract listFiles(repoId: string, dir: string): Promise<Tree>;

  /**
   * Create a directory
   * @param repoId the id of the repository
   * @param dir the directory to create
   */
  abstract mk(repoId: string, dir: string): Promise<void>;

  /**
   * Store data in a file
   * @param repoId the id of the repository
   * @param path the path to the file
   * @param data the content of the file, as a string or as a stream
   * @param file the file
   * @param source the source of the file
   */
  abstract write(
    repoId: string,
    path: string,
    data: string | Readable,
    file?: AnonymizedFile,
    source?: SourceBase
  ): Promise<void>;

  /**
   * Extract the content of an archive stream into dir
   * @param repoId the id of the repository
   * @param dir the destination folder
   * @param tar the stream of the archive to extract
   * @param file the file
   * @param source the source of the file
   */
  abstract extractZip(
    repoId: string,
    dir: string,
    tar: Readable,
    file?: AnonymizedFile,
    source?: SourceBase
  ): Promise<void>;

  /**
   * Delete the given path
   * @param repoId the id of the repository
   * @param dir the path to delete
   */
  abstract rm(repoId: string, dir: string): Promise<void>;

  /**
   * Build an archive from the content of dir
   * @param repoId the id of the repository
   * @param dir the folder to archive
   * @param opt archive options
   */
  abstract archive(
    repoId: string,
    dir: string,
    opt?: {
      /**
       * Archive format
       */
      format?: "zip" | "tar";
      /**
       * Transformer to apply on the content of the file
       */
      fileTransformer?: (p: string) => Transform;
    }
  ): Promise<archiver.Archiver>;
}

View File

@@ -4,9 +4,7 @@ import Zip from "./source/Zip";
import S3Storage from "./storage/S3";
import FileSystem from "./storage/FileSystem";
import AnonymizedFile from "./AnonymizedFile";
import { Transform, Readable } from "stream";
import * as archiver from "archiver";
import { Response } from "express";
import { Readable } from "stream";
export interface SourceBase {
readonly type: string;
@@ -32,104 +30,6 @@ export interface SourceBase {
export type Source = GitHubDownload | GitHubStream | Zip;
export enum FILE_TYPE {
FILE = "file",
FOLDER = "folder",
NOT_FOUND = "not_found",
}
export interface StorageBase {
/**
* The type of storage
*/
type: string;
/**
* check if the path exists
* @param path the path to check
*/
exists(path: string): Promise<FILE_TYPE>;
send(p: string, res: Response): Promise<void>;
/**
* Read the content of a file
* @param path the path to the file
*/
read(path: string): Promise<Readable>;
fileInfo(path: string): Promise<{
size: number | undefined;
lastModified: Date | undefined;
contentType: string;
}>;
/**
* Write data to a file
* @param path the path to the file
* @param data the content of the file
* @param file the file
* @param source the source of the file
*/
write(
path: string,
data: Buffer,
file?: AnonymizedFile,
source?: SourceBase
): Promise<void>;
/**
* List the files from dir
* @param dir
*/
listFiles(dir: string): Promise<Tree>;
/**
* Extract the content of tar to dir
* @param dir
* @param tar
* @param file the file
* @param source the source of the file
*/
extractZip(
dir: string,
tar: Readable,
file?: AnonymizedFile,
source?: SourceBase
): Promise<void>;
/**
* Remove the path
* @param dir
*/
rm(dir: string): Promise<void>;
/**
* Archive the content of dir
* @param dir
* @param opt
*/
archive(
dir: string,
opt?: {
/**
* Archive format
*/
format?: "zip" | "tar";
/**
* Transformer to apply on the content of the file
*/
fileTransformer?: (p: string) => Transform;
}
): Promise<archiver.Archiver>;
/**
* Create a directory
* @param dir
*/
mk(dir: string): Promise<void>;
}
export type Storage = S3Storage | FileSystem;
export interface Branch {