From 54b36e401d2b72d95e5f1dbbc787f6beed639347 Mon Sep 17 00:00:00 2001 From: Robert Nagy Date: Sun, 15 Mar 2020 15:20:46 +0100 Subject: [PATCH] fs: reimplement read and write streams using stream.construct Refs: #23133 PR-URL: https://github.com/nodejs/node/pull/29656 Reviewed-By: Matteo Collina Reviewed-By: Anna Henningsen Reviewed-By: Denys Otrishko --- doc/api/stream.md | 8 +- lib/internal/fs/streams.js | 230 ++++++++---------- test/parallel/test-file-write-stream.js | 2 +- .../test-fs-read-stream-patch-open.js | 8 +- test/parallel/test-fs-stream-construct.js | 209 ++++++++++++++++ .../test-fs-write-stream-patch-open.js | 8 +- 6 files changed, 321 insertions(+), 144 deletions(-) create mode 100644 test/parallel/test-fs-stream-construct.js diff --git a/doc/api/stream.md b/doc/api/stream.md index e8a3050291e9c3..1c9a40bc9fca70 100644 --- a/doc/api/stream.md +++ b/doc/api/stream.md @@ -2242,10 +2242,10 @@ The `_construct()` method MUST NOT be called directly. It may be implemented by child classes, and if so, will be called by the internal `Readable` class methods only. -This optional function will be called by the stream constructor, -delaying any `_read` and `_destroy` calls until `callback` is called. This is -useful to initialize state or asynchronously initialize resources before the -stream can be used. +This optional function will be scheduled in the next tick by the stream +constructor, delaying any `_read` and `_destroy` calls until `callback` is +called. This is useful to initialize state or asynchronously initialize +resources before the stream can be used. ```js const { Readable } = require('stream'); diff --git a/lib/internal/fs/streams.js b/lib/internal/fs/streams.js index 9ce21721faad80..9e6050139dc79a 100644 --- a/lib/internal/fs/streams.js +++ b/lib/internal/fs/streams.js @@ -10,11 +10,11 @@ const { const { ERR_INVALID_ARG_TYPE, - ERR_OUT_OF_RANGE, - ERR_STREAM_DESTROYED + ERR_OUT_OF_RANGE } = require('internal/errors').codes; const { deprecate } = require('internal/util'); const { validateInteger } = require('internal/validators'); +const { errorOrDestroy } = require('internal/streams/destroy'); const fs = require('fs'); const { Buffer } = require('buffer'); const { @@ -49,6 +49,57 @@ function roundUpToMultipleOf8(n) { return (n + 7) & ~7; // Align to 8 byte boundary. } +function _construct(callback) { + const stream = this; + if (typeof stream.fd === 'number') { + callback(); + return; + } + + if (stream.open !== openWriteFs && stream.open !== openReadFs) { + // Backwards compat for monkey patching open(). + const orgEmit = stream.emit; + stream.emit = function(...args) { + if (args[0] === 'open') { + this.emit = orgEmit; + callback(); + orgEmit.apply(this, args); + } else if (args[0] === 'error') { + this.emit = orgEmit; + callback(args[1]); + } else { + orgEmit.apply(this, args); + } + }; + stream.open(); + } else { + stream[kFs].open(stream.path, stream.flags, stream.mode, (er, fd) => { + if (er) { + callback(er); + } else { + stream.fd = fd; + callback(); + stream.emit('open', stream.fd); + stream.emit('ready'); + } + }); + } +} + +function close(stream, err, cb) { + if (!stream.fd) { + // TODO(ronag) + // stream.closed = true; + cb(err); + } else { + stream[kFs].close(stream.fd, (er) => { + stream.closed = true; + cb(er || err); + }); + stream.fd = null; + } +} + function ReadStream(path, options) { if (!(this instanceof ReadStream)) return new ReadStream(path, options); @@ -79,7 +130,8 @@ function ReadStream(path, options) { this[kFs].close); } - Readable.call(this, options); + options.autoDestroy = options.autoClose === undefined ? + true : options.autoClose; // Path will be ignored when fd is specified, so it can be falsy this.path = toPathIfFileURL(path); @@ -89,7 +141,6 @@ function ReadStream(path, options) { this.start = options.start; this.end = options.end; - this.autoClose = options.autoClose === undefined ? true : options.autoClose; this.pos = undefined; this.bytesRead = 0; this.closed = false; @@ -115,56 +166,28 @@ function ReadStream(path, options) { } } - if (typeof this.fd !== 'number') - _openReadFs(this); - - this.on('end', function() { - if (this.autoClose) { - this.destroy(); - } - }); + Readable.call(this, options); } ObjectSetPrototypeOf(ReadStream.prototype, Readable.prototype); ObjectSetPrototypeOf(ReadStream, Readable); +ObjectDefineProperty(ReadStream.prototype, 'autoClose', { + get() { + return this._readableState.autoDestroy; + }, + set(val) { + this._readableState.autoDestroy = val; + } +}); + const openReadFs = deprecate(function() { - _openReadFs(this); + // Noop. }, 'ReadStream.prototype.open() is deprecated', 'DEP0135'); ReadStream.prototype.open = openReadFs; -function _openReadFs(stream) { - // Backwards compat for overriden open. - if (stream.open !== openReadFs) { - stream.open(); - return; - } - - stream[kFs].open(stream.path, stream.flags, stream.mode, (er, fd) => { - if (er) { - if (stream.autoClose) { - stream.destroy(); - } - stream.emit('error', er); - return; - } - - stream.fd = fd; - stream.emit('open', fd); - stream.emit('ready'); - // Start the flow of data. - stream.read(); - }); -} +ReadStream.prototype._construct = _construct; ReadStream.prototype._read = function(n) { - if (typeof this.fd !== 'number') { - return this.once('open', function() { - this._read(n); - }); - } - - if (this.destroyed) return; - if (!pool || pool.length - pool.used < kMinPoolSpace) { // Discard the old pool. allocNewPool(this.readableHighWaterMark); @@ -189,17 +212,14 @@ ReadStream.prototype._read = function(n) { // the actual read. this[kIsPerformingIO] = true; - this[kFs].read( - this.fd, pool, pool.used, toRead, this.pos, (er, bytesRead) => { + this[kFs] + .read(this.fd, pool, pool.used, toRead, this.pos, (er, bytesRead) => { this[kIsPerformingIO] = false; // Tell ._destroy() that it's safe to close the fd now. if (this.destroyed) return this.emit(kIoDone, er); if (er) { - if (this.autoClose) { - this.destroy(); - } - this.emit('error', er); + errorOrDestroy(this, er); } else { let b = null; // Now that we know how much data we have actually read, re-wind the @@ -235,28 +255,13 @@ ReadStream.prototype._read = function(n) { }; ReadStream.prototype._destroy = function(err, cb) { - if (typeof this.fd !== 'number') { - this.once('open', closeFsStream.bind(null, this, cb, err)); - return; - } - if (this[kIsPerformingIO]) { - this.once(kIoDone, (er) => closeFsStream(this, cb, err || er)); - return; + this.once(kIoDone, (er) => close(this, err || er, cb)); + } else { + close(this, err, cb); } - - closeFsStream(this, cb, err); }; -function closeFsStream(stream, cb, err) { - stream[kFs].close(stream.fd, (er) => { - stream.closed = true; - cb(er || err); - }); - - stream.fd = null; -} - ReadStream.prototype.close = function(cb) { if (typeof cb === 'function') finished(this, cb); this.destroy(); @@ -276,11 +281,6 @@ function WriteStream(path, options) { // Only buffers are supported. options.decodeStrings = true; - if (options.autoDestroy === undefined) { - options.autoDestroy = options.autoClose === undefined ? - true : (options.autoClose || false); - } - this[kFs] = options.fs || fs; if (typeof this[kFs].open !== 'function') { throw new ERR_INVALID_ARG_TYPE('options.fs.open', 'function', @@ -315,7 +315,8 @@ function WriteStream(path, options) { this._writev = null; } - Writable.call(this, options); + options.autoDestroy = options.autoClose === undefined ? + true : options.autoClose; // Path will be ignored when fd is specified, so it can be falsy this.path = toPathIfFileURL(path); @@ -324,7 +325,6 @@ function WriteStream(path, options) { this.mode = options.mode === undefined ? 0o666 : options.mode; this.start = options.start; - this.autoClose = options.autoDestroy; this.pos = undefined; this.bytesWritten = 0; this.closed = false; @@ -336,67 +336,36 @@ function WriteStream(path, options) { this.pos = this.start; } + Writable.call(this, options); + if (options.encoding) this.setDefaultEncoding(options.encoding); - - if (typeof this.fd !== 'number') - _openWriteFs(this); } ObjectSetPrototypeOf(WriteStream.prototype, Writable.prototype); ObjectSetPrototypeOf(WriteStream, Writable); -WriteStream.prototype._final = function(callback) { - if (typeof this.fd !== 'number') { - return this.once('open', function() { - this._final(callback); - }); +ObjectDefineProperty(WriteStream.prototype, 'autoClose', { + get() { + return this._writableState.autoDestroy; + }, + set(val) { + this._writableState.autoDestroy = val; } - - callback(); -}; +}); const openWriteFs = deprecate(function() { - _openWriteFs(this); + // Noop. }, 'WriteStream.prototype.open() is deprecated', 'DEP0135'); WriteStream.prototype.open = openWriteFs; -function _openWriteFs(stream) { - // Backwards compat for overriden open. - if (stream.open !== openWriteFs) { - stream.open(); - return; - } - - stream[kFs].open(stream.path, stream.flags, stream.mode, (er, fd) => { - if (er) { - if (stream.autoClose) { - stream.destroy(); - } - stream.emit('error', er); - return; - } - - stream.fd = fd; - stream.emit('open', fd); - stream.emit('ready'); - }); -} - +WriteStream.prototype._construct = _construct; WriteStream.prototype._write = function(data, encoding, cb) { - if (typeof this.fd !== 'number') { - return this.once('open', function() { - this._write(data, encoding, cb); - }); - } - - if (this.destroyed) return cb(new ERR_STREAM_DESTROYED('write')); - this[kIsPerformingIO] = true; this[kFs].write(this.fd, data, 0, data.length, this.pos, (er, bytes) => { this[kIsPerformingIO] = false; - // Tell ._destroy() that it's safe to close the fd now. if (this.destroyed) { + // Tell ._destroy() that it's safe to close the fd now. cb(er); return this.emit(kIoDone, er); } @@ -404,6 +373,7 @@ WriteStream.prototype._write = function(data, encoding, cb) { if (er) { return cb(er); } + this.bytesWritten += bytes; cb(); }); @@ -412,16 +382,7 @@ WriteStream.prototype._write = function(data, encoding, cb) { this.pos += data.length; }; - WriteStream.prototype._writev = function(data, cb) { - if (typeof this.fd !== 'number') { - return this.once('open', function() { - this._writev(data, cb); - }); - } - - if (this.destroyed) return cb(new ERR_STREAM_DESTROYED('write')); - const len = data.length; const chunks = new Array(len); let size = 0; @@ -436,18 +397,16 @@ WriteStream.prototype._writev = function(data, cb) { this[kIsPerformingIO] = true; this[kFs].writev(this.fd, chunks, this.pos, (er, bytes) => { this[kIsPerformingIO] = false; - // Tell ._destroy() that it's safe to close the fd now. if (this.destroyed) { + // Tell ._destroy() that it's safe to close the fd now. cb(er); return this.emit(kIoDone, er); } if (er) { - if (this.autoClose) { - this.destroy(er); - } return cb(er); } + this.bytesWritten += bytes; cb(); }); @@ -456,8 +415,13 @@ WriteStream.prototype._writev = function(data, cb) { this.pos += size; }; - -WriteStream.prototype._destroy = ReadStream.prototype._destroy; +WriteStream.prototype._destroy = function(err, cb) { + if (this[kIsPerformingIO]) { + this.once(kIoDone, (er) => close(this, err || er, cb)); + } else { + close(this, err, cb); + } +}; WriteStream.prototype.close = function(cb) { if (cb) { if (this.closed) { diff --git a/test/parallel/test-file-write-stream.js b/test/parallel/test-file-write-stream.js index 6cab71a4353ff2..09f2063e5623f3 100644 --- a/test/parallel/test-file-write-stream.js +++ b/test/parallel/test-file-write-stream.js @@ -62,12 +62,12 @@ file assert.strictEqual(file.bytesWritten, EXPECTED.length * 2); callbacks.close++; - console.error('write after end should not be allowed'); file.write('should not work anymore', common.expectsError({ code: 'ERR_STREAM_WRITE_AFTER_END', name: 'Error', message: 'write after end' })); + file.on('error', common.mustNotCall()); fs.unlinkSync(fn); }); diff --git a/test/parallel/test-fs-read-stream-patch-open.js b/test/parallel/test-fs-read-stream-patch-open.js index c658105132b155..6fa97737b187a3 100644 --- a/test/parallel/test-fs-read-stream-patch-open.js +++ b/test/parallel/test-fs-read-stream-patch-open.js @@ -10,6 +10,8 @@ const s = fs.createReadStream('asd') .on('error', () => {}); s.open(); -// Allow overriding open(). -fs.ReadStream.prototype.open = common.mustCall(); -fs.createReadStream('asd'); +process.nextTick(() => { + // Allow overriding open(). + fs.ReadStream.prototype.open = common.mustCall(); + fs.createReadStream('asd'); +}); diff --git a/test/parallel/test-fs-stream-construct.js b/test/parallel/test-fs-stream-construct.js new file mode 100644 index 00000000000000..4e6bf0285fc91e --- /dev/null +++ b/test/parallel/test-fs-stream-construct.js @@ -0,0 +1,209 @@ +'use strict'; + +const common = require('../common'); +const fs = require('fs'); +const assert = require('assert'); +const fixtures = require('../common/fixtures'); + +const tmpdir = require('../common/tmpdir'); +tmpdir.refresh(); + +const examplePath = fixtures.path('x.txt'); + +{ + // Compat with old node. + + function ReadStream(...args) { + fs.ReadStream.call(this, ...args); + } + Object.setPrototypeOf(ReadStream.prototype, fs.ReadStream.prototype); + Object.setPrototypeOf(ReadStream, fs.ReadStream); + + ReadStream.prototype.open = common.mustCall(function() { + fs.open(this.path, this.flags, this.mode, (er, fd) => { + if (er) { + if (this.autoClose) { + this.destroy(); + } + this.emit('error', er); + return; + } + + this.fd = fd; + this.emit('open', fd); + this.emit('ready'); + }); + }); + + let readyCalled = false; + let ticked = false; + const r = new ReadStream(examplePath) + .on('ready', common.mustCall(() => { + readyCalled = true; + // Make sure 'ready' is emitted in same tick as 'open'. + assert.strictEqual(ticked, false); + })) + .on('error', common.mustNotCall()) + .on('open', common.mustCall((fd) => { + process.nextTick(() => { + ticked = true; + r.destroy(); + }); + assert.strictEqual(readyCalled, false); + assert.strictEqual(fd, r.fd); + })); +} + +{ + // Compat with old node. + + function WriteStream(...args) { + fs.WriteStream.call(this, ...args); + } + Object.setPrototypeOf(WriteStream.prototype, fs.WriteStream.prototype); + Object.setPrototypeOf(WriteStream, fs.WriteStream); + + WriteStream.prototype.open = common.mustCall(function() { + fs.open(this.path, this.flags, this.mode, (er, fd) => { + if (er) { + if (this.autoClose) { + this.destroy(); + } + this.emit('error', er); + return; + } + + this.fd = fd; + this.emit('open', fd); + this.emit('ready'); + }); + }); + + let readyCalled = false; + let ticked = false; + const w = new WriteStream(`${tmpdir.path}/dummy`) + .on('ready', common.mustCall(() => { + readyCalled = true; + // Make sure 'ready' is emitted in same tick as 'open'. + assert.strictEqual(ticked, false); + })) + .on('error', common.mustNotCall()) + .on('open', common.mustCall((fd) => { + process.nextTick(() => { + ticked = true; + w.destroy(); + }); + assert.strictEqual(readyCalled, false); + assert.strictEqual(fd, w.fd); + })); +} + +{ + // Compat with graceful-fs. + + function ReadStream(...args) { + fs.ReadStream.call(this, ...args); + } + Object.setPrototypeOf(ReadStream.prototype, fs.ReadStream.prototype); + Object.setPrototypeOf(ReadStream, fs.ReadStream); + + ReadStream.prototype.open = common.mustCall(function ReadStream$open() { + const that = this; + fs.open(that.path, that.flags, that.mode, (err, fd) => { + if (err) { + if (that.autoClose) + that.destroy(); + + that.emit('error', err); + } else { + that.fd = fd; + that.emit('open', fd); + that.read(); + } + }); + }); + + const r = new ReadStream(examplePath) + .on('open', common.mustCall((fd) => { + assert.strictEqual(fd, r.fd); + r.destroy(); + })); +} + +{ + // Compat with graceful-fs. + + function WriteStream(...args) { + fs.WriteStream.call(this, ...args); + } + Object.setPrototypeOf(WriteStream.prototype, fs.WriteStream.prototype); + Object.setPrototypeOf(WriteStream, fs.WriteStream); + + WriteStream.prototype.open = common.mustCall(function WriteStream$open() { + const that = this; + fs.open(that.path, that.flags, that.mode, function(err, fd) { + if (err) { + that.destroy(); + that.emit('error', err); + } else { + that.fd = fd; + that.emit('open', fd); + } + }); + }); + + const w = new WriteStream(`${tmpdir.path}/dummy2`) + .on('open', common.mustCall((fd) => { + assert.strictEqual(fd, w.fd); + w.destroy(); + })); +} + +{ + // Compat error. + + function ReadStream(...args) { + fs.ReadStream.call(this, ...args); + } + Object.setPrototypeOf(ReadStream.prototype, fs.ReadStream.prototype); + Object.setPrototypeOf(ReadStream, fs.ReadStream); + + ReadStream.prototype.open = common.mustCall(function ReadStream$open() { + const that = this; + fs.open(that.path, that.flags, that.mode, (err, fd) => { + that.emit('error', err); + }); + }); + + const r = new ReadStream('/doesnotexist', { emitClose: true }) + .on('error', common.mustCall((err) => { + assert.strictEqual(err.code, 'ENOENT'); + assert.strictEqual(r.destroyed, true); + r.on('close', common.mustCall()); + })); +} + +{ + // Compat error. + + function WriteStream(...args) { + fs.WriteStream.call(this, ...args); + } + Object.setPrototypeOf(WriteStream.prototype, fs.WriteStream.prototype); + Object.setPrototypeOf(WriteStream, fs.WriteStream); + + WriteStream.prototype.open = common.mustCall(function WriteStream$open() { + const that = this; + fs.open(that.path, that.flags, that.mode, (err, fd) => { + that.emit('error', err); + }); + }); + + const w = new WriteStream(`${tmpdir.path}/dummy2`, + { flags: 'wx+', emitClose: true }) + .on('error', common.mustCall((err) => { + assert.strictEqual(err.code, 'EEXIST'); + w.destroy(); + w.on('close', common.mustCall()); + })); +} diff --git a/test/parallel/test-fs-write-stream-patch-open.js b/test/parallel/test-fs-write-stream-patch-open.js index ac683785cbfec7..9e7bb06af528a5 100644 --- a/test/parallel/test-fs-write-stream-patch-open.js +++ b/test/parallel/test-fs-write-stream-patch-open.js @@ -29,6 +29,8 @@ common.expectWarning( const s = fs.createWriteStream(`${tmpdir.path}/out`); s.open(); -// Allow overriding open(). -fs.WriteStream.prototype.open = common.mustCall(); -fs.createWriteStream('asd'); +process.nextTick(() => { + // Allow overriding open(). + fs.WriteStream.prototype.open = common.mustCall(); + fs.createWriteStream('asd'); +});