diff --git a/Dockerfile b/Dockerfile index 03b10cf..386e2ba 100644 --- a/Dockerfile +++ b/Dockerfile @@ -16,6 +16,5 @@ COPY public ./public COPY src ./src RUN npm install && npm run build && npm cache clean --force -COPY opentelemetry.js . -CMD [ "node", "--require", "./opentelemetry.js", "./build/server/index.js"] \ No newline at end of file +CMD [ "node", "./build/server/index.js"] \ No newline at end of file diff --git a/docker-compose.yml b/docker-compose.yml index 7f2f70d..9449e47 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -37,7 +37,7 @@ services: mode: replicated replicas: 4 endpoint_mode: dnsrr - entrypoint: ["node", "--require", "./opentelemetry.js", "./build/streamer/index.js"] + entrypoint: ["node", "./build/streamer/index.js"] env_file: - ./.env volumes: @@ -89,30 +89,6 @@ services: timeout: 10s retries: 5 - opentelemetry: - image: otel/opentelemetry-collector - restart: always - command: ["--config=/etc/otel-collector-config.yaml"] - volumes: - - ./opentelemetry-collector.yml:/etc/otel-collector-config.yaml - depends_on: - - jaeger - - prometheus - - jaeger: - image: jaegertracing/all-in-one:latest - restart: always - ports: - - 127.0.0.1:16686:16686 - - prometheus: - image: prom/prometheus:latest - restart: always - volumes: - - ./prometheus.yaml:/etc/prometheus/prometheus.yml - ports: - - 127.0.0.1:9090:9090 - mongodb-backup: image: tiredofit/db-backup links: diff --git a/opentelemetry-collector.yml b/opentelemetry-collector.yml deleted file mode 100644 index f793701..0000000 --- a/opentelemetry-collector.yml +++ /dev/null @@ -1,40 +0,0 @@ -receivers: - otlp: - protocols: - grpc: - -exporters: - prometheus: - endpoint: "0.0.0.0:8889" - const_labels: - label1: value1 - - debug: - - otlp: - endpoint: jaeger:4317 - tls: - insecure: true - -processors: - batch: - -extensions: - health_check: - pprof: - endpoint: :1888 - zpages: - endpoint: :55679 - -service: - extensions: [health_check, pprof, zpages] - pipelines: - traces: - receivers: 
[otlp] - exporters: [debug, otlp] - metrics: - receivers: [otlp] - exporters: [debug, prometheus] - logs: - receivers: [otlp] - exporters: [debug] diff --git a/opentelemetry.js b/opentelemetry.js deleted file mode 100644 index 70a94e8..0000000 --- a/opentelemetry.js +++ /dev/null @@ -1,29 +0,0 @@ -const opentelemetry = require("@opentelemetry/sdk-node"); -const { - getNodeAutoInstrumentations, -} = require("@opentelemetry/auto-instrumentations-node"); -const { - OTLPTraceExporter, -} = require("@opentelemetry/exporter-trace-otlp-grpc"); -const { - OTLPMetricExporter, -} = require("@opentelemetry/exporter-metrics-otlp-grpc"); -const { PeriodicExportingMetricReader } = require("@opentelemetry/sdk-metrics"); -const { diag, DiagConsoleLogger, DiagLogLevel } = require("@opentelemetry/api"); - -// diag.setLogger(new DiagConsoleLogger(), DiagLogLevel.INFO); - -const sdk = new opentelemetry.NodeSDK({ - serviceName: process.env.SERVICE_NAME || "Anonymous-GitHub", - logRecordProcessor: getNodeAutoInstrumentations().logRecordProcessor, - traceExporter: new OTLPTraceExporter({ - url: "http://opentelemetry:4317/v1/traces", - }), - metricReader: new PeriodicExportingMetricReader({ - exporter: new OTLPMetricExporter({ - url: "http://opentelemetry:4317/v1/metrics", - }), - }), - instrumentations: [getNodeAutoInstrumentations()], -}); -sdk.start(); diff --git a/package.json b/package.json index 341c983..a633c1b 100644 --- a/package.json +++ b/package.json @@ -35,15 +35,6 @@ "@aws-sdk/lib-storage": "^3.540.0", "@mongodb-js/zstd": "^1.2.0", "@octokit/rest": "^20.0.2", - "@opentelemetry/api": "^1.8.0", - "@opentelemetry/auto-instrumentations-node": "^0.43.0", - "@opentelemetry/exporter-metrics-otlp-grpc": "^0.49.1", - "@opentelemetry/exporter-metrics-otlp-proto": "^0.49.1", - "@opentelemetry/exporter-trace-otlp-grpc": "^0.49.1", - "@opentelemetry/exporter-trace-otlp-proto": "^0.49.1", - "@opentelemetry/sdk-metrics": "^1.22.0", - "@opentelemetry/sdk-node": "^0.49.1", - 
"@opentelemetry/sdk-trace-node": "^1.22.0", "@smithy/node-http-handler": "^2.5.0", "archiver": "^5.3.2", "bullmq": "^2.4.0", diff --git a/prometheus.yaml b/prometheus.yaml deleted file mode 100644 index 6bd7781..0000000 --- a/prometheus.yaml +++ /dev/null @@ -1,6 +0,0 @@ -scrape_configs: - - job_name: 'otel-collector' - scrape_interval: 10s - static_configs: - - targets: ['opentelemetry:8889'] - - targets: ['opentelemetry:8888'] \ No newline at end of file diff --git a/src/core/AnonymizedFile.ts b/src/core/AnonymizedFile.ts index a7064ae..c6cbbfa 100644 --- a/src/core/AnonymizedFile.ts +++ b/src/core/AnonymizedFile.ts @@ -1,7 +1,6 @@ import { join, basename, dirname } from "path"; import { Response } from "express"; import { Readable } from "stream"; -import { trace } from "@opentelemetry/api"; import { lookup } from "mime-types"; import got from "got"; @@ -35,96 +34,76 @@ export default class AnonymizedFile { } async sha() { - return trace - .getTracer("ano-file") - .startActiveSpan("AnnoFile.sha", async (span) => { - try { - span.setAttribute("anonymizedPath", this.anonymizedPath); - if (this._file) return this._file.sha?.replace(/"/g, ""); - this._file = await this.getFileInfo(); - return this._file.sha?.replace(/"/g, ""); - } finally { - span.end(); - } - }); + if (this._file) return this._file.sha?.replace(/"/g, ""); + this._file = await this.getFileInfo(); + return this._file.sha?.replace(/"/g, ""); } async getFileInfo(): Promise { - const span = trace.getTracer("ano-file").startSpan("AnnoFile.getFileInfo"); - span.setAttribute("repoId", this.repository.repoId); - span.setAttribute("file", this.anonymizedPath); + if (this._file) return this._file; + let fileDir = dirname(this.anonymizedPath); + if (fileDir == ".") fileDir = ""; + if (fileDir.endsWith("/")) fileDir = fileDir.slice(0, -1); + const filename = basename(this.anonymizedPath); - try { - if (this._file) return this._file; - let fileDir = dirname(this.anonymizedPath); - if (fileDir == ".") fileDir = 
""; - if (fileDir.endsWith("/")) fileDir = fileDir.slice(0, -1); - const filename = basename(this.anonymizedPath); - - if (!this.anonymizedPath.includes(config.ANONYMIZATION_MASK)) { - if (this.anonymizedPath == "") { - return { - name: "", - path: "", - repoId: this.repository.repoId, - }; - } - const query: FilterQuery = { + if (!this.anonymizedPath.includes(config.ANONYMIZATION_MASK)) { + if (this.anonymizedPath == "") { + return { + name: "", + path: "", repoId: this.repository.repoId, - path: fileDir, }; - if (filename != "") query.name = filename; - const res = await FileModel.findOne(query); - if (res) { - this._file = res; - return res; - } - throw new AnonymousError("file_not_found", { - object: this, - httpStatus: 404, - }); } - - const pathQuery = fileDir - .split("/") - .map((p) => { - if (p.includes(config.ANONYMIZATION_MASK)) { - return "[^/]+"; - } - return p; - }) - .join("/"); - const nameQuery = filename.replace( - new RegExp(config.ANONYMIZATION_MASK + "(-[0-9]+)?"), - "[^/]+" - ); - - const candidates = await FileModel.find({ + const query: FilterQuery = { repoId: this.repository.repoId, - path: new RegExp(pathQuery), - name: new RegExp(nameQuery), - }).exec(); - - for (const candidate of candidates) { - const candidatePath = join(candidate.path, candidate.name); - if ( - anonymizePath(candidatePath, this.repository.options.terms || []) == - this.anonymizedPath - ) { - this._file = candidate; - return candidate; - } + path: fileDir, + }; + if (filename != "") query.name = filename; + const res = await FileModel.findOne(query); + if (res) { + this._file = res; + return res; } throw new AnonymousError("file_not_found", { object: this, httpStatus: 404, }); - } catch (error) { - span.recordException(error as Error); - throw error; - } finally { - span.end(); } + + const pathQuery = fileDir + .split("/") + .map((p) => { + if (p.includes(config.ANONYMIZATION_MASK)) { + return "[^/]+"; + } + return p; + }) + .join("/"); + const nameQuery = 
filename.replace( + new RegExp(config.ANONYMIZATION_MASK + "(-[0-9]+)?"), + "[^/]+" + ); + + const candidates = await FileModel.find({ + repoId: this.repository.repoId, + path: new RegExp(pathQuery), + name: new RegExp(nameQuery), + }).exec(); + + for (const candidate of candidates) { + const candidatePath = join(candidate.path, candidate.name); + if ( + anonymizePath(candidatePath, this.repository.options.terms || []) == + this.anonymizedPath + ) { + this._file = candidate; + return candidate; + } + } + throw new AnonymousError("file_not_found", { + object: this, + httpStatus: 404, + }); } /** @@ -133,24 +112,16 @@ export default class AnonymizedFile { * @returns the origin relative path of the file */ async originalPath(): Promise { - const span = trace.getTracer("ano-file").startSpan("AnnoFile.originalPath"); - span.setAttribute("repoId", this.repository.repoId); - span.setAttribute("file", this.anonymizedPath); - try { - span.setAttribute("anonymizedPath", this.anonymizedPath); - if (this.anonymizedPath == null) { - throw new AnonymousError("path_not_specified", { - object: this, - httpStatus: 400, - }); - } - if (!this._file) { - this._file = await this.getFileInfo(); - } - return join(this._file.path, this._file.name); - } finally { - span.end(); + if (this.anonymizedPath == null) { + throw new AnonymousError("path_not_specified", { + object: this, + httpStatus: 400, + }); } + if (!this._file) { + this._file = await this.getFileInfo(); + } + return join(this._file.path, this._file.name); } extension() { const filename = basename(this._file?.name || this.anonymizedPath); @@ -188,59 +159,39 @@ export default class AnonymizedFile { } async content(): Promise { - return trace - .getTracer("ano-file") - .startActiveSpan("content", async (span) => { - try { - if (this.anonymizedPath.includes(config.ANONYMIZATION_MASK)) { - await this.originalPath(); - } - span.addEvent("filePath", { originalPath: this.filePath }); - if (this._file?.size && this._file?.size > 
config.MAX_FILE_SIZE) { - throw new AnonymousError("file_too_big", { - object: this, - httpStatus: 403, - }); - } - const content = await this.repository.source?.getFileContent(this); - if ( - !this.repository.model.isReseted || - this.repository.status != RepositoryStatus.READY - ) { - this.repository.model.isReseted = false; - await this.repository.updateStatus(RepositoryStatus.READY); - } - return content; - } finally { - span.end(); - } + if (this.anonymizedPath.includes(config.ANONYMIZATION_MASK)) { + await this.originalPath(); + } + if (this._file?.size && this._file?.size > config.MAX_FILE_SIZE) { + throw new AnonymousError("file_too_big", { + object: this, + httpStatus: 403, }); + } + const content = await this.repository.source?.getFileContent(this); + if ( + !this.repository.model.isReseted || + this.repository.status != RepositoryStatus.READY + ) { + this.repository.model.isReseted = false; + await this.repository.updateStatus(RepositoryStatus.READY); + } + return content; } async anonymizedContent() { - const span = trace.getTracer("ano-file").startSpan("Repository.conference"); - span.setAttribute("anonymizedPath", this.anonymizedPath); const anonymizer = this.repository.generateAnonymizeTransformer( this.anonymizedPath ); if (!config.STREAMER_ENTRYPOINT) { // collect the content locally const content = await this.content(); - return content.pipe(anonymizer).on("close", () => { - span.end(); - }); + return content.pipe(anonymizer); } - // const cacheableLookup = new CacheableLookup(); - // const hostName = new URL(config.STREAMER_ENTRYPOINT).hostname; - // const ipHost = await cacheableLookup.lookupAsync(hostName); - // use the streamer service return got.stream(join(config.STREAMER_ENTRYPOINT, "api"), { method: "POST", - // lookup: cacheableLookup.lookup, - // host: ipHost.address, - // dnsCache: cacheableLookup, json: { token: await this.repository.getToken(), repoFullName: this.repository.model.source.repositoryName, @@ -268,107 +219,89 @@ export 
default class AnonymizedFile { return join(this._file.path, this._file.name); } - // cacheableLookup = new CacheableLookup({ - // maxTtl: 60, - // }); - async send(res: Response): Promise { const anonymizer = this.repository.generateAnonymizeTransformer( this.anonymizedPath ); - return trace - .getTracer("ano-file") - .startActiveSpan("AnonymizedFile.send", async (span) => { - span.setAttribute("repoId", this.repository.repoId); - span.setAttribute("anonymizedPath", this.anonymizedPath); - // eslint-disable-next-line no-async-promise-executor - return new Promise(async (resolve, reject) => { - try { - if (config.STREAMER_ENTRYPOINT) { - // use the streamer service - const [sha, token] = await Promise.all([ - this.sha(), - this.repository.getToken(), - ]); - const resStream = got - .stream(join(config.STREAMER_ENTRYPOINT, "api"), { - method: "POST", - json: { - sha, - token, - repoFullName: this.repository.model.source.repositoryName, - commit: this.repository.model.source.commit, - branch: this.repository.model.source.branch, - repoId: this.repository.repoId, - filePath: this.filePath, - anonymizerOptions: anonymizer.opt, - }, - }) - .on("error", (err) => { - span.recordException(err); - handleError( - new AnonymousError("file_not_found", { - object: this, - httpStatus: 404, - }), - res - ); - }); - resStream.pipe(res); - res.on("close", () => { - span.end(); - resolve(); - }); - res.on("error", (err) => { - reject(err); - span.recordException(err); - span.end(); - }); - return; - } - - const mime = lookup(this.anonymizedPath); - if (mime && this.extension() != "ts") { - res.contentType(mime); - } else if (isTextFile(this.anonymizedPath)) { - res.contentType("text/plain"); - } - res.header("Accept-Ranges", "none"); - anonymizer.once("transform", (data) => { - if (!mime && data.isText) { - res.contentType("text/plain"); - } - if (!data.wasAnonimized && this._file?.size) { - // the text files may be anonymized and therefore the size may be different - 
res.header("Content-Length", this._file?.size.toString()); - } + // eslint-disable-next-line no-async-promise-executor + return new Promise(async (resolve, reject) => { + try { + if (config.STREAMER_ENTRYPOINT) { + // use the streamer service + const [sha, token] = await Promise.all([ + this.sha(), + this.repository.getToken(), + ]); + const resStream = got + .stream(join(config.STREAMER_ENTRYPOINT, "api"), { + method: "POST", + json: { + sha, + token, + repoFullName: this.repository.model.source.repositoryName, + commit: this.repository.model.source.commit, + branch: this.repository.model.source.branch, + repoId: this.repository.repoId, + filePath: this.filePath, + anonymizerOptions: anonymizer.opt, + }, + }) + .on("error", (err) => { + handleError( + new AnonymousError("file_not_found", { + object: this, + httpStatus: 404, + }), + res + ); }); - const content = await this.content(); - function handleStreamError(error: Error) { - if (!content.closed && !content.destroyed) { - content.destroy(); - } - span.recordException(error); - span.end(); - reject(error); - // handleError(error, res); - } - content - .on("error", handleStreamError) - .pipe(anonymizer) - .pipe(res) - .on("error", handleStreamError) - .on("close", () => { - if (!content.closed && !content.destroyed) { - content.destroy(); - } - span.end(); - resolve(); - }); - } catch (error) { - handleError(error, res); + resStream.pipe(res); + res.on("close", () => { + resolve(); + }); + res.on("error", (err) => { + reject(err); + }); + return; + } + + const mime = lookup(this.anonymizedPath); + if (mime && this.extension() != "ts") { + res.contentType(mime); + } else if (isTextFile(this.anonymizedPath)) { + res.contentType("text/plain"); + } + res.header("Accept-Ranges", "none"); + anonymizer.once("transform", (data) => { + if (!mime && data.isText) { + res.contentType("text/plain"); + } + if (!data.wasAnonimized && this._file?.size) { + // the text files may be anonymized and therefore the size may be 
different + res.header("Content-Length", this._file?.size.toString()); } }); - }); + const content = await this.content(); + function handleStreamError(error: Error) { + if (!content.closed && !content.destroyed) { + content.destroy(); + } + reject(error); + } + content + .on("error", handleStreamError) + .pipe(anonymizer) + .pipe(res) + .on("error", handleStreamError) + .on("close", () => { + if (!content.closed && !content.destroyed) { + content.destroy(); + } + resolve(); + }); + } catch (error) { + handleError(error, res); + } + }); } } diff --git a/src/core/GitHubUtils.ts b/src/core/GitHubUtils.ts index 4366eb0..f16877e 100644 --- a/src/core/GitHubUtils.ts +++ b/src/core/GitHubUtils.ts @@ -1,4 +1,3 @@ -import { trace } from "@opentelemetry/api"; import { Octokit } from "@octokit/rest"; import Repository from "./Repository"; @@ -26,80 +25,74 @@ export async function checkToken(token: string) { } export async function getToken(repository: Repository) { - const span = trace.getTracer("ano-file").startSpan("GHUtils.getToken"); - span.setAttribute("repoId", repository.repoId); console.log("getToken", repository.repoId); - try { - // if (repository.model.source.accessToken) { - // // only check the token if the repo has been visited less than 10 minutes ago - // if ( - // repository.status == RepositoryStatus.READY && - // repository.model.lastView > new Date(Date.now() - 1000 * 60 * 10) - // ) { - // return repository.model.source.accessToken; - // } else if (await checkToken(repository.model.source.accessToken)) { - // return repository.model.source.accessToken; - // } - // } - if (!repository.owner.model.accessTokens?.github) { - const query = await UserModel.findById(repository.owner.id, { - accessTokens: 1, - accessTokenDates: 1, - }); - if (query?.accessTokens) { - repository.owner.model.accessTokens = query.accessTokens; - repository.owner.model.accessTokenDates = query.accessTokenDates; - } + // if (repository.model.source.accessToken) { + // // only check 
the token if the repo has been visited less than 10 minutes ago + // if ( + // repository.status == RepositoryStatus.READY && + // repository.model.lastView > new Date(Date.now() - 1000 * 60 * 10) + // ) { + // return repository.model.source.accessToken; + // } else if (await checkToken(repository.model.source.accessToken)) { + // return repository.model.source.accessToken; + // } + // } + if (!repository.owner.model.accessTokens?.github) { + const query = await UserModel.findById(repository.owner.id, { + accessTokens: 1, + accessTokenDates: 1, + }); + if (query?.accessTokens) { + repository.owner.model.accessTokens = query.accessTokens; + repository.owner.model.accessTokenDates = query.accessTokenDates; } - const ownerAccessToken = repository.owner.model.accessTokens?.github; - if (ownerAccessToken) { - const tokenAge = repository.owner.model.accessTokenDates?.github; - // if the token is older than 7 days, refresh it - if ( - !tokenAge || - tokenAge < new Date(Date.now() - 1000 * 60 * 60 * 24 * 7) - ) { - const url = `https://api.github.com/applications/${config.CLIENT_ID}/token`; - const headers = { - Accept: "application/vnd.github+json", - "X-GitHub-Api-Version": "2022-11-28", - }; - - const res = await fetch(url, { - method: "PATCH", - body: JSON.stringify({ - access_token: ownerAccessToken, - }), - credentials: "include", - headers: { - ...headers, - Authorization: - "Basic " + - Buffer.from( - config.CLIENT_ID + ":" + config.CLIENT_SECRET - ).toString("base64"), - }, - }); - const resBody = (await res.json()) as { token: string }; - repository.owner.model.accessTokens.github = resBody.token; - if (!repository.owner.model.accessTokenDates) { - repository.owner.model.accessTokenDates = { - github: new Date(), - }; - } else { - repository.owner.model.accessTokenDates.github = new Date(); - } - await repository.owner.model.save(); - return resBody.token; - } - const check = await checkToken(ownerAccessToken); - if (check) { - repository.model.source.accessToken 
= ownerAccessToken; - return ownerAccessToken; - } - } - return config.GITHUB_TOKEN; - } finally { - span.end(); } + const ownerAccessToken = repository.owner.model.accessTokens?.github; + if (ownerAccessToken) { + const tokenAge = repository.owner.model.accessTokenDates?.github; + // if the token is older than 7 days, refresh it + if ( + !tokenAge || + tokenAge < new Date(Date.now() - 1000 * 60 * 60 * 24 * 7) + ) { + const url = `https://api.github.com/applications/${config.CLIENT_ID}/token`; + const headers = { + Accept: "application/vnd.github+json", + "X-GitHub-Api-Version": "2022-11-28", + }; + + const res = await fetch(url, { + method: "PATCH", + body: JSON.stringify({ + access_token: ownerAccessToken, + }), + credentials: "include", + headers: { + ...headers, + Authorization: + "Basic " + + Buffer.from( + config.CLIENT_ID + ":" + config.CLIENT_SECRET + ).toString("base64"), + }, + }); + const resBody = (await res.json()) as { token: string }; + repository.owner.model.accessTokens.github = resBody.token; + if (!repository.owner.model.accessTokenDates) { + repository.owner.model.accessTokenDates = { + github: new Date(), + }; + } else { + repository.owner.model.accessTokenDates.github = new Date(); + } + await repository.owner.model.save(); + return resBody.token; + } + const check = await checkToken(ownerAccessToken); + if (check) { + repository.model.source.accessToken = ownerAccessToken; + return ownerAccessToken; + } + } + return config.GITHUB_TOKEN; } diff --git a/src/core/Repository.ts b/src/core/Repository.ts index 5ecfcc3..7113928 100644 --- a/src/core/Repository.ts +++ b/src/core/Repository.ts @@ -20,7 +20,6 @@ import { getRepositoryFromGitHub, GitHubRepository, } from "./source/GitHubRepository"; -import { trace } from "@opentelemetry/api"; import { getToken } from "./GitHubUtils"; import { FILE_TYPE } from "./storage/Storage"; import config from "../config"; @@ -152,44 +151,38 @@ export default class Repository { force: false, } ): Promise { - const 
span = trace.getTracer("ano-file").startSpan("Repository.files"); - span.setAttribute("repoId", this.repoId); - try { - const hasFile = await FileModel.exists({ repoId: this.repoId }).exec(); - if (!hasFile || opt.force) { - await FileModel.deleteMany({ repoId: this.repoId }).exec(); - const files = await this.source.getFiles(opt.progress); - files.forEach((f) => (f.repoId = this.repoId)); - await FileModel.insertMany(files); + const hasFile = await FileModel.exists({ repoId: this.repoId }).exec(); + if (!hasFile || opt.force) { + await FileModel.deleteMany({ repoId: this.repoId }).exec(); + const files = await this.source.getFiles(opt.progress); + files.forEach((f) => (f.repoId = this.repoId)); + await FileModel.insertMany(files); - this._model.size = { storage: 0, file: 0 }; - await this.computeSize(); - } - if (opt.path?.includes(config.ANONYMIZATION_MASK)) { - const f = new AnonymizedFile({ - repository: this, - anonymizedPath: opt.path, - }); - opt.path = await f.originalPath(); - } - - let pathQuery: string | RegExp | undefined = opt.path - ? new RegExp(`^${opt.path}`) - : undefined; - if (opt.recursive === false) { - pathQuery = opt.path ? new RegExp(`^${opt.path}$`) : ""; - } - - const query: FilterQuery = { - repoId: this.repoId, - }; - if (pathQuery !== undefined) { - query.path = pathQuery; - } - return await FileModel.find(query).exec(); - } finally { - span.end(); + this._model.size = { storage: 0, file: 0 }; + await this.computeSize(); } + if (opt.path?.includes(config.ANONYMIZATION_MASK)) { + const f = new AnonymizedFile({ + repository: this, + anonymizedPath: opt.path, + }); + opt.path = await f.originalPath(); + } + + let pathQuery: string | RegExp | undefined = opt.path + ? new RegExp(`^${opt.path}`) + : undefined; + if (opt.recursive === false) { + pathQuery = opt.path ? 
new RegExp(`^${opt.path}$`) : ""; + } + + const query: FilterQuery = { + repoId: this.repoId, + }; + if (pathQuery !== undefined) { + query.path = pathQuery; + } + return await FileModel.find(query).exec(); } /** @@ -276,11 +269,6 @@ export default class Repository { * @returns void */ async updateIfNeeded(opt?: { force: boolean }): Promise { - const span = trace - .getTracer("ano-file") - .startSpan("Repository.updateIfNeeded"); - span.setAttribute("repoId", this.repoId); - if ( this._model.options.expirationMode !== "never" && this.status != RepositoryStatus.EXPIRED && @@ -344,8 +332,6 @@ export default class Repository { this.status == RepositoryStatus.READY ) { console.log(`[UPDATE] ${this._model.repoId} is up to date`); - span.setAttribute("status", "up_to_date"); - span.end(); return; } this._model.source.commit = newCommit; @@ -368,8 +354,6 @@ export default class Repository { ); await this.updateStatus(RepositoryStatus.ERROR, "branch_not_found"); await this.resetSate(); - span.setAttribute("status", "branch_not_found"); - span.end(); throw new AnonymousError("branch_not_found", { object: this, }); @@ -386,7 +370,6 @@ export default class Repository { }); } } - span.end(); } /** * Download the require state for the repository to work @@ -394,10 +377,7 @@ export default class Repository { * @returns void */ async anonymize(progress?: (status: string) => void) { - const span = trace.getTracer("ano-file").startSpan("Repository.anonymize"); - span.setAttribute("repoId", this.repoId); if (this.status === RepositoryStatus.READY) { - span.end(); return; } this.model.increment(); @@ -418,23 +398,16 @@ export default class Repository { } await this.updateStatus(RepositoryStatus.READY); await this.computeSize(); - span.end(); } /** * Update the last view and view count */ async countView() { - const span = trace.getTracer("ano-file").startSpan("Repository.countView"); - span.setAttribute("repoId", this.repoId); - try { - this._model.lastView = new Date(); - 
this._model.pageView = (this._model.pageView || 0) + 1; - if (!isConnected) return this.model; - await this._model.save(); - } finally { - span.end(); - } + this._model.lastView = new Date(); + this._model.pageView = (this._model.pageView || 0) + 1; + if (!isConnected) return this.model; + await this._model.save(); } /** @@ -443,54 +416,36 @@ export default class Repository { * @param errorMessage a potential error message to display */ async updateStatus(status: RepositoryStatus, statusMessage?: string) { - const span = trace - .getTracer("ano-file") - .startSpan("Repository.updateStatus"); - span.setAttribute("repoId", this.repoId); - span.setAttribute("status", status); - span.setAttribute("statusMessage", statusMessage || ""); - try { - if (!status) return this.model; - this._model.status = status; - this._model.statusDate = new Date(); - this._model.statusMessage = statusMessage; - if (!isConnected) return this.model; - await this._model.save(); - } finally { - span.end(); - } + if (!status) return this.model; + this._model.status = status; + this._model.statusDate = new Date(); + this._model.statusMessage = statusMessage; + if (!isConnected) return this.model; + await this._model.save(); } /** * Expire the repository */ async expire() { - const span = trace.getTracer("ano-file").startSpan("Repository.expire"); - span.setAttribute("repoId", this.repoId); await this.updateStatus(RepositoryStatus.EXPIRING); await this.resetSate(); await this.updateStatus(RepositoryStatus.EXPIRED); - span.end(); } /** * Remove the repository */ async remove() { - const span = trace.getTracer("ano-file").startSpan("Repository.remove"); - span.setAttribute("repoId", this.repoId); await this.updateStatus(RepositoryStatus.REMOVING); await this.resetSate(); await this.updateStatus(RepositoryStatus.REMOVED); - span.end(); } /** * Reset/delete the state of the repository */ async resetSate(status?: RepositoryStatus, statusMessage?: string) { - const span = 
trace.getTracer("ano-file").startSpan("Repository.resetState"); - span.setAttribute("repoId", this.repoId); // remove attribute this._model.size = { storage: 0, file: 0 }; if (status) { @@ -502,7 +457,6 @@ export default class Repository { this.removeCache(), ]); console.log(`[RESET] ${this._model.repoId} has been reset`); - span.end(); } /** @@ -510,22 +464,14 @@ export default class Repository { * @returns */ async removeCache() { - const span = trace - .getTracer("ano-file") - .startSpan("Repository.removeCache"); - span.setAttribute("repoId", this.repoId); - try { - await storage.rm(this.repoId); - this.model.isReseted = true; - if (isConnected) { - try { - await this.model.save(); - } catch (error) { - console.error("[ERROR] removeCache save", error); - } + await storage.rm(this.repoId); + this.model.isReseted = true; + if (isConnected) { + try { + await this.model.save(); + } catch (error) { + console.error("[ERROR] removeCache save", error); } - } finally { - span.end(); } } @@ -544,39 +490,31 @@ export default class Repository { */ file: number; }> { - const span = trace - .getTracer("ano-file") - .startSpan("Repository.computeSize"); - span.setAttribute("repoId", this.repoId); - try { - if (this.status !== RepositoryStatus.READY) - return { storage: 0, file: 0 }; - if (this._model.size.file) return this._model.size; - const res = await FileModel.aggregate([ - { - $match: { - repoId: this.repoId, - }, + if (this.status !== RepositoryStatus.READY) + return { storage: 0, file: 0 }; + if (this._model.size.file) return this._model.size; + const res = await FileModel.aggregate([ + { + $match: { + repoId: this.repoId, }, - { - $group: { - _id: "$repoId", - storage: { $sum: "$size" }, - file: { $sum: 1 }, - }, + }, + { + $group: { + _id: "$repoId", + storage: { $sum: "$size" }, + file: { $sum: 1 }, }, - ]); - this._model.size = { - storage: res[0]?.storage || 0, - file: res[0]?.file || 0, - }; - if (isConnected) { - await this._model.save(); - } - return 
this._model.size; - } finally { - span.end(); + }, + ]); + this._model.size = { + storage: res[0]?.storage || 0, + file: res[0]?.file || 0, + }; + if (isConnected) { + await this._model.save(); } + return this._model.size; } /** @@ -585,20 +523,14 @@ export default class Repository { * @returns conference of the repository */ async conference(): Promise { - const span = trace.getTracer("ano-file").startSpan("Repository.conference"); - span.setAttribute("repoId", this.repoId); - try { - if (!this._model.conference) { - return null; - } - const conference = await ConferenceModel.findOne({ - conferenceID: this._model.conference, - }); - if (conference) return new Conference(conference); + if (!this._model.conference) { return null; - } finally { - span.end(); } + const conference = await ConferenceModel.findOne({ + conferenceID: this._model.conference, + }); + if (conference) return new Conference(conference); + return null; } /***** Getters ********/ diff --git a/src/core/User.ts b/src/core/User.ts index 59b8aed..ca42b58 100644 --- a/src/core/User.ts +++ b/src/core/User.ts @@ -1,5 +1,3 @@ -import { trace } from "@opentelemetry/api"; - import AnonymizedRepositoryModel from "./model/anonymizedRepositories/anonymizedRepositories.model"; import RepositoryModel from "./model/repositories/repositories.model"; import { IUserDocument } from "./model/users/users.types"; @@ -57,10 +55,6 @@ export default class User { */ force: boolean; }): Promise { - const span = trace - .getTracer("ano-file") - .startSpan("User.getGitHubRepositories"); - span.setAttribute("username", this.username); if ( !this._model.repositories || this._model.repositories.length == 0 || @@ -111,13 +105,11 @@ export default class User { // have the model await this._model.save(); - span.end(); return repositories.map((r) => new GitHubRepository(r)); } else { const out = ( await RepositoryModel.find({ _id: { $in: this._model.repositories } }) ).map((i) => new GitHubRepository(i)); - span.end(); return out; } 
} @@ -127,8 +119,6 @@ export default class User { * @returns the list of anonymized repositories */ async getRepositories() { - const span = trace.getTracer("ano-file").startSpan("User.getRepositories"); - span.setAttribute("username", this.username); const repositories = ( await AnonymizedRepositoryModel.find({ owner: this.id, @@ -147,7 +137,6 @@ export default class User { } } await Promise.all(promises); - span.end(); return repositories; } /** @@ -155,8 +144,6 @@ export default class User { * @returns the list of anonymized repositories */ async getPullRequests() { - const span = trace.getTracer("ano-file").startSpan("User.getPullRequests"); - span.setAttribute("username", this.username); const pullRequests = ( await AnonymizedPullRequestModel.find({ owner: this.id, @@ -175,7 +162,6 @@ export default class User { } } await Promise.all(promises); - span.end(); return pullRequests; } diff --git a/src/core/anonymize-utils.ts b/src/core/anonymize-utils.ts index 3bd647a..8d4c523 100644 --- a/src/core/anonymize-utils.ts +++ b/src/core/anonymize-utils.ts @@ -1,7 +1,6 @@ import { basename } from "path"; import { Transform, Readable } from "stream"; import { isText } from "istextorbinary"; -import { trace } from "@opentelemetry/api"; import config from "../config"; @@ -49,30 +48,24 @@ export class AnonymizeTransformer extends Transform { } _transform(chunk: Buffer, encoding: string, callback: () => void) { - trace - .getTracer("ano-file") - .startActiveSpan("AnonymizeTransformer.transform", async (span) => { - span.setAttribute("path", this.opt.filePath); - if (this.isText === null) { - this.isText = isTextFile(this.opt.filePath, chunk); - } - if (this.isText) { - const content = this.anonimizer.anonymize(chunk.toString()); - if (this.anonimizer.wasAnonymized) { - chunk = Buffer.from(content); - } - } + if (this.isText === null) { + this.isText = isTextFile(this.opt.filePath, chunk); + } + if (this.isText) { + const content = this.anonimizer.anonymize(chunk.toString()); 
+ if (this.anonimizer.wasAnonymized) { + chunk = Buffer.from(content); + } + } - this.emit("transform", { - isText: this.isText, - wasAnonimized: this.wasAnonimized, - chunk, - }); + this.emit("transform", { + isText: this.isText, + wasAnonimized: this.wasAnonimized, + chunk, + }); - this.push(chunk); - span.end(); - callback(); - }); + this.push(chunk); + callback(); } } @@ -179,48 +172,30 @@ export class ContentAnonimizer { } anonymize(content: string) { - const span = trace - .getTracer("ano-file") - .startSpan("ContentAnonimizer.anonymize"); - try { - content = this.removeImage(content); - span.addEvent("removeImage"); - content = this.removeLink(content); - span.addEvent("removeLink"); - content = this.replaceGitHubSelfLinks(content); - span.addEvent("replaceGitHubSelfLinks"); - content = this.replaceTerms(content); - span.addEvent("replaceTerms"); - return content; - } finally { - span.end(); - } + content = this.removeImage(content); + content = this.removeLink(content); + content = this.replaceGitHubSelfLinks(content); + content = this.replaceTerms(content); + return content; } } export function anonymizePath(path: string, terms: string[]) { - return trace - .getTracer("ano-file") - .startActiveSpan("utils.anonymizePath", (span) => { - span.setAttribute("path", path); - for (let i = 0; i < terms.length; i++) { - let term = terms[i]; - if (term.trim() == "") { - continue; - } - try { - new RegExp(term, "gi"); - } catch { - // escape regex characters - term = term.replace(/[-[\]{}()*+?.,\\^$|#]/g, "\\$&"); - } - path = path.replace( - new RegExp(term, "gi"), - config.ANONYMIZATION_MASK + "-" + (i + 1) - ); - } - span.setAttribute("return", path); - span.end(); - return path; - }); + for (let i = 0; i < terms.length; i++) { + let term = terms[i]; + if (term.trim() == "") { + continue; + } + try { + new RegExp(term, "gi"); + } catch { + // escape regex characters + term = term.replace(/[-[\]{}()*+?.,\\^$|#]/g, "\\$&"); + } + path = path.replace( + new 
RegExp(term, "gi"), + config.ANONYMIZATION_MASK + "-" + (i + 1) + ); + } + return path; } diff --git a/src/core/source/GitHubDownload.ts b/src/core/source/GitHubDownload.ts index faad582..0006b12 100644 --- a/src/core/source/GitHubDownload.ts +++ b/src/core/source/GitHubDownload.ts @@ -4,7 +4,6 @@ import { OctokitResponse } from "@octokit/types"; import storage from "../storage"; import GitHubBase, { GitHubBaseData } from "./GitHubBase"; -import { trace } from "@opentelemetry/api"; import { FILE_TYPE } from "../storage/Storage"; import { octokit } from "../GitHubUtils"; import AnonymousError from "../AnonymousError"; @@ -27,47 +26,39 @@ export default class GitHubDownload extends GitHubBase { } async download(progress?: (status: string) => void) { - const span = trace.getTracer("ano-file").startSpan("GHDownload.download"); - span.setAttribute("repoId", this.data.repoId); + let response: OctokitResponse; try { - let response: OctokitResponse; - try { - response = await this.getZipUrl(); - } catch (error) { - span.recordException(error as Error); - throw new AnonymousError("repo_not_found", { - httpStatus: (error as any).status || 404, - object: this.data, - cause: error as Error, - }); - } - await storage.mk(this.data.repoId); - try { - const downloadStream = got.stream(response.url); - downloadStream.addListener( - "downloadProgress", - (p: { transferred?: number }) => { - if (progress && p.transferred) { - progress("Repository download: " + humanFileSize(p.transferred)); - } + response = await this.getZipUrl(); + } catch (error) { + throw new AnonymousError("repo_not_found", { + httpStatus: (error as any).status || 404, + object: this.data, + cause: error as Error, + }); + } + await storage.mk(this.data.repoId); + try { + const downloadStream = got.stream(response.url); + downloadStream.addListener( + "downloadProgress", + (p: { transferred?: number }) => { + if (progress && p.transferred) { + progress("Repository download: " + humanFileSize(p.transferred)); } - 
); - await storage.extractZip( - this.data.repoId, - "", - downloadStream, - this.type - ); - } catch (error) { - span.recordException(error as Error); - throw new AnonymousError("unable_to_download", { - httpStatus: 500, - cause: error as Error, - object: this.data, - }); - } - } finally { - span.end(); + } + ); + await storage.extractZip( + this.data.repoId, + "", + downloadStream, + this.type + ); + } catch (error) { + throw new AnonymousError("unable_to_download", { + httpStatus: 500, + cause: error as Error, + object: this.data, + }); } } @@ -75,29 +66,21 @@ export default class GitHubDownload extends GitHubBase { file: AnonymizedFile, progress?: (status: string) => void ): Promise { - const span = trace - .getTracer("ano-file") - .startSpan("GHDownload.getFileContent"); - span.setAttribute("repoId", this.data.repoId); - try { - const exists = await storage.exists(this.data.repoId, file.filePath); - if (exists === FILE_TYPE.FILE) { - return storage.read(this.data.repoId, file.filePath); - } else if (exists === FILE_TYPE.FOLDER) { - throw new AnonymousError("folder_not_supported", { - httpStatus: 400, - object: file, - }); - } - // will throw an error if the file is not in the repository - await file.originalPath(); - - // the cache is not ready, we need to download the repository - await this.download(progress); + const exists = await storage.exists(this.data.repoId, file.filePath); + if (exists === FILE_TYPE.FILE) { return storage.read(this.data.repoId, file.filePath); - } finally { - span.end(); + } else if (exists === FILE_TYPE.FOLDER) { + throw new AnonymousError("folder_not_supported", { + httpStatus: 400, + object: file, + }); } + // will throw an error if the file is not in the repository + await file.originalPath(); + + // the cache is not ready, we need to download the repository + await this.download(progress); + return storage.read(this.data.repoId, file.filePath); } async getFiles(progress?: (status: string) => void) { diff --git 
a/src/core/source/GitHubRepository.ts b/src/core/source/GitHubRepository.ts index f12bdd9..5071f23 100644 --- a/src/core/source/GitHubRepository.ts +++ b/src/core/source/GitHubRepository.ts @@ -1,7 +1,6 @@ import { Branch } from "../types"; import * as gh from "parse-github-url"; import { RestEndpointMethodTypes } from "@octokit/rest"; -import { trace } from "@opentelemetry/api"; import AnonymousError from "../AnonymousError"; import { isConnected } from "../../server/database"; @@ -55,81 +54,64 @@ export class GitHubRepository { accessToken: string; } ) { - const span = trace - .getTracer("ano-file") - .startSpan("GHRepository.getCommitInfo"); - span.setAttribute("owner", this.owner); - span.setAttribute("repo", this.repo); - try { - const oct = octokit(opt.accessToken); - const commit = await oct.repos.getCommit({ - owner: this.owner, - repo: this.repo, - ref: sha, - }); - return commit.data; - } finally { - span.end(); - } + const oct = octokit(opt.accessToken); + const commit = await oct.repos.getCommit({ + owner: this.owner, + repo: this.repo, + ref: sha, + }); + return commit.data; } async branches(opt: { accessToken: string; force?: boolean; }): Promise { - const span = trace.getTracer("ano-file").startSpan("GHRepository.branches"); - span.setAttribute("owner", this.owner); - span.setAttribute("repo", this.repo); - try { - if ( - !this._data.branches || - this._data.branches.length == 0 || - opt?.force === true - ) { - // get the list of repo from github - const oct = octokit(opt.accessToken); - try { - const branches = ( - await oct.paginate("GET /repos/{owner}/{repo}/branches", { - owner: this.owner, - repo: this.repo, - per_page: 100, - }) - ).map((b) => { - return { - name: b.name, - commit: b.commit.sha, - readme: this._data.branches?.filter( - (f: Branch) => f.name == b.name - )[0]?.readme, - } as Branch; - }); - this._data.branches = branches; - if (isConnected) { - await RepositoryModel.updateOne( - { externalId: this.id }, - { $set: { branches } } - 
); - } - } catch (error) { - span.recordException(error as Error); - throw new AnonymousError("repo_not_found", { - httpStatus: (error as any).status, - cause: error as Error, - object: this, - }); + if ( + !this._data.branches || + this._data.branches.length == 0 || + opt?.force === true + ) { + // get the list of repo from github + const oct = octokit(opt.accessToken); + try { + const branches = ( + await oct.paginate("GET /repos/{owner}/{repo}/branches", { + owner: this.owner, + repo: this.repo, + per_page: 100, + }) + ).map((b) => { + return { + name: b.name, + commit: b.commit.sha, + readme: this._data.branches?.filter( + (f: Branch) => f.name == b.name + )[0]?.readme, + } as Branch; + }); + this._data.branches = branches; + if (isConnected) { + await RepositoryModel.updateOne( + { externalId: this.id }, + { $set: { branches } } + ); } - } else if (isConnected) { - const q = await RepositoryModel.findOne({ externalId: this.id }).select( - "branches" - ); - this._data.branches = q?.branches; + } catch (error) { + throw new AnonymousError("repo_not_found", { + httpStatus: (error as any).status, + cause: error as Error, + object: this, + }); } - - return this._data.branches || []; - } finally { - span.end(); + } else if (isConnected) { + const q = await RepositoryModel.findOne({ externalId: this.id }).select( + "branches" + ); + this._data.branches = q?.branches; } + + return this._data.branches || []; } async readme(opt: { @@ -137,60 +119,52 @@ export class GitHubRepository { force?: boolean; accessToken: string; }): Promise { - const span = trace.getTracer("ano-file").startSpan("GHRepository.readme"); - span.setAttribute("owner", this.owner); - span.setAttribute("repo", this.repo); - try { - if (!opt.branch) opt.branch = this._data.defaultBranch || "master"; + if (!opt.branch) opt.branch = this._data.defaultBranch || "master"; - const model = await RepositoryModel.findOne({ - externalId: this.id, - }).select("branches"); + const model = await 
RepositoryModel.findOne({ + externalId: this.id, + }).select("branches"); - if (!model) { - throw new AnonymousError("repo_not_found", { httpStatus: 404 }); - } + if (!model) { + throw new AnonymousError("repo_not_found", { httpStatus: 404 }); + } - this._data.branches = await this.branches(opt); - model.branches = this._data.branches; + this._data.branches = await this.branches(opt); + model.branches = this._data.branches; - const selected = model.branches.filter((f) => f.name == opt.branch)[0]; - if (selected && (!selected.readme || opt?.force === true)) { - // get the list of repo from github - const oct = octokit(opt.accessToken); - try { - const ghRes = await oct.repos.getReadme({ - owner: this.owner, - repo: this.repo, - ref: selected?.commit, - }); - const readme = Buffer.from( - ghRes.data.content, - ghRes.data.encoding as BufferEncoding - ).toString("utf-8"); - selected.readme = readme; - await model.save(); - } catch (error) { - span.recordException(error as Error); - throw new AnonymousError("readme_not_available", { - httpStatus: 404, - cause: error as Error, - object: this, - }); - } - } - - if (!selected) { + const selected = model.branches.filter((f) => f.name == opt.branch)[0]; + if (selected && (!selected.readme || opt?.force === true)) { + // get the list of repo from github + const oct = octokit(opt.accessToken); + try { + const ghRes = await oct.repos.getReadme({ + owner: this.owner, + repo: this.repo, + ref: selected?.commit, + }); + const readme = Buffer.from( + ghRes.data.content, + ghRes.data.encoding as BufferEncoding + ).toString("utf-8"); + selected.readme = readme; + await model.save(); + } catch (error) { throw new AnonymousError("readme_not_available", { httpStatus: 404, + cause: error as Error, object: this, }); } - - return selected.readme; - } finally { - span.end(); } + + if (!selected) { + throw new AnonymousError("readme_not_available", { + httpStatus: 404, + object: this, + }); + } + + return selected.readme; } public get 
owner(): string { @@ -235,60 +209,44 @@ export async function getRepositoryFromGitHub(opt: { accessToken: string; force?: boolean; }) { - const span = trace - .getTracer("ano-file") - .startSpan("GHRepository.getRepositoryFromGitHub"); - span.setAttribute("owner", opt.owner); - span.setAttribute("repo", opt.repo); - try { - if (opt.repo.indexOf(".git") > -1) { - opt.repo = opt.repo.replace(".git", ""); - } - let dbModel = null; - if (opt.repositoryID) { - dbModel = isConnected - ? await RepositoryModel.findById(opt.repositoryID) - : null; - opt.owner = dbModel?.name?.split("/")[0] || opt.owner; - opt.repo = dbModel?.name?.split("/")[1] || opt.repo; - } else { - dbModel = isConnected - ? await RepositoryModel.findOne({ - name: opt.owner + "/" + opt.repo, - }) - : null; - } - if (dbModel && !opt.force) { - return new GitHubRepository(dbModel); - } - const oct = octokit(opt.accessToken); - let r: RestEndpointMethodTypes["repos"]["get"]["response"]["data"]; - try { - r = ( - await oct.repos.get({ - owner: opt.owner, - repo: opt.repo, + if (opt.repo.indexOf(".git") > -1) { + opt.repo = opt.repo.replace(".git", ""); + } + let dbModel = null; + if (opt.repositoryID) { + dbModel = isConnected + ? await RepositoryModel.findById(opt.repositoryID) + : null; + opt.owner = dbModel?.name?.split("/")[0] || opt.owner; + opt.repo = dbModel?.name?.split("/")[1] || opt.repo; + } else { + dbModel = isConnected + ? 
await RepositoryModel.findOne({ + name: opt.owner + "/" + opt.repo, }) - ).data; - } catch (error) { - span.recordException(error as Error); - if ( - error instanceof Error && - error.message.includes( - "organization has enabled OAuth App access restrictions" - ) - ) { - throw new AnonymousError("repo_access_limited", { - httpStatus: 403, - object: { - owner: opt.owner, - repo: opt.repo, - }, - cause: error as Error, - }); - } - throw new AnonymousError("repo_not_found", { - httpStatus: (error as any).status, + : null; + } + if (dbModel && !opt.force) { + return new GitHubRepository(dbModel); + } + const oct = octokit(opt.accessToken); + let r: RestEndpointMethodTypes["repos"]["get"]["response"]["data"]; + try { + r = ( + await oct.repos.get({ + owner: opt.owner, + repo: opt.repo, + }) + ).data; + } catch (error) { + if ( + error instanceof Error && + error.message.includes( + "organization has enabled OAuth App access restrictions" + ) + ) { + throw new AnonymousError("repo_access_limited", { + httpStatus: 403, object: { owner: opt.owner, repo: opt.repo, @@ -296,32 +254,38 @@ export async function getRepositoryFromGitHub(opt: { cause: error as Error, }); } - if (!r) - throw new AnonymousError("repo_not_found", { - httpStatus: 404, - object: { - owner: opt.owner, - repo: opt.repo, - }, - }); - const model = dbModel || new RepositoryModel({ externalId: "gh_" + r.id }); - model.name = r.full_name; - model.url = r.html_url; - model.size = r.size; - model.defaultBranch = r.default_branch; - model.hasPage = r.has_pages; - if (model.hasPage) { - const ghPageRes = await oct.repos.getPages({ + throw new AnonymousError("repo_not_found", { + httpStatus: (error as any).status, + object: { owner: opt.owner, repo: opt.repo, - }); - model.pageSource = ghPageRes.data.source; - } - if (isConnected) { - await model.save(); - } - return new GitHubRepository(model); - } finally { - span.end(); + }, + cause: error as Error, + }); } + if (!r) + throw new 
AnonymousError("repo_not_found", { + httpStatus: 404, + object: { + owner: opt.owner, + repo: opt.repo, + }, + }); + const model = dbModel || new RepositoryModel({ externalId: "gh_" + r.id }); + model.name = r.full_name; + model.url = r.html_url; + model.size = r.size; + model.defaultBranch = r.default_branch; + model.hasPage = r.has_pages; + if (model.hasPage) { + const ghPageRes = await oct.repos.getPages({ + owner: opt.owner, + repo: opt.repo, + }); + model.pageSource = ghPageRes.data.source; + } + if (isConnected) { + await model.save(); + } + return new GitHubRepository(model); } diff --git a/src/core/source/GitHubStream.ts b/src/core/source/GitHubStream.ts index 4ee15ae..7dad5fa 100644 --- a/src/core/source/GitHubStream.ts +++ b/src/core/source/GitHubStream.ts @@ -7,7 +7,6 @@ import { basename, dirname } from "path"; import * as stream from "stream"; import AnonymousError from "../AnonymousError"; -import { trace } from "@opentelemetry/api"; import { FILE_TYPE } from "../storage/Storage"; import { octokit } from "../GitHubUtils"; import FileModel from "../model/files/files.model"; @@ -21,8 +20,6 @@ export default class GitHubStream extends GitHubBase { } downloadFile(token: string, sha: string) { - const span = trace.getTracer("ano-file").startSpan("GHStream.downloadFile"); - span.setAttribute("sha", sha); const oct = octokit(token); try { const { url } = oct.rest.git.getBlob.endpoint({ @@ -40,14 +37,11 @@ export default class GitHubStream extends GitHubBase { }); } catch (error) { console.error(error); - // span.recordException(error as Error); throw new AnonymousError("repo_not_accessible", { httpStatus: 404, object: this.data, cause: error as Error, }); - } finally { - span.end(); } } @@ -56,12 +50,6 @@ export default class GitHubStream extends GitHubBase { repoId: string, fileSha: () => Promise | string ) { - const span = trace - .getTracer("ano-file") - .startSpan("GHStream.getFileContent"); - span.setAttribute("repoId", repoId); - 
span.setAttribute("file", filePath); - const fileInfo = await storage.exists(repoId, filePath); if (fileInfo == FILE_TYPE.FILE) { return storage.read(repoId, filePath); @@ -76,10 +64,6 @@ export default class GitHubStream extends GitHubBase { await fileSha() ); - content.on("close", () => { - span.end(); - }); - // duplicate the stream to write it to the storage const stream1 = content.pipe(new stream.PassThrough()); const stream2 = content.pipe(new stream.PassThrough()); @@ -99,45 +83,30 @@ export default class GitHubStream extends GitHubBase { } async getFileContent(file: AnonymizedFile): Promise { - const span = trace - .getTracer("ano-file") - .startSpan("GHStream.getFileContent"); - span.setAttribute("repoId", file.repository.repoId); - span.setAttribute("file", file.anonymizedPath); try { - try { - void file.filePath; - } catch (_) { - // compute the original path if ambiguous - await file.originalPath(); - } - return this.getFileContentCache( - file.filePath, - file.repository.repoId, - async () => { - const fileSha = await file.sha(); - if (!fileSha) { - throw new AnonymousError("file_not_accessible", { - httpStatus: 404, - object: file, - }); - } - return fileSha; - } - ); - } finally { - span.end(); + void file.filePath; + } catch (_) { + // compute the original path if ambiguous + await file.originalPath(); } + return this.getFileContentCache( + file.filePath, + file.repository.repoId, + async () => { + const fileSha = await file.sha(); + if (!fileSha) { + throw new AnonymousError("file_not_accessible", { + httpStatus: 404, + object: file, + }); + } + return fileSha; + } + ); } async getFiles(progress?: (status: string) => void) { - const span = trace.getTracer("ano-file").startSpan("GHStream.getFiles"); - span.setAttribute("repoId", this.data.repoId); - try { - return this.getTruncatedTree(this.data.commit, progress); - } finally { - span.end(); - } + return this.getTruncatedTree(this.data.commit, progress); } private async getGHTree( @@ -145,25 +114,19 
@@ export default class GitHubStream extends GitHubBase { count = { request: 0, file: 0 }, opt = { recursive: true, callback: () => {} } ) { - const span = trace.getTracer("ano-file").startSpan("GHStream.getGHTree"); - span.setAttribute("sha", sha); - try { - const oct = octokit(await this.data.getToken()); - const ghRes = await oct.git.getTree({ - owner: this.data.organization, - repo: this.data.repoName, - tree_sha: sha, - recursive: opt.recursive === true ? "1" : undefined, - }); - count.request++; - count.file += ghRes.data.tree.length; - if (opt.callback) { - opt.callback(); - } - return ghRes.data; - } finally { - span.end(); + const oct = octokit(await this.data.getToken()); + const ghRes = await oct.git.getTree({ + owner: this.data.organization, + repo: this.data.repoName, + tree_sha: sha, + recursive: opt.recursive === true ? "1" : undefined, + }); + count.request++; + count.file += ghRes.data.tree.length; + if (opt.callback) { + opt.callback(); } + return ghRes.data; } private async getTruncatedTree( @@ -175,65 +138,56 @@ export default class GitHubStream extends GitHubBase { request: 0, file: 0, }; - const span = trace - .getTracer("ano-file") - .startSpan("GHStream.getTruncatedTree"); - span.setAttribute("sha", sha); - span.setAttribute("parentPath", parentPath); const output: IFile[] = []; + let data = null; try { - let data = null; - try { - data = await this.getGHTree(sha, count, { - recursive: false, - callback: () => { - if (progress) { - progress("List file: " + count.file); - } - }, - }); - output.push(...this.tree2Tree(data.tree, parentPath)); - } catch (error) { - console.log(error); - if ((error as any).status == 409 || (error as any).status == 404) { - // empty repo - data = { tree: [] }; - } else { - throw new AnonymousError("repo_not_found", { - httpStatus: (error as any).status || 404, - object: this.data, - cause: error as Error, - }); - } - } - const promises: ReturnType[] = []; - const parentPaths: string[] = []; - for (const file of 
data.tree) { - if (file.type == "tree" && file.path && file.sha) { - const elementPath = path.join(parentPath, file.path); - parentPaths.push(elementPath); - promises.push( - this.getGHTree(file.sha, count, { - recursive: true, - callback: () => { - if (progress) { - progress("List file: " + count.file); - } - }, - }) - ); - } - } - (await Promise.all(promises)).forEach((data, i) => { - if (data.truncated) { - // TODO: the tree is truncated - } - output.push(...this.tree2Tree(data.tree, parentPaths[i])); + data = await this.getGHTree(sha, count, { + recursive: false, + callback: () => { + if (progress) { + progress("List file: " + count.file); + } + }, }); - return output; - } finally { - span.end(); + output.push(...this.tree2Tree(data.tree, parentPath)); + } catch (error) { + console.log(error); + if ((error as any).status == 409 || (error as any).status == 404) { + // empty repo + data = { tree: [] }; + } else { + throw new AnonymousError("repo_not_found", { + httpStatus: (error as any).status || 404, + object: this.data, + cause: error as Error, + }); + } } + const promises: ReturnType[] = []; + const parentPaths: string[] = []; + for (const file of data.tree) { + if (file.type == "tree" && file.path && file.sha) { + const elementPath = path.join(parentPath, file.path); + parentPaths.push(elementPath); + promises.push( + this.getGHTree(file.sha, count, { + recursive: true, + callback: () => { + if (progress) { + progress("List file: " + count.file); + } + }, + }) + ); + } + } + (await Promise.all(promises)).forEach((data, i) => { + if (data.truncated) { + // TODO: the tree is truncated + } + output.push(...this.tree2Tree(data.tree, parentPaths[i])); + }); + return output; } private tree2Tree( @@ -247,25 +201,19 @@ export default class GitHubStream extends GitHubBase { }[], parentPath: string = "" ) { - const span = trace.getTracer("ano-file").startSpan("GHStream.tree2Tree"); - span.setAttribute("parentPath", parentPath); - try { - return tree.map((elem) => { - 
const fullPath = path.join(parentPath, elem.path || ""); - let pathFile = dirname(fullPath); - if (pathFile === ".") { - pathFile = ""; - } - return new FileModel({ - name: basename(fullPath), - path: pathFile, - repoId: this.data.repoId, - size: elem.size, - sha: elem.sha, - }); + return tree.map((elem) => { + const fullPath = path.join(parentPath, elem.path || ""); + let pathFile = dirname(fullPath); + if (pathFile === ".") { + pathFile = ""; + } + return new FileModel({ + name: basename(fullPath), + path: pathFile, + repoId: this.data.repoId, + size: elem.size, + sha: elem.sha, }); - } finally { - span.end(); - } + }); } } diff --git a/src/core/storage/FileSystem.ts b/src/core/storage/FileSystem.ts index eb36f7e..dd28d18 100644 --- a/src/core/storage/FileSystem.ts +++ b/src/core/storage/FileSystem.ts @@ -7,7 +7,6 @@ import { Readable, pipeline, Transform } from "stream"; import * as archiver from "archiver"; import { promisify } from "util"; import { lookup } from "mime-types"; -import { trace } from "@opentelemetry/api"; import StorageBase, { FILE_TYPE } from "./Storage"; import FileModel from "../model/files/files.model"; import { IFile } from "../model/files/files.types"; @@ -22,37 +21,20 @@ export default class FileSystem extends StorageBase { /** @override */ async exists(repoId: string, p: string = ""): Promise { const fullPath = join(config.FOLDER, this.repoPath(repoId), p); - return trace - .getTracer("ano-file") - .startActiveSpan("fs.exists", async (span) => { - span.setAttribute("path", p); - span.setAttribute("full-path", fullPath); - try { - const stat = await fs.promises.stat(fullPath); - if (stat.isDirectory()) return FILE_TYPE.FOLDER; - if (stat.isFile()) return FILE_TYPE.FILE; - } catch (_) { - // ignore file not found or not downloaded - } - span.end(); - return FILE_TYPE.NOT_FOUND; - }); + try { + const stat = await fs.promises.stat(fullPath); + if (stat.isDirectory()) return FILE_TYPE.FOLDER; + if (stat.isFile()) return FILE_TYPE.FILE; + } 
catch (_) { + // ignore file not found or not downloaded + } + return FILE_TYPE.NOT_FOUND; } /** @override */ async send(repoId: string, p: string, res: Response) { const fullPath = join(config.FOLDER, this.repoPath(repoId), p); - return trace - .getTracer("ano-file") - .startActiveSpan("fs.send", async (span) => { - span.setAttribute("path", fullPath); - res.sendFile(fullPath, { dotfiles: "allow" }, (err) => { - if (err) { - span.recordException(err); - } - span.end(); - }); - }); + res.sendFile(fullPath, { dotfiles: "allow" }); } /** @override */ @@ -79,9 +61,7 @@ export default class FileSystem extends StorageBase { p: string, data: string | Readable ): Promise { - const span = trace.getTracer("ano-file").startSpan("fs.write"); const fullPath = join(config.FOLDER, this.repoPath(repoId), p); - span.setAttribute("path", fullPath); try { await this.mk(repoId, dirname(p)); if (data instanceof Readable) { @@ -91,32 +71,21 @@ export default class FileSystem extends StorageBase { } return await fs.promises.writeFile(fullPath, data, "utf-8"); } catch (err: any) { - span.recordException(err); // throw err; - } finally { - span.end(); } } /** @override */ async rm(repoId: string, dir: string = ""): Promise { - const span = trace.getTracer("ano-file").startSpan("fs.rm"); const fullPath = join(config.FOLDER, this.repoPath(repoId), dir); - span.setAttribute("path", fullPath); - try { - await fs.promises.rm(fullPath, { - force: true, - recursive: true, - }); - } finally { - span.end(); - } + await fs.promises.rm(fullPath, { + force: true, + recursive: true, + }); } /** @override */ async mk(repoId: string, dir: string = ""): Promise { - const span = trace.getTracer("ano-file").startSpan("fs.mk"); - span.setAttribute("path", dir); const fullPath = join(config.FOLDER, this.repoPath(repoId), dir); try { await fs.promises.mkdir(fullPath, { @@ -124,11 +93,8 @@ export default class FileSystem extends StorageBase { }); } catch (err: any) { if (err.code !== "EEXIST") { - 
span.recordException(err); throw err; } - } finally { - span.end(); } } @@ -140,46 +106,40 @@ export default class FileSystem extends StorageBase { onEntry?: (file: { path: string; size: number }) => void; } = {} ): Promise { - return trace - .getTracer("ano-file") - .startActiveSpan("fs.listFiles", async (span) => { - span.setAttribute("path", dir); - const fullPath = join(config.FOLDER, this.repoPath(repoId), dir); - const files = await fs.promises.readdir(fullPath); - const output2: IFile[] = []; - for (const file of files) { - const filePath = join(fullPath, file); - try { - const stats = await fs.promises.stat(filePath); - if (stats.isDirectory()) { - output2.push(new FileModel({ name: file, path: dir, repoId })); - output2.push( - ...(await this.listFiles(repoId, join(dir, file), opt)) - ); - } else if (stats.isFile()) { - if (opt.onEntry) { - opt.onEntry({ - path: join(dir, file), - size: stats.size, - }); - } - output2.push( - new FileModel({ - name: file, - path: dir, - repoId: repoId, - size: stats.size, - sha: stats.ino.toString(), - }) - ); - } - } catch (error) { - span.recordException(error as Error); + const fullPath = join(config.FOLDER, this.repoPath(repoId), dir); + const files = await fs.promises.readdir(fullPath); + const output2: IFile[] = []; + for (const file of files) { + const filePath = join(fullPath, file); + try { + const stats = await fs.promises.stat(filePath); + if (stats.isDirectory()) { + output2.push(new FileModel({ name: file, path: dir, repoId })); + output2.push( + ...(await this.listFiles(repoId, join(dir, file), opt)) + ); + } else if (stats.isFile()) { + if (opt.onEntry) { + opt.onEntry({ + path: join(dir, file), + size: stats.size, + }); } + output2.push( + new FileModel({ + name: file, + path: dir, + repoId: repoId, + size: stats.size, + sha: stats.ino.toString(), + }) + ); } - span.end(); - return output2; - }); + } catch (error) { + // ignore stat errors for individual files + } + } + return output2; } /** @override */ 
diff --git a/src/core/storage/S3.ts b/src/core/storage/S3.ts index c401f77..5cfd20a 100644 --- a/src/core/storage/S3.ts +++ b/src/core/storage/S3.ts @@ -12,7 +12,6 @@ import ArchiveStreamToS3 from "decompress-stream-to-s3"; import { Response } from "express"; import { lookup } from "mime-types"; import * as archiver from "archiver"; -import { trace } from "@opentelemetry/api"; import { dirname, basename, join } from "path"; import AnonymousError from "../AnonymousError"; import StorageBase, { FILE_TYPE } from "./Storage"; @@ -51,27 +50,21 @@ export default class S3Storage extends StorageBase { /** @override */ async exists(repoId: string, path: string = ""): Promise { - const span = trace.getTracer("ano-file").startSpan("s3.exists"); - span.setAttribute("path", path); + if (!config.S3_BUCKET) throw new Error("S3_BUCKET not set"); try { - if (!config.S3_BUCKET) throw new Error("S3_BUCKET not set"); - try { - // if we can get the file info, it is a file - await this.fileInfo(repoId, path); - return FILE_TYPE.FILE; - } catch (err) { - // check if it is a directory - const data = await this.client().listObjectsV2({ - Bucket: config.S3_BUCKET, - Prefix: join(this.repoPath(repoId), path), - MaxKeys: 1, - }); - return (data.Contents?.length || 0) > 0 - ? FILE_TYPE.FOLDER - : FILE_TYPE.NOT_FOUND; - } - } finally { - span.end(); + // if we can get the file info, it is a file + await this.fileInfo(repoId, path); + return FILE_TYPE.FILE; + } catch (err) { + // check if it is a directory + const data = await this.client().listObjectsV2({ + Bucket: config.S3_BUCKET, + Prefix: join(this.repoPath(repoId), path), + MaxKeys: 1, + }); + return (data.Contents?.length || 0) > 0 + ? 
FILE_TYPE.FOLDER + : FILE_TYPE.NOT_FOUND; } } @@ -82,126 +75,97 @@ export default class S3Storage extends StorageBase { /** @override */ async rm(repoId: string, dir: string = ""): Promise { - const span = trace.getTracer("ano-file").startSpan("s3.rm"); - span.setAttribute("repoId", repoId); - span.setAttribute("path", dir); - try { - if (!config.S3_BUCKET) throw new Error("S3_BUCKET not set"); - const data = await this.client(200000).listObjectsV2({ - Bucket: config.S3_BUCKET, - Prefix: join(this.repoPath(repoId), dir), - MaxKeys: 100, - }); + if (!config.S3_BUCKET) throw new Error("S3_BUCKET not set"); + const data = await this.client(200000).listObjectsV2({ + Bucket: config.S3_BUCKET, + Prefix: join(this.repoPath(repoId), dir), + MaxKeys: 100, + }); - const params = { - Bucket: config.S3_BUCKET, - Delete: { Objects: new Array<{ Key: string }>() }, - }; + const params = { + Bucket: config.S3_BUCKET, + Delete: { Objects: new Array<{ Key: string }>() }, + }; - data.Contents?.forEach(function (content) { - if (content.Key) { - params.Delete.Objects.push({ Key: content.Key }); - } - }); - - if (params.Delete.Objects.length == 0) { - // nothing to remove - return; + data.Contents?.forEach(function (content) { + if (content.Key) { + params.Delete.Objects.push({ Key: content.Key }); } - await this.client(200000).deleteObjects(params); + }); - if (data.IsTruncated) { - await this.rm(repoId, dir); - } - } finally { - span.end(); + if (params.Delete.Objects.length == 0) { + // nothing to remove + return; + } + await this.client(200000).deleteObjects(params); + + if (data.IsTruncated) { + await this.rm(repoId, dir); } } /** @override */ async send(repoId: string, path: string, res: Response) { - const span = trace.getTracer("ano-file").startSpan("s3.send"); - span.setAttribute("repoId", repoId); - span.setAttribute("path", path); + if (!config.S3_BUCKET) throw new Error("S3_BUCKET not set"); try { - if (!config.S3_BUCKET) throw new Error("S3_BUCKET not set"); - try { - 
const command = new GetObjectCommand({ - Bucket: config.S3_BUCKET, - Key: join(this.repoPath(repoId), path), - }); - const s = await this.client().send(command); - res.status(200); - if (s.ContentType) { - res.contentType(s.ContentType); - } - if (s.ContentLength) { - res.set("Content-Length", s.ContentLength.toString()); - } - if (s.Body) { - (s.Body as Readable)?.pipe(res); - } else { - res.end(); - } - } catch (error) { - span.recordException(error as Error); - try { - res.status(500); - } catch (err) { - console.error(`[ERROR] S3 send ${path}`, err); - } - } - } finally { - span.end(); - } - } - - async fileInfo(repoId: string, path: string) { - const span = trace.getTracer("ano-file").startSpan("s3.fileInfo"); - span.setAttribute("repoId", repoId); - span.setAttribute("path", path); - try { - if (!config.S3_BUCKET) throw new Error("S3_BUCKET not set"); - const info = await this.client(3000).headObject({ - Bucket: config.S3_BUCKET, - Key: join(this.repoPath(repoId), path), - }); - return { - size: info.ContentLength, - lastModified: info.LastModified, - contentType: info.ContentType - ? 
info.ContentType - : (lookup(path) as string), - }; - } finally { - span.end(); - } - } - - /** @override */ - async read(repoId: string, path: string): Promise { - const span = trace.getTracer("ano-file").startSpan("s3.rreadm"); - span.setAttribute("repoId", repoId); - span.setAttribute("path", path); - try { - if (!config.S3_BUCKET) throw new Error("S3_BUCKET not set"); const command = new GetObjectCommand({ Bucket: config.S3_BUCKET, Key: join(this.repoPath(repoId), path), }); - const res = (await this.client(3000).send(command)).Body; - if (!res) { - throw new AnonymousError("file_not_found", { - httpStatus: 404, - object: join(this.repoPath(repoId), path), - }); + const s = await this.client().send(command); + res.status(200); + if (s.ContentType) { + res.contentType(s.ContentType); + } + if (s.ContentLength) { + res.set("Content-Length", s.ContentLength.toString()); + } + if (s.Body) { + (s.Body as Readable)?.pipe(res); + } else { + res.end(); + } + } catch (error) { + try { + res.status(500); + } catch (err) { + console.error(`[ERROR] S3 send ${path}`, err); } - return res as Readable; - } finally { - span.end(); } } + async fileInfo(repoId: string, path: string) { + if (!config.S3_BUCKET) throw new Error("S3_BUCKET not set"); + const info = await this.client(3000).headObject({ + Bucket: config.S3_BUCKET, + Key: join(this.repoPath(repoId), path), + }); + return { + size: info.ContentLength, + lastModified: info.LastModified, + contentType: info.ContentType + ? 
info.ContentType + : (lookup(path) as string), + }; + } + + /** @override */ + async read(repoId: string, path: string): Promise { + if (!config.S3_BUCKET) throw new Error("S3_BUCKET not set"); + const command = new GetObjectCommand({ + Bucket: config.S3_BUCKET, + Key: join(this.repoPath(repoId), path), + }); + const res = (await this.client(3000).send(command)).Body; + if (!res) { + throw new AnonymousError("file_not_found", { + httpStatus: 404, + object: join(this.repoPath(repoId), path), + }); + } + return res as Readable; + } + /** @override */ async write( repoId: string, @@ -209,80 +173,66 @@ export default class S3Storage extends StorageBase { data: string | Readable, source?: string ): Promise { - const span = trace.getTracer("ano-file").startSpan("s3.rm"); - span.setAttribute("repoId", repoId); - span.setAttribute("path", path); - try { - if (!config.S3_BUCKET) throw new Error("S3_BUCKET not set"); + if (!config.S3_BUCKET) throw new Error("S3_BUCKET not set"); - if (data instanceof Readable) { - data.on("error", (err) => { - console.error(`[ERROR] S3 write ${path}`, err); - span.recordException(err as Error); - this.rm(repoId, path); - }); - } - - const params: PutObjectCommandInput = { - Bucket: config.S3_BUCKET, - Key: join(this.repoPath(repoId), path), - Body: data, - ContentType: lookup(path).toString(), - }; - if (source) { - params.Tagging = `source=${source}`; - } - - const parallelUploads3 = new Upload({ - // 30s timeout - client: this.client(30000), - params, + if (data instanceof Readable) { + data.on("error", (err) => { + console.error(`[ERROR] S3 write ${path}`, err); + this.rm(repoId, path); }); - - await parallelUploads3.done(); - } finally { - span.end(); } + + const params: PutObjectCommandInput = { + Bucket: config.S3_BUCKET, + Key: join(this.repoPath(repoId), path), + Body: data, + ContentType: lookup(path).toString(), + }; + if (source) { + params.Tagging = `source=${source}`; + } + + const parallelUploads3 = new Upload({ + // 30s 
timeout + client: this.client(30000), + params, + }); + + await parallelUploads3.done(); } /** @override */ async listFiles(repoId: string, dir: string = ""): Promise { - const span = trace.getTracer("ano-file").startSpan("s3.listFiles"); - span.setAttribute("path", dir); - try { - if (!config.S3_BUCKET) throw new Error("S3_BUCKET not set"); - if (dir && dir[dir.length - 1] != "/") dir = dir + "/"; - const out: IFile[] = []; - let req: ListObjectsV2CommandOutput; - let nextContinuationToken: string | undefined; - do { - req = await this.client(30000).listObjectsV2({ - Bucket: config.S3_BUCKET, - Prefix: join(this.repoPath(repoId), dir), - MaxKeys: 250, - ContinuationToken: nextContinuationToken, - }); - if (!req.Contents) return out; - nextContinuationToken = req.NextContinuationToken; + if (!config.S3_BUCKET) throw new Error("S3_BUCKET not set"); + if (dir && dir[dir.length - 1] != "/") dir = dir + "/"; + const out: IFile[] = []; + let req: ListObjectsV2CommandOutput; + let nextContinuationToken: string | undefined; + do { + req = await this.client(30000).listObjectsV2({ + Bucket: config.S3_BUCKET, + Prefix: join(this.repoPath(repoId), dir), + MaxKeys: 250, + ContinuationToken: nextContinuationToken, + }); + if (!req.Contents) return out; + nextContinuationToken = req.NextContinuationToken; - for (const f of req.Contents) { - if (!f.Key) continue; - f.Key = f.Key.replace(join(this.repoPath(repoId), dir), ""); - out.push( - new FileModel({ - name: basename(f.Key), - path: dirname(f.Key), - repoId, - size: f.Size, - sha: f.ETag, - }) - ); - } - } while (req && req.Contents && req.IsTruncated); - return out; - } finally { - span.end(); - } + for (const f of req.Contents) { + if (!f.Key) continue; + f.Key = f.Key.replace(join(this.repoPath(repoId), dir), ""); + out.push( + new FileModel({ + name: basename(f.Key), + path: dirname(f.Key), + repoId, + size: f.Size, + sha: f.ETag, + }) + ); + } + } while (req && req.Contents && req.IsTruncated); + return out; } /** 
@override */ @@ -293,8 +243,6 @@ export default class S3Storage extends StorageBase { source?: string ): Promise { let toS3: ArchiveStreamToS3; - const span = trace.getTracer("ano-file").startSpan("s3.extractZip"); - span.setAttribute("path", path); return new Promise((resolve, reject) => { if (!config.S3_BUCKET) return reject("S3_BUCKET not set"); toS3 = new ArchiveStreamToS3({ @@ -315,14 +263,11 @@ export default class S3Storage extends StorageBase { }); pipeline(data, toS3, (err) => { if (err) { - span.recordException(err as Error); return reject(err); } - span.end(); resolve(); }) .on("finish", () => { - span.end(); resolve(); }) .on("error", reject); @@ -338,48 +283,41 @@ export default class S3Storage extends StorageBase { fileTransformer?: (p: string) => Transform; } ) { - const span = trace.getTracer("ano-file").startSpan("s3.archive"); - span.setAttribute("repoId", repoId); - span.setAttribute("path", dir); - try { - if (!config.S3_BUCKET) throw new Error("S3_BUCKET not set"); - const archive = archiver(opt?.format || "zip", {}); - if (dir && dir[dir.length - 1] != "/") dir = dir + "/"; + if (!config.S3_BUCKET) throw new Error("S3_BUCKET not set"); + const archive = archiver(opt?.format || "zip", {}); + if (dir && dir[dir.length - 1] != "/") dir = dir + "/"; - let req: ListObjectsV2CommandOutput; - let nextContinuationToken: string | undefined; - do { - req = await this.client(30000).listObjectsV2({ - Bucket: config.S3_BUCKET, - Prefix: join(this.repoPath(repoId), dir), - MaxKeys: 250, - ContinuationToken: nextContinuationToken, - }); + let req: ListObjectsV2CommandOutput; + let nextContinuationToken: string | undefined; + do { + req = await this.client(30000).listObjectsV2({ + Bucket: config.S3_BUCKET, + Prefix: join(this.repoPath(repoId), dir), + MaxKeys: 250, + ContinuationToken: nextContinuationToken, + }); - nextContinuationToken = req.NextContinuationToken; - for (const f of req.Contents || []) { - if (!f.Key) continue; - const filename = 
basename(f.Key); - const prefix = dirname( - f.Key.replace(join(this.repoPath(repoId), dir), "") - ); + nextContinuationToken = req.NextContinuationToken; + for (const f of req.Contents || []) { + if (!f.Key) continue; + const filename = basename(f.Key); + const prefix = dirname( + f.Key.replace(join(this.repoPath(repoId), dir), "") + ); - let rs = await this.read(repoId, f.Key); - if (opt?.fileTransformer) { - // apply transformation on the stream - rs = rs.pipe(opt.fileTransformer(f.Key)); - } - - archive.append(rs, { - name: filename, - prefix, - }); + let rs = await this.read(repoId, f.Key); + if (opt?.fileTransformer) { + // apply transformation on the stream + rs = rs.pipe(opt.fileTransformer(f.Key)); } - } while (req && req.Contents?.length && req.IsTruncated); - archive.finalize(); - return archive; - } finally { - span.end(); - } + + archive.append(rs, { + name: filename, + prefix, + }); + } + } while (req && req.Contents?.length && req.IsTruncated); + archive.finalize(); + return archive; } } diff --git a/src/queue/processes/downloadRepository.ts b/src/queue/processes/downloadRepository.ts index 4cd7b9d..94135b2 100644 --- a/src/queue/processes/downloadRepository.ts +++ b/src/queue/processes/downloadRepository.ts @@ -1,4 +1,3 @@ -import { Exception, trace } from "@opentelemetry/api"; import { SandboxedJob } from "bullmq"; import { config } from "dotenv"; config(); @@ -14,8 +13,6 @@ export default async function (job: SandboxedJob) { connect: () => Promise; getRepository: typeof getRepositoryImport; } = require("../../server/database"); - const span = trace.getTracer("ano-file").startSpan("proc.downloadRepository"); - span.setAttribute("repoId", job.data.repoId); console.log(`[QUEUE] ${job.data.repoId} is going to be downloaded`); let statusInterval: any = null; await connect(); @@ -58,17 +55,14 @@ export default async function (job: SandboxedJob) { } catch (error) { updateProgress({ status: "error" }); if (error instanceof Error) { - 
span.recordException(error as Exception); await repo.updateStatus(RepositoryStatus.ERROR, error.message); } else if (typeof error === "string") { await repo.updateStatus(RepositoryStatus.ERROR, error); - span.recordException(error); } throw error; } } catch (error: any) { clearInterval(statusInterval); - span.recordException(error as Exception); console.log(`[QUEUE] ${job.data.repoId} is finished with an error`, error); setTimeout(async () => { // delay to avoid double saving @@ -78,6 +72,5 @@ export default async function (job: SandboxedJob) { }, 400); } finally { clearInterval(statusInterval); - span.end(); } } diff --git a/src/queue/processes/removeCache.ts b/src/queue/processes/removeCache.ts index 3cebf75..b37b4bb 100644 --- a/src/queue/processes/removeCache.ts +++ b/src/queue/processes/removeCache.ts @@ -1,4 +1,3 @@ -import { Exception, trace } from "@opentelemetry/api"; import { SandboxedJob } from "bullmq"; import Repository from "../../core/Repository"; import { getRepository as getRepositoryImport } from "../../server/database"; @@ -11,24 +10,16 @@ export default async function (job: SandboxedJob) { connect: () => Promise; getRepository: typeof getRepositoryImport; } = require("../../server/database"); - const span = trace.getTracer("ano-file").startSpan("proc.removeCache"); - span.setAttribute("repoId", job.data.repoId); try { await connect(); console.log( `[QUEUE] Cache of ${job.data.repoId} is going to be removed...` ); const repo = await getRepository(job.data.repoId); - try { - await repo.removeCache(); - } catch (error) { - span.recordException(error as Exception); - throw error; - } + await repo.removeCache(); } catch (error) { - span.recordException(error as Exception); + // error already handled } finally { console.log(`[QUEUE] Cache of ${job.data.repoId} is removed.`); - span.end(); } } diff --git a/src/queue/processes/removeRepository.ts b/src/queue/processes/removeRepository.ts index 7041167..d9c9e7c 100644 --- 
a/src/queue/processes/removeRepository.ts
+++ b/src/queue/processes/removeRepository.ts
@@ -1,4 +1,3 @@
-import { trace } from "@opentelemetry/api";
 import { SandboxedJob } from "bullmq";
 import Repository from "../../core/Repository";
 import { getRepository as getRepositoryImport } from "../../server/database";
@@ -12,8 +11,6 @@ export default async function (job: SandboxedJob) {
     connect: () => Promise;
     getRepository: typeof getRepositoryImport;
   } = require("../../server/database");
-  const span = trace.getTracer("ano-file").startSpan("proc.removeRepository");
-  span.setAttribute("repoId", job.data.repoId);
   try {
     await connect();
     console.log(`[QUEUE] ${job.data.repoId} is going to be removed`);
@@ -27,13 +24,11 @@ export default async function (job: SandboxedJob) {
       } else if (typeof error === "string") {
         await repo.updateStatus(RepositoryStatus.ERROR, error);
       }
-      span.recordException(error as Error);
       throw error;
     }
   } catch (error) {
-    span.recordException(error as Error);
+    // deliberately swallowed so the worker survives; repo status was updated above when possible
   } finally {
     console.log(`[QUEUE] ${job.data.repoId} is removed`);
-    span.end();
   }
 }