fix: use pipeline instead of pipe

This commit is contained in:
tdurieux
2021-09-07 17:31:34 +02:00
parent 924990b3cb
commit 1fbd7be949
4 changed files with 42 additions and 48 deletions
+3 -6
View File
@@ -1,6 +1,7 @@
import * as path from "path"; import * as path from "path";
import * as express from "express"; import * as express from "express";
import * as stream from "stream"; import * as stream from "stream";
import { promisify } from "util";
import Repository from "./Repository"; import Repository from "./Repository";
import { Tree, TreeElement, TreeFile } from "./types"; import { Tree, TreeElement, TreeFile } from "./types";
import storage from "./storage"; import storage from "./storage";
@@ -167,13 +168,9 @@ export default class AnonymizedFile {
} }
async send(res: express.Response): Promise<void> { async send(res: express.Response): Promise<void> {
const pipeline = promisify(stream.pipeline);
try { try {
const s = await this.anonymizedContent(); await pipeline(await this.anonymizedContent(), res);
s.on("error", (err) => {
console.log(err);
res.status(500).send({ error: err.message });
});
s.pipe(res);
} catch (error) { } catch (error) {
handleError(error, res); handleError(error, res);
} }
+7 -5
View File
@@ -1,7 +1,9 @@
import { promisify } from "util";
import * as express from "express"; import * as express from "express";
import * as stream from "stream";
import config from "../../config"; import config from "../../config";
import * as db from "../database/database";
import { getRepo, handleError } from "./route-utils"; import { getRepo, handleError } from "./route-utils";
const router = express.Router(); const router = express.Router();
@@ -14,13 +16,14 @@ router.get(
const repo = await getRepo(req, res); const repo = await getRepo(req, res);
if (!repo) return; if (!repo) return;
const pipeline = promisify(stream.pipeline);
try { try {
res.attachment(`${repo.repoId}.zip`); res.attachment(`${repo.repoId}.zip`);
// ache the file for 6 hours // cache the file for 6 hours
res.header("Cache-Control", "max-age=21600000"); res.header("Cache-Control", "max-age=21600000");
await pipeline(repo.zip(), res)
repo.zip().pipe(res);
} catch (error) { } catch (error) {
handleError(error, res); handleError(error, res);
} }
@@ -61,7 +64,6 @@ router.get(
await repo.updateIfNeeded(); await repo.updateIfNeeded();
} }
let download = false; let download = false;
const conference = await repo.conference(); const conference = await repo.conference();
if (conference) { if (conference) {
+14 -15
View File
@@ -1,12 +1,14 @@
import { StorageBase, Tree } from "../types"; import { StorageBase, Tree } from "../types";
import config from "../../config";
import * as fs from "fs"; import * as fs from "fs";
import * as tar from "tar-fs"; import * as tar from "tar-fs";
import * as path from "path"; import * as path from "path";
import * as express from "express"; import * as express from "express";
import config from "../../config";
import * as stream from "stream"; import * as stream from "stream";
import * as gunzip from "gunzip-maybe"; import * as gunzip from "gunzip-maybe";
import * as archiver from "archiver"; import * as archiver from "archiver";
import { promisify } from "util";
export default class FileSystem implements StorageBase { export default class FileSystem implements StorageBase {
type = "FileSystem"; type = "FileSystem";
@@ -92,20 +94,17 @@ export default class FileSystem implements StorageBase {
/** @override */ /** @override */
async extractTar(p: string, data: stream.Readable): Promise<void> { async extractTar(p: string, data: stream.Readable): Promise<void> {
return new Promise((resolve, reject) => { const pipeline = promisify(stream.pipeline);
data return pipeline(
.pipe(gunzip()) data,
.pipe( gunzip(),
tar.extract(path.join(config.FOLDER, p), { tar.extract(path.join(config.FOLDER, p), {
map: (header) => { map: (header) => {
header.name = header.name.substr(header.name.indexOf("/") + 1); header.name = header.name.substr(header.name.indexOf("/") + 1);
return header; return header;
}, },
}) })
) );
.on("finish", resolve)
.on("error", reject);
});
} }
/** @override */ /** @override */
+18 -22
View File
@@ -2,6 +2,7 @@ import { StorageBase, Tree, TreeFile } from "../types";
import { S3 } from "aws-sdk"; import { S3 } from "aws-sdk";
import config from "../../config"; import config from "../../config";
import * as stream from "stream"; import * as stream from "stream";
import { promisify } from "util";
import { ArchiveStreamToS3 } from "archive-stream-to-s3"; import { ArchiveStreamToS3 } from "archive-stream-to-s3";
import * as express from "express"; import * as express from "express";
import * as mime from "mime-types"; import * as mime from "mime-types";
@@ -101,9 +102,10 @@ export default class S3Storage implements StorageBase {
res.set("Content-Length", headers["content-length"]); res.set("Content-Length", headers["content-length"]);
res.set("Content-Type", headers["content-type"]); res.set("Content-Type", headers["content-type"]);
} }
( stream.pipeline(
response.httpResponse.createUnbufferedStream() as stream.Readable response.httpResponse.createUnbufferedStream() as stream.Readable,
).pipe(res); res
);
}); });
s.send(); s.send();
@@ -167,28 +169,22 @@ export default class S3Storage implements StorageBase {
/** @override */ /** @override */
async extractTar(p: string, data: stream.Readable): Promise<void> { async extractTar(p: string, data: stream.Readable): Promise<void> {
return new Promise<void>((resolve, reject) => { const pipeline = promisify(stream.pipeline);
let toS3: ArchiveStreamToS3;
(ArchiveStreamToS3 as any).prototype.onEntry = function ( let toS3: ArchiveStreamToS3;
header: any,
stream: any,
next: any
) {
header.name = header.name.substr(header.name.indexOf("/") + 1);
originalArchiveStreamToS3Entry.call(toS3, header, stream, next);
};
toS3 = new ArchiveStreamToS3(config.S3_BUCKET, p, this.client); (ArchiveStreamToS3 as any).prototype.onEntry = function (
header: any,
stream: any,
next: any
) {
header.name = header.name.substr(header.name.indexOf("/") + 1);
originalArchiveStreamToS3Entry.call(toS3, header, stream, next);
};
toS3.on("finish", (result) => { toS3 = new ArchiveStreamToS3(config.S3_BUCKET, p, this.client);
resolve(result);
}); return pipeline(data, gunzip(), toS3);
toS3.on("error", (e) => {
reject(e);
});
data.pipe(gunzip()).pipe(toS3);
});
} }
/** @override */ /** @override */