Skip to content

Commit

Permalink
refactor(pubsub): use the clock class instead of a lambda (#13389)
Browse files Browse the repository at this point in the history
* feat(pubsub): add lease management for unary pull

* refactor(pubsub): use the clock class instead of a lambda for the lease manager

* revert pull ack handler changes
  • Loading branch information
alevenberg authored Jan 12, 2024
1 parent 35d7b49 commit db22daa
Show file tree
Hide file tree
Showing 6 changed files with 82 additions and 96 deletions.
11 changes: 6 additions & 5 deletions google/cloud/pubsub/internal/default_pull_lease_manager.cc
Original file line number Diff line number Diff line change
Expand Up @@ -40,16 +40,17 @@ std::chrono::seconds DefaultLeaseExtension(Options const& options) {

DefaultPullLeaseManager::DefaultPullLeaseManager(
CompletionQueue cq, std::weak_ptr<SubscriberStub> w, Options options,
pubsub::Subscription subscription, std::string ack_id, Clock clock)
pubsub::Subscription subscription, std::string ack_id,
std::shared_ptr<Clock> clock)
: cq_(std::move(cq)),
stub_(std::move(w)),
options_(std::move(options)),
subscription_(std::move(subscription)),
ack_id_(std::move(ack_id)),
clock_(std::move(clock)),
lease_deadline_(DefaultLeaseDeadline(clock_(), options_)),
lease_deadline_(DefaultLeaseDeadline(clock_->Now(), options_)),
lease_extension_(DefaultLeaseExtension(options_)),
current_lease_(clock_() + kMinimalLeaseExtension) {}
current_lease_(clock_->Now() + kMinimalLeaseExtension) {}

DefaultPullLeaseManager::~DefaultPullLeaseManager() {
if (!timer_.valid()) return;
Expand All @@ -59,7 +60,7 @@ DefaultPullLeaseManager::~DefaultPullLeaseManager() {
void DefaultPullLeaseManager::StartLeaseLoop() {
auto s = stub_.lock();
if (!s) return;
auto const now = clock_();
auto const now = clock_->Now();

// Check if the lease has expired, or is so close to expiring that we cannot
// extend it. In either case, simply return and stop the loop.
Expand Down Expand Up @@ -94,7 +95,7 @@ future<Status> DefaultPullLeaseManager::ExtendLease(
google::cloud::Idempotency::kIdempotent, cq_,
[stub = std::move(stub), deadline = now + extension, clock = clock_](
auto cq, auto context, auto const& request) {
if (deadline < clock()) {
if (deadline < clock->Now()) {
return make_ready_future(
Status(StatusCode::kDeadlineExceeded, "lease already expired"));
}
Expand Down
13 changes: 7 additions & 6 deletions google/cloud/pubsub/internal/default_pull_lease_manager.h
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
#include "google/cloud/pubsub/subscription.h"
#include "google/cloud/completion_queue.h"
#include "google/cloud/future.h"
#include "google/cloud/internal/clock.h"
#include "google/cloud/options.h"
#include "google/cloud/status_or.h"
#include <chrono>
Expand All @@ -39,12 +40,12 @@ class DefaultPullLeaseManager
: public PullLeaseManager,
public std::enable_shared_from_this<DefaultPullLeaseManager> {
public:
using Clock = std::function<std::chrono::system_clock::time_point()>;
using Clock = ::google::cloud::internal::SystemClock;

DefaultPullLeaseManager(CompletionQueue cq, std::weak_ptr<SubscriberStub> w,
Options options, pubsub::Subscription subscription,
std::string ack_id,
Clock clock = std::chrono::system_clock::now);
DefaultPullLeaseManager(
CompletionQueue cq, std::weak_ptr<SubscriberStub> w, Options options,
pubsub::Subscription subscription, std::string ack_id,
std::shared_ptr<Clock> clock = std::make_shared<Clock>());
~DefaultPullLeaseManager() override;

void StartLeaseLoop() override;
Expand Down Expand Up @@ -75,7 +76,7 @@ class DefaultPullLeaseManager
Options options_;
pubsub::Subscription subscription_;
std::string ack_id_;
Clock clock_;
std::shared_ptr<Clock> clock_;
// The absolute deadline to complete processing the message.
// The application can configure this value using
// `pubsub::MaxDeadlineTimeOption`
Expand Down
116 changes: 50 additions & 66 deletions google/cloud/pubsub/internal/default_pull_lease_manager_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
#include "google/cloud/pubsub/testing/mock_subscriber_stub.h"
#include "google/cloud/pubsub/testing/test_retry_policies.h"
#include "google/cloud/testing_util/async_sequencer.h"
#include "google/cloud/testing_util/fake_clock.h"
#include "google/cloud/testing_util/mock_completion_queue_impl.h"
#include "google/cloud/testing_util/status_matchers.h"
#include <gmock/gmock.h>
Expand All @@ -30,6 +31,7 @@ namespace {

using ::google::cloud::pubsub_testing::MockSubscriberStub;
using ::google::cloud::testing_util::AsyncSequencer;
using ::google::cloud::testing_util::FakeSystemClock;
using ::google::cloud::testing_util::StatusIs;
using ::google::pubsub::v1::ModifyAckDeadlineRequest;
using ::testing::_;
Expand All @@ -43,9 +45,6 @@ using ::testing::Pointee;
using ::testing::Property;
using ::testing::Return;

using MockClock =
::testing::MockFunction<std::chrono::system_clock::time_point()>;

Options MakeTestOptions() {
return google::cloud::pubsub_testing::MakeTestOptions(
Options{}
Expand All @@ -72,13 +71,10 @@ Status PermanentError() {
TEST(DefaultPullLeaseManager, SimpleLeaseLoop) {
auto subscription = pubsub::Subscription("test-project", "test-subscription");

// Use a mock clock where the time is controlled by a test variable. We do not
// really care how many times or exactly when is the clock called, we just
// want to control the passage of time.
auto const start = std::chrono::system_clock::now();
auto current_time = start;
MockClock clock;
EXPECT_CALL(clock, Call).WillRepeatedly([&] { return current_time; });
auto clock = std::make_shared<FakeSystemClock>();
clock->SetTime(current_time);
AsyncSequencer<bool> aseq;
auto cq = MakeMockCompletionQueue(aseq);
auto mock = std::make_shared<MockSubscriberStub>();
Expand Down Expand Up @@ -133,33 +129,33 @@ TEST(DefaultPullLeaseManager, SimpleLeaseLoop) {
});
});
auto manager = std::make_shared<DefaultPullLeaseManager>(
cq, mock, options, subscription, "test-ack-id", clock.AsStdFunction());
cq, mock, options, subscription, "test-ack-id", clock);
manager->StartLeaseLoop();
auto pending = aseq.PopFrontWithName();
EXPECT_EQ(pending.second, "AsyncModifyAckDeadline");
pending.first.set_value(true);
pending = aseq.PopFrontWithName();
EXPECT_EQ(pending.second, "MakeRelativeTimer");
EXPECT_THAT(manager->current_lease(), current_time + kLeaseExtension);
EXPECT_THAT(manager->current_lease(), clock->Now() + kLeaseExtension);

current_time = current_time + kLeaseExtension - kLeaseSlack;
clock->AdvanceTime(kLeaseExtension - kLeaseSlack);
pending.first.set_value(true);
pending = aseq.PopFrontWithName();
EXPECT_EQ(pending.second, "AsyncModifyAckDeadline");
pending.first.set_value(true);
pending = aseq.PopFrontWithName();
EXPECT_EQ(pending.second, "MakeRelativeTimer");
EXPECT_THAT(manager->current_lease(), current_time + kLeaseExtension);
EXPECT_THAT(manager->current_lease(), clock->Now() + kLeaseExtension);

// This is close to the end of the lifetime
current_time = current_time + kLeaseExtension - kLeaseSlack;
clock->AdvanceTime(kLeaseExtension - kLeaseSlack);
pending.first.set_value(true);
pending = aseq.PopFrontWithName();
EXPECT_EQ(pending.second, "AsyncModifyAckDeadline");
pending.first.set_value(true);
pending = aseq.PopFrontWithName();
EXPECT_EQ(pending.second, "MakeRelativeTimer");
EXPECT_THAT(manager->current_lease(), current_time + kLastLeaseExtension);
EXPECT_THAT(manager->current_lease(), clock->Now() + kLastLeaseExtension);

// Terminate the loop. With exceptions disabled abandoning a future with a
// continuation results in a crash. In non-test programs, the completion queue
Expand All @@ -170,19 +166,16 @@ TEST(DefaultPullLeaseManager, SimpleLeaseLoop) {
TEST(DefaultPullLeaseManager, StartLeaseLoopAlreadyReleased) {
auto subscription = pubsub::Subscription("test-project", "test-subscription");

// Use a mock clock where the time is controlled by a test variable. We do not
// really care how many times or exactly when is the clock called, we just
// want to control the passage of time.
auto current_time = std::chrono::system_clock::now();
MockClock clock;
EXPECT_CALL(clock, Call).WillRepeatedly([&] { return current_time; });
auto clock = std::make_shared<FakeSystemClock>();
clock->SetTime(current_time);
AsyncSequencer<bool> aseq;
auto cq = MakeMockCompletionQueue(aseq);
auto mock = std::make_shared<MockSubscriberStub>();
EXPECT_CALL(*mock, AsyncModifyAckDeadline(_, _, _)).Times(0);
auto manager = std::make_shared<DefaultPullLeaseManager>(
cq, mock, MakeTestOptions(), subscription, "test-ack-id",
clock.AsStdFunction());
std::move(clock));
// This can happen if the subscriber is shutdown, but the application manages
// to hold to a `AckHandler` reference. In this case, we expect the loop to
// stop (or have no effect).
Expand All @@ -193,68 +186,58 @@ TEST(DefaultPullLeaseManager, StartLeaseLoopAlreadyReleased) {
TEST(DefaultPullLeaseManager, StartLeaseLoopAlreadyPastMaxExtension) {
auto subscription = pubsub::Subscription("test-project", "test-subscription");

// Use a mock clock where the time is controlled by a test variable. We do not
// really care how many times or exactly when is the clock called, we just
// want to control the passage of time.
auto current_time = std::chrono::system_clock::now();
MockClock clock;
EXPECT_CALL(clock, Call).WillRepeatedly([&] { return current_time; });
auto clock = std::make_shared<FakeSystemClock>();
clock->SetTime(current_time);
AsyncSequencer<bool> aseq;
auto cq = MakeMockCompletionQueue(aseq);
auto mock = std::make_shared<MockSubscriberStub>();
EXPECT_CALL(*mock, AsyncModifyAckDeadline(_, _, _)).Times(0);
auto manager = std::make_shared<DefaultPullLeaseManager>(
cq, mock, MakeTestOptions(), subscription, "test-ack-id",
clock.AsStdFunction());
cq, mock, MakeTestOptions(), subscription, "test-ack-id", clock);
EXPECT_THAT(manager->lease_deadline(),
Eq(current_time + std::chrono::seconds(300)));
current_time = current_time + std::chrono::seconds(301);
// See the MakeTestOptions() for the magic number.
clock->AdvanceTime(std::chrono::seconds(301));
manager->StartLeaseLoop();
// This is a "AsyncModifyAckDeadline() is not called" test.
}

TEST(DefaultPullLeaseManager, StartLeaseLoopTooCloseMaxExtension) {
auto subscription = pubsub::Subscription("test-project", "test-subscription");

// Use a mock clock where the time is controlled by a test variable. We do not
// really care how many times or exactly when is the clock called, we just
// want to control the passage of time.
auto current_time = std::chrono::system_clock::now();
MockClock clock;
EXPECT_CALL(clock, Call).WillRepeatedly([&] { return current_time; });
auto clock = std::make_shared<FakeSystemClock>();
clock->SetTime(current_time);
AsyncSequencer<bool> aseq;
auto cq = MakeMockCompletionQueue(aseq);
auto mock = std::make_shared<MockSubscriberStub>();
EXPECT_CALL(*mock, AsyncModifyAckDeadline(_, _, _)).Times(0);
auto manager = std::make_shared<DefaultPullLeaseManager>(
cq, mock, MakeTestOptions(), subscription, "test-ack-id",
clock.AsStdFunction());
cq, mock, MakeTestOptions(), subscription, "test-ack-id", clock);
EXPECT_THAT(manager->lease_deadline(),
Eq(current_time + std::chrono::seconds(300)));
current_time =
current_time + std::chrono::seconds(299) + std::chrono::milliseconds(500);
// See the MakeTestOptions() for the magic number.
clock->AdvanceTime(std::chrono::seconds(299) +
std::chrono::milliseconds(500));
manager->StartLeaseLoop();
// This is a "AsyncModifyAckDeadline() is not called" test.
}

TEST(DefaultPullLeaseManager, StartLeaseLoopAlreadyPastCurrentExtension) {
auto subscription = pubsub::Subscription("test-project", "test-subscription");

// Use a mock clock where the time is controlled by a test variable. We do not
// really care how many times or exactly when is the clock called, we just
// want to control the passage of time.
auto current_time = std::chrono::system_clock::now();
MockClock clock;
EXPECT_CALL(clock, Call).WillRepeatedly([&] { return current_time; });
auto clock = std::make_shared<FakeSystemClock>();
clock->SetTime(current_time);
AsyncSequencer<bool> aseq;
auto cq = MakeMockCompletionQueue(aseq);
auto mock = std::make_shared<MockSubscriberStub>();
EXPECT_CALL(*mock, AsyncModifyAckDeadline(_, _, _)).Times(0);
auto manager = std::make_shared<DefaultPullLeaseManager>(
cq, mock, MakeTestOptions(), subscription, "test-ack-id",
clock.AsStdFunction());
cq, mock, MakeTestOptions(), subscription, "test-ack-id", clock);
EXPECT_GT(manager->current_lease(), current_time);
current_time = manager->current_lease();
clock->SetTime(manager->current_lease());
manager->StartLeaseLoop();
// This is a "AsyncModifyAckDeadline() is not called" test.
}
Expand All @@ -263,8 +246,8 @@ TEST(DefaultPullLeaseManager, InitializeDeadlines) {
auto subscription = pubsub::Subscription("test-project", "test-subscription");

auto current_time = std::chrono::system_clock::now();
MockClock clock;
EXPECT_CALL(clock, Call).WillRepeatedly([&] { return current_time; });
auto clock = std::make_shared<FakeSystemClock>();
clock->SetTime(current_time);
AsyncSequencer<bool> aseq;
auto cq = MakeMockCompletionQueue(aseq);
auto mock = std::make_shared<MockSubscriberStub>();
Expand All @@ -276,7 +259,7 @@ TEST(DefaultPullLeaseManager, InitializeDeadlines) {
.set<pubsub::MaxDeadlineTimeOption>(std::chrono::seconds(300))
.set<pubsub::MinDeadlineExtensionOption>(
std::chrono::seconds(10))),
subscription, "test-ack-id", clock.AsStdFunction());
subscription, "test-ack-id", clock);
EXPECT_EQ(manager->lease_deadline(),
current_time + std::chrono::seconds(300));
EXPECT_EQ(manager->LeaseRefreshPeriod(), std::chrono::seconds(9));
Expand All @@ -288,7 +271,7 @@ TEST(DefaultPullLeaseManager, InitializeDeadlines) {
.set<pubsub::MaxDeadlineTimeOption>(std::chrono::seconds(300))
.set<pubsub::MaxDeadlineExtensionOption>(
std::chrono::seconds(30))),
subscription, "test-ack-id", clock.AsStdFunction());
subscription, "test-ack-id", clock);
EXPECT_EQ(manager->lease_deadline(),
current_time + std::chrono::seconds(300));
EXPECT_EQ(manager->LeaseRefreshPeriod(), std::chrono::seconds(29));
Expand All @@ -301,7 +284,7 @@ TEST(DefaultPullLeaseManager, InitializeDeadlines) {
.set<pubsub::MinDeadlineExtensionOption>(std::chrono::seconds(10))
.set<pubsub::MaxDeadlineExtensionOption>(
std::chrono::seconds(30))),
subscription, "test-ack-id", clock.AsStdFunction());
subscription, "test-ack-id", clock);
EXPECT_EQ(manager->lease_deadline(),
current_time + std::chrono::seconds(300));
EXPECT_EQ(manager->LeaseRefreshPeriod(), std::chrono::seconds(9));
Expand All @@ -325,10 +308,10 @@ TEST(DefaultPullLeaseManager, ExtendLeaseDeadlineSimple) {
AsyncSequencer<bool> aseq;
auto cq = MakeMockCompletionQueue(aseq);
auto current_time = std::chrono::system_clock::now();
MockClock clock;
EXPECT_CALL(clock, Call).WillRepeatedly([&] { return current_time; });
auto clock = std::make_shared<FakeSystemClock>();
clock->SetTime(current_time);
auto manager = std::make_shared<DefaultPullLeaseManager>(
cq, mock, options, subscription, "test-ack-id", clock.AsStdFunction());
cq, mock, options, subscription, "test-ack-id", std::move(clock));

auto status = manager->ExtendLease(mock, current_time, kLeaseExtension);
EXPECT_STATUS_OK(status.get());
Expand All @@ -344,13 +327,12 @@ TEST(DefaultPullLeaseManager, ExtendLeaseDeadlineExceeded) {
AsyncSequencer<bool> aseq;
auto cq = MakeMockCompletionQueue(aseq);
auto current_time = std::chrono::system_clock::now();
MockClock clock;
// Set the time for the clock call to after the current time + extension.
EXPECT_CALL(clock, Call).WillRepeatedly([&] {
return current_time + std::chrono::seconds(11);
});
auto clock = std::make_shared<FakeSystemClock>();
// Set the time for the clock call to after the current time +
// extension.
clock->SetTime(current_time + std::chrono::seconds(11));
auto manager = std::make_shared<DefaultPullLeaseManager>(
cq, mock, options, subscription, "test-ack-id", clock.AsStdFunction());
cq, mock, options, subscription, "test-ack-id", std::move(clock));

auto status = manager->ExtendLease(mock, current_time, kLeaseExtension);
EXPECT_THAT(status.get(),
Expand All @@ -375,10 +357,10 @@ TEST(DefaultPullLeaseManager, ExtendLeasePermanentError) {
AsyncSequencer<bool> aseq;
auto cq = MakeMockCompletionQueue(aseq);
auto current_time = std::chrono::system_clock::now();
MockClock clock;
EXPECT_CALL(clock, Call).WillRepeatedly([&] { return current_time; });
auto clock = std::make_shared<FakeSystemClock>();
clock->SetTime(current_time);
auto manager = std::make_shared<DefaultPullLeaseManager>(
cq, mock, options, subscription, "test-ack-id", clock.AsStdFunction());
cq, mock, options, subscription, "test-ack-id", std::move(clock));

auto status = manager->ExtendLease(mock, current_time, kLeaseExtension);
EXPECT_THAT(status.get(),
Expand All @@ -390,9 +372,10 @@ TEST(DefaultPullLeaseManager, Subscription) {
auto mock = std::make_shared<MockSubscriberStub>();
AsyncSequencer<bool> aseq;
auto cq = MakeMockCompletionQueue(aseq);
MockClock clock;
auto clock = std::make_shared<FakeSystemClock>();
clock->SetTime(std::chrono::system_clock::now());
auto manager = std::make_shared<DefaultPullLeaseManager>(
cq, mock, Options{}, subscription, "test-ack-id", clock.AsStdFunction());
cq, mock, Options{}, subscription, "test-ack-id", std::move(clock));

EXPECT_EQ(manager->subscription(), subscription);
}
Expand All @@ -401,11 +384,12 @@ TEST(DefaultPullLeaseManager, AckId) {
auto mock = std::make_shared<MockSubscriberStub>();
AsyncSequencer<bool> aseq;
auto cq = MakeMockCompletionQueue(aseq);
MockClock clock;
auto clock = std::make_shared<FakeSystemClock>();
clock->SetTime(std::chrono::system_clock::now());
auto manager = std::make_shared<DefaultPullLeaseManager>(
cq, mock, Options{},
pubsub::Subscription("test-project", "test-subscription"), "test-ack-id",
clock.AsStdFunction());
std::move(clock));

EXPECT_EQ(manager->ack_id(), "test-ack-id");
}
Expand Down
5 changes: 2 additions & 3 deletions google/cloud/pubsub/internal/pull_lease_manager_factory.cc
Original file line number Diff line number Diff line change
Expand Up @@ -27,12 +27,11 @@ GOOGLE_CLOUD_CPP_INLINE_NAMESPACE_BEGIN
std::shared_ptr<PullLeaseManager> MakePullLeaseManager(
CompletionQueue cq, std::weak_ptr<SubscriberStub> stub,
pubsub::Subscription subscription, std::string ack_id,
Options const& options,
std::function<std::chrono::system_clock::time_point()> const& clock) {
Options const& options, std::shared_ptr<Clock> clock) {
std::shared_ptr<PullLeaseManager> manager =
std::make_shared<pubsub_internal::DefaultPullLeaseManager>(
std::move(cq), std::move(stub), options, std::move(subscription),
std::move(ack_id), clock);
std::move(ack_id), std::move(clock));
if (internal::TracingEnabled(options)) {
manager = MakeTracingPullLeaseManager(std::move(manager));
}
Expand Down
Loading

0 comments on commit db22daa

Please sign in to comment.