Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(pubsub): add lease management for unary pull #13428

Merged
merged 12 commits into from
Jan 25, 2024
17 changes: 16 additions & 1 deletion google/cloud/pubsub/internal/default_pull_ack_handler.cc
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,22 @@ DefaultPullAckHandler::DefaultPullAckHandler(CompletionQueue cq,
ack_id_(std::move(ack_id)),
delivery_attempt_(delivery_attempt),
lease_manager_(
MakePullLeaseManager(cq_, stub_, subscription_, ack_id_, options)) {}
MakePullLeaseManager(cq_, stub_, subscription_, ack_id_, options)) {
initialize();
}

DefaultPullAckHandler::DefaultPullAckHandler(
CompletionQueue cq, std::weak_ptr<SubscriberStub> w,
pubsub::Subscription subscription, std::string ack_id,
std::int32_t delivery_attempt, std::shared_ptr<PullLeaseManager> manager)
: cq_(std::move(cq)),
stub_(std::move(w)),
subscription_(std::move(subscription)),
ack_id_(std::move(ack_id)),
delivery_attempt_(delivery_attempt),
lease_manager_(std::move(manager)) {
initialize();
}

DefaultPullAckHandler::~DefaultPullAckHandler() = default;

Expand Down
7 changes: 7 additions & 0 deletions google/cloud/pubsub/internal/default_pull_ack_handler.h
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,11 @@ class DefaultPullAckHandler : public pubsub::PullAckHandler::Impl {
Options const& options,
pubsub::Subscription subscription, std::string ack_id,
std::int32_t delivery_attempt);
// For testing.
DefaultPullAckHandler(CompletionQueue cq, std::weak_ptr<SubscriberStub> w,
pubsub::Subscription subscription, std::string ack_id,
std::int32_t delivery_attempt,
std::shared_ptr<PullLeaseManager> manager);
~DefaultPullAckHandler() override;

future<Status> ack() override;
Expand All @@ -55,6 +60,8 @@ class DefaultPullAckHandler : public pubsub::PullAckHandler::Impl {
pubsub::Subscription subscription() const override;

private:
void initialize() { lease_manager_->StartLeaseLoop(); }

CompletionQueue cq_;
std::weak_ptr<SubscriberStub> stub_;
pubsub::Subscription subscription_;
Expand Down
42 changes: 26 additions & 16 deletions google/cloud/pubsub/internal/default_pull_ack_handler_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,8 @@

#include "google/cloud/pubsub/internal/default_pull_ack_handler.h"
#include "google/cloud/pubsub/internal/default_pull_lease_manager.h"
#include "google/cloud/pubsub/options.h"
#include "google/cloud/pubsub/retry_policy.h"
#include "google/cloud/pubsub/testing/mock_pull_lease_manager.h"
#include "google/cloud/pubsub/testing/mock_subscriber_stub.h"
#include "google/cloud/pubsub/testing/test_retry_policies.h"
#include "google/cloud/testing_util/async_sequencer.h"
Expand All @@ -29,6 +29,7 @@ namespace pubsub_internal {
GOOGLE_CLOUD_CPP_INLINE_NAMESPACE_BEGIN
namespace {

using ::google::cloud::pubsub_testing::MockPullLeaseManager;
using ::google::cloud::pubsub_testing::MockSubscriberStub;
using ::google::cloud::testing_util::AsyncSequencer;
using ::google::cloud::testing_util::StatusIs;
Expand All @@ -45,13 +46,6 @@ using ::testing::Return;
using MockClock =
::testing::MockFunction<std::chrono::system_clock::time_point()>;

Options MakeTestOptions() {
return google::cloud::pubsub_testing::MakeTestOptions(
Options{}
.set<pubsub::MaxDeadlineTimeOption>(std::chrono::seconds(300))
.set<pubsub::MaxDeadlineExtensionOption>(std::chrono::seconds(10)));
}

CompletionQueue MakeMockCompletionQueue(AsyncSequencer<bool>& aseq) {
auto mock = std::make_shared<testing_util::MockCompletionQueueImpl>();
EXPECT_CALL(*mock, MakeRelativeTimer).WillRepeatedly([&](auto) {
Expand Down Expand Up @@ -96,7 +90,8 @@ TEST(PullAckHandlerTest, AckSimple) {
AsyncSequencer<bool> aseq;
auto cq = MakeMockCompletionQueue(aseq);
auto handler = std::make_unique<DefaultPullAckHandler>(
cq, mock, MakeTestOptions(), subscription, "test-ack-id", 42);
cq, mock, subscription, "test-ack-id", 42,
std::make_shared<MockPullLeaseManager>());
EXPECT_EQ(handler->delivery_attempt(), 42);
auto status = handler->ack();
auto timer = aseq.PopFrontWithName();
Expand All @@ -120,7 +115,8 @@ TEST(PullAckHandlerTest, AckPermanentError) {
AsyncSequencer<bool> aseq;
auto cq = MakeMockCompletionQueue(aseq);
auto handler = std::make_unique<DefaultPullAckHandler>(
cq, mock, MakeTestOptions(), subscription, "test-ack-id", 42);
cq, mock, subscription, "test-ack-id", 42,
std::make_shared<MockPullLeaseManager>());
EXPECT_EQ(handler->delivery_attempt(), 42);
auto status = handler->ack();
EXPECT_THAT(status.get(),
Expand All @@ -144,7 +140,8 @@ TEST(PullAckHandlerTest, NackSimple) {
AsyncSequencer<bool> aseq;
auto cq = MakeMockCompletionQueue(aseq);
auto handler = std::make_unique<DefaultPullAckHandler>(
cq, mock, MakeTestOptions(), subscription, "test-ack-id", 42);
cq, mock, subscription, "test-ack-id", 42,
std::make_shared<MockPullLeaseManager>());
EXPECT_EQ(handler->delivery_attempt(), 42);
auto status = handler->nack();
auto timer = aseq.PopFrontWithName();
Expand All @@ -170,20 +167,34 @@ TEST(PullAckHandlerTest, NackPermanentError) {
AsyncSequencer<bool> aseq;
auto cq = MakeMockCompletionQueue(aseq);
auto handler = std::make_unique<DefaultPullAckHandler>(
cq, mock, MakeTestOptions(), subscription, "test-ack-id", 42);
cq, mock, subscription, "test-ack-id", 42,
std::make_shared<MockPullLeaseManager>());
EXPECT_EQ(handler->delivery_attempt(), 42);
auto status = handler->nack();
EXPECT_THAT(status.get(),
StatusIs(StatusCode::kPermissionDenied, HasSubstr("uh-oh")));
}

TEST(PullAckHandlerTest, StartsLeaseManager) {
auto subscription = pubsub::Subscription("test-project", "test-subscription");

auto mock = std::make_shared<MockSubscriberStub>();
AsyncSequencer<bool> aseq;
auto cq = MakeMockCompletionQueue(aseq);
auto lm = std::make_shared<MockPullLeaseManager>();
EXPECT_CALL(*lm, StartLeaseLoop()).Times(1);
auto handler = std::make_unique<DefaultPullAckHandler>(cq, mock, subscription,
"test-ack-id", 42, lm);
}

TEST(AckHandlerTest, Subscription) {
auto subscription = pubsub::Subscription("test-project", "test-subscription");
auto mock = std::make_shared<MockSubscriberStub>();
AsyncSequencer<bool> aseq;
auto cq = MakeMockCompletionQueue(aseq);
auto handler = std::make_unique<DefaultPullAckHandler>(
cq, mock, MakeTestOptions(), subscription, "test-ack-id", 42);
cq, mock, subscription, "test-ack-id", 42,
std::make_shared<MockPullLeaseManager>());

EXPECT_EQ(handler->subscription(), subscription);
}
Expand All @@ -193,9 +204,8 @@ TEST(AckHandlerTest, AckId) {
AsyncSequencer<bool> aseq;
auto cq = MakeMockCompletionQueue(aseq);
auto handler = std::make_unique<DefaultPullAckHandler>(
cq, mock, MakeTestOptions(),
pubsub::Subscription("test-project", "test-subscription"), "test-ack-id",
42);
cq, mock, pubsub::Subscription("test-project", "test-subscription"),
"test-ack-id", 42, std::make_shared<MockPullLeaseManager>());

EXPECT_EQ(handler->ack_id(), "test-ack-id");
}
Expand Down
47 changes: 44 additions & 3 deletions google/cloud/pubsub/internal/pull_ack_handler_factory_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -68,15 +68,28 @@ TEST(PullAckHandlerTest, AckSimple) {
auto request_matcher = AllOf(
Property(&AcknowledgeRequest::ack_ids, ElementsAre("test-ack-id")),
Property(&AcknowledgeRequest::subscription, subscription.FullName()));
EXPECT_CALL(*mock, AsyncAcknowledge(_, _, request_matcher))
.WillOnce(Return(ByMove(make_ready_future(Status{}))));
EXPECT_CALL(*mock, AsyncAcknowledge(_, _, _)).WillRepeatedly([]() {
return make_ready_future(Status{});
});
EXPECT_CALL(*mock, AsyncModifyAckDeadline(_, _, _)).WillRepeatedly([]() {
return make_ready_future(Status{});
});
AsyncSequencer<bool> aseq;
auto cq = MakeMockCompletionQueue(aseq);
auto handler =
MakePullAckHandler(std::move(cq), std::move(mock), subscription,
"test-ack-id", 42, MakeTestOptions());

auto pending = aseq.PopFrontWithName();
EXPECT_EQ(pending.second, "MakeRelativeTimer");
pending.first.set_value(true);
pending = aseq.PopFrontWithName();
EXPECT_EQ(pending.second, "MakeRelativeTimer");
EXPECT_THAT(std::move(handler).ack().get(), StatusIs(StatusCode::kOk));

// Terminate the loop. With exceptions disabled abandoning a future with a
// continuation results in a crash. In non-test programs, the completion queue
// does this automatically as part of its shutdown.
pending.first.set_value(false);
}

#ifdef GOOGLE_CLOUD_CPP_HAVE_OPENTELEMETRY
Expand All @@ -100,14 +113,28 @@ TEST(PullAckHandlerTest, TracingEnabled) {
Property(&AcknowledgeRequest::subscription, subscription.FullName()));
EXPECT_CALL(*mock, AsyncAcknowledge(_, _, request_matcher))
.WillOnce(Return(ByMove(make_ready_future(Status{}))));
// Since the lease manager is started in the constructor of the ack handler,
// we need to match the lease manager calls.
EXPECT_CALL(*mock, AsyncModifyAckDeadline(_, _, _)).WillRepeatedly([]() {
return make_ready_future(Status{});
});
AsyncSequencer<bool> aseq;
auto cq = MakeMockCompletionQueue(aseq);
auto handler =
MakePullAckHandler(std::move(cq), std::move(mock), subscription,
"test-ack-id", 42, EnableTracing(MakeTestOptions()));
auto pending = aseq.PopFrontWithName();
EXPECT_EQ(pending.second, "MakeRelativeTimer");
pending.first.set_value(true);
pending = aseq.PopFrontWithName();
EXPECT_EQ(pending.second, "MakeRelativeTimer");

EXPECT_THAT(std::move(handler).ack().get(), StatusIs(StatusCode::kOk));

// Terminate the loop. With exceptions disabled abandoning a future with a
// continuation results in a crash. In non-test programs, the completion queue
// does this automatically as part of its shutdown.
pending.first.set_value(false);
auto spans = span_catcher->GetSpans();
EXPECT_THAT(spans, Contains(AllOf(
SpanHasInstrumentationScope(), SpanKindIsClient(),
Expand All @@ -124,13 +151,27 @@ TEST(PullAckHandlerTest, TracingDisabled) {
Property(&AcknowledgeRequest::subscription, subscription.FullName()));
EXPECT_CALL(*mock, AsyncAcknowledge(_, _, request_matcher))
.WillOnce(Return(ByMove(make_ready_future(Status{}))));
// Since the lease manager is started in the constructor of the ack handler,
// we need to match the lease manager calls.
EXPECT_CALL(*mock, AsyncModifyAckDeadline(_, _, _)).WillRepeatedly([]() {
return make_ready_future(Status{});
});
AsyncSequencer<bool> aseq;
auto cq = MakeMockCompletionQueue(aseq);
auto handler =
MakePullAckHandler(std::move(cq), std::move(mock), subscription,
"test-ack-id", 42, DisableTracing(MakeTestOptions()));
auto pending = aseq.PopFrontWithName();
EXPECT_EQ(pending.second, "MakeRelativeTimer");
pending.first.set_value(true);
pending = aseq.PopFrontWithName();
EXPECT_EQ(pending.second, "MakeRelativeTimer");

EXPECT_THAT(std::move(handler).ack().get(), StatusIs(StatusCode::kOk));
// Terminate the loop. With exceptions disabled abandoning a future with a
// continuation results in a crash. In non-test programs, the completion queue
// does this automatically as part of its shutdown.
pending.first.set_value(false);

EXPECT_THAT(span_catcher->GetSpans(), IsEmpty());
}
Expand Down
Loading