diff --git a/google/cloud/pubsub/internal/default_pull_lease_manager.cc b/google/cloud/pubsub/internal/default_pull_lease_manager.cc index 1139f99be9b04..622870e67758f 100644 --- a/google/cloud/pubsub/internal/default_pull_lease_manager.cc +++ b/google/cloud/pubsub/internal/default_pull_lease_manager.cc @@ -40,16 +40,17 @@ std::chrono::seconds DefaultLeaseExtension(Options const& options) { DefaultPullLeaseManager::DefaultPullLeaseManager( CompletionQueue cq, std::weak_ptr w, Options options, - pubsub::Subscription subscription, std::string ack_id, Clock clock) + pubsub::Subscription subscription, std::string ack_id, + std::shared_ptr clock) : cq_(std::move(cq)), stub_(std::move(w)), options_(std::move(options)), subscription_(std::move(subscription)), ack_id_(std::move(ack_id)), clock_(std::move(clock)), - lease_deadline_(DefaultLeaseDeadline(clock_(), options_)), + lease_deadline_(DefaultLeaseDeadline(clock_->Now(), options_)), lease_extension_(DefaultLeaseExtension(options_)), - current_lease_(clock_() + kMinimalLeaseExtension) {} + current_lease_(clock_->Now() + kMinimalLeaseExtension) {} DefaultPullLeaseManager::~DefaultPullLeaseManager() { if (!timer_.valid()) return; @@ -59,7 +60,7 @@ DefaultPullLeaseManager::~DefaultPullLeaseManager() { void DefaultPullLeaseManager::StartLeaseLoop() { auto s = stub_.lock(); if (!s) return; - auto const now = clock_(); + auto const now = clock_->Now(); // Check if the lease has expired, or is so close to expiring that we cannot // extend it. In either case, simply return and stop the loop. @@ -94,7 +95,7 @@ future DefaultPullLeaseManager::ExtendLease( google::cloud::Idempotency::kIdempotent, cq_, [stub = std::move(stub), deadline = now + extension, clock = clock_]( auto cq, auto context, auto const& request) { - if (deadline < clock()) { + if (deadline < clock->Now()) { return make_ready_future( Status(StatusCode::kDeadlineExceeded, "lease already expired")); } diff --git a/google/cloud/pubsub/internal/default_pull_lease_manager.h b/google/cloud/pubsub/internal/default_pull_lease_manager.h index ecae9fc6ba08c..29bf7fb3501a0 100644 --- a/google/cloud/pubsub/internal/default_pull_lease_manager.h +++ b/google/cloud/pubsub/internal/default_pull_lease_manager.h @@ -20,6 +20,7 @@ #include "google/cloud/pubsub/subscription.h" #include "google/cloud/completion_queue.h" #include "google/cloud/future.h" +#include "google/cloud/internal/clock.h" #include "google/cloud/options.h" #include "google/cloud/status_or.h" #include @@ -39,12 +40,12 @@ class DefaultPullLeaseManager : public PullLeaseManager, public std::enable_shared_from_this { public: - using Clock = std::function; + using Clock = ::google::cloud::internal::SystemClock; - DefaultPullLeaseManager(CompletionQueue cq, std::weak_ptr w, - Options options, pubsub::Subscription subscription, - std::string ack_id, - Clock clock = std::chrono::system_clock::now); + DefaultPullLeaseManager( + CompletionQueue cq, std::weak_ptr w, Options options, + pubsub::Subscription subscription, std::string ack_id, + std::shared_ptr clock = std::make_shared()); ~DefaultPullLeaseManager() override; void StartLeaseLoop() override; @@ -75,7 +76,7 @@ class DefaultPullLeaseManager Options options_; pubsub::Subscription subscription_; std::string ack_id_; - Clock clock_; + std::shared_ptr clock_; // The absolute deadline to complete processing the message. // The application can configure this value using // `pubsub::MaxDeadlineTimeOption` diff --git a/google/cloud/pubsub/internal/default_pull_lease_manager_test.cc b/google/cloud/pubsub/internal/default_pull_lease_manager_test.cc index 08dbe033b476b..51b5cde07b237 100644 --- a/google/cloud/pubsub/internal/default_pull_lease_manager_test.cc +++ b/google/cloud/pubsub/internal/default_pull_lease_manager_test.cc @@ -18,6 +18,7 @@ #include "google/cloud/pubsub/testing/mock_subscriber_stub.h" #include "google/cloud/pubsub/testing/test_retry_policies.h" #include "google/cloud/testing_util/async_sequencer.h" +#include "google/cloud/testing_util/fake_clock.h" #include "google/cloud/testing_util/mock_completion_queue_impl.h" #include "google/cloud/testing_util/status_matchers.h" #include @@ -30,6 +31,7 @@ namespace { using ::google::cloud::pubsub_testing::MockSubscriberStub; using ::google::cloud::testing_util::AsyncSequencer; +using ::google::cloud::testing_util::FakeSystemClock; using ::google::cloud::testing_util::StatusIs; using ::google::pubsub::v1::ModifyAckDeadlineRequest; using ::testing::_; @@ -43,9 +45,6 @@ using ::testing::Pointee; using ::testing::Property; using ::testing::Return; -using MockClock = - ::testing::MockFunction; - Options MakeTestOptions() { return google::cloud::pubsub_testing::MakeTestOptions( Options{} @@ -72,13 +71,10 @@ Status PermanentError() { TEST(DefaultPullLeaseManager, SimpleLeaseLoop) { auto subscription = pubsub::Subscription("test-project", "test-subscription"); - // Use a mock clock where the time is controlled by a test variable. We do not - // really care how many times or exactly when is the clock called, we just - // want to control the passage of time. auto const start = std::chrono::system_clock::now(); auto current_time = start; - MockClock clock; - EXPECT_CALL(clock, Call).WillRepeatedly([&] { return current_time; }); + auto clock = std::make_shared(); + clock->SetTime(current_time); AsyncSequencer aseq; auto cq = MakeMockCompletionQueue(aseq); auto mock = std::make_shared(); @@ -133,33 +129,33 @@ TEST(DefaultPullLeaseManager, SimpleLeaseLoop) { }); }); auto manager = std::make_shared( - cq, mock, options, subscription, "test-ack-id", clock.AsStdFunction()); + cq, mock, options, subscription, "test-ack-id", clock); manager->StartLeaseLoop(); auto pending = aseq.PopFrontWithName(); EXPECT_EQ(pending.second, "AsyncModifyAckDeadline"); pending.first.set_value(true); pending = aseq.PopFrontWithName(); EXPECT_EQ(pending.second, "MakeRelativeTimer"); - EXPECT_THAT(manager->current_lease(), current_time + kLeaseExtension); + EXPECT_THAT(manager->current_lease(), clock->Now() + kLeaseExtension); - current_time = current_time + kLeaseExtension - kLeaseSlack; + clock->AdvanceTime(kLeaseExtension - kLeaseSlack); pending.first.set_value(true); pending = aseq.PopFrontWithName(); EXPECT_EQ(pending.second, "AsyncModifyAckDeadline"); pending.first.set_value(true); pending = aseq.PopFrontWithName(); EXPECT_EQ(pending.second, "MakeRelativeTimer"); - EXPECT_THAT(manager->current_lease(), current_time + kLeaseExtension); + EXPECT_THAT(manager->current_lease(), clock->Now() + kLeaseExtension); // This is close to the end of the lifetime - current_time = current_time + kLeaseExtension - kLeaseSlack; + clock->AdvanceTime(kLeaseExtension - kLeaseSlack); pending.first.set_value(true); pending = aseq.PopFrontWithName(); EXPECT_EQ(pending.second, "AsyncModifyAckDeadline"); pending.first.set_value(true); pending = aseq.PopFrontWithName(); EXPECT_EQ(pending.second, "MakeRelativeTimer"); - EXPECT_THAT(manager->current_lease(), current_time + kLastLeaseExtension); + EXPECT_THAT(manager->current_lease(), clock->Now() + kLastLeaseExtension); // Terminate the loop. With exceptions disabled abandoning a future with a // continuation results in a crash. In non-test programs, the completion queue @@ -170,19 +166,16 @@ TEST(DefaultPullLeaseManager, SimpleLeaseLoop) { TEST(DefaultPullLeaseManager, StartLeaseLoopAlreadyReleased) { auto subscription = pubsub::Subscription("test-project", "test-subscription"); - // Use a mock clock where the time is controlled by a test variable. We do not - // really care how many times or exactly when is the clock called, we just - // want to control the passage of time. auto current_time = std::chrono::system_clock::now(); - MockClock clock; - EXPECT_CALL(clock, Call).WillRepeatedly([&] { return current_time; }); + auto clock = std::make_shared(); + clock->SetTime(current_time); AsyncSequencer aseq; auto cq = MakeMockCompletionQueue(aseq); auto mock = std::make_shared(); EXPECT_CALL(*mock, AsyncModifyAckDeadline(_, _, _)).Times(0); auto manager = std::make_shared( cq, mock, MakeTestOptions(), subscription, "test-ack-id", - clock.AsStdFunction()); + std::move(clock)); // This can happen if the subscriber is shutdown, but the application manages // to hold to a `AckHandler` reference. In this case, we expect the loop to // stop (or have no effect). @@ -193,22 +186,19 @@ TEST(DefaultPullLeaseManager, StartLeaseLoopAlreadyReleased) { TEST(DefaultPullLeaseManager, StartLeaseLoopAlreadyPastMaxExtension) { auto subscription = pubsub::Subscription("test-project", "test-subscription"); - // Use a mock clock where the time is controlled by a test variable. We do not - // really care how many times or exactly when is the clock called, we just - // want to control the passage of time. auto current_time = std::chrono::system_clock::now(); - MockClock clock; - EXPECT_CALL(clock, Call).WillRepeatedly([&] { return current_time; }); + auto clock = std::make_shared(); + clock->SetTime(current_time); AsyncSequencer aseq; auto cq = MakeMockCompletionQueue(aseq); auto mock = std::make_shared(); EXPECT_CALL(*mock, AsyncModifyAckDeadline(_, _, _)).Times(0); auto manager = std::make_shared( - cq, mock, MakeTestOptions(), subscription, "test-ack-id", - clock.AsStdFunction()); + cq, mock, MakeTestOptions(), subscription, "test-ack-id", clock); EXPECT_THAT(manager->lease_deadline(), Eq(current_time + std::chrono::seconds(300))); - current_time = current_time + std::chrono::seconds(301); + // See the MakeTestOptions() for the magic number. + clock->AdvanceTime(std::chrono::seconds(301)); manager->StartLeaseLoop(); // This is a "AsyncModifyAckDeadline() is not called" test. } @@ -216,23 +206,20 @@ TEST(DefaultPullLeaseManager, StartLeaseLoopAlreadyPastMaxExtension) { TEST(DefaultPullLeaseManager, StartLeaseLoopTooCloseMaxExtension) { auto subscription = pubsub::Subscription("test-project", "test-subscription"); - // Use a mock clock where the time is controlled by a test variable. We do not - // really care how many times or exactly when is the clock called, we just - // want to control the passage of time. auto current_time = std::chrono::system_clock::now(); - MockClock clock; - EXPECT_CALL(clock, Call).WillRepeatedly([&] { return current_time; }); + auto clock = std::make_shared(); + clock->SetTime(current_time); AsyncSequencer aseq; auto cq = MakeMockCompletionQueue(aseq); auto mock = std::make_shared(); EXPECT_CALL(*mock, AsyncModifyAckDeadline(_, _, _)).Times(0); auto manager = std::make_shared( - cq, mock, MakeTestOptions(), subscription, "test-ack-id", - clock.AsStdFunction()); + cq, mock, MakeTestOptions(), subscription, "test-ack-id", clock); EXPECT_THAT(manager->lease_deadline(), Eq(current_time + std::chrono::seconds(300))); - current_time = - current_time + std::chrono::seconds(299) + std::chrono::milliseconds(500); + // See the MakeTestOptions() for the magic number. + clock->AdvanceTime(std::chrono::seconds(299) + + std::chrono::milliseconds(500)); manager->StartLeaseLoop(); // This is a "AsyncModifyAckDeadline() is not called" test. } @@ -240,21 +227,17 @@ TEST(DefaultPullLeaseManager, StartLeaseLoopTooCloseMaxExtension) { TEST(DefaultPullLeaseManager, StartLeaseLoopAlreadyPastCurrentExtension) { auto subscription = pubsub::Subscription("test-project", "test-subscription"); - // Use a mock clock where the time is controlled by a test variable. We do not - // really care how many times or exactly when is the clock called, we just - // want to control the passage of time. auto current_time = std::chrono::system_clock::now(); - MockClock clock; - EXPECT_CALL(clock, Call).WillRepeatedly([&] { return current_time; }); + auto clock = std::make_shared(); + clock->SetTime(current_time); AsyncSequencer aseq; auto cq = MakeMockCompletionQueue(aseq); auto mock = std::make_shared(); EXPECT_CALL(*mock, AsyncModifyAckDeadline(_, _, _)).Times(0); auto manager = std::make_shared( - cq, mock, MakeTestOptions(), subscription, "test-ack-id", - clock.AsStdFunction()); + cq, mock, MakeTestOptions(), subscription, "test-ack-id", clock); EXPECT_GT(manager->current_lease(), current_time); - current_time = manager->current_lease(); + clock->SetTime(manager->current_lease()); manager->StartLeaseLoop(); // This is a "AsyncModifyAckDeadline() is not called" test. } @@ -263,8 +246,8 @@ TEST(DefaultPullLeaseManager, InitializeDeadlines) { auto subscription = pubsub::Subscription("test-project", "test-subscription"); auto current_time = std::chrono::system_clock::now(); - MockClock clock; - EXPECT_CALL(clock, Call).WillRepeatedly([&] { return current_time; }); + auto clock = std::make_shared(); + clock->SetTime(current_time); AsyncSequencer aseq; auto cq = MakeMockCompletionQueue(aseq); auto mock = std::make_shared(); @@ -276,7 +259,7 @@ TEST(DefaultPullLeaseManager, InitializeDeadlines) { .set(std::chrono::seconds(300)) .set( std::chrono::seconds(10))), - subscription, "test-ack-id", clock.AsStdFunction()); + subscription, "test-ack-id", clock); EXPECT_EQ(manager->lease_deadline(), current_time + std::chrono::seconds(300)); EXPECT_EQ(manager->LeaseRefreshPeriod(), std::chrono::seconds(9)); @@ -288,7 +271,7 @@ TEST(DefaultPullLeaseManager, InitializeDeadlines) { .set(std::chrono::seconds(300)) .set( std::chrono::seconds(30))), - subscription, "test-ack-id", clock.AsStdFunction()); + subscription, "test-ack-id", clock); EXPECT_EQ(manager->lease_deadline(), current_time + std::chrono::seconds(300)); EXPECT_EQ(manager->LeaseRefreshPeriod(), std::chrono::seconds(29)); @@ -301,7 +284,7 @@ TEST(DefaultPullLeaseManager, InitializeDeadlines) { .set(std::chrono::seconds(10)) .set( std::chrono::seconds(30))), - subscription, "test-ack-id", clock.AsStdFunction()); + subscription, "test-ack-id", clock); EXPECT_EQ(manager->lease_deadline(), current_time + std::chrono::seconds(300)); EXPECT_EQ(manager->LeaseRefreshPeriod(), std::chrono::seconds(9)); @@ -325,10 +308,10 @@ TEST(DefaultPullLeaseManager, ExtendLeaseDeadlineSimple) { AsyncSequencer aseq; auto cq = MakeMockCompletionQueue(aseq); auto current_time = std::chrono::system_clock::now(); - MockClock clock; - EXPECT_CALL(clock, Call).WillRepeatedly([&] { return current_time; }); + auto clock = std::make_shared(); + clock->SetTime(current_time); auto manager = std::make_shared( - cq, mock, options, subscription, "test-ack-id", clock.AsStdFunction()); + cq, mock, options, subscription, "test-ack-id", std::move(clock)); auto status = manager->ExtendLease(mock, current_time, kLeaseExtension); EXPECT_STATUS_OK(status.get()); @@ -344,13 +327,12 @@ TEST(DefaultPullLeaseManager, ExtendLeaseDeadlineExceeded) { AsyncSequencer aseq; auto cq = MakeMockCompletionQueue(aseq); auto current_time = std::chrono::system_clock::now(); - MockClock clock; - // Set the time for the clock call to after the current time + extension. - EXPECT_CALL(clock, Call).WillRepeatedly([&] { - return current_time + std::chrono::seconds(11); - }); + auto clock = std::make_shared(); + // Set the time for the clock call to after the current time + + // extension. + clock->SetTime(current_time + std::chrono::seconds(11)); auto manager = std::make_shared( - cq, mock, options, subscription, "test-ack-id", clock.AsStdFunction()); + cq, mock, options, subscription, "test-ack-id", std::move(clock)); auto status = manager->ExtendLease(mock, current_time, kLeaseExtension); EXPECT_THAT(status.get(), @@ -375,10 +357,10 @@ TEST(DefaultPullLeaseManager, ExtendLeasePermanentError) { AsyncSequencer aseq; auto cq = MakeMockCompletionQueue(aseq); auto current_time = std::chrono::system_clock::now(); - MockClock clock; - EXPECT_CALL(clock, Call).WillRepeatedly([&] { return current_time; }); + auto clock = std::make_shared(); + clock->SetTime(current_time); auto manager = std::make_shared( - cq, mock, options, subscription, "test-ack-id", clock.AsStdFunction()); + cq, mock, options, subscription, "test-ack-id", std::move(clock)); auto status = manager->ExtendLease(mock, current_time, kLeaseExtension); EXPECT_THAT(status.get(), @@ -390,9 +372,10 @@ TEST(DefaultPullLeaseManager, Subscription) { auto mock = std::make_shared(); AsyncSequencer aseq; auto cq = MakeMockCompletionQueue(aseq); - MockClock clock; + auto clock = std::make_shared(); + clock->SetTime(std::chrono::system_clock::now()); auto manager = std::make_shared( - cq, mock, Options{}, subscription, "test-ack-id", clock.AsStdFunction()); + cq, mock, Options{}, subscription, "test-ack-id", std::move(clock)); EXPECT_EQ(manager->subscription(), subscription); } @@ -401,11 +384,12 @@ TEST(DefaultPullLeaseManager, AckId) { auto mock = std::make_shared(); AsyncSequencer aseq; auto cq = MakeMockCompletionQueue(aseq); - MockClock clock; + auto clock = std::make_shared(); + clock->SetTime(std::chrono::system_clock::now()); auto manager = std::make_shared( cq, mock, Options{}, pubsub::Subscription("test-project", "test-subscription"), "test-ack-id", - clock.AsStdFunction()); + std::move(clock)); EXPECT_EQ(manager->ack_id(), "test-ack-id"); } diff --git a/google/cloud/pubsub/internal/pull_lease_manager_factory.cc b/google/cloud/pubsub/internal/pull_lease_manager_factory.cc index 678d0a40a3be8..a409c7336493b 100644 --- a/google/cloud/pubsub/internal/pull_lease_manager_factory.cc +++ b/google/cloud/pubsub/internal/pull_lease_manager_factory.cc @@ -27,12 +27,11 @@ GOOGLE_CLOUD_CPP_INLINE_NAMESPACE_BEGIN std::shared_ptr MakePullLeaseManager( CompletionQueue cq, std::weak_ptr stub, pubsub::Subscription subscription, std::string ack_id, - Options const& options, - std::function const& clock) { + Options const& options, std::shared_ptr clock) { std::shared_ptr manager = std::make_shared( std::move(cq), std::move(stub), options, std::move(subscription), - std::move(ack_id), clock); + std::move(ack_id), std::move(clock)); if (internal::TracingEnabled(options)) { manager = MakeTracingPullLeaseManager(std::move(manager)); } diff --git a/google/cloud/pubsub/internal/pull_lease_manager_factory.h b/google/cloud/pubsub/internal/pull_lease_manager_factory.h index f1b0e43bd9dfc..cfa8857885a4e 100644 --- a/google/cloud/pubsub/internal/pull_lease_manager_factory.h +++ b/google/cloud/pubsub/internal/pull_lease_manager_factory.h @@ -19,6 +19,7 @@ #include "google/cloud/pubsub/internal/subscriber_stub.h" #include "google/cloud/pubsub/subscription.h" #include "google/cloud/completion_queue.h" +#include "google/cloud/internal/clock.h" #include "google/cloud/options.h" #include "google/cloud/version.h" #include @@ -30,12 +31,13 @@ namespace cloud { namespace pubsub_internal { GOOGLE_CLOUD_CPP_INLINE_NAMESPACE_BEGIN +using Clock = ::google::cloud::internal::SystemClock; + std::shared_ptr MakePullLeaseManager( CompletionQueue cq, std::weak_ptr stub, pubsub::Subscription subscription, std::string ack_id, Options const& options, - std::function const& clock = - std::chrono::system_clock::now); + std::shared_ptr clock = std::make_shared()); GOOGLE_CLOUD_CPP_INLINE_NAMESPACE_END } // namespace pubsub_internal diff --git a/google/cloud/pubsub/internal/pull_lease_manager_factory_test.cc b/google/cloud/pubsub/internal/pull_lease_manager_factory_test.cc index 0dcab8152cf72..97f5ae96b2a1c 100644 --- a/google/cloud/pubsub/internal/pull_lease_manager_factory_test.cc +++ b/google/cloud/pubsub/internal/pull_lease_manager_factory_test.cc @@ -17,6 +17,7 @@ #include "google/cloud/pubsub/testing/mock_subscriber_stub.h" #include "google/cloud/pubsub/testing/test_retry_policies.h" #include "google/cloud/testing_util/async_sequencer.h" +#include "google/cloud/testing_util/fake_clock.h" #include "google/cloud/testing_util/mock_completion_queue_impl.h" #include "google/cloud/testing_util/opentelemetry_matchers.h" #include "google/cloud/testing_util/status_matchers.h" @@ -28,11 +29,9 @@ namespace pubsub_internal { GOOGLE_CLOUD_CPP_INLINE_NAMESPACE_BEGIN namespace { -using MockClock = - ::testing::MockFunction; - using ::google::cloud::pubsub_testing::MockSubscriberStub; using ::google::cloud::testing_util::AsyncSequencer; +using ::google::cloud::testing_util::FakeSystemClock; using ::google::pubsub::v1::ModifyAckDeadlineRequest; using ::testing::_; using ::testing::AllOf; @@ -80,11 +79,11 @@ TEST(DefaultPullLeaseManager, ExtendLeaseDeadlineSimple) { AsyncSequencer aseq; auto cq = MakeMockCompletionQueue(aseq); auto current_time = std::chrono::system_clock::now(); - MockClock clock; - EXPECT_CALL(clock, Call).WillRepeatedly([&] { return current_time; }); - auto manager = MakePullLeaseManager(std::move(cq), std::move(mock), - subscription, "test-ack-id", - MakeTestOptions(), clock.AsStdFunction()); + auto clock = std::make_shared(); + clock->SetTime(current_time); + auto manager = + MakePullLeaseManager(std::move(cq), std::move(mock), subscription, + "test-ack-id", MakeTestOptions(), std::move(clock)); auto status = manager->ExtendLease(mock, current_time, kLeaseExtension); EXPECT_STATUS_OK(status.get()); @@ -121,11 +120,11 @@ TEST(DefaultPullLeaseManager, TracingEnabled) { AsyncSequencer aseq; auto cq = MakeMockCompletionQueue(aseq); auto current_time = std::chrono::system_clock::now(); - MockClock clock; - EXPECT_CALL(clock, Call).WillRepeatedly([&] { return current_time; }); + auto clock = std::make_shared(); + clock->SetTime(current_time); auto manager = MakePullLeaseManager( std::move(cq), std::move(mock), subscription, "test-ack-id", - EnableTracing(MakeTestOptions()), clock.AsStdFunction()); + EnableTracing(MakeTestOptions()), std::move(clock)); auto status = manager->ExtendLease(mock, current_time, kLeaseExtension); EXPECT_STATUS_OK(status.get()); @@ -156,11 +155,11 @@ TEST(DefaultPullLeaseManager, TracingDisabled) { AsyncSequencer aseq; auto cq = MakeMockCompletionQueue(aseq); auto current_time = std::chrono::system_clock::now(); - MockClock clock; - EXPECT_CALL(clock, Call).WillRepeatedly([&] { return current_time; }); + auto clock = std::make_shared(); + clock->SetTime(current_time); auto manager = MakePullLeaseManager( std::move(cq), std::move(mock), subscription, "test-ack-id", - DisableTracing(MakeTestOptions()), clock.AsStdFunction()); + DisableTracing(MakeTestOptions()), std::move(clock)); auto status = manager->ExtendLease(mock, current_time, kLeaseExtension); EXPECT_STATUS_OK(status.get());