feat: add OpenTelemetry support

This commit is contained in:
tdurieux
2024-03-27 11:17:56 +00:00
parent 803720e2ea
commit 0caf786c9c
24 changed files with 4522 additions and 1187 deletions

View File

@@ -11,7 +11,6 @@ COPY package.json .
COPY package-lock.json .
COPY tsconfig.json .
COPY ecosystem.config.js .
COPY healthcheck.js .
COPY src ./src
@@ -20,6 +19,6 @@ COPY index.ts .
COPY config.ts .
RUN npm install && npm run build && npm cache clean --force
COPY opentelemetry.js .
CMD [ "pm2-runtime", "ecosystem.config.js"]
CMD [ "node", "--require", "./opentelemetry.js", "./build/index.js"]

View File

@@ -7,7 +7,9 @@ services:
image: tdurieux/anonymous_github:v2
env_file:
- ./.env
environment:
volumes:
- ./repositories:/app/build/repositories/
environment:
- REDIS_HOSTNAME=redis
- DB_HOSTNAME=mongodb
ports:
@@ -23,6 +25,7 @@ services:
links:
- mongodb
- redis
- opentelemetry
redis:
image: "redis:alpine"
@@ -35,7 +38,7 @@ services:
interval: 10s
timeout: 10s
retries: 5
mongodb:
image: mongo:latest
restart: on-failure
@@ -44,6 +47,8 @@ services:
MONGO_INITDB_ROOT_PASSWORD: $DB_PASSWORD
volumes:
- mongodb_data_container:/data/db
ports:
- 127.0.0.1:27017:27017
command: --quiet
healthcheck:
test:
@@ -55,6 +60,30 @@ services:
timeout: 10s
retries: 5
opentelemetry:
image: otel/opentelemetry-collector
restart: always
command: ["--config=/etc/otel-collector-config.yaml"]
volumes:
- ./opentelemetry-collector.yml:/etc/otel-collector-config.yaml
depends_on:
- jaeger
- prometheus
jaeger:
image: jaegertracing/all-in-one:latest
restart: always
ports:
- 127.0.0.1:9411:9411
prometheus:
image: prom/prometheus:latest
restart: always
volumes:
- ./prometheus.yaml:/etc/prometheus/prometheus.yml
ports:
- 127.0.0.1:9090:9090
mongodb-backup:
image: tiredofit/db-backup
links:

View File

@@ -1,20 +0,0 @@
module.exports = {
apps: [
{
name: "AnonymousGitHub",
script: "build/index.js",
exec_mode: "fork",
watch: false,
ignore_watch: [
"node_modules",
"repositories",
"repo",
"public",
".git",
"db_backups",
"build",
],
interpreter: "node",
},
],
};

153
import.js
View File

@@ -1,153 +0,0 @@
const fs = require("fs").promises;
const ofs = require("fs");
const path = require("path");
const gh = require("parse-github-url");
const { Octokit } = require("@octokit/rest");
const config = require("./config");
const db = require("./utils/database");
const repoUtils = require("./utils/repository");
const fileUtils = require("./utils/file");
const githubUtils = require("./utils/github");
// const ROOT = "./repositories";
const ROOT = "./repo";
(async () => {
await db.connect();
const repositories = await fs.readdir(ROOT);
let index = 0;
for (let repo of repositories) {
// for (let repo of ["14bfc5c6-b794-487e-a58a-c54103a93c7b"]) {
console.log("Import ", index++, "/", repositories.length, " ", repo);
try {
const conf = await repoUtils.getConfig(repo);
if (conf) {
continue;
}
// const repoPath = path.join("./repositories", repo);
const repoPath = path.join(ROOT, repo);
const configPath = path.join(repoPath, "config.json");
if (!ofs.existsSync(configPath)) {
continue;
}
const repoConfig = JSON.parse(await fs.readFile(configPath));
const r = gh(repoConfig.repository);
if (r == null) {
console.log(`${repoConfig.repository} is not a valid github url.`);
continue;
}
const fullName = `${r.owner}/${r.name}`;
// const octokit = new Octokit({ auth: config.GITHUB_TOKEN });
// try {
// await octokit.apps.checkToken({
// client_id: config.CLIENT_ID,
// access_token: repoConfig.token,
// });
// } catch (error) {
// delete repoConfig.token;
// continue
// }
let token = repoConfig.token;
if (!token) {
token = config.GITHUB_TOKEN;
}
const branches = await repoUtils.getRepoBranches({
fullName,
token,
});
const details = await repoUtils.getRepoDetails({
fullName,
token,
});
let branch = details.default_branch;
if (r.branch && branches[r.branch]) {
branch = r.branch;
}
if (!branches[branch]) {
console.log(branch, details.default_branch, branches);
}
let commit = branches[branch].commit.sha;
const anonymizeDate = new Date();
let mode = "stream";
// if (details.size < 1024) {
// mode = "download";
// }
let expirationDate = null;
if (repoConfig.expiration_date) {
expirationDate = new Date(repoConfig.expiration_date["$date"]);
}
const expirationMode = repoConfig.expiration
? repoConfig.expiration
: "never";
const repoConfiguration = {
repoId: repo,
fullName,
// owner: "tdurieux",
owner: r.owner,
terms: repoConfig.terms,
repository: repoConfig.repository,
token: repoConfig.token,
branch,
commit,
anonymizeDate,
options: {
image: false,
mode,
expirationMode,
expirationDate,
update: true,
page: details.has_pages,
pdf: false,
notebook: true,
loc: false,
link: true,
},
};
await db.get("anonymized_repositories").updateOne(
{
repoId: repo,
},
{
$set: repoConfiguration,
},
{ upsert: true }
);
if (ofs.existsSync(repoUtils.getOriginalPath(repo))) {
await fs.rm(repoUtils.getOriginalPath(repo), {
recursive: true,
force: true,
});
}
if (ofs.existsSync(repoUtils.getAnonymizedPath(repo))) {
await fs.rm(repoUtils.getAnonymizedPath(repo), {
recursive: true,
force: true,
});
}
// await githubUtils.downloadRepoAndAnonymize(repoConfiguration);
// await fileUtils.getFileList({ repoConfig: repoConfiguration });
await repoUtils.updateStatus(repoConfiguration, "ready");
console.log(
expirationDate,
expirationDate != null && expirationDate < new Date(),
expirationDate < new Date()
);
if (
expirationMode != "never" &&
expirationDate != null &&
expirationDate < new Date()
) {
await repoUtils.updateStatus(repoConfiguration, "expired");
}
} catch (error) {
console.error(error);
}
}
await db.close();
})();

View File

@@ -1,217 +0,0 @@
require("dotenv").config();
import * as mongoose from "mongoose";
import config from "./config";
import * as database from "./src/database/database";
import RepositoryModel from "./src/database/repositories/repositories.model";
import AnonymizedRepositoryModel from "./src/database/anonymizedRepositories/anonymizedRepositories.model";
import UserModel from "./src/database/users/users.model";
const MONGO_URL = `mongodb://${config.DB_USERNAME}:${config.DB_PASSWORD}@${config.DB_HOSTNAME}:27017/`;
async function connect(db) {
const t = new mongoose.Mongoose();
t.set("useNewUrlParser", true);
t.set("useFindAndModify", true);
t.set("useUnifiedTopology", true);
const database = t.connection;
await t.connect(MONGO_URL + db, {
authSource: "admin",
useCreateIndex: true,
useFindAndModify: true,
});
return database;
}
(async () => {
await database.connect();
const oldDB = await connect("anonymous_github");
console.log("Import Users");
let index = 0;
const userQuery = oldDB.collection("users").find();
const totalUser = await userQuery.count();
while (await userQuery.hasNext()) {
const r = await userQuery.next();
index++;
console.log(`Import User [${index}/${totalUser}]: ${r.username}`);
const newRepos = [];
const allRepoIds = [];
if (r.repositories) {
const finds = await RepositoryModel.find({
externalId: {
$in: r.repositories.map((repo) => "gh_" + repo.id),
},
}).select("externalId");
finds.forEach((f) => allRepoIds.push(f.id));
const repoIds = new Set<string>();
const toInsert = r.repositories.filter((f) => {
if (repoIds.has(f.id)) return false;
repoIds.add(f.id);
const externalId = "gh_" + f.id;
return finds.filter((f) => f.externalId == externalId).length == 0;
});
for (const repo of toInsert) {
newRepos.push(
new RepositoryModel({
externalId: "gh_" + repo.id,
name: repo.full_name,
url: repo.html_url,
size: repo.size,
defaultBranch: repo.default_branch,
})
);
}
if (newRepos.length > 0) {
await RepositoryModel.insertMany(newRepos);
}
newRepos.forEach((f) => allRepoIds.push(f.id));
}
const user = new UserModel({
accessTokens: {
github: r.accessToken,
},
externalIDs: {
github: r.profile.id,
},
username: r.username,
emails: r.profile.emails?.map((email) => {
return { email: email.value, default: false };
}),
photo: r.profile.photos[0]?.value,
repositories: allRepoIds,
default: {
terms: r.default?.terms,
options: r.default?.options,
},
});
if (user.emails?.length) user.emails[0].default = true;
await user.save();
}
console.log("Import Repositories");
const repoQuery = oldDB.collection("repositories").find();
const totalRepository = await repoQuery.count();
index = 0;
while (await repoQuery.hasNext()) {
const r = await repoQuery.next();
if (!r.id) continue;
index++;
console.log(
`Import Repository [${index}/${totalRepository}]: ${r.fullName}`
);
let find = await RepositoryModel.findOne({
externalId: "gh_" + r.id,
});
if (find == null) {
find = new RepositoryModel({
externalId: "gh_" + r.id,
name: r.fullName,
url: r.html_url,
size: r.size,
defaultBranch: r.default_branch,
});
}
if (r.branches) {
const branches = [...Object.values(r.branches)].map((b: any) => {
const o: any = { name: b.name, commit: b.commit.sha };
if (b.name == find.defaultBranch) {
o.readme = r.readme;
}
return o;
});
find.branches = branches;
}
await find.save();
}
console.log("Import Anonymized Repositories");
const anoQuery = oldDB.collection("anonymized_repositories").find();
const totalAno = await anoQuery.count();
index = 0;
while (await anoQuery.hasNext()) {
const r = await anoQuery.next();
index++;
console.log(
`Import Anonymized Repository [${index}/${totalAno}]: ${r.repoId}`
);
let repo = await RepositoryModel.findOne({ name: r.fullName });
if (repo == null) {
const tmp = await oldDB
.collection("repositories")
.findOne({ fullName: r.fullName });
if (tmp) {
repo = await RepositoryModel.findOne({ externalId: "gh_" + tmp.id });
} else {
console.error(`Repository ${r.fullName} is not found (renamed)`);
}
}
let size = { storage: 0, file: 0 };
function recursiveCount(files) {
const out = { storage: 0, file: 0 };
for (const name in files) {
const file = files[name];
if (file.size && file.sha && parseInt(file.size) == file.size) {
out.storage += file.size as number;
out.file++;
} else if (typeof file == "object") {
const r = recursiveCount(file);
out.storage += r.storage;
out.file += r.file;
}
}
return out;
}
if (r.originalFiles) {
size = recursiveCount(r.originalFiles);
}
const owner = await UserModel.findOne({ username: r.owner }).select("_id");
await new AnonymizedRepositoryModel({
repoId: r.repoId,
status: r.status,
anonymizeDate: r.anonymizeDate,
lastView: r.lastView,
pageView: r.pageView,
owner: owner?.id,
size,
source: {
accessToken: r.token,
type: r.options.mode == "download" ? "GitHubDownload" : "GitHubStream",
branch: r.branch,
commit: r.commit,
repositoryId: repo?.id,
repositoryName: r.fullName,
},
options: {
terms: r.terms,
expirationMode: r.options.expirationMode,
expirationDate: r.options.expirationDate
? new Date(r.options.expirationDate)
: null,
update: r.options.update,
image: r.options.image,
pdf: r.options.pdf,
notebook: r.options.notebook,
loc: r.options.loc,
link: r.options.link,
page: r.options.page,
pageSource: r.options.pageSource,
},
}).save();
}
console.log("Import finished!");
setTimeout(() => process.exit(), 5000);
})();

View File

@@ -0,0 +1,40 @@
receivers:
  otlp:
    protocols:
      grpc:

exporters:
  prometheus:
    endpoint: "0.0.0.0:8889"
    const_labels:
      label1: value1
  debug:
  otlp:
    endpoint: jaeger:4317
    tls:
      insecure: true

processors:
  batch:
  # NOTE(review): the batch processor is declared but not referenced by any
  # pipeline below — add `processors: [batch]` to the pipelines or remove it.

extensions:
  health_check:
  pprof:
    endpoint: :1888
  zpages:
    endpoint: :55679

service:
  extensions: [health_check, pprof, zpages]
  pipelines:
    traces:
      receivers: [otlp]
      exporters: [debug, otlp]
    metrics:
      receivers: [otlp]
      exporters: [debug, prometheus]
    logs:
      receivers: [otlp]
      exporters: [debug]

29
opentelemetry.js Normal file
View File

@@ -0,0 +1,29 @@
const opentelemetry = require("@opentelemetry/sdk-node");
const {
getNodeAutoInstrumentations,
} = require("@opentelemetry/auto-instrumentations-node");
const {
OTLPTraceExporter,
} = require("@opentelemetry/exporter-trace-otlp-grpc");
const {
OTLPMetricExporter,
} = require("@opentelemetry/exporter-metrics-otlp-grpc");
const { PeriodicExportingMetricReader } = require("@opentelemetry/sdk-metrics");
const { diag, DiagConsoleLogger, DiagLogLevel } = require("@opentelemetry/api");
// diag.setLogger(new DiagConsoleLogger(), DiagLogLevel.INFO);
const sdk = new opentelemetry.NodeSDK({
serviceName: "Anonymous-GitHub",
logRecordProcessor: getNodeAutoInstrumentations().logRecordProcessor,
traceExporter: new OTLPTraceExporter({
url: "http://opentelemetry:4317/v1/traces",
}),
metricReader: new PeriodicExportingMetricReader({
exporter: new OTLPMetricExporter({
url: "http://opentelemetry:4317/v1/metrics",
}),
}),
instrumentations: [getNodeAutoInstrumentations()],
});
sdk.start();

3366
package-lock.json generated

File diff suppressed because it is too large Load Diff

View File

@@ -35,6 +35,15 @@
"@octokit/oauth-app": "^6.0.0",
"@octokit/plugin-paginate-rest": "^8.0.0",
"@octokit/rest": "^20.0.1",
"@opentelemetry/api": "^1.8.0",
"@opentelemetry/auto-instrumentations-node": "^0.43.0",
"@opentelemetry/exporter-metrics-otlp-grpc": "^0.49.1",
"@opentelemetry/exporter-metrics-otlp-proto": "^0.49.1",
"@opentelemetry/exporter-trace-otlp-grpc": "^0.49.1",
"@opentelemetry/exporter-trace-otlp-proto": "^0.49.1",
"@opentelemetry/sdk-metrics": "^1.22.0",
"@opentelemetry/sdk-node": "^0.49.1",
"@opentelemetry/sdk-trace-node": "^1.22.0",
"@pm2/io": "^5.0.0",
"archiver": "^5.3.1",
"bullmq": "^2.3.2",

6
prometheus.yaml Normal file
View File

@@ -0,0 +1,6 @@
scrape_configs:
  # Scrape the OpenTelemetry collector: 8889 is the Prometheus exporter
  # endpoint (application metrics), 8888 is the collector's own telemetry.
  - job_name: 'otel-collector'
    scrape_interval: 10s
    static_configs:
      - targets: ['opentelemetry:8889']
      - targets: ['opentelemetry:8888']

View File

@@ -1,6 +1,7 @@
import { join, basename } from "path";
import { Response } from "express";
import { Readable } from "stream";
import { trace } from "@opentelemetry/api";
import Repository from "./Repository";
import { FILE_TYPE, Tree, TreeElement, TreeFile } from "./types";
import storage from "./storage";
@@ -13,6 +14,7 @@ import {
import AnonymousError from "./AnonymousError";
import { handleError } from "./routes/route-utils";
import { lookup } from "mime-types";
import AnonymizedRepositoryModel from "./database/anonymizedRepositories/anonymizedRepositories.model";
/**
* Represent a file in a anonymized repository
@@ -36,9 +38,16 @@ export default class AnonymizedFile {
}
async sha() {
if (this._sha) return this._sha.replace(/"/g, "");
await this.originalPath();
return this._sha?.replace(/"/g, "");
return trace.getTracer("ano-file").startActiveSpan("sha", async (span) => {
try {
span.setAttribute("anonymizedPath", this.anonymizedPath);
if (this._sha) return this._sha.replace(/"/g, "");
await this.originalPath();
return this._sha?.replace(/"/g, "");
} finally {
span.end();
}
});
}
/**
@@ -47,89 +56,102 @@ export default class AnonymizedFile {
* @returns the origin relative path of the file
*/
async originalPath(): Promise<string> {
if (this._originalPath) return this._originalPath;
if (!this.anonymizedPath)
throw new AnonymousError("path_not_specified", {
object: this,
httpStatus: 400,
});
const paths = this.anonymizedPath.trim().split("/");
let currentOriginal = (await this.repository.files({
force: false,
})) as TreeElement;
let currentOriginalPath = "";
for (let i = 0; i < paths.length; i++) {
const fileName = paths[i];
if (fileName == "") {
continue;
}
if (!(currentOriginal as Tree)[fileName]) {
// anonymize all the file in the folder and check if there is one that match the current filename
const options = [];
for (let originalFileName in currentOriginal) {
if (
anonymizePath(originalFileName, this.repository.options.terms) ==
fileName
) {
options.push(originalFileName);
return trace
.getTracer("ano-file")
.startActiveSpan("originalPath", async (span) => {
try {
span.setAttribute("anonymizedPath", this.anonymizedPath);
if (this._originalPath) return this._originalPath;
if (!this.anonymizedPath) {
throw new AnonymousError("path_not_specified", {
object: this,
httpStatus: 400,
});
}
}
// if only one option we found the original filename
if (options.length == 1) {
currentOriginalPath = join(currentOriginalPath, options[0]);
currentOriginal = (currentOriginal as Tree)[options[0]];
} else if (options.length == 0) {
throw new AnonymousError("file_not_found", {
object: this,
httpStatus: 404,
});
} else {
const nextName = paths[i + 1];
if (!nextName) {
// if there is no next name we can't find the file and we return the first option
currentOriginalPath = join(currentOriginalPath, options[0]);
currentOriginal = (currentOriginal as Tree)[options[0]];
}
let found = false;
for (const option of options) {
const optionTree = (currentOriginal as Tree)[option];
if ((optionTree as Tree).child) {
const optionTreeChild = (optionTree as Tree).child;
if ((optionTreeChild as Tree)[nextName]) {
currentOriginalPath = join(currentOriginalPath, option);
currentOriginal = optionTreeChild;
found = true;
break;
let currentOriginal = (await this.repository.files({
force: false,
})) as TreeElement;
const paths = this.anonymizedPath.trim().split("/");
let currentOriginalPath = "";
for (let i = 0; i < paths.length; i++) {
const fileName = paths[i];
if (fileName == "") {
continue;
}
if (!(currentOriginal as Tree)[fileName]) {
// anonymize all the file in the folder and check if there is one that match the current filename
const options = [];
for (let originalFileName in currentOriginal) {
if (
anonymizePath(
originalFileName,
this.repository.options.terms
) == fileName
) {
options.push(originalFileName);
}
}
// if only one option we found the original filename
if (options.length == 1) {
currentOriginalPath = join(currentOriginalPath, options[0]);
currentOriginal = (currentOriginal as Tree)[options[0]];
} else if (options.length == 0) {
throw new AnonymousError("file_not_found", {
object: this,
httpStatus: 404,
});
} else {
const nextName = paths[i + 1];
if (!nextName) {
// if there is no next name we can't find the file and we return the first option
currentOriginalPath = join(currentOriginalPath, options[0]);
currentOriginal = (currentOriginal as Tree)[options[0]];
}
let found = false;
for (const option of options) {
const optionTree = (currentOriginal as Tree)[option];
if ((optionTree as Tree).child) {
const optionTreeChild = (optionTree as Tree).child;
if ((optionTreeChild as Tree)[nextName]) {
currentOriginalPath = join(currentOriginalPath, option);
currentOriginal = optionTreeChild;
found = true;
break;
}
}
}
if (!found) {
// if we didn't find the next name we return the first option
currentOriginalPath = join(currentOriginalPath, options[0]);
currentOriginal = (currentOriginal as Tree)[options[0]];
}
}
} else {
currentOriginalPath = join(currentOriginalPath, fileName);
currentOriginal = (currentOriginal as Tree)[fileName];
}
}
if (!found) {
// if we didn't find the next name we return the first option
currentOriginalPath = join(currentOriginalPath, options[0]);
currentOriginal = (currentOriginal as Tree)[options[0]];
if (
currentOriginal.sha === undefined ||
currentOriginal.size === undefined
) {
throw new AnonymousError("folder_not_supported", { object: this });
}
const file = currentOriginal as TreeFile;
this.fileSize = file.size;
this._sha = file.sha;
this._originalPath = currentOriginalPath;
return this._originalPath;
} finally {
span.end();
}
} else {
currentOriginalPath = join(currentOriginalPath, fileName);
currentOriginal = (currentOriginal as Tree)[fileName];
}
}
if (
currentOriginal.sha === undefined ||
currentOriginal.size === undefined
) {
throw new AnonymousError("folder_not_supported", { object: this });
}
const file = currentOriginal as TreeFile;
this.fileSize = file.size;
this._sha = file.sha;
this._originalPath = currentOriginalPath;
return this._originalPath;
});
}
extension() {
const filename = basename(this.anonymizedPath);
@@ -154,6 +176,7 @@ export default class AnonymizedFile {
"heic",
].includes(extension);
}
isFileSupported() {
const extension = this.extension();
if (!this.repository.options.pdf && extension == "pdf") {
@@ -166,29 +189,48 @@ export default class AnonymizedFile {
}
async content(): Promise<Readable> {
if (this.anonymizedPath.includes(config.ANONYMIZATION_MASK)) {
await this.originalPath();
}
if (this.fileSize && this.fileSize > config.MAX_FILE_SIZE) {
throw new AnonymousError("file_too_big", {
object: this,
httpStatus: 403,
return trace
.getTracer("ano-file")
.startActiveSpan("content", async (span) => {
try {
span.setAttribute("anonymizedPath", this.anonymizedPath);
if (this.anonymizedPath.includes(config.ANONYMIZATION_MASK)) {
await this.originalPath();
}
span.addEvent("originalPath", { originalPath: this._originalPath });
if (this.fileSize && this.fileSize > config.MAX_FILE_SIZE) {
throw new AnonymousError("file_too_big", {
object: this,
httpStatus: 403,
});
}
const exist = await storage.exists(this.originalCachePath);
span.addEvent("file_exist", { exist });
if (exist == FILE_TYPE.FILE) {
return storage.read(this.originalCachePath);
} else if (exist == FILE_TYPE.FOLDER) {
throw new AnonymousError("folder_not_supported", {
object: this,
httpStatus: 400,
});
}
return await this.repository.source?.getFileContent(this);
} finally {
span.end();
}
});
}
const exist = await storage.exists(this.originalCachePath);
if (exist == FILE_TYPE.FILE) {
return storage.read(this.originalCachePath);
} else if (exist == FILE_TYPE.FOLDER) {
throw new AnonymousError("folder_not_supported", {
object: this,
httpStatus: 400,
});
}
return await this.repository.source?.getFileContent(this);
}
async anonymizedContent() {
return (await this.content()).pipe(new AnonymizeTransformer(this));
return trace
.getTracer("ano-file")
.startActiveSpan("anonymizedContent", async (span) => {
span.setAttribute("anonymizedPath", this.anonymizedPath);
const content = await this.content();
return content.pipe(new AnonymizeTransformer(this)).on("close", () => {
span.end();
});
});
}
get originalCachePath() {
@@ -212,55 +254,60 @@ export default class AnonymizedFile {
}
async send(res: Response): Promise<void> {
return new Promise(async (resolve, reject) => {
try {
const content = await this.content();
const mime = lookup(this.anonymizedPath);
if (mime && this.extension() != "ts") {
res.contentType(mime);
} else if (isTextFile(this.anonymizedPath)) {
res.contentType("text/plain");
}
res.header("Accept-Ranges", "none");
let fileInfo: Awaited<ReturnType<typeof storage.fileInfo>>;
return trace.getTracer("ano-file").startActiveSpan("send", async (span) => {
span.setAttribute("anonymizedPath", this.anonymizedPath);
return new Promise<void>(async (resolve, reject) => {
try {
fileInfo = await storage.fileInfo(this.originalCachePath);
} catch (error) {
// unable to get file size
console.error(error);
}
const anonymizer = new AnonymizeTransformer(this);
anonymizer.once("transform", (data) => {
if (data.isText && !mime) {
const content = await this.content();
const mime = lookup(this.anonymizedPath);
if (mime && this.extension() != "ts") {
res.contentType(mime);
} else if (isTextFile(this.anonymizedPath)) {
res.contentType("text/plain");
}
if (fileInfo?.size && !data.wasAnonimized) {
// the text files may be anonymized and therefore the size may be different
res.header("Content-Length", fileInfo.size.toString());
res.header("Accept-Ranges", "none");
let fileInfo: Awaited<ReturnType<typeof storage.fileInfo>>;
try {
fileInfo = await storage.fileInfo(this.originalCachePath);
} catch (error) {
// unable to get file size
console.error(error);
}
});
const anonymizer = new AnonymizeTransformer(this);
content
.pipe(anonymizer)
.pipe(res)
.on("close", () => {
if (!content.closed && !content.destroyed) {
content.destroy();
anonymizer.once("transform", (data) => {
if (data.isText && !mime) {
res.contentType("text/plain");
}
resolve();
})
.on("error", (error) => {
if (!content.closed && !content.destroyed) {
content.destroy();
if (fileInfo?.size && !data.wasAnonimized) {
// the text files may be anonymized and therefore the size may be different
res.header("Content-Length", fileInfo.size.toString());
}
reject(error);
handleError(error, res);
});
} catch (error) {
handleError(error, res);
}
content
.pipe(anonymizer)
.pipe(res)
.on("close", () => {
if (!content.closed && !content.destroyed) {
content.destroy();
}
span.end();
resolve();
})
.on("error", (error) => {
if (!content.closed && !content.destroyed) {
content.destroy();
}
span.recordException(error);
span.end();
reject(error);
handleError(error, res);
});
} catch (error) {
handleError(error, res);
}
});
});
}
}

View File

@@ -27,6 +27,7 @@ import AnonymizedFile from "./AnonymizedFile";
import AnonymizedRepositoryModel from "./database/anonymizedRepositories/anonymizedRepositories.model";
import { getRepositoryFromGitHub } from "./source/GitHubRepository";
import config from "../config";
import { trace } from "@opentelemetry/api";
function anonymizeTreeRecursive(
tree: TreeElement,
@@ -98,6 +99,7 @@ export default class Repository {
}
): Promise<Tree> {
const terms = this._model.options.terms || [];
if (terms.length === 0) return this.files(opt);
return anonymizeTreeRecursive(await this.files(opt), terms, opt) as Tree;
}
@@ -108,25 +110,31 @@ export default class Repository {
* @returns The file tree
*/
async files(opt: { force?: boolean } = { force: false }): Promise<Tree> {
if (!this._model.originalFiles && !opt.force) {
const res = await AnonymizedRepositoryModel.findById(this._model._id, {
originalFiles: 1,
});
if (!res) throw new AnonymousError("repository_not_found");
this.model.originalFiles = res.originalFiles;
const span = trace.getTracer("ano-file").startSpan("Repository.files");
span.setAttribute("repoId", this.repoId);
try {
if (!this._model.originalFiles && !opt.force) {
const res = await AnonymizedRepositoryModel.findById(this._model._id, {
originalFiles: 1,
});
if (!res) throw new AnonymousError("repository_not_found");
this.model.originalFiles = res.originalFiles;
}
if (
this._model.originalFiles &&
Object.getOwnPropertyNames(this._model.originalFiles).length !== 0 &&
!opt.force
) {
return this._model.originalFiles;
}
const files = await this.source.getFiles();
this._model.originalFiles = files;
this._model.size = { storage: 0, file: 0 };
await this.computeSize();
return files;
} finally {
span.end();
}
if (
this._model.originalFiles &&
Object.getOwnPropertyNames(this._model.originalFiles).length !== 0 &&
!opt.force
) {
return this._model.originalFiles;
}
const files = await this.source.getFiles();
this._model.originalFiles = files;
this._model.size = { storage: 0, file: 0 };
await this.computeSize();
return files;
}
/**
@@ -191,6 +199,10 @@ export default class Repository {
* @returns void
*/
async updateIfNeeded(opt?: { force: boolean }): Promise<void> {
const span = trace
.getTracer("ano-file")
.startSpan("Repository.updateIfNeeded");
span.setAttribute("repoId", this.repoId);
const yesterday = new Date();
yesterday.setDate(yesterday.getDate() - 1);
if (
@@ -212,6 +224,8 @@ export default class Repository {
this.status == RepositoryStatus.READY
) {
console.log(`[UPDATE] ${this._model.repoId} is up to date`);
span.setAttribute("status", "up_to_date");
span.end();
return;
}
this._model.source.commit = newCommit;
@@ -234,6 +248,8 @@ export default class Repository {
);
await this.updateStatus(RepositoryStatus.ERROR, "branch_not_found");
await this.resetSate();
span.setAttribute("status", "branch_not_found");
span.end();
throw new AnonymousError("branch_not_found", {
object: this,
});
@@ -267,6 +283,7 @@ export default class Repository {
});
}
}
span.end();
}
/**
* Download the require state for the repository to work
@@ -274,20 +291,32 @@ export default class Repository {
* @returns void
*/
async anonymize() {
if (this.status === RepositoryStatus.READY) return;
const span = trace.getTracer("ano-file").startSpan("Repository.anonymize");
span.setAttribute("repoId", this.repoId);
if (this.status === RepositoryStatus.READY) {
span.end();
return;
}
await this.updateStatus(RepositoryStatus.PREPARING);
await this.files();
return this.updateStatus(RepositoryStatus.READY);
await this.updateStatus(RepositoryStatus.READY);
span.end();
}
/**
* Update the last view and view count
*/
async countView() {
this._model.lastView = new Date();
this._model.pageView = (this._model.pageView || 0) + 1;
if (!isConnected) return this.model;
return this._model.save();
const span = trace.getTracer("ano-file").startSpan("Repository.countView");
span.setAttribute("repoId", this.repoId);
try {
this._model.lastView = new Date();
this._model.pageView = (this._model.pageView || 0) + 1;
if (!isConnected) return this.model;
return this._model.save();
} finally {
span.end();
}
}
/**
@@ -296,36 +325,54 @@ export default class Repository {
* @param errorMessage a potential error message to display
*/
async updateStatus(status: RepositoryStatus, statusMessage?: string) {
if (!status) return this.model;
this._model.status = status;
this._model.statusDate = new Date();
this._model.statusMessage = statusMessage;
if (!isConnected) return this.model;
return this._model.save();
const span = trace
.getTracer("ano-file")
.startSpan("Repository.updateStatus");
span.setAttribute("repoId", this.repoId);
span.setAttribute("status", status);
span.setAttribute("statusMessage", statusMessage || "");
try {
if (!status) return this.model;
this._model.status = status;
this._model.statusDate = new Date();
this._model.statusMessage = statusMessage;
if (!isConnected) return this.model;
return this._model.save();
} finally {
span.end();
}
}
/**
* Expire the repository
*/
async expire() {
const span = trace.getTracer("ano-file").startSpan("Repository.expire");
span.setAttribute("repoId", this.repoId);
await this.updateStatus(RepositoryStatus.EXPIRING);
await this.resetSate();
await this.updateStatus(RepositoryStatus.EXPIRED);
span.end();
}
/**
* Remove the repository
*/
async remove() {
const span = trace.getTracer("ano-file").startSpan("Repository.remove");
span.setAttribute("repoId", this.repoId);
await this.updateStatus(RepositoryStatus.REMOVING);
await this.resetSate();
await this.updateStatus(RepositoryStatus.REMOVED);
span.end();
}
/**
* Reset/delete the state of the repository
*/
async resetSate(status?: RepositoryStatus, statusMessage?: string) {
const span = trace.getTracer("ano-file").startSpan("Repository.resetState");
span.setAttribute("repoId", this.repoId);
// remove attribute
this._model.size = { storage: 0, file: 0 };
this._model.originalFiles = undefined;
@@ -335,6 +382,7 @@ export default class Repository {
// remove cache
await this.removeCache();
console.log(`[RESET] ${this._model.repoId} has been reset`);
span.end();
}
/**
@@ -342,12 +390,20 @@ export default class Repository {
* @returns
*/
async removeCache() {
this.model.isReseted = true;
await this.model.save();
if (
(await storage.exists(this._model.repoId + "/")) !== FILE_TYPE.NOT_FOUND
) {
return storage.rm(this._model.repoId + "/");
const span = trace
.getTracer("ano-file")
.startSpan("Repository.removeCache");
span.setAttribute("repoId", this.repoId);
try {
this.model.isReseted = true;
await this.model.save();
if (
(await storage.exists(this._model.repoId + "/")) !== FILE_TYPE.NOT_FOUND
) {
return storage.rm(this._model.repoId + "/");
}
} finally {
span.end();
}
}
@@ -366,28 +422,37 @@ export default class Repository {
*/
file: number;
}> {
if (this.status !== RepositoryStatus.READY) return { storage: 0, file: 0 };
if (this._model.size.file) return this._model.size;
function recursiveCount(files: Tree): { storage: number; file: number } {
const out = { storage: 0, file: 0 };
for (const name in files) {
const file = files[name];
if (file.size && parseInt(file.size.toString()) == file.size) {
out.storage += file.size as number;
out.file++;
} else if (typeof file == "object") {
const r = recursiveCount(file as Tree);
out.storage += r.storage;
out.file += r.file;
const span = trace
.getTracer("ano-file")
.startSpan("Repository.removeCache");
span.setAttribute("repoId", this.repoId);
try {
if (this.status !== RepositoryStatus.READY)
return { storage: 0, file: 0 };
if (this._model.size.file) return this._model.size;
function recursiveCount(files: Tree): { storage: number; file: number } {
const out = { storage: 0, file: 0 };
for (const name in files) {
const file = files[name];
if (file.size && parseInt(file.size.toString()) == file.size) {
out.storage += file.size as number;
out.file++;
} else if (typeof file == "object") {
const r = recursiveCount(file as Tree);
out.storage += r.storage;
out.file += r.file;
}
}
return out;
}
return out;
}
const files = await this.files();
this._model.size = recursiveCount(files);
await this._model.save();
return this._model.size;
const files = await this.files();
this._model.size = recursiveCount(files);
await this._model.save();
return this._model.size;
} finally {
span.end();
}
}
/**
@@ -396,14 +461,20 @@ export default class Repository {
* @returns conference of the repository
*/
async conference(): Promise<Conference | null> {
if (!this._model.conference) {
const span = trace.getTracer("ano-file").startSpan("Repository.conference");
span.setAttribute("repoId", this.repoId);
try {
if (!this._model.conference) {
return null;
}
const conference = await ConferenceModel.findOne({
conferenceID: this._model.conference,
});
if (conference) return new Conference(conference);
return null;
} finally {
span.end();
}
const conference = await ConferenceModel.findOne({
conferenceID: this._model.conference,
});
if (conference) return new Conference(conference);
return null;
}
/***** Getters ********/

View File

@@ -6,6 +6,7 @@ import Repository from "./Repository";
import { GitHubRepository } from "./source/GitHubRepository";
import PullRequest from "./PullRequest";
import AnonymizedPullRequestModel from "./database/anonymizedPullRequests/anonymizedPullRequests.model";
import { trace } from "@opentelemetry/api";
/**
* Model for a user
@@ -55,6 +56,10 @@ export default class User {
*/
force: boolean;
}): Promise<GitHubRepository[]> {
const span = trace
.getTracer("ano-file")
.startSpan("User.getGitHubRepositories");
span.setAttribute("username", this.username);
if (
!this._model.repositories ||
this._model.repositories.length == 0 ||
@@ -105,11 +110,14 @@ export default class User {
// have the model
await this._model.save();
span.end();
return repositories.map((r) => new GitHubRepository(r));
} else {
return (
const out = (
await RepositoryModel.find({ _id: { $in: this._model.repositories } })
).map((i) => new GitHubRepository(i));
span.end();
return out;
}
}
@@ -118,6 +126,8 @@ export default class User {
* @returns the list of anonymized repositories
*/
async getRepositories() {
const span = trace.getTracer("ano-file").startSpan("User.getRepositories");
span.setAttribute("username", this.username);
const repositories = (
await AnonymizedRepositoryModel.find(
{
@@ -141,6 +151,7 @@ export default class User {
}
}
await Promise.all(promises);
span.end();
return repositories;
}
/**
@@ -148,6 +159,8 @@ export default class User {
* @returns the list of anonymized repositories
*/
async getPullRequests() {
const span = trace.getTracer("ano-file").startSpan("User.getPullRequests");
span.setAttribute("username", this.username);
const pullRequests = (
await AnonymizedPullRequestModel.find({
owner: this.id,
@@ -166,6 +179,7 @@ export default class User {
}
}
await Promise.all(promises);
span.end();
return pullRequests;
}

View File

@@ -5,6 +5,7 @@ import { basename } from "path";
import { Transform } from "stream";
import { Readable } from "stream";
import AnonymizedFile from "./AnonymizedFile";
import { trace } from "@opentelemetry/api";
const urlRegex =
/<?\b((https?|ftp|file):\/\/)[-A-Za-z0-9+&@#/%?=~_|!:,.;]+[-A-Za-z0-9+&@#/%=~_|]\b\/?>?/g;
@@ -33,42 +34,51 @@ export function isTextFile(filePath: string, content?: Buffer) {
export class AnonymizeTransformer extends Transform {
public wasAnonimized = false;
public isText = false;
public isText: boolean | null = null;
constructor(private readonly file: AnonymizedFile) {
super();
this.isText = isTextFile(this.file.anonymizedPath);
}
_transform(chunk: Buffer, encoding: string, callback: () => void) {
const isText = isTextFile(this.file.anonymizedPath, chunk);
trace
.getTracer("ano-file")
.startActiveSpan("AnonymizeTransformer.transform", async (span) => {
span.setAttribute("path", this.file.anonymizedPath);
if (this.isText === null) {
this.isText = isTextFile(this.file.anonymizedPath, chunk);
}
if (isText) {
this.isText = true;
const anonimizer = new ContentAnonimizer(chunk.toString(), {
repoId: this.file.repository.repoId,
image: this.file.repository.options.image,
link: this.file.repository.options.link,
terms: this.file.repository.options.terms,
repoName: (this.file.repository.source as GitHubBase).githubRepository
?.fullName,
branchName:
(this.file.repository.source as GitHubBase).branch?.name || "main",
if (this.isText) {
const anonimizer = new ContentAnonimizer(chunk.toString(), {
repoId: this.file.repository.repoId,
image: this.file.repository.options.image,
link: this.file.repository.options.link,
terms: this.file.repository.options.terms,
repoName: (this.file.repository.source as GitHubBase)
.githubRepository?.fullName,
branchName:
(this.file.repository.source as GitHubBase).branch?.name ||
"main",
});
anonimizer.anonymize();
if (anonimizer.wasAnonymized) {
this.wasAnonimized = true;
chunk = Buffer.from(anonimizer.content);
}
}
this.emit("transform", {
isText: this.isText,
wasAnonimized: this.wasAnonimized,
chunk,
});
this.push(chunk);
span.end();
callback();
});
anonimizer.anonymize();
if (anonimizer.wasAnonymized) {
this.wasAnonimized = true;
chunk = Buffer.from(anonimizer.content);
}
}
this.emit("transform", {
isText,
wasAnonimized: this.wasAnonimized,
chunk,
});
this.push(chunk);
callback();
}
}
@@ -192,11 +202,22 @@ export class ContentAnonimizer {
}
anonymize() {
this.removeImage();
this.removeLink();
this.replaceGitHubSelfLinks();
this.replaceTerms();
return this.content;
const span = trace
.getTracer("ano-file")
.startSpan("ContentAnonimizer.anonymize");
try {
this.removeImage();
span.addEvent("removeImage");
this.removeLink();
span.addEvent("removeLink");
this.replaceGitHubSelfLinks();
span.addEvent("replaceGitHubSelfLinks");
this.replaceTerms();
span.addEvent("replaceTerms");
return this.content;
} finally {
span.end();
}
}
}
@@ -221,21 +242,28 @@ export function anonymizeContent(
}
export function anonymizePath(path: string, terms: string[]) {
for (let i = 0; i < terms.length; i++) {
let term = terms[i];
if (term.trim() == "") {
continue;
}
try {
new RegExp(term, "gi");
} catch {
// escape regex characters
term = term.replace(/[-[\]{}()*+?.,\\^$|#]/g, "\\$&");
}
path = path.replace(
new RegExp(term, "gi"),
config.ANONYMIZATION_MASK + "-" + (i + 1)
);
}
return path;
return trace
.getTracer("ano-file")
.startActiveSpan("utils.anonymizePath", (span) => {
span.setAttribute("path", path);
for (let i = 0; i < terms.length; i++) {
let term = terms[i];
if (term.trim() == "") {
continue;
}
try {
new RegExp(term, "gi");
} catch {
// escape regex characters
term = term.replace(/[-[\]{}()*+?.,\\^$|#]/g, "\\$&");
}
path = path.replace(
new RegExp(term, "gi"),
config.ANONYMIZATION_MASK + "-" + (i + 1)
);
}
span.setAttribute("return", path);
span.end();
return path;
});
}

View File

@@ -17,7 +17,6 @@ export async function connect() {
await mongoose.connect(MONGO_URL + "production", {
authSource: "admin",
appName: "Anonymous GitHub Server",
compressors: "zlib",
} as ConnectOptions);
isConnected = true;

View File

@@ -4,6 +4,7 @@ config();
import Repository from "../Repository";
import { getRepository as getRepositoryImport } from "../database/database";
import { RepositoryStatus } from "../types";
import { Exception, trace } from "@opentelemetry/api";
export default async function (job: SandboxedJob<Repository, void>) {
const {
@@ -13,6 +14,8 @@ export default async function (job: SandboxedJob<Repository, void>) {
connect: () => Promise<void>;
getRepository: typeof getRepositoryImport;
} = require("../database/database");
const span = trace.getTracer("ano-file").startSpan("proc.downloadRepository");
span.setAttribute("repoId", job.data.repoId);
console.log(`[QUEUE] ${job.data.repoId} is going to be downloaded`);
try {
await connect();
@@ -27,14 +30,18 @@ export default async function (job: SandboxedJob<Repository, void>) {
} catch (error) {
job.updateProgress({ status: "error" });
if (error instanceof Error) {
span.recordException(error as Exception);
await repo.updateStatus(RepositoryStatus.ERROR, error.message);
} else if (typeof error === "string") {
await repo.updateStatus(RepositoryStatus.ERROR, error);
span.recordException(error);
}
throw error;
}
} catch (error) {
console.error(error);
span.recordException(error as Exception);
console.log(`[QUEUE] ${job.data.repoId} is finished with an error`);
} finally {
span.end();
}
}

View File

@@ -1,6 +1,7 @@
import { SandboxedJob } from "bullmq";
import Repository from "../Repository";
import { getRepository as getRepositoryImport } from "../database/database";
import { Exception, trace } from "@opentelemetry/api";
export default async function (job: SandboxedJob<Repository, void>) {
const {
@@ -10,6 +11,8 @@ export default async function (job: SandboxedJob<Repository, void>) {
connect: () => Promise<void>;
getRepository: typeof getRepositoryImport;
} = require("../database/database");
const span = trace.getTracer("ano-file").startSpan("proc.removeCache");
span.setAttribute("repoId", job.data.repoId);
try {
await connect();
console.log(
@@ -19,11 +22,13 @@ export default async function (job: SandboxedJob<Repository, void>) {
try {
await repo.removeCache();
} catch (error) {
span.recordException(error as Exception);
throw error;
}
} catch (error) {
console.error(error);
span.recordException(error as Exception);
} finally {
console.log(`[QUEUE] Cache of ${job.data.repoId} is removed.`);
span.end();
}
}

View File

@@ -2,6 +2,8 @@ import { SandboxedJob } from "bullmq";
import Repository from "../Repository";
import { getRepository as getRepositoryImport } from "../database/database";
import { RepositoryStatus } from "../types";
import { trace } from "@opentelemetry/api";
import { Span } from "@opentelemetry/sdk-trace-node";
export default async function (job: SandboxedJob<Repository, void>) {
const {
@@ -11,6 +13,8 @@ export default async function (job: SandboxedJob<Repository, void>) {
connect: () => Promise<void>;
getRepository: typeof getRepositoryImport;
} = require("../database/database");
const span = trace.getTracer("ano-file").startSpan("proc.removeRepository");
span.setAttribute("repoId", job.data.repoId);
try {
await connect();
console.log(`[QUEUE] ${job.data.repoId} is going to be removed`);
@@ -24,11 +28,13 @@ export default async function (job: SandboxedJob<Repository, void>) {
} else if (typeof error === "string") {
await repo.updateStatus(RepositoryStatus.ERROR, error);
}
span.recordException(error as Error);
throw error;
}
} catch (error) {
console.error(error);
span.recordException(error as Error);
} finally {
console.log(`[QUEUE] ${job.data.repoId} is removed`);
span.end();
}
}

View File

@@ -10,7 +10,7 @@ import GitHubBase from "./GitHubBase";
import AnonymizedFile from "../AnonymizedFile";
import { FILE_TYPE, RepositoryStatus, SourceBase } from "../types";
import AnonymousError from "../AnonymousError";
import { tryCatch } from "bullmq";
import { trace } from "@opentelemetry/api";
export default class GitHubDownload extends GitHubBase implements SourceBase {
constructor(
@@ -40,115 +40,129 @@ export default class GitHubDownload extends GitHubBase implements SourceBase {
}
async download(token?: string) {
const fiveMinuteAgo = new Date();
fiveMinuteAgo.setMinutes(fiveMinuteAgo.getMinutes() - 5);
if (
this.repository.status == "download" &&
this.repository.model.statusDate > fiveMinuteAgo
)
throw new AnonymousError("repo_in_download", {
httpStatus: 404,
object: this.repository,
});
let response: OctokitResponse<unknown, number>;
const span = trace.getTracer("ano-file").startSpan("GHDownload.download");
span.setAttribute("repoId", this.repository.repoId);
try {
if (!token) {
token = await this.getToken();
}
response = await this._getZipUrl(token);
} catch (error) {
if ((error as any).status == 401 && config.GITHUB_TOKEN) {
try {
response = await this._getZipUrl(config.GITHUB_TOKEN);
} catch (error) {
const fiveMinuteAgo = new Date();
fiveMinuteAgo.setMinutes(fiveMinuteAgo.getMinutes() - 5);
if (
this.repository.status == "download" &&
this.repository.model.statusDate > fiveMinuteAgo
)
throw new AnonymousError("repo_in_download", {
httpStatus: 404,
object: this.repository,
});
let response: OctokitResponse<unknown, number>;
try {
if (!token) {
token = await this.getToken();
}
response = await this._getZipUrl(token);
} catch (error) {
span.recordException(error as Error);
if ((error as any).status == 401 && config.GITHUB_TOKEN) {
try {
response = await this._getZipUrl(config.GITHUB_TOKEN);
} catch (error) {
await this.repository.resetSate(
RepositoryStatus.ERROR,
"repo_not_accessible"
);
throw new AnonymousError("repo_not_accessible", {
httpStatus: 404,
cause: error as Error,
object: this.repository,
});
}
} else {
await this.repository.resetSate(
RepositoryStatus.ERROR,
"repo_not_accessible"
);
throw new AnonymousError("repo_not_accessible", {
httpStatus: 404,
cause: error as Error,
object: this.repository,
cause: error as Error,
});
}
} else {
await this.repository.resetSate(
RepositoryStatus.ERROR,
"repo_not_accessible"
);
throw new AnonymousError("repo_not_accessible", {
httpStatus: 404,
object: this.repository,
cause: error as Error,
});
}
}
await this.repository.updateStatus(RepositoryStatus.DOWNLOAD);
const originalPath = this.repository.originalCachePath;
await storage.mk(originalPath);
let progress: { transferred: number } | undefined = undefined;
let progressTimeout;
let inDownload = true;
await this.repository.updateStatus(RepositoryStatus.DOWNLOAD);
const originalPath = this.repository.originalCachePath;
await storage.mk(originalPath);
let progress: { transferred: number } | undefined = undefined;
let progressTimeout;
let inDownload = true;
const that = this;
async function updateProgress() {
if (inDownload) {
if (progress && that.repository.status == RepositoryStatus.DOWNLOAD) {
await that.repository.updateStatus(
that.repository.status,
progress.transferred.toString()
);
const that = this;
async function updateProgress() {
if (inDownload) {
if (progress && that.repository.status == RepositoryStatus.DOWNLOAD) {
await that.repository.updateStatus(
that.repository.status,
progress.transferred.toString()
);
}
progressTimeout = setTimeout(updateProgress, 1500);
}
progressTimeout = setTimeout(updateProgress, 1500);
}
}
updateProgress();
updateProgress();
try {
const downloadStream = got.stream(response.url);
downloadStream.addListener("downloadProgress", async (p) => {
progress = p;
});
await storage.extractZip(originalPath, downloadStream, undefined, this);
} catch (error) {
await this.repository.updateStatus(
RepositoryStatus.ERROR,
"unable_to_download"
);
throw new AnonymousError("unable_to_download", {
httpStatus: 500,
cause: error as Error,
object: this.repository,
});
try {
const downloadStream = got.stream(response.url);
downloadStream.addListener("downloadProgress", async (p) => {
progress = p;
});
await storage.extractZip(originalPath, downloadStream, undefined, this);
} catch (error) {
span.recordException(error as Error);
await this.repository.updateStatus(
RepositoryStatus.ERROR,
"unable_to_download"
);
throw new AnonymousError("unable_to_download", {
httpStatus: 500,
cause: error as Error,
object: this.repository,
});
} finally {
inDownload = false;
clearTimeout(progressTimeout);
}
this.repository.model.isReseted = false;
try {
await this.repository.updateStatus(RepositoryStatus.READY);
} catch (error) {
span.recordException(error as Error);
}
} finally {
inDownload = false;
clearTimeout(progressTimeout);
}
this.repository.model.isReseted = false;
try {
await this.repository.updateStatus(RepositoryStatus.READY);
} catch (error) {
console.error(error);
span.end();
}
}
async getFileContent(file: AnonymizedFile): Promise<Readable> {
const exists = await storage.exists(file.originalCachePath);
if (exists === FILE_TYPE.FILE) {
return storage.read(file.originalCachePath);
} else if (exists === FILE_TYPE.FOLDER) {
throw new AnonymousError("folder_not_supported", {
httpStatus: 400,
object: file,
});
}
// will throw an error if the file is not in the repository
await file.originalPath();
const span = trace.getTracer("ano-file").startSpan("GHDownload.getFileContent");
span.setAttribute("repoId", this.repository.repoId);
try {
const exists = await storage.exists(file.originalCachePath);
if (exists === FILE_TYPE.FILE) {
return storage.read(file.originalCachePath);
} else if (exists === FILE_TYPE.FOLDER) {
throw new AnonymousError("folder_not_supported", {
httpStatus: 400,
object: file,
});
}
// will throw an error if the file is not in the repository
await file.originalPath();
// the cache is not ready, we need to download the repository
await this.download();
return storage.read(file.originalCachePath);
// the cache is not ready, we need to download the repository
await this.download();
return storage.read(file.originalCachePath);
} finally {
span.end();
}
}
async getFiles() {

View File

@@ -5,6 +5,7 @@ import { Octokit, RestEndpointMethodTypes } from "@octokit/rest";
import RepositoryModel from "../database/repositories/repositories.model";
import AnonymousError from "../AnonymousError";
import { isConnected } from "../database/database";
import { trace } from "@opentelemetry/api";
export class GitHubRepository {
private _data: Partial<{
@@ -50,64 +51,81 @@ export class GitHubRepository {
accessToken?: string;
}
) {
const octokit = new Octokit({ auth: opt.accessToken });
const commit = await octokit.repos.getCommit({
owner: this.owner,
repo: this.repo,
ref: sha,
});
return commit.data;
const span = trace
.getTracer("ano-file")
.startSpan("GHRepository.getCommitInfo");
span.setAttribute("owner", this.owner);
span.setAttribute("repo", this.repo);
try {
const octokit = new Octokit({ auth: opt.accessToken });
const commit = await octokit.repos.getCommit({
owner: this.owner,
repo: this.repo,
ref: sha,
});
return commit.data;
} finally {
span.end();
}
}
async branches(opt: {
accessToken?: string;
force?: boolean;
}): Promise<Branch[]> {
if (
!this._data.branches ||
this._data.branches.length == 0 ||
opt?.force === true
) {
// get the list of repo from github
const octokit = new Octokit({ auth: opt.accessToken });
try {
const branches = (
await octokit.paginate("GET /repos/{owner}/{repo}/branches", {
owner: this.owner,
repo: this.repo,
per_page: 100,
})
).map((b) => {
return {
name: b.name,
commit: b.commit.sha,
readme: this._data.branches?.filter(
(f: Branch) => f.name == b.name
)[0]?.readme,
} as Branch;
});
this._data.branches = branches;
if (isConnected) {
await RepositoryModel.updateOne(
{ externalId: this.id },
{ $set: { branches } }
);
const span = trace.getTracer("ano-file").startSpan("GHRepository.branches");
span.setAttribute("owner", this.owner);
span.setAttribute("repo", this.repo);
try {
if (
!this._data.branches ||
this._data.branches.length == 0 ||
opt?.force === true
) {
// get the list of repo from github
const octokit = new Octokit({ auth: opt.accessToken });
try {
const branches = (
await octokit.paginate("GET /repos/{owner}/{repo}/branches", {
owner: this.owner,
repo: this.repo,
per_page: 100,
})
).map((b) => {
return {
name: b.name,
commit: b.commit.sha,
readme: this._data.branches?.filter(
(f: Branch) => f.name == b.name
)[0]?.readme,
} as Branch;
});
this._data.branches = branches;
if (isConnected) {
await RepositoryModel.updateOne(
{ externalId: this.id },
{ $set: { branches } }
);
}
} catch (error) {
span.recordException(error as Error);
throw new AnonymousError("repo_not_found", {
httpStatus: (error as any).status,
cause: error as Error,
object: this,
});
}
} catch (error) {
throw new AnonymousError("repo_not_found", {
httpStatus: (error as any).status,
cause: error as Error,
object: this,
});
} else if (isConnected) {
const q = await RepositoryModel.findOne({ externalId: this.id }).select(
"branches"
);
this._data.branches = q?.branches;
}
} else if (isConnected) {
const q = await RepositoryModel.findOne({ externalId: this.id }).select(
"branches"
);
this._data.branches = q?.branches;
}
return this._data.branches || [];
return this._data.branches || [];
} finally {
span.end();
}
}
async readme(opt: {
@@ -115,52 +133,60 @@ export class GitHubRepository {
force?: boolean;
accessToken?: string;
}): Promise<string | undefined> {
if (!opt.branch) opt.branch = this._data.defaultBranch || "master";
const span = trace.getTracer("ano-file").startSpan("GHRepository.readme");
span.setAttribute("owner", this.owner);
span.setAttribute("repo", this.repo);
try {
if (!opt.branch) opt.branch = this._data.defaultBranch || "master";
const model = await RepositoryModel.findOne({
externalId: this.id,
}).select("branches");
const model = await RepositoryModel.findOne({
externalId: this.id,
}).select("branches");
if (!model) {
throw new AnonymousError("repo_not_found", { httpStatus: 404 });
}
if (!model) {
throw new AnonymousError("repo_not_found", { httpStatus: 404 });
}
this._data.branches = await this.branches(opt);
model.branches = this._data.branches;
this._data.branches = await this.branches(opt);
model.branches = this._data.branches;
const selected = model.branches.filter((f) => f.name == opt.branch)[0];
if (selected && (!selected.readme || opt?.force === true)) {
// get the list of repo from github
const octokit = new Octokit({ auth: opt.accessToken });
try {
const ghRes = await octokit.repos.getReadme({
owner: this.owner,
repo: this.repo,
ref: selected?.commit,
});
const readme = Buffer.from(
ghRes.data.content,
ghRes.data.encoding as BufferEncoding
).toString("utf-8");
selected.readme = readme;
await model.save();
} catch (error) {
const selected = model.branches.filter((f) => f.name == opt.branch)[0];
if (selected && (!selected.readme || opt?.force === true)) {
// get the list of repo from github
const octokit = new Octokit({ auth: opt.accessToken });
try {
const ghRes = await octokit.repos.getReadme({
owner: this.owner,
repo: this.repo,
ref: selected?.commit,
});
const readme = Buffer.from(
ghRes.data.content,
ghRes.data.encoding as BufferEncoding
).toString("utf-8");
selected.readme = readme;
await model.save();
} catch (error) {
span.recordException(error as Error);
throw new AnonymousError("readme_not_available", {
httpStatus: 404,
cause: error as Error,
object: this,
});
}
}
if (!selected) {
throw new AnonymousError("readme_not_available", {
httpStatus: 404,
cause: error as Error,
object: this,
});
}
}
if (!selected) {
throw new AnonymousError("readme_not_available", {
httpStatus: 404,
object: this,
});
return selected.readme;
} finally {
span.end();
}
return selected.readme;
}
public get owner(): string {
@@ -203,57 +229,69 @@ export async function getRepositoryFromGitHub(opt: {
repo: string;
accessToken: string;
}) {
if (opt.repo.indexOf(".git") > -1) {
opt.repo = opt.repo.replace(".git", "");
}
const octokit = new Octokit({ auth: opt.accessToken });
let r: RestEndpointMethodTypes["repos"]["get"]["response"]["data"];
const span = trace
.getTracer("ano-file")
.startSpan("GHRepository.getRepositoryFromGitHub");
span.setAttribute("owner", opt.owner);
span.setAttribute("repo", opt.repo);
try {
r = (
await octokit.repos.get({
owner: opt.owner,
repo: opt.repo,
})
).data;
} catch (error) {
throw new AnonymousError("repo_not_found", {
httpStatus: (error as any).status,
object: {
owner: opt.owner,
repo: opt.repo,
},
cause: error as Error,
});
}
if (!r)
throw new AnonymousError("repo_not_found", {
httpStatus: 404,
object: {
owner: opt.owner,
repo: opt.repo,
},
});
let model = new RepositoryModel({ externalId: "gh_" + r.id });
if (isConnected) {
const dbModel = await RepositoryModel.findOne({ externalId: "gh_" + r.id });
if (dbModel) {
model = dbModel;
if (opt.repo.indexOf(".git") > -1) {
opt.repo = opt.repo.replace(".git", "");
}
const octokit = new Octokit({ auth: opt.accessToken });
let r: RestEndpointMethodTypes["repos"]["get"]["response"]["data"];
try {
r = (
await octokit.repos.get({
owner: opt.owner,
repo: opt.repo,
})
).data;
} catch (error) {
span.recordException(error as Error);
throw new AnonymousError("repo_not_found", {
httpStatus: (error as any).status,
object: {
owner: opt.owner,
repo: opt.repo,
},
cause: error as Error,
});
}
if (!r)
throw new AnonymousError("repo_not_found", {
httpStatus: 404,
object: {
owner: opt.owner,
repo: opt.repo,
},
});
let model = new RepositoryModel({ externalId: "gh_" + r.id });
if (isConnected) {
const dbModel = await RepositoryModel.findOne({
externalId: "gh_" + r.id,
});
if (dbModel) {
model = dbModel;
}
}
model.name = r.full_name;
model.url = r.html_url;
model.size = r.size;
model.defaultBranch = r.default_branch;
model.hasPage = r.has_pages;
if (model.hasPage) {
const ghPageRes = await octokit.repos.getPages({
owner: opt.owner,
repo: opt.repo,
});
model.pageSource = ghPageRes.data.source;
}
if (isConnected) {
await model.save();
}
return new GitHubRepository(model);
} finally {
span.end();
}
model.name = r.full_name;
model.url = r.html_url;
model.size = r.size;
model.defaultBranch = r.default_branch;
model.hasPage = r.has_pages;
if (model.hasPage) {
const ghPageRes = await octokit.repos.getPages({
owner: opt.owner,
repo: opt.repo,
});
model.pageSource = ghPageRes.data.source;
}
if (isConnected) {
await model.save();
}
return new GitHubRepository(model);
}

View File

@@ -9,6 +9,7 @@ import * as path from "path";
import * as stream from "stream";
import AnonymousError from "../AnonymousError";
import config from "../../config";
import { trace } from "@opentelemetry/api";
export default class GitHubStream extends GitHubBase implements SourceBase {
constructor(
@@ -26,67 +27,83 @@ export default class GitHubStream extends GitHubBase implements SourceBase {
}
async getFileContent(file: AnonymizedFile): Promise<stream.Readable> {
const octokit = new Octokit({
auth: await this.getToken(),
});
return trace
.getTracer("ano-file")
.startActiveSpan("GHStream.getFileContent", async (span) => {
span.setAttribute("path", file.anonymizedPath);
const octokit = new Octokit({
auth: await this.getToken(),
});
const file_sha = await file.sha();
if (!file_sha) {
throw new AnonymousError("file_not_accessible", {
httpStatus: 404,
object: file,
const file_sha = await file.sha();
if (!file_sha) {
throw new AnonymousError("file_not_accessible", {
httpStatus: 404,
object: file,
});
}
try {
const ghRes = await octokit.rest.git.getBlob({
owner: this.githubRepository.owner,
repo: this.githubRepository.repo,
file_sha,
});
if (!ghRes.data.content && ghRes.data.size != 0) {
throw new AnonymousError("file_not_accessible", {
httpStatus: 404,
object: file,
});
}
// empty file
let content: Buffer;
if (ghRes.data.content) {
content = Buffer.from(
ghRes.data.content,
ghRes.data.encoding as BufferEncoding
);
} else {
content = Buffer.from("");
}
await storage.write(file.originalCachePath, content, file, this);
this.repository.model.isReseted = false;
await this.repository.model.save();
if (this.repository.status !== RepositoryStatus.READY)
await this.repository.updateStatus(RepositoryStatus.READY);
return stream.Readable.from(content);
} catch (error) {
if (
(error as any).status === 404 ||
(error as any).httpStatus === 404
) {
throw new AnonymousError("file_not_found", {
httpStatus: (error as any).status || (error as any).httpStatus,
cause: error as Error,
object: file,
});
}
throw new AnonymousError("file_too_big", {
httpStatus: (error as any).status || (error as any).httpStatus,
cause: error as Error,
object: file,
});
} finally {
span.end();
}
});
}
try {
const ghRes = await octokit.rest.git.getBlob({
owner: this.githubRepository.owner,
repo: this.githubRepository.repo,
file_sha,
});
if (!ghRes.data.content && ghRes.data.size != 0) {
throw new AnonymousError("file_not_accessible", {
httpStatus: 404,
object: file,
});
}
// empty file
let content: Buffer;
if (ghRes.data.content) {
content = Buffer.from(
ghRes.data.content,
ghRes.data.encoding as BufferEncoding
);
} else {
content = Buffer.from("");
}
await storage.write(file.originalCachePath, content, file, this);
this.repository.model.isReseted = false;
await this.repository.model.save();
if (this.repository.status !== RepositoryStatus.READY)
await this.repository.updateStatus(RepositoryStatus.READY);
return stream.Readable.from(content);
} catch (error) {
if ((error as any).status === 404 || (error as any).httpStatus === 404) {
throw new AnonymousError("file_not_found", {
httpStatus: (error as any).status || (error as any).httpStatus,
cause: error as Error,
object: file,
});
}
throw new AnonymousError("file_too_big", {
httpStatus: (error as any).status || (error as any).httpStatus,
cause: error as Error,
object: file,
});
}
}
async getFiles() {
let commit = this.branch?.commit;
if (!commit && this.repository.model.source.commit) {
commit = this.repository.model.source.commit;
const span = trace.getTracer("ano-file").startSpan("GHStream.getFiles");
span.setAttribute("repoId", this.repository.repoId);
try {
let commit = this.branch?.commit;
if (!commit && this.repository.model.source.commit) {
commit = this.repository.model.source.commit;
}
return this.getTree(commit);
} finally {
span.end();
}
return this.getTree(commit);
}
private async getTree(
@@ -98,6 +115,9 @@ export default class GitHubStream extends GitHubBase implements SourceBase {
request: 0,
}
) {
const span = trace.getTracer("ano-file").startSpan("GHStream.getTree");
span.setAttribute("repoId", this.repository.repoId);
span.setAttribute("sha", sha);
this.repository.model.truckedFileList = false;
let ghRes: Awaited<ReturnType<typeof this.getGHTree>>;
@@ -105,11 +125,13 @@ export default class GitHubStream extends GitHubBase implements SourceBase {
count.request++;
ghRes = await this.getGHTree(sha, { recursive: true });
} catch (error) {
span.recordException(error as Error);
if ((error as any).status == 409) {
// empty tree
if (this.repository.status != RepositoryStatus.READY)
await this.repository.updateStatus(RepositoryStatus.READY);
// cannot be empty otherwise it would try to download it again
span.end();
return { __: {} };
} else {
console.log(
@@ -121,7 +143,7 @@ export default class GitHubStream extends GitHubBase implements SourceBase {
RepositoryStatus.ERROR,
"repo_not_accessible"
);
throw new AnonymousError("repo_not_accessible", {
const err = new AnonymousError("repo_not_accessible", {
httpStatus: (error as any).status,
cause: error as Error,
object: {
@@ -130,6 +152,9 @@ export default class GitHubStream extends GitHubBase implements SourceBase {
tree_sha: sha,
},
});
span.recordException(err);
span.end();
throw err;
}
}
const tree = this.tree2Tree(ghRes.tree, truncatedTree, parentPath);
@@ -139,6 +164,7 @@ export default class GitHubStream extends GitHubBase implements SourceBase {
}
if (this.repository.status !== RepositoryStatus.READY)
await this.repository.updateStatus(RepositoryStatus.READY);
span.end();
return tree;
}
@@ -165,9 +191,6 @@ export default class GitHubStream extends GitHubBase implements SourceBase {
},
depth = 0
) {
console.log(
`sha ${sha}, countFiles: ${count.file} countRequest: ${count.request}, parentPath: "${parentPath}"`
);
count.request++;
let data = null;

View File

@@ -10,6 +10,7 @@ import * as archiver from "archiver";
import { promisify } from "util";
import AnonymizedFile from "../AnonymizedFile";
import { lookup } from "mime-types";
import { trace } from "@opentelemetry/api";
export default class FileSystem implements StorageBase {
type = "FileSystem";
@@ -18,19 +19,36 @@ export default class FileSystem implements StorageBase {
/** @override */
async exists(p: string): Promise<FILE_TYPE> {
try {
const stat = await fs.promises.stat(join(config.FOLDER, p));
if (stat.isDirectory()) return FILE_TYPE.FOLDER;
if (stat.isFile()) return FILE_TYPE.FILE;
} catch (_) {
// ignore file not found or not downloaded
}
return FILE_TYPE.NOT_FOUND;
return trace
.getTracer("ano-file")
.startActiveSpan("fs.exists", async (span) => {
span.setAttribute("path", p);
span.setAttribute("full-path", join(config.FOLDER, p));
try {
const stat = await fs.promises.stat(join(config.FOLDER, p));
if (stat.isDirectory()) return FILE_TYPE.FOLDER;
if (stat.isFile()) return FILE_TYPE.FILE;
} catch (_) {
// ignore file not found or not downloaded
}
span.end();
return FILE_TYPE.NOT_FOUND;
});
}
/** @override */
async send(p: string, res: Response) {
res.sendFile(join(config.FOLDER, p), { dotfiles: "allow" });
return trace
.getTracer("ano-file")
.startActiveSpan("fs.send", async (span) => {
span.setAttribute("path", p);
res.sendFile(join(config.FOLDER, p), { dotfiles: "allow" }, (err) => {
if (err) {
span.recordException(err);
}
span.end();
});
});
}
/** @override */
@@ -56,22 +74,46 @@ export default class FileSystem implements StorageBase {
file?: AnonymizedFile,
source?: SourceBase
): Promise<void> {
await this.mk(dirname(p));
return fs.promises.writeFile(join(config.FOLDER, p), data);
return trace
.getTracer("ano-file")
.startActiveSpan("fs.write", async (span) => {
span.setAttribute("path", p);
try {
await this.mk(dirname(p));
return await fs.promises.writeFile(
join(config.FOLDER, p),
data,
"utf-8"
);
} finally {
span.end();
}
});
}
/** @override */
async rm(dir: string): Promise<void> {
const span = trace.getTracer("ano-file").startSpan("fs.rm");
span.setAttribute("path", dir);
await fs.promises.rm(join(config.FOLDER, dir), {
force: true,
recursive: true,
});
span.end();
}
/** @override */
async mk(dir: string): Promise<void> {
if ((await this.exists(dir)) === FILE_TYPE.NOT_FOUND)
fs.promises.mkdir(join(config.FOLDER, dir), { recursive: true });
return trace
.getTracer("ano-file")
.startActiveSpan("fs.mk", async (span) => {
span.setAttribute("path", dir);
if ((await this.exists(dir)) === FILE_TYPE.NOT_FOUND)
await fs.promises.mkdir(join(config.FOLDER, dir), {
recursive: true,
});
span.end();
});
}
/** @override */
@@ -82,34 +124,40 @@ export default class FileSystem implements StorageBase {
onEntry?: (file: { path: string; size: number }) => void;
} = {}
): Promise<Tree> {
if (opt.root == null) {
opt.root = config.FOLDER;
}
let files = await fs.promises.readdir(join(opt.root, dir));
const output: Tree = {};
for (let file of files) {
let filePath = join(dir, file);
try {
const stats = await fs.promises.stat(join(opt.root, filePath));
if (file[0] == "$") {
file = "\\" + file;
return trace
.getTracer("ano-file")
.startActiveSpan("fs.listFiles", async (span) => {
span.setAttribute("path", dir);
if (opt.root == null) {
opt.root = config.FOLDER;
}
if (stats.isDirectory()) {
output[file] = await this.listFiles(filePath, opt);
} else if (stats.isFile()) {
if (opt.onEntry) {
opt.onEntry({
path: filePath,
size: stats.size,
});
let files = await fs.promises.readdir(join(opt.root, dir));
const output: Tree = {};
for (let file of files) {
let filePath = join(dir, file);
try {
const stats = await fs.promises.stat(join(opt.root, filePath));
if (file[0] == "$") {
file = "\\" + file;
}
if (stats.isDirectory()) {
output[file] = await this.listFiles(filePath, opt);
} else if (stats.isFile()) {
if (opt.onEntry) {
opt.onEntry({
path: filePath,
size: stats.size,
});
}
output[file] = { size: stats.size, sha: stats.ino.toString() };
}
} catch (error) {
span.recordException(error as Error);
}
output[file] = { size: stats.size, sha: stats.ino.toString() };
}
} catch (error) {
console.error(error);
}
}
return output;
span.end();
return output;
});
}
/** @override */
@@ -144,7 +192,7 @@ export default class FileSystem implements StorageBase {
) {
const archive = archiver(opt?.format || "zip", {});
this.listFiles(dir, {
await this.listFiles(dir, {
onEntry: async (file) => {
let rs = await this.read(file.path);
if (opt?.fileTransformer) {

View File

@@ -15,6 +15,7 @@ import * as archiver from "archiver";
import { dirname, basename } from "path";
import AnonymousError from "../AnonymousError";
import AnonymizedFile from "../AnonymizedFile";
import { trace } from "@opentelemetry/api";
export default class S3Storage implements StorageBase {
type = "AWS";
@@ -45,21 +46,27 @@ export default class S3Storage implements StorageBase {
/** @override */
async exists(path: string): Promise<FILE_TYPE> {
if (!config.S3_BUCKET) throw new Error("S3_BUCKET not set");
const span = trace.getTracer("ano-file").startSpan("s3.exists");
span.setAttribute("path", path);
try {
// if we can get the file info, it is a file
await this.fileInfo(path);
return FILE_TYPE.FILE;
} catch (err) {
// check if it is a directory
const data = await this.client().listObjectsV2({
Bucket: config.S3_BUCKET,
Prefix: path,
MaxKeys: 1,
});
return (data.Contents?.length || 0) > 0
? FILE_TYPE.FOLDER
: FILE_TYPE.NOT_FOUND;
if (!config.S3_BUCKET) throw new Error("S3_BUCKET not set");
try {
// if we can get the file info, it is a file
await this.fileInfo(path);
return FILE_TYPE.FILE;
} catch (err) {
// check if it is a directory
const data = await this.client().listObjectsV2({
Bucket: config.S3_BUCKET,
Prefix: path,
MaxKeys: 1,
});
return (data.Contents?.length || 0) > 0
? FILE_TYPE.FOLDER
: FILE_TYPE.NOT_FOUND;
}
} finally {
span.end();
}
}
@@ -70,95 +77,120 @@ export default class S3Storage implements StorageBase {
/** @override */
async rm(dir: string): Promise<void> {
if (!config.S3_BUCKET) throw new Error("S3_BUCKET not set");
const data = await this.client(200000).listObjectsV2({
Bucket: config.S3_BUCKET,
Prefix: dir,
MaxKeys: 100,
});
const span = trace.getTracer("ano-file").startSpan("s3.rm");
span.setAttribute("path", dir);
try {
if (!config.S3_BUCKET) throw new Error("S3_BUCKET not set");
const data = await this.client(200000).listObjectsV2({
Bucket: config.S3_BUCKET,
Prefix: dir,
MaxKeys: 100,
});
const params = {
Bucket: config.S3_BUCKET,
Delete: { Objects: new Array<{ Key: string }>() },
};
const params = {
Bucket: config.S3_BUCKET,
Delete: { Objects: new Array<{ Key: string }>() },
};
data.Contents?.forEach(function (content) {
if (content.Key) {
params.Delete.Objects.push({ Key: content.Key });
data.Contents?.forEach(function (content) {
if (content.Key) {
params.Delete.Objects.push({ Key: content.Key });
}
});
if (params.Delete.Objects.length == 0) {
// nothing to remove
return;
}
});
await this.client(200000).deleteObjects(params);
if (params.Delete.Objects.length == 0) {
// nothing to remove
return;
}
await this.client(200000).deleteObjects(params);
if (data.IsTruncated) {
await this.rm(dir);
if (data.IsTruncated) {
await this.rm(dir);
}
} finally {
span.end();
}
}
/** @override */
async send(p: string, res: Response) {
if (!config.S3_BUCKET) throw new Error("S3_BUCKET not set");
const span = trace.getTracer("ano-file").startSpan("s3.send");
span.setAttribute("path", p);
try {
const command = new GetObjectCommand({
Bucket: config.S3_BUCKET,
Key: p,
});
const s = await this.client().send(command);
res.status(200);
if (s.ContentType) {
res.contentType(s.ContentType);
}
if (s.ContentLength) {
res.set("Content-Length", s.ContentLength.toString());
}
if (s.Body) {
(s.Body as Readable)?.pipe(res);
} else {
res.end();
}
} catch (error) {
if (!config.S3_BUCKET) throw new Error("S3_BUCKET not set");
try {
res.status(500);
} catch (err) {
console.error(`[ERROR] S3 send ${p}`, err);
const command = new GetObjectCommand({
Bucket: config.S3_BUCKET,
Key: p,
});
const s = await this.client().send(command);
res.status(200);
if (s.ContentType) {
res.contentType(s.ContentType);
}
if (s.ContentLength) {
res.set("Content-Length", s.ContentLength.toString());
}
if (s.Body) {
(s.Body as Readable)?.pipe(res);
} else {
res.end();
}
} catch (error) {
span.recordException(error as Error);
try {
res.status(500);
} catch (err) {
console.error(`[ERROR] S3 send ${p}`, err);
}
}
} finally {
span.end();
}
}
async fileInfo(path: string) {
if (!config.S3_BUCKET) throw new Error("S3_BUCKET not set");
const info = await this.client(3000).headObject({
Bucket: config.S3_BUCKET,
Key: path,
});
return {
size: info.ContentLength,
lastModified: info.LastModified,
contentType: info.ContentType
? info.ContentType
: (lookup(path) as string),
};
const span = trace.getTracer("ano-file").startSpan("s3.fileInfo");
span.setAttribute("path", path);
try {
if (!config.S3_BUCKET) throw new Error("S3_BUCKET not set");
const info = await this.client(3000).headObject({
Bucket: config.S3_BUCKET,
Key: path,
});
return {
size: info.ContentLength,
lastModified: info.LastModified,
contentType: info.ContentType
? info.ContentType
: (lookup(path) as string),
};
} finally {
span.end();
}
}
/** @override */
async read(path: string): Promise<Readable> {
if (!config.S3_BUCKET) throw new Error("S3_BUCKET not set");
const command = new GetObjectCommand({
Bucket: config.S3_BUCKET,
Key: path,
});
const res = (await this.client(3000).send(command)).Body;
if (!res) {
throw new AnonymousError("file_not_found", {
httpStatus: 404,
object: path,
const span = trace.getTracer("ano-file").startSpan("s3.rreadm");
span.setAttribute("path", path);
try {
if (!config.S3_BUCKET) throw new Error("S3_BUCKET not set");
const command = new GetObjectCommand({
Bucket: config.S3_BUCKET,
Key: path,
});
const res = (await this.client(3000).send(command)).Body;
if (!res) {
throw new AnonymousError("file_not_found", {
httpStatus: 404,
object: path,
});
}
return res as Readable;
} finally {
span.end();
}
return res as Readable;
}
/** @override */
@@ -168,60 +200,72 @@ export default class S3Storage implements StorageBase {
file?: AnonymizedFile,
source?: SourceBase
): Promise<void> {
if (!config.S3_BUCKET) throw new Error("S3_BUCKET not set");
const params: PutObjectCommandInput = {
Bucket: config.S3_BUCKET,
Key: path,
Body: data,
ContentType: lookup(path).toString(),
};
if (source) {
params.Tagging = `source=${source.type}`;
const span = trace.getTracer("ano-file").startSpan("s3.rm");
span.setAttribute("path", path);
try {
if (!config.S3_BUCKET) throw new Error("S3_BUCKET not set");
const params: PutObjectCommandInput = {
Bucket: config.S3_BUCKET,
Key: path,
Body: data,
ContentType: lookup(path).toString(),
};
if (source) {
params.Tagging = `source=${source.type}`;
}
// 30s timeout
await this.client(30000).putObject(params);
return;
} finally {
span.end();
}
// 30s timeout
await this.client(30000).putObject(params);
return;
}
/** @override */
async listFiles(dir: string): Promise<Tree> {
if (!config.S3_BUCKET) throw new Error("S3_BUCKET not set");
if (dir && dir[dir.length - 1] != "/") dir = dir + "/";
const out: Tree = {};
let req: ListObjectsV2CommandOutput;
let nextContinuationToken: string | undefined;
do {
req = await this.client(30000).listObjectsV2({
Bucket: config.S3_BUCKET,
Prefix: dir,
MaxKeys: 250,
ContinuationToken: nextContinuationToken,
});
if (!req.Contents) return out;
nextContinuationToken = req.NextContinuationToken;
const span = trace.getTracer("ano-file").startSpan("s3.listFiles");
span.setAttribute("path", dir);
try {
if (!config.S3_BUCKET) throw new Error("S3_BUCKET not set");
if (dir && dir[dir.length - 1] != "/") dir = dir + "/";
const out: Tree = {};
let req: ListObjectsV2CommandOutput;
let nextContinuationToken: string | undefined;
do {
req = await this.client(30000).listObjectsV2({
Bucket: config.S3_BUCKET,
Prefix: dir,
MaxKeys: 250,
ContinuationToken: nextContinuationToken,
});
if (!req.Contents) return out;
nextContinuationToken = req.NextContinuationToken;
for (const f of req.Contents) {
if (!f.Key) continue;
f.Key = f.Key.replace(dir, "");
const paths = f.Key.split("/");
let current: Tree = out;
for (let i = 0; i < paths.length - 1; i++) {
let p = paths[i];
if (!p) continue;
if (!(current[p] as Tree)) {
current[p] = {} as Tree;
for (const f of req.Contents) {
if (!f.Key) continue;
f.Key = f.Key.replace(dir, "");
const paths = f.Key.split("/");
let current: Tree = out;
for (let i = 0; i < paths.length - 1; i++) {
let p = paths[i];
if (!p) continue;
if (!(current[p] as Tree)) {
current[p] = {} as Tree;
}
current = current[p] as Tree;
}
current = current[p] as Tree;
}
if (f.ETag) {
const fileInfo: TreeFile = { size: f.Size || 0, sha: f.ETag };
const fileName = paths[paths.length - 1];
if (fileName) current[fileName] = fileInfo;
if (f.ETag) {
const fileInfo: TreeFile = { size: f.Size || 0, sha: f.ETag };
const fileName = paths[paths.length - 1];
if (fileName) current[fileName] = fileInfo;
}
}
}
} while (req && req.Contents && req.IsTruncated);
return out;
} while (req && req.Contents && req.IsTruncated);
return out;
} finally {
span.end();
}
}
/** @override */
@@ -232,7 +276,8 @@ export default class S3Storage implements StorageBase {
source?: SourceBase
): Promise<void> {
let toS3: ArchiveStreamToS3;
const span = trace.getTracer("ano-file").startSpan("s3.extractZip");
span.setAttribute("path", p);
return new Promise((resolve, reject) => {
if (!config.S3_BUCKET) return reject("S3_BUCKET not set");
toS3 = new ArchiveStreamToS3({
@@ -253,11 +298,16 @@ export default class S3Storage implements StorageBase {
});
pipeline(data, toS3, (err) => {
if (err) {
span.recordException(err as Error);
return reject(err);
}
span.end();
resolve();
})
.on("finish", resolve)
.on("finish", () => {
span.end();
resolve();
})
.on("error", reject);
});
}
@@ -270,39 +320,45 @@ export default class S3Storage implements StorageBase {
fileTransformer?: (p: string) => Transform;
}
) {
if (!config.S3_BUCKET) throw new Error("S3_BUCKET not set");
const archive = archiver(opt?.format || "zip", {});
if (dir && dir[dir.length - 1] != "/") dir = dir + "/";
const span = trace.getTracer("ano-file").startSpan("s3.archive");
span.setAttribute("path", dir);
try {
if (!config.S3_BUCKET) throw new Error("S3_BUCKET not set");
const archive = archiver(opt?.format || "zip", {});
if (dir && dir[dir.length - 1] != "/") dir = dir + "/";
let req: ListObjectsV2CommandOutput;
let nextContinuationToken: string | undefined;
do {
req = await this.client(30000).listObjectsV2({
Bucket: config.S3_BUCKET,
Prefix: dir,
MaxKeys: 250,
ContinuationToken: nextContinuationToken,
});
nextContinuationToken = req.NextContinuationToken;
for (const f of req.Contents || []) {
if (!f.Key) continue;
const filename = basename(f.Key);
const prefix = dirname(f.Key.replace(dir, ""));
let rs = await this.read(f.Key);
if (opt?.fileTransformer) {
// apply transformation on the stream
rs = rs.pipe(opt.fileTransformer(f.Key));
}
archive.append(rs, {
name: filename,
prefix,
let req: ListObjectsV2CommandOutput;
let nextContinuationToken: string | undefined;
do {
req = await this.client(30000).listObjectsV2({
Bucket: config.S3_BUCKET,
Prefix: dir,
MaxKeys: 250,
ContinuationToken: nextContinuationToken,
});
}
} while (req && req.Contents?.length && req.IsTruncated);
archive.finalize();
return archive;
nextContinuationToken = req.NextContinuationToken;
for (const f of req.Contents || []) {
if (!f.Key) continue;
const filename = basename(f.Key);
const prefix = dirname(f.Key.replace(dir, ""));
let rs = await this.read(f.Key);
if (opt?.fileTransformer) {
// apply transformation on the stream
rs = rs.pipe(opt.fileTransformer(f.Key));
}
archive.append(rs, {
name: filename,
prefix,
});
}
} while (req && req.Contents?.length && req.IsTruncated);
archive.finalize();
return archive;
} finally {
span.end();
}
}
}

View File

@@ -14,7 +14,8 @@
"sourceMap": false,
"skipLibCheck": true,
"strict": true,
"esModuleInterop": false
"esModuleInterop": false,
"incremental": true
},
"include": ["src/**/*.ts", "index.ts", "cli.ts"],
"exclude": ["node_modules", ".vscode"]