Skip to content

Commit

Permalink
deflake(pubsub): mock completion queue to avoid unwanted timers (#9369)
Browse files Browse the repository at this point in the history
Instead of relying on "long enough" timeouts use a mock completion
queue to control exactly when the timer expire (never in these tests).
  • Loading branch information
coryan authored Jun 28, 2022
1 parent dab59a4 commit 91aae8c
Showing 1 changed file with 48 additions and 26 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -33,8 +33,10 @@ GOOGLE_CLOUD_CPP_INLINE_NAMESPACE_BEGIN
namespace {

using ::google::cloud::internal::AutomaticallyCreatedBackgroundThreads;
using ::google::cloud::internal::RunAsyncBase;
using ::google::cloud::testing_util::AsyncSequencer;
using ::google::cloud::testing_util::IsOk;
using ::google::cloud::testing_util::MockCompletionQueueImpl;
using ::google::cloud::testing_util::StatusIs;
using ::testing::_;
using ::testing::AtLeast;
Expand Down Expand Up @@ -107,8 +109,7 @@ class FakeStream {

std::shared_ptr<StreamingSubscriptionBatchSource> MakeTestBatchSource(
CompletionQueue cq, std::shared_ptr<SessionShutdownManager> shutdown,
std::shared_ptr<SubscriberStub> mock,
std::chrono::milliseconds max_hold_time = std::chrono::milliseconds(10)) {
std::shared_ptr<SubscriberStub> mock) {
auto subscription = pubsub::Subscription("test-project", "test-subscription");
auto opts = DefaultSubscriberOptions(pubsub_testing::MakeTestOptions(
Options{}
Expand All @@ -118,7 +119,7 @@ std::shared_ptr<StreamingSubscriptionBatchSource> MakeTestBatchSource(
return std::make_shared<StreamingSubscriptionBatchSource>(
std::move(cq), std::move(shutdown), std::move(mock),
std::move(subscription).FullName(), "test-client-id", std::move(opts),
AckBatchingConfig(1, max_hold_time));
AckBatchingConfig(1, std::chrono::milliseconds(10)));
}

TEST(StreamingSubscriptionBatchSourceTest, Start) {
Expand Down Expand Up @@ -805,11 +806,28 @@ TEST(StreamingSubscriptionBatchSourceTest, StateOStream) {
EXPECT_EQ("kFinishing", as_string(StreamState::kFinishing));
}

TEST(StreamingSubscriptionBatchSourceTest, ExactlyOnceIncludesDeadline) {
AutomaticallyCreatedBackgroundThreads background;
auto mock = std::make_shared<pubsub_testing::MockSubscriberStub>();
CompletionQueue MakeMockCompletionQueue(AsyncSequencer<bool>& aseq) {
auto mock_cq = std::make_shared<MockCompletionQueueImpl>();
EXPECT_CALL(*mock_cq, MakeRelativeTimer)
.WillRepeatedly([&](std::chrono::nanoseconds) {
return aseq.PushBack("MakeRelativeTimer").then([](auto) {
return make_status_or(std::chrono::system_clock::now());
});
});
EXPECT_CALL(*mock_cq, RunAsync)
.WillRepeatedly([&](std::unique_ptr<RunAsyncBase> f) {
aseq.PushBack("RunAsync").then([function = std::move(f)](auto) mutable {
function->exec();
});
});
return CompletionQueue(std::move(mock_cq));
}

TEST(StreamingSubscriptionBatchSourceTest, ExactlyOnceIncludesDeadline) {
AsyncSequencer<bool> aseq;
auto cq = MakeMockCompletionQueue(aseq);

auto mock = std::make_shared<pubsub_testing::MockSubscriberStub>();

{
::testing::InSequence sequence;
Expand Down Expand Up @@ -877,36 +895,35 @@ TEST(StreamingSubscriptionBatchSourceTest, ExactlyOnceIncludesDeadline) {
}

auto shutdown = std::make_shared<SessionShutdownManager>();
auto uut = MakeTestBatchSource(
background.cq(), shutdown, mock,
// Make the hold time long enough such that we can ignore the chances of a
// timer triggering in the test.
std::chrono::milliseconds(500));
auto uut = MakeTestBatchSource(cq, shutdown, mock);

auto done = shutdown->Start({});
uut->Start([](StatusOr<google::pubsub::v1::StreamingPullResponse> const&) {});
auto timer = aseq.PopFrontWithName();
EXPECT_EQ(timer.second, "MakeRelativeTimer");
aseq.PopFront().set_value(true); // Start()
aseq.PopFront().set_value(true); // Write()
aseq.PopFront().set_value(true); // Read()
auto last_read = aseq.PopFrontWithName();
EXPECT_EQ(last_read.second, "Read");
auto run = aseq.PopFrontWithName();
EXPECT_EQ(run.second, "RunAsync");

uut->ExtendLeases({"fake-006"}, std::chrono::seconds(10));
aseq.PopFront().set_value(true); // Write()

shutdown->MarkAsShutdown("test", {});
uut->Shutdown();
last_read.first.set_value(false);
aseq.PopFront().set_value(true); // Finish()
run.first.set_value(true);
aseq.PopFront().set_value(false); // Read()
aseq.PopFront().set_value(true); // Finish()

EXPECT_THAT(done.get(), IsOk());
}

TEST(StreamingSubscriptionBatchSourceTest, ExactlyOnceDeadlineStateChange) {
AutomaticallyCreatedBackgroundThreads background;
auto mock = std::make_shared<pubsub_testing::MockSubscriberStub>();

AsyncSequencer<bool> aseq;
auto cq = MakeMockCompletionQueue(aseq);

auto mock = std::make_shared<pubsub_testing::MockSubscriberStub>();

EXPECT_CALL(*mock, AsyncStreamingPull)
.WillOnce([&](google::cloud::CompletionQueue&,
Expand Down Expand Up @@ -968,8 +985,8 @@ TEST(StreamingSubscriptionBatchSourceTest, ExactlyOnceDeadlineStateChange) {
::testing::InSequence sequence;
EXPECT_CALL(*stream, Start).WillOnce(start_response);
EXPECT_CALL(*stream, Write).WillOnce(write_response);
// Two Read() calls with subscription properties, only the first should
// trigger a `Write()` call that updates the stream deadline.
// Two Read() calls with subscription properties, only the first
// should trigger a `Write()` call that updates the stream deadline.
EXPECT_CALL(*stream, Read).WillOnce(read_response_with_eos);
EXPECT_CALL(*stream, Read).WillOnce(read_response_with_eos);
EXPECT_CALL(*stream, Write).WillOnce(write_response_with_deadline);
Expand All @@ -983,21 +1000,23 @@ TEST(StreamingSubscriptionBatchSourceTest, ExactlyOnceDeadlineStateChange) {
});

auto shutdown = std::make_shared<SessionShutdownManager>();
auto uut = MakeTestBatchSource(
background.cq(), shutdown, mock,
// Make the hold time long enough such that we can ignore the chances of a
// timer triggering in the test.
std::chrono::milliseconds(500));
auto uut = MakeTestBatchSource(cq, shutdown, mock);

auto done = shutdown->Start({});
uut->Start([](StatusOr<google::pubsub::v1::StreamingPullResponse> const&) {});
auto timer = aseq.PopFrontWithName();
EXPECT_EQ(timer.second, "MakeRelativeTimer");
aseq.PopFront().set_value(true); // Start()
aseq.PopFront().set_value(true); // Write()

auto read = aseq.PopFrontWithName();
EXPECT_EQ(read.second, "Read");
read.first.set_value(true);
// The successful Read generates an immediate Read() call.

// The successful Read generates a RunAsync() which generates a Read().
auto run = aseq.PopFrontWithName();
EXPECT_EQ(run.second, "RunAsync");
run.first.set_value(true);
read = aseq.PopFrontWithName();
EXPECT_EQ(read.second, "Read");
// Because Read() changed the subscription properties, this will trigger
Expand All @@ -1009,6 +1028,9 @@ TEST(StreamingSubscriptionBatchSourceTest, ExactlyOnceDeadlineStateChange) {

// A second read, but does not change the subscription properties.
read.first.set_value(true);
run = aseq.PopFrontWithName();
EXPECT_EQ(run.second, "RunAsync");
run.first.set_value(true);
read = aseq.PopFrontWithName();
EXPECT_EQ(read.second, "Read");

Expand Down

0 comments on commit 91aae8c

Please sign in to comment.