Skip to content

Commit

Permalink
fix(bigtable): Use retry policy on all streams with failing mutations
Browse files Browse the repository at this point in the history
  • Loading branch information
dbolduc authored Aug 23, 2022
1 parent 819dfcf commit a4356c2
Show file tree
Hide file tree
Showing 9 changed files with 191 additions and 28 deletions.
7 changes: 2 additions & 5 deletions google/cloud/bigtable/internal/async_bulk_apply.cc
Original file line number Diff line number Diff line change
Expand Up @@ -75,18 +75,15 @@ void AsyncBulkApplier::OnRead(
}

void AsyncBulkApplier::OnFinish(Status const& status) {
auto const is_retryable = status.ok() || retry_policy_->OnFailure(status);
state_.OnFinish(status);
if (!state_.HasPendingMutations() || !is_retryable) {
if (!state_.HasPendingMutations() || !retry_policy_->OnFailure(status)) {
SetPromise();
return;
}

using TimerFuture = future<StatusOr<std::chrono::system_clock::time_point>>;

auto self = this->shared_from_this();
cq_.MakeRelativeTimer(backoff_policy_->OnCompletion())
.then([self](TimerFuture result) {
.then([self](auto result) {
if (result.get()) {
self->StartIteration();
} else {
Expand Down
61 changes: 61 additions & 0 deletions google/cloud/bigtable/internal/async_bulk_apply_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -602,6 +602,67 @@ TEST(AsyncBulkApplyTest, CurrentOptionsContinuedOnRetries) {
timer_promise.set_value(make_status_or(std::chrono::system_clock::now()));
}

// Verifies that the retry and backoff policies are consulted even when the
// stream itself finishes with an OK status but reports a failed mutation.
// The stub is expected to be called `kNumRetries + 1` times, with a backoff
// timer between attempts, and the mutation is finally reported as failed.
TEST(AsyncBulkApplyTest, RetriesOkStreamWithFailedMutations) {
  // Builds the stream returned for every attempt: it starts successfully,
  // yields one response marking entry 0 as UNAVAILABLE, signals the end of
  // the reads, and then finishes with an OK status.
  auto make_stream = [](CompletionQueue const&,
                        std::unique_ptr<grpc::ClientContext>,
                        v2::MutateRowsRequest const& request) {
    EXPECT_EQ(kAppProfile, request.app_profile_id());
    EXPECT_EQ(kTableName, request.table_name());
    EXPECT_THAT(request.entries(), ElementsAre(MatchEntry("r1")));

    auto stream = absl::make_unique<MockAsyncMutateRowsStream>();
    EXPECT_CALL(*stream, Start).WillOnce([] {
      return make_ready_future(true);
    });
    // The overall stream succeeds, but it contains a failed mutation.
    // Our retry and backoff policies should take effect.
    EXPECT_CALL(*stream, Read)
        .WillOnce([] {
          return make_ready_future(
              MakeResponse({{0, grpc::StatusCode::UNAVAILABLE}}));
        })
        .WillOnce([] {
          return make_ready_future(absl::optional<v2::MutateRowsResponse>{});
        });
    EXPECT_CALL(*stream, Finish).WillOnce([] {
      return make_ready_future(Status());
    });
    return stream;
  };

  auto stub = std::make_shared<MockBigtableStub>();
  EXPECT_CALL(*stub, AsyncMutateRows)
      .Times(kNumRetries + 1)
      .WillRepeatedly(make_stream);

  // One backoff timer is expected per retry; each one is already satisfied.
  auto cq_impl = std::make_shared<MockCompletionQueueImpl>();
  EXPECT_CALL(*cq_impl, MakeRelativeTimer)
      .Times(kNumRetries)
      .WillRepeatedly([] {
        return make_ready_future(
            make_status_or(std::chrono::system_clock::now()));
      });
  CompletionQueue cq(cq_impl);

  auto retry_policy = DataLimitedErrorCountRetryPolicy(kNumRetries).clone();
  auto backoff_policy = absl::make_unique<MockBackoffPolicy>();
  EXPECT_CALL(*backoff_policy, OnCompletion).Times(kNumRetries);
  auto idempotency_policy = bigtable::DefaultIdempotentMutationPolicy();

  // The gRPC setup hook should run once per attempt.
  MockFunction<void(grpc::ClientContext&)> setup_hook;
  EXPECT_CALL(setup_hook, Call).Times(kNumRetries + 1);
  internal::OptionsSpan span(
      Options{}.set<internal::GrpcSetupOption>(setup_hook.AsStdFunction()));

  bigtable::BulkMutation mutation(IdempotentMutation("r1"));
  auto pending = AsyncBulkApplier::Create(
      cq, stub, std::move(retry_policy), std::move(backoff_policy),
      *idempotency_policy, kAppProfile, kTableName, std::move(mutation));

  std::vector<bigtable::FailedMutation> const expected_failures = {
      {Status(StatusCode::kUnavailable, "try again"), 0}};
  CheckFailedMutations(pending.get(), expected_failures);
}

} // namespace
GOOGLE_CLOUD_CPP_INLINE_NAMESPACE_END
} // namespace bigtable_internal
Expand Down
6 changes: 3 additions & 3 deletions google/cloud/bigtable/internal/data_connection_impl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -153,15 +153,15 @@ std::vector<bigtable::FailedMutation> DataConnectionImpl::BulkApply(
// micro-optimization.
std::unique_ptr<bigtable::DataRetryPolicy> retry;
std::unique_ptr<BackoffPolicy> backoff;
do {
while (true) {
auto status = mutator.MakeOneRequest(*stub_);
if (status.ok()) continue;
if (!mutator.HasPendingMutations()) break;
if (!retry) retry = retry_policy();
if (!retry->OnFailure(status)) break;
if (!backoff) backoff = backoff_policy();
auto delay = backoff->OnCompletion();
std::this_thread::sleep_for(delay);
} while (mutator.HasPendingMutations());
}
return std::move(mutator).OnRetryDone();
}

Expand Down
37 changes: 37 additions & 0 deletions google/cloud/bigtable/internal/data_connection_impl_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -628,6 +628,43 @@ TEST(DataConnectionTest, BulkApplyNoSleepIfNoPendingMutations) {
(void)conn->BulkApply(kTableName, std::move(mut));
}

// Verifies that `DataConnectionImpl::BulkApply` applies the retry and backoff
// policies when a stream ends with an OK status but carries a failed
// mutation: `kNumRetries + 1` requests are expected, with `kNumRetries`
// backoff computations, and the mutation is reported as failed.
TEST(DataConnectionTest, BulkApplyRetriesOkStreamWithFailedMutations) {
  // Builds the stream for each attempt: one response marking entry 0 as
  // UNAVAILABLE, followed by an OK status that closes the stream.
  auto make_stream = [](std::unique_ptr<grpc::ClientContext>,
                        google::bigtable::v2::MutateRowsRequest const& request) {
    EXPECT_EQ(kAppProfile, request.app_profile_id());
    EXPECT_EQ(kTableName, request.table_name());
    auto stream = absl::make_unique<MockMutateRowsStream>();
    // The overall stream succeeds, but it contains failed mutations.
    // Our retry and backoff policies should take effect.
    EXPECT_CALL(*stream, Read)
        .WillOnce(Return(
            MakeBulkApplyResponse({{0, grpc::StatusCode::UNAVAILABLE}})))
        .WillOnce(Return(Status()));
    return stream;
  };

  auto stub = std::make_shared<MockBigtableStub>();
  EXPECT_CALL(*stub, MutateRows)
      .Times(kNumRetries + 1)
      .WillRepeatedly(make_stream);

  // The policy installed in the options is expected to be cloned once; the
  // clone is the object whose `OnCompletion` is expected before each retry.
  auto backoff_prototype = absl::make_unique<MockBackoffPolicy>();
  EXPECT_CALL(*backoff_prototype, clone).WillOnce([]() {
    auto cloned = absl::make_unique<MockBackoffPolicy>();
    EXPECT_CALL(*cloned, OnCompletion).Times(kNumRetries);
    return cloned;
  });

  auto conn = TestConnection(std::move(stub));
  internal::OptionsSpan span(
      CallOptions().set<DataBackoffPolicyOption>(std::move(backoff_prototype)));

  bigtable::BulkMutation mutation(IdempotentMutation("r1"));
  auto failures = conn->BulkApply(kTableName, std::move(mutation));

  std::vector<bigtable::FailedMutation> const expected_failures = {
      {Status(StatusCode::kUnavailable, "try again"), 0}};
  CheckFailedMutations(failures, expected_failures);
}

// The `AsyncBulkApplier` is tested extensively in `async_bulk_apply_test.cc`.
// In this test, we just verify that the configuration is passed along.
TEST(DataConnectionTest, AsyncBulkApply) {
Expand Down
7 changes: 2 additions & 5 deletions google/cloud/bigtable/internal/legacy_async_bulk_apply.cc
Original file line number Diff line number Diff line change
Expand Up @@ -80,18 +80,15 @@ void AsyncRetryBulkApply::OnRead(
}

void AsyncRetryBulkApply::OnFinish(CompletionQueue cq, Status const& status) {
auto const is_retryable = status.ok() || rpc_retry_policy_->OnFailure(status);
state_.OnFinish(status);
if (!state_.HasPendingMutations() || !is_retryable) {
if (!state_.HasPendingMutations() || !rpc_retry_policy_->OnFailure(status)) {
SetPromise();
return;
}

using TimerFuture = future<StatusOr<std::chrono::system_clock::time_point>>;

auto self = this->shared_from_this();
cq.MakeRelativeTimer(rpc_backoff_policy_->OnCompletion(status))
.then([self, cq](TimerFuture result) {
.then([self, cq](auto result) {
if (result.get()) {
self->StartIteration(std::move(cq));
} else {
Expand Down
50 changes: 50 additions & 0 deletions google/cloud/bigtable/internal/legacy_async_bulk_apply_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -344,6 +344,56 @@ TEST_F(AsyncBulkApplyTest, TooManyFailures) {
EXPECT_TRUE(cq_impl_->empty());
}

// Verifies that the legacy `AsyncRetryBulkApply` counts an OK stream that
// contains a failed mutation against the retry policy, so that the operation
// gives up after `kErrorCount + 1` attempts instead of retrying indefinitely.
TEST_F(AsyncBulkApplyTest, RetryPolicyUsedForOkStreamsWithFailedMutations) {
  // We give up on the 3rd error.
  auto constexpr kErrorCount = 2;

  // Builds the reader for each attempt: the first read reports entry 0 as
  // UNAVAILABLE, the second read adds nothing, and the stream finishes OK.
  auto make_reader = [](grpc::ClientContext*,
                        btproto::MutateRowsRequest const&,
                        grpc::CompletionQueue*) {
    auto reader = absl::make_unique<
        MockClientAsyncReaderInterface<btproto::MutateRowsResponse>>();
    EXPECT_CALL(*reader, Read)
        .WillOnce([](btproto::MutateRowsResponse* response, void*) {
          auto* entry = response->add_entries();
          entry->set_index(0);
          entry->mutable_status()->set_code(grpc::StatusCode::UNAVAILABLE);
        })
        .WillOnce([](btproto::MutateRowsResponse*, void*) {});
    EXPECT_CALL(*reader, Finish).WillOnce([](grpc::Status* status, void*) {
      *status = grpc::Status::OK;
    });
    EXPECT_CALL(*reader, StartCall);
    return reader;
  };

  EXPECT_CALL(*client_, PrepareAsyncMutateRows)
      .Times(kErrorCount + 1)
      .WillRepeatedly(make_reader);

  bigtable::BulkMutation mutation{bigtable::SingleRowMutation(
      "row", {bigtable::SetCell("f", "c", 0_ms, "v2")})};

  auto retry_policy = LimitedErrorCountRetryPolicy(kErrorCount);
  auto pending = internal::AsyncRetryBulkApply::Create(
      cq_, retry_policy.clone(), rpc_backoff_policy_->clone(),
      *idempotent_mutation_policy_, metadata_update_policy_, client_,
      "my-app-profile", "my-table", std::move(mutation));

  // Each tolerated failure is followed by a backoff timer, after which the
  // next attempt should be queued.
  for (int attempt = 0; attempt < kErrorCount; ++attempt) {
    SimulateIteration();
    // simulate the backoff timer
    cq_impl_->SimulateCompletion(true);
    ASSERT_EQ(1U, cq_impl_->size());
  }

  // The final attempt exhausts the retry policy.
  SimulateIteration();

  auto failures = StatusOnly(pending.get());
  EXPECT_THAT(failures, ElementsAre(StatusIs(StatusCode::kUnavailable)));

  // Nothing should remain scheduled once the operation has completed.
  ASSERT_EQ(0U, cq_impl_->size());
  EXPECT_TRUE(cq_impl_->empty());
}

TEST_F(AsyncBulkApplyTest, UsesBackoffPolicy) {
bigtable::BulkMutation mut{
bigtable::SingleRowMutation("foo2",
Expand Down
4 changes: 1 addition & 3 deletions google/cloud/bigtable/table.cc
Original file line number Diff line number Diff line change
Expand Up @@ -181,10 +181,8 @@ std::vector<FailedMutation> Table::BulkApply(BulkMutation mut, Options opts) {
retry_policy->Setup(client_context);
metadata_update_policy_.Setup(client_context);
status = mutator.MakeOneRequest(*client_, client_context);
if (!status.ok() && !retry_policy->OnFailure(status)) {
break;
}
if (!mutator.HasPendingMutations()) break;
if (!retry_policy->OnFailure(status)) break;
auto delay = backoff_policy->OnCompletion(status);
std::this_thread::sleep_for(delay);
}
Expand Down
12 changes: 0 additions & 12 deletions google/cloud/bigtable/table.h
Original file line number Diff line number Diff line change
Expand Up @@ -472,12 +472,6 @@ class Table {
* It is possible that some mutations may not be attempted at all. These
* mutations are considered failing and will be returned.
*
* @note The retry policy is only impacted by the result of the gRPC stream.
* Let's say you have a `LimitedErrorCountRetryPolicy` of 2. If an
* idempotent mutation fails with a retryable error and the stream itself
* succeeds, it may be retried more than 2 times. Only when the stream
* fails twice will we give up and consider the mutation to be failed.
*
* @note This function takes ownership (and then discards) the data in the
* mutation. In general, a `BulkMutation` can modify multiple rows, and
* the modifications for each row can change (or create) multiple cells,
Expand Down Expand Up @@ -517,12 +511,6 @@ class Table {
* It is possible that some mutations may not be attempted at all. These
* mutations are considered failing and will be returned.
*
* @note The retry policy is only impacted by the result of the gRPC stream.
* Let's say you have a `LimitedErrorCountRetryPolicy` of 2. If an
* idempotent mutation fails with a retryable error and the stream itself
* succeeds, it may be retried more than 2 times. Only when the stream
* fails twice will we give up and consider the mutation to be failed.
*
* @note This function takes ownership (and then discards) the data in the
* mutation. In general, a `BulkMutation` can modify multiple rows, and
* the modifications for each row can change (or create) multiple cells,
Expand Down
35 changes: 35 additions & 0 deletions google/cloud/bigtable/table_bulk_apply_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -249,6 +249,41 @@ TEST_F(TableBulkApplyTest, TooManyFailures) {
failures.front().status().code());
}

/// @test Verify that Table::BulkApply() counts an OK stream containing a
/// failed mutation against the retry policy, and eventually gives up.
TEST_F(TableBulkApplyTest, RetryPolicyUsedForOkStreamWithFailedMutations) {
  // Create a table with specific policies so we can test the behavior
  // without having to depend on timers expiring. The retry policy is
  // configured with a limit of 2, and exactly 3 requests are expected below:
  // the 3rd failure ends the loop.
  Table custom_table(
      client_, "foo_table",
      // Configure the Table to stop on the 3rd failure.
      LimitedErrorCountRetryPolicy(2),
      // Use much shorter backoff than the default to test faster.
      ExponentialBackoffPolicy(10_us, 40_us));

  // Every attempt gets a stream whose only response marks entry 0 as
  // UNAVAILABLE, while the stream itself finishes with an OK status.
  auto create_stream = [&](grpc::ClientContext*,
                           btproto::MutateRowsRequest const&) {
    auto stream = absl::make_unique<MockMutateRowsReader>(
        "google.bigtable.v2.Bigtable.MutateRows");
    EXPECT_CALL(*stream, Read)
        .WillOnce([](btproto::MutateRowsResponse* r) {
          auto& e0 = *r->add_entries();
          e0.set_index(0);
          e0.mutable_status()->set_code(grpc::StatusCode::UNAVAILABLE);
          return true;
        })
        .WillOnce(Return(false));
    EXPECT_CALL(*stream, Finish()).WillOnce(Return(grpc::Status::OK));
    return stream;
  };

  EXPECT_CALL(*client_, MutateRows).Times(3).WillRepeatedly(create_stream);

  auto failures = custom_table.BulkApply(BulkMutation(
      SingleRowMutation("bar", {SetCell("fam", "col", 0_ms, "qux")})));
  // Use a fatal assertion: calling `front()` on an empty vector is undefined
  // behavior, so the test must stop here if no failures were reported.
  ASSERT_FALSE(failures.empty());
  EXPECT_EQ(StatusCode::kUnavailable, failures.front().status().code());
}

/// @test Verify that Table::BulkApply() retries only idempotent mutations.
TEST_F(TableBulkApplyTest, RetryOnlyIdempotent) {
// We will send both idempotent and non-idempotent mutations. We prepare the
Expand Down

0 comments on commit a4356c2

Please sign in to comment.