Skip to content

Commit

Permalink
Merge pull request #2044 from sawsa307/refactor-syncInternalImpl
Browse files Browse the repository at this point in the history
Refactor syncInternalImpl and toZoneNetworkEndpointMap
  • Loading branch information
k8s-ci-robot authored Apr 6, 2023
2 parents ed70041 + 6450e61 commit 4c7c629
Show file tree
Hide file tree
Showing 9 changed files with 702 additions and 334 deletions.
2 changes: 1 addition & 1 deletion pkg/neg/metrics/neg_metrics_collector.go
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,7 @@ func (sm *SyncerMetrics) UpdateSyncer(key negtypes.NegSyncerKey, syncResult *neg
sm.syncerStatusMap = make(map[negtypes.NegSyncerKey]string)
sm.logger.V(3).Info("Syncer Metrics failed to initialize correctly, reinitializing syncerStatusMap: %v", sm.syncerStatusMap)
}
sm.syncerStatusMap[key] = syncResult.Result
sm.syncerStatusMap[key] = string(syncResult.Result)
}

// SetSyncerEPMetrics update the endpoint count based on the endpointStat
Expand Down
27 changes: 24 additions & 3 deletions pkg/neg/syncers/endpoints_calculator.go
Original file line number Diff line number Diff line change
Expand Up @@ -109,6 +109,12 @@ func (l *LocalL4ILBEndpointsCalculator) CalculateEndpoints(eds []types.Endpoints
return subsetMap, nil, 0, err
}

// CalculateEndpointsDegradedMode determines the endpoints in the NEGs when the syncer runs in degraded mode.
// For the L4 calculators this is intended to be the same computation as CalculateEndpoints,
// so the endpoints data is forwarded unchanged instead of being replaced with nil.
// NOTE(review): CalculateEndpoints on this type declares its endpoints-data parameter by name,
// so it presumably reads it; passing nil would then compute from no endpoints at all — confirm.
// The returned EndpointPodMap is always nil.
func (l *LocalL4ILBEndpointsCalculator) CalculateEndpointsDegradedMode(eds []types.EndpointsData, currentMap map[string]types.NetworkEndpointSet) (map[string]types.NetworkEndpointSet, types.EndpointPodMap, error) {
	subsetMap, _, _, err := l.CalculateEndpoints(eds, currentMap)
	return subsetMap, nil, err
}

func (l *LocalL4ILBEndpointsCalculator) ValidateEndpoints(endpointData []types.EndpointsData, endpointPodMap types.EndpointPodMap, dupCount int) error {
// this should be a no-op for now
return nil
Expand Down Expand Up @@ -166,6 +172,12 @@ func (l *ClusterL4ILBEndpointsCalculator) CalculateEndpoints(_ []types.Endpoints
return subsetMap, nil, 0, err
}

// CalculateEndpointsDegradedMode determines the endpoints in the NEGs when the syncer runs in degraded mode.
// It delegates to CalculateEndpoints, passing nil for the endpoints data and forwarding currentMap.
// Only the zone-to-endpoint-set map and the error are propagated; the returned EndpointPodMap is always nil.
func (l *ClusterL4ILBEndpointsCalculator) CalculateEndpointsDegradedMode(_ []types.EndpointsData, currentMap map[string]types.NetworkEndpointSet) (map[string]types.NetworkEndpointSet, types.EndpointPodMap, error) {
	zoneEndpoints, _, _, calcErr := l.CalculateEndpoints(nil, currentMap)
	return zoneEndpoints, nil, calcErr
}

func (l *ClusterL4ILBEndpointsCalculator) ValidateEndpoints(endpointData []types.EndpointsData, endpointPodMap types.EndpointPodMap, dupCount int) error {
// this should be a no-op for now
return nil
Expand All @@ -176,16 +188,18 @@ type L7EndpointsCalculator struct {
zoneGetter types.ZoneGetter
servicePortName string
podLister cache.Indexer
nodeLister cache.Indexer
networkEndpointType types.NetworkEndpointType
enableDualStackNEG bool
logger klog.Logger
}

func NewL7EndpointsCalculator(zoneGetter types.ZoneGetter, podLister cache.Indexer, svcPortName string, endpointType types.NetworkEndpointType, logger klog.Logger, enableDualStackNEG bool) *L7EndpointsCalculator {
func NewL7EndpointsCalculator(zoneGetter types.ZoneGetter, podLister, nodeLister cache.Indexer, svcPortName string, endpointType types.NetworkEndpointType, logger klog.Logger, enableDualStackNEG bool) *L7EndpointsCalculator {
return &L7EndpointsCalculator{
zoneGetter: zoneGetter,
servicePortName: svcPortName,
podLister: podLister,
nodeLister: nodeLister,
networkEndpointType: endpointType,
enableDualStackNEG: enableDualStackNEG,
logger: logger.WithName("L7EndpointsCalculator"),
Expand All @@ -199,7 +213,14 @@ func (l *L7EndpointsCalculator) Mode() types.EndpointsCalculatorMode {

// CalculateEndpoints determines the endpoints in the NEGs based on the current service endpoints and the current NEGs.
func (l *L7EndpointsCalculator) CalculateEndpoints(eds []types.EndpointsData, _ map[string]types.NetworkEndpointSet) (map[string]types.NetworkEndpointSet, types.EndpointPodMap, int, error) {
return toZoneNetworkEndpointMap(eds, l.zoneGetter, l.servicePortName, l.networkEndpointType)
result, err := toZoneNetworkEndpointMap(eds, l.zoneGetter, l.podLister, l.servicePortName, l.networkEndpointType)
return result.NetworkEndpointSet, result.EndpointPodMap, result.DupCount, err
}

// CalculateEndpointsDegradedMode determines the endpoints in the NEGs in degraded mode, based on the current service endpoints.
// The current NEG contents (second argument) are ignored. The computation is delegated to
// toZoneNetworkEndpointMapDegradedMode, which returns no error, so the error result is always nil.
func (l *L7EndpointsCalculator) CalculateEndpointsDegradedMode(eds []types.EndpointsData, _ map[string]types.NetworkEndpointSet) (map[string]types.NetworkEndpointSet, types.EndpointPodMap, error) {
	result := toZoneNetworkEndpointMapDegradedMode(eds, l.zoneGetter, l.podLister, l.nodeLister, l.servicePortName, l.networkEndpointType)
	return result.NetworkEndpointSet, result.EndpointPodMap, nil
}

func nodeMapToString(nodeMap map[string][]*v1.Node) string {
Expand Down Expand Up @@ -235,7 +256,7 @@ func (l *L7EndpointsCalculator) ValidateEndpoints(endpointData []types.Endpoints
}

if countFromEndpointData != countFromPodMap {
l.logger.Info("Detected error when comparing endpoint counts", "endpointData", endpointData, "endpointPodMap", endpointPodMap, "dupCount", dupCount)
l.logger.Info("Detected error when comparing endpoint counts", "countFromEndpointData", countFromEndpointData, "countFromPodMap", countFromPodMap, "endpointData", endpointData, "endpointPodMap", endpointPodMap, "dupCount", dupCount)
return types.ErrEPCountsDiffer
}
return nil
Expand Down
8 changes: 4 additions & 4 deletions pkg/neg/syncers/endpoints_calculator_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -232,10 +232,10 @@ func TestValidateEndpoints(t *testing.T) {
zoneGetter := negtypes.NewFakeZoneGetter()
testContext := negtypes.NewTestContext()
podLister := testContext.PodInformer.GetIndexer()
nodeLister := listers.NewNodeLister(testContext.NodeInformer.GetIndexer())
L7EndpointsCalculator := NewL7EndpointsCalculator(zoneGetter, podLister, testPortName, negtypes.VmIpPortEndpointType, klog.TODO(), testContext.EnableDualStackNEG)
L4LocalEndpointCalculator := NewLocalL4ILBEndpointsCalculator(nodeLister, zoneGetter, svcKey, klog.TODO())
L4ClusterEndpointCalculator := NewClusterL4ILBEndpointsCalculator(nodeLister, zoneGetter, svcKey, klog.TODO())
nodeLister := testContext.NodeInformer.GetIndexer()
L7EndpointsCalculator := NewL7EndpointsCalculator(zoneGetter, podLister, nodeLister, testPortName, negtypes.VmIpPortEndpointType, klog.TODO(), testContext.EnableDualStackNEG)
L4LocalEndpointCalculator := NewLocalL4ILBEndpointsCalculator(listers.NewNodeLister(nodeLister), zoneGetter, svcKey, klog.TODO())
L4ClusterEndpointCalculator := NewClusterL4ILBEndpointsCalculator(listers.NewNodeLister(nodeLister), zoneGetter, svcKey, klog.TODO())

testEndpointPodMap := map[negtypes.NetworkEndpoint]types.NamespacedName{
{
Expand Down
159 changes: 86 additions & 73 deletions pkg/neg/syncers/transaction.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@ package syncers

import (
"context"
"errors"
"fmt"
"net/http"
"strings"
Expand Down Expand Up @@ -187,6 +186,7 @@ func GetEndpointsCalculator(nodeLister, podLister cache.Indexer, zoneGetter negt
return NewL7EndpointsCalculator(
zoneGetter,
podLister,
nodeLister,
syncerKey.PortTuple.Name,
syncerKey.NegType,
logger,
Expand Down Expand Up @@ -220,17 +220,16 @@ func (s *transactionSyncer) syncInternal() error {
}

func (s *transactionSyncer) syncInternalImpl() error {
if s.syncer.IsStopped() || s.syncer.IsShuttingDown() {
s.logger.V(3).Info("Skip syncing NEG", "negSyncerKey", s.NegSyncerKey.String())
return nil
}
if s.needInit || s.isZoneChange() {
if err := s.ensureNetworkEndpointGroups(); err != nil {
return err
}
s.needInit = false
}

if s.syncer.IsStopped() || s.syncer.IsShuttingDown() {
s.logger.V(3).Info("Skip syncing NEG", "negSyncerKey", s.NegSyncerKey.String())
return nil
}
s.logger.V(2).Info("Sync NEG", "negSyncerKey", s.NegSyncerKey.String(), "endpointsCalculatorMode", s.endpointsCalculator.Mode())

currentMap, err := retrieveExistingZoneNetworkEndpointMap(s.NegSyncerKey.NegName, s.zoneGetter, s.cloud, s.NegSyncerKey.GetAPIVersion(), s.endpointsCalculator.Mode())
Expand All @@ -246,8 +245,6 @@ func (s *transactionSyncer) syncInternalImpl() error {

var targetMap map[string]negtypes.NetworkEndpointSet
var endpointPodMap negtypes.EndpointPodMap
var dupCount int

slices, err := s.endpointSliceLister.ByIndex(endpointslices.EndpointSlicesByServiceIndex, endpointslices.FormatEndpointSlicesServiceKey(s.Namespace, s.Name))
if err != nil {
return err
Expand All @@ -256,37 +253,31 @@ func (s *transactionSyncer) syncInternalImpl() error {
s.logger.Error(nil, "Endpoint slices for the service doesn't exist. Skipping NEG sync")
return nil
}
endpointSlices := make([]*discovery.EndpointSlice, len(slices))
negCR, err := getNegFromStore(s.svcNegLister, s.Namespace, s.NegSyncerKey.NegName)
for i, slice := range slices {
endpointslice := slice.(*discovery.EndpointSlice)
endpointSlices[i] = endpointslice
if err != nil {
s.logger.Error(err, "unable to retrieve neg from the store", "neg", klog.KRef(s.Namespace, s.NegName))
continue
}
lastSyncTimestamp := negCR.Status.LastSyncTime
epsCreationTimestamp := endpointslice.ObjectMeta.CreationTimestamp

epsStaleness := time.Since(lastSyncTimestamp.Time)
// if this endpoint slice is newly created/created after last sync
if lastSyncTimestamp.Before(&epsCreationTimestamp) {
epsStaleness = time.Since(epsCreationTimestamp.Time)
}
metrics.PublishNegEPSStalenessMetrics(epsStaleness)
s.logger.V(3).Info("Endpoint slice syncs", "Namespace", endpointslice.Namespace, "Name", endpointslice.Name, "staleness", epsStaleness)
endpointSlices := convertUntypedToEPS(slices)
s.computeEPSStaleness(endpointSlices)

}
endpointsData := negtypes.EndpointsDataFromEndpointSlices(endpointSlices)
targetMap, endpointPodMap, dupCount, err = s.endpointsCalculator.CalculateEndpoints(endpointsData, currentMap)
if err != nil {
targetMap, endpointPodMap, err = s.getEndpointsCalculation(endpointsData, currentMap)

if !s.enableDegradedMode && err != nil {
return err
}
err = s.endpointsCalculator.ValidateEndpoints(endpointsData, endpointPodMap, dupCount)
if err != nil {
// TODO(cheungdavid): return error from ValidateEndpoint after degraded mode is implemented
// for now we don't return error so it won't break the sync
s.setErrorState(s.getErrorStateReason(err))
} else if s.enableDegradedMode {
if !s.inErrorState() && err != nil {
return err // if we encounter an error, we will return and run the next sync in degraded mode
}
degradedTargetMap, degradedPodMap, degradedModeErr := s.endpointsCalculator.CalculateEndpointsDegradedMode(endpointsData, currentMap)
if degradedModeErr != nil {
return degradedModeErr
}
notInDegraded, onlyInDegraded := calculateNetworkEndpointDifference(targetMap, degradedTargetMap)
if s.inErrorState() {
targetMap = degradedTargetMap
endpointPodMap = degradedPodMap
if len(notInDegraded) == 0 && len(onlyInDegraded) == 0 {
s.resetErrorState()
}
}
// TODO(cheungdavid): in the else branch, publish metrics if we don't encounter error and we are not in error state
}
s.logStats(targetMap, "desired NEG endpoints")

Expand Down Expand Up @@ -317,6 +308,23 @@ func (s *transactionSyncer) syncInternalImpl() error {
return s.syncNetworkEndpoints(addEndpoints, removeEndpoints)
}

// getEndpointsCalculation computes the desired zone-to-endpoint map and the endpoint-to-pod map
// through the syncer's endpoints calculator.
// When degraded mode is enabled, the calculation result is additionally checked with
// ValidateEndpoints. Any calculation or validation error is returned with nil maps.
func (s *transactionSyncer) getEndpointsCalculation(
	endpointsData []negtypes.EndpointsData,
	currentMap map[string]negtypes.NetworkEndpointSet,
) (map[string]negtypes.NetworkEndpointSet, negtypes.EndpointPodMap, error) {
	desired, podMap, duplicates, calcErr := s.endpointsCalculator.CalculateEndpoints(endpointsData, currentMap)
	if calcErr != nil {
		return nil, nil, calcErr
	}
	// Validation only runs as part of degraded mode.
	if !s.enableDegradedMode {
		return desired, podMap, nil
	}
	if validateErr := s.endpointsCalculator.ValidateEndpoints(endpointsData, podMap, duplicates); validateErr != nil {
		return nil, nil, validateErr
	}
	return desired, podMap, nil
}

// syncLock must already be acquired before execution
func (s *transactionSyncer) inErrorState() bool {
return s.errorState != ""
Expand All @@ -327,6 +335,11 @@ func (s *transactionSyncer) setErrorState(errorState string) {
s.errorState = errorState
}

// resetErrorState clears the syncer's error state by setting errorState to the empty string.
// syncLock must already be acquired before execution
func (s *transactionSyncer) resetErrorState() {
s.errorState = ""
}

// ensureNetworkEndpointGroups ensures NEGs are created and configured correctly in the corresponding zones.
func (s *transactionSyncer) ensureNetworkEndpointGroups() error {
var err error
Expand Down Expand Up @@ -376,7 +389,7 @@ func (s *transactionSyncer) getErrorStateReason(err error) string {
return ""
}
if result, contains := negtypes.ErrorStateResult[err]; contains {
return result
return string(result)
}
return ""
}
Expand All @@ -403,8 +416,6 @@ func (s *transactionSyncer) ValidateEndpointBatch(err error, operation transacti

// syncNetworkEndpoints spins off go routines to execute NEG operations
func (s *transactionSyncer) syncNetworkEndpoints(addEndpoints, removeEndpoints map[string]negtypes.NetworkEndpointSet) error {
var wg sync.WaitGroup

syncFunc := func(endpointMap map[string]negtypes.NetworkEndpointSet, operation transactionOp) error {
for zone, endpointSet := range endpointMap {
if endpointSet.Len() == 0 {
Expand All @@ -428,10 +439,10 @@ func (s *transactionSyncer) syncNetworkEndpoints(addEndpoints, removeEndpoints m
}

if operation == attachOp {
s.attachNetworkEndpoints(zone, batch, &wg)
s.attachNetworkEndpoints(zone, batch)
}
if operation == detachOp {
s.detachNetworkEndpoints(zone, batch, &wg)
s.detachNetworkEndpoints(zone, batch)
}
}
return nil
Expand All @@ -444,52 +455,25 @@ func (s *transactionSyncer) syncNetworkEndpoints(addEndpoints, removeEndpoints m
if err := syncFunc(removeEndpoints, detachOp); err != nil {
return err
}
go s.collectSyncResult(&wg)
return nil
}

// collectSyncResult collects the result of the sync and emits the metrics for sync result.
// It waits on wg, acquires syncLock, maps the current errorState to a NegSyncResult
// and reports that result for this syncer's key through syncCollector.
func (s *transactionSyncer) collectSyncResult(wg *sync.WaitGroup) {
wg.Wait()
s.syncLock.Lock()
defer s.syncLock.Unlock()

var syncResult *negtypes.NegSyncResult
switch s.errorState {
case "":
// An empty errorState is reported as a successful sync.
syncResult = negtypes.NewNegSyncResult(nil, negtypes.ResultSuccess)
case negtypes.ResultInvalidAPIResponse:
syncResult = negtypes.NewNegSyncResult(negtypes.ErrInvalidAPIResponse, negtypes.ResultInvalidAPIResponse)
case negtypes.ResultInvalidEPAttach:
syncResult = negtypes.NewNegSyncResult(negtypes.ErrInvalidEPAttach, negtypes.ResultInvalidEPAttach)
case negtypes.ResultInvalidEPDetach:
syncResult = negtypes.NewNegSyncResult(negtypes.ErrInvalidEPDetach, negtypes.ResultInvalidEPDetach)
default:
// Any other errorState value has no dedicated result and is reported as ResultOtherError.
syncResult = negtypes.NewNegSyncResult(errors.New("Unknown error state value"), negtypes.ResultOtherError)
}

s.syncCollector.UpdateSyncer(s.NegSyncerKey, syncResult)
}

// attachNetworkEndpoints creates go routine to run operations for attaching network endpoints
func (s *transactionSyncer) attachNetworkEndpoints(zone string, networkEndpointMap map[negtypes.NetworkEndpoint]*composite.NetworkEndpoint, wg *sync.WaitGroup) {
func (s *transactionSyncer) attachNetworkEndpoints(zone string, networkEndpointMap map[negtypes.NetworkEndpoint]*composite.NetworkEndpoint) {
s.logger.V(2).Info("Attaching endpoints to NEG.", "countOfEndpointsBeingAttached", len(networkEndpointMap), "negSyncerKey", s.NegSyncerKey.String(), "zone", zone)
wg.Add(1)
go s.operationInternal(attachOp, zone, networkEndpointMap, wg)
go s.operationInternal(attachOp, zone, networkEndpointMap)
}

// detachNetworkEndpoints creates go routine to run operations for detaching network endpoints
func (s *transactionSyncer) detachNetworkEndpoints(zone string, networkEndpointMap map[negtypes.NetworkEndpoint]*composite.NetworkEndpoint, wg *sync.WaitGroup) {
func (s *transactionSyncer) detachNetworkEndpoints(zone string, networkEndpointMap map[negtypes.NetworkEndpoint]*composite.NetworkEndpoint) {
s.logger.V(2).Info("Detaching endpoints from NEG.", "countOfEndpointsBeingDetached", len(networkEndpointMap), "negSyncerKey", s.NegSyncerKey.String(), "zone", zone)
wg.Add(1)
go s.operationInternal(detachOp, zone, networkEndpointMap, wg)
go s.operationInternal(detachOp, zone, networkEndpointMap)
}

// operationInternal executes NEG API call and commits the transactions
// It will record events when operations are completed
// If error occurs or any transaction entry requires reconciliation, it will trigger resync
func (s *transactionSyncer) operationInternal(operation transactionOp, zone string, networkEndpointMap map[negtypes.NetworkEndpoint]*composite.NetworkEndpoint, wg *sync.WaitGroup) {
defer wg.Done()
func (s *transactionSyncer) operationInternal(operation transactionOp, zone string, networkEndpointMap map[negtypes.NetworkEndpoint]*composite.NetworkEndpoint) {
var err error
start := time.Now()
networkEndpoints := []*composite.NetworkEndpoint{}
Expand Down Expand Up @@ -734,6 +718,35 @@ func (s *transactionSyncer) updateStatus(syncErr error) {
}
}

// convertUntypedToEPS converts a list of untyped objects into a slice of
// *discovery.EndpointSlice of the same length and order.
// Every element is type-asserted without a check, so an element of any other
// type causes a panic.
func convertUntypedToEPS(endpointSliceUntyped []interface{}) []*discovery.EndpointSlice {
	typed := make([]*discovery.EndpointSlice, 0, len(endpointSliceUntyped))
	for _, obj := range endpointSliceUntyped {
		typed = append(typed, obj.(*discovery.EndpointSlice))
	}
	return typed
}

// computeEPSStaleness publishes a staleness metric for each of the given endpoint slices.
// Staleness is the time elapsed since the NEG CR's last sync time, or since the slice's
// creation time when the slice was created after the last sync.
// If the NEG CR cannot be retrieved from the store, the error is logged and no metrics are published.
func (s *transactionSyncer) computeEPSStaleness(endpointSlices []*discovery.EndpointSlice) {
	negCR, err := getNegFromStore(s.svcNegLister, s.Namespace, s.NegSyncerKey.NegName)
	if err != nil {
		s.logger.Error(err, "unable to retrieve neg from the store", "neg", klog.KRef(s.Namespace, s.NegName))
		return
	}

	lastSync := negCR.Status.LastSyncTime
	for _, eps := range endpointSlices {
		created := eps.ObjectMeta.CreationTimestamp

		// Measure from the last sync, unless this endpoint slice is newly
		// created/created after the last sync; then measure from its creation.
		since := lastSync.Time
		if lastSync.Before(&created) {
			since = created.Time
		}
		staleness := time.Since(since)

		metrics.PublishNegEPSStalenessMetrics(staleness)
		s.logger.V(3).Info("Endpoint slice syncs", "Namespace", eps.Namespace, "Name", eps.Name, "staleness", staleness)
	}
}

// getNegFromStore returns the neg associated with the provided namespace and neg name if it exists otherwise throws an error
func getNegFromStore(svcNegLister cache.Indexer, namespace, negName string) (*negv1beta1.ServiceNetworkEndpointGroup, error) {
n, exists, err := svcNegLister.GetByKey(fmt.Sprintf("%s/%s", namespace, negName))
Expand Down
Loading

0 comments on commit 4c7c629

Please sign in to comment.