Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: prevent createReadStream retry from sending a full table scan #1026

Merged
merged 5 commits into from
Mar 16, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
150 changes: 76 additions & 74 deletions src/table.ts
Original file line number Diff line number Diff line change
Expand Up @@ -729,13 +729,16 @@ Please use the format 'prezzy' or '${instance.name}/tables/prezzy'.`);
const options = opts || {};
const maxRetries = is.number(this.maxRetries) ? this.maxRetries! : 3;
let activeRequestStream: AbortableDuplex;
let rowKeys: string[] | null;
let rowKeys: string[];
const ranges = options.ranges || [];
igorbernstein2 marked this conversation as resolved.
Show resolved Hide resolved
let filter: {} | null;
let rowsLimit: number;
const rowsLimit = options.limit || 0;
const hasLimit = rowsLimit !== 0;
let rowsRead = 0;
let numRequestsMade = 0;

rowKeys = options.keys || [];

if (options.start || options.end) {
if (options.ranges || options.prefix || options.prefixes) {
throw new Error(
Expand All @@ -748,10 +751,6 @@ Please use the format 'prezzy' or '${instance.name}/tables/prezzy'.`);
});
}

if (options.keys) {
rowKeys = options.keys;
}

if (options.prefix) {
if (options.ranges || options.start || options.end || options.prefixes) {
throw new Error(
Expand All @@ -772,19 +771,22 @@ Please use the format 'prezzy' or '${instance.name}/tables/prezzy'.`);
});
}

if (options.filter) {
filter = Filter.parse(options.filter);
// If rowKeys and ranges are both empty, the request is a full table scan.
// Add an empty range to simplify the resumption logic.
if (rowKeys.length === 0 && ranges.length === 0) {
ranges.push({});
}

if (options.limit) {
rowsLimit = options.limit;
if (options.filter) {
filter = Filter.parse(options.filter);
}

const userStream = new PassThrough({objectMode: true});
const end = userStream.end.bind(userStream);
userStream.end = () => {
rowStream?.unpipe(userStream);
if (activeRequestStream) {
// TODO: properly end the stream instead of abort
activeRequestStream.abort();
}
return end();
Expand All @@ -808,90 +810,90 @@ Please use the format 'prezzy' or '${instance.name}/tables/prezzy'.`);
};

if (lastRowKey) {
// TODO: lhs and rhs type shouldn't be string, it could be
// string, number, Uint8Array, boolean. Fix the type
// and clean up the casting.
const lessThan = (lhs: string, rhs: string) => {
const lhsBytes = Mutation.convertToBytes(lhs);
const rhsBytes = Mutation.convertToBytes(rhs);
return (lhsBytes as Buffer).compare(rhsBytes as Uint8Array) === -1;
};
const greaterThan = (lhs: string, rhs: string) => lessThan(rhs, lhs);
const greaterThanOrEqualTo = (lhs: string, rhs: string) =>
igorbernstein2 marked this conversation as resolved.
Show resolved Hide resolved
!lessThan(rhs, lhs);

if (ranges.length === 0) {
ranges.push({
start: {
value: lastRowKey,
inclusive: false,
},
});
} else {
// Readjust and/or remove ranges based on previous valid row reads.

// Iterate backward since items may need to be removed.
for (let index = ranges.length - 1; index >= 0; index--) {
const range = ranges[index];
const startValue = is.object(range.start)
? (range.start as BoundData).value
: range.start;
const endValue = is.object(range.end)
? (range.end as BoundData).value
: range.end;
const isWithinStart =
!startValue ||
greaterThanOrEqualTo(startValue as string, lastRowKey as string);
const isWithinEnd =
!endValue || lessThan(lastRowKey as string, endValue as string);
if (isWithinStart) {
if (isWithinEnd) {
// The lastRowKey is within this range, adjust the start
// value.
range.start = {
value: lastRowKey,
inclusive: false,
};
} else {
// The lastRowKey is past this range, remove this range.
ranges.splice(index, 1);
}
const lessThanOrEqualTo = (lhs: string, rhs: string) =>
!greaterThan(lhs, rhs);

// Readjust and/or remove ranges based on previous valid row reads.
// Iterate backward since items may need to be removed.
for (let index = ranges.length - 1; index >= 0; index--) {
const range = ranges[index];
const startValue = is.object(range.start)
? (range.start as BoundData).value
: range.start;
const endValue = is.object(range.end)
? (range.end as BoundData).value
: range.end;
const startKeyIsRead =
!startValue ||
lessThanOrEqualTo(startValue as string, lastRowKey as string);
igorbernstein2 marked this conversation as resolved.
Show resolved Hide resolved
const endKeyIsNotRead =
!endValue ||
(endValue as Buffer).length === 0 ||
lessThan(lastRowKey as string, endValue as string);
if (startKeyIsRead) {
if (endKeyIsNotRead) {
// EndKey is not read, reset the range to start from lastRowKey open
range.start = {
value: lastRowKey,
inclusive: false,
};
} else {
// EndKey is read, remove this range
ranges.splice(index, 1);
}
}
}

// Remove rowKeys already read.
if (rowKeys) {
rowKeys = rowKeys.filter(rowKey =>
greaterThan(rowKey, lastRowKey as string)
);
if (rowKeys.length === 0) {
rowKeys = null;
}
}
}
if (rowKeys || ranges.length) {
reqOpts.rows = {};
rowKeys = rowKeys.filter(rowKey =>
greaterThan(rowKey, lastRowKey as string)
);

igorbernstein2 marked this conversation as resolved.
Show resolved Hide resolved
if (rowKeys) {
reqOpts.rows.rowKeys = rowKeys.map(
Mutation.convertToBytes
) as {} as Uint8Array[];
// If there was a row limit in the original request and
// we've already read all the rows, end the stream and
// do not retry.
if (hasLimit && rowsLimit === rowsRead) {
userStream.end();
return;
}

if (ranges.length) {
reqOpts.rows.rowRanges = ranges.map(range =>
Filter.createRange(
range.start as BoundData,
range.end as BoundData,
'Key'
)
);
// If all the row keys and ranges are read, end the stream
// and do not retry.
if (rowKeys.length === 0 && ranges.length === 0) {
userStream.end();
return;
}
}

// Create the new reqOpts
reqOpts.rows = {};

igorbernstein2 marked this conversation as resolved.
Show resolved Hide resolved
// TODO: preprocess all the keys and ranges to Bytes
reqOpts.rows.rowKeys = rowKeys.map(
Mutation.convertToBytes
) as {} as Uint8Array[];

reqOpts.rows.rowRanges = ranges.map(range =>
Filter.createRange(
range.start as BoundData,
range.end as BoundData,
'Key'
)
);

if (filter) {
reqOpts.filter = filter;
}

if (rowsLimit) {
if (hasLimit) {
reqOpts.rowsLimit = rowsLimit - rowsRead;
}

Expand Down
60 changes: 40 additions & 20 deletions system-test/data/read-rows-retry-test.json
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,10 @@
"name": "simple read",
"max_retries": 3,
"request_options": [
{}
{
"rowKeys": [],
"rowRanges": [{}]
}
],
"responses": [
{ "row_keys": [ "a", "b", "c" ] }
Expand All @@ -22,8 +25,10 @@
"name": "retries a failed read",
"max_retries": 3,
"request_options": [
{},
{ "rowRanges": [ { "startKeyOpen": "b" } ] }
{ "rowKeys": [],
"rowRanges": [{}]
},
{ "rowKeys": [], "rowRanges": [ { "startKeyOpen": "b" } ] }
],
"responses": [
{ "row_keys": [ "a", "b" ], "end_with_error": 4 },
Expand All @@ -41,7 +46,18 @@
"name": "fails after all available retries",
"max_retries": 3,
"request_options": [
{}, {}, {}, {}
{ "rowKeys": [],
"rowRanges": [{}]
},
{ "rowKeys": [],
"rowRanges": [{}]
},
{ "rowKeys": [],
"rowRanges": [{}]
},
{ "rowKeys": [],
"rowRanges": [{}]
}
],
"responses": [
{ "end_with_error": 4 },
Expand All @@ -62,14 +78,16 @@
"name": "resets the retry counter after a successful read",
"max_retries": 3,
"request_options": [
{},
{ "rowRanges": [ { "startKeyOpen": "a" } ] },
{ "rowRanges": [ { "startKeyOpen": "a" } ] },
{ "rowRanges": [ { "startKeyOpen": "a" } ] },
{ "rowRanges": [ { "startKeyOpen": "a" } ] },
{ "rowRanges": [ { "startKeyOpen": "b" } ] },
{ "rowRanges": [ { "startKeyOpen": "b" } ] },
{ "rowRanges": [ { "startKeyOpen": "b" } ] }
{ "rowKeys": [],
"rowRanges": [{}]
},
{ "rowKeys": [], "rowRanges": [ { "startKeyOpen": "a" } ] },
{ "rowKeys": [], "rowRanges": [ { "startKeyOpen": "a" } ] },
{ "rowKeys": [], "rowRanges": [ { "startKeyOpen": "a" } ] },
{ "rowKeys": [], "rowRanges": [ { "startKeyOpen": "a" } ] },
{ "rowKeys": [], "rowRanges": [ { "startKeyOpen": "b" } ] },
{ "rowKeys": [], "rowRanges": [ { "startKeyOpen": "b" } ] },
{ "rowKeys": [], "rowRanges": [ { "startKeyOpen": "b" } ] }
],
"responses": [
{ "row_keys": [ "a" ], "end_with_error": 4 },
Expand Down Expand Up @@ -98,8 +116,8 @@
}]
},
"request_options": [
{ "rowRanges": [ { "startKeyClosed": "a", "endKeyClosed": "z" } ] },
{ "rowRanges": [ { "startKeyOpen": "b", "endKeyClosed": "z" } ] }
{ "rowKeys": [], "rowRanges": [ { "startKeyClosed": "a", "endKeyClosed": "z" } ] },
{ "rowKeys": [], "rowRanges": [ { "startKeyOpen": "b", "endKeyClosed": "z" } ] }
],
"responses": [
{ "row_keys": [ "a", "b" ], "end_with_error": 4 },
Expand All @@ -126,11 +144,13 @@
}]
},
"request_options": [
{ "rowRanges": [
{ "rowKeys": [],
"rowRanges": [
{ "startKeyClosed": "a", "endKeyClosed": "c" },
{ "startKeyClosed": "x", "endKeyClosed": "z" }
] },
{ "rowRanges": [ { "startKeyClosed": "x", "endKeyClosed": "z" } ] }
{ "rowKeys": [],
"rowRanges": [ { "startKeyClosed": "x", "endKeyClosed": "z" } ] }
],
"responses": [
{ "row_keys": [ "a", "b", "c" ], "end_with_error": 4 },
Expand All @@ -151,8 +171,8 @@
"keys": ["a", "b", "x"]
},
"request_options": [
{ "rowKeys": [ "a", "b", "x" ] },
{ "rowKeys": [ "x" ], "rowRanges": [ { "startKeyOpen": "c" } ] }
{ "rowKeys": [ "a", "b", "x" ], "rowRanges": [] },
danieljbruce marked this conversation as resolved.
Show resolved Hide resolved
{ "rowKeys": [ "x" ], "rowRanges": [] }
],
"responses": [
{ "row_keys": [ "a", "b", "c" ], "end_with_error": 4 },
Expand All @@ -172,8 +192,8 @@
"limit": 10
},
"request_options": [
{ "rowsLimit": 10 },
{ "rowsLimit": 8, "rowRanges": [ { "startKeyOpen": "b" } ] }
{ "rowKeys": [], "rowRanges": [{}], "rowsLimit": 10 },
{ "rowsLimit": 8, "rowKeys":[], "rowRanges": [ { "startKeyOpen": "b" } ] }
],
"responses": [
{ "row_keys": [ "a", "b" ], "end_with_error": 4 },
Expand Down
Loading