Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Sync ingress-specific backends and minor logging changes #123

Merged
merged 2 commits into from
Feb 6, 2018
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
65 changes: 30 additions & 35 deletions pkg/controller/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -119,29 +119,34 @@ func NewLoadBalancerController(kubeClient kubernetes.Interface, stopCh chan stru
AddFunc: func(obj interface{}) {
addIng := obj.(*extensions.Ingress)
if !isGCEIngress(addIng) && !isGCEMultiClusterIngress(addIng) {
glog.Infof("Ignoring add for ingress %v based on annotation %v", addIng.Name, annotations.IngressClassKey)
glog.V(4).Infof("Ignoring add for ingress %v based on annotation %v", addIng.Name, annotations.IngressClassKey)
return
}

glog.V(3).Infof("Ingress %v/%v added, enqueuing", addIng.Namespace, addIng.Name)
lbc.ctx.Recorder(addIng.Namespace).Eventf(addIng, apiv1.EventTypeNormal, "ADD", fmt.Sprintf("%s/%s", addIng.Namespace, addIng.Name))
lbc.ingQueue.Enqueue(obj)
},
DeleteFunc: func(obj interface{}) {
delIng := obj.(*extensions.Ingress)
if !isGCEIngress(delIng) && !isGCEMultiClusterIngress(delIng) {
glog.Infof("Ignoring delete for ingress %v based on annotation %v", delIng.Name, annotations.IngressClassKey)
glog.V(4).Infof("Ignoring delete for ingress %v based on annotation %v", delIng.Name, annotations.IngressClassKey)
return
}
glog.Infof("Delete notification received for Ingress %v/%v", delIng.Namespace, delIng.Name)

glog.V(3).Infof("Ingress %v/%v deleted, enqueueing", delIng.Namespace, delIng.Name)
lbc.ingQueue.Enqueue(obj)
},
UpdateFunc: func(old, cur interface{}) {
curIng := cur.(*extensions.Ingress)
if !isGCEIngress(curIng) && !isGCEMultiClusterIngress(curIng) {
return
}
if !reflect.DeepEqual(old, cur) {
glog.V(3).Infof("Ingress %v changed, syncing", curIng.Name)
if reflect.DeepEqual(old, cur) {
return
}

glog.V(3).Infof("Ingress %v/%v changed, enqueuing", curIng.Namespace, curIng.Name)
lbc.ingQueue.Enqueue(cur)
},
})
Expand Down Expand Up @@ -259,10 +264,12 @@ func (lbc *LoadBalancerController) sync(key string) (err error) {
return lbc.CloudClusterManager.GC(lbNames, allNodePorts)
}

ingressObj, ok := obj.(*extensions.Ingress)
ing, ok := obj.(*extensions.Ingress)
if !ok {
return fmt.Errorf("invalid object (not of type Ingress), type was %T", obj)
}
// DeepCopy for assurance that we don't pollute other goroutines with changes.
ing = ing.DeepCopy()

// This performs a 2 phase checkpoint with the cloud:
// * Phase 1 creates/verifies resources are as expected. At the end of a
Expand All @@ -283,41 +290,29 @@ func (lbc *LoadBalancerController) sync(key string) (err error) {
glog.V(3).Infof("Finished syncing %v", key)
}()

// Record any errors during sync and throw a single error at the end. This
// allows us to free up associated cloud resources ASAP.
lbs, err := lbc.toRuntimeInfo(extensions.IngressList{
Items: []extensions.Ingress{*ingressObj},
})
singleIngressList := &extensions.IngressList{
Items: []extensions.Ingress{*ing},
}
lbs, err := lbc.toRuntimeInfo(singleIngressList)
if err != nil {
return err
}

igs, err := lbc.CloudClusterManager.Checkpoint(lbs, nodeNames, gceNodePorts, allNodePorts, lbc.Translator.GatherEndpointPorts(gceNodePorts))
// Get all service ports for the ingress being synced.
ingSvcPorts := lbc.Translator.ToNodePorts(singleIngressList)

igs, err := lbc.CloudClusterManager.Checkpoint(lbs, nodeNames, ingSvcPorts, allNodePorts, lbc.Translator.GatherEndpointPorts(gceNodePorts))
if err != nil {
// TODO: Implement proper backoff for the queue.
const eventMsg = "GCE"
if fwErr, ok := err.(*firewalls.FirewallSyncError); ok {
if ingObj, ok := obj.(*extensions.Ingress); ok && ingExists {
lbc.ctx.Recorder(ingObj.Namespace).Eventf(ingObj, apiv1.EventTypeNormal, eventMsg, fwErr.Message)
} else {
glog.Warningf("Received firewallSyncError but don't have an ingress for raising an event: %v", fwErr.Message)
}
lbc.ctx.Recorder(ing.Namespace).Eventf(ing, apiv1.EventTypeNormal, eventMsg, fwErr.Message)
} else {
if ingObj, ok := obj.(*extensions.Ingress); ok && ingExists {
lbc.ctx.Recorder(ingObj.Namespace).Eventf(ingObj, apiv1.EventTypeWarning, eventMsg, err.Error())
} else {
err = fmt.Errorf("%v, error: %v", eventMsg, err)
}
lbc.ctx.Recorder(ing.Namespace).Eventf(ing, apiv1.EventTypeWarning, eventMsg, err.Error())
syncError = err
}
}

if !ingExists {
return syncError
}

ing := *obj.(*extensions.Ingress)
if isGCEMultiClusterIngress(&ing) {
if isGCEMultiClusterIngress(ing) {
// Add instance group names as annotation on the ingress.
if ing.Annotations == nil {
ing.Annotations = map[string]string{}
Expand All @@ -329,7 +324,7 @@ func (lbc *LoadBalancerController) sync(key string) (err error) {
}

if lbc.negEnabled {
svcPorts := lbc.Translator.ToNodePorts(&extensions.IngressList{Items: []extensions.Ingress{ing}})
svcPorts := lbc.Translator.ToNodePorts(singleIngressList)
for _, svcPort := range svcPorts {
if svcPort.NEGEnabled {

Expand All @@ -351,21 +346,21 @@ func (lbc *LoadBalancerController) sync(key string) (err error) {
return syncError
}

if urlMap, err := lbc.Translator.ToURLMap(&ing); err != nil {
if urlMap, err := lbc.Translator.ToURLMap(ing); err != nil {
syncError = fmt.Errorf("%v, convert to url map error %v", syncError, err)
} else if err := l7.UpdateUrlMap(urlMap); err != nil {
lbc.ctx.Recorder(ing.Namespace).Eventf(&ing, apiv1.EventTypeWarning, "UrlMap", err.Error())
lbc.ctx.Recorder(ing.Namespace).Eventf(ing, apiv1.EventTypeWarning, "UrlMap", err.Error())
syncError = fmt.Errorf("%v, update url map error: %v", syncError, err)
} else if err := lbc.updateIngressStatus(l7, ing); err != nil {
lbc.ctx.Recorder(ing.Namespace).Eventf(&ing, apiv1.EventTypeWarning, "Status", err.Error())
lbc.ctx.Recorder(ing.Namespace).Eventf(ing, apiv1.EventTypeWarning, "Status", err.Error())
syncError = fmt.Errorf("%v, update ingress error: %v", syncError, err)
}
return syncError
}

// updateIngressStatus updates the IP and annotations of a loadbalancer.
// The annotations are parsed by kubectl describe.
func (lbc *LoadBalancerController) updateIngressStatus(l7 *loadbalancers.L7, ing extensions.Ingress) error {
func (lbc *LoadBalancerController) updateIngressStatus(l7 *loadbalancers.L7, ing *extensions.Ingress) error {
ingClient := lbc.client.Extensions().Ingresses(ing.Namespace)

// Update IP through update/status endpoint
Expand Down Expand Up @@ -401,7 +396,7 @@ func (lbc *LoadBalancerController) updateIngressStatus(l7 *loadbalancers.L7, ing
}

// toRuntimeInfo returns L7RuntimeInfo for the given ingresses.
func (lbc *LoadBalancerController) toRuntimeInfo(ingList extensions.IngressList) (lbs []*loadbalancers.L7RuntimeInfo, err error) {
func (lbc *LoadBalancerController) toRuntimeInfo(ingList *extensions.IngressList) (lbs []*loadbalancers.L7RuntimeInfo, err error) {
for _, ing := range ingList.Items {
k, err := keyFunc(&ing)
if err != nil {
Expand Down
2 changes: 1 addition & 1 deletion pkg/controller/translator/translator.go
Original file line number Diff line number Diff line change
Expand Up @@ -338,7 +338,7 @@ func (t *GCE) getHTTPProbe(svc api_v1.Service, targetPort intstr.IntOrString, pr
}
}
}
glog.V(4).Infof("%v: lacks a matching HTTP probe for use in health checks.", logStr)
glog.V(5).Infof("%v: lacks a matching HTTP probe for use in health checks.", logStr)
}
return nil, nil
}
Expand Down