Skip to content

Commit

Permalink
Merge pull request #492 from freehan/unblocking-syncer
Browse files Browse the repository at this point in the history
Restructure syncer package
  • Loading branch information
k8s-ci-robot authored Oct 14, 2018
2 parents 24a5ada + e35f1d5 commit 2d2c91a
Show file tree
Hide file tree
Showing 6 changed files with 331 additions and 261 deletions.
2 changes: 1 addition & 1 deletion pkg/neg/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ import (
"k8s.io/apimachinery/pkg/util/sets"
"k8s.io/client-go/tools/cache"
"k8s.io/client-go/tools/record"
negsyncer "k8s.io/ingress-gce/pkg/neg/syncer"
negsyncer "k8s.io/ingress-gce/pkg/neg/syncers"
negtypes "k8s.io/ingress-gce/pkg/neg/types"
)

Expand Down
12 changes: 6 additions & 6 deletions pkg/neg/manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ import (
"k8s.io/client-go/tools/record"
backendconfigclient "k8s.io/ingress-gce/pkg/backendconfig/client/clientset/versioned/fake"
"k8s.io/ingress-gce/pkg/context"
"k8s.io/ingress-gce/pkg/neg/syncer"
"k8s.io/ingress-gce/pkg/neg/syncers"
"k8s.io/ingress-gce/pkg/neg/types"
negtypes "k8s.io/ingress-gce/pkg/neg/types"
"k8s.io/ingress-gce/pkg/utils"
Expand Down Expand Up @@ -67,14 +67,14 @@ func TestEnsureAndStopSyncer(t *testing.T) {
name string
ports types.PortNameMap
stop bool
expect []syncer.NegSyncerKey // keys of running syncers
expect []syncers.NegSyncerKey // keys of running syncers
}{
{
"ns1",
"n1",
types.PortNameMap{1000: "80", 2000: "443"},
false,
[]syncer.NegSyncerKey{
[]syncers.NegSyncerKey{
getSyncerKey("ns1", "n1", 1000, "80"),
getSyncerKey("ns1", "n1", 2000, "443"),
},
Expand All @@ -84,7 +84,7 @@ func TestEnsureAndStopSyncer(t *testing.T) {
"n1",
types.PortNameMap{3000: "80", 4000: "namedport"},
false,
[]syncer.NegSyncerKey{
[]syncers.NegSyncerKey{
getSyncerKey("ns1", "n1", 3000, "80"),
getSyncerKey("ns1", "n1", 4000, "namedport"),
},
Expand All @@ -94,7 +94,7 @@ func TestEnsureAndStopSyncer(t *testing.T) {
"n1",
types.PortNameMap{3000: "80"},
false,
[]syncer.NegSyncerKey{
[]syncers.NegSyncerKey{
getSyncerKey("ns1", "n1", 3000, "80"),
getSyncerKey("ns1", "n1", 4000, "namedport"),
getSyncerKey("ns2", "n1", 3000, "80"),
Expand All @@ -105,7 +105,7 @@ func TestEnsureAndStopSyncer(t *testing.T) {
"n1",
types.PortNameMap{},
true,
[]syncer.NegSyncerKey{
[]syncers.NegSyncerKey{
getSyncerKey("ns2", "n1", 3000, "80"),
},
},
Expand Down
132 changes: 13 additions & 119 deletions pkg/neg/syncer/batch.go → pkg/neg/syncers/batch.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,13 +14,12 @@ See the License for the specific language governing permissions and
limitations under the License.
*/

package syncer
package syncers

import (
"fmt"
"math"
"strconv"
"strings"
"sync"
"time"

Expand All @@ -35,27 +34,8 @@ import (
"k8s.io/client-go/tools/record"
"k8s.io/ingress-gce/pkg/neg/metrics"
negtypes "k8s.io/ingress-gce/pkg/neg/types"
"k8s.io/ingress-gce/pkg/utils"
"k8s.io/kubernetes/pkg/cloudprovider/providers/gce"
)

// MAX_NETWORK_ENDPOINTS_PER_BATCH is, judging by its name, the cap on how many
// network endpoints go into one batch; its use is outside this view, so
// confirm against the call site.
const MAX_NETWORK_ENDPOINTS_PER_BATCH = 500

const (
	// maxRetries bounds how often processing of a single NEG is retried:
	// only 15 attempts are made, a convention in kube-controller-manager.
	maxRetries = 15

	// minRetryDelay and maxRetryDelay are presumably the lower and upper
	// bounds of the delay between retries (5 seconds and 10 minutes);
	// the code consuming them is not shown here.
	minRetryDelay = 5 * time.Second
	maxRetryDelay = 10 * time.Minute
)

// NegSyncerKey holds the fields that together uniquely identify a NEG.
type NegSyncerKey struct {
	// Namespace and Name are the namespace and name used to look up the
	// owning service.
	Namespace, Name string
	// Port is the numeric port of the key.
	Port int32
	// TargetPort is kept as a string; it may hold a number ("80") or a
	// named port ("namedport").
	TargetPort string
}

// batchSyncer handles synchronizing NEGs for one service port. It handles sync, resync and retry on error.
// It syncs NEGs in batches and waits for all operations to complete before continuing to the next batch.
type batchSyncer struct {
Expand Down Expand Up @@ -107,13 +87,13 @@ func (s *batchSyncer) init() {
// Start starts the syncer go routine if it has not been started.
func (s *batchSyncer) Start() error {
if !s.IsStopped() {
return fmt.Errorf("NEG syncer for %s is already running.", s.formattedName())
return fmt.Errorf("NEG syncer for %s is already running.", s.NegSyncerKey.String())
}
if s.IsShuttingDown() {
return fmt.Errorf("NEG syncer for %s is shutting down. ", s.formattedName())
return fmt.Errorf("NEG syncer for %s is shutting down. ", s.NegSyncerKey.String())
}

glog.V(2).Infof("Starting NEG syncer for service port %s", s.formattedName())
glog.V(2).Infof("Starting NEG syncer for service port %s", s.NegSyncerKey.String())
s.init()
go func() {
for {
Expand Down Expand Up @@ -142,7 +122,7 @@ func (s *batchSyncer) Start() error {
s.stateLock.Lock()
s.shuttingDown = false
s.stateLock.Unlock()
glog.V(2).Infof("Stopping NEG syncer for %s", s.formattedName())
glog.V(2).Infof("Stopping NEG syncer for %s", s.NegSyncerKey.String())
return
}
case <-retryCh:
Expand All @@ -158,7 +138,7 @@ func (s *batchSyncer) Stop() {
s.stateLock.Lock()
defer s.stateLock.Unlock()
if !s.stopped {
glog.V(2).Infof("Stopping NEG syncer for service port %s", s.formattedName())
glog.V(2).Infof("Stopping NEG syncer for service port %s", s.NegSyncerKey.String())
s.stopped = true
s.shuttingDown = true
close(s.syncCh)
Expand All @@ -168,7 +148,7 @@ func (s *batchSyncer) Stop() {
// Sync informs syncer to run sync loop as soon as possible.
func (s *batchSyncer) Sync() bool {
if s.IsStopped() {
glog.Warningf("NEG syncer for %s is already stopped.", s.formattedName())
glog.Warningf("NEG syncer for %s is already stopped.", s.NegSyncerKey.String())
return false
}
select {
Expand All @@ -193,10 +173,10 @@ func (s *batchSyncer) IsShuttingDown() bool {

func (s *batchSyncer) sync() (err error) {
if s.IsStopped() || s.IsShuttingDown() {
glog.V(4).Infof("Skip syncing NEG %q for %s.", s.negName, s.formattedName())
glog.V(4).Infof("Skip syncing NEG %q for %s.", s.negName, s.NegSyncerKey.String())
return nil
}
glog.V(2).Infof("Sync NEG %q for %s.", s.negName, s.formattedName())
glog.V(2).Infof("Sync NEG %q for %s.", s.negName, s.NegSyncerKey.String())
start := time.Now()
defer metrics.ObserveNegSync(s.negName, metrics.AttachSync, err, start)
ep, exists, err := s.endpointLister.Get(
Expand Down Expand Up @@ -250,47 +230,8 @@ func (s *batchSyncer) ensureNetworkEndpointGroups() error {

var errList []error
for _, zone := range zones {
// Assume error is caused by not existing
neg, err := s.cloud.GetNetworkEndpointGroup(s.negName, zone)
if err != nil {
// Most likely to be caused by non-existed NEG
glog.V(4).Infof("Error while retriving %q in zone %q: %v", s.negName, zone, err)
}

needToCreate := false
if neg == nil {
needToCreate = true
} else if !utils.EqualResourceIDs(neg.LoadBalancer.Network, s.cloud.NetworkURL()) ||
!utils.EqualResourceIDs(neg.LoadBalancer.Subnetwork, s.cloud.SubnetworkURL()) {
needToCreate = true
glog.V(2).Infof("NEG %q in %q does not match network and subnetwork of the cluster. Deleting NEG.", s.negName, zone)
err = s.cloud.DeleteNetworkEndpointGroup(s.negName, zone)
if err != nil {
errList = append(errList, err)
} else {
if svc := getService(s.serviceLister, s.Namespace, s.Name); svc != nil {
s.recorder.Eventf(svc, apiv1.EventTypeNormal, "Delete", "Deleted NEG %q for %s in %q.", s.negName, s.formattedName(), zone)
}
}
}

if needToCreate {
glog.V(2).Infof("Creating NEG %q for %s in %q.", s.negName, s.formattedName(), zone)
err = s.cloud.CreateNetworkEndpointGroup(&compute.NetworkEndpointGroup{
Name: s.negName,
NetworkEndpointType: gce.NEGIPPortNetworkEndpointType,
LoadBalancer: &compute.NetworkEndpointGroupLbNetworkEndpointGroup{
Network: s.cloud.NetworkURL(),
Subnetwork: s.cloud.SubnetworkURL(),
},
}, zone)
if err != nil {
errList = append(errList, err)
} else {
if svc := getService(s.serviceLister, s.Namespace, s.Name); svc != nil {
s.recorder.Eventf(svc, apiv1.EventTypeNormal, "Create", "Created NEG %q for %s in %q.", s.negName, s.formattedName(), zone)
}
}
if err := ensureNetworkEndpointGroup(s.Namespace, s.Name, s.negName, zone, s.NegSyncerKey.String(), s.cloud, s.serviceLister, s.recorder); err != nil {
errList = append(errList, err)
}
}
return utilerrors.NewAggregate(errList)
Expand Down Expand Up @@ -445,13 +386,13 @@ func (s *batchSyncer) toNetworkEndpointBatch(endpoints sets.String) ([]*compute.

func (s *batchSyncer) attachNetworkEndpoints(wg *sync.WaitGroup, zone string, networkEndpoints []*compute.NetworkEndpoint, errList *ErrorList) {
wg.Add(1)
glog.V(2).Infof("Attaching %d endpoint(s) for %s in NEG %s at %s.", len(networkEndpoints), s.formattedName(), s.negName, zone)
glog.V(2).Infof("Attaching %d endpoint(s) for %s in NEG %s at %s.", len(networkEndpoints), s.NegSyncerKey.String(), s.negName, zone)
go s.operationInternal(wg, zone, networkEndpoints, errList, s.cloud.AttachNetworkEndpoints, "Attach")
}

func (s *batchSyncer) detachNetworkEndpoints(wg *sync.WaitGroup, zone string, networkEndpoints []*compute.NetworkEndpoint, errList *ErrorList) {
wg.Add(1)
glog.V(2).Infof("Detaching %d endpoint(s) for %s in NEG %s at %s.", len(networkEndpoints), s.formattedName(), s.negName, zone)
glog.V(2).Infof("Detaching %d endpoint(s) for %s in NEG %s at %s.", len(networkEndpoints), s.NegSyncerKey.String(), s.negName, zone)
go s.operationInternal(wg, zone, networkEndpoints, errList, s.cloud.DetachNetworkEndpoints, "Detach")
}

Expand Down Expand Up @@ -485,50 +426,3 @@ func (s *batchSyncer) resetRetryDelay() {
s.retryCount = 0
s.lastRetryDelay = time.Duration(0)
}

// formattedName renders the syncer's identifying fields as
// "<namespace>/<name>-<port>/<targetPort>" for use in log and event messages.
func (s *batchSyncer) formattedName() string {
	servicePath := s.Namespace + "/" + s.Name
	return fmt.Sprintf("%s-%d/%s", servicePath, s.Port, s.TargetPort)
}

// encodeEndpoint packs ip, instance and port into one string, with the three
// fields joined by "||" in that order.
func encodeEndpoint(ip, instance, port string) string {
	const sep = "||"
	// All operands are strings, so fmt.Sprint concatenates them without
	// inserting any spaces.
	return fmt.Sprint(ip, sep, instance, sep, port)
}

// decodeEndpoint splits a string of the form "<ip>||<instance>||<port>" back
// into its ip, instance and port fields, in that order.
//
// A string with fewer than three "||"-separated fields is malformed. For such
// input three empty strings are returned; indexing the split result
// unconditionally would panic. Fields beyond the third are ignored.
func decodeEndpoint(str string) (string, string, string) {
	parts := strings.Split(str, "||")
	if len(parts) < 3 {
		return "", "", ""
	}
	return parts[0], parts[1], parts[2]
}

// calculateDifference compares the target state with the current state, both
// keyed by zone, and reports what has to change to reach the target.
// The first result maps each zone to the endpoints to add (in target but not
// in current); the second maps each zone to the endpoints to remove (in
// current but not in target). Zones with nothing to change are omitted.
func calculateDifference(targetMap, currentMap map[string]sets.String) (map[string]sets.String, map[string]sets.String) {
	// missingFrom collects, per zone, the members of want that are absent
	// from have, skipping zones where nothing is missing.
	missingFrom := func(want, have map[string]sets.String) map[string]sets.String {
		result := map[string]sets.String{}
		for zone, members := range want {
			if delta := members.Difference(have[zone]); len(delta) > 0 {
				result[zone] = delta
			}
		}
		return result
	}

	return missingFrom(targetMap, currentMap), missingFrom(currentMap, targetMap)
}

// getService looks up the service identified by namespace and name in
// serviceLister. It returns nil when the lookup fails or the service is not
// present in the store; a lookup failure is additionally logged.
func getService(serviceLister cache.Indexer, namespace, name string) *apiv1.Service {
	obj, found, err := serviceLister.GetByKey(utils.ServiceKeyFunc(namespace, name))
	if err != nil {
		glog.Errorf("Failed to retrieve service %s/%s from store: %v", namespace, name, err)
		return nil
	}
	if !found {
		return nil
	}
	return obj.(*apiv1.Service)
}
Loading

0 comments on commit 2d2c91a

Please sign in to comment.