Skip to content

Commit

Permalink
impl(bigtable): MutateRowsLimiter can do async
Browse files Browse the repository at this point in the history
  • Loading branch information
dbolduc committed Nov 28, 2023
1 parent 4dbcd15 commit cde8bef
Show file tree
Hide file tree
Showing 4 changed files with 73 additions and 25 deletions.
32 changes: 21 additions & 11 deletions google/cloud/bigtable/internal/mutate_rows_limiter.cc
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,13 @@ void ThrottlingMutateRowsLimiter::Acquire() {
on_wait_(wait);
}

future<void> ThrottlingMutateRowsLimiter::AsyncAcquire() {
auto wait = limiter_.acquire(1);
throttled_since_last_update_ =
throttled_since_last_update_ || wait != Clock::duration::zero();
return async_on_wait_(wait);
}

void ThrottlingMutateRowsLimiter::Update(
google::bigtable::v2::MutateRowsResponse const& response) {
if (!response.has_rate_limit_info()) return;
Expand All @@ -70,18 +77,21 @@ void ThrottlingMutateRowsLimiter::Update(

std::shared_ptr<MutateRowsLimiter> MakeMutateRowsLimiter(
Options const& options) {
if (options.get<bigtable::experimental::BulkApplyThrottlingOption>()) {
using duration = ThrottlingMutateRowsLimiter::Clock::duration;
std::function<void(duration)> sleeper = [](duration d) {
std::this_thread::sleep_for(d);
};
sleeper = internal::MakeTracedSleeper(
options, std::move(sleeper), "gl-cpp.bigtable.bulk_apply_throttling");
return std::make_shared<ThrottlingMutateRowsLimiter>(
std::make_shared<internal::SteadyClock>(), std::move(sleeper),
kInitialPeriod, kMinPeriod, kMaxPeriod, kMinFactor, kMaxFactor);
if (!options.get<bigtable::experimental::BulkApplyThrottlingOption>()) {
return std::make_shared<NoopMutateRowsLimiter>();
}
return std::make_shared<NoopMutateRowsLimiter>();
using duration = ThrottlingMutateRowsLimiter::Clock::duration;
std::function<void(duration)> sleeper = [](duration d) {
std::this_thread::sleep_for(d);
};
sleeper = internal::MakeTracedSleeper(
options, std::move(sleeper), "gl-cpp.bigtable.bulk_apply_throttling");
// TODO(#12959) - use a real sleeper.
auto async_sleeper = [](duration) { return make_ready_future(); };
return std::make_shared<ThrottlingMutateRowsLimiter>(
std::make_shared<internal::SteadyClock>(), std::move(sleeper),
std::move(async_sleeper), kInitialPeriod, kMinPeriod, kMaxPeriod,
kMinFactor, kMaxFactor);
}

GOOGLE_CLOUD_CPP_INLINE_NAMESPACE_END
Expand Down
8 changes: 8 additions & 0 deletions google/cloud/bigtable/internal/mutate_rows_limiter.h
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
#define GOOGLE_CLOUD_CPP_GOOGLE_CLOUD_BIGTABLE_INTERNAL_MUTATE_ROWS_LIMITER_H

#include "google/cloud/bigtable/internal/rate_limiter.h"
#include "google/cloud/future.h"
#include "google/cloud/internal/clock.h"
#include "google/cloud/options.h"
#include "google/cloud/version.h"
Expand All @@ -34,13 +35,15 @@ class MutateRowsLimiter {
public:
virtual ~MutateRowsLimiter() = default;
virtual void Acquire() = 0;
virtual future<void> AsyncAcquire() = 0;
virtual void Update(
google::bigtable::v2::MutateRowsResponse const& response) = 0;
};

class NoopMutateRowsLimiter : public MutateRowsLimiter {
public:
void Acquire() override {}
future<void> AsyncAcquire() override { return make_ready_future(); }
void Update(google::bigtable::v2::MutateRowsResponse const&) override {}
};

Expand All @@ -52,13 +55,15 @@ class ThrottlingMutateRowsLimiter : public MutateRowsLimiter {
explicit ThrottlingMutateRowsLimiter(
std::shared_ptr<Clock> clock,
std::function<void(Clock::duration)> on_wait,
std::function<future<void>(Clock::duration)> async_on_wait,
std::chrono::duration<Rep1, Period1> initial_period,
std::chrono::duration<Rep2, Period2> min_period,
std::chrono::duration<Rep3, Period3> max_period, double min_factor,
double max_factor)
: clock_(std::move(clock)),
limiter_(clock_, initial_period),
on_wait_(std::move(on_wait)),
async_on_wait_(std::move(async_on_wait)),
next_update_(clock_->Now()),
min_period_(std::chrono::duration_cast<Clock::duration>(min_period)),
max_period_(std::chrono::duration_cast<Clock::duration>(max_period)),
Expand All @@ -67,6 +72,8 @@ class ThrottlingMutateRowsLimiter : public MutateRowsLimiter {

void Acquire() override;

future<void> AsyncAcquire() override;

/**
* As specified in:
* https://cloud.google.com/bigtable/docs/reference/data/rpc/google.bigtable.v2#google.bigtable.v2.RateLimitInfo
Expand All @@ -81,6 +88,7 @@ class ThrottlingMutateRowsLimiter : public MutateRowsLimiter {
std::shared_ptr<Clock> clock_;
RateLimiter limiter_;
std::function<void(Clock::duration)> on_wait_;
std::function<future<void>(Clock::duration)> async_on_wait_;
bool throttled_since_last_update_ = false;
Clock::time_point next_update_;
Clock::duration min_period_;
Expand Down
57 changes: 43 additions & 14 deletions google/cloud/bigtable/internal/mutate_rows_limiter_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,16 @@ std::function<void(Clock::duration)> ExpectWait(absl::Duration expected_wait) {
};
}

std::function<future<void>(Clock::duration)> ExpectWaitAsync(
absl::Duration expected_wait) {
return [expected_wait](auto actual_wait) {
EXPECT_EQ(absl::FromChrono(actual_wait), expected_wait);
return make_ready_future();
};
}

auto noop = [](auto) {};
auto async_noop = [](auto) { return make_ready_future(); };

void InduceThrottling(MutateRowsLimiter& limiter) {
limiter.Acquire();
Expand All @@ -69,7 +78,7 @@ TEST(MutateRowsLimiter, BasicRateLimiting) {
.WillOnce(ExpectWait(absl::Seconds(2)))
.WillOnce(ExpectWait(absl::ZeroDuration()));

ThrottlingMutateRowsLimiter limiter(clock, mock.AsStdFunction(),
ThrottlingMutateRowsLimiter limiter(clock, mock.AsStdFunction(), async_noop,
std::chrono::seconds(1), kMinPeriod,
kMaxPeriod, kMinFactor, kMaxFactor);
limiter.Acquire();
Expand All @@ -83,8 +92,8 @@ TEST(MutateRowsLimiter, BasicRateLimiting) {
TEST(MutateRowsLimiter, ResponseWithoutRateLimitInfo) {
auto clock = std::make_shared<FakeSteadyClock>();
ThrottlingMutateRowsLimiter limiter(
clock, noop, std::chrono::milliseconds(100), kMinPeriod, kMaxPeriod,
kMinFactor, kMaxFactor);
clock, noop, async_noop, std::chrono::milliseconds(100), kMinPeriod,
kMaxPeriod, kMinFactor, kMaxFactor);
EXPECT_EQ(absl::FromChrono(limiter.period()), absl::Milliseconds(100));

limiter.Update(google::bigtable::v2::MutateRowsResponse{});
Expand All @@ -94,8 +103,8 @@ TEST(MutateRowsLimiter, ResponseWithoutRateLimitInfo) {
TEST(MutateRowsLimiter, NoRateIncreaseIfNoThrottlingSinceLastUpdate) {
auto clock = std::make_shared<FakeSteadyClock>();
ThrottlingMutateRowsLimiter limiter(
clock, noop, std::chrono::milliseconds(100), kMinPeriod, kMaxPeriod,
kMinFactor, kMaxFactor);
clock, noop, async_noop, std::chrono::milliseconds(100), kMinPeriod,
kMaxPeriod, kMinFactor, kMaxFactor);
EXPECT_EQ(absl::FromChrono(limiter.period()), absl::Milliseconds(100));

limiter.Update(MakeResponse(1.25, std::chrono::milliseconds(0)));
Expand All @@ -111,7 +120,7 @@ TEST(MutateRowsLimiter, NoRateIncreaseIfNoThrottlingSinceLastUpdate) {

TEST(MutateRowsLimiter, RateCanDecreaseIfNoThrottlingSinceLastUpdate) {
auto clock = std::make_shared<FakeSteadyClock>();
ThrottlingMutateRowsLimiter limiter(clock, noop,
ThrottlingMutateRowsLimiter limiter(clock, noop, async_noop,
std::chrono::milliseconds(64), kMinPeriod,
kMaxPeriod, kMinFactor, kMaxFactor);
EXPECT_EQ(absl::FromChrono(limiter.period()), absl::Milliseconds(64));
Expand All @@ -129,9 +138,9 @@ TEST(MutateRowsLimiter, RateCanDecreaseIfNoThrottlingSinceLastUpdate) {

TEST(MutateRowsLimiter, UpdateClampsMinFactor) {
auto clock = std::make_shared<FakeSteadyClock>();
ThrottlingMutateRowsLimiter limiter(clock, noop, std::chrono::milliseconds(7),
kMinPeriod, kMaxPeriod, /*min_factor=*/.7,
kMaxFactor);
ThrottlingMutateRowsLimiter limiter(
clock, noop, async_noop, std::chrono::milliseconds(7), kMinPeriod,
kMaxPeriod, /*min_factor=*/.7, kMaxFactor);
EXPECT_EQ(absl::FromChrono(limiter.period()), absl::Milliseconds(7));

limiter.Update(MakeResponse(.5, std::chrono::milliseconds(0)));
Expand All @@ -141,8 +150,8 @@ TEST(MutateRowsLimiter, UpdateClampsMinFactor) {
TEST(MutateRowsLimiter, UpdateClampsMaxFactor) {
auto clock = std::make_shared<FakeSteadyClock>();
ThrottlingMutateRowsLimiter limiter(
clock, noop, std::chrono::milliseconds(13), kMinPeriod, kMaxPeriod,
kMinFactor, /*max_factor=*/1.3);
clock, noop, async_noop, std::chrono::milliseconds(13), kMinPeriod,
kMaxPeriod, kMinFactor, /*max_factor=*/1.3);
EXPECT_EQ(absl::FromChrono(limiter.period()), absl::Milliseconds(13));

InduceThrottling(limiter);
Expand All @@ -153,7 +162,7 @@ TEST(MutateRowsLimiter, UpdateClampsMaxFactor) {
TEST(MutateRowsLimiter, UpdateClampsResultToMinPeriod) {
auto clock = std::make_shared<FakeSteadyClock>();
ThrottlingMutateRowsLimiter limiter(
clock, noop, std::chrono::microseconds(11),
clock, noop, async_noop, std::chrono::microseconds(11),
/*min_period=*/std::chrono::microseconds(10), kMaxPeriod, kMinFactor,
kMaxFactor);
EXPECT_EQ(absl::FromChrono(limiter.period()), absl::Microseconds(11));
Expand All @@ -166,7 +175,7 @@ TEST(MutateRowsLimiter, UpdateClampsResultToMinPeriod) {
TEST(MutateRowsLimiter, UpdateClampsResultToMaxPeriod) {
auto clock = std::make_shared<FakeSteadyClock>();
ThrottlingMutateRowsLimiter limiter(
clock, noop, std::chrono::seconds(9), kMinPeriod,
clock, noop, async_noop, std::chrono::seconds(9), kMinPeriod,
/*max_period=*/std::chrono::seconds(10), kMinFactor, kMaxFactor);
EXPECT_EQ(absl::FromChrono(limiter.period()), absl::Seconds(9));

Expand All @@ -176,7 +185,7 @@ TEST(MutateRowsLimiter, UpdateClampsResultToMaxPeriod) {

TEST(MutateRowsLimiter, UpdateRespectsResponsePeriod) {
auto clock = std::make_shared<FakeSteadyClock>();
ThrottlingMutateRowsLimiter limiter(clock, noop,
ThrottlingMutateRowsLimiter limiter(clock, noop, async_noop,
std::chrono::milliseconds(64), kMinPeriod,
kMaxPeriod, kMinFactor, kMaxFactor);
EXPECT_EQ(absl::FromChrono(limiter.period()), absl::Milliseconds(64));
Expand All @@ -199,6 +208,26 @@ TEST(MutateRowsLimiter, UpdateRespectsResponsePeriod) {
EXPECT_LE(absl::FromChrono(limiter.period()), absl::Milliseconds(100));
}

TEST(MutateRowsLimiter, AsyncBasicRateLimiting) {
auto clock = std::make_shared<FakeSteadyClock>();
MockFunction<future<void>(FakeSteadyClock::duration)> mock;
EXPECT_CALL(mock, Call)
.WillOnce(ExpectWaitAsync(absl::ZeroDuration()))
.WillOnce(ExpectWaitAsync(absl::Seconds(1)))
.WillOnce(ExpectWaitAsync(absl::Seconds(2)))
.WillOnce(ExpectWaitAsync(absl::ZeroDuration()));

ThrottlingMutateRowsLimiter limiter(clock, noop, mock.AsStdFunction(),
std::chrono::seconds(1), kMinPeriod,
kMaxPeriod, kMinFactor, kMaxFactor);
limiter.AsyncAcquire().get();
limiter.AsyncAcquire().get();
limiter.AsyncAcquire().get();

clock->AdvanceTime(std::chrono::seconds(3));
limiter.AsyncAcquire();
}

TEST(MutateRowsLimiter, MakeMutateRowsLimiter) {
auto noop =
MakeMutateRowsLimiter(Options{}.set<BulkApplyThrottlingOption>(false));
Expand Down
1 change: 1 addition & 0 deletions google/cloud/bigtable/testing/mock_mutate_rows_limiter.h
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ namespace testing {
class MockMutateRowsLimiter : public bigtable_internal::MutateRowsLimiter {
public:
MOCK_METHOD(void, Acquire, (), (override));
MOCK_METHOD(future<void>, AsyncAcquire, (), (override));
MOCK_METHOD(void, Update, (google::bigtable::v2::MutateRowsResponse const&),
(override));
};
Expand Down

0 comments on commit cde8bef

Please sign in to comment.