diff --git a/google/cloud/pubsub/internal/batching_publisher_connection.cc b/google/cloud/pubsub/internal/batching_publisher_connection.cc index 2d6ee84d23ea3..63b9ee0cbce04 100644 --- a/google/cloud/pubsub/internal/batching_publisher_connection.cc +++ b/google/cloud/pubsub/internal/batching_publisher_connection.cc @@ -23,7 +23,6 @@ GOOGLE_CLOUD_CPP_INLINE_NAMESPACE_BEGIN struct Batch { std::vector>> waiters; std::weak_ptr weak; - std::shared_ptr batch; void operator()(future> f) { auto response = f.get(); @@ -42,7 +41,6 @@ struct Batch { for (auto& w : waiters) { w.set_value(std::move(*response->mutable_message_ids(idx++))); } - batch->FlushCallback(); } void SatisfyAllWaiters(Status const& status) { @@ -200,9 +198,10 @@ void BatchingPublisherConnection::FlushImpl(std::unique_lock lk) { batch.weak = shared_from_this(); request.set_topic(topic_full_name_); - batch_->Flush(); - batch.batch = batch_; - sink_->AsyncPublish(std::move(request)).then(std::move(batch)); + auto handler = batch_->Flush(); + sink_->AsyncPublish(std::move(request)) + .then(std::move(batch)) + .then(std::move(handler)); } GOOGLE_CLOUD_CPP_INLINE_NAMESPACE_END diff --git a/google/cloud/pubsub/internal/batching_publisher_connection_test.cc b/google/cloud/pubsub/internal/batching_publisher_connection_test.cc index 9df00eb5fbb0f..eee5ec587da83 100644 --- a/google/cloud/pubsub/internal/batching_publisher_connection_test.cc +++ b/google/cloud/pubsub/internal/batching_publisher_connection_test.cc @@ -66,6 +66,7 @@ TEST(BatchingPublisherConnectionTest, FastDestructor) { auto mock = std::make_shared(); auto mock_batch = std::make_shared>(); + EXPECT_CALL(*mock_batch, Flush).WillRepeatedly([] { return [](auto) {}; }); pubsub::Topic const topic("test-project", "test-topic"); AsyncSequencer async; @@ -102,6 +103,7 @@ TEST(BatchingPublisherConnectionTest, DefaultMakesProgress) { auto mock = std::make_shared(); auto mock_batch = std::make_shared>(); + EXPECT_CALL(*mock_batch, Flush).WillRepeatedly([] { return [](auto) {}; }); pubsub::Topic const topic("test-project", "test-topic"); AsyncSequencer async; @@ -171,6 +173,7 @@ TEST(BatchingPublisherConnectionTest, BatchByMessageCount) { auto mock = std::make_shared(); auto mock_batch = std::make_shared>(); + EXPECT_CALL(*mock_batch, Flush).WillRepeatedly([] { return [](auto) {}; }); pubsub::Topic const topic("test-project", "test-topic"); EXPECT_CALL(*mock, AsyncPublish) @@ -225,6 +228,7 @@ TEST(BatchingPublisherConnectionTest, BatchByMessageSize) { auto mock = std::make_shared(); auto mock_batch = std::make_shared>(); + EXPECT_CALL(*mock_batch, Flush).WillRepeatedly([] { return [](auto) {}; }); pubsub::Topic const topic("test-project", "test-topic"); EXPECT_CALL(*mock, AsyncPublish) @@ -275,6 +279,7 @@ TEST(BatchingPublisherConnectionTest, BatchByMessageSize) { TEST(BatchingPublisherConnectionTest, BatchByMessageSizeLargeMessageBreak) { auto mock_batch = std::make_shared>(); + EXPECT_CALL(*mock_batch, Flush).WillRepeatedly([] { return [](auto) {}; }); pubsub::Topic const topic("test-project", "test-topic"); auto constexpr kSinglePayload = 128; @@ -345,6 +350,7 @@ TEST(BatchingPublisherConnectionTest, BatchByMessageSizeLargeMessageBreak) { TEST(BatchingPublisherConnectionTest, BatchByMessageSizeOversizedSingleton) { auto mock_batch = std::make_shared>(); + EXPECT_CALL(*mock_batch, Flush).WillRepeatedly([] { return [](auto) {}; }); pubsub::Topic const topic("test-project", "test-topic"); auto constexpr kSinglePayload = 128; @@ -429,6 +435,7 @@ TEST(BatchingPublisherConnectionTest, BatchByMessageSizeOversizedSingleton) { TEST(BatchingPublisherConnectionTest, BatchTorture) { auto mock_batch = std::make_shared>(); + EXPECT_CALL(*mock_batch, Flush).WillRepeatedly([] { return [](auto) {}; }); pubsub::Topic const topic("test-project", "test-topic"); auto constexpr kMaxMessages = 20; @@ -495,6 +502,7 @@ TEST(BatchingPublisherConnectionTest, BatchByMaximumHoldTime) { auto mock = std::make_shared(); auto mock_batch = std::make_shared>(); + EXPECT_CALL(*mock_batch, Flush).WillRepeatedly([] { return [](auto) {}; }); pubsub::Topic const topic("test-project", "test-topic"); EXPECT_CALL(*mock, AsyncPublish) @@ -551,6 +559,7 @@ TEST(BatchingPublisherConnectionTest, BatchByFlush) { auto mock = std::make_shared(); auto mock_batch = std::make_shared>(); + EXPECT_CALL(*mock_batch, Flush).WillRepeatedly([] { return [](auto) {}; }); pubsub::Topic const topic("test-project", "test-topic"); EXPECT_CALL(*mock, AsyncPublish) @@ -630,6 +639,7 @@ TEST(BatchingPublisherConnectionTest, HandleError) { auto mock = std::make_shared(); auto mock_batch = std::make_shared>(); + EXPECT_CALL(*mock_batch, Flush).WillRepeatedly([] { return [](auto) {}; }); pubsub::Topic const topic("test-project", "test-topic"); auto const error_status = Status(StatusCode::kPermissionDenied, "uh-oh"); @@ -660,6 +670,7 @@ TEST(BatchingPublisherConnectionTest, HandleInvalidResponse) { auto mock = std::make_shared(); auto mock_batch = std::make_shared>(); + EXPECT_CALL(*mock_batch, Flush).WillRepeatedly([] { return [](auto) {}; }); pubsub::Topic const topic("test-project", "test-topic"); EXPECT_CALL(*mock, AsyncPublish) @@ -688,6 +699,7 @@ TEST(BatchingPublisherConnectionTest, HandleErrorWithOrderingPartialBatch) { auto mock = std::make_shared(); auto mock_batch = std::make_shared>(); + EXPECT_CALL(*mock_batch, Flush).WillRepeatedly([] { return [](auto) {}; }); pubsub::Topic const topic("test-project", "test-topic"); auto const error_status = Status(StatusCode::kPermissionDenied, "uh-oh"); @@ -738,6 +750,7 @@ TEST(BatchingPublisherConnectionTest, HandleErrorWithOrderingResume) { auto mock = std::make_shared(); auto mock_batch = std::make_shared>(); + EXPECT_CALL(*mock_batch, Flush).WillRepeatedly([] { return [](auto) {}; }); pubsub::Topic const topic("test-project", "test-topic"); auto const ordering_key = std::string{"test-key"}; diff --git a/google/cloud/pubsub/internal/message_batch.h b/google/cloud/pubsub/internal/message_batch.h index 998e7e87334b6..3ce7531f337e0 100644 --- a/google/cloud/pubsub/internal/message_batch.h +++ b/google/cloud/pubsub/internal/message_batch.h @@ -16,7 +16,9 @@ #define GOOGLE_CLOUD_CPP_GOOGLE_CLOUD_PUBSUB_INTERNAL_MESSAGE_BATCH_H #include "google/cloud/pubsub/message.h" +#include "google/cloud/future.h" #include "google/cloud/options.h" +#include namespace google { namespace cloud { @@ -40,12 +42,9 @@ class MessageBatch { virtual void SaveMessage(pubsub::Message m) = 0; // Captures information about a batch of messages before it's flushed. Invoked - // in `BatchingPublisherConnection::FlushImpl(...)`. - virtual void Flush() = 0; - - // Captures information about the response after we receive it from - // the server. Invoked in the `BatchSink::AsyncPublish(...)` callback. - virtual void FlushCallback() = 0; + // in `BatchingPublisherConnection::FlushImpl(...)`. Returns a task to invoke + // in another callback. + virtual std::function)> Flush() = 0; }; GOOGLE_CLOUD_CPP_INLINE_NAMESPACE_END diff --git a/google/cloud/pubsub/internal/noop_message_batch.h b/google/cloud/pubsub/internal/noop_message_batch.h index 2411fb4fea7ef..424544ef54431 100644 --- a/google/cloud/pubsub/internal/noop_message_batch.h +++ b/google/cloud/pubsub/internal/noop_message_batch.h @@ -16,7 +16,8 @@ #define GOOGLE_CLOUD_CPP_GOOGLE_CLOUD_PUBSUB_INTERNAL_NOOP_MESSAGE_BATCH_H #include "google/cloud/pubsub/internal/message_batch.h" - +#include "google/cloud/future.h" +#include namespace google { namespace cloud { namespace pubsub_internal { @@ -33,9 +34,9 @@ class NoOpMessageBatch : public MessageBatch { void SaveMessage(pubsub::Message) override{}; - void Flush() override{}; - - void FlushCallback() override{}; + std::function)> Flush() override { + return [](auto) {}; + } }; GOOGLE_CLOUD_CPP_INLINE_NAMESPACE_END diff --git a/google/cloud/pubsub/internal/tracing_message_batch.cc b/google/cloud/pubsub/internal/tracing_message_batch.cc index 5ba2a3eaf7e39..e27ebba83c84e 100644 --- a/google/cloud/pubsub/internal/tracing_message_batch.cc +++ b/google/cloud/pubsub/internal/tracing_message_batch.cc @@ -35,7 +35,7 @@ void TracingMessageBatch::SaveMessage(pubsub::Message m) { opentelemetry::context::RuntimeContext::GetCurrent()); active_span->AddEvent("gl-cpp.added_to_batch"); { - std::lock_guard lk(message_mu_); + std::lock_guard lk(mu_); message_spans_.push_back(std::move(active_span)); } child_->SaveMessage(std::move(m)); @@ -43,36 +43,36 @@ void TracingMessageBatch::SaveMessage(pubsub::Message m) { namespace { -opentelemetry::nostd::shared_ptr MakeParentSpan( - std::vector> - message_spans) { - using opentelemetry::trace::SpanContext; - using AttributesList = - std::vector>; - auto constexpr kMaxOtelLinks = 128; - std::vector> links; - auto batch_size = message_spans.size(); - links.reserve(batch_size); - // If the batch size is less than the max size, add the links to a single - // span. - if (batch_size < kMaxOtelLinks) { - std::transform( - message_spans.begin(), message_spans.end(), std::back_inserter(links), - [i = static_cast(0)](auto const& span) mutable { - return std::make_pair( - span->GetContext(), - AttributesList{{"messaging.pubsub.message.link", i++}}); - }); - } - auto batch_sink_parent_span = +using Spans = + std::vector>; +using Attributes = + std::vector>; +using Links = + std::vector>; + +/// Creates a link for each span in the range @p begin to @p end. +auto MakeLinks(Spans::const_iterator begin, Spans::const_iterator end) { + Links links; + std::transform(begin, end, std::back_inserter(links), + [i = static_cast(0)](auto const& span) mutable { + return std::make_pair( + span->GetContext(), + Attributes{{"messaging.pubsub.message.link", i++}}); + }); + return links; +} + +auto MakeParent(Links const& links, Spans const& message_spans) { + auto batch_sink_parent = internal::MakeSpan("BatchSink::AsyncPublish", /*attributes=*/ {{"messaging.pubsub.num_messages_in_batch", - static_cast(batch_size)}}, - /*links*/ links); + static_cast(message_spans.size())}}, + /*links*/ std::move(links)); - auto context = batch_sink_parent_span->GetContext(); + // Add metadata to the message spans about the batch sink span. + auto context = batch_sink_parent->GetContext(); auto trace_id = internal::ToString(context.trace_id()); auto span_id = internal::ToString(context.span_id()); for (auto const& message_span : message_spans) { @@ -80,41 +80,71 @@ opentelemetry::nostd::shared_ptr MakeParentSpan( message_span->SetAttribute("pubsub.batch_sink.trace_id", trace_id); message_span->SetAttribute("pubsub.batch_sink.span_id", span_id); } + return batch_sink_parent; +} + +auto MakeChild( + opentelemetry::nostd::shared_ptr const& parent, + int count, Links const& links) { + opentelemetry::trace::StartSpanOptions options; + options.parent = parent->GetContext(); + return internal::MakeSpan( + "BatchSink::AsyncPublish - Batch #" + std::to_string(count), + /*attributes=*/{{}}, + /*links=*/links, options); +} + +Spans MakeBatchSinkSpans(Spans message_spans) { + int constexpr kMaxOtelLinks = 128; + Spans batch_sink_spans; + // If the batch size is less than the max size, add the links to a single + // span. If the batch size is greater than the max size, create a parent + // span with no links and each child spans will contain links. + if (message_spans.size() <= kMaxOtelLinks) { + batch_sink_spans.push_back(MakeParent( + MakeLinks(message_spans.begin(), message_spans.end()), message_spans)); + return batch_sink_spans; + } + batch_sink_spans.push_back(MakeParent({{}}, message_spans)); + auto batch_sink_parent = batch_sink_spans.front(); + + auto cut = [&message_spans](auto i) { + auto const batch_size = static_cast(kMaxOtelLinks); + return std::next( + i, std::min(batch_size, std::distance(i, message_spans.end()))); + }; + int count = 0; + for (auto i = message_spans.begin(); i != message_spans.end(); i = cut(i)) { + // Generates child spans with links between [i, min(i + batch_size, end)) + // such that each child span will have exactly batch_size elements or less. + batch_sink_spans.push_back( + MakeChild(batch_sink_parent, count++, MakeLinks(i, cut(i)))); + } - return batch_sink_parent_span; + return batch_sink_spans; } } // namespace -void TracingMessageBatch::Flush() { +std::function)> TracingMessageBatch::Flush() { decltype(message_spans_) message_spans; { - std::lock_guard lk(message_mu_); + std::lock_guard lk(mu_); message_spans.swap(message_spans_); } - // TODO(#12528): Handle batches larger than 128. - auto batch_sink_parent_span = MakeParentSpan(std::move(message_spans)); + auto batch_sink_spans = MakeBatchSinkSpans(std::move(message_spans)); - // Set the batch sink as the active span. + // Set the batch sink parent span as the active span. This will always be the + // first span in the vector. auto async_scope = internal::GetTracer(internal::CurrentOptions()) - ->WithActiveSpan(batch_sink_parent_span); - { - std::lock_guard lk(batch_sink_mu_); - batch_sink_spans_.push_back(std::move(batch_sink_parent_span)); - } - - child_->Flush(); -} + ->WithActiveSpan(batch_sink_spans.front()); -void TracingMessageBatch::FlushCallback() { - decltype(batch_sink_spans_) spans; - { - std::lock_guard lk(batch_sink_mu_); - spans.swap(batch_sink_spans_); - } - for (auto& span : spans) internal::EndSpan(*span); - child_->FlushCallback(); + return [next = child_->Flush(), + spans = std::move(batch_sink_spans)](auto f) mutable { + for (auto& span : spans) internal::EndSpan(*span); + next(std::move(f)); + }; } std::vector> @@ -122,11 +152,6 @@ TracingMessageBatch::GetMessageSpans() const { return message_spans_; } -std::vector> -TracingMessageBatch::GetBatchSinkSpans() const { - return batch_sink_spans_; -} - GOOGLE_CLOUD_CPP_INLINE_NAMESPACE_END } // namespace pubsub_internal } // namespace cloud diff --git a/google/cloud/pubsub/internal/tracing_message_batch.h b/google/cloud/pubsub/internal/tracing_message_batch.h index 67e91beb0b5d4..9a5fdeb0f2d55 100644 --- a/google/cloud/pubsub/internal/tracing_message_batch.h +++ b/google/cloud/pubsub/internal/tracing_message_batch.h @@ -20,7 +20,9 @@ #include "google/cloud/pubsub/internal/message_batch.h" #include "google/cloud/pubsub/internal/publisher_stub.h" #include "google/cloud/pubsub/version.h" +#include "google/cloud/future.h" #include "google/cloud/internal/opentelemetry.h" +#include #include namespace google { @@ -44,40 +46,22 @@ class TracingMessageBatch : public MessageBatch { std::vector> message_spans) : child_(std::move(child)), message_spans_(std::move(message_spans)) {} - // For testing only. - TracingMessageBatch( - std::unique_ptr child, - std::vector> - message_spans, - std::vector> - batch_sink_spans) - : child_(std::move(child)), - message_spans_(std::move(message_spans)), - batch_sink_spans_(std::move(batch_sink_spans)) {} + ~TracingMessageBatch() override = default; void SaveMessage(pubsub::Message m) override; - void Flush() override; - - void FlushCallback() override; + std::function)> Flush() override; // For testing only. std::vector> GetMessageSpans() const; - // For testing only. - std::vector> - GetBatchSinkSpans() const; - private: std::unique_ptr child_; - std::mutex message_mu_; - std::mutex batch_sink_mu_; - std::vector> - message_spans_; // ABSL_GUARDED_BY(message_mu_) + std::mutex mu_; std::vector> - batch_sink_spans_; // ABSL_GUARDED_BY(batch_sink_mu_) + message_spans_; // ABSL_GUARDED_BY(mu_) }; GOOGLE_CLOUD_CPP_INLINE_NAMESPACE_END diff --git a/google/cloud/pubsub/internal/tracing_message_batch_test.cc b/google/cloud/pubsub/internal/tracing_message_batch_test.cc index 8b90ad73ea544..3aa6fe422b06a 100644 --- a/google/cloud/pubsub/internal/tracing_message_batch_test.cc +++ b/google/cloud/pubsub/internal/tracing_message_batch_test.cc @@ -37,15 +37,14 @@ using ::google::cloud::testing_util::SpanHasEvents; using ::google::cloud::testing_util::SpanHasInstrumentationScope; using ::google::cloud::testing_util::SpanKindIsClient; using ::google::cloud::testing_util::SpanLinkAttributesAre; +using ::google::cloud::testing_util::SpanLinksSizeIs; using ::google::cloud::testing_util::SpanNamed; -using ::google::cloud::testing_util::SpanWithStatus; using ::google::cloud::testing_util::ThereIsAnActiveSpan; using ::testing::_; using ::testing::AllOf; using ::testing::Contains; using ::testing::ElementsAre; using ::testing::IsEmpty; -using ::testing::SizeIs; namespace { @@ -56,6 +55,16 @@ void EndSpans(std::vector> + spans(n); + std::generate(spans.begin(), spans.end(), [i = 0]() mutable { + return MakeSpan("test span " + std::to_string(i++)); + }); + return spans; +} + TEST(TracingMessageBatch, SaveMessage) { auto span = MakeSpan("test span"); opentelemetry::trace::Scope scope(span); @@ -118,18 +127,16 @@ TEST(TracingMessageBatch, Flush) { auto mock = std::make_unique(); EXPECT_CALL(*mock, Flush).WillOnce([] { EXPECT_TRUE(ThereIsAnActiveSpan()); + return [](auto) {}; }); auto initial_spans = {message_span}; auto message_batch = std::make_unique(std::move(mock), initial_spans); - message_batch->Flush(); - - // The span must end before it can be processed by the span catcher. - EndSpans(message_batch->GetBatchSinkSpans()); + auto end_spans = message_batch->Flush(); + end_spans(make_ready_future()); EXPECT_THAT(message_batch->GetMessageSpans(), IsEmpty()); - EXPECT_THAT(message_batch->GetBatchSinkSpans(), SizeIs(1)); auto spans = span_catcher->GetSpans(); EXPECT_THAT(spans, @@ -150,18 +157,16 @@ TEST(TracingMessageBatch, FlushSmallBatch) { auto mock = std::make_unique(); EXPECT_CALL(*mock, Flush).WillOnce([] { EXPECT_TRUE(ThereIsAnActiveSpan()); + return [](auto) {}; }); auto initial_spans = {message_span1, message_span2}; auto message_batch = std::make_unique(std::move(mock), initial_spans); - message_batch->Flush(); - - // The span must end before it can be processed by the span catcher. - EndSpans(message_batch->GetBatchSinkSpans()); + auto end_spans = message_batch->Flush(); + end_spans(make_ready_future()); EXPECT_THAT(message_batch->GetMessageSpans(), IsEmpty()); - EXPECT_THAT(message_batch->GetBatchSinkSpans(), SizeIs(1)); auto spans = span_catcher->GetSpans(); EXPECT_THAT( @@ -179,36 +184,61 @@ TEST(TracingMessageBatch, FlushSmallBatch) { "messaging.pubsub.message.link", 1))))))); } -// TODO(#12528): Update test when impl is added -TEST(TracingMessageBatch, FlushLargeBatch) { - std::vector> - initial_spans; - initial_spans.reserve(129); - for (int i = 0; i < 129; ++i) { - initial_spans.emplace_back(MakeSpan("test span " + std::to_string(i))); - } +TEST(TracingMessageBatch, FlushBatchWithOtelLimit) { + auto constexpr kBatchSize = 128; + auto initial_spans = CreateSpans(kBatchSize); auto span_catcher = InstallSpanCatcher(); auto mock = std::make_unique(); EXPECT_CALL(*mock, Flush).WillOnce([] { EXPECT_TRUE(ThereIsAnActiveSpan()); + return [](auto) {}; }); auto message_batch = std::make_unique(std::move(mock), initial_spans); - message_batch->Flush(); + auto end_spans = message_batch->Flush(); + end_spans(make_ready_future()); - // The span must end before it can be processed by the span catcher. - EndSpans(message_batch->GetBatchSinkSpans()); + EXPECT_THAT(message_batch->GetMessageSpans(), IsEmpty()); + + auto spans = span_catcher->GetSpans(); + EXPECT_THAT( + spans, Contains(AllOf(SpanHasInstrumentationScope(), SpanKindIsClient(), + SpanNamed("BatchSink::AsyncPublish"), + SpanHasAttributes(OTelAttribute( + "messaging.pubsub.num_messages_in_batch", 128)), + SpanLinksSizeIs(128)))); +} + +TEST(TracingMessageBatch, FlushLargeBatch) { + auto constexpr kBatchSize = 129; + auto initial_spans = CreateSpans(kBatchSize); + auto span_catcher = InstallSpanCatcher(); + auto mock = std::make_unique(); + EXPECT_CALL(*mock, Flush).WillOnce([] { + EXPECT_TRUE(ThereIsAnActiveSpan()); + return [](auto) {}; + }); + auto message_batch = + std::make_unique(std::move(mock), initial_spans); + + auto end_spans = message_batch->Flush(); + end_spans(make_ready_future()); EXPECT_THAT(message_batch->GetMessageSpans(), IsEmpty()); - EXPECT_THAT(message_batch->GetBatchSinkSpans(), SizeIs(1)); auto spans = span_catcher->GetSpans(); - EXPECT_THAT(spans, Contains(AllOf( - SpanHasInstrumentationScope(), SpanKindIsClient(), - SpanNamed("BatchSink::AsyncPublish"), - SpanHasAttributes(OTelAttribute( - "messaging.pubsub.num_messages_in_batch", 129))))); + EXPECT_THAT(spans, + Contains(AllOf( + SpanNamed("BatchSink::AsyncPublish"), + SpanHasAttributes(OTelAttribute( + "messaging.pubsub.num_messages_in_batch", kBatchSize))))); + EXPECT_THAT(spans, + Contains(AllOf(SpanNamed("BatchSink::AsyncPublish - Batch #0"), + SpanLinksSizeIs(128)))); + EXPECT_THAT(spans, + Contains(AllOf(SpanNamed("BatchSink::AsyncPublish - Batch #1"), + SpanLinksSizeIs(1)))); } TEST(TracingMessageBatch, FlushAddsSpanIdAndTraceIdAttribute) { @@ -216,16 +246,16 @@ TEST(TracingMessageBatch, FlushAddsSpanIdAndTraceIdAttribute) { auto span_catcher = InstallSpanCatcher(); auto message_span = MakeSpan("test message span"); auto mock = std::make_unique(); - EXPECT_CALL(*mock, Flush); + EXPECT_CALL(*mock, Flush).WillOnce([] { return [](auto) {}; }); auto initial_spans = {message_span}; auto message_batch = std::make_unique(std::move(mock), initial_spans); - message_batch->Flush(); + auto end_spans = message_batch->Flush(); + end_spans(make_ready_future()); // The span must end before it can be processed by the span catcher. EndSpans(initial_spans); - EndSpans(message_batch->GetBatchSinkSpans()); EXPECT_THAT( span_catcher->GetSpans(), @@ -241,12 +271,13 @@ TEST(TracingMessageBatch, FlushSpanMetadataAddsEvent) { auto span_catcher = InstallSpanCatcher(); auto message_span = MakeSpan("test message span"); auto mock = std::make_unique(); - EXPECT_CALL(*mock, Flush); + EXPECT_CALL(*mock, Flush).WillOnce([] { return [](auto) {}; }); auto initial_spans = {message_span}; auto message_batch = std::make_unique(std::move(mock), initial_spans); - message_batch->Flush(); + auto end_spans = message_batch->Flush(); + end_spans(make_ready_future()); // The span must end before it can be processed by the span catcher. EndSpans(initial_spans); @@ -263,83 +294,24 @@ TEST(TracingMessageBatch, FlushAddsEventForMultipleMessages) { auto span1 = MakeSpan("test message span1"); auto span2 = MakeSpan("test message span2"); auto mock = std::make_unique(); - EXPECT_CALL(*mock, Flush); + EXPECT_CALL(*mock, Flush).WillOnce([] { return [](auto) {}; }); auto initial_spans = {span1, span2}; auto message_batch = std::make_unique(std::move(mock), initial_spans); - message_batch->Flush(); + auto end_spans = message_batch->Flush(); + end_spans(make_ready_future()); // The span must end before it can be processed by the span catcher. EndSpans(initial_spans); - EXPECT_THAT( - span_catcher->GetSpans(), - ElementsAre(AllOf(SpanNamed("test message span1"), - SpanHasEvents(EventNamed("gl-cpp.batch_flushed"))), - AllOf(SpanNamed("test message span2"), - SpanHasEvents(EventNamed("gl-cpp.batch_flushed"))))); -} - -TEST(TracingMessageBatch, FlushCallback) { - auto span_catcher = InstallSpanCatcher(); - auto span = MakeSpan("test batch sink span"); - opentelemetry::trace::Scope scope(span); - auto mock = std::make_unique(); - EXPECT_CALL(*mock, FlushCallback); - std::vector> - message_spans; - std::vector> - batch_sink_spans; - batch_sink_spans.emplace_back(span); - auto message_batch = std::make_unique( - std::move(mock), message_spans, batch_sink_spans); - - message_batch->FlushCallback(); - - // Verifies that the spans were ended. If the span was not ended, `GetSpans` - // would be empty. auto spans = span_catcher->GetSpans(); - EXPECT_THAT(spans, SizeIs(1)); - EXPECT_THAT( - spans, - Contains(AllOf(SpanNamed("test batch sink span"), - SpanHasInstrumentationScope(), SpanKindIsClient(), - SpanWithStatus(opentelemetry::trace::StatusCode::kOk)))); - EXPECT_THAT(message_batch->GetBatchSinkSpans(), IsEmpty()); -} - -TEST(TracingMessageBatch, FlushCallbackWithMultipleMessages) { - auto span_catcher = InstallSpanCatcher(); - auto span1 = MakeSpan("test batch sink span 1"); - auto span2 = MakeSpan("test batch sink span 2"); - auto mock = std::make_unique(); - EXPECT_CALL(*mock, FlushCallback); - std::vector> - message_spans; - std::vector> - batch_sink_spans; - batch_sink_spans.reserve(2); - batch_sink_spans.emplace_back(span1); - batch_sink_spans.emplace_back(span2); - auto message_batch = std::make_unique( - std::move(mock), message_spans, batch_sink_spans); - - message_batch->FlushCallback(); - - // Verifies that the spans were ended. If the span was not ended, `GetSpans` - // would be empty. - auto spans = span_catcher->GetSpans(); - EXPECT_THAT(spans, SizeIs(2)); - EXPECT_THAT( - spans, ElementsAre( - AllOf(SpanNamed("test batch sink span 1"), - SpanHasInstrumentationScope(), SpanKindIsClient(), - SpanWithStatus(opentelemetry::trace::StatusCode::kOk)), - AllOf(SpanNamed("test batch sink span 2"), - SpanHasInstrumentationScope(), SpanKindIsClient(), - SpanWithStatus(opentelemetry::trace::StatusCode::kOk)))); - EXPECT_THAT(message_batch->GetBatchSinkSpans(), IsEmpty()); + EXPECT_THAT(spans, Contains(AllOf( + SpanNamed("test message span1"), + SpanHasEvents(EventNamed("gl-cpp.batch_flushed"))))); + EXPECT_THAT(spans, Contains(AllOf( + SpanNamed("test message span2"), + SpanHasEvents(EventNamed("gl-cpp.batch_flushed"))))); } // TODO(#12528): Add an end to end test. diff --git a/google/cloud/pubsub/testing/mock_message_batch.h b/google/cloud/pubsub/testing/mock_message_batch.h index 22838cd51e168..eb55ff36d4cd5 100644 --- a/google/cloud/pubsub/testing/mock_message_batch.h +++ b/google/cloud/pubsub/testing/mock_message_batch.h @@ -17,8 +17,10 @@ #include "google/cloud/pubsub/internal/message_batch.h" #include "google/cloud/pubsub/message.h" +#include "google/cloud/future.h" #include "google/cloud/version.h" #include +#include namespace google { namespace cloud { @@ -33,8 +35,7 @@ class MockMessageBatch : public pubsub_internal::MessageBatch { ~MockMessageBatch() override = default; MOCK_METHOD(void, SaveMessage, (pubsub::Message), (override)); - MOCK_METHOD(void, Flush, (), (override)); - MOCK_METHOD(void, FlushCallback, (), (override)); + MOCK_METHOD(std::function)>, Flush, (), (override)); }; GOOGLE_CLOUD_CPP_INLINE_NAMESPACE_END diff --git a/google/cloud/testing_util/opentelemetry_matchers.h b/google/cloud/testing_util/opentelemetry_matchers.h index f577253ae3329..c641cf685ed4d 100644 --- a/google/cloud/testing_util/opentelemetry_matchers.h +++ b/google/cloud/testing_util/opentelemetry_matchers.h @@ -245,6 +245,13 @@ ::testing::Matcher SpanLinksAre(Args const&... matchers) { return SpanLinksAreImpl(::testing::ElementsAre(matchers...)); } +MATCHER_P(SpanLinksSizeIs, span_links, + "has size: " + std::to_string(span_links)) { + auto const& actual = static_cast(arg->GetLinks().size()); + *result_listener << "has size: " + std::to_string(actual); + return actual == span_links; +} + class SpanCatcher { public: SpanCatcher();