Skip to content

Commit

Permalink
fixup: Addressing review comments
Browse files Browse the repository at this point in the history
  • Loading branch information
eugeneo committed Dec 19, 2023
1 parent adb9f47 commit 04dccdc
Show file tree
Hide file tree
Showing 10 changed files with 901 additions and 1,042 deletions.
27 changes: 14 additions & 13 deletions src/core/ext/xds/xds_client.cc
Original file line number Diff line number Diff line change
Expand Up @@ -699,6 +699,20 @@ void XdsClient::ChannelState::RetryableCall<T>::OnRetryTimer() {
}
}

//
// XdsClient::ChannelState::AdsCallState::AdsReadDelayHandle
//

XdsClient::ChannelState::AdsCallState::AdsReadDelayHandle::
~AdsReadDelayHandle() {
XdsClient* client = ads_call_state_->xds_client();
MutexLock lock(&client->mu_);
auto call = ads_call_state_->call_.get();
if (call != nullptr) {
call->StartRecvMessage();
}
}

//
// XdsClient::ChannelState::AdsCallState::AdsResponseParser
//
Expand Down Expand Up @@ -2092,17 +2106,4 @@ std::string XdsClient::DumpClientConfigBinary() {
return api_.AssembleClientConfig(resource_type_metadata_map);
}

XdsClient::ChannelState::AdsCallState::AdsReadDelayHandle::
~AdsReadDelayHandle() {
XdsClient* client = ads_call_state_->xds_client();
if (client == nullptr) {
return;
}
MutexLock lock(&client->mu_);
auto call = ads_call_state_->call_.get();
if (call != nullptr) {
call->StartRecvMessage();
}
}

} // namespace grpc_core
13 changes: 0 additions & 13 deletions src/core/ext/xds/xds_client.h
Original file line number Diff line number Diff line change
Expand Up @@ -242,19 +242,6 @@ class XdsClient : public DualRefCounted<XdsClient> {
absl::Status status_;
};

public:
// class ReadDelayHandle : public RefCounted<ReadDelayHandle> {
// public:
// explicit ReadDelayHandle(WeakRefCountedPtr<ChannelState>
// channel_state); ~ReadDelayHandle() override;

// static RefCountedPtr<ReadDelayHandle> NoWait() { return nullptr; }

// private:
// WeakRefCountedPtr<ChannelState> channel_state_;
// };

private:
struct ResourceState {
std::map<ResourceWatcherInterface*, RefCountedPtr<ResourceWatcherInterface>>
watchers;
Expand Down
106 changes: 54 additions & 52 deletions src/core/ext/xds/xds_transport_grpc.cc
Original file line number Diff line number Diff line change
Expand Up @@ -86,41 +86,46 @@ GrpcXdsTransportFactory::GrpcXdsTransport::GrpcStreamingCall::GrpcStreamingCall(
GRPC_CLOSURE_INIT(&on_request_sent_, OnRequestSent, this, nullptr);
// Start ops on the call.
grpc_call_error call_error;
grpc_op op;
memset(&op, 0, sizeof(op));
grpc_op ops[2];
memset(ops, 0, sizeof(ops));
// Send initial metadata. No callback for this, since we don't really
// care when it finishes.
op.op = GRPC_OP_SEND_INITIAL_METADATA;
op.data.send_initial_metadata.count = 0;
op.flags = GRPC_INITIAL_METADATA_WAIT_FOR_READY |
GRPC_INITIAL_METADATA_WAIT_FOR_READY_EXPLICITLY_SET;
op.reserved = nullptr;
call_error = grpc_call_start_batch_and_execute(call_, &op, 1, nullptr);
GPR_ASSERT(GRPC_CALL_OK == call_error);
// Start a batch with recv_initial_metadata.
memset(&op, 0, sizeof(op));
op.op = GRPC_OP_RECV_INITIAL_METADATA;
op.data.recv_initial_metadata.recv_initial_metadata = &initial_metadata_recv_;
op.flags = 0;
op.reserved = nullptr;
GRPC_CLOSURE_INIT(&on_recv_initial_metadata_, OnRecvInitialMetadata, this,
nullptr);
call_error = grpc_call_start_batch_and_execute(call_, &op, 1,
&on_recv_initial_metadata_);
grpc_op* op = ops;
op->op = GRPC_OP_SEND_INITIAL_METADATA;
op->data.send_initial_metadata.count = 0;
op->flags = GRPC_INITIAL_METADATA_WAIT_FOR_READY |
GRPC_INITIAL_METADATA_WAIT_FOR_READY_EXPLICITLY_SET;
op->reserved = nullptr;
op++;
op->op = GRPC_OP_RECV_INITIAL_METADATA;
op->data.recv_initial_metadata.recv_initial_metadata =
&initial_metadata_recv_;
op->flags = 0;
op->reserved = nullptr;
op++;
// Ref will be released in the callback
GRPC_CLOSURE_INIT(
&on_recv_initial_metadata_, OnRecvInitialMetadata,
this->Ref(DEBUG_LOCATION, "OnRecvInitialMetadata").release(), nullptr);
call_error = grpc_call_start_batch_and_execute(
call_, ops, static_cast<size_t>(op - ops), &on_recv_initial_metadata_);
GPR_ASSERT(GRPC_CALL_OK == call_error);
// Start a batch for recv_trailing_metadata.
op.op = GRPC_OP_RECV_STATUS_ON_CLIENT;
op.data.recv_status_on_client.trailing_metadata = &trailing_metadata_recv_;
op.data.recv_status_on_client.status = &status_code_;
op.data.recv_status_on_client.status_details = &status_details_;
op.flags = 0;
op.reserved = nullptr;
memset(ops, 0, sizeof(ops));
op = ops;
op->op = GRPC_OP_RECV_STATUS_ON_CLIENT;
op->data.recv_status_on_client.trailing_metadata = &trailing_metadata_recv_;
op->data.recv_status_on_client.status = &status_code_;
op->data.recv_status_on_client.status_details = &status_details_;
op->flags = 0;
op->reserved = nullptr;
op++;
// This callback signals the end of the call, so it relies on the initial
// ref instead of a new ref. When it's invoked, it's the initial ref that is
// unreffed.
GRPC_CLOSURE_INIT(&on_status_received_, OnStatusReceived, this, nullptr);
call_error =
grpc_call_start_batch_and_execute(call_, &op, 1, &on_status_received_);
call_error = grpc_call_start_batch_and_execute(
call_, ops, static_cast<size_t>(op - ops), &on_status_received_);
GPR_ASSERT(GRPC_CALL_OK == call_error);
GRPC_CLOSURE_INIT(&on_response_received_, OnResponseReceived, this, nullptr);
}
Expand Down Expand Up @@ -163,6 +168,27 @@ void GrpcXdsTransportFactory::GrpcXdsTransport::GrpcStreamingCall::SendMessage(
GPR_ASSERT(GRPC_CALL_OK == call_error);
}

void GrpcXdsTransportFactory::GrpcXdsTransport::GrpcStreamingCall::
StartRecvMessage() {
Ref(DEBUG_LOCATION, "StartRecvMessage").release();
grpc_op op;
memset(&op, 0, sizeof(op));
op.op = GRPC_OP_RECV_MESSAGE;
op.data.recv_message.recv_message = &recv_message_payload_;
GPR_ASSERT(call_ != nullptr);
// Reuses the "OnResponseReceived" ref taken in ctor.
const grpc_call_error call_error =
grpc_call_start_batch_and_execute(call_, &op, 1, &on_response_received_);
GPR_ASSERT(GRPC_CALL_OK == call_error);
}

void GrpcXdsTransportFactory::GrpcXdsTransport::GrpcStreamingCall::
OnRecvInitialMetadata(void* arg, grpc_error_handle /*error*/) {
auto self = static_cast<GrpcStreamingCall*>(arg);
grpc_metadata_array_destroy(&self->initial_metadata_recv_);
self->Unref(DEBUG_LOCATION, "OnRecvInitialMetadata");
}

void GrpcXdsTransportFactory::GrpcXdsTransport::GrpcStreamingCall::
OnRequestSent(void* arg, grpc_error_handle error) {
auto* self = static_cast<GrpcStreamingCall*>(arg);
Expand All @@ -175,19 +201,9 @@ void GrpcXdsTransportFactory::GrpcXdsTransport::GrpcStreamingCall::
self->Unref(DEBUG_LOCATION, "OnRequestSent");
}

void GrpcXdsTransportFactory::GrpcXdsTransport::GrpcStreamingCall::
OnRecvInitialMetadata(void* arg, grpc_error_handle /*error*/) {
auto self = static_cast<GrpcStreamingCall*>(arg);
grpc_metadata_array_destroy(&self->initial_metadata_recv_);
}

void GrpcXdsTransportFactory::GrpcXdsTransport::GrpcStreamingCall::
OnResponseReceived(void* arg, grpc_error_handle /*error*/) {
auto self = static_cast<GrpcStreamingCall*>(arg);
// Create a ref that will leave till the end of this function and reduce the
// ref count to account for the ref obtained in StartRecvMessage
auto ref = self->Ref();
self->Unref(DEBUG_LOCATION, "OnResponseReceived");
auto self(static_cast<GrpcStreamingCall*>(arg));
// If there was no payload, then we received status before we received
// another message, so we stop reading.
if (self->recv_message_payload_ == nullptr) {
Expand All @@ -204,20 +220,6 @@ void GrpcXdsTransportFactory::GrpcXdsTransport::GrpcStreamingCall::
CSliceUnref(response_slice);
}

void GrpcXdsTransportFactory::GrpcXdsTransport::GrpcStreamingCall::
StartRecvMessage() {
Ref(DEBUG_LOCATION, "StartRecvMessage").release();
grpc_op op;
memset(&op, 0, sizeof(op));
op.op = GRPC_OP_RECV_MESSAGE;
op.data.recv_message.recv_message = &recv_message_payload_;
GPR_ASSERT(call_ != nullptr);
// Reuses the "OnResponseReceived" ref taken in ctor.
const grpc_call_error call_error =
grpc_call_start_batch_and_execute(call_, &op, 1, &on_response_received_);
GPR_ASSERT(GRPC_CALL_OK == call_error);
}

void GrpcXdsTransportFactory::GrpcXdsTransport::GrpcStreamingCall::
OnStatusReceived(void* arg, grpc_error_handle /*error*/) {
auto* self = static_cast<GrpcStreamingCall*>(arg);
Expand Down
33 changes: 0 additions & 33 deletions test/core/xds/BUILD
Original file line number Diff line number Diff line change
Expand Up @@ -163,19 +163,6 @@ grpc_cc_library(
],
)

grpc_cc_library(
name = "xds_client_test_lib",
testonly = True,
hdrs = ["xds_client_test_lib.h"],
external_deps = ["gtest"],
language = "C++",
deps = [
":xds_transport_fake",
"//:xds_client",
"//src/proto/grpc/testing/xds/v3:discovery_proto",
],
)

grpc_cc_test(
name = "xds_client_test",
srcs = ["xds_client_test.cc"],
Expand All @@ -185,7 +172,6 @@ grpc_cc_test(
uses_event_engine = True,
uses_polling = False,
deps = [
":xds_client_test_lib",
":xds_transport_fake",
"//:xds_client",
"//src/proto/grpc/testing/xds/v3:discovery_proto",
Expand All @@ -194,25 +180,6 @@ grpc_cc_test(
],
)

grpc_cc_test(
name = "xds_client_ads_stream_wait_test",
srcs = ["xds_client_ads_stream_wait_test.cc"],
external_deps = ["gtest"],
language = "C++",
# shard_count = 10,
# uses_event_engine = True,
# uses_polling = False,
deps = [
":xds_client_test_lib",
"//:grpc++",
# ":xds_transport_fake",
# "//:xds_client",
# "//src/proto/grpc/testing/xds/v3:discovery_proto",
"//test/core/util:grpc_test_util",
"//test/core/util:scoped_env_var",
],
)

grpc_proto_fuzzer(
name = "xds_client_fuzzer",
srcs = ["xds_client_fuzzer.cc"],
Expand Down
108 changes: 0 additions & 108 deletions test/core/xds/xds_client_ads_stream_wait_test.cc

This file was deleted.

Loading

0 comments on commit 04dccdc

Please sign in to comment.