diff --git a/package-lock.json b/package-lock.json index 199daa9..a057956 100644 --- a/package-lock.json +++ b/package-lock.json @@ -10,6 +10,7 @@ "license": "GPL-3.0", "dependencies": { "@aws-sdk/client-s3": "^3.540.0", + "@aws-sdk/lib-storage": "^3.540.0", "@mongodb-js/zstd": "^1.2.0", "@octokit/rest": "^20.0.2", "@opentelemetry/api": "^1.8.0", @@ -557,6 +558,35 @@ "node": ">=14.0.0" } }, + "node_modules/@aws-sdk/lib-storage": { + "version": "3.540.0", + "resolved": "https://registry.npmjs.org/@aws-sdk/lib-storage/-/lib-storage-3.540.0.tgz", + "integrity": "sha512-xNLOpuOSzGO90fwn+GBsM//a4ALYl85WEsovKyJI6jYJTMCGLrzJQeq8cxeC5Xz6w8Ol86lf80Gll/cz4phy7g==", + "dependencies": { + "@smithy/abort-controller": "^2.2.0", + "@smithy/middleware-endpoint": "^2.5.0", + "@smithy/smithy-client": "^2.5.0", + "buffer": "5.6.0", + "events": "3.3.0", + "stream-browserify": "3.0.0", + "tslib": "^2.6.2" + }, + "engines": { + "node": ">=14.0.0" + }, + "peerDependencies": { + "@aws-sdk/client-s3": "^3.0.0" + } + }, + "node_modules/@aws-sdk/lib-storage/node_modules/buffer": { + "version": "5.6.0", + "resolved": "https://registry.npmjs.org/buffer/-/buffer-5.6.0.tgz", + "integrity": "sha512-/gDYp/UtU0eA1ys8bOs9J6a+E/KWIY+DZ+Q2WESNUA0jFRsJOc0SNUO6xJ5SGA1xueg3NL65W6s+NY5l9cunuw==", + "dependencies": { + "base64-js": "^1.0.2", + "ieee754": "^1.1.4" + } + }, "node_modules/@aws-sdk/middleware-bucket-endpoint": { "version": "3.535.0", "resolved": "https://registry.npmjs.org/@aws-sdk/middleware-bucket-endpoint/-/middleware-bucket-endpoint-3.535.0.tgz", @@ -5946,6 +5976,14 @@ "node": ">= 0.6" } }, + "node_modules/events": { + "version": "3.3.0", + "resolved": "https://registry.npmjs.org/events/-/events-3.3.0.tgz", + "integrity": "sha512-mQw+2fkQbALzQ7V0MY0IqdnXNOeTtP4r0lN9z7AAawCXgqea7bDii20AYrIBrFd/Hx0M2Ocz6S111CaFkUcb0Q==", + "engines": { + "node": ">=0.8.x" + } + }, "node_modules/express": { "version": "4.19.2", "resolved": "https://registry.npmjs.org/express/-/express-4.19.2.tgz", @@ -9578,6 +9616,15 @@ "node": ">= 0.8" } }, + "node_modules/stream-browserify": { + "version": "3.0.0", + "resolved": "https://registry.npmjs.org/stream-browserify/-/stream-browserify-3.0.0.tgz", + "integrity": "sha512-H73RAHsVBapbim0tU2JwwOiXUj+fikfiaoYAKHF3VJfA0pe2BCzkhAHBlLG6REzE+2WNZcxOXjK7lkso+9euLA==", + "dependencies": { + "inherits": "~2.0.4", + "readable-stream": "^3.5.0" + } + }, "node_modules/stream-shift": { "version": "1.0.3", "resolved": "https://registry.npmjs.org/stream-shift/-/stream-shift-1.0.3.tgz", @@ -10828,6 +10875,31 @@ "tslib": "^2.6.2" } }, + "@aws-sdk/lib-storage": { + "version": "3.540.0", + "resolved": "https://registry.npmjs.org/@aws-sdk/lib-storage/-/lib-storage-3.540.0.tgz", + "integrity": "sha512-xNLOpuOSzGO90fwn+GBsM//a4ALYl85WEsovKyJI6jYJTMCGLrzJQeq8cxeC5Xz6w8Ol86lf80Gll/cz4phy7g==", + "requires": { + "@smithy/abort-controller": "^2.2.0", + "@smithy/middleware-endpoint": "^2.5.0", + "@smithy/smithy-client": "^2.5.0", + "buffer": "5.6.0", + "events": "3.3.0", + "stream-browserify": "3.0.0", + "tslib": "^2.6.2" + }, + "dependencies": { + "buffer": { + "version": "5.6.0", + "resolved": "https://registry.npmjs.org/buffer/-/buffer-5.6.0.tgz", + "integrity": "sha512-/gDYp/UtU0eA1ys8bOs9J6a+E/KWIY+DZ+Q2WESNUA0jFRsJOc0SNUO6xJ5SGA1xueg3NL65W6s+NY5l9cunuw==", + "requires": { + "base64-js": "^1.0.2", + "ieee754": "^1.1.4" + } + } + } + }, "@aws-sdk/middleware-bucket-endpoint": { "version": "3.535.0", "resolved": "https://registry.npmjs.org/@aws-sdk/middleware-bucket-endpoint/-/middleware-bucket-endpoint-3.535.0.tgz", @@ -14866,6 +14938,11 @@ "resolved": "https://registry.npmjs.org/etag/-/etag-1.8.1.tgz", "integrity": "sha512-aIL5Fx7mawVa300al2BnEE4iNvo1qETxLrPI/o05L7z6go7fCw1J6EQmbK4FmJ2AS7kgVF/KEZWufBfdClMcPg==" }, + "events": { + "version": "3.3.0", + "resolved": "https://registry.npmjs.org/events/-/events-3.3.0.tgz", + "integrity": "sha512-mQw+2fkQbALzQ7V0MY0IqdnXNOeTtP4r0lN9z7AAawCXgqea7bDii20AYrIBrFd/Hx0M2Ocz6S111CaFkUcb0Q==" + }, "express": { "version": "4.19.2", "resolved": "https://registry.npmjs.org/express/-/express-4.19.2.tgz", @@ -17521,6 +17598,15 @@ "resolved": "https://registry.npmjs.org/statuses/-/statuses-2.0.1.tgz", "integrity": "sha512-RwNA9Z/7PrK06rYLIzFMlaF+l73iwpzsqRIFgbMLbTcLD6cOao82TaWefPXQvB2fOC4AjuYSEndS7N/mTCbkdQ==" }, + "stream-browserify": { + "version": "3.0.0", + "resolved": "https://registry.npmjs.org/stream-browserify/-/stream-browserify-3.0.0.tgz", + "integrity": "sha512-H73RAHsVBapbim0tU2JwwOiXUj+fikfiaoYAKHF3VJfA0pe2BCzkhAHBlLG6REzE+2WNZcxOXjK7lkso+9euLA==", + "requires": { + "inherits": "~2.0.4", + "readable-stream": "^3.5.0" + } + }, "stream-shift": { "version": "1.0.3", "resolved": "https://registry.npmjs.org/stream-shift/-/stream-shift-1.0.3.tgz", diff --git a/package.json b/package.json index 292fa35..fe815a2 100644 --- a/package.json +++ b/package.json @@ -32,6 +32,7 @@ ], "dependencies": { "@aws-sdk/client-s3": "^3.540.0", + "@aws-sdk/lib-storage": "^3.540.0", "@mongodb-js/zstd": "^1.2.0", "@octokit/rest": "^20.0.2", "@opentelemetry/api": "^1.8.0", diff --git a/src/AnonymizedFile.ts b/src/AnonymizedFile.ts index 76cef3a..6ad9949 100644 --- a/src/AnonymizedFile.ts +++ b/src/AnonymizedFile.ts @@ -3,7 +3,7 @@ import { Response } from "express"; import { Readable } from "stream"; import { trace } from "@opentelemetry/api"; import Repository from "./Repository"; -import { FILE_TYPE, Tree, TreeElement, TreeFile } from "./types"; +import { RepositoryStatus, Tree, TreeElement, TreeFile } from "./types"; import storage from "./storage"; import config from "../config"; import { @@ -14,7 +14,7 @@ import { import AnonymousError from "./AnonymousError"; import { handleError } from "./routes/route-utils"; import { lookup } from "mime-types"; -import AnonymizedRepositoryModel from "./database/anonymizedRepositories/anonymizedRepositories.model"; +import { FILE_TYPE } from "./storage/Storage"; /** * Represent a file in a anonymized repository @@ -193,28 +193,20 @@ export default class AnonymizedFile { .getTracer("ano-file") .startActiveSpan("content", async (span) => { try { - span.setAttribute("anonymizedPath", this.anonymizedPath); if (this.anonymizedPath.includes(config.ANONYMIZATION_MASK)) { await this.originalPath(); } - span.addEvent("originalPath", { originalPath: this._originalPath }); + span.addEvent("filePath", { originalPath: this.filePath }); if (this.fileSize && this.fileSize > config.MAX_FILE_SIZE) { throw new AnonymousError("file_too_big", { object: this, httpStatus: 403, }); } - const exist = await storage.exists(this.originalCachePath); - span.addEvent("file_exist", { exist }); - if (exist == FILE_TYPE.FILE) { - return storage.read(this.originalCachePath); - } else if (exist == FILE_TYPE.FOLDER) { - throw new AnonymousError("folder_not_supported", { - object: this, - httpStatus: 400, - }); - } - return await this.repository.source?.getFileContent(this); + const out = await this.repository.source?.getFileContent(this); + this.repository.model.isReseted = false; + this.repository.updateStatus(RepositoryStatus.READY); + return out; } finally { span.end(); } @@ -233,81 +225,70 @@ export default class AnonymizedFile { }); } - get originalCachePath() { - if (!this.originalPath) - throw new AnonymousError("path_not_defined", { - object: this, - httpStatus: 400, - }); + get filePath() { if (!this._originalPath) { if (this.anonymizedPath.includes(config.ANONYMIZATION_MASK)) { throw new AnonymousError("path_not_defined", { object: this, httpStatus: 400, }); - } else { - return join(this.repository.originalCachePath, this.anonymizedPath); } + return this.anonymizedPath; } - return join(this.repository.originalCachePath, this._originalPath); + return this._originalPath; } async send(res: Response): Promise { - return trace.getTracer("ano-file").startActiveSpan("send", async (span) => { - span.setAttribute("anonymizedPath", this.anonymizedPath); - return new Promise(async (resolve, reject) => { - try { - const content = await this.content(); - const mime = lookup(this.anonymizedPath); - if (mime && this.extension() != "ts") { - res.contentType(mime); - } else if (isTextFile(this.anonymizedPath)) { - res.contentType("text/plain"); - } - res.header("Accept-Ranges", "none"); - let fileInfo: Awaited>; + return trace + .getTracer("ano-file") + .startActiveSpan("AnonymizedFile.send", async (span) => { + span.setAttribute("repoId", this.repository.repoId); + span.setAttribute("anonymizedPath", this.anonymizedPath); + return new Promise(async (resolve, reject) => { try { - fileInfo = await storage.fileInfo(this.originalCachePath); - } catch (error) { - // unable to get file size - console.error(error); - } - const anonymizer = new AnonymizeTransformer(this); - - anonymizer.once("transform", (data) => { - if (data.isText && !mime) { + const mime = lookup(this.anonymizedPath); + if (mime && this.extension() != "ts") { + res.contentType(mime); + } else if (isTextFile(this.anonymizedPath)) { res.contentType("text/plain"); } - if (fileInfo?.size && !data.wasAnonimized) { - // the text files may be anonymized and therefore the size may be different - res.header("Content-Length", fileInfo.size.toString()); - } - }); - - content - .pipe(anonymizer) - .pipe(res) - .on("close", () => { - if (!content.closed && !content.destroyed) { - content.destroy(); + res.header("Accept-Ranges", "none"); + const anonymizer = new AnonymizeTransformer(this); + anonymizer.once("transform", (data) => { + if (!mime && data.isText) { + res.contentType("text/plain"); } - span.end(); - resolve(); - }) - .on("error", (error) => { - if (!content.closed && !content.destroyed) { - content.destroy(); + if (!data.wasAnonimized && this.fileSize) { + // the text files may be anonymized and therefore the size may be different + res.header("Content-Length", this.fileSize.toString()); } - span.recordException(error); - span.end(); - reject(error); - handleError(error, res); }); - } catch (error) { - handleError(error, res); - } + + const content = await this.content(); + content + .pipe(anonymizer) + .pipe(res) + .on("close", () => { + if (!content.closed && !content.destroyed) { + content.destroy(); + } + span.end(); + resolve(); + }) + .on("error", (error) => { + if (!content.closed && !content.destroyed) { + content.destroy(); + } + span.recordException(error); + span.end(); + reject(error); + handleError(error, res); + }); + } catch (error) { + handleError(error, res); + } + }); }); - }); } } diff --git a/src/AnonymousError.ts b/src/AnonymousError.ts index 09cf7ad..ab9fcb2 100644 --- a/src/AnonymousError.ts +++ b/src/AnonymousError.ts @@ -39,7 +39,7 @@ export default class AnonymousError extends CustomError { } else if (this.value instanceof User) { detail = `${this.value.username}`; } else if (this.value instanceof GitHubBase) { - detail = `${this.value.repository.repoId}`; + detail = `${this.value.githubRepository.fullName}`; } out += this.message; if (detail) { diff --git a/src/Repository.ts b/src/Repository.ts index 99346bf..e3656bf 100644 --- a/src/Repository.ts +++ b/src/Repository.ts @@ -1,13 +1,5 @@ -import { join } from "path"; import storage from "./storage"; -import { - FILE_TYPE, - RepositoryStatus, - Source, - Tree, - TreeElement, - TreeFile, -} from "./types"; +import { RepositoryStatus, Source, Tree, TreeElement, TreeFile } from "./types"; import { Readable } from "stream"; import User from "./User"; import GitHubStream from "./source/GitHubStream"; @@ -21,7 +13,7 @@ import GitHubBase from "./source/GitHubBase"; import Conference from "./Conference"; import ConferenceModel from "./database/conference/conferences.model"; import AnonymousError from "./AnonymousError"; -import { downloadQueue, removeQueue } from "./queue"; +import { downloadQueue } from "./queue"; import { isConnected } from "./database/database"; import AnonymizedFile from "./AnonymizedFile"; import AnonymizedRepositoryModel from "./database/anonymizedRepositories/anonymizedRepositories.model"; @@ -64,13 +56,13 @@ export default class Repository { this._model = data; switch (data.source.type) { case "GitHubDownload": - this.source = new GitHubDownload(data.source, this); + this.source = new GitHubDownload(data.source, this.repoId); break; case "GitHubStream": - this.source = new GitHubStream(data.source, this); + this.source = new GitHubStream(data.source); break; case "Zip": - this.source = new Zip(data.source, this); + this.source = new Zip(data.source, this.repoId); break; default: throw new AnonymousError("unsupported_source", { @@ -180,7 +172,7 @@ export default class Repository { * @returns A stream of anonymized repository compressed */ zip(): Promise { - return storage.archive(this.originalCachePath, { + return storage.archive(this.repoId, "", { format: "zip", fileTransformer: (filename: string) => new AnonymizeTransformer( @@ -394,14 +386,10 @@ export default class Repository { .startSpan("Repository.removeCache"); span.setAttribute("repoId", this.repoId); try { + return storage.rm(this.repoId); + } finally { this.model.isReseted = true; await this.model.save(); - if ( - (await storage.exists(this._model.repoId + "/")) !== FILE_TYPE.NOT_FOUND - ) { - return storage.rm(this._model.repoId + "/"); - } - } finally { span.end(); } } @@ -490,13 +478,6 @@ export default class Repository { return this._model; } - get originalCachePath() { - return ( - join(this._model.repoId, "original") + - (process.platform === "win32" ? "\\" : "/") - ); - } - get status() { return this._model.status; } diff --git a/src/processes/downloadRepository.ts b/src/processes/downloadRepository.ts index f904aaf..688eedb 100644 --- a/src/processes/downloadRepository.ts +++ b/src/processes/downloadRepository.ts @@ -39,6 +39,7 @@ export default async function (job: SandboxedJob) { throw error; } } catch (error) { + console.error(error) span.recordException(error as Exception); console.log(`[QUEUE] ${job.data.repoId} is finished with an error`); } finally { diff --git a/src/server.ts b/src/server.ts index 2402a39..2af4c4b 100644 --- a/src/server.ts +++ b/src/server.ts @@ -19,6 +19,14 @@ import AnonymizedPullRequestModel from "./database/anonymizedPullRequests/anonym import { getUser } from "./routes/route-utils"; function indexResponse(req: express.Request, res: express.Response) { + if ( + req.path.startsWith("/script") || + req.path.startsWith("/style") || + req.path.startsWith("/favicon") || + req.path.startsWith("/api") + ) { + return res.status(404).send("Not found"); + } if ( req.params.repoId && req.headers["accept"] && diff --git a/src/source/GitHubBase.ts b/src/source/GitHubBase.ts index 26ff14e..abddb7f 100644 --- a/src/source/GitHubBase.ts +++ b/src/source/GitHubBase.ts @@ -6,30 +6,22 @@ import AnonymizedFile from "../AnonymizedFile"; import { Branch, Tree } from "../types"; import { GitHubRepository } from "./GitHubRepository"; import config from "../../config"; -import Repository from "../Repository"; import UserModel from "../database/users/users.model"; -import AnonymousError from "../AnonymousError"; export default abstract class GitHubBase { - type: "GitHubDownload" | "GitHubStream" | "Zip"; + abstract type: "GitHubDownload" | "GitHubStream" | "Zip"; githubRepository: GitHubRepository; branch: Branch; accessToken: string | undefined; - repository: Repository; validToken: boolean = false; - constructor( - data: { - type: "GitHubDownload" | "GitHubStream" | "Zip"; - branch?: string; - commit?: string; - repositoryId?: string; - repositoryName?: string; - accessToken?: string; - }, - repository: Repository - ) { - this.type = data.type; + constructor(data: { + accessToken?: string; + commit?: string; + branch?: string; + repositoryId?: string; + repositoryName?: string; + }) { this.accessToken = data.accessToken; const branches = []; if (data.branch && data.commit) { @@ -40,23 +32,15 @@ export default abstract class GitHubBase { externalId: data.repositoryId, branches, }); - this.repository = repository; this.branch = branches[0]; } - async getFileContent(file: AnonymizedFile): Promise { - throw new AnonymousError("method_not_implemented", { - httpStatus: 501, - object: this, - }); - } + abstract getFileContent( + file: AnonymizedFile, + progress?: (status: string) => void + ): Promise; - getFiles(): Promise { - throw new AnonymousError("method_not_implemented", { - httpStatus: 501, - object: this, - }); - } + abstract getFiles(progress?: (status: string) => void): Promise; static octokit(token: string) { return new Octokit({ @@ -77,22 +61,24 @@ export default abstract class GitHubBase { } } - async getToken() { + async getToken(ownerID?: any) { const span = trace.getTracer("ano-file").startSpan("GHBase.getToken"); - span.setAttribute("repoId", this.repository.repoId); + span.setAttribute("repoId", this.githubRepository.fullName || ""); try { if (this.validToken) { return this.accessToken as string; } - const user = await UserModel.findById(this.repository.owner.id, { - accessTokens: 1, - }); - if (user?.accessTokens.github) { - const check = await GitHubBase.checkToken(user.accessTokens.github); - if (check) { - this.accessToken = user.accessTokens.github; - this.validToken = true; - return this.accessToken; + if (ownerID) { + const user = await UserModel.findById(ownerID, { + accessTokens: 1, + }); + if (user?.accessTokens.github) { + const check = await GitHubBase.checkToken(user.accessTokens.github); + if (check) { + this.accessToken = user.accessTokens.github; + this.validToken = true; + return this.accessToken; + } } } if (this.accessToken) { diff --git a/src/source/GitHubDownload.ts b/src/source/GitHubDownload.ts index 95f11e5..bf5992b 100644 --- a/src/source/GitHubDownload.ts +++ b/src/source/GitHubDownload.ts @@ -2,32 +2,31 @@ import got from "got"; import { Readable } from "stream"; import { OctokitResponse } from "@octokit/types"; -import config from "../../config"; import storage from "../storage"; -import Repository from "../Repository"; import GitHubBase from "./GitHubBase"; import AnonymizedFile from "../AnonymizedFile"; -import { FILE_TYPE, RepositoryStatus, SourceBase } from "../types"; +import { SourceBase } from "../types"; import AnonymousError from "../AnonymousError"; import { trace } from "@opentelemetry/api"; +import { FILE_TYPE } from "../storage/Storage"; export default class GitHubDownload extends GitHubBase implements SourceBase { + type: "GitHubDownload" | "GitHubStream" | "Zip" = "GitHubDownload"; constructor( data: { - type: "GitHubDownload" | "GitHubStream" | "Zip"; branch?: string; commit?: string; repositoryId?: string; repositoryName?: string; accessToken?: string; }, - repository: Repository + readonly repoId: string ) { - super(data, repository); + super(data); } private async _getZipUrl( - auth?: string + auth: string ): Promise> { const octokit = GitHubBase.octokit(auth as string); return octokit.rest.repos.downloadZipballArchive({ @@ -38,69 +37,30 @@ export default class GitHubDownload extends GitHubBase implements SourceBase { }); } - async download(token?: string) { + async download(progress?: (status: string) => void) { const span = trace.getTracer("ano-file").startSpan("GHDownload.download"); - span.setAttribute("repoId", this.repository.repoId); + span.setAttribute("repoId", this.githubRepository.fullName || ""); try { - const fiveMinuteAgo = new Date(); - fiveMinuteAgo.setMinutes(fiveMinuteAgo.getMinutes() - 5); - if ( - this.repository.status == "download" && - this.repository.model.statusDate > fiveMinuteAgo - ) - throw new AnonymousError("repo_in_download", { - httpStatus: 404, - object: this.repository, - }); let response: OctokitResponse; try { - if (!token) { - token = await this.getToken(); - } - response = await this._getZipUrl(token); + response = await this._getZipUrl(await this.getToken()); } catch (error) { span.recordException(error as Error); - if ((error as any).status == 401 && config.GITHUB_TOKEN) { - try { - response = await this._getZipUrl(config.GITHUB_TOKEN); - } catch (error) { - await this.repository.resetSate( - RepositoryStatus.ERROR, - "repo_not_accessible" - ); - throw new AnonymousError("repo_not_accessible", { - httpStatus: 404, - cause: error as Error, - object: this.repository, - }); - } - } else { - await this.repository.resetSate( - RepositoryStatus.ERROR, - "repo_not_accessible" - ); - throw new AnonymousError("repo_not_accessible", { - httpStatus: 404, - object: this.repository, - cause: error as Error, - }); - } + throw new AnonymousError("repo_not_accessible", { + httpStatus: 404, + object: this.githubRepository, + cause: error as Error, + }); } - await this.repository.updateStatus(RepositoryStatus.DOWNLOAD); - const originalPath = this.repository.originalCachePath; - await storage.mk(originalPath); - let progress: { transferred: number } | undefined = undefined; + await storage.mk(this.repoId); + let downloadProgress: { transferred: number } | undefined = undefined; let progressTimeout; let inDownload = true; - const that = this; async function updateProgress() { if (inDownload) { - if (progress && that.repository.status == RepositoryStatus.DOWNLOAD) { - await that.repository.updateStatus( - that.repository.status, - progress.transferred.toString() - ); + if (progress) { + progress(downloadProgress?.transferred?.toString() || ""); } progressTimeout = setTimeout(updateProgress, 1500); } @@ -110,45 +70,43 @@ export default class GitHubDownload extends GitHubBase implements SourceBase { try { const downloadStream = got.stream(response.url); downloadStream.addListener("downloadProgress", async (p) => { - progress = p; + downloadProgress = p; }); - await storage.extractZip(originalPath, downloadStream, undefined, this); + await storage.extractZip( + this.repoId, + "", + downloadStream, + undefined, + this + ); } catch (error) { span.recordException(error as Error); - await this.repository.updateStatus( - RepositoryStatus.ERROR, - "unable_to_download" - ); throw new AnonymousError("unable_to_download", { httpStatus: 500, cause: error as Error, - object: this.repository, + object: this.githubRepository, }); } finally { inDownload = false; clearTimeout(progressTimeout); } - - this.repository.model.isReseted = false; - try { - await this.repository.updateStatus(RepositoryStatus.READY); - } catch (error) { - span.recordException(error as Error); - } } finally { span.end(); } } - async getFileContent(file: AnonymizedFile): Promise { + async getFileContent( + file: AnonymizedFile, + progress?: (status: string) => void + ): Promise { const span = trace .getTracer("ano-file") .startSpan("GHDownload.getFileContent"); - span.setAttribute("repoId", this.repository.repoId); + span.setAttribute("repoId", this.githubRepository.fullName || ""); try { - const exists = await storage.exists(file.originalCachePath); + const exists = await storage.exists(file.filePath); if (exists === FILE_TYPE.FILE) { - return storage.read(file.originalCachePath); + return storage.read(this.repoId, file.filePath); } else if (exists === FILE_TYPE.FOLDER) { throw new AnonymousError("folder_not_supported", { httpStatus: 400, @@ -159,18 +117,17 @@ export default class GitHubDownload extends GitHubBase implements SourceBase { await file.originalPath(); // the cache is not ready, we need to download the repository - await this.download(); - return storage.read(file.originalCachePath); + await this.download(progress); + return storage.read(this.repoId, file.filePath); } finally { span.end(); } } async getFiles() { - const folder = this.repository.originalCachePath; - if ((await storage.exists(folder)) === FILE_TYPE.NOT_FOUND) { + if ((await storage.exists(this.repoId)) === FILE_TYPE.NOT_FOUND) { await this.download(); } - return storage.listFiles(folder); + return storage.listFiles(this.repoId); } } diff --git a/src/source/GitHubStream.ts b/src/source/GitHubStream.ts index 57e46a5..6859605 100644 --- a/src/source/GitHubStream.ts +++ b/src/source/GitHubStream.ts @@ -1,102 +1,134 @@ import AnonymizedFile from "../AnonymizedFile"; -import Repository from "../Repository"; import GitHubBase from "./GitHubBase"; import storage from "../storage"; -import { RepositoryStatus, SourceBase, Tree } from "../types"; +import { SourceBase, Tree } from "../types"; import * as path from "path"; +import got from "got"; import * as stream from "stream"; import AnonymousError from "../AnonymousError"; import config from "../../config"; import { trace } from "@opentelemetry/api"; +import { FILE_TYPE } from "../storage/Storage"; export default class GitHubStream extends GitHubBase implements SourceBase { - constructor( - data: { - type: "GitHubDownload" | "GitHubStream" | "Zip"; - branch?: string; - commit?: string; - repositoryId?: string; - repositoryName?: string; - accessToken?: string; - }, - repository: Repository - ) { - super(data, repository); + type: "GitHubDownload" | "GitHubStream" | "Zip" = "GitHubStream"; + + constructor(data: { + branch?: string; + commit?: string; + repositoryId?: string; + repositoryName?: string; + accessToken?: string; + }) { + super(data); + } + + downloadFile(sha: string, token: string) { + const span = trace.getTracer("ano-file").startSpan("GHStream.downloadFile"); + span.setAttribute("sha", sha); + const octokit = GitHubBase.octokit(token); + try { + const { url } = octokit.rest.git.getBlob.endpoint({ + owner: this.githubRepository.owner, + repo: this.githubRepository.repo, + file_sha: sha, + }); + return got.stream(url, { + headers: { + "X-GitHub-Api-Version": "2022-11-28", + accept: "application/vnd.github.raw+json", + authorization: `token ${token}`, + }, + }); + } catch (error) { + console.error(error); + // span.recordException(error as Error); + throw new AnonymousError("repo_not_accessible", { + httpStatus: 404, + object: this.githubRepository, + cause: error as Error, + }); + } finally { + span.end(); + } } async getFileContent(file: AnonymizedFile): Promise { - return trace + const span = trace .getTracer("ano-file") - .startActiveSpan("GHStream.getFileContent", async (span) => { - span.setAttribute("path", file.anonymizedPath); - const octokit = GitHubBase.octokit(await this.getToken()); + .startSpan("GHStream.getFileContent"); + span.setAttribute("repoId", file.repository.repoId); + span.setAttribute("file", file.anonymizedPath); + try { + try { + file.filePath; + } catch (_) { + // compute the original path if ambiguous + await file.originalPath(); + } + const fileInfo = await storage.exists( + file.repository.repoId, + file.filePath + ); + if (fileInfo == FILE_TYPE.FILE) { + return storage.read(file.repository.repoId, file.filePath); + } else if (fileInfo == FILE_TYPE.FOLDER) { + throw new AnonymousError("folder_not_supported", { + httpStatus: 400, + object: file, + }); + } + span.setAttribute("path", file.filePath); + const file_sha = await file.sha(); + if (!file_sha) { + throw new AnonymousError("file_not_accessible", { + httpStatus: 404, + object: file, + }); + } + try { + const token = await this.getToken(file.repository.owner.id); + const content = this.downloadFile(file_sha, token); - const file_sha = await file.sha(); - if (!file_sha) { - throw new AnonymousError("file_not_accessible", { - httpStatus: 404, - object: file, - }); - } - try { - const ghRes = await octokit.rest.git.getBlob({ - owner: this.githubRepository.owner, - repo: this.githubRepository.repo, - file_sha, - }); - if (!ghRes.data.content && ghRes.data.size != 0) { - throw new AnonymousError("file_not_accessible", { - httpStatus: 404, - object: file, - }); - } - // empty file - let content: Buffer; - if (ghRes.data.content) { - content = Buffer.from( - ghRes.data.content, - ghRes.data.encoding as BufferEncoding - ); - } else { - content = Buffer.from(""); - } - await storage.write(file.originalCachePath, content, file, this); - this.repository.model.isReseted = false; - await this.repository.model.save(); - if (this.repository.status !== RepositoryStatus.READY) - await this.repository.updateStatus(RepositoryStatus.READY); - return stream.Readable.from(content); - } catch (error) { - if ( - (error as any).status === 404 || - (error as any).httpStatus === 404 - ) { - throw new AnonymousError("file_not_found", { - httpStatus: (error as any).status || (error as any).httpStatus, - cause: error as Error, - object: file, - }); - } - throw new AnonymousError("file_too_big", { + // duplicate the stream to write it to the storage + const stream1 = content.pipe(new stream.PassThrough()); + const stream2 = content.pipe(new stream.PassThrough()); + storage.write( + file.repository.repoId, + file.filePath, + stream1, + file, + this + ); + return stream2; + } catch (error) { + if ( + (error as any).status === 404 || + (error as any).httpStatus === 404 + ) { + throw new AnonymousError("file_not_found", { httpStatus: (error as any).status || (error as any).httpStatus, cause: error as Error, object: file, }); - } finally { - span.end(); } - }); + throw new AnonymousError("file_too_big", { + httpStatus: (error as any).status || (error as any).httpStatus, + cause: error as Error, + object: file, + }); + } + } finally { + span.end(); + } } async getFiles() { const span = trace.getTracer("ano-file").startSpan("GHStream.getFiles"); - span.setAttribute("repoId", this.repository.repoId); + span.setAttribute("repoName", this.githubRepository.fullName || ""); try { let commit = this.branch?.commit; - if (!commit && this.repository.model.source.commit) { - commit = this.repository.model.source.commit; - } return this.getTree(commit); } finally { span.end(); @@ -113,33 +145,21 @@ export default class GitHubStream extends GitHubBase implements SourceBase { } ) { const span = trace.getTracer("ano-file").startSpan("GHStream.getTree"); - span.setAttribute("repoId", this.repository.repoId); + span.setAttribute("repoName", this.githubRepository.fullName || ""); span.setAttribute("sha", sha); - this.repository.model.truckedFileList = false; let ghRes: Awaited>; try { count.request++; ghRes = await this.getGHTree(sha, { recursive: true }); } catch (error) { + console.error(error); span.recordException(error as Error); if ((error as any).status == 409) { - // empty tree - if (this.repository.status != RepositoryStatus.READY) - await this.repository.updateStatus(RepositoryStatus.READY); // cannot be empty otherwise it would try to download it again span.end(); return { __: {} }; } else { - console.log( - `[ERROR] getTree ${this.repository.repoId}@${sha}: ${ - (error as Error).message - }` - ); - await this.repository.resetSate( - RepositoryStatus.ERROR, - "repo_not_accessible" - ); const err = new AnonymousError("repo_not_accessible", { httpStatus: (error as any).status, cause: error as Error, @@ -159,15 +179,12 @@ export default class GitHubStream extends GitHubBase implements SourceBase { if (ghRes.truncated) { await this.getTruncatedTree(sha, tree, parentPath, count); } - if (this.repository.status !== RepositoryStatus.READY) - await this.repository.updateStatus(RepositoryStatus.READY); span.end(); return tree; } private async getGHTree(sha: string, opt = { recursive: true }) { const span = trace.getTracer("ano-file").startSpan("GHStream.getGHTree"); - span.setAttribute("repoId", this.repository.repoId); span.setAttribute("sha", sha); try { const octokit = GitHubBase.octokit(await this.getToken()); @@ -196,7 +213,6 @@ export default class GitHubStream extends GitHubBase implements SourceBase { const span = trace .getTracer("ano-file") .startSpan("GHStream.getTruncatedTree"); - span.setAttribute("repoId", this.repository.repoId); span.setAttribute("sha", sha); span.setAttribute("parentPath", parentPath); try { @@ -207,8 +223,7 @@ export default class GitHubStream extends GitHubBase implements SourceBase { data = await this.getGHTree(sha, { recursive: false }); this.tree2Tree(data.tree, truncatedTree, parentPath); } catch (error) { - console.error(error); - this.repository.model.truckedFileList = true; + span.recordException(error as Error); return; } @@ -235,11 +250,10 @@ export default class GitHubStream extends GitHubBase implements SourceBase { const data = await this.getGHTree(sha, { recursive: true }); this.tree2Tree(data.tree, truncatedTree, parentPath); if (data.truncated) { - this.repository.model.truckedFileList = true; + // TODO: TRUNCATED } } catch (error) { - console.error(error); - this.repository.model.truckedFileList = true; + span.recordException(error as Error); } } } finally { @@ -260,7 +274,6 @@ export default class GitHubStream extends GitHubBase implements SourceBase { parentPath: string = "" ) { const span = trace.getTracer("ano-file").startSpan("GHStream.tree2Tree"); - span.setAttribute("repoId", this.repository.repoId); span.setAttribute("parentPath", parentPath); try { for (let elem of tree) { @@ -286,7 +299,7 @@ export default class GitHubStream extends GitHubBase implements SourceBase { // if elem is a file add the file size in the file list if (elem.type == "blob") { if (Object.keys(current).length > config.MAX_FILE_FOLDER) { - this.repository.model.truckedFileList = true; + // TODO: TRUNCATED continue; } let p = paths[end]; diff --git a/src/source/Zip.ts b/src/source/Zip.ts index 0c9a526..31b50a2 100644 --- a/src/source/Zip.ts +++ b/src/source/Zip.ts @@ -1,25 +1,22 @@ import AnonymizedFile from "../AnonymizedFile"; -import Repository from "../Repository"; import storage from "../storage"; import { SourceBase } from "../types"; import * as stream from "stream"; export default class Zip implements SourceBase { type = "Zip"; - repository: Repository; url?: string; - constructor(data: any, repository: Repository) { - this.repository = repository; + constructor(data: any, readonly repoId: string) { this.url = data.url; } async getFiles() { - return storage.listFiles(this.repository.originalCachePath); + return storage.listFiles(this.repoId); } async getFileContent(file: AnonymizedFile): Promise { - return storage.read(file.originalCachePath); + return storage.read(file.repository.repoId, file.filePath); } toJSON(): any { diff --git a/src/storage/FileSystem.ts b/src/storage/FileSystem.ts index 3b1283f..a88cf0f 100644 --- a/src/storage/FileSystem.ts +++ b/src/storage/FileSystem.ts @@ -1,6 +1,6 @@ -import { FILE_TYPE, SourceBase, StorageBase, Tree } from "../types"; +import { SourceBase, Tree } from "../types"; import config from "../../config"; - +import { Stream } from "node:stream"; import * as fs from "fs"; import { Extract } from "unzip-stream"; import { join, basename, dirname } from "path"; @@ -11,21 +11,25 @@ import { promisify } from "util"; import AnonymizedFile from "../AnonymizedFile"; import { lookup } from "mime-types"; import { trace } from "@opentelemetry/api"; +import StorageBase, { FILE_TYPE } from "./Storage"; -export default class FileSystem implements StorageBase { +export default class FileSystem extends StorageBase { type = "FileSystem"; - constructor() {} + constructor() { + super(); + } /** @override */ - async exists(p: string): Promise { + async exists(repoId: string, p: string = ""): Promise { + const fullPath = join(config.FOLDER, this.repoPath(repoId), p); return trace .getTracer("ano-file") .startActiveSpan("fs.exists", async (span) => { span.setAttribute("path", p); - span.setAttribute("full-path", join(config.FOLDER, p)); + span.setAttribute("full-path", fullPath); try { - const stat = await fs.promises.stat(join(config.FOLDER, p)); + const stat = await fs.promises.stat(fullPath); if (stat.isDirectory()) return FILE_TYPE.FOLDER; if (stat.isFile()) return FILE_TYPE.FILE; } catch (_) { @@ -37,12 +41,13 @@ export default class FileSystem implements StorageBase { } /** @override */ - async send(p: string, res: Response) { + async send(repoId: string, p: string, res: Response) { + const fullPath = join(config.FOLDER, this.repoPath(repoId), p); return trace .getTracer("ano-file") .startActiveSpan("fs.send", async (span) => { - span.setAttribute("path", p); - res.sendFile(join(config.FOLDER, p), { dotfiles: "allow" }, (err) => { + span.setAttribute("path", fullPath); + res.sendFile(fullPath, { dotfiles: "allow" }, (err) => { if (err) { span.recordException(err); } @@ -52,44 +57,49 @@ export default class FileSystem implements StorageBase { } /** @override */ - async read(p: string): Promise { - return fs.createReadStream(join(config.FOLDER, p)); + async read(repoId: string, p: string): Promise { + const fullPath = join(config.FOLDER, this.repoPath(repoId), p); + return fs.createReadStream(fullPath); } - async fileInfo(path: string) { - const info = await fs.promises.stat(join(config.FOLDER, path)); + async fileInfo(repoId: string, path: string) { + const fullPath = join(config.FOLDER, this.repoPath(repoId), path); + const info = await fs.promises.stat(fullPath); return { size: info.size, lastModified: info.mtime, contentType: info.isDirectory() ? "application/x-directory" - : (lookup(join(config.FOLDER, path)) as string), + : (lookup(fullPath) as string), }; } /** @override */ async write( + repoId: string, p: string, - data: Buffer, + data: string | Readable, file?: AnonymizedFile, source?: SourceBase ): Promise { const span = trace.getTracer("ano-file").startSpan("fs.write"); - span.setAttribute("path", p); + const fullPath = join(config.FOLDER, this.repoPath(repoId), p); + span.setAttribute("path", fullPath); try { - await this.mk(dirname(p)); - return await fs.promises.writeFile(join(config.FOLDER, p), data, "utf-8"); + await this.mk(repoId, dirname(p)); + return await fs.promises.writeFile(fullPath, data, "utf-8"); } finally { span.end(); } } /** @override */ - async rm(dir: string): Promise { + async rm(repoId: string, dir: string = ""): Promise { const span = trace.getTracer("ano-file").startSpan("fs.rm"); - span.setAttribute("path", dir); + const fullPath = join(config.FOLDER, this.repoPath(repoId), dir); + span.setAttribute("path", fullPath); try { - await fs.promises.rm(join(config.FOLDER, dir), { + await fs.promises.rm(fullPath, { force: true, recursive: true, }); @@ -99,11 +109,12 @@ export default class FileSystem implements StorageBase { } /** @override */ - async mk(dir: string): Promise { + async mk(repoId: string, dir: string = ""): Promise { const span = trace.getTracer("ano-file").startSpan("fs.mk"); span.setAttribute("path", dir); + const fullPath = join(config.FOLDER, this.repoPath(repoId), dir); try { - await fs.promises.mkdir(join(config.FOLDER, dir), { + await fs.promises.mkdir(fullPath, { recursive: true, }); } catch (err: any) { @@ -118,9 +129,9 @@ export default class FileSystem implements StorageBase { /** @override */ async listFiles( - dir: string, + repoId: string, + dir: string = "", opt: { - root?: string; onEntry?: (file: { path: string; size: number }) => void; } = {} ): Promise { @@ -128,20 +139,18 @@ export default class FileSystem implements StorageBase { .getTracer("ano-file") .startActiveSpan("fs.listFiles", async (span) => { span.setAttribute("path", dir); - if (opt.root == null) { - opt.root = config.FOLDER; - } - let files = await fs.promises.readdir(join(opt.root, dir)); + const fullPath = join(config.FOLDER, this.repoPath(repoId), dir); + let files = await fs.promises.readdir(fullPath); const output: Tree = {}; for (let file of files) { let filePath = join(dir, file); try { - const stats = await fs.promises.stat(join(opt.root, filePath)); + const stats = await fs.promises.stat(join(fullPath, filePath)); if (file[0] == "$") { file = "\\" + file; } if (stats.isDirectory()) { - output[file] = await this.listFiles(filePath, opt); + output[file] = await this.listFiles(repoId, filePath, opt); } else if (stats.isFile()) { if (opt.onEntry) { opt.onEntry({ @@ -162,16 +171,18 @@ export default class FileSystem implements StorageBase { /** @override */ async extractZip( + repoId: string, p: string, data: Readable, file?: AnonymizedFile, source?: SourceBase ): Promise { const pipe = promisify(pipeline); + const fullPath = join(config.FOLDER, this.repoPath(repoId), p); return pipe( data, Extract({ - path: join(config.FOLDER, p), + path: fullPath, decodeString: (buf) => { const name = buf.toString(); const newName = name.substr(name.indexOf("/") + 1); @@ -184,6 +195,7 @@ export default class FileSystem implements StorageBase { /** @override */ async archive( + repoId: string, dir: string, opt?: { format?: "zip" | "tar"; @@ -191,15 +203,16 @@ export default class FileSystem implements StorageBase { } ) { const archive = archiver(opt?.format || "zip", {}); + const fullPath = join(config.FOLDER, this.repoPath(repoId), dir); - await this.listFiles(dir, { + await this.listFiles(repoId, dir, { onEntry: async (file) => { - let rs = await this.read(file.path); + let rs = await this.read(repoId, file.path); if (opt?.fileTransformer) { // apply transformation on the stream rs = rs.pipe(opt.fileTransformer(file.path)); } - const f = file.path.replace(dir, ""); + const f = file.path.replace(fullPath, ""); archive.append(rs, { name: basename(f), prefix: dirname(f), diff --git a/src/storage/S3.ts b/src/storage/S3.ts index b33bb6a..a1891d4 100644 --- a/src/storage/S3.ts +++ b/src/storage/S3.ts @@ -1,26 +1,29 @@ -import { FILE_TYPE, SourceBase, StorageBase, Tree, TreeFile } from "../types"; import { GetObjectCommand, ListObjectsV2CommandOutput, PutObjectCommandInput, S3, } from "@aws-sdk/client-s3"; +import { Upload } from "@aws-sdk/lib-storage"; import { NodeHttpHandler } from "@smithy/node-http-handler"; import config from "../../config"; import { pipeline, Readable, Transform } from "stream"; import ArchiveStreamToS3 from "decompress-stream-to-s3"; import { Response } from "express"; -import { lookup } from "mime-types"; +import { contentType } from "mime-types"; import * as archiver from "archiver"; -import { dirname, basename } from "path"; +import { trace } from "@opentelemetry/api"; +import { dirname, basename, join } from "path"; +import { SourceBase, Tree, TreeFile } from "../types"; import AnonymousError from "../AnonymousError"; import AnonymizedFile from "../AnonymizedFile"; -import { trace } from "@opentelemetry/api"; +import StorageBase, { FILE_TYPE } from "./Storage"; -export default class S3Storage implements StorageBase { +export default class S3Storage extends StorageBase { type = "AWS"; constructor() { + super(); if (!config.S3_BUCKET) throw new AnonymousError("s3_config_not_provided", { httpStatus: 500, @@ -40,26 +43,25 @@ export default class S3Storage implements StorageBase { requestHandler: new NodeHttpHandler({ requestTimeout: timeout, connectionTimeout: timeout, - }), }); } /** @override */ - async exists(path: string): Promise { + async exists(repoId: string, path: string = ""): Promise { const span = trace.getTracer("ano-file").startSpan("s3.exists"); span.setAttribute("path", path); try { if (!config.S3_BUCKET) throw new Error("S3_BUCKET not set"); try { // if we can get the file info, it is a file - await this.fileInfo(path); + await this.fileInfo(repoId, path); return FILE_TYPE.FILE; } catch (err) { // check if it is a directory const data = await this.client().listObjectsV2({ Bucket: config.S3_BUCKET, - Prefix: path, + Prefix: join(this.repoPath(repoId), path), MaxKeys: 1, }); return (data.Contents?.length || 0) > 0 @@ -72,19 +74,20 @@ export default class S3Storage implements StorageBase { } /** @override */ - async mk(dir: string): Promise { + async mk(repoId: string, dir: string = ""): Promise { // no need to create folder on S3 } /** @override */ - async rm(dir: string): Promise { + async rm(repoId: string, dir: string = ""): Promise { const span = trace.getTracer("ano-file").startSpan("s3.rm"); + span.setAttribute("repoId", repoId); span.setAttribute("path", dir); try { if (!config.S3_BUCKET) throw new Error("S3_BUCKET not set"); const data = await this.client(200000).listObjectsV2({ Bucket: config.S3_BUCKET, - Prefix: dir, + Prefix: join(this.repoPath(repoId), dir), MaxKeys: 100, }); @@ -106,7 +109,7 @@ export default class S3Storage implements StorageBase { await this.client(200000).deleteObjects(params); if (data.IsTruncated) { - await this.rm(dir); + await this.rm(repoId, dir); } } finally { span.end(); @@ -114,15 +117,16 @@ export default class S3Storage implements StorageBase { } /** @override */ - async send(p: string, res: Response) { + async send(repoId: string, path: string, res: Response) { const span = trace.getTracer("ano-file").startSpan("s3.send"); - span.setAttribute("path", p); + span.setAttribute("repoId", repoId); + span.setAttribute("path", path); try { if (!config.S3_BUCKET) throw new Error("S3_BUCKET not set"); try { const command = new GetObjectCommand({ Bucket: config.S3_BUCKET, - Key: p, + Key: join(this.repoPath(repoId), path), }); const s = await this.client().send(command); res.status(200); @@ -142,7 +146,7 @@ export default class S3Storage implements StorageBase { try { res.status(500); } catch (err) { - console.error(`[ERROR] S3 send ${p}`, err); + console.error(`[ERROR] S3 send ${path}`, err); } } } finally { @@ -150,21 +154,22 @@ export default class S3Storage implements StorageBase { } } - async fileInfo(path: string) { + async fileInfo(repoId: string, path: string) { const span = trace.getTracer("ano-file").startSpan("s3.fileInfo"); + span.setAttribute("repoId", repoId); span.setAttribute("path", path); try { if (!config.S3_BUCKET) throw new Error("S3_BUCKET not set"); const info = await this.client(3000).headObject({ Bucket: config.S3_BUCKET, - Key: path, + Key: join(this.repoPath(repoId), path), }); return { size: info.ContentLength, lastModified: info.LastModified, contentType: info.ContentType ? info.ContentType - : (lookup(path) as string), + : (contentType(path) as string), }; } finally { span.end(); @@ -172,20 +177,21 @@ export default class S3Storage implements StorageBase { } /** @override */ - async read(path: string): Promise { + async read(repoId: string, path: string): Promise { const span = trace.getTracer("ano-file").startSpan("s3.rreadm"); + span.setAttribute("repoId", repoId); span.setAttribute("path", path); try { if (!config.S3_BUCKET) throw new Error("S3_BUCKET not set"); const command = new GetObjectCommand({ Bucket: config.S3_BUCKET, - Key: path, + Key: join(this.repoPath(repoId), path), }); const res = (await this.client(3000).send(command)).Body; if (!res) { throw new AnonymousError("file_not_found", { httpStatus: 404, - object: path, + object: join(this.repoPath(repoId), path), }); } return res as Readable; @@ -196,26 +202,35 @@ export default class S3Storage implements StorageBase { /** @override */ async write( + repoId: string, path: string, - data: Buffer, + data: string | Readable, file?: AnonymizedFile, source?: SourceBase ): Promise { const span = trace.getTracer("ano-file").startSpan("s3.rm"); + span.setAttribute("repoId", repoId); span.setAttribute("path", path); try { if (!config.S3_BUCKET) throw new Error("S3_BUCKET not set"); + const params: PutObjectCommandInput = { Bucket: config.S3_BUCKET, - Key: path, + Key: join(this.repoPath(repoId), path), Body: data, - ContentType: lookup(path).toString(), + ContentType: contentType(path).toString(), }; if (source) { params.Tagging = `source=${source.type}`; } - // 30s timeout - await this.client(30000).putObject(params); + + const parallelUploads3 = new Upload({ + // 30s timeout + client: this.client(30000), + params, + }); + + await parallelUploads3.done(); return; } finally { span.end(); @@ -223,7 +238,7 @@ export default class S3Storage implements StorageBase { } /** @override */ - async listFiles(dir: string): Promise { + async listFiles(repoId: string, dir: string = ""): Promise { const span = trace.getTracer("ano-file").startSpan("s3.listFiles"); span.setAttribute("path", dir); try { @@ -235,7 +250,7 @@ export default class S3Storage implements StorageBase { do { req = await this.client(30000).listObjectsV2({ Bucket: config.S3_BUCKET, - Prefix: dir, + Prefix: join(this.repoPath(repoId), dir), MaxKeys: 250, ContinuationToken: nextContinuationToken, }); @@ -244,7 +259,7 @@ export default class S3Storage implements StorageBase { for (const f of req.Contents) { if (!f.Key) continue; - f.Key = f.Key.replace(dir, ""); + f.Key = f.Key.replace(join(this.repoPath(repoId), dir), ""); const paths = f.Key.split("/"); let current: Tree = out; for (let i = 0; i < paths.length - 1; i++) { @@ -271,19 +286,20 @@ export default class S3Storage implements StorageBase { /** @override */ async extractZip( - p: string, + repoId: string, + path: string, data: Readable, file?: AnonymizedFile, source?: SourceBase ): Promise { let toS3: ArchiveStreamToS3; const span = trace.getTracer("ano-file").startSpan("s3.extractZip"); - span.setAttribute("path", p); + span.setAttribute("path", path); return new Promise((resolve, reject) => { if (!config.S3_BUCKET) return reject("S3_BUCKET not set"); toS3 = new ArchiveStreamToS3({ bucket: config.S3_BUCKET, - prefix: p, + prefix: join(this.repoPath(repoId), path), s3: this.client(2 * 60 * 60 * 1000), // 2h timeout type: "zip", onEntry: (header) => { @@ -315,13 +331,15 @@ export default class S3Storage implements StorageBase { /** @override */ async archive( - dir: string, + repoId: string, + dir: string = "", opt?: { format?: "zip" | "tar"; fileTransformer?: (p: string) => Transform; } ) { const span = trace.getTracer("ano-file").startSpan("s3.archive"); + span.setAttribute("repoId", repoId); span.setAttribute("path", dir); try { if (!config.S3_BUCKET) throw new Error("S3_BUCKET not set"); @@ -333,7 +351,7 @@ export default class S3Storage implements StorageBase { do { req = await this.client(30000).listObjectsV2({ Bucket: config.S3_BUCKET, - Prefix: dir, + Prefix: join(this.repoPath(repoId), dir), MaxKeys: 250, ContinuationToken: nextContinuationToken, }); @@ -342,9 +360,11 @@ export default class S3Storage implements StorageBase { for (const f of req.Contents || []) { if (!f.Key) continue; const filename = basename(f.Key); - const prefix = dirname(f.Key.replace(dir, "")); + const prefix = dirname( + f.Key.replace(join(this.repoPath(repoId), dir), "") + ); - let rs = await this.read(f.Key); + let rs = await this.read(repoId, f.Key); if (opt?.fileTransformer) { // apply transformation on the stream rs = rs.pipe(opt.fileTransformer(f.Key)); diff --git a/src/storage/Storage.ts b/src/storage/Storage.ts new file mode 100644 index 0000000..315613a --- /dev/null +++ b/src/storage/Storage.ts @@ -0,0 +1,117 @@ +import { join } from "path"; +import { Transform, Readable } from "stream"; +import * as archiver from "archiver"; +import { Response } from "express"; + +import AnonymizedFile from "../AnonymizedFile"; +import { SourceBase, Tree } from "../types"; + +export enum FILE_TYPE { + FILE = "file", + FOLDER = "folder", + NOT_FOUND = "not_found", +} + +export default abstract class StorageBase { + /** + * The type of storage + */ + abstract type: string; + + /** + * check if the path exists + * @param path the path to check + */ + abstract exists(repoId: string, path: string): Promise; + + abstract send(repoId: string, path: string, res: Response): Promise; + + /** + * Read the content of a file + * @param path the path to the file + */ + abstract read(repoId: string, path: string): Promise; + + abstract fileInfo( + repoId: string, + path: string + ): Promise<{ + size: number | undefined; + lastModified: Date | undefined; + contentType: string; + }>; + + /** + * Write data to a file + * @param path the path to the file + * @param data the content of the file + * @param file the file + * @param source the source of the file + */ + abstract write( + repoId: string, + path: string, + data: string | Readable, + file?: AnonymizedFile, + source?: SourceBase + ): Promise; + + /** + * List the files from dir + * @param dir + */ + abstract listFiles(repoId: string, dir: string): Promise; + + /** + * Extract the content of tar to dir + * @param dir + * @param tar + * @param file the file + * @param source the source of the file + */ + abstract extractZip( + repoId: string, + dir: string, + tar: Readable, + file?: AnonymizedFile, + source?: SourceBase + ): Promise; + + /** + * Remove the path + * @param dir + */ + abstract rm(repoId: string, dir: string): Promise; + + /** + * Archive the content of dir + * @param dir + * @param opt + */ + abstract archive( + repoId: string, + dir: string, + opt?: { + /** + * Archive format + */ + format?: "zip" | "tar"; + /** + * Transformer to apply on the content of the file + */ + fileTransformer?: (p: string) => Transform; + } + ): Promise; + + /** + * Create a directory + * @param dir + */ + abstract mk(repoId: string, dir: string): Promise; + + repoPath(repoId: string) { + return ( + join(repoId, "original") + (process.platform === "win32" ? "\\" : "/") + ); + } +} diff --git a/src/types.ts b/src/types.ts index 48c6df6..bcebd8c 100644 --- a/src/types.ts +++ b/src/types.ts @@ -4,9 +4,7 @@ import Zip from "./source/Zip"; import S3Storage from "./storage/S3"; import FileSystem from "./storage/FileSystem"; import AnonymizedFile from "./AnonymizedFile"; -import { Transform, Readable } from "stream"; -import * as archiver from "archiver"; -import { Response } from "express"; +import { Readable } from "stream"; export interface SourceBase { readonly type: string; @@ -32,104 +30,6 @@ export interface SourceBase { export type Source = GitHubDownload | GitHubStream | Zip; -export enum FILE_TYPE { - FILE = "file", - FOLDER = "folder", - NOT_FOUND = "not_found", -} - -export interface StorageBase { - /** - * The type of storage - */ - type: string; - - /** - * check if the path exists - * @param path the path to check - */ - exists(path: string): Promise; - - send(p: string, res: Response): Promise; - - /** - * Read the content of a file - * @param path the path to the file - */ - read(path: string): Promise; - - fileInfo(path: string): Promise<{ - size: number | undefined; - lastModified: Date | undefined; - contentType: string; - }>; - - /** - * Write data to a file - * @param path the path to the file - * @param data the content of the file - * @param file the file - * @param source the source of the file - */ - write( - path: string, - data: Buffer, - file?: AnonymizedFile, - source?: SourceBase - ): Promise; - - /** - * List the files from dir - * @param dir - */ - listFiles(dir: string): Promise; - - /** - * Extract the content of tar to dir - * @param dir - * @param tar - * @param file the file - * @param source the source of the file - */ - extractZip( - dir: string, - tar: Readable, - file?: AnonymizedFile, - source?: SourceBase - ): Promise; - - /** - * Remove the path - * @param dir - */ - rm(dir: string): Promise; - - /** - * Archive the content of dir - * @param dir - * @param opt - */ - archive( - dir: string, - opt?: { - /** - * Archive format - */ - format?: "zip" | "tar"; - /** - * Transformer to apply on the content of the file - */ - fileTransformer?: (p: string) => Transform; - } - ): Promise; - - /** - * Create a directory - * @param dir - */ - mk(dir: string): Promise; -} - export type Storage = S3Storage | FileSystem; export interface Branch {