diff --git a/google/cloud/bigtable/CMakeLists.txt b/google/cloud/bigtable/CMakeLists.txt index 2bb9bd06b3e53..baf03e643a99b 100644 --- a/google/cloud/bigtable/CMakeLists.txt +++ b/google/cloud/bigtable/CMakeLists.txt @@ -195,6 +195,8 @@ add_library( internal/logging_data_client.h internal/prefix_range_end.cc internal/prefix_range_end.h + internal/rate_limiter.cc + internal/rate_limiter.h internal/readrowsparser.cc internal/readrowsparser.h internal/retry_traits.h @@ -385,6 +387,7 @@ if (BUILD_TESTING) internal/legacy_row_reader_test.cc internal/logging_data_client_test.cc internal/prefix_range_end_test.cc + internal/rate_limiter_test.cc internal/retry_traits_test.cc internal/traced_row_reader_test.cc legacy_table_test.cc diff --git a/google/cloud/bigtable/bigtable_client_unit_tests.bzl b/google/cloud/bigtable/bigtable_client_unit_tests.bzl index e0f49e81b151f..a9f9c5e1d95b5 100644 --- a/google/cloud/bigtable/bigtable_client_unit_tests.bzl +++ b/google/cloud/bigtable/bigtable_client_unit_tests.bzl @@ -60,6 +60,7 @@ bigtable_client_unit_tests = [ "internal/legacy_row_reader_test.cc", "internal/logging_data_client_test.cc", "internal/prefix_range_end_test.cc", + "internal/rate_limiter_test.cc", "internal/retry_traits_test.cc", "internal/traced_row_reader_test.cc", "legacy_table_test.cc", diff --git a/google/cloud/bigtable/google_cloud_cpp_bigtable.bzl b/google/cloud/bigtable/google_cloud_cpp_bigtable.bzl index a7218d855a646..7c47da05dd90c 100644 --- a/google/cloud/bigtable/google_cloud_cpp_bigtable.bzl +++ b/google/cloud/bigtable/google_cloud_cpp_bigtable.bzl @@ -97,6 +97,7 @@ google_cloud_cpp_bigtable_hdrs = [ "internal/legacy_row_reader.h", "internal/logging_data_client.h", "internal/prefix_range_end.h", + "internal/rate_limiter.h", "internal/readrowsparser.h", "internal/retry_traits.h", "internal/row_reader_impl.h", @@ -196,6 +197,7 @@ google_cloud_cpp_bigtable_srcs = [ "internal/legacy_row_reader.cc", "internal/logging_data_client.cc", "internal/prefix_range_end.cc", + "internal/rate_limiter.cc", "internal/readrowsparser.cc", "internal/traced_row_reader.cc", "metadata_update_policy.cc", diff --git a/google/cloud/bigtable/internal/rate_limiter.cc b/google/cloud/bigtable/internal/rate_limiter.cc new file mode 100644 index 0000000000000..bb9ffe300d941 --- /dev/null +++ b/google/cloud/bigtable/internal/rate_limiter.cc @@ -0,0 +1,36 @@ +// Copyright 2023 Google LLC +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// https://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#include "google/cloud/bigtable/internal/rate_limiter.h" +#include + +namespace google { +namespace cloud { +namespace bigtable_internal { +GOOGLE_CLOUD_CPP_INLINE_NAMESPACE_BEGIN + +RateLimiter::Clock::duration RateLimiter::acquire(std::int64_t tokens) { + auto const now = clock_->Now(); + std::lock_guard lk(mu_); + auto const wait = (std::max)(next_ - now, Clock::duration::zero()); + // We can potentially give out tokens from the last `smoothing_interval_`. + next_ = (std::max)(next_, now - smoothing_interval_); + next_ += tokens * period_; + return wait; +} + +GOOGLE_CLOUD_CPP_INLINE_NAMESPACE_END +} // namespace bigtable_internal +} // namespace cloud +} // namespace google diff --git a/google/cloud/bigtable/internal/rate_limiter.h b/google/cloud/bigtable/internal/rate_limiter.h new file mode 100644 index 0000000000000..a0c03411beebe --- /dev/null +++ b/google/cloud/bigtable/internal/rate_limiter.h @@ -0,0 +1,132 @@ +// Copyright 2023 Google LLC +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// https://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#ifndef GOOGLE_CLOUD_CPP_GOOGLE_CLOUD_BIGTABLE_INTERNAL_RATE_LIMITER_H +#define GOOGLE_CLOUD_CPP_GOOGLE_CLOUD_BIGTABLE_INTERNAL_RATE_LIMITER_H + +#include "google/cloud/internal/clock.h" +#include "google/cloud/log.h" +#include "google/cloud/version.h" +#include +#include +#include + +namespace google { +namespace cloud { +namespace bigtable_internal { +GOOGLE_CLOUD_CPP_INLINE_NAMESPACE_BEGIN + +/** + * A threadsafe interface for rate limiting. + * + * The caller needs to acquire a "token" to perform the operation under rate + * limits. This class limits the number of tokens issued per period of time, + * effectively limiting the operation rate. + * + * The caller may acquire more than one token at a time if it needs to perform a + * burst of the operation under rate limits. More tokens become available as + * time passes, with some maximum to limit the size of bursts. + * + * The allocation of resources must be a "prior reservation". That is, the + * caller must tell the `RateLimiter` how many tokens it wants to acquire + * *before* performing the operation. The `RateLimiter` will tell the caller + * when to perform the operation. + * + * The `RateLimiter` does not sleep. It is the responsibility of the caller to + * sleep. For example: + * + * @code + * auto clock = std::make_shared(); + * auto initial_period = std::chrono::milliseconds(100); + * RateLimiter limiter(clock, initial_period); + * while (MoreThingsToDo()) { + * auto wait = limiter.acquire(1); + * std::this_thread_sleep_for(wait); + * DoOneThing(); + * } + * @endcode + * + * Rate limiting does not start until after the first call to `acquire()`. + * Consider a caller asking for 100 tokens at 1 token/s. We do not want to wait + * 100s for this initial request. Instead, it goes through immediately, and the + * next request is scheduled for 100s from now. + * + * @see https://en.wikipedia.org/wiki/Flow_control_(data)#Open-loop_flow_control + */ +class RateLimiter { + public: + // Absolute time does not matter, so use a steady_clock which is guaranteed to + // increase with time. + using Clock = ::google::cloud::internal::SteadyClock; + + template + explicit RateLimiter(std::shared_ptr clock, + std::chrono::duration period) + : RateLimiter(clock, period, Clock::duration::zero()) {} + + template + explicit RateLimiter(std::shared_ptr clock, + std::chrono::duration period, + std::chrono::duration smoothing_interval) + : clock_(std::move(clock)), + smoothing_interval_(abs(smoothing_interval)), + period_(abs(period)), + next_(clock_->Now()) {} + + /** + * Returns the time to wait before performing the operation associated with + * this call. + * + * The caller can ask for multiple @p tokens, as a way to "weight" the + * operation. For example, instead of acquiring one token per request, you + * might choose to acquire one token per repeated field in a request. + */ + Clock::duration acquire(std::int64_t tokens); + + /** + * Set the period. + * + * Note that the current next_ has already been calculated. This new period + * will not apply to it. The new period will apply to every `acquire()` after + * next. + */ + template + void set_period(std::chrono::duration period) { + std::lock_guard lk(mu_); + period_ = abs(period); + } + Clock::duration period() const { return period_; } + + private: + // Note that std::chrono::abs() is not available until C++17. + template + static Clock::duration abs(std::chrono::duration d) { + return std::chrono::duration_cast( + d >= std::chrono::duration::zero() ? d : -d); + } + + std::mutex mu_; + std::shared_ptr clock_; + // Over any `smoothing_interval_`, we must average <= 1 token per `period_`. + Clock::duration smoothing_interval_; + Clock::duration period_; // ABSL_GUARDED_BY(mu_) + Clock::time_point next_; // ABSL_GUARDED_BY(mu_) +}; + +GOOGLE_CLOUD_CPP_INLINE_NAMESPACE_END +} // namespace bigtable_internal +} // namespace cloud +} // namespace google + +#endif // GOOGLE_CLOUD_CPP_GOOGLE_CLOUD_BIGTABLE_INTERNAL_RATE_LIMITER_H diff --git a/google/cloud/bigtable/internal/rate_limiter_test.cc b/google/cloud/bigtable/internal/rate_limiter_test.cc new file mode 100644 index 0000000000000..ba9b5bf62b81e --- /dev/null +++ b/google/cloud/bigtable/internal/rate_limiter_test.cc @@ -0,0 +1,189 @@ +// Copyright 2023 Google LLC +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// https://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#include "google/cloud/bigtable/internal/rate_limiter.h" +#include "google/cloud/testing_util/fake_clock.h" +#include "absl/time/time.h" +#include + +namespace google { +namespace cloud { +namespace bigtable_internal { +GOOGLE_CLOUD_CPP_INLINE_NAMESPACE_BEGIN +namespace { + +using ::google::cloud::testing_util::FakeSteadyClock; + +TEST(RateLimiter, NoWaitForInitialAcquire) { + auto clock = std::make_shared(); + RateLimiter limiter(clock, std::chrono::seconds(1)); + + auto wait = limiter.acquire(100); + EXPECT_EQ(absl::FromChrono(wait), absl::ZeroDuration()); +} + +TEST(RateLimiter, Basic) { + auto clock = std::make_shared(); + RateLimiter limiter(clock, std::chrono::seconds(1)); + + for (auto i = 0; i != 10; ++i) { + auto wait = limiter.acquire(1); + EXPECT_EQ(absl::FromChrono(wait), absl::Seconds(i)); + } + + clock->AdvanceTime(std::chrono::seconds(10)); + for (auto i = 0; i != 10; ++i) { + auto wait = limiter.acquire(1); + EXPECT_EQ(absl::FromChrono(wait), absl::ZeroDuration()); + clock->AdvanceTime(std::chrono::seconds(1)); + } +} + +TEST(RateLimiter, WaitsForEachToken) { + auto clock = std::make_shared(); + RateLimiter limiter(clock, std::chrono::seconds(1)); + + (void)limiter.acquire(10); + + auto wait = limiter.acquire(1); + EXPECT_EQ(absl::FromChrono(wait), absl::Seconds(10)); +} + +TEST(RateLimiter, StoresTokens) { + auto clock = std::make_shared(); + RateLimiter limiter(clock, std::chrono::milliseconds(500), + std::chrono::seconds(5)); + + // After 2 seconds, we should have 4 tokens banked. + clock->AdvanceTime(std::chrono::seconds(2)); + auto wait = limiter.acquire(10); + EXPECT_EQ(absl::FromChrono(wait), absl::ZeroDuration()); + + // We requested 10 tokens, with 4 tokens banked. We should have to wait 3 + // seconds to give out the remaining 6 tokens at a rate of 2 tokens per + // second. + wait = limiter.acquire(1); + EXPECT_EQ(absl::FromChrono(wait), absl::Seconds(3)); +} + +TEST(RateLimiter, StoresTokensUpToLimit) { + auto clock = std::make_shared(); + RateLimiter limiter(clock, std::chrono::seconds(1), std::chrono::seconds(10)); + + // Wait for 100 seconds. We should be able to use 10 tokens from the last 10 + // seconds of this interval. + clock->AdvanceTime(std::chrono::seconds(100)); + (void)limiter.acquire(30); + + // We should have to wait for 30 - 10 = 20 tokens. + auto wait = limiter.acquire(1); + EXPECT_EQ(absl::FromChrono(wait), absl::Seconds(20)); +} + +TEST(RateLimiter, PeriodLessThanOneSecond) { + auto clock = std::make_shared(); + RateLimiter limiter(clock, std::chrono::milliseconds(100)); + + auto wait = limiter.acquire(1); + EXPECT_EQ(absl::FromChrono(wait), absl::ZeroDuration()); + + wait = limiter.acquire(1); + EXPECT_EQ(absl::FromChrono(wait), absl::Milliseconds(100)); +} + +TEST(RateLimiter, PeriodGreaterThanOneSecond) { + auto clock = std::make_shared(); + RateLimiter limiter(clock, std::chrono::seconds(10)); + + auto wait = limiter.acquire(1); + EXPECT_EQ(absl::FromChrono(wait), absl::ZeroDuration()); + + wait = limiter.acquire(1); + EXPECT_EQ(absl::FromChrono(wait), absl::Seconds(10)); +} + +TEST(RateLimiter, SetPeriodEventuallyTakesAffect) { + auto clock = std::make_shared(); + RateLimiter limiter(clock, std::chrono::milliseconds(100)); + + auto wait = limiter.acquire(1); + EXPECT_EQ(absl::FromChrono(wait), absl::ZeroDuration()); + + limiter.set_period(std::chrono::milliseconds(200)); + EXPECT_EQ(absl::FromChrono(limiter.period()), absl::Milliseconds(200)); + + // The return of this call to `acquire()` has already been determined at the + // 10 QPS rate. + wait = limiter.acquire(1); + EXPECT_EQ(absl::FromChrono(wait), absl::Milliseconds(100)); + + // Every subsequent call should add on .2 seconds. + wait = limiter.acquire(1); + EXPECT_EQ(absl::FromChrono(wait), absl::Milliseconds(300)); + + wait = limiter.acquire(1); + EXPECT_EQ(absl::FromChrono(wait), absl::Milliseconds(500)); +} + +TEST(RateLimiter, AbsoluteValueOfPeriod) { + auto clock = std::make_shared(); + RateLimiter limiter(clock, -std::chrono::seconds(10)); + + auto wait = limiter.acquire(1); + EXPECT_EQ(absl::FromChrono(wait), absl::ZeroDuration()); + + limiter.set_period(-std::chrono::seconds(5)); + wait = limiter.acquire(1); + EXPECT_EQ(absl::FromChrono(wait), absl::Seconds(10)); + + wait = limiter.acquire(1); + EXPECT_EQ(absl::FromChrono(wait), absl::Seconds(15)); +} + +TEST(RateLimiter, ThreadSafety) { + // - Set rate to 1 QPS + // - Spin off N threads + // - In each thread do M acquires at time now + // + // We expect that N * M + 1 acquires yields a wait time of N * M seconds. + + auto constexpr kThreadCount = 8; + auto constexpr kAcquiresPerThread = 1000; + + auto clock = std::make_shared(); + RateLimiter limiter(clock, std::chrono::seconds(1)); + + auto work = [&limiter](int acquires) { + for (auto i = 0; i != acquires; ++i) (void)limiter.acquire(1); + }; + std::vector v; + v.reserve(kThreadCount); + for (auto i = 0; i != kThreadCount; ++i) { + v.emplace_back(work, kAcquiresPerThread); + } + for (auto& t : v) { + t.join(); + } + + // Make sure we didn't drop any individual acquires + auto wait = limiter.acquire(1); + EXPECT_EQ(absl::FromChrono(wait), + absl::Seconds(kThreadCount * kAcquiresPerThread)); +} + +} // namespace +GOOGLE_CLOUD_CPP_INLINE_NAMESPACE_END +} // namespace bigtable_internal +} // namespace cloud +} // namespace google