From 34242906c2968dde41a942f0fc0b488fc0ea23a6 Mon Sep 17 00:00:00 2001 From: Easwar Swaminathan Date: Thu, 2 Mar 2023 17:38:02 -0800 Subject: [PATCH 1/2] xdsclient: send node_proto only on first request on the stream --- xds/internal/testutils/fakeserver/server.go | 18 +- xds/internal/testutils/resource_watcher.go | 64 +++++++ xds/internal/xdsclient/authority_test.go | 52 ++---- xds/internal/xdsclient/loadreport_test.go | 4 +- .../xdsclient/tests/misc_watchers_test.go | 167 ++++++++++++++++++ .../xdsclient/tests/resource_update_test.go | 2 +- xds/internal/xdsclient/transport/transport.go | 30 +++- .../transport/transport_resource_test.go | 2 +- 8 files changed, 285 insertions(+), 54 deletions(-) create mode 100644 xds/internal/testutils/resource_watcher.go diff --git a/xds/internal/testutils/fakeserver/server.go b/xds/internal/testutils/fakeserver/server.go index 8030f923428c..48c2e0b8ef4c 100644 --- a/xds/internal/testutils/fakeserver/server.go +++ b/xds/internal/testutils/fakeserver/server.go @@ -114,13 +114,17 @@ func (wl *wrappedListener) Accept() (net.Conn, error) { return c, err } -// StartServer makes a new Server and gets it to start listening on a local -// port for gRPC requests. The returned cancel function should be invoked by -// the caller upon completion of the test. -func StartServer() (*Server, func(), error) { - lis, err := net.Listen("tcp", "localhost:0") - if err != nil { - return nil, func() {}, fmt.Errorf("net.Listen() failed: %v", err) +// StartServer makes a new Server and gets it to start listening on the given +// net.Listener. If the given net.Listener is nil, a new one is created on a +// local port for gRPC requests. The returned cancel function should be invoked +// by the caller upon completion of the test. +func StartServer(lis net.Listener) (*Server, func(), error) { + if lis == nil { + var err error + lis, err = net.Listen("tcp", "localhost:0") + if err != nil { + return nil, func() {}, fmt.Errorf("net.Listen() failed: %v", err) + } } s := &Server{ diff --git a/xds/internal/testutils/resource_watcher.go b/xds/internal/testutils/resource_watcher.go new file mode 100644 index 000000000000..01235b0ba66f --- /dev/null +++ b/xds/internal/testutils/resource_watcher.go @@ -0,0 +1,64 @@ +/* + * + * Copyright 2023 gRPC authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package testutils + +import "google.golang.org/grpc/xds/internal/xdsclient/xdsresource" + +// TestResourceWatcher implements the xdsresource.ResourceWatcher interface, +// used to receive updates on watches registered with the xDS client, when using +// the resource-type agnostic WatchResource API. +// +// Tests can the channels provided by this tyep to get access to updates and +// errors sent by the xDS client. +type TestResourceWatcher struct { + // UpdateCh is the channel on which xDS client updates are delivered. + UpdateCh chan *xdsresource.ResourceData + // ErrorCh is the channel on which errors from the xDS client are delivered. + ErrorCh chan error +} + +// OnUpdate is invoked by the xDS client to report an update on the resource +// being watched. +func (w *TestResourceWatcher) OnUpdate(data xdsresource.ResourceData) { + select { + case w.UpdateCh <- &data: + default: + } +} + +// OnError is invoked by the xDS client to report errors. +func (w *TestResourceWatcher) OnError(err error) { + select { + case w.ErrorCh <- err: + default: + } +} + +// OnResourceDoesNotExist is used by the xDS client to report that the resource +// being watched no longer exists. +func (w *TestResourceWatcher) OnResourceDoesNotExist() {} + +// NewTestResourceWatcher returns a TestResourceWatcher to watch for resources +// via the xDS client. +func NewTestResourceWatcher() *TestResourceWatcher { + return &TestResourceWatcher{ + UpdateCh: make(chan *xdsresource.ResourceData), + ErrorCh: make(chan error), + } +} diff --git a/xds/internal/xdsclient/authority_test.go b/xds/internal/xdsclient/authority_test.go index fa2661fcfd8d..4e8ea305921b 100644 --- a/xds/internal/xdsclient/authority_test.go +++ b/xds/internal/xdsclient/authority_test.go @@ -15,6 +15,7 @@ * limitations under the License. * */ + package xdsclient import ( @@ -32,6 +33,7 @@ import ( "google.golang.org/grpc/internal/testutils/xds/e2e" "google.golang.org/grpc/xds/internal" _ "google.golang.org/grpc/xds/internal/httpfilter/router" + "google.golang.org/grpc/xds/internal/testutils" "google.golang.org/grpc/xds/internal/xdsclient/bootstrap" "google.golang.org/grpc/xds/internal/xdsclient/xdsresource" "google.golang.org/grpc/xds/internal/xdsclient/xdsresource/version" @@ -39,34 +41,6 @@ import ( var emptyServerOpts = e2e.ManagementServerOptions{} -type testResourceWatcher struct { - updateCh chan *xdsresource.ResourceData - errorCh chan error -} - -func (w *testResourceWatcher) OnUpdate(data xdsresource.ResourceData) { - select { - case w.updateCh <- &data: - default: - } -} - -func (w *testResourceWatcher) OnError(err error) { - select { - case w.errorCh <- err: - default: - } -} - -func (w *testResourceWatcher) OnResourceDoesNotExist() {} - -func newTestResourceWatcher() *testResourceWatcher { - return &testResourceWatcher{ - updateCh: make(chan *xdsresource.ResourceData), - errorCh: make(chan error), - } -} - var ( // Listener resource type implementation retrieved from the resource type map // in the internal package, which is initialized when the individual resource @@ -131,7 +105,7 @@ func (s) TestTimerAndWatchStateOnSendCallback(t *testing.T) { defer a.close() rn := "xdsclient-test-lds-resource" - w := newTestResourceWatcher() + w := testutils.NewTestResourceWatcher() cancelResource := a.watchResource(listenerResourceType, rn, w) defer cancelResource() @@ -155,9 +129,9 @@ func (s) TestTimerAndWatchStateOnSendCallback(t *testing.T) { select { case <-ctx.Done(): t.Fatal("Test timed out before watcher received an update from server.") - case err := <-w.errorCh: + case err := <-w.ErrorCh: t.Fatalf("Watch got an unexpected error update: %q. Want valid updates.", err) - case <-w.updateCh: + case <-w.UpdateCh: // This means the OnUpdate callback was invoked and the watcher was notified. } if err := compareWatchState(a, rn, watchStateReceived); err != nil { @@ -176,7 +150,7 @@ func (s) TestTimerAndWatchStateOnErrorCallback(t *testing.T) { defer a.close() rn := "xdsclient-test-lds-resource" - w := newTestResourceWatcher() + w := testutils.NewTestResourceWatcher() cancelResource := a.watchResource(listenerResourceType, rn, w) defer cancelResource() @@ -188,7 +162,7 @@ func (s) TestTimerAndWatchStateOnErrorCallback(t *testing.T) { select { case <-ctx.Done(): t.Fatal("Test timed out before verifying error propagation.") - case err := <-w.errorCh: + case err := <-w.ErrorCh: if xdsresource.ErrType(err) != xdsresource.ErrorTypeConnection { t.Fatal("Connection error not propagated to watchers.") } @@ -219,7 +193,7 @@ func (s) TestWatchResourceTimerCanRestartOnIgnoredADSRecvError(t *testing.T) { defer a.close() nameA := "xdsclient-test-lds-resourceA" - watcherA := newTestResourceWatcher() + watcherA := testutils.NewTestResourceWatcher() cancelA := a.watchResource(listenerResourceType, nameA, watcherA) if err := updateResourceInServer(ctx, ms, nameA, nodeID); err != nil { @@ -231,13 +205,13 @@ func (s) TestWatchResourceTimerCanRestartOnIgnoredADSRecvError(t *testing.T) { select { case <-ctx.Done(): t.Fatal("Test timed out before watcher received the update.") - case err := <-watcherA.errorCh: + case err := <-watcherA.ErrorCh: t.Fatalf("Watch got an unexpected error update: %q; want: valid update.", err) - case <-watcherA.updateCh: + case <-watcherA.UpdateCh: } nameB := "xdsclient-test-lds-resourceB" - watcherB := newTestResourceWatcher() + watcherB := testutils.NewTestResourceWatcher() cancelB := a.watchResource(listenerResourceType, nameB, watcherB) defer cancelB() @@ -249,9 +223,9 @@ func (s) TestWatchResourceTimerCanRestartOnIgnoredADSRecvError(t *testing.T) { select { case <-ctx.Done(): t.Fatal("Test timed out before mgmt server got the request.") - case u := <-watcherB.updateCh: + case u := <-watcherB.UpdateCh: t.Fatalf("Watch got an unexpected resource update: %v.", u) - case gotErr := <-watcherB.errorCh: + case gotErr := <-watcherB.ErrorCh: wantErr := xdsresource.ErrorTypeConnection if xdsresource.ErrType(gotErr) != wantErr { t.Fatalf("Watch got an unexpected error:%q. Want: %q.", gotErr, wantErr) diff --git a/xds/internal/xdsclient/loadreport_test.go b/xds/internal/xdsclient/loadreport_test.go index 23fb2d4cf46c..92450ce747e0 100644 --- a/xds/internal/xdsclient/loadreport_test.go +++ b/xds/internal/xdsclient/loadreport_test.go @@ -43,7 +43,7 @@ const ( ) func (s) TestLRSClient(t *testing.T) { - fs, sCleanup, err := fakeserver.StartServer() + fs, sCleanup, err := fakeserver.StartServer(nil) if err != nil { t.Fatalf("failed to start fake xDS server: %v", err) } @@ -83,7 +83,7 @@ func (s) TestLRSClient(t *testing.T) { t.Errorf("unexpected NewConn: %v, %v, want channel recv timeout", u, err) } - fs2, sCleanup2, err := fakeserver.StartServer() + fs2, sCleanup2, err := fakeserver.StartServer(nil) if err != nil { t.Fatalf("failed to start fake xDS server: %v", err) } diff --git a/xds/internal/xdsclient/tests/misc_watchers_test.go b/xds/internal/xdsclient/tests/misc_watchers_test.go index 1b67f547e93c..bff510a9d10f 100644 --- a/xds/internal/xdsclient/tests/misc_watchers_test.go +++ b/xds/internal/xdsclient/tests/misc_watchers_test.go @@ -20,14 +20,31 @@ package xdsclient_test import ( "context" + "fmt" "testing" + "github.com/google/uuid" "google.golang.org/grpc/internal/testutils" + "google.golang.org/grpc/internal/testutils/xds/bootstrap" "google.golang.org/grpc/internal/testutils/xds/e2e" + "google.golang.org/grpc/xds/internal" + xdstestutils "google.golang.org/grpc/xds/internal/testutils" + "google.golang.org/grpc/xds/internal/testutils/fakeserver" "google.golang.org/grpc/xds/internal/xdsclient" "google.golang.org/grpc/xds/internal/xdsclient/xdsresource" + "google.golang.org/grpc/xds/internal/xdsclient/xdsresource/version" + "google.golang.org/protobuf/types/known/anypb" v3routepb "github.com/envoyproxy/go-control-plane/envoy/config/route/v3" + v3discoverypb "github.com/envoyproxy/go-control-plane/envoy/service/discovery/v3" +) + +var ( + // Resource type implementations retrieved from the resource type map in the + // internal package, which is initialized when the individual resource types + // are created. + listenerResourceType = internal.ResourceTypeMapForTesting[version.V3ListenerURL].(xdsresource.Type) + routeConfigResourceType = internal.ResourceTypeMapForTesting[version.V3RouteConfigURL].(xdsresource.Type) ) // TestWatchCallAnotherWatch tests the scenario where a watch is registered for @@ -129,3 +146,153 @@ func (s) TestWatchCallAnotherWatch(t *testing.T) { t.Fatal(err) } } + +// TestNodeProtoSentOnlyInFirstRequest verifies that a non-empty node proto gets +// sent only on the first discovery request message on the ADS stream. +// +// It also verifies the same behavior holds after a stream restart. +func (s) TestNodeProtoSentOnlyInFirstRequest(t *testing.T) { + overrideFedEnvVar(t) + + // Create a restartable listener which can close existing connections. + l, err := testutils.LocalTCPListener() + if err != nil { + t.Fatalf("testutils.LocalTCPListener() failed: %v", err) + } + lis := testutils.NewRestartableListener(l) + + // Start a fake xDS management server with the above restartable listener. + // + // We are unable to use the go-control-plane server here, because it caches + // the node proto received in the first request message and adds it to + // subsequent requests before invoking the OnStreamRequest() callback. + // Therefore we cannot verify what is sent by the xDS client. + mgmtServer, cleanup, err := fakeserver.StartServer(lis) + if err != nil { + t.Fatalf("Failed to start fake xDS server: %v", err) + } + defer cleanup() + + // Create a bootstrap file in a temporary directory. + nodeID := uuid.New().String() + bootstrapContents, err := bootstrap.Contents(bootstrap.Options{ + NodeID: nodeID, + ServerURI: mgmtServer.Address, + }) + if err != nil { + t.Fatalf("Failed to create bootstrap file: %v", err) + } + + // Create an xDS client with the above bootstrap contents. + client, close, err := xdsclient.NewWithBootstrapContentsForTesting(bootstrapContents) + if err != nil { + t.Fatalf("Failed to create xDS client: %v", err) + } + defer close() + + // Configure a listener resource on the fake xDS server. + const ( + serviceName = "my-service-client-side-xds" + routeConfigName = "route-" + serviceName + clusterName = "cluster-" + serviceName + ) + lisAny, err := anypb.New(e2e.DefaultClientListener(serviceName, routeConfigName)) + if err != nil { + t.Fatalf("Failed to marshal listener resource into an Any proto: %v", err) + } + mgmtServer.XDSResponseChan <- &fakeserver.Response{ + Resp: &v3discoverypb.DiscoveryResponse{ + TypeUrl: "type.googleapis.com/envoy.config.listener.v3.Listener", + VersionInfo: "1", + Resources: []*anypb.Any{lisAny}, + }, + } + + // Register a watch for the Listener resource. + ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout) + defer cancel() + watcher := xdstestutils.NewTestResourceWatcher() + client.WatchResource(listenerResourceType, serviceName, watcher) + + // The first request on the stream must contain a non-empty node proto. + if err := readDiscoveryResponseAndCheckForNodeProto(ctx, mgmtServer.XDSRequestChan, false); err != nil { + t.Fatal(err) + } + + // The xDS client is expected to ACK the Listener resource. The discovery + // request corresponding to the ACK must contain a nil node proto. + if err := readDiscoveryResponseAndCheckForNodeProto(ctx, mgmtServer.XDSRequestChan, true); err != nil { + t.Fatal(err) + } + + // Configure the route configuration resource on the fake xDS server. + rcAny, err := anypb.New(e2e.DefaultRouteConfig(routeConfigName, serviceName, clusterName)) + if err != nil { + t.Fatalf("Failed to marshal route configuration resource into an Any proto: %v", err) + } + mgmtServer.XDSResponseChan <- &fakeserver.Response{ + Resp: &v3discoverypb.DiscoveryResponse{ + TypeUrl: "type.googleapis.com/envoy.config.route.v3.RouteConfiguration", + VersionInfo: "1", + Resources: []*anypb.Any{rcAny}, + }, + } + + // Register a watch for a RouteConfiguration resource. Ensure that the + // discovery requests for the route configuration resource and the + // subsequent ACK contains an empty node proto. + client.WatchResource(routeConfigResourceType, routeConfigName, watcher) + if err := readDiscoveryResponseAndCheckForNodeProto(ctx, mgmtServer.XDSRequestChan, true); err != nil { + t.Fatal(err) + } + if err := readDiscoveryResponseAndCheckForNodeProto(ctx, mgmtServer.XDSRequestChan, true); err != nil { + t.Fatal(err) + } + + // Stop the management server and expect the error callback to be invoked. + lis.Stop() + select { + case <-ctx.Done(): + t.Fatal("Timeout when waiting for the connection error to be propagated to the watcher") + case <-watcher.ErrorCh: + } + + // Restart the management server. + lis.Restart() + + // The xDS client is expected to re-request previously requested resources. + // Hence, we expect two DiscoveryRequest messages (one for the Listener and + // one for the RouteConfiguration resource). The first message should + // contain a non-nil node proto and second one should contain a nil-proto. + // + // And since we don't push any responses on the response channel of the fake + // server, we do not expect any ACKs here. + if err := readDiscoveryResponseAndCheckForNodeProto(ctx, mgmtServer.XDSRequestChan, false); err != nil { + t.Fatal(err) + } + + // The xDS client is expected to ACK the Listener resource. The discovery + // request corresponding to the ACK must contain a nil node proto. + if err := readDiscoveryResponseAndCheckForNodeProto(ctx, mgmtServer.XDSRequestChan, true); err != nil { + t.Fatal(err) + } +} + +// readDiscoveryResponseAndCheckForNodeProto reads a discovery request message +// out of the provided reqCh. It returns an error if it fails to read a message +// before the context deadline expires. +// +// wantEmptyNodeProto indicates whether the request message is expected to +// contain an empty node proto. This function returns an error if that is not +// the case. +func readDiscoveryResponseAndCheckForNodeProto(ctx context.Context, reqCh *testutils.Channel, wantEmptyNodeProto bool) error { + v, err := reqCh.Receive(ctx) + if err != nil { + return fmt.Errorf("Timeout when waiting for a DiscoveryRequest message") + } + req := v.(*fakeserver.Request).Req.(*v3discoverypb.DiscoveryRequest) + if gotEmptyNodeProto := req.GetNode() == nil; gotEmptyNodeProto != wantEmptyNodeProto { + return fmt.Errorf("Node proto received in DiscoveryRequest message is %v, want empty node proto is %v", req.GetNode(), wantEmptyNodeProto) + } + return nil +} diff --git a/xds/internal/xdsclient/tests/resource_update_test.go b/xds/internal/xdsclient/tests/resource_update_test.go index 70496f54d4c3..31ae8eda8d1a 100644 --- a/xds/internal/xdsclient/tests/resource_update_test.go +++ b/xds/internal/xdsclient/tests/resource_update_test.go @@ -57,7 +57,7 @@ import ( // cleanup function to close the fake server. func startFakeManagementServer(t *testing.T) (*fakeserver.Server, func()) { t.Helper() - fs, sCleanup, err := fakeserver.StartServer() + fs, sCleanup, err := fakeserver.StartServer(nil) if err != nil { t.Fatalf("Failed to start fake xDS server: %v", err) } diff --git a/xds/internal/xdsclient/transport/transport.go b/xds/internal/xdsclient/transport/transport.go index 6dea512abf94..0ef46b8548d0 100644 --- a/xds/internal/xdsclient/transport/transport.go +++ b/xds/internal/xdsclient/transport/transport.go @@ -277,14 +277,16 @@ type ResourceSendInfo struct { URL string } -func (t *Transport) sendAggregatedDiscoveryServiceRequest(stream adsStream, resourceNames []string, resourceURL, version, nonce string, nackErr error) error { +func (t *Transport) sendAggregatedDiscoveryServiceRequest(stream adsStream, sendNodeProto bool, resourceNames []string, resourceURL, version, nonce string, nackErr error) error { req := &v3discoverypb.DiscoveryRequest{ - Node: t.nodeProto, TypeUrl: resourceURL, ResourceNames: resourceNames, VersionInfo: version, ResponseNonce: nonce, } + if sendNodeProto { + req.Node = t.nodeProto + } if nackErr != nil { req.ErrorDetail = &statuspb.Status{ Code: int32(codes.InvalidArgument), Message: nackErr.Error(), @@ -372,16 +374,32 @@ func (t *Transport) adsRunner(ctx context.Context) { // there are new streams) and the appropriate request is sent out. func (t *Transport) send(ctx context.Context) { var stream adsStream + // The xDS protocol only requires that we send the node proto in the first + // discovery request on every stream. Sending the node proto in every + // request message wastes CPU resources on the client and the server. + sendNodeProto := true for { select { case <-ctx.Done(): return case stream = <-t.adsStreamCh: + // We have a new stream and we've to ensure that the node proto gets + // sent out in the first request on the stream. At this point, we + // might not have any registered watches. Setting this field to true + // here will ensure that the node proto gets sent out along with the + // discovery request when the first watch is registered. + if len(t.resources) == 0 { + sendNodeProto = true + continue + } + if !t.sendExisting(stream) { // Send failed, clear the current stream. Attempt to resend will // only be made after a new stream is created. stream = nil + continue } + sendNodeProto = false case u := <-t.adsRequestCh.Get(): t.adsRequestCh.Load() @@ -408,11 +426,12 @@ func (t *Transport) send(ctx context.Context) { // sending response back). continue } - if err := t.sendAggregatedDiscoveryServiceRequest(stream, resources, url, version, nonce, nackErr); err != nil { + if err := t.sendAggregatedDiscoveryServiceRequest(stream, sendNodeProto, resources, url, version, nonce, nackErr); err != nil { t.logger.Warningf("Sending ADS request for resources: %q, url: %q, version: %q, nonce: %q failed: %v", resources, url, version, nonce, err) // Send failed, clear the current stream. stream = nil } + sendNodeProto = false } } } @@ -440,11 +459,14 @@ func (t *Transport) sendExisting(stream adsStream) bool { // seen by the client on the previous stream t.nonces = make(map[string]string) + // Send node proto only in the first request on the stream. + sendNodeProto := true for url, resources := range t.resources { - if err := t.sendAggregatedDiscoveryServiceRequest(stream, mapToSlice(resources), url, t.versions[url], "", nil); err != nil { + if err := t.sendAggregatedDiscoveryServiceRequest(stream, sendNodeProto, mapToSlice(resources), url, t.versions[url], "", nil); err != nil { t.logger.Warningf("Sending ADS request for resources: %q, url: %q, version: %q, nonce: %q failed: %v", resources, url, t.versions[url], "", err) return false } + sendNodeProto = false } return true diff --git a/xds/internal/xdsclient/transport/transport_resource_test.go b/xds/internal/xdsclient/transport/transport_resource_test.go index ed07e999fc7d..ceb5a7f67bf4 100644 --- a/xds/internal/xdsclient/transport/transport_resource_test.go +++ b/xds/internal/xdsclient/transport/transport_resource_test.go @@ -60,7 +60,7 @@ const ( // cleanup function to close the fake server. func startFakeManagementServer(t *testing.T) (*fakeserver.Server, func()) { t.Helper() - fs, sCleanup, err := fakeserver.StartServer() + fs, sCleanup, err := fakeserver.StartServer(nil) if err != nil { t.Fatalf("Failed to start fake xDS server: %v", err) } From cabd16edc3e915dbefce8b1489ed1c5ef64bb3d9 Mon Sep 17 00:00:00 2001 From: Easwar Swaminathan Date: Fri, 3 Mar 2023 08:43:47 -0800 Subject: [PATCH 2/2] split test helper routines --- .../xdsclient/tests/misc_watchers_test.go | 45 ++++++++++++------- 1 file changed, 29 insertions(+), 16 deletions(-) diff --git a/xds/internal/xdsclient/tests/misc_watchers_test.go b/xds/internal/xdsclient/tests/misc_watchers_test.go index bff510a9d10f..0d09a921f08b 100644 --- a/xds/internal/xdsclient/tests/misc_watchers_test.go +++ b/xds/internal/xdsclient/tests/misc_watchers_test.go @@ -215,13 +215,13 @@ func (s) TestNodeProtoSentOnlyInFirstRequest(t *testing.T) { client.WatchResource(listenerResourceType, serviceName, watcher) // The first request on the stream must contain a non-empty node proto. - if err := readDiscoveryResponseAndCheckForNodeProto(ctx, mgmtServer.XDSRequestChan, false); err != nil { + if err := readDiscoveryResponseAndCheckForNonEmptyNodeProto(ctx, mgmtServer.XDSRequestChan); err != nil { t.Fatal(err) } // The xDS client is expected to ACK the Listener resource. The discovery // request corresponding to the ACK must contain a nil node proto. - if err := readDiscoveryResponseAndCheckForNodeProto(ctx, mgmtServer.XDSRequestChan, true); err != nil { + if err := readDiscoveryResponseAndCheckForEmptyNodeProto(ctx, mgmtServer.XDSRequestChan); err != nil { t.Fatal(err) } @@ -242,10 +242,10 @@ func (s) TestNodeProtoSentOnlyInFirstRequest(t *testing.T) { // discovery requests for the route configuration resource and the // subsequent ACK contains an empty node proto. client.WatchResource(routeConfigResourceType, routeConfigName, watcher) - if err := readDiscoveryResponseAndCheckForNodeProto(ctx, mgmtServer.XDSRequestChan, true); err != nil { + if err := readDiscoveryResponseAndCheckForEmptyNodeProto(ctx, mgmtServer.XDSRequestChan); err != nil { t.Fatal(err) } - if err := readDiscoveryResponseAndCheckForNodeProto(ctx, mgmtServer.XDSRequestChan, true); err != nil { + if err := readDiscoveryResponseAndCheckForEmptyNodeProto(ctx, mgmtServer.XDSRequestChan); err != nil { t.Fatal(err) } @@ -267,32 +267,45 @@ func (s) TestNodeProtoSentOnlyInFirstRequest(t *testing.T) { // // And since we don't push any responses on the response channel of the fake // server, we do not expect any ACKs here. - if err := readDiscoveryResponseAndCheckForNodeProto(ctx, mgmtServer.XDSRequestChan, false); err != nil { + if err := readDiscoveryResponseAndCheckForNonEmptyNodeProto(ctx, mgmtServer.XDSRequestChan); err != nil { t.Fatal(err) } // The xDS client is expected to ACK the Listener resource. The discovery // request corresponding to the ACK must contain a nil node proto. - if err := readDiscoveryResponseAndCheckForNodeProto(ctx, mgmtServer.XDSRequestChan, true); err != nil { + if err := readDiscoveryResponseAndCheckForEmptyNodeProto(ctx, mgmtServer.XDSRequestChan); err != nil { t.Fatal(err) } } -// readDiscoveryResponseAndCheckForNodeProto reads a discovery request message -// out of the provided reqCh. It returns an error if it fails to read a message -// before the context deadline expires. -// -// wantEmptyNodeProto indicates whether the request message is expected to -// contain an empty node proto. This function returns an error if that is not -// the case. -func readDiscoveryResponseAndCheckForNodeProto(ctx context.Context, reqCh *testutils.Channel, wantEmptyNodeProto bool) error { +// readDiscoveryResponseAndCheckForEmptyNodeProto reads a discovery request +// message out of the provided reqCh. It returns an error if it fails to read a +// message before the context deadline expires, or if the read message contains +// a non-empty node proto. +func readDiscoveryResponseAndCheckForEmptyNodeProto(ctx context.Context, reqCh *testutils.Channel) error { + v, err := reqCh.Receive(ctx) + if err != nil { + return fmt.Errorf("Timeout when waiting for a DiscoveryRequest message") + } + req := v.(*fakeserver.Request).Req.(*v3discoverypb.DiscoveryRequest) + if node := req.GetNode(); node != nil { + return fmt.Errorf("Node proto received in DiscoveryRequest message is %v, want empty node proto", node) + } + return nil +} + +// readDiscoveryResponseAndCheckForNonEmptyNodeProto reads a discovery request +// message out of the provided reqCh. It returns an error if it fails to read a +// message before the context deadline expires, or if the read message contains +// an empty node proto. +func readDiscoveryResponseAndCheckForNonEmptyNodeProto(ctx context.Context, reqCh *testutils.Channel) error { v, err := reqCh.Receive(ctx) if err != nil { return fmt.Errorf("Timeout when waiting for a DiscoveryRequest message") } req := v.(*fakeserver.Request).Req.(*v3discoverypb.DiscoveryRequest) - if gotEmptyNodeProto := req.GetNode() == nil; gotEmptyNodeProto != wantEmptyNodeProto { - return fmt.Errorf("Node proto received in DiscoveryRequest message is %v, want empty node proto is %v", req.GetNode(), wantEmptyNodeProto) + if node := req.GetNode(); node == nil { + return fmt.Errorf("Empty node proto received in DiscoveryRequest message, want non-empty node proto") } return nil }