From b8878e44a0f296507cfe53f2f270ebb1004277dd Mon Sep 17 00:00:00 2001 From: dbolduc Date: Wed, 5 Jul 2023 09:40:54 -0400 Subject: [PATCH 1/2] feat(bigtable): reverse scans --- google/cloud/bigtable/data_connection.cc | 9 +++- google/cloud/bigtable/data_connection.h | 1 + .../cloud/bigtable/examples/read_snippets.cc | 25 ++++++++++ .../bigtable/internal/data_connection_impl.cc | 4 +- .../internal/data_connection_impl_test.cc | 49 ++++++++++++++++++- google/cloud/bigtable/options.h | 22 +++++++++ .../bigtable/tests/data_integration_test.cc | 6 +++ 7 files changed, 110 insertions(+), 6 deletions(-) diff --git a/google/cloud/bigtable/data_connection.cc b/google/cloud/bigtable/data_connection.cc index 1c3f0bc7b9b7f..6642cb136ce84 100644 --- a/google/cloud/bigtable/data_connection.cc +++ b/google/cloud/bigtable/data_connection.cc @@ -77,10 +77,15 @@ future> DataConnection::AsyncBulkApply( RowReader DataConnection::ReadRows(std::string const& table_name, RowSet row_set, std::int64_t rows_limit, Filter filter) { + auto const& options = google::cloud::internal::CurrentOptions(); return ReadRowsFull(ReadRowsParams{ std::move(table_name), - google::cloud::internal::CurrentOptions().get(), - std::move(row_set), rows_limit, std::move(filter)}); + options.get(), + std::move(row_set), + rows_limit, + std::move(filter), + options.get(), + }); } // NOLINTNEXTLINE(performance-unnecessary-value-param) diff --git a/google/cloud/bigtable/data_connection.h b/google/cloud/bigtable/data_connection.h index 17c87dc846a3e..cc5fd0b690b1d 100644 --- a/google/cloud/bigtable/data_connection.h +++ b/google/cloud/bigtable/data_connection.h @@ -49,6 +49,7 @@ struct ReadRowsParams { RowSet row_set; std::int64_t rows_limit; Filter filter = Filter::PassAllFilter(); + bool reverse = false; }; /** diff --git a/google/cloud/bigtable/examples/read_snippets.cc b/google/cloud/bigtable/examples/read_snippets.cc index 6392c1bc6dc56..0cbaf149b7791 100644 --- a/google/cloud/bigtable/examples/read_snippets.cc +++ b/google/cloud/bigtable/examples/read_snippets.cc @@ -334,6 +334,26 @@ void ReadFilter(google::cloud::bigtable::Table table, (std::move(table)); } +void ReadRowsReverse(google::cloud::bigtable::Table table, + std::vector const&) { + //! [reverse scan] [START bigtable_reverse_scan] + namespace cbt = ::google::cloud::bigtable; + using ::google::cloud::Options; + using ::google::cloud::StatusOr; + [](cbt::Table table) { + // Read and print the rows. + for (StatusOr& row : table.ReadRows( + cbt::RowRange::RightOpen("phone#5c10102", "phone#5c10103"), 3, + cbt::Filter::PassAllFilter(), + Options{}.set(true))) { + if (!row) throw std::move(row).status(); + PrintRow(*row); + } + } + //! [reverse scan] [END bigtable_reverse_scan] + (std::move(table)); +} + void RunAll(std::vector const& argv) { namespace examples = ::google::cloud::bigtable::examples; namespace cbt = ::google::cloud::bigtable; @@ -391,6 +411,10 @@ void RunAll(std::vector const& argv) { ReadFilter(table, {}); std::cout << "Running ReadRowsWithLimit() example" << std::endl; ReadRowsWithLimit(table, {"5"}); + if (!google::cloud::bigtable::examples::UsingEmulator()) { + std::cout << "Running ReadRowsReverse() example" << std::endl; + ReadRowsReverse(table, {}); + } std::cout << "Running ReadKeySet() example" << std::endl; ReadKeysSet({table.project_id(), table.instance_id(), table.table_id(), @@ -415,6 +439,7 @@ int main(int argc, char* argv[]) try { MakeCommandEntry("read-row-ranges", {}, ReadRowRanges), MakeCommandEntry("read-row-prefix", {}, ReadRowPrefix), MakeCommandEntry("read-filter", {}, ReadFilter), + MakeCommandEntry("read-rows-reverse", {}, ReadRowsReverse), {"auto", RunAll}, }; diff --git a/google/cloud/bigtable/internal/data_connection_impl.cc b/google/cloud/bigtable/internal/data_connection_impl.cc index 7c6839fae2374..fd28cc2295f8c 100644 --- a/google/cloud/bigtable/internal/data_connection_impl.cc +++ b/google/cloud/bigtable/internal/data_connection_impl.cc @@ -178,7 +178,7 @@ bigtable::RowReader DataConnectionImpl::ReadRowsFull( auto impl = std::make_shared( stub_, std::move(params.app_profile_id), std::move(params.table_name), std::move(params.row_set), params.rows_limit, std::move(params.filter), - false, retry_policy(), backoff_policy()); + params.reverse, retry_policy(), backoff_policy()); return MakeRowReader(std::move(impl)); } @@ -374,7 +374,7 @@ void DataConnectionImpl::AsyncReadRows( std::function(bigtable::Row)> on_row, std::function on_finish, bigtable::RowSet row_set, std::int64_t rows_limit, bigtable::Filter filter) { - auto reverse = false; + auto reverse = internal::CurrentOptions().get(); bigtable_internal::AsyncRowReader::Create( background_->cq(), stub_, app_profile_id(), table_name, std::move(on_row), std::move(on_finish), std::move(row_set), rows_limit, std::move(filter), diff --git a/google/cloud/bigtable/internal/data_connection_impl_test.cc b/google/cloud/bigtable/internal/data_connection_impl_test.cc index f2d851de01c03..4cebbb18bc7b5 100644 --- a/google/cloud/bigtable/internal/data_connection_impl_test.cc +++ b/google/cloud/bigtable/internal/data_connection_impl_test.cc @@ -15,6 +15,7 @@ #include "google/cloud/bigtable/internal/data_connection_impl.h" #include "google/cloud/bigtable/data_connection.h" #include "google/cloud/bigtable/internal/defaults.h" +#include "google/cloud/bigtable/options.h" #include "google/cloud/bigtable/testing/mock_bigtable_stub.h" #include "google/cloud/bigtable/testing/mock_policies.h" #include "google/cloud/common_options.h" @@ -40,6 +41,7 @@ using ::google::cloud::bigtable::DataBackoffPolicyOption; using ::google::cloud::bigtable::DataLimitedErrorCountRetryPolicy; using ::google::cloud::bigtable::DataRetryPolicyOption; using ::google::cloud::bigtable::IdempotentMutationPolicyOption; +using ::google::cloud::bigtable::ReverseScanOption; using ::google::cloud::bigtable::testing::MockAsyncReadRowsStream; using ::google::cloud::bigtable::testing::MockBigtableStub; using ::google::cloud::bigtable::testing::MockIdempotentMutationPolicy; @@ -705,6 +707,23 @@ TEST(DataConnectionTest, ReadRows) { EXPECT_EQ(reader.begin(), reader.end()); } +TEST(DataConnectionTest, ReadRowsReverseScan) { + auto mock = std::make_shared(); + EXPECT_CALL(*mock, ReadRows) + .WillOnce([](auto, google::bigtable::v2::ReadRowsRequest const& request) { + EXPECT_TRUE(request.reversed()); + + auto stream = std::make_unique(); + EXPECT_CALL(*stream, Read).WillOnce(Return(Status())); + return stream; + }); + + auto conn = TestConnection(std::move(mock)); + internal::OptionsSpan span(CallOptions().set(true)); + auto reader = conn->ReadRows(kTableName, TestRowSet(), 42, TestFilter()); + EXPECT_EQ(reader.begin(), reader.end()); +} + // The DefaultRowReader is tested extensively in `default_row_reader_test.cc`. // In this test, we just verify that the configuration is passed along. TEST(DataConnectionTest, ReadRowsFull) { @@ -716,7 +735,7 @@ TEST(DataConnectionTest, ReadRowsFull) { EXPECT_EQ(42, request.rows_limit()); EXPECT_THAT(request, HasTestRowSet()); EXPECT_THAT(request.filter(), IsTestFilter()); - EXPECT_FALSE(request.reversed()); + EXPECT_TRUE(request.reversed()); auto stream = std::make_unique(); EXPECT_CALL(*stream, Read).WillOnce(Return(Status())); @@ -726,7 +745,7 @@ TEST(DataConnectionTest, ReadRowsFull) { auto conn = TestConnection(std::move(mock)); internal::OptionsSpan span(CallOptions()); auto reader = conn->ReadRowsFull(bigtable::ReadRowsParams{ - kTableName, kAppProfile, TestRowSet(), 42, TestFilter()}); + kTableName, kAppProfile, TestRowSet(), 42, TestFilter(), true}); EXPECT_EQ(reader.begin(), reader.end()); } @@ -1478,6 +1497,32 @@ TEST(DataConnectionTest, AsyncReadRows) { TestFilter()); } +TEST(DataConnectionTest, AsyncReadRowsReverseScan) { + auto mock = std::make_shared(); + EXPECT_CALL(*mock, AsyncReadRows) + .WillOnce( + [](CompletionQueue const&, auto, v2::ReadRowsRequest const& request) { + EXPECT_TRUE(request.reversed()); + using ErrorStream = + internal::AsyncStreamingReadRpcError; + return std::make_unique(PermanentError()); + }); + + MockFunction(bigtable::Row const&)> on_row; + EXPECT_CALL(on_row, Call).Times(0); + + MockFunction on_finish; + EXPECT_CALL(on_finish, Call).WillOnce([](Status const& status) { + EXPECT_THAT(status, StatusIs(StatusCode::kPermissionDenied)); + }); + + auto conn = TestConnection(std::move(mock)); + internal::OptionsSpan span(CallOptions().set(true)); + conn->AsyncReadRows(kTableName, on_row.AsStdFunction(), + on_finish.AsStdFunction(), TestRowSet(), 42, + TestFilter()); +} + TEST(DataConnectionTest, AsyncReadRowEmpty) { auto mock = std::make_shared(); EXPECT_CALL(*mock, AsyncReadRows) diff --git a/google/cloud/bigtable/options.h b/google/cloud/bigtable/options.h index 4c5291c547dd2..53369aa1cb5a2 100644 --- a/google/cloud/bigtable/options.h +++ b/google/cloud/bigtable/options.h @@ -76,6 +76,28 @@ struct AppProfileIdOption { using Type = std::string; }; +/** + * Read rows in reverse order. + * + * The rows will be streamed in reverse lexicographic order of the keys. This is + * particularly useful to get the last N records before a key. + * + * This option does not affect the contents of the rows, just the order that + * the rows are returned. + * + * @note When using this option, the order of row keys in a `bigtable::RowRange` + * does not change. The row keys still must be supplied in lexicographic order. + * + * @snippet read_snippets.cc reverse scan + * + * @see https://cloud.google.com/bigtable/docs/reads#reverse-scan + * + * @ingroup bigtable-options + */ +struct ReverseScanOption { + using Type = bool; +}; + /** * The endpoint for data operations. * diff --git a/google/cloud/bigtable/tests/data_integration_test.cc b/google/cloud/bigtable/tests/data_integration_test.cc index 64b4118072b5b..931fee6ec048d 100644 --- a/google/cloud/bigtable/tests/data_integration_test.cc +++ b/google/cloud/bigtable/tests/data_integration_test.cc @@ -198,6 +198,12 @@ TEST_P(DataIntegrationTest, TableReadRowsAllRows) { auto read4 = table.ReadRows(RowSet(), Filter::PassAllFilter()); CheckEqualUnordered(created, MoveCellsFromReader(read4)); + + if (GetParam() == "with-data-connection" && !UsingCloudBigtableEmulator()) { + auto read5 = table.ReadRows(RowSet(), Filter::PassAllFilter(), + Options{}.set(true)); + CheckEqualUnordered(created, MoveCellsFromReader(read5)); + } } TEST_P(DataIntegrationTest, TableReadRowsPartialRows) { From b666bda253daa8c337ce3b7a26ff2a3fd07359d2 Mon Sep 17 00:00:00 2001 From: dbolduc Date: Wed, 5 Jul 2023 12:10:41 -0400 Subject: [PATCH 2/2] address review comments --- google/cloud/bigtable/examples/read_snippets.cc | 9 +++++---- 1 file changed, 5 insertions(+), 4 deletions(-) diff --git a/google/cloud/bigtable/examples/read_snippets.cc b/google/cloud/bigtable/examples/read_snippets.cc index 0cbaf149b7791..eb96e7b8e11bf 100644 --- a/google/cloud/bigtable/examples/read_snippets.cc +++ b/google/cloud/bigtable/examples/read_snippets.cc @@ -342,10 +342,11 @@ void ReadRowsReverse(google::cloud::bigtable::Table table, using ::google::cloud::StatusOr; [](cbt::Table table) { // Read and print the rows. - for (StatusOr& row : table.ReadRows( - cbt::RowRange::RightOpen("phone#5c10102", "phone#5c10103"), 3, - cbt::Filter::PassAllFilter(), - Options{}.set(true))) { + auto reader = table.ReadRows( + cbt::RowRange::RightOpen("phone#5c10102", "phone#5c10103"), 3, + cbt::Filter::PassAllFilter(), + Options{}.set(true)); + for (StatusOr& row : reader) { if (!row) throw std::move(row).status(); PrintRow(*row); }