Skip to content

Commit

Permalink
fixup: redid a read start
Browse files Browse the repository at this point in the history
  • Loading branch information
eugeneo committed Nov 8, 2023
1 parent 3aef2fd commit 08f0257
Show file tree
Hide file tree
Showing 14 changed files with 68 additions and 97 deletions.
8 changes: 2 additions & 6 deletions src/core/ext/filters/client_channel/lb_policy/xds/cds.cc
Original file line number Diff line number Diff line change
Expand Up @@ -125,9 +125,7 @@ class CdsLb : public LoadBalancingPolicy {

void OnResourceChanged(
std::shared_ptr<const XdsClusterResource> cluster_data,
RefCountedPtr<
XdsTransportFactory::XdsTransport::StreamingCall::ReadDelayHandle>
read_delay_handle) override {
RefCountedPtr<XdsApi::ReadDelayHandle> read_delay_handle) override {
RefCountedPtr<ClusterWatcher> self = Ref();
parent_->work_serializer()->Run(
[self = std::move(self), cluster_data = std::move(cluster_data),
Expand All @@ -146,9 +144,7 @@ class CdsLb : public LoadBalancingPolicy {
DEBUG_LOCATION);
}
void OnResourceDoesNotExist(
RefCountedPtr<
XdsTransportFactory::XdsTransport::StreamingCall::ReadDelayHandle>
read_delay_handle) override {
RefCountedPtr<XdsApi::ReadDelayHandle> read_delay_handle) override {
RefCountedPtr<ClusterWatcher> self = Ref();
parent_->work_serializer()->Run(
[self = std::move(self),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -82,9 +82,6 @@ TraceFlag grpc_lb_xds_cluster_resolver_trace(false, "xds_cluster_resolver_lb");

namespace {

using ReadDelayHandle =
XdsTransportFactory::XdsTransport::StreamingCall::ReadDelayHandle;

constexpr absl::string_view kXdsClusterResolver =
"xds_cluster_resolver_experimental";

Expand Down Expand Up @@ -219,7 +216,7 @@ class XdsClusterResolverLb : public LoadBalancingPolicy {
}
void OnResourceChanged(
std::shared_ptr<const XdsEndpointResource> update,
RefCountedPtr<ReadDelayHandle> read_delay_handle) override {
RefCountedPtr<XdsApi::ReadDelayHandle> read_delay_handle) override {
RefCountedPtr<EndpointWatcher> self = Ref();
discovery_mechanism_->parent()->work_serializer()->Run(
[self = std::move(self), update = std::move(update),
Expand All @@ -238,7 +235,7 @@ class XdsClusterResolverLb : public LoadBalancingPolicy {
DEBUG_LOCATION);
}
void OnResourceDoesNotExist(
RefCountedPtr<ReadDelayHandle> read_delay_handle) override {
RefCountedPtr<XdsApi::ReadDelayHandle> read_delay_handle) override {
RefCountedPtr<EndpointWatcher> self = Ref();
discovery_mechanism_->parent()->work_serializer()->Run(
[self = std::move(self),
Expand All @@ -254,7 +251,7 @@ class XdsClusterResolverLb : public LoadBalancingPolicy {
// bug.
void OnResourceChangedHelper(
std::shared_ptr<const XdsEndpointResource> update,
RefCountedPtr<ReadDelayHandle> /*read_delay_handle*/) {
RefCountedPtr<XdsApi::ReadDelayHandle> /*read_delay_handle*/) {
std::string resolution_note;
if (update->priorities.empty()) {
resolution_note = absl::StrCat(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -100,8 +100,7 @@ TraceFlag grpc_xds_resolver_trace(false, "xds_resolver");

namespace {

using ReadDelayHandle =
XdsTransportFactory::XdsTransport::StreamingCall::ReadDelayHandle;
using ReadDelayHandle = XdsApi::ReadDelayHandle;

//
// XdsResolver
Expand Down
4 changes: 1 addition & 3 deletions src/core/ext/xds/xds_api.cc
Original file line number Diff line number Diff line change
Expand Up @@ -282,9 +282,7 @@ void MaybeLogDiscoveryResponse(

absl::Status XdsApi::ParseAdsResponse(
absl::string_view encoded_response, AdsResponseParserInterface* parser,
RefCountedPtr<
XdsTransportFactory::XdsTransport::StreamingCall::ReadDelayHandle>
read_delay_handle) {
RefCountedPtr<ReadDelayHandle> read_delay_handle) {
upb::Arena arena;
const XdsApiContext context = {client_, tracer_, symtab_->ptr(), arena.ptr()};
// Decode the response.
Expand Down
22 changes: 16 additions & 6 deletions src/core/ext/xds/xds_api.h
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,20 @@ class XdsTransportFactory;
// - CSDS response generation
class XdsApi {
public:
class ReadDelayHandle : public InternallyRefCounted<ReadDelayHandle> {
public:
explicit ReadDelayHandle(absl::AnyInvocable<void()> read)
: read_(std::move(read)) {}
~ReadDelayHandle() override { read_(); }

void Orphan() override {}

static RefCountedPtr<ReadDelayHandle> NoWait() { return nullptr; }

private:
absl::AnyInvocable<void()> read_;
};

// Interface defined by caller and passed to ParseAdsResponse().
class AdsResponseParserInterface {
public:
Expand All @@ -74,9 +88,7 @@ class XdsApi {
virtual void ParseResource(
upb_Arena* arena, size_t idx, absl::string_view type_url,
absl::string_view resource_name, absl::string_view serialized_resource,
RefCountedPtr<
XdsTransportFactory::XdsTransport::StreamingCall::ReadDelayHandle>
read_delay_handle) = 0;
RefCountedPtr<ReadDelayHandle> read_delay_handle) = 0;

// Called when a resource is wrapped in a Resource wrapper proto but
// we fail to parse the Resource wrapper.
Expand Down Expand Up @@ -166,9 +178,7 @@ class XdsApi {
// Otherwise, all events are reported to the parser.
absl::Status ParseAdsResponse(
absl::string_view encoded_response, AdsResponseParserInterface* parser,
RefCountedPtr<
XdsTransportFactory::XdsTransport::StreamingCall::ReadDelayHandle>
read_delay_handle);
RefCountedPtr<ReadDelayHandle> read_delay_handle);

// Creates an initial LRS request.
std::string CreateLrsInitialRequest();
Expand Down
47 changes: 22 additions & 25 deletions src/core/ext/xds/xds_client.cc
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@
#include "absl/strings/strip.h"
#include "absl/types/optional.h"
#include "upb/mem/arena.h"
#include "xds_api.h"

#include <grpc/event_engine/event_engine.h>
#include <grpc/support/log.h>
Expand All @@ -58,8 +59,6 @@
namespace grpc_core {

using ::grpc_event_engine::experimental::EventEngine;
using ReadDelayHandle =
XdsTransportFactory::XdsTransport::StreamingCall::ReadDelayHandle;

TraceFlag grpc_xds_client_trace(false, "xds_client");
TraceFlag grpc_xds_client_refcount_trace(false, "xds_client_refcount");
Expand Down Expand Up @@ -154,7 +153,7 @@ class XdsClient::ChannelState::AdsCallState
void ParseResource(
upb_Arena* arena, size_t idx, absl::string_view type_url,
absl::string_view resource_name, absl::string_view serialized_resource,
RefCountedPtr<ReadDelayHandle> read_delay_handle) override
RefCountedPtr<XdsApi::ReadDelayHandle> read_delay_handle) override
ABSL_EXCLUSIVE_LOCKS_REQUIRED(&XdsClient::mu_);

void ResourceWrapperParsingFailed(size_t idx,
Expand Down Expand Up @@ -263,7 +262,7 @@ class XdsClient::ChannelState::AdsCallState
ResourceState& state = authority_state.resource_map[type_][name_.key];
state.meta.client_status = XdsApi::ResourceMetadata::DOES_NOT_EXIST;
ads_calld_->xds_client()->NotifyWatchersOnResourceDoesNotExist(
state.watchers, ReadDelayHandle::NoWait());
state.watchers, XdsApi::ReadDelayHandle::NoWait());
}
ads_calld_->xds_client()->work_serializer_.DrainQueue();
ads_calld_.reset();
Expand Down Expand Up @@ -291,10 +290,8 @@ class XdsClient::ChannelState::AdsCallState
: ads_calld_(std::move(ads_calld)) {}

void OnRequestSent(bool ok) override { ads_calld_->OnRequestSent(ok); }
void OnRecvMessage(
absl::string_view payload,
RefCountedPtr<ReadDelayHandle> read_delay_handle) override {
ads_calld_->OnRecvMessage(payload, std::move(read_delay_handle));
void OnRecvMessage(absl::string_view payload) override {
ads_calld_->OnRecvMessage(payload);
}
void OnStatusReceived(absl::Status status) override {
ads_calld_->OnStatusReceived(std::move(status));
Expand All @@ -319,8 +316,7 @@ class XdsClient::ChannelState::AdsCallState
ABSL_EXCLUSIVE_LOCKS_REQUIRED(&XdsClient::mu_);

void OnRequestSent(bool ok);
void OnRecvMessage(absl::string_view payload,
RefCountedPtr<ReadDelayHandle> read_delay_handle);
void OnRecvMessage(absl::string_view payload);
void OnStatusReceived(absl::Status status);

bool IsCurrentCallOnChannel() const;
Expand Down Expand Up @@ -373,10 +369,8 @@ class XdsClient::ChannelState::LrsCallState
: lrs_calld_(std::move(lrs_calld)) {}

void OnRequestSent(bool ok) override { lrs_calld_->OnRequestSent(ok); }
void OnRecvMessage(
absl::string_view payload,
RefCountedPtr<ReadDelayHandle> read_delay_handle) override {
lrs_calld_->OnRecvMessage(payload, std::move(read_delay_handle));
void OnRecvMessage(absl::string_view payload) override {
lrs_calld_->OnRecvMessage(payload);
}
void OnStatusReceived(absl::Status status) override {
lrs_calld_->OnStatusReceived(std::move(status));
Expand Down Expand Up @@ -423,8 +417,7 @@ class XdsClient::ChannelState::LrsCallState
};

void OnRequestSent(bool ok);
void OnRecvMessage(absl::string_view payload,
RefCountedPtr<ReadDelayHandle> read_delay_handle);
void OnRecvMessage(absl::string_view payload);
void OnStatusReceived(absl::Status status);

bool IsCurrentCallOnChannel() const;
Expand Down Expand Up @@ -753,7 +746,7 @@ void UpdateResourceMetadataNacked(const std::string& version,
void XdsClient::ChannelState::AdsCallState::AdsResponseParser::ParseResource(
upb_Arena* arena, size_t idx, absl::string_view type_url,
absl::string_view resource_name, absl::string_view serialized_resource,
RefCountedPtr<ReadDelayHandle> read_delay_handle) {
RefCountedPtr<XdsApi::ReadDelayHandle> read_delay_handle) {
std::string error_prefix = absl::StrCat(
"resource index ", idx, ": ",
resource_name.empty() ? "" : absl::StrCat(resource_name, ": "));
Expand Down Expand Up @@ -1045,11 +1038,16 @@ void XdsClient::ChannelState::AdsCallState::OnRequestSent(bool ok) {
}

void XdsClient::ChannelState::AdsCallState::OnRecvMessage(
absl::string_view payload,
RefCountedPtr<ReadDelayHandle> read_delay_handle) {
absl::string_view payload) {
{
MutexLock lock(&xds_client()->mu_);
if (!IsCurrentCallOnChannel()) return;
RefCountedPtr<XdsApi::ReadDelayHandle> read_delay_handle =
MakeRefCounted<XdsApi::ReadDelayHandle>([ref = Ref()]() {
if (ref->call_ != nullptr) {
ref->call_->Read();
}
});
// Parse and validate the response.
AdsResponseParser parser(this);
absl::Status status = xds_client()->api_.ParseAdsResponse(
Expand Down Expand Up @@ -1395,8 +1393,7 @@ void XdsClient::ChannelState::LrsCallState::OnRequestSent(bool /*ok*/) {
}

void XdsClient::ChannelState::LrsCallState::OnRecvMessage(
absl::string_view payload,
RefCountedPtr<ReadDelayHandle> /* read_delay_handle */) {
absl::string_view payload) {
MutexLock lock(&xds_client()->mu_);
// If we're no longer the current call, ignore the result.
if (!IsCurrentCallOnChannel()) return;
Expand Down Expand Up @@ -1621,8 +1618,8 @@ void XdsClient::WatchResource(const XdsResourceType* type,
work_serializer_.Schedule(
[watcher, value = resource_state.resource]()
ABSL_EXCLUSIVE_LOCKS_REQUIRED(&work_serializer_) {
watcher->OnGenericResourceChanged(value,
ReadDelayHandle::NoWait());
watcher->OnGenericResourceChanged(
value, XdsApi::ReadDelayHandle::NoWait());
},
DEBUG_LOCATION);
} else if (resource_state.meta.client_status ==
Expand All @@ -1634,7 +1631,7 @@ void XdsClient::WatchResource(const XdsResourceType* type,
}
work_serializer_.Schedule(
[watcher]() ABSL_EXCLUSIVE_LOCKS_REQUIRED(&work_serializer_) {
watcher->OnResourceDoesNotExist(ReadDelayHandle::NoWait());
watcher->OnResourceDoesNotExist(XdsApi::ReadDelayHandle::NoWait());
},
DEBUG_LOCATION);
} else if (resource_state.meta.client_status ==
Expand Down Expand Up @@ -1963,7 +1960,7 @@ void XdsClient::NotifyWatchersOnErrorLocked(
void XdsClient::NotifyWatchersOnResourceDoesNotExist(
const std::map<ResourceWatcherInterface*,
RefCountedPtr<ResourceWatcherInterface>>& watchers,
RefCountedPtr<ReadDelayHandle> read_delay_handle) {
RefCountedPtr<XdsApi::ReadDelayHandle> read_delay_handle) {
work_serializer_.Schedule(
[watchers, read_delay_handle = std::move(read_delay_handle)]()
ABSL_EXCLUSIVE_LOCKS_REQUIRED(&work_serializer_) {
Expand Down
15 changes: 6 additions & 9 deletions src/core/ext/xds/xds_client.h
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
#include <set>
#include <string>
#include <utility>
#include <variant>
#include <vector>

#include "absl/base/thread_annotations.h"
Expand Down Expand Up @@ -64,16 +65,12 @@ class XdsClient : public DualRefCounted<XdsClient> {
public:
virtual void OnGenericResourceChanged(
std::shared_ptr<const XdsResourceType::ResourceData> resource,
RefCountedPtr<
XdsTransportFactory::XdsTransport::StreamingCall::ReadDelayHandle>
read_delay_handle)
RefCountedPtr<XdsApi::ReadDelayHandle> read_delay_handle)
ABSL_EXCLUSIVE_LOCKS_REQUIRED(&work_serializer_) = 0;
virtual void OnError(absl::Status status)
ABSL_EXCLUSIVE_LOCKS_REQUIRED(&work_serializer_) = 0;
virtual void OnResourceDoesNotExist(
RefCountedPtr<
XdsTransportFactory::XdsTransport::StreamingCall::ReadDelayHandle>
read_delay_handle)
RefCountedPtr<XdsApi::ReadDelayHandle> read_delay_handle)
ABSL_EXCLUSIVE_LOCKS_REQUIRED(&work_serializer_) = 0;
};

Expand Down Expand Up @@ -288,9 +285,7 @@ class XdsClient : public DualRefCounted<XdsClient> {
void NotifyWatchersOnResourceDoesNotExist(
const std::map<ResourceWatcherInterface*,
RefCountedPtr<ResourceWatcherInterface>>& watchers,
RefCountedPtr<
XdsTransportFactory::XdsTransport::StreamingCall::ReadDelayHandle>
read_delay_handle);
RefCountedPtr<XdsApi::ReadDelayHandle> read_delay_handle);

void MaybeRegisterResourceTypeLocked(const XdsResourceType* resource_type)
ABSL_EXCLUSIVE_LOCKS_REQUIRED(mu_);
Expand All @@ -313,6 +308,8 @@ class XdsClient : public DualRefCounted<XdsClient> {
const XdsBootstrap::XdsServer& server, const char* reason)
ABSL_EXCLUSIVE_LOCKS_REQUIRED(mu_);

void AdsReadNext();

std::unique_ptr<XdsBootstrap> bootstrap_;
OrphanablePtr<XdsTransportFactory> transport_factory_;
const Duration request_timeout_;
Expand Down
10 changes: 4 additions & 6 deletions src/core/ext/xds/xds_resource_type_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,8 @@
#include <utility>

#include "absl/strings/string_view.h"
#include "xds_api.h"
#include "xds_client.h"

#include "src/core/ext/xds/xds_client.h"
#include "src/core/ext/xds/xds_resource_type.h"
Expand All @@ -44,18 +46,14 @@ class XdsResourceTypeImpl : public XdsResourceType {
public:
virtual void OnResourceChanged(
std::shared_ptr<const ResourceType> resource,
RefCountedPtr<
XdsTransportFactory::XdsTransport::StreamingCall::ReadDelayHandle>
read_delay_handle) = 0;
RefCountedPtr<XdsApi::ReadDelayHandle> read_delay_handle) = 0;

private:
// Get result from XdsClient generic watcher interface, perform
// down-casting, and invoke the caller's OnResourceChanged() method.
void OnGenericResourceChanged(
std::shared_ptr<const XdsResourceType::ResourceData> resource,
RefCountedPtr<
XdsTransportFactory::XdsTransport::StreamingCall::ReadDelayHandle>
read_delay_handle) override {
RefCountedPtr<XdsApi::ReadDelayHandle> read_delay_handle) override {
OnResourceChanged(
std::static_pointer_cast<const ResourceType>(std::move(resource)),
std::move(read_delay_handle));
Expand Down
4 changes: 2 additions & 2 deletions src/core/ext/xds/xds_server_config_fetcher.cc
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@
#include "absl/strings/string_view.h"
#include "absl/types/optional.h"
#include "absl/types/variant.h"
#include "xds_api.h"

#include <grpc/grpc.h>
#include <grpc/grpc_security.h>
Expand Down Expand Up @@ -92,8 +93,7 @@
namespace grpc_core {
namespace {

using ReadDelayHandle =
XdsTransportFactory::XdsTransport::StreamingCall::ReadDelayHandle;
using ReadDelayHandle = XdsApi::ReadDelayHandle;

TraceFlag grpc_xds_server_config_fetcher_trace(false,
"xds_server_config_fetcher");
Expand Down
17 changes: 1 addition & 16 deletions src/core/ext/xds/xds_transport.h
Original file line number Diff line number Diff line change
Expand Up @@ -43,19 +43,6 @@ class XdsTransportFactory : public InternallyRefCounted<XdsTransportFactory> {
// Represents a bidi streaming RPC call.
class StreamingCall : public InternallyRefCounted<StreamingCall> {
public:
class ReadDelayHandle : public RefCounted<ReadDelayHandle> {
public:
explicit ReadDelayHandle(RefCountedPtr<StreamingCall> call)
: call_(std::move(call)) {}

~ReadDelayHandle() override { call_->Read(); }

static RefCountedPtr<ReadDelayHandle> NoWait() { return nullptr; }

private:
RefCountedPtr<StreamingCall> call_;
};

// An interface for handling events on a streaming call.
class EventHandler {
public:
Expand All @@ -65,9 +52,7 @@ class XdsTransportFactory : public InternallyRefCounted<XdsTransportFactory> {
virtual void OnRequestSent(bool ok) = 0;
// Called when a message is received on the stream.
// Returns true to immediately resume the read.
virtual void OnRecvMessage(
absl::string_view payload,
RefCountedPtr<ReadDelayHandle> read_delay_handle) = 0;
virtual void OnRecvMessage(absl::string_view payload) = 0;
// Called when status is received on the stream.
virtual void OnStatusReceived(absl::Status status) = 0;
};
Expand Down
Loading

0 comments on commit 08f0257

Please sign in to comment.