Skip to content

Commit

Permalink
fix: Out of order read rows fix (#1231)
Browse files Browse the repository at this point in the history
Client changes:

Emits an error if the chunk transformer receives keys that are out of order. The chunk transformer is a stream that data passes through before it reaches the stream that has the handlers for errors and retries.

Also pulls some comparison functions into a utility file because now those functions are used by the chunk transformer to check to see if rows are in order. They are still used by the table.ts file, but are available in both places now.

Test proxy changes:

Makes it so that when the client emits an error as a result of rows that are out of order, that the error is sent back to the test runner as an rpc message instead of an rpc error. This is done to stay consistent with the test proxy in Java and allows the TestReadRows_NoRetry_OutOfOrderError test case to pass.
  • Loading branch information
danieljbruce authored Feb 7, 2023
1 parent 2d9e759 commit 7dbaa6e
Show file tree
Hide file tree
Showing 4 changed files with 39 additions and 21 deletions.
6 changes: 6 additions & 0 deletions src/chunktransformer.ts
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@

import {Transform, TransformOptions} from 'stream';
import {Bytes, Mutation} from './mutation';
import {TableUtils} from './utils/table';

export type Value = string | number | boolean | Uint8Array;

Expand Down Expand Up @@ -259,6 +260,11 @@ export class ChunkTransformer extends Transform {
errorMessage = 'A new row cannot be reset';
} else if (lastRowKey === newRowKey) {
errorMessage = 'A commit happened but the same key followed';
} else if (
typeof lastRowKey !== 'undefined' &&
TableUtils.lessThanOrEqualTo(newRowKey as string, lastRowKey as string)
) {
errorMessage = 'A row key must be strictly increasing';
} else if (!chunk.familyName) {
errorMessage = 'A family must be set';
} else if (chunk.qualifier === null || chunk.qualifier === undefined) {
Expand Down
21 changes: 6 additions & 15 deletions src/table.ts
Original file line number Diff line number Diff line change
Expand Up @@ -786,18 +786,6 @@ Please use the format 'prezzy' or '${instance.name}/tables/prezzy'.`);
};

if (lastRowKey) {
// TODO: lhs and rhs type shouldn't be string, it could be
// string, number, Uint8Array, boolean. Fix the type
// and clean up the casting.
const lessThan = (lhs: string, rhs: string) => {
const lhsBytes = Mutation.convertToBytes(lhs);
const rhsBytes = Mutation.convertToBytes(rhs);
return (lhsBytes as Buffer).compare(rhsBytes as Uint8Array) === -1;
};
const greaterThan = (lhs: string, rhs: string) => lessThan(rhs, lhs);
const lessThanOrEqualTo = (lhs: string, rhs: string) =>
!greaterThan(lhs, rhs);

// Readjust and/or remove ranges based on previous valid row reads.
// Iterate backward since items may need to be removed.
for (let index = ranges.length - 1; index >= 0; index--) {
Expand All @@ -810,11 +798,14 @@ Please use the format 'prezzy' or '${instance.name}/tables/prezzy'.`);
: range.end;
const startKeyIsRead =
!startValue ||
lessThanOrEqualTo(startValue as string, lastRowKey as string);
TableUtils.lessThanOrEqualTo(
startValue as string,
lastRowKey as string
);
const endKeyIsNotRead =
!endValue ||
(endValue as Buffer).length === 0 ||
lessThan(lastRowKey as string, endValue as string);
TableUtils.lessThan(lastRowKey as string, endValue as string);
if (startKeyIsRead) {
if (endKeyIsNotRead) {
// EndKey is not read, reset the range to start from lastRowKey open
Expand All @@ -831,7 +822,7 @@ Please use the format 'prezzy' or '${instance.name}/tables/prezzy'.`);

// Remove rowKeys already read.
rowKeys = rowKeys.filter(rowKey =>
greaterThan(rowKey, lastRowKey as string)
TableUtils.greaterThan(rowKey, lastRowKey as string)
);

// If there was a row limit in the original request and
Expand Down
18 changes: 18 additions & 0 deletions src/utils/table.ts
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
// limitations under the License.

import {GetRowsOptions, PrefixRange} from '../table';
import {Mutation} from '../mutation';

export class TableUtils {
static getRanges(options: GetRowsOptions) {
Expand Down Expand Up @@ -49,6 +50,23 @@ export class TableUtils {
return ranges;
}

// TODO: lhs and rhs type shouldn't be string, it could be
// string, number, Uint8Array, boolean. Fix the type
// and clean up the casting.
static lessThan(lhs: string, rhs: string) {
const lhsBytes = Mutation.convertToBytes(lhs);
const rhsBytes = Mutation.convertToBytes(rhs);
return (lhsBytes as Buffer).compare(rhsBytes as Uint8Array) === -1;
}

static greaterThan(lhs: string, rhs: string) {
return this.lessThan(rhs, lhs);
}

static lessThanOrEqualTo(lhs: string, rhs: string) {
return !this.greaterThan(lhs, rhs);
}

static createPrefixRange(start: string): PrefixRange {
const prefix = start.replace(new RegExp('[\xff]+$'), '');
let endKey = '';
Expand Down
15 changes: 9 additions & 6 deletions testproxy/services/read-rows.js
Original file line number Diff line number Diff line change
Expand Up @@ -65,12 +65,15 @@ const readRows = ({clientMap}) =>
const bigtable = clientMap.get(clientId);
const table = getTableInfo(bigtable, tableName);
const rowsOptions = getRowsOptions(readRowsRequest);
const [rows] = await table.getRows(rowsOptions);

return {
status: {code: grpc.status.OK, details: []},
row: rows.map(getRowResponse),
};
try {
const [rows] = await table.getRows(rowsOptions);
return {
status: {code: grpc.status.OK, details: []},
row: rows.map(getRowResponse),
};
} catch (e) {
return {status: e};
}
});

module.exports = readRows;

0 comments on commit 7dbaa6e

Please sign in to comment.