Remove OpenTelemetry tracing infrastructure (#662)

This commit is contained in:
Thomas Durieux
2026-04-15 04:39:08 +02:00
committed by GitHub
parent c6d6806d7a
commit 655ae92c4c
19 changed files with 848 additions and 1366 deletions
+1 -2
View File
@@ -16,6 +16,5 @@ COPY public ./public
COPY src ./src COPY src ./src
RUN npm install && npm run build && npm cache clean --force RUN npm install && npm run build && npm cache clean --force
COPY opentelemetry.js .
CMD [ "node", "--require", "./opentelemetry.js", "./build/server/index.js"] CMD [ "node", "./build/server/index.js"]
+1 -25
View File
@@ -37,7 +37,7 @@ services:
mode: replicated mode: replicated
replicas: 4 replicas: 4
endpoint_mode: dnsrr endpoint_mode: dnsrr
entrypoint: ["node", "--require", "./opentelemetry.js", "./build/streamer/index.js"] entrypoint: ["node", "./build/streamer/index.js"]
env_file: env_file:
- ./.env - ./.env
volumes: volumes:
@@ -89,30 +89,6 @@ services:
timeout: 10s timeout: 10s
retries: 5 retries: 5
opentelemetry:
image: otel/opentelemetry-collector
restart: always
command: ["--config=/etc/otel-collector-config.yaml"]
volumes:
- ./opentelemetry-collector.yml:/etc/otel-collector-config.yaml
depends_on:
- jaeger
- prometheus
jaeger:
image: jaegertracing/all-in-one:latest
restart: always
ports:
- 127.0.0.1:16686:16686
prometheus:
image: prom/prometheus:latest
restart: always
volumes:
- ./prometheus.yaml:/etc/prometheus/prometheus.yml
ports:
- 127.0.0.1:9090:9090
mongodb-backup: mongodb-backup:
image: tiredofit/db-backup image: tiredofit/db-backup
links: links:
-40
View File
@@ -1,40 +0,0 @@
# OpenTelemetry Collector configuration: declares receivers, exporters,
# processors and extensions, then wires them together under `service`.
# Telemetry is accepted over OTLP with the gRPC protocol enabled.
receivers:
otlp:
protocols:
grpc:
# Exporters: a Prometheus endpoint on port 8889 (every metric gets the constant
# label label1=value1), a debug exporter, and OTLP forwarding to jaeger:4317
# with TLS verification disabled (`insecure: true`).
exporters:
prometheus:
endpoint: "0.0.0.0:8889"
const_labels:
label1: value1
debug:
otlp:
endpoint: jaeger:4317
tls:
insecure: true
# NOTE(review): `batch` is declared here, but none of the pipelines below has a
# `processors:` entry, so it does not appear to be applied to any signal —
# confirm whether `processors: [batch]` was intended on each pipeline.
processors:
batch:
# Extensions: health check, pprof on :1888, zpages on :55679.
extensions:
health_check:
pprof:
endpoint: :1888
zpages:
endpoint: :55679
# Service wiring: enables the three extensions and defines one pipeline per
# signal. Traces go to debug + Jaeger (otlp), metrics to debug + Prometheus,
# logs to debug only.
service:
extensions: [health_check, pprof, zpages]
pipelines:
traces:
receivers: [otlp]
exporters: [debug, otlp]
metrics:
receivers: [otlp]
exporters: [debug, prometheus]
logs:
receivers: [otlp]
exporters: [debug]
-29
View File
@@ -1,29 +0,0 @@
// OpenTelemetry bootstrap script. It is meant to be preloaded with
// `node --require ./opentelemetry.js <entrypoint>` so that the SDK is
// configured and started before the application entry point is loaded.
const opentelemetry = require("@opentelemetry/sdk-node");
const {
  getNodeAutoInstrumentations,
} = require("@opentelemetry/auto-instrumentations-node");
const {
  OTLPTraceExporter,
} = require("@opentelemetry/exporter-trace-otlp-grpc");
const {
  OTLPMetricExporter,
} = require("@opentelemetry/exporter-metrics-otlp-grpc");
const { PeriodicExportingMetricReader } = require("@opentelemetry/sdk-metrics");
const { diag, DiagConsoleLogger, DiagLogLevel } = require("@opentelemetry/api");

// Uncomment to route the SDK's own diagnostics to the console.
// diag.setLogger(new DiagConsoleLogger(), DiagLogLevel.INFO);

const sdk = new opentelemetry.NodeSDK({
  // Service name reported with the telemetry; SERVICE_NAME overrides the default.
  serviceName: process.env.SERVICE_NAME || "Anonymous-GitHub",
  // No `logRecordProcessor` is configured. `getNodeAutoInstrumentations()`
  // returns an array of instrumentations, which carries no
  // `logRecordProcessor` property, so reading it from there always yielded
  // `undefined` and only built a second, discarded set of instrumentations.
  //
  // NOTE(review): both exporters below are the gRPC variants; the
  // `/v1/traces` and `/v1/metrics` path components are presumably ignored by
  // gRPC — confirm against the exporter documentation.
  traceExporter: new OTLPTraceExporter({
    url: "http://opentelemetry:4317/v1/traces",
  }),
  // Metrics are exported periodically by the reader through the OTLP exporter.
  metricReader: new PeriodicExportingMetricReader({
    exporter: new OTLPMetricExporter({
      url: "http://opentelemetry:4317/v1/metrics",
    }),
  }),
  // Single set of auto-instrumentations with their default configuration.
  instrumentations: [getNodeAutoInstrumentations()],
});

sdk.start();
-9
View File
@@ -35,15 +35,6 @@
"@aws-sdk/lib-storage": "^3.540.0", "@aws-sdk/lib-storage": "^3.540.0",
"@mongodb-js/zstd": "^1.2.0", "@mongodb-js/zstd": "^1.2.0",
"@octokit/rest": "^20.0.2", "@octokit/rest": "^20.0.2",
"@opentelemetry/api": "^1.8.0",
"@opentelemetry/auto-instrumentations-node": "^0.43.0",
"@opentelemetry/exporter-metrics-otlp-grpc": "^0.49.1",
"@opentelemetry/exporter-metrics-otlp-proto": "^0.49.1",
"@opentelemetry/exporter-trace-otlp-grpc": "^0.49.1",
"@opentelemetry/exporter-trace-otlp-proto": "^0.49.1",
"@opentelemetry/sdk-metrics": "^1.22.0",
"@opentelemetry/sdk-node": "^0.49.1",
"@opentelemetry/sdk-trace-node": "^1.22.0",
"@smithy/node-http-handler": "^2.5.0", "@smithy/node-http-handler": "^2.5.0",
"archiver": "^5.3.2", "archiver": "^5.3.2",
"bullmq": "^2.4.0", "bullmq": "^2.4.0",
-6
View File
@@ -1,6 +0,0 @@
# Prometheus scrape configuration: a single job, `otel-collector`, polled every
# 10 seconds against the `opentelemetry` host on two ports.
scrape_configs:
- job_name: 'otel-collector'
scrape_interval: 10s
static_configs:
# presumably 8889 is the collector's Prometheus exporter endpoint and 8888 the
# collector's own internal metrics — verify against the collector config.
- targets: ['opentelemetry:8889']
- targets: ['opentelemetry:8888']
+160 -227
View File
@@ -1,7 +1,6 @@
import { join, basename, dirname } from "path"; import { join, basename, dirname } from "path";
import { Response } from "express"; import { Response } from "express";
import { Readable } from "stream"; import { Readable } from "stream";
import { trace } from "@opentelemetry/api";
import { lookup } from "mime-types"; import { lookup } from "mime-types";
import got from "got"; import got from "got";
@@ -35,96 +34,76 @@ export default class AnonymizedFile {
} }
async sha() { async sha() {
return trace if (this._file) return this._file.sha?.replace(/"/g, "");
.getTracer("ano-file") this._file = await this.getFileInfo();
.startActiveSpan("AnnoFile.sha", async (span) => { return this._file.sha?.replace(/"/g, "");
try {
span.setAttribute("anonymizedPath", this.anonymizedPath);
if (this._file) return this._file.sha?.replace(/"/g, "");
this._file = await this.getFileInfo();
return this._file.sha?.replace(/"/g, "");
} finally {
span.end();
}
});
} }
async getFileInfo(): Promise<IFile> { async getFileInfo(): Promise<IFile> {
const span = trace.getTracer("ano-file").startSpan("AnnoFile.getFileInfo"); if (this._file) return this._file;
span.setAttribute("repoId", this.repository.repoId); let fileDir = dirname(this.anonymizedPath);
span.setAttribute("file", this.anonymizedPath); if (fileDir == ".") fileDir = "";
if (fileDir.endsWith("/")) fileDir = fileDir.slice(0, -1);
const filename = basename(this.anonymizedPath);
try { if (!this.anonymizedPath.includes(config.ANONYMIZATION_MASK)) {
if (this._file) return this._file; if (this.anonymizedPath == "") {
let fileDir = dirname(this.anonymizedPath); return {
if (fileDir == ".") fileDir = ""; name: "",
if (fileDir.endsWith("/")) fileDir = fileDir.slice(0, -1); path: "",
const filename = basename(this.anonymizedPath);
if (!this.anonymizedPath.includes(config.ANONYMIZATION_MASK)) {
if (this.anonymizedPath == "") {
return {
name: "",
path: "",
repoId: this.repository.repoId,
};
}
const query: FilterQuery<IFile> = {
repoId: this.repository.repoId, repoId: this.repository.repoId,
path: fileDir,
}; };
if (filename != "") query.name = filename;
const res = await FileModel.findOne(query);
if (res) {
this._file = res;
return res;
}
throw new AnonymousError("file_not_found", {
object: this,
httpStatus: 404,
});
} }
const query: FilterQuery<IFile> = {
const pathQuery = fileDir
.split("/")
.map((p) => {
if (p.includes(config.ANONYMIZATION_MASK)) {
return "[^/]+";
}
return p;
})
.join("/");
const nameQuery = filename.replace(
new RegExp(config.ANONYMIZATION_MASK + "(-[0-9]+)?"),
"[^/]+"
);
const candidates = await FileModel.find({
repoId: this.repository.repoId, repoId: this.repository.repoId,
path: new RegExp(pathQuery), path: fileDir,
name: new RegExp(nameQuery), };
}).exec(); if (filename != "") query.name = filename;
const res = await FileModel.findOne(query);
for (const candidate of candidates) { if (res) {
const candidatePath = join(candidate.path, candidate.name); this._file = res;
if ( return res;
anonymizePath(candidatePath, this.repository.options.terms || []) ==
this.anonymizedPath
) {
this._file = candidate;
return candidate;
}
} }
throw new AnonymousError("file_not_found", { throw new AnonymousError("file_not_found", {
object: this, object: this,
httpStatus: 404, httpStatus: 404,
}); });
} catch (error) {
span.recordException(error as Error);
throw error;
} finally {
span.end();
} }
const pathQuery = fileDir
.split("/")
.map((p) => {
if (p.includes(config.ANONYMIZATION_MASK)) {
return "[^/]+";
}
return p;
})
.join("/");
const nameQuery = filename.replace(
new RegExp(config.ANONYMIZATION_MASK + "(-[0-9]+)?"),
"[^/]+"
);
const candidates = await FileModel.find({
repoId: this.repository.repoId,
path: new RegExp(pathQuery),
name: new RegExp(nameQuery),
}).exec();
for (const candidate of candidates) {
const candidatePath = join(candidate.path, candidate.name);
if (
anonymizePath(candidatePath, this.repository.options.terms || []) ==
this.anonymizedPath
) {
this._file = candidate;
return candidate;
}
}
throw new AnonymousError("file_not_found", {
object: this,
httpStatus: 404,
});
} }
/** /**
@@ -133,24 +112,16 @@ export default class AnonymizedFile {
* @returns the origin relative path of the file * @returns the origin relative path of the file
*/ */
async originalPath(): Promise<string> { async originalPath(): Promise<string> {
const span = trace.getTracer("ano-file").startSpan("AnnoFile.originalPath"); if (this.anonymizedPath == null) {
span.setAttribute("repoId", this.repository.repoId); throw new AnonymousError("path_not_specified", {
span.setAttribute("file", this.anonymizedPath); object: this,
try { httpStatus: 400,
span.setAttribute("anonymizedPath", this.anonymizedPath); });
if (this.anonymizedPath == null) {
throw new AnonymousError("path_not_specified", {
object: this,
httpStatus: 400,
});
}
if (!this._file) {
this._file = await this.getFileInfo();
}
return join(this._file.path, this._file.name);
} finally {
span.end();
} }
if (!this._file) {
this._file = await this.getFileInfo();
}
return join(this._file.path, this._file.name);
} }
extension() { extension() {
const filename = basename(this._file?.name || this.anonymizedPath); const filename = basename(this._file?.name || this.anonymizedPath);
@@ -188,59 +159,39 @@ export default class AnonymizedFile {
} }
async content(): Promise<Readable> { async content(): Promise<Readable> {
return trace if (this.anonymizedPath.includes(config.ANONYMIZATION_MASK)) {
.getTracer("ano-file") await this.originalPath();
.startActiveSpan("content", async (span) => { }
try { if (this._file?.size && this._file?.size > config.MAX_FILE_SIZE) {
if (this.anonymizedPath.includes(config.ANONYMIZATION_MASK)) { throw new AnonymousError("file_too_big", {
await this.originalPath(); object: this,
} httpStatus: 403,
span.addEvent("filePath", { originalPath: this.filePath });
if (this._file?.size && this._file?.size > config.MAX_FILE_SIZE) {
throw new AnonymousError("file_too_big", {
object: this,
httpStatus: 403,
});
}
const content = await this.repository.source?.getFileContent(this);
if (
!this.repository.model.isReseted ||
this.repository.status != RepositoryStatus.READY
) {
this.repository.model.isReseted = false;
await this.repository.updateStatus(RepositoryStatus.READY);
}
return content;
} finally {
span.end();
}
}); });
}
const content = await this.repository.source?.getFileContent(this);
if (
!this.repository.model.isReseted ||
this.repository.status != RepositoryStatus.READY
) {
this.repository.model.isReseted = false;
await this.repository.updateStatus(RepositoryStatus.READY);
}
return content;
} }
async anonymizedContent() { async anonymizedContent() {
const span = trace.getTracer("ano-file").startSpan("Repository.conference");
span.setAttribute("anonymizedPath", this.anonymizedPath);
const anonymizer = this.repository.generateAnonymizeTransformer( const anonymizer = this.repository.generateAnonymizeTransformer(
this.anonymizedPath this.anonymizedPath
); );
if (!config.STREAMER_ENTRYPOINT) { if (!config.STREAMER_ENTRYPOINT) {
// collect the content locally // collect the content locally
const content = await this.content(); const content = await this.content();
return content.pipe(anonymizer).on("close", () => { return content.pipe(anonymizer);
span.end();
});
} }
// const cacheableLookup = new CacheableLookup();
// const hostName = new URL(config.STREAMER_ENTRYPOINT).hostname;
// const ipHost = await cacheableLookup.lookupAsync(hostName);
// use the streamer service // use the streamer service
return got.stream(join(config.STREAMER_ENTRYPOINT, "api"), { return got.stream(join(config.STREAMER_ENTRYPOINT, "api"), {
method: "POST", method: "POST",
// lookup: cacheableLookup.lookup,
// host: ipHost.address,
// dnsCache: cacheableLookup,
json: { json: {
token: await this.repository.getToken(), token: await this.repository.getToken(),
repoFullName: this.repository.model.source.repositoryName, repoFullName: this.repository.model.source.repositoryName,
@@ -268,107 +219,89 @@ export default class AnonymizedFile {
return join(this._file.path, this._file.name); return join(this._file.path, this._file.name);
} }
// cacheableLookup = new CacheableLookup({
// maxTtl: 60,
// });
async send(res: Response): Promise<void> { async send(res: Response): Promise<void> {
const anonymizer = this.repository.generateAnonymizeTransformer( const anonymizer = this.repository.generateAnonymizeTransformer(
this.anonymizedPath this.anonymizedPath
); );
return trace // eslint-disable-next-line no-async-promise-executor
.getTracer("ano-file") return new Promise<void>(async (resolve, reject) => {
.startActiveSpan("AnonymizedFile.send", async (span) => { try {
span.setAttribute("repoId", this.repository.repoId); if (config.STREAMER_ENTRYPOINT) {
span.setAttribute("anonymizedPath", this.anonymizedPath); // use the streamer service
// eslint-disable-next-line no-async-promise-executor const [sha, token] = await Promise.all([
return new Promise<void>(async (resolve, reject) => { this.sha(),
try { this.repository.getToken(),
if (config.STREAMER_ENTRYPOINT) { ]);
// use the streamer service const resStream = got
const [sha, token] = await Promise.all([ .stream(join(config.STREAMER_ENTRYPOINT, "api"), {
this.sha(), method: "POST",
this.repository.getToken(), json: {
]); sha,
const resStream = got token,
.stream(join(config.STREAMER_ENTRYPOINT, "api"), { repoFullName: this.repository.model.source.repositoryName,
method: "POST", commit: this.repository.model.source.commit,
json: { branch: this.repository.model.source.branch,
sha, repoId: this.repository.repoId,
token, filePath: this.filePath,
repoFullName: this.repository.model.source.repositoryName, anonymizerOptions: anonymizer.opt,
commit: this.repository.model.source.commit, },
branch: this.repository.model.source.branch, })
repoId: this.repository.repoId, .on("error", (err) => {
filePath: this.filePath, handleError(
anonymizerOptions: anonymizer.opt, new AnonymousError("file_not_found", {
}, object: this,
}) httpStatus: 404,
.on("error", (err) => { }),
span.recordException(err); res
handleError( );
new AnonymousError("file_not_found", {
object: this,
httpStatus: 404,
}),
res
);
});
resStream.pipe(res);
res.on("close", () => {
span.end();
resolve();
});
res.on("error", (err) => {
reject(err);
span.recordException(err);
span.end();
});
return;
}
const mime = lookup(this.anonymizedPath);
if (mime && this.extension() != "ts") {
res.contentType(mime);
} else if (isTextFile(this.anonymizedPath)) {
res.contentType("text/plain");
}
res.header("Accept-Ranges", "none");
anonymizer.once("transform", (data) => {
if (!mime && data.isText) {
res.contentType("text/plain");
}
if (!data.wasAnonimized && this._file?.size) {
// the text files may be anonymized and therefore the size may be different
res.header("Content-Length", this._file?.size.toString());
}
}); });
const content = await this.content(); resStream.pipe(res);
function handleStreamError(error: Error) { res.on("close", () => {
if (!content.closed && !content.destroyed) { resolve();
content.destroy(); });
} res.on("error", (err) => {
span.recordException(error); reject(err);
span.end(); });
reject(error); return;
// handleError(error, res); }
}
content const mime = lookup(this.anonymizedPath);
.on("error", handleStreamError) if (mime && this.extension() != "ts") {
.pipe(anonymizer) res.contentType(mime);
.pipe(res) } else if (isTextFile(this.anonymizedPath)) {
.on("error", handleStreamError) res.contentType("text/plain");
.on("close", () => { }
if (!content.closed && !content.destroyed) { res.header("Accept-Ranges", "none");
content.destroy(); anonymizer.once("transform", (data) => {
} if (!mime && data.isText) {
span.end(); res.contentType("text/plain");
resolve(); }
}); if (!data.wasAnonimized && this._file?.size) {
} catch (error) { // the text files may be anonymized and therefore the size may be different
handleError(error, res); res.header("Content-Length", this._file?.size.toString());
} }
}); });
}); const content = await this.content();
function handleStreamError(error: Error) {
if (!content.closed && !content.destroyed) {
content.destroy();
}
reject(error);
}
content
.on("error", handleStreamError)
.pipe(anonymizer)
.pipe(res)
.on("error", handleStreamError)
.on("close", () => {
if (!content.closed && !content.destroyed) {
content.destroy();
}
resolve();
});
} catch (error) {
handleError(error, res);
}
});
} }
} }
+67 -74
View File
@@ -1,4 +1,3 @@
import { trace } from "@opentelemetry/api";
import { Octokit } from "@octokit/rest"; import { Octokit } from "@octokit/rest";
import Repository from "./Repository"; import Repository from "./Repository";
@@ -26,80 +25,74 @@ export async function checkToken(token: string) {
} }
export async function getToken(repository: Repository) { export async function getToken(repository: Repository) {
const span = trace.getTracer("ano-file").startSpan("GHUtils.getToken");
span.setAttribute("repoId", repository.repoId);
console.log("getToken", repository.repoId); console.log("getToken", repository.repoId);
try { // if (repository.model.source.accessToken) {
// if (repository.model.source.accessToken) { // // only check the token if the repo has been visited less than 10 minutes ago
// // only check the token if the repo has been visited less than 10 minutes ago // if (
// if ( // repository.status == RepositoryStatus.READY &&
// repository.status == RepositoryStatus.READY && // repository.model.lastView > new Date(Date.now() - 1000 * 60 * 10)
// repository.model.lastView > new Date(Date.now() - 1000 * 60 * 10) // ) {
// ) { // return repository.model.source.accessToken;
// return repository.model.source.accessToken; // } else if (await checkToken(repository.model.source.accessToken)) {
// } else if (await checkToken(repository.model.source.accessToken)) { // return repository.model.source.accessToken;
// return repository.model.source.accessToken; // }
// } // }
// } if (!repository.owner.model.accessTokens?.github) {
if (!repository.owner.model.accessTokens?.github) { const query = await UserModel.findById(repository.owner.id, {
const query = await UserModel.findById(repository.owner.id, { accessTokens: 1,
accessTokens: 1, accessTokenDates: 1,
accessTokenDates: 1, });
}); if (query?.accessTokens) {
if (query?.accessTokens) { repository.owner.model.accessTokens = query.accessTokens;
repository.owner.model.accessTokens = query.accessTokens; repository.owner.model.accessTokenDates = query.accessTokenDates;
repository.owner.model.accessTokenDates = query.accessTokenDates;
}
} }
const ownerAccessToken = repository.owner.model.accessTokens?.github;
if (ownerAccessToken) {
const tokenAge = repository.owner.model.accessTokenDates?.github;
// if the token is older than 7 days, refresh it
if (
!tokenAge ||
tokenAge < new Date(Date.now() - 1000 * 60 * 60 * 24 * 7)
) {
const url = `https://api.github.com/applications/${config.CLIENT_ID}/token`;
const headers = {
Accept: "application/vnd.github+json",
"X-GitHub-Api-Version": "2022-11-28",
};
const res = await fetch(url, {
method: "PATCH",
body: JSON.stringify({
access_token: ownerAccessToken,
}),
credentials: "include",
headers: {
...headers,
Authorization:
"Basic " +
Buffer.from(
config.CLIENT_ID + ":" + config.CLIENT_SECRET
).toString("base64"),
},
});
const resBody = (await res.json()) as { token: string };
repository.owner.model.accessTokens.github = resBody.token;
if (!repository.owner.model.accessTokenDates) {
repository.owner.model.accessTokenDates = {
github: new Date(),
};
} else {
repository.owner.model.accessTokenDates.github = new Date();
}
await repository.owner.model.save();
return resBody.token;
}
const check = await checkToken(ownerAccessToken);
if (check) {
repository.model.source.accessToken = ownerAccessToken;
return ownerAccessToken;
}
}
return config.GITHUB_TOKEN;
} finally {
span.end();
} }
const ownerAccessToken = repository.owner.model.accessTokens?.github;
if (ownerAccessToken) {
const tokenAge = repository.owner.model.accessTokenDates?.github;
// if the token is older than 7 days, refresh it
if (
!tokenAge ||
tokenAge < new Date(Date.now() - 1000 * 60 * 60 * 24 * 7)
) {
const url = `https://api.github.com/applications/${config.CLIENT_ID}/token`;
const headers = {
Accept: "application/vnd.github+json",
"X-GitHub-Api-Version": "2022-11-28",
};
const res = await fetch(url, {
method: "PATCH",
body: JSON.stringify({
access_token: ownerAccessToken,
}),
credentials: "include",
headers: {
...headers,
Authorization:
"Basic " +
Buffer.from(
config.CLIENT_ID + ":" + config.CLIENT_SECRET
).toString("base64"),
},
});
const resBody = (await res.json()) as { token: string };
repository.owner.model.accessTokens.github = resBody.token;
if (!repository.owner.model.accessTokenDates) {
repository.owner.model.accessTokenDates = {
github: new Date(),
};
} else {
repository.owner.model.accessTokenDates.github = new Date();
}
await repository.owner.model.save();
return resBody.token;
}
const check = await checkToken(ownerAccessToken);
if (check) {
repository.model.source.accessToken = ownerAccessToken;
return ownerAccessToken;
}
}
return config.GITHUB_TOKEN;
} }
+75 -143
View File
@@ -20,7 +20,6 @@ import {
getRepositoryFromGitHub, getRepositoryFromGitHub,
GitHubRepository, GitHubRepository,
} from "./source/GitHubRepository"; } from "./source/GitHubRepository";
import { trace } from "@opentelemetry/api";
import { getToken } from "./GitHubUtils"; import { getToken } from "./GitHubUtils";
import { FILE_TYPE } from "./storage/Storage"; import { FILE_TYPE } from "./storage/Storage";
import config from "../config"; import config from "../config";
@@ -152,44 +151,38 @@ export default class Repository {
force: false, force: false,
} }
): Promise<IFile[]> { ): Promise<IFile[]> {
const span = trace.getTracer("ano-file").startSpan("Repository.files"); const hasFile = await FileModel.exists({ repoId: this.repoId }).exec();
span.setAttribute("repoId", this.repoId); if (!hasFile || opt.force) {
try { await FileModel.deleteMany({ repoId: this.repoId }).exec();
const hasFile = await FileModel.exists({ repoId: this.repoId }).exec(); const files = await this.source.getFiles(opt.progress);
if (!hasFile || opt.force) { files.forEach((f) => (f.repoId = this.repoId));
await FileModel.deleteMany({ repoId: this.repoId }).exec(); await FileModel.insertMany(files);
const files = await this.source.getFiles(opt.progress);
files.forEach((f) => (f.repoId = this.repoId));
await FileModel.insertMany(files);
this._model.size = { storage: 0, file: 0 }; this._model.size = { storage: 0, file: 0 };
await this.computeSize(); await this.computeSize();
}
if (opt.path?.includes(config.ANONYMIZATION_MASK)) {
const f = new AnonymizedFile({
repository: this,
anonymizedPath: opt.path,
});
opt.path = await f.originalPath();
}
let pathQuery: string | RegExp | undefined = opt.path
? new RegExp(`^${opt.path}`)
: undefined;
if (opt.recursive === false) {
pathQuery = opt.path ? new RegExp(`^${opt.path}$`) : "";
}
const query: FilterQuery<IFile> = {
repoId: this.repoId,
};
if (pathQuery !== undefined) {
query.path = pathQuery;
}
return await FileModel.find(query).exec();
} finally {
span.end();
} }
if (opt.path?.includes(config.ANONYMIZATION_MASK)) {
const f = new AnonymizedFile({
repository: this,
anonymizedPath: opt.path,
});
opt.path = await f.originalPath();
}
let pathQuery: string | RegExp | undefined = opt.path
? new RegExp(`^${opt.path}`)
: undefined;
if (opt.recursive === false) {
pathQuery = opt.path ? new RegExp(`^${opt.path}$`) : "";
}
const query: FilterQuery<IFile> = {
repoId: this.repoId,
};
if (pathQuery !== undefined) {
query.path = pathQuery;
}
return await FileModel.find(query).exec();
} }
/** /**
@@ -276,11 +269,6 @@ export default class Repository {
* @returns void * @returns void
*/ */
async updateIfNeeded(opt?: { force: boolean }): Promise<void> { async updateIfNeeded(opt?: { force: boolean }): Promise<void> {
const span = trace
.getTracer("ano-file")
.startSpan("Repository.updateIfNeeded");
span.setAttribute("repoId", this.repoId);
if ( if (
this._model.options.expirationMode !== "never" && this._model.options.expirationMode !== "never" &&
this.status != RepositoryStatus.EXPIRED && this.status != RepositoryStatus.EXPIRED &&
@@ -344,8 +332,6 @@ export default class Repository {
this.status == RepositoryStatus.READY this.status == RepositoryStatus.READY
) { ) {
console.log(`[UPDATE] ${this._model.repoId} is up to date`); console.log(`[UPDATE] ${this._model.repoId} is up to date`);
span.setAttribute("status", "up_to_date");
span.end();
return; return;
} }
this._model.source.commit = newCommit; this._model.source.commit = newCommit;
@@ -368,8 +354,6 @@ export default class Repository {
); );
await this.updateStatus(RepositoryStatus.ERROR, "branch_not_found"); await this.updateStatus(RepositoryStatus.ERROR, "branch_not_found");
await this.resetSate(); await this.resetSate();
span.setAttribute("status", "branch_not_found");
span.end();
throw new AnonymousError("branch_not_found", { throw new AnonymousError("branch_not_found", {
object: this, object: this,
}); });
@@ -386,7 +370,6 @@ export default class Repository {
}); });
} }
} }
span.end();
} }
/** /**
* Download the require state for the repository to work * Download the require state for the repository to work
@@ -394,10 +377,7 @@ export default class Repository {
* @returns void * @returns void
*/ */
async anonymize(progress?: (status: string) => void) { async anonymize(progress?: (status: string) => void) {
const span = trace.getTracer("ano-file").startSpan("Repository.anonymize");
span.setAttribute("repoId", this.repoId);
if (this.status === RepositoryStatus.READY) { if (this.status === RepositoryStatus.READY) {
span.end();
return; return;
} }
this.model.increment(); this.model.increment();
@@ -418,23 +398,16 @@ export default class Repository {
} }
await this.updateStatus(RepositoryStatus.READY); await this.updateStatus(RepositoryStatus.READY);
await this.computeSize(); await this.computeSize();
span.end();
} }
/** /**
* Update the last view and view count * Update the last view and view count
*/ */
async countView() { async countView() {
const span = trace.getTracer("ano-file").startSpan("Repository.countView"); this._model.lastView = new Date();
span.setAttribute("repoId", this.repoId); this._model.pageView = (this._model.pageView || 0) + 1;
try { if (!isConnected) return this.model;
this._model.lastView = new Date(); await this._model.save();
this._model.pageView = (this._model.pageView || 0) + 1;
if (!isConnected) return this.model;
await this._model.save();
} finally {
span.end();
}
} }
/** /**
@@ -443,54 +416,36 @@ export default class Repository {
* @param errorMessage a potential error message to display * @param errorMessage a potential error message to display
*/ */
async updateStatus(status: RepositoryStatus, statusMessage?: string) { async updateStatus(status: RepositoryStatus, statusMessage?: string) {
const span = trace if (!status) return this.model;
.getTracer("ano-file") this._model.status = status;
.startSpan("Repository.updateStatus"); this._model.statusDate = new Date();
span.setAttribute("repoId", this.repoId); this._model.statusMessage = statusMessage;
span.setAttribute("status", status); if (!isConnected) return this.model;
span.setAttribute("statusMessage", statusMessage || ""); await this._model.save();
try {
if (!status) return this.model;
this._model.status = status;
this._model.statusDate = new Date();
this._model.statusMessage = statusMessage;
if (!isConnected) return this.model;
await this._model.save();
} finally {
span.end();
}
} }
/** /**
* Expire the repository * Expire the repository
*/ */
async expire() { async expire() {
const span = trace.getTracer("ano-file").startSpan("Repository.expire");
span.setAttribute("repoId", this.repoId);
await this.updateStatus(RepositoryStatus.EXPIRING); await this.updateStatus(RepositoryStatus.EXPIRING);
await this.resetSate(); await this.resetSate();
await this.updateStatus(RepositoryStatus.EXPIRED); await this.updateStatus(RepositoryStatus.EXPIRED);
span.end();
} }
/** /**
* Remove the repository * Remove the repository
*/ */
async remove() { async remove() {
const span = trace.getTracer("ano-file").startSpan("Repository.remove");
span.setAttribute("repoId", this.repoId);
await this.updateStatus(RepositoryStatus.REMOVING); await this.updateStatus(RepositoryStatus.REMOVING);
await this.resetSate(); await this.resetSate();
await this.updateStatus(RepositoryStatus.REMOVED); await this.updateStatus(RepositoryStatus.REMOVED);
span.end();
} }
/** /**
* Reset/delete the state of the repository * Reset/delete the state of the repository
*/ */
async resetSate(status?: RepositoryStatus, statusMessage?: string) { async resetSate(status?: RepositoryStatus, statusMessage?: string) {
const span = trace.getTracer("ano-file").startSpan("Repository.resetState");
span.setAttribute("repoId", this.repoId);
// remove attribute // remove attribute
this._model.size = { storage: 0, file: 0 }; this._model.size = { storage: 0, file: 0 };
if (status) { if (status) {
@@ -502,7 +457,6 @@ export default class Repository {
this.removeCache(), this.removeCache(),
]); ]);
console.log(`[RESET] ${this._model.repoId} has been reset`); console.log(`[RESET] ${this._model.repoId} has been reset`);
span.end();
} }
/** /**
@@ -510,22 +464,14 @@ export default class Repository {
* @returns * @returns
*/ */
async removeCache() { async removeCache() {
const span = trace await storage.rm(this.repoId);
.getTracer("ano-file") this.model.isReseted = true;
.startSpan("Repository.removeCache"); if (isConnected) {
span.setAttribute("repoId", this.repoId); try {
try { await this.model.save();
await storage.rm(this.repoId); } catch (error) {
this.model.isReseted = true; console.error("[ERROR] removeCache save", error);
if (isConnected) {
try {
await this.model.save();
} catch (error) {
console.error("[ERROR] removeCache save", error);
}
} }
} finally {
span.end();
} }
} }
@@ -544,39 +490,31 @@ export default class Repository {
*/ */
file: number; file: number;
}> { }> {
const span = trace if (this.status !== RepositoryStatus.READY)
.getTracer("ano-file") return { storage: 0, file: 0 };
.startSpan("Repository.computeSize"); if (this._model.size.file) return this._model.size;
span.setAttribute("repoId", this.repoId); const res = await FileModel.aggregate([
try { {
if (this.status !== RepositoryStatus.READY) $match: {
return { storage: 0, file: 0 }; repoId: this.repoId,
if (this._model.size.file) return this._model.size;
const res = await FileModel.aggregate([
{
$match: {
repoId: this.repoId,
},
}, },
{ },
$group: { {
_id: "$repoId", $group: {
storage: { $sum: "$size" }, _id: "$repoId",
file: { $sum: 1 }, storage: { $sum: "$size" },
}, file: { $sum: 1 },
}, },
]); },
this._model.size = { ]);
storage: res[0]?.storage || 0, this._model.size = {
file: res[0]?.file || 0, storage: res[0]?.storage || 0,
}; file: res[0]?.file || 0,
if (isConnected) { };
await this._model.save(); if (isConnected) {
} await this._model.save();
return this._model.size;
} finally {
span.end();
} }
return this._model.size;
} }
/** /**
@@ -585,20 +523,14 @@ export default class Repository {
* @returns conference of the repository * @returns conference of the repository
*/ */
async conference(): Promise<Conference | null> { async conference(): Promise<Conference | null> {
const span = trace.getTracer("ano-file").startSpan("Repository.conference"); if (!this._model.conference) {
span.setAttribute("repoId", this.repoId);
try {
if (!this._model.conference) {
return null;
}
const conference = await ConferenceModel.findOne({
conferenceID: this._model.conference,
});
if (conference) return new Conference(conference);
return null; return null;
} finally {
span.end();
} }
const conference = await ConferenceModel.findOne({
conferenceID: this._model.conference,
});
if (conference) return new Conference(conference);
return null;
} }
/***** Getters ********/ /***** Getters ********/
-14
View File
@@ -1,5 +1,3 @@
import { trace } from "@opentelemetry/api";
import AnonymizedRepositoryModel from "./model/anonymizedRepositories/anonymizedRepositories.model"; import AnonymizedRepositoryModel from "./model/anonymizedRepositories/anonymizedRepositories.model";
import RepositoryModel from "./model/repositories/repositories.model"; import RepositoryModel from "./model/repositories/repositories.model";
import { IUserDocument } from "./model/users/users.types"; import { IUserDocument } from "./model/users/users.types";
@@ -57,10 +55,6 @@ export default class User {
*/ */
force: boolean; force: boolean;
}): Promise<GitHubRepository[]> { }): Promise<GitHubRepository[]> {
const span = trace
.getTracer("ano-file")
.startSpan("User.getGitHubRepositories");
span.setAttribute("username", this.username);
if ( if (
!this._model.repositories || !this._model.repositories ||
this._model.repositories.length == 0 || this._model.repositories.length == 0 ||
@@ -111,13 +105,11 @@ export default class User {
// have the model // have the model
await this._model.save(); await this._model.save();
span.end();
return repositories.map((r) => new GitHubRepository(r)); return repositories.map((r) => new GitHubRepository(r));
} else { } else {
const out = ( const out = (
await RepositoryModel.find({ _id: { $in: this._model.repositories } }) await RepositoryModel.find({ _id: { $in: this._model.repositories } })
).map((i) => new GitHubRepository(i)); ).map((i) => new GitHubRepository(i));
span.end();
return out; return out;
} }
} }
@@ -127,8 +119,6 @@ export default class User {
* @returns the list of anonymized repositories * @returns the list of anonymized repositories
*/ */
async getRepositories() { async getRepositories() {
const span = trace.getTracer("ano-file").startSpan("User.getRepositories");
span.setAttribute("username", this.username);
const repositories = ( const repositories = (
await AnonymizedRepositoryModel.find({ await AnonymizedRepositoryModel.find({
owner: this.id, owner: this.id,
@@ -147,7 +137,6 @@ export default class User {
} }
} }
await Promise.all(promises); await Promise.all(promises);
span.end();
return repositories; return repositories;
} }
/** /**
@@ -155,8 +144,6 @@ export default class User {
* @returns the list of anonymized repositories * @returns the list of anonymized repositories
*/ */
async getPullRequests() { async getPullRequests() {
const span = trace.getTracer("ano-file").startSpan("User.getPullRequests");
span.setAttribute("username", this.username);
const pullRequests = ( const pullRequests = (
await AnonymizedPullRequestModel.find({ await AnonymizedPullRequestModel.find({
owner: this.id, owner: this.id,
@@ -175,7 +162,6 @@ export default class User {
} }
} }
await Promise.all(promises); await Promise.all(promises);
span.end();
return pullRequests; return pullRequests;
} }
+38 -63
View File
@@ -1,7 +1,6 @@
import { basename } from "path"; import { basename } from "path";
import { Transform, Readable } from "stream"; import { Transform, Readable } from "stream";
import { isText } from "istextorbinary"; import { isText } from "istextorbinary";
import { trace } from "@opentelemetry/api";
import config from "../config"; import config from "../config";
@@ -49,30 +48,24 @@ export class AnonymizeTransformer extends Transform {
} }
_transform(chunk: Buffer, encoding: string, callback: () => void) { _transform(chunk: Buffer, encoding: string, callback: () => void) {
trace if (this.isText === null) {
.getTracer("ano-file") this.isText = isTextFile(this.opt.filePath, chunk);
.startActiveSpan("AnonymizeTransformer.transform", async (span) => { }
span.setAttribute("path", this.opt.filePath); if (this.isText) {
if (this.isText === null) { const content = this.anonimizer.anonymize(chunk.toString());
this.isText = isTextFile(this.opt.filePath, chunk); if (this.anonimizer.wasAnonymized) {
} chunk = Buffer.from(content);
if (this.isText) { }
const content = this.anonimizer.anonymize(chunk.toString()); }
if (this.anonimizer.wasAnonymized) {
chunk = Buffer.from(content);
}
}
this.emit("transform", { this.emit("transform", {
isText: this.isText, isText: this.isText,
wasAnonimized: this.wasAnonimized, wasAnonimized: this.wasAnonimized,
chunk, chunk,
}); });
this.push(chunk); this.push(chunk);
span.end(); callback();
callback();
});
} }
} }
@@ -179,48 +172,30 @@ export class ContentAnonimizer {
} }
anonymize(content: string) { anonymize(content: string) {
const span = trace content = this.removeImage(content);
.getTracer("ano-file") content = this.removeLink(content);
.startSpan("ContentAnonimizer.anonymize"); content = this.replaceGitHubSelfLinks(content);
try { content = this.replaceTerms(content);
content = this.removeImage(content); return content;
span.addEvent("removeImage");
content = this.removeLink(content);
span.addEvent("removeLink");
content = this.replaceGitHubSelfLinks(content);
span.addEvent("replaceGitHubSelfLinks");
content = this.replaceTerms(content);
span.addEvent("replaceTerms");
return content;
} finally {
span.end();
}
} }
} }
export function anonymizePath(path: string, terms: string[]) { export function anonymizePath(path: string, terms: string[]) {
return trace for (let i = 0; i < terms.length; i++) {
.getTracer("ano-file") let term = terms[i];
.startActiveSpan("utils.anonymizePath", (span) => { if (term.trim() == "") {
span.setAttribute("path", path); continue;
for (let i = 0; i < terms.length; i++) { }
let term = terms[i]; try {
if (term.trim() == "") { new RegExp(term, "gi");
continue; } catch {
} // escape regex characters
try { term = term.replace(/[-[\]{}()*+?.,\\^$|#]/g, "\\$&");
new RegExp(term, "gi"); }
} catch { path = path.replace(
// escape regex characters new RegExp(term, "gi"),
term = term.replace(/[-[\]{}()*+?.,\\^$|#]/g, "\\$&"); config.ANONYMIZATION_MASK + "-" + (i + 1)
} );
path = path.replace( }
new RegExp(term, "gi"), return path;
config.ANONYMIZATION_MASK + "-" + (i + 1)
);
}
span.setAttribute("return", path);
span.end();
return path;
});
} }
+44 -61
View File
@@ -4,7 +4,6 @@ import { OctokitResponse } from "@octokit/types";
import storage from "../storage"; import storage from "../storage";
import GitHubBase, { GitHubBaseData } from "./GitHubBase"; import GitHubBase, { GitHubBaseData } from "./GitHubBase";
import { trace } from "@opentelemetry/api";
import { FILE_TYPE } from "../storage/Storage"; import { FILE_TYPE } from "../storage/Storage";
import { octokit } from "../GitHubUtils"; import { octokit } from "../GitHubUtils";
import AnonymousError from "../AnonymousError"; import AnonymousError from "../AnonymousError";
@@ -27,47 +26,39 @@ export default class GitHubDownload extends GitHubBase {
} }
async download(progress?: (status: string) => void) { async download(progress?: (status: string) => void) {
const span = trace.getTracer("ano-file").startSpan("GHDownload.download"); let response: OctokitResponse<unknown, number>;
span.setAttribute("repoId", this.data.repoId);
try { try {
let response: OctokitResponse<unknown, number>; response = await this.getZipUrl();
try { } catch (error) {
response = await this.getZipUrl(); throw new AnonymousError("repo_not_found", {
} catch (error) { httpStatus: (error as any).status || 404,
span.recordException(error as Error); object: this.data,
throw new AnonymousError("repo_not_found", { cause: error as Error,
httpStatus: (error as any).status || 404, });
object: this.data, }
cause: error as Error, await storage.mk(this.data.repoId);
}); try {
} const downloadStream = got.stream(response.url);
await storage.mk(this.data.repoId); downloadStream.addListener(
try { "downloadProgress",
const downloadStream = got.stream(response.url); (p: { transferred?: number }) => {
downloadStream.addListener( if (progress && p.transferred) {
"downloadProgress", progress("Repository download: " + humanFileSize(p.transferred));
(p: { transferred?: number }) => {
if (progress && p.transferred) {
progress("Repository download: " + humanFileSize(p.transferred));
}
} }
); }
await storage.extractZip( );
this.data.repoId, await storage.extractZip(
"", this.data.repoId,
downloadStream, "",
this.type downloadStream,
); this.type
} catch (error) { );
span.recordException(error as Error); } catch (error) {
throw new AnonymousError("unable_to_download", { throw new AnonymousError("unable_to_download", {
httpStatus: 500, httpStatus: 500,
cause: error as Error, cause: error as Error,
object: this.data, object: this.data,
}); });
}
} finally {
span.end();
} }
} }
@@ -75,29 +66,21 @@ export default class GitHubDownload extends GitHubBase {
file: AnonymizedFile, file: AnonymizedFile,
progress?: (status: string) => void progress?: (status: string) => void
): Promise<Readable> { ): Promise<Readable> {
const span = trace const exists = await storage.exists(this.data.repoId, file.filePath);
.getTracer("ano-file") if (exists === FILE_TYPE.FILE) {
.startSpan("GHDownload.getFileContent");
span.setAttribute("repoId", this.data.repoId);
try {
const exists = await storage.exists(this.data.repoId, file.filePath);
if (exists === FILE_TYPE.FILE) {
return storage.read(this.data.repoId, file.filePath);
} else if (exists === FILE_TYPE.FOLDER) {
throw new AnonymousError("folder_not_supported", {
httpStatus: 400,
object: file,
});
}
// will throw an error if the file is not in the repository
await file.originalPath();
// the cache is not ready, we need to download the repository
await this.download(progress);
return storage.read(this.data.repoId, file.filePath); return storage.read(this.data.repoId, file.filePath);
} finally { } else if (exists === FILE_TYPE.FOLDER) {
span.end(); throw new AnonymousError("folder_not_supported", {
httpStatus: 400,
object: file,
});
} }
// will throw an error if the file is not in the repository
await file.originalPath();
// the cache is not ready, we need to download the repository
await this.download(progress);
return storage.read(this.data.repoId, file.filePath);
} }
async getFiles(progress?: (status: string) => void) { async getFiles(progress?: (status: string) => void) {
+153 -189
View File
@@ -1,7 +1,6 @@
import { Branch } from "../types"; import { Branch } from "../types";
import * as gh from "parse-github-url"; import * as gh from "parse-github-url";
import { RestEndpointMethodTypes } from "@octokit/rest"; import { RestEndpointMethodTypes } from "@octokit/rest";
import { trace } from "@opentelemetry/api";
import AnonymousError from "../AnonymousError"; import AnonymousError from "../AnonymousError";
import { isConnected } from "../../server/database"; import { isConnected } from "../../server/database";
@@ -55,81 +54,64 @@ export class GitHubRepository {
accessToken: string; accessToken: string;
} }
) { ) {
const span = trace const oct = octokit(opt.accessToken);
.getTracer("ano-file") const commit = await oct.repos.getCommit({
.startSpan("GHRepository.getCommitInfo"); owner: this.owner,
span.setAttribute("owner", this.owner); repo: this.repo,
span.setAttribute("repo", this.repo); ref: sha,
try { });
const oct = octokit(opt.accessToken); return commit.data;
const commit = await oct.repos.getCommit({
owner: this.owner,
repo: this.repo,
ref: sha,
});
return commit.data;
} finally {
span.end();
}
} }
async branches(opt: { async branches(opt: {
accessToken: string; accessToken: string;
force?: boolean; force?: boolean;
}): Promise<Branch[]> { }): Promise<Branch[]> {
const span = trace.getTracer("ano-file").startSpan("GHRepository.branches"); if (
span.setAttribute("owner", this.owner); !this._data.branches ||
span.setAttribute("repo", this.repo); this._data.branches.length == 0 ||
try { opt?.force === true
if ( ) {
!this._data.branches || // get the list of repo from github
this._data.branches.length == 0 || const oct = octokit(opt.accessToken);
opt?.force === true try {
) { const branches = (
// get the list of repo from github await oct.paginate("GET /repos/{owner}/{repo}/branches", {
const oct = octokit(opt.accessToken); owner: this.owner,
try { repo: this.repo,
const branches = ( per_page: 100,
await oct.paginate("GET /repos/{owner}/{repo}/branches", { })
owner: this.owner, ).map((b) => {
repo: this.repo, return {
per_page: 100, name: b.name,
}) commit: b.commit.sha,
).map((b) => { readme: this._data.branches?.filter(
return { (f: Branch) => f.name == b.name
name: b.name, )[0]?.readme,
commit: b.commit.sha, } as Branch;
readme: this._data.branches?.filter( });
(f: Branch) => f.name == b.name this._data.branches = branches;
)[0]?.readme, if (isConnected) {
} as Branch; await RepositoryModel.updateOne(
}); { externalId: this.id },
this._data.branches = branches; { $set: { branches } }
if (isConnected) { );
await RepositoryModel.updateOne(
{ externalId: this.id },
{ $set: { branches } }
);
}
} catch (error) {
span.recordException(error as Error);
throw new AnonymousError("repo_not_found", {
httpStatus: (error as any).status,
cause: error as Error,
object: this,
});
} }
} else if (isConnected) { } catch (error) {
const q = await RepositoryModel.findOne({ externalId: this.id }).select( throw new AnonymousError("repo_not_found", {
"branches" httpStatus: (error as any).status,
); cause: error as Error,
this._data.branches = q?.branches; object: this,
});
} }
} else if (isConnected) {
return this._data.branches || []; const q = await RepositoryModel.findOne({ externalId: this.id }).select(
} finally { "branches"
span.end(); );
this._data.branches = q?.branches;
} }
return this._data.branches || [];
} }
async readme(opt: { async readme(opt: {
@@ -137,60 +119,52 @@ export class GitHubRepository {
force?: boolean; force?: boolean;
accessToken: string; accessToken: string;
}): Promise<string | undefined> { }): Promise<string | undefined> {
const span = trace.getTracer("ano-file").startSpan("GHRepository.readme"); if (!opt.branch) opt.branch = this._data.defaultBranch || "master";
span.setAttribute("owner", this.owner);
span.setAttribute("repo", this.repo);
try {
if (!opt.branch) opt.branch = this._data.defaultBranch || "master";
const model = await RepositoryModel.findOne({ const model = await RepositoryModel.findOne({
externalId: this.id, externalId: this.id,
}).select("branches"); }).select("branches");
if (!model) { if (!model) {
throw new AnonymousError("repo_not_found", { httpStatus: 404 }); throw new AnonymousError("repo_not_found", { httpStatus: 404 });
} }
this._data.branches = await this.branches(opt); this._data.branches = await this.branches(opt);
model.branches = this._data.branches; model.branches = this._data.branches;
const selected = model.branches.filter((f) => f.name == opt.branch)[0]; const selected = model.branches.filter((f) => f.name == opt.branch)[0];
if (selected && (!selected.readme || opt?.force === true)) { if (selected && (!selected.readme || opt?.force === true)) {
// get the list of repo from github // get the list of repo from github
const oct = octokit(opt.accessToken); const oct = octokit(opt.accessToken);
try { try {
const ghRes = await oct.repos.getReadme({ const ghRes = await oct.repos.getReadme({
owner: this.owner, owner: this.owner,
repo: this.repo, repo: this.repo,
ref: selected?.commit, ref: selected?.commit,
}); });
const readme = Buffer.from( const readme = Buffer.from(
ghRes.data.content, ghRes.data.content,
ghRes.data.encoding as BufferEncoding ghRes.data.encoding as BufferEncoding
).toString("utf-8"); ).toString("utf-8");
selected.readme = readme; selected.readme = readme;
await model.save(); await model.save();
} catch (error) { } catch (error) {
span.recordException(error as Error);
throw new AnonymousError("readme_not_available", {
httpStatus: 404,
cause: error as Error,
object: this,
});
}
}
if (!selected) {
throw new AnonymousError("readme_not_available", { throw new AnonymousError("readme_not_available", {
httpStatus: 404, httpStatus: 404,
cause: error as Error,
object: this, object: this,
}); });
} }
return selected.readme;
} finally {
span.end();
} }
if (!selected) {
throw new AnonymousError("readme_not_available", {
httpStatus: 404,
object: this,
});
}
return selected.readme;
} }
public get owner(): string { public get owner(): string {
@@ -235,60 +209,44 @@ export async function getRepositoryFromGitHub(opt: {
accessToken: string; accessToken: string;
force?: boolean; force?: boolean;
}) { }) {
const span = trace if (opt.repo.indexOf(".git") > -1) {
.getTracer("ano-file") opt.repo = opt.repo.replace(".git", "");
.startSpan("GHRepository.getRepositoryFromGitHub"); }
span.setAttribute("owner", opt.owner); let dbModel = null;
span.setAttribute("repo", opt.repo); if (opt.repositoryID) {
try { dbModel = isConnected
if (opt.repo.indexOf(".git") > -1) { ? await RepositoryModel.findById(opt.repositoryID)
opt.repo = opt.repo.replace(".git", ""); : null;
} opt.owner = dbModel?.name?.split("/")[0] || opt.owner;
let dbModel = null; opt.repo = dbModel?.name?.split("/")[1] || opt.repo;
if (opt.repositoryID) { } else {
dbModel = isConnected dbModel = isConnected
? await RepositoryModel.findById(opt.repositoryID) ? await RepositoryModel.findOne({
: null; name: opt.owner + "/" + opt.repo,
opt.owner = dbModel?.name?.split("/")[0] || opt.owner;
opt.repo = dbModel?.name?.split("/")[1] || opt.repo;
} else {
dbModel = isConnected
? await RepositoryModel.findOne({
name: opt.owner + "/" + opt.repo,
})
: null;
}
if (dbModel && !opt.force) {
return new GitHubRepository(dbModel);
}
const oct = octokit(opt.accessToken);
let r: RestEndpointMethodTypes["repos"]["get"]["response"]["data"];
try {
r = (
await oct.repos.get({
owner: opt.owner,
repo: opt.repo,
}) })
).data; : null;
} catch (error) { }
span.recordException(error as Error); if (dbModel && !opt.force) {
if ( return new GitHubRepository(dbModel);
error instanceof Error && }
error.message.includes( const oct = octokit(opt.accessToken);
"organization has enabled OAuth App access restrictions" let r: RestEndpointMethodTypes["repos"]["get"]["response"]["data"];
) try {
) { r = (
throw new AnonymousError("repo_access_limited", { await oct.repos.get({
httpStatus: 403, owner: opt.owner,
object: { repo: opt.repo,
owner: opt.owner, })
repo: opt.repo, ).data;
}, } catch (error) {
cause: error as Error, if (
}); error instanceof Error &&
} error.message.includes(
throw new AnonymousError("repo_not_found", { "organization has enabled OAuth App access restrictions"
httpStatus: (error as any).status, )
) {
throw new AnonymousError("repo_access_limited", {
httpStatus: 403,
object: { object: {
owner: opt.owner, owner: opt.owner,
repo: opt.repo, repo: opt.repo,
@@ -296,32 +254,38 @@ export async function getRepositoryFromGitHub(opt: {
cause: error as Error, cause: error as Error,
}); });
} }
if (!r) throw new AnonymousError("repo_not_found", {
throw new AnonymousError("repo_not_found", { httpStatus: (error as any).status,
httpStatus: 404, object: {
object: {
owner: opt.owner,
repo: opt.repo,
},
});
const model = dbModel || new RepositoryModel({ externalId: "gh_" + r.id });
model.name = r.full_name;
model.url = r.html_url;
model.size = r.size;
model.defaultBranch = r.default_branch;
model.hasPage = r.has_pages;
if (model.hasPage) {
const ghPageRes = await oct.repos.getPages({
owner: opt.owner, owner: opt.owner,
repo: opt.repo, repo: opt.repo,
}); },
model.pageSource = ghPageRes.data.source; cause: error as Error,
} });
if (isConnected) {
await model.save();
}
return new GitHubRepository(model);
} finally {
span.end();
} }
if (!r)
throw new AnonymousError("repo_not_found", {
httpStatus: 404,
object: {
owner: opt.owner,
repo: opt.repo,
},
});
const model = dbModel || new RepositoryModel({ externalId: "gh_" + r.id });
model.name = r.full_name;
model.url = r.html_url;
model.size = r.size;
model.defaultBranch = r.default_branch;
model.hasPage = r.has_pages;
if (model.hasPage) {
const ghPageRes = await oct.repos.getPages({
owner: opt.owner,
repo: opt.repo,
});
model.pageSource = ghPageRes.data.source;
}
if (isConnected) {
await model.save();
}
return new GitHubRepository(model);
} }
+90 -142
View File
@@ -7,7 +7,6 @@ import { basename, dirname } from "path";
import * as stream from "stream"; import * as stream from "stream";
import AnonymousError from "../AnonymousError"; import AnonymousError from "../AnonymousError";
import { trace } from "@opentelemetry/api";
import { FILE_TYPE } from "../storage/Storage"; import { FILE_TYPE } from "../storage/Storage";
import { octokit } from "../GitHubUtils"; import { octokit } from "../GitHubUtils";
import FileModel from "../model/files/files.model"; import FileModel from "../model/files/files.model";
@@ -21,8 +20,6 @@ export default class GitHubStream extends GitHubBase {
} }
downloadFile(token: string, sha: string) { downloadFile(token: string, sha: string) {
const span = trace.getTracer("ano-file").startSpan("GHStream.downloadFile");
span.setAttribute("sha", sha);
const oct = octokit(token); const oct = octokit(token);
try { try {
const { url } = oct.rest.git.getBlob.endpoint({ const { url } = oct.rest.git.getBlob.endpoint({
@@ -40,14 +37,11 @@ export default class GitHubStream extends GitHubBase {
}); });
} catch (error) { } catch (error) {
console.error(error); console.error(error);
// span.recordException(error as Error);
throw new AnonymousError("repo_not_accessible", { throw new AnonymousError("repo_not_accessible", {
httpStatus: 404, httpStatus: 404,
object: this.data, object: this.data,
cause: error as Error, cause: error as Error,
}); });
} finally {
span.end();
} }
} }
@@ -56,12 +50,6 @@ export default class GitHubStream extends GitHubBase {
repoId: string, repoId: string,
fileSha: () => Promise<string> | string fileSha: () => Promise<string> | string
) { ) {
const span = trace
.getTracer("ano-file")
.startSpan("GHStream.getFileContent");
span.setAttribute("repoId", repoId);
span.setAttribute("file", filePath);
const fileInfo = await storage.exists(repoId, filePath); const fileInfo = await storage.exists(repoId, filePath);
if (fileInfo == FILE_TYPE.FILE) { if (fileInfo == FILE_TYPE.FILE) {
return storage.read(repoId, filePath); return storage.read(repoId, filePath);
@@ -76,10 +64,6 @@ export default class GitHubStream extends GitHubBase {
await fileSha() await fileSha()
); );
content.on("close", () => {
span.end();
});
// duplicate the stream to write it to the storage // duplicate the stream to write it to the storage
const stream1 = content.pipe(new stream.PassThrough()); const stream1 = content.pipe(new stream.PassThrough());
const stream2 = content.pipe(new stream.PassThrough()); const stream2 = content.pipe(new stream.PassThrough());
@@ -99,45 +83,30 @@ export default class GitHubStream extends GitHubBase {
} }
async getFileContent(file: AnonymizedFile): Promise<stream.Readable> { async getFileContent(file: AnonymizedFile): Promise<stream.Readable> {
const span = trace
.getTracer("ano-file")
.startSpan("GHStream.getFileContent");
span.setAttribute("repoId", file.repository.repoId);
span.setAttribute("file", file.anonymizedPath);
try { try {
try { void file.filePath;
void file.filePath; } catch (_) {
} catch (_) { // compute the original path if ambiguous
// compute the original path if ambiguous await file.originalPath();
await file.originalPath();
}
return this.getFileContentCache(
file.filePath,
file.repository.repoId,
async () => {
const fileSha = await file.sha();
if (!fileSha) {
throw new AnonymousError("file_not_accessible", {
httpStatus: 404,
object: file,
});
}
return fileSha;
}
);
} finally {
span.end();
} }
return this.getFileContentCache(
file.filePath,
file.repository.repoId,
async () => {
const fileSha = await file.sha();
if (!fileSha) {
throw new AnonymousError("file_not_accessible", {
httpStatus: 404,
object: file,
});
}
return fileSha;
}
);
} }
async getFiles(progress?: (status: string) => void) { async getFiles(progress?: (status: string) => void) {
const span = trace.getTracer("ano-file").startSpan("GHStream.getFiles"); return this.getTruncatedTree(this.data.commit, progress);
span.setAttribute("repoId", this.data.repoId);
try {
return this.getTruncatedTree(this.data.commit, progress);
} finally {
span.end();
}
} }
private async getGHTree( private async getGHTree(
@@ -145,25 +114,19 @@ export default class GitHubStream extends GitHubBase {
count = { request: 0, file: 0 }, count = { request: 0, file: 0 },
opt = { recursive: true, callback: () => {} } opt = { recursive: true, callback: () => {} }
) { ) {
const span = trace.getTracer("ano-file").startSpan("GHStream.getGHTree"); const oct = octokit(await this.data.getToken());
span.setAttribute("sha", sha); const ghRes = await oct.git.getTree({
try { owner: this.data.organization,
const oct = octokit(await this.data.getToken()); repo: this.data.repoName,
const ghRes = await oct.git.getTree({ tree_sha: sha,
owner: this.data.organization, recursive: opt.recursive === true ? "1" : undefined,
repo: this.data.repoName, });
tree_sha: sha, count.request++;
recursive: opt.recursive === true ? "1" : undefined, count.file += ghRes.data.tree.length;
}); if (opt.callback) {
count.request++; opt.callback();
count.file += ghRes.data.tree.length;
if (opt.callback) {
opt.callback();
}
return ghRes.data;
} finally {
span.end();
} }
return ghRes.data;
} }
private async getTruncatedTree( private async getTruncatedTree(
@@ -175,65 +138,56 @@ export default class GitHubStream extends GitHubBase {
request: 0, request: 0,
file: 0, file: 0,
}; };
const span = trace
.getTracer("ano-file")
.startSpan("GHStream.getTruncatedTree");
span.setAttribute("sha", sha);
span.setAttribute("parentPath", parentPath);
const output: IFile[] = []; const output: IFile[] = [];
let data = null;
try { try {
let data = null; data = await this.getGHTree(sha, count, {
try { recursive: false,
data = await this.getGHTree(sha, count, { callback: () => {
recursive: false, if (progress) {
callback: () => { progress("List file: " + count.file);
if (progress) { }
progress("List file: " + count.file); },
}
},
});
output.push(...this.tree2Tree(data.tree, parentPath));
} catch (error) {
console.log(error);
if ((error as any).status == 409 || (error as any).status == 404) {
// empty repo
data = { tree: [] };
} else {
throw new AnonymousError("repo_not_found", {
httpStatus: (error as any).status || 404,
object: this.data,
cause: error as Error,
});
}
}
const promises: ReturnType<GitHubStream["getGHTree"]>[] = [];
const parentPaths: string[] = [];
for (const file of data.tree) {
if (file.type == "tree" && file.path && file.sha) {
const elementPath = path.join(parentPath, file.path);
parentPaths.push(elementPath);
promises.push(
this.getGHTree(file.sha, count, {
recursive: true,
callback: () => {
if (progress) {
progress("List file: " + count.file);
}
},
})
);
}
}
(await Promise.all(promises)).forEach((data, i) => {
if (data.truncated) {
// TODO: the tree is truncated
}
output.push(...this.tree2Tree(data.tree, parentPaths[i]));
}); });
return output; output.push(...this.tree2Tree(data.tree, parentPath));
} finally { } catch (error) {
span.end(); console.log(error);
if ((error as any).status == 409 || (error as any).status == 404) {
// empty repo
data = { tree: [] };
} else {
throw new AnonymousError("repo_not_found", {
httpStatus: (error as any).status || 404,
object: this.data,
cause: error as Error,
});
}
} }
const promises: ReturnType<GitHubStream["getGHTree"]>[] = [];
const parentPaths: string[] = [];
for (const file of data.tree) {
if (file.type == "tree" && file.path && file.sha) {
const elementPath = path.join(parentPath, file.path);
parentPaths.push(elementPath);
promises.push(
this.getGHTree(file.sha, count, {
recursive: true,
callback: () => {
if (progress) {
progress("List file: " + count.file);
}
},
})
);
}
}
(await Promise.all(promises)).forEach((data, i) => {
if (data.truncated) {
// TODO: the tree is truncated
}
output.push(...this.tree2Tree(data.tree, parentPaths[i]));
});
return output;
} }
private tree2Tree( private tree2Tree(
@@ -247,25 +201,19 @@ export default class GitHubStream extends GitHubBase {
}[], }[],
parentPath: string = "" parentPath: string = ""
) { ) {
const span = trace.getTracer("ano-file").startSpan("GHStream.tree2Tree"); return tree.map((elem) => {
span.setAttribute("parentPath", parentPath); const fullPath = path.join(parentPath, elem.path || "");
try { let pathFile = dirname(fullPath);
return tree.map((elem) => { if (pathFile === ".") {
const fullPath = path.join(parentPath, elem.path || ""); pathFile = "";
let pathFile = dirname(fullPath); }
if (pathFile === ".") { return new FileModel({
pathFile = ""; name: basename(fullPath),
} path: pathFile,
return new FileModel({ repoId: this.data.repoId,
name: basename(fullPath), size: elem.size,
path: pathFile, sha: elem.sha,
repoId: this.data.repoId,
size: elem.size,
sha: elem.sha,
});
}); });
} finally { });
span.end();
}
} }
} }
+45 -85
View File
@@ -7,7 +7,6 @@ import { Readable, pipeline, Transform } from "stream";
import * as archiver from "archiver"; import * as archiver from "archiver";
import { promisify } from "util"; import { promisify } from "util";
import { lookup } from "mime-types"; import { lookup } from "mime-types";
import { trace } from "@opentelemetry/api";
import StorageBase, { FILE_TYPE } from "./Storage"; import StorageBase, { FILE_TYPE } from "./Storage";
import FileModel from "../model/files/files.model"; import FileModel from "../model/files/files.model";
import { IFile } from "../model/files/files.types"; import { IFile } from "../model/files/files.types";
@@ -22,37 +21,20 @@ export default class FileSystem extends StorageBase {
/** @override */ /** @override */
async exists(repoId: string, p: string = ""): Promise<FILE_TYPE> { async exists(repoId: string, p: string = ""): Promise<FILE_TYPE> {
const fullPath = join(config.FOLDER, this.repoPath(repoId), p); const fullPath = join(config.FOLDER, this.repoPath(repoId), p);
return trace try {
.getTracer("ano-file") const stat = await fs.promises.stat(fullPath);
.startActiveSpan("fs.exists", async (span) => { if (stat.isDirectory()) return FILE_TYPE.FOLDER;
span.setAttribute("path", p); if (stat.isFile()) return FILE_TYPE.FILE;
span.setAttribute("full-path", fullPath); } catch (_) {
try { // ignore file not found or not downloaded
const stat = await fs.promises.stat(fullPath); }
if (stat.isDirectory()) return FILE_TYPE.FOLDER; return FILE_TYPE.NOT_FOUND;
if (stat.isFile()) return FILE_TYPE.FILE;
} catch (_) {
// ignore file not found or not downloaded
}
span.end();
return FILE_TYPE.NOT_FOUND;
});
} }
/** @override */ /** @override */
async send(repoId: string, p: string, res: Response) { async send(repoId: string, p: string, res: Response) {
const fullPath = join(config.FOLDER, this.repoPath(repoId), p); const fullPath = join(config.FOLDER, this.repoPath(repoId), p);
return trace res.sendFile(fullPath, { dotfiles: "allow" });
.getTracer("ano-file")
.startActiveSpan("fs.send", async (span) => {
span.setAttribute("path", fullPath);
res.sendFile(fullPath, { dotfiles: "allow" }, (err) => {
if (err) {
span.recordException(err);
}
span.end();
});
});
} }
/** @override */ /** @override */
@@ -79,9 +61,7 @@ export default class FileSystem extends StorageBase {
p: string, p: string,
data: string | Readable data: string | Readable
): Promise<void> { ): Promise<void> {
const span = trace.getTracer("ano-file").startSpan("fs.write");
const fullPath = join(config.FOLDER, this.repoPath(repoId), p); const fullPath = join(config.FOLDER, this.repoPath(repoId), p);
span.setAttribute("path", fullPath);
try { try {
await this.mk(repoId, dirname(p)); await this.mk(repoId, dirname(p));
if (data instanceof Readable) { if (data instanceof Readable) {
@@ -91,32 +71,21 @@ export default class FileSystem extends StorageBase {
} }
return await fs.promises.writeFile(fullPath, data, "utf-8"); return await fs.promises.writeFile(fullPath, data, "utf-8");
} catch (err: any) { } catch (err: any) {
span.recordException(err);
// throw err; // throw err;
} finally {
span.end();
} }
} }
/** @override */ /** @override */
async rm(repoId: string, dir: string = ""): Promise<void> { async rm(repoId: string, dir: string = ""): Promise<void> {
const span = trace.getTracer("ano-file").startSpan("fs.rm");
const fullPath = join(config.FOLDER, this.repoPath(repoId), dir); const fullPath = join(config.FOLDER, this.repoPath(repoId), dir);
span.setAttribute("path", fullPath); await fs.promises.rm(fullPath, {
try { force: true,
await fs.promises.rm(fullPath, { recursive: true,
force: true, });
recursive: true,
});
} finally {
span.end();
}
} }
/** @override */ /** @override */
async mk(repoId: string, dir: string = ""): Promise<void> { async mk(repoId: string, dir: string = ""): Promise<void> {
const span = trace.getTracer("ano-file").startSpan("fs.mk");
span.setAttribute("path", dir);
const fullPath = join(config.FOLDER, this.repoPath(repoId), dir); const fullPath = join(config.FOLDER, this.repoPath(repoId), dir);
try { try {
await fs.promises.mkdir(fullPath, { await fs.promises.mkdir(fullPath, {
@@ -124,11 +93,8 @@ export default class FileSystem extends StorageBase {
}); });
} catch (err: any) { } catch (err: any) {
if (err.code !== "EEXIST") { if (err.code !== "EEXIST") {
span.recordException(err);
throw err; throw err;
} }
} finally {
span.end();
} }
} }
@@ -140,46 +106,40 @@ export default class FileSystem extends StorageBase {
onEntry?: (file: { path: string; size: number }) => void; onEntry?: (file: { path: string; size: number }) => void;
} = {} } = {}
): Promise<IFile[]> { ): Promise<IFile[]> {
return trace const fullPath = join(config.FOLDER, this.repoPath(repoId), dir);
.getTracer("ano-file") const files = await fs.promises.readdir(fullPath);
.startActiveSpan("fs.listFiles", async (span) => { const output2: IFile[] = [];
span.setAttribute("path", dir); for (const file of files) {
const fullPath = join(config.FOLDER, this.repoPath(repoId), dir); const filePath = join(fullPath, file);
const files = await fs.promises.readdir(fullPath); try {
const output2: IFile[] = []; const stats = await fs.promises.stat(filePath);
for (const file of files) { if (stats.isDirectory()) {
const filePath = join(fullPath, file); output2.push(new FileModel({ name: file, path: dir, repoId }));
try { output2.push(
const stats = await fs.promises.stat(filePath); ...(await this.listFiles(repoId, join(dir, file), opt))
if (stats.isDirectory()) { );
output2.push(new FileModel({ name: file, path: dir, repoId })); } else if (stats.isFile()) {
output2.push( if (opt.onEntry) {
...(await this.listFiles(repoId, join(dir, file), opt)) opt.onEntry({
); path: join(dir, file),
} else if (stats.isFile()) { size: stats.size,
if (opt.onEntry) { });
opt.onEntry({
path: join(dir, file),
size: stats.size,
});
}
output2.push(
new FileModel({
name: file,
path: dir,
repoId: repoId,
size: stats.size,
sha: stats.ino.toString(),
})
);
}
} catch (error) {
span.recordException(error as Error);
} }
output2.push(
new FileModel({
name: file,
path: dir,
repoId: repoId,
size: stats.size,
sha: stats.ino.toString(),
})
);
} }
span.end(); } catch (error) {
return output2; // ignore stat errors for individual files
}); }
}
return output2;
} }
/** @override */ /** @override */
+171 -233
View File
@@ -12,7 +12,6 @@ import ArchiveStreamToS3 from "decompress-stream-to-s3";
import { Response } from "express"; import { Response } from "express";
import { lookup } from "mime-types"; import { lookup } from "mime-types";
import * as archiver from "archiver"; import * as archiver from "archiver";
import { trace } from "@opentelemetry/api";
import { dirname, basename, join } from "path"; import { dirname, basename, join } from "path";
import AnonymousError from "../AnonymousError"; import AnonymousError from "../AnonymousError";
import StorageBase, { FILE_TYPE } from "./Storage"; import StorageBase, { FILE_TYPE } from "./Storage";
@@ -51,27 +50,21 @@ export default class S3Storage extends StorageBase {
/** @override */ /** @override */
async exists(repoId: string, path: string = ""): Promise<FILE_TYPE> { async exists(repoId: string, path: string = ""): Promise<FILE_TYPE> {
const span = trace.getTracer("ano-file").startSpan("s3.exists"); if (!config.S3_BUCKET) throw new Error("S3_BUCKET not set");
span.setAttribute("path", path);
try { try {
if (!config.S3_BUCKET) throw new Error("S3_BUCKET not set"); // if we can get the file info, it is a file
try { await this.fileInfo(repoId, path);
// if we can get the file info, it is a file return FILE_TYPE.FILE;
await this.fileInfo(repoId, path); } catch (err) {
return FILE_TYPE.FILE; // check if it is a directory
} catch (err) { const data = await this.client().listObjectsV2({
// check if it is a directory Bucket: config.S3_BUCKET,
const data = await this.client().listObjectsV2({ Prefix: join(this.repoPath(repoId), path),
Bucket: config.S3_BUCKET, MaxKeys: 1,
Prefix: join(this.repoPath(repoId), path), });
MaxKeys: 1, return (data.Contents?.length || 0) > 0
}); ? FILE_TYPE.FOLDER
return (data.Contents?.length || 0) > 0 : FILE_TYPE.NOT_FOUND;
? FILE_TYPE.FOLDER
: FILE_TYPE.NOT_FOUND;
}
} finally {
span.end();
} }
} }
@@ -82,126 +75,97 @@ export default class S3Storage extends StorageBase {
/** @override */ /** @override */
async rm(repoId: string, dir: string = ""): Promise<void> { async rm(repoId: string, dir: string = ""): Promise<void> {
const span = trace.getTracer("ano-file").startSpan("s3.rm"); if (!config.S3_BUCKET) throw new Error("S3_BUCKET not set");
span.setAttribute("repoId", repoId); const data = await this.client(200000).listObjectsV2({
span.setAttribute("path", dir); Bucket: config.S3_BUCKET,
try { Prefix: join(this.repoPath(repoId), dir),
if (!config.S3_BUCKET) throw new Error("S3_BUCKET not set"); MaxKeys: 100,
const data = await this.client(200000).listObjectsV2({ });
Bucket: config.S3_BUCKET,
Prefix: join(this.repoPath(repoId), dir),
MaxKeys: 100,
});
const params = { const params = {
Bucket: config.S3_BUCKET, Bucket: config.S3_BUCKET,
Delete: { Objects: new Array<{ Key: string }>() }, Delete: { Objects: new Array<{ Key: string }>() },
}; };
data.Contents?.forEach(function (content) { data.Contents?.forEach(function (content) {
if (content.Key) { if (content.Key) {
params.Delete.Objects.push({ Key: content.Key }); params.Delete.Objects.push({ Key: content.Key });
}
});
if (params.Delete.Objects.length == 0) {
// nothing to remove
return;
} }
await this.client(200000).deleteObjects(params); });
if (data.IsTruncated) { if (params.Delete.Objects.length == 0) {
await this.rm(repoId, dir); // nothing to remove
} return;
} finally { }
span.end(); await this.client(200000).deleteObjects(params);
if (data.IsTruncated) {
await this.rm(repoId, dir);
} }
} }
/** @override */ /** @override */
async send(repoId: string, path: string, res: Response) { async send(repoId: string, path: string, res: Response) {
const span = trace.getTracer("ano-file").startSpan("s3.send"); if (!config.S3_BUCKET) throw new Error("S3_BUCKET not set");
span.setAttribute("repoId", repoId);
span.setAttribute("path", path);
try { try {
if (!config.S3_BUCKET) throw new Error("S3_BUCKET not set");
try {
const command = new GetObjectCommand({
Bucket: config.S3_BUCKET,
Key: join(this.repoPath(repoId), path),
});
const s = await this.client().send(command);
res.status(200);
if (s.ContentType) {
res.contentType(s.ContentType);
}
if (s.ContentLength) {
res.set("Content-Length", s.ContentLength.toString());
}
if (s.Body) {
(s.Body as Readable)?.pipe(res);
} else {
res.end();
}
} catch (error) {
span.recordException(error as Error);
try {
res.status(500);
} catch (err) {
console.error(`[ERROR] S3 send ${path}`, err);
}
}
} finally {
span.end();
}
}
async fileInfo(repoId: string, path: string) {
const span = trace.getTracer("ano-file").startSpan("s3.fileInfo");
span.setAttribute("repoId", repoId);
span.setAttribute("path", path);
try {
if (!config.S3_BUCKET) throw new Error("S3_BUCKET not set");
const info = await this.client(3000).headObject({
Bucket: config.S3_BUCKET,
Key: join(this.repoPath(repoId), path),
});
return {
size: info.ContentLength,
lastModified: info.LastModified,
contentType: info.ContentType
? info.ContentType
: (lookup(path) as string),
};
} finally {
span.end();
}
}
/** @override */
async read(repoId: string, path: string): Promise<Readable> {
const span = trace.getTracer("ano-file").startSpan("s3.rreadm");
span.setAttribute("repoId", repoId);
span.setAttribute("path", path);
try {
if (!config.S3_BUCKET) throw new Error("S3_BUCKET not set");
const command = new GetObjectCommand({ const command = new GetObjectCommand({
Bucket: config.S3_BUCKET, Bucket: config.S3_BUCKET,
Key: join(this.repoPath(repoId), path), Key: join(this.repoPath(repoId), path),
}); });
const res = (await this.client(3000).send(command)).Body; const s = await this.client().send(command);
if (!res) { res.status(200);
throw new AnonymousError("file_not_found", { if (s.ContentType) {
httpStatus: 404, res.contentType(s.ContentType);
object: join(this.repoPath(repoId), path), }
}); if (s.ContentLength) {
res.set("Content-Length", s.ContentLength.toString());
}
if (s.Body) {
(s.Body as Readable)?.pipe(res);
} else {
res.end();
}
} catch (error) {
try {
res.status(500);
} catch (err) {
console.error(`[ERROR] S3 send ${path}`, err);
} }
return res as Readable;
} finally {
span.end();
} }
} }
async fileInfo(repoId: string, path: string) {
if (!config.S3_BUCKET) throw new Error("S3_BUCKET not set");
const info = await this.client(3000).headObject({
Bucket: config.S3_BUCKET,
Key: join(this.repoPath(repoId), path),
});
return {
size: info.ContentLength,
lastModified: info.LastModified,
contentType: info.ContentType
? info.ContentType
: (lookup(path) as string),
};
}
/** @override */
async read(repoId: string, path: string): Promise<Readable> {
if (!config.S3_BUCKET) throw new Error("S3_BUCKET not set");
const command = new GetObjectCommand({
Bucket: config.S3_BUCKET,
Key: join(this.repoPath(repoId), path),
});
const res = (await this.client(3000).send(command)).Body;
if (!res) {
throw new AnonymousError("file_not_found", {
httpStatus: 404,
object: join(this.repoPath(repoId), path),
});
}
return res as Readable;
}
/** @override */ /** @override */
async write( async write(
repoId: string, repoId: string,
@@ -209,80 +173,66 @@ export default class S3Storage extends StorageBase {
data: string | Readable, data: string | Readable,
source?: string source?: string
): Promise<void> { ): Promise<void> {
const span = trace.getTracer("ano-file").startSpan("s3.rm"); if (!config.S3_BUCKET) throw new Error("S3_BUCKET not set");
span.setAttribute("repoId", repoId);
span.setAttribute("path", path);
try {
if (!config.S3_BUCKET) throw new Error("S3_BUCKET not set");
if (data instanceof Readable) { if (data instanceof Readable) {
data.on("error", (err) => { data.on("error", (err) => {
console.error(`[ERROR] S3 write ${path}`, err); console.error(`[ERROR] S3 write ${path}`, err);
span.recordException(err as Error); this.rm(repoId, path);
this.rm(repoId, path);
});
}
const params: PutObjectCommandInput = {
Bucket: config.S3_BUCKET,
Key: join(this.repoPath(repoId), path),
Body: data,
ContentType: lookup(path).toString(),
};
if (source) {
params.Tagging = `source=${source}`;
}
const parallelUploads3 = new Upload({
// 30s timeout
client: this.client(30000),
params,
}); });
await parallelUploads3.done();
} finally {
span.end();
} }
const params: PutObjectCommandInput = {
Bucket: config.S3_BUCKET,
Key: join(this.repoPath(repoId), path),
Body: data,
ContentType: lookup(path).toString(),
};
if (source) {
params.Tagging = `source=${source}`;
}
const parallelUploads3 = new Upload({
// 30s timeout
client: this.client(30000),
params,
});
await parallelUploads3.done();
} }
/** @override */ /** @override */
async listFiles(repoId: string, dir: string = ""): Promise<IFile[]> { async listFiles(repoId: string, dir: string = ""): Promise<IFile[]> {
const span = trace.getTracer("ano-file").startSpan("s3.listFiles"); if (!config.S3_BUCKET) throw new Error("S3_BUCKET not set");
span.setAttribute("path", dir); if (dir && dir[dir.length - 1] != "/") dir = dir + "/";
try { const out: IFile[] = [];
if (!config.S3_BUCKET) throw new Error("S3_BUCKET not set"); let req: ListObjectsV2CommandOutput;
if (dir && dir[dir.length - 1] != "/") dir = dir + "/"; let nextContinuationToken: string | undefined;
const out: IFile[] = []; do {
let req: ListObjectsV2CommandOutput; req = await this.client(30000).listObjectsV2({
let nextContinuationToken: string | undefined; Bucket: config.S3_BUCKET,
do { Prefix: join(this.repoPath(repoId), dir),
req = await this.client(30000).listObjectsV2({ MaxKeys: 250,
Bucket: config.S3_BUCKET, ContinuationToken: nextContinuationToken,
Prefix: join(this.repoPath(repoId), dir), });
MaxKeys: 250, if (!req.Contents) return out;
ContinuationToken: nextContinuationToken, nextContinuationToken = req.NextContinuationToken;
});
if (!req.Contents) return out;
nextContinuationToken = req.NextContinuationToken;
for (const f of req.Contents) { for (const f of req.Contents) {
if (!f.Key) continue; if (!f.Key) continue;
f.Key = f.Key.replace(join(this.repoPath(repoId), dir), ""); f.Key = f.Key.replace(join(this.repoPath(repoId), dir), "");
out.push( out.push(
new FileModel({ new FileModel({
name: basename(f.Key), name: basename(f.Key),
path: dirname(f.Key), path: dirname(f.Key),
repoId, repoId,
size: f.Size, size: f.Size,
sha: f.ETag, sha: f.ETag,
}) })
); );
} }
} while (req && req.Contents && req.IsTruncated); } while (req && req.Contents && req.IsTruncated);
return out; return out;
} finally {
span.end();
}
} }
/** @override */ /** @override */
@@ -293,8 +243,6 @@ export default class S3Storage extends StorageBase {
source?: string source?: string
): Promise<void> { ): Promise<void> {
let toS3: ArchiveStreamToS3; let toS3: ArchiveStreamToS3;
const span = trace.getTracer("ano-file").startSpan("s3.extractZip");
span.setAttribute("path", path);
return new Promise((resolve, reject) => { return new Promise((resolve, reject) => {
if (!config.S3_BUCKET) return reject("S3_BUCKET not set"); if (!config.S3_BUCKET) return reject("S3_BUCKET not set");
toS3 = new ArchiveStreamToS3({ toS3 = new ArchiveStreamToS3({
@@ -315,14 +263,11 @@ export default class S3Storage extends StorageBase {
}); });
pipeline(data, toS3, (err) => { pipeline(data, toS3, (err) => {
if (err) { if (err) {
span.recordException(err as Error);
return reject(err); return reject(err);
} }
span.end();
resolve(); resolve();
}) })
.on("finish", () => { .on("finish", () => {
span.end();
resolve(); resolve();
}) })
.on("error", reject); .on("error", reject);
@@ -338,48 +283,41 @@ export default class S3Storage extends StorageBase {
fileTransformer?: (p: string) => Transform; fileTransformer?: (p: string) => Transform;
} }
) { ) {
const span = trace.getTracer("ano-file").startSpan("s3.archive"); if (!config.S3_BUCKET) throw new Error("S3_BUCKET not set");
span.setAttribute("repoId", repoId); const archive = archiver(opt?.format || "zip", {});
span.setAttribute("path", dir); if (dir && dir[dir.length - 1] != "/") dir = dir + "/";
try {
if (!config.S3_BUCKET) throw new Error("S3_BUCKET not set");
const archive = archiver(opt?.format || "zip", {});
if (dir && dir[dir.length - 1] != "/") dir = dir + "/";
let req: ListObjectsV2CommandOutput; let req: ListObjectsV2CommandOutput;
let nextContinuationToken: string | undefined; let nextContinuationToken: string | undefined;
do { do {
req = await this.client(30000).listObjectsV2({ req = await this.client(30000).listObjectsV2({
Bucket: config.S3_BUCKET, Bucket: config.S3_BUCKET,
Prefix: join(this.repoPath(repoId), dir), Prefix: join(this.repoPath(repoId), dir),
MaxKeys: 250, MaxKeys: 250,
ContinuationToken: nextContinuationToken, ContinuationToken: nextContinuationToken,
}); });
nextContinuationToken = req.NextContinuationToken; nextContinuationToken = req.NextContinuationToken;
for (const f of req.Contents || []) { for (const f of req.Contents || []) {
if (!f.Key) continue; if (!f.Key) continue;
const filename = basename(f.Key); const filename = basename(f.Key);
const prefix = dirname( const prefix = dirname(
f.Key.replace(join(this.repoPath(repoId), dir), "") f.Key.replace(join(this.repoPath(repoId), dir), "")
); );
let rs = await this.read(repoId, f.Key); let rs = await this.read(repoId, f.Key);
if (opt?.fileTransformer) { if (opt?.fileTransformer) {
// apply transformation on the stream // apply transformation on the stream
rs = rs.pipe(opt.fileTransformer(f.Key)); rs = rs.pipe(opt.fileTransformer(f.Key));
}
archive.append(rs, {
name: filename,
prefix,
});
} }
} while (req && req.Contents?.length && req.IsTruncated);
archive.finalize(); archive.append(rs, {
return archive; name: filename,
} finally { prefix,
span.end(); });
} }
} while (req && req.Contents?.length && req.IsTruncated);
archive.finalize();
return archive;
} }
} }
@@ -1,4 +1,3 @@
import { Exception, trace } from "@opentelemetry/api";
import { SandboxedJob } from "bullmq"; import { SandboxedJob } from "bullmq";
import { config } from "dotenv"; import { config } from "dotenv";
config(); config();
@@ -14,8 +13,6 @@ export default async function (job: SandboxedJob<Repository, void>) {
connect: () => Promise<void>; connect: () => Promise<void>;
getRepository: typeof getRepositoryImport; getRepository: typeof getRepositoryImport;
} = require("../../server/database"); } = require("../../server/database");
const span = trace.getTracer("ano-file").startSpan("proc.downloadRepository");
span.setAttribute("repoId", job.data.repoId);
console.log(`[QUEUE] ${job.data.repoId} is going to be downloaded`); console.log(`[QUEUE] ${job.data.repoId} is going to be downloaded`);
let statusInterval: any = null; let statusInterval: any = null;
await connect(); await connect();
@@ -58,17 +55,14 @@ export default async function (job: SandboxedJob<Repository, void>) {
} catch (error) { } catch (error) {
updateProgress({ status: "error" }); updateProgress({ status: "error" });
if (error instanceof Error) { if (error instanceof Error) {
span.recordException(error as Exception);
await repo.updateStatus(RepositoryStatus.ERROR, error.message); await repo.updateStatus(RepositoryStatus.ERROR, error.message);
} else if (typeof error === "string") { } else if (typeof error === "string") {
await repo.updateStatus(RepositoryStatus.ERROR, error); await repo.updateStatus(RepositoryStatus.ERROR, error);
span.recordException(error);
} }
throw error; throw error;
} }
} catch (error: any) { } catch (error: any) {
clearInterval(statusInterval); clearInterval(statusInterval);
span.recordException(error as Exception);
console.log(`[QUEUE] ${job.data.repoId} is finished with an error`, error); console.log(`[QUEUE] ${job.data.repoId} is finished with an error`, error);
setTimeout(async () => { setTimeout(async () => {
// delay to avoid double saving // delay to avoid double saving
@@ -78,6 +72,5 @@ export default async function (job: SandboxedJob<Repository, void>) {
}, 400); }, 400);
} finally { } finally {
clearInterval(statusInterval); clearInterval(statusInterval);
span.end();
} }
} }
+2 -11
View File
@@ -1,4 +1,3 @@
import { Exception, trace } from "@opentelemetry/api";
import { SandboxedJob } from "bullmq"; import { SandboxedJob } from "bullmq";
import Repository from "../../core/Repository"; import Repository from "../../core/Repository";
import { getRepository as getRepositoryImport } from "../../server/database"; import { getRepository as getRepositoryImport } from "../../server/database";
@@ -11,24 +10,16 @@ export default async function (job: SandboxedJob<Repository, void>) {
connect: () => Promise<void>; connect: () => Promise<void>;
getRepository: typeof getRepositoryImport; getRepository: typeof getRepositoryImport;
} = require("../../server/database"); } = require("../../server/database");
const span = trace.getTracer("ano-file").startSpan("proc.removeCache");
span.setAttribute("repoId", job.data.repoId);
try { try {
await connect(); await connect();
console.log( console.log(
`[QUEUE] Cache of ${job.data.repoId} is going to be removed...` `[QUEUE] Cache of ${job.data.repoId} is going to be removed...`
); );
const repo = await getRepository(job.data.repoId); const repo = await getRepository(job.data.repoId);
try { await repo.removeCache();
await repo.removeCache();
} catch (error) {
span.recordException(error as Exception);
throw error;
}
} catch (error) { } catch (error) {
span.recordException(error as Exception); // error already handled
} finally { } finally {
console.log(`[QUEUE] Cache of ${job.data.repoId} is removed.`); console.log(`[QUEUE] Cache of ${job.data.repoId} is removed.`);
span.end();
} }
} }
+1 -6
View File
@@ -1,4 +1,3 @@
import { trace } from "@opentelemetry/api";
import { SandboxedJob } from "bullmq"; import { SandboxedJob } from "bullmq";
import Repository from "../../core/Repository"; import Repository from "../../core/Repository";
import { getRepository as getRepositoryImport } from "../../server/database"; import { getRepository as getRepositoryImport } from "../../server/database";
@@ -12,8 +11,6 @@ export default async function (job: SandboxedJob<Repository, void>) {
connect: () => Promise<void>; connect: () => Promise<void>;
getRepository: typeof getRepositoryImport; getRepository: typeof getRepositoryImport;
} = require("../../server/database"); } = require("../../server/database");
const span = trace.getTracer("ano-file").startSpan("proc.removeRepository");
span.setAttribute("repoId", job.data.repoId);
try { try {
await connect(); await connect();
console.log(`[QUEUE] ${job.data.repoId} is going to be removed`); console.log(`[QUEUE] ${job.data.repoId} is going to be removed`);
@@ -27,13 +24,11 @@ export default async function (job: SandboxedJob<Repository, void>) {
} else if (typeof error === "string") { } else if (typeof error === "string") {
await repo.updateStatus(RepositoryStatus.ERROR, error); await repo.updateStatus(RepositoryStatus.ERROR, error);
} }
span.recordException(error as Error);
throw error; throw error;
} }
} catch (error) { } catch (error) {
span.recordException(error as Error); // error already handled
} finally { } finally {
console.log(`[QUEUE] ${job.data.repoId} is removed`); console.log(`[QUEUE] ${job.data.repoId} is removed`);
span.end();
} }
} }