Skip to content

Commit

Permalink
Prevent concurrent read error
Browse files Browse the repository at this point in the history
Mostly prevented by defensive programming, scoping the deferred promise better.

Fixes: #421
  • Loading branch information
Borewit committed Dec 16, 2021
1 parent 583ab7e commit f132223
Showing 1 changed file with 19 additions and 17 deletions.
36 changes: 19 additions & 17 deletions lib/StreamReader.ts
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,9 @@ const maxStreamReadSize = 1 * 1024 * 1024; // Maximum request length on read-str
export class StreamReader {

/**
* Deferred read request
* Deferred used for postponed read request (as not data is yet available to read)
*/
private request: IReadRequest | null = null;
private deferred:Deferred<number> | null = null;

private endOfStream = false;

Expand Down Expand Up @@ -103,46 +103,48 @@ export class StreamReader {
*/
private async _read(buffer: Uint8Array, offset: number, length: number): Promise<number> {

if(this.request) throw new Error('Concurrent read operation?');

const readBuffer = this.s.read(length);

if (readBuffer) {
buffer.set(readBuffer, offset);
return readBuffer.length;
} else {
this.request = {
const request = {
buffer,
offset,
length,
deferred: new Deferred<number>()
};
this.deferred = request.deferred;
this.s.once('readable', () => {
this.tryRead();
this.readDeferred(request);
});
return this.request.deferred.promise;
return request.deferred.promise;
}
}

private tryRead() {
if (!this.request) throw new Error('this.request should be defined');
const readBuffer = this.s.read(this.request.length);
/**
* Process deferred read request
* @param request Deferred read request
*/
private readDeferred(request: IReadRequest) {
const readBuffer = this.s.read(request.length);
if (readBuffer) {
this.request.buffer.set(readBuffer, this.request.offset);
this.request.deferred.resolve(readBuffer.length);
this.request = null;
request.buffer.set(readBuffer, request.offset);
request.deferred.resolve(readBuffer.length);
this.deferred = null;
} else {
this.s.once('readable', () => {
this.tryRead();
this.readDeferred(request);
});
}
}

private reject(err: Error) {
this.endOfStream = true;
if (this.request) {
this.request.deferred.reject(err);
this.request = null;
if (this.deferred) {
this.deferred.reject(err);
this.deferred = null;
}
}
}

0 comments on commit f132223

Please sign in to comment.