From 30f19db3520926d4d7ceb748a0a6bcdeb4baebb2 Mon Sep 17 00:00:00 2001 From: Anna Levenberg Date: Mon, 16 Oct 2023 14:21:49 -0400 Subject: [PATCH 01/22] impl(pubsub): add more than 128 links --- .../pubsub/internal/tracing_message_batch.cc | 88 +++++++++++++++---- .../internal/tracing_message_batch_test.cc | 60 +++++++++++-- .../testing_util/opentelemetry_matchers.h | 7 ++ 3 files changed, 129 insertions(+), 26 deletions(-) diff --git a/google/cloud/pubsub/internal/tracing_message_batch.cc b/google/cloud/pubsub/internal/tracing_message_batch.cc index 5ba2a3eaf7e39..4b6cf0487dbdd 100644 --- a/google/cloud/pubsub/internal/tracing_message_batch.cc +++ b/google/cloud/pubsub/internal/tracing_message_batch.cc @@ -43,7 +43,35 @@ void TracingMessageBatch::SaveMessage(pubsub::Message m) { namespace { -opentelemetry::nostd::shared_ptr MakeParentSpan( +/// Inserts a link for each span in @p links between the @p begin and @p end +/// iterators. +template +void GenerateLinks( + Iterator begin, Iterator end, + std::vector>>>& + links) { + static_assert( + std::is_same< + absl::decay_t, + opentelemetry::nostd::shared_ptr>::value, + "Iterator is not the right type."); + using opentelemetry::trace::SpanContext; + using AttributesList = + std::vector>; + std::transform(begin, end, std::back_inserter(links), + [i = static_cast(0)](auto const& span) mutable { + return std::make_pair( + span->GetContext(), + AttributesList{{"messaging.pubsub.message.link", i++}}); + }); +} + +std::vector> +MakeBatchSinkSpans( std::vector> message_spans) { using opentelemetry::trace::SpanContext; @@ -52,19 +80,16 @@ opentelemetry::nostd::shared_ptr MakeParentSpan( opentelemetry::common::AttributeValue>>; auto constexpr kMaxOtelLinks = 128; std::vector> links; - auto batch_size = message_spans.size(); + auto batch_size = static_cast(message_spans.size()); links.reserve(batch_size); // If the batch size is less than the max size, add the links to a single - // span. - if (batch_size < kMaxOtelLinks) { - std::transform( - message_spans.begin(), message_spans.end(), std::back_inserter(links), - [i = static_cast(0)](auto const& span) mutable { - return std::make_pair( - span->GetContext(), - AttributesList{{"messaging.pubsub.message.link", i++}}); - }); + // span. If the batch size is greater than the max size, this will be a parent + // span with no links and each child spans will contain links. + bool const is_small_batch = batch_size <= kMaxOtelLinks; + if (is_small_batch) { + GenerateLinks(message_spans.begin(), message_spans.end(), links); } + auto batch_sink_parent_span = internal::MakeSpan("BatchSink::AsyncPublish", /*attributes=*/ @@ -72,6 +97,35 @@ opentelemetry::nostd::shared_ptr MakeParentSpan( static_cast(batch_size)}}, /*links*/ links); + std::vector> + batch_sink_spans; + auto num_of_batches = is_small_batch ? 0 : (batch_size / kMaxOtelLinks) + 1; + batch_sink_spans.reserve(num_of_batches + 1); + batch_sink_spans.emplace_back(batch_sink_parent_span); + + // Create N spans with up to 128 links per batch. + if (!is_small_batch) { + for (int i = 0; i < num_of_batches; ++i) { + std::vector> links; + links.reserve(kMaxOtelLinks); + GenerateLinks( + message_spans.begin() + (kMaxOtelLinks * i), + message_spans.begin() + + std::min(static_cast((kMaxOtelLinks * (i + 1))), + static_cast(batch_size)), + links); + + opentelemetry::trace::StartSpanOptions options; + options.parent = batch_sink_parent_span->GetContext(); + auto batch_sink_span = internal::MakeSpan( + "BatchSink::AsyncPublish - Batch #" + std::to_string(i), + /*attributes=*/{{}}, + /*links=*/links, options); + batch_sink_spans.emplace_back(batch_sink_span); + } + } + + // Add metadata to the message spans about the batch sink span. auto context = batch_sink_parent_span->GetContext(); auto trace_id = internal::ToString(context.trace_id()); auto span_id = internal::ToString(context.span_id()); @@ -81,7 +135,7 @@ opentelemetry::nostd::shared_ptr MakeParentSpan( message_span->SetAttribute("pubsub.batch_sink.span_id", span_id); } - return batch_sink_parent_span; + return batch_sink_spans; } } // namespace @@ -93,15 +147,15 @@ void TracingMessageBatch::Flush() { message_spans.swap(message_spans_); } - // TODO(#12528): Handle batches larger than 128. - auto batch_sink_parent_span = MakeParentSpan(std::move(message_spans)); + auto batch_sink_spans = MakeBatchSinkSpans(std::move(message_spans)); - // Set the batch sink as the active span. + // Set the batch sink parent span as the active span. This will always be the + // first span in the vector. auto async_scope = internal::GetTracer(internal::CurrentOptions()) - ->WithActiveSpan(batch_sink_parent_span); + ->WithActiveSpan(batch_sink_spans[0]); { std::lock_guard lk(batch_sink_mu_); - batch_sink_spans_.push_back(std::move(batch_sink_parent_span)); + batch_sink_spans_.swap(batch_sink_spans); } child_->Flush(); diff --git a/google/cloud/pubsub/internal/tracing_message_batch_test.cc b/google/cloud/pubsub/internal/tracing_message_batch_test.cc index 8b90ad73ea544..53546ac8708f2 100644 --- a/google/cloud/pubsub/internal/tracing_message_batch_test.cc +++ b/google/cloud/pubsub/internal/tracing_message_batch_test.cc @@ -37,6 +37,7 @@ using ::google::cloud::testing_util::SpanHasEvents; using ::google::cloud::testing_util::SpanHasInstrumentationScope; using ::google::cloud::testing_util::SpanKindIsClient; using ::google::cloud::testing_util::SpanLinkAttributesAre; +using ::google::cloud::testing_util::SpanLinksSizeIs; using ::google::cloud::testing_util::SpanNamed; using ::google::cloud::testing_util::SpanWithStatus; using ::google::cloud::testing_util::ThereIsAnActiveSpan; @@ -179,14 +180,15 @@ TEST(TracingMessageBatch, FlushSmallBatch) { "messaging.pubsub.message.link", 1))))))); } -// TODO(#12528): Update test when impl is added -TEST(TracingMessageBatch, FlushLargeBatch) { +TEST(TracingMessageBatch, FlushBatchWithOtelLimit) { + int const k_batch_size = 128; std::vector> initial_spans; - initial_spans.reserve(129); - for (int i = 0; i < 129; ++i) { + initial_spans.reserve(k_batch_size); + for (int i = 0; i < k_batch_size; ++i) { initial_spans.emplace_back(MakeSpan("test span " + std::to_string(i))); } + ASSERT_THAT(initial_spans, ::testing::SizeIs(k_batch_size)); auto span_catcher = InstallSpanCatcher(); auto mock = std::make_unique(); EXPECT_CALL(*mock, Flush).WillOnce([] { @@ -204,11 +206,51 @@ TEST(TracingMessageBatch, FlushLargeBatch) { EXPECT_THAT(message_batch->GetBatchSinkSpans(), SizeIs(1)); auto spans = span_catcher->GetSpans(); - EXPECT_THAT(spans, Contains(AllOf( - SpanHasInstrumentationScope(), SpanKindIsClient(), - SpanNamed("BatchSink::AsyncPublish"), - SpanHasAttributes(OTelAttribute( - "messaging.pubsub.num_messages_in_batch", 129))))); + EXPECT_THAT( + spans, Contains(AllOf(SpanHasInstrumentationScope(), SpanKindIsClient(), + SpanNamed("BatchSink::AsyncPublish"), + SpanHasAttributes(OTelAttribute( + "messaging.pubsub.num_messages_in_batch", 128)), + SpanLinksSizeIs(128)))); +} + +TEST(TracingMessageBatch, FlushLargeBatch) { + int const k_batch_size = 129; + std::vector> + initial_spans; + initial_spans.reserve(k_batch_size); + for (int i = 0; i < k_batch_size; ++i) { + initial_spans.emplace_back(MakeSpan("test span " + std::to_string(i))); + } + ASSERT_THAT(initial_spans, ::testing::SizeIs(k_batch_size)); + auto span_catcher = InstallSpanCatcher(); + auto mock = std::make_unique(); + EXPECT_CALL(*mock, Flush).WillOnce([] { + EXPECT_TRUE(ThereIsAnActiveSpan()); + }); + auto message_batch = + std::make_unique(std::move(mock), initial_spans); + + message_batch->Flush(); + + // The span must end before it can be processed by the span catcher. + EndSpans(message_batch->GetBatchSinkSpans()); + + EXPECT_THAT(message_batch->GetMessageSpans(), IsEmpty()); + EXPECT_THAT(message_batch->GetBatchSinkSpans(), SizeIs(3)); + + auto spans = span_catcher->GetSpans(); + EXPECT_THAT(spans, + Contains(AllOf(SpanNamed("BatchSink::AsyncPublish"), + SpanHasAttributes(OTelAttribute( + "messaging.pubsub.num_messages_in_batch", + k_batch_size))))); + EXPECT_THAT(spans, + Contains(AllOf(SpanNamed("BatchSink::AsyncPublish - Batch #0"), + SpanLinksSizeIs(128)))); + EXPECT_THAT(spans, + Contains(AllOf(SpanNamed("BatchSink::AsyncPublish - Batch #1"), + SpanLinksSizeIs(1)))); } TEST(TracingMessageBatch, FlushAddsSpanIdAndTraceIdAttribute) { diff --git a/google/cloud/testing_util/opentelemetry_matchers.h b/google/cloud/testing_util/opentelemetry_matchers.h index f577253ae3329..c641cf685ed4d 100644 --- a/google/cloud/testing_util/opentelemetry_matchers.h +++ b/google/cloud/testing_util/opentelemetry_matchers.h @@ -245,6 +245,13 @@ ::testing::Matcher SpanLinksAre(Args const&... matchers) { return SpanLinksAreImpl(::testing::ElementsAre(matchers...)); } +MATCHER_P(SpanLinksSizeIs, span_links, + "has size: " + std::to_string(span_links)) { + auto const& actual = static_cast(arg->GetLinks().size()); + *result_listener << "has size: " + std::to_string(actual); + return actual == span_links; +} + class SpanCatcher { public: SpanCatcher(); From 8d5a30eae8a556fdd8382cd798bd02d79bb8a852 Mon Sep 17 00:00:00 2001 From: Anna Levenberg Date: Thu, 19 Oct 2023 15:07:45 -0400 Subject: [PATCH 02/22] fix m32 build: --- .../pubsub/internal/tracing_message_batch.cc | 15 +++++++++------ 1 file changed, 9 insertions(+), 6 deletions(-) diff --git a/google/cloud/pubsub/internal/tracing_message_batch.cc b/google/cloud/pubsub/internal/tracing_message_batch.cc index 4b6cf0487dbdd..b3cd75e3083c3 100644 --- a/google/cloud/pubsub/internal/tracing_message_batch.cc +++ b/google/cloud/pubsub/internal/tracing_message_batch.cc @@ -78,9 +78,9 @@ MakeBatchSinkSpans( using AttributesList = std::vector>; - auto constexpr kMaxOtelLinks = 128; + int constexpr kMaxOtelLinks = 128; std::vector> links; - auto batch_size = static_cast(message_spans.size()); + auto batch_size = message_spans.size(); links.reserve(batch_size); // If the batch size is less than the max size, add the links to a single // span. If the batch size is greater than the max size, this will be a parent @@ -99,8 +99,11 @@ MakeBatchSinkSpans( std::vector> batch_sink_spans; - auto num_of_batches = is_small_batch ? 0 : (batch_size / kMaxOtelLinks) + 1; - batch_sink_spans.reserve(num_of_batches + 1); + auto num_of_batches = + is_small_batch + ? 0 + : (static_cast(batch_size) / kMaxOtelLinks) + 1; + batch_sink_spans.reserve(static_cast(num_of_batches + 1)); batch_sink_spans.emplace_back(batch_sink_parent_span); // Create N spans with up to 128 links per batch. @@ -111,8 +114,8 @@ MakeBatchSinkSpans( GenerateLinks( message_spans.begin() + (kMaxOtelLinks * i), message_spans.begin() + - std::min(static_cast((kMaxOtelLinks * (i + 1))), - static_cast(batch_size)), + std::min(static_cast(kMaxOtelLinks * (i + 1)), + static_cast(batch_size)), links); opentelemetry::trace::StartSpanOptions options; From c8f893c5bb0c6c7db8d8726858aa5dbac0dc9c9e Mon Sep 17 00:00:00 2001 From: Anna Levenberg Date: Thu, 19 Oct 2023 15:12:40 -0400 Subject: [PATCH 03/22] fix check --- google/cloud/pubsub/internal/tracing_message_batch.cc | 11 +++++------ 1 file changed, 5 insertions(+), 6 deletions(-) diff --git a/google/cloud/pubsub/internal/tracing_message_batch.cc b/google/cloud/pubsub/internal/tracing_message_batch.cc index b3cd75e3083c3..8b3175b7bee15 100644 --- a/google/cloud/pubsub/internal/tracing_message_batch.cc +++ b/google/cloud/pubsub/internal/tracing_message_batch.cc @@ -111,12 +111,11 @@ MakeBatchSinkSpans( for (int i = 0; i < num_of_batches; ++i) { std::vector> links; links.reserve(kMaxOtelLinks); - GenerateLinks( - message_spans.begin() + (kMaxOtelLinks * i), - message_spans.begin() + - std::min(static_cast(kMaxOtelLinks * (i + 1)), - static_cast(batch_size)), - links); + GenerateLinks(message_spans.begin() + (kMaxOtelLinks * i), + message_spans.begin() + + std::min(static_cast(kMaxOtelLinks * (i + 1)), + static_cast(batch_size)), + links); opentelemetry::trace::StartSpanOptions options; options.parent = batch_sink_parent_span->GetContext(); From 41330dbb1f568c259d839e0a14ebae4dce04f1a3 Mon Sep 17 00:00:00 2001 From: Anna Levenberg Date: Thu, 19 Oct 2023 16:56:12 -0400 Subject: [PATCH 04/22] k batch size --- .../internal/tracing_message_batch_test.cc | 24 +++++++++---------- 1 file changed, 12 insertions(+), 12 deletions(-) diff --git a/google/cloud/pubsub/internal/tracing_message_batch_test.cc b/google/cloud/pubsub/internal/tracing_message_batch_test.cc index 53546ac8708f2..02a462c12a7a7 100644 --- a/google/cloud/pubsub/internal/tracing_message_batch_test.cc +++ b/google/cloud/pubsub/internal/tracing_message_batch_test.cc @@ -181,14 +181,14 @@ TEST(TracingMessageBatch, FlushSmallBatch) { } TEST(TracingMessageBatch, FlushBatchWithOtelLimit) { - int const k_batch_size = 128; + auto constexpr kBatchSize = 128; std::vector> initial_spans; - initial_spans.reserve(k_batch_size); - for (int i = 0; i < k_batch_size; ++i) { + initial_spans.reserve(kBatchSize); + for (int i = 0; i < kBatchSize; ++i) { initial_spans.emplace_back(MakeSpan("test span " + std::to_string(i))); } - ASSERT_THAT(initial_spans, ::testing::SizeIs(k_batch_size)); + ASSERT_THAT(initial_spans, ::testing::SizeIs(kBatchSize)); auto span_catcher = InstallSpanCatcher(); auto mock = std::make_unique(); EXPECT_CALL(*mock, Flush).WillOnce([] { @@ -215,14 +215,14 @@ TEST(TracingMessageBatch, FlushBatchWithOtelLimit) { } TEST(TracingMessageBatch, FlushLargeBatch) { - int const k_batch_size = 129; + auto constexpr kBatchSize = 128; std::vector> initial_spans; - initial_spans.reserve(k_batch_size); - for (int i = 0; i < k_batch_size; ++i) { + initial_spans.reserve(kBatchSize); + for (int i = 0; i < kBatchSize; ++i) { initial_spans.emplace_back(MakeSpan("test span " + std::to_string(i))); } - ASSERT_THAT(initial_spans, ::testing::SizeIs(k_batch_size)); + ASSERT_THAT(initial_spans, ::testing::SizeIs(kBatchSize)); auto span_catcher = InstallSpanCatcher(); auto mock = std::make_unique(); EXPECT_CALL(*mock, Flush).WillOnce([] { @@ -241,10 +241,10 @@ TEST(TracingMessageBatch, FlushLargeBatch) { auto spans = span_catcher->GetSpans(); EXPECT_THAT(spans, - Contains(AllOf(SpanNamed("BatchSink::AsyncPublish"), - SpanHasAttributes(OTelAttribute( - "messaging.pubsub.num_messages_in_batch", - k_batch_size))))); + Contains(AllOf( + SpanNamed("BatchSink::AsyncPublish"), + SpanHasAttributes(OTelAttribute( + "messaging.pubsub.num_messages_in_batch", kBatchSize))))); EXPECT_THAT(spans, Contains(AllOf(SpanNamed("BatchSink::AsyncPublish - Batch #0"), SpanLinksSizeIs(128)))); From 58227a55b9a0156dcafa3dc0f9907b3285e0428d Mon Sep 17 00:00:00 2001 From: Anna Levenberg Date: Thu, 19 Oct 2023 17:03:44 -0400 Subject: [PATCH 05/22] refactor create spans --- .../internal/tracing_message_batch_test.cc | 32 +++++++++---------- 1 file changed, 16 insertions(+), 16 deletions(-) diff --git a/google/cloud/pubsub/internal/tracing_message_batch_test.cc b/google/cloud/pubsub/internal/tracing_message_batch_test.cc index 02a462c12a7a7..fbec5a347a6b3 100644 --- a/google/cloud/pubsub/internal/tracing_message_batch_test.cc +++ b/google/cloud/pubsub/internal/tracing_message_batch_test.cc @@ -57,6 +57,18 @@ void EndSpans(std::vector> +CreateSpans(int n) { + std::vector> + spans; + spans.resize(n); + std::generate(spans.begin(), spans.end(), [i = 0]() mutable { + return MakeSpan("test span " + std::to_string(i++)); + }); + return spans; +} + TEST(TracingMessageBatch, SaveMessage) { auto span = MakeSpan("test span"); opentelemetry::trace::Scope scope(span); @@ -76,7 +88,7 @@ TEST(TracingMessageBatch, SaveMultipleMessages) { auto mock = std::make_unique(); EXPECT_CALL(*mock, SaveMessage).Times(2); auto message_batch = std::make_unique(std::move(mock)); - auto message = pubsub::MessageBuilder().SetData("test").Build(); + auto message = pubsub ::MessageBuilder().SetData("test").Build(); // Save the first span. auto span1 = MakeSpan("test span"); @@ -182,13 +194,7 @@ TEST(TracingMessageBatch, FlushSmallBatch) { TEST(TracingMessageBatch, FlushBatchWithOtelLimit) { auto constexpr kBatchSize = 128; - std::vector> - initial_spans; - initial_spans.reserve(kBatchSize); - for (int i = 0; i < kBatchSize; ++i) { - initial_spans.emplace_back(MakeSpan("test span " + std::to_string(i))); - } - ASSERT_THAT(initial_spans, ::testing::SizeIs(kBatchSize)); + auto initial_spans = CreateSpans(kBatchSize); auto span_catcher = InstallSpanCatcher(); auto mock = std::make_unique(); EXPECT_CALL(*mock, Flush).WillOnce([] { @@ -215,14 +221,8 @@ TEST(TracingMessageBatch, FlushBatchWithOtelLimit) { } TEST(TracingMessageBatch, FlushLargeBatch) { - auto constexpr kBatchSize = 128; - std::vector> - initial_spans; - initial_spans.reserve(kBatchSize); - for (int i = 0; i < kBatchSize; ++i) { - initial_spans.emplace_back(MakeSpan("test span " + std::to_string(i))); - } - ASSERT_THAT(initial_spans, ::testing::SizeIs(kBatchSize)); + auto constexpr kBatchSize = 129; + auto initial_spans = CreateSpans(kBatchSize); auto span_catcher = InstallSpanCatcher(); auto mock = std::make_unique(); EXPECT_CALL(*mock, Flush).WillOnce([] { From e1ec2200035e396b6c986116f870f65aa9ca2459 Mon Sep 17 00:00:00 2001 From: Anna Levenberg Date: Thu, 19 Oct 2023 17:06:17 -0400 Subject: [PATCH 06/22] add iterator --- .../pubsub/internal/tracing_message_batch.cc | 21 +++++++------------ 1 file changed, 8 insertions(+), 13 deletions(-) diff --git a/google/cloud/pubsub/internal/tracing_message_batch.cc b/google/cloud/pubsub/internal/tracing_message_batch.cc index 8b3175b7bee15..28feb189ee764 100644 --- a/google/cloud/pubsub/internal/tracing_message_batch.cc +++ b/google/cloud/pubsub/internal/tracing_message_batch.cc @@ -43,16 +43,10 @@ void TracingMessageBatch::SaveMessage(pubsub::Message m) { namespace { -/// Inserts a link for each span in @p links between the @p begin and @p end -/// iterators. -template -void GenerateLinks( - Iterator begin, Iterator end, - std::vector>>>& - links) { +/// Inserts a link for each span in @p destination between the @p begin and @p +/// end iterators. +template +void GenerateLinks(Iterator begin, Iterator end, OutputIterator destination) { static_assert( std::is_same< absl::decay_t, @@ -62,7 +56,7 @@ void GenerateLinks( using AttributesList = std::vector>; - std::transform(begin, end, std::back_inserter(links), + std::transform(begin, end, destination, [i = static_cast(0)](auto const& span) mutable { return std::make_pair( span->GetContext(), @@ -87,7 +81,8 @@ MakeBatchSinkSpans( // span with no links and each child spans will contain links. bool const is_small_batch = batch_size <= kMaxOtelLinks; if (is_small_batch) { - GenerateLinks(message_spans.begin(), message_spans.end(), links); + GenerateLinks(message_spans.begin(), message_spans.end(), + std::back_inserter(links)); } auto batch_sink_parent_span = @@ -115,7 +110,7 @@ MakeBatchSinkSpans( message_spans.begin() + std::min(static_cast(kMaxOtelLinks * (i + 1)), static_cast(batch_size)), - links); + std::back_inserter(links)); opentelemetry::trace::StartSpanOptions options; options.parent = batch_sink_parent_span->GetContext(); From 82465b716bfd4177ddecd2764da2d9dbe2ea529e Mon Sep 17 00:00:00 2001 From: Anna Levenberg Date: Thu, 19 Oct 2023 18:20:05 -0400 Subject: [PATCH 07/22] add something --- .../pubsub/internal/tracing_message_batch.cc | 55 +++++++++++-------- .../internal/tracing_message_batch_test.cc | 3 +- 2 files changed, 32 insertions(+), 26 deletions(-) diff --git a/google/cloud/pubsub/internal/tracing_message_batch.cc b/google/cloud/pubsub/internal/tracing_message_batch.cc index 28feb189ee764..6ba12d8b7cfee 100644 --- a/google/cloud/pubsub/internal/tracing_message_batch.cc +++ b/google/cloud/pubsub/internal/tracing_message_batch.cc @@ -52,7 +52,6 @@ void GenerateLinks(Iterator begin, Iterator end, OutputIterator destination) { absl::decay_t, opentelemetry::nostd::shared_ptr>::value, "Iterator is not the right type."); - using opentelemetry::trace::SpanContext; using AttributesList = std::vector>; @@ -64,6 +63,34 @@ void GenerateLinks(Iterator begin, Iterator end, OutputIterator destination) { }); } +template +void GenerateBatchSinkChildrenSpans(std::vector const& message_spans, + T batch_sink_parent_span, std::ptrdiff_t batch_size, + std::vector& batch_sink_spans) { + using LinksList = std::vector< + std::pair>>>; + auto cut = [&](auto i) { + return std::next( + i, std::min(batch_size - 1, std::distance(i, message_spans.end()))); + }; + int batches = message_spans.size() / batch_size; + int count = 0; + for (auto i = message_spans.begin(); count <= batches; i = cut(i)) { + LinksList links; + // Generate links between [i, min((i + batch_size) -1), end)) range. + GenerateLinks(i, cut(i), std::back_inserter(links)); + opentelemetry::trace::StartSpanOptions options; + options.parent = batch_sink_parent_span->GetContext(); + auto batch_sink_span = internal::MakeSpan( + "BatchSink::AsyncPublish - Batch #" + std::to_string(count++), + /*attributes=*/{{}}, + /*links=*/links, options); + batch_sink_spans.emplace_back(batch_sink_span); + } +} + std::vector> MakeBatchSinkSpans( std::vector> @@ -94,32 +121,12 @@ MakeBatchSinkSpans( std::vector> batch_sink_spans; - auto num_of_batches = - is_small_batch - ? 0 - : (static_cast(batch_size) / kMaxOtelLinks) + 1; - batch_sink_spans.reserve(static_cast(num_of_batches + 1)); batch_sink_spans.emplace_back(batch_sink_parent_span); // Create N spans with up to 128 links per batch. if (!is_small_batch) { - for (int i = 0; i < num_of_batches; ++i) { - std::vector> links; - links.reserve(kMaxOtelLinks); - GenerateLinks(message_spans.begin() + (kMaxOtelLinks * i), - message_spans.begin() + - std::min(static_cast(kMaxOtelLinks * (i + 1)), - static_cast(batch_size)), - std::back_inserter(links)); - - opentelemetry::trace::StartSpanOptions options; - options.parent = batch_sink_parent_span->GetContext(); - auto batch_sink_span = internal::MakeSpan( - "BatchSink::AsyncPublish - Batch #" + std::to_string(i), - /*attributes=*/{{}}, - /*links=*/links, options); - batch_sink_spans.emplace_back(batch_sink_span); - } + GenerateBatchSinkChildrenSpans(message_spans, batch_sink_parent_span, batch_size, + batch_sink_spans); } // Add metadata to the message spans about the batch sink span. @@ -149,7 +156,7 @@ void TracingMessageBatch::Flush() { // Set the batch sink parent span as the active span. This will always be the // first span in the vector. auto async_scope = internal::GetTracer(internal::CurrentOptions()) - ->WithActiveSpan(batch_sink_spans[0]); + ->WithActiveSpan(*batch_sink_spans.begin()); { std::lock_guard lk(batch_sink_mu_); batch_sink_spans_.swap(batch_sink_spans); diff --git a/google/cloud/pubsub/internal/tracing_message_batch_test.cc b/google/cloud/pubsub/internal/tracing_message_batch_test.cc index fbec5a347a6b3..b386416f5c0d1 100644 --- a/google/cloud/pubsub/internal/tracing_message_batch_test.cc +++ b/google/cloud/pubsub/internal/tracing_message_batch_test.cc @@ -61,8 +61,7 @@ void EndSpans(std::vector> CreateSpans(int n) { std::vector> - spans; - spans.resize(n); + spans(n); std::generate(spans.begin(), spans.end(), [i = 0]() mutable { return MakeSpan("test span " + std::to_string(i++)); }); From f339b437c6cc21b42dbfd21849e8ca92788cc368 Mon Sep 17 00:00:00 2001 From: Anna Levenberg Date: Thu, 19 Oct 2023 18:21:50 -0400 Subject: [PATCH 08/22] checkers --- google/cloud/pubsub/internal/tracing_message_batch.cc | 9 +++++---- 1 file changed, 5 insertions(+), 4 deletions(-) diff --git a/google/cloud/pubsub/internal/tracing_message_batch.cc b/google/cloud/pubsub/internal/tracing_message_batch.cc index 6ba12d8b7cfee..6fd7adc23ff4a 100644 --- a/google/cloud/pubsub/internal/tracing_message_batch.cc +++ b/google/cloud/pubsub/internal/tracing_message_batch.cc @@ -65,8 +65,9 @@ void GenerateLinks(Iterator begin, Iterator end, OutputIterator destination) { template void GenerateBatchSinkChildrenSpans(std::vector const& message_spans, - T batch_sink_parent_span, std::ptrdiff_t batch_size, - std::vector& batch_sink_spans) { + T batch_sink_parent_span, + std::ptrdiff_t batch_size, + std::vector& batch_sink_spans) { using LinksList = std::vector< std::pair Date: Thu, 19 Oct 2023 18:29:50 -0400 Subject: [PATCH 09/22] fix --- google/cloud/pubsub/internal/tracing_message_batch.cc | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/google/cloud/pubsub/internal/tracing_message_batch.cc b/google/cloud/pubsub/internal/tracing_message_batch.cc index 6fd7adc23ff4a..e32bbb9fdd3f9 100644 --- a/google/cloud/pubsub/internal/tracing_message_batch.cc +++ b/google/cloud/pubsub/internal/tracing_message_batch.cc @@ -76,9 +76,8 @@ void GenerateBatchSinkChildrenSpans(std::vector const& message_spans, return std::next( i, std::min(batch_size - 1, std::distance(i, message_spans.end()))); }; - int batches = message_spans.size() / batch_size; int count = 0; - for (auto i = message_spans.begin(); count <= batches; i = cut(i)) { + for (auto i = message_spans.begin(); i != message_spans.end(); i = cut(i)) { LinksList links; // Generate links between [i, min((i + batch_size) -1), end)) range. GenerateLinks(i, cut(i), std::back_inserter(links)); From f1746b8ecb27872985bfa4550771709148c317b1 Mon Sep 17 00:00:00 2001 From: Anna Levenberg Date: Fri, 20 Oct 2023 10:13:51 -0400 Subject: [PATCH 10/22] initial changes --- .../cloud/pubsub/internal/tracing_message_batch.cc | 13 +++++++------ .../pubsub/internal/tracing_message_batch_test.cc | 3 +-- 2 files changed, 8 insertions(+), 8 deletions(-) diff --git a/google/cloud/pubsub/internal/tracing_message_batch.cc b/google/cloud/pubsub/internal/tracing_message_batch.cc index e32bbb9fdd3f9..0df3b7e4cb15a 100644 --- a/google/cloud/pubsub/internal/tracing_message_batch.cc +++ b/google/cloud/pubsub/internal/tracing_message_batch.cc @@ -43,6 +43,11 @@ void TracingMessageBatch::SaveMessage(pubsub::Message m) { namespace { +using Links = std::vector< + std::pair>>>; + /// Inserts a link for each span in @p destination between the @p begin and @p /// end iterators. template @@ -68,17 +73,13 @@ void GenerateBatchSinkChildrenSpans(std::vector const& message_spans, T batch_sink_parent_span, std::ptrdiff_t batch_size, std::vector& batch_sink_spans) { - using LinksList = std::vector< - std::pair>>>; auto cut = [&](auto i) { return std::next( i, std::min(batch_size - 1, std::distance(i, message_spans.end()))); }; int count = 0; for (auto i = message_spans.begin(); i != message_spans.end(); i = cut(i)) { - LinksList links; + Links links; // Generate links between [i, min((i + batch_size) -1), end)) range. GenerateLinks(i, cut(i), std::back_inserter(links)); opentelemetry::trace::StartSpanOptions options; @@ -156,7 +157,7 @@ void TracingMessageBatch::Flush() { // Set the batch sink parent span as the active span. This will always be the // first span in the vector. auto async_scope = internal::GetTracer(internal::CurrentOptions()) - ->WithActiveSpan(*batch_sink_spans.begin()); + ->WithActiveSpan(batch_sink_spans.front()); { std::lock_guard lk(batch_sink_mu_); batch_sink_spans_.swap(batch_sink_spans); diff --git a/google/cloud/pubsub/internal/tracing_message_batch_test.cc b/google/cloud/pubsub/internal/tracing_message_batch_test.cc index b386416f5c0d1..14390aa99f950 100644 --- a/google/cloud/pubsub/internal/tracing_message_batch_test.cc +++ b/google/cloud/pubsub/internal/tracing_message_batch_test.cc @@ -58,8 +58,7 @@ void EndSpans(std::vector> -CreateSpans(int n) { +auto CreateSpans(int n) { std::vector> spans(n); std::generate(spans.begin(), spans.end(), [i = 0]() mutable { From ee1ac45ad28b739a5a422922a3a03f20b336d7be Mon Sep 17 00:00:00 2001 From: Anna Levenberg Date: Fri, 20 Oct 2023 10:57:21 -0400 Subject: [PATCH 11/22] refactor links --- .../pubsub/internal/tracing_message_batch.cc | 39 ++++++++----------- 1 file changed, 16 insertions(+), 23 deletions(-) diff --git a/google/cloud/pubsub/internal/tracing_message_batch.cc b/google/cloud/pubsub/internal/tracing_message_batch.cc index 0df3b7e4cb15a..9e869aa1e775a 100644 --- a/google/cloud/pubsub/internal/tracing_message_batch.cc +++ b/google/cloud/pubsub/internal/tracing_message_batch.cc @@ -43,29 +43,24 @@ void TracingMessageBatch::SaveMessage(pubsub::Message m) { namespace { -using Links = std::vector< - std::pair>>>; - -/// Inserts a link for each span in @p destination between the @p begin and @p -/// end iterators. -template -void GenerateLinks(Iterator begin, Iterator end, OutputIterator destination) { - static_assert( - std::is_same< - absl::decay_t, - opentelemetry::nostd::shared_ptr>::value, - "Iterator is not the right type."); - using AttributesList = - std::vector>; - std::transform(begin, end, destination, +using Spans = + std::vector>; +using Attributes = + std::vector>; +using Links = + std::vector>; + +/// Creates a link for each span in the range @p begin to @p end. +auto MakeLinks(Spans::const_iterator begin, Spans::const_iterator end) { + Links links; + std::transform(begin, end, std::back_inserter(links), [i = static_cast(0)](auto const& span) mutable { return std::make_pair( span->GetContext(), - AttributesList{{"messaging.pubsub.message.link", i++}}); + Attributes{{"messaging.pubsub.message.link", i++}}); }); + return links; } template @@ -79,9 +74,8 @@ void GenerateBatchSinkChildrenSpans(std::vector const& message_spans, }; int count = 0; for (auto i = message_spans.begin(); i != message_spans.end(); i = cut(i)) { - Links links; + Links links = MakeLinks(i, cut(i)); // Generate links between [i, min((i + batch_size) -1), end)) range. - GenerateLinks(i, cut(i), std::back_inserter(links)); opentelemetry::trace::StartSpanOptions options; options.parent = batch_sink_parent_span->GetContext(); auto batch_sink_span = internal::MakeSpan( @@ -109,8 +103,7 @@ MakeBatchSinkSpans( // span with no links and each child spans will contain links. bool const is_small_batch = batch_size <= kMaxOtelLinks; if (is_small_batch) { - GenerateLinks(message_spans.begin(), message_spans.end(), - std::back_inserter(links)); + links = MakeLinks(message_spans.begin(), message_spans.end()); } auto batch_sink_parent_span = From c90eb644793391aaa14c8e8986019c52685dc1f3 Mon Sep 17 00:00:00 2001 From: Anna Levenberg Date: Fri, 20 Oct 2023 11:03:20 -0400 Subject: [PATCH 12/22] fixes --- google/cloud/pubsub/internal/tracing_message_batch.cc | 6 +----- 1 file changed, 1 insertion(+), 5 deletions(-) diff --git a/google/cloud/pubsub/internal/tracing_message_batch.cc b/google/cloud/pubsub/internal/tracing_message_batch.cc index 9e869aa1e775a..cb746046f461a 100644 --- a/google/cloud/pubsub/internal/tracing_message_batch.cc +++ b/google/cloud/pubsub/internal/tracing_message_batch.cc @@ -90,12 +90,8 @@ std::vector> MakeBatchSinkSpans( std::vector> message_spans) { - using opentelemetry::trace::SpanContext; - using AttributesList = - std::vector>; int constexpr kMaxOtelLinks = 128; - std::vector> links; + Links links; auto batch_size = message_spans.size(); links.reserve(batch_size); // If the batch size is less than the max size, add the links to a single From 37f4bc577b8917f6d2fbce21587e32a1596186c0 Mon Sep 17 00:00:00 2001 From: Anna Levenberg Date: Sun, 22 Oct 2023 18:28:56 -0400 Subject: [PATCH 13/22] address comments --- .../pubsub/internal/tracing_message_batch.cc | 96 +++++++++---------- 1 file changed, 43 insertions(+), 53 deletions(-) diff --git a/google/cloud/pubsub/internal/tracing_message_batch.cc b/google/cloud/pubsub/internal/tracing_message_batch.cc index cb746046f461a..62628e52b2af0 100644 --- a/google/cloud/pubsub/internal/tracing_message_batch.cc +++ b/google/cloud/pubsub/internal/tracing_message_batch.cc @@ -63,64 +63,16 @@ auto MakeLinks(Spans::const_iterator begin, Spans::const_iterator end) { return links; } -template -void GenerateBatchSinkChildrenSpans(std::vector const& message_spans, - T batch_sink_parent_span, - std::ptrdiff_t batch_size, - std::vector& batch_sink_spans) { - auto cut = [&](auto i) { - return std::next( - i, std::min(batch_size - 1, std::distance(i, message_spans.end()))); - }; - int count = 0; - for (auto i = message_spans.begin(); i != message_spans.end(); i = cut(i)) { - Links links = MakeLinks(i, cut(i)); - // Generate links between [i, min((i + batch_size) -1), end)) range. - opentelemetry::trace::StartSpanOptions options; - options.parent = batch_sink_parent_span->GetContext(); - auto batch_sink_span = internal::MakeSpan( - "BatchSink::AsyncPublish - Batch #" + std::to_string(count++), - /*attributes=*/{{}}, - /*links=*/links, options); - batch_sink_spans.emplace_back(batch_sink_span); - } -} - -std::vector> -MakeBatchSinkSpans( - std::vector> - message_spans) { - int constexpr kMaxOtelLinks = 128; - Links links; - auto batch_size = message_spans.size(); - links.reserve(batch_size); - // If the batch size is less than the max size, add the links to a single - // span. If the batch size is greater than the max size, this will be a parent - // span with no links and each child spans will contain links. - bool const is_small_batch = batch_size <= kMaxOtelLinks; - if (is_small_batch) { - links = MakeLinks(message_spans.begin(), message_spans.end()); - } - - auto batch_sink_parent_span = +auto MakeParent(Links links, Spans const& message_spans) { + auto batch_sink_parent = internal::MakeSpan("BatchSink::AsyncPublish", /*attributes=*/ {{"messaging.pubsub.num_messages_in_batch", - static_cast(batch_size)}}, - /*links*/ links); - - std::vector> - batch_sink_spans; - batch_sink_spans.emplace_back(batch_sink_parent_span); - - // Create N spans with up to 128 links per batch. - if (!is_small_batch) { - GenerateBatchSinkChildrenSpans(message_spans, batch_sink_parent_span, - batch_size, batch_sink_spans); - } + static_cast(message_spans.size())}}, + /*links*/ std::move(links)); // Add metadata to the message spans about the batch sink span. - auto context = batch_sink_parent_span->GetContext(); + auto context = batch_sink_parent->GetContext(); auto trace_id = internal::ToString(context.trace_id()); auto span_id = internal::ToString(context.span_id()); for (auto const& message_span : message_spans) { @@ -128,6 +80,44 @@ MakeBatchSinkSpans( message_span->SetAttribute("pubsub.batch_sink.trace_id", trace_id); message_span->SetAttribute("pubsub.batch_sink.span_id", span_id); } + return batch_sink_parent; +} + +auto MakeChild( + opentelemetry::nostd::shared_ptr parent, + int count, Links links) { + opentelemetry::trace::StartSpanOptions options; + options.parent = parent->GetContext(); + return internal::MakeSpan( + "BatchSink::AsyncPublish - Batch #" + std::to_string(count), + /*attributes=*/{{}}, + /*links=*/links, options); +} + +Spans MakeBatchSinkSpans(Spans message_spans) { + int constexpr kMaxOtelLinks = 128; + Spans batch_sink_spans; + // If the batch size is less than the max size, add the links to a single + // span. If the batch size is greater than the max size, this will be a parent + // span with no links and each child spans will contain links. + if (message_spans.size() <= kMaxOtelLinks) { + batch_sink_spans.push_back(MakeParent( + MakeLinks(message_spans.begin(), message_spans.end()), message_spans)); + return batch_sink_spans; + } + batch_sink_spans.push_back(MakeParent({{}}, message_spans)); + auto batch_sink_parent = batch_sink_spans.front(); + + auto cut = [&message_spans](auto i) { + auto const batch_size = static_cast(kMaxOtelLinks); + return std::next( + i, std::min(batch_size, std::distance(i, message_spans.end()))); + }; + int count = 0; + for (auto i = message_spans.begin(); i != message_spans.end(); i = cut(i)) { + batch_sink_spans.push_back( + MakeChild(batch_sink_parent, count++, MakeLinks(i, cut(i)))); + } return batch_sink_spans; } From 4d750a377f853ef7e056e6a50ac8c7be21454896 Mon Sep 17 00:00:00 2001 From: Anna Levenberg Date: Sun, 22 Oct 2023 18:34:15 -0400 Subject: [PATCH 14/22] fix comments --- google/cloud/pubsub/internal/tracing_message_batch.cc | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/google/cloud/pubsub/internal/tracing_message_batch.cc b/google/cloud/pubsub/internal/tracing_message_batch.cc index 62628e52b2af0..78f53db59f9a3 100644 --- a/google/cloud/pubsub/internal/tracing_message_batch.cc +++ b/google/cloud/pubsub/internal/tracing_message_batch.cc @@ -98,7 +98,7 @@ Spans MakeBatchSinkSpans(Spans message_spans) { int constexpr kMaxOtelLinks = 128; Spans batch_sink_spans; // If the batch size is less than the max size, add the links to a single - // span. If the batch size is greater than the max size, this will be a parent + // span. If the batch size is greater than the max size, create a parent // span with no links and each child spans will contain links. if (message_spans.size() <= kMaxOtelLinks) { batch_sink_spans.push_back(MakeParent( @@ -115,6 +115,8 @@ Spans MakeBatchSinkSpans(Spans message_spans) { }; int count = 0; for (auto i = message_spans.begin(); i != message_spans.end(); i = cut(i)) { + // Generates child spans with links between [i, min(i + batch_size, end)) + // such that each child span will have exactly batch_size elements or less. batch_sink_spans.push_back( MakeChild(batch_sink_parent, count++, MakeLinks(i, cut(i)))); } From 6dbe77f6625fd1af23592edced139e41ff167b5b Mon Sep 17 00:00:00 2001 From: Anna Levenberg Date: Sun, 22 Oct 2023 20:08:48 -0400 Subject: [PATCH 15/22] clang --- google/cloud/pubsub/internal/tracing_message_batch.cc | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/google/cloud/pubsub/internal/tracing_message_batch.cc b/google/cloud/pubsub/internal/tracing_message_batch.cc index 78f53db59f9a3..6f16c5266a345 100644 --- a/google/cloud/pubsub/internal/tracing_message_batch.cc +++ b/google/cloud/pubsub/internal/tracing_message_batch.cc @@ -63,7 +63,7 @@ auto MakeLinks(Spans::const_iterator begin, Spans::const_iterator end) { return links; } -auto MakeParent(Links links, Spans const& message_spans) { +auto MakeParent(Links const& links, Spans const& message_spans) { auto batch_sink_parent = internal::MakeSpan("BatchSink::AsyncPublish", /*attributes=*/ @@ -84,8 +84,8 @@ auto MakeParent(Links links, Spans const& message_spans) { } auto MakeChild( - opentelemetry::nostd::shared_ptr parent, - int count, Links links) { + opentelemetry::nostd::shared_ptr const& parent, + int count, Links const& links) { opentelemetry::trace::StartSpanOptions options; options.parent = parent->GetContext(); return internal::MakeSpan( From 6aa9831ea0b7564bcecf9d386eb89d1f34fcd360 Mon Sep 17 00:00:00 2001 From: Anna Levenberg Date: Tue, 24 Oct 2023 16:36:27 -0400 Subject: [PATCH 16/22] initial pass using a future --- google/cloud/pubsub/internal/message_batch.h | 6 +- .../pubsub/internal/noop_message_batch.h | 22 ++++- .../pubsub/internal/tracing_message_batch.cc | 28 ++---- .../pubsub/internal/tracing_message_batch.h | 20 +--- .../internal/tracing_message_batch_test.cc | 95 +++---------------- .../cloud/pubsub/testing/mock_message_batch.h | 3 +- 6 files changed, 51 insertions(+), 123 deletions(-) diff --git a/google/cloud/pubsub/internal/message_batch.h b/google/cloud/pubsub/internal/message_batch.h index 998e7e87334b6..953cecb5d9ec9 100644 --- a/google/cloud/pubsub/internal/message_batch.h +++ b/google/cloud/pubsub/internal/message_batch.h @@ -16,6 +16,7 @@ #define GOOGLE_CLOUD_CPP_GOOGLE_CLOUD_PUBSUB_INTERNAL_MESSAGE_BATCH_H #include "google/cloud/pubsub/message.h" +#include "google/cloud/future_void.h" #include "google/cloud/options.h" namespace google { @@ -40,8 +41,9 @@ class MessageBatch { virtual void SaveMessage(pubsub::Message m) = 0; // Captures information about a batch of messages before it's flushed. Invoked - // in `BatchingPublisherConnection::FlushImpl(...)`. - virtual void Flush() = 0; + // in `BatchingPublisherConnection::FlushImpl(...)`. Returns a task to run + // after FlushCallback(). + virtual future Flush() = 0; // Captures information about the response after we receive it from // the server. Invoked in the `BatchSink::AsyncPublish(...)` callback. diff --git a/google/cloud/pubsub/internal/noop_message_batch.h b/google/cloud/pubsub/internal/noop_message_batch.h index 2411fb4fea7ef..63181ec2279f1 100644 --- a/google/cloud/pubsub/internal/noop_message_batch.h +++ b/google/cloud/pubsub/internal/noop_message_batch.h @@ -16,6 +16,7 @@ #define GOOGLE_CLOUD_CPP_GOOGLE_CLOUD_PUBSUB_INTERNAL_NOOP_MESSAGE_BATCH_H #include "google/cloud/pubsub/internal/message_batch.h" +#include "google/cloud/future_void.h" namespace google { namespace cloud { @@ -33,9 +34,26 @@ class NoOpMessageBatch : public MessageBatch { void SaveMessage(pubsub::Message) override{}; - void Flush() override{}; + future Flush() override { + future result; + { + std::lock_guard lk(mu_); + waiter_.get_future(); + } + return result; + }; - void FlushCallback() override{}; + void FlushCallback() override { + promise waiter; + { + std::lock_guard lk(mu_); + waiter.swap(waiter_); + } + waiter.set_value(); + } + + std::mutex mu_; + promise waiter_; // ABSL_GUARDED_BY(mu_) }; GOOGLE_CLOUD_CPP_INLINE_NAMESPACE_END diff --git a/google/cloud/pubsub/internal/tracing_message_batch.cc b/google/cloud/pubsub/internal/tracing_message_batch.cc index 6f16c5266a345..ec070f4a952d2 100644 --- a/google/cloud/pubsub/internal/tracing_message_batch.cc +++ b/google/cloud/pubsub/internal/tracing_message_batch.cc @@ -17,6 +17,7 @@ #include "google/cloud/pubsub/internal/tracing_message_batch.h" #include "google/cloud/pubsub/internal/message_batch.h" #include "google/cloud/pubsub/version.h" +#include "google/cloud/future_void.h" #include "google/cloud/internal/opentelemetry.h" #include "opentelemetry/context/runtime_context.h" #include "opentelemetry/trace/context.h" @@ -126,7 +127,7 @@ Spans MakeBatchSinkSpans(Spans message_spans) { } // namespace -void TracingMessageBatch::Flush() { +future TracingMessageBatch::Flush() { decltype(message_spans_) message_spans; { std::lock_guard lk(message_mu_); @@ -139,34 +140,21 @@ void TracingMessageBatch::Flush() { // first span in the vector. auto async_scope = internal::GetTracer(internal::CurrentOptions()) ->WithActiveSpan(batch_sink_spans.front()); - { - std::lock_guard lk(batch_sink_mu_); - batch_sink_spans_.swap(batch_sink_spans); - } - child_->Flush(); + return child_->Flush().then([spans = std::move(batch_sink_spans)](auto f) { + for (auto& span : spans) { + internal::EndSpan(*span); + } + }); } -void TracingMessageBatch::FlushCallback() { - decltype(batch_sink_spans_) spans; - { - std::lock_guard lk(batch_sink_mu_); - spans.swap(batch_sink_spans_); - } - for (auto& span : spans) internal::EndSpan(*span); - child_->FlushCallback(); -} +void TracingMessageBatch::FlushCallback() { child_->FlushCallback(); } std::vector> TracingMessageBatch::GetMessageSpans() const { return message_spans_; } -std::vector> -TracingMessageBatch::GetBatchSinkSpans() const { - return batch_sink_spans_; -} - GOOGLE_CLOUD_CPP_INLINE_NAMESPACE_END } // namespace pubsub_internal } // namespace cloud diff --git a/google/cloud/pubsub/internal/tracing_message_batch.h b/google/cloud/pubsub/internal/tracing_message_batch.h index 67e91beb0b5d4..858fe5173abed 100644 --- a/google/cloud/pubsub/internal/tracing_message_batch.h +++ b/google/cloud/pubsub/internal/tracing_message_batch.h @@ -44,21 +44,12 @@ class TracingMessageBatch : public MessageBatch { std::vector> message_spans) : child_(std::move(child)), message_spans_(std::move(message_spans)) {} - // For testing only. - TracingMessageBatch( - std::unique_ptr child, - std::vector> - message_spans, - std::vector> - batch_sink_spans) - : child_(std::move(child)), - message_spans_(std::move(message_spans)), - batch_sink_spans_(std::move(batch_sink_spans)) {} + ~TracingMessageBatch() override = default; void SaveMessage(pubsub::Message m) override; - void Flush() override; + future Flush() override; void FlushCallback() override; @@ -66,18 +57,11 @@ class TracingMessageBatch : public MessageBatch { std::vector> GetMessageSpans() const; - // For testing only. - std::vector> - GetBatchSinkSpans() const; - private: std::unique_ptr child_; std::mutex message_mu_; - std::mutex batch_sink_mu_; std::vector> message_spans_; // ABSL_GUARDED_BY(message_mu_) - std::vector> - batch_sink_spans_; // ABSL_GUARDED_BY(batch_sink_mu_) }; GOOGLE_CLOUD_CPP_INLINE_NAMESPACE_END diff --git a/google/cloud/pubsub/internal/tracing_message_batch_test.cc b/google/cloud/pubsub/internal/tracing_message_batch_test.cc index 14390aa99f950..67d8199ffcd02 100644 --- a/google/cloud/pubsub/internal/tracing_message_batch_test.cc +++ b/google/cloud/pubsub/internal/tracing_message_batch_test.cc @@ -129,6 +129,7 @@ TEST(TracingMessageBatch, Flush) { auto mock = std::make_unique(); EXPECT_CALL(*mock, Flush).WillOnce([] { EXPECT_TRUE(ThereIsAnActiveSpan()); + return make_ready_future(); }); auto initial_spans = {message_span}; auto message_batch = @@ -136,11 +137,7 @@ TEST(TracingMessageBatch, Flush) { message_batch->Flush(); - // The span must end before it can be processed by the span catcher. - EndSpans(message_batch->GetBatchSinkSpans()); - EXPECT_THAT(message_batch->GetMessageSpans(), IsEmpty()); - EXPECT_THAT(message_batch->GetBatchSinkSpans(), SizeIs(1)); auto spans = span_catcher->GetSpans(); EXPECT_THAT(spans, @@ -161,6 +158,7 @@ TEST(TracingMessageBatch, FlushSmallBatch) { auto mock = std::make_unique(); EXPECT_CALL(*mock, Flush).WillOnce([] { EXPECT_TRUE(ThereIsAnActiveSpan()); + return make_ready_future(); }); auto initial_spans = {message_span1, message_span2}; auto message_batch = @@ -168,11 +166,7 @@ TEST(TracingMessageBatch, FlushSmallBatch) { message_batch->Flush(); - // The span must end before it can be processed by the span catcher. - EndSpans(message_batch->GetBatchSinkSpans()); - EXPECT_THAT(message_batch->GetMessageSpans(), IsEmpty()); - EXPECT_THAT(message_batch->GetBatchSinkSpans(), SizeIs(1)); auto spans = span_catcher->GetSpans(); EXPECT_THAT( @@ -197,17 +191,14 @@ TEST(TracingMessageBatch, FlushBatchWithOtelLimit) { auto mock = std::make_unique(); EXPECT_CALL(*mock, Flush).WillOnce([] { EXPECT_TRUE(ThereIsAnActiveSpan()); + return make_ready_future(); }); auto message_batch = std::make_unique(std::move(mock), initial_spans); message_batch->Flush(); - // The span must end before it can be processed by the span catcher. - EndSpans(message_batch->GetBatchSinkSpans()); - EXPECT_THAT(message_batch->GetMessageSpans(), IsEmpty()); - EXPECT_THAT(message_batch->GetBatchSinkSpans(), SizeIs(1)); auto spans = span_catcher->GetSpans(); EXPECT_THAT( @@ -225,17 +216,14 @@ TEST(TracingMessageBatch, FlushLargeBatch) { auto mock = std::make_unique(); EXPECT_CALL(*mock, Flush).WillOnce([] { EXPECT_TRUE(ThereIsAnActiveSpan()); + return make_ready_future(); }); auto message_batch = std::make_unique(std::move(mock), initial_spans); message_batch->Flush(); - // The span must end before it can be processed by the span catcher. - EndSpans(message_batch->GetBatchSinkSpans()); - EXPECT_THAT(message_batch->GetMessageSpans(), IsEmpty()); - EXPECT_THAT(message_batch->GetBatchSinkSpans(), SizeIs(3)); auto spans = span_catcher->GetSpans(); EXPECT_THAT(spans, @@ -256,7 +244,7 @@ TEST(TracingMessageBatch, FlushAddsSpanIdAndTraceIdAttribute) { auto span_catcher = InstallSpanCatcher(); auto message_span = MakeSpan("test message span"); auto mock = std::make_unique(); - EXPECT_CALL(*mock, Flush); + EXPECT_CALL(*mock, Flush).WillOnce([] { return make_ready_future(); }); auto initial_spans = {message_span}; auto message_batch = std::make_unique(std::move(mock), initial_spans); @@ -265,7 +253,6 @@ TEST(TracingMessageBatch, FlushAddsSpanIdAndTraceIdAttribute) { // The span must end before it can be processed by the span catcher. EndSpans(initial_spans); - EndSpans(message_batch->GetBatchSinkSpans()); EXPECT_THAT( span_catcher->GetSpans(), @@ -281,7 +268,7 @@ TEST(TracingMessageBatch, FlushSpanMetadataAddsEvent) { auto span_catcher = InstallSpanCatcher(); auto message_span = MakeSpan("test message span"); auto mock = std::make_unique(); - EXPECT_CALL(*mock, Flush); + EXPECT_CALL(*mock, Flush).WillOnce([] { return make_ready_future(); }); auto initial_spans = {message_span}; auto message_batch = std::make_unique(std::move(mock), initial_spans); @@ -303,7 +290,7 @@ TEST(TracingMessageBatch, FlushAddsEventForMultipleMessages) { auto span1 = MakeSpan("test message span1"); auto span2 = MakeSpan("test message span2"); auto mock = std::make_unique(); - EXPECT_CALL(*mock, Flush); + EXPECT_CALL(*mock, Flush).WillOnce([] { return make_ready_future(); }); auto initial_spans = {span1, span2}; auto message_batch = std::make_unique(std::move(mock), initial_spans); @@ -313,73 +300,21 @@ TEST(TracingMessageBatch, FlushAddsEventForMultipleMessages) { // The span must end before it can be processed by the span catcher. EndSpans(initial_spans); - EXPECT_THAT( - span_catcher->GetSpans(), - ElementsAre(AllOf(SpanNamed("test message span1"), - SpanHasEvents(EventNamed("gl-cpp.batch_flushed"))), - AllOf(SpanNamed("test message span2"), - SpanHasEvents(EventNamed("gl-cpp.batch_flushed"))))); -} - -TEST(TracingMessageBatch, FlushCallback) { - auto span_catcher = InstallSpanCatcher(); - auto span = MakeSpan("test batch sink span"); - opentelemetry::trace::Scope scope(span); - auto mock = std::make_unique(); - EXPECT_CALL(*mock, FlushCallback); - std::vector> - message_spans; - std::vector> - batch_sink_spans; - batch_sink_spans.emplace_back(span); - auto message_batch = std::make_unique( - std::move(mock), message_spans, batch_sink_spans); - - message_batch->FlushCallback(); - - // Verifies that the spans were ended. If the span was not ended, `GetSpans` - // would be empty. auto spans = span_catcher->GetSpans(); - EXPECT_THAT(spans, SizeIs(1)); - EXPECT_THAT( - spans, - Contains(AllOf(SpanNamed("test batch sink span"), - SpanHasInstrumentationScope(), SpanKindIsClient(), - SpanWithStatus(opentelemetry::trace::StatusCode::kOk)))); - EXPECT_THAT(message_batch->GetBatchSinkSpans(), IsEmpty()); + EXPECT_THAT(spans, Contains(AllOf( + SpanNamed("test message span1"), + SpanHasEvents(EventNamed("gl-cpp.batch_flushed"))))); + EXPECT_THAT(spans, Contains(AllOf( + SpanNamed("test message span2"), + SpanHasEvents(EventNamed("gl-cpp.batch_flushed"))))); } -TEST(TracingMessageBatch, FlushCallbackWithMultipleMessages) { - auto span_catcher = InstallSpanCatcher(); - auto span1 = MakeSpan("test batch sink span 1"); - auto span2 = MakeSpan("test batch sink span 2"); +TEST(TracingMessageBatch, FlushCallback) { auto mock = std::make_unique(); EXPECT_CALL(*mock, FlushCallback); - std::vector> - message_spans; - std::vector> - batch_sink_spans; - batch_sink_spans.reserve(2); - batch_sink_spans.emplace_back(span1); - batch_sink_spans.emplace_back(span2); - auto message_batch = std::make_unique( - std::move(mock), message_spans, batch_sink_spans); + auto message_batch = std::make_unique(std::move(mock)); message_batch->FlushCallback(); - - // Verifies that the spans were ended. If the span was not ended, `GetSpans` - // would be empty. - auto spans = span_catcher->GetSpans(); - EXPECT_THAT(spans, SizeIs(2)); - EXPECT_THAT( - spans, ElementsAre( - AllOf(SpanNamed("test batch sink span 1"), - SpanHasInstrumentationScope(), SpanKindIsClient(), - SpanWithStatus(opentelemetry::trace::StatusCode::kOk)), - AllOf(SpanNamed("test batch sink span 2"), - SpanHasInstrumentationScope(), SpanKindIsClient(), - SpanWithStatus(opentelemetry::trace::StatusCode::kOk)))); - EXPECT_THAT(message_batch->GetBatchSinkSpans(), IsEmpty()); } // TODO(#12528): Add an end to end test. diff --git a/google/cloud/pubsub/testing/mock_message_batch.h b/google/cloud/pubsub/testing/mock_message_batch.h index 22838cd51e168..a200ad37c2d81 100644 --- a/google/cloud/pubsub/testing/mock_message_batch.h +++ b/google/cloud/pubsub/testing/mock_message_batch.h @@ -17,6 +17,7 @@ #include "google/cloud/pubsub/internal/message_batch.h" #include "google/cloud/pubsub/message.h" +#include "google/cloud/future_void.h" #include "google/cloud/version.h" #include @@ -33,7 +34,7 @@ class MockMessageBatch : public pubsub_internal::MessageBatch { ~MockMessageBatch() override = default; MOCK_METHOD(void, SaveMessage, (pubsub::Message), (override)); - MOCK_METHOD(void, Flush, (), (override)); + MOCK_METHOD(future, Flush, (), (override)); MOCK_METHOD(void, FlushCallback, (), (override)); }; From c8d54cc63ef1748ac9c8331f56ca2d89ef266372 Mon Sep 17 00:00:00 2001 From: Anna Levenberg Date: Wed, 25 Oct 2023 09:28:10 -0400 Subject: [PATCH 17/22] address commnents' --- .../internal/batching_publisher_connection.cc | 7 +-- google/cloud/pubsub/internal/message_batch.h | 10 ++--- .../pubsub/internal/noop_message_batch.h | 23 ++-------- .../pubsub/internal/tracing_message_batch.cc | 15 +++---- .../pubsub/internal/tracing_message_batch.h | 7 ++- .../internal/tracing_message_batch_test.cc | 43 +++++++++---------- .../cloud/pubsub/testing/mock_message_batch.h | 3 +- 7 files changed, 39 insertions(+), 69 deletions(-) diff --git a/google/cloud/pubsub/internal/batching_publisher_connection.cc b/google/cloud/pubsub/internal/batching_publisher_connection.cc index 2d6ee84d23ea3..4a78d4f196eac 100644 --- a/google/cloud/pubsub/internal/batching_publisher_connection.cc +++ b/google/cloud/pubsub/internal/batching_publisher_connection.cc @@ -23,7 +23,6 @@ GOOGLE_CLOUD_CPP_INLINE_NAMESPACE_BEGIN struct Batch { std::vector>> waiters; std::weak_ptr weak; - std::shared_ptr batch; void operator()(future> f) { auto response = f.get(); @@ -42,7 +41,6 @@ struct Batch { for (auto& w : waiters) { w.set_value(std::move(*response->mutable_message_ids(idx++))); } - batch->FlushCallback(); } void SatisfyAllWaiters(Status const& status) { @@ -200,9 +198,8 @@ void BatchingPublisherConnection::FlushImpl(std::unique_lock lk) { batch.weak = shared_from_this(); request.set_topic(topic_full_name_); - batch_->Flush(); - batch.batch = batch_; - sink_->AsyncPublish(std::move(request)).then(std::move(batch)); + auto handler = batch_->Flush(); + sink_->AsyncPublish(std::move(request)).then(std::move(batch)).then(std::move(handler)); } GOOGLE_CLOUD_CPP_INLINE_NAMESPACE_END diff --git a/google/cloud/pubsub/internal/message_batch.h b/google/cloud/pubsub/internal/message_batch.h index 953cecb5d9ec9..bec781103b5c3 100644 --- a/google/cloud/pubsub/internal/message_batch.h +++ b/google/cloud/pubsub/internal/message_batch.h @@ -41,13 +41,9 @@ class MessageBatch { virtual void SaveMessage(pubsub::Message m) = 0; // Captures information about a batch of messages before it's flushed. Invoked - // in `BatchingPublisherConnection::FlushImpl(...)`. Returns a task to run - // after FlushCallback(). - virtual future Flush() = 0; - - // Captures information about the response after we receive it from - // the server. Invoked in the `BatchSink::AsyncPublish(...)` callback. - virtual void FlushCallback() = 0; + // in `BatchingPublisherConnection::FlushImpl(...)`. Returns a task to invoke + // in another callback. + virtual std::function)> Flush() = 0; }; GOOGLE_CLOUD_CPP_INLINE_NAMESPACE_END diff --git a/google/cloud/pubsub/internal/noop_message_batch.h b/google/cloud/pubsub/internal/noop_message_batch.h index 63181ec2279f1..9b55dbf67eb2c 100644 --- a/google/cloud/pubsub/internal/noop_message_batch.h +++ b/google/cloud/pubsub/internal/noop_message_batch.h @@ -16,7 +16,7 @@ #define GOOGLE_CLOUD_CPP_GOOGLE_CLOUD_PUBSUB_INTERNAL_NOOP_MESSAGE_BATCH_H #include "google/cloud/pubsub/internal/message_batch.h" -#include "google/cloud/future_void.h" +#include "google/cloud/future.h" namespace google { namespace cloud { @@ -34,26 +34,9 @@ class NoOpMessageBatch : public MessageBatch { void SaveMessage(pubsub::Message) override{}; - future Flush() override { - future result; - { - std::lock_guard lk(mu_); - waiter_.get_future(); - } - return result; - }; - - void FlushCallback() override { - promise waiter; - { - std::lock_guard lk(mu_); - waiter.swap(waiter_); - } - waiter.set_value(); + std::function)> Flush() override { + return [](auto){}; } - - std::mutex mu_; - promise waiter_; // ABSL_GUARDED_BY(mu_) }; GOOGLE_CLOUD_CPP_INLINE_NAMESPACE_END diff --git a/google/cloud/pubsub/internal/tracing_message_batch.cc b/google/cloud/pubsub/internal/tracing_message_batch.cc index ec070f4a952d2..024773609378d 100644 --- a/google/cloud/pubsub/internal/tracing_message_batch.cc +++ b/google/cloud/pubsub/internal/tracing_message_batch.cc @@ -17,7 +17,6 @@ #include "google/cloud/pubsub/internal/tracing_message_batch.h" #include "google/cloud/pubsub/internal/message_batch.h" #include "google/cloud/pubsub/version.h" -#include "google/cloud/future_void.h" #include "google/cloud/internal/opentelemetry.h" #include "opentelemetry/context/runtime_context.h" #include "opentelemetry/trace/context.h" @@ -127,7 +126,7 @@ Spans MakeBatchSinkSpans(Spans message_spans) { } // namespace -future TracingMessageBatch::Flush() { +std::function)> TracingMessageBatch::Flush() { decltype(message_spans_) message_spans; { std::lock_guard lk(message_mu_); @@ -141,15 +140,13 @@ future TracingMessageBatch::Flush() { auto async_scope = internal::GetTracer(internal::CurrentOptions()) ->WithActiveSpan(batch_sink_spans.front()); - return child_->Flush().then([spans = std::move(batch_sink_spans)](auto f) { - for (auto& span : spans) { - internal::EndSpan(*span); - } - }); + return [next = child_->Flush(), + spans = std::move(batch_sink_spans)](auto) mutable { + for (auto& span : spans) internal::EndSpan(*span); + next(make_ready_future()); + }; } -void TracingMessageBatch::FlushCallback() { child_->FlushCallback(); } - std::vector> TracingMessageBatch::GetMessageSpans() const { return message_spans_; diff --git a/google/cloud/pubsub/internal/tracing_message_batch.h b/google/cloud/pubsub/internal/tracing_message_batch.h index 858fe5173abed..8a9a40976938a 100644 --- a/google/cloud/pubsub/internal/tracing_message_batch.h +++ b/google/cloud/pubsub/internal/tracing_message_batch.h @@ -21,6 +21,7 @@ #include "google/cloud/pubsub/internal/publisher_stub.h" #include "google/cloud/pubsub/version.h" #include "google/cloud/internal/opentelemetry.h" +#include "google/cloud/future.h" #include namespace google { @@ -48,10 +49,8 @@ class TracingMessageBatch : public MessageBatch { ~TracingMessageBatch() override = default; void SaveMessage(pubsub::Message m) override; - - future Flush() override; - - void FlushCallback() override; + + std::function)> Flush() override; // For testing only. std::vector> diff --git a/google/cloud/pubsub/internal/tracing_message_batch_test.cc b/google/cloud/pubsub/internal/tracing_message_batch_test.cc index 67d8199ffcd02..d59c76c8c903f 100644 --- a/google/cloud/pubsub/internal/tracing_message_batch_test.cc +++ b/google/cloud/pubsub/internal/tracing_message_batch_test.cc @@ -129,13 +129,14 @@ TEST(TracingMessageBatch, Flush) { auto mock = std::make_unique(); EXPECT_CALL(*mock, Flush).WillOnce([] { EXPECT_TRUE(ThereIsAnActiveSpan()); - return make_ready_future(); + return [](auto) {}; }); auto initial_spans = {message_span}; auto message_batch = std::make_unique(std::move(mock), initial_spans); - message_batch->Flush(); + auto end_spans = message_batch->Flush(); + end_spans(make_ready_future()); EXPECT_THAT(message_batch->GetMessageSpans(), IsEmpty()); @@ -158,13 +159,14 @@ TEST(TracingMessageBatch, FlushSmallBatch) { auto mock = std::make_unique(); EXPECT_CALL(*mock, Flush).WillOnce([] { EXPECT_TRUE(ThereIsAnActiveSpan()); - return make_ready_future(); + return [](auto) {}; }); auto initial_spans = {message_span1, message_span2}; auto message_batch = std::make_unique(std::move(mock), initial_spans); - message_batch->Flush(); + auto end_spans = message_batch->Flush(); + end_spans(make_ready_future()); EXPECT_THAT(message_batch->GetMessageSpans(), IsEmpty()); @@ -191,12 +193,13 @@ TEST(TracingMessageBatch, FlushBatchWithOtelLimit) { auto mock = std::make_unique(); EXPECT_CALL(*mock, Flush).WillOnce([] { EXPECT_TRUE(ThereIsAnActiveSpan()); - return make_ready_future(); + return [](auto) {}; }); auto message_batch = std::make_unique(std::move(mock), initial_spans); - message_batch->Flush(); + auto end_spans = message_batch->Flush(); + end_spans(make_ready_future()); EXPECT_THAT(message_batch->GetMessageSpans(), IsEmpty()); @@ -216,12 +219,13 @@ TEST(TracingMessageBatch, FlushLargeBatch) { auto mock = std::make_unique(); EXPECT_CALL(*mock, Flush).WillOnce([] { EXPECT_TRUE(ThereIsAnActiveSpan()); - return make_ready_future(); + return [](auto) {}; }); auto message_batch = std::make_unique(std::move(mock), initial_spans); - message_batch->Flush(); + auto end_spans = message_batch->Flush(); + end_spans(make_ready_future()); EXPECT_THAT(message_batch->GetMessageSpans(), IsEmpty()); @@ -244,12 +248,13 @@ TEST(TracingMessageBatch, FlushAddsSpanIdAndTraceIdAttribute) { auto span_catcher = InstallSpanCatcher(); auto message_span = MakeSpan("test message span"); auto mock = std::make_unique(); - EXPECT_CALL(*mock, Flush).WillOnce([] { return make_ready_future(); }); + EXPECT_CALL(*mock, Flush).WillOnce([] { return [](auto) {}; }); auto initial_spans = {message_span}; auto message_batch = std::make_unique(std::move(mock), initial_spans); - message_batch->Flush(); + auto end_spans = message_batch->Flush(); + end_spans(make_ready_future()); // The span must end before it can be processed by the span catcher. EndSpans(initial_spans); @@ -268,12 +273,13 @@ TEST(TracingMessageBatch, FlushSpanMetadataAddsEvent) { auto span_catcher = InstallSpanCatcher(); auto message_span = MakeSpan("test message span"); auto mock = std::make_unique(); - EXPECT_CALL(*mock, Flush).WillOnce([] { return make_ready_future(); }); + EXPECT_CALL(*mock, Flush).WillOnce([] { return [](auto) {}; }); auto initial_spans = {message_span}; auto message_batch = std::make_unique(std::move(mock), initial_spans); - message_batch->Flush(); + auto end_spans = message_batch->Flush(); + end_spans(make_ready_future()); // The span must end before it can be processed by the span catcher. EndSpans(initial_spans); @@ -290,12 +296,13 @@ TEST(TracingMessageBatch, FlushAddsEventForMultipleMessages) { auto span1 = MakeSpan("test message span1"); auto span2 = MakeSpan("test message span2"); auto mock = std::make_unique(); - EXPECT_CALL(*mock, Flush).WillOnce([] { return make_ready_future(); }); + EXPECT_CALL(*mock, Flush).WillOnce([] { return [](auto) {}; }); auto initial_spans = {span1, span2}; auto message_batch = std::make_unique(std::move(mock), initial_spans); - message_batch->Flush(); + auto end_spans = message_batch->Flush(); + end_spans(make_ready_future()); // The span must end before it can be processed by the span catcher. EndSpans(initial_spans); @@ -309,14 +316,6 @@ TEST(TracingMessageBatch, FlushAddsEventForMultipleMessages) { SpanHasEvents(EventNamed("gl-cpp.batch_flushed"))))); } -TEST(TracingMessageBatch, FlushCallback) { - auto mock = std::make_unique(); - EXPECT_CALL(*mock, FlushCallback); - auto message_batch = std::make_unique(std::move(mock)); - - message_batch->FlushCallback(); -} - // TODO(#12528): Add an end to end test. } // namespace diff --git a/google/cloud/pubsub/testing/mock_message_batch.h b/google/cloud/pubsub/testing/mock_message_batch.h index a200ad37c2d81..f96582d7d0465 100644 --- a/google/cloud/pubsub/testing/mock_message_batch.h +++ b/google/cloud/pubsub/testing/mock_message_batch.h @@ -34,8 +34,7 @@ class MockMessageBatch : public pubsub_internal::MessageBatch { ~MockMessageBatch() override = default; MOCK_METHOD(void, SaveMessage, (pubsub::Message), (override)); - MOCK_METHOD(future, Flush, (), (override)); - MOCK_METHOD(void, FlushCallback, (), (override)); + MOCK_METHOD(std::function)>, Flush, (), (override)); }; GOOGLE_CLOUD_CPP_INLINE_NAMESPACE_END From 50f0ba736ab1531b4076896f01d4db57ede5efc9 Mon Sep 17 00:00:00 2001 From: Anna Levenberg Date: Wed, 25 Oct 2023 09:30:27 -0400 Subject: [PATCH 18/22] checkers --- .../cloud/pubsub/internal/batching_publisher_connection.cc | 4 +++- google/cloud/pubsub/internal/noop_message_batch.h | 4 ++-- google/cloud/pubsub/internal/tracing_message_batch.h | 6 +++--- 3 files changed, 8 insertions(+), 6 deletions(-) diff --git a/google/cloud/pubsub/internal/batching_publisher_connection.cc b/google/cloud/pubsub/internal/batching_publisher_connection.cc index 4a78d4f196eac..63b9ee0cbce04 100644 --- a/google/cloud/pubsub/internal/batching_publisher_connection.cc +++ b/google/cloud/pubsub/internal/batching_publisher_connection.cc @@ -199,7 +199,9 @@ void BatchingPublisherConnection::FlushImpl(std::unique_lock lk) { request.set_topic(topic_full_name_); auto handler = batch_->Flush(); - sink_->AsyncPublish(std::move(request)).then(std::move(batch)).then(std::move(handler)); + sink_->AsyncPublish(std::move(request)) + .then(std::move(batch)) + .then(std::move(handler)); } GOOGLE_CLOUD_CPP_INLINE_NAMESPACE_END diff --git a/google/cloud/pubsub/internal/noop_message_batch.h b/google/cloud/pubsub/internal/noop_message_batch.h index 9b55dbf67eb2c..52ed7c48c8d7b 100644 --- a/google/cloud/pubsub/internal/noop_message_batch.h +++ b/google/cloud/pubsub/internal/noop_message_batch.h @@ -34,8 +34,8 @@ class NoOpMessageBatch : public MessageBatch { void SaveMessage(pubsub::Message) override{}; - std::function)> Flush() override { - return [](auto){}; + std::function)> Flush() override { + return [](auto) {}; } }; diff --git a/google/cloud/pubsub/internal/tracing_message_batch.h b/google/cloud/pubsub/internal/tracing_message_batch.h index 8a9a40976938a..bf35e12119b3e 100644 --- a/google/cloud/pubsub/internal/tracing_message_batch.h +++ b/google/cloud/pubsub/internal/tracing_message_batch.h @@ -20,8 +20,8 @@ #include "google/cloud/pubsub/internal/message_batch.h" #include "google/cloud/pubsub/internal/publisher_stub.h" #include "google/cloud/pubsub/version.h" -#include "google/cloud/internal/opentelemetry.h" #include "google/cloud/future.h" +#include "google/cloud/internal/opentelemetry.h" #include namespace google { @@ -49,8 +49,8 @@ class TracingMessageBatch : public MessageBatch { ~TracingMessageBatch() override = default; void SaveMessage(pubsub::Message m) override; - - std::function)> Flush() override; + + std::function)> Flush() override; // For testing only. std::vector> From ae4be5890a5e460b84f9dbbf61a455a84c5b4809 Mon Sep 17 00:00:00 2001 From: Anna Levenberg Date: Wed, 25 Oct 2023 09:49:38 -0400 Subject: [PATCH 19/22] nits --- google/cloud/pubsub/internal/message_batch.h | 3 ++- google/cloud/pubsub/internal/noop_message_batch.h | 2 +- google/cloud/pubsub/internal/tracing_message_batch.cc | 8 ++++---- google/cloud/pubsub/internal/tracing_message_batch.h | 5 +++-- .../cloud/pubsub/internal/tracing_message_batch_test.cc | 2 +- google/cloud/pubsub/testing/mock_message_batch.h | 3 ++- 6 files changed, 13 insertions(+), 10 deletions(-) diff --git a/google/cloud/pubsub/internal/message_batch.h b/google/cloud/pubsub/internal/message_batch.h index bec781103b5c3..3ce7531f337e0 100644 --- a/google/cloud/pubsub/internal/message_batch.h +++ b/google/cloud/pubsub/internal/message_batch.h @@ -16,8 +16,9 @@ #define GOOGLE_CLOUD_CPP_GOOGLE_CLOUD_PUBSUB_INTERNAL_MESSAGE_BATCH_H #include "google/cloud/pubsub/message.h" -#include "google/cloud/future_void.h" +#include "google/cloud/future.h" #include "google/cloud/options.h" +#include namespace google { namespace cloud { diff --git a/google/cloud/pubsub/internal/noop_message_batch.h b/google/cloud/pubsub/internal/noop_message_batch.h index 52ed7c48c8d7b..424544ef54431 100644 --- a/google/cloud/pubsub/internal/noop_message_batch.h +++ b/google/cloud/pubsub/internal/noop_message_batch.h @@ -17,7 +17,7 @@ #include "google/cloud/pubsub/internal/message_batch.h" #include "google/cloud/future.h" - +#include namespace google { namespace cloud { namespace pubsub_internal { diff --git a/google/cloud/pubsub/internal/tracing_message_batch.cc b/google/cloud/pubsub/internal/tracing_message_batch.cc index 024773609378d..e27ebba83c84e 100644 --- a/google/cloud/pubsub/internal/tracing_message_batch.cc +++ b/google/cloud/pubsub/internal/tracing_message_batch.cc @@ -35,7 +35,7 @@ void TracingMessageBatch::SaveMessage(pubsub::Message m) { opentelemetry::context::RuntimeContext::GetCurrent()); active_span->AddEvent("gl-cpp.added_to_batch"); { - std::lock_guard lk(message_mu_); + std::lock_guard lk(mu_); message_spans_.push_back(std::move(active_span)); } child_->SaveMessage(std::move(m)); @@ -129,7 +129,7 @@ Spans MakeBatchSinkSpans(Spans message_spans) { std::function)> TracingMessageBatch::Flush() { decltype(message_spans_) message_spans; { - std::lock_guard lk(message_mu_); + std::lock_guard lk(mu_); message_spans.swap(message_spans_); } @@ -141,9 +141,9 @@ std::function)> TracingMessageBatch::Flush() { ->WithActiveSpan(batch_sink_spans.front()); return [next = child_->Flush(), - spans = std::move(batch_sink_spans)](auto) mutable { + spans = std::move(batch_sink_spans)](auto f) mutable { for (auto& span : spans) internal::EndSpan(*span); - next(make_ready_future()); + next(std::move(f)); }; } diff --git a/google/cloud/pubsub/internal/tracing_message_batch.h b/google/cloud/pubsub/internal/tracing_message_batch.h index bf35e12119b3e..9a5fdeb0f2d55 100644 --- a/google/cloud/pubsub/internal/tracing_message_batch.h +++ b/google/cloud/pubsub/internal/tracing_message_batch.h @@ -22,6 +22,7 @@ #include "google/cloud/pubsub/version.h" #include "google/cloud/future.h" #include "google/cloud/internal/opentelemetry.h" +#include #include namespace google { @@ -58,9 +59,9 @@ class TracingMessageBatch : public MessageBatch { private: std::unique_ptr child_; - std::mutex message_mu_; + std::mutex mu_; std::vector> - message_spans_; // ABSL_GUARDED_BY(message_mu_) + message_spans_; // ABSL_GUARDED_BY(mu_) }; GOOGLE_CLOUD_CPP_INLINE_NAMESPACE_END diff --git a/google/cloud/pubsub/internal/tracing_message_batch_test.cc b/google/cloud/pubsub/internal/tracing_message_batch_test.cc index d59c76c8c903f..f4f29af8eca06 100644 --- a/google/cloud/pubsub/internal/tracing_message_batch_test.cc +++ b/google/cloud/pubsub/internal/tracing_message_batch_test.cc @@ -86,7 +86,7 @@ TEST(TracingMessageBatch, SaveMultipleMessages) { auto mock = std::make_unique(); EXPECT_CALL(*mock, SaveMessage).Times(2); auto message_batch = std::make_unique(std::move(mock)); - auto message = pubsub ::MessageBuilder().SetData("test").Build(); + auto message = pubsub::MessageBuilder().SetData("test").Build(); // Save the first span. auto span1 = MakeSpan("test span"); diff --git a/google/cloud/pubsub/testing/mock_message_batch.h b/google/cloud/pubsub/testing/mock_message_batch.h index f96582d7d0465..eb55ff36d4cd5 100644 --- a/google/cloud/pubsub/testing/mock_message_batch.h +++ b/google/cloud/pubsub/testing/mock_message_batch.h @@ -17,9 +17,10 @@ #include "google/cloud/pubsub/internal/message_batch.h" #include "google/cloud/pubsub/message.h" -#include "google/cloud/future_void.h" +#include "google/cloud/future.h" #include "google/cloud/version.h" #include +#include namespace google { namespace cloud { From 7d7032497206a4caf0e7cfa2df1c323ca291edcb Mon Sep 17 00:00:00 2001 From: Anna Levenberg Date: Wed, 25 Oct 2023 10:03:25 -0400 Subject: [PATCH 20/22] clang --- google/cloud/pubsub/internal/tracing_message_batch_test.cc | 2 -- 1 file changed, 2 deletions(-) diff --git a/google/cloud/pubsub/internal/tracing_message_batch_test.cc b/google/cloud/pubsub/internal/tracing_message_batch_test.cc index f4f29af8eca06..3aa6fe422b06a 100644 --- a/google/cloud/pubsub/internal/tracing_message_batch_test.cc +++ b/google/cloud/pubsub/internal/tracing_message_batch_test.cc @@ -39,14 +39,12 @@ using ::google::cloud::testing_util::SpanKindIsClient; using ::google::cloud::testing_util::SpanLinkAttributesAre; using ::google::cloud::testing_util::SpanLinksSizeIs; using ::google::cloud::testing_util::SpanNamed; -using ::google::cloud::testing_util::SpanWithStatus; using ::google::cloud::testing_util::ThereIsAnActiveSpan; using ::testing::_; using ::testing::AllOf; using ::testing::Contains; using ::testing::ElementsAre; using ::testing::IsEmpty; -using ::testing::SizeIs; namespace { From a7c333be3fa2cb87e84b89ee55d6e24cb3522082 Mon Sep 17 00:00:00 2001 From: Anna Levenberg Date: Wed, 25 Oct 2023 11:38:13 -0400 Subject: [PATCH 21/22] add mocks --- .../batching_publisher_connection_test.cc | 26 +++++++++++++++++++ 1 file changed, 26 insertions(+) diff --git a/google/cloud/pubsub/internal/batching_publisher_connection_test.cc b/google/cloud/pubsub/internal/batching_publisher_connection_test.cc index 9df00eb5fbb0f..81efdc701db05 100644 --- a/google/cloud/pubsub/internal/batching_publisher_connection_test.cc +++ b/google/cloud/pubsub/internal/batching_publisher_connection_test.cc @@ -102,6 +102,9 @@ TEST(BatchingPublisherConnectionTest, DefaultMakesProgress) { auto mock = std::make_shared(); auto mock_batch = std::make_shared>(); + EXPECT_CALL(*mock_batch, Flush).Times(1).WillRepeatedly([] { + return [](auto) {}; + }); pubsub::Topic const topic("test-project", "test-topic"); AsyncSequencer async; @@ -171,6 +174,9 @@ TEST(BatchingPublisherConnectionTest, BatchByMessageCount) { auto mock = std::make_shared(); auto mock_batch = std::make_shared>(); + EXPECT_CALL(*mock_batch, Flush).Times(1).WillRepeatedly([] { + return [](auto) {}; + }); pubsub::Topic const topic("test-project", "test-topic"); EXPECT_CALL(*mock, AsyncPublish) @@ -275,6 +281,9 @@ TEST(BatchingPublisherConnectionTest, BatchByMessageSize) { TEST(BatchingPublisherConnectionTest, BatchByMessageSizeLargeMessageBreak) { auto mock_batch = std::make_shared>(); + EXPECT_CALL(*mock_batch, Flush).Times(1).WillRepeatedly([] { + return [](auto) {}; + }); pubsub::Topic const topic("test-project", "test-topic"); auto constexpr kSinglePayload = 128; @@ -345,6 +354,7 @@ TEST(BatchingPublisherConnectionTest, BatchByMessageSizeLargeMessageBreak) { TEST(BatchingPublisherConnectionTest, BatchByMessageSizeOversizedSingleton) { auto mock_batch = std::make_shared>(); + EXPECT_CALL(*mock_batch, Flush).Times(1).WillOnce([] { return [](auto) {}; }); pubsub::Topic const topic("test-project", "test-topic"); auto constexpr kSinglePayload = 128; @@ -429,6 +439,7 @@ TEST(BatchingPublisherConnectionTest, BatchByMessageSizeOversizedSingleton) { TEST(BatchingPublisherConnectionTest, BatchTorture) { auto mock_batch = std::make_shared>(); + EXPECT_CALL(*mock_batch, Flush).Times(1).WillOnce([] { return [](auto) {}; }); pubsub::Topic const topic("test-project", "test-topic"); auto constexpr kMaxMessages = 20; @@ -551,6 +562,9 @@ TEST(BatchingPublisherConnectionTest, BatchByFlush) { auto mock = std::make_shared(); auto mock_batch = std::make_shared>(); + EXPECT_CALL(*mock_batch, Flush).Times(1).WillRepeatedly([] { + return [](auto) {}; + }); pubsub::Topic const topic("test-project", "test-topic"); EXPECT_CALL(*mock, AsyncPublish) @@ -630,6 +644,9 @@ TEST(BatchingPublisherConnectionTest, HandleError) { auto mock = std::make_shared(); auto mock_batch = std::make_shared>(); + EXPECT_CALL(*mock_batch, Flush).Times(1).WillRepeatedly([] { + return [](auto) {}; + }); pubsub::Topic const topic("test-project", "test-topic"); auto const error_status = Status(StatusCode::kPermissionDenied, "uh-oh"); @@ -660,6 +677,9 @@ TEST(BatchingPublisherConnectionTest, HandleInvalidResponse) { auto mock = std::make_shared(); auto mock_batch = std::make_shared>(); + EXPECT_CALL(*mock_batch, Flush).Times(1).WillRepeatedly([] { + return [](auto) {}; + }); pubsub::Topic const topic("test-project", "test-topic"); EXPECT_CALL(*mock, AsyncPublish) @@ -688,6 +708,9 @@ TEST(BatchingPublisherConnectionTest, HandleErrorWithOrderingPartialBatch) { auto mock = std::make_shared(); auto mock_batch = std::make_shared>(); + EXPECT_CALL(*mock_batch, Flush).Times(1).WillRepeatedly([] { + return [](auto) {}; + }); pubsub::Topic const topic("test-project", "test-topic"); auto const error_status = Status(StatusCode::kPermissionDenied, "uh-oh"); @@ -738,6 +761,9 @@ TEST(BatchingPublisherConnectionTest, HandleErrorWithOrderingResume) { auto mock = std::make_shared(); auto mock_batch = std::make_shared>(); + EXPECT_CALL(*mock_batch, Flush).Times(1).WillRepeatedly([] { + return [](auto) {}; + }); pubsub::Topic const topic("test-project", "test-topic"); auto const ordering_key = std::string{"test-key"}; From 1fde8d7e0674cc2ebc37277ec7cb4ef7ea172cad Mon Sep 17 00:00:00 2001 From: Anna Levenberg Date: Wed, 25 Oct 2023 12:57:48 -0400 Subject: [PATCH 22/22] fix --- .../batching_publisher_connection_test.cc | 39 +++++++------------ 1 file changed, 13 insertions(+), 26 deletions(-) diff --git a/google/cloud/pubsub/internal/batching_publisher_connection_test.cc b/google/cloud/pubsub/internal/batching_publisher_connection_test.cc index 81efdc701db05..eee5ec587da83 100644 --- a/google/cloud/pubsub/internal/batching_publisher_connection_test.cc +++ b/google/cloud/pubsub/internal/batching_publisher_connection_test.cc @@ -66,6 +66,7 @@ TEST(BatchingPublisherConnectionTest, FastDestructor) { auto mock = std::make_shared(); auto mock_batch = std::make_shared>(); + EXPECT_CALL(*mock_batch, Flush).WillRepeatedly([] { return [](auto) {}; }); pubsub::Topic const topic("test-project", "test-topic"); AsyncSequencer async; @@ -102,9 +103,7 @@ TEST(BatchingPublisherConnectionTest, DefaultMakesProgress) { auto mock = std::make_shared(); auto mock_batch = std::make_shared>(); - EXPECT_CALL(*mock_batch, Flush).Times(1).WillRepeatedly([] { - return [](auto) {}; - }); + EXPECT_CALL(*mock_batch, Flush).WillRepeatedly([] { return [](auto) {}; }); pubsub::Topic const topic("test-project", "test-topic"); AsyncSequencer async; @@ -174,9 +173,7 @@ TEST(BatchingPublisherConnectionTest, BatchByMessageCount) { auto mock = std::make_shared(); auto mock_batch = std::make_shared>(); - EXPECT_CALL(*mock_batch, Flush).Times(1).WillRepeatedly([] { - return [](auto) {}; - }); + EXPECT_CALL(*mock_batch, Flush).WillRepeatedly([] { return [](auto) {}; }); pubsub::Topic const topic("test-project", "test-topic"); EXPECT_CALL(*mock, AsyncPublish) @@ -231,6 +228,7 @@ TEST(BatchingPublisherConnectionTest, BatchByMessageSize) { auto mock = std::make_shared(); auto mock_batch = std::make_shared>(); + EXPECT_CALL(*mock_batch, Flush).WillRepeatedly([] { return [](auto) {}; }); pubsub::Topic const topic("test-project", "test-topic"); EXPECT_CALL(*mock, AsyncPublish) @@ -281,9 +279,7 @@ TEST(BatchingPublisherConnectionTest, BatchByMessageSize) { TEST(BatchingPublisherConnectionTest, BatchByMessageSizeLargeMessageBreak) { auto mock_batch = std::make_shared>(); - EXPECT_CALL(*mock_batch, Flush).Times(1).WillRepeatedly([] { - return [](auto) {}; - }); + EXPECT_CALL(*mock_batch, Flush).WillRepeatedly([] { return [](auto) {}; }); pubsub::Topic const topic("test-project", "test-topic"); auto constexpr kSinglePayload = 128; @@ -354,7 +350,7 @@ TEST(BatchingPublisherConnectionTest, BatchByMessageSizeLargeMessageBreak) { TEST(BatchingPublisherConnectionTest, BatchByMessageSizeOversizedSingleton) { auto mock_batch = std::make_shared>(); - EXPECT_CALL(*mock_batch, Flush).Times(1).WillOnce([] { return [](auto) {}; }); + EXPECT_CALL(*mock_batch, Flush).WillRepeatedly([] { return [](auto) {}; }); pubsub::Topic const topic("test-project", "test-topic"); auto constexpr kSinglePayload = 128; @@ -439,7 +435,7 @@ TEST(BatchingPublisherConnectionTest, BatchByMessageSizeOversizedSingleton) { TEST(BatchingPublisherConnectionTest, BatchTorture) { auto mock_batch = std::make_shared>(); - EXPECT_CALL(*mock_batch, Flush).Times(1).WillOnce([] { return [](auto) {}; }); + EXPECT_CALL(*mock_batch, Flush).WillRepeatedly([] { return [](auto) {}; }); pubsub::Topic const topic("test-project", "test-topic"); auto constexpr kMaxMessages = 20; @@ -506,6 +502,7 @@ TEST(BatchingPublisherConnectionTest, BatchByMaximumHoldTime) { auto mock = std::make_shared(); auto mock_batch = std::make_shared>(); + EXPECT_CALL(*mock_batch, Flush).WillRepeatedly([] { return [](auto) {}; }); pubsub::Topic const topic("test-project", "test-topic"); EXPECT_CALL(*mock, AsyncPublish) @@ -562,9 +559,7 @@ TEST(BatchingPublisherConnectionTest, BatchByFlush) { auto mock = std::make_shared(); auto mock_batch = std::make_shared>(); - EXPECT_CALL(*mock_batch, Flush).Times(1).WillRepeatedly([] { - return [](auto) {}; - }); + EXPECT_CALL(*mock_batch, Flush).WillRepeatedly([] { return [](auto) {}; }); pubsub::Topic const topic("test-project", "test-topic"); EXPECT_CALL(*mock, AsyncPublish) @@ -644,9 +639,7 @@ TEST(BatchingPublisherConnectionTest, HandleError) { auto mock = std::make_shared(); auto mock_batch = std::make_shared>(); - EXPECT_CALL(*mock_batch, Flush).Times(1).WillRepeatedly([] { - return [](auto) {}; - }); + EXPECT_CALL(*mock_batch, Flush).WillRepeatedly([] { return [](auto) {}; }); pubsub::Topic const topic("test-project", "test-topic"); auto const error_status = Status(StatusCode::kPermissionDenied, "uh-oh"); @@ -677,9 +670,7 @@ TEST(BatchingPublisherConnectionTest, HandleInvalidResponse) { auto mock = std::make_shared(); auto mock_batch = std::make_shared>(); - EXPECT_CALL(*mock_batch, Flush).Times(1).WillRepeatedly([] { - return [](auto) {}; - }); + EXPECT_CALL(*mock_batch, Flush).WillRepeatedly([] { return [](auto) {}; }); pubsub::Topic const topic("test-project", "test-topic"); EXPECT_CALL(*mock, AsyncPublish) @@ -708,9 +699,7 @@ TEST(BatchingPublisherConnectionTest, HandleErrorWithOrderingPartialBatch) { auto mock = std::make_shared(); auto mock_batch = std::make_shared>(); - EXPECT_CALL(*mock_batch, Flush).Times(1).WillRepeatedly([] { - return [](auto) {}; - }); + EXPECT_CALL(*mock_batch, Flush).WillRepeatedly([] { return [](auto) {}; }); pubsub::Topic const topic("test-project", "test-topic"); auto const error_status = Status(StatusCode::kPermissionDenied, "uh-oh"); @@ -761,9 +750,7 @@ TEST(BatchingPublisherConnectionTest, HandleErrorWithOrderingResume) { auto mock = std::make_shared(); auto mock_batch = std::make_shared>(); - EXPECT_CALL(*mock_batch, Flush).Times(1).WillRepeatedly([] { - return [](auto) {}; - }); + EXPECT_CALL(*mock_batch, Flush).WillRepeatedly([] { return [](auto) {}; }); pubsub::Topic const topic("test-project", "test-topic"); auto const ordering_key = std::string{"test-key"};