Skip to content

Commit

Permalink
extract common utils from batchSyncer
Browse files Browse the repository at this point in the history
  • Loading branch information
freehan committed Oct 3, 2018
1 parent 7885e57 commit e35f1d5
Show file tree
Hide file tree
Showing 4 changed files with 322 additions and 252 deletions.
130 changes: 12 additions & 118 deletions pkg/neg/syncers/batch.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@ import (
"fmt"
"math"
"strconv"
"strings"
"sync"
"time"

Expand All @@ -35,27 +34,8 @@ import (
"k8s.io/client-go/tools/record"
"k8s.io/ingress-gce/pkg/neg/metrics"
negtypes "k8s.io/ingress-gce/pkg/neg/types"
"k8s.io/ingress-gce/pkg/utils"
"k8s.io/kubernetes/pkg/cloudprovider/providers/gce"
)

const (
MAX_NETWORK_ENDPOINTS_PER_BATCH = 500
// For each NEG, only retries 15 times to process it.
// This is a convention in kube-controller-manager.
maxRetries = 15
minRetryDelay = 5 * time.Second
maxRetryDelay = 600 * time.Second
)

// NegSyncerKey includes information to uniquely identify a NEG
type NegSyncerKey struct {
Namespace string
Name string
Port int32
TargetPort string
}

// batchSyncer handles synchorizing NEGs for one service port. It handles sync, resync and retry on error.
// It syncs NEG in batch and waits for all operation to complete before continue to the next batch.
type batchSyncer struct {
Expand Down Expand Up @@ -107,13 +87,13 @@ func (s *batchSyncer) init() {
// Start starts the syncer go routine if it has not been started.
func (s *batchSyncer) Start() error {
if !s.IsStopped() {
return fmt.Errorf("NEG syncer for %s is already running.", s.formattedName())
return fmt.Errorf("NEG syncer for %s is already running.", s.NegSyncerKey.String())
}
if s.IsShuttingDown() {
return fmt.Errorf("NEG syncer for %s is shutting down. ", s.formattedName())
return fmt.Errorf("NEG syncer for %s is shutting down. ", s.NegSyncerKey.String())
}

glog.V(2).Infof("Starting NEG syncer for service port %s", s.formattedName())
glog.V(2).Infof("Starting NEG syncer for service port %s", s.NegSyncerKey.String())
s.init()
go func() {
for {
Expand Down Expand Up @@ -142,7 +122,7 @@ func (s *batchSyncer) Start() error {
s.stateLock.Lock()
s.shuttingDown = false
s.stateLock.Unlock()
glog.V(2).Infof("Stopping NEG syncer for %s", s.formattedName())
glog.V(2).Infof("Stopping NEG syncer for %s", s.NegSyncerKey.String())
return
}
case <-retryCh:
Expand All @@ -158,7 +138,7 @@ func (s *batchSyncer) Stop() {
s.stateLock.Lock()
defer s.stateLock.Unlock()
if !s.stopped {
glog.V(2).Infof("Stopping NEG syncer for service port %s", s.formattedName())
glog.V(2).Infof("Stopping NEG syncer for service port %s", s.NegSyncerKey.String())
s.stopped = true
s.shuttingDown = true
close(s.syncCh)
Expand All @@ -168,7 +148,7 @@ func (s *batchSyncer) Stop() {
// Sync informs syncer to run sync loop as soon as possible.
func (s *batchSyncer) Sync() bool {
if s.IsStopped() {
glog.Warningf("NEG syncer for %s is already stopped.", s.formattedName())
glog.Warningf("NEG syncer for %s is already stopped.", s.NegSyncerKey.String())
return false
}
select {
Expand All @@ -193,10 +173,10 @@ func (s *batchSyncer) IsShuttingDown() bool {

func (s *batchSyncer) sync() (err error) {
if s.IsStopped() || s.IsShuttingDown() {
glog.V(4).Infof("Skip syncing NEG %q for %s.", s.negName, s.formattedName())
glog.V(4).Infof("Skip syncing NEG %q for %s.", s.negName, s.NegSyncerKey.String())
return nil
}
glog.V(2).Infof("Sync NEG %q for %s.", s.negName, s.formattedName())
glog.V(2).Infof("Sync NEG %q for %s.", s.negName, s.NegSyncerKey.String())
start := time.Now()
defer metrics.ObserveNegSync(s.negName, metrics.AttachSync, err, start)
ep, exists, err := s.endpointLister.Get(
Expand Down Expand Up @@ -250,47 +230,8 @@ func (s *batchSyncer) ensureNetworkEndpointGroups() error {

var errList []error
for _, zone := range zones {
// Assume error is caused by not existing
neg, err := s.cloud.GetNetworkEndpointGroup(s.negName, zone)
if err != nil {
// Most likely to be caused by non-existed NEG
glog.V(4).Infof("Error while retriving %q in zone %q: %v", s.negName, zone, err)
}

needToCreate := false
if neg == nil {
needToCreate = true
} else if !utils.EqualResourceIDs(neg.LoadBalancer.Network, s.cloud.NetworkURL()) ||
!utils.EqualResourceIDs(neg.LoadBalancer.Subnetwork, s.cloud.SubnetworkURL()) {
needToCreate = true
glog.V(2).Infof("NEG %q in %q does not match network and subnetwork of the cluster. Deleting NEG.", s.negName, zone)
err = s.cloud.DeleteNetworkEndpointGroup(s.negName, zone)
if err != nil {
errList = append(errList, err)
} else {
if svc := getService(s.serviceLister, s.Namespace, s.Name); svc != nil {
s.recorder.Eventf(svc, apiv1.EventTypeNormal, "Delete", "Deleted NEG %q for %s in %q.", s.negName, s.formattedName(), zone)
}
}
}

if needToCreate {
glog.V(2).Infof("Creating NEG %q for %s in %q.", s.negName, s.formattedName(), zone)
err = s.cloud.CreateNetworkEndpointGroup(&compute.NetworkEndpointGroup{
Name: s.negName,
NetworkEndpointType: gce.NEGIPPortNetworkEndpointType,
LoadBalancer: &compute.NetworkEndpointGroupLbNetworkEndpointGroup{
Network: s.cloud.NetworkURL(),
Subnetwork: s.cloud.SubnetworkURL(),
},
}, zone)
if err != nil {
errList = append(errList, err)
} else {
if svc := getService(s.serviceLister, s.Namespace, s.Name); svc != nil {
s.recorder.Eventf(svc, apiv1.EventTypeNormal, "Create", "Created NEG %q for %s in %q.", s.negName, s.formattedName(), zone)
}
}
if err := ensureNetworkEndpointGroup(s.Namespace, s.Name, s.negName, zone, s.NegSyncerKey.String(), s.cloud, s.serviceLister, s.recorder); err != nil {
errList = append(errList, err)
}
}
return utilerrors.NewAggregate(errList)
Expand Down Expand Up @@ -445,13 +386,13 @@ func (s *batchSyncer) toNetworkEndpointBatch(endpoints sets.String) ([]*compute.

func (s *batchSyncer) attachNetworkEndpoints(wg *sync.WaitGroup, zone string, networkEndpoints []*compute.NetworkEndpoint, errList *ErrorList) {
wg.Add(1)
glog.V(2).Infof("Attaching %d endpoint(s) for %s in NEG %s at %s.", len(networkEndpoints), s.formattedName(), s.negName, zone)
glog.V(2).Infof("Attaching %d endpoint(s) for %s in NEG %s at %s.", len(networkEndpoints), s.NegSyncerKey.String(), s.negName, zone)
go s.operationInternal(wg, zone, networkEndpoints, errList, s.cloud.AttachNetworkEndpoints, "Attach")
}

func (s *batchSyncer) detachNetworkEndpoints(wg *sync.WaitGroup, zone string, networkEndpoints []*compute.NetworkEndpoint, errList *ErrorList) {
wg.Add(1)
glog.V(2).Infof("Detaching %d endpoint(s) for %s in NEG %s at %s.", len(networkEndpoints), s.formattedName(), s.negName, zone)
glog.V(2).Infof("Detaching %d endpoint(s) for %s in NEG %s at %s.", len(networkEndpoints), s.NegSyncerKey.String(), s.negName, zone)
go s.operationInternal(wg, zone, networkEndpoints, errList, s.cloud.DetachNetworkEndpoints, "Detach")
}

Expand Down Expand Up @@ -485,50 +426,3 @@ func (s *batchSyncer) resetRetryDelay() {
s.retryCount = 0
s.lastRetryDelay = time.Duration(0)
}

func (s *batchSyncer) formattedName() string {
return fmt.Sprintf("%s/%s-%v/%s", s.Namespace, s.Name, s.Port, s.TargetPort)
}

// encodeEndpoint encodes ip and instance into a single string
func encodeEndpoint(ip, instance, port string) string {
return fmt.Sprintf("%s||%s||%s", ip, instance, port)
}

// decodeEndpoint decodes ip and instance from an encoded string
func decodeEndpoint(str string) (string, string, string) {
strs := strings.Split(str, "||")
return strs[0], strs[1], strs[2]
}

// calculateDifference determines what endpoints needs to be added and removed in order to move current state to target state.
func calculateDifference(targetMap, currentMap map[string]sets.String) (map[string]sets.String, map[string]sets.String) {
addSet := map[string]sets.String{}
removeSet := map[string]sets.String{}
for zone, endpointSet := range targetMap {
diff := endpointSet.Difference(currentMap[zone])
if len(diff) > 0 {
addSet[zone] = diff
}
}

for zone, endpointSet := range currentMap {
diff := endpointSet.Difference(targetMap[zone])
if len(diff) > 0 {
removeSet[zone] = diff
}
}
return addSet, removeSet
}

// getService retrieves service object from serviceLister based on the input Namespace and Name
func getService(serviceLister cache.Indexer, namespace, name string) *apiv1.Service {
service, exists, err := serviceLister.GetByKey(utils.ServiceKeyFunc(namespace, name))
if exists && err == nil {
return service.(*apiv1.Service)
}
if err != nil {
glog.Errorf("Failed to retrieve service %s/%s from store: %v", namespace, name, err)
}
return nil
}
134 changes: 0 additions & 134 deletions pkg/neg/syncers/batch_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -162,140 +162,6 @@ func TestToZoneNetworkEndpointMap(t *testing.T) {
}
}

func TestEncodeDecodeEndpoint(t *testing.T) {
ip := "10.0.0.10"
instance := "somehost"
port := "8080"

retIp, retInstance, retPort := decodeEndpoint(encodeEndpoint(ip, instance, port))

if ip != retIp || instance != retInstance || retPort != port {
t.Fatalf("Encode and decode endpoint failed. Expect %q, %q, %q but got %q, %q, %q.", ip, instance, port, retIp, retInstance, retPort)
}
}

func TestCalculateDifference(t *testing.T) {
testCases := []struct {
targetSet map[string]sets.String
currentSet map[string]sets.String
addSet map[string]sets.String
removeSet map[string]sets.String
}{
// unchanged
{
targetSet: map[string]sets.String{
negtypes.TestZone1: sets.NewString("a", "b", "c"),
},
currentSet: map[string]sets.String{
negtypes.TestZone1: sets.NewString("a", "b", "c"),
},
addSet: map[string]sets.String{},
removeSet: map[string]sets.String{},
},
// unchanged
{
targetSet: map[string]sets.String{},
currentSet: map[string]sets.String{},
addSet: map[string]sets.String{},
removeSet: map[string]sets.String{},
},
// add in one zone
{
targetSet: map[string]sets.String{
negtypes.TestZone1: sets.NewString("a", "b", "c"),
},
currentSet: map[string]sets.String{},
addSet: map[string]sets.String{
negtypes.TestZone1: sets.NewString("a", "b", "c"),
},
removeSet: map[string]sets.String{},
},
// add in 2 zones
{
targetSet: map[string]sets.String{
negtypes.TestZone1: sets.NewString("a", "b", "c"),
negtypes.TestZone2: sets.NewString("e", "f", "g"),
},
currentSet: map[string]sets.String{},
addSet: map[string]sets.String{
negtypes.TestZone1: sets.NewString("a", "b", "c"),
negtypes.TestZone2: sets.NewString("e", "f", "g"),
},
removeSet: map[string]sets.String{},
},
// remove in one zone
{
targetSet: map[string]sets.String{},
currentSet: map[string]sets.String{
negtypes.TestZone1: sets.NewString("a", "b", "c"),
},
addSet: map[string]sets.String{},
removeSet: map[string]sets.String{
negtypes.TestZone1: sets.NewString("a", "b", "c"),
},
},
// remove in 2 zones
{
targetSet: map[string]sets.String{},
currentSet: map[string]sets.String{
negtypes.TestZone1: sets.NewString("a", "b", "c"),
negtypes.TestZone2: sets.NewString("e", "f", "g"),
},
addSet: map[string]sets.String{},
removeSet: map[string]sets.String{
negtypes.TestZone1: sets.NewString("a", "b", "c"),
negtypes.TestZone2: sets.NewString("e", "f", "g"),
},
},
// add and delete in one zone
{
targetSet: map[string]sets.String{
negtypes.TestZone1: sets.NewString("a", "b", "c"),
},
currentSet: map[string]sets.String{
negtypes.TestZone1: sets.NewString("b", "c", "d"),
},
addSet: map[string]sets.String{
negtypes.TestZone1: sets.NewString("a"),
},
removeSet: map[string]sets.String{
negtypes.TestZone1: sets.NewString("d"),
},
},
// add and delete in 2 zones
{
targetSet: map[string]sets.String{
negtypes.TestZone1: sets.NewString("a", "b", "c"),
negtypes.TestZone2: sets.NewString("a", "b", "c"),
},
currentSet: map[string]sets.String{
negtypes.TestZone1: sets.NewString("b", "c", "d"),
negtypes.TestZone2: sets.NewString("b", "c", "d"),
},
addSet: map[string]sets.String{
negtypes.TestZone1: sets.NewString("a"),
negtypes.TestZone2: sets.NewString("a"),
},
removeSet: map[string]sets.String{
negtypes.TestZone1: sets.NewString("d"),
negtypes.TestZone2: sets.NewString("d"),
},
},
}

for _, tc := range testCases {
addSet, removeSet := calculateDifference(tc.targetSet, tc.currentSet)

if !reflect.DeepEqual(addSet, tc.addSet) {
t.Errorf("Failed to calculate difference for add, expecting %v, but got %v", tc.addSet, addSet)
}

if !reflect.DeepEqual(removeSet, tc.removeSet) {
t.Errorf("Failed to calculate difference for remove, expecting %v, but got %v", tc.removeSet, removeSet)
}
}
}

func TestSyncNetworkEndpoints(t *testing.T) {
syncer := NewTestSyncer()
if err := syncer.ensureNetworkEndpointGroups(); err != nil {
Expand Down
Loading

0 comments on commit e35f1d5

Please sign in to comment.