From 948273bbe6443f675b3fc95d797f19269425221b Mon Sep 17 00:00:00 2001 From: Eugene Ostroukhov Date: Mon, 20 Nov 2023 10:32:06 -0800 Subject: [PATCH] Read initial metadata. --- src/core/ext/xds/xds_transport_grpc.cc | 66 ++++++++++++++------------ src/core/ext/xds/xds_transport_grpc.h | 2 + 2 files changed, 37 insertions(+), 31 deletions(-) diff --git a/src/core/ext/xds/xds_transport_grpc.cc b/src/core/ext/xds/xds_transport_grpc.cc index 2a6aa0c0470d6..3480c5189c356 100644 --- a/src/core/ext/xds/xds_transport_grpc.cc +++ b/src/core/ext/xds/xds_transport_grpc.cc @@ -20,6 +20,7 @@ #include +#include #include #include #include @@ -86,50 +87,47 @@ GrpcXdsTransportFactory::GrpcXdsTransport::GrpcStreamingCall::GrpcStreamingCall( GRPC_CLOSURE_INIT(&on_request_sent_, OnRequestSent, this, nullptr); // Start ops on the call. grpc_call_error call_error; - grpc_op ops[3]; - memset(ops, 0, sizeof(ops)); + grpc_op op; + memset(&op, 0, sizeof(op)); // Send initial metadata. No callback for this, since we don't really // care when it finishes. - grpc_op* op = ops; - op->op = GRPC_OP_SEND_INITIAL_METADATA; - op->data.send_initial_metadata.count = 0; - op->flags = GRPC_INITIAL_METADATA_WAIT_FOR_READY | - GRPC_INITIAL_METADATA_WAIT_FOR_READY_EXPLICITLY_SET; - op->reserved = nullptr; - op++; - call_error = grpc_call_start_batch_and_execute( - call_, ops, static_cast(op - ops), nullptr); + op.op = GRPC_OP_SEND_INITIAL_METADATA; + op.data.send_initial_metadata.count = 0; + op.flags = GRPC_INITIAL_METADATA_WAIT_FOR_READY | + GRPC_INITIAL_METADATA_WAIT_FOR_READY_EXPLICITLY_SET; + op.reserved = nullptr; + call_error = grpc_call_start_batch_and_execute(call_, &op, 1, nullptr); + GPR_ASSERT(GRPC_CALL_OK == call_error); + // Start a batch with recv_initial_metadata. + memset(&op, 0, sizeof(op)); + op.op = GRPC_OP_RECV_INITIAL_METADATA; + op.data.recv_initial_metadata.recv_initial_metadata = &initial_metadata_recv_; + op.flags = 0; + op.reserved = nullptr; + GRPC_CLOSURE_INIT(&on_recv_initial_metadata_, OnRecvInitialMetadata, this, + nullptr); + call_error = grpc_call_start_batch_and_execute(call_, &op, 1, + &on_recv_initial_metadata_); GPR_ASSERT(GRPC_CALL_OK == call_error); - // Start a batch with recv_initial_metadata and recv_message. - op = ops; - op->op = GRPC_OP_RECV_INITIAL_METADATA; - op->data.recv_initial_metadata.recv_initial_metadata = - &initial_metadata_recv_; - op->flags = 0; - op->reserved = nullptr; - op++; // Start a batch for recv_trailing_metadata. - op->op = GRPC_OP_RECV_STATUS_ON_CLIENT; - op->data.recv_status_on_client.trailing_metadata = &trailing_metadata_recv_; - op->data.recv_status_on_client.status = &status_code_; - op->data.recv_status_on_client.status_details = &status_details_; - op->flags = 0; - op->reserved = nullptr; - op++; - Ref(DEBUG_LOCATION, "OnStatusReceived").release(); + op.op = GRPC_OP_RECV_STATUS_ON_CLIENT; + op.data.recv_status_on_client.trailing_metadata = &trailing_metadata_recv_; + op.data.recv_status_on_client.status = &status_code_; + op.data.recv_status_on_client.status_details = &status_details_; + op.flags = 0; + op.reserved = nullptr; // This callback signals the end of the call, so it relies on the initial // ref instead of a new ref. When it's invoked, it's the initial ref that is // unreffed. GRPC_CLOSURE_INIT(&on_status_received_, OnStatusReceived, this, nullptr); - call_error = grpc_call_start_batch_and_execute( - call_, ops, static_cast(op - ops), &on_status_received_); + call_error = + grpc_call_start_batch_and_execute(call_, &op, 1, &on_status_received_); GPR_ASSERT(GRPC_CALL_OK == call_error); GRPC_CLOSURE_INIT(&on_response_received_, OnResponseReceived, this, nullptr); } GrpcXdsTransportFactory::GrpcXdsTransport::GrpcStreamingCall:: ~GrpcStreamingCall() { - grpc_metadata_array_destroy(&initial_metadata_recv_); grpc_metadata_array_destroy(&trailing_metadata_recv_); grpc_byte_buffer_destroy(send_message_payload_); grpc_byte_buffer_destroy(recv_message_payload_); @@ -178,6 +176,12 @@ void GrpcXdsTransportFactory::GrpcXdsTransport::GrpcStreamingCall:: self->Unref(DEBUG_LOCATION, "OnRequestSent"); } +void GrpcXdsTransportFactory::GrpcXdsTransport::GrpcStreamingCall:: + OnRecvInitialMetadata(void* arg, grpc_error_handle /*error*/) { + auto self = static_cast(arg); + grpc_metadata_array_destroy(&self->initial_metadata_recv_); +} + void GrpcXdsTransportFactory::GrpcXdsTransport::GrpcStreamingCall:: OnResponseReceived(void* arg, grpc_error_handle /*error*/) { auto self = static_cast(arg); @@ -203,7 +207,7 @@ void GrpcXdsTransportFactory::GrpcXdsTransport::GrpcStreamingCall:: void GrpcXdsTransportFactory::GrpcXdsTransport::GrpcStreamingCall:: StartRecvMessage() { - Ref(DEBUG_LOCATION, "OnResponseReceived").release(); + Ref(DEBUG_LOCATION, "StartRecvMessage").release(); grpc_op op; memset(&op, 0, sizeof(op)); op.op = GRPC_OP_RECV_MESSAGE; diff --git a/src/core/ext/xds/xds_transport_grpc.h b/src/core/ext/xds/xds_transport_grpc.h index b46e988f5f89c..a4d25ff75c9d6 100644 --- a/src/core/ext/xds/xds_transport_grpc.h +++ b/src/core/ext/xds/xds_transport_grpc.h @@ -103,6 +103,7 @@ class GrpcXdsTransportFactory::GrpcXdsTransport::GrpcStreamingCall void StartRecvMessage() override; private: + static void OnRecvInitialMetadata(void* arg, grpc_error_handle /*error*/); static void OnRequestSent(void* arg, grpc_error_handle error); static void OnResponseReceived(void* arg, grpc_error_handle /*error*/); static void OnStatusReceived(void* arg, grpc_error_handle /*error*/); @@ -116,6 +117,7 @@ class GrpcXdsTransportFactory::GrpcXdsTransport::GrpcStreamingCall // recv_initial_metadata grpc_metadata_array initial_metadata_recv_; + grpc_closure on_recv_initial_metadata_; // send_message grpc_byte_buffer* send_message_payload_ = nullptr;