Skip to content

Commit

Permalink
Stop using stream.pipe() (#1307)
Browse files Browse the repository at this point in the history
stream.pipe() should rarely be used in practice because:

1. it can cause memory leaks, and
2. it does not propogate errors between streams

PartialPipe.pipeline() explicitly notes this and provides mitigations for it.

See:

* https://nodejs.org/api/stream.html#readablepipedestination-options
* https://stackoverflow.com/questions/58875655/whats-the-difference-between-pipe-and-pipeline-on-streams
  • Loading branch information
alxndrsn authored Feb 10, 2025
1 parent 2a9e1bc commit 0943c61
Show file tree
Hide file tree
Showing 4 changed files with 15 additions and 11 deletions.
6 changes: 3 additions & 3 deletions lib/data/briefcase.js
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@
// including this file, may be copied, modified, propagated, or distributed
// except according to the terms contained in the LICENSE file.

const { Transform, pipeline } = require('stream');
const { Transform } = require('stream');
const hparser = require('htmlparser2');
const { identity, last } = require('ramda');
const csv = require('csv-stringify');
Expand Down Expand Up @@ -164,8 +164,8 @@ const processRow = (xml, instanceId, fields, header, selectValues) => new Promis
}
}, { xmlMode: true, decodeEntities: true });

if (typeof xml.pipe === 'function') {
pipeline(xml, parser, rejectIfError(reject));
if (xml instanceof PartialPipe) {
xml.with(parser).pipeline(rejectIfError(reject));
} else {
parser.write(xml);
parser.end();
Expand Down
7 changes: 4 additions & 3 deletions lib/data/client-audits.js
Original file line number Diff line number Diff line change
Expand Up @@ -7,10 +7,11 @@
// including this file, may be copied, modified, propagated, or distributed
// except according to the terms contained in the LICENSE file.

const { Transform, pipeline } = require('stream');
const { Transform } = require('stream');
const parse = require('csv-parse');
const csv = require('csv-stringify');
const sanitize = require('sanitize-filename');
const { PartialPipe } = require('../util/stream');
const { zipPart } = require('../util/zip');

const headers = [ 'event', 'node', 'start', 'end', 'latitude', 'longitude', 'accuracy', 'old-value', 'new-value' ];
Expand Down Expand Up @@ -54,8 +55,8 @@ const parseClientAudits = (input) => {

return new Promise((pass, fail) => {
// feed either the stream or the buffer into the parser now.
if (input.pipe != null) {
pipeline(input, parser, (err) => { if (err != null) fail(err); });
if (input instanceof PartialPipe) {
input.with(parser).pipeline((err) => { if (err != null) fail(err); });
} else {
parser.write(input);
parser.end();
Expand Down
8 changes: 5 additions & 3 deletions lib/task/fs.js
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ const archiver = require('archiver');
const yauzl = require('yauzl');
const Problem = require('../util/problem');
const { generateLocalCipherer, getLocalDecipherer } = require('../util/crypto');
const { PartialPipe } = require('../util/stream');


// given a directory containing files, a path to a tmpfile, and keyinfo data,
Expand All @@ -31,14 +32,15 @@ const { generateLocalCipherer, getLocalDecipherer } = require('../util/crypto');
const encryptToArchive = (directory, tmpFilePath, keys) => {
const outStream = createWriteStream(tmpFilePath);
const zipStream = archiver('zip', { zlib: { level: 9 } });
zipStream.pipe(outStream);

// create a cipher-generator for use below.
const [ localkey, cipherer ] = generateLocalCipherer(keys);
const local = { key: localkey, ivs: {} };

// call up all files in the directory.
return promisify(readdir)(directory).then((files) => new Promise((resolve, reject) => {
PartialPipe.of(zipStream, outStream).pipeline(reject);

// stream each file into the zip, encrypting on the way in. clean up each
// plaintext file as soon as we're done with them.
// TODO: copypasted for now to lib/resources/backup
Expand All @@ -48,7 +50,7 @@ const encryptToArchive = (directory, tmpFilePath, keys) => {
local.ivs[basename(file)] = iv.toString('base64');

const readStream = createReadStream(filePath);
zipStream.append(readStream.pipe(cipher), { name: file });
zipStream.append(PartialPipe.of(readStream, cipher).pipeline(reject), { name: file });
readStream.on('end', () => unlinkSync(filePath)); // sync to ensure completion.
}

Expand Down Expand Up @@ -138,7 +140,7 @@ const decryptFromArchive = (archivePath, directory, passphrase = '') =>
if (completed === entries.length) resolve();
});
decipher.on('error', () => reject(Problem.user.undecryptable()));
inStream.pipe(decipher).pipe(outStream);
PartialPipe.of(inStream, decipher, outStream).pipeline(reject);
});
});
}, reject);
Expand Down
5 changes: 3 additions & 2 deletions lib/util/crypto.js
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ const digest = require('digest-stream');
const { createHash, randomBytes, generateKeyPair, pbkdf2, createPrivateKey, createPublicKey, createCipheriv, createDecipheriv, publicEncrypt, privateDecrypt } = require('crypto');
const { RSA_NO_PADDING } = require('crypto').constants;
const { Transform } = require('stream');
const { PartialPipe } = require('./stream');
const { unpadPkcs1OaepMgf1Sha256 } = require('./quarantine/oaep');
const { unpadPkcs7 } = require('./quarantine/pkcs7');
const { promisify } = require('util');
Expand Down Expand Up @@ -261,9 +262,9 @@ const streamSubmissionCleartext = (key, iv, input) => {
});

if (typeof input.pipe === 'function') {
return input.pipe(decipher).pipe(transform);
return PartialPipe.of(input, decipher, transform);
} else {
const result = decipher.pipe(transform);
const result = PartialPipe.of(decipher, transform);
decipher.write(input);
decipher.end();
return result;
Expand Down

0 comments on commit 0943c61

Please sign in to comment.