Fix sync of multi-cluster ingress #183

Merged 2 commits on Mar 31, 2018
75 changes: 33 additions & 42 deletions pkg/controller/cluster_manager.go
@@ -84,7 +84,7 @@ func (c *ClusterManager) shutdown() error {
return err
}
if err := c.firewallPool.Shutdown(); err != nil {
if _, ok := err.(*firewalls.FirewallSyncError); ok {
if _, ok := err.(*firewalls.FirewallXPNError); ok {
return nil
}
return err
@@ -93,61 +93,52 @@ func (c *ClusterManager) shutdown() error {
return c.backendPool.Shutdown()
}

// Checkpoint performs a checkpoint with the cloud.
// EnsureLoadBalancer creates the backend services and higher-level LB resources.
// - lb is the single cluster L7 loadbalancers we wish to exist. If they already
// exist, they should not have any broken links between say, a UrlMap and
// TargetHttpProxy.
// - nodeNames are the names of nodes we wish to add to all loadbalancer
// instance groups.
// - backendServicePorts are the ports for which we require BackendServices.
// - namedPorts are the ports which must be opened on instance groups.
// - firewallPorts are the ports which must be opened in the firewall rule.
// Returns the list of all instance groups corresponding to the given loadbalancers.
// If in performing the checkpoint the cluster manager runs out of quota, a
// googleapi 403 is returned.
func (c *ClusterManager) Checkpoint(lb *loadbalancers.L7RuntimeInfo, nodeNames []string, backendServicePorts []backends.ServicePort, namedPorts []backends.ServicePort, endpointPorts []string) ([]*compute.InstanceGroup, error) {
glog.V(4).Infof("Checkpoint(%v lb, %v nodeNames, %v backendServicePorts, %v namedPorts, %v endpointPorts)", lb, len(nodeNames), len(backendServicePorts), len(namedPorts), len(endpointPorts))

if len(namedPorts) != 0 {
// Add the default backend node port to the list of named ports for instance groups.
namedPorts = append(namedPorts, c.defaultBackendNodePort)
}
// Multiple ingress paths can point to the same service (and hence nodePort)
// but each nodePort can only have one set of cloud resources behind it. So
// don't waste time double validating GCE BackendServices.
namedPorts = uniq(namedPorts)
backendServicePorts = uniq(backendServicePorts)
// Create Instance Groups.
igs, err := c.EnsureInstanceGroupsAndPorts(namedPorts)
if err != nil {
return igs, err
}
if err := c.backendPool.Ensure(backendServicePorts, igs); err != nil {
return igs, err
}
if err := c.instancePool.Sync(nodeNames); err != nil {
return igs, err
}
if err := c.l7Pool.Sync([]*loadbalancers.L7RuntimeInfo{lb}); err != nil {
return igs, err
}

if err := c.firewallPool.Sync(nodeNames, endpointPorts...); err != nil {
return igs, err
// - lbServicePorts are the ports for which we require Backend Services.
// - instanceGroups are the groups to be referenced by the Backend Services.
// If GCE runs out of quota, a googleapi 403 is returned.
func (c *ClusterManager) EnsureLoadBalancer(lb *loadbalancers.L7RuntimeInfo, lbServicePorts []backends.ServicePort, instanceGroups []*compute.InstanceGroup) error {
glog.V(4).Infof("EnsureLoadBalancer(%q lb, %v lbServicePorts, %v instanceGroups)", lb.String(), len(lbServicePorts), len(instanceGroups))
if err := c.backendPool.Ensure(uniq(lbServicePorts), instanceGroups); err != nil {
return err
}

return igs, nil
return c.l7Pool.Sync([]*loadbalancers.L7RuntimeInfo{lb})
}

func (c *ClusterManager) EnsureInstanceGroupsAndPorts(servicePorts []backends.ServicePort) ([]*compute.InstanceGroup, error) {
func (c *ClusterManager) EnsureInstanceGroupsAndPorts(nodeNames []string, servicePorts []backends.ServicePort) ([]*compute.InstanceGroup, error) {
if len(servicePorts) != 0 {
// Add the default backend node port to the list of named ports for instance groups.
servicePorts = append(servicePorts, c.defaultBackendNodePort)
}

// Convert to slice of NodePort int64s.
ports := []int64{}
for _, p := range servicePorts {
for _, p := range uniq(servicePorts) {
ports = append(ports, p.NodePort)
}

// Create instance groups and set named ports.
igs, err := instances.EnsureInstanceGroupsAndPorts(c.instancePool, c.ClusterNamer, ports)
if err != nil {
return nil, err
}

// Add/remove instances to the instance groups.
if err = c.instancePool.Sync(nodeNames); err != nil {
return nil, err
}

return igs, err
}
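Note that uniq is referenced throughout this diff but its definition is not shown. A plausible sketch of what it does, deduplicating ServicePorts by NodePort as the removed comment above describes, might look like this (hypothetical, for illustration):

func uniq(ports []backends.ServicePort) []backends.ServicePort {
	// Each NodePort can only have one set of cloud resources behind it,
	// so collapse duplicate entries by NodePort.
	seen := make(map[int64]bool)
	result := []backends.ServicePort{}
	for _, p := range ports {
		if !seen[p.NodePort] {
			seen[p.NodePort] = true
			result = append(result, p)
		}
	}
	return result
}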

func (c *ClusterManager) EnsureFirewall(nodeNames []string, endpointPorts []string) error {
return c.firewallPool.Sync(nodeNames, endpointPorts...)
}
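Taken together, the refactor splits the old monolithic Checkpoint into three independently callable steps. A minimal sketch of the intended calling sequence, mirroring how sync in controller.go below drives them (error handling between steps elided):

igs, err := c.EnsureInstanceGroupsAndPorts(nodeNames, allNodePorts)
if err != nil {
	return err
}
// The backend services and higher-level L7 resources reference the
// instance groups created in the previous step.
if err := c.EnsureLoadBalancer(lb, lbSvcPorts, igs); err != nil {
	return err
}
// Firewall rules are synced last, independently of the LB resources.
return c.EnsureFirewall(nodeNames, negEndpointPorts)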

// GC garbage collects unused resources.
// - lbNames are the names of L7 loadbalancers we wish to exist. Those not in
// this list are removed from the cloud.
121 changes: 66 additions & 55 deletions pkg/controller/controller.go
@@ -230,8 +230,8 @@ func (lbc *LoadBalancerController) Stop(deleteAll bool) error {
return nil
}

// sync manages Ingress create/updates/deletes.
func (lbc *LoadBalancerController) sync(key string) (err error) {
// sync manages Ingress create/updates/deletes
func (lbc *LoadBalancerController) sync(key string) (retErr error) {
if !lbc.hasSynced() {
time.Sleep(storeSyncPollPeriod)
return fmt.Errorf("waiting for stores to sync")
@@ -247,89 +247,96 @@ func (lbc *LoadBalancerController) sync(key string) (err error) {
return err
}

// allNodePorts contains ServicePorts used by all ingresses (single-cluster and multi-cluster).
allNodePorts := lbc.Translator.ToNodePorts(&allIngresses)
// gceNodePorts contains the ServicePorts used by only single-cluster ingress.
gceNodePorts := lbc.Translator.ToNodePorts(&gceIngresses)
nodeNames, err := getReadyNodeNames(listers.NewNodeLister(lbc.nodeLister))
if err != nil {
return err
}
lbNames := lbc.ingLister.Store.ListKeys()

lbNames := lbc.ingLister.Store.ListKeys()
obj, ingExists, err := lbc.ingLister.Store.GetByKey(key)
if err != nil {
return err
}

if !ingExists {
glog.V(2).Infof("Ingress %q no longer exists, triggering GC", key)
return lbc.CloudClusterManager.GC(lbNames, allNodePorts)
// GC will find GCE resources that were used for this ingress and delete them.
return lbc.CloudClusterManager.GC(lbNames, gceNodePorts)
}

// Get ingress and DeepCopy for assurance that we don't pollute other goroutines with changes.
ing, ok := obj.(*extensions.Ingress)
if !ok {
return fmt.Errorf("invalid object (not of type Ingress), type was %T", obj)
}
// DeepCopy for assurance that we don't pollute other goroutines with changes.
ing = ing.DeepCopy()

// This performs a 2 phase checkpoint with the cloud:
// * Phase 1 creates/verifies resources are as expected. At the end of a
// successful checkpoint we know that existing L7s are WAI, and the L7
// for the Ingress associated with "key" is ready for a UrlMap update.
// If this encounters an error, eg for quota reasons, we want to invoke
// Phase 2 right away and retry checkpointing.
// * Phase 2 performs GC by refcounting shared resources. This needs to
Contributor: fwiw, I think this comment was useful, especially the explanation of why we always run phase 2, even when phase 1 (creating the LB) fails. It helps understand why we have the defer func and the tricky syncError tracking code.

Author: Will replace with a better comment elsewhere.

// happen periodically whether or not stage 1 fails. At the end of a
// successful GC we know that there are no dangling cloud resources that
// don't have an associated Kubernetes Ingress/Service/Endpoint.

var syncError error
defer func() {
if deferErr := lbc.CloudClusterManager.GC(lbNames, allNodePorts); deferErr != nil {
err = fmt.Errorf("error during sync %v, error during GC %v", syncError, deferErr)
if retErr != nil {
lbc.ctx.Recorder(ing.Namespace).Eventf(ing, apiv1.EventTypeWarning, "Sync", retErr.Error())
}
// Garbage collection will occur regardless of an error occurring. If an error occurred,
// it could have been caused by quota issues; therefore, garbage collecting now may
// free up enough quota for the next sync to pass.
if gcErr := lbc.CloudClusterManager.GC(lbNames, gceNodePorts); gcErr != nil {
retErr = fmt.Errorf("error during sync %v, error during GC %v", retErr, gcErr)
}
glog.V(3).Infof("Finished syncing %v", key)
}()
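The rename from err to retErr matters here: with a named return value, the deferred function can read the error sync is about to return and fold the GC error into it. A self-contained sketch of the pattern (gc and doSync are hypothetical stand-ins for CloudClusterManager.GC and the body of sync):

package main

import "fmt"

// Hypothetical stand-ins for CloudClusterManager.GC and the sync body.
func gc() error     { return nil }
func doSync() error { return fmt.Errorf("quota exceeded") }

// sync uses a named return value so the deferred GC runs whether or not
// doSync failed, and can overwrite the error the caller will see.
func sync() (retErr error) {
	defer func() {
		if gcErr := gc(); gcErr != nil {
			retErr = fmt.Errorf("error during sync %v, error during GC %v", retErr, gcErr)
		}
	}()
	return doSync()
}

func main() {
	fmt.Println(sync())
}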

singleIngressList := &extensions.IngressList{
Items: []extensions.Ingress{*ing},
igs, err := lbc.CloudClusterManager.EnsureInstanceGroupsAndPorts(nodeNames, allNodePorts)
if err != nil {
return err
Contributor: Set syncError = err, as at other places?

Author: Yep. As I said, WIP.

Contributor: It should be retErr now?

}

if isGCEMultiClusterIngress(ing) {
// Add instance group names as annotation on the ingress and return.
if ing.Annotations == nil {
ing.Annotations = map[string]string{}
}
if err = setInstanceGroupsAnnotation(ing.Annotations, igs); err != nil {
return err
Contributor: Same. Set syncError = err?

Author: Deleted syncError.

}
if err = updateAnnotations(lbc.client, ing.Name, ing.Namespace, ing.Annotations); err != nil {
return err
}
glog.V(3).Infof("Finished syncing MCI-ingress %v", key)
return nil
}

// Continue syncing this specific GCE ingress.
lb, err := lbc.toRuntimeInfo(ing)
if err != nil {
lbc.ctx.Recorder(ing.Namespace).Eventf(ing, apiv1.EventTypeWarning, "Ingress", err.Error())
return err
}

// Get all service ports for the ingress being synced.
ingSvcPorts := lbc.Translator.ToNodePorts(singleIngressList)
lbSvcPorts := lbc.Translator.ToNodePorts(&extensions.IngressList{
Items: []extensions.Ingress{*ing},
})

igs, err := lbc.CloudClusterManager.Checkpoint(lb, nodeNames, ingSvcPorts, allNodePorts, lbc.Translator.GatherEndpointPorts(gceNodePorts))
if err != nil {
const eventMsg = "GCE"
if fwErr, ok := err.(*firewalls.FirewallSyncError); ok {
lbc.ctx.Recorder(ing.Namespace).Eventf(ing, apiv1.EventTypeNormal, eventMsg, fwErr.Message)
} else {
lbc.ctx.Recorder(ing.Namespace).Eventf(ing, apiv1.EventTypeWarning, eventMsg, err.Error())
syncError = err
}
// Create the backend services and higher-level LB resources.
if err = lbc.CloudClusterManager.EnsureLoadBalancer(lb, lbSvcPorts, igs); err != nil {
return err
Contributor: Set to retErr?

}

if isGCEMultiClusterIngress(ing) {
// Add instance group names as annotation on the ingress.
if ing.Annotations == nil {
ing.Annotations = map[string]string{}
}
if err = setInstanceGroupsAnnotation(ing.Annotations, igs); err != nil {
negEndpointPorts := lbc.Translator.GatherEndpointPorts(gceNodePorts)
// Ensure firewall rule for the cluster and pass any NEG endpoint ports.
if err = lbc.CloudClusterManager.EnsureFirewall(nodeNames, negEndpointPorts); err != nil {
if fwErr, ok := err.(*firewalls.FirewallXPNError); ok {
// XPN: Raise an event and ignore the error.
lbc.ctx.Recorder(ing.Namespace).Eventf(ing, apiv1.EventTypeNormal, "XPN", fwErr.Message)
} else {
return err
}
return updateAnnotations(lbc.client, ing.Name, ing.Namespace, ing.Annotations)
}

// If NEG enabled, link the backend services to the NEGs.
if lbc.negEnabled {
svcPorts := lbc.Translator.ToNodePorts(singleIngressList)
for _, svcPort := range svcPorts {
for _, svcPort := range lbSvcPorts {
if svcPort.NEGEnabled {

zones, err := lbc.Translator.ListZones()
if err != nil {
return err
@@ -344,20 +344,24 @@ func (lbc *LoadBalancerController) sync(key string) (err error) {
// Update the UrlMap of the single loadbalancer that came through the watch.
l7, err := lbc.CloudClusterManager.l7Pool.Get(key)
if err != nil {
syncError = fmt.Errorf("%v, unable to get loadbalancer: %v", syncError, err)
return syncError
return fmt.Errorf("unable to get loadbalancer: %v", err)
Contributor: This was being set to syncError earlier, and hence we were generating an event for it. Is the change to not generate an event intentional?

}

urlMap, err := lbc.Translator.ToURLMap(ing)
if err != nil {
return fmt.Errorf("convert to URL Map error %v", err)
}

if urlMap, err := lbc.Translator.ToURLMap(ing); err != nil {
syncError = fmt.Errorf("%v, convert to url map error %v", syncError, err)
} else if err := l7.UpdateUrlMap(urlMap); err != nil {
lbc.ctx.Recorder(ing.Namespace).Eventf(ing, apiv1.EventTypeWarning, "UrlMap", err.Error())
syncError = fmt.Errorf("%v, update url map error: %v", syncError, err)
} else if err := lbc.updateIngressStatus(l7, ing); err != nil {
lbc.ctx.Recorder(ing.Namespace).Eventf(ing, apiv1.EventTypeWarning, "Status", err.Error())
syncError = fmt.Errorf("%v, update ingress error: %v", syncError, err)
if err := l7.UpdateUrlMap(urlMap); err != nil {
return fmt.Errorf("update URL Map error: %v", err)
}
return syncError

if err := lbc.updateIngressStatus(l7, ing); err != nil {
return fmt.Errorf("update ingress status error: %v", err)
}

glog.V(3).Infof("Finished syncing %v", key)
return nil
}

// updateIngressStatus updates the IP and annotations of a loadbalancer.
8 changes: 4 additions & 4 deletions pkg/firewalls/firewalls.go
@@ -147,19 +147,19 @@ func (fr *FirewallRules) deleteFirewall(name string) error {
return err
}

func newFirewallXPNError(internal error, cmd string) *FirewallSyncError {
return &FirewallSyncError{
func newFirewallXPNError(internal error, cmd string) *FirewallXPNError {
return &FirewallXPNError{
Internal: internal,
Message: fmt.Sprintf("Firewall change required by network admin: `%v`", cmd),
}
}

type FirewallSyncError struct {
type FirewallXPNError struct {
Internal error
Message string
}

func (f *FirewallSyncError) Error() string {
func (f *FirewallXPNError) Error() string {
return f.Message
}

6 changes: 3 additions & 3 deletions pkg/firewalls/firewalls_test.go
@@ -165,7 +165,7 @@ func TestSyncXPNReadOnly(t *testing.T) {
nodes := []string{"node-a", "node-b", "node-c"}

err := fp.Sync(nodes)
if fwErr, ok := err.(*FirewallSyncError); !ok || !strings.Contains(fwErr.Message, "create") {
if fwErr, ok := err.(*FirewallXPNError); !ok || !strings.Contains(fwErr.Message, "create") {
t.Errorf("Expected firewall sync error with a user message. Received err: %v", err)
}

@@ -193,12 +193,12 @@

nodes = append(nodes, "node-d")
err = fp.Sync(nodes)
if fwErr, ok := err.(*FirewallSyncError); !ok || !strings.Contains(fwErr.Message, "update") {
if fwErr, ok := err.(*FirewallXPNError); !ok || !strings.Contains(fwErr.Message, "update") {
t.Errorf("Expected firewall sync error with a user message. Received err: %v", err)
}

err = fp.Shutdown()
if fwErr, ok := err.(*FirewallSyncError); !ok || !strings.Contains(fwErr.Message, "delete") {
if fwErr, ok := err.(*FirewallXPNError); !ok || !strings.Contains(fwErr.Message, "delete") {
t.Errorf("Expected firewall sync error with a user message. Received err: %v", err)
}
}