Skip to content

Commit

Permalink
Sync backends for a single ingress at a time
Browse files Browse the repository at this point in the history
  • Loading branch information
Nick Sardo committed Feb 6, 2018
1 parent 64a3126 commit 0d0a394
Showing 1 changed file with 30 additions and 35 deletions.
65 changes: 30 additions & 35 deletions pkg/controller/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -119,29 +119,34 @@ func NewLoadBalancerController(kubeClient kubernetes.Interface, stopCh chan stru
AddFunc: func(obj interface{}) {
addIng := obj.(*extensions.Ingress)
if !isGCEIngress(addIng) && !isGCEMultiClusterIngress(addIng) {
glog.Infof("Ignoring add for ingress %v based on annotation %v", addIng.Name, annotations.IngressClassKey)
glog.V(4).Infof("Ignoring add for ingress %v based on annotation %v", addIng.Name, annotations.IngressClassKey)
return
}

glog.V(3).Infof("Ingress %v/%v added, enqueuing", addIng.Namespace, addIng.Name)
lbc.ctx.Recorder(addIng.Namespace).Eventf(addIng, apiv1.EventTypeNormal, "ADD", fmt.Sprintf("%s/%s", addIng.Namespace, addIng.Name))
lbc.ingQueue.Enqueue(obj)
},
DeleteFunc: func(obj interface{}) {
delIng := obj.(*extensions.Ingress)
if !isGCEIngress(delIng) && !isGCEMultiClusterIngress(delIng) {
glog.Infof("Ignoring delete for ingress %v based on annotation %v", delIng.Name, annotations.IngressClassKey)
glog.V(4).Infof("Ignoring delete for ingress %v based on annotation %v", delIng.Name, annotations.IngressClassKey)
return
}
glog.Infof("Delete notification received for Ingress %v/%v", delIng.Namespace, delIng.Name)

glog.V(3).Infof("Ingress %v/%v deleted, enqueueing", delIng.Namespace, delIng.Name)
lbc.ingQueue.Enqueue(obj)
},
UpdateFunc: func(old, cur interface{}) {
curIng := cur.(*extensions.Ingress)
if !isGCEIngress(curIng) && !isGCEMultiClusterIngress(curIng) {
return
}
if !reflect.DeepEqual(old, cur) {
glog.V(3).Infof("Ingress %v changed, syncing", curIng.Name)
if reflect.DeepEqual(old, cur) {
return
}

glog.V(3).Infof("Ingress %v/%v changed, enqueuing", curIng.Namespace, curIng.Name)
lbc.ingQueue.Enqueue(cur)
},
})
Expand Down Expand Up @@ -259,10 +264,12 @@ func (lbc *LoadBalancerController) sync(key string) (err error) {
return lbc.CloudClusterManager.GC(lbNames, allNodePorts)
}

ingressObj, ok := obj.(*extensions.Ingress)
ing, ok := obj.(*extensions.Ingress)
if !ok {
return fmt.Errorf("invalid object (not of type Ingress), type was %T", obj)
}
// DeepCopy for assurance that we don't pollute other goroutines with changes.
ing = ing.DeepCopy()

// This performs a 2 phase checkpoint with the cloud:
// * Phase 1 creates/verifies resources are as expected. At the end of a
Expand All @@ -283,41 +290,29 @@ func (lbc *LoadBalancerController) sync(key string) (err error) {
glog.V(3).Infof("Finished syncing %v", key)
}()

// Record any errors during sync and throw a single error at the end. This
// allows us to free up associated cloud resources ASAP.
lbs, err := lbc.toRuntimeInfo(extensions.IngressList{
Items: []extensions.Ingress{*ingressObj},
})
singleIngressList := &extensions.IngressList{
Items: []extensions.Ingress{*ing},
}
lbs, err := lbc.toRuntimeInfo(singleIngressList)
if err != nil {
return err
}

igs, err := lbc.CloudClusterManager.Checkpoint(lbs, nodeNames, gceNodePorts, allNodePorts, lbc.Translator.GatherEndpointPorts(gceNodePorts))
// Get all service ports for the ingress being synced.
ingSvcPorts := lbc.Translator.ToNodePorts(singleIngressList)

igs, err := lbc.CloudClusterManager.Checkpoint(lbs, nodeNames, ingSvcPorts, allNodePorts, lbc.Translator.GatherEndpointPorts(gceNodePorts))
if err != nil {
// TODO: Implement proper backoff for the queue.
const eventMsg = "GCE"
if fwErr, ok := err.(*firewalls.FirewallSyncError); ok {
if ingObj, ok := obj.(*extensions.Ingress); ok && ingExists {
lbc.ctx.Recorder(ingObj.Namespace).Eventf(ingObj, apiv1.EventTypeNormal, eventMsg, fwErr.Message)
} else {
glog.Warningf("Received firewallSyncError but don't have an ingress for raising an event: %v", fwErr.Message)
}
lbc.ctx.Recorder(ing.Namespace).Eventf(ing, apiv1.EventTypeNormal, eventMsg, fwErr.Message)
} else {
if ingObj, ok := obj.(*extensions.Ingress); ok && ingExists {
lbc.ctx.Recorder(ingObj.Namespace).Eventf(ingObj, apiv1.EventTypeWarning, eventMsg, err.Error())
} else {
err = fmt.Errorf("%v, error: %v", eventMsg, err)
}
lbc.ctx.Recorder(ing.Namespace).Eventf(ing, apiv1.EventTypeWarning, eventMsg, err.Error())
syncError = err
}
}

if !ingExists {
return syncError
}

ing := *obj.(*extensions.Ingress)
if isGCEMultiClusterIngress(&ing) {
if isGCEMultiClusterIngress(ing) {
// Add instance group names as annotation on the ingress.
if ing.Annotations == nil {
ing.Annotations = map[string]string{}
Expand All @@ -329,7 +324,7 @@ func (lbc *LoadBalancerController) sync(key string) (err error) {
}

if lbc.negEnabled {
svcPorts := lbc.Translator.ToNodePorts(&extensions.IngressList{Items: []extensions.Ingress{ing}})
svcPorts := lbc.Translator.ToNodePorts(singleIngressList)
for _, svcPort := range svcPorts {
if svcPort.NEGEnabled {

Expand All @@ -351,21 +346,21 @@ func (lbc *LoadBalancerController) sync(key string) (err error) {
return syncError
}

if urlMap, err := lbc.Translator.ToURLMap(&ing); err != nil {
if urlMap, err := lbc.Translator.ToURLMap(ing); err != nil {
syncError = fmt.Errorf("%v, convert to url map error %v", syncError, err)
} else if err := l7.UpdateUrlMap(urlMap); err != nil {
lbc.ctx.Recorder(ing.Namespace).Eventf(&ing, apiv1.EventTypeWarning, "UrlMap", err.Error())
lbc.ctx.Recorder(ing.Namespace).Eventf(ing, apiv1.EventTypeWarning, "UrlMap", err.Error())
syncError = fmt.Errorf("%v, update url map error: %v", syncError, err)
} else if err := lbc.updateIngressStatus(l7, ing); err != nil {
lbc.ctx.Recorder(ing.Namespace).Eventf(&ing, apiv1.EventTypeWarning, "Status", err.Error())
lbc.ctx.Recorder(ing.Namespace).Eventf(ing, apiv1.EventTypeWarning, "Status", err.Error())
syncError = fmt.Errorf("%v, update ingress error: %v", syncError, err)
}
return syncError
}

// updateIngressStatus updates the IP and annotations of a loadbalancer.
// The annotations are parsed by kubectl describe.
func (lbc *LoadBalancerController) updateIngressStatus(l7 *loadbalancers.L7, ing extensions.Ingress) error {
func (lbc *LoadBalancerController) updateIngressStatus(l7 *loadbalancers.L7, ing *extensions.Ingress) error {
ingClient := lbc.client.Extensions().Ingresses(ing.Namespace)

// Update IP through update/status endpoint
Expand Down Expand Up @@ -401,7 +396,7 @@ func (lbc *LoadBalancerController) updateIngressStatus(l7 *loadbalancers.L7, ing
}

// toRuntimeInfo returns L7RuntimeInfo for the given ingresses.
func (lbc *LoadBalancerController) toRuntimeInfo(ingList extensions.IngressList) (lbs []*loadbalancers.L7RuntimeInfo, err error) {
func (lbc *LoadBalancerController) toRuntimeInfo(ingList *extensions.IngressList) (lbs []*loadbalancers.L7RuntimeInfo, err error) {
for _, ing := range ingList.Items {
k, err := keyFunc(&ing)
if err != nil {
Expand Down

0 comments on commit 0d0a394

Please sign in to comment.