Skip to content

Commit

Permalink
feat(pubsub): add lease management for unary pull
Browse files Browse the repository at this point in the history
  • Loading branch information
alevenberg committed Jan 8, 2024
1 parent 6a938ab commit bda9daa
Show file tree
Hide file tree
Showing 3 changed files with 175 additions and 15 deletions.
16 changes: 8 additions & 8 deletions google/cloud/pubsub/internal/default_pull_ack_handler.cc
Original file line number Diff line number Diff line change
Expand Up @@ -24,19 +24,19 @@ namespace cloud {
namespace pubsub_internal {
GOOGLE_CLOUD_CPP_INLINE_NAMESPACE_BEGIN

DefaultPullAckHandler::DefaultPullAckHandler(CompletionQueue cq,
std::weak_ptr<SubscriberStub> w,
Options const& options,
pubsub::Subscription subscription,
std::string ack_id,
std::int32_t delivery_attempt)
DefaultPullAckHandler::DefaultPullAckHandler(
CompletionQueue cq, std::weak_ptr<SubscriberStub> w, Options const& options,
pubsub::Subscription subscription, std::string ack_id,
std::int32_t delivery_attempt, Clock clock)
: cq_(std::move(cq)),
stub_(std::move(w)),
subscription_(std::move(subscription)),
ack_id_(std::move(ack_id)),
delivery_attempt_(delivery_attempt),
lease_manager_(
MakePullLeaseManager(cq_, stub_, subscription_, ack_id_, options)) {}
lease_manager_(MakePullLeaseManager(cq_, stub_, subscription_, ack_id_,
options, std::move(clock))) {
lease_manager_->StartLeaseLoop();
}

DefaultPullAckHandler::~DefaultPullAckHandler() = default;

Expand Down
5 changes: 4 additions & 1 deletion google/cloud/pubsub/internal/default_pull_ack_handler.h
Original file line number Diff line number Diff line change
Expand Up @@ -42,10 +42,13 @@ class PullLeaseManager;
*/
class DefaultPullAckHandler : public pubsub::PullAckHandler::Impl {
public:
using Clock = std::function<std::chrono::system_clock::time_point()>;

DefaultPullAckHandler(CompletionQueue cq, std::weak_ptr<SubscriberStub> w,
Options const& options,
pubsub::Subscription subscription, std::string ack_id,
std::int32_t delivery_attempt);
std::int32_t delivery_attempt,
Clock clock = std::chrono::system_clock::now);
~DefaultPullAckHandler() override;

future<Status> ack() override;
Expand Down
169 changes: 163 additions & 6 deletions google/cloud/pubsub/internal/default_pull_ack_handler_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,10 @@ using ::testing::_;
using ::testing::AllOf;
using ::testing::ByMove;
using ::testing::ElementsAre;
using ::testing::Eq;
using ::testing::HasSubstr;
using ::testing::Le;
using ::testing::Pointee;
using ::testing::Property;
using ::testing::Return;

Expand Down Expand Up @@ -83,8 +86,20 @@ Status PermanentError() {

TEST(PullAckHandlerTest, AckSimple) {
auto subscription = pubsub::Subscription("test-project", "test-subscription");
// Start lease loop past max extension so that we don't get any modify ack
// deadline calls.
auto const current_time = std::chrono::system_clock::now();
MockClock clock;
// On the first call, when the class is created return the current time. Any
// calls after will show the current time is past the max deadline. See
// MakeTestOptions for the magic numbers.
EXPECT_CALL(clock, Call)
.WillOnce([&] { return current_time; })
.WillRepeatedly([&] { return current_time + std::chrono::seconds(301); });

auto mock = std::make_shared<MockSubscriberStub>();
EXPECT_CALL(*mock, AsyncModifyAckDeadline(_, _, _)).Times(0);
// Expect the ack call.
auto request_matcher = AllOf(
Property(&AcknowledgeRequest::ack_ids, ElementsAre("test-ack-id")),
Property(&AcknowledgeRequest::subscription, subscription.FullName()));
Expand All @@ -96,7 +111,8 @@ TEST(PullAckHandlerTest, AckSimple) {
AsyncSequencer<bool> aseq;
auto cq = MakeMockCompletionQueue(aseq);
auto handler = std::make_unique<DefaultPullAckHandler>(
cq, mock, MakeTestOptions(), subscription, "test-ack-id", 42);
cq, mock, MakeTestOptions(), subscription, "test-ack-id", 42,
clock.AsStdFunction());
EXPECT_EQ(handler->delivery_attempt(), 42);
auto status = handler->ack();
auto timer = aseq.PopFrontWithName();
Expand All @@ -108,10 +124,105 @@ TEST(PullAckHandlerTest, AckSimple) {
EXPECT_STATUS_OK(status.get());
}

TEST(PullAckHandlerTest, AckWithLeaseManagement) {
auto subscription = pubsub::Subscription("test-project", "test-subscription");

// Use a mock clock where the time is controlled by a test variable. We do not
// really care how many times or exactly when is the clock called, we just
// want to control the passage of time.
auto const start = std::chrono::system_clock::now();
auto current_time = start;
MockClock clock;
EXPECT_CALL(clock, Call).WillRepeatedly([&] { return current_time; });
AsyncSequencer<bool> aseq;
auto cq = MakeMockCompletionQueue(aseq);
auto mock = std::make_shared<MockSubscriberStub>();

// these values explain the magic numbers in the expectations
auto constexpr kLeaseExtension = std::chrono::seconds(10);
auto constexpr kLeaseOdd = std::chrono::seconds(3);
auto constexpr kLeaseDeadline = 2 * kLeaseExtension + kLeaseOdd;
auto constexpr kLeaseSlack = std::chrono::seconds(1);
auto constexpr kLastLeaseExtension = 2 * kLeaseSlack + kLeaseOdd;
auto options = google::cloud::pubsub_testing::MakeTestOptions(
Options{}
.set<pubsub::MaxDeadlineTimeOption>(kLeaseDeadline)
.set<pubsub::MaxDeadlineExtensionOption>(kLeaseExtension));

// Set the expectation for the lease management calls.
auto context_matcher = Pointee(Property(&grpc::ClientContext::deadline,
Le(current_time + kLeaseExtension)));
auto request_matcher = AllOf(
Property(&ModifyAckDeadlineRequest::ack_ids, ElementsAre("test-ack-id")),
Property(&ModifyAckDeadlineRequest::ack_deadline_seconds,
kLeaseExtension.count()),
Property(&ModifyAckDeadlineRequest::subscription,
subscription.FullName()));
::testing::InSequence sequence;
EXPECT_CALL(*mock,
AsyncModifyAckDeadline(_, context_matcher, request_matcher))
.WillOnce([&](auto, auto, auto) {
return aseq.PushBack("AsyncModifyAckDeadline").then([](auto f) {
return f.get() ? Status{} : PermanentError();
});
});
EXPECT_CALL(*mock, AsyncModifyAckDeadline(_, _, _))
.WillOnce([&](auto, auto, auto) {
return aseq.PushBack("AsyncModifyAckDeadline").then([](auto f) {
return f.get() ? Status{} : PermanentError();
});
});

// Set the expectation for the ack call.
auto ack_request_matcher = AllOf(
Property(&AcknowledgeRequest::ack_ids, ElementsAre("test-ack-id")),
Property(&AcknowledgeRequest::subscription, subscription.FullName()));
EXPECT_CALL(*mock, AsyncAcknowledge(_, _, ack_request_matcher))
.WillOnce(Return(ByMove(make_ready_future(Status{}))));

// Create the ack handler.
auto handler = std::make_unique<DefaultPullAckHandler>(
cq, mock, MakeTestOptions(), subscription, "test-ack-id", 42,
clock.AsStdFunction());
// Extend the lease.
auto pending = aseq.PopFrontWithName();
EXPECT_EQ(pending.second, "AsyncModifyAckDeadline");
pending.first.set_value(true);
pending = aseq.PopFrontWithName();
EXPECT_EQ(pending.second, "MakeRelativeTimer");
pending.first.set_value(true);
pending = aseq.PopFrontWithName();
EXPECT_EQ(pending.second, "AsyncModifyAckDeadline");
pending.first.set_value(true);
pending = aseq.PopFrontWithName();
EXPECT_EQ(pending.second, "MakeRelativeTimer");
// Ack the message.
auto status = handler->ack();
EXPECT_STATUS_OK(status.get());

// Terminate the loop. With exceptions disabled abandoning a future with a
// continuation results in a crash. In non-test programs, the completion queue
// does this automatically as part of its shutdown.
pending.first.set_value(false);
}

TEST(PullAckHandlerTest, AckPermanentError) {
auto subscription = pubsub::Subscription("test-project", "test-subscription");

// Start lease loop past max extension so that we don't get any modify ack
// deadline calls.
auto const current_time = std::chrono::system_clock::now();
MockClock clock;
// On the first call, when the class is created return the current time. Any
// calls after will show the current time is past the max deadline. See
// MakeTestOptions for the magic numbers.
EXPECT_CALL(clock, Call)
.WillOnce([&] { return current_time; })
.WillRepeatedly([&] { return current_time + std::chrono::seconds(301); });

auto mock = std::make_shared<MockSubscriberStub>();
EXPECT_CALL(*mock, AsyncModifyAckDeadline(_, _, _)).Times(0);
// Expect the ack call.
auto request_matcher = AllOf(
Property(&AcknowledgeRequest::ack_ids, ElementsAre("test-ack-id")),
Property(&AcknowledgeRequest::subscription, subscription.FullName()));
Expand All @@ -120,7 +231,8 @@ TEST(PullAckHandlerTest, AckPermanentError) {
AsyncSequencer<bool> aseq;
auto cq = MakeMockCompletionQueue(aseq);
auto handler = std::make_unique<DefaultPullAckHandler>(
cq, mock, MakeTestOptions(), subscription, "test-ack-id", 42);
cq, mock, MakeTestOptions(), subscription, "test-ack-id", 42,
clock.AsStdFunction());
EXPECT_EQ(handler->delivery_attempt(), 42);
auto status = handler->ack();
EXPECT_THAT(status.get(),
Expand All @@ -129,6 +241,16 @@ TEST(PullAckHandlerTest, AckPermanentError) {

TEST(PullAckHandlerTest, NackSimple) {
auto subscription = pubsub::Subscription("test-project", "test-subscription");
// Start lease loop past max extension so that we don't get any modify ack
// deadline calls.
auto const current_time = std::chrono::system_clock::now();
MockClock clock;
// On the first call, when the class is created return the current time. Any
// calls after will show the current time is past the max deadline. See
// MakeTestOptions for the magic numbers.
EXPECT_CALL(clock, Call)
.WillOnce([&] { return current_time; })
.WillRepeatedly([&] { return current_time + std::chrono::seconds(301); });

auto mock = std::make_shared<MockSubscriberStub>();
auto request_matcher = AllOf(
Expand All @@ -144,7 +266,8 @@ TEST(PullAckHandlerTest, NackSimple) {
AsyncSequencer<bool> aseq;
auto cq = MakeMockCompletionQueue(aseq);
auto handler = std::make_unique<DefaultPullAckHandler>(
cq, mock, MakeTestOptions(), subscription, "test-ack-id", 42);
cq, mock, MakeTestOptions(), subscription, "test-ack-id", 42,
clock.AsStdFunction());
EXPECT_EQ(handler->delivery_attempt(), 42);
auto status = handler->nack();
auto timer = aseq.PopFrontWithName();
Expand All @@ -158,6 +281,16 @@ TEST(PullAckHandlerTest, NackSimple) {

TEST(PullAckHandlerTest, NackPermanentError) {
auto subscription = pubsub::Subscription("test-project", "test-subscription");
// Start lease loop past max extension so that we don't get any modify ack
// deadline calls.
auto const current_time = std::chrono::system_clock::now();
MockClock clock;
// On the first call, when the class is created return the current time. Any
// calls after will show the current time is past the max deadline. See
// MakeTestOptions for the magic numbers.
EXPECT_CALL(clock, Call)
.WillOnce([&] { return current_time; })
.WillRepeatedly([&] { return current_time + std::chrono::seconds(301); });

auto mock = std::make_shared<MockSubscriberStub>();
auto request_matcher = AllOf(
Expand All @@ -170,7 +303,8 @@ TEST(PullAckHandlerTest, NackPermanentError) {
AsyncSequencer<bool> aseq;
auto cq = MakeMockCompletionQueue(aseq);
auto handler = std::make_unique<DefaultPullAckHandler>(
cq, mock, MakeTestOptions(), subscription, "test-ack-id", 42);
cq, mock, MakeTestOptions(), subscription, "test-ack-id", 42,
clock.AsStdFunction());
EXPECT_EQ(handler->delivery_attempt(), 42);
auto status = handler->nack();
EXPECT_THAT(status.get(),
Expand All @@ -179,23 +313,46 @@ TEST(PullAckHandlerTest, NackPermanentError) {

TEST(AckHandlerTest, Subscription) {
auto subscription = pubsub::Subscription("test-project", "test-subscription");
// Start lease loop past max extension so that we don't get any modify ack
// deadline calls.
auto const current_time = std::chrono::system_clock::now();
MockClock clock;
// On the first call, when the class is created return the current time. Any
// calls after will show the current time is past the max deadline. See
// MakeTestOptions for the magic numbers.
EXPECT_CALL(clock, Call)
.WillOnce([&] { return current_time; })
.WillRepeatedly([&] { return current_time + std::chrono::seconds(301); });
auto mock = std::make_shared<MockSubscriberStub>();
EXPECT_CALL(*mock, AsyncModifyAckDeadline(_, _, _)).Times(0);
AsyncSequencer<bool> aseq;
auto cq = MakeMockCompletionQueue(aseq);
auto handler = std::make_unique<DefaultPullAckHandler>(
cq, mock, MakeTestOptions(), subscription, "test-ack-id", 42);
cq, mock, MakeTestOptions(), subscription, "test-ack-id", 42,
clock.AsStdFunction());

EXPECT_EQ(handler->subscription(), subscription);
}

TEST(AckHandlerTest, AckId) {
// Start lease loop past max extension so that we don't get any modify ack
// deadline calls.
auto const current_time = std::chrono::system_clock::now();
MockClock clock;
// On the first call, when the class is created return the current time. Any
// calls after will show the current time is past the max deadline. See
// MakeTestOptions for the magic numbers.
EXPECT_CALL(clock, Call)
.WillOnce([&] { return current_time; })
.WillRepeatedly([&] { return current_time + std::chrono::seconds(301); });
auto mock = std::make_shared<MockSubscriberStub>();
EXPECT_CALL(*mock, AsyncModifyAckDeadline(_, _, _)).Times(0);
AsyncSequencer<bool> aseq;
auto cq = MakeMockCompletionQueue(aseq);
auto handler = std::make_unique<DefaultPullAckHandler>(
cq, mock, MakeTestOptions(),
pubsub::Subscription("test-project", "test-subscription"), "test-ack-id",
42);
42, clock.AsStdFunction());

EXPECT_EQ(handler->ack_id(), "test-ack-id");
}
Expand Down

0 comments on commit bda9daa

Please sign in to comment.