Skip to content

Commit

Permalink
address review comments
Browse files Browse the repository at this point in the history
  • Loading branch information
dbolduc committed Nov 7, 2023
1 parent c61e53b commit 56531f9
Show file tree
Hide file tree
Showing 3 changed files with 44 additions and 60 deletions.
7 changes: 3 additions & 4 deletions google/cloud/bigtable/internal/rate_limiter.cc
Original file line number Diff line number Diff line change
Expand Up @@ -21,12 +21,11 @@ GOOGLE_CLOUD_CPP_INLINE_NAMESPACE_BEGIN

RateLimiter::Clock::duration RateLimiter::acquire(std::int64_t tokens) {
auto const now = clock_->Now();
std::unique_lock<std::mutex> lk(mu_);
std::lock_guard<std::mutex> lk(mu_);
auto const wait = (std::max)(next_ - now, Clock::duration::zero());
// We can only keep up to M stored tokens.
next_ = (std::max)(next_, now - max_stored_tokens_ * period_);
// We can potentially give out tokens from the last `smoothing_interval_`.
next_ = (std::max)(next_, now - smoothing_interval_);
next_ += tokens * period_;
lk.unlock();
return wait;
}

Expand Down
26 changes: 17 additions & 9 deletions google/cloud/bigtable/internal/rate_limiter.h
Original file line number Diff line number Diff line change
Expand Up @@ -34,8 +34,8 @@ GOOGLE_CLOUD_CPP_INLINE_NAMESPACE_BEGIN
* limits. This class limits the number of tokens issued per period of time,
* effectively limiting the operation rate.
*
* The caller may acquire more than one token at a time, if it needs to perform
* a burst of the operation under rate limits. More tokens become available as
* The caller may acquire more than one token at a time if it needs to perform a
* burst of the operation under rate limits. More tokens become available as
* time passes, with some maximum to limit the size of bursts.
*
* The allocation of resources must be a "prior reservation". That is, the
Expand Down Expand Up @@ -70,17 +70,24 @@ class RateLimiter {
// increase with time.
using Clock = ::google::cloud::internal::SteadyClock;

template <typename Rep, typename Period>
template <typename Rep1, typename Period1>
explicit RateLimiter(std::shared_ptr<Clock> clock,
std::chrono::duration<Rep1, Period1> period)
: RateLimiter(clock, period, Clock::duration::zero()) {}

template <typename Rep1, typename Period1, typename Rep2, typename Period2>
explicit RateLimiter(std::shared_ptr<Clock> clock,
std::chrono::duration<Rep, Period> period,
std::int64_t max_stored_tokens = 0)
std::chrono::duration<Rep1, Period1> period,
std::chrono::duration<Rep2, Period2> smoothing_interval)
: clock_(std::move(clock)),
// Note that std::chrono::abs() is not available until C++17.
smoothing_interval_(std::chrono::duration_cast<Clock::duration>(
smoothing_interval >= Clock::duration::zero()
? smoothing_interval
: -smoothing_interval)),
period_(std::chrono::duration_cast<Clock::duration>(
period >= Clock::duration::zero() ? period : -period)),
// Start with a full set of tokens.
next_(clock_->Now() - max_stored_tokens * period_),
max_stored_tokens_(max_stored_tokens) {}
next_(clock_->Now()) {}

/**
* Returns the time to wait before performing the operation associated with
Expand Down Expand Up @@ -111,9 +118,10 @@ class RateLimiter {
private:
std::mutex mu_;
std::shared_ptr<Clock> clock_;
// Over any `smoothing_interval_`, we must average <= 1 token per `period_`.
Clock::duration smoothing_interval_;
Clock::duration period_; // ABSL_GUARDED_BY(mu_)
Clock::time_point next_; // ABSL_GUARDED_BY(mu_)
std::int64_t max_stored_tokens_ = 0;
};

GOOGLE_CLOUD_CPP_INLINE_NAMESPACE_END
Expand Down
71 changes: 24 additions & 47 deletions google/cloud/bigtable/internal/rate_limiter_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -27,15 +27,15 @@ using ::google::cloud::testing_util::FakeSteadyClock;

TEST(RateLimiter, NoWaitForInitialAcquire) {
auto clock = std::make_shared<FakeSteadyClock>();
RateLimiter limiter(clock, std::chrono::seconds(1), 0);
RateLimiter limiter(clock, std::chrono::seconds(1));

auto wait = limiter.acquire(100);
EXPECT_EQ(absl::FromChrono(wait), absl::ZeroDuration());
}

TEST(RateLimiter, Basic) {
auto clock = std::make_shared<FakeSteadyClock>();
RateLimiter limiter(clock, std::chrono::seconds(1), 0);
RateLimiter limiter(clock, std::chrono::seconds(1));

for (auto i = 0; i != 10; ++i) {
auto wait = limiter.acquire(1);
Expand All @@ -50,73 +50,50 @@ TEST(RateLimiter, Basic) {
}
}

TEST(RateLimiter, WaitsForEachPermit) {
TEST(RateLimiter, WaitsForEachToken) {
auto clock = std::make_shared<FakeSteadyClock>();
RateLimiter limiter(clock, std::chrono::seconds(1), 0);
RateLimiter limiter(clock, std::chrono::seconds(1));

(void)limiter.acquire(10);

auto wait = limiter.acquire(1);
EXPECT_EQ(absl::FromChrono(wait), absl::Seconds(10));
}

TEST(RateLimiter, SpendsStorage) {
TEST(RateLimiter, StoresTokens) {
auto clock = std::make_shared<FakeSteadyClock>();
RateLimiter limiter(clock, std::chrono::seconds(1), 10);
RateLimiter limiter(clock, std::chrono::milliseconds(500),
std::chrono::seconds(5));

// We start with 10 stored permits. We spend 6 of them. We have 4 remaining.
auto wait = limiter.acquire(6);
EXPECT_EQ(absl::FromChrono(wait), absl::ZeroDuration());

// We are asked for 6 permits. We spend our 4 stored permits. We still have 2
// permits to deal with. Schedule the *next* call to `acquire()` for 2 seconds
// from now.
wait = limiter.acquire(6);
EXPECT_EQ(absl::FromChrono(wait), absl::ZeroDuration());

wait = limiter.acquire(1);
EXPECT_EQ(absl::FromChrono(wait), absl::Seconds(2));
}

TEST(RateLimiter, StoresPermits) {
auto clock = std::make_shared<FakeSteadyClock>();
RateLimiter limiter(clock, std::chrono::milliseconds(500), 10);

// We start with 10 stored permits. Spend them immediately.
auto wait = limiter.acquire(10);
EXPECT_EQ(absl::FromChrono(wait), absl::ZeroDuration());

// After 2 seconds, we should have 4 permits banked.
// After 2 seconds, we should have 4 tokens banked.
clock->AdvanceTime(std::chrono::seconds(2));
wait = limiter.acquire(10);
auto wait = limiter.acquire(10);
EXPECT_EQ(absl::FromChrono(wait), absl::ZeroDuration());

// We requested 10 permits, with 4 permits banked. We should have to wait 3
// seconds to give out the remaining 6 permits at a rate of 2 permits per
// We requested 10 tokens, with 4 tokens banked. We should have to wait 3
// seconds to give out the remaining 6 tokens at a rate of 2 tokens per
// second.
wait = limiter.acquire(1);
EXPECT_EQ(absl::FromChrono(wait), absl::Seconds(3));
}

TEST(RateLimiter, AddsAsMuchAsItCanStore) {
TEST(RateLimiter, StoresTokensUpToLimit) {
auto clock = std::make_shared<FakeSteadyClock>();
RateLimiter limiter(clock, std::chrono::seconds(1), 10);

// Spend 5 stored permits.
(void)limiter.acquire(5);
RateLimiter limiter(clock, std::chrono::seconds(1), std::chrono::seconds(10));

// Wait for 100 seconds. We should be capped at the max stored permits, 10.
// Wait for 100 seconds. We should be able to use 10 tokens from the last 10
// seconds of this interval.
clock->AdvanceTime(std::chrono::seconds(100));
(void)limiter.acquire(30);

// We should have to wait for 30 - 10 = 20 permits.
// We should have to wait for 30 - 10 = 20 tokens.
auto wait = limiter.acquire(1);
EXPECT_EQ(absl::FromChrono(wait), absl::Seconds(20));
}

TEST(RateLimiter, Rate) {
TEST(RateLimiter, Period) {
auto clock = std::make_shared<FakeSteadyClock>();
RateLimiter limiter(clock, std::chrono::milliseconds(100), 0);
RateLimiter limiter(clock, std::chrono::milliseconds(100));

auto wait = limiter.acquire(1);
EXPECT_EQ(absl::FromChrono(wait), absl::ZeroDuration());
Expand All @@ -125,9 +102,9 @@ TEST(RateLimiter, Rate) {
EXPECT_EQ(absl::FromChrono(wait), absl::Milliseconds(100));
}

TEST(RateLimiter, RateLessThanOne) {
TEST(RateLimiter, PeriodLessThanOne) {
auto clock = std::make_shared<FakeSteadyClock>();
RateLimiter limiter(clock, std::chrono::seconds(10), 0);
RateLimiter limiter(clock, std::chrono::seconds(10));

auto wait = limiter.acquire(1);
EXPECT_EQ(absl::FromChrono(wait), absl::ZeroDuration());
Expand All @@ -136,9 +113,9 @@ TEST(RateLimiter, RateLessThanOne) {
EXPECT_EQ(absl::FromChrono(wait), absl::Seconds(10));
}

TEST(RateLimiter, SetRateEventuallyTakesAffect) {
TEST(RateLimiter, SetPeriodEventuallyTakesAffect) {
auto clock = std::make_shared<FakeSteadyClock>();
RateLimiter limiter(clock, std::chrono::milliseconds(100), 0);
RateLimiter limiter(clock, std::chrono::milliseconds(100));

auto wait = limiter.acquire(1);
EXPECT_EQ(absl::FromChrono(wait), absl::ZeroDuration());
Expand All @@ -161,7 +138,7 @@ TEST(RateLimiter, SetRateEventuallyTakesAffect) {

TEST(RateLimiter, AbsoluteValueOfPeriod) {
auto clock = std::make_shared<FakeSteadyClock>();
RateLimiter limiter(clock, -std::chrono::seconds(10), 0);
RateLimiter limiter(clock, -std::chrono::seconds(10));

auto wait = limiter.acquire(1);
EXPECT_EQ(absl::FromChrono(wait), absl::ZeroDuration());
Expand All @@ -185,7 +162,7 @@ TEST(RateLimiter, ThreadSafety) {
auto constexpr kAcquiresPerThread = 1000;

auto clock = std::make_shared<FakeSteadyClock>();
RateLimiter limiter(clock, std::chrono::seconds(1), 0);
RateLimiter limiter(clock, std::chrono::seconds(1));

auto work = [&limiter](int acquires) {
for (auto i = 0; i != acquires; ++i) (void)limiter.acquire(1);
Expand Down

0 comments on commit 56531f9

Please sign in to comment.