Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

refactor(pubsub): use the clock class instead of a lambda #13389

Merged
merged 3 commits into from
Jan 12, 2024
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 8 additions & 8 deletions google/cloud/pubsub/internal/default_pull_ack_handler.cc
Original file line number Diff line number Diff line change
Expand Up @@ -24,19 +24,19 @@ namespace cloud {
namespace pubsub_internal {
GOOGLE_CLOUD_CPP_INLINE_NAMESPACE_BEGIN

DefaultPullAckHandler::DefaultPullAckHandler(CompletionQueue cq,
std::weak_ptr<SubscriberStub> w,
Options const& options,
pubsub::Subscription subscription,
std::string ack_id,
std::int32_t delivery_attempt)
DefaultPullAckHandler::DefaultPullAckHandler(
CompletionQueue cq, std::weak_ptr<SubscriberStub> w, Options const& options,
pubsub::Subscription subscription, std::string ack_id,
std::int32_t delivery_attempt, Clock clock)
: cq_(std::move(cq)),
stub_(std::move(w)),
subscription_(std::move(subscription)),
ack_id_(std::move(ack_id)),
delivery_attempt_(delivery_attempt),
lease_manager_(
MakePullLeaseManager(cq_, stub_, subscription_, ack_id_, options)) {}
lease_manager_(MakePullLeaseManager(cq_, stub_, subscription_, ack_id_,
options, std::move(clock))) {
lease_manager_->StartLeaseLoop();
}

DefaultPullAckHandler::~DefaultPullAckHandler() = default;

Expand Down
5 changes: 4 additions & 1 deletion google/cloud/pubsub/internal/default_pull_ack_handler.h
Original file line number Diff line number Diff line change
Expand Up @@ -42,10 +42,13 @@ class PullLeaseManager;
*/
class DefaultPullAckHandler : public pubsub::PullAckHandler::Impl {
public:
using Clock = std::function<std::chrono::system_clock::time_point()>;

DefaultPullAckHandler(CompletionQueue cq, std::weak_ptr<SubscriberStub> w,
Options const& options,
pubsub::Subscription subscription, std::string ack_id,
std::int32_t delivery_attempt);
std::int32_t delivery_attempt,
Clock clock = std::chrono::system_clock::now);
~DefaultPullAckHandler() override;

future<Status> ack() override;
Expand Down
169 changes: 163 additions & 6 deletions google/cloud/pubsub/internal/default_pull_ack_handler_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,10 @@ using ::testing::_;
using ::testing::AllOf;
using ::testing::ByMove;
using ::testing::ElementsAre;
using ::testing::Eq;
using ::testing::HasSubstr;
using ::testing::Le;
using ::testing::Pointee;
using ::testing::Property;
using ::testing::Return;

Expand Down Expand Up @@ -83,8 +86,20 @@ Status PermanentError() {

TEST(PullAckHandlerTest, AckSimple) {
auto subscription = pubsub::Subscription("test-project", "test-subscription");
// Start lease loop past max extension so that we don't get any modify ack
alevenberg marked this conversation as resolved.
Show resolved Hide resolved
// deadline calls.
auto const current_time = std::chrono::system_clock::now();
MockClock clock;
// On the first call, when the class is created return the current time. Any
// calls after will show the current time is past the max deadline. See
// MakeTestOptions for the magic numbers.
EXPECT_CALL(clock, Call)
.WillOnce([&] { return current_time; })
.WillRepeatedly([&] { return current_time + std::chrono::seconds(301); });

auto mock = std::make_shared<MockSubscriberStub>();
EXPECT_CALL(*mock, AsyncModifyAckDeadline(_, _, _)).Times(0);
// Expect the ack call.
auto request_matcher = AllOf(
Property(&AcknowledgeRequest::ack_ids, ElementsAre("test-ack-id")),
Property(&AcknowledgeRequest::subscription, subscription.FullName()));
Expand All @@ -96,7 +111,8 @@ TEST(PullAckHandlerTest, AckSimple) {
AsyncSequencer<bool> aseq;
auto cq = MakeMockCompletionQueue(aseq);
auto handler = std::make_unique<DefaultPullAckHandler>(
cq, mock, MakeTestOptions(), subscription, "test-ack-id", 42);
cq, mock, MakeTestOptions(), subscription, "test-ack-id", 42,
clock.AsStdFunction());
EXPECT_EQ(handler->delivery_attempt(), 42);
auto status = handler->ack();
auto timer = aseq.PopFrontWithName();
Expand All @@ -108,10 +124,105 @@ TEST(PullAckHandlerTest, AckSimple) {
EXPECT_STATUS_OK(status.get());
}

TEST(PullAckHandlerTest, AckWithLeaseManagement) {
auto subscription = pubsub::Subscription("test-project", "test-subscription");

// Use a mock clock where the time is controlled by a test variable. We do not
// really care how many times or exactly when is the clock called, we just
// want to control the passage of time.
auto const start = std::chrono::system_clock::now();
auto current_time = start;
MockClock clock;
EXPECT_CALL(clock, Call).WillRepeatedly([&] { return current_time; });
AsyncSequencer<bool> aseq;
auto cq = MakeMockCompletionQueue(aseq);
auto mock = std::make_shared<MockSubscriberStub>();

// these values explain the magic numbers in the expectations
auto constexpr kLeaseExtension = std::chrono::seconds(10);
auto constexpr kLeaseOdd = std::chrono::seconds(3);
auto constexpr kLeaseDeadline = 2 * kLeaseExtension + kLeaseOdd;
auto constexpr kLeaseSlack = std::chrono::seconds(1);
auto constexpr kLastLeaseExtension = 2 * kLeaseSlack + kLeaseOdd;
auto options = google::cloud::pubsub_testing::MakeTestOptions(
Options{}
.set<pubsub::MaxDeadlineTimeOption>(kLeaseDeadline)
.set<pubsub::MaxDeadlineExtensionOption>(kLeaseExtension));

// Set the expectation for the lease management calls.
auto context_matcher = Pointee(Property(&grpc::ClientContext::deadline,
Le(current_time + kLeaseExtension)));
auto request_matcher = AllOf(
Property(&ModifyAckDeadlineRequest::ack_ids, ElementsAre("test-ack-id")),
Property(&ModifyAckDeadlineRequest::ack_deadline_seconds,
kLeaseExtension.count()),
Property(&ModifyAckDeadlineRequest::subscription,
subscription.FullName()));
::testing::InSequence sequence;
EXPECT_CALL(*mock,
AsyncModifyAckDeadline(_, context_matcher, request_matcher))
.WillOnce([&](auto, auto, auto) {
return aseq.PushBack("AsyncModifyAckDeadline").then([](auto f) {
return f.get() ? Status{} : PermanentError();
});
});
EXPECT_CALL(*mock, AsyncModifyAckDeadline(_, _, _))
.WillOnce([&](auto, auto, auto) {
return aseq.PushBack("AsyncModifyAckDeadline").then([](auto f) {
return f.get() ? Status{} : PermanentError();
});
});

// Set the expectation for the ack call.
auto ack_request_matcher = AllOf(
Property(&AcknowledgeRequest::ack_ids, ElementsAre("test-ack-id")),
Property(&AcknowledgeRequest::subscription, subscription.FullName()));
EXPECT_CALL(*mock, AsyncAcknowledge(_, _, ack_request_matcher))
.WillOnce(Return(ByMove(make_ready_future(Status{}))));

// Create the ack handler.
auto handler = std::make_unique<DefaultPullAckHandler>(
cq, mock, MakeTestOptions(), subscription, "test-ack-id", 42,
clock.AsStdFunction());
// Extend the lease.
auto pending = aseq.PopFrontWithName();
EXPECT_EQ(pending.second, "AsyncModifyAckDeadline");
pending.first.set_value(true);
pending = aseq.PopFrontWithName();
EXPECT_EQ(pending.second, "MakeRelativeTimer");
pending.first.set_value(true);
pending = aseq.PopFrontWithName();
EXPECT_EQ(pending.second, "AsyncModifyAckDeadline");
pending.first.set_value(true);
pending = aseq.PopFrontWithName();
EXPECT_EQ(pending.second, "MakeRelativeTimer");
// Ack the message.
auto status = handler->ack();
EXPECT_STATUS_OK(status.get());

// Terminate the loop. With exceptions disabled abandoning a future with a
// continuation results in a crash. In non-test programs, the completion queue
// does this automatically as part of its shutdown.
pending.first.set_value(false);
}

TEST(PullAckHandlerTest, AckPermanentError) {
auto subscription = pubsub::Subscription("test-project", "test-subscription");

// Start lease loop past max extension so that we don't get any modify ack
// deadline calls.
auto const current_time = std::chrono::system_clock::now();
MockClock clock;
// On the first call, when the class is created return the current time. Any
// calls after will show the current time is past the max deadline. See
// MakeTestOptions for the magic numbers.
EXPECT_CALL(clock, Call)
.WillOnce([&] { return current_time; })
.WillRepeatedly([&] { return current_time + std::chrono::seconds(301); });

auto mock = std::make_shared<MockSubscriberStub>();
EXPECT_CALL(*mock, AsyncModifyAckDeadline(_, _, _)).Times(0);
// Expect the ack call.
auto request_matcher = AllOf(
Property(&AcknowledgeRequest::ack_ids, ElementsAre("test-ack-id")),
Property(&AcknowledgeRequest::subscription, subscription.FullName()));
Expand All @@ -120,7 +231,8 @@ TEST(PullAckHandlerTest, AckPermanentError) {
AsyncSequencer<bool> aseq;
auto cq = MakeMockCompletionQueue(aseq);
auto handler = std::make_unique<DefaultPullAckHandler>(
cq, mock, MakeTestOptions(), subscription, "test-ack-id", 42);
cq, mock, MakeTestOptions(), subscription, "test-ack-id", 42,
clock.AsStdFunction());
EXPECT_EQ(handler->delivery_attempt(), 42);
auto status = handler->ack();
EXPECT_THAT(status.get(),
Expand All @@ -129,6 +241,16 @@ TEST(PullAckHandlerTest, AckPermanentError) {

TEST(PullAckHandlerTest, NackSimple) {
auto subscription = pubsub::Subscription("test-project", "test-subscription");
// Start lease loop past max extension so that we don't get any modify ack
// deadline calls.
auto const current_time = std::chrono::system_clock::now();
MockClock clock;
// On the first call, when the class is created return the current time. Any
// calls after will show the current time is past the max deadline. See
// MakeTestOptions for the magic numbers.
EXPECT_CALL(clock, Call)
.WillOnce([&] { return current_time; })
.WillRepeatedly([&] { return current_time + std::chrono::seconds(301); });

auto mock = std::make_shared<MockSubscriberStub>();
auto request_matcher = AllOf(
Expand All @@ -144,7 +266,8 @@ TEST(PullAckHandlerTest, NackSimple) {
AsyncSequencer<bool> aseq;
auto cq = MakeMockCompletionQueue(aseq);
auto handler = std::make_unique<DefaultPullAckHandler>(
cq, mock, MakeTestOptions(), subscription, "test-ack-id", 42);
cq, mock, MakeTestOptions(), subscription, "test-ack-id", 42,
clock.AsStdFunction());
EXPECT_EQ(handler->delivery_attempt(), 42);
auto status = handler->nack();
auto timer = aseq.PopFrontWithName();
Expand All @@ -158,6 +281,16 @@ TEST(PullAckHandlerTest, NackSimple) {

TEST(PullAckHandlerTest, NackPermanentError) {
auto subscription = pubsub::Subscription("test-project", "test-subscription");
// Start lease loop past max extension so that we don't get any modify ack
// deadline calls.
auto const current_time = std::chrono::system_clock::now();
MockClock clock;
// On the first call, when the class is created return the current time. Any
// calls after will show the current time is past the max deadline. See
// MakeTestOptions for the magic numbers.
EXPECT_CALL(clock, Call)
.WillOnce([&] { return current_time; })
.WillRepeatedly([&] { return current_time + std::chrono::seconds(301); });

auto mock = std::make_shared<MockSubscriberStub>();
auto request_matcher = AllOf(
Expand All @@ -170,7 +303,8 @@ TEST(PullAckHandlerTest, NackPermanentError) {
AsyncSequencer<bool> aseq;
auto cq = MakeMockCompletionQueue(aseq);
auto handler = std::make_unique<DefaultPullAckHandler>(
cq, mock, MakeTestOptions(), subscription, "test-ack-id", 42);
cq, mock, MakeTestOptions(), subscription, "test-ack-id", 42,
clock.AsStdFunction());
EXPECT_EQ(handler->delivery_attempt(), 42);
auto status = handler->nack();
EXPECT_THAT(status.get(),
Expand All @@ -179,23 +313,46 @@ TEST(PullAckHandlerTest, NackPermanentError) {

TEST(AckHandlerTest, Subscription) {
auto subscription = pubsub::Subscription("test-project", "test-subscription");
// Start lease loop past max extension so that we don't get any modify ack
// deadline calls.
auto const current_time = std::chrono::system_clock::now();
MockClock clock;
// On the first call, when the class is created return the current time. Any
// calls after will show the current time is past the max deadline. See
// MakeTestOptions for the magic numbers.
EXPECT_CALL(clock, Call)
.WillOnce([&] { return current_time; })
.WillRepeatedly([&] { return current_time + std::chrono::seconds(301); });
auto mock = std::make_shared<MockSubscriberStub>();
EXPECT_CALL(*mock, AsyncModifyAckDeadline(_, _, _)).Times(0);
AsyncSequencer<bool> aseq;
auto cq = MakeMockCompletionQueue(aseq);
auto handler = std::make_unique<DefaultPullAckHandler>(
cq, mock, MakeTestOptions(), subscription, "test-ack-id", 42);
cq, mock, MakeTestOptions(), subscription, "test-ack-id", 42,
clock.AsStdFunction());

EXPECT_EQ(handler->subscription(), subscription);
}

TEST(AckHandlerTest, AckId) {
// Start lease loop past max extension so that we don't get any modify ack
// deadline calls.
auto const current_time = std::chrono::system_clock::now();
MockClock clock;
// On the first call, when the class is created return the current time. Any
// calls after will show the current time is past the max deadline. See
// MakeTestOptions for the magic numbers.
EXPECT_CALL(clock, Call)
.WillOnce([&] { return current_time; })
.WillRepeatedly([&] { return current_time + std::chrono::seconds(301); });
auto mock = std::make_shared<MockSubscriberStub>();
EXPECT_CALL(*mock, AsyncModifyAckDeadline(_, _, _)).Times(0);
AsyncSequencer<bool> aseq;
auto cq = MakeMockCompletionQueue(aseq);
auto handler = std::make_unique<DefaultPullAckHandler>(
cq, mock, MakeTestOptions(),
pubsub::Subscription("test-project", "test-subscription"), "test-ack-id",
42);
42, clock.AsStdFunction());

EXPECT_EQ(handler->ack_id(), "test-ack-id");
}
Expand Down
Loading