Skip to content

Commit

Permalink
feat: add a query paginator (#1530)
Browse files Browse the repository at this point in the history
* feat: add a query paginator

* add some comments

* add a test for full table scan

* fix format

* address comments

* update

* fix test

* fix nit

* 🦉 Updates from OwlBot post-processor

See https://github.com/googleapis/repo-automation-bots/blob/main/packages/owl-bot/README.md

Co-authored-by: Owl Bot <gcf-owl-bot[bot]@users.noreply.github.com>
  • Loading branch information
mutianf and gcf-owl-bot[bot] authored Nov 29, 2022
1 parent ee98338 commit 5c8e1f6
Show file tree
Hide file tree
Showing 2 changed files with 269 additions and 0 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
*/
package com.google.cloud.bigtable.data.v2.models;

import com.google.api.core.BetaApi;
import com.google.api.core.InternalApi;
import com.google.bigtable.v2.ReadRowsRequest;
import com.google.bigtable.v2.RowFilter;
Expand Down Expand Up @@ -248,6 +249,29 @@ public List<Query> shard(SortedSet<ByteString> splitPoints) {
return shards;
}

/**
* Create a query paginator that'll split the query into smaller chunks.
*
* <p>Example usage:
*
* <pre>{@code
* Query query = Query.create(...).range("a", "z");
* Query.QueryPaginator paginator = query.createQueryPaginator(100);
* ByteString lastSeenRowKey = ByteString.EMPTY;
* do {
* List<Row> rows = client.readRowsCallable().all().call(paginator.getNextQuery());
* for (Row row : rows) {
* // do some processing
* lastSeenRow = row;
* }
* } while (paginator.advance(lastSeenRowKey));
* }</pre>
*/
@BetaApi("This surface is stable yet it might be removed in the future.")
public QueryPaginator createPaginator(int pageSize) {
return new QueryPaginator(this, pageSize);
}

/** Get the minimal range that encloses all of the row keys and ranges in this Query. */
public ByteStringRange getBound() {
return RowSetUtil.getBound(builder.getRows());
Expand Down Expand Up @@ -297,6 +321,73 @@ private static ByteString wrapKey(String key) {
return ByteString.copyFromUtf8(key);
}

/**
* A Query Paginator that will split a query into small chunks. See {@link
* Query#createPaginator(int)} for example usage.
*/
@BetaApi("This surface is stable yet it might be removed in the future.")
public static class QueryPaginator {

private final boolean hasOverallLimit;
private long remainingRows;
private Query query;
private final int pageSize;
private ByteString prevSplitPoint;

QueryPaginator(@Nonnull Query query, int pageSize) {
this.hasOverallLimit = query.builder.getRowsLimit() > 0;
this.remainingRows = query.builder.getRowsLimit();
this.query = query.limit(pageSize);
if (hasOverallLimit) {
remainingRows -= pageSize;
}
this.pageSize = pageSize;
this.prevSplitPoint = ByteString.EMPTY;
}

/** Return the next query. */
public Query getNextQuery() {
return query;
}

/**
* Construct the next query. Return true if there are more queries to return. False if we've
* read everything.
*/
public boolean advance(@Nonnull ByteString lastSeenRowKey) {
Preconditions.checkNotNull(
lastSeenRowKey, "lastSeenRowKey cannot be null, use ByteString.EMPTY instead.");
// Full table scans don't have ranges or limits. Running the query again will return an empty
// list when we reach the end of the table. lastSeenRowKey won't be updated in this case, and
// we can break out of the loop.
if (lastSeenRowKey.equals(prevSplitPoint)) {
return false;
}
this.prevSplitPoint = lastSeenRowKey;

// Set the query limit. If the original limit is set, return false if the new
// limit is <= 0 to avoid returning more rows than intended.
if (hasOverallLimit && remainingRows <= 0) {
return false;
}
if (hasOverallLimit) {
query.limit(Math.min(this.pageSize, remainingRows));
remainingRows -= pageSize;
} else {
query.limit(pageSize);
}

// Split the row ranges / row keys. Return false if there's nothing
// left on the right of the split point.
RowSetUtil.Split split = RowSetUtil.split(query.builder.getRows(), lastSeenRowKey);
if (split.getRight() == null) {
return false;
}
query.builder.setRows(split.getRight());
return true;
}
}

@Override
public boolean equals(Object o) {
if (this == o) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -327,4 +327,182 @@ public void testClone() {
assertThat(clonedReq).isEqualTo(query);
assertThat(clonedReq.toProto(requestContext)).isEqualTo(request);
}

@Test
public void testQueryPaginatorRangeLimitReached() {
int chunkSize = 10, limit = 15;
Query query = Query.create(TABLE_ID).range("a", "z").limit(limit);
Query.QueryPaginator paginator = query.createPaginator(chunkSize);

Query nextQuery = paginator.getNextQuery();

Builder expectedProto =
expectedProtoBuilder()
.setRows(
RowSet.newBuilder()
.addRowRanges(
RowRange.newBuilder()
.setStartKeyClosed(ByteString.copyFromUtf8("a"))
.setEndKeyOpen(ByteString.copyFromUtf8("z"))
.build()))
.setRowsLimit(chunkSize);
assertThat(nextQuery.toProto(requestContext)).isEqualTo(expectedProto.build());

assertThat(paginator.advance(ByteString.copyFromUtf8("c"))).isTrue();
int expectedLimit = limit - chunkSize;
nextQuery = paginator.getNextQuery();
expectedProto =
expectedProtoBuilder()
.setRows(
RowSet.newBuilder()
.addRowRanges(
RowRange.newBuilder()
.setStartKeyOpen(ByteString.copyFromUtf8("c"))
.setEndKeyOpen(ByteString.copyFromUtf8("z"))
.build()))
.setRowsLimit(expectedLimit);
assertThat(nextQuery.toProto(requestContext)).isEqualTo(expectedProto.build());

assertThat(paginator.advance(ByteString.copyFromUtf8("d"))).isFalse();
}

@Test
public void testQueryPaginatorRangeLimitMultiplyOfChunkSize() {
int chunkSize = 10, limit = 20;
Query query = Query.create(TABLE_ID).range("a", "z").limit(limit);
Query.QueryPaginator paginator = query.createPaginator(chunkSize);

Query nextQuery = paginator.getNextQuery();

Builder expectedProto =
expectedProtoBuilder()
.setRows(
RowSet.newBuilder()
.addRowRanges(
RowRange.newBuilder()
.setStartKeyClosed(ByteString.copyFromUtf8("a"))
.setEndKeyOpen(ByteString.copyFromUtf8("z"))
.build()))
.setRowsLimit(chunkSize);
assertThat(nextQuery.toProto(requestContext)).isEqualTo(expectedProto.build());

assertThat(paginator.advance(ByteString.copyFromUtf8("c"))).isTrue();
int expectedLimit = limit - chunkSize;
nextQuery = paginator.getNextQuery();
expectedProto =
expectedProtoBuilder()
.setRows(
RowSet.newBuilder()
.addRowRanges(
RowRange.newBuilder()
.setStartKeyOpen(ByteString.copyFromUtf8("c"))
.setEndKeyOpen(ByteString.copyFromUtf8("z"))
.build()))
.setRowsLimit(expectedLimit);
assertThat(nextQuery.toProto(requestContext)).isEqualTo(expectedProto.build());

assertThat(paginator.advance(ByteString.copyFromUtf8("d"))).isFalse();
}

@Test
public void testQueryPaginatorRagneNoLimit() {
int chunkSize = 10;
Query query = Query.create(TABLE_ID).range("a", "z");
Query.QueryPaginator paginator = query.createPaginator(chunkSize);

Query nextQuery = paginator.getNextQuery();

Builder expectedProto =
expectedProtoBuilder()
.setRows(
RowSet.newBuilder()
.addRowRanges(
RowRange.newBuilder()
.setStartKeyClosed(ByteString.copyFromUtf8("a"))
.setEndKeyOpen(ByteString.copyFromUtf8("z"))
.build()))
.setRowsLimit(chunkSize);
assertThat(nextQuery.toProto(requestContext)).isEqualTo(expectedProto.build());

assertThat(paginator.advance(ByteString.copyFromUtf8("c"))).isTrue();
nextQuery = paginator.getNextQuery();
expectedProto
.setRows(
RowSet.newBuilder()
.addRowRanges(
RowRange.newBuilder()
.setStartKeyOpen(ByteString.copyFromUtf8("c"))
.setEndKeyOpen(ByteString.copyFromUtf8("z"))
.build()))
.setRowsLimit(chunkSize);
assertThat(nextQuery.toProto(requestContext)).isEqualTo(expectedProto.build());

assertThat(paginator.advance(ByteString.copyFromUtf8("z"))).isFalse();
}

@Test
public void testQueryPaginatorRowsNoLimit() {
int chunkSize = 10;
Query query = Query.create(TABLE_ID).rowKey("a").rowKey("b").rowKey("c");

Query.QueryPaginator paginator = query.createPaginator(chunkSize);

Query nextQuery = paginator.getNextQuery();

ReadRowsRequest.Builder expectedProto = expectedProtoBuilder();
expectedProto
.getRowsBuilder()
.addRowKeys(ByteString.copyFromUtf8("a"))
.addRowKeys(ByteString.copyFromUtf8("b"))
.addRowKeys(ByteString.copyFromUtf8("c"));
expectedProto.setRowsLimit(chunkSize);

assertThat(nextQuery.toProto(requestContext)).isEqualTo(expectedProto.build());

paginator.advance(ByteString.copyFromUtf8("b"));
nextQuery = paginator.getNextQuery();
expectedProto = expectedProtoBuilder();
expectedProto.getRowsBuilder().addRowKeys(ByteString.copyFromUtf8("c"));
expectedProto.setRowsLimit(chunkSize);

assertThat(nextQuery.toProto(requestContext)).isEqualTo(expectedProto.build());

assertThat(paginator.advance(ByteString.copyFromUtf8("c"))).isFalse();
}

@Test
public void testQueryPaginatorFullTableScan() {
int chunkSize = 10;
Query query = Query.create(TABLE_ID);
Query.QueryPaginator queryPaginator = query.createPaginator(chunkSize);

ReadRowsRequest.Builder expectedProto = expectedProtoBuilder().setRowsLimit(chunkSize);
assertThat(queryPaginator.getNextQuery().toProto(requestContext))
.isEqualTo(expectedProto.build());

assertThat(queryPaginator.advance(ByteString.copyFromUtf8("a"))).isTrue();
expectedProto
.setRows(
RowSet.newBuilder()
.addRowRanges(
RowRange.newBuilder().setStartKeyOpen(ByteString.copyFromUtf8("a")).build()))
.setRowsLimit(chunkSize);
assertThat(queryPaginator.getNextQuery().toProto(requestContext))
.isEqualTo(expectedProto.build());

assertThat(queryPaginator.advance(ByteString.copyFromUtf8("a"))).isFalse();
}

@Test
public void testQueryPaginatorEmptyTable() {
int chunkSize = 10;
Query query = Query.create(TABLE_ID);
Query.QueryPaginator queryPaginator = query.createPaginator(chunkSize);

ReadRowsRequest.Builder expectedProto = expectedProtoBuilder().setRowsLimit(chunkSize);
assertThat(queryPaginator.getNextQuery().toProto(requestContext))
.isEqualTo(expectedProto.build());

assertThat(queryPaginator.advance(ByteString.EMPTY)).isFalse();
}
}

0 comments on commit 5c8e1f6

Please sign in to comment.