Skip to content

Commit

Permalink
feat(bigtable): reverse scans (#12022)
Browse files Browse the repository at this point in the history
  • Loading branch information
dbolduc authored Jul 5, 2023
1 parent bfe5ef7 commit 286a6eb
Show file tree
Hide file tree
Showing 7 changed files with 111 additions and 6 deletions.
9 changes: 7 additions & 2 deletions google/cloud/bigtable/data_connection.cc
Original file line number Diff line number Diff line change
Expand Up @@ -77,10 +77,15 @@ future<std::vector<FailedMutation>> DataConnection::AsyncBulkApply(
RowReader DataConnection::ReadRows(std::string const& table_name,
RowSet row_set, std::int64_t rows_limit,
Filter filter) {
auto const& options = google::cloud::internal::CurrentOptions();
return ReadRowsFull(ReadRowsParams{
std::move(table_name),
google::cloud::internal::CurrentOptions().get<AppProfileIdOption>(),
std::move(row_set), rows_limit, std::move(filter)});
options.get<AppProfileIdOption>(),
std::move(row_set),
rows_limit,
std::move(filter),
options.get<ReverseScanOption>(),
});
}

// NOLINTNEXTLINE(performance-unnecessary-value-param)
Expand Down
1 change: 1 addition & 0 deletions google/cloud/bigtable/data_connection.h
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@ struct ReadRowsParams {
RowSet row_set;
std::int64_t rows_limit;
Filter filter = Filter::PassAllFilter();
bool reverse = false;
};

/**
Expand Down
26 changes: 26 additions & 0 deletions google/cloud/bigtable/examples/read_snippets.cc
Original file line number Diff line number Diff line change
Expand Up @@ -334,6 +334,27 @@ void ReadFilter(google::cloud::bigtable::Table table,
(std::move(table));
}

void ReadRowsReverse(google::cloud::bigtable::Table table,
std::vector<std::string> const&) {
//! [reverse scan] [START bigtable_reverse_scan]
namespace cbt = ::google::cloud::bigtable;
using ::google::cloud::Options;
using ::google::cloud::StatusOr;
[](cbt::Table table) {
// Read and print the rows.
auto reader = table.ReadRows(
cbt::RowRange::RightOpen("phone#5c10102", "phone#5c10103"), 3,
cbt::Filter::PassAllFilter(),
Options{}.set<cbt::ReverseScanOption>(true));
for (StatusOr<cbt::Row>& row : reader) {
if (!row) throw std::move(row).status();
PrintRow(*row);
}
}
//! [reverse scan] [END bigtable_reverse_scan]
(std::move(table));
}

void RunAll(std::vector<std::string> const& argv) {
namespace examples = ::google::cloud::bigtable::examples;
namespace cbt = ::google::cloud::bigtable;
Expand Down Expand Up @@ -391,6 +412,10 @@ void RunAll(std::vector<std::string> const& argv) {
ReadFilter(table, {});
std::cout << "Running ReadRowsWithLimit() example" << std::endl;
ReadRowsWithLimit(table, {"5"});
if (!google::cloud::bigtable::examples::UsingEmulator()) {
std::cout << "Running ReadRowsReverse() example" << std::endl;
ReadRowsReverse(table, {});
}

std::cout << "Running ReadKeySet() example" << std::endl;
ReadKeysSet({table.project_id(), table.instance_id(), table.table_id(),
Expand All @@ -415,6 +440,7 @@ int main(int argc, char* argv[]) try {
MakeCommandEntry("read-row-ranges", {}, ReadRowRanges),
MakeCommandEntry("read-row-prefix", {}, ReadRowPrefix),
MakeCommandEntry("read-filter", {}, ReadFilter),
MakeCommandEntry("read-rows-reverse", {}, ReadRowsReverse),
{"auto", RunAll},
};

Expand Down
4 changes: 2 additions & 2 deletions google/cloud/bigtable/internal/data_connection_impl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -178,7 +178,7 @@ bigtable::RowReader DataConnectionImpl::ReadRowsFull(
auto impl = std::make_shared<DefaultRowReader>(
stub_, std::move(params.app_profile_id), std::move(params.table_name),
std::move(params.row_set), params.rows_limit, std::move(params.filter),
false, retry_policy(), backoff_policy());
params.reverse, retry_policy(), backoff_policy());
return MakeRowReader(std::move(impl));
}

Expand Down Expand Up @@ -374,7 +374,7 @@ void DataConnectionImpl::AsyncReadRows(
std::function<future<bool>(bigtable::Row)> on_row,
std::function<void(Status)> on_finish, bigtable::RowSet row_set,
std::int64_t rows_limit, bigtable::Filter filter) {
auto reverse = false;
auto reverse = internal::CurrentOptions().get<bigtable::ReverseScanOption>();
bigtable_internal::AsyncRowReader::Create(
background_->cq(), stub_, app_profile_id(), table_name, std::move(on_row),
std::move(on_finish), std::move(row_set), rows_limit, std::move(filter),
Expand Down
49 changes: 47 additions & 2 deletions google/cloud/bigtable/internal/data_connection_impl_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
#include "google/cloud/bigtable/internal/data_connection_impl.h"
#include "google/cloud/bigtable/data_connection.h"
#include "google/cloud/bigtable/internal/defaults.h"
#include "google/cloud/bigtable/options.h"
#include "google/cloud/bigtable/testing/mock_bigtable_stub.h"
#include "google/cloud/bigtable/testing/mock_policies.h"
#include "google/cloud/common_options.h"
Expand All @@ -40,6 +41,7 @@ using ::google::cloud::bigtable::DataBackoffPolicyOption;
using ::google::cloud::bigtable::DataLimitedErrorCountRetryPolicy;
using ::google::cloud::bigtable::DataRetryPolicyOption;
using ::google::cloud::bigtable::IdempotentMutationPolicyOption;
using ::google::cloud::bigtable::ReverseScanOption;
using ::google::cloud::bigtable::testing::MockAsyncReadRowsStream;
using ::google::cloud::bigtable::testing::MockBigtableStub;
using ::google::cloud::bigtable::testing::MockIdempotentMutationPolicy;
Expand Down Expand Up @@ -705,6 +707,23 @@ TEST(DataConnectionTest, ReadRows) {
EXPECT_EQ(reader.begin(), reader.end());
}

TEST(DataConnectionTest, ReadRowsReverseScan) {
auto mock = std::make_shared<MockBigtableStub>();
EXPECT_CALL(*mock, ReadRows)
.WillOnce([](auto, google::bigtable::v2::ReadRowsRequest const& request) {
EXPECT_TRUE(request.reversed());

auto stream = std::make_unique<MockReadRowsStream>();
EXPECT_CALL(*stream, Read).WillOnce(Return(Status()));
return stream;
});

auto conn = TestConnection(std::move(mock));
internal::OptionsSpan span(CallOptions().set<ReverseScanOption>(true));
auto reader = conn->ReadRows(kTableName, TestRowSet(), 42, TestFilter());
EXPECT_EQ(reader.begin(), reader.end());
}

// The DefaultRowReader is tested extensively in `default_row_reader_test.cc`.
// In this test, we just verify that the configuration is passed along.
TEST(DataConnectionTest, ReadRowsFull) {
Expand All @@ -716,7 +735,7 @@ TEST(DataConnectionTest, ReadRowsFull) {
EXPECT_EQ(42, request.rows_limit());
EXPECT_THAT(request, HasTestRowSet());
EXPECT_THAT(request.filter(), IsTestFilter());
EXPECT_FALSE(request.reversed());
EXPECT_TRUE(request.reversed());

auto stream = std::make_unique<MockReadRowsStream>();
EXPECT_CALL(*stream, Read).WillOnce(Return(Status()));
Expand All @@ -726,7 +745,7 @@ TEST(DataConnectionTest, ReadRowsFull) {
auto conn = TestConnection(std::move(mock));
internal::OptionsSpan span(CallOptions());
auto reader = conn->ReadRowsFull(bigtable::ReadRowsParams{
kTableName, kAppProfile, TestRowSet(), 42, TestFilter()});
kTableName, kAppProfile, TestRowSet(), 42, TestFilter(), true});
EXPECT_EQ(reader.begin(), reader.end());
}

Expand Down Expand Up @@ -1478,6 +1497,32 @@ TEST(DataConnectionTest, AsyncReadRows) {
TestFilter());
}

TEST(DataConnectionTest, AsyncReadRowsReverseScan) {
auto mock = std::make_shared<MockBigtableStub>();
EXPECT_CALL(*mock, AsyncReadRows)
.WillOnce(
[](CompletionQueue const&, auto, v2::ReadRowsRequest const& request) {
EXPECT_TRUE(request.reversed());
using ErrorStream =
internal::AsyncStreamingReadRpcError<v2::ReadRowsResponse>;
return std::make_unique<ErrorStream>(PermanentError());
});

MockFunction<future<bool>(bigtable::Row const&)> on_row;
EXPECT_CALL(on_row, Call).Times(0);

MockFunction<void(Status)> on_finish;
EXPECT_CALL(on_finish, Call).WillOnce([](Status const& status) {
EXPECT_THAT(status, StatusIs(StatusCode::kPermissionDenied));
});

auto conn = TestConnection(std::move(mock));
internal::OptionsSpan span(CallOptions().set<ReverseScanOption>(true));
conn->AsyncReadRows(kTableName, on_row.AsStdFunction(),
on_finish.AsStdFunction(), TestRowSet(), 42,
TestFilter());
}

TEST(DataConnectionTest, AsyncReadRowEmpty) {
auto mock = std::make_shared<MockBigtableStub>();
EXPECT_CALL(*mock, AsyncReadRows)
Expand Down
22 changes: 22 additions & 0 deletions google/cloud/bigtable/options.h
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,28 @@ struct AppProfileIdOption {
using Type = std::string;
};

/**
* Read rows in reverse order.
*
* The rows will be streamed in reverse lexicographic order of the keys. This is
* particularly useful to get the last N records before a key.
*
* This option does not affect the contents of the rows, just the order that
* the rows are returned.
*
* @note When using this option, the order of row keys in a `bigtable::RowRange`
* does not change. The row keys still must be supplied in lexicographic order.
*
* @snippet read_snippets.cc reverse scan
*
* @see https://cloud.google.com/bigtable/docs/reads#reverse-scan
*
* @ingroup bigtable-options
*/
struct ReverseScanOption {
using Type = bool;
};

/**
* The endpoint for data operations.
*
Expand Down
6 changes: 6 additions & 0 deletions google/cloud/bigtable/tests/data_integration_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -198,6 +198,12 @@ TEST_P(DataIntegrationTest, TableReadRowsAllRows) {

auto read4 = table.ReadRows(RowSet(), Filter::PassAllFilter());
CheckEqualUnordered(created, MoveCellsFromReader(read4));

if (GetParam() == "with-data-connection" && !UsingCloudBigtableEmulator()) {
auto read5 = table.ReadRows(RowSet(), Filter::PassAllFilter(),
Options{}.set<ReverseScanOption>(true));
CheckEqualUnordered(created, MoveCellsFromReader(read5));
}
}

TEST_P(DataIntegrationTest, TableReadRowsPartialRows) {
Expand Down

0 comments on commit 286a6eb

Please sign in to comment.