Skip to content

Commit

Permalink
Read initial metadata.
Browse files Browse the repository at this point in the history
  • Loading branch information
eugeneo committed Nov 27, 2023
1 parent 7cd2e48 commit 948273b
Show file tree
Hide file tree
Showing 2 changed files with 37 additions and 31 deletions.
66 changes: 35 additions & 31 deletions src/core/ext/xds/xds_transport_grpc.cc
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@

#include <string.h>

#include <cstddef>
#include <functional>
#include <memory>
#include <string_view>
Expand Down Expand Up @@ -86,50 +87,47 @@ GrpcXdsTransportFactory::GrpcXdsTransport::GrpcStreamingCall::GrpcStreamingCall(
GRPC_CLOSURE_INIT(&on_request_sent_, OnRequestSent, this, nullptr);
// Start ops on the call.
grpc_call_error call_error;
grpc_op ops[3];
memset(ops, 0, sizeof(ops));
grpc_op op;
memset(&op, 0, sizeof(op));
// Send initial metadata. No callback for this, since we don't really
// care when it finishes.
grpc_op* op = ops;
op->op = GRPC_OP_SEND_INITIAL_METADATA;
op->data.send_initial_metadata.count = 0;
op->flags = GRPC_INITIAL_METADATA_WAIT_FOR_READY |
GRPC_INITIAL_METADATA_WAIT_FOR_READY_EXPLICITLY_SET;
op->reserved = nullptr;
op++;
call_error = grpc_call_start_batch_and_execute(
call_, ops, static_cast<size_t>(op - ops), nullptr);
op.op = GRPC_OP_SEND_INITIAL_METADATA;
op.data.send_initial_metadata.count = 0;
op.flags = GRPC_INITIAL_METADATA_WAIT_FOR_READY |
GRPC_INITIAL_METADATA_WAIT_FOR_READY_EXPLICITLY_SET;
op.reserved = nullptr;
call_error = grpc_call_start_batch_and_execute(call_, &op, 1, nullptr);
GPR_ASSERT(GRPC_CALL_OK == call_error);
// Start a batch with recv_initial_metadata.
memset(&op, 0, sizeof(op));
op.op = GRPC_OP_RECV_INITIAL_METADATA;
op.data.recv_initial_metadata.recv_initial_metadata = &initial_metadata_recv_;
op.flags = 0;
op.reserved = nullptr;
GRPC_CLOSURE_INIT(&on_recv_initial_metadata_, OnRecvInitialMetadata, this,
nullptr);
call_error = grpc_call_start_batch_and_execute(call_, &op, 1,
&on_recv_initial_metadata_);
GPR_ASSERT(GRPC_CALL_OK == call_error);
// Start a batch with recv_initial_metadata and recv_message.
op = ops;
op->op = GRPC_OP_RECV_INITIAL_METADATA;
op->data.recv_initial_metadata.recv_initial_metadata =
&initial_metadata_recv_;
op->flags = 0;
op->reserved = nullptr;
op++;
// Start a batch for recv_trailing_metadata.
op->op = GRPC_OP_RECV_STATUS_ON_CLIENT;
op->data.recv_status_on_client.trailing_metadata = &trailing_metadata_recv_;
op->data.recv_status_on_client.status = &status_code_;
op->data.recv_status_on_client.status_details = &status_details_;
op->flags = 0;
op->reserved = nullptr;
op++;
Ref(DEBUG_LOCATION, "OnStatusReceived").release();
op.op = GRPC_OP_RECV_STATUS_ON_CLIENT;
op.data.recv_status_on_client.trailing_metadata = &trailing_metadata_recv_;
op.data.recv_status_on_client.status = &status_code_;
op.data.recv_status_on_client.status_details = &status_details_;
op.flags = 0;
op.reserved = nullptr;
// This callback signals the end of the call, so it relies on the initial
// ref instead of a new ref. When it's invoked, it's the initial ref that is
// unreffed.
GRPC_CLOSURE_INIT(&on_status_received_, OnStatusReceived, this, nullptr);
call_error = grpc_call_start_batch_and_execute(
call_, ops, static_cast<size_t>(op - ops), &on_status_received_);
call_error =
grpc_call_start_batch_and_execute(call_, &op, 1, &on_status_received_);
GPR_ASSERT(GRPC_CALL_OK == call_error);
GRPC_CLOSURE_INIT(&on_response_received_, OnResponseReceived, this, nullptr);
}

GrpcXdsTransportFactory::GrpcXdsTransport::GrpcStreamingCall::
~GrpcStreamingCall() {
grpc_metadata_array_destroy(&initial_metadata_recv_);
grpc_metadata_array_destroy(&trailing_metadata_recv_);
grpc_byte_buffer_destroy(send_message_payload_);
grpc_byte_buffer_destroy(recv_message_payload_);
Expand Down Expand Up @@ -178,6 +176,12 @@ void GrpcXdsTransportFactory::GrpcXdsTransport::GrpcStreamingCall::
self->Unref(DEBUG_LOCATION, "OnRequestSent");
}

void GrpcXdsTransportFactory::GrpcXdsTransport::GrpcStreamingCall::
OnRecvInitialMetadata(void* arg, grpc_error_handle /*error*/) {
auto self = static_cast<GrpcStreamingCall*>(arg);
grpc_metadata_array_destroy(&self->initial_metadata_recv_);
}

void GrpcXdsTransportFactory::GrpcXdsTransport::GrpcStreamingCall::
OnResponseReceived(void* arg, grpc_error_handle /*error*/) {
auto self = static_cast<GrpcStreamingCall*>(arg);
Expand All @@ -203,7 +207,7 @@ void GrpcXdsTransportFactory::GrpcXdsTransport::GrpcStreamingCall::

void GrpcXdsTransportFactory::GrpcXdsTransport::GrpcStreamingCall::
StartRecvMessage() {
Ref(DEBUG_LOCATION, "OnResponseReceived").release();
Ref(DEBUG_LOCATION, "StartRecvMessage").release();
grpc_op op;
memset(&op, 0, sizeof(op));
op.op = GRPC_OP_RECV_MESSAGE;
Expand Down
2 changes: 2 additions & 0 deletions src/core/ext/xds/xds_transport_grpc.h
Original file line number Diff line number Diff line change
Expand Up @@ -103,6 +103,7 @@ class GrpcXdsTransportFactory::GrpcXdsTransport::GrpcStreamingCall
void StartRecvMessage() override;

private:
static void OnRecvInitialMetadata(void* arg, grpc_error_handle /*error*/);
static void OnRequestSent(void* arg, grpc_error_handle error);
static void OnResponseReceived(void* arg, grpc_error_handle /*error*/);
static void OnStatusReceived(void* arg, grpc_error_handle /*error*/);
Expand All @@ -116,6 +117,7 @@ class GrpcXdsTransportFactory::GrpcXdsTransport::GrpcStreamingCall

// recv_initial_metadata
grpc_metadata_array initial_metadata_recv_;
grpc_closure on_recv_initial_metadata_;

// send_message
grpc_byte_buffer* send_message_payload_ = nullptr;
Expand Down

0 comments on commit 948273b

Please sign in to comment.