diff --git a/packages/eventstream-serde-node/src/EventStreamMarshaller.ts b/packages/eventstream-serde-node/src/EventStreamMarshaller.ts index 184ba7ebd0c5a..c6b4e9e72959b 100644 --- a/packages/eventstream-serde-node/src/EventStreamMarshaller.ts +++ b/packages/eventstream-serde-node/src/EventStreamMarshaller.ts @@ -29,38 +29,6 @@ export class EventStreamMarshaller { } serialize(input: AsyncIterable, serializer: (event: T) => Message): Readable { - const serializedIterable = this.universalMarshaller.serialize(input, serializer); - if (typeof Readable.from === "function") { - //reference: https://nodejs.org/dist/latest-v13.x/docs/api/stream.html#stream_new_stream_readable_options - return Readable.from(serializedIterable); - } else { - const iterator = serializedIterable[Symbol.asyncIterator](); - const serializedStream = new Readable({ - autoDestroy: true, - objectMode: true, - async read() { - iterator - .next() - .then(({ done, value }) => { - if (done) { - this.push(null); - } else { - this.push(value); - } - }) - .catch((err) => { - this.destroy(err); - }); - }, - }); - //TODO: use 'autoDestroy' when targeting Node 11 - serializedStream.on("error", () => { - serializedStream.destroy(); - }); - serializedStream.on("end", () => { - serializedStream.destroy(); - }); - return serializedStream; - } + return Readable.from(this.universalMarshaller.serialize(input, serializer)); } }