diff --git a/google/cloud/internal/async_read_write_stream_impl.h b/google/cloud/internal/async_read_write_stream_impl.h index b64c4e9702550..6f65b0cf24d0d 100644 --- a/google/cloud/internal/async_read_write_stream_impl.h +++ b/google/cloud/internal/async_read_write_stream_impl.h @@ -26,6 +26,7 @@ #include "absl/types/optional.h" #include #include +#include namespace google { namespace cloud { @@ -46,35 +47,47 @@ class AsyncStreamingReadWriteRpcImpl AsyncStreamingReadWriteRpcImpl( std::shared_ptr cq, std::shared_ptr context, + std::unique_ptr> + stream) + : AsyncStreamingReadWriteRpcImpl(std::move(cq), std::move(context), + SaveCurrentOptions(), + std::move(stream)) {} + + AsyncStreamingReadWriteRpcImpl( + std::shared_ptr cq, + std::shared_ptr context, ImmutableOptions options, std::unique_ptr> stream) : cq_(std::move(cq)), context_(std::move(context)), + options_(std::move(options)), stream_(std::move(stream)) {} void Cancel() override { context_->TryCancel(); } future Start() override { struct OnStart : public AsyncGrpcOperation { - promise p; - CallContext call_context; + explicit OnStart(ImmutableOptions o) : call_context(std::move(o)) {} + bool Notify(bool ok) override { ScopedCallContext scope(call_context); p.set_value(ok); return true; } void Cancel() override {} + + promise p; + CallContext call_context; }; - auto op = std::make_shared(); + auto op = std::make_shared(options_); cq_->StartOperation(op, [&](void* tag) { stream_->StartCall(tag); }); return op->p.get_future(); } future> Read() override { struct OnRead : public AsyncGrpcOperation { - promise> p; - Response response; - CallContext call_context; + explicit OnRead(ImmutableOptions o) : call_context(std::move(o)) {} + bool Notify(bool ok) override { ScopedCallContext scope(call_context); if (!ok) { @@ -85,8 +98,12 @@ class AsyncStreamingReadWriteRpcImpl return true; } void Cancel() override {} + + promise> p; + Response response; + CallContext call_context; }; - auto op = std::make_shared(); + auto op = std::make_shared(options_); cq_->StartOperation(op, [&](void* tag) { stream_->Read(&op->response, tag); }); return op->p.get_future(); @@ -95,16 +112,19 @@ class AsyncStreamingReadWriteRpcImpl future Write(Request const& request, grpc::WriteOptions options) override { struct OnWrite : public AsyncGrpcOperation { - promise p; - CallContext call_context; + explicit OnWrite(ImmutableOptions o) : call_context(std::move(o)) {} + bool Notify(bool ok) override { ScopedCallContext scope(call_context); p.set_value(ok); return true; } void Cancel() override {} + + promise p; + CallContext call_context; }; - auto op = std::make_shared(); + auto op = std::make_shared(options_); cq_->StartOperation(op, [&](void* tag) { stream_->Write(request, std::move(options), tag); }); @@ -113,33 +133,39 @@ class AsyncStreamingReadWriteRpcImpl future WritesDone() override { struct OnWritesDone : public AsyncGrpcOperation { - promise p; - CallContext call_context; + explicit OnWritesDone(ImmutableOptions o) : call_context(std::move(o)) {} + bool Notify(bool ok) override { ScopedCallContext scope(call_context); p.set_value(ok); return true; } void Cancel() override {} + + promise p; + CallContext call_context; }; - auto op = std::make_shared(); + auto op = std::make_shared(options_); cq_->StartOperation(op, [&](void* tag) { stream_->WritesDone(tag); }); return op->p.get_future(); } future Finish() override { struct OnFinish : public AsyncGrpcOperation { - promise p; - CallContext call_context; - grpc::Status status; + explicit OnFinish(ImmutableOptions o) : call_context(std::move(o)) {} + bool Notify(bool /*ok*/) override { ScopedCallContext scope(call_context); p.set_value(MakeStatusFromRpcError(std::move(status))); return true; } void Cancel() override {} + + promise p; + CallContext call_context; + grpc::Status status; }; - auto op = std::make_shared(); + auto op = std::make_shared(options_); cq_->StartOperation(op, [&](void* tag) { stream_->Finish(&op->status, tag); }); return op->p.get_future(); @@ -152,6 +178,7 @@ class AsyncStreamingReadWriteRpcImpl private: std::shared_ptr cq_; std::shared_ptr context_; + ImmutableOptions options_; std::unique_ptr> stream_; }; diff --git a/google/cloud/internal/async_read_write_stream_impl_test.cc b/google/cloud/internal/async_read_write_stream_impl_test.cc index 382da4fe61611..d02481f72ea36 100644 --- a/google/cloud/internal/async_read_write_stream_impl_test.cc +++ b/google/cloud/internal/async_read_write_stream_impl_test.cc @@ -13,6 +13,7 @@ // limitations under the License. #include "google/cloud/internal/async_read_write_stream_impl.h" +#include "google/cloud/common_options.h" #include "google/cloud/completion_queue.h" #include "google/cloud/future.h" #include "google/cloud/internal/opentelemetry.h" @@ -116,24 +117,39 @@ TEST(AsyncReadWriteStreamingRpcTest, Basic) { }); google::cloud::CompletionQueue cq(mock_cq); + OptionsSpan create_span(Options{}.set("create")); auto stream = MakeStreamingReadWriteRpc( cq, std::make_shared(), [&mock](grpc::ClientContext* context, grpc::CompletionQueue* cq) { return mock.FakeRpc(context, cq); }); - auto start = stream->Start(); + OptionsSpan start_span(Options{}.set("start")); + auto start = stream->Start().then([](auto f) { + EXPECT_EQ(CurrentOptions().get(), "create"); + return f.get(); + }); ASSERT_THAT(operations, SizeIs(1)); notify_next_op(); EXPECT_TRUE(start.get()); - auto write = stream->Write(FakeRequest{"key0"}, - grpc::WriteOptions().set_last_message()); + OptionsSpan write_span(Options{}.set("write")); + auto write = + stream + ->Write(FakeRequest{"key0"}, grpc::WriteOptions().set_last_message()) + .then([](auto f) { + EXPECT_EQ(CurrentOptions().get(), "create"); + return f.get(); + }); ASSERT_THAT(operations, SizeIs(1)); notify_next_op(); EXPECT_TRUE(write.get()); - auto read0 = stream->Read(); + OptionsSpan read0_span(Options{}.set("read0")); + auto read0 = stream->Read().then([](auto f) { + EXPECT_EQ(CurrentOptions().get(), "create"); + return f.get(); + }); ASSERT_THAT(operations, SizeIs(1)); notify_next_op(); auto response0 = read0.get(); @@ -141,7 +157,11 @@ TEST(AsyncReadWriteStreamingRpcTest, Basic) { EXPECT_EQ("key0", response0->key); EXPECT_EQ("value0_0", response0->value); - auto read1 = stream->Read(); + OptionsSpan read1_span(Options{}.set("read1")); + auto read1 = stream->Read().then([](auto f) { + EXPECT_EQ(CurrentOptions().get(), "create"); + return f.get(); + }); ASSERT_THAT(operations, SizeIs(1)); notify_next_op(); auto response1 = read1.get(); @@ -149,18 +169,30 @@ TEST(AsyncReadWriteStreamingRpcTest, Basic) { EXPECT_EQ("key0", response1->key); EXPECT_EQ("value0_1", response1->value); - auto writes_done = stream->WritesDone(); + OptionsSpan writes_done_span(Options{}.set("writes_done")); + auto writes_done = stream->WritesDone().then([](auto f) { + EXPECT_EQ(CurrentOptions().get(), "create"); + return f.get(); + }); ASSERT_THAT(operations, SizeIs(1)); notify_next_op(); EXPECT_TRUE(writes_done.get()); - auto read2 = stream->Read(); + OptionsSpan read2_span(Options{}.set("read2")); + auto read2 = stream->Read().then([](auto f) { + EXPECT_EQ(CurrentOptions().get(), "create"); + return f.get(); + }); ASSERT_THAT(operations, SizeIs(1)); notify_next_op(false); auto response2 = read2.get(); EXPECT_FALSE(response2.has_value()); - auto finish = stream->Finish(); + OptionsSpan finish_span(Options{}.set("finish")); + auto finish = stream->Finish().then([](auto f) { + EXPECT_EQ(CurrentOptions().get(), "create"); + return f.get(); + }); ASSERT_THAT(operations, SizeIs(1)); notify_next_op(); EXPECT_THAT(finish.get(), IsOk()); diff --git a/google/cloud/internal/async_streaming_read_rpc_impl.h b/google/cloud/internal/async_streaming_read_rpc_impl.h index 66c1474dc8e8a..343d1a65cfa76 100644 --- a/google/cloud/internal/async_streaming_read_rpc_impl.h +++ b/google/cloud/internal/async_streaming_read_rpc_impl.h @@ -21,11 +21,13 @@ #include "google/cloud/internal/call_context.h" #include "google/cloud/internal/completion_queue_impl.h" #include "google/cloud/internal/grpc_request_metadata.h" +#include "google/cloud/options.h" #include "google/cloud/version.h" #include "absl/functional/function_ref.h" #include "absl/types/optional.h" #include #include +#include namespace google { namespace cloud { @@ -44,35 +46,45 @@ class AsyncStreamingReadRpcImpl : public AsyncStreamingReadRpc { public: AsyncStreamingReadRpcImpl( std::shared_ptr cq, - std::shared_ptr context, + std::shared_ptr context, ImmutableOptions options, std::unique_ptr> stream) : cq_(std::move(cq)), context_(std::move(context)), + options_(std::move(options)), stream_(std::move(stream)) {} + AsyncStreamingReadRpcImpl( + std::shared_ptr cq, + std::shared_ptr context, + std::unique_ptr> stream) + : AsyncStreamingReadRpcImpl(std::move(cq), std::move(context), + SaveCurrentOptions(), std::move(stream)) {} + void Cancel() override { context_->TryCancel(); } future Start() override { struct OnStart : public AsyncGrpcOperation { - promise p; - CallContext call_context; + explicit OnStart(ImmutableOptions o) : call_context(std::move(o)) {} + bool Notify(bool ok) override { ScopedCallContext scope(call_context); p.set_value(ok); return true; } void Cancel() override {} + + promise p; + CallContext call_context; }; - auto op = std::make_shared(); + auto op = std::make_shared(options_); cq_->StartOperation(op, [&](void* tag) { stream_->StartCall(tag); }); return op->p.get_future(); } future> Read() override { struct OnRead : public AsyncGrpcOperation { - promise> p; - Response response; - CallContext call_context; + explicit OnRead(ImmutableOptions o) : call_context(std::move(o)) {} + bool Notify(bool ok) override { ScopedCallContext scope(call_context); if (!ok) { @@ -83,8 +95,12 @@ class AsyncStreamingReadRpcImpl : public AsyncStreamingReadRpc { return true; } void Cancel() override {} + + promise> p; + Response response; + CallContext call_context; }; - auto op = std::make_shared(); + auto op = std::make_shared(options_); cq_->StartOperation(op, [&](void* tag) { stream_->Read(&op->response, tag); }); return op->p.get_future(); @@ -92,17 +108,20 @@ class AsyncStreamingReadRpcImpl : public AsyncStreamingReadRpc { future Finish() override { struct OnFinish : public AsyncGrpcOperation { - promise p; - CallContext call_context; - grpc::Status status; + explicit OnFinish(ImmutableOptions o) : call_context(std::move(o)) {} + bool Notify(bool /*ok*/) override { ScopedCallContext scope(call_context); p.set_value(MakeStatusFromRpcError(std::move(status))); return true; } void Cancel() override {} + + promise p; + CallContext call_context; + grpc::Status status; }; - auto op = std::make_shared(); + auto op = std::make_shared(options_); cq_->StartOperation(op, [&](void* tag) { stream_->Finish(&op->status, tag); }); return op->p.get_future(); @@ -115,6 +134,7 @@ class AsyncStreamingReadRpcImpl : public AsyncStreamingReadRpc { private: std::shared_ptr cq_; std::shared_ptr context_; + ImmutableOptions options_; std::unique_ptr> stream_; }; diff --git a/google/cloud/internal/async_streaming_read_rpc_impl_test.cc b/google/cloud/internal/async_streaming_read_rpc_impl_test.cc index 271d8de395d66..f4071fcb23ec1 100644 --- a/google/cloud/internal/async_streaming_read_rpc_impl_test.cc +++ b/google/cloud/internal/async_streaming_read_rpc_impl_test.cc @@ -123,7 +123,7 @@ TEST(AsyncStreamingReadRpcTest, Basic) { OptionsSpan start_span(user_project("start")); auto start = stream->Start().then([](future f) { - EXPECT_EQ(CurrentOptions().get(), "start"); + EXPECT_EQ(CurrentOptions().get(), "create"); return f.get(); }); ASSERT_THAT(operations, SizeIs(1)); @@ -132,7 +132,7 @@ TEST(AsyncStreamingReadRpcTest, Basic) { EXPECT_TRUE(start.get()); OptionsSpan read0_span(user_project("read0")); - auto read0 = stream->Read().then(check_read_span("read0")); + auto read0 = stream->Read().then(check_read_span("create")); ASSERT_THAT(operations, SizeIs(1)); OptionsSpan read0_clear(Options{}); notify_next_op(); @@ -142,7 +142,7 @@ TEST(AsyncStreamingReadRpcTest, Basic) { EXPECT_EQ("value0_0", response0->value); OptionsSpan read1_span(user_project("read1")); - auto read1 = stream->Read().then(check_read_span("read1")); + auto read1 = stream->Read().then(check_read_span("create")); ASSERT_THAT(operations, SizeIs(1)); OptionsSpan read1_clear(Options{}); notify_next_op(false); @@ -151,7 +151,7 @@ TEST(AsyncStreamingReadRpcTest, Basic) { OptionsSpan finish_span(user_project("finish")); auto finish = stream->Finish().then([](future f) { - EXPECT_EQ(CurrentOptions().get(), "finish"); + EXPECT_EQ(CurrentOptions().get(), "create"); return f.get(); }); ASSERT_THAT(operations, SizeIs(1)); diff --git a/google/cloud/internal/async_streaming_write_rpc_impl.h b/google/cloud/internal/async_streaming_write_rpc_impl.h index 95bfc57fce427..8078a8cbf7cd5 100644 --- a/google/cloud/internal/async_streaming_write_rpc_impl.h +++ b/google/cloud/internal/async_streaming_write_rpc_impl.h @@ -26,6 +26,7 @@ #include "absl/types/optional.h" #include #include +#include namespace google { namespace cloud { @@ -48,25 +49,39 @@ class AsyncStreamingWriteRpcImpl std::shared_ptr context, std::unique_ptr response, std::unique_ptr> stream) + : AsyncStreamingWriteRpcImpl(std::move(cq), std::move(context), + std::move(response), SaveCurrentOptions(), + std::move(stream)) {} + + AsyncStreamingWriteRpcImpl( + std::shared_ptr cq, + std::shared_ptr context, + std::unique_ptr response, + google::cloud::internal::ImmutableOptions options, + std::unique_ptr> stream) : cq_(std::move(cq)), context_(std::move(context)), response_(std::move(response)), + options_(std::move(options)), stream_(std::move(stream)) {} void Cancel() override { context_->TryCancel(); } future Start() override { struct OnStart : public AsyncGrpcOperation { - promise p; - CallContext call_context; + explicit OnStart(ImmutableOptions o) : call_context(std::move(o)) {} + bool Notify(bool ok) override { ScopedCallContext scope(call_context); p.set_value(ok); return true; } void Cancel() override {} + + promise p; + CallContext call_context; }; - auto op = std::make_shared(); + auto op = std::make_shared(options_); cq_->StartOperation(op, [&](void* tag) { stream_->StartCall(tag); }); return op->p.get_future(); } @@ -74,16 +89,19 @@ class AsyncStreamingWriteRpcImpl future Write(Request const& request, grpc::WriteOptions write_options) override { struct OnWrite : public AsyncGrpcOperation { - promise p; - CallContext call_context; + explicit OnWrite(ImmutableOptions o) : call_context(std::move(o)) {} + bool Notify(bool ok) override { ScopedCallContext scope(call_context); p.set_value(ok); return true; } void Cancel() override {} + + promise p; + CallContext call_context; }; - auto op = std::make_shared(); + auto op = std::make_shared(options_); cq_->StartOperation(op, [&](void* tag) { stream_->Write(request, std::move(write_options), tag); }); @@ -92,26 +110,28 @@ class AsyncStreamingWriteRpcImpl future WritesDone() override { struct OnWritesDone : public AsyncGrpcOperation { - promise p; - CallContext call_context; + explicit OnWritesDone(ImmutableOptions o) : call_context(std::move(o)) {} + bool Notify(bool ok) override { ScopedCallContext scope(call_context); p.set_value(ok); return true; } void Cancel() override {} + + promise p; + CallContext call_context; }; - auto op = std::make_shared(); + auto op = std::make_shared(options_); cq_->StartOperation(op, [&](void* tag) { stream_->WritesDone(tag); }); return op->p.get_future(); } future> Finish() override { struct OnFinish : public AsyncGrpcOperation { - std::unique_ptr response; - promise> p; - CallContext call_context; - grpc::Status status; + OnFinish(std::unique_ptr r, ImmutableOptions o) + : response(std::move(r)), call_context(std::move(o)) {} + bool Notify(bool /*ok*/) override { ScopedCallContext scope(call_context); if (status.ok()) { @@ -122,9 +142,13 @@ class AsyncStreamingWriteRpcImpl return true; } void Cancel() override {} + + std::unique_ptr response; + promise> p; + CallContext call_context; + grpc::Status status; }; - auto op = std::make_shared(); - op->response = std::move(response_); + auto op = std::make_shared(std::move(response_), options_); cq_->StartOperation(op, [&](void* tag) { stream_->Finish(&op->status, tag); }); return op->p.get_future(); @@ -138,6 +162,7 @@ class AsyncStreamingWriteRpcImpl std::shared_ptr cq_; std::shared_ptr context_; std::unique_ptr response_; + ImmutableOptions options_; std::unique_ptr> stream_; }; diff --git a/google/cloud/internal/async_streaming_write_rpc_impl_test.cc b/google/cloud/internal/async_streaming_write_rpc_impl_test.cc index 36bc795561eb3..687928f6db920 100644 --- a/google/cloud/internal/async_streaming_write_rpc_impl_test.cc +++ b/google/cloud/internal/async_streaming_write_rpc_impl_test.cc @@ -127,7 +127,7 @@ TEST(AsyncStreamingWriteRpcTest, Basic) { OptionsSpan start_span(user_project("start")); auto start = stream->Start().then([](future f) { - EXPECT_EQ(CurrentOptions().get(), "start"); + EXPECT_EQ(CurrentOptions().get(), "create"); return f.get(); }); ASSERT_THAT(operations, SizeIs(1)); @@ -137,7 +137,7 @@ TEST(AsyncStreamingWriteRpcTest, Basic) { OptionsSpan write0_span(user_project("write0")); auto write0 = stream->Write(FakeRequest{}, grpc::WriteOptions()) - .then(check_write_span("write0")); + .then(check_write_span("create")); ASSERT_THAT(operations, SizeIs(1)); OptionsSpan write0_clear(Options{}); notify_next_op(); @@ -145,14 +145,14 @@ TEST(AsyncStreamingWriteRpcTest, Basic) { OptionsSpan write1_span(user_project("write1")); auto write1 = stream->Write(FakeRequest{}, grpc::WriteOptions()) - .then(check_write_span("write1")); + .then(check_write_span("create")); ASSERT_THAT(operations, SizeIs(1)); OptionsSpan write1_clear(Options{}); notify_next_op(false); EXPECT_FALSE(write1.get()); OptionsSpan writes_done_span(user_project("writes_done")); - auto writes_done = stream->WritesDone().then(check_write_span("writes_done")); + auto writes_done = stream->WritesDone().then(check_write_span("create")); ASSERT_THAT(operations, SizeIs(1)); OptionsSpan writes_done_clear(Options{}); notify_next_op(false); @@ -160,7 +160,7 @@ TEST(AsyncStreamingWriteRpcTest, Basic) { OptionsSpan finish_span(user_project("finish")); auto finish = stream->Finish().then([](future> f) { - EXPECT_EQ(CurrentOptions().get(), "finish"); + EXPECT_EQ(CurrentOptions().get(), "create"); return f.get(); }); ASSERT_THAT(operations, SizeIs(1));