From 7dbaa6eebc5df6c073c78ff77f38b70c3e6f29d5 Mon Sep 17 00:00:00 2001 From: danieljbruce Date: Tue, 7 Feb 2023 11:49:54 -0500 Subject: [PATCH] fix: Out of order read rows fix (#1231) Client changes: Emits an error if the chunk transformer receives keys that are out of order. The chunk transformer is a stream that data passes through before it reaches the stream that has the handlers for errors and retries. Also pulls some comparison functions into a utility file because now those functions are used by the chunk transformer to check to see if rows are in order. They are still used by the table.ts file, but are available in both places now. Test proxy changes: Makes it so that when the client emits an error as a result of rows that are out of order, that the error is sent back to the test runner as an rpc message instead of an rpc error. This is done to stay consistent with the test proxy in Java and allows the TestReadRows_NoRetry_OutOfOrderError test case to pass. --- src/chunktransformer.ts | 6 ++++++ src/table.ts | 21 ++++++--------------- src/utils/table.ts | 18 ++++++++++++++++++ testproxy/services/read-rows.js | 15 +++++++++------ 4 files changed, 39 insertions(+), 21 deletions(-) diff --git a/src/chunktransformer.ts b/src/chunktransformer.ts index 6404beada..986ceaf6f 100644 --- a/src/chunktransformer.ts +++ b/src/chunktransformer.ts @@ -14,6 +14,7 @@ import {Transform, TransformOptions} from 'stream'; import {Bytes, Mutation} from './mutation'; +import {TableUtils} from './utils/table'; export type Value = string | number | boolean | Uint8Array; @@ -259,6 +260,11 @@ export class ChunkTransformer extends Transform { errorMessage = 'A new row cannot be reset'; } else if (lastRowKey === newRowKey) { errorMessage = 'A commit happened but the same key followed'; + } else if ( + typeof lastRowKey !== 'undefined' && + TableUtils.lessThanOrEqualTo(newRowKey as string, lastRowKey as string) + ) { + errorMessage = 'A row key must be strictly increasing'; } else if (!chunk.familyName) { errorMessage = 'A family must be set'; } else if (chunk.qualifier === null || chunk.qualifier === undefined) { diff --git a/src/table.ts b/src/table.ts index 71b3f7e3d..e744c9855 100644 --- a/src/table.ts +++ b/src/table.ts @@ -786,18 +786,6 @@ Please use the format 'prezzy' or '${instance.name}/tables/prezzy'.`); }; if (lastRowKey) { - // TODO: lhs and rhs type shouldn't be string, it could be - // string, number, Uint8Array, boolean. Fix the type - // and clean up the casting. - const lessThan = (lhs: string, rhs: string) => { - const lhsBytes = Mutation.convertToBytes(lhs); - const rhsBytes = Mutation.convertToBytes(rhs); - return (lhsBytes as Buffer).compare(rhsBytes as Uint8Array) === -1; - }; - const greaterThan = (lhs: string, rhs: string) => lessThan(rhs, lhs); - const lessThanOrEqualTo = (lhs: string, rhs: string) => - !greaterThan(lhs, rhs); - // Readjust and/or remove ranges based on previous valid row reads. // Iterate backward since items may need to be removed. for (let index = ranges.length - 1; index >= 0; index--) { @@ -810,11 +798,14 @@ Please use the format 'prezzy' or '${instance.name}/tables/prezzy'.`); : range.end; const startKeyIsRead = !startValue || - lessThanOrEqualTo(startValue as string, lastRowKey as string); + TableUtils.lessThanOrEqualTo( + startValue as string, + lastRowKey as string + ); const endKeyIsNotRead = !endValue || (endValue as Buffer).length === 0 || - lessThan(lastRowKey as string, endValue as string); + TableUtils.lessThan(lastRowKey as string, endValue as string); if (startKeyIsRead) { if (endKeyIsNotRead) { // EndKey is not read, reset the range to start from lastRowKey open @@ -831,7 +822,7 @@ Please use the format 'prezzy' or '${instance.name}/tables/prezzy'.`); // Remove rowKeys already read. rowKeys = rowKeys.filter(rowKey => - greaterThan(rowKey, lastRowKey as string) + TableUtils.greaterThan(rowKey, lastRowKey as string) ); // If there was a row limit in the original request and diff --git a/src/utils/table.ts b/src/utils/table.ts index af82695d6..8785ad516 100644 --- a/src/utils/table.ts +++ b/src/utils/table.ts @@ -13,6 +13,7 @@ // limitations under the License. import {GetRowsOptions, PrefixRange} from '../table'; +import {Mutation} from '../mutation'; export class TableUtils { static getRanges(options: GetRowsOptions) { @@ -49,6 +50,23 @@ export class TableUtils { return ranges; } + // TODO: lhs and rhs type shouldn't be string, it could be + // string, number, Uint8Array, boolean. Fix the type + // and clean up the casting. + static lessThan(lhs: string, rhs: string) { + const lhsBytes = Mutation.convertToBytes(lhs); + const rhsBytes = Mutation.convertToBytes(rhs); + return (lhsBytes as Buffer).compare(rhsBytes as Uint8Array) === -1; + } + + static greaterThan(lhs: string, rhs: string) { + return this.lessThan(rhs, lhs); + } + + static lessThanOrEqualTo(lhs: string, rhs: string) { + return !this.greaterThan(lhs, rhs); + } + static createPrefixRange(start: string): PrefixRange { const prefix = start.replace(new RegExp('[\xff]+$'), ''); let endKey = ''; diff --git a/testproxy/services/read-rows.js b/testproxy/services/read-rows.js index dfae9e4e0..809cc3e5c 100644 --- a/testproxy/services/read-rows.js +++ b/testproxy/services/read-rows.js @@ -65,12 +65,15 @@ const readRows = ({clientMap}) => const bigtable = clientMap.get(clientId); const table = getTableInfo(bigtable, tableName); const rowsOptions = getRowsOptions(readRowsRequest); - const [rows] = await table.getRows(rowsOptions); - - return { - status: {code: grpc.status.OK, details: []}, - row: rows.map(getRowResponse), - }; + try { + const [rows] = await table.getRows(rowsOptions); + return { + status: {code: grpc.status.OK, details: []}, + row: rows.map(getRowResponse), + }; + } catch (e) { + return {status: e}; + } }); module.exports = readRows;