Skip to content

Commit

Permalink
fixup: Code review comments
Browse files Browse the repository at this point in the history
  • Loading branch information
eugeneo committed Dec 2, 2023
1 parent 7782302 commit 8320342
Show file tree
Hide file tree
Showing 7 changed files with 117 additions and 65 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -220,8 +220,7 @@ class XdsClusterResolverLb : public LoadBalancingPolicy {
discovery_mechanism_->parent()->work_serializer()->Run(
[self = std::move(self), update = std::move(update),
read_delay_handle = std::move(read_delay_handle)]() mutable {
self->OnResourceChangedHelper(std::move(update),
std::move(read_delay_handle));
self->OnResourceChangedHelper(std::move(update));
},
DEBUG_LOCATION);
}
Expand All @@ -232,8 +231,7 @@ class XdsClusterResolverLb : public LoadBalancingPolicy {
discovery_mechanism_->parent()->work_serializer()->Run(
[self = std::move(self), status = std::move(status),
read_delay_handle = std::move(read_delay_handle)]() mutable {
self->OnErrorHelper(std::move(status),
std::move(read_delay_handle));
self->OnErrorHelper(std::move(status));
},
DEBUG_LOCATION);
}
Expand All @@ -243,7 +241,7 @@ class XdsClusterResolverLb : public LoadBalancingPolicy {
discovery_mechanism_->parent()->work_serializer()->Run(
[self = std::move(self),
read_delay_handle = std::move(read_delay_handle)]() {
self->OnResourceDoesNotExistHelper(read_delay_handle);
self->OnResourceDoesNotExistHelper();
},
DEBUG_LOCATION);
}
Expand All @@ -253,8 +251,7 @@ class XdsClusterResolverLb : public LoadBalancingPolicy {
// in methods of this class rather than in lambdas to work around an MSVC
// bug.
void OnResourceChangedHelper(
std::shared_ptr<const XdsEndpointResource> update,
RefCountedPtr<XdsClient::ReadDelayHandle> read_delay_handle) {
std::shared_ptr<const XdsEndpointResource> update) {
std::string resolution_note;
if (update->priorities.empty()) {
resolution_note = absl::StrCat(
Expand All @@ -278,26 +275,21 @@ class XdsClusterResolverLb : public LoadBalancingPolicy {
}
discovery_mechanism_->parent()->OnEndpointChanged(
discovery_mechanism_->index(), std::move(update),
std::move(resolution_note), read_delay_handle);
std::move(resolution_note));
}
void OnErrorHelper(
absl::Status status,
RefCountedPtr<XdsClient::ReadDelayHandle> read_delay_handle) {
void OnErrorHelper(absl::Status status) {
discovery_mechanism_->parent()->OnError(
discovery_mechanism_->index(),
absl::StrCat("EDS watcher error for resource ",
discovery_mechanism_->GetEdsResourceName(), " (",
status.ToString(), ")"),
std::move(read_delay_handle));
status.ToString(), ")"));
}
void OnResourceDoesNotExistHelper(
RefCountedPtr<XdsClient::ReadDelayHandle> read_delay_handle) {
void OnResourceDoesNotExistHelper() {
discovery_mechanism_->parent()->OnResourceDoesNotExist(
discovery_mechanism_->index(),
absl::StrCat("EDS resource ",
discovery_mechanism_->GetEdsResourceName(),
" does not exist"),
read_delay_handle);
" does not exist"));
}
RefCountedPtr<EdsDiscoveryMechanism> discovery_mechanism_;
};
Expand Down Expand Up @@ -394,15 +386,11 @@ class XdsClusterResolverLb : public LoadBalancingPolicy {

void ShutdownLocked() override;

void OnEndpointChanged(
size_t index, std::shared_ptr<const XdsEndpointResource> update,
std::string resolution_note,
RefCountedPtr<XdsClient::ReadDelayHandle> read_delay_handle);
void OnError(size_t index, std::string resolution_note,
RefCountedPtr<XdsClient::ReadDelayHandle> read_delay_handle);
void OnResourceDoesNotExist(
size_t index, std::string resolution_note,
RefCountedPtr<XdsClient::ReadDelayHandle> read_delay_handle);
void OnEndpointChanged(size_t index,
std::shared_ptr<const XdsEndpointResource> update,
std::string resolution_note);
void OnError(size_t index, std::string resolution_note);
void OnResourceDoesNotExist(size_t index, std::string resolution_note);

void MaybeDestroyChildPolicyLocked();

Expand Down Expand Up @@ -484,8 +472,7 @@ void XdsClusterResolverLb::LogicalDNSDiscoveryMechanism::Start() {
if (resolver_ == nullptr) {
parent()->OnResourceDoesNotExist(
index(),
absl::StrCat("error creating DNS resolver for ", GetDnsHostname()),
XdsClient::ReadDelayHandle::NoWait());
absl::StrCat("error creating DNS resolver for ", GetDnsHostname()));
return;
}
resolver_->StartLocked();
Expand Down Expand Up @@ -523,8 +510,7 @@ void XdsClusterResolverLb::LogicalDNSDiscoveryMechanism::ResolverResultHandler::
"DNS resolution failed for ", discovery_mechanism_->GetDnsHostname(),
" (", result.addresses.status().ToString(), ")");
}
lb_policy->OnError(index, result.resolution_note,
XdsClient::ReadDelayHandle::NoWait());
lb_policy->OnError(index, result.resolution_note);
return;
}
// Convert resolver result to EDS update.
Expand All @@ -537,8 +523,7 @@ void XdsClusterResolverLb::LogicalDNSDiscoveryMechanism::ResolverResultHandler::
priority.localities.emplace(locality.name.get(), std::move(locality));
update->priorities.emplace_back(std::move(priority));
lb_policy->OnEndpointChanged(index, std::move(update),
std::move(result.resolution_note),
XdsClient::ReadDelayHandle::NoWait());
std::move(result.resolution_note));
}

//
Expand Down Expand Up @@ -663,8 +648,7 @@ const XdsEndpointResource::PriorityList& GetUpdatePriorityList(

void XdsClusterResolverLb::OnEndpointChanged(
size_t index, std::shared_ptr<const XdsEndpointResource> update,
std::string resolution_note,
RefCountedPtr<XdsClient::ReadDelayHandle> /* read_delay_handle */) {
std::string resolution_note) {
if (shutting_down_) return;
if (GRPC_TRACE_FLAG_ENABLED(grpc_lb_xds_cluster_resolver_trace)) {
gpr_log(GPR_INFO,
Expand Down Expand Up @@ -761,9 +745,7 @@ void XdsClusterResolverLb::OnEndpointChanged(
(void)UpdateChildPolicyLocked();
}

void XdsClusterResolverLb::OnError(
size_t index, std::string resolution_note,
RefCountedPtr<XdsClient::ReadDelayHandle> read_delay_handle) {
void XdsClusterResolverLb::OnError(size_t index, std::string resolution_note) {
gpr_log(GPR_ERROR,
"[xds_cluster_resolver_lb %p] discovery mechanism %" PRIuPTR
" reported error: %s",
Expand All @@ -773,21 +755,20 @@ void XdsClusterResolverLb::OnError(
// Call OnEndpointChanged() with an empty update just like
// OnResourceDoesNotExist().
OnEndpointChanged(index, std::make_shared<XdsEndpointResource>(),
std::move(resolution_note), std::move(read_delay_handle));
std::move(resolution_note));
}
}

void XdsClusterResolverLb::OnResourceDoesNotExist(
size_t index, std::string resolution_note,
RefCountedPtr<XdsClient::ReadDelayHandle> read_delay_handle) {
void XdsClusterResolverLb::OnResourceDoesNotExist(size_t index,
std::string resolution_note) {
gpr_log(GPR_ERROR,
"[xds_cluster_resolver_lb %p] discovery mechanism %" PRIuPTR
" resource does not exist: %s",
this, index, resolution_note.c_str());
if (shutting_down_) return;
// Call OnEndpointChanged() with an empty update.
OnEndpointChanged(index, std::make_shared<XdsEndpointResource>(),
std::move(resolution_note), std::move(read_delay_handle));
std::move(resolution_note));
}

//
Expand Down
3 changes: 0 additions & 3 deletions src/core/ext/xds/xds_client.cc
Original file line number Diff line number Diff line change
Expand Up @@ -118,9 +118,6 @@ class XdsClient::ChannelState::AdsCallState

RetryableCall<AdsCallState>* parent() const { return parent_.get(); }
ChannelState* chand() const { return parent_->chand(); }
XdsTransportFactory::XdsTransport::StreamingCall* call() const {
return call_.get();
}
XdsClient* xds_client() const { return chand()->xds_client(); }
bool seen_response() const { return seen_response_; }

Expand Down
1 change: 1 addition & 0 deletions test/core/xds/BUILD
Original file line number Diff line number Diff line change
Expand Up @@ -152,6 +152,7 @@ grpc_cc_library(
external_deps = [
"absl/strings",
"absl/types:optional",
"gtest",
],
language = "C++",
deps = [
Expand Down
25 changes: 21 additions & 4 deletions test/core/xds/xds_client_ads_stream_wait_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
//

#include <memory>
#include <optional>
#include <string_view>
#include <utility>

Expand Down Expand Up @@ -59,21 +60,37 @@ TEST_F(XdsClientNotifyWatchersDone, Basic) {
.set_nonce("A")
.AddFooResource(XdsFooResource("foo1", 6))
.Serialize());
stream->SendMessageToClient(
ResponseBuilder(XdsFooResourceType::Get()->type_url())
.set_version_info("2")
.set_nonce("A")
.AddFooResource(XdsFooResource("foo1", 8))
.Serialize());
// XdsClient should have delivered the response to the watcher.
auto resource = watcher->WaitForNextResourceAndHandle();
ASSERT_NE(resource, absl::nullopt);
EXPECT_EQ(resource->first->name, "foo1");
EXPECT_EQ(resource->first->value, 6);
EXPECT_EQ(resource->name(), "foo1");
EXPECT_EQ(resource->resource_value(), 6);
// XdsClient should have sent an ACK message to the xDS server.
request = WaitForRequest(stream.get());
ASSERT_TRUE(request.has_value());
CheckRequest(*request, XdsFooResourceType::Get()->type_url(),
/*version_info=*/"1", /*response_nonce=*/"A",
/*error_detail=*/absl::OkStatus(),
/*resource_names=*/{"foo1"});
EXPECT_EQ(stream->read_count(), 0);
resource->second.reset();
EXPECT_EQ(stream->read_count(), 1);
resource = absl::nullopt;
EXPECT_EQ(stream->read_count(), 2);
resource = watcher->WaitForNextResourceAndHandle();
ASSERT_NE(resource, absl::nullopt);
// XdsClient should have sent an ACK message to the xDS server.
request = WaitForRequest(stream.get());
ASSERT_TRUE(request.has_value());
CheckRequest(*request, XdsFooResourceType::Get()->type_url(),
/*version_info=*/"2", /*response_nonce=*/"A",
/*error_detail=*/absl::OkStatus(),
/*resource_names=*/{"foo1"});
resource = absl::nullopt;
// Cancel watch.
CancelFooWatch(watcher.get(), "foo1");
EXPECT_TRUE(stream->Orphaned());
Expand Down
26 changes: 21 additions & 5 deletions test/core/xds/xds_client_test_lib.h
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@
#include "google/protobuf/util/json_util.h"
#include "gtest/gtest.h"
#include "upb/reflection/def.h"
#include "xds_transport_fake.h"

#include <grpc/support/json.h>
#include <grpc/support/log.h>
Expand Down Expand Up @@ -235,9 +236,22 @@ class XdsClientTestBase : public ::testing::Test {
XdsTestResourceType<ResourceStruct, all_resources_required_in_sotw>,
ResourceStruct> {
public:
using ResourceAndReadDelayHandle =
std::pair<std::shared_ptr<const ResourceStruct>,
RefCountedPtr<XdsClient::ReadDelayHandle>>;
class ResourceAndReadDelayHandle {
public:
ResourceAndReadDelayHandle(
std::shared_ptr<const ResourceStruct> resource,
RefCountedPtr<XdsClient::ReadDelayHandle> read_delay_handle)
: resource_(std::move(resource)),
read_delay_handle_(std::move(read_delay_handle)) {}

absl::string_view name() const { return resource_->name; }

int resource_value() const { return resource_->value; }

private:
std::shared_ptr<const ResourceStruct> resource_;
RefCountedPtr<XdsClient::ReadDelayHandle> read_delay_handle_;
};

// A watcher implementation that queues delivered watches.
class Watcher : public XdsResourceTypeImpl<
Expand Down Expand Up @@ -333,10 +347,11 @@ class XdsClientTestBase : public ::testing::Test {
RefCountedPtr<XdsClient::ReadDelayHandle>
read_delay_handle) override {
MutexLock lock(&mu_);
queue_.emplace_back(
std::make_pair(std::move(foo), std::move(read_delay_handle)));
queue_.emplace_back(ResourceAndReadDelayHandle(
std::move(foo), std::move(read_delay_handle)));
cv_.Signal();
}

void OnError(
absl::Status status,
RefCountedPtr<XdsClient::ReadDelayHandle> /* read_delay_handle */)
Expand All @@ -345,6 +360,7 @@ class XdsClientTestBase : public ::testing::Test {
queue_.push_back(std::move(status));
cv_.Signal();
}

void OnResourceDoesNotExist(
RefCountedPtr<XdsClient::ReadDelayHandle> /* read_delay_handle */)
override {
Expand Down
47 changes: 41 additions & 6 deletions test/core/xds/xds_transport_fake.cc
Original file line number Diff line number Diff line change
Expand Up @@ -18,11 +18,16 @@

#include "test/core/xds/xds_transport_fake.h"

#include <cstddef>
#include <functional>
#include <memory>
#include <string>
#include <thread>
#include <type_traits>
#include <utility>

#include "gtest/gtest.h"

#include <grpc/event_engine/event_engine.h>
#include <grpc/support/log.h>

Expand Down Expand Up @@ -120,15 +125,45 @@ void FakeXdsTransportFactory::FakeStreamingCall::CompleteSendMessageFromClient(
CompleteSendMessageFromClientLocked(ok);
}

void FakeXdsTransportFactory::FakeStreamingCall::StartRecvMessage() {
absl::optional<std::string> pending;
grpc_core::ReleasableMutexLock lock(&mu_);
ASSERT_FALSE(read_pending_);
read_count_ += 1;
read_pending_ = true;
if (!to_client_messages_.empty()) {
// Dispatch pending message (if there's one) on a separate thread to avoid
// recursion
std::thread thread(
[](const RefCountedPtr<FakeStreamingCall>& call) {
call->DispatchPendingMessageToClient();
},
Ref());
thread.detach();
}
}

void FakeXdsTransportFactory::FakeStreamingCall::SendMessageToClient(
absl::string_view payload) {
ExecCtx exec_ctx;
RefCountedPtr<RefCountedEventHandler> event_handler;
{
MutexLock lock(&mu_);
event_handler = event_handler_->Ref();
ReleasableMutexLock lock(&mu_);
to_client_messages_.emplace_back(payload);
lock.Release();
DispatchPendingMessageToClient();
}

void FakeXdsTransportFactory::FakeStreamingCall::
DispatchPendingMessageToClient() {
ReleasableMutexLock lock(&mu_);
if (!read_pending_ || to_client_messages_.empty()) {
return;
}
event_handler->OnRecvMessage(payload);
read_pending_ = false;
std::string message = std::move(to_client_messages_.front());
to_client_messages_.pop_front();
auto event_handler = event_handler_;
lock.Release();
ExecCtx exec_ctx;
event_handler->OnRecvMessage(message);
}

void FakeXdsTransportFactory::FakeStreamingCall::MaybeSendStatusToClient(
Expand Down
Loading

0 comments on commit 8320342

Please sign in to comment.