Skip to content

Commit

Permalink
fixup: remove compose
Browse files Browse the repository at this point in the history
  • Loading branch information
ronag committed Jul 22, 2021
1 parent 6c91184 commit d476062
Show file tree
Hide file tree
Showing 3 changed files with 212 additions and 54 deletions.
169 changes: 168 additions & 1 deletion lib/internal/streams/body.js
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ const { createDeferredPromise } = require('internal/util');
const { destroyer } = require('internal/streams/destroy');
const from = require('internal/streams/from');
const assert = require('internal/assert');
const pipeline = require('internal/streams/pipeline');

const {
isBlob
Expand All @@ -25,23 +26,30 @@ const isWritableStream =

const {
isIterable,
isNodeStream,
isReadable,
isWritable,
isDuplexNodeStream,
isReadableNodeStream,
isWritableNodeStream,
} = require('internal/streams/utils');

const {
ArrayIsArray,
JSONParse,
PromiseResolve,
Symbol,
SymbolAsyncIterator
} = primordials;

const {
AbortError,
codes: {
ERR_INVALID_ARG_TYPE,
ERR_INVALID_RETURN_VALUE,
ERR_INVALID_STATE,
ERR_MISSING_ARGS,
ERR_INVALID_ARG_VALUE,
},
} = require('internal/errors');

Expand Down Expand Up @@ -73,7 +81,10 @@ class Body {
constructor(body, options) {
// TODO (ronag): What about TransformStream?

if (body[kState]) {
if (ArrayIsArray(body)) {
const d = compose(...body);
this[kState] = { readable: d, writable: d };
} else if (body[kState]) {
this[kState] = body[kState];
} else if (
isReadableStream(body?.readable) &&
Expand Down Expand Up @@ -322,4 +333,160 @@ function fromAsyncGen(fn) {
};
}

function compose(...streams) {
if (streams.length === 0) {
throw new ERR_MISSING_ARGS('streams');
}

if (streams.length === 1) {
return new Body(streams[0]);
}

const orgStreams = [...streams];

if (typeof streams[0] === 'function') {
streams[0] = new Body(streams[0]).nodeStream();
}

if (typeof streams[streams.length - 1] === 'function') {
const idx = streams.length - 1;
streams[idx] = new Body(streams[idx]);
}

for (let n = 0; n < streams.length; ++n) {
// TODO(ronag): Relax requirements.

if (!isNodeStream(streams[n])) {
// TODO(ronag): Add checks for non streams.
continue;
}
if (n < streams.length - 1 && !isReadable(streams[n])) {
throw new ERR_INVALID_ARG_VALUE(
`streams[${n}]`,
orgStreams[n],
'must be readable'
);
}
if (n > 0 && !isWritable(streams[n])) {
throw new ERR_INVALID_ARG_VALUE(
`streams[${n}]`,
orgStreams[n],
'must be writable'
);
}
}

let ondrain;
let onfinish;
let onreadable;
let onclose;
let d;

function onfinished(err) {
const cb = onclose;
onclose = null;

if (cb) {
cb(err);
} else if (err) {
d.destroy(err);
} else if (!readable && !writable) {
d.destroy();
}
}

const head = streams[0];
const tail = pipeline(streams, onfinished);

const writable = !!isWritable(head);
const readable = !!isReadable(tail);

d = new BodyDuplex({
// TODO (ronag): highWaterMark?
readableObjectMode: !!tail?.readableObjectMode,
writableObjectMode: !!head?.writableObjectMode,
readable,
writable,
});

if (writable) {
d._write = function(chunk, encoding, callback) {
if (head.write(chunk, encoding)) {
callback();
} else {
ondrain = callback;
}
};

d._final = function(callback) {
head.end();
onfinish = callback;
};

head.on('drain', function() {
if (ondrain) {
const cb = ondrain;
ondrain = null;
cb();
}
});

tail.on('finish', function() {
if (onfinish) {
const cb = onfinish;
onfinish = null;
cb();
}
});
}

if (readable) {
tail.on('readable', function() {
if (onreadable) {
const cb = onreadable;
onreadable = null;
cb();
}
});

tail.on('end', function() {
d.push(null);
});

d._read = function() {
while (true) {
const buf = tail.read();

if (buf === null) {
onreadable = d._read;
return;
}

if (!d.push(buf)) {
return;
}
}
};
}

d._destroy = function(err, callback) {
if (!err && onclose !== null) {
err = new AbortError();
}

onreadable = null;
ondrain = null;
onfinish = null;

if (onclose === null) {
callback(err);
} else {
onclose = callback;
destroyer(tail, err);
}
};

return d;
}

module.exports = Body;
1 change: 0 additions & 1 deletion test/parallel/test-bootstrap-modules.js
Original file line number Diff line number Diff line change
Expand Up @@ -102,7 +102,6 @@ const expectedModules = new Set([
'NativeModule internal/streams/add-abort-signal',
'NativeModule internal/streams/buffer_list',
'NativeModule internal/streams/body',
'NativeModule internal/streams/compose',
'NativeModule internal/streams/destroy',
'NativeModule internal/streams/duplex',
'NativeModule internal/streams/duplexify',
Expand Down
Loading

0 comments on commit d476062

Please sign in to comment.