Skip to content

Commit

Permalink
Browse files Browse the repository at this point in the history
  • Loading branch information
sawsa307 committed Mar 22, 2023
1 parent af91d0f commit 9a4c233
Show file tree
Hide file tree
Showing 5 changed files with 432 additions and 57 deletions.
2 changes: 1 addition & 1 deletion pkg/neg/syncers/endpoints_calculator.go
Original file line number Diff line number Diff line change
Expand Up @@ -199,7 +199,7 @@ func (l *L7EndpointsCalculator) Mode() types.EndpointsCalculatorMode {

// CalculateEndpoints determines the endpoints in the NEGs based on the current service endpoints and the current NEGs.
func (l *L7EndpointsCalculator) CalculateEndpoints(eds []types.EndpointsData, _ map[string]types.NetworkEndpointSet) (map[string]types.NetworkEndpointSet, types.EndpointPodMap, int, error) {
return toZoneNetworkEndpointMap(eds, l.zoneGetter, l.servicePortName, l.networkEndpointType, l.lpConfig)
return toZoneNetworkEndpointMap(eds, l.podLister, l.zoneGetter, l.servicePortName, l.networkEndpointType, l.lpConfig)
}

func nodeMapToString(nodeMap map[string][]*v1.Node) string {
Expand Down
7 changes: 4 additions & 3 deletions pkg/neg/syncers/transaction.go
Original file line number Diff line number Diff line change
Expand Up @@ -193,6 +193,7 @@ func (s *transactionSyncer) syncInternal() error {
start := time.Now()
err := s.syncInternalImpl()
if err != nil {
s.logger.V(3).Info("Setting error state", "err", err, "errorState", s.getErrorStateReason(err))
s.setErrorState(s.getErrorStateReason(err))
}
s.updateStatus(err)
Expand Down Expand Up @@ -261,8 +262,7 @@ func (s *transactionSyncer) syncInternalImpl() error {
endpointsData := negtypes.EndpointsDataFromEndpointSlices(endpointSlices)
targetMap, endpointPodMap, dupCount, err = s.endpointsCalculator.CalculateEndpoints(endpointsData, currentMap)
if err != nil {
s.setErrorState(s.getErrorStateReason(err))
return fmt.Errorf("endpoints calculation error in mode %q, err: %w", s.endpointsCalculator.Mode(), err)
return err
}
err = s.endpointsCalculator.ValidateEndpoints(endpointsData, endpointPodMap, dupCount)
if err != nil {
Expand Down Expand Up @@ -493,8 +493,9 @@ func (s *transactionSyncer) operationInternal(operation transactionOp, zone stri
endpointBatchErr := s.ValidateEndpointBatch(err, operation)
if endpointBatchErr != nil {
s.syncLock.Lock()
defer s.syncLock.Unlock()
s.logger.V(3).Info("Setting error state", "errorState", s.getErrorStateReason(endpointBatchErr))
s.setErrorState(s.getErrorStateReason(endpointBatchErr))
s.syncLock.Unlock()
}
}

Expand Down
24 changes: 18 additions & 6 deletions pkg/neg/syncers/utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -218,7 +218,7 @@ func ensureNetworkEndpointGroup(svcNamespace, svcName, negName, zone, negService
}

// toZoneNetworkEndpointMap translates addresses in endpoints object into zone and endpoints map, and also return the count for duplicated endpoints
func toZoneNetworkEndpointMap(eds []negtypes.EndpointsData, zoneGetter negtypes.ZoneGetter, servicePortName string, networkEndpointType negtypes.NetworkEndpointType, lpConfig negtypes.PodLabelPropagationConfig) (map[string]negtypes.NetworkEndpointSet, negtypes.EndpointPodMap, int, error) {
func toZoneNetworkEndpointMap(eds []negtypes.EndpointsData, podLister cache.Indexer, zoneGetter negtypes.ZoneGetter, servicePortName string, networkEndpointType negtypes.NetworkEndpointType, lpConfig negtypes.PodLabelPropagationConfig) (map[string]negtypes.NetworkEndpointSet, negtypes.EndpointPodMap, int, error) {
zoneNetworkEndpointMap := map[string]negtypes.NetworkEndpointSet{}
networkEndpointPodMap := negtypes.EndpointPodMap{}
dupCount := 0
Expand Down Expand Up @@ -251,19 +251,31 @@ func toZoneNetworkEndpointMap(eds []negtypes.EndpointsData, zoneGetter negtypes.
}
if endpointAddress.NodeName == nil || len(*endpointAddress.NodeName) == 0 {
klog.V(2).Infof("Detected unexpected error when checking missing nodeName. Endpoint %q in Endpoints %s/%s does not have an associated node. Skipping", endpointAddress.Addresses, ed.Meta.Namespace, ed.Meta.Name)
return nil, nil, dupCount, negtypes.ErrEPMissingNodeName
return nil, nil, dupCount, negtypes.ErrEPInvalidNodeName
}
if endpointAddress.TargetRef == nil {
klog.V(2).Infof("Endpoint %q in Endpoints %s/%s does not have an associated pod. Skipping", endpointAddress.Addresses, ed.Meta.Namespace, ed.Meta.Name)
continue
return nil, nil, dupCount, negtypes.ErrEPInvalidPod
}
zone, err := zoneGetter.GetZoneForNode(*endpointAddress.NodeName)
if err != nil {
return nil, nil, dupCount, negtypes.ErrNodeNotFound
return nil, nil, dupCount, negtypes.ErrEPInvalidNodeName
}
if zone == "" {
klog.V(2).Info("Detected unexpected error when checking missing zone")
return nil, nil, dupCount, negtypes.ErrEPMissingZone
klog.V(2).Info("Detected unexpected error when checking zone")
return nil, nil, dupCount, negtypes.ErrEPInvalidZone
}

key := fmt.Sprintf("%s/%s", endpointAddress.TargetRef.Namespace, endpointAddress.TargetRef.Name)
obj, exists, err := podLister.GetByKey(key)
if err != nil || !exists {
klog.V(2).Infof("Endpoint %q in Endpoints %s/%s does not correspond to an existing pod. Skipping", endpointAddress.Addresses, ed.Meta.Namespace, ed.Meta.Name)
return nil, nil, dupCount, negtypes.ErrEPInvalidPod
}
_, ok := obj.(*apiv1.Pod)
if !ok {
klog.V(2).Infof("Endpoint %q in Endpoints %s/%s does not correspond to an existing pod. Skipping", endpointAddress.Addresses, ed.Meta.Namespace, ed.Meta.Name)
return nil, nil, dupCount, negtypes.ErrEPInvalidPod
}
if zoneNetworkEndpointMap[zone] == nil {
zoneNetworkEndpointMap[zone] = negtypes.NewNetworkEndpointSet()
Expand Down
Loading

0 comments on commit 9a4c233

Please sign in to comment.