diff --git a/cmd/glbc/main.go b/cmd/glbc/main.go index 4616914c40..81b2c9b048 100644 --- a/cmd/glbc/main.go +++ b/cmd/glbc/main.go @@ -45,6 +45,7 @@ import ( clientcmdapi "k8s.io/client-go/tools/clientcmd/api" "k8s.io/ingress-gce/pkg/backends" + "k8s.io/ingress-gce/pkg/context" "k8s.io/ingress-gce/pkg/controller" "k8s.io/ingress-gce/pkg/loadbalancers" "k8s.io/ingress-gce/pkg/storage" @@ -249,10 +250,11 @@ func main() { SvcPort: intstr.FromInt(int(port)), } + var namer *utils.Namer var cloud *gce.GCECloud if *inCluster || *useRealCloud { // Create cluster manager - namer, err := newNamer(kubeClient, *clusterName, controller.DefaultFirewallName) + namer, err = newNamer(kubeClient, *clusterName, controller.DefaultFirewallName) if err != nil { glog.Fatalf("%v", err) } @@ -287,7 +289,7 @@ func main() { clusterManager = controller.NewFakeClusterManager(*clusterName, controller.DefaultFirewallName).ClusterManager } - ctx := controller.NewControllerContext(kubeClient, *watchNamespace, *resyncPeriod) + ctx := context.NewControllerContext(kubeClient, *watchNamespace, *resyncPeriod, false) // Start loadbalancer controller lbc, err := controller.NewLoadBalancerController(kubeClient, ctx, clusterManager) diff --git a/pkg/context/context.go b/pkg/context/context.go new file mode 100644 index 0000000000..e9fa3b92fb --- /dev/null +++ b/pkg/context/context.go @@ -0,0 +1,60 @@ +/* +Copyright 2017 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. 
+*/ + +package context + +import ( + informerv1 "k8s.io/client-go/informers/core/v1" + informerv1beta1 "k8s.io/client-go/informers/extensions/v1beta1" + "k8s.io/client-go/kubernetes" + "k8s.io/client-go/tools/cache" + "time" +) + +// ControllerContext holds the informers and the stop channel shared among controllers. +type ControllerContext struct { + IngressInformer cache.SharedIndexInformer + ServiceInformer cache.SharedIndexInformer + PodInformer cache.SharedIndexInformer + NodeInformer cache.SharedIndexInformer + EndpointInformer cache.SharedIndexInformer + // Stop is the stop channel shared among controllers + StopCh chan struct{} +} + +func NewControllerContext(kubeClient kubernetes.Interface, namespace string, resyncPeriod time.Duration, enableEndpointsInformer bool) *ControllerContext { + context := &ControllerContext{ + IngressInformer: informerv1beta1.NewIngressInformer(kubeClient, namespace, resyncPeriod, cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc}), + ServiceInformer: informerv1.NewServiceInformer(kubeClient, namespace, resyncPeriod, cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc}), + PodInformer: informerv1.NewPodInformer(kubeClient, namespace, resyncPeriod, cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc}), + NodeInformer: informerv1.NewNodeInformer(kubeClient, resyncPeriod, cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc}), + StopCh: make(chan struct{}), + } + if enableEndpointsInformer { + context.EndpointInformer = informerv1.NewEndpointsInformer(kubeClient, namespace, resyncPeriod, cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc}) + } + return context +} + +func (ctx *ControllerContext) Start() { + go ctx.IngressInformer.Run(ctx.StopCh) + go ctx.ServiceInformer.Run(ctx.StopCh) + go ctx.PodInformer.Run(ctx.StopCh) + go ctx.NodeInformer.Run(ctx.StopCh) + if ctx.EndpointInformer != nil { + go ctx.EndpointInformer.Run(ctx.StopCh) + } +} diff --git a/pkg/controller/controller.go b/pkg/controller/controller.go index 
325418f3f3..5911d42667 100644 --- a/pkg/controller/controller.go +++ b/pkg/controller/controller.go @@ -27,15 +27,13 @@ import ( apiv1 "k8s.io/api/core/v1" extensions "k8s.io/api/extensions/v1beta1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" - informerv1 "k8s.io/client-go/informers/core/v1" - informerv1beta1 "k8s.io/client-go/informers/extensions/v1beta1" "k8s.io/client-go/kubernetes" scheme "k8s.io/client-go/kubernetes/scheme" unversionedcore "k8s.io/client-go/kubernetes/typed/core/v1" listers "k8s.io/client-go/listers/core/v1" "k8s.io/client-go/tools/cache" "k8s.io/client-go/tools/record" - + "k8s.io/ingress-gce/pkg/context" "k8s.io/ingress-gce/pkg/firewalls" "k8s.io/ingress-gce/pkg/loadbalancers" ) @@ -55,33 +53,6 @@ var ( storeSyncPollPeriod = 5 * time.Second ) -// ControllerContext holds -type ControllerContext struct { - IngressInformer cache.SharedIndexInformer - ServiceInformer cache.SharedIndexInformer - PodInformer cache.SharedIndexInformer - NodeInformer cache.SharedIndexInformer - // Stop is the stop channel shared among controllers - StopCh chan struct{} -} - -func NewControllerContext(kubeClient kubernetes.Interface, namespace string, resyncPeriod time.Duration) *ControllerContext { - return &ControllerContext{ - IngressInformer: informerv1beta1.NewIngressInformer(kubeClient, namespace, resyncPeriod, cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc}), - ServiceInformer: informerv1.NewServiceInformer(kubeClient, namespace, resyncPeriod, cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc}), - PodInformer: informerv1.NewPodInformer(kubeClient, namespace, resyncPeriod, cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc}), - NodeInformer: informerv1.NewNodeInformer(kubeClient, resyncPeriod, cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc}), - StopCh: make(chan struct{}), - } -} - -func (ctx *ControllerContext) Start() { - go ctx.IngressInformer.Run(ctx.StopCh) - go 
ctx.ServiceInformer.Run(ctx.StopCh) - go ctx.PodInformer.Run(ctx.StopCh) - go ctx.NodeInformer.Run(ctx.StopCh) -} - // LoadBalancerController watches the kubernetes api and adds/removes services // from the loadbalancer, via loadBalancerConfig. type LoadBalancerController struct { @@ -101,7 +72,7 @@ type LoadBalancerController struct { recorder record.EventRecorder nodeQueue *taskQueue ingQueue *taskQueue - tr *GCETranslator + Translator *GCETranslator stopCh chan struct{} // stopLock is used to enforce only a single call to Stop is active. // Needed because we allow stopping through an http endpoint and @@ -120,7 +91,7 @@ type LoadBalancerController struct { // - clusterManager: A ClusterManager capable of creating all cloud resources // required for L7 loadbalancing. // - resyncPeriod: Watchers relist from the Kubernetes API server this often. -func NewLoadBalancerController(kubeClient kubernetes.Interface, ctx *ControllerContext, clusterManager *ClusterManager) (*LoadBalancerController, error) { +func NewLoadBalancerController(kubeClient kubernetes.Interface, ctx *context.ControllerContext, clusterManager *ClusterManager) (*LoadBalancerController, error) { eventBroadcaster := record.NewBroadcaster() eventBroadcaster.StartLogging(glog.Infof) eventBroadcaster.StartRecordingToSink(&unversionedcore.EventSinkImpl{ @@ -196,7 +167,7 @@ func NewLoadBalancerController(kubeClient kubernetes.Interface, ctx *ControllerC // Nodes are updated every 10s and we don't care, so no update handler. 
}) - lbc.tr = &GCETranslator{&lbc} + lbc.Translator = &GCETranslator{&lbc} lbc.tlsLoader = &apiServerTLSLoader{client: lbc.client} glog.V(3).Infof("Created new loadbalancer controller") @@ -285,8 +256,8 @@ func (lbc *LoadBalancerController) sync(key string) (err error) { return err } - allNodePorts := lbc.tr.toNodePorts(&allIngresses) - gceNodePorts := lbc.tr.toNodePorts(&gceIngresses) + allNodePorts := lbc.Translator.toNodePorts(&allIngresses) + gceNodePorts := lbc.Translator.toNodePorts(&gceIngresses) lbNames := lbc.ingLister.Store.ListKeys() lbs, err := lbc.toRuntimeInfo(gceIngresses) if err != nil { @@ -368,7 +339,7 @@ func (lbc *LoadBalancerController) sync(key string) (err error) { return syncError } - if urlMap, err := lbc.tr.toURLMap(&ing); err != nil { + if urlMap, err := lbc.Translator.toURLMap(&ing); err != nil { syncError = fmt.Errorf("%v, convert to url map error %v", syncError, err) } else if err := l7.UpdateUrlMap(urlMap); err != nil { lbc.recorder.Eventf(&ing, apiv1.EventTypeWarning, "UrlMap", err.Error()) diff --git a/pkg/controller/controller_test.go b/pkg/controller/controller_test.go index df46de6a8d..afe3811b62 100644 --- a/pkg/controller/controller_test.go +++ b/pkg/controller/controller_test.go @@ -33,6 +33,7 @@ import ( "k8s.io/client-go/kubernetes/fake" "k8s.io/kubernetes/pkg/api" + "k8s.io/ingress-gce/pkg/context" "k8s.io/ingress-gce/pkg/firewalls" "k8s.io/ingress-gce/pkg/loadbalancers" "k8s.io/ingress-gce/pkg/utils" @@ -53,7 +54,7 @@ func defaultBackendName(clusterName string) string { // newLoadBalancerController create a loadbalancer controller. 
func newLoadBalancerController(t *testing.T, cm *fakeClusterManager) *LoadBalancerController { kubeClient := fake.NewSimpleClientset() - ctx := NewControllerContext(kubeClient, api_v1.NamespaceAll, 1*time.Second) + ctx := context.NewControllerContext(kubeClient, api_v1.NamespaceAll, 1*time.Second, true) lb, err := NewLoadBalancerController(kubeClient, ctx, cm.ClusterManager) if err != nil { t.Fatalf("%v", err) diff --git a/pkg/controller/utils_test.go b/pkg/controller/utils_test.go index ff0b8d32f4..c86fbb17fb 100644 --- a/pkg/controller/utils_test.go +++ b/pkg/controller/utils_test.go @@ -43,7 +43,7 @@ func TestZoneListing(t *testing.T) { "zone-2": {"n2"}, } addNodes(lbc, zoneToNode) - zones, err := lbc.tr.ListZones() + zones, err := lbc.Translator.ListZones() if err != nil { t.Errorf("Failed to list zones: %v", err) } @@ -99,7 +99,7 @@ func TestProbeGetter(t *testing.T) { } addPods(lbc, nodePortToHealthCheck, api_v1.NamespaceDefault) for p, exp := range nodePortToHealthCheck { - got, err := lbc.tr.GetProbe(p) + got, err := lbc.Translator.GetProbe(p) if err != nil || got == nil { t.Errorf("Failed to get probe for node port %v: %v", p, err) } else if getProbePath(got) != exp { @@ -121,7 +121,7 @@ func TestProbeGetterNamedPort(t *testing.T) { pod.Spec.Containers[0].ReadinessProbe.Handler.HTTPGet.Port = intstr.IntOrString{Type: intstr.String, StrVal: "test"} } for p, exp := range nodePortToHealthCheck { - got, err := lbc.tr.GetProbe(p) + got, err := lbc.Translator.GetProbe(p) if err != nil || got == nil { t.Errorf("Failed to get probe for node port %v: %v", p, err) } else if getProbePath(got) != exp { @@ -172,7 +172,7 @@ func TestProbeGetterCrossNamespace(t *testing.T) { addPods(lbc, nodePortToHealthCheck, api_v1.NamespaceDefault) for p, exp := range nodePortToHealthCheck { - got, err := lbc.tr.GetProbe(p) + got, err := lbc.Translator.GetProbe(p) if err != nil || got == nil { t.Errorf("Failed to get probe for node port %v: %v", p, err) } else if getProbePath(got) != 
exp { @@ -254,7 +254,7 @@ func addNodes(lbc *LoadBalancerController, zoneToNode map[string][]string) { lbc.nodeLister.Indexer.Add(n) } } - lbc.CloudClusterManager.instancePool.Init(lbc.tr) + lbc.CloudClusterManager.instancePool.Init(lbc.Translator) } func getProbePath(p *api_v1.Probe) string {