diff --git a/pkg/neg/controller.go b/pkg/neg/controller.go index 5010fe28e5..b04d5ddb85 100644 --- a/pkg/neg/controller.go +++ b/pkg/neg/controller.go @@ -292,7 +292,7 @@ func NewController( }) } - nodeEventHandler := cache.ResourceEventHandlerFuncs{ + nodeInformer.AddEventHandler(cache.ResourceEventHandlerFuncs{ AddFunc: func(obj interface{}) { node := obj.(*apiv1.Node) negController.enqueueNode(node) @@ -301,20 +301,20 @@ func NewController( node := obj.(*apiv1.Node) negController.enqueueNode(node) }, - } - - if negController.runL4 { - nodeEventHandler.UpdateFunc = func(old, cur interface{}) { + UpdateFunc: func(old, cur interface{}) { oldNode := old.(*apiv1.Node) currentNode := cur.(*apiv1.Node) - candidateNodeCheck := utils.CandidateNodesPredicateIncludeUnreadyExcludeUpgradingNodes - if candidateNodeCheck(oldNode) != candidateNodeCheck(currentNode) { - logger.Info("Node has changed, enqueueing", "node", klog.KObj(currentNode)) + + vmIpCandidateNodeCheck := negtypes.NodePredicateForNetworkEndpointType(negtypes.VmIpEndpointType) + vmIpPortCandidateNodeCheck := negtypes.NodePredicateForNetworkEndpointType(negtypes.VmIpPortEndpointType) + + if vmIpCandidateNodeCheck(oldNode) != vmIpCandidateNodeCheck(currentNode) || + vmIpPortCandidateNodeCheck(oldNode) != vmIpPortCandidateNodeCheck(currentNode) { + logger.Info("Node has changed, enqueueing", "node", currentNode.Name) negController.enqueueNode(currentNode) } - } - } - nodeInformer.AddEventHandler(nodeEventHandler) + }, + }) if enableAsm { negController.enableASM = enableAsm diff --git a/pkg/neg/manager.go b/pkg/neg/manager.go index 93aa49157e..16b76031c9 100644 --- a/pkg/neg/manager.go +++ b/pkg/neg/manager.go @@ -94,11 +94,12 @@ type syncerManager struct { enableNonGcpMode bool enableEndpointSlices bool - // zoneMap keeps track of the last set of zones the neg controller - // has seen. zoneMap is protected by the mu mutex. - zoneMap map[string]struct{} - logger klog.Logger + + // zone maps keep track of the last set of zones the neg controller has seen + // for their respective NEG types. zone maps are protected by the mu mutex. + vmIpZoneMap map[string]struct{} + vmIpPortZoneMap map[string]struct{} } func newSyncerManager(namer negtypes.NetworkEndpointGroupNamer, @@ -117,14 +118,10 @@ func newSyncerManager(namer negtypes.NetworkEndpointGroupNamer, enableEndpointSlices bool, logger klog.Logger) *syncerManager { - zones, err := zoneGetter.ListZones(utils.AllNodesPredicate) - if err != nil { - logger.V(3).Info("Unable to initialize zone map in neg manager", "err", err) - } - zoneMap := make(map[string]struct{}) - for _, zone := range zones { - zoneMap[zone] = struct{}{} - } + var vmIpZoneMap, vmIpPortZoneMap map[string]struct{} + updateZoneMap(&vmIpZoneMap, negtypes.NodePredicateForNetworkEndpointType(negtypes.VmIpEndpointType), zoneGetter, logger) + updateZoneMap(&vmIpPortZoneMap, negtypes.NodePredicateForNetworkEndpointType(negtypes.VmIpPortEndpointType), zoneGetter, logger) + return &syncerManager{ namer: namer, recorder: recorder, @@ -142,8 +139,9 @@ func newSyncerManager(namer negtypes.NetworkEndpointGroupNamer, kubeSystemUID: kubeSystemUID, enableNonGcpMode: enableNonGcpMode, enableEndpointSlices: enableEndpointSlices, - zoneMap: zoneMap, logger: logger, + vmIpZoneMap: vmIpZoneMap, + vmIpPortZoneMap: vmIpPortZoneMap, } } @@ -289,21 +287,40 @@ func (manager *syncerManager) SyncNodes() { defer manager.mu.Unlock() // When a zone change occurs (new zone is added or deleted), a sync should be triggered - isZoneChange := manager.updateZoneMap() + isVmIpZoneChange := updateZoneMap(&manager.vmIpZoneMap, negtypes.NodePredicateForNetworkEndpointType(negtypes.VmIpEndpointType), manager.zoneGetter, manager.logger) + isVmIpPortZoneChange := updateZoneMap(&manager.vmIpPortZoneMap, negtypes.NodePredicateForNetworkEndpointType(negtypes.VmIpPortEndpointType), manager.zoneGetter, manager.logger) + for key, syncer := range manager.syncerMap { - needSync := isZoneChange || key.NegType == negtypes.VmIpEndpointType - if needSync && !syncer.IsStopped() { - syncer.Sync() + if syncer.IsStopped() { + continue + } + + switch key.NegType { + + case negtypes.VmIpEndpointType: + if isVmIpZoneChange { + syncer.Sync() + } + + case negtypes.VmIpPortEndpointType, negtypes.NonGCPPrivateEndpointType: + if isVmIpPortZoneChange { + syncer.Sync() + } + + default: + manager.logger.Error(nil, "Not triggering sync for syncer of unknown type", "syncerType", key.NegType) } } } -// updateZoneMap updates the manager's zone map with the current zones and returns true if the -// zones have changed. The caller must obtain mu mutex before calling this function -func (manager *syncerManager) updateZoneMap() bool { - zones, err := manager.zoneGetter.ListZones(utils.AllNodesPredicate) +// updateZoneMap updates the existingZoneMap with the latest zones and returns +// true if the zones have changed. The caller must obtain mu mutex of the +// manager before calling this function since it modifies the passed +// existingZoneMap. +func updateZoneMap(existingZoneMap *map[string]struct{}, candidateNodePredicate utils.NodeConditionPredicate, zoneGetter negtypes.ZoneGetter, logger klog.Logger) bool { + zones, err := zoneGetter.ListZones(candidateNodePredicate) if err != nil { - manager.logger.Error(err, "Unable to list zones") + logger.Error(err, "Unable to list zones") return false } @@ -312,8 +329,9 @@ func (manager *syncerManager) updateZoneMap() bool { newZoneMap[zone] = struct{}{} } - zoneChange := !reflect.DeepEqual(manager.zoneMap, newZoneMap) - manager.zoneMap = newZoneMap + zoneChange := !reflect.DeepEqual(*existingZoneMap, newZoneMap) + *existingZoneMap = newZoneMap + return zoneChange } diff --git a/pkg/neg/manager_test.go b/pkg/neg/manager_test.go index 54054f365e..ba097f349e 100644 --- a/pkg/neg/manager_test.go +++ b/pkg/neg/manager_test.go @@ -1459,7 +1459,7 @@ func TestSyncNodesConditions(t *testing.T) { }, { desc: "vm ip neg, zones are the same", - expectSync: true, + expectSync: false, negType: negtypes.VmIpEndpointType, }, { diff --git a/pkg/neg/types/types.go b/pkg/neg/types/types.go index 984bc77bca..91f0a18ccf 100644 --- a/pkg/neg/types/types.go +++ b/pkg/neg/types/types.go @@ -435,6 +435,14 @@ func EndpointsDataFromEndpointSlices(slices []*discovery.EndpointSlice) []Endpoi func NodePredicateForEndpointCalculatorMode(mode EndpointsCalculatorMode) utils.NodeConditionPredicate { // VM_IP NEGs can include unready and upgrading nodes. if mode == L4ClusterMode || mode == L4LocalMode { + return NodePredicateForNetworkEndpointType(VmIpEndpointType) + } + return NodePredicateForNetworkEndpointType(VmIpPortEndpointType) +} + +// NodePredicateForNetworkEndpointType returns the predicate function to select candidate nodes, given the NEG type. +func NodePredicateForNetworkEndpointType(negType NetworkEndpointType) utils.NodeConditionPredicate { + if negType == VmIpEndpointType { return utils.CandidateNodesPredicateIncludeUnreadyExcludeUpgradingNodes } return utils.CandidateNodesPredicate