From 5c8e1f656b15177ecef4709b9e502cef58cca479 Mon Sep 17 00:00:00 2001 From: Mattie Fu Date: Tue, 29 Nov 2022 17:21:04 -0500 Subject: [PATCH] feat: add a query paginator (#1530) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit * feat: add a query paginator * add some comments * add a test for full table scan * fix format * address comments * update * fix test * fix nit * 🦉 Updates from OwlBot post-processor See https://github.com/googleapis/repo-automation-bots/blob/main/packages/owl-bot/README.md Co-authored-by: Owl Bot --- .../cloud/bigtable/data/v2/models/Query.java | 91 +++++++++ .../bigtable/data/v2/models/QueryTest.java | 178 ++++++++++++++++++ 2 files changed, 269 insertions(+) diff --git a/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/models/Query.java b/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/models/Query.java index 986a0ca1a5..271ffe3adf 100644 --- a/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/models/Query.java +++ b/google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/models/Query.java @@ -15,6 +15,7 @@ */ package com.google.cloud.bigtable.data.v2.models; +import com.google.api.core.BetaApi; import com.google.api.core.InternalApi; import com.google.bigtable.v2.ReadRowsRequest; import com.google.bigtable.v2.RowFilter; @@ -248,6 +249,29 @@ public List shard(SortedSet splitPoints) { return shards; } + /** + * Create a query paginator that'll split the query into smaller chunks. + * + *

Example usage: + * + *

{@code
+   * Query query = Query.create(...).range("a", "z");
+   * Query.QueryPaginator paginator = query.createQueryPaginator(100);
+   * ByteString lastSeenRowKey = ByteString.EMPTY;
+   * do {
+   *     List rows = client.readRowsCallable().all().call(paginator.getNextQuery());
+   *     for (Row row : rows) {
+   *        // do some processing
+   *        lastSeenRow = row;
+   *     }
+   * } while (paginator.advance(lastSeenRowKey));
+   * }
+ */ + @BetaApi("This surface is stable yet it might be removed in the future.") + public QueryPaginator createPaginator(int pageSize) { + return new QueryPaginator(this, pageSize); + } + /** Get the minimal range that encloses all of the row keys and ranges in this Query. */ public ByteStringRange getBound() { return RowSetUtil.getBound(builder.getRows()); @@ -297,6 +321,73 @@ private static ByteString wrapKey(String key) { return ByteString.copyFromUtf8(key); } + /** + * A Query Paginator that will split a query into small chunks. See {@link + * Query#createPaginator(int)} for example usage. + */ + @BetaApi("This surface is stable yet it might be removed in the future.") + public static class QueryPaginator { + + private final boolean hasOverallLimit; + private long remainingRows; + private Query query; + private final int pageSize; + private ByteString prevSplitPoint; + + QueryPaginator(@Nonnull Query query, int pageSize) { + this.hasOverallLimit = query.builder.getRowsLimit() > 0; + this.remainingRows = query.builder.getRowsLimit(); + this.query = query.limit(pageSize); + if (hasOverallLimit) { + remainingRows -= pageSize; + } + this.pageSize = pageSize; + this.prevSplitPoint = ByteString.EMPTY; + } + + /** Return the next query. */ + public Query getNextQuery() { + return query; + } + + /** + * Construct the next query. Return true if there are more queries to return. False if we've + * read everything. + */ + public boolean advance(@Nonnull ByteString lastSeenRowKey) { + Preconditions.checkNotNull( + lastSeenRowKey, "lastSeenRowKey cannot be null, use ByteString.EMPTY instead."); + // Full table scans don't have ranges or limits. Running the query again will return an empty + // list when we reach the end of the table. lastSeenRowKey won't be updated in this case, and + // we can break out of the loop. + if (lastSeenRowKey.equals(prevSplitPoint)) { + return false; + } + this.prevSplitPoint = lastSeenRowKey; + + // Set the query limit. If the original limit is set, return false if the new + // limit is <= 0 to avoid returning more rows than intended. + if (hasOverallLimit && remainingRows <= 0) { + return false; + } + if (hasOverallLimit) { + query.limit(Math.min(this.pageSize, remainingRows)); + remainingRows -= pageSize; + } else { + query.limit(pageSize); + } + + // Split the row ranges / row keys. Return false if there's nothing + // left on the right of the split point. + RowSetUtil.Split split = RowSetUtil.split(query.builder.getRows(), lastSeenRowKey); + if (split.getRight() == null) { + return false; + } + query.builder.setRows(split.getRight()); + return true; + } + } + @Override public boolean equals(Object o) { if (this == o) { diff --git a/google-cloud-bigtable/src/test/java/com/google/cloud/bigtable/data/v2/models/QueryTest.java b/google-cloud-bigtable/src/test/java/com/google/cloud/bigtable/data/v2/models/QueryTest.java index ccb0441c71..655aeda688 100644 --- a/google-cloud-bigtable/src/test/java/com/google/cloud/bigtable/data/v2/models/QueryTest.java +++ b/google-cloud-bigtable/src/test/java/com/google/cloud/bigtable/data/v2/models/QueryTest.java @@ -327,4 +327,182 @@ public void testClone() { assertThat(clonedReq).isEqualTo(query); assertThat(clonedReq.toProto(requestContext)).isEqualTo(request); } + + @Test + public void testQueryPaginatorRangeLimitReached() { + int chunkSize = 10, limit = 15; + Query query = Query.create(TABLE_ID).range("a", "z").limit(limit); + Query.QueryPaginator paginator = query.createPaginator(chunkSize); + + Query nextQuery = paginator.getNextQuery(); + + Builder expectedProto = + expectedProtoBuilder() + .setRows( + RowSet.newBuilder() + .addRowRanges( + RowRange.newBuilder() + .setStartKeyClosed(ByteString.copyFromUtf8("a")) + .setEndKeyOpen(ByteString.copyFromUtf8("z")) + .build())) + .setRowsLimit(chunkSize); + assertThat(nextQuery.toProto(requestContext)).isEqualTo(expectedProto.build()); + + assertThat(paginator.advance(ByteString.copyFromUtf8("c"))).isTrue(); + int expectedLimit = limit - chunkSize; + nextQuery = paginator.getNextQuery(); + expectedProto = + expectedProtoBuilder() + .setRows( + RowSet.newBuilder() + .addRowRanges( + RowRange.newBuilder() + .setStartKeyOpen(ByteString.copyFromUtf8("c")) + .setEndKeyOpen(ByteString.copyFromUtf8("z")) + .build())) + .setRowsLimit(expectedLimit); + assertThat(nextQuery.toProto(requestContext)).isEqualTo(expectedProto.build()); + + assertThat(paginator.advance(ByteString.copyFromUtf8("d"))).isFalse(); + } + + @Test + public void testQueryPaginatorRangeLimitMultiplyOfChunkSize() { + int chunkSize = 10, limit = 20; + Query query = Query.create(TABLE_ID).range("a", "z").limit(limit); + Query.QueryPaginator paginator = query.createPaginator(chunkSize); + + Query nextQuery = paginator.getNextQuery(); + + Builder expectedProto = + expectedProtoBuilder() + .setRows( + RowSet.newBuilder() + .addRowRanges( + RowRange.newBuilder() + .setStartKeyClosed(ByteString.copyFromUtf8("a")) + .setEndKeyOpen(ByteString.copyFromUtf8("z")) + .build())) + .setRowsLimit(chunkSize); + assertThat(nextQuery.toProto(requestContext)).isEqualTo(expectedProto.build()); + + assertThat(paginator.advance(ByteString.copyFromUtf8("c"))).isTrue(); + int expectedLimit = limit - chunkSize; + nextQuery = paginator.getNextQuery(); + expectedProto = + expectedProtoBuilder() + .setRows( + RowSet.newBuilder() + .addRowRanges( + RowRange.newBuilder() + .setStartKeyOpen(ByteString.copyFromUtf8("c")) + .setEndKeyOpen(ByteString.copyFromUtf8("z")) + .build())) + .setRowsLimit(expectedLimit); + assertThat(nextQuery.toProto(requestContext)).isEqualTo(expectedProto.build()); + + assertThat(paginator.advance(ByteString.copyFromUtf8("d"))).isFalse(); + } + + @Test + public void testQueryPaginatorRagneNoLimit() { + int chunkSize = 10; + Query query = Query.create(TABLE_ID).range("a", "z"); + Query.QueryPaginator paginator = query.createPaginator(chunkSize); + + Query nextQuery = paginator.getNextQuery(); + + Builder expectedProto = + expectedProtoBuilder() + .setRows( + RowSet.newBuilder() + .addRowRanges( + RowRange.newBuilder() + .setStartKeyClosed(ByteString.copyFromUtf8("a")) + .setEndKeyOpen(ByteString.copyFromUtf8("z")) + .build())) + .setRowsLimit(chunkSize); + assertThat(nextQuery.toProto(requestContext)).isEqualTo(expectedProto.build()); + + assertThat(paginator.advance(ByteString.copyFromUtf8("c"))).isTrue(); + nextQuery = paginator.getNextQuery(); + expectedProto + .setRows( + RowSet.newBuilder() + .addRowRanges( + RowRange.newBuilder() + .setStartKeyOpen(ByteString.copyFromUtf8("c")) + .setEndKeyOpen(ByteString.copyFromUtf8("z")) + .build())) + .setRowsLimit(chunkSize); + assertThat(nextQuery.toProto(requestContext)).isEqualTo(expectedProto.build()); + + assertThat(paginator.advance(ByteString.copyFromUtf8("z"))).isFalse(); + } + + @Test + public void testQueryPaginatorRowsNoLimit() { + int chunkSize = 10; + Query query = Query.create(TABLE_ID).rowKey("a").rowKey("b").rowKey("c"); + + Query.QueryPaginator paginator = query.createPaginator(chunkSize); + + Query nextQuery = paginator.getNextQuery(); + + ReadRowsRequest.Builder expectedProto = expectedProtoBuilder(); + expectedProto + .getRowsBuilder() + .addRowKeys(ByteString.copyFromUtf8("a")) + .addRowKeys(ByteString.copyFromUtf8("b")) + .addRowKeys(ByteString.copyFromUtf8("c")); + expectedProto.setRowsLimit(chunkSize); + + assertThat(nextQuery.toProto(requestContext)).isEqualTo(expectedProto.build()); + + paginator.advance(ByteString.copyFromUtf8("b")); + nextQuery = paginator.getNextQuery(); + expectedProto = expectedProtoBuilder(); + expectedProto.getRowsBuilder().addRowKeys(ByteString.copyFromUtf8("c")); + expectedProto.setRowsLimit(chunkSize); + + assertThat(nextQuery.toProto(requestContext)).isEqualTo(expectedProto.build()); + + assertThat(paginator.advance(ByteString.copyFromUtf8("c"))).isFalse(); + } + + @Test + public void testQueryPaginatorFullTableScan() { + int chunkSize = 10; + Query query = Query.create(TABLE_ID); + Query.QueryPaginator queryPaginator = query.createPaginator(chunkSize); + + ReadRowsRequest.Builder expectedProto = expectedProtoBuilder().setRowsLimit(chunkSize); + assertThat(queryPaginator.getNextQuery().toProto(requestContext)) + .isEqualTo(expectedProto.build()); + + assertThat(queryPaginator.advance(ByteString.copyFromUtf8("a"))).isTrue(); + expectedProto + .setRows( + RowSet.newBuilder() + .addRowRanges( + RowRange.newBuilder().setStartKeyOpen(ByteString.copyFromUtf8("a")).build())) + .setRowsLimit(chunkSize); + assertThat(queryPaginator.getNextQuery().toProto(requestContext)) + .isEqualTo(expectedProto.build()); + + assertThat(queryPaginator.advance(ByteString.copyFromUtf8("a"))).isFalse(); + } + + @Test + public void testQueryPaginatorEmptyTable() { + int chunkSize = 10; + Query query = Query.create(TABLE_ID); + Query.QueryPaginator queryPaginator = query.createPaginator(chunkSize); + + ReadRowsRequest.Builder expectedProto = expectedProtoBuilder().setRowsLimit(chunkSize); + assertThat(queryPaginator.getNextQuery().toProto(requestContext)) + .isEqualTo(expectedProto.build()); + + assertThat(queryPaginator.advance(ByteString.EMPTY)).isFalse(); + } }