Skip to content

Commit

Permalink
fix(common): streaming RPCs save initial options (#13648)
Browse files Browse the repository at this point in the history
The streaming RPC wrappers should save the `Options` in effect when the
RPC is created, and then set the `OptionsSpan` to reflect the initial
values.
  • Loading branch information
coryan authored Feb 23, 2024
1 parent ed1212b commit 6d4123a
Show file tree
Hide file tree
Showing 6 changed files with 165 additions and 61 deletions.
61 changes: 44 additions & 17 deletions google/cloud/internal/async_read_write_stream_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
#include "absl/types/optional.h"
#include <grpcpp/support/async_stream.h>
#include <memory>
#include <utility>

namespace google {
namespace cloud {
Expand All @@ -46,35 +47,47 @@ class AsyncStreamingReadWriteRpcImpl
AsyncStreamingReadWriteRpcImpl(
std::shared_ptr<CompletionQueueImpl> cq,
std::shared_ptr<grpc::ClientContext> context,
std::unique_ptr<grpc::ClientAsyncReaderWriterInterface<Request, Response>>
stream)
: AsyncStreamingReadWriteRpcImpl(std::move(cq), std::move(context),
SaveCurrentOptions(),
std::move(stream)) {}

AsyncStreamingReadWriteRpcImpl(
std::shared_ptr<CompletionQueueImpl> cq,
std::shared_ptr<grpc::ClientContext> context, ImmutableOptions options,
std::unique_ptr<grpc::ClientAsyncReaderWriterInterface<Request, Response>>
stream)
: cq_(std::move(cq)),
context_(std::move(context)),
options_(std::move(options)),
stream_(std::move(stream)) {}

void Cancel() override { context_->TryCancel(); }

future<bool> Start() override {
struct OnStart : public AsyncGrpcOperation {
promise<bool> p;
CallContext call_context;
explicit OnStart(ImmutableOptions o) : call_context(std::move(o)) {}

bool Notify(bool ok) override {
ScopedCallContext scope(call_context);
p.set_value(ok);
return true;
}
void Cancel() override {}

promise<bool> p;
CallContext call_context;
};
auto op = std::make_shared<OnStart>();
auto op = std::make_shared<OnStart>(options_);
cq_->StartOperation(op, [&](void* tag) { stream_->StartCall(tag); });
return op->p.get_future();
}

future<absl::optional<Response>> Read() override {
struct OnRead : public AsyncGrpcOperation {
promise<absl::optional<Response>> p;
Response response;
CallContext call_context;
explicit OnRead(ImmutableOptions o) : call_context(std::move(o)) {}

bool Notify(bool ok) override {
ScopedCallContext scope(call_context);
if (!ok) {
Expand All @@ -85,8 +98,12 @@ class AsyncStreamingReadWriteRpcImpl
return true;
}
void Cancel() override {}

promise<absl::optional<Response>> p;
Response response;
CallContext call_context;
};
auto op = std::make_shared<OnRead>();
auto op = std::make_shared<OnRead>(options_);
cq_->StartOperation(op,
[&](void* tag) { stream_->Read(&op->response, tag); });
return op->p.get_future();
Expand All @@ -95,16 +112,19 @@ class AsyncStreamingReadWriteRpcImpl
future<bool> Write(Request const& request,
grpc::WriteOptions options) override {
struct OnWrite : public AsyncGrpcOperation {
promise<bool> p;
CallContext call_context;
explicit OnWrite(ImmutableOptions o) : call_context(std::move(o)) {}

bool Notify(bool ok) override {
ScopedCallContext scope(call_context);
p.set_value(ok);
return true;
}
void Cancel() override {}

promise<bool> p;
CallContext call_context;
};
auto op = std::make_shared<OnWrite>();
auto op = std::make_shared<OnWrite>(options_);
cq_->StartOperation(op, [&](void* tag) {
stream_->Write(request, std::move(options), tag);
});
Expand All @@ -113,33 +133,39 @@ class AsyncStreamingReadWriteRpcImpl

future<bool> WritesDone() override {
struct OnWritesDone : public AsyncGrpcOperation {
promise<bool> p;
CallContext call_context;
explicit OnWritesDone(ImmutableOptions o) : call_context(std::move(o)) {}

bool Notify(bool ok) override {
ScopedCallContext scope(call_context);
p.set_value(ok);
return true;
}
void Cancel() override {}

promise<bool> p;
CallContext call_context;
};
auto op = std::make_shared<OnWritesDone>();
auto op = std::make_shared<OnWritesDone>(options_);
cq_->StartOperation(op, [&](void* tag) { stream_->WritesDone(tag); });
return op->p.get_future();
}

future<Status> Finish() override {
struct OnFinish : public AsyncGrpcOperation {
promise<Status> p;
CallContext call_context;
grpc::Status status;
explicit OnFinish(ImmutableOptions o) : call_context(std::move(o)) {}

bool Notify(bool /*ok*/) override {
ScopedCallContext scope(call_context);
p.set_value(MakeStatusFromRpcError(std::move(status)));
return true;
}
void Cancel() override {}

promise<Status> p;
CallContext call_context;
grpc::Status status;
};
auto op = std::make_shared<OnFinish>();
auto op = std::make_shared<OnFinish>(options_);
cq_->StartOperation(op,
[&](void* tag) { stream_->Finish(&op->status, tag); });
return op->p.get_future();
Expand All @@ -152,6 +178,7 @@ class AsyncStreamingReadWriteRpcImpl
private:
std::shared_ptr<CompletionQueueImpl> cq_;
std::shared_ptr<grpc::ClientContext> context_;
ImmutableOptions options_;
std::unique_ptr<grpc::ClientAsyncReaderWriterInterface<Request, Response>>
stream_;
};
Expand Down
48 changes: 40 additions & 8 deletions google/cloud/internal/async_read_write_stream_impl_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
// limitations under the License.

#include "google/cloud/internal/async_read_write_stream_impl.h"
#include "google/cloud/common_options.h"
#include "google/cloud/completion_queue.h"
#include "google/cloud/future.h"
#include "google/cloud/internal/opentelemetry.h"
Expand Down Expand Up @@ -116,51 +117,82 @@ TEST(AsyncReadWriteStreamingRpcTest, Basic) {
});

google::cloud::CompletionQueue cq(mock_cq);
OptionsSpan create_span(Options{}.set<UserProjectOption>("create"));
auto stream = MakeStreamingReadWriteRpc<FakeRequest, FakeResponse>(
cq, std::make_shared<grpc::ClientContext>(),
[&mock](grpc::ClientContext* context, grpc::CompletionQueue* cq) {
return mock.FakeRpc(context, cq);
});

auto start = stream->Start();
OptionsSpan start_span(Options{}.set<UserProjectOption>("start"));
auto start = stream->Start().then([](auto f) {
EXPECT_EQ(CurrentOptions().get<UserProjectOption>(), "create");
return f.get();
});
ASSERT_THAT(operations, SizeIs(1));
notify_next_op();
EXPECT_TRUE(start.get());

auto write = stream->Write(FakeRequest{"key0"},
grpc::WriteOptions().set_last_message());
OptionsSpan write_span(Options{}.set<UserProjectOption>("write"));
auto write =
stream
->Write(FakeRequest{"key0"}, grpc::WriteOptions().set_last_message())
.then([](auto f) {
EXPECT_EQ(CurrentOptions().get<UserProjectOption>(), "create");
return f.get();
});
ASSERT_THAT(operations, SizeIs(1));
notify_next_op();
EXPECT_TRUE(write.get());

auto read0 = stream->Read();
OptionsSpan read0_span(Options{}.set<UserProjectOption>("read0"));
auto read0 = stream->Read().then([](auto f) {
EXPECT_EQ(CurrentOptions().get<UserProjectOption>(), "create");
return f.get();
});
ASSERT_THAT(operations, SizeIs(1));
notify_next_op();
auto response0 = read0.get();
ASSERT_TRUE(response0.has_value());
EXPECT_EQ("key0", response0->key);
EXPECT_EQ("value0_0", response0->value);

auto read1 = stream->Read();
OptionsSpan read1_span(Options{}.set<UserProjectOption>("read1"));
auto read1 = stream->Read().then([](auto f) {
EXPECT_EQ(CurrentOptions().get<UserProjectOption>(), "create");
return f.get();
});
ASSERT_THAT(operations, SizeIs(1));
notify_next_op();
auto response1 = read1.get();
ASSERT_TRUE(response1.has_value());
EXPECT_EQ("key0", response1->key);
EXPECT_EQ("value0_1", response1->value);

auto writes_done = stream->WritesDone();
OptionsSpan writes_done_span(Options{}.set<UserProjectOption>("writes_done"));
auto writes_done = stream->WritesDone().then([](auto f) {
EXPECT_EQ(CurrentOptions().get<UserProjectOption>(), "create");
return f.get();
});
ASSERT_THAT(operations, SizeIs(1));
notify_next_op();
EXPECT_TRUE(writes_done.get());

auto read2 = stream->Read();
OptionsSpan read2_span(Options{}.set<UserProjectOption>("read2"));
auto read2 = stream->Read().then([](auto f) {
EXPECT_EQ(CurrentOptions().get<UserProjectOption>(), "create");
return f.get();
});
ASSERT_THAT(operations, SizeIs(1));
notify_next_op(false);
auto response2 = read2.get();
EXPECT_FALSE(response2.has_value());

auto finish = stream->Finish();
OptionsSpan finish_span(Options{}.set<UserProjectOption>("finish"));
auto finish = stream->Finish().then([](auto f) {
EXPECT_EQ(CurrentOptions().get<UserProjectOption>(), "create");
return f.get();
});
ASSERT_THAT(operations, SizeIs(1));
notify_next_op();
EXPECT_THAT(finish.get(), IsOk());
Expand Down
44 changes: 32 additions & 12 deletions google/cloud/internal/async_streaming_read_rpc_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -21,11 +21,13 @@
#include "google/cloud/internal/call_context.h"
#include "google/cloud/internal/completion_queue_impl.h"
#include "google/cloud/internal/grpc_request_metadata.h"
#include "google/cloud/options.h"
#include "google/cloud/version.h"
#include "absl/functional/function_ref.h"
#include "absl/types/optional.h"
#include <grpcpp/support/async_stream.h>
#include <memory>
#include <utility>

namespace google {
namespace cloud {
Expand All @@ -44,35 +46,45 @@ class AsyncStreamingReadRpcImpl : public AsyncStreamingReadRpc<Response> {
public:
AsyncStreamingReadRpcImpl(
std::shared_ptr<CompletionQueueImpl> cq,
std::shared_ptr<grpc::ClientContext> context,
std::shared_ptr<grpc::ClientContext> context, ImmutableOptions options,
std::unique_ptr<grpc::ClientAsyncReaderInterface<Response>> stream)
: cq_(std::move(cq)),
context_(std::move(context)),
options_(std::move(options)),
stream_(std::move(stream)) {}

AsyncStreamingReadRpcImpl(
std::shared_ptr<CompletionQueueImpl> cq,
std::shared_ptr<grpc::ClientContext> context,
std::unique_ptr<grpc::ClientAsyncReaderInterface<Response>> stream)
: AsyncStreamingReadRpcImpl(std::move(cq), std::move(context),
SaveCurrentOptions(), std::move(stream)) {}

void Cancel() override { context_->TryCancel(); }

future<bool> Start() override {
struct OnStart : public AsyncGrpcOperation {
promise<bool> p;
CallContext call_context;
explicit OnStart(ImmutableOptions o) : call_context(std::move(o)) {}

bool Notify(bool ok) override {
ScopedCallContext scope(call_context);
p.set_value(ok);
return true;
}
void Cancel() override {}

promise<bool> p;
CallContext call_context;
};
auto op = std::make_shared<OnStart>();
auto op = std::make_shared<OnStart>(options_);
cq_->StartOperation(op, [&](void* tag) { stream_->StartCall(tag); });
return op->p.get_future();
}

future<absl::optional<Response>> Read() override {
struct OnRead : public AsyncGrpcOperation {
promise<absl::optional<Response>> p;
Response response;
CallContext call_context;
explicit OnRead(ImmutableOptions o) : call_context(std::move(o)) {}

bool Notify(bool ok) override {
ScopedCallContext scope(call_context);
if (!ok) {
Expand All @@ -83,26 +95,33 @@ class AsyncStreamingReadRpcImpl : public AsyncStreamingReadRpc<Response> {
return true;
}
void Cancel() override {}

promise<absl::optional<Response>> p;
Response response;
CallContext call_context;
};
auto op = std::make_shared<OnRead>();
auto op = std::make_shared<OnRead>(options_);
cq_->StartOperation(op,
[&](void* tag) { stream_->Read(&op->response, tag); });
return op->p.get_future();
}

future<Status> Finish() override {
struct OnFinish : public AsyncGrpcOperation {
promise<Status> p;
CallContext call_context;
grpc::Status status;
explicit OnFinish(ImmutableOptions o) : call_context(std::move(o)) {}

bool Notify(bool /*ok*/) override {
ScopedCallContext scope(call_context);
p.set_value(MakeStatusFromRpcError(std::move(status)));
return true;
}
void Cancel() override {}

promise<Status> p;
CallContext call_context;
grpc::Status status;
};
auto op = std::make_shared<OnFinish>();
auto op = std::make_shared<OnFinish>(options_);
cq_->StartOperation(op,
[&](void* tag) { stream_->Finish(&op->status, tag); });
return op->p.get_future();
Expand All @@ -115,6 +134,7 @@ class AsyncStreamingReadRpcImpl : public AsyncStreamingReadRpc<Response> {
private:
std::shared_ptr<CompletionQueueImpl> cq_;
std::shared_ptr<grpc::ClientContext> context_;
ImmutableOptions options_;
std::unique_ptr<grpc::ClientAsyncReaderInterface<Response>> stream_;
};

Expand Down
Loading

0 comments on commit 6d4123a

Please sign in to comment.