diff --git a/src/table.ts b/src/table.ts index 5da77593f..bcf7606c7 100644 --- a/src/table.ts +++ b/src/table.ts @@ -728,7 +728,7 @@ Please use the format 'prezzy' or '${instance.name}/tables/prezzy'.`); createReadStream(opts?: GetRowsOptions) { const options = opts || {}; const maxRetries = is.number(this.maxRetries) ? this.maxRetries! : 3; - let activeRequestStream: AbortableDuplex; + let activeRequestStream: AbortableDuplex | null; let rowKeys: string[]; const ranges = options.ranges || []; let filter: {} | null; @@ -786,7 +786,6 @@ Please use the format 'prezzy' or '${instance.name}/tables/prezzy'.`); userStream.end = () => { rowStream?.unpipe(userStream); if (activeRequestStream) { - // TODO: properly end the stream instead of abort activeRequestStream.abort(); } return end(); @@ -927,23 +926,28 @@ Please use the format 'prezzy' or '${instance.name}/tables/prezzy'.`); rowStream = pumpify.obj([requestStream, chunkTransformer, toRowStream]); - rowStream.on('error', (error: ServiceError) => { - if (IGNORED_STATUS_CODES.has(error.code)) { - // We ignore the `cancelled` "error", since we are the ones who cause - // it when the user calls `.abort()`. - userStream.end(); - return; - } - rowStream.unpipe(userStream); - if ( - numRequestsMade <= maxRetries && - RETRYABLE_STATUS_CODES.has(error.code) - ) { - makeNewRequest(); - } else { - userStream.emit('error', error); - } - }); + rowStream + .on('error', (error: ServiceError) => { + rowStream.unpipe(userStream); + activeRequestStream = null; + if (IGNORED_STATUS_CODES.has(error.code)) { + // We ignore the `cancelled` "error", since we are the ones who cause + // it when the user calls `.abort()`. + userStream.end(); + return; + } + if ( + numRequestsMade <= maxRetries && + RETRYABLE_STATUS_CODES.has(error.code) + ) { + makeNewRequest(); + } else { + userStream.emit('error', error); + } + }) + .on('end', () => { + activeRequestStream = null; + }); rowStream.pipe(userStream); numRequestsMade++; };