Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

impl(bigtable): add MutateRowsLimiter #13079

Merged
merged 2 commits into from
Nov 9, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions google/cloud/bigtable/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -193,6 +193,8 @@ add_library(
internal/legacy_row_reader.h
internal/logging_data_client.cc
internal/logging_data_client.h
internal/mutate_rows_limiter.cc
internal/mutate_rows_limiter.h
internal/prefix_range_end.cc
internal/prefix_range_end.h
internal/rate_limiter.cc
Expand Down Expand Up @@ -386,6 +388,7 @@ if (BUILD_TESTING)
internal/legacy_bulk_mutator_test.cc
internal/legacy_row_reader_test.cc
internal/logging_data_client_test.cc
internal/mutate_rows_limiter_test.cc
internal/prefix_range_end_test.cc
internal/rate_limiter_test.cc
internal/retry_traits_test.cc
Expand Down
1 change: 1 addition & 0 deletions google/cloud/bigtable/bigtable_client_unit_tests.bzl
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,7 @@ bigtable_client_unit_tests = [
"internal/legacy_bulk_mutator_test.cc",
"internal/legacy_row_reader_test.cc",
"internal/logging_data_client_test.cc",
"internal/mutate_rows_limiter_test.cc",
"internal/prefix_range_end_test.cc",
"internal/rate_limiter_test.cc",
"internal/retry_traits_test.cc",
Expand Down
2 changes: 2 additions & 0 deletions google/cloud/bigtable/google_cloud_cpp_bigtable.bzl
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,7 @@ google_cloud_cpp_bigtable_hdrs = [
"internal/legacy_async_row_sampler.h",
"internal/legacy_row_reader.h",
"internal/logging_data_client.h",
"internal/mutate_rows_limiter.h",
"internal/prefix_range_end.h",
"internal/rate_limiter.h",
"internal/readrowsparser.h",
Expand Down Expand Up @@ -196,6 +197,7 @@ google_cloud_cpp_bigtable_srcs = [
"internal/legacy_async_row_sampler.cc",
"internal/legacy_row_reader.cc",
"internal/logging_data_client.cc",
"internal/mutate_rows_limiter.cc",
"internal/prefix_range_end.cc",
"internal/rate_limiter.cc",
"internal/readrowsparser.cc",
Expand Down
68 changes: 68 additions & 0 deletions google/cloud/bigtable/internal/mutate_rows_limiter.cc
Original file line number Diff line number Diff line change
@@ -0,0 +1,68 @@
// Copyright 2023 Google LLC
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// https://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

#include "google/cloud/bigtable/internal/mutate_rows_limiter.h"
#include "google/cloud/bigtable/options.h"
#include "google/cloud/internal/opentelemetry.h"
#include <algorithm>
#include <thread>

namespace google {
namespace cloud {
namespace bigtable_internal {
GOOGLE_CLOUD_CPP_INLINE_NAMESPACE_BEGIN
namespace {

// Restricts `value` to the closed interval [min, max].
//
// The lower bound is applied first and the upper bound second, so when the
// bounds are inverted (`max < min`) the result is `max`.
template <typename T>
T Clamp(T value, T min, T max) {
  T const at_least_min = (min < value) ? value : min;
  return (at_least_min < max) ? at_least_min : max;
}

} // namespace

// Takes one unit from the underlying rate limiter, remembers whether doing so
// involved any wait, and reports the wait to the `on_wait_` callback.
void ThrottlingMutateRowsLimiter::Acquire() {
  auto const delay = limiter_.acquire(1);
  // Latch the flag: once set, it stays set until `Update()` clears it.
  if (delay != Clock::duration::zero()) throttled_since_last_update_ = true;
  // The callback is invoked unconditionally, even for a zero delay.
  on_wait_(delay);
}

// Adjusts the limiter's period using the `RateLimitInfo` in `response`.
//
// Responses without rate limit info, and responses that arrive before
// `next_update_`, are ignored. Otherwise the next update time is pushed out by
// the period named in the response, and the limiter's period is divided by the
// (clamped) factor and then clamped to [`min_period_`, `max_period_`].
void ThrottlingMutateRowsLimiter::Update(
    google::bigtable::v2::MutateRowsResponse const& response) {
  if (!response.has_rate_limit_info()) return;

  auto const current_time = clock_->Now();
  if (current_time < next_update_) return;

  auto const& rate_limit_info = response.rate_limit_info();
  auto const& server_period = rate_limit_info.period();
  auto const hold_off = std::chrono::seconds(server_period.seconds()) +
                        std::chrono::nanoseconds(server_period.nanos());
  next_update_ =
      current_time + std::chrono::duration_cast<Clock::duration>(hold_off);

  // The effective QPS can trail the maximum QPS the limiter allows. Raising
  // the ceiling only makes sense if callers actually ran into it, i.e. if some
  // `Acquire()` call had to wait since the previous adjustment.
  bool const wants_speed_up = rate_limit_info.factor() > 1;
  if (wants_speed_up && !throttled_since_last_update_) return;
  throttled_since_last_update_ = false;

  auto const bounded_factor =
      Clamp(rate_limit_info.factor(), min_factor_, max_factor_);
  auto const scaled_period = std::chrono::duration_cast<Clock::duration>(
      limiter_.period() / bounded_factor);
  limiter_.set_period(Clamp(scaled_period, min_period_, max_period_));
}

GOOGLE_CLOUD_CPP_INLINE_NAMESPACE_END
} // namespace bigtable_internal
} // namespace cloud
} // namespace google
95 changes: 95 additions & 0 deletions google/cloud/bigtable/internal/mutate_rows_limiter.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,95 @@
// Copyright 2023 Google LLC
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// https://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

#ifndef GOOGLE_CLOUD_CPP_GOOGLE_CLOUD_BIGTABLE_INTERNAL_MUTATE_ROWS_LIMITER_H
#define GOOGLE_CLOUD_CPP_GOOGLE_CLOUD_BIGTABLE_INTERNAL_MUTATE_ROWS_LIMITER_H

#include "google/cloud/bigtable/internal/rate_limiter.h"
#include "google/cloud/internal/clock.h"
#include "google/cloud/version.h"
#include <google/bigtable/v2/bigtable.pb.h>
#include <chrono>
#include <functional>
#include <memory>

namespace google {
namespace cloud {
namespace bigtable_internal {
GOOGLE_CLOUD_CPP_INLINE_NAMESPACE_BEGIN

/**
 * A Bigtable-specific wrapper over the more generic `RateLimiter`.
 *
 * This is a pure interface. Implementations decide what `Acquire()` costs the
 * caller and how (or whether) a `MutateRowsResponse` changes future behavior.
 */
class MutateRowsLimiter {
 public:
  // Instances are used polymorphically, so destruction through a pointer to
  // this interface must run the derived class's destructor.
  virtual ~MutateRowsLimiter() = default;

  /// Acquires permission from the limiter; what this entails is up to the
  /// implementation.
  virtual void Acquire() = 0;

  /// Offers a response to the limiter so it can adjust itself.
  virtual void Update(
      google::bigtable::v2::MutateRowsResponse const& response) = 0;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should this whole thing be divorced from MutateRows() by taking a google::bigtable::v2::RateLimitInfo instead, just making it bigtable_internal::RateLimiter? RateLimitInfo might be used in others calls in the future. (I guess the mutate-rows Update() call would become conditional.)

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should this whole thing be divorced from MutateRows()... RateLimitInfo might be used in others calls in the future.

This is a good question. RateLimitInfo is defined inside MutateRowsResponse though: https://github.com/googleapis/googleapis/blob/92a6b0fc959905f68e567dd60f766167772594bd/google/bigtable/v2/bigtable.proto#L513

The Bigtable team mentioned that the ReadRows API could be worthy of throttling. But I don't think they plan to add that. This feature is really aimed at heavy write jobs.

Moreover, the flags to enable things are called mutate_rows_rate_limit[0-9]*

https://github.com/googleapis/googleapis/blob/92a6b0fc959905f68e567dd60f766167772594bd/google/bigtable/v2/feature_flags.proto#L40-L48

(I guess the mutate-rows Update() call would become conditional.)

Yeah. This was my main motivation, although it's not a big deal.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

RateLimitInfo is defined inside MutateRowsResponse though

That's not what I see, hence the suggestion.

Moreover, the flags to enable things are called mutate_rows_rate_limit[0-9]*

I don't think that's indicative of anything other than they are the flags for MutateRows(). If RateLimitInfo was used elsewhere, presumably that might get its own flags.

(I guess the mutate-rows Update() call would become conditional.)

Yeah. This was my main motivation, although it's not a big deal.

I agree that it's not a big deal, and therefore not a good main motivation. That is, it seems a shame to specialize the interface for that.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That's not what I see, hence the suggestion.

D'oh. I can't read.

Still this class is internal. We can easily change it later.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We can easily change it later.

True enough, although I think we suffer a little from we-can-change-it-later-itis.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't disagree... but observation:

It will take as much effort to change it now as it will to change it later... and we might not need to change it later.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We run into trouble when we do this a bunch, and then we need to change N of them at the same time. 🫨

};

/// A `MutateRowsLimiter` that does nothing: `Acquire()` returns immediately
/// and `Update()` ignores the response it is given.
class NoopMutateRowsLimiter : public MutateRowsLimiter {
 public:
  void Acquire() override {
    // Intentionally empty.
  }

  void Update(
      google::bigtable::v2::MutateRowsResponse const& /*response*/) override {
    // Intentionally empty.
  }
};

/// A `MutateRowsLimiter` that delegates waiting to a `RateLimiter` and adjusts
/// that limiter's period from the `RateLimitInfo` found in responses.
class ThrottlingMutateRowsLimiter : public MutateRowsLimiter {
public:
using Clock = RateLimiter::Clock;
/**
 * @param clock shared with the underlying `RateLimiter`; also queried for the
 *     current time when deciding whether an update is due.
 * @param on_wait called by `Acquire()` with the wait duration returned by the
 *     underlying limiter.
 * @param initial_period the period the underlying `RateLimiter` starts with.
 * @param min_period lower bound applied to the period computed in `Update()`.
 * @param max_period upper bound applied to the period computed in `Update()`.
 * @param min_factor lower bound applied to the factor read from a response.
 * @param max_factor upper bound applied to the factor read from a response.
 *
 * The three duration parameters accept any `std::chrono::duration`; the two
 * bounds are converted to `Clock::duration` with `duration_cast`.
 */
template <typename Rep1, typename Period1, typename Rep2, typename Period2,
typename Rep3, typename Period3>
explicit ThrottlingMutateRowsLimiter(
std::shared_ptr<Clock> clock,
std::function<void(Clock::duration)> on_wait,
std::chrono::duration<Rep1, Period1> initial_period,
std::chrono::duration<Rep2, Period2> min_period,
std::chrono::duration<Rep3, Period3> max_period, double min_factor,
double max_factor)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

TIL... do I need to change the variable to max_factor_tm ?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can you do max_factor™? 😊

// `clock_` is declared (and therefore initialized) before `limiter_` and
// `next_update_`, both of which read it below.
: clock_(std::move(clock)),
limiter_(clock_, initial_period),
on_wait_(std::move(on_wait)),
// Starting at the current time means `Update()` does not reject the first
// response for being too early (assuming the clock does not go backwards).
next_update_(clock_->Now()),
min_period_(std::chrono::duration_cast<Clock::duration>(min_period)),
max_period_(std::chrono::duration_cast<Clock::duration>(max_period)),
min_factor_(min_factor),
max_factor_(max_factor) {}

/// Takes one unit from the underlying limiter and reports the resulting wait
/// duration to the `on_wait` callback.
void Acquire() override;

/**
* As specified in:
* https://cloud.google.com/bigtable/docs/reference/data/rpc/google.bigtable.v2#google.bigtable.v2.RateLimitInfo
*
* Responses without rate limit info, or that arrive before the next scheduled
* update time, leave the limiter unchanged.
*/
void Update(
google::bigtable::v2::MutateRowsResponse const& response) override;

// Exposed for testing
Clock::duration period() const { return limiter_.period(); }

private:
// Time source for both `limiter_` and the update schedule.
std::shared_ptr<Clock> clock_;
// The generic limiter that `Acquire()` delegates to.
RateLimiter limiter_;
// Receives the wait duration from every `Acquire()` call.
std::function<void(Clock::duration)> on_wait_;
// Set when an `Acquire()` call saw a non-zero wait; cleared by `Update()` when
// it goes on to adjust the period.
bool throttled_since_last_update_ = false;
// `Update()` ignores responses while the clock reads earlier than this.
Clock::time_point next_update_;
// Bounds for the period that `Update()` installs in `limiter_`.
Clock::duration min_period_;
Clock::duration max_period_;
// Bounds for the factor that `Update()` reads from a response.
double min_factor_;
double max_factor_;
};

GOOGLE_CLOUD_CPP_INLINE_NAMESPACE_END
} // namespace bigtable_internal
} // namespace cloud
} // namespace google

#endif // GOOGLE_CLOUD_CPP_GOOGLE_CLOUD_BIGTABLE_INTERNAL_MUTATE_ROWS_LIMITER_H
Loading