diff --git a/src/core/ext/xds/xds_client.cc b/src/core/ext/xds/xds_client.cc index 020a529001e63..2d7efdc9ecd31 100644 --- a/src/core/ext/xds/xds_client.cc +++ b/src/core/ext/xds/xds_client.cc @@ -699,6 +699,20 @@ void XdsClient::ChannelState::RetryableCall::OnRetryTimer() { } } +// +// XdsClient::ChannelState::AdsCallState::AdsReadDelayHandle +// + +XdsClient::ChannelState::AdsCallState::AdsReadDelayHandle:: + ~AdsReadDelayHandle() { + XdsClient* client = ads_call_state_->xds_client(); + MutexLock lock(&client->mu_); + auto call = ads_call_state_->call_.get(); + if (call != nullptr) { + call->StartRecvMessage(); + } +} + // // XdsClient::ChannelState::AdsCallState::AdsResponseParser // @@ -2092,17 +2106,4 @@ std::string XdsClient::DumpClientConfigBinary() { return api_.AssembleClientConfig(resource_type_metadata_map); } -XdsClient::ChannelState::AdsCallState::AdsReadDelayHandle:: - ~AdsReadDelayHandle() { - XdsClient* client = ads_call_state_->xds_client(); - if (client == nullptr) { - return; - } - MutexLock lock(&client->mu_); - auto call = ads_call_state_->call_.get(); - if (call != nullptr) { - call->StartRecvMessage(); - } -} - } // namespace grpc_core diff --git a/src/core/ext/xds/xds_client.h b/src/core/ext/xds/xds_client.h index 0e973c5e7f4ee..cac2c7e2edd87 100644 --- a/src/core/ext/xds/xds_client.h +++ b/src/core/ext/xds/xds_client.h @@ -242,19 +242,6 @@ class XdsClient : public DualRefCounted { absl::Status status_; }; - public: - // class ReadDelayHandle : public RefCounted { - // public: - // explicit ReadDelayHandle(WeakRefCountedPtr - // channel_state); ~ReadDelayHandle() override; - - // static RefCountedPtr NoWait() { return nullptr; } - - // private: - // WeakRefCountedPtr channel_state_; - // }; - - private: struct ResourceState { std::map> watchers; diff --git a/src/core/ext/xds/xds_transport_grpc.cc b/src/core/ext/xds/xds_transport_grpc.cc index 1eec734b8c0d3..65a2bfc5c2dad 100644 --- a/src/core/ext/xds/xds_transport_grpc.cc +++ b/src/core/ext/xds/xds_transport_grpc.cc @@ -86,41 +86,46 @@ GrpcXdsTransportFactory::GrpcXdsTransport::GrpcStreamingCall::GrpcStreamingCall( GRPC_CLOSURE_INIT(&on_request_sent_, OnRequestSent, this, nullptr); // Start ops on the call. grpc_call_error call_error; - grpc_op op; - memset(&op, 0, sizeof(op)); + grpc_op ops[2]; + memset(ops, 0, sizeof(ops)); // Send initial metadata. No callback for this, since we don't really // care when it finishes. - op.op = GRPC_OP_SEND_INITIAL_METADATA; - op.data.send_initial_metadata.count = 0; - op.flags = GRPC_INITIAL_METADATA_WAIT_FOR_READY | - GRPC_INITIAL_METADATA_WAIT_FOR_READY_EXPLICITLY_SET; - op.reserved = nullptr; - call_error = grpc_call_start_batch_and_execute(call_, &op, 1, nullptr); - GPR_ASSERT(GRPC_CALL_OK == call_error); - // Start a batch with recv_initial_metadata. - memset(&op, 0, sizeof(op)); - op.op = GRPC_OP_RECV_INITIAL_METADATA; - op.data.recv_initial_metadata.recv_initial_metadata = &initial_metadata_recv_; - op.flags = 0; - op.reserved = nullptr; - GRPC_CLOSURE_INIT(&on_recv_initial_metadata_, OnRecvInitialMetadata, this, - nullptr); - call_error = grpc_call_start_batch_and_execute(call_, &op, 1, - &on_recv_initial_metadata_); + grpc_op* op = ops; + op->op = GRPC_OP_SEND_INITIAL_METADATA; + op->data.send_initial_metadata.count = 0; + op->flags = GRPC_INITIAL_METADATA_WAIT_FOR_READY | + GRPC_INITIAL_METADATA_WAIT_FOR_READY_EXPLICITLY_SET; + op->reserved = nullptr; + op++; + op->op = GRPC_OP_RECV_INITIAL_METADATA; + op->data.recv_initial_metadata.recv_initial_metadata = + &initial_metadata_recv_; + op->flags = 0; + op->reserved = nullptr; + op++; + // Ref will be released in the callback + GRPC_CLOSURE_INIT( + &on_recv_initial_metadata_, OnRecvInitialMetadata, + this->Ref(DEBUG_LOCATION, "OnRecvInitialMetadata").release(), nullptr); + call_error = grpc_call_start_batch_and_execute( + call_, ops, static_cast(op - ops), &on_recv_initial_metadata_); GPR_ASSERT(GRPC_CALL_OK == call_error); // Start a batch for recv_trailing_metadata. - op.op = GRPC_OP_RECV_STATUS_ON_CLIENT; - op.data.recv_status_on_client.trailing_metadata = &trailing_metadata_recv_; - op.data.recv_status_on_client.status = &status_code_; - op.data.recv_status_on_client.status_details = &status_details_; - op.flags = 0; - op.reserved = nullptr; + memset(ops, 0, sizeof(ops)); + op = ops; + op->op = GRPC_OP_RECV_STATUS_ON_CLIENT; + op->data.recv_status_on_client.trailing_metadata = &trailing_metadata_recv_; + op->data.recv_status_on_client.status = &status_code_; + op->data.recv_status_on_client.status_details = &status_details_; + op->flags = 0; + op->reserved = nullptr; + op++; // This callback signals the end of the call, so it relies on the initial // ref instead of a new ref. When it's invoked, it's the initial ref that is // unreffed. GRPC_CLOSURE_INIT(&on_status_received_, OnStatusReceived, this, nullptr); - call_error = - grpc_call_start_batch_and_execute(call_, &op, 1, &on_status_received_); + call_error = grpc_call_start_batch_and_execute( + call_, ops, static_cast(op - ops), &on_status_received_); GPR_ASSERT(GRPC_CALL_OK == call_error); GRPC_CLOSURE_INIT(&on_response_received_, OnResponseReceived, this, nullptr); } @@ -163,6 +168,27 @@ void GrpcXdsTransportFactory::GrpcXdsTransport::GrpcStreamingCall::SendMessage( GPR_ASSERT(GRPC_CALL_OK == call_error); } +void GrpcXdsTransportFactory::GrpcXdsTransport::GrpcStreamingCall:: + StartRecvMessage() { + Ref(DEBUG_LOCATION, "StartRecvMessage").release(); + grpc_op op; + memset(&op, 0, sizeof(op)); + op.op = GRPC_OP_RECV_MESSAGE; + op.data.recv_message.recv_message = &recv_message_payload_; + GPR_ASSERT(call_ != nullptr); + // Reuses the "OnResponseReceived" ref taken in ctor. + const grpc_call_error call_error = + grpc_call_start_batch_and_execute(call_, &op, 1, &on_response_received_); + GPR_ASSERT(GRPC_CALL_OK == call_error); +} + +void GrpcXdsTransportFactory::GrpcXdsTransport::GrpcStreamingCall:: + OnRecvInitialMetadata(void* arg, grpc_error_handle /*error*/) { + auto self = static_cast(arg); + grpc_metadata_array_destroy(&self->initial_metadata_recv_); + self->Unref(DEBUG_LOCATION, "OnRecvInitialMetadata"); +} + void GrpcXdsTransportFactory::GrpcXdsTransport::GrpcStreamingCall:: OnRequestSent(void* arg, grpc_error_handle error) { auto* self = static_cast(arg); @@ -175,19 +201,9 @@ void GrpcXdsTransportFactory::GrpcXdsTransport::GrpcStreamingCall:: self->Unref(DEBUG_LOCATION, "OnRequestSent"); } -void GrpcXdsTransportFactory::GrpcXdsTransport::GrpcStreamingCall:: - OnRecvInitialMetadata(void* arg, grpc_error_handle /*error*/) { - auto self = static_cast(arg); - grpc_metadata_array_destroy(&self->initial_metadata_recv_); -} - void GrpcXdsTransportFactory::GrpcXdsTransport::GrpcStreamingCall:: OnResponseReceived(void* arg, grpc_error_handle /*error*/) { - auto self = static_cast(arg); - // Create a ref that will leave till the end of this function and reduce the - // ref count to account for the ref obtained in StartRecvMessage - auto ref = self->Ref(); - self->Unref(DEBUG_LOCATION, "OnResponseReceived"); + auto self(static_cast(arg)); // If there was no payload, then we received status before we received // another message, so we stop reading. if (self->recv_message_payload_ == nullptr) { @@ -204,20 +220,6 @@ void GrpcXdsTransportFactory::GrpcXdsTransport::GrpcStreamingCall:: CSliceUnref(response_slice); } -void GrpcXdsTransportFactory::GrpcXdsTransport::GrpcStreamingCall:: - StartRecvMessage() { - Ref(DEBUG_LOCATION, "StartRecvMessage").release(); - grpc_op op; - memset(&op, 0, sizeof(op)); - op.op = GRPC_OP_RECV_MESSAGE; - op.data.recv_message.recv_message = &recv_message_payload_; - GPR_ASSERT(call_ != nullptr); - // Reuses the "OnResponseReceived" ref taken in ctor. - const grpc_call_error call_error = - grpc_call_start_batch_and_execute(call_, &op, 1, &on_response_received_); - GPR_ASSERT(GRPC_CALL_OK == call_error); -} - void GrpcXdsTransportFactory::GrpcXdsTransport::GrpcStreamingCall:: OnStatusReceived(void* arg, grpc_error_handle /*error*/) { auto* self = static_cast(arg); diff --git a/test/core/xds/BUILD b/test/core/xds/BUILD index f054d2e7b22da..1e19fc469982e 100644 --- a/test/core/xds/BUILD +++ b/test/core/xds/BUILD @@ -163,19 +163,6 @@ grpc_cc_library( ], ) -grpc_cc_library( - name = "xds_client_test_lib", - testonly = True, - hdrs = ["xds_client_test_lib.h"], - external_deps = ["gtest"], - language = "C++", - deps = [ - ":xds_transport_fake", - "//:xds_client", - "//src/proto/grpc/testing/xds/v3:discovery_proto", - ], -) - grpc_cc_test( name = "xds_client_test", srcs = ["xds_client_test.cc"], @@ -185,7 +172,6 @@ grpc_cc_test( uses_event_engine = True, uses_polling = False, deps = [ - ":xds_client_test_lib", ":xds_transport_fake", "//:xds_client", "//src/proto/grpc/testing/xds/v3:discovery_proto", @@ -194,25 +180,6 @@ grpc_cc_test( ], ) -grpc_cc_test( - name = "xds_client_ads_stream_wait_test", - srcs = ["xds_client_ads_stream_wait_test.cc"], - external_deps = ["gtest"], - language = "C++", - # shard_count = 10, - # uses_event_engine = True, - # uses_polling = False, - deps = [ - ":xds_client_test_lib", - "//:grpc++", - # ":xds_transport_fake", - # "//:xds_client", - # "//src/proto/grpc/testing/xds/v3:discovery_proto", - "//test/core/util:grpc_test_util", - "//test/core/util:scoped_env_var", - ], -) - grpc_proto_fuzzer( name = "xds_client_fuzzer", srcs = ["xds_client_fuzzer.cc"], diff --git a/test/core/xds/xds_client_ads_stream_wait_test.cc b/test/core/xds/xds_client_ads_stream_wait_test.cc deleted file mode 100644 index 8a413da0a98a7..0000000000000 --- a/test/core/xds/xds_client_ads_stream_wait_test.cc +++ /dev/null @@ -1,108 +0,0 @@ -// -// Copyright 2023 gRPC authors. -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. -// - -#include -#include - -#include "absl/status/status.h" -#include "absl/types/optional.h" -#include "gtest/gtest.h" - -#include - -#include "src/core/lib/gprpp/ref_counted_ptr.h" -#include "src/proto/grpc/testing/xds/v3/discovery.pb.h" -#include "test/core/util/test_config.h" -#include "test/core/xds/xds_client_test_lib.h" - -namespace grpc_core { -namespace testing { -namespace { - -using XdsClientNotifyWatchersDone = XdsClientTestBase; - -TEST_F(XdsClientNotifyWatchersDone, Basic) { - InitXdsClient(); - // Start a watch for "foo1". - auto watcher = StartFooWatch("foo1"); - // Watcher should initially not see any resource reported. - EXPECT_FALSE(watcher->HasEvent()); - // XdsClient should have created an ADS stream. - auto stream = WaitForAdsStream(); - ASSERT_TRUE(stream != nullptr); - // XdsClient should have sent a subscription request on the ADS stream. - auto request = WaitForRequest(stream.get()); - ASSERT_TRUE(request.has_value()); - CheckRequest(*request, XdsFooResourceType::Get()->type_url(), - /*version_info=*/"", /*response_nonce=*/"", - /*error_detail=*/absl::OkStatus(), - /*resource_names=*/{"foo1"}); - CheckRequestNode(*request); // Should be present on the first request. - // Send a response. - stream->SendMessageToClient( - ResponseBuilder(XdsFooResourceType::Get()->type_url()) - .set_version_info("1") - .set_nonce("A") - .AddFooResource(XdsFooResource("foo1", 6)) - .Serialize()); - stream->SendMessageToClient( - ResponseBuilder(XdsFooResourceType::Get()->type_url()) - .set_version_info("2") - .set_nonce("A") - .AddFooResource(XdsFooResource("foo1", 8)) - .Serialize()); - // XdsClient should have delivered the response to the watcher. - auto resource = watcher->WaitForNextResourceAndHandle(); - ASSERT_NE(resource, absl::nullopt); - EXPECT_EQ(resource->name(), "foo1"); - EXPECT_EQ(resource->resource_value(), 6); - // XdsClient should have sent an ACK message to the xDS server. - request = WaitForRequest(stream.get()); - ASSERT_TRUE(request.has_value()); - CheckRequest(*request, XdsFooResourceType::Get()->type_url(), - /*version_info=*/"1", /*response_nonce=*/"A", - /*error_detail=*/absl::OkStatus(), - /*resource_names=*/{"foo1"}); - EXPECT_EQ(stream->read_count(), 1); - resource = absl::nullopt; - EXPECT_EQ(stream->read_count(), 2); - resource = watcher->WaitForNextResourceAndHandle(); - ASSERT_NE(resource, absl::nullopt); - // XdsClient should have sent an ACK message to the xDS server. - request = WaitForRequest(stream.get()); - ASSERT_TRUE(request.has_value()); - CheckRequest(*request, XdsFooResourceType::Get()->type_url(), - /*version_info=*/"2", /*response_nonce=*/"A", - /*error_detail=*/absl::OkStatus(), - /*resource_names=*/{"foo1"}); - resource = absl::nullopt; - // Cancel watch. - CancelFooWatch(watcher.get(), "foo1"); - EXPECT_TRUE(stream->Orphaned()); -} - -} // namespace -} // namespace testing -} // namespace grpc_core - -int main(int argc, char** argv) { - ::testing::InitGoogleTest(&argc, argv); - grpc::testing::TestEnvironment env(&argc, argv); - grpc_init(); - int ret = RUN_ALL_TESTS(); - grpc_shutdown(); - return ret; -} diff --git a/test/core/xds/xds_client_test.cc b/test/core/xds/xds_client_test.cc index 485e8675d936b..9f72b6f83c6fd 100644 --- a/test/core/xds/xds_client_test.cc +++ b/test/core/xds/xds_client_test.cc @@ -20,21 +20,44 @@ #include "src/core/ext/xds/xds_client.h" +#include + +#include +#include +#include #include #include +#include +#include + #include "absl/strings/str_cat.h" #include "absl/time/time.h" #include "absl/types/optional.h" +#include "absl/types/variant.h" +#include "gmock/gmock.h" #include "gtest/gtest.h" +#include "upb/reflection/def.h" #include +#include +#include +#include #include "src/core/ext/xds/xds_bootstrap.h" +#include "src/core/ext/xds/xds_resource_type_impl.h" +#include "src/core/lib/event_engine/default_event_engine.h" +#include "src/core/lib/gprpp/debug_location.h" +#include "src/core/lib/gprpp/sync.h" +#include "src/core/lib/json/json.h" +#include "src/core/lib/json/json_args.h" +#include "src/core/lib/json/json_object_loader.h" +#include "src/core/lib/json/json_reader.h" +#include "src/core/lib/json/json_writer.h" +#include "src/proto/grpc/testing/xds/v3/base.pb.h" #include "src/proto/grpc/testing/xds/v3/discovery.pb.h" #include "test/core/util/scoped_env_var.h" #include "test/core/util/test_config.h" -#include "test/core/xds/xds_client_test_lib.h" #include "test/core/xds/xds_transport_fake.h" // IWYU pragma: no_include @@ -44,11 +67,707 @@ // IWYU pragma: no_include "google/protobuf/json/json.h" // IWYU pragma: no_include "google/protobuf/util/json_util.h" +using envoy::service::discovery::v3::DiscoveryRequest; +using envoy::service::discovery::v3::DiscoveryResponse; + namespace grpc_core { namespace testing { namespace { -using XdsClientTest = XdsClientTestBase; +class XdsClientTest : public ::testing::Test { + protected: + // A fake bootstrap implementation that allows tests to populate the + // fields however they want. + class FakeXdsBootstrap : public XdsBootstrap { + public: + class FakeNode : public Node { + public: + FakeNode() : id_("xds_client_test") {} + const std::string& id() const override { return id_; } + const std::string& cluster() const override { return cluster_; } + const std::string& locality_region() const override { + return locality_region_; + } + const std::string& locality_zone() const override { + return locality_zone_; + } + const std::string& locality_sub_zone() const override { + return locality_sub_zone_; + } + const Json::Object& metadata() const override { return metadata_; } + + void set_id(std::string id) { id_ = std::move(id); } + void set_cluster(std::string cluster) { cluster_ = std::move(cluster); } + void set_locality_region(std::string locality_region) { + locality_region_ = std::move(locality_region); + } + void set_locality_zone(std::string locality_zone) { + locality_zone_ = std::move(locality_zone); + } + void set_locality_sub_zone(std::string locality_sub_zone) { + locality_sub_zone_ = std::move(locality_sub_zone); + } + void set_metadata(Json::Object metadata) { + metadata_ = std::move(metadata); + } + + private: + std::string id_; + std::string cluster_; + std::string locality_region_; + std::string locality_zone_; + std::string locality_sub_zone_; + Json::Object metadata_; + }; + + class FakeXdsServer : public XdsServer { + public: + const std::string& server_uri() const override { return server_uri_; } + bool IgnoreResourceDeletion() const override { + return ignore_resource_deletion_; + } + bool Equals(const XdsServer& other) const override { + const auto& o = static_cast(other); + return server_uri_ == o.server_uri_ && + ignore_resource_deletion_ == o.ignore_resource_deletion_; + } + + void set_server_uri(std::string server_uri) { + server_uri_ = std::move(server_uri); + } + void set_ignore_resource_deletion(bool ignore_resource_deletion) { + ignore_resource_deletion_ = ignore_resource_deletion; + } + + private: + std::string server_uri_ = "default_xds_server"; + bool ignore_resource_deletion_ = false; + }; + + class FakeAuthority : public Authority { + public: + const XdsServer* server() const override { + return server_.has_value() ? &*server_ : nullptr; + } + + void set_server(absl::optional server) { + server_ = std::move(server); + } + + private: + absl::optional server_; + }; + + class Builder { + public: + Builder() { node_.emplace(); } + + Builder& set_node_id(std::string id) { + if (!node_.has_value()) node_.emplace(); + node_->set_id(std::move(id)); + return *this; + } + Builder& AddAuthority(std::string name, FakeAuthority authority) { + authorities_[std::move(name)] = std::move(authority); + return *this; + } + Builder& set_ignore_resource_deletion(bool ignore_resource_deletion) { + server_.set_ignore_resource_deletion(ignore_resource_deletion); + return *this; + } + std::unique_ptr Build() { + auto bootstrap = std::make_unique(); + bootstrap->server_ = std::move(server_); + bootstrap->node_ = std::move(node_); + bootstrap->authorities_ = std::move(authorities_); + return bootstrap; + } + + private: + FakeXdsServer server_; + absl::optional node_; + std::map authorities_; + }; + + std::string ToString() const override { return ""; } + + const XdsServer& server() const override { return server_; } + const Node* node() const override { + return node_.has_value() ? &*node_ : nullptr; + } + const Authority* LookupAuthority(const std::string& name) const override { + auto it = authorities_.find(name); + if (it == authorities_.end()) return nullptr; + return &it->second; + } + const XdsServer* FindXdsServer(const XdsServer& server) const override { + const auto& fake_server = static_cast(server); + if (fake_server == server_) return &server_; + for (const auto& p : authorities_) { + const auto* authority_server = + static_cast(p.second.server()); + if (authority_server != nullptr && *authority_server == fake_server) { + return authority_server; + } + } + return nullptr; + } + + private: + FakeXdsServer server_; + absl::optional node_; + std::map authorities_; + }; + + // A template for a test xDS resource type with an associated watcher impl. + // For simplicity, we use JSON instead of proto for serialization. + // + // The specified ResourceStruct must provide the following: + // - a static JsonLoader() method, as described in json_object_loader.h + // - an AsJsonString() method that returns the object in JSON string form + // - a static TypeUrl() method that returns the resource type + // + // The all_resources_required_in_sotw parameter indicates the value + // that should be returned by the AllResourcesRequiredInSotW() method. + template + class XdsTestResourceType + : public XdsResourceTypeImpl< + XdsTestResourceType, + ResourceStruct> { + public: + struct ResourceAndReadDelayHandle { + std::shared_ptr resource; + RefCountedPtr read_delay_handle; + }; + + // A watcher implementation that queues delivered watches. + class Watcher : public XdsResourceTypeImpl< + XdsTestResourceType, + ResourceStruct>::WatcherInterface { + public: + // Returns true if no event is received during the timeout period. + bool ExpectNoEvent(absl::Duration timeout) { + MutexLock lock(&mu_); + return !WaitForEventLocked(timeout); + } + + bool HasEvent() { + MutexLock lock(&mu_); + return !queue_.empty(); + } + + absl::optional WaitForNextResourceAndHandle( + absl::Duration timeout = absl::Seconds(1), + SourceLocation location = SourceLocation()) { + MutexLock lock(&mu_); + if (!WaitForEventLocked(timeout)) return absl::nullopt; + Event& event = queue_.front(); + if (!absl::holds_alternative(event)) { + EXPECT_TRUE(false) + << "got unexpected event " + << (absl::holds_alternative(event) + ? "error" + : "does-not-exist") + << " at " << location.file() << ":" << location.line(); + return absl::nullopt; + } + auto foo = std::move(absl::get(event)); + queue_.pop_front(); + return foo; + } + + std::shared_ptr WaitForNextResource( + absl::Duration timeout = absl::Seconds(1), + SourceLocation location = SourceLocation()) { + auto resource_and_handle = + WaitForNextResourceAndHandle(timeout, location); + if (!resource_and_handle.has_value()) { + return nullptr; + } + return resource_and_handle->resource; + } + + absl::optional WaitForNextError( + absl::Duration timeout = absl::Seconds(1), + SourceLocation location = SourceLocation()) { + MutexLock lock(&mu_); + if (!WaitForEventLocked(timeout)) return absl::nullopt; + Event& event = queue_.front(); + if (!absl::holds_alternative(event)) { + EXPECT_TRUE(false) + << "got unexpected event " + << (absl::holds_alternative(event) + ? "resource" + : "does-not-exist") + << " at " << location.file() << ":" << location.line(); + return absl::nullopt; + } + absl::Status error = std::move(absl::get(event)); + queue_.pop_front(); + return std::move(error); + } + + bool WaitForDoesNotExist(absl::Duration timeout, + SourceLocation location = SourceLocation()) { + MutexLock lock(&mu_); + if (!WaitForEventLocked(timeout)) return false; + Event& event = queue_.front(); + if (!absl::holds_alternative(event)) { + EXPECT_TRUE(false) + << "got unexpected event " + << (absl::holds_alternative(event) ? "error" + : "resource") + << " at " << location.file() << ":" << location.line(); + return false; + } + queue_.pop_front(); + return true; + } + + private: + struct DoesNotExist {}; + using Event = + absl::variant; + + void OnResourceChanged(std::shared_ptr foo, + RefCountedPtr + read_delay_handle) override { + MutexLock lock(&mu_); + ResourceAndReadDelayHandle event_details = { + std::move(foo), std::move(read_delay_handle)}; + queue_.emplace_back(std::move(event_details)); + cv_.Signal(); + } + + void OnError( + absl::Status status, + RefCountedPtr /* read_delay_handle */) + override { + MutexLock lock(&mu_); + queue_.push_back(std::move(status)); + cv_.Signal(); + } + + void OnResourceDoesNotExist( + RefCountedPtr /* read_delay_handle */) + override { + MutexLock lock(&mu_); + queue_.push_back(DoesNotExist()); + cv_.Signal(); + } + + // Returns true if an event was received, or false if the timeout + // expires before any event is received. + bool WaitForEventLocked(absl::Duration timeout) + ABSL_EXCLUSIVE_LOCKS_REQUIRED(&mu_) { + while (queue_.empty()) { + if (cv_.WaitWithTimeout(&mu_, + timeout * grpc_test_slowdown_factor())) { + return false; + } + } + return true; + } + + Mutex mu_; + CondVar cv_; + std::deque queue_ ABSL_GUARDED_BY(&mu_); + }; + + absl::string_view type_url() const override { + return ResourceStruct::TypeUrl(); + } + XdsResourceType::DecodeResult Decode( + const XdsResourceType::DecodeContext& /*context*/, + absl::string_view serialized_resource) const override { + auto json = JsonParse(serialized_resource); + XdsResourceType::DecodeResult result; + if (!json.ok()) { + result.resource = json.status(); + } else { + absl::StatusOr foo = + LoadFromJson(*json); + if (!foo.ok()) { + auto it = json->object().find("name"); + if (it != json->object().end()) { + result.name = it->second.string(); + } + result.resource = foo.status(); + } else { + result.name = foo->name; + result.resource = std::make_unique(std::move(*foo)); + } + } + return result; + } + bool AllResourcesRequiredInSotW() const override { + return all_resources_required_in_sotw; + } + void InitUpbSymtab(XdsClient*, upb_DefPool* /*symtab*/) const override {} + + static google::protobuf::Any EncodeAsAny(const ResourceStruct& resource) { + google::protobuf::Any any; + any.set_type_url( + absl::StrCat("type.googleapis.com/", ResourceStruct::TypeUrl())); + any.set_value(resource.AsJsonString()); + return any; + } + }; + + // A fake "Foo" xDS resource type. + struct XdsFooResource : public XdsResourceType::ResourceData { + std::string name; + uint32_t value; + + XdsFooResource() = default; + XdsFooResource(std::string name, uint32_t value) + : name(std::move(name)), value(value) {} + + bool operator==(const XdsFooResource& other) const { + return name == other.name && value == other.value; + } + + std::string AsJsonString() const { + return absl::StrCat("{\"name\":\"", name, "\",\"value\":", value, "}"); + } + + static const JsonLoaderInterface* JsonLoader(const JsonArgs&) { + static const auto* loader = JsonObjectLoader() + .Field("name", &XdsFooResource::name) + .Field("value", &XdsFooResource::value) + .Finish(); + return loader; + } + + static absl::string_view TypeUrl() { return "test.v3.foo"; } + }; + using XdsFooResourceType = XdsTestResourceType; + + // A fake "Bar" xDS resource type. + struct XdsBarResource : public XdsResourceType::ResourceData { + std::string name; + std::string value; + + XdsBarResource() = default; + XdsBarResource(std::string name, std::string value) + : name(std::move(name)), value(std::move(value)) {} + + bool operator==(const XdsBarResource& other) const { + return name == other.name && value == other.value; + } + + std::string AsJsonString() const { + return absl::StrCat("{\"name\":\"", name, "\",\"value\":\"", value, + "\"}"); + } + + static const JsonLoaderInterface* JsonLoader(const JsonArgs&) { + static const auto* loader = JsonObjectLoader() + .Field("name", &XdsBarResource::name) + .Field("value", &XdsBarResource::value) + .Finish(); + return loader; + } + + static absl::string_view TypeUrl() { return "test.v3.bar"; } + }; + using XdsBarResourceType = XdsTestResourceType; + + // A fake "WildcardCapable" xDS resource type. + // This resource type return true for AllResourcesRequiredInSotW(), + // just like LDS and CDS. + struct XdsWildcardCapableResource : public XdsResourceType::ResourceData { + std::string name; + uint32_t value; + + XdsWildcardCapableResource() = default; + XdsWildcardCapableResource(std::string name, uint32_t value) + : name(std::move(name)), value(value) {} + + bool operator==(const XdsWildcardCapableResource& other) const { + return name == other.name && value == other.value; + } + + std::string AsJsonString() const { + return absl::StrCat("{\"name\":\"", name, "\",\"value\":\"", value, + "\"}"); + } + + static const JsonLoaderInterface* JsonLoader(const JsonArgs&) { + static const auto* loader = + JsonObjectLoader() + .Field("name", &XdsWildcardCapableResource::name) + .Field("value", &XdsWildcardCapableResource::value) + .Finish(); + return loader; + } + + static absl::string_view TypeUrl() { return "test.v3.wildcard_capable"; } + }; + using XdsWildcardCapableResourceType = + XdsTestResourceType; + + // A helper class to build and serialize a DiscoveryResponse. + class ResponseBuilder { + public: + explicit ResponseBuilder(absl::string_view type_url) { + response_.set_type_url(absl::StrCat("type.googleapis.com/", type_url)); + } + + ResponseBuilder& set_version_info(absl::string_view version_info) { + response_.set_version_info(std::string(version_info)); + return *this; + } + ResponseBuilder& set_nonce(absl::string_view nonce) { + response_.set_nonce(std::string(nonce)); + return *this; + } + + template + ResponseBuilder& AddResource( + const typename ResourceType::ResourceType& resource, + bool in_resource_wrapper = false) { + auto* res = response_.add_resources(); + *res = ResourceType::EncodeAsAny(resource); + if (in_resource_wrapper) { + envoy::service::discovery::v3::Resource resource_wrapper; + resource_wrapper.set_name(resource.name); + *resource_wrapper.mutable_resource() = std::move(*res); + res->PackFrom(resource_wrapper); + } + return *this; + } + + ResponseBuilder& AddFooResource(const XdsFooResource& resource, + bool in_resource_wrapper = false) { + return AddResource(resource, in_resource_wrapper); + } + + ResponseBuilder& AddBarResource(const XdsBarResource& resource, + bool in_resource_wrapper = false) { + return AddResource(resource, in_resource_wrapper); + } + + ResponseBuilder& AddWildcardCapableResource( + const XdsWildcardCapableResource& resource, + bool in_resource_wrapper = false) { + return AddResource(resource, + in_resource_wrapper); + } + + ResponseBuilder& AddInvalidResource( + absl::string_view type_url, absl::string_view value, + absl::string_view resource_wrapper_name = "") { + auto* res = response_.add_resources(); + res->set_type_url(absl::StrCat("type.googleapis.com/", type_url)); + res->set_value(std::string(value)); + if (!resource_wrapper_name.empty()) { + envoy::service::discovery::v3::Resource resource_wrapper; + resource_wrapper.set_name(std::string(resource_wrapper_name)); + *resource_wrapper.mutable_resource() = std::move(*res); + res->PackFrom(resource_wrapper); + } + return *this; + } + + ResponseBuilder& AddInvalidResourceWrapper() { + auto* res = response_.add_resources(); + res->set_type_url( + "type.googleapis.com/envoy.service.discovery.v3.Resource"); + res->set_value(std::string("\0", 1)); + return *this; + } + + ResponseBuilder& AddEmptyResource() { + response_.add_resources(); + return *this; + } + + std::string Serialize() { + std::string serialized_response; + EXPECT_TRUE(response_.SerializeToString(&serialized_response)); + return serialized_response; + } + + private: + DiscoveryResponse response_; + }; + + // Sets transport_factory_ and initializes xds_client_ with the + // specified bootstrap config. + void InitXdsClient( + FakeXdsBootstrap::Builder bootstrap_builder = FakeXdsBootstrap::Builder(), + Duration resource_request_timeout = Duration::Seconds(15)) { + auto transport_factory = MakeOrphanable(); + transport_factory_ = transport_factory->Ref(); + xds_client_ = MakeRefCounted( + bootstrap_builder.Build(), std::move(transport_factory), + grpc_event_engine::experimental::GetDefaultEventEngine(), "foo agent", + "foo version", resource_request_timeout * grpc_test_slowdown_factor()); + } + + // Starts and cancels a watch for a Foo resource. + RefCountedPtr StartFooWatch( + absl::string_view resource_name) { + auto watcher = MakeRefCounted(); + XdsFooResourceType::StartWatch(xds_client_.get(), resource_name, watcher); + return watcher; + } + void CancelFooWatch(XdsFooResourceType::Watcher* watcher, + absl::string_view resource_name, + bool delay_unsubscription = false) { + XdsFooResourceType::CancelWatch(xds_client_.get(), resource_name, watcher, + delay_unsubscription); + } + + // Starts and cancels a watch for a Bar resource. + RefCountedPtr StartBarWatch( + absl::string_view resource_name) { + auto watcher = MakeRefCounted(); + XdsBarResourceType::StartWatch(xds_client_.get(), resource_name, watcher); + return watcher; + } + void CancelBarWatch(XdsBarResourceType::Watcher* watcher, + absl::string_view resource_name, + bool delay_unsubscription = false) { + XdsBarResourceType::CancelWatch(xds_client_.get(), resource_name, watcher, + delay_unsubscription); + } + + // Starts and cancels a watch for a WildcardCapable resource. + RefCountedPtr + StartWildcardCapableWatch(absl::string_view resource_name) { + auto watcher = MakeRefCounted(); + XdsWildcardCapableResourceType::StartWatch(xds_client_.get(), resource_name, + watcher); + return watcher; + } + void CancelWildcardCapableWatch( + XdsWildcardCapableResourceType::Watcher* watcher, + absl::string_view resource_name, bool delay_unsubscription = false) { + XdsWildcardCapableResourceType::CancelWatch( + xds_client_.get(), resource_name, watcher, delay_unsubscription); + } + + RefCountedPtr WaitForAdsStream( + const XdsBootstrap::XdsServer& server, + absl::Duration timeout = absl::Seconds(5)) { + const auto* xds_server = xds_client_->bootstrap().FindXdsServer(server); + GPR_ASSERT(xds_server != nullptr); + return transport_factory_->WaitForStream( + *xds_server, FakeXdsTransportFactory::kAdsMethod, + timeout * grpc_test_slowdown_factor()); + } + + void TriggerConnectionFailure(const XdsBootstrap::XdsServer& server, + absl::Status status) { + const auto* xds_server = xds_client_->bootstrap().FindXdsServer(server); + GPR_ASSERT(xds_server != nullptr); + transport_factory_->TriggerConnectionFailure(*xds_server, + std::move(status)); + } + + RefCountedPtr WaitForAdsStream( + absl::Duration timeout = absl::Seconds(5)) { + return WaitForAdsStream(xds_client_->bootstrap().server(), timeout); + } + + // Gets the latest request sent to the fake xDS server. + absl::optional WaitForRequest( + FakeXdsTransportFactory::FakeStreamingCall* stream, + absl::Duration timeout = absl::Seconds(3), + SourceLocation location = SourceLocation()) { + auto message = + stream->WaitForMessageFromClient(timeout * grpc_test_slowdown_factor()); + if (!message.has_value()) return absl::nullopt; + DiscoveryRequest request; + bool success = request.ParseFromString(*message); + EXPECT_TRUE(success) << "Failed to deserialize DiscoveryRequest at " + << location.file() << ":" << location.line(); + if (!success) return absl::nullopt; + return std::move(request); + } + + // Helper function to check the fields of a DiscoveryRequest. + void CheckRequest(const DiscoveryRequest& request, absl::string_view type_url, + absl::string_view version_info, + absl::string_view response_nonce, + const absl::Status& error_detail, + const std::set& resource_names, + SourceLocation location = SourceLocation()) { + EXPECT_EQ(request.type_url(), + absl::StrCat("type.googleapis.com/", type_url)) + << location.file() << ":" << location.line(); + EXPECT_EQ(request.version_info(), version_info) + << location.file() << ":" << location.line(); + EXPECT_EQ(request.response_nonce(), response_nonce) + << location.file() << ":" << location.line(); + if (error_detail.ok()) { + EXPECT_FALSE(request.has_error_detail()) + << location.file() << ":" << location.line(); + } else { + EXPECT_EQ(request.error_detail().code(), + static_cast(error_detail.code())) + << location.file() << ":" << location.line(); + EXPECT_EQ(request.error_detail().message(), error_detail.message()) + << location.file() << ":" << location.line(); + } + EXPECT_THAT(request.resource_names(), + ::testing::UnorderedElementsAreArray(resource_names)) + << location.file() << ":" << location.line(); + } + + // Helper function to check the contents of the node message in a + // request against the client's node info. + void CheckRequestNode(const DiscoveryRequest& request, + SourceLocation location = SourceLocation()) { + // These fields come from the bootstrap config. + EXPECT_EQ(request.node().id(), xds_client_->bootstrap().node()->id()) + << location.file() << ":" << location.line(); + EXPECT_EQ(request.node().cluster(), + xds_client_->bootstrap().node()->cluster()) + << location.file() << ":" << location.line(); + EXPECT_EQ(request.node().locality().region(), + xds_client_->bootstrap().node()->locality_region()) + << location.file() << ":" << location.line(); + EXPECT_EQ(request.node().locality().zone(), + xds_client_->bootstrap().node()->locality_zone()) + << location.file() << ":" << location.line(); + EXPECT_EQ(request.node().locality().sub_zone(), + xds_client_->bootstrap().node()->locality_sub_zone()) + << location.file() << ":" << location.line(); + if (xds_client_->bootstrap().node()->metadata().empty()) { + EXPECT_FALSE(request.node().has_metadata()) + << location.file() << ":" << location.line(); + } else { + std::string metadata_json_str; + auto status = + MessageToJsonString(request.node().metadata(), &metadata_json_str, + GRPC_CUSTOM_JSONUTIL::JsonPrintOptions()); + ASSERT_TRUE(status.ok()) + << status << " on " << location.file() << ":" << location.line(); + auto metadata_json = JsonParse(metadata_json_str); + ASSERT_TRUE(metadata_json.ok()) + << metadata_json.status() << " on " << location.file() << ":" + << location.line(); + Json expected = + Json::FromObject(xds_client_->bootstrap().node()->metadata()); + EXPECT_EQ(*metadata_json, expected) + << location.file() << ":" << location.line() + << ":\nexpected: " << JsonDump(expected) + << "\nactual: " << JsonDump(*metadata_json); + } + EXPECT_EQ(request.node().user_agent_name(), "foo agent") + << location.file() << ":" << location.line(); + EXPECT_EQ(request.node().user_agent_version(), "foo version") + << location.file() << ":" << location.line(); + } + + RefCountedPtr transport_factory_; + RefCountedPtr xds_client_; +}; TEST_F(XdsClientTest, BasicWatch) { InitXdsClient(); @@ -1999,6 +2718,78 @@ TEST_F(XdsClientTest, FederationChannelFailureReportedToWatchers) { EXPECT_TRUE(stream2->Orphaned()); } +TEST_F(XdsClientTest, AdsReadWaitsForHandleRelease) { + InitXdsClient(); + // Start watches for "foo1" and "foo2". + auto watcher1 = StartFooWatch("foo1"); + auto watcher2 = StartFooWatch("foo2"); + // Watchers should initially not see any resource reported. + EXPECT_FALSE(watcher1->HasEvent()); + EXPECT_FALSE(watcher2->HasEvent()); + // XdsClient should have created an ADS stream. + auto stream = WaitForAdsStream(); + ASSERT_TRUE(stream != nullptr); + // XdsClient should have sent a subscription request on the ADS stream. + auto request = WaitForRequest(stream.get()); + ASSERT_TRUE(request.has_value()); + request = WaitForRequest(stream.get()); + ASSERT_TRUE(request.has_value()); + // Send a response with 2 resources. + stream->SendMessageToClient( + ResponseBuilder(XdsFooResourceType::Get()->type_url()) + .set_version_info("1") + .set_nonce("A") + .AddFooResource(XdsFooResource("foo1", 6)) + .AddFooResource(XdsFooResource("foo2", 6)) + .Serialize()); + // Send a response with a single resource + stream->SendMessageToClient( + ResponseBuilder(XdsFooResourceType::Get()->type_url()) + .set_version_info("2") + .set_nonce("B") + .AddFooResource(XdsFooResource("foo1", 8)) + .Serialize()); + // XdsClient should have delivered the response to the watcher. + auto resource1 = watcher1->WaitForNextResourceAndHandle(); + ASSERT_NE(resource1, absl::nullopt); + auto resource2 = watcher2->WaitForNextResourceAndHandle(); + ASSERT_NE(resource2, absl::nullopt); + // XdsClient should have sent an ACK message to the xDS server. + request = WaitForRequest(stream.get()); + ASSERT_TRUE(request.has_value()); + CheckRequest(*request, XdsFooResourceType::Get()->type_url(), + /*version_info=*/"1", /*response_nonce=*/"A", + /*error_detail=*/absl::OkStatus(), + /*resource_names=*/{"foo1", "foo2"}); + EXPECT_EQ(stream->reads_started(), 1); + resource1 = absl::nullopt; + resource2 = absl::nullopt; + EXPECT_EQ(stream->reads_started(), 2); + resource1 = watcher1->WaitForNextResourceAndHandle(); + resource2 = watcher2->WaitForNextResourceAndHandle(); + ASSERT_NE(resource1, absl::nullopt); + EXPECT_EQ(resource2, absl::nullopt); + // XdsClient should have sent an ACK message to the xDS server. + request = WaitForRequest(stream.get()); + ASSERT_TRUE(request.has_value()); + CheckRequest(*request, XdsFooResourceType::Get()->type_url(), + /*version_info=*/"2", /*response_nonce=*/"B", + /*error_detail=*/absl::OkStatus(), + /*resource_names=*/{"foo1", "foo2"}); + resource1 = absl::nullopt; + EXPECT_EQ(stream->reads_started(), 3); + // Cancel watch. + CancelFooWatch(watcher1.get(), "foo1"); + request = WaitForRequest(stream.get()); + ASSERT_TRUE(request.has_value()); + CheckRequest(*request, XdsFooResourceType::Get()->type_url(), + /*version_info=*/"2", /*response_nonce=*/"B", + /*error_detail=*/absl::OkStatus(), + /*resource_names=*/{"foo2"}); + CancelFooWatch(watcher2.get(), "foo2"); + EXPECT_TRUE(stream->Orphaned()); +} + } // namespace } // namespace testing } // namespace grpc_core diff --git a/test/core/xds/xds_client_test_lib.h b/test/core/xds/xds_client_test_lib.h deleted file mode 100644 index 74dd475ae6484..0000000000000 --- a/test/core/xds/xds_client_test_lib.h +++ /dev/null @@ -1,790 +0,0 @@ -// -// Copyright 2023 gRPC authors. -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. -// - -#ifndef GRPC_TEST_CORE_XDS_XDS_CLIENT_TEST_LIB_H -#define GRPC_TEST_CORE_XDS_XDS_CLIENT_TEST_LIB_H - -#include - -#include -#include -#include -#include -#include -#include -#include - -#include - -#include "absl/base/thread_annotations.h" -#include "absl/status/status.h" -#include "absl/status/statusor.h" -#include "absl/strings/str_cat.h" -#include "absl/strings/string_view.h" -#include "absl/time/time.h" -#include "absl/types/optional.h" -#include "absl/types/variant.h" -#include "gmock/gmock.h" -#include "google/protobuf/json/json.h" -#include "google/protobuf/struct.pb.h" -#include "google/protobuf/util/json_util.h" -#include "gtest/gtest.h" -#include "upb/reflection/def.h" - -#include -#include -#include - -#include "src/core/ext/xds/xds_bootstrap.h" -#include "src/core/ext/xds/xds_client.h" -#include "src/core/ext/xds/xds_resource_type.h" -#include "src/core/ext/xds/xds_resource_type_impl.h" -#include "src/core/lib/event_engine/default_event_engine.h" -#include "src/core/lib/gprpp/debug_location.h" -#include "src/core/lib/gprpp/orphanable.h" -#include "src/core/lib/gprpp/ref_counted_ptr.h" -#include "src/core/lib/gprpp/sync.h" -#include "src/core/lib/gprpp/time.h" -#include "src/core/lib/json/json.h" -#include "src/core/lib/json/json_args.h" -#include "src/core/lib/json/json_object_loader.h" -#include "src/core/lib/json/json_reader.h" -#include "src/core/lib/json/json_writer.h" -#include "src/proto/grpc/testing/xds/v3/base.pb.h" -#include "src/proto/grpc/testing/xds/v3/discovery.pb.h" -#include "test/core/util/test_config.h" -#include "test/core/xds/xds_transport_fake.h" - -namespace grpc_core { -namespace testing { - -using envoy::service::discovery::v3::DiscoveryRequest; -using envoy::service::discovery::v3::DiscoveryResponse; - -class XdsClientTestBase : public ::testing::Test { - protected: - // A fake bootstrap implementation that allows tests to populate the - // fields however they want. - class FakeXdsBootstrap : public XdsBootstrap { - public: - class FakeNode : public Node { - public: - FakeNode() : id_("xds_client_test") {} - const std::string& id() const override { return id_; } - const std::string& cluster() const override { return cluster_; } - const std::string& locality_region() const override { - return locality_region_; - } - const std::string& locality_zone() const override { - return locality_zone_; - } - const std::string& locality_sub_zone() const override { - return locality_sub_zone_; - } - const Json::Object& metadata() const override { return metadata_; } - - void set_id(std::string id) { id_ = std::move(id); } - void set_cluster(std::string cluster) { cluster_ = std::move(cluster); } - void set_locality_region(std::string locality_region) { - locality_region_ = std::move(locality_region); - } - void set_locality_zone(std::string locality_zone) { - locality_zone_ = std::move(locality_zone); - } - void set_locality_sub_zone(std::string locality_sub_zone) { - locality_sub_zone_ = std::move(locality_sub_zone); - } - void set_metadata(Json::Object metadata) { - metadata_ = std::move(metadata); - } - - private: - std::string id_; - std::string cluster_; - std::string locality_region_; - std::string locality_zone_; - std::string locality_sub_zone_; - Json::Object metadata_; - }; - - class FakeXdsServer : public XdsServer { - public: - const std::string& server_uri() const override { return server_uri_; } - bool IgnoreResourceDeletion() const override { - return ignore_resource_deletion_; - } - bool Equals(const XdsServer& other) const override { - const auto& o = static_cast(other); - return server_uri_ == o.server_uri_ && - ignore_resource_deletion_ == o.ignore_resource_deletion_; - } - - void set_server_uri(std::string server_uri) { - server_uri_ = std::move(server_uri); - } - void set_ignore_resource_deletion(bool ignore_resource_deletion) { - ignore_resource_deletion_ = ignore_resource_deletion; - } - - private: - std::string server_uri_ = "default_xds_server"; - bool ignore_resource_deletion_ = false; - }; - - class FakeAuthority : public Authority { - public: - const XdsServer* server() const override { - return server_.has_value() ? &*server_ : nullptr; - } - - void set_server(absl::optional server) { - server_ = std::move(server); - } - - private: - absl::optional server_; - }; - - class Builder { - public: - Builder() { node_.emplace(); } - - Builder& set_node_id(std::string id) { - if (!node_.has_value()) node_.emplace(); - node_->set_id(std::move(id)); - return *this; - } - Builder& AddAuthority(std::string name, FakeAuthority authority) { - authorities_[std::move(name)] = std::move(authority); - return *this; - } - Builder& set_ignore_resource_deletion(bool ignore_resource_deletion) { - server_.set_ignore_resource_deletion(ignore_resource_deletion); - return *this; - } - std::unique_ptr Build() { - auto bootstrap = std::make_unique(); - bootstrap->server_ = std::move(server_); - bootstrap->node_ = std::move(node_); - bootstrap->authorities_ = std::move(authorities_); - return bootstrap; - } - - private: - FakeXdsServer server_; - absl::optional node_; - std::map authorities_; - }; - - std::string ToString() const override { return ""; } - - const XdsServer& server() const override { return server_; } - const Node* node() const override { - return node_.has_value() ? &*node_ : nullptr; - } - const Authority* LookupAuthority(const std::string& name) const override { - auto it = authorities_.find(name); - if (it == authorities_.end()) return nullptr; - return &it->second; - } - const XdsServer* FindXdsServer(const XdsServer& server) const override { - const auto& fake_server = static_cast(server); - if (fake_server == server_) return &server_; - for (const auto& p : authorities_) { - const auto* authority_server = - static_cast(p.second.server()); - if (authority_server != nullptr && *authority_server == fake_server) { - return authority_server; - } - } - return nullptr; - } - - private: - FakeXdsServer server_; - absl::optional node_; - std::map authorities_; - }; - - // A template for a test xDS resource type with an associated watcher impl. - // For simplicity, we use JSON instead of proto for serialization. - // - // The specified ResourceStruct must provide the following: - // - a static JsonLoader() method, as described in json_object_loader.h - // - an AsJsonString() method that returns the object in JSON string form - // - a static TypeUrl() method that returns the resource type - // - // The all_resources_required_in_sotw parameter indicates the value - // that should be returned by the AllResourcesRequiredInSotW() method. - template - class XdsTestResourceType - : public XdsResourceTypeImpl< - XdsTestResourceType, - ResourceStruct> { - public: - class ResourceAndReadDelayHandle { - public: - ResourceAndReadDelayHandle( - std::shared_ptr resource, - RefCountedPtr read_delay_handle) - : resource_(std::move(resource)), - read_delay_handle_(std::move(read_delay_handle)) {} - - absl::string_view name() const { return resource_->name; } - - int resource_value() const { return resource_->value; } - - std::shared_ptr resource() const { - return resource_; - } - - private: - std::shared_ptr resource_; - RefCountedPtr read_delay_handle_; - }; - - // A watcher implementation that queues delivered watches. - class Watcher : public XdsResourceTypeImpl< - XdsTestResourceType, - ResourceStruct>::WatcherInterface { - public: - // Returns true if no event is received during the timeout period. - bool ExpectNoEvent(absl::Duration timeout) { - MutexLock lock(&mu_); - return !WaitForEventLocked(timeout); - } - - bool HasEvent() { - MutexLock lock(&mu_); - return !queue_.empty(); - } - - absl::optional WaitForNextResourceAndHandle( - absl::Duration timeout = absl::Seconds(1), - SourceLocation location = SourceLocation()) { - MutexLock lock(&mu_); - if (!WaitForEventLocked(timeout)) return absl::nullopt; - Event& event = queue_.front(); - if (!absl::holds_alternative(event)) { - EXPECT_TRUE(false) - << "got unexpected event " - << (absl::holds_alternative(event) - ? "error" - : "does-not-exist") - << " at " << location.file() << ":" << location.line(); - return absl::nullopt; - } - auto foo = std::move(absl::get(event)); - queue_.pop_front(); - return foo; - } - - std::shared_ptr WaitForNextResource( - absl::Duration timeout = absl::Seconds(1), - SourceLocation location = SourceLocation()) { - auto resource_and_handle = - WaitForNextResourceAndHandle(timeout, location); - if (!resource_and_handle.has_value()) { - return nullptr; - } - return resource_and_handle->resource(); - } - - absl::optional WaitForNextError( - absl::Duration timeout = absl::Seconds(1), - SourceLocation location = SourceLocation()) { - MutexLock lock(&mu_); - if (!WaitForEventLocked(timeout)) return absl::nullopt; - Event& event = queue_.front(); - if (!absl::holds_alternative(event)) { - EXPECT_TRUE(false) - << "got unexpected event " - << (absl::holds_alternative(event) - ? "resource" - : "does-not-exist") - << " at " << location.file() << ":" << location.line(); - return absl::nullopt; - } - absl::Status error = std::move(absl::get(event)); - queue_.pop_front(); - return std::move(error); - } - - bool WaitForDoesNotExist(absl::Duration timeout, - SourceLocation location = SourceLocation()) { - MutexLock lock(&mu_); - if (!WaitForEventLocked(timeout)) return false; - Event& event = queue_.front(); - if (!absl::holds_alternative(event)) { - EXPECT_TRUE(false) - << "got unexpected event " - << (absl::holds_alternative(event) ? "error" - : "resource") - << " at " << location.file() << ":" << location.line(); - return false; - } - queue_.pop_front(); - return true; - } - - private: - struct DoesNotExist {}; - using Event = - absl::variant; - - void OnResourceChanged(std::shared_ptr foo, - RefCountedPtr - read_delay_handle) override { - MutexLock lock(&mu_); - queue_.emplace_back(ResourceAndReadDelayHandle( - std::move(foo), std::move(read_delay_handle))); - cv_.Signal(); - } - - void OnError( - absl::Status status, - RefCountedPtr /* read_delay_handle */) - override { - MutexLock lock(&mu_); - queue_.push_back(std::move(status)); - cv_.Signal(); - } - - void OnResourceDoesNotExist( - RefCountedPtr /* read_delay_handle */) - override { - MutexLock lock(&mu_); - queue_.push_back(DoesNotExist()); - cv_.Signal(); - } - - // Returns true if an event was received, or false if the timeout - // expires before any event is received. - bool WaitForEventLocked(absl::Duration timeout) - ABSL_EXCLUSIVE_LOCKS_REQUIRED(&mu_) { - while (queue_.empty()) { - if (cv_.WaitWithTimeout(&mu_, - timeout * grpc_test_slowdown_factor())) { - return false; - } - } - return true; - } - - Mutex mu_; - CondVar cv_; - std::deque queue_ ABSL_GUARDED_BY(&mu_); - }; - - absl::string_view type_url() const override { - return ResourceStruct::TypeUrl(); - } - XdsResourceType::DecodeResult Decode( - const XdsResourceType::DecodeContext& /*context*/, - absl::string_view serialized_resource) const override { - auto json = JsonParse(serialized_resource); - XdsResourceType::DecodeResult result; - if (!json.ok()) { - result.resource = json.status(); - } else { - absl::StatusOr foo = - LoadFromJson(*json); - if (!foo.ok()) { - auto it = json->object().find("name"); - if (it != json->object().end()) { - result.name = it->second.string(); - } - result.resource = foo.status(); - } else { - result.name = foo->name; - result.resource = std::make_unique(std::move(*foo)); - } - } - return result; - } - bool AllResourcesRequiredInSotW() const override { - return all_resources_required_in_sotw; - } - void InitUpbSymtab(XdsClient*, upb_DefPool* /*symtab*/) const override {} - - static google::protobuf::Any EncodeAsAny(const ResourceStruct& resource) { - google::protobuf::Any any; - any.set_type_url( - absl::StrCat("type.googleapis.com/", ResourceStruct::TypeUrl())); - any.set_value(resource.AsJsonString()); - return any; - } - }; - - // A fake "Foo" xDS resource type. - struct XdsFooResource : public XdsResourceType::ResourceData { - std::string name; - uint32_t value; - - XdsFooResource() = default; - XdsFooResource(std::string name, uint32_t value) - : name(std::move(name)), value(value) {} - - bool operator==(const XdsFooResource& other) const { - return name == other.name && value == other.value; - } - - std::string AsJsonString() const { - return absl::StrCat("{\"name\":\"", name, "\",\"value\":", value, "}"); - } - - static const JsonLoaderInterface* JsonLoader(const JsonArgs&) { - static const auto* loader = JsonObjectLoader() - .Field("name", &XdsFooResource::name) - .Field("value", &XdsFooResource::value) - .Finish(); - return loader; - } - - static absl::string_view TypeUrl() { return "test.v3.foo"; } - }; - using XdsFooResourceType = XdsTestResourceType; - - // A fake "Bar" xDS resource type. - struct XdsBarResource : public XdsResourceType::ResourceData { - std::string name; - std::string value; - - XdsBarResource() = default; - XdsBarResource(std::string name, std::string value) - : name(std::move(name)), value(std::move(value)) {} - - bool operator==(const XdsBarResource& other) const { - return name == other.name && value == other.value; - } - - std::string AsJsonString() const { - return absl::StrCat("{\"name\":\"", name, "\",\"value\":\"", value, - "\"}"); - } - - static const JsonLoaderInterface* JsonLoader(const JsonArgs&) { - static const auto* loader = JsonObjectLoader() - .Field("name", &XdsBarResource::name) - .Field("value", &XdsBarResource::value) - .Finish(); - return loader; - } - - static absl::string_view TypeUrl() { return "test.v3.bar"; } - }; - using XdsBarResourceType = XdsTestResourceType; - - // A fake "WildcardCapable" xDS resource type. - // This resource type return true for AllResourcesRequiredInSotW(), - // just like LDS and CDS. - struct XdsWildcardCapableResource : public XdsResourceType::ResourceData { - std::string name; - uint32_t value; - - XdsWildcardCapableResource() = default; - XdsWildcardCapableResource(std::string name, uint32_t value) - : name(std::move(name)), value(value) {} - - bool operator==(const XdsWildcardCapableResource& other) const { - return name == other.name && value == other.value; - } - - std::string AsJsonString() const { - return absl::StrCat("{\"name\":\"", name, "\",\"value\":\"", value, - "\"}"); - } - - static const JsonLoaderInterface* JsonLoader(const JsonArgs&) { - static const auto* loader = - JsonObjectLoader() - .Field("name", &XdsWildcardCapableResource::name) - .Field("value", &XdsWildcardCapableResource::value) - .Finish(); - return loader; - } - - static absl::string_view TypeUrl() { return "test.v3.wildcard_capable"; } - }; - using XdsWildcardCapableResourceType = - XdsTestResourceType; - - // A helper class to build and serialize a DiscoveryResponse. - class ResponseBuilder { - public: - explicit ResponseBuilder(absl::string_view type_url) { - response_.set_type_url(absl::StrCat("type.googleapis.com/", type_url)); - } - - ResponseBuilder& set_version_info(absl::string_view version_info) { - response_.set_version_info(std::string(version_info)); - return *this; - } - ResponseBuilder& set_nonce(absl::string_view nonce) { - response_.set_nonce(std::string(nonce)); - return *this; - } - - template - ResponseBuilder& AddResource( - const typename ResourceType::ResourceType& resource, - bool in_resource_wrapper = false) { - auto* res = response_.add_resources(); - *res = ResourceType::EncodeAsAny(resource); - if (in_resource_wrapper) { - envoy::service::discovery::v3::Resource resource_wrapper; - resource_wrapper.set_name(resource.name); - *resource_wrapper.mutable_resource() = std::move(*res); - res->PackFrom(resource_wrapper); - } - return *this; - } - - ResponseBuilder& AddFooResource(const XdsFooResource& resource, - bool in_resource_wrapper = false) { - return AddResource(resource, in_resource_wrapper); - } - - ResponseBuilder& AddBarResource(const XdsBarResource& resource, - bool in_resource_wrapper = false) { - return AddResource(resource, in_resource_wrapper); - } - - ResponseBuilder& AddWildcardCapableResource( - const XdsWildcardCapableResource& resource, - bool in_resource_wrapper = false) { - return AddResource(resource, - in_resource_wrapper); - } - - ResponseBuilder& AddInvalidResource( - absl::string_view type_url, absl::string_view value, - absl::string_view resource_wrapper_name = "") { - auto* res = response_.add_resources(); - res->set_type_url(absl::StrCat("type.googleapis.com/", type_url)); - res->set_value(std::string(value)); - if (!resource_wrapper_name.empty()) { - envoy::service::discovery::v3::Resource resource_wrapper; - resource_wrapper.set_name(std::string(resource_wrapper_name)); - *resource_wrapper.mutable_resource() = std::move(*res); - res->PackFrom(resource_wrapper); - } - return *this; - } - - ResponseBuilder& AddInvalidResourceWrapper() { - auto* res = response_.add_resources(); - res->set_type_url( - "type.googleapis.com/envoy.service.discovery.v3.Resource"); - res->set_value(std::string("\0", 1)); - return *this; - } - - ResponseBuilder& AddEmptyResource() { - response_.add_resources(); - return *this; - } - - std::string Serialize() { - std::string serialized_response; - EXPECT_TRUE(response_.SerializeToString(&serialized_response)); - return serialized_response; - } - - private: - DiscoveryResponse response_; - }; - - // Sets transport_factory_ and initializes xds_client_ with the - // specified bootstrap config. - void InitXdsClient( - FakeXdsBootstrap::Builder bootstrap_builder = FakeXdsBootstrap::Builder(), - Duration resource_request_timeout = Duration::Seconds(15)) { - auto transport_factory = MakeOrphanable(); - transport_factory_ = transport_factory->Ref(); - xds_client_ = MakeRefCounted( - bootstrap_builder.Build(), std::move(transport_factory), - grpc_event_engine::experimental::GetDefaultEventEngine(), "foo agent", - "foo version", resource_request_timeout * grpc_test_slowdown_factor()); - } - - // Starts and cancels a watch for a Foo resource. - RefCountedPtr StartFooWatch( - absl::string_view resource_name) { - auto watcher = MakeRefCounted(); - XdsFooResourceType::StartWatch(xds_client_.get(), resource_name, watcher); - return watcher; - } - void CancelFooWatch(XdsFooResourceType::Watcher* watcher, - absl::string_view resource_name, - bool delay_unsubscription = false) { - XdsFooResourceType::CancelWatch(xds_client_.get(), resource_name, watcher, - delay_unsubscription); - } - - // Starts and cancels a watch for a Bar resource. - RefCountedPtr StartBarWatch( - absl::string_view resource_name) { - auto watcher = MakeRefCounted(); - XdsBarResourceType::StartWatch(xds_client_.get(), resource_name, watcher); - return watcher; - } - void CancelBarWatch(XdsBarResourceType::Watcher* watcher, - absl::string_view resource_name, - bool delay_unsubscription = false) { - XdsBarResourceType::CancelWatch(xds_client_.get(), resource_name, watcher, - delay_unsubscription); - } - - // Starts and cancels a watch for a WildcardCapable resource. - RefCountedPtr - StartWildcardCapableWatch(absl::string_view resource_name) { - auto watcher = MakeRefCounted(); - XdsWildcardCapableResourceType::StartWatch(xds_client_.get(), resource_name, - watcher); - return watcher; - } - void CancelWildcardCapableWatch( - XdsWildcardCapableResourceType::Watcher* watcher, - absl::string_view resource_name, bool delay_unsubscription = false) { - XdsWildcardCapableResourceType::CancelWatch( - xds_client_.get(), resource_name, watcher, delay_unsubscription); - } - - RefCountedPtr WaitForAdsStream( - const XdsBootstrap::XdsServer& server, - absl::Duration timeout = absl::Seconds(5)) { - const auto* xds_server = xds_client_->bootstrap().FindXdsServer(server); - GPR_ASSERT(xds_server != nullptr); - return transport_factory_->WaitForStream( - *xds_server, FakeXdsTransportFactory::kAdsMethod, - timeout * grpc_test_slowdown_factor()); - } - - void TriggerConnectionFailure(const XdsBootstrap::XdsServer& server, - absl::Status status) { - const auto* xds_server = xds_client_->bootstrap().FindXdsServer(server); - GPR_ASSERT(xds_server != nullptr); - transport_factory_->TriggerConnectionFailure(*xds_server, - std::move(status)); - } - - RefCountedPtr WaitForAdsStream( - absl::Duration timeout = absl::Seconds(5)) { - return WaitForAdsStream(xds_client_->bootstrap().server(), timeout); - } - - // Gets the latest request sent to the fake xDS server. - absl::optional WaitForRequest( - FakeXdsTransportFactory::FakeStreamingCall* stream, - absl::Duration timeout = absl::Seconds(3), - SourceLocation location = SourceLocation()) { - auto message = - stream->WaitForMessageFromClient(timeout * grpc_test_slowdown_factor()); - if (!message.has_value()) return absl::nullopt; - DiscoveryRequest request; - bool success = request.ParseFromString(*message); - EXPECT_TRUE(success) << "Failed to deserialize DiscoveryRequest at " - << location.file() << ":" << location.line(); - if (!success) return absl::nullopt; - return std::move(request); - } - - // Helper function to check the fields of a DiscoveryRequest. - void CheckRequest(const DiscoveryRequest& request, absl::string_view type_url, - absl::string_view version_info, - absl::string_view response_nonce, - const absl::Status& error_detail, - const std::set& resource_names, - SourceLocation location = SourceLocation()) { - EXPECT_EQ(request.type_url(), - absl::StrCat("type.googleapis.com/", type_url)) - << location.file() << ":" << location.line(); - EXPECT_EQ(request.version_info(), version_info) - << location.file() << ":" << location.line(); - EXPECT_EQ(request.response_nonce(), response_nonce) - << location.file() << ":" << location.line(); - if (error_detail.ok()) { - EXPECT_FALSE(request.has_error_detail()) - << location.file() << ":" << location.line(); - } else { - EXPECT_EQ(request.error_detail().code(), - static_cast(error_detail.code())) - << location.file() << ":" << location.line(); - EXPECT_EQ(request.error_detail().message(), error_detail.message()) - << location.file() << ":" << location.line(); - } - EXPECT_THAT(request.resource_names(), - ::testing::UnorderedElementsAreArray(resource_names)) - << location.file() << ":" << location.line(); - } - - // Helper function to check the contents of the node message in a - // request against the client's node info. - void CheckRequestNode(const DiscoveryRequest& request, - SourceLocation location = SourceLocation()) { - // These fields come from the bootstrap config. - EXPECT_EQ(request.node().id(), xds_client_->bootstrap().node()->id()) - << location.file() << ":" << location.line(); - EXPECT_EQ(request.node().cluster(), - xds_client_->bootstrap().node()->cluster()) - << location.file() << ":" << location.line(); - EXPECT_EQ(request.node().locality().region(), - xds_client_->bootstrap().node()->locality_region()) - << location.file() << ":" << location.line(); - EXPECT_EQ(request.node().locality().zone(), - xds_client_->bootstrap().node()->locality_zone()) - << location.file() << ":" << location.line(); - EXPECT_EQ(request.node().locality().sub_zone(), - xds_client_->bootstrap().node()->locality_sub_zone()) - << location.file() << ":" << location.line(); - if (xds_client_->bootstrap().node()->metadata().empty()) { - EXPECT_FALSE(request.node().has_metadata()) - << location.file() << ":" << location.line(); - } else { - std::string metadata_json_str; - auto status = - MessageToJsonString(request.node().metadata(), &metadata_json_str, - GRPC_CUSTOM_JSONUTIL::JsonPrintOptions()); - ASSERT_TRUE(status.ok()) - << status << " on " << location.file() << ":" << location.line(); - auto metadata_json = JsonParse(metadata_json_str); - ASSERT_TRUE(metadata_json.ok()) - << metadata_json.status() << " on " << location.file() << ":" - << location.line(); - Json expected = - Json::FromObject(xds_client_->bootstrap().node()->metadata()); - EXPECT_EQ(*metadata_json, expected) - << location.file() << ":" << location.line() - << ":\nexpected: " << JsonDump(expected) - << "\nactual: " << JsonDump(*metadata_json); - } - EXPECT_EQ(request.node().user_agent_name(), "foo agent") - << location.file() << ":" << location.line(); - EXPECT_EQ(request.node().user_agent_version(), "foo version") - << location.file() << ":" << location.line(); - } - - RefCountedPtr transport_factory_; - RefCountedPtr xds_client_; -}; - -} // namespace testing -} // namespace grpc_core - -#endif // GRPC_TEST_CORE_XDS_XDS_CLIENT_TEST_LIB_H \ No newline at end of file diff --git a/test/core/xds/xds_transport_fake.cc b/test/core/xds/xds_transport_fake.cc index d6d794509cc01..ac141c7e0eff3 100644 --- a/test/core/xds/xds_transport_fake.cc +++ b/test/core/xds/xds_transport_fake.cc @@ -25,8 +25,6 @@ #include #include -#include "gtest/gtest.h" - #include #include @@ -50,6 +48,10 @@ FakeXdsTransportFactory::FakeStreamingCall::~FakeStreamingCall() { { MutexLock lock(&mu_); if (transport_->abort_on_undrained_messages()) { + for (const auto& message : from_client_messages_) { + gpr_log(GPR_ERROR, "From client message left in queue: %s", + message.c_str()); + } GPR_ASSERT(from_client_messages_.empty()); } } @@ -126,41 +128,48 @@ void FakeXdsTransportFactory::FakeStreamingCall::CompleteSendMessageFromClient( void FakeXdsTransportFactory::FakeStreamingCall::StartRecvMessage() { absl::optional pending; - ReleasableMutexLock lock(&mu_); - ASSERT_FALSE(read_pending_); - read_count_ += 1; + MutexLock lock(&mu_); + if (read_pending_) { + gpr_log(GPR_ERROR, + "StartRecvMessage had been called while there is already a pending " + "read request"); + return; + } + ++reads_started_; read_pending_ = true; if (!to_client_messages_.empty()) { // Dispatch pending message (if there's one) on a separate thread to avoid // recursion - std::thread thread( - [](const RefCountedPtr& call) { - call->DispatchPendingMessageToClient(); - }, - Ref()); - thread.detach(); + GetDefaultEventEngine()->Run( + [call = static_cast>(Ref())]() { + call->MaybeDeliverMessageToClient(); + }); } } void FakeXdsTransportFactory::FakeStreamingCall::SendMessageToClient( absl::string_view payload) { - ReleasableMutexLock lock(&mu_); - to_client_messages_.emplace_back(payload); - lock.Release(); - DispatchPendingMessageToClient(); + { + MutexLock lock(&mu_); + to_client_messages_.emplace_back(payload); + } + MaybeDeliverMessageToClient(); } -void FakeXdsTransportFactory::FakeStreamingCall:: - DispatchPendingMessageToClient() { - ReleasableMutexLock lock(&mu_); - if (!read_pending_ || to_client_messages_.empty()) { - return; +void FakeXdsTransportFactory::FakeStreamingCall::MaybeDeliverMessageToClient() { + RefCountedPtr event_handler; + std::string message; + { + ReleasableMutexLock lock(&mu_); + if (!read_pending_ || to_client_messages_.empty()) { + return; + } + read_pending_ = false; + message = std::move(to_client_messages_.front()); + to_client_messages_.pop_front(); + event_handler = event_handler_; + lock.Release(); } - read_pending_ = false; - std::string message = std::move(to_client_messages_.front()); - to_client_messages_.pop_front(); - auto event_handler = event_handler_; - lock.Release(); ExecCtx exec_ctx; event_handler->OnRecvMessage(message); } diff --git a/test/core/xds/xds_transport_fake.h b/test/core/xds/xds_transport_fake.h index 666970cf1e811..a1a9e8d85dbd0 100644 --- a/test/core/xds/xds_transport_fake.h +++ b/test/core/xds/xds_transport_fake.h @@ -88,9 +88,9 @@ class FakeXdsTransportFactory : public XdsTransportFactory { bool Orphaned(); - size_t read_count() { + size_t reads_started() { MutexLock lock(&mu_); - return read_count_; + return reads_started_; } private: @@ -116,7 +116,7 @@ class FakeXdsTransportFactory : public XdsTransportFactory { void CompleteSendMessageFromClientLocked(bool ok) ABSL_EXCLUSIVE_LOCKS_REQUIRED(&mu_); - void DispatchPendingMessageToClient(); + void MaybeDeliverMessageToClient(); RefCountedPtr transport_; const char* method_; @@ -127,7 +127,7 @@ class FakeXdsTransportFactory : public XdsTransportFactory { std::deque from_client_messages_ ABSL_GUARDED_BY(&mu_); bool status_sent_ ABSL_GUARDED_BY(&mu_) = false; bool orphaned_ ABSL_GUARDED_BY(&mu_) = false; - size_t read_count_ ABSL_GUARDED_BY(&mu_) = 0; + size_t reads_started_ ABSL_GUARDED_BY(&mu_) = 0; bool read_pending_ ABSL_GUARDED_BY(&mu_) = false; std::deque to_client_messages_ ABSL_GUARDED_BY(&mu_); }; @@ -143,7 +143,7 @@ class FakeXdsTransportFactory : public XdsTransportFactory { // EventHandler::OnRequestSent() upon reading a request from the client. // If this is set to false, that behavior will be inhibited, and // EventHandler::OnRequestSent() will not be called until the test - // expicitly calls FakeStreamingCall::CompleteSendMessageFromClient(). + // explicitly calls FakeStreamingCall::CompleteSendMessageFromClient(). // // This value affects all transports created after this call is // complete. Any transport that already exists prior to this call diff --git a/test/cpp/end2end/xds/BUILD b/test/cpp/end2end/xds/BUILD index 71674936d4dd8..5db35f4da1b68 100644 --- a/test/cpp/end2end/xds/BUILD +++ b/test/cpp/end2end/xds/BUILD @@ -109,7 +109,7 @@ grpc_cc_test( external_deps = [ "gtest", ], - # flaky = True, # TODO(b/144705388) + flaky = True, # TODO(b/144705388) linkstatic = True, # Fixes dyld error on MacOS shard_count = 50, tags = [