Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Switch BYOB Reader to default Reader, to handle Web API StreamReader #786

Merged
merged 1 commit into from
Jan 29, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ This module is used by [strtok3](https://github.com/Borewit/strtok3)
The `peek-readable` contains one class: `StreamReader`, which reads from a [stream.Readable](https://nodejs.org/api/stream.html#stream_class_stream_readable).

- Class `StreamReader` is used to read from Node.js [stream.Readable](https://nodejs.org/api/stream.html#stream_class_stream_readable).
- Class `WebStreamReader` is used to read from [ReadableStream<Uint8Array>](https://developer.mozilla.org/en-US/docs/Web/API/ReadableStream)
- Class `WebStreamByobReader` is used to read from [ReadableStream<Uint8Array>](https://developer.mozilla.org/docs/Web/API/ReadableStream)

## Compatibility

Expand All @@ -34,7 +34,7 @@ npm install --save peek-readable

## API Documentation

Both `StreamReader` and `WebStreamReader` implement the [IStreamReader interface](#istreamreader-interface).
Both `StreamReader` and `WebStreamByobReader` implement the [IStreamReader interface](#istreamreader-interface).

### `IStreamReader` Interface

Expand Down
3 changes: 2 additions & 1 deletion biome.jsonc
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,8 @@
"style":{
"useConsistentBuiltinInstantiation": "error",
"useThrowNewError": "error",
"useThrowOnlyError": "error"
"useThrowOnlyError": "error",
"noParameterAssign": { "level": "off"}
}
}
},
Expand Down
13 changes: 4 additions & 9 deletions lib/WebStreamReader.ts → lib/WebStreamByobReader.ts
Original file line number Diff line number Diff line change
@@ -1,21 +1,16 @@
import type { ReadableStream as NodeReadableStream, ReadableStreamBYOBReader } from 'node:stream/web';
import type { ReadableStreamBYOBReader } from 'node:stream/web';
import { EndOfStreamError } from './EndOfStreamError.js';
export { EndOfStreamError } from './EndOfStreamError.js';
import { AbstractStreamReader } from "./AbstractStreamReader.js";

export type AnyWebByteStream = NodeReadableStream<Uint8Array> | ReadableStream<Uint8Array>;

/**
* Read from a WebStream
* Read from a WebStream using a BYOB reader
* Reference: https://nodejs.org/api/webstreams.html#class-readablestreambyobreader
*/
export class WebStreamReader extends AbstractStreamReader {

private reader: ReadableStreamBYOBReader;
export class WebStreamByobReader extends AbstractStreamReader {

public constructor(stream: AnyWebByteStream) {
public constructor(private reader: ReadableStreamBYOBReader) {
super();
this.reader = stream.getReader({ mode: 'byob' }) as ReadableStreamBYOBReader;
}

protected async readFromStream(buffer: Uint8Array, offset: number, length: number): Promise<number> {
Expand Down
81 changes: 81 additions & 0 deletions lib/WebStreamDefaultReader.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,81 @@
import type { ReadableStreamDefaultReader } from 'node:stream/web';
import { EndOfStreamError } from './EndOfStreamError.js';
import { AbstractStreamReader } from "./AbstractStreamReader.js";

export class WebStreamDefaultReader extends AbstractStreamReader {
private buffer: Uint8Array | null = null; // Internal buffer to store excess data
private bufferOffset = 0; // Current position in the buffer

public constructor(private reader: ReadableStreamDefaultReader) {
super();
}

protected async readFromStream(buffer: Uint8Array, offset: number, length: number): Promise<number> {
if (this.endOfStream) {
throw new EndOfStreamError();
}

let totalBytesRead = 0;

// Serve from the internal buffer first
if (this.buffer) {
const remainingInBuffer = this.buffer.byteLength - this.bufferOffset;
const toCopy = Math.min(remainingInBuffer, length);
buffer.set(this.buffer.subarray(this.bufferOffset, this.bufferOffset + toCopy), offset);
this.bufferOffset += toCopy;
totalBytesRead += toCopy;
length -= toCopy;
offset += toCopy;

// If the buffer is exhausted, clear it
if (this.bufferOffset >= this.buffer.byteLength) {
this.buffer = null;
this.bufferOffset = 0;
}
}

// Continue reading from the stream if more data is needed
while (length > 0 && !this.endOfStream) {
const result = await this.reader.read();

if (result.done) {
this.endOfStream = true;
break;
}

if (result.value) {
const chunk = result.value;

// If the chunk is larger than the requested length, store the excess
if (chunk.byteLength > length) {
buffer.set(chunk.subarray(0, length), offset);
this.buffer = chunk;
this.bufferOffset = length; // Keep track of the unconsumed part
totalBytesRead += length;
return totalBytesRead;
}

// Otherwise, consume the entire chunk
buffer.set(chunk, offset);
totalBytesRead += chunk.byteLength;
length -= chunk.byteLength;
offset += chunk.byteLength;
}
}

if (totalBytesRead === 0 && this.endOfStream) {
throw new EndOfStreamError();
}

return totalBytesRead;
}

public abort(): Promise<void> {
return this.reader.cancel();
}

public async close(): Promise<void> {
await this.abort();
this.reader.releaseLock();
}
}
14 changes: 14 additions & 0 deletions lib/WebStreamReaderFactory.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
import { type ReadableStream as NodeReadableStream, ReadableStreamDefaultReader } from 'node:stream/web';
import { WebStreamByobReader } from './WebStreamByobReader.js';
import { WebStreamDefaultReader } from './WebStreamDefaultReader.js';

export type AnyWebByteStream = NodeReadableStream<Uint8Array> | ReadableStream<Uint8Array> | ReadableStream;

export function makeWebStreamReader(stream: AnyWebByteStream): WebStreamByobReader | WebStreamDefaultReader {
const reader = stream.getReader({mode: "byob"});
if (reader instanceof ReadableStreamDefaultReader) {
return new WebStreamDefaultReader(reader as ReadableStreamDefaultReader);
}
// Fall back on default reader
return new WebStreamByobReader(reader);
}
8 changes: 7 additions & 1 deletion lib/index.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,10 @@

import { AbstractStreamReader } from './AbstractStreamReader.js';
import type { WebStreamByobReader } from './WebStreamByobReader.js';

export { EndOfStreamError } from './EndOfStreamError.js';
export { StreamReader } from './StreamReader.js';
export { WebStreamReader, type AnyWebByteStream } from './WebStreamReader.js';
export { WebStreamByobReader } from './WebStreamByobReader.js';
export { WebStreamDefaultReader } from './WebStreamDefaultReader.js';
export type { IStreamReader } from './AbstractStreamReader.js';
export { type AnyWebByteStream, makeWebStreamReader } from './WebStreamReaderFactory.js'
2 changes: 1 addition & 1 deletion package.json
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
{
"name": "peek-readable",
"version": "5.3.1",
"version": "5.4.0-beta.1",
"description": "Read and peek from a readable stream",
"author": {
"name": "Borewit",
Expand Down
48 changes: 34 additions & 14 deletions test/test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,14 +3,15 @@ import chaiAsPromised from 'chai-as-promised';
import {EventEmitter} from 'node:events';
import * as fs from 'node:fs';
import {Readable} from 'node:stream';
import {EndOfStreamError, type IStreamReader, StreamReader, WebStreamReader} from '../lib/index.js';
import { EndOfStreamError, type IStreamReader, makeWebStreamReader, StreamReader } from '../lib/index.js';
import {SourceStream, stringToReadableStream} from './util.js';
import type { ReadStream } from 'node:fs';

use(chaiAsPromised);

interface StreamFactorySuite {
description: string;
isDefaultWebReader?: true;
fromString: (input: string, delay?: number) => IStreamReader;
}

Expand All @@ -33,8 +34,12 @@ describe('Matrix', () => {
description: 'Node.js StreamReader',
fromString: (input, delay) => new StreamReader(new SourceStream(input, delay))
}, {
description: 'WebStream Reader',
fromString: (input, delay) => new WebStreamReader(stringToReadableStream(input, delay))
description: 'WebStream BYOB Reader',
fromString: (input, delay) => makeWebStreamReader(stringToReadableStream(input, false, delay))
}, {
description: 'WebStream Default Reader',
isDefaultWebReader: true,
fromString: (input, delay) => makeWebStreamReader(stringToReadableStream(input, true, delay ))
}];

streamFactories
Expand Down Expand Up @@ -68,7 +73,7 @@ describe('Matrix', () => {
assert.strictEqual(bytesRead, 5, 'Should read 5 bytes');
assert.strictEqual(new TextDecoder('latin1').decode(uint8Array), 'peter');

// should should reject at the end of the stream
// should reject at the end of the stream
uint8Array = new Uint8Array(1);
try {
await streamReader.read(uint8Array, 0, 1);
Expand All @@ -86,7 +91,11 @@ describe('Matrix', () => {
return uint8Array[0];
}

it('should support concurrent reads', () => {
it('should support concurrent reads', async function () {

if (factory.isDefaultWebReader) {
this.skip(); // Default web reader does not support concurrent reads
}

const streamReader = factory.fromString('\x00\x01\x02\x03\x04\x05\x06\x07\x08\x09');

Expand All @@ -96,11 +105,10 @@ describe('Matrix', () => {
prom.push(readByteAsNumber(streamReader));
}

return Promise.all(prom).then(res => {
for (let i = 0; i < 10; ++i) {
assert.strictEqual(res[i], i);
}
});
const res = await Promise.all(prom);
for (let i = 0; i < 10; ++i) {
assert.strictEqual(res[i], i);
}

});
});
Expand Down Expand Up @@ -408,14 +416,26 @@ describe('Node.js StreamReader', () => {
});
});

describe('WebStreamReader', () => {
describe('abort() should release stream-lock', () => {

it('`BYOB WebStreamReader`', async () => {

const readableStream = stringToReadableStream('abc', false);
assert.isFalse(readableStream.locked, 'stream is unlocked before initializing tokenizer');

const webStreamReader = makeWebStreamReader(readableStream);
assert.isTrue(readableStream.locked, 'stream is locked after initializing tokenizer');

await webStreamReader.close();
assert.isFalse(readableStream.locked, 'stream is unlocked after closing tokenizer');
});

it('abort() should release stream-lock', async () => {
it('Default WebStreamReader', async () => {

const readableStream = stringToReadableStream('abc');
const readableStream = stringToReadableStream('abc', true);
assert.isFalse(readableStream.locked, 'stream is unlocked before initializing tokenizer');

const webStreamReader = new WebStreamReader(readableStream);
const webStreamReader = makeWebStreamReader(readableStream);
assert.isTrue(readableStream.locked, 'stream is locked after initializing tokenizer');

await webStreamReader.close();
Expand Down
63 changes: 42 additions & 21 deletions test/util.ts
Original file line number Diff line number Diff line change
@@ -1,14 +1,14 @@
// Utilities for testing

import { Readable } from 'node:stream';
import { ReadableStream} from 'node:stream/web';
import { ReadableStream } from 'node:stream/web';

/**
* A mock Node.js readable-stream, using string to read from
*/
export class SourceStream extends Readable {

private buf: Uint8Array;
private readonly buf: Uint8Array;

constructor(private str = '', private delay = 0) {
super();
Expand All @@ -26,7 +26,7 @@ export class SourceStream extends Readable {


// Function to convert a string to a BYOB ReadableStream
function stringToBYOBStream(inputString: string, delay = 0): ReadableStream<Uint8Array> {
function stringReadableStream(inputString: string, delay = 0): ReadableStream<Uint8Array> {
// Convert the string to a Uint8Array using TextEncoder
const encoder = new TextEncoder();
const uint8Array = encoder.encode(inputString);
Expand All @@ -39,31 +39,52 @@ function stringToBYOBStream(inputString: string, delay = 0): ReadableStream<Uint
async pull(controller) {
// Check if there is data left to be pushed
if (position < uint8Array.length) {
// Push the chunk to the controller
const remaining = uint8Array.length - position;

if (controller.byobRequest) {
const remaining = uint8Array.length - position;
// @ts-ignore
const v = controller.byobRequest.view;
const bytesRead = Math.min(remaining, v.byteLength);
v.set(uint8Array.subarray(position, position + bytesRead));
// BYOB path
const view = (controller.byobRequest as ReadableStreamBYOBRequest).view as Uint8Array;
const bytesRead = Math.min(remaining, view.byteLength);
view.set(uint8Array.subarray(position, position + bytesRead));
position += bytesRead;
// @ts-ignore
controller.byobRequest.respond(bytesRead);
(controller.byobRequest as ReadableStreamBYOBRequest).respond(bytesRead);
} else {
setTimeout(() => {
controller.enqueue(uint8Array);
position = uint8Array.length;
}, delay);
}
if (position >= uint8Array.length) {
controller.close();
// Non-BYOB path
const chunk = uint8Array.subarray(position, position + remaining);
position += remaining;

if (delay > 0) {
await new Promise((resolve) => setTimeout(resolve, delay));
}
controller.enqueue(chunk);
}
}

// Close the stream if all data has been pushed
if (position >= uint8Array.length) {
controller.close();
}
},
cancel() {
// Handle stream cancellation
position = uint8Array.length;
}
});
}

// Function to convert a string to a ReadableStreamBYOBReader
export function stringToReadableStream(inputString: string, delay?: number): ReadableStream<Uint8Array> {
return stringToBYOBStream(inputString, delay);
export function stringToReadableStream(inputString: string, forceDefault: boolean, delay?: number): ReadableStream<Uint8Array> {
const stream = stringReadableStream(inputString, delay);
const _getReader = stream.getReader.bind(stream);

// @ts-ignore
stream.getReader = (options?: { mode?: string }) => {
if (forceDefault) {
// Force returning the default reader
return _getReader(); // Call without options for a default reader
}
// @ts-ignore
return _getReader(options); // Pass through other options
};

return stream;
}
Loading