From ebaf600aad9cdbf03e5a7338dc05dc84964bca36 Mon Sep 17 00:00:00 2001 From: Rohit Ramkumar Date: Thu, 12 Jul 2018 14:29:38 -0700 Subject: [PATCH] Extract firewall management into separate controller --- cmd/glbc/main.go | 10 +- pkg/context/context.go | 5 + pkg/controller/cluster_manager.go | 34 ------ pkg/controller/controller.go | 138 +++++++++------------- pkg/controller/controller_test.go | 8 +- pkg/controller/fakes.go | 18 +-- pkg/controller/node.go | 10 +- pkg/controller/utils.go | 116 ++----------------- pkg/controller/utils_test.go | 5 +- pkg/firewalls/controller.go | 183 ++++++++++++++++++++++++++++++ pkg/firewalls/controller_test.go | 105 +++++++++++++++++ pkg/firewalls/firewalls.go | 18 ++- pkg/firewalls/firewalls_test.go | 6 +- pkg/firewalls/interfaces.go | 2 +- pkg/test/utils.go | 11 ++ pkg/utils/join.go | 51 +++++++++ pkg/utils/taskqueue.go | 26 +++-- pkg/utils/utils.go | 110 ++++++++++++++++++ 18 files changed, 584 insertions(+), 272 deletions(-) create mode 100644 pkg/firewalls/controller.go create mode 100644 pkg/firewalls/controller_test.go create mode 100644 pkg/utils/join.go diff --git a/cmd/glbc/main.go b/cmd/glbc/main.go index 10dfe1f678..7dae5222af 100644 --- a/cmd/glbc/main.go +++ b/cmd/glbc/main.go @@ -40,6 +40,7 @@ import ( "k8s.io/ingress-gce/cmd/glbc/app" "k8s.io/ingress-gce/pkg/backendconfig" "k8s.io/ingress-gce/pkg/crd" + "k8s.io/ingress-gce/pkg/firewalls" "k8s.io/ingress-gce/pkg/flags" "k8s.io/ingress-gce/pkg/version" ) @@ -166,9 +167,9 @@ func makeLeaderElectionConfig(client clientset.Interface, recorder record.EventR } func runControllers(ctx *context.ControllerContext) { - namer, err := app.NewNamer(ctx.KubeClient, flags.F.ClusterName, controller.DefaultFirewallName) + namer, err := app.NewNamer(ctx.KubeClient, flags.F.ClusterName, firewalls.DefaultFirewallName) if err != nil { - glog.Fatalf("app.NewNamer(ctx.KubeClient, %q, %q) = %v", flags.F.ClusterName, controller.DefaultFirewallName, err) + glog.Fatalf("app.NewNamer(ctx.KubeClient, %q, %q) = %v", flags.F.ClusterName, firewalls.DefaultFirewallName, err) } clusterManager, err := controller.NewClusterManager(ctx, namer, flags.F.HealthCheckPath, flags.F.DefaultSvcHealthCheckPath) @@ -182,6 +183,8 @@ func runControllers(ctx *context.ControllerContext) { glog.Fatalf("controller.NewLoadBalancerController(ctx, clusterManager, stopCh) = %v", err) } + fwc := firewalls.NewFirewallController(ctx, namer, flags.F.NodePortRanges.Values()) + if clusterManager.ClusterNamer.UID() != "" { glog.V(0).Infof("Cluster name: %+v", clusterManager.ClusterNamer.UID()) } @@ -197,6 +200,9 @@ func runControllers(ctx *context.ControllerContext) { go app.RunSIGTERMHandler(lbc, flags.F.DeleteAllOnQuit) + go fwc.Run(stopCh) + glog.V(0).Infof("firewall controller started") + ctx.Start(stopCh) lbc.Run() diff --git a/pkg/context/context.go b/pkg/context/context.go index f675e33e4f..a777749cfb 100644 --- a/pkg/context/context.go +++ b/pkg/context/context.go @@ -33,6 +33,11 @@ import ( "k8s.io/kubernetes/pkg/cloudprovider/providers/gce" ) +const ( + // Frequency to poll on local stores to sync. + StoreSyncPollPeriod = 5 * time.Second +) + // ControllerContext holds the state needed for the execution of the controller. type ControllerContext struct { KubeClient kubernetes.Interface diff --git a/pkg/controller/cluster_manager.go b/pkg/controller/cluster_manager.go index ce0edf25e9..225587b0f0 100644 --- a/pkg/controller/cluster_manager.go +++ b/pkg/controller/cluster_manager.go @@ -17,17 +17,12 @@ limitations under the License. 
package controller import ( - "net/http" - "github.com/golang/glog" compute "google.golang.org/api/compute/v1" - gce "k8s.io/kubernetes/pkg/cloudprovider/providers/gce" "k8s.io/ingress-gce/pkg/backends" "k8s.io/ingress-gce/pkg/context" - "k8s.io/ingress-gce/pkg/firewalls" - "k8s.io/ingress-gce/pkg/flags" "k8s.io/ingress-gce/pkg/healthchecks" "k8s.io/ingress-gce/pkg/instances" "k8s.io/ingress-gce/pkg/loadbalancers" @@ -40,7 +35,6 @@ type ClusterManager struct { instancePool instances.NodePool backendPool backends.BackendPool l7Pool loadbalancers.LoadBalancerPool - firewallPool firewalls.SingleFirewallPool // TODO: Refactor so we simply init a health check pool. healthChecker healthchecks.HealthChecker @@ -53,32 +47,10 @@ func (c *ClusterManager) Init(zl instances.ZoneLister, pp backends.ProbeProvider // TODO: Initialize other members as needed. } -// IsHealthy returns an error if the cluster manager is unhealthy. -func (c *ClusterManager) IsHealthy() (err error) { - // TODO: Expand on this, for now we just want to detect when the GCE client - // is broken. - _, err = c.backendPool.List() - - // If this container is scheduled on a node without compute/rw it is - // effectively useless, but it is healthy. Reporting it as unhealthy - // will lead to container crashlooping. - if utils.IsHTTPErrorCode(err, http.StatusForbidden) { - glog.Infof("Reporting cluster as healthy, but unable to list backends: %v", err) - return nil - } - return -} - func (c *ClusterManager) shutdown() error { if err := c.l7Pool.Shutdown(); err != nil { return err } - if err := c.firewallPool.Shutdown(); err != nil { - if _, ok := err.(*firewalls.FirewallXPNError); ok { - return nil - } - return err - } // The backend pool will also delete instance groups. return c.backendPool.Shutdown() } @@ -106,10 +78,6 @@ func (c *ClusterManager) EnsureInstanceGroupsAndPorts(nodeNames []string, servic return igs, err } -func (c *ClusterManager) EnsureFirewall(nodeNames []string, endpointPorts []string) error { - return c.firewallPool.Sync(nodeNames, endpointPorts...) -} - // GC garbage collects unused resources. // - lbNames are the names of L7 loadbalancers we wish to exist. Those not in // this list are removed from the cloud. @@ -144,7 +112,6 @@ func (c *ClusterManager) GC(lbNames []string, nodePorts []utils.ServicePort) err return err } glog.V(2).Infof("Shutting down firewall as there are no loadbalancers") - c.firewallPool.Shutdown() } return nil @@ -174,6 +141,5 @@ func NewClusterManager( // L7 pool creates targetHTTPProxy, ForwardingRules, UrlMaps, StaticIPs. 
cluster.l7Pool = loadbalancers.NewLoadBalancerPool(ctx.Cloud, cluster.ClusterNamer) - cluster.firewallPool = firewalls.NewFirewallPool(ctx.Cloud, cluster.ClusterNamer, gce.LoadBalancerSrcRanges(), flags.F.NodePortRanges.Values()) return &cluster, nil } diff --git a/pkg/controller/controller.go b/pkg/controller/controller.go index ef3a6a0502..c2be805b09 100644 --- a/pkg/controller/controller.go +++ b/pkg/controller/controller.go @@ -18,6 +18,7 @@ package controller import ( "fmt" + "net/http" "reflect" "sync" "time" @@ -33,37 +34,23 @@ import ( "k8s.io/client-go/tools/cache" "k8s.io/client-go/tools/record" backendconfigv1beta1 "k8s.io/ingress-gce/pkg/apis/backendconfig/v1beta1" - backendconfig "k8s.io/ingress-gce/pkg/backendconfig" "k8s.io/ingress-gce/pkg/annotations" "k8s.io/ingress-gce/pkg/context" "k8s.io/ingress-gce/pkg/controller/translator" - "k8s.io/ingress-gce/pkg/firewalls" "k8s.io/ingress-gce/pkg/loadbalancers" "k8s.io/ingress-gce/pkg/tls" "k8s.io/ingress-gce/pkg/utils" ) -const ( - // DefaultFirewallName is the default firewall name. - DefaultFirewallName = "" - // Frequency to poll on local stores to sync. - storeSyncPollPeriod = 5 * time.Second -) - -var ( - keyFunc = cache.DeletionHandlingMetaNamespaceKeyFunc - // DefaultFirewallName is the name to user for firewall rules created - // by an L7 controller when the --fireall-rule is not used. -) - // LoadBalancerController watches the kubernetes api and adds/removes services // from the loadbalancer, via loadBalancerConfig. type LoadBalancerController struct { client kubernetes.Interface ctx *context.ControllerContext - ingLister StoreToIngressLister + joiner *utils.Joiner + ingLister utils.StoreToIngressLister nodeLister cache.Indexer nodes *NodeController @@ -101,7 +88,7 @@ func NewLoadBalancerController( lbc := LoadBalancerController{ client: ctx.KubeClient, ctx: ctx, - ingLister: StoreToIngressLister{Store: ctx.IngressInformer.GetStore()}, + ingLister: utils.StoreToIngressLister{Store: ctx.IngressInformer.GetStore()}, nodeLister: ctx.NodeInformer.GetIndexer(), nodes: NewNodeController(ctx, clusterManager), CloudClusterManager: clusterManager, @@ -109,12 +96,13 @@ func NewLoadBalancerController( hasSynced: ctx.HasSynced, } lbc.ingQueue = utils.NewPeriodicTaskQueue("ingresses", lbc.sync) + lbc.joiner = utils.NewJoiner(lbc.ingLister, ctx.ServiceInformer.GetIndexer(), ctx.DefaultBackendSvcPortID) // Ingress event handlers. 
ctx.IngressInformer.AddEventHandler(cache.ResourceEventHandlerFuncs{ AddFunc: func(obj interface{}) { addIng := obj.(*extensions.Ingress) - if !isGCEIngress(addIng) && !isGCEMultiClusterIngress(addIng) { + if !utils.IsGCEIngress(addIng) && !utils.IsGCEMultiClusterIngress(addIng) { glog.V(4).Infof("Ignoring add for ingress %v based on annotation %v", addIng.Name, annotations.IngressClassKey) return } @@ -125,7 +113,7 @@ func NewLoadBalancerController( }, DeleteFunc: func(obj interface{}) { delIng := obj.(*extensions.Ingress) - if !isGCEIngress(delIng) && !isGCEMultiClusterIngress(delIng) { + if !utils.IsGCEIngress(delIng) && !utils.IsGCEMultiClusterIngress(delIng) { glog.V(4).Infof("Ignoring delete for ingress %v based on annotation %v", delIng.Name, annotations.IngressClassKey) return } @@ -135,7 +123,7 @@ func NewLoadBalancerController( }, UpdateFunc: func(old, cur interface{}) { curIng := cur.(*extensions.Ingress) - if !isGCEIngress(curIng) && !isGCEMultiClusterIngress(curIng) { + if !utils.IsGCEIngress(curIng) && !utils.IsGCEMultiClusterIngress(curIng) { return } if reflect.DeepEqual(old, cur) { @@ -150,10 +138,16 @@ func NewLoadBalancerController( // Service event handlers. ctx.ServiceInformer.AddEventHandler(cache.ResourceEventHandlerFuncs{ - AddFunc: lbc.enqueueIngressForObject, + AddFunc: func(obj interface{}) { + svc := obj.(*apiv1.Service) + ings := lbc.joiner.IngressesForService(svc) + lbc.ingQueue.Enqueue(convert(ings)...) + }, UpdateFunc: func(old, cur interface{}) { if !reflect.DeepEqual(old, cur) { - lbc.enqueueIngressForObject(cur) + svc := cur.(*apiv1.Service) + ings := lbc.joiner.IngressesForService(svc) + lbc.ingQueue.Enqueue(convert(ings)...) } }, // Ingress deletes matter, service deletes don't. @@ -162,13 +156,24 @@ func NewLoadBalancerController( // BackendConfig event handlers. if ctx.BackendConfigEnabled { ctx.BackendConfigInformer.AddEventHandler(cache.ResourceEventHandlerFuncs{ - AddFunc: lbc.enqueueIngressForObject, + AddFunc: func(obj interface{}) { + beConfig := obj.(*backendconfigv1beta1.BackendConfig) + ings := lbc.joiner.IngressesForBackendConfig(beConfig) + lbc.ingQueue.Enqueue(convert(ings)...) + + }, UpdateFunc: func(old, cur interface{}) { if !reflect.DeepEqual(old, cur) { - lbc.enqueueIngressForObject(cur) + beConfig := cur.(*backendconfigv1beta1.BackendConfig) + ings := lbc.joiner.IngressesForBackendConfig(beConfig) + lbc.ingQueue.Enqueue(convert(ings)...) } }, - DeleteFunc: lbc.enqueueIngressForObject, + DeleteFunc: func(obj interface{}) { + beConfig := obj.(*backendconfigv1beta1.BackendConfig) + ings := lbc.joiner.IngressesForBackendConfig(beConfig) + lbc.ingQueue.Enqueue(convert(ings)...) + }, }) } @@ -176,55 +181,13 @@ func NewLoadBalancerController( lbc.tlsLoader = &tls.TLSCertsFromSecretsLoader{Client: lbc.client} // Register health check on controller context. - ctx.AddHealthCheck("ingress", lbc.CloudClusterManager.IsHealthy) + ctx.AddHealthCheck("ingress", lbc.IsHealthy) glog.V(3).Infof("Created new loadbalancer controller") return &lbc } -// enqueueIngressForObject enqueues Ingresses that are associated with the -// passed in object. It is a wrapper around functions which do the actual -// work of enqueueing Ingresses based on a typed object. 
-func (lbc *LoadBalancerController) enqueueIngressForObject(obj interface{}) { - switch obj.(type) { - case *apiv1.Service: - svc := obj.(*apiv1.Service) - lbc.enqueueIngressForService(svc) - case *backendconfigv1beta1.BackendConfig: - beConfig := obj.(*backendconfigv1beta1.BackendConfig) - lbc.enqueueIngressForBackendConfig(beConfig) - default: - // Do nothing. - } -} - -// enqueueIngressForService enqueues all the Ingresses for a Service. -func (lbc *LoadBalancerController) enqueueIngressForService(svc *apiv1.Service) { - ings, err := lbc.ingLister.GetServiceIngress(svc, lbc.ctx.DefaultBackendSvcPortID) - if err != nil { - glog.V(5).Infof("ignoring service %v: %v", svc.Name, err) - return - } - for _, ing := range ings { - if !isGCEIngress(&ing) { - continue - } - lbc.ingQueue.Enqueue(&ing) - } -} - -// enqueueIngressForBackendConfig enqueues all Ingresses for a BackendConfig. -func (lbc *LoadBalancerController) enqueueIngressForBackendConfig(beConfig *backendconfigv1beta1.BackendConfig) { - // Get all the Services associated with this BackendConfig. - svcLister := lbc.ctx.ServiceInformer.GetIndexer() - linkedSvcs := backendconfig.GetServicesForBackendConfig(svcLister, beConfig) - // Enqueue all the Ingresses associated with each Service. - for _, svc := range linkedSvcs { - lbc.enqueueIngressForService(svc) - } -} - // Run starts the loadbalancer controller. func (lbc *LoadBalancerController) Run() { glog.Infof("Starting loadbalancer controller") @@ -259,10 +222,26 @@ func (lbc *LoadBalancerController) Stop(deleteAll bool) error { return nil } +// IsHealthy returns an error if the cluster manager is unhealthy. +func (lbc *LoadBalancerController) IsHealthy() (err error) { + // TODO: Expand on this, for now we just want to detect when the GCE client + // is broken. + _, err = lbc.CloudClusterManager.backendPool.List() + + // If this container is scheduled on a node without compute/rw it is + // effectively useless, but it is healthy. Reporting it as unhealthy + // will lead to container crashlooping. + if utils.IsHTTPErrorCode(err, http.StatusForbidden) { + glog.Infof("Reporting cluster as healthy, but unable to list backends: %v", err) + return nil + } + return +} + // sync manages Ingress create/updates/deletes func (lbc *LoadBalancerController) sync(key string) (retErr error) { if !lbc.hasSynced() { - time.Sleep(storeSyncPollPeriod) + time.Sleep(context.StoreSyncPollPeriod) return fmt.Errorf("waiting for stores to sync") } glog.V(3).Infof("Syncing %v", key) @@ -273,7 +252,7 @@ func (lbc *LoadBalancerController) sync(key string) (retErr error) { } // gceSvcPorts contains the ServicePorts used by only single-cluster ingress. 
gceSvcPorts := lbc.ToSvcPorts(&gceIngresses) - nodeNames, err := getReadyNodeNames(listers.NewNodeLister(lbc.nodeLister)) + nodeNames, err := utils.GetReadyNodeNames(listers.NewNodeLister(lbc.nodeLister)) if err != nil { return err } @@ -296,7 +275,7 @@ func (lbc *LoadBalancerController) sync(key string) (retErr error) { } ing = ing.DeepCopy() - ensureErr := lbc.ensureIngress(ing, nodeNames, gceSvcPorts) + ensureErr := lbc.ensureIngress(ing, nodeNames) if ensureErr != nil { lbc.ctx.Recorder(ing.Namespace).Eventf(ing, apiv1.EventTypeWarning, "Sync", fmt.Sprintf("Error during sync: %v", ensureErr.Error())) } @@ -311,7 +290,7 @@ func (lbc *LoadBalancerController) sync(key string) (retErr error) { return ensureErr } -func (lbc *LoadBalancerController) ensureIngress(ing *extensions.Ingress, nodeNames []string, gceSvcPorts []utils.ServicePort) error { +func (lbc *LoadBalancerController) ensureIngress(ing *extensions.Ingress, nodeNames []string) error { urlMap, errs := lbc.Translator.TranslateIngress(ing, lbc.ctx.DefaultBackendSvcPortID) if errs != nil { return fmt.Errorf("error while evaluating the ingress spec: %v", joinErrs(errs)) @@ -323,7 +302,7 @@ func (lbc *LoadBalancerController) ensureIngress(ing *extensions.Ingress, nodeNa return err } - if isGCEMultiClusterIngress(ing) { + if utils.IsGCEMultiClusterIngress(ing) { // Add instance group names as annotation on the ingress and return. if ing.Annotations == nil { ing.Annotations = map[string]string{} @@ -353,17 +332,6 @@ func (lbc *LoadBalancerController) ensureIngress(ing *extensions.Ingress, nodeNa return err } - negEndpointPorts := lbc.Translator.GatherEndpointPorts(gceSvcPorts) - // Ensure firewall rule for the cluster and pass any NEG endpoint ports. - if err = lbc.CloudClusterManager.EnsureFirewall(nodeNames, negEndpointPorts); err != nil { - if fwErr, ok := err.(*firewalls.FirewallXPNError); ok { - // XPN: Raise an event and ignore the error. - lbc.ctx.Recorder(ing.Namespace).Eventf(ing, apiv1.EventTypeNormal, "XPN", fwErr.Message) - } else { - return err - } - } - // If NEG enabled, link the backend services to the NEGs. if lbc.ctx.NEGEnabled { for _, svcPort := range ingSvcPorts { @@ -434,7 +402,7 @@ func (lbc *LoadBalancerController) updateIngressStatus(l7 *loadbalancers.L7, ing // toRuntimeInfo returns L7RuntimeInfo for the given ingress. func (lbc *LoadBalancerController) toRuntimeInfo(ing *extensions.Ingress, urlMap *utils.GCEURLMap) (*loadbalancers.L7RuntimeInfo, error) { - k, err := keyFunc(ing) + k, err := utils.KeyFunc(ing) if err != nil { return nil, fmt.Errorf("cannot get key for Ingress %v/%v: %v", ing.Namespace, ing.Name, err) } diff --git a/pkg/controller/controller_test.go b/pkg/controller/controller_test.go index 6834819e73..fad23349cd 100644 --- a/pkg/controller/controller_test.go +++ b/pkg/controller/controller_test.go @@ -50,7 +50,7 @@ func newLoadBalancerController(t *testing.T, cm *fakeClusterManager) *LoadBalanc BackendConfigEnabled: false, Namespace: api_v1.NamespaceAll, ResyncPeriod: 1 * time.Minute, - DefaultBackendSvcPortID: testDefaultBeSvcPort.ID, + DefaultBackendSvcPortID: test.DefaultBeSvcPort.ID, } ctx := context.NewControllerContext(kubeClient, backendConfigClient, cm.fakeBackends, ctxConfig) lbc := NewLoadBalancerController(ctx, cm.ClusterManager, stopCh) @@ -58,7 +58,7 @@ func newLoadBalancerController(t *testing.T, cm *fakeClusterManager) *LoadBalanc lbc.hasSynced = func() bool { return true } // Create the default-backend service. 
- defaultSvc := test.NewService(testDefaultBeSvcPort.ID.Service, api_v1.ServiceSpec{ + defaultSvc := test.NewService(test.DefaultBeSvcPort.ID.Service, api_v1.ServiceSpec{ Type: api_v1.ServiceTypeNodePort, Ports: []api_v1.ServicePort{ { @@ -98,7 +98,7 @@ func deleteIngress(lbc *LoadBalancerController, ing *extensions.Ingress) { // getKey returns the key for an ingress. func getKey(ing *extensions.Ingress, t *testing.T) string { - key, err := keyFunc(ing) + key, err := utils.KeyFunc(ing) if err != nil { t.Fatalf("Unexpected error getting key for Ingress %v: %v", ing.Name, err) } @@ -192,7 +192,7 @@ func TestEnsureMCIngress(t *testing.T) { addIngress(lbc, ing) ingStoreKey := getKey(ing, t) - if err := lbc.ensureIngress(ing, []string{"node-a", "node-b"}, []utils.ServicePort{}); err != nil { + if err := lbc.ensureIngress(ing, []string{"node-a", "node-b"}); err != nil { t.Fatalf("lbc.sync(%v) = err %v", ingStoreKey, err) } diff --git a/pkg/controller/fakes.go b/pkg/controller/fakes.go index 2f5dc71e31..afdf8a1816 100644 --- a/pkg/controller/fakes.go +++ b/pkg/controller/fakes.go @@ -17,30 +17,20 @@ limitations under the License. package controller import ( - "k8s.io/apimachinery/pkg/types" - "k8s.io/apimachinery/pkg/util/intstr" "k8s.io/apimachinery/pkg/util/sets" "k8s.io/kubernetes/pkg/cloudprovider/providers/gce" - "k8s.io/ingress-gce/pkg/annotations" "k8s.io/ingress-gce/pkg/backends" - "k8s.io/ingress-gce/pkg/firewalls" "k8s.io/ingress-gce/pkg/healthchecks" "k8s.io/ingress-gce/pkg/instances" "k8s.io/ingress-gce/pkg/loadbalancers" "k8s.io/ingress-gce/pkg/neg" + "k8s.io/ingress-gce/pkg/test" "k8s.io/ingress-gce/pkg/utils" ) var ( - testBackendPort = intstr.IntOrString{Type: intstr.Int, IntVal: 80} - testDefaultBeSvcPort = utils.ServicePort{ - ID: utils.ServicePortID{Service: types.NamespacedName{Namespace: "system", Name: "default"}, Port: testBackendPort}, - NodePort: 30000, - Protocol: annotations.ProtocolHTTP, - } - testSrcRanges = []string{"1.1.1.1/20"} - testNodePortRanges = []string{"30000-32767"} + testSrcRanges = []string{"1.1.1.1/20"} ) // ClusterManager fake @@ -64,20 +54,18 @@ func NewFakeClusterManager(clusterName, firewallName string) *fakeClusterManager nodePool := instances.NewNodePool(fakeIGs, namer) nodePool.Init(&instances.FakeZoneLister{Zones: []string{"zone-a"}}) - healthChecker := healthchecks.NewHealthChecker(fakeHCP, "/", "/healthz", namer, testDefaultBeSvcPort.ID.Service) + healthChecker := healthchecks.NewHealthChecker(fakeHCP, "/", "/healthz", namer, test.DefaultBeSvcPort.ID.Service) backendPool := backends.NewBackendPool( fakeBackends, fakeNEG, healthChecker, nodePool, namer, false, false) l7Pool := loadbalancers.NewLoadBalancerPool(fakeLbs, namer) - frPool := firewalls.NewFirewallPool(firewalls.NewFakeFirewallsProvider(false, false), namer, testSrcRanges, testNodePortRanges) cm := &ClusterManager{ ClusterNamer: namer, instancePool: nodePool, backendPool: backendPool, l7Pool: l7Pool, - firewallPool: frPool, } return &fakeClusterManager{cm, fakeLbs, fakeBackends, fakeIGs, namer} } diff --git a/pkg/controller/node.go b/pkg/controller/node.go index 9326bb7b30..d5014b32ca 100644 --- a/pkg/controller/node.go +++ b/pkg/controller/node.go @@ -50,8 +50,12 @@ func NewNodeController(ctx *context.ControllerContext, cm *ClusterManager) *Node c.queue = utils.NewPeriodicTaskQueue("nodes", c.sync) ctx.NodeInformer.AddEventHandler(cache.ResourceEventHandlerFuncs{ - AddFunc: c.queue.Enqueue, - DeleteFunc: c.queue.Enqueue, + AddFunc: func(obj interface{}) { + c.queue.Enqueue(obj) + }, 
+ DeleteFunc: func(obj interface{}) { + c.queue.Enqueue(obj) + }, UpdateFunc: func(oldObj, newObj interface{}) { if nodeStatusChanged(oldObj.(*apiv1.Node), newObj.(*apiv1.Node)) { c.queue.Enqueue(newObj) @@ -72,7 +76,7 @@ func (c *NodeController) Shutdown() { } func (c *NodeController) sync(key string) error { - nodeNames, err := getReadyNodeNames(listers.NewNodeLister(c.lister)) + nodeNames, err := utils.GetReadyNodeNames(listers.NewNodeLister(c.lister)) if err != nil { return err } diff --git a/pkg/controller/utils.go b/pkg/controller/utils.go index 373188bcb6..35ec41b2d2 100644 --- a/pkg/controller/utils.go +++ b/pkg/controller/utils.go @@ -23,14 +23,11 @@ import ( "strings" compute "google.golang.org/api/compute/v1" + extensions "k8s.io/api/extensions/v1beta1" api_v1 "k8s.io/api/core/v1" - extensions "k8s.io/api/extensions/v1beta1" - listers "k8s.io/client-go/listers/core/v1" - "k8s.io/client-go/tools/cache" "k8s.io/ingress-gce/pkg/annotations" - "k8s.io/ingress-gce/pkg/flags" "k8s.io/ingress-gce/pkg/utils" ) @@ -42,94 +39,6 @@ func joinErrs(errs []error) error { return errors.New(strings.Join(errStrs, "; ")) } -// isGCEIngress returns true if the Ingress matches the class managed by this -// controller. -func isGCEIngress(ing *extensions.Ingress) bool { - class := annotations.FromIngress(ing).IngressClass() - if flags.F.IngressClass == "" { - return class == "" || class == annotations.GceIngressClass - } - return class == flags.F.IngressClass -} - -// isGCEMultiClusterIngress returns true if the given Ingress has -// ingress.class annotation set to "gce-multi-cluster". -func isGCEMultiClusterIngress(ing *extensions.Ingress) bool { - class := annotations.FromIngress(ing).IngressClass() - return class == annotations.GceMultiIngressClass -} - -// StoreToIngressLister makes a Store that lists Ingress. -// TODO: Move this to cache/listers post 1.1. -type StoreToIngressLister struct { - cache.Store -} - -// List lists all Ingress' in the store (both single and multi cluster ingresses). -func (s *StoreToIngressLister) ListAll() (ing extensions.IngressList, err error) { - for _, m := range s.Store.List() { - newIng := m.(*extensions.Ingress) - if isGCEIngress(newIng) || isGCEMultiClusterIngress(newIng) { - ing.Items = append(ing.Items, *newIng) - } - } - return ing, nil -} - -// ListGCEIngresses lists all GCE Ingress' in the store. -func (s *StoreToIngressLister) ListGCEIngresses() (ing extensions.IngressList, err error) { - for _, m := range s.Store.List() { - newIng := m.(*extensions.Ingress) - if isGCEIngress(newIng) { - ing.Items = append(ing.Items, *newIng) - } - } - return ing, nil -} - -// GetServiceIngress gets all the Ingress' that have rules pointing to a service. -// Note that this ignores services without the right nodePorts. 
-func (s *StoreToIngressLister) GetServiceIngress(svc *api_v1.Service, systemDefaultBackend utils.ServicePortID) (ings []extensions.Ingress, err error) { -IngressLoop: - for _, m := range s.Store.List() { - ing := *m.(*extensions.Ingress) - - // Check if system default backend is involved - if ing.Spec.Backend == nil && systemDefaultBackend.Service.Name == svc.Name && systemDefaultBackend.Service.Namespace == svc.Namespace { - ings = append(ings, ing) - continue - } - - if ing.Namespace != svc.Namespace { - continue - } - - // Check service of default backend - if ing.Spec.Backend != nil && ing.Spec.Backend.ServiceName == svc.Name { - ings = append(ings, ing) - continue - } - - // Check the target service for each path rule - for _, rule := range ing.Spec.Rules { - if rule.IngressRuleValue.HTTP == nil { - continue - } - for _, p := range rule.IngressRuleValue.HTTP.Paths { - if p.Backend.ServiceName == svc.Name { - ings = append(ings, ing) - // Skip the rest of the rules to avoid duplicate ingresses in list - continue IngressLoop - } - } - } - } - if len(ings) == 0 { - err = fmt.Errorf("no ingress for service %v", svc.Name) - } - return -} - // setInstanceGroupsAnnotation sets the instance-groups annotation with names of the given instance groups. func setInstanceGroupsAnnotation(existing map[string]string, igs []*compute.InstanceGroup) error { type Value struct { @@ -161,22 +70,6 @@ func uniq(svcPorts []utils.ServicePort) []utils.ServicePort { return svcPorts } -// getReadyNodeNames returns names of schedulable, ready nodes from the node lister. -func getReadyNodeNames(lister listers.NodeLister) ([]string, error) { - var nodeNames []string - nodes, err := lister.ListWithPredicate(utils.NodeIsReady) - if err != nil { - return nodeNames, err - } - for _, n := range nodes { - if n.Spec.Unschedulable { - continue - } - nodeNames = append(nodeNames, n.Name) - } - return nodeNames, nil -} - func nodeStatusChanged(old, cur *api_v1.Node) bool { if old.Spec.Unschedulable != cur.Spec.Unschedulable { return true @@ -186,3 +79,10 @@ func nodeStatusChanged(old, cur *api_v1.Node) bool { } return false } + +func convert(ings []*extensions.Ingress) (retVal []interface{}) { + for _, ing := range ings { + retVal = append(retVal, ing) + } + return +} diff --git a/pkg/controller/utils_test.go b/pkg/controller/utils_test.go index e92f7993ec..179cd325e3 100644 --- a/pkg/controller/utils_test.go +++ b/pkg/controller/utils_test.go @@ -29,6 +29,7 @@ import ( "k8s.io/apimachinery/pkg/util/intstr" "k8s.io/ingress-gce/pkg/annotations" + "k8s.io/ingress-gce/pkg/firewalls" "k8s.io/ingress-gce/pkg/flags" "k8s.io/ingress-gce/pkg/utils" ) @@ -38,7 +39,7 @@ import ( var firstPodCreationTime = time.Date(2006, 01, 02, 15, 04, 05, 0, time.UTC) func TestZoneListing(t *testing.T) { - cm := NewFakeClusterManager(flags.DefaultClusterUID, DefaultFirewallName) + cm := NewFakeClusterManager(flags.DefaultClusterUID, firewalls.DefaultFirewallName) lbc := newLoadBalancerController(t, cm) zoneToNode := map[string][]string{ "zone-1": {"n1"}, @@ -63,7 +64,7 @@ func TestZoneListing(t *testing.T) { } func TestInstancesAddedToZones(t *testing.T) { - cm := NewFakeClusterManager(flags.DefaultClusterUID, DefaultFirewallName) + cm := NewFakeClusterManager(flags.DefaultClusterUID, firewalls.DefaultFirewallName) lbc := newLoadBalancerController(t, cm) zoneToNode := map[string][]string{ "zone-1": {"n1", "n2"}, diff --git a/pkg/firewalls/controller.go b/pkg/firewalls/controller.go new file mode 100644 index 0000000000..275876aff9 --- /dev/null +++ 
b/pkg/firewalls/controller.go @@ -0,0 +1,183 @@ +/* +Copyright 2018 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package firewalls + +import ( + "fmt" + "reflect" + "time" + + "github.com/golang/glog" + apiv1 "k8s.io/api/core/v1" + extensions "k8s.io/api/extensions/v1beta1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + listers "k8s.io/client-go/listers/core/v1" + "k8s.io/client-go/tools/cache" + "k8s.io/ingress-gce/pkg/context" + "k8s.io/ingress-gce/pkg/controller/translator" + "k8s.io/ingress-gce/pkg/utils" + "k8s.io/kubernetes/pkg/cloudprovider/providers/gce" +) + +var ( + // queueKey is a "fake" key which can be enqueued to a task queue. + queueKey = &extensions.Ingress{ + ObjectMeta: metav1.ObjectMeta{Name: "queueKey"}, + } +) + +// FirewallController synchronizes the firewall rule for all ingresses. +type FirewallController struct { + ctx *context.ControllerContext + firewallPool SingleFirewallPool + queue utils.TaskQueue + joiner *utils.Joiner + ingLister utils.StoreToIngressLister + translator *translator.Translator + nodeLister cache.Indexer + hasSynced func() bool +} + +// NewFirewallController returns a new firewall controller. +func NewFirewallController( + ctx *context.ControllerContext, + namer *utils.Namer, + portRanges []string) *FirewallController { + + firewallPool := NewFirewallPool(ctx.Cloud, namer, gce.LoadBalancerSrcRanges(), portRanges) + + fwc := &FirewallController{ + ctx: ctx, + firewallPool: firewallPool, + ingLister: utils.StoreToIngressLister{Store: ctx.IngressInformer.GetStore()}, + translator: translator.NewTranslator(namer, ctx), + nodeLister: ctx.NodeInformer.GetIndexer(), + hasSynced: ctx.HasSynced, + } + + fwc.queue = utils.NewPeriodicTaskQueue("firewall", fwc.sync) + fwc.joiner = utils.NewJoiner(fwc.ingLister, ctx.ServiceInformer.GetIndexer(), ctx.DefaultBackendSvcPortID) + + // Ingress event handlers. + ctx.IngressInformer.AddEventHandler(cache.ResourceEventHandlerFuncs{ + AddFunc: func(obj interface{}) { + addIng := obj.(*extensions.Ingress) + if !utils.IsGCEIngress(addIng) && !utils.IsGCEMultiClusterIngress(addIng) { + return + } + fwc.queue.Enqueue(queueKey) + }, + DeleteFunc: func(obj interface{}) { + delIng := obj.(*extensions.Ingress) + if !utils.IsGCEIngress(delIng) && !utils.IsGCEMultiClusterIngress(delIng) { + return + } + fwc.queue.Enqueue(queueKey) + }, + UpdateFunc: func(old, cur interface{}) { + curIng := cur.(*extensions.Ingress) + if !utils.IsGCEIngress(curIng) && !utils.IsGCEMultiClusterIngress(curIng) { + return + } + fwc.queue.Enqueue(queueKey) + }, + }) + + // Service event handlers. 
+ ctx.ServiceInformer.AddEventHandler(cache.ResourceEventHandlerFuncs{ + AddFunc: func(obj interface{}) { + svc := obj.(*apiv1.Service) + ings := fwc.joiner.IngressesForService(svc) + if len(ings) > 0 { + fwc.queue.Enqueue(queueKey) + } + }, + UpdateFunc: func(old, cur interface{}) { + if !reflect.DeepEqual(old, cur) { + svc := cur.(*apiv1.Service) + ings := fwc.joiner.IngressesForService(svc) + if len(ings) > 0 { + fwc.queue.Enqueue(queueKey) + } + } + }, + }) + + return fwc +} + +// ToSvcPorts is a helper method over translator.TranslateIngress to process a list of ingresses. +// TODO(rramkumar): This is a copy of code in controller.go. Extract this into +// something shared. +func (fwc *FirewallController) ToSvcPorts(ings *extensions.IngressList) []utils.ServicePort { + var knownPorts []utils.ServicePort + for _, ing := range ings.Items { + urlMap, _ := fwc.translator.TranslateIngress(&ing, fwc.ctx.DefaultBackendSvcPortID) + knownPorts = append(knownPorts, urlMap.AllServicePorts()...) + } + return knownPorts +} + +func (fwc *FirewallController) Run(stopCh chan struct{}) { + defer fwc.shutdown() + fwc.queue.Run(time.Second, stopCh) +} + +// This should only be called when the process is being terminated. +func (fwc *FirewallController) shutdown() { + glog.Infof("Shutting down Firewall Controller") + fwc.queue.Shutdown() +} + +func (fwc *FirewallController) sync(key string) error { + if !fwc.hasSynced() { + time.Sleep(context.StoreSyncPollPeriod) + return fmt.Errorf("waiting for stores to sync") + } + glog.V(3).Infof("Syncing firewall") + + gceIngresses, err := fwc.ingLister.ListGCEIngresses() + if err != nil { + return err + } + // If there are no more ingresses, then delete the firewall rule. + if len(gceIngresses.Items) == 0 { + fwc.firewallPool.GC() + return nil + } + + // gceSvcPorts contains the ServicePorts used by only single-cluster ingress. + gceSvcPorts := fwc.ToSvcPorts(&gceIngresses) + nodeNames, err := utils.GetReadyNodeNames(listers.NewNodeLister(fwc.nodeLister)) + if err != nil { + return err + } + negPorts := fwc.translator.GatherEndpointPorts(gceSvcPorts) + + // Ensure firewall rule for the cluster and pass any NEG endpoint ports. + if err := fwc.firewallPool.Sync(nodeNames, negPorts...); err != nil { + if fwErr, ok := err.(*FirewallXPNError); ok { + // XPN: Raise an event on each ingress + for _, ing := range gceIngresses.Items { + fwc.ctx.Recorder(ing.Namespace).Eventf(&ing, apiv1.EventTypeNormal, "XPN", fwErr.Message) + } + } else { + return err + } + } + return nil +} diff --git a/pkg/firewalls/controller_test.go b/pkg/firewalls/controller_test.go new file mode 100644 index 0000000000..ccc81a34d1 --- /dev/null +++ b/pkg/firewalls/controller_test.go @@ -0,0 +1,105 @@ +/* +Copyright 2018 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. 
+*/ + +package firewalls + +import ( + "testing" + "time" + + api_v1 "k8s.io/api/core/v1" + extensions "k8s.io/api/extensions/v1beta1" + "k8s.io/apimachinery/pkg/types" + "k8s.io/client-go/kubernetes/fake" + backendconfigclient "k8s.io/ingress-gce/pkg/backendconfig/client/clientset/versioned/fake" + test "k8s.io/ingress-gce/pkg/test" + "k8s.io/ingress-gce/pkg/utils" + "k8s.io/kubernetes/pkg/cloudprovider/providers/gce" + + meta_v1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/ingress-gce/pkg/context" +) + +// newFirewallController creates a firewall controller. +func newFirewallController() *FirewallController { + kubeClient := fake.NewSimpleClientset() + backendConfigClient := backendconfigclient.NewSimpleClientset() + fakeGCE := gce.FakeGCECloud(gce.DefaultTestClusterValues()) + + ctxConfig := context.ControllerContextConfig{ + NEGEnabled: true, + BackendConfigEnabled: false, + Namespace: api_v1.NamespaceAll, + ResyncPeriod: 1 * time.Minute, + DefaultBackendSvcPortID: test.DefaultBeSvcPort.ID, + } + + ctx := context.NewControllerContext(kubeClient, backendConfigClient, fakeGCE, ctxConfig) + fwc := NewFirewallController(ctx, namer, []string{"30000-32767"}) + fwc.hasSynced = func() bool { return true } + + return fwc +} + +// TestFirewallCreateDelete asserts that `sync` will ensure the L7 firewall with +// the correct ports. It also asserts that when no ingresses exist, that the +// firewall rule is deleted. +func TestFirewallCreateDelete(t *testing.T) { + fwc := newFirewallController() + + // Create the default-backend service. + defaultSvc := test.NewService(test.DefaultBeSvcPort.ID.Service, api_v1.ServiceSpec{ + Type: api_v1.ServiceTypeNodePort, + Ports: []api_v1.ServicePort{ + { + Name: "http", + Port: 80, + NodePort: 30000, + }, + }, + }) + + fwc.ctx.KubeClient.CoreV1().Services(defaultSvc.Namespace).Create(defaultSvc) + fwc.ctx.ServiceInformer.GetIndexer().Add(defaultSvc) + + ing := test.NewIngress(types.NamespacedName{Name: "my-ingress", Namespace: "default"}, extensions.IngressSpec{}) + fwc.ctx.KubeClient.Extensions().Ingresses(ing.Namespace).Create(ing) + fwc.ctx.IngressInformer.GetIndexer().Add(ing) + + key, _ := utils.KeyFunc(queueKey) + if err := fwc.sync(key); err != nil { + t.Fatalf("fwc.sync() = %v, want nil", err) + } + + // Verify a firewall rule was created. + _, err := fwc.ctx.Cloud.GetFirewall(ruleName) + if err != nil { + t.Fatalf("cloud.GetFirewall(%v) = _, %v, want _, nil", ruleName, err) + } + + fwc.ctx.KubeClient.Extensions().Ingresses(ing.Namespace).Delete(ing.Name, &meta_v1.DeleteOptions{}) + fwc.ctx.IngressInformer.GetIndexer().Delete(ing) + + if err := fwc.sync(key); err != nil { + t.Fatalf("fwc.sync() = %v, want nil", err) + } + + // Verify the firewall rule was deleted. + _, err = fwc.ctx.Cloud.GetFirewall(ruleName) + if !utils.IsNotFoundError(err) { + t.Fatalf("cloud.GetFirewall(%v) = _, %v, want _, 404 error", ruleName, err) + } +} diff --git a/pkg/firewalls/firewalls.go b/pkg/firewalls/firewalls.go index a0abdc59c9..4e5b6669c2 100644 --- a/pkg/firewalls/firewalls.go +++ b/pkg/firewalls/firewalls.go @@ -31,11 +31,19 @@ import ( "k8s.io/ingress-gce/pkg/utils" ) +const ( + // DefaultFirewallName is the name to use for firewall rules created + // by an L7 controller when --firewall-rule is not used. + DefaultFirewallName = "" +) + // FirewallRules manages firewall rules. 
type FirewallRules struct { - cloud Firewall - namer *utils.Namer - srcRanges []string + cloud Firewall + namer *utils.Namer + srcRanges []string + // TODO(rramkumar): Eliminate this variable. We should just pass in + // all the port ranges to open with each call to Sync() portRanges []string } @@ -100,8 +108,8 @@ func (fr *FirewallRules) Sync(nodeNames []string, additionalPorts ...string) err return fr.updateFirewall(expectedFirewall) } -// Shutdown shuts down this firewall rules manager. -func (fr *FirewallRules) Shutdown() error { +// GC deletes the firewall rule. +func (fr *FirewallRules) GC() error { name := fr.namer.FirewallRule() glog.V(3).Infof("Deleting firewall %q", name) return fr.deleteFirewall(name) diff --git a/pkg/firewalls/firewalls_test.go b/pkg/firewalls/firewalls_test.go index 5a794ea152..0374ade000 100644 --- a/pkg/firewalls/firewalls_test.go +++ b/pkg/firewalls/firewalls_test.go @@ -123,7 +123,7 @@ func TestFirewallPoolSyncPorts(t *testing.T) { verifyFirewallRule(fwp, ruleName, nodes, srcRanges, append(portRanges(), negTargetports...), t) } -func TestFirewallPoolShutdown(t *testing.T) { +func TestFirewallPoolGC(t *testing.T) { fwp := NewFakeFirewallsProvider(false, false) fp := NewFirewallPool(fwp, namer, srcRanges, portRanges()) nodes := []string{"node-a", "node-b", "node-c"} @@ -133,7 +133,7 @@ func TestFirewallPoolShutdown(t *testing.T) { } verifyFirewallRule(fwp, ruleName, nodes, srcRanges, portRanges(), t) - if err := fp.Shutdown(); err != nil { + if err := fp.GC(); err != nil { t.Fatal(err) } @@ -197,7 +197,7 @@ func TestSyncXPNReadOnly(t *testing.T) { t.Errorf("Expected firewall sync error with a user message. Received err: %v", err) } - err = fp.Shutdown() + err = fp.GC() if fwErr, ok := err.(*FirewallXPNError); !ok || !strings.Contains(fwErr.Message, "delete") { t.Errorf("Expected firewall sync error with a user message. Received err: %v", err) } diff --git a/pkg/firewalls/interfaces.go b/pkg/firewalls/interfaces.go index 99398829a0..7ff53616e6 100644 --- a/pkg/firewalls/interfaces.go +++ b/pkg/firewalls/interfaces.go @@ -23,7 +23,7 @@ import ( // SingleFirewallPool syncs the firewall rule for L7 traffic. type SingleFirewallPool interface { Sync(nodeNames []string, additionalPorts ...string) error - Shutdown() error + GC() error } // Firewall interfaces with the GCE firewall api. diff --git a/pkg/test/utils.go b/pkg/test/utils.go index 7460ff256b..3b49b46b03 100644 --- a/pkg/test/utils.go +++ b/pkg/test/utils.go @@ -7,7 +7,18 @@ import ( "k8s.io/apimachinery/pkg/types" "k8s.io/apimachinery/pkg/util/intstr" "k8s.io/client-go/kubernetes/scheme" + "k8s.io/ingress-gce/pkg/annotations" backendconfig "k8s.io/ingress-gce/pkg/apis/backendconfig/v1beta1" + "k8s.io/ingress-gce/pkg/utils" +) + +var ( + BackendPort = intstr.IntOrString{Type: intstr.Int, IntVal: 80} + DefaultBeSvcPort = utils.ServicePort{ + ID: utils.ServicePortID{Service: types.NamespacedName{Namespace: "system", Name: "default"}, Port: BackendPort}, + NodePort: 30000, + Protocol: annotations.ProtocolHTTP, + } ) // NewIngress returns an Ingress with the given spec. 
diff --git a/pkg/utils/join.go b/pkg/utils/join.go new file mode 100644 index 0000000000..5496e4c032 --- /dev/null +++ b/pkg/utils/join.go @@ -0,0 +1,51 @@ +package utils + +import ( + "github.com/golang/glog" + api_v1 "k8s.io/api/core/v1" + extensions "k8s.io/api/extensions/v1beta1" + "k8s.io/client-go/tools/cache" + backendconfigv1beta1 "k8s.io/ingress-gce/pkg/apis/backendconfig/v1beta1" + "k8s.io/ingress-gce/pkg/backendconfig" +) + +// Joiner returns all Ingresses that are linked to another k8s resources +// by performing operations similar to a database "Join". +type Joiner struct { + ingLister StoreToIngressLister + svcLister cache.Indexer + defaultBackendSvcPortID ServicePortID +} + +func NewJoiner(ingLister StoreToIngressLister, svcLister cache.Indexer, defaultBackendSvcPortID ServicePortID) *Joiner { + return &Joiner{ingLister, svcLister, defaultBackendSvcPortID} +} + +// IngressesForService gets all the Ingresses that reference a Service. +func (j *Joiner) IngressesForService(svc *api_v1.Service) (ingList []*extensions.Ingress) { + ings, err := j.ingLister.GetServiceIngress(svc, j.defaultBackendSvcPortID) + if err != nil { + glog.V(4).Infof("ignoring service %v: %v", svc.Name, err) + return + } + for _, ing := range ings { + if !IsGCEIngress(&ing) { + continue + } + ingList = append(ingList, &ing) + } + return +} + +// IngressesForBackendConfig gets all Ingresses that reference (indirectly) a BackendConfig. +// TODO(rramkumar): This can be optimized to remove nested loops. +func (j *Joiner) IngressesForBackendConfig(beConfig *backendconfigv1beta1.BackendConfig) (ingList []*extensions.Ingress) { + // Get all the Services associated with this BackendConfig. + linkedSvcs := backendconfig.GetServicesForBackendConfig(j.svcLister, beConfig) + // Enqueue all the Ingresses associated with each Service. + for _, svc := range linkedSvcs { + ingsForSvc := j.IngressesForService(svc) + ingList = append(ingList, ingsForSvc...) + } + return +} diff --git a/pkg/utils/taskqueue.go b/pkg/utils/taskqueue.go index e83b23476a..f241dece9f 100644 --- a/pkg/utils/taskqueue.go +++ b/pkg/utils/taskqueue.go @@ -25,10 +25,14 @@ import ( "k8s.io/client-go/util/workqueue" ) +var ( + KeyFunc = cache.DeletionHandlingMetaNamespaceKeyFunc +) + // TaskQueue is a rate limited operation queue. type TaskQueue interface { Run(period time.Duration, stopCh <-chan struct{}) - Enqueue(obj interface{}) + Enqueue(objs ...interface{}) Shutdown() } @@ -54,15 +58,17 @@ func (t *PeriodicTaskQueue) Run(period time.Duration, stopCh <-chan struct{}) { wait.Until(t.worker, period, stopCh) } -// Enqueue a key to the work queue. -func (t *PeriodicTaskQueue) Enqueue(obj interface{}) { - key, err := t.keyFunc(obj) - if err != nil { - glog.Errorf("Couldn't get key for object %+v (type %T): %v", obj, obj, err) - return +// Enqueue one or more keys to the work queue. 
+func (t *PeriodicTaskQueue) Enqueue(objs ...interface{}) { + for _, obj := range objs { + key, err := t.keyFunc(obj) + if err != nil { + glog.Errorf("Couldn't get key for object %+v (type %T): %v", obj, obj, err) + return + } + glog.V(4).Infof("Enqueue key=%q (%v)", key, t.resource) + t.queue.Add(key) } - glog.V(4).Infof("Enqueue key=%q (%v)", key, t.resource) - t.queue.Add(key) } // Shutdown shuts down the work queue and waits for the worker to ACK @@ -97,7 +103,7 @@ func (t *PeriodicTaskQueue) worker() { func NewPeriodicTaskQueue(resource string, syncFn func(string) error) *PeriodicTaskQueue { return &PeriodicTaskQueue{ resource: resource, - keyFunc: cache.DeletionHandlingMetaNamespaceKeyFunc, + keyFunc: KeyFunc, queue: workqueue.NewRateLimitingQueue(workqueue.DefaultControllerRateLimiter()), sync: syncFn, workerDone: make(chan struct{}), diff --git a/pkg/utils/utils.go b/pkg/utils/utils.go index eb0d14eecb..450358b14f 100644 --- a/pkg/utils/utils.go +++ b/pkg/utils/utils.go @@ -25,7 +25,13 @@ import ( compute "google.golang.org/api/compute/v1" "google.golang.org/api/googleapi" + api_v1 "k8s.io/api/core/v1" + extensions "k8s.io/api/extensions/v1beta1" "k8s.io/apimachinery/pkg/types" + listers "k8s.io/client-go/listers/core/v1" + "k8s.io/client-go/tools/cache" + "k8s.io/ingress-gce/pkg/annotations" + "k8s.io/ingress-gce/pkg/flags" "k8s.io/kubernetes/pkg/cloudprovider/providers/gce/cloud" ) @@ -241,3 +247,107 @@ func IGLinks(igs []*compute.InstanceGroup) (igLinks []string) { } return } + +// IsGCEIngress returns true if the Ingress matches the class managed by this +// controller. +func IsGCEIngress(ing *extensions.Ingress) bool { + class := annotations.FromIngress(ing).IngressClass() + if flags.F.IngressClass == "" { + return class == "" || class == annotations.GceIngressClass + } + return class == flags.F.IngressClass +} + +// IsGCEMultiClusterIngress returns true if the given Ingress has +// ingress.class annotation set to "gce-multi-cluster". +func IsGCEMultiClusterIngress(ing *extensions.Ingress) bool { + class := annotations.FromIngress(ing).IngressClass() + return class == annotations.GceMultiIngressClass +} + +// StoreToIngressLister makes a Store that lists Ingress. +type StoreToIngressLister struct { + cache.Store +} + +// List lists all Ingress' in the store (both single and multi cluster ingresses). +func (s *StoreToIngressLister) ListAll() (ing extensions.IngressList, err error) { + for _, m := range s.Store.List() { + newIng := m.(*extensions.Ingress) + if IsGCEIngress(newIng) || IsGCEMultiClusterIngress(newIng) { + ing.Items = append(ing.Items, *newIng) + } + } + return ing, nil +} + +// ListGCEIngresses lists all GCE Ingress' in the store. +func (s *StoreToIngressLister) ListGCEIngresses() (ing extensions.IngressList, err error) { + for _, m := range s.Store.List() { + newIng := m.(*extensions.Ingress) + if IsGCEIngress(newIng) { + ing.Items = append(ing.Items, *newIng) + } + } + return ing, nil +} + +// GetServiceIngress gets all the Ingress' that have rules pointing to a service. +// Note that this ignores services without the right nodePorts. 
+func (s *StoreToIngressLister) GetServiceIngress(svc *api_v1.Service, systemDefaultBackend ServicePortID) (ings []extensions.Ingress, err error) { +IngressLoop: + for _, m := range s.Store.List() { + ing := *m.(*extensions.Ingress) + + // Check if system default backend is involved + if ing.Spec.Backend == nil && systemDefaultBackend.Service.Name == svc.Name && systemDefaultBackend.Service.Namespace == svc.Namespace { + ings = append(ings, ing) + continue + } + + if ing.Namespace != svc.Namespace { + continue + } + + // Check service of default backend + if ing.Spec.Backend != nil && ing.Spec.Backend.ServiceName == svc.Name { + ings = append(ings, ing) + continue + } + + // Check the target service for each path rule + for _, rule := range ing.Spec.Rules { + if rule.IngressRuleValue.HTTP == nil { + continue + } + for _, p := range rule.IngressRuleValue.HTTP.Paths { + if p.Backend.ServiceName == svc.Name { + ings = append(ings, ing) + // Skip the rest of the rules to avoid duplicate ingresses in list + continue IngressLoop + } + } + } + } + if len(ings) == 0 { + err = fmt.Errorf("no ingress for service %v", svc.Name) + } + return +} + +// GetReadyNodeNames returns names of schedulable, ready nodes from the node lister. +// TODO(rramkumar): Add a test for this. +func GetReadyNodeNames(lister listers.NodeLister) ([]string, error) { + var nodeNames []string + nodes, err := lister.ListWithPredicate(NodeIsReady) + if err != nil { + return nodeNames, err + } + for _, n := range nodes { + if n.Spec.Unschedulable { + continue + } + nodeNames = append(nodeNames, n.Name) + } + return nodeNames, nil +}
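
Appendix (not part of the patch): a condensed sketch of how cmd/glbc/main.go wires the new firewall controller alongside the existing load balancer controller once this change is applied. It is assembled from the hunks above; leader election, SIGTERM handling, and most error paths are elided, and the stopCh plumbing is simplified, so treat it as an illustrative outline rather than the actual file.

// Illustrative wiring only; the names used here (runControllers, app.NewNamer,
// flags.F, firewalls.NewFirewallController) are taken from the diff above.
package main

import (
	"github.com/golang/glog"

	"k8s.io/ingress-gce/cmd/glbc/app"
	"k8s.io/ingress-gce/pkg/context"
	"k8s.io/ingress-gce/pkg/controller"
	"k8s.io/ingress-gce/pkg/firewalls"
	"k8s.io/ingress-gce/pkg/flags"
)

func runControllers(ctx *context.ControllerContext) {
	// The default firewall name constant now lives in the firewalls package
	// instead of the controller package.
	namer, err := app.NewNamer(ctx.KubeClient, flags.F.ClusterName, firewalls.DefaultFirewallName)
	if err != nil {
		glog.Fatalf("app.NewNamer(...) = %v", err)
	}

	stopCh := make(chan struct{}) // simplified; the real file shares this channel with other goroutines

	clusterManager, err := controller.NewClusterManager(ctx, namer, flags.F.HealthCheckPath, flags.F.DefaultSvcHealthCheckPath)
	if err != nil {
		glog.Fatalf("controller.NewClusterManager(...) = %v", err)
	}

	lbc, err := controller.NewLoadBalancerController(ctx, clusterManager, stopCh)
	if err != nil {
		glog.Fatalf("controller.NewLoadBalancerController(...) = %v", err)
	}

	// The firewall controller now owns the single L7 firewall rule: it watches
	// Ingresses and Services, recomputes node ports and NEG endpoint ports,
	// and Syncs the rule (or GCs it once no GCE Ingresses remain).
	fwc := firewalls.NewFirewallController(ctx, namer, flags.F.NodePortRanges.Values())

	go fwc.Run(stopCh)
	glog.V(0).Infof("firewall controller started")

	ctx.Start(stopCh)
	lbc.Run() // blocks until the load balancer controller is stopped
}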