From 96022b9dc246a9f9c39286dc892dce93c7dc579d Mon Sep 17 00:00:00 2001 From: Easwar Swaminathan Date: Thu, 17 Aug 2023 23:44:05 +0000 Subject: [PATCH 1/4] clusterresolver: fix deadlock in dns discovery mechanism --- .../e2e_test/aggregate_cluster_test.go | 331 +++++++----------- .../clusterresolver/resource_resolver_dns.go | 56 ++- 2 files changed, 173 insertions(+), 214 deletions(-) diff --git a/xds/internal/balancer/clusterresolver/e2e_test/aggregate_cluster_test.go b/xds/internal/balancer/clusterresolver/e2e_test/aggregate_cluster_test.go index d544360bba63..05060c43dfd3 100644 --- a/xds/internal/balancer/clusterresolver/e2e_test/aggregate_cluster_test.go +++ b/xds/internal/balancer/clusterresolver/e2e_test/aggregate_cluster_test.go @@ -18,9 +18,10 @@ package e2e_test import ( "context" - "errors" "fmt" + "net" "sort" + "strconv" "strings" "testing" "time" @@ -317,34 +318,41 @@ func (s) TestAggregateCluster_WithTwoEDSClusters_PrioritiesChange(t *testing.T) } } +func hostAndPortFromAddress(t *testing.T, addr string) (string, uint32) { + t.Helper() + + host, p, err := net.SplitHostPort(addr) + if err != nil { + t.Fatalf("Invalid serving address: %v", addr) + } + port, err := strconv.ParseUint(p, 10, 32) + if err != nil { + t.Fatalf("Invalid serving port %q: %v", p, err) + } + return host, uint32(port) +} + // TestAggregateCluster_WithOneDNSCluster tests the case where the top-level // cluster resource is an aggregate cluster that resolves to a single // LOGICAL_DNS cluster. The test verifies that RPCs can be made to backends that // make up the LOGICAL_DNS cluster. func (s) TestAggregateCluster_WithOneDNSCluster(t *testing.T) { - dnsTargetCh, _, _, dnsR, cleanup1 := setupDNS() - defer cleanup1() - // Start an xDS management server. managementServer, nodeID, bootstrapContents, _, cleanup2 := e2e.SetupManagementServer(t, e2e.ManagementServerOptions{AllowResourceSubset: true}) defer cleanup2() - // Start two test backends. - servers, cleanup3 := startTestServiceBackends(t, 2) - defer cleanup3() - addrs, _ := backendAddressesAndPorts(t, servers) + // Start a test service backend. + server := stubserver.StartTestService(t, nil) + defer server.Stop() + host, port := hostAndPortFromAddress(t, server.Address) // Configure an aggregate cluster pointing to a single LOGICAL_DNS cluster. - const ( - dnsClusterName = clusterName + "-dns" - dnsHostName = "dns_host" - dnsPort = uint32(8080) - ) + const dnsClusterName = clusterName + "-dns" resources := e2e.UpdateOptions{ NodeID: nodeID, Clusters: []*v3clusterpb.Cluster{ makeAggregateClusterResource(clusterName, []string{dnsClusterName}), - makeLogicalDNSClusterResource(dnsClusterName, dnsHostName, dnsPort), + makeLogicalDNSClusterResource(dnsClusterName, host, uint32(port)), }, SkipValidation: true, } @@ -359,19 +367,55 @@ func (s) TestAggregateCluster_WithOneDNSCluster(t *testing.T) { cc, cleanup := setupAndDial(t, bootstrapContents) defer cleanup() - // Ensure that the DNS resolver is started for the expected target. - select { - case <-ctx.Done(): - t.Fatal("Timeout when waiting for DNS resolver to be started") - case target := <-dnsTargetCh: - got, want := target.Endpoint(), fmt.Sprintf("%s:%d", dnsHostName, dnsPort) - if got != want { - t.Fatalf("DNS resolution started for target %q, want %q", got, want) - } + // Make an RPC and ensure that it gets routed to the first backend since the + // child policy for a LOGICAL_DNS cluster is pick_first by default. 
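	// Note (editorial, not part of the diff): grpc.Peer records which backend
	// served the RPC, and WaitForReady(true) keeps the call pending until the
	// channel is connected, so the assertion below is not racy with the
	// initial xDS resolution and connection setup.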
+ client := testgrpc.NewTestServiceClient(cc) + peer := &peer.Peer{} + if _, err := client.EmptyCall(ctx, &testpb.Empty{}, grpc.Peer(peer), grpc.WaitForReady(true)); err != nil { + t.Fatalf("EmptyCall() failed: %v", err) } + if peer.Addr.String() != server.Address { + t.Fatalf("EmptyCall() routed to backend %q, want %q", peer.Addr, server.Address) + } +} - // Update DNS resolver with test backend addresses. - dnsR.UpdateState(resolver.State{Addresses: addrs}) +// Tests the case where the top-level cluster resource is an aggregate cluster +// that resolves to a single LOGICAL_DNS cluster. The test verifies that RPCs +// can be made to backends that make up the LOGICAL_DNS cluster. The hostname of +// the LOGICAL_DNS cluster is updated, and the test verifies that RPCs can be +// made to backends that the new hostname resolves to. +func (s) TestAggregateCluster_WithOneDNSCluster_HostnameChange(t *testing.T) { + // Start an xDS management server. + managementServer, nodeID, bootstrapContents, _, cleanup1 := e2e.SetupManagementServer(t, e2e.ManagementServerOptions{AllowResourceSubset: true}) + defer cleanup1() + + // Start two test backends and extract their host and port. The first + // backend is used intially for the LOGICAL_DNS cluster and an update + // switches the cluster to use the second backend. + servers, cleanup2 := startTestServiceBackends(t, 2) + defer cleanup2() + + // Configure an aggregate cluster pointing to a single LOGICAL_DNS cluster. + const dnsClusterName = clusterName + "-dns" + dnsHostName, dnsPort := hostAndPortFromAddress(t, servers[0].Address) + resources := e2e.UpdateOptions{ + NodeID: nodeID, + Clusters: []*v3clusterpb.Cluster{ + makeAggregateClusterResource(clusterName, []string{dnsClusterName}), + makeLogicalDNSClusterResource(dnsClusterName, dnsHostName, dnsPort), + }, + SkipValidation: true, + } + ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout) + defer cancel() + if err := managementServer.Update(ctx, resources); err != nil { + t.Fatal(err) + } + + // Create xDS client, configure cds_experimental LB policy with a manual + // resolver, and dial the test backends. + cc, cleanup := setupAndDial(t, bootstrapContents) + defer cleanup() // Make an RPC and ensure that it gets routed to the first backend since the // child policy for a LOGICAL_DNS cluster is pick_first by default. @@ -380,8 +424,35 @@ func (s) TestAggregateCluster_WithOneDNSCluster(t *testing.T) { if _, err := client.EmptyCall(ctx, &testpb.Empty{}, grpc.Peer(peer), grpc.WaitForReady(true)); err != nil { t.Fatalf("EmptyCall() failed: %v", err) } - if peer.Addr.String() != addrs[0].Addr { - t.Fatalf("EmptyCall() routed to backend %q, want %q", peer.Addr, addrs[0].Addr) + if peer.Addr.String() != servers[0].Address { + t.Fatalf("EmptyCall() routed to backend %q, want %q", peer.Addr, servers[0].Address) + } + + // Update the LOGICAL_DNS cluster's hostname to point to the second backend. + dnsHostName, dnsPort = hostAndPortFromAddress(t, servers[1].Address) + resources = e2e.UpdateOptions{ + NodeID: nodeID, + Clusters: []*v3clusterpb.Cluster{ + makeAggregateClusterResource(clusterName, []string{dnsClusterName}), + makeLogicalDNSClusterResource(dnsClusterName, dnsHostName, dnsPort), + }, + SkipValidation: true, + } + if err := managementServer.Update(ctx, resources); err != nil { + t.Fatal(err) + } + + // Ensure that traffic moves to the second backend eventually. 
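	// Note (editorial): the switch is not immediate. The management server
	// push, the client re-resolving the new hostname, and pick_first
	// reconnecting all happen asynchronously, so the test polls RPCs until the
	// deadline instead of asserting on a single call.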
+ for ctx.Err() == nil { + if _, err := client.EmptyCall(ctx, &testpb.Empty{}, grpc.Peer(peer)); err != nil { + t.Fatalf("EmptyCall() failed: %v", err) + } + if peer.Addr.String() == servers[1].Address { + break + } + } + if ctx.Err() != nil { + t.Fatal("Timeout when waiting for RPCs to switch to the second backend") } } @@ -500,9 +571,6 @@ func (s) TestAggregateCluster_WithEDSAndDNS(t *testing.T) { // cluster. The test verifies that RPCs are successful, this time to backends in // the DNS cluster. func (s) TestAggregateCluster_SwitchEDSAndDNS(t *testing.T) { - dnsTargetCh, _, _, dnsR, cleanup1 := setupDNS() - defer cleanup1() - // Start an xDS management server. managementServer, nodeID, bootstrapContents, _, cleanup2 := e2e.SetupManagementServer(t, e2e.ManagementServerOptions{AllowResourceSubset: true}) defer cleanup2() @@ -513,15 +581,12 @@ func (s) TestAggregateCluster_SwitchEDSAndDNS(t *testing.T) { servers, cleanup3 := startTestServiceBackends(t, 2) defer cleanup3() addrs, ports := backendAddressesAndPorts(t, servers) + dnsHostName, dnsPort := hostAndPortFromAddress(t, addrs[1].Addr) // Configure an aggregate cluster pointing to a single EDS cluster. Also, // configure the underlying EDS cluster (and the corresponding endpoints // resource) and DNS cluster (will be used later in the test). - const ( - dnsClusterName = clusterName + "-dns" - dnsHostName = "dns_host" - dnsPort = uint32(8080) - ) + const dnsClusterName = clusterName + "-dns" resources := e2e.UpdateOptions{ NodeID: nodeID, Clusters: []*v3clusterpb.Cluster{ @@ -563,20 +628,6 @@ func (s) TestAggregateCluster_SwitchEDSAndDNS(t *testing.T) { t.Fatal(err) } - // Ensure that the DNS resolver is started for the expected target. - select { - case <-ctx.Done(): - t.Fatal("Timeout when waiting for DNS resolver to be started") - case target := <-dnsTargetCh: - got, want := target.Endpoint(), fmt.Sprintf("%s:%d", dnsHostName, dnsPort) - if got != want { - t.Fatalf("DNS resolution started for target %q, want %q", got, want) - } - } - - // Update DNS resolver with test backend addresses. - dnsR.UpdateState(resolver.State{Addresses: addrs[1:]}) - // Ensure that start getting routed to the backend corresponding to the // LOGICAL_DNS cluster. for ; ctx.Err() == nil; <-time.After(defaultTestShortTimeout) { @@ -705,17 +756,14 @@ func (s) TestAggregateCluster_BadEDS_GoodToBadDNS(t *testing.T) { // the DNS resolver pushes an update, the test verifies that we switch to the // DNS cluster and can make a successful RPC. func (s) TestAggregateCluster_BadEDSFromError_GoodToBadDNS(t *testing.T) { - dnsTargetCh, _, _, dnsR, cleanup1 := setupDNS() - defer cleanup1() - // Start an xDS management server. managementServer, nodeID, bootstrapContents, _, cleanup2 := e2e.SetupManagementServer(t, e2e.ManagementServerOptions{AllowResourceSubset: true}) defer cleanup2() - // Start two test backends. - servers, cleanup3 := startTestServiceBackends(t, 2) - defer cleanup3() - addrs, _ := backendAddressesAndPorts(t, servers) + // Start a test service backend. + server := stubserver.StartTestService(t, nil) + defer server.Stop() + dnsHostName, dnsPort := hostAndPortFromAddress(t, server.Address) // Configure an aggregate cluster pointing to an EDS and LOGICAL_DNS // cluster. 
Also configure an empty endpoints resource for the EDS cluster @@ -723,8 +771,6 @@ func (s) TestAggregateCluster_BadEDSFromError_GoodToBadDNS(t *testing.T) { const ( edsClusterName = clusterName + "-eds" dnsClusterName = clusterName + "-dns" - dnsHostName = "dns_host" - dnsPort = uint32(8080) ) nackEndpointResource := e2e.DefaultEndpoint(edsServiceName, "localhost", nil) nackEndpointResource.Endpoints = []*v3endpointpb.LocalityLbEndpoints{ @@ -755,23 +801,9 @@ func (s) TestAggregateCluster_BadEDSFromError_GoodToBadDNS(t *testing.T) { cc, cleanup := setupAndDial(t, bootstrapContents) defer cleanup() - // Ensure that the DNS resolver is started for the expected target. - select { - case <-ctx.Done(): - t.Fatal("Timeout when waiting for DNS resolver to be started") - case target := <-dnsTargetCh: - got, want := target.Endpoint(), fmt.Sprintf("%s:%d", dnsHostName, dnsPort) - if got != want { - t.Fatalf("DNS resolution started for target %q, want %q", got, want) - } - } - - // Update DNS resolver with test backend addresses. - dnsR.UpdateState(resolver.State{Addresses: addrs}) - // Ensure that RPCs start getting routed to the first backend since the // child policy for a LOGICAL_DNS cluster is pick_first by default. - pickfirst.CheckRPCsToBackend(ctx, cc, addrs[0]) + pickfirst.CheckRPCsToBackend(ctx, cc, resolver.Address{Addr: server.Address}) } // TestAggregateCluster_BadDNS_GoodEDS tests the case where the top-level @@ -780,34 +812,29 @@ func (s) TestAggregateCluster_BadEDSFromError_GoodToBadDNS(t *testing.T) { // good update, this test verifies the cluster_resolver balancer correctly falls // back from the LOGICAL_DNS cluster to the EDS cluster. func (s) TestAggregateCluster_BadDNS_GoodEDS(t *testing.T) { - dnsTargetCh, _, _, dnsR, cleanup1 := setupDNS() - defer cleanup1() - // Start an xDS management server. managementServer, nodeID, bootstrapContents, _, cleanup2 := e2e.SetupManagementServer(t, e2e.ManagementServerOptions{AllowResourceSubset: true}) defer cleanup2() - // Start two test backends. - servers, cleanup3 := startTestServiceBackends(t, 2) - defer cleanup3() - addrs, ports := backendAddressesAndPorts(t, servers) + // Start a test service backend. + server := stubserver.StartTestService(t, nil) + defer server.Stop() + _, edsPort := hostAndPortFromAddress(t, server.Address) // Configure an aggregate cluster pointing to an LOGICAL_DNS and EDS // cluster. Also configure an endpoints resource for the EDS cluster. 
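	// Note (editorial): the LOGICAL_DNS cluster below is given the hostname
	// "bad.ip.v4.address", which is not expected to resolve; the resulting DNS
	// failure is what should make the cluster_resolver balancer fall back to
	// the EDS cluster.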
const ( edsClusterName = clusterName + "-eds" dnsClusterName = clusterName + "-dns" - dnsHostName = "dns_host" - dnsPort = uint32(8080) ) resources := e2e.UpdateOptions{ NodeID: nodeID, Clusters: []*v3clusterpb.Cluster{ makeAggregateClusterResource(clusterName, []string{dnsClusterName, edsClusterName}), - makeLogicalDNSClusterResource(dnsClusterName, dnsHostName, dnsPort), + makeLogicalDNSClusterResource(dnsClusterName, "bad.ip.v4.address", 8080), e2e.DefaultCluster(edsClusterName, edsServiceName, e2e.SecurityLevelNone), }, - Endpoints: []*v3endpointpb.ClusterLoadAssignment{e2e.DefaultEndpoint(edsServiceName, "localhost", []uint32{uint32(ports[0])})}, + Endpoints: []*v3endpointpb.ClusterLoadAssignment{e2e.DefaultEndpoint(edsServiceName, "localhost", []uint32{uint32(edsPort)})}, SkipValidation: true, } ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout) @@ -821,20 +848,6 @@ func (s) TestAggregateCluster_BadDNS_GoodEDS(t *testing.T) { cc, cleanup := setupAndDial(t, bootstrapContents) defer cleanup() - // Ensure that the DNS resolver is started for the expected target. - select { - case <-ctx.Done(): - t.Fatal("Timeout when waiting for DNS resolver to be started") - case target := <-dnsTargetCh: - got, want := target.Endpoint(), fmt.Sprintf("%s:%d", dnsHostName, dnsPort) - if got != want { - t.Fatalf("DNS resolution started for target %q, want %q", got, want) - } - } - - // Push an error through the DNS resolver. - dnsR.ReportError(errors.New("some error")) - // RPCs should work, higher level DNS cluster errors so should fallback to // EDS cluster. client := testgrpc.NewTestServiceClient(cc) @@ -842,8 +855,8 @@ func (s) TestAggregateCluster_BadDNS_GoodEDS(t *testing.T) { if _, err := client.EmptyCall(ctx, &testpb.Empty{}, grpc.Peer(peer), grpc.WaitForReady(true)); err != nil { t.Fatalf("EmptyCall() failed: %v", err) } - if peer.Addr.String() != addrs[0].Addr { - t.Fatalf("EmptyCall() routed to backend %q, want %q", peer.Addr, addrs[0].Addr) + if peer.Addr.String() != server.Address { + t.Fatalf("EmptyCall() routed to backend %q, want %q", peer.Addr, server.Address) } } @@ -854,9 +867,6 @@ func (s) TestAggregateCluster_BadDNS_GoodEDS(t *testing.T) { // error, the test verifies that RPCs fail with the error triggered by the DNS // Discovery Mechanism (from sending an empty address list down). func (s) TestAggregateCluster_BadEDS_BadDNS(t *testing.T) { - dnsTargetCh, _, _, dnsR, cleanup1 := setupDNS() - defer cleanup1() - // Start an xDS management server. 
managementServer, nodeID, bootstrapContents, _, cleanup2 := e2e.SetupManagementServer(t, e2e.ManagementServerOptions{AllowResourceSubset: true}) defer cleanup2() @@ -867,8 +877,6 @@ func (s) TestAggregateCluster_BadEDS_BadDNS(t *testing.T) { const ( edsClusterName = clusterName + "-eds" dnsClusterName = clusterName + "-dns" - dnsHostName = "dns_host" - dnsPort = uint32(8080) ) emptyEndpointResource := e2e.DefaultEndpoint(edsServiceName, "localhost", nil) resources := e2e.UpdateOptions{ @@ -876,7 +884,7 @@ func (s) TestAggregateCluster_BadEDS_BadDNS(t *testing.T) { Clusters: []*v3clusterpb.Cluster{ makeAggregateClusterResource(clusterName, []string{edsClusterName, dnsClusterName}), e2e.DefaultCluster(edsClusterName, edsServiceName, e2e.SecurityLevelNone), - makeLogicalDNSClusterResource(dnsClusterName, dnsHostName, dnsPort), + makeLogicalDNSClusterResource(dnsClusterName, "bad.ip.v4.address", 8080), }, Endpoints: []*v3endpointpb.ClusterLoadAssignment{emptyEndpointResource}, SkipValidation: true, @@ -892,39 +900,20 @@ func (s) TestAggregateCluster_BadEDS_BadDNS(t *testing.T) { cc, cleanup := setupAndDial(t, bootstrapContents) defer cleanup() - // Make an RPC with a short deadline. We expect this RPC to not succeed - // because the EDS resource came back with no endpoints, and we are yet to - // push an update through the DNS resolver. - client := testgrpc.NewTestServiceClient(cc) - sCtx, sCancel := context.WithTimeout(ctx, defaultTestShortTimeout) - defer sCancel() - if _, err := client.EmptyCall(sCtx, &testpb.Empty{}); status.Code(err) != codes.DeadlineExceeded { - t.Fatalf("EmptyCall() code %s, want %s", status.Code(err), codes.DeadlineExceeded) - } - - // Ensure that the DNS resolver is started for the expected target. - select { - case <-ctx.Done(): - t.Fatal("Timeout when waiting for DNS resolver to be started") - case target := <-dnsTargetCh: - got, want := target.Endpoint(), fmt.Sprintf("%s:%d", dnsHostName, dnsPort) - if got != want { - t.Fatalf("DNS resolution started for target %q, want %q", got, want) - } - } - - // Push an error from the DNS resolver as well. - dnsErr := fmt.Errorf("DNS error") - dnsR.ReportError(dnsErr) - // Ensure that the error from the DNS Resolver leads to an empty address // update for both priorities. - _, err := client.EmptyCall(ctx, &testpb.Empty{}) - if code := status.Code(err); code != codes.Unavailable { - t.Fatalf("EmptyCall() failed with code %s, want %s", code, codes.Unavailable) + client := testgrpc.NewTestServiceClient(cc) + for ctx.Err() == nil { + _, err := client.EmptyCall(ctx, &testpb.Empty{}) + if err == nil { + t.Fatal("EmptyCall() succeeded when expected to fail") + } + if status.Code(err) == codes.Unavailable && strings.Contains(err.Error(), "produced zero addresses") { + break + } } - if err == nil || !strings.Contains(err.Error(), "produced zero addresses") { - t.Fatalf("EmptyCall() failed with error: %v, want: produced zero addresses", err) + if ctx.Err() != nil { + t.Fatalf("Timeout when waiting for RPCs to fail with expected code and error") } } @@ -937,9 +926,6 @@ func (s) TestAggregateCluster_BadEDS_BadDNS(t *testing.T) { // previously received good update and that RPCs still get routed to the EDS // cluster. func (s) TestAggregateCluster_NoFallback_EDSNackedWithPreviousGoodUpdate(t *testing.T) { - dnsTargetCh, _, _, dnsR, cleanup1 := setupDNS() - defer cleanup1() - // Start an xDS management server. 
mgmtServer, nodeID, bootstrapContents, _, cleanup2 := e2e.SetupManagementServer(t, e2e.ManagementServerOptions{AllowResourceSubset: true}) defer cleanup2() @@ -950,14 +936,13 @@ func (s) TestAggregateCluster_NoFallback_EDSNackedWithPreviousGoodUpdate(t *test servers, cleanup3 := startTestServiceBackends(t, 2) defer cleanup3() addrs, ports := backendAddressesAndPorts(t, servers) + dnsHostName, dnsPort := hostAndPortFromAddress(t, servers[1].Address) // Configure an aggregate cluster pointing to an EDS and DNS cluster. Also // configure an endpoints resource for the EDS cluster. const ( edsClusterName = clusterName + "-eds" dnsClusterName = clusterName + "-dns" - dnsHostName = "dns_host" - dnsPort = uint32(8080) ) resources := e2e.UpdateOptions{ NodeID: nodeID, @@ -980,20 +965,6 @@ func (s) TestAggregateCluster_NoFallback_EDSNackedWithPreviousGoodUpdate(t *test cc, cleanup := setupAndDial(t, bootstrapContents) defer cleanup() - // Ensure that the DNS resolver is started for the expected target. - select { - case <-ctx.Done(): - t.Fatal("Timeout when waiting for DNS resolver to be started") - case target := <-dnsTargetCh: - got, want := target.Endpoint(), fmt.Sprintf("%s:%d", dnsHostName, dnsPort) - if got != want { - t.Fatalf("DNS resolution started for target %q, want %q", got, want) - } - } - - // Update DNS resolver with test backend addresses. - dnsR.UpdateState(resolver.State{Addresses: addrs[1:]}) - // Make an RPC and ensure that it gets routed to the first backend since the // EDS cluster is of higher priority than the LOGICAL_DNS cluster. client := testgrpc.NewTestServiceClient(cc) @@ -1032,9 +1003,6 @@ func (s) TestAggregateCluster_NoFallback_EDSNackedWithPreviousGoodUpdate(t *test // the LOGICAL_DNS cluster, because it is supposed to treat the bad EDS response // as though it received an update with no endpoints. func (s) TestAggregateCluster_Fallback_EDSNackedWithoutPreviousGoodUpdate(t *testing.T) { - dnsTargetCh, _, _, dnsR, cleanup1 := setupDNS() - defer cleanup1() - // Start an xDS management server. mgmtServer, nodeID, bootstrapContents, _, cleanup2 := e2e.SetupManagementServer(t, e2e.ManagementServerOptions{AllowResourceSubset: true}) defer cleanup2() @@ -1045,13 +1013,12 @@ func (s) TestAggregateCluster_Fallback_EDSNackedWithoutPreviousGoodUpdate(t *tes servers, cleanup3 := startTestServiceBackends(t, 2) defer cleanup3() addrs, ports := backendAddressesAndPorts(t, servers) + dnsHostName, dnsPort := hostAndPortFromAddress(t, servers[1].Address) // Configure an aggregate cluster pointing to an EDS and DNS cluster. const ( edsClusterName = clusterName + "-eds" dnsClusterName = clusterName + "-dns" - dnsHostName = "dns_host" - dnsPort = uint32(8080) ) resources := e2e.UpdateOptions{ NodeID: nodeID, @@ -1080,20 +1047,6 @@ func (s) TestAggregateCluster_Fallback_EDSNackedWithoutPreviousGoodUpdate(t *tes cc, cleanup := setupAndDial(t, bootstrapContents) defer cleanup() - // Ensure that the DNS resolver is started for the expected target. - select { - case <-ctx.Done(): - t.Fatal("Timeout when waiting for DNS resolver to be started") - case target := <-dnsTargetCh: - got, want := target.Endpoint(), fmt.Sprintf("%s:%d", dnsHostName, dnsPort) - if got != want { - t.Fatalf("DNS resolution started for target %q, want %q", got, want) - } - } - - // Update DNS resolver with test backend addresses. - dnsR.UpdateState(resolver.State{Addresses: addrs[1:]}) - // Make an RPC and ensure that it gets routed to the LOGICAL_DNS cluster. 
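	// Note (editorial): with no previously cached good EDS update, the NACKed
	// response is treated as an update with zero endpoints, so the priority
	// policy moves on to the lower-priority LOGICAL_DNS cluster.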
peer := &peer.Peer{} client := testgrpc.NewTestServiceClient(cc) @@ -1111,9 +1064,6 @@ func (s) TestAggregateCluster_Fallback_EDSNackedWithoutPreviousGoodUpdate(t *tes // cluster. The test verifies that the cluster_resolver LB policy falls back to // the LOGICAL_DNS cluster in this case. func (s) TestAggregateCluster_Fallback_EDS_ResourceNotFound(t *testing.T) { - dnsTargetCh, _, _, dnsR, cleanup1 := setupDNS() - defer cleanup1() - // Start an xDS management server. mgmtServer, nodeID, _, _, cleanup2 := e2e.SetupManagementServer(t, e2e.ManagementServerOptions{AllowResourceSubset: true}) defer cleanup2() @@ -1121,14 +1071,13 @@ func (s) TestAggregateCluster_Fallback_EDS_ResourceNotFound(t *testing.T) { // Start a test backend for the LOGICAL_DNS cluster. server := stubserver.StartTestService(t, nil) defer server.Stop() + dnsHostName, dnsPort := hostAndPortFromAddress(t, server.Address) // Configure an aggregate cluster pointing to an EDS and DNS cluster. No // endpoints are configured for the EDS cluster. const ( edsClusterName = clusterName + "-eds" dnsClusterName = clusterName + "-dns" - dnsHostName = "dns_host" - dnsPort = uint32(8080) ) resources := e2e.UpdateOptions{ NodeID: nodeID, @@ -1177,35 +1126,13 @@ func (s) TestAggregateCluster_Fallback_EDS_ResourceNotFound(t *testing.T) { } defer cc.Close() - // Make an RPC with a short deadline. We expect this RPC to not succeed - // because the DNS resolver has not responded with endpoint addresses. - client := testgrpc.NewTestServiceClient(cc) - sCtx, sCancel := context.WithTimeout(ctx, defaultTestShortTimeout) - defer sCancel() - if _, err := client.EmptyCall(sCtx, &testpb.Empty{}); status.Code(err) != codes.DeadlineExceeded { - t.Fatalf("EmptyCall() code %s, want %s", status.Code(err), codes.DeadlineExceeded) - } - - // Ensure that the DNS resolver is started for the expected target. - select { - case <-ctx.Done(): - t.Fatal("Timeout when waiting for DNS resolver to be started") - case target := <-dnsTargetCh: - got, want := target.Endpoint(), fmt.Sprintf("%s:%d", dnsHostName, dnsPort) - if got != want { - t.Fatalf("DNS resolution started for target %q, want %q", got, want) - } - } - - // Update DNS resolver with test backend addresses. - dnsR.UpdateState(resolver.State{Addresses: []resolver.Address{{Addr: server.Address}}}) - // Make an RPC and ensure that it gets routed to the LOGICAL_DNS cluster. // Even though the EDS cluster is of higher priority, since the management // server does not respond with an EDS resource, the cluster_resolver LB // policy is expected to fallback to the LOGICAL_DNS cluster once the watch // timeout expires. 
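	// Note (editorial): the watch timeout referred to above is the xDS
	// client's resource-does-not-exist timer; once it fires for the missing
	// EDS resource, the EDS mechanism reports no endpoints and the DNS
	// cluster takes over.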
peer := &peer.Peer{} + client := testgrpc.NewTestServiceClient(cc) if _, err := client.EmptyCall(ctx, &testpb.Empty{}, grpc.Peer(peer), grpc.WaitForReady(true)); err != nil { t.Fatalf("EmptyCall() failed: %v", err) } diff --git a/xds/internal/balancer/clusterresolver/resource_resolver_dns.go b/xds/internal/balancer/clusterresolver/resource_resolver_dns.go index 9052190b0ff0..03d34c1f9932 100644 --- a/xds/internal/balancer/clusterresolver/resource_resolver_dns.go +++ b/xds/internal/balancer/clusterresolver/resource_resolver_dns.go @@ -19,11 +19,13 @@ package clusterresolver import ( + "context" "fmt" "net/url" "sync" "google.golang.org/grpc/internal/grpclog" + "google.golang.org/grpc/internal/grpcsync" "google.golang.org/grpc/internal/pretty" "google.golang.org/grpc/resolver" "google.golang.org/grpc/serviceconfig" @@ -43,9 +45,14 @@ var ( type dnsDiscoveryMechanism struct { target string topLevelResolver topLevelResolver - dnsR resolver.Resolver logger *grpclog.PrefixLogger + // All accesses to dnsR must be made within serializer callbacks, thereby + // guaranteeing mutual exclusion. + serializer *grpcsync.CallbackSerializer + serializerCancel context.CancelFunc + dnsR resolver.Resolver + mu sync.Mutex addrs []string updateReceived bool @@ -83,16 +90,33 @@ func newDNSResolver(target string, topLevelResolver topLevelResolver, logger *gr return ret } - r, err := newDNS(resolver.Target{URL: *u}, ret, resolver.BuildOptions{}) - if err != nil { + // Create the serializer and schedule a callback to create the actual DNS + // resolver. This needs to happen asynchornously to avoid the following + // deadlock: + // - this method is called from rr.updateMechanisms which holds `rr.mu` + // - dns resolver Build() can report state or error inline, which is handled + // in UpdateState or ReportError, both of which call + // topLevelResolver.onUpdate, which tries to grab `rr.mu`, and hence + // deadlocks + ctx, cancel := context.WithCancel(context.Background()) + ret.serializer = grpcsync.NewCallbackSerializer(ctx) + ret.serializerCancel = cancel + ret.serializer.Schedule(func(context.Context) { + r, err := newDNS(resolver.Target{URL: *u}, ret, resolver.BuildOptions{}) + if err == nil { + ret.dnsR = r + return + } + if ret.logger.V(2) { ret.logger.Infof("Failed to build DNS resolver for target %q: %v", target, err) } + ret.mu.Lock() ret.updateReceived = true + ret.mu.Unlock() ret.topLevelResolver.onUpdate() - return ret - } - ret.dnsR = r + }) + return ret } @@ -107,9 +131,11 @@ func (dr *dnsDiscoveryMechanism) lastUpdate() (any, bool) { } func (dr *dnsDiscoveryMechanism) resolveNow() { - if dr.dnsR != nil { - dr.dnsR.ResolveNow(resolver.ResolveNowOptions{}) - } + dr.serializer.Schedule(func(context.Context) { + if dr.dnsR != nil { + dr.dnsR.ResolveNow(resolver.ResolveNowOptions{}) + } + }) } // The definition of stop() mentions that implementations must not invoke any @@ -119,9 +145,15 @@ func (dr *dnsDiscoveryMechanism) resolveNow() { // after its `Close()` returns. Therefore, we can guarantee that no methods of // the topLevelResolver are invoked after we return from this method. 
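//
// Editorial note on the implementation below: the close is scheduled on the
// same serializer that asynchronously builds the DNS resolver in
// newDNSResolver, so it is ordered after any pending build callback and cannot
// race with it; stop() then blocks on the done channel until the scheduled
// close callback has actually run.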
func (dr *dnsDiscoveryMechanism) stop() { - if dr.dnsR != nil { - dr.dnsR.Close() - } + dr.serializerCancel() + done := make(chan struct{}) + dr.serializer.Schedule(func(context.Context) { + if dr.dnsR != nil { + dr.dnsR.Close() + } + close(done) + }) + <-done } // dnsDiscoveryMechanism needs to implement resolver.ClientConn interface to receive From f78bd31e65a37f652620207fa5d993633a9a01d3 Mon Sep 17 00:00:00 2001 From: Easwar Swaminathan Date: Fri, 18 Aug 2023 00:07:21 +0000 Subject: [PATCH 2/4] make vet happy --- .../balancer/clusterresolver/e2e_test/aggregate_cluster_test.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/xds/internal/balancer/clusterresolver/e2e_test/aggregate_cluster_test.go b/xds/internal/balancer/clusterresolver/e2e_test/aggregate_cluster_test.go index 05060c43dfd3..954891656fee 100644 --- a/xds/internal/balancer/clusterresolver/e2e_test/aggregate_cluster_test.go +++ b/xds/internal/balancer/clusterresolver/e2e_test/aggregate_cluster_test.go @@ -390,7 +390,7 @@ func (s) TestAggregateCluster_WithOneDNSCluster_HostnameChange(t *testing.T) { defer cleanup1() // Start two test backends and extract their host and port. The first - // backend is used intially for the LOGICAL_DNS cluster and an update + // backend is used initially for the LOGICAL_DNS cluster and an update // switches the cluster to use the second backend. servers, cleanup2 := startTestServiceBackends(t, 2) defer cleanup2() From de1fa7bda4188096221da1ae170292ac6ed96e57 Mon Sep 17 00:00:00 2001 From: Easwar Swaminathan Date: Tue, 22 Aug 2023 15:58:14 +0000 Subject: [PATCH 3/4] review comments --- .../clusterresolver/clusterresolver.go | 4 +- .../e2e_test/aggregate_cluster_test.go | 39 +++++++++++++ .../clusterresolver/resource_resolver.go | 33 ++++++++--- .../clusterresolver/resource_resolver_dns.go | 57 +++++-------------- 4 files changed, 79 insertions(+), 54 deletions(-) diff --git a/xds/internal/balancer/clusterresolver/clusterresolver.go b/xds/internal/balancer/clusterresolver/clusterresolver.go index dedd9218a181..6a60bc308a96 100644 --- a/xds/internal/balancer/clusterresolver/clusterresolver.go +++ b/xds/internal/balancer/clusterresolver/clusterresolver.go @@ -280,7 +280,7 @@ func (b *clusterResolverBalancer) handleErrorFromUpdate(err error, fromParent bo // EDS resource was removed. No action needs to be taken for this, and we // should continue watching the same EDS resource. if fromParent && xdsresource.ErrType(err) == xdsresource.ErrorTypeResourceNotFound { - b.resourceWatcher.stop() + b.resourceWatcher.stop(false) } if b.child != nil { @@ -326,7 +326,7 @@ func (b *clusterResolverBalancer) run() { // Close results in stopping the endpoint resolvers and closing the // underlying child policy and is the only way to exit this goroutine. 
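		// Editorial note: stop(true) is the "closing" form introduced in this
		// commit. Besides stopping the child discovery mechanisms, it cancels
		// the resource resolver's serializer and waits for it to drain, which
		// is only appropriate when the LB policy itself is shutting down. The
		// stop(false) call in handleErrorFromUpdate above keeps the serializer
		// alive so the policy can be reused if the resource comes back.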
case <-b.closed.Done(): - b.resourceWatcher.stop() + b.resourceWatcher.stop(true) if b.child != nil { b.child.Close() diff --git a/xds/internal/balancer/clusterresolver/e2e_test/aggregate_cluster_test.go b/xds/internal/balancer/clusterresolver/e2e_test/aggregate_cluster_test.go index 954891656fee..f466dcca7bda 100644 --- a/xds/internal/balancer/clusterresolver/e2e_test/aggregate_cluster_test.go +++ b/xds/internal/balancer/clusterresolver/e2e_test/aggregate_cluster_test.go @@ -29,6 +29,7 @@ import ( "github.com/google/go-cmp/cmp" "google.golang.org/grpc" "google.golang.org/grpc/codes" + "google.golang.org/grpc/connectivity" "google.golang.org/grpc/credentials/insecure" "google.golang.org/grpc/internal" "google.golang.org/grpc/internal/stubserver" @@ -379,6 +380,44 @@ func (s) TestAggregateCluster_WithOneDNSCluster(t *testing.T) { } } +// Tests the case where the top-level cluster resource is an aggregate cluster +// that resolves to a single LOGICAL_DNS cluster. The specified dns hostname is +// expected to fail url parsing. The test verifies that the channel moves to +// TRANSIENT_FAILURE. +func (s) TestAggregateCluster_WithOneDNSCluster_ParseFailure(t *testing.T) { + // Start an xDS management server. + managementServer, nodeID, bootstrapContents, _, cleanup2 := e2e.SetupManagementServer(t, e2e.ManagementServerOptions{AllowResourceSubset: true}) + defer cleanup2() + + // Configure an aggregate cluster pointing to a single LOGICAL_DNS cluster. + const dnsClusterName = clusterName + "-dns" + resources := e2e.UpdateOptions{ + NodeID: nodeID, + Clusters: []*v3clusterpb.Cluster{ + makeAggregateClusterResource(clusterName, []string{dnsClusterName}), + makeLogicalDNSClusterResource(dnsClusterName, "%gh&%ij", uint32(8080)), + }, + SkipValidation: true, + } + ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout) + defer cancel() + if err := managementServer.Update(ctx, resources); err != nil { + t.Fatal(err) + } + + // Create xDS client, configure cds_experimental LB policy with a manual + // resolver, and dial the test backends. + cc, cleanup := setupAndDial(t, bootstrapContents) + defer cleanup() + + // Ensure that the ClientConn moves to TransientFailure. + for state := cc.GetState(); state != connectivity.TransientFailure; state = cc.GetState() { + if !cc.WaitForStateChange(ctx, state) { + t.Fatalf("Timed out waiting for state change. got %v; want %v", state, connectivity.TransientFailure) + } + } +} + // Tests the case where the top-level cluster resource is an aggregate cluster // that resolves to a single LOGICAL_DNS cluster. The test verifies that RPCs // can be made to backends that make up the LOGICAL_DNS cluster. 
The hostname of diff --git a/xds/internal/balancer/clusterresolver/resource_resolver.go b/xds/internal/balancer/clusterresolver/resource_resolver.go index aaababa71c57..b9a81e9ba829 100644 --- a/xds/internal/balancer/clusterresolver/resource_resolver.go +++ b/xds/internal/balancer/clusterresolver/resource_resolver.go @@ -19,9 +19,11 @@ package clusterresolver import ( + "context" "sync" "google.golang.org/grpc/internal/grpclog" + "google.golang.org/grpc/internal/grpcsync" "google.golang.org/grpc/xds/internal/xdsclient/xdsresource" ) @@ -83,9 +85,11 @@ type discoveryMechanismAndResolver struct { } type resourceResolver struct { - parent *clusterResolverBalancer - logger *grpclog.PrefixLogger - updateChannel chan *resourceUpdate + parent *clusterResolverBalancer + logger *grpclog.PrefixLogger + updateChannel chan *resourceUpdate + serializer *grpcsync.CallbackSerializer + serializerCancel context.CancelFunc // mu protects the slice and map, and content of the resolvers in the slice. mu sync.Mutex @@ -106,12 +110,16 @@ type resourceResolver struct { } func newResourceResolver(parent *clusterResolverBalancer, logger *grpclog.PrefixLogger) *resourceResolver { - return &resourceResolver{ + rr := &resourceResolver{ parent: parent, logger: logger, updateChannel: make(chan *resourceUpdate, 1), childrenMap: make(map[discoveryMechanismKey]discoveryMechanismAndResolver), } + ctx, cancel := context.WithCancel(context.Background()) + rr.serializer = grpcsync.NewCallbackSerializer(ctx) + rr.serializerCancel = cancel + return rr } func equalDiscoveryMechanisms(a, b []DiscoveryMechanism) bool { @@ -210,8 +218,9 @@ func (rr *resourceResolver) resolveNow() { } } -func (rr *resourceResolver) stop() { +func (rr *resourceResolver) stop(closing bool) { rr.mu.Lock() + // Save the previous childrenMap to stop the children outside the mutex, // and reinitialize the map. We only need to reinitialize to allow for the // policy to be reused if the resource comes back. In practice, this does @@ -222,12 +231,18 @@ func (rr *resourceResolver) stop() { rr.childrenMap = make(map[discoveryMechanismKey]discoveryMechanismAndResolver) rr.mechanisms = nil rr.children = nil + rr.mu.Unlock() for _, r := range cm { r.r.stop() } + if closing { + rr.serializerCancel() + <-rr.serializer.Done() + } + // stop() is called when the LB policy is closed or when the underlying // cluster resource is removed by the management server. 
In the latter case, // an empty config update needs to be pushed to the child policy to ensure @@ -272,7 +287,9 @@ func (rr *resourceResolver) generateLocked() { } func (rr *resourceResolver) onUpdate() { - rr.mu.Lock() - rr.generateLocked() - rr.mu.Unlock() + rr.serializer.Schedule(func(context.Context) { + rr.mu.Lock() + rr.generateLocked() + rr.mu.Unlock() + }) } diff --git a/xds/internal/balancer/clusterresolver/resource_resolver_dns.go b/xds/internal/balancer/clusterresolver/resource_resolver_dns.go index 03d34c1f9932..e5c9435f6a83 100644 --- a/xds/internal/balancer/clusterresolver/resource_resolver_dns.go +++ b/xds/internal/balancer/clusterresolver/resource_resolver_dns.go @@ -19,13 +19,11 @@ package clusterresolver import ( - "context" "fmt" "net/url" "sync" "google.golang.org/grpc/internal/grpclog" - "google.golang.org/grpc/internal/grpcsync" "google.golang.org/grpc/internal/pretty" "google.golang.org/grpc/resolver" "google.golang.org/grpc/serviceconfig" @@ -45,13 +43,8 @@ var ( type dnsDiscoveryMechanism struct { target string topLevelResolver topLevelResolver - logger *grpclog.PrefixLogger - - // All accesses to dnsR must be made within serializer callbacks, thereby - // guaranteeing mutual exclusion. - serializer *grpcsync.CallbackSerializer - serializerCancel context.CancelFunc dnsR resolver.Resolver + logger *grpclog.PrefixLogger mu sync.Mutex addrs []string @@ -90,34 +83,18 @@ func newDNSResolver(target string, topLevelResolver topLevelResolver, logger *gr return ret } - // Create the serializer and schedule a callback to create the actual DNS - // resolver. This needs to happen asynchornously to avoid the following - // deadlock: - // - this method is called from rr.updateMechanisms which holds `rr.mu` - // - dns resolver Build() can report state or error inline, which is handled - // in UpdateState or ReportError, both of which call - // topLevelResolver.onUpdate, which tries to grab `rr.mu`, and hence - // deadlocks - ctx, cancel := context.WithCancel(context.Background()) - ret.serializer = grpcsync.NewCallbackSerializer(ctx) - ret.serializerCancel = cancel - ret.serializer.Schedule(func(context.Context) { - r, err := newDNS(resolver.Target{URL: *u}, ret, resolver.BuildOptions{}) - if err == nil { - ret.dnsR = r - return - } - + r, err := newDNS(resolver.Target{URL: *u}, ret, resolver.BuildOptions{}) + if err != nil { if ret.logger.V(2) { ret.logger.Infof("Failed to build DNS resolver for target %q: %v", target, err) } - ret.mu.Lock() ret.updateReceived = true - ret.mu.Unlock() ret.topLevelResolver.onUpdate() - }) - + return ret + } + ret.dnsR = r return ret + } func (dr *dnsDiscoveryMechanism) lastUpdate() (any, bool) { @@ -131,11 +108,9 @@ func (dr *dnsDiscoveryMechanism) lastUpdate() (any, bool) { } func (dr *dnsDiscoveryMechanism) resolveNow() { - dr.serializer.Schedule(func(context.Context) { - if dr.dnsR != nil { - dr.dnsR.ResolveNow(resolver.ResolveNowOptions{}) - } - }) + if dr.dnsR != nil { + dr.dnsR.ResolveNow(resolver.ResolveNowOptions{}) + } } // The definition of stop() mentions that implementations must not invoke any @@ -145,15 +120,9 @@ func (dr *dnsDiscoveryMechanism) resolveNow() { // after its `Close()` returns. Therefore, we can guarantee that no methods of // the topLevelResolver are invoked after we return from this method. 
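//
// Editorial note: with the callback serializer now owned by resourceResolver
// (see resource_resolver.go in this commit), the DNS resolver is built
// synchronously again, and stop() goes back to simply closing it.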
func (dr *dnsDiscoveryMechanism) stop() { - dr.serializerCancel() - done := make(chan struct{}) - dr.serializer.Schedule(func(context.Context) { - if dr.dnsR != nil { - dr.dnsR.Close() - } - close(done) - }) - <-done + if dr.dnsR != nil { + dr.dnsR.Close() + } } // dnsDiscoveryMechanism needs to implement resolver.ClientConn interface to receive From 79dd4a6fe0e6ef03dcc80877ce5d688194f1a5cc Mon Sep 17 00:00:00 2001 From: Easwar Swaminathan Date: Tue, 22 Aug 2023 23:59:52 +0000 Subject: [PATCH 4/4] delete an extraneous newline --- xds/internal/balancer/clusterresolver/resource_resolver_dns.go | 1 - 1 file changed, 1 deletion(-) diff --git a/xds/internal/balancer/clusterresolver/resource_resolver_dns.go b/xds/internal/balancer/clusterresolver/resource_resolver_dns.go index e5c9435f6a83..9052190b0ff0 100644 --- a/xds/internal/balancer/clusterresolver/resource_resolver_dns.go +++ b/xds/internal/balancer/clusterresolver/resource_resolver_dns.go @@ -94,7 +94,6 @@ func newDNSResolver(target string, topLevelResolver topLevelResolver, logger *gr } ret.dnsR = r return ret - } func (dr *dnsDiscoveryMechanism) lastUpdate() (any, bool) {
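// Editorial note: a minimal sketch (not part of these patches) of the
// grpcsync.CallbackSerializer pattern this series converges on in
// resourceResolver. Work is funnelled through Schedule so callbacks run one at
// a time, and shutdown cancels the serializer's context and then waits on
// Done() for in-flight callbacks to finish. (grpcsync here is
// google.golang.org/grpc/internal/grpcsync, already imported by this code; the
// exact shutdown semantics are as used by the patches above.)
//
//	ctx, cancel := context.WithCancel(context.Background())
//	s := grpcsync.NewCallbackSerializer(ctx)
//	s.Schedule(func(context.Context) {
//		// Runs serially with every other scheduled callback, so state that
//		// is only touched from callbacks needs no additional locking.
//	})
//	cancel()   // begin shutdown
//	<-s.Done() // returns once the serializer has finished running callbacks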