From 1fbd7be94909dcf1e72194837286d12faac2d6ee Mon Sep 17 00:00:00 2001 From: tdurieux Date: Tue, 7 Sep 2021 17:31:34 +0200 Subject: [PATCH] fix: use pipeline instead of pipe --- src/AnonymizedFile.ts | 9 +++----- src/routes/repository-public.ts | 12 +++++----- src/storage/FileSystem.ts | 29 ++++++++++++------------ src/storage/S3.ts | 40 +++++++++++++++------------------ 4 files changed, 42 insertions(+), 48 deletions(-) diff --git a/src/AnonymizedFile.ts b/src/AnonymizedFile.ts index aeab744..aa9cdc2 100644 --- a/src/AnonymizedFile.ts +++ b/src/AnonymizedFile.ts @@ -1,6 +1,7 @@ import * as path from "path"; import * as express from "express"; import * as stream from "stream"; +import { promisify } from "util"; import Repository from "./Repository"; import { Tree, TreeElement, TreeFile } from "./types"; import storage from "./storage"; @@ -167,13 +168,9 @@ export default class AnonymizedFile { } async send(res: express.Response): Promise { + const pipeline = promisify(stream.pipeline); try { - const s = await this.anonymizedContent(); - s.on("error", (err) => { - console.log(err); - res.status(500).send({ error: err.message }); - }); - s.pipe(res); + await pipeline(await this.anonymizedContent(), res); } catch (error) { handleError(error, res); } diff --git a/src/routes/repository-public.ts b/src/routes/repository-public.ts index 44d5431..be65bf4 100644 --- a/src/routes/repository-public.ts +++ b/src/routes/repository-public.ts @@ -1,7 +1,9 @@ +import { promisify } from "util"; import * as express from "express"; +import * as stream from "stream"; import config from "../../config"; -import * as db from "../database/database"; + import { getRepo, handleError } from "./route-utils"; const router = express.Router(); @@ -14,13 +16,14 @@ router.get( const repo = await getRepo(req, res); if (!repo) return; + const pipeline = promisify(stream.pipeline); + try { res.attachment(`${repo.repoId}.zip`); - // ache the file for 6 hours + // cache the file for 6 hours res.header("Cache-Control", "max-age=21600000"); - - repo.zip().pipe(res); + await pipeline(repo.zip(), res) } catch (error) { handleError(error, res); } @@ -61,7 +64,6 @@ router.get( await repo.updateIfNeeded(); } - let download = false; const conference = await repo.conference(); if (conference) { diff --git a/src/storage/FileSystem.ts b/src/storage/FileSystem.ts index 83a86f5..24a008f 100644 --- a/src/storage/FileSystem.ts +++ b/src/storage/FileSystem.ts @@ -1,12 +1,14 @@ import { StorageBase, Tree } from "../types"; +import config from "../../config"; + import * as fs from "fs"; import * as tar from "tar-fs"; import * as path from "path"; import * as express from "express"; -import config from "../../config"; import * as stream from "stream"; import * as gunzip from "gunzip-maybe"; import * as archiver from "archiver"; +import { promisify } from "util"; export default class FileSystem implements StorageBase { type = "FileSystem"; @@ -92,20 +94,17 @@ export default class FileSystem implements StorageBase { /** @override */ async extractTar(p: string, data: stream.Readable): Promise { - return new Promise((resolve, reject) => { - data - .pipe(gunzip()) - .pipe( - tar.extract(path.join(config.FOLDER, p), { - map: (header) => { - header.name = header.name.substr(header.name.indexOf("/") + 1); - return header; - }, - }) - ) - .on("finish", resolve) - .on("error", reject); - }); + const pipeline = promisify(stream.pipeline); + return pipeline( + data, + gunzip(), + tar.extract(path.join(config.FOLDER, p), { + map: (header) => { + header.name = header.name.substr(header.name.indexOf("/") + 1); + return header; + }, + }) + ); } /** @override */ diff --git a/src/storage/S3.ts b/src/storage/S3.ts index e471b75..dedcd5a 100644 --- a/src/storage/S3.ts +++ b/src/storage/S3.ts @@ -2,6 +2,7 @@ import { StorageBase, Tree, TreeFile } from "../types"; import { S3 } from "aws-sdk"; import config from "../../config"; import * as stream from "stream"; +import { promisify } from "util"; import { ArchiveStreamToS3 } from "archive-stream-to-s3"; import * as express from "express"; import * as mime from "mime-types"; @@ -101,9 +102,10 @@ export default class S3Storage implements StorageBase { res.set("Content-Length", headers["content-length"]); res.set("Content-Type", headers["content-type"]); } - ( - response.httpResponse.createUnbufferedStream() as stream.Readable - ).pipe(res); + stream.pipeline( + response.httpResponse.createUnbufferedStream() as stream.Readable, + res + ); }); s.send(); @@ -167,28 +169,22 @@ export default class S3Storage implements StorageBase { /** @override */ async extractTar(p: string, data: stream.Readable): Promise { - return new Promise((resolve, reject) => { - let toS3: ArchiveStreamToS3; + const pipeline = promisify(stream.pipeline); - (ArchiveStreamToS3 as any).prototype.onEntry = function ( - header: any, - stream: any, - next: any - ) { - header.name = header.name.substr(header.name.indexOf("/") + 1); - originalArchiveStreamToS3Entry.call(toS3, header, stream, next); - }; + let toS3: ArchiveStreamToS3; - toS3 = new ArchiveStreamToS3(config.S3_BUCKET, p, this.client); + (ArchiveStreamToS3 as any).prototype.onEntry = function ( + header: any, + stream: any, + next: any + ) { + header.name = header.name.substr(header.name.indexOf("/") + 1); + originalArchiveStreamToS3Entry.call(toS3, header, stream, next); + }; - toS3.on("finish", (result) => { - resolve(result); - }); - toS3.on("error", (e) => { - reject(e); - }); - data.pipe(gunzip()).pipe(toS3); - }); + toS3 = new ArchiveStreamToS3(config.S3_BUCKET, p, this.client); + + return pipeline(data, gunzip(), toS3); } /** @override */