Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(bigtable): reverse scans #12022

Merged
merged 2 commits into from
Jul 5, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 7 additions & 2 deletions google/cloud/bigtable/data_connection.cc
Original file line number Diff line number Diff line change
Expand Up @@ -77,10 +77,15 @@ future<std::vector<FailedMutation>> DataConnection::AsyncBulkApply(
RowReader DataConnection::ReadRows(std::string const& table_name,
RowSet row_set, std::int64_t rows_limit,
Filter filter) {
auto const& options = google::cloud::internal::CurrentOptions();
return ReadRowsFull(ReadRowsParams{
std::move(table_name),
google::cloud::internal::CurrentOptions().get<AppProfileIdOption>(),
std::move(row_set), rows_limit, std::move(filter)});
options.get<AppProfileIdOption>(),
std::move(row_set),
rows_limit,
std::move(filter),
options.get<ReverseScanOption>(),
});
}

// NOLINTNEXTLINE(performance-unnecessary-value-param)
Expand Down
1 change: 1 addition & 0 deletions google/cloud/bigtable/data_connection.h
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@ struct ReadRowsParams {
RowSet row_set;
std::int64_t rows_limit;
Filter filter = Filter::PassAllFilter();
bool reverse = false;
};

/**
Expand Down
26 changes: 26 additions & 0 deletions google/cloud/bigtable/examples/read_snippets.cc
Original file line number Diff line number Diff line change
Expand Up @@ -334,6 +334,27 @@ void ReadFilter(google::cloud::bigtable::Table table,
(std::move(table));
}

void ReadRowsReverse(google::cloud::bigtable::Table table,
std::vector<std::string> const&) {
//! [reverse scan] [START bigtable_reverse_scan]
namespace cbt = ::google::cloud::bigtable;
using ::google::cloud::Options;
using ::google::cloud::StatusOr;
[](cbt::Table table) {
// Read and print the rows.
auto reader = table.ReadRows(
cbt::RowRange::RightOpen("phone#5c10102", "phone#5c10103"), 3,
cbt::Filter::PassAllFilter(),
Options{}.set<cbt::ReverseScanOption>(true));
for (StatusOr<cbt::Row>& row : reader) {
if (!row) throw std::move(row).status();
PrintRow(*row);
}
}
//! [reverse scan] [END bigtable_reverse_scan]
(std::move(table));
}

void RunAll(std::vector<std::string> const& argv) {
namespace examples = ::google::cloud::bigtable::examples;
namespace cbt = ::google::cloud::bigtable;
Expand Down Expand Up @@ -391,6 +412,10 @@ void RunAll(std::vector<std::string> const& argv) {
ReadFilter(table, {});
std::cout << "Running ReadRowsWithLimit() example" << std::endl;
ReadRowsWithLimit(table, {"5"});
if (!google::cloud::bigtable::examples::UsingEmulator()) {
std::cout << "Running ReadRowsReverse() example" << std::endl;
ReadRowsReverse(table, {});
}

std::cout << "Running ReadKeySet() example" << std::endl;
ReadKeysSet({table.project_id(), table.instance_id(), table.table_id(),
Expand All @@ -415,6 +440,7 @@ int main(int argc, char* argv[]) try {
MakeCommandEntry("read-row-ranges", {}, ReadRowRanges),
MakeCommandEntry("read-row-prefix", {}, ReadRowPrefix),
MakeCommandEntry("read-filter", {}, ReadFilter),
MakeCommandEntry("read-rows-reverse", {}, ReadRowsReverse),
{"auto", RunAll},
};

Expand Down
4 changes: 2 additions & 2 deletions google/cloud/bigtable/internal/data_connection_impl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -178,7 +178,7 @@ bigtable::RowReader DataConnectionImpl::ReadRowsFull(
auto impl = std::make_shared<DefaultRowReader>(
stub_, std::move(params.app_profile_id), std::move(params.table_name),
std::move(params.row_set), params.rows_limit, std::move(params.filter),
false, retry_policy(), backoff_policy());
params.reverse, retry_policy(), backoff_policy());
return MakeRowReader(std::move(impl));
}

Expand Down Expand Up @@ -374,7 +374,7 @@ void DataConnectionImpl::AsyncReadRows(
std::function<future<bool>(bigtable::Row)> on_row,
std::function<void(Status)> on_finish, bigtable::RowSet row_set,
std::int64_t rows_limit, bigtable::Filter filter) {
auto reverse = false;
auto reverse = internal::CurrentOptions().get<bigtable::ReverseScanOption>();
bigtable_internal::AsyncRowReader::Create(
background_->cq(), stub_, app_profile_id(), table_name, std::move(on_row),
std::move(on_finish), std::move(row_set), rows_limit, std::move(filter),
Expand Down
49 changes: 47 additions & 2 deletions google/cloud/bigtable/internal/data_connection_impl_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
#include "google/cloud/bigtable/internal/data_connection_impl.h"
#include "google/cloud/bigtable/data_connection.h"
#include "google/cloud/bigtable/internal/defaults.h"
#include "google/cloud/bigtable/options.h"
#include "google/cloud/bigtable/testing/mock_bigtable_stub.h"
#include "google/cloud/bigtable/testing/mock_policies.h"
#include "google/cloud/common_options.h"
Expand All @@ -40,6 +41,7 @@ using ::google::cloud::bigtable::DataBackoffPolicyOption;
using ::google::cloud::bigtable::DataLimitedErrorCountRetryPolicy;
using ::google::cloud::bigtable::DataRetryPolicyOption;
using ::google::cloud::bigtable::IdempotentMutationPolicyOption;
using ::google::cloud::bigtable::ReverseScanOption;
using ::google::cloud::bigtable::testing::MockAsyncReadRowsStream;
using ::google::cloud::bigtable::testing::MockBigtableStub;
using ::google::cloud::bigtable::testing::MockIdempotentMutationPolicy;
Expand Down Expand Up @@ -705,6 +707,23 @@ TEST(DataConnectionTest, ReadRows) {
EXPECT_EQ(reader.begin(), reader.end());
}

TEST(DataConnectionTest, ReadRowsReverseScan) {
auto mock = std::make_shared<MockBigtableStub>();
EXPECT_CALL(*mock, ReadRows)
.WillOnce([](auto, google::bigtable::v2::ReadRowsRequest const& request) {
EXPECT_TRUE(request.reversed());

auto stream = std::make_unique<MockReadRowsStream>();
EXPECT_CALL(*stream, Read).WillOnce(Return(Status()));
return stream;
});

auto conn = TestConnection(std::move(mock));
internal::OptionsSpan span(CallOptions().set<ReverseScanOption>(true));
auto reader = conn->ReadRows(kTableName, TestRowSet(), 42, TestFilter());
EXPECT_EQ(reader.begin(), reader.end());
}

// The DefaultRowReader is tested extensively in `default_row_reader_test.cc`.
// In this test, we just verify that the configuration is passed along.
TEST(DataConnectionTest, ReadRowsFull) {
Expand All @@ -716,7 +735,7 @@ TEST(DataConnectionTest, ReadRowsFull) {
EXPECT_EQ(42, request.rows_limit());
EXPECT_THAT(request, HasTestRowSet());
EXPECT_THAT(request.filter(), IsTestFilter());
EXPECT_FALSE(request.reversed());
EXPECT_TRUE(request.reversed());

auto stream = std::make_unique<MockReadRowsStream>();
EXPECT_CALL(*stream, Read).WillOnce(Return(Status()));
Expand All @@ -726,7 +745,7 @@ TEST(DataConnectionTest, ReadRowsFull) {
auto conn = TestConnection(std::move(mock));
internal::OptionsSpan span(CallOptions());
auto reader = conn->ReadRowsFull(bigtable::ReadRowsParams{
kTableName, kAppProfile, TestRowSet(), 42, TestFilter()});
kTableName, kAppProfile, TestRowSet(), 42, TestFilter(), true});
EXPECT_EQ(reader.begin(), reader.end());
}

Expand Down Expand Up @@ -1478,6 +1497,32 @@ TEST(DataConnectionTest, AsyncReadRows) {
TestFilter());
}

TEST(DataConnectionTest, AsyncReadRowsReverseScan) {
auto mock = std::make_shared<MockBigtableStub>();
EXPECT_CALL(*mock, AsyncReadRows)
.WillOnce(
[](CompletionQueue const&, auto, v2::ReadRowsRequest const& request) {
EXPECT_TRUE(request.reversed());
using ErrorStream =
internal::AsyncStreamingReadRpcError<v2::ReadRowsResponse>;
return std::make_unique<ErrorStream>(PermanentError());
});

MockFunction<future<bool>(bigtable::Row const&)> on_row;
EXPECT_CALL(on_row, Call).Times(0);

MockFunction<void(Status)> on_finish;
EXPECT_CALL(on_finish, Call).WillOnce([](Status const& status) {
EXPECT_THAT(status, StatusIs(StatusCode::kPermissionDenied));
});

auto conn = TestConnection(std::move(mock));
internal::OptionsSpan span(CallOptions().set<ReverseScanOption>(true));
conn->AsyncReadRows(kTableName, on_row.AsStdFunction(),
on_finish.AsStdFunction(), TestRowSet(), 42,
TestFilter());
}

TEST(DataConnectionTest, AsyncReadRowEmpty) {
auto mock = std::make_shared<MockBigtableStub>();
EXPECT_CALL(*mock, AsyncReadRows)
Expand Down
22 changes: 22 additions & 0 deletions google/cloud/bigtable/options.h
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,28 @@ struct AppProfileIdOption {
using Type = std::string;
};

/**
* Read rows in reverse order.
*
* The rows will be streamed in reverse lexicographic order of the keys. This is
* particularly useful to get the last N records before a key.
*
* This option does not affect the contents of the rows, just the order that
* the rows are returned.
*
* @note When using this option, the order of row keys in a `bigtable::RowRange`
* does not change. The row keys still must be supplied in lexicographic order.
*
* @snippet read_snippets.cc reverse scan
*
* @see https://cloud.google.com/bigtable/docs/reads#reverse-scan
*
* @ingroup bigtable-options
*/
struct ReverseScanOption {
using Type = bool;
};

/**
* The endpoint for data operations.
*
Expand Down
6 changes: 6 additions & 0 deletions google/cloud/bigtable/tests/data_integration_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -198,6 +198,12 @@ TEST_P(DataIntegrationTest, TableReadRowsAllRows) {

auto read4 = table.ReadRows(RowSet(), Filter::PassAllFilter());
CheckEqualUnordered(created, MoveCellsFromReader(read4));

if (GetParam() == "with-data-connection" && !UsingCloudBigtableEmulator()) {
auto read5 = table.ReadRows(RowSet(), Filter::PassAllFilter(),
Options{}.set<ReverseScanOption>(true));
CheckEqualUnordered(created, MoveCellsFromReader(read5));
}
}

TEST_P(DataIntegrationTest, TableReadRowsPartialRows) {
Expand Down