Skip to content

Commit

Permalink
fs: reimplement read and write streams using stream.construct
Browse files Browse the repository at this point in the history
Refs: #23133

PR-URL: #29656
Reviewed-By: Matteo Collina <matteo.collina@gmail.com>
Reviewed-By: Anna Henningsen <anna@addaleax.net>
Reviewed-By: Denys Otrishko <shishugi@gmail.com>
  • Loading branch information
ronag committed May 27, 2020
1 parent fb8cc72 commit 54b36e4
Show file tree
Hide file tree
Showing 6 changed files with 321 additions and 144 deletions.
8 changes: 4 additions & 4 deletions doc/api/stream.md
Original file line number Diff line number Diff line change
Expand Up @@ -2242,10 +2242,10 @@ The `_construct()` method MUST NOT be called directly. It may be implemented
by child classes, and if so, will be called by the internal `Readable`
class methods only.

This optional function will be called by the stream constructor,
delaying any `_read` and `_destroy` calls until `callback` is called. This is
useful to initialize state or asynchronously initialize resources before the
stream can be used.
This optional function will be scheduled in the next tick by the stream
constructor, delaying any `_read` and `_destroy` calls until `callback` is
called. This is useful to initialize state or asynchronously initialize
resources before the stream can be used.

```js
const { Readable } = require('stream');
Expand Down
230 changes: 97 additions & 133 deletions lib/internal/fs/streams.js
Original file line number Diff line number Diff line change
Expand Up @@ -10,11 +10,11 @@ const {

const {
ERR_INVALID_ARG_TYPE,
ERR_OUT_OF_RANGE,
ERR_STREAM_DESTROYED
ERR_OUT_OF_RANGE
} = require('internal/errors').codes;
const { deprecate } = require('internal/util');
const { validateInteger } = require('internal/validators');
const { errorOrDestroy } = require('internal/streams/destroy');
const fs = require('fs');
const { Buffer } = require('buffer');
const {
Expand Down Expand Up @@ -49,6 +49,57 @@ function roundUpToMultipleOf8(n) {
return (n + 7) & ~7; // Align to 8 byte boundary.
}

function _construct(callback) {
const stream = this;
if (typeof stream.fd === 'number') {
callback();
return;
}

if (stream.open !== openWriteFs && stream.open !== openReadFs) {
// Backwards compat for monkey patching open().
const orgEmit = stream.emit;
stream.emit = function(...args) {
if (args[0] === 'open') {
this.emit = orgEmit;
callback();
orgEmit.apply(this, args);
} else if (args[0] === 'error') {
this.emit = orgEmit;
callback(args[1]);
} else {
orgEmit.apply(this, args);
}
};
stream.open();
} else {
stream[kFs].open(stream.path, stream.flags, stream.mode, (er, fd) => {
if (er) {
callback(er);
} else {
stream.fd = fd;
callback();
stream.emit('open', stream.fd);
stream.emit('ready');
}
});
}
}

function close(stream, err, cb) {
if (!stream.fd) {
// TODO(ronag)
// stream.closed = true;
cb(err);
} else {
stream[kFs].close(stream.fd, (er) => {
stream.closed = true;
cb(er || err);
});
stream.fd = null;
}
}

function ReadStream(path, options) {
if (!(this instanceof ReadStream))
return new ReadStream(path, options);
Expand Down Expand Up @@ -79,7 +130,8 @@ function ReadStream(path, options) {
this[kFs].close);
}

Readable.call(this, options);
options.autoDestroy = options.autoClose === undefined ?
true : options.autoClose;

// Path will be ignored when fd is specified, so it can be falsy
this.path = toPathIfFileURL(path);
Expand All @@ -89,7 +141,6 @@ function ReadStream(path, options) {

this.start = options.start;
this.end = options.end;
this.autoClose = options.autoClose === undefined ? true : options.autoClose;
this.pos = undefined;
this.bytesRead = 0;
this.closed = false;
Expand All @@ -115,56 +166,28 @@ function ReadStream(path, options) {
}
}

if (typeof this.fd !== 'number')
_openReadFs(this);

this.on('end', function() {
if (this.autoClose) {
this.destroy();
}
});
Readable.call(this, options);
}
ObjectSetPrototypeOf(ReadStream.prototype, Readable.prototype);
ObjectSetPrototypeOf(ReadStream, Readable);

ObjectDefineProperty(ReadStream.prototype, 'autoClose', {
get() {
return this._readableState.autoDestroy;
},
set(val) {
this._readableState.autoDestroy = val;
}
});

const openReadFs = deprecate(function() {
_openReadFs(this);
// Noop.
}, 'ReadStream.prototype.open() is deprecated', 'DEP0135');
ReadStream.prototype.open = openReadFs;

function _openReadFs(stream) {
// Backwards compat for overriden open.
if (stream.open !== openReadFs) {
stream.open();
return;
}

stream[kFs].open(stream.path, stream.flags, stream.mode, (er, fd) => {
if (er) {
if (stream.autoClose) {
stream.destroy();
}
stream.emit('error', er);
return;
}

stream.fd = fd;
stream.emit('open', fd);
stream.emit('ready');
// Start the flow of data.
stream.read();
});
}
ReadStream.prototype._construct = _construct;

ReadStream.prototype._read = function(n) {
if (typeof this.fd !== 'number') {
return this.once('open', function() {
this._read(n);
});
}

if (this.destroyed) return;

if (!pool || pool.length - pool.used < kMinPoolSpace) {
// Discard the old pool.
allocNewPool(this.readableHighWaterMark);
Expand All @@ -189,17 +212,14 @@ ReadStream.prototype._read = function(n) {

// the actual read.
this[kIsPerformingIO] = true;
this[kFs].read(
this.fd, pool, pool.used, toRead, this.pos, (er, bytesRead) => {
this[kFs]
.read(this.fd, pool, pool.used, toRead, this.pos, (er, bytesRead) => {
this[kIsPerformingIO] = false;
// Tell ._destroy() that it's safe to close the fd now.
if (this.destroyed) return this.emit(kIoDone, er);

if (er) {
if (this.autoClose) {
this.destroy();
}
this.emit('error', er);
errorOrDestroy(this, er);
} else {
let b = null;
// Now that we know how much data we have actually read, re-wind the
Expand Down Expand Up @@ -235,28 +255,13 @@ ReadStream.prototype._read = function(n) {
};

ReadStream.prototype._destroy = function(err, cb) {
if (typeof this.fd !== 'number') {
this.once('open', closeFsStream.bind(null, this, cb, err));
return;
}

if (this[kIsPerformingIO]) {
this.once(kIoDone, (er) => closeFsStream(this, cb, err || er));
return;
this.once(kIoDone, (er) => close(this, err || er, cb));
} else {
close(this, err, cb);
}

closeFsStream(this, cb, err);
};

function closeFsStream(stream, cb, err) {
stream[kFs].close(stream.fd, (er) => {
stream.closed = true;
cb(er || err);
});

stream.fd = null;
}

ReadStream.prototype.close = function(cb) {
if (typeof cb === 'function') finished(this, cb);
this.destroy();
Expand All @@ -276,11 +281,6 @@ function WriteStream(path, options) {
// Only buffers are supported.
options.decodeStrings = true;

if (options.autoDestroy === undefined) {
options.autoDestroy = options.autoClose === undefined ?
true : (options.autoClose || false);
}

this[kFs] = options.fs || fs;
if (typeof this[kFs].open !== 'function') {
throw new ERR_INVALID_ARG_TYPE('options.fs.open', 'function',
Expand Down Expand Up @@ -315,7 +315,8 @@ function WriteStream(path, options) {
this._writev = null;
}

Writable.call(this, options);
options.autoDestroy = options.autoClose === undefined ?
true : options.autoClose;

// Path will be ignored when fd is specified, so it can be falsy
this.path = toPathIfFileURL(path);
Expand All @@ -324,7 +325,6 @@ function WriteStream(path, options) {
this.mode = options.mode === undefined ? 0o666 : options.mode;

this.start = options.start;
this.autoClose = options.autoDestroy;
this.pos = undefined;
this.bytesWritten = 0;
this.closed = false;
Expand All @@ -336,74 +336,44 @@ function WriteStream(path, options) {
this.pos = this.start;
}

Writable.call(this, options);

if (options.encoding)
this.setDefaultEncoding(options.encoding);

if (typeof this.fd !== 'number')
_openWriteFs(this);
}
ObjectSetPrototypeOf(WriteStream.prototype, Writable.prototype);
ObjectSetPrototypeOf(WriteStream, Writable);

WriteStream.prototype._final = function(callback) {
if (typeof this.fd !== 'number') {
return this.once('open', function() {
this._final(callback);
});
ObjectDefineProperty(WriteStream.prototype, 'autoClose', {
get() {
return this._writableState.autoDestroy;
},
set(val) {
this._writableState.autoDestroy = val;
}

callback();
};
});

const openWriteFs = deprecate(function() {
_openWriteFs(this);
// Noop.
}, 'WriteStream.prototype.open() is deprecated', 'DEP0135');
WriteStream.prototype.open = openWriteFs;

function _openWriteFs(stream) {
// Backwards compat for overriden open.
if (stream.open !== openWriteFs) {
stream.open();
return;
}

stream[kFs].open(stream.path, stream.flags, stream.mode, (er, fd) => {
if (er) {
if (stream.autoClose) {
stream.destroy();
}
stream.emit('error', er);
return;
}

stream.fd = fd;
stream.emit('open', fd);
stream.emit('ready');
});
}

WriteStream.prototype._construct = _construct;

WriteStream.prototype._write = function(data, encoding, cb) {
if (typeof this.fd !== 'number') {
return this.once('open', function() {
this._write(data, encoding, cb);
});
}

if (this.destroyed) return cb(new ERR_STREAM_DESTROYED('write'));

this[kIsPerformingIO] = true;
this[kFs].write(this.fd, data, 0, data.length, this.pos, (er, bytes) => {
this[kIsPerformingIO] = false;
// Tell ._destroy() that it's safe to close the fd now.
if (this.destroyed) {
// Tell ._destroy() that it's safe to close the fd now.
cb(er);
return this.emit(kIoDone, er);
}

if (er) {
return cb(er);
}

this.bytesWritten += bytes;
cb();
});
Expand All @@ -412,16 +382,7 @@ WriteStream.prototype._write = function(data, encoding, cb) {
this.pos += data.length;
};


WriteStream.prototype._writev = function(data, cb) {
if (typeof this.fd !== 'number') {
return this.once('open', function() {
this._writev(data, cb);
});
}

if (this.destroyed) return cb(new ERR_STREAM_DESTROYED('write'));

const len = data.length;
const chunks = new Array(len);
let size = 0;
Expand All @@ -436,18 +397,16 @@ WriteStream.prototype._writev = function(data, cb) {
this[kIsPerformingIO] = true;
this[kFs].writev(this.fd, chunks, this.pos, (er, bytes) => {
this[kIsPerformingIO] = false;
// Tell ._destroy() that it's safe to close the fd now.
if (this.destroyed) {
// Tell ._destroy() that it's safe to close the fd now.
cb(er);
return this.emit(kIoDone, er);
}

if (er) {
if (this.autoClose) {
this.destroy(er);
}
return cb(er);
}

this.bytesWritten += bytes;
cb();
});
Expand All @@ -456,8 +415,13 @@ WriteStream.prototype._writev = function(data, cb) {
this.pos += size;
};


WriteStream.prototype._destroy = ReadStream.prototype._destroy;
WriteStream.prototype._destroy = function(err, cb) {
if (this[kIsPerformingIO]) {
this.once(kIoDone, (er) => close(this, err || er, cb));
} else {
close(this, err, cb);
}
};
WriteStream.prototype.close = function(cb) {
if (cb) {
if (this.closed) {
Expand Down
Loading

0 comments on commit 54b36e4

Please sign in to comment.