Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

impl(pubsub): add more than 128 links #12928

Merged
merged 22 commits into from
Oct 25, 2023
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@ GOOGLE_CLOUD_CPP_INLINE_NAMESPACE_BEGIN
struct Batch {
std::vector<promise<StatusOr<std::string>>> waiters;
std::weak_ptr<BatchingPublisherConnection> weak;
std::shared_ptr<MessageBatch> batch;

void operator()(future<StatusOr<google::pubsub::v1::PublishResponse>> f) {
auto response = f.get();
Expand All @@ -42,7 +41,6 @@ struct Batch {
for (auto& w : waiters) {
w.set_value(std::move(*response->mutable_message_ids(idx++)));
}
batch->FlushCallback();
}

void SatisfyAllWaiters(Status const& status) {
Expand Down Expand Up @@ -200,9 +198,8 @@ void BatchingPublisherConnection::FlushImpl(std::unique_lock<std::mutex> lk) {
batch.weak = shared_from_this();
request.set_topic(topic_full_name_);

batch_->Flush();
batch.batch = batch_;
sink_->AsyncPublish(std::move(request)).then(std::move(batch));
auto handler = batch_->Flush();
sink_->AsyncPublish(std::move(request)).then(std::move(batch)).then(std::move(handler));
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This line makes all your suffering through this long review worth it 😂

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, it is much cleaner than how it started. I hope it makes your suffering through the review worth it as well 🥳

}

GOOGLE_CLOUD_CPP_INLINE_NAMESPACE_END
Expand Down
10 changes: 3 additions & 7 deletions google/cloud/pubsub/internal/message_batch.h
Original file line number Diff line number Diff line change
Expand Up @@ -41,13 +41,9 @@ class MessageBatch {
virtual void SaveMessage(pubsub::Message m) = 0;

// Captures information about a batch of messages before it's flushed. Invoked
// in `BatchingPublisherConnection::FlushImpl(...)`. Returns a task to run
// after FlushCallback().
virtual future<void> Flush() = 0;

// Captures information about the response after we receive it from
// the server. Invoked in the `BatchSink::AsyncPublish(...)` callback.
virtual void FlushCallback() = 0;
// in `BatchingPublisherConnection::FlushImpl(...)`. Returns a task to invoke
// in another callback.
virtual std::function<void(future<void>)> Flush() = 0;
};

GOOGLE_CLOUD_CPP_INLINE_NAMESPACE_END
Expand Down
23 changes: 3 additions & 20 deletions google/cloud/pubsub/internal/noop_message_batch.h
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@
#define GOOGLE_CLOUD_CPP_GOOGLE_CLOUD_PUBSUB_INTERNAL_NOOP_MESSAGE_BATCH_H

#include "google/cloud/pubsub/internal/message_batch.h"
#include "google/cloud/future_void.h"
#include "google/cloud/future.h"

namespace google {
namespace cloud {
Expand All @@ -34,26 +34,9 @@ class NoOpMessageBatch : public MessageBatch {

void SaveMessage(pubsub::Message) override{};

future<void> Flush() override {
future<void> result;
{
std::lock_guard<std::mutex> lk(mu_);
waiter_.get_future();
}
return result;
};

void FlushCallback() override {
promise<void> waiter;
{
std::lock_guard<std::mutex> lk(mu_);
waiter.swap(waiter_);
}
waiter.set_value();
std::function<void(future<void>)> Flush() override {
return [](auto){};
}

std::mutex mu_;
promise<void> waiter_; // ABSL_GUARDED_BY(mu_)
};

GOOGLE_CLOUD_CPP_INLINE_NAMESPACE_END
Expand Down
15 changes: 6 additions & 9 deletions google/cloud/pubsub/internal/tracing_message_batch.cc
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@
#include "google/cloud/pubsub/internal/tracing_message_batch.h"
#include "google/cloud/pubsub/internal/message_batch.h"
#include "google/cloud/pubsub/version.h"
#include "google/cloud/future_void.h"
#include "google/cloud/internal/opentelemetry.h"
#include "opentelemetry/context/runtime_context.h"
#include "opentelemetry/trace/context.h"
Expand Down Expand Up @@ -127,7 +126,7 @@ Spans MakeBatchSinkSpans(Spans message_spans) {

} // namespace

future<void> TracingMessageBatch::Flush() {
std::function<void(future<void>)> TracingMessageBatch::Flush() {
decltype(message_spans_) message_spans;
{
std::lock_guard<std::mutex> lk(message_mu_);
Expand All @@ -141,15 +140,13 @@ future<void> TracingMessageBatch::Flush() {
auto async_scope = internal::GetTracer(internal::CurrentOptions())
->WithActiveSpan(batch_sink_spans.front());

return child_->Flush().then([spans = std::move(batch_sink_spans)](auto f) {
for (auto& span : spans) {
internal::EndSpan(*span);
}
});
return [next = child_->Flush(),
spans = std::move(batch_sink_spans)](auto) mutable {
for (auto& span : spans) internal::EndSpan(*span);
next(make_ready_future());
alevenberg marked this conversation as resolved.
Show resolved Hide resolved
};
}

void TracingMessageBatch::FlushCallback() { child_->FlushCallback(); }

std::vector<opentelemetry::nostd::shared_ptr<opentelemetry::trace::Span>>
TracingMessageBatch::GetMessageSpans() const {
return message_spans_;
Expand Down
7 changes: 3 additions & 4 deletions google/cloud/pubsub/internal/tracing_message_batch.h
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
#include "google/cloud/pubsub/internal/publisher_stub.h"
#include "google/cloud/pubsub/version.h"
#include "google/cloud/internal/opentelemetry.h"
#include "google/cloud/future.h"
#include <memory>

namespace google {
Expand Down Expand Up @@ -48,10 +49,8 @@ class TracingMessageBatch : public MessageBatch {
~TracingMessageBatch() override = default;

void SaveMessage(pubsub::Message m) override;

future<void> Flush() override;

void FlushCallback() override;

std::function<void(future<void>)> Flush() override;

// For testing only.
std::vector<opentelemetry::nostd::shared_ptr<opentelemetry::trace::Span>>
Expand Down
43 changes: 21 additions & 22 deletions google/cloud/pubsub/internal/tracing_message_batch_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -129,13 +129,14 @@ TEST(TracingMessageBatch, Flush) {
auto mock = std::make_unique<pubsub_testing::MockMessageBatch>();
EXPECT_CALL(*mock, Flush).WillOnce([] {
EXPECT_TRUE(ThereIsAnActiveSpan());
return make_ready_future();
return [](auto) {};
});
auto initial_spans = {message_span};
auto message_batch =
std::make_unique<TracingMessageBatch>(std::move(mock), initial_spans);

message_batch->Flush();
auto end_spans = message_batch->Flush();
end_spans(make_ready_future());

EXPECT_THAT(message_batch->GetMessageSpans(), IsEmpty());

Expand All @@ -158,13 +159,14 @@ TEST(TracingMessageBatch, FlushSmallBatch) {
auto mock = std::make_unique<pubsub_testing::MockMessageBatch>();
EXPECT_CALL(*mock, Flush).WillOnce([] {
EXPECT_TRUE(ThereIsAnActiveSpan());
return make_ready_future();
return [](auto) {};
});
auto initial_spans = {message_span1, message_span2};
auto message_batch =
std::make_unique<TracingMessageBatch>(std::move(mock), initial_spans);

message_batch->Flush();
auto end_spans = message_batch->Flush();
end_spans(make_ready_future());

EXPECT_THAT(message_batch->GetMessageSpans(), IsEmpty());

Expand All @@ -191,12 +193,13 @@ TEST(TracingMessageBatch, FlushBatchWithOtelLimit) {
auto mock = std::make_unique<pubsub_testing::MockMessageBatch>();
EXPECT_CALL(*mock, Flush).WillOnce([] {
EXPECT_TRUE(ThereIsAnActiveSpan());
return make_ready_future();
return [](auto) {};
});
auto message_batch =
std::make_unique<TracingMessageBatch>(std::move(mock), initial_spans);

message_batch->Flush();
auto end_spans = message_batch->Flush();
end_spans(make_ready_future());

EXPECT_THAT(message_batch->GetMessageSpans(), IsEmpty());

Expand All @@ -216,12 +219,13 @@ TEST(TracingMessageBatch, FlushLargeBatch) {
auto mock = std::make_unique<pubsub_testing::MockMessageBatch>();
EXPECT_CALL(*mock, Flush).WillOnce([] {
EXPECT_TRUE(ThereIsAnActiveSpan());
return make_ready_future();
return [](auto) {};
});
auto message_batch =
std::make_unique<TracingMessageBatch>(std::move(mock), initial_spans);

message_batch->Flush();
auto end_spans = message_batch->Flush();
end_spans(make_ready_future());

EXPECT_THAT(message_batch->GetMessageSpans(), IsEmpty());

Expand All @@ -244,12 +248,13 @@ TEST(TracingMessageBatch, FlushAddsSpanIdAndTraceIdAttribute) {
auto span_catcher = InstallSpanCatcher();
auto message_span = MakeSpan("test message span");
auto mock = std::make_unique<pubsub_testing::MockMessageBatch>();
EXPECT_CALL(*mock, Flush).WillOnce([] { return make_ready_future(); });
EXPECT_CALL(*mock, Flush).WillOnce([] { return [](auto) {}; });
auto initial_spans = {message_span};
auto message_batch =
std::make_unique<TracingMessageBatch>(std::move(mock), initial_spans);

message_batch->Flush();
auto end_spans = message_batch->Flush();
end_spans(make_ready_future());

// The span must end before it can be processed by the span catcher.
EndSpans(initial_spans);
Expand All @@ -268,12 +273,13 @@ TEST(TracingMessageBatch, FlushSpanMetadataAddsEvent) {
auto span_catcher = InstallSpanCatcher();
auto message_span = MakeSpan("test message span");
auto mock = std::make_unique<pubsub_testing::MockMessageBatch>();
EXPECT_CALL(*mock, Flush).WillOnce([] { return make_ready_future(); });
EXPECT_CALL(*mock, Flush).WillOnce([] { return [](auto) {}; });
auto initial_spans = {message_span};
auto message_batch =
std::make_unique<TracingMessageBatch>(std::move(mock), initial_spans);

message_batch->Flush();
auto end_spans = message_batch->Flush();
end_spans(make_ready_future());

// The span must end before it can be processed by the span catcher.
EndSpans(initial_spans);
Expand All @@ -290,12 +296,13 @@ TEST(TracingMessageBatch, FlushAddsEventForMultipleMessages) {
auto span1 = MakeSpan("test message span1");
auto span2 = MakeSpan("test message span2");
auto mock = std::make_unique<pubsub_testing::MockMessageBatch>();
EXPECT_CALL(*mock, Flush).WillOnce([] { return make_ready_future(); });
EXPECT_CALL(*mock, Flush).WillOnce([] { return [](auto) {}; });
auto initial_spans = {span1, span2};
auto message_batch =
std::make_unique<TracingMessageBatch>(std::move(mock), initial_spans);

message_batch->Flush();
auto end_spans = message_batch->Flush();
end_spans(make_ready_future());

// The span must end before it can be processed by the span catcher.
EndSpans(initial_spans);
Expand All @@ -309,14 +316,6 @@ TEST(TracingMessageBatch, FlushAddsEventForMultipleMessages) {
SpanHasEvents(EventNamed("gl-cpp.batch_flushed")))));
}

TEST(TracingMessageBatch, FlushCallback) {
auto mock = std::make_unique<pubsub_testing::MockMessageBatch>();
EXPECT_CALL(*mock, FlushCallback);
auto message_batch = std::make_unique<TracingMessageBatch>(std::move(mock));

message_batch->FlushCallback();
}

// TODO(#12528): Add an end to end test.

} // namespace
Expand Down
3 changes: 1 addition & 2 deletions google/cloud/pubsub/testing/mock_message_batch.h
Original file line number Diff line number Diff line change
Expand Up @@ -34,8 +34,7 @@ class MockMessageBatch : public pubsub_internal::MessageBatch {
~MockMessageBatch() override = default;

MOCK_METHOD(void, SaveMessage, (pubsub::Message), (override));
MOCK_METHOD(future<void>, Flush, (), (override));
MOCK_METHOD(void, FlushCallback, (), (override));
MOCK_METHOD(std::function<void(future<void>)>, Flush, (), (override));
};

GOOGLE_CLOUD_CPP_INLINE_NAMESPACE_END
Expand Down