From 17d999af05f23e4aac4b29bd730f07c771346565 Mon Sep 17 00:00:00 2001 From: Qi Ni Date: Thu, 31 Aug 2023 10:51:15 +0800 Subject: [PATCH] fix: check endpoint slice update after backend pool update for local service to prevent mismatch --- pkg/consts/consts.go | 3 + pkg/nodemanager/nodemanager.go | 13 +- pkg/provider/azure.go | 21 +-- pkg/provider/azure_loadbalancer.go | 59 +++++-- .../azure_loadbalancer_backendpool.go | 4 +- .../azure_loadbalancer_healthprobe_test.go | 2 +- pkg/provider/azure_loadbalancer_test.go | 72 +++++++- pkg/provider/azure_local_services.go | 114 +++++++++++-- pkg/provider/azure_local_services_test.go | 155 ++++++++++-------- pkg/provider/azure_standard.go | 12 +- pkg/provider/azure_standard_test.go | 2 +- pkg/provider/azure_test.go | 4 +- pkg/provider/azure_utils.go | 2 +- pkg/provider/azure_vmss_test.go | 10 +- tests/e2e/network/multiple_standard_lb.go | 44 ++++- 15 files changed, 368 insertions(+), 149 deletions(-) diff --git a/pkg/consts/consts.go b/pkg/consts/consts.go index d9ddea625e..27e7bd86cf 100644 --- a/pkg/consts/consts.go +++ b/pkg/consts/consts.go @@ -17,6 +17,7 @@ limitations under the License. package consts import ( + "strings" "time" "github.com/Azure/azure-sdk-for-go/services/storage/mgmt/2021-09-01/storage" @@ -203,6 +204,8 @@ const ( IPVersionDualStackString string = "DualStack" ) +var IPVersionIPv6StringLower = strings.ToLower(IPVersionIPv6String) + // LB variables for dual-stack var ( // Service.Spec.LoadBalancerIP has been deprecated and may be removed in a future release. Those two annotations are introduced as alternatives to set IPv4/IPv6 LoadBalancer IPs. diff --git a/pkg/nodemanager/nodemanager.go b/pkg/nodemanager/nodemanager.go index 077600e821..55ef8c9b84 100644 --- a/pkg/nodemanager/nodemanager.go +++ b/pkg/nodemanager/nodemanager.go @@ -115,11 +115,6 @@ var updateNetworkConditionBackoff = wait.Backoff{ Jitter: 1.0, } -const ( - v4Suffix = "IPv4" - v6Suffix = "IPv6" -) - // CloudNodeController reconciles node information. type CloudNodeController struct { nodeName string @@ -586,18 +581,18 @@ func nodeAddressesChangeDetected(addressSet1, addressSet2 []v1.NodeAddress) bool addressMap1 := map[string]string{} for i := range addressSet1 { - suffix := v4Suffix + suffix := consts.IPVersionIPv4String if net.ParseIP(addressSet1[i].Address).To4() == nil { - suffix = v6Suffix + suffix = consts.IPVersionIPv6String } addrType := fmt.Sprintf("%s/%s", addressSet1[i].Type, suffix) addressMap1[addrType] = addressSet1[i].Address } for _, v := range addressSet2 { - suffix := v4Suffix + suffix := consts.IPVersionIPv4String if net.ParseIP(v.Address).To4() == nil { - suffix = v6Suffix + suffix = consts.IPVersionIPv6String } addrType := fmt.Sprintf("%s/%s", v.Type, suffix) if addressMap1[addrType] != v.Address { diff --git a/pkg/provider/azure.go b/pkg/provider/azure.go index dec5be6409..a322fe1fce 100644 --- a/pkg/provider/azure.go +++ b/pkg/provider/azure.go @@ -1258,18 +1258,19 @@ func (az *Cloud) updateNodeCaches(prevNode, newNode *v1.Node) { az.unmanagedNodes.Delete(prevNode.ObjectMeta.Name) } - // if the node is being deleted from the cluster, exclude it from load balancers - if newNode == nil { - az.excludeLoadBalancerNodes.Insert(prevNode.ObjectMeta.Name) - az.nodesWithCorrectLoadBalancerByPrimaryVMSet.Delete(strings.ToLower(prevNode.ObjectMeta.Name)) - } - // Remove from nodePrivateIPs cache. 
for _, address := range getNodePrivateIPAddresses(prevNode) { - klog.V(4).Infof("removing IP address %s of the node %s", address, prevNode.Name) + klog.V(6).Infof("removing IP address %s of the node %s", address, prevNode.Name) az.nodePrivateIPs[prevNode.Name].Delete(address) delete(az.nodePrivateIPToNodeNameMap, address) } + + // if the node is being deleted from the cluster, exclude it from load balancers + if newNode == nil { + az.excludeLoadBalancerNodes.Insert(prevNode.ObjectMeta.Name) + az.nodesWithCorrectLoadBalancerByPrimaryVMSet.Delete(strings.ToLower(prevNode.ObjectMeta.Name)) + delete(az.nodePrivateIPs, strings.ToLower(prevNode.Name)) + } } if newNode != nil { @@ -1318,15 +1319,15 @@ func (az *Cloud) updateNodeCaches(prevNode, newNode *v1.Node) { // Add to nodePrivateIPs cache for _, address := range getNodePrivateIPAddresses(newNode) { - if az.nodePrivateIPs[newNode.Name] == nil { - az.nodePrivateIPs[newNode.Name] = sets.New[string]() + if az.nodePrivateIPs[strings.ToLower(newNode.Name)] == nil { + az.nodePrivateIPs[strings.ToLower(newNode.Name)] = sets.New[string]() } if az.nodePrivateIPToNodeNameMap == nil { az.nodePrivateIPToNodeNameMap = make(map[string]string) } klog.V(6).Infof("adding IP address %s of the node %s", address, newNode.Name) - az.nodePrivateIPs[newNode.Name].Insert(address) + az.nodePrivateIPs[strings.ToLower(newNode.Name)].Insert(address) az.nodePrivateIPToNodeNameMap[address] = newNode.Name } } diff --git a/pkg/provider/azure_loadbalancer.go b/pkg/provider/azure_loadbalancer.go index 71f644aff3..6fcab4031f 100644 --- a/pkg/provider/azure_loadbalancer.go +++ b/pkg/provider/azure_loadbalancer.go @@ -167,6 +167,12 @@ func (az *Cloud) reconcileService(ctx context.Context, clusterName string, servi key := strings.ToLower(serviceName) if az.useMultipleStandardLoadBalancers() && isLocalService(service) { az.localServiceNameToServiceInfoMap.Store(key, newServiceInfo(getServiceIPFamily(service), lbName)) + // There are chances that the endpointslice changes after EnsureHostsInPool, so + // we need to check the endpointslice a second time.
+ if err := az.checkAndApplyLocalServiceBackendPoolUpdates(*lb, service); err != nil { + klog.Errorf("failed to checkAndApplyLocalServiceBackendPoolUpdates: %v", err) + return nil, err + } } else { az.localServiceNameToServiceInfoMap.Delete(key) } @@ -1801,18 +1807,9 @@ func (az *Cloud) reconcileLoadBalancer(clusterName string, service *v1.Service, if az.useMultipleStandardLoadBalancers() { lbToReconcile = *existingLBs } - for _, lb := range lbToReconcile { - lbName := pointer.StringDeref(lb.Name, "") - if lb.LoadBalancerPropertiesFormat != nil && lb.LoadBalancerPropertiesFormat.BackendAddressPools != nil { - for _, backendPool := range *lb.LoadBalancerPropertiesFormat.BackendAddressPools { - isIPv6 := isBackendPoolIPv6(pointer.StringDeref(backendPool.Name, "")) - if strings.EqualFold(pointer.StringDeref(backendPool.Name, ""), az.getBackendPoolNameForService(service, clusterName, isIPv6)) { - if err := az.LoadBalancerBackendPool.EnsureHostsInPool(service, nodes, lbBackendPoolIDs[isIPv6], vmSetName, clusterName, lbName, backendPool); err != nil { - return nil, err - } - } - } - } + lb, err = az.reconcileBackendPoolHosts(lb, lbToReconcile, service, nodes, clusterName, vmSetName, lbBackendPoolIDs) + if err != nil { + return nil, err } } @@ -1824,6 +1821,44 @@ func (az *Cloud) reconcileLoadBalancer(clusterName string, service *v1.Service, return lb, nil } +func (az *Cloud) reconcileBackendPoolHosts( + currentLB *network.LoadBalancer, + lbs []network.LoadBalancer, + service *v1.Service, + nodes []*v1.Node, + clusterName, vmSetName string, + lbBackendPoolIDs map[bool]string, +) (*network.LoadBalancer, error) { + var res *network.LoadBalancer + res = currentLB + for _, lb := range lbs { + lb := lb + lbName := pointer.StringDeref(lb.Name, "") + if lb.LoadBalancerPropertiesFormat != nil && lb.LoadBalancerPropertiesFormat.BackendAddressPools != nil { + for i, backendPool := range *lb.LoadBalancerPropertiesFormat.BackendAddressPools { + isIPv6 := isBackendPoolIPv6(pointer.StringDeref(backendPool.Name, "")) + if strings.EqualFold(pointer.StringDeref(backendPool.Name, ""), az.getBackendPoolNameForService(service, clusterName, isIPv6)) { + if err := az.LoadBalancerBackendPool.EnsureHostsInPool( + service, + nodes, + lbBackendPoolIDs[isIPv6], + vmSetName, + clusterName, + lbName, + (*lb.LoadBalancerPropertiesFormat.BackendAddressPools)[i], + ); err != nil { + return nil, err + } + } + } + } + if strings.EqualFold(lbName, *currentLB.Name) { + res = &lb + } + } + return res, nil +} + // addOrUpdateLBInList adds or updates the given lb in the list func addOrUpdateLBInList(lbs *[]network.LoadBalancer, targetLB *network.LoadBalancer) { for i, lb := range *lbs { diff --git a/pkg/provider/azure_loadbalancer_backendpool.go b/pkg/provider/azure_loadbalancer_backendpool.go index 28d75bdb04..376f2e8977 100644 --- a/pkg/provider/azure_loadbalancer_backendpool.go +++ b/pkg/provider/azure_loadbalancer_backendpool.go @@ -352,7 +352,7 @@ func (bc *backendPoolTypeNodeIPConfig) GetBackendPrivateIPs(clusterName string, klog.Errorf("bc.GetBackendPrivateIPs for service (%s): GetNodeNameByIPConfigurationID failed with error: %v", serviceName, err) continue } - privateIPsSet, ok := bc.nodePrivateIPs[nodeName] + privateIPsSet, ok := bc.nodePrivateIPs[strings.ToLower(nodeName)] if !ok { klog.Warningf("bc.GetBackendPrivateIPs for service (%s): failed to get private IPs of node %s", serviceName, nodeName) continue @@ -658,7 +658,7 @@ func (bi *backendPoolTypeNodeIP) ReconcileBackendPools(clusterName string, servi bp := 
newBackendPools[i] var nodeIPAddressesToBeDeleted []string for nodeName := range bi.excludeLoadBalancerNodes { - for ip := range bi.nodePrivateIPs[nodeName] { + for ip := range bi.nodePrivateIPs[strings.ToLower(nodeName)] { klog.V(2).Infof("bi.ReconcileBackendPools for service (%s): found unwanted node private IP %s, decouple it from the LB %s", serviceName, ip, lbName) nodeIPAddressesToBeDeleted = append(nodeIPAddressesToBeDeleted, ip) } diff --git a/pkg/provider/azure_loadbalancer_healthprobe_test.go b/pkg/provider/azure_loadbalancer_healthprobe_test.go index 5603e88d59..27726521e4 100644 --- a/pkg/provider/azure_loadbalancer_healthprobe_test.go +++ b/pkg/provider/azure_loadbalancer_healthprobe_test.go @@ -39,7 +39,7 @@ func getTestProbes(protocol, path string, interval, servicePort, probePort, numO func getTestProbe(protocol, path string, interval, servicePort, probePort, numOfProbe *int32, isIPv6 bool) network.Probe { suffix := "" if isIPv6 { - suffix = "-" + v6Suffix + suffix = "-" + consts.IPVersionIPv6String } expectedProbes := network.Probe{ Name: pointer.String(fmt.Sprintf("atest1-TCP-%d", *servicePort) + suffix), diff --git a/pkg/provider/azure_loadbalancer_test.go b/pkg/provider/azure_loadbalancer_test.go index f07fa61998..cf9c95a429 100644 --- a/pkg/provider/azure_loadbalancer_test.go +++ b/pkg/provider/azure_loadbalancer_test.go @@ -3055,7 +3055,7 @@ func getTCPResetTestRules(enableTCPReset bool) map[bool][]network.LoadBalancingR func getTestRule(enableTCPReset bool, port int32, isIPv6 bool) network.LoadBalancingRule { suffix := "" if isIPv6 { - suffix = "-" + v6Suffix + suffix = "-" + consts.IPVersionIPv6String } expectedRules := network.LoadBalancingRule{ Name: pointer.String(fmt.Sprintf("atest1-TCP-%d", port) + suffix), @@ -3089,7 +3089,7 @@ func getHATestRules(enableTCPReset, hasProbe bool, protocol v1.Protocol, isIPv6, suffix := "" enableFloatingIP := true if isIPv6 { - suffix = "-" + v6Suffix + suffix = "-" + consts.IPVersionIPv6String if isInternal { enableFloatingIP = false } @@ -3128,7 +3128,7 @@ func getHATestRules(enableTCPReset, hasProbe bool, protocol v1.Protocol, isIPv6, func getFloatingIPTestRule(enableTCPReset, enableFloatingIP bool, port int32, isIPv6 bool) network.LoadBalancingRule { suffix := "" if isIPv6 { - suffix = "-" + v6Suffix + suffix = "-" + consts.IPVersionIPv6String } expectedRules := network.LoadBalancingRule{ Name: pointer.String(fmt.Sprintf("atest1-TCP-%d%s", port, suffix)), @@ -9034,3 +9034,69 @@ func TestAddOrUpdateLBInList(t *testing.T) { addOrUpdateLBInList(&existingLBs, &targetLB) assert.Equal(t, expectedLBs, existingLBs) } + +func TestReconcileBackendPoolHosts(t *testing.T) { + ctrl := gomock.NewController(t) + defer ctrl.Finish() + + svc := getTestService("test", v1.ProtocolTCP, nil, false) + lbBackendPoolIDs := map[bool]string{false: "id"} + clusterName := "kubernetes" + ips := []string{"10.0.0.1"} + bp1 := buildTestLoadBalancerBackendPoolWithIPs(clusterName, ips) + bp2 := buildTestLoadBalancerBackendPoolWithIPs(clusterName, ips) + ips = []string{"10.0.0.2", "10.0.0.3"} + bp3 := buildTestLoadBalancerBackendPoolWithIPs(clusterName, ips) + lb1 := &network.LoadBalancer{ + Name: pointer.String(clusterName), + LoadBalancerPropertiesFormat: &network.LoadBalancerPropertiesFormat{ + BackendAddressPools: &[]network.BackendAddressPool{bp1}, + }, + } + lb2 := &network.LoadBalancer{ + Name: pointer.String("lb2"), + LoadBalancerPropertiesFormat: &network.LoadBalancerPropertiesFormat{ + BackendAddressPools: &[]network.BackendAddressPool{bp2}, + }, + } 
+ expectedLB := &network.LoadBalancer{ + Name: pointer.String(clusterName), + LoadBalancerPropertiesFormat: &network.LoadBalancerPropertiesFormat{ + BackendAddressPools: &[]network.BackendAddressPool{bp3}, + }, + } + existingLBs := []network.LoadBalancer{*lb1, *lb2} + + cloud := GetTestCloud(ctrl) + mockLBBackendPool := NewMockBackendPool(ctrl) + mockLBBackendPool.EXPECT().EnsureHostsInPool(gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any(), bp1).DoAndReturn(fakeEnsureHostsInPool()) + mockLBBackendPool.EXPECT().EnsureHostsInPool(gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any(), bp2).Return(nil) + cloud.LoadBalancerBackendPool = mockLBBackendPool + + var err error + lb1, err = cloud.reconcileBackendPoolHosts(lb1, existingLBs, &svc, []*v1.Node{}, clusterName, "vmss", lbBackendPoolIDs) + assert.NoError(t, err) + assert.Equal(t, expectedLB, lb1) + + mockLBBackendPool.EXPECT().EnsureHostsInPool(gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any()).Return(errors.New("error")) + _, err = cloud.reconcileBackendPoolHosts(lb1, existingLBs, &svc, []*v1.Node{}, clusterName, "vmss", lbBackendPoolIDs) + assert.Equal(t, errors.New("error"), err) +} + +func fakeEnsureHostsInPool() func(*v1.Service, []*v1.Node, string, string, string, string, network.BackendAddressPool) error { + return func(svc *v1.Service, nodes []*v1.Node, lbBackendPoolID, vmSet, clusterName, lbName string, backendPool network.BackendAddressPool) error { + backendPool.LoadBalancerBackendAddresses = &[]network.LoadBalancerBackendAddress{ + { + LoadBalancerBackendAddressPropertiesFormat: &network.LoadBalancerBackendAddressPropertiesFormat{ + IPAddress: pointer.String("10.0.0.2"), + }, + }, + { + LoadBalancerBackendAddressPropertiesFormat: &network.LoadBalancerBackendAddressPropertiesFormat{ + IPAddress: pointer.String("10.0.0.3"), + }, + }, + } + return nil + } +} diff --git a/pkg/provider/azure_local_services.go b/pkg/provider/azure_local_services.go index 66ff100ed6..25ff0e8162 100644 --- a/pkg/provider/azure_local_services.go +++ b/pkg/provider/azure_local_services.go @@ -33,6 +33,7 @@ import ( "k8s.io/client-go/informers" "k8s.io/client-go/tools/cache" "k8s.io/klog/v2" + utilnet "k8s.io/utils/net" "k8s.io/utils/pointer" "sigs.k8s.io/cloud-provider-azure/pkg/consts" @@ -286,7 +287,6 @@ func (az *Cloud) getLocalServiceInfo(serviceName string) (*serviceInfo, bool) { // setUpEndpointSlicesInformer creates an informer for EndpointSlices of local services. // It watches the update events and send backend pool update operations to the batch updater. -// TODO (niqi): the update of endpointslice may be slower than tue update of endpoint pods. Need to fix this. func (az *Cloud) setUpEndpointSlicesInformer(informerFactory informers.SharedInformerFactory) { endpointSlicesInformer := informerFactory.Discovery().V1().EndpointSlices().Informer() _, _ = endpointSlicesInformer.AddEventHandler( @@ -311,7 +311,7 @@ func (az *Cloud) setUpEndpointSlicesInformer(informerFactory informers.SharedInf key := strings.ToLower(fmt.Sprintf("%s/%s", newES.Namespace, svcName)) si, found := az.getLocalServiceInfo(key) if !found { - klog.V(4).Infof("EndpointSlice %s/%s belongs to service %s, but the service is not a local service, skip updating load balancer backend pool", key, newES.Namespace, newES.Name) + klog.V(4).Infof("EndpointSlice %s/%s belongs to service %s, but the service is not a local service, or has not finished the initial reconciliation loop. 
Skip updating load balancer backend pool", newES.Namespace, newES.Name, key) return } lbName, ipFamily := si.lbName, si.ipFamily @@ -328,18 +328,13 @@ func (az *Cloud) setUpEndpointSlicesInformer(informerFactory informers.SharedInf } } for _, previousNodeName := range previousNodeNames { - nodeIPsSet := az.nodePrivateIPs[previousNodeName] + nodeIPsSet := az.nodePrivateIPs[strings.ToLower(previousNodeName)] previousIPs = append(previousIPs, setToStrings(nodeIPsSet)...) } for _, currentNodeName := range currentNodeNames { - nodeIPsSet := az.nodePrivateIPs[currentNodeName] + nodeIPsSet := az.nodePrivateIPs[strings.ToLower(currentNodeName)] currentIPs = append(currentIPs, setToStrings(nodeIPsSet)...) } - ipsToBeDeleted := compareNodeIPs(previousIPs, currentIPs) - if len(ipsToBeDeleted) == 0 && len(previousIPs) == len(currentIPs) { - klog.V(4).Infof("No IP change detected for EndpointSlice %s/%s, skip updating load balancer backend pool", newES.Namespace, newES.Name) - return - } if az.backendPoolUpdater != nil { var bpNames []string @@ -353,14 +348,11 @@ func (az *Cloud) setUpEndpointSlicesInformer(informerFactory informers.SharedInf default: bpNames = append(bpNames, bpNameIPv4, bpNameIPv6) } + currentIPsInBackendPools := make(map[string][]string) for _, bpName := range bpNames { - if len(ipsToBeDeleted) > 0 { - az.backendPoolUpdater.addOperation(getRemoveIPsFromBackendPoolOperation(key, lbName, bpName, ipsToBeDeleted)) - } - if len(currentIPs) > 0 { - az.backendPoolUpdater.addOperation(getAddIPsToBackendPoolOperation(key, lbName, bpName, currentIPs)) - } + currentIPsInBackendPools[bpName] = previousIPs } + az.applyIPChangesAmongLocalServiceBackendPoolsByIPFamily(lbName, key, currentIPsInBackendPools, currentIPs) } }, DeleteFunc: func(obj interface{}) { @@ -408,7 +400,7 @@ func compareNodeIPs(previousIPs, currentIPs []string) []string { func getLocalServiceBackendPoolName(serviceName string, ipv6 bool) string { serviceName = strings.ToLower(strings.Replace(serviceName, "/", "-", -1)) if ipv6 { - return fmt.Sprintf("%s-ipv6", serviceName) + return fmt.Sprintf("%s-%s", serviceName, consts.IPVersionIPv6StringLower) } return serviceName } @@ -471,12 +463,16 @@ func newServiceInfo(ipFamily, lbName string) *serviceInfo { // getLocalServiceEndpointsNodeNames gets the node names that host all endpoints of the local service. 
func (az *Cloud) getLocalServiceEndpointsNodeNames(service *v1.Service) (sets.Set[string], error) { - var ep *discovery_v1.EndpointSlice + var ( + ep *discovery_v1.EndpointSlice + foundInCache bool + ) az.endpointSlicesCache.Range(func(key, value interface{}) bool { endpointSlice := value.(*discovery_v1.EndpointSlice) if strings.EqualFold(getServiceNameOfEndpointSlice(endpointSlice), service.Name) && strings.EqualFold(endpointSlice.Namespace, service.Namespace) { ep = endpointSlice + foundInCache = true return false } return true @@ -499,6 +495,9 @@ func (az *Cloud) getLocalServiceEndpointsNodeNames(service *v1.Service) (sets.Se if ep == nil { return nil, fmt.Errorf("failed to find EndpointSlice for service %s/%s", service.Namespace, service.Name) } + if !foundInCache { + az.endpointSlicesCache.Store(strings.ToLower(fmt.Sprintf("%s/%s", ep.Namespace, ep.Name)), ep) + } var nodeNames []string for _, endpoint := range ep.Endpoints { @@ -544,3 +543,84 @@ func (az *Cloud) cleanupLocalServiceBackendPool( } return lbs, nil } + +// checkAndApplyLocalServiceBackendPoolUpdates checks whether the IPs in the backend pool are aligned +// with the corresponding endpointslice, and updates the backend pool if necessary. +func (az *Cloud) checkAndApplyLocalServiceBackendPoolUpdates(lb network.LoadBalancer, service *v1.Service) error { + serviceName := getServiceName(service) + endpointsNodeNames, err := az.getLocalServiceEndpointsNodeNames(service) + if err != nil { + return err + } + var expectedIPs []string + for nodeName := range endpointsNodeNames { + ips := az.nodePrivateIPs[strings.ToLower(nodeName)] + for ip := range ips { + expectedIPs = append(expectedIPs, ip) + } + } + currentIPsInBackendPools := make(map[string][]string) + for _, bp := range *lb.BackendAddressPools { + bpName := pointer.StringDeref(bp.Name, "") + if localServiceOwnsBackendPool(serviceName, bpName) { + var currentIPs []string + for _, address := range *bp.LoadBalancerBackendAddresses { + currentIPs = append(currentIPs, *address.IPAddress) + } + currentIPsInBackendPools[bpName] = currentIPs + } + } + az.applyIPChangesAmongLocalServiceBackendPoolsByIPFamily(*lb.Name, serviceName, currentIPsInBackendPools, expectedIPs) + + return nil +} + +// applyIPChangesAmongLocalServiceBackendPoolsByIPFamily reconciles IPs by IP family +// among the backend pools of a local service. +func (az *Cloud) applyIPChangesAmongLocalServiceBackendPoolsByIPFamily( + lbName, serviceName string, + currentIPsInBackendPools map[string][]string, + expectedIPs []string, +) { + currentIPsInBackendPoolsIPv4 := make(map[string][]string) + currentIPsInBackendPoolsIPv6 := make(map[string][]string) + for bpName, ips := range currentIPsInBackendPools { + if managedResourceHasIPv6Suffix(bpName) { + currentIPsInBackendPoolsIPv6[bpName] = ips + } else { + currentIPsInBackendPoolsIPv4[bpName] = ips + } + } + + var ipv4, ipv6 []string + for _, ip := range expectedIPs { + if utilnet.IsIPv6String(ip) { + ipv6 = append(ipv6, ip) + } else { + ipv4 = append(ipv4, ip) + } + } + az.reconcileIPsInLocalServiceBackendPoolsAsync(lbName, serviceName, currentIPsInBackendPoolsIPv6, ipv6) + az.reconcileIPsInLocalServiceBackendPoolsAsync(lbName, serviceName, currentIPsInBackendPoolsIPv4, ipv4) +} + +// reconcileIPsInLocalServiceBackendPoolsAsync reconciles IPs in the backend pools of a local service.
+func (az *Cloud) reconcileIPsInLocalServiceBackendPoolsAsync( + lbName, serviceName string, + currentIPsInBackendPools map[string][]string, + expectedIPs []string, +) { + for bpName, currentIPs := range currentIPsInBackendPools { + ipsToBeDeleted := compareNodeIPs(currentIPs, expectedIPs) + if len(ipsToBeDeleted) == 0 && len(currentIPs) == len(expectedIPs) { + klog.V(4).Infof("No IP change detected for service %s, skip updating load balancer backend pool", serviceName) + return + } + if len(ipsToBeDeleted) > 0 { + az.backendPoolUpdater.addOperation(getRemoveIPsFromBackendPoolOperation(serviceName, lbName, bpName, ipsToBeDeleted)) + } + if len(expectedIPs) > 0 { + az.backendPoolUpdater.addOperation(getAddIPsToBackendPoolOperation(serviceName, lbName, bpName, expectedIPs)) + } + } +} diff --git a/pkg/provider/azure_local_services_test.go b/pkg/provider/azure_local_services_test.go index ab7d57a9e9..a32ed6dba9 100644 --- a/pkg/provider/azure_local_services_test.go +++ b/pkg/provider/azure_local_services_test.go @@ -34,6 +34,7 @@ import ( metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/util/sets" "k8s.io/client-go/informers" + "k8s.io/client-go/kubernetes" "k8s.io/client-go/kubernetes/fake" "k8s.io/utils/pointer" @@ -223,26 +224,6 @@ func TestLoadBalancerBackendPoolUpdater(t *testing.T) { u.removeOperation(tc.removeOperationServiceName) } time.Sleep(3 * time.Second) - //err := wait.PollUntilContextTimeout(context.Background(), time.Second, 5*time.Second, true, func(ctx context.Context) (bool, error) { - // resultLen := 0 - // results.Range(func(key, value interface{}) bool { - // resultLen++ - // return true - // }) - // return resultLen == len(tc.expectedResult), nil - //}) - //assert.NoError(t, err) - - //actualResults := make(map[batchOperationResult]bool) - //results.Range(func(key, value interface{}) bool { - // actualResults[key.(batchOperationResult)] = true - // return true - //}) - - //err = wait.PollUntilContextTimeout(ctx, time.Second, 5*time.Second, true, func(ctx context.Context) (bool, error) { - // return assert.Equal(t, tc.expectedResult, actualResults, tc.name), nil - //}) - //assert.NoError(t, err) }) } } @@ -262,9 +243,6 @@ func TestLoadBalancerBackendPoolUpdaterFailed(t *testing.T) { putBackendPoolErr *retry.Error expectedCreateOrUpdateBackendPools []network.BackendAddressPool expectedBackendPools []network.BackendAddressPool - //expectedResult map[batchOperationResult]bool - //expectedPoolNameToErrMsg map[string]string - //expectedResultCount int }{ { name: "Retriable error when getting backend pool", @@ -279,10 +257,6 @@ func TestLoadBalancerBackendPoolUpdaterFailed(t *testing.T) { expectedBackendPools: []network.BackendAddressPool{ getTestBackendAddressPoolWithIPs("lb1", "pool1", []string{"10.0.0.1", "10.0.0.2"}), }, - //expectedResult: map[batchOperationResult]bool{ - // newBatchOperationResult("lb1/pool1", true, nil): true, - //}, - //expectedResultCount: 1, }, { name: "Retriable error when updating backend pool", @@ -299,10 +273,6 @@ func TestLoadBalancerBackendPoolUpdaterFailed(t *testing.T) { expectedBackendPools: []network.BackendAddressPool{ getTestBackendAddressPoolWithIPs("lb1", "pool1", []string{"10.0.0.1", "10.0.0.2"}), }, - //expectedResult: map[batchOperationResult]bool{ - // newBatchOperationResult("lb1/pool1", true, nil): true, - //}, - //expectedResultCount: 1, }, { name: "Non-retriable error when getting backend pool", @@ -314,8 +284,6 @@ func TestLoadBalancerBackendPoolUpdaterFailed(t *testing.T) { expectedBackendPools: 
[]network.BackendAddressPool{ getTestBackendAddressPoolWithIPs("lb1", "pool1", []string{}), }, - //expectedPoolNameToErrMsg: map[string]string{"lb1/pool1": "Retriable: false, RetryAfter: 0s, HTTPStatusCode: 0, RawError: error"}, - //expectedResultCount: 1, }, { name: "Non-retriable error when updating backend pool", @@ -328,8 +296,6 @@ func TestLoadBalancerBackendPoolUpdaterFailed(t *testing.T) { expectedCreateOrUpdateBackendPools: []network.BackendAddressPool{ getTestBackendAddressPoolWithIPs("lb1", "pool1", []string{"10.0.0.1", "10.0.0.2"}), }, - //expectedPoolNameToErrMsg: map[string]string{"lb1/pool1": "Retriable: false, RetryAfter: 0s, HTTPStatusCode: 0, RawError: error"}, - //expectedResultCount: 1, }, { name: "Backend pool not found", @@ -416,48 +382,12 @@ func TestLoadBalancerBackendPoolUpdaterFailed(t *testing.T) { defer cancel() go u.run(ctx) - //results := sync.Map{} for _, op := range tc.operations { op := op - //go func() { u.addOperation(op) - //result := op.wait() - //results.Store(result, true) - //}() time.Sleep(100 * time.Millisecond) } time.Sleep(3 * time.Second) - //err := wait.PollUntilContextTimeout(context.Background(), time.Second, 5*time.Second, true, func(ctx context.Context) (bool, error) { - // resultLen := 0 - // results.Range(func(key, value interface{}) bool { - // resultLen++ - // return true - // }) - // return resultLen == tc.expectedResultCount, nil - //}) - //assert.NoError(t, err) - - //actualResults := make(map[batchOperationResult]bool) - //poolNameToErrMsg := make(map[string]string) - //results.Range(func(key, value interface{}) bool { - // actualResults[key.(batchOperationResult)] = true - // if tc.getBackendPoolErr != nil && !tc.getBackendPoolErr.Retriable || - // tc.putBackendPoolErr != nil && !tc.putBackendPoolErr.Retriable { - // poolNameToErrMsg[key.(batchOperationResult).name] = key.(batchOperationResult).err.Error() - // } - // return true - //}) - - //err = wait.PollUntilContextTimeout(ctx, time.Second, 5*time.Second, true, func(ctx context.Context) (bool, error) { - // if tc.expectedResult != nil { - // return assert.Equal(t, tc.expectedResult, actualResults, tc.name), nil - // } - // if tc.expectedPoolNameToErrMsg != nil { - // return assert.Equal(t, tc.expectedPoolNameToErrMsg, poolNameToErrMsg, tc.name), nil - // } - // return false, errors.New("unexpected result") - //}) - //assert.NoError(t, err) }) } } @@ -604,3 +534,86 @@ func TestGetBackendPoolNamesAndIDsForService(t *testing.T) { _ = cloud.getBackendPoolNamesForService(&svc, "test") _ = cloud.getBackendPoolIDsForService(&svc, "test", "lb") } + +func TestCheckAndApplyLocalServiceBackendPoolUpdates(t *testing.T) { + ctrl := gomock.NewController(t) + defer ctrl.Finish() + + for _, tc := range []struct { + description string + existingEPS *discovery_v1.EndpointSlice + expectedErr error + }{ + { + description: "should update backend pool as expected", + existingEPS: getTestEndpointSlice("eps1", "default", "svc1", "node2"), + }, + { + description: "should report an error if failed to get the endpointslice", + expectedErr: errors.New("failed to find EndpointSlice for service default/svc1"), + }, + } { + t.Run(tc.description, func(t *testing.T) { + cloud := GetTestCloud(ctrl) + cloud.localServiceNameToServiceInfoMap.Store("default/svc1", &serviceInfo{lbName: "lb1"}) + svc := getTestService("svc1", v1.ProtocolTCP, nil, false) + var client kubernetes.Interface + if tc.existingEPS != nil { + client = fake.NewSimpleClientset(&svc, tc.existingEPS) + } else { + client = 
fake.NewSimpleClientset(&svc) + } + cloud.KubeClient = client + informerFactory := informers.NewSharedInformerFactory(client, 0) + cloud.serviceLister = informerFactory.Core().V1().Services().Lister() + cloud.LoadBalancerBackendPoolUpdateIntervalInSeconds = 1 + cloud.LoadBalancerSku = consts.LoadBalancerSkuStandard + cloud.MultipleStandardLoadBalancerConfigurations = []MultipleStandardLoadBalancerConfiguration{ + { + Name: "lb1", + }, + } + cloud.localServiceNameToServiceInfoMap.Store("default/svc1", newServiceInfo(consts.IPVersionIPv4String, "lb1")) + cloud.nodePrivateIPs = map[string]sets.Set[string]{ + "node1": sets.New[string]("10.0.0.1", "fd00::1"), + "node2": sets.New[string]("10.0.0.2", "fd00::2"), + } + + existingBackendPool := getTestBackendAddressPoolWithIPs("lb1", "default-svc1", []string{"10.0.0.1"}) + existingBackendPoolIPv6 := getTestBackendAddressPoolWithIPs("lb1", "default-svc1-ipv6", []string{"fd00::1"}) + existingLB := network.LoadBalancer{ + Name: pointer.String("lb1"), + LoadBalancerPropertiesFormat: &network.LoadBalancerPropertiesFormat{ + BackendAddressPools: &[]network.BackendAddressPool{ + existingBackendPool, + existingBackendPoolIPv6, + }, + }, + } + expectedBackendPool := getTestBackendAddressPoolWithIPs("lb1", "default-svc1", []string{"10.0.0.2"}) + expectedBackendPoolIPv6 := getTestBackendAddressPoolWithIPs("lb1", "default-svc1-ipv6", []string{"fd00::2"}) + mockLBClient := mockloadbalancerclient.NewMockInterface(ctrl) + if tc.existingEPS != nil { + mockLBClient.EXPECT().GetLBBackendPool(gomock.Any(), gomock.Any(), "lb1", "default-svc1", "").Return(existingBackendPool, nil) + mockLBClient.EXPECT().GetLBBackendPool(gomock.Any(), gomock.Any(), "lb1", "default-svc1-ipv6", "").Return(existingBackendPoolIPv6, nil) + mockLBClient.EXPECT().CreateOrUpdateBackendPools(gomock.Any(), gomock.Any(), "lb1", "default-svc1", expectedBackendPool, "").Return(nil) + mockLBClient.EXPECT().CreateOrUpdateBackendPools(gomock.Any(), gomock.Any(), "lb1", "default-svc1-ipv6", expectedBackendPoolIPv6, "").Return(nil) + } + cloud.LoadBalancerClient = mockLBClient + + u := newLoadBalancerBackendPoolUpdater(cloud, time.Second) + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + cloud.backendPoolUpdater = u + go cloud.backendPoolUpdater.run(ctx) + + err := cloud.checkAndApplyLocalServiceBackendPoolUpdates(existingLB, &svc) + if tc.expectedErr != nil { + assert.Equal(t, tc.expectedErr, err) + } else { + assert.NoError(t, err) + } + time.Sleep(2 * time.Second) + }) + } +} diff --git a/pkg/provider/azure_standard.go b/pkg/provider/azure_standard.go index 1040317bec..9f5dd05944 100644 --- a/pkg/provider/azure_standard.go +++ b/pkg/provider/azure_standard.go @@ -54,14 +54,6 @@ var ( vmasIDRE = regexp.MustCompile(`/subscriptions/(?:.*)/resourceGroups/(?:.*)/providers/Microsoft.Compute/availabilitySets/(.+)`) ) -const ( - v6Suffix = "IPv6" -) - -var ( - v6SuffixLower = strings.ToLower(v6Suffix) -) - // returns the full identifier of an availabilitySet func (az *Cloud) getAvailabilitySetID(resourceGroup, availabilitySetName string) string { return fmt.Sprintf( @@ -270,7 +262,7 @@ func isInternalLoadBalancer(lb *network.LoadBalancer) bool { // clusters moving from IPv6 to dualstack will require no changes as the IPv4 backend pool will created with func getBackendPoolName(clusterName string, isIPv6 bool) string { if isIPv6 { - return fmt.Sprintf("%s-%s", clusterName, v6Suffix) + return fmt.Sprintf("%s-%s", clusterName, consts.IPVersionIPv6String) } return clusterName @@ -290,7 
+282,7 @@ func isBackendPoolIPv6(name string) bool { } func managedResourceHasIPv6Suffix(name string) bool { - return strings.HasSuffix(strings.ToLower(name), fmt.Sprintf("-%s", v6SuffixLower)) + return strings.HasSuffix(strings.ToLower(name), fmt.Sprintf("-%s", consts.IPVersionIPv6StringLower)) } func (az *Cloud) getLoadBalancerRuleName(service *v1.Service, protocol v1.Protocol, port int32, isIPv6 bool) string { diff --git a/pkg/provider/azure_standard_test.go b/pkg/provider/azure_standard_test.go index b6c6dba3c8..335365b0f9 100644 --- a/pkg/provider/azure_standard_test.go +++ b/pkg/provider/azure_standard_test.go @@ -568,7 +568,7 @@ func testGetLoadBalancerSubResourceIDs( subscriptionID, rgName, c.loadBalancerName, - clusterName) + "-" + v6Suffix + clusterName) + "-" + consts.IPVersionIPv6String subResourceIDs := getLoadBalancerSubResourceIDs(clusterName, c.loadBalancerName) assert.Equal(t, expectedV4, subResourceIDs[false]) assert.Equal(t, expectedV6, subResourceIDs[true]) diff --git a/pkg/provider/azure_test.go b/pkg/provider/azure_test.go index ed6eff7623..60d4b6e8f7 100644 --- a/pkg/provider/azure_test.go +++ b/pkg/provider/azure_test.go @@ -241,7 +241,7 @@ func setMockPublicIP(az *Cloud, mockPIPsClient *mockpublicipclient.MockInterface ipAddr1 := "1.2.3.4" ipAddra := "1.2.3.5" if isIPv6 { - suffix = "-" + v6Suffix + suffix = "-" + consts.IPVersionIPv6String ipVer = network.IPv6 ipAddr1 = "fd00::eef0" ipAddra = "fd00::eef1" @@ -1994,7 +1994,7 @@ func validateLoadBalancer(t *testing.T, loadBalancer *network.LoadBalancer, serv expectedFrontendIPs = append(expectedFrontendIPs, expectedFrontendIP) if svcIPFamilyCount == 2 { expectedFrontendIP := ExpectedFrontendIPInfo{ - Name: az.getDefaultFrontendIPConfigName(&services[i]) + "-" + v6Suffix, + Name: az.getDefaultFrontendIPConfigName(&services[i]) + "-" + consts.IPVersionIPv6String, Subnet: pointer.String(expectedSubnetName), } expectedFrontendIPs = append(expectedFrontendIPs, expectedFrontendIP) diff --git a/pkg/provider/azure_utils.go b/pkg/provider/azure_utils.go index e8724ef6a5..7c05498742 100644 --- a/pkg/provider/azure_utils.go +++ b/pkg/provider/azure_utils.go @@ -453,7 +453,7 @@ func getServicePIPPrefixID(service *v1.Service, isIPv6 bool) string { // the old PIPs will be recreated. 
func getResourceByIPFamily(resource string, isDualStack, isIPv6 bool) string { if isDualStack && isIPv6 { - return fmt.Sprintf("%s-%s", resource, v6Suffix) + return fmt.Sprintf("%s-%s", resource, consts.IPVersionIPv6String) } return resource } diff --git a/pkg/provider/azure_vmss_test.go b/pkg/provider/azure_vmss_test.go index 4f9b34fa5b..86fc984630 100644 --- a/pkg/provider/azure_vmss_test.go +++ b/pkg/provider/azure_vmss_test.go @@ -52,9 +52,9 @@ const ( testVMSSName = "vmss" testVMPowerState = "PowerState/Running" testLBBackendpoolID0 = "/subscriptions/sub/resourceGroups/rg/providers/Microsoft.Network/loadBalancers/lb/backendAddressPools/backendpool-0" - testLBBackendpoolID0v6 = "/subscriptions/sub/resourceGroups/rg/providers/Microsoft.Network/loadBalancers/lb/backendAddressPools/backendpool-0" + "-" + v6Suffix + testLBBackendpoolID0v6 = "/subscriptions/sub/resourceGroups/rg/providers/Microsoft.Network/loadBalancers/lb/backendAddressPools/backendpool-0" + "-" + consts.IPVersionIPv6String testLBBackendpoolID1 = "/subscriptions/sub/resourceGroups/rg/providers/Microsoft.Network/loadBalancers/lb/backendAddressPools/backendpool-1" - testLBBackendpoolID1v6 = "/subscriptions/sub/resourceGroups/rg/providers/Microsoft.Network/loadBalancers/lb/backendAddressPools/backendpool-1" + "-" + v6Suffix + testLBBackendpoolID1v6 = "/subscriptions/sub/resourceGroups/rg/providers/Microsoft.Network/loadBalancers/lb/backendAddressPools/backendpool-1" + "-" + consts.IPVersionIPv6String testLBBackendpoolID2 = "/subscriptions/sub/resourceGroups/rg1/providers/Microsoft.Network/loadBalancers/lb/backendAddressPools/backendpool-2" ) @@ -62,7 +62,7 @@ func buildTestVMSSWithLB(name, namePrefix string, lbBackendpoolIDs []string, ipv lbBackendpoolsV4, lbBackendpoolsV6 := make([]compute.SubResource, 0), make([]compute.SubResource, 0) for _, id := range lbBackendpoolIDs { lbBackendpoolsV4 = append(lbBackendpoolsV4, compute.SubResource{ID: pointer.String(id)}) - lbBackendpoolsV6 = append(lbBackendpoolsV6, compute.SubResource{ID: pointer.String(id + "-" + v6Suffix)}) + lbBackendpoolsV6 = append(lbBackendpoolsV6, compute.SubResource{ID: pointer.String(id + "-" + consts.IPVersionIPv6String)}) } ipConfig := []compute.VirtualMachineScaleSetIPConfiguration{ { @@ -2421,7 +2421,7 @@ func TestEnsureVMSSInPool(t *testing.T) { }, }, isBasicLB: false, - backendPoolID: testLBBackendpoolID1 + "-" + v6Suffix, + backendPoolID: testLBBackendpoolID1 + "-" + consts.IPVersionIPv6String, clusterIP: "fd00::e68b", expectedPutVMSS: false, expectedErr: fmt.Errorf("failed to find a primary IP configuration (IPv6=true) for the VMSS VM or VMSS \"vmss\""), @@ -2436,7 +2436,7 @@ func TestEnsureVMSSInPool(t *testing.T) { }, }, isBasicLB: false, - backendPoolID: testLBBackendpoolID1 + "-" + v6Suffix, + backendPoolID: testLBBackendpoolID1 + "-" + consts.IPVersionIPv6String, setIPv6Config: true, clusterIP: "fd00::e68b", expectedPutVMSS: true, diff --git a/tests/e2e/network/multiple_standard_lb.go b/tests/e2e/network/multiple_standard_lb.go index 1dc0232c54..4f8268a6c9 100644 --- a/tests/e2e/network/multiple_standard_lb.go +++ b/tests/e2e/network/multiple_standard_lb.go @@ -189,12 +189,13 @@ var _ = Describe("Ensure LoadBalancer", Label(utils.TestSuiteLabelMultiSLB), fun Expect(err).NotTo(HaveOccurred()) Expect(len(ips)).NotTo(BeZero()) - nodes, err := utils.GetAgentNodes(cs) - By(fmt.Sprintf("Checking the node count in the local service backend pool to equal %d)", len(nodes))) + nodeNames, err := getDeploymentPodsNodeNames(cs, ns.Name, testDeploymentName) + 
Expect(err).NotTo(HaveOccurred()) + By(fmt.Sprintf("Checking the node count in the local service backend pool to equal %d", len(nodeNames))) clusterName := os.Getenv("CLUSTER_NAME") Expect(err).NotTo(HaveOccurred()) expectedBPName := fmt.Sprintf("%s-%s", svc.Namespace, svc.Name) - err = checkNodeCountInBackendPoolByServiceIPs(tc, clusterName, expectedBPName, ips, len(nodes)) + err = checkNodeCountInBackendPoolByServiceIPs(tc, clusterName, expectedBPName, ips, len(nodeNames)) Expect(err).NotTo(HaveOccurred()) By("Scaling the deployment to 3 replicas and then to 1") @@ -221,10 +222,12 @@ var _ = Describe("Ensure LoadBalancer", Label(utils.TestSuiteLabelMultiSLB), fun deployment.Spec.Replicas = pointer.Int32(5) _, err = cs.AppsV1().Deployments(ns.Name).Update(context.Background(), deployment, metav1.UpdateOptions{}) Expect(err).NotTo(HaveOccurred()) + nodeNames, err = getDeploymentPodsNodeNames(cs, ns.Name, testDeploymentName) + Expect(err).NotTo(HaveOccurred()) err = wait.PollUntilContextTimeout(context.Background(), 5*time.Second, 5*time.Minute, false, func(ctx context.Context) (bool, error) { - if err := checkNodeCountInBackendPoolByServiceIPs(tc, clusterName, expectedBPName, ips, len(nodes)); err != nil { + if err := checkNodeCountInBackendPoolByServiceIPs(tc, clusterName, expectedBPName, ips, len(nodeNames)); err != nil { if strings.Contains(err.Error(), "expected node count") { - utils.Logf("Waiting for the node count in the backend pool to equal %d", len(nodes)) + utils.Logf("Waiting for the node count in the backend pool to equal %d", len(nodeNames)) return false, nil } return false, err @@ -235,6 +238,37 @@ var _ = Describe("Ensure LoadBalancer", Label(utils.TestSuiteLabelMultiSLB), fun }) }) +// getDeploymentPodsNodeNames returns the node names of the pods in the deployment created in BeforeEach. +func getDeploymentPodsNodeNames(kubeClient clientset.Interface, namespace, deploymentName string) (map[string]bool, error) { + var ( + podList *v1.PodList + res = make(map[string]bool) + err error + ) + err = wait.PollUntilContextTimeout(context.Background(), 5*time.Second, 5*time.Minute, false, func(ctx context.Context) (bool, error) { + podList, err = kubeClient.CoreV1().Pods(namespace).List(context.Background(), metav1.ListOptions{}) + if err != nil { + return false, err + } + for _, pod := range podList.Items { + if strings.HasPrefix(pod.Name, deploymentName) { + if pod.Spec.NodeName == "" { + utils.Logf("Waiting for pod %s to be running", pod.Name) + return false, nil + } + utils.Logf("Pod %s is running on node %s", pod.Name, pod.Spec.NodeName) + res[pod.Spec.NodeName] = true + } + } + return true, nil + }) + if err != nil { + return nil, err + } + + return res, nil +} + func checkNodeCountInBackendPoolByServiceIPs(tc *utils.AzureTestClient, expectedLBName, bpName string, svcIPs []string, expectedCount int) error { for _, svcIP := range svcIPs { lb := getAzureLoadBalancerFromPIP(tc, svcIP, tc.GetResourceGroup(), tc.GetResourceGroup())